![RocketMQ技术内幕:RocketMQ架构设计与实现原理(第2版)](https://wfqqreader-1252317822.image.myqcloud.com/cover/686/40935686/b_40935686.jpg)
1.1 获取和调试RocketMQ的源码
RocketMQ最早是阿里巴巴内部使用的消息中间件,于2016年提交到Apache基金会,成为Apache基金会的顶级开源项目[1]。在GitHub网站上搜索RocketMQ,主页如图1-1所示。
![014-1](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/014-1.jpg?sign=1739321630-l3ntZdrIZQwbC5T9pYAZV7kdFSgaUo7U-0-67b9bd4808e06564ae665b895dc0fb3b)
图1-1 GitHub RocketMQ搜索界面
1.1.1 Eclipse获取RocketMQ源码
下面介绍Eclipse获取RocketMQ源码的方法。
第一步:单击右键,从菜单中选择Import Git,弹出如图1-2所示的界面。
![014-2](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/014-2.jpg?sign=1739321630-xd7sve4dtlgngQ80nAXyYEuVK7W9ZvcJ-0-35c1675fb9cf03db3a6d91c7738a1b62)
图1-2 Import界面
第二步:单击Next按钮,弹出Import Projects from Git界面,如图1-3所示。
![014-3](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/014-3.jpg?sign=1739321630-ak5QNDdBDUBbLSB8sLIYwWsoMnYqMuyF-0-7291544a2d82e56a0439c242c5ea34f1)
图1-3 Import Projects from Git界面
第三步:单击Next按钮,选择Clone URI,得到的界面如图1-4所示。
![015-1](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/015-1.jpg?sign=1739321630-GHdOQsSI3PdUe8bcwGc4E5QFX02j52wi-0-4adb5a7f675cd7b0dae78896d893a99b)
图1-4 选择Clone URI后得到的界面
第四步:继续单击Next按钮进入下一步,选择代码分支,如图1-5所示。
![015-2](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/015-2.jpg?sign=1739321630-23m588btp0JEiw7yynahXb7kFk4wRN6g-0-5295f0266da1b7e4e8dd6a942b4210b2)
图1-5 Import Projects from Git选择分支
第五步:选择需要的分支后单击Next按钮,进入代码存放目录,如图1-6所示。
![016-1](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/016-1.jpg?sign=1739321630-TNQzpkDOU2vnLduuIZOB2GaEwkgmiYPQ-0-56b082b06a67f1393202ee65e584fd49)
图1-6 选择源码存放路径
第六步:单击Next按钮,Eclipse将从远程仓库下载代码,如图1-7所示。
![016-2](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/016-2.jpg?sign=1739321630-AEZRG5SyIWFplQEC9MHhommT5dDT7iBJ-0-fb8f192242aafa66852fa8298364d62d)
图1-7 Cloning from git界面
第七步:将代码下载到指定目录后,默认选择Import existing Eclipse projects(单分支),这里手动选择Import as general project(多分支),单击Finish按钮,导入成功,如图1-8所示。
![017-1](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/017-1.jpg?sign=1739321630-KneRzk1axsWKAxuF7zMw4cH1EAy3Me4I-0-31057d283fe83364113f9e15e51c1f0b)
图1-8 Cloning from git界面
第八步:代码导入成功后,需要将项目转换成Maven项目。导入成功后的效果如图1-9所示。
![017-2](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/017-2.jpg?sign=1739321630-vZUzwjTTVtSCcwMKBAGjYaQHOeQUCPZv-0-0427b7a087a75844f6dcf42a5228cc70)
图1-9 导入项目初始状态
第九步:单击鼠标右键,从菜单中选择rocketmq_new(文件下载目录名)→Configure→Configure and Detect Nested Projects...,将项目转换成Maven项目,如图1-10所示。
![018-1](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/018-1.jpg?sign=1739321630-wsNbn3coy2KuF9LqqM3xucLjiEyNytNq-0-f8735da8675f764bb50365ad073ba124)
图1-10 转换Maven项目
第十步:单击Finish按钮,执行Maven项目转换,完成RocketMQ的导入,如图1-11所示。
![018-2](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/018-2.jpg?sign=1739321630-0u2XJwdsXxuH0zyGKy9t9Tj6012pvJWr-0-c736552a7bb1925d203b8e3ac4fad534)
图1-11 完成RocketMQ的导入
转换过程中可能会弹出如图1-12所示提示框。
![019-1](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/019-1.jpg?sign=1739321630-eFtdkD3ga8EtBbevFVCr3fnFUKSXcxpw-0-ef7d35f5a0dd26dec4d861d96e36fc5d)
图1-12 转换Maven项目过程中弹出的提示框
解决办法有3种。
1)修改根pom.xml文件,找到如代码清单1-1所示的条目,加上注释。
代码清单1-1 修改根pom.xml文件
<!-- <plugin> <artifactId>maven-help-plugin</artifactId> <version>2.2</version> <executions> <execution> <id>generate-effective-dependencies-pom</id> <phase>generate-resources</phase> <goals> <goal>effective-pom</goal> </goals> <configuration> <output>${project.build.directory}/effective-pom/effective-depende ncies.xml</output> </configuration> </execution> </executions> </plugin> --> <!-- <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>2.19.1</version> <configuration> <forkCount>1</forkCount> <reuseForks>true</reuseForks> </configuration> </plugin> -->
2)注释remoting模块下pom.xml文件中的部分代码,如代码清单1-2所示。
代码清单1-2 注释pom.xml文件的部分代码
<!-- <dependency> <groupId>io.netty</groupId> <artifactId>netty-tcnative</artifactId> <version>1.1.33.Fork22</version> <classifier>${os.detected.classifier}</classifier> </dependency> -->
3)单击右键,选中一个项目,Maven→Update Project...,如图1-13所示。
![020-1](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/020-1.jpg?sign=1739321630-frKRGisqylUc4k8Jr7L1ggbLPbDAcmyr-0-b4cf113aa4b14c6b364dbca18c1179aa)
图1-13 更新Maven项目
1.1.2 Eclipse调试RocketMQ源码
本节将展示如何在Eclipse中启动NameServer、Broker,并运行消息发送与消息消费示例程序。
1. 启动NameServer
第一步:展开namesrv模块,右键选中NamesrvStartup.java,将其拖曳到Debug As,选中Debug Configurations,这时会弹出Debug Configurations界面,如图1-14所示。
![021-1](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/021-1.jpg?sign=1739321630-r22QHWbbAi96uLiM8zTFfTuDiC6ParxP-0-cef66bc9a52993721824d0c2e392dc06)
图1-14 选择Debug Configurations界面
第二步:选中Java Application条目并单击右键,选择New,弹出Debug Configurations界面,如图1-15所示。
![021-2](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/021-2.jpg?sign=1739321630-6gFoqGTfvj9uCHbuE2uoyDFG0ApsGcbu-0-9525c16f3b823b119f3ceeeda3869a12)
图1-15 设置环境变量
第三步:设置RocketMQ运行主目录。选择Environment选项卡,添加环境变量ROCKETMQ_HOME。
第四步:在RocketMQ运行主目录中创建conf、store、logs三个文件夹,如图1-16所示。
![022-1](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/022-1.jpg?sign=1739321630-DRRCCv6MSo67GgsudN7jw0E91nJxXRMA-0-7b319c6963bdd178e9e94ca7231c2d8a)
图1-16 RocketMQ主目录
第五步:从RocketMQ distribution部署目录中将broker.conf、logback_broker.xml文件复制到conf目录中,logback_namesrv.xml文件只须修改日志文件的目录,broker.conf文件内容如代码清单1-3所示。
代码清单1-3 broker.conf文件
brokerClusterName=DefaultCluster brokerName=broker-a brokerId=0 #nameServer 地址,分号分割 namesrvAddr=127.0.0.1:9876 deleteWhen=04 fileReservedTime=48 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH # 存储路径 storePathRootDir=D:\\rocketmq\\store #CommitLog 存储路径 storePathCommitLog=D:\\rocketmq\\store\\commitlog # 消费队列存储路径 storePathConsumeQueue=D:\\rocketmq\\store\\consumequeue # 消息索引存储路径 storePathIndex=D:\\rocketmq\\store\\index #Checkpoint 文件存储路径 storeCheckpoint=D:\\rocketmq\\store\\checkpoint #abort 文件存储路径 abortFile=D:\\rocketmq\\store\\abort
第六步:在Eclipse Debug中运行NamesrvStartup,输出“The Name Server boot success. Serializetype=JSON”。
2. 启动Broker
第一步:展开Broker模块,右键选中BrokerStartup.java,将其拖曳到Debug As,选中Debug Configurations,弹出如图1-17所示的界面,选择Arguments选项卡,配置-c属性,指定broker配置文件路径。
![023-1](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/023-1.jpg?sign=1739321630-l8XUHWVGuSMtFjoGjH9NooCCmbjZZ8rq-0-5cf12a91475ffac9f58be816cf3095c7)
图1-17 Arguments选项卡配置
第二步:切换选项卡Environment,配置RocketMQ主目录,如图1-18所示。
![024-1](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/024-1.jpg?sign=1739321630-IPpthHKM9YKPXMw3Bo69tzmE1cOqSfIP-0-9875cf440ed0c0b43bb2b3da2d1c0ec1)
图1-18 Environment选项卡配置
第三步:以Debug模式运行BrokerStartup.java,查看${ROCKET_HOME}/logs/broker. log文件。未报错则表示启动成功,如代码清单1-4所示。
代码清单1-4 Broker启动日志截图
2021-05-01 20:47:29 INFO main - register broker to name server 127.0.0.1:9876 OK 2021-05-01 20:47:29 INFO main - The broker[broker-a, 192.168.1.3:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876 2021-05-01 20:47:38 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes 2021-05-01 20:47:38 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes 2021-05-01 20:47:39 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK 2021-05-01 20:48:09 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK 2021-05-01 20:48:37 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes 2021-05-01 20:48:37 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes 2021v-05-01 20:48:39 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK 2021-05-01 20:49:09 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK
3. 使用RocketMQ提供的实例验证消息发送与消息消费
第一步:修改org.apache.rocketmq.example.quickstart.Producer示例程序,设置消息生产者NameServer的地址,如代码清单1-5所示。
代码清单1-5 消息发送示例程序
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); for (int i = 0; i < 1; i++) { try { Message msg = new Message("TopicTest"/* Topic */,"TagA"/* Tag */, ("Hello RocketMQ " + i).getBytes (RemotingHelper.DEFAULT_CHARSET)/* Message body */ ); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace() ; Thread.sleep(1000); } } producer.shutdown(); } }
第二步:运行该示例程序,查看运行结果。如果输出代码清单1-6所示的结果,则表示消息发送成功。
代码清单1-6 消息发送结果
SendResult [sendStatus=SEND_OK, msgId=C0A8010325B46D06D69C70A211400000, offsetMsgId=C0A8010300002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=0]
第三步:修改org.apache.rocketmq.example.quickstart.Consumer示例程序,设置消息消费者NameServer的地址,如代码清单1-7所示。
代码清单1-7 消息消费示例程序
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
第四步:运行消息消费者程序,如果输出如代码清单1-8所示的结果,则表示消息消费成功。
代码清单1-8 消息消费结果
Consumer Started. ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0, storeSize=178, queueOffset=0, sysFlag=0, bornTimestamp=1521723269443, bornHost=/192.168.1.3:57034, storeTimestamp=1521723269510, storeHost=/192.168.1.3:10911, msgId=C0A8010300002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1521723841419, UNIQ_KEY=C0A8010325B46D06D69C70A211400000, WAIT=true, TAGS=TagA}, body=16]]]
消息发送与消息消费都成功,说明RocketMQ调试环境已经搭建成功,可以直接调试源码,探知RocketMQ的奥秘了。
1.1.3 IntelliJ IDEA获取RocketMQ源码
第一步:在IntelliJ IDEA VCS菜单中选择Get from Version Control...,如图1-19所示。
![026-1](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/026-1.jpg?sign=1739321630-VHeKs6RSlqD7isaulmOibrlH1v4epdf3-0-6db555bdeeb435a8327eb41c76fc5a28)
图1-19 VCS菜单
第二步:在弹出的对话框中输入RocketMQ源码地址,选择保存的本地路径,单击Clone按钮,如图1-20所示。
![027-1](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/027-1.jpg?sign=1739321630-zJfUvVWJJtMbeLW8KTvfvvrI1BvwcsvN-0-78289a1b11772a4c367a287aff5e0227)
图1-20 Version Control界面
状态栏有代码下载的进度,如图1-21所示。
![027-2](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/027-2.jpg?sign=1739321630-gawIzBD9gWxjcF3oU9O4pHOB8dxMo0xn-0-2ebe33262f1eee5bb9290fd40f7f13bb)
图1-21 RocketMQ Cloning进度条
第三步:源码导入成功后,效果如图1-22所示。
![027-3](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/027-3.jpg?sign=1739321630-JDDcCBtfys75uWWCxghKcgJ7ZLLBUt7h-0-2a71ff39fa53c1159a5c3640909ec84f)
图1-22 RocketMQ项目结构
第四步:执行Maven命令clean install,下载并编译依赖,可以看到控制台显示BUILD SUCCESS的提示信息,如图1-23所示。
![028-1](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/028-1.jpg?sign=1739321630-j4eE1KDgu6gvkzW96l8Xk8VqVtZjW6D2-0-62d106b01e6a00ceff1541e3e9866f29)
图1-23 提示信息
1.1.4 IntelliJ IDEA调试RocketMQ源码
本节将展示如何在IntelliJ IDEA中启动NameServer、Broker,并编写一个消息发送与消息消费示例程序。
1. 启动NameServer
第一步:展开namesrv模块,鼠标右键选中NamesrvStartup.java,将其拖曳到Debug As,选中Debug 'NamesrvStartup.java.main()',弹出如图1-24、图1-25所示的界面。
![029-1](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/029-1.jpg?sign=1739321630-F4ktryMUGsaw8PEQmg6Exgyef64oB5i0-0-1ddc456556e5cef9966e796d46c83977)
图1-24 NamesrvStartup Debug界面
![029-2](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/029-2.jpg?sign=1739321630-5CFSgo0OSAbLcb6WzM9NhuyHZ1NU2ouB-0-b7d6a46e5ceafa5fb6548f7826f8b0d0)
图1-25 NamesrvStartup Debug Configuration界面
第二步:单击Environment variables后面的按钮,弹出Environment Variables界面,如图1-26所示。
![030-1](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/030-1.jpg?sign=1739321630-LFsVKOTfUgIGpFpwgDJUHXykyDNlZh0v-0-e8ec9808e9cb7bcf3e840adc4d60af25)
图1-26 Environment Variables列表
第三步:单击“+”,在Name输入框中输入ROCKETMQ_HOME,在Value输入框中输入源码的保存路径。单击OK按钮,回到Debug Configuration界面。再单击OK按钮,如图1-27所示。
![030-2](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/030-2.jpg?sign=1739321630-2HLixNiQOi7HRH2kAjAIv6YH1DVRQb9y-0-23ba75e77a1fde9a961b160393a6efcd)
图1-27 增加ROCKETMQ_HOME环境变量
第四步:在RocketMQ运行主目录中创建conf、logs、store文件夹。
第五步:从RocketMQ distribution部署目录中将broker.conf、logback_broker.xml、logback_namesrv.xml等文件复制到conf目录中,按需修改logback_broker.xml、logback_namesrv.xml文件中日志文件的目录,broker.conf文件目录内容如代码清单1-9所示。
代码清单1-9 broker.conf文件
brokerClusterName=DefaultCluster brokerName=broker-a brokerId=0 #nameServer地址,分号分割 namesrvAddr=127.0.0.1:9876 deleteWhen=04 fileReservedTime=48 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH # 存储路径 storePathRootDir=D:\\rocketmq\\store #CommitLog存储路径 storePathCommitLog=D:\\rocketmq\\store\\commitlog # 消费队列存储路径 storePathConsumeQueue=D:\\rocketmq\\store\\consumequeue # 消息索引存储路径 storePathIndex=D:\\rocketmq\\store\\index #checkpoint文件存储路径 storeCheckpoint=D:\\rocketmq\\store\\checkpoint #abort文件存储路径 abortFile=D:\\rocketmq\\store\\abort
第六步:在IntelliJ IDEA Debug中运行NamesrvStartup,并输出“The Name Server boot success. Serializetype=JSON”。
2. 启动Broker
第一步:展开Broker模块,鼠标右键执行BrokerStartup.java,会提示需要配置ROCKETMQ_HOME。在idea右上角选中Debug Configurations,在弹出的界面中选择arguments选项卡,配置-c属性,指定broker配置文件的路径,如图1-28所示。
![031-1](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/031-1.jpg?sign=1739321630-kqbBsU8V6DKvnTr8a2ze2g7l87W9SggX-0-cb6b2e5bc9af71556d7bffe7f422c831)
图1-28 设置环境变量
第二步:切换选项卡Environment,配置RocketMQ主目录和broker配置文件,如图1-29所示。
![032-1](https://epubservercos.yuewen.com/33872A/21276926801738806/epubprivate/OEBPS/Images/032-1.jpg?sign=1739321630-SSvDXzkjRSslL3EpzgBWX0z2YrNtquLX-0-bb0c582321f4950f12a206266170b0e0)
图1-29 运行或调试运行时的环境设置
第三步:以Debug模式运行BrokerStartup.java,查看 ${ROCKET_HOME}/logs/broker. log文件,未报错则表示Broker启动成功,如代码清单1-10所示。
代码清单1-10 Broker启动日志截图
2021-05-01 17:14:27 INFO PullRequestHoldService - PullRequestHoldService service started 2021-05-01 17:14:28 INFO main - register broker to name server 127.0.0.1:9876 OK 2021-05-01 17:14:28 INFO main - The broker[broker-a, 192.168.41.1:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876 2021-05-01 17:14:37 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes 2021-05-01 17:14:37 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 534 bytes 2021-05-01 17:14:38 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK 2021-05-01 17:14:41 INFO ClientManageThread_1 - new consumer connected, group: please_rename_unique_group_name_4 CONSUME_PASSIVELY CLUSTERING channel: ClientChannelInfo [channel=[id: 0x5babb0b1, L:/192.168.41.1:10911 - R:/192.168.41.1:50635], clientId=192.168.41.1@15140, language=JAVA, version=253, lastUpdateTimestamp=1529054081078] 2021-05-01 17:14:41 INFO ClientManageThread_1 - subscription changed, add new topic, group: please_rename_unique_group_name_4 SubscriptionData [classFilterMode=false, topic=%RETRY%please_rename_unique_group_name_4, subString=*, tagsSet=[], codeSet=[], subVersion=1529053720311, expressionType=null] 2021-05-01 17:14:41 INFO ClientManageThread_1 - subscription changed, add new topic, group: please_rename_unique_group_name_4 SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1529053720326, expressionType=null] 2021-05-01 17:14:41 INFO ClientManageThread_1 - registerConsumer info changed ConsumerData [groupName=please_rename_unique_group_name_4, consumeType=CONSUME_PASSIVELY, messageModel=CLUSTERING, consumeFromWhere=CONSUME_FROM_FIRST_OFFSET, unitMode=false, subscriptionDataSet=[SubscriptionData [classFilterMode=false, topic=%RETRY%please_rename_unique_group_name_4, subString=*, tagsSet=[], codeSet=[], subVersion=1529053720311, expressionType=null], SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1529053720326, expressionType=null]]] 192.168.41.1:50635 2021-05-01 17:14:41 INFO ClientManageThread_1 - new producer connected, group: CLIENT_INNER_PRODUCER channel: ClientChannelInfo [channel=[id: 0x5babb0b1, L:/192.168.41.1:10911 - R:/192.168.41.1:50635], clientId=192.168.41.1@15140, language=JAVA, version=253, lastUpdateTimestamp=1529054081079]
3. 使用RocketMQ提供的实例验证消息发送与消息消费
第一步:修改org.apache.rocketmq.example.quickstart.Producer示例程序,设置消息生产者的NameServer地址,如代码清单1-11所示。
代码清单1-11 消息发送示例程序
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); for (int i = 0; i < 1; i++) { try { Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes (RemotingHelper.DEFAULT_CHARSET)/* Message body */ ); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } }
第二步:运行示例程序,查看运行结果,如果输出结果如代码清单1-12所示,则表示消息发送成功。
代码清单1-12 消息发送结果
SendResult [sendStatus=SEND_OK, msgId=C0A8006606EC18B4AAC24BC584450000, offsetMsgId=C0A8290100002A9F00000000000000B2, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=0]
第三步:修改org.apache.rocketmq.example.quickstart.Consumer示例程序,设置消息消费者的NameServer地址,如代码清单1-13所示。
代码清单1-13 消息消费示例程序
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
第四步:运行消息消费者程序,如果输出如代码清单1-14所示消息,则表示消息消费成功。
代码清单1-14 消息消费结果
Consumer Started. ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0, storeSize=178, queueOffset=1, sysFlag=0, bornTimestamp=1529053736201, bornHost=/192.168.41.1:50331, storeTimestamp=1529053736210, storeHost=/192.168.41.1:10911, msgId=C0A8290100002A9F0000000000000164, commitLogOffset=356, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1529053736226, UNIQ_KEY=C0A800662C8C18B4AAC24BC70D080000, WAIT=true, TAGS=TagA}, body=16]]]
消息发送与消息消费都成功,说明RocketMQ调试环境已搭建成功。
[1]GitHub代码库链接:https://github.com/apache/rocketmq.git。