# MessageQueue 消息队列
MornBoot支持通用API操作多种消息队列中间件,同时提供了基于注解的本地消息分发策略。
Since:v1.2.1
目前支持:
Kafka- RocketMQ
开始使用MornBoot前,请确保应用与MQ中间件已经连通
# 必要配置
SpringBootApplication
@EnableCaching // 开启缓存
# 推荐配置
无
# Maven of RocketMQ
<!--Morn-->
<dependency>
<groupId>site.morn.boot</groupId>
<artifactId>morn-boot-rocket</artifactId>
</dependency>
<!--Rocket-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocket.version}</version>
</dependency>
# Reference
# Send Message
使用BroadcastMessageBuilder构建消息,
使用BroadcastMessageSendingOperations发送消息。
@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
public class RocketSendingOperationsTest {
@Autowired
private BroadcastMessageSendingOperations sendingOperations;
@Test
public void syncSend() {
// 创建消息内容
TestUser user = new TestUser();
user.setId(2L);
user.setUsername("bar");
// 构建消息
BroadcastMessage<TestUser> message = BroadcastMessageBuilder.withPayload(user)
.setTopic("userData").setType("add").build();
// 同步发送消息
MessageResult messageResult = sendingOperations.syncSend(message);
Assert.assertNotNull(messageResult);
Assert.assertTrue(messageResult.isSuccess());
}
}
原生结果(如
SendResult)转成MessageResult的过程是隐式的, 可以通过注册MessageResultConverter覆盖现有实现。
# Consume Message
消息消费使用原生框架,示例为RocketMQ。
使用BroadcastMessageResolvingOperations进行本地消息分发。
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "testGroup", topic = "userData")
public class UserMessageConsumer implements RocketMQListener<MessageExt> {
private final BroadcastMessageResolvingOperations<MessageExt> resolvingOperations;
public UserMessageConsumer(BroadcastMessageResolvingOperations<MessageExt> resolvingOperations) {
this.resolvingOperations = resolvingOperations;
}
@Override
public void onMessage(MessageExt message) {
resolvingOperations.syncResolve(message);
}
}
# Handle Message
基于注解编写消息处理类和方法。
消息处理类的@MessageTopic注解对应消息的主题Topic,
消息处理方法的@MessageType注解对应消息的类型Type。
消息处理方法允许3种类型的参数,分别为消息体类型、
BroadcastMessage、BroadcastMessageHeaders。 参数顺序不限,消息体必选,且必须使用@Payload注解。BroadcastMessage和BroadcastMessageHeaders可选。 注意:消息体会反序列化为@Payload参数类型(即TestUser),务必确保二者类型一致。
@Slf4j
@Component
@MessageTopic("userData")
public class UserMessageHandler {
@MessageType("add")
public void addUser(@Payload TestUser user, BroadcastMessageHeaders headers) {
log.info("Message id {}", headers.getId());
log.info("Add user: {}", user.getUsername());
}
@MessageType("update")
public void updateUser(@Payload TestUser user, BroadcastMessageHeaders headers) {
log.info("Message id {}", headers.getId());
log.info("Update user: {}", user.getUsername());
}
}
原生消息(如
MessageExt)解析成BroadcastMessage的过程是隐式的, 可以通过注册BroadcastMessageHeaderResolver、BroadcastMessagePayloadResolver覆盖现有实现。
# Async Send Message
异步发送示例。
ListenableFuture<MessageResult> future = sendingOperations.asyncSend(message);
future.addCallback(result -> log.info("Rocket send success."),
ex -> log.info("Rocket send failure."));
更多自定义配置及实现参考
MessageAutoConfiguration、RocketAutoConfiguration。