Node.js集群部署指南 🚀
引言
Node.js集群部署是提高应用可用性和性能的关键策略。本文将深入探讨Node.js集群部署的实现方案,包括进程管理、负载均衡、故障恢复等方面,帮助开发者构建高可用的Node.js应用。
集群部署概述
Node.js集群部署主要包括以下方面:
- 进程管理:主进程与工作进程的协调
- 负载均衡:请求分发与任务调度
- 故障恢复:进程监控与自动重启
- 状态同步:进程间通信与数据共享
- 资源管理:CPU和内存的合理分配
集群部署实现
集群管理器
// Cluster manager
/**
 * Process-wide singleton that supervises a Node.js cluster.
 *
 * In the primary process it forks workers, replaces them when they crash or
 * are recycled for being unhealthy, and shuts down gracefully on
 * SIGINT/SIGTERM/uncaughtException. In a worker process it sends periodic
 * heartbeats to the primary and listens for broadcast messages.
 * `init()` picks the role via `cluster.isPrimary`.
 */
class ClusterManager {
  private static instance: ClusterManager | undefined;
  private config: ClusterConfig;
  /**
   * Live workers keyed by cluster worker id (populated in the primary only).
   * Exposed read-only so a LoadBalancer can share the same registry.
   */
  readonly workers: Map<number, Worker>;
  /** Latest metrics reported by each worker, keyed by cluster worker id (primary only). */
  readonly metrics: Map<number, WorkerMetrics>;
  private state: ClusterState;
  /** Ids of workers recycled on purpose; they are replaced even if they exit with code 0. */
  private readonly pendingRestarts = new Set<number>();
  /** Timer driving the periodic health check; cleared when shutdown starts. */
  private healthCheckTimer: ReturnType<typeof setInterval> | null = null;

  private constructor() {
    this.workers = new Map();
    this.metrics = new Map();
    this.config = {
      workers: os.cpus().length,
      maxMemory: 1024 * 1024 * 1024, // 1 GB, in bytes (compared with heapUsed)
      restartDelay: 1000,
      healthCheckInterval: 10000,
    };
    this.state = {
      isStarting: false,
      isShuttingDown: false,
      startTime: Date.now(),
      restartCount: 0,
    };
  }

  /** Returns the process-wide instance, creating it on first use. */
  static getInstance(): ClusterManager {
    if (!ClusterManager.instance) {
      ClusterManager.instance = new ClusterManager();
    }
    return ClusterManager.instance;
  }

  /**
   * Merges `config` over the defaults and starts the primary or worker role.
   * @param config Setting overrides; keys that are not provided keep their defaults.
   */
  init(config: Partial<ClusterConfig>): void {
    this.config = { ...this.config, ...config };
    if (cluster.isPrimary) {
      this.initializeMaster();
    } else {
      this.initializeWorker();
    }
  }

  /** Primary role: fork workers and install signal, health-check and IPC handlers. */
  private initializeMaster(): void {
    console.log(`Master ${process.pid} is running`);
    this.startWorkers();
    this.setupProcessHandlers();
    this.startHealthCheck();
    this.setupIpcCommunication();
  }

  /** Forks `config.workers` workers. */
  private startWorkers(): void {
    this.state.isStarting = true;
    try {
      for (let i = 0; i < this.config.workers; i++) {
        this.createWorker();
      }
    } finally {
      this.state.isStarting = false;
    }
  }

  /** Forks one worker, registers it with fresh metrics and wires its event handlers. */
  private createWorker(): void {
    const worker = cluster.fork();
    this.workers.set(worker.id, worker);
    this.metrics.set(worker.id, {
      pid: worker.process.pid ?? -1,
      cpu: 0,
      memory: 0,
      requests: 0,
      errors: 0,
      lastHeartbeat: Date.now(),
    });
    console.log(`Worker ${worker.id} started`);
    this.setupWorkerHandlers(worker);
  }

  /** Subscribes to a worker's lifecycle, error and message events. */
  private setupWorkerHandlers(worker: Worker): void {
    worker.on('online', () => {
      console.log(`Worker ${worker.id} is online`);
    });

    worker.on('exit', (code, signal) => {
      console.log(`Worker ${worker.id} died with code ${code} and signal ${signal}`);
      this.handleWorkerExit(worker, code, signal);
    });

    worker.on('error', error => {
      console.error(`Worker ${worker.id} error:`, error);
      // The worker may already have been unregistered by an exit event.
      const workerMetrics = this.metrics.get(worker.id);
      if (workerMetrics) {
        workerMetrics.errors++;
      }
    });

    worker.on('message', message => {
      this.handleWorkerMessage(worker, message);
    });
  }

