Beginning setup of new release testing.
Some checks failed
Gitea Actions Demo / Explore-Gitea-Actions (push) Failing after 1s

This commit is contained in:
2025-06-15 09:46:50 -04:00
parent 5ed9f549a7
commit f6f8f0f205
8 changed files with 197 additions and 120 deletions

View File

111
release_tests/mtt_tc.py Normal file
View File

@@ -0,0 +1,111 @@
"""Base for MoreThanTest test cases."""
from asyncio import create_subprocess_exec, gather, sleep
from pathlib import Path
from socket import socket
from unittest import IsolatedAsyncioTestCase
from aiohttp import ClientSession
LOCALHOST = "127.0.0.1"
SESSION_KEY = "sessionid"
HOST = "example.com"
class Server:
"""Setup and run servers."""
def __init__(self, *args):
"""Initialize class"""
app = Path.cwd().joinpath("target", "release", "morethantext")
addr = "127.0.0.1"
port = 3000
if args:
self.cmd = list(args)
self.cmd.insert(0, app)
get_port = False
get_addr = False
for item in args:
if get_port:
port = item
get_port = False
if get_addr:
addr = item
get_addr = False
if item in ("-a", "--address"):
get_addr = True
if item in ("-p", "--port"):
get_port = True
else:
self.cmd = [app]
self.server = None
self.host = f"http://{addr}:{port}"
async def create(self):
"""Cerate the server"""
self.server = await create_subprocess_exec(*self.cmd)
await sleep(1)
async def destroy(self):
"""destroy servers"""
self.server.terminate()
await self.server.wait()
class MTTClusterTC(IsolatedAsyncioTestCase):
"""Test case for MoreThanTText."""
async def asyncSetUp(self):
"""Test setup"""
self.servers = []
self.cookies = {}
self.session = ClientSession()
async def asyncTearDown(self):
"""Test tear down."""
await self.session.close()
for server in self.servers:
await server.destroy()
@staticmethod
async def get_port():
"""Retrieve an unused port."""
sock = socket()
sock.bind((LOCALHOST, 0))
port = sock.getsockname()[1]
sock.close()
return port
async def create_server_with_flags(self, *args):
"""Create a single server with flags."""
server = Server(*args)
await server.create()
self.servers.append(server)
async def create_server(self):
"""Create a server on a random port."""
port = await self.get_port()
await self.create_server_with_flags("-p", str(port))
async def create_cluster(self, num=2):
"""Create a cluster of servers."""
ports = []
while len(ports) < num:
port = await self.get_port()
if port not in ports:
ports.append(port)
servers = []
for port in ports:
servers.append(self.create_server_with_flags("-p", str(port)))
cluster = gather(*servers)
await cluster
async def run_tests(self, uri, func):
"""Run the tests on each server."""
for server in self.servers:
async with self.session.get(
f"{server.host}{uri}", cookies=self.cookies
) as response:
if SESSION_KEY in response.cookies:
self.cookies[SESSION_KEY] = response.cookies[SESSION_KEY].value
func(response)

View File

@@ -0,0 +1,14 @@
"""Common support items for rekease teesting."""
from socket import socket
ADDR = "127.56.0.1"
def get_port():
"""Retrieve an unused port."""
sock = socket()
sock.bind((ADDR, 0))
port = sock.getsockname()[1]
sock.close()
return port

View File

@@ -0,0 +1,33 @@
"""Confirms translate moxcking is working."""
from unittest import IsolatedAsyncioTestCase
from uuid import uuid4
from aiohttp import ClientSession
from release_tests.support import ADDR
from release_tests.support.translate import Translate
class TranslateTC(IsolatedAsyncioTestCase):
"""Confirms the translate mocking is working."""
async def test_create_translater_server(self):
"""something"""
trans = Translate()
await trans.start()
async with ClientSession() as session:
async with session.get(f"http://{ADDR}:{trans.port}/") as resp:
text = await resp.text()
self.assertEqual(resp.status, 200, text)
async def test_url_reponses(self):
"""created url reponse the mocks"""
url = f"/{uuid4()}"
replies = [str(uuid4()), str(uuid4())]
trans = Translate(url=url, replies=replies)
await trans.start()
async with ClientSession() as session:
for reply in replies:
async with session.get(f"http://{ADDR}:{trans.port}{url}") as resp:
text = await resp.text()
self.assertEqual(resp.status, 200, text)
self.assertEqual(text, reply)

View File

@@ -0,0 +1,30 @@
"""this mocks a translate serve for etsting."""
from aiohttp import web
from release_tests.support import ADDR, get_port
class Translate:
"""Creates a translation mock for release testing."""
def __init__(self, url="/", replies=["Hello"]):
"""Initialize"""
async def reply(_):
"""response to return"""
nonlocal replies
return web.Response(text=replies.pop(0))
self.port = get_port()
self.app = web.Application()
self.app.add_routes([web.get(url, reply)])
self.runner = None
async def start(self):
"""Start the mock translateer."""
if self.runner:
return
self.runner = web.AppRunner(self.app)
await self.runner.setup()
site = web.TCPSite(self.runner, ADDR, self.port)
await site.start()

View File

@@ -0,0 +1,102 @@
"""Tests for single server boot ups."""
from socket import gethostbyname, gethostname
from unittest import skip
from aiohttp import ClientSession
from .mtt_tc import MTTClusterTC, SESSION_KEY
class BootUpTC(MTTClusterTC):
"""Single server boot tests."""
async def test_default_boot(self):
"""Does the server default boot on http://localhost:3000?"""
await self.create_server_with_flags()
def tests(response):
"""Response tests."""
self.assertEqual(response.status, 200)
await self.run_tests("/", tests)
async def test_alt_port_boot(self):
"""Can the server boot off on alternate port?"""
port = 9025
await self.create_server_with_flags("-p", str(port))
def tests(response):
"""Response tests."""
self.assertEqual(response.status, 200)
await self.run_tests("/", tests)
async def test_alt_address_boot(self):
"""Can it boot off an alternate address?"""
addr = gethostbyname(gethostname())
await self.create_server_with_flags("-a", addr)
def tests(response):
"""Response tests."""
self.assertEqual(response.status, 200)
await self.run_tests("/", tests)
async def test_for_session_id(self):
"""Is there a session if?"""
await self.create_server()
def tests(response):
"""Response tests."""
self.assertIn(SESSION_KEY, response.cookies)
await self.run_tests("/", tests)
async def test_session_id_is_random(self):
"""Is the session id random?"""
await self.create_server()
async with ClientSession() as session:
async with session.get(f"{self.servers[0].host}/") as response:
result1 = response.cookies[SESSION_KEY].value
async with ClientSession() as session:
async with session.get(f"{self.servers[0].host}/") as response:
result2 = response.cookies[SESSION_KEY].value
self.assertNotEqual(result1, result2, "Session ids should be unique.")
@skip("Code not availaable yet.")
async def test_session_does_not_reset_after_connection(self):
"""Does the session id remain constant during the session"""
await self.create_server()
ids = []
def tests(response):
"""tests"""
if SESSION_KEY in response.cookies:
ids.append(response.cookies[SESSION_KEY].value)
for _ in range(2):
await self.run_tests("/", tests)
self.assertEqual(len(ids), 1)
async def test_reset_bad_session_id(self):
"""Does the session id get reset if bad or expired?"""
await self.create_server()
value = "bad id"
async with ClientSession() as session:
async with session.get(
f"{self.servers[0].host}/", cookies={SESSION_KEY: value}
) as response:
self.assertIn(SESSION_KEY, response.cookies)
self.assertNotEqual(response.cookies[SESSION_KEY].value, value)
@skip("Code not availaable yet.")
async def test_sessions_are_shared_between_servers(self):
"""Does the session apply to the cluster."""
await self.create_cluster()
ids = []
def tests(response):
if SESSION_KEY in response.cookies:
ids.append(response.cookies[SESSION_KEY].value)
await self.run_tests("/", tests)
self.assertEqual(len(ids), 1, "Session info should be shared to the cluster.")