RocketMQ实战

PunkLu 2020年02月01日 108次浏览
RocketMQ实战

RocketMQ实战

消息批量发送

RocketMQ消息批量发送是将同一主题的多条消息一起打包发送到消息服务端,减少网络调用次数,提高网络传输效率,使用方法:

public class SimpleBatchProducer{
	public static void main(String args) throws Exception{
		DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
		producer.setNamesrvAddr("127.0.0.1:9876");
		producer.start();
		String topic = "BatchTest";
		List<Message> messages = new ArrayList();
		message.add(new Message(topic,"Tag","OrderID001","Hello world1".getBytes()));
		message.add(new Message(topic,"Tag","OrderID002","Hello world2".getBytes()));
		message.add(new Message(topic,"Tag","OrderID003","Hello world3".getBytes()));
		System.out.println(producer.send(messages));
		producer.shutdown();
	}
}

消息发送队列自选择

消息发送默认根据主题的路由信息(主题消息队列)进行负载均衡,负载均衡机制为轮询策略。例如有这样一个场景,订单的状态变更消息发送到特定主题,为了避免消息消费者同时消费同一订单的不同状态的变更信息,应该使用顺序消息。为了提高消息消费的并发度,如果能根据某种负载均衡算法,相同订单的不同消息能统一发到同一个消息消费队列上,则可以避免引入分布式锁,RocketMQ在消息发送时提供了消息队列选择器MessageQueueSelector:

