前提
通过前面两篇文章可以简单的了解 RocketMQ 和 安装 RocketMQ ,今天就将 SpringBoot 和 RocketMQ 整合起来使用。
SpringBoot 系列文章
相关文章
1、SpringBoot Kafka 整合使用
2、SpringBoot RabbitMQ 整合使用
3、SpringBoot ActiveMQ 整合使用
4、Kafka 安装及快速入门
5、SpringBoot RabbitMQ 整合进阶版
6、RocketMQ 初探
7、RocketMQ 安装及快速入门
关注我
转载请务必注明原创地址为:http://www.54tianzhisheng.cn/2018/02/07/SpringBoot-RocketMQ/
创建项目
在 IDEA 创建一个 SpringBoot 项目,项目结构如下:
pom 文件
引入 RocketMQ 的一些相关依赖,最后的 pom 文件如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>com.zhisheng</groupId> <artifactId>rocketmq</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging>
<name>rocketmq</name> <description>Demo project for Spring Boot RocketMQ</description>
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.9.RELEASE</version> <relativePath/> </parent>
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency>
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-common</artifactId> <version>4.2.0</version> </dependency>
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.2.0</version> </dependency> </dependencies>
<build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
|
配置文件
application.properties 中如下:
1 2 3 4 5 6
| # 消费者的组名 apache.rocketmq.consumer.PushConsumer=PushConsumer # 生产者的组名 apache.rocketmq.producer.producerGroup=Producer # NameServer地址 apache.rocketmq.namesrvAddr=localhost:9876
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| package com.zhisheng.rocketmq.client;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.springframework.util.StopWatch;
import javax.annotation.PostConstruct;
@Component public class RocketMQClient {
@Value("${apache.rocketmq.producer.producerGroup}") private String producerGroup;
@Value("${apache.rocketmq.namesrvAddr}") private String namesrvAddr;
@PostConstruct public void defaultMQProducer() { DefaultMQProducer producer = new DefaultMQProducer(producerGroup); producer.setNamesrvAddr(namesrvAddr);
try {
producer.start();
Message message = new Message("TopicTest", "push", "发送消息----zhisheng-----".getBytes(RemotingHelper.DEFAULT_CHARSET));
StopWatch stop = new StopWatch(); stop.start();
for (int i = 0; i < 10000; i++) { SendResult result = producer.send(message); System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus()); } stop.stop(); System.out.println("----------------发送一万条消息耗时:" + stop.getTotalTimeMillis()); } catch (Exception e) { e.printStackTrace(); } finally { producer.shutdown(); } } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| package com.zhisheng.rocketmq.server;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component public class RocketMQServer {
@Value("${apache.rocketmq.consumer.PushConsumer}") private String consumerGroup;
@Value("${apache.rocketmq.namesrvAddr}") private String namesrvAddr;
@PostConstruct public void defaultMQPushConsumer() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(namesrvAddr); try { consumer.subscribe("TopicTest", "push");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> { try { for (MessageExt messageExt : list) {
System.out.println("messageExt: " + messageExt);
String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody); } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); } catch (Exception e) { e.printStackTrace(); } } }
|
启动类
1 2 3 4 5 6 7 8 9 10 11 12
| package com.zhisheng.rocketmq;
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication public class RocketmqApplication {
public static void main(String[] args) { SpringApplication.run(RocketmqApplication.class, args); } }
|
RocketMQ
代码已经都写好了,接下来我们需要将与 RocketMQ 有关的启动起来。
启动 Name Server
在前面文章中已经写过怎么启动,http://www.54tianzhisheng.cn/2018/02/06/RocketMQ-install/#%E5%90%AF%E5%8A%A8-NameServer
进入到目录 :
1
| cd distribution/target/apache-rocketmq
|
启动:
1 2 3
| nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log //通过日志查看是否启动成功
|
启动 Broker
1 2 3
| nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log //通过日志查看是否启动成功
|
然后运行启动类,运行效果如下:
监控
RocketMQ有一个对其扩展的开源项目 ocketmq-console ,如今也提交给了 Apache ,地址在:https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console ,官方也给出了其支持的功能的中文文档:https://github.com/apache/rocketmq-externals/blob/master/rocketmq-console/doc/1_0_0/UserGuide_CN.md , 那么该如何安装?
Docker 安装
1、获取 Docker 镜像
1
| docker pull styletang/rocketmq-console-ng
|
2、运行,注意将你自己的 NameServer 地址替换下面的 127.0.0.1
1
| docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr=127.0.0.1:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng
|
非 Docker 安装
我们 git clone 一份代码到本地:
1 2 3
| git clone https://github.com/apache/rocketmq-externals.git
cd rocketmq-externals/rocketmq-console/
|
需要 jdk 1.7 以上。 执行以下命令:
或者
1 2 3
| mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-console-ng-1.0.0.jar
|
注意:
1、如果你下载依赖缓慢,你可以重新设置 maven 的 mirror 为阿里云的镜像
1 2 3 4 5 6 7 8
| <mirrors> <mirror> <id>alimaven</id> <name>aliyun maven</name> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> <mirrorOf>central</mirrorOf> </mirror> </mirrors>
|
2、如果你使用的 RocketMQ 版本小于 3.5.8,如果您使用 rocketmq < 3.5.8,请在启动 rocketmq-console-ng 时添加 -Dcom.rocketmq.sendMessageWithVIPChannel = false
(或者您可以在 ops 页面中更改它)
3、更改 resource / application.properties 中的 rocketmq.config.namesrvAddr(或者可以在ops页面中更改它)
错误解决方法
1、Docker 启动项目报错
org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <null> failed
将 Docker 启动命令改成如下以后:
1
| docker run -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=127.0.0.1:9876 -Drocketmq.config.isVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng
|
报错信息改变了,新的报错信息如下:
1 2 3
| ERROR op=global_exception_handler_print_error
org.apache.rocketmq.console.exception.ServiceException: This date have't data!
|
看到网上有人也遇到这个问题,他们都通过自己的方式解决了,但是方法我都试了,不适合我。不得不说,阿里,你能再用心点吗?既然把 RocketMQ 捐给 Apache 了,这些文档啥的都必须更新啊,不要还滞后着呢,不然少不了被吐槽!
搞了很久这种方法没成功,暂时放弃!mmp
2、非 Docker 安装,只好把源码编译打包了。
1) 注意需要修改如下图中的配置:
1 2 3 4
| rocketmq.config.namesrvAddr=localhost:9876 //注意替换你自己的ip
#如果你 rocketmq 版本小于 3.5.8 才需设置 `rocketmq.config.isVIPChannel` 为 false,默认是 true, 这个可以在源码中可以看到的 rocketmq.config.isVIPChannel=
|
2) 执行以下命令:
1
| mvn clean package -Dmaven.test.skip=true
|
编译成功:
可以看到已经打好了 jar 包:
运行:
1
| java -jar rocketmq-console-ng-1.0.0.jar
|
成功,不报错了,开心😄,访问 http://localhost:8080/
整个监控大概就是这些了。
然后我运行之前的 SpringBoot 整合项目,查看监控信息如下:
总结
整篇文章讲述了 SpringBoot 与 RocketMQ 整合和 RocketMQ 监控平台的搭建。
参考文章
1、http://www.ymq.io/2018/02/02/spring-boot-rocketmq-example/#%E6%96%B0%E5%8A%A0%E9%A1%B9%E7%9B%AE
2、GitHub 官方 README