背景:
因为项目需要异步接受各方的mq消息进行发起流程/完成任务等操作。但是发起流程因为业务逻辑较重,在并发tps很高的时候容易导致mysql竞争激烈(分库分表逻辑需要优化),进而导致消费速率跟不上生产速率,导致mq消息积压。所以要针对这种情况进行监控-告警-降级
方案:
详细介绍
1.消费速率怎么统计
消费速率和限流算法(令牌桶)不太一样,一个是限制流量最大值问题,一个要计算实时或者周期内的均值流量。本项目使用的是滑动窗口,统计时间窗口内的流量均值。
2.获取rabbitmq队列积压数量
ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory();// 创建连接Connection connection = connectionFactory.createConnection();// 创建通道Channel channel = connection.createChannel(false);// 设置消息交换机channel.exchangeDeclare("amp.topic", "direct", true, false, null);DeclareOk declareOk = channel.queueDeclarePassive(LETTER_CLASS_QUEUE);//获取队列中的消息个数int queueCount = declareOk.getMessageCount();result.put("queueCount", String.valueOf(queueCount));// 关闭通道和连接channel.close();connection.close();
3.进入同步队列算法
进入同步队列的时候必须进行报警,数据进入数据库进行持久化
4.定时任务轮询存在的问题,以及如何优化
定时任务持续轮询浪费资源,当数据持久化到mq时,创建线程池持续从数据库拉取数据消费。
为了保证服务稳定,请求速率稳定需要实现:流量整形:分布式令牌桶,流量监控
5.数据库的数据消费速率设置多少合适
借鉴TCP的拥塞控制实现速率的慢启动、拥塞避免、快重试
6.怎么设计成通用的组件
模型抽象,注解,拦截器,sdk