Tokio - Rust Concurrency (draft)
I have started exploring the Tokio framework.
https://tokio.rs/tokio/tutorial
You can find all the official tutorial code here.
What is Tokio ultimately
- Asynchronous Rust code does not run on its own, so you must choose a runtime to execute it.
- Tokio is an asynchronous runtime for the Rust programming language.
Hands-on tutorial with a Redis clone
We’ll run a clone of the Redis server. While the server is running it occupies the terminal, so use a second terminal for the client.
Redis is a key-value store server; clients talk to it using the Redis protocol over TCP.
Overview:
- Key-value pairs are stored in a HashMap.
- The client SDK is provided for the purposes of the tutorial.
Run Redis-clone and test it
Install mini-redis server:
cargo install mini-redis
Now you can run the Redis server with the mini-redis-server command.
After starting mini-redis-server, open another terminal and confirm that the Redis server is running:
➜ mini-redis-cli get foo
(nil)
We’ve tested mini-redis server and client.
Let’s build it by ourselves.
TCP client with async
Cargo.toml:
tokio = { version = "^1.7", features = ["full"] }
mini-redis = "0.4"
src/main.rs:
use mini_redis::{client, Result};
#[tokio::main]
async fn main() -> Result<()> {
    // Open a connection to the mini-redis server.
    let mut client = client::connect("127.0.0.1:6379").await?;
    // Set the key "hello" to the value "world".
    client.set("hello", "world".into()).await?;
    // Read the key back.
    let result = client.get("hello").await?;
    println!("got value from the server; result={:?}", result);
    Ok(())
}
Run mini-redis-server in another terminal, and test the client:
$ cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.02s
Running `target/debug/my-redis`
got value from the server; result=Some(b"world")
Quick explanation:
- In the mini-redis crate:
    pub async fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<Client>
    ...
    pub async fn get(&mut self, key: &str) -> crate::Result<Option<Bytes>>
- Rust transforms an async fn at compile time into a routine that operates asynchronously. For example, #[tokio::main] is a macro which transforms async fn main() into a synchronous fn main() that initializes a runtime instance and executes the async main function.
    #[tokio::main]
    async fn main() {
        println!("hello");
    }
  Transformed:
    fn main() {
        let mut rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(async {
            println!("hello");
        })
    }
- Calling an async fn returns a value representing the operation. This is conceptually analogous to a zero-argument closure. To actually run the operation, use the .await operator on the return value.
- Rust’s async operations are “lazy”: they don’t run until you explicitly .await them (or something else polls them).
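To see the laziness for ourselves without pulling in Tokio, here is a minimal sketch using only the standard library. The `block_on` function, `ThreadWaker`, `work`, and `lazy_demo` are hypothetical names written for this note; `block_on` is a toy stand-in for what a real runtime does and is not how Tokio is implemented.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the parked thread when the future can make progress.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical minimal executor: polls one future on the current thread
/// until it completes. A real runtime adds scheduling, I/O, and timers.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Records a message when its body actually runs.
async fn work(log: &RefCell<Vec<&'static str>>) -> u32 {
    log.borrow_mut().push("body ran");
    42
}

/// Returns the order of events and the value produced by the future.
fn lazy_demo() -> (Vec<&'static str>, u32) {
    let log = RefCell::new(Vec::new());
    let fut = work(&log); // creates the future; the body has not run yet
    log.borrow_mut().push("future created");
    let value = block_on(fut); // the body runs only now
    (log.into_inner(), value)
}
```

Calling `lazy_demo()` records "future created" before "body ran", which shows that calling `work` alone did nothing.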
Tokio runtime
Asynchronous functions must be executed by a runtime. The runtime contains the asynchronous task scheduler, provides evented I/O, timers, etc. The runtime does not automatically start, so the main function needs to start it.
TCP listener with async
Let’s implement mini-redis.
tokio::net::TcpListener is an async TCP listener:
use tokio::net::{TcpListener, TcpStream};
use mini_redis::{Connection, Frame};
#[tokio::main]
async fn main() {
    // Bind the listener to the address.
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
    loop {
        // The second item contains the IP and port of the new connection.
        let (socket, _) = listener.accept().await.unwrap();
        process(socket).await;
    }
}
async fn process(socket: TcpStream) {
    // The `Connection` lets us read/write redis "frames" instead of byte streams.
    let mut connection = Connection::new(socket);
    if let Some(frame) = connection.read_frame().await.unwrap() {
        println!("GOT: {:?}", frame);
        // Respond with an error for now.
        let response = Frame::Error("unimplemented".to_string());
        connection.write_frame(&response).await.unwrap();
    }
}
After cargo run, try mini-redis-cli get foo; it returns Error: "unimplemented", whereas the terminal where you ran cargo run prints GOT: Array([Bulk(b"get"), Bulk(b"foo")]).
- In the Tokio crate:
    pub async fn bind<A: ToSocketAddrs>(addr: A) -> Result<TcpListener>
    pub async fn accept(&self) -> Result<(TcpStream, SocketAddr)>
- The loop does not spin. Each iteration waits at listener.accept().await, and that future does not complete until a client connects.
- The snippet above handles only one connection at a time, because the loop awaits process(socket) before it accepts the next connection. We will improve it now.
Accept multiple requests without blocking
Wrap process with tokio::spawn as follows:
loop {
    let (socket, _) = listener.accept().await.unwrap();
    // A new task is spawned for each inbound socket.
    tokio::spawn(async move {
        process(socket).await;
    });
}
- A Tokio task is an asynchronous green thread. Tasks are created by passing an async block to tokio::spawn.
- Tasks are the unit of execution managed by the scheduler.
- The tokio::spawn function returns a JoinHandle, which the caller may use to interact with the spawned task (same as std::thread::spawn).
- Spawning the task submits it to the Tokio scheduler, which then ensures that the task executes when it has work to do. Unlike a bare future, a spawned task starts running in the background even if its JoinHandle is never awaited.
- Note that we moved the socket into the task with async move.
- The task can also be moved between threads after being spawned.
- Tasks in Tokio are very lightweight. Under the hood, they require only a single allocation and 64 bytes of memory.
- When a task suspends at an .await, it “yields” back to the scheduler.
Here is another async example that uses the JoinHandle explicitly:
#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        // Do some async work
        "return value"
    });
    // Do some other work
    let out = handle.await.unwrap();
    println!("GOT {}", out);
}
Awaiting a JoinHandle returns a Result: Ok with the task’s return value, or Err if the task panicked or was cancelled. The spawned task may be executed on the same thread as where it was spawned, or it may execute on a different runtime thread.
As with std::thread::spawn, tasks spawned by tokio::spawn must implement Send. This allows the Tokio runtime to move the tasks between threads while they are suspended at an .await.
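Since the note compares tokio::spawn with std::thread::spawn, here is a small dependency-free sketch of the standard-library side of that analogy. It only shows that joining a handle yields a Result; the function name `join_outcomes` is made up for this note, and Tokio's JoinHandle is awaited rather than joined.

```rust
use std::thread;

/// Returns the value of a task that finished normally, and whether
/// a second, panicking task was reported as an error when joined.
fn join_outcomes() -> (Option<&'static str>, bool) {
    // A task that completes: `join` gives `Ok("return value")`.
    let ok = thread::spawn(|| "return value").join();
    // A task that panics: `join` gives `Err(..)` instead of
    // taking the caller down with it.
    let failed = thread::spawn(|| -> u32 { panic!("task failed") }).join();
    (ok.ok(), failed.is_err())
}
```

The panic message of the second thread is still printed to stderr, but the caller keeps running and can decide what to do with the error.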
Data shared between threads
Send bound
Tasks spawned by tokio::spawn must implement Send.
This allows the Tokio runtime to move the tasks between threads while they are suspended at an .await.
Spawned tasks must not contain any references to data owned outside the task.
If a single piece of data must be accessible from more than one task concurrently, then it must be shared using synchronization primitives such as Arc.
The next time the task is executed, it resumes from the point it last yielded.
To make this work, all state that is used after .await must be saved by the task.
Tasks are Send when all data that is held across .await calls is Send.
If any data held across an .await is not Send, the compiler reports an error.
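The rule about data held across .await can be checked at compile time without Tokio. The sketch below assumes a hypothetical helper `is_send` that accepts only Send futures (one of the bounds tokio::spawn places on its argument) and uses `std::future::ready` as a stand-in for real async work.

```rust
use std::future::{ready, Future};
use std::rc::Rc;

/// Hypothetical helper: accepts only futures that are `Send`.
fn is_send<F: Future + Send>(_fut: &F) -> bool {
    true
}

/// The `Rc` lives in an inner scope and is dropped before the `.await`,
/// so it is not part of the state saved across the suspension point.
async fn rc_dropped_before_await() -> usize {
    let len = {
        let rc = Rc::new(String::from("hello"));
        rc.len()
    };
    ready(()).await;
    len
}

// For contrast, this version is rejected by `is_send`, because `rc`
// (which is not `Send`) is still alive across the `.await`:
//
// async fn rc_held_across_await() -> usize {
//     let rc = Rc::new(String::from("hello"));
//     ready(()).await;
//     rc.len()
// }

/// Compiles only because the future above is `Send`.
fn send_demo() -> bool {
    is_send(&rc_dropped_before_await())
}
```

Uncommenting the second function and passing it to `is_send` should produce the kind of "future cannot be sent between threads safely" error mentioned above.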
Shared/Persistent data in memory
Usually, there are two techniques to share data between threads:
- Guard the shared state with a Mutex. (for simple data)
- Spawn a task to manage the state and use message passing to operate on it. (when asynchronous work is required, such as I/O primitives)
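The second technique can be sketched without any dependency by using a thread and std::sync::mpsc in place of a Tokio task and a Tokio channel. This is only an illustration of the shape; the `Command` enum and the function names are hypothetical and not taken from mini-redis.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Requests understood by the manager (hypothetical names).
enum Command {
    Set { key: String, value: String },
    Get { key: String, reply: Sender<Option<String>> },
}

/// Spawns the manager, which is the only owner of the map.
fn spawn_manager() -> (Sender<Command>, JoinHandle<()>) {
    let (tx, rx) = mpsc::channel::<Command>();
    let handle = thread::spawn(move || {
        let mut db: HashMap<String, String> = HashMap::new();
        // The loop ends once every sender has been dropped.
        for cmd in rx {
            match cmd {
                Command::Set { key, value } => {
                    db.insert(key, value);
                }
                Command::Get { key, reply } => {
                    // Ignore the error if the requester went away.
                    let _ = reply.send(db.get(&key).cloned());
                }
            }
        }
    });
    (tx, handle)
}

/// Sends a `Get` and waits for the manager's answer.
fn get(tx: &Sender<Command>, key: &str) -> Option<String> {
    let (reply, answer) = mpsc::channel();
    tx.send(Command::Get { key: key.to_string(), reply }).unwrap();
    answer.recv().unwrap()
}

fn message_passing_demo() -> (Option<String>, Option<String>) {
    let (tx, manager) = spawn_manager();
    tx.send(Command::Set { key: "hello".into(), value: "world".into() }).unwrap();
    let hit = get(&tx, "hello");
    let miss = get(&tx, "missing");
    drop(tx); // closing the channel lets the manager exit
    manager.join().unwrap();
    (hit, miss)
}
```

No lock is needed here because only the manager ever touches the map; everyone else just sends it messages.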
Hands on
- We’re going to use the bytes crate for TCP data frames.
  - The crate is developed by the Tokio project. The goal of Bytes is to provide a robust byte array structure for network programming.
  - The Bytes type is roughly an Arc<Vec<u8>> but with some added capabilities.
- Create the database as type Db = Arc<Mutex<HashMap<String, Bytes>>>;
- The DB hash map is created in main and shared with every task.
- Note that, throughout Tokio, the term handle is used to reference a value that provides access to some shared state.
  - In our case, the reference-counted Arc pointing to the database object is a handle.
  - We clone the handle before spawning a task, and pass the clone to the task.
  - Before a task operates on the database, it needs to explicitly lock() the Mutex behind the handle.
Rewrite the main function as follows:
use bytes::Bytes;
use mini_redis::{Connection, Frame};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::net::{TcpListener, TcpStream};
// Shared handle to the in-memory database.
type Db = Arc<Mutex<HashMap<String, Bytes>>>;
#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
    println!("Listening");
    let db: Db = Arc::new(Mutex::new(HashMap::new()));
    loop {
        let (socket, _) = listener.accept().await.unwrap();
        // Clone the handle (not the map itself) for the new task.
        let db = db.clone();
        println!("Accepted");
        tokio::spawn(async move {
            process(socket, db).await;
        });
    }
}
Rewrite the process function as follows:
async fn process(socket: TcpStream, db: Db) {
    use mini_redis::Command::{self, Get, Set};
    let mut connection = Connection::new(socket);
    while let Some(frame) = connection.read_frame().await.unwrap() {
        let response = match Command::from_frame(frame).unwrap() {
            Set(cmd) => {
                // The guard is dropped at the end of this arm, before the next .await.
                let mut db = db.lock().unwrap();
                db.insert(cmd.key().to_string(), cmd.value().clone());
                Frame::Simple("OK".to_string())
            }
            Get(cmd) => {
                let db = db.lock().unwrap();
                if let Some(value) = db.get(cmd.key()) {
                    Frame::Bulk(value.clone())
                } else {
                    Frame::Null
                }
            }
            cmd => panic!("unimplemented {:?}", cmd),
        };
        connection.write_frame(&response).await.unwrap();
    }
}
Upgrade DB
Contention
If contention on a synchronous mutex becomes a problem, the best fix is rarely to switch std::sync::Mutex to tokio::sync::Mutex. Options to consider first are moving the state into a dedicated task with message passing, sharding the mutex, or restructuring the code to avoid the mutex.
tokio::sync::Mutex acts similarly to std::sync::Mutex, with two major differences: lock is an async method, so it does not block the thread, and the lock guard is designed to be held across .await points.
The std::sync::MutexGuard type is not Send.
This means that you can’t send a mutex guard to another thread, so a spawned task that holds one across an .await fails to compile, because the Tokio runtime can move a task between threads at every .await.
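One way to keep a std::sync::MutexGuard away from .await points is to lock only inside non-async methods of a wrapper struct. The sketch below is a dependency-free illustration: `Counter`, `require_send`, and the toy `block_on` executor are hypothetical names for this note, and `std::future::ready` stands in for real async work.

```rust
use std::future::{ready, Future};
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy single-future executor, used only to run the example.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// The mutex is only ever locked inside non-async methods,
/// so a guard can never live across an `.await`.
struct Counter {
    value: Mutex<i32>,
}

impl Counter {
    fn increment(&self) -> i32 {
        let mut guard = self.value.lock().unwrap();
        *guard += 1;
        *guard
    } // guard dropped here
}

/// Accepts only `Send` futures, like the bound on `tokio::spawn`.
fn require_send<F: Future + Send>(fut: F) -> F {
    fut
}

async fn increment_and_wait(counter: &Counter) -> i32 {
    let n = counter.increment();
    ready(()).await; // stand-in for real async work
    n
}

fn counter_demo() -> i32 {
    let counter = Counter { value: Mutex::new(0) };
    block_on(require_send(increment_and_wait(&counter)));
    block_on(require_send(increment_and_wait(&counter)))
}
```

Because the guard never escapes `increment`, the future returned by `increment_and_wait` stays Send and passes `require_send`.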
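Shard
Sharding splits the map into several independently locked maps, so tasks that touch different keys rarely wait on the same lock.
Cf. The dashmap crate provides an implementation of a more sophisticated sharded hash map.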
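A hand-rolled version of the idea might look like the sketch below, which uses only the standard library. The names `ShardedDb`, `new_sharded_db`, `shard_index`, `set`, and `get` are illustrative, values are stored as Vec<u8> instead of Bytes to avoid a dependency, and this is not how dashmap is implemented.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Mutex};

/// Each shard is its own map behind its own mutex.
type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;

fn new_sharded_db(num_shards: usize) -> ShardedDb {
    assert!(num_shards > 0, "at least one shard is required");
    let mut shards = Vec::with_capacity(num_shards);
    for _ in 0..num_shards {
        shards.push(Mutex::new(HashMap::new()));
    }
    Arc::new(shards)
}

/// Picks the shard for a key by hashing it.
fn shard_index(key: &str, num_shards: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() as usize) % num_shards
}

fn set(db: &ShardedDb, key: &str, value: Vec<u8>) {
    // Only the shard that owns this key is locked.
    let mut shard = db[shard_index(key, db.len())].lock().unwrap();
    shard.insert(key.to_string(), value);
}

fn get(db: &ShardedDb, key: &str) -> Option<Vec<u8>> {
    let shard = db[shard_index(key, db.len())].lock().unwrap();
    shard.get(key).cloned()
}
```

Two tasks calling `set` on keys that hash to different shards take different locks, so they do not block each other.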
Old note (dirty)
About futures crate
Why is the crate named “futures”? Because async jobs are executed in the future (in other words, not immediately when control flow reaches them).
- The return value of an async fn is an anonymous type that implements the Future trait.
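To make the Future trait a bit more concrete, the sketch below polls both a hand-written future and the anonymous future returned by an async fn through the same generic function. `Countdown`, `NoopWaker`, `poll_until_ready`, and `answer` are hypothetical names for this note, and the busy-polling loop is a toy, not how a real runtime waits.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A hand-written future that is pending a fixed number of times.
struct Countdown(u32);

impl Future for Countdown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.0 == 0 {
            Poll::Ready("done")
        } else {
            self.0 -= 1;
            // Ask to be polled again.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// A waker that does nothing; enough for a loop that polls eagerly.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls any future until it is ready and counts the polls.
fn poll_until_ready<F: Future>(fut: F) -> (u32, F::Output) {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (polls, out);
        }
    }
}

/// The compiler generates an anonymous `Future` type for this function.
async fn answer() -> u32 {
    42
}
```

Both `poll_until_ready(Countdown(2))` and `poll_until_ready(answer())` type-check, because the only thing the function asks of its argument is that it implements Future.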
Other notes
- Tasks are the unit of execution managed by the scheduler (of tokio). Spawning the task submits it to the Tokio scheduler, which then ensures that the task executes when it has work to do.
When you spawn a task on the Tokio runtime, its type’s lifetime must be 'static.
This means that the spawned task must not contain any references to data owned outside the task.
cf) https://github.com/pretzelhammer/rust-blog/blob/master/posts/common-rust-lifetime-misconceptions.md#2-if-t-static-then-t-must-be-valid-for-the-entire-program
In any case, a spawned task is independent of the code that spawned it; to use outside data inside it, use move to transfer ownership.
If a single piece of data must be accessible from more than one task concurrently, then it must be shared using synchronization primitives such as Arc.
The compiler is unable to reason about how long a newly spawned task stays around, so the only way it can be sure that the task doesn’t live too long is to make sure it may live forever.
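std::thread::spawn has the same 'static requirement as tokio::spawn, so the two ways out (move ownership in, or share through an Arc) can be tried without Tokio. The function names below are made up for this sketch.

```rust
use std::sync::Arc;
use std::thread;

/// Ownership of `v` moves into the spawned closure, so nothing inside
/// it borrows from this function's stack frame.
fn sum_owned(v: Vec<u32>) -> u32 {
    // Without `move`, the closure would borrow `v` and be rejected,
    // because the thread might outlive this function.
    let handle = thread::spawn(move || v.iter().sum::<u32>());
    handle.join().unwrap()
}

/// When both sides need the data, each one holds its own `Arc` handle.
fn len_shared(data: Arc<Vec<u32>>) -> (usize, usize) {
    let for_task = Arc::clone(&data);
    let handle = thread::spawn(move || for_task.len());
    let task_len = handle.join().unwrap();
    // `data` is still usable here because only the clone was moved.
    (task_len, data.len())
}
```

In both functions the spawned closure owns everything it uses, which is exactly what the 'static bound asks for.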
Checking the main loop:
#[tokio::main]
async fn main() {
    // Bind the listener to the address
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
    let mut i = 0u32;
    let mut j = 0u32;
    loop {
        i += 1;
        println!("i-loop: {i}");
        // The second item contains the ip and port of the new connection.
        let (socket, _) = listener.accept().await.unwrap();
        //process::exit(1);
        // A new task is spawned for each inbound socket. The socket is
        // moved to the new task and processed there.
        tokio::spawn(async move {
            // `socket` is moved into the task; to share data instead, use Arc.
            // `j` is a u32 (Copy), so each task gets its own copy of it.
            process(socket).await;
            j += 1;
            println!("j-loop: {j}");
        });
    }
}
warning: `test` (bin "test") generated 1 warning
Finished dev [unoptimized + debuginfo] target(s) in 0.02s
Running `target/debug/test`
i-loop: 1
i-loop: 2
j-loop: 1
i-loop: 3
j-loop: 1
i-loop: 4
j-loop: 1
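In the log above, j-loop always prints 1. My reading is that `j` is a u32, which is Copy, so every `async move` block gets its own copy starting from 0. The sketch below reproduces the effect with plain threads (standing in for Tokio tasks) and shows one way to get a counter that is really shared; the function names are hypothetical.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::thread;

/// Each closure receives a copy of `j`, so every task sees 0 and reports 1.
fn copied_counter(tasks: u32) -> Vec<u32> {
    let j = 0u32;
    let mut seen = Vec::new();
    for _ in 0..tasks {
        // `move` copies `j`; the outer `j` is never changed.
        let handle = thread::spawn(move || {
            let mut local = j;
            local += 1;
            local
        });
        seen.push(handle.join().unwrap());
    }
    seen
}

/// All tasks increment the same atomic through their own `Arc` handle.
fn shared_counter(tasks: u32) -> Vec<u32> {
    let j = Arc::new(AtomicU32::new(0));
    let mut seen = Vec::new();
    for _ in 0..tasks {
        let j = Arc::clone(&j);
        let handle = thread::spawn(move || j.fetch_add(1, Ordering::SeqCst) + 1);
        // Joining right away keeps the order deterministic for this demo.
        seen.push(handle.join().unwrap());
    }
    seen
}
```

With three tasks, `copied_counter` reports 1 each time, like the log above, while `shared_counter` counts up.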
Tokio Send issue: wrap the mutex in your own struct. https://tokio.rs/tokio/tutorial/shared-state#restructure-your-code-to-not-hold-the-lock-across-an-await
RwLock and mutex
https://doc.rust-lang.org/std/sync/struct.RwLock.html
In comparison, a Mutex does not distinguish between readers or writers that acquire the lock, therefore blocking any threads waiting for the lock to become available. An RwLock will allow any number of readers to acquire the lock as long as a writer is not holding the lock.
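A quick way to observe the difference is to hold two read guards at once and then probe for a writer with the non-blocking try_write. This is a small standard-library sketch; `rwlock_demo` is a name made up for this note.

```rust
use std::sync::RwLock;

/// Returns (sum read through two readers, writer refused while readers
/// are active, writer accepted after the readers are gone).
fn rwlock_demo() -> (i32, bool, bool) {
    let lock = RwLock::new(5);

    // Two read guards can be held at the same time.
    let r1 = lock.read().unwrap();
    let r2 = lock.read().unwrap();
    let sum = *r1 + *r2;

    // A writer cannot get in while any reader holds the lock.
    let writer_refused = lock.try_write().is_err();

    drop(r1);
    drop(r2);

    // Once the readers are released, the writer succeeds.
    let writer_accepted = lock.try_write().is_ok();

    (sum, writer_refused, writer_accepted)
}
```

With a Mutex, the second `lock()` on the same thread would not be allowed to succeed, because a Mutex treats readers and writers the same.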
https://riptutorial.com/rust/example/24527/read-write-locks
RAII guard
https://rust-unofficial.github.io/patterns/patterns/behavioural/RAII.html
Chatting app
https://www.youtube.com/watch?v=4DqP57BHaXI