dolphinscheduler-笔记2

springboot集成dolphinscheduler

说明

为了避免对DolphinScheduler产生过度依赖,实践中通常不会全面采用其内置的所有任务节点类型。相反,会选择性地利用DolphinScheduler的HTTP任务节点功能,以此作为工作流执行管理的桥梁,对接并驱动自有项目的业务流程。这种策略不仅确保了流程编排的灵活性与扩展性,还有效减少了对外部调度系统的深度绑定,从而在提升项目自洽能力的同时,保持了良好的系统间解耦。

简而言之,我们倾向于仅采纳DolphinScheduler中的HTTP任务节点,作为调度机制的一部分,来促进我们内部项目工作流的自动化执行。这样做既能享受DolphinScheduler带来的调度便利,又避免了全盘接受其所有组件所带来的潜在风险,实现了更为稳健、可控的项目管理方案。

代码实现

为了优化与DolphinScheduler的集成,以下是三个关键配置类的概述,它们旨在通过初始化接口实现项目及租户信息的同步通知。值得注意的是,为了确保数据一致性和高效通信,你的Spring Boot应用所使用的数据库应与DolphinScheduler共享同一数据源。这一策略不仅简化了数据管理,还促进了实时状态更新,增强了系统的整体协调性。

简而言之,我们精心设计了三组配置规则,允许我们的Spring Boot项目无缝对接DolphinScheduler平台。通过这些配置,项目和租户的动态变化能够及时反映到DolphinScheduler中,前提是两者共用一个数据库实例。这种架构决策不仅优化了资源分配,还促进了跨系统间的紧密协作,为后续的业务拓展奠定了坚实的基础。

package cn.com.lyb.data.dev.init;

import cn.com.lyb.common.security.annotation.InnerRequest;
import cn.com.lyb.core.exception.BizException;
import cn.com.lyb.core.web.request.Response;
import cn.com.lyb.core.web.request.ResultWrap;
import cn.com.lyb.data.dev.web.config.DolphinschedulerConfig;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;

@Api(tags = "初始化dolphinscheduler数据库信息")
@RestController
@RequestMapping("/data-dev")
public class InitializePlugin {

    @Autowired
    private DataSource dataSource;

    @Autowired
    private DolphinschedulerConfig dolphinschedulerConfig;

