鄙文主要討論如何在asio上擴(kuò)展自定義的異步io操作。包括asio中經(jīng)典的Context/Service插件機(jī)制,以及我們將經(jīng)常打交道的幾個(gè)Service,還有使用asio調(diào)度器的一些注意事項(xiàng)等。

1. Context/Service機(jī)制

在使用asio開發(fā)時(shí),我們都會(huì)創(chuàng)建兩類對(duì)象:一類是exection context,另一類是IO object. 比如我們以asio的tcp socket為例:

asio::io_context ioc{};asio::ip::tcp::socket socket{ ioc };

asio::io_context是execution context的實(shí)現(xiàn)類,而asio::ip::tcp::socket則是IO object. 以上短短的兩行代碼,用戶就已經(jīng)通過IO object對(duì)象,將tcp/ip的socket功能裝載入io_context中。也就是將socket service,注冊(cè)到了io_context中。注冊(cè)的扳機(jī)是asio::detail::io_object_impl模板類。

1.1 io_object_impl

asio::ip::tcp::socket類型,拋開中間的各種實(shí)例化和繼承的關(guān)系,它本質(zhì)上是asio::detail::io_object_impl實(shí)例化類型的外觀。如下面代碼所示(代碼經(jīng)過簡(jiǎn)化處理,方便展示概念,與asio有出入)

// in namemspace asioclass socket{private:  detail::io_object_impl<    detail::reactive_socket_service<ip::tcp>,     io_context::executor  > impl_;
public:  socket(const io_context::executor& ex)    : impl_(0, ex)  {}};

可以看到socket外觀類,使用了reactive_socket_service(這里以linux平臺(tái)為例),和io_context導(dǎo)出的executor,對(duì)io_object_impl進(jìn)行實(shí)例化;而socket構(gòu)造函數(shù),將io_context關(guān)聯(lián)的executor傳遞給impl_成員。我們?cè)賮砜纯磇o_object_impl對(duì)應(yīng)的構(gòu)造函數(shù):

// in namespace asio::detailtemplate <typename IoObjectService,    typename Executor = io_context::executor_type>class io_object_impl{public:  // The type of the service that will be used to provide I/O operations.  typedef IoObjectService service_type;
 // The underlying implementation type of I/O object.  typedef typename service_type::implementation_type implementation_type;
 // The type of the executor associated with the object.  typedef Executor executor_type;
 // Construct an I/O object using an executor.  explicit io_object_impl(int, const executor_type& ex)    : service_(&asio::use_service<IoObjectService>(          io_object_impl::get_context(ex))),      executor_(ex)  {    service_->construct(implementation_);  }
private:  // The service associated with the I/O object.  service_type* service_;
 // The underlying implementation of the I/O object.  implementation_type implementation_;
 // The associated executor.  executor_type executor_;};

構(gòu)造函數(shù)非常明確的展示了io_object_impl通過Executor獲取Execution Context對(duì)象,并通過use_service函數(shù)向Execution Context中注冊(cè)Service. 對(duì)implementation_type相關(guān)的內(nèi)容,我們暫時(shí)跳過。這里著重關(guān)注use_service函數(shù),它是asio向其異步IO事件框架中,裝載具體IO功能(這里是reactor_socket_service)的關(guān)鍵入口。

1.2 Context/Service的對(duì)象模型

Context/Service大家也許很陌生,但換一個(gè)說法也許大家就很熟悉了。例如COM中使用的Component/Interface組件對(duì)象模型,還有游戲引擎中常用的Object/Component模型。如下圖所示:

use_service函數(shù)實(shí)際上就是向Execution Context中管理Service集合的類Service Registry中申請(qǐng)一個(gè)指定的Service對(duì)象。而service_registry則會(huì)保證每一種Service只有一個(gè),沒有的時(shí)候創(chuàng)建一個(gè)新的為原則,最終將有效的Service引用返回。當(dāng)然,以上操作都是線程安全的。值得一提的是,Service Registry使用了一個(gè)id來區(qū)分不同的Service,而id的生成則是通過CRTP配合std::type_info來生成的,如下代碼所示:

// Special derived service id type to keep classes header-file only.template <typename Type>class service_id  : public execution_context::id{};
// Special service base class to keep classes header-file only.template <typename Type>class execution_context_service_base  : public execution_context::service{public:  static service_id<Type> id;
 // Constructor.  execution_context_service_base(execution_context& e)    : execution_context::service(e)  {  }};
// implement your private servicetemplate <typename PortSpace>class rdma_core_service :  public execution_context_service_base<rdma_core_service<PortSpace>>{  // ....};

需要注意的是CRTP中的傳遞的實(shí)現(xiàn)類千萬(wàn)不要寫錯(cuò)了,這里是rdma_core_service<PortSpace>. 否則底層計(jì)算的ID會(huì)出錯(cuò)。如果ID是某一個(gè)已經(jīng)存在的Service的ID,會(huì)直接將那個(gè)Service返回給用戶,這將會(huì)是非常危險(xiǎn)的事情。雖然CRTP可以在正確的情況下用戶保證在運(yùn)行時(shí)保證類型安全,但是錯(cuò)誤的使用并不會(huì)有任何編譯期報(bào)錯(cuò),并將災(zāi)難拋給運(yùn)行時(shí)。

2. 如何擴(kuò)展一個(gè)有效的Service

到現(xiàn)在為止,我們應(yīng)該知道擴(kuò)展asio的關(guān)鍵是實(shí)現(xiàn)自己的Service,并實(shí)現(xiàn)自己的io_object_impl外觀類。Service負(fù)責(zé)實(shí)現(xiàn)具體的同步或者異步IO的操作,而外觀類則提供封裝好的接口,讓用戶方便地,正確地使用Service提供的能力。下面,我么展開討論一下:

2.1 導(dǎo)出implement_type

回顧一下第一節(jié)貼出的io_object_impl代碼段,其中有一個(gè)implementation_的成員。它的類型是io_object_impl通過Service中導(dǎo)出的implementation_type而來。它是IO Object真正的數(shù)據(jù)實(shí)現(xiàn),并由各個(gè)Service來定義。這樣做的好處是,可以讓每個(gè)平臺(tái)來處理自己的實(shí)現(xiàn)細(xì)節(jié)。比如Linux平臺(tái)的reactor_socket_service使用的socket類型是fd,而windows平臺(tái)的iocp_socket_service則使用的是SOCKET,implementation_type的使用,可以讓io_object_impl不用關(guān)心平臺(tái)相關(guān)的數(shù)據(jù)類型差異和實(shí)現(xiàn)細(xì)節(jié)。

例如本系列文檔討論的在asio上封裝rdma-core(https://github.com/linux-rdma/rdma-core)功能的Service

template <typename PortSpace>class rdma_core_service :   public execution_context_service_base<rdma_core_service<PortSpace>>{public:  // The implementation type of the cm id.  struct implementation_type  {    // The native cm representation.    rdma_cm_id_data*      cm_id_data_;
   // state for memory region    rdma_mr_state         mr_state_;
   // if there is a queue pair attached to this io object    bool                  has_qp_;  };};

2.2 實(shí)現(xiàn)The Rule of Five

io_object_impl會(huì)實(shí)現(xiàn)自己的The Rule of Five(https://en.cppreference.com/w/cpp/language/rule_of_three). 例如第一節(jié)io_object_impl中的構(gòu)造函數(shù),轉(zhuǎn)發(fā)給了Service的construct函數(shù)。這樣的函數(shù)照例,我們需要實(shí)現(xiàn)如下:

template <typename PortSpace>class rdma_core_service :   public execution_context_service_base<rdma_core_service<PortSpace>>{public:  // Construct a new implementation.  inline void construct(implementation_type& impl);
 // Destroy a implementation  inline void destroy(implementation_type& impl);
 // move constructor  inline void move_construct(implementation_type& impl, implementation_type& other_impl);
 // move assign  inline void move_assign(implementation_type& impl,    rdma_core_service& other_service, implementation_type& other_impl);};

io_object_impl是Noncopyable,所以只用實(shí)現(xiàn)Move Constructor和Move Assignment Operator. 至于Converter Constructor,則根據(jù)用戶自己的需求來決定。

2.3 Overwrite一些必要的函數(shù)

Service中還有幾個(gè)需要Overwrite的函數(shù),如下列代碼所示:

template <typename PortSpace>class rdma_core_service :   public execution_context_service_base<rdma_core_service<PortSpace>>{public:  /// Destructor.  ASIO_DECL virtual ~rdma_core_service()
 /// Destroy all user-defined handler objects owned by the service.  ASIO_DECL virtual void shutdown();
 /// Handle notification of a fork-related event to perform any necessary  /// housekeeping.  /**   * This function is not a pure virtual so that services only have to   * implement it if necessary. The default implementation does nothing.   */  ASIO_DECL virtual void notify_fork(execution_context::fork_event event);};

Virtual Destructor就不多說了, 因?yàn)镾ervice Registry持有的是基類指針,清理Service的時(shí)候析構(gòu)函數(shù)得是Virtual. Shutdown則是給有狀態(tài)的Service在整個(gè)asio發(fā)起Shutdown的時(shí)候清理和校驗(yàn)自己擴(kuò)展點(diǎn)。notify_fork只適用于Linux平臺(tái),用來響應(yīng)Fork系統(tǒng)調(diào)用的事件。

2.4 實(shí)現(xiàn)IO Object外觀類

如第一節(jié)代碼所示,用戶并不直接調(diào)用Service的接口來適用asio的功能,而是通過IO Object. 因此,我們還要實(shí)現(xiàn)對(duì)應(yīng)的IO Object外觀類。以普遍理性而論,Service與IO Object外觀是一對(duì)多的關(guān)系,例如在linux平臺(tái)上Socket和Socket Acceptor都是對(duì)應(yīng)于reactive_socket_service. 反過來,因?yàn)镮O Object會(huì)使用一個(gè)確定的Service導(dǎo)出的implementation_type,IO object只能對(duì)應(yīng)一個(gè)確定的Service. 例如一個(gè)rdma_connetion的實(shí)現(xiàn):

// IO object for rdma, provides rdma functionalitytemplate <typename PortSpace, typename Executor = any_io_executor>class rdma_connection{public:  // ....#if ASIO_HAS_IOCP  using service_type = detail::rdma_iocp_connector_service<PortSpace>;#else  using service_type = detail::rdma_core_service<PortSpace>;#endif
private:  detail::io_object_impl<service_type> impl_;
public: // implement interface  void open(port_space_type const& port_space);
 bool is_open() const{    return impl_.get_service().is_open(impl_.get_implementation());  }  // .....};

通過外觀類調(diào)用Service的接口,可以參考is_open成員方法的實(shí)現(xiàn)。這里為了方便說明,簡(jiǎn)單起見只用了一個(gè)同步的,實(shí)現(xiàn)簡(jiǎn)單的接口作為展示。

至此,我們就可以在asio上擴(kuò)展我們的“插件”了。可以發(fā)現(xiàn)我們并沒有侵入式的修改asio的任何代碼,這都得益于Context/Service對(duì)象模型強(qiáng)大的可擴(kuò)展性。

3. 如何擴(kuò)展異步操作

asio是異步IO框架,如果我們需要支持自定義的異步IO操作,還需要做一些額外的事工作。在工作開始之前,我們需要了解一些asio底層調(diào)度器的細(xì)節(jié)。

3.1 asio中的Scheduler

asio的實(shí)現(xiàn)中沒有一個(gè)具體的統(tǒng)一的Scheduler實(shí)現(xiàn),原因是Linux平臺(tái)上使用了Reactor模式,而Windows平臺(tái)則是基于IOCP的Proactor模式。兩個(gè)模式有差異,但asio的選擇是在Linux平臺(tái)上實(shí)現(xiàn)一個(gè)scheduler類,并與Reactor一同工作,提供與Proactor模式相同的接口。最后,asio對(duì)Service層提供了一套統(tǒng)一的Scheduler機(jī)制,如下圖:

由上圖我們可以看到,Reactor中額外實(shí)現(xiàn)了一個(gè)scheduler類,它內(nèi)部管理了一個(gè)Completion Queue,來模擬與IOCP中等同的功能,并允許用戶直接向該隊(duì)列Post一個(gè)回調(diào)。

統(tǒng)一的Scheduler機(jī)制,也就是各個(gè)平臺(tái)實(shí)現(xiàn)的context_impl類,并不能抹平Reactor與Proactor所有的差異。對(duì)于Proactor而言,IO函數(shù)的調(diào)用就直接發(fā)起了異步IO操作(Windows平臺(tái)中WSA系列的函數(shù))。而Reactor依舊是同步IO,因此Reactor額外提供了start_op接口。該接口將根據(jù)IO的類型,將IO事件注冊(cè)給Reactor,并同步地在Scheduler收到IO事件后,選擇適當(dāng)?shù)臅r(shí)機(jī)執(zhí)行IO操作。

調(diào)度的過程和細(xì)節(jié),asio封裝的Reactor與Proactor也有所不同?;贗OCP的Completion Queue中,只有IO完成事件回調(diào)和用戶主動(dòng)Post入隊(duì)的回調(diào);而基于Reactor的Completion Queue中,除了這兩個(gè)回調(diào)以外,還有從Reactor中通知的同步IO事件。

對(duì)asio中Reactor與Proactor的機(jī)制有所了解之后,我們便可以繼續(xù)在不同平臺(tái)上適配異步操作的工作了。

3.2 擴(kuò)展Reactor模式的異步操作

首先是Reactor模式下的異步操作封裝。我們知道,在Reactor模式下,asio有同步調(diào)用IO函數(shù)的機(jī)制。對(duì)asio源碼稍作閱讀后,不難發(fā)現(xiàn)這些秘密都藏在asio::detail::reactor_op類中。我么只要繼承該類,并覆蓋兩個(gè)關(guān)鍵的函數(shù),一個(gè)是同步執(zhí)行IO操作的函數(shù),另一個(gè)是IO完成的函數(shù),就能夠?qū)崿F(xiàn)自定義的異步操作。當(dāng)然,在Reactor模式下的異步操作,是模擬的異步操作。

// reactor_op implemetionclass reactor_op : public operation{ /******/ };
class your_reactor_async_op : public reactor_op{public:  your_reactor_async_op(asio::error_code const& success_ec, ...)    : reactor_op(success_ec, do_perform, do_complete)  //, other initialization  {}
 static status do_perform(reactor_op* base){    auto* this_op = static_cast<your_reactor_async_op*>(base);    // TODO ... call your sync io  }
 static void do_complete(void* owner, operation* base,      const asio::error_code& /*ec*/,      std::size_t /*bytes_transferred*/){    auto* this_op = static_cast<your_reactor_async_op*>(base);    // TODO ... do your io completion logic  }};

reactor_op類是繼承自operation類,該類也是Scheduler中Completion Queue鏈表的節(jié)點(diǎn)類,因此我們可以把要模擬的某個(gè)異步IO操作的perform和complete業(yè)務(wù)都實(shí)現(xiàn)在一起。

3.3 擴(kuò)展IOCP Proactor模式的異步操作

擴(kuò)展IOCP Proactor的操作就比Reactor模式下簡(jiǎn)單許多。operation類繼承自O(shè)VERLAPPED結(jié)構(gòu)體,可以直接把op操作傳遞給異步IO函數(shù)的LPOVERLAPPED指針參數(shù)。因此我們只需要考慮IO complete回調(diào)即可。

class your_iocp_async_op : public operation{public:  your_iocp_async_op(...)    : operation(do_complete)  //, other initialization  {}
 static void do_complete(void* owner, operation* base,      const asio::error_code& /*ec*/,      std::size_t /*bytes_transferred*/){    auto* this_op = static_cast<your_iocp_async_op*>(base);    // TODO ... do your io completion logic  }};

3.4 獨(dú)立于Scheduler之外的Completion Queue

有些IO庫(kù)或者標(biāo)準(zhǔn)中的IO完成事件,并不能統(tǒng)一到asio的標(biāo)準(zhǔn)統(tǒng)一Scheduler中。例如rdma中的Completion Queue(CQ). 這種設(shè)計(jì)也是合理的,畢竟使用統(tǒng)一的調(diào)度,并不能滿足對(duì)低延遲極度敏感的場(chǎng)景。

適配類似rdma中的CQ,一個(gè)簡(jiǎn)單可行的做法,可以參考在asio上嘗試RDMA中第二節(jié)的內(nèi)容。除此之外,既然rdma提供了CQ的概念和Poll CQ的接口,說明用戶一定存在自己主動(dòng)使用它們的場(chǎng)景。于是筆者嘗試提供了CQ對(duì)象,讓用戶可以在任意線程主動(dòng)Poll CQ.

asio::io_context ioc{};asio::rdma::completion_queue cq{ ioc };
std::thread thread {  [&] { cq.run(); }}

該場(chǎng)景下,io_context中的統(tǒng)一調(diào)度CQ的Service將會(huì)停止工作,并把Poll CQ的權(quán)限交給用戶。

3.5 io_object_impl通過initializer轉(zhuǎn)發(fā)給Service

有了適配各個(gè)平臺(tái)的異步操作定義之后,剩下的工作就是為io_object_impl與Service實(shí)現(xiàn)異步接口了。io_object_impl外觀類的實(shí)現(xiàn)盡量統(tǒng)一,但是每個(gè)平臺(tái)的Operation類型實(shí)現(xiàn)大概率不一樣,asio在外觀類層通過Initializer的機(jī)制,把實(shí)現(xiàn)上的差異轉(zhuǎn)發(fā)給了各個(gè)平臺(tái)的Service,我們以rdma_connect的一個(gè)實(shí)現(xiàn)為例:

// IO object for rdma, provides rdma functionalitytemplate <typename PortSpace, typename Executor = any_io_executor>class rdma_connection{public:  // .......  class initiate_async_connect;
 // async connect  template <    ASIO_COMPLETION_TOKEN_FOR(void (asio::error_code))      ConnectToken ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)>  ASIO_INITFN_AUTO_RESULT_TYPE_PREFIX(ConnectToken,      void (asio::error_code))  async_connect(endpoint_type const& endpoint,        ASIO_MOVE_ARG(ConnectToken) token        ASIO_DEFAULT_COMPLETION_TOKEN(executor_type))    ASIO_INITFN_AUTO_RESULT_TYPE_SUFFIX((      async_initiate<ConnectToken, void (asio::error_code)>(          declval<initiate_async_resolve_route>(), token,          declval<asio::error_code&>())))  {    asio::error_code open_ec{};    if(!is_open())    {      open_ec = asio::error::try_again;    }
#if ASIO_HAS_IOCP    if(!open_ec)    {      impl_.get_service().allocate_qp(        impl_.get_implementation(), open_ec);    }#endif
   return async_initiate<ConnectToken, void(asio::error_code)>(      initiate_async_connect(this), token, endpoint, open_ec);  }
 // initializer for async_connect_op  class initiate_async_connect  {  public:    using executor_type = Executor;
   explicit initiate_async_connect(rdma_connection* self) ASIO_NOEXCEPT      : self_(self) {}
   template <typename Handler>    void operator() (ASIO_MOVE_ARG(Handler) handler,      endpoint_type const& endpoint,      asio::error_code const& open_ec) const    {      ASIO_CONNECT_HANDLER_CHECK(Handler, handler) type_check;
     if(open_ec)      {        asio::post(self_->impl_.get_executor(),          asio::detail::bind_handler(            ASIO_MOVE_CAST(Handler)(handler), open_ec));      }      else      {        detail::non_const_lvalue<Handler> handler2(handler);        self_->impl_.get_service().async_connect(          self_->impl_.get_implementation(), endpoint,          handler2.value, self_->impl_.get_executor());      }    }
 private:    rdma_connection* self_;  };};

Initializer還擔(dān)任了很多編譯期檢查的工作,比如Handler的類型是否滿足,Buffer是否滿足ConstBufferSequence的約束等。在跟祁宇的一次討論中,他還提到了Initializer的設(shè)計(jì)還可以讓用戶去特化async_result來實(shí)現(xiàn)自定義異步操作,是一個(gè)很強(qiáng)大的用戶擴(kuò)展點(diǎn)。

3.6 Service通過Scheduler發(fā)起IO操作

io_object_impl通過Initializer轉(zhuǎn)發(fā)給Service之后,剩下的事情就很簡(jiǎn)單了。Service的異步操作接口主要負(fù)責(zé)創(chuàng)建對(duì)應(yīng)的異步操作對(duì)象,初始化操作對(duì)象,并通知Scheduler發(fā)起IO操作。

以rdma在Linux和Windows平臺(tái)上的實(shí)現(xiàn)為例:

// rdma-core for linuxtemplate <typename PortSpace>template <typename Handler, typename IoExecutor>void rdma_core_service<PortSpace>::async_connect(implementation_type& impl,  endpoint_type const& endpoint,  Handler& handler, const IoExecutor& io_ex){  bool const is_continuation =      asio_handler_cont_helpers::is_continuation(handler);
 // TODO ... cancellation
 using op = rdma_cm_connect_op<PortSpace, Handler, IoExecutor>;  typename op::ptr p = { asio::detail::addressof(handler),      op::ptr::allocate(handler), 0 };  p.p = new (p.v) op(success_ec_, implNaN_id_data_->cm_id_, get_context(), handler, io_ex);
 cm_event_service_.start_connect_op(implNaN_id_data_, p.p, endpoint.data(), is_continuation);  p.v = p.p = 0;}
// iocp for windowstemplate <typename PortSpace>template <typename Handler, typename IoExecutor>void rdma_iocp_connector_service<PortSpace>::async_connect(  implementation_type& impl, endpoint_type const& endpoint,  Handler& handler, const IoExecutor& io_ex){  // TODO ... cancellation  using op = rdma_nd_connect_op<Handler, IoExecutor>;  typename op::ptr p = { asio::detail::addressof(handler),      op::ptr::allocate(handler), 0 };
 p.p = new (p.v) op(impl.connector_.Get(), handler, io_ex);
 start_connect_op(impl, endpoint, p.p);  p.v = p.p = 0;}

需要注意的是,Reactor與Proactor模式的差異在這里就顯現(xiàn)出來了。Reactor在這里發(fā)起的是一個(gè)reactor_op操作, 注冊(cè)給Connection Mananger也就是這里的cm_event_service_對(duì)象,真正的connect函數(shù)調(diào)用是在Connection Mananger的事件Channel對(duì)應(yīng)的fd有POLLIN消息的時(shí)候,才會(huì)調(diào)用。而對(duì)于IOCP Proactor而言,start_connect_op會(huì)直接非阻塞地調(diào)用Connector的Connect函數(shù),并在Connection創(chuàng)建成功后,通過IOCP的Completion Queue通知用戶。

4. 小結(jié)

本文簡(jiǎn)要地討論了如何在asio中擴(kuò)展自己的異步IO操作,簡(jiǎn)單分析了Context/Service對(duì)象模型,簡(jiǎn)述如何實(shí)現(xiàn)自己的IO Object外觀類以及對(duì)應(yīng)的Service類。最后討論了如何在Reactor和Proactor上擴(kuò)展自定義的異步操作。