diff --git a/src/client.rs b/src/client.rs index 92e980f..c909634 100644 --- a/src/client.rs +++ b/src/client.rs @@ -194,21 +194,23 @@ mod clientlinks { } pub struct Client { + queue: Queue, registry: ClientRegistry, rx: Receiver, } impl Client { - fn new(rx: Receiver) -> Self { + fn new(rx: Receiver, queue: Queue) -> Self { Self { + queue: queue, registry: ClientRegistry::new(), rx: rx, } } - pub fn start(_queue: Queue) -> ClientLink { + pub fn start(queue: Queue) -> ClientLink { let (tx, rx) = channel(); - let mut client = Client::new(rx); + let mut client = Client::new(rx, queue); let link = ClientLink::new(tx, client.get_registry()); spawn(move || { client.listen(); @@ -219,6 +221,7 @@ impl Client { fn listen(&mut self) { loop { let msg = self.rx.recv().unwrap(); + self.queue.send(Message::new(MsgType::SessionValidate)); let id = msg.get_data().get("tx_id").unwrap().to_uuid().unwrap(); let reply = Reply {}; self.registry.send(&id, reply); @@ -234,12 +237,22 @@ impl Client { mod clients { use super::*; use requests::create_request; + use std::time::Duration; + + static TIMEOUT: Duration = Duration::from_millis(500); #[test] fn start_client() { - let queue = Queue::new(); + let (tx, rx) = channel(); + let mut queue = Queue::new(); + queue.add(tx); let mut link = Client::start(queue.clone()); let req = create_request(); link.send(req); + let sess = rx.recv_timeout(TIMEOUT).unwrap(); + match sess.get_class() { + MsgType::SessionValidate => {}, + _ => unreachable!("should request session validation"), + } } } diff --git a/src/queue.rs b/src/queue.rs index e10c0b9..7870b58 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -1,10 +1,7 @@ use crate::{client::Request, field::Field}; use std::{ collections::HashMap, - sync::{ - mpsc::Sender, - Arc, RwLock, - }, + sync::{mpsc::Sender, Arc, RwLock}, }; use uuid::Uuid; @@ -12,6 +9,7 @@ use uuid::Uuid; pub enum MsgType { ClientRequest, NoOp, + SessionValidate, } #[derive(Clone)] @@ -137,12 +135,12 @@ impl Queue { } } - fn add(&self, tx: Sender) { + pub fn add(&self, tx: Sender) { let mut store = self.store.write().unwrap(); store.push(tx); } - fn send(&self, msg: Message) { + pub fn send(&self, msg: Message) { let store = self.store.read().unwrap(); for sender in store.iter() { sender.send(msg.clone()).unwrap(); @@ -152,8 +150,8 @@ impl Queue { #[cfg(test)] mod serviceredistries { - use std::sync::mpsc::channel; use super::*; + use std::sync::mpsc::channel; #[test] fn create_queue() {