Got the cache channels setup.
This commit is contained in:
parent
e2d32f4a8c
commit
572abbeda8
@ -1,9 +1,9 @@
|
|||||||
use async_std::{
|
use async_std::{
|
||||||
channel::{unbounded, Sender},
|
channel::{unbounded, Receiver, Sender},
|
||||||
path::PathBuf,
|
path::PathBuf,
|
||||||
task::spawn,
|
task::spawn,
|
||||||
};
|
};
|
||||||
use std::{error::Error, fmt};
|
use std::{collections::HashMap, error::Error, fmt};
|
||||||
|
|
||||||
const ENTRY: &str = "EntryPoint";
|
const ENTRY: &str = "EntryPoint";
|
||||||
|
|
||||||
@ -180,17 +180,33 @@ mod datatypes {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
enum FromCache {
|
||||||
|
Data(HashMap<String, DataType>),
|
||||||
|
Error(MTTError),
|
||||||
|
}
|
||||||
|
|
||||||
|
struct CacheQuery {
|
||||||
|
ids: Vec<String>,
|
||||||
|
reply: Sender<FromCache>,
|
||||||
|
}
|
||||||
|
|
||||||
|
enum ToCache {
|
||||||
|
Query(CacheQuery),
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct MoreThanText {
|
pub struct MoreThanText {
|
||||||
session: Vec<String>,
|
session: Vec<String>,
|
||||||
channel: Sender<String>,
|
cache: Sender<Vec<String>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MoreThanText {
|
impl MoreThanText {
|
||||||
async fn new() {}
|
async fn new(cache: Sender<Vec<String>>) -> Result<Self, MTTError> {
|
||||||
|
Ok(Self {
|
||||||
async fn get_entry(&self, id: String) {
|
session: [ENTRY.to_string()].to_vec(),
|
||||||
self.channel.send(id).await.unwrap();
|
cache: cache,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -200,25 +216,116 @@ mod mtt {
|
|||||||
|
|
||||||
#[async_std::test]
|
#[async_std::test]
|
||||||
async fn create() {
|
async fn create() {
|
||||||
MoreThanText::new().await;
|
let (s, _) = unbounded();
|
||||||
|
let mtt = MoreThanText::new(s).await.unwrap();
|
||||||
|
assert_eq!(mtt.session, [ENTRY]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn start_db<P>(dir: P) -> Result<MoreThanText, MTTError>
|
struct Cache {
|
||||||
|
channel: Receiver<ToCache>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Cache {
|
||||||
|
async fn new<P>(_dir: P, channel: Receiver<ToCache>) -> Result<Self, MTTError>
|
||||||
|
where
|
||||||
|
P: Into<PathBuf>,
|
||||||
|
{
|
||||||
|
Ok(Self { channel: channel })
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn start(&self) {
|
||||||
|
loop {
|
||||||
|
match self.channel.recv().await.unwrap() {
|
||||||
|
ToCache::Query(data) => {
|
||||||
|
for id in data.ids {
|
||||||
|
if id == ENTRY {
|
||||||
|
let mut holder = HashMap::new();
|
||||||
|
holder.insert(ENTRY.to_string(), DataType::new("store").unwrap());
|
||||||
|
data.reply.send(FromCache::Data(holder)).await.unwrap();
|
||||||
|
} else {
|
||||||
|
data.reply
|
||||||
|
.send(FromCache::Error(MTTError::new("fred")))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod caches {
|
||||||
|
use super::*;
|
||||||
|
use tempfile::tempdir;
|
||||||
|
|
||||||
|
async fn start_cache<P>(dir: P) -> Sender<ToCache>
|
||||||
|
where
|
||||||
|
P: Into<PathBuf>,
|
||||||
|
{
|
||||||
|
let (s, r) = unbounded();
|
||||||
|
let datadir = dir.into();
|
||||||
|
spawn(async move {
|
||||||
|
let cache = Cache::new(datadir, r).await.unwrap();
|
||||||
|
cache.start().await;
|
||||||
|
});
|
||||||
|
s
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_std::test]
|
||||||
|
async fn create() {
|
||||||
|
let dir = tempdir().unwrap();
|
||||||
|
let s_cache = start_cache(dir.path()).await;
|
||||||
|
let (s_rep, r_rep) = unbounded();
|
||||||
|
let request = ToCache::Query(CacheQuery {
|
||||||
|
ids: [ENTRY.to_string()].to_vec(),
|
||||||
|
reply: s_rep,
|
||||||
|
});
|
||||||
|
s_cache.send(request).await.unwrap();
|
||||||
|
let result = r_rep.recv().await.unwrap();
|
||||||
|
match result {
|
||||||
|
FromCache::Data(data) => match data.get(ENTRY) {
|
||||||
|
Some(output) => match output {
|
||||||
|
DataType::DBMap(_) => (),
|
||||||
|
_ => assert!(false, "{:?} is not a database store.", output),
|
||||||
|
},
|
||||||
|
None => assert!(false, "Should contain entry point."),
|
||||||
|
},
|
||||||
|
_ => assert!(false, "{:?} should have been a store.", result),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_std::test]
|
||||||
|
async fn bad_entry() {
|
||||||
|
let dir = tempdir().unwrap();
|
||||||
|
let s_cache = start_cache(dir.path()).await;
|
||||||
|
let (s_rep, r_rep) = unbounded();
|
||||||
|
let request = ToCache::Query(CacheQuery {
|
||||||
|
ids: ["bad_id".to_string()].to_vec(),
|
||||||
|
reply: s_rep,
|
||||||
|
});
|
||||||
|
s_cache.send(request).await.unwrap();
|
||||||
|
let result = r_rep.recv().await.unwrap();
|
||||||
|
match result {
|
||||||
|
FromCache::Error(_) => (),
|
||||||
|
_ => assert!(false, "{:?} should have been an error.", result),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn start_db<P>(_dir: P) -> Result<MoreThanText, MTTError>
|
||||||
where
|
where
|
||||||
P: Into<PathBuf>,
|
P: Into<PathBuf>,
|
||||||
{
|
{
|
||||||
let data_dir = dir.into();
|
|
||||||
let (s, r) = unbounded();
|
let (s, r) = unbounded();
|
||||||
spawn(async move {
|
spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
r.recv().await.unwrap();
|
r.recv().await.unwrap();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
Ok(MoreThanText {
|
Ok(MoreThanText::new(s).await.unwrap())
|
||||||
session: [ENTRY.to_string()].to_vec(),
|
|
||||||
channel: s,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
Loading…
Reference in New Issue
Block a user