编写负载均衡模块
代码整体结构
oj_control.hpp
// code: #include...
// input: ""
void Judge(const std::string &number, const std::string in_json, std::string *out_json)
{// 0. 根据题目编号,直接拿到对应的题目细节// 1. in_json进行反序列化,得到题目的id,得到用户提交源代码,input// 2. 重新拼接用户代码+测试用例代码,形成新的代码// 3. 选择负载最低的主机(差错处理)// 4. 然后发起http请求,得到结果// 5. 将结果赋值给out_json
}
建立一个配置文件
47.94.228.92:8082
47.94.228.92:8083
47.94.228.92:8084
oj_control.hpp
#pragma once#include <iostream>
#include <string>
#include <vector>
#include <mutex>
#include <cassert>#include "../comm/util.hpp"
#include "../comm/log.hpp"
#include "oj_model.hpp"
#include "oj_view.hpp"namespace ns_control
{using namespace std;using namespace ns_log;using namespace ns_util;using namespace ns_model;using namespace ns_view;//提供服务的主机class Machine{public:std::string ip; //编译服务的ipint port; //编译服务的端口uint64_t load; //编译服务的负载std::mutex *mtx; //mutex禁止拷贝,使用指针来完成public:Machine() : ip(""), port(0), load(0), mtx(nullptr){}~Machine(){}};const std::string service_machine = "./conf/service_machine.conf";//负载均衡模块class LoadBlance{private://可以提供编译服务的所有主机// 每一台主机都有自己的下标,充当当前主机的idstd::vector<Machine> machines;//所有在线的主机idstd::vector<int> online;//所有离线的主机idstd::vector<int> offline;public:LoadBlance(){assert(LoadConf());}~LoadBlance(){}public:bool LoadConf(const std::string &machine_list){}bool SmartChoice(){}void OfflineMachine(){}void OnlineMachine(){}};
}
编写负载均衡器代码
#pragma once#include <iostream>
#include <string>
#include <fstream>
#include <vector>
#include <mutex>
#include <cassert>#include "../comm/util.hpp"
#include "../comm/log.hpp"
#include "oj_model.hpp"
#include "oj_view.hpp"namespace ns_control
{using namespace std;using namespace ns_log;using namespace ns_util;using namespace ns_model;using namespace ns_view;//提供服务的主机class Machine{public:std::string ip; //编译服务的ipint port; //编译服务的端口uint64_t load; //编译服务的负载std::mutex *mtx; //mutex禁止拷贝,使用指针来完成public:Machine() : ip(""), port(0), load(0), mtx(nullptr){}~Machine(){}public:// 提升主机负载void IncLoad(){if (mtx) mtx->lock();++load;if (mtx) mtx->unlock();}// 减少主机负载void DecLoad(){if (mtx) mtx->lock();--load;if (mtx) mtx->unlock();}// 获取主机负载,没有太大的意义,只是为了统一接口uint64_t Load(){uint64_t _load = 0;if (mtx) mtx->lock();_load = load;if (mtx) mtx->unlock();return _load;}};const std::string service_machine = "./conf/service_machine.conf";//负载均衡模块class LoadBlance{private://可以提供编译服务的所有主机// 每一台主机都有自己的下标,充当当前主机的idstd::vector<Machine> machines;//所有在线的主机idstd::vector<int> online;//所有离线的主机idstd::vector<int> offline;// 保证LoadBlance它的数据安全std::mutex mtx;public:LoadBlance(){assert(LoadConf());LOG(INFO) << "加载 " << service_machine << " 成功" << "\n";}~LoadBlance(){}public:bool LoadConf(const std::string &machine_conf){std::ifstream in(machine_conf);if (!in.is_open()){LOG(FATAL) << " 加载: " << machine_conf << " 失败" << "\n";return false;}std::string line;while (std::getline(in, line)){std::vector<std::string> tokens;StringUtil::SplitString(line, &tokens, ":");if (tokens.size() != 2){LOG(WARNING) << " 切分 " << line << " 失败" << "\n";continue;}Machine m;m.ip = tokens[0];m.port = atoi(tokens[1].c_str());m.load = 0;m.mtx = new std::mutex();online.push_back(machines.size());machines.push_back(m);}in.close();return true;}// id: 输出型参数// m : 输出型参数bool SmartChoice(){// 1. 使用选择好的主机(更新该主机的负载)// 2. 我们需要可能离线该主机mtx.lock();// 负载均衡的算法// 1. 随机数+hash// 2. 轮询+hashint online_num = online.size();if (online_num == 0){mtx.unlock();LOG(FATAL) << " 所有的后端编译主机已经离线, 请运维的同事尽快查看" << "\n";return false;}// 通过遍历的方式,找到所有负载最小的机器*id = online[0];*m = &machines[online[0]];uint64_t min_load = machines[online[0]].Load();for (int i = 1; i < online_num; i++){uint64_t curr_load = machines[online[i]].Load();if (min_load > curr_load){min_load = curr_load;*id = online[i];*m = &machines[online[i]];}}mtx.unlock();return true;}void OfflineMachine(){}void OnlineMachine(){}};
编写judge功能
#pragma once#include <iostream>
#include <string>
#include <fstream>
#include <vector>
#include <algorithm>
#include <mutex>
#include <cassert>
#include <json/json.h>#include "../comm/util.hpp"
#include "../comm/log.hpp"
#include "../comm/httplib.h"
#include "oj_model.hpp"
#include "oj_view.hpp"namespace ns_control
{using namespace std;using namespace ns_log;using namespace ns_util;using namespace ns_model;using namespace ns_view;using namespace httplib;//提供服务的主机class Machine{public:std::string ip; //编译服务的ipint port; //编译服务的端口uint64_t load; //编译服务的负载std::mutex *mtx; //mutex禁止拷贝,使用指针来完成public:Machine() : ip(""), port(0), load(0), mtx(nullptr){}~Machine(){}public:// 提升主机负载void IncLoad(){if (mtx) mtx->lock();++load;if (mtx) mtx->unlock();}// 减少主机负载void DecLoad(){if (mtx) mtx->lock();--load;if (mtx) mtx->unlock();}void ResetLoad(){if(mtx) mtx->lock();load = 0;if(mtx) mtx->unlock();}// 获取主机负载,没有太大的意义,只是为了统一接口uint64_t Load(){uint64_t _load = 0;if (mtx) mtx->lock();_load = load;if (mtx) mtx->unlock();return _load;}};const std::string service_machine = "./conf/service_machine.conf";//负载均衡模块class LoadBlance{private://可以提供编译服务的所有主机// 每一台主机都有自己的下标,充当当前主机的idstd::vector<Machine> machines;//所有在线的主机idstd::vector<int> online;//所有离线的主机idstd::vector<int> offline;// 保证LoadBlance它的数据安全std::mutex mtx;public:LoadBlance(){assert(LoadConf());LOG(INFO) << "加载 " << service_machine << " 成功" << "\n";}~LoadBlance(){}public:bool LoadConf(const std::string &machine_conf){std::ifstream in(machine_conf);if (!in.is_open()){LOG(FATAL) << " 加载: " << machine_conf << " 失败" << "\n";return false;}std::string line;while (std::getline(in, line)){std::vector<std::string> tokens;StringUtil::SplitString(line, &tokens, ":");if (tokens.size() != 2){LOG(WARNING) << " 切分 " << line << " 失败" << "\n";continue;}Machine m;m.ip = tokens[0];m.port = atoi(tokens[1].c_str());m.load = 0;m.mtx = new std::mutex();online.push_back(machines.size());machines.push_back(m);}in.close();return true;}// id: 输出型参数// m : 输出型参数bool SmartChoice(){// 1. 使用选择好的主机(更新该主机的负载)// 2. 我们需要可能离线该主机mtx.lock();// 负载均衡的算法// 1. 随机数+hash// 2. 轮询+hashint online_num = online.size();if (online_num == 0){mtx.unlock();LOG(FATAL) << " 所有的后端编译主机已经离线, 请运维的同事尽快查看" << "\n";return false;}// 通过遍历的方式,找到所有负载最小的机器*id = online[0];*m = &machines[online[0]];uint64_t min_load = machines[online[0]].Load();for (int i = 1; i < online_num; i++){uint64_t curr_load = machines[online[i]].Load();if (min_load > curr_load){min_load = curr_load;*id = online[i];*m = &machines[online[i]];}}mtx.unlock();return true;}void OfflineMachine(int which){mtx.lock();for(auto iter = online.begin(); iter != online.end(); iter++){if(*iter == which){machines[which].ResetLoad();//要离线的主机已经找到啦online.erase(iter);offline.push_back(which);break; //因为break的存在,所有我们暂时不考虑迭代器失效的问题}}mtx.unlock();}void OnlineMachine(){//我们统一上线,后面统一解决mtx.lock();online.insert(online.end(), offline.begin(), offline.end());offline.erase(offline.begin(), offline.end());mtx.unlock();LOG(INFO) << "所有的主机有上线啦!" << "\n";}//for testvoid ShowMachines(){mtx.lock();std::cout << "当前在线主机列表: ";for(auto &id : online){std::cout << id << " ";}std::cout << std::endl;std::cout << "当前离线主机列表: ";for(auto &id : offline){std::cout << id << " ";}std::cout << std::endl;mtx.unlock();}};class Control{private:Model model_; //提供后台数据View view_; //提供网页渲染功能LoadBlance load_blance_; //核心负载均衡器public:Control(){}~Control(){}public://根据题目数据构建网页// html: 输出型参数bool AllQuestions(string *html){bool ret = true;vector<struct Question> all;if (model_.GetAllQuestions(&all)){// 获取题目信息成功,将所有的题目数据构建成网页view_.AllExpandHtml(all, html);}else{*html = "获取题目失败, 形成题目列表失败";ret = false;}return ret;}bool Question(const string &number, string *html){bool ret = true;struct Question q;if (model_.GetOneQuestion(number, &q)){// 获取指定题目信息成功,将所有的题目数据构建成网页view_.OneExpandHtml(q, html);}else{*html = "指定题目: " + number + " 不存在!";ret = false;}return ret;}// code: #include...// input: ""void Judge(const std::string &number, const std::string in_json, std::string *out_json){// LOG(DEBUG) << in_json << " \nnumber:" << number << "\n";// 0. 根据题目编号,直接拿到对应的题目细节struct Question q;model_.GetOneQuestion(number, &q);// 1. in_json进行反序列化,得到题目的id,得到用户提交源代码,inputJson::Reader reader;Json::Value in_value;reader.parse(in_json, in_value);std::string code = in_value["code"].asString();// 2. 重新拼接用户代码+测试用例代码,形成新的代码Json::Value compile_value;compile_value["input"] = in_value["input"].asString();compile_value["code"] = code + "\n" + q.tail;compile_value["cpu_limit"] = q.cpu_limit;compile_value["mem_limit"] = q.mem_limit;Json::FastWriter writer;std::string compile_string = writer.write(compile_value);// 3. 选择负载最低的主机(差错处理)// 规则: 一直选择,直到主机可用,否则,就是全部挂掉while(true){int id = 0;Machine *m = nullptr;if(!load_blance_.SmartChoice(&id, &m)){break;}// 4. 然后发起http请求,得到结果Client cli(m->ip, m->port);m->IncLoad();LOG(INFO) << " 选择主机成功, 主机id: " << id << " 详情: " << m->ip << ":" << m->port << " 当前主机的负载是: " << m->Load() << "\n";if(auto res = cli.Post("/compile_and_run", compile_string, "application/json;charset=utf-8")){// 5. 将结果赋值给out_jsonif(res->status == 200){*out_json = res->body;m->DecLoad();LOG(INFO) << "请求编译和运行服务成功..." << "\n";break;}m->DecLoad();}else{//请求失败LOG(ERROR) << " 当前请求的主机id: " << id << " 详情: " << m->ip << ":" << m->port << " 可能已经离线"<< "\n";load_blance_.OfflineMachine(id);load_blance_.ShowMachines(); //仅仅是为了用来调试}}}};
}
postman综合调试
oj_server.cc
#include <iostream>
#include "../comm/httplib.h"
#include "oj_control.hpp"using namespace httplib;
using namespace ns_control;int main()
{//用户请求的服务路由功能Server svr;Control ctrl;// 获取所有的题目列表svr.Get("/all_questions", [&ctrl](const Request &req, Response &resp){//返回一张包含有所有题目的html网页std::string html;ctrl.AllQuestions(&html);//用户看到的是什么呢??网页数据 + 拼上了题目相关的数据resp.set_content(html, "text/html; charset=utf-8");//resp.set_content("这是所有题目的列表", "text/plain; charset=utf-8");});// 用户要根据题目编号,获取题目的内容// /question/100 -> 正则匹配// R"()", 原始字符串raw string,保持字符串内容的原貌,不用做相关的转义svr.Get(R"(/question/(\d+))", [&ctrl](const Request &req, Response &resp){std::string number = req.matches[1];std::string html;ctrl.Question(number, &html);resp.set_content(html, "text/html; charset=utf-8");//resp.set_content("这是指定的一道题:" + number, "text/plain; charset=utf-8");});// 用户提交代码,使用我们的判题功能(1. 每道题的测试用例 2. compile_and_run)svr.Post(R"(/judge/(\d+))", [&ctrl](const Request &req, Response &resp){std::string number = req.matches[1];std::string result_json;ctrl.Judge(number, req.body, &result_json);resp.set_content(result_json, "application/json;charset=utf-8");//resp.set_content("指定题目的判题: " + number, "text/plain; charset=utf-8");});svr.set_base_dir("./wwwroot");svr.listen("0.0.0.0", 8080);return 0;
}
makefile
oj_server:oj_server.ccg++ -o $@ $^ -std=c++11 -lpthread -lctemplate -ljsoncpp.PHONY:clean
clean:rm -f oj_server
将oj_server和compile_server都重新编译一下
启动三台服务器,运行8082,8083和8084
再运行oj_server
要给编译模块添加—D条件编译掉测试用例中的头文件incldue
compiler.hpp
#pragma once#include <iostream>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>#include "../comm/util.hpp"
#include "../comm/log.hpp"// 只负责进行代码的编译namespace ns_compiler
{// 引入路径拼接功能using namespace ns_util;using namespace ns_log;class Compiler{public:Compiler(){}~Compiler(){}//返回值:编译成功:true,否则:false//输入参数:编译的文件名//file_name: 1234//1234 -> ./temp/1234.cpp//1234 -> ./temp/1234.exe//1234 -> ./temp/1234.stderrstatic bool Compile(const std::string &file_name){pid_t pid = fork();if(pid < 0){LOG(ERROR) << "内部错误,创建子进程失败" << "\n";return false;}else if (pid == 0){umask(0);int _stderr = open(PathUtil::CompilerError(file_name).c_str(), O_CREAT | O_WRONLY, 0644);if(_stderr < 0){LOG(WARNING) << "没有成功形成stderr文件" << "\n";exit(1);}//重定向标准错误到_stderrdup2(_stderr, 2);//程序替换,并不影响进程的文件描述符表//子进程: 调用编译器,完成对代码的编译工作//g++ -o target src -std=c++11execlp("g++", "g++", "-o", PathUtil::Exe(file_name).c_str(),\PathUtil::Src(file_name).c_str(), "-D", "COMPILER_ONLINE","-std=c++11", nullptr/*不要忘记*/);LOG(ERROR) << "启动编译器g++失败,可能是参数错误" << "\n";exit(2);}else{waitpid(pid, nullptr, 0);//编译是否成功,就看有没有形成对应的可执行程序if(FileUtil::IsFileExists(PathUtil::Exe(file_name))){LOG(INFO) << PathUtil::Src(file_name) << " 编译成功!" << "\n";return true;}}LOG(ERROR) << "编译失败,没有形成可执行程序" << "\n";return false;}};
}
postman
输入地址http://47.94.228.92:8080/judge/2
{"code" : "#include <iostream>\n#include <vector>\n#include <algorithm>\nusing namespace std;\nclass Solution\n{\npublic:\nint Max(const vector<int> &v)\n{\nreturn 0;\n}\n};\n","input" : ""
}