7.12.2022 |

Actor Pattern

An actor is a selfcontained task that performs a single job indepenently of the rest of the application. It can talk to other actors, usually through message passing.

The actor has two parts: The actor itself, and a handle that is used to talk to the actor. The handle is a abstraction over the message passing. The individual actors could even be distributed over multiple machines and talk over the network.

A benefit of this is, that you don't need complicated locking mechanisms because the actors don't share memory between threads and the actor itself is purely sequential.

Deadlocks are still possible if there are cyclic dependencies between actors, but in my experience it's a lot more obvious and easier to debug.

A chat app could for example have the following actors:

  • a global actor responsible for handling all chat rooms
  • an actor for each chat room
  • an actor for each connected user

I'm heavily using actors on my project https://hellopaint.io, where I wrote them in rust. They look something like this:


// Message enum. Oneshot is a channel that can send a single message, it's used here to return the connected persons id
#[derive(Debug)]
enum RoomMessage {
    Join(Connection, oneshot::Sender<u64>),
    ClientMessage(ClientMessage, String),
    Leave(u64),
}


// The state of the actor. It contains a map of connection id and a handle to the user connection actor.
struct RoomActor {
    rx: mpsc::Receiver<RoomMessage>,
    next_connection_id: u64,
    users: HashMap<u64, Connection>,
}

// the actor implementation. It handles the messages send via the actor's handles
impl RoomActor {
    pub async fn run(&mut self) {
        while let Some(msg) = self.rx.recv().await {
            match msg {
                RoomMessage::Join(connection, send_uid) => {
                    let user = connection.user.clone();

                    self.users.insert(self.next_connection_id, connection);
                    send_uid.send(self.next_connection_id).unwrap();
                    self.next_connection_id += 1;

                    self.broadcast(ServerMessage::Joined {
                        user,
                    }).await;
                }
                RoomMessage::ClientMessage(ClientMessage::SendMessage { text }, user) => {
                    self.broadcast(ServerMessage::Message {
                        text,
                        user,
                    }).await;
                }
                RoomMessage::Leave(conn_id) => {
                    self.remove_user(conn_id).await;
                }
                _ => ()
            }
        }
    }

    async fn broadcast(&mut self, msg: ServerMessage) {
        if let Ok(json) = serde_json::to_string(&msg) {
            for connection in self.users.values() {
                connection.sender.send(tungstenite::Message::Text(json.clone())).await.unwrap();
            }
        }
    }

    async fn remove_user(&mut self, conn_id: u64) {
        let connection = self.users.remove(&conn_id);
        if let Some(connection) = connection {
            self.broadcast(ServerMessage::Left {
                user: connection.user,
            }).await;
        }
    }
}

// The handle only holds a sender to talk to the actor
#[derive(Clone)]
pub struct RoomHandle {
    tx: mpsc::Sender<RoomMessage>,
}

// implementation of the handle
impl RoomHandle {
    pub fn new() -> Self {
        let (tx, rx) = mpsc::channel(100);

        let mut actor = RoomActor {
            rx,
            users: HashMap::new(),
            next_connection_id: 0,
        };

        tokio::spawn(async move {
            actor.run().await
        });

        RoomHandle {
            tx,
        }
    }


    pub async fn join(&self, connection: Connection) {
        let (conn_id_tx, conn_id_rx) = oneshot::channel();
        let (sender_tx, mut sender_rx) = mpsc::channel(100);

        self.tx.send(RoomMessage::Join(connection, conn_id_tx)).await.unwrap();

        let conn_id = conn_id_rx.await.unwrap();
        println!("{user} successfully joined room with user id {conn_id}");
    }

    [...]
}

The above code was taken and slightly modified from my rust workshop: https://github.com/lucasmerlin/rust-chat-workshop/blob/finished/server/src/room.rs Please note that it's not a complete actor example because ideally the connection would also be a actor.

Lucas
Zur Übersicht

Mehr vom DevSquad...

Sophia Brandt

Spotify im Terminal auf MacOS

Jan Sauer

Automated dependency updates with Renovate