Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public enum AlertType {

/**
* 0 workflow instance failure, 1 workflow instance success, 2 workflow instance blocked, 3 workflow instance timeout, 4 fault tolerance warning,
* 5 task failure, 6 task success, 7 task timeout, 8 close alert
* 5 task failure, 6 task success, 7 task timeout, 8 task result
*/
WORKFLOW_INSTANCE_FAILURE(0, "workflow instance failure"),
WORKFLOW_INSTANCE_SUCCESS(1, "workflow instance success"),
Expand All @@ -39,6 +39,7 @@ public enum AlertType {
TASK_FAILURE(5, "task failure"),
TASK_SUCCESS(6, "task success"),
TASK_TIMEOUT(7, "task timeout"),
TASK_RESULT(8, "task result");
;

AlertType(int code, String descp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

UPDATE t_ds_task_definition
SET task_params = JSON_REMOVE(
JSON_INSERT(
task_params,
'$.sendAlert',
JSON_EXTRACT(task_params, '$.sendEmail')
),
'$.sendEmail'
)
WHERE task_params IS NOT NULL AND JSON_EXTRACT(task_params, '$.sendEmail') IS NOT NULL;

UPDATE t_ds_task_definition_log
SET task_params = JSON_REMOVE(
JSON_INSERT(
task_params,
'$.sendAlert',
JSON_EXTRACT(task_params, '$.sendEmail')
),
'$.sendEmail'
)
WHERE task_params IS NOT NULL AND JSON_EXTRACT(task_params, '$.sendEmail') IS NOT NULL;
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

UPDATE t_ds_task_definition
SET task_params = (
(task_params::jsonb - 'sendEmail')
|| jsonb_build_object('sendAlert', task_params::jsonb->'sendEmail')
)::text
WHERE jsonb_path_exists(task_params::jsonb, '$.sendEmail');

UPDATE t_ds_task_definition_log
SET task_params = (
(task_params::jsonb - 'sendEmail')
|| jsonb_build_object('sendAlert', task_params::jsonb->'sendEmail')
)::text
WHERE jsonb_path_exists(task_params::jsonb, '$.sendEmail');
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void testDeleteByCode() {
@Test
public void testNullPropertyValueOfLocalParams() {
String definitionJson =
"{\"failRetryTimes\":\"0\",\"timeoutNotifyStrategy\":\"\",\"code\":\"5195043558720\",\"flag\":\"YES\",\"environmentCode\":\"-1\",\"taskDefinitionIndex\":2,\"taskPriority\":\"MEDIUM\",\"taskParams\":\"{\\\"preStatements\\\":null,\\\"postStatements\\\":null,\\\"type\\\":\\\"ADB_MYSQL\\\",\\\"database\\\":\\\"lijia\\\",\\\"sql\\\":\\\"create table nation_${random_serial_number} as select * from nation\\\",\\\"localParams\\\":[{\\\"direct\\\":2,\\\"type\\\":3,\\\"prop\\\":\\\"key\\\"}],\\\"Name\\\":\\\"create_table_as_select_nation\\\",\\\"FailRetryTimes\\\":0,\\\"dbClusterId\\\":\\\"amv-bp10o45925jpx959\\\",\\\"sendEmail\\\":false,\\\"displayRows\\\":10,\\\"limit\\\":10000,\\\"agentSource\\\":\\\"Workflow\\\",\\\"agentVersion\\\":\\\"Unkown\\\"}\",\"timeout\":\"0\",\"taskType\":\"ADB_MYSQL\",\"timeoutFlag\":\"CLOSE\",\"projectCode\":\"5191800302720\",\"name\":\"create_table_as_select_nation\",\"delayTime\":\"0\",\"workerGroup\":\"default\"}";
"{\"failRetryTimes\":\"0\",\"timeoutNotifyStrategy\":\"\",\"code\":\"5195043558720\",\"flag\":\"YES\",\"environmentCode\":\"-1\",\"taskDefinitionIndex\":2,\"taskPriority\":\"MEDIUM\",\"taskParams\":\"{\\\"preStatements\\\":null,\\\"postStatements\\\":null,\\\"type\\\":\\\"ADB_MYSQL\\\",\\\"database\\\":\\\"lijia\\\",\\\"sql\\\":\\\"create table nation_${random_serial_number} as select * from nation\\\",\\\"localParams\\\":[{\\\"direct\\\":2,\\\"type\\\":3,\\\"prop\\\":\\\"key\\\"}],\\\"Name\\\":\\\"create_table_as_select_nation\\\",\\\"FailRetryTimes\\\":0,\\\"dbClusterId\\\":\\\"amv-bp10o45925jpx959\\\",\\\"sendAlert\\\":false,\\\"displayRows\\\":10,\\\"limit\\\":10000,\\\"agentSource\\\":\\\"Workflow\\\",\\\"agentVersion\\\":\\\"Unkown\\\"}\",\"timeout\":\"0\",\"taskType\":\"ADB_MYSQL\",\"timeoutFlag\":\"CLOSE\",\"projectCode\":\"5191800302720\",\"name\":\"create_table_as_select_nation\",\"delayTime\":\"0\",\"workerGroup\":\"default\"}";
TaskDefinition definition = JSONUtils.parseObject(definitionJson, TaskDefinition.class);

Map<String, String> taskParamsMap = definition.getTaskParamMap();
Expand All @@ -157,7 +157,7 @@ public void testNullPropertyValueOfLocalParams() {
@Test
public void testNullLocalParamsOfTaskParams() {
String definitionJson =
"{\"failRetryTimes\":\"0\",\"timeoutNotifyStrategy\":\"\",\"code\":\"5195043558720\",\"flag\":\"YES\",\"environmentCode\":\"-1\",\"taskDefinitionIndex\":2,\"taskPriority\":\"MEDIUM\",\"taskParams\":\"{\\\"preStatements\\\":null,\\\"postStatements\\\":null,\\\"type\\\":\\\"ADB_MYSQL\\\",\\\"database\\\":\\\"lijia\\\",\\\"sql\\\":\\\"create table nation_${random_serial_number} as select * from nation\\\",\\\"localParams\\\":null,\\\"Name\\\":\\\"create_table_as_select_nation\\\",\\\"FailRetryTimes\\\":0,\\\"dbClusterId\\\":\\\"amv-bp10o45925jpx959\\\",\\\"sendEmail\\\":false,\\\"displayRows\\\":10,\\\"limit\\\":10000,\\\"agentSource\\\":\\\"Workflow\\\",\\\"agentVersion\\\":\\\"Unkown\\\"}\",\"timeout\":\"0\",\"taskType\":\"ADB_MYSQL\",\"timeoutFlag\":\"CLOSE\",\"projectCode\":\"5191800302720\",\"name\":\"create_table_as_select_nation\",\"delayTime\":\"0\",\"workerGroup\":\"default\"}";
"{\"failRetryTimes\":\"0\",\"timeoutNotifyStrategy\":\"\",\"code\":\"5195043558720\",\"flag\":\"YES\",\"environmentCode\":\"-1\",\"taskDefinitionIndex\":2,\"taskPriority\":\"MEDIUM\",\"taskParams\":\"{\\\"preStatements\\\":null,\\\"postStatements\\\":null,\\\"type\\\":\\\"ADB_MYSQL\\\",\\\"database\\\":\\\"lijia\\\",\\\"sql\\\":\\\"create table nation_${random_serial_number} as select * from nation\\\",\\\"localParams\\\":null,\\\"Name\\\":\\\"create_table_as_select_nation\\\",\\\"FailRetryTimes\\\":0,\\\"dbClusterId\\\":\\\"amv-bp10o45925jpx959\\\",\\\"sendAlert\\\":false,\\\"displayRows\\\":10,\\\"limit\\\":10000,\\\"agentSource\\\":\\\"Workflow\\\",\\\"agentVersion\\\":\\\"Unkown\\\"}\",\"timeout\":\"0\",\"taskType\":\"ADB_MYSQL\",\"timeoutFlag\":\"CLOSE\",\"projectCode\":\"5191800302720\",\"name\":\"create_table_as_select_nation\",\"delayTime\":\"0\",\"workerGroup\":\"default\"}";
TaskDefinition definition = JSONUtils.parseObject(definitionJson, TaskDefinition.class);

Assertions.assertNull(definition.getTaskParamMap(), "Serialize the task definition success");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.dolphinscheduler.extract.alert.request;

import org.apache.dolphinscheduler.common.enums.AlertType;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
Expand All @@ -32,6 +34,6 @@ public class AlertSendRequest {

private String content;

private int warnType;
private AlertType alertType;

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;

import java.util.Map;
Expand Down Expand Up @@ -56,10 +55,6 @@ public abstract class AbstractTask {
*/
protected volatile int exitStatusCode = -1;

protected boolean needAlert = false;

protected TaskAlertInfo taskAlertInfo;

/**
* constructor
*
Expand Down Expand Up @@ -109,22 +104,6 @@ public void setAppIds(String appIds) {
this.appIds = appIds;
}

public boolean getNeedAlert() {
return needAlert;
}

public void setNeedAlert(boolean needAlert) {
this.needAlert = needAlert;
}

public TaskAlertInfo getTaskAlertInfo() {
return taskAlertInfo;
}

public void setTaskAlertInfo(TaskAlertInfo taskAlertInfo) {
this.taskAlertInfo = taskAlertInfo;
}

/**
* get task parameters
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.dolphinscheduler.plugin.task.api;

import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;

public interface TaskCallBack {
Expand All @@ -26,4 +27,6 @@ public interface TaskCallBack {
// todo:The pid should put into runtime context
@Deprecated
void updateTaskInstanceInfo(int taskInstanceId);

void sendAlert(int groupId, String title, String content, AlertType alertType);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't add this at TaskCallBack, you can see Deprecated at updateTaskInstanceInfo.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't add this at TaskCallBack, you can see Deprecated at updateTaskInstanceInfo.

Sorry, but given the current design, TaskCallBack is still the most straightforward solution for cross-module calls.
I couldn't find a clean way for task-sql module to reach the EventBus channel; attempting to do so would require massive changes that seem disproportionate.

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,21 @@

package org.apache.dolphinscheduler.plugin.task.api.model;

public class TaskAlertInfo {
import org.apache.dolphinscheduler.common.enums.AlertType;

private String title;
import lombok.AllArgsConstructor;
import lombok.Data;

private String content;
@Data
@AllArgsConstructor
public class TaskResultAlertInfo {

private Integer alertGroupId;

public String getTitle() {
return title;
}

public void setTitle(String title) {
this.title = title;
}

public String getContent() {
return content;
}
private String title;

public void setContent(String content) {
this.content = content;
}
private String content;

public Integer getAlertGroupId() {
return alertGroupId;
}
private AlertType alertType;

public void setAlertGroupId(Integer alertGroupId) {
this.alertGroupId = alertGroupId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,6 @@ public class SqlParameters extends AbstractParameters {
*/
private int sqlType;

/**
* send email
*/
private Boolean sendEmail;

/**
* display rows
*/
Expand All @@ -96,6 +91,10 @@ public class SqlParameters extends AbstractParameters {
*/
private List<String> postStatements;

/**
* send alert
*/
private Boolean sendAlert;
/**
* groupId
*/
Expand Down Expand Up @@ -147,12 +146,12 @@ public void setSqlType(int sqlType) {
this.sqlType = sqlType;
}

public Boolean getSendEmail() {
return sendEmail;
public Boolean getSendAlert() {
return sendAlert;
}

public void setSendEmail(Boolean sendEmail) {
this.sendEmail = sendEmail;
public void setSendAlert(Boolean sendAlert) {
this.sendAlert = sendAlert;
}

public int getDisplayRows() {
Expand Down Expand Up @@ -273,7 +272,7 @@ public String toString() {
+ ", datasource=" + datasource
+ ", sql='" + sql + '\''
+ ", sqlType=" + sqlType
+ ", sendEmail=" + sendEmail
+ ", sendAlert=" + sendAlert
+ ", displayRows=" + displayRows
+ ", limit=" + limit
+ ", showType='" + showType + '\''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class SqlParametersTest {
private final String sql = "select * from t_ds_user";
private final int datasource = 1;
private final int sqlType = 0;
private final Boolean sendEmail = true;
private final Boolean sendAlert = true;
private final int displayRows = 10;
private final String showType = "TABLE";
private final String title = "sql test";
Expand All @@ -58,7 +58,7 @@ public void testSqlParameters() {
sqlParameters.setSql(sql);
sqlParameters.setDatasource(datasource);
sqlParameters.setSqlType(sqlType);
sqlParameters.setSendEmail(sendEmail);
sqlParameters.setSendAlert(sendAlert);
sqlParameters.setDisplayRows(displayRows);
sqlParameters.setShowType(showType);
sqlParameters.setTitle(title);
Expand All @@ -68,7 +68,7 @@ public void testSqlParameters() {
Assertions.assertEquals(sql, sqlParameters.getSql());
Assertions.assertEquals(datasource, sqlParameters.getDatasource());
Assertions.assertEquals(sqlType, sqlParameters.getSqlType());
Assertions.assertEquals(sendEmail, sqlParameters.getSendEmail());
Assertions.assertEquals(sendAlert, sqlParameters.getSendAlert());
Assertions.assertEquals(displayRows, sqlParameters.getDisplayRows());
Assertions.assertEquals(showType, sqlParameters.getShowType());
Assertions.assertEquals(title, sqlParameters.getTitle());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;

import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
Expand Down Expand Up @@ -78,6 +79,11 @@ public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo appl
public void updateTaskInstanceInfo(int taskInstanceId) {

}

@Override
public void sendAlert(int groupId, String title, String content, AlertType alertType) {

}
};

@BeforeEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
import static org.mockito.ArgumentMatchers.any;

import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
Expand Down Expand Up @@ -86,6 +87,11 @@ public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo appl
public void updateTaskInstanceInfo(int taskInstanceId) {

}

@Override
public void sendAlert(int groupId, String title, String content, AlertType alertType) {

}
};

@BeforeEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
import static org.mockito.ArgumentMatchers.any;

import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
Expand Down Expand Up @@ -108,6 +109,11 @@ public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo appl
public void updateTaskInstanceInfo(int taskInstanceId) {

}

@Override
public void sendAlert(int groupId, String title, String content, AlertType alertType) {

}
};

@BeforeEach
Expand Down
Loading