Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the global AUXILIARY_EVENT_TX and use tracing #80

Merged
merged 2 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/lsp/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl From<VscDocumentConfig> for DocumentConfig {
if var == "tabSize" {
x.tab_size
} else {
crate::log_warn!("Unknown indent alias {var}, using default");
log::warn!("Unknown indent alias {var}, using default");
DavisVaughan marked this conversation as resolved.
Show resolved Hide resolved
2
}
}
Expand Down
4 changes: 3 additions & 1 deletion crates/lsp/src/handlers_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::config::DocumentConfig;
use crate::config::VscDiagnosticsConfig;
use crate::config::VscDocumentConfig;
use crate::documents::Document;
use crate::main_loop::AuxiliaryEventSender;
use crate::main_loop::LspState;
use crate::state::workspace_uris;
use crate::state::WorldState;
Expand Down Expand Up @@ -156,6 +157,7 @@ pub(crate) fn did_change(
pub(crate) fn did_close(
params: DidCloseTextDocumentParams,
state: &mut WorldState,
auxiliary_event_tx: &AuxiliaryEventSender,
Comment on lines 157 to +160
Copy link
Collaborator Author

@DavisVaughan DavisVaughan Dec 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any LSP request/notification that needs to spawn a blocking task should add a auxiliary_event_tx argument that is passed through from the top level handle_event()

It can then call things like auxiliary_event_tx.spawn_blocking_task()

) -> anyhow::Result<()> {
let uri = params.text_document.uri;

Expand All @@ -167,7 +169,7 @@ pub(crate) fn did_close(
.remove(&uri)
.ok_or(anyhow!("Failed to remove document for URI: {uri}"))?;

crate::log_info!("did_close(): closed document with URI: '{uri}'.");
auxiliary_event_tx.log_info(format!("did_close(): closed document with URI: '{uri}'."));

Ok(())
}
Expand Down
22 changes: 0 additions & 22 deletions crates/lsp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,3 @@ pub mod tower_lsp;

#[cfg(test)]
pub mod tower_lsp_test_client;

// These send LSP messages in a non-async and non-blocking way.
// The LOG level is not timestamped so we're not using it.
macro_rules! log_info {
($($arg:tt)+) => ($crate::_log!(tower_lsp::lsp_types::MessageType::INFO, $($arg)+))
}
macro_rules! log_warn {
($($arg:tt)+) => ($crate::_log!(tower_lsp::lsp_types::MessageType::WARNING, $($arg)+))
}
macro_rules! log_error {
($($arg:tt)+) => ($crate::_log!(tower_lsp::lsp_types::MessageType::ERROR, $($arg)+))
}
macro_rules! _log {
($lvl:expr, $($arg:tt)+) => ({
$crate::main_loop::log($lvl, format!($($arg)+));
});
}

pub(crate) use _log;
pub(crate) use log_error;
pub(crate) use log_info;
pub(crate) use log_warn;
215 changes: 123 additions & 92 deletions crates/lsp/src/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,6 @@ use crate::tower_lsp::LspResponse;
pub(crate) type TokioUnboundedSender<T> = tokio::sync::mpsc::UnboundedSender<T>;
pub(crate) type TokioUnboundedReceiver<T> = tokio::sync::mpsc::UnboundedReceiver<T>;

// The global instance of the auxiliary event channel, used for sending log
// messages or spawning threads from free functions. Since this is an unbounded
// channel, sending a log message is not async nor blocking. Tokio senders are
// Send and Sync so this global variable can be safely shared across threads.
static mut AUXILIARY_EVENT_TX: std::cell::OnceCell<TokioUnboundedSender<AuxiliaryEvent>> =
std::cell::OnceCell::new();

// This is the syntax for trait aliases until an official one is stabilised.
// This alias is for the future of a `JoinHandle<anyhow::Result<T>>`
trait AnyhowJoinHandleFut<T>:
Expand Down Expand Up @@ -74,6 +67,82 @@ pub(crate) enum AuxiliaryEvent {
SpawnedTask(JoinHandle<anyhow::Result<Option<AuxiliaryEvent>>>),
}

#[derive(Debug, Clone)]
pub(crate) struct AuxiliaryEventSender {
inner: TokioUnboundedSender<AuxiliaryEvent>,
}

impl AuxiliaryEventSender {
pub(crate) fn new(tx: TokioUnboundedSender<AuxiliaryEvent>) -> Self {
Self { inner: tx }
}

/// Passthrough `send()` method to the underlying sender
pub(crate) fn send(
&self,
message: AuxiliaryEvent,
) -> Result<(), tokio::sync::mpsc::error::SendError<AuxiliaryEvent>> {
self.inner.send(message)
}

pub(crate) fn log_info(&self, message: String) {
self.log(MessageType::INFO, message);
}
pub(crate) fn log_warn(&self, message: String) {
self.log(MessageType::WARNING, message);
}
pub(crate) fn log_error(&self, message: String) {
self.log(MessageType::ERROR, message);
}

/// Send an `AuxiliaryEvent::Log` message in a non-blocking way
fn log(&self, level: lsp_types::MessageType, message: String) {
// We're not connected to an LSP client when running unit tests
if cfg!(test) {
return;
}

// Check that channel is still alive in case the LSP was closed.
// If closed, fallthrough.
if self
.send(AuxiliaryEvent::Log(level, message.clone()))
.is_ok()
{
return;
}

// Log to the kernel as fallback
log::warn!("LSP channel is closed, redirecting messages to Jupyter kernel");

match level {
MessageType::ERROR => log::error!("{message}"),
MessageType::WARNING => log::warn!("{message}"),
_ => log::info!("{message}"),
};
}

/// Spawn a blocking task
///
/// This runs tasks that do semantic analysis on a separate thread pool to avoid
/// blocking the main loop.
///
/// Can optionally return an event for the auxiliary loop (i.e. a log message or
/// diagnostics publication).
pub(crate) fn spawn_blocking_task<Handler>(&self, handler: Handler)
where
Handler: FnOnce() -> anyhow::Result<Option<AuxiliaryEvent>>,
Handler: Send + 'static,
{
let handle = tokio::task::spawn_blocking(handler);

// Send the join handle to the auxiliary loop so it can log any errors
// or panics
if let Err(err) = self.send(AuxiliaryEvent::SpawnedTask(handle)) {
log::warn!("Failed to send task to auxiliary loop due to {err}");
}
}
}

/// Global state for the main loop
///
/// This is a singleton that fully owns the source of truth for `WorldState`
Expand Down Expand Up @@ -101,6 +170,16 @@ pub(crate) struct GlobalState {
/// `Event::Task`.
events_tx: TokioUnboundedSender<Event>,
events_rx: TokioUnboundedReceiver<Event>,

/// State of the internal auxiliary loop.
/// Fully managed by the `GlobalState`.
/// Initialized as `Some()`, and converted to `None` on `start()`.
auxiliary_state: Option<AuxiliaryState>,

/// Event channel for sending to the auxiliary loop.
/// Used for sending latency sensitive events like logs, tasks, and
/// diagnostics.
auxiliary_event_tx: AuxiliaryEventSender,
}

/// Unlike `WorldState`, `ParserState` cannot be cloned and is only accessed by
Expand Down Expand Up @@ -168,10 +247,14 @@ impl GlobalState {
// tower-lsp backend and the Jupyter kernel.
let (events_tx, events_rx) = tokio_unbounded_channel::<Event>();

let (auxiliary_state, auxiliary_event_tx) = AuxiliaryState::new(client.clone());

Self {
world: WorldState::default(),
lsp_state: LspState::default(),
client,
auxiliary_state: Some(auxiliary_state),
auxiliary_event_tx,
events_tx,
events_rx,
}
Expand All @@ -182,6 +265,11 @@ impl GlobalState {
self.events_tx.clone()
}

/// Get `AuxiliaryEvent` transmission channel
pub(crate) fn auxiliary_event_tx(&self) -> AuxiliaryEventSender {
self.auxiliary_event_tx.clone()
}

/// Start the main and auxiliary loops
///
/// Returns a `JoinSet` that holds onto all tasks and state owned by the
Expand All @@ -202,14 +290,17 @@ impl GlobalState {
async fn main_loop(mut self) {
// Spawn latency-sensitive auxiliary loop. Must be first to initialise
// global transmission channel.
let aux = AuxiliaryState::new(self.client.clone());
let mut set = tokio::task::JoinSet::<()>::new();
set.spawn(async move { aux.start().await });

// Move the `auxiliary_state` owned by the global state to its own thread.
// Unwrap: `start()` should only ever be called once.
let auxiliary_state = self.auxiliary_state.take().unwrap();
set.spawn(async move { auxiliary_state.start().await });

loop {
let event = self.next_event().await;
match self.handle_event(event).await {
Err(err) => crate::log_error!("Failure while handling event:\n{err:?}"),
Err(err) => self.log_error(format!("Failure while handling event:\n{err:?}")),
Ok(LoopControl::Shutdown) => break,
_ => {}
}
Expand Down Expand Up @@ -269,7 +360,7 @@ impl GlobalState {
// Currently ignored
},
LspNotification::DidCloseTextDocument(params) => {
handlers_state::did_close(params, &mut self.world)?;
handlers_state::did_close(params, &mut self.world, &self.auxiliary_event_tx)?;
},
}
},
Expand Down Expand Up @@ -302,7 +393,7 @@ impl GlobalState {

// TODO Make this threshold configurable by the client
if loop_tick.elapsed() > std::time::Duration::from_millis(50) {
crate::log_info!("Handler took {}ms", loop_tick.elapsed().as_millis());
self.log_info(format!("Handler took {}ms", loop_tick.elapsed().as_millis()));
}

Ok(out)
Expand All @@ -319,14 +410,27 @@ impl GlobalState {
/// world state could be run concurrently. On the other hand, handlers that
/// manipulate documents (e.g. formatting or refactoring) should not.
fn spawn_handler<T, Handler>(
&self,
response_tx: TokioUnboundedSender<anyhow::Result<LspResponse>>,
handler: Handler,
into_lsp_response: impl FnOnce(T) -> LspResponse + Send + 'static,
) where
Handler: FnOnce() -> anyhow::Result<T>,
Handler: Send + 'static,
{
spawn_blocking(move || respond(response_tx, handler(), into_lsp_response).and(Ok(None)))
self.auxiliary_event_tx.spawn_blocking_task(move || {
respond(response_tx, handler(), into_lsp_response).and(Ok(None))
});
}

fn log_info(&self, message: String) {
self.auxiliary_event_tx.log_info(message);
}
fn log_warn(&self, message: String) {
self.auxiliary_event_tx.log_warn(message);
}
fn log_error(&self, message: String) {
self.auxiliary_event_tx.log_error(message);
}
}

Expand Down Expand Up @@ -371,23 +475,10 @@ fn respond<T>(
unsafe impl Sync for AuxiliaryState {}

impl AuxiliaryState {
fn new(client: Client) -> Self {
fn new(client: Client) -> (Self, AuxiliaryEventSender) {
// Channels for communication with the auxiliary loop
let (auxiliary_event_tx, auxiliary_event_rx) = tokio_unbounded_channel::<AuxiliaryEvent>();

// Set global instance of this channel. This is used for interacting
// with the auxiliary loop (logging messages or spawning a task) from
// free functions.
unsafe {
#[allow(static_mut_refs)]
if let Some(val) = AUXILIARY_EVENT_TX.get_mut() {
// Reset channel if already set. Happens e.g. on reconnection after a refresh.
*val = auxiliary_event_tx;
} else {
// Set channel for the first time
AUXILIARY_EVENT_TX.set(auxiliary_event_tx).unwrap();
}
}
let auxiliary_event_tx = AuxiliaryEventSender::new(auxiliary_event_tx);

// List of pending tasks for which we manage the lifecycle (mainly relay
// errors and panics)
Expand All @@ -401,11 +492,13 @@ impl AuxiliaryState {
Box::pin(pending) as Pin<Box<dyn AnyhowJoinHandleFut<Option<AuxiliaryEvent>> + Send>>;
tasks.push(pending);

Self {
let state = Self {
client,
auxiliary_event_rx,
tasks,
}
};

(state, auxiliary_event_tx)
}

/// Start the auxiliary loop
Expand Down Expand Up @@ -451,65 +544,3 @@ impl AuxiliaryState {
self.client.log_message(MessageType::ERROR, message).await
}
}

fn auxiliary_tx() -> &'static TokioUnboundedSender<AuxiliaryEvent> {
// If we get here that means the LSP was initialised at least once. The
// channel might be closed if the LSP was dropped, but it should exist.
unsafe {
#[allow(static_mut_refs)]
AUXILIARY_EVENT_TX.get().unwrap()
}
}

fn send_auxiliary(event: AuxiliaryEvent) {
if let Err(err) = auxiliary_tx().send(event) {
// The error includes the event
log::warn!("LSP is shut down, can't send event:\n{err:?}");
}
}

/// Send a message to the LSP client. This is non-blocking and treated on a
/// latency-sensitive task.
pub(crate) fn log(level: lsp_types::MessageType, message: String) {
// We're not connected to an LSP client when running unit tests
if cfg!(test) {
return;
}

// Check that channel is still alive in case the LSP was closed.
// If closed, fallthrough.
if auxiliary_tx()
.send(AuxiliaryEvent::Log(level, message.clone()))
.is_ok()
{
return;
}

// Log to the kernel as fallback
log::warn!("LSP channel is closed, redirecting messages to Jupyter kernel");

match level {
MessageType::ERROR => log::error!("{message}"),
MessageType::WARNING => log::warn!("{message}"),
_ => log::info!("{message}"),
};
}

/// Spawn a blocking task
///
/// This runs tasks that do semantic analysis on a separate thread pool to avoid
/// blocking the main loop.
///
/// Can optionally return an event for the auxiliary loop (i.e. a log message or
/// diagnostics publication).
pub(crate) fn spawn_blocking<Handler>(handler: Handler)
where
Handler: FnOnce() -> anyhow::Result<Option<AuxiliaryEvent>>,
Handler: Send + 'static,
{
let handle = tokio::task::spawn_blocking(handler);

// Send the join handle to the auxiliary loop so it can log any errors
// or panics
send_auxiliary(AuxiliaryEvent::SpawnedTask(handle));
}
Loading
Loading