From c3f24d58c3553a03328e5c135bd308bf091aad54 Mon Sep 17 00:00:00 2001 From: Jeff Baskin Date: Sun, 30 Mar 2025 11:38:41 -0400 Subject: [PATCH] Fixed lost sender issue. --- src/field.rs | 9 ++---- src/lib.rs | 12 ++++++-- src/queue.rs | 86 +++++++++++++++++++++++----------------------------- 3 files changed, 51 insertions(+), 56 deletions(-) diff --git a/src/field.rs b/src/field.rs index 8d52360..7e614bf 100644 --- a/src/field.rs +++ b/src/field.rs @@ -1,8 +1,5 @@ use crate::queue::Message; -use std::{ - fmt, - sync::mpsc::Sender, -}; +use std::{fmt, sync::mpsc::Sender}; use uuid::Uuid; #[derive(Clone, Debug)] @@ -65,8 +62,8 @@ impl fmt::Display for Field { #[cfg(test)] mod fields { - use std::sync::mpsc::channel; use super::*; + use std::sync::mpsc::channel; #[test] fn string_to_field() { @@ -128,7 +125,7 @@ mod fields { let msg = Message::new(); sender.send(msg).unwrap(); rx.recv().unwrap(); - }, + } _ => unreachable!("should have been a sender"), } } diff --git a/src/lib.rs b/src/lib.rs index 8fc8d87..041f628 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,17 +4,25 @@ mod queue; use client::{Client, Reply, Request}; use field::Field; -use std::sync::mpsc::{channel, Sender}; +use queue::Message; +use std::sync::{ + mpsc::{channel, Sender}, + Arc, RwLock, +}; #[derive(Clone)] pub struct MoreThanText { + registry: Arc>>>, tx: Sender, } impl MoreThanText { pub fn new() -> Self { let tx = Client::start(); - Self { tx: tx } + Self { + registry: Arc::new(RwLock::new(Vec::new())), + tx: tx, + } } pub fn request(&self, _session: Option) -> Reply diff --git a/src/queue.rs b/src/queue.rs index b5ec052..b9879ae 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -1,16 +1,18 @@ use crate::field::Field; use std::{ collections::HashMap, - sync::mpsc::{channel, Receiver, Sender}, + sync::{ + mpsc::{channel, Receiver, Sender}, + Arc, RwLock, + }, thread::spawn, }; - use uuid::Uuid; enum MsgType { - ClientNewMessage, + ClientMessage, + NewClientMessage, NoOp, - Register, } pub struct Message { @@ -40,7 +42,11 @@ impl Message { &self.class } - fn add_data(&mut self, name: S, data: F) where S: Into, F: Into { + fn add_data(&mut self, name: S, data: F) + where + S: Into, + F: Into, + { self.data.insert(name.into(), data.into()); } @@ -69,11 +75,11 @@ mod messages { let id = Uuid::new_v4(); let mut msg = Message::new(); msg.id = id.clone(); - let data = MsgType::Register; + let data = MsgType::NewClientMessage; let result = msg.reply(data); assert_eq!(result.id, id); match result.class { - MsgType::Register => {}, + MsgType::NewClientMessage => {} _ => unreachable!("should have been a registration request"), } } @@ -82,7 +88,7 @@ mod messages { fn get_message_type() { let msg = Message::new(); match msg.get_class() { - MsgType::NoOp => {}, + MsgType::NoOp => {} _ => unreachable!("should have bneen noopn"), } } @@ -101,22 +107,22 @@ mod messages { } struct Queue { - senders: Vec>, + registry: Arc>>>, rx: Receiver, } impl Queue { - fn new(rx: Receiver) -> Self { + fn new(rx: Receiver, registry: Arc>>>) -> Self { Self { - senders: Vec::new(), + registry: registry, rx: rx, } } - fn start() -> Sender { + fn start(registry: Arc>>>) -> Sender { let (tx, rx) = channel(); spawn(move || { - let mut queue = Queue::new(rx); + let mut queue = Queue::new(rx, registry); queue.listen(); }); tx @@ -125,56 +131,40 @@ impl Queue { fn listen(&mut self) { loop { let msg = self.rx.recv().unwrap(); - match msg.get_class() { - MsgType::Register => self.register(msg.get_data()).unwrap(), - _ => { - for sender in self.senders.iter() { - sender.send(Message::new()).unwrap(); - } - }, + let senders = self.registry.read().unwrap(); + for sender in senders.iter() { + sender.send(Message::new()).unwrap(); } } } - - fn register(&mut self, data: &HashMap) -> Result<(), String> { - match data.get("tx") { - Some(data) => { - self.senders.push(data.to_sender().unwrap()); - Ok(()) - }, - None => Err("missing tx sender".to_string()), - } - } } #[cfg(test)] mod queues { - use std::time::Duration; use super::*; + use std::time::Duration; - fn start_queue() -> (Sender, Receiver) { + static TIMEOUT: Duration = Duration::from_millis(500); + + fn start_queue() -> ( + Sender, + Receiver, + ) { + let reg: Arc>>> = Arc::new(RwLock::new(Vec::new())); let (tx, rx) = channel::(); - let initial = Message::new(); - let mut msg = initial.reply(MsgType::Register); - msg.add_data("tx", tx); - let queue_tx = Queue::start(); + let mut data = reg.write().unwrap(); + data.push(tx.clone()); + drop(data); + let queue_tx = Queue::start(Arc::clone(®)); (queue_tx, rx) } - #[test] - fn run_queue() { - let queue_tx = Queue::start(); - let (tx, rx) = channel(); - let msg = Message::new(); - let mut reply = msg.reply(MsgType::Register); - reply.add_data("tx", tx); - queue_tx.send(reply); - queue_tx.send(msg.reply(MsgType::NoOp)); - rx.recv_timeout(Duration::from_millis(400)).unwrap(); - } - #[test] fn get_new_client_message() { let (tx, rx) = start_queue(); + let initial = Message::new(); + let msg = initial.reply(MsgType::NewClientMessage); + tx.send(msg).unwrap(); + rx.recv_timeout(TIMEOUT).unwrap(); } }