当前位置: 首页> 汽车> 时评 > e龙岩官网_中文网站域名注册_惠州seo建站_营销网站建设流程

e龙岩官网_中文网站域名注册_惠州seo建站_营销网站建设流程

时间:2025/7/29 21:16:45来源:https://blog.csdn.net/weixin_44231698/article/details/146401566 浏览次数: 1次
e龙岩官网_中文网站域名注册_惠州seo建站_营销网站建设流程

文章目录

  • 1.如何自定义分区机制
  • 2.示例


1.如何自定义分区机制

若需要使用自定义分区机制,需要完成两件事:
1)在 producer 程序中创建一个类,实现 org.apache.kafka.clients.producer.Partitioner 接口主要分区逻辑在 Partitioner.partition中实现。
2)在用于构造KafkaProducer的Properties对象中设置 partitioner.class 参数。

2.示例

假设我们的消息中有一些消息是用于审计功能的,这类消息的 key 会被固定地分配一个字符串“audit”。我们想要让这类消息发送到 topic 的最后一个分区上,便于后续统一处理,而对于相同 topic 下的其他消息则采用随机发送的策略发送到其他分区上。那么现在就可以这样来实现自定义的分区策略,如下列代码所示:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
import java.util.Random;
public class AuditPartitioner implements Partitioner {private Random random;@Overridepublic void configure(Map<String, ?> map) {//该方法实现必要资源的初始化工作random= new Random();}@Overridepublic int partition(String topic, Object keyObj, byte[] keyBytes, Object valueObj, byte[] valueBytes, Cluster cluster) {String key=(String)keyObj;//从集群元数据中把属于该topic的所有分区信息都读取出供分区策略使用List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);int partitionCount =partitionInfoList.size();int auditPartition=partitionCount-1;return key == null|| key.isEmpty()|| !key.contains ("audit")?random.nextInt(partitionCount-1):auditPartition;}@Overridepublic void close() {//该方法实现必要资源的清理工作}
}

创建好自定义分区策略类后,在构建KafkaProducer 之前为Properties增加该属性;代码如下:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class ProducerTest {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");//必须指定props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");//必须指定props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//必须指定props.put("acks", "-1");props.put("retries", 3);props.put("batch.size", 323840);props.put("linger.ms", 10);props.put("buffer.memory", 33554432);props.put("max.block.ms", 3000);props.put("partitioner.class","com.exm.collectcodenew.kafka.producer.custompartitioner.AuditPartitioner");Producer<String, String> producer = new KafkaProducer<>(props);ProducerRecord nonKeyRecord = new ProducerRecord("topic-test","non-key record");ProducerRecord auditRecord = new ProducerRecord("topic-test", "audit","audit record");ProducerRecord nonAuditRecord =new ProducerRecord("topic-test","other","non-sudit record");producer.send(nonKeyRecord).get();producer.send(nonAuditRecord).get();producer.send(auditRecord).get();producer.send(nonKeyRecord).get();producer.send(nonAuditRecord).get();producer.close();}
}
关键字:e龙岩官网_中文网站域名注册_惠州seo建站_营销网站建设流程

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: