1. 概述
Apache Kafka 是一个分布式流处理平台,由 LinkedIn 开发并开源,具有高吞吐、低延迟、可水平扩展等特性。它广泛应用于实时数据管道、日志聚合、事件溯源、消息队列等场景。
2. 核心原理
2.1 架构组成
- Broker:Kafka 集群中的单个节点,负责消息存储和传递。
- Producer:向 Kafka Topic 发布消息的客户端。
- Consumer:从 Topic 订阅并消费消息的客户端。
- ZooKeeper(旧版本) / KRaft(新版本):管理集群元数据和协调节点(Kafka 2.8+ 开始支持去 ZooKeeper 化)。
2.2 关键概念
- Topic:消息的逻辑分类,生产者发送到 Topic,消费者从 Topic 读取。
- Partition:Topic 的分区,每个 Partition 是有序、不可变的消息序列。
- Offset:消息在 Partition 中的唯一标识(递增序号)。
- Consumer Group:多个消费者组成的组,共同消费一个 Topic,实现负载均衡。
2.3 数据存储
- 消息持久化到磁盘,通过分段(Segment)和索引机制实现高效读写。
- 数据保留策略:基于时间或大小自动清理旧数据。
2.4 高可用性
- 副本机制(Replication):每个 Partition 有多个副本,Leader 处理读写,Followers 同步数据。
- ISR(In-Sync Replicas):与 Leader 保持同步的副本集合,用于故障恢复。
3. 优缺点分析
优点
- 高吞吐:单机可达百万消息/秒。
- 持久化存储:消息可长期保留,支持重复消费。
- 水平扩展:通过增加 Broker 和 Partition 轻松扩容。
- 容错性:副本机制保障数据不丢失。
- 多语言支持:提供 Java、Python 等客户端 API。
缺点
- 运维复杂:需管理 ZooKeeper/KRaft、Broker、Topic 等组件。
- 消息延迟:默认配置不适合亚毫秒级延迟场景。
- 功能单一:无复杂路由、事务消息等高级特性(需结合其他工具)。
- 资源消耗:高并发下 CPU/内存占用较高。
4. 使用方法
4.1 安装与启动
- 下载 Kafka:
wget https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
- 启动 ZooKeeper(旧版本):
bin/zookeeper-server-start.sh config/zookeeper.properties
- 启动 Kafka Broker:
bin/kafka-server-start.sh config/server.properties
4.2 创建 Topic
bin/kafka-topics.sh --create --topic demo-topic \--bootstrap-server localhost:9092 \--partitions 3 --replication-factor 1
4.3 生产者/消费者命令行测试
- 启动生产者:
bin/kafka-console-producer.sh --topic demo-topic --bootstrap-server localhost:9092
- 启动消费者:
bin/kafka-console-consumer.sh --topic demo-topic --bootstrap-server localhost:9092 --from-beginning
5. Java 客户端示例
5.1 添加 Maven 依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.7.0</version>
</dependency>
5.2 生产者代码
import org.apache.kafka.clients.producer.*;import java.util.Properties;public class KafkaProducerDemo {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", "key-" + i, "value-" + i);producer.send(record, (metadata, exception) -> {if (exception == null) {System.out.printf("Sent to partition %d, offset %d%n", metadata.partition(), metadata.offset());} else {exception.printStackTrace();}});}producer.close();}
}
5.3 消费者代码
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerDemo {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test-group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "earliest"); // 从最早消息开始消费Consumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("demo-topic"));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("Received: partition=%d, offset=%d, key=%s, value=%s%n",record.partition(), record.offset(), record.key(), record.value());}}} finally {consumer.close();}}
}
6. 注意事项
- 配置调优:根据场景调整
batch.size
、linger.ms
(生产者)和max.poll.records
(消费者)。 - 消费者组管理:避免重复的
group.id
导致消费混乱。 - 事务支持:需要时启用
enable.idempotence=true
和事务 API。 - 监控:使用 Kafka Manager 或 JMX 监控集群状态。
7. 总结
Kafka 是构建实时数据管道的核心工具,适用于日志收集、事件流处理等场景。通过合理设计 Topic 分区和消费者组,结合 Java 客户端 API,可快速实现高可靠的消息系统。对于需要更强事务支持或复杂路由的场景,建议结合 RabbitMQ 或 Pulsar 使用。
技术视角的奇妙联想
“人生到处知何似,应似飞鸿踏雪泥”
若用分布式系统比喻:
雪泥:如同消息队列中的持久化存储(如Kafka的日志保留)
鸿爪:类似系统产生的数据痕迹(offset记录)
飞鸿:恰似流动的实时数据流(stream processing)