String[] tags = new String[]{"TagA","TagB","TagC","TagD","TagE"};
for(int i = 0 ;i < 100; i++){
	int orderId = i % 10;
	Message msg = new Message("TopicTestjjj",tags[i % tags.length],"KEY" + i,"Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
	SendResult sendResult = producer.send(msg,new MessageQueueSelector(){
		public MessageQueue select(List<MessageQueue> mqs,Message msg,Object arg){
			Integer id = (Integer) arg;
			int index = id % mqs.size():
			return mqs.get(index);
		}
	},orderId);
	System.out.printf("%s%n",sendResult);
}

消息过滤

TAG模式过滤

例子:

for(int i = 0;i< 10; i++){
	if(i % 2 == 0){
		Message msg = new Message("TopicFilter7","TOPICA_TAG_ALL","OrderID001","Helloworld".getBytes(RemotingHelper.DEFAULT_CHARSET));
		System.out.printf("%s%n",producer.send(msg));
	}else{
		Message msg = new Message("TopicFilter7","TOPICA_TAG_ORD","OrderID001","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
		System.out.println("%s%n",producer.send(msg));
	}
}

STEP1:

在消息发送时,可以为每一条消息设置一个TAG标签,消息消费者订阅自己感兴趣的TAG,一般使用的场景是,对于同一类的功能创建一个主题,但对于该主题下的数据,可能不同的系统关心的数据不一样,可以设置不同的TAG。

STEP2:

消费者组订阅相同的主题不同的TAG,多个TAG用“|”分隔。同一个消费组订阅的主题,TAG必须相同。

SQL表达模式过滤

例子:

SQL表达式消息发送方式:

Message msg = new Message("TopicTest","TagA","Tag",("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.putUserProperty("orderStatus","1");
msg.putUserProperty("sellerId","21");
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n",sendResult);

STEP1:

基于SQL92表达式消息过滤,其实是对消息的属性运用SQL过滤表达式进行条件匹配,所以消息发送时应该调用putUserProperty方法设置消息属性。

STEP2:

订阅模式为一条SQL条件过滤表达式,上下文环境为消息的属性。

基于SQL过滤消息消费者构建示例:

consumer.subscribe("TopicTest",MessageSelector.bySql("(orderStatus is not null and orderStatus > 0)"));

事务消息

以一个订单流转流程为例,例如订单子系统创建订单,需要将订单数据下发到其他子系统(与第三方系统对接)这个场景。通常会将两个系统进行解耦,不直接使用服务调用的方式进行交互。其业务实现步骤通常有下面几步:

  1. A系统创建订单并入库
  2. 发送消息到MQ
  3. MQ消费者消费消息,发送远程RPC服务调用,完成订单数据的同步。

方案一:

伪代码:

public Map createOrder(){
	Map result = new HashMap();
	// 执行下订单相关的业务流程,例如操作本地数据库落库相关代码
	// 调用消息发送端API发送消息
	// 返回结果,提交事务
	return result;
}

方案一有以下弊端:

  1. 如果消息发送成功,在提交事务的时候JVM突然挂掉,事务没有成功提交,导致两个系统之间的数据不一致
  2. 由于消息是在事务提交之前提交,发送的消息内容是订单实体的内容,会造成在消费端进行消费时如果需要验证订单是否存在时可能出现订单不存在的情况。
  3. 消息发送可以考虑异步发送

方案二:

由于存在上述问题,在MQ不支持事务消息的前提条件下,可以采用下面的方式进行优化。

public Map createOrder(){
	Map result = new HashMap();
	// 执行下订单相关的业务流程,例如操作本地数据库落库相关代码
	// 生成事务消息唯一业务提示,将该业务表示组装到待发送的消息体中
	// 往待发送消息表中插入一条记录,本次唯一消息发送业务ID,消息JSON、创建时间、发送状态
	// 将消息体返回到控制器层
	// 返回结果,提交事务
	return result;
}

然后在控制器层异步发送消息,同时需要引入定时机制,去扫描待发送消息记录,避免消息丢失。

方案二有以下弊端:

  1. 消息有可能重复发送,但在消费端可通过唯一业务编号来进行去重设计
  2. 实现过于复杂,为了避免极端情况下的消息丢失,需要使用定时任务。

方案三:

基于RocketMQ事务消息

STEP1:订单下单伪代码

public Map createOrder(){
	Map result = new HashMap():
	// 执行下订单相关的业务流程,例如操作本地数据库落库相关代码
	// 生成事务消息唯一业务表示,将该业务表示到待发送的消息体中,方便消息消费端进行幂等消费
	// 调用消息客户端API,发送事务prepare消息消费
	// 返回结果,提交事务
	return result;
}

上述是第一步,发送事务消息,接下来需要实现TransactionListener,实现执行本地事务与本地事务回查。

public class OrderTransactionListenerImpl implements TransactionListener{
	private ConcurrentHashMap<String,Integer> countHashMap = new ConcurrentHashMap();
	private final static int MAX_COUNT = 5;
	
	@Override
	public LocalTransactionState executeLocalTransaction(Message msg,Object arg){
		// 从消息中获取唯一业务ID
		String bizUniNo = msg.getUserProperty("bizUniNo");
		// 将bizUniNo入库,表名:t_message_transaction,表结构bizUniNo(主键),业务类型
		return LocalTransactionState.UNKNOW;
	}
	
	@Override
	public LocalTransactionState checkLocalTransaction(MessageExt msg){
		Integer status = 0;
		// 从数据库查询t_message_transaction表,如果该表中存在记录,则提交
		String bizUniNo = msg.getUserProperty("bizUniNo"); // 从消息中获取业务唯一ID
		// 然后查询t_message_transaction表,是否存在bizUniNo,如果存在,则返回COMMIT_MESSAGE
		// 不存在,则记录查询次数,未超过次数,返回UNKNOW,超过次数,返回ROLLBACK_MESSAGE
		if(query(bizUniNo) > 0 ){
			return LocalTransactionState.ROLLBACK_MESSAGE;
		}
		return rollBackOrUnown(bizUniNo);
	}
	
	public int query(String bizUniNo){
		// 从数据库中查询编号为bizUniNo的记录数量并返回
	}
	
	public LocalTransactionState rollBackOrUnown(String bizUniNo){
		Integer num = countHashMap.get(bizUniNo);
		if(num != null && ++num > MAX_COUNT){
			countHashMap.remove(bizUniNo);
			return LocalTransactionState.ROLLBACK_MESSAGE;
		}
		if(num == null){
			num = new Integer(1);
		}
		countHashMap.put(bizUniNo,num);
		return LocalTransactionState.UNKNOW;
	}
}

TransactionListener实现要点如下:

  1. executeLocalTransaction

    该方法主要是设置本地事务状态,与业务方代码在一个事务中,例如OrderServer#createMap中,只要本地事务提交成功,该方法也会提交成功。故在这里,主要是向t_message_transaction添加一条记录,在事务回查时,如果存在记录,就认为是该消息需要提交,其返回值建议返回LocalTransactionState.UNKNOW。

  2. checkLocalTransaction

    该方法主要是告知RocketMQ消息是需要提交还是回滚,如果本地事务表(t_message_transaction)存在记录,则认为提交,如果不存在,可以设置回查次数,如果指定次数内还是未查到消息,则回滚,否则返回未知。RocketMQ会按一定的频率回查事务,回查次数也有限制,默认为5次,可配置。