本文记录Pipeline设计模式在业务流程编排中的应用

前言

Pipeline模式意为管道模式,又称为流水线模式。旨在通过预先设定好的一系列阶段来处理输入的数据,每个阶段的输出即是下一阶段的输入。

本案例通过定义PipelineProduct(管道产品),PipelineJob(管道任务),PipelineNode(管道节点),完成一整条流水线的组装,并将“原材料”加工为“商品”。其中管道产品负责承载各个阶段的产品信息;管道任务负责不同阶段对产品的加工;管道节点约束了管道产品及任务的关系,通过信号量定义了任务的执行方式。

依赖

工具依赖如下

                                        cn.hutool                hutool-all                最新版本            

编程示例1. 管道产品定义

package com.example.demo.pipeline.model;/** * 管道产品接口 * * @param  信号量 * @author  * @date 2023/05/15 11:49 */public interface PipelineProduct {}

2. 管道任务定义

package com.example.demo.pipeline.model;/** * 管道任务接口 * * @param 

管道产品 * @author * @date 2023/05/15 11:52 */@FunctionalInterfacepublic interface PipelineJob

{ /** * 执行任务 * * @param product 管道产品 * @return {@link P} */ P execute(P product);}

3. 管道节点定义

package com.jd.baoxian.mall.market.service.pipeline.model;import java.util.function.Predicate;/** * 管道节点定义 * * @param  信号量 * @param 

管道产品 * @author * @date 2023/05/15 11:54 */public interface PipelineNode<S, P extends PipelineProduct> { /** * 节点组装,按照上个管道任务传递的信号,执行 pipelineJob * * @param pipelineJob 管道任务 * @return {@link PipelineNode} */ PipelineNode flax(PipelineJob

pipelineJob); /** * 节点组装,按照传递的信号,判断当前管道的信号是否相等,执行 pipelineJob * * @param signal 信号 * @param pipelineJob 管道任务 * @return {@link PipelineNode} */ PipelineNode flax(S signal, PipelineJob

pipelineJob); /** * 节点组装,按照传递的信号,判断当前管道的信号是否相等,执行 pipelineJob * * @param predicate 信号 * @param pipelineJob 管道任务 * @return {@link PipelineNode} */ PipelineNode flax(Predicate predicate, PipelineJob

pipelineJob); /** * 管道节点-任务执行 * * @param product 管道产品 * @return {@link P} */ P execute(P product);}

4. 管道产品、任务,节点的实现4.1 管道产品

package com.example.demo.pipeline.factory;import com.example.demo.model.request.DemoReq;import com.example.demo.model.response.DemoResp;import com.example.demo.pipeline.model.PipelineProduct;import lombok.*;/** * 样例-管道产品 * * @author  * @date 2023/05/15 14:04 */@Data@Builder@NoArgsConstructor@AllArgsConstructorpublic class DemoPipelineProduct implements PipelineProduct {    /**     * 信号量     */    private DemoSignalEnum signal;    /**     * 产品-入参及回参     */    private DemoProductData productData;    /**     * 异常信息     */    private Exception exception;    /**     * 流程Id     */    private String tradeId;    @Data    @Builder    @NoArgsConstructor    @AllArgsConstructor    public static class DemoProductData {        /**         * 待验证入参         */        private DemoReq userRequestData;        /**         * 待验证回参         */        private DemoResp userResponseData;    }    /**     * 产品-信号量     *     * @author      * @date 2023/05/15 13:54     */    @Getter    public enum DemoSignalEnum {        /**         *         */        NORMAL(0, "正常"),        /**         *         */        CHECK_NOT_PASS(1, "校验不通过"),        /**         *         */        BUSINESS_ERROR(2, "业务异常"),        /**         *         */        LOCK_ERROR(3, "锁处理异常"),        /**         *         */        DB_ERROR(4, "事务处理异常"),        ;        /**         * 枚举码值         */        private final int code;        /**         * 枚举描述         */        private final String desc;        /**         * 构造器         *         * @param code         * @param desc         */        DemoSignalEnum(int code, String desc) {            this.code = code;            this.desc = desc;        }    }}

4.2 管道任务(抽象类)

