Made queue send return a result/
This commit is contained in:
		@@ -273,7 +273,9 @@ impl Client {
 | 
			
		||||
                MsgType::ClientRequest => {
 | 
			
		||||
                    let tx_id = msg.get_data("tx_id").unwrap().to_uuid().unwrap();
 | 
			
		||||
                    self.return_to.insert(msg.get_id(), tx_id);
 | 
			
		||||
                    self.queue.send(msg.reply(MsgType::SessionValidate));
 | 
			
		||||
                    self.queue
 | 
			
		||||
                        .send(msg.reply(MsgType::SessionValidate))
 | 
			
		||||
                        .unwrap();
 | 
			
		||||
                }
 | 
			
		||||
                MsgType::Session => {
 | 
			
		||||
                    let rx_id = self.return_to.remove(&msg.get_id()).unwrap();
 | 
			
		||||
@@ -314,7 +316,7 @@ mod clients {
 | 
			
		||||
        let sess_id = Uuid::new_v4();
 | 
			
		||||
        let mut sess_res = sess.reply(MsgType::Session);
 | 
			
		||||
        sess_res.add_data("sess_id", sess_id.clone());
 | 
			
		||||
        queue.send(sess_res);
 | 
			
		||||
        queue.send(sess_res).unwrap();
 | 
			
		||||
        let reply = reply_rx.recv_timeout(TIMEOUT).unwrap();
 | 
			
		||||
        assert_eq!(reply.get_session(), sess_id);
 | 
			
		||||
    }
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										22
									
								
								src/queue.rs
									
									
									
									
									
								
							
							
						
						
									
										22
									
								
								src/queue.rs
									
									
									
									
									
								
							@@ -181,15 +181,16 @@ impl Queue {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn send(&self, msg: Message) {
 | 
			
		||||
    pub fn send(&self, msg: Message) -> Result<(), String> {
 | 
			
		||||
        let store = self.store.read().unwrap();
 | 
			
		||||
        match store.get(&msg.get_class()) {
 | 
			
		||||
            Some(senders) => {
 | 
			
		||||
                for sender in senders.into_iter() {
 | 
			
		||||
                    sender.send(msg.clone()).unwrap();
 | 
			
		||||
                }
 | 
			
		||||
                Ok(())
 | 
			
		||||
            }
 | 
			
		||||
            None => {}
 | 
			
		||||
            None => Err(format!("no listeners for {:?}", msg.get_class())),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@@ -211,7 +212,7 @@ mod queues {
 | 
			
		||||
        let (tx2, rx2) = channel();
 | 
			
		||||
        queue.add(tx1, [MsgType::SessionValidate].to_vec());
 | 
			
		||||
        queue.add(tx2, [MsgType::SessionValidate].to_vec());
 | 
			
		||||
        queue.send(Message::new(MsgType::SessionValidate));
 | 
			
		||||
        queue.send(Message::new(MsgType::SessionValidate)).unwrap();
 | 
			
		||||
        rx1.recv().unwrap();
 | 
			
		||||
        rx2.recv().unwrap();
 | 
			
		||||
    }
 | 
			
		||||
@@ -223,7 +224,7 @@ mod queues {
 | 
			
		||||
        let (tx2, rx2) = channel();
 | 
			
		||||
        queue.add(tx1, [MsgType::SessionValidate].to_vec());
 | 
			
		||||
        queue.add(tx2, [MsgType::Session].to_vec());
 | 
			
		||||
        queue.send(Message::new(MsgType::SessionValidate));
 | 
			
		||||
        queue.send(Message::new(MsgType::SessionValidate)).unwrap();
 | 
			
		||||
        let result = rx1.recv().unwrap();
 | 
			
		||||
        match result.get_class() {
 | 
			
		||||
            MsgType::SessionValidate => {}
 | 
			
		||||
@@ -239,7 +240,7 @@ mod queues {
 | 
			
		||||
                _ => unreachable!("{:?}", err),
 | 
			
		||||
            },
 | 
			
		||||
        }
 | 
			
		||||
        queue.send(Message::new(MsgType::Session));
 | 
			
		||||
        queue.send(Message::new(MsgType::Session)).unwrap();
 | 
			
		||||
        let result = rx2.recv().unwrap();
 | 
			
		||||
        match result.get_class() {
 | 
			
		||||
            MsgType::Session => {}
 | 
			
		||||
@@ -262,17 +263,20 @@ mod queues {
 | 
			
		||||
        let queue = Queue::new();
 | 
			
		||||
        let (tx, rx) = channel();
 | 
			
		||||
        queue.add(tx, [MsgType::Session, MsgType::SessionValidate].to_vec());
 | 
			
		||||
        queue.send(Message::new(MsgType::SessionValidate));
 | 
			
		||||
        queue.send(Message::new(MsgType::SessionValidate)).unwrap();
 | 
			
		||||
        let msg = rx.recv().unwrap();
 | 
			
		||||
        assert_eq!(msg.get_class(), &MsgType::SessionValidate);
 | 
			
		||||
        queue.send(Message::new(MsgType::Session));
 | 
			
		||||
        queue.send(Message::new(MsgType::Session)).unwrap();
 | 
			
		||||
        let msg = rx.recv().unwrap();
 | 
			
		||||
        assert_eq!(msg.get_class(), &MsgType::Session);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn unassigned_message_should_not_panic() {
 | 
			
		||||
    fn unassigned_message_should_return_error() {
 | 
			
		||||
        let queue = Queue::new();
 | 
			
		||||
        queue.send(Message::new(MsgType::Session));
 | 
			
		||||
        match queue.send(Message::new(MsgType::Session)) {
 | 
			
		||||
            Ok(_) => unreachable!("should return error"),
 | 
			
		||||
            Err(_) => {}
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -46,19 +46,17 @@ impl Session {
 | 
			
		||||
 | 
			
		||||
    fn validate(&mut self, msg: Message) {
 | 
			
		||||
        match msg.get_data("sess_id") {
 | 
			
		||||
            Some(sid) => {
 | 
			
		||||
                match sid { 
 | 
			
		||||
                    Field::Uuid(sess_id) => {
 | 
			
		||||
                        if self.data.contains_key(&sess_id) {
 | 
			
		||||
                            let mut reply = msg.reply(MsgType::Session);
 | 
			
		||||
                            reply.add_data("sess_id", sess_id.clone());
 | 
			
		||||
                             self.queue.send(reply);
 | 
			
		||||
                        } else {
 | 
			
		||||
                            self.new_session(msg);
 | 
			
		||||
                        }
 | 
			
		||||
                    },
 | 
			
		||||
                    _ => self.new_session(msg),
 | 
			
		||||
            Some(sid) => match sid {
 | 
			
		||||
                Field::Uuid(sess_id) => {
 | 
			
		||||
                    if self.data.contains_key(&sess_id) {
 | 
			
		||||
                        let mut reply = msg.reply(MsgType::Session);
 | 
			
		||||
                        reply.add_data("sess_id", sess_id.clone());
 | 
			
		||||
                        self.queue.send(reply).unwrap();
 | 
			
		||||
                    } else {
 | 
			
		||||
                        self.new_session(msg);
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
                _ => self.new_session(msg),
 | 
			
		||||
            },
 | 
			
		||||
            None => self.new_session(msg),
 | 
			
		||||
        }
 | 
			
		||||
@@ -72,7 +70,7 @@ impl Session {
 | 
			
		||||
        self.data.insert(id.clone(), SessionData {});
 | 
			
		||||
        let mut reply = msg.reply(MsgType::Session);
 | 
			
		||||
        reply.add_data("sess_id", id);
 | 
			
		||||
        self.queue.send(reply);
 | 
			
		||||
        self.queue.send(reply).unwrap();
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -97,7 +95,7 @@ mod sessions {
 | 
			
		||||
        let listen_for = [MsgType::Session];
 | 
			
		||||
        let (queue, rx) = setup_session(listen_for.to_vec());
 | 
			
		||||
        let msg = Message::new(MsgType::SessionValidate);
 | 
			
		||||
        queue.send(msg.clone());
 | 
			
		||||
        queue.send(msg.clone()).unwrap();
 | 
			
		||||
        let result = rx.recv_timeout(TIMEOUT).unwrap();
 | 
			
		||||
        match result.get_class() {
 | 
			
		||||
            MsgType::Session => {}
 | 
			
		||||
@@ -116,7 +114,7 @@ mod sessions {
 | 
			
		||||
        let msg = Message::new(MsgType::SessionValidate);
 | 
			
		||||
        let mut ids: Vec<Uuid> = Vec::new();
 | 
			
		||||
        for _ in 0..10 {
 | 
			
		||||
            queue.send(msg.clone());
 | 
			
		||||
            queue.send(msg.clone()).unwrap();
 | 
			
		||||
            let result = rx.recv_timeout(TIMEOUT).unwrap();
 | 
			
		||||
            let id = result.get_data("sess_id").unwrap().to_uuid().unwrap();
 | 
			
		||||
            assert!(!ids.contains(&id), "{} is a duplicate id", id);
 | 
			
		||||
@@ -129,11 +127,11 @@ mod sessions {
 | 
			
		||||
        let listen_for = [MsgType::Session];
 | 
			
		||||
        let (queue, rx) = setup_session(listen_for.to_vec());
 | 
			
		||||
        let mut msg = Message::new(MsgType::SessionValidate);
 | 
			
		||||
        queue.send(msg.clone());
 | 
			
		||||
        queue.send(msg.clone()).unwrap();
 | 
			
		||||
        let holder = rx.recv_timeout(TIMEOUT).unwrap();
 | 
			
		||||
        let id = holder.get_data("sess_id").unwrap().to_uuid().unwrap();
 | 
			
		||||
        msg.add_data("sess_id", id.clone());
 | 
			
		||||
        queue.send(msg);
 | 
			
		||||
        queue.send(msg).unwrap();
 | 
			
		||||
        let result = rx.recv_timeout(TIMEOUT).unwrap();
 | 
			
		||||
        let output = result.get_data("sess_id").unwrap().to_uuid().unwrap();
 | 
			
		||||
        assert_eq!(output, id);
 | 
			
		||||
@@ -146,7 +144,7 @@ mod sessions {
 | 
			
		||||
        let (queue, rx) = setup_session(listen_for.to_vec());
 | 
			
		||||
        let mut msg = Message::new(MsgType::SessionValidate);
 | 
			
		||||
        msg.add_data("sess_id", id.clone());
 | 
			
		||||
        queue.send(msg);
 | 
			
		||||
        queue.send(msg).unwrap();
 | 
			
		||||
        let result = rx.recv_timeout(TIMEOUT).unwrap();
 | 
			
		||||
        let output = result.get_data("sess_id").unwrap().to_uuid().unwrap();
 | 
			
		||||
        assert_ne!(output, id);
 | 
			
		||||
@@ -159,7 +157,7 @@ mod sessions {
 | 
			
		||||
        let (queue, rx) = setup_session(listen_for.to_vec());
 | 
			
		||||
        let mut msg = Message::new(MsgType::SessionValidate);
 | 
			
		||||
        msg.add_data("sess_id", id);
 | 
			
		||||
        queue.send(msg);
 | 
			
		||||
        queue.send(msg).unwrap();
 | 
			
		||||
        let result = rx.recv_timeout(TIMEOUT).unwrap();
 | 
			
		||||
        let output = result.get_data("sess_id").unwrap().to_string();
 | 
			
		||||
        assert_ne!(output, id);
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user