I have started exploring the Tokio framework.
https://tokio.rs/tokio/tutorial
You can find all the official tutorial code here.
We’ll run a clone of the Redis server. While the server is running, it occupies the terminal prompt, so use a second terminal for the client.
For anyone unfamiliar with Redis: it is a key-value store server, and clients talk to it over TCP/IP using the Redis protocol.
Overview:
HashMap
Install the mini-redis server:
cargo install mini-redis
Now you can run the Redis server with the mini-redis-server command.
After starting mini-redis-server, open another terminal to confirm that the Redis server is running.
➜ mini-redis-cli get foo
(nil)
We’ve tested the mini-redis server and client.
Let’s build them ourselves.
Cargo.toml:
tokio = { version = "^1.7", features = ["full"] }
mini-redis = "0.4"
src/main.rs:
use mini_redis::{client, Result};

#[tokio::main]
async fn main() -> Result<()> {
    // Open a connection to the mini-redis server.
    let mut client = client::connect("127.0.0.1:6379").await?;
    // Set the key "hello" to the value "world".
    client.set("hello", "world".into()).await?;
    // Read the value back.
    let result = client.get("hello").await?;
    println!("got value from the server; result={:?}", result);
    Ok(())
}
Run mini-redis-server in another terminal, and test the client:
$ cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.02s
Running `target/debug/my-redis`
got value from the server; result=Some(b"world")
Quick explanation:
From the mini-redis crate:
pub async fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<Client>
...
pub async fn get(&mut self, key: &str) -> crate::Result<Option<Bytes>>
The compiler transforms an async fn at compile time into a routine that operates asynchronously. For example, #[tokio::main] is a macro that transforms async fn main() into a synchronous fn main() that initializes a runtime instance and executes the async main function.
This code:
#[tokio::main]
async fn main() {
    println!("hello");
}
is transformed into:
fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        println!("hello");
    })
}
Calling an async fn returns a value representing the operation. This is conceptually analogous to a zero-argument closure. To actually run the operation, you should use the .await operator on the return value.
Async functions are lazy (nothing runs until you call .await explicitly).
Asynchronous functions must be executed by a runtime. The runtime contains the asynchronous task scheduler and provides evented I/O, timers, etc. The runtime does not start automatically, so the main function needs to start it.
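The laziness described above can be observed without Tokio. The following is a minimal sketch using only the standard library: `block_on` here is a hypothetical, hand-written stand-in for a runtime (it is not Tokio's `Runtime::block_on`), and `bump` and `lazy_demo` are illustrative names. It shows that calling an async fn produces a future without running its body, and that the body runs only when something polls the future.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the blocked thread when the future can make progress.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical single-future "runtime": polls one future to completion.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

/// Increments the counter and returns the new value.
async fn bump(counter: &AtomicU32) -> u32 {
    counter.fetch_add(1, Ordering::SeqCst) + 1
}

/// Returns (counter before polling, value returned by the future, counter after).
fn lazy_demo() -> (u32, u32, u32) {
    let counter = AtomicU32::new(0);
    // Creating the future does not run the body of `bump`.
    let fut = bump(&counter);
    let before = counter.load(Ordering::SeqCst);
    // Driving the future runs the body.
    let returned = block_on(fut);
    let after = counter.load(Ordering::SeqCst);
    (before, returned, after)
}

fn main() {
    let (before, returned, after) = lazy_demo();
    println!("before={before} returned={returned} after={after}");
}
```

The counter is still 0 after `bump(&counter)` is called and becomes 1 only once the future is polled. A real runtime such as Tokio adds scheduling, I/O, and timers on top of this basic polling loop.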
Let’s implement mini-redis.
tokio::net::TcpListener is an async TCP listener:
use tokio::net::{TcpListener, TcpStream};
use mini_redis::{Connection, Frame};

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
    loop {
        let (socket, _) = listener.accept().await.unwrap();
        process(socket).await;
    }
}

async fn process(socket: TcpStream) {
    // The `Connection` lets us read/write redis "frames" instead of byte streams.
    let mut connection = Connection::new(socket);
    if let Some(frame) = connection.read_frame().await.unwrap() {
        println!("GOT: {:?}", frame);
        let response = Frame::Error("unimplemented".to_string());
        connection.write_frame(&response).await.unwrap();
    }
}
After cargo run, try mini-redis-cli get foo. It returns Error: "unimplemented", whereas the terminal where you ran cargo run prints GOT: Array([Bulk(b"get"), Bulk(b"foo")]).
pub async fn bind<A: ToSocketAddrs>(addr: A) -> Result<TcpListener>
pub async fn accept(&self) -> Result<(TcpStream, SocketAddr)>
The loop does not advance until the TCP listener receives a connection. The .await on listener.accept() drives the future returned by accept(), which in turn waits on another async operation inside. Since that inner operation does not complete until a client connects, listener.accept().await does not return until then.
Wrap process in tokio::spawn as follows:
loop {
    let (socket, _) = listener.accept().await.unwrap();
    tokio::spawn(async move {
        process(socket).await;
    });
}
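A task is created by passing an async block to tokio::spawn.
The tokio::spawn function returns a JoinHandle, which the caller may use to interact with the spawned task (the same as with std::thread::spawn).
An async block on its own is lazy: it does nothing until it is awaited or handed to the runtime, as tokio::spawn does here.
The async move block moved the socket into the task.
When .await is called, the task “yields” back to the scheduler.
Here is another async execution example that uses the handle explicitly: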
#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        // Do some async work
        "return value"
    });
    // Do some other work
    let out = handle.await.unwrap();
    println!("GOT {}", out);
}
Awaiting a JoinHandle returns a Result.
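Since the notes compare tokio::spawn with std::thread::spawn, here is a dependency-free sketch of the same idea using the standard library's thread JoinHandle. This is the std API, not Tokio's: with Tokio you would write `handle.await` instead of `handle.join()`. The function name `join_results` is illustrative. In both cases the handle yields a Result, which is an Err when the spawned work panicked.

```rust
use std::thread;

/// Returns the value of a successful thread and whether a panicking thread
/// was reported as an error.
fn join_results() -> (Option<&'static str>, bool) {
    // A thread that finishes normally: `join` returns `Ok(value)`.
    let ok = thread::spawn(|| "return value").join();

    // A thread that panics: `join` returns `Err(..)` instead of crashing the caller.
    let failed = thread::spawn(|| -> () { panic!("task failed") }).join();

    (ok.ok(), failed.is_err())
}

fn main() {
    let (value, panicked) = join_results();
    println!("value={value:?} panicked={panicked}");
}
```

This is why the tutorial example calls `.unwrap()` on the awaited handle: the outer Result reports whether the task itself completed, separately from whatever value the task returns.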
The spawned task may be executed on the same thread as where it was spawned, or it may execute on a different runtime thread.
Send bound
As with std::thread::spawn in the standard library, tasks spawned by tokio::spawn must implement Send. This allows the Tokio runtime to move tasks between threads while they are suspended at an .await.
Spawned tasks must not contain any references to data owned outside the task.
If a single piece of data must be accessible from more than one task concurrently, then it must be shared using synchronization primitives such as Arc.
The next time the task is executed, it resumes from the point it last yielded.
To make this work, all state that is used after .await must be saved by the task.
Tasks are Send when all data that is held across .await calls is Send.
If any of that data is not Send, the compiler reports an error.
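The rule about data held across .await can be checked at compile time without a runtime. In this sketch, `require_send` is a hypothetical helper (not a Tokio API) that only compiles when its argument is Send, which is the same bound tokio::spawn places on a task. `std::future::ready(())` stands in for any real async operation.

```rust
use std::future::ready;
use std::rc::Rc;
use std::sync::Arc;

/// Hypothetical helper: compiles only when `T` is `Send`. Does nothing at run time.
fn require_send<T: Send>(value: T) -> T {
    value
}

/// An `Arc` held across `.await` keeps the future `Send`.
fn arc_across_await_is_send() -> bool {
    let fut = async {
        let shared = Arc::new(1u32);
        ready(()).await; // `shared` is still alive here, so it is saved in the task state
        *shared
    };
    let _fut = require_send(fut);
    true
}

/// An `Rc` is not `Send`, but it is fine if it is dropped before the `.await`.
fn rc_dropped_before_await_is_send() -> bool {
    let fut = async {
        {
            let local = Rc::new(1u32);
            assert_eq!(*local, 1);
        } // `local` is dropped here, before the task can be suspended
        ready(()).await;
    };
    let _fut = require_send(fut);
    true
}

fn main() {
    println!("{}", arc_across_await_is_send());
    println!("{}", rc_dropped_before_await_is_send());
}
```

Moving the `Rc` out of the inner block so that it is still alive at the `.await` makes `require_send` reject the future, which is the compile error the notes refer to.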
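Usually, there are two techniques for sharing data between threads: guarding the shared state with a mutex, or spawning a task that owns the state and talking to it through message passing.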
Use the bytes crate for TCP data frames (add bytes = "1" to Cargo.toml).
The goal of Bytes is to provide a robust byte array structure for network programming.
The Bytes type is roughly an Arc<Vec<u8>> but with some added capabilities.
type Db = Arc<Mutex<HashMap<String, Bytes>>>;
The database is created on the main thread.
The Arc to the database object is a handle.
Clone the handle before spawning each task, and pass the cloned handle to the task.
Inside the task, call lock() on the handle to access the map.
Rewrite the main function as follows:
use bytes::Bytes;
use mini_redis::{Connection, Frame};
use tokio::net::{TcpListener, TcpStream};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Shared handle to the in-memory database.
type Db = Arc<Mutex<HashMap<String, Bytes>>>;

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
    println!("Listening");
    let db = Arc::new(Mutex::new(HashMap::new()));
    loop {
        let (socket, _) = listener.accept().await.unwrap();
        // Clone the handle; each task gets its own reference to the same map.
        let db = db.clone();
        println!("Accepted");
        tokio::spawn(async move {
            process(socket, db).await;
        });
    }
}
Rewrite the process function as follows:
async fn process(socket: TcpStream, db: Db) {
    use mini_redis::Command::{self, Get, Set};

    let mut connection = Connection::new(socket);
    while let Some(frame) = connection.read_frame().await.unwrap() {
        let response = match Command::from_frame(frame).unwrap() {
            Set(cmd) => {
                let mut db = db.lock().unwrap();
                db.insert(cmd.key().to_string(), cmd.value().clone());
                Frame::Simple("OK".to_string())
            }
            Get(cmd) => {
                let db = db.lock().unwrap();
                if let Some(value) = db.get(cmd.key()) {
                    Frame::Bulk(value.clone())
                } else {
                    Frame::Null
                }
            }
            cmd => panic!("unimplemented {:?}", cmd),
        };
        connection.write_frame(&response).await.unwrap();
    }
}
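The handle pattern used above (create once, clone per task, lock to access) can be tried in isolation. This is a minimal sketch under two assumptions made to keep it dependency-free: it uses std::thread in place of Tokio tasks and Vec<u8> in place of Bytes. The helper names `set`, `get`, and `shared_db_demo` are illustrative and not part of mini-redis.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Same shape as the `Db` alias above, with `Vec<u8>` standing in for `Bytes`.
type Db = Arc<Mutex<HashMap<String, Vec<u8>>>>;

/// Stores a value; the lock is released when `map` goes out of scope.
fn set(db: &Db, key: &str, value: &[u8]) {
    let mut map = db.lock().unwrap();
    map.insert(key.to_string(), value.to_vec());
}

/// Returns a copy of the stored value, if any.
fn get(db: &Db, key: &str) -> Option<Vec<u8>> {
    let map = db.lock().unwrap();
    map.get(key).cloned()
}

fn shared_db_demo() -> Option<Vec<u8>> {
    // Created once on the main thread.
    let db: Db = Arc::new(Mutex::new(HashMap::new()));

    // Clone the handle before spawning and move the clone into the thread.
    let writer = {
        let db = Arc::clone(&db);
        thread::spawn(move || set(&db, "hello", b"world"))
    };
    writer.join().unwrap();

    // The original handle sees the write made through the clone.
    get(&db, "hello")
}

fn main() {
    println!("{:?}", shared_db_demo());
}
```

Cloning an Arc only bumps a reference count, so every handle points at the same map. Keeping each lock inside a small synchronous function also keeps the guard away from any .await point.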
If contention on a synchronous mutex becomes a problem, the best fix is rarely to switch from std::sync::Mutex to tokio::sync::Mutex.
tokio::sync::Mutex acts similarly to std::sync::Mutex, with two major differences: lock is an async method, so it does not block the thread, and the lock guard is designed to be held across .await points.
The std::sync::MutexGuard type is not Send.
This means that you can’t send a mutex guard to another thread, and holding one across an .await in a spawned task is a compile error because the Tokio runtime can move a task between threads at every .await.
Cf. The dashmap crate provides an implementation of a more sophisticated sharded hash map.
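One way to keep a std::sync::Mutex in async code is to make sure the guard is dropped before the next .await. The sketch below is std-only and rests on some assumptions: `require_send` and `poll_once` are hypothetical helpers written for this example (not Tokio APIs), and `std::future::ready(())` stands in for a real async operation.

```rust
use std::future::{ready, Future};
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical helper: compiles only for `Send` futures, like `tokio::spawn` requires.
fn require_send<F: Future + Send>(fut: F) -> F {
    fut
}

/// Waker that does nothing; enough for futures that complete on the first poll.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical helper: polls a future once and returns its output if it is ready.
fn poll_once<F: Future>(fut: F) -> Option<F::Output> {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(output) => Some(output),
        Poll::Pending => None,
    }
}

/// Increments the counter, releasing the lock before awaiting.
async fn increment_then_wait(counter: Arc<Mutex<u32>>) -> u32 {
    let value = {
        let mut guard = counter.lock().unwrap();
        *guard += 1;
        *guard
    }; // `guard` is dropped here, so it is not held across the `.await`
    ready(()).await;
    value
}

/// Returns (value produced by the future, value left in the mutex).
fn guard_scope_demo() -> (Option<u32>, u32) {
    let counter = Arc::new(Mutex::new(0u32));
    let fut = require_send(increment_then_wait(Arc::clone(&counter)));
    let returned = poll_once(fut);
    let stored = *counter.lock().unwrap();
    (returned, stored)
}

fn main() {
    println!("{:?}", guard_scope_demo());
}
```

If the inner block is removed so that `guard` is still alive at the `.await`, the future stops being Send and `require_send` no longer compiles.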
futures crate
Why is the crate named “futures”? Because async jobs are executed in the future; in other words, not immediately when control flow reaches them.
The return value of an async fn is an anonymous type that implements the Future trait.
When you spawn a task on the Tokio runtime, its type’s lifetime must be 'static.
This means that the spawned task must not contain any references to data owned outside the task.
Cf. https://github.com/pretzelhammer/rust-blog/blob/master/posts/common-rust-lifetime-misconceptions.md#2-if-t-static-then-t-must-be-valid-for-the-entire-program
In any case, a spawned task is independent; if it needs data from outside, use move to transfer ownership into it.
If a single piece of data must be accessible from more than one task concurrently, then it must be shared using synchronization primitives such as Arc.
The compiler is unable to reason about how long a newly spawned task stays around, so the only way it can be sure the task’s data stays valid long enough is to require that the task may live forever ('static).
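std::thread::spawn has the same 'static requirement, so the effect of move can be sketched without Tokio. This example assumes threads as a stand-in for tasks; `moved_into_thread` is an illustrative name.

```rust
use std::thread;

/// Moves a vector into a thread and returns the length computed there.
fn moved_into_thread() -> usize {
    let data = vec![1u32, 2, 3];

    // Without `move`, the closure would only borrow `data`, and the compiler
    // would reject it because the thread may outlive this function.
    let handle = thread::spawn(move || data.len());

    // `data` is no longer usable here: ownership went to the thread.
    handle.join().unwrap()
}

fn main() {
    println!("len computed in thread: {}", moved_into_thread());
}
```

Because the closure owns everything it uses, it contains no references to data owned elsewhere, which is what the 'static bound asks for. It does not mean the data must live for the whole program.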
Check the main loop:
#[tokio::main]
async fn main() {
    // Bind the listener to the address
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
    let mut i = 0u32;
    let mut j = 0u32;
    loop {
        i += 1;
        println!("i-loop: {i}");
        // The second item contains the ip and port of the new connection.
        let (socket, _) = listener.accept().await.unwrap();
        //process::exit(1);
        // A new task is spawned for each inbound socket. The socket is
        // moved to the new task and processed there.
        tokio::spawn(async move {
            // `socket` is moved into the task. To share data between tasks, use `Arc`.
            process(socket).await;
            // `j` is `Copy`, so `async move` captures a copy of it;
            // the `j` in `main` is never updated by the task.
            j += 1;
            println!("j-loop: {j}");
        });
    }
}
warning: `test` (bin "test") generated 1 warning
Finished dev [unoptimized + debuginfo] target(s) in 0.02s
Running `target/debug/test`
i-loop: 1
i-loop: 2
j-loop: 1
i-loop: 3
j-loop: 1
i-loop: 4
j-loop: 1
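In the output above, j-loop always prints 1 because each task increments its own copy of j. The following sketch reproduces that effect and contrasts it with a counter that is actually shared. It assumes std::thread in place of Tokio tasks, and an Arc<AtomicU32> as one possible way to share the counter (an Arc<Mutex<u32>> would work as well). The function names are illustrative.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::thread;

/// Each thread gets a private copy of `j`, as in the `async move` block above.
/// Returns the values seen by the threads and the final value of the outer `j`.
fn copied_counter(tasks: usize) -> (Vec<u32>, u32) {
    let mut j = 0u32;
    let mut seen = Vec::new();
    for _ in 0..tasks {
        let handle = thread::spawn(move || {
            j += 1; // increments the thread's own copy
            j
        });
        seen.push(handle.join().unwrap());
    }
    (seen, j)
}

/// All threads increment the same counter through a cloned `Arc` handle.
fn shared_counter(tasks: usize) -> (Vec<u32>, u32) {
    let j = Arc::new(AtomicU32::new(0));
    let mut seen = Vec::new();
    for _ in 0..tasks {
        let j = Arc::clone(&j);
        let handle = thread::spawn(move || j.fetch_add(1, Ordering::SeqCst) + 1);
        // Joining right away keeps the order deterministic for this demo.
        seen.push(handle.join().unwrap());
    }
    (seen, j.load(Ordering::SeqCst))
}

fn main() {
    println!("copied: {:?}", copied_counter(3));
    println!("shared: {:?}", shared_counter(3));
}
```

With the copied counter every thread reports 1 and the outer value stays 0; with the shared counter the threads report 1, 2, 3 and the final value is 3.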
Tokio Send issue, own struct. https://tokio.rs/tokio/tutorial/shared-state#restructure-your-code-to-not-hold-the-lock-across-an-await
https://doc.rust-lang.org/std/sync/struct.RwLock.html
In comparison, a Mutex does not distinguish between readers or writers that acquire the lock, therefore blocking any threads waiting for the lock to become available. An RwLock will allow any number of readers to acquire the lock as long as a writer is not holding the lock.
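A small std-only sketch of the reader/writer behaviour quoted above. It uses try_read and try_write so that nothing blocks; `rwlock_demo` is an illustrative name. While the main thread holds a read guard, a second thread can still acquire a read lock, but cannot acquire the write lock.

```rust
use std::sync::RwLock;
use std::thread;

/// Returns (second reader succeeded, writer was refused, final value).
fn rwlock_demo() -> (bool, bool, u32) {
    let lock = RwLock::new(5u32);

    let (second_reader_ok, writer_refused) = {
        // The main thread holds a read guard for the duration of this block.
        let _reader = lock.read().unwrap();

        thread::scope(|s| {
            s.spawn(|| {
                // Another reader is allowed while a reader holds the lock.
                let can_read = lock.try_read().is_ok();
                // A writer is refused while any reader holds the lock.
                let can_write = lock.try_write().is_ok();
                (can_read, !can_write)
            })
            .join()
            .unwrap()
        })
    }; // the read guard is released here

    // With no readers left, the write lock can be taken.
    *lock.write().unwrap() += 1;
    let final_value = *lock.read().unwrap();

    (second_reader_ok, writer_refused, final_value)
}

fn main() {
    println!("{:?}", rwlock_demo());
}
```

For a read-heavy map such as the Db in these notes, an RwLock could let GET requests proceed in parallel, at the cost of more complex locking behaviour than a plain Mutex.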
https://riptutorial.com/rust/example/24527/read-write-locks
https://rust-unofficial.github.io/patterns/patterns/behavioural/RAII.html
https://www.youtube.com/watch?v=4DqP57BHaXI