[CELEBORN-1609] Support SSL for celeborn RESTful service
### What changes were proposed in this pull request? Support SSL for celeborn RESTful service. ### Why are the changes needed? For HTTP SSL connection requirements. ### Does this PR introduce _any_ user-facing change? No, SSL is disabled by defaults. ### How was this patch tested? Integration testing. ``` celeborn.master.http.ssl.enabled=true celeborn.master.http.ssl.keystore.path=/hadoop/keystore.jks celeborn.master.http.ssl.keystore.password=xxxxxxx ``` <img width="1143" alt="image" src="https://github.com/user-attachments/assets/2334561d-1de3-4b38-bc80-5d5d86d3b8ff"> <img width="695" alt="image" src="https://github.com/user-attachments/assets/e3877468-cc3b-4a4a-bf75-2994f557a104"> Closes #2756 from turboFei/HADP_1609_ssl2. Authored-by: Wang, Fei <fwang12@ebay.com> Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
This commit is contained in:
parent
d8809793f3
commit
b2aa359d91
@ -2376,6 +2376,64 @@ object CelebornConf extends Logging {
|
||||
.toSequence
|
||||
.createWithDefault(Seq.empty)
|
||||
|
||||
val MASTER_HTTP_SSL_ENABLED: ConfigEntry[Boolean] =
|
||||
buildConf("celeborn.master.http.ssl.enabled")
|
||||
.categories("master")
|
||||
.version("0.6.0")
|
||||
.doc("Set this to true for using SSL encryption in http server.")
|
||||
.booleanConf
|
||||
.createWithDefault(false)
|
||||
|
||||
val MASTER_HTTP_SSL_KEYSTORE_PATH: OptionalConfigEntry[String] =
|
||||
buildConf("celeborn.master.http.ssl.keystore.path")
|
||||
.categories("master")
|
||||
.version("0.6.0")
|
||||
.doc("SSL certificate keystore location.")
|
||||
.stringConf
|
||||
.createOptional
|
||||
|
||||
val MASTER_HTTP_SSL_KEYSTORE_PASSWORD: OptionalConfigEntry[String] =
|
||||
buildConf("celeborn.master.http.ssl.keystore.password")
|
||||
.categories("master")
|
||||
.version("0.6.0")
|
||||
.doc("SSL certificate keystore password.")
|
||||
.stringConf
|
||||
.createOptional
|
||||
|
||||
val MASTER_HTTP_SSL_KEYSTORE_TYPE: OptionalConfigEntry[String] =
|
||||
buildConf("celeborn.master.http.ssl.keystore.type")
|
||||
.categories("master")
|
||||
.version("0.6.0")
|
||||
.doc("SSL certificate keystore type.")
|
||||
.stringConf
|
||||
.createOptional
|
||||
|
||||
val MASTER_HTTP_SSL_KEYSTORE_ALGORITHM: OptionalConfigEntry[String] =
|
||||
buildConf("celeborn.master.http.ssl.keystore.algorithm")
|
||||
.categories("master")
|
||||
.version("0.6.0")
|
||||
.doc("SSL certificate keystore algorithm.")
|
||||
.stringConf
|
||||
.createOptional
|
||||
|
||||
val MASTER_HTTP_SSL_DISALLOWED_PROTOCOLS: ConfigEntry[Seq[String]] =
|
||||
buildConf("celeborn.master.http.ssl.disallowed.protocols")
|
||||
.categories("master")
|
||||
.version("0.6.0")
|
||||
.doc("SSL versions to disallow.")
|
||||
.stringConf
|
||||
.toSequence
|
||||
.createWithDefault(Seq("SSLv2", "SSLv3"))
|
||||
|
||||
val MASTER_HTTP_SSL_INCLUDE_CIPHER_SUITES: ConfigEntry[Seq[String]] =
|
||||
buildConf("celeborn.master.http.ssl.include.ciphersuites")
|
||||
.categories("master")
|
||||
.version("0.6.0")
|
||||
.doc("A comma-separated list of include SSL cipher suite names.")
|
||||
.stringConf
|
||||
.toSequence
|
||||
.createWithDefault(Nil)
|
||||
|
||||
val HA_ENABLED: ConfigEntry[Boolean] =
|
||||
buildConf("celeborn.master.ha.enabled")
|
||||
.withAlternative("celeborn.ha.enabled")
|
||||
@ -3123,6 +3181,64 @@ object CelebornConf extends Logging {
|
||||
.toSequence
|
||||
.createWithDefault(Seq.empty)
|
||||
|
||||
val WORKER_HTTP_SSL_ENABLED: ConfigEntry[Boolean] =
|
||||
buildConf("celeborn.worker.http.ssl.enabled")
|
||||
.categories("worker")
|
||||
.version("0.6.0")
|
||||
.doc("Set this to true for using SSL encryption in http server.")
|
||||
.booleanConf
|
||||
.createWithDefault(false)
|
||||
|
||||
val WORKER_HTTP_SSL_KEYSTORE_PATH: OptionalConfigEntry[String] =
|
||||
buildConf("celeborn.worker.http.ssl.keystore.path")
|
||||
.categories("worker")
|
||||
.version("0.6.0")
|
||||
.doc("SSL certificate keystore location.")
|
||||
.stringConf
|
||||
.createOptional
|
||||
|
||||
val WORKER_HTTP_SSL_KEYSTORE_PASSWORD: OptionalConfigEntry[String] =
|
||||
buildConf("celeborn.worker.http.ssl.keystore.password")
|
||||
.categories("worker")
|
||||
.version("0.6.0")
|
||||
.doc("SSL certificate keystore password.")
|
||||
.stringConf
|
||||
.createOptional
|
||||
|
||||
val WORKER_HTTP_SSL_KEYSTORE_TYPE: OptionalConfigEntry[String] =
|
||||
buildConf("celeborn.worker.http.ssl.keystore.type")
|
||||
.categories("worker")
|
||||
.version("0.6.0")
|
||||
.doc("SSL certificate keystore type.")
|
||||
.stringConf
|
||||
.createOptional
|
||||
|
||||
val WORKER_HTTP_SSL_KEYSTORE_ALGORITHM: OptionalConfigEntry[String] =
|
||||
buildConf("celeborn.worker.http.ssl.keystore.algorithm")
|
||||
.categories("worker")
|
||||
.version("0.6.0")
|
||||
.doc("SSL certificate keystore algorithm.")
|
||||
.stringConf
|
||||
.createOptional
|
||||
|
||||
val WORKER_HTTP_SSL_DISALLOWED_PROTOCOLS: ConfigEntry[Seq[String]] =
|
||||
buildConf("celeborn.worker.http.ssl.disallowed.protocols")
|
||||
.categories("worker")
|
||||
.version("0.6.0")
|
||||
.doc("SSL versions to disallow.")
|
||||
.stringConf
|
||||
.toSequence
|
||||
.createWithDefault(Seq("SSLv2", "SSLv3"))
|
||||
|
||||
val WORKER_HTTP_SSL_INCLUDE_CIPHER_SUITES: ConfigEntry[Seq[String]] =
|
||||
buildConf("celeborn.worker.http.ssl.include.ciphersuites")
|
||||
.categories("worker")
|
||||
.version("0.6.0")
|
||||
.doc("A comma-separated list of include SSL cipher suite names.")
|
||||
.stringConf
|
||||
.toSequence
|
||||
.createWithDefault(Nil)
|
||||
|
||||
val WORKER_RPC_PORT: ConfigEntry[Int] =
|
||||
buildConf("celeborn.worker.rpc.port")
|
||||
.categories("worker")
|
||||
|
||||
@ -57,6 +57,13 @@ license: |
|
||||
| celeborn.master.http.proxy.client.ip.header | X-Real-IP | false | The HTTP header to record the real client IP address. If your server is behind a load balancer or other proxy, the server will see this load balancer or proxy IP address as the client IP address, to get around this common issue, most load balancers or proxies offer the ability to record the real remote IP address in an HTTP header that will be added to the request for other devices to use. Note that, because the header value can be specified to any IP address, so it will not be used for authentication. | 0.6.0 | |
|
||||
| celeborn.master.http.spnego.keytab | <undefined> | false | The keytab file for SPNego authentication. | 0.6.0 | |
|
||||
| celeborn.master.http.spnego.principal | <undefined> | false | SPNego service principal, typical value would look like HTTP/_HOST@EXAMPLE.COM. SPNego service principal would be used when celeborn http authentication is enabled. This needs to be set only if SPNEGO is to be used in authentication. | 0.6.0 | |
|
||||
| celeborn.master.http.ssl.disallowed.protocols | SSLv2,SSLv3 | false | SSL versions to disallow. | 0.6.0 | |
|
||||
| celeborn.master.http.ssl.enabled | false | false | Set this to true for using SSL encryption in http server. | 0.6.0 | |
|
||||
| celeborn.master.http.ssl.include.ciphersuites | | false | A comma-separated list of include SSL cipher suite names. | 0.6.0 | |
|
||||
| celeborn.master.http.ssl.keystore.algorithm | <undefined> | false | SSL certificate keystore algorithm. | 0.6.0 | |
|
||||
| celeborn.master.http.ssl.keystore.password | <undefined> | false | SSL certificate keystore password. | 0.6.0 | |
|
||||
| celeborn.master.http.ssl.keystore.path | <undefined> | false | SSL certificate keystore location. | 0.6.0 | |
|
||||
| celeborn.master.http.ssl.keystore.type | <undefined> | false | SSL certificate keystore type. | 0.6.0 | |
|
||||
| celeborn.master.http.stopTimeout | 5s | false | Master http server stop timeout. | 0.5.0 | |
|
||||
| celeborn.master.internal.port | 8097 | false | Internal port on the master where both workers and other master nodes connect. | 0.5.0 | |
|
||||
| celeborn.master.persist.workerNetworkLocation | false | false | | 0.6.0 | |
|
||||
|
||||
@ -104,6 +104,13 @@ license: |
|
||||
| celeborn.worker.http.proxy.client.ip.header | X-Real-IP | false | The HTTP header to record the real client IP address. If your server is behind a load balancer or other proxy, the server will see this load balancer or proxy IP address as the client IP address, to get around this common issue, most load balancers or proxies offer the ability to record the real remote IP address in an HTTP header that will be added to the request for other devices to use. Note that, because the header value can be specified to any IP address, so it will not be used for authentication. | 0.6.0 | |
|
||||
| celeborn.worker.http.spnego.keytab | <undefined> | false | The keytab file for SPNego authentication. | 0.6.0 | |
|
||||
| celeborn.worker.http.spnego.principal | <undefined> | false | SPNego service principal, typical value would look like HTTP/_HOST@EXAMPLE.COM. SPNego service principal would be used when celeborn http authentication is enabled. This needs to be set only if SPNEGO is to be used in authentication. | 0.6.0 | |
|
||||
| celeborn.worker.http.ssl.disallowed.protocols | SSLv2,SSLv3 | false | SSL versions to disallow. | 0.6.0 | |
|
||||
| celeborn.worker.http.ssl.enabled | false | false | Set this to true for using SSL encryption in http server. | 0.6.0 | |
|
||||
| celeborn.worker.http.ssl.include.ciphersuites | | false | A comma-separated list of include SSL cipher suite names. | 0.6.0 | |
|
||||
| celeborn.worker.http.ssl.keystore.algorithm | <undefined> | false | SSL certificate keystore algorithm. | 0.6.0 | |
|
||||
| celeborn.worker.http.ssl.keystore.password | <undefined> | false | SSL certificate keystore password. | 0.6.0 | |
|
||||
| celeborn.worker.http.ssl.keystore.path | <undefined> | false | SSL certificate keystore location. | 0.6.0 | |
|
||||
| celeborn.worker.http.ssl.keystore.type | <undefined> | false | SSL certificate keystore type. | 0.6.0 | |
|
||||
| celeborn.worker.http.stopTimeout | 5s | false | Worker http server stop timeout. | 0.5.0 | |
|
||||
| celeborn.worker.internal.port | 0 | false | Internal server port on the Worker where the master nodes connect. | 0.5.0 | |
|
||||
| celeborn.worker.jvmProfiler.enabled | false | false | Turn on code profiling via async_profiler in workers. | 0.5.0 | |
|
||||
|
||||
@ -201,7 +201,14 @@ abstract class HttpService extends Service with Logging {
|
||||
httpPort(),
|
||||
httpMaxWorkerThreads(),
|
||||
httpStopTimeout(),
|
||||
httpIdleTimeout())
|
||||
httpIdleTimeout(),
|
||||
httpSslEnabled(),
|
||||
httpSslKeyStorePath(),
|
||||
httpSslKeyStorePassword(),
|
||||
httpSslKeyStoreType(),
|
||||
httpSslKeyStoreAlgorithm(),
|
||||
httpSslDisallowedProtocols(),
|
||||
httpSslIncludedCipherSuites())
|
||||
httpServer.start()
|
||||
startInternal()
|
||||
// block until the HTTP server is started, otherwise, we may get
|
||||
@ -261,6 +268,69 @@ abstract class HttpService extends Service with Logging {
|
||||
}
|
||||
}
|
||||
|
||||
private[celeborn] def httpSslEnabled(): Boolean = {
|
||||
serviceName match {
|
||||
case Service.MASTER =>
|
||||
conf.get(CelebornConf.MASTER_HTTP_SSL_ENABLED)
|
||||
case Service.WORKER =>
|
||||
conf.get(CelebornConf.WORKER_HTTP_SSL_ENABLED)
|
||||
}
|
||||
}
|
||||
|
||||
private def httpSslKeyStorePath(): Option[String] = {
|
||||
serviceName match {
|
||||
case Service.MASTER =>
|
||||
conf.get(CelebornConf.MASTER_HTTP_SSL_KEYSTORE_PATH)
|
||||
case Service.WORKER =>
|
||||
conf.get(CelebornConf.WORKER_HTTP_SSL_KEYSTORE_PATH)
|
||||
}
|
||||
}
|
||||
|
||||
private def httpSslKeyStorePassword(): Option[String] = {
|
||||
serviceName match {
|
||||
case Service.MASTER =>
|
||||
conf.get(CelebornConf.MASTER_HTTP_SSL_KEYSTORE_PASSWORD)
|
||||
case Service.WORKER =>
|
||||
conf.get(CelebornConf.WORKER_HTTP_SSL_KEYSTORE_PASSWORD)
|
||||
}
|
||||
}
|
||||
|
||||
private def httpSslKeyStoreType(): Option[String] = {
|
||||
serviceName match {
|
||||
case Service.MASTER =>
|
||||
conf.get(CelebornConf.MASTER_HTTP_SSL_KEYSTORE_TYPE)
|
||||
case Service.WORKER =>
|
||||
conf.get(CelebornConf.WORKER_HTTP_SSL_KEYSTORE_TYPE)
|
||||
}
|
||||
}
|
||||
|
||||
private def httpSslKeyStoreAlgorithm(): Option[String] = {
|
||||
serviceName match {
|
||||
case Service.MASTER =>
|
||||
conf.get(CelebornConf.MASTER_HTTP_SSL_KEYSTORE_ALGORITHM)
|
||||
case Service.WORKER =>
|
||||
conf.get(CelebornConf.WORKER_HTTP_SSL_KEYSTORE_ALGORITHM)
|
||||
}
|
||||
}
|
||||
|
||||
private def httpSslDisallowedProtocols(): Seq[String] = {
|
||||
serviceName match {
|
||||
case Service.MASTER =>
|
||||
conf.get(CelebornConf.MASTER_HTTP_SSL_DISALLOWED_PROTOCOLS)
|
||||
case Service.WORKER =>
|
||||
conf.get(CelebornConf.WORKER_HTTP_SSL_DISALLOWED_PROTOCOLS)
|
||||
}
|
||||
}
|
||||
|
||||
private def httpSslIncludedCipherSuites(): Seq[String] = {
|
||||
serviceName match {
|
||||
case Service.MASTER =>
|
||||
conf.get(CelebornConf.MASTER_HTTP_SSL_INCLUDE_CIPHER_SUITES)
|
||||
case Service.WORKER =>
|
||||
conf.get(CelebornConf.WORKER_HTTP_SSL_INCLUDE_CIPHER_SUITES)
|
||||
}
|
||||
}
|
||||
|
||||
def connectionUrl: String = {
|
||||
httpServer.getServerUri
|
||||
}
|
||||
|
||||
@ -20,9 +20,11 @@ package org.apache.celeborn.server.common.http
|
||||
import scala.util.Try
|
||||
|
||||
import org.apache.commons.lang3.SystemUtils
|
||||
import org.eclipse.jetty.server.{Handler, HttpConfiguration, HttpConnectionFactory, Server, ServerConnector}
|
||||
import org.eclipse.jetty.http.HttpVersion
|
||||
import org.eclipse.jetty.server.{Handler, HttpConfiguration, HttpConnectionFactory, Server, ServerConnector, SslConnectionFactory}
|
||||
import org.eclipse.jetty.server.handler.{ContextHandlerCollection, ErrorHandler}
|
||||
import org.eclipse.jetty.util.component.LifeCycle
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory
|
||||
import org.eclipse.jetty.util.thread.{QueuedThreadPool, ScheduledExecutorScheduler}
|
||||
|
||||
import org.apache.celeborn.common.internal.Logging
|
||||
@ -105,7 +107,7 @@ private[celeborn] case class HttpServer(
|
||||
def getState: String = server.getState
|
||||
}
|
||||
|
||||
object HttpServer {
|
||||
object HttpServer extends Logging {
|
||||
|
||||
def apply(
|
||||
role: String,
|
||||
@ -113,7 +115,14 @@ object HttpServer {
|
||||
port: Int,
|
||||
poolSize: Int,
|
||||
stopTimeout: Long,
|
||||
idleTimeout: Long): HttpServer = {
|
||||
idleTimeout: Long,
|
||||
sslEnabled: Boolean,
|
||||
keyStorePath: Option[String],
|
||||
keyStorePassword: Option[String],
|
||||
keyStoreType: Option[String],
|
||||
keyStoreAlgorithm: Option[String],
|
||||
sslDisallowedProtocols: Seq[String],
|
||||
sslIncludeCipherSuites: Seq[String]): HttpServer = {
|
||||
val pool = new QueuedThreadPool(math.max(poolSize, 8))
|
||||
pool.setName(s"$role-JettyThreadPool")
|
||||
pool.setDaemon(true)
|
||||
@ -130,14 +139,51 @@ object HttpServer {
|
||||
|
||||
val serverExecutor = new ScheduledExecutorScheduler(s"$role-JettyScheduler", true)
|
||||
val httpConf = new HttpConfiguration()
|
||||
val connector = new ServerConnector(
|
||||
server,
|
||||
null,
|
||||
serverExecutor,
|
||||
null,
|
||||
-1,
|
||||
-1,
|
||||
new HttpConnectionFactory(httpConf))
|
||||
|
||||
val connector =
|
||||
if (sslEnabled) {
|
||||
if (keyStorePath.isEmpty) {
|
||||
throw new IllegalArgumentException("KeyStorePath is not provided for SSL connection.")
|
||||
}
|
||||
if (keyStorePassword.isEmpty) {
|
||||
throw new IllegalArgumentException("KeyStorePassword is not provided for SSL connection.")
|
||||
}
|
||||
|
||||
val sslContextFactory = new SslContextFactory.Server()
|
||||
logInfo(
|
||||
"HTTP Server SSL: adding excluded protocols: " + sslDisallowedProtocols.mkString(","))
|
||||
sslContextFactory.addExcludeProtocols(sslDisallowedProtocols: _*)
|
||||
logInfo(s"HTTP Server SSL: SslContextFactory.getExcludeProtocols = ${sslContextFactory.getExcludeProtocols.mkString(",")}")
|
||||
logInfo(
|
||||
"HTTP Server SSL: adding included cipher suites: " + sslIncludeCipherSuites.mkString(","))
|
||||
sslContextFactory.setIncludeCipherSuites(sslIncludeCipherSuites: _*)
|
||||
logInfo(s"HTTP Server SSL: SslContextFactory.getIncludeCipherSuites = ${sslContextFactory.getIncludeCipherSuites.mkString(",")}")
|
||||
|
||||
sslContextFactory.setKeyStorePath(keyStorePath.get)
|
||||
sslContextFactory.setKeyStorePassword(keyStorePassword.get)
|
||||
keyStoreType.foreach(sslContextFactory.setKeyStoreType)
|
||||
keyStoreAlgorithm.foreach(sslContextFactory.setKeyManagerFactoryAlgorithm)
|
||||
|
||||
new ServerConnector(
|
||||
server,
|
||||
null,
|
||||
serverExecutor,
|
||||
null,
|
||||
-1,
|
||||
-1,
|
||||
new SslConnectionFactory(sslContextFactory, HttpVersion.HTTP_1_1.toString),
|
||||
new HttpConnectionFactory(httpConf))
|
||||
} else {
|
||||
new ServerConnector(
|
||||
server,
|
||||
null,
|
||||
serverExecutor,
|
||||
null,
|
||||
-1,
|
||||
-1,
|
||||
new HttpConnectionFactory(httpConf))
|
||||
}
|
||||
|
||||
connector.setHost(host)
|
||||
connector.setPort(port)
|
||||
connector.setReuseAddress(!SystemUtils.IS_OS_WINDOWS)
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.celeborn.server.common.http.api
|
||||
|
||||
import java.net.URI
|
||||
import javax.servlet.ServletConfig
|
||||
import javax.ws.rs.{GET, Path, PathParam, Produces}
|
||||
import javax.ws.rs.core.{Application, Context, HttpHeaders, MediaType, Response, UriInfo}
|
||||
@ -60,7 +61,7 @@ class CelebornOpenApiResource extends BaseOpenApiResource with ApiRequestContext
|
||||
.ctxId(ctxId)
|
||||
.buildContext(true)
|
||||
|
||||
val openApi = setCelebornOpenAPIDefinition(ctx.read(), uriInfo.getBaseUri.toString)
|
||||
val openApi = setCelebornOpenAPIDefinition(ctx.read(), uriInfo.getBaseUri)
|
||||
|
||||
if (StringUtils.isNotBlank(tpe) && tpe.trim().equalsIgnoreCase("yaml")) {
|
||||
Response.status(Response.Status.OK)
|
||||
@ -81,9 +82,10 @@ class CelebornOpenApiResource extends BaseOpenApiResource with ApiRequestContext
|
||||
}
|
||||
}
|
||||
|
||||
private def setCelebornOpenAPIDefinition(openApi: OpenAPI, requestBaseUrl: String): OpenAPI = {
|
||||
// TODO: to improve when https is enabled.
|
||||
val apiUrls = List(requestBaseUrl, s"http://${httpService.connectionUrl}/").distinct
|
||||
private def setCelebornOpenAPIDefinition(openApi: OpenAPI, requestBaseUri: URI): OpenAPI = {
|
||||
val httpScheme = if (httpService.httpSslEnabled()) "https:" else "http:"
|
||||
val requestBaseUrl = s"$httpScheme${requestBaseUri.getSchemeSpecificPart}"
|
||||
val apiUrls = List(requestBaseUrl, s"$httpScheme//${httpService.connectionUrl}/").distinct
|
||||
openApi.info(
|
||||
new Info().title(
|
||||
s"Apache Celeborn REST API Documentation")
|
||||
|
||||
Loading…
Reference in New Issue
Block a user