Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Execution progress complete #140

Merged
merged 20 commits into from
Jan 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
f8d173b
ExchangisListener and TenancyParallelConsumerManager
Davidhua1996 Jan 11, 2022
b5c5ce7
Basic logic code in TaskObserver
Davidhua1996 Jan 12, 2022
662b497
onPublish and subscribe method in AbstractTaskObserver; implement in …
Davidhua1996 Jan 14, 2022
64a723f
LoadBalance Module
Davidhua1996 Jan 14, 2022
4e478f7
LoadBalanceSchedulerTask, LoadBalancePoller and FlexibleTenancyLoadBa…
Davidhua1996 Jan 16, 2022
e345608
JobExecutionUnitTest
Davidhua1996 Jan 18, 2022
421dfa6
Merge branch 'dev-1.0.0' of github.com:WeBankFinTech/Exchangis into d…
Davidhua1996 Jan 19, 2022
57a837c
Job Log module which supports rpc call
Davidhua1996 Jan 19, 2022
f2ac951
Add a custom log appender to send rpc message to server
Davidhua1996 Jan 19, 2022
9059a0f
Dive the Job Log cache by jobExecutionId
Davidhua1996 Jan 20, 2022
f3f592b
Reconstruct the SqoopExchangisEngineJobBuilder
Davidhua1996 Jan 21, 2022
8ac508c
newValue/getValue(Source source) method in JobParamDefine
Davidhua1996 Jan 22, 2022
fce9371
Merge branch 'dev-1.0.0' of github.com:WeBankFinTech/Exchangis into d…
Davidhua1996 Jan 23, 2022
178c306
Complete the modules related by execution
Davidhua1996 Jan 23, 2022
825c7f0
TaskGenerateService and TaskExecuteService
Davidhua1996 Jan 24, 2022
8dd9865
Merge branch 'dev-1.0.0' of github.com:WeBankFinTech/Exchangis into d…
Davidhua1996 Jan 24, 2022
90d0fa6
Merge and resolve the conflicts
Davidhua1996 Jan 24, 2022
cfb5457
Fix the problem in merging
Davidhua1996 Jan 24, 2022
99a0992
CreateTime/LastUpdateTime in launchableExchangisJob
Davidhua1996 Jan 24, 2022
989df65
Execution Debug
Davidhua1996 Jan 25, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions assembly-package/config/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,28 @@

<configuration status="error" monitorInterval="30">
<appenders>
<RpcLog name="RpcLog" >
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level - %msg%xEx%n"/>
</RpcLog>
<Console name="Console" target="SYSTEM_OUT">
<ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M - %msg%xEx%n"/>
</Console>
<RollingFile name="RollingFile" fileName="${sys:log.path}/${sys:serviceName}.log"
filePattern="${sys:log.path}/$${date:yyyy-MM}/${sys:serviceName}/exchangis-log-%d{yyyy-MM-dd}-%i.log">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
<SizeBasedTriggeringPolicy size="100MB"/>
<DefaultRolloverStrategy max="20"/>
</RollingFile>
<!-- <RollingFile name="RollingFile" fileName="${sys:log.path}/${sys:serviceName}.log"-->
<!-- filePattern="${sys:log.path}/$${date:yyyy-MM}/${sys:serviceName}/exchangis-log-%d{yyyy-MM-dd}-%i.log">-->
<!-- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>-->
<!-- <SizeBasedTriggeringPolicy size="100MB"/>-->
<!-- <DefaultRolloverStrategy max="20"/>-->
<!-- </RollingFile>-->
</appenders>
<loggers>
<root level="INFO">
<appender-ref ref="RollingFile"/>
<!-- <appender-ref ref="RollingFile"/>-->
<appender-ref ref="Console"/>
</root>
<logger name="com.webank.wedatasphere.exchangis.job.server.log.DefaultRpcJobLogger$" level="INFO" additivity="true">
<appender-ref ref="RpcLog"/>
</logger>
</loggers>
</configuration>

