NET中的异步编程(四)- IO完成端口以及FileStream.BeginRead

📅 2026/7/5 3:40:46
NET中的异步编程(四)- IO完成端口以及FileStream.BeginRead
IO完成端口IO Completion Port大多数人应该或多或少地听说过IO完成端口这么个东西而且也知道它是实现高性能IO高伸缩性应用的尚方宝剑。IO完成端口是一个非常复杂的内核对象其实现的也非常巧妙细细琢磨还是非常有意思的。创建高伸缩性的应用的一个基本原则就是创建更少的线程。线程数更少首先消耗的资源就少每个线程的创建除了要浪费CPU时间外还要创建一系列的数据结构用来保存线程相关的一些信息用户栈线程上下文内核栈等。这个总共加起来大概1.5M左右那么你算算你的32位机器总共能使用多少内存那么对应地能创建多少线程可能有人讲那对于64位的就无所谓了。嗯在资源占用这方面64位确实不用担心。但是系统中可运行的线程数越多你的CPU数又是有限的8个80个。Windows的任务调度机制是每个线程会运行一个时间片然后Windows抢占式的调度另一个线程运行。那么线程数越多Windows势必要进行更频繁的线程上下文切换。线程上下文切换对系统性能的影响在这里我就不多说了你可以搜搜资料。那么如何做到创建更少的线程而又干更多的事儿呢答案就是“不等待”。相对CPU来说IO设备的速度简直低的要命。就好像飞机和拖拉机的差别一样我们可不能让拖拉机拖了飞机的后退儿。而IO完成端口就是为了这个而生的创建更少的线程干更多的事儿。IO完成端口首先不是一个我们看得见摸得着的什么插口也和我们常说的80这样的端口不同。你可以将其理解为一个数据结构或一个对象下面我会用C#的代码来辅助讲解IO完成端口仅仅是讲解这些代码并不是真实的实现Windows提供了一个CreateIoCompletionPort API来创建IO完成端口实际上这个API有两个作用创建IO完成端口和将一个IO设备与该端口绑定。创建IO完成端口时有一个很重要的参数指定同时最多能有多少个线程并行运行这就是为了保证更少的线程如果你将这个数值指定为0那么默认值就会是你机器的CPU数。IO端口里还有一个IO设备句柄列表你可以将很多设备句柄与这个端口绑定文件、Socket等//函数原型HANDLE CreateIoCompletionPort(//设备句柄HANDLE hFile,//已有的IO完成端口句柄如果这里已经指定则是将前面指定的设备与该端口绑定HANDLE hExistingCompletionPort,//因为一个IO完成端口可以绑定很多设备可以用这个来区分ULONG_PTR CompletionKey,//允许同时运行的线程数DWORD dwNumberOfConcurrentThreads);//创建一个IO完成端口HANDLE hIoPort CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,2);//创建文件如果要异步访问文件则需要指定FILE_FLAG_OVERLAPPEDHANDLE hFile CreateFile(..);//将上面创建的文件句柄与刚才创建的IO完成端口绑定不仅仅是文件可以CreateIoCompletionPort(hFile,hIoPort,1,2);除此之外我们还要为该端口创建一些供使用的线程。然后让这些线程调用Windows提供的GetQueuedCompletionStatus方法。这些线程调用了该方法后会被放到IO完成端口另外一个数据结构中一个后进先出的队列我们将其称为等待队列吧。然后该线程会休眠起来不占用CPU。然后我们可以调用像ReadFile这样的方法发起一个IO请求BOOL ReadFile(HANDLE hFile,PVOID pvBuffer,DWORD nNumBytesToRead,PDWORD pdwNumBytes,OVERLAPPED* pOverlapped);ReadFile(..overlapped);上面代码中的OVERLAPPED是一个非常重要的数据结构后面会提到。现在假设你的某个IO设备收到了一个数据包Windows就会检查这个IO设备是否跟一个IO完成端口关联了如果关联了Windows就会把这个数据包投递到这个IO完成端口。IO完成端口里还有另外一个先进先出的队列用来保存这些IO完成的数据。IO完成端口一看唔有个IO完成包投递到我这儿来了那我看看我的那个等待队列里有没有线程还在休息如果有就叫它起来干活儿。嘿还真有一个家伙还在睡觉如是IO完成端口就唤醒该线程实际上就是上面的那个GetQueuedCompletionStatus方法返回了。该方法返回时还会得到一些别的信息接收了多少个字节啊是哪个设备啊最重要的是上面提到的OVERLAPPED这个结构等等。起来后的线程就会拿着这些信息干一些后续的事儿BOOL GetQueuedCompletionStatus(HANDLE hCompletionPort,PDWORD pdwNumberOfBytesTransferred,PULONG_PTR pCompletionKey,OVERLAPPED **ppOverlapped,DWORD dwMilliseconds);//类似于下面的过程//创建一个线程Thread thread new Thread((){while(true){//如果没有IO完成通知到达该线程就在这里休眠了if(GetQueuedCompletionStatus(hIoPort,..ppOverlapped..)){//从ppOverlapped里取出所需的信息比如可能设置了一个回调函数的指针等}else{//}}});thread.Start();干完这个事儿后这个线程又会回到刚才那个队列继续躺起来其实是再次调用一下那个方法。我们要注意的是这个等待队列是后进先出的也就是说如果下次有消息来了很有可能还是上会那个线程来处理。这样做的目的还是为了提高性能不需要进行线程上下文切换。因为CPU的速度比IO设备的高出很多大部分时候我们只需要一两个线程就可以处理很多IO请求。现在假设我们的机器有2个CPU创建IO完成端口时我们指定了同时可以有2个线程运行。我们创建了4个线程放到等待队列里。现在有4个IO完成包投递过来了放在那个队列里。实际上IO完成端口只会唤醒两个线程去执行因为你指定了同时只能有两个线程运行那两个线程运行完就会立马回来继续运行别的。但是现在出了一个状况其中有一个线程执行过程中因为等待某个资源被阻塞了。那现在只有一个线程执行了那这个线程就有点吃力了。其实IO完成端口非常聪明它内部还有一个暂停运行的线程列表和一个正在运行的线程列表。如果某个线程正在运行它就把这个线程ID放到这个队列里当这个线程因为某个事儿暂停运行了它就会将其移动到另外一个列表中。IO完成端口会保证正在运行的线程列表里的数目不会超过你指定的最大并发数。一旦这个列表里的数目少于这个数而IO完成包队列里又有未处理的包IO完成端口就会看看还有没有在睡觉的线程如果有就将其唤醒干活儿。IO完成端口尽量的控制同时运行的线程数减少上下文切换浪费的时间和资源并且让线程尽量的忙起来。这里还有一个有意思的地方假设现在正在运行的两个线程其中一个调用Thread.Sleep休眠了然后IO完成端口唤醒另外一个线程让同时运行的线程数保持为2个不过过了一会儿刚才调用Sleep休眠的线程醒过来了有意思的事情发生了现在有三个线程同时运行超过了我们设置的最大并行数。这个时候IO完成端口是不会杀掉一个线程的它会让它们继续执行然后等到执行完了再让这个并行数降下去。实际上IO完成端口不仅仅可以用来处理这种异步的IO它完全可以作为一种线程间的通讯机制来使用与IO一点关系都没有我们可以调用Win API PostQueuedCompletionStatus来模拟一次IO完成这样我们的IO完成端口就会接到通知然后调用线程执行。熟悉并发里的Actor模型的同学可能觉得这有点Actor的影子了。BeginReadEndRead那么既然有IO完成端口这么个好东西如是有很多人想在.NET里也利用利用。其实大可不必在.NET里异步的IO内部就是使用了IO完成端口。每个CLR初始化后都会创建一个IO完成端口用来处理IO请求。很多人应该知道ThreadPool里的线程分为两类worker thread和io completion thread这里的io completion thread就是上一节说的跟IO完成端口相关联的那些thread。要说它跟其他的thread有什么不同没什么不同只是受IO完成端口控制而已。为了看看在.NET中是如何利用IO完成端口的我们将FileStream.BeginRead作为我们的入口点。在FileStream的Init方法里我们会看到这么一段代码if (this._isAsync){//...try{flag4 ThreadPool.BindHandle(this._handle);}finally{CodeAccessPermission.RevertAssert();}//...}我们感兴趣的就是ThreadPool.BindHandle。还记得上面对IO完成端口的描述么其实这里做的事儿就是将该文件句柄与每个CLR都初始化了的那个IO完成端口绑定。也就是说如果我们创建一个FileStream时指定了异步那么IO完成端口就会“监视”这个文件。我们再来看看BeginRead这个方法。该方法是用来发起异步IO请求的方法该方法执行后会立即返回不阻塞线程。首先看这么段代码if (!this._isAsync){return base.BeginRead(array, offset, numBytes, userCallback, stateObject);}也就是说如果我们创建FileStream时没有指定为异步就会调用基类的BeginRead方法那基类的这个方法又是如何实现的呢[HostProtection(SecurityAction.LinkDemand, ExternalThreadingtrue)]public virtual IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state){//...ReadDelegate delegate2 new ReadDelegate(this.Read);//...return delegate2.BeginInvoke(buffer, offset, count, callback, state);}其实是创建一个调用同步的Read方法的委托然后调用一下BeginInvoke方法在第二篇文章已经说过这样的调用实际上还是让线程池里的一个线程来调用我们可以称之为一种伪异步IO。这里可以得出一个结论如果你想用BeginRead那么初始化FileStream的时候就指定异步否则就不用直接用Read。那么如果创建FileStream的时候指定了异步会是什么结果呢这里的实现在BeginReadCore方法里[SecuritySafeCritical]private unsafe FileStreamAsyncResult BeginReadCore(byte[] bytes, int offset, int numBytes, AsyncCallback userCallback, object stateObject, int numBufferedBytesRead){NativeOverlapped* overlappedPtr;FileStreamAsyncResult ar new FileStreamAsyncResult {_handle this._handle,_userCallback userCallback,_userStateObject stateObject,_isWrite false,_numBufferedBytes numBufferedBytesRead};ManualResetEvent event2 new ManualResetEvent(false);ar._waitHandle event2;Overlapped overlapped new Overlapped(0, 0, IntPtr.Zero, ar);//...overlappedPtr overlapped.Pack(IOCallback, bytes);//...ar._overlapped overlappedPtr;//...ReadFileNative(this._handle, bytes, offset, numBytes, overlappedPtr, out hr)上面代码中的NativeOverlapped就是在上一节我们提到的保存有回调等信息的OVERLAPPED结构在这里也是一样它保存有我们的userCallback回调。然后通过调用ReadNative发起IO请求并将这个数据结构传递进去这里的ReadNative就是对Win32 的ReadFile的封装。发起异步IO请求完毕BeginRead返回过了一会儿磁盘驱动程序将数据读回来了对应的IO完成端口收到通知IO完成端口把刚才传递进去的NativeOverlapped结构传递给IO线程IO线程从中取出IOCallback回调IOCallback回调里有对我们的userCallback回调的调用IOCallback new IOCompletionCallback(FileStream.AsyncFSCallback);private static unsafe void AsyncFSCallback(uint errorCode, uint numBytes, NativeOverlapped* pOverlapped){FileStreamAsyncResult asyncResult (FileStreamAsyncResult) Overlapped.Unpack(pOverlapped).AsyncResult;//...AsyncCallback callback asyncResult._userCallback;if (callback ! null){callback(asyncResult);}}在这个回调里我们会对EndRead进行调用我们看看EndRead的代码会发现其他一些东西public override unsafe int EndRead(IAsyncResult asyncResult){//...WaitHandle handle result._waitHandle;if (handle ! null){try{handle.WaitOne();}finally{handle.Close();}}NativeOverlapped* nativeOverlappedPtr result._overlapped;if (nativeOverlappedPtr ! null){Overlapped.Free(nativeOverlappedPtr);}//...return (result._numBytes result._numBufferedBytes);}首先是销毁我们在BeginRead里初始化的WaitHandle内核对象然后将NativeOverlapped结构也销毁。所以EndRead除了取回读了多少个字节的作用外还起了销毁资源的作用。所以有的时候我们想进行这么一个操作异步的发起请求但是我们并不关心该请求是否成功。如是我们就假想能不能只调用BeginXXX方法就可以了从这里看我们不能简单的调用一下BeginXXX就了事了因为在BeginXXX里分配的一些句柄和内核资源需要在EndXXX里销毁不然会造成资源泄露。