1. 首页
  2. >
  3. 编程技术
  4. >
  5. Java

消息中间件RocketMQ

RocketMQ的前世今生

RocketMQ是一款阿里巴巴开源的消息中间件,在2017年9月份成为Apache的顶级项目,是国内首个互联网中间件在 Apache 上的顶级项目。

RocketMQ的起源受到另一款消息中间件Kafka的启发。最初,淘宝内部的交易系统使用了淘宝自主研发的Notify消息中间件,使用Mysql作为消息存储媒介,可完全水平扩容。为了进一步降低成本和提升写入性能,需要在存储部分可以进一步优化,2011年初,Linkin开源了Kafka这个优秀的消息中间件,淘宝中间件团队在对Kafka做过充分Review之后,被Kafka无限消息堆积,高效的持久化速度所吸引。

不过当时Kafka主要定位于日志传输,对于使用在淘宝交易、订单、充值等场景下还有诸多特性不满足,例如:延迟消息,消费重试,事务消息,消息过滤等,这些都是一些企业级消息中间件需要具备的功能。为此,淘宝中间件团队重新用Java语言编写了RocketMQ,定位于非日志的可靠消息传输。不过随着RocketMQ的演进,现在也支持了日志流式处理。

目前RocketMQ经过了阿里多年双十一大促的考验,性能和稳定性得到了充分的严重。目前在业界被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binlog分发等场景。

RocketMQ整个产品发展历史主要经历以下几个阶段:

消息中间件RocketMQ

阶段1:Metaq(Metamorphosis) 1.x

由开源社区 killme2008 维护,开源社区地址:https://github.com/killme2008/Metamorphosis,最后一次更新是在2017年1月份。

阶段2:Metaq 2.x

于 2012 年 10 月份上线,在淘宝内部被广泛使用。

阶段3:RocketMQ 3.x

基于阿里内部开源共建原则, RocketMQ 项目只维护核心功能,且去除了所有其他运行时依赖,核心功能最简化。每个 BU 的个性化需求都在 RocketMQ 项目之上进行深度定制。 RocketMQ 向其他 BU 提供的仅仅是Jar 包,例如要定制一个 Broker,那么只需要依赖 rocketmq-broker 这个 jar 包即可,可通过 API 迕行交互,如果定制 client,则依赖 rocketmq-client 这个 jar 包,对其提供的 api 进行再封装。开源社区地址:https://github.com/alibaba/RocketMQ,目前已无法访问。

阶段4:进入Apache

2016年11月28日,阿里巴巴向 Apache 软件基金会捐赠消息中间件 RocketMQ,成为 Apache 孵化项目。美国时间 2017 年 9 月 25 日,Apache 软件基金会(ASF)宣布 Apache®RocketMQ™ 已孵化成为 Apache 顶级项目(TLP ),是国内首个互联网中间件在 Apache 上的顶级项目。官网地址:http://rocketmq.apache.org/

与其他MQ对比

目前业界还有很多其他MQ,如Kafka、RabbitMQ、ActiveMQ、Apache Pulsar等。下图列出了全球范围内这些MQ在2018.12~2019.12一年时间内,在Google Trends的搜索频率,某种程度可以反映出这些中间件的火爆程度。

消息中间件RocketMQ

从这张图上,我们可以看出来,Kafka是一枝独秀,RabbmitMQ紧随其后,ActiveMQ和Apache Pulsar也有一定的占比。而RocketMQ的搜索量可以说是微乎其微。

对于除了RocketMQ的其他几个MQ产品,可以根据这张图初步对比下流行程度。但是对于RocketMQ必须排除在外,因为一些原因,很多国内的用户无法通过Google进行搜索,因此关于RocketMQ的统计实际上是不准确的。

而在这里,我们主要对比的是RocketMQ与其他MQ有哪些功能特性上的差异。功能特性,主要取决于产品定位,如Kafka定位于高吞吐的流失日志和实时计算场景;ActiveMQ、RabbitMQ等则定位于企业级消息中间件,因此提供了很多企业开发时非常有用的功能,如延迟消息、事务消息、消息重试、消息过滤等,而这些特性Kafka都不具备,但是这类产品的吞吐量要明显的低于Kafka。

