Abandoned path. Senders can only be moved oncde.

This commit is contained in:
2025-03-26 17:42:34 -04:00
parent d824220fab
commit c9de11ff61
12 changed files with 1564 additions and 676 deletions

View File

@ -1,20 +1,224 @@
use crate::{session2::SessionMessage, Request};
use crate::{client::Reply, ClientMessage, Request};
use std::{
sync::mpsc::{channel, Receiver, Sender},
thread::spawn,
};
#[derive(Clone)]
pub enum MessageData {
ClientMsg(ClientMessage),
Register(Sender<Message>),
}
impl From<ClientMessage> for MessageData {
fn from(value: ClientMessage) -> Self {
MessageData::ClientMsg(value)
}
}
impl From<Request> for MessageData {
fn from(value: Request) -> Self {
let data: ClientMessage = value.into();
MessageData::ClientMsg(data)
}
}
impl From<Sender<Message>> for MessageData {
fn from(value: Sender<Message>) -> Self {
MessageData::Register(value)
}
}
#[cfg(test)]
mod messagedatas {
use crate::create_requests::root_doc_no_sess;
use super::*;
#[test]
fn from_client_message() {
let (req, _) = root_doc_no_sess();
let data: ClientMessage = req.into();
match data.into() {
MessageData::ClientMsg(_) => {},
_ => unreachable!("should have been a client messsage"),
}
}
#[test]
fn from_request() {
let (req, _) = root_doc_no_sess();
match req.into() {
MessageData::ClientMsg(_) => {},
_ => unreachable!("should be a client message"),
}
}
#[test]
fn from_sender() {
let (tx, rx) = channel();
match tx.into() {
MessageData::Register(result) => {
let (req, _) = root_doc_no_sess();
result.send(Message::new(req.into())).unwrap();
match rx.recv().unwrap().data {
MessageData::ClientMsg(_) => {}
_ => unreachable!("should have been a request"),
}
}
_ => unreachable!("should have been a register"),
}
}
}
#[derive(Clone)]
pub struct Message {
data: MessageData,
}
impl Message {
pub fn new(data: MessageData) -> Self {
Self {
data: data,
}
}
pub fn get_data(&self) -> &MessageData {
&self.data
}
}
#[cfg(test)]
mod messages {
use crate::create_requests::root_doc_no_sess;
use super::*;
#[test]
fn create_msg() {
let (req, _) = root_doc_no_sess();
let msg = Message::new(req.into());
match msg.data {
MessageData::ClientMsg(_) => {},
_ => unreachable!("should have been a client message"),
}
}
}
pub struct Queue {
channels: Vec<Sender<Message>>,
rx: Receiver<Message>,
}
impl Queue {
fn new(rx: Receiver<Message>) -> Self {
Self {
channels: Vec::new(),
rx: rx,
}
}
pub fn start() -> Sender<Message> {
let (tx, rx) = channel();
spawn(move || {
let mut queue = Queue::new(rx);
queue.listen();
});
tx
}
fn listen(&mut self) {
loop {
let msg = self.rx.recv().unwrap();
match msg.get_data() {
MessageData::Register(sender) => self.channels.push(sender.clone()),
_ => {
for tx in self.channels.iter() {
tx.send(msg.clone()).unwrap();
}
},
}
}
}
}
#[cfg(test)]
mod queues {
use crate::{
create_requests::root_doc_no_sess,
Request,
};
use std::time::Duration;
use super::*;
#[test]
fn create_queue() {
let mut channels = Vec::new();
for _ in 0..5 {
channels.push(channel());
}
let queue_tx = Queue::start();
for (tx, _) in channels.iter() {
queue_tx.send(Message::new(tx.clone().into())).unwrap();
}
let (req, _) = root_doc_no_sess();
queue_tx.send(Message::new(req.into())).unwrap();
for (_, rx) in channels.iter() {
match rx.recv_timeout(Duration::from_millis(500)).unwrap().get_data() {
MessageData::ClientMsg(_) => {}
_ => unreachable!("should have been a request"),
}
}
}
/*
#[test]
fn get_root_doc() {
let (client_tx, client_rx) = channel();
let sess = None;
let req = Request {
tx: client_tx,
session: sess,
};
let tx = Queue::start();
tx.send(Message::new(req.into())).unwrap();
client_rx.recv().unwrap();
}
*/
}
/*
use crate::Request;
use std::{
sync::mpsc::{channel, Receiver, Sender},
thread::spawn,
};
pub trait QueueClient: Sized {
fn new(tx: Sender<Message>, rx: Receiver<Message>) -> Self;
fn start(queue_tx: Sender<Message>) {
spawn(move || {
let (tx, rx) = channel();
let service = Self::new(queue_tx.clone(), rx);
queue_tx.send(tx.into()).unwrap();
service.listen();
});
}
fn get_receiver(&self) -> &Receiver<Message>;
fn listen(&self) {
let rx = self.get_receiver();
loop {
rx.recv().unwrap();
}
}
}
#[derive(Clone)]
pub enum Message {
Register(Sender<Message>),
Req(Request),
SessMsg(SessionMessage),
}
impl From<Request> for Message {
fn from(value: Request) -> Self {
Message::Req(value)
}
}
impl From<Sender<Message>> for Message {
@ -23,22 +227,22 @@ impl From<Sender<Message>> for Message {
}
}
impl From<SessionMessage> for Message {
fn from(value: SessionMessage) -> Self {
Message::SessMsg(value)
impl From<Request> for Message {
fn from(value: Request) -> Self {
Message::Req(value)
}
}
#[cfg(test)]
mod messages {
use super::*;
use crate::{create_request::empty_request, Field};
use crate::request_test_data::request_root_document;
#[test]
fn from_request() {
let (req, _) = empty_request();
let (req, _) = request_root_document();
match req.into() {
Message::Req(result) => assert!(result.get_session().is_none()),
Message::Req(_) => {},
_ => unreachable!("should have been s request"),
}
}
@ -48,7 +252,7 @@ mod messages {
let (tx, rx) = channel();
match tx.into() {
Message::Register(result) => {
let (req, _) = empty_request();
let (req, _) = request_root_document();
result.send(req.into()).unwrap();
match rx.recv().unwrap() {
Message::Req(_) => {}
@ -58,22 +262,9 @@ mod messages {
_ => unreachable!("should have been a register"),
}
}
#[test]
fn from_sessionmessage() {
let id: Option<Field> = None;
let sess_msg: SessionMessage = id.into();
match sess_msg.into() {
Message::SessMsg(result) => match result {
SessionMessage::Validate(data) => assert!(data.is_none()),
_ => unreachable!("should have been a validate"),
},
_ => unreachable!("should have been a session message"),
}
}
}
struct Queue {
pub struct Queue {
channels: Vec<Sender<Message>>,
rx: Receiver<Message>,
}
@ -100,7 +291,7 @@ impl Queue {
}
}
fn start() -> Sender<Message> {
pub fn start() -> Sender<Message> {
let (tx, rx) = channel();
spawn(move || {
let mut queue = Queue::new(rx);
@ -113,7 +304,7 @@ impl Queue {
#[cfg(test)]
mod queues {
use super::*;
use crate::create_request::empty_request;
use crate::request_test_data::request_root_document;
#[test]
fn create_queue() {
@ -121,11 +312,11 @@ mod queues {
for _ in 0..5 {
channels.push(channel());
}
let mut queue_tx = Queue::start();
let queue_tx = Queue::start();
for (tx, _) in channels.iter() {
queue_tx.send(tx.clone().into()).unwrap();
}
let (req, _) = empty_request();
let (req, _) = request_root_document();
queue_tx.send(req.into()).unwrap();
for (_, rx) in channels.iter() {
match rx.recv().unwrap() {
@ -135,3 +326,4 @@ mod queues {
}
}
}
*/