简介
主要分析Linux平台下的实现,即reactive_socket_service_base和reactive_socket_service。
发起
异步接受连接由basic_socket_acceptor的async_accept发起;调用之前需要先调用open创建监听socket,并将其注册到reactor中。其定义为:
/// Start an asynchronous accept (member of basic_socket_acceptor).
///
/// @param peer           Socket object that will represent the accepted
///                       connection.
/// @param peer_endpoint  Receives the endpoint of the remote peer; its address
///                       is forwarded to the service.
/// @param handler        Completion handler with the signature
///                       void(boost::system::error_code).
/// @return Whatever the underlying service's async_accept returns, typed by
///         BOOST_ASIO_INITFN_RESULT_TYPE.
template <typename SocketService, typename AcceptHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(AcceptHandler,
    void (boost::system::error_code))
async_accept(basic_socket<protocol_type, SocketService>& peer,
    endpoint_type& peer_endpoint, BOOST_ASIO_MOVE_ARG(AcceptHandler) handler)
{
  // If you get an error on the following line it means that your handler does
  // not meet the documented type requirements for a AcceptHandler.
  BOOST_ASIO_ACCEPT_HANDLER_CHECK(AcceptHandler, handler) type_check;

  // Delegate to the socket service, passing the endpoint by pointer.
  return this->get_service().async_accept(this->get_implementation(), peer,
      &peer_endpoint, BOOST_ASIO_MOVE_CAST(AcceptHandler)(handler));
}
peer:用于接收新连接的socket(连接建立后由它表示该连接)
peer_endpoint:用于接收对端的端点信息(地址和端口)
handler:accept操作完成后被调用的处理器,其签名为void(boost::system::error_code)
其底层是调用reactive_socket_service
的async_accept
/// Start an asynchronous accept (member of reactive_socket_service).
///
/// Wraps the handler in a reactive_socket_accept_op and hands that operation
/// to start_accept_op.
///
/// @param impl           Implementation of the listening socket.
/// @param peer           Socket that will receive the accepted connection.
/// @param peer_endpoint  Optional out-parameter for the remote endpoint.
/// @param handler        Completion handler to be wrapped by the operation.
template <typename Socket, typename Handler>
void async_accept(implementation_type& impl, Socket& peer,
    endpoint_type* peer_endpoint, Handler& handler)
{
  bool is_continuation =
    boost_asio_handler_cont_helpers::is_continuation(handler);

  // Allocate and construct an operation to wrap the handler.
  typedef reactive_socket_accept_op<Socket, Protocol, Handler> op;
  typename op::ptr p = { boost::asio::detail::addressof(handler),
    boost_asio_handler_alloc_helpers::allocate(
      sizeof(op), handler), 0 };
  // Placement-new the operation into the storage obtained through the
  // handler's allocation hook.
  p.p = new (p.v) op(impl.socket_, impl.state_, peer,
      impl.protocol_, peer_endpoint, handler);

  BOOST_ASIO_HANDLER_CREATION((p.p, "socket", &impl, "async_accept"));

  // The last argument tells start_accept_op whether the peer already owns a
  // socket.
  start_accept_op(impl, p.p, is_continuation, peer.is_open());

  // NOTE(review): presumably this clears the holder so that `p` no longer
  // refers to the operation that was just handed over; confirm against
  // op::ptr.
  p.v = p.p = 0;
}
首先创建reactive_socket_accept_op,其内存通过handler的分配钩子(asio_handler_allocate)动态分配:如果没有禁用小块内存回收(即未定义BOOST_ASIO_DISABLE_SMALL_BLOCK_RECYCLING),分配策略是使用thread_info::allocate,否则使用::operator new
void* asio_handler_allocate(std::size_t size, ...)
{
#if !defined(BOOST_ASIO_DISABLE_SMALL_BLOCK_RECYCLING)
# if defined(BOOST_ASIO_HAS_IOCP)typedef detail::win_iocp_io_service io_service_impl;typedef detail::win_iocp_thread_info thread_info;
# else // defined(BOOST_ASIO_HAS_IOCP)typedef detail::task_io_service io_service_impl;typedef detail::task_io_service_thread_info thread_info;
# endif // defined(BOOST_ASIO_HAS_IOCP)typedef detail::call_stack<io_service_impl, thread_info> call_stack;return thread_info::allocate(call_stack::top(), size);
#else // !defined(BOOST_ASIO_DISABLE_SMALL_BLOCK_RECYCLING)return ::operator new(size);
#endif // !defined(BOOST_ASIO_DISABLE_SMALL_BLOCK_RECYCLING)
}
然后调用reactive_socket_service_base的start_accept_op:如果peer尚未打开,就把reactive_socket_accept_op作为读操作(read_op)交给start_op,最终添加到per_descriptor_data的队列中;如果peer已经打开,则直接以already_open错误完成该操作
// Start an asynchronous accept operation on the listening socket `impl`.
//
// impl            Implementation of the listening socket.
// op              The accept operation to run.
// is_continuation Forwarded unchanged to start_op or
//                 post_immediate_completion.
// peer_is_open    True if the peer socket already holds an open socket.
void reactive_socket_service_base::start_accept_op(
    reactive_socket_service_base::base_implementation_type& impl,
    reactor_op* op, bool is_continuation, bool peer_is_open)
{
  // An accept is queued as a read operation (is_non_blocking = true,
  // noop = false). A peer that is already open cannot take a new connection,
  // so the operation completes immediately with already_open instead.
  if (!peer_is_open)
    start_op(impl, reactor::read_op, op, true, is_continuation, false);
  else
  {
    op->ec_ = boost::asio::error::already_open;
    reactor_.post_immediate_completion(op, is_continuation);
  }
}
其内部是调用start_op
// Hand an operation to the reactor, or complete it immediately.
//
// impl            Implementation of the socket the operation belongs to.
// op_type         Reactor queue selector (for example reactor::read_op).
// op              The operation to start.
// is_continuation Forwarded to the reactor.
// is_non_blocking Forwarded to reactor_.start_op as its last argument.
// noop            When true the reactor is bypassed and the operation is
//                 posted for immediate completion.
void reactive_socket_service_base::start_op(
    reactive_socket_service_base::base_implementation_type& impl,
    int op_type, reactor_op* op, bool is_continuation,
    bool is_non_blocking, bool noop)
{
  if (!noop)
  {
    // The reactor is used only if the socket is already non-blocking or can
    // be switched to internal non-blocking mode. A failure of that switch is
    // reported through op->ec_.
    if ((impl.state_ & socket_ops::non_blocking)
        || socket_ops::set_internal_non_blocking(
          impl.socket_, impl.state_, true, op->ec_))
    {
      reactor_.start_op(op_type, impl.socket_,
          impl.reactor_data_, op, is_continuation, is_non_blocking);
      return;
    }
  }

  // Either noop was requested or non-blocking mode could not be set:
  // complete the operation without waiting on the reactor.
  reactor_.post_immediate_completion(op, is_continuation);
}
如果socket已经是非阻塞的(或者成功将其设置为内部非阻塞模式),则调用reactor的start_op。以epoll_reactor为例:当对应队列为空且允许speculative执行时,会先尝试直接执行一次操作,成功则立即投递完成;否则把操作添加到descriptor_data的read队列中,等待IO事件的回调
// Start a reactor operation on `descriptor`.
//
// op_type           Index of the per-descriptor queue (read_op, write_op,
//                   except_op).
// descriptor        The native socket, used for epoll_ctl.
// descriptor_data   Per-descriptor state registered with this reactor.
// op                The operation to run or enqueue.
// is_continuation   Forwarded to post_immediate_completion.
// allow_speculative When true, the operation may be attempted right away
//                   before it is queued.
void epoll_reactor::start_op(int op_type, socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
    bool is_continuation, bool allow_speculative)
{
  // No per-descriptor state: fail the operation with bad_descriptor.
  if (!descriptor_data)
  {
    op->ec_ = boost::asio::error::bad_descriptor;
    post_immediate_completion(op, is_continuation);
    return;
  }

  // Everything below runs under the per-descriptor mutex.
  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (descriptor_data->shutdown_)
  {
    post_immediate_completion(op, is_continuation);
    return;
  }

  // Only the first operation in an empty queue may be tried immediately or
  // needs the epoll registration refreshed.
  if (descriptor_data->op_queue_[op_type].empty())
  {
    // A speculative read is skipped while except operations are pending.
    if (allow_speculative
        && (op_type != read_op
          || descriptor_data->op_queue_[except_op].empty()))
    {
      // If the operation finishes right now, post its completion without
      // queuing it. The lock is released first.
      if (op->perform())
      {
        descriptor_lock.unlock();
        io_service_.post_immediate_completion(op, is_continuation);
        return;
      }

      // The write did not complete: make sure EPOLLOUT is registered so the
      // reactor is told when the descriptor becomes writable.
      if (op_type == write_op)
      {
        if ((descriptor_data->registered_events_ & EPOLLOUT) == 0)
        {
          epoll_event ev = { 0, { 0 } };
          ev.events = descriptor_data->registered_events_ | EPOLLOUT;
          ev.data.ptr = descriptor_data;
          if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev) == 0)
          {
            descriptor_data->registered_events_ |= ev.events;
          }
          else
          {
            // Registration failed: complete the operation with errno.
            op->ec_ = boost::system::error_code(errno,
                boost::asio::error::get_system_category());
            io_service_.post_immediate_completion(op, is_continuation);
            return;
          }
        }
      }
    }
    else
    {
      if (op_type == write_op)
      {
        descriptor_data->registered_events_ |= EPOLLOUT;
      }

      // Re-submit the registration with EPOLL_CTL_MOD. The return value of
      // epoll_ctl is not checked here.
      epoll_event ev = { 0, { 0 } };
      ev.events = descriptor_data->registered_events_;
      ev.data.ptr = descriptor_data;
      epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
    }
  }

  // Queue the operation and count it as outstanding work on the io_service.
  descriptor_data->op_queue_[op_type].push(op);
  io_service_.work_started();
}
回调处理
当listen套接字有连接到来时,会触发reactive_socket_accept_op
的完成操作,其先调用其父类reactive_socket_accept_op_base
的do_perform
方法执行非阻塞accept操作
static bool do_perform(reactor_op* base){reactive_socket_accept_op_base* o(static_cast<reactive_socket_accept_op_base*>(base));std::size_t addrlen = o->peer_endpoint_ ? o->peer_endpoint_->capacity() : 0;socket_type new_socket = invalid_socket;bool result = socket_ops::non_blocking_accept(o->socket_,o->state_, o->peer_endpoint_ ? o->peer_endpoint_->data() : 0,o->peer_endpoint_ ? &addrlen : 0, o->ec_, new_socket);// On success, assign new connection to peer socket object.if (new_socket != invalid_socket){socket_holder new_socket_holder(new_socket);if (o->peer_endpoint_)o->peer_endpoint_->resize(addrlen);if (!o->peer_.assign(o->protocol_, new_socket, o->ec_))new_socket_holder.release();}return result;}bool non_blocking_accept(socket_type s,state_type state, socket_addr_type* addr, std::size_t* addrlen,boost::system::error_code& ec, socket_type& new_socket)
{for (;;){// Accept the waiting connection.new_socket = socket_ops::accept(s, addr, addrlen, ec);// Check if operation succeeded.if (new_socket != invalid_socket)return true;// Retry operation if interrupted by signal.if (ec == boost::asio::error::interrupted)continue;// Operation failed.if (ec == boost::asio::error::would_block|| ec == boost::asio::error::try_again){if (state & user_set_non_blocking)return true;// Fall through to retry operation.}else if (ec == boost::asio::error::connection_aborted){if (state & enable_connection_aborted)return true;// Fall through to retry operation.}
#if defined(EPROTO)else if (ec.value() == EPROTO){if (state & enable_connection_aborted)return true;// Fall through to retry operation.}
#endif // defined(EPROTO)elsereturn true;return false;}
}
当non_blocking_accept因错误码would_block或try_again而未能接受到连接时,new_socket仍为invalid_socket,peer_不会被赋值,其实现(impl)的值仍为:
// Values of a socket implementation that holds no descriptor. According to
// the surrounding text this is what peer_ still contains when the accept
// attempt produced no connection.
// NOTE(review): confirm against the service's construct().
impl.socket_ = invalid_socket;
impl.state_ = 0;
此时针对新的套接字不会作任何操作;后续在start_op中会先对状态以及socket的合法性进行判断:
// Excerpt from reactive_socket_service_base::start_op: the reactor is used
// only when the socket is already non-blocking or can be switched to internal
// non-blocking mode. A failure of that switch is stored in op->ec_.
if ((impl.state_ & socket_ops::non_blocking)
    || socket_ops::set_internal_non_blocking(
      impl.socket_, impl.state_, true, op->ec_))
{
  reactor_.start_op(op_type, impl.socket_,
      impl.reactor_data_, op, is_continuation, is_non_blocking);
  return;
}
对于接受连接成功的情况,会调用basic_socket的assign方法,最终调用reactive_socket_service的assign,将新的socket注册到reactor中
// reactive_socket_service::assign: adopt an existing native socket.
//
// impl          Implementation that will own the socket.
// protocol      Protocol of the socket; stored only when do_assign succeeds.
// native_socket The native descriptor to adopt.
// ec            Receives the result; the same value is returned.
boost::system::error_code assign(implementation_type& impl,
    const protocol_type& protocol, const native_handle_type& native_socket,
    boost::system::error_code& ec)
{
  // do_assign returns an error_code; `!` is true when it reports success.
  if (!do_assign(impl, protocol.type(), native_socket, ec))
    impl.protocol_ = protocol;
  return ec;
}

// reactive_socket_service_base::do_assign: register `native_socket` with the
// reactor and store it in `impl`.
//
// impl          Implementation that will own the socket; must not be open.
// type          Socket type (SOCK_STREAM, SOCK_DGRAM, or other).
// native_socket The native descriptor to adopt.
// ec            Receives already_open, the registration error, or success;
//               the same value is returned.
boost::system::error_code reactive_socket_service_base::do_assign(
    reactive_socket_service_base::base_implementation_type& impl, int type,
    const reactive_socket_service_base::native_handle_type& native_socket,
    boost::system::error_code& ec)
{
  // Refuse to overwrite a socket that is already open.
  if (is_open(impl))
  {
    ec = boost::asio::error::already_open;
    return ec;
  }

  // register_descriptor returns a non-zero system error number on failure.
  if (int err = reactor_.register_descriptor(
        native_socket, impl.reactor_data_))
  {
    ec = boost::system::error_code(err,
        boost::asio::error::get_system_category());
    return ec;
  }

  impl.socket_ = native_socket;
  // Derive the initial state flags from the socket type.
  switch (type)
  {
  case SOCK_STREAM: impl.state_ = socket_ops::stream_oriented; break;
  case SOCK_DGRAM: impl.state_ = socket_ops::datagram_oriented; break;
  default: impl.state_ = 0; break;
  }
  // possible_dup is always set for an assigned descriptor.
  impl.state_ |= socket_ops::possible_dup;
  ec = boost::system::error_code();
  return ec;
}