  /**
   * Unregisters an exited worker and schedules a replacement when it crashed
   * (non-zero or signal exit) or was recycled by `restartWorker`, unless the
   * cluster is shutting down.
   */
  private handleWorkerExit(worker: Worker, code: number, _signal: string): void {
    this.workers.delete(worker.id);
    this.metrics.delete(worker.id);
    // A recycled worker usually exits cleanly (code 0) after disconnect(); still replace it.
    const wasRecycled = this.pendingRestarts.delete(worker.id);

    if (!this.state.isShuttingDown && (code !== 0 || wasRecycled)) {
      console.log(`Restarting worker ${worker.id}`);
      setTimeout(() => {
        // Shutdown may have started while we were waiting.
        if (this.state.isShuttingDown) {
          return;
        }
        this.createWorker();
        this.state.restartCount++;
      }, this.config.restartDelay);
    }
  }

  /** Records heartbeats (`{ type: 'heartbeat', metrics }`) sent by workers. */
  private handleWorkerMessage(worker: Worker, message: any): void {
    if (message?.type !== 'heartbeat') {
      return;
    }
    const workerMetrics = this.metrics.get(worker.id);
    if (!workerMetrics) {
      return; // worker already unregistered
    }
    workerMetrics.lastHeartbeat = Date.now();
    this.updateWorkerMetrics(worker.id, message.metrics);
  }

  /** Copies cpu/memory/requests from a heartbeat payload into the stored metrics. */
  private updateWorkerMetrics(workerId: number, metrics: WorkerMetrics): void {
    const currentMetrics = this.metrics.get(workerId);
    if (!currentMetrics || !metrics) {
      return;
    }
    currentMetrics.cpu = metrics.cpu;
    currentMetrics.memory = metrics.memory;
    currentMetrics.requests = metrics.requests;
  }

  /** Starts a graceful shutdown on uncaught exceptions, SIGTERM and SIGINT. */
  private setupProcessHandlers(): void {
    process.on('uncaughtException', error => {
      console.error('Uncaught exception:', error);
      void this.gracefulShutdown();
    });
    process.on('SIGTERM', () => {
      console.log('Received SIGTERM signal');
      void this.gracefulShutdown();
    });
    process.on('SIGINT', () => {
      console.log('Received SIGINT signal');
      void this.gracefulShutdown();
    });
  }

  /** Runs `checkWorkersHealth` every `config.healthCheckInterval` ms. */
  private startHealthCheck(): void {
    this.healthCheckTimer = setInterval(() => {
      this.checkWorkersHealth();
    }, this.config.healthCheckInterval);
  }

  /**
   * Recycles workers whose heartbeat is older than two health-check intervals
   * or whose reported memory exceeds `config.maxMemory`. Each unhealthy worker
   * is recycled at most once, even when both checks fail.
   */
  private checkWorkersHealth(): void {
    if (this.state.isShuttingDown) {
      return;
    }
    const now = Date.now();
    for (const [workerId, metrics] of this.metrics) {
      if (this.pendingRestarts.has(workerId)) {
        continue; // already being recycled
      }

      const heartbeatTimedOut = now - metrics.lastHeartbeat > this.config.healthCheckInterval * 2;
      if (heartbeatTimedOut) {
        console.warn(`Worker ${workerId} heartbeat timeout`);
      }
      const memoryExceeded = metrics.memory > this.config.maxMemory;
      if (memoryExceeded) {
        console.warn(`Worker ${workerId} memory exceeded`);
      }

      if (heartbeatTimedOut || memoryExceeded) {
        this.restartWorker(workerId);
      }
    }
  }

  /**
   * Recycles a worker: disconnects it and kills it if it is still alive
   * after 5 s. The replacement is forked by `handleWorkerExit`.
   */
  private restartWorker(workerId: number): void {
    const worker = this.workers.get(workerId);
    if (!worker || this.pendingRestarts.has(workerId)) {
      return;
    }
    console.log(`Restarting worker ${workerId}`);
    this.pendingRestarts.add(workerId);

    // Stop accepting new work.
    if (worker.isConnected()) {
      worker.disconnect();
    }
    // Force-terminate if it does not exit on its own.
    setTimeout(() => {
      if (!worker.isDead()) {
        worker.kill();
      }
    }, 5000);
  }

