当前位置: 首页> 科技> 互联网 > 网页视频下载插件哪个好用_营销策划公司简介模板_免费推广途径_百度信息流推广是什么意思

网页视频下载插件哪个好用_营销策划公司简介模板_免费推广途径_百度信息流推广是什么意思

时间:2025/7/11 16:33:31来源:https://blog.csdn.net/mjunz/article/details/146353811 浏览次数:1次
网页视频下载插件哪个好用_营销策划公司简介模板_免费推广途径_百度信息流推广是什么意思

大纲

1.私有协议介绍

2.私有协议的通信模型

3.私有协议栈的消息定义

4.私有协议栈链路的建立

5.私有协议栈链路的关闭

6.私有协议栈的心跳机制

7.私有协议栈的重连机制

8.私有协议栈的重复登录保护

9.私有协议栈核心的ChannelHandler

10.私有协议栈的客户端和服务端

11.私有协议栈的Packet数据包与编解码

12.私有协议栈的会话ID处理器

13.私有协议栈的握手处理器

14.私有协议栈的链路保活处理器

12.私有协议栈的会话ID处理器

客户端在通道激活时会由会话生成器生成一个会话ID,并利用channelId存放在会话缓存里。服务端在读取数据包时则先尝试根据channelId去会话缓存里获取会话ID,获取不到再从Packet数据包里取出来然后进行缓存。

//负责生成以及传递会话ID处理器handler
public class SessionIdHandler extends ChannelInboundHandlerAdapter {private boolean needGenerate;public SessionIdHandler(boolean needGenerate) {this.needGenerate = needGenerate;}//客户端逻辑,通道激活时的处理@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {if (needGenerate) {String channelId = ctx.channel().id().asLongText();long sessionId = SessionIdGenerator.generate();SessionManager sessionManager = SessionManager.getInstance();sessionManager.putSessionId(channelId, sessionId);}//触发往后handler的channelActivectx.fireChannelActive();}//服务端逻辑@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {Packet packet = (Packet) msg;SessionManager sessionManager = SessionManager.getInstance();String channelId = ctx.channel().id().asLongText();Long sessionId = sessionManager.getSessionId(channelId);if (sessionId == null) {sessionId = packet.getHeader().getSessionId();sessionManager.putSessionId(channelId, sessionId);}//触发往后handler的channelReadctx.fireChannelRead(msg);}
}//连接会话管理
public class SessionManager {private SessionManager() {}    private static class Singleton {static SessionManager instance = new SessionManager();}public static SessionManager getInstance() {return Singleton.instance;}//用来记录channelId和sessionId的mapprivate Map<String, Long> sessionIds = new ConcurrentHashMap<String, Long>();//用来记录address和session的map,可判断请求是否重复private Map<String, Session> sessions = new ConcurrentHashMap<String, Session>();public void putSessionId(String channelId, Long sessionId) {sessionIds.put(channelId, sessionId);}public long getSessionId(String channelId) {return sessionIds.get(channelId);}public long getSessionId(ChannelHandlerContext ctx) {String channelId = ctx.channel().id().asLongText();return sessionIds.get(channelId);}public void putSession(String remoteAddress, Session session) {sessions.put(remoteAddress, session);}public Session getSession(String remoteAddress) {return sessions.get(remoteAddress);}
}//会话id生成组件
public class SessionIdGenerator {public static long generate() {String uuid = UUID.randomUUID().toString();return uuid.hashCode();}
}//连接会话
public class Session {private String remoteAddress;public Session(String remoteAddress) {this.remoteAddress = remoteAddress;}public String getRemoteAddress() {return remoteAddress;}public void setRemoteAddress(String remoteAddress) {this.remoteAddress = remoteAddress;}
}

13.私有协议栈的握手处理器

(1)握手处理器说明

(2)发起握手请求与激活通道

(3)握手请求的重复会话判断与响应

(4)握手请求IP是否在白名单的判断

(5)握手请求的响应以及非法连接问题

(1)握手处理器说明

当连接刚建立通道刚被激活时,客户端需要发起握手请求,服务端则需要启动一个延时线程检查握手是否超时,比如通道激活1分钟后还没有会话ID。

当读取Packet数据包时,需要判断请求是握手请求还是握手响应。客户端处理握手响应,服务端处理握手请求。

