Skip to content

Commit

Permalink
Almost done with a modular PubSub example
Browse files Browse the repository at this point in the history
  • Loading branch information
chouzar committed Jun 22, 2024
1 parent 55e763b commit 83bbae2
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 52 deletions.
14 changes: 14 additions & 0 deletions test/chat/event.gleam
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
pub type Event {
Event(id: Int, user: String, message: String)
}

pub fn next(events: List(Event), user: String, message: String) -> Event {
Event(id: next_id(events), user: user, message: message)
}

fn next_id(chat: List(Event)) -> Int {
case chat {
[] -> 1
[record, ..] -> record.id + 1
}
}
23 changes: 15 additions & 8 deletions test/chat/pubsub.gleam
Original file line number Diff line number Diff line change
@@ -1,26 +1,33 @@
import chat/event
import chip/group
import gleam/erlang/process
import gleam/otp/actor
import gleam/otp/supervisor

pub type PubSub =
process.Subject(group.Message(Nil, Event))
process.Subject(group.Message(Nil, event.Event))

pub type Client =
process.Subject(Event)

pub type Event {
Event(id: Int, user: String, message: String)
// TODO: Maybe do channels for a more complex use case
type Channel {
General
Coffee
Pets
}

pub fn start() -> Result(PubSub, actor.StartError) {
group.start()
}

pub fn subscribe(pubsub: PubSub, subject: process.Subject(Event)) -> Nil {
pub fn childspec() {
supervisor.worker(fn(_param) { start() })
|> supervisor.returning(fn(_param, pubsub) { pubsub })
}

pub fn subscribe(pubsub: PubSub, subject: process.Subject(event.Event)) -> Nil {
group.register(pubsub, subject, Nil)
}

pub fn publish(pubsub: PubSub, message: Event) -> Nil {
pub fn publish(pubsub: PubSub, message: event.Event) -> Nil {
group.dispatch(pubsub, Nil, fn(subscriber) {
process.send(subscriber, message)
})
Expand Down
41 changes: 20 additions & 21 deletions test/chat/server.gleam
Original file line number Diff line number Diff line change
@@ -1,37 +1,44 @@
import chat/event
import chat/pubsub
import gleam/erlang/process
import gleam/option
import gleam/otp/actor

pub opaque type Message {
Connect(client: pubsub.Client)
Send(user: String, message: String)
}
import gleam/otp/supervisor
import gleam/result

pub type Server =
process.Subject(Message)

type State {
State(pubsub: pubsub.PubSub, chat: List(pubsub.Event))
}

pub fn start(pubsub: pubsub.PubSub) -> Result(Server, actor.StartError) {
let init = fn() { init(pubsub) }
actor.start_spec(actor.Spec(init: init, init_timeout: 10, loop: loop))
}

pub fn child_spec() {
todo
pub fn childspec(caller: process.Subject(Server)) {
supervisor.worker(fn(pubsub) {
use server <- result.try(start(pubsub))
process.send(caller, server)
Ok(server)
})
}

pub fn connect(server: Server, client: pubsub.Client) -> Nil {
pub fn connect(server: Server, client: process.Subject(event.Event)) -> Nil {
actor.send(server, Connect(client))
}

pub fn send(server: Server, user: String, message: String) -> Nil {
actor.send(server, Send(user, message))
}

pub opaque type Message {
Connect(client: process.Subject(event.Event))
Send(user: String, message: String)
}

type State {
State(pubsub: pubsub.PubSub, chat: List(event.Event))
}

fn init(pubsub: pubsub.PubSub) {
let state = State(pubsub: pubsub, chat: [])
actor.Ready(state, process.new_selector())
Expand All @@ -45,8 +52,7 @@ fn loop(message: Message, state: State) {
}

Send(user, message) -> {
let id = next_id(state.chat)
let event = pubsub.Event(id, user, message)
let event = event.next(state.chat, user, message)
let chat = [event, ..state.chat]

pubsub.publish(state.pubsub, event)
Expand All @@ -55,10 +61,3 @@ fn loop(message: Message, state: State) {
}
}
}

fn next_id(chat: List(pubsub.Event)) -> Int {
case chat {
[] -> 1
[record, ..] -> record.id + 1
}
}
18 changes: 18 additions & 0 deletions test/chat/supervisor.gleam
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import chat/pubsub
import chat/server
import gleam/erlang/process
import gleam/otp/actor
import gleam/otp/supervisor

pub type Supervisor =
process.Subject(supervisor.Message)

pub fn start(
caller: process.Subject(server.Server),
) -> Result(Supervisor, actor.StartError) {
supervisor.start(fn(children) {
children
|> supervisor.add(pubsub.childspec())
|> supervisor.add(server.childspec(caller))
})
}
58 changes: 35 additions & 23 deletions test/chip/chat_test.gleam
Original file line number Diff line number Diff line change
@@ -1,47 +1,59 @@
import chat/pubsub
import chat/event
import chat/server
import chat/supervisor as chat_supervisor
import gleam/erlang/process
import gleam/list
import gleam/otp/task

pub fn chat_test() {
// Good Example to start the supervision example maybe is to do it without.
// Another good example is order dependency vs inheritance.
let assert Ok(pubsub) = pubsub.start()
let assert Ok(server) = server.start(pubsub)
// Start the chat's supervision tree and retrieve the server.
let caller: process.Subject(server.Server) = process.new_subject()
let assert Ok(_supervisor) = chat_supervisor.start(caller)
let assert Ok(server) = process.receive(caller, 100)

let self: pubsub.Client = process.new_subject()
// For this scenario, out of simplicity, the client is the current process.
let client: Client = process.new_subject()

server.connect(server, self)
// Connect the client so it can receive new messages from the server.
server.connect(server, client)

server.send(server, "luis", "Hola Juan")
server.send(server, "juan", "Hola Luis, como vas?")
server.send(server, "luis", "Bien! Estas recibiendo mensajes")
task.async(fn() {
// Send messages from another Subject.
server.send(server, "luis", "Hola Juan.")
server.send(server, "juan", "Hola Luis, como vas?")
server.send(server, "luis", "Bien! Recibiendo mensajes.")
})

// Client should have received the messages
let assert [
"luis: Hola Juan",
"luis: Hola Juan.",
"juan: Hola Luis, como vas?",
"luis: Bien! Estas recibiendo mensajes",
] = wait_for_messages(self, [])
"luis: Bien! Recibiendo mensajes.",
] = wait_for_messages(client, [])
}

// Client helpers

fn wait_for_messages(
subject: process.Subject(pubsub.Event),
messages: List(String),
) -> List(String) {
case process.receive(subject, 100) {
Ok(event) ->
event
|> build_message()
type Client =
process.Subject(event.Event)

fn wait_for_messages(client: Client, messages: List(String)) -> List(String) {
let selector =
process.new_selector()
|> process.selecting(client, build_message)

case process.select(selector, 100) {
Ok(message) ->
message
|> list.prepend(messages, _)
|> wait_for_messages(subject, _)
|> wait_for_messages(client, _)

Error(Nil) ->
messages
|> list.reverse()
}
}

fn build_message(event: pubsub.Event) -> String {
fn build_message(event: event.Event) -> String {
event.user <> ": " <> event.message
}

0 comments on commit 83bbae2

Please sign in to comment.