鄙文主要討論如何在asio上擴展自定義的異步io操作。包括asio中經(jīng)典的Context/Service插件機制,以及我們將經(jīng)常打交道的幾個Service,還有使用asio調(diào)度器的一些注意事項等。
在使用asio開發(fā)時,我們都會創(chuàng)建兩類對象:一類是exection context,另一類是IO object. 比如我們以asio的tcp socket為例:
asio::io_context ioc{};asio::ip::tcp::socket socket{ ioc };
asio::io_context是execution context的實現(xiàn)類,而asio::ip::tcp::socket則是IO object. 以上短短的兩行代碼,用戶就已經(jīng)通過IO object對象,將tcp/ip的socket功能裝載入io_context中。也就是將socket service,注冊到了io_context中。注冊的扳機是asio::detail::io_object_impl模板類。
asio::ip::tcp::socket類型,拋開中間的各種實例化和繼承的關(guān)系,它本質(zhì)上是asio::detail::io_object_impl實例化類型的外觀。如下面代碼所示(代碼經(jīng)過簡化處理,方便展示概念,與asio有出入)
// in namemspace asioclass socket{private:detail::io_object_impl<detail::reactive_socket_service<ip::tcp>,io_context::executor> impl_;public:socket(const io_context::executor& ex): impl_(0, ex){}};
可以看到socket外觀類,使用了reactive_socket_service(這里以linux平臺為例),和io_context導(dǎo)出的executor,對io_object_impl進行實例化;而socket構(gòu)造函數(shù),將io_context關(guān)聯(lián)的executor傳遞給impl_成員。我們再來看看io_object_impl對應(yīng)的構(gòu)造函數(shù):
// in namespace asio::detailtemplate <typename IoObjectService,typename Executor = io_context::executor_type>class io_object_impl{public:// The type of the service that will be used to provide I/O operations.typedef IoObjectService service_type;// The underlying implementation type of I/O object.typedef typename service_type::implementation_type implementation_type;// The type of the executor associated with the object.typedef Executor executor_type;// Construct an I/O object using an executor.explicit io_object_impl(int, const executor_type& ex): service_(&asio::use_service<IoObjectService>(io_object_impl::get_context(ex))),executor_(ex){service_->construct(implementation_);}private:// The service associated with the I/O object.service_type* service_;// The underlying implementation of the I/O object.implementation_type implementation_;// The associated executor.executor_type executor_;};
構(gòu)造函數(shù)非常明確的展示了io_object_impl通過Executor獲取Execution Context對象,并通過use_service函數(shù)向Execution Context中注冊Service. 對implementation_type相關(guān)的內(nèi)容,我們暫時跳過。這里著重關(guān)注use_service函數(shù),它是asio向其異步IO事件框架中,裝載具體IO功能(這里是reactor_socket_service)的關(guān)鍵入口。
Context/Service大家也許很陌生,但換一個說法也許大家就很熟悉了。例如COM中使用的Component/Interface組件對象模型,還有游戲引擎中常用的Object/Component模型。如下圖所示:

use_service函數(shù)實際上就是向Execution Context中管理Service集合的類Service Registry中申請一個指定的Service對象。而service_registry則會保證每一種Service只有一個,沒有的時候創(chuàng)建一個新的為原則,最終將有效的Service引用返回。當(dāng)然,以上操作都是線程安全的。值得一提的是,Service Registry使用了一個id來區(qū)分不同的Service,而id的生成則是通過CRTP配合std::type_info來生成的,如下代碼所示:
// Special derived service id type to keep classes header-file only.template <typename Type>class service_id: public execution_context::id{};// Special service base class to keep classes header-file only.template <typename Type>class execution_context_service_base: public execution_context::service{public:static service_id<Type> id;// Constructor.execution_context_service_base(execution_context& e): execution_context::service(e){}};// implement your private servicetemplate <typename PortSpace>class rdma_core_service :public execution_context_service_base<rdma_core_service<PortSpace>>{// ....};
需要注意的是CRTP中的傳遞的實現(xiàn)類千萬不要寫錯了,這里是rdma_core_service<PortSpace>. 否則底層計算的ID會出錯。如果ID是某一個已經(jīng)存在的Service的ID,會直接將那個Service返回給用戶,這將會是非常危險的事情。雖然CRTP可以在正確的情況下用戶保證在運行時保證類型安全,但是錯誤的使用并不會有任何編譯期報錯,并將災(zāi)難拋給運行時。
到現(xiàn)在為止,我們應(yīng)該知道擴展asio的關(guān)鍵是實現(xiàn)自己的Service,并實現(xiàn)自己的io_object_impl外觀類。Service負責(zé)實現(xiàn)具體的同步或者異步IO的操作,而外觀類則提供封裝好的接口,讓用戶方便地,正確地使用Service提供的能力。下面,我么展開討論一下:
回顧一下第一節(jié)貼出的io_object_impl代碼段,其中有一個implementation_的成員。它的類型是io_object_impl通過Service中導(dǎo)出的implementation_type而來。它是IO Object真正的數(shù)據(jù)實現(xiàn),并由各個Service來定義。這樣做的好處是,可以讓每個平臺來處理自己的實現(xiàn)細節(jié)。比如Linux平臺的reactor_socket_service使用的socket類型是fd,而windows平臺的iocp_socket_service則使用的是SOCKET,implementation_type的使用,可以讓io_object_impl不用關(guān)心平臺相關(guān)的數(shù)據(jù)類型差異和實現(xiàn)細節(jié)。
例如本系列文檔討論的在asio上封裝rdma-core(https://github.com/linux-rdma/rdma-core)功能的Service
template <typename PortSpace>class rdma_core_service :public execution_context_service_base<rdma_core_service<PortSpace>>{public:// The implementation type of the cm id.struct implementation_type{// The native cm representation.rdma_cm_id_data* cm_id_data_;// state for memory regionrdma_mr_state mr_state_;// if there is a queue pair attached to this io objectbool has_qp_;};};
io_object_impl會實現(xiàn)自己的The Rule of Five(https://en.cppreference.com/w/cpp/language/rule_of_three). 例如第一節(jié)io_object_impl中的構(gòu)造函數(shù),轉(zhuǎn)發(fā)給了Service的construct函數(shù)。這樣的函數(shù)照例,我們需要實現(xiàn)如下:
template <typename PortSpace>class rdma_core_service :public execution_context_service_base<rdma_core_service<PortSpace>>{public:// Construct a new implementation.inline void construct(implementation_type& impl);// Destroy a implementationinline void destroy(implementation_type& impl);// move constructorinline void move_construct(implementation_type& impl, implementation_type& other_impl);// move assigninline void move_assign(implementation_type& impl,rdma_core_service& other_service, implementation_type& other_impl);};
io_object_impl是Noncopyable,所以只用實現(xiàn)Move Constructor和Move Assignment Operator. 至于Converter Constructor,則根據(jù)用戶自己的需求來決定。
Service中還有幾個需要Overwrite的函數(shù),如下列代碼所示:
template <typename PortSpace>class rdma_core_service :public execution_context_service_base<rdma_core_service<PortSpace>>{public:/// Destructor.ASIO_DECL virtual ~rdma_core_service()/// Destroy all user-defined handler objects owned by the service.ASIO_DECL virtual void shutdown();/// Handle notification of a fork-related event to perform any necessary/// housekeeping./*** This function is not a pure virtual so that services only have to* implement it if necessary. The default implementation does nothing.*/ASIO_DECL virtual void notify_fork(execution_context::fork_event event);};
Virtual Destructor就不多說了, 因為Service Registry持有的是基類指針,清理Service的時候析構(gòu)函數(shù)得是Virtual. Shutdown則是給有狀態(tài)的Service在整個asio發(fā)起Shutdown的時候清理和校驗自己擴展點。notify_fork只適用于Linux平臺,用來響應(yīng)Fork系統(tǒng)調(diào)用的事件。
如第一節(jié)代碼所示,用戶并不直接調(diào)用Service的接口來適用asio的功能,而是通過IO Object. 因此,我們還要實現(xiàn)對應(yīng)的IO Object外觀類。以普遍理性而論,Service與IO Object外觀是一對多的關(guān)系,例如在linux平臺上Socket和Socket Acceptor都是對應(yīng)于reactive_socket_service. 反過來,因為IO Object會使用一個確定的Service導(dǎo)出的implementation_type,IO object只能對應(yīng)一個確定的Service. 例如一個rdma_connetion的實現(xiàn):
// IO object for rdma, provides rdma functionalitytemplate <typename PortSpace, typename Executor = any_io_executor>class rdma_connection{public:// ....#if ASIO_HAS_IOCPusing service_type = detail::rdma_iocp_connector_service<PortSpace>;#elseusing service_type = detail::rdma_core_service<PortSpace>;#endifprivate:detail::io_object_impl<service_type> impl_;public: // implement interfacevoid open(port_space_type const& port_space);bool is_open() const{return impl_.get_service().is_open(impl_.get_implementation());}// .....};
通過外觀類調(diào)用Service的接口,可以參考is_open成員方法的實現(xiàn)。這里為了方便說明,簡單起見只用了一個同步的,實現(xiàn)簡單的接口作為展示。
至此,我們就可以在asio上擴展我們的“插件”了??梢园l(fā)現(xiàn)我們并沒有侵入式的修改asio的任何代碼,這都得益于Context/Service對象模型強大的可擴展性。
asio是異步IO框架,如果我們需要支持自定義的異步IO操作,還需要做一些額外的事工作。在工作開始之前,我們需要了解一些asio底層調(diào)度器的細節(jié)。
asio的實現(xiàn)中沒有一個具體的統(tǒng)一的Scheduler實現(xiàn),原因是Linux平臺上使用了Reactor模式,而Windows平臺則是基于IOCP的Proactor模式。兩個模式有差異,但asio的選擇是在Linux平臺上實現(xiàn)一個scheduler類,并與Reactor一同工作,提供與Proactor模式相同的接口。最后,asio對Service層提供了一套統(tǒng)一的Scheduler機制,如下圖:

由上圖我們可以看到,Reactor中額外實現(xiàn)了一個scheduler類,它內(nèi)部管理了一個Completion Queue,來模擬與IOCP中等同的功能,并允許用戶直接向該隊列Post一個回調(diào)。
統(tǒng)一的Scheduler機制,也就是各個平臺實現(xiàn)的context_impl類,并不能抹平Reactor與Proactor所有的差異。對于Proactor而言,IO函數(shù)的調(diào)用就直接發(fā)起了異步IO操作(Windows平臺中WSA系列的函數(shù))。而Reactor依舊是同步IO,因此Reactor額外提供了start_op接口。該接口將根據(jù)IO的類型,將IO事件注冊給Reactor,并同步地在Scheduler收到IO事件后,選擇適當(dāng)?shù)臅r機執(zhí)行IO操作。
調(diào)度的過程和細節(jié),asio封裝的Reactor與Proactor也有所不同?;贗OCP的Completion Queue中,只有IO完成事件回調(diào)和用戶主動Post入隊的回調(diào);而基于Reactor的Completion Queue中,除了這兩個回調(diào)以外,還有從Reactor中通知的同步IO事件。
對asio中Reactor與Proactor的機制有所了解之后,我們便可以繼續(xù)在不同平臺上適配異步操作的工作了。
首先是Reactor模式下的異步操作封裝。我們知道,在Reactor模式下,asio有同步調(diào)用IO函數(shù)的機制。對asio源碼稍作閱讀后,不難發(fā)現(xiàn)這些秘密都藏在asio::detail::reactor_op類中。我么只要繼承該類,并覆蓋兩個關(guān)鍵的函數(shù),一個是同步執(zhí)行IO操作的函數(shù),另一個是IO完成的函數(shù),就能夠?qū)崿F(xiàn)自定義的異步操作。當(dāng)然,在Reactor模式下的異步操作,是模擬的異步操作。
// reactor_op implemetionclass reactor_op : public operation{ /******/ };class your_reactor_async_op : public reactor_op{public:your_reactor_async_op(asio::error_code const& success_ec, ...): reactor_op(success_ec, do_perform, do_complete)//, other initialization{}static status do_perform(reactor_op* base){auto* this_op = static_cast<your_reactor_async_op*>(base);// TODO ... call your sync io}static void do_complete(void* owner, operation* base,const asio::error_code& /*ec*/,std::size_t /*bytes_transferred*/){auto* this_op = static_cast<your_reactor_async_op*>(base);// TODO ... do your io completion logic}};
reactor_op類是繼承自operation類,該類也是Scheduler中Completion Queue鏈表的節(jié)點類,因此我們可以把要模擬的某個異步IO操作的perform和complete業(yè)務(wù)都實現(xiàn)在一起。
擴展IOCP Proactor的操作就比Reactor模式下簡單許多。operation類繼承自O(shè)VERLAPPED結(jié)構(gòu)體,可以直接把op操作傳遞給異步IO函數(shù)的LPOVERLAPPED指針參數(shù)。因此我們只需要考慮IO complete回調(diào)即可。
class your_iocp_async_op : public operation{public:your_iocp_async_op(...): operation(do_complete)//, other initialization{}static void do_complete(void* owner, operation* base,const asio::error_code& /*ec*/,std::size_t /*bytes_transferred*/){auto* this_op = static_cast<your_iocp_async_op*>(base);// TODO ... do your io completion logic}};
有些IO庫或者標準中的IO完成事件,并不能統(tǒng)一到asio的標準統(tǒng)一Scheduler中。例如rdma中的Completion Queue(CQ). 這種設(shè)計也是合理的,畢竟使用統(tǒng)一的調(diào)度,并不能滿足對低延遲極度敏感的場景。
適配類似rdma中的CQ,一個簡單可行的做法,可以參考在asio上嘗試RDMA中第二節(jié)的內(nèi)容。除此之外,既然rdma提供了CQ的概念和Poll CQ的接口,說明用戶一定存在自己主動使用它們的場景。于是筆者嘗試提供了CQ對象,讓用戶可以在任意線程主動Poll CQ.
asio::io_context ioc{};asio::rdma::completion_queue cq{ ioc };std::thread thread {[&] { cq.run(); }}
該場景下,io_context中的統(tǒng)一調(diào)度CQ的Service將會停止工作,并把Poll CQ的權(quán)限交給用戶。
有了適配各個平臺的異步操作定義之后,剩下的工作就是為io_object_impl與Service實現(xiàn)異步接口了。io_object_impl外觀類的實現(xiàn)盡量統(tǒng)一,但是每個平臺的Operation類型實現(xiàn)大概率不一樣,asio在外觀類層通過Initializer的機制,把實現(xiàn)上的差異轉(zhuǎn)發(fā)給了各個平臺的Service,我們以rdma_connect的一個實現(xiàn)為例:
// IO object for rdma, provides rdma functionalitytemplate <typename PortSpace, typename Executor = any_io_executor>class rdma_connection{public:// .......class initiate_async_connect;// async connecttemplate <ASIO_COMPLETION_TOKEN_FOR(void (asio::error_code))ConnectToken ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)>ASIO_INITFN_AUTO_RESULT_TYPE_PREFIX(ConnectToken,void (asio::error_code))async_connect(endpoint_type const& endpoint,ASIO_MOVE_ARG(ConnectToken) tokenASIO_DEFAULT_COMPLETION_TOKEN(executor_type))ASIO_INITFN_AUTO_RESULT_TYPE_SUFFIX((async_initiate<ConnectToken, void (asio::error_code)>(declval<initiate_async_resolve_route>(), token,declval<asio::error_code&>()))){asio::error_code open_ec{};if(!is_open()){open_ec = asio::error::try_again;}#if ASIO_HAS_IOCPif(!open_ec){impl_.get_service().allocate_qp(impl_.get_implementation(), open_ec);}#endifreturn async_initiate<ConnectToken, void(asio::error_code)>(initiate_async_connect(this), token, endpoint, open_ec);}// initializer for async_connect_opclass initiate_async_connect{public:using executor_type = Executor;explicit initiate_async_connect(rdma_connection* self) ASIO_NOEXCEPT: self_(self) {}template <typename Handler>void operator() (ASIO_MOVE_ARG(Handler) handler,endpoint_type const& endpoint,asio::error_code const& open_ec) const{ASIO_CONNECT_HANDLER_CHECK(Handler, handler) type_check;if(open_ec){asio::post(self_->impl_.get_executor(),asio::detail::bind_handler(ASIO_MOVE_CAST(Handler)(handler), open_ec));}else{detail::non_const_lvalue<Handler> handler2(handler);self_->impl_.get_service().async_connect(self_->impl_.get_implementation(), endpoint,handler2.value, self_->impl_.get_executor());}}private:rdma_connection* self_;};};
Initializer還擔(dān)任了很多編譯期檢查的工作,比如Handler的類型是否滿足,Buffer是否滿足ConstBufferSequence的約束等。在跟祁宇的一次討論中,他還提到了Initializer的設(shè)計還可以讓用戶去特化async_result來實現(xiàn)自定義異步操作,是一個很強大的用戶擴展點。
io_object_impl通過Initializer轉(zhuǎn)發(fā)給Service之后,剩下的事情就很簡單了。Service的異步操作接口主要負責(zé)創(chuàng)建對應(yīng)的異步操作對象,初始化操作對象,并通知Scheduler發(fā)起IO操作。
以rdma在Linux和Windows平臺上的實現(xiàn)為例:
// rdma-core for linuxtemplate <typename PortSpace>template <typename Handler, typename IoExecutor>void rdma_core_service<PortSpace>::async_connect(implementation_type& impl,endpoint_type const& endpoint,Handler& handler, const IoExecutor& io_ex){bool const is_continuation =asio_handler_cont_helpers::is_continuation(handler);// TODO ... cancellationusing op = rdma_cm_connect_op<PortSpace, Handler, IoExecutor>;typename op::ptr p = { asio::detail::addressof(handler),op::ptr::allocate(handler), 0 };p.p = new (p.v) op(success_ec_, implNaN_id_data_->cm_id_, get_context(), handler, io_ex);cm_event_service_.start_connect_op(implNaN_id_data_, p.p, endpoint.data(), is_continuation);p.v = p.p = 0;}// iocp for windowstemplate <typename PortSpace>template <typename Handler, typename IoExecutor>void rdma_iocp_connector_service<PortSpace>::async_connect(implementation_type& impl, endpoint_type const& endpoint,Handler& handler, const IoExecutor& io_ex){// TODO ... cancellationusing op = rdma_nd_connect_op<Handler, IoExecutor>;typename op::ptr p = { asio::detail::addressof(handler),op::ptr::allocate(handler), 0 };p.p = new (p.v) op(impl.connector_.Get(), handler, io_ex);start_connect_op(impl, endpoint, p.p);p.v = p.p = 0;}
需要注意的是,Reactor與Proactor模式的差異在這里就顯現(xiàn)出來了。Reactor在這里發(fā)起的是一個reactor_op操作, 注冊給Connection Mananger也就是這里的cm_event_service_對象,真正的connect函數(shù)調(diào)用是在Connection Mananger的事件Channel對應(yīng)的fd有POLLIN消息的時候,才會調(diào)用。而對于IOCP Proactor而言,start_connect_op會直接非阻塞地調(diào)用Connector的Connect函數(shù),并在Connection創(chuàng)建成功后,通過IOCP的Completion Queue通知用戶。
本文簡要地討論了如何在asio中擴展自己的異步IO操作,簡單分析了Context/Service對象模型,簡述如何實現(xiàn)自己的IO Object外觀類以及對應(yīng)的Service類。最后討論了如何在Reactor和Proactor上擴展自定義的異步操作。