From c17dd9d490d8ccca5a92fa343fc83a55750da5a4 Mon Sep 17 00:00:00 2001 From: zhouyifan279 Date: Thu, 13 Jan 2022 20:23:02 +0800 Subject: [PATCH] [KYUUBI #1731][FEATURE][FOLLOWUP] Use validated HadoopFs URI in HadoopFsDelegationTokenProvider#obtainDelegationTokens ### _Why are the changes needed?_ If core-site.xml contains a FileSystem uri, whose FileSystem implementation class is not present, following error will appear repeatedly in Kyuubi log after any user connects to Kyuubi and executed any SQL: ``` 22/01/12 19:45:08 WARN credentials.HadoopFsDelegationTokenProvider: Failed to get Hadoop FileSystem instance by URI: alluxio://localhost java.lang.RuntimeException: java.lang.ClassNotFoundException: Class alluxio.hadoop.FileSystem not found at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2667) at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466) at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) at org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider$.$anonfun$hadoopFSsToAccess$2(HadoopFsDelegationTokenProvider.scala:100) at scala.util.Try$.apply(Try.scala:213) at org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider$.$anonfun$hadoopFSsToAccess$1(HadoopFsDelegationTokenProvider.scala:100) at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293) at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108) at org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider$.hadoopFSsToAccess(HadoopFsDelegationTokenProvider.scala:99) at org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider.$anonfun$obtainDelegationTokens$1(HadoopFsDelegationTokenProvider.scala:62) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider$$anon$1.run(HadoopFsDelegationTokenProvider.scala:125) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878) at org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider$.doAsProxyUser(HadoopFsDelegationTokenProvider.scala:124) at org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider.obtainDelegationTokens(HadoopFsDelegationTokenProvider.scala:60) at org.apache.kyuubi.credentials.HadoopCredentialsManager$$anon$1.$anonfun$run$1(HadoopCredentialsManager.scala:210) at org.apache.kyuubi.credentials.HadoopCredentialsManager$$anon$1.$anonfun$run$1$adapted(HadoopCredentialsManager.scala:210) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:214) at org.apache.kyuubi.credentials.HadoopCredentialsManager$$anon$1.run(HadoopCredentialsManager.scala:210) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.ClassNotFoundException: Class alluxio.hadoop.FileSystem not found at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2571) at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2665) ... 40 more ``` ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1739 from zhouyifan279/1731. Closes #1731 387036d8 [zhouyifan279] [KYUUBI #1731][FEATURE] Kyuubi server should not fail to start when Hadoop FileSystem class not found 6d80b8e9 [zhouyifan279] [KYUUBI #1731][FEATURE] Kyuubi server should not fail to start when Hadoop FileSystem class not found Authored-by: zhouyifan279 Signed-off-by: ulysses-you --- .../HadoopFsDelegationTokenProvider.scala | 68 +++++++------------ ...HadoopFsDelegationTokenProviderSuite.scala | 9 +-- 2 files changed, 31 insertions(+), 46 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProvider.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProvider.scala index 965e03b58..da9f4cb95 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProvider.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProvider.scala @@ -18,20 +18,20 @@ package org.apache.kyuubi.credentials import java.lang.reflect.UndeclaredThrowableException +import java.net.URI import java.security.PrivilegedExceptionAction import scala.collection.JavaConverters._ -import scala.util.{Failure, Success, Try} import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.hdfs.HdfsConfiguration import org.apache.hadoop.security.{Credentials, SecurityUtil, UserGroupInformation} import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod import org.apache.kyuubi.Logging import org.apache.kyuubi.config.KyuubiConf -import org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider.{disableFsCache, doAsProxyUser} +import org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider.{doAsProxyUser, validatedFsUris} import org.apache.kyuubi.util.KyuubiHadoopUtils class HadoopFsDelegationTokenProvider extends HadoopDelegationTokenProvider with Logging { @@ -39,6 +39,7 @@ class HadoopFsDelegationTokenProvider extends HadoopDelegationTokenProvider with private var tokenRequired: Boolean = _ private var hadoopConf: Configuration = _ private var kyuubiConf: KyuubiConf = _ + private var fsUris: Seq[URI] = _ override val serviceName: String = "hadoopfs" @@ -46,20 +47,23 @@ class HadoopFsDelegationTokenProvider extends HadoopDelegationTokenProvider with this.tokenRequired = SecurityUtil.getAuthenticationMethod(hadoopConf) != AuthenticationMethod.SIMPLE - // FileSystem objects are cached in FileSystem.CACHE by a composite key. - // The UserGroupInformation object used to create it is part of that key. - // If cache is enabled, new FileSystem objects are created and cached at every method - // invocation. - this.hadoopConf = disableFsCache(kyuubiConf, new HdfsConfiguration(hadoopConf)) this.kyuubiConf = kyuubiConf + this.fsUris = validatedFsUris(kyuubiConf, hadoopConf) + this.hadoopConf = KyuubiHadoopUtils.newHadoopConf(kyuubiConf, loadDefaults = false) + + // Using HdfsConfiguration to ensure hdfs-site.xml is loaded + new HdfsConfiguration(hadoopConf).iterator().asScala.foreach(e => + this.hadoopConf.set(e.getKey, e.getValue)) + // Disable FileSystem cache as its size grows at each invocation of #obtainDelegationTokens + this.fsUris.foreach(uri => + this.hadoopConf.setBoolean(s"fs.${uri.getScheme}.impl.disable.cache", true)) } override def delegationTokensRequired(): Boolean = tokenRequired override def obtainDelegationTokens(owner: String, creds: Credentials): Unit = { doAsProxyUser(owner) { - val fileSystems = - HadoopFsDelegationTokenProvider.hadoopFSsToAccess(kyuubiConf, hadoopConf) + val fileSystems = fsUris.map(FileSystem.get(_, hadoopConf)).toSet try { // Renewer is not needed. But setting a renewer can avoid potential NPE. @@ -80,39 +84,19 @@ class HadoopFsDelegationTokenProvider extends HadoopDelegationTokenProvider with object HadoopFsDelegationTokenProvider extends Logging { - def disableFsCache(kyuubiConf: KyuubiConf, hadoopConf: Configuration): Configuration = { - // Avoid unnecessary disk io by not loading default resources - val newConf = KyuubiHadoopUtils.newHadoopConf( - kyuubiConf, - loadDefaults = false) - - hadoopConf.iterator().asScala.foreach(e => newConf.set(e.getKey, e.getValue)) - - hadoopFSsToAccess(kyuubiConf, hadoopConf) - .foreach(fs => newConf.setBoolean(s"fs.${fs.getScheme}.impl.disable.cache", true)) - newConf - } - - def hadoopFSsToAccess(kyuubiConf: KyuubiConf, hadoopConf: Configuration): Set[FileSystem] = { - val filesystemsToAccess = kyuubiConf - .get(KyuubiConf.CREDENTIALS_HADOOP_FS_URIS) - .flatMap { uri => - Try(new Path(uri).getFileSystem(hadoopConf)) match { - case Success(value) => - Some(value) - case Failure(e) => - warn(s"Failed to get Hadoop FileSystem instance by URI: $uri", e) - None - } + def validatedFsUris(kyuubiConf: KyuubiConf, hadoopConf: Configuration): Seq[URI] = { + val uris = kyuubiConf.get(KyuubiConf.CREDENTIALS_HADOOP_FS_URIS) :+ + hadoopConf.get("fs.defaultFS", "file:///") + uris.flatMap { str => + try { + val uri = URI.create(str) + FileSystem.get(uri, hadoopConf) + Some(uri) + } catch { + case e: Throwable => + warn(s"Failed to get Hadoop FileSystem instance by URI: $str", e) + None } - .toSet - - Try(FileSystem.get(hadoopConf)) match { - case Success(value) => - filesystemsToAccess + value - case Failure(e) => - warn(s"Failed to get default Hadoop FileSystem instance", e) - filesystemsToAccess } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProviderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProviderSuite.scala index 8e8bbd511..ef4588f09 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProviderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProviderSuite.scala @@ -19,7 +19,6 @@ package org.apache.kyuubi.credentials import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.hdfs.DistributedFileSystem import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier import org.apache.hadoop.io.Text import org.apache.hadoop.security.{Credentials, UserGroupInformation} @@ -73,9 +72,11 @@ class HadoopFsDelegationTokenProviderSuite extends WithSecuredDFSService { KyuubiConf.CREDENTIALS_HADOOP_FS_URIS, Seq("unknown://kyuubi", hdfsUri)) - val fileSystems = HadoopFsDelegationTokenProvider.hadoopFSsToAccess(kyuubiConf, hdfsConf) - assert(fileSystems.size == 1) - assert(fileSystems.head.isInstanceOf[DistributedFileSystem]) + val uris = HadoopFsDelegationTokenProvider.validatedFsUris(kyuubiConf, hdfsConf) + assert(uris.size == 1) + assert(uris.head.toString == hdfsUri) + + new HadoopFsDelegationTokenProvider().initialize(hdfsConf, kyuubiConf) } } }