From 3581578d37c668768d0a2411fbedc6e6270d005b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E4=B9=89=E8=B6=85?= Date: Tue, 6 Jan 2026 20:30:55 +0800 Subject: [PATCH 1/4] add task result alert from worker to master by rpc --- .../common/enums/AlertType.java | 3 +- .../dao/mapper/TaskDefinitionMapperTest.java | 4 +- .../master/ITaskResultAlertService.java | 30 ++++++++ ...TaskExecutorEventRemoteReporterClient.java | 37 ++++++++++ .../transportor/TaskResultAlertRequest.java | 42 +++++++++++ .../transportor/TaskResultAlertResponse.java | 40 +++++++++++ .../rpc/TaskResultAlertServiceImpl.java | 72 +++++++++++++++++++ .../TaskExecutorEventBusCoordinator.java | 5 ++ .../TaskExecutorLifecycleEventType.java | 2 + ...TaskExecutorResultAlertLifecycleEvent.java | 59 +++++++++++++++ .../ITaskExecutorLifecycleEventListener.java | 2 + .../TaskExecutorLifecycleEventListener.java | 6 ++ .../plugin/task/api/AbstractTask.java | 12 ++-- .../plugin/task/api/TaskCallBack.java | 3 + ...lertInfo.java => TaskResultAlertInfo.java} | 28 ++------ .../task/api/parameters/SqlParameters.java | 14 ++-- .../api/parameters/SqlParametersTest.java | 6 +- .../plugin/task/datax/DataxTaskTest.java | 6 ++ .../plugin/task/emr/EmrAddStepsTaskTest.java | 6 ++ .../plugin/task/emr/EmrJobFlowTaskTest.java | 6 ++ .../plugin/task/sql/SqlTask.java | 22 ++++-- .../task/zeppelin/ZeppelinTaskTest.java | 6 ++ .../components/node/fields/use-sql-type.ts | 14 ++-- .../task/components/node/format-data.ts | 4 +- .../projects/task/components/node/types.ts | 2 +- .../worker/executor/PhysicalTaskExecutor.java | 8 +++ 26 files changed, 382 insertions(+), 57 deletions(-) create mode 100644 dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ITaskResultAlertService.java create mode 100644 dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskResultAlertRequest.java create mode 100644 dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskResultAlertResponse.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskResultAlertServiceImpl.java create mode 100644 dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorResultAlertLifecycleEvent.java rename dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/{TaskAlertInfo.java => TaskResultAlertInfo.java} (66%) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java index f7a5ba0d3906..096f4e3c7918 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java @@ -29,7 +29,7 @@ public enum AlertType { /** * 0 workflow instance failure, 1 workflow instance success, 2 workflow instance blocked, 3 workflow instance timeout, 4 fault tolerance warning, - * 5 task failure, 6 task success, 7 task timeout, 8 close alert + * 5 task failure, 6 task success, 7 task timeout, 8 task result */ WORKFLOW_INSTANCE_FAILURE(0, "workflow instance failure"), WORKFLOW_INSTANCE_SUCCESS(1, "workflow instance success"), @@ -39,6 +39,7 @@ public enum AlertType { TASK_FAILURE(5, "task failure"), TASK_SUCCESS(6, "task success"), TASK_TIMEOUT(7, "task timeout"), + TASK_RESULT(8, "task result"); ; AlertType(int code, String descp) { diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java index da3cc1d27481..83fc14e5fec7 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java @@ -140,7 +140,7 @@ public void testDeleteByCode() { @Test public void testNullPropertyValueOfLocalParams() { String definitionJson = - "{\"failRetryTimes\":\"0\",\"timeoutNotifyStrategy\":\"\",\"code\":\"5195043558720\",\"flag\":\"YES\",\"environmentCode\":\"-1\",\"taskDefinitionIndex\":2,\"taskPriority\":\"MEDIUM\",\"taskParams\":\"{\\\"preStatements\\\":null,\\\"postStatements\\\":null,\\\"type\\\":\\\"ADB_MYSQL\\\",\\\"database\\\":\\\"lijia\\\",\\\"sql\\\":\\\"create table nation_${random_serial_number} as select * from nation\\\",\\\"localParams\\\":[{\\\"direct\\\":2,\\\"type\\\":3,\\\"prop\\\":\\\"key\\\"}],\\\"Name\\\":\\\"create_table_as_select_nation\\\",\\\"FailRetryTimes\\\":0,\\\"dbClusterId\\\":\\\"amv-bp10o45925jpx959\\\",\\\"sendEmail\\\":false,\\\"displayRows\\\":10,\\\"limit\\\":10000,\\\"agentSource\\\":\\\"Workflow\\\",\\\"agentVersion\\\":\\\"Unkown\\\"}\",\"timeout\":\"0\",\"taskType\":\"ADB_MYSQL\",\"timeoutFlag\":\"CLOSE\",\"projectCode\":\"5191800302720\",\"name\":\"create_table_as_select_nation\",\"delayTime\":\"0\",\"workerGroup\":\"default\"}"; + "{\"failRetryTimes\":\"0\",\"timeoutNotifyStrategy\":\"\",\"code\":\"5195043558720\",\"flag\":\"YES\",\"environmentCode\":\"-1\",\"taskDefinitionIndex\":2,\"taskPriority\":\"MEDIUM\",\"taskParams\":\"{\\\"preStatements\\\":null,\\\"postStatements\\\":null,\\\"type\\\":\\\"ADB_MYSQL\\\",\\\"database\\\":\\\"lijia\\\",\\\"sql\\\":\\\"create table nation_${random_serial_number} as select * from nation\\\",\\\"localParams\\\":[{\\\"direct\\\":2,\\\"type\\\":3,\\\"prop\\\":\\\"key\\\"}],\\\"Name\\\":\\\"create_table_as_select_nation\\\",\\\"FailRetryTimes\\\":0,\\\"dbClusterId\\\":\\\"amv-bp10o45925jpx959\\\",\\\"sendAlert\\\":false,\\\"displayRows\\\":10,\\\"limit\\\":10000,\\\"agentSource\\\":\\\"Workflow\\\",\\\"agentVersion\\\":\\\"Unkown\\\"}\",\"timeout\":\"0\",\"taskType\":\"ADB_MYSQL\",\"timeoutFlag\":\"CLOSE\",\"projectCode\":\"5191800302720\",\"name\":\"create_table_as_select_nation\",\"delayTime\":\"0\",\"workerGroup\":\"default\"}"; TaskDefinition definition = JSONUtils.parseObject(definitionJson, TaskDefinition.class); Map taskParamsMap = definition.getTaskParamMap(); @@ -157,7 +157,7 @@ public void testNullPropertyValueOfLocalParams() { @Test public void testNullLocalParamsOfTaskParams() { String definitionJson = - "{\"failRetryTimes\":\"0\",\"timeoutNotifyStrategy\":\"\",\"code\":\"5195043558720\",\"flag\":\"YES\",\"environmentCode\":\"-1\",\"taskDefinitionIndex\":2,\"taskPriority\":\"MEDIUM\",\"taskParams\":\"{\\\"preStatements\\\":null,\\\"postStatements\\\":null,\\\"type\\\":\\\"ADB_MYSQL\\\",\\\"database\\\":\\\"lijia\\\",\\\"sql\\\":\\\"create table nation_${random_serial_number} as select * from nation\\\",\\\"localParams\\\":null,\\\"Name\\\":\\\"create_table_as_select_nation\\\",\\\"FailRetryTimes\\\":0,\\\"dbClusterId\\\":\\\"amv-bp10o45925jpx959\\\",\\\"sendEmail\\\":false,\\\"displayRows\\\":10,\\\"limit\\\":10000,\\\"agentSource\\\":\\\"Workflow\\\",\\\"agentVersion\\\":\\\"Unkown\\\"}\",\"timeout\":\"0\",\"taskType\":\"ADB_MYSQL\",\"timeoutFlag\":\"CLOSE\",\"projectCode\":\"5191800302720\",\"name\":\"create_table_as_select_nation\",\"delayTime\":\"0\",\"workerGroup\":\"default\"}"; + "{\"failRetryTimes\":\"0\",\"timeoutNotifyStrategy\":\"\",\"code\":\"5195043558720\",\"flag\":\"YES\",\"environmentCode\":\"-1\",\"taskDefinitionIndex\":2,\"taskPriority\":\"MEDIUM\",\"taskParams\":\"{\\\"preStatements\\\":null,\\\"postStatements\\\":null,\\\"type\\\":\\\"ADB_MYSQL\\\",\\\"database\\\":\\\"lijia\\\",\\\"sql\\\":\\\"create table nation_${random_serial_number} as select * from nation\\\",\\\"localParams\\\":null,\\\"Name\\\":\\\"create_table_as_select_nation\\\",\\\"FailRetryTimes\\\":0,\\\"dbClusterId\\\":\\\"amv-bp10o45925jpx959\\\",\\\"sendAlert\\\":false,\\\"displayRows\\\":10,\\\"limit\\\":10000,\\\"agentSource\\\":\\\"Workflow\\\",\\\"agentVersion\\\":\\\"Unkown\\\"}\",\"timeout\":\"0\",\"taskType\":\"ADB_MYSQL\",\"timeoutFlag\":\"CLOSE\",\"projectCode\":\"5191800302720\",\"name\":\"create_table_as_select_nation\",\"delayTime\":\"0\",\"workerGroup\":\"default\"}"; TaskDefinition definition = JSONUtils.parseObject(definitionJson, TaskDefinition.class); Assertions.assertNull(definition.getTaskParamMap(), "Serialize the task definition success"); diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ITaskResultAlertService.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ITaskResultAlertService.java new file mode 100644 index 000000000000..32eeb9e3c160 --- /dev/null +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ITaskResultAlertService.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.extract.master; + +import org.apache.dolphinscheduler.extract.base.RpcMethod; +import org.apache.dolphinscheduler.extract.base.RpcService; +import org.apache.dolphinscheduler.extract.master.transportor.TaskResultAlertRequest; +import org.apache.dolphinscheduler.extract.master.transportor.TaskResultAlertResponse; + +@RpcService +public interface ITaskResultAlertService { + + @RpcMethod + TaskResultAlertResponse reportTaskResultAlertToMaster(TaskResultAlertRequest taskResultAlertRequest); +} diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/TaskExecutorEventRemoteReporterClient.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/TaskExecutorEventRemoteReporterClient.java index cec9f8eab781..58229a576539 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/TaskExecutorEventRemoteReporterClient.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/TaskExecutorEventRemoteReporterClient.java @@ -18,12 +18,16 @@ package org.apache.dolphinscheduler.extract.master; import org.apache.dolphinscheduler.extract.base.client.Clients; +import org.apache.dolphinscheduler.extract.master.transportor.TaskResultAlertRequest; +import org.apache.dolphinscheduler.extract.master.transportor.TaskResultAlertResponse; +import org.apache.dolphinscheduler.plugin.task.api.model.TaskResultAlertInfo; import org.apache.dolphinscheduler.task.executor.eventbus.ITaskExecutorEventRemoteReporterClient; import org.apache.dolphinscheduler.task.executor.events.IReportableTaskExecutorLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorDispatchedLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorFailedLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorKilledLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorPausedLifecycleEvent; +import org.apache.dolphinscheduler.task.executor.events.TaskExecutorResultAlertLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorRuntimeContextChangedLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorStartedLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorSuccessLifecycleEvent; @@ -67,6 +71,10 @@ public void reportTaskExecutionEventToMaster(final String masterAddress, reportTaskSuccessEventToMaster(masterAddress, (TaskExecutorSuccessLifecycleEvent) taskExecutorLifecycleEvent); break; + case RESULT_ALERT: + reportTaskResultAlertEventToMaster(masterAddress, + (TaskExecutorResultAlertLifecycleEvent) taskExecutorLifecycleEvent); + break; default: log.warn("Unsupported TaskExecutionEvent: {}", taskExecutorLifecycleEvent); } @@ -131,4 +139,33 @@ private static void reportTaskSuccessEventToMaster(final String masterAddress, .withHost(masterAddress) .onTaskExecutorSuccess(taskExecutionSuccessEvent); } + + private static void reportTaskResultAlertEventToMaster(final String masterAddress, + final TaskExecutorResultAlertLifecycleEvent taskExecutionResultAlertEvent) { + TaskResultAlertInfo taskResultAlertInfo = taskExecutionResultAlertEvent.getTaskResultAlertInfo(); + TaskResultAlertRequest taskResultAlertRequest = + TaskResultAlertRequest.builder() + .alertGroupId(taskResultAlertInfo.getAlertGroupId()) + .title(taskResultAlertInfo.getTitle()) + .content(taskResultAlertInfo.getContent()) + .workflowDefinitionCode(taskResultAlertInfo.getWorkflowDefinitionCode()) + .workflowInstanceId(taskResultAlertInfo.getWorkflowInstanceId()) + .taskInstanceId(taskResultAlertInfo.getTaskInstanceId()) + .build(); + + TaskResultAlertResponse taskResultAlertResponse = Clients.withService(ITaskResultAlertService.class) + .withHost(masterAddress) + .reportTaskResultAlertToMaster(taskResultAlertRequest); + + if (taskResultAlertResponse != null && taskResultAlertResponse.isSuccess()) { + log.info( + "Successfully reported task result alert to master. TaskInstanceId: {}, Title: '{}', MasterAddress: {}", + taskResultAlertInfo.getTaskInstanceId(), taskResultAlertInfo.getTitle(), masterAddress); + } else { + log.warn( + "Failed to report task result alert to master. TaskInstanceId: {}, Title: '{}', MasterAddress: {}, Reason: {}", + taskResultAlertInfo.getTaskInstanceId(), taskResultAlertInfo.getTitle(), masterAddress, + taskResultAlertResponse != null ? taskResultAlertResponse.getMessage() : "response is null"); + } + } } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskResultAlertRequest.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskResultAlertRequest.java new file mode 100644 index 000000000000..54ea1e52d9d1 --- /dev/null +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskResultAlertRequest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.extract.master.transportor; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TaskResultAlertRequest { + + private String title; + + private String content; + + private Integer alertGroupId; + + private Long workflowDefinitionCode; + + private Integer workflowInstanceId; + + private int taskInstanceId; +} diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskResultAlertResponse.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskResultAlertResponse.java new file mode 100644 index 000000000000..7915e64836bb --- /dev/null +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskResultAlertResponse.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.extract.master.transportor; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class TaskResultAlertResponse { + + private boolean success; + + private String message; + + public static TaskResultAlertResponse success() { + return new TaskResultAlertResponse(true, null); + } + + public static TaskResultAlertResponse failed(String message) { + return new TaskResultAlertResponse(false, message); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskResultAlertServiceImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskResultAlertServiceImpl.java new file mode 100644 index 000000000000..1b41a2aa9f1f --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskResultAlertServiceImpl.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.rpc; + +import org.apache.dolphinscheduler.common.enums.AlertType; +import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.dao.AlertDao; +import org.apache.dolphinscheduler.dao.entity.Alert; +import org.apache.dolphinscheduler.extract.master.ITaskResultAlertService; +import org.apache.dolphinscheduler.extract.master.transportor.TaskResultAlertRequest; +import org.apache.dolphinscheduler.extract.master.transportor.TaskResultAlertResponse; + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class TaskResultAlertServiceImpl implements ITaskResultAlertService { + + @Autowired + private AlertDao alertDao; + + @Override + public TaskResultAlertResponse reportTaskResultAlertToMaster(TaskResultAlertRequest taskResultAlertRequest) { + log.info("Received TaskResultAlertRequest request{}", taskResultAlertRequest); + + try { + Alert alert = new Alert(); + alert.setTitle(taskResultAlertRequest.getTitle()); + alert.setContent(taskResultAlertRequest.getContent()); + alert.setWarningType(WarningType.NONE); + alert.setCreateTime(new Date()); + alert.setUpdateTime(new Date()); + alert.setAlertGroupId(taskResultAlertRequest.getAlertGroupId()); + alert.setWorkflowDefinitionCode(taskResultAlertRequest.getWorkflowDefinitionCode()); + alert.setWorkflowInstanceId(taskResultAlertRequest.getWorkflowInstanceId()); + Map info = new HashMap<>(); + info.put("taskInstanceId", taskResultAlertRequest.getTaskInstanceId()); + alert.setInfo(info); + + alert.setAlertType(AlertType.TASK_RESULT); + alertDao.addAlert(alert); + + log.info("add alert success"); + return TaskResultAlertResponse.success(); + } catch (Exception ex) { + log.error("add alert failed:{} ", ex.getMessage()); + return TaskResultAlertResponse.failed(ex.getMessage()); + } + } +} diff --git a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/TaskExecutorEventBusCoordinator.java b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/TaskExecutorEventBusCoordinator.java index 1ced554bc7a8..905d0690b2f6 100644 --- a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/TaskExecutorEventBusCoordinator.java +++ b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/TaskExecutorEventBusCoordinator.java @@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.task.executor.events.TaskExecutorKilledLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorPauseLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorPausedLifecycleEvent; +import org.apache.dolphinscheduler.task.executor.events.TaskExecutorResultAlertLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorRuntimeContextChangedLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorStartedLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorSuccessLifecycleEvent; @@ -183,6 +184,10 @@ private void doFireTaskExecutorEventBus(final ITaskExecutor taskExecutor) { taskExecutorLifecycleEventListener.onTaskExecutorFinalizeLifecycleEvent( ((TaskExecutorFinalizeLifecycleEvent) taskExecutorLifecycleEvent)); break; + case RESULT_ALERT: + taskExecutorLifecycleEventListener.onTaskExecutorResultAlertLifecycleEvent( + ((TaskExecutorResultAlertLifecycleEvent) taskExecutorLifecycleEvent)); + break; default: throw new IllegalArgumentException( "Unsupported TaskExecutorLifecycleEvent: " + taskExecutorLifecycleEvent); diff --git a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorLifecycleEventType.java b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorLifecycleEventType.java index aa0aa2cd4e56..88e973beb578 100644 --- a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorLifecycleEventType.java +++ b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorLifecycleEventType.java @@ -37,6 +37,8 @@ public enum TaskExecutorLifecycleEventType { FAILED, + RESULT_ALERT, + FINALIZE, ; diff --git a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorResultAlertLifecycleEvent.java b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorResultAlertLifecycleEvent.java new file mode 100644 index 000000000000..e9a2ad158ef0 --- /dev/null +++ b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorResultAlertLifecycleEvent.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.task.executor.events; + +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.model.TaskResultAlertInfo; +import org.apache.dolphinscheduler.task.executor.ITaskExecutor; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + +@Data +@ToString(callSuper = true) +@SuperBuilder +@EqualsAndHashCode(callSuper = true) +@NoArgsConstructor +public class TaskExecutorResultAlertLifecycleEvent extends AbstractTaskExecutorLifecycleEvent + implements + IReportableTaskExecutorLifecycleEvent { + + private int workflowInstanceId; + + private String taskInstanceHost; + + private Long latestReportTime; + + private TaskResultAlertInfo taskResultAlertInfo; + + public static TaskExecutorResultAlertLifecycleEvent of(final ITaskExecutor taskExecutor, + TaskResultAlertInfo taskResultAlertInfo) { + final TaskExecutionContext taskExecutionContext = taskExecutor.getTaskExecutionContext(); + return TaskExecutorResultAlertLifecycleEvent.builder() + .workflowInstanceId(taskExecutionContext.getWorkflowInstanceId()) + .taskInstanceId(taskExecutionContext.getTaskInstanceId()) + .taskInstanceHost(taskExecutionContext.getHost()) + .taskResultAlertInfo(taskResultAlertInfo) + .type(TaskExecutorLifecycleEventType.RESULT_ALERT) + .build(); + } + +} diff --git a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/ITaskExecutorLifecycleEventListener.java b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/ITaskExecutorLifecycleEventListener.java index 07da199cb5e4..0755a87288bb 100644 --- a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/ITaskExecutorLifecycleEventListener.java +++ b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/ITaskExecutorLifecycleEventListener.java @@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.task.executor.events.TaskExecutorKilledLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorPauseLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorPausedLifecycleEvent; +import org.apache.dolphinscheduler.task.executor.events.TaskExecutorResultAlertLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorRuntimeContextChangedLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorStartedLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorSuccessLifecycleEvent; @@ -50,4 +51,5 @@ public interface ITaskExecutorLifecycleEventListener { void onTaskExecutorFinalizeLifecycleEvent(final TaskExecutorFinalizeLifecycleEvent event); + void onTaskExecutorResultAlertLifecycleEvent(final TaskExecutorResultAlertLifecycleEvent event); } diff --git a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/TaskExecutorLifecycleEventListener.java b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/TaskExecutorLifecycleEventListener.java index c17afaca636b..5d2df2846967 100644 --- a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/TaskExecutorLifecycleEventListener.java +++ b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/TaskExecutorLifecycleEventListener.java @@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.task.executor.events.TaskExecutorKilledLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorPauseLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorPausedLifecycleEvent; +import org.apache.dolphinscheduler.task.executor.events.TaskExecutorResultAlertLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorRuntimeContextChangedLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorStartedLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorSuccessLifecycleEvent; @@ -114,6 +115,11 @@ public void onTaskExecutorFinalizeLifecycleEvent(final TaskExecutorFinalizeLifec executorContainer.finalize(taskExecutor); } + @Override + public void onTaskExecutorResultAlertLifecycleEvent(TaskExecutorResultAlertLifecycleEvent event) { + reportTaskExecutorLifecycleEventToMaster(event); + } + private void reportTaskExecutorLifecycleEventToMaster(IReportableTaskExecutorLifecycleEvent taskExecutorLifecycleEvent) { taskExecutorLifecycleEventReporter.reportTaskExecutorLifecycleEvent(taskExecutorLifecycleEvent); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java index 006f40aa1416..249f9e69860c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java @@ -19,7 +19,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.model.Property; -import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo; +import org.apache.dolphinscheduler.plugin.task.api.model.TaskResultAlertInfo; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import java.util.Map; @@ -58,7 +58,7 @@ public abstract class AbstractTask { protected boolean needAlert = false; - protected TaskAlertInfo taskAlertInfo; + protected TaskResultAlertInfo taskResultAlertInfo; /** * constructor @@ -117,12 +117,12 @@ public void setNeedAlert(boolean needAlert) { this.needAlert = needAlert; } - public TaskAlertInfo getTaskAlertInfo() { - return taskAlertInfo; + public TaskResultAlertInfo getTaskResultAlertInfo() { + return taskResultAlertInfo; } - public void setTaskAlertInfo(TaskAlertInfo taskAlertInfo) { - this.taskAlertInfo = taskAlertInfo; + public void setTaskResultAlertInfo(TaskResultAlertInfo taskResultAlertInfo) { + this.taskResultAlertInfo = taskResultAlertInfo; } /** diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java index 010e328c35d9..f3a5095243d3 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.plugin.task.api; import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo; +import org.apache.dolphinscheduler.plugin.task.api.model.TaskResultAlertInfo; public interface TaskCallBack { @@ -26,4 +27,6 @@ public interface TaskCallBack { // todo:The pid should put into runtime context @Deprecated void updateTaskInstanceInfo(int taskInstanceId); + + void reportTaskResultAlertInfo(TaskResultAlertInfo taskResultAlertInfo); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/TaskAlertInfo.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/TaskResultAlertInfo.java similarity index 66% rename from dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/TaskAlertInfo.java rename to dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/TaskResultAlertInfo.java index dc83af276134..2e768b0cf60f 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/TaskAlertInfo.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/TaskResultAlertInfo.java @@ -17,7 +17,10 @@ package org.apache.dolphinscheduler.plugin.task.api.model; -public class TaskAlertInfo { +import lombok.Data; + +@Data +public class TaskResultAlertInfo { private String title; @@ -25,27 +28,10 @@ public class TaskAlertInfo { private Integer alertGroupId; - public String getTitle() { - return title; - } - - public void setTitle(String title) { - this.title = title; - } - - public String getContent() { - return content; - } + private Long workflowDefinitionCode; - public void setContent(String content) { - this.content = content; - } + private Integer workflowInstanceId; - public Integer getAlertGroupId() { - return alertGroupId; - } + private int taskInstanceId; - public void setAlertGroupId(Integer alertGroupId) { - this.alertGroupId = alertGroupId; - } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java index 8864943bac43..d6ef15973152 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java @@ -66,9 +66,9 @@ public class SqlParameters extends AbstractParameters { private int sqlType; /** - * send email + * send alert */ - private Boolean sendEmail; + private Boolean sendAlert; /** * display rows @@ -147,12 +147,12 @@ public void setSqlType(int sqlType) { this.sqlType = sqlType; } - public Boolean getSendEmail() { - return sendEmail; + public Boolean getSendAlert() { + return sendAlert; } - public void setSendEmail(Boolean sendEmail) { - this.sendEmail = sendEmail; + public void setSendAlert(Boolean sendAlert) { + this.sendAlert = sendAlert; } public int getDisplayRows() { @@ -273,7 +273,7 @@ public String toString() { + ", datasource=" + datasource + ", sql='" + sql + '\'' + ", sqlType=" + sqlType - + ", sendEmail=" + sendEmail + + ", sendAlert=" + sendAlert + ", displayRows=" + displayRows + ", limit=" + limit + ", showType='" + showType + '\'' diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParametersTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParametersTest.java index 8f1ee7656000..bfa8ba6416cf 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParametersTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParametersTest.java @@ -35,7 +35,7 @@ public class SqlParametersTest { private final String sql = "select * from t_ds_user"; private final int datasource = 1; private final int sqlType = 0; - private final Boolean sendEmail = true; + private final Boolean sendAlert = true; private final int displayRows = 10; private final String showType = "TABLE"; private final String title = "sql test"; @@ -58,7 +58,7 @@ public void testSqlParameters() { sqlParameters.setSql(sql); sqlParameters.setDatasource(datasource); sqlParameters.setSqlType(sqlType); - sqlParameters.setSendEmail(sendEmail); + sqlParameters.setSendAlert(sendAlert); sqlParameters.setDisplayRows(displayRows); sqlParameters.setShowType(showType); sqlParameters.setTitle(title); @@ -68,7 +68,7 @@ public void testSqlParameters() { Assertions.assertEquals(sql, sqlParameters.getSql()); Assertions.assertEquals(datasource, sqlParameters.getDatasource()); Assertions.assertEquals(sqlType, sqlParameters.getSqlType()); - Assertions.assertEquals(sendEmail, sqlParameters.getSendEmail()); + Assertions.assertEquals(sendAlert, sqlParameters.getSendAlert()); Assertions.assertEquals(displayRows, sqlParameters.getDisplayRows()); Assertions.assertEquals(showType, sqlParameters.getShowType()); Assertions.assertEquals(title, sqlParameters.getTitle()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java index 3b1dd82c9283..48264c076ae4 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java @@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; +import org.apache.dolphinscheduler.plugin.task.api.model.TaskResultAlertInfo; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.enums.DbType; @@ -78,6 +79,11 @@ public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo appl public void updateTaskInstanceInfo(int taskInstanceId) { } + + @Override + public void reportTaskResultAlertInfo(TaskResultAlertInfo taskResultAlertInfo) { + + } }; @BeforeEach diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java index b1fab4d436d1..a24f5b97e790 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java @@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo; +import org.apache.dolphinscheduler.plugin.task.api.model.TaskResultAlertInfo; import org.apache.commons.io.IOUtils; @@ -86,6 +87,11 @@ public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo appl public void updateTaskInstanceInfo(int taskInstanceId) { } + + @Override + public void reportTaskResultAlertInfo(TaskResultAlertInfo taskResultAlertInfo) { + + } }; @BeforeEach diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java index fc50f7ed97b8..2a9a920fea7b 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java @@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo; +import org.apache.dolphinscheduler.plugin.task.api.model.TaskResultAlertInfo; import org.apache.commons.io.IOUtils; @@ -108,6 +109,11 @@ public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo appl public void updateTaskInstanceInfo(int taskInstanceId) { } + + @Override + public void reportTaskResultAlertInfo(TaskResultAlertInfo taskResultAlertInfo) { + + } }; @BeforeEach diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java index 44296bc3313b..c58695c63710 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java @@ -32,7 +32,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.SqlType; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.plugin.task.api.model.Property; -import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo; +import org.apache.dolphinscheduler.plugin.task.api.model.TaskResultAlertInfo; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; @@ -156,6 +156,11 @@ public void handle(TaskCallBack taskCallBack) throws TaskException { setExitStatusCode(TaskConstants.EXIT_CODE_SUCCESS); + if (this.getNeedAlert()) { + // callback to report taskResult alert + taskCallBack.reportTaskResultAlertInfo(this.getTaskResultAlertInfo()); + } + } catch (Exception e) { if (exitStatusCode == TaskConstants.EXIT_CODE_KILL) { log.info("sql task has been killed"); @@ -282,8 +287,8 @@ private String resultProcess(ResultSet resultSet) throws Exception { String result = resultJSONArray.isEmpty() ? JSONUtils.toJsonString(generateEmptyRow(resultSet)) : JSONUtils.toJsonString(resultJSONArray); - if (Boolean.TRUE.equals(sqlParameters.getSendEmail())) { - sendAttachment(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle()) + if (Boolean.TRUE.equals(sqlParameters.getSendAlert())) { + sendTaskResultAlert(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle()) ? sqlParameters.getTitle() : taskExecutionContext.getTaskName() + " query result sets", result); } @@ -312,18 +317,21 @@ private ArrayNode generateEmptyRow(ResultSet resultSet) throws SQLException { } /** - * send alert as an attachment + * send alert * * @param title title * @param content content */ - private void sendAttachment(int groupId, String title, String content) { + private void sendTaskResultAlert(int groupId, String title, String content) { setNeedAlert(Boolean.TRUE); - TaskAlertInfo taskAlertInfo = new TaskAlertInfo(); + TaskResultAlertInfo taskAlertInfo = new TaskResultAlertInfo(); taskAlertInfo.setAlertGroupId(groupId); taskAlertInfo.setContent(content); taskAlertInfo.setTitle(title); - setTaskAlertInfo(taskAlertInfo); + taskAlertInfo.setWorkflowDefinitionCode(this.taskExecutionContext.getWorkflowDefinitionCode()); + taskAlertInfo.setWorkflowInstanceId(this.taskExecutionContext.getWorkflowInstanceId()); + taskAlertInfo.setTaskInstanceId(this.taskExecutionContext.getTaskInstanceId()); + setTaskResultAlertInfo(taskAlertInfo); } private String executeQuery(Connection connection, SqlBinds sqlBinds, String handlerType) throws Exception { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java index ffaf85b207b0..55ae6baf93e3 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java @@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo; +import org.apache.dolphinscheduler.plugin.task.api.model.TaskResultAlertInfo; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; import org.apache.zeppelin.client.NoteResult; @@ -82,6 +83,11 @@ public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo appl public void updateTaskInstanceInfo(int taskInstanceId) { } + + @Override + public void reportTaskResultAlertInfo(TaskResultAlertInfo taskResultAlertInfo) { + + } }; @BeforeEach diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sql-type.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sql-type.ts index d30ef9ef3221..b9ac314ff3d4 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sql-type.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sql-type.ts @@ -24,8 +24,8 @@ import type { IJsonItem } from '../types' export function useSqlType(model: { [field: string]: any }): IJsonItem[] { const { t } = useI18n() const querySpan = computed(() => (model.sqlType === '0' ? 6 : 0)) - const emailSpan = computed(() => - model.sqlType === '0' && model.sendEmail ? 24 : 0 + const alertSpan = computed(() => + model.sqlType === '0' && model.sendAlert ? 24 : 0 ) const groups = ref([]) const groupsLoading = ref(false) @@ -69,7 +69,7 @@ export function useSqlType(model: { [field: string]: any }): IJsonItem[] { }, { type: 'switch', - field: 'sendEmail', + field: 'sendAlert', span: querySpan, name: t('project.node.send_alarm') }, @@ -109,12 +109,12 @@ export function useSqlType(model: { [field: string]: any }): IJsonItem[] { props: { placeholder: t('project.node.title_tips') }, - span: emailSpan, + span: alertSpan, validate: { trigger: ['input', 'blur'], required: true, validator(unuse, value) { - if (model.sendEmail && !value) + if (model.sendAlert && !value) return new Error(t('project.node.title_tips')) } } @@ -124,7 +124,7 @@ export function useSqlType(model: { [field: string]: any }): IJsonItem[] { field: 'groupId', name: t('project.node.alarm_group'), options: groups, - span: emailSpan, + span: alertSpan, props: { loading: groupsLoading, placeholder: t('project.node.alarm_group_tips') @@ -133,7 +133,7 @@ export function useSqlType(model: { [field: string]: any }): IJsonItem[] { trigger: ['input', 'blur'], required: true, validator(unuse, value) { - if (model.sendEmail && !value) + if (model.sendAlert && !value) return new Error(t('project.node.alarm_group_tips')) } } diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts index 8fc67906775c..cd7fec72a85b 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts @@ -208,9 +208,9 @@ export function formatParams(data: INodeData): { taskParams.sqlType = data.sqlType taskParams.preStatements = data.preStatements taskParams.postStatements = data.postStatements - taskParams.sendEmail = data.sendEmail + taskParams.sendAlert = data.sendAlert taskParams.displayRows = data.displayRows - if (data.sqlType === '0' && data.sendEmail) { + if (data.sqlType === '0' && data.sendAlert) { taskParams.title = data.title taskParams.groupId = data.groupId } diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts index 76dcde23ee2d..570073cebe35 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts @@ -305,7 +305,7 @@ interface ITaskParams { datasource?: string sql?: string sqlType?: string - sendEmail?: boolean + sendAlert?: boolean displayRows?: number title?: string groupId?: string diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java index 7118a099d5f9..f25c63a75e2b 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.log.TaskLogMarkers; import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo; +import org.apache.dolphinscheduler.plugin.task.api.model.TaskResultAlertInfo; import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.utils.TaskExecutionContextUtils; @@ -33,6 +34,7 @@ import org.apache.dolphinscheduler.task.executor.ITaskExecutor; import org.apache.dolphinscheduler.task.executor.TaskExecutorState; import org.apache.dolphinscheduler.task.executor.TaskExecutorStateMappings; +import org.apache.dolphinscheduler.task.executor.events.TaskExecutorResultAlertLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorRuntimeContextChangedLifecycleEvent; import java.util.ArrayList; @@ -86,6 +88,12 @@ public void updateRemoteApplicationInfo(final int taskInstanceId, final Applicat public void updateTaskInstanceInfo(final int taskInstanceId) { taskExecutorEventBus.publish(TaskExecutorRuntimeContextChangedLifecycleEvent.of(taskExecutor)); } + + @Override + public void reportTaskResultAlertInfo(TaskResultAlertInfo taskResultAlertInfo) { + taskExecutorEventBus + .publish(TaskExecutorResultAlertLifecycleEvent.of(taskExecutor, taskResultAlertInfo)); + } }); } From 4ad437abea4bc3447194b06484ddebddc482c665 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E4=B9=89=E8=B6=85?= Date: Mon, 2 Feb 2026 11:30:15 +0800 Subject: [PATCH 2/4] send result alert sync by rpc from worker to alert --- .../alert/request/AlertSendRequest.java | 4 +- .../master/ITaskResultAlertService.java | 30 ------ ...TaskExecutorEventRemoteReporterClient.java | 37 -------- .../transportor/TaskResultAlertResponse.java | 40 -------- .../rpc/TaskResultAlertServiceImpl.java | 72 -------------- .../TaskExecutorEventBusCoordinator.java | 5 - ...TaskExecutorResultAlertLifecycleEvent.java | 59 ------------ .../ITaskExecutorLifecycleEventListener.java | 1 - .../TaskExecutorLifecycleEventListener.java | 1 - .../plugin/task/api/TaskCallBack.java | 4 +- .../task/api/model/TaskResultAlertInfo.java | 14 +-- .../plugin/task/datax/DataxTaskTest.java | 4 +- .../plugin/task/emr/EmrAddStepsTaskTest.java | 4 +- .../plugin/task/emr/EmrJobFlowTaskTest.java | 4 +- .../plugin/task/sql/SqlTask.java | 53 ++++++----- .../task/zeppelin/ZeppelinTaskTest.java | 4 +- .../server/worker/alert/AlertService.java | 24 +---- .../server/worker/alert/AlertServiceImpl.java | 93 +++++++++++++++++++ .../worker/executor/PhysicalTaskExecutor.java | 15 +-- .../executor/PhysicalTaskExecutorFactory.java | 9 +- 20 files changed, 161 insertions(+), 316 deletions(-) delete mode 100644 dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ITaskResultAlertService.java delete mode 100644 dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskResultAlertResponse.java delete mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskResultAlertServiceImpl.java delete mode 100644 dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorResultAlertLifecycleEvent.java rename dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskResultAlertRequest.java => dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/alert/AlertService.java (63%) create mode 100644 dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/alert/AlertServiceImpl.java diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-alert/src/main/java/org/apache/dolphinscheduler/extract/alert/request/AlertSendRequest.java b/dolphinscheduler-extract/dolphinscheduler-extract-alert/src/main/java/org/apache/dolphinscheduler/extract/alert/request/AlertSendRequest.java index dfa6515ddea2..82996bfc6c2f 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-alert/src/main/java/org/apache/dolphinscheduler/extract/alert/request/AlertSendRequest.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-alert/src/main/java/org/apache/dolphinscheduler/extract/alert/request/AlertSendRequest.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.extract.alert.request; +import org.apache.dolphinscheduler.common.enums.AlertType; + import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -32,6 +34,6 @@ public class AlertSendRequest { private String content; - private int warnType; + private AlertType alertType; } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ITaskResultAlertService.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ITaskResultAlertService.java deleted file mode 100644 index 32eeb9e3c160..000000000000 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ITaskResultAlertService.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.extract.master; - -import org.apache.dolphinscheduler.extract.base.RpcMethod; -import org.apache.dolphinscheduler.extract.base.RpcService; -import org.apache.dolphinscheduler.extract.master.transportor.TaskResultAlertRequest; -import org.apache.dolphinscheduler.extract.master.transportor.TaskResultAlertResponse; - -@RpcService -public interface ITaskResultAlertService { - - @RpcMethod - TaskResultAlertResponse reportTaskResultAlertToMaster(TaskResultAlertRequest taskResultAlertRequest); -} diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/TaskExecutorEventRemoteReporterClient.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/TaskExecutorEventRemoteReporterClient.java index 58229a576539..cec9f8eab781 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/TaskExecutorEventRemoteReporterClient.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/TaskExecutorEventRemoteReporterClient.java @@ -18,16 +18,12 @@ package org.apache.dolphinscheduler.extract.master; import org.apache.dolphinscheduler.extract.base.client.Clients; -import org.apache.dolphinscheduler.extract.master.transportor.TaskResultAlertRequest; -import org.apache.dolphinscheduler.extract.master.transportor.TaskResultAlertResponse; -import org.apache.dolphinscheduler.plugin.task.api.model.TaskResultAlertInfo; import org.apache.dolphinscheduler.task.executor.eventbus.ITaskExecutorEventRemoteReporterClient; import org.apache.dolphinscheduler.task.executor.events.IReportableTaskExecutorLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorDispatchedLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorFailedLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorKilledLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorPausedLifecycleEvent; -import org.apache.dolphinscheduler.task.executor.events.TaskExecutorResultAlertLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorRuntimeContextChangedLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorStartedLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorSuccessLifecycleEvent; @@ -71,10 +67,6 @@ public void reportTaskExecutionEventToMaster(final String masterAddress, reportTaskSuccessEventToMaster(masterAddress, (TaskExecutorSuccessLifecycleEvent) taskExecutorLifecycleEvent); break; - case RESULT_ALERT: - reportTaskResultAlertEventToMaster(masterAddress, - (TaskExecutorResultAlertLifecycleEvent) taskExecutorLifecycleEvent); - break; default: log.warn("Unsupported TaskExecutionEvent: {}", taskExecutorLifecycleEvent); } @@ -139,33 +131,4 @@ private static void reportTaskSuccessEventToMaster(final String masterAddress, .withHost(masterAddress) .onTaskExecutorSuccess(taskExecutionSuccessEvent); } - - private static void reportTaskResultAlertEventToMaster(final String masterAddress, - final TaskExecutorResultAlertLifecycleEvent taskExecutionResultAlertEvent) { - TaskResultAlertInfo taskResultAlertInfo = taskExecutionResultAlertEvent.getTaskResultAlertInfo(); - TaskResultAlertRequest taskResultAlertRequest = - TaskResultAlertRequest.builder() - .alertGroupId(taskResultAlertInfo.getAlertGroupId()) - .title(taskResultAlertInfo.getTitle()) - .content(taskResultAlertInfo.getContent()) - .workflowDefinitionCode(taskResultAlertInfo.getWorkflowDefinitionCode()) - .workflowInstanceId(taskResultAlertInfo.getWorkflowInstanceId()) - .taskInstanceId(taskResultAlertInfo.getTaskInstanceId()) - .build(); - - TaskResultAlertResponse taskResultAlertResponse = Clients.withService(ITaskResultAlertService.class) - .withHost(masterAddress) - .reportTaskResultAlertToMaster(taskResultAlertRequest); - - if (taskResultAlertResponse != null && taskResultAlertResponse.isSuccess()) { - log.info( - "Successfully reported task result alert to master. TaskInstanceId: {}, Title: '{}', MasterAddress: {}", - taskResultAlertInfo.getTaskInstanceId(), taskResultAlertInfo.getTitle(), masterAddress); - } else { - log.warn( - "Failed to report task result alert to master. TaskInstanceId: {}, Title: '{}', MasterAddress: {}, Reason: {}", - taskResultAlertInfo.getTaskInstanceId(), taskResultAlertInfo.getTitle(), masterAddress, - taskResultAlertResponse != null ? taskResultAlertResponse.getMessage() : "response is null"); - } - } } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskResultAlertResponse.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskResultAlertResponse.java deleted file mode 100644 index 7915e64836bb..000000000000 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskResultAlertResponse.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.extract.master.transportor; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - -@Data -@NoArgsConstructor -@AllArgsConstructor -public class TaskResultAlertResponse { - - private boolean success; - - private String message; - - public static TaskResultAlertResponse success() { - return new TaskResultAlertResponse(true, null); - } - - public static TaskResultAlertResponse failed(String message) { - return new TaskResultAlertResponse(false, message); - } -} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskResultAlertServiceImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskResultAlertServiceImpl.java deleted file mode 100644 index 1b41a2aa9f1f..000000000000 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskResultAlertServiceImpl.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.rpc; - -import org.apache.dolphinscheduler.common.enums.AlertType; -import org.apache.dolphinscheduler.common.enums.WarningType; -import org.apache.dolphinscheduler.dao.AlertDao; -import org.apache.dolphinscheduler.dao.entity.Alert; -import org.apache.dolphinscheduler.extract.master.ITaskResultAlertService; -import org.apache.dolphinscheduler.extract.master.transportor.TaskResultAlertRequest; -import org.apache.dolphinscheduler.extract.master.transportor.TaskResultAlertResponse; - -import java.util.Date; -import java.util.HashMap; -import java.util.Map; - -import lombok.extern.slf4j.Slf4j; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -@Slf4j -@Component -public class TaskResultAlertServiceImpl implements ITaskResultAlertService { - - @Autowired - private AlertDao alertDao; - - @Override - public TaskResultAlertResponse reportTaskResultAlertToMaster(TaskResultAlertRequest taskResultAlertRequest) { - log.info("Received TaskResultAlertRequest request{}", taskResultAlertRequest); - - try { - Alert alert = new Alert(); - alert.setTitle(taskResultAlertRequest.getTitle()); - alert.setContent(taskResultAlertRequest.getContent()); - alert.setWarningType(WarningType.NONE); - alert.setCreateTime(new Date()); - alert.setUpdateTime(new Date()); - alert.setAlertGroupId(taskResultAlertRequest.getAlertGroupId()); - alert.setWorkflowDefinitionCode(taskResultAlertRequest.getWorkflowDefinitionCode()); - alert.setWorkflowInstanceId(taskResultAlertRequest.getWorkflowInstanceId()); - Map info = new HashMap<>(); - info.put("taskInstanceId", taskResultAlertRequest.getTaskInstanceId()); - alert.setInfo(info); - - alert.setAlertType(AlertType.TASK_RESULT); - alertDao.addAlert(alert); - - log.info("add alert success"); - return TaskResultAlertResponse.success(); - } catch (Exception ex) { - log.error("add alert failed:{} ", ex.getMessage()); - return TaskResultAlertResponse.failed(ex.getMessage()); - } - } -} diff --git a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/TaskExecutorEventBusCoordinator.java b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/TaskExecutorEventBusCoordinator.java index 905d0690b2f6..1ced554bc7a8 100644 --- a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/TaskExecutorEventBusCoordinator.java +++ b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/TaskExecutorEventBusCoordinator.java @@ -33,7 +33,6 @@ import org.apache.dolphinscheduler.task.executor.events.TaskExecutorKilledLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorPauseLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorPausedLifecycleEvent; -import org.apache.dolphinscheduler.task.executor.events.TaskExecutorResultAlertLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorRuntimeContextChangedLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorStartedLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorSuccessLifecycleEvent; @@ -184,10 +183,6 @@ private void doFireTaskExecutorEventBus(final ITaskExecutor taskExecutor) { taskExecutorLifecycleEventListener.onTaskExecutorFinalizeLifecycleEvent( ((TaskExecutorFinalizeLifecycleEvent) taskExecutorLifecycleEvent)); break; - case RESULT_ALERT: - taskExecutorLifecycleEventListener.onTaskExecutorResultAlertLifecycleEvent( - ((TaskExecutorResultAlertLifecycleEvent) taskExecutorLifecycleEvent)); - break; default: throw new IllegalArgumentException( "Unsupported TaskExecutorLifecycleEvent: " + taskExecutorLifecycleEvent); diff --git a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorResultAlertLifecycleEvent.java b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorResultAlertLifecycleEvent.java deleted file mode 100644 index e9a2ad158ef0..000000000000 --- a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorResultAlertLifecycleEvent.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.task.executor.events; - -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.model.TaskResultAlertInfo; -import org.apache.dolphinscheduler.task.executor.ITaskExecutor; - -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.NoArgsConstructor; -import lombok.ToString; -import lombok.experimental.SuperBuilder; - -@Data -@ToString(callSuper = true) -@SuperBuilder -@EqualsAndHashCode(callSuper = true) -@NoArgsConstructor -public class TaskExecutorResultAlertLifecycleEvent extends AbstractTaskExecutorLifecycleEvent - implements - IReportableTaskExecutorLifecycleEvent { - - private int workflowInstanceId; - - private String taskInstanceHost; - - private Long latestReportTime; - - private TaskResultAlertInfo taskResultAlertInfo; - - public static TaskExecutorResultAlertLifecycleEvent of(final ITaskExecutor taskExecutor, - TaskResultAlertInfo taskResultAlertInfo) { - final TaskExecutionContext taskExecutionContext = taskExecutor.getTaskExecutionContext(); - return TaskExecutorResultAlertLifecycleEvent.builder() - .workflowInstanceId(taskExecutionContext.getWorkflowInstanceId()) - .taskInstanceId(taskExecutionContext.getTaskInstanceId()) - .taskInstanceHost(taskExecutionContext.getHost()) - .taskResultAlertInfo(taskResultAlertInfo) - .type(TaskExecutorLifecycleEventType.RESULT_ALERT) - .build(); - } - -} diff --git a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/ITaskExecutorLifecycleEventListener.java b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/ITaskExecutorLifecycleEventListener.java index 0755a87288bb..7f0a5fa2070b 100644 --- a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/ITaskExecutorLifecycleEventListener.java +++ b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/ITaskExecutorLifecycleEventListener.java @@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.task.executor.events.TaskExecutorKilledLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorPauseLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorPausedLifecycleEvent; -import org.apache.dolphinscheduler.task.executor.events.TaskExecutorResultAlertLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorRuntimeContextChangedLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorStartedLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorSuccessLifecycleEvent; diff --git a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/TaskExecutorLifecycleEventListener.java b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/TaskExecutorLifecycleEventListener.java index 5d2df2846967..547f6153e4f7 100644 --- a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/TaskExecutorLifecycleEventListener.java +++ b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/TaskExecutorLifecycleEventListener.java @@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.task.executor.events.TaskExecutorKilledLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorPauseLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorPausedLifecycleEvent; -import org.apache.dolphinscheduler.task.executor.events.TaskExecutorResultAlertLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorRuntimeContextChangedLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorStartedLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorSuccessLifecycleEvent; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java index f3a5095243d3..f5118f13e958 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java @@ -17,8 +17,8 @@ package org.apache.dolphinscheduler.plugin.task.api; +import org.apache.dolphinscheduler.common.enums.AlertType; import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo; -import org.apache.dolphinscheduler.plugin.task.api.model.TaskResultAlertInfo; public interface TaskCallBack { @@ -28,5 +28,5 @@ public interface TaskCallBack { @Deprecated void updateTaskInstanceInfo(int taskInstanceId); - void reportTaskResultAlertInfo(TaskResultAlertInfo taskResultAlertInfo); + void sendAlert(int groupId, String title, String content, AlertType alertType); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/TaskResultAlertInfo.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/TaskResultAlertInfo.java index 2e768b0cf60f..503762dc6554 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/TaskResultAlertInfo.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/TaskResultAlertInfo.java @@ -17,21 +17,21 @@ package org.apache.dolphinscheduler.plugin.task.api.model; +import org.apache.dolphinscheduler.common.enums.AlertType; + +import lombok.AllArgsConstructor; import lombok.Data; @Data +@AllArgsConstructor public class TaskResultAlertInfo { - private String title; - - private String content; - private Integer alertGroupId; - private Long workflowDefinitionCode; + private String title; - private Integer workflowInstanceId; + private String content; - private int taskInstanceId; + private AlertType alertType; } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java index 48264c076ae4..4bd76c024106 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.when; +import org.apache.dolphinscheduler.common.enums.AlertType; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider; @@ -37,7 +38,6 @@ import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; -import org.apache.dolphinscheduler.plugin.task.api.model.TaskResultAlertInfo; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.enums.DbType; @@ -81,7 +81,7 @@ public void updateTaskInstanceInfo(int taskInstanceId) { } @Override - public void reportTaskResultAlertInfo(TaskResultAlertInfo taskResultAlertInfo) { + public void sendAlert(int groupId, String title, String content, AlertType alertType) { } }; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java index a24f5b97e790..6997efc43217 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java @@ -22,12 +22,12 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS; import static org.mockito.ArgumentMatchers.any; +import org.apache.dolphinscheduler.common.enums.AlertType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo; -import org.apache.dolphinscheduler.plugin.task.api.model.TaskResultAlertInfo; import org.apache.commons.io.IOUtils; @@ -89,7 +89,7 @@ public void updateTaskInstanceInfo(int taskInstanceId) { } @Override - public void reportTaskResultAlertInfo(TaskResultAlertInfo taskResultAlertInfo) { + public void sendAlert(int groupId, String title, String content, AlertType alertType) { } }; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java index 2a9a920fea7b..b511461a29cd 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java @@ -22,12 +22,12 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS; import static org.mockito.ArgumentMatchers.any; +import org.apache.dolphinscheduler.common.enums.AlertType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo; -import org.apache.dolphinscheduler.plugin.task.api.model.TaskResultAlertInfo; import org.apache.commons.io.IOUtils; @@ -111,7 +111,7 @@ public void updateTaskInstanceInfo(int taskInstanceId) { } @Override - public void reportTaskResultAlertInfo(TaskResultAlertInfo taskResultAlertInfo) { + public void sendAlert(int groupId, String title, String content, AlertType alertType) { } }; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java index c58695c63710..fd77d6503de8 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.plugin.task.sql; +import org.apache.dolphinscheduler.common.enums.AlertType; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider; @@ -154,13 +155,14 @@ public void handle(TaskCallBack taskCallBack) throws TaskException { // execute sql task executeFuncAndSql(mainStatementSqlBinds, preStatementSqlBinds, postStatementSqlBinds); - setExitStatusCode(TaskConstants.EXIT_CODE_SUCCESS); - if (this.getNeedAlert()) { - // callback to report taskResult alert - taskCallBack.reportTaskResultAlertInfo(this.getTaskResultAlertInfo()); + log.info("Begin to send sql result alert"); + taskCallBack.sendAlert(taskResultAlertInfo.getAlertGroupId(), taskResultAlertInfo.getTitle(), + taskResultAlertInfo.getContent(), taskResultAlertInfo.getAlertType()); } + setExitStatusCode(TaskConstants.EXIT_CODE_SUCCESS); + } catch (Exception e) { if (exitStatusCode == TaskConstants.EXIT_CODE_KILL) { log.info("sql task has been killed"); @@ -288,10 +290,29 @@ private String resultProcess(ResultSet resultSet) throws Exception { : JSONUtils.toJsonString(resultJSONArray); if (Boolean.TRUE.equals(sqlParameters.getSendAlert())) { - sendTaskResultAlert(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle()) - ? sqlParameters.getTitle() - : taskExecutionContext.getTaskName() + " query result sets", result); + int displayRows = sqlParameters.getDisplayRows() > 0 ? sqlParameters.getDisplayRows() + : TaskConstants.DEFAULT_DISPLAY_ROWS; + String alertContent; + if (resultJSONArray.size() > displayRows) { + ArrayNode truncatedArray = JSONUtils.createArrayNode(); + for (int i = 0; i < Math.min(displayRows, resultJSONArray.size()); i++) { + truncatedArray.add(resultJSONArray.get(i)); + } + alertContent = JSONUtils.toJsonString(truncatedArray); + log.debug("Alert content truncated to {} rows", displayRows); + } else { + alertContent = result; + } + + setNeedAlert(true); + TaskResultAlertInfo taskResultAlertInfo = new TaskResultAlertInfo(sqlParameters.getGroupId(), + StringUtils.isNotEmpty(sqlParameters.getTitle()) ? sqlParameters.getTitle() + : taskExecutionContext.getTaskName() + " query result sets", + alertContent, + AlertType.TASK_RESULT); + setTaskResultAlertInfo(taskResultAlertInfo); } + log.debug("execute sql result : {}", result); return result; } @@ -316,24 +337,6 @@ private ArrayNode generateEmptyRow(ResultSet resultSet) throws SQLException { return resultJSONArray; } - /** - * send alert - * - * @param title title - * @param content content - */ - private void sendTaskResultAlert(int groupId, String title, String content) { - setNeedAlert(Boolean.TRUE); - TaskResultAlertInfo taskAlertInfo = new TaskResultAlertInfo(); - taskAlertInfo.setAlertGroupId(groupId); - taskAlertInfo.setContent(content); - taskAlertInfo.setTitle(title); - taskAlertInfo.setWorkflowDefinitionCode(this.taskExecutionContext.getWorkflowDefinitionCode()); - taskAlertInfo.setWorkflowInstanceId(this.taskExecutionContext.getWorkflowInstanceId()); - taskAlertInfo.setTaskInstanceId(this.taskExecutionContext.getTaskInstanceId()); - setTaskResultAlertInfo(taskAlertInfo); - } - private String executeQuery(Connection connection, SqlBinds sqlBinds, String handlerType) throws Exception { try (PreparedStatement statement = prepareStatementAndBind(connection, sqlBinds)) { log.info("{} statement execute query, for sql: {}", handlerType, sqlBinds.getSql()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java index e610274e932b..883bbab1e22e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import org.apache.dolphinscheduler.common.enums.AlertType; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils; @@ -37,7 +38,6 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo; import org.apache.dolphinscheduler.plugin.task.api.model.Property; -import org.apache.dolphinscheduler.plugin.task.api.model.TaskResultAlertInfo; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; import org.apache.zeppelin.client.NoteResult; @@ -89,7 +89,7 @@ public void updateTaskInstanceInfo(int taskInstanceId) { } @Override - public void reportTaskResultAlertInfo(TaskResultAlertInfo taskResultAlertInfo) { + public void sendAlert(int groupId, String title, String content, AlertType alertType) { } }; diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskResultAlertRequest.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/alert/AlertService.java similarity index 63% rename from dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskResultAlertRequest.java rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/alert/AlertService.java index 54ea1e52d9d1..72c8136139ea 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskResultAlertRequest.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/alert/AlertService.java @@ -15,28 +15,12 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.extract.master.transportor; +package org.apache.dolphinscheduler.server.worker.alert; -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; +import org.apache.dolphinscheduler.common.enums.AlertType; -@Data -@Builder -@NoArgsConstructor -@AllArgsConstructor -public class TaskResultAlertRequest { +public interface AlertService { - private String title; + void sentAlert(int groupId, String title, String content, AlertType alertType); - private String content; - - private Integer alertGroupId; - - private Long workflowDefinitionCode; - - private Integer workflowInstanceId; - - private int taskInstanceId; } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/alert/AlertServiceImpl.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/alert/AlertServiceImpl.java new file mode 100644 index 000000000000..29e281d9abbc --- /dev/null +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/alert/AlertServiceImpl.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.alert; + +import org.apache.dolphinscheduler.common.enums.AlertType; +import org.apache.dolphinscheduler.common.model.Server; +import org.apache.dolphinscheduler.extract.alert.IAlertOperator; +import org.apache.dolphinscheduler.extract.alert.request.AlertSendRequest; +import org.apache.dolphinscheduler.extract.alert.request.AlertSendResponse; +import org.apache.dolphinscheduler.extract.base.client.Clients; +import org.apache.dolphinscheduler.extract.base.utils.Host; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; +import org.apache.dolphinscheduler.registry.api.RegistryClient; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; + +import org.apache.commons.collections4.CollectionUtils; + +import java.util.List; +import java.util.Optional; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +public class AlertServiceImpl implements AlertService { + + @Autowired + private RegistryClient registryClient; + + @Override + public void sentAlert(int groupId, String title, String content, AlertType alertType) { + log.debug("Attempting to send alert - groupId: {}, title: {}", groupId, title); + + Optional alertServerAddressOptional = getAlertServerAddress(); + if (!alertServerAddressOptional.isPresent()) { + log.error("Failed to send alert: alert server not exist."); + throw new TaskException("Failed to send alert: alert server not exist"); + } + + Host alertServerAddress = alertServerAddressOptional.get(); + + AlertSendRequest alertSendRequest = new AlertSendRequest(groupId, title, content, alertType); + AlertSendResponse alertSendResponse; + + try { + alertSendResponse = Clients + .withService(IAlertOperator.class) + .withHost(alertServerAddress.getAddress()) + .sendAlert(alertSendRequest); + log.info("Alert sent successfully to {} - groupId: {}, title: '{}', response: {}", + alertServerAddress.getAddress(), groupId, title, alertSendResponse); + + if (!alertSendResponse.isSuccess()) { + log.error("Failed to send alert: alertSendResponse is fail"); + throw new TaskException("Failed to send alert: alertSendResponse is fail"); + } else { + log.info("Success to send alert"); + } + } catch (Exception e) { + String errorMsg = String.format("Exception occurred while sending alert to %s - groupId: %d, title: '%s'", + alertServerAddress.getAddress(), groupId, title); + log.error(errorMsg, e); + throw new TaskException("Failed to send alert due to exception: " + e.getMessage(), e); + } + } + + public Optional getAlertServerAddress() { + List serverList = registryClient.getServerList(RegistryNodeType.ALERT_SERVER); + if (CollectionUtils.isEmpty(serverList)) { + return Optional.empty(); + } + Server server = serverList.get(0); + return Optional.of(new Host(server.getHost(), server.getPort())); + } +} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java index f25c63a75e2b..9a8a03e64722 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.worker.executor; import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.common.enums.AlertType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.plugin.storage.api.StorageOperator; @@ -25,8 +26,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.log.TaskLogMarkers; import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo; -import org.apache.dolphinscheduler.plugin.task.api.model.TaskResultAlertInfo; import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext; +import org.apache.dolphinscheduler.server.worker.alert.AlertService; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.utils.TaskExecutionContextUtils; import org.apache.dolphinscheduler.server.worker.utils.TenantUtils; @@ -34,7 +35,6 @@ import org.apache.dolphinscheduler.task.executor.ITaskExecutor; import org.apache.dolphinscheduler.task.executor.TaskExecutorState; import org.apache.dolphinscheduler.task.executor.TaskExecutorStateMappings; -import org.apache.dolphinscheduler.task.executor.events.TaskExecutorResultAlertLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorRuntimeContextChangedLifecycleEvent; import java.util.ArrayList; @@ -54,12 +54,16 @@ public class PhysicalTaskExecutor extends AbstractTaskExecutor { private final PhysicalTaskPluginFactory physicalTaskPluginFactory; - public PhysicalTaskExecutor(final PhysicalTaskExecutorBuilder physicalTaskExecutorBuilder) { + private final AlertService alertService; + + public PhysicalTaskExecutor(final PhysicalTaskExecutorBuilder physicalTaskExecutorBuilder, + AlertService alertService) { super(physicalTaskExecutorBuilder.getTaskExecutionContext(), physicalTaskExecutorBuilder.getTaskExecutorEventBus()); this.workerConfig = physicalTaskExecutorBuilder.getWorkerConfig(); this.storageOperator = physicalTaskExecutorBuilder.getStorageOperator(); this.physicalTaskPluginFactory = physicalTaskExecutorBuilder.getPhysicalTaskPluginFactory(); + this.alertService = alertService; } @Override @@ -90,9 +94,8 @@ public void updateTaskInstanceInfo(final int taskInstanceId) { } @Override - public void reportTaskResultAlertInfo(TaskResultAlertInfo taskResultAlertInfo) { - taskExecutorEventBus - .publish(TaskExecutorResultAlertLifecycleEvent.of(taskExecutor, taskResultAlertInfo)); + public void sendAlert(int groupId, String title, String content, AlertType alertType) { + alertService.sentAlert(groupId, title, content, alertType); } }); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutorFactory.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutorFactory.java index 7330bddcc270..9554459f977e 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutorFactory.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutorFactory.java @@ -20,6 +20,7 @@ import org.apache.dolphinscheduler.plugin.storage.api.StorageOperator; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; +import org.apache.dolphinscheduler.server.worker.alert.AlertService; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.task.executor.ITaskExecutor; import org.apache.dolphinscheduler.task.executor.ITaskExecutorFactory; @@ -38,12 +39,16 @@ public class PhysicalTaskExecutorFactory implements ITaskExecutorFactory { private final StorageOperator storageOperator; + private final AlertService alertService; + public PhysicalTaskExecutorFactory(final WorkerConfig workerConfig, final PhysicalTaskPluginFactory physicalTaskPluginFactory, - final StorageOperator storageOperator) { + final StorageOperator storageOperator, + final AlertService alertService) { this.workerConfig = workerConfig; this.physicalTaskPluginFactory = physicalTaskPluginFactory; this.storageOperator = storageOperator; + this.alertService = alertService; } @Override @@ -56,7 +61,7 @@ public ITaskExecutor createTaskExecutor(final TaskExecutionContext taskExecution .storageOperator(storageOperator) .physicalTaskPluginFactory(physicalTaskPluginFactory) .build(); - return new PhysicalTaskExecutor(physicalTaskExecutorBuilder); + return new PhysicalTaskExecutor(physicalTaskExecutorBuilder, alertService); } private void assemblyTaskLogPath(final TaskExecutionContext taskExecutionContext) { From 3a3e1dcf725229f28f8c67da2b79804945dcbea4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E4=B9=89=E8=B6=85?= Date: Mon, 2 Feb 2026 11:37:07 +0800 Subject: [PATCH 3/4] remove event --- .../task/executor/events/TaskExecutorLifecycleEventType.java | 2 -- .../listener/ITaskExecutorLifecycleEventListener.java | 1 - .../listener/TaskExecutorLifecycleEventListener.java | 5 ----- 3 files changed, 8 deletions(-) diff --git a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorLifecycleEventType.java b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorLifecycleEventType.java index 88e973beb578..aa0aa2cd4e56 100644 --- a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorLifecycleEventType.java +++ b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorLifecycleEventType.java @@ -37,8 +37,6 @@ public enum TaskExecutorLifecycleEventType { FAILED, - RESULT_ALERT, - FINALIZE, ; diff --git a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/ITaskExecutorLifecycleEventListener.java b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/ITaskExecutorLifecycleEventListener.java index 7f0a5fa2070b..07da199cb5e4 100644 --- a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/ITaskExecutorLifecycleEventListener.java +++ b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/ITaskExecutorLifecycleEventListener.java @@ -50,5 +50,4 @@ public interface ITaskExecutorLifecycleEventListener { void onTaskExecutorFinalizeLifecycleEvent(final TaskExecutorFinalizeLifecycleEvent event); - void onTaskExecutorResultAlertLifecycleEvent(final TaskExecutorResultAlertLifecycleEvent event); } diff --git a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/TaskExecutorLifecycleEventListener.java b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/TaskExecutorLifecycleEventListener.java index 547f6153e4f7..c17afaca636b 100644 --- a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/TaskExecutorLifecycleEventListener.java +++ b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/listener/TaskExecutorLifecycleEventListener.java @@ -114,11 +114,6 @@ public void onTaskExecutorFinalizeLifecycleEvent(final TaskExecutorFinalizeLifec executorContainer.finalize(taskExecutor); } - @Override - public void onTaskExecutorResultAlertLifecycleEvent(TaskExecutorResultAlertLifecycleEvent event) { - reportTaskExecutorLifecycleEventToMaster(event); - } - private void reportTaskExecutorLifecycleEventToMaster(IReportableTaskExecutorLifecycleEvent taskExecutorLifecycleEvent) { taskExecutorLifecycleEventReporter.reportTaskExecutorLifecycleEvent(taskExecutorLifecycleEvent); } From f618c410ceb7ac48e29a8a8498574f3f78550192 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E4=B9=89=E8=B6=85?= Date: Fri, 27 Feb 2026 14:22:12 +0800 Subject: [PATCH 4/4] Add database migration to rename JSON key sendEmail to sendAlert --- .../mysql/dolphinscheduler_dml.sql | 22 +++++++++++++++++++ .../postgresql/dolphinscheduler_dml.sql | 14 ++++++++++++ .../plugin/task/api/AbstractTask.java | 21 ------------------ .../task/api/parameters/SqlParameters.java | 9 ++++---- .../plugin/task/sql/SqlTask.java | 20 +++++++++++++++-- 5 files changed, 58 insertions(+), 28 deletions(-) diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/mysql/dolphinscheduler_dml.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/mysql/dolphinscheduler_dml.sql index 4a14f326b985..b2933c89ff67 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/mysql/dolphinscheduler_dml.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/mysql/dolphinscheduler_dml.sql @@ -14,3 +14,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +UPDATE t_ds_task_definition +SET task_params = JSON_REMOVE( + JSON_INSERT( + task_params, + '$.sendAlert', + JSON_EXTRACT(task_params, '$.sendEmail') + ), + '$.sendEmail' +) +WHERE task_params IS NOT NULL AND JSON_EXTRACT(task_params, '$.sendEmail') IS NOT NULL; + +UPDATE t_ds_task_definition_log +SET task_params = JSON_REMOVE( + JSON_INSERT( + task_params, + '$.sendAlert', + JSON_EXTRACT(task_params, '$.sendEmail') + ), + '$.sendEmail' +) +WHERE task_params IS NOT NULL AND JSON_EXTRACT(task_params, '$.sendEmail') IS NOT NULL; \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/postgresql/dolphinscheduler_dml.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/postgresql/dolphinscheduler_dml.sql index 4a14f326b985..10283a6d16f2 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/postgresql/dolphinscheduler_dml.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/postgresql/dolphinscheduler_dml.sql @@ -14,3 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +UPDATE t_ds_task_definition +SET task_params = ( + (task_params::jsonb - 'sendEmail') + || jsonb_build_object('sendAlert', task_params::jsonb->'sendEmail') +)::text +WHERE jsonb_path_exists(task_params::jsonb, '$.sendEmail'); + +UPDATE t_ds_task_definition_log +SET task_params = ( + (task_params::jsonb - 'sendEmail') + || jsonb_build_object('sendAlert', task_params::jsonb->'sendEmail') +)::text +WHERE jsonb_path_exists(task_params::jsonb, '$.sendEmail'); \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java index 249f9e69860c..d83f998121b5 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java @@ -19,7 +19,6 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.model.Property; -import org.apache.dolphinscheduler.plugin.task.api.model.TaskResultAlertInfo; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import java.util.Map; @@ -56,10 +55,6 @@ public abstract class AbstractTask { */ protected volatile int exitStatusCode = -1; - protected boolean needAlert = false; - - protected TaskResultAlertInfo taskResultAlertInfo; - /** * constructor * @@ -109,22 +104,6 @@ public void setAppIds(String appIds) { this.appIds = appIds; } - public boolean getNeedAlert() { - return needAlert; - } - - public void setNeedAlert(boolean needAlert) { - this.needAlert = needAlert; - } - - public TaskResultAlertInfo getTaskResultAlertInfo() { - return taskResultAlertInfo; - } - - public void setTaskResultAlertInfo(TaskResultAlertInfo taskResultAlertInfo) { - this.taskResultAlertInfo = taskResultAlertInfo; - } - /** * get task parameters * diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java index d6ef15973152..6810345c25ec 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java @@ -65,11 +65,6 @@ public class SqlParameters extends AbstractParameters { */ private int sqlType; - /** - * send alert - */ - private Boolean sendAlert; - /** * display rows */ @@ -96,6 +91,10 @@ public class SqlParameters extends AbstractParameters { */ private List postStatements; + /** + * send alert + */ + private Boolean sendAlert; /** * groupId */ diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java index 4416671f3464..f19e2a583084 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java @@ -92,6 +92,10 @@ public class SqlTask extends AbstractTask { private Connection sessionConnection; private Statement sessionStatement; + private boolean needAlert = false; + + private TaskResultAlertInfo taskResultAlertInfo; + public SqlTask(TaskExecutionContext taskRequest) { super(taskRequest); this.taskExecutionContext = taskRequest; @@ -305,12 +309,12 @@ private String resultProcess(ResultSet resultSet) throws Exception { } setNeedAlert(true); - TaskResultAlertInfo taskResultAlertInfo = new TaskResultAlertInfo(sqlParameters.getGroupId(), + TaskResultAlertInfo resultAlertInfo = new TaskResultAlertInfo(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle()) ? sqlParameters.getTitle() : taskExecutionContext.getTaskName() + " query result sets", alertContent, AlertType.TASK_RESULT); - setTaskResultAlertInfo(taskResultAlertInfo); + setTaskResultAlertInfo(resultAlertInfo); } log.debug("execute sql result : {}", result); @@ -472,4 +476,16 @@ private String replaceOriginalValue(String content, String rgex, Map