Write a simple TCP chat server in Rust
source link: https://developerlife.com/2024/01/13/write-simple-chat-server-in-rust/
Build a chat server using Tokio #
In this tutorial we will build a simple chat server using Tokio. The server will be able to handle multiple clients, and each client will be able to send messages to the server, which will then broadcast the message to all other connected clients.
- We will use Tokio's `tokio::net::TcpListener` and `tokio::net::TcpStream` to create a TCP server that listens for incoming connections and handles them concurrently.
- We will also use Tokio's `tokio::sync::broadcast` to broadcast messages to all connected clients.
Read this tutorial to learn more about the basics of TCP client and server programming in Rust (without using Tokio).
Here's a video of the app that we are going to build in action.
Please star the r3bl-open-core repo.
You can get lots of useful Rust command line apps from the `r3bl-cmdr` crate in this repo:
- `giti`: run interactive git commands with confidence in your terminal
- `edi`: edit Markdown with style in your terminal
You can install them using `cargo install r3bl-cmdr`.
If you would like to get involved in an open source project and like Rust crates, we welcome your contributions to the r3bl-open-core repo.
You can find the finished source code for this tutorial here.
The chat server comprises all these pieces #
  ┌─CLIENT-1──────────┐  ┌─CLIENT-2──────────┐  ┌─CLIENT-3──────────┐
  │                   │  │                   │  │                   │
  └─────────┼─────────┘  └─────────┼─────────┘  └─────────┼─────────┘
            │                      │                      │
┌─SERVER────┼──────────────────────┼──────────────────────┼───────────┐
│           │                      │                      │           │
│ handle_client_task()   handle_client_task()   handle_client_task()  │
│ ┌───────────────────┐  ┌───────────────────┐  ┌───────────────────┐ │
│ │ ┌────┐     ┌────┐ │  │ ┌────┐     ┌────┐ │  │ ┌────┐     ┌────┐ │ │
│ │ │ TX │     │ RX │ │  │ │ TX │     │ RX │ │  │ │ TX │     │ RX │ │ │
│ │ └─┬──┘     └─▲──┘ │  │ └─┬──┘     └─▲──┘ │  │ └─┬──┘     └─▲──┘ │ │
│ │   │          │    │  │   │          │    │  │   │          │    │ │
│ └───┼──────────┼────┘  └───┼──────────┼────┘  └───┼──────────┼────┘ │
│     │          │           │          │           │          │      │
│ ┌───▼──────────┴───────────▼──────────┴───────────▼──────────┴────┐ │
│ │                 (TX, RX) = channel::broadcast()                 │ │
│ └─────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
The server has a `main` function that creates a `tokio::net::TcpListener` and listens for incoming connections. When a new connection is received, it spawns a new task to handle the connection using `tokio::spawn()`.
Using `tokio::select!`, the task tries to do the following concurrently, and waits until one of them completes:
- The task reads messages from its client and broadcasts them to all other connected clients. It also echoes the message back to its client.
- The task listens for messages from other clients and sends them to its client.
When one of the two operations above completes, the other is dropped and the code path for the completed one runs. Control then returns to the top of the infinite loop, unless the function has already returned.
A client can be any TCP client, such as `telnet`, `nc`, or PuTTY.
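Because the protocol is just newline-terminated text over TCP, a client can also be written in a few lines of standard-library Rust. The sketch below is an illustration only: `chat_once` is a made-up helper, and it assumes the server is listening on `127.0.0.1:3000`, sends one greeting line on connect, and echoes each line back, as the server built later in this tutorial does.

```rust
use std::io::{BufRead, BufReader, Write};
use std::net::{TcpStream, ToSocketAddrs};

/// Hypothetical helper: connects, reads the server's first line, sends one
/// line, and returns (first_line, reply_line).
fn chat_once<A: ToSocketAddrs>(addr: A, message: &str) -> std::io::Result<(String, String)> {
    let stream = TcpStream::connect(addr)?;
    // Clone the handle so reading and writing can use separate values.
    let mut reader = BufReader::new(stream.try_clone()?);
    let mut writer = stream;

    // The server is assumed to greet every new connection with one line.
    let mut welcome = String::new();
    reader.read_line(&mut welcome)?;

    // Messages are delimited by a newline, like typing a line into telnet.
    writer.write_all(message.as_bytes())?;
    writer.write_all(b"\n")?;
    writer.flush()?;

    let mut reply = String::new();
    reader.read_line(&mut reply)?;
    Ok((welcome, reply))
}

fn main() {
    // Assumes the chat server is already running on this address.
    match chat_once("127.0.0.1:3000", "hello") {
        Ok((welcome, reply)) => print!("{welcome}{reply}"),
        Err(error) => eprintln!("could not talk to the server: {error}"),
    }
}
```

If other clients are chatting at the same time, the second line read may be one of their messages rather than the echo, so treat this as a smoke test and not a full client.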
Add dependencies to Cargo.toml #
Let's create a new project by running `cargo new --bin tcp-server-netcat-client`. Then we will add the following dependencies to our `Cargo.toml` file.
# tokio.
tokio = { version = "1.35.1", features = ["full"] }
# stdout logging.
femme = { version = "2.2.1" }
log = { version = "0.4.20" }
# r3bl_rs_utils_core - friendly name generator.
r3bl_rs_utils_core = { version = "0.9.12" }
r3bl_tui = { version = "0.5.1" }
Main function #
We will implement the following algorithm for our server in our main function:
- Create a broadcast channel. It will be shared by all the client tasks.
- Create `TcpListener` and bind to an address & port.
- Loop:
  - Accept socket connection, and get its `TcpStream`.
  - Use `tokio::spawn()` to spawn a task to handle this client connection and its `TcpStream`.
In the task that handles the connection:
- Get `BufReader` & `BufWriter` from the `TcpStream`. The reader and writer allow us to read data from and write data to the client socket.
- Loop:
  - Use `tokio::select!` to concurrently:
    - Read from broadcast channel (via `recv()`):
      - Send the message to the client (only if it is from a different client) over the socket (use `BufWriter` to write the message).
    - Read from socket (via `BufReader::read_line()`):
      - Read `incoming` from reader.
      - Call `process(incoming)` and generate `outgoing`. This colorizes the `incoming` message with a lolcat effect to generate the `outgoing` message.
      - Write the `outgoing` message back to this client (use `BufWriter`).
      - Send `incoming` message to other connected clients (via the broadcast channel).
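The "only if it is from a different client" rule exists because a broadcast channel delivers every message to every subscriber, including the task that sent it. A minimal sketch of that check follows; the `Msg` struct and `should_forward` function are simplified stand-ins invented for this example, not the types used in the finished server.

```rust
use std::net::SocketAddr;

/// Simplified stand-in for the message type that travels over the channel.
#[derive(Debug, Clone)]
struct Msg {
    socket_addr: SocketAddr,
    payload: String,
}

/// Returns true when `msg` should be written to the client at `my_addr`.
/// A task skips messages that carry its own client's address.
fn should_forward(msg: &Msg, my_addr: SocketAddr) -> bool {
    msg.socket_addr != my_addr
}

fn main() {
    let alice: SocketAddr = "127.0.0.1:50001".parse().unwrap();
    let bob: SocketAddr = "127.0.0.1:50002".parse().unwrap();
    let msg = Msg { socket_addr: alice, payload: "hi\n".to_string() };

    for (name, addr) in [("alice", alice), ("bob", bob)] {
        println!("forward {:?} to {name}: {}", msg.payload, should_forward(&msg, addr));
    }
}
```

Without this check, each sender would see its own message twice: once as the echo written directly to its socket, and once more via the channel.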
You can find the finished source code for this tutorial here.
Here's the code for the main function, and some supporting type aliases and structs:
use std::net::SocketAddr;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::net::tcp::WriteHalf;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::broadcast::{self, error::RecvError, Sender};
// `friendly_random_id` and `ColorWheel` (used further down) come from the
// r3bl crates listed in Cargo.toml.

pub type IOResult<T> = std::io::Result<T>;

#[derive(Debug, Clone)]
pub struct MsgType {
    pub socket_addr: SocketAddr,
    pub payload: String,
    pub from_id: String,
}

#[tokio::main]
pub async fn main() -> IOResult<()> {
    let addr = "127.0.0.1:3000";

    // Start logging.
    femme::start();

    // Create TCP listener.
    let tcp_listener = TcpListener::bind(addr).await?;
    log::info!("Server is ready to accept connections on {}", addr);

    // Create channel shared among all clients that connect to the server loop.
    let (tx, _) = broadcast::channel::<MsgType>(10);

    // Server loop.
    loop {
        // Accept incoming socket connections.
        let (tcp_stream, socket_addr) = tcp_listener.accept().await?;

        // Each client task gets its own handle to the shared channel.
        let tx = tx.clone();
        tokio::spawn(async move {
            let result = handle_client_task(tcp_stream, tx, socket_addr).await;
            match result {
                Ok(_) => {
                    log::info!("handle_client_task() terminated gracefully")
                }
                Err(error) => log::error!("handle_client_task() encountered error: {}", error),
            }
        });
    }
}
To run the server, you can run `cargo run`. There are no command line arguments to pass or parse.
tokio::spawn does not spawn a new thread, so what does it actually do? #
Since `tokio::spawn` sounds similar to `thread::spawn`, it might be easy to assume that `tokio::spawn` creates a new thread. This would go against the idea of even using Tokio (which is all about concurrency and non-blocking I/O). Handling one connection per thread, which is what we did in this tutorial: Write a simple TCP chat server in Rust, isn't scalable.
`tokio::spawn` does not create a thread; it creates a Tokio task, which is a co-operatively scheduled entity that Tokio knows how to schedule on the Tokio runtime (in turn, the Tokio runtime can have as many worker threads as you want - from 1 upwards).
By using `tokio::spawn`, you allow the Tokio runtime to switch to another task at points in the task where it has a `.await`, and only those points. Your alternative, if you don't want multiple tasks, is to use things like `select!`, `join!` and functions with `select` or `join` in their name to have concurrent I/O in a single task.
The point of spawning in Tokio is twofold:
- If your runtime has multiple threads, then two tasks can execute in parallel on different threads, reducing latency.
- It is almost always easier to understand a complex program in terms of different tasks doing their work, than in terms of a single large task doing lots of work concurrently (e.g. using `select` to wait for one of many options, or `join` to wait for all options to finish).
More information:
- You can get more info on this topic here.
- For an even deeper dive into how Tokio tasks themselves are implemented for intra-task concurrency, please take a look at this excellent article.
Handle client task function #
The `handle_client_task` function is where all the magic happens.
- It reads messages from its client (over TCP socket) and broadcasts them to all other connected clients.
- It processes the message from its client and echoes it back to its client (over TCP socket).
- It reads messages from other clients (over broadcast channel) and sends them to its client (over socket).
Here's the code for the `handle_client_task()` function:
async fn handle_client_task(
    mut tcp_stream: TcpStream,
    tx: Sender<MsgType>,
    socket_addr: SocketAddr,
) -> IOResult<()> {
    log::info!("Handle socket connection from client");

    let id = friendly_random_id::generate_friendly_random_id();
    let mut rx = tx.subscribe();

    // Set up buf reader and writer.
    let (reader, writer) = tcp_stream.split();
    let mut reader = BufReader::new(reader);
    let mut writer = BufWriter::new(writer);

    // Send welcome message to client w/ ids.
    let welcome_msg_for_client =
        ColorWheel::lolcat_into_string(&format!("addr: {}, id: {}\n", socket_addr, id));
    // write_all() keeps writing until the whole buffer has been accepted.
    writer.write_all(welcome_msg_for_client.as_bytes()).await?;
    writer.flush().await?;

    let mut incoming = String::new();

    loop {
        let tx = tx.clone();
        tokio::select! {
            // Read from broadcast channel.
            result = rx.recv() => {
                read_from_broadcast_channel(result, socket_addr, &mut writer, &id).await?;
            }

            // Read from socket.
            network_read_result = reader.read_line(&mut incoming) => {
                let num_bytes_read: usize = network_read_result?;
                // EOF check.
                if num_bytes_read == 0 {
                    break;
                }
                handle_socket_read(num_bytes_read, &id, &incoming, &mut writer, tx, socket_addr).await?;
                incoming.clear();
            }
        }
    }

    Ok(())
}
Two concurrent tasks in the tokio::select! block #
- Read from broadcast channel. The function `read_from_broadcast_channel()` handles the result.
- Read from socket. The function `handle_socket_read()` handles the result.
Whichever of these completes first, the `tokio::select!` block goes down that code path and drops the other one.
Handle read from broadcast channel function #
Here's the code for the `read_from_broadcast_channel()` function:
async fn read_from_broadcast_channel(
    result: Result<MsgType, RecvError>,
    socket_addr: SocketAddr,
    writer: &mut BufWriter<WriteHalf<'_>>,
    id: &str,
) -> IOResult<()> {
    match result {
        Ok(it) => {
            let msg: MsgType = it;
            log::info!("[{}]: channel: {:?}", id, msg);
            // Skip messages that this client sent itself.
            if msg.socket_addr != socket_addr {
                writer.write_all(msg.payload.as_bytes()).await?;
                writer.flush().await?;
            }
        }
        // Log the error and keep the connection open.
        Err(error) => {
            log::error!("{:?}", error);
        }
    }

    Ok(())
}
Handle socket read function #
Here's the code for the `handle_socket_read()` function:
async fn handle_socket_read(
    num_bytes_read: usize,
    id: &str,
    incoming: &str,
    writer: &mut BufWriter<WriteHalf<'_>>,
    tx: Sender<MsgType>,
    socket_addr: SocketAddr,
) -> IOResult<()> {
    log::info!(
        "[{}]: incoming: {}, size: {}",
        id,
        incoming.trim(),
        num_bytes_read
    );

    // Process incoming -> outgoing.
    let outgoing = process(incoming);

    // outgoing -> Writer (echo the colorized message back to this client).
    writer.write_all(outgoing.as_bytes()).await?;
    writer.flush().await?;

    // Broadcast incoming (not the colorized outgoing) to the channel.
    // send() only fails when there are no receivers, so the result is ignored.
    let _ = tx.send(MsgType {
        socket_addr,
        payload: incoming.to_string(),
        from_id: id.to_string(),
    });

    log::info!(
        "[{}]: outgoing: {}, size: {}",
        id,
        outgoing.trim(),
        num_bytes_read
    );

    Ok(())
}

fn process(incoming: &str) -> String {
    // Remove new line from incoming.
    let incoming_trimmed = incoming.trim();
    // Colorize it.
    let outgoing = ColorWheel::lolcat_into_string(incoming_trimmed);
    // Add new line back to outgoing.
    format!("{}\n", outgoing)
}
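The trim, colorize, re-append order in `process()` keeps the line terminator outside the color codes and normalizes whatever line ending the client sent. The sketch below shows this with a stand-in colorizer: `colorize` is a hypothetical replacement for `ColorWheel::lolcat_into_string` that wraps the text in one fixed ANSI color, so that the example needs only the standard library.

```rust
/// Hypothetical stand-in for `ColorWheel::lolcat_into_string`: wraps the text
/// in a single ANSI color (magenta) instead of a rainbow gradient.
fn colorize(text: &str) -> String {
    format!("\x1b[35m{text}\x1b[0m")
}

/// Same shape as the tutorial's `process()`, using the stand-in colorizer.
fn process(incoming: &str) -> String {
    // trim() removes "\n" and also the "\r\n" that telnet-style clients send.
    let trimmed = incoming.trim();
    // The newline is appended after the color reset code, not inside it.
    format!("{}\n", colorize(trimmed))
}

fn main() {
    // Both inputs produce the same colorized line with a single "\n".
    print!("{}", process("hello\n"));
    print!("{}", process("hello\r\n"));
}
```

A client that reads line by line therefore always sees exactly one `\n` at the end of each echoed message, regardless of how the sender terminated its line.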