RocketMQ则是结合了Kafka和ActiveMQ、RabbitMQ的特性。在性能上,可以与Kafka抗衡;而在企业级MQ的特性上,则具备了很多ActiveMQ、RabbitMQ提供的特性。因此,企业在选择消息中间件选型时,RocketMQ是非常值得考虑的一款产品。

为什么要使用消息中间件

对于很多消息中间件的初学者,一个比较困惑的地方在于,为什么要使用消息中间件。本文通过介绍消息中间件的两大特性:异步结构、流量削峰,来对这个问题进行解答。

异步解耦

当今微服务架构盛行,从传统的巨石应用拆分为多个微服务。在进行服务化之后,网页或者手机客户端发起的业务请求需要将后端不同的服务进行组合调用来实现业务处理。在《企业IT架构转型》一书中提到,目前淘宝的订单创建流程需要调用超过200个服务,如下图所示:

消息中间件RocketMQ

如果所有业务逻辑均是在一个进程中顺序执行的方式,完成超过200次的远程服务调用,就算所有服务的调用时间都控制在20ms内返回,整个订单的创建时间也会超过4s,这个时间长度对于今天互联网时代的用户来说已经超过了忍耐极限。

显然我们可以想到将交易创建流程进行异步化,对于有严格先后调用的关系的服务保持顺序执行,对于所有能够异步执行的服务都进行异步化处理。进行异步化处理时,将需要发送的消息投递到消息中间件中,再有消息中间件的消费端进行处理。而消息发送方将消息发送到消息中间件中,即可返回,无需等待消费者消费完成。下图展示了淘宝交易流程异步化后的处理示意图:

消息中间件RocketMQ

通过异步化,可以极大的简化了整个交易流程创建的耗时,提升用户的体验。而异步化可能带来的数据不一致问题,则可以通过其他技术手段来保证最终一致,不在本文讲述范畴内。

流量削峰

互联网公司通常都会举办很多线上活动,活动开始时会有大量的用户涌入进行访问,例如大家熟悉的双十一秒杀,又或者春节火车票抢购,一趟列车出可以购票时,会有大量的用户同一时间去抢。对于这些场景,业务系统瞬时要承受的压力可能是平时的几十甚至上百倍,超过系统的最大处理能力,如果不能很好的处理这些流量,那么就很有可能会打挂下游数据库,导致业务系统瘫痪,不能正常提供服务。

此时可以利用消息中间件进行流量削峰,如下图所示:

消息中间件RocketMQ

对于流量洪峰,可以将用户的请求包装成一个消息,投递到消息中间件中,之后异步的进行处理,下游业务系统根据自己多大的处理能力,进行匀速的消费,从而达到削峰填谷到的作用。可以看到,在削峰之前,流量有明显的波峰和波谷,但是在削峰之后,流量可以允许的进行处理。因此流量削峰还有另外一个比较洋气的名字,称之为流量整形。

消息投递模式

消息中间件( Message Oriented Middleware,简称MOM)在企业开发中变得越来越重要。本文介绍消息中间件中的四种消息投递模型,主要是介绍模型的核心特性,以及不同模型之前的区别。这四种模型分别是:

  • PTP模型
  • Pub/Sub模型
  • Partition模型
  • Transfer模型(笔者自己起的名字)

其中PTP模型和Pub/Sub模型在JMS(Java Message Service)规范中有定义,消息中间件ActiveMQ就实现了JMS规范。JMS规范旨在于为消息中间件厂商提供了一个规范,使得Java应用可以更加容易的访问不同的消息中间件产品,类似于我们可以通过JDBC规范定义的相关接口不同厂商的数据库(Mysql/Oracle/SqlServer等)产品。然而一些消息中间件,并没有实现JMS规范,而是自己设计出了一套模型,例如Kafka和RocketMQ就采用了Partition模型。此外业界还有一些其他的消息投递模型,例如Transfer模型,这是笔者自己起的名字。

PTP模型

Point-to-Point,点对点通信模型。PTP是基于队列(Queue)的,一个队列可以有多个生产者,和多个消费者。消息服务器按照收到消息的先后顺序,将消息放到队列中。队列中的每一条消息,只能由一个消费者进行消费,消费之后就会从队列中移除。

