From f9f64dae5513cfcd7ff0ea7f61ac0bb11481f516 Mon Sep 17 00:00:00 2001 From: Jeff Baskin Date: Mon, 7 Apr 2025 00:41:28 -0400 Subject: [PATCH] linked client to session. --- src/client.rs | 72 +++++++++++++++++++++++++++++----------- src/lib.rs | 2 ++ src/main.rs | 4 +-- src/queue.rs | 30 ++++++++--------- src/session.rs | 10 +++--- test/test_single_boot.py | 1 - 6 files changed, 75 insertions(+), 44 deletions(-) diff --git a/src/client.rs b/src/client.rs index 9af2602..b8aec3e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -12,6 +12,8 @@ use std::{ }; use uuid::Uuid; +const RESPONS_TO: [MsgType; 1] = [MsgType::Session]; + pub struct Request; impl Request { @@ -29,11 +31,17 @@ mod requests { } } -pub struct Reply; +pub struct Reply { + sess_id: Uuid, +} impl Reply { - pub fn get_session(&self) -> String { - "id".to_string() + fn new(sess_id: Uuid) -> Self { + Self { sess_id: sess_id } + } + + pub fn get_session(&self) -> Uuid { + self.sess_id.clone() } pub fn get_content(&self) -> String { @@ -46,7 +54,16 @@ mod replies { use super::*; pub fn create_reply() -> Reply { - Reply {} + Reply { + sess_id: Uuid::new_v4(), + } + } + + #[test] + fn create_new_reply() { + let sess_id = Uuid::new_v4(); + let reply = Reply::new(sess_id); + assert_eq!(reply.get_session(), sess_id); } } @@ -83,15 +100,15 @@ impl ClientRegistry { fn send(&mut self, id: &Uuid, msg: Reply) { let mut reg = self.registry.lock().unwrap(); - let tx = reg.get(id).unwrap(); + let tx = reg.remove(id).unwrap(); tx.send(msg).unwrap(); - reg.remove(id).unwrap(); } } #[cfg(test)] mod clientregistries { use super::*; + use crate::client::replies::create_reply; use std::{ sync::mpsc::{channel, Receiver}, time::Duration, @@ -118,7 +135,7 @@ mod clientregistries { } assert_eq!(rxs.len(), count, "should have been {} receivers", count); for (id, rx) in rxs.iter() { - let msg = Reply {}; + let msg = create_reply(); reg.send(id, msg); rx.recv_timeout(TIMEOUT).unwrap(); } @@ -129,7 +146,7 @@ mod clientregistries { #[test] fn prevent_duplicates() { let mut reg = ClientRegistry::new(); - let (tx, rx) = channel::(); + let (tx, _rx) = channel::(); let existing = reg.add(tx); let expected = Uuid::new_v4(); let ids = [existing.clone(), expected.clone()]; @@ -166,6 +183,7 @@ impl ClientLink { #[cfg(test)] mod clientlinks { use super::*; + use crate::client::replies::create_reply; use std::time::Duration; static TIMEOUT: Duration = Duration::from_millis(500); @@ -182,10 +200,10 @@ mod clientlinks { MsgType::ClientRequest => {} _ => unreachable!("should have been a client request"), } - match msg.get_data().get("tx_id") { + match msg.get_data("tx_id") { Some(result) => { let id = result.to_uuid().unwrap(); - registry.send(&id, Reply {}); + registry.send(&id, create_reply()); rx_client.recv().unwrap(); } None => unreachable!("should have had a seender id"), @@ -196,6 +214,7 @@ mod clientlinks { pub struct Client { queue: Queue, registry: ClientRegistry, + return_to: HashMap, rx: Receiver, } @@ -204,12 +223,14 @@ impl Client { Self { queue: queue, registry: ClientRegistry::new(), + return_to: HashMap::new(), rx: rx, } } pub fn start(queue: Queue) -> ClientLink { let (tx, rx) = channel(); + queue.add(tx.clone(), RESPONS_TO.to_vec()); let mut client = Client::new(rx, queue); let link = ClientLink::new(tx, client.get_registry()); spawn(move || { @@ -221,10 +242,19 @@ impl Client { fn listen(&mut self) { loop { let msg = self.rx.recv().unwrap(); - //self.queue.send(Message::new(MsgType::SessionValidate)); - let id = msg.get_data().get("tx_id").unwrap().to_uuid().unwrap(); - let reply = Reply {}; - self.registry.send(&id, reply); + match msg.get_class() { + MsgType::ClientRequest => { + let tx_id = msg.get_data("tx_id").unwrap().to_uuid().unwrap(); + self.return_to.insert(msg.get_id(), tx_id); + self.queue.send(msg.reply(MsgType::SessionValidate)); + } + MsgType::Session => { + let rx_id = self.return_to.remove(&msg.get_id()).unwrap(); + let sess_id = msg.get_data("sess_id").unwrap().to_uuid().unwrap(); + self.registry.send(&rx_id, Reply::new(sess_id)); + } + _ => unreachable!("Received message it did not understand"), + } } } @@ -244,17 +274,21 @@ mod clients { #[test] fn start_client() { let (tx, rx) = channel(); - let mut queue = Queue::new(); + let queue = Queue::new(); queue.add(tx, [MsgType::SessionValidate].to_vec()); let mut link = Client::start(queue.clone()); let req = create_request(); - link.send(req); - /* + let reply_rx = link.send(req); let sess = rx.recv_timeout(TIMEOUT).unwrap(); match sess.get_class() { - MsgType::SessionValidate => {}, + MsgType::SessionValidate => {} _ => unreachable!("should request session validation"), } - */ + let sess_id = Uuid::new_v4(); + let mut sess_res = sess.reply(MsgType::Session); + sess_res.add_data("sess_id", sess_id.clone()); + queue.send(sess_res); + let reply = reply_rx.recv_timeout(TIMEOUT).unwrap(); + assert_eq!(reply.get_session(), sess_id); } } diff --git a/src/lib.rs b/src/lib.rs index 2ceefb2..de9c9dd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,6 +7,7 @@ mod utils; use client::{Client, ClientLink, Reply, Request}; use field::Field; use queue::Queue; +use session::Session; #[derive(Clone)] pub struct MoreThanText { @@ -16,6 +17,7 @@ pub struct MoreThanText { impl MoreThanText { pub fn new() -> Self { let queue = Queue::new(); + Session::start(queue.clone()); Self { client_link: Client::start(queue.clone()), } diff --git a/src/main.rs b/src/main.rs index d22ec34..7ec4a45 100644 --- a/src/main.rs +++ b/src/main.rs @@ -43,7 +43,7 @@ async fn mtt_conn(jar: CookieJar, state: State) -> impl IntoRespon tx.send(state.clone().request(sid)).await.unwrap(); }); let reply = rx.recv().await.unwrap(); - let cookie = Cookie::build((SESSION_KEY, reply.get_session())); + let cookie = Cookie::build((SESSION_KEY, reply.get_session().to_string())); let cookies = jar.add(cookie); /* @@ -81,7 +81,6 @@ mod servers { assert!(sessid.contains(SESSION_KEY), "did not set session id"); } - /* #[tokio::test] async fn session_ids_are_unique() { let app = mtt_conn.with_state(MoreThanText::new()); @@ -101,5 +100,4 @@ mod servers { holder.push(sessid); } } - */ } diff --git a/src/queue.rs b/src/queue.rs index 567a271..b8482c3 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -8,7 +8,6 @@ use uuid::Uuid; #[derive(Clone, Debug, Eq, Hash, PartialEq)] pub enum MsgType { ClientRequest, - NoOp, SessionValidate, Session, } @@ -49,8 +48,8 @@ impl Message { self.data.insert(name.into(), data.into()); } - pub fn get_data(&self) -> &HashMap { - &self.data + pub fn get_data(&self, name: &str) -> Option<&Field> { + self.data.get(name) } pub fn get_id(&self) -> Uuid { @@ -71,9 +70,9 @@ mod messages { #[test] fn new_message() { - let msg = Message::new(MsgType::NoOp); + let msg = Message::new(MsgType::SessionValidate); match msg.class { - MsgType::NoOp => (), + MsgType::SessionValidate => (), _ => unreachable!("new defaults to noop"), } assert!(msg.data.is_empty()); @@ -83,7 +82,7 @@ mod messages { fn message_ids_are_random() { let mut ids: Vec = Vec::new(); for _ in 0..10 { - let msg = Message::new(MsgType::NoOp); + let msg = Message::new(MsgType::SessionValidate); let id = msg.id.clone(); assert!(!ids.contains(&id), "{} is a duplicate", id); ids.push(id); @@ -93,7 +92,7 @@ mod messages { #[test] fn create_reply() { let id = Uuid::new_v4(); - let mut msg = Message::new(MsgType::NoOp); + let mut msg = Message::new(MsgType::SessionValidate); msg.id = id.clone(); msg.add_data("test", "test"); let data = MsgType::ClientRequest; @@ -108,23 +107,22 @@ mod messages { #[test] fn get_message_type() { - let msg = Message::new(MsgType::NoOp); + let msg = Message::new(MsgType::SessionValidate); match msg.get_class() { - MsgType::NoOp => {} + MsgType::SessionValidate => {} _ => unreachable!("should have bneen noopn"), } } #[test] fn add_data() { - let mut msg = Message::new(MsgType::NoOp); + let mut msg = Message::new(MsgType::SessionValidate); let one = "one"; let two = "two".to_string(); msg.add_data(one, one); msg.add_data(two.clone(), two.clone()); - let result = msg.get_data(); - assert_eq!(result.get(one).unwrap().to_string(), one); - assert_eq!(result.get(&two).unwrap().to_string(), two); + assert_eq!(msg.get_data(one).unwrap().to_string(), one); + assert_eq!(msg.get_data(&two).unwrap().to_string(), two); } #[test] @@ -185,9 +183,9 @@ mod queues { let queue = Queue::new(); let (tx1, rx1) = channel(); let (tx2, rx2) = channel(); - queue.add(tx1, [MsgType::NoOp].to_vec()); - queue.add(tx2, [MsgType::NoOp].to_vec()); - queue.send(Message::new(MsgType::NoOp)); + queue.add(tx1, [MsgType::SessionValidate].to_vec()); + queue.add(tx2, [MsgType::SessionValidate].to_vec()); + queue.send(Message::new(MsgType::SessionValidate)); rx1.recv().unwrap(); rx2.recv().unwrap(); } diff --git a/src/session.rs b/src/session.rs index 60b1a8c..3b1781c 100644 --- a/src/session.rs +++ b/src/session.rs @@ -5,9 +5,9 @@ use std::{ }; use uuid::Uuid; -const RESPONS_TO: [MsgType; 1] = [MsgType::SessionValidate]; +const RESPONDS_TO: [MsgType; 1] = [MsgType::SessionValidate]; -struct Session { +pub struct Session { queue: Queue, rx: Receiver, } @@ -20,10 +20,10 @@ impl Session { } } - fn start(queue: Queue) { + pub fn start(queue: Queue) { let (tx, rx) = channel(); let session = Session::new(queue, rx); - session.queue.add(tx, RESPONS_TO.to_vec()); + session.queue.add(tx, RESPONDS_TO.to_vec()); spawn(move || { session.listen(); }); @@ -81,7 +81,7 @@ mod sessions { for _ in 0..10 { queue.send(msg.clone()); let result = rx.recv().unwrap(); - let id = result.get_data().get("sess_id").unwrap().to_uuid().unwrap(); + let id = result.get_data("sess_id").unwrap().to_uuid().unwrap(); assert!(!ids.contains(&id), "{} is a duplicate id", id); ids.push(id); } diff --git a/test/test_single_boot.py b/test/test_single_boot.py index 6417bc3..3ebbdaf 100644 --- a/test/test_single_boot.py +++ b/test/test_single_boot.py @@ -51,7 +51,6 @@ class BootUpTC(MTTClusterTC): await self.run_tests("/", tests) - @skip("Code not availaable yet.") async def test_session_id_is_random(self): """Is the session id random?""" await self.create_server()