[KYUUBI #2962] [SUB-TASK][KPIP-4] Throw exception if the metadata update count is zero
### _Why are the changes needed?_ Throw exception if the metadata update count is zero, in case that the pre insert metadata operation failed. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #2962 from turboFei/retry_on_created. Closes #2962 3610be91 [Fei Wang] If update count is zero, throw exception 25d957ca [Fei Wang] Revert "If the metadata does not be created successfully, put into retry queue" 24cdc7e0 [Fei Wang] If the metadata does not be created successfully, put into retry queue Authored-by: Fei Wang <fwang12@ebay.com> Signed-off-by: Fei Wang <fwang12@ebay.com>
This commit is contained in:
parent
6e4b5582da
commit
10affbf64e
@ -269,9 +269,14 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
|
||||
queryBuilder.append(" WHERE identifier = ? ")
|
||||
params += metadata.identifier
|
||||
|
||||
val query = queryBuilder.toString()
|
||||
withConnection() { connection =>
|
||||
execute(connection, queryBuilder.toString(), params: _*)
|
||||
|
||||
withUpdateCount(connection, query, params: _*) { updateCount =>
|
||||
if (updateCount == 0) {
|
||||
throw new KyuubiException(
|
||||
s"Error updating metadata for ${metadata.identifier} with $query")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -397,6 +402,26 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
|
||||
}
|
||||
}
|
||||
|
||||
private def withUpdateCount[T](
|
||||
conn: Connection,
|
||||
sql: String,
|
||||
params: Any*)(f: Int => T): T = {
|
||||
debug(s"executing sql $sql with update count")
|
||||
var statement: PreparedStatement = null
|
||||
try {
|
||||
statement = conn.prepareStatement(sql)
|
||||
setStatementParams(statement, params: _*)
|
||||
f(statement.executeUpdate())
|
||||
} catch {
|
||||
case e: SQLException =>
|
||||
throw new KyuubiException(e.getMessage, e)
|
||||
} finally {
|
||||
if (statement != null) {
|
||||
Utils.tryLogNonFatalError(statement.close())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def setStatementParams(statement: PreparedStatement, params: Any*): Unit = {
|
||||
params.zipWithIndex.foreach { case (param, index) =>
|
||||
param match {
|
||||
|
||||
@ -22,7 +22,7 @@ import java.util.UUID
|
||||
import org.scalatest.concurrent.PatienceConfiguration.Timeout
|
||||
import org.scalatest.time.SpanSugar._
|
||||
|
||||
import org.apache.kyuubi.KyuubiFunSuite
|
||||
import org.apache.kyuubi.{KyuubiException, KyuubiFunSuite}
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
|
||||
import org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStoreConf._
|
||||
@ -266,4 +266,11 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
|
||||
assert(jdbcMetadataStore.getMetadata(batchId, true) == null)
|
||||
}
|
||||
}
|
||||
|
||||
test("throw exception if update count is 0") {
|
||||
val metadata = Metadata(identifier = UUID.randomUUID().toString, state = "RUNNING")
|
||||
intercept[KyuubiException] {
|
||||
jdbcMetadataStore.updateMetadata(metadata)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user