Skip to content

Commit

Permalink
Merge pull request #57 from WeDataSphere/dev-0.2.3-bugfix
Browse files Browse the repository at this point in the history
Fix the problem of transformer component in Streamis
  • Loading branch information
jefftlin authored Dec 20, 2022
2 parents 74b118b + 08cf1ab commit b2d8c02
Showing 1 changed file with 17 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import com.webank.wedatasphere.streamis.jobmanager.manager.utils.JobUtils

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
* Created by enjoyyin on 2021/9/23.
Expand All @@ -50,20 +51,24 @@ class FlinkJarStreamisStartupParamsTransform extends Transform {
startupMap.put("flink.app.main.class.jar", transformJobContent.getMainClassJar.getFileName)
startupMap.put("flink.app.main.class.jar.bml.json",
JsonUtils.jackson.writeValueAsString(getStreamisFileContent(transformJobContent.getMainClassJar)))
val classpathFiles = if(transformJobContent.getDependencyJars != null && transformJobContent.getResources != null) {
startupMap.put("flink.app.user.class.path", transformJobContent.getDependencyJars.asScala.map(_.getFileName).mkString(","))
transformJobContent.getDependencyJars.asScala ++ transformJobContent.getResources.asScala
} else if(transformJobContent.getDependencyJars != null) {
startupMap.put("flink.app.user.class.path", transformJobContent.getDependencyJars.asScala.map(_.getFileName).mkString(","))
transformJobContent.getDependencyJars.asScala
} else if(transformJobContent.getResources != null) {
startupMap.put("flink.yarn.ship-directories", transformJobContent.getResources.asScala.map(_.getFileName).mkString(","))
transformJobContent.getResources.asScala

/**
* Notice : "flink.app.user.class.path" equals to PipelineOptions.CLASSPATHS in Flink
* paths must specify a protocol (e.g. file://) and be accessible on all nodes
* so we use "flink.yarn.ship-directories" instead
*/
var classPathFiles = Option(transformJobContent.getDependencyJars) match {
case Some(list) => list.asScala
case _ => mutable.Buffer[StreamisFile]()
}
Option(transformJobContent.getResources) match {
case Some(list) => classPathFiles = classPathFiles ++ list.asScala
case _ => // Do nothing
}
else mutable.Buffer[StreamisFile]()
if(classpathFiles.nonEmpty)
startupMap.put("flink.yarn.ship-directories", classPathFiles.map(_.getFileName).mkString(","))
if(classPathFiles.nonEmpty)
startupMap.put("flink.app.user.class.path.bml.json",
JsonUtils.jackson.writeValueAsString(classpathFiles.map(getStreamisFileContent).asJava))
JsonUtils.jackson.writeValueAsString(classPathFiles.map(getStreamisFileContent).asJava))
if(transformJobContent.getHdfsJars != null)
startupMap.put("flink.user.lib.path", transformJobContent.getHdfsJars.asScala.mkString(","))
val params = if(job.getParams == null) new util.HashMap[String, Any] else job.getParams
Expand Down

0 comments on commit b2d8c02

Please sign in to comment.