Preparing to move session into an extractor.

This commit is contained in:
2025-04-19 07:57:16 -04:00
parent 9e6d407b69
commit bb70cc65e0
5 changed files with 146 additions and 38 deletions

View File

@ -107,7 +107,7 @@ mod replies {
#[derive(Clone)]
pub struct ClientRegistry {
registry: Arc<Mutex<HashMap<Uuid, Sender<Reply>>>>,
registry: Arc<Mutex<HashMap<Uuid, Sender<Message>>>>,
}
impl ClientRegistry {
@ -119,7 +119,7 @@ impl ClientRegistry {
fn get_id<'a>(
gen: &mut impl Iterator<Item = Uuid>,
data: &HashMap<Uuid, Sender<Reply>>,
data: &HashMap<Uuid, Sender<Message>>,
) -> Uuid {
let mut id = gen.next().unwrap();
while data.contains_key(&id) {
@ -128,7 +128,7 @@ impl ClientRegistry {
id.clone()
}
pub fn add(&mut self, tx: Sender<Reply>) -> Uuid {
pub fn add(&mut self, tx: Sender<Message>) -> Uuid {
let mut reg = self.registry.lock().unwrap();
let mut gen_id = GenID::new();
let id = ClientRegistry::get_id(&mut gen_id, &reg);
@ -136,7 +136,7 @@ impl ClientRegistry {
id
}
fn send(&mut self, id: &Uuid, msg: Reply) {
fn send(&mut self, id: &Uuid, msg: Message) {
let mut reg = self.registry.lock().unwrap();
let tx = reg.remove(id).unwrap();
tx.send(msg).unwrap();
@ -165,15 +165,15 @@ mod clientregistries {
fn send_from_client() {
let mut reg = ClientRegistry::new();
let count = 10;
let mut rxs: HashMap<Uuid, Receiver<Reply>> = HashMap::new();
let mut rxs: HashMap<Uuid, Receiver<Message>> = HashMap::new();
for _ in 0..count {
let (tx, rx) = channel::<Reply>();
let (tx, rx) = channel::<Message>();
let id = reg.add(tx);
rxs.insert(id, rx);
}
assert_eq!(rxs.len(), count, "should have been {} receivers", count);
for (id, rx) in rxs.iter() {
let msg = create_reply();
let msg = Message::new(MsgType::Document);
reg.send(id, msg);
rx.recv_timeout(TIMEOUT).unwrap();
}
@ -184,7 +184,7 @@ mod clientregistries {
#[test]
fn prevent_duplicates() {
let mut reg = ClientRegistry::new();
let (tx, _rx) = channel::<Reply>();
let (tx, _rx) = channel::<Message>();
let existing = reg.add(tx);
let expected = Uuid::new_v4();
let ids = [existing.clone(), expected.clone()];
@ -208,12 +208,12 @@ impl ClientLink {
}
}
pub fn send(&mut self, req: Request) -> Receiver<Reply> {
pub fn send(&mut self, mut req: Message) -> Receiver<Message> {
let (tx, rx) = channel();
let mut msg: Message = req.into();
//let mut msg: Message = req.into();
let id = self.registry.add(tx);
msg.add_data("tx_id", id);
self.tx.send(msg).unwrap();
req.add_data("tx_id", id);
self.tx.send(req).unwrap();
rx
}
}
@ -232,7 +232,7 @@ mod clientlinks {
let mut registry = ClientRegistry::new();
let mut link = ClientLink::new(tx, registry.clone());
let req = Request::new(None);
let rx_client = link.send(req);
let rx_client = link.send(req.into());
let msg = rx.recv_timeout(TIMEOUT).unwrap();
match msg.get_msg_type() {
MsgType::ClientRequest => {}
@ -241,7 +241,7 @@ mod clientlinks {
match msg.get_data("tx_id") {
Some(result) => {
let id = result.to_uuid().unwrap();
registry.send(&id, create_reply());
registry.send(&id, msg.reply(MsgType::Document));
rx_client.recv().unwrap();
}
None => unreachable!("should have had a seender id"),
@ -319,11 +319,13 @@ impl Client {
fn document(&mut self, msg: Message) {
let initial_msg = self.return_to.remove(&msg.get_id()).unwrap();
let tx_id = initial_msg.get_data("tx_id").unwrap().to_uuid().unwrap();
/*
let reply = Reply::new(
initial_msg.get_data("sess_id").unwrap().to_uuid().unwrap(),
msg.get_data("doc").unwrap().to_string(),
);
self.registry.send(&tx_id, reply);
)s
*/
self.registry.send(&tx_id, initial_msg.reply(MsgType::Document));
}
}
@ -335,6 +337,7 @@ mod clients {
static TIMEOUT: Duration = Duration::from_millis(500);
/*
#[test]
fn start_client() {
let sess_id1 = Uuid::new_v4();
@ -348,7 +351,7 @@ mod clients {
);
let mut link = Client::start(queue.clone());
let req = get_root_with_session(&sess_id1);
let reply_rx = link.send(req);
let reply_rx = link.send(req.into());
let send1 = rx.recv_timeout(TIMEOUT).unwrap();
match send1.get_msg_type() {
MsgType::SessionValidate => {}
@ -376,7 +379,8 @@ mod clients {
document.add_data("doc", doc.clone());
queue.send(document).unwrap();
let reply = reply_rx.recv_timeout(TIMEOUT).unwrap();
assert_eq!(reply.get_session(), sess_id2);
assert_eq!(reply.get_content(), doc);
assert_eq!(reply.get_data("sess_id").unwrap().to_uuid().unwrap(), sess_id2);
assert_eq!(reply.get_data("doc").unwrap().to_string(), doc);
}
*/
}