一、进程间通信介绍
1.1、进程间通信的目的
- 数据传输:一个进程需要将它的数据发送给另一个进程 资源共享:多个进程之间共享同样的资源。
- 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止 时要通知父进程)。
- 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另 一个进程的所有陷入和异常,并能够及时知道它的状态改变。
1.2、进程间通信的必要性
- 资源共享:粗多应用需要多个进程来完成复杂的任务,而这些进程间需要共享资源,通过进程通信来避免不一致的资源冲突;同时,通过进程间通信,进程间可以更安全,高效的共享资源。
- 数据交换:在多进程应用中,不同进程需要交换数据来完成任务。
- 任务分解和并行处理:为了提高应用程序的性能和响应速度,任务可以被分解成多个子任务,由不同的进程并行处理。进程通信机制使得这些子任务能够协调工作。
1.3、进程通信的技术背景
- 进程是具有独立性的,其主要是有虚拟地址 + 页表来保障进程的独立性(即进程内核数据结构 + 进程的代码和数据)
- 进程通信的成本较高
1.4、进程通信的本质理解
- 进程间通信的前提是让不同的进程看到同一块 “ 内存 ”
- 进程通信的本质在于为独立运行的进程提供一种机制,他们能够交换数据,协调操作,和共享资源。通过各种机制确保进程在隔离的环境中能够安全,有效地协调工作,满足系统的需求。
1.5、进程通信的方式
管道:适用于父子进程间的数据流通信。
- 管道(Pipe)
- 命名管道
POSIX IPC
- 消息队列(Message Queue):适用于传递结构化数据。
- 共享内存(Shared Memory):适用于高效数据交换,但需同步控制。
- 信号量(Semaphore):主要用于同步和互斥控制。
- 互斥量
- 条件变量
- 读写锁
System V IPC
- System V 消息队列
- System V 共享内存
- System V 信号量
二、管道
2.1、什么是管道
管道是Unix中最古老的进程通信方式,将从一个进程链接到另一个进程的数据流称为一个管道。
分别以读写方式打开同一个问题,然后使用fork()创建子进程,双方进程各自关闭自己不需要的文件描述符。
在此期间进程相关数据结构需要复制一份,但是文件相关内核结构则不需要。
2.2、对管道的理解
-
管道是用来进行具有血缘关系的进程进行进程间通信–常用与父子进程
-
管道通过让进程间协同,并提供了访问控制
a. 写快,读慢,写满就不能在写了
b. 写慢,读快,管道中无数据时,读端需要等待
c. 写关,读0,标识读到文件结尾
d. 读关,写继续写,OS终止写进程 -
管道是面向流式的通信服务,面向字节流
-
管道是基于文件的,文件的生命周期是随进程的,管道的生命周期也是随进程的
-
管道是单向通信的,只能一端发一端收,属于半双工通信的一种特殊情况。
2.3、匿名管道
2.3.1、pipe接口函数
#include <unistd.h>
int pipe(int fildes[2]);
//pipefd是一个包含两个整数的数组,用于接收文件描述符:
//pipefd[0]:读端
//pipefd[1]:写端
linux中可以使用 pipe 系统调用来创建一个管道,这个调用会创建一个未命名的管道(匿名管道),但会两个文件描述符
2.3.2、代码实例
#include <iostream>
#include <string>
#include <cstdio>
#include <cstring>
#include <assert.h>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/types.h>
using namespace std;int main()
{// 创建管道int pipefd[2] = {0};int n = pipe(pipefd);assert(n != -1);(void)n;// 此处使用void来使用一下n,防止在debug模式因未使用而告警#ifdef DEBUGcout << "pipefd[0]" << pipefd[0] << endl;cout << "pipefd[1]" << pipefd[1] << endl;
#endif// 创建子进程pid_t id = fork();assert(id != -1);if (id == 0){// 子进程 -- 读// 关闭写close(pipefd[1]);char buffer[1024 * 8];while(true){// 写入的一方,fd没有关闭,如果有数据,就读,没有数据就等// 写入的一方,fd关闭, 读取的一方,read会返回0,表示读到了文件的结尾!ssize_t s = read(pipefd[0],buffer,sizeof(buffer)-1);if(s > 0){buffer[s] = 0;cout << "child get a message[" << getpid() << "] Father# " << buffer << endl;}else if(s == 0){cout << "writer quit(father), me quit!!!" << endl;break;}}exit(0);}//父进程 - 写// 3. 构建单向通信的信道// 3.1 关闭父进程不需要的fdclose(pipefd[0]);string message = "我是父进程,我正在给你发信息";int count = 0;char send_buffer[1024 * 8];while(true){snprintf(send_buffer,sizeof send_buffer,"%s[%d] : %d",message.c_str(),getpid(),count++);write(pipefd[1],send_buffer,strlen(send_buffer));sleep(1);if(count == 5){cout << "write quit !" << endl;break;}}close(pipefd[1]);pid_t ret = waitpid(id,nullptr,0);cout << "id: " << id << "ret :" << ret <<endl;assert(ret > 0);(void) ret;return 0;
}
代码结果:
2.3.3、进程间通信–派发任务
//Task.hpp
#pragma once#include <iostream>
#include <string>
#include <vector>
#include <unordered_map>
#include <unistd.h>
#include <functional>typedef std::function<void()> func;std::vector<func> callbacks;
std::unordered_map<int, std::string> desc;void readMySQL()
{std::cout << "sub process[" << getpid() << " ] 执行访问数据库的任务\n" << std::endl;
}void execuleUrl()
{std::cout << "sub process[" << getpid() << " ] 执行url解析" << std::endl;
}void cal()
{std::cout << "sub process[" << getpid() << " ] 执行加密任务" << std::endl;
}void save()
{std::cout << "sub process[" << getpid() << " ] 执行数据持久化任务" << std::endl;
}void load()
{desc.insert({callbacks.size(), "readMySQL: 读取数据库"});callbacks.push_back(readMySQL);desc.insert({callbacks.size(), "execuleUrl: 进行url解析"});callbacks.push_back(execuleUrl);desc.insert({callbacks.size(), "cal: 进行加密计算"});callbacks.push_back(cal);desc.insert({callbacks.size(), "save: 进行数据的文件保存"});callbacks.push_back(save);
}void showHandler()
{for(const auto &iter : desc ){std::cout << iter.first << "\t" << iter.second << std::endl;}
}int handlerSize()
{return callbacks.size();
}
//ProcessPool.cc#include <iostream>
#include <vector>
#include <cstdlib>
#include <ctime>
#include <cassert>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/types.h>
#include "Task.hpp"#define PROCESS_NUM 5using namespace std;int waitCommand(int waitFd, bool &quit) //如果对方不发,我们就阻塞
{uint32_t command = 0;ssize_t s = read(waitFd, &command, sizeof(command));if (s == 0){quit = true;return -1;}assert(s == sizeof(uint32_t));return command;
}void sendAndWakeup(pid_t who, int fd, uint32_t command)
{write(fd, &command, sizeof(command));cout << "main process: call process " << who << " execute " << desc[command] << " through " << fd << endl;
}int main()
{// 代码中关于fd的处理,有一个小问题,不影响我们使用,但是你能找到吗??load();// pid: pipefdvector<pair<pid_t, int>> slots;// 先创建多个进程for (int i = 0; i < PROCESS_NUM; i++){// 创建管道int pipefd[2] = {0};int n = pipe(pipefd);assert(n == 0);(void)n;pid_t id = fork();assert(id != -1);// 子进程我们让他进行读取if (id == 0){// 关闭写端close(pipefd[1]);// childwhile (true){// pipefd[0]// 等命令bool quit = false;int command = waitCommand(pipefd[0], quit); //如果对方不发,我们就阻塞if (quit)break;// 执行对应的命令if (command >= 0 && command < handlerSize()){callbacks[command]();}else{cout << "非法command: " << command << endl;}}exit(1);}// father,进行写入,关闭读端close(pipefd[0]); // pipefd[1]slots.push_back(pair<pid_t, int>(id, pipefd[1]));}// 父进程派发任务srand((unsigned long)time(nullptr) ^ getpid() ^ 23323123123L); // 让数据源更随机while (true){// // 选择一个任务, 如果任务是从网络里面来的?// int command = rand() % handlerSize();// // 选择一个进程 ,采用随机数的方式,选择进程来完成任务,随机数方式的负载均衡// int choice = rand() % slots.size();// // 把任务给指定的进程// sendAndWakeup(slots[choice].first, slots[choice].second, command);sleep(1);int select;int command;cout << "############################################" << endl;cout << "# 1. show funcitons 2.send command #" << endl;cout << "############################################" << endl;cout << "Please Select> ";cin >> select;if (select == 1)showHandler();else if (select == 2){cout << "Enter Your Command> ";// 选择任务cin >> command;// 选择进程int choice = rand() % slots.size();// 把任务给指定的进程sendAndWakeup(slots[choice].first, slots[choice].second, command);}else{cout << "选择错误,请重新选择" << endl;}}// 关闭fd, 所有的子进程都会退出for (const auto &slot : slots){close(slot.second);}// 回收所有的子进程信息for (const auto &slot : slots){waitpid(slot.first, nullptr, 0);}
}
2.4、命名管道
2.4.1、mkfifo接口
#include <sys/types.h>
#include <sys/stat.h>
int mkfifo(const char *pathname, mode_t mode);
// pathname:要创建的命名管道的路径。
// mode:文件权限,类似于 open 系统调用中的权限参数。
mkfifo
命令和函数用于创建命名管道,实现进程间通信。- 命名管道存在于文件系统中,可以通过路径名进行访问。
- 命名管道支持不同进程间的数据传输,非常适合父子进程或无关进程的通信需求。
2.4.2、命名管道实现server和cilient通信
//log.hpp
#ifndef _LOG_H_
#define _LOG_H_#include <iostream>
#include <ctime>#define Debug 0
#define Notice 1
#define Warning 2
#define Error 3const std::string msg[] = {"Debug","Notice","Warning","Error"
};std::ostream &Log(std::string message, int level)
{std::cout << " | " << (unsigned)time(nullptr) << " | " << msg[level] << " | " << message;return std::cout;
}#endif
//comm.hpp
#ifndef _COMM_H_
#define _COMM_H_
#include <iostream>
#include <string>
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include<sys/wait.h>
#include <fcntl.h>
#include "Log.hpp"using namespace std;#define MODE 0666
#define SIZE 128string ipcPath = "./fifo.ipc";#endif
//server.cxx
#include "Log.hpp"
#include "comm.hpp"void getMassage(int fd)
{char buffer[SIZE];while (true){memset(buffer, '\0', sizeof(buffer));ssize_t s = read(fd, buffer, sizeof(buffer) - 1);if (s > 0){cout << "[" << getpid() << "]" << "client say > " << buffer << endl;}else if (s == 0){cerr << "[" << getpid() << "] " << "read end of file, clien quit, server quit too!" << endl;break;}else{perror("read");break;}}
}int main()
{if (mkfifo(ipcPath.c_str(), MODE) < 0){perror("mkfifo");exit(1);}Log("创建管道文件成功", Debug) << "step 1" << endl;int fd = open(ipcPath.c_str(), O_RDONLY);if (fd < 0){perror("open");exit(2);}Log("打开文件成功", Debug) << "step 2" << endl;int num = 3;for (int i = 0; i < num; i++){pid_t id = fork();if (id == 0){getMassage(fd);exit(1);}}for (int i = 0; i < num; i++){waitpid(-1, nullptr, 0);}close(fd);Log("关闭管道文件成功", Debug) << " step 3" << endl;unlink(ipcPath.c_str()); // 通信完毕,就删除文件Log("删除管道文件成功", Debug) << " step 4" << endl;return 0;
}
//client.cxx
#include "comm.hpp"int main()
{// 1. 获取管道文件int fd = open(ipcPath.c_str(), O_WRONLY);if(fd < 0){perror("open");exit(1);}// 2. ipc过程string buffer;while(true){cout << "Please Enter Message Line :> ";std::getline(std::cin, buffer);write(fd, buffer.c_str(), buffer.size());}// 3. 关闭close(fd);return 0;
}
三、共享内存
3.1、原理
System V 共享内存提供了一种高效的进程间通信机制,允许多个进程共享同一块内存区域。通过创建共享内存区域,将其连接到进程地址空间,并使用适当的同步机制,可以实现多个进程之间的数据共享和通信。
这些共享内存段通常存储在内核的共享内存区域中。
3.2、相关函数介绍
3.2.1、shmget
#include <sys/shm.h>
#include <sys/ipc.h>
int shmget(key_t key, size_t size, int shmflg);
//key:共享内存段的键值,用于唯一标识共享内存段。通常使用 ftok 函数生成。
//size:共享内存段的大小,以字节为单位。
//shmflg:共享内存段的标志,用于指定创建模式和权限。可以是以下值的按位或组合://IPC_CREAT:如果共享内存段不存在,则创建新的共享内存段。//IPC_EXCL:与 IPC_CREAT 一起使用,表示如果共享内存段已经存在,则返回错误。//权限标志(如 0666,表示读写权限)
- 共享内存段的键值应该在所有需要访问该共享内存的进程中是唯一的,通常使用
ftok
函数基于文件路径和一个字符生成键值。 - 创建共享内存段时需要注意权限设置,以确保所有需要访问该共享内存的进程具有足够的权限。
- 使用
shmget
创建共享内存段时,如果已经存在具有相同键值的共享内存段且没有指定IPC_EXCL
标志,则会返回该共享内存段的标识符。
通过 shmget
调用,可以创建或获取 System V 共享内存段,实现多个进程之间的数据共享和通信。
3.2.2、shmat
#include <sys/types.h>
#include <sys/shm.h>
void *shmat(int shmid, const void *shmaddr, int shmflg);
//shmid:共享内存段的标识符,由 shmget 返回。
//shmaddr:指定连接共享内存段的地址,通常为 NULL,表示由系统自动选择地址。
//shmflg:连接共享内存段的标志,通常为 0。
- 连接共享内存段时,通常使用
NULL
作为shmaddr
参数,让系统自动选择连接地址。 - 使用
shmat
连接共享内存后,返回的地址指针可以被用来访问共享内存中的数据。 - 连接共享内存后,应该始终使用
shmdt
分离共享内存段,以防止内存泄漏和资源浪费。
通过 shmat
调用,可以将共享内存段连接到进程的地址空间,实现多个进程之间的数据共享和通信。
3.2.3、shmdt
#include <sys/ipc.h>
#include <sys/shm.h>int shmdt(const void *shmaddr);
//shmaddr:指向共享内存段连接地址的指针。
- 使用
shmdt
分离共享内存段后,进程将无法再访问共享内存中的数据。 - 分离共享内存段不会删除共享内存,只是解除了进程对共享内存的连接,其他进程仍然可以继续访问共享内存段。
- 在进程结束时,应该确保分离共享内存段,以防止资源泄漏和系统资源浪费。
通过 shmdt
调用,可以将共享内存段从进程的地址空间中分离,实现对共享内存的安全使用和释放。
3.2.4、shmctl
#include <sys/ipc.h>
#include <sys/shm.h>int shmctl(int shmid, int cmd, struct shmid_ds *buf);
//shmid:共享内存段的标识符,由 shmget 返回。
//cmd:控制命令,用于指定要执行的操作。可以是以下命令之一://IPC_STAT:获取共享内存段的信息,并将其存储在 buf 中。//IPC_SET:设置共享内存段的信息,从 buf 中读取要设置的值。//IPC_RMID:删除共享内存段。
//buf:指向 shmid_ds 结构的指针,用于存储共享内存段的信息。
- 使用
IPC_STAT
命令可以获取共享内存段的详细信息,包括权限、大小等。 - 使用
IPC_SET
命令可以修改共享内存段的权限和其他属性。 - 使用
IPC_RMID
命令可以删除共享内存段,但要注意删除后其他进程将无法再访问该共享内存段,所以应该谨慎使用。
通过 shmctl
调用,可以对 System V 共享内存段进行各种控制操作,包括获取信息、修改属性和删除共享内存段等。
3.3、对共享内存的理解
为了让进程间通信,即需要让不同的进程看到同一份资源。
让不同进程看到同一份资源,会带来一些时序问题吗,造成数据不一致问题,
- 我们把多个进程(执行流)看到的公共的资源称为临界资源
- 把自己的进程,访问自己资源的代码称为临界区
- 为了更好的进行临界区的保护,可以让多执行流在任何时刻都只有一个进程进入临界区----这就是互斥
- 原子性:要么不做,要么做完,没有中间状态,这就称为原子性
多个执行流,互相运行的时候互不干扰,主要是不加保护的访问了同样的资源,在非临界区多个执行流互相不影响。
3.4、shm实现client和server通信
//comm.hpp
#pragma once#include <iostream>
#include <cstdio>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <cassert>
#include <cstring>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "Log.hpp"using namespace std; //不推荐#define PATH_NAME "/home/whb"
#define PROJ_ID 0x66
#define SHM_SIZE 4096 //共享内存的大小,最好是页(PAGE: 4096)的整数倍#define FIFO_NAME "./fifo"class Init
{
public:Init(){umask(0);int n = mkfifo(FIFO_NAME, 0666);assert(n == 0);(void)n;Log("create fifo success",Notice) << "\n";}~Init(){unlink(FIFO_NAME);Log("remove fifo success",Notice) << "\n";}
};#define READ O_RDONLY
#define WRITE O_WRONLYint OpenFIFO(std::string pathname, int flags)
{int fd = open(pathname.c_str(), flags);assert(fd >= 0);return fd;
}void Wait(int fd)
{Log("等待中....", Notice) << "\n";uint32_t temp = 0;ssize_t s = read(fd, &temp, sizeof(uint32_t));assert(s == sizeof(uint32_t));(void)s;
}void Signal(int fd)
{uint32_t temp = 1;ssize_t s = write(fd, &temp, sizeof(uint32_t));assert(s == sizeof(uint32_t));(void)s;Log("唤醒中....", Notice) << "\n";
}void CloseFifo(int fd)
{close(fd);
}
//log.hpp
#ifndef _LOG_H_
#define _LOG_H_#include <iostream>
#include <ctime>#define Debug 0
#define Notice 1
#define Warning 2
#define Error 3const std::string msg[] = {"Debug","Notice","Warning","Error"
};std::ostream &Log(std::string message, int level)
{std::cout << " | " << (unsigned)time(nullptr) << " | " << msg[level] << " | " << message;return std::cout;
}#endif
//shmClient.cc
#include "comm.hpp"int main()
{Log("child pid is : ", Debug) << getpid() << endl;key_t k = ftok(PATH_NAME, PROJ_ID);if (k < 0){Log("create key failed", Error) << " client key : " << k << endl;exit(1);}Log("create key done", Debug) << " client key : " << k << endl;// 获取共享内存int shmid = shmget(k, SHM_SIZE, 0);if(shmid < 0){Log("create shm failed", Error) << " client key : " << k << endl;exit(2);}Log("create shm success", Error) << " client key : " << k << endl;// sleep(10);char *shmaddr = (char *)shmat(shmid, nullptr, 0);if(shmaddr == nullptr){Log("attach shm failed", Error) << " client key : " << k << endl;exit(3);}Log("attach shm success", Error) << " client key : " << k << endl;// sleep(10);int fd = OpenFIFO(FIFO_NAME, WRITE);// 使用// client将共享内存看做一个char 类型的bufferwhile(true){ssize_t s = read(0, shmaddr, SHM_SIZE-1);if(s > 0){shmaddr[s-1] = 0;Signal(fd);if(strcmp(shmaddr,"quit") == 0) break;}}CloseFifo(fd);// char a = 'a';// for(; a <= 'z'; a++)// {// shmaddr[a-'a'] = a;// // 我们是每一次都向shmaddr[共享内存的起始地址]写入// // snprintf(shmaddr, SHM_SIZE - 1,\// // "hello server, 我是其他进程,我的pid: %d, inc: %c\n",\// // getpid(), a);// sleep(5);// }// strcpy(shmaddr, "quit");// 去关联int n = shmdt(shmaddr);assert(n != -1);Log("detach shm success", Error) << " client key : " << k << endl;// sleep(10);// client 不要chmctl删除return 0;
}
//shmServer.cc
#include "comm.hpp"// 是不是对应的程序,在加载的时候,会自动构建全局变量,就要调用该类的构造函数 -- 创建管道文件
// 程序退出的时候,全局变量会被析构,自动调用析构函数,会自动删除管道文件
Init init; string TransToHex(key_t k)
{char buffer[32];snprintf(buffer, sizeof buffer, "0x%x", k);return buffer;
}int main()
{// 我们之前为了通信,所做的所有的工作,属于什么工作呢:让不同的进程看到了同一份资源(内存)// 1. 创建公共的Key值key_t k = ftok(PATH_NAME, PROJ_ID);assert(k != -1);Log("create key done", Debug) << " server key : " << TransToHex(k) << endl;// 2. 创建共享内存 -- 建议要创建一个全新的共享内存 -- 通信的发起者int shmid = shmget(k, SHM_SIZE, IPC_CREAT | IPC_EXCL | 0666); //if (shmid == -1){perror("shmget");exit(1);}Log("create shm done", Debug) << " shmid : " << shmid << endl;// sleep(10);// 3. 将指定的共享内存,挂接到自己的地址空间char *shmaddr = (char *)shmat(shmid, nullptr, 0);Log("attach shm done", Debug) << " shmid : " << shmid << endl;// sleep(10);// 这里就是通信的逻辑了// 将共享内存当成一个大字符串// char buffer[SHM_SIZE];// 结论1: 只要是通信双方使用shm,一方直接向共享内存中写入数据,另一方,就可以立马看到对方写入的数据。// 共享内存是所有进程间通信(IPC),速度最快的!不需要过多的拷贝!!(不需要将数据给操作系统)// 结论2: 共享内存缺乏访问控制!会带来并发问题 【如果想,也可以进行一定程度的访问控制】int fd = OpenFIFO(FIFO_NAME, READ);for(;;){Wait(fd);// 临界区printf("%s\n", shmaddr);if(strcmp(shmaddr, "quit") == 0) break;// sleep(1);}// 4. 将指定的共享内存,从自己的地址空间中去关联int n = shmdt(shmaddr);assert(n != -1);(void)n;Log("detach shm done", Debug) << " shmid : " << shmid << endl;// sleep(10);// 5. 删除共享内存,IPC_RMID即便是有进程和当下的shm挂接,依旧删除共享内存n = shmctl(shmid, IPC_RMID, nullptr);assert(n != -1);(void)n;Log("delete shm done", Debug) << " shmid : " << shmid << endl;CloseFifo(fd);return 0;
}
四、system V消息队列
- 消息队列提供了一个从一个进程向另外一个进程发送一块数据的方法
- 每个数据块都被认为是有一个类型,接收者进程接收的数据块可以有不同的类型值
特性方面
IPC资源必须删除,否则不会自动清除,除非重启,所以system V IPC资源的生命周期随内核
五、system V信号量
- 由于各进程要求共享资源,而且有些资源需要互斥使用,因此各进程间竞争使用这些资源,进程的这种 关系为进程的互斥
- 系统中某些资源一次只允许一个进程使用,称这样的资源为临界资源或互斥资源。
- 在进程中涉及到互斥资源的程序段叫临界区
特性方面
IPC资源必须删除,否则不会自动清除,除非重启,所以system V IPC资源的生命周期随内核