当前位置: 首页> 汽车> 报价 > 中国航空技术北京有限公司_web前端是什么专业_seo培训_软件推广赚佣金渠道

中国航空技术北京有限公司_web前端是什么专业_seo培训_软件推广赚佣金渠道

时间:2025/7/9 13:06:50来源:https://blog.csdn.net/weixin_74417835/article/details/147034611 浏览次数: 0次
中国航空技术北京有限公司_web前端是什么专业_seo培训_软件推广赚佣金渠道

Kafka基础知识

一、Kafka简介

1.1 什么是Kafka?

Kafka是一个分布式的流处理平台,具有以下特点:

  • 高吞吐量
  • 可持久化
  • 分布式
  • 可扩展
  • 高可靠性

1.2 基本概念

  • Topic:消息主题,消息以主题为单位进行归类
  • Partition:分区,一个主题可以有多个分区
  • Broker:消息中间件服务器
  • Producer:消息生产者
  • Consumer:消息消费者
  • Consumer Group:消费者组
  • Zookeeper:用于管理和协调Kafka代理

二、安装与配置

2.1 安装Kafka

# 下载Kafka
wget https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz# 解压
tar -xzf kafka_2.13-3.5.1.tgz# 启动Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties# 启动Kafka
bin/kafka-server-start.sh config/server.properties

2.2 基础配置

# server.properties
broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181

三、Topic管理

3.1 Topic操作

# 创建Topic
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \--replication-factor 1 --partitions 3 --topic test-topic# 查看Topic列表
bin/kafka-topics.sh --list --bootstrap-server localhost:9092# 查看Topic详情
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 \--topic test-topic# 删除Topic
bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 \--topic test-topic

四、Java API使用

4.1 添加依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.5.1</version>
</dependency>

4.2 生产者示例

public class KafkaProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);// 发送消息ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");producer.send(record, (metadata, exception) -> {if (exception == null) {System.out.println("Message sent successfully");} else {exception.printStackTrace();}});producer.close();}
}

4.3 消费者示例

public class KafkaConsumerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test-group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");Consumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("test-topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}}
}

五、Spring Boot集成

5.1 添加依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

5.2 配置文件

spring:kafka:bootstrap-servers: localhost:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: test-groupauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

5.3 生产者示例

@Service
public class KafkaProducerService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message).addCallback(result -> System.out.println("消息发送成功"),ex -> System.out.println("消息发送失败: " + ex.getMessage()));}
}

5.4 消费者示例

@Service
public class KafkaConsumerService {@KafkaListener(topics = "test-topic", groupId = "test-group")public void listen(String message) {System.out.println("接收到消息: " + message);}
}

六、高级特性

6.1 分区策略

// 自定义分区器
public class CustomPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes,Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();// 自定义分区逻辑return Math.abs(key.hashCode() % numPartitions);}
}// 配置分区器
props.put("partitioner.class", "com.example.CustomPartitioner");

6.2 消费者组管理

// 手动提交偏移量
props.put("enable.auto.commit", "false");
consumer.poll(Duration.ofMillis(100));
consumer.commitSync();// 异步提交
consumer.commitAsync((offsets, exception) -> {if (exception != null) {System.out.println("Commit failed for offsets: " + offsets);}
});

6.3 消息压缩

// 生产者配置压缩
props.put("compression.type", "gzip");  // 支持gzip, snappy, lz4, zstd

七、监控与运维

7.1 监控指标

  • 生产者指标

    • 消息发送速率
    • 消息大小
    • 请求延迟
  • 消费者指标

    • 消息消费速率
    • 消费延迟
    • 消费者组状态

7.2 常用命令

# 查看消费者组
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list# 查看消费者组详情
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \--describe --group test-group# 查看Topic消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \--topic test-topic --from-beginning

八、最佳实践

8.1 生产者最佳实践

  1. 可靠性配置

    // 确保消息可靠发送
    props.put("acks", "all");
    props.put("retries", 3);
    props.put("max.in.flight.requests.per.connection", 1);
    
  2. 性能优化

    // 批量发送
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    

8.2 消费者最佳实践

  1. 消费者配置

    // 设置合适的拉取大小
    props.put("max.poll.records", 500);// 设置合理的心跳时间
    props.put("heartbeat.interval.ms", 3000);
    
  2. 异常处理

    try {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));processRecords(records);consumer.commitSync();
    } catch (Exception e) {// 处理异常handleException(e);
    }
    

8.3 运维建议

  1. 监控告警

    • 监控消费延迟
    • 监控集群状态
    • 设置关键指标告警
  2. 容量规划

    • 评估消息量
    • 规划分区数量
    • 合理设置副本数
  3. 安全管理

    • 启用认证
    • 配置ACL权限
    • 加密传输

九、Kafka与Spring Cloud集成

9.1 Spring Cloud Stream

<!-- 添加Spring Cloud Stream依赖 -->
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

9.2 配置Spring Cloud Stream

# application.yml
spring:cloud:stream:kafka:binder:brokers: localhost:9092autoCreateTopics: truebindings:input:destination: input-topiccontentType: application/jsonoutput:destination: output-topiccontentType: application/json

9.3 消息生产者

@Service
public class MessageService {@Autowiredprivate MessageChannel output;public void sendMessage(Message<?> message) {output.send(message);}
}

9.4 消息消费者

@Service
public class MessageListener {@StreamListener("input")public void handleMessage(Message<?> message) {System.out.println("Received message: " + message.getPayload());}
}

十、Kafka与Docker部署

10.1 使用Docker运行Kafka

# 拉取Zookeeper镜像
docker pull wurstmeister/zookeeper# 拉取Kafka镜像
docker pull wurstmeister/kafka# 运行Zookeeper容器
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper# 运行Kafka容器
docker run -d --name kafka \-p 9092:9092 \-e KAFKA_ADVERTISED_HOST_NAME=localhost \-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \--link zookeeper:zookeeper \wurstmeister/kafka

10.2 使用Docker Compose部署Kafka集群

# docker-compose.yml
version: '3'
services:zookeeper:image: wurstmeister/zookeepercontainer_name: zookeeperports:- "2181:2181"networks:- kafka-networkkafka1:image: wurstmeister/kafkacontainer_name: kafka1ports:- "9092:9092"environment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3depends_on:- zookeepernetworks:- kafka-networkkafka2:image: wurstmeister/kafkacontainer_name: kafka2ports:- "9093:9092"environment:KAFKA_BROKER_ID: 2KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3depends_on:- zookeepernetworks:- kafka-networkkafka3:image: wurstmeister/kafkacontainer_name: kafka3ports:- "9094:9092"environment:KAFKA_BROKER_ID: 3KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3depends_on:- zookeepernetworks:- kafka-networkkafka-ui:image: provectuslabs/kafka-ui:latestcontainer_name: kafka-uiports:- "8080:8080"environment:KAFKA_CLUSTERS_0_NAME: localKAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:9092,kafka2:9092,kafka3:9092KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181depends_on:- kafka1- kafka2- kafka3networks:- kafka-networknetworks:kafka-network:driver: bridge

十一、Kafka常见问题解答

11.1 消息丢失问题

问题: 如何确保消息不丢失?
解决方案:

  1. 生产者端:

    // 设置acks为all,确保所有副本都收到消息
    props.put("acks", "all");// 设置重试次数
    props.put("retries", 3);// 使用同步发送或带回调的异步发送
    producer.send(record, (metadata, exception) -> {if (exception != null) {// 处理发送失败的情况}
    });
    
  2. Broker端:

    # 设置副本因子
    replication.factor=3# 设置最小同步副本数
    min.insync.replicas=2# 启用幂等性
    enable.idempotence=true
    
  3. 消费者端:

    // 禁用自动提交
    props.put("enable.auto.commit", "false");// 手动提交偏移量
    try {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));processRecords(records);consumer.commitSync();
    } catch (Exception e) {// 处理异常
    }
    

11.2 消息重复消费问题

