"""Fixture for setting up a single server.""" from asyncio import create_subprocess_exec, get_event_loop, sleep from pathlib import Path class Server: """Setup a single server.""" def __init__(self): """Initialization of a server.""" self.env = {} self.process = None self.loop = get_event_loop() @property def address(self): """Get the server address.""" address = "127.0.0.1" if "MMT_ADDRESS" in self.env: address = self.env["MMT_ADDRESS"] return address @address.setter def address(self, addr): """Sets the server address.""" self.env["MTT_ADDRESS"] = addr @property def port(self): """Get the port of the server.""" port = 9090 if "MTT_PORT" in self.env: port = self.env["MTT_PORT"] return port @port.setter def port(self, num): """Set the port for the server.""" self.env["MTT_PORT"] = num async def __start(self): """async start of the server.""" self.process = await create_subprocess_exec( Path.cwd().joinpath("target", "release", "morethantext_web") ) await sleep(1) async def __stop(self): """async stop of the server.""" if self.process is not None and self.process.returncode is None: self.process.terminate() await self.process.wait() def start(self): """Start the server.""" self.loop.run_until_complete(self.__start()) def stop(self): """Stop the server.""" self.loop.run_until_complete(self.__stop()) def destroy(self): """Removes the server instance.""" self.stop()