消息中间件RocketMQ

需要注意的是,尽管这里使用Queue的概念,但并不是先进入队列消息,一定会被先消费。在存在多个下游Consumer情况下,一些消息中间件,例如ActiveMQ,为了提升消费能力,会将队列中的消息分发到不同Consumer并行进行处理。这意味着消息发送的时候可能是有序的,但是在消费的时候,就变成无序了。

为了保证消费的有序,一些MQ提供了"专有消费者”或者"排他消费者”的概念,在这种情况下,队列中的消息仅允许一个消费者进行消费,如果存在多个消费者,那么从中选择一个。但是,这意味着该消息在处理中没有了并行性。如果消息量很多的情况下,将会产生消息积压。

为了解决"专有消费者”的性能问题,一些消息中间件采用分区的概念来解决性能问题,我们将在后文进行介绍。

Pub/Sub模型

publish-and- subscribe, 即发布订阅模型。在Pub/Sub模型中,生产者将消息发布到一个主题(Topic)中,订阅了该Topic的所有下游消费者,都可以接收到这条消息。如下图:

消息中间件RocketMQ

通常情况下,一条消息只要被消费一次就行了,那么什么情况下需要所有的消费者都对这条消息进行消费呢?最典型的情况就是需要在内存中对数据进行缓存,并需要实时进行更新。

例如,笔者做过一个违禁词系统,对用户输入的评论内容进行违禁词汇检测。这个违禁词系统,部署了在N台服务器上,为了提升检测性能,每台机器都会将违禁词系统的词库全量加载内存中,之后检测时,对于用户的评论内容进行分词,判断分词后的内容有没有出现在违禁词词汇中。因为词库中不可能包含所有的敏感词,所以还要支持动态的添加敏感词。这个时候,Pub/Sub模型就发挥作用了,每个机器上都启动一个consumer,当生产者发送一个敏感词到Topic中,所有的消费者都会接受到这些消息,更新敏感词库。

Partition模型

为了解决在PTP模型下,有序消息需要通过"专有消费者”消费带来的性能问题,一些消息中间件,如rocketmq,kafka采用了Partition模型,即分区模型,如下所示:

消息中间件RocketMQ

生产者发送消息到某个Topic中时,最终选择其中一个Partition进行发送。你可以将Parition模型中的分区,理解为PTP模型的队列,不同的是,PTP模型中的队列存储的是所有的消息,而每个Partition只会存储部分数据。

对于消息者,此时多了一个消费者组的概念,Paritition会在同一个消费者组下的消费者中进行分配,每个消费者只会消费分配给自己的Paritition。例如上图中,consumer 1分配到了parititon1,consumer 2分配到了parititon2和parititon3,consumer N分配到了paritition 4。

通过这种方式,Paritition模式巧妙的将PTP模型和Pub/Sub模型结合在了一起。

对于PTP模型,一条消息只会由一个消费者进行消费,而Partition模型中每个分区最终也只会有一个消费者进行消费。对于通过"专有消费者"来保证全局消费有序的场景,在Partition模型中,只需保证创建的Topic只有一个Partition即可,这个Paritition最终也只会分配其中一个消费者。

另外,在绝大部分场景下,我们没有必要保证全局有序,例如一个订单产生了3条消息,分别是订单创建,订单付款,订单完成。消费时,要按照这个顺序消费才能有意义。但是订单之间是可以并行消费的,例如将订单1产生的3条消息发送到Partiton 1,将订单2产生的3条消息发送到Partition 2,如此便达到了不同订单之间的并行消费。

对于Pub/Sub模型,一条消息所有的下游消费者都可以进行消费。在Paritition模型中,只需要为每个消费者设置成不同的消费者组即可。

然而,过多的消费者组,会给消息中间件运维带来麻烦。所以一些消息中间件,结合了Partition模型和Pub/Sub模型。例例如RocketMQ,支持为消费者组设置消费模式,如果是集群模式,就按照上述描述进行消费,如果是广播模式,就按照Pub/Sub模型进行消费。

