[KYUUBI #5407][AUTHZ] Tests for Iceberg system procedures of snapshot management
### _Why are the changes needed?_ To close #5407 . Follow up for https://github.com/apache/kyuubi/pull/5248 . Add some UT for snapshot management procedures. These procedures require alter permissions. 1. rollback_to_snapshot (https://iceberg.apache.org/docs/latest/spark-procedures/#rollback_to_snapshot): Usage: `CALL catalog_name.system.rollback_to_snapshot('db.sample', 1) ` Meaning: rollback a table to a specific snapshot ID. 2. rollback_to_timestamp (https://iceberg.apache.org/docs/latest/spark-procedures/#rollback_to_timestamp) Usage: `CALL catalog_name.system.rollback_to_timestamp('db.sample', TIMESTAMP '2021-06-30 00:00:00')` Meaning: rollback the table to the latest snapshot less than time. 3. set_current_snapshot (https://iceberg.apache.org/docs/latest/spark-procedures/#set_current_snapshot) Usage: `CALL catalog_name.system.set_current_snapshot('db.sample', 1)` Meaning: Set a table to a specific snapshot ID. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request ### _Was this patch authored or co-authored using generative AI tooling?_ No Closes #5394 from yabola/add_call. Closes #5407 98b7492e0 [Bowen Liang] split into 3 ut for snapshot management 23fe49ae4 [Bowen Liang] refactor ut 8ba97c6ef [chenliang.lu] Add UT for checking previleges in iceberg call snapshot management Lead-authored-by: chenliang.lu <marssss2929@gmail.com> Co-authored-by: Bowen Liang <liangbowen@gf.com.cn> Signed-off-by: Bowen Liang <liangbowen@gf.com.cn>
This commit is contained in:
parent
98b74d2ad0
commit
fd69c6ee1d
@ -16,11 +16,15 @@
|
||||
*/
|
||||
package org.apache.kyuubi.plugin.spark.authz.ranger
|
||||
|
||||
// scalastyle:off
|
||||
import java.sql.Timestamp
|
||||
import java.text.SimpleDateFormat
|
||||
|
||||
import scala.util.Try
|
||||
|
||||
import org.apache.spark.sql.Row
|
||||
import org.scalatest.Outcome
|
||||
|
||||
// scalastyle:off
|
||||
import org.apache.kyuubi.Utils
|
||||
import org.apache.kyuubi.plugin.spark.authz.AccessControlException
|
||||
import org.apache.kyuubi.plugin.spark.authz.RangerTestNamespace._
|
||||
@ -281,4 +285,64 @@ class IcebergCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
private def prepareExampleIcebergTable(table: String, initSnapshots: Int): Unit = {
|
||||
doAs(admin, sql(s"CREATE TABLE IF NOT EXISTS $table (id int, name string) USING iceberg"))
|
||||
(0 until initSnapshots).foreach(i =>
|
||||
doAs(admin, sql(s"INSERT INTO $table VALUES ($i, 'user_$i')")))
|
||||
}
|
||||
|
||||
private def getFirstSnapshot(table: String): Row = {
|
||||
val existedSnapshots =
|
||||
sql(s"SELECT * FROM $table.snapshots ORDER BY committed_at ASC LIMIT 1").collect()
|
||||
existedSnapshots(0)
|
||||
}
|
||||
|
||||
test("CALL rollback_to_snapshot") {
|
||||
val tableName = "table_rollback_to_snapshot"
|
||||
val table = s"$catalogV2.$namespace1.$tableName"
|
||||
withCleanTmpResources(Seq((table, "table"))) {
|
||||
prepareExampleIcebergTable(table, 2)
|
||||
val targetSnapshotId = getFirstSnapshot(table).getAs[Long]("snapshot_id")
|
||||
val callRollbackToSnapshot =
|
||||
s"CALL $catalogV2.system.rollback_to_snapshot (table => '$table', snapshot_id => $targetSnapshotId)"
|
||||
|
||||
interceptContains[AccessControlException](doAs(someone, sql(callRollbackToSnapshot)))(
|
||||
s"does not have [alter] privilege on [$namespace1/$tableName]")
|
||||
doAs(admin, sql(callRollbackToSnapshot))
|
||||
}
|
||||
}
|
||||
|
||||
test("CALL rollback_to_timestamp") {
|
||||
val tableName = "table_rollback_to_timestamp"
|
||||
val table = s"$catalogV2.$namespace1.$tableName"
|
||||
withCleanTmpResources(Seq((table, "table"))) {
|
||||
prepareExampleIcebergTable(table, 2)
|
||||
val callRollbackToTimestamp = {
|
||||
val targetSnapshotCommittedAt = getFirstSnapshot(table).getAs[Timestamp]("committed_at")
|
||||
val targetTimestamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
|
||||
.format(targetSnapshotCommittedAt.getTime + 1)
|
||||
s"CALL $catalogV2.system.rollback_to_timestamp (table => '$table', timestamp => TIMESTAMP '$targetTimestamp')"
|
||||
}
|
||||
|
||||
interceptContains[AccessControlException](doAs(someone, sql(callRollbackToTimestamp)))(
|
||||
s"does not have [alter] privilege on [$namespace1/$tableName]")
|
||||
doAs(admin, sql(callRollbackToTimestamp))
|
||||
}
|
||||
}
|
||||
|
||||
test("CALL set_current_snapshot") {
|
||||
val tableName = "table_set_current_snapshot"
|
||||
val table = s"$catalogV2.$namespace1.$tableName"
|
||||
withCleanTmpResources(Seq((table, "table"))) {
|
||||
prepareExampleIcebergTable(table, 2)
|
||||
val targetSnapshotId = getFirstSnapshot(table).getAs[Long]("snapshot_id")
|
||||
val callSetCurrentSnapshot =
|
||||
s"CALL $catalogV2.system.set_current_snapshot (table => '$table', snapshot_id => $targetSnapshotId)"
|
||||
|
||||
interceptContains[AccessControlException](doAs(someone, sql(callSetCurrentSnapshot)))(
|
||||
s"does not have [alter] privilege on [$namespace1/$tableName]")
|
||||
doAs(admin, sql(callSetCurrentSnapshot))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user