From 0eef6638892d94159390cd72c59f729691fd40e2 Mon Sep 17 00:00:00 2001 From: Jeff Baskin Date: Fri, 4 Apr 2025 13:26:28 -0400 Subject: [PATCH] Added routing to queue. --- src/client.rs | 6 ++-- src/lib.rs | 6 +--- src/queue.rs | 87 +++++++++++++++++++++++++++++++++++++++++++++------ 3 files changed, 82 insertions(+), 17 deletions(-) diff --git a/src/client.rs b/src/client.rs index c909634..9af2602 100644 --- a/src/client.rs +++ b/src/client.rs @@ -221,7 +221,7 @@ impl Client { fn listen(&mut self) { loop { let msg = self.rx.recv().unwrap(); - self.queue.send(Message::new(MsgType::SessionValidate)); + //self.queue.send(Message::new(MsgType::SessionValidate)); let id = msg.get_data().get("tx_id").unwrap().to_uuid().unwrap(); let reply = Reply {}; self.registry.send(&id, reply); @@ -245,14 +245,16 @@ mod clients { fn start_client() { let (tx, rx) = channel(); let mut queue = Queue::new(); - queue.add(tx); + queue.add(tx, [MsgType::SessionValidate].to_vec()); let mut link = Client::start(queue.clone()); let req = create_request(); link.send(req); + /* let sess = rx.recv_timeout(TIMEOUT).unwrap(); match sess.get_class() { MsgType::SessionValidate => {}, _ => unreachable!("should request session validation"), } + */ } } diff --git a/src/lib.rs b/src/lib.rs index 72c5c1d..26383d6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ mod client; mod field; mod queue; +// mod session; mod utils; use client::{Client, ClientLink, Reply, Request}; @@ -27,10 +28,5 @@ impl MoreThanText { let req = Request::new(); let rx = self.client_link.send(req); rx.recv().unwrap() - /* - let req = Request::new(tx); - self.tx.send(req.into()).unwrap(); - rx.recv().unwrap() - */ } } diff --git a/src/queue.rs b/src/queue.rs index 7870b58..4235c86 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -5,11 +5,12 @@ use std::{ }; use uuid::Uuid; -#[derive(Clone)] +#[derive(Clone, Debug, Eq, Hash, PartialEq)] pub enum MsgType { ClientRequest, NoOp, SessionValidate, + Session, } #[derive(Clone)] @@ -125,43 +126,109 @@ mod messages { #[derive(Clone)] pub struct Queue { - store: Arc>>>, + store: Arc>>>>, } impl Queue { pub fn new() -> Self { Self { - store: Arc::new(RwLock::new(Vec::new())), + store: Arc::new(RwLock::new(HashMap::new())), } } - pub fn add(&self, tx: Sender) { + pub fn add(&self, tx: Sender, msg_types: Vec) { let mut store = self.store.write().unwrap(); - store.push(tx); + for msg_type in msg_types.into_iter() { + if !store.contains_key(&msg_type) { + store.insert(msg_type.clone(), Vec::new()); + } + let senders = store.get_mut(&msg_type).unwrap(); + senders.push(tx.clone()); + } } pub fn send(&self, msg: Message) { let store = self.store.read().unwrap(); - for sender in store.iter() { + let senders = store.get(&msg.get_class()).unwrap(); + for sender in senders.into_iter() { sender.send(msg.clone()).unwrap(); } } } #[cfg(test)] -mod serviceredistries { +mod queues { use super::*; - use std::sync::mpsc::channel; + use std::{ + sync::mpsc::{channel, RecvTimeoutError}, + time::Duration, + }; + + static TIMEOUT: Duration = Duration::from_millis(500); #[test] fn create_queue() { let queue = Queue::new(); let (tx1, rx1) = channel(); let (tx2, rx2) = channel(); - queue.add(tx1); - queue.add(tx2); + queue.add(tx1, [MsgType::NoOp].to_vec()); + queue.add(tx2, [MsgType::NoOp].to_vec()); queue.send(Message::new(MsgType::NoOp)); rx1.recv().unwrap(); rx2.recv().unwrap(); } + + #[test] + fn messages_are_routed() { + let queue = Queue::new(); + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + queue.add(tx1, [MsgType::SessionValidate].to_vec()); + queue.add(tx2, [MsgType::Session].to_vec()); + queue.send(Message::new(MsgType::SessionValidate)); + let result = rx1.recv().unwrap(); + match result.get_class() { + MsgType::SessionValidate => {} + _ => unreachable!( + "received {:?}, should have been session vvalidate", + result.get_class() + ), + } + match rx2.recv_timeout(TIMEOUT) { + Ok(_) => unreachable!("should not have received anything"), + Err(err) => match err { + RecvTimeoutError::Timeout => {} + _ => unreachable!("{:?}", err), + }, + } + queue.send(Message::new(MsgType::Session)); + let result = rx2.recv().unwrap(); + match result.get_class() { + MsgType::Session => {} + _ => unreachable!( + "received {:?}, should have been session vvalidate", + result.get_class() + ), + } + match rx1.recv_timeout(TIMEOUT) { + Ok(_) => unreachable!("should not have received anything"), + Err(err) => match err { + RecvTimeoutError::Timeout => {} + _ => unreachable!("{:?}", err), + }, + } + } + + #[test] + fn assign_sender_multiple_message_types() { + let queue = Queue::new(); + let (tx, rx) = channel(); + queue.add(tx, [MsgType::Session, MsgType::SessionValidate].to_vec()); + queue.send(Message::new(MsgType::SessionValidate)); + let msg = rx.recv().unwrap(); + assert_eq!(msg.get_class(), &MsgType::SessionValidate); + queue.send(Message::new(MsgType::Session)); + let msg = rx.recv().unwrap(); + assert_eq!(msg.get_class(), &MsgType::Session); + } }