use crate::queue::{Message, MsgType, Queue}; use std::{ collections::HashMap, sync::{ mpsc::{channel, Receiver, Sender}, Arc, Mutex, }, thread::spawn, }; use uuid::Uuid; const RESPONS_TO: [MsgType; 3] = [MsgType::Document, MsgType::Error, MsgType::SessionValidated]; #[derive(Clone)] pub struct ClientChannel { queue: Queue, registry: Arc>>>, } impl ClientChannel { fn new(queue: Queue) -> Self { Self { queue: queue, registry: Arc::new(Mutex::new(HashMap::new())), } } pub fn send(&self, mut msg: Message) -> Receiver { let mut reg = self.registry.lock().unwrap(); while reg.contains_key(&msg.get_id()) { msg.reset_id(); } let (tx, rx) = channel(); reg.insert(msg.get_id(), tx); self.queue.send(msg).unwrap(); rx } fn reply(&self, msg: Message) { let mut reg = self.registry.lock().unwrap(); match reg.remove(&msg.get_id()) { Some(tx) => tx.send(msg).unwrap(), None => {} } } } #[cfg(test)] mod client_channels { use super::*; use std::time::Duration; static TIMEOUT: Duration = Duration::from_millis(500); #[test] fn fowards_message() { let msg_type = MsgType::Document; let reply_type = MsgType::Time; let queue = Queue::new(); let (tx, rx) = channel(); queue.add(tx, [msg_type.clone()].to_vec()); let chan = ClientChannel::new(queue); let msg = Message::new(msg_type.clone()); let client_rx = chan.send(msg.clone()); let reply = rx.recv_timeout(TIMEOUT).unwrap(); assert_eq!(reply.get_id(), msg.get_id()); assert_eq!(reply.get_msg_type().clone(), msg_type); let client_reply = reply.reply(MsgType::Time); chan.reply(client_reply); let client_msg = client_rx.recv_timeout(TIMEOUT).unwrap(); assert_eq!(client_msg.get_id(), msg.get_id()); assert_eq!(client_msg.get_msg_type().clone(), reply_type); } #[test] fn no_duplicate_ids() { let (tx, rx) = channel(); let queue = Queue::new(); queue.add(tx, [MsgType::Time].to_vec()); let chan = ClientChannel::new(queue); let msg1 = Message::new(MsgType::Time); let msg2 = msg1.reply(MsgType::Time); let rx1 = chan.send(msg1); let rx2 = chan.send(msg2); let queue1 = rx.recv_timeout(TIMEOUT).unwrap(); let queue2 = rx.recv_timeout(TIMEOUT).unwrap(); assert_ne!(queue1.get_id(), queue2.get_id()); chan.reply(queue1.reply(MsgType::Document)); chan.reply(queue2.reply(MsgType::Document)); let reply1 = rx1.recv_timeout(TIMEOUT).unwrap(); let reply2 = rx2.recv_timeout(TIMEOUT).unwrap(); assert_eq!(reply1.get_id(), queue1.get_id()); assert_eq!(reply2.get_id(), queue2.get_id()); } #[test] fn ignore_unrequested() { let queue = Queue::new(); let chan = ClientChannel::new(queue); chan.reply(Message::new(MsgType::Document)); } } pub struct Client { channel: ClientChannel, rx: Receiver, } impl Client { fn new(chan: ClientChannel, rx: Receiver) -> Self { Self { channel: chan, rx: rx, } } pub fn start(queue: Queue) -> ClientChannel { let (tx, rx) = channel(); queue.add(tx.clone(), RESPONS_TO.to_vec()); let chan = ClientChannel::new(queue.clone()); let client = Client::new(chan.clone(), rx); spawn(move || { client.listen(); }); chan } fn listen(&self) { loop { let msg = self.rx.recv().unwrap(); self.channel.reply(msg); } } } #[cfg(test)] mod clients { use super::*; use crate::session::sessions::create_validated_reply; use std::time::Duration; static TIMEOUT: Duration = Duration::from_millis(500); #[test] fn session_validated() { let queue = Queue::new(); let (queue_tx, queue_rx) = channel(); queue.add(queue_tx, [MsgType::SessionValidate].to_vec()); let chan = Client::start(queue.clone()); let chan_rx = chan.send(Message::new(MsgType::SessionValidate)); let msg = queue_rx.recv_timeout(TIMEOUT).unwrap(); let expected = create_validated_reply(msg); queue.send(expected.clone()).unwrap(); let result = chan_rx.recv_timeout(TIMEOUT).unwrap(); assert_eq!(result.get_id(), expected.get_id()); assert_eq!(result.get_msg_type(), expected.get_msg_type()); } #[test] fn document_return() { let queue = Queue::new(); let (queue_tx, queue_rx) = channel(); queue.add(queue_tx, [MsgType::DocumentRequest].to_vec()); let chan = Client::start(queue.clone()); let chan_rx = chan.send(Message::new(MsgType::DocumentRequest)); let msg = queue_rx.recv_timeout(TIMEOUT).unwrap(); let expected = msg.reply(MsgType::Document); queue.send(expected.clone()).unwrap(); let result = chan_rx.recv_timeout(TIMEOUT).unwrap(); assert_eq!(result.get_id(), expected.get_id()); assert_eq!(result.get_msg_type(), expected.get_msg_type()); } }