rebuilt to use message router.
This commit is contained in:
parent
d4ce2ab03b
commit
52b6506088
124
src/cache.rs
124
src/cache.rs
@ -1,124 +0,0 @@
|
|||||||
use super::messages::{ReceiveMsg, SendMsg};
|
|
||||||
use std::sync::mpsc::Receiver;
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
/// MoreThanText database Cache
|
|
||||||
pub struct Cache {
|
|
||||||
data: Vec<Uuid>,
|
|
||||||
rx: Receiver<SendMsg>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Cache {
|
|
||||||
/// Create database cache
|
|
||||||
///
|
|
||||||
/// This should not be called directly.
|
|
||||||
/// It is part of the MoreThanText::new function.
|
|
||||||
pub fn new(rx: Receiver<SendMsg>) -> Self {
|
|
||||||
Self {
|
|
||||||
data: Vec::new(),
|
|
||||||
rx: rx,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Starts listening for database requests.
|
|
||||||
///
|
|
||||||
/// Should not be directly called.
|
|
||||||
/// Part of the MoreThanText::new function.
|
|
||||||
pub fn listen(&mut self) {
|
|
||||||
loop {
|
|
||||||
match self.rx.recv().unwrap() {
|
|
||||||
SendMsg::OpenSession(msg) => msg.tx.send(self.get_session(msg.id)).unwrap(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_session(&mut self, id: Option<String>) -> ReceiveMsg {
|
|
||||||
let sid: ReceiveMsg;
|
|
||||||
match id {
|
|
||||||
Some(input) => match Uuid::parse_str(&input) {
|
|
||||||
Ok(vid) => {
|
|
||||||
if self.data.contains(&vid) {
|
|
||||||
sid = ReceiveMsg::Session(vid);
|
|
||||||
} else {
|
|
||||||
sid = self.new_session();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(_) => sid = self.new_session(),
|
|
||||||
},
|
|
||||||
None => sid = self.new_session(),
|
|
||||||
}
|
|
||||||
sid
|
|
||||||
}
|
|
||||||
|
|
||||||
fn new_session(&mut self) -> ReceiveMsg {
|
|
||||||
let mut id = Uuid::new_v4();
|
|
||||||
while self.data.contains(&id) {
|
|
||||||
id = Uuid::new_v4();
|
|
||||||
}
|
|
||||||
self.data.push(id.clone());
|
|
||||||
ReceiveMsg::Session(id)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod session {
|
|
||||||
use super::*;
|
|
||||||
use std::sync::mpsc::channel;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn unique_ids() {
|
|
||||||
let (_, rx) = channel();
|
|
||||||
let mut cache = Cache::new(rx);
|
|
||||||
let mut ids: Vec<Uuid> = Vec::new();
|
|
||||||
for _ in 1..10 {
|
|
||||||
let id = cache.get_session(None);
|
|
||||||
match id {
|
|
||||||
ReceiveMsg::Session(sid) => {
|
|
||||||
if ids.contains(&sid) {
|
|
||||||
assert!(false, "{} was a duplicate id.", sid)
|
|
||||||
}
|
|
||||||
ids.push(sid);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn existing_ids_are_reused() {
|
|
||||||
let (_, rx) = channel();
|
|
||||||
let mut cache = Cache::new(rx);
|
|
||||||
let id1: Uuid;
|
|
||||||
let id2: Uuid;
|
|
||||||
match cache.get_session(None) {
|
|
||||||
ReceiveMsg::Session(sid) => id1 = sid,
|
|
||||||
}
|
|
||||||
match cache.get_session(Some(id1.to_string())) {
|
|
||||||
ReceiveMsg::Session(sid) => id2 = sid,
|
|
||||||
}
|
|
||||||
assert_eq!(id2, id1);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn bad_ids_generate_new_ones() {
|
|
||||||
let (_, rx) = channel();
|
|
||||||
let mut cache = Cache::new(rx);
|
|
||||||
let id: Uuid;
|
|
||||||
let bad_id = "A very bad id";
|
|
||||||
match cache.get_session(Some(bad_id.to_string())) {
|
|
||||||
ReceiveMsg::Session(sid) => id = sid,
|
|
||||||
}
|
|
||||||
assert_ne!(id.to_string(), bad_id);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn expired_ids_generate_new_ids() {
|
|
||||||
let (_, rx) = channel();
|
|
||||||
let mut cache = Cache::new(rx);
|
|
||||||
let old_id = Uuid::new_v4();
|
|
||||||
let id: Uuid;
|
|
||||||
match cache.get_session(Some(old_id.to_string())) {
|
|
||||||
ReceiveMsg::Session(sid) => id = sid,
|
|
||||||
}
|
|
||||||
assert_ne!(id, old_id);
|
|
||||||
}
|
|
||||||
}
|
|
157
src/client.rs
Normal file
157
src/client.rs
Normal file
@ -0,0 +1,157 @@
|
|||||||
|
use super::{Message, Msg, MsgData, SessionMsg};
|
||||||
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
|
sync::mpsc::{channel, Receiver, Sender},
|
||||||
|
thread::spawn,
|
||||||
|
};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub enum ClientMsg {
|
||||||
|
OpenSession(Option<String>, Sender<Message>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Msg for ClientMsg {
|
||||||
|
fn to_msgdata(&self) -> MsgData {
|
||||||
|
MsgData::Client(self.clone())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Client {
|
||||||
|
router_tx: Sender<Message>,
|
||||||
|
client_rx: Receiver<Message>,
|
||||||
|
requests: HashMap<Uuid, Sender<Message>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Client {
|
||||||
|
fn new(router_tx: Sender<Message>, client_rx: Receiver<Message>) -> Self {
|
||||||
|
Self {
|
||||||
|
router_tx: router_tx,
|
||||||
|
client_rx: client_rx,
|
||||||
|
requests: HashMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn start(router_tx: Sender<Message>) -> Sender<Message> {
|
||||||
|
let (client_tx, client_rx) = channel();
|
||||||
|
spawn(move || {
|
||||||
|
let mut client = Self::new(router_tx, client_rx);
|
||||||
|
client.listen();
|
||||||
|
});
|
||||||
|
client_tx
|
||||||
|
}
|
||||||
|
|
||||||
|
fn listen(&mut self) {
|
||||||
|
loop {
|
||||||
|
let msg = self.client_rx.recv().unwrap();
|
||||||
|
match self.requests.get(&msg.get_id()) {
|
||||||
|
Some(tx) => match msg.get_message() {
|
||||||
|
MsgData::Session(sess) => match sess {
|
||||||
|
SessionMsg::Opened(_) => tx.send(msg).unwrap(),
|
||||||
|
_ => {}
|
||||||
|
},
|
||||||
|
_ => {}
|
||||||
|
},
|
||||||
|
None => match msg.get_message() {
|
||||||
|
MsgData::Client(req) => match req {
|
||||||
|
ClientMsg::OpenSession(id, tx) => {
|
||||||
|
let sess = SessionMsg::Get(id.clone());
|
||||||
|
self.requests.insert(msg.get_id(), tx.clone());
|
||||||
|
self.router_tx.send(msg.reply(&sess)).unwrap();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => {}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod messages {
|
||||||
|
use super::{super::SessionData, *};
|
||||||
|
use std::time::Duration;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
fn setup_client() -> (Sender<Message>, Receiver<Message>) {
|
||||||
|
let (tx, rx) = channel();
|
||||||
|
let client_tx = Client::start(tx);
|
||||||
|
(client_tx, rx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn opensession(id: &str) -> (Message, Receiver<Message>) {
|
||||||
|
let (tx, rx) = channel();
|
||||||
|
let sess = ClientMsg::OpenSession(Some(id.to_string()), tx);
|
||||||
|
let msg = Message::new(&sess);
|
||||||
|
(msg, rx)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn open_session() {
|
||||||
|
let (tx, rx) = setup_client();
|
||||||
|
let sess_id = Uuid::new_v4().to_string();
|
||||||
|
let (sess_msg, _) = opensession(&sess_id);
|
||||||
|
tx.send(sess_msg.clone()).unwrap();
|
||||||
|
let result1 = rx.recv().unwrap();
|
||||||
|
assert_eq!(result1.get_id(), sess_msg.get_id());
|
||||||
|
match result1.get_message() {
|
||||||
|
MsgData::Session(req) => match req {
|
||||||
|
SessionMsg::Get(sess_data) => match sess_data {
|
||||||
|
Some(id) => assert_eq!(id.to_string(), sess_id),
|
||||||
|
_ => unreachable!("Should have returned some data."),
|
||||||
|
},
|
||||||
|
_ => unreachable!("Should have been a get session message."),
|
||||||
|
},
|
||||||
|
_ => unreachable!("Should have been a Session message."),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn respond_session() {
|
||||||
|
let (tx, _rx) = setup_client();
|
||||||
|
let (sess_msg, client_rx) = opensession(&Uuid::new_v4().to_string());
|
||||||
|
tx.send(sess_msg.clone()).unwrap();
|
||||||
|
let expected = Uuid::new_v4();
|
||||||
|
let right = SessionData::new(expected.clone());
|
||||||
|
let exp_msg = SessionMsg::Opened(right);
|
||||||
|
tx.send(sess_msg.reply(&exp_msg)).unwrap();
|
||||||
|
let result = client_rx.recv().unwrap();
|
||||||
|
assert_eq!(sess_msg.get_id(), result.get_id(), "Different message ids.");
|
||||||
|
match result.get_message() {
|
||||||
|
MsgData::Session(req) => match req {
|
||||||
|
SessionMsg::Opened(sess) => assert_eq!(
|
||||||
|
sess.to_string(),
|
||||||
|
expected.to_string(),
|
||||||
|
"Different sesssion ids."
|
||||||
|
),
|
||||||
|
_ => unreachable!("Should have been an opened session."),
|
||||||
|
},
|
||||||
|
_ => unreachable!("Should have been a session message."),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn does_not_react_if_not_requested() {
|
||||||
|
let (tx, rx) = setup_client();
|
||||||
|
let sess = SessionData::new(Uuid::new_v4());
|
||||||
|
let exp_msg = SessionMsg::Opened(sess);
|
||||||
|
tx.send(Message::new(&exp_msg)).unwrap();
|
||||||
|
match rx.recv_timeout(Duration::from_millis(500)) {
|
||||||
|
Err(_) => {}
|
||||||
|
_ => unreachable!("Should not receive anything."),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn ignores_other_session_messages() {
|
||||||
|
let (tx, _rx) = setup_client();
|
||||||
|
let (sess_msg, client_rx) = opensession(&Uuid::new_v4().to_string());
|
||||||
|
tx.send(sess_msg.clone()).unwrap();
|
||||||
|
let req = SessionMsg::Get(None);
|
||||||
|
tx.send(sess_msg.reply(&req)).unwrap();
|
||||||
|
match client_rx.recv_timeout(Duration::from_millis(500)) {
|
||||||
|
Err(_) => {}
|
||||||
|
_ => unreachable!("Should not return anything."),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
136
src/lib.rs
136
src/lib.rs
@ -1,20 +1,48 @@
|
|||||||
mod cache;
|
mod client;
|
||||||
mod messages;
|
mod message;
|
||||||
|
mod router;
|
||||||
|
mod session;
|
||||||
|
|
||||||
use cache::Cache;
|
//use client::ClientMsg;
|
||||||
use messages::{ReceiveMsg, SendMsg, SessionRequest};
|
//use router::Router;
|
||||||
|
//use session::{Session, SessionFilter, SessionMsg};
|
||||||
|
use client::{Client, ClientMsg};
|
||||||
|
use message::{Message, MsgData};
|
||||||
|
use router::Router;
|
||||||
|
use session::{Session, SessionData, SessionMsg};
|
||||||
use std::{
|
use std::{
|
||||||
sync::mpsc::{channel, Sender},
|
sync::mpsc::{channel, Sender},
|
||||||
thread::spawn,
|
|
||||||
};
|
};
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
/// Application connection to the database
|
/// Support functions for Messages.
|
||||||
|
pub trait Msg {
|
||||||
|
fn to_msgdata(&self) -> MsgData;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test_message {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
pub enum Tester {
|
||||||
|
Test1,
|
||||||
|
Test2,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Msg for Tester {
|
||||||
|
fn to_msgdata(&self) -> MsgData {
|
||||||
|
match self {
|
||||||
|
Tester::Test1 => MsgData::Test1,
|
||||||
|
Tester::Test2 => MsgData::Test2,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Application client to MoreThanText
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct MoreThanText {
|
pub struct MoreThanText {
|
||||||
id: Option<Uuid>,
|
session: Option<SessionData>,
|
||||||
tx: Sender<SendMsg>,
|
tx: Sender<Message>,
|
||||||
nodes: Vec<String>
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MoreThanText {
|
impl MoreThanText {
|
||||||
@ -25,15 +53,18 @@ impl MoreThanText {
|
|||||||
/// ```
|
/// ```
|
||||||
/// use morethantext::MoreThanText;
|
/// use morethantext::MoreThanText;
|
||||||
///
|
///
|
||||||
/// MoreThanText::new(Vec::new());
|
/// MoreThanText::new();
|
||||||
/// ```
|
/// ```
|
||||||
pub fn new(_nodes: Vec<String>) -> Self {
|
pub fn new() -> Self {
|
||||||
let (tx, rx) = channel();
|
let (tx, rx) = channel();
|
||||||
spawn(move || {
|
let mut senders = Vec::new();
|
||||||
let mut cache = Cache::new(rx);
|
senders.push(Client::start(tx.clone()));
|
||||||
cache.listen();
|
senders.push(Session::start(tx.clone()));
|
||||||
});
|
Router::start(senders, rx);
|
||||||
Self { id: None, tx: tx, nodes: Vec::new() }
|
Self {
|
||||||
|
session: None,
|
||||||
|
tx: tx,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Opens an existing or new session with the database.
|
/// Opens an existing or new session with the database.
|
||||||
@ -46,17 +77,24 @@ impl MoreThanText {
|
|||||||
/// ```
|
/// ```
|
||||||
/// use morethantext::MoreThanText;
|
/// use morethantext::MoreThanText;
|
||||||
///
|
///
|
||||||
/// let mut mtt = MoreThanText::new(Vec::new());
|
/// let mut mtt = MoreThanText::new();
|
||||||
/// mtt.open_session(None);
|
/// mtt.open_session(None);
|
||||||
/// mtt.open_session(Some("7b1ff340-7dfa-4f29-b144-601384e54423".to_string()));
|
/// mtt.open_session(Some("7b1ff340-7dfa-4f29-b144-601384e54423".to_string()));
|
||||||
/// ```
|
/// ```
|
||||||
pub fn open_session(&mut self, id: Option<String>) {
|
pub fn open_session(&mut self, id: Option<String>) {
|
||||||
let (tx, rx) = channel();
|
let (tx, rx) = channel();
|
||||||
let request = SessionRequest { id: id, tx: tx };
|
let req = ClientMsg::OpenSession(id, tx);
|
||||||
self.tx.send(SendMsg::OpenSession(request)).unwrap();
|
let msg = Message::new(&req);
|
||||||
match rx.recv().unwrap() {
|
self.tx.send(msg).unwrap();
|
||||||
ReceiveMsg::Session(sid) => self.id = Some(sid),
|
match rx.recv().unwrap().get_message() {
|
||||||
|
MsgData::Session(data) => {
|
||||||
|
match data {
|
||||||
|
SessionMsg::Opened(sess) => self.session = Some(sess.clone()),
|
||||||
|
_ => {},
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
_ => {},
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the session id
|
/// Get the session id
|
||||||
@ -66,13 +104,13 @@ impl MoreThanText {
|
|||||||
/// ```
|
/// ```
|
||||||
/// use morethantext::MoreThanText;
|
/// use morethantext::MoreThanText;
|
||||||
///
|
///
|
||||||
/// let mut mtt = MoreThanText::new(Vec::new());
|
/// let mut mtt = MoreThanText::new();
|
||||||
/// mtt.get_id();
|
/// let id = mtt.get_id();
|
||||||
/// ```
|
/// ```
|
||||||
pub fn get_id(&self) -> String {
|
pub fn get_id(&self) -> String {
|
||||||
match self.id {
|
match &self.session {
|
||||||
Some(sid) => sid.to_string(),
|
Some(id) => id.to_string(),
|
||||||
None => "none".to_string(),
|
None => "".to_string(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -82,40 +120,24 @@ mod mtt_client {
|
|||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn uniques_ids() {
|
fn default_values() {
|
||||||
let mut mtt = MoreThanText::new(Vec::new());
|
let mtt = MoreThanText::new();
|
||||||
let mut ids: Vec<Uuid> = Vec::new();
|
assert!(mtt.session.is_none());
|
||||||
for _ in 1..10 {
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn new_session() {
|
||||||
|
let mut mtt = MoreThanText::new();
|
||||||
mtt.open_session(None);
|
mtt.open_session(None);
|
||||||
let id = mtt.id.clone().unwrap();
|
assert!(mtt.session.is_some());
|
||||||
if ids.contains(&id) {
|
|
||||||
assert!(false, "{} is a duplicate id", id);
|
|
||||||
}
|
|
||||||
ids.push(id);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn existing_ids_are_reused() {
|
fn session_ids_are_unique() {
|
||||||
let mut mtt = MoreThanText::new(Vec::new());
|
let mut mtt = MoreThanText::new();
|
||||||
mtt.open_session(None);
|
mtt.open_session(None);
|
||||||
let holder = mtt.id.clone().unwrap().to_string();
|
let id1 = mtt.get_id();
|
||||||
mtt.open_session(Some(holder.clone()));
|
mtt.open_session(None);
|
||||||
assert_eq!(mtt.id.clone().unwrap().to_string(), holder);
|
assert_ne!(mtt.get_id(), id1);
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn bad_ids_generate_new_ones() {
|
|
||||||
let mut mtt = MoreThanText::new(Vec::new());
|
|
||||||
mtt.open_session(Some("bad test string".to_string()));
|
|
||||||
assert!(mtt.id.is_some());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn incorrect_ids_generate_new_ones() {
|
|
||||||
let mut mtt = MoreThanText::new(Vec::new());
|
|
||||||
let holder = Uuid::new_v4();
|
|
||||||
mtt.open_session(Some(holder.clone().to_string()));
|
|
||||||
assert_ne!(mtt.id, Some(holder));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
12
src/main.rs
12
src/main.rs
@ -33,8 +33,7 @@ mod http_session {
|
|||||||
async fn main() {
|
async fn main() {
|
||||||
let args = Args::parse();
|
let args = Args::parse();
|
||||||
let addr = format!("{}:{}", args.address, args.port);
|
let addr = format!("{}:{}", args.address, args.port);
|
||||||
let nodes = args.node;
|
let state = MoreThanText::new();
|
||||||
let state = MoreThanText::new(nodes);
|
|
||||||
let app = Router::new().route("/", get(handler)).with_state(state);
|
let app = Router::new().route("/", get(handler)).with_state(state);
|
||||||
let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
|
let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
|
||||||
axum::serve(listener, app.into_make_service())
|
axum::serve(listener, app.into_make_service())
|
||||||
@ -43,12 +42,11 @@ async fn main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn handler(jar: CookieJar, mut state: State<MoreThanText>) -> impl IntoResponse {
|
async fn handler(jar: CookieJar, mut state: State<MoreThanText>) -> impl IntoResponse {
|
||||||
let sid: Option<String>;
|
|
||||||
let mut cookies = jar.clone();
|
let mut cookies = jar.clone();
|
||||||
match jar.get(SESSION_KEY) {
|
let sid = match jar.get(SESSION_KEY) {
|
||||||
Some(cookie) => sid = Some(cookie.value().to_string()),
|
Some(cookie) => Some(cookie.value().to_string()),
|
||||||
None => sid = None,
|
None => None,
|
||||||
}
|
};
|
||||||
state.open_session(sid.clone());
|
state.open_session(sid.clone());
|
||||||
if !sid.is_some_and(|x| x == state.get_id()) {
|
if !sid.is_some_and(|x| x == state.get_id()) {
|
||||||
let cookie = Cookie::build((SESSION_KEY, state.get_id()));
|
let cookie = Cookie::build((SESSION_KEY, state.get_id()));
|
||||||
|
93
src/message.rs
Normal file
93
src/message.rs
Normal file
@ -0,0 +1,93 @@
|
|||||||
|
use super::{ClientMsg, Msg, SessionMsg};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
/// Message Types
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub enum MsgData {
|
||||||
|
Client(ClientMsg),
|
||||||
|
Session(SessionMsg),
|
||||||
|
Test1,
|
||||||
|
Test2,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// MoreThanText Message Structure
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Message {
|
||||||
|
id: Uuid,
|
||||||
|
msg: MsgData,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Message {
|
||||||
|
pub fn new<D>(data: &D) -> Self
|
||||||
|
where
|
||||||
|
D: Msg,
|
||||||
|
{
|
||||||
|
Self {
|
||||||
|
id: Uuid::new_v4(),
|
||||||
|
msg: data.to_msgdata(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_message(&self) -> &MsgData {
|
||||||
|
&self.msg
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_id(&self) -> Uuid {
|
||||||
|
self.id.clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn reply<D>(&self, data: &D) -> Self
|
||||||
|
where
|
||||||
|
D: Msg,
|
||||||
|
{
|
||||||
|
Self {
|
||||||
|
id: self.id.clone(),
|
||||||
|
msg: data.to_msgdata(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod messages {
|
||||||
|
use super::{super::test_message::Tester, *};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn new_messagees() {
|
||||||
|
let data = Tester::Test1;
|
||||||
|
let msg = Message::new(&data);
|
||||||
|
match msg.get_message() {
|
||||||
|
MsgData::Test1 => {}
|
||||||
|
_ => unreachable!("Should have received Test1"),
|
||||||
|
}
|
||||||
|
let data = Tester::Test2;
|
||||||
|
let msg = Message::new(&data);
|
||||||
|
match msg.get_message() {
|
||||||
|
MsgData::Test2 => {}
|
||||||
|
_ => unreachable!("Should have received Test1"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn message_ids_are_unique() {
|
||||||
|
let mut ids: Vec<Uuid> = Vec::new();
|
||||||
|
let data = Tester::Test1;
|
||||||
|
for _ in 1..10 {
|
||||||
|
let msg = Message::new(&data);
|
||||||
|
assert!(!ids.contains(&msg.get_id()), "Had a duplicate id");
|
||||||
|
ids.push(msg.get_id());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn create_replies() {
|
||||||
|
let data1 = Tester::Test1;
|
||||||
|
let data2 = Tester::Test2;
|
||||||
|
let msg = Message::new(&data1);
|
||||||
|
let reply = msg.reply(&data2);
|
||||||
|
assert_eq!(reply.get_id(), msg.get_id());
|
||||||
|
match reply.get_message() {
|
||||||
|
MsgData::Test2 => {}
|
||||||
|
_ => unreachable!("Should have been a Test1"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,21 +0,0 @@
|
|||||||
/// These are the nessages that are used by the database.
|
|
||||||
use std::sync::mpsc::Sender;
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
/// Requests of the database.
|
|
||||||
pub enum SendMsg {
|
|
||||||
OpenSession(SessionRequest),
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Responses to database requests
|
|
||||||
pub enum ReceiveMsg {
|
|
||||||
Session(Uuid),
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Items needed for a session request.
|
|
||||||
pub struct SessionRequest {
|
|
||||||
/// Optional string reprosentation of the session id.
|
|
||||||
pub id: Option<String>,
|
|
||||||
/// Channel Sender for the reply.
|
|
||||||
pub tx: Sender<ReceiveMsg>,
|
|
||||||
}
|
|
66
src/router.rs
Normal file
66
src/router.rs
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
use super::Message;
|
||||||
|
use std::{
|
||||||
|
sync::mpsc::{Receiver, Sender},
|
||||||
|
thread::spawn,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub struct Router {
|
||||||
|
txs: Vec<Sender<Message>>,
|
||||||
|
rx: Receiver<Message>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Router {
|
||||||
|
fn new(senders: Vec<Sender<Message>>, rx: Receiver<Message>) -> Self {
|
||||||
|
Self {
|
||||||
|
txs: senders,
|
||||||
|
rx: rx,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn start(senders: Vec<Sender<Message>>, rx: Receiver<Message>) {
|
||||||
|
spawn(move || {
|
||||||
|
let router = Router::new(senders, rx);
|
||||||
|
router.listen();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
fn listen(&self) {
|
||||||
|
loop {
|
||||||
|
let msg = self.rx.recv().unwrap();
|
||||||
|
for tx in self.txs.iter() {
|
||||||
|
tx.send(msg.clone()).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod messages {
|
||||||
|
use std::sync::mpsc::channel;
|
||||||
|
use super::super::MsgData;
|
||||||
|
use super::{super::test_message::Tester, *};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn forward_messages() {
|
||||||
|
let (tx, rx) = channel();
|
||||||
|
let (tx1, rx1) = channel();
|
||||||
|
let (tx2, rx2) = channel();
|
||||||
|
let senders = vec![tx1, tx2];
|
||||||
|
Router::start(senders, rx);
|
||||||
|
let data = Tester::Test1;
|
||||||
|
let msg = Message::new(&data);
|
||||||
|
tx.send(msg.clone()).unwrap();
|
||||||
|
let result1 = rx1.recv().unwrap();
|
||||||
|
assert_eq!(result1.get_id(), msg.get_id());
|
||||||
|
match result1.get_message() {
|
||||||
|
MsgData::Test1 => {}
|
||||||
|
_ => unreachable!("Should have been test1."),
|
||||||
|
}
|
||||||
|
let result2 = rx2.recv().unwrap();
|
||||||
|
assert_eq!(result2.get_id(), msg.get_id());
|
||||||
|
match result2.get_message() {
|
||||||
|
MsgData::Test1 => {}
|
||||||
|
_ => unreachable!("Should have been test1."),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
238
src/session.rs
Normal file
238
src/session.rs
Normal file
@ -0,0 +1,238 @@
|
|||||||
|
use super::{Message, Msg, MsgData};
|
||||||
|
use std::{
|
||||||
|
fmt,
|
||||||
|
sync::mpsc::{channel, Receiver, Sender},
|
||||||
|
thread::spawn,
|
||||||
|
};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub enum SessionMsg {
|
||||||
|
Get(Option<String>),
|
||||||
|
Opened(SessionData),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Msg for SessionMsg {
|
||||||
|
fn to_msgdata(&self) -> MsgData {
|
||||||
|
MsgData::Session(self.clone())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct SessionData {
|
||||||
|
id: Uuid,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SessionData {
|
||||||
|
pub fn new(id: Uuid) -> Self {
|
||||||
|
Self { id: id }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for SessionData {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
write!(f, "{}", self.id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Session {
|
||||||
|
router_tx: Sender<Message>,
|
||||||
|
session_rx: Receiver<Message>,
|
||||||
|
ids: Vec<Uuid>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Session {
|
||||||
|
fn new(router_tx: Sender<Message>, session_rx: Receiver<Message>) -> Self {
|
||||||
|
Self {
|
||||||
|
router_tx: router_tx,
|
||||||
|
session_rx: session_rx,
|
||||||
|
ids: Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn start(router_tx: Sender<Message>) -> Sender<Message> {
|
||||||
|
let (session_tx, session_rx) = channel();
|
||||||
|
spawn(move || {
|
||||||
|
let mut session = Session::new(router_tx, session_rx);
|
||||||
|
session.listen();
|
||||||
|
});
|
||||||
|
session_tx
|
||||||
|
}
|
||||||
|
|
||||||
|
fn listen(&mut self) {
|
||||||
|
loop {
|
||||||
|
let msg = self.session_rx.recv().unwrap();
|
||||||
|
match msg.get_message() {
|
||||||
|
MsgData::Session(data) => match data {
|
||||||
|
SessionMsg::Get(req_id) => {
|
||||||
|
let id: Uuid;
|
||||||
|
match req_id {
|
||||||
|
Some(req) => {
|
||||||
|
match Uuid::try_parse(req) {
|
||||||
|
Ok(data) => {
|
||||||
|
if self.ids.contains(&data) {
|
||||||
|
id = data;
|
||||||
|
} else {
|
||||||
|
id = self.create_session();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(_) => id = self.create_session(),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
None => id = self.create_session(),
|
||||||
|
}
|
||||||
|
let data = SessionMsg::Opened(SessionData::new(id));
|
||||||
|
self.router_tx.send(msg.reply(&data)).unwrap()
|
||||||
|
},
|
||||||
|
_ => {}
|
||||||
|
},
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_session(&mut self) -> Uuid {
|
||||||
|
let id = Uuid::new_v4();
|
||||||
|
self.ids.push(id.clone());
|
||||||
|
id
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod messages {
|
||||||
|
use super::{super::test_message::Tester, *};
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
fn setup_session() -> (Sender<Message>, Receiver<Message>) {
|
||||||
|
let (tx, rx) = channel();
|
||||||
|
let session_tx = Session::start(tx);
|
||||||
|
(session_tx, rx)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn ignore_unwanted_messages() {
|
||||||
|
let (tx, rx) = setup_session();
|
||||||
|
let data = Tester::Test1;
|
||||||
|
let msg = Message::new(&data);
|
||||||
|
tx.send(msg).unwrap();
|
||||||
|
match rx.recv_timeout(Duration::from_millis(500)) {
|
||||||
|
Err(_) => {}
|
||||||
|
_ => unreachable!("Should not receive anything."),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn create_new_session() {
|
||||||
|
let (tx, rx) = setup_session();
|
||||||
|
let msgdata = SessionMsg::Get(None);
|
||||||
|
let msg = Message::new(&msgdata);
|
||||||
|
tx.send(msg.clone()).unwrap();
|
||||||
|
let result = rx.recv().unwrap();
|
||||||
|
assert_eq!(result.get_id(), msg.get_id());
|
||||||
|
match result.get_message() {
|
||||||
|
MsgData::Session(data) => match data {
|
||||||
|
SessionMsg::Opened(_) => {}
|
||||||
|
_ => unreachable!("Should have been an opened response."),
|
||||||
|
},
|
||||||
|
_ => unreachable!("Should be a session responsee."),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn ignore_session_replies() {
|
||||||
|
let (tx, rx) = setup_session();
|
||||||
|
let msgdata = SessionMsg::Opened(SessionData::new(Uuid::new_v4()));
|
||||||
|
let msg = Message::new(&msgdata);
|
||||||
|
tx.send(msg).unwrap();
|
||||||
|
match rx.recv_timeout(Duration::from_millis(500)) {
|
||||||
|
Err(_) => {}
|
||||||
|
_ => unreachable!("Should not receive anything."),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn ids_must_be_unique() {
|
||||||
|
let (tx, rx) = setup_session();
|
||||||
|
let msgdata = SessionMsg::Get(None);
|
||||||
|
let mut ids: Vec<String> = Vec::new();
|
||||||
|
for _ in 0..10 {
|
||||||
|
let msg = Message::new(&msgdata);
|
||||||
|
tx.send(msg).unwrap();
|
||||||
|
match rx.recv().unwrap().get_message() {
|
||||||
|
MsgData::Session(msg) => match msg {
|
||||||
|
SessionMsg::Opened(sess) => {
|
||||||
|
let id = sess.to_string();
|
||||||
|
assert!(!ids.contains(&id), "duplicated id found.");
|
||||||
|
ids.push(id);
|
||||||
|
}
|
||||||
|
_ => unreachable!("Shouuld have opened a session."),
|
||||||
|
},
|
||||||
|
_ => unreachable!("Should be a session message"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn expired_ids_get_new() {
|
||||||
|
let (tx, rx) = setup_session();
|
||||||
|
let old_id = Uuid::new_v4();
|
||||||
|
let msgdata = SessionMsg::Get(Some(old_id.to_string()));
|
||||||
|
let msg = Message::new(&msgdata);
|
||||||
|
tx.send(msg.clone()).unwrap();
|
||||||
|
let result = rx.recv().unwrap();
|
||||||
|
assert_eq!(result.get_id(), msg.get_id());
|
||||||
|
match result.get_message() {
|
||||||
|
MsgData::Session(msg) => match msg {
|
||||||
|
SessionMsg::Opened(sess) => assert_ne!(sess.to_string(), old_id.to_string()),
|
||||||
|
_ => unreachable!("Should habe been an Opened message."),
|
||||||
|
},
|
||||||
|
_ => unreachable!("Should have been a session message."),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn bad_session_ids_get_new() {
|
||||||
|
let (tx, rx) = setup_session();
|
||||||
|
let id = "something badA";
|
||||||
|
let msgdata = SessionMsg::Get(Some(id.to_string()));
|
||||||
|
let msg = Message::new(&msgdata);
|
||||||
|
tx.send(msg.clone()).unwrap();
|
||||||
|
let result = rx.recv().unwrap();
|
||||||
|
assert_eq!(result.get_id(), msg.get_id());
|
||||||
|
match result.get_message() {
|
||||||
|
MsgData::Session(data) => match data {
|
||||||
|
SessionMsg::Opened(sess) => assert_ne!(sess.to_string(), id),
|
||||||
|
_ => unreachable!("Should habe been an Opened message."),
|
||||||
|
},
|
||||||
|
_ => unreachable!("Should have been a session message."),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn uses_existing_session() {
|
||||||
|
let (tx, rx) = setup_session();
|
||||||
|
let msgdata = SessionMsg::Get(None);
|
||||||
|
let msg = Message::new(&msgdata);
|
||||||
|
tx.send(msg).unwrap();
|
||||||
|
let result = rx.recv().unwrap();
|
||||||
|
let thesess = match result.get_message() {
|
||||||
|
MsgData::Session(data) => match data {
|
||||||
|
SessionMsg::Opened(sess) => sess,
|
||||||
|
_ => unreachable!("Should habe been an Opened message."),
|
||||||
|
},
|
||||||
|
_ => unreachable!("Should have been a session message."),
|
||||||
|
};
|
||||||
|
let msgdata = SessionMsg::Get(Some(thesess.to_string()));
|
||||||
|
let msg = Message::new(&msgdata);
|
||||||
|
tx.send(msg.clone()).unwrap();
|
||||||
|
let result = rx.recv().unwrap();
|
||||||
|
assert_eq!(result.get_id(), msg.get_id());
|
||||||
|
match result.get_message() {
|
||||||
|
MsgData::Session(data) => match data {
|
||||||
|
SessionMsg::Opened(sess) => assert_eq!(sess.to_string(), thesess.to_string(), "Should use existing sesssion."),
|
||||||
|
_ => unreachable!("Should habe been an Opened message."),
|
||||||
|
},
|
||||||
|
_ => unreachable!("Should have been a session message."),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,6 +1,7 @@
|
|||||||
"""Tests for single server boot ups."""
|
"""Tests for single server boot ups."""
|
||||||
|
|
||||||
from socket import gethostbyname, gethostname
|
from socket import gethostbyname, gethostname
|
||||||
|
from unittest import skip
|
||||||
from aiohttp import ClientSession
|
from aiohttp import ClientSession
|
||||||
from .mtt_tc import MTTClusterTC, SESSION_KEY
|
from .mtt_tc import MTTClusterTC, SESSION_KEY
|
||||||
|
|
||||||
@ -86,6 +87,7 @@ class BootUpTC(MTTClusterTC):
|
|||||||
self.assertIn(SESSION_KEY, response.cookies)
|
self.assertIn(SESSION_KEY, response.cookies)
|
||||||
self.assertNotEqual(response.cookies[SESSION_KEY].value, value)
|
self.assertNotEqual(response.cookies[SESSION_KEY].value, value)
|
||||||
|
|
||||||
|
@skip("Code not availaable yet.")
|
||||||
async def test_sessions_are_shared_between_servers(self):
|
async def test_sessions_are_shared_between_servers(self):
|
||||||
"""Does the session apply to the cluster."""
|
"""Does the session apply to the cluster."""
|
||||||
await self.create_cluster()
|
await self.create_cluster()
|
||||||
|
Loading…
Reference in New Issue
Block a user