Kafka 高性能消息队列
这里学习的 Kafka 与之前学习的 RabbitMQ 的区别是,Kafka 其实更注重的是流式数据,高吞吐的性能处理,而 RabbitMQ 性能比 Kafka 差,但提供了更多的特性。Kafka 适用于日志收集与处理等场景,一般只有数据量足够大才需要用到它。
注意,Kafka 是一种持久化的订阅-发布模式,可以用于存储数据,且在消息转发后不会丢弃。
基本概念
- Broker(代理节点)
- 这是 Kafka 集群中的服务器节点
- 每个 broker 负责存储数据并处理客户端请求
- 多个 broker 组成一个 Kafka 集群
- Controller(控制器)
- 是一个特殊的 broker,由集群选举产生
- 负责管理集群状态,如分区分配、leader 选举等
- 一个集群只有一个 controller
- Topic(主题)
- 是消息的逻辑分类
- 相当于一个消息队列或频道
- 比如可以有"订单"、"支付"等不同的 topic
- Partition(分区)
- 每个 topic 可以分成多个 partition
- partition 是数据存储和复制的基本单位
- 不同 partition 可以分布在不同 broker 上,实现负载均衡
- Message/Record(消息/记录)
- 这是 Kafka 中最基本的数据单元
- 包含 key、value、timestamp 等信息
- 发送到 topic 中的每条数据就是一条 message
它们之间的关系是:
Kafka集群
├── 多个 Broker (其中一个是 Controller)
└── 多个 Topic
└── 多个 Partition
└── 多条 Message
假设有一个电商系统,可能会有:
- "订单" topic:存储订单相关消息
- "支付" topic:存储支付相关消息
每个 topic 可能分成 3 个 partition,分布在不同 broker 上,提高并行处理能力。
下面介绍一下 kafka 的架构。
- 基本架构
生产者 -> Kafka集群 -> 消费者
↓
ZooKeeper/KRaft
- 核心组件和职责:
Producer(生产者)
- 负责发送消息到 Kafka 集群
- 可以指定分区策略(轮询、hash等)
- 支持同步或异步发送
Consumer(消费者)
- 从 Kafka 集群读取消息
- 可以组成 Consumer Group 进行消费
- 每个分区只能被同一组内的一个消费者消费
Consumer Group(消费者组)
- 多个消费者组成一组
- 实现消息的并行消费
- 提供故障转移能力
- 高可用机制:
副本机制,使用一主多备机制。
- 每个分区有多个副本(replica)
- 一个 leader 副本和多个 follower 副本
- follower 从 leader 同步数据
- 当 leader 故障时,从 follower 中选举新的 leader
- 示例架构:
Producer层
↓
Broker集群
├── Broker1 (Controller)
│ └── Partition1-Leader
│ └── Partition2-Follower
├── Broker2
│ └── Partition1-Follower
│ └── Partition2-Leader
├── Broker3
│ └── Partition1-Follower
│ └── Partition2-Follower
↓
Consumer Group层
- 关键特性:
- 分区提供并行处理能力
- 副本提供高可用性
- Consumer Group 提供扩展性
- Controller 提供集中式管理
- 支持水平扩展
这种架构设计使 Kafka:
- 具有高吞吐量
- 可靠性强
- 容错性好
- 易于扩展
启动 Kafka
注意,在以前版本,Kafka 需要搭配 Zookeeper。但是现在 Kraft 是更推荐的机制。
这里直接使用 Docker 启动一个 Kafka 集群。
docker pull apache/kafka-native:3.9.0
docker run -p 9092:9092 apache/kafka-native:3.9.0
使用 native 版本的镜像性能更好,不过在我们测试过程中这不重要。
Kafka 的配置文件在 Container 的 /opt/kafka/config
中,也可以在启动 Container 时使用环境变量配置。
上面这样只启动了一个容器,以下是一个集群的 Docker Compose,在生产环境中请使用 k8s 或 ansible 搭建集群。
version: '3.8'
networks:
kafka-net:
driver: bridge
services:
kafka1:
image: confluentinc/cp-kafka:7.6.0
hostname: kafka1
container_name: kafka1
networks:
- kafka-net
ports:
- "19092:19092" # External access
environment:
KAFKA_KRAFT_MODE: "true"
KAFKA_PROCESS_ROLES: "controller,broker"
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093"
KAFKA_LISTENERS: "CONTROLLER://0.0.0.0:9093,INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:19092"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka1:9092,EXTERNAL://localhost:19092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
CLUSTER_ID: "m1Ze6AjGRwqarkcxJscgyQ"
KAFKA_LOG_DIRS: "/var/lib/kafka/data"
volumes:
- ./kafka1/data:/var/lib/kafka/data
kafka2:
image: confluentinc/cp-kafka:7.6.0
hostname: kafka2
container_name: kafka2
networks:
- kafka-net
ports:
- "19093:19093"
environment:
KAFKA_KRAFT_MODE: "true"
KAFKA_PROCESS_ROLES: "controller,broker"
KAFKA_NODE_ID: 2
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093"
KAFKA_LISTENERS: "CONTROLLER://0.0.0.0:9093,INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:19093"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka2:9092,EXTERNAL://localhost:19093"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
CLUSTER_ID: "m1Ze6AjGRwqarkcxJscgyQ"
KAFKA_LOG_DIRS: "/var/lib/kafka/data"
volumes:
- ./kafka2/data:/var/lib/kafka/data
kafka3:
image: confluentinc/cp-kafka:7.6.0
hostname: kafka3
container_name: kafka3
networks:
- kafka-net
ports:
- "19094:19094"
environment:
KAFKA_KRAFT_MODE: "true"
KAFKA_PROCESS_ROLES: "controller,broker"
KAFKA_NODE_ID: 3
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093"
KAFKA_LISTENERS: "CONTROLLER://0.0.0.0:9093,INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:19094"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka3:9092,EXTERNAL://localhost:19094"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
CLUSTER_ID: "m1Ze6AjGRwqarkcxJscgyQ"
KAFKA_LOG_DIRS: "/var/lib/kafka/data"
volumes:
- ./kafka3/data:/var/lib/kafka/data
这里的配置需要注意,比较复杂,
这里的配置需要注意以下几点:
-
KAFKA_KRAFT_MODE:设置为
true
表示启用了Kafka的KRaft模式,即不依赖Zookeeper。这是Kafka的新特性,推荐在新版本中使用。 -
KAFKA_PROCESS_ROLES:指定每个Kafka节点的角色。在此配置中,每个节点都同时作为控制器和代理(
controller,broker
)。这允许Kafka集群在没有独立控制器节点的情况下运行。 -
KAFKA_NODE_ID:为每个节点分配一个唯一的ID。这里分别为1、2和3,分别代表三个Kafka节点。
-
KAFKA_CONTROLLER_QUORUM_VOTERS:配置集群中控制器的选举成员,这里包括所有三个节点。控制器负责管理Kafka集群的元数据和协调工作。
-
KAFKA_LISTENERS 和 KAFKA_ADVERTISED_LISTENERS:这些环境变量用于指定Kafka节点的监听地址和广告地址。
INTERNAL
用于集群内部通信,EXTERNAL
则用于外部访问。 -
KAFKA_LOG_DIRS:指定Kafka数据存储的路径。每个节点都需要一个单独的卷来存储其日志数据。
-
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:这是用来指定不同监听器协议的映射。在这个配置中,
CONTROLLER
、INTERNAL
、EXTERNAL
三个协议都设置为PLAINTEXT
。这意味着通信不加密。如果需要更安全的通信,可以选择SSL
或SASL
等协议。 -
KAFKA_CONTROLLER_LISTENER_NAMES:用于指定控制器监听器的名称。在这种情况下,控制器监听器的名称是
CONTROLLER
,它会与KAFKA_LISTENERS
中的CONTROLLER
协议配对。 -
KAFKA_INTER_BROKER_LISTENER_NAME:指定集群中节点之间(即代理之间)通信使用的监听器名称。这里设置为
INTERNAL
,意味着内部的节点间通信会使用INTERNAL
协议。 -
CLUSTER_ID:这是集群的唯一标识符。在启用KRaft模式时,Kafka需要一个
CLUSTER_ID
来区分不同的集群。确保在所有节点上使用相同的CLUSTER_ID
。这里必须是一个 base64 UUID,随机生成一个即可。 -
KAFKA_LOG_DIRS:指定Kafka日志的存储位置。每个Kafka容器都需要持久化存储,以避免数据丢失。通常会将容器的存储目录映射到本地的某个路径,比如
./kafka1/data
、./kafka2/data
等。
操纵 Kafka
这里我们使用 NodeJS 操纵 Kafka 集群。当然,单机 NodeJS 处理可能性能不够,但是在微服务场景下,因为服务可以自由伸缩,因此单机的性能反而不是很重要了。
const { Kafka } = require('kafkajs');
// 基础配置
const kafka = new Kafka({
clientId: 'simple-client',
brokers: ['localhost:19092', 'localhost:19093', 'localhost:19094'],
connectionTimeout: 10000,
retry: { retries: 10 }
});
async function main() {
// 生产者
const producer = kafka.producer();
await producer.connect();
await producer.send({
topic: 'simple-topic',
messages: [{ value: 'Test Message' }],
acks: -1
});
console.log('消息发送成功');
// 消费者(解决协调器问题关键配置)
const consumer = kafka.consumer({
groupId: 'simple-group',
sessionTimeout: 30000,
retry: { retries: 15 }
});
await consumer.connect();
await consumer.subscribe({ topic: 'simple-topic', fromBeginning: true });
consumer.run({
eachMessage: async ({ message }) => {
console.log(`收到消息: ${message.value}`);
}
});
// 保持运行30秒
await new Promise(resolve => setTimeout(resolve, 30000));
await Promise.all([producer.disconnect(), consumer.disconnect()]);
}
main().catch(console.error);