Added session storage.
This commit is contained in:
		@@ -1,5 +1,9 @@
 | 
				
			|||||||
use crate::queue::{Message, MsgType, Queue};
 | 
					use crate::{
 | 
				
			||||||
 | 
					    field::Field,
 | 
				
			||||||
 | 
					    queue::{Message, MsgType, Queue},
 | 
				
			||||||
 | 
					};
 | 
				
			||||||
use std::{
 | 
					use std::{
 | 
				
			||||||
 | 
					    collections::HashMap,
 | 
				
			||||||
    sync::mpsc::{channel, Receiver},
 | 
					    sync::mpsc::{channel, Receiver},
 | 
				
			||||||
    thread::spawn,
 | 
					    thread::spawn,
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
@@ -7,7 +11,10 @@ use uuid::Uuid;
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
const RESPONDS_TO: [MsgType; 1] = [MsgType::SessionValidate];
 | 
					const RESPONDS_TO: [MsgType; 1] = [MsgType::SessionValidate];
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					struct SessionData;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub struct Session {
 | 
					pub struct Session {
 | 
				
			||||||
 | 
					    data: HashMap<Uuid, SessionData>,
 | 
				
			||||||
    queue: Queue,
 | 
					    queue: Queue,
 | 
				
			||||||
    rx: Receiver<Message>,
 | 
					    rx: Receiver<Message>,
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -15,6 +22,7 @@ pub struct Session {
 | 
				
			|||||||
impl Session {
 | 
					impl Session {
 | 
				
			||||||
    fn new(queue: Queue, rx: Receiver<Message>) -> Self {
 | 
					    fn new(queue: Queue, rx: Receiver<Message>) -> Self {
 | 
				
			||||||
        Self {
 | 
					        Self {
 | 
				
			||||||
 | 
					            data: HashMap::new(),
 | 
				
			||||||
            queue: queue,
 | 
					            queue: queue,
 | 
				
			||||||
            rx: rx,
 | 
					            rx: rx,
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
@@ -22,26 +30,48 @@ impl Session {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    pub fn start(queue: Queue) {
 | 
					    pub fn start(queue: Queue) {
 | 
				
			||||||
        let (tx, rx) = channel();
 | 
					        let (tx, rx) = channel();
 | 
				
			||||||
        let session = Session::new(queue, rx);
 | 
					        let mut session = Session::new(queue, rx);
 | 
				
			||||||
        session.queue.add(tx, RESPONDS_TO.to_vec());
 | 
					        session.queue.add(tx, RESPONDS_TO.to_vec());
 | 
				
			||||||
        spawn(move || {
 | 
					        spawn(move || {
 | 
				
			||||||
            session.listen();
 | 
					            session.listen();
 | 
				
			||||||
        });
 | 
					        });
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn listen(&self) {
 | 
					    fn listen(&mut self) {
 | 
				
			||||||
        loop {
 | 
					        loop {
 | 
				
			||||||
            let msg = self.rx.recv().unwrap();
 | 
					            let msg = self.rx.recv().unwrap();
 | 
				
			||||||
            self.validate(msg);
 | 
					            self.validate(msg);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn validate(&self, msg: Message) {
 | 
					    fn validate(&mut self, msg: Message) {
 | 
				
			||||||
        let mut reply = msg.reply(MsgType::Session);
 | 
					 | 
				
			||||||
        match msg.get_data("sess_id") {
 | 
					        match msg.get_data("sess_id") {
 | 
				
			||||||
            Some(id) => reply.add_data("sess_id", id.clone()),
 | 
					            Some(sid) => {
 | 
				
			||||||
            None => reply.add_data("sess_id", Uuid::new_v4()),
 | 
					                match sid { 
 | 
				
			||||||
 | 
					                    Field::Uuid(sess_id) => {
 | 
				
			||||||
 | 
					                        if self.data.contains_key(&sess_id) {
 | 
				
			||||||
 | 
					                            let mut reply = msg.reply(MsgType::Session);
 | 
				
			||||||
 | 
					                            reply.add_data("sess_id", sess_id.clone());
 | 
				
			||||||
 | 
					                             self.queue.send(reply);
 | 
				
			||||||
 | 
					                        } else {
 | 
				
			||||||
 | 
					                            self.new_session(msg);
 | 
				
			||||||
 | 
					                        }
 | 
				
			||||||
 | 
					                    },
 | 
				
			||||||
 | 
					                    _ => self.new_session(msg),
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					            },
 | 
				
			||||||
 | 
					            None => self.new_session(msg),
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    fn new_session(&mut self, msg: Message) {
 | 
				
			||||||
 | 
					        let mut id = Uuid::new_v4();
 | 
				
			||||||
 | 
					        while self.data.contains_key(&id) {
 | 
				
			||||||
 | 
					            id = Uuid::new_v4();
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        self.data.insert(id.clone(), SessionData {});
 | 
				
			||||||
 | 
					        let mut reply = msg.reply(MsgType::Session);
 | 
				
			||||||
 | 
					        reply.add_data("sess_id", id);
 | 
				
			||||||
        self.queue.send(reply);
 | 
					        self.queue.send(reply);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -87,7 +117,7 @@ mod sessions {
 | 
				
			|||||||
        let mut ids: Vec<Uuid> = Vec::new();
 | 
					        let mut ids: Vec<Uuid> = Vec::new();
 | 
				
			||||||
        for _ in 0..10 {
 | 
					        for _ in 0..10 {
 | 
				
			||||||
            queue.send(msg.clone());
 | 
					            queue.send(msg.clone());
 | 
				
			||||||
            let result = rx.recv().unwrap();
 | 
					            let result = rx.recv_timeout(TIMEOUT).unwrap();
 | 
				
			||||||
            let id = result.get_data("sess_id").unwrap().to_uuid().unwrap();
 | 
					            let id = result.get_data("sess_id").unwrap().to_uuid().unwrap();
 | 
				
			||||||
            assert!(!ids.contains(&id), "{} is a duplicate id", id);
 | 
					            assert!(!ids.contains(&id), "{} is a duplicate id", id);
 | 
				
			||||||
            ids.push(id);
 | 
					            ids.push(id);
 | 
				
			||||||
@@ -100,12 +130,38 @@ mod sessions {
 | 
				
			|||||||
        let (queue, rx) = setup_session(listen_for.to_vec());
 | 
					        let (queue, rx) = setup_session(listen_for.to_vec());
 | 
				
			||||||
        let mut msg = Message::new(MsgType::SessionValidate);
 | 
					        let mut msg = Message::new(MsgType::SessionValidate);
 | 
				
			||||||
        queue.send(msg.clone());
 | 
					        queue.send(msg.clone());
 | 
				
			||||||
        let holder = rx.recv().unwrap();
 | 
					        let holder = rx.recv_timeout(TIMEOUT).unwrap();
 | 
				
			||||||
        let id = holder.get_data("sess_id").unwrap().to_uuid().unwrap();
 | 
					        let id = holder.get_data("sess_id").unwrap().to_uuid().unwrap();
 | 
				
			||||||
        msg.add_data("sess_id", id.clone());
 | 
					        msg.add_data("sess_id", id.clone());
 | 
				
			||||||
        queue.send(msg);
 | 
					        queue.send(msg);
 | 
				
			||||||
        let result = rx.recv().unwrap();
 | 
					        let result = rx.recv_timeout(TIMEOUT).unwrap();
 | 
				
			||||||
        let output = result.get_data("sess_id").unwrap().to_uuid().unwrap();
 | 
					        let output = result.get_data("sess_id").unwrap().to_uuid().unwrap();
 | 
				
			||||||
        assert_eq!(output, id);
 | 
					        assert_eq!(output, id);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    #[test]
 | 
				
			||||||
 | 
					    fn issue_new_if_validated_doe_not_exist() {
 | 
				
			||||||
 | 
					        let id = Uuid::new_v4();
 | 
				
			||||||
 | 
					        let listen_for = [MsgType::Session];
 | 
				
			||||||
 | 
					        let (queue, rx) = setup_session(listen_for.to_vec());
 | 
				
			||||||
 | 
					        let mut msg = Message::new(MsgType::SessionValidate);
 | 
				
			||||||
 | 
					        msg.add_data("sess_id", id.clone());
 | 
				
			||||||
 | 
					        queue.send(msg);
 | 
				
			||||||
 | 
					        let result = rx.recv_timeout(TIMEOUT).unwrap();
 | 
				
			||||||
 | 
					        let output = result.get_data("sess_id").unwrap().to_uuid().unwrap();
 | 
				
			||||||
 | 
					        assert_ne!(output, id);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    #[test]
 | 
				
			||||||
 | 
					    fn new_for_bad_uuid() {
 | 
				
			||||||
 | 
					        let id = "bad uuid";
 | 
				
			||||||
 | 
					        let listen_for = [MsgType::Session];
 | 
				
			||||||
 | 
					        let (queue, rx) = setup_session(listen_for.to_vec());
 | 
				
			||||||
 | 
					        let mut msg = Message::new(MsgType::SessionValidate);
 | 
				
			||||||
 | 
					        msg.add_data("sess_id", id);
 | 
				
			||||||
 | 
					        queue.send(msg);
 | 
				
			||||||
 | 
					        let result = rx.recv_timeout(TIMEOUT).unwrap();
 | 
				
			||||||
 | 
					        let output = result.get_data("sess_id").unwrap().to_string();
 | 
				
			||||||
 | 
					        assert_ne!(output, id);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user