Added message queue.

This commit is contained in:
Jeff Baskin 2025-02-22 10:53:05 -05:00
parent 4bca78a2f8
commit e77ed83d84
2 changed files with 148 additions and 5 deletions

View File

@ -2,6 +2,7 @@ mod client;
mod data; mod data;
mod error; mod error;
mod message; mod message;
mod queue;
mod router; mod router;
mod session; mod session;
@ -12,7 +13,7 @@ use session::{Session, SessionData, SessionMsg};
use std::{ use std::{
collections::HashMap, collections::HashMap,
ops::Deref, ops::Deref,
sync::mpsc::{channel, Sender}, sync::mpsc::{channel, Receiver, Sender},
}; };
use uuid::Uuid; use uuid::Uuid;
@ -105,12 +106,14 @@ mod fields {
} }
} }
#[derive(Clone)]
struct Request { struct Request {
id: Option<Field>, id: Option<Field>,
tx: Sender<Response>,
} }
impl Request { impl Request {
fn new<F>(mut id: Option<F>) -> Self fn new<F>(id: Option<F>) -> (Self, Receiver<Response>)
where where
F: Into<Field>, F: Into<Field>,
{ {
@ -121,7 +124,22 @@ impl Request {
} }
None => result = None, None => result = None,
} }
Self { id: result } let (tx, rx) = channel();
(Self { id: result, tx: tx }, rx)
}
fn get_session(&self) -> &Option<Field> {
return &self.id;
}
}
#[cfg(test)]
mod create_request {
use super::*;
pub fn empty_request() -> (Request, Receiver<Response>) {
let id: Option<String> = None;
Request::new(id)
} }
} }
@ -132,14 +150,14 @@ mod requests {
#[test] #[test]
fn create_request_no_id() { fn create_request_no_id() {
let input: Option<String> = None; let input: Option<String> = None;
let req = Request::new(input); let (req, _) = Request::new(input);
assert!(req.id.is_none()); assert!(req.id.is_none());
} }
#[test] #[test]
fn create_request_with_uuid() { fn create_request_with_uuid() {
let id = Uuid::new_v4(); let id = Uuid::new_v4();
let req = Request::new(Some(id)); let (req, _) = Request::new(Some(id));
match req.id { match req.id {
Some(field) => match (field) { Some(field) => match (field) {
Field::Uuid(data) => assert_eq!(data, id), Field::Uuid(data) => assert_eq!(data, id),
@ -148,6 +166,21 @@ mod requests {
None => unreachable!("Should producer data"), None => unreachable!("Should producer data"),
} }
} }
#[test]
fn return_session() {
let id = Uuid::new_v4();
let (req, _) = Request::new(Some(id));
match req.get_session() {
Some(result) => {
match result {
Field::Uuid(data) => assert_eq!(data, &id),
_ => unreachable!("should have returned a uuid field"),
};
}
None => unreachable!("should have returned a uuid"),
}
}
} }
struct Record { struct Record {

110
src/queue.rs Normal file
View File

@ -0,0 +1,110 @@
use crate::Request;
use std::{
sync::mpsc::{channel, Receiver, Sender},
thread::spawn,
};
#[derive(Clone)]
enum Message {
Register(Sender<Message>),
Req(Request),
}
impl From<Request> for Message {
fn from(value: Request) -> Self {
Message::Req(value)
}
}
impl From<Sender<Message>> for Message {
fn from(value: Sender<Message>) -> Self {
Message::Register(value)
}
}
#[cfg(test)]
mod messages {
use super::*;
use crate::create_request::empty_request;
#[test]
fn from_request() {
let (req, _) = empty_request();
match req.into() {
Message::Req(result) => assert!(result.get_session().is_none()),
_ => unreachable!("should have been s request"),
}
}
#[test]
fn from_sender() {
let (tx, _) = channel();
match tx.into() {
Message::Register(_) => {}
_ => unreachable!("should have been a register"),
}
}
}
struct Queue {
channels: Vec<Sender<Message>>,
rx: Receiver<Message>,
}
impl Queue {
fn new(rx: Receiver<Message>) -> Self {
Self {
channels: Vec::new(),
rx: rx,
}
}
fn listen(&mut self) {
loop {
let msg = self.rx.recv().unwrap();
match msg {
Message::Register(tx) => self.channels.push(tx),
_ => {
for tx in self.channels.iter() {
tx.send(msg.clone()).unwrap();
}
}
}
}
}
fn start() -> Sender<Message> {
let (tx, rx) = channel();
spawn(move || {
let mut queue = Queue::new(rx);
queue.listen();
});
tx
}
}
#[cfg(test)]
mod queues {
use super::*;
use crate::create_request::empty_request;
#[test]
fn create_queue() {
let mut channels = Vec::new();
for _ in 0..5 {
channels.push(channel());
}
let mut queue_tx = Queue::start();
for (tx, _) in channels.iter() {
queue_tx.send(tx.clone().into()).unwrap();
}
let (req, _) = empty_request();
queue_tx.send(req.into()).unwrap();
for (_, rx) in channels.iter() {
match rx.recv().unwrap() {
Message::Req(_) => {}
_ => unreachable!("should have been a request"),
}
}
}
}