一、server启动WebSockerServer
server.js会通过protoo 启动WebSocketServer
// Run a protoo WebSocketServer.
await runProtooWebSocketServer();
当用户请求连接时会带着roomId,server接收到用户请求后会判断当前房间是否存在,如果不存在会就创建一个。
具体详情见server.js的runProtoWebSockerServer()函数
async function runProtooWebSocketServer()
{
logger.info(‘running protoo WebSocketServer…’);
// Create the protoo WebSocket server.
protooWebSocketServer = new protoo.WebSocketServer(httpsServer,{maxReceivedFrameSize : 960000, // 960 KBytes.maxReceivedMessageSize : 960000,fragmentOutgoingMessages : true,fragmentationThreshold : 960000});// Handle connections from clients.
protooWebSocketServer.on('connectionrequest', (info, accept, reject) =>
{// The client indicates the roomId and peerId in the URL query.const u = url.parse(info.request.url, true);const roomId = u.query['roomId'];const peerId = u.query['peerId'];if (!roomId || !peerId){reject(400, 'Connection request without roomId and/or peerId');return;}logger.info('protoo connection request [roomId:%s, peerId:%s, address:%s, origin:%s]',roomId, peerId, info.socket.remoteAddress, info.origin);// Serialize this code into the queue to avoid that two peers connecting at// the same time with the same roomId create two separate rooms with same// roomId.queue.push(async () =>{//判断房间是否存在,不存在会创建一个const room = await getOrCreateRoom({ roomId });// Accept the protoo WebSocket connection.const protooWebSocketTransport = accept();room.handleProtooConnection({ peerId, protooWebSocketTransport });}).catch((error) =>{logger.error('room creation or room joining failed:%o', error);reject(error);});
});
}
二、创建房间
/**
- Get a Room instance (or create one if it does not exist).
*/
async function getOrCreateRoom({ roomId })
{
let room = rooms.get(roomId);
// If the Room does not exist create a new one.
if (!room)
{
logger.info(‘creating a new Room [roomId:%s]’, roomId);
//轮训出一个Worker,在该Worker上创建一个房间
const mediasoupWorker = getMediasoupWorker();
room = await Room.create({ mediasoupWorker, roomId });rooms.set(roomId, room);room.on('close', () => rooms.delete(roomId));
}
return room;
}
三、Room.create函数
创建房间时会创建Router 和createAudioLevelObserver
详情见mediasoup-demo/server/lib/Room.js
static async create({ mediasoupWorker, roomId })
{logger.info('create() [roomId:%s]', roomId);// Create a protoo Room instance.const protooRoom = new protoo.Room();// Router media codecs.const { mediaCodecs } = config.mediasoup.routerOptions;// Create a mediasoup Router. 基于该房间的worker创建Routerconst mediasoupRouter = await mediasoupWorker.createRouter({ mediaCodecs });// Create a mediasoup AudioLevelObserver.const audioLevelObserver = await mediasoupRouter.createAudioLevelObserver({maxEntries : 1,threshold : -80,interval : 800});const bot = await Bot.create({ mediasoupRouter });return new Room({roomId,protooRoom,mediasoupRouter,audioLevelObserver,bot});
}
四、创建Router
详情见:server/node_modules/mediasoup/src/Worker.ts
/*** Create a Router.*/
async createRouter({mediaCodecs,appData = {}}: RouterOptions = {}): Promise<Router>
{logger.debug('createRouter()');if (appData && typeof appData !== 'object')throw new TypeError('if given, appData must be an object');// This may throw.const rtpCapabilities = ortc.generateRouterRtpCapabilities(mediaCodecs);const internal = { routerId: uuidv4() };//通过管道通知C++层创建Routerawait this._channel.request('worker.createRouter', internal);const data = { rtpCapabilities };const router = new Router({internal,data,channel : this._channel,payloadChannel : this._payloadChannel,appData});//保存创建的router,一个woker上面会有多个room,自然也会有多个routerthis._routers.add(router);router.on('@close', () => this._routers.delete(router));// Emit observer event. 通知观察者this._observer.safeEmit('newrouter', router);return router;
}
mediasoup-demo/server/node_modules/mediasoup/src/Channel.ts 中管理和C++通信的channel
async request(method: string, internal?: object, data?: any): Promise
{
this._nextId < 4294967295 ? ++this._nextId : (this._nextId = 1);
const id = this._nextId;logger.debug('request() [method:%s, id:%s]', method, id);if (this._closed)throw new InvalidStateError('Channel closed');const request = { id, method, internal, data };const ns = netstring.nsWrite(JSON.stringify(request));if (Buffer.byteLength(ns) > NS_MESSAGE_MAX_LEN)throw new Error('Channel request too big');//向管道中写入数据,通知c++层// This may throw if closed or remote side ended.this._producerSocket.write(ns);
}
五、C++ 层创建管道
代码见:mediasoup-demo/server/node_modules/mediasoup/worker/src/Worker.cpp
inline void Worker::OnChannelRequest(Channel::UnixStreamSocket_ /channel/, Channel::Request_ request)inline void Worker::OnChannelRequest(Channel::UnixStreamSocket_ /channel/, Channel::Request_ request)
{
case Channel::Request::MethodId::WORKER_CREATE_ROUTER:
{std::string routerId;// This may throw.SetNewRouterIdFromInternal(request->internal, routerId);auto* router = new RTC::Router(routerId);this->mapRouters[routerId] = router;MS_DEBUG_DEV("Router created [routerId:%s]", routerId.c_str());request->Accept();break;}
}