Added queue with a little clean up.
This commit is contained in:
		@@ -1,5 +1,5 @@
 | 
			
		||||
use crate::{
 | 
			
		||||
    queue::{Message, MsgType},
 | 
			
		||||
    queue::{Message, MsgType, Queue},
 | 
			
		||||
    utils::GenID,
 | 
			
		||||
};
 | 
			
		||||
use std::{
 | 
			
		||||
@@ -62,10 +62,6 @@ impl ClientRegistry {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn create_storage() -> Arc<Mutex<HashMap<Uuid, Sender<Reply>>>> {
 | 
			
		||||
        Arc::new(Mutex::new(HashMap::new()))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn get_id<'a>(
 | 
			
		||||
        gen: &mut impl Iterator<Item = Uuid>,
 | 
			
		||||
        data: &HashMap<Uuid, Sender<Reply>>,
 | 
			
		||||
@@ -78,7 +74,6 @@ impl ClientRegistry {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn add(&mut self, tx: Sender<Reply>) -> Uuid {
 | 
			
		||||
        let id = Uuid::new_v4();
 | 
			
		||||
        let mut reg = self.registry.lock().unwrap();
 | 
			
		||||
        let mut gen_id = GenID::new();
 | 
			
		||||
        let id = ClientRegistry::get_id(&mut gen_id, ®);
 | 
			
		||||
@@ -158,7 +153,7 @@ impl ClientLink {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn send(&mut self, req: Request) -> Receiver<Reply> {
 | 
			
		||||
    pub fn send(&mut self, _req: Request) -> Receiver<Reply> {
 | 
			
		||||
        let (tx, rx) = channel();
 | 
			
		||||
        let mut msg = Message::new(MsgType::ClientRequest);
 | 
			
		||||
        let id = self.registry.add(tx);
 | 
			
		||||
@@ -211,7 +206,7 @@ impl Client {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn start() -> ClientLink {
 | 
			
		||||
    pub fn start(_queue: Queue) -> ClientLink {
 | 
			
		||||
        let (tx, rx) = channel();
 | 
			
		||||
        let mut client = Client::new(rx);
 | 
			
		||||
        let link = ClientLink::new(tx, client.get_registry());
 | 
			
		||||
@@ -242,7 +237,8 @@ mod clients {
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn start_client() {
 | 
			
		||||
        let mut link = Client::start();
 | 
			
		||||
        let queue = Queue::new();
 | 
			
		||||
        let mut link = Client::start(queue.clone());
 | 
			
		||||
        let req = create_request();
 | 
			
		||||
        link.send(req);
 | 
			
		||||
    }
 | 
			
		||||
 
 | 
			
		||||
@@ -1,5 +1,4 @@
 | 
			
		||||
use crate::queue::Message;
 | 
			
		||||
use std::{fmt, sync::mpsc::Sender};
 | 
			
		||||
use std::fmt;
 | 
			
		||||
use uuid::Uuid;
 | 
			
		||||
 | 
			
		||||
#[derive(Clone, Debug)]
 | 
			
		||||
@@ -55,7 +54,6 @@ impl fmt::Display for Field {
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
mod fields {
 | 
			
		||||
    use super::*;
 | 
			
		||||
    use std::sync::mpsc::channel;
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn string_to_field() {
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										11
									
								
								src/lib.rs
									
									
									
									
									
								
							
							
						
						
									
										11
									
								
								src/lib.rs
									
									
									
									
									
								
							@@ -5,23 +5,18 @@ mod utils;
 | 
			
		||||
 | 
			
		||||
use client::{Client, ClientLink, Reply, Request};
 | 
			
		||||
use field::Field;
 | 
			
		||||
use queue::Message;
 | 
			
		||||
use std::sync::{
 | 
			
		||||
    mpsc::{channel, Sender},
 | 
			
		||||
    Arc, RwLock,
 | 
			
		||||
};
 | 
			
		||||
use queue::Queue;
 | 
			
		||||
 | 
			
		||||
#[derive(Clone)]
 | 
			
		||||
pub struct MoreThanText {
 | 
			
		||||
    client_link: ClientLink,
 | 
			
		||||
    registry: Arc<RwLock<Vec<Sender<Message>>>>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl MoreThanText {
 | 
			
		||||
    pub fn new() -> Self {
 | 
			
		||||
        let queue = Queue::new();
 | 
			
		||||
        Self {
 | 
			
		||||
            client_link: Client::start(),
 | 
			
		||||
            registry: Arc::new(RwLock::new(Vec::new())),
 | 
			
		||||
            client_link: Client::start(queue.clone()),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										139
									
								
								src/queue.rs
									
									
									
									
									
								
							
							
						
						
									
										139
									
								
								src/queue.rs
									
									
									
									
									
								
							@@ -2,18 +2,15 @@ use crate::{client::Request, field::Field};
 | 
			
		||||
use std::{
 | 
			
		||||
    collections::HashMap,
 | 
			
		||||
    sync::{
 | 
			
		||||
        mpsc::{channel, Receiver, Sender},
 | 
			
		||||
        Arc, Mutex, RwLock,
 | 
			
		||||
        mpsc::Sender,
 | 
			
		||||
        Arc, RwLock,
 | 
			
		||||
    },
 | 
			
		||||
    thread::spawn,
 | 
			
		||||
};
 | 
			
		||||
use uuid::Uuid;
 | 
			
		||||
 | 
			
		||||
#[derive(Clone)]
 | 
			
		||||
pub enum MsgType {
 | 
			
		||||
    ClientMessage,
 | 
			
		||||
    ClientRequest,
 | 
			
		||||
    NewClientMessage,
 | 
			
		||||
    NoOp,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -27,7 +24,7 @@ pub struct Message {
 | 
			
		||||
impl Message {
 | 
			
		||||
    pub fn new(msg_type: MsgType) -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            id: Uuid::nil(),
 | 
			
		||||
            id: Uuid::new_v4(),
 | 
			
		||||
            class: msg_type,
 | 
			
		||||
            data: HashMap::new(),
 | 
			
		||||
        }
 | 
			
		||||
@@ -37,7 +34,7 @@ impl Message {
 | 
			
		||||
        Self {
 | 
			
		||||
            id: self.id.clone(),
 | 
			
		||||
            class: data,
 | 
			
		||||
            data: self.data.clone(),
 | 
			
		||||
            data: HashMap::new(),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@@ -59,7 +56,7 @@ impl Message {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl From<Request> for Message {
 | 
			
		||||
    fn from(value: Request) -> Self {
 | 
			
		||||
    fn from(_value: Request) -> Self {
 | 
			
		||||
        let msg = Message::new(MsgType::ClientRequest);
 | 
			
		||||
        msg.reply(MsgType::ClientRequest)
 | 
			
		||||
    }
 | 
			
		||||
@@ -72,7 +69,6 @@ mod messages {
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn new_message() {
 | 
			
		||||
        let msg = Message::new(MsgType::NoOp);
 | 
			
		||||
        assert_eq!(msg.id, Uuid::nil());
 | 
			
		||||
        match msg.class {
 | 
			
		||||
            MsgType::NoOp => (),
 | 
			
		||||
            _ => unreachable!("new defaults to noop"),
 | 
			
		||||
@@ -80,18 +76,31 @@ mod messages {
 | 
			
		||||
        assert!(msg.data.is_empty());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn message_ids_are_random() {
 | 
			
		||||
        let mut ids: Vec<Uuid> = Vec::new();
 | 
			
		||||
        for _ in 0..10 {
 | 
			
		||||
            let msg = Message::new(MsgType::NoOp);
 | 
			
		||||
            let id = msg.id.clone();
 | 
			
		||||
            assert!(!ids.contains(&id), "{} is a duplicate", id);
 | 
			
		||||
            ids.push(id);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn create_reply() {
 | 
			
		||||
        let id = Uuid::new_v4();
 | 
			
		||||
        let mut msg = Message::new(MsgType::NoOp);
 | 
			
		||||
        msg.id = id.clone();
 | 
			
		||||
        let data = MsgType::NewClientMessage;
 | 
			
		||||
        msg.add_data("test", "test");
 | 
			
		||||
        let data = MsgType::ClientRequest;
 | 
			
		||||
        let result = msg.reply(data);
 | 
			
		||||
        assert_eq!(result.id, id);
 | 
			
		||||
        match result.class {
 | 
			
		||||
            MsgType::NewClientMessage => {}
 | 
			
		||||
            MsgType::ClientRequest => {}
 | 
			
		||||
            _ => unreachable!("should have been a registration request"),
 | 
			
		||||
        }
 | 
			
		||||
        assert!(result.data.is_empty());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
@@ -116,24 +125,25 @@ mod messages {
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
struct ServiceRegistry {
 | 
			
		||||
    store: Arc<Mutex<Vec<Sender<Message>>>>,
 | 
			
		||||
#[derive(Clone)]
 | 
			
		||||
pub struct Queue {
 | 
			
		||||
    store: Arc<RwLock<Vec<Sender<Message>>>>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl ServiceRegistry {
 | 
			
		||||
    fn new() -> Self {
 | 
			
		||||
impl Queue {
 | 
			
		||||
    pub fn new() -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            store: Arc::new(Mutex::new(Vec::new())),
 | 
			
		||||
            store: Arc::new(RwLock::new(Vec::new())),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn add(&self, tx: Sender<Message>) {
 | 
			
		||||
        let mut store = self.store.lock().unwrap();
 | 
			
		||||
        let mut store = self.store.write().unwrap();
 | 
			
		||||
        store.push(tx);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn send(&self, msg: Message) {
 | 
			
		||||
        let mut store = self.store.lock().unwrap();
 | 
			
		||||
        let store = self.store.read().unwrap();
 | 
			
		||||
        for sender in store.iter() {
 | 
			
		||||
            sender.send(msg.clone()).unwrap();
 | 
			
		||||
        }
 | 
			
		||||
@@ -142,99 +152,18 @@ impl ServiceRegistry {
 | 
			
		||||
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
mod serviceredistries {
 | 
			
		||||
    use std::sync::mpsc::channel;
 | 
			
		||||
    use super::*;
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn create_registry() {
 | 
			
		||||
        let reg = ServiceRegistry::new();
 | 
			
		||||
    fn create_queue() {
 | 
			
		||||
        let queue = Queue::new();
 | 
			
		||||
        let (tx1, rx1) = channel();
 | 
			
		||||
        let (tx2, rx2) = channel();
 | 
			
		||||
        reg.add(tx1);
 | 
			
		||||
        reg.add(tx2);
 | 
			
		||||
        reg.send(Message::new(MsgType::NoOp));
 | 
			
		||||
        queue.add(tx1);
 | 
			
		||||
        queue.add(tx2);
 | 
			
		||||
        queue.send(Message::new(MsgType::NoOp));
 | 
			
		||||
        rx1.recv().unwrap();
 | 
			
		||||
        rx2.recv().unwrap();
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
struct Queue {
 | 
			
		||||
    registry: Arc<RwLock<Vec<Sender<Message>>>>,
 | 
			
		||||
    rx: Receiver<Message>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Queue {
 | 
			
		||||
    fn new(rx: Receiver<Message>, registry: Arc<RwLock<Vec<Sender<Message>>>>) -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            registry: registry,
 | 
			
		||||
            rx: rx,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn start(registry: Arc<RwLock<Vec<Sender<Message>>>>) -> Sender<Message> {
 | 
			
		||||
        let (tx, rx) = channel();
 | 
			
		||||
        spawn(move || {
 | 
			
		||||
            let mut queue = Queue::new(rx, registry);
 | 
			
		||||
            queue.listen();
 | 
			
		||||
        });
 | 
			
		||||
        tx
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn listen(&mut self) {
 | 
			
		||||
        loop {
 | 
			
		||||
            let mut msg = self.rx.recv().unwrap();
 | 
			
		||||
            msg.id = Uuid::new_v4();
 | 
			
		||||
            let senders = self.registry.read().unwrap();
 | 
			
		||||
            for sender in senders.iter() {
 | 
			
		||||
                sender.send(msg.reply(MsgType::ClientMessage)).unwrap();
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
mod queues {
 | 
			
		||||
    use super::*;
 | 
			
		||||
    use std::time::Duration;
 | 
			
		||||
 | 
			
		||||
    static TIMEOUT: Duration = Duration::from_millis(500);
 | 
			
		||||
 | 
			
		||||
    fn start_queue() -> (Sender<Message>, Receiver<Message>) {
 | 
			
		||||
        let reg: Arc<RwLock<Vec<Sender<Message>>>> = Arc::new(RwLock::new(Vec::new()));
 | 
			
		||||
        let (tx, rx) = channel::<Message>();
 | 
			
		||||
        let mut data = reg.write().unwrap();
 | 
			
		||||
        data.push(tx);
 | 
			
		||||
        drop(data);
 | 
			
		||||
        let queue_tx = Queue::start(Arc::clone(®));
 | 
			
		||||
        (queue_tx, rx)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn get_new_client_message() {
 | 
			
		||||
        let (tx, rx) = start_queue();
 | 
			
		||||
        let initial = Message::new(MsgType::NoOp);
 | 
			
		||||
        let msg = initial.reply(MsgType::NewClientMessage);
 | 
			
		||||
        tx.send(msg).unwrap();
 | 
			
		||||
        let msg = rx.recv_timeout(TIMEOUT).unwrap();
 | 
			
		||||
        match msg.class {
 | 
			
		||||
            MsgType::ClientMessage => assert_ne!(msg.id, initial.id),
 | 
			
		||||
            _ => unreachable!("should have been a client message"),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn new_client_messages_are_unique() {
 | 
			
		||||
        let (tx, rx) = start_queue();
 | 
			
		||||
        let msg = Message::new(MsgType::NoOp);
 | 
			
		||||
        let mut ids: Vec<Uuid> = Vec::new();
 | 
			
		||||
        for _ in 0..10 {
 | 
			
		||||
            tx.send(msg.reply(MsgType::NewClientMessage)).unwrap();
 | 
			
		||||
            let result = rx.recv().unwrap();
 | 
			
		||||
            assert!(
 | 
			
		||||
                !ids.contains(&result.id.clone()),
 | 
			
		||||
                "{} is a duplicate",
 | 
			
		||||
                &result.id
 | 
			
		||||
            );
 | 
			
		||||
            ids.push(result.id);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user