RocketMQ、Kafka 高性能架构设计对比
消息存储架构
RocketMQ
存储结构:RocketMQ 的消息存储主要基于文件系统。它有 CommitLog 文件用于存储消息的主体内容,消息按照顺序写入 CommitLog。同时,有 ConsumeQueue 文件用于存储消息的索引信息,消费者通过 ConsumeQueue 来快速定位消息在 CommitLog 中的位置。这种存储方式可以保证消息的顺序写入,提高写入性能。
存储策略:采用异步刷盘和同步刷盘两种策略。异步刷盘是指消息写入内存后就返回成功给生产者,然后由后台线程将内存中的消息刷写到磁盘,这种方式可以提供高吞吐量;同步刷盘则是消息写入磁盘成功后才返回成功给生产者,数据安全性更高。
Kafka
存储结构:Kafka 将消息存储在分区(Partition)中,每个分区是一个有序的、不可变的消息序列。消息以日志(Log)的形式存储在磁盘上,日志被分割成多个日志段(Log Segment)。这种分区存储方式方便水平扩展和数据的并行处理。
存储策略:Kafka 采用基于时间和大小的日志段滚动策略。当达到一定的时间间隔或者日志段大小后,会创建新的日志段。并且,Kafka 通过零拷贝(Zero - Copy)技术将消息从磁盘文件直接发送到网络通道,减少了数据复制的开销,提高了性能。
消息传输机制
RocketMQ
消息模型:支持多种消息模型,包括发布 / 订阅(Publish/Subscribe)和点对点(Point - to - Point)模式。在发布 / 订阅模式下,生产者将消息发送到主题(Topic),多个消费者可以订阅该主题获取消息;在点对点模式下,消息会被发送到一个队列,只有一个消费者能消费该消息。
传输协议:采用自定义的协议来传输消息,在网络传输过程中,消息会被封装成特定的格式进行发送和接收。并且 RocketMQ 支持同步和异步两种发送方式,异步发送可以提高生产者的吞吐量。
Kafka
消息模型:主要是发布 / 订阅模式。生产者将消息发送到主题的特定分区,消费者从分区中读取消息。消费者可以通过组(Group)的方式来实现消息的负载均衡和广播消费。
传输协议:使用基于 TCP/IP 的二进制协议进行消息传输。它通过优化网络请求和响应的格式,减少网络开销。同时,Kafka 支持批量发送消息,通过将多个消息组合成一个批次发送,降低网络传输的频率,提高传输效率。
集群架构与扩展性
RocketMQ
集群模式:支持多种集群模式,包括单主模式、双主双从模式等。在双主双从模式下,通过主从复制来保证数据的冗余和高可用性。主节点负责处理读写请求,从节点用于数据备份。
扩展性:可以通过增加节点来扩展集群的处理能力。例如,增加 Broker 节点可以增加消息存储和处理的容量。并且,RocketMQ 的 Name Server 可以动态地发现新的 Broker,方便集群的扩展和管理。
Kafka
集群模式:Kafka 是分布式的消息队列,由多个 Broker 组成集群。每个 Broker 可以包含多个主题的分区,通过分区的分布在不同 Broker 上实现负载均衡和扩展性。
扩展性:具有很好的水平扩展性,通过增加 Broker 节点或者增加主题的分区数量来提升集群的处理能力。并且,Kafka 通过 Zookeeper 来管理集群的元数据,包括主题、分区、消费者组等信息,方便集群的协调和扩展。
消费模型与性能优化
RocketMQ
消费模型:消费者采用拉(Pull)和推(Push)相结合的方式获取消息。实际上是消费者主动拉取消息,但是 RocketMQ 的服务器可以根据消费者的消费能力进行消息推送,这样可以避免消费者被过多的消息淹没。消费者可以通过设置不同的消费策略,如集群消费(多个消费者共同消费一个主题的消息)和广播消费(每个消费者都能收到主题的全部消息)。
性能优化:可以通过调整消费者的并发度、消息批量拉取的数量等方式来优化消费性能。例如,增加消费者的并发度可以加快消息的消费速度。
Kafka
消费模型:消费者也是以拉取消息为主,通过维护消费偏移量(Offset)来记录已经消费的消息位置。消费者组内的成员可以共同消费主题的分区消息,实现负载均衡。
性能优化:可以通过调整消费者组的大小、分区数量等方式来优化性能。例如,增加分区数量可以提高并行度,加快消息的消费速度,同时合理设置消费者组的大小可以避免消费者空闲或者过度繁忙。
RocketMQ 和 Kafka 都是广泛使用的开源消息中间件,它们在设计上都追求高性能、高可用性和可扩展性。下面是 RocketMQ 和 Kafka 在架构设计上的几个关键方面的对比:
1. 架构模型
Kafka:
使用发布/订阅模式。
每个主题可以有多个分区(partition),每个分区可以有多个副本(replica)以实现高可用。
生产者将消息发送到特定的分区,消费者通过消费组(consumer group)来消费消息。
RocketMQ:
支持发布/订阅模式和点对点模式。
每个主题也可以有多个队列(queue),这些队列分布在不同的 Broker 上。
消息被持久化到 CommitLog 文件中,并且可以通过 ConsumerQueue 进行快速定位。
2. 数据存储
Kafka:
消息是以追加的方式写入日志文件,采用顺序写磁盘的方式,保证了较高的吞吐量。
分区的日志文件保存在本地文件系统中,支持数据压缩。
RocketMQ:
消息也是以追加的方式写入 CommitLog 文件,同样使用顺序写来提高性能。
提供了多种存储级别:内存映射文件、文件系统等。
支持异步刷盘机制,可以在一定程度上减少 I/O 延迟。
3. 高可用性
Kafka:
通过分区副本(replication)来实现高可用性。
使用 ZooKeeper 来管理集群元数据和选举 Leader。
可配置 ISR(in-sync replicas)集合确保数据的一致性。
RocketMQ:
通过主从复制(Master-Slave)架构来提供高可用性。
主节点宕机时,从节点可以自动切换为主节点。
名称服务器(Name Server)负责管理和广播路由信息。
4. 扩展性
Kafka:
良好的水平扩展能力,可以通过增加分区数量和Broker数量来提升处理能力。
消费者可以并行地从不同的分区读取数据。
RocketMQ:
也具有良好的水平扩展性,可以通过添加更多的 Broker 或者扩大单个 Broker 的容量来扩展。
消费者可以同时从多个队列拉取消息,从而实现并行消费。
5. 性能表现
Kafka:
在大规模的数据流场景下表现出色。
特别适合于需要高速度和大数据量的消息传递。
RocketMQ:
在延迟敏感的应用场景中表现出色。
设计时考虑到了金融级的可靠性要求,如事务消息、定时消息等功能。
6. 其他特性
Kafka:
提供了丰富的API和客户端库。
支持流处理框架,比如Kafka Streams。
RocketMQ:
提供了消息过滤、延时消息、顺序消息等高级功能。
对分布式事务的支持更为完善。
综上所述,RocketMQ 和 Kafka 都是非常强大的消息中间件,但各自有着不同的优势。选择哪一个取决于具体的业务需求和技术偏好。例如,如果你的应用需要实时处理大量流式数据,那么 Kafka 可能是一个更好的选择;而如果需要一个更加灵活的消息平台,能够很好地支持复杂的业务逻辑,那么 RocketMQ 可能更适合。
01 总体概述
我们都知道,Kafka 与 RocketMQ 是当下业界最主流的两款消息中间件,都可以支持「超高并发」、「超高性能发」、「高可靠」能力,下面是两者的功能对比。
acl是什么
定义
ACL 是 “Access Control List” 的缩写,即访问控制列表。它是一种基于包过滤的访问控制技术,用于控制网络流量和系统资源的访问权限。通过 ACL,可以根据源 IP 地址、目的 IP 地址、端口号、协议类型等条件来允许或禁止特定的网络连接或操作。
工作原理
规则匹配
ACL 包含一系列的规则,这些规则按照一定的顺序排列。当有网络请求或操作发生时,系统会将请求的相关参数(如 IP 地址、端口等)与 ACL 中的规则进行逐一匹配。例如,在一个网络防火墙的 ACL 中,对于一个传入的 TCP 连接请求,会先检查请求的源 IP 地址和目的端口号是否符合 ACL 中的某条规则。
决策执行
根据规则匹配的结果,做出允许或禁止的决策。如果请求符合某条允许规则,那么就允许该请求通过;如果请求符合禁止规则,或者没有符合任何规则(在默认禁止的情况下),则禁止该请求。例如,在企业内部网络的边界路由器上设置 ACL,禁止外部网络对内部某些敏感服务器端口(如财务系统的数据库端口)的访问。
应用场景
网络安全领域
防火墙应用:在防火墙中,ACL 是核心的安全策略配置工具。可以通过设置 ACL 规则来区分合法的和非法的网络流量。例如,允许企业内部员工从特定网段访问互联网,但禁止外部未知 IP 访问企业内部的核心业务服务器。
入侵检测系统(IDS)辅助:IDS 可以利用 ACL 来识别和过滤一些明显的恶意攻击流量。例如,当检测到大量来自同一个外部 IP 地址对内部网络某服务端口的扫描请求时,根据预先设置的 ACL 规则,可以及时阻断这种可疑的流量,防止进一步的入侵行为。
系统资源访问控制
文件系统访问控制:在操作系统的文件系统中,ACL 可以用于控制用户或用户组对文件和目录的访问权限。与传统的基于用户、组和权限位(如 Unix 系统中的 rwx 权限)的访问控制不同,ACL 可以更精细地指定每个用户或用户组对特定文件或目录的访问权限。例如,在 Windows Server 系统中,通过设置文件或目录的 ACL,可以允许某个特定用户对某个文件有只读权限,同时禁止其他用户访问该文件。
数据库访问控制:在数据库管理系统中,ACL 可以用于控制用户对数据库对象(如表、视图、存储过程等)的访问权限。例如,在一个企业级数据库中,可以设置 ACL 规则,允许财务部门的用户查询和更新财务相关的表,而禁止其他部门的用户访问这些敏感数据。
ACL 是 "Access Control List"(访问控制列表)的缩写。它是一种用于管理用户或系统进程对文件、目录、网络资源等访问权限的安全机制。通过 ACL,管理员可以为不同的用户或用户组设置细粒度的访问权限,以确保只有授权的用户才能访问特定的资源。
ACL 的基本概念
主体:请求访问资源的实体,如用户、用户组或其他系统进程。
客体:被访问的资源,例如文件、目录、服务端口等。
权限:允许或禁止的操作类型,如读取(read)、写入(write)、执行(execute)等。
ACL 在不同环境中的应用
文件系统:
在 Unix/Linux 系统中,传统的权限模型只支持所有者(owner)、所属组(group)和其他(others)三类用户的权限设置。而使用 ACL 可以定义更复杂的权限规则。
Windows NTFS 文件系统也支持 ACL 来控制文件和目录的访问权限。
网络设备:
在路由器、交换机等网络设备上,ACL 用来过滤网络流量,根据源 IP 地址、目标 IP 地址、协议类型等条件来决定是否允许数据包通过。
应用程序和服务:
许多数据库系统、Web 服务器(如 Apache, Nginx)以及其他企业级软件都支持 ACL 来控制谁可以访问特定的数据或功能。
消息队列系统:
在消息中间件如 Kafka 或 RocketMQ 中,ACL 用于控制哪些生产者可以发送消息到某个主题,以及哪些消费者可以从某个主题消费消息。
示例
在 Linux 系统中,你可以使用 setfacl
命令来设置文件的 ACL:
# 设置用户 alice 对 /example/file.txt 有读权限
setfacl -m u:alice:r /example/file.txt
# 设置用户组 developers 对 /example/directory/ 有读写权限
setfacl -m g:developers:rw /example/directory/
在网络设备中,一个简单的 ACL 规则可能如下所示,它阻止了来自特定 IP 地址的所有流量:
access-list 100 deny ip 192.168.1.100 any
access-list 100 permit ip any any
以上只是 ACL 的基础介绍。在实际应用中,ACL 的配置可以根据具体的业务需求变得非常复杂,提供了极大的灵活性来保护系统的安全。
如何在Linux系统中查看ACL?如何在消息队列系统中使用ACL?如何保证ACL的安全性?
如何在 Linux 系统中查看 ACL
查看文件和目录的 ACL
在 Linux 系统中,可以使用
getfacl
命令来查看文件和目录的 ACL(访问控制列表)信息。例如,要查看文件example.txt
的 ACL,在终端中输入getfacl example.txt
。输出结果会显示文件的所有者、所属组、权限位以及 ACL 规则。ACL 规则会详细列出每个用户或用户组对该文件的额外访问权限。例如,
user:john:rwx
表示用户john
对该文件有读、写、执行权限。
查看系统范围内的 ACL 相关设置
对于一些与 ACL 相关的系统默认设置,可以查看文件系统挂载选项。通过查看
/etc/fstab
文件,找到文件系统挂载的相关行,检查是否有与 ACL 相关的挂载选项(如acl
选项)。如果文件系统是以支持 ACL 的方式挂载的,才能有效地使用和查看 ACL。
如何在消息队列系统中使用 ACL
定义用户和角色权限
在消息队列系统(如 RabbitMQ、RocketMQ 等)中,首先要定义用户和角色。例如,在 RabbitMQ 中,可以通过管理界面或者命令行工具创建用户,并且为每个用户分配不同的角色(如管理员、生产者、消费者等)。
根据用户角色定义 ACL 规则。对于生产者角色的用户,可以允许其向特定主题或队列发送消息;对于消费者角色的用户,可以允许其从特定主题或队列接收消息。例如,在 RocketMQ 中,可以通过配置文件或者管理命令来设置用户对主题的访问权限,如允许用户
producer_user
向主题order_topic
发送消息。
集成认证机制
消息队列系统通常需要与认证机制相结合来使用 ACL。可以使用外部的认证服务器(如 LDAP)或者内部的用户数据库进行用户认证。当用户连接到消息队列系统时,先进行身份认证,认证通过后再根据 ACL 规则来判断用户对消息队列资源的访问权限。
如何保证 ACL 的安全性
最小权限原则
遵循最小权限原则配置 ACL 规则。只给予用户和系统组件完成任务所需的最少访问权限。例如,在网络访问控制中,如果一个 Web 服务器只需要对外提供 HTTP 服务,那么在 ACL 中只允许外部访问其 80 端口(对于 HTTP),而禁止其他不必要的端口访问。
定期审查和更新 ACL 规则,确保没有过多的权限被授予。随着系统的变化和业务需求的更新,可能会出现一些不再需要的权限,及时发现并删除这些权限可以减少安全风险。
安全审计
建立 ACL 安全审计机制,记录所有 ACL 相关的访问请求和决策。通过分析这些审计日志,可以发现异常的访问模式或潜在的安全漏洞。例如,在数据库系统中,如果发现某个用户频繁地访问其本不应有访问权限的数据表,这可能是安全漏洞或者恶意行为的迹象。
与入侵检测系统(IDS)和入侵防御系统(IPS)相结合,利用它们来检测和防范基于 ACL 绕过等攻击手段。例如,当 IDS 检测到有试图篡改 ACL 规则的网络活动时,可以及时发出警报并采取防御措施。
ACL 主要有哪些类型
标准 ACL
主要基于源 IP 地址进行访问控制。例如,在网络路由器的 ACL 中,标准 ACL 可以用于允许或禁止来自某些特定 IP 网段的流量。标准 ACL 通常只检查源 IP 地址,相对比较简单,适用于一些对访问控制要求不高的场景。
扩展 ACL
扩展 ACL 在标准 ACL 的基础上,还考虑了目的 IP 地址、端口号、协议类型等多种因素。例如,在企业网络防火墙的 ACL 中,扩展 ACL 可以用于允许内部网络用户使用 HTTP 协议访问外部网站,但禁止使用 FTP 协议访问外部网络,这种 ACL 可以提供更精细的访问控制。
基于用户和组的 ACL(在操作系统和应用系统中)
在操作系统(如 Windows、Linux)和一些应用系统(如数据库管理系统)中,ACL 可以基于用户和用户组来定义访问权限。例如,在 Linux 系统中,通过 ACL 可以指定某个用户组对一个目录有读权限,而某个特定用户对该目录有写权限。
如何配置 ACL 规则
网络设备(路由器、防火墙)中的 ACL 配置
在网络设备中,通常使用命令行界面(CLI)来配置 ACL。例如,在 Cisco 路由器中,首先要进入全局配置模式,然后使用
access - list
命令来创建 ACL 规则。可以指定规则编号、访问控制类型(允许或禁止)、源 IP 地址、目的 IP 地址、端口号和协议类型等参数。配置完成后,需要将 ACL 应用到特定的接口(如以太网接口或串行接口)上,通过
interface
命令进入接口配置模式,然后使用ip access - group
命令将之前创建的 ACL 应用到该接口。
操作系统中的 ACL 配置
在 Windows 操作系统中,通过文件或文件夹的属性窗口中的 “安全” 选项卡来配置 ACL。可以添加或删除用户和用户组,并为其设置不同的访问权限(如读取、写入、执行等)。在 Linux 系统中,使用
setfacl
命令来配置文件和目录的 ACL。例如,setfacl -m u:user1:rwx file1
可以为用户user1
设置对文件file1
的读、写、执行权限。
应用系统(如数据库、消息队列)中的 ACL 配置
在数据库系统中,通过管理工具或 SQL 语句来配置 ACL。例如,在 Oracle 数据库中,可以使用
GRANT
和REVOKE
语句来授予和撤销用户对数据库对象的访问权限。在消息队列系统中,如前面提到的,通过管理界面、命令行工具或者配置文件来设置用户对主题、队列等资源的访问权限。
ACL 和防火墙有什么关系
防火墙是 ACL 的应用场景之一
防火墙利用 ACL 来实现网络访问控制的功能。防火墙中的 ACL 规则决定了哪些网络流量可以通过防火墙,哪些需要被阻止。例如,在网络边界防火墙中,ACL 规则可以基于源 IP 地址、目的 IP 地址、端口号和协议类型等因素来区分合法的和非法的网络流量。
ACL 是防火墙的核心技术组件,通过配置不同的 ACL 规则,可以将防火墙设置为允许内部网络访问外部网络、禁止外部网络访问内部敏感区域、允许特定服务的访问等多种模式,从而实现网络安全防护的功能。
防火墙增强了 ACL 的功能和安全性
防火墙不仅可以应用 ACL 规则,还可以提供其他安全功能,如入侵检测、防病毒、VPN 支持等。这些功能与 ACL 相结合,可以更全面地保护网络安全。例如,当防火墙检测到有违反 ACL 规则的可疑流量时,除了阻止该流量,还可以启动入侵检测功能来进一步分析是否存在网络攻击行为。
在 Linux 系统中查看 ACL
在 Linux 系统中,可以使用 getfacl
命令来查看文件或目录的 ACL 设置。这个命令会显示所有针对该文件或目录设置的访问控制条目。
查看 ACL 的示例
# 查看单个文件的 ACL
getfacl /path/to/file
# 查看整个目录及其内容的 ACL
getfacl -R /path/to/directory
输出将包含标准的 Unix 权限(所有者、组和其他)以及任何额外的 ACL 条目。每个条目都会列出用户或组以及对应的权限。
在消息队列系统中使用 ACL
不同的消息队列系统对 ACL 的实现和支持有所不同。以下是 Kafka 和 RocketMQ 中如何配置和使用 ACL 的简要说明:
Kafka 中的 ACL
Kafka 使用基于 ZooKeeper 的 ACL 机制来管理对主题、生产者和消费者的访问控制。你可以通过 kafka-acls.sh
脚本来设置和管理这些规则。
设置 ACL 示例
# 允许用户 alice 生产消息到 topic1
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
--add --allow-principal User:alice --operation Write --topic topic1
# 允许用户 group1 消费来自 topic2 的消息
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
--add --allow-principal Group:group1 --operation Read --topic topic2
RocketMQ 中的 ACL
RocketMQ 也支持 ACL,不过它的配置方式与 Kafka 不同。通常是在 Broker 配置文件中进行设置,并且需要启用名称服务器的 ACL 插件。
启用并配置 ACL 示例
在
broker.conf
中添加以下配置:aclEnable=true aclAccessKey=your_access_key aclSecretKey=your_secret_key
在 Name Server 上启用 ACL 插件:
sh mqnamesrv -c namesrv.properties &
创建 ACL 规则(例如,允许某个 IP 地址范围访问):
sh mqadmin updateAclConfig -b broker-ip:port -a your_access_key -s your_secret_key -t ip -n 192.168.1.0/24
保证 ACL 的安全性
为了确保 ACL 的安全性,可以采取以下几个措施:
最小权限原则:始终为用户和服务分配完成其任务所需的最小权限集。
定期审查:定期检查和审计 ACL 设置,以确保没有过期或不必要的权限存在。
强密码策略:对于使用用户名和密码认证的系统,实施强密码策略,并定期更新密码。
日志记录与监控:开启详细的日志记录,并使用安全信息和事件管理系统 (SIEM) 监控异常行为。
加密通信:对于网络传输的数据,使用 SSL/TLS 加密,防止中间人攻击。
限制外部访问:尽量减少对外部网络开放的服务端口,只暴露必要的服务。
双因素认证:对于敏感操作或高风险环境,实施双因素认证增加安全性。
备份与恢复计划:定期备份 ACL 设置,并制定恢复计划,以防数据丢失或损坏。
通过上述措施,可以有效地提高 ACL 的安全性,保护系统免受未授权访问和潜在的安全威胁。ACL 的主要类型
ACL(访问控制列表)可以根据其应用的上下文分为不同的类型,以下是几种常见的 ACL 类型:
文件系统 ACL:
用于控制用户或用户组对文件和目录的访问权限。
例如,在 Unix/Linux 系统中使用
setfacl
和getfacl
命令来设置和查看 ACL。
网络 ACL:
用于控制网络流量,通常在路由器、交换机等网络设备上配置。
可以基于源 IP 地址、目标 IP 地址、协议类型、端口号等条件来允许或拒绝流量。
应用程序 ACL:
为特定的应用程序或服务定义的访问控制规则。
例如,Web 服务器(如 Apache 或 Nginx)、数据库系统(如 MySQL 或 PostgreSQL)中的访问控制。
消息队列 ACL:
用于控制谁可以向消息队列发送消息,以及谁可以从消息队列接收消息。
例如,在 Kafka 或 RocketMQ 中配置的 ACL 规则。
如何配置 ACL 规则
配置 ACL 规则的具体方法取决于你使用的系统或应用程序。下面是一些常见类型的配置示例:
文件系统 ACL (Linux)
设置 ACL:
# 设置用户 alice 对 /example/file.txt 有读写权限 setfacl -m u:alice:rw /example/file.txt # 设置用户组 developers 对 /example/directory/ 有只读权限 setfacl -m g:developers:r /example/directory/
查看 ACL:
getfacl /path/to/file_or_directory
网络 ACL (Cisco IOS)
创建并应用 ACL:
access-list 100 deny ip 192.168.1.100 any access-list 100 permit ip any any interface FastEthernet0/1 ip access-group 100 in
应用程序 ACL (Apache HTTP Server)
编辑 .htaccess 文件:
<RequireAll> Require all granted Require not ip 192.168.1.100 </RequireAll>
消息队列 ACL (Kafka)
使用 kafka-acls.sh 脚本:
# 允许用户 alice 生产消息到 topic1 kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \ --add --allow-principal User:alice --operation Write --topic topic1 # 允许用户 group1 消费来自 topic2 的消息 kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \ --add --allow-principal Group:group1 --operation Read --topic topic2
ACL 与防火墙的关系
ACL 和防火墙都是网络安全的重要组成部分,但它们的作用层次不同:
防火墙:通常在网络边界运行,用于控制进出网络的数据流。它可以基于 IP 地址、端口、协议等因素过滤流量。防火墙是更宏观的安全措施,它保护整个网络不受外部攻击。
ACL:可以在多个层面上实现,包括网络设备、操作系统、应用程序等。它提供了更细粒度的访问控制,允许管理员为单个资源(如文件、目录、服务)定义详细的访问规则。ACL 是微观层面的安全控制,它保护具体的资源不受未经授权的访问。
两者通常是互补的。防火墙可以阻止大多数未授权的外部访问,而 ACL 则进一步确保即使攻击者进入内部网络,也无法访问敏感资源。结合使用这两种技术可以提供多层次的安全防护。
今天这篇,我们将从「客户端发送模式」、「服务端网络架构」、「服务端存储架构」三个维度来对比一下两者的实现差异,通过这种方式来带你学习高性能编程设计的内功心法。
02 客户端发送模式
我们先从客户端发送这个维度看看 RocketMQ 与 Kafka 的设计差异。
2.1 Kafka 客户端发送模式
首先 Kafka 消息发送客户端采用的是「双端队列 ArrayDeque」数据结构,同时还引入了「批处理 ProducerBatch」、「缓存池 BufferPool 机制」等设计思想,它的消息发送机制如下图所示:
当客户端想要调用 Kafka 的消息发送者发送消息时,消息会首先存入到一个「双端队列 ArrayDeque」中,双端
队列中单个元素为 「ProducerBatch」表示一个发送批次,其最大值会受参数 batch.size 控制,默认大小为 16K。
接着 Kafka 客户端会单独开一个 「Sender 线程」,从「双端队列 ArrayDeque」中获取对应准备好的「ProducerBatch 发送批次」,然后调用「NetworkClient 网络通信组件」将消息按批发送到 Kafka 集群中。
除了 batch.size 参数外,还引入了 linger.ms 参数来控制 Send 线程的发送行为,代表批次要在「双端队列 ArrayDeque」中等待的最小时长。
如果将 linger.ms 设置为 0,表示要立即发送消息。
如果将 linger.ms 设置为大于 0,那么 sender 线程在发送消息时只会从双端队列中获取等待时长大于该参数值的批次。
这里需要注意的是,linger.ms 参数会延长响应时间,但有利于增加吞吐量。有点类似于 TCP 领域的 Nagle 算法。
另外 Kafka 的消息发送,在写入「ProducerBatch 发送批次」时就会按照消息存储协议进行组织数据,在服务端无需解析可以直接写入到文件中,当然如果客户端跟服务端的压缩算法不一致情况除外。
2.2 RocketMQ 客户端发送模式
而 RocketMQ 的消息发送在客户端主要是根据路由选择算法选择一个队列,然后将消息发送到服务端。消息会在服务端按照消息的存储格式进行组织,然后进行持久化等操作。
2.3 小结
综上,Kafka 相对于 RocketMQ 有一个非常大的优势,那就是它的消息格式是在客户端进行组织的,即写入「ProducerBatch 发送批次」时就已经确定了,这就大大节约了 Broker 端的 CPU 压力。
另外 Kafka 在消息发送端引入了「双端队列 ArrayDeque」。可以看出 Kafka 的设计初衷是在追求批处理,这能够提高消息发送的吞吐量,但于此同时,消息的响应时间延长了,消息丢失的可能性也会加大。
在这里,如果我们想达到 RocketMQ 消息发送的效果,我们可以将 Kafka 中 linger.ms 参数设置为 0 即可。但是 Kafka 可以通过调整 batch.size 与 linger.ms 两个参数来适配不同的业务场景,这种实现方式要比 RocketMQ 更加灵活些。
03 服务端网络架构
接下来,我们从服务端网络架构这个维度看看 RocketMQ 与 Kafka 的设计。
3.1 Kafka 服务端网络架构
首先 Kafka 服务端网络架构采用的是 Java NIO 实现的「Reactor 设计模型」以及 「Selector 多路复用器」,其架构图如下:
其内部包含有三个主要组件「Acceptor 线程」、「Processor 网络线程」、「KafkaRequestHandler 业务线程」。
具体源码可以查看以下文章进行学习:
【服务端 Broker 源码分析系列第一篇】图解 Kafka 源码之 Reactor 网络模型架构设计
【服务端 Broker 源码分析系列第二篇】图解Kafka源码SocketServer组件之Acceptor线程架构设计
【服务端Broker源码分析系列第三篇】图解Kafka源码SocketServer组件之Porcessor线程架构设计
【服务端Broker源码分析系列第四篇】图解Kafka源码之 RequestChannel 请求通道架构设计
【服务端Broker源码分析系列第五篇】图解Kafka源码之 KafkaRequestHandler I/O 线程池剖析
【服务端Broker源码分析系列第六篇】图解Kafka源码之 KafkaApis 详解
【服务端Broker源码分析系列第六篇】图解Kafka源码之网络层请求处理全流程总结
3.2 RocketMQ 服务端网络架构
而 RocketMQ 服务端网络架构是基于「Netty」异步事件驱动模式进行网络通信的。
Netty 是一个高性能的 Java 网络编程框架,具有低延迟、高吞吐量和高性能等特性。RocketMQ Broker 采用 Netty 分别实现了与 Producer、Consumer 和 Name Server 之间的通信,同时支持「长连接」和「短连接」等多种通信模式。
3.3 小结
综上,Kafka 的服务端网络层架构设计采用了 Java NIO 实现的 Reactor 设计模型,具有以下优势:
高并发性:使用 Java NIO 实现的 Reactor 设计模型能够更好地处理高并发请求,能够同时处理多个请求。
高吞吐量:Reactor 模型能够管理并行请求的并发量,有效地提高了 Kafka 的吞吐量。
低延迟:由于 Kafka 采用了异步处理模型,能够更快地处理请求并响应客户端,从而减少了系统响应时间,降低了延迟。
可扩展性:Reactor 模型能够更好地处理请求和并发量,更容易实现系统的可扩展性。
而 RocketMQ 服务端网络层架构采用了基于 Netty 框架的实现模型,具有以下优势:
高性能:Netty 框架采用了异步非阻塞的 I/O 模型,具有高性能和低延迟的特点。
可扩展性:Netty 框架的实现可以根据不同的场景和需要进行快速扩展和定制。
易于使用:Netty 提供了简单易用的 API,开发者能够更加容易地编写高质量的代码,从而提高了开发的效率。
支持多种协议:Netty 框架支持多种网络协议,包括 TCP、UDP、HTTP、SMTP、WebSocket 等,能够支持更多的网络应用。
两者并没有太大的性能差异。Kafka 采用的是 Java NIO 实现的 Reactor 设计模型,强调「高并发性」、「高吞吐量」、「低延迟」。而 RocketMQ 使用 Netty 实现模型,注重「性能」和「可扩展性」。
一、Kafka 的服务端网络层架构优势
高并发性:
Java NIO 实现的 Reactor 设计模型可以同时处理多个请求,这对于处理大量并发的生产者和消费者连接非常重要。在高并发的场景下,如大规模的分布式系统或实时数据处理应用中,Kafka 能够有效地管理和响应众多的连接请求。
例如,在一个大型电商平台中,订单系统可能会同时产生大量的消息,而消费者可能来自不同的业务模块,如库存管理、物流配送等。Kafka 的高并发性能够确保这些消息能够被快速地接收和处理,不会因为并发请求过多而导致系统性能下降。
高吞吐量:
Reactor 模型能够管理并行请求的并发量,这使得 Kafka 可以在单位时间内处理大量的消息。高吞吐量对于需要处理海量数据的应用场景至关重要,如日志收集、大数据分析等。
例如,在一个大型互联网公司的日志收集系统中,每天可能会产生数十亿条日志记录。Kafka 的高吞吐量能够确保这些日志能够被快速地收集和存储,以便后续的分析和处理。
低延迟:
异步处理模型使得 Kafka 能够更快地处理请求并响应客户端,减少了系统响应时间,降低了延迟。在对实时性要求较高的应用场景中,如金融交易系统、实时监控系统等,低延迟是非常关键的性能指标。
例如,在金融交易系统中,交易订单的处理速度直接影响到交易的执行效率和用户体验。Kafka 的低延迟特性能够确保交易订单能够被快速地发送到交易系统,并及时得到处理和响应。
可扩展性:
Reactor 模型能够更好地处理请求和并发量,这使得 Kafka 更容易实现系统的可扩展性。随着业务的增长和数据量的增加,Kafka 可以通过增加节点或分区来扩展其处理能力,而不会对系统性能产生明显的影响。
例如,在一个不断发展的社交媒体平台中,用户数量和消息量可能会持续增长。Kafka 的可扩展性能够确保系统能够随着业务的发展而不断扩展,满足不断增长的业务需求。
二、RocketMQ 的服务端网络层架构优势
高性能:
Netty 框架采用了异步非阻塞的 I/O 模型,具有高性能和低延迟的特点。这种模型能够充分利用系统资源,提高网络通信的效率,从而为 RocketMQ 提供了高性能的服务端网络层架构。
例如,在一个高并发的消息队列系统中,Netty 的高性能能够确保消息的快速发送和接收,减少了消息的延迟和积压。
可扩展性:
Netty 框架的实现可以根据不同的场景和需要进行快速扩展和定制。这使得 RocketMQ 能够根据实际业务需求进行灵活的配置和扩展,满足不同应用场景的要求。
例如,在一个分布式系统中,RocketMQ 可以根据不同的业务模块和数据流量进行动态的扩展和调整,以确保系统的性能和稳定性。
易于使用:
Netty 提供了简单易用的 API,开发者能够更加容易地编写高质量的代码,从而提高了开发的效率。这使得开发者可以更加专注于业务逻辑的实现,而不必过多关注底层的网络通信细节。
例如,在开发一个基于 RocketMQ 的消息队列应用时,开发者可以利用 Netty 的简单易用的 API 快速地实现消息的发送和接收功能,而不必花费大量的时间和精力去学习和理解复杂的网络通信协议。
支持多种协议:
Netty 框架支持多种网络协议,包括 TCP、UDP、HTTP、SMTP、WebSocket 等,能够支持更多的网络应用。这使得 RocketMQ 可以与不同的系统和应用进行集成,扩展了其应用场景和功能。
例如,在一个企业级应用中,RocketMQ 可以通过支持多种协议与不同的业务系统进行集成,实现数据的实时传输和共享。
三、两者性能差异不大的原因
虽然 Kafka 和 RocketMQ 在服务端网络层架构上采用了不同的实现方式,但两者并没有太大的性能差异。这是因为它们都针对高并发、高吞吐量、低延迟和可扩展性等关键性能指标进行了优化,并且在实际应用中都能够满足大多数业务场景的需求。
此外,两者的性能还受到其他因素的影响,如硬件资源、网络环境、消息大小、消息存储方式等。在实际应用中,需要根据具体的业务需求和场景进行选择和优化,以充分发挥它们的性能优势。
易于使用:
以下是一个更具体的例子来说明 Netty 在基于 RocketMQ 的消息队列应用中如何提供简单易用的 API 以及提高开发效率:
场景设定
假设我们正在开发一个电商平台的订单处理系统,其中使用 RocketMQ 来传递订单状态变化的消息。当订单状态发生变化时,系统需要将新的状态信息发送给其他相关的业务模块,如库存管理、物流配送等。同时,这些业务模块也需要能够接收并处理这些消息。
使用 Netty API 实现消息发送
在使用 Netty 的 API 实现消息发送功能时,开发者可以按照以下步骤进行:
创建一个 Netty 的客户端通道(Channel):
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 可以在这里添加自定义的编码器和解码器,用于处理消息的序列化和反序列化
}
});
// 连接到 RocketMQ 的服务器地址
ChannelFuture future = bootstrap.connect("localhost", 10911).sync();
Channel channel = future.channel();
准备要发送的消息:
OrderStatusChangeMessage message = new OrderStatusChangeMessage(orderId, newStatus);
byte[] serializedMessage = serializeMessage(message); // 自定义的消息序列化方法
发送消息:
channel.writeAndFlush(Unpooled.copiedBuffer(serializedMessage));
通过以上简单的代码,开发者就可以利用 Netty 的 API 将订单状态变化的消息发送到 RocketMQ 的服务器。在这个过程中,开发者不需要深入了解底层的网络通信细节,如 TCP 连接的建立、数据的分包和重组等。Netty 已经封装了这些复杂的操作,提供了简洁的 API 让开发者专注于业务逻辑的实现。
使用 Netty API 实现消息接收
同样,在接收消息时,开发者可以使用 Netty 的服务器端通道来处理来自 RocketMQ 的消息:
创建一个 Netty 的服务器通道(ServerChannel):
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 在这里添加消息的解码器,用于将接收到的字节流转换为业务对象
ch.pipeline().addLast(new OrderStatusChangeMessageDecoder());
// 添加业务逻辑处理器,用于处理接收到的消息
ch.pipeline().addLast(new OrderStatusChangeMessageHandler());
}
});
// 绑定服务器端口,等待客户端连接
ChannelFuture future = bootstrap.bind(10912).sync();
定义消息解码器和处理器:
// 消息解码器,将字节流转换为 OrderStatusChangeMessage 对象
public class OrderStatusChangeMessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() >= 4) { // 假设消息长度占 4 个字节
int messageLength = in.readInt();
if (in.readableBytes() >= messageLength) {
byte[] messageBytes = new byte[messageLength];
in.readBytes(messageBytes);
OrderStatusChangeMessage message = deserializeMessage(messageBytes); // 自定义的消息反序列化方法
out.add(message);
}
}
}
}
// 消息处理器,处理接收到的 OrderStatusChangeMessage 对象
public class OrderStatusChangeMessageHandler extends SimpleChannelInboundHandler<OrderStatusChangeMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, OrderStatusChangeMessage message) throws Exception {
// 处理接收到的订单状态变化消息,如更新库存、通知物流等业务逻辑
processOrderStatusChange(message);
}
}
通过以上代码,开发者可以使用 Netty 的 API 快速搭建一个能够接收 RocketMQ 消息的服务器。同样,开发者不需要关心底层的网络通信细节,只需要专注于实现业务逻辑,即处理接收到的订单状态变化消息。
综上所述,Netty 的简单易用的 API 使得开发者在基于 RocketMQ 的消息队列应用开发中能够更加高效地实现消息的发送和接收功能,从而提高了开发效率。
Netty 的可扩展性和易于使用性
Netty 是一个高性能的异步事件驱动网络应用框架,它不仅提供了强大的网络编程能力,还通过其灵活的配置和插件化设计以及简洁的 API 来简化开发工作。下面是两个方面的具体例子:
可扩展性:灵活的配置和插件化设计
自定义编解码器:
场景:在处理特定协议时,可能需要对数据进行编码或解码。
实现:Netty 允许开发者创建自定义的
ChannelInboundHandler
或ChannelOutboundHandler
来处理编码和解码逻辑。例如,可以创建一个ByteToMessageDecoder
和MessageToByteEncoder
来处理自定义协议的数据转换。示例代码:
public class MyCustomDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // 自定义解码逻辑 } } public class MyCustomEncoder extends MessageToByteEncoder<MyCustomMessage> { @Override protected void encode(ChannelHandlerContext ctx, MyCustomMessage msg, ByteBuf out) throws Exception { // 自定义编码逻辑 } }
添加新的处理器:
场景:根据业务需求,可能需要在消息处理管道中添加新的处理逻辑。
实现:可以通过
ChannelPipeline
添加新的处理器(ChannelHandler
)。每个处理器可以独立地处理特定的任务,如日志记录、安全检查、业务逻辑等。示例代码:
ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast("logging", new LoggingHandler(LogLevel.INFO)); pipeline.addLast("customHandler", new MyCustomHandler());
使用子通道:
场景:在某些情况下,可能需要为每个连接创建一个独立的处理管道。
实现:Netty 支持子通道(
ChildChannel
),可以在父通道初始化时为每个新连接配置独立的管道。示例代码:
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new MyCustomHandler()); } }); ChannelFuture f = b.bind(PORT).sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }
易于使用:简洁且强大的 API
启动服务器:
场景:快速搭建一个简单的 TCP 服务器。
实现:使用 Netty 可以非常简单地启动一个 TCP 服务器,并且只需几行代码就可以完成基本配置。
示例代码:
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; public class SimpleServer { private int port; public SimpleServer(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new SimpleChannelInitializer()); ChannelFuture f = b.bind(port).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args.length > 0) { port = Integer.parseInt(args[0]); } new SimpleServer(port).run(); } }
处理客户端请求:
场景:处理客户端发送的消息并响应。
实现:通过实现
ChannelInboundHandler
接口来处理接收到的消息,并通过ctx.writeAndFlush()
方法发送响应。示例代码:
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class SimpleChannelHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; try { while (in.isReadable()) { System.out.print((char) in.readByte()); System.out.flush(); } } finally { in.release(); } // 发送响应 String response = "Hello, Client!"; ctx.writeAndFlush(ctx.alloc().buffer().writeBytes(response.getBytes())); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
通过这些示例,可以看到 Netty 如何通过其灵活的配置、插件化设计和简洁强大的 API 来满足不同的应用场景,并简化网络编程的工作。这种设计使得 Netty 成为构建高性能、可扩展网络应用的理想选择。
04 服务端存储架构
接着,我们从服务端存储架构这个维度看看 RocketMQ 与 Kafka 的设计差异。
4.1 Kafka 服务端存储架构
Kafka 存储实现方案, 即基于「顺序追加写日志」 + 「稀疏哈希索引」。 日志存储结构如下:
从上图可以看出来,Kafka 是基于「主题」+ 「分区」+ 「副本」+ 「分段」+ 「索引」的结构组成:
kafka 中消息是以主题 Topic 为基本单位进行归类的,这里的 Topic 是逻辑上的概念,实际上在磁盘存储是根据分区 Partition 存储的, 即每个 Topic 被分成多个 Partition,分区 Partition 的数量可以在主题 Topic 创建的时候进行指定。
Partition 分区主要是为了解决 Kafka 存储的水平扩展问题而设计的, 如果一个 Topic 的所有消息都只存储到一个 Kafka Broker上的话, 对于 Kafka 每秒写入几百万消息的高并发系统来说,这个 Broker 肯定会出现瓶颈, 故障时候不好进行恢复,所以 Kafka 将 Topic 的消息划分成多个 Partition, 然后均衡的分布到整个 Kafka Broker 集群中。
Partition 分区内每条消息都会被分配一个唯一的消息 id,即我们通常所说的 偏移量 Offset, 因此 kafka 只能保证每个分区内部有序性,并不能保证全局有序性。
然后每个 Partition 分区又被划分成了多个 LogSegment,这是为了防止 Log 日志过大,Kafka 又引入了日志分段(LogSegment)的概念,将 Log 切分为多个 LogSegement,相当于一个巨型文件被平均分割为一些相对较小的文件,这样也便于消息的查找、维护和清理。这样在做历史数据清理的时候,直接删除旧的 LogSegement 文件就可以了。
Log 日志在物理上只是以文件夹的形式存储,而每个 LogSegement 对应磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以".snapshot"为后缀的快照索引文件等)。
了解了日志存储架构之后,我们来看看日志数据是如何被查询的,它的底层实现是 「稀疏哈希索引」,即先根据 Offset 大小找到对应的块, 然后再从块中顺序查找。如下图所示:
另外,Kafka 分区支持「副本机制」,即一个分区可以在多台 Broker 机器上进行复制数据。topic 中每一个分区会有「Leader」与「Follower」。
Kafka 的内部机制可以保证 topic 某一个分区的「Leader」与「Follower」不在分到同一台机器上,并且每一台 Broker 会尽量均衡地承担各个分区的 Leader。
分区的「Leader」节点负责读写,而「Follower」节点则负责数据同步,如果 Leader 分区所在的 Broker 节点宕机后,会触发「Leader 选举机制」,从「ISR 副本集合」中 Follower 副本中选一个作为新的 Leader,如下图所示:
分区 Leader 收到客户端的消息发送请求后,有以下两种返回策略:
将数据写入到 Leader 节点后就返回。
等到它的 Follower 节点全部写入完成后再返回。
这个策略选择非常的关键,它会直接影响消息发送端的时延,延所以 Kafka 提供了 ack 这个参数来进行策略供你选择:
说完这个日志存储方案后,我们来看看 Kafka 服务端是如何处理消息写入的?
Kafka 服务端处理消息写入的代码定义在 FileRecords 的 writeTo 方法中,具体代码截图如下:
从上面这段代码看,Kafka 服务端写入消息时,主要是调用 FileChannel 的 transferTo 方法,该方法在底层使用了操作系统的 sendfile 系统调用。
sendfile 系统调用的优势
减少数据拷贝次数:传统的文件读取和网络发送过程涉及多次数据拷贝。例如,数据通常要从磁盘文件拷贝到内核缓冲区,再从内核缓冲区拷贝到用户缓冲区,最后从用户缓冲区拷贝到网络缓冲区。而 sendfile 系统调用可以直接将数据从磁盘文件的内核缓冲区传输到网络缓冲区,减少了中间的用户缓冲区拷贝环节,从而显著提高了数据传输效率。
提高性能和吞吐量:通过减少数据拷贝次数,降低了 CPU 的开销。这使得 Kafka 服务端在处理大量消息写入和发送时,可以更高效地利用系统资源,进而提高整个系统的吞吐量。例如,在处理海量日志数据的场景中,这种优化能够确保日志消息可以快速地从存储介质传输到网络,满足高并发的日志收集和处理需求。
FileChannel 的 transferTo 方法作用
实现高效的文件到通道的数据传输:transferTo 方法提供了一种简单而高效的方式,将文件中的数据传输到另一个通道(比如网络通道)。在 Kafka 的消息写入过程中,它将存储在文件中的消息数据(通过 FileRecords 表示)传输到网络通道,以便发送给消费者或者其他 Kafka 节点。
跨平台兼容性和一致性:Java 的 FileChannel 类提供了一种跨平台的抽象。尽管底层可能因操作系统的不同而在实现 sendfile 系统调用的方式上有所差异(例如,不同操作系统的系统调用名称和参数可能不同),但 Java 的 FileChannel 类的 transferTo 方法对这些差异进行了封装。这使得 Kafka 的代码在不同操作系统上都能够以相对一致的方式进行消息写入操作,提高了代码的可移植性和稳定性。
消息写入流程细节
数据定位:在调用 transferTo 方法之前,Kafka 服务端需要确定要写入消息的文件位置(在 FileRecords 中记录)。这涉及到对消息在文件中的偏移量等信息的管理。例如,当有新的消息批次写入时,服务端要根据当前文件的写入指针或者索引信息,找到合适的位置来开始写入新消息。
写入操作的原子性和一致性:Kafka 需要确保消息写入的原子性和一致性。虽然 transferTo 方法本身提供了高效的数据传输机制,但 Kafka 还需要考虑在多线程或者多客户端并发写入消息的情况下,如何保证文件系统中的消息存储状态是正确的。这可能涉及到对文件锁、日志段切换等操作的处理。例如,当一个日志段写满后,需要安全地切换到新的日志段进行消息写入,同时要保证之前写入消息的完整性和顺序性。
Kafka 服务端处理消息写入的机制
Kafka 的消息写入过程是其高性能和高吞吐量的关键之一。在 Kafka 中,消息被持久化到磁盘上的日志文件中。为了优化性能,Kafka 使用了多种技术,包括零拷贝(zero-copy)技术和批处理等。
消息写入的核心方法
Kafka 服务端处理消息写入的主要逻辑定义在 FileRecords
类的 writeTo
方法中。这个方法主要负责将内存中的数据写入到磁盘上的日志文件中。以下是 writeTo
方法的一些关键点:
使用
FileChannel
:Kafka 使用 Java NIO 的
FileChannel
来进行 I/O 操作。FileChannel
提供了一种高效的方式来读写文件,并且支持异步操作。
调用
transferTo
方法:在
writeTo
方法中,Kafka 调用了FileChannel
的transferTo
方法来将数据从一个通道传输到另一个通道。这个方法利用了操作系统的sendfile
系统调用来实现零拷贝。sendfile
系统调用允许数据直接从内核缓冲区复制到网络套接字,而不需要经过用户空间,从而减少了 CPU 和内存的使用,提高了性能。
批处理:
Kafka 将多个消息批量写入磁盘,以减少 I/O 操作的次数。这样可以显著提高写入性能,尤其是在高并发场景下。
顺序写入:
Kafka 的日志文件采用追加的方式写入,即所有的写操作都是顺序写。这避免了随机写带来的性能开销,并且充分利用了现代硬盘的顺序写特性。
日志分段:
Kafka 将日志文件分割成多个分段(segments),每个分段都有固定的大小。当一个分段写满后,会创建一个新的分段。这种方式有助于管理和清理过期的日志数据。
索引文件:
为了快速定位消息,Kafka 还维护了一个索引文件,记录了每条消息在日志文件中的偏移量。索引文件也是通过
FileChannel
进行写入的。
具体代码示例
假设我们有一个简化的 writeTo
方法示例,展示了如何使用 FileChannel
和 transferTo
方法:
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class FileRecords {
private final FileChannel fileChannel;
public FileRecords(FileChannel fileChannel) {
this.fileChannel = fileChannel;
}
public void writeTo(ByteBuffer buffer, long position) throws IOException {
// 定位到指定位置
fileChannel.position(position);
// 使用 transferTo 方法将数据写入文件
while (buffer.hasRemaining()) {
long bytesWritten = fileChannel.transferTo(fileChannel.position(), buffer.remaining(), fileChannel);
if (bytesWritten == 0) {
break; // 如果没有写入任何数据,则退出循环
}
fileChannel.position(fileChannel.position() + bytesWritten); // 更新文件指针
}
}
}
在这个示例中,writeTo
方法接收一个 ByteBuffer
和一个 position
参数。它首先将 FileChannel
的位置设置为 position
,然后使用 transferTo
方法将 ByteBuffer
中的数据写入到文件中。
需要注意的是,实际的 Kafka 代码要比这个示例复杂得多,涉及到更多的细节和优化,例如错误处理、日志分段管理、索引文件更新等。
总结
Kafka 服务端在处理消息写入时,通过以下几种方式来优化性能:
使用
FileChannel
和transferTo
方法,利用sendfile
系统调用实现零拷贝。批处理多条消息,减少 I/O 操作次数。
采用顺序写入,避免随机写带来的性能开销。
维护日志分段和索引文件,便于管理和快速查找消息。
这些设计使得 Kafka 能够在高并发场景下提供高吞吐量和低延迟的消息处理能力。
4.2 RocketMQ 服务端存储架构
接下来,我们再来看看 RocketMQ 服务端存储架构。
RocketMQ 存储文件包括「CommitLog 日志文件」、「ConsumeQueue 消费队列」、「IndexFile 索引文件」,存储目录如下:
RocketMQ 所有主题的消息都会写入到「CommitLog 文件」中,然后基于「CommitLog 文件」构建消
息消费队列「ConsumeQueue 消费队列」,消息消费队列的组织结构按照 /topic/{queue} 来组织。
消息存储的整体的设计:
CommitLog:消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容, 消息内容不是定长的。单个文件大小默认1G, 文件名长度为 20 位,左边补零,剩余为起始偏移量,比如 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件写满了,会自动生成一个新文件继续写入。CommitLog 文件保存于${Rocket_Home}/store/commitlog 目录中。
ConsumeQueue:消息消费队列,引入目的主要是提高消息消费的性能,由于 RocketMQ 是基于主题 topic 的订阅模式,消息消费是针对主题进行的,如果要遍历 commitlog 文件中根据 topic 检索消息是非常低效的。ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定 Topic 下的队列消息在 CommitLog 中的起始物理偏移量 offset,消息大小 size和消息 Tag 的 HashCode 值。ConsumeQueue 文件可以看成是基于 Topic 的 CommitLog 索引文件, ConsumeQueue文件夹的组织方式如下:topic/queue/file 三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样 ConsumeQueue 文件采取定长设计,每一个条目共20 个字节,分别为 8字节的 CommitLog 物理偏移量、4字节的消息长度、8 字节 tag hashcode,单个文件由 30W 个条目组成,可以像数组一样随机访问每一个条目,每个 ConsumeQueue 文件大小约 5.72 M;
IndexFile:IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是:{fileName},文件名 fileName 是以创建时的时间戳命名的,固定的单个 IndexFile 文件大小约为 400 M,一个 IndexFile 可以保存 2000W 个索引,IndexFile 的底层存储设计为在文件系统中实现 HashMap 结构,所以 RocketMQ 的索引文件其底层实现为 hash 索引。
总之 RocketMQ 采用的是混合型存储结构,即多个 Topic 的消息都写入同一个日志数据文件即「CommitLog 文件」来存储。
RocketMQ的混合型存储结构针对 Producer 和 Consumer 分别采用了「数据」和「索引」部分相分离的存储结构,Producer 发送消息至 Broker 端,然后 Broker 端使用同步或者异步的方式对消息刷盘持久化到 「CommitLog 文件」。
只要消息被刷盘持久化到磁盘文件 CommitLog 中,那么 Producer 端发送的消息就不会丢失。Consumer 端也就能消费这条消息。当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker 允许等待 30 s的时间,只要这段时间内有新消息到达,将直接返回给消费端。
这里,RocketMQ 的具体做法是,使用 Broker 端的后台服务线程「ReputMessageService」不停地分发请求并异步构建 「ConsumeQueue 消费队列」和「IndexFile 索引文件」数据。
另外 RocketMQ 服务端默认采取的是「主从同步」架构,即 Master-Slave 方式,其中 Master 节点负责读写,
Slave 节点负责数据同步与消费。在 RocketMQ 4.5 版本后引入了「多副本机制」。
RocketMQ 的副本机制与 kafka 的副本机制两者差异:
RocketMQ 的副本机制是基于 Commitlog 文件。
kafka 是副本机制是基于主题分区级别。
说完这个日志存储方案后,我们来看看 RocketMQ 服务端是如何处理消息写入的?
RocketMQ 服务端处理消息写入支持「内存映射」与「FileChannel」两种写入方式:
当将参数 tranisentStorePoolEnable 设置为 false,数据会先写入到「页缓存 PageCache」,然后根据「刷盘策略机制」将数据持久化到磁盘中。
当将参数 tranisentStorePoolEnable 设置为 true,数据会先写入到「堆外内存 DirectByteBuffer」,然后「批量提交」到 「FileChannel」,并最终根据「刷盘策略机制」将数据持久化到磁盘中。
虽然 RocketMQ 也支持「FileChannel」方式写入,但是它底层调用的 API 是并不是 「transferTo」, 而是先调用 「writer」,然后定时 flush 刷新到磁盘中,具体的代码调用入口如下:
4.3 小结
综上,Kafka 中日志存储是以 Topic/partition 为主 ,每一个分区拥有一个物理文件夹,Kafka 在分
区级别实现「文件顺序写」。Kafka 在消息写入时的 I/O 性能,会随着 Topic 、分区数量的增长会先上升后下降。
而 RocketMQ 在消息写入时「追求极致的顺序写」,所有的消息不区分主题会统一顺序写入「CommitLog 文件」中, topic 和 分区数量的增加不会影响写入顺序。
05 总结
这里,我们一起来总结一下这篇文章的重点。
我们从 「客户端发送模式」、「服务端网络架构」、「服务端存储架构」三个维度对比了 Kafka 与 RocketMQ 各自再追求高性能时采用的技术架构。
综合对比来看,在同等硬件配置下,Kafka 的综合性能要比 RocketMQ 更为强劲。
RocketMQ 和 Kafka 都采用了顺序写机制,但相对 Kafka 来说,RocketMQ 消息写入时追求「极致的顺序写」,会在同一时刻将消息全部写入一个 CommitLog 文件,这显然无法压榨磁盘的性能。而 Kafka 是采取「分区级别顺序写」,在分区数量不多的情况下,这能充分发挥 CPU 的多核优势。所以在磁盘没有遇到瓶颈时,Kafka 的性能要优于 RocketMQ。
在消息发送方面,Kafka 的客户端充分利用了 「Batch 批处理」思想,比 RocketMQ 要拥有更高的吞吐量。
Kafka 底层写入数据时直接调用「FileChannel」的「transferTo」方法比 RocketMQ 中的「write」方法性能更优,因为「transferTo」底层使用了操作系统的「sendfile」系统调用,能充分发挥块设备的优势。
RocketMQ Producer 架构设计
从今天开始,我们开始对 RocketMQ 进行相关实现原理进行剖析,今天是第二篇,我们来聊聊 RocketMQ Producer 的架构设计,深度剖析下其内部底层原理设计思想,下面进入正题。
01 RocketMQ Producer 总体概述
在 RocketMQ 中,我们通常把生产消息的一方称为 Producer 即生产者,它是消息的来源所在,它的主要功能就是将客户端的请求发送到 RocketMQ 服务端上。
本文从原理实现上来讨论以下 2 个问题:
RocketMQ Producer 的启动流程是怎样的?
RocketMQ Producer 是如何把消息发送到服务端 Broker 上的?
02 RocketMQ Producer 之启动流程
2.1 生产者示例
我们都知道,在 RocketMQ 中,消息发送分为「同步消息」、「异步消息」、「单向消息」。
同步消息:即 Producer 将消息发送之后会同步等待 Broker 的响应,并把响应结果传递给业务线程,整个过程中业务线程都处于在等待过程。
异步消息:即 Producer 将消息发送请求放进线程池就返回了无需等待 Broker 的响应。后续逻辑处理,网络请求都在线程池中进行,等结果处理完之后回调业务定义好的回调函数。
单向消息:只负责发送消息,不管发送结果。
下面给出这三种发送消息的示例:
2.1.1 同步发送
// 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 1、实例化消息生产者 Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 2、设置 NameServer 的地址
producer.setNamesrvAddr("localhost:9876");
// 3、启动 Producer 实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 5、发送消息到 Broker
SendResult sendResult = producer.send(msg);
// 6、通过 sendResult 返回消息是否成功送达
System.out.printf("%s%n", sendResult);
}
// 7、如果不再发送消息,关闭 Producer 实例。
producer.shutdown();
}
}
2.1.2 异步发送
// 异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待 Broker 的响应。
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 1、实例化消息生产者 Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 2、设置 NameServer 的地址
producer.setNamesrvAddr("localhost:9876");
// 3、启动 Producer 实例
producer.start();
// 4、设置异步调用失败后的重试次数
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;
// 根据消息数量实例化倒计时计算器
final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
for (int i = 0; i < messageCount; i++) {
final int index = i;
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 5、发送消息并通过 SendCallback 接收异步返回结果的回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
// 等待5s
countDownLatch.await(5, TimeUnit.SECONDS);
// 6、如果不再发送消息,关闭 Producer 实例。
producer.shutdown();
}
}
2.1.3 单向发送
// 这种方式主要用在不特别关心发送结果的场景,例如日志发送。
public class OnewayProducer {
public static void main(String[] args) throws Exception{
// 1、实例化消息生产者 Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 2、设置 NameServer 的地址
producer.setNamesrvAddr("localhost:9876");
// 3、启动 Producer 实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 4、发送单向消息,没有任何返回结果
producer.sendOneway(msg);
}
// 5、如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
这里简述下上面的消息发送流程:
首先,需要实例化一个生产者 producer,并告诉它 NameServer 的地址,这样生产者才能从 NameServer 获取路由信息。
然后 producer 得做一些初始化,这是很关键的一步,它要和 NameServer 进行通信并初始化通信模块等等。
当 producer 已经准备好之后,就可以准备要发的消息内容了,即创建消息,并指定Topic,Tag、消息体。
当消息内容准备好之后,producer 就可以把消息发送出去了。
这此时 producer 是怎么知道要发送到哪个 Broker 地址上呢?它会去 NameServer 获取路由信息,比如得到 Broker 的地址是 localhost:10911,然后通过网络通信将消息发送给 Broker。
生产者发送的消息通过网络传输给 Broker,Broker 端需要对消息按照一定的数据结构进行存储,这点是与 Kafka 有所不同的, Kafka 是在 Producer 端发送批次时就确定了消息格式,而 RocketMQ 是在 Broker 端进行组装的。存储完成之后,把存储结果告知生产者。
同步消息:SendResult sendResult = producer.send(msg);
异步消息:producer.send(msg, new SendCallback() {}
单向消息:producer.sendOneway(msg);
通过上面的发送流程中,我们发现有两个地方是非常关键的,也就是 producer 启动与消息发送。
// 启动 Producer 实例
producer.start();
// 发送消息, 同步或异步
producer.send();
下面会从这两行代码为切入点,来看看 RocketMQ Producer 的设计与实现原理。
2.2 生产者启动初始化
我们实例化一个生产者 DefaultMQProducer,然后调用它的 start() 方法进行初始化。
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
看似就2行代码,其实底层实现了很多功能,用阿里的一句话来说就是 「把复杂流程自己,把简单交给别人」。具体的内部调用流程如下图所示:
从图上可以得出:启动流程相对比较长,这里我们来看下流程步骤:
参数校验,检查配置是否合法:生产者组名称是否为空,长度是否超过最大值 255,是否满足其命名规则。
初始化 MQClientInstance 实例,注册本地路由消息。
启动通信模块服务:这里启动的是 NettyRemotingClient 服务,RemotingClient 是一个接口,底层使用的通讯框架是 Netty,提供了实现类 NettyRemotingClient 。它在初始化时实例化 Bootstrap,方便后续用来创建 SocketChannel。
启动后台定时任务:这里总共有5个定时任务,分别为:
「定时更新 NameServerAddr 信息」每隔 2 分钟调用一次,延迟 10 秒执行。
「定时更新 Topic 路由信息」每隔 30 秒调用一次,延迟 10 ms 执行。
「定时向 Broker 发送心跳以及清理下线的 Broker」每隔 30 秒调用一次,延迟 1 秒执行。
「定时持久化 Consumer 的 Offset 信息」每隔 5 秒调用一次,延迟 10 秒执行。
「定时调整线程池大小」每隔 1 ms 调用一次,延迟 1 ms 执行。
启动消息拉取服务:这两个服务都是用于消费者的,这里我们暂且不展开。消息拉取服务 pullMessageService 是从 Broker 拉取消息的服务;而重平衡服务 rebalanceService 用于消费者的负载均衡,负责分配消费者可消费的消息队列。
启动消息 push 服务:发送客户端消息出去。
最后设置服务状态为启动成功。
03 RocketMQ Producer 之前提知识
当客户端启动完成之后,Producer 就可以开始发送消息了。在讲解发送消息流程之前,我们先来了解一些前提知识点,为更好的理解发送流程做准备。
3.1 Netty 服务端
我们知道 RocketMQ 是一款高性能、高吞吐量的消息中间件,除了 Producer 端、Consumer 端需要网络传输之外,数据还需要在服务端集群中进行流转,所以一个「高效」、「可靠」的网络通讯组件是必不可少的。
在这里,RocketMQ 选择的是 Netty 通讯框架,我们在使用 Netty 时首先需要考虑的是业务上的数据「粘包」、「拆包」问题,Netty 为我们提供了一套较为常用的解决方案。
消息格式这块,RocketMQ 使用的规则是最为通用的 「head」、「body」分离方式,其中 head 用来存储消息的长度,而 body 用来存储真正的数据,具体的实现类在 NettyRemotingClient 。
消息收发这块,RocketMQ 将所有的消息都封装在同一个协议类 RemotingCommand 中,这么做的好处是:统一规范、减轻网络协议来适配不同的消息类型带来的负担。
Netty 为我们提供的 2 个 Handler。
org.apache.rocketmq.remoting.netty.NettyEncoder:消息编码,向 Broker 或者 NameServer 发送消息时,进行编码操作,即将 RemotingCommand 对象转换为 byte[] 格式。
org.apache.rocketmq.remoting.netty.NettyDecoder:消息解码,接收 Broker 端返回的消息时,进行解码操作,即将 byte[] 格式转换为 RemotingCommand 对象。
3.2 消息格式
消息格式由 「MsgHeader」、「MsgBody」组成,这里将消息的「长度」、「标记」、「版本」等重要参数都放在 header 中, 而 body 只存放数据。
下面是一张 Netty 视角的消息格式。
3.3 Topic 路由消息
3.3.1 Topic 创建
在发送消息之前,我们需要先创建一个 Topic,创建 Topic 的命令可以使用 RocketMQ 自带的命令行工具,如下图所示:
mqadmin updateTopic -b <arg> | -c <arg> [-h] [-n <arg>] [-o <arg>] [-p <arg>] [-r <arg>] [-s <arg>] -t <arg> [-u <arg>] [-w <arg>]
举例说明:
updateTopic -b localhost:10911 -t message -r 16 -w 16 -p 6 -o false -u false -s false
这里简单介绍下每个参数的作用:
-b :broker 地址,表示 topic 所在 Broker,只支持单个 Broker,地址为 ip:port。
-c :cluster 地址,表示 topic 所在 cluster,会向 cluster 中所有的 broker 发送请求。
-t :topic 名称。
-r :可读队列数(4.9.x 版本下默认为 16,在 TopicConfig 中定义)。
-w :可写队列数(4.9.x 版本下默认为 16,在 TopicConfig 中定义)。
-p :指定新topic的读写权限 (W=2|R=4|WR=6)2表示当前 topic 仅可写入数据,4 表示仅可读,6 表示可读可写。
-o :set topic's order(true|false)。
-u :is unit topic (true|false)。
-s :has unit sub (true|false)。
执行上面例子的命令,则会在 localhost:10911 对应的 Broker 下创建一个 Topic,且该 Topic 的读写队列数都为16。
3.3.2 读写队列数
读写队列是 RocketMQ 独有的,对应 Kafka 来说只有一个 Partition 不区分读写。在一般情况下,这两个队列数值建议设置为相等。
这里我们来看下 Client 端是如何对它们进行处理的?
1、生产端
// 按照 QueueData 配置的写队列个数,生成对应数量的 MessageQueue
for (int i = 0; i < qd.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
info.getMessageQueueList().add(mq);
}
2、消费端
// 按照 QueueData 配置的读队列个数,生成对应数量的 MessageQueue
for (int i = 0; i < qd.getReadQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
mqList.add(mq);
}
为什么要将这 2 个队列数设置相等呢?如果不相等会出现什么问题?我们来看下。
这里我们设置写队列为8,读队列为6,如下图所示:
从上图可以看出,6、7 号队列中的数据一定不会被消费,这里有个规则如下:
writeQueueNum > readQueueNum :此时大于 readQueueNum 部分的队列永远不会被消费。
writeQueueNum < readQueueNum :此时所有队列中的数据都会被消费,但部分读队列中的数据会一直为空。
这么做的好处:可以让我们更加「精细」、「方便」的控制读写操作。
3.3.3 路由数据结构
接下来,我们来看看 Topic 路由数据结构,本篇是讲解 Producer,所以这里只分析 Producer 从 NameServer 上获取路由数据,数据结构如下:
public class TopicRouteData extends RemotingSerializable {
private String orderTopicConf;
// 队列数据信息
private List<QueueData> queueDatas;
// broker 数据信息
private List<BrokerData> brokerDatas;
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
....
}
// 队列数据信息
public class QueueData implements Comparable<QueueData> {
// broker name
private String brokerName;
// 读队列数
private int readQueueNums;
// 写队列数
private int writeQueueNums;
....
}
// broker 数据信息
public class BrokerData implements Comparable<BrokerData> {
// 集群
private String cluster;
// broker name
private String brokerName;
// broker 地址列表,主要用于主从节点集群场景
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
....
}
下面通过一张图来描述下上面这段代码,如下图所示:
从上图可以得出,该集群 cluster 下有3个 broker,每个 broker 下有 1 个 master,2 个 slave 组成。所以此时你应该理解了类 BrokerData 中有 HashMap<Long/* brokerId /, String/ broker address */> brokerAddrs
这个变量的原因了吧。
这里需要注意的是:master 节点的编号始终为 0 。
理解 BrokerData 中的 HashMap 变量
存储 Broker 信息的必要性:在这个集群架构中,
BrokerData
类中的HashMap<Long, String>
变量(brokerAddrs
)起到了关键的存储作用。因为集群中有多个broker
,每个broker
又包含一个master
和两个slave
,这个变量用于记录每个broker
(通过brokerId
来标识)的地址(broker address
)。例如,当需要向特定broker
发送消息或者获取某个broker
上的主题(Topic
)信息时,就可以通过brokerId
快速从这个HashMap
中获取相应broker
的地址。Master 节点编号的特殊性:由于
master
节点的编号始终为0
,这为集群的管理和通信提供了便利。在进行读写操作时,可能首先会定位到master
节点(brokerId = 0
)进行操作,如消息的写入操作可能先发送到master
,然后再根据需要同步到slave
节点。这种固定的编号方式有助于简化代码逻辑和操作流程。
Topic 路由信息变更的原因和时机
集群拓扑结构变化时:当集群中的
broker
数量发生变化,比如新增了一个broker
或者某个broker
出现故障退出集群,Topic
路由信息就需要变更。例如,新增一个broker
可能会导致某些主题的分区重新分配,以平衡负载。原本存储在其他broker
上的主题分区可能会部分迁移到新的broker
上,此时Topic
路由信息要更新,以反映每个主题分区在集群中的新位置。主题配置调整时:如果对主题的配置进行了修改,例如分区数量的增加或减少、副本因子的改变等,
Topic
路由信息也会相应变更。比如,将一个主题的分区数从3
个增加到5
个,那么就需要重新分配这些分区在各个broker
上的存储位置,并且更新Topic
路由信息,使得生产者和消费者能够正确地定位到新的分区位置来发送和接收消息。Broker 故障恢复时:当某个
broker
(无论是master
还是slave
)出现故障并恢复后,Topic
路由信息可能需要调整。在故障期间,可能已经对主题分区的存储位置进行了临时调整,恢复后需要根据集群的策略(如副本同步策略)重新确定该broker
在主题分区存储和处理中的角色,从而更新Topic
路由信息。
接下来,我们来看看 Topic 路由信息是如何变更的,以及何时变更?
3.3.4 定时 Topic 路由信息变更
在文章开头,Producer 初始化启动时,分析过里面会启动 5 个后台任务,其中一个就是 「定时更新 Topic 路由信息」每隔 30 秒调用一次,延迟 10 ms 执行。
那么 Topic 路由信息在何时会发生变化或者更新呢?下面举例说明两种场景,假如 RocketMQ 集群有 3 台 master:
分别向其中的 2 台发送了创建 topic 的命令,此时所有的客户端都知道了该 topic 的数据在这两台 broker 上。此时通过 mqadmin 向第 3 台 broker 发送创建 topic 命令时,nameServer 上的路由信息发生了变更,等客户端 30 秒轮询后,即可以拿到最新的topic 路由信息。此时 topic 分别在 3 台 broker 上都创建了,如果某台 broker 宕机,nameServer 将其摘除,等客户端 30 秒轮询后,即可以拿到最新的topic 路由信息。
这里需要注意的是:客户端路由变更是依赖这 30 秒的轮询服务,如果此时路由信息已经发生变更,但还未到轮询时间,此时客户端还拿着旧的 Topic 路由信息去访问,会出现短暂报错。
下面通过一张图来描述下 Topic 路由信息变更的全过程,如下图所示:
这里不展开讲里面的细节,等源码剖析时再详细展开,深度剖析里面的实现细节。
需要注意的是:上图中 TopicPublishInfo 是由 TopicRouteData 变种而来,多了一个 messageQueueList 的属性,在 producer 端,该属性为写入队列,即某个topic所有的可写入的队列集合。
3.4 与 Broker 之间心跳
接下来,我们来看看与 Broker 之间的心跳流程,这里主要分两部分:
定时向有效 Broker 发送心跳。
清理无效 Broker。
3.4.1 定时向 Broker 发送心跳
先来看第一部分,发送心跳数据的流程,如下图所示:
3.4.2 清理无效 Broker
我们知道 RocketMQ 会获取所有已经注册的 Topic 所在的 broker 信息,并将这些信息存储变量brokerAddrTable 中,其存储结构如下:
private ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable = new ConcurrentHashMap<String, HashMap<Long, String>>();
ConcurrentMap 的 key: 代表 brokerName。
ConcurrentMap 的 value:HashMap<Long, String>,其中这里的 key 代表 brokerId(其中 master 的 brokerId 始终为0),value 代表 ip地址。
那么如何去判断某个 broker 是否无效呢?
其实也比较简单,就是判断 broker 是否不存在于 MQClientInstance#topicRouteTable 这个变量中,它是从nameServer 中同步过来的。
举例说明:如果 brokerAddrTable 中有 broker 1,2,3,而 topicRouteTable 只有 broker 1,2 的话,那么就需要从 brokerAddrTable 中删除 3。
04 RocketMQ Producer 之发送流程
等讲解完上面的一些前提知识,等待 Producer 启动完成之后,就可以开始发送消息了。可以看到在 DefaultMQProducer 中发送消息的方法非常多,大致可进行如下分类:
普通消息:没有什么特殊的地方,就是普通消息。
延迟消息:延时消息在投递时,需要设置指定的延时级别,即等到特定的时间间隔后消息才会被消费者消费。mq服务端 ScheduleMessageService中,为每一个延迟级别单独设置一个定时器,定时(每隔1秒)拉取对应延迟级别的消费队列。目前RocketMQ不支持任意时间间隔的延时消息,只支持特定级别的延时消息,即 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"。
顺序消息:对于指定的一个 Topic,Producer 保证消息顺序的发到一个队列中,消费的时候需要保证队列的数据只有一个线程消费。
事务消息:通过两阶段提交、状态定时回查来保证消息一定发到 broker。
这里以发送普通消息为例,用一张图来说明下其发送流程。
这里我们来重点看下「MessageQueue 选择机制」,对于 Kafka 来说是选择分区去发送,这里是选择队列去发送。
4.1 MessageQueue 选择机制
我们知道,一个 Topic 的数据是「分片存储」在一个或者多个 Broker 上的,底层的存储介质为「MessageQueue」,我们来看下 Producer 端是如何选择具体发送到哪个 MessageQueue 上的?
在 Producer 中是通过 「selectOneMessageQueue」方法来进行「MessageQueue」选择,该方法通过「Topic 详细元数据」和「上一次选择的 MessageQueue 所在的 Broker」,来决定下一次的选择。
那么其「核心的选择逻辑」究竟是怎么样的呢?
其实也比较简单,就是先选择出来一个 index,首次选择时肯定是没有的,此时 RocketMQ 会先算出一个随机值,然后在此基础上 + 1。
然后将其和当前 Topic 的 MessageQueue 首数量进行取模。
除了默认方案,RocketMQ 还提供了 「发送故障延迟」方案。
4.2 MessageQueue 发送故障延迟机制
在实际的选择过程中,会判断当前是否启用了「发送故障延迟」,这个由变量 sendLatencyFaultEnable 的值决定,其默认值是 false,开启后即执行 「发送故障延迟」方案。
那么其「核心的选择逻辑」究竟是怎么样的呢?
开启 for 循环,次数为 MessageQueue 的数量,选择出一个 MessageQueue ,方法与默认方案的方法相同。
通过内存表 faultItemTable 校验该 MessageQueue 对应的 broker 是否可用,可用则直接返回。
如果不可用则在尝试从规避的 Broker 中选择一个可用的 broker,如果选出来的 broker 有写队列则返回。
如果无可写队列则最后再用默认方案选出一个 MessageQueue 返回。
下面通过一张图来形象的描述下上面的选择 MessageQueue 过程。
4.3 故障延迟关系对应表
Producer 端在发送消息时会根据发送消息耗时来更新 broker 的「不可用周期时间」,消息耗时 与 broker 不可用周期对应关系图如下。
当发送消息耗时在 0~100ms 时,不可用周期为0s。
发送消息耗时在 100ms~550ms 时,broker 不可用周期为 30s。以此类推,当发送消息耗时在 3s~15s 时,broker 不可用周期为 10min。
在这种机制下,每次发送消息的时候,通过轮询方式选择出一个 MessageQueue ,就会去判断这个 broker 是否在延迟时间内,如果还需要等待,那就继续轮询下一个 MessageQueue 。
如果都在延迟时间内,那就找一个相对可用的,如果此时也没有相对可用的,那就继续轮询,直接返回对应的MessageQueue 。
最后调用 Netty 网络通信组件将消息发送出去。
05 RocketMQ Producer 之总结
这里,我们一起来总结一下这篇文章的重点。
1、通过三种发送方式「同步消息」、「异步消息」、「单向消息」带你深度剖析了 RocketMQ 中生产者的启动流程以及启动过程中的各种实现细节。
2、带你深度剖析了 RocketMQ 中生产者的前提知识点,包括「Netty 服务端」、「消息格式」、「Topic 路由消息」、「与 Broker 之间心跳」等。
3、带你深度剖析了 RocketMQ 中生产者的发送流程,重点剖析了「MessageQueue 默认选择机制」、「MessageQueue 发送故障延迟机制」这两种实现方案的细节。
下篇我们来深度剖析「NameServer 架构设计」,大家期待,我们下期见