Added queue.
This commit is contained in:
		
							
								
								
									
										46
									
								
								src/field.rs
									
									
									
									
									
								
							
							
						
						
									
										46
									
								
								src/field.rs
									
									
									
									
									
								
							@@ -1,10 +1,24 @@
 | 
			
		||||
use std::fmt;
 | 
			
		||||
use crate::queue::Message;
 | 
			
		||||
use std::{
 | 
			
		||||
    fmt,
 | 
			
		||||
    sync::mpsc::Sender,
 | 
			
		||||
};
 | 
			
		||||
use uuid::Uuid;
 | 
			
		||||
 | 
			
		||||
#[derive(Clone, Debug)]
 | 
			
		||||
pub enum Field {
 | 
			
		||||
    Static(String),
 | 
			
		||||
    Uuid(Uuid),
 | 
			
		||||
    Tx(Sender<Message>),
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Field {
 | 
			
		||||
    pub fn to_sender(&self) -> Result<Sender<Message>, String> {
 | 
			
		||||
        match self {
 | 
			
		||||
            Field::Tx(sender) => Ok(sender.clone()),
 | 
			
		||||
            _ => Err("not a sender field".to_string()),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl From<String> for Field {
 | 
			
		||||
@@ -33,17 +47,25 @@ impl From<Uuid> for Field {
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl From<Sender<Message>> for Field {
 | 
			
		||||
    fn from(value: Sender<Message>) -> Self {
 | 
			
		||||
        Field::Tx(value)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl fmt::Display for Field {
 | 
			
		||||
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 | 
			
		||||
        match self {
 | 
			
		||||
            Field::Uuid(data) => write!(f, "{}", data),
 | 
			
		||||
            Field::Static(data) => write!(f, "{}", data),
 | 
			
		||||
            Field::Tx(_) => write!(f, "{}", "message sender"),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
mod fields {
 | 
			
		||||
    use std::sync::mpsc::channel;
 | 
			
		||||
    use super::*;
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
@@ -98,6 +120,19 @@ mod fields {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn sender_to_field() {
 | 
			
		||||
        let (tx, rx) = channel::<Message>();
 | 
			
		||||
        match tx.into() {
 | 
			
		||||
            Field::Tx(sender) => {
 | 
			
		||||
                let msg = Message::new();
 | 
			
		||||
                sender.send(msg).unwrap();
 | 
			
		||||
                rx.recv().unwrap();
 | 
			
		||||
            },
 | 
			
		||||
            _ => unreachable!("should have been a sender"),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn uuid_field_to_string() {
 | 
			
		||||
        let id = Uuid::new_v4();
 | 
			
		||||
@@ -119,4 +154,13 @@ mod fields {
 | 
			
		||||
        let input: Field = result.clone().into();
 | 
			
		||||
        assert_eq!(input.to_string(), result);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn get_sender() {
 | 
			
		||||
        let (tx, rx) = channel::<Message>();
 | 
			
		||||
        let field: Field = tx.into();
 | 
			
		||||
        let sender = field.to_sender().unwrap();
 | 
			
		||||
        sender.send(Message::new()).unwrap();
 | 
			
		||||
        rx.recv().unwrap();
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -1,5 +1,6 @@
 | 
			
		||||
mod client;
 | 
			
		||||
mod field;
 | 
			
		||||
mod queue;
 | 
			
		||||
 | 
			
		||||
use client::{Client, Reply, Request};
 | 
			
		||||
use field::Field;
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										180
									
								
								src/queue.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										180
									
								
								src/queue.rs
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,180 @@
 | 
			
		||||
use crate::field::Field;
 | 
			
		||||
use std::{
 | 
			
		||||
    collections::HashMap,
 | 
			
		||||
    sync::mpsc::{channel, Receiver, Sender},
 | 
			
		||||
    thread::spawn,
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
use uuid::Uuid;
 | 
			
		||||
 | 
			
		||||
enum MsgType {
 | 
			
		||||
    ClientNewMessage,
 | 
			
		||||
    NoOp,
 | 
			
		||||
    Register,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub struct Message {
 | 
			
		||||
    id: Uuid,
 | 
			
		||||
    class: MsgType,
 | 
			
		||||
    data: HashMap<String, Field>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Message {
 | 
			
		||||
    pub fn new() -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            id: Uuid::nil(),
 | 
			
		||||
            class: MsgType::NoOp,
 | 
			
		||||
            data: HashMap::new(),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn reply(&self, data: MsgType) -> Message {
 | 
			
		||||
        Self {
 | 
			
		||||
            id: self.id.clone(),
 | 
			
		||||
            class: data,
 | 
			
		||||
            data: self.data.clone(),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn get_class(&self) -> &MsgType {
 | 
			
		||||
        &self.class
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn add_data<S, F>(&mut self, name: S, data: F) where S: Into<String>, F: Into<Field> {
 | 
			
		||||
        self.data.insert(name.into(), data.into());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn get_data(&self) -> &HashMap<String, Field> {
 | 
			
		||||
        &self.data
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
mod messages {
 | 
			
		||||
    use super::*;
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn new_message() {
 | 
			
		||||
        let msg = Message::new();
 | 
			
		||||
        assert_eq!(msg.id, Uuid::nil());
 | 
			
		||||
        match msg.class {
 | 
			
		||||
            MsgType::NoOp => (),
 | 
			
		||||
            _ => unreachable!("new defaults to noop"),
 | 
			
		||||
        }
 | 
			
		||||
        assert!(msg.data.is_empty());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn create_reply() {
 | 
			
		||||
        let id = Uuid::new_v4();
 | 
			
		||||
        let mut msg = Message::new();
 | 
			
		||||
        msg.id = id.clone();
 | 
			
		||||
        let data = MsgType::Register;
 | 
			
		||||
        let result = msg.reply(data);
 | 
			
		||||
        assert_eq!(result.id, id);
 | 
			
		||||
        match result.class {
 | 
			
		||||
            MsgType::Register => {},
 | 
			
		||||
            _ => unreachable!("should have been a registration request"),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn get_message_type() {
 | 
			
		||||
        let msg = Message::new();
 | 
			
		||||
        match msg.get_class() {
 | 
			
		||||
            MsgType::NoOp => {},
 | 
			
		||||
            _ => unreachable!("should have bneen noopn"),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn add_data() {
 | 
			
		||||
        let mut msg = Message::new();
 | 
			
		||||
        let one = "one";
 | 
			
		||||
        let two = "two".to_string();
 | 
			
		||||
        msg.add_data(one, one);
 | 
			
		||||
        msg.add_data(two.clone(), two.clone());
 | 
			
		||||
        let result = msg.get_data();
 | 
			
		||||
        assert_eq!(result.get(one).unwrap().to_string(), one);
 | 
			
		||||
        assert_eq!(result.get(&two).unwrap().to_string(), two);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
struct Queue {
 | 
			
		||||
    senders: Vec<Sender<Message>>,
 | 
			
		||||
    rx: Receiver<Message>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Queue {
 | 
			
		||||
    fn new(rx: Receiver<Message>) -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            senders: Vec::new(),
 | 
			
		||||
            rx: rx,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn start() -> Sender<Message> {
 | 
			
		||||
        let (tx, rx) = channel();
 | 
			
		||||
        spawn(move || {
 | 
			
		||||
            let mut queue = Queue::new(rx);
 | 
			
		||||
            queue.listen();
 | 
			
		||||
        });
 | 
			
		||||
        tx
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn listen(&mut self) {
 | 
			
		||||
        loop {
 | 
			
		||||
            let msg = self.rx.recv().unwrap();
 | 
			
		||||
            match msg.get_class() {
 | 
			
		||||
                MsgType::Register => self.register(msg.get_data()).unwrap(),
 | 
			
		||||
                _ => {
 | 
			
		||||
                    for sender in self.senders.iter() {
 | 
			
		||||
                        sender.send(Message::new()).unwrap();
 | 
			
		||||
                    }
 | 
			
		||||
                },
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn register(&mut self, data: &HashMap<String, Field>) -> Result<(), String> {
 | 
			
		||||
        match data.get("tx") {
 | 
			
		||||
            Some(data) => {
 | 
			
		||||
                self.senders.push(data.to_sender().unwrap());
 | 
			
		||||
                Ok(())
 | 
			
		||||
            },
 | 
			
		||||
            None => Err("missing tx sender".to_string()),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
mod queues {
 | 
			
		||||
    use std::time::Duration;
 | 
			
		||||
    use super::*;
 | 
			
		||||
 | 
			
		||||
    fn start_queue() -> (Sender<Message>, Receiver<Message>) {
 | 
			
		||||
        let (tx, rx) = channel::<Message>();
 | 
			
		||||
        let initial = Message::new();
 | 
			
		||||
        let mut msg = initial.reply(MsgType::Register);
 | 
			
		||||
        msg.add_data("tx", tx);
 | 
			
		||||
        let queue_tx = Queue::start();
 | 
			
		||||
        (queue_tx, rx)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn run_queue() {
 | 
			
		||||
        let queue_tx = Queue::start();
 | 
			
		||||
        let (tx, rx) = channel();
 | 
			
		||||
        let msg = Message::new();
 | 
			
		||||
        let mut reply = msg.reply(MsgType::Register);
 | 
			
		||||
        reply.add_data("tx", tx);
 | 
			
		||||
        queue_tx.send(reply);
 | 
			
		||||
        queue_tx.send(msg.reply(MsgType::NoOp));
 | 
			
		||||
        rx.recv_timeout(Duration::from_millis(400)).unwrap();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn get_new_client_message() {
 | 
			
		||||
        let (tx, rx) = start_queue();
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user