diff --git a/src/lib.rs b/src/lib.rs index bd06952..3c116b8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ mod client; mod data; mod error; mod message; +mod queue; mod router; mod session; @@ -12,7 +13,7 @@ use session::{Session, SessionData, SessionMsg}; use std::{ collections::HashMap, ops::Deref, - sync::mpsc::{channel, Sender}, + sync::mpsc::{channel, Receiver, Sender}, }; use uuid::Uuid; @@ -105,12 +106,14 @@ mod fields { } } +#[derive(Clone)] struct Request { id: Option, + tx: Sender, } impl Request { - fn new(mut id: Option) -> Self + fn new(id: Option) -> (Self, Receiver) where F: Into, { @@ -121,7 +124,22 @@ impl Request { } None => result = None, } - Self { id: result } + let (tx, rx) = channel(); + (Self { id: result, tx: tx }, rx) + } + + fn get_session(&self) -> &Option { + return &self.id; + } +} + +#[cfg(test)] +mod create_request { + use super::*; + + pub fn empty_request() -> (Request, Receiver) { + let id: Option = None; + Request::new(id) } } @@ -132,14 +150,14 @@ mod requests { #[test] fn create_request_no_id() { let input: Option = None; - let req = Request::new(input); + let (req, _) = Request::new(input); assert!(req.id.is_none()); } #[test] fn create_request_with_uuid() { let id = Uuid::new_v4(); - let req = Request::new(Some(id)); + let (req, _) = Request::new(Some(id)); match req.id { Some(field) => match (field) { Field::Uuid(data) => assert_eq!(data, id), @@ -148,6 +166,21 @@ mod requests { None => unreachable!("Should producer data"), } } + + #[test] + fn return_session() { + let id = Uuid::new_v4(); + let (req, _) = Request::new(Some(id)); + match req.get_session() { + Some(result) => { + match result { + Field::Uuid(data) => assert_eq!(data, &id), + _ => unreachable!("should have returned a uuid field"), + }; + } + None => unreachable!("should have returned a uuid"), + } + } } struct Record { diff --git a/src/queue.rs b/src/queue.rs new file mode 100644 index 0000000..3416105 --- /dev/null +++ b/src/queue.rs @@ -0,0 +1,110 @@ +use crate::Request; +use std::{ + sync::mpsc::{channel, Receiver, Sender}, + thread::spawn, +}; + +#[derive(Clone)] +enum Message { + Register(Sender), + Req(Request), +} + +impl From for Message { + fn from(value: Request) -> Self { + Message::Req(value) + } +} + +impl From> for Message { + fn from(value: Sender) -> Self { + Message::Register(value) + } +} + +#[cfg(test)] +mod messages { + use super::*; + use crate::create_request::empty_request; + + #[test] + fn from_request() { + let (req, _) = empty_request(); + match req.into() { + Message::Req(result) => assert!(result.get_session().is_none()), + _ => unreachable!("should have been s request"), + } + } + + #[test] + fn from_sender() { + let (tx, _) = channel(); + match tx.into() { + Message::Register(_) => {} + _ => unreachable!("should have been a register"), + } + } +} + +struct Queue { + channels: Vec>, + rx: Receiver, +} + +impl Queue { + fn new(rx: Receiver) -> Self { + Self { + channels: Vec::new(), + rx: rx, + } + } + + fn listen(&mut self) { + loop { + let msg = self.rx.recv().unwrap(); + match msg { + Message::Register(tx) => self.channels.push(tx), + _ => { + for tx in self.channels.iter() { + tx.send(msg.clone()).unwrap(); + } + } + } + } + } + + fn start() -> Sender { + let (tx, rx) = channel(); + spawn(move || { + let mut queue = Queue::new(rx); + queue.listen(); + }); + tx + } +} + +#[cfg(test)] +mod queues { + use super::*; + use crate::create_request::empty_request; + + #[test] + fn create_queue() { + let mut channels = Vec::new(); + for _ in 0..5 { + channels.push(channel()); + } + let mut queue_tx = Queue::start(); + for (tx, _) in channels.iter() { + queue_tx.send(tx.clone().into()).unwrap(); + } + let (req, _) = empty_request(); + queue_tx.send(req.into()).unwrap(); + for (_, rx) in channels.iter() { + match rx.recv().unwrap() { + Message::Req(_) => {} + _ => unreachable!("should have been a request"), + } + } + } +}