当然,Partition模型也不全是优点,其最大的限制在于Partition数量是固定的(虽然可以调整),针对同一个消费者组,一个Partition最多只可以分配给其中一个消费者。当消费者的数量大于Partition数量时,这些多出来的消费者将无法消费到消息。一些消息中间件对此进行了优化,即在对单个消费者内,同时启动多个线程,来消费这个Partition中的数据,当然前提是要求消息不是有序的,对于有序的消息,只能使用一个线程按顺序消费这个Partition中的数据。

Transfer模型

Paritition模型中的消费者组概念很有用,同一个Topic下的消息可以由多个不同业务方进行消费,只要使用不同的消费者组即可,不同消费者组消费到的位置单独记录,互不影响。 但是,Paritition模型还是限制了消费者数量不能多于分区数。

因此,又有了另外一种消费模型,笔者称之为Transfer模型,如下图所示:

消息中间件RocketMQ

生产者还是将消息发送到Topic中,针对一个Topic,可以创建多个通道,笔者称之为channel。与分区不同的是,发送到Topic中的每条消息,都会转发到每个channel,因此每个channel都有这个Topic的全量数据。当然,没有必要把真的把消息体完整的拷贝一份到channel中,可以只记录一下消息元数据,表示有一条放到这个channel中了。

消费者在消费消息时,必须指定从哪个channel消费。多个消费者消费同一个channel时,每条消息只会有一个消费者消费达到,这一点与PTP模型类似。事实上,我们可以认为,消费了同一个channel的消费者,就自动组成了一个消费者组。但是,与Partition模型不同的是,这里没有分区的概念,因此消费者的数量可以是任意的。事实上,GO语言编写的NSQ消息中间件,采用的就是这种模型。

当然,这种模型与PTP一样,也不能保证本消息有序,除非通过类似于”专用消费者”的概念。

集群搭建

首先会对rocketmq集群四种部署模式进行介绍,包括:单主模式,多master模式,多master多slave模式异步复制,多master多slave模式同步复制,对比各种模式的优缺点。接着将每一种模式部署成一个集群,因此总共有4个集群,由一个NameServer集群进行管理。最后会介绍常见部署错误问题的解决方案。

部署模式介绍

一个完整的RocketMQ集群由NameServer集群Broker集群Producer集群Consumer集群组成。本节主要介绍NameServer集群,Broker集群的搭建。

一个NameServer集群可以管理多个Broker集群,每个Broker集群由多组broker复制组构成,多个broker复制组通过指定相同的集群名称,来构成一个Broker集群。

具体来说,每个broker复制组都满足以下几点:

  • 包含一个master节点,零个或者多个slave节点,且这些节点需要指定相同的broker名称;不同的broker复制组的broker名称必须不同。
  • master和slave通过brokerId参数进行区分。master的 brokerId参数必须是 0,slave 的 brokerId 必须是大于0的整数,如果有多个slave,这些slave的brokerId需要指定为不同的值。
  • master可读可写,slave只可以读,master通过主从复制的方式将数据同步给slave,支持同步复制和异步复制两种复制方式,目前master宕机后,slave不能自动切换为master

基于Broker复制组的特性,一个Broker集群通常有多种部署方式:

1. 单个 Master

集群中只有一个broker复制组,且只包含一个master节点。这种方式部署风险较大,一旦 Broker 重启或者宕机时,会导致整个服务不可用,通常是开发调试时使用,不建议线上环境使用

2. 多 Master 模式

集群中有多个broker复制组,且都只有master节点,没有slave节点。例如 2 个 master 或者 3 个 master节点。

优点: 配置简单,单个 Master 宕机或重启维护对应用无影响,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)。性能最高。

缺点: 单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。

3. 多 Master 多 Slave 模式,异步复制

集群中有多个broker复制组,且每个复制组都有master节点,也有slave节点。例如:每个 master 配置一个 slave。HA 采用异步复制方式,主备有短暂消息延迟,毫秒级。

优点: 即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为 Master 宕机后,消费者仍然可以 从 Slave 消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。

缺点:Master 宕机,磁盘损坏情况,会丢失少量消息。

4. 多 Master 多 Slave 模式,同步复制

与第三种方式类似,不同的是,HA 采用同步复制,生产者发送发送消息时,只有再主备都写成功,才向应用返回成功。

优点: 数据与服务都无单点,Master 宕机情况下,消息无延迟,服务可用性与数据可用性都非常高

缺点: 性能比异步复制模式略低,大约低 10%左右,发送单个消息的 RT 会略高。

接下来,笔者将将演示在Linux操作系统中如何搭建一个单节点NameServer集群,以及上述四种Broker集群,并由这个单节点的NameServer集群来管理这四个Broker集群。

注意:在实际生产环境中,NameServer以及每个Broker节点(不管是master还是slave),都是部署在不同的机器上的。这里简单起见,将通过伪分布式的方式进行搭建,即所有节点都运行在一台机器上。如果读者希望搭建完整的分布式集群,可以使用vmvare/virtualbox等工具,只需要将本文的配置拷贝即可。

前提条件

wRocketMQ NameServer和Broker是基于Java 开发的,需要安装JDK,且需要保证二者版本的匹配。下图列出安装/运行RocketMQ需要的JDK版本。

消息中间件RocketMQ

本文以RocketMQ 4.6.0版本为例进行讲解,对应JDK版本为1.8。本文不讲解JDK如何安装,读者可自行查阅相关资料。确保JDK的版本>=1.8,可以通过如下方式验证:

$ java -versionjava version "1.8.0_131" Java(TM) SE Runtime Environment (build 1.8.0_131-b11) Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode

下载安装

下载

该地址列出了RocketMQ所有发布的版本:https://github.com/apache/rocketmq/releases

这里将RocketMQ安装到Linux文件系统的/opt目录,首先进入/opt目录

cd /opt

可以直接从github下载,但是网速较慢

$ wget https://github.com/apache/rocketmq/archive/rocketmq-all-4.6.0.zip

网速慢的同学也可以从国内镜像下载:

$ wget https://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.6.0/rocketmq-all-4.6.0-bin-release.zip

下载之后进行解压缩:

$ unzip rocketmq-all-4.6.0-bin-release.zip

解压目录说明

rocketmq-all-4.6.0-bin-release ├── benchmark #性能测试脚本 ├── bin       #命令行工具 ├── conf      #配置文件目录 ├── lib       #依赖的第三方类库 ├── LICENSE ├── NOTICE └── README.md

设置ROKCKET_HOME环境变量:

在这里,将我们将RocketMQ安装包的解压目录设置为ROCKETMQ_HOME环境变量。例如笔者的解压目录为:

$ pwd/opt/rocketmq-all-4.6.0-bin-release

为了以后升级方便,我们创建一个软连接:

sudo ln -s /opt/rocketmq-all-4.6.0-bin-release rocketmq

修改/etc/profile,添加以下一行:

export ROCKETMQ_HOME=/opt/rocketmq

执行以下命令,使得环境变量生效

source /etc/profile

验证环境变量生效:

$ echo $ROCKETMQ_HOME /opt/rocketmq

启动NameServer

启动

$ nohup sh bin/mqnamesrv &

验证启动成功

$ jps -l 3961157 sun.tools.jps.Jps 3953057 org.apache.rocketmq.namesrv.NamesrvStartup #NameServer进程

NameServer默认监听9876端口,也可以通过如下方式验证:

$ lsof -iTCP -nP | grep 9876 java    3953057 tianshouzhi.robin   65u  IPv6 134849198      0t0  TCP *:9876 (LISTEN)

设置NAMESRV_ADDR环境变量,修改etc/profile,添加以下内容:

export NAMESRV_ADDR=localhost:9876

并执行"source /etc/profile"使得其生效

启动Broker

${ROCKETMQ_HOME}/conf目录下,提供了我们讲解到的RocketMQ四种部署模式的demo配置文件,如下所示:

    conf     ├── 2m-2s-async //多Master多Slave模式,异步复制     │   ├── broker-a.properties     │   ├── broker-a-s.properties     │   ├── broker-b.properties     │   └── broker-b-s.properties     ├── 2m-2s-sync //多Master多Slave 模式,同步复制     │   ├── broker-a.properties     │   ├── broker-a-s.properties     │   ├── broker-b.properties     │   └── broker-b-s.properties     ├── 2m-noslave //多Master模式     │   ├── broker-a.properties     │   ├── broker-b.properties     │   └── broker-trace.properties     └── broker.conf //单Master模式

