Kafka Fundamentals
1. Introduction to Kafka
1.1 What is Kafka?
Kafka is a distributed streaming platform with the following characteristics:
- High throughput
- Durable message storage
- Distributed architecture
- Horizontal scalability
- High reliability
1.2 Core Concepts
- Topic: a named category to which messages are published
- Partition: a topic is split into one or more partitions
- Broker: a Kafka server that stores and serves messages
- Producer: a client that publishes messages
- Consumer: a client that reads messages
- Consumer Group: a set of consumers that share the work of consuming a topic
- Zookeeper: manages and coordinates the Kafka brokers
2. Installation and Configuration
2.1 Installing Kafka
# Download Kafka
wget https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz

# Extract the archive
tar -xzf kafka_2.13-3.5.1.tgz
cd kafka_2.13-3.5.1

# Start Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# Start Kafka
bin/kafka-server-start.sh config/server.properties
2.2 Basic Configuration
# server.properties
broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
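To verify that the broker is up and reachable with the configuration above, a minimal AdminClient check can be run. This is an illustrative sketch (the class name BrokerCheck is ours), assuming the broker listens on localhost:9092:

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class BrokerCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // describeCluster() fails if no broker is reachable
            System.out.println("Cluster id: " + admin.describeCluster().clusterId().get());
            System.out.println("Nodes: " + admin.describeCluster().nodes().get());
        }
    }
}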
3. Topic Management
3.1 Topic Operations
# Create a topic
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
  --replication-factor 1 --partitions 3 --topic test-topic

# List topics
bin/kafka-topics.sh --list --bootstrap-server localhost:9092

# Describe a topic
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 \
  --topic test-topic

# Delete a topic
bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 \
  --topic test-topic
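The same operations are also available programmatically through the AdminClient that ships with kafka-clients. A sketch mirroring the CLI commands above (topic name, partition count, and replication factor match those commands):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicAdminExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Create a topic with 3 partitions and replication factor 1
            admin.createTopics(Collections.singleton(new NewTopic("test-topic", 3, (short) 1))).all().get();
            // List topics
            System.out.println(admin.listTopics().names().get());
            // Delete the topic
            admin.deleteTopics(Collections.singleton("test-topic")).all().get();
        }
    }
}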
4. Using the Java API
4.1 Adding the Dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.1</version>
</dependency>
4.2 Producer Example
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        // Send a message with an asynchronous callback
        ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                System.out.println("Message sent successfully");
            } else {
                exception.printStackTrace();
            }
        });

        producer.close();
    }
}
4.3 Consumer Example
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
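Note that the loop above never exits and never closes the consumer. A common shutdown pattern, sketched below as a drop-in replacement for that loop, calls wakeup() from a JVM shutdown hook so the blocked poll() throws WakeupException (org.apache.kafka.common.errors.WakeupException) and the consumer can be closed cleanly:

// Graceful shutdown: wakeup() forces a blocked poll() to throw WakeupException
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();
    try {
        mainThread.join(); // wait for the poll loop to finish cleanly
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
        }
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // expected on shutdown
} finally {
    consumer.close();
}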
5. Spring Boot Integration
5.1 Adding the Dependency
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
5.2 Configuration File
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
5.3 Producer Example
@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        // In Spring Kafka 3.x, send() returns a CompletableFuture
        // (older 2.x versions returned a ListenableFuture with addCallback)
        kafkaTemplate.send(topic, message).whenComplete((result, ex) -> {
            if (ex == null) {
                System.out.println("Message sent successfully");
            } else {
                System.out.println("Failed to send message: " + ex.getMessage());
            }
        });
    }
}
5.4 Consumer Example
@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "test-topic", groupId = "test-group")
    public void listen(String message) {
        System.out.println("Received message: " + message);
    }
}
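If offsets should only be committed after successful processing, the listener can acknowledge manually. A sketch, assuming the ack mode is switched on separately via spring.kafka.listener.ack-mode: manual-immediate (not part of the configuration shown in 5.2):

@Service
public class ManualAckConsumerService {

    // Requires spring.kafka.listener.ack-mode: manual-immediate (or manual);
    // Acknowledgment is org.springframework.kafka.support.Acknowledgment
    @KafkaListener(topics = "test-topic", groupId = "test-group")
    public void listen(String message, Acknowledgment ack) {
        System.out.println("Received message: " + message);
        ack.acknowledge(); // commit the offset only after successful processing
    }
}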
6. Advanced Features
6.1 Partitioning Strategy
// Custom partitioner
public class CustomPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // Custom partitioning logic; masking the sign bit avoids the
        // Math.abs(Integer.MIN_VALUE) overflow pitfall
        return key == null ? 0 : (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

// Register the partitioner
props.put("partitioner.class", "com.example.CustomPartitioner");
6.2 Consumer Group Management
// Commit offsets manually
props.put("enable.auto.commit", "false");

consumer.poll(Duration.ofMillis(100));
consumer.commitSync();

// Asynchronous commit
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        System.out.println("Commit failed for offsets: " + offsets);
    }
});
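A common pattern combines the two: fast asynchronous commits inside the poll loop and one final synchronous commit before closing, so the last processed offsets are not lost. A sketch (running and processRecords are placeholders):

try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        processRecords(records);
        consumer.commitAsync(); // non-blocking commit in the hot path
    }
} finally {
    try {
        consumer.commitSync(); // blocking commit of the final offsets
    } finally {
        consumer.close();
    }
}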
6.3 Message Compression
// Configure compression on the producer
props.put("compression.type", "gzip"); // gzip, snappy, lz4, and zstd are supported
7. Monitoring and Operations
7.1 Monitoring Metrics
- Producer metrics (exposed programmatically via metrics(); see the sketch after this list)
  - Message send rate
  - Message size
  - Request latency
- Consumer metrics
  - Message consumption rate
  - Consumer lag
  - Consumer group status
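Both clients expose these metrics programmatically through their metrics() method. A small sketch that prints rate- and latency-related producer metrics, assuming a producer instance as in section 4.2 (the name filter is purely illustrative):

// Print rate- and latency-related client metrics
// (MetricName and Metric live in org.apache.kafka.common)
producer.metrics().forEach((name, metric) -> {
    if (name.name().contains("rate") || name.name().contains("latency")) {
        System.out.printf("%s.%s = %s%n", name.group(), name.name(), metric.metricValue());
    }
});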
7.2 Common Commands
# List consumer groups
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# Describe a consumer group
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group test-group

# Read messages from a topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic test-topic --from-beginning
8. Best Practices
8.1 Producer Best Practices
- Reliability configuration

  // Make sure messages are delivered reliably
  props.put("acks", "all");
  props.put("retries", 3);
  props.put("max.in.flight.requests.per.connection", 1);

- Performance tuning

  // Batch sends together
  props.put("batch.size", 16384);
  props.put("linger.ms", 1);
8.2 Consumer Best Practices
- Consumer configuration

  // Set a sensible fetch size
  props.put("max.poll.records", 500);
  // Set a reasonable heartbeat interval
  props.put("heartbeat.interval.ms", 3000);

- Exception handling

  try {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
      processRecords(records);
      consumer.commitSync();
  } catch (Exception e) {
      // Handle the failure
      handleException(e);
  }
8.3 Operational Recommendations
- Monitoring and alerting
  - Monitor consumer lag
  - Monitor cluster health
  - Alert on key metrics
- Capacity planning
  - Estimate message volume
  - Plan the number of partitions
  - Choose a sensible replication factor
- Security management (see the sketch after this list)
  - Enable authentication
  - Configure ACLs
  - Encrypt traffic in transit
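As a concrete illustration of the security items above, here is a hedged sketch of client-side SASL_SSL properties; the mechanism, credentials, and truststore path are placeholders, not values taken from this document:

// Hypothetical client security settings (all values are placeholders)
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-256");
props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.scram.ScramLoginModule required "
    + "username=\"alice\" password=\"alice-secret\";");
props.put("ssl.truststore.location", "/path/to/client.truststore.jks");
props.put("ssl.truststore.password", "changeit");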
9. Kafka and Spring Cloud Integration
9.1 Spring Cloud Stream
<!-- Add the Spring Cloud Stream Kafka dependency -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
9.2 Configuring Spring Cloud Stream
# application.yml
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          autoCreateTopics: true
      bindings:
        input:
          destination: input-topic
          contentType: application/json
        output:
          destination: output-topic
          contentType: application/json
9.3 Message Producer
@Service
public class MessageService {

    @Autowired
    private MessageChannel output;

    public void sendMessage(Message<?> message) {
        output.send(message);
    }
}
9.4 Message Consumer
@Service
public class MessageListener {

    @StreamListener("input")
    public void handleMessage(Message<?> message) {
        System.out.println("Received message: " + message.getPayload());
    }
}
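@StreamListener belongs to the legacy annotation-based model, which is deprecated in recent Spring Cloud Stream releases in favor of the functional model. A sketch of the equivalent functional consumer (the bean name maps to the handleMessage-in-0 binding by convention):

@Configuration
public class MessageHandlers {

    // java.util.function.Consumer; bound to the "handleMessage-in-0" binding by convention
    @Bean
    public Consumer<Message<?>> handleMessage() {
        return message -> System.out.println("Received message: " + message.getPayload());
    }
}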
10. Deploying Kafka with Docker
10.1 Running Kafka with Docker
# Pull the Zookeeper image
docker pull wurstmeister/zookeeper

# Pull the Kafka image
docker pull wurstmeister/kafka

# Run the Zookeeper container
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper

# Run the Kafka container
docker run -d --name kafka \
  -p 9092:9092 \
  -e KAFKA_ADVERTISED_HOST_NAME=localhost \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  --link zookeeper:zookeeper \
  wurstmeister/kafka
10.2 Deploying a Kafka Cluster with Docker Compose
# docker-compose.yml
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - kafka-network

  kafka1:
    image: wurstmeister/kafka
    container_name: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
    depends_on:
      - zookeeper
    networks:
      - kafka-network

  kafka2:
    image: wurstmeister/kafka
    container_name: kafka2
    ports:
      - "9093:9092"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
    depends_on:
      - zookeeper
    networks:
      - kafka-network

  kafka3:
    image: wurstmeister/kafka
    container_name: kafka3
    ports:
      - "9094:9092"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
    depends_on:
      - zookeeper
    networks:
      - kafka-network

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:9092,kafka2:9092,kafka3:9092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    networks:
      - kafka-network

networks:
  kafka-network:
    driver: bridge
11. Kafka FAQ
11.1 Message Loss
Question: How can you make sure messages are not lost?
Solution:
- Producer side:

  // Set acks to all so every in-sync replica must receive the message
  props.put("acks", "all");
  // Configure retries
  props.put("retries", 3);
  // Use synchronous sends, or asynchronous sends with a callback
  producer.send(record, (metadata, exception) -> {
      if (exception != null) {
          // Handle the send failure
      }
  });

- Broker side:

  # Default replication factor for new topics
  default.replication.factor=3
  # Minimum number of in-sync replicas
  min.insync.replicas=2

  (Note: enable.idempotence=true is a producer setting, not a broker one; see 11.2.)

- Consumer side:

  // Disable auto commit
  props.put("enable.auto.commit", "false");
  // Commit offsets manually after processing
  try {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
      processRecords(records);
      consumer.commitSync();
  } catch (Exception e) {
      // Handle the failure
  }
11.2 Duplicate Consumption
Question: How can duplicate message consumption be avoided?
Solution:
- Enable idempotence on the producer:

  // Prevents duplicates caused by producer retries
  props.put("enable.idempotence", "true");

- Make the consumer idempotent:

  // Use a database unique constraint or a distributed lock
  public void processMessage(ConsumerRecord<String, String> record) {
      String messageId = record.key();
      if (isMessageProcessed(messageId)) {
          return; // already processed, skip
      }
      // Process the message
      processBusinessLogic(record.value());
      // Mark the message as processed
      markMessageAsProcessed(messageId);
  }
11.3 Performance
Question: How can Kafka performance be tuned?
Solution:
- Producer tuning:

  // Batch sends
  props.put("batch.size", 16384);
  props.put("linger.ms", 1);
  // Compress messages
  props.put("compression.type", "gzip");
  // Send asynchronously
  producer.send(record, callback);

- Consumer tuning:

  // Increase the number of records fetched per poll
  props.put("max.poll.records", 500);
  // Scale out with more consumer threads. KafkaConsumer is not thread-safe,
  // so each thread must create and use its own consumer instance.
  ExecutorService executor = Executors.newFixedThreadPool(3);
  for (int i = 0; i < 3; i++) {
      executor.submit(() -> {
          KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
          consumer.subscribe(Arrays.asList("test-topic"));
          while (true) {
              ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
              processRecords(records);
          }
      });
  }

- Broker tuning:

  # Increase the number of partitions
  num.partitions=8
  # Tune the log segment size
  log.segment.bytes=1073741824
  # Tune the flush policy
  log.flush.interval.messages=10000
  log.flush.interval.ms=1000
11.4 Connection Problems
Question: What should you check when a client cannot connect to Kafka?
Solution:
- Check that the Kafka service is running
- Check firewall rules and make sure port 9092 is open
- Check the listeners and advertised.listeners settings in the Kafka configuration
- Check network connectivity and DNS resolution
- Check that the client's bootstrap.servers setting is correct
11.5 Partition Imbalance
Question: How can partition imbalance be resolved?
Solution:
- Reassign partitions manually:

  # Generate a reassignment plan
  bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
    --topics-to-move-json-file reassign.json \
    --broker-list "0,1,2" --generate

  # Execute the reassignment
  bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
    --reassignment-json-file reassign.json --execute

- Automatic leader rebalancing:

  # Rebalances partition leadership (not replica placement) automatically
  auto.leader.rebalance.enable=true
12. Advanced Kafka Use Cases
12.1 Message Tracing
// Propagate a trace ID through message headers
public class MessageTracer {

    private static final String TRACE_ID = "trace_id";

    public static void addTraceId(ProducerRecord<String, String> record) {
        String traceId = UUID.randomUUID().toString();
        record.headers().add(TRACE_ID, traceId.getBytes());
    }

    public static String getTraceId(ConsumerRecord<String, String> record) {
        Header header = record.headers().lastHeader(TRACE_ID);
        return header != null ? new String(header.value()) : null;
    }
}

// Producer side
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
MessageTracer.addTraceId(record);
producer.send(record, callback);

// Consumer side
String traceId = MessageTracer.getTraceId(record);
log.info("Processing message with trace ID: {}", traceId);
12.2 Message Filtering
// Filter messages with a consumer interceptor
public class MessageFilterInterceptor implements ConsumerInterceptor<String, String> {

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        // ConsumerRecords is backed by a per-partition map, so group the
        // surviving records by partition before rebuilding it
        Map<TopicPartition, List<ConsumerRecord<String, String>>> filtered = new HashMap<>();
        for (ConsumerRecord<String, String> record : records) {
            if (shouldProcess(record)) {
                filtered.computeIfAbsent(
                        new TopicPartition(record.topic(), record.partition()),
                        tp -> new ArrayList<>()).add(record);
            }
        }
        return new ConsumerRecords<>(filtered);
    }

    private boolean shouldProcess(ConsumerRecord<String, String> record) {
        // Filtering logic goes here
        return true;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // Called when offsets are committed
    }

    @Override
    public void close() {
        // Release resources
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Configure the interceptor
    }
}

// Register the interceptor
props.put("interceptor.classes", "com.example.MessageFilterInterceptor");
12.3 Message Transformation
// Transform messages with a consumer interceptor
public class MessageTransformInterceptor implements ConsumerInterceptor<String, String> {

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        Map<TopicPartition, List<ConsumerRecord<String, String>>> transformed = new HashMap<>();
        for (ConsumerRecord<String, String> record : records) {
            // The simple ConsumerRecord constructor does not carry over
            // headers or timestamps; use the full constructor if you need them
            ConsumerRecord<String, String> copy = new ConsumerRecord<>(
                    record.topic(), record.partition(), record.offset(),
                    record.key(), transform(record.value()));
            transformed.computeIfAbsent(
                    new TopicPartition(record.topic(), record.partition()),
                    tp -> new ArrayList<>()).add(copy);
        }
        return new ConsumerRecords<>(transformed);
    }

    private String transform(String value) {
        // Transformation logic goes here
        return value.toUpperCase();
    }

    // Remaining interceptor methods omitted...
}
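As with the filtering interceptor, the transformer only takes effect once it is registered on the consumer:

// Register the transforming interceptor
props.put("interceptor.classes", "com.example.MessageTransformInterceptor");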
12.4 Message Routing
// Route messages to different topics based on their content
public class MessageRouter {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MessageRouter(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void routeMessage(String message) {
        String topic = determineTopic(message);
        kafkaTemplate.send(topic, message);
    }

    private String determineTopic(String message) {
        // Pick the target topic based on the message content
        if (message.contains("error")) {
            return "error-topic";
        } else if (message.contains("warning")) {
            return "warning-topic";
        } else {
            return "info-topic";
        }
    }
}
12.5 Message Aggregation
// Aggregate messages with Kafka Streams
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class MessageAggregator {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "message-aggregator");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Read messages from the input topic
        KStream<String, String> input = builder.stream("input-topic");

        // Group by key and count
        KTable<String, Long> counts = input.groupByKey().count();

        // Write the results to the output topic; the values are now Longs,
        // so the default String serde must be overridden
        counts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
13. Kafka Best Practices Summary
13.1 Topic Design Best Practices
- Naming conventions
  - Use lowercase letters
  - Separate words with hyphens or underscores
  - Include the business domain
  - Include the data type
- Partition count
  - Consider throughput requirements
  - Consider consumer parallelism
  - Consider future growth
  - Rule of thumb: partitions = target throughput / per-partition throughput (e.g. a 100 MB/s target at 10 MB/s per partition suggests 10 partitions)
- Replication factor
  - At least 3 in production
  - Consider availability requirements
  - Consider storage cost
13.2 Message Design Best Practices
- Message format
  - Use a standard format such as JSON or Avro
  - Include the necessary metadata
  - Version the schema
  - Keep changes backward compatible
- Message size
  - Keep messages small
  - Consider compression
  - For large payloads, consider splitting them or storing them externally
- Message key design (see the sketch after this list)
  - Use meaningful keys
  - Make sure keys distribute evenly across partitions
  - Consider the partitioning strategy
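To make the key-design point concrete: records that share a key always land in the same partition, which preserves per-key ordering. A small sketch, assuming a configured producer as in section 4.2 (topic and values are illustrative):

// Records sharing a key go to the same partition, so per-order events stay ordered
String orderId = "order-42";
producer.send(new ProducerRecord<>("orders", orderId, "CREATED"));
producer.send(new ProducerRecord<>("orders", orderId, "PAID"));
producer.send(new ProducerRecord<>("orders", orderId, "SHIPPED"));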
13.3 Producer Best Practices
- Reliability configuration

  // Make sure messages are delivered reliably
  props.put("acks", "all");
  props.put("retries", 3);
  props.put("max.in.flight.requests.per.connection", 1);

- Performance tuning

  // Batch sends
  props.put("batch.size", 16384);
  props.put("linger.ms", 1);
  // Compression
  props.put("compression.type", "gzip");

- Exception handling

  producer.send(record, (metadata, exception) -> {
      if (exception != null) {
          // Log the error
          log.error("Failed to send message", exception);
          // Retry or raise an alert
      }
  });
13.4 Consumer Best Practices
- Consumer configuration

  // Set a sensible fetch size
  props.put("max.poll.records", 500);
  // Set a reasonable heartbeat interval
  props.put("heartbeat.interval.ms", 3000);
  // Set a reasonable session timeout
  props.put("session.timeout.ms", 30000);

- Consumer group management
  - Choose consumer group IDs deliberately
  - Monitor consumer group status
  - Handle consumer group rebalances
- Offset management

  // Commit offsets manually
  try {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
      processRecords(records);
      consumer.commitSync();
  } catch (Exception e) {
      // Handle the failure
  }
13.5 Operations Best Practices
- Monitoring and alerting
  - Monitor broker health
  - Monitor consumer lag
  - Monitor partition counts
  - Monitor message backlog
- Capacity planning
  - Estimate message volume
  - Plan the number of partitions
  - Plan storage capacity
  - Plan network bandwidth
- Security management
  - Enable authentication
  - Configure ACLs
  - Encrypt traffic in transit
  - Audit regularly
- Backup and recovery
  - Back up configuration regularly
  - Test the recovery procedure
  - Maintain a disaster recovery plan