  /** Relays `{ type: 'broadcast', data }` messages received by the primary to all workers. */
  private setupIpcCommunication(): void {
    process.on('message', (message: any) => {
      if (message?.type === 'broadcast') {
        this.broadcastToWorkers(message.data);
      }
    });
  }

  /** Sends `{ type: 'broadcast', data }` to every worker whose IPC channel is still open. */
  private broadcastToWorkers(data: any): void {
    for (const worker of this.workers.values()) {
      if (worker.isConnected()) {
        worker.send({ type: 'broadcast', data });
      }
    }
  }

  /**
   * Stops health checks and restarts, disconnects all workers, waits for
   * them to exit and then exits the process with code 0. Repeated calls
   * while a shutdown is in progress are ignored.
   */
  async gracefulShutdown(): Promise<void> {
    if (this.state.isShuttingDown) {
      return;
    }
    this.state.isShuttingDown = true;
    console.log('Starting graceful shutdown...');

    if (this.healthCheckTimer !== null) {
      clearInterval(this.healthCheckTimer);
      this.healthCheckTimer = null;
    }

    // Stop accepting new requests.
    for (const worker of this.workers.values()) {
      if (worker.isConnected()) {
        worker.disconnect();
      }
    }

    await this.waitForWorkersToExit();
    console.log('All workers have exited');
    process.exit(0);
  }

  /**
   * Resolves once the worker registry is empty. The registry is polled every
   * `pollMs` ms, and the poll timer is cleared when the promise resolves.
   */
  private waitForWorkersToExit(pollMs: number = 100): Promise<void> {
    return new Promise(resolve => {
      if (this.workers.size === 0) {
        resolve();
        return;
      }
      const timer = setInterval(() => {
        if (this.workers.size === 0) {
          clearInterval(timer);
          resolve();
        }
      }, pollMs);
    });
  }

  /** Worker role: install crash handlers, start heartbeats and listen for broadcasts. */
  private initializeWorker(): void {
    console.log(`Worker ${process.pid} started`);
    this.setupWorkerProcessHandlers();
    this.startHeartbeat();
    this.setupWorkerIpcCommunication();
  }

  /**
   * Makes the worker exit with code 1 on an uncaught exception, so the
   * primary replaces it. Unhandled rejections are only logged.
   */
  private setupWorkerProcessHandlers(): void {
    process.on('uncaughtException', error => {
      console.error('Worker uncaught exception:', error);
      process.exit(1);
    });
    process.on('unhandledRejection', reason => {
      console.error('Worker unhandled rejection:', reason);
    });
  }

  /**
   * Sends `{ type: 'heartbeat', metrics }` to the primary every `intervalMs` ms.
   * `cpu` is the percentage of one core used since the previous heartbeat
   * (user + system). It can exceed 100 when several threads are busy.
   * `memory` is `heapUsed` in bytes. The timer stops when the IPC channel
   * disconnects.
   */
  private startHeartbeat(intervalMs: number = 5000): void {
    let lastUsage = process.cpuUsage();
    let lastTime = process.hrtime.bigint();

    const timer = setInterval(() => {
      if (!process.send || !process.connected) {
        return;
      }
      const now = process.hrtime.bigint();
      const usage = process.cpuUsage(lastUsage); // delta since the last tick, in microseconds
      const elapsedMicros = Number(now - lastTime) / 1000;
      lastUsage = process.cpuUsage();
      lastTime = now;
      const cpu = elapsedMicros > 0 ? ((usage.user + usage.system) / elapsedMicros) * 100 : 0;

      process.send({
        type: 'heartbeat',
        metrics: {
          cpu,
          memory: process.memoryUsage().heapUsed,
          requests: 0, // must be supplied by the application layer
          errors: 0, // must be supplied by the application layer
        },
      });
    }, intervalMs);

    // After disconnect() the channel is gone; stop so the worker can exit.
    process.once('disconnect', () => clearInterval(timer));
  }

  /** Dispatches `{ type: 'broadcast', data }` messages from the primary. */
  private setupWorkerIpcCommunication(): void {
    process.on('message', (message: any) => {
      if (message?.type === 'broadcast') {
        this.handleBroadcastMessage(message.data);
      }
    });
  }

  /** Hook for application-specific handling of broadcast data; currently just logs it. */
  private handleBroadcastMessage(data: any): void {
    console.log('Received broadcast:', data);
  }
}
// Load balancer
/**
 * Chooses a worker for each request using a named strategy.
 *
 * Supported algorithms: 'round-robin' (the default, also used for unknown
 * names), 'least-connections' (lowest `metrics.requests`) and 'least-cpu'
 * (lowest `metrics.cpu`). The metric-based strategies fall back to
 * round-robin when no worker has a usable numeric metric yet, so a live
 * worker is never reported as unavailable.
 *
 * `workers` is shared with its owner. Round-robin rotates that map's
 * insertion order in place, so the rotation persists across LoadBalancer
 * instances built over the same map.
 */
class LoadBalancer {
  private algorithm: string;
  private workers: Map<number, Worker>;
  private metrics: Map<number, WorkerMetrics>;

  /**
   * @param algorithm Strategy name (see class doc).
   * @param workers Live workers keyed by worker id.
   * @param metrics Per-worker metrics keyed by the same ids as `workers`.
   */
  constructor(
    algorithm: string = 'round-robin',
    workers: Map<number, Worker>,
    metrics: Map<number, WorkerMetrics>,
  ) {
    this.algorithm = algorithm;
    this.workers = workers;
    this.metrics = metrics;
  }

  /** Returns the chosen worker, or null only when there are no workers at all. */
  selectWorker(): Worker | null {
    switch (this.algorithm) {
      case 'least-connections':
        return this.leastConnections();
      case 'least-cpu':
        return this.leastCpu();
      case 'round-robin':
      default:
        return this.roundRobin();
    }
  }

  /** Returns the first worker and moves it to the back of the map (O(1)). */
  private roundRobin(): Worker | null {
    const head = this.workers.entries().next();
    if (head.done) {
      return null;
    }
    const [workerId, worker] = head.value;
    this.workers.delete(workerId);
    this.workers.set(workerId, worker);
    return worker;
  }

  /** Worker with the fewest reported requests. */
  private leastConnections(): Worker | null {
    return this.pickLowest(metrics => metrics.requests);
  }

  /** Worker with the lowest reported CPU usage. */
  private leastCpu(): Worker | null {
    return this.pickLowest(metrics => metrics.cpu);
  }

  /**
   * Returns the worker with the smallest finite `score`. Ties go to the
   * earliest worker in map order. Workers without metrics, or whose score is
   * not a finite number, are skipped. If none qualify, falls back to
   * round-robin.
   */
  private pickLowest(score: (metrics: WorkerMetrics) => number): Worker | null {
    let selectedWorker: Worker | null = null;
    let lowest = Infinity;
    for (const [workerId, worker] of this.workers) {
      const metrics = this.metrics.get(workerId);
      if (!metrics) {
        continue;
      }
      const value = score(metrics);
      if (Number.isFinite(value) && value < lowest) {
        lowest = value;
        selectedWorker = worker;
      }
    }
    return selectedWorker ?? this.roundRobin();
  }
}
// Interface definitions
/** Settings consumed by ClusterManager in the primary process. */
interface ClusterConfig {
  /** Number of worker processes forked at startup (ClusterManager defaults it to os.cpus().length). */
  workers: number;
  /** Memory threshold compared against each worker's reported `memory`; the default is written as 1024 * 1024 * 1024 (presumably bytes, to match heapUsed). */
  maxMemory: number;
  /** Delay in milliseconds (passed to setTimeout) before replacing an exited worker. */
  restartDelay: number;
  /** Period in milliseconds of the health check; a heartbeat older than twice this value counts as a timeout. */
  healthCheckInterval: number;
}
/** Bookkeeping for the primary's lifecycle. */
interface ClusterState {
  /** True while the initial batch of workers is being forked. */
  isStarting: boolean;
  /** Set once a graceful shutdown begins; suppresses worker replacement. */
  isShuttingDown: boolean;
  /** Date.now() timestamp taken when the manager was constructed. */
  startTime: number;
  /** Number of replacement workers forked so far. */
  restartCount: number;
}
/** Latest per-worker figures stored by the primary. */
interface WorkerMetrics {
  /** OS process id of the worker. */
  pid: number;
  /** CPU usage from the heartbeat. NOTE(review): LoadBalancer compares this numerically, so the heartbeat sender must send a plain number — confirm the unit it reports. */
  cpu: number;
  /** Memory reported by the heartbeat (the sender uses process.memoryUsage().heapUsed). */
  memory: number;
  /** Request count used by 'least-connections'; presumably supplied by the application layer (the heartbeat sends 0). */
  requests: number;
  /** Number of 'error' events observed on the worker by the primary. */
  errors: number;
  /** Date.now() timestamp of the last heartbeat received. */
  lastHeartbeat: number;
}
// Usage example
if (cluster.isPrimary) {
  // Primary process: supervise workers and dispatch HTTP requests to them over IPC.
  // (Node's cluster module can also distribute connections itself when the
  // workers call server.listen(); this example demonstrates a custom balancer.)
  const clusterManager = ClusterManager.getInstance();
  clusterManager.init({
    workers: 4,
    maxMemory: 1024 * 1024 * 1024,
    restartDelay: 1000,
    healthCheckInterval: 10000,
  });

  // Build the balancer once so every request shares the same strategy state.
  const loadBalancer = new LoadBalancer('round-robin', clusterManager.workers, clusterManager.metrics);

  // HTTP responses waiting for a worker's reply, keyed by a primary-assigned request id.
  const pendingResponses = new Map<number, { res: http.ServerResponse; timer: ReturnType<typeof setTimeout> }>();
  let nextRequestId = 0;
  const REQUEST_TIMEOUT_MS = 30000;

  /** Completes a pending response exactly once; later calls for the same id are no-ops. */
  const finish = (id: number, statusCode: number, body: string, headers?: http.OutgoingHttpHeaders): void => {
    const entry = pendingResponses.get(id);
    if (!entry) {
      return; // already answered, timed out, or the client went away
    }
    pendingResponses.delete(id);
    clearTimeout(entry.timer);
    if (!entry.res.headersSent) {
      entry.res.writeHead(statusCode, headers);
    }
    entry.res.end(body);
  };

  // Workers reply with { type: 'response', id, statusCode, headers, body }.
  cluster.on('message', (_worker, message: any) => {
    if (message?.type === 'response' && typeof message.id === 'number') {
      finish(message.id, message.statusCode ?? 200, String(message.body ?? ''), message.headers);
    }
  });

  const server = http.createServer();
  server.on('request', (req, res) => {
    const worker = loadBalancer.selectWorker();
    if (!worker) {
      res.writeHead(503);
      res.end('Service Unavailable');
      return;
    }

    const id = nextRequestId++;
    const timer = setTimeout(() => finish(id, 504, 'Gateway Timeout'), REQUEST_TIMEOUT_MS);
    pendingResponses.set(id, { res, timer });

    // Forget the request if the client disconnects before a worker answers.
    res.on('close', () => {
      const entry = pendingResponses.get(id);
      if (entry) {
        clearTimeout(entry.timer);
        pendingResponses.delete(id);
      }
    });

    worker.send(
      {
        type: 'request',
        id,
        data: {
          url: req.url,
          method: req.method,
          headers: req.headers,
        },
      },
      error => {
        // The worker may have exited between selection and send.
        if (error) {
          finish(id, 502, 'Bad Gateway');
        }
      },
    );
  });

  server.listen(3000);
} else {
  // Worker process: start heartbeats/IPC and answer requests forwarded by the primary.
  const clusterManager = ClusterManager.getInstance();
  clusterManager.init({} as ClusterConfig);

  process.on('message', (message: any) => {
    if (message?.type === 'request') {
      // Application-specific request handling goes here.
      console.log('Worker handling request:', message.data);
      // Reply so the primary can complete the HTTP response.
      process.send?.({
        type: 'response',
        id: message.id,
        statusCode: 200,
        headers: { 'Content-Type': 'text/plain; charset=utf-8' },
        body: `Handled by worker ${process.pid}\n`,
      });
    }
  });
}
最佳实践与建议
- 进程管理
  - 合理设置进程数
  - 监控进程状态
  - 自动故障恢复
  - 优雅关闭处理
- 负载均衡
  - 选择合适算法
  - 考虑进程负载
  - 动态调整策略
  - 避免单点故障
- 状态管理
  - 使用共享存储
  - 保持数据一致
  - 处理并发访问
  - 避免竞态条件
- 监控告警
  - 实时监控指标
  - 设置告警阈值
  - 及时处理异常
  - 记录运行日志
总结
Node.js集群部署需要考虑以下方面:
- 进程管理和负载均衡
- 故障检测和自动恢复
- 进程间通信和状态同步
- 资源管理和性能优化
- 监控和运维支持
通过合理的集群部署,可以提高Node.js应用的可用性和性能。
学习资源
- Node.js集群文档
- 进程管理工具
- 负载均衡策略
- 高可用架构
- 运维最佳实践
如果你觉得这篇文章有帮助,欢迎点赞收藏,也期待在评论区看到你的想法和建议!👇
终身学习,共同成长。
咱们下一期见
💻