通过推送自毁信号使socket自行发出disconnected信号关闭
一下是源码 自定义的tcpsocket, 和 tcpserver,继承了QTcpSocket 和 QTcpServer
关键代码 mainwindow.cpp中监听服务器重启命令, 如修改了服务器端口需要立即重启,
发送自毁指令
// 服务器地址, 端口connect(systemSet, &SystemSet::serverInfoChanged, this, [=](const QString &ip, const quint16 port){// 注销所有socket连接qDebug() << "socketlist.size:" << server->getSocketList().size();
// int size = server->getSocketList().size();
// for (int i=0; i< size; i++) {
// CustomTcpSocket* tmpSocket = server->getSocketList()[i];
// tmpSocket->logOut();
// tmpSocket->abort();
// tmpSocket->close();
// }// server->getSocketList().clear();emit SignalRelayer::instance()->socketDestroy();
// // 重启服务server->close();server->open(ip, port);});
socket自行disconnected
CustomTcpSocket::CustomTcpSocket(QObject *parent): QTcpSocket(parent)
{......connect(SignalRelayer::instance(), &SignalRelayer::socketDestroy, this, [=]{this->disconnectFromHost();});
}
server端处理自毁任务
connect(socket, &CustomTcpSocket::disconnected, this, [=](){// 先注销socket->logOut();// 容器中移除socketList.removeOne(socket);// 判断已注册或未注册QString tipMsg;if (socket->devNumber.compare("") == 0) {tipMsg = "(未注册)";} else {// 更新为断网异常状态Myapp::updateDevUnitOnline(socket->devNumber, Myapp::ONLINE_STATUS[2]);// 发送监听, 刷新离线检测器emit SignalRelayer::instance()->flushOfflineDetectManager();}socket->deleteLater();// 发送断线信号
// emit socketDisconnected(socket->clientIp, socket->peerPort(), socket->devNumber);
// QString errMsg = QString("设备: %1 在%2断开. IP: %3, PORT: %4.")
// .arg(socket->devNumber).arg(Myapp::getCurrentTimeDesc())
// .arg(socket->clientIp).arg(socket->peerPort());// 发送断线故障
// emit SignalRelayer::instance()->deviceErro(errMsg, socket->devNumber, Myapp::SOCKET_STATUS_TYPE[0].toInt());// 刷新面板. 打印消息emit SignalRelayer::instance()->socketPublish(QString("设备%1: %2:%3 已断开").arg(tipMsg).arg(socket->peerAddress().toString()).arg(socket->peerPort()));});
server端
1.1. server.h
#ifndef CUSTOMTCPSERVER_H
#define CUSTOMTCPSERVER_H#include <QObject>
#include <QTcpServer>
#include <QFuture>
#include <QtConcurrent>
#include <QMainWindow>#include "customtcpsocket.h"
#include "threadinformationmessagebox.h"
#include "dev/devfaulteventparam.h"/*** @brief 开启服务监听设备连接. 服务启动后10秒会发送给离线设备检测器信号, 通知进行监听未连接的设备。*/
class CustomTcpServer : public QTcpServer
{Q_OBJECTstruct TipMark {bool errTipFlag;TipMark(bool _flag) {this->errTipFlag = _flag;}// 设置标识不可用, 即该类型的状态弹出框可以弹出void setUnNormal(){this->errTipFlag = true;}// 设置标识可用, 即该类型的状态弹出框不能弹出void setNormal(){this->errTipFlag = false;}
};// 异常提醒分类及标识
struct ErrTipMark : public TipMark{int type; // 0:在线情况, 1:运行时情况ErrTipMark(int _type, bool _errTipFlag): TipMark(_errTipFlag){this->type = _type;}
};public:explicit CustomTcpServer(QObject *parent = nullptr);/*** @brief 启动服务* @param ip ip地址* @param port 端口*/void open(const QString &ip, quint16 port);/*** 更改定时任务间隔时间**/void changeTimerIntervalTime(int intervalTime);/*** @brief 根据devNumber关闭socket* @param devNumber*/void closeSocket(const QString &devNumber);/** 获取连接集合 */QList<CustomTcpSocket*> getSocketList();/*** @brief 发送离线设备监听信号*/void flushOfflineDetectSignals();// ------------- public 方法 endpublic slots:signals:void reciveMsg(const QString &consoleMsg, QString footNote = QString(""));/*** @brief 设备失联* @param clientIp 设备ip地址* @param port 端口号* @param devNumber 设备id*/
// void socketDisconnected(QString clientIp, quint16 port, QString devNumber);/*** @brief 故障记录* @param basicInfo 异常事件基础信息*/void deviceFaultRecord(DevFaultEventParam::BaiscInfo basicInfo);/*** @brief realtimelog 实时日志* @param msg 发送或接收的消息* @param commDirect 1接收, 2写入*/void realtimelog(QString msg, int commDirect);/*** @brief 服务器开启* @param msg 启动成功消息*/void serverOpen(QString msg);protected:void incomingConnection(qintptr socketDescriptor) override;// 端口号quint16 port;QList<CustomTcpSocket*> socketList;private:ErrTipMark *runTipMark; // 运行状况提醒// -- private 方法 begin};#endif // CUSTOMTCPSERVER_H
1.1. server.cpp
#include "customtcpserver.h"CustomTcpServer::CustomTcpServer(QObject *parent) : QTcpServer(parent)
{}void CustomTcpServer::open(const QString &ip, quint16 inport)
{const QHostAddress &address = QHostAddress(ip);this->port = inport;qDebug() << "服务器启动成功.debug" << ",ip:" << ip << ",port:" << inport;if(this->listen(address, port)) {qDebug() << "服务器启动成功.debug" << ",ip:" << ip << ",port:" << inport;QString connectStateDesc = "服务器启动成功.";emit SignalRelayer::instance()->consoleMsg(QString("%1 ip地址:%2, 端口号: %3").arg(connectStateDesc).arg(ip).arg(port), connectStateDesc);// 两秒后发送离线检测通知QTimer::singleShot(1000, this, [=]{flushOfflineDetectSignals();});}
}/*** @brief 发送离线设备监听信号*/
void CustomTcpServer::flushOfflineDetectSignals() {
// QList<QString> devNumbers;
// qDebug() << "customserver.socketList.size: " << socketList.size();
// if (!socketList.isEmpty()) {
// // 组装数组, 发送信号
// for (CustomTcpSocket* socket : socketList) {
// if (socket->isRegister()) {
// devNumbers.append(socket->devNumber);
// }
// }
// }
// qDebug() << "customserver.devNumbers.size: " << devNumbers.size();emit SignalRelayer::instance()->flushOfflineDetectManager();}QList<CustomTcpSocket*> CustomTcpServer::getSocketList() {return this->socketList;
}void CustomTcpServer::incomingConnection(qintptr socketDescriptor)
{qDebug() << "new socket: " << socketDescriptor << ", " << Myapp::getCurrentTimeDescYMDHms();CustomTcpSocket *socket = new CustomTcpSocket(this);socket->setSocketDescriptor(socketDescriptor);socketList.append(socket);// 解析ip, 易看socket->clientIp = socket->peerAddress().toString().replace("::ffff:","").replace("%17","");emit SignalRelayer::instance()->consoleMsg(QString("设备: %1:%2 :已连接!").arg(socket->clientIp).arg(socket->peerPort()));
// connect(socket, &CustomTcpSocket::reciveMsg, this, [=](const QString &msg){
// emit reciveMsg(msg);
// });connect(socket, &CustomTcpSocket::disconnected, this, [=](){// 先注销socket->logOut();// 容器中移除socketList.removeOne(socket);// 判断已注册或未注册QString tipMsg;if (socket->devNumber.compare("") == 0) {tipMsg = "(未注册)";} else {// 更新为断网异常状态Myapp::updateDevUnitOnline(socket->devNumber, Myapp::ONLINE_STATUS[2]);// 发送监听, 刷新离线检测器emit SignalRelayer::instance()->flushOfflineDetectManager();}socket->deleteLater();// 发送断线信号
// emit socketDisconnected(socket->clientIp, socket->peerPort(), socket->devNumber);
// QString errMsg = QString("设备: %1 在%2断开. IP: %3, PORT: %4.")
// .arg(socket->devNumber).arg(Myapp::getCurrentTimeDesc())
// .arg(socket->clientIp).arg(socket->peerPort());// 发送断线故障
// emit SignalRelayer::instance()->deviceErro(errMsg, socket->devNumber, Myapp::SOCKET_STATUS_TYPE[0].toInt());// 刷新面板. 打印消息emit SignalRelayer::instance()->socketPublish(QString("设备%1: %2:%3 已断开").arg(tipMsg).arg(socket->peerAddress().toString()).arg(socket->peerPort()));});// 故障记录
// connect(socket, &CustomTcpSocket::deviceFaultRecord, this, [=](DevFaultEventParam::BaiscInfo basicInfo){
// // 若uuid为空不处理
// if (basicInfo.uuid.isEmpty()) {qDebug() << "uuid 为空";
// return;
// }qDebug() << "uuid 为: " << basicInfo.uuid;
// emit deviceFaultRecord(basicInfo);
// });// 实时日志connect(socket, &CustomTcpSocket::realtimelog, this, [=](QString msg, int commDirect){emit realtimelog(msg, commDirect);});
}void CustomTcpServer::closeSocket(const QString &devNumber)
{if(socketList.size() == 0) return;for(int i=0; i<socketList.size(); i++) {if(socketList.at(i)->devNumber == devNumber) {socketList.at(i)->close();break;}}
}
socket端
2.1. socket.h
#ifndef CUSTOMTCPSOCKET_H
#define CUSTOMTCPSOCKET_H#include <QObject>
#include <QTcpSocket>
#include <QTimer>
#include <QSqlQuery>
#include <QUuid>#include "app/myapp.h"
#include "dev/devfaulteventparam.h"
#include "app/signalrelayer.h"
#include "app/myapp.h"
#include "dev/warning/elecstreamwarncontroller.h"
#include "dev/warning/onlinewarncontroller.h"
#include "dev/devspeakrender.h"
#include "dev/warning/pulldatawarncontroller.h"
#include "registerlog.h"/*** @brief 建立与设备连接. 注册成功后会发送给离线设备检测器信号, 通知进行监听未连接的设备。*/
class CustomTcpSocket : public QTcpSocket
{Q_OBJECT
public:explicit CustomTcpSocket(QObject *parent = nullptr);~CustomTcpSocket();friend class Myapp;// --------- public 静态方法 end
// struct Base {
// int id;
// virtual void print() { } // 支持多态
// };
// struct Derived : Base {
// QString name;
// void print() override { }
// };/*** @brief 设备信息*/struct DevInfo {QString devName;QString devNumber;DevInfo(){}DevInfo(QString devName, QString devNumber): devName(devName), devNumber(devNumber) {}};QString clientIp;QString sendMsg;/** 设备号 */QString devNumber;/** 通讯单元名称 */QString devName;QString errMsg; // 故障信息QString reciveInfoMsg; // 接收到客户端数据, 固定的Hex数据// -------- public 方法 start/*** @brief 是否注册* @return true 已注册, false 未注册*/bool isRegister();/*** @brief 注销socket, 避免在断网瞬间还在接收消息, 但内容已经不正确, 读取不到数据了*/void logOut();signals:/*** @brief realtimelog* @param msg 接收或者发送的消息* @param dataType 数据类型. 接收或者发送, 已在myapp中定义, COMM_DIRECT(通讯方向类型)*/void realtimelog(QString msg, int commDirect); // 实时日志通知信号, dataType:1:接收的数据, 2:发送的数据/*** @brief 故障记录* @param basicInfo 异常事件参数基础信息*/void deviceFaultRecord(DevFaultEventParam::BaiscInfo basicInfo);private slots:/*** @brief 读取数据*/void handleReadyRead();/*** @brief 变更电流阈值* @param intervalTime 阈值*/void handleChangeIntervalTime(int intervalTime);private:// ------------- private 字段 begin/** 设备消息内容完善处理 */DevSpeakRender *speakRender;// ------------- private 方法 begin/*** 16进制字符串转10进制字符串, 如"0c"转换后为"12"*/QString hexToInt(const QByteArray &);/*** 根据位置(此位置为业务逻辑位置,如寄存器顺序的索引, 若返回结果有4个寄存器数据, 1:则像读取寄存器1的hex字符串)获取内容数据,返回值为 截取的hex字符串,*/int getIntResultByIndex(const QString &, int index);/*** @brief 处理设备响应的数据*/void handleReadReadImpl(QString &asciiString);
// bool isOnline(); // 判断设备是否在线(查询数据库实现)/*** @brief 向客户端写入, 读取数据*/void handleWrite();/*** @brief 接收到数据并存入变量后续使用*/QString recieveInfo();/*** @brief isNotBusinessInfo 是否是来自客户端的注册信息, 是:更新数据库状态正常online=1,不是则放行, 接着后续逻辑判断* @return 是否是注册信息, 含注册或者心跳*/bool isNotBusinessInfo();/*** @brief 解析注册信息*/void parseRegisterMsg(QString &asciiString, QString &data);/*** @brief 更新在线状态. 用户心跳检测后的更新和 isNotBusinessInfo判断后的更新* @param onlineStatus 在线状态, 0:未连接, 1:正常, 2:异常*/void updateOnlineStatus(QString onlineStatus);/*** @brief 查询是否存在此设备* @param tmpDevNumber 传入设备号* @return 设备信息. devNumber == 0 则此设备不存在*/CustomTcpSocket::DevInfo existDev(const QString &tmpDevNumber);// ----------- private 方法 endprivate:// -------- private 字段 startQList<QTimer> *listTime;// 判断上一次发送是否, 提供给发送时使用, 若为获得返回结果, 3次(定时任务)内不再发送消息, 超过3次继续发送, 并把这种情况作为日志记录bool recieveOver = false;// 电话号码QString telephone;/*** @brief 日志类型(0:正常,2:异常)*/int faultStatus;QString asciiString; // ascii的字符串, 用于判断是否是注册包数据/*** @brief isRegisterMsg 是否是心跳信息*/bool isRegisterMsg;bool isHeartMsg;/*** @brief 心跳回声, 定位心跳, 若频繁变化, 则证明心跳正常*/QString heartEcho;/*** @brief 心跳回声历史. 用于在线检查, 若期限内无变化则表明掉线*/QString heartEchoHistory;/*** @brief 健康检查定时任务初始化标识*/bool healthFirstInit = false;/** 电流预警控制 */ElecStreamWarnController *elecWarnController;/** 在线预警控制 */OnlineWarnController *onlineWarnController;/** 拉取数据预警控制 */PullDataWarnController *pullDataWarnController;/** @brief 定时器,拉取数据 */QTimer *pullDataTimer;// --------- private 字段 end/*** @brief 控制器初始化*/void controllerInit();/*** @brief 初始化拉取定时器*/void initPullTimer();/*** @brief 持久化消息, 记录到registerlog表中, 通过发送信号记录, 只有注册的才持久化* @param msg 消息内容* @commDirt 消息方向. 发送和接收*/void durationMsg(QString msg, int commDirt);// --------- private 方法 end};#endif // CUSTOMTCPSOCKET_H
2.2. socket.cpp
#include "customtcpsocket.h"#include <iostream>CustomTcpSocket::CustomTcpSocket(QObject *parent): QTcpSocket(parent)
{sendMsg = QString("01 03 00 00 00 06 C5 C8");// 监听获取设备数据connect(this, &QIODevice::readyRead, this, &CustomTcpSocket::handleReadyRead);// 监听采集频率变更connect(SignalRelayer::instance(), &SignalRelayer::socketIntervalTimeChanged, this, &CustomTcpSocket::handleChangeIntervalTime);connect(SignalRelayer::instance(), &SignalRelayer::socketDestroy, this, [=]{this->disconnectFromHost();});
}/*** @brief 控制器初始化*/
void CustomTcpSocket::controllerInit() {// 电流预警控制器初始化elecWarnController = new ElecStreamWarnController(this, this->devNumber, this->devName, this->speakRender);// 在线预警控制器onlineWarnController = new OnlineWarnController(this, this->devNumber, this->devName, this->speakRender);// 拉取数据预警控制器pullDataWarnController = new PullDataWarnController(this, this->devNumber, this->devName, this->speakRender);
}CustomTcpSocket::~CustomTcpSocket()
{qDebug() << "detete socket";
}void CustomTcpSocket::parseRegisterMsg(QString &asciiString, QString &data)
{// 若为注册记录设备id等信息if(asciiString.left(1).compare("*") == 0) {// 是注册消息, 更新dev_unit_online字段QString tmpDevNumber = asciiString.mid(1, 8);if (tmpDevNumber.compare(devNumber) == 0) {qDebug() << "设备已注册, 不再处理.";return;}// 判断此设备是否存在DevInfo devInfo = existDev(tmpDevNumber);if (devInfo.devNumber.compare("") == 0) {// 设备不存在return;}// 赋值通讯单元名称this->devName = devInfo.devName;devNumber = tmpDevNumber;// 初始化消息内容处理器this->speakRender = new DevSpeakRender(this, this->devName, this->peerPort(), this->clientIp);// 提取必要字符telephone = asciiString.replace(devNumber, "").replace("#", "").replace(",", "").replace("*", "");this->isRegisterMsg = true;emit SignalRelayer::instance()->consoleMsg(this->speakRender->consoleFullPrint(Myapp::COMM_DIRECT[1].toInt(), asciiString, "注册"));// 更新在线状态
// Myapp::updateDevUnitOnline(this->devNumber, Myapp::ONLINE_STATUS[1]);// 发送心跳通知. 此处注册成功emit SignalRelayer::instance()->socketPublish(this->speakRender->consoleFullPrint(Myapp::COMM_DIRECT[2].toInt(), "设备注册成功, 心跳检测开启."));// 控制器初始化controllerInit();// 初始化拉取定时任务initPullTimer();// 发送重置用于在线检测emit SignalRelayer::instance()->flushOfflineDetectManager();return;}// 非注册不处理if (!isRegister()) {return;}if (asciiString.compare("&&&&") == 0) {// 触发心跳onlineWarnController->detectedBeat();this->isHeartMsg = true;// 接收消息打印emit SignalRelayer::instance()->consoleMsg((this->speakRender->consoleFullPrint(Myapp::COMM_DIRECT[1].toInt(), data, "收到心跳包")));return;} else {this->isHeartMsg = false;this->isRegisterMsg = false;emit SignalRelayer::instance()->consoleMsg((this->speakRender->consoleFullPrint(Myapp::COMM_DIRECT[1].toInt(), data, "接收消息")));}}void CustomTcpSocket::durationMsg(QString msg, int commDirt) {if (!isRegister()) {return;}emit SignalRelayer::instance()->transferMsgDuration(RegisterLogDto(this->devNumber, this->clientIp, this->peerPort(), msg, commDirt));
}CustomTcpSocket::DevInfo CustomTcpSocket::existDev(const QString &tmpDevNumber) {QString sql = QString("SELECT u.name, d.numb "" from dev_manage_ref c"" inner join dev_manage_ref e on e.pid = c.id"" inner join dev_manage_ref w on w.pid = e.id"" inner join dev_manage_ref u on u.pid = w.id"" inner join dev_unit d on d.id = u.id where d.numb = '%1' and d.enable = 1").arg(tmpDevNumber);QSqlQuery query;if (!query.exec(sql) || !query.next() || query.value(1).isNull()) {return DevInfo();}return DevInfo(query.value(0).toString(), query.value(1).toString());
}void CustomTcpSocket::handleReadyRead()
{// 信号灯闪烁emit SignalRelayer::instance()->msgInOut(true);// 接收到消息QString msg = recieveInfo();// 解析收到的数据parseRegisterMsg(msg.split(",")[0], msg.split(",")[1]);// 判断注册的设备号是否可用if(!Myapp::devEnabled(devNumber)) {return;}// 判断是否是业务数据if(isNotBusinessInfo()) return;// 不是0103开头的数据不处理QString data = msg.split(",")[1];if (data.indexOf("0103") == -1) {qDebug() << "不是0103开头数据不做处理, data: " << data;return;}// 电路数据处理handleReadReadImpl(data);
}QString CustomTcpSocket::recieveInfo()
{QByteArray byteArray = this->readAll();QString reciveInfoMsg = byteArray.toHex();qDebug() << "socket.devNumber: " << this->devNumber << ", devName: " << this->devName << ", message: " << reciveInfoMsg;// 持久化记录消息, 记录原始记录durationMsg(reciveInfoMsg, Myapp::COMM_DIRECT[1].toInt());byteArray.clear();byteArray = QByteArray::fromHex(reciveInfoMsg.toUtf8());asciiString = QString::fromUtf8(byteArray);byteArray.clear();return QString("%1,%2").arg(asciiString).arg(reciveInfoMsg);}bool CustomTcpSocket::isNotBusinessInfo()
{// 若是注册, 返回不是业务数据if (isRegisterMsg) {return true;}// 若是心跳, 更新在线状态if (isHeartMsg) {Myapp::updateDevUnitOnline(devNumber, Myapp::ONLINE_STATUS[1]);return true;}// 若拉取异常, 不处理业务数据, 接收的数据会异常
// if (pullDataWarnController->pullDataEchoErrNotifyFlag) {
// return true;
// }return false;
}void CustomTcpSocket::handleReadReadImpl(QString &asciiString)
{// 转换十进制QString midstr = asciiString.mid(4,2);// 获取返回数据长度QByteArray arylen = QByteArray::fromHex(midstr.toLocal8Bit());// 获取三路电流数值, 得到10进制QString lenResult = hexToInt(arylen);// 判断是否异常更新设备// 获取 总功率值(在第3块(4个为一块)16进制截取4位)int ab = getIntResultByIndex(asciiString, 4);int bc = getIntResultByIndex(asciiString, 5);int ca = getIntResultByIndex(asciiString, 6);QString recordType;// 处理电流elecWarnController->handleData(ab, bc, ca);// 拉取正常进行处理pullDataWarnController->handleNormal();
}void CustomTcpSocket::initPullTimer() {pullDataTimer = new QTimer(this);connect(pullDataTimer, &QTimer::timeout, this, &CustomTcpSocket::handleWrite);pullDataTimer->start(Myapp::secToMillSec(Myapp::getValueByKey(Myapp::SERV_INTERVAL_TIME_KEY)));
}void CustomTcpSocket::handleWrite()
{// 设备ID为空则不拉取数据if (!isRegister()) {return;}// 判断前一次数据是否拉取成功.this->pullDataWarnController->echoDetect();QByteArray data = QByteArray::fromHex(sendMsg.toLocal8Bit());emit SignalRelayer::instance()->consoleMsg(this->speakRender->consoleFullPrint(Myapp::COMM_DIRECT[0].toInt(), this->sendMsg));// 持久化发送消息durationMsg(sendMsg, Myapp::COMM_DIRECT[0].toInt());// 向服务端发送this->write(data);
}
// ---------- 注册与注销bool CustomTcpSocket::isRegister() {return !devNumber.isNull() && !devNumber.isEmpty();
}/*** @brief 注销socket, 避免在断网瞬间还在接收消息, 但内容已经不正确, 读取不到数据了*/
void CustomTcpSocket::logOut() {this->devNumber = "";
}// ---------- 采集频率void CustomTcpSocket::handleChangeIntervalTime(int intervalTime)
{this->pullDataTimer->setInterval(Myapp::secToMillSec(intervalTime));emit SignalRelayer::instance()->consoleMsg(this->speakRender->consoleWrapperForDev(QString("采集频率重置为: %1 秒.").arg(intervalTime)));
}// --------- 解析设备回应的电流等数据↓int CustomTcpSocket::getIntResultByIndex(const QString &hexString, int index)
{QString wantStr = hexString.mid(3*2+((index-1)*4), 4);
// qDebug() << "index: " << index << ", wantStr: " << wantStr;QByteArray arylen = QByteArray::fromHex(wantStr.toLocal8Bit());QString high = wantStr.left(2);QString low = wantStr.right(2);return 16*hexToInt(QByteArray::fromHex(high.toLocal8Bit())).toInt()+hexToInt(QByteArray::fromHex(low.toLocal8Bit())).toInt();
}QString CustomTcpSocket::hexToInt(const QByteArray &byteAry)
{QString result;for (int i = 0; i < byteAry.size(); i++){quint8 byte = byteAry[i];result += QString::number(byte, 10);}return result;
}