Tokio - Rust Concurrency (draft)


I've started exploring the Tokio framework.

https://tokio.rs/tokio/tutorial

You can find all the official tutorial code there.

What is Tokio, ultimately?

  • Asynchronous Rust code does not run on its own, so you must choose a runtime to execute it.
  • Tokio is an asynchronous runtime for the Rust programming language.

Hands-on tutorial with a Redis clone

We’ll run a clone of the Redis server. While the server is running, the terminal you started it in will be blocked.

For anyone who doesn’t know Redis: it is a key-value store server, and clients talk to it with the Redis protocol over TCP/IP.

Overview:

  • Key-value pairs are stored in a HashMap.
  • A client SDK is provided for the tutorial.

Run Redis-clone and test it

Install mini-redis server:

cargo install mini-redis

Now you can run the Redis server with the mini-redis-server command. After running mini-redis-server, open another terminal and confirm that the Redis server is running:

➜ mini-redis-cli get foo
(nil)

We’ve tested the mini-redis server and client. Now let’s build the server ourselves.

TCP client with async

Cargo.toml:

tokio = { version = "^1.7", features = ["full"] }
mini-redis = "0.4"

src/main.rs:

use mini_redis::{client, Result};

#[tokio::main]
async fn main() -> Result<()> {
    let mut client = client::connect("127.0.0.1:6379").await?;

    client.set("hello", "world".into()).await?;

    let result = client.get("hello").await?;

    println!("got value from the server; result={:?}", result);

    Ok(())
}

Run mini-redis-server in another terminal, and test the client:

$ cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.02s
     Running `target/debug/my-redis`
got value from the server; result=Some(b"world")

Quick explanation:

  • In mini-redis crate:
    pub async fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<Client>
    ...
    pub async fn get(&mut self, key: &str) -> crate::Result<Option<Bytes>>
    
  • Rust transforms the async fn at compile time into a routine that operates asynchronously. For example, the #[tokio::main] function is a macro which transforms the async fn main() into a synchronous fn main() that initializes a runtime instance and executes the async main function.
    #[tokio::main]
    async fn main() {
        println!("hello");
    }
    
    Transformed:
    fn main() {
        let mut rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(async {
            println!("hello");
        })
    }
    
  • Calling an async fn returns a value representing the operation. This is conceptually analogous to a zero-argument closure. To actually run the operation, you should use the .await operator on the return value.
  • Rust’s async operations are “lazy” (they don’t run until you call .await explicitly); see the example below.
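
A minimal example of this laziness (essentially the tutorial's say_world example): nothing from say_world prints until the .await.

async fn say_world() {
    println!("world");
}

#[tokio::main]
async fn main() {
    // Calling `say_world()` only creates a future; nothing runs yet.
    let op = say_world();

    println!("hello");

    // The future runs only when we .await it, so "hello" prints first.
    op.await;
}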

Tokio runtime

Asynchronous functions must be executed by a runtime. The runtime contains the asynchronous task scheduler, provides evented I/O, timers, etc. The runtime does not automatically start, so the main function needs to start it.
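
If you don't use the #[tokio::main] macro, you can start the runtime yourself. A minimal sketch using the runtime builder (the "full" feature from our Cargo.toml covers the multi-threaded runtime):

fn main() {
    // Build a multi-threaded runtime by hand instead of using #[tokio::main].
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all() // turn on the I/O and timer drivers
        .build()
        .unwrap();

    // Run an async block to completion on that runtime.
    rt.block_on(async {
        println!("running inside the Tokio runtime");
    });
}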

TCP listener with async

Let’s implement mini-redis.

tokio::net::TcpListener is an async TCP listener:

use tokio::net::{TcpListener, TcpStream};
use mini_redis::{Connection, Frame};

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    loop {
        let (socket, _) = listener.accept().await.unwrap();
        process(socket).await;
    }
}

async fn process(socket: TcpStream) {
    // The `Connection` lets us read/write redis "frames" instead of byte streams.
    let mut connection = Connection::new(socket);

    if let Some(frame) = connection.read_frame().await.unwrap() {
        println!("GOT: {:?}", frame);

        let response = Frame::Error("unimplemented".to_string());
        connection.write_frame(&response).await.unwrap();
    }
}

After cargo run, try mini-redis-cli get foo; it will return Error: "unimplemented", whereas the terminal where you ran cargo run prints GOT: Array([Bulk(b"get"), Bulk(b"foo")]).

  • In Tokio crate:
    pub async fn bind<A: ToSocketAddrs>(addr: A) -> Result<TcpListener>
    pub async fn accept(&self) -> Result<(TcpStream, SocketAddr)>
    
  • The loop body doesn’t progress until the TCP listener receives a connection: listener.accept().await suspends the task until a client connects, and only returns once a connection has been accepted.
  • The snippet above handles only one connection at a time. We will improve that next.

Accept multiple requests without blocking

Wrap process with tokio::spawn as follows:

loop {
    let (socket, _) = listener.accept().await.unwrap();
    tokio::spawn(async move {
        process(socket).await;
    });
}
  • A Tokio task is an asynchronous green thread. They are created by passing an async block to tokio::spawn.
  • Tasks are the unit of execution managed by the scheduler.
  • The tokio::spawn function returns a JoinHandle, which the caller may use to interact with the spawned task. (same as std::thread::spawn)
  • Spawning the task submits it to the Tokio scheduler, which then ensures that the task executes when it has work to do (i.e., when we call .await). Lazy async.
  • Note that we moved the socket.
  • The task can also be moved between threads after being spawned.
  • Tasks in Tokio are very lightweight. Under the hood, they require only a single allocation and 64 bytes of memory.
  • When .await is called, the task “yields” back to the scheduler.

Here is another async execution example, which shows the JoinHandle explicitly:

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        // Do some async work
        "return value"
    });

    // Do some other work

    let out = handle.await.unwrap();
    println!("GOT {}", out);
}
  • Awaiting a JoinHandle returns a Result (see the sketch after this list).

  • The spawned task may be executed on the same thread as where it was spawned, or it may execute on a different runtime thread.

  • As in standard libraries, tasks spawned by tokio::spawn must implement Send. This allows the Tokio runtime to move the tasks between threads while they are suspended at an .await.
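
A minimal sketch of what that Result carries when the spawned task panics (assuming the default unwind panic behavior):

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        panic!("boom");
    });

    // Awaiting the JoinHandle yields a Result; a panic inside the task
    // surfaces as an Err(JoinError) instead of tearing down the runtime.
    match handle.await {
        Ok(_) => println!("task finished"),
        Err(e) if e.is_panic() => println!("task panicked"),
        Err(_) => println!("task was cancelled"),
    }
}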

Data shared between threads

Send bound

Tasks spawned by tokio::spawn must implement Send. This allows the Tokio runtime to move the tasks between threads while they are suspended at an .await.

Spawned tasks must not contain any references to data owned outside the task. If a single piece of data must be accessible from more than one task concurrently, then it must be shared using synchronization primitives such as Arc.

The next time the task is executed, it resumes from the point it last yielded. To make this work, all state that is used after .await must be saved by the task. Tasks are Send when all data that is held across .await calls is Send. If any data held across an .await is not Send, the compiler returns an error, as in the sketch below.
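
A sketch along the lines of the official tutorial's Rc example; the first part compiles because the Rc is dropped before the .await, while the commented-out variant would not:

use std::rc::Rc;
use tokio::task::yield_now;

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        // This compiles: the scope drops `rc` before the .await,
        // so it is not part of the state saved across the yield.
        {
            let rc = Rc::new("compiles");
            println!("{}", rc);
        }
        yield_now().await;

        // This would NOT compile: holding `rc` across the .await
        // makes the future non-Send, and tokio::spawn rejects it.
        // let rc = Rc::new("fails");
        // yield_now().await;
        // println!("{}", rc);
    });

    handle.await.unwrap();
}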

Shared/Persistent data in memory

Usually, there are two techniques for sharing data between threads:

  • Guard the shared state with a Mutex (for simple data).
  • Spawn a task to manage the state and use message passing to operate on it (when asynchronous work such as I/O is required); a rough sketch follows this list.
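
The second technique looks roughly like the sketch below. This is not part of the mini-redis code; the Command enum, the channel size, and the String values are just placeholders showing one task owning the HashMap while other code talks to it through messages.

use std::collections::HashMap;
use tokio::sync::{mpsc, oneshot};

// A command sent to the task that owns the state.
enum Command {
    Get {
        key: String,
        resp: oneshot::Sender<Option<String>>,
    },
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Command>(32);

    // This task owns the HashMap; nobody else touches it directly.
    tokio::spawn(async move {
        let mut db: HashMap<String, String> = HashMap::new();
        db.insert("hello".to_string(), "world".to_string());

        while let Some(Command::Get { key, resp }) = rx.recv().await {
            // Reply on the one-shot channel; ignore a dropped receiver.
            let _ = resp.send(db.get(&key).cloned());
        }
    });

    // Any other task operates on the state by sending a message.
    let (resp_tx, resp_rx) = oneshot::channel();
    tx.send(Command::Get { key: "hello".into(), resp: resp_tx })
        .await
        .unwrap();
    println!("GOT = {:?}", resp_rx.await.unwrap());
}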

Hands on

  • We’re going to use the bytes crate for the TCP frame payloads.
    • The crate is developed by the Tokio project. The goal of Bytes is to provide a robust byte array structure for network programming.
    • The Bytes type is roughly an Arc<Vec<u8>> but with some added capabilities.
  • Create the database as type Db = Arc<Mutex<HashMap<String, Bytes>>>;
  • The DB hashmap is created in the main function and shared with the tasks.
  • Note that, throughout Tokio, the term handle is used to reference a value that provides access to some shared state.
    • In our case, the Arc reference counter pointing at the database object is a handle.
    • We will clone the reference before spawning each task, and pass the cloned reference to the task.
    • Before a task operates on the handle, it needs to explicitly lock() it.

Rewrite the main function as follows:

use bytes::Bytes;
use mini_redis::{Connection, Frame};
use tokio::net::{TcpListener, TcpStream};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type Db = Arc<Mutex<HashMap<String, Bytes>>>;

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    println!("Listening");

    let db = Arc::new(Mutex::new(HashMap::new()));

    loop {
        let (socket, _) = listener.accept().await.unwrap();
        let db = db.clone();

        println!("Accepted");
        tokio::spawn(async move {
            process(socket, db).await;
        });
    }
}

Rewrite the process function as follows:

async fn process(socket: TcpStream, db: Db) {
    use mini_redis::Command::{self, Get, Set};

    let mut connection = Connection::new(socket);

    while let Some(frame) = connection.read_frame().await.unwrap() {
        let response = match Command::from_frame(frame).unwrap() {
            Set(cmd) => {
                let mut db = db.lock().unwrap();
                db.insert(cmd.key().to_string(), cmd.value().clone());
                Frame::Simple("OK".to_string())
            }           
            Get(cmd) => {
                let db = db.lock().unwrap();
                if let Some(value) = db.get(cmd.key()) {
                    Frame::Bulk(value.clone())
                } else {
                    Frame::Null
                }
            }
            cmd => panic!("unimplemented {:?}", cmd),
        };

        connection.write_frame(&response).await.unwrap();
    }
}

Upgrade DB

Contention

(Thread) contention in concurrent programming is simply when two threads try to access the same resource, or related resources, in such a way that at least one of the contending threads runs more slowly than it would if the other thread(s) were not running.

If contention on a synchronous mutex becomes a problem, the best fix is rarely to switch from std::sync::Mutex to tokio::sync::Mutex. The Tokio mutex acts similarly to std::sync::Mutex, with two major differences: lock() is an async method, so it does not block the thread, and the lock guard is designed to be held across .await points.

The std::sync::MutexGuard type is not Send. This means you can’t send a std mutex lock to another thread, and the compile error happens because the Tokio runtime can move a task between threads at every .await. The sketch below shows the pattern the Tokio mutex allows instead.
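
A minimal sketch of holding the guard across an .await with the Tokio mutex (roughly the increment_and_do_stuff shape from the tutorial, with a stand-in for the async work):

use std::sync::Arc;
use tokio::sync::Mutex; // the Tokio mutex, not std::sync::Mutex

async fn do_something_async() {
    tokio::task::yield_now().await; // stand-in for real async work
}

// With tokio::sync::Mutex, the guard may be held across the .await;
// the same code with std::sync::Mutex would make the task non-Send.
async fn increment_and_do_stuff(mutex: Arc<Mutex<i32>>) {
    let mut lock = mutex.lock().await;
    *lock += 1;
    do_something_async().await;
} // the guard is released here

#[tokio::main]
async fn main() {
    let mutex = Arc::new(Mutex::new(0));
    tokio::spawn(increment_and_do_stuff(mutex.clone())).await.unwrap();
    println!("value = {}", *mutex.lock().await);
}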

Shard

Cf. The dashmap crate provides an implementation of a more sophisticated sharded hash map.
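
Sharding by hand, before reaching for dashmap, looks roughly like the sketch below: each key hashes to one shard, each shard has its own Mutex, so tasks touching different shards no longer contend on a single lock. (The shard count and hashing scheme here are arbitrary.)

use bytes::Bytes;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type ShardedDb = Arc<Vec<Mutex<HashMap<String, Bytes>>>>;

fn new_sharded_db(num_shards: usize) -> ShardedDb {
    let mut db = Vec::with_capacity(num_shards);
    for _ in 0..num_shards {
        db.push(Mutex::new(HashMap::new()));
    }
    Arc::new(db)
}

// Hash the key to pick a shard, then lock only that shard.
fn insert(db: &ShardedDb, key: String, value: Bytes) {
    use std::hash::{Hash, Hasher};
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    key.hash(&mut hasher);
    let shard = &db[(hasher.finish() as usize) % db.len()];
    shard.lock().unwrap().insert(key, value);
}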


Old note (dirty)

About the futures crate

Why is the crate named “futures”? Because async jobs are executed in the future (in other words, not immediately when control flow reaches them).

  • The return value of an async fn is an anonymous type that implements the Future trait; a small sketch follows.
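
A minimal sketch of the equivalence (the function names here are just for illustration):

use std::future::Future;

// `async fn` is (roughly) sugar for a function returning an anonymous
// type that implements Future.
async fn hello() -> &'static str {
    "hello"
}

// An equivalent hand-written signature; the body is an async block.
fn hello_desugared() -> impl Future<Output = &'static str> {
    async { "hello" }
}

#[tokio::main]
async fn main() {
    println!("{} {}", hello().await, hello_desugared().await);
}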

else

  • Tasks are the unit of execution managed by the scheduler (of tokio). Spawning the task submits it to the Tokio scheduler, which then ensures that the task executes when it has work to do.

When you spawn a task on the Tokio runtime, its type’s lifetime must be 'static. This means that the spawned task must not contain any references to data owned outside the task. cf) https://github.com/pretzelhammer/rust-blog/blob/master/posts/common-rust-lifetime-misconceptions.md#2-if-t-static-then-t-must-be-valid-for-the-entire-program

Anyway, each spawned task is independent; if you want it to use data from outside, use move to transfer ownership.

If a single piece of data must be accessible from more than one task concurrently, then it must be shared using synchronization primitives such as Arc.

the compiler is unable to reason about how long a newly spawned task stays around, so the only way it can be sure that the task doesn’t live too long is to make sure it may live forever.
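
A minimal sketch of both options (moving ownership into the task, or sharing with Arc); the vector here is just an illustration:

use std::sync::Arc;

#[tokio::main]
async fn main() {
    // This would NOT compile: the task borrows `v`, which is owned by main,
    // so the spawned future is not 'static.
    // let v = vec![1, 2, 3];
    // tokio::spawn(async { println!("{:?}", v); });

    // Option 1: move ownership into the task.
    let v = vec![1, 2, 3];
    tokio::spawn(async move {
        println!("moved into the task: {:?}", v);
    })
    .await
    .unwrap();

    // Option 2: share the data between tasks (and main) with Arc.
    let shared = Arc::new(vec![1, 2, 3]);
    let for_task = Arc::clone(&shared);
    tokio::spawn(async move {
        println!("task sees: {:?}", for_task);
    })
    .await
    .unwrap();
    println!("main still sees: {:?}", shared);
}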

Check the main loop below. Since j is a u32 (a Copy type), each async move block captures its own copy of j, still 0 at the time the task is spawned; that is why j-loop never gets past 1 in the output.

  use tokio::net::TcpListener;

  // `process` is the connection handler defined earlier in this note.
  #[tokio::main]
  async fn main() {
      // Bind the listener to the address
      let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
      let mut i = 0u32;
      let mut j = 0u32;

      loop {
          i += 1;
          println!("i-loop: {i}");
          // The second item contains the ip and port of the new connection.
          let (socket, _) = listener.accept().await.unwrap();

          // A new task is spawned for each inbound socket. The socket is
          // moved to the new task and processed there.
          tokio::spawn(async move {
              // `socket` is moved into the task; to share data instead, use Arc.
              // `j` is a Copy type, so this block captures its own copy of j.
              process(socket).await;
              j += 1;
              println!("j-loop: {j}");
          });
      }
  }
warning: `test` (bin "test") generated 1 warning
    Finished dev [unoptimized + debuginfo] target(s) in 0.02s
     Running `target/debug/test`
i-loop: 1
i-loop: 2
j-loop: 1
i-loop: 3
j-loop: 1
i-loop: 4
j-loop: 1

Tokio Send issue with your own struct: restructure your code so you don’t hold the lock across an .await. https://tokio.rs/tokio/tutorial/shared-state#restructure-your-code-to-not-hold-the-lock-across-an-await

RwLock and mutex

https://doc.rust-lang.org/std/sync/struct.RwLock.html

In comparison, a Mutex does not distinguish between readers or writers that acquire the lock, therefore blocking any threads waiting for the lock to become available. An RwLock will allow any number of readers to acquire the lock as long as a writer is not holding the lock.

https://riptutorial.com/rust/example/24527/read-write-locks
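
A minimal sketch, essentially the std docs example: any number of read guards can coexist, while a write guard is exclusive.

use std::sync::RwLock;

fn main() {
    let lock = RwLock::new(5);

    // Many readers can hold the lock at the same time.
    {
        let r1 = lock.read().unwrap();
        let r2 = lock.read().unwrap();
        assert_eq!(*r1, 5);
        assert_eq!(*r2, 5);
    } // both read guards are dropped here

    // Only one writer at a time, and only while no readers hold the lock.
    {
        let mut w = lock.write().unwrap();
        *w += 1;
        assert_eq!(*w, 6);
    }
}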

RAII guard

https://rust-unofficial.github.io/patterns/patterns/behavioural/RAII.html

Chatting app

https://www.youtube.com/watch?v=4DqP57BHaXI

Later

https://youtu.be/ThjvMReOXYM