Tokio - Rust Concurrency (draft)
I have started exploring the Tokio framework.
https://tokio.rs/tokio/tutorial
You can find all the official tutorial code here.
What is Tokio, ultimately?
- Asynchronous Rust code does not run on its own, so you must choose a runtime to execute it.
- Tokio is an asynchronous runtime for the Rust programming language.
Hands-on tutorial with a Redis clone
We’ll run a clone of the Redis server. While the server is running, it occupies the terminal, so your prompt will be blocked.
For those who don’t know Redis: it is a key-value store server, and clients talk to it using the Redis protocol over TCP/IP.
Overview:
- Key-value pairs are stored in a HashMap.
- The client SDK is provided for the purposes of the tutorial.
Run the Redis clone and test it
Install the mini-redis server:
cargo install mini-redis
Now you can run the Redis server with the mini-redis-server command.
After starting mini-redis-server, open another terminal and confirm that the Redis server is running:
➜ mini-redis-cli get foo
(nil)
We’ve tested the mini-redis server and client.
Let’s build it ourselves.
TCP client with async
Cargo.toml:
tokio = { version = "^1.7", features = ["full"] }
mini-redis = "0.4"
src/main.rs:
use mini_redis::{client, Result};

#[tokio::main]
async fn main() -> Result<()> {
    // Open a connection to the mini-redis server.
    let mut client = client::connect("127.0.0.1:6379").await?;

    // Set the key "hello" to the value "world".
    client.set("hello", "world".into()).await?;

    // Read the key back.
    let result = client.get("hello").await?;
    println!("got value from the server; result={:?}", result);

    Ok(())
}
Run mini-redis-server in another terminal, and test the client:
$ cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.02s
Running `target/debug/my-redis`
got value from the server; result=Some(b"world")
Quick explanation:
- In the mini-redis crate:
  pub async fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<Client>
  ...
  pub async fn get(&mut self, key: &str) -> crate::Result<Option<Bytes>>
- Rust transforms an async fn at compile time into a routine that operates asynchronously. For example, #[tokio::main] is a macro that transforms async fn main() into a synchronous fn main() that initializes a runtime instance and executes the async main function.
  Before:
  #[tokio::main]
  async fn main() {
      println!("hello");
  }
  Transformed:
  fn main() {
      let rt = tokio::runtime::Runtime::new().unwrap();
      rt.block_on(async {
          println!("hello");
      })
  }
- Calling an async fn returns a value representing the operation. This is conceptually analogous to a zero-argument closure. To actually run the operation, you should use the .await operator on the return value.
- Rust’s async operations are “lazy”: they don’t run until you explicitly .await them.
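The laziness described above can be checked without Tokio. The following is a minimal sketch, not how Tokio works internally: block_on here is a hypothetical busy-polling executor built only on the standard library, and say_hello and lazy_demo are illustrative names. It records the order of events to show that the body of an async fn does not run when the function is called, only when the returned future is driven.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; enough for a busy-polling toy executor.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical minimal executor: polls the future in a loop until it is ready.
// A real runtime such as Tokio parks the thread instead of spinning.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        std::thread::yield_now();
    }
}

// The body only runs once the returned future is polled.
async fn say_hello(log: &RefCell<Vec<&'static str>>) {
    log.borrow_mut().push("inside async fn");
}

fn lazy_demo() -> Vec<&'static str> {
    let log = RefCell::new(Vec::new());

    // Calling the async fn only builds the future; nothing is logged yet.
    let fut = say_hello(&log);
    log.borrow_mut().push("future created");

    // Driving the future runs its body.
    block_on(fut);
    log.borrow_mut().push("future completed");

    log.into_inner()
}
```

Calling lazy_demo() yields the events in the order "future created", "inside async fn", "future completed", which is the order you would expect if the call itself did no work.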
Tokio runtime
Asynchronous functions must be executed by a runtime. The runtime contains the asynchronous task scheduler, provides evented I/O, timers, etc. The runtime does not automatically start, so the main function needs to start it.
TCP listener with async
Let’s implement mini-redis.
tokio::net::TcpListener is an async TCP listener:
use tokio::net::{TcpListener, TcpStream};
use mini_redis::{Connection, Frame};

#[tokio::main]
async fn main() {
    // Bind the listener to the address.
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    loop {
        // The second item contains the IP and port of the new connection.
        let (socket, _) = listener.accept().await.unwrap();
        process(socket).await;
    }
}

async fn process(socket: TcpStream) {
    // The `Connection` lets us read/write redis "frames" instead of byte streams.
    let mut connection = Connection::new(socket);

    if let Some(frame) = connection.read_frame().await.unwrap() {
        println!("GOT: {:?}", frame);

        // Respond with an error for now.
        let response = Frame::Error("unimplemented".to_string());
        connection.write_frame(&response).await.unwrap();
    }
}
After cargo run, try mini-redis-cli get foo and it will return Error: "unimplemented", whereas the terminal in which you ran cargo run prints GOT: Array([Bulk(b"get"), Bulk(b"foo")]).
- In the Tokio crate:
  pub async fn bind<A: ToSocketAddrs>(addr: A) -> Result<TcpListener>
  pub async fn accept(&self) -> Result<(TcpStream, SocketAddr)>
- The loop body doesn’t proceed until the TCP listener gets a connection. The .await on listener.accept() drives that function, which contains another async operation inside. Since the inner operation doesn’t complete until a connection arrives, listener.accept().await doesn’t return until then.
- The snippet above handles only one connection at a time. We will improve it now.
Accept multiple requests without blocking
Wrap process with tokio::spawn as follows:
loop {
    let (socket, _) = listener.accept().await.unwrap();
    // A new task is spawned for each inbound socket.
    tokio::spawn(async move {
        process(socket).await;
    });
}
- A Tokio task is an asynchronous green thread. Tasks are created by passing an async block to tokio::spawn.
- Tasks are the unit of execution managed by the scheduler.
- The tokio::spawn function returns a JoinHandle, which the caller may use to interact with the spawned task (same as std::thread::spawn).
- Spawning the task submits it to the Tokio scheduler, which then ensures that the task executes when it has work to do. Unlike a bare future, a spawned task starts running in the background even if the caller never awaits its JoinHandle.
- Note that we moved the socket into the task.
- The task can also be moved between threads after being spawned.
- Tasks in Tokio are very lightweight. Under the hood, they require only a single allocation and 64 bytes of memory.
- When a task reaches an .await that cannot complete yet, it “yields” back to the scheduler.
Here is another async execution example that shows the handle explicitly:
#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        // Do some async work
        "return value"
    });

    // Do some other work

    let out = handle.await.unwrap();
    println!("GOT {}", out);
}
Awaiting a JoinHandle returns a Result (an Err means the task panicked or was cancelled). The spawned task may be executed on the same thread as where it was spawned, or it may execute on a different runtime thread.
As with std::thread::spawn, tasks spawned by tokio::spawn must implement Send; the Send bound section below covers why.
Data shared between threads
Send bound
Tasks spawned by tokio::spawn must implement Send. This allows the Tokio runtime to move the tasks between threads while they are suspended at an .await.
Spawned tasks must not contain any references to data owned outside the task. If a single piece of data must be accessible from more than one task concurrently, then it must be shared using synchronization primitives such as Arc.
When a task yields at an .await and is later resumed, it continues from the point where it last yielded. To make this work, all state that is used after the .await must be saved by the task.
Tasks are Send when all data that is held across .await calls is Send. If any such data is not Send, the compiler reports an error.
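The rule about data held across .await can be checked at compile time without a runtime. This is a minimal sketch using only the standard library: is_send, yield_point and rc_dropped_before_await are hypothetical helper names, and is_send stands in for the Send bound that tokio::spawn places on its argument.

```rust
use std::rc::Rc;

// Stand-in for any real await point (hypothetical helper).
async fn yield_point() {}

// `Rc` is not `Send`, but it is dropped before the `.await`,
// so it is not part of the state saved across the await point.
async fn rc_dropped_before_await() -> usize {
    let len = {
        let rc = Rc::new(String::from("hello"));
        rc.len()
    }; // `rc` is dropped here
    yield_point().await;
    len
}

// Compiles only when `T: Send`; mirrors the bound required by `tokio::spawn`.
fn is_send<T: Send>(_: &T) -> bool {
    true
}

// For contrast, this variant keeps `rc` alive across the `.await`, so
// `is_send(&rc_held_across_await())` would be rejected by the compiler:
//
// async fn rc_held_across_await() -> usize {
//     let rc = Rc::new(String::from("hello"));
//     yield_point().await; // `rc` is still alive here
//     rc.len()
// }
```

Passing rc_dropped_before_await() to is_send compiles, which is the same check the compiler performs when the future is handed to tokio::spawn.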
Shared/Persistent data in memory
Usually, there are two techniques for sharing data between threads:
- Guard the shared state with a Mutex. (for simple data)
- Spawn a task to manage the state and use message passing to operate on it. (when asynchronous work is required, such as with I/O primitives)
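The second technique can be sketched as follows. To keep the example free of external dependencies it swaps in std::thread and std::sync::mpsc for a Tokio task and a Tokio channel, so treat it as an illustration of the pattern rather than the tutorial's implementation. Command, spawn_manager and get_value are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

// Requests understood by the manager. `Get` carries a channel for the reply.
enum Command {
    Set { key: String, value: String },
    Get { key: String, reply: mpsc::Sender<Option<String>> },
}

// Spawn the manager: it is the only owner of the map, so no lock is needed.
fn spawn_manager() -> (mpsc::Sender<Command>, thread::JoinHandle<()>) {
    let (tx, rx) = mpsc::channel::<Command>();
    let handle = thread::spawn(move || {
        let mut db: HashMap<String, String> = HashMap::new();
        // The loop ends once every sender has been dropped.
        for cmd in rx {
            match cmd {
                Command::Set { key, value } => {
                    db.insert(key, value);
                }
                Command::Get { key, reply } => {
                    // Ignore the error if the requester has gone away.
                    let _ = reply.send(db.get(&key).cloned());
                }
            }
        }
    });
    (tx, handle)
}

// Client-side helper: send a `Get` and wait for the answer.
fn get_value(tx: &mpsc::Sender<Command>, key: &str) -> Option<String> {
    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(Command::Get { key: key.to_string(), reply: reply_tx })
        .unwrap();
    reply_rx.recv().unwrap()
}
```

A caller sends Command::Set through the sender and reads back with get_value; dropping every sender lets the manager thread finish, after which the JoinHandle can be joined.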
Hands on
- We’re going to use the bytes crate for the TCP data frames.
  - The crate is developed by the Tokio project. The goal of Bytes is to provide a robust byte array structure for network programming.
  - The Bytes type is roughly an Arc<Vec<u8>> but with some added capabilities.
- Create the database as type Db = Arc<Mutex<HashMap<String, Bytes>>>;
- The DB hash map is created in main and shared with every task.
- Note that, throughout Tokio, the term handle is used to reference a value that provides access to some shared state.
  - In our case, the reference-counted Arc pointing to the database object is a handle.
  - We will clone the handle before spawning each task, and pass the clone to the task.
  - Before a task operates on the data, it needs to explicitly lock() the mutex.
Rewrite the main function as follows:
use bytes::Bytes;
use mini_redis::{Connection, Frame};
use tokio::net::{TcpListener, TcpStream};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Handle to the shared database.
type Db = Arc<Mutex<HashMap<String, Bytes>>>;

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
    println!("Listening");

    let db = Arc::new(Mutex::new(HashMap::new()));

    loop {
        let (socket, _) = listener.accept().await.unwrap();
        // Clone the handle (not the map) for the new task.
        let db = db.clone();

        println!("Accepted");
        tokio::spawn(async move {
            process(socket, db).await;
        });
    }
}
Rewrite the process function as follows:
async fn process(socket: TcpStream, db: Db) {
    use mini_redis::Command::{self, Get, Set};

    let mut connection = Connection::new(socket);

    while let Some(frame) = connection.read_frame().await.unwrap() {
        let response = match Command::from_frame(frame).unwrap() {
            Set(cmd) => {
                // The guard is dropped at the end of this arm, before the next .await.
                let mut db = db.lock().unwrap();
                db.insert(cmd.key().to_string(), cmd.value().clone());
                Frame::Simple("OK".to_string())
            }
            Get(cmd) => {
                let db = db.lock().unwrap();
                if let Some(value) = db.get(cmd.key()) {
                    Frame::Bulk(value.clone())
                } else {
                    Frame::Null
                }
            }
            cmd => panic!("unimplemented {:?}", cmd),
        };

        connection.write_frame(&response).await.unwrap();
    }
}
Upgrade DB
Contention
If contention on a synchronous mutex becomes a problem, the best fix is rarely to switch from std::sync::Mutex to tokio::sync::Mutex.
tokio::sync::Mutex acts similarly to std::sync::Mutex, with two major differences: lock is an async method, so it does not block the thread, and the lock guard is designed to be held across .await points.
The std::sync::MutexGuard type is not Send. This means that you can’t send a mutex lock to another thread, so a task that holds the guard across an .await fails to compile, because the Tokio runtime can move a task between threads at every .await.
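One way to keep a std::sync::MutexGuard away from .await points is to wrap the mutex in a struct and lock it only inside synchronous methods. The sketch below follows that idea using only the standard library; do_something_async is a placeholder for real async work, and future_is_send is a hypothetical helper standing in for the Send bound of tokio::spawn.

```rust
use std::sync::Mutex;

struct CanIncrement {
    mutex: Mutex<i32>,
}

impl CanIncrement {
    // Synchronous method: the guard is created and dropped inside it,
    // so it can never be alive across an `.await`.
    fn increment(&self) -> i32 {
        let mut lock = self.mutex.lock().unwrap();
        *lock += 1;
        *lock
    }
}

// Placeholder for real asynchronous work.
async fn do_something_async() {}

async fn increment_and_do_stuff(can_incr: &CanIncrement) -> i32 {
    let value = can_incr.increment();
    do_something_async().await; // no MutexGuard is held here
    value
}

// Compiles only when `T: Send`; mirrors the bound required by `tokio::spawn`.
fn future_is_send<T: Send>(_: &T) -> bool {
    true
}
```

Because the guard never appears in the async function's body, the resulting future passes the future_is_send check.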
Shard
Cf. The dashmap crate provides an implementation of a more sophisticated sharded hash map.
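For intuition, a hand-rolled sharded map might look like the sketch below. It is a simplified illustration built on the standard library, not how dashmap is implemented. ShardedDb, new_sharded_db, shard_index, shard_insert and shard_get are hypothetical names, and values are stored as Vec<u8> instead of Bytes to avoid extra dependencies. Each key is hashed to pick one of several independently locked maps, so tasks touching different shards do not contend on the same mutex.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Mutex};

// One mutex per shard instead of one mutex for the whole map.
type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;

// `num_shards` must be greater than zero.
fn new_sharded_db(num_shards: usize) -> ShardedDb {
    let mut shards = Vec::with_capacity(num_shards);
    for _ in 0..num_shards {
        shards.push(Mutex::new(HashMap::new()));
    }
    Arc::new(shards)
}

// Hash the key to decide which shard owns it.
fn shard_index(key: &str, num_shards: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() as usize) % num_shards
}

fn shard_insert(db: &ShardedDb, key: &str, value: Vec<u8>) {
    // Only the shard that owns the key is locked.
    let mut shard = db[shard_index(key, db.len())].lock().unwrap();
    shard.insert(key.to_string(), value);
}

fn shard_get(db: &ShardedDb, key: &str) -> Option<Vec<u8>> {
    let shard = db[shard_index(key, db.len())].lock().unwrap();
    shard.get(key).cloned()
}
```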
Old note (dirty)
About the futures crate
Why is the crate named “futures”? Because async jobs are executed in the future (in other words, not immediately when control flow reaches them).
- The return value of an async fn is an anonymous type that implements the Future trait.
Other
- Tasks are the unit of execution managed by the scheduler (of Tokio). Spawning the task submits it to the Tokio scheduler, which then ensures that the task executes when it has work to do.
When you spawn a task on the Tokio runtime, its type’s lifetime must be 'static. This means that the spawned task must not contain any references to data owned outside the task.
cf) https://github.com/pretzelhammer/rust-blog/blob/master/posts/common-rust-lifetime-misconceptions.md#2-if-t-static-then-t-must-be-valid-for-the-entire-program
Anyway, a task is independent; if it needs data from its surroundings, use move to transfer ownership into it.
If a single piece of data must be accessible from more than one task concurrently, then it must be shared using synchronization primitives such as Arc.
The compiler is unable to reason about how long a newly spawned task stays around, so the only way it can be sure that the task doesn’t live too long is to make sure it may live forever.
Check the main loop:
#[tokio::main]
async fn main() {
    // Bind the listener to the address
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
    let mut i = 0u32;
    let mut j = 0u32;

    loop {
        i += 1;
        println!("i-loop: {i}");
        // The second item contains the ip and port of the new connection.
        let (socket, _) = listener.accept().await.unwrap();
        //process::exit(1);
        // A new task is spawned for each inbound socket. The socket is
        // moved to the new task and processed there.
        tokio::spawn(async move { // The socket is moved in. To share data instead, use Arc.
            process(socket).await;
            j += 1;
            println!("j-loop: {j}");
        });
    }
}
warning: `test` (bin "test") generated 1 warning
Finished dev [unoptimized + debuginfo] target(s) in 0.02s
Running `target/debug/test`
i-loop: 1
i-loop: 2
j-loop: 1
i-loop: 3
j-loop: 1
i-loop: 4
j-loop: 1
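The j-loop value stays at 1 because u32 is Copy: each async move block receives its own copy of j (still 0), increments that copy, and the outer j is never touched. The sketch below reproduces the effect and shows one way to get a counter that is really shared. It uses std::thread::spawn with a move closure instead of tokio::spawn so that it runs with the standard library alone; the capture rules are the same. count_in_tasks is a hypothetical name.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::thread;

// Returns (outer j after the loop, the j seen by each task, the shared total).
fn count_in_tasks(n: u32) -> (u32, Vec<u32>, u32) {
    let mut j = 0u32;
    let shared = Arc::new(AtomicU32::new(0));
    let mut handles = Vec::new();

    for _ in 0..n {
        // Clone the handle so each task owns a reference to the same counter.
        let shared = Arc::clone(&shared);
        handles.push(thread::spawn(move || {
            // `j` was copied into the closure; this changes only the copy.
            j += 1;
            // The atomic behind the `Arc` is the same object for every task.
            shared.fetch_add(1, Ordering::SeqCst);
            j
        }));
    }

    let per_task: Vec<u32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    (j, per_task, shared.load(Ordering::SeqCst))
}
```

With n = 3 every task reports 1 for its private j, the outer j is still 0, and only the Arc<AtomicU32> counter reaches 3.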
Tokio Send issue: wrap the mutex in your own struct. https://tokio.rs/tokio/tutorial/shared-state#restructure-your-code-to-not-hold-the-lock-across-an-await
RwLock and mutex
https://doc.rust-lang.org/std/sync/struct.RwLock.html
In comparison, a Mutex does not distinguish between readers or writers that acquire the lock, therefore blocking any threads waiting for the lock to become available. An RwLock will allow any number of readers to acquire the lock as long as a writer is not holding the lock.
https://riptutorial.com/rust/example/24527/read-write-locks
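A small sketch of the reader/writer distinction with std::sync::RwLock follows; rwlock_demo is a hypothetical name. Two read guards coexist, while the write guard needs exclusive access, and each guard releases the lock when it goes out of scope (the RAII pattern).

```rust
use std::sync::RwLock;

// Returns (sum seen by two simultaneous readers, value after one write).
fn rwlock_demo() -> (i32, i32) {
    let lock = RwLock::new(5);

    // Many read guards can be held at the same time.
    let sum = {
        let r1 = lock.read().unwrap();
        let r2 = lock.read().unwrap();
        *r1 + *r2
    }; // both read guards are dropped here

    // A write guard is exclusive; it is released at the end of the block.
    {
        let mut w = lock.write().unwrap();
        *w += 1;
    }

    let after = *lock.read().unwrap();
    (sum, after)
}
```

With a Mutex in place of the RwLock, the second reader would have to wait for the first guard to be dropped.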
RAII guard
https://rust-unofficial.github.io/patterns/patterns/behavioural/RAII.html
Chatting app
https://www.youtube.com/watch?v=4DqP57BHaXI