Kafka client usage
- Spring Boot integration with Kafka
- Calling Kafka from plain Java
- Other issues
Spring Boot integration with Kafka
- For manual offset commits, set the Kafka listener property spring.kafka.listener.ack-mode: manual in the configuration file, then acknowledge each record in the listener:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class MyKafkaListener {

    @Autowired
    private SaslClient saslClient;

    @KafkaListener(topics = { "topic" }, groupId = "consumer-group")
    public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
        try {
            saslClient.consume(record);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Manual commit: acknowledge the record even if processing failed,
            // so the listener does not get stuck on the same offset.
            ack.acknowledge();
        }
    }
}
- Add the corresponding configuration to the yml file:
spring:
  kafka:
    listener:
      ack-mode: manual
    bootstrap-servers:   # broker address list
    consumer:
      isolation-level: read-committed
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 2
      properties:
        security:
          protocol: SASL_PLAINTEXT
        sasl:
          mechanism: SCRAM-SHA-512
          jaas:
            config: org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="password";
        session:
          timeout:
            ms: 24000
        max:
          poll:
            interval:
              ms: 30000
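The same manual ack mode can also be set programmatically on the listener container factory instead of through YAML. Below is a minimal sketch; it assumes a ConsumerFactory<String, String> bean is already defined elsewhere, and the configuration class name is illustrative, not from the original project.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class KafkaListenerConfig {

    // Equivalent to spring.kafka.listener.ack-mode: manual in application.yml
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // MANUAL: offsets are committed only when Acknowledgment.acknowledge() is called
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}
```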
Calling Kafka from plain Java
// JSONObject comes from the project's JSON library (e.g. fastjson);
// RSAUtil is the project's own RSA helper class.
@Value("${kafkaData.topic}")
private String topic;
@Value("${kafkaData.group}")
private String group;
@Value("${kafkaData.jaas}")
private String jaas;
@Value("${kafkaData.key}")
private String key;
@Value("${kafkaData.brokers}")
private String brokers;

public void consume() throws Exception {
    // Consumer configuration: SASL/SCRAM authentication, manual commits, small poll batches.
    Properties properties = new Properties();
    properties.put("security.protocol", "SASL_PLAINTEXT");
    properties.put("sasl.mechanism", "SCRAM-SHA-512");
    properties.put("bootstrap.servers", brokers);
    properties.put("group.id", group);
    properties.put("enable.auto.commit", "false");
    properties.put("auto.offset.reset", "earliest");
    properties.put("max.poll.records", 2);
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("sasl.jaas.config", jaas);

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Arrays.asList(topic));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
        System.out.printf("poll records size = %d%n", records.count());
        try {
            for (ConsumerRecord<String, String> record : records) {
                // The payload is RSA-encrypted; decrypt with the public key, then parse the JSON fields.
                String publicDecrypt = RSAUtil.publicDecrypt(record.value(), RSAUtil.getPublicKey(key));
                JSONObject jsonObject = JSONObject.parseObject(publicDecrypt);
                String msg = jsonObject.getString("msg");
                String page = jsonObject.getString("page");
                String size = jsonObject.getString("size");
                String time = jsonObject.getString("time");
                String total = jsonObject.getString("total");
                String type = jsonObject.getString("type");
                String operation = jsonObject.getString("operation");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Commit the polled offsets asynchronously so the consumer does not re-read them.
            consumer.commitAsync();
        }
    }
}
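Because consume() blocks in an endless poll loop, it is usually started on its own thread once the application is up. A minimal sketch follows; the runner class and the KafkaDataConsumer bean name are illustrative assumptions, not part of the original code.

```java
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Component
public class KafkaConsumeRunner implements CommandLineRunner {

    private final KafkaDataConsumer kafkaDataConsumer;
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    public KafkaConsumeRunner(KafkaDataConsumer kafkaDataConsumer) {
        this.kafkaDataConsumer = kafkaDataConsumer;
    }

    @Override
    public void run(String... args) {
        // Run the blocking poll loop off the main thread so application startup can finish.
        executor.submit(() -> {
            try {
                kafkaDataConsumer.consume();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
}
```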
Other issues
- The offset must be committed after each record (or batch) is consumed; otherwise the committed offset for the partition falls out of step and later records will no longer be consumed. For strict per-record control, the offset can be committed explicitly for each record, as sketched below.
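A minimal sketch of committing record by record with the plain consumer API: the committed position is always "last processed offset + 1", so a restarted consumer resumes exactly after the last record that was handled. The class and method names here are illustrative, not from the original project.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;

public class PerRecordCommit {

    public static void consume(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
            for (ConsumerRecord<String, String> record : records) {
                process(record);
                // Synchronously commit this record's partition to offset + 1,
                // so the committed position never runs ahead of the work done.
                consumer.commitSync(Collections.singletonMap(
                        new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1)));
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.printf("partition=%d offset=%d value=%s%n",
                record.partition(), record.offset(), record.value());
    }
}
```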