问题: 如何避免消息重复消费?
解决方案:

  1. 启用幂等性:

    // 生产者端启用幂等性
    props.put("enable.idempotence", "true");
    
  2. 消费者端实现幂等性:

    // 使用数据库唯一约束或分布式锁
    public void processMessage(ConsumerRecord<String, String> record) {String messageId = record.key();if (isMessageProcessed(messageId)) {return; // 消息已处理,跳过}// 处理消息processBusinessLogic(record.value());// 标记消息已处理markMessageAsProcessed(messageId);
    }
    

11.3 性能问题

问题: Kafka性能优化有哪些方法?
解决方案:

  1. 生产者优化:

    // 批量发送
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);// 压缩消息
    props.put("compression.type", "gzip");// 使用异步发送
    producer.send(record, callback);
    
  2. 消费者优化:

    // 增加单次拉取的消息数量
    props.put("max.poll.records", 500);// 增加消费者线程数
    ExecutorService executor = Executors.newFixedThreadPool(3);
    for (int i = 0; i < 3; i++) {executor.submit(() -> {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));processRecords(records);}});
    }
    
  3. Broker优化:

    # 增加分区数
    num.partitions=8# 优化日志段大小
    log.segment.bytes=1073741824# 优化刷盘策略
    log.flush.interval.messages=10000
    log.flush.interval.ms=1000
    

11.4 连接问题

问题: 客户端连接Kafka失败怎么办?
解决方案:

  1. 检查Kafka服务是否正在运行
  2. 检查防火墙设置,确保端口9092开放
  3. 检查Kafka配置文件中的listeners和advertised.listeners设置
  4. 检查网络连接和DNS解析
  5. 检查客户端配置的bootstrap.servers是否正确

11.5 分区不平衡问题

问题: 如何解决分区不平衡问题?
解决方案:

  1. 手动重新分配分区:

    # 创建重新分配计划
    bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \--topics-to-move-json-file reassign.json \--broker-list "0,1,2" --generate# 执行重新分配
    bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \--reassignment-json-file reassign.json --execute
    
  2. 自动平衡:

    # 启用自动平衡
    auto.leader.rebalance.enable=true
    

十二、Kafka高级应用场景

12.1 消息追踪

// 使用消息头传递追踪ID
public class MessageTracer {private static final String TRACE_ID = "trace_id";public static void addTraceId(ProducerRecord<String, String> record) {String traceId = UUID.randomUUID().toString();record.headers().add(TRACE_ID, traceId.getBytes());}public static String getTraceId(ConsumerRecord<String, String> record) {Header header = record.headers().lastHeader(TRACE_ID);return header != null ? new String(header.value()) : null;}
}// 生产者使用
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
MessageTracer.addTraceId(record);
producer.send(record, callback);// 消费者使用
String traceId = MessageTracer.getTraceId(record);
log.info("Processing message with trace ID: {}", traceId);

12.2 消息过滤

// 使用消费者拦截器过滤消息
public class MessageFilterInterceptor implements ConsumerInterceptor<String, String> {@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {List<ConsumerRecord<String, String>> filteredRecords = new ArrayList<>();for (ConsumerRecord<String, String> record : records) {if (shouldProcess(record)) {filteredRecords.add(record);}}return new ConsumerRecords<>(filteredRecords);}private boolean shouldProcess(ConsumerRecord<String, String> record) {// 实现过滤逻辑return true;}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {// 提交偏移量时的处理}@Overridepublic void close() {// 关闭资源}@Overridepublic void configure(Map<String, ?> configs) {// 配置拦截器}
}// 配置拦截器
props.put("interceptor.classes", "com.example.MessageFilterInterceptor");

12.3 消息转换

// 使用消费者拦截器转换消息
public class MessageTransformInterceptor implements ConsumerInterceptor<String, String> {@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {List<ConsumerRecord<String, String>> transformedRecords = new ArrayList<>();for (ConsumerRecord<String, String> record : records) {String transformedValue = transform(record.value());transformedRecords.add(new ConsumerRecord<>(record.topic(), record.partition(), record.offset(),record.key(), transformedValue, record.headers(), record.timestamp()));}return new ConsumerRecords<>(transformedRecords);}private String transform(String value) {// 实现转换逻辑return value.toUpperCase();}// 其他方法实现...
}

12.4 消息路由

// 根据消息内容路由到不同主题
public class MessageRouter {private final KafkaTemplate<String, String> kafkaTemplate;public MessageRouter(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void routeMessage(String message) {String topic = determineTopic(message);kafkaTemplate.send(topic, message);}private String determineTopic(message) {// 根据消息内容决定目标主题if (message.contains("error")) {return "error-topic";} else if (message.contains("warning")) {return "warning-topic";} else {return "info-topic";}}
}

12.5 消息聚合

// 使用Kafka Streams进行消息聚合
public class MessageAggregator {public static void main(String[] args) {Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "message-aggregator");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());StreamsBuilder builder = new StreamsBuilder();// 从输入主题读取消息KStream<String, String> input = builder.stream("input-topic");// 按key分组并计数KTable<String, Long> counts = input.groupByKey().count();// 将结果写入输出主题counts.toStream().to("output-topic");KafkaStreams streams = new KafkaStreams(builder.build(), props);streams.start();Runtime.getRuntime().addShutdownHook(new Thread(streams::close));}
}

十三、Kafka最佳实践总结

13.1 主题设计最佳实践

  1. 命名规范

    • 使用小写字母
    • 使用连字符或下划线分隔单词
    • 包含业务领域信息
    • 包含数据类型信息
  2. 分区数量

    • 考虑吞吐量需求
    • 考虑消费者并行度
    • 考虑未来扩展性
    • 一般建议:分区数 = 目标吞吐量 / 单个分区吞吐量
  3. 副本因子

    • 生产环境至少为3
    • 考虑可用性需求
    • 考虑存储成本

13.2 消息设计最佳实践

  1. 消息格式

    • 使用JSON或Avro等标准格式
    • 包含必要的元数据
    • 版本控制
    • 向后兼容
  2. 消息大小

    • 控制消息大小,避免过大
    • 考虑压缩
    • 大消息考虑分片或外部存储
  3. 消息键设计

    • 使用有意义的键
    • 确保键的均匀分布
    • 考虑分区策略

13.3 生产者最佳实践

  1. 可靠性配置

    // 确保消息可靠发送
    props.put("acks", "all");
    props.put("retries", 3);
    props.put("max.in.flight.requests.per.connection", 1);
    
  2. 性能优化

    // 批量发送
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);// 压缩
    props.put("compression.type", "gzip");
    
  3. 异常处理

    producer.send(record, (metadata, exception) -> {if (exception != null) {// 记录错误log.error("Failed to send message", exception);// 重试或告警}
    });
    

13.4 消费者最佳实践

  1. 消费者配置

    // 设置合适的拉取大小
    props.put("max.poll.records", 500);// 设置合理的心跳时间
    props.put("heartbeat.interval.ms", 3000);// 设置合理的会话超时时间
    props.put("session.timeout.ms", 30000);
    
  2. 消费者组管理

    • 合理设置消费者组ID
    • 监控消费者组状态
    • 处理消费者组再平衡
  3. 偏移量管理

    // 手动提交偏移量
    try {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));processRecords(records);consumer.commitSync();
    } catch (Exception e) {// 处理异常
    }
    

13.5 运维最佳实践

  1. 监控告警

    • 监控Broker状态
    • 监控消费者延迟
    • 监控分区数量
    • 监控消息积压
  2. 容量规划

    • 评估消息量
    • 规划分区数量
    • 规划存储空间
    • 规划网络带宽
  3. 安全管理

    • 启用认证
    • 配置ACL权限
    • 加密传输
    • 定期审计
  4. 备份恢复

    • 定期备份配置
    • 测试恢复流程
    • 制定灾难恢复计划
关键字:中国航空技术北京有限公司_web前端是什么专业_seo培训_软件推广赚佣金渠道

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: