diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProvider.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProvider.scala index 965e03b58..da9f4cb95 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProvider.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProvider.scala @@ -18,20 +18,20 @@ package org.apache.kyuubi.credentials import java.lang.reflect.UndeclaredThrowableException +import java.net.URI import java.security.PrivilegedExceptionAction import scala.collection.JavaConverters._ -import scala.util.{Failure, Success, Try} import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.hdfs.HdfsConfiguration import org.apache.hadoop.security.{Credentials, SecurityUtil, UserGroupInformation} import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod import org.apache.kyuubi.Logging import org.apache.kyuubi.config.KyuubiConf -import org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider.{disableFsCache, doAsProxyUser} +import org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider.{doAsProxyUser, validatedFsUris} import org.apache.kyuubi.util.KyuubiHadoopUtils class HadoopFsDelegationTokenProvider extends HadoopDelegationTokenProvider with Logging { @@ -39,6 +39,7 @@ class HadoopFsDelegationTokenProvider extends HadoopDelegationTokenProvider with private var tokenRequired: Boolean = _ private var hadoopConf: Configuration = _ private var kyuubiConf: KyuubiConf = _ + private var fsUris: Seq[URI] = _ override val serviceName: String = "hadoopfs" @@ -46,20 +47,23 @@ class HadoopFsDelegationTokenProvider extends HadoopDelegationTokenProvider with this.tokenRequired = SecurityUtil.getAuthenticationMethod(hadoopConf) != AuthenticationMethod.SIMPLE - // FileSystem objects are cached in FileSystem.CACHE by a composite key. - // The UserGroupInformation object used to create it is part of that key. - // If cache is enabled, new FileSystem objects are created and cached at every method - // invocation. - this.hadoopConf = disableFsCache(kyuubiConf, new HdfsConfiguration(hadoopConf)) this.kyuubiConf = kyuubiConf + this.fsUris = validatedFsUris(kyuubiConf, hadoopConf) + this.hadoopConf = KyuubiHadoopUtils.newHadoopConf(kyuubiConf, loadDefaults = false) + + // Using HdfsConfiguration to ensure hdfs-site.xml is loaded + new HdfsConfiguration(hadoopConf).iterator().asScala.foreach(e => + this.hadoopConf.set(e.getKey, e.getValue)) + // Disable FileSystem cache as its size grows at each invocation of #obtainDelegationTokens + this.fsUris.foreach(uri => + this.hadoopConf.setBoolean(s"fs.${uri.getScheme}.impl.disable.cache", true)) } override def delegationTokensRequired(): Boolean = tokenRequired override def obtainDelegationTokens(owner: String, creds: Credentials): Unit = { doAsProxyUser(owner) { - val fileSystems = - HadoopFsDelegationTokenProvider.hadoopFSsToAccess(kyuubiConf, hadoopConf) + val fileSystems = fsUris.map(FileSystem.get(_, hadoopConf)).toSet try { // Renewer is not needed. But setting a renewer can avoid potential NPE. @@ -80,39 +84,19 @@ class HadoopFsDelegationTokenProvider extends HadoopDelegationTokenProvider with object HadoopFsDelegationTokenProvider extends Logging { - def disableFsCache(kyuubiConf: KyuubiConf, hadoopConf: Configuration): Configuration = { - // Avoid unnecessary disk io by not loading default resources - val newConf = KyuubiHadoopUtils.newHadoopConf( - kyuubiConf, - loadDefaults = false) - - hadoopConf.iterator().asScala.foreach(e => newConf.set(e.getKey, e.getValue)) - - hadoopFSsToAccess(kyuubiConf, hadoopConf) - .foreach(fs => newConf.setBoolean(s"fs.${fs.getScheme}.impl.disable.cache", true)) - newConf - } - - def hadoopFSsToAccess(kyuubiConf: KyuubiConf, hadoopConf: Configuration): Set[FileSystem] = { - val filesystemsToAccess = kyuubiConf - .get(KyuubiConf.CREDENTIALS_HADOOP_FS_URIS) - .flatMap { uri => - Try(new Path(uri).getFileSystem(hadoopConf)) match { - case Success(value) => - Some(value) - case Failure(e) => - warn(s"Failed to get Hadoop FileSystem instance by URI: $uri", e) - None - } + def validatedFsUris(kyuubiConf: KyuubiConf, hadoopConf: Configuration): Seq[URI] = { + val uris = kyuubiConf.get(KyuubiConf.CREDENTIALS_HADOOP_FS_URIS) :+ + hadoopConf.get("fs.defaultFS", "file:///") + uris.flatMap { str => + try { + val uri = URI.create(str) + FileSystem.get(uri, hadoopConf) + Some(uri) + } catch { + case e: Throwable => + warn(s"Failed to get Hadoop FileSystem instance by URI: $str", e) + None } - .toSet - - Try(FileSystem.get(hadoopConf)) match { - case Success(value) => - filesystemsToAccess + value - case Failure(e) => - warn(s"Failed to get default Hadoop FileSystem instance", e) - filesystemsToAccess } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProviderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProviderSuite.scala index 8e8bbd511..ef4588f09 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProviderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProviderSuite.scala @@ -19,7 +19,6 @@ package org.apache.kyuubi.credentials import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.hdfs.DistributedFileSystem import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier import org.apache.hadoop.io.Text import org.apache.hadoop.security.{Credentials, UserGroupInformation} @@ -73,9 +72,11 @@ class HadoopFsDelegationTokenProviderSuite extends WithSecuredDFSService { KyuubiConf.CREDENTIALS_HADOOP_FS_URIS, Seq("unknown://kyuubi", hdfsUri)) - val fileSystems = HadoopFsDelegationTokenProvider.hadoopFSsToAccess(kyuubiConf, hdfsConf) - assert(fileSystems.size == 1) - assert(fileSystems.head.isInstanceOf[DistributedFileSystem]) + val uris = HadoopFsDelegationTokenProvider.validatedFsUris(kyuubiConf, hdfsConf) + assert(uris.size == 1) + assert(uris.head.toString == hdfsUri) + + new HadoopFsDelegationTokenProvider().initialize(hdfsConf, kyuubiConf) } } }