From e77ed83d84d2e04a7557a54937bd80789e0327f9 Mon Sep 17 00:00:00 2001
From: Jeff Baskin <jeff.l.baskin@gmail.com>
Date: Sat, 22 Feb 2025 10:53:05 -0500
Subject: [PATCH] Added message queue.

---
 src/lib.rs   |  43 +++++++++++++++++---
 src/queue.rs | 110 +++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 148 insertions(+), 5 deletions(-)
 create mode 100644 src/queue.rs

diff --git a/src/lib.rs b/src/lib.rs
index bd06952..3c116b8 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -2,6 +2,7 @@ mod client;
 mod data;
 mod error;
 mod message;
+mod queue;
 mod router;
 mod session;
 
@@ -12,7 +13,7 @@ use session::{Session, SessionData, SessionMsg};
 use std::{
     collections::HashMap,
     ops::Deref,
-    sync::mpsc::{channel, Sender},
+    sync::mpsc::{channel, Receiver, Sender},
 };
 use uuid::Uuid;
 
@@ -105,12 +106,14 @@ mod fields {
     }
 }
 
+#[derive(Clone)]
 struct Request {
     id: Option<Field>,
+    tx: Sender<Response>,
 }
 
 impl Request {
-    fn new<F>(mut id: Option<F>) -> Self
+    fn new<F>(id: Option<F>) -> (Self, Receiver<Response>)
     where
         F: Into<Field>,
     {
@@ -121,7 +124,22 @@ impl Request {
             }
             None => result = None,
         }
-        Self { id: result }
+        let (tx, rx) = channel();
+        (Self { id: result, tx: tx }, rx)
+    }
+
+    fn get_session(&self) -> &Option<Field> {
+        return &self.id;
+    }
+}
+
+#[cfg(test)]
+mod create_request {
+    use super::*;
+
+    pub fn empty_request() -> (Request, Receiver<Response>) {
+        let id: Option<String> = None;
+        Request::new(id)
     }
 }
 
@@ -132,14 +150,14 @@ mod requests {
     #[test]
     fn create_request_no_id() {
         let input: Option<String> = None;
-        let req = Request::new(input);
+        let (req, _) = Request::new(input);
         assert!(req.id.is_none());
     }
 
     #[test]
     fn create_request_with_uuid() {
         let id = Uuid::new_v4();
-        let req = Request::new(Some(id));
+        let (req, _) = Request::new(Some(id));
         match req.id {
             Some(field) => match (field) {
                 Field::Uuid(data) => assert_eq!(data, id),
@@ -148,6 +166,21 @@ mod requests {
             None => unreachable!("Should producer data"),
         }
     }
+
+    #[test]
+    fn return_session() {
+        let id = Uuid::new_v4();
+        let (req, _) = Request::new(Some(id));
+        match req.get_session() {
+            Some(result) => {
+                match result {
+                    Field::Uuid(data) => assert_eq!(data, &id),
+                    _ => unreachable!("should have returned a uuid field"),
+                };
+            }
+            None => unreachable!("should have returned a uuid"),
+        }
+    }
 }
 
 struct Record {
diff --git a/src/queue.rs b/src/queue.rs
new file mode 100644
index 0000000..3416105
--- /dev/null
+++ b/src/queue.rs
@@ -0,0 +1,110 @@
+use crate::Request;
+use std::{
+    sync::mpsc::{channel, Receiver, Sender},
+    thread::spawn,
+};
+
+#[derive(Clone)]
+enum Message {
+    Register(Sender<Message>),
+    Req(Request),
+}
+
+impl From<Request> for Message {
+    fn from(value: Request) -> Self {
+        Message::Req(value)
+    }
+}
+
+impl From<Sender<Message>> for Message {
+    fn from(value: Sender<Message>) -> Self {
+        Message::Register(value)
+    }
+}
+
+#[cfg(test)]
+mod messages {
+    use super::*;
+    use crate::create_request::empty_request;
+
+    #[test]
+    fn from_request() {
+        let (req, _) = empty_request();
+        match req.into() {
+            Message::Req(result) => assert!(result.get_session().is_none()),
+            _ => unreachable!("should have been s request"),
+        }
+    }
+
+    #[test]
+    fn from_sender() {
+        let (tx, _) = channel();
+        match tx.into() {
+            Message::Register(_) => {}
+            _ => unreachable!("should have been a register"),
+        }
+    }
+}
+
+struct Queue {
+    channels: Vec<Sender<Message>>,
+    rx: Receiver<Message>,
+}
+
+impl Queue {
+    fn new(rx: Receiver<Message>) -> Self {
+        Self {
+            channels: Vec::new(),
+            rx: rx,
+        }
+    }
+
+    fn listen(&mut self) {
+        loop {
+            let msg = self.rx.recv().unwrap();
+            match msg {
+                Message::Register(tx) => self.channels.push(tx),
+                _ => {
+                    for tx in self.channels.iter() {
+                        tx.send(msg.clone()).unwrap();
+                    }
+                }
+            }
+        }
+    }
+
+    fn start() -> Sender<Message> {
+        let (tx, rx) = channel();
+        spawn(move || {
+            let mut queue = Queue::new(rx);
+            queue.listen();
+        });
+        tx
+    }
+}
+
+#[cfg(test)]
+mod queues {
+    use super::*;
+    use crate::create_request::empty_request;
+
+    #[test]
+    fn create_queue() {
+        let mut channels = Vec::new();
+        for _ in 0..5 {
+            channels.push(channel());
+        }
+        let mut queue_tx = Queue::start();
+        for (tx, _) in channels.iter() {
+            queue_tx.send(tx.clone().into()).unwrap();
+        }
+        let (req, _) = empty_request();
+        queue_tx.send(req.into()).unwrap();
+        for (_, rx) in channels.iter() {
+            match rx.recv().unwrap() {
+                Message::Req(_) => {}
+                _ => unreachable!("should have been a request"),
+            }
+        }
+    }
+}