Skip to content
This repository has been archived by the owner on Sep 13, 2022. It is now read-only.

V2 #57

Draft
wants to merge 12 commits into
base: master
Choose a base branch
from
Draft

V2 #57

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: Rust

on:
push:
branches: [master]
branches: ['*']
pull_request:
branches: [master]

Expand Down
38 changes: 4 additions & 34 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,35 +1,5 @@
[package]
name = "loco"
version = "0.1.0"
authors = [
"Seungjae Park <[email protected]>",
"JellyBrick <[email protected]>",
"kiwiyou <[email protected]>",
"Sangwon Ryu <[email protected]>",
"nnnlog <[email protected]>"
[workspace]
members = [
"loco",
"loco-client",
]
edition = "2018"
description = "Loco Protocol Wrapper for Rust"
documentation = "https://docs.rs/loco"
readme = "README.md"
repository = "https://github.com/organization/loco.rs"
license = "Apache-2.0"

[dependencies]
bson = "1.0"
hex-literal = "0.3.0"
reqwest = { version = "0.11", features = ["blocking"] }
sha2 = "0.9.0"
hex = "0.4.2"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_repr = "0.1"
data-encoding = "2.2.0"
tokio = { version = "1.0.1", features = ["macros"] }
tokio-util = { version = "0.6.0", features = ["codec"], default-features = false }
bytes = "1.0.0"
futures = "0.3.5"
log = "0.4.8"
loco-derive = { path = "loco-derive" }
webpki = "0.22"
bincode = "1.3"
11 changes: 11 additions & 0 deletions loco-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "loco-client"
version = "0.1.0"
edition = "2018"

[dependencies]
futures = "0.3.14"
loco = { path = "../loco" }
tokio = "1.5.0"
tokio-native-tls = "0.3.0"
tokio-util = { version = "0.6.6", features = ["codec"] }
45 changes: 45 additions & 0 deletions loco-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use loco::types::response::DataStatus;

pub mod reactor;

#[derive(Debug)]
pub enum Error {
Loco(Box<loco::Error>),
Io(std::io::Error),
Tls(tokio_native_tls::native_tls::Error),
ReactorFail,
RequestFail { status: DataStatus, method: String },
PacketIdConflict(i32),
}

pub type Result<T> = std::result::Result<T, Error>;

impl From<loco::Error> for Error {
fn from(e: loco::Error) -> Self {
Self::Loco(Box::new(e))
}
}

impl From<std::io::Error> for Error {
fn from(error: std::io::Error) -> Self {
Self::Io(error)
}
}

impl From<tokio_native_tls::native_tls::Error> for Error {
fn from(e: tokio_native_tls::native_tls::Error) -> Self {
Self::Tls(e)
}
}

impl From<futures::channel::oneshot::Canceled> for Error {
fn from(_: futures::channel::oneshot::Canceled) -> Self {
Self::ReactorFail
}
}

impl From<futures::channel::mpsc::SendError> for Error {
fn from(_: futures::channel::mpsc::SendError) -> Self {
Self::ReactorFail
}
}
141 changes: 141 additions & 0 deletions loco-client/src/reactor/entrance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
use futures::{SinkExt, StreamExt};
use loco::{
codec::{EncryptType, KeyEncryptType, LocoClientCodec, LocoPacket, LocoSecureClientCodec},
config::{BookingConfig, CheckinConfig},
crypto::LocoCrypto,
types::{
request,
response::{self, LocoResponse, ResponseKind},
UserId,
},
};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use tokio_native_tls::TlsStream;
use tokio_util::codec::{Decoder, Framed};

use crate::{Error, Result};

pub type LocoSecureClientStream<S> = Framed<S, LocoSecureClientCodec<LocoCrypto>>;
pub type LocoClientStream<S> = Framed<S, LocoClientCodec>;

async fn create_tls_stream(
host: &str,
port: u16,
// keep_alive: bool,
) -> Result<TlsStream<TcpStream>> {
let connector = tokio_native_tls::native_tls::TlsConnector::new().unwrap();
let connector = tokio_native_tls::TlsConnector::from(connector);
let stream = TcpStream::connect((host, port)).await?;
let tls_stream = connector.connect(host, stream).await?;
Ok(tls_stream)
}

pub async fn get_config(config: &impl BookingConfig) -> Result<response::GetConf> {
let (host, port) = config.booking_host();
let stream = create_tls_stream(host, port).await?;
let mut framed = LocoClientCodec::default().framed(stream);
get_booking_data(&mut framed, config).await
}

pub async fn get_checkin(config: &impl CheckinConfig) -> Result<response::Checkin> {
let crypto = config.new_crypto();
let try_checkin = match get_config(config).await {
Ok(response) => {
let host = response.ticket_hosts.lsl[0].as_str();
let port = response.config_wifi.ports[0] as u16;
let stream = TcpStream::connect((host, port)).await?;
let framed = LocoSecureClientCodec::new(crypto.clone())
.wrap(
stream,
KeyEncryptType::RsaOaepSha1Mgf1Sha1,
EncryptType::AesCfb128,
&config.public_key(),
)
.await?;
Ok(framed)
}
Err(e) => Err(e),
};
let mut stream = match try_checkin {
Ok(stream) => stream,
Err(_) => {
let (host, port) = config.checkin_fallback_host();
let stream = TcpStream::connect((host, port)).await?;
let framed = LocoSecureClientCodec::new(crypto)
.wrap(
stream,
KeyEncryptType::RsaOaepSha1Mgf1Sha1,
EncryptType::AesCfb128,
&config.public_key(),
)
.await?;
framed
}
};

get_checkin_data(&mut stream, config, None).await
}

async fn get_booking_data<S>(
stream: &mut LocoClientStream<S>,
config: &impl BookingConfig,
) -> Result<response::GetConf>
where
S: AsyncRead + AsyncWrite + Unpin,
{
let request = request::GetConf::from_config(config);
let packet = LocoPacket::from_request(0, request);
stream.send(packet).await?;
while let Some(packet) = stream.next().await {
let packet = packet?;
if packet.id == 0 {
match packet.payload {
LocoResponse::Success { status: _, kind } => {
if let ResponseKind::GetConf(get_conf) = *kind {
return Ok(get_conf);
}
}
LocoResponse::Fail { status } => {
return Err(Error::RequestFail {
status,
method: packet.method,
})
}
}
}
}
Err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "No server response").into())
}