    @GetMapping("/init")
    @ApiOperation("部署全新的环境可以用此接口,否则会报错")
    @InnerRequest
    public Response init(){
        Connection connection = null;
        try{
            connection = dataSource.getConnection();
            // 项目表(xgov如果集成项目的话,这个sql不能再执行,配置也需要改)
            String projectSql = "INSERT INTO `dolphinscheduler`.`t_ds_project` (`id`, `name`, `code`, `description`, `user_id`, `flag`, `create_time`, `update_time`) VALUES (1, 'lyb', '" + dolphinschedulerConfig.getProjectCode() + "', '', 1, 1, '2024-06-13 02:49:43', '2024-06-13 02:49:43');";
            String tokenSql = "INSERT INTO `dolphinscheduler`.`t_ds_access_token` (`id`, `user_id`, `token`, `expire_time`, `create_time`, `update_time`) VALUES (1, 1, '"+dolphinschedulerConfig.getDsdToken()+"', '2039-12-30 10:51:26', '2024-06-13 02:50:37', '2024-06-18 10:00:13');";
            String tenantSql = "INSERT INTO `dolphinscheduler`.`t_ds_tenant` (`id`, `tenant_code`, `description`, `queue_id`, `create_time`, `update_time`) VALUES (1, 'default', '', 1, '2024-06-13 02:50:20', '2024-06-13 02:50:20');";
            String userSql = "UPDATE `dolphinscheduler`.`t_ds_user` SET `user_name` = 'admin', `user_password` = '470b9934942620215ad1cb3ac2d48497', `user_type` = 0, `email` = 'xxx@qq.com', `phone` = '', `tenant_id` = 1, `create_time` = '2024-06-12 10:23:37', `update_time` = '2024-06-18 09:59:52', `queue` = '', `state` = 1, `time_zone` = 'Asia/Shanghai' WHERE `id` = 1;";
            List<String> sqlList = Arrays.asList(projectSql, tenantSql, tokenSql, userSql);
            connection.setAutoCommit(false);
            Statement statement = connection.createStatement();
            for (String sql : sqlList) {
                statement.addBatch(sql);
            }

            statement.executeBatch();
            connection.commit();
            statement.close();
        }catch (Exception e){
            throw new BizException("初始化报错");
        }finally {
            if(connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
        return ResultWrap.ok();
    }
}

package cn.com.lyb.data.dev.web.config;

import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;

@Configuration
public class RestTemplateConfig {

    @Value("${xgov.template.connectTimeout}")
    private int connectTimeout;

    @Value("${xgov.template.socketTimeout}")
    private int socketTimeout;

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate(httpRequestFactory());
    }

    @Bean
    public HttpComponentsClientHttpRequestFactory httpRequestFactory() {
        PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
        connectionManager.setMaxTotal(200); // 最大连接数
        connectionManager.setDefaultMaxPerRoute(20); // 每个路由默认的最大连接数

        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(connectTimeout) // 连接超时时间
                .setSocketTimeout(socketTimeout) // 读取超时时间
                .setConnectionRequestTimeout(5000) // 从连接池获取连接的超时时间
                .build();

        CloseableHttpClient httpClient = HttpClients.custom()
                .setConnectionManager(connectionManager)
                .setDefaultRequestConfig(requestConfig)
                .build();

        return new HttpComponentsClientHttpRequestFactory(httpClient);
    }
}

package cn.com.dev.data.dev.web.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DolphinschedulerConfig {

    @Value("${lyb.dolphinscheduler.server.username}")
    private String dsdUsername;

    @Value("${lyb.dolphinscheduler.server.token}")
    private String dsdToken;

    @Value("${lyb.dolphinscheduler.server.url}")
    private String dsdUrl;

    @Value("${lyb.dolphinscheduler.server.porjectCode}")
    private String projectCode;

    @Value("${lyb.dolphinscheduler.server.tenantCode}")
    private String tenantCode;

    public String getDsdUsername() {
        return dsdUsername;
    }

    public void setDsdUsername(String dsdUsername) {
        this.dsdUsername = dsdUsername;
    }

    public String getDsdToken() {
        return dsdToken;
    }

    public void setDsdToken(String dsdToken) {
        this.dsdToken = dsdToken;
    }

    public String getDsdUrl() {
        return dsdUrl;
    }

    public void setDsdUrl(String dsdUrl) {
        this.dsdUrl = dsdUrl;
    }

    public String getProjectCode() {
        return projectCode;
    }

    public void setProjectCode(String projectCode) {
        this.projectCode = projectCode;
    }

    public String getTenantCode() {return tenantCode;}

    public void setTenantCode(String tenantCode) {this.tenantCode = tenantCode;}

}

构建一个调用类,该类全面集成了与DolphinScheduler接口的交互逻辑,为我们的应用提供了一层抽象。对于涉及具体业务逻辑的数据封装细节,此处将不再赘述,旨在保持代码的清晰度与通用性。

简言之,我们设计了一个专门的类来处理所有与DolphinScheduler的API调用,确保了业务核心逻辑的独立性和可维护性。这一封装策略使得代码库更加整洁,同时也提升了开发效率和系统的整体健壮性。

通过这种方式,我们不仅隔离了与外部服务的直接交互,还简化了业务逻辑的实现,使其更加专注于核心功能,而非调度系统的细节。这样的架构设计,有助于团队成员快速理解系统架构,同时也便于未来的功能扩展和系统维护。

package cn.com.lyb.data.dev.web.dolphinscheduler.service;


import cn.com.lyb.common.redis.service.RedisService;
import cn.com.lyb.core.exception.BizException;
import cn.com.lyb.data.dev.enums.ProcessExecutionTypeEnum;
import cn.com.lyb.data.dev.enums.TaskExecutionStatus;
import cn.com.lyb.data.dev.enums.WorkflowExecutionStatus;
import cn.com.lyb.data.dev.web.config.DolphinschedulerConfig;
import cn.com.lyb.data.dev.workflow.entity.delphinscheduler.TaskDefinition;
import cn.com.lyb.data.dev.workflow.entity.vo.GanttTaskVO;
import cn.com.lyb.data.dev.workflow.entity.vo.ProcessInstanceVO;
import cn.com.lyb.data.dev.workflow.entity.vo.ResponseTaskLog;
import cn.com.lyb.data.dev.workflow.entity.vo.TaskInstanceVO;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.github.pagehelper.PageInfo;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

@Service
public class DolphinschedulerService {

    private static final Logger logger = LoggerFactory.getLogger(DolphinschedulerService.class);
    private static final String CONTENT_TYPE = "application/x-www-form-urlencoded; charset=utf-8";
    private static final String X_REQUESTED_WITH = "XMLHttpRequest";

    @Autowired
    private RestTemplate restTemplate;

    @Autowired
    private DolphinschedulerConfig dolphinschedulerConfig;

    private static final Boolean DSD_SUCCESS = true;

    @Autowired
    private RedisService redisService;

    private static final String DSD_SESSION_KEY = "DSD_SESSION_KEY";

    private static final String SUCCESS = "success";

    private static final String MSG = "msg";

    // 此种方法适用登录后获取SESSION设置到header里面
    private static final String SESSION_ID = "sessionId";

    private static final String TOKEN = "token";

    private static final String DATA = "data";


    /**
     * 登录,返回 sessionId
     */
    public String login() {
        String sessionValue = redisService.getCacheObject(DSD_SESSION_KEY);
        if (StringUtils.isNotBlank(sessionValue)) {
            return sessionValue;
        }
        //SSLUtil.turnOffSslChecking();
        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", CONTENT_TYPE);
        hs.add("X-Requested-With", X_REQUESTED_WITH);
        LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();
        linkedMultiValueMap.add("userName", dolphinschedulerConfig.getDsdUsername());
        //linkedMultiValueMap.add("userPassword", dolphinschedulerConfig.getDsdPassword());

        HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
        String url = dolphinschedulerConfig.getDsdUrl() + "/login";
        JSONObject resultJSON = doPostForObject(url, httpEntity);

        JSONObject data = (JSONObject) resultJSON.get(DATA);
        String sessionId = data.get(SESSION_ID).toString();
        redisService.setCacheObject(DSD_SESSION_KEY, sessionId, 23L, TimeUnit.HOURS);
        return sessionId;
    }

    /**
     * 创建项目
     *
     * @return
     */
    public void createProject(String projectName, String description) {
        // 如果是https登录可以使用该方法
        //SSLUtil.turnOffSslChecking();
        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", CONTENT_TYPE);
        hs.add("X-Requested-With", X_REQUESTED_WITH);
        hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());
        LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();
        linkedMultiValueMap.add("projectName", projectName);
        linkedMultiValueMap.add("description", description);
        linkedMultiValueMap.add("userName", dolphinschedulerConfig.getDsdUsername());

        HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
        logger.info("Azkaban请求信息:" + httpEntity.toString());
        String url = dolphinschedulerConfig.getDsdUrl() + "/projects";
        doPostForObject(url, httpEntity);
    }

    /**
     * 项目列表
     *
     * @param pageNo
     * @param pageSize
     * @param searchVal
     */
    public Object projectsPage(Integer pageNo, Integer pageSize, String searchVal) {
        //SSLUtil.turnOffSslChecking();
        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", CONTENT_TYPE);
        hs.add("X-Requested-With", X_REQUESTED_WITH);
        hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());
        HttpEntity<String> entity = new HttpEntity<String>(hs);
        String url;
        if (StringUtils.isNotBlank(searchVal)) {
            url = dolphinschedulerConfig.getDsdUrl() + "/projects?pageNo=" + pageNo + "&pageSize=" + pageSize + "&searchVal=" + searchVal;
        } else {
            url = dolphinschedulerConfig.getDsdUrl() + "/projects?pageNo=" + pageNo + "&pageSize=" + pageSize;
        }
        JSONObject resultJSON = doGetForObject(url, entity);
        return resultJSON.get(DATA);
    }


    /**
     * 修改项目
     *
     * @param code
     * @param projectName
     * @param description
     * @return
     */
    public Object updateProjects(String code, String projectName, String description) {
        //SSLUtil.turnOffSslChecking();
        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", CONTENT_TYPE);
        hs.add("X-Requested-With", X_REQUESTED_WITH);
        hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());
        LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();
        linkedMultiValueMap.add("projectName", projectName);
        linkedMultiValueMap.add("description", description);
        linkedMultiValueMap.add("userName", dolphinschedulerConfig.getDsdUsername());

        HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
        String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + code;

        JSONObject resultJSON = doPutForObject(url, httpEntity);
        return resultJSON.get(DATA);
    }

    /**
     * 删除项目
     *
     * @param code
     * @return
     */
    public Object delProjects(String code) {
        //SSLUtil.turnOffSslChecking();
        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", CONTENT_TYPE);
        hs.add("X-Requested-With", X_REQUESTED_WITH);
        hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());
        HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(hs);
        String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + code;

        JSONObject resultJSON = doDeleteForObject(url, httpEntity);
        return resultJSON.get(DATA);
    }


    public JSONObject doPostForObject(String url, HttpEntity httpEntity) {
        logger.info("调用url:{}", url);
        try {
            ResponseEntity<String> exchange = restTemplate.exchange(url, HttpMethod.POST, httpEntity, String.class);
            String result = exchange.getBody();
            logger.info("post类型接口调用返回信息:{}", result);
            return getJsonObject(result);
        } catch (BizException be) {
            throw be;
        } catch (Exception e) {
            logger.error("post类型接口调用失败:{}", e);
            throw new BizException(e.getMessage());
        }
    }

    public JSONObject doGetForObject(String url, HttpEntity httpEntity) {
        logger.info("调用url:{}", url);
        try {
            ResponseEntity<String> exchange = restTemplate.exchange(url, HttpMethod.GET, httpEntity, String.class);
            String result = exchange.getBody();
            logger.info("get类型接口调用返回信息:{}", result);
            return getJsonObject(result);
        } catch (BizException be) {
            throw be;
        } catch (Exception e) {
            logger.error("get类型接口调用失败:{}", e);
            throw new BizException(e.getMessage());
        }
    }

    public JSONObject doPutForObject(String url, HttpEntity httpEntity) {
        logger.info("调用url:{}", url);
        try {
            ResponseEntity<String> exchange = restTemplate.exchange(url, HttpMethod.PUT, httpEntity, String.class);
            String result = exchange.getBody();
            logger.info("put类型接口调用返回信息:{}", result);
            return getJsonObject(result);
        } catch (BizException be) {
            throw be;
        } catch (Exception e) {
            logger.error("put类型接口调用失败:{}", e);
            throw new BizException(e.getMessage());
        }
    }


    public JSONObject doDeleteForObject(String url, HttpEntity httpEntity) {
        logger.info("调用url:{}", url);
        try {
            ResponseEntity<String> exchange = restTemplate.exchange(url, HttpMethod.DELETE, httpEntity, String.class);
            String result = exchange.getBody();
            logger.info("delete类型接口调用返回信息:{}", result);
            return getJsonObject(result);
        } catch (BizException be) {
            throw be;
        } catch (Exception e) {
            logger.error("delete类型接口调用失败:{}", e);
            throw new BizException(e.getMessage());
        }
    }

    private static JSONObject getJsonObject(String result) {
        JSONObject resultJSON = JSON.parseObject(result);
        if (!DSD_SUCCESS.equals(resultJSON.get(SUCCESS))) {
            logger.error("调用结果返回异常:{}" + result);
            Integer code = (Integer) resultJSON.get("code");
            if(code.intValue() == 50019){
                throw new BizException("流程节点间存在循环依赖");
            }else if(code.intValue() == 50036){
                throw new BizException("工作流任务关系参数错误");
            } else {
                throw new BizException(resultJSON.get(MSG).toString());
            }
        }
        return resultJSON;
    }


    /**
     * 创建工作流
     *
     * @param name
     * @param description
     * @param globalParams
     * @param locations
     * @param timeout
     * @param taskRelationJson
     * @param taskDefinitionJson
     * @param otherParamsJson
     * @param executionType
     * @return 3.2.0 版本
     */

    public Long createWorkFlow320(String name, String description, String globalParams, String locations, int timeout,
                                  String taskRelationJson, String taskDefinitionJson, String otherParamsJson,
                                  ProcessExecutionTypeEnum executionType) {
        //SSLUtil.turnOffSslChecking();
        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", CONTENT_TYPE);
        hs.add("X-Requested-With", X_REQUESTED_WITH);
        hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());
        LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<String, Object>();
        linkedMultiValueMap.add("description", description);
        linkedMultiValueMap.add("name", name);
        linkedMultiValueMap.add("taskDefinitionJson", taskDefinitionJson);
        linkedMultiValueMap.add("taskRelationJson", taskRelationJson);
        linkedMultiValueMap.add("timeout", timeout);
        linkedMultiValueMap.add("executionType", executionType);
        linkedMultiValueMap.add("otherParamsJson", otherParamsJson);
        linkedMultiValueMap.add("globalParams", globalParams);

        HttpEntity<MultiValueMap<String, Object>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
        String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/process-definition";

        JSONObject resultJSON = doPostForObject(url, httpEntity);
        JSONObject data = (JSONObject) resultJSON.get(DATA);
        Long code = (Long) data.get("code");
        return code;
    }

    /**
     * 查询工作流列表
     *
     * @param pageNo
     * @param pageSize
     * @param searchVal
     * @return
     */
    public Object selectFlowPage(Integer pageNo, Integer pageSize, String searchVal) {
        //SSLUtil.turnOffSslChecking();
        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", CONTENT_TYPE);
        hs.add("X-Requested-With", X_REQUESTED_WITH);
        hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());
        HttpEntity<String> entity = new HttpEntity<String>(hs);
        String url;
        if (StringUtils.isNotBlank(searchVal)) {
            url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/process-definition?pageNo=" + pageNo + "&pageSize=" + pageSize + "&searchVal=" + searchVal;
        } else {
            url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/process-definition?pageNo=" + pageNo + "&pageSize=" + pageSize;
        }
        JSONObject resultJSON = doGetForObject(url, entity);
        return resultJSON.get(DATA);
    }

    public Object selectOneFlow(String code) {
        //SSLUtil.turnOffSslChecking();
        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", CONTENT_TYPE);
        hs.add("X-Requested-With", X_REQUESTED_WITH);
        hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());
        HttpEntity<String> entity = new HttpEntity<String>(hs);
        String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/process-definition/" + code;
        JSONObject resultJSON = doGetForObject(url, entity);
        return resultJSON.get(DATA);
    }

    public Long createWorkFlow(String name, String description, String locations,
                               String taskDefinitionJson, String taskRelationJson,
                               String executionType) {
        //SSLUtil.turnOffSslChecking();
        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", CONTENT_TYPE);
        hs.add("X-Requested-With", X_REQUESTED_WITH);
        hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());
        LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<String, Object>();
        linkedMultiValueMap.add("description", description);
        linkedMultiValueMap.add("name", name);
        linkedMultiValueMap.add("taskDefinitionJson", taskDefinitionJson);
        linkedMultiValueMap.add("taskRelationJson", taskRelationJson);
        linkedMultiValueMap.add("timeout", 0);
        linkedMultiValueMap.add("executionType", executionType);
        linkedMultiValueMap.add("tenantCode", dolphinschedulerConfig.getTenantCode());
        linkedMultiValueMap.add("locations", locations);

        HttpEntity<MultiValueMap<String, Object>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
        String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/process-definition";

        JSONObject resultJSON = doPostForObject(url, httpEntity);
        JSONObject data = (JSONObject) resultJSON.get(DATA);
        Long code = (Long) data.get("code");
        return code;
    }

    public void updateReleaseState(String name, String releaseState, Long code) {
        //SSLUtil.turnOffSslChecking();
        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", CONTENT_TYPE);
        hs.add("X-Requested-With", X_REQUESTED_WITH);
        hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());
        LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<String, Object>();

        linkedMultiValueMap.add("name", name);
        linkedMultiValueMap.add("releaseState", releaseState);

        HttpEntity<MultiValueMap<String, Object>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
        String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/process-definition/" + code + "/release";

        doPostForObject(url, httpEntity);

    }


    public List<TaskDefinition> getTaskByWorkflowCode(Long dsdCode) {
        //SSLUtil.turnOffSslChecking();
        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", CONTENT_TYPE);
        hs.add("X-Requested-With", X_REQUESTED_WITH);
        hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());

        HttpEntity<MultiValueMap<String, Object>> httpEntity = new HttpEntity<>(hs);
        String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/process-definition/" + dsdCode;

        JSONObject resultJSON = doGetForObject(url, httpEntity);
        JSONObject data = (JSONObject) resultJSON.get(DATA);

        JSONArray jsonArray = (JSONArray) data.get("taskDefinitionList");
        return jsonArray.toJavaList(TaskDefinition.class);
    }

    /**
     * 删除工作流
     *
     * @param codes
     */
    public void delWorkflow(String codes) {
        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", CONTENT_TYPE);
        hs.add("X-Requested-With", X_REQUESTED_WITH);
        hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());
        LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<String, Object>();
        linkedMultiValueMap.add("codes", codes);
        HttpEntity<MultiValueMap<String, Object>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
        String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/process-definition/batch-delete";
        doPostForObject(url, httpEntity);
    }


    public void updateWorkFlow(Long dsdCode, String name, String description, String locations, String taskDefinitionJson, String taskRelationJson, String executionType) {
        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", CONTENT_TYPE);
        hs.add("X-Requested-With", X_REQUESTED_WITH);
        hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());
        LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<String, Object>();
        linkedMultiValueMap.add("description", description);
        linkedMultiValueMap.add("name", name);
        linkedMultiValueMap.add("taskDefinitionJson", taskDefinitionJson);
        linkedMultiValueMap.add("taskRelationJson", taskRelationJson);
        linkedMultiValueMap.add("timeout", 0);
        linkedMultiValueMap.add("executionType", executionType);
        linkedMultiValueMap.add("tenantCode", dolphinschedulerConfig.getTenantCode());
        linkedMultiValueMap.add("locations", locations);
        HttpEntity<MultiValueMap<String, Object>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
        String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/process-definition/" + dsdCode;
        doPutForObject(url, httpEntity);
    }

    /**
     * 运行工作流
     *
     * @param code
     */
    public void runWorkflow(Long code) {
        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", CONTENT_TYPE);
        hs.add("X-Requested-With", X_REQUESTED_WITH);
        hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());
        LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<String, Object>();
        linkedMultiValueMap.add("processDefinitionCode", code);
        linkedMultiValueMap.add("failureStrategy", "CONTINUE");
        linkedMultiValueMap.add("warningType", "NONE");
        linkedMultiValueMap.add("scheduleTime", getStringDate());
        HttpEntity<MultiValueMap<String, Object>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
        String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/executors/start-process-instance";
        doPostForObject(url, httpEntity);
    }

    public String getStringDate() {
        LocalDateTime currentDateTime = LocalDateTime.now();
        LocalDateTime startDate = currentDateTime.withHour(0).withMinute(0).withSecond(0).withNano(0);
        LocalDateTime endDate = startDate;
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        String formattedStartDate = startDate.format(formatter);
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("complementStartDate", formattedStartDate);
        jsonObject.put("complementEndDate", formattedStartDate);
        return jsonObject.toString();
    }

    /**
     * 获取任务日志
     *
     * @param id
     * @return
     */
    public ResponseTaskLog getLog(Integer id, Integer limit, Integer skipLineNum) {
        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", CONTENT_TYPE);
        hs.add("X-Requested-With", X_REQUESTED_WITH);
        hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());
        HttpEntity<String> entity = new HttpEntity<String>(hs);
        String url = dolphinschedulerConfig.getDsdUrl() + "/log/detail?taskInstanceId=" + id + "&limit=" + limit + "&skipLineNum=" + skipLineNum;
        JSONObject resultJSON = doGetForObject(url, entity);
        return JSON.parseObject(resultJSON.get(DATA).toString(), ResponseTaskLog.class);
    }

    /**
     * 重跑任务
     *
     * @param processInstanceId
     */
    public void operation(Integer processInstanceId, String executeType) {
        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", CONTENT_TYPE);
        hs.add("X-Requested-With", X_REQUESTED_WITH);
        hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());
        LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<String, Object>();
        // 1 REPEAT_RUNNING 重跑 2 STOP 停止 3 RECOVER_SUSPENDED_PROCESS 恢复运行 4 PAUSE 暂停
        linkedMultiValueMap.add("processInstanceId", processInstanceId);
        switch (executeType) {
            case "1":
                addExecutionDetails(linkedMultiValueMap, 1, "REPEAT_RUNNING", "run");
                break;
            case "2":
                linkedMultiValueMap.add("executeType", "STOP");
                break;
            case "3":
                addExecutionDetails(linkedMultiValueMap, 0, "RECOVER_SUSPENDED_PROCESS", "suspend");
                break;
            case "4":
                linkedMultiValueMap.add("executeType", "PAUSE");
                break;
            default:
                throw new BizException("暂不支持该操作");
        }
        HttpEntity<MultiValueMap<String, Object>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
        String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/executors/execute";
        doPostForObject(url, httpEntity);
    }

    public void addExecutionDetails(MultiValueMap<String, Object> map, int index, String executeType, String buttonType) {
        map.add("index", String.valueOf(index));
        map.add("executeType", executeType);
        if (buttonType != null) {
            map.add("buttonType", buttonType);
        }
    }

    public PageInfo<ProcessInstanceVO> processInstances(Long dsdWorkflowCode, String searchVal, Integer pageNum, Integer pageSize,
                                                        String startDate, String endDate, String stateType) {
        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", CONTENT_TYPE);
        hs.add("X-Requested-With", X_REQUESTED_WITH);
        hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());
        HttpEntity<String> entity = new HttpEntity<String>(hs);

        StringBuilder urlBuilder = new StringBuilder(dolphinschedulerConfig.getDsdUrl())
                .append("/projects/")
                .append(dolphinschedulerConfig.getProjectCode())
                .append("/process-instances?pageNo=").append(pageNum)
                .append("&pageSize=").append(pageSize)
                .append("&call=").append("1");// 这个必须加,不然删除工作流后,实例会不见
        if(null != dsdWorkflowCode){
            urlBuilder.append("&processDefineCode=").append(dsdWorkflowCode);
        }
        if(StringUtils.isNotBlank(searchVal)){
            urlBuilder.append("&searchVal=").append(searchVal);
        }

        supplementaryParameters(startDate, endDate, stateType, urlBuilder);

        JSONObject resultJSON = doGetForObject(urlBuilder.toString(), entity);

        JSONObject data = (JSONObject) resultJSON.get(DATA);

        JSONArray jsonArray = (JSONArray) data.get("totalList");
        List<ProcessInstanceVO> javaList = jsonArray.toJavaList(ProcessInstanceVO.class);
        Integer total = (Integer) data.get("total");
        PageInfo<ProcessInstanceVO> res = new PageInfo<>();
        res.setList(javaList);
        res.setTotal(total);
        return res;
    }

    private void supplementaryParameters(String startDate, String endDate, String stateType, StringBuilder urlBuilder) {
        if (stateType != null && !stateType.isEmpty()) {
            urlBuilder.append("&stateType=").append(stateType);
        }

        if (startDate != null && !startDate.isEmpty()) {
            urlBuilder.append("&startDate=").append(startDate);
        }

        if (endDate != null && !endDate.isEmpty()) {
            urlBuilder.append("&endDate=").append(endDate);
        }
    }

    public PageInfo<TaskInstanceVO> taskInstances(Integer processInstanceId, String startDate, String endDate,
                                                  String stateType, int pageNum, int pageSize) {
        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", CONTENT_TYPE);
        hs.add("X-Requested-With", X_REQUESTED_WITH);
        hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());
        HttpEntity<String> entity = new HttpEntity<String>(hs);
        StringBuilder urlBuilder = new StringBuilder(dolphinschedulerConfig.getDsdUrl())
                .append("/projects/")
                .append(dolphinschedulerConfig.getProjectCode())
                .append("/task-instances?pageNo=").append(pageNum)
                .append("&pageSize=").append(pageSize)
                .append("&processInstanceId=").append(processInstanceId)
                .append("&taskExecuteType=").append("BATCH");

        supplementaryParameters(startDate, endDate, stateType, urlBuilder);

        String url = urlBuilder.toString();
        JSONObject resultJSON = doGetForObject(url, entity);
        JSONObject data = (JSONObject) resultJSON.get(DATA);
        JSONArray jsonArray = (JSONArray) data.get("totalList");
        List<TaskInstanceVO> javaList = jsonArray.toJavaList(TaskInstanceVO.class);
        Integer total = (Integer) data.get("total");
        PageInfo<TaskInstanceVO> res = new PageInfo<>();
        res.setList(javaList);
        res.setTotal(total);
        return res;
    }

    /**
     * 获取工作流执行顺序
     * @param processInstanceId
     * @return
     */
    public List<GanttTaskVO> viewGantt(Long processInstanceId) {
        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", CONTENT_TYPE);
        hs.add("X-Requested-With", X_REQUESTED_WITH);
        hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());
        HttpEntity<String> entity = new HttpEntity<String>(hs);
        StringBuilder urlBuilder = new StringBuilder(dolphinschedulerConfig.getDsdUrl())
                .append("/projects/")
                .append(dolphinschedulerConfig.getProjectCode())
                .append("/process-instances/")
                .append(processInstanceId)
                .append("/view-gantt");
        String url = urlBuilder.toString();
        JSONObject resultJSON = doGetForObject(url, entity);
        JSONObject data = (JSONObject) resultJSON.get(DATA);
        JSONArray jsonArray = (JSONArray) data.get("tasks");
        List<GanttTaskVO> javaList = jsonArray.toJavaList(GanttTaskVO.class);
        return javaList;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/773012.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

开始尝试从0写一个项目--后端(一)

创建文件的目录结构 利用这个界面创建 序号 名称 说明 1 SEMS maven父工程&#xff0c;统一管理依赖版本&#xff0c;聚合其他子模块 2 sems-common 子模块&#xff0c;存放公共类&#xff0c;例如&#xff1a;工具类、常量类、异常类等 3 sems-pojo 子模块&#x…

硅纪元视角 | AI纳米机器人突破癌症治疗,精准打击肿瘤细胞

在数字化浪潮的推动下&#xff0c;人工智能&#xff08;AI&#xff09;正成为塑造未来的关键力量。硅纪元视角栏目紧跟AI科技的最新发展&#xff0c;捕捉行业动态&#xff1b;提供深入的新闻解读&#xff0c;助您洞悉技术背后的逻辑&#xff1b;汇聚行业专家的见解&#xff0c;…

打卡第2天----数组双指针,滑动窗口

今天是参与训练营第二天&#xff0c;这几道题我都看懂了&#xff0c;自己也能写出来了&#xff0c;实现思路很重要&#xff0c;万事开头难&#xff0c;希望我可以坚持下去。希望最后的结果是量变带来质变。 一、理解双指针思想 leetcode编号&#xff1a;977 不止是在卡尔这里…

深入探讨JavaScript中的队列,结合leetcode全面解读

前言 队列作为一种基本的数据结构&#xff0c;为解决许多实际问题提供了有效的组织和处理方式&#xff0c;对于提高系统的稳定性、可靠性和效率具有重要作用&#xff0c;所以理解队列是很重要的。 本文深入探讨JavaScript中的队列这种数据结构,结合leetcode题目讲解 题目直达…

接口测试工具Postman

Postman Postman介绍 开发API后&#xff0c;用于API测试的工具。在我们平时开发中&#xff0c;特别是需要与接口打交道时&#xff0c;无论是写接口还是用接口&#xff0c;拿到接口后肯定都得提前测试一下。在开发APP接口的过程中&#xff0c;一般接口写完之后&#xff0c;后端…

78110A雷达信号模拟软件

78110A雷达信号模拟软件 78110A雷达信号模拟软件(简称雷达信号模拟软件)主要用于模拟产生雷达发射信号和目标回波信号&#xff0c;软件将编译生成的雷达信号任意波数据下载到信号发生器中&#xff0c;主要是1466-V矢量信号发生器&#xff0c;可实现雷达信号模拟产生。软件可模…

TensorRT-Int8量化详解

int8量化是利用int8乘法替换float32乘法实现性能加速的一种方法 对于常规模型有&#xff1a;y kx b&#xff0c;此时x、k、b都是float32, 对于kx的计算使用float32的乘法 对于int8模型有&#xff1a;y tofp32(toint8(k) * toint8(x)) b&#xff0c;其中int8 * int8结果为in…

SpringBoot的热部署和日志体系

SpringBoot的热部署 每次修改完代码&#xff0c;想看效果的话&#xff0c;不用每次都重新启动代码&#xff0c;等待项目重启 这样就可以了 JDK官方提出的日志框架&#xff1a;Jul log4j的使用方式&#xff1a; &#xff08;1&#xff09;引入maven依赖 &#xff08;2&#x…

头歌资源库(20)最大最小数

一、 问题描述 二、算法思想 使用分治法&#xff0c;可以将数组递归地分割成两部分&#xff0c;直到数组长度为1或2。然后比较这两部分的最大、次大、次小、最小数&#xff0c;最终得到整个数组中的最大两个数和最小两个数。 算法步骤如下&#xff1a; 定义一个函数 findMinM…

uniapp/Android App上架三星市场需要下载所需要的SDK

只需添加以下一个权限在AndroidManifest.xml <uses-permission android:name"com.samsung.android.providers.context.permission.WRITE_USE_APP_FEATURE_SURVEY"/>uniapp开发的&#xff0c;需要在App权限配置中加入以上的额外权限&#xff1a;

Generative Modeling by Estimating Gradients of the Data Distribution

Generative Modeling by Estimating Gradients of the Data Distribution 本文介绍宋飏提出的带噪声扰动的基于得分的生成模型。首先介绍基本的基于得分的生成模型的训练方法&#xff08;得分匹配&#xff09;和采样方法&#xff08;朗之万动力学&#xff09;。然后基于流形假…

2024 年 亚太赛 APMCM (B题)中文赛道国际大学生数学建模挑战赛 |洪水灾害数据分析 | 数学建模完整代码+建模过程全解全析

当大家面临着复杂的数学建模问题时&#xff0c;你是否曾经感到茫然无措&#xff1f;作为2022年美国大学生数学建模比赛的O奖得主&#xff0c;我为大家提供了一套优秀的解题思路&#xff0c;让你轻松应对各种难题&#xff01; 完整内容可以在文章末尾领取&#xff01; 该段文字…

HTML内容爬取:使用Objective-C进行网页数据提取

网页爬取简介 网页爬取&#xff0c;通常被称为网络爬虫或爬虫&#xff0c;是一种自动浏览网页并提取所需数据的技术。这些数据可以是文本、图片、链接或任何网页上的元素。爬虫通常遵循一定的规则&#xff0c;访问网页&#xff0c;解析页面内容&#xff0c;并存储所需信息。 …

自动化立体仓库出入库能力及堆垛机节拍

导语 大家好&#xff0c;我是社长&#xff0c;老K。专注分享智能制造和智能仓储物流等内容。 新书《智能物流系统构成与技术实践》人俱乐部 完整版文件和更多学习资料&#xff0c;请球友到知识星球【智能仓储物流技术研习社】自行下载 自动化立体仓库的出入库能力、堆垛机节拍以…

用720云搭建数字孪生VR智慧安防系统,赋能安防升级!

“安全防范"一直是我国城镇化发展进程中重点关注的工作板块&#xff0c;随着时代发展需求与科技的日新月异&#xff0c;安防行业正在积极融合VR3D数字孪生技术&#xff0c;升级安防数字基础设施和安防产品服务创新。 今年2月&#xff0c;《数字中国建设整体布局规划》的出…

Pycharm的终端(Terminal)中切换到当前项目所在的虚拟环境

1.在Pycharm最下端点击终端/Terminal, 2.点击终端窗口最上端最右边的∨&#xff0c; 3.点击Command Prompt&#xff0c;切换环境&#xff0c; 可以看到现在环境已经由默认的PS(Window PowerShell)切换为项目所使用的虚拟环境。 4.更近一步&#xff0c;如果想让Pycharm默认显示…

macOS使用Karabiner-Elements解决罗技鼠标G304连击、单击变双击的故障

记录一下罗技鼠标G304单击变双击的软件解决过程和方案&#xff08;适用于macOS&#xff0c; 如果是Windows&#xff0c;使用AutoHotKey也有类似解决办法、方案&#xff0c;改日提供&#xff09;&#xff1a; 背景&#xff1a;通过罗技Logitech G HUB软件对罗技的游戏鼠标侧键b…

1-4 NLP发展历史与我的工作感悟

1-4 NLP发展历史与我的工作感悟 主目录点这里 第一个重要节点&#xff1a;word2vec词嵌入 能够将无限的词句表示为有限的词向量空间&#xff0c;而且运算比较快&#xff0c;使得文本与文本间的运算有了可能。 第二个重要节点&#xff1a;Transformer和bert 为预训练语言模型发…

2024 世界人工智能大会暨人工智能全球治理高级别会议全体会议在上海举办,推动智能向善造福全人类

2024 年 7 月 4 日&#xff0c;2024 世界人工智能大会暨人工智能全球治理高级别会议-全体会议在上海世博中心举办。联合国以及各国政府代表、专业国际组织代表&#xff0c;全球知名专家、企业家、投资家 1000 余人参加了本次会议&#xff0c;围绕“以共商促共享&#xff0c;以善…

搜维尔科技:如何使用 SenseGlove Nova 加速手部运动功能的恢复

District XR 的VR 培训 5 年多来&#xff0c;District XR 一直在为最大的工业公司创建 VR 和 AR 项目。 客户&#xff1a;District XR 客户代表&#xff1a;尼古拉沃尔科夫 他的角色&#xff1a;District XR 首席执行官 面临解决的挑战 该公司正在寻找一种方法来加速身体伤…