谈谈如何使用Netty开发实现高性能的RPC服务器

📅 2026/7/2 2:40:43
谈谈如何使用Netty开发实现高性能的RPC服务器
RPCRemote Procedure Call Protocol远程过程调用协议它是一种通过网络从远程计算机程序上请求服务而不必了解底层网络技术的协议。说的再直白一点就是客户端在不必知道调用细节的前提之下调用远程计算机上运行的某个对象使用起来就像调用本地的对象一样。目前典型的RPC实现框架有Thriftfacebook开源、Dubboalibaba开源等等。RPC框架针对网络协议、网络I/O模型的封装是透明的对于调用的客户端而言它就认为自己在调用本地的一个对象。至于传输层上运用的是TCP协议、UDP协议、亦或是HTTP协议一概不关心。从网络I/O模型上来看是基于select、poll、epoll方式、还是IOCPI/O Completion Port方式承载实现的对于调用者而言也不用关心。目前主流的RPC框架都支持跨语言调用即有所谓的IDL接口定义语言其实这个并不是RPC所必须要求的。如果你的RPC框架没有跨语言的要求IDL就可以不用包括了。最后值得一提的是衡量一个RPC框架性能的好坏与否RPC的网络I/O模型的选择至关重要。在此基础上设计出来的RPC服务器可以考虑支持阻塞式同步IO、非阻塞式同步IO、当然还有所谓的多路复用IO模型、异步IO模型。支持不同的网络IO模型在高并发的状态下处理性能上会有很大的差别。还有一个衡量的标准就是选择的传输协议。是基于TCP协议、还是HTTP协议、还是UDP协议对性能也有一定的影响。但是从我目前了解的情况来看大多数RPC开源实现框架都是基于TCP、或者HTTP的目测没有采用UDP协议做为主要的传输协议的。明白了RPC的使用原理和性能要求。现在我们能不能撇开那些RPC开源框架自己动手开发一个高性能的RPC服务器呢我想还是可以的。现在本人就使用Java基于Netty开发实现一个高性能的RPC服务器。如何实现、基于什么原理并发处理性能如何请继续接着看下文。我们有的时候为了提高单个节点的通信吞吐量提高通信性能。如果是基于Java后端的一般首选的是NIO框架No-block IO。但是问题也来了Java的NIO掌握起来要相当的技术功底和足够的技术积累使用起来才能得心应手。一般的开发人员如果要使用NIO开发一个后端的TCP/HTTP服务器附带考虑TCP粘包、网络通信异常、消息链接处理等等网络通信细节开发门槛太高所以比较明智的选择是采用业界主流的NIO框架进行服务器后端开发。主流的NIO框架主要有Netty、Mina。它们主要都是基于TCP通信非阻塞的IO、灵活的IO线程池而设计的应对高并发请求也是绰绰有余。随着Netty、Mina这样优秀的NIO框架设计上日趋完善Java后端高性能服务器开发在技术上提供了有力的支持保障从而打破了C在服务器后端一统天下的局面。因为在此之前Java的NIO一直受人诟病让人敬而远之既然这个RPC服务器是基于Netty的那就在说说Netty吧。实际上Netty是对JAVA NIO框架的再次封装它的开源网址是Netty: Home本文中使用的Netty版本是4.0版本可以通过http://dl.bintray.com/netty/downloads/netty-4.0.37.Final.tar.bz2进行下载使用。那也许你会问如何使用Netty进行RPC服务器的开发呢实际不难下面我就简单的说明一下技术原理1、定义RPC请求消息、应答消息结构里面要包括RPC的接口定义模块、包括远程调用的类名、方法名称、参数结构、参数值等信息。2、服务端初始化的时候通过容器加载RPC接口定义和RPC接口实现类对象的映射关系然后等待客户端发起调用请求。3、客户端发起的RPC消息里面包含远程调用的类名、方法名称、参数结构、参数值等信息通过网络以字节流的方式送给RPC服务端RPC服务端接收到字节流的请求之后去对应的容器里面查找客户端接口映射的具体实现对象。4、RPC服务端找到实现对象的参数信息通过反射机制创建该对象的实例并返回调用处理结果最后封装成RPC应答消息通知到客户端。5、客户端通过网络收到字节流形式的RPC应答消息进行拆包、解析之后显示远程调用结果。上面说的是很简单但是实现的时候我们还要考虑如下的问题1、RPC服务器的传输层是基于TCP协议的出现粘包咋办这样客户端的请求服务端不是会解析失败好在Netty里面已经提供了解决TCP粘包问题的解码器LengthFieldBasedFrameDecoder可以靠它轻松搞定TCP粘包问题。2、Netty服务端的线程模型是单线程、多线程一个线程负责客户端连接连接成功之后丢给后端IO的线程池处理、还是主从模式客户端连接、后端IO处理都是基于线程池的实现。当然在这里我出于性能考虑使用了Netty主从线程池模型。3、Netty的IO处理线程池如果遇到非常耗时的业务出现阻塞了咋办这样不是很容易把后端的NIO线程给挂死、阻塞本文的处理方式是对于复杂的后端业务分派到专门的业务线程池里面进行异步回调处理。4、RPC消息的传输是通过字节流在NIO的通道Channel之间传输那具体如何实现呢本文是通过基于Java原生对象序列化机制的编码、解码器ObjectEncoder、ObjectDecoder进行实现的。当然出于性能考虑这个可能不是最优的方案。更优的方案是把消息的编码、解码器搞成可以配置实现的。具体比如可以通过protobuf、JBoss Marshalling方式进行解码和编码以提高网络消息的传输效率。5、RPC服务器要考虑多线程、高并发的使用场景所以线程安全是必须的。此外尽量不要使用synchronized进行加锁改用轻量级的ReentrantLock方式进行代码块的条件加锁。比如本文中的RPC消息处理回调就有这方面的使用。6、RPC服务端的服务接口对象和服务接口实现对象要能轻易的配置轻松进行加载、卸载。在这里本文是通过Spring容器进行统一的对象管理。综上所述本文设计的RPC服务器调用的流程图如下所示客户端并发发起RPC调用请求然后RPC服务端使用Netty连接器分派出N个NIO连接线程这个时候Netty连接器的任务结束。然后NIO连接线程是统一放到Netty NIO处理线程池进行管理这个线程池里面会对具体的RPC请求连接进行消息编码、消息解码、消息处理等等一系列操作。最后进行消息处理Handler的时候处于性能考虑这里的设计是直接把复杂的消息处理过程丢给专门的RPC业务处理线程池集中处理然后Handler对应的NIO线程就立即返回、不会阻塞。这个时候RPC调用结束客户端会异步等待服务端消息的处理结果本文是通过消息回调机制实现MessageCallBack。再来说一说Netty对于RPC消息的解码、编码、处理对应的模块和流程具体如下图所示从上图可以看出客户端、服务端对RPC消息编码、解码、处理调用的模块以及调用顺序了。Netty就是把这样一个一个的处理器串在一起形成一个责任链统一进行调用。说了这么多现在先简单看下我设计实现的NettyRPC的代码目录层级结构其中newlandframework.netty.rpc.core包是NettyRPC的核心实现。newlandframework.netty.rpc.model包里面则封装了RPC消息请求、应答报文结构以及RPC服务接口与实现绑定关系的容器定义。newlandframework.netty.rpc.config里面定义了NettyRPC的服务端文件配置属性。下面先来看下newlandframework.netty.rpc.model包中定义的内容。具体是RPC消息请求、应答消息的结构定义RPC请求消息结构/** * filename:MessageRequest.java * * Newland Co. Ltd. All rights reserved. * * Description:rpc服务请求结构 * author tangjie * version 1.0 * */ package newlandframework.netty.rpc.model; import java.io.Serializable; import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; public class MessageRequest implements Serializable { private String messageId; private String className; private String methodName; private Class?[] typeParameters; private Object[] parametersVal; public String getMessageId() { return messageId; } public void setMessageId(String messageId) { this.messageId messageId; } public String getClassName() { return className; } public void setClassName(String className) { this.className className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName methodName; } public Class?[] getTypeParameters() { return typeParameters; } public void setTypeParameters(Class?[] typeParameters) { this.typeParameters typeParameters; } public Object[] getParameters() { return parametersVal; } public void setParameters(Object[] parametersVal) { this.parametersVal parametersVal; } public String toString() { return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) .append(messageId, messageId).append(className, className) .append(methodName, methodName).toString(); } }RPC应答消息结构/** * filename:MessageResponse.java * * Newland Co. Ltd. All rights reserved. * * Description:rpc服务应答结构 * author tangjie * version 1.0 * */ package newlandframework.netty.rpc.model; import java.io.Serializable; import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; public class MessageResponse implements Serializable { private String messageId; private String error; private Object resultDesc; public String getMessageId() { return messageId; } public void setMessageId(String messageId) { this.messageId messageId; } public String getError() { return error; } public void setError(String error) { this.error error; } public Object getResult() { return resultDesc; } public void setResult(Object resultDesc) { this.resultDesc resultDesc; } public String toString() { return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) .append(messageId, messageId).append(error, error).toString(); } }RPC服务接口定义、服务接口实现绑定关系容器定义提供给spring作为容器使用。/** * filename:MessageKeyVal.java * * Newland Co. Ltd. All rights reserved. * * Description:rpc服务映射容器 * author tangjie * version 1.0 * */ package newlandframework.netty.rpc.model; import java.util.Map; public class MessageKeyVal { private MapString, Object messageKeyVal; public void setMessageKeyVal(MapString, Object messageKeyVal) { this.messageKeyVal messageKeyVal; } public MapString, Object getMessageKeyVal() { return messageKeyVal; } }好了定义好核心模型结构之后现在再向大家展示一下NettyRPC核心包newlandframework.netty.rpc.core的关键部分实现代码首先是业务线程池相关类的实现代码具体如下线程工厂定义实现/** * filename:NamedThreadFactory.java * * Newland Co. Ltd. All rights reserved. * * Description:线程工厂 * author tangjie * version 1.0 * */ package newlandframework.netty.rpc.core; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; public class NamedThreadFactory implements ThreadFactory { private static final AtomicInteger threadNumber new AtomicInteger(1); private final AtomicInteger mThreadNum new AtomicInteger(1); private final String prefix; private final boolean daemoThread; private final ThreadGroup threadGroup; public NamedThreadFactory() { this(rpcserver-threadpool- threadNumber.getAndIncrement(), false); } public NamedThreadFactory(String prefix) { this(prefix, false); } public NamedThreadFactory(String prefix, boolean daemo) { this.prefix prefix -thread-; daemoThread daemo; SecurityManager s System.getSecurityManager(); threadGroup (s null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup(); } public Thread newThread(Runnable runnable) { String name prefix mThreadNum.getAndIncrement(); Thread ret new Thread(threadGroup, runnable, name, 0); ret.setDaemon(daemoThread); return ret; } public ThreadGroup getThreadGroup() { return threadGroup; } }业务线程池定义实现/** * filename:RpcThreadPool.java * * Newland Co. Ltd. All rights reserved. * * Description:rpc线程池封装 * author tangjie * version 1.0 * */ package newlandframework.netty.rpc.core; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class RpcThreadPool { //独立出线程池主要是为了应对复杂耗I/O操作的业务不阻塞netty的handler线程而引入 //当然如果业务足够简单把处理逻辑写入netty的handlerChannelInboundHandlerAdapter也未尝不可 public static Executor getExecutor(int threads, int queues) { String name RpcThreadPool; return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, queues 0 ? new SynchronousQueueRunnable() : (queues 0 ? new LinkedBlockingQueueRunnable() : new LinkedBlockingQueueRunnable(queues)), new NamedThreadFactory(name, true), new AbortPolicyWithReport(name)); } }/** * filename:AbortPolicyWithReport.java * * Newland Co. Ltd. All rights reserved. * * Description:线程池异常策略 * author tangjie * version 1.0 * */ package newlandframework.netty.rpc.core; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy { private final String threadName; public AbortPolicyWithReport(String threadName) { this.threadName threadName; } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { String msg String.format(RpcServer[ Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d), Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)], threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating()); System.out.println(msg); throw new RejectedExecutionException(msg); } }RPC调用客户端定义实现/** * filename:MessageSendExecutor.java * * Newland Co. Ltd. All rights reserved. * * Description:Rpc客户端执行模块 * author tangjie * version 1.0 * */ package newlandframework.netty.rpc.core; import java.lang.reflect.Proxy; public class MessageSendExecutor { private RpcServerLoader loader RpcServerLoader.getInstance(); public MessageSendExecutor(String serverAddress) { loader.load(serverAddress); } public void stop() { loader.unLoad(); } public static T T execute(ClassT rpcInterface) { return (T) Proxy.newProxyInstance( rpcInterface.getClassLoader(), new Class?[]{rpcInterface}, new MessageSendProxyT(rpcInterface) ); } }这里的RPC客户端实际上是动态代理了MessageSendProxy当然这里是应用了JDK原生的动态代理实现你还可以改成CGLIBCode Generation Library方式。不过本人测试了一下CGLIB方式在高并发的情况下面会出现空指针异常但是同样的情况JDK原生的动态代理却没有问题。并发程度不高的情况下面两种代理方式都运行正常。后续再深入研究看看吧废话不说了现在给出MessageSendProxy的实现方式/** * filename:MessageSendProxy.java * * Newland Co. Ltd. All rights reserved. * * Description:Rpc客户端消息处理 * author tangjie * version 1.0 * */ package newlandframework.netty.rpc.core; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.util.UUID; import newlandframework.netty.rpc.model.MessageRequest; public class MessageSendProxyT implements InvocationHandler { private ClassT cls; public MessageSendProxy(ClassT cls) { this.cls cls; } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { MessageRequest request new MessageRequest(); request.setMessageId(UUID.randomUUID().toString()); request.setClassName(method.getDeclaringClass().getName()); request.setMethodName(method.getName()); request.setTypeParameters(method.getParameterTypes()); request.setParameters(args); MessageSendHandler handler RpcServerLoader.getInstance().getMessageSendHandler(); MessageCallBack callBack handler.sendRequest(request); return callBack.start(); } }进一步发现MessageSendProxy其实是把消息发送给RpcServerLoader模块它的代码如下/** * filename:RpcServerLoader.java * * Newland Co. Ltd. All rights reserved. * * Description:rpc服务器配置加载 * author tangjie * version 1.0 * */ package newlandframework.netty.rpc.core; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import java.net.InetSocketAddress; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol; public class RpcServerLoader { private volatile static RpcServerLoader rpcServerLoader; private final static String DELIMITER :; private RpcSerializeProtocol serializeProtocol RpcSerializeProtocol.JDKSERIALIZE; //方法返回到Java虚拟机的可用的处理器数量 private final static int parallel Runtime.getRuntime().availableProcessors() * 2; //netty nio线程池 private EventLoopGroup eventLoopGroup new NioEventLoopGroup(parallel); private static ThreadPoolExecutor threadPoolExecutor (ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1); private MessageSendHandler messageSendHandler null; //等待Netty服务端链路建立通知信号 private Lock lock new ReentrantLock(); private Condition signal lock.newCondition(); private RpcServerLoader() { } //并发双重锁定 public static RpcServerLoader getInstance() { if (rpcServerLoader null) { synchronized (RpcServerLoader.class) { if (rpcServerLoader null) { rpcServerLoader new RpcServerLoader(); } } } return rpcServerLoader; } public void load(String serverAddress, RpcSerializeProtocol serializeProtocol) { String[] ipAddr serverAddress.split(RpcServerLoader.DELIMITER); if (ipAddr.length 2) { String host ipAddr[0]; int port Integer.parseInt(ipAddr[1]); final InetSocketAddress remoteAddr new InetSocketAddress(host, port); threadPoolExecutor.submit(new MessageSendInitializeTask(eventLoopGroup, remoteAddr, this, serializeProtocol)); } } public void setMessageSendHandler(MessageSendHandler messageInHandler) { try { lock.lock(); this.messageSendHandler messageInHandler; //唤醒所有等待客户端RPC线程 signal.signalAll(); } finally { lock.unlock(); } } public MessageSendHandler getMessageSendHandler() throws InterruptedException { try { lock.lock(); //Netty服务端链路没有建立完毕之前先挂起等待 if (messageSendHandler null) { signal.await(); } return messageSendHandler; } finally { lock.unlock(); } } public void unLoad() { messageSendHandler.close(); threadPoolExecutor.shutdown(); eventLoopGroup.shutdownGracefully(); } public void setSerializeProtocol(RpcSerializeProtocol serializeProtocol) { this.serializeProtocol serializeProtocol;