[KYUUBI #1731][FEATURE][FOLLOWUP] Use validated HadoopFs URI in HadoopFsDelegationTokenProvider#obtainDelegationTokens

### _Why are the changes needed?_

If core-site.xml contains a FileSystem uri, whose FileSystem implementation class is not present, following error will appear repeatedly in Kyuubi log after any user connects to Kyuubi and executed any SQL:

```
22/01/12 19:45:08 WARN credentials.HadoopFsDelegationTokenProvider: Failed to get Hadoop FileSystem instance by URI: alluxio://localhost
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class alluxio.hadoop.FileSystem not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2667)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider$.$anonfun$hadoopFSsToAccess$2(HadoopFsDelegationTokenProvider.scala:100)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider$.$anonfun$hadoopFSsToAccess$1(HadoopFsDelegationTokenProvider.scala:100)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
	at org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider$.hadoopFSsToAccess(HadoopFsDelegationTokenProvider.scala:99)
	at org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider.$anonfun$obtainDelegationTokens$1(HadoopFsDelegationTokenProvider.scala:62)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider$$anon$1.run(HadoopFsDelegationTokenProvider.scala:125)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
	at org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider$.doAsProxyUser(HadoopFsDelegationTokenProvider.scala:124)
	at org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider.obtainDelegationTokens(HadoopFsDelegationTokenProvider.scala:60)
	at org.apache.kyuubi.credentials.HadoopCredentialsManager$$anon$1.$anonfun$run$1(HadoopCredentialsManager.scala:210)
	at org.apache.kyuubi.credentials.HadoopCredentialsManager$$anon$1.$anonfun$run$1$adapted(HadoopCredentialsManager.scala:210)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:214)
	at org.apache.kyuubi.credentials.HadoopCredentialsManager$$anon$1.run(HadoopCredentialsManager.scala:210)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: Class alluxio.hadoop.FileSystem not found
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2571)
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2665)
	... 40 more

```
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1739 from zhouyifan279/1731.

Closes #1731

387036d8 [zhouyifan279] [KYUUBI #1731][FEATURE] Kyuubi server should not fail to start when Hadoop FileSystem class not found
6d80b8e9 [zhouyifan279] [KYUUBI #1731][FEATURE] Kyuubi server should not fail to start when Hadoop FileSystem class not found

Authored-by: zhouyifan279 <zhouyifan279@gmail.com>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
This commit is contained in:
zhouyifan279 2022-01-13 20:23:02 +08:00 committed by ulysses-you
parent a533b2fa37
commit c17dd9d490
No known key found for this signature in database
GPG Key ID: 4C500BC62D576766
2 changed files with 31 additions and 46 deletions

View File

@ -18,20 +18,20 @@
package org.apache.kyuubi.credentials
import java.lang.reflect.UndeclaredThrowableException
import java.net.URI
import java.security.PrivilegedExceptionAction
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.hdfs.HdfsConfiguration
import org.apache.hadoop.security.{Credentials, SecurityUtil, UserGroupInformation}
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider.{disableFsCache, doAsProxyUser}
import org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider.{doAsProxyUser, validatedFsUris}
import org.apache.kyuubi.util.KyuubiHadoopUtils
class HadoopFsDelegationTokenProvider extends HadoopDelegationTokenProvider with Logging {
@ -39,6 +39,7 @@ class HadoopFsDelegationTokenProvider extends HadoopDelegationTokenProvider with
private var tokenRequired: Boolean = _
private var hadoopConf: Configuration = _
private var kyuubiConf: KyuubiConf = _
private var fsUris: Seq[URI] = _
override val serviceName: String = "hadoopfs"
@ -46,20 +47,23 @@ class HadoopFsDelegationTokenProvider extends HadoopDelegationTokenProvider with
this.tokenRequired =
SecurityUtil.getAuthenticationMethod(hadoopConf) != AuthenticationMethod.SIMPLE
// FileSystem objects are cached in FileSystem.CACHE by a composite key.
// The UserGroupInformation object used to create it is part of that key.
// If cache is enabled, new FileSystem objects are created and cached at every method
// invocation.
this.hadoopConf = disableFsCache(kyuubiConf, new HdfsConfiguration(hadoopConf))
this.kyuubiConf = kyuubiConf
this.fsUris = validatedFsUris(kyuubiConf, hadoopConf)
this.hadoopConf = KyuubiHadoopUtils.newHadoopConf(kyuubiConf, loadDefaults = false)
// Using HdfsConfiguration to ensure hdfs-site.xml is loaded
new HdfsConfiguration(hadoopConf).iterator().asScala.foreach(e =>
this.hadoopConf.set(e.getKey, e.getValue))
// Disable FileSystem cache as its size grows at each invocation of #obtainDelegationTokens
this.fsUris.foreach(uri =>
this.hadoopConf.setBoolean(s"fs.${uri.getScheme}.impl.disable.cache", true))
}
override def delegationTokensRequired(): Boolean = tokenRequired
override def obtainDelegationTokens(owner: String, creds: Credentials): Unit = {
doAsProxyUser(owner) {
val fileSystems =
HadoopFsDelegationTokenProvider.hadoopFSsToAccess(kyuubiConf, hadoopConf)
val fileSystems = fsUris.map(FileSystem.get(_, hadoopConf)).toSet
try {
// Renewer is not needed. But setting a renewer can avoid potential NPE.
@ -80,39 +84,19 @@ class HadoopFsDelegationTokenProvider extends HadoopDelegationTokenProvider with
object HadoopFsDelegationTokenProvider extends Logging {
def disableFsCache(kyuubiConf: KyuubiConf, hadoopConf: Configuration): Configuration = {
// Avoid unnecessary disk io by not loading default resources
val newConf = KyuubiHadoopUtils.newHadoopConf(
kyuubiConf,
loadDefaults = false)
hadoopConf.iterator().asScala.foreach(e => newConf.set(e.getKey, e.getValue))
hadoopFSsToAccess(kyuubiConf, hadoopConf)
.foreach(fs => newConf.setBoolean(s"fs.${fs.getScheme}.impl.disable.cache", true))
newConf
}
def hadoopFSsToAccess(kyuubiConf: KyuubiConf, hadoopConf: Configuration): Set[FileSystem] = {
val filesystemsToAccess = kyuubiConf
.get(KyuubiConf.CREDENTIALS_HADOOP_FS_URIS)
.flatMap { uri =>
Try(new Path(uri).getFileSystem(hadoopConf)) match {
case Success(value) =>
Some(value)
case Failure(e) =>
warn(s"Failed to get Hadoop FileSystem instance by URI: $uri", e)
None
}
def validatedFsUris(kyuubiConf: KyuubiConf, hadoopConf: Configuration): Seq[URI] = {
val uris = kyuubiConf.get(KyuubiConf.CREDENTIALS_HADOOP_FS_URIS) :+
hadoopConf.get("fs.defaultFS", "file:///")
uris.flatMap { str =>
try {
val uri = URI.create(str)
FileSystem.get(uri, hadoopConf)
Some(uri)
} catch {
case e: Throwable =>
warn(s"Failed to get Hadoop FileSystem instance by URI: $str", e)
None
}
.toSet
Try(FileSystem.get(hadoopConf)) match {
case Success(value) =>
filesystemsToAccess + value
case Failure(e) =>
warn(s"Failed to get default Hadoop FileSystem instance", e)
filesystemsToAccess
}
}

View File

@ -19,7 +19,6 @@ package org.apache.kyuubi.credentials
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
@ -73,9 +72,11 @@ class HadoopFsDelegationTokenProviderSuite extends WithSecuredDFSService {
KyuubiConf.CREDENTIALS_HADOOP_FS_URIS,
Seq("unknown://kyuubi", hdfsUri))
val fileSystems = HadoopFsDelegationTokenProvider.hadoopFSsToAccess(kyuubiConf, hdfsConf)
assert(fileSystems.size == 1)
assert(fileSystems.head.isInstanceOf[DistributedFileSystem])
val uris = HadoopFsDelegationTokenProvider.validatedFsUris(kyuubiConf, hdfsConf)
assert(uris.size == 1)
assert(uris.head.toString == hdfsUri)
new HadoopFsDelegationTokenProvider().initialize(hdfsConf, kyuubiConf)
}
}
}