当前位置: 首页> 健康> 美食 > 线程池实现服务端

线程池实现服务端

时间:2025/7/17 15:29:25来源:https://blog.csdn.net/m0_52423355/article/details/142066828 浏览次数:0次

线程池实现服务端

线程池的实现

文章目录

  • 线程池实现服务端
    • 实现思路
    • 通信任务函数
    • 处理客户端请求任务函数
    • 创建线程池
    • 完整实现
    • 线程池的实现

实现思路

将服务端的任务抽象为两个任务,即接收客户端请求,和客户端通信;这两个任务就作为线程池中任务队列的元素。我们需要做的就是:

  • socket:创建监听的文件描述符(socket) fd;

  • bind:fd 和自身的 ip 和端口绑定;

  • listen:为当前文件描述符设置监听,主要是设置客户端连接数量;

  • 创建线程池;

  • 向任务队列中添加处理客户端请求的任务;

  • 当收到客户端请求并建立连接后,向任务队列中添加与客户端通信的任务;

以上为主要思路;至于线程的创建销毁,线程池都会自动完成,我们只需要关心以上几步即可;前面的思路在前面的文章中已经讲解,因此此处重点阐述后三步;

其中任务即函数,需要定义两个函数,一个用于接收请求,一个和客户端通信;重点需要考虑的即为这两个函数的参数,在下面将详细介绍。

本文为了合乎逻辑的地推算出需要哪些函数参数,因此讲解可能与执行顺序不一致;

通信任务函数

用于通信的函数需要的参数为 用于通信的文件描述符,客户端的 addr;而在添加任务时,任务函数的参数为 void*,因此将两个参数打包为一个结构体:

typedef struct {int fd;struct sockaddr_in addr;
} SockInfo;

工作函数得到这些信息后即可进行通信:

void working(void* arg) {SockInfo* sockInfo = (SockInfo*)arg;// 开始通信while (1) {char buff[1024];int ret = recv(sockInfo->fd, buff, sizeof(buff), 0);if (ret > 0) {printf("客户端说:%s\n", buff);send(sockInfo->fd, buff, strlen(buff) + 1, 0);} else if (ret == 0) {printf("客户端已断开连接\n");// 初始化,表示弃用该通信套接字sockInfo->fd = -1;break;} else {perror("read fail...\n");sockInfo->fd = -1;break;}}close(sockInfo->fd);
}

处理客户端请求任务函数

该函数需要的参数为 用于监听的文件描述符,线程池(因为在接收请求后需要向任务队列添加任务),将两个变量进行打包:

typedef struct 
{int fd;ThreadPool* pool;
} PoolInfo;

在获取以上信息后,即可监听客户端,当收到请求后,会创建连接并得到通信文件描述符以及客户端的地址;由于这两个信息是通信任务函数所需要的,因此定义一个结构体,将这些信息存放进去;然后添加通信任务到任务队列:

void acceptConn(void* arg) {PoolInfo* poolInfo = (PoolInfo*)arg;// 4. 阻塞等待客户端连接while (1) {// 创建子线程SockInfo* pInfo = (SockInfo*)malloc(sizeof(SockInfo));int len = sizeof(struct sockaddr);pInfo->fd = accept(poolInfo->fd, (struct sockaddr*)&pInfo->addr, &len);if (pInfo->fd == -1) {perror("accept fail...\n");exit(0);}printf("建立通信,文件描述符为 %d\n", pInfo->fd);// 添加通信任务threadPoolAdd(poolInfo->pool, working, pInfo);}close(poolInfo->fd);
}

创建线程池

该任务在主线程中完成,在完成 1-3 步后,我们需要将任务添加到任务队列,那么我们需要参数,在前面分析过,此处需要的参数为用于监听的文件描述符和线程池,定义该结构体,将任务添加到任务队列即可;在添加完成后,主线程退出,此后,线程池将接管所有子线程。

int main () {// 1. 创建 socket 文件描述符,使用 ipv4 流式协议 TCPint fd = socket(AF_INET, SOCK_STREAM, 0);if (fd == -1) {perror("socket fail...!\n");exit(0);}// 2. socket 和本机ip 端口绑定struct sockaddr_in in_addr;in_addr.sin_family = AF_INET;// 端口号,短整型,主机转网络in_addr.sin_port = htons(9898);// 获取本机 ip 地址in_addr.sin_addr.s_addr = INADDR_ANY;int ret = bind(fd, (struct sockaddr*)&in_addr, sizeof(in_addr));if (ret == -1) {perror("bind fail...\n");exit(0);}// 3. 监听客户端ret = listen(fd, 128);if (ret == -1) {perror("listen fail...\n");exit(0);}PoolInfo* poolInfo = (PoolInfo*)malloc(sizeof(PoolInfo));poolInfo->fd = fd;ThreadPool* pool = threadpoolCreate(3, 8, 100);poolInfo->pool = pool;threadPoolAdd(pool, acceptConn, poolInfo);pthread_exit(NULL);return 0;
}

27 行之前的代码不再赘述。

首先现在需要创建一个线程,即用于接收客户端请求的线程,任务函数为 acceptConn;该函数中需要的参数为用于通信的文件描述符 fd

PoolInfo* poolInfo = (PoolInfo*)malloc(sizeof(PoolInfo));
poolInfo->fd = fd;ThreadPool* pool = threadpoolCreate(3, 8, 100);
poolInfo->pool = pool;
threadPoolAdd(pool, acceptConn, poolInfo);pthread_exit(NULL);

完整实现

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <pthread.h>
#include "threadpool.h"typedef struct {int fd;// pthread_t tid;struct sockaddr_in addr;
} SockInfo;typedef struct 
{int fd;ThreadPool* pool;
} PoolInfo;void working(void* arg) {SockInfo* sockInfo = (SockInfo*)arg;// 开始通信while (1) {char buff[1024];int ret = recv(sockInfo->fd, buff, sizeof(buff), 0);if (ret > 0) {printf("客户端说:%s\n", buff);send(sockInfo->fd, buff, strlen(buff) + 1, 0);} else if (ret == 0) {printf("客户端已断开连接\n");// 初始化,表示弃用该通信套接字sockInfo->fd = -1;break;} else {perror("read fail...\n");sockInfo->fd = -1;break;}}close(sockInfo->fd);
}void acceptConn(void* arg) {PoolInfo* poolInfo = (PoolInfo*)arg;// 4. 阻塞等待客户端连接while (1) {// 创建子线程SockInfo* pInfo = (SockInfo*)malloc(sizeof(SockInfo));int len = sizeof(struct sockaddr);pInfo->fd = accept(poolInfo->fd, (struct sockaddr*)&pInfo->addr, &len);if (pInfo->fd == -1) {perror("accept fail...\n");exit(0);}printf("建立通信,文件描述符为 %d\n", pInfo->fd);// 添加通信任务threadPoolAdd(poolInfo->pool, working, pInfo);}close(poolInfo->fd);
}int main () {// 1. 创建 socket 文件描述符,使用 ipv4 流式协议 TCPint fd = socket(AF_INET, SOCK_STREAM, 0);if (fd == -1) {perror("socket fail...!\n");exit(0);}// 2. socket 和本机ip 端口绑定struct sockaddr_in in_addr;in_addr.sin_family = AF_INET;// 端口号,短整型,主机转网络in_addr.sin_port = htons(9898);// 获取本机 ip 地址in_addr.sin_addr.s_addr = INADDR_ANY;int ret = bind(fd, (struct sockaddr*)&in_addr, sizeof(in_addr));if (ret == -1) {perror("bind fail...\n");exit(0);}// 3. 监听客户端ret = listen(fd, 128);if (ret == -1) {perror("listen fail...\n");exit(0);}PoolInfo* poolInfo = (PoolInfo*)malloc(sizeof(PoolInfo));poolInfo->fd = fd;ThreadPool* pool = threadpoolCreate(3, 8, 100);poolInfo->pool = pool;threadPoolAdd(pool, acceptConn, poolInfo);pthread_exit(NULL);return 0;
}

线程池的实现

详见文章(该文章使用C++实现,本文所用的线程池是C语言,单思路一致)
threadpool.h

#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_
typedef struct ThreadPool ThreadPool;
// 创建线程池
ThreadPool* threadpoolCreate(int min, int max, int queueSize);// 销毁线程池
int threadPoolDestroy(ThreadPool* pool);// 添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);// 获取线程池中工作的线程数
int threadPoolBusyNum(ThreadPool* pool);// 获取线程池中活着的线程数
int threadPoolAliveNum(ThreadPool* pool);void* worker(void* arg);
void* manager(void* arg);void threadExit(ThreadPool* pool);#endif  // _THREADPOOL_H_

threadpool.c

#include "threadpool.h"
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>// 每次添加的线程数
#define NUMBER 2// 任务
typedef struct Task {void (*function)(void*);void* arg;
} Task;// 线程池
typedef struct ThreadPool {//  任务队列,数组的每个元素是一个函数及其参数Task* taskQ;int queueCapacity;  // 容量int queueSize;       // 任务个数int queueFront;     // 队头int queueRear;      // 队尾// 线程pthread_t managerID;    // 管理者线程 IDpthread_t *threadIDs;   // 工作的线程 IDint minNum;             // 线程池最小线程数int maxNum;             // 线程池最大线程数int busyNum;            // 忙的线程数int liveNum;            // 存活的线程数int exitNum;            // 需要销毁的线程数pthread_mutex_t mutexPool;  // 锁整个线程池pthread_mutex_t mutexBusy;  // 锁 busyNum 变量pthread_cond_t notFull;     // 条件变量判断队列是否满了pthread_cond_t notEmpty;    // 任务队列是否为空int shutdown;               // 判断是否需要销毁线程池,销毁为 1,反之为 0
} ThreadPool;ThreadPool *threadpoolCreate(int min, int max, int queueSize)
{// 创建线程池ThreadPool* pool = (ThreadPool*) malloc(sizeof(ThreadPool));do {if (pool == NULL) {printf("malloc threadpool fail ... \n");break;}// 创建工作线程 id 数组pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);if (pool->threadIDs == NULL) {printf("malloc threadIDs fail ...\n");break;}memset(pool->threadIDs, 0, sizeof(pthread_t) * max);pool->minNum = min;pool->maxNum = max;pool->busyNum = 0;pool->liveNum = min;   // 目前存活的线程数,初始化时将在后面进行创建pool->exitNum = 0;     // 需要退出的线程数,即管理线程计算后得出需要退出的线程数量// 初始化锁和条件变量if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||pthread_cond_init(&pool->notFull, NULL) != 0 ||pthread_cond_init(&pool->notEmpty, NULL) != 0) {printf("mutex or condition init fail ...\n");break;}// 创建任务队列pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);if (pool->taskQ == NULL) {printf("malloc taskQ fail...\n");break;}pool->queueCapacity = queueSize;pool->queueSize = 0;pool->queueFront = 0;pool->queueRear = 0;pool->shutdown = 0;// 创建管理者线程pthread_create(&pool->managerID, NULL, manager, pool);// 工作者线程for (int i = 0; i < min; i++) {pthread_create(&pool->threadIDs[i], NULL, worker, pool);}return pool;} while (0);if (pool && pool->taskQ) free(pool->taskQ);if (pool && pool->threadIDs) free(pool->threadIDs);if (pool) free(pool);return NULL;
}int threadPoolDestroy(ThreadPool *pool)
{// 线程池为空if (pool == NULL) {return -1;}// 关闭线程池pool->shutdown = 1;// 阻塞回收管理者线程pthread_join(pool->managerID, NULL);// 唤醒消费者线程for (int i = 0; i < pool->liveNum; i++) {pthread_cond_signal(&pool->notEmpty);}// 释放堆内存if (pool->taskQ) {free(pool->taskQ);}if (pool->threadIDs) {free(pool->threadIDs);}// 销毁锁和条件变量pthread_mutex_destroy(&pool->mutexPool);pthread_mutex_destroy(&pool->mutexBusy);pthread_cond_destroy(&pool->notEmpty);pthread_cond_destroy(&pool->notFull);free(pool);pool = NULL;return 0;
}// 任务添加
void threadPoolAdd(ThreadPool *pool, void (*func)(void*), void *arg)
{pthread_mutex_lock(&pool->mutexPool);// 若任务队列满了则阻塞while (pool->queueSize == pool->queueCapacity && !pool->shutdown) {pthread_cond_wait(&pool->notFull, &pool->mutexPool);}if (pool->shutdown) {pthread_mutex_unlock(&pool->mutexPool);return;}pool->taskQ[pool->queueRear].function = func;pool->taskQ[pool->queueRear].arg = arg;pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;pool->queueSize++;// 任务添加后,即可唤醒消费者pthread_cond_signal(&pool->notEmpty);pthread_mutex_unlock(&pool->mutexPool);
}int threadPoolBusyNum(ThreadPool *pool)
{pthread_mutex_lock(&pool->mutexBusy);int busyNum = pool->busyNum;pthread_mutex_unlock(&pool->mutexBusy);return busyNum;
}int threadPoolAliveNum(ThreadPool *pool)
{pthread_mutex_lock(&pool->mutexPool);int liveNum = pool->liveNum;pthread_mutex_unlock(&pool->mutexPool);return liveNum;
}void *worker(void *arg)
{ThreadPool* pool = (ThreadPool*) arg;while (1) {pthread_mutex_lock(&pool->mutexPool);// 任务队列为空则工作线程阻塞while (pool->queueSize == 0 && !pool->shutdown) {pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);// 若线程唤醒后需要被销毁,则进行销毁,注意销毁时数量必须多于最小值if (pool->exitNum > 0) {pool->exitNum--;if (pool->liveNum > pool->minNum) {pool->liveNum--;// 销毁前需要释放锁pthread_mutex_unlock(&pool->mutexPool);threadExit(pool);}}}// 若线程池关闭,则销毁当前线程if (pool->shutdown) {pthread_mutex_unlock(&pool->mutexPool);threadExit(pool);}// 以下为正常情况,即从任务队列中取出任务进行处理Task task;task.function = pool->taskQ[pool->queueFront].function;task.arg = pool->taskQ[pool->queueFront].arg;// 取出任务后,头节点后移,此处为循环队列pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;pool->queueSize--;// notFull 表示任务队列已满的条件变量,此处处理了一个任务队列// 即可唤醒生产任务的线程继续生产任务pthread_cond_signal(&pool->notFull);pthread_mutex_unlock(&pool->mutexPool);printf("thread %ld start working...\n", pthread_self());pthread_mutex_lock(&pool->mutexBusy);pool->busyNum++;pthread_mutex_unlock(&pool->mutexBusy);// 处理任务task.function(task.arg);free(task.arg);task.arg = NULL;printf("thread %ld end working...\n", pthread_self());pthread_mutex_lock(&pool->mutexBusy);pool->busyNum--;pthread_mutex_unlock(&pool->mutexBusy);}return NULL;
}void *manager(void *arg)
{ThreadPool* pool = (ThreadPool*) arg;// 若线程池不关闭,开始循环while (!pool->shutdown) {sleep(3);// 读取线程池信息,作为管理的依据pthread_mutex_lock(&pool->mutexPool);// 任务个数int queueSize = pool->queueSize;// 存活的线程数int liveNum = pool->liveNum;pthread_mutex_unlock(&pool->mutexPool);pthread_mutex_lock(&pool->mutexBusy);// 正在忙的线程数int busyNum = pool->busyNum;pthread_mutex_unlock(&pool->mutexBusy);// 现在得到了任务数,正在忙的线程,以及目前总线程数// 管理者线程将根据这三个数据对线程进行管理// 若任务个数 > 存活线程个数 && 存活线程数 < 最大线程数 则添加线程if (queueSize > (liveNum - busyNum) && liveNum < pool->maxNum) {pthread_mutex_lock(&pool->mutexPool);int count = 0;for (int i = 0; i < pool->maxNum && count < NUMBER && liveNum < pool->maxNum; i++){if (pool->threadIDs[i] == 0) {pthread_create(&pool->threadIDs[i], NULL, worker, pool);count++;pool->liveNum++;}}pthread_mutex_unlock(&pool->mutexPool);}// 忙线程 * 2 < 存活的线程数 && 存活线程数 > 最小线程数 需要销毁多余的线程if (pool->busyNum * 2 < pool->liveNum && pool->liveNum > pool->minNum) {pthread_mutex_lock(&pool->mutexPool);pool->exitNum = NUMBER;pthread_mutex_unlock(&pool->mutexPool);// 唤醒阻塞的线程,让其自动销毁for (int i = 0; i < NUMBER; i++) {pthread_cond_signal(&pool->notEmpty);}}}return NULL;
}// 销毁当前线程,并在线程池中注销
void threadExit(ThreadPool *pool)
{pthread_t tid = pthread_self();for (int i = 0; i < pool->maxNum; i++) {if (pool->threadIDs[i] == tid) {pool->threadIDs[i] = 0;printf("threadExit() called, %ld exiting...\n", tid);break;}}pthread_exit(NULL);
}
关键字:线程池实现服务端

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: