Tokio,Rust異步編程實踐之路


轉自:https://www.cnblogs.com/hymenz/p/9334297.html

緣起

在許多編程語言里,我們都非常樂於去研究在這個語言中所使用的異步網絡編程的框架,比如說Python的 Gevent、asyncio,Nginx 和 OpenResty,Go 等,今年年初我開始接觸 Rust,並被其無 GC、內存安全、極小的運行時等特性所吸引,經過一段時間的學習,開始尋找構建實際項目的解決方案,很快 mio、tokio 等框架進入了我的視野,於是開始從更加底層的 mio 出發實驗。

https://github.com/Hevienz/mio_test/blob/master/src/main.rs

可以看到 mio 是一個非常底層的異步編程的框架,這意味着如果我們要在實際的項目開發中使用它時,就不得不從 event loop 開始編寫我們的軟件,這並不是我們所期望的,於是我們需要一個更高層次抽象的框架,這便是本文要為大家講述的 tokio。

tokio

tokio 是 Rust 中的異步編程框架,它將復雜的異步編程抽象為 Futures、Tasks 和 Executor,並提供了 Timers 等基礎設施,下文中我們將一一展開。

運行時模型

tokio 是一個基於輪訓的模型。比如我們要在 tokio 上調度我們的 task,我們需要為其實現 Future trait。比如下面的例子中,我們想要得到一個 widget,但它有可能還沒有准備好,這時候我們調用 poll 的結果就是 Ok(Async::NotReady),Executor 會負責重復的調用 poll,直到 widget 准備好,返回Ok(Async::Ready(()))

/// A task that polls a single widget and writes it to STDOUT.
pub struct MyTask;

impl Future for MyTask {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Async<()>, ()> {
        match poll_widget() {
            Async::Ready(widget) => {
                println!("widget={:?}", widget);
                Ok(Async::Ready(()))
            }
            Async::NotReady => {
                return Ok(Async::NotReady);
            }
        }
    }
}

在最簡單的情況下,Executor 可能會長這樣。(注:這不是真實的實現,只是用來說明概念)

pub struct SpinExecutor {
    tasks: VecDeque<Box<Future<Item = (), Error = ()>>>,
}

impl SpinExecutor {
    pub fn spawn<T>(&mut self, task: T)
    where T: Future<Item = (), Error = ()> + 'static
    {
        self.tasks.push_back(Box::new(task));
    }

    pub fn run(&mut self) {
        while let Some(mut task) = self.tasks.pop_front() {
            match task.poll().unwrap() {
                Async::Ready(_) => {}
                Async::NotReady => {
                    self.tasks.push_back(task);
                }
            }
        }
    }
}

Executor 頻繁地輪詢所有 task,即使某些 task 仍然會以 NotReady 返回。
理想情況下,Executor 應該可以通過某種方式知道哪些 task 恰好轉變為 “就緒” 狀態。這正是 futures 任務模型的核心。

Futures

future 是對一個未來事件的抽象。比如你可以將各種事件抽象為 future:

  • 在線程池中執行的數據庫查詢。當數據庫查詢完成時,future 完成,其值是查詢的結果。
  • 對服務器的 RPC 調用。當服務器回復時,future 完成,其值是服務器的響應。
  • 超時事件。當時間到了,future 就完成了,它的值是 ()
  • 在線程池上運行的長時間運行的 CPU 密集型任務。任務完成后,future 完成,其值為任務的返回值。

這里我們舉一個例子:

extern crate futures;
extern crate tokio;
extern crate tokio_core;

use std::error::Error;
use futures::Future;
use futures::future::{ok, done};
use tokio_core::reactor::Core;

fn my_fn_squared(i: u32) -> Result<u32, Box<Error>> {
    Ok(i * i)
}

fn my_fut_squared(i: u32) -> impl Future<Item = u32, Error = Box<Error + 'static>> {
    ok(i * i)
}

fn my_fut() -> impl Future<Item = u32, Error = Box<Error + 'static>> {
    ok(10)
}

fn main() {
    let mut reactor = Core::new().unwrap();

    let chained_future = my_fut().and_then(|retval| {
        done(my_fn_squared(retval)).and_then(|retval2| my_fut_squared(retval2))
    });
    let retval3 = reactor.run(chained_future).unwrap();
    println!("{:?}", retval3);
}

這里,我們的 my_fut 的返回值實現了 Future,我們知道它被 Executor 執行完成后,會返回一個 u32 或者 一個 Box<Error + 'static>,而現在我們就可以通過 .and_then 來處理這個 u32 的值,而最終我們將我們的 future 鏈接了起來,交給 Executor 執行。

Tasks

Tasks 是應用程序的 “邏輯單元”。他們以 Future trait 來表示。一旦 task 完成處理,task 的 future 實現將以值 () 返回。

Tasks 被傳遞給 Executor,Executor 處理 task 的調度。Executor 通常在一組或一組線程中調度許多 task。task 不得執行計算繁重的邏輯,否則將阻止其他 task 執行。

Tasks 既可以通過實現 Future trait 來實現,也可以通過使用 futures 和 tokio crates 中的各種組合器函數來構建 future 來實現。

I/O

tokio crate 也提供了 TCP、UDP 的支持,不像 std 中的實現,tokio 的網絡類型是基於 poll 模型的,並且當他們的 “就緒” 狀態改變時會通知 task executors。在 tokio::net 模塊中你將會找到像 TcpListener、TcpStream、UdpSocket 這些類型。

所有這些類型都提供了 future 的 API 以及 poll API。

Tokio 網絡類型被一個基於 mio 的 reactor 所驅動,默認情況下,它在后台線程上啟動。

使用 future API

一些幫助使用 future API 的函數包括:

  • incoming:入站 TCP 連接的 Stream。
  • read_exact:將n字節准確讀入緩沖區。
  • read_to_end:將所有字節讀入緩沖區。
  • write_all:寫緩沖區的全部內容。
  • copy:將字節從一個 I/O 句柄復制到另一個。

這些函數中的許多都是源於 AsyncRead 和 AsyncWrite trait 的。這些 trait 類似於 std 中的 Read 和 Write,但僅僅用於具有 future aware 的類型,例如符合下面的特征:

  • 調用 read 或 write 是非阻塞的,他們從不阻塞調用線程。
  • 如果一個調用會以其他方式阻塞,那么會返回一個錯誤 WouldBlock。如果發生這種情況,則當前 future 的task 將在 I/O 再次准備就緒時被調度。

注意 AsyncRead 和 AsyncWrite 類型的用戶應該使用 poll_read 和 poll_write 代替直接調用 read 和 write

例如,以下是如何接受連接,從中讀取5個字節,然后將5個字節寫回 socket 的例子:

let server = listener.incoming().for_each(|socket| {
    println!("accepted socket; addr={:?}", socket.peer_addr().unwrap());

    let buf = vec![0; 5];

    let connection = io::read_exact(socket, buf)
        .and_then(|(socket, buf)| {
            io::write_all(socket, buf)
        })
        .then(|_| Ok(())); // Just discard the socket and buffer

    // Spawn a new task that processes the socket:
    tokio::spawn(connection);

    Ok(())
})

使用 Poll API

當手動實現 Future 時,需要使用基於 Poll 的 API,並且你需要返回 Async。當您需要實現自己的處理自定義邏輯的組合器時,這非常有用。

例如,這就是如何為 TcpStream 實現 read_exact future 的例子。

pub struct ReadExact {
    state: State,
}

enum State {
    Reading {
        stream: TcpStream,
        buf: Vec<u8>,
        pos: usize,
    },
    Empty,
}

impl Future for ReadExact {
    type Item = (TcpStream, Vec<u8>);
    type Error = io::Error;

    fn poll(&mut self) -> Result<Async<Self::Item>, io::Error> {
        match self.state {
            State::Reading {
                ref mut stream,
                ref mut buf,
                ref mut pos
            } => {
                while *pos < buf.len() {
                    let n = try_ready!({
                        stream.poll_read(&mut buf[*pos..])
                    });
                    *pos += n;
                    if n == 0 {
                        let err = io::Error::new(
                            io::ErrorKind::UnexpectedEof,
                            "early eof");

                        return Err(err)
                    }
                }
            }
            State::Empty => panic!("poll a ReadExact after it's done"),
        }

        match mem::replace(&mut self.state, State::Empty) {
            State::Reading { stream, buf, .. } => {
                Ok(Async::Ready((stream, buf)))
            }
            State::Empty => panic!(),
        }
    }
}

數據報

UdpSocket 類型提供了許多方便的方法:

  • send_dgram 允許您將發送數據報作為 future,如果無法立即發送整個數據報,則返回錯誤。
  • recv_dgram 表示將數據報讀入緩沖區。

示例

#[macro_use]
extern crate log;
extern crate futures;
extern crate pretty_env_logger;
extern crate tokio;

use futures::future::{done, ok};
use futures::{Future, Stream};
use tokio::io::{self as tio, AsyncRead};
use tokio::net::{TcpListener, TcpStream};

use std::error;
use std::fmt;
use std::io;

fn client_fut(socket: TcpStream) -> impl Future<Item = (), Error = ()> + 'static + Send {
    futures::lazy(move || match socket.peer_addr() {
        Ok(peer) => {
            info!("Tcp connection [{:?}] connected to server", peer);
            Ok((socket, peer))
        }
        Err(err) => {
            error!("Fetch peer address failed: {:?}", err);
            Err(())
        }
    }).and_then(move |(socket, peer)| {
            let buf = vec![0; 5];
            let svc_fut = tio::read_exact(socket, buf)
                .and_then(|(socket, buf)| {
                    tio::write_all(socket, buf)
                })
                .then(|_| Ok(()));

            tokio::spawn(svc_fut);
            ok(())
        })
}

fn server_fut(listener: TcpListener) -> impl Future<Item = (), Error = ()> + 'static + Send {
    listener
        .incoming()
        .for_each(|socket| {
            tokio::spawn(client_fut(socket));
            Ok(())
        })
        .map_err(|err| {
            error!("Accept connection failed: {:?}", err);
        })
}

fn run() -> Result<(), io::Error> {
    let addr = "127.0.0.1:1234".parse().unwrap();
    info!("Listening on {:?}", addr);

    let listener = TcpListener::bind(&addr)?;
    let server_fut = server_fut(listener);

    tokio::run(server_fut);
    Ok(())
}

fn print<T: fmt::Debug, E: error::Error>(result: Result<T, E>) {
    match result {
        Ok(any) => info!("Result: {:?}", any),
        Err(err) => error!("Error: {:?}", err),
    }
}

fn init() {
    pretty_env_logger::init();
}

fn main() {
    init();
    print(run());
}

Timers

在編寫基於網絡的應用程序時,通常需要根據時間執行操作。

  • 在一段時間后運行一些代碼。
  • 取消運行時間過長的運行操作。
  • 以一定間隔重復執行操作。

這些用例通過使用 timer 模塊中提供的各種計時器 API 來處理。

延遲運行代碼

在這個例子中,我們希望在一段時間后執行任務。為此,我們使用 Delay API。我們要做的只是將 "Hello world!" 寫到終端。

use tokio::prelude::*;
use tokio::timer::Delay;

use std::time::{Duration, Instant};

fn main() {
    let when = Instant::now() + Duration::from_millis(100);
    let task = Delay::new(when)
        .and_then(|_| {
            println!("Hello world!");
            Ok(())
        })
        .map_err(|e| panic!("delay errored; err={:?}", e));

    tokio::run(task);
}

為長時間運行的操作設置 Timeout

在編寫健壯的網絡應用程序時,確保在合理的時間內完成操作至關重要。在等待來自外部的,不受信任的來源的數據時尤其如此。

該 Deadline 類型確保操作在固定的時間內完成。

use tokio::io;
use tokio::net::TcpStream;
use tokio::prelude::*;

use std::time::{Duration, Instant};

fn read_four_bytes(socket: TcpStream)
    -> Box<Future<Item = (TcpStream, Vec<u8>), Error = ()>>
{
    // The instant at which the read will be aborted if
    // it has not yet completed.
    let when = Instant::now() + Duration::from_secs(5);

    let buf = vec![0; 4];
    let fut = io::read_exact(socket, buf)
        .deadline(when)
        .map_err(|_| println!("failed to read 4 bytes by deadline"));

    Box::new(fut)
}

周期性運行代碼

在一個時間間隔內重復運行代碼對於在套接字上發送 PING 消息,或經常檢查配置文件等情況很有用。

Interval 類型實現了 Stream,並以指定的速率掛起。

use tokio::prelude::*;
use tokio::timer::Interval;

use std::time::{Duration, Instant};

fn main() {
    let task = Interval::new(Instant::now(), Duration::from_millis(100))
        .take(10)
        .for_each(|instant| {
            println!("fire; instant={:?}", instant);
            Ok(())
        })
        .map_err(|e| panic!("interval errored; err={:?}", e));

    tokio::run(task);
}

計時器的注意事項

Tokio 計時器的粒度為 1 毫秒。任何更小的間隔都會向上舍入到最接近的毫秒。定時器在用戶域中實現(即不使用操作系統定時器,像 linux 上的 timerfd)。它使用分層散列計時器輪實現,在創建,取消和觸發超時時提供有效的恆定時間復雜度。

Tokio 運行時包括每個工作線程一個計時器實例。這意味着,如果運行時啟動4個工作線程,則將有4個計時器實例。這在大多數情況下避免了同步,因為當使用計時器時,任務將在位於當前線程上的狀態下操作。

也就是說,計時器實現是線程安全的,並支持從任何線程使用。

基本組合器

下面是關於 Future 的圖表,來自於 Cheatsheet for Futures 。

// Constructing leaf futures
fn empty ()             -> Future<T, E>
fn ok    (T)            -> Future<T, E>
fn err   (E)            -> Future<T, E>
fn result(Result<T, E>) -> Future<T, E>

// General future constructor
fn poll_fn(FnMut(thread_local!(Task)) -> Poll<T, E>) -> Future<T, E>

// Mapping futures
fn Future::map     (Future<T, E>, FnOnce(T) -> U) -> Future<U, E>
fn Future::map_err (Future<T, E>, FnOnce(E) -> F) -> Future<T, F>
fn Future::from_err(Future<T, Into<E>>)           -> Future<T, E>

// Chaining (sequencing) futures
fn Future::then    (Future<T, E>, FnOnce(Result<T, E>) -> IntoFuture<U, F>) -> Future<U, F>
fn Future::and_then(Future<T, E>, FnOnce(T)            -> IntoFuture<U, E>) -> Future<U, E>
fn Future::or_else (Future<T, E>, FnOnce(E)            -> IntoFuture<T, F>) -> Future<T, F>
fn Future::flatten (Future<Future<T, E>, Into<E>>)                          -> Future<T, E>

// Joining (waiting) futures
fn Future::join (Future<T, E>, IntoFuture<U, E>)                                                       -> Future<(T, U),          E>
fn Future::join3(Future<T, E>, IntoFuture<U, E>, IntoFuture<V, E>)                                     -> Future<(T, U, V),       E>
fn Future::join4(Future<T, E>, IntoFuture<U, E>, IntoFuture<V, E>, IntoFuture<W, E>)                   -> Future<(T, U, V, W),    E>
fn Future::join5(Future<T, E>, IntoFuture<U, E>, IntoFuture<V, E>, IntoFuture<W, E>, IntoFuture<X, E>) -> Future<(T, U, V, W, X), E>
fn join_all     (IntoIterator<IntoFuture<T, E>>)                                                       -> Future<Vec<T>,          E>

// Selecting (racing) futures
fn Future::select (Future<T, E>, IntoFuture<T, E>) -> Future<(T, Future<T, E>), (E, Future<T, E>)>
fn Future::select2(Future<T, E>, IntoFuture<U, F>) -> Future<Either<(T, Future<U, F>), (U, Future<T, E>)>, Either<(E, Future<U, F>), (F, Future<T, E>)>>
fn select_all     (IntoIterator<IntoFuture<T, E>>) -> Future<(T, usize, Vec<Future<T, E>>), (E, usize, Vec<Future<T, E>>)>
fn select_ok      (IntoIterator<IntoFuture<T, E>>) -> Future<(T, Vec<Future<T, E>>), E>

// Utility
fn lazy         (FnOnce() -> IntoFuture<T, E>)             -> Future<T, E>
fn loop_fn      (S, FnMut(S) -> IntoFuture<Loop<T, S>, E>) -> Future<T, E>

// Miscellaneous
fn Future::into_stream   (Future<T, E>)            -> Stream<T, E>
fn Future::flatten_stream(Future<Stream<T, E>, E>) -> Stream<T, E>
fn Future::fuse          (Future<T, E>)            -> Future<T, E>
fn Future::catch_unwind  (Future<T, E>+UnwindSafe) -> Future<Result<T, E>, Any+Send>
fn Future::shared        (Future<T, E>)            -> Future<SharedItem<T>, SharedError<E>>+Clone
fn Future::wait          (Future<T, E>)            -> Result<T, E>

這部分的內容推薦參考這篇文章,https://www.jianshu.com/p/5059c403a335

本文不再贅述。

返回 futures

在使用 futures 時,您可能需要做的第一件事就是返回一個 Future。這有幾種選擇,從最符合人體工程學到最不符合。

  • Trait 對象
  • impl Trait

Trait 對象

首先,您始終可以選擇返回一個 boxed trait 對象

fn foo() -> Box<Future<Item = u32, Error = io::Error>> {
    // ...
}

這個策略的好處是它很容易寫出來並且易於創建。

這種方法的缺點是,在構建 future 時需要運行時分配,在使用該 future 時需要動態分派。Box 需要在堆上分配而 future 會被置入其中。

通常可以通過僅在您想要返回的 future 鏈的最后來 Boxing 來減少分配。

impl Trait

在 Rust 1.26 版本之后(2018年5月7日發布),我們可以使用叫做 impl Trait 的新的語言特性。

fn add_10<F>(f: F) -> impl Future<Item = i32, Error = F::Error>
    where F: Future<Item = i32>,
{
    f.map(|i| i + 10)
}

這種方法的好處在於它是零開銷的,不再需要 Box

使用 framed streams

Tokio 有幫助函數將字節流轉換為幀流。字節流的例子包括 TCP 連接,管道,文件對象以及標准輸入和輸出。在Rust中,streams 很容易識別,因為它們實現了 Read 和 Write trait。

最簡單的幀化的消息形式之一是行分隔消息。每條消息都以一個 \n 字符結尾。讓我們看一下如何使用 tokio 實現行分隔消息流。

編寫編解碼器

編解碼器實現 tokio_codec::Decoder 和 tokio_codec::Encoder trait。他的工作就是將字節轉為幀以及相反。這些 trait 與 tokio_codec::Framed struct一起使用,以提供字節流的緩沖,解碼和編碼。

讓我們看一下LinesCodec struct 的簡化版本,它實現了行分隔消息的解碼和編碼。

pub struct LinesCodec {
    // Stored index of the next index to examine for a `\n` character.
    // This is used to optimize searching.
    // For example, if `decode` was called with `abc`, it would hold `3`,
    // because that is the next index to examine.
    // The next time `decode` is called with `abcde\n`, the method will
    // only look at `de\n` before returning.
    next_index: usize,
}

這里的注釋解釋了,由於字節被緩存直到找到一行,因此每次接收數據時從緩沖區的開頭搜索 \n 是浪費的。保存緩沖區的最后長度並在收到新數據時從那里開始搜索將更有效。

Decoder::decode 是在底層流上接收到數據時調用的方法。該方法可以生成幀或返回 Ok(None) 以表示它需要更多數據來生成幀。該 decode 方法負責通過使用 BytesMut 的方法將不再需要緩沖的數據刪除。如果數據未刪除,緩沖區將持續增長。

讓我們來看看如何為 LinesCodec 實現 Decoder::decode

fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> {
    // Look for a byte with the value '\n' in buf. Start searching from the search start index.
    if let Some(newline_offset) = buf[self.next_index..].iter().position(|b| *b == b'\n')
    {
        // Found a '\n' in the string.

        // The index of the '\n' is at the sum of the start position + the offset found.
        let newline_index = newline_offset + self.next_index;

        // Split the buffer at the index of the '\n' + 1 to include the '\n'.
        // `split_to` returns a new buffer with the contents up to the index.
        // The buffer on which `split_to` is called will now start at this index.
        let line = buf.split_to(newline_index + 1);

        // Trim the `\n` from the buffer because it's part of the protocol,
        // not the data.
        let line = &line[..line.len() - 1];

        // Convert the bytes to a string and panic if the bytes are not valid utf-8.
        let line = str::from_utf8(&line).expect("invalid utf8 data");

        // Set the search start index back to 0.
        self.next_index = 0;

        // Return Ok(Some(...)) to signal that a full frame has been produced.
        Ok(Some(line.to_string()))
    } else {
        // '\n' not found in the string.

        // Tell the next call to start searching after the current length of the buffer
        // since all of it was scanned and no '\n' was found.
        self.next_index = buf.len();

        // Ok(None) signifies that more data is needed to produce a full frame.
        Ok(None)
    }
}

當需要將幀寫入下層流時,Encoder::encode 方法被調用。幀必須寫入緩沖區並作為一個參數。寫入緩沖區的數據將在准備好發送數據時寫入流。

現在讓我們來看看如何為 LinesCodec 實現 Encoder::encode

fn encode(&mut self, line: String, buf: &mut BytesMut) -> Result<(), io::Error> {
    // It's important to reserve the amount of space needed. The `bytes` API
    // does not grow the buffers implicitly.
    // Reserve the length of the string + 1 for the '\n'.
    buf.reserve(line.len() + 1);

    // String implements IntoBuf, a trait used by the `bytes` API to work with
    // types that can be expressed as a sequence of bytes.
    buf.put(line);

    // Put the '\n' in the buffer.
    buf.put_u8(b'\n');

    // Return ok to signal that no error occured.
    Ok(())
}

編碼信息通常更簡單。這里我們只需保留所需的空間並將數據寫入緩沖區。

使用編解碼器

使用編解碼器的最簡單方法是使用 Framed 結構體。它是實現自動緩沖的編解碼器的包裝器。該 Framed 結構體既是 Stream 也是 Sink。因此,您可以從中接收幀並向其發送幀。

您可以使用任何實現了 AsyncRead 和 AsyncWrite trait 的類型,使用 AsyncRead::framed 方法創建一個 Framed 結構體。

TcpStream::connect(&addr).and_then(|sock| {
    let framed_sock = sock.framed(LinesCodec::new());
    framed_sock.for_each(|line| {
        println!("Received line {}", line);
        Ok(())
    })
});

推薦閱讀

作者:Hevienz
出處:http://www.cnblogs.com/hymenz/
本博客原創作品采用 知識共享署名-非商業性使用-相同方式共享 4.0 國際許可協議進行許可。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM