当前位置: 首页> 房产> 建筑 > RocketMQ源码分析 - Producer

RocketMQ源码分析 - Producer

时间:2025/7/15 14:34:06来源:https://blog.csdn.net/muriyue6/article/details/141356747 浏览次数:0次

RocketMQ源码分析 - Producer

    • Producer
      • 1方法和属性
        • 1) 主要方法介绍
        • 2) 属性介绍
      • 2启动流程
      • 3消息发送
        • 1) 验证消息
        • 2) 查找路由
        • 3) 选择队列
        • 4) 发送消息
      • 4批量消息发送

Producer

消息生产者的代码都在client模块中,相对于RocketMQ来讲,消息生产者就是客户端,也是消息的提供者。
在这里插入图片描述

1方法和属性

1) 主要方法介绍

在这里插入图片描述

//创建主题
void createTopic(final String key, final String newTopic, final int queueNum) throws MQClientException;
//根据时间戳从队列中查找消息偏移量
long searchOffset(final MessageQueue mq, final long timestamp)
//查找消息队列中最大的偏移量
long maxOffset(final MessageQueue mq) throws MQClientException;
//查找消息队列中最小的偏移量
long minOffset(final MessageQueue mq) 
//根据偏移量查找消息
MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException,InterruptedException, MQClientException;
//根据条件查找消息
QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,final long end) throws MQClientException, InterruptedException;
//根据消息ID和主题查找消息
MessageExt viewMessage(String topic,String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;

在这里插入图片描述

//启动
void start() throws MQClientException;
//关闭
void shutdown();
//查找该主题下所有消息
List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;
//同步发送消息
SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,InterruptedException;
//同步超时发送消息
SendResult send(final Message msg, final long timeout) throws MQClientException,RemotingException, MQBrokerException, InterruptedException;
//异步发送消息
void send(final Message msg, final SendCallback sendCallback) throws MQClientException,RemotingException, InterruptedException;
//异步超时发送消息
void send(final Message msg, final SendCallback sendCallback, final long timeout)throws MQClientException, RemotingException, InterruptedException;
//发送单向消息
void sendOneway(final Message msg) throws MQClientException, RemotingException,InterruptedException;
//选择指定队列同步发送消息
SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,RemotingException, MQBrokerException, InterruptedException;
//选择指定队列异步发送消息
void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)throws MQClientException, RemotingException, InterruptedException;
//选择指定队列单项发送消息
void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException,RemotingException, InterruptedException;
//批量发送消息
SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,InterruptedException;
2) 属性介绍

在这里插入图片描述

producerGroup:生产者所属组
createTopicKey:默认Topic
defaultTopicQueueNums:默认主题在每一个Broker队列数量
sendMsgTimeout:发送消息默认超时时间,默认3s
compressMsgBodyOverHowmuch:消息体超过该值则启用压缩,默认4k
retryTimesWhenSendFailed:同步方式发送消息重试次数,默认为2,总共执行3次
retryTimesWhenSendAsyncFailed:异步方法发送消息重试次数,默认为2
retryAnotherBrokerWhenNotStoreOK:消息重试时选择另外一个Broker时,是否不等待存储结果就返回,默认为false
maxMessageSize:允许发送的最大消息长度,默认为4M

2启动流程

在这里插入图片描述
代码:DefaultMQProducerImpl(start方式)

//检查生产者组是否满足要求
this.checkConfig();
//更改当前instanceName为进程ID
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {this.defaultMQProducer.changeInstanceNameToPID();
}
//获得MQ客户端实例
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

整个JVM中只存在一个MQClientManager实例,维护一个MQClientInstance缓存表ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String,MQClientInstance>();
同一个clientId只会创建一个MQClientInstance。
MQClientInstance封装了RocketMQ网络处理API,是消息生产者和消息消费者与NameServer、Broker打交道的网络通道。

代码:MQClientManager(getAndCreateMQClientInstance方法)

public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {//构建客户端IDString clientId = clientConfig.buildMQClientId();//根据客户端ID或者客户端实例MQClientInstance instance = this.factoryTable.get(clientId);//实例如果为空就创建新的实例,并添加到实例表中if (null == instance) {instance =new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);if (prev != null) {instance = prev;log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);} else {log.info("Created new MQClientInstance for clientId:[{}]", clientId);}}return instance;
}

代码:DefaultMQProducerImpl(start方法)

//注册当前生产者到到MQClientInstance管理中,方便后续调用网路请求
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);
}
//启动生产者
if (startFactory) {mQClientFactory.start();
}