在实际生产环境中,你可以选择其中一种模式进行部署。从学习的角度,笔者将详细讲解每一种模式,每种模式部署为一个集群,因此总共会部署4个集群。

另外,生产环境中至少需要部署为双主模式,每个机器只会部署一个broker,因此只使用broker.conf配置文件即可,根据要配置的节点的类型,将其他模式下的配置复制到broker.conf,或者直接修改broker.conf。

单Master模式

修改配置文件:

单master模式可以使用conf目录下的broker.conf 配置文件,内容如下所示:

    #集群名称     brokerClusterName=single-master     #broker复制组名称     brokerName=broker-a     #nameserver地址     namesrvAddr=127.0.0.1:9876     #brokerId,因为是master节点,所以这里设置为0     brokerId=0     #监听端口     listenPort=10911     #rocketmq定时清除     deleteWhen=04     #文件保留时间,默认48小时     fileReservedTime=48     #broker角色,异步复制     brokerRole=ASYNC_MASTER     #异步刷盘     flushDiskType=ASYNC_FLUSH     #存储目录     storePathRootDir=/data/rocketmq/single-master/broker-a/store     storePathCommitLog=/data/rocketmq/single-master/broker-a/store/commitlog

注意:如果配置项名称或者值写错,broker启动时并不会报错,会使用默认值替代,常见错误:如在=号两边加了空格,这里是不需要的。

启动通过bin目录下的mqbroker脚本。由于默认的配置,启动后会立即占用8G内存,如果机器内存不够,可以修改bin/runbroker.sh,找到以下这一行:

JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"

将其修改为:

JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g -Xmn1g"

启动:

$ nohup sh bin/mqbroker -c conf/broker.conf &

注意:broker启动时不会读取broker.conf中的配置,尽管也可以启动,但是如果需要使得配置文件生效,必须通过-c参数进行指定。

验证启动成功:

    $ jps -l     3961157 sun.tools.jps.Jps     3960977 org.apache.rocketmq.broker.BrokerStartup     3953057 org.apache.rocketmq.namesrv.NamesrvStartup

NameServer默认监听在10911端口,也可以通过以下方式验证:

    $ lsof -iTCP -nP | grep 10911     java    37686 tianshouzhi.robin  107u  IPv6 137040246      0t0  TCP *:10911 (LISTEN)

如果启动失败,可以通过以下命令查看错误的具体信息:

tail -200f ~/logs/rocketmqlogs/broker.log

测试发送/消费消息

安装包bin目录下提供了一个tools.sh工具,我们可以通过其来测试发送/接收消息。

测试发送消息:

执行以下命令将会往一个名为TopicTest主题中发送1000条消息

    $ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer      SendResult [sendStatus=SEND_OK, msgId=FDBDDC0300FF00010001022700120225003C3D4EAC696720298203E7,      offsetMsgId=AC11000100002A9F0000000000037567,      messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3],      wqueueOffset=249]     ...

测试消费消息:

