[KYUUBI #5375] JDBC Engine supports PostgreSQL

### _Why are the changes needed?_

Close #5375

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request

### _Was this patch authored or co-authored using generative AI tooling?_

No

Closes #5416 from ymZhao1001/jdbc-pg-dialect.

Closes #5375

d81988d84 [zhaoyangming] postgreSQL
915f9fb0a [yangming] change to like
d8da12af5 [yangming] reformat
29c63e38e [zhaoyangming] add postgresql dependency
ec328ad93 [zhaoyangming] add postgresql dependency
a8944fed5 [zhaoyangming] update postgresql to postgreSQL
cf7b69107 [zhaoyangming] Merge remote-tracking branch 'origin/jdbc-pg-dialect' into jdbc-pg-dialect
c127aa3d3 [zhaoyangming] update postgresql to postgreSQL
a693d6c34 [yangming] reformat
0d12a6ceb [zhaoyangming] add postgresql dependency
c7d3fa3da [yangming] fix conflict
dde1564b6 [zhaoyangming] add test info
2a49b338a [zhaoyangming] style
c8ce15f29 [zhaoyangming] StringBuilder is redundant.
5d70173cf [yangming] JDBC Engine supports PostgreSQL

Lead-authored-by: zhaoyangming <zhaoyangming@deepexi.com>
Co-authored-by: yangming <261635393@qq.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
zhaoyangming 2023-11-08 19:42:49 +08:00 committed by Cheng Pan
parent 0210d54796
commit 456b3c3f06
27 changed files with 963 additions and 16 deletions

View File

@ -76,6 +76,12 @@
<artifactId>phoenix-queryserver-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -16,4 +16,5 @@
#
org.apache.kyuubi.engine.jdbc.doris.DorisConnectionProvider
org.apache.kyuubi.engine.jdbc.phoenix.PhoenixConnectionProvider
org.apache.kyuubi.engine.jdbc.phoenix.PhoenixConnectionProvider
org.apache.kyuubi.engine.jdbc.postgresql.PostgreSQLConnectionProvider

View File

@ -16,4 +16,5 @@
#
org.apache.kyuubi.engine.jdbc.dialect.DorisDialect
org.apache.kyuubi.engine.jdbc.dialect.PhoenixDialect
org.apache.kyuubi.engine.jdbc.dialect.PhoenixDialect
org.apache.kyuubi.engine.jdbc.dialect.PostgreSQLDialect

View File

@ -41,11 +41,11 @@ abstract class JdbcDialect extends SupportServiceLoader with Logging {
throw KyuubiSQLException.featureNotSupported()
}
def getCatalogsOperation(session: Session): Operation = {
def getCatalogsOperation(): String = {
throw KyuubiSQLException.featureNotSupported()
}
def getSchemasOperation(session: Session): Operation = {
def getSchemasOperation(catalog: String, schema: String): String = {
throw KyuubiSQLException.featureNotSupported()
}

View File

@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.dialect
import java.sql.{Connection, ResultSet, Statement}
import java.util
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.commons.lang3.StringUtils
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.jdbc.postgresql.{PostgreSQLRowSetHelper, PostgreSQLSchemaHelper}
import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
import org.apache.kyuubi.operation.Operation
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
class PostgreSQLDialect extends JdbcDialect {

  /**
   * Doubles embedded single quotes so that client-supplied metadata patterns
   * cannot terminate the quoted LIKE literal they are interpolated into
   * (basic SQL-injection hardening for the generated INFORMATION_SCHEMA queries).
   */
  private def escapeSingleQuotes(value: String): String = value.replace("'", "''")

  /** Appends " WHERE f1 AND f2 ..." to `query` when any filters were collected. */
  private def appendWhereClause(query: StringBuilder, filters: ArrayBuffer[String]): Unit = {
    if (filters.nonEmpty) {
      query.append(" WHERE ")
      query.append(filters.mkString(" AND "))
    }
  }

  /**
   * Creates a forward-only, read-only statement for the given connection.
   *
   * NOTE(review): the PostgreSQL JDBC driver only honors `fetchSize`
   * (cursor-based fetching) when auto-commit is disabled, so guarding the
   * call on `getAutoCommit == true` looks inverted — confirm intent.
   */
  override def createStatement(connection: Connection, fetchSize: Int): Statement = {
    val statement =
      connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
    if (connection.getAutoCommit) {
      statement.setFetchSize(fetchSize)
    }
    statement
  }

  /** Query returning the name of the connected database's single catalog. */
  override def getCatalogsOperation(): String = "SELECT CATALOG_NAME " +
    "FROM INFORMATION_SCHEMA.INFORMATION_SCHEMA_CATALOG_NAME"

  /**
   * Builds a schema-listing query over INFORMATION_SCHEMA.SCHEMATA.
   *
   * @param catalog optional LIKE pattern on catalog_name; blank means no filter
   * @param schema  optional LIKE pattern on schema_name; blank means no filter
   */
  override def getSchemasOperation(
      catalog: String,
      schema: String): String = {
    val query = new StringBuilder(
      """
        |SELECT CATALOG_NAME, SCHEMA_NAME, SCHEMA_OWNER,
        |DEFAULT_CHARACTER_SET_CATALOG, DEFAULT_CHARACTER_SET_SCHEMA,
        |DEFAULT_CHARACTER_SET_NAME, SQL_PATH
        |FROM INFORMATION_SCHEMA.SCHEMATA
        |""".stripMargin)

    val filters = ArrayBuffer[String]()
    if (StringUtils.isNotBlank(catalog)) {
      filters += s"catalog_name LIKE '${escapeSingleQuotes(catalog)}'"
    }
    if (StringUtils.isNotBlank(schema)) {
      filters += s"schema_name LIKE '${escapeSingleQuotes(schema)}'"
    }
    appendWhereClause(query, filters)
    query.toString()
  }

  /**
   * Builds a table-listing query over INFORMATION_SCHEMA.TABLES.
   *
   * @param tableTypes requested table types; null/empty defaults to
   *                   BASE TABLE and VIEW
   */
  override def getTablesQuery(
      catalog: String,
      schema: String,
      tableName: String,
      tableTypes: util.List[String]): String = {
    val tTypes =
      if (tableTypes == null || tableTypes.isEmpty) {
        Set("BASE TABLE", "VIEW")
      } else {
        tableTypes.asScala.toSet
      }
    val query = new StringBuilder(
      """
        |SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE,
        |SELF_REFERENCING_COLUMN_NAME, REFERENCE_GENERATION, USER_DEFINED_TYPE_CATALOG,
        |USER_DEFINED_TYPE_SCHEMA,USER_DEFINED_TYPE_NAME,
        |IS_INSERTABLE_INTO,IS_TYPED,COMMIT_ACTION
        |FROM INFORMATION_SCHEMA.TABLES
        |""".stripMargin)

    val filters = ArrayBuffer[String]()
    if (StringUtils.isNotBlank(catalog)) {
      filters += s"$TABLE_CATALOG LIKE '${escapeSingleQuotes(catalog)}'"
    }
    if (StringUtils.isNotBlank(schema)) {
      filters += s"$TABLE_SCHEMA LIKE '${escapeSingleQuotes(schema)}'"
    }
    if (StringUtils.isNotBlank(tableName)) {
      filters += s"$TABLE_NAME LIKE '${escapeSingleQuotes(tableName)}'"
    }
    if (tTypes.nonEmpty) {
      // Table types are matched exactly (OR'd equalities), not with LIKE.
      val typeFilter = tTypes
        .map(tableType => s"$TABLE_TYPE = '${escapeSingleQuotes(tableType)}'")
        .mkString(" OR ")
      filters += s"($typeFilter)"
    }
    appendWhereClause(query, filters)
    query.toString()
  }

  // GetTableTypes is not implemented for PostgreSQL.
  override def getTableTypesOperation(session: Session): Operation = {
    throw KyuubiSQLException.featureNotSupported()
  }

  /**
   * Builds a column-listing query over INFORMATION_SCHEMA.COLUMNS.
   * All four arguments are optional LIKE patterns; empty means no filter.
   *
   * NOTE(review): this method uses isNotEmpty while the other builders use
   * isNotBlank — kept as-is to preserve behavior for whitespace-only
   * patterns; confirm whether the inconsistency is intentional.
   */
  override def getColumnsQuery(
      session: Session,
      catalogName: String,
      schemaName: String,
      tableName: String,
      columnName: String): String = {
    val query = new StringBuilder(
      """
        |SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, ORDINAL_POSITION,
        |COLUMN_DEFAULT, IS_NULLABLE, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH,
        |CHARACTER_OCTET_LENGTH, NUMERIC_PRECISION, NUMERIC_PRECISION_RADIX,
        |NUMERIC_SCALE, DATETIME_PRECISION, INTERVAL_TYPE, INTERVAL_PRECISION,
        |CHARACTER_SET_CATALOG, CHARACTER_SET_SCHEMA, CHARACTER_SET_NAME,
        |COLLATION_CATALOG, COLLATION_SCHEMA, COLLATION_NAME, DOMAIN_CATALOG,
        |DOMAIN_SCHEMA, DOMAIN_NAME, UDT_CATALOG, UDT_SCHEMA, UDT_NAME, SCOPE_CATALOG,
        |SCOPE_SCHEMA, SCOPE_NAME, MAXIMUM_CARDINALITY, DTD_IDENTIFIER,
        |IS_SELF_REFERENCING, IS_IDENTITY, IDENTITY_GENERATION, IDENTITY_START,
        |IDENTITY_INCREMENT, IDENTITY_MAXIMUM, IDENTITY_MINIMUM, IDENTITY_CYCLE,
        |IS_GENERATED, GENERATION_EXPRESSION, IS_UPDATABLE
        |FROM INFORMATION_SCHEMA.COLUMNS
        |""".stripMargin)

    val filters = ArrayBuffer[String]()
    if (StringUtils.isNotEmpty(catalogName)) {
      filters += s"$TABLE_CATALOG LIKE '${escapeSingleQuotes(catalogName)}'"
    }
    if (StringUtils.isNotEmpty(schemaName)) {
      filters += s"$TABLE_SCHEMA LIKE '${escapeSingleQuotes(schemaName)}'"
    }
    if (StringUtils.isNotEmpty(tableName)) {
      filters += s"$TABLE_NAME LIKE '${escapeSingleQuotes(tableName)}'"
    }
    if (StringUtils.isNotEmpty(columnName)) {
      filters += s"$COLUMN_NAME LIKE '${escapeSingleQuotes(columnName)}'"
    }
    appendWhereClause(query, filters)
    query.toString()
  }

  // GetFunctions is not implemented for PostgreSQL.
  override def getFunctionsOperation(session: Session): Operation = {
    throw KyuubiSQLException.featureNotSupported()
  }

  // GetPrimaryKeys is not implemented for PostgreSQL.
  override def getPrimaryKeysOperation(session: Session): Operation = {
    throw KyuubiSQLException.featureNotSupported()
  }

  // GetCrossReference is not implemented for PostgreSQL.
  override def getCrossReferenceOperation(session: Session): Operation = {
    throw KyuubiSQLException.featureNotSupported()
  }

  /** Row-set conversion helper that widens SMALLINT values to INT columns. */
  override def getRowSetHelper(): RowSetHelper = {
    new PostgreSQLRowSetHelper
  }

  /** Schema helper that maps SMALLINT to the Thrift INT type. */
  override def getSchemaHelper(): SchemaHelper = {
    new PostgreSQLSchemaHelper
  }

  /** Short name used to select this dialect via engine configuration. */
  override def name(): String = {
    "postgresql"
  }
}

View File

@ -60,16 +60,20 @@ class JdbcOperationManager(conf: KyuubiConf) extends OperationManager("JdbcOpera
}
override def newGetCatalogsOperation(session: Session): Operation = {
val operation = dialect.getCatalogsOperation(session)
addOperation(operation)
val query = dialect.getCatalogsOperation()
val executeStatement =
new ExecuteStatement(session, query, false, 0L, true)
addOperation(executeStatement)
}
override def newGetSchemasOperation(
session: Session,
catalog: String,
schema: String): Operation = {
val operation = dialect.getSchemasOperation(session)
addOperation(operation)
val query = dialect.getSchemasOperation(catalog, schema)
val executeStatement =
new ExecuteStatement(session, query, false, 0L, true)
addOperation(executeStatement)
}
override def newGetTablesOperation(

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.postgresql
import org.apache.kyuubi.engine.jdbc.connection.JdbcConnectionProvider
class PostgreSQLConnectionProvider extends JdbcConnectionProvider {

  /** Provider identifier, derived from this class's simple name. */
  override val name: String = classOf[PostgreSQLConnectionProvider].getSimpleName

  /** Fully qualified class name of the PostgreSQL JDBC driver. */
  override val driverClass: String = "org.postgresql.Driver"

  /** Accepts the request when the asked-for provider is the PostgreSQL driver class. */
  override def canHandle(providerClass: String): Boolean =
    driverClass.equalsIgnoreCase(providerClass)
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.postgresql
import org.apache.hive.service.rpc.thrift._
import org.apache.kyuubi.engine.jdbc.schema.RowSetHelper
class PostgreSQLRowSetHelper extends RowSetHelper {
  // Widen SMALLINT columns to the integer conversion path for the
  // column-oriented (TColumn) result representation.
  override def toSmallIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn =
    toIntegerTColumn(rows, ordinal)

  // Same widening for the row-oriented (single TColumnValue) representation.
  override def toSmallIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue =
    toIntegerTColumnValue(row, ordinal)
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.postgresql
import org.apache.hive.service.rpc.thrift._
import org.apache.kyuubi.engine.jdbc.schema.SchemaHelper
class PostgreSQLSchemaHelper extends SchemaHelper {
  // Report SMALLINT columns as the Thrift INT type, matching the value
  // widening done in PostgreSQLRowSetHelper.
  override def smallIntToTTypeId: TTypeId = TTypeId.INT_TYPE
}

View File

@ -26,7 +26,7 @@ class OperationWithEngineSuite extends DorisOperationSuite with HiveJDBCTestHelp
override protected def jdbcUrl: String = jdbcConnectionUrl
test("Test for Jdbc engine getInfo") {
test("doris - test for Jdbc engine getInfo") {
val metaData = ConnectionProvider.create(kyuubiConf).getMetaData
withSessionConf(Map(KyuubiConf.SERVER_INFO_PROVIDER.key -> "ENGINE"))()() {
@ -60,7 +60,7 @@ class OperationWithEngineSuite extends DorisOperationSuite with HiveJDBCTestHelp
}
}
test("JDBC ExecuteStatement operation should contain operationLog") {
test("doris - JDBC ExecuteStatement operation should contain operationLog") {
withSessionHandle { (client, handle) =>
val tExecuteStatementReq = new TExecuteStatementReq()
tExecuteStatementReq.setSessionHandle(handle)

View File

@ -20,7 +20,7 @@ import org.apache.kyuubi.operation.HiveJDBCTestHelper
class SessionSuite extends WithDorisEngine with HiveJDBCTestHelper {
test("test session") {
test("doris - test session") {
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery(
"select '1' as id")

View File

@ -22,7 +22,7 @@ import org.apache.kyuubi.operation.HiveJDBCTestHelper
class StatementSuite extends WithDorisEngine with HiveJDBCTestHelper {
test("test select") {
test("doris - test select") {
withJdbcStatement("test1") { statement =>
statement.execute("create database if not exists db1")
statement.execute("use db1")
@ -44,7 +44,7 @@ class StatementSuite extends WithDorisEngine with HiveJDBCTestHelper {
}
}
test("test types") {
test("doris - test types") {
withJdbcStatement("test1") { statement =>
statement.execute("create database if not exists db1")
statement.execute("use db1")

View File

@ -26,7 +26,7 @@ class OperationWithPhoenixEngineSuite extends PhoenixOperationSuite with HiveJDB
override protected def jdbcUrl: String = jdbcConnectionUrl
test("Test for Jdbc engine getInfo") {
test("phoenix - test for Jdbc engine getInfo") {
val metaData = ConnectionProvider.create(kyuubiConf).getMetaData
withSessionConf(Map(KyuubiConf.SERVER_INFO_PROVIDER.key -> "ENGINE"))()() {

View File

@ -20,7 +20,7 @@ import org.apache.kyuubi.operation.HiveJDBCTestHelper
class SessionSuite extends WithPhoenixEngine with HiveJDBCTestHelper {
test("test session") {
test("phoenix - test session") {
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery(
"select '1' as id")

View File

@ -22,7 +22,7 @@ import org.apache.kyuubi.operation.HiveJDBCTestHelper
class StatementSuite extends WithPhoenixEngine with HiveJDBCTestHelper {
test("test select") {
test("phoenix - test select") {
withJdbcStatement("test1") { statement =>
statement.execute("create table db1.test1(id bigint primary key, " +
"name varchar(255), age integer)")

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.postgresql
import org.apache.hive.service.rpc.thrift.{TGetInfoReq, TGetInfoType}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.jdbc.connection.ConnectionProvider
import org.apache.kyuubi.operation.HiveJDBCTestHelper
class OperationWithPostgreSQLEngineSuite extends PostgreSQLOperationSuite with HiveJDBCTestHelper {

  override protected def jdbcUrl: String = jdbcConnectionUrl

  test("postgreSQL - test for Jdbc engine getInfo") {
    val metaData = ConnectionProvider.create(kyuubiConf).getMetaData
    withSessionConf(Map(KyuubiConf.SERVER_INFO_PROVIDER.key -> "ENGINE"))()() {
      withSessionHandle { (client, handle) =>
        // Issues a GetInfo RPC for the given info type and returns its value.
        def getInfo(infoType: TGetInfoType) = {
          val req = new TGetInfoReq()
          req.setSessionHandle(handle)
          req.setInfoType(infoType)
          client.GetInfo(req).getInfoValue
        }
        // Engine-provided server info must mirror the driver's own metadata.
        assert(getInfo(TGetInfoType.CLI_DBMS_NAME).getStringValue ==
          metaData.getDatabaseProductName)
        assert(getInfo(TGetInfoType.CLI_DBMS_VER).getStringValue ==
          metaData.getDatabaseProductVersion)
        assert(getInfo(TGetInfoType.CLI_MAX_COLUMN_NAME_LEN).getLenValue ==
          metaData.getMaxColumnNameLength)
        assert(getInfo(TGetInfoType.CLI_MAX_SCHEMA_NAME_LEN).getLenValue ==
          metaData.getMaxSchemaNameLength)
        assert(getInfo(TGetInfoType.CLI_MAX_TABLE_NAME_LEN).getLenValue ==
          metaData.getMaxTableNameLength)
      }
    }
  }
}

View File

@ -0,0 +1,215 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.postgresql
import java.sql.ResultSet
import scala.collection.mutable.ArrayBuffer
import org.apache.kyuubi.operation.HiveJDBCTestHelper
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
abstract class PostgreSQLOperationSuite extends WithPostgreSQLEngine with HiveJDBCTestHelper {

  // GetCatalogs must surface the container's default "postgres" catalog.
  test("postgreSQL - get catalog") {
    case class Catalog(catalog: String)
    withJdbcStatement() { statement =>
      val meta = statement.getConnection.getMetaData
      val resultBuffer = ArrayBuffer[Catalog]()
      val catalogs = meta.getCatalogs
      while (catalogs.next()) {
        resultBuffer +=
          Catalog(catalogs.getString("catalog_name"))
      }
      assert(resultBuffer.contains(Catalog("postgres")))
      resultBuffer.clear()
    }
  }

  // GetSchemas must include the built-in information_schema of the
  // "postgres" catalog.
  test("postgreSQL - get schemas") {
    case class Schema(catalog: String, schema: String)
    withJdbcStatement() { statement =>
      val meta = statement.getConnection.getMetaData
      val resultBuffer = ArrayBuffer[Schema]()
      val schemas = meta.getSchemas
      while (schemas.next()) {
        resultBuffer +=
          Schema(schemas.getString("catalog_name"), schemas.getString("schema_name"))
      }
      assert(resultBuffer.contains(Schema("postgres", "information_schema")))
      resultBuffer.clear()
    }
  }

  // Exercises GetTables with combinations of name patterns and type filters.
  // Catalog/schema are recorded as null because only name and type are
  // asserted on.
  test("postgreSQL - get tables") {
    case class Table(catalog: String, schema: String, tableName: String, tableType: String)
    withJdbcStatement() { statement =>
      val meta = statement.getConnection.getMetaData
      val resultBuffer = ArrayBuffer[Table]()

      // No filters: built-in system tables and views must appear.
      var tables = meta.getTables(null, null, null, null)
      while (tables.next()) {
        resultBuffer +=
          Table(
            null,
            null,
            tables.getString(TABLE_NAME),
            tables.getString(TABLE_TYPE))
      }
      assert(resultBuffer.contains(Table(null, null, "pg_statistic", "BASE TABLE")))
      assert(resultBuffer.contains(Table(null, null, "pg_roles", "VIEW")))
      resultBuffer.clear()

      // Create two user tables to test name/type filtering against.
      statement.execute("create table public.test1(id bigint primary key)")
      statement.execute("create table public.test2(id bigint primary key)")

      // Exact table name + explicit BASE TABLE type filter.
      tables = meta.getTables(null, null, "test1", Array("BASE TABLE"))
      while (tables.next()) {
        val table = Table(
          null,
          null,
          tables.getString(TABLE_NAME),
          tables.getString(TABLE_TYPE))
        assert(table == Table(null, null, "test1", "BASE TABLE"))
      }

      // Name filter only; type filter omitted (null).
      tables = meta.getTables(null, null, "test2", null)
      while (tables.next()) {
        resultBuffer += Table(
          null,
          null,
          tables.getString(TABLE_NAME),
          tables.getString(TABLE_TYPE))
      }
      assert(resultBuffer.contains(Table(null, null, "test2", "BASE TABLE")))
      resultBuffer.clear()

      // Type filter only: both user tables must be returned.
      tables = meta.getTables(null, null, null, Array("BASE TABLE"))
      while (tables.next()) {
        resultBuffer += Table(
          null,
          null,
          tables.getString(TABLE_NAME),
          tables.getString(TABLE_TYPE))
      }
      assert(resultBuffer.contains(Table(null, null, "test1", "BASE TABLE")))
      assert(resultBuffer.contains(Table(null, null, "test2", "BASE TABLE")))
      resultBuffer.clear()

      // Multiple type filters: tables and system views together.
      tables = meta.getTables(null, null, null, Array("BASE TABLE", "VIEW"))
      while (tables.next()) {
        resultBuffer += Table(
          null,
          null,
          tables.getString(TABLE_NAME),
          tables.getString(TABLE_TYPE))
      }
      assert(resultBuffer.contains(Table(null, null, "test1", "BASE TABLE")))
      assert(resultBuffer.contains(Table(null, null, "test2", "BASE TABLE")))
      assert(resultBuffer.contains(Table(null, null, "pg_shadow", "VIEW")))
      assert(resultBuffer.contains(Table(null, null, "pg_roles", "VIEW")))
      resultBuffer.clear()

      // Clean up so reruns against the same container start fresh.
      statement.execute("drop table public.test1")
      statement.execute("drop table public.test2")
    }
  }

  // Exercises GetColumns with table/column name patterns, including SQL
  // LIKE wildcards ('%') and a non-matching pattern.
  test("postgreSQL - get columns") {
    case class Column(tableName: String, columnName: String)

    // Extracts the (table, column) pair from the current result-set row.
    def buildColumn(resultSet: ResultSet): Column = {
      val tableName = resultSet.getString(TABLE_NAME)
      val columnName = resultSet.getString(COLUMN_NAME)
      val column = Column(tableName, columnName)
      column
    }

    withJdbcStatement() { statement =>
      val metadata = statement.getConnection.getMetaData
      statement.execute("create table if not exists public.test1" +
        "(id bigint primary key, str1 varchar, str2 varchar, age integer)")
      statement.execute("create table if not exists public.test2" +
        "(id bigint primary key, str1 varchar, str2 varchar, age integer)")
      val resultBuffer = ArrayBuffer[Column]()

      // No filters: every column of both tables must be present.
      val resultSet1 = metadata.getColumns(null, null, null, null)
      while (resultSet1.next()) {
        val column = buildColumn(resultSet1)
        resultBuffer += column
      }
      assert(resultBuffer.contains(Column("test1", "id")))
      assert(resultBuffer.contains(Column("test1", "str1")))
      assert(resultBuffer.contains(Column("test1", "str2")))
      assert(resultBuffer.contains(Column("test1", "age")))
      assert(resultBuffer.contains(Column("test2", "id")))
      assert(resultBuffer.contains(Column("test2", "str1")))
      assert(resultBuffer.contains(Column("test2", "str2")))
      assert(resultBuffer.contains(Column("test2", "age")))
      resultBuffer.clear()

      // Table-name filter only.
      val resultSet2 = metadata.getColumns(null, null, "test1", null)
      while (resultSet2.next()) {
        val column = buildColumn(resultSet2)
        resultBuffer += column
      }
      assert(resultBuffer.contains(Column("test1", "id")))
      assert(resultBuffer.contains(Column("test1", "str1")))
      assert(resultBuffer.contains(Column("test1", "str2")))
      assert(resultBuffer.contains(Column("test1", "age")))
      resultBuffer.clear()

      // Column-name filter only.
      val resultSet3 = metadata.getColumns(null, null, null, "age")
      while (resultSet3.next()) {
        val column = buildColumn(resultSet3)
        resultBuffer += column
      }
      assert(resultBuffer.contains(Column("test1", "age")))
      assert(resultBuffer.contains(Column("test2", "age")))
      resultBuffer.clear()

      // LIKE wildcard patterns on both table and column names.
      val resultSet4 = metadata.getColumns(null, null, "t%1", "str%")
      while (resultSet4.next()) {
        val column = buildColumn(resultSet4)
        resultBuffer += column
      }
      assert(resultBuffer.contains(Column("test1", "str1")))
      resultBuffer.clear()

      // A non-matching column pattern must yield an empty result set.
      val resultSet5 = metadata.getColumns(null, null, "t%1", "fake")
      assert(!resultSet5.next())

      statement.execute("drop table public.test1")
      statement.execute("drop table public.test2")
    }
  }
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.postgresql
import org.apache.kyuubi.operation.HiveJDBCTestHelper
class SessionSuite extends WithPostgreSQLEngine with HiveJDBCTestHelper {

  override protected def jdbcUrl: String = jdbcConnectionUrl

  // A trivial constant query round-trips through the engine: the single
  // column must be named "id" and every row must hold the string "1".
  test("postgreSQL - test session") {
    withJdbcStatement() { statement =>
      val rs = statement.executeQuery(
        "select '1' as id")
      val meta = rs.getMetaData
      (1 to meta.getColumnCount).foreach { col =>
        assert(meta.getColumnName(col) == "id")
      }
      while (rs.next()) {
        assert(rs.getObject(1) == "1")
      }
    }
  }
}

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.postgresql
import java.sql.{Date, Timestamp}
import org.apache.kyuubi.operation.HiveJDBCTestHelper
class StatementSuite extends WithPostgreSQLEngine with HiveJDBCTestHelper {

  // Round-trips a single row through a bigint/varchar/integer table.
  test("postgreSQL - test select") {
    withJdbcStatement("test1") { statement =>
      statement.execute("create table public.test1(id bigint primary key, " +
        "name varchar(255), age integer)")
      statement.execute("insert into public.test1 values(1, 'a', 11)")
      val resultSet1 = statement.executeQuery("select * from public.test1")
      while (resultSet1.next()) {
        val id = resultSet1.getObject(1)
        assert(id == 1)
        val name = resultSet1.getObject(2)
        assert(name == "a")
        val age = resultSet1.getObject(3)
        assert(age == 11)
      }
    }
  }

  // Verifies value mapping across the common PostgreSQL column types.
  test("postgreSQL - test types") {
    withJdbcStatement("type_test") { statement =>
      statement.execute("create table public.type_test(" +
        "id bigint primary key, " +
        "smallint_col smallint, " +
        "int_col integer, " +
        "bigint_col bigint, " +
        "date_col date, " +
        "timestamp_col timestamp, " +
        "char_col char(10), " +
        "varchar_col varchar(255), " +
        "boolean_col boolean, " +
        "double_col double precision, " +
        "float_col float)")
      statement.execute("insert into public.type_test" +
        "(id, " +
        "smallint_col, " +
        "int_col, " +
        "bigint_col, " +
        "date_col, " +
        "timestamp_col, " +
        "char_col, " +
        "varchar_col, " +
        "boolean_col, " +
        "double_col, " +
        "float_col) " +
        "VALUES (1, " +
        "2, " +
        "3, " +
        "4, " +
        "'2022-05-08', " +
        "'2022-05-08 17:47:45'," +
        "'a', " +
        "'Hello', " +
        "true, " +
        "8.8, " +
        "9.9)")
      val resultSet1 = statement.executeQuery("select * from public.type_test")
      while (resultSet1.next()) {
        // (removed an unused local that duplicated getObject(1))
        assert(resultSet1.getObject(1) == 1)
        assert(resultSet1.getObject(2) == 2)
        assert(resultSet1.getObject(3) == 3)
        assert(resultSet1.getObject(4) == 4)
        assert(resultSet1.getObject(5) == Date.valueOf("2022-05-08"))
        assert(resultSet1.getObject(6) == Timestamp.valueOf("2022-05-08 17:47:45"))
        // char(10) is blank-padded, so trim before comparing.
        assert(resultSet1.getString(7).trim == "a")
        assert(resultSet1.getObject(8) == "Hello")
        assert(resultSet1.getObject(9) == true)
        assert(resultSet1.getObject(10) == 8.8)
        assert(resultSet1.getObject(11) == 9.9)
      }
    }
  }

  override protected def jdbcUrl: String = jdbcConnectionUrl
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.postgresql
import com.dimafeng.testcontainers.{GenericContainer, SingleContainer}
import org.testcontainers.containers.wait.strategy.Wait
import org.apache.kyuubi.engine.jdbc.WithJdbcServerContainer
trait WithPostgreSQLContainer extends WithJdbcServerContainer {

  // Default PostgreSQL server port inside the container.
  private val POSTGRESQL_PORT = 5432

  private val postgreSQLDockerImage = "postgres"

  /** A throwaway PostgreSQL container; ready once the port starts listening. */
  override val container: SingleContainer[_] = GenericContainer(
    dockerImage = postgreSQLDockerImage,
    exposedPorts = Seq(POSTGRESQL_PORT),
    env = Map[String, String](
      "POSTGRES_PASSWORD" -> "postgres"),
    waitStrategy = Wait.forListeningPort)

  /** host:port endpoint of the mapped PostgreSQL port on the Docker host. */
  protected def queryUrl: String =
    s"${container.host}:${container.mappedPort(POSTGRESQL_PORT)}"

  override def afterAll(): Unit = {
    super.afterAll()
    container.close()
  }
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.postgresql
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.jdbc.WithJdbcEngine
trait WithPostgreSQLEngine extends WithJdbcEngine with WithPostgreSQLContainer {

  /**
   * Kyuubi configuration that wires the JDBC engine to the PostgreSQL
   * container started by [[WithPostgreSQLContainer]]: engine selection,
   * driver class, and connection credentials for the default database.
   */
  override def withKyuubiConf: Map[String, String] = {
    // Default superuser name and the password set via POSTGRES_PASSWORD.
    val credentials = "postgres"
    Map(
      ENGINE_TYPE.key -> "jdbc",
      ENGINE_SHARE_LEVEL.key -> "SERVER",
      ENGINE_JDBC_SHORT_NAME.key -> "postgresql",
      ENGINE_JDBC_DRIVER_CLASS.key -> "org.postgresql.Driver",
      ENGINE_JDBC_CONNECTION_URL.key -> s"jdbc:postgresql://$queryUrl/postgres",
      ENGINE_JDBC_CONNECTION_USER.key -> credentials,
      ENGINE_JDBC_CONNECTION_PASSWORD.key -> credentials)
  }
}

View File

@ -108,6 +108,13 @@
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}</outputDirectory>
</artifactItem>
</artifactItems>
</configuration>
</execution>

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.it.jdbc.postgresql
import org.apache.kyuubi.engine.jdbc.postgresql.PostgreSQLOperationSuite
/**
 * Runs the shared PostgreSQL operation test suite end-to-end through a full
 * Kyuubi server backed by a PostgreSQL container, rather than against the
 * engine directly.
 */
class OperationWithServerSuite extends PostgreSQLOperationSuite
with WithKyuubiServerAndPostgreSQLContainer {
// Point the suite's JDBC client at the running Kyuubi server.
override protected def jdbcUrl: String = getJdbcUrl
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.it.jdbc.postgresql
import org.apache.kyuubi.engine.jdbc.postgresql.SessionSuite
/**
 * Runs the shared PostgreSQL session test suite end-to-end through a full
 * Kyuubi server backed by a PostgreSQL container, rather than against the
 * engine directly.
 */
class SessionWithServerSuite extends SessionSuite
with WithKyuubiServerAndPostgreSQLContainer {
// Point the suite's JDBC client at the running Kyuubi server.
override protected def jdbcUrl: String = getJdbcUrl
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.it.jdbc.postgresql
import org.apache.kyuubi.engine.jdbc.postgresql.StatementSuite
/**
 * Runs the shared PostgreSQL statement test suite end-to-end through a full
 * Kyuubi server backed by a PostgreSQL container, rather than against the
 * engine directly.
 */
class StatementWithServerSuite extends StatementSuite
with WithKyuubiServerAndPostgreSQLContainer {
// Point the suite's JDBC client at the running Kyuubi server.
override protected def jdbcUrl: String = getJdbcUrl
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.it.jdbc.postgresql
import java.nio.file.{Files, Path, Paths}
import java.time.Duration
import org.apache.kyuubi.{Utils, WithKyuubiServer}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_IDLE_TIMEOUT, ENGINE_JDBC_EXTRA_CLASSPATH, KYUUBI_ENGINE_ENV_PREFIX, KYUUBI_HOME}
import org.apache.kyuubi.engine.jdbc.postgresql.WithPostgreSQLEngine
trait WithKyuubiServerAndPostgreSQLContainer extends WithKyuubiServer with WithPostgreSQLEngine {

  // Root of the Kyuubi checkout, derived from this class's code-source location.
  private val kyuubiHome: String = Utils
    .getCodeSourceLocation(getClass).split("integration-tests").head

  // Absolute path of the PostgreSQL JDBC driver jar that the build copies
  // into integration-tests/kyuubi-jdbc-it/target; fails fast if it is absent.
  private val postgresqlJdbcConnectorPath: String = {
    val keyword = "postgresql"
    val jarsDir = Paths.get(kyuubiHome)
      .resolve("integration-tests")
      .resolve("kyuubi-jdbc-it")
      .resolve("target")
    // Files.list returns a Stream backed by an open directory handle; it must
    // be closed explicitly, otherwise the handle leaks for the JVM's lifetime.
    val entries = Files.list(jarsDir)
    try {
      entries
        .filter { p: Path => p.getFileName.toString contains keyword }
        .findFirst
        .orElseThrow { () => new IllegalStateException(s"Can not find $keyword in $jarsDir.") }
        .toAbsolutePath
        .toString
    } finally {
      entries.close()
    }
  }

  // Server-side conf: expose KYUUBI_HOME to the engine, put the PostgreSQL
  // driver on the engine classpath, and keep idle engines alive for a minute.
  override protected val conf: KyuubiConf = {
    KyuubiConf()
      .set(s"$KYUUBI_ENGINE_ENV_PREFIX.$KYUUBI_HOME", kyuubiHome)
      .set(ENGINE_JDBC_EXTRA_CLASSPATH, postgresqlJdbcConnectorPath)
      .set(ENGINE_IDLE_TIMEOUT, Duration.ofMinutes(1).toMillis)
  }

  // Merge the engine configs into the server conf before the server starts.
  override def beforeAll(): Unit = {
    withKyuubiConf.foreach { case (k, v) => conf.set(k, v) }
    super.beforeAll()
  }
}

View File

@ -189,6 +189,7 @@
<paimon.spark.binary.version>${spark.binary.version}</paimon.spark.binary.version>
<parquet.version>1.10.1</parquet.version>
<phoenix.version>6.0.0</phoenix.version>
<postgresql.version>42.6.0</postgresql.version>
<prometheus.version>0.16.0</prometheus.version>
<protobuf.version>3.21.7</protobuf.version>
<py4j.version>0.10.7</py4j.version>
@ -1335,6 +1336,12 @@
<version>${phoenix.version}</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
</dependency>
<!-- flink -->
<dependency>
<groupId>org.apache.flink</groupId>