async fn get_checkin_data<S>(
stream: &mut LocoSecureClientStream<S>,
config: &impl CheckinConfig,
user_id: Option<UserId>,
) -> Result<response::Checkin>
where
S: AsyncRead + AsyncWrite + Unpin,
{
let request = request::Checkin::from_config(config).with_user(user_id);
let packet = LocoPacket::from_request(0, request);
stream.send(packet).await?;
while let Some(packet) = stream.next().await {
let packet = packet?;
if packet.id == 0 {
match packet.payload {
LocoResponse::Success { status: _, kind } => {
if let ResponseKind::Checkin(checkin) = *kind {
return Ok(checkin);
}
}
LocoResponse::Fail { status } => {
return Err(Error::RequestFail {
status,
method: packet.method,
})
}
}
}
}
Err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "No server response").into())
}
146 changes: 146 additions & 0 deletions loco-client/src/reactor/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
use std::collections::HashMap;

use futures::{
channel::{mpsc, oneshot},
future::Either,
SinkExt, Stream, StreamExt,
};
use loco::{
codec::{self, EncryptType, KeyEncryptType, LocoPacket, LocoSecureDecoder, LocoSecureEncoder},
config::ClientConfig,
types::{
request::LocoRequest,
response::{LocoResponse, ResponseKind},
},
};
use tokio::net::TcpStream;
use tokio_util::codec::{FramedRead, FramedWrite};

use crate::{Error, Result};

mod entrance;

#[derive(Clone)]
pub struct Sender {
tx: mpsc::Sender<(LocoRequest, Option<oneshot::Sender<LocoResponse>>)>,
}

impl Sender {
pub async fn spawn(&mut self, request: impl Into<LocoRequest>) -> Result<()> {
self.tx.send((request.into(), None)).await?;
Ok(())
}

pub async fn send(&mut self, request: impl Into<LocoRequest>) -> Result<ResponseKind> {
let (res_tx, res_rx) = oneshot::channel();
let request: LocoRequest = request.into();
let method = request.to_string();
self.tx.send((request, Some(res_tx))).await?;
let response = res_rx.await?;
match response {
LocoResponse::Success { kind, .. } => Ok(*kind),
LocoResponse::Fail { status } => Err(Error::RequestFail { status, method }),
}
}
}

pub struct Reactor<Config> {
config: Config,
sender: Sender,
receiver: mpsc::Receiver<(LocoRequest, Option<oneshot::Sender<LocoResponse>>)>,
packet_tx: Option<mpsc::Sender<(Sender, LocoPacket<LocoResponse>)>>,
}

impl<Config> Reactor<Config> {
pub fn new(config: Config) -> Self {
let (sender, receiver) = mpsc::channel(16);
Self {
config,
sender: Sender { tx: sender },
receiver,
packet_tx: None,
}
}

pub fn packets(&mut self) -> impl Stream<Item = (Sender, LocoPacket<LocoResponse>)> {
let (packet_tx, packet_rx) = mpsc::channel(16);
self.packet_tx = Some(packet_tx);
packet_rx
}

pub fn sender(&self) -> &Sender {
&self.sender
}
}

impl<Config> Reactor<Config>
where
Config: ClientConfig + Send + 'static,
{
pub async fn spawn(self) -> Result<()> {
let checkin = entrance::get_checkin(&self.config).await?;
let mut socket = TcpStream::connect((checkin.host.as_str(), checkin.port as u16)).await?;
let crypto = self.config.new_crypto();
codec::send_handshake(
&mut socket,
&crypto,
KeyEncryptType::RsaOaepSha1Mgf1Sha1,
EncryptType::AesCfb128,
&self.config.public_key(),
)
.await?;
let (rx, tx) = socket.into_split();
let reader = FramedRead::new(rx, LocoSecureDecoder::new(crypto.clone()));
let mut writer = FramedWrite::new(tx, LocoSecureEncoder::new(crypto));
let (mut req_tx, req_rx) = mpsc::channel(16);

let mut packet_tx = self.packet_tx;
let sender = self.sender;
let read_task = async move {
let mut notifier_registry =
HashMap::<i32, Option<oneshot::Sender<LocoResponse>>>::new();

let reader_stream = reader.map(Either::Left);
let register_stream = req_rx.map(Either::Right);

let mut stream = futures::stream::select(reader_stream, register_stream);
while let Some(input) = stream.next().await {
match input {
Either::Left(Ok(packet)) => {
if let Some(Some(notifier)) = notifier_registry.remove(&packet.id) {
notifier
.send(packet.payload)
.expect("Response notification failed");
continue;
}
if let Some(tx) = &mut packet_tx {
tx.send((sender.clone(), packet)).await?;
}
}
Either::Left(Err(e)) => return Err(e.into()),
Either::Right((id, notifier)) => {
notifier_registry.insert(id, notifier);
}
}
}

Ok(())
};

let mut receiver = self.receiver;
let write_task = async move {
let mut packet_id = 0;
while let Some((request, maybe_notifier)) = receiver.next().await {
let packet = LocoPacket::from_request(packet_id, request);
writer.send(packet).await?;
req_tx.send((packet_id, maybe_notifier)).await?;
packet_id += 1;
}

Ok(())
};

let (try_read, try_write) = futures::future::join(read_task, write_task).await;
try_read.and(try_write)
}
}
Loading