0%

Watermark hack for libuv

最近碰到一個困難的問題,好不容易找到解答,因此紀錄起來。
當一個Socket讀寫速度不對稱的時候,該怎麼處理。
假設Socket讀取的資料室100M/S,而寫入的速度是10M/S。
如果是Blocking I/O,OS會自動幫你處理這種狀況,難怪上網找範例程式碼都沒特別處理。
由於libuv/libevent等都是Non-blocking I/O framework,,因此無法得到有用的資訊。
翻到Linux多線程服務端編程:使用muduo C++網絡庫裡面有個地方觸動了我的靈感。利用Watermark來管理讀寫動作。
後來找到Libevent的BuffereventsPython asys IO都有類似Watermark的觀念,我想這應該是可行解。

以讀取比寫入快為例

  • 當寫入的資料量大於Hih water mark時,暫停讀取動作
  • 當剩餘的寫入資料量小於Low water mark時,重新開始讀取
    反之亦然。

至於要怎麼改,以下是我的解法

  1. 先增加兩個Callback function type

    1
    2
    typedef void(*uv_write_hw_cb)(uv_stream_t* stream);
    typedef void(*uv_write_lw_cb)(uv_stream_t* stream);
  2. 修改Marco UV_STREAM_FIELDS新增四個欄位

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    #define UV_STREAM_FIELDS                                                      \
    /* number of bytes queued for writing */ \
    size_t write_queue_size; \
    size_t write_highwater; \
    size_t write_lowwater; \
    uv_write_hw_cb water_highcb; \
    uv_write_lw_cb water_lowcb; \
    uv_alloc_cb alloc_cb; \
    uv_read_cb read_cb; \
    /* private */ \
    UV_STREAM_PRIVATE_FIELDS

    紀錄高水位跟滴水位的資訊和Callback function。

  3. 修改 uv_stream_init,,將新增的欄位初始化

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    INLINE static void uv_stream_init(uv_loop_t* loop,
    uv_stream_t* handle,
    uv_handle_type type) {
    uv__handle_init(loop, (uv_handle_t*) handle, type);
    handle->write_queue_size = 0;
    handle->activecnt = 0;
    handle->write_highwater = 0;
    handle->write_lowwater = 0;
    handle->water_highcb = NULL;
    handle->water_lowcb = NULL;
    }
  4. 新增一組API,讓外界可以控制高低水位和控制函數。 (略)

  5. 修改紀錄寫入的點,當資料量超過某個臨界點的時候執行相對硬的動作。例如TCP

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    int uv_tcp_write(uv_loop_t* loop,
    uv_write_t* req,
    uv_tcp_t* handle,
    const uv_buf_t bufs[],
    unsigned int nbufs,
    uv_write_cb cb) {
    // 略
    handle->write_queue_size += req->queued_bytes;
    // 這邊是我們新增的部份
    if (handle->water_highcb && handle->write_highwater <= handle->write_queue_size)
    handle->water_highcb(handle);
    }

    還有

    1
    2
    3
    4
    5
    6
    7
    8
    void uv_process_tcp_write_req(uv_loop_t* loop, uv_tcp_t* handle,
    uv_write_t* req) {
    // 略
    handle->write_queue_size -= req->queued_bytes;
    // 以下是新增的點
    if (handle->water_lowcb && handle->write_lowwater >= handle->write_queue_size)
    handle->water_lowcb(handle);
    }

雖然不確定這部是最佳解,不過目前看起來是可行解。