diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java index f7a5ba0d3906..096f4e3c7918 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java @@ -29,7 +29,7 @@ public enum AlertType { /** * 0 workflow instance failure, 1 workflow instance success, 2 workflow instance blocked, 3 workflow instance timeout, 4 fault tolerance warning, - * 5 task failure, 6 task success, 7 task timeout, 8 close alert + * 5 task failure, 6 task success, 7 task timeout, 8 task result */ WORKFLOW_INSTANCE_FAILURE(0, "workflow instance failure"), WORKFLOW_INSTANCE_SUCCESS(1, "workflow instance success"), @@ -39,6 +39,7 @@ public enum AlertType { TASK_FAILURE(5, "task failure"), TASK_SUCCESS(6, "task success"), TASK_TIMEOUT(7, "task timeout"), + TASK_RESULT(8, "task result"); ; AlertType(int code, String descp) { diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/mysql/dolphinscheduler_dml.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/mysql/dolphinscheduler_dml.sql index 4a14f326b985..b2933c89ff67 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/mysql/dolphinscheduler_dml.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/mysql/dolphinscheduler_dml.sql @@ -14,3 +14,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +UPDATE t_ds_task_definition +SET task_params = JSON_REMOVE( + JSON_INSERT( + task_params, + '$.sendAlert', + JSON_EXTRACT(task_params, '$.sendEmail') + ), + '$.sendEmail' +) +WHERE task_params IS NOT NULL AND JSON_EXTRACT(task_params, '$.sendEmail') IS NOT NULL; + +UPDATE t_ds_task_definition_log +SET task_params = JSON_REMOVE( + JSON_INSERT( + task_params, + '$.sendAlert', + JSON_EXTRACT(task_params, '$.sendEmail') + ), + '$.sendEmail' +) +WHERE task_params IS NOT NULL AND JSON_EXTRACT(task_params, '$.sendEmail') IS NOT NULL; \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/postgresql/dolphinscheduler_dml.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/postgresql/dolphinscheduler_dml.sql index 4a14f326b985..10283a6d16f2 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/postgresql/dolphinscheduler_dml.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/postgresql/dolphinscheduler_dml.sql @@ -14,3 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +UPDATE t_ds_task_definition +SET task_params = ( + (task_params::jsonb - 'sendEmail') + || jsonb_build_object('sendAlert', task_params::jsonb->'sendEmail') +)::text +WHERE jsonb_path_exists(task_params::jsonb, '$.sendEmail'); + +UPDATE t_ds_task_definition_log +SET task_params = ( + (task_params::jsonb - 'sendEmail') + || jsonb_build_object('sendAlert', task_params::jsonb->'sendEmail') +)::text +WHERE jsonb_path_exists(task_params::jsonb, '$.sendEmail'); \ No newline at end of file diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java index da3cc1d27481..83fc14e5fec7 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java @@ -140,7 +140,7 @@ public void testDeleteByCode() { @Test public void testNullPropertyValueOfLocalParams() { String definitionJson = - "{\"failRetryTimes\":\"0\",\"timeoutNotifyStrategy\":\"\",\"code\":\"5195043558720\",\"flag\":\"YES\",\"environmentCode\":\"-1\",\"taskDefinitionIndex\":2,\"taskPriority\":\"MEDIUM\",\"taskParams\":\"{\\\"preStatements\\\":null,\\\"postStatements\\\":null,\\\"type\\\":\\\"ADB_MYSQL\\\",\\\"database\\\":\\\"lijia\\\",\\\"sql\\\":\\\"create table nation_${random_serial_number} as select * from nation\\\",\\\"localParams\\\":[{\\\"direct\\\":2,\\\"type\\\":3,\\\"prop\\\":\\\"key\\\"}],\\\"Name\\\":\\\"create_table_as_select_nation\\\",\\\"FailRetryTimes\\\":0,\\\"dbClusterId\\\":\\\"amv-bp10o45925jpx959\\\",\\\"sendEmail\\\":false,\\\"displayRows\\\":10,\\\"limit\\\":10000,\\\"agentSource\\\":\\\"Workflow\\\",\\\"agentVersion\\\":\\\"Unkown\\\"}\",\"timeout\":\"0\",\"taskType\":\"ADB_MYSQL\",\"timeoutFlag\":\"CLOSE\",\"projectCode\":\"5191800302720\",\"name\":\"create_table_as_select_nation\",\"delayTime\":\"0\",\"workerGroup\":\"default\"}"; + "{\"failRetryTimes\":\"0\",\"timeoutNotifyStrategy\":\"\",\"code\":\"5195043558720\",\"flag\":\"YES\",\"environmentCode\":\"-1\",\"taskDefinitionIndex\":2,\"taskPriority\":\"MEDIUM\",\"taskParams\":\"{\\\"preStatements\\\":null,\\\"postStatements\\\":null,\\\"type\\\":\\\"ADB_MYSQL\\\",\\\"database\\\":\\\"lijia\\\",\\\"sql\\\":\\\"create table nation_${random_serial_number} as select * from nation\\\",\\\"localParams\\\":[{\\\"direct\\\":2,\\\"type\\\":3,\\\"prop\\\":\\\"key\\\"}],\\\"Name\\\":\\\"create_table_as_select_nation\\\",\\\"FailRetryTimes\\\":0,\\\"dbClusterId\\\":\\\"amv-bp10o45925jpx959\\\",\\\"sendAlert\\\":false,\\\"displayRows\\\":10,\\\"limit\\\":10000,\\\"agentSource\\\":\\\"Workflow\\\",\\\"agentVersion\\\":\\\"Unkown\\\"}\",\"timeout\":\"0\",\"taskType\":\"ADB_MYSQL\",\"timeoutFlag\":\"CLOSE\",\"projectCode\":\"5191800302720\",\"name\":\"create_table_as_select_nation\",\"delayTime\":\"0\",\"workerGroup\":\"default\"}"; TaskDefinition definition = JSONUtils.parseObject(definitionJson, TaskDefinition.class); Map taskParamsMap = definition.getTaskParamMap(); @@ -157,7 +157,7 @@ public void testNullPropertyValueOfLocalParams() { @Test public void testNullLocalParamsOfTaskParams() { String definitionJson = - "{\"failRetryTimes\":\"0\",\"timeoutNotifyStrategy\":\"\",\"code\":\"5195043558720\",\"flag\":\"YES\",\"environmentCode\":\"-1\",\"taskDefinitionIndex\":2,\"taskPriority\":\"MEDIUM\",\"taskParams\":\"{\\\"preStatements\\\":null,\\\"postStatements\\\":null,\\\"type\\\":\\\"ADB_MYSQL\\\",\\\"database\\\":\\\"lijia\\\",\\\"sql\\\":\\\"create table nation_${random_serial_number} as select * from nation\\\",\\\"localParams\\\":null,\\\"Name\\\":\\\"create_table_as_select_nation\\\",\\\"FailRetryTimes\\\":0,\\\"dbClusterId\\\":\\\"amv-bp10o45925jpx959\\\",\\\"sendEmail\\\":false,\\\"displayRows\\\":10,\\\"limit\\\":10000,\\\"agentSource\\\":\\\"Workflow\\\",\\\"agentVersion\\\":\\\"Unkown\\\"}\",\"timeout\":\"0\",\"taskType\":\"ADB_MYSQL\",\"timeoutFlag\":\"CLOSE\",\"projectCode\":\"5191800302720\",\"name\":\"create_table_as_select_nation\",\"delayTime\":\"0\",\"workerGroup\":\"default\"}"; + "{\"failRetryTimes\":\"0\",\"timeoutNotifyStrategy\":\"\",\"code\":\"5195043558720\",\"flag\":\"YES\",\"environmentCode\":\"-1\",\"taskDefinitionIndex\":2,\"taskPriority\":\"MEDIUM\",\"taskParams\":\"{\\\"preStatements\\\":null,\\\"postStatements\\\":null,\\\"type\\\":\\\"ADB_MYSQL\\\",\\\"database\\\":\\\"lijia\\\",\\\"sql\\\":\\\"create table nation_${random_serial_number} as select * from nation\\\",\\\"localParams\\\":null,\\\"Name\\\":\\\"create_table_as_select_nation\\\",\\\"FailRetryTimes\\\":0,\\\"dbClusterId\\\":\\\"amv-bp10o45925jpx959\\\",\\\"sendAlert\\\":false,\\\"displayRows\\\":10,\\\"limit\\\":10000,\\\"agentSource\\\":\\\"Workflow\\\",\\\"agentVersion\\\":\\\"Unkown\\\"}\",\"timeout\":\"0\",\"taskType\":\"ADB_MYSQL\",\"timeoutFlag\":\"CLOSE\",\"projectCode\":\"5191800302720\",\"name\":\"create_table_as_select_nation\",\"delayTime\":\"0\",\"workerGroup\":\"default\"}"; TaskDefinition definition = JSONUtils.parseObject(definitionJson, TaskDefinition.class); Assertions.assertNull(definition.getTaskParamMap(), "Serialize the task definition success"); diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-alert/src/main/java/org/apache/dolphinscheduler/extract/alert/request/AlertSendRequest.java b/dolphinscheduler-extract/dolphinscheduler-extract-alert/src/main/java/org/apache/dolphinscheduler/extract/alert/request/AlertSendRequest.java index dfa6515ddea2..82996bfc6c2f 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-alert/src/main/java/org/apache/dolphinscheduler/extract/alert/request/AlertSendRequest.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-alert/src/main/java/org/apache/dolphinscheduler/extract/alert/request/AlertSendRequest.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.extract.alert.request; +import org.apache.dolphinscheduler.common.enums.AlertType; + import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -32,6 +34,6 @@ public class AlertSendRequest { private String content; - private int warnType; + private AlertType alertType; } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java index 006f40aa1416..d83f998121b5 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java @@ -19,7 +19,6 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.model.Property; -import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import java.util.Map; @@ -56,10 +55,6 @@ public abstract class AbstractTask { */ protected volatile int exitStatusCode = -1; - protected boolean needAlert = false; - - protected TaskAlertInfo taskAlertInfo; - /** * constructor * @@ -109,22 +104,6 @@ public void setAppIds(String appIds) { this.appIds = appIds; } - public boolean getNeedAlert() { - return needAlert; - } - - public void setNeedAlert(boolean needAlert) { - this.needAlert = needAlert; - } - - public TaskAlertInfo getTaskAlertInfo() { - return taskAlertInfo; - } - - public void setTaskAlertInfo(TaskAlertInfo taskAlertInfo) { - this.taskAlertInfo = taskAlertInfo; - } - /** * get task parameters * diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java index 010e328c35d9..f5118f13e958 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.plugin.task.api; +import org.apache.dolphinscheduler.common.enums.AlertType; import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo; public interface TaskCallBack { @@ -26,4 +27,6 @@ public interface TaskCallBack { // todo:The pid should put into runtime context @Deprecated void updateTaskInstanceInfo(int taskInstanceId); + + void sendAlert(int groupId, String title, String content, AlertType alertType); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/TaskAlertInfo.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/TaskResultAlertInfo.java similarity index 66% rename from dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/TaskAlertInfo.java rename to dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/TaskResultAlertInfo.java index dc83af276134..503762dc6554 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/TaskAlertInfo.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/TaskResultAlertInfo.java @@ -17,35 +17,21 @@ package org.apache.dolphinscheduler.plugin.task.api.model; -public class TaskAlertInfo { +import org.apache.dolphinscheduler.common.enums.AlertType; - private String title; +import lombok.AllArgsConstructor; +import lombok.Data; - private String content; +@Data +@AllArgsConstructor +public class TaskResultAlertInfo { private Integer alertGroupId; - public String getTitle() { - return title; - } - - public void setTitle(String title) { - this.title = title; - } - - public String getContent() { - return content; - } + private String title; - public void setContent(String content) { - this.content = content; - } + private String content; - public Integer getAlertGroupId() { - return alertGroupId; - } + private AlertType alertType; - public void setAlertGroupId(Integer alertGroupId) { - this.alertGroupId = alertGroupId; - } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java index 8864943bac43..6810345c25ec 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java @@ -65,11 +65,6 @@ public class SqlParameters extends AbstractParameters { */ private int sqlType; - /** - * send email - */ - private Boolean sendEmail; - /** * display rows */ @@ -96,6 +91,10 @@ public class SqlParameters extends AbstractParameters { */ private List postStatements; + /** + * send alert + */ + private Boolean sendAlert; /** * groupId */ @@ -147,12 +146,12 @@ public void setSqlType(int sqlType) { this.sqlType = sqlType; } - public Boolean getSendEmail() { - return sendEmail; + public Boolean getSendAlert() { + return sendAlert; } - public void setSendEmail(Boolean sendEmail) { - this.sendEmail = sendEmail; + public void setSendAlert(Boolean sendAlert) { + this.sendAlert = sendAlert; } public int getDisplayRows() { @@ -273,7 +272,7 @@ public String toString() { + ", datasource=" + datasource + ", sql='" + sql + '\'' + ", sqlType=" + sqlType - + ", sendEmail=" + sendEmail + + ", sendAlert=" + sendAlert + ", displayRows=" + displayRows + ", limit=" + limit + ", showType='" + showType + '\'' diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParametersTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParametersTest.java index 8f1ee7656000..bfa8ba6416cf 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParametersTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParametersTest.java @@ -35,7 +35,7 @@ public class SqlParametersTest { private final String sql = "select * from t_ds_user"; private final int datasource = 1; private final int sqlType = 0; - private final Boolean sendEmail = true; + private final Boolean sendAlert = true; private final int displayRows = 10; private final String showType = "TABLE"; private final String title = "sql test"; @@ -58,7 +58,7 @@ public void testSqlParameters() { sqlParameters.setSql(sql); sqlParameters.setDatasource(datasource); sqlParameters.setSqlType(sqlType); - sqlParameters.setSendEmail(sendEmail); + sqlParameters.setSendAlert(sendAlert); sqlParameters.setDisplayRows(displayRows); sqlParameters.setShowType(showType); sqlParameters.setTitle(title); @@ -68,7 +68,7 @@ public void testSqlParameters() { Assertions.assertEquals(sql, sqlParameters.getSql()); Assertions.assertEquals(datasource, sqlParameters.getDatasource()); Assertions.assertEquals(sqlType, sqlParameters.getSqlType()); - Assertions.assertEquals(sendEmail, sqlParameters.getSendEmail()); + Assertions.assertEquals(sendAlert, sqlParameters.getSendAlert()); Assertions.assertEquals(displayRows, sqlParameters.getDisplayRows()); Assertions.assertEquals(showType, sqlParameters.getShowType()); Assertions.assertEquals(title, sqlParameters.getTitle()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java index 3b1dd82c9283..4bd76c024106 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.when; +import org.apache.dolphinscheduler.common.enums.AlertType; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider; @@ -78,6 +79,11 @@ public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo appl public void updateTaskInstanceInfo(int taskInstanceId) { } + + @Override + public void sendAlert(int groupId, String title, String content, AlertType alertType) { + + } }; @BeforeEach diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java index b1fab4d436d1..6997efc43217 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java @@ -22,6 +22,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS; import static org.mockito.ArgumentMatchers.any; +import org.apache.dolphinscheduler.common.enums.AlertType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskException; @@ -86,6 +87,11 @@ public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo appl public void updateTaskInstanceInfo(int taskInstanceId) { } + + @Override + public void sendAlert(int groupId, String title, String content, AlertType alertType) { + + } }; @BeforeEach diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java index fc50f7ed97b8..b511461a29cd 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java @@ -22,6 +22,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS; import static org.mockito.ArgumentMatchers.any; +import org.apache.dolphinscheduler.common.enums.AlertType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskException; @@ -108,6 +109,11 @@ public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo appl public void updateTaskInstanceInfo(int taskInstanceId) { } + + @Override + public void sendAlert(int groupId, String title, String content, AlertType alertType) { + + } }; @BeforeEach diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java index 33b85e1a66b8..f19e2a583084 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.plugin.task.sql; +import org.apache.dolphinscheduler.common.enums.AlertType; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider; @@ -32,7 +33,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.SqlType; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.plugin.task.api.model.Property; -import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo; +import org.apache.dolphinscheduler.plugin.task.api.model.TaskResultAlertInfo; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; @@ -91,6 +92,10 @@ public class SqlTask extends AbstractTask { private Connection sessionConnection; private Statement sessionStatement; + private boolean needAlert = false; + + private TaskResultAlertInfo taskResultAlertInfo; + public SqlTask(TaskExecutionContext taskRequest) { super(taskRequest); this.taskExecutionContext = taskRequest; @@ -154,6 +159,12 @@ public void handle(TaskCallBack taskCallBack) throws TaskException { // execute sql task executeFuncAndSql(mainStatementSqlBinds, preStatementSqlBinds, postStatementSqlBinds); + if (this.getNeedAlert()) { + log.info("Begin to send sql result alert"); + taskCallBack.sendAlert(taskResultAlertInfo.getAlertGroupId(), taskResultAlertInfo.getTitle(), + taskResultAlertInfo.getContent(), taskResultAlertInfo.getAlertType()); + } + setExitStatusCode(TaskConstants.EXIT_CODE_SUCCESS); } catch (Exception e) { @@ -282,11 +293,30 @@ private String resultProcess(ResultSet resultSet) throws Exception { String result = resultJSONArray.isEmpty() ? JSONUtils.toJsonString(generateEmptyRow(resultSet)) : JSONUtils.toJsonString(resultJSONArray); - if (Boolean.TRUE.equals(sqlParameters.getSendEmail())) { - sendAttachment(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle()) - ? sqlParameters.getTitle() - : taskExecutionContext.getTaskName() + " query result sets", result); + if (Boolean.TRUE.equals(sqlParameters.getSendAlert())) { + int displayRows = sqlParameters.getDisplayRows() > 0 ? sqlParameters.getDisplayRows() + : TaskConstants.DEFAULT_DISPLAY_ROWS; + String alertContent; + if (resultJSONArray.size() > displayRows) { + ArrayNode truncatedArray = JSONUtils.createArrayNode(); + for (int i = 0; i < Math.min(displayRows, resultJSONArray.size()); i++) { + truncatedArray.add(resultJSONArray.get(i)); + } + alertContent = JSONUtils.toJsonString(truncatedArray); + log.debug("Alert content truncated to {} rows", displayRows); + } else { + alertContent = result; + } + + setNeedAlert(true); + TaskResultAlertInfo resultAlertInfo = new TaskResultAlertInfo(sqlParameters.getGroupId(), + StringUtils.isNotEmpty(sqlParameters.getTitle()) ? sqlParameters.getTitle() + : taskExecutionContext.getTaskName() + " query result sets", + alertContent, + AlertType.TASK_RESULT); + setTaskResultAlertInfo(resultAlertInfo); } + log.debug("execute sql result : {}", result); return result; } @@ -311,21 +341,6 @@ private ArrayNode generateEmptyRow(ResultSet resultSet) throws SQLException { return resultJSONArray; } - /** - * send alert as an attachment - * - * @param title title - * @param content content - */ - private void sendAttachment(int groupId, String title, String content) { - setNeedAlert(Boolean.TRUE); - TaskAlertInfo taskAlertInfo = new TaskAlertInfo(); - taskAlertInfo.setAlertGroupId(groupId); - taskAlertInfo.setContent(content); - taskAlertInfo.setTitle(title); - setTaskAlertInfo(taskAlertInfo); - } - private String executeQuery(Connection connection, SqlBinds sqlBinds, String handlerType) throws Exception { try (PreparedStatement statement = prepareStatementAndBind(connection, sqlBinds)) { log.info("{} statement execute query, for sql: {}", handlerType, sqlBinds.getSql()); @@ -461,4 +476,16 @@ private String replaceOriginalValue(String content, String rgex, Map (model.sqlType === '0' ? 6 : 0)) - const emailSpan = computed(() => - model.sqlType === '0' && model.sendEmail ? 24 : 0 + const alertSpan = computed(() => + model.sqlType === '0' && model.sendAlert ? 24 : 0 ) const groups = ref([]) const groupsLoading = ref(false) @@ -69,7 +69,7 @@ export function useSqlType(model: { [field: string]: any }): IJsonItem[] { }, { type: 'switch', - field: 'sendEmail', + field: 'sendAlert', span: querySpan, name: t('project.node.send_alarm') }, @@ -109,12 +109,12 @@ export function useSqlType(model: { [field: string]: any }): IJsonItem[] { props: { placeholder: t('project.node.title_tips') }, - span: emailSpan, + span: alertSpan, validate: { trigger: ['input', 'blur'], required: true, validator(unuse, value) { - if (model.sendEmail && !value) + if (model.sendAlert && !value) return new Error(t('project.node.title_tips')) } } @@ -124,7 +124,7 @@ export function useSqlType(model: { [field: string]: any }): IJsonItem[] { field: 'groupId', name: t('project.node.alarm_group'), options: groups, - span: emailSpan, + span: alertSpan, props: { loading: groupsLoading, placeholder: t('project.node.alarm_group_tips') @@ -133,7 +133,7 @@ export function useSqlType(model: { [field: string]: any }): IJsonItem[] { trigger: ['input', 'blur'], required: true, validator(unuse, value) { - if (model.sendEmail && !value) + if (model.sendAlert && !value) return new Error(t('project.node.alarm_group_tips')) } } diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts index 8fc67906775c..cd7fec72a85b 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts @@ -208,9 +208,9 @@ export function formatParams(data: INodeData): { taskParams.sqlType = data.sqlType taskParams.preStatements = data.preStatements taskParams.postStatements = data.postStatements - taskParams.sendEmail = data.sendEmail + taskParams.sendAlert = data.sendAlert taskParams.displayRows = data.displayRows - if (data.sqlType === '0' && data.sendEmail) { + if (data.sqlType === '0' && data.sendAlert) { taskParams.title = data.title taskParams.groupId = data.groupId } diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts index 76dcde23ee2d..570073cebe35 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts @@ -305,7 +305,7 @@ interface ITaskParams { datasource?: string sql?: string sqlType?: string - sendEmail?: boolean + sendAlert?: boolean displayRows?: number title?: string groupId?: string diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/alert/AlertService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/alert/AlertService.java new file mode 100644 index 000000000000..72c8136139ea --- /dev/null +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/alert/AlertService.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.alert; + +import org.apache.dolphinscheduler.common.enums.AlertType; + +public interface AlertService { + + void sentAlert(int groupId, String title, String content, AlertType alertType); + +} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/alert/AlertServiceImpl.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/alert/AlertServiceImpl.java new file mode 100644 index 000000000000..29e281d9abbc --- /dev/null +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/alert/AlertServiceImpl.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.alert; + +import org.apache.dolphinscheduler.common.enums.AlertType; +import org.apache.dolphinscheduler.common.model.Server; +import org.apache.dolphinscheduler.extract.alert.IAlertOperator; +import org.apache.dolphinscheduler.extract.alert.request.AlertSendRequest; +import org.apache.dolphinscheduler.extract.alert.request.AlertSendResponse; +import org.apache.dolphinscheduler.extract.base.client.Clients; +import org.apache.dolphinscheduler.extract.base.utils.Host; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; +import org.apache.dolphinscheduler.registry.api.RegistryClient; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; + +import org.apache.commons.collections4.CollectionUtils; + +import java.util.List; +import java.util.Optional; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +public class AlertServiceImpl implements AlertService { + + @Autowired + private RegistryClient registryClient; + + @Override + public void sentAlert(int groupId, String title, String content, AlertType alertType) { + log.debug("Attempting to send alert - groupId: {}, title: {}", groupId, title); + + Optional alertServerAddressOptional = getAlertServerAddress(); + if (!alertServerAddressOptional.isPresent()) { + log.error("Failed to send alert: alert server not exist."); + throw new TaskException("Failed to send alert: alert server not exist"); + } + + Host alertServerAddress = alertServerAddressOptional.get(); + + AlertSendRequest alertSendRequest = new AlertSendRequest(groupId, title, content, alertType); + AlertSendResponse alertSendResponse; + + try { + alertSendResponse = Clients + .withService(IAlertOperator.class) + .withHost(alertServerAddress.getAddress()) + .sendAlert(alertSendRequest); + log.info("Alert sent successfully to {} - groupId: {}, title: '{}', response: {}", + alertServerAddress.getAddress(), groupId, title, alertSendResponse); + + if (!alertSendResponse.isSuccess()) { + log.error("Failed to send alert: alertSendResponse is fail"); + throw new TaskException("Failed to send alert: alertSendResponse is fail"); + } else { + log.info("Success to send alert"); + } + } catch (Exception e) { + String errorMsg = String.format("Exception occurred while sending alert to %s - groupId: %d, title: '%s'", + alertServerAddress.getAddress(), groupId, title); + log.error(errorMsg, e); + throw new TaskException("Failed to send alert due to exception: " + e.getMessage(), e); + } + } + + public Optional getAlertServerAddress() { + List serverList = registryClient.getServerList(RegistryNodeType.ALERT_SERVER); + if (CollectionUtils.isEmpty(serverList)) { + return Optional.empty(); + } + Server server = serverList.get(0); + return Optional.of(new Host(server.getHost(), server.getPort())); + } +} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java index 7118a099d5f9..9a8a03e64722 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.worker.executor; import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.common.enums.AlertType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.plugin.storage.api.StorageOperator; @@ -26,6 +27,7 @@ import org.apache.dolphinscheduler.plugin.task.api.log.TaskLogMarkers; import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo; import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext; +import org.apache.dolphinscheduler.server.worker.alert.AlertService; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.utils.TaskExecutionContextUtils; import org.apache.dolphinscheduler.server.worker.utils.TenantUtils; @@ -52,12 +54,16 @@ public class PhysicalTaskExecutor extends AbstractTaskExecutor { private final PhysicalTaskPluginFactory physicalTaskPluginFactory; - public PhysicalTaskExecutor(final PhysicalTaskExecutorBuilder physicalTaskExecutorBuilder) { + private final AlertService alertService; + + public PhysicalTaskExecutor(final PhysicalTaskExecutorBuilder physicalTaskExecutorBuilder, + AlertService alertService) { super(physicalTaskExecutorBuilder.getTaskExecutionContext(), physicalTaskExecutorBuilder.getTaskExecutorEventBus()); this.workerConfig = physicalTaskExecutorBuilder.getWorkerConfig(); this.storageOperator = physicalTaskExecutorBuilder.getStorageOperator(); this.physicalTaskPluginFactory = physicalTaskExecutorBuilder.getPhysicalTaskPluginFactory(); + this.alertService = alertService; } @Override @@ -86,6 +92,11 @@ public void updateRemoteApplicationInfo(final int taskInstanceId, final Applicat public void updateTaskInstanceInfo(final int taskInstanceId) { taskExecutorEventBus.publish(TaskExecutorRuntimeContextChangedLifecycleEvent.of(taskExecutor)); } + + @Override + public void sendAlert(int groupId, String title, String content, AlertType alertType) { + alertService.sentAlert(groupId, title, content, alertType); + } }); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutorFactory.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutorFactory.java index 7330bddcc270..9554459f977e 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutorFactory.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutorFactory.java @@ -20,6 +20,7 @@ import org.apache.dolphinscheduler.plugin.storage.api.StorageOperator; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; +import org.apache.dolphinscheduler.server.worker.alert.AlertService; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.task.executor.ITaskExecutor; import org.apache.dolphinscheduler.task.executor.ITaskExecutorFactory; @@ -38,12 +39,16 @@ public class PhysicalTaskExecutorFactory implements ITaskExecutorFactory { private final StorageOperator storageOperator; + private final AlertService alertService; + public PhysicalTaskExecutorFactory(final WorkerConfig workerConfig, final PhysicalTaskPluginFactory physicalTaskPluginFactory, - final StorageOperator storageOperator) { + final StorageOperator storageOperator, + final AlertService alertService) { this.workerConfig = workerConfig; this.physicalTaskPluginFactory = physicalTaskPluginFactory; this.storageOperator = storageOperator; + this.alertService = alertService; } @Override @@ -56,7 +61,7 @@ public ITaskExecutor createTaskExecutor(final TaskExecutionContext taskExecution .storageOperator(storageOperator) .physicalTaskPluginFactory(physicalTaskPluginFactory) .build(); - return new PhysicalTaskExecutor(physicalTaskExecutorBuilder); + return new PhysicalTaskExecutor(physicalTaskExecutorBuilder, alertService); } private void assemblyTaskLogPath(final TaskExecutionContext taskExecutionContext) {