Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial draft: rabbitmq & async #68

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft

Initial draft: rabbitmq & async #68

wants to merge 2 commits into from

Conversation

Robin5605
Copy link
Contributor

@Robin5605 Robin5605 commented Aug 13, 2023

This rewrite consists of both a migration to an asynchronous model and a migration to the proposed RabbitMQ architecture, because the lapin crate is one of the most mature AMQP client crates for Rust, except it's asynchronous

Here is the gist of the new structure:

  • This task will continuously refresh the access token, this makes it so that each task doesn't have to send a request, check if it's authentication failed, then re-send it. Now, all operations can simply assume the authentication is update to date. This task will refresh the access tokens 10 seconds early just to have some built-in tolerance.

    tokio::spawn({
    let client = Arc::clone(&client);
    async move {
    loop {
    let sleep = client.authentication.read().await.expires_in
    - tokio::time::Duration::from_secs(10);
    tokio::time::sleep(sleep).await;
    client.reauthenticate().await;
    }
    }
    });

  • This task listens on the exclusive queue that will get rule updates. It doesn't really matter what comes over this queue, since it will be discarded. Each client will establish it's own, exclusive queue and bind it to the rule_updates fanout exchange. Mainframe will dispatch a message to all on this queue whenever the rules are updated. Clients will then have to fetch the new rules from /rules, compile the rules, and update their local state.

    tokio::spawn({
    let client = Arc::clone(&client);
    async move {
    loop {
    match client.receive_rule_update().await {
    Ok(_) => client.update_rules().await,
    Err(err) => error!("Error from rule updates queue: {err}"),
    }
    }
    }
    });

  • This loop listens on the jobs queue for incoming jobs, and creates a task for each of them.

    loop {
    trace!("Waiting for message");
    match client.receive_delivery().await {
    Ok(delivery) => {
    trace!("Message received");
    let client = Arc::clone(&client);
    tokio::task::spawn(async move {
    if let Err(err) = handle_delivery(Arc::clone(&client), &delivery).await {
    error!("Error while scanning: {err}");
    match delivery.reject(BasicRejectOptions { requeue: false }).await {
    Ok(()) => warn!("Rejected message"),
    Err(err) => error!("Error while rejecting: {err}"),
    }
    }
    });
    }
    Err(err) => error!("Error while consuming: {err}"),
    }
    }

Copy link
Contributor

@AbooMinister25 AbooMinister25 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything looks mostly good - I left a couple of suggestions, but nothing that's a real problem. I am yet to run this, though.

src/main.rs Show resolved Hide resolved
src/main.rs Outdated Show resolved Hide resolved
@Robin5605
Copy link
Contributor Author

TODO:

  • Deduplication
  • Authentication (with Keycloak)

@shenanigansd shenanigansd marked this pull request as draft May 30, 2024 01:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants