Skip to content

Commit

Permalink
add secured cluster serving (intel-analytics#2635)
Browse files Browse the repository at this point in the history
* add secured cluster serving

* correct a ut error
  • Loading branch information
glorysdj committed Aug 3, 2020
1 parent 332d9c5 commit 494046e
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ object ClusterServing {
helper.initArgs()
params = new SerParams(helper)
val serving = StreamExecutionEnvironment.getExecutionEnvironment
serving.addSource(new FlinkRedisSource(params)).setParallelism(1)
serving.addSource(new FlinkRedisSource(params))
.map(new FlinkInference(params))
.addSink(new FlinkRedisSink(params))
val jobClient = serving.executeAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,45 @@ import com.intel.analytics.zoo.serving.utils.SerParams
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.log4j.Logger
import redis.clients.jedis.{Jedis, JedisPool}
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}


class FlinkRedisSink(params: SerParams) extends RichSinkFunction[List[(String, String)]] {
var redisPool: JedisPool = null
var jedis: Jedis = null
var logger: Logger = null

override def open(parameters: Configuration): Unit = {
redisPool = new JedisPool(params.redisHost, params.redisPort)
// db = RedisIO.getRedisClient(redisPool)
logger = Logger.getLogger(getClass)

if (params.redisSecureEnabled) {
System.setProperty("javax.net.ssl.trustStore", params.redisSecureTrustStorePath)
System.setProperty("javax.net.ssl.trustStorePassword", params.redisSecureTrustStorePassword)
System.setProperty("javax.net.ssl.keyStoreType", "JKS")
System.setProperty("javax.net.ssl.keyStore", params.redisSecureTrustStorePath)
System.setProperty("javax.net.ssl.keyStorePassword", params.redisSecureTrustStorePassword)
}

redisPool = new JedisPool(new JedisPoolConfig(),
params.redisHost, params.redisPort, params.redisSecureEnabled)
params.redisSecureEnabled match {
case true => logger.info(s"FlinkRedisSink connect to secured Redis successfully.")
case false => logger.info(s"FlinkRedisSink connect to plain Redis successfully.")
}
jedis = RedisIO.getRedisClient(redisPool)

}

override def close(): Unit = {
redisPool.close()
if (null != redisPool) {
redisPool.close()
}
}


override def invoke(value: List[(String, String)]): Unit = {
// logger.info(s"Preparing to write result to redis")
jedis = RedisIO.getRedisClient(redisPool)

val ppl = jedis.pipelined()
value.foreach(v => RedisIO.writeHashMap(ppl, v._1, v._2))
ppl.sync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import com.intel.analytics.zoo.serving.utils.{FileUtils, SerParams}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.log4j.Logger
import redis.clients.jedis.{Jedis, JedisPool, StreamEntryID}
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig, StreamEntryID}

import scala.collection.JavaConverters._

Expand All @@ -38,21 +38,33 @@ class FlinkRedisSource(params: SerParams) extends RichSourceFunction[List[(Strin

override def open(parameters: Configuration): Unit = {
logger = Logger.getLogger(getClass)
redisPool = new JedisPool(params.redisHost, params.redisPort)

if (params.redisSecureEnabled) {
System.setProperty("javax.net.ssl.trustStore", params.redisSecureTrustStorePath)
System.setProperty("javax.net.ssl.trustStorePassword", params.redisSecureTrustStorePassword)
System.setProperty("javax.net.ssl.keyStoreType", "JKS")
System.setProperty("javax.net.ssl.keyStore", params.redisSecureTrustStorePath)
System.setProperty("javax.net.ssl.keyStorePassword", params.redisSecureTrustStorePassword)
}

redisPool = new JedisPool(new JedisPoolConfig(),
params.redisHost, params.redisPort, 5000, params.redisSecureEnabled)
params.redisSecureEnabled match {
case true => logger.info(s"FlinkRedisSource connect to secured Redis successfully.")
case false => logger.info(s"FlinkRedisSource connect to plain Redis successfully.")
}
jedis = RedisIO.getRedisClient(redisPool)
try {
jedis.xgroupCreate("serving_stream", "serving",
new StreamEntryID(0, 0), true)
jedis.xgroupCreate("serving_stream", "serving", new StreamEntryID(0, 0), true)
} catch {
case e: Exception =>
println(s"$e exist group")
}

}

override def run(sourceContext: SourceFunction
.SourceContext[List[(String, String)]]): Unit = while (isRunning) {
// logger.info(s">>> get from source begin ${System.currentTimeMillis()} ms")
// logger.info(s">>> get from source begin ${System.currentTimeMillis()} ms")
val start = System.nanoTime()
val groupName = "serving"
val consumerName = "consumer-" + UUID.randomUUID().toString
Expand All @@ -63,7 +75,7 @@ class FlinkRedisSource(params: SerParams) extends RichSourceFunction[List[(Strin
1,
false,
new SimpleEntry("serving_stream", StreamEntryID.UNRECEIVED_ENTRY))
// logger.info(s">>> get from source readed redis ${System.currentTimeMillis()} ms")
// logger.info(s">>> get from source readed redis ${System.currentTimeMillis()} ms")
if (response != null) {
for (streamMessages <- response.asScala) {
val key = streamMessages.getKey
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ object FrontEndApp extends Supportive {
}
}
}

if (arguments.httpsEnabled) {
val serverContext = defineServerContext(arguments.httpsKeyStorePassword,
arguments.httpsKeyStorePath)
Expand Down Expand Up @@ -262,6 +263,9 @@ object FrontEndApp extends Supportive {
opt[Int]('a', "tokenAcquireTimeout")
.action((x, c) => c.copy(tokenAcquireTimeout = x))
.text("token acquire timeout")
opt[Boolean]('s', "httpsEnabled")
.action((x, c) => c.copy(httpsEnabled = x))
.text("https enabled or not")
opt[String]('p', "httpsKeyStorePath")
.action((x, c) => c.copy(httpsKeyStorePath = x))
.text("https keyStore path")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import akka.util.Timeout

trait JedisEnabledActor extends Actor with Supportive {
val actorName = self.path.name
override val logger = LoggerFactory.getLogger(getClass)

def retrieveJedis(
redisHost: String,
Expand All @@ -48,6 +49,10 @@ trait JedisEnabledActor extends Actor with Supportive {
}
val jedisPoolConfig = new JedisPoolConfig()
val jedisPool = new JedisPool(new JedisPoolConfig(), redisHost, redisPort, redisSecureEnabled)
redisSecureEnabled match {
case true => logger.info(s"$actorName connect to secured Redis successfully.")
case false => logger.info(s"$actorName connect to plain Redis successfully.")
}
jedisPool.getResource()
}
}
Expand Down Expand Up @@ -136,7 +141,7 @@ class RedisGetActor(
redisSecureEnabled: Boolean,
redissTrustStorePath: String,
redissTrustStorePassword: String) extends JedisEnabledActor {
override val logger = LoggerFactory.getLogger(classOf[RedisPutActor])
override val logger = LoggerFactory.getLogger(classOf[RedisGetActor])
val jedis = retrieveJedis(redisHost, redisPort,
redisSecureEnabled, redissTrustStorePath, redissTrustStorePassword)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class ClusterServingHelper(_configPath: String = "config.yaml") {

val secureConfig = configList.get("secure").asInstanceOf[HM]
redisSecureEnabled = if (getYaml(secureConfig, "secure_enabled", false) != null) {
getYaml(secureConfig, "secure_enabled", false).asInstanceOf[Boolean]
getYaml(secureConfig, "secure_enabled", "false").asInstanceOf[String].toBoolean
} else {
false
}
Expand Down

0 comments on commit 494046e

Please sign in to comment.