服务端处理握手请求时,还要根据白名单IP看是否为非法请求,并根据会话缓存避免重复握手。

所以握手处理器主要包括三个功能:白名单IP处理 + 握手超时处理 + 重复握手处理。

(2)发起握手请求与激活通道

首先在连接刚建立时客户端通过channelActive()方法发起握手请求。

//发起握手的处理器Handler
//握手的发起是在客户端和服务端TCP链路建立成功通道被激活时
//握手消息的接入和安全认证在服务端处理
//两个节点通信时,发起通信的一方是客户端,接收通信的一方是服务端
public class HandshakeHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LogManager.getLogger(HandshakeHandler.class);//当连接刚建立时,客户端需要发送握手请求@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {String channelId = ctx.channel().id().asLongText();long sessionId = SessionManager.getInstance().getSessionId(channelId);Packet handshakeRequestPacket = createHandshakeRequestPacket(sessionId);ctx.writeAndFlush(handshakeRequestPacket);//触发往后handler的channelActivectx.fireChannelActive();}//创建握手请求数据包private Packet createHandshakeRequestPacket(long sessionId) throws IOException {Packet packet = Packet.builder().sessionId(sessionId).type(PacketType.HandshakeRequest.value()).level(PacketLevel.DEFAULT.value()).body(new HandshakeRequest()).build();return packet;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {...}...
}//握手请求
public class HandshakeRequest implements Serializable {private String requestId = RequestIdGenerator.generate();public String getRequestId() {return requestId;}public void setRequestId(String requestId) {this.requestId = requestId;}
}//请求ID生成组件
public class RequestIdGenerator {public static String generate() {return UUID.randomUUID().toString().replace("-", "");}
}

然后服务端收到握手请求后通过channelRead()方法进行处理,客户端收到握手响应也是通过channelRead()方法进行处理。

//发起握手的处理器Handler
//握手的发起是在客户端和服务端TCP链路建立成功通道被激活时
//握手消息的接入和安全认证在服务端处理
//两个节点通信时,发起通信的一方是客户端,接收通信的一方是服务端
public class HandshakeHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LogManager.getLogger(HandshakeHandler.class);//当连接刚建立时,客户端需要发送握手请求@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {String channelId = ctx.channel().id().asLongText();long sessionId = SessionManager.getInstance().getSessionId(channelId);Packet handshakeRequestPacket = createHandshakeRequestPacket(sessionId);ctx.writeAndFlush(handshakeRequestPacket);ctx.fireChannelActive();}//创建握手请求数据包private Packet createHandshakeRequestPacket(long sessionId) throws IOException {Packet packet = Packet.builder().sessionId(sessionId).type(PacketType.HandshakeRequest.value()).level(PacketLevel.DEFAULT.value()).body(new HandshakeRequest()).build();return packet;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {Packet packet = (Packet) msg;//对不同请求的处理if (isHandshakeRequest(packet)) {handleHandshakeRequest(ctx, packet);} else if (isHandshakeResponse(packet)) {handleHandshakeResponse(ctx, packet);} else {ctx.fireChannelRead(msg);}}//是否是握手请求private boolean isHandshakeRequest(Packet packet) {return packet.getHeader().getType() == PacketType.HandshakeRequest.value();}//是否是握手响应private boolean isHandshakeResponse(Packet packet) {return packet.getHeader().getType() == PacketType.HandshakeResponse.value();}//处理握手请求private void handleHandshakeRequest(ChannelHandlerContext ctx, Packet packet) throws IOException {...}//处理握手响应private void handleHandshakeResponse(ChannelHandlerContext ctx, Packet packet) {...}
}public enum PacketType {HandshakeRequest(1),HandshakeResponse(2),KeepAlivePing(3),KeepAlivePong(4);byte value;PacketType(int value) {this.value = (byte) value;}public byte value() {return value;}
}

(3)握手请求的重复会话判断与响应

//握手响应
public class HandshakeResponse implements Serializable {public static final String SESSION_EXISTED_ERROR_MESSAGE = "Session Existed.";public static final String NOT_IN_WHITE_LIST_ERROR_MESSAGE = "IP is not in white list.";private String requestId;private boolean success = true;private String errorMessage;//构造函数私有化不给外部进行newprivate HandshakeResponse() {}public static HandshakeResponse success(String requestId) {HandshakeResponse handshakeResponse = new HandshakeResponse();handshakeResponse.setRequestId(requestId);return handshakeResponse;}public static HandshakeResponse error(String requestId, String errorMessage) {HandshakeResponse handshakeResponse = new HandshakeResponse();handshakeResponse.setRequestId(requestId);handshakeResponse.setSuccess(false);handshakeResponse.setErrorMessage(errorMessage);return handshakeResponse;}public String getRequestId() { return requestId; }public void setRequestId(String requestId) { this.requestId = requestId; }public boolean isSuccess() { return success; }public void setSuccess(boolean success) { this.success = success; }public String getErrorMessage() { return errorMessage; }public void setErrorMessage(String errorMessage) { this.errorMessage = errorMessage; }
}

握手请求的重复会话判断处理:

public class HandshakeHandler extends ChannelInboundHandlerAdapter {...@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {Packet packet = (Packet) msg;if (isHandshakeRequest(packet)) {handleHandshakeRequest(ctx, packet);} else if(isHandshakeResponse(packet)) {handleHandshakeResponse(ctx, packet);} else {ctx.fireChannelRead(msg);}}//是否是握手请求private boolean isHandshakeRequest(Packet packet) {return packet.getHeader().getType() == PacketType.HandshakeRequest.value();}//处理握手请求private void handleHandshakeRequest(ChannelHandlerContext ctx, Packet packet) throws IOException {Packet handshakeResponsePacket = null;//如果当前连接已经存在一个session了,说明握手已经进行过了,此时是重复握手if (existSession(ctx)) {handshakeResponsePacket = createHandshakeResponsePacket(ctx, packet, false, HandshakeResponse.SESSION_EXISTED_ERROR_MESSAGE);}ctx.writeAndFlush(handshakeResponsePacket);}//判断当前连接是否已经存在一个session了private boolean existSession(ChannelHandlerContext ctx) {String remoteAddress = ctx.channel().remoteAddress().toString();SessionManager sessionManager = SessionManager.getInstance();Session session = sessionManager.getSession(remoteAddress);return session != null;}//创建握手应答Packet对象private Packet createHandshakeResponsePacket(ChannelHandlerContext ctx, Packet handshakeRequestPacket, boolean success, String errorMessage) throws IOException {HandshakeRequest handshakeRequest = (HandshakeRequest) handshakeRequestPacket.getBody();HandshakeResponse handshakeResponse = success ? HandshakeResponse.success(handshakeRequest.getRequestId()) : HandshakeResponse.error(handshakeRequest.getRequestId(), errorMessage);Packet packet = Packet.builder().sessionId(handshakeRequestPacket.getHeader().getSessionId()).type(PacketType.HandshakeResponse.value()).level(PacketLevel.DEFAULT.value()).body(handshakeResponse).build();return packet;}...
}

(4)握手请求IP是否在白名单的判断

public class HandshakeHandler extends ChannelInboundHandlerAdapter {...//处理握手请求private void handleHandshakeRequest(ChannelHandlerContext ctx, Packet packet) throws IOException {Packet handshakeResponsePacket = null;//如果当前连接已经存在一个session了,说明握手已经进行过了,此时是重复握手if (existSession(ctx)) {handshakeResponsePacket = createHandshakeResponsePacket(ctx, packet, false, HandshakeResponse.SESSION_EXISTED_ERROR_MESSAGE);}//如果发送握手请求的机器IP,不在白名单列表里,则为非法请求else if(!inWhiteList(ctx)) {handshakeResponsePacket = createHandshakeResponsePacket(ctx, packet, false, HandshakeResponse.NOT_IN_WHITE_LIST_ERROR_MESSAGE);}ctx.writeAndFlush(handshakeResponsePacket);}//判断发送握手请求过来的机器IP地址,是否在白名单里private boolean inWhiteList(ChannelHandlerContext ctx) {InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();String ip = socketAddress.getAddress().getHostAddress();WhiteListManager whiteListManager = WhiteListManager.getInstance();boolean inWhiteList = whiteListManager.inWhiteList(ip);return inWhiteList;}...
}public class WhiteListManager {private WhiteListManager() {whiteList.add("125.33.200.123");}private static class Singleton {static WhiteListManager instance = new WhiteListManager();}public static WhiteListManager getInstance() {return Singleton.instance;}private List<String> whiteList = new CopyOnWriteArrayList<String>();public boolean inWhiteList(String ip) {return whiteList.contains(ip);}
}

(5)握手请求的响应以及非法连接问题

握手请求的应答处理:

public class HandshakeHandler extends ChannelInboundHandlerAdapter {...@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {Packet packet = (Packet) msg;if (isHandshakeRequest(packet)) {handleHandshakeRequest(ctx, packet);} else if(isHandshakeResponse(packet)) {handleHandshakeResponse(ctx, packet);} else {ctx.fireChannelRead(msg);}}//处理握手请求private void handleHandshakeRequest(ChannelHandlerContext ctx, Packet packet) throws IOException {Packet handshakeResponsePacket = null;//如果当前连接已经存在一个session了,说明握手已经进行过了,此时是重复握手if (existSession(ctx)) {handshakeResponsePacket = createHandshakeResponsePacket(ctx, packet, false, HandshakeResponse.SESSION_EXISTED_ERROR_MESSAGE);}//如果发送握手请求的机器IP,不在白名单列表里,则为非法请求else if(!inWhiteList(ctx)) {handshakeResponsePacket = createHandshakeResponsePacket(ctx, packet, false, HandshakeResponse.NOT_IN_WHITE_LIST_ERROR_MESSAGE);}//当前连接不存在重复session,同时握手请求ip地址在白名单里else {initSession(ctx);handshakeResponsePacket = createHandshakeResponsePacket(ctx, packet, true, null);}ctx.writeAndFlush(handshakeResponsePacket);}private void initSession(ChannelHandlerContext ctx) {String remoteAddress = ctx.channel().remoteAddress().toString();SessionManager sessionManager = SessionManager.getInstance();sessionManager.putSession(remoteAddress, new Session(remoteAddress));}//处理握手响应private void handleHandshakeResponse(ChannelHandlerContext ctx, Packet packet) {HandshakeResponse handshakeResponse = (HandshakeResponse) packet.getBody();//如果是握手成功了if (handshakeResponse.isSuccess()) {logger.info("handshake success.");}//如果是握手失败了else {logger.error(handshakeResponse.getErrorMessage());ctx.close();}}...
}

如果客户端非法跟服务端建立一个Netty物理连接后,却一直不发送握手请求,这会导致服务端的连接资源被非法占用。为了解决这个问题,需要进行握手超时检查。无握手、连接资源被非法侵占问题可以通过延时线程解决。

public class HandshakeHandler extends ChannelInboundHandlerAdapter {...private int mode;public HandshakeHandler(int mode) {this.mode = mode;}//当连接刚被建立时,需要发送握手请求@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {if (mode == NettyMode.CLIENT.value()) {String channelId = ctx.channel().id().asLongText();long sessionId = SessionManager.getInstance().getSessionId(channelId);Packet handshakeRequestPacket = createHandshakeRequestPacket(sessionId);ctx.writeAndFlush(handshakeRequestPacket);} else if(mode == NettyMode.SERVER.value()) {//检查是否在指定时间范围内把握手请求发送过来new HandshakeRequestTimeoutThread(ctx).start();}ctx.fireChannelActive();}//握手请求超时没收到检查的线程private class HandshakeRequestTimeoutThread extends Thread {private static final long HANDSHAKE_REQUEST_TIMEOUT_THRESHOLD = 1 * 60 * 1000;private ChannelHandlerContext ctx;public HandshakeRequestTimeoutThread(ChannelHandlerContext ctx) {this.ctx = ctx;}@Overridepublic void run() {try {//休眠1分钟,1分钟后再继续(1分钟后线程才能被唤醒, await则可以随时被唤醒)Thread.sleep(HANDSHAKE_REQUEST_TIMEOUT_THRESHOLD);} catch (InterruptedException e) {logger.error("HandshakeRequestTimeoutThread interrupted exception.");}if (!existSession(ctx)) {logger.error("Client did not send handshake request in 1 minute.");ctx.close();}}}...
}public enum NettyMode {CLIENT(1),SERVER(2);int value;Mode(int value) {this.value = value;}int value() {return value;}
}

14.私有协议栈的链路保活处理器

(1)链路保活处理器说明

(2)链路保活处理器框架搭建

(3)定时检测长时间未通信的连接

(4)链路保活探测数据包封装与发送

(5)链路保活探测数据包的处理与响应

(6)链路保活探测包发送失败的重试

(1)链路保活处理器说明

当连接刚建立通道刚被激活时,客户端和服务端各自启动一个链路保活的检查线程。该线程会每隔10分钟做一次保活检查,具体的检查判断如下:

一.如果当前时间距离上一次收到数据包已超1小时,则启动链路保活探测,即向对方发送Ping消息

二.发送Ping消息时会记录Ping消息次数,但只要收到Pong消息或业务消息,就要清空该记录

三.只要记录的Ping消息次数超过3次,就关闭连接

(2)链路保活处理器框架搭建

//通信链路保活Handler
public class KeepAliveHandler extends ChannelInboundHandlerAdapter {private long lastPacketTimestamp = -1;@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {new KeepAliveThread(ctx).start();}//通信链路保活检查线程private class KeepAliveThread extends Thread {private ChannelHandlerContext ctx;public KeepAliveThread(ChannelHandlerContext ctx) {this.ctx = ctx;}@Overridepublic void run() {}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {Packet packet = (Packet) msg;if (packet != null) {lastPacketTimestamp = System.currentTimeMillis();}ctx.fireChannelRead(msg);}
}//探测Ping包
public class KeepAlivePing implements Serializable {private String requestId;public String getRequestId() {return requestId;}public void setRequestId(String requestId) {this.requestId = requestId;}
}//探测Pong包
public class KeepAlivePong implements Serializable {private String requestId;public String getRequestId() {return requestId;}public void setRequestId(String requestId) {this.requestId = requestId;}
}

(3)定时检测长时间未通信的连接

//通信链路保活Handler
public class KeepAliveHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LogManager.getLogger(KeepAliveHandler.class);//每隔10分钟做一次保活检查private static final long KEEP_ALIVE_CHECK_INTERNAL = 10 * 60 * 1000;//1小时都没有通信就开启链路保活探测private static final long KEEP_ALIVE_TIMEOUT = 1 * 60 * 60 * 1000;private long lastPacketTimestamp = -1;@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {new KeepAliveThread(ctx).start();}//通信链路保活检查线程private class KeepAliveThread extends Thread {private ChannelHandlerContext ctx;private int keepAlivePingRetryTimes = 0;public KeepAliveThread(ChannelHandlerContext ctx) {this.ctx = ctx;}@Overridepublic void run() {for(;;) {//每隔10分钟做一次链路保活检查try {sleep(KEEP_ALIVE_CHECK_INTERNAL);} catch (InterruptedException e) {logger.error("Keep alive thread interrupted exception.");}//当前时间距离上一次收到数据包的时间超过了1小时,启动链路保活探测long now = System.currentTimeMillis();if (now - lastPacketTimestamp >= KEEP_ALIVE_TIMEOUT) {//TODO                    }}}private Packet createKeepAlivePingPacket() throws IOException {SessionManager sessionManager = SessionManager.getInstance();long sessionId = sessionManager.getSessionId(ctx);Packet packet = Packet.builder().sessionId(sessionId).type(PacketType.KeepAlivePing.value()).level(PacketLevel.DEFAULT.value()).body(new KeepAlivePing()).build();return packet;}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {Packet packet = (Packet) msg;if (packet != null) {lastPacketTimestamp = System.currentTimeMillis();}ctx.fireChannelRead(msg);}
}

(4)链路保活探测数据包封装与发送

启动链路保活探测,发送保活探测包:

//通信链路保活Handler
public class KeepAliveHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LogManager.getLogger(KeepAliveHandler.class);//每隔10分钟做一次保活检查private static final long KEEP_ALIVE_CHECK_INTERNAL = 10 * 60 * 1000;//1个小时都没有通信就开启链路保活探测private static final long KEEP_ALIVE_TIMEOUT = 1 * 60 * 60 * 1000;private long lastPacketTimestamp = -1;//存放对于该长连接已经发送的保活探测包请求//map是用于存放已经发送ping探测包后但还没收到pong探测包的packetprivate Map<String, Packet> keepAlivePingPackets = new ConcurrentHashMap<String, Packet>();@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {new KeepAliveThread(ctx).start();}//通信链路保活检查线程private class KeepAliveThread extends Thread {private ChannelHandlerContext ctx;private int keepAlivePingRetryTimes = 0;public KeepAliveThread(ChannelHandlerContext ctx) {this.ctx = ctx;}@Overridepublic void run() {for(;;) {//每隔10分钟做一次链路保活检查try {sleep(KEEP_ALIVE_CHECK_INTERNAL);} catch (InterruptedException e) {logger.error("Keep alive thread interrupted exception.");}//当前时间距离上一次收到数据包的时间超过了1小时,启动链路保活探测long now = System.currentTimeMillis();if (now - lastPacketTimestamp >= KEEP_ALIVE_TIMEOUT) {try {Packet keepAlivePingPacket = createKeepAlivePingPacket();ctx.writeAndFlush(keepAlivePingPacket);KeepAlivePing keepAlivePing = (KeepAlivePing) keepAlivePingPacket.getBody();keepAlivePingPackets.put(keepAlivePing.getRequestId(), keepAlivePingPacket);} catch (Exception e) {logger.error("keep alive ping packet serialization exception.");}}}}private Packet createKeepAlivePingPacket() throws IOException {SessionManager sessionManager = SessionManager.getInstance();long sessionId = sessionManager.getSessionId(ctx);Packet packet = Packet.builder().sessionId(sessionId).type(PacketType.KeepAlivePing.value()).level(PacketLevel.DEFAULT.value()).body(new KeepAlivePing()).build();return packet;}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {Packet packet = (Packet) msg;if (packet != null) {lastPacketTimestamp = System.currentTimeMillis();}ctx.fireChannelRead(msg);}
}

(5)链路保活探测数据包的处理与响应

收到保活探测包之后的处理:

//通信链路保活Handler
public class KeepAliveHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LogManager.getLogger(KeepAliveHandler.class);//每隔10分钟做一次保活检查private static final long KEEP_ALIVE_CHECK_INTERNAL = 10 * 60 * 1000;//1个小时都没有通信就开启链路保活探测private static final long KEEP_ALIVE_TIMEOUT = 1 * 60 * 60 * 1000;private long lastPacketTimestamp = -1;//存放对于该长连接已经发送的保活探测包请求//map是用于存放已经发送ping探测包后但还没收到pong探测包的packetprivate Map<String, Packet> keepAlivePingPackets = new ConcurrentHashMap<String, Packet>();@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {new KeepAliveThread(ctx).start();}...//收到保活探测包之后的处理@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {Packet packet = (Packet) msg;if (packet != null) {lastPacketTimestamp = System.currentTimeMillis();keepAlivePingPackets.clear();}if (isKeepAlivePingPacket(packet)) {//收到的是保活探测ping包就构建pong包Packet keepAlivePongPacket = createKeepAlivePongPacket(ctx, packet);ctx.writeAndFlush(keepAlivePongPacket);}  else if(isKeepAlivePongPacket(packet)) {//收到的是保活探测pong包就代表成功KeepAlivePong keepAlivePong = (KeepAlivePong) packet.getBody();keepAlivePingPackets.remove(keepAlivePong.getRequestId());logger.info("Keep alive ping pong success.");}ctx.fireChannelRead(msg);}private boolean isKeepAlivePingPacket(Packet packet) {return packet.getHeader().getType() == PacketType.KeepAlivePing.value();}private boolean isKeepAlivePongPacket(Packet packet) {return packet.getHeader().getType() == PacketType.KeepAlivePong.value();}private Packet createKeepAlivePongPacket(ChannelHandlerContext ctx, Packet keepAliveRequestPacket) throws IOException {SessionManager sessionManager = SessionManager.getInstance();long sessionId = sessionManager.getSessionId(ctx);KeepAlivePing keepAlivePing = (KeepAlivePing) keepAliveRequestPacket.getBody();Packet packet = Packet.builder().sessionId(sessionId).type(PacketType.KeepAlivePong.value()).level(PacketLevel.DEFAULT.value()).body(new KeepAlivePong(keepAlivePing.getRequestId())).build();return packet;}
}

(6)链路保活探测包发送失败的重试

如果10分钟了还没收到第一次发的Ping探测包的Pong响应,那么就进行重试,且最多重试3次。连续成功发送了3次Ping,结果一直没有Pong响应,则关闭连接。

public class KeepAliveHandler extends ChannelInboundHandlerAdapter {...private static final int KEEP_ALIVE_PING_RETRY_TIMES = 3;private long lastPacketTimestamp = -1;//存放对于该长连接已经发送的保活探测包请求//map是用于存放已经发送ping探测包后但还没收到pong探测包的packetprivate Map<String, Packet> keepAlivePingPackets = new ConcurrentHashMap<String, Packet>();@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {new KeepAliveThread(ctx).start();}//通信链路保活检查线程private class KeepAliveThread extends Thread {private ChannelHandlerContext ctx;private int keepAlivePingRetryTimes = 0;public KeepAliveThread(ChannelHandlerContext ctx) {this.ctx = ctx;}@Overridepublic void run() {for(;;) {//每隔10分钟做一次链路保活检查try {sleep(KEEP_ALIVE_CHECK_INTERNAL);} catch (InterruptedException e) {logger.error("Keep alive thread interrupted exception.");}//每隔10分钟检查一下最近一次发送的keep alive ping是否已经收到pong了if (keepAlivePingPackets.size() > 0) {//发送重试if (keepAlivePingPackets.size() < KEEP_ALIVE_PING_RETRY_TIMES) {if (!sendKeepAlivePingPacketWithRetry(ctx)) {ctx.close();}}//连续发送成功了3次ping,结果一直没有pong回来,此时也是关闭物理连接if (keepAlivePingPackets.size() >= KEEP_ALIVE_PING_RETRY_TIMES) {ctx.close();}}//当前时间距离上一次收到数据包的时间超过了1小时,启动链路保活探测long now = System.currentTimeMillis();if (now - lastPacketTimestamp >= KEEP_ALIVE_TIMEOUT) {//如果连续重试3次都发送不成功一个探测包,此时直接关闭物理连接if (!sendKeepAlivePingPacketWithRetry(ctx)) {ctx.close();}}}}//10分钟内如果还没收到第一次发的ping探测包的pong响应,那么就进行重试,且最多重试3次private boolean sendKeepAlivePingPacketWithRetry(ChannelHandlerContext ctx) {boolean result = false;int retryTimes = 0;while(retryTimes < KEEP_ALIVE_PING_RETRY_TIMES) {try {sendKeepAlivePingPacket(ctx);result = true;break;} catch (Exception e) {logger.error("send keep alive ping packet exception.");retryTimes++;}}return result;}private void sendKeepAlivePingPacket(ChannelHandlerContext ctx) throws IOException {Packet keepAlivePingPacket = createKeepAlivePingPacket();ctx.writeAndFlush(keepAlivePingPacket);KeepAlivePing keepAlivePing = (KeepAlivePing) keepAlivePingPacket.getBody();keepAlivePingPackets.put(keepAlivePing.getRequestId(), keepAlivePingPacket);}private Packet createKeepAlivePingPacket() throws IOException {SessionManager sessionManager = SessionManager.getInstance();long sessionId = sessionManager.getSessionId(ctx);Packet packet = Packet.builder().sessionId(sessionId).type(PacketType.KeepAlivePing.value()).level(PacketLevel.DEFAULT.value()).body(new KeepAlivePing()).build();return packet;}}...
}

当有数据包发送过来时,那么就可以清空记录的Ping探测包了。

public class KeepAliveHandler extends ChannelInboundHandlerAdapter {...//收到保活探测包之后的处理@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {Packet packet = (Packet) msg;if (packet != null) {//有包发送过来的时候,不管是什么样的包,ping探测包都可以清空了lastPacketTimestamp = System.currentTimeMillis();keepAlivePingPackets.clear();}if (isKeepAlivePingPacket(packet)) {//收到的是保活探测ping包就构建pong包Packet keepAlivePongPacket = createKeepAlivePongPacket(ctx, packet);ctx.writeAndFlush(keepAlivePongPacket);}  else if(isKeepAlivePongPacket(packet)) {//收到的是保活探测pong包就代表成功KeepAlivePong keepAlivePong = (KeepAlivePong) packet.getBody();keepAlivePingPackets.remove(keepAlivePong.getRequestId());logger.info("Keep alive ping pong success.");}ctx.fireChannelRead(msg);}...
}

关键字:网页视频下载插件哪个好用_营销策划公司简介模板_免费推广途径_百度信息流推广是什么意思

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: