实现匿名管道多进程任务派发
mypipe
mypipe.cc
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#include <algorithm>
#include <cassert>
#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <iostream>
#include <limits>
#include <string>
#include <utility>
#include <vector>

#include "Task.hpp"
#include "Log.hpp"
using namespace std;#define PROCESS_NUM 10int waitCommand(int fd, bool &quit)
{u_int32_t command = 0;ssize_t ret = read(fd, &command, sizeof command);Log("子进程读取任务", READ) << endl;if (0 == ret){quit = true;return -1;}assert(ret == sizeof command);return command;
}int main()
{// 装载任务load();// 父进程保留每个子进程的pid与对应管道文件写端描述符vector<pair<pid_t, int>> slots;for (size_t i = 0; i < PROCESS_NUM; i++){int pipefd[2] = {0};int n = pipe(pipefd);Log("创建管道文件", CREATE) << endl;Log("打开管道文件", OPEN) << endl;assert(n != -1);// void(n);pid_t id = fork();assert(id != -1);if (0 == id){// 子进程// 接收任务// 保留读,关闭1close(pipefd[1]);while (true){bool quit = false;// 等待命令// 内部read阻塞等待int command = waitCommand(pipefd[0], quit);// 执行命令if (quit){break;}if (command > 0 && command < TASK_NUM){// 执行任务callbacks[command]();}else{cout << "非法命令: " << command + 1 << endl;}}exit(0);Log("关闭管道文件", CLOSE) << endl;}// 父进程// 保留写,关闭0// 保留每个管道对应子进程pid和写端close(pipefd[0]);slots.push_back(make_pair(id, pipefd[1]));}// 父进程// 分配任务srand((unsigned)time(nullptr));while (true){printf("######################################\n");printf("# 1.show task 2.send task #\n");printf("# 3.exit 4..... #\n");printf("######################################\n");int command = -1;cin >> command;if (1 == command){// auto it = desc.rbegin();// reverse(desc.cbegin(), desc.cend());for (auto e : desc){cout << e.second << endl;}}else if (2 == command){printf("Please chose task: ");int task = -1;cin >> task;// 派发任务给谁?int process = rand() % PROCESS_NUM;task--;write(slots[process].second, &task, sizeof task);Log("父进程发送任务", WRITE) << endl;}else if (3 == command){break;}else{cout << "非法选项: " << command << endl;}sleep(2);}// 关闭fdfor (auto e : slots){close(e.second);}for (auto e : slots){waitpid(e.first, nullptr, 0);}return 0;
}
Task
Task.hpp
#pragma once
#ifndef _TASK_H_
#define _TASK_H_
#include <unordered_map>
#include <iostream>
#include <functional>
#include <vector>
#include <unistd.h>
// NOTE(review): a using-directive in a header leaks into every includer; it
// is kept because the code in and around this header uses unqualified names.
using namespace std;

// Number of demo tasks defined in this header.
#define TASK_NUM 10

// Human-readable task names, keyed by integer task index.
unordered_map<int, string> desc;

// Signature shared by every dispatchable task.
using func = function<void()>;

// Dispatch table: a task is invoked as callbacks[index]().
vector<func> callbacks;
/// Demo task 1: prints the pid of the process executing it.
/// Declared inline because the definition lives in a header.
inline void task_1()
{
    // pid_t is not guaranteed to be int, so convert explicitly for %d.
    printf("Sub Process [ %d ] 执行任务 1 ......\n", static_cast<int>(getpid()));
}
/// Demo task 2: prints the pid of the process executing it.
/// Declared inline because the definition lives in a header.
inline void task_2()
{
    // pid_t is not guaranteed to be int, so convert explicitly for %d.
    printf("Sub Process [ %d ] 执行任务 2 ......\n", static_cast<int>(getpid()));
}
/// Demo task 3: prints the pid of the process executing it.
/// Declared inline because the definition lives in a header.
inline void task_3()
{
    // pid_t is not guaranteed to be int, so convert explicitly for %d.
    printf("Sub Process [ %d ] 执行任务 3 ......\n", static_cast<int>(getpid()));
}
/// Demo task 4: prints the pid of the process executing it.
/// Declared inline because the definition lives in a header.
inline void task_4()
{
    // pid_t is not guaranteed to be int, so convert explicitly for %d.
    printf("Sub Process [ %d ] 执行任务 4 ......\n", static_cast<int>(getpid()));
}
/// Demo task 5: prints the pid of the process executing it.
/// Declared inline because the definition lives in a header.
inline void task_5()
{
    // pid_t is not guaranteed to be int, so convert explicitly for %d.
    printf("Sub Process [ %d ] 执行任务 5 ......\n", static_cast<int>(getpid()));
}
/// Demo task 6: prints the pid of the process executing it.
/// Declared inline because the definition lives in a header.
inline void task_6()
{
    // pid_t is not guaranteed to be int, so convert explicitly for %d.
    printf("Sub Process [ %d ] 执行任务 6 ......\n", static_cast<int>(getpid()));
}
/// Demo task 7: prints the pid of the process executing it.
/// Declared inline because the definition lives in a header.
inline void task_7()
{
    // pid_t is not guaranteed to be int, so convert explicitly for %d.
    printf("Sub Process [ %d ] 执行任务 7 ......\n", static_cast<int>(getpid()));
}

/// Demo task 8: prints the pid of the process executing it.
/// Declared inline because the definition lives in a header.
inline void task_8()
{
    // pid_t is not guaranteed to be int, so convert explicitly for %d.
    printf("Sub Process [ %d ] 执行任务 8 ......\n", static_cast<int>(getpid()));
}
/// Demo task 9: prints the pid of the process executing it.
/// Declared inline because the definition lives in a header.
inline void task_9()
{
    // pid_t is not guaranteed to be int, so convert explicitly for %d.
    printf("Sub Process [ %d ] 执行任务 9 ......\n", static_cast<int>(getpid()));
}
void task_10()
{printf("Sub Process [ %d ] 执行任务 10 ......\n", getpid());
}void load()
{desc.insert(make_pair(callbacks.size(), " task_1 "));callbacks.push_back(task_1);desc.insert(make_pair(callbacks.size(), " task_2 "));callbacks.push_back(task_2);desc.insert(make_pair(callbacks.size(), " task_3 "));callbacks.push_back(task_3);desc.insert(make_pair(callbacks.size(), " task_4 "));callbacks.push_back(task_4);desc.insert(make_pair(callbacks.size(), " task_5 "));callbacks.push_back(task_5);desc.insert(make_pair(callbacks.size(), " task_6 "));callbacks.push_back(task_6);desc.insert(make_pair(callbacks.size(), " task_7 "));callbacks.push_back(task_7);desc.insert(make_pair(callbacks.size(), " task_8 "));callbacks.push_back(task_8);desc.insert(make_pair(callbacks.size(), " task_9 "));callbacks.push_back(task_9);desc.insert(make_pair(callbacks.size(), " task_10 "));callbacks.push_back(task_10);
}#endif
Log
#ifndef _LOG_H_
#define _LOG_H_
#pragma once

#include <iostream>
#include <string>

// Step codes printed in the "step N" field of a log prefix.
#define CREATE 0
#define OPEN 1
#define WRITE 2
#define READ 3
#define CLOSE 4

/// Writes the prefix "| <message> | step <mode> | " to std::cout.
///
/// @param message Text placed in the first field.
/// @param mode    Number placed after "step " (one of the codes above).
/// @return std::cout, so the caller can keep streaming (e.g. `<< std::endl`).
///
/// Declared inline because the definition lives in a header.
inline std::ostream &Log(const std::string &message, int mode)
{
    std::cout << "| " << message << " | " << "step " << mode << " | ";
    // Without this return the caller's `Log(...) << ...` would use an
    // indeterminate reference.
    return std::cout;
}

#endif // _LOG_H_
makefile
# Build the demo. The headers are listed so that editing them triggers a
# rebuild; only the first prerequisite ($<, the .cc file) is compiled.
mypipe: mypipe.cc Task.hpp Log.hpp
	g++ -o $@ $< -std=c++11

# Remove the built binary.
.PHONY: clean
clean:
	rm -f mypipe