From fbd8e81a6fd4f87905ab723038f6cdfec908d84b Mon Sep 17 00:00:00 2001 From: Jeff Baskin Date: Wed, 9 Apr 2025 08:40:23 -0400 Subject: [PATCH] Made queue send return a result/ --- src/client.rs | 6 ++++-- src/queue.rs | 22 +++++++++++++--------- src/session.rs | 36 +++++++++++++++++------------------- 3 files changed, 34 insertions(+), 30 deletions(-) diff --git a/src/client.rs b/src/client.rs index 5400f2b..37d122d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -273,7 +273,9 @@ impl Client { MsgType::ClientRequest => { let tx_id = msg.get_data("tx_id").unwrap().to_uuid().unwrap(); self.return_to.insert(msg.get_id(), tx_id); - self.queue.send(msg.reply(MsgType::SessionValidate)); + self.queue + .send(msg.reply(MsgType::SessionValidate)) + .unwrap(); } MsgType::Session => { let rx_id = self.return_to.remove(&msg.get_id()).unwrap(); @@ -314,7 +316,7 @@ mod clients { let sess_id = Uuid::new_v4(); let mut sess_res = sess.reply(MsgType::Session); sess_res.add_data("sess_id", sess_id.clone()); - queue.send(sess_res); + queue.send(sess_res).unwrap(); let reply = reply_rx.recv_timeout(TIMEOUT).unwrap(); assert_eq!(reply.get_session(), sess_id); } diff --git a/src/queue.rs b/src/queue.rs index 4115478..c040460 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -181,15 +181,16 @@ impl Queue { } } - pub fn send(&self, msg: Message) { + pub fn send(&self, msg: Message) -> Result<(), String> { let store = self.store.read().unwrap(); match store.get(&msg.get_class()) { Some(senders) => { for sender in senders.into_iter() { sender.send(msg.clone()).unwrap(); } + Ok(()) } - None => {} + None => Err(format!("no listeners for {:?}", msg.get_class())), } } } @@ -211,7 +212,7 @@ mod queues { let (tx2, rx2) = channel(); queue.add(tx1, [MsgType::SessionValidate].to_vec()); queue.add(tx2, [MsgType::SessionValidate].to_vec()); - queue.send(Message::new(MsgType::SessionValidate)); + queue.send(Message::new(MsgType::SessionValidate)).unwrap(); rx1.recv().unwrap(); rx2.recv().unwrap(); } @@ -223,7 +224,7 @@ mod queues { let (tx2, rx2) = channel(); queue.add(tx1, [MsgType::SessionValidate].to_vec()); queue.add(tx2, [MsgType::Session].to_vec()); - queue.send(Message::new(MsgType::SessionValidate)); + queue.send(Message::new(MsgType::SessionValidate)).unwrap(); let result = rx1.recv().unwrap(); match result.get_class() { MsgType::SessionValidate => {} @@ -239,7 +240,7 @@ mod queues { _ => unreachable!("{:?}", err), }, } - queue.send(Message::new(MsgType::Session)); + queue.send(Message::new(MsgType::Session)).unwrap(); let result = rx2.recv().unwrap(); match result.get_class() { MsgType::Session => {} @@ -262,17 +263,20 @@ mod queues { let queue = Queue::new(); let (tx, rx) = channel(); queue.add(tx, [MsgType::Session, MsgType::SessionValidate].to_vec()); - queue.send(Message::new(MsgType::SessionValidate)); + queue.send(Message::new(MsgType::SessionValidate)).unwrap(); let msg = rx.recv().unwrap(); assert_eq!(msg.get_class(), &MsgType::SessionValidate); - queue.send(Message::new(MsgType::Session)); + queue.send(Message::new(MsgType::Session)).unwrap(); let msg = rx.recv().unwrap(); assert_eq!(msg.get_class(), &MsgType::Session); } #[test] - fn unassigned_message_should_not_panic() { + fn unassigned_message_should_return_error() { let queue = Queue::new(); - queue.send(Message::new(MsgType::Session)); + match queue.send(Message::new(MsgType::Session)) { + Ok(_) => unreachable!("should return error"), + Err(_) => {} + } } } diff --git a/src/session.rs b/src/session.rs index b429a5f..cc267d4 100644 --- a/src/session.rs +++ b/src/session.rs @@ -46,19 +46,17 @@ impl Session { fn validate(&mut self, msg: Message) { match msg.get_data("sess_id") { - Some(sid) => { - match sid { - Field::Uuid(sess_id) => { - if self.data.contains_key(&sess_id) { - let mut reply = msg.reply(MsgType::Session); - reply.add_data("sess_id", sess_id.clone()); - self.queue.send(reply); - } else { - self.new_session(msg); - } - }, - _ => self.new_session(msg), + Some(sid) => match sid { + Field::Uuid(sess_id) => { + if self.data.contains_key(&sess_id) { + let mut reply = msg.reply(MsgType::Session); + reply.add_data("sess_id", sess_id.clone()); + self.queue.send(reply).unwrap(); + } else { + self.new_session(msg); + } } + _ => self.new_session(msg), }, None => self.new_session(msg), } @@ -72,7 +70,7 @@ impl Session { self.data.insert(id.clone(), SessionData {}); let mut reply = msg.reply(MsgType::Session); reply.add_data("sess_id", id); - self.queue.send(reply); + self.queue.send(reply).unwrap(); } } @@ -97,7 +95,7 @@ mod sessions { let listen_for = [MsgType::Session]; let (queue, rx) = setup_session(listen_for.to_vec()); let msg = Message::new(MsgType::SessionValidate); - queue.send(msg.clone()); + queue.send(msg.clone()).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); match result.get_class() { MsgType::Session => {} @@ -116,7 +114,7 @@ mod sessions { let msg = Message::new(MsgType::SessionValidate); let mut ids: Vec = Vec::new(); for _ in 0..10 { - queue.send(msg.clone()); + queue.send(msg.clone()).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); let id = result.get_data("sess_id").unwrap().to_uuid().unwrap(); assert!(!ids.contains(&id), "{} is a duplicate id", id); @@ -129,11 +127,11 @@ mod sessions { let listen_for = [MsgType::Session]; let (queue, rx) = setup_session(listen_for.to_vec()); let mut msg = Message::new(MsgType::SessionValidate); - queue.send(msg.clone()); + queue.send(msg.clone()).unwrap(); let holder = rx.recv_timeout(TIMEOUT).unwrap(); let id = holder.get_data("sess_id").unwrap().to_uuid().unwrap(); msg.add_data("sess_id", id.clone()); - queue.send(msg); + queue.send(msg).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); let output = result.get_data("sess_id").unwrap().to_uuid().unwrap(); assert_eq!(output, id); @@ -146,7 +144,7 @@ mod sessions { let (queue, rx) = setup_session(listen_for.to_vec()); let mut msg = Message::new(MsgType::SessionValidate); msg.add_data("sess_id", id.clone()); - queue.send(msg); + queue.send(msg).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); let output = result.get_data("sess_id").unwrap().to_uuid().unwrap(); assert_ne!(output, id); @@ -159,7 +157,7 @@ mod sessions { let (queue, rx) = setup_session(listen_for.to_vec()); let mut msg = Message::new(MsgType::SessionValidate); msg.add_data("sess_id", id); - queue.send(msg); + queue.send(msg).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); let output = result.get_data("sess_id").unwrap().to_string(); assert_ne!(output, id);