3消息发送

在这里插入图片描述

代码:DefaultMQProducerImpl.send(Message msg)方法

//发送消息
public SendResult send(Message msg) {return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}

代码:DefaultMQProducerImpl.send(Message msg, long timeout)方法

//发送消息,默认超时时间为3s
public SendResult send(Message msg,long timeout){return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

代码:DefalultMQProducerImpl.sendDefaultImpl方法

//校验消息
Validators.checkMessage(msg, this.defaultMQProducer);
1) 验证消息

代码:Validator.checkMessage方法

public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)throws MQClientException {//判断是否为空if (null == msg) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");}// 校验主题Validators.checkTopic(msg.getTopic());// 校验消息体if (null == msg.getBody()) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");}if (0 == msg.getBody().length) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");}if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,"the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());}
}
2) 查找路由

代码:DefaultMQProducerImpl.tryToFindTopicPublishInfo方法

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {//从缓存中获得主题的路由信息TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);//路由信息为空,则从NameServer获取路由if (null == topicPublishInfo || !topicPublishInfo.ok()) {this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);topicPublishInfo = this.topicPublishInfoTable.get(topic);}if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {return topicPublishInfo;} else {//如果未找到当前主题的路由信息,则用默认主题继续查找this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);topicPublishInfo = this.topicPublishInfoTable.get(topic);return topicPublishInfo;}
}

在这里插入图片描述

代码:TopicPublishInfo

public class TopicPublishInfo {private boolean orderTopic = false;	//是否是顺序消息private boolean haveTopicRouterInfo = false; private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();	//该主题消息队列private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();//每选择一次消息队列,该值+1private TopicRouteData topicRouteData;//关联Topic路由元信息
}

代码:MQClientInstance.updateTopicRouteInfoFromNameServer方法

TopicRouteData topicRouteData;
//使用默认主题从NameServer获取路由信息
if (isDefault && defaultMQProducer != null) {topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),1000 * 3);if (topicRouteData != null) {for (QueueData data : topicRouteData.getQueueDatas()) {int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());data.setReadQueueNums(queueNums);data.setWriteQueueNums(queueNums);}}
} else {//使用指定主题从NameServer获取路由信息topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}

代码:MQClientInstance.updateTopicRouteInfoFromNameServer方法

//判断路由是否需要更改
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}

代码:MQClientInstance.updateTopicRouteInfoFromNameServer方法

if (changed) {//将topicRouteData转换为发布队列TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);publishInfo.setHaveTopicRouterInfo(true);//遍历生产Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQProducerInner> entry = it.next();MQProducerInner impl = entry.getValue();if (impl != null) {//生产者不为空时,更新publishInfo信息impl.updateTopicPublishInfo(topic, publishInfo);}}
}

代码:MQClientInstance.topicRouteData2TopicPublishInfo方法

public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {//创建TopicPublishInfo对象TopicPublishInfo info = new TopicPublishInfo();//关联topicRouteinfo.setTopicRouteData(route);//顺序消息,更新TopicPublishInfoif (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {String[] brokers = route.getOrderTopicConf().split(";");for (String broker : brokers) {String[] item = broker.split(":");int nums = Integer.parseInt(item[1]);for (int i = 0; i < nums; i++) {MessageQueue mq = new MessageQueue(topic, item[0], i);info.getMessageQueueList().add(mq);}}info.setOrderTopic(true);} else {//非顺序消息更新TopicPublishInfoList<QueueData> qds = route.getQueueDatas();Collections.sort(qds);//遍历topic队列信息for (QueueData qd : qds) {//是否是写队列if (PermName.isWriteable(qd.getPerm())) {BrokerData brokerData = null;//遍历写队列Brokerfor (BrokerData bd : route.getBrokerDatas()) {//根据名称获得读队列对应的Brokerif (bd.getBrokerName().equals(qd.getBrokerName())) {brokerData = bd;break;}}if (null == brokerData) {continue;}if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {continue;}//封装TopicPublishInfo写队列for (int i = 0; i < qd.getWriteQueueNums(); i++) {MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);info.getMessageQueueList().add(mq);}}}info.setOrderTopic(false);}//返回TopicPublishInfo对象return info;
}
3) 选择队列
  • 默认不启用Broker故障延迟机制

