diff --git a/README.md b/README.md index cfc74ba15..cee61c161 100644 --- a/README.md +++ b/README.md @@ -5,3 +5,9 @@ This repository is web ui for the [Apache Paimon](https://paimon.apache.org/) pr ## About Apache Paimon is an open source project of [The Apache Software Foundation](https://apache.org/) (ASF). + +## Build +1. Install Node.js and npm in your environment +2. cd paimon-web-ui && npm install +3. cd ../ && mvn clean package -Prelease -DskipTests -Drat.skip=true +4. You will find apache-paimon-webui-0.1-SNAPSHOT-bin.tar.gz in paimon-web-dist/target \ No newline at end of file diff --git a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/ActionExecutionResult.java b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/ActionExecutionResult.java index 13077aad5..75dcf98b0 100644 --- a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/ActionExecutionResult.java +++ b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/ActionExecutionResult.java @@ -21,18 +21,20 @@ /** ActionExecutionResult. 
*/ public class ActionExecutionResult { + private String flinkJobId; private boolean success; private String errorMsg; - public static ActionExecutionResult success() { + public static ActionExecutionResult success(String flinkJobId) { ActionExecutionResult actionExecutionResult = new ActionExecutionResult(); actionExecutionResult.success = true; + actionExecutionResult.flinkJobId = flinkJobId; return actionExecutionResult; } public static ActionExecutionResult fail(String errorMsg) { ActionExecutionResult actionExecutionResult = new ActionExecutionResult(); - actionExecutionResult.success = true; + actionExecutionResult.success = false; actionExecutionResult.errorMsg = errorMsg; return actionExecutionResult; } @@ -41,15 +43,11 @@ public boolean isSuccess() { return success; } - public void setSuccess(boolean success) { - this.success = success; - } - public String getErrorMsg() { return errorMsg; } - public void setErrorMsg(String errorMsg) { - this.errorMsg = errorMsg; + public String getFlinkJobId() { + return flinkJobId; } } diff --git a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/FlinkActionContext.java b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/FlinkActionContext.java index d82339940..ce7e95057 100644 --- a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/FlinkActionContext.java +++ b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/FlinkActionContext.java @@ -22,19 +22,33 @@ import lombok.experimental.SuperBuilder; +import java.util.Optional; + /** FlinkActionContext. 
*/ @SuperBuilder public abstract class FlinkActionContext extends AbstractActionContext implements ActionContext { - private String sessionUrl; + private Optional sessionUrl; + + private Optional flinkJobType; - private FlinkJobType flinkJobType; + private Optional pipelineName; - public String getSessionUrl() { + private Optional executionCheckPointInterval; + + public Optional getSessionUrl() { return sessionUrl; } - public FlinkJobType getFlinkJobType() { + public Optional getFlinkJobType() { return flinkJobType; } + + public Optional getPipelineName() { + return pipelineName; + } + + public Optional getExecutionCheckPointInterval() { + return executionCheckPointInterval; + } } diff --git a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/FlinkCdcTableSyncActionContext.java b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/FlinkCdcTableSyncActionContext.java index c4ca2745d..02bd41d33 100644 --- a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/FlinkCdcTableSyncActionContext.java +++ b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/FlinkCdcTableSyncActionContext.java @@ -46,9 +46,17 @@ public abstract class FlinkCdcTableSyncActionContext extends FlinkActionContext @Nullable protected String primaryKeys; + @ActionConf("type_mapping") + @Nullable + protected String typeMapping; + @ActionConf("computed_column") @Nullable - protected String computedColumn; + protected List computedColumnList; + + @ActionConf("metadata_column") + @Nullable + protected String metaDataColumn; @ActionConf(value = "catalog_conf") @Nullable diff --git a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncDatabasesActionContextFactory.java b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncDatabasesActionContextFactory.java index 312b59c1e..b6303fd66 100644 --- 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncDatabasesActionContextFactory.java +++ b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncDatabasesActionContextFactory.java @@ -29,6 +29,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode; +import java.util.Optional; + /** * A factory designed for creating {@link FlinkCdcActionContextFactory}, implementing full database * synchronization with MySQL. @@ -52,13 +54,21 @@ public FlinkCdcSyncType cdcType() { @Override public ActionContext getActionContext(ObjectNode actionConfigs) { return MysqlSyncDatabaseActionContext.builder() - .sessionUrl(String.valueOf(actionConfigs.get(FlinkCdcOptions.SESSION_URL))) - .flinkJobType(FlinkJobType.SESSION) + .sessionUrl( + Optional.of(String.valueOf(actionConfigs.get(FlinkCdcOptions.SESSION_URL)))) + .flinkJobType(Optional.of(FlinkJobType.SESSION)) .warehouse(JSONUtils.getString(actionConfigs, FlinkCdcOptions.WAREHOUSE)) .database(JSONUtils.getString(actionConfigs, FlinkCdcOptions.DATABASE)) .actionPath(ActionContextUtil.getActionJarPath()) .catalogConfList(JSONUtils.getList(actionConfigs, FlinkCdcOptions.CATALOG_CONF)) .mysqlConfList(JSONUtils.getList(actionConfigs, FlinkCdcOptions.MYSQL_CONF)) + .executionCheckPointInterval( + Optional.of( + JSONUtils.getInteger( + actionConfigs, FlinkCdcOptions.EXE_CP_INTERVAL))) + .pipelineName( + Optional.of( + JSONUtils.getString(actionConfigs, FlinkCdcOptions.PIPELINE_NAME))) .build(); } } diff --git a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncTableActionContextFactory.java b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncTableActionContextFactory.java index a71d772b4..60fbadc96 100644 --- a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncTableActionContextFactory.java +++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncTableActionContextFactory.java @@ -29,6 +29,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode; +import java.util.Optional; + /** MysqlSyncTableActionContextFactory. */ public class MysqlSyncTableActionContextFactory implements FlinkCdcActionContextFactory { @@ -50,15 +52,29 @@ public FlinkCdcSyncType cdcType() { @Override public ActionContext getActionContext(ObjectNode actionConfigs) { return MysqlSyncTableActionContext.builder() - .sessionUrl(String.valueOf(actionConfigs.get(FlinkCdcOptions.SESSION_URL))) - .flinkJobType(FlinkJobType.SESSION) + .sessionUrl( + Optional.of(String.valueOf(actionConfigs.get(FlinkCdcOptions.SESSION_URL)))) + .flinkJobType(Optional.of(FlinkJobType.SESSION)) .warehouse(JSONUtils.getString(actionConfigs, FlinkCdcOptions.WAREHOUSE)) .database(JSONUtils.getString(actionConfigs, FlinkCdcOptions.DATABASE)) .table(JSONUtils.getString(actionConfigs, FlinkCdcOptions.TABLE)) .primaryKeys(JSONUtils.getString(actionConfigs, FlinkCdcOptions.PRIMARY_KEYS)) + .typeMapping(JSONUtils.getString(actionConfigs, FlinkCdcOptions.TYPE_MAPPING)) .actionPath(ActionContextUtil.getActionJarPath()) .catalogConfList(JSONUtils.getList(actionConfigs, FlinkCdcOptions.CATALOG_CONF)) .mysqlConfList(JSONUtils.getList(actionConfigs, FlinkCdcOptions.MYSQL_CONF)) + .computedColumnList( + JSONUtils.getList(actionConfigs, FlinkCdcOptions.COMPUTED_COLUMN)) + .metaDataColumn(JSONUtils.getString(actionConfigs, FlinkCdcOptions.METADATA_COLUMN)) + .partitionKeys(JSONUtils.getString(actionConfigs, FlinkCdcOptions.PARTITION_KEYS)) + .tableConfList(JSONUtils.getList(actionConfigs, FlinkCdcOptions.TABLE_CONF)) + .executionCheckPointInterval( + Optional.of( + JSONUtils.getInteger( + actionConfigs, FlinkCdcOptions.EXE_CP_INTERVAL))) + .pipelineName( + Optional.of( + JSONUtils.getString(actionConfigs, FlinkCdcOptions.PIPELINE_NAME))) .build(); } } diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/PostgresSyncTableActionContextFactory.java b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/PostgresSyncTableActionContextFactory.java index 3a176b273..0273b56c2 100644 --- a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/PostgresSyncTableActionContextFactory.java +++ b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/PostgresSyncTableActionContextFactory.java @@ -29,6 +29,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode; +import java.util.Optional; + /** PostgresSyncTableActionContextFactory. */ public class PostgresSyncTableActionContextFactory implements FlinkCdcActionContextFactory { @Override @@ -49,17 +51,29 @@ public FlinkCdcSyncType cdcType() { @Override public ActionContext getActionContext(ObjectNode actionConfigs) { return PostgresSyncTableActionContext.builder() - .sessionUrl(String.valueOf(actionConfigs.get(FlinkCdcOptions.SESSION_URL))) - .flinkJobType(FlinkJobType.SESSION) + .sessionUrl( + Optional.of(String.valueOf(actionConfigs.get(FlinkCdcOptions.SESSION_URL)))) + .flinkJobType(Optional.of(FlinkJobType.SESSION)) .warehouse(JSONUtils.getString(actionConfigs, FlinkCdcOptions.WAREHOUSE)) .database(JSONUtils.getString(actionConfigs, FlinkCdcOptions.DATABASE)) .table(JSONUtils.getString(actionConfigs, FlinkCdcOptions.TABLE)) .partitionKeys(JSONUtils.getString(actionConfigs, FlinkCdcOptions.PARTITION_KEYS)) .primaryKeys(JSONUtils.getString(actionConfigs, FlinkCdcOptions.PRIMARY_KEYS)) - .computedColumn(JSONUtils.getString(actionConfigs, FlinkCdcOptions.COMPUTED_COLUMN)) + .typeMapping(JSONUtils.getString(actionConfigs, FlinkCdcOptions.TYPE_MAPPING)) + .computedColumnList( + JSONUtils.getList(actionConfigs, FlinkCdcOptions.COMPUTED_COLUMN)) + .metaDataColumn(JSONUtils.getString(actionConfigs, FlinkCdcOptions.METADATA_COLUMN)) .actionPath(ActionContextUtil.getActionJarPath()) 
.catalogConfList(JSONUtils.getList(actionConfigs, FlinkCdcOptions.CATALOG_CONF)) .postgresConfList(JSONUtils.getList(actionConfigs, FlinkCdcOptions.POSTGRES_CONF)) + .tableConfList(JSONUtils.getList(actionConfigs, FlinkCdcOptions.TABLE_CONF)) + .executionCheckPointInterval( + Optional.of( + JSONUtils.getInteger( + actionConfigs, FlinkCdcOptions.EXE_CP_INTERVAL))) + .pipelineName( + Optional.of( + JSONUtils.getString(actionConfigs, FlinkCdcOptions.PIPELINE_NAME))) .build(); } } diff --git a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/options/FlinkCdcOptions.java b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/options/FlinkCdcOptions.java index ce364dca4..a441e075c 100644 --- a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/options/FlinkCdcOptions.java +++ b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/options/FlinkCdcOptions.java @@ -24,6 +24,7 @@ public class FlinkCdcOptions { private FlinkCdcOptions() {} public static final String MYSQL_CONF = "mysql_conf"; + public static final String POSTGRES_CONF = "postgres_conf"; public static final String TABLE_CONF = "table_conf"; @@ -38,9 +39,17 @@ private FlinkCdcOptions() {} public static final String PRIMARY_KEYS = "primary_keys"; + public static final String TYPE_MAPPING = "type_mapping"; + public static final String COMPUTED_COLUMN = "computed_column"; + public static final String METADATA_COLUMN = "metadata_column"; + public static final String SESSION_URL = "sessionUrl"; public static final String CATALOG_CONF = "catalog_conf"; + + public static final String PIPELINE_NAME = "pipeline.name"; + + public static final String EXE_CP_INTERVAL = "execution.checkpointing.interval"; } diff --git a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/service/FlinkCdcActionService.java b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/service/FlinkCdcActionService.java index cf73f55b1..eeb8611d8 100644 --- 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/service/FlinkCdcActionService.java +++ b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/service/FlinkCdcActionService.java @@ -33,32 +33,53 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; /** An abstract Action service that executes actions through the shell. */ @Slf4j public class FlinkCdcActionService implements ActionService { - private static final ExecutorService shellExecutor = Executors.newFixedThreadPool(5); + + private static final int DEFAULT_TIMEOUT_SECCOND = 60; + private static final String JOB_ID_LOG_PREFIX = "Job has been submitted with JobID "; private List getCommand(FlinkActionContext actionContext) { List commandList = new ArrayList<>(); commandList.add("bin/flink"); commandList.add("run"); - if (actionContext.getFlinkJobType() != FlinkJobType.SESSION) { + commandList.add("-d"); + + if (actionContext.getFlinkJobType().isPresent() + && actionContext.getFlinkJobType().get() != FlinkJobType.SESSION) { throw new ActionException("Only support session job now."); } - String sessionUrl = actionContext.getSessionUrl(); - if (StringUtils.isNotBlank(sessionUrl)) { - commandList.add("-m"); - commandList.add(sessionUrl); - } + + actionContext + .getSessionUrl() + .ifPresent( + val -> { + commandList.add("-m"); + commandList.add(val); + }); + actionContext + .getPipelineName() + .ifPresent( + val -> { + commandList.add("-Dpipeline.name=" + val); + }); + + actionContext + .getExecutionCheckPointInterval() + .ifPresent( + val -> { + commandList.add("-Dexecution.checkpointing.interval=" + val); + }); + commandList.add(actionContext.getJarPath()); commandList.addAll(actionContext.getArguments()); return commandList; } - public ActionExecutionResult execute(ActionContext actionContext) throws Exception { + public 
ActionExecutionResult execute(ActionContext actionContext) { String flinkHome = getFlinkHome(); FlinkActionContext flinkActionContext; if (!(actionContext instanceof FlinkActionContext)) { @@ -69,25 +90,46 @@ public ActionExecutionResult execute(ActionContext actionContext) throws Excepti try { List command = getCommand(flinkActionContext); Process process = new ShellService(flinkHome, command).execute(); - shellExecutor.execute( - () -> { - try (InputStream inputStream = process.getInputStream(); - InputStream errorStream = process.getErrorStream(); ) { - List logLines = - IOUtils.readLines(inputStream, StandardCharsets.UTF_8); - for (String logLine : logLines) { - log.info(logLine); - } - List errorLines = - IOUtils.readLines(errorStream, StandardCharsets.UTF_8); - for (String logLine : errorLines) { - log.error(logLine); - } - } catch (Exception e) { - log.error(e.getMessage(), e); - } - }); - result = ActionExecutionResult.success(); + boolean hasExited = process.waitFor(DEFAULT_TIMEOUT_SECCOND, TimeUnit.SECONDS); + if (hasExited) { + int value = process.exitValue(); + try (InputStream inputStream = process.getInputStream(); + InputStream errorStream = process.getErrorStream()) { + List lines = IOUtils.readLines(inputStream, StandardCharsets.UTF_8); + if (value == 0) { + String infoLines = String.join("\n", lines); + log.info( + "run shell command [{}] get info log\n{}", + String.join(" ", command), + infoLines); + String flinkJobId = + lines.stream() + .filter(item -> item.startsWith(JOB_ID_LOG_PREFIX)) + .map(item -> item.replaceAll(JOB_ID_LOG_PREFIX, "")) + .findFirst() + .orElse(null); + result = ActionExecutionResult.success(flinkJobId); + } else { + lines.addAll(IOUtils.readLines(errorStream, StandardCharsets.UTF_8)); + String errLines = String.join("\n", lines); + log.info( + "run shell command [{}] get error log\n{}", + String.join(" ", command), + errLines); + result = ActionExecutionResult.fail(errLines); + } + } catch (Exception exception) { + 
log.error(exception.getMessage(), exception); + result = ActionExecutionResult.fail(exception.getMessage()); + } + } else { + process.destroyForcibly(); + result = + ActionExecutionResult.fail( + String.format( + "run shell command timeout after %s second", + DEFAULT_TIMEOUT_SECCOND)); + } } catch (Exception exception) { log.error(exception.getMessage(), exception); result = ActionExecutionResult.fail(exception.getMessage()); diff --git a/paimon-web-api/src/main/java/org/apache/paimon/web/api/enums/SyncMode.java b/paimon-web-api/src/main/java/org/apache/paimon/web/api/enums/SyncMode.java new file mode 100644 index 000000000..8de326649 --- /dev/null +++ b/paimon-web-api/src/main/java/org/apache/paimon/web/api/enums/SyncMode.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.api.enums; + +import lombok.Getter; + +import java.util.Arrays; +import java.util.Objects; + +/** StartupMode. 
*/ +@Getter +public enum SyncMode { + INCREMENTAL_SYNC(0), + FULL_SYNC(1), + TS_SYNC(2); + + private final int mode; + + SyncMode(Integer mode) { + this.mode = mode; + } + + public static SyncMode valueOf(Integer mode) { + return Arrays.stream(values()) + .filter(x -> Objects.equals(x.getMode(), mode)) + .findFirst() + .orElse(null); + } +} diff --git a/paimon-web-api/src/main/java/org/apache/paimon/web/api/shell/ShellService.java b/paimon-web-api/src/main/java/org/apache/paimon/web/api/shell/ShellService.java index 8aa08a016..793150384 100644 --- a/paimon-web-api/src/main/java/org/apache/paimon/web/api/shell/ShellService.java +++ b/paimon-web-api/src/main/java/org/apache/paimon/web/api/shell/ShellService.java @@ -39,9 +39,9 @@ public ShellService(String workingDirectory, List executeCommand) { public Process execute() throws IOException { ProcessBuilder processBuilder = new ProcessBuilder(); processBuilder.directory(new File(workingDirectory)); - processBuilder.redirectErrorStream(true); + processBuilder.redirectErrorStream(false); processBuilder.command(executeCommand); - log.info("Executing shell command : {}", String.join(" ", executeCommand)); + log.info("run shell command : {}", String.join(" ", executeCommand)); return processBuilder.start(); } } diff --git a/paimon-web-common/src/main/java/org/apache/paimon/web/common/util/JSONUtils.java b/paimon-web-common/src/main/java/org/apache/paimon/web/common/util/JSONUtils.java index cb3240d04..203981534 100644 --- a/paimon-web-common/src/main/java/org/apache/paimon/web/common/util/JSONUtils.java +++ b/paimon-web-common/src/main/java/org/apache/paimon/web/common/util/JSONUtils.java @@ -131,19 +131,17 @@ public static List getList(JsonNode jsonNode, String fieldName, Class if (!(child instanceof ArrayNode) && !(child instanceof POJONode)) { return new ArrayList<>(); } - List childArray; + List result = new ArrayList<>(); if (child instanceof POJONode) { Object pojo = ((POJONode) child).getPojo(); - childArray = 
(List) pojo; + List childArray = (List) pojo; + childArray.forEach( + e -> result.add(JSONUtils.parseObject(JSONUtils.toJsonString(e), clazz))); } else { - childArray = (List) child; + ArrayNode arrayNode = (ArrayNode) jsonNode.get(fieldName); + arrayNode.forEach( + e -> result.add(JSONUtils.parseObject(JSONUtils.toJsonString(e), clazz))); } - - List result = new ArrayList<>(); - childArray.forEach( - e -> { - result.add(JSONUtils.parseObject(JSONUtils.toJsonString(e), clazz)); - }); return result; } diff --git a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/main/java/org/apache/paimon/web/engine/flink/common/status/JobStatus.java b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/main/java/org/apache/paimon/web/engine/flink/common/status/JobStatus.java index aabaab1eb..c14879fc4 100644 --- a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/main/java/org/apache/paimon/web/engine/flink/common/status/JobStatus.java +++ b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/main/java/org/apache/paimon/web/engine/flink/common/status/JobStatus.java @@ -20,11 +20,16 @@ /** Represents the various states that a job. 
*/ public enum JobStatus { + RESTARTING("RESTARTING"), + FRESHED("FRESHED"), + CANCELLING("CANCELLING"), + SUBMITTING("SUBMITTING"), CREATED("CREATED"), RUNNING("RUNNING"), FINISHED("FINISHED"), FAILED("FAILED"), - CANCELED("CANCELED"); + CANCELED("CANCELED"), + UNKNOWN("UNKNOWN"); private final String value; diff --git a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/client/FlinkJobAction.java b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/client/FlinkJobAction.java new file mode 100644 index 000000000..f96213320 --- /dev/null +++ b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/client/FlinkJobAction.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.engine.flink.sql.gateway.client; + +import org.apache.paimon.web.engine.flink.sql.gateway.model.JobOverviewEntity; +import org.apache.paimon.web.engine.flink.sql.gateway.model.TriggerIdEntity; + +/** Using to execute flink job action. 
*/ +public interface FlinkJobAction { + + /** + * Execute cluster action to get job overview. + * + * @param jobId the flink jobId + * @return flink job entity. + */ + JobOverviewEntity jobOverview(String jobId); + + /** + * Stops a job with a savepoint. + * + * @param jobId the flink jobId + * @return return a 'triggerId' for further query identifier. + */ + TriggerIdEntity stopWithSavePoint(String jobId); +} diff --git a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/client/JobIdMessageParameters.java b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/client/JobIdMessageParameters.java new file mode 100644 index 000000000..3340f9593 --- /dev/null +++ b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/client/JobIdMessageParameters.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.web.engine.flink.sql.gateway.client; + +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.MessagePathParameter; +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; + +import java.util.Collection; +import java.util.Collections; + +/** object using for jobId param. */ +public class JobIdMessageParameters extends MessageParameters { + + private final JobIdMessagePathParameter jobIdMessagePathParameter = + new JobIdMessagePathParameter(); + + public JobIdMessageParameters(String jobId) { + jobIdMessagePathParameter.resolve(jobId); + } + + @Override + public Collection> getPathParameters() { + return Collections.singletonList(this.jobIdMessagePathParameter); + } + + @Override + public Collection> getQueryParameters() { + return Collections.emptyList(); + } +} diff --git a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/client/JobIdMessagePathParameter.java b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/client/JobIdMessagePathParameter.java new file mode 100644 index 000000000..e719a9511 --- /dev/null +++ b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/client/JobIdMessagePathParameter.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.engine.flink.sql.gateway.client; + +import org.apache.flink.runtime.rest.messages.ConversionException; +import org.apache.flink.runtime.rest.messages.MessagePathParameter; + +/** use to resolve jobId param. */ +public class JobIdMessagePathParameter extends MessagePathParameter { + + public static final String KEY = "jobid"; + + protected JobIdMessagePathParameter() { + super(KEY); + } + + @Override + protected String convertFromString(String jobid) throws ConversionException { + return jobid; + } + + @Override + protected String convertToString(String jobid) { + return jobid; + } + + @Override + public String getDescription() { + return "the id of flink job"; + } +} diff --git a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/client/JobOverViewHeaders.java b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/client/JobOverViewHeaders.java new file mode 100644 index 000000000..db777ca24 --- /dev/null +++ b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/client/JobOverViewHeaders.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.engine.flink.sql.gateway.client; + +import org.apache.paimon.web.engine.flink.sql.gateway.model.JobOverviewEntity; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** job overview headers. 
*/ +public class JobOverViewHeaders + implements RuntimeMessageHeaders< + EmptyRequestBody, JobOverviewEntity, JobIdMessageParameters> { + + private final String jobId; + public static final String URL = "/jobs/:" + JobIdMessagePathParameter.KEY; + + public JobOverViewHeaders(String jobId) { + this.jobId = jobId; + } + + @Override + public Class getResponseClass() { + return JobOverviewEntity.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public String getDescription() { + return "Returns details of a job."; + } + + @Override + public Class getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public JobIdMessageParameters getUnresolvedMessageParameters() { + return new JobIdMessageParameters(jobId); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } +} diff --git a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/client/SessionClusterClient.java b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/client/SessionClusterClient.java index 81b3c515f..b595dddca 100644 --- a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/client/SessionClusterClient.java +++ b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/client/SessionClusterClient.java @@ -20,6 +20,8 @@ import org.apache.paimon.web.engine.flink.common.status.HeartbeatStatus; import org.apache.paimon.web.engine.flink.sql.gateway.model.HeartbeatEntity; +import org.apache.paimon.web.engine.flink.sql.gateway.model.JobOverviewEntity; +import 
org.apache.paimon.web.engine.flink.sql.gateway.model.TriggerIdEntity; import org.apache.paimon.web.engine.flink.sql.gateway.utils.SqlGateWayRestClient; import lombok.extern.slf4j.Slf4j; @@ -35,7 +37,7 @@ * the cluster status. etc. The flink client implementation of the {@link HeartbeatAction}. */ @Slf4j -public class SessionClusterClient implements HeartbeatAction { +public class SessionClusterClient implements HeartbeatAction, FlinkJobAction { private final SqlGateWayRestClient restClient; @@ -61,13 +63,51 @@ public HeartbeatEntity checkClusterHeartbeat() { .clusterVersion(heartbeat.getVersion()) .build(); } + } catch (Exception ex) { + // log.error( + // "An exception occurred while obtaining the cluster status :{}", + // ex.getMessage(), + // ex); + return this.buildResulHeartbeatEntity(HeartbeatStatus.UNREACHABLE); + } + return this.buildResulHeartbeatEntity(HeartbeatStatus.UNKNOWN); + } + + @Override + public JobOverviewEntity jobOverview(String jobId) { + try { + return restClient + .sendRequest( + new JobOverViewHeaders(jobId), + new JobIdMessageParameters(jobId), + EmptyRequestBody.getInstance()) + .get(); } catch (Exception ex) { log.error( - "An exception occurred while obtaining the cluster status :{}", + "An exception occurred while request job of {} overview: {}", + jobId, ex.getMessage(), ex); - return this.buildResulHeartbeatEntity(HeartbeatStatus.UNREACHABLE); } - return this.buildResulHeartbeatEntity(HeartbeatStatus.UNKNOWN); + return null; + } + + @Override + public TriggerIdEntity stopWithSavePoint(String jobId) { + try { + return restClient + .sendRequest( + new StopWithSavePointHeaders(jobId), + new JobIdMessageParameters(jobId), + EmptyRequestBody.getInstance()) + .get(); + } catch (Exception ex) { + log.error( + "An exception occurred while stop job of {} with savePoint: {}", + jobId, + ex.getMessage(), + ex); + } + return null; } } diff --git 
a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/client/SqlGatewayClient.java b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/client/SqlGatewayClient.java index 79835fcbc..3d9df7c08 100644 --- a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/client/SqlGatewayClient.java +++ b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/client/SqlGatewayClient.java @@ -268,10 +268,10 @@ public HeartbeatEntity checkClusterHeartbeat() { .build(); } } catch (Exception exec) { - log.error( - "An exception occurred while obtaining the cluster status :{}", - exec.getMessage(), - exec); + // log.error( + // "An exception occurred while obtaining the cluster status :{}", + // exec.getMessage(), + // exec); return this.buildResulHeartbeatEntity(HeartbeatStatus.UNREACHABLE); } return this.buildResulHeartbeatEntity(HeartbeatStatus.UNKNOWN); diff --git a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/client/StopWithSavePointHeaders.java b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/client/StopWithSavePointHeaders.java new file mode 100644 index 000000000..c715bfe86 --- /dev/null +++ b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/client/StopWithSavePointHeaders.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.engine.flink.sql.gateway.client; + +import org.apache.paimon.web.engine.flink.sql.gateway.model.TriggerIdEntity; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** headers using for stopping with savepoint . */ +public class StopWithSavePointHeaders + implements RuntimeMessageHeaders< + EmptyRequestBody, TriggerIdEntity, JobIdMessageParameters> { + + private final String jobId; + public static final String URL = "/jobs/:" + JobIdMessagePathParameter.KEY + "/stop"; + + public StopWithSavePointHeaders(String jobId) { + this.jobId = jobId; + } + + @Override + public Class getResponseClass() { + return TriggerIdEntity.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public String getDescription() { + return "Stops a job with a savepoint. Optionally, it can also emit a MAX_WATERMARK before taking the savepoint" + + " to flush out any state waiting for timers to fire. 
This async operation would return a 'triggerid' " + + "for further query identifier."; + } + + @Override + public Class getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public JobIdMessageParameters getUnresolvedMessageParameters() { + return new JobIdMessageParameters(jobId); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.POST; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } +} diff --git a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/executor/FlinkSqlGatewayExecutor.java b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/executor/FlinkSqlGatewayExecutor.java index ab58e471c..fcffc15d0 100644 --- a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/executor/FlinkSqlGatewayExecutor.java +++ b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/executor/FlinkSqlGatewayExecutor.java @@ -23,6 +23,7 @@ import org.apache.paimon.web.engine.flink.common.parser.CustomSqlParser; import org.apache.paimon.web.engine.flink.common.result.ExecutionResult; import org.apache.paimon.web.engine.flink.common.result.FetchResultParams; +import org.apache.paimon.web.engine.flink.common.status.JobStatus; import org.apache.paimon.web.engine.flink.sql.gateway.client.SqlGatewayClient; import org.apache.paimon.web.engine.flink.sql.gateway.model.SessionEntity; import org.apache.paimon.web.engine.flink.sql.gateway.utils.CollectResultUtil; @@ -30,6 +31,7 @@ import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; +import org.apache.flink.api.common.JobID; import org.apache.flink.table.gateway.api.results.ResultSet; 
import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody; @@ -102,6 +104,14 @@ public ExecutionResult executeSql(String multiStatement, int maxRows) throws Exc executionResult = executeDmlStatement(combinedStatement); } + if (executionResult == null) { + executionResult = + new ExecutionResult.Builder() + .shouldFetchResult(false) + .status(JobStatus.FINISHED.getValue()) + .build(); + } + return executionResult; } @@ -113,8 +123,12 @@ private ExecutionResult executeDqlStatement( ExecutionResult.Builder builder = CollectResultUtil.collectSqlGatewayResult(results.getResults()) .submitId(operationId); - if (operationType.getType().equals(FlinkSqlOperationType.SELECT.getType())) { - builder.jobId(getJobIdFromResults(results)).shouldFetchResult(true); + JobID jobId = results.getJobID(); + if (jobId != null + && operationType.getType().equals(FlinkSqlOperationType.SELECT.getType())) { + builder.jobId(jobId.toString()).shouldFetchResult(true); + } else if (jobId == null) { + builder.shouldFetchResult(false).status(JobStatus.FINISHED.getValue()); } return builder.build(); } diff --git a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/model/JobOverviewEntity.java b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/model/JobOverviewEntity.java new file mode 100644 index 000000000..f24b26d7f --- /dev/null +++ b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/model/JobOverviewEntity.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.engine.flink.sql.gateway.model; + +import lombok.Getter; +import org.apache.flink.runtime.messages.webmonitor.InfoMessage; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Map; + +/** entity using for job overview . 
*/ +@Getter +@JsonIgnoreProperties(ignoreUnknown = true) +public class JobOverviewEntity implements ResponseBody, InfoMessage { + + public static final String FIELD_NAME_JID = "jid"; + public static final String FIELD_NAME_NAME = "name"; + public static final String FIELD_NAME_IS_STOP_ABLE = "isStoppable"; + public static final String FIELD_NAME_STATE = "state"; + public static final String FIELD_NAME_START_TIME = "start-time"; + public static final String FIELD_NAME_END_TIME = "end-time"; + public static final String FIELD_NAME_DURATION = "duration"; + public static final String FIELD_NAME_MAX_PARALLELISM = "maxParallelism"; + public static final String FIELD_NAME_NOW = "now"; + private static final String FIELD_NAME_TIMESTAMP = "timestamp"; + private static final String FIELD_NAME_VERTICES = "vertices"; + private static final String FIELD_NAME_STATUS_COUNTS = "status-counts"; + private static final String FIELD_NAME_PLAN = "plan"; + + @JsonProperty(FIELD_NAME_JID) + private String jid; + + @JsonProperty(FIELD_NAME_NAME) + private String name; + + @JsonProperty(FIELD_NAME_IS_STOP_ABLE) + private Boolean isStoppable; + + @JsonProperty(FIELD_NAME_STATE) + private String state; + + @JsonProperty(FIELD_NAME_START_TIME) + private Long startTime; + + @JsonProperty(FIELD_NAME_END_TIME) + private Long endTime; + + @JsonProperty(FIELD_NAME_DURATION) + private Long duration; + + @JsonProperty(FIELD_NAME_MAX_PARALLELISM) + private Integer maxParallelism; + + @JsonProperty(FIELD_NAME_NOW) + private Long now; + + @JsonProperty(FIELD_NAME_TIMESTAMP) + private Map timestamps; + + @JsonProperty(FIELD_NAME_VERTICES) + private List vertices; + + @JsonProperty(FIELD_NAME_STATUS_COUNTS) + private Map statusCounts; + + @JsonProperty(FIELD_NAME_PLAN) + private Object plan; + + @JsonCreator + public JobOverviewEntity( + @JsonProperty(FIELD_NAME_JID) String jid, + @JsonProperty(FIELD_NAME_NAME) String name, + @JsonProperty(FIELD_NAME_IS_STOP_ABLE) Boolean isStoppable, + 
@JsonProperty(FIELD_NAME_STATE) String state, + @JsonProperty(FIELD_NAME_START_TIME) Long startTime, + @JsonProperty(FIELD_NAME_END_TIME) Long endTime, + @JsonProperty(FIELD_NAME_DURATION) Long duration, + @JsonProperty(FIELD_NAME_MAX_PARALLELISM) Integer maxParallelism, + @JsonProperty(FIELD_NAME_NOW) Long now, + @JsonProperty(FIELD_NAME_TIMESTAMP) Map timestamps, + @JsonProperty(FIELD_NAME_VERTICES) List vertices, + @JsonProperty(FIELD_NAME_STATUS_COUNTS) Map statusCounts, + @JsonProperty(FIELD_NAME_PLAN) Object plan) { + + this.jid = jid; + this.name = name; + this.isStoppable = isStoppable; + this.state = state; + this.startTime = startTime; + this.endTime = endTime; + this.duration = duration; + this.maxParallelism = maxParallelism; + this.now = now; + this.timestamps = timestamps; + this.vertices = vertices; + this.statusCounts = statusCounts; + this.plan = plan; + } +} diff --git a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/model/TriggerIdEntity.java b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/model/TriggerIdEntity.java new file mode 100644 index 000000000..b472e91e5 --- /dev/null +++ b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/model/TriggerIdEntity.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.engine.flink.sql.gateway.model; + +import lombok.Getter; +import org.apache.flink.runtime.messages.webmonitor.InfoMessage; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +/** entity using for trigger id . */ +@Getter +public class TriggerIdEntity implements ResponseBody, InfoMessage { + + public static final String FIELD_NAME_TRIGGER_ID = "request-id"; + + @JsonProperty(FIELD_NAME_TRIGGER_ID) + private String triggerId; + + @JsonCreator + public TriggerIdEntity(@JsonProperty(FIELD_NAME_TRIGGER_ID) String triggerId) { + this.triggerId = triggerId; + } +} diff --git a/paimon-web-gateway/src/main/java/org/apache/paimon/web/gateway/enums/DeploymentMode.java b/paimon-web-gateway/src/main/java/org/apache/paimon/web/gateway/enums/DeploymentMode.java index a70961deb..b982b99f0 100644 --- a/paimon-web-gateway/src/main/java/org/apache/paimon/web/gateway/enums/DeploymentMode.java +++ b/paimon-web-gateway/src/main/java/org/apache/paimon/web/gateway/enums/DeploymentMode.java @@ -23,6 +23,7 @@ * supported. */ public enum DeploymentMode { + K8S_SESSION("k8s-session"), YARN_SESSION("yarn-session"), FLINK_SQL_GATEWAY("flink-sql-gateway"); diff --git a/paimon-web-server/pom.xml b/paimon-web-server/pom.xml index 9bd9c4333..be8cf9e8a 100644 --- a/paimon-web-server/pom.xml +++ b/paimon-web-server/pom.xml @@ -248,6 +248,12 @@ under the License. 
+ + org.apache.flink + flink-shaded-guava + 31.1-jre-17.0 + + commons-cli commons-cli diff --git a/paimon-web-server/src/main/bin/start.sh b/paimon-web-server/src/main/bin/start.sh index 462922c5c..d63de5528 100644 --- a/paimon-web-server/src/main/bin/start.sh +++ b/paimon-web-server/src/main/bin/start.sh @@ -64,12 +64,18 @@ fi if [ -z "$JAVA_HOME" ]; then echo "JAVA_HOME is null, exit..." exit 1 +else + export JAVA_HOME=$JAVA_HOME fi if [ -z "$FLINK_HOME" ]; then echo "FLINK_HOME is null, CDC cannot be used normally!" +else + export FLINK_HOME=$FLINK_HOME fi if [ -z "$ACTION_JAR_PATH" ]; then echo "ACTION_JAR_PATH is null, CDC cannot be used normally!" +else + export ACTION_JAR_PATH=$ACTION_JAR_PATH fi @@ -83,4 +89,4 @@ else $JAVA_HOME/bin/java $JAVA_OPTS \ -cp "$PAIMON_UI_HOME/config:$PAIMON_UI_HOME/libs/*" \ org.apache.paimon.web.server.PaimonWebServerApplication -fi +fi \ No newline at end of file diff --git a/paimon-web-server/src/main/bin/stop.sh b/paimon-web-server/src/main/bin/stop.sh new file mode 100644 index 000000000..a955a02af --- /dev/null +++ b/paimon-web-server/src/main/bin/stop.sh @@ -0,0 +1,34 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +pid=$(ps ax | grep -i 'org.apache.paimon.web.server.PaimonWebServerApplication' | grep java | awk '{print $1}') + +if [ -z "$pid" ]; then + echo "No PaimonWebServerApplication to stop." + exit 1 +else + kill -s TERM $pid + sleep 3 + + if ps -p $pid > /dev/null; then + echo "Failed to stop PaimonWebServerApplication with PID $pid." + exit 1 + fi + + echo "PaimonWebServerApplication Stopped." +fi \ No newline at end of file diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/context/logtool/LogReadPool.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/context/logtool/LogReadPool.java index ca855d1d1..a6506e01f 100644 --- a/paimon-web-server/src/main/java/org/apache/paimon/web/server/context/logtool/LogReadPool.java +++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/context/logtool/LogReadPool.java @@ -36,12 +36,10 @@ public LRUCache getLRUCache() { } public static void write(String str, String userId) { - consoleEntityCache - .computeIfAbsent(userId, k -> new StringBuilder("Console:\n")) - .append(str); + consoleEntityCache.computeIfAbsent(userId, k -> new StringBuilder()).append(str); } public static void clear(String userId) { - consoleEntityCache.put(userId, new StringBuilder("Console:\n")); + consoleEntityCache.put(userId, new StringBuilder()); } } diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/controller/CdcJobDefinitionController.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/controller/CdcJobDefinitionController.java index d7d512c7d..e78dad7d1 100644 --- a/paimon-web-server/src/main/java/org/apache/paimon/web/server/controller/CdcJobDefinitionController.java +++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/controller/CdcJobDefinitionController.java @@ -18,11 +18,14 @@ package org.apache.paimon.web.server.controller; +import org.apache.paimon.web.engine.flink.common.status.JobStatus; import 
org.apache.paimon.web.server.data.dto.CdcJobDefinitionDTO; import org.apache.paimon.web.server.data.dto.CdcJobSubmitDTO; import org.apache.paimon.web.server.data.model.CdcJobDefinition; import org.apache.paimon.web.server.data.result.PageR; import org.apache.paimon.web.server.data.result.R; +import org.apache.paimon.web.server.data.vo.ActionExecutionResultVo; +import org.apache.paimon.web.server.data.vo.CdcJobDefinitionVO; import org.apache.paimon.web.server.service.CdcJobDefinitionService; import cn.dev33.satoken.annotation.SaCheckPermission; @@ -67,7 +70,7 @@ public R updateCdcJob(@Valid @RequestBody CdcJobDefinitionDTO cdcJobDefini @SaCheckPermission("cdc:job:list") @GetMapping("list") - public PageR listAllCdcJob( + public PageR listAllCdcJob( @RequestParam(required = false) boolean withConfig, @RequestParam(required = false) String jobName, @RequestParam long currentPage, @@ -88,13 +91,31 @@ public R getById(@PathVariable Integer id) { @SaCheckPermission("cdc:job:delete") @DeleteMapping("{id}") public R deleteById(@PathVariable Integer id) { - cdcJobDefinitionService.removeById(id); - return R.succeed(); + return cdcJobDefinitionService.deleteById(id); + } + + @SaCheckPermission("cdc:job:copy") + @PutMapping("{id}/copy") + public R copy(@PathVariable Integer id) { + return cdcJobDefinitionService.copy(id); } @SaCheckPermission("cdc:job:submit") @PostMapping("{id}/submit") - public R submit(@PathVariable Integer id, @RequestBody CdcJobSubmitDTO cdcJobSubmitDTO) { + public R submit( + @PathVariable Integer id, @RequestBody CdcJobSubmitDTO cdcJobSubmitDTO) { return cdcJobDefinitionService.submit(id, cdcJobSubmitDTO); } + + @SaCheckPermission("cdc:job:cancel") + @PostMapping("{id}/cancel") + public R cancel(@PathVariable Integer id) { + return cdcJobDefinitionService.cancel(id); + } + + @SaCheckPermission("cdc:job:status") + @GetMapping("{id}/status") + public R status(@PathVariable Integer id, @RequestParam Integer logId) { + return 
cdcJobDefinitionService.status(id, logId); + } } diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/controller/ClusterController.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/controller/ClusterController.java index 762f78f76..14bfbbf5f 100644 --- a/paimon-web-server/src/main/java/org/apache/paimon/web/server/controller/ClusterController.java +++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/controller/ClusterController.java @@ -65,7 +65,7 @@ public R getCluster(@PathVariable("id") Integer id) { @GetMapping("/list") public PageR listClusters(ClusterInfo clusterInfo) { IPage page = PageSupport.startPage(); - List clusterInfos = clusterService.listUsers(page, clusterInfo); + List clusterInfos = clusterService.listClusters(page, clusterInfo); return PageR.builder() .success(true) .total(page.getTotal()) diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/CdcJobDefinitionDTO.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/CdcJobDefinitionDTO.java index 5cca00ad3..ff9c46e55 100644 --- a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/CdcJobDefinitionDTO.java +++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/CdcJobDefinitionDTO.java @@ -38,4 +38,6 @@ public class CdcJobDefinitionDTO { private String config; private String createUser; + + private int dataDelay; } diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/CdcJobLogDTO.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/CdcJobLogDTO.java new file mode 100644 index 000000000..df3a69399 --- /dev/null +++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/CdcJobLogDTO.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.server.data.dto; + +import lombok.Data; + +import java.util.Map; + +/** DTO of cdcJobLog . */ +@Data +public class CdcJobLogDTO { + + private Integer id; + + private Long clusterId; + + private Long cdcJobDefinitionId; + + private String createUser; + + private String currentStatus; + + private String flinkJobId; + + private Map extra; +} diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/CdcJobSubmitDTO.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/CdcJobSubmitDTO.java index 1c4492c9f..ab5aea05c 100644 --- a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/CdcJobSubmitDTO.java +++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/CdcJobSubmitDTO.java @@ -25,4 +25,7 @@ public class CdcJobSubmitDTO { private String clusterId; + private Long startupTimestamp; + private Integer startupMode; + private String createUser; } diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/model/CdcJobDefinition.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/model/CdcJobDefinition.java index b90092de3..4dc1e2ff6 100644 --- 
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/model/CdcJobDefinition.java +++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/model/CdcJobDefinition.java @@ -47,5 +47,7 @@ public class CdcJobDefinition extends BaseModel implements Serializable { private String createUser; + private Integer dataDelay; + @TableLogic private boolean isDelete; } diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/model/CdcJobLog.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/model/CdcJobLog.java new file mode 100644 index 000000000..414007e6a --- /dev/null +++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/model/CdcJobLog.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.web.server.data.model; + +import com.baomidou.mybatisplus.annotation.FieldFill; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; + +import java.io.Serializable; +import java.time.LocalDateTime; + +/** Model of cdc_job_log. */ +@TableName(value = "cdc_job_log") +@Data +@NoArgsConstructor +@AllArgsConstructor +@EqualsAndHashCode(callSuper = true) +@Builder +public class CdcJobLog extends BaseModel implements Serializable { + + private Integer clusterId; + + private Integer cdcJobDefinitionId; + + private String createUser; + + @TableField(fill = FieldFill.INSERT) + private LocalDateTime terminateTime; + + private String currentStatus; + + private String flinkJobId; +} diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/result/enums/Status.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/result/enums/Status.java index 50e4b4a89..863368664 100644 --- a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/result/enums/Status.java +++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/result/enums/Status.java @@ -80,6 +80,9 @@ public enum Status { /** ------------cdc-----------------. */ CDC_JOB_EXIST_ERROR(10601, "cdc.job.exist.error"), CDC_JOB_NO_EXIST_ERROR(10602, "cdc.job.not.exist.error"), + CDC_JOB_FLINK_JOB_ID_NOT_EXISTS(10603, "cdc.job.flink.job.id.not.exists"), + CDC_JOB_FLINK_JOB_LOCKED(10604, "cdc.job.flink.job.locked"), + CDC_JOB_CAN_NOT_DELETE(10605, "cdc.job.can.not.deleted"), /** ------------cluster-----------------. 
*/ CLUSTER_NOT_EXIST(10701, "cluster.not.exist"), diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/vo/ActionExecutionResultVo.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/vo/ActionExecutionResultVo.java new file mode 100644 index 000000000..6605aad48 --- /dev/null +++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/vo/ActionExecutionResultVo.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.server.data.vo; + +import org.apache.paimon.web.engine.flink.common.status.JobStatus; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** vo for action execution result . 
*/ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class ActionExecutionResultVo { + + private Integer logId; + + private boolean success; + + private String errorMsg; + + private JobStatus jobStatus; +} diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/vo/CdcJobDefinitionVO.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/vo/CdcJobDefinitionVO.java new file mode 100644 index 000000000..0ce535bb5 --- /dev/null +++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/vo/CdcJobDefinitionVO.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.server.data.vo; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; + +/** VO of database. 
*/ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class CdcJobDefinitionVO { + + private Integer id; + + private String name; + + private String description; + + private Integer cdcType; + + private String config; + + private String createUser; + + private Integer dataDelay; + + private boolean isDelete; + + private String currentStatus; + + private LocalDateTime createTime; + + private LocalDateTime updateTime; +} diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/mapper/CdcJobLogMapper.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/mapper/CdcJobLogMapper.java new file mode 100644 index 000000000..742edb3e5 --- /dev/null +++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/mapper/CdcJobLogMapper.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.server.mapper; + +import org.apache.paimon.web.server.data.model.CdcJobLog; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +/** Interface for CDC job log mapper. 
*/ +public interface CdcJobLogMapper extends BaseMapper {} diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/CdcJobDefinitionService.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/CdcJobDefinitionService.java index cdbde64fc..b2c90196f 100644 --- a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/CdcJobDefinitionService.java +++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/CdcJobDefinitionService.java @@ -18,11 +18,14 @@ package org.apache.paimon.web.server.service; +import org.apache.paimon.web.engine.flink.common.status.JobStatus; import org.apache.paimon.web.server.data.dto.CdcJobDefinitionDTO; import org.apache.paimon.web.server.data.dto.CdcJobSubmitDTO; import org.apache.paimon.web.server.data.model.CdcJobDefinition; import org.apache.paimon.web.server.data.result.PageR; import org.apache.paimon.web.server.data.result.R; +import org.apache.paimon.web.server.data.vo.ActionExecutionResultVo; +import org.apache.paimon.web.server.data.vo.CdcJobDefinitionVO; import com.baomidou.mybatisplus.extension.service.IService; @@ -31,10 +34,18 @@ public interface CdcJobDefinitionService extends IService { R create(CdcJobDefinitionDTO cdcJobDefinitionDTO); - PageR listAll( + PageR listAll( String name, boolean withConfig, long currentPage, long pageSize); R update(CdcJobDefinitionDTO cdcJobDefinitionDTO); - R submit(Integer id, CdcJobSubmitDTO cdcJobSubmitDTO); + R submit(Integer id, CdcJobSubmitDTO cdcJobSubmitDTO); + + R cancel(Integer id); + + R status(Integer id, Integer logId); + + R copy(Integer id); + + R deleteById(Integer id); } diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/CdcJobLogService.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/CdcJobLogService.java new file mode 100644 index 000000000..7f971dc27 --- /dev/null +++ 
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/CdcJobLogService.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.server.service; + +import org.apache.paimon.web.server.data.model.CdcJobLog; + +import com.baomidou.mybatisplus.extension.service.IService; + +/** Cdc Job Log Service. */ +public interface CdcJobLogService extends IService { + + CdcJobLog findLast(Integer cdcJobDefinitionId); +} diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/ClusterService.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/ClusterService.java index f50d1164e..bfe044909 100644 --- a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/ClusterService.java +++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/ClusterService.java @@ -29,7 +29,7 @@ /** Cluster Service. 
*/ public interface ClusterService extends IService { - List listUsers(IPage page, @Param("cluster") ClusterInfo cluster); + List listClusters(IPage page, @Param("cluster") ClusterInfo cluster); boolean checkClusterNameUnique(ClusterInfo cluster); diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java index 28844b91f..e2452a98a 100644 --- a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java +++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java @@ -19,6 +19,7 @@ package org.apache.paimon.web.server.service.impl; import org.apache.paimon.web.api.action.context.ActionContext; +import org.apache.paimon.web.api.action.context.ActionExecutionResult; import org.apache.paimon.web.api.action.context.factory.ActionContextFactoryServiceLoadUtil; import org.apache.paimon.web.api.action.context.factory.FlinkCdcActionContextFactory; import org.apache.paimon.web.api.action.context.options.FlinkCdcOptions; @@ -27,21 +28,30 @@ import org.apache.paimon.web.api.catalog.PaimonServiceFactory; import org.apache.paimon.web.api.enums.FlinkCdcDataSourceType; import org.apache.paimon.web.api.enums.FlinkCdcSyncType; +import org.apache.paimon.web.api.enums.SyncMode; import org.apache.paimon.web.common.util.JSONUtils; +import org.apache.paimon.web.engine.flink.common.status.JobStatus; +import org.apache.paimon.web.engine.flink.sql.gateway.client.SessionClusterClient; +import org.apache.paimon.web.engine.flink.sql.gateway.model.JobOverviewEntity; +import org.apache.paimon.web.engine.flink.sql.gateway.model.TriggerIdEntity; import org.apache.paimon.web.server.data.dto.CdcJobDefinitionDTO; import org.apache.paimon.web.server.data.dto.CdcJobSubmitDTO; import org.apache.paimon.web.server.data.model.CatalogInfo; 
import org.apache.paimon.web.server.data.model.CdcJobDefinition; +import org.apache.paimon.web.server.data.model.CdcJobLog; import org.apache.paimon.web.server.data.model.ClusterInfo; import org.apache.paimon.web.server.data.model.cdc.CdcGraph; import org.apache.paimon.web.server.data.model.cdc.CdcNode; import org.apache.paimon.web.server.data.result.PageR; import org.apache.paimon.web.server.data.result.R; import org.apache.paimon.web.server.data.result.enums.Status; +import org.apache.paimon.web.server.data.vo.ActionExecutionResultVo; +import org.apache.paimon.web.server.data.vo.CdcJobDefinitionVO; import org.apache.paimon.web.server.data.vo.UserInfoVO; import org.apache.paimon.web.server.mapper.CdcJobDefinitionMapper; import org.apache.paimon.web.server.service.CatalogService; import org.apache.paimon.web.server.service.CdcJobDefinitionService; +import org.apache.paimon.web.server.service.CdcJobLogService; import org.apache.paimon.web.server.service.ClusterService; import org.apache.paimon.web.server.util.StringUtils; @@ -51,25 +61,53 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Preconditions; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader; +import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import 
java.util.stream.Collectors; /** CdcJobDefinitionServiceImpl. */ @Service +@Slf4j public class CdcJobDefinitionServiceImpl extends ServiceImpl implements CdcJobDefinitionService { @Autowired private CatalogService catalogService; @Autowired private ClusterService clusterService; + @Autowired private CdcJobLogService cdcJobLogService; + + final LoadingCache cacher = + CacheBuilder.newBuilder() + .expireAfterAccess(1, TimeUnit.MINUTES) + .build( + new CacheLoader() { + @Override + public ReentrantLock load(Integer id) { + return new ReentrantLock(); + } + }); @Override public R create(CdcJobDefinitionDTO cdcJobDefinitionDTO) { + // to valid config + CdcGraph.fromCdcGraphJsonString(cdcJobDefinitionDTO.getConfig()); + // to update entity String name = cdcJobDefinitionDTO.getName(); QueryWrapper queryWrapper = new QueryWrapper<>(); queryWrapper.eq("name", name); @@ -92,13 +130,30 @@ public R create(CdcJobDefinitionDTO cdcJobDefinitionDTO) { .cdcType(cdcJobDefinitionDTO.getCdcType()) .createUser(jobCreateUser) .description(cdcJobDefinitionDTO.getDescription()) + .dataDelay(cdcJobDefinitionDTO.getDataDelay()) .build(); baseMapper.insert(cdcJobDefinition); return R.succeed(); } @Override - public PageR listAll( + public R deleteById(Integer id) { + CdcJobLog cdcJobLog = cdcJobLogService.findLast(id); + if (cdcJobLog == null) { + removeById(id); + return R.succeed(); + } + R jobStatusR = status(id, cdcJobLog.getId()); + if (isStatusOnIng(jobStatusR.getData())) { + return R.failed(Status.CDC_JOB_CAN_NOT_DELETE); + } else { + removeById(id); + return R.succeed(); + } + } + + @Override + public PageR listAll( String name, boolean withConfig, long currentPage, long pageSize) { Page page = new Page<>(currentPage, pageSize); QueryWrapper queryWrapper = new QueryWrapper<>(); @@ -110,23 +165,49 @@ public PageR listAll( "description", "update_time", "create_time"); - if (!withConfig) { - queryWrapper.select( - "name", - "id", - "description", - "cdc_type", - "create_user", - 
"update_time", - "create_time"); - } queryWrapper.like(StringUtils.isNotBlank(name), "name", name); Page resPage = baseMapper.selectPage(page, queryWrapper); - return new PageR<>(resPage.getTotal(), true, resPage.getRecords()); + Page voPage = toCdcJobDefinitionVOPage(resPage); + return new PageR<>(voPage.getTotal(), true, voPage.getRecords()); + } + + private Page toCdcJobDefinitionVOPage(Page resPage) { + List voList = + resPage.getRecords().stream() + .map( + item -> { + Integer id = item.getId(); + CdcJobLog cdcJobLog = cdcJobLogService.findLast(id); + String jobStatus = + cdcJobLog == null + ? JobStatus.FRESHED.getValue() + : cdcJobLog.getCurrentStatus(); + return CdcJobDefinitionVO.builder() + .id(id) + .cdcType(item.getCdcType()) + .isDelete(item.isDelete()) + .name(item.getName()) + .config(item.getConfig()) + .createTime(item.getCreateTime()) + .updateTime(item.getUpdateTime()) + .description(item.getDescription()) + .createUser(item.getCreateUser()) + .dataDelay(item.getDataDelay()) + .currentStatus(jobStatus) + .build(); + }) + .collect(Collectors.toList()); + + Page voPage = + new Page<>(resPage.getCurrent(), resPage.getSize(), resPage.getSize()); + return voPage.setRecords(voList); } @Override public R update(CdcJobDefinitionDTO cdcJobDefinitionDTO) { + // to valid config + CdcGraph.fromCdcGraphJsonString(cdcJobDefinitionDTO.getConfig()); + // to update entity QueryWrapper queryWrapper = new QueryWrapper<>(); queryWrapper.eq("id", cdcJobDefinitionDTO.getId()); if (!baseMapper.exists(queryWrapper)) { @@ -137,6 +218,7 @@ public R update(CdcJobDefinitionDTO cdcJobDefinitionDTO) { .name(cdcJobDefinitionDTO.getName()) .config(cdcJobDefinitionDTO.getConfig()) .cdcType(cdcJobDefinitionDTO.getCdcType()) + .dataDelay(cdcJobDefinitionDTO.getDataDelay()) .createUser(cdcJobDefinitionDTO.getCreateUser()) .description(cdcJobDefinitionDTO.getDescription()) .build(); @@ -145,37 +227,239 @@ public R update(CdcJobDefinitionDTO cdcJobDefinitionDTO) { return R.succeed(); 
} + /** + * to submit cdc job to cluster Use ReentrantLock to ensure that submission, cancellation, and + * status retrieval for the same job are atomic operations. + * + * @see CdcJobDefinitionServiceImpl#cancel(Integer) + * @see CdcJobDefinitionServiceImpl#status(Integer, Integer) + * @param id cdc job id + * @param cdcJobSubmitDTO cdc job dto + * @return action execution result + */ + @Override + public R submit(Integer id, CdcJobSubmitDTO cdcJobSubmitDTO) { + ReentrantLock lock = cacher.getUnchecked(id); + if (lock.isLocked()) { + return R.of( + null, + Status.CDC_JOB_FLINK_JOB_LOCKED.getCode(), + Status.CDC_JOB_FLINK_JOB_LOCKED.getMsg()); + } + lock.lock(); + try { + CdcJobDefinition cdcJobDefinition = baseMapper.selectById(id); + log.info("to submit job {}: {}", cdcJobDefinition.getId(), cdcJobDefinition.getName()); + FlinkCdcSyncType flinkCdcSyncType = + FlinkCdcSyncType.valueOf(cdcJobDefinition.getCdcType()); + CdcGraph cdcGraph = CdcGraph.fromCdcGraphJsonString(cdcJobDefinition.getConfig()); + String clusterId = cdcJobSubmitDTO.getClusterId(); + ObjectNode actionConfigs = JSONUtils.createObjectNode(); + handBaseActionConfig(actionConfigs, clusterId, cdcJobDefinition); + handleCdcGraphNodeData( + actionConfigs, cdcGraph.getSource(), flinkCdcSyncType, cdcJobSubmitDTO); + handleCdcGraphNodeData( + actionConfigs, cdcGraph.getTarget(), flinkCdcSyncType, cdcJobSubmitDTO); + + FlinkCdcActionContextFactory factory = + ActionContextFactoryServiceLoadUtil.getFlinkCdcActionContextFactory( + cdcGraph.getSource().getType(), + cdcGraph.getTarget().getType(), + flinkCdcSyncType); + ActionContext actionContext = factory.getActionContext(actionConfigs); + + ActionService actionService = new FlinkCdcActionService(); + ActionExecutionResult result = actionService.execute(actionContext); + return createCdcJobLog(result, clusterId, id); + } catch (Exception e) { + log.error("error while submit job of {} to cluster", id, e); + return R.failed(Status.UNKNOWN_ERROR); + } finally 
{ + lock.unlock(); + } + } + + /** + * to cancel cdc job from cluster Use ReentrantLock to ensure that submission, cancellation, and + * status retrieval for the same job are atomic operations. + * + * @see CdcJobDefinitionServiceImpl#submit(Integer, CdcJobSubmitDTO) + * @see CdcJobDefinitionServiceImpl#status(Integer, Integer) + * @param id cdc job id + * @return action execution result + */ + @Override + public R cancel(Integer id) { + log.info("to cancel job {}", id); + ReentrantLock lock = cacher.getUnchecked(id); + if (lock.isLocked()) { + return R.of( + null, + Status.CDC_JOB_FLINK_JOB_LOCKED.getCode(), + Status.CDC_JOB_FLINK_JOB_LOCKED.getMsg()); + } + lock.lock(); + try { + CdcJobLog cdcJobLog = cdcJobLogService.findLast(id); + String flinkJobId = cdcJobLog.getFlinkJobId(); + if (flinkJobId == null) { + return R.of( + null, + Status.CDC_JOB_FLINK_JOB_ID_NOT_EXISTS.getCode(), + Status.CDC_JOB_FLINK_JOB_ID_NOT_EXISTS.getMsg()); + } + // request remote to cancel flink job + ClusterInfo clusterInfo = clusterService.getById(cdcJobLog.getClusterId()); + SessionClusterClient client = + new SessionClusterClient(clusterInfo.getHost(), clusterInfo.getPort()); + TriggerIdEntity triggerIdEntity = client.stopWithSavePoint(flinkJobId); + if (triggerIdEntity != null) { + cdcJobLog.setCurrentStatus(JobStatus.CANCELLING.getValue()); + cdcJobLogService.updateById(cdcJobLog); + ActionExecutionResultVo actionExecutionResultVo = + new ActionExecutionResultVo( + cdcJobLog.getId(), + true, + Status.SUCCESS.getMsg(), + JobStatus.CANCELLING); + return R.succeed(actionExecutionResultVo); + } else { + throw new RuntimeException("cancel job return null triggerIdEntity"); + } + } catch (Exception ex) { + log.error("An exception occurred while cancel job status of {}", id, ex); + return R.failed(Status.UNKNOWN_ERROR); + } finally { + lock.unlock(); + } + } + + /** + * to fetch cdc job status from cluster Use ReentrantLock to ensure that submission, + * cancellation, and status retrieval 
for the same job are atomic operations. + * + * @see CdcJobDefinitionServiceImpl#submit(Integer, CdcJobSubmitDTO) + * @see CdcJobDefinitionServiceImpl#cancel(Integer) + * @param id cdc job id + * @return action execution result + */ @Override - public R submit(Integer id, CdcJobSubmitDTO cdcJobSubmitDTO) { + public R status(Integer id, Integer logId) { + log.info("to ask status of jobId {}, logId {}", id, logId); + ReentrantLock lock = cacher.getUnchecked(id); + if (lock.isLocked()) { + return R.of( + null, + Status.CDC_JOB_FLINK_JOB_LOCKED.getCode(), + Status.CDC_JOB_FLINK_JOB_LOCKED.getMsg()); + } + lock.lock(); + CdcJobLog cdcJobLog = cdcJobLogService.getById(logId); + try { + ClusterInfo clusterInfo = clusterService.getById(cdcJobLog.getClusterId()); + String flinkJobId = cdcJobLog.getFlinkJobId(); + if (flinkJobId == null) { + throw new RuntimeException("flink job id is null"); + } else { + SessionClusterClient client = + new SessionClusterClient(clusterInfo.getHost(), clusterInfo.getPort()); + JobOverviewEntity jobOverview = client.jobOverview(cdcJobLog.getFlinkJobId()); + if (jobOverview != null && jobOverview.getState() != null) { + JobStatus jobStatus = JobStatus.fromValue(jobOverview.getState()); + Instant instant = Instant.ofEpochMilli(jobOverview.getEndTime()); + LocalDateTime terminateTime = + LocalDateTime.ofInstant(instant, ZoneId.systemDefault()); + cdcJobLog.setCurrentStatus(jobStatus.getValue()); + cdcJobLog.setTerminateTime(terminateTime); + cdcJobLogService.updateById(cdcJobLog); + return R.succeed(jobStatus); + } else { + throw new RuntimeException("find null jobOverview"); + } + } + } catch (Exception ex) { + log.error("An exception occurred while refresh job status of {}-{}", id, logId, ex); + cdcJobLog.setCurrentStatus(JobStatus.UNKNOWN.getValue()); + cdcJobLogService.updateById(cdcJobLog); + return R.failed(JobStatus.UNKNOWN); + } finally { + lock.unlock(); + } + } + + @Override + public R copy(Integer id) { CdcJobDefinition 
cdcJobDefinition = baseMapper.selectById(id); - String config = cdcJobDefinition.getConfig(); - FlinkCdcSyncType flinkCdcSyncType = FlinkCdcSyncType.valueOf(cdcJobDefinition.getCdcType()); - ActionService actionService = new FlinkCdcActionService(); - CdcGraph cdcGraph = CdcGraph.fromCdcGraphJsonString(config); - FlinkCdcActionContextFactory factory = - ActionContextFactoryServiceLoadUtil.getFlinkCdcActionContextFactory( - cdcGraph.getSource().getType(), - cdcGraph.getTarget().getType(), - flinkCdcSyncType); - ObjectNode actionConfigs = JSONUtils.createObjectNode(); - String clusterId = cdcJobSubmitDTO.getClusterId(); + String createUser = getLoginUser().getUser().getUsername(); + int maxId = getMaxId() + 1; + CdcJobDefinition entity = + CdcJobDefinition.builder() + .cdcType(cdcJobDefinition.getCdcType()) + .dataDelay(cdcJobDefinition.getDataDelay()) + .name(String.format("%s-%s", cdcJobDefinition.getName(), maxId)) + .config(cdcJobDefinition.getConfig()) + .description(cdcJobDefinition.getDescription()) + .createUser(createUser) + .build(); + + int rowId = baseMapper.insert(entity); + return R.succeed(rowId); + } + + private Integer getMaxId() { + QueryWrapper queryWrapper = new QueryWrapper<>(); + queryWrapper.select("MAX(id) as max_id").last(" OR is_delete=1"); + return baseMapper.selectObjs(queryWrapper).stream() + .map(item -> (Integer) item) + .findFirst() + .orElse(0); + } + + private UserInfoVO getLoginUser() { + return (UserInfoVO) StpUtil.getSession().get(Integer.toString(StpUtil.getLoginIdAsInt())); + } + + private R createCdcJobLog( + ActionExecutionResult result, String clusterId, Integer id) { + String createUser = getLoginUser().getUser().getUsername(); + CdcJobLog cdcJobLog; + JobStatus jobStatus = result.isSuccess() ? JobStatus.SUBMITTING : JobStatus.FAILED; + String errorMsg = result.isSuccess() ? 
R.succeed().getMsg() : R.failed().getMsg(); + String flinkJobId = result.getFlinkJobId(); + cdcJobLog = + CdcJobLog.builder() + .clusterId(Integer.parseInt(clusterId)) + .cdcJobDefinitionId(id) + .createUser(createUser) + .flinkJobId(flinkJobId) + .currentStatus(jobStatus.getValue()) + .build(); + cdcJobLogService.save(cdcJobLog); + ActionExecutionResultVo actionExecutionResultVo = + ActionExecutionResultVo.builder() + .jobStatus(jobStatus) + .success(result.isSuccess()) + .errorMsg(errorMsg) + .logId(cdcJobLog.getId()) + .build(); + return R.succeed(actionExecutionResultVo); + } + + private void handBaseActionConfig( + ObjectNode actionConfigs, String clusterId, CdcJobDefinition cdcJobDefinition) { ClusterInfo clusterInfo = clusterService.getById(clusterId); actionConfigs.put( FlinkCdcOptions.SESSION_URL, String.format("%s:%s", clusterInfo.getHost(), clusterInfo.getPort())); - handleCdcGraphNodeData(actionConfigs, cdcGraph.getSource(), flinkCdcSyncType); - handleCdcGraphNodeData(actionConfigs, cdcGraph.getTarget(), flinkCdcSyncType); - ActionContext actionContext = factory.getActionContext(actionConfigs); - try { - actionService.execute(actionContext); - } catch (Exception e) { - throw new RuntimeException(e); - } - return R.succeed(); + actionConfigs.put(FlinkCdcOptions.PIPELINE_NAME, cdcJobDefinition.getName()); + actionConfigs.put(FlinkCdcOptions.EXE_CP_INTERVAL, cdcJobDefinition.getDataDelay()); } private void handleCdcGraphNodeData( - ObjectNode actionConfigs, CdcNode node, FlinkCdcSyncType cdcSyncType) { + ObjectNode actionConfigs, + CdcNode node, + FlinkCdcSyncType cdcSyncType, + CdcJobSubmitDTO cdcJobSubmitDTO) { FlinkCdcDataSourceType cdcDataSourceType = FlinkCdcDataSourceType.of(node.getType()); Preconditions.checkNotNull( cdcDataSourceType, @@ -185,7 +469,7 @@ private void handleCdcGraphNodeData( handlePaimonNodeData(actionConfigs, node.getData(), cdcSyncType); break; case MYSQL: - handleMysqlNodeData(actionConfigs, node.getData(), cdcSyncType); + 
handleMysqlNodeData(actionConfigs, node.getData(), cdcSyncType, cdcJobSubmitDTO); break; case POSTGRESQL: handlePostgresNodeData(actionConfigs, node.getData()); @@ -193,19 +477,24 @@ private void handleCdcGraphNodeData( } } - private List getOtherConfigs(ObjectNode node) { - String otherConfigs = JSONUtils.getString(node, "other_configs"); - List configList; - if (StringUtils.isBlank(otherConfigs)) { - configList = new ArrayList<>(); - } else { - configList = new ArrayList<>(Arrays.asList(otherConfigs.split(";"))); - } - return configList; + private List getByKeyToList(ObjectNode node, String key) { + String stringValue = JSONUtils.getString(node, key); + return Optional.of(stringValue) + .map(item -> new ArrayList<>(Arrays.asList(item.split(";")))) + .orElse(new ArrayList<>()); } private void handlePostgresNodeData(ObjectNode actionConfigs, ObjectNode postgresData) { - List postgresConfList = getOtherConfigs(postgresData); + List computedColumnList = + getByKeyToList(postgresData, FlinkCdcOptions.COMPUTED_COLUMN); + actionConfigs.putPOJO(FlinkCdcOptions.COMPUTED_COLUMN, computedColumnList); + actionConfigs.put( + FlinkCdcOptions.METADATA_COLUMN, + JSONUtils.getString(postgresData, FlinkCdcOptions.METADATA_COLUMN)); + actionConfigs.put( + FlinkCdcOptions.TYPE_MAPPING, JSONUtils.getString(postgresData, "type_mapping")); + + List postgresConfList = getByKeyToList(postgresData, "other_configs"); postgresConfList.add( buildKeyValueString("hostname", JSONUtils.getString(postgresData, "host"))); postgresConfList.add( @@ -228,8 +517,20 @@ private void handlePostgresNodeData(ObjectNode actionConfigs, ObjectNode postgre } private void handleMysqlNodeData( - ObjectNode actionConfigs, ObjectNode mysqlData, FlinkCdcSyncType cdcSyncType) { - List mysqlConfList = getOtherConfigs(actionConfigs); + ObjectNode actionConfigs, + ObjectNode mysqlData, + FlinkCdcSyncType cdcSyncType, + CdcJobSubmitDTO cdcJobSubmitDTO) { + List computedColumnList = + getByKeyToList(mysqlData, 
FlinkCdcOptions.COMPUTED_COLUMN); + actionConfigs.putPOJO(FlinkCdcOptions.COMPUTED_COLUMN, computedColumnList); + actionConfigs.put( + FlinkCdcOptions.METADATA_COLUMN, + JSONUtils.getString(mysqlData, FlinkCdcOptions.METADATA_COLUMN)); + String typeMapping = String.join(",", JSONUtils.getList(mysqlData, "type_mapping")); + actionConfigs.put(FlinkCdcOptions.TYPE_MAPPING, typeMapping); + + List mysqlConfList = getByKeyToList(mysqlData, "other_configs"); mysqlConfList.add(buildKeyValueString("hostname", JSONUtils.getString(mysqlData, "host"))); mysqlConfList.add( buildKeyValueString("username", JSONUtils.getString(mysqlData, "username"))); @@ -243,11 +544,33 @@ private void handleMysqlNodeData( } mysqlConfList.add( buildKeyValueString("password", JSONUtils.getString(mysqlData, "password"))); + + SyncMode syncMode = SyncMode.valueOf(cdcJobSubmitDTO.getStartupMode()); + switch (syncMode) { + case INCREMENTAL_SYNC: + mysqlConfList.add(buildKeyValueString("scan.startup.mode", "latest-offset")); + break; + case FULL_SYNC: + mysqlConfList.add(buildKeyValueString("scan.startup.mode", "initial")); + break; + case TS_SYNC: + mysqlConfList.add(buildKeyValueString("scan.startup.mode", "timestamp")); + mysqlConfList.add( + buildKeyValueString( + "scan.startup.timestamp-millis", + String.valueOf(cdcJobSubmitDTO.getStartupTimestamp()))); + break; + } + actionConfigs.putPOJO(FlinkCdcOptions.MYSQL_CONF, mysqlConfList); } private void handlePaimonNodeData( ObjectNode actionConfigs, ObjectNode paimonData, FlinkCdcSyncType cdcSyncType) { + actionConfigs.put( + FlinkCdcOptions.PARTITION_KEYS, + JSONUtils.getString(paimonData, "partition_column")); + Integer catalog = JSONUtils.getInteger(paimonData, "catalog"); CatalogInfo catalogInfo = catalogService.getById(catalog); actionConfigs.put(FlinkCdcOptions.WAREHOUSE, catalogInfo.getWarehouse()); @@ -255,23 +578,14 @@ private void handlePaimonNodeData( actionConfigs.put(FlinkCdcOptions.TABLE, JSONUtils.getString(paimonData, "table_name")); } 
actionConfigs.put(FlinkCdcOptions.DATABASE, JSONUtils.getString(paimonData, "database")); - actionConfigs.put( - FlinkCdcOptions.PRIMARY_KEYS, JSONUtils.getString(paimonData, "primary_key")); - String otherConfigs = JSONUtils.getString(paimonData, "other_configs2"); - if (StringUtils.isBlank(otherConfigs)) { - actionConfigs.putPOJO(FlinkCdcOptions.TABLE_CONF, new ArrayList<>()); - } else { - actionConfigs.putPOJO( - FlinkCdcOptions.TABLE_CONF, Arrays.asList(otherConfigs.split(";"))); - } + List configList = getByKeyToList(paimonData, "other_configs2"); + actionConfigs.putPOJO(FlinkCdcOptions.TABLE_CONF, configList); + List catalogConfList = new ArrayList<>(); Map options = catalogInfo.getOptions(); PaimonServiceFactory.convertToPaimonOptions(options) .toMap() - .forEach( - (k, v) -> { - catalogConfList.add(buildKeyValueString(k, v)); - }); + .forEach((k, v) -> catalogConfList.add(buildKeyValueString(k, v))); actionConfigs.putPOJO(FlinkCdcOptions.CATALOG_CONF, catalogConfList); } @@ -279,4 +593,33 @@ private void handlePaimonNodeData( private String buildKeyValueString(String key, String value) { return key + "=" + value; } + + /** Crontab to check cdc job status. 
*/ + @Scheduled(cron = "0 * * * * ?") + public R checkAllStatus() { + QueryWrapper queryWrapper = new QueryWrapper<>(); + queryWrapper.select("id"); + List cdcJobDefinitionIdList = listObjs(queryWrapper, item -> (Integer) item); + cdcJobDefinitionIdList.forEach( + id -> { + CdcJobLog cdcJobLog = cdcJobLogService.findLast(id); + Optional.of(cdcJobLog) + .ifPresent( + item -> { + if (isStatusOnIng( + JobStatus.fromValue( + cdcJobLog.getCurrentStatus()))) { + status(id, cdcJobLog.getId()); + } + }); + }); + return R.succeed(); + } + + private boolean isStatusOnIng(JobStatus jobStatus) { + return (JobStatus.CANCELLING == jobStatus + || JobStatus.SUBMITTING == jobStatus + || JobStatus.RESTARTING == jobStatus + || JobStatus.RUNNING == jobStatus); + } } diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobLogImpl.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobLogImpl.java new file mode 100644 index 000000000..1d239dd8d --- /dev/null +++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobLogImpl.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.web.server.service.impl; + +import org.apache.paimon.web.server.data.model.CdcJobLog; +import org.apache.paimon.web.server.mapper.CdcJobLogMapper; +import org.apache.paimon.web.server.service.CdcJobLogService; + +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +/** CdcJobLogImpl. */ +@Service +@Slf4j +public class CdcJobLogImpl extends ServiceImpl + implements CdcJobLogService { + + /** + * Find last status log of cdc job. + * + * @param cdcJobDefinitionId The cdc job primary key id. + * @return The last status log of cdc job. + */ + @Override + public CdcJobLog findLast(Integer cdcJobDefinitionId) { + QueryWrapper queryWrapper = new QueryWrapper<>(); + queryWrapper + .eq("cdc_job_definition_id", cdcJobDefinitionId) + .orderByDesc("create_time") + .last("LIMIT 1"); + return baseMapper.selectOne(queryWrapper); + } +} diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/ClusterServiceImpl.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/ClusterServiceImpl.java index e2bc2bca0..bfce27ce0 100644 --- a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/ClusterServiceImpl.java +++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/ClusterServiceImpl.java @@ -53,7 +53,7 @@ public class ClusterServiceImpl extends ServiceImpl @Autowired private ClusterMapper clusterMapper; @Override - public List listUsers(IPage page, ClusterInfo cluster) { + public List listClusters(IPage page, ClusterInfo cluster) { return clusterMapper.listClusters(page, cluster); } diff --git a/paimon-web-server/src/main/resources/i18n/messages.properties b/paimon-web-server/src/main/resources/i18n/messages.properties index 3d554154a..2dd2993a0 100644 --- 
a/paimon-web-server/src/main/resources/i18n/messages.properties +++ b/paimon-web-server/src/main/resources/i18n/messages.properties @@ -49,8 +49,9 @@ table.remove.option.error=Exception calling Paimon Catalog API to remove an opti table.alter.column.error=Exception calling Paimon Catalog API to alter a column. table.drop.error=Exception calling Paimon Catalog API to drop a table. table.rename.error=Exception calling Paimon Catalog API to rename a table. -cdc.job.exist.error=Paimon CDC job exists. -cdc.job.not.exist.error=Paimon CDC job is not exist. +cdc.job.exist.error=job exists. +cdc.job.not.exist.error=job does not exist. +cdc.job.submit.error=job submit error. cluster.not.exist=This cluster is not exist. cluster.name.exist=This cluster name {0} already exists. job.submit.error=Exception submitting a job. diff --git a/paimon-web-server/src/main/resources/i18n/messages_en_US.properties b/paimon-web-server/src/main/resources/i18n/messages_en_US.properties index 3d554154a..1d1a9dcb7 100644 --- a/paimon-web-server/src/main/resources/i18n/messages_en_US.properties +++ b/paimon-web-server/src/main/resources/i18n/messages_en_US.properties @@ -51,6 +51,8 @@ table.drop.error=Exception calling Paimon Catalog API to drop a table. table.rename.error=Exception calling Paimon Catalog API to rename a table. cdc.job.exist.error=Paimon CDC job exists. cdc.job.not.exist.error=Paimon CDC job is not exist. +cdc.job.flink.job.id.not.exists=FlinkJobID does not exist. +cdc.job.flink.job.locked=job locked, try it later cluster.not.exist=This cluster is not exist. cluster.name.exist=This cluster name {0} already exists. job.submit.error=Exception submitting a job. 
diff --git a/paimon-web-server/src/main/resources/i18n/messages_zh_CN.properties b/paimon-web-server/src/main/resources/i18n/messages_zh_CN.properties index 4fdc4edb9..c1d03a024 100644 --- a/paimon-web-server/src/main/resources/i18n/messages_zh_CN.properties +++ b/paimon-web-server/src/main/resources/i18n/messages_zh_CN.properties @@ -49,8 +49,11 @@ table.remove.option.error=\u8C03\u7528 Paimon API \u79FB\u9664 Option \u65F6\u53 table.alter.column.error=\u8C03\u7528 Paimon API \u4FEE\u6539 Column \u65F6\u53D1\u751F\u5F02\u5E38 table.drop.error=\u8C03\u7528 Paimon API \u5220\u9664 Table \u65F6\u53D1\u751F\u5F02\u5E38 table.rename.error=\u8C03\u7528 Paimon API \u91CD\u547D\u540D Table \u65F6\u53D1\u751F\u5F02\u5E38 -cdc.job.exist.error=paimon cdc\u4F5C\u4E1A\u5DF2\u5B58\u5728 -cdc.job.not.exist.error=paimon cdc\u4F5C\u4E1A\u4E0D\u5B58\u5728 +cdc.job.exist.error=\u4F5C\u4E1A\u5DF2\u5B58\u5728 +cdc.job.not.exist.error=\u4F5C\u4E1A\u4E0D\u5B58\u5728 +cdc.job.flink.job.id.not.exists=FlinkJobID\u4E0D\u5B58\u5728 +cdc.job.flink.job.locked=\u4F5C\u4E1A\u88AB\u9501\u4F4F\uFF0C\u7A0D\u7B49\u518D\u8BD5 +cdc.job.can.not.deleted=\u8BE5\u72B6\u6001\u4E0B\u7684\u4F5C\u4E1A\u4E0D\u80FD\u5220\u9664 cluster.not.exist=\u6B64\u96C6\u7FA4\u4E0D\u5B58\u5728 cluster.name.exist=\u6B64\u96C6\u7FA4\u540D{0}\u5DF2\u7ECF\u5B58\u5728 job.submit.error=\u63D0\u4EA4\u4F5C\u4E1A\u65F6\u53D1\u751F\u5F02\u5E38 diff --git a/paimon-web-server/src/main/resources/mapper/CdcJobDefinitionMapper.xml b/paimon-web-server/src/main/resources/mapper/CdcJobDefinitionMapper.xml index 132a6093c..de5e61d36 100644 --- a/paimon-web-server/src/main/resources/mapper/CdcJobDefinitionMapper.xml +++ b/paimon-web-server/src/main/resources/mapper/CdcJobDefinitionMapper.xml @@ -32,11 +32,12 @@ under the License. 
+ id,name,description, cdc_type,config,create_user, - create_time,update_time + create_time,update_time,dataDelay diff --git a/paimon-web-server/src/main/resources/mapper/CdcJobLogMapper.xml b/paimon-web-server/src/main/resources/mapper/CdcJobLogMapper.xml new file mode 100644 index 000000000..113b54fbd --- /dev/null +++ b/paimon-web-server/src/main/resources/mapper/CdcJobLogMapper.xml @@ -0,0 +1,43 @@ + + + + + + + + + + + + + + + + + + + + id,cluster_id,cdc_job_definition_id, + create_user,create_time,update_time, + terminate_time,current_status,flink_job_id + + diff --git a/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/CdcJobDefinitionControllerTest.java b/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/CdcJobDefinitionControllerTest.java index 42848197e..4c4cf00c9 100644 --- a/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/CdcJobDefinitionControllerTest.java +++ b/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/CdcJobDefinitionControllerTest.java @@ -233,7 +233,7 @@ public void submitCdcJob() throws Exception { .getResponse(); } - @Order(4) + @Order(5) @Test public void submitDatabaseSyncCdcJob() throws Exception { System.setProperty("FLINK_HOME", "/opt/flink"); diff --git a/paimon-web-ui/README.md b/paimon-web-ui/README.md index 32992dd0e..234214c5d 100644 --- a/paimon-web-ui/README.md +++ b/paimon-web-ui/README.md @@ -45,8 +45,6 @@ pnpm dev pnpm dev:mock & pnpm run mock ``` - - ### Type-Check, Compile and Minify for Production ```sh @@ -68,4 +66,3 @@ You can run this command to automatically generate the corresponding protocol be ```sh pnpm run gen:license ``` - diff --git a/paimon-web-ui/eslint.config.mjs b/paimon-web-ui/eslint.config.mjs index d937ca13d..14b5c4572 100644 --- a/paimon-web-ui/eslint.config.mjs +++ b/paimon-web-ui/eslint.config.mjs @@ -35,6 +35,6 @@ export default antfu({ '**/node_modules', '**/dist', 'mock', - 'httpData' - ] + 'httpData', + ], 
}) diff --git a/paimon-web-ui/license.node.js b/paimon-web-ui/license.node.js index 725907828..f01ac2bca 100644 --- a/paimon-web-ui/license.node.js +++ b/paimon-web-ui/license.node.js @@ -79,7 +79,9 @@ function fileDisplay(filePath) { if (isFile) { const res = read(filedir) const fileName = path.basename(filedir) - res && console.log(`success file: ${fileName}`) + if (res) { + console.log(`success file: ${fileName}`) + } } if (isDir) fileDisplay(filedir) diff --git a/paimon-web-ui/pom.xml b/paimon-web-ui/pom.xml index 9575d2ae0..63c500d22 100644 --- a/paimon-web-ui/pom.xml +++ b/paimon-web-ui/pom.xml @@ -70,7 +70,7 @@ under the License. generate-resources - install --legacy-peer-deps + install diff --git a/paimon-web-ui/postcss.config.js b/paimon-web-ui/postcss.config.js index 0d095452d..c4862c850 100644 --- a/paimon-web-ui/postcss.config.js +++ b/paimon-web-ui/postcss.config.js @@ -16,5 +16,5 @@ specific language governing permissions and limitations under the License. */ export const plugins = { - autoprefixer: {} + autoprefixer: {}, } diff --git a/paimon-web-ui/src/api/models/cdc/index.ts b/paimon-web-ui/src/api/models/cdc/index.ts index 0f6d59da3..78b98ac95 100644 --- a/paimon-web-ui/src/api/models/cdc/index.ts +++ b/paimon-web-ui/src/api/models/cdc/index.ts @@ -15,10 +15,10 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
*/ -import type { CdcJobDefinition, CdcJobSubmit } from './interface' +import type { CdcJobDefinition, CdcJobSubmit } from './types/cdcJob' import httpRequest from '@/api/request' -export * from './interface' +export * from './types/cdcJob' /** * # Create cdc job definition @@ -54,9 +54,31 @@ export function getCdcJobDefinition(id: number) { export function deleteCdcJobDefinition(id: number) { return httpRequest.delete(`/cdc-job-definition/${id}`) } + +/** + * # Copy cdc job definition + */ +export function copyCdcJobDefinition(id: number) { + return httpRequest.put(`/cdc-job-definition/${id}/copy`) +} + /** * # Submit cdc job */ export function submitCdcJob(id: number, form: CdcJobSubmit) { return httpRequest.post(`/cdc-job-definition/${id}/submit`, form) } + +/** + * # Cancel cdc job + */ +export function cancelCdcJob(id: number) { + return httpRequest.post(`/cdc-job-definition/${id}/cancel`) +} + +/** + * # Get cdc job status + */ +export function freshCdcJobStatus(id: number, logId: number) { + return httpRequest.get(`/cdc-job-definition/${id}/status?logId=${logId}`) +} diff --git a/paimon-web-ui/src/api/models/cdc/interface.ts b/paimon-web-ui/src/api/models/cdc/types/cdcJob.ts similarity index 72% rename from paimon-web-ui/src/api/models/cdc/interface.ts rename to paimon-web-ui/src/api/models/cdc/types/cdcJob.ts index 01109c2d6..1f4ff2d94 100644 --- a/paimon-web-ui/src/api/models/cdc/interface.ts +++ b/paimon-web-ui/src/api/models/cdc/types/cdcJob.ts @@ -17,15 +17,33 @@ under the License. 
*/ export interface CdcJobDefinition { id?: number + name: string createTime?: string updateTime?: string - name: string description?: string cdcType?: number config?: string createUser?: string + dataDelay: number + currentStatus?: string + logId?: number } export interface CdcJobSubmit { flinkSessionUrl: string + startupMode: number + startupTimestamp?: number +} + +export enum JobStatus { + RESTARTING = 'RESTARTING', + FRESHED = 'FRESHED', + SUBMITTING = 'SUBMITTING', + CANCELLING = 'CANCELLING', + RUNNING = 'RUNNING', + FAILED = 'FAILED', + CANCELED = 'CANCELED', + CREATED = 'CREATED', + FINISHED = 'FINISHED', + UNKNOWN = 'UNKNOWN', } diff --git a/paimon-web-ui/src/components/table-action/index.tsx b/paimon-web-ui/src/components/table-action/index.tsx index 7df9a6d75..5e9954bf4 100644 --- a/paimon-web-ui/src/components/table-action/index.tsx +++ b/paimon-web-ui/src/components/table-action/index.tsx @@ -15,7 +15,9 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
*/ -import { CreateOutline, PlayOutline, TrashOutline } from '@vicons/ionicons5' +import { CopyOutline, CreateOutline, PauseOutline, PlayOutline, TrashOutline } from '@vicons/ionicons5' +import { JobStatus } from '@/api/models/cdc/types/cdcJob' +import type { CdcJobDefinition } from '@/api/models/cdc' const props = { row: { @@ -27,32 +29,136 @@ const props = { export default defineComponent({ name: 'TableAction', props, - emits: ['handleEdit', 'handleRun', 'handleDelete'], + emits: ['handleEdit', 'handleRun', 'handleCancel', 'handleDelete', 'handleCopy'], setup(props, { emit }) { const { t } = useLocaleHooks() + const dialog = useDialog() + const message = useMessage() + const cdcJobStatusMap = reactive>(new Map()) + const { mittBus } = getCurrentInstance()!.appContext.config.globalProperties - const handleEdit = (row: any) => { + const handleEdit = (row: CdcJobDefinition) => { emit('handleEdit', row) } - const handleRun = (row: any) => { - emit('handleRun', row) + const handleRunOrCancel = (row: CdcJobDefinition) => { + switch (row.currentStatus) { + case JobStatus.FRESHED: + case JobStatus.FINISHED: + case JobStatus.FAILED: + case JobStatus.CANCELED: + case JobStatus.UNKNOWN: + emit('handleRun', row) + break + case JobStatus.RESTARTING: + case JobStatus.RUNNING: + emit('handleCancel', row) + break + default: + message.warning(t('cdc.cdc_job_exe_not_support')) + } } - const handleDelete = (row: any) => { - emit('handleDelete', row) + const handleDelete = (row: CdcJobDefinition) => { + if (row.currentStatus === JobStatus.CANCELLING + || row.currentStatus === JobStatus.RUNNING + || row.currentStatus === JobStatus.RESTARTING + || row.currentStatus === JobStatus.SUBMITTING) { + message.warning(t('cdc.cdc_job_can_not_delete')) + return + } + dialog.warning({ + title: t('cdc.confirm_title'), + content: t('cdc.confirm_delete_content'), + positiveText: t('cdc.confirm_sure'), + negativeText: t('cdc.confirm_cancel'), + onPositiveClick: () => { + emit('handleDelete', row) + 
}, + }) } + const handleCopy = (row: CdcJobDefinition) => { + emit('handleCopy', row) + } + + const isLoading = (row: CdcJobDefinition) => { + return row.currentStatus === JobStatus.CANCELLING + || row.currentStatus === JobStatus.SUBMITTING + } + + const getLabel = (row: CdcJobDefinition) => { + switch (row.currentStatus) { + case JobStatus.FRESHED: + case JobStatus.FINISHED: + case JobStatus.FAILED: + case JobStatus.CANCELED: + case JobStatus.UNKNOWN: + return t('cdc.run') + case JobStatus.RESTARTING: + case JobStatus.RUNNING: + return t('cdc.cancel') + case JobStatus.SUBMITTING: + case JobStatus.CANCELLING: + case JobStatus.CREATED: + return t('cdc.loading') + default: + return t('cdc.loading') + } + } + + const getIcon = (row: CdcJobDefinition) => { + switch (row.currentStatus) { + case JobStatus.UNKNOWN: + case JobStatus.FRESHED: + case JobStatus.FINISHED: + case JobStatus.FAILED: + case JobStatus.CANCELED: + return PlayOutline + case JobStatus.RESTARTING: + case JobStatus.RUNNING: + return PauseOutline + default: + return PlayOutline + } + } + + mittBus.on('cdcSubmitConfirmFinish', (data: any) => { + const id = data.id + cdcJobStatusMap.set(id, JobStatus.CANCELLING) + }) + return { t, handleEdit, - handleRun, + handleRunOrCancel, handleDelete, + handleCopy, + isLoading, + getLabel, + getIcon, } }, render() { return ( + + {{ + default: () => this.getLabel(this.row), + trigger: () => ( + + this.handleRunOrCancel(this.row)} + circle + > + + + ), + }} + {{ default: () => this.t('cdc.edit'), @@ -71,16 +177,16 @@ export default defineComponent({ {{ - default: () => this.t('cdc.run'), + default: () => this.t('cdc.copy'), trigger: () => ( - this.handleRun(this.row)} + this.handleCopy(this.row)} circle > - + ), }} @@ -101,6 +207,7 @@ export default defineComponent({ ), }} + ) }, diff --git a/paimon-web-ui/src/form-lib/cdc/use-cdc-list.ts b/paimon-web-ui/src/form-lib/cdc/use-cdc-list.ts index 09970bd76..7c471b3ea 100644 --- 
a/paimon-web-ui/src/form-lib/cdc/use-cdc-list.ts +++ b/paimon-web-ui/src/form-lib/cdc/use-cdc-list.ts @@ -20,12 +20,6 @@ import type { IJsonItem } from '@/components/dynamic-form/types' export function useCDCList(item: any) { const { t } = useLocaleHooks() - const model = reactive({ - name: item.name, - description: item.description, - synchronizationType: item.synchronizationType, - }) - const synchronizationTypeOptions = [ { label: t('cdc.single_table_synchronization'), @@ -37,6 +31,41 @@ export function useCDCList(item: any) { }, ] + const dataDelayModelOptions = [ + { + label: '1min', + value: 60 * 1000, + }, + { + label: '5min', + value: 5 * 60 * 1000, + }, + { + label: '10min', + value: 10 * 60 * 1000, + }, + { + label: '15min', + value: 15 * 60 * 1000, + }, + { + label: '30min', + value: 30 * 60 * 1000, + }, + { + label: '1hour', + value: 60 * 60 * 1000, + }, + ] + + const data = item.data + const model = reactive({ + name: data?.name, + description: data?.description, + cdcType: data?.cdcType || 0, + dataDelay: data?.dataDelay || 60 * 1000, + }) + return { json: [ { @@ -62,14 +91,22 @@ export function useCDCList(item: any) { name: t('cdc.task_description'), props: { placeholder: '', + type: 'textarea', }, }, + { + type: 'select', + field: 'dataDelay', + name: t('cdc.data_delay_option'), + options: dataDelayModelOptions, + value: model.dataDelay, + }, { type: 'radio', - field: 'synchronizationType', + field: 'cdcType', name: t('cdc.synchronization_type'), options: synchronizationTypeOptions, - value: 0, + value: model.cdcType, }, ] as IJsonItem[], model, diff --git a/paimon-web-ui/src/form-lib/cdc/use-mysql.ts b/paimon-web-ui/src/form-lib/cdc/use-mysql.ts index cf44e4b23..2b94b9d07 100644 --- a/paimon-web-ui/src/form-lib/cdc/use-mysql.ts +++ b/paimon-web-ui/src/form-lib/cdc/use-mysql.ts @@ -36,7 +36,32 @@ export function useMYSQL(item: any) { computed_column: data.computed_column || '', }) - const TypeMappingOptions = [] as any + const TypeMappingOptions = [ 
+ { + label: 'tinyint1-not-bool', + value: 'tinyint1-not-bool', + }, + { + label: 'to-nullable', + value: 'to-nullable', + }, + { + label: 'to-string', + value: 'to-string', + }, + { + label: 'char-to-string', + value: 'char-to-string', + }, + { + label: 'longtext-to-bytes', + value: 'longtext-to-bytes', + }, + { + label: 'bigint-unsigned-to-bigint', + value: 'bigint-unsigned-to-bigint', + } + ] return { json: [ @@ -91,6 +116,7 @@ export function useMYSQL(item: any) { name: t('cdc.password'), props: { placeholder: '', + type: 'password', }, span: computed(() => tabType.value === 'connection_information' ? 24 : 0), validate: { @@ -155,6 +181,9 @@ export function useMYSQL(item: any) { name: t('cdc.type_mapping'), options: TypeMappingOptions, span: computed(() => tabType.value === 'synchronization_configuration' ? 24 : 0), + props: { + 'multiple': true + } }, { type: 'input', diff --git a/paimon-web-ui/src/form-lib/cdc/use-submit-cdc-job.ts b/paimon-web-ui/src/form-lib/cdc/use-submit-cdc-job.ts index bd1cdb56c..786a80e64 100644 --- a/paimon-web-ui/src/form-lib/cdc/use-submit-cdc-job.ts +++ b/paimon-web-ui/src/form-lib/cdc/use-submit-cdc-job.ts @@ -26,18 +26,39 @@ export function useSumbitCdcJob(item: any) { const model = reactive({ flinkSessionUrl: item.flinkSessionUrl, + startupMode: item.startupMode, + startupTimestamp: item.startupTimestamp, }) const flinkSessionClusterOptions = ref([]) getClusterListByType ('Flink', 1, Number.MAX_SAFE_INTEGER).then((response) => { if (response && response.data) { const clusterList = response.data as Cluster[] - flinkSessionClusterOptions.value = clusterList.map(cluster => ({ + flinkSessionClusterOptions.value = clusterList.filter((cluster) => { + return cluster.deploymentMode === 'yarn-session' || cluster.deploymentMode === 'k8s-session' + }).map(cluster => ({ label: cluster.clusterName, value: cluster.id.toString(), })) } }) + + const synchronizationModeOptions = [ + { + label: t('cdc.inc_synchronization'), + value: 0, + }, + { + 
label: t('cdc.full_synchronization'), + value: 1, + }, + { + label: t('cdc.ts_synchronization'), + value: 2, + }, + + ] + return { json: [ { @@ -58,6 +79,21 @@ export function useSumbitCdcJob(item: any) { }, }, }, + { + type: 'radio', + field: 'startupMode', + name: t('cdc.synchronization_mode'), + options: synchronizationModeOptions, + value: 0, + }, + { + type: 'input', + field: 'startupTimestamp', + name: t('cdc.startup_timestamp'), + props: { + placeholder: t('cdc.startup_timestamp_tip'), + }, + }, ] as IJsonItem[], model, } diff --git a/paimon-web-ui/src/locales/en/modules/cdc.ts b/paimon-web-ui/src/locales/en/modules/cdc.ts index 8a985b65e..d9065df68 100644 --- a/paimon-web-ui/src/locales/en/modules/cdc.ts +++ b/paimon-web-ui/src/locales/en/modules/cdc.ts @@ -17,9 +17,13 @@ under the License. */ export default { cdc_job_definition: 'Cdc Job Definition', + cdc_data_source: 'Cdc Data Source', + cdc_data_sink: 'Cdc Data Sink', create_synchronization_job: 'Create Synchronization Job', + save_synchronization_job: 'Save Synchronization Job', job_name: 'Job Name', - synchronization_type: 'Synchronization type', + synchronization_type: 'Synchronization Type', + synchronization_mode: 'Synchronization Mode', job_description: 'Job Description', create_user: 'Create User', create_time: 'Create Time', @@ -31,6 +35,11 @@ export default { synchronization_job_name: 'Synchronization Job Name', edit_synchronization_job: 'Edit Synchronization Job', task_description: 'Task Description', + data_delay_option: 'Data Delay', + full_synchronization: 'Full Synchronization', + inc_synchronization: 'Incremental Synchronization', + ts_synchronization: 'Timestamp Synchronization', + cp_synchronization: 'Checkpoint Synchronization', single_table_synchronization: 'Single Table Synchronization', whole_database_synchronization: 'Whole Database Synchronization', save: 'Save', diff --git a/paimon-web-ui/src/locales/zh/modules/cdc.ts b/paimon-web-ui/src/locales/zh/modules/cdc.ts index 
f6ed18010..6481a878e 100644 --- a/paimon-web-ui/src/locales/zh/modules/cdc.ts +++ b/paimon-web-ui/src/locales/zh/modules/cdc.ts @@ -16,10 +16,29 @@ specific language governing permissions and limitations under the License. */ export default { - cdc_job_definition: 'CDC集成任务定义', - create_synchronization_job: '创建同步作业', + cdc_job_definition: '数据入湖', + cdc_job_info: '作业信息', + cdc_data_source: '数据来源', + cdc_data_sink: '数据写入', + create_synchronization_job: '创建作业', + save_synchronization_job: '保存作业', + cdc_job_save_not_null: '拖拽节点配置后再保存', + cdc_job_save_dag_edge: '节点之间连线后再保存', + confirm_title: '确定操作', + confirm_delete_content: '您确定要删除吗?', + confirm_cancel_content: '您确定要取消吗?', + confirm_sure: '确定', + confirm_cancel: '取消', + cdc_job_can_not_delete: '该状态下的作业不能删除', job_name: '作业名称', + cdc_job_exe_error: '作业执行失败', + cdc_job_exe_success: '作业执行成功', + cdc_job_exe_loading: '作业正在执行', + cdc_job_exe_not_support: '不支持的操作', synchronization_type: '同步类型', + synchronization_mode: '同步模式', + startup_timestamp: '指定时间', + startup_timestamp_tip: '选择同步模式为指定时间时填写,13位时间戳', job_description: '作业描述', create_user: '创建用户', create_time: '创建时间', @@ -27,10 +46,19 @@ export default { operation: '操作', edit: '编辑', run: '运行', + cancel: '取消', + loading: '正在执行', delete: '删除', - synchronization_job_name: '同步作业名称', + copy: '复制', + job_current_status: '作业状态', + synchronization_job_name: '作业名称', edit_synchronization_job: '编辑同步作业', task_description: '任务描述', + data_delay_option: '数据时效', + full_synchronization: '全量同步', + inc_synchronization: '增量同步', + ts_synchronization: '指定时间', + cp_synchronization: '断点同步', single_table_synchronization: '单表同步', whole_database_synchronization: '整库同步', save: '保存', @@ -49,5 +77,5 @@ export default { synchronization_configuration: '同步配置', primary_key: '主键', partition_column: '分区列', - submit_cdc_job: '提交 CDC 作业', + submit_cdc_job: '提交作业', } diff --git a/paimon-web-ui/src/locales/zh/modules/common.ts b/paimon-web-ui/src/locales/zh/modules/common.ts index 644276d39..752cd9596 100644 --- 
a/paimon-web-ui/src/locales/zh/modules/common.ts +++ b/paimon-web-ui/src/locales/zh/modules/common.ts @@ -27,5 +27,5 @@ export default { yes: '是', no: '否', action: '操作', - flink_session_url: 'Flink Session 集群地址', + flink_session_url: '集群地址', } diff --git a/paimon-web-ui/src/locales/zh/modules/layout.ts b/paimon-web-ui/src/locales/zh/modules/layout.ts index 4e222f4ab..6b79e1f5c 100644 --- a/paimon-web-ui/src/locales/zh/modules/layout.ts +++ b/paimon-web-ui/src/locales/zh/modules/layout.ts @@ -18,7 +18,7 @@ under the License. */ export default { playground: '查询控制台', metadata: '元数据管理', - cdc_ingestion: 'CDC 集成', + cdc_ingestion: '数据入湖', system: '系统管理', light: '浅色', dark: '暗色', diff --git a/paimon-web-ui/src/locales/zh/modules/system.ts b/paimon-web-ui/src/locales/zh/modules/system.ts index 0193c5da8..a13d4fa47 100644 --- a/paimon-web-ui/src/locales/zh/modules/system.ts +++ b/paimon-web-ui/src/locales/zh/modules/system.ts @@ -44,7 +44,7 @@ const role = { const roleKey = { system: '系统管理', metadata: '元数据管理', - cdc: 'CDC 集成', + cdc: '数据入湖', playground: 'Playground', menu_manager: '菜单管理', user_manager: '用户管理', diff --git a/paimon-web-ui/src/views/cdc/components/dag/dag-canvas.tsx b/paimon-web-ui/src/views/cdc/components/dag/dag-canvas.tsx index 6a0bca9de..27eeb2949 100644 --- a/paimon-web-ui/src/views/cdc/components/dag/dag-canvas.tsx +++ b/paimon-web-ui/src/views/cdc/components/dag/dag-canvas.tsx @@ -41,6 +41,7 @@ export default defineComponent({ if (graph.value) { graph.value.on('node:dblclick', ({ node }) => { nodeVariables.showDrawer = true + nodeVariables.showContextMenu = false nodeVariables.row = node.data }) graph.value.on('node:contextmenu', ({ e, node }) => { diff --git a/paimon-web-ui/src/views/cdc/components/dag/dag-slider.tsx b/paimon-web-ui/src/views/cdc/components/dag/dag-slider.tsx index 69c1875f7..fe542cb12 100644 --- a/paimon-web-ui/src/views/cdc/components/dag/dag-slider.tsx +++ b/paimon-web-ui/src/views/cdc/components/dag/dag-slider.tsx @@ -39,21 +39,6 
@@ export default defineComponent({ value: 'MYSQL', type: 'INPUT', }, - { - name: 'Kafka', - value: 'KAFKA', - type: 'INPUT', - }, - { - name: 'MongoDB', - value: 'MONGODB', - type: 'INPUT', - }, - { - name: 'PostgreSQL', - value: 'POSTGRESQL', - type: 'INPUT', - }, ], sinkList: [ { @@ -83,11 +68,11 @@ export default defineComponent({ render() { return (
- +
-
Source
+
{this.t('cdc.cdc_data_source')}
{ this.sourceList.map((item) => { return ( @@ -101,7 +86,7 @@ export default defineComponent({
-
Sink
+
{this.t('cdc.cdc_data_sink')}
{ this.sinkList.map((item) => { return ( diff --git a/paimon-web-ui/src/views/cdc/components/dag/index.module.scss b/paimon-web-ui/src/views/cdc/components/dag/index.module.scss index 77658b51a..8a2a4d3ed 100644 --- a/paimon-web-ui/src/views/cdc/components/dag/index.module.scss +++ b/paimon-web-ui/src/views/cdc/components/dag/index.module.scss @@ -46,7 +46,7 @@ under the License. */ border-radius: 5px; .title { - font-size: 18px; + font-size: 16px; } .list-item { diff --git a/paimon-web-ui/src/views/cdc/components/dag/index.tsx b/paimon-web-ui/src/views/cdc/components/dag/index.tsx index f6b45a73f..d6a8dcfbe 100644 --- a/paimon-web-ui/src/views/cdc/components/dag/index.tsx +++ b/paimon-web-ui/src/views/cdc/components/dag/index.tsx @@ -15,12 +15,13 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -import { Leaf, Save } from '@vicons/ionicons5' +import { Leaf } from '@vicons/ionicons5' import type { Router } from 'vue-router' import styles from './index.module.scss' import DagCanvas from './dag-canvas' import { useCDCStore } from '@/store/cdc' -import { createCdcJob, updateCdcJob } from '@/api/models/cdc' +import { type CdcJobDefinition, createCdcJob, updateCdcJob } from '@/api/models/cdc' +import Modal from '@/components/modal' export default defineComponent({ name: 'DagPage', @@ -28,23 +29,79 @@ export default defineComponent({ const { t } = useLocaleHooks() const CDCStore = useCDCStore() const router: Router = useRouter() + const message = useMessage() + const showModalRef = ref(false) + const CDCModalRef = ref() + const row = reactive({ + name: '', + description: '', + cdcType: 0, + dataDelay: 60 * 1000, + }) const name = ref('') + const dagRef = ref() as any + onMounted(() => { name.value = CDCStore.getModel.name + if (dagRef.value && dagRef.value.graph) { + dagRef.value.graph.fromJSON({ + cells: CDCStore.getModel.cells, + }) + } }) - const dagRef = ref() as any + 
onUpdated(() => { + name.value = CDCStore.getModel.name + if (dagRef.value && dagRef.value.graph) { + dagRef.value.graph.fromJSON({ + cells: CDCStore.getModel.cells, + }) + } + }) + + const handleShowJobInfo = () => { + row.name = CDCStore.getModel.name + row.description = CDCStore.getModel.description + row.cdcType = CDCStore.getModel.cdcType + row.dataDelay = CDCStore.getModel.dataDelay + showModalRef.value = true + } + + const handleConfirm = async (model: any) => { + await CDCModalRef.value.formRef.validate() + const rawModel = useCDCStore().getModel + rawModel.name = model.name + rawModel.description = model.description + rawModel.cdcType = model.cdcType + rawModel.dataDelay = model.dataDelay + CDCStore.setModel(rawModel) + showModalRef.value = false + } + const handleSave = () => { const editMode = CDCStore.getModel.editMode + const config = dagRef.value.graph.toJSON() + if (config.cells.length === 0) { + message.warning(t('cdc.cdc_job_save_not_null')) + return + } + const jobParam: CdcJobDefinition = { + name: CDCStore.getModel.name, + description: CDCStore.getModel.description, + cdcType: CDCStore.getModel.cdcType, + config: JSON.stringify(config), + dataDelay: CDCStore.getModel.dataDelay, + } if (editMode === 'edit') { - updateCdcJob({ id: CDCStore.getModel.id, name: `${name.value}`, description: CDCStore.getModel.description, cdcType: CDCStore.getModel.synchronizationType, config: JSON.stringify(dagRef.value.graph.toJSON()) }) + jobParam.id = CDCStore.getModel.id + updateCdcJob(jobParam) .then(() => { router.push({ path: '/cdc_ingestion' }) }) } else { - createCdcJob({ name: `${name.value}`, description: CDCStore.getModel.description, cdcType: CDCStore.getModel.synchronizationType, config: JSON.stringify(dagRef.value.graph.toJSON()) }) + createCdcJob(jobParam) .then(() => { router.push({ path: '/cdc_ingestion' }) }) @@ -54,18 +111,14 @@ export default defineComponent({ const handleJump = () => { router.push({ path: '/cdc_ingestion' }) } - - onMounted(() 
=> { - if (dagRef.value && dagRef.value.graph) { - dagRef.value.graph.fromJSON({ - cells: CDCStore.getModel.cells, - }) - } - }) - return { t, name, + row, + handleShowJobInfo, + handleConfirm, + showModalRef, + CDCModalRef, handleSave, dagRef, handleJump, @@ -86,26 +139,26 @@ export default defineComponent({
- ( - , - }} - > - - ), - }} - > - {this.t('cdc.save')} - + + {this.t('cdc.cdc_job_info')} + + + {this.t('cdc.save_synchronization_job')} +
+ {this.showModalRef && ( + (this.showModalRef = false)} + onConfirm={this.handleConfirm} + /> + )}
diff --git a/paimon-web-ui/src/views/cdc/components/list/index.tsx b/paimon-web-ui/src/views/cdc/components/list/index.tsx index 0d04bab22..e0fdad3c3 100644 --- a/paimon-web-ui/src/views/cdc/components/list/index.tsx +++ b/paimon-web-ui/src/views/cdc/components/list/index.tsx @@ -20,7 +20,7 @@ import { useTable } from './use-table' export default defineComponent({ name: 'ListPage', - emits: ['cdcJobSubmit'], + emits: ['cdcJobSubmit', 'cdcJobCancel'], setup(_, ctx) { const { t } = useLocaleHooks() diff --git a/paimon-web-ui/src/views/cdc/components/list/use-table.ts b/paimon-web-ui/src/views/cdc/components/list/use-table.ts index 5344fc30b..bc7188ca8 100644 --- a/paimon-web-ui/src/views/cdc/components/list/use-table.ts +++ b/paimon-web-ui/src/views/cdc/components/list/use-table.ts @@ -16,9 +16,11 @@ specific language governing permissions and limitations under the License. */ import type { Router } from 'vue-router' +import { NTag } from 'naive-ui' import { useCDCStore } from '@/store/cdc' import TableAction from '@/components/table-action' -import { deleteCdcJobDefinition, getCdcJobDefinition, listAllCdcJob } from '@/api/models/cdc' +import { copyCdcJobDefinition, deleteCdcJobDefinition, getCdcJobDefinition, listAllCdcJob } from '@/api/models/cdc' +import { JobStatus } from '@/api/models/cdc/types/cdcJob' export function useTable(ctx: any) { const router: Router = useRouter() @@ -39,11 +41,6 @@ export function useTable(ctx: any) { return t(message) }, }, - { - title: t('cdc.job_description'), - key: 'description', - resizable: true, - }, { title: t('cdc.create_user'), key: 'createUser', @@ -59,18 +56,43 @@ export function useTable(ctx: any) { key: 'updateTime', resizable: true, }, + { + title: t('cdc.job_current_status'), + key: 'currentStatus', + resizable: true, + render: (row: any) => { + const currentStatus = row.currentStatus + switch (currentStatus) { + case JobStatus.RESTARTING: + case JobStatus.RUNNING: + return h(NTag, { type: 'success', size: 'small' }, { 
default: () => currentStatus }) + case JobStatus.CANCELED: + case JobStatus.FINISHED: + case JobStatus.FRESHED: + return h(NTag, { type: 'info', size: 'small' }, { default: () => currentStatus }) + case JobStatus.FAILED: + case JobStatus.UNKNOWN: + return h(NTag, { type: 'error', size: 'small' }, { default: () => currentStatus }) + default: + return h(NTag, { type: 'warning', size: 'small' }, { default: () => currentStatus }) + } + }, + }, { title: t('cdc.operation'), key: 'actions', render: (row: any) => h(TableAction, { row, - onHandleEdit: (row) => { + onHandleEdit: (row: any) => { getCdcJobDefinition(row.id).then((res) => { const CDCStore = useCDCStore() CDCStore.setModel({ cells: JSON.parse(res.data.config).cells, name: res.data.name, + description: res.data.description, + cdcType: res.data.cdcType, + dataDelay: res.data.dataDelay, editMode: 'edit', id: res.data.id, }) @@ -82,13 +104,25 @@ export function useTable(ctx: any) { CDCStore.setModel({ id: row.id, }) - ctx.emit('cdcJobSubmit') + ctx.emit('cdcJobSubmit', row) }, - onHandleDelete: (row) => { + onHandleCancel: (row: any) => { + const CDCStore = useCDCStore() + CDCStore.setModel({ + id: row.id, + }) + ctx.emit('cdcJobCancel', row) + }, + onHandleDelete: (row: any) => { deleteCdcJobDefinition(row.id).then(() => { getTableData() }) }, + onHandleCopy: (row: any) => { + copyCdcJobDefinition(row.id).then(() => { + getTableData() + }) + }, }), }, ], diff --git a/paimon-web-ui/src/views/cdc/index.tsx b/paimon-web-ui/src/views/cdc/index.tsx index ba1e23dc3..fbb0b5690 100644 --- a/paimon-web-ui/src/views/cdc/index.tsx +++ b/paimon-web-ui/src/views/cdc/index.tsx @@ -21,15 +21,17 @@ import List from './components/list' import styles from './index.module.scss' import Modal from '@/components/modal' import { useCDCStore } from '@/store/cdc' -import { type CdcJobSubmit, submitCdcJob } from '@/api/models/cdc' +import { type CdcJobDefinition, type CdcJobSubmit, cancelCdcJob, freshCdcJobStatus, submitCdcJob } from 
'@/api/models/cdc' +import { JobStatus } from '@/api/models/cdc/types/cdcJob' export default defineComponent({ name: 'CDCPage', setup() { const { t } = useLocaleHooks() + const message = useMessage() + const dialog = useDialog() const showModalRef = ref(false) const showSubmitCdcJobModalRef = ref(false) - const filterValue = ref() const CDCStore = useCDCStore() const router: Router = useRouter() @@ -43,37 +45,129 @@ export default defineComponent({ } const cdcJobTableRef = ref() + // the actionJobMap member is stateful + const actionJobMap = new Map() + const refreshStatusMap = new Map() + const INTERVAL_MILLISECOND = 1000 function handleOpenModal() { showModalRef.value = true } - function handleOpenSubmitCdcJobModal() { - showSubmitCdcJobModalRef.value = true + function freshStatus(row: CdcJobDefinition, logId: number, lastStatus: string | undefined) { + const id = row.id + if (id) { + freshCdcJobStatus(id, logId).then((res: any) => { + const status = res.data + if (status !== JobStatus.SUBMITTING + && status !== JobStatus.CANCELLING + && status !== lastStatus) { + // fresh target row status + row.currentStatus = status + actionJobMap.delete(id) + const refreshJobStatusIntervalId = refreshStatusMap.get(id) + if (refreshJobStatusIntervalId) { + clearInterval(refreshJobStatusIntervalId) + refreshStatusMap.delete(id) + } + message.success(t('cdc.cdc_job_exe_success')) + } + }).catch((res) => { + message.error(res.msg) + }) + } + } + + function watchStatus(id: number, logId: number, lastStatus: string | undefined) { + const targetRow = actionJobMap.get(id) + let refreshJobStatusIntervalId = refreshStatusMap.get(id) + if (targetRow && !refreshJobStatusIntervalId) { + refreshJobStatusIntervalId = setInterval(() => { + freshStatus(targetRow, logId, lastStatus) + }, INTERVAL_MILLISECOND) + refreshStatusMap.set(id, refreshJobStatusIntervalId) + } + } + + function toUpdateJobStatus(id: number, res: any) { + const data = res.data + const logId = data.logId + const targetRow 
= actionJobMap.get(id) + if (targetRow) { + // update status + const lastStatus = targetRow.currentStatus + targetRow.currentStatus = data.jobStatus + // cron interval to fresh status + if (targetRow.currentStatus === JobStatus.SUBMITTING + || targetRow.currentStatus === JobStatus.CANCELLING) { + watchStatus(id, logId, lastStatus) + } + else { + // remove the target row in actionJobMap + actionJobMap.delete(id) + } + } } function handleCdcSubmitConfirm(form: CdcJobSubmit) { const CDCStore = useCDCStore() - submitCdcJob(CDCStore.getModel.id, form) + const id = CDCStore.getModel.id + // show submit status + const row = actionJobMap.get(id) + if (row) { + row.currentStatus = JobStatus.SUBMITTING + } + // to submit cdc job + submitCdcJob(id, form).then((res) => { + toUpdateJobStatus(id, res) + }) + // close modal dialog showSubmitCdcJobModalRef.value = false } - function handleSeachCdcJobTable() { + function handleSearchCdcJobTable() { cdcJobTableRef.value.getTableData(filterValue.value) } + function showSubmitCdcJobModal(row: any) { + // put the target row to action map + actionJobMap.set(row.id, row) + showSubmitCdcJobModalRef.value = true + } + + function handleCancelCdcJob(row: any) { + const id = row.id + cancelCdcJob(id).then((res) => { + toUpdateJobStatus(id, res) + }) + } + + function showCancelCdcJobMessage(row: any) { + dialog.warning({ + title: t('cdc.confirm_title'), + content: t('cdc.confirm_cancel_content'), + positiveText: t('cdc.confirm_sure'), + negativeText: t('cdc.confirm_cancel'), + onPositiveClick: () => { + actionJobMap.set(row.id, row) + handleCancelCdcJob(row) + }, + }) + } + return { t, showModalRef, showSubmitCdcJobModalRef, + showSubmitCdcJobModal, + showCancelCdcJobMessage, handleOpenModal, handleConfirm, CDCModalRef, submitCdcJobModalRef, - handleOpenSubmitCdcJobModal, handleCdcSubmitConfirm, cdcJobTableRef, - handleSeachCdcJobTable, + handleSearchCdcJobTable, filterValue, } }, @@ -96,7 +190,7 @@ export default defineComponent({ v-slots={{ 
prefix: () => , }} - onBlur={this.handleSeachCdcJobTable} + onBlur={this.handleSearchCdcJobTable} /> {this.t('cdc.create_synchronization_job')} @@ -105,7 +199,11 @@ export default defineComponent({
- (this.showSubmitCdcJobModalRef = true)}> + (this.showSubmitCdcJobModal(row))} + onCdcJobCancel={row => (this.showCancelCdcJobMessage(row))} + /> {this.showModalRef && ( { if (e.code === 'Enter') { isSearch.value = Boolean(filterValue.value) - filterValue.value - ? await catalogStore.getTablesByDataBaseId({ + if (filterValue.value) { + await catalogStore.getTablesByDataBaseId({ name: filterValue.value, }) - : await catalogStore.getAllCatalogs(true) + } + else { + await catalogStore.getAllCatalogs(true) + } } } diff --git a/paimon-web-ui/src/views/playground/components/query/components/console/components/controls/index.tsx b/paimon-web-ui/src/views/playground/components/query/components/console/components/controls/index.tsx index 4e965a1d4..47c14cfdd 100644 --- a/paimon-web-ui/src/views/playground/components/query/components/console/components/controls/index.tsx +++ b/paimon-web-ui/src/views/playground/components/query/components/console/components/controls/index.tsx @@ -106,6 +106,7 @@ export default defineComponent({ message.warning(t('playground.job_stopping_failed')) } catch (error) { + console.error(error) message.warning(t('playground.job_stopping_failed')) } } diff --git a/paimon-web-ui/src/views/playground/components/query/components/debugger/index.tsx b/paimon-web-ui/src/views/playground/components/query/components/debugger/index.tsx index f3b625610..a6e701b56 100644 --- a/paimon-web-ui/src/views/playground/components/query/components/debugger/index.tsx +++ b/paimon-web-ui/src/views/playground/components/query/components/debugger/index.tsx @@ -202,6 +202,7 @@ export default defineComponent({ message.warning(t('playground.job_stopping_failed')) } catch (error) { + console.error(error) message.warning(t('playground.job_stopping_failed')) } } diff --git a/paimon-web-ui/src/views/playground/components/query/components/tabs/index.tsx b/paimon-web-ui/src/views/playground/components/query/components/tabs/index.tsx index f0b189110..cf5027eea 100644 --- 
a/paimon-web-ui/src/views/playground/components/query/components/tabs/index.tsx +++ b/paimon-web-ui/src/views/playground/components/query/components/tabs/index.tsx @@ -58,7 +58,9 @@ export default defineComponent({ tabVariables.panelsList.find((item: any) => item.key === tabVariables.chooseTab).content = toRaw(editorVariables.editor).getValue() handleFormat() tabVariables.panelsList.find((item: any) => item.key === tabVariables.chooseTab).isSaved = true - menuTreeRef.value && menuTreeRef.value?.onLoadRecordData() + if (menuTreeRef.value && menuTreeRef.value.onLoadRecordData) { + menuTreeRef.value.onLoadRecordData() + } } const handleContentChange = (value: string) => { diff --git a/paimon-web-ui/src/views/playground/components/query/index.tsx b/paimon-web-ui/src/views/playground/components/query/index.tsx index 8d21ba1d1..4d429df41 100644 --- a/paimon-web-ui/src/views/playground/components/query/index.tsx +++ b/paimon-web-ui/src/views/playground/components/query/index.tsx @@ -107,12 +107,12 @@ export default defineComponent({ const { connect, subscribe } = useWebSocket(wsUrl, { onMessage: (message) => { const data = JSON.parse(message.body) - if (data && data.jobId && data.fileName) { + if (data && data.fileName) { const jobDetail: JobDetails = { executionMode: data.executeMode as ExecutionMode, job: data, jobResultData: null, - jobStatus: '', + jobStatus: data.status == null ? 
'' : data.status, executionTime: 0, startTime: Date.now(), displayResult: true, diff --git a/paimon-web-ui/src/views/system/cluster/components/cluster-delete/index.tsx b/paimon-web-ui/src/views/system/cluster/components/cluster-delete/index.tsx index 4a7e674ac..b1b18a5ee 100644 --- a/paimon-web-ui/src/views/system/cluster/components/cluster-delete/index.tsx +++ b/paimon-web-ui/src/views/system/cluster/components/cluster-delete/index.tsx @@ -27,8 +27,12 @@ export default defineComponent({ }, setup(props) { const onDelete = async () => { - (props.clusterId || props.clusterId === 0) && await deleteCluster(props.clusterId) - props.onDelete && props.onDelete() + if (props.clusterId || props.clusterId === 0) { + await deleteCluster(props.clusterId) + } + if (props.onDelete) { + props.onDelete() + } } return { diff --git a/paimon-web-ui/src/views/system/cluster/components/cluster-form/index.tsx b/paimon-web-ui/src/views/system/cluster/components/cluster-form/index.tsx index 29e0c0fc5..78ec21034 100644 --- a/paimon-web-ui/src/views/system/cluster/components/cluster-form/index.tsx +++ b/paimon-web-ui/src/views/system/cluster/components/cluster-form/index.tsx @@ -121,26 +121,32 @@ export default defineComponent({ const formRef = ref() const handleCloseModal = () => { - props['onUpdate:visible'] && props['onUpdate:visible'](false) + if (props['onUpdate:visible']) { + props['onUpdate:visible'](false) + } resetState() } async function handleConfirm() { await formRef.value.validate() - props && props.onConfirm && props.onConfirm() + if (props && props.onConfirm) { + props.onConfirm() + } handleCloseModal() resetState() } function resetState() { - props['onUpdate:formValue'] && props['onUpdate:formValue']({ - clusterName: '', - host: '', - port: 0, - enabled: true, - type: '', - deploymentMode: '', - }) + if (props['onUpdate:formValue']) { + props['onUpdate:formValue']({ + clusterName: '', + host: '', + port: 0, + enabled: true, + type: '', + deploymentMode: '', + }) + } } return 
{ diff --git a/paimon-web-ui/src/views/system/role/components/role-delete/index.tsx b/paimon-web-ui/src/views/system/role/components/role-delete/index.tsx index 5e993d9d5..d26270594 100644 --- a/paimon-web-ui/src/views/system/role/components/role-delete/index.tsx +++ b/paimon-web-ui/src/views/system/role/components/role-delete/index.tsx @@ -27,8 +27,12 @@ export default defineComponent({ }, setup(props) { const onDelete = async () => { - (props.roleId || props.roleId === 0) && await deleteRole(props.roleId) - props.onDelete && props.onDelete() + if (props.roleId || props.roleId === 0) { + await deleteRole(props.roleId) + } + if (props.onDelete) { + props.onDelete() + } } return { diff --git a/paimon-web-ui/src/views/system/role/components/role-form/index.tsx b/paimon-web-ui/src/views/system/role/components/role-form/index.tsx index 95a93e654..a05918572 100644 --- a/paimon-web-ui/src/views/system/role/components/role-form/index.tsx +++ b/paimon-web-ui/src/views/system/role/components/role-form/index.tsx @@ -84,7 +84,9 @@ export default defineComponent({ const formRef = ref() const handleCloseModal = () => { - props['onUpdate:visible'] && props['onUpdate:visible'](false) + if (props['onUpdate:visible']) { + props['onUpdate:visible'](false) + } resetState() } @@ -102,20 +104,24 @@ export default defineComponent({ const handleConfirm = async () => { await formRef.value.validate() - props && props.onConfirm && props.onConfirm() + if (props && props.onConfirm) { + props.onConfirm() + } handleCloseModal() resetState() } function resetState() { - props['onUpdate:formValue'] && props['onUpdate:formValue']({ - roleName: '', - roleKey: '', - enabled: true, - remark: '', - menuIds: [], - indeterminateKeys: [], - }) + if (props['onUpdate:formValue']) { + props['onUpdate:formValue']({ + roleName: '', + roleKey: '', + enabled: true, + remark: '', + menuIds: [], + indeterminateKeys: [], + }) + } } return { diff --git 
a/paimon-web-ui/src/views/system/user/components/user-delete/index.tsx b/paimon-web-ui/src/views/system/user/components/user-delete/index.tsx index f03f963c7..102410443 100644 --- a/paimon-web-ui/src/views/system/user/components/user-delete/index.tsx +++ b/paimon-web-ui/src/views/system/user/components/user-delete/index.tsx @@ -27,10 +27,13 @@ export default defineComponent({ }, setup(props) { const onDelete = async () => { - (props.userId || props.userId === 0) && await deleteUser(props.userId) - props.onDelete && props.onDelete() + if (props.userId || props.userId === 0) { + await deleteUser(props.userId) + } + if (props.onDelete) { + props.onDelete() + } } - return { onDelete, } diff --git a/paimon-web-ui/src/views/system/user/components/user-form/index.tsx b/paimon-web-ui/src/views/system/user/components/user-form/index.tsx index 5336b0a07..3f86cdb35 100644 --- a/paimon-web-ui/src/views/system/user/components/user-form/index.tsx +++ b/paimon-web-ui/src/views/system/user/components/user-form/index.tsx @@ -88,27 +88,33 @@ export default defineComponent({ ) const handleCloseModal = () => { - props['onUpdate:visible'] && props['onUpdate:visible'](false) + if (props['onUpdate:visible']) { + props['onUpdate:visible'](false) + } resetState() } async function handleConfirm() { await formRef.value.validate() - props && props.onConfirm && props.onConfirm() + if (props && props.onConfirm) { + props.onConfirm() + } handleCloseModal() resetState() } function resetState() { - props['onUpdate:formValue'] && props['onUpdate:formValue']({ - username: '', - nickname: '', - password: '', - enabled: true, - mobile: '', - email: '', - roleIds: undefined, - }) + if (props['onUpdate:formValue']) { + props['onUpdate:formValue']({ + username: '', + nickname: '', + password: '', + enabled: true, + mobile: '', + email: '', + roleIds: undefined, + }) + } } return { diff --git a/paimon-web-ui/vite.config.ts b/paimon-web-ui/vite.config.ts index ff16513a7..eedc18480 100644 --- 
a/paimon-web-ui/vite.config.ts +++ b/paimon-web-ui/vite.config.ts @@ -31,11 +31,11 @@ export default defineConfig({ server: { proxy: { '/mock': { - target: 'http://127.0.0.1:10088', + target: 'http://172.21.200.175:10088', changeOrigin: true, }, '/api': { - target: 'http://127.0.0.1:10088', + target: 'http://172.21.200.175:10088', changeOrigin: true, }, }, diff --git a/scripts/sql/paimon-mysql.sql b/scripts/sql/paimon-mysql.sql index e958db0be..b4b993a7f 100644 --- a/scripts/sql/paimon-mysql.sql +++ b/scripts/sql/paimon-mysql.sql @@ -152,11 +152,26 @@ CREATE TABLE if not exists `cdc_job_definition` create_user varchar(20) null comment 'create user', create_time datetime default CURRENT_TIMESTAMP null comment 'create time', update_time datetime default CURRENT_TIMESTAMP null comment 'update time', + data_delay int default null COMMENT 'data delay', is_delete tinyint(1) default 0 not null comment 'is delete', constraint cdc_job_definition_pk unique (`name`) ) engine = innodb; +DROP TABLE IF EXISTS `cdc_job_log`; +CREATE TABLE if not exists `cdc_job_log` +( + id int(11) not null auto_increment primary key comment 'id', + cluster_id int(11) not null comment 'cluster id', + cdc_job_definition_id int (11) not null comment 'cdc job definition id', + create_user varchar(20) not null comment 'create user', + create_time datetime default CURRENT_TIMESTAMP null comment 'create time', + update_time datetime default CURRENT_TIMESTAMP null comment 'update time', + terminate_time datetime default null comment 'terminate time', + current_status varchar(16) not null comment 'cdc job current FRESHED, RESTARTING, SUBMITTING, CREATED, RUNNING, FAILED, FINISHED, CANCELED', + flink_job_id varchar(32) default null comment 'flink job id' + ) engine = innodb; + CREATE TABLE if not exists `job` ( `id` int(11) not null auto_increment primary key comment 'id',