4 changes: 2 additions & 2 deletions db/exchangis_ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ CREATE TABLE `exchangis_job_param_config`
`sort` int(11) DEFAULT NULL,
`description` varchar(255) DEFAULT NULL,
`status` tinyint(4) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `config_key` (`config_key`)
PRIMARY KEY (`id`)
-- UNIQUE KEY `config_key` (`config_key`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package com.webank.wedatasphere.exchangis.dao.domain;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
Expand Down Expand Up @@ -73,7 +76,9 @@ public void setProjectId(Long projectId) {
this.projectId = projectId;
}

public Long getDssProjectId() { return dssProjectId; }
public Long getDssProjectId() {
return dssProjectId;
}

public void setDssProjectId(Long dssProjectId) { this.dssProjectId = dssProjectId; }

Expand Down Expand Up @@ -213,14 +218,4 @@ public void setModifyUser(String modifyUser) {
this.modifyUser = modifyUser;
}

@Override
public String toString() {
return "ExchangisJob{" + "id=" + id + ", projectId=" + projectId + ", jobName=" + jobName + ", jobType="
+ jobType + ", engineType=" + engineType + ", jobLabels=" + jobLabels + ", jobDesc=" + jobDesc
+ ", content=" + content + ", alarmUser=" + alarmUser + ", alarmLevel=" + alarmLevel + ", proxyUser="
+ proxyUser + ", executeNode=" + executeNode + ", syncType=" + syncType + ", jobParams=" + jobParams
+ ", createTime=" + createTime + ", createUser=" + createUser + ", modifyTime=" + modifyTime
+ ", modifyUser=" + modifyUser + "}";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,14 @@ public List<ElementUI> getDataSourceParamsUI(String dsType, String engineAndDire
ExchangisDataSource exchangisDataSource = this.context.getExchangisDataSource(dsType);
List<ExchangisJobParamConfig> paramConfigs = exchangisDataSource.getDataSourceParamConfigs();
List<ExchangisJobParamConfig> filteredConfigs = new ArrayList<>();
String[] engineDirect = engineAndDirection.split("-");
String direction = engineDirect[1];
for (ExchangisJobParamConfig paramConfig : paramConfigs) {
if (Optional.ofNullable(paramConfig.getConfigDirection()).orElse("").equalsIgnoreCase(engineAndDirection)) {
filteredConfigs.add(paramConfig);
}
Optional.ofNullable(paramConfig.getConfigDirection()).ifPresent(configDirection -> {
if (configDirection.equalsIgnoreCase(engineAndDirection) || configDirection.equalsIgnoreCase(direction)){
filteredConfigs.add(paramConfig);
}
});
}
return this.buildDataSourceParamsUI(filteredConfigs);
}
Expand Down Expand Up @@ -1192,7 +1196,7 @@ public Message queryDataSourceDBTableFieldsMapping(HttpServletRequest request, F
List<DataSourceDbTableColumnDTO> sinkFields = (List<DataSourceDbTableColumnDTO>) sinkMessage.getData().get("columns");
for (int i = 0; i < sinkFields.size(); i++) {
DataSourceDbTableColumnDTO field = sinkFields.get(i);
field.setFieldIndex(i);
// field.setFieldIndex(i);
field.setFieldEditable(!"HIVE".equals(vo.getSinkTypeId()));
}
message.data("sinkFields", sinkFields);
Expand All @@ -1209,21 +1213,28 @@ public Message queryDataSourceDBTableFieldsMapping(HttpServletRequest request, F
right = sourceFields;
exchanged = !exchanged;
}

for (DataSourceDbTableColumnDTO l : left) {
String lname = l.getName();
for (DataSourceDbTableColumnDTO r : right) {
String rname = r.getName();
if (lname.equals(rname)) {
Map<String, Object> deduction = new HashMap<>();
deduction.put("source", exchanged ? r : l);
deduction.put("sink", exchanged ? l : r);
deduction.put("deleteEnable", !containHive);
deductions.add(deduction);
}
}
// for (DataSourceDbTableColumnDTO l : left) {
// String lname = l.getName();
// for (DataSourceDbTableColumnDTO r : right) {
// String rname = r.getName();
// if (lname.equals(rname)) {
// Map<String, Object> deduction = new HashMap<>();
// deduction.put("source", exchanged ? r : l);
// deduction.put("sink", exchanged ? l : r);
// deduction.put("deleteEnable", !containHive);
// deductions.add(deduction);
// }
// }
// }
for (int i = 0; i < left.size(); i ++){
DataSourceDbTableColumnDTO leftElement = left.get(i);
DataSourceDbTableColumnDTO rightElement = right.get(i % right.size());
Map<String, Object> deduction = new HashMap<>();
deduction.put("source", exchanged ? rightElement : leftElement);
deduction.put("sink", exchanged ? leftElement : rightElement);
deduction.put("deleteEnable", !containHive);
deductions.add(deduction);
}

message.data("deductions", deductions);

return message;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


import com.webank.wedatasphere.exchangis.job.domain.ExchangisJobInfo;
import com.webank.wedatasphere.exchangis.job.vo.ExchangisJobVO;
import com.webank.wedatasphere.exchangis.job.listener.JobLogListener;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -17,6 +17,11 @@ public class ExchangisJobBuilderContext {
*/
private ExchangisJobInfo originalJob;

/**
* Listen the log event
*/
private JobLogListener jobLogListener;

private Map<String, Object> env = new HashMap<>();

private Map<String, Map<String, Object>> datasourceParams = new HashMap<>();
Expand All @@ -25,8 +30,17 @@ public ExchangisJobBuilderContext() {

}

public ExchangisJobBuilderContext(ExchangisJobInfo originalJob) {
public ExchangisJobBuilderContext(ExchangisJobInfo originalJob, JobLogListener jobLogListener){
this.originalJob = originalJob;
this.jobLogListener = jobLogListener;
}

public ExchangisJobBuilderContext(ExchangisJobInfo originalJob) {
this(originalJob, null);
}

public JobLogListener getJobLogListener() {
return jobLogListener;
}

public ExchangisJobInfo getOriginalJob() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
package com.webank.wedatasphere.exchangis.job.builder.api;

import com.webank.wedatasphere.exchangis.job.builder.ExchangisJobBuilderContext;
import com.webank.wedatasphere.exchangis.job.domain.ExchangisBase;
import com.webank.wedatasphere.exchangis.job.domain.ExchangisJob;
import com.webank.wedatasphere.exchangis.job.domain.params.JobParamDefine;
import com.webank.wedatasphere.exchangis.job.domain.params.JobParamSet;
import com.webank.wedatasphere.exchangis.job.exception.ExchangisJobException;
import com.webank.wedatasphere.exchangis.job.utils.TypeGenericUtils;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.*;

public abstract class AbstractExchangisJobBuilder<T extends ExchangisJob, E extends ExchangisJob> implements ExchangisJobBuilder<T, E> {
public abstract class AbstractExchangisJobBuilder<T extends ExchangisJob, E extends ExchangisBase> implements ExchangisJobBuilder<T, E> {
@Override
@SuppressWarnings("unchecked")
public Class<T> inputJob() {
return (Class<T>) getGenericType(0);
return (Class<T>) TypeGenericUtils.getActualTypeFormGenericClass(this.getClass(), AbstractExchangisJobBuilder.class, 0);
}

@Override
@SuppressWarnings("unchecked")
public Class<E> outputJob() {
return (Class<E>)getGenericType(1);
return (Class<E>) TypeGenericUtils.getActualTypeFormGenericClass(this.getClass(), AbstractExchangisJobBuilder.class, 1);
}

@Override
Expand All @@ -30,46 +36,16 @@ public boolean canBuild(T inputJob) {
return true;
}

public Class<?> getGenericType(int position){
Map<String, Type> typeVariableReflect = new HashMap<>();
Map<Class<?>, Type[]> classTypeVariableMap = new HashMap<>();
Queue<Class<?>> traverseQueue = new LinkedList<>();
Type[] classTypes = null;
Class<?> currentClass = null;
traverseQueue.offer(this.getClass());
while (!traverseQueue.isEmpty()) {
currentClass = traverseQueue.poll();
Type[] typeParameters = currentClass.getTypeParameters();
if (typeParameters.length > 0) {
classTypes = classTypeVariableMap.get(currentClass);
//Ignore the builder which has the parameterType (not resolved)
if (null == classTypes) {
return null;
}
for (int i = 0; i < classTypes.length; i++) {
typeVariableReflect.put(typeParameters[i].getTypeName(), classTypes[i]);
}
}
if (Objects.equals(currentClass, AbstractExchangisJobBuilder.class)) {
break;
}
//Just traverse the superclass ignore interfaces
Type superclassType = currentClass.getGenericSuperclass();
if (Objects.nonNull(superclassType) && superclassType instanceof ParameterizedType) {
Type[] actualTypes = ((ParameterizedType) superclassType).getActualTypeArguments();
for (int i = 0 ; i < actualTypes.length; i++){
Type actualType = actualTypes[i];
if (actualType instanceof TypeVariable){
actualTypes[i] = typeVariableReflect.getOrDefault(actualType.getTypeName(), actualType);
}
}
classTypeVariableMap.put(currentClass.getSuperclass(), actualTypes);
}
traverseQueue.offer(currentClass.getSuperclass());
};
if (Objects.nonNull(classTypes) && classTypes.length > position){
return (Class<?>)classTypes[position];

@Override
public E build(T inputJob, E expectOut, ExchangisJobBuilderContext ctx) throws ExchangisJobException {
JobParamDefine.defaultParam.set(new JobParamSet());
try {
return buildJob(inputJob, expectOut, ctx);
} finally{
JobParamDefine.defaultParam.remove();
}
return null;
}

public abstract E buildJob(T inputJob, E expectOut, ExchangisJobBuilderContext ctx) throws ExchangisJobException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,5 @@ public interface ExchangisJobBuilder<T extends ExchangisJob, E extends Exchangis
* @param ctx context
* @return outputJob
*/
E buildJob(T inputJob, E expectOut, ExchangisJobBuilderContext ctx) throws ExchangisJobException;
E build(T inputJob, E expectOut, ExchangisJobBuilderContext ctx) throws ExchangisJobException;
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.webank.wedatasphere.exchangis.job.builder.api;

import com.webank.wedatasphere.exchangis.job.builder.ExchangisJobBuilderContext;
import com.webank.wedatasphere.exchangis.job.domain.ExchangisBase;
import com.webank.wedatasphere.exchangis.job.domain.ExchangisJob;
import com.webank.wedatasphere.exchangis.job.exception.ExchangisJobException;

Expand All @@ -13,7 +14,7 @@
* @param <T> input job
* @param <E> output job
*/
public class GenericExchangisJobBuilderChain<T extends ExchangisJob, E extends ExchangisJob> implements ExchangisJobBuilderChain<T, E>{
public class GenericExchangisJobBuilderChain<T extends ExchangisJob, E extends ExchangisBase> implements ExchangisJobBuilderChain<T, E>{

/**
* Chain list
Expand All @@ -36,7 +37,7 @@ public E build(T inputJob, ExchangisJobBuilderContext ctx) throws ExchangisJobEx
if (Objects.nonNull(inputJob)){
for( ExchangisJobBuilder<T, E> builder : builderChain){
if(builder.canBuild(inputJob)){
expectJob.set(builder.buildJob(inputJob, expectJob.get(), ctx));
expectJob.set(builder.build(inputJob, expectJob.get(), ctx));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,19 @@ public class ExchangisJobInfo extends GenericExchangisJob {

/**
* Convert from view object
* @param exchangisJobVO vo
* @param jobVo vo
*/
public ExchangisJobInfo(ExchangisJobVO exchangisJobVO){

public ExchangisJobInfo(ExchangisJobVO jobVo){
this.id = jobVo.getId();
this.name = jobVo.getJobName();
this.engineType = jobVo.getEngineType();
this.jobLabel = jobVo.getJobLabels();
this.createTime = jobVo.getCreateTime();
this.createUser = jobVo.getCreateUser();
this.lastUpdateTime = jobVo.getModifyTime();
this.jobContent = jobVo.getContent();
this.executeUser = jobVo.getProxyUser();
this.jobParams = jobVo.getJobParams();
}

public ExchangisJobInfo(){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class GenericExchangisJob implements ExchangisJob {

protected String engineType;

private String jobLabel;
protected String jobLabel;

private Map<String, Object> labelHolder = new HashMap<>();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.webank.wedatasphere.exchangis.job.domain.params;

import com.webank.wedatasphere.exchangis.job.exception.ExchangisJobException;
import com.webank.wedatasphere.exchangis.job.exception.ExchangisJobExceptionCode;

import java.util.Objects;
import java.util.function.BiFunction;

Expand Down Expand Up @@ -40,7 +43,12 @@ public T getValue(Object source) {
if (!Objects.equals(sourceReference, source) &&
Objects.nonNull(valueLoader) &&
sourceType.isAssignableFrom(source.getClass())) {
this.value = this.valueLoader.apply(key, source);
try {
this.value = this.valueLoader.apply(key, source);
} catch (Exception e){
throw new ExchangisJobException.Runtime(ExchangisJobExceptionCode.TASK_PARM_ERROR.getCode(),
"Exception in loading param: [" + key + "]", e);
}
this.sourceReference = source;
}
}
Expand All @@ -52,7 +60,12 @@ public JobParam<T> loadValue(Object source){
if(Objects.nonNull(source) &&
Objects.nonNull(valueLoader) &&
sourceType.isAssignableFrom(source.getClass())){
this.value = this.valueLoader.apply(key, source);
try {
this.value = this.valueLoader.apply(key, source);
} catch (Exception e){
throw new ExchangisJobException.Runtime(ExchangisJobExceptionCode.TASK_PARM_ERROR.getCode(),
"Exception in loading param: [" + key + "]", e);
}
this.sourceReference = source;
}
return this;
Expand Down
Loading