

Time: 2025/7/12 2:17:58  Source: https://blog.csdn.net/weixin_45569664/article/details/144425177  Views: 0

Kafka client usage

  • Spring Boot integration with Kafka
  • Calling Kafka from Java
  • Other issues

Spring Boot integration with Kafka

  • Manual commit requires setting the Kafka property kafka.listener.ack-mode: manual in the configuration file

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Component;

    @Component
    public class MyKafkaListener {

        @Autowired
        private SaslClient saslClient;

        // Listen on all partitions of the topic
        @KafkaListener(topics = { "topic" }, groupId = "consumer-group")
        // To listen on specific partitions and starting offsets instead,
        // replace the annotation above with:
        // @KafkaListener(
        //         topicPartitions = {
        //                 @TopicPartition(topic = "topic",
        //                         partitionOffsets = {
        //                                 @PartitionOffset(partition = "3", initialOffset = "323"),
        //                                 @PartitionOffset(partition = "4", initialOffset = "6629")
        //                         })
        //         }, groupId = "consumer-group")
        public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
            try {
                saslClient.consume(record);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                ack.acknowledge(); // manual commit
            }
        }
    }
    
  • Add the corresponding configuration to the yml file

    kafka:
      listener:
        ack-mode: manual # manual offset commit
      bootstrap-servers: # broker address list
      consumer:
        isolation-level: read-committed
        enable-auto-commit: false # disable auto-commit
        #auto-commit-interval: 1000
        auto-offset-reset: earliest # if a partition has no initial offset, or the offset is no longer valid, read from the earliest record
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        max-poll-records: 2
        properties:
          # security authentication
          security:
            protocol: SASL_PLAINTEXT
          sasl:
            mechanism: SCRAM-SHA-512
            jaas:
              config: org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="password";
          session:
            timeout:
              ms: 24000
          max:
            poll:
              interval:
                ms: 30000
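The nested keys under `properties:` above are flattened into the dotted property names the Kafka client actually consumes (e.g. `security.protocol`, `session.timeout.ms`). A minimal stdlib-only sketch of that flattening rule — the class and method names here are illustrative, not Spring's internals:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PropertyFlattenDemo {
    // Join nested keys with '.' to produce flat Kafka client property names.
    static void flatten(String prefix, Map<String, Object> node, Map<String, String> out) {
        for (Map.Entry<String, Object> e : node.entrySet()) {
            String key = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            Object v = e.getValue();
            if (v instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) v;
                flatten(key, child, out);
            } else {
                out.put(key, String.valueOf(v));
            }
        }
    }

    public static void main(String[] args) {
        // Mirrors part of the `properties:` subtree from the YAML above.
        Map<String, Object> root = new LinkedHashMap<>();
        root.put("security", new LinkedHashMap<>(Map.of("protocol", "SASL_PLAINTEXT")));
        root.put("session", new LinkedHashMap<>(Map.of("timeout", new LinkedHashMap<>(Map.of("ms", 24000)))));

        Map<String, String> flat = new LinkedHashMap<>();
        flatten("", root, flat);
        System.out.println(flat);
        // {security.protocol=SASL_PLAINTEXT, session.timeout.ms=24000}
    }
}
```

This is why the nested form and the flat dotted form (`security.protocol: SASL_PLAINTEXT`) are interchangeable in the YAML.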
    

Calling Kafka from Java

  import java.time.Duration;
  import java.util.Arrays;
  import java.util.Properties;

  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;

  import com.alibaba.fastjson.JSONObject;

  // Topic
  @Value("${kafkaData.topic}")
  private String topic;
  // Consumer group
  @Value("${kafkaData.group}")
  private String group;
  @Value("${kafkaData.jaas}")
  private String jaas;
  @Value("${kafkaData.key}")
  private String key;
  // Must be the secure access endpoint address
  @Value("${kafkaData.brokers}")
  private String brokers;

  public void consume() throws Exception {
      Properties properties = new Properties();
      properties.put("security.protocol", "SASL_PLAINTEXT");
      properties.put("sasl.mechanism", "SCRAM-SHA-512");
      properties.put("bootstrap.servers", brokers);
      properties.put("group.id", group);
      properties.put("enable.auto.commit", "false");
      properties.put("auto.offset.reset", "earliest");
      properties.put("max.poll.records", 2); // maximum number of records per poll
      properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      properties.put("sasl.jaas.config", jaas);

      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
      consumer.subscribe(Arrays.asList(topic));
      while (true) {
          ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
          System.out.printf("poll records size = %d%n", records.count());
          try {
              for (ConsumerRecord<String, String> record : records) {
                  // Decrypt the encrypted payload
                  String publicDecrypt = RSAUtil.publicDecrypt(record.value(), RSAUtil.getPublicKey(key));
                  JSONObject jsonObject = JSONObject.parseObject(publicDecrypt);
                  String msg = jsonObject.getString("msg");
                  String page = jsonObject.getString("page");
                  String size = jsonObject.getString("size");
                  String time = jsonObject.getString("time");
                  String total = jsonObject.getString("total");
                  String type = jsonObject.getString("type");
                  String operation = jsonObject.getString("operation");
                  // todo: business logic
              }
          } catch (Exception e) {
              e.printStackTrace();
          } finally {
              consumer.commitAsync(); // manual commit
          }
      }
  }
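The `jaas` value injected above has a fixed shape — the same string that appears under `sasl.jaas.config` in the YAML section. A small sketch of building it programmatically, with placeholder credentials:

```java
public class JaasConfigDemo {
    // Build the sasl.jaas.config value for SCRAM-SHA-512 authentication.
    static String scramJaas(String username, String password) {
        return String.format(
            "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";",
            username, password);
    }

    public static void main(String[] args) {
        // Placeholder credentials, for illustration only.
        System.out.println(scramJaas("user", "password"));
    }
}
```

Note the trailing semicolon: the JAAS config parser requires it, and omitting it is a common cause of authentication setup failures.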

Other issues

  • The offset must be committed after each record is consumed; otherwise the committed offset for the partition falls out of step with what has actually been processed, and subsequent records will not be consumed correctly.
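The underlying rule is Kafka's commit semantics: the committed offset is the offset of the *next* record to read, not the last one processed. A stdlib-only sketch of that off-by-one rule, where plain maps stand in for kafka-clients' TopicPartition and OffsetAndMetadata:

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetCommitDemo {
    // Given the last offset processed in each partition, compute the
    // offsets that should be committed back to the broker.
    static Map<Integer, Long> offsetsToCommit(Map<Integer, Long> lastProcessed) {
        Map<Integer, Long> toCommit = new HashMap<>();
        // Kafka expects the NEXT offset to read: last processed + 1.
        // Committing the record's own offset would redeliver that record
        // after a rebalance or restart.
        for (Map.Entry<Integer, Long> e : lastProcessed.entrySet()) {
            toCommit.put(e.getKey(), e.getValue() + 1);
        }
        return toCommit;
    }

    public static void main(String[] args) {
        // Partitions 3 and 4 from the @PartitionOffset example above.
        Map<Integer, Long> lastProcessed = new HashMap<>();
        lastProcessed.put(3, 323L);
        lastProcessed.put(4, 6629L);
        System.out.println(offsetsToCommit(lastProcessed).get(3)); // 324
    }
}
```

With `commitAsync()` as used above, the consumer commits whatever it last returned from `poll()`, so processing and committing must stay in lockstep, which is exactly the point of the bullet above.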
