生产者流量限制

ActiveMQ支持为每个生产者单独设置流量控制。流量控制的含义:当生产者产生消息过快,超过流量限制的时候,生产者将会被阻塞直到资源可以继续使用,或者抛出一个JMSException

同步发送消息(useAsynSend为false)和异步发送消息时(useAsynSend为true),流量控制实现的方式不一样的。

参考:https://activemq.apache.org/producer-flow-control

issue:

2023-07-25 08:22:57,687 | WARN| Usage Manager Memory Limit (751619277) reached on queue://quality_inspection_detail_append, size 900148. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info. | org.apache.activemq.broker.region.Queue | ActiveMQ Transport: tcp:///10.233.84.53:49990@61616

同步发送流控设置

同步发送消息:此时将在SystemUsage的限制下,使用destinationPolicy中的policyEntry来实现流量控制。

<destinationPolicy><policyMap><policyEntries><policyEntry queue="Foo" producerFlowControl="true"memoryLimit="1mb"> <pendingQueuePolicy><vmQueueCursor/></pendingQueuePolicy></policyEntry></policyEntries></policyMap></destinationPolicy>

限制队列Foo的最大内存为1M,当使用内存达到1M时,该生产者将直接被阻塞至有空余的内存时才会继续发送消息。
也可以在systemUsage上设置sendFailIfNoSpace="true"sendFailIfNoSpaceAfterTimeout="3000"来控制客户端的异常和等待时间。

异步发送流控设置

异步发送消息:由于不阻塞生产者,可以通过connctionFactory.setProducerWindowSize(1024000)来控制broker确认收到消息前生产者能发送的最大数据量。

Usage(default:memory:queue://Consumer.AppOne.VirtualTopic.Process.Incoming:memory) percentUsage=100%, usage=1048828, limit=1048576, percentUsageMinDelta=1%;Parent:Usage(default:memory) percentUsage=0%, usage=4414167, limit=1073741824, percentUsageMinDelta=1%: Usage Manager Memory Limit reached. Producer (ID:hostname-22563-1396035238078-1:1:2:1) stopped to prevent flooding queue://Consumer.AppOne.VirtualTopic.Process.Incoming. See http://activemq.apache.org/producer-flow-control.html for more info (blocking for: 301s)

参考:https://www.sourceallies.com/2014/10/activemq-memory-tuning

附录

参考

shell参数:https://activemq.apache.org/unix-shell-script.html

memory tuning:https://www.sourceallies.com/2014/10/activemq-memory-tuning

生产者流量控制:https://activemq.apache.org/producer-flow-control

资源限制

 <systemUsage sendFailIfNoSpace="true" sendFailIfNoSpaceAfterTimeout="30000"><memoryUsage><memoryUsage percentOfJvmHeap="70" /></memoryUsage><storeUsage><storeUsage limit="100 gb"/></storeUsage><tempUsage><tempUsage limit="50 gb"/></tempUsage></systemUsage> 

systemUsage属性:

  • sendFailIfNoSpace:空间不足,而导致不确定的阻塞send()方法的一种替代方案,就是将其配置成客户端抛出的一个异常。
  • sendFailIfNoSpaceAfterTimeout:导致send()方法失败,并在客户端抛出异常,但仅当等待了指定时间之后才触发。单位是毫秒。

memoryUsage可以设置为:

  • percentOfJvmHeap:表示使用“百分数值”进行设置。
  • limit:指定值。limit=”1000 mb”

tempUsagenon persistent时的消息存储空间。消息进入mq后,首先会占用memoryUsagepersistent模式下,消息总是会被另外存储于指定介质(默认KahaDB),如果non persistent,当消息占用内存量超过memoryUsage的70%(默认值,可调整),则消息会被交换到临时区域(如果默认FileCursor,则会以文件形式存储于磁盘空间),这个交换过程会很慢,所以如果这种情况频繁存在,设置PFC很有必要了。如果消息的消费能力有限,tempUsage可以设置大一些,不然很容易出现tempusage超限情况。还有一点要注意,tempusage不能小于32m,而且最好还要大于memoryUsage,不然对于情况1,转存单个文件默认大小32M,写文件还未完成tempusage已经超限了,情况2也可能会出现内存的消息还未完全交换到temp区,tempusage已经超限的情况。

镜像启动参数(since 1.17.3)

/entrypoint.sh

  • ACTIVEMQ_USERNAME:用户名,替换conf/credentials.propertiesactivemq.username=system
  • ACTIVEMQ_PASSWORD:密码,替换conf/credentials.propertiesactivemq.password=manager
  • ACTIVEMQ_WEBADMIN_USERNAME:Web Admin,替换conf/jetty-realm.propertiesadmin: admin, admin
  • ACTIVEMQ_WEBADMIN_PASSWORD:Web Admin,替换conf/jetty-realm.propertiesadmin: admin, admin

${ACTIVEMQ_CONFIG}/bin/activemq

  • ACTIVEMQ_OPTS_MEMORY:内存设置。
  • ACTIVEMQ_OPTS:如果未设置,则使用ACTIVEMQ_OPTS_MEMORY的内存设置,并增加参数“-Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=$ACTIVEMQ_CONF/login.config”。
  • ACTIVEMQ_DEBUG_OPTS:调试参数。
  • ACTIVEMQ_SUNJMX_START:JMX参数
  • ACTIVEMQ_SSL_OPTS:SSL参数

参数配置:

  • /etc/default/activemq
  • $HOME/.activemqrc
  • $ACTIVEMQ_HOME/bin/env

后面的优先级高

env中支持的从docker外部配置的环境变量:

  • ACTIVEMQ_OPTS_MEMORY
  • ACTIVEMQ_OPTS
  • ACTIVEMQ_OUT
  • ACTIVEMQ_QUEUEMANAGERURL
  • ACTIVEMQ_SSL_OPTS
  • ACTIVEMQ_DEBUG_OPTS
  • ACTIVEMQ_KILL_MAXSECONDS
if [ -z "$ACTIVEMQ_OPTS_MEMORY" ] ; thenACTIVEMQ_OPTS_MEMORY="-Xms64M -Xmx1G"fiif [ -z "$ACTIVEMQ_OPTS" ] ; thenACTIVEMQ_OPTS="$ACTIVEMQ_OPTS_MEMORY -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.conffiif [ -z "$ACTIVEMQ_OUT" ]; then ACTIVEMQ_OUT="/dev/null"fi# Specify the queue manager URL for using "browse" option of sysv initscriptif [ -z "$ACTIVEMQ_QUEUEMANAGERURL" ]; thenACTIVEMQ_QUEUEMANAGERURL="--amqurl tcp://localhost:61616"fi# Set additional JSE argumentsif [ -z "$ACTIVEMQ_SSL_OPTS" ] ; then#ACTIVEMQ_SSL_OPTS="-Djava.security.properties=$ACTIVEMQ_CONF/java.security"ACTIVEMQ_SSL_OPTS=""fi# Uncomment to enable remote debugging#ACTIVEMQ_DEBUG_OPTS="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"if [ -z "$ACTIVEMQ_KILL_MAXSECONDS" ]; thenACTIVEMQ_KILL_MAXSECONDS=30fi