最近在做呼叫中心的时候,其中回调事件需要发送消息队列,经过选型用了ActiveMQ,虽然比较熟悉的还是Kafka,但总有第一次嘛,所以把碰到的一些注意点罗列一下,避免后人再踩。
集群模式说明
ActiveMQ集群其实是Master-Slave模式,跟Kafka不一样,ActiveMQ只有master节点提供服务,如果master节点宕机,那么slave节点会转为master节点;
队列使用
ActiveMQ分两种消息机制:Queue和Topic;
- Queue:队列形式,一个消息只能被消费一次,如果由多个业务方都要消费该队列,这种形式是不支持的;
- 优点:
- 简单
- 支持业务端多节点消费
- 缺点:
- 仅能提供一个业务进行消费,不适合多业务消费场景
- 优点:
- Topic:topic形式,Kafka就是这种模式,
- 优点:
- 支持可以多业务方进行消费
- 缺点:
- 但如果一个业务想保证消费的幂等性,那么需要设置ClentId属性,但设置了ClientId以后只能一个实例进行消费
- 优点:
因此使用的过程中,这两种模式都有一定的缺陷。
那么ActvieMQ其实还有一种模式:虚拟Topic(VirtualTopic),这种结构是这样的:对于生产端来说是个Topic,但对于消费端来说其实是个Queue。
虚拟Topic只要按照约定命名即可,以“VirtualTopic.”开头,消费的时候“Consumer.xxx.VirtualTopic.”开头,xxx对于不同的业务可以定义不同的值,如下图所示
这样消费端就可以多消费者消费了,各业务又互不影响。
failover策略
由于开始运维经验不足,导致ActiveMQ集群宕机了几次,也暴露了问题,就是默认情况下,在宕机情况下,生产端无法发消息会导致该线程一直等待,那么结果显而易见,消息挤压一多,直接导致服务不可用,不再接收新请求。
解决方案显而易见,就是增加超时机制,连接串如下:
failover://(tcp://192.168.5.1:61616,tcp://192.168.5.2:61616,tcp://192.168.5.3:61616)?nested.wireFormat.maxInactivityDuration=2000&nested.connectionTimeout=2000&maxReconnectAttempts=2&timeout=3000&initialReconnectDelay=10&startupMaxReconnectAttempts=2&maxReconnectDelay=1000
- nested.wireFormat.maxInactivityDuration:最大检测不活跃时间,2000毫秒
- nested.connectionTimeout:连接超时事件,2000毫秒
- maxReconnectAttempts:重连次数
- timeout:超时时间
- initialReconnectDelay:首次尝试延时
- startupMaxReconnectAttempts:最大重试次数
- maxReconnectDelay:最大重连间隔