Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,9 @@ This repository is web ui for the [Apache Paimon](https://paimon.apache.org/) pr
## About

Apache Paimon is an open source project of [The Apache Software Foundation](https://apache.org/) (ASF).

## build
1. to install nodejs and npm in your environment
2. cd paimon-web-ui && npm install
3. cd ../ && mvn clean package -Prelease -DskipTests -Drat.skip=true
4. you find apache-paimon-webui-0.1-SNAPSHOT-bin.tar.gz in paimon-web-dist/target
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,20 @@
/** ActionExecutionResult. */
public class ActionExecutionResult {

private String flinkJobId;
private boolean success;
private String errorMsg;

public static ActionExecutionResult success() {
public static ActionExecutionResult success(String flinkJobId) {
ActionExecutionResult actionExecutionResult = new ActionExecutionResult();
actionExecutionResult.success = true;
actionExecutionResult.flinkJobId = flinkJobId;
return actionExecutionResult;
}

public static ActionExecutionResult fail(String errorMsg) {
ActionExecutionResult actionExecutionResult = new ActionExecutionResult();
actionExecutionResult.success = true;
actionExecutionResult.success = false;
actionExecutionResult.errorMsg = errorMsg;
return actionExecutionResult;
}
Expand All @@ -41,15 +43,11 @@ public boolean isSuccess() {
return success;
}

public void setSuccess(boolean success) {
this.success = success;
}

public String getErrorMsg() {
return errorMsg;
}

public void setErrorMsg(String errorMsg) {
this.errorMsg = errorMsg;
public String getFlinkJobId() {
return flinkJobId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,33 @@

import lombok.experimental.SuperBuilder;

import java.util.Optional;

/** FlinkActionContext. */
@SuperBuilder
public abstract class FlinkActionContext extends AbstractActionContext implements ActionContext {

private String sessionUrl;
private Optional<String> sessionUrl;

private Optional<FlinkJobType> flinkJobType;

private FlinkJobType flinkJobType;
private Optional<String> pipelineName;

public String getSessionUrl() {
private Optional<Integer> executionCheckPointInterval;

public Optional<String> getSessionUrl() {
return sessionUrl;
}

public FlinkJobType getFlinkJobType() {
public Optional<FlinkJobType> getFlinkJobType() {
return flinkJobType;
}

public Optional<String> getPipelineName() {
return pipelineName;
}

public Optional<Integer> getExecutionCheckPointInterval() {
return executionCheckPointInterval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,17 @@ public abstract class FlinkCdcTableSyncActionContext extends FlinkActionContext
@Nullable
protected String primaryKeys;

@ActionConf("type_mapping")
@Nullable
protected String typeMapping;

@ActionConf("computed_column")
@Nullable
protected String computedColumn;
protected List<String> computedColumnList;

@ActionConf("metadata_column")
@Nullable
protected String metaDataColumn;

@ActionConf(value = "catalog_conf")
@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

import com.fasterxml.jackson.databind.node.ObjectNode;

import java.util.Optional;

/**
* A factory designed for creating {@link FlinkCdcActionContextFactory}, implementing full database
* synchronization with MySQL.
Expand All @@ -52,13 +54,21 @@ public FlinkCdcSyncType cdcType() {
@Override
public ActionContext getActionContext(ObjectNode actionConfigs) {
return MysqlSyncDatabaseActionContext.builder()
.sessionUrl(String.valueOf(actionConfigs.get(FlinkCdcOptions.SESSION_URL)))
.flinkJobType(FlinkJobType.SESSION)
.sessionUrl(
Optional.of(String.valueOf(actionConfigs.get(FlinkCdcOptions.SESSION_URL))))
.flinkJobType(Optional.of(FlinkJobType.SESSION))
.warehouse(JSONUtils.getString(actionConfigs, FlinkCdcOptions.WAREHOUSE))
.database(JSONUtils.getString(actionConfigs, FlinkCdcOptions.DATABASE))
.actionPath(ActionContextUtil.getActionJarPath())
.catalogConfList(JSONUtils.getList(actionConfigs, FlinkCdcOptions.CATALOG_CONF))
.mysqlConfList(JSONUtils.getList(actionConfigs, FlinkCdcOptions.MYSQL_CONF))
.executionCheckPointInterval(
Optional.of(
JSONUtils.getInteger(
actionConfigs, FlinkCdcOptions.EXE_CP_INTERVAL)))
.pipelineName(
Optional.of(
JSONUtils.getString(actionConfigs, FlinkCdcOptions.PIPELINE_NAME)))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

import com.fasterxml.jackson.databind.node.ObjectNode;

import java.util.Optional;

/** MysqlSyncTableActionContextFactory. */
public class MysqlSyncTableActionContextFactory implements FlinkCdcActionContextFactory {

Expand All @@ -50,15 +52,29 @@ public FlinkCdcSyncType cdcType() {
@Override
public ActionContext getActionContext(ObjectNode actionConfigs) {
return MysqlSyncTableActionContext.builder()
.sessionUrl(String.valueOf(actionConfigs.get(FlinkCdcOptions.SESSION_URL)))
.flinkJobType(FlinkJobType.SESSION)
.sessionUrl(
Optional.of(String.valueOf(actionConfigs.get(FlinkCdcOptions.SESSION_URL))))
.flinkJobType(Optional.of(FlinkJobType.SESSION))
.warehouse(JSONUtils.getString(actionConfigs, FlinkCdcOptions.WAREHOUSE))
.database(JSONUtils.getString(actionConfigs, FlinkCdcOptions.DATABASE))
.table(JSONUtils.getString(actionConfigs, FlinkCdcOptions.TABLE))
.primaryKeys(JSONUtils.getString(actionConfigs, FlinkCdcOptions.PRIMARY_KEYS))
.typeMapping(JSONUtils.getString(actionConfigs, FlinkCdcOptions.TYPE_MAPPING))
.actionPath(ActionContextUtil.getActionJarPath())
.catalogConfList(JSONUtils.getList(actionConfigs, FlinkCdcOptions.CATALOG_CONF))
.mysqlConfList(JSONUtils.getList(actionConfigs, FlinkCdcOptions.MYSQL_CONF))
.computedColumnList(
JSONUtils.getList(actionConfigs, FlinkCdcOptions.COMPUTED_COLUMN))
.metaDataColumn(JSONUtils.getString(actionConfigs, FlinkCdcOptions.METADATA_COLUMN))
.partitionKeys(JSONUtils.getString(actionConfigs, FlinkCdcOptions.PARTITION_KEYS))
.tableConfList(JSONUtils.getList(actionConfigs, FlinkCdcOptions.TABLE_CONF))
.executionCheckPointInterval(
Optional.of(
JSONUtils.getInteger(
actionConfigs, FlinkCdcOptions.EXE_CP_INTERVAL)))
.pipelineName(
Optional.of(
JSONUtils.getString(actionConfigs, FlinkCdcOptions.PIPELINE_NAME)))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

import com.fasterxml.jackson.databind.node.ObjectNode;

import java.util.Optional;

/** PostgresSyncTableActionContextFactory. */
public class PostgresSyncTableActionContextFactory implements FlinkCdcActionContextFactory {
@Override
Expand All @@ -49,17 +51,29 @@ public FlinkCdcSyncType cdcType() {
@Override
public ActionContext getActionContext(ObjectNode actionConfigs) {
return PostgresSyncTableActionContext.builder()
.sessionUrl(String.valueOf(actionConfigs.get(FlinkCdcOptions.SESSION_URL)))
.flinkJobType(FlinkJobType.SESSION)
.sessionUrl(
Optional.of(String.valueOf(actionConfigs.get(FlinkCdcOptions.SESSION_URL))))
.flinkJobType(Optional.of(FlinkJobType.SESSION))
.warehouse(JSONUtils.getString(actionConfigs, FlinkCdcOptions.WAREHOUSE))
.database(JSONUtils.getString(actionConfigs, FlinkCdcOptions.DATABASE))
.table(JSONUtils.getString(actionConfigs, FlinkCdcOptions.TABLE))
.partitionKeys(JSONUtils.getString(actionConfigs, FlinkCdcOptions.PARTITION_KEYS))
.primaryKeys(JSONUtils.getString(actionConfigs, FlinkCdcOptions.PRIMARY_KEYS))
.computedColumn(JSONUtils.getString(actionConfigs, FlinkCdcOptions.COMPUTED_COLUMN))
.typeMapping(JSONUtils.getString(actionConfigs, FlinkCdcOptions.TYPE_MAPPING))
.computedColumnList(
JSONUtils.getList(actionConfigs, FlinkCdcOptions.COMPUTED_COLUMN))
.metaDataColumn(JSONUtils.getString(actionConfigs, FlinkCdcOptions.METADATA_COLUMN))
.actionPath(ActionContextUtil.getActionJarPath())
.catalogConfList(JSONUtils.getList(actionConfigs, FlinkCdcOptions.CATALOG_CONF))
.postgresConfList(JSONUtils.getList(actionConfigs, FlinkCdcOptions.POSTGRES_CONF))
.tableConfList(JSONUtils.getList(actionConfigs, FlinkCdcOptions.TABLE_CONF))
.executionCheckPointInterval(
Optional.of(
JSONUtils.getInteger(
actionConfigs, FlinkCdcOptions.EXE_CP_INTERVAL)))
.pipelineName(
Optional.of(
JSONUtils.getString(actionConfigs, FlinkCdcOptions.PIPELINE_NAME)))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class FlinkCdcOptions {
private FlinkCdcOptions() {}

public static final String MYSQL_CONF = "mysql_conf";

public static final String POSTGRES_CONF = "postgres_conf";

public static final String TABLE_CONF = "table_conf";
Expand All @@ -38,9 +39,17 @@ private FlinkCdcOptions() {}

public static final String PRIMARY_KEYS = "primary_keys";

public static final String TYPE_MAPPING = "type_mapping";

public static final String COMPUTED_COLUMN = "computed_column";

public static final String METADATA_COLUMN = "metadata_column";

public static final String SESSION_URL = "sessionUrl";

public static final String CATALOG_CONF = "catalog_conf";

public static final String PIPELINE_NAME = "pipeline.name";

public static final String EXE_CP_INTERVAL = "execution.checkpointing.interval";
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,32 +33,53 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** An abstract Action service that executes actions through the shell. */
@Slf4j
public class FlinkCdcActionService implements ActionService {
private static final ExecutorService shellExecutor = Executors.newFixedThreadPool(5);

private static final int DEFAULT_TIMEOUT_SECCOND = 60;
private static final String JOB_ID_LOG_PREFIX = "Job has been submitted with JobID ";

private List<String> getCommand(FlinkActionContext actionContext) {
List<String> commandList = new ArrayList<>();
commandList.add("bin/flink");
commandList.add("run");
if (actionContext.getFlinkJobType() != FlinkJobType.SESSION) {
commandList.add("-d");

if (actionContext.getFlinkJobType().isPresent()
&& actionContext.getFlinkJobType().get() != FlinkJobType.SESSION) {
throw new ActionException("Only support session job now.");
}
String sessionUrl = actionContext.getSessionUrl();
if (StringUtils.isNotBlank(sessionUrl)) {
commandList.add("-m");
commandList.add(sessionUrl);
}

actionContext
.getSessionUrl()
.ifPresent(
val -> {
commandList.add("-m");
commandList.add(val);
});
actionContext
.getPipelineName()
.ifPresent(
val -> {
commandList.add("-Dpipeline.name=" + val);
});

actionContext
.getExecutionCheckPointInterval()
.ifPresent(
val -> {
commandList.add("-Dexecution.checkpointing.interval=" + val);
});

commandList.add(actionContext.getJarPath());
commandList.addAll(actionContext.getArguments());
return commandList;
}

public ActionExecutionResult execute(ActionContext actionContext) throws Exception {
public ActionExecutionResult execute(ActionContext actionContext) {
String flinkHome = getFlinkHome();
FlinkActionContext flinkActionContext;
if (!(actionContext instanceof FlinkActionContext)) {
Expand All @@ -69,25 +90,46 @@ public ActionExecutionResult execute(ActionContext actionContext) throws Excepti
try {
List<String> command = getCommand(flinkActionContext);
Process process = new ShellService(flinkHome, command).execute();
shellExecutor.execute(
() -> {
try (InputStream inputStream = process.getInputStream();
InputStream errorStream = process.getErrorStream(); ) {
List<String> logLines =
IOUtils.readLines(inputStream, StandardCharsets.UTF_8);
for (String logLine : logLines) {
log.info(logLine);
}
List<String> errorLines =
IOUtils.readLines(errorStream, StandardCharsets.UTF_8);
for (String logLine : errorLines) {
log.error(logLine);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
});
result = ActionExecutionResult.success();
boolean hasExited = process.waitFor(DEFAULT_TIMEOUT_SECCOND, TimeUnit.SECONDS);
if (hasExited) {
int value = process.exitValue();
try (InputStream inputStream = process.getInputStream();
InputStream errorStream = process.getErrorStream()) {
List<String> lines = IOUtils.readLines(inputStream, StandardCharsets.UTF_8);
if (value == 0) {
String infoLines = String.join("\n", lines);
log.info(
"run shell command [{}] get info log\n{}",
String.join(" ", command),
infoLines);
String flinkJobId =
lines.stream()
.filter(item -> item.startsWith(JOB_ID_LOG_PREFIX))
.map(item -> item.replaceAll(JOB_ID_LOG_PREFIX, ""))
.findFirst()
.orElse(null);
result = ActionExecutionResult.success(flinkJobId);
} else {
lines.addAll(IOUtils.readLines(errorStream, StandardCharsets.UTF_8));
String errLines = String.join("\n", lines);
log.info(
"run shell command [{}] get error log\n{}",
String.join(" ", command),
errLines);
result = ActionExecutionResult.fail(errLines);
}
} catch (Exception exception) {
log.error(exception.getMessage(), exception);
result = ActionExecutionResult.fail(exception.getMessage());
}
} else {
process.destroyForcibly();
result =
ActionExecutionResult.fail(
String.format(
"run shell command timeout after %s second",
DEFAULT_TIMEOUT_SECCOND));
}
} catch (Exception exception) {
log.error(exception.getMessage(), exception);
result = ActionExecutionResult.fail(exception.getMessage());
Expand Down
Loading
Loading