From 4806f3e85a1e9f29819ad8e3a688ff093f9b6868 Mon Sep 17 00:00:00 2001 From: Paul Lietar Date: Fri, 1 Jan 2016 23:56:13 +0100 Subject: [PATCH] mercury: Register subscription for all channel aliases When subscribing to a channel, we may actually end up subscribed to other alias channels. We must track these as well in order to redirect received messages properly. --- protocol/build.rs | 1 + protocol/src/lib.rs | 1 + src/mercury.rs | 59 +++++++++++++++++++++++++++++++-------------- src/spirc.rs | 2 +- 4 files changed, 44 insertions(+), 19 deletions(-) diff --git a/protocol/build.rs b/protocol/build.rs index 2db7ff19..3b5dfc8d 100644 --- a/protocol/build.rs +++ b/protocol/build.rs @@ -43,6 +43,7 @@ fn main() { &proto.join("authentication.proto"), &proto.join("mercury.proto"), &proto.join("metadata.proto"), + &proto.join("pubsub.proto"), &proto.join("spirc.proto"), ]).unwrap(); } diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index 7fdd65dd..98b43b3e 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -7,5 +7,6 @@ mod_path! keyexchange (concat!(env!("OUT_DIR"), "/keyexchange.rs")); mod_path! authentication (concat!(env!("OUT_DIR"), "/authentication.rs")); mod_path! mercury (concat!(env!("OUT_DIR"), "/mercury.rs")); mod_path! metadata (concat!(env!("OUT_DIR"), "/metadata.rs")); +mod_path! pubsub (concat!(env!("OUT_DIR"), "/pubsub.rs")); mod_path! spirc (concat!(env!("OUT_DIR"), "/spirc.rs")); diff --git a/src/mercury.rs b/src/mercury.rs index 98176ac6..d317b2ea 100644 --- a/src/mercury.rs +++ b/src/mercury.rs @@ -1,5 +1,5 @@ use byteorder::{BigEndian, ByteOrder, ReadBytesExt, WriteBytesExt}; -use eventual::{self, Async}; +use eventual; use protobuf::{self, Message}; use std::collections::HashMap; use std::io::{Cursor, Read, Write}; @@ -9,7 +9,6 @@ use std::sync::mpsc; use librespot_protocol as protocol; use session::Session; use connection::PacketHandler; -use util::IgnoreExt; #[derive(Debug, PartialEq, Eq)] pub enum MercuryMethod { @@ -32,10 +31,16 @@ pub struct MercuryResponse { pub payload: Vec> } +enum MercuryCallback { + Future(eventual::Complete), + Subscription(mpsc::Sender), + Channel, +} + pub struct MercuryPending { parts: Vec>, partial: Option>, - callback: Option> + callback: MercuryCallback } pub struct MercuryManager { @@ -64,9 +69,10 @@ impl MercuryManager { } } - pub fn request(&mut self, session: &Session, req: MercuryRequest) - -> eventual::Future { - + fn request_with_callback(&mut self, + session: &Session, + req: MercuryRequest, + cb: MercuryCallback) { let mut seq = [0u8; 4]; BigEndian::write_u32(&mut seq, self.next_seq); self.next_seq += 1; @@ -80,27 +86,30 @@ impl MercuryManager { session.send_packet(cmd, &data).unwrap(); - let (tx, rx) = eventual::Future::pair(); self.pending.insert(seq.to_vec(), MercuryPending{ parts: Vec::new(), partial: None, - callback: Some(tx), + callback: cb, }); + } + pub fn request(&mut self, session: &Session, req: MercuryRequest) + -> eventual::Future { + let (tx, rx) = eventual::Future::pair(); + self.request_with_callback(session, req, MercuryCallback::Future(tx)); rx } pub fn subscribe(&mut self, session: &Session, uri: String) -> mpsc::Receiver { let (tx, rx) = mpsc::channel(); - self.subscriptions.insert(uri.clone(), tx); - self.request(session, MercuryRequest{ + self.request_with_callback(session, MercuryRequest{ method: MercuryMethod::SUB, uri: uri, content_type: None, payload: Vec::new() - }).fire(); + }, MercuryCallback::Subscription(tx)); rx } @@ -113,7 +122,17 @@ impl MercuryManager { buffer } - fn complete_request(&mut self, cmd: u8, mut pending: MercuryPending) { + fn complete_subscription(&mut self, + response: MercuryResponse, + tx: mpsc::Sender) { + for sub_data in response.payload { + if let Ok(mut sub) = protobuf::parse_from_bytes::(&sub_data) { + self.subscriptions.insert(sub.take_uri(), tx.clone()); + } + } + } + + fn complete_request(&mut self, mut pending: MercuryPending) { let header_data = pending.parts.remove(0); let header : protocol::mercury::Header = protobuf::parse_from_bytes(&header_data).unwrap(); @@ -123,10 +142,14 @@ impl MercuryManager { payload: pending.parts }; - if cmd == 0xb5 { - self.subscriptions.get(header.get_uri()).map(|ch| ch.send(response).ignore()); - } else { - pending.callback.map(|cb| cb.complete(response)); + match pending.callback { + MercuryCallback::Future(tx) => tx.complete(response), + MercuryCallback::Subscription(tx) => self.complete_subscription(response, tx), + MercuryCallback::Channel => { + self.subscriptions + .get(header.get_uri()).unwrap() + .send(response).unwrap() + } } } @@ -176,7 +199,7 @@ impl PacketHandler for MercuryManager { MercuryPending { parts: Vec::new(), partial: None, - callback: None, + callback: MercuryCallback::Channel, } } else { println!("Ignore seq {:?} cmd {}", seq, cmd); @@ -198,7 +221,7 @@ impl PacketHandler for MercuryManager { } if flags == 0x1 { - self.complete_request(cmd, pending); + self.complete_request(pending); } else { self.pending.insert(seq, pending); } diff --git a/src/spirc.rs b/src/spirc.rs index 6fa86b43..29c7790c 100644 --- a/src/spirc.rs +++ b/src/spirc.rs @@ -89,7 +89,7 @@ impl <'s, D: SpircDelegate> SpircManager<'s, D> { } pub fn run(&mut self) { - let rx = self.session.mercury_sub(format!("hm://remote/3/user/{}/", + let rx = self.session.mercury_sub(format!("hm://remote/user/{}/", self.session.0.data.read().unwrap().canonical_username.clone())); let updates = self.delegate.updates();