package com.example.demo.pipeline.factory.job;import cn.hutool.core.util.ClassUtil;import cn.hutool.json.JSONUtil;import com.example.demo.pipeline.factory.DemoPipelineProduct;import com.example.demo.pipeline.model.PipelineJob;import lombok.extern.slf4j.Slf4j;/** * 管道任务-抽象层 * * @author  * @date 2023/05/15 19:48 */@Slf4jpublic abstract class AbstractDemoJob implements PipelineJob {    /**     * 公共执行逻辑     *     * @param product 产品     * @return     */    @Override    public DemoPipelineProduct execute(DemoPipelineProduct product) {        DemoPipelineProduct.DemoSignalEnum newSignal;        try {            newSignal = execute(product.getTradeId(), product.getProductData());        } catch (Exception e) {            product.setException(e);            newSignal = DemoPipelineProduct.DemoSignalEnum.BUSINESS_ERROR;        }        product.setSignal(newSignal);        defaultLogPrint(product.getTradeId(), product);        return product;    }    /**     * 子类执行逻辑     *     * @param tradeId     流程Id     * @param productData 请求数据     * @return     * @throws Exception 异常     */    abstract DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) throws Exception;    /**     * 默认的日志打印     */    public void defaultLogPrint(String tradeId, DemoPipelineProduct product) {        if (!DemoPipelineProduct.DemoSignalEnum.NORMAL.equals(product.getSignal())) {            log.info("流水线任务处理异常:流程Id=【{}】,信号量=【{}】,任务=【{}】,参数=【{}】", tradeId, product.getSignal(),                    ClassUtil.getClassName(this, true), JSONUtil.toJsonStr(product.getProductData()), product.getException());        }    }}

4.3 管道节点

package com.example.demo.pipeline.factory;import cn.hutool.core.util.ClassUtil;import cn.hutool.json.JSONUtil;import com.example.demo.pipeline.model.PipelineJob;import com.example.demo.pipeline.model.PipelineNode;import lombok.extern.slf4j.Slf4j;import java.util.function.Predicate;/** * 审核-管道节点 * * @author  * @date 2023/05/15 14:32 */@Slf4jpublic class DemoPipelineNode implements PipelineNode {    /**     * 下一管道节点     */    private DemoPipelineNode next;    /**     * 当前管道任务     */    private PipelineJob job;    /**     * 节点组装,按照上个管道任务传递的信号,执行 pipelineJob     *     * @param pipelineJob 管道任务     * @return {@link DemoPipelineNode}     */    @Override    public DemoPipelineNode flax(PipelineJob pipelineJob) {        return flax(DemoPipelineProduct.DemoSignalEnum.NORMAL, pipelineJob);    }    /**     * 节点组装,按照传递的信号,判断当前管道的信号是否相等,执行 pipelineJob     *     * @param signal      信号     * @param pipelineJob 管道任务     * @return {@link DemoPipelineNode}     */    @Override    public DemoPipelineNode flax(DemoPipelineProduct.DemoSignalEnum signal, PipelineJob pipelineJob) {        return flax(signal::equals, pipelineJob);    }    /**     * 节点组装,上个管道过来的信号运行 predicate 后是true的话,执行 pipelineJob     *     * @param predicate     * @param pipelineJob     * @return     */    @Override    public DemoPipelineNode flax(Predicate predicate,                                 PipelineJob pipelineJob) {        this.next = new DemoPipelineNode();        this.job = (job) -> {            if (predicate.test(job.getSignal())) {                return pipelineJob.execute(job);            } else {                return job;            }        };        return next;    }    /**     * 管道节点-任务执行     *     * @param product 管道产品     * @return     */    @Override    public DemoPipelineProduct execute(DemoPipelineProduct product) {        // 执行当前任务        try {            product = job == null ? product : job.execute(product);            return next == null ? product : next.execute(product);        } catch (Exception e) {            log.error("流水线处理异常:流程Id=【{}】,任务=【{}】,参数=【{}】", product.getTradeId(), ClassUtil.getClassName(job, true), JSONUtil.toJsonStr(product.getProductData()), product.getException());            return null;        }    }}

5. 业务实现

通过之前的定义,我们已经可以通过Pipeline完成流水线的搭建,接下来以“审核信息提交”这一业务场景,完成应用。

5.1 定义Api、入参、回参

package com.example.demo.api;import com.example.demo.model.request.DemoReq;import com.example.demo.model.response.DemoResp;import com.example.demo.pipeline.factory.PipelineForManagerSubmit;import org.springframework.stereotype.Service;import javax.annotation.Resource;/** * 演示-API * * @author  * @date 2023/08/06 16:27 */@Servicepublic class DemoManagerApi {    /**     * 管道-审核提交     */    @Resource    private PipelineForManagerSubmit pipelineForManagerSubmit;    /**     * 审核提交     *     * @param requestData 请求数据     * @return {@link DemoResp}     */    public DemoResp managerSubmit(DemoReq requestData) {        return pipelineForManagerSubmit.managerSubmitCheck(requestData);    }}package com.example.demo.model.request;/** * 演示入参 * * @author  * @date 2023/08/06 16:33 */public class DemoReq {}package com.example.demo.model.response;import lombok.Data;/** * 演示回参 * * @author  * @date 2023/08/06 16:33 */@Datapublic class DemoResp {    /**     * 成功标识     */    private Boolean success = false;    /**     * 结果信息     */    private String resultMsg;    /**     * 构造方法     *     * @param message 消息     * @return {@link DemoResp}     */    public static DemoResp buildRes(String message) {        DemoResp response = new DemoResp();        response.setResultMsg(message);        return response;    }}

5.2 定义具体任务

假定审核提交的流程需要包含:参数验证、加锁、解锁、事务提交

package com.example.demo.pipeline.factory.job;import cn.hutool.json.JSONUtil;import com.example.demo.model.request.DemoReq;import com.example.demo.pipeline.factory.DemoPipelineProduct;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Service;/** * 加锁-实现层 * * @author  * @date 2023/05/17 17:00 */@Service@Slf4jpublic class CheckRequestLockJob extends AbstractDemoJob {    /**     * 子类执行逻辑     *     * @param tradeId     流程Id     * @param productData 请求数据     * @return     * @throws Exception 异常     */    @Override    DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) throws Exception {        DemoReq userRequestData = productData.getUserRequestData();        log.info("任务[{}]加锁,线程号:{}", JSONUtil.toJsonStr(userRequestData), tradeId);        return DemoPipelineProduct.DemoSignalEnum.NORMAL;    }}package com.example.demo.pipeline.factory.job;import cn.hutool.json.JSONUtil;import com.example.demo.model.request.DemoReq;import com.example.demo.pipeline.factory.DemoPipelineProduct;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Service;/** * 解锁-实现层 * * @author  * @date 2023/05/17 17:00 */@Service@Slf4jpublic class CheckRequestUnLockJob extends AbstractDemoJob {    /**     * 子类执行逻辑     *     * @param tradeId     流程Id     * @param productData 请求数据     * @return     * @throws Exception 异常     */    @Override    DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) throws Exception {        DemoReq userRequestData = productData.getUserRequestData();        log.info("任务[{}]解锁,线程号:{}", JSONUtil.toJsonStr(userRequestData), tradeId);        return DemoPipelineProduct.DemoSignalEnum.NORMAL;    }}package com.example.demo.pipeline.factory.job;import cn.hutool.json.JSONUtil;import com.example.demo.model.request.DemoReq;import com.example.demo.pipeline.factory.DemoPipelineProduct;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;/** * 审核-参数验证-实现类 * * @author  * @date 2023/05/15 19:50 */@Slf4j@Componentpublic class ManagerCheckParamJob extends AbstractDemoJob {    /**     * 执行基本入参验证     *     * @param tradeId     * @param productData 请求数据     * @return     */    @Override    DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) {        /*         * 入参验证         */        DemoReq userRequestData = productData.getUserRequestData();        log.info("任务[{}]入参验证,线程号:{}", JSONUtil.toJsonStr(userRequestData), tradeId);        // 非空验证        // 有效验证        // 校验通过,退出        return DemoPipelineProduct.DemoSignalEnum.NORMAL;    }}package com.example.demo.pipeline.factory.job;import cn.hutool.json.JSONUtil;import com.example.demo.model.request.DemoReq;import com.example.demo.model.response.DemoResp;import com.example.demo.pipeline.factory.DemoPipelineProduct;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Service;/** * 审核-信息提交-业务实现 * * @author  * @date 2023/05/12 14:36 */@Service@Slf4jpublic class ManagerSubmitJob extends AbstractDemoJob {    /**     * 子类执行逻辑     *     * @param tradeId     流程Id     * @param productData 请求数据     * @return     * @throws Exception 异常     */    @Override    DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) throws Exception {        DemoReq userRequestData = productData.getUserRequestData();        try {            /*             * DB操作             */            log.info("任务[{}]信息提交,线程号:{}", JSONUtil.toJsonStr(userRequestData), tradeId);            productData.setUserResponseData(DemoResp.buildRes("成功"));        } catch (Exception ex) {            log.error("审核-信息提交-DB操作失败,入参:{}", JSONUtil.toJsonStr(userRequestData), ex);            throw ex;        }        return DemoPipelineProduct.DemoSignalEnum.NORMAL;    }}

5.3 完成流水线组装

针对入回参转换,管道任务执行顺序及执行信号量的构建

package com.example.demo.pipeline.factory;import com.example.demo.model.request.DemoReq;import com.example.demo.model.response.DemoResp;import com.example.demo.pipeline.factory.job.CheckRequestLockJob;import com.example.demo.pipeline.factory.job.CheckRequestUnLockJob;import com.example.demo.pipeline.factory.job.ManagerCheckParamJob;import com.example.demo.pipeline.factory.job.ManagerSubmitJob;import lombok.RequiredArgsConstructor;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;import java.util.Objects;import java.util.UUID;/** * 管道工厂入口-审核流水线 * * @author  * @date 2023/05/15 19:52 */@Slf4j@Service@RequiredArgsConstructorpublic class PipelineForManagerSubmit {    /**     * 审核-管道节点     */    private final DemoPipelineNode managerSubmitNode = new DemoPipelineNode();    /**     * 审核-管道任务-提交-防刷锁-加锁     */    private final CheckRequestLockJob checkRequestLockJob;    /**     * 审核-管道任务-提交-防刷锁-解锁     */    private final CheckRequestUnLockJob checkRequestUnLockJob;    /**     * 审核-管道任务-参数验证     */    private final ManagerCheckParamJob managerCheckParamJob;    /**     * 审核-管道任务-事务操作     */    private final ManagerSubmitJob managerSubmitJob;    /**     * 组装审核的处理链     */    @PostConstruct    private void assembly() {        assemblyManagerSubmit();    }    /**     * 组装处理链     */    private void assemblyManagerSubmit() {        managerSubmitNode                // 参数验证及填充                .flax(managerCheckParamJob)                // 防刷锁                .flax(checkRequestLockJob)                // 事务操作                .flax(managerSubmitJob)                // 锁释放                .flax((ignore) -> true, checkRequestUnLockJob);    }    /**     * 审核-提交处理     *     * @param requestData 入参     * @return     */    public DemoResp managerSubmitCheck(DemoReq requestData) {        DemoPipelineProduct initialProduct = managerSubmitCheckInitial(requestData);        DemoPipelineProduct finalProduct = managerSubmitNode.execute(initialProduct);        if (Objects.isNull(finalProduct) || Objects.nonNull(finalProduct.getException())) {            return DemoResp.buildRes("未知异常");        }        return finalProduct.getProductData().getUserResponseData();    }    /**     * 审核-初始化申请的流水线数据     *     * @param requestData 入参     * @return 初始的流水线数据     */    private DemoPipelineProduct managerSubmitCheckInitial(DemoReq requestData) {        // 初始化        return DemoPipelineProduct.builder()                .signal(DemoPipelineProduct.DemoSignalEnum.NORMAL)                .tradeId(UUID.randomUUID().toString())                .productData(DemoPipelineProduct.DemoProductData.builder().userRequestData(requestData).build())                .build();    }}

总结

本文重点为管道模式的抽象与应用,上述示例仅为个人理解。实际应用中,此案例长于应对各种规则冗杂的业务场景,便于规则编排。
待改进点:

  1. 各个任务其实隐含了执行的先后顺序,此项内容可进一步实现;

  2. 针对最后“流水线组装”这一步,可通过配置描述的方式,进一步抽象,从而将变动控制在每个“管道任务”的描述上,针对规则项做到“可插拔”式处理。

作者:京东保险 侯亚东