执行以下命令,将会之前的消费1000条消息

    $ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer     ConsumeMessageThread_%d Receive New Messages: [MessageExt…      ...

这里我们是通过命令行工具来发送/消费消息,在后文中,我们将介绍如何通过API的方式来完成相同的功能。

查看集群列表信息:

    $ sh bin/mqadmin clusterList -n localhost:9876     #Cluster Name    #Broker Name   #BID        #Addr           #Version   #...(略)     single-master    broker-a      0        192.168.1.3:10911     V4_6_0    

输出的每一列说明如下:

  • Cluster Name:集群的名称,即brokerClusterName配置项的值
  • Broker Name:Broker的名称,即brokerName配置项的值
  • BID:Broker的ID,这里显示为0,即brokerId配置项的值
  • Addr:监听的IP/端口,供生产者/消费者访问,端口即listenPort配置项的值
  • Version:broker的版本

多Master模式

这里演示的多master模式是双主模式:包含2个master节点,没有slave节点。如前所述,这里是伪分布式,在一台机器上启动两个master节点。我们需要对conf/2m-noslave目录下的2个配置文件进行一些修改,否则会与前面搭建的单master模式存在一些冲突,如监听的端口和保存数据的路径等。

修改后的结果如下所示:

conf/2m-noslave/broker-a.properties

    brokerClusterName=2m-noslave     listenPort=11911     namesrvAddr=127.0.0.1:9876     brokerName=2m-broker-a     brokerId=0     deleteWhen=04     fileReservedTime=48     brokerRole=ASYNC_MASTER     flushDiskType=ASYNC_FLUSH     storePathRootDir=/data/rocketmq/2m-noslave/broker-a/store/     storePathCommitLog=/data/rocketmq/2m-noslave/broker-a/store/commitlog/     storePathConsumerQueue=/data/rocketmq/2m-noslave/broker-a/store/consumequeue/

conf/2m-noslave/broker-b.properties

    brokerClusterName=2m-noslave     listenPort=12911     namesrvAddr=127.0.0.1:9876     brokerName=2m-broker-b     brokerId=0     deleteWhen=04     fileReservedTime=48     brokerRole=ASYNC_MASTER     flushDiskType=ASYNC_FLUSH     storePathRootDir=/data/rocketmq/2m-noslave/broker-b/store/     storePathCommitLog=/data/rocketmq/2m-noslave/broker-b/store/commitlog/     storePathConsumerQueue=/data/rocketmq/2m-noslave/broker-b/store/consumequeue/

在这里,我们将两个配置文件中的brokerClusterName都改成了2m-noslave,表名这两个broker节点将组成一个新的集群。也别修改了listenPort配置项以监听不同的端口,此外,我们修改了三个storePath前缀的配置项,将数据存储到不同的目录中。

特别需要注意的是:一些同学可能认为brokerClusterName已经不同了,没有必要修改brokerName配置项,这是一种误解。在RocketMQ中,一个NameServer集群可以多个Broker集群,但是broker集群的名称并没有起到命名空间的作用,因此管理的所有Broker集群下的broker复制组的名称都不能相同。

启动broker-a

nohup sh bin/mqbroker -c conf/2m-noslave/broker-a.properties &

启动broker-b

nohup sh bin/mqbroker -c conf/2m-noslave/broker-b.properties &

在启动之后,当我们在查看集群列表信息时,如下:

消息中间件RocketMQ

这里显示了2个broker集群:single-master和2m-noslave,其中后者存在两个节点。

3.2.3 多 Master 多 Slave 模式,异步复制

该模式需要使用conf/2m-2s-async目录下的四个配置文件。同样我们需要修改brokerClusterName,listenPort,brokerName以及存储路径。特别需要注意的是对于slave,其brokerRole配置项需要为SLAVE,brokerId是需要时一个大于0的值。

修改后的结果如下所示:

conf/2m-2s-async/broker-a.properties

    brokerClusterName=2m-2s-async     listenPort=13911     namesrvAddr=127.0.0.1:9876     brokerName=2m-2s-async-broker-a     brokerId=0     deleteWhen=04     fileReservedTime=48     brokerRole=ASYNC_MASTER     flushDiskType=ASYNC_FLUSH     storePathRootDir=/data/rocketmq/2m-2s-async/broker-a-0/store/     storePathCommitLog=/data/rocketmq/2m-2s-async/broker-a-0/store/commitlog/     storePathConsumerQueue=/data/rocketmq/2m-2s-async/broker-a-0/store/consumequeue/

conf/2m-2s-async/broker-a-s.properties

    brokerClusterName=2m-2s-async     listenPort=14911     namesrvAddr=127.0.0.1:9876     brokerName=2m-2s-async-broker-a     brokerId=1     deleteWhen=04     fileReservedTime=48     brokerRole=SLAVE     flushDiskType=ASYNC_FLUSH     storePathRootDir=/data/rocketmq/2m-2s-async/broker-a-1/store/     storePathCommitLog=/data/rocketmq/2m-2s-async/broker-a-1/store/commitlog/     storePathConsumerQueue=/data/rocketmq/2m-2s-async/broker-a-1/store/consumequeue/

conf/2m-2s-async/broker-b.properties

    brokerClusterName=2m-2s-async     listenPort=15911     namesrvAddr=127.0.0.1:9876     brokerName=2m-2s-async-broker-b     brokerId=0     deleteWhen=04     fileReservedTime=48     brokerRole=ASYNC_MASTER     flushDiskType=ASYNC_FLUSH     storePathRootDir=/data/rocketmq/2m-2s-async/broker-b-0/store/     storePathCommitLog=/data/rocketmq/2m-2s-async/broker-b-0/store/commitlog/     storePathConsumerQueue=/data/rocketmq/2m-2s-async/broker-b-0/store/consumequeue/

conf/2m-2s-async/broker-b-s.properties

    brokerClusterName=2m-2s-async     listenPort=16911     namesrvAddr=127.0.0.1:9876     brokerName=2m-2s-async-broker-b     brokerId=1     deleteWhen=04     fileReservedTime=48     brokerRole=SLAVE     flushDiskType=ASYNC_FLUSH     storePathRootDir=/data/rocketmq/2m-2s-async/broker-b-1/store/     storePathCommitLog=/data/rocketmq/2m-2s-async/broker-b-1/store/commitlog/     storePathConsumerQueue=/data/rocketmq/2m-2s-async/broker-b-1/store/consumequeue/

依次启动:

消息中间件RocketMQ

查看集群信息:

消息中间件RocketMQ

这里多出了2m-2s-async集群的四个broker节点信息。

多 Master 多 Slave 模式,同步复制

该模式需要使用conf/2m-2s-sync目录下的四个配置文件,与异步复制最大的不同是,需要将master节点的brokerRole配置项需要改为SYNC_MASTER。这里不再赘述。如果是在同一台机器上搭建此模式,记得修对应的参数。

停止

bin目录安装包下有一个mqshutdown脚本,其既可以关闭Broker,也可以关闭NameServer。注意该脚本会将本机上启动的所有Broker或所有NameServer关闭。

停止broker

    $ sh bin/mqshutdown broker     The mqbroker(67521     74023     74153     362837     362958     363070) is running...     Send shutdown request to mqbroker(67521     74023     74153     362837     362958     363070)

停止nameserver

    $ sh bin/mqshutdown namesrv     The mqnamesrv(3953057) is running...     Send shutdown request to mqnamesrv(3953057) OK

常见安装错误

错误1:端口已被占用

    java.net.BindException: Address already in use             at sun.nio.ch.Net.bind0(Native Method)             at sun.nio.ch.Net.bind(Net.java:433)             at sun.nio.ch.Net.bind(Net.java:425)             at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)             at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)             at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)             at

原因:重复监听了同一个端口,通常是对同一个配置文件启动了多次,或者配置listenPort端口未生效。

错误2:MQ已启动

    java.lang.RuntimeException: Lock failed,MQ already started             at org.apache.rocketmq.store.DefaultMessageStore.start(DefaultMessageStore.java:222)             at org.apache.rocketmq.broker.BrokerController.start(BrokerController.java:853)             at org.apache.rocketmq.broker.BrokerStartup.start(BrokerStartup.java:64)             at org.apache.rocketmq.broker.BrokerStartup.main(BrokerStartup.java:58)

原因:多个配置文件中,可能指定了相同的存储路径,检查配置是否正确。

错误3:配置文件不存在

ava.io.FileNotFoundException: conf/2m-2s-async/broker-a-m.properties (No such file or directory)         at java.io.FileInputStream.open0(Native Method)         at java.io.FileInputStream.open(FileInputStream.java:195)         at java.io.FileInputStream.<init>(FileInputStream.java:138)         at java.io.FileInputStream.<init>(FileInputStream.java:93)         at org.apache.rocketmq.broker.BrokerStartup.createBrokerController(BrokerStartup.java:128)         at org.apache.rocketmq.broker.BrokerStartup.main(BrokerStartup.java:58)

配置文件不存在,检查对应目录下是否有此文件

错误4:内存分配失败

未按照前文所述修改bin/runserver.sh,bin/runbroker.sh脚本,导致启动每一个节点时占用内存过多。如果本身机器内存就不足,可以不必同时运行这么多模式。