ActiveMQ 支持 JMS 规范中的 2 种消息转发模式,支持消息事务,支持异步发送消息和生产者流量控制,提供了 4 种消息消费应答模式,提供消息发送失败后重试机制,
本文分别对 ActiveMQ 消息转发模式,事务,消息过期,消息积压,应答模式,重试,死信队列等相关配置进行描述。
消息转发模式
ActiveMQ 支持 持久化(PERSISTENT)和 非持久化(NON_PERSISTENT) 两种消息转发模式。更多参考 MQ系列(一):ActiveMQ特性,概念,持久化,安装,应用集成 的 消息持久化 小节。
持久化/非持久化
持久化(PERSISTENT):
更看重消息可靠性,持久化需要先把消息存储起来再传递,Producer 需要等待 Consumer 的 receipt 消息,会损失一些性能。
非持久化(NON_PERSISTENT):
相对更看重性能,非持久化模式下发送消息是异步的,Producer 不需要等待 Consumer 的 receipt 消息。
设置转发模式
在 ActiveMQ 中设置消息转发模式有两种方式:
创建生产者(MessageProducer)时设置转发模式,对此生产者的所有消息有效。
void setDeliveryMode(int deliveryMode) throws JMSException;
生产者在发送消息时设置转发模式,只对当前消息有效。
void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException; void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException;
转发模式(deliveryMode)常量类:javax.jms.DeliveryMode
public interface DeliveryMode {
static final int NON_PERSISTENT = 1;
static final int PERSISTENT = 2;
}
消息积压
如果消息服务转发的消息没有得到及时回复,则会导致持久化消息不断积压而得不到释放,从而堵塞消息队列。对此情况,可以通过配置消息的过期时间和死信队列处理来预防。
消息过期
默认情况下,ActiveMQ 的消息是永不过期的。若业务场景需要,可以给消息指定过期时间。
设置过期时间有两种方式:
创建生产者(MessageProducer)时设置消息过期时间,对此生产者的所有消息有效。
void setTimeToLive(long timeToLive) throws JMSException;
生产者在发送消息时设置消息过期时间,只对当前消息有效。
void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException; void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException;
如果 timeToLive 为 0,则表示该消息永不过期。如果消息已到过期时间仍未被发送到目的地,则该消息将被清除。
消息积压处理
消息积压可以通过配置消息 过期时间 和 死信队列 来处理和预防。
Broker 不会将过期的消息发送给消费者端,持久化消息过期默认会进入死信队列,且不会被自动清除。
对于过期的消息进入死信队列可以配置一些处理策略,比如直接抛弃死信队列、定时抛弃死信队列、设置慢消费者策略等。更多可参考下面的 消息重传与死信队列 小节。
产生消息积压的问题多种多样,因定期检查消息队列数据的吞吐速率,保持生产和消费的平衡才不会出现大量积压。
消息事务
ActiveMQ 支持两种消息事务:
JMS Transaction:
使用 Session 接口的 commit 和 rollback 方法来控制,这与 JDBC connection 中的 commit 和 rollback 很象。
XA Transsaction:
为支持两阶段提交协义(XA),通过使用 XASession 与消息服务器通过来充当 XAResource,这与 XA 事务中的 JDBC connection 类似。
因 XA 性能较差,很少使用,所以通常所说的消息事务指的是第一种,JMS事务。
JMS Transaction
以消息生产者为例,如果连接创建会话开启了事务,Producer 在发送消息时会在消息中附加一个事务号(Transaction ID),然后可以在事务中发送多条消息。
Broker 在接收到消息后会判断是否有 Transaction ID,如果有就把消息保存在 Transaction Store 中等待提交或回滚消息。
所以,这里的事务概念不是针对 Producer 的,而是针对 Broker 的,所以不管 Session 有没有提交,Broker 都会收到消息,且消息的生产和消费不能被包含在同一个事务中。
如果消息转发是 PERSISTENT 模式且消息过期,则默认进入死信队列,在进入死信队列之前 ActiveMQ 会删除消息中的 Transaction ID,这样过期的消息就不在事务中了。
如果需要,可以使用事务来组合消息的发送和接收。commit 和 rollback 方法一旦被调用,就表示一个事务结束,另一个事务开始。
- 事务提交表示生产者的所有消息都被发送,消费的所有消息都被确认。
- 事务回滚表示生产的消息被销毁,消费的消息被恢复并重新提交。
消息事务示例
在使用连接创建会话时设置是否开启事务。
Connection 接口创建 Session 的方法:
Session createSession(boolean transacted, int acknowledgeMode) throws JMSException;
开启和提交事务示例:
public class MQTransaction {
public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
try {
//创建连接
Connection connection = connectionFactory.createConnection();
//开启连接
connection.start();
//创建全话,启用事务,Session.SESSION_TRANSACTED 事务消息
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic txTopic = session.createTopic("activemq-topic-tx-test1");
MessageProducer producer = session.createProducer(txTopic);
for (int i = 0; i < 10; i++) {
TextMessage message = session.createTextMessage("发送消息:" + 1);
producer.send(txTopic, message);
//每发送10条消息就提交事务
if((i % 10) == 0){
session.commit();
}
}
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
消息重传与死信队列
消息重传
当发生下列任一情况时,消息将重新传递给消费者端:
开启了事务,使用事务 session 并调用了
rollback()
方法回滚了消息。开启了事务,但在
commit()
方法之前,事务 session 就已关闭。会话使用的是
CLIENT_ACKNOWLEDGE
应答模式并且调用了Session.recover()
。Session.recover()
是停止会话中的消息传递,并重新启动(session)传递最旧未确认的消息。客户端连接超时(也许实际连接需要的时间超过了配置的时间)。
Broker 在其 BrokerInfo 命令包中将其使用的默认传递策略传递给客户端连接。客户端可以使用 ActiveMQConnection.getRedeliveryPolicy()
方法来重置策置,如下:
RedeliveryPolicy policy = ((ActiveMQConnectionFactory) connectionFactory).getRedeliveryPolicy();
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
policy.setMaximumRedeliveries(2);
一旦消息的重发尝试超过了为重发策略配置的最大重发数(maximumRedeliveries
),就会发送一个Poison ACK
(毒ACK)到 Broker,让其知道消息被认为是poison pill
(毒丸-毒消息)。Broker 将接收消息并将其发送到死信队列(Dead Letter Queue
)以便后续对其进行分析处理。
死信队列
ActiveMQ 默认的死信队列 名为 ActiveMQ.DLQ
,所有无法传递的消息都将发送到此队列。
ActiveMQ 提供的死信策略有:
SharedDeadLetterStrategy
死信策略的默认实现,使用一个固定目的地(队列)接收死信消息。
IndividualDeadLetterStrategy
通过配置具有层次化的名称,使每个目的地可以有自己单独的死信队列。
使用默认的死信队列接收所有无法传递的信是很难管理的,无法识别消息来源和目的地。因此,可以在 Activemq.xml
配置文件的目标策略映射中设置 individualDeadLetterStrategy
策略,从而可以为给定的队列或主题指定的死信队列前缀。
可以根据需要使用通配符来应用此策略,以便所有队列都具有自己的死信队列,如下例所示。
<broker>
<destinationPolicy>
<policyMap>
<policyEntries>
<!-- Set the following policy on all queues using the '>' wildcard -->
<policyEntry queue=">">
<deadLetterStrategy>
<!-- 使用 'DLQ.' 前缀做为目标名, 并将 DLQ 设置为队列而不是主题 -->
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
跳过DLQ
某些业务场景允许将过期的消息丢弃,而不是发送到 DLQ
,这简化了 DLQ 的管理,也就不必筛选大量已过期的消息来查找问题消息。
要让 ActiveMQ 只丢弃过期的消息,需要在死信策略中将 processExpired
属性设置为 false
,如下示例:
<deadLetterStrategy>
<sharedDeadLetterStrategy processExpired="false" />
</deadLetterStrategy>
非持久消息入DLQ
默认情况下,ActiveMQ 不会将无法传递的非持久消息放置在死信队列中。
这样处理的考虑是,如果应用程序不太关心使消息持久存在,则记录该消息无法传递就没有任何价值。 如果确实要在死信队列上放置非持久消息,则应在死信策略上设置 processNonPersistent ="true"
。
<deadLetterStrategy>
<sharedDeadLetterStrategy processNonPersistent="true" />
</deadLetterStrategy>
DLQ消息过期
默认情况下,ActiveMQ 将永不过期发送到 DLQ
的消息。 但是,从ActiveMQ 5.12 开始,deadLetterStrategy
支持expiration
属性,其值以毫秒
为单位。
<broker>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue="QueueWhereItIsOkToExpireDLQEntries">
<deadLetterStrategy>
<.... expiration="300000"/>
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
注意:
在如何应用这一点上要有选择性。特别是不要通过在默认或包含通配符策略项上设置过期时间并应用于DLQ
消息。如果一个DLQ
过期并转发到同一个或另一个有过期的DLQ
,则会引入一个循环,如果策略审核
被禁用或超出了它的滑动窗口,则可能会出现问题。
DLQ消息审核
死信策略默认情况下启用了消息审核。这样可以防止将重复的消息添加到已配置的 DLQ 中。
从5.15.0开始,可以通过 maxProducersToAudit
和 maxAuditDepth
属性来配置审核限制,使用 enableAudit="false"
关闭审核。
丢弃DLQ插件
从 ActiveMQ 5.9 开始,目标策略 policyEntry
支持 DeadLetterStrategy
丢弃:
<deadLetterStrategy>
<discarding/>
</deadLetterStrategy>
这与插件具有相同的功能,但基于每个目标。基于插件的正则表达式进行的匹配比目标匹配要强大一些,因此在某些情况下该插件可能仍然有用。
一个非常简单但非常有用的 Broker插件。该插件允许配置队列和主题(全部或基于Java SE正则表达式匹配),以丢弃已发送到 DLQ
的消息。
当使用的等待消息限制策略或其丢弃规则时,但是又不想引入另一个消费者来清除DLQ而产生开销,这就极其有用。
下示是清除所有内容的基本配置示例:
<beans>
<broker>
<plugins>
<discardingDLQBrokerPlugin dropAll="true" dropTemporaryTopics="true" dropTemporaryQueues="true"/>
</plugins>
</broker>
</beans>
下面是一个稍微复杂的示例:
<beans>
<broker>
<plugins>
<discardingDLQBrokerPlugin dropOnly="MY.EXAMPLE.TOPIC.29 MY.EXAMPLE.QUEUE.87" reportInterval="1000"/>
</plugins>
</broker>
</beans>
- 指定目标名称丢弃。注意,目标名称以空格分隔。
reportInterval
属性用于表示输出丢弃消息的频率,设置为0
表示禁用。
下面是一个更复杂的例子:
<beans>
<broker>
<plugins>
<discardingDLQBrokerPlugin dropOnly="MY.EXAMPLE.TOPIC.[0-9]{3} MY.EXAMPLE.QUEUE.[0-9]{3}" reportInterval="3000"/>
</plugins>
</broker>
</beans>
此示例指定要丢弃的目标名称,目标名称使用了正则表达式,与每个目标名称末尾的数字 000...999
相匹配。
更多消息,可查看 DiscardingDLQBrokerPlugin 和 DiscardingDLQBroker 源码。
消费者应答模式
对于消息消费者,除了可以用事务的方式来告知 Broker 一批消息已成功处理之外,实际上更常用的是设置消息应答模式。
应答模式类型
在 javax.jms.Session 接口中定义了 4 种应答模式;ActiveMQ 扩展了 JMS 的 Session,新增了一种 INDIVIDUAL_ACKNOWLEDGE
模式 。
应答模式描述了 Consumer 与 Broker 确认消息的时机。即 Consumer 接收到消息后,将在何时向 Broker 发送 ACK 指令表示消息已正确接收,Broker 只有接收到客户端回传的 ACK 指令,才会认为消息被正确接收或者处理成功,才会删除消息。通过 ACK,Consumer 与 Broker 之间建立了一种简单的 确认 机制。
javax.jms.Session:
public interface Session extends Runnable, AutoCloseable {
static final int AUTO_ACKNOWLEDGE = 1;
static final int CLIENT_ACKNOWLEDGE = 2;
static final int DUPS_OK_ACKNOWLEDGE = 3;
static final int SESSION_TRANSACTED = 0;
//.....省略其它.....
}
org.apache.activemq.ActiveMQSession: 继承自 javax.jms.Session
public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
public static final int INDIVIDUAL_ACKNOWLEDGE = 4;
public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE;
//......省略其它.....
}
JMS 规范四种确认类型:
AUTO_ACKNOWLEDGE = 1
会话自动确认:当消费者通过 receive 或 onMessage(消息监听器)成功返回消息时,Session 自动确认客户端成功收到消息。
CLIENT_ACKNOWLEDGE = 2
消费者手动确认:当消费者成功接收消息后,必须显示调用
javax.jms.Message#acknowledge()
方法确认消息被消费,否则 ActiveMQ 认为消费不成功。注意:消费者在处理消息但还未确认时,可能会积压大量未确认的消息。JMS应用(ActiveMQ)应为管理员提供一种限制客户端溢出的方法,以便在客户端正在使用的某些资源被暂时阻塞时,不会导致客户端资源用尽而失败。
DUPS_OK_ACKNOWLEDGE = 3
自动批量确认:具有延迟确认的特点。消费者按照一定的策略向 Broker 发送一个 ack 标识,表示一批消息处理完成。对开发者而言,此模式的代码结构与
AUTO_ACKNOWLEDGE
一样,但是批量确认。如果 Consumer 故障后,那些已处理而ACK因延迟而未确认的消息会被 Broker 重新发送,则可能导致重复消息。但此模式减少了会话开销,所以只能在可以容忍重复消息的业务场景中使用。可以通过最小化会话来降低重复消息的概率。
SESSION_TRANSACTED = 0
事务确认:指定会话使用本地事务。
如果会话正在使用本地事务,则可使用
javax.jms.Session#getAcknowledgeMode
方法获取此模式的值,而不管是使用createSession(int sessionMode)
或是createSession(boolean transacted, int acknowledgeMode)
方法来创建会话。
ActiveMQ 新增的确认类型:
INDIVIDUAL_ACKNOWLEDGE = 4
逐条确认:消息者将会逐条向 Broker 发送 ack 标识,性能较差,除非业务特别需要,否则一般不建议使用。
一般情况优,优先使用 AUTO_ACKNOWLEDGE
模式自动确认消息。
设置应答模式
应答模式设置通常是在调用 Connection 接口的 createSession
方法创建 session 时设置,如下:
javax.jms.Connection:
Session createSession(boolean transacted, int acknowledgeMode) throws JMSException;
//使用
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Stomp消息推送
Web 端通过 WebSocket 与 ActiveMQ 通信,订阅消息队列中的消息;服务端使用 Stomp 协议与 ActiveMQ 连接发送消息,实现向 Web 页面消息推送。
ActiveMQ 默认的传输连接器配置
<!-- 传输连接器通过给定协议把 ActiveMQ 暴露给客户端和其它代理服务器 --> <transportConnectors> <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> </transportConnectors>
创建服务端,使用 stomp 协议连接。
如果是 SpringBoot 项目,入引的是
spring-boot-starter-activemq
依赖,还需要添加以下依赖包,才能使用 stomp 连接 ActiveMQ 服务器。pom.xml
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-stomp</artifactId> </dependency>
StompProducer.java
public class StompProducer { public static void main(String[] args) { StompConnection connection = new StompConnection(); try { connection.open("localhost", 61613); connection.connect(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD); String message = "好货半价, 速来抢购"; connection.send("/topic/shopping-discount", message); connection.disconnect(); connection.close(); } catch (Exception e) { e.printStackTrace(); } } }
创建 Web 页面,引入
stomp.js
<!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <title>ActiveMQ 消息通知</title> <script type="text/javascript" src="js/stomp.js"></script> </head> <script type="text/javascript"> var url = "ws://localhost:61614"; var destination = "/topic/shopping-discount"; var client = Stomp.client(url); var callbackMSG = function(message) { if(message.body) { alert("got message with body:" + message.body) } else { alert("got empty message"); } }; var connect_callback = function(frame) { client.subscribe(destination, callbackMSG); }; var error_callback = function(error) { alert(error.headers.message); }; var headers = { login: 'admin', passcode: 'admin', // additional header //'client-id': 'my-client-id' }; client.connect(headers, connect_callback, error_callback); </script> <body> </body> </html>
运行
StompProducer.java
,页面会弹提示框,显示消息内容。
相关参考
官方文档
相关博文
注意:本文归作者所有,未经作者允许,不得转载