代码:TopicPublishInfo.selectOneMessageQueue(lastBrokerName)方法

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {//第一次选择队列if (lastBrokerName == null) {return selectOneMessageQueue();} else {//sendWhichQueueint index = this.sendWhichQueue.getAndIncrement();//遍历消息队列集合for (int i = 0; i < this.messageQueueList.size(); i++) {//sendWhichQueue自增后取模int pos = Math.abs(index++) % this.messageQueueList.size();if (pos < 0)pos = 0;//规避上次Broker队列MessageQueue mq = this.messageQueueList.get(pos);if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}//如果以上情况都不满足,返回sendWhichQueue取模后的队列return selectOneMessageQueue();}
}

代码:TopicPublishInfo.selectOneMessageQueue()方法

//第一次选择队列
public MessageQueue selectOneMessageQueue() {//sendWhichQueue自增int index = this.sendWhichQueue.getAndIncrement();//对队列大小取模int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;//返回对应的队列return this.messageQueueList.get(pos);
}
  • 启用Broker故障延迟机制
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {//Broker故障延迟机制if (this.sendLatencyFaultEnable) {try {//对sendWhichQueue自增int index = tpInfo.getSendWhichQueue().getAndIncrement();//对消息队列轮询获取一个队列for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0)pos = 0;MessageQueue mq = tpInfo.getMessageQueueList().get(pos);//验证该队列是否可用if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {//可用if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))return mq;}}//从规避的Broker中选择一个可用的Brokerfinal String notBestBroker = latencyFaultTolerance.pickOneAtLeast();//获得Broker的写队列集合int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) {//获得一个队列,指定broker和队列ID并返回final MessageQueue mq = tpInfo.selectOneMessageQueue();if (notBestBroker != null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);}return mq;} else {latencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error("Error occurred when selecting message queue", e);}return tpInfo.selectOneMessageQueue();}return tpInfo.selectOneMessageQueue(lastBrokerName);
}

在这里插入图片描述

  • 延迟机制接口规范
public interface LatencyFaultTolerance<T> {//更新失败条目void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);//判断Broker是否可用boolean isAvailable(final T name);//移除Fault条目void remove(final T name);//尝试从规避的Broker中选择一个可用的BrokerT pickOneAtLeast();
}
  • FaultItem:失败条目
class FaultItem implements Comparable<FaultItem> {//条目唯一键,这里为brokerNameprivate final String name;//本次消息发送延迟private volatile long currentLatency;//故障规避开始时间private volatile long startTimestamp;
}
  • 消息失败策略
public class MQFaultStrategy {//根据currentLatency本地消息发送延迟,从latencyMax尾部向前找到第一个比currentLatency小的索引,如果没有找到,返回0private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};//根据这个索引从notAvailableDuration取出对应的时间,在该时长内,Broker设置为不可用private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
}

原理分析
代码:DefaultMQProducerImpl.sendDefaultImpl方法

sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

如果上述发送过程出现异常,则调用DefaultMQProducerImpl.updateFaultItem方法。

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {//参数一:broker名称//参数二:本次消息发送延迟时间//参数三:是否隔离this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
}

代码:MQFaultStrategy.updateFaultItem方法

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {if (this.sendLatencyFaultEnable) {//计算broker规避的时长long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);//更新该FaultItem规避时长this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);}
}

代码:MQFaultStrategy.computeNotAvailableDuration方法

private long computeNotAvailableDuration(final long currentLatency) {//遍历latencyMaxfor (int i = latencyMax.length - 1; i >= 0; i--) {//找到第一个比currentLatency的latencyMax值if (currentLatency >= latencyMax[i])return this.notAvailableDuration[i];}//没有找到则返回0return 0;
}

代码:LatencyFaultToleranceImpl.updateFaultItem方法

public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {//获得原FaultItemFaultItem old = this.faultItemTable.get(name);//为空新建faultItem对象,设置规避时长和开始时间if (null == old) {final FaultItem faultItem = new FaultItem(name);faultItem.setCurrentLatency(currentLatency);faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);old = this.faultItemTable.putIfAbsent(name, faultItem);if (old != null) {old.setCurrentLatency(currentLatency);old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);}} else {//更新规避时长和开始时间old.setCurrentLatency(currentLatency);old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);}
}
4) 发送消息

消息发送API核心入口DefaultMQProducerImpl.sendKernelImpl方法

private SendResult sendKernelImpl(final Message msg,	//待发送消息final MessageQueue mq,	//消息发送队列final CommunicationMode communicationMode,		//消息发送内模式final SendCallback sendCallback,	pp	//异步消息回调函数final TopicPublishInfo topicPublishInfo,	//主题路由信息final long timeout	//超时时间)

代码:DefaultMQProducerImpl.sendKernelImpl方法

//获得broker网络地址信息
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {//没有找到从NameServer更新broker网络地址信息tryToFindTopicPublishInfo(mq.getTopic());brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
//为消息分类唯一ID
if (!(msg instanceof MessageBatch)) {MessageClientIDSetter.setUniqID(msg);
}boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());topicWithNamespace = true;
}
//消息大小超过4K,启用消息压缩
int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {sysFlag |= MessageSysFlag.COMPRESSED_FLAG;msgBodyCompressed = true;
}
//如果是事务消息,设置消息标记MessageSysFlag.TRANSACTION_PREPARED_TYPE
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
//如果注册了消息发送钩子函数,在执行消息发送前的增强逻辑
if (this.hasSendMessageHook()) {context = new SendMessageContext();context.setProducer(this);context.setProducerGroup(this.defaultMQProducer.getProducerGroup());context.setCommunicationMode(communicationMode);context.setBornHost(this.defaultMQProducer.getClientIP());context.setBrokerAddr(brokerAddr);context.setMessage(msg);context.setMq(mq);context.setNamespace(this.defaultMQProducer.getNamespace());String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (isTrans != null && isTrans.equals("true")) {context.setMsgType(MessageType.Trans_Msg_Half);}if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {context.setMsgType(MessageType.Delay_Msg);}this.executeSendMessageHookBefore(context);
}

代码:SendMessageHook

public interface SendMessageHook {String hookName();void sendMessageBefore(final SendMessageContext context);void sendMessageAfter(final SendMessageContext context);
}

代码:DefaultMQProducerImpl.sendKernelImpl方法

//构建消息发送请求包
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
//生产者组
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
//主题
requestHeader.setTopic(msg.getTopic());
//默认创建主题Key
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
//该主题在单个Broker默认队列树
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
//队列ID
requestHeader.setQueueId(mq.getQueueId());
//消息系统标记
requestHeader.setSysFlag(sysFlag);
//消息发送时间
requestHeader.setBornTimestamp(System.currentTimeMillis());
//消息标记
requestHeader.setFlag(msg.getFlag());
//消息扩展信息
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
//消息重试次数
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
//是否是批量消息等
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);if (reconsumeTimes != null) {requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);}String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);if (maxReconsumeTimes != null) {requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);}
}
case ASYNC:		//异步发送Message tmpMessage = msg;boolean messageCloned = false;if (msgBodyCompressed) {//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;msg.setBody(prevBody);}if (topicWithNamespace) {if (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;}msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;
case ONEWAY:
case SYNC:		//同步发送long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:assert false;break;
}
//如果注册了钩子函数,则发送完毕后执行钩子函数
if (this.hasSendMessageHook()) {context.setSendResult(sendResult);this.executeSendMessageHookAfter(context);
}

4批量消息发送

在这里插入图片描述

批量消息发送是将同一个主题的多条消息一起打包发送到消息服务器,减少网络调用次数,提高网络传输效率。当然,并不是在同一批次中发送的消息数量越多越好,其判断依据是单条消息的长度,如果单条消息内容比较长,则打包多条消息发送会影响其他线程发送消息的响应时间,并且单批次消息总长度不能超过DefaultMQProducer.maxMessageSize。

批量消息发送要解决的问题是如何将这些消息编码以便服务端能够正确解码出每条消息的消息内容。

代码:DefalutMQProducer.send方法

public SendResult send(Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {//压缩消息集合成一条消息,然后发送出去return this.defaultMQProducerImpl.send(batch(msgs));
}

代码:DefaultMQProducer.batch方法

private MessageBatch batch(Collection<Message> msgs) throws MQClientException {MessageBatch msgBatch;try {//将集合消息封装到MessageBatchmsgBatch = MessageBatch.generateFromList(msgs);//遍历消息集合,检查消息合法性,设置消息ID,设置Topicfor (Message message : msgBatch) {Validators.checkMessage(message, this);MessageClientIDSetter.setUniqID(message);message.setTopic(withNamespace(message.getTopic()));}//压缩消息,设置消息bodymsgBatch.setBody(msgBatch.encode());} catch (Exception e) {throw new MQClientException("Failed to initiate the MessageBatch", e);}//设置msgBatch的topicmsgBatch.setTopic(withNamespace(msgBatch.getTopic()));return msgBatch;
}
关键字:RocketMQ源码分析 - Producer

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: