Foundation Core Registry
core.py CORE
The atomic foundation.
Capability is the unit. Provider registers them. HascomRegistry is fractal — it is itself a capability. Auto-discovers providers from providers/ via pkgutil.iter_modules.
"""HASCOM Core — Capability, Provider, and HascomRegistry.
The fractal capability registry. A Capability is the atomic unit.
A Registry holds capabilities and is itself a capability.
"""
import asyncio
import importlib
import inspect
import json
import logging
import pkgutil
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
log = logging.getLogger("hascom")
HASCOM_VERSION = "1.0.0"
@dataclass
class Capability:
    """Atomic unit of HASCOM — a single invokable thing."""
    name: str          # dotted identifier, e.g. "mhsync.encrypt", "cf.d1_query"
    domain: str        # owning domain: "local", "cloudflare", "syncropy", "system"
    description: str   # human/AI readable purpose
    interface: str     # "function" | "http" | "websocket" | "cli"
    invoke: Callable   # the actual callable (sync or async)
    status: Callable   # () -> {"ok": bool, "detail": str}
    tags: List[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Serialize for the manifest; callables are probed, never embedded."""
        try:
            health = self.status()
        except Exception as exc:
            # A broken status probe is itself an error status.
            health = {"ok": False, "detail": str(exc)}
        doc = {
            "domain": self.domain,
            "interface": self.interface,
            "description": self.description,
            "status": "ok" if health.get("ok") else "error",
            "status_detail": health.get("detail", ""),
            "tags": self.tags,
        }
        return doc
class Provider:
    """Base class for capability providers.

    Subclasses set `domain`/`name` and implement register() to hand the
    registry the list of Capability objects they contribute.
    """
    domain: str = "unknown"   # capability domain this provider serves
    name: str = "unknown"     # unique provider name (key in registry.providers)

    def register(self, registry: 'HascomRegistry') -> List[Capability]:
        """Return list of capabilities to register. Override in subclass.

        Args:
            registry: the registry performing the registration, for providers
                that need to look up or cross-register other capabilities.

        Raises:
            NotImplementedError: always, in the base class.
        """
        raise NotImplementedError
class HascomRegistry:
    """The fractal registry. Is itself a capability.

    Holds Capability objects keyed by name, tracks the Providers that
    produced them, and can serialize the whole catalog as a JSON manifest.
    Via as_capability() the registry exposes itself as a capability, so a
    registry can be registered inside another registry (fractal).
    """

    def __init__(self, node_id: str = "", machine: str = ""):
        self.capabilities: Dict[str, Capability] = {}
        self.providers: Dict[str, Provider] = {}
        self.node_id = node_id    # identity stamped into the manifest
        self.machine = machine
        # Manifest lives next to this module so it ships with the package.
        self._manifest_path = Path(__file__).parent / "hascom_manifest.json"

    def register(self, cap: Capability) -> None:
        """Register a single capability (last registration wins)."""
        if cap.name in self.capabilities:
            log.warning("Overwriting capability: %s", cap.name)
        self.capabilities[cap.name] = cap
        log.debug("Registered: %s [%s/%s]", cap.name, cap.domain, cap.interface)

    def register_provider(self, provider: Provider) -> None:
        """Register a provider and all capabilities it returns."""
        caps = provider.register(self)
        for cap in caps:
            self.register(cap)
        self.providers[provider.name] = provider
        log.info("Provider '%s' registered %d capabilities", provider.name, len(caps))

    def find(self, query: Optional[str] = None, domain: Optional[str] = None,
             tags: Optional[list] = None) -> List[Capability]:
        """Search capabilities by keyword, domain, or tags.

        All filters are optional and AND-ed together; `query` matches
        case-insensitively against name, description, and tags.
        """
        results = list(self.capabilities.values())
        if domain:
            results = [c for c in results if c.domain == domain]
        if tags:
            tag_set = set(tags)
            results = [c for c in results if tag_set & set(c.tags)]
        if query:
            q = query.lower()
            results = [c for c in results
                       if q in c.name.lower()
                       or q in c.description.lower()
                       or any(q in t.lower() for t in c.tags)]
        return results

    def invoke(self, name: str, **kwargs) -> Any:
        """Invoke a capability by name.

        Sync callables are invoked directly. Coroutine functions are driven
        to completion: with asyncio.run() when no event loop is running, or
        in a worker thread with a fresh loop when called from inside a
        running loop — so this is safe from both sync and async contexts.

        Raises:
            KeyError: if no capability with that name is registered.
        """
        cap = self.capabilities.get(name)
        if not cap:
            raise KeyError(f"Capability not found: {name}")
        fn = cap.invoke
        if not asyncio.iscoroutinefunction(fn):
            return fn(**kwargs)
        # BUG FIX: previous code called asyncio.get_event_loop(), which is
        # deprecated since Python 3.10 and raises RuntimeError in threads
        # without a loop instead of falling through to asyncio.run().
        # get_running_loop() makes the "am I inside a loop?" check explicit.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop running in this thread — safe to own one.
            return asyncio.run(fn(**kwargs))
        # Already inside a running loop: run the coroutine to completion
        # in a throwaway worker thread with its own loop.
        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, fn(**kwargs)).result()

    def status(self) -> dict:
        """Health of all registered capabilities.

        Probes every capability's status() callable; a probe that raises is
        reported as an error entry rather than propagating.
        """
        results = {}
        ok_count = 0
        err_count = 0
        for name, cap in sorted(self.capabilities.items()):
            try:
                st = cap.status()
            except Exception as e:
                st = {"ok": False, "detail": str(e)}
            results[name] = st
            if st.get("ok"):
                ok_count += 1
            else:
                err_count += 1
        return {
            "ok": err_count == 0,
            "total": len(self.capabilities),
            "ok_count": ok_count,
            "error_count": err_count,
            "capabilities": results,
        }

    def manifest(self, save: bool = True) -> dict:
        """Full JSON capability catalog.

        Args:
            save: also write the catalog to hascom_manifest.json next to
                this module. Write failures are logged, never raised.
        """
        caps = {}
        for name, cap in sorted(self.capabilities.items()):
            caps[name] = cap.to_dict()
        doc = {
            "hascom_version": HASCOM_VERSION,
            "node_id": self.node_id,
            "machine": self.machine,
            "generated_at": time.strftime("%Y-%m-%dT%H:%M:%S%z"),
            "capabilities": caps,
            "providers": sorted(self.providers.keys()),
            "total_capabilities": len(caps),
        }
        if save:
            try:
                with open(self._manifest_path, "w", encoding="utf-8") as f:
                    json.dump(doc, f, indent=2)
                log.debug("Manifest written: %s", self._manifest_path)
            except OSError as e:
                log.warning("Could not write manifest: %s", e)
        return doc

    def as_capability(self) -> Capability:
        """Fractal: the registry itself, exposed as a capability."""
        return Capability(
            name="hascom.registry",
            domain="local",
            description="HASCOM capability registry — query, invoke, and discover capabilities",
            interface="function",
            invoke=lambda **kw: self.manifest(save=False),
            status=lambda: {"ok": True, "detail": f"{len(self.capabilities)} capabilities"},
            tags=["hascom", "registry", "meta"],
        )

    def load_providers(self) -> None:
        """Auto-discover and load all providers from the providers/ package.

        Imports every non-underscore module in providers/ and registers the
        provider returned by its get_provider() factory. Modules without the
        factory are skipped; import or registration failures are logged and
        do not abort discovery. Finally registers the registry itself.
        """
        providers_dir = Path(__file__).parent / "providers"
        if not providers_dir.exists():
            log.warning("providers/ directory not found")
            return
        for importer, modname, ispkg in pkgutil.iter_modules([str(providers_dir)]):
            if modname.startswith("_"):
                continue
            try:
                mod = importlib.import_module(f"hascom.providers.{modname}")
                if hasattr(mod, "get_provider"):
                    provider = mod.get_provider()
                    self.register_provider(provider)
                else:
                    log.debug("Provider module '%s' has no get_provider()", modname)
            except Exception as e:
                log.warning("Failed to load provider '%s': %s", modname, e)
        # Register self as capability (fractal)
        self.register(self.as_capability())
Knowledge Symbol-Level Indexing Engine
analyzers.py CORE
Four analyzers that turn source files into searchable symbols:
analyze_worker() — Cloudflare Worker routes + handlers,
analyze_frontend() — API calls + event listeners,
analyze_schema() — CREATE TABLE + columns from migrations,
analyze_wiring() — cross-references api_call → route relationships.
All general-purpose — not SubX-specific.
"""HASCOM Analyzers — symbol-level indexing for large source files.
Extracts routes, functions, API calls, tables, and columns from source files
and stores them in the taxonomy symbols table. Turns file-level knowledge
into function-level knowledge.
Analyzers:
analyze_worker(path) → routes + handlers from Cloudflare Worker .js files
analyze_frontend(path) → functions + API calls from frontend .html/.js files
analyze_schema(path) → tables + columns from migration .sql files
analyze_wiring(db) → cross-reference api_call → route relationships
Usage:
from hascom.analyzers import analyze_all
symbols, relationships = analyze_all(db) # runs all analyzers, stores results
All analyzers are general-purpose: they work on any Worker, any frontend,
any SQL migration. Not SubX-specific.
"""
import json
import logging
import re
from pathlib import Path
from typing import Dict, List, Optional, Tuple
log = logging.getLogger("hascom.analyzers")
# ─── Domain detection ────────────────────────────────────────────────────────
def _detect_domain(path: str) -> str:
"""Infer domain from file path."""
lower = path.lower()
for name in ["consenta", "hascom", "mhsync", "quanticfork", "athena"]:
if name in lower:
return name
if "weyland" in lower or "subx" in lower or "prototype1" in lower:
return "weyland"
return "unknown"
# ─── Worker Route Analyzer ───────────────────────────────────────────────────
# Patterns for Cloudflare Worker route matching.
# NOTE: only the first pattern captures two groups (method, path); the rest
# capture the path alone — analyze_worker branches on Match.lastindex.
_ROUTE_PATTERNS = [
    # itty-router style: router.get('/api/...', ...)
    re.compile(
        r"""router\.(get|post|put|patch|delete|options|all)\s*\(\s*['"`](/[^'"`]+)['"`]""",
        re.IGNORECASE,
    ),
    # Direct equality: path === '/api/...'
    re.compile(
        r"""(?:url\.pathname|path)\s*===?\s*['"`](/(?:api|q|quote)/[^'"`]+)['"`]""",
    ),
    # startsWith: url.pathname.startsWith('/api/...')
    re.compile(
        r"""(?:url\.pathname|path)\.startsWith\s*\(\s*['"`](/(?:api|q|quote)/[^'"`]+)['"`]\s*\)""",
    ),
    # .match with regex literal, e.g. url.pathname.match(/^\/api\/x/)
    re.compile(
        r"""(?:url\.pathname|path)\.match\s*\(\s*/\^?\\?(/(?:api|q|quote)/[^\s/]+(?:/[^\s/]+)*)/""",
    ),
]
# Explicit verb checks like `request.method === 'POST'` — used to pin the
# HTTP method for nearby path-only route matches.
_METHOD_PATTERN = re.compile(
    r"""(?:method|request\.method)\s*===?\s*['"`](GET|POST|PUT|PATCH|DELETE)['"`]""",
)
# JS function definitions: declarations, const/let/var function or arrow
# assignments, and exported declarations. Group 1 is the function name.
_FUNC_PATTERNS = [
    re.compile(r"^(?:async\s+)?function\s+(\w+)\s*\("),
    re.compile(r"^(?:const|let|var)\s+(\w+)\s*=\s*(?:async\s+)?function"),
    re.compile(r"^(?:const|let|var)\s+(\w+)\s*=\s*(?:async\s+)?\("),
    re.compile(r"^export\s+(?:async\s+)?function\s+(\w+)\s*\("),
]
def analyze_worker(path: str) -> List[dict]:
    """Extract routes and handler functions from a Cloudflare Worker JS file.

    Returns a list of symbol dicts (keys: path, name, line, symbol_type,
    parent, domain, metadata). symbol_type is "function" or "route"; a route
    is named "<METHOD> <pattern>" and carries the most recently seen function
    as its parent. A missing file yields an empty list.
    """
    path_obj = Path(path)
    if not path_obj.exists():
        return []
    content = path_obj.read_text(encoding="utf-8", errors="replace")
    lines = content.split("\n")
    domain = _detect_domain(path)
    symbols = []
    current_function = None   # name of the function most recently defined above
    recent_method = None      # verb from the last explicit method comparison
    recent_method_line = 0    # 1-based line of that comparison
    for i, line in enumerate(lines, 1):
        stripped = line.strip()
        # Skip JS comment lines (line comments and block-comment bodies).
        if stripped.startswith("//") or stripped.startswith("/*") or stripped.startswith("*"):
            continue
        for fp in _FUNC_PATTERNS:
            m = fp.search(stripped)
            if m:
                fname = m.group(1)
                # Skip very short names and "_U"-prefixed ones
                # (presumably generated/internal helpers — TODO confirm intent).
                if len(fname) > 2 and not fname.startswith("_U"):
                    current_function = fname
                    symbols.append({
                        "path": path, "name": fname, "line": i,
                        "symbol_type": "function", "parent": None,
                        "domain": domain, "metadata": None,
                    })
                break
        # Remember explicit `request.method === '...'` checks so path-only
        # route patterns found nearby can be tagged with a verb.
        mm = _METHOD_PATTERN.search(stripped)
        if mm:
            recent_method = mm.group(1)
            recent_method_line = i
        for rp in _ROUTE_PATTERNS:
            rm = rp.search(stripped)
            if rm:
                if rm.lastindex >= 2:
                    # itty-router match: group 1 = verb, group 2 = path.
                    method = rm.group(1).upper()
                    route = rm.group(2)
                else:
                    # Path-only match: verb unknown, try to recover it.
                    route = rm.group(1).replace("\\", "")
                    method = "?"
                    if recent_method and (i - recent_method_line) < 20:
                        method = recent_method
                    # Scan a small window around the match for a method check
                    # (j is a 0-based index into lines; i is 1-based, so this
                    # covers roughly 5 lines on either side of the route).
                    for j in range(max(0, i - 6), min(len(lines), i + 5)):
                        lm = _METHOD_PATTERN.search(lines[j])
                        if lm:
                            method = lm.group(1)
                            break
                symbols.append({
                    "path": path, "name": f"{method} {route}", "line": i,
                    "symbol_type": "route", "parent": current_function,
                    "domain": domain,
                    "metadata": {"method": method, "pattern": route},
                })
    log.info("analyze_worker(%s): %d symbols", path_obj.name, len(symbols))
    return symbols
# ─── Frontend Analyzer ───────────────────────────────────────────────────────
# API-call sites in frontend code. Group 1 is the request path; both quoted
# strings and template literals are covered.
_API_CALL_PATTERNS = [
    re.compile(r"""callAPI\s*\(\s*['"`/]([^'"`\n]+?)['"`]"""),
    re.compile(r"""fetch\s*\(\s*['"`]([^'"`\n]*?/api/[^'"`\n]+?)['"`]"""),
    re.compile(r"""fetch\s*\(\s*`[^`]*?(/api/[^`\n]+?)`"""),
    re.compile(r"""callAPI\s*\(\s*`([^`\n]+?)`"""),
]
# Event wiring: inline onclick handlers (1 group: handler) and
# addEventListener calls (2 groups: event name, handler).
_LISTENER_PATTERNS = [
    re.compile(r"""onclick\s*=\s*["'](\w+)\s*\("""),
    re.compile(r"""addEventListener\s*\(\s*['"](\w+)['"],\s*(\w+)"""),
]
def analyze_frontend(path: str) -> List[dict]:
    """Extract functions and API calls from a frontend HTML/JS file.

    Emits symbol dicts of type "function", "api_call" (deduplicated by
    (path, caller)), and "event_listener". A missing file yields [].
    """
    path_obj = Path(path)
    if not path_obj.exists():
        return []
    content = path_obj.read_text(encoding="utf-8", errors="replace")
    lines = content.split("\n")
    domain = _detect_domain(path)
    symbols = []
    current_function = None   # function most recently defined above this line
    for i, line in enumerate(lines, 1):
        stripped = line.strip()
        # Skip HTML comment openers and block-comment terminators.
        if stripped.startswith("<!--") or stripped.startswith("*/"):
            continue
        for fp in _FUNC_PATTERNS:
            m = fp.search(stripped)
            if m:
                fname = m.group(1)
                if len(fname) > 2:
                    current_function = fname
                    symbols.append({
                        "path": path, "name": fname, "line": i,
                        "symbol_type": "function", "parent": None,
                        "domain": domain, "metadata": None,
                    })
                break
        for ap in _API_CALL_PATTERNS:
            for am in ap.finditer(stripped):
                api_path = am.group(1)
                # Normalize template placeholders: ${id} -> :id
                api_path = re.sub(r"\$\{([^}]+)\}", r":\1", api_path)
                # Keep only API-shaped paths (/api/... or /q...).
                if "/api/" not in api_path and not api_path.startswith("/q"):
                    continue
                symbols.append({
                    "path": path, "name": api_path, "line": i,
                    "symbol_type": "api_call", "parent": current_function,
                    "domain": domain,
                    "metadata": {"caller": current_function},
                })
        for lp in _LISTENER_PATTERNS:
            for lm in lp.finditer(stripped):
                # onclick pattern has one group (the handler name);
                # addEventListener has two (event name, handler name).
                handler = lm.group(1) if lm.lastindex == 1 else lm.group(2)
                # Heuristic: real handlers are lowerCamelCase, > 2 chars.
                if len(handler) > 2 and handler[0].islower():
                    symbols.append({
                        "path": path, "name": handler, "line": i,
                        "symbol_type": "event_listener", "parent": None,
                        "domain": domain,
                        "metadata": {"event": lm.group(1) if lm.lastindex >= 2 else "click"},
                    })
    # Deduplicate api_calls by (path, caller); other symbol types pass through.
    seen_api = set()
    deduped = []
    for s in symbols:
        if s["symbol_type"] == "api_call":
            key = (s["name"], s.get("parent"))
            if key in seen_api:
                continue
            seen_api.add(key)
        deduped.append(s)
    log.info("analyze_frontend(%s): %d symbols", path_obj.name, len(deduped))
    return deduped
# ─── Schema Analyzer ─────────────────────────────────────────────────────────
# CREATE TABLE [IF NOT EXISTS] <name> ( — group 1 is the table name.
_CREATE_TABLE = re.compile(r"CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?(\w+)\s*\(", re.IGNORECASE)
# ALTER TABLE <table> ADD [COLUMN] <col> <type> — groups: table, column, type.
_ALTER_TABLE_ADD = re.compile(r"ALTER\s+TABLE\s+(\w+)\s+ADD\s+(?:COLUMN\s+)?(\w+)\s+(\w+)", re.IGNORECASE)
# Indented column definition inside a CREATE TABLE body:
# groups: column name, SQL type, optional trailing constraints.
_COLUMN_DEF = re.compile(
    r"^\s+(\w+)\s+(TEXT|INTEGER|REAL|BLOB|NUMERIC|INT|VARCHAR|BOOLEAN|DATETIME|TIMESTAMP)"
    r"(?:\s+(.*?))?(?:,\s*)?$",
    re.IGNORECASE,
)
def analyze_schema(migrations_source: str) -> List[dict]:
    """Extract tables and columns from SQL migration files.

    `migrations_source` may be a single .sql file or a directory of them
    (globbed as *.sql in sorted order). Emits "table" and "column" symbols;
    the final dedup keeps the last occurrence of each (name, type) pair, so
    later migrations win over earlier ones.
    """
    source = Path(migrations_source)
    if not source.exists():
        return []
    symbols = []
    domain = _detect_domain(migrations_source)
    sql_files = [source] if source.is_file() else sorted(source.glob("*.sql"))
    for sql_file in sql_files:
        content = sql_file.read_text(encoding="utf-8", errors="replace")
        lines = content.split("\n")
        path = str(sql_file)
        current_table = None   # table whose CREATE body we're currently inside
        for i, line in enumerate(lines, 1):
            ctm = _CREATE_TABLE.search(line)
            if ctm:
                current_table = ctm.group(1)
                symbols.append({
                    "path": path, "name": current_table, "line": i,
                    "symbol_type": "table", "parent": None,
                    "domain": domain,
                    "metadata": {"source": sql_file.name},
                })
                continue
            if current_table:
                # A line starting with ")" closes the CREATE TABLE body.
                if line.strip().startswith(")"):
                    current_table = None
                    continue
                cm = _COLUMN_DEF.match(line)
                if cm:
                    col_name = cm.group(1)
                    col_type = cm.group(2).upper()
                    constraints = cm.group(3).strip() if cm.group(3) else ""
                    # Table-level constraint lines can also match the column
                    # regex; filter them out by leading keyword.
                    if col_name.upper() in ("PRIMARY", "UNIQUE", "FOREIGN",
                                            "CHECK", "CONSTRAINT", "CREATE",
                                            "INDEX", "ON", "IF", "NOT"):
                        continue
                    meta = {"column_type": col_type, "source": sql_file.name}
                    if "NOT NULL" in constraints.upper():
                        meta["not_null"] = True
                    if "DEFAULT" in constraints.upper():
                        dm = re.search(r"DEFAULT\s+(\S+)", constraints, re.I)
                        if dm:
                            meta["default"] = dm.group(1).strip("'\"")
                    symbols.append({
                        "path": path, "name": f"{current_table}.{col_name}",
                        "line": i, "symbol_type": "column",
                        "parent": current_table, "domain": domain,
                        "metadata": meta,
                    })
            # ALTER TABLE ... ADD COLUMN lines add columns outside any
            # CREATE body.
            am = _ALTER_TABLE_ADD.search(line)
            if am:
                symbols.append({
                    "path": path,
                    "name": f"{am.group(1)}.{am.group(2)}",
                    "line": i, "symbol_type": "column",
                    "parent": am.group(1), "domain": domain,
                    "metadata": {"column_type": am.group(3).upper(),
                                 "source": sql_file.name, "added_by": "ALTER TABLE"},
                })
    # Dedup: dict insertion keeps only the last (name, type) occurrence.
    seen = {}
    for s in symbols:
        key = (s["name"], s["symbol_type"])
        seen[key] = s
    deduped = list(seen.values())
    log.info("analyze_schema(%s): %d symbols from %d files",
             source.name, len(deduped), len(sql_files))
    return deduped
# ─── Wiring Analyzer ─────────────────────────────────────────────────────────
def analyze_wiring(db) -> dict:
    """Cross-reference api_calls with routes to build call-graph relationships.

    Reads "route" and "api_call" symbols from the taxonomy DB, normalizes
    :param segments to "*", and records a `calls` relationship for every
    frontend call that matches a backend route (first match wins).

    Args:
        db: taxonomy database; must expose connect() returning a sqlite-style
            connection over a `symbols` table, and add_relationship().
    Returns:
        Counter stats: connected, orphaned_routes, orphaned_calls,
        relationships_created.
    """
    conn = db.connect()
    routes = conn.execute("SELECT * FROM symbols WHERE symbol_type = 'route'").fetchall()
    api_calls = conn.execute("SELECT * FROM symbols WHERE symbol_type = 'api_call'").fetchall()
    stats = {"connected": 0, "orphaned_routes": 0, "orphaned_calls": 0,
             "relationships_created": 0}
    # Normalized route pattern -> route row. Route symbols are named
    # "<METHOD> <pattern>" (see analyze_worker), so strip the verb first.
    route_map = {}
    for r in routes:
        pattern = dict(r).get("name", "")
        if " " in pattern:
            _, pattern = pattern.split(" ", 1)
        norm = re.sub(r":\w+", "*", pattern)
        route_map[norm] = dict(r)
    matched_routes = set()
    for call in api_calls:
        call_dict = dict(call)
        call_norm = re.sub(r":\w+", "*", call_dict["name"])
        matched = False
        for route_norm, route_row in route_map.items():
            if _routes_match(call_norm, route_norm):
                matched = True
                matched_routes.add(route_norm)
                stats["connected"] += 1
                # source/target are "path:line" symbol coordinates.
                db.add_relationship(
                    source=f"{call_dict['path']}:{call_dict['line']}",
                    target=f"{route_row['path']}:{route_row['line']}",
                    rel_type="calls",
                    metadata={"caller": call_dict.get("parent"),
                              "route": route_row["name"],
                              "api_path": call_dict["name"]},
                )
                stats["relationships_created"] += 1
                break
        if not matched:
            stats["orphaned_calls"] += 1
    # Routes no frontend call ever reached.
    for route_norm in route_map:
        if route_norm not in matched_routes:
            stats["orphaned_routes"] += 1
    log.info("analyze_wiring: %d connected, %d orphaned routes, %d orphaned calls",
             stats["connected"], stats["orphaned_routes"], stats["orphaned_calls"])
    return stats
def _routes_match(call_pattern: str, route_pattern: str) -> bool:
"""Check if a frontend API call matches a backend route."""
if call_pattern == route_pattern:
return True
call_parts = [p for p in call_pattern.split("/") if p]
route_parts = [p for p in route_pattern.split("/") if p]
min_len = min(len(call_parts), len(route_parts))
if min_len < 2:
return False
for j in range(min_len):
cp, rp = call_parts[j], route_parts[j]
if cp == "*" or rp == "*":
continue
if cp != rp:
return False
return True
# ─── Main entry points ───────────────────────────────────────────────────────
# Known source locations indexed by analyze_all().
# NOTE(review): the "..." path segments look like elided placeholders —
# confirm the real absolute paths before relying on these entries.
KNOWN_WORKERS = [
    r"C:\AthenaSystem\...\Prototype1\weyland-worker.js",
    r"C:\MHS\Ventures\Consenta\consenta-worker.js",
]
KNOWN_FRONTENDS = [
    r"C:\AthenaSystem\...\deployment\public\subx.html",
    r"C:\MHS\Ventures\Consenta\deployment\public\consenta.html",
]
KNOWN_MIGRATIONS = [
    r"C:\AthenaSystem\...\Prototype1\migrations",
    r"C:\MHS\Ventures\Consenta\migrations",
]
def analyze_all(db=None) -> dict:
    """Run all analyzers and store results in the taxonomy DB.

    Args:
        db: an already-connected TaxonomyDB; when None a new one is created
            and connected.
    Returns:
        Per-file symbol counts per category ("workers", "frontends",
        "schema"), the wiring cross-reference stats, and "total_symbols".
    """
    from hascom.taxonomy import TaxonomyDB
    if db is None:
        db = TaxonomyDB()
        db.connect()
    stats = {"workers": {}, "frontends": {}, "schema": {}, "wiring": {},
             "total_symbols": 0}
    # BUG FIX: this loop previously iterated KNOWN_WORKERS + KNOWN_MODULES,
    # but KNOWN_MODULES is defined nowhere in the module — calling
    # analyze_all() raised NameError before any analysis ran.
    for path in KNOWN_WORKERS:
        if Path(path).exists():
            symbols = analyze_worker(path)
            if symbols:
                db.clear_symbols(path)
                db.insert_symbols(symbols)
                stats["workers"][Path(path).name] = len(symbols)
                stats["total_symbols"] += len(symbols)
    for path in KNOWN_FRONTENDS:
        if Path(path).exists():
            symbols = analyze_frontend(path)
            if symbols:
                db.clear_symbols(path)
                db.insert_symbols(symbols)
                stats["frontends"][Path(path).name] = len(symbols)
                stats["total_symbols"] += len(symbols)
    for mdir in KNOWN_MIGRATIONS:
        if Path(mdir).exists():
            symbols = analyze_schema(mdir)
            if symbols:
                # Clear per sql file so stale symbols from removed
                # migrations don't linger, then insert the merged set once.
                for sql_file in Path(mdir).glob("*.sql"):
                    db.clear_symbols(str(sql_file))
                db.insert_symbols(symbols)
                stats["schema"][Path(mdir).name] = len(symbols)
                stats["total_symbols"] += len(symbols)
    stats["wiring"] = analyze_wiring(db)
    db.log_change("symbols_analyzed", detail=stats)
    log.info("analyze_all complete: %d total symbols", stats["total_symbols"])
    return stats
Testing UAT Toolkit
uat/__init__.py UAT
Public API surface of the UAT toolkit. Exports all assertions, the runner, and session management.
from hascom.uat import * gives you everything.
"""HASCOM UAT Toolkit — general-purpose API testing framework.
Drop test suites in uat/suites/ → auto-discovered → run via CLI or capability.
"""
from hascom.uat.http import (
UATSession, Response, api_call,
decode_jwt_expiry, decode_jwt_claims,
)
from hascom.uat.assertions import (
UATAssertionError,
assert_ok, assert_status, assert_status_in,
assert_field, assert_field_equals, assert_field_type, assert_field_in,
assert_fields_present, assert_content_type,
assert_numeric_equals, assert_greater_than,
assert_pdf_valid, assert_pdf_size,
assert_list_length, assert_latency,
)
from hascom.uat.runner import (
TestCase, Suite, TestResult, SuiteResult,
Runner, discover_suites, print_report, to_json,
)
__all__ = [
"UATSession", "Response", "api_call",
"decode_jwt_expiry", "decode_jwt_claims",
"UATAssertionError",
"assert_ok", "assert_status", "assert_status_in",
"assert_field", "assert_field_equals", "assert_field_type", "assert_field_in",
"assert_fields_present", "assert_content_type",
"assert_numeric_equals", "assert_greater_than",
"assert_pdf_valid", "assert_pdf_size",
"assert_list_length", "assert_latency",
"TestCase", "Suite", "TestResult", "SuiteResult",
"Runner", "discover_suites", "print_report", "to_json",
]
uat/http.py UAT
HTTP layer for API testing.
UATSession holds base URL + JWT token.
Response has structured field access (resp.field("data.quotes[0].id")).
api_call() is the generic caller (urllib, no external deps).
Auto-token-refresh via daemon bridge or devtools fallback.
"""UAT HTTP layer — session, response, and generic API caller.
Includes auto-token-refresh: decode JWT, check expiry, refresh via
daemon bridge (HTTP eval) or devtools.get_token fallback.
"""
import base64
import json
import logging
import time
import urllib.request
import urllib.error
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
log = logging.getLogger("hascom.uat.http")
@dataclass
class Response:
    """Structured HTTP response for UAT assertions."""
    status_code: int                    # 0 means transport-level failure
    body: Any                           # parsed JSON or None
    headers: Dict[str, str]
    elapsed_ms: float
    raw_bytes: Optional[bytes] = None   # unparsed payload (PDFs, raw mode)
    error: Optional[str] = None         # transport / HTTP error detail

    @property
    def ok(self) -> bool:
        """True for any 2xx status."""
        return self.status_code // 100 == 2

    def field(self, dot_path: str) -> Any:
        """Navigate nested fields. Supports 'data.quotes[0].id' syntax.

        Raises KeyError/IndexError/TypeError when the path cannot be
        traversed — the assertion helpers catch exactly those.
        """
        node = self.body
        for segment in dot_path.replace(']', '').split('.'):
            if '[' in segment:
                key, index = segment.split('[', 1)
                node = node[key][int(index)]
            elif isinstance(node, dict):
                node = node[segment]
            elif isinstance(node, list):
                node = node[int(segment)]
            else:
                raise KeyError(f"Cannot traverse {segment!r} on {type(node).__name__}")
        return node
class UATSession:
    """Holds base URL, auth token, and shared context for a test run."""

    def __init__(self, base_url: str, token: str = ""):
        self.base_url = base_url.rstrip('/')   # no trailing slash; paths bring their own
        self.token = token                     # bearer JWT; "" = unauthenticated
        self.context: Dict[str, Any] = {}      # mutable scratch shared across tests

    @classmethod
    def from_config(cls, auto_refresh: bool = True,
                    site: str = "subx") -> 'UATSession':
        """Load session from hascom_config.json.

        When auto_refresh is set and the configured JWT has expired, tries
        _auto_refresh_token(site) and swaps the new token in on success.
        A still-valid token expiring within the hour is only logged.
        """
        from hascom.config import load_hascom_config
        config = load_hascom_config()
        base_url = config.get("uat_base_url",
                              "https://weyland.johnmobley99.workers.dev")
        token = config.get("uat_token", "")
        session = cls(base_url=base_url, token=token)
        if auto_refresh and token:
            expiry = decode_jwt_expiry(token)
            if expiry is not None and expiry < time.time():
                log.info("JWT expired — attempting auto-refresh")
                new_token = _auto_refresh_token(site)
                if new_token:
                    session.token = new_token
            elif expiry is not None:
                remaining = expiry - time.time()
                if remaining < 3600:
                    log.info("JWT expires in %.0f minutes", remaining / 60)
        return session

    def validate_token(self) -> bool:
        """Quick check: does the token work against /api/auth/me?"""
        resp = api_call(self, "/api/auth/me", timeout=10)
        return resp.ok
def api_call(session: UATSession, path: str, method: str = "GET",
             data: Any = None, timeout: int = 30,
             raw: bool = False) -> Response:
    """Generic HTTP call using session credentials.

    Args:
        session: supplies base_url and optional bearer token.
        path: request path, joined directly onto session.base_url.
        method: HTTP verb.
        data: JSON-serializable body; sent (with Content-Type) when not None.
        timeout: socket timeout in seconds.
        raw: when True, skip JSON parsing and return body=None with raw_bytes.

    Returns:
        Response — never raises: HTTP errors and transport failures are
        folded into the Response (status_code=0 marks a transport failure).
    """
    url = f"{session.base_url}{path}"
    # Browser-like User-Agent: some edges reject default urllib agents.
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                      "AppleWebKit/537.36 Chrome/131.0.0.0 Safari/537.36",
        "Accept": "application/json, text/html, */*",
    }
    if session.token:
        headers["Authorization"] = f"Bearer {session.token}"
    body_bytes = None
    if data is not None:
        headers["Content-Type"] = "application/json"
        body_bytes = json.dumps(data).encode("utf-8")
    req = urllib.request.Request(url, data=body_bytes, headers=headers,
                                 method=method)
    t0 = time.perf_counter()
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            elapsed = (time.perf_counter() - t0) * 1000
            raw_data = resp.read()
            resp_headers = {k: v for k, v in resp.getheaders()}
            if raw:
                return Response(status_code=resp.status, body=None,
                                headers=resp_headers, elapsed_ms=elapsed,
                                raw_bytes=raw_data)
            try:
                parsed = json.loads(raw_data.decode("utf-8"))
            except (json.JSONDecodeError, UnicodeDecodeError):
                parsed = None   # non-JSON body: caller falls back to raw_bytes
            return Response(status_code=resp.status, body=parsed,
                            headers=resp_headers, elapsed_ms=elapsed,
                            raw_bytes=raw_data)
    except urllib.error.HTTPError as e:
        # 4xx/5xx: keep up to 2000 chars of the error body for reporting.
        elapsed = (time.perf_counter() - t0) * 1000
        raw_err = None
        detail = ""
        try:
            raw_err = e.read()
            detail = raw_err.decode("utf-8")[:2000]
        except Exception:
            pass
        try:
            parsed = json.loads(detail)
        except (json.JSONDecodeError, TypeError):
            parsed = {"error": detail}
        return Response(status_code=e.code, body=parsed,
                        headers={k: v for k, v in e.headers.items()} if e.headers else {},
                        elapsed_ms=elapsed, raw_bytes=raw_err, error=detail)
    except Exception as e:
        # Transport-level failure (DNS, refused connection, timeout).
        elapsed = (time.perf_counter() - t0) * 1000
        return Response(status_code=0, body=None, headers={},
                        elapsed_ms=elapsed, error=str(e))
# ─── JWT decode + auto-refresh ───────────────────────────────────────────────
def decode_jwt_expiry(token: str) -> Optional[float]:
    """Decode the JWT payload and return its 'exp' timestamp, or None.

    Returns None for malformed tokens and for payloads without a truthy
    'exp' claim; never raises.
    """
    try:
        _header, payload_b64, _signature = token.split(".")
        # urlsafe_b64decode needs the padding JWTs strip off.
        payload_b64 += "=" * (-len(payload_b64) % 4)
        claims = json.loads(base64.urlsafe_b64decode(payload_b64))
        return float(claims.get("exp", 0)) or None
    except Exception:
        return None
def decode_jwt_claims(token: str) -> Optional[dict]:
    """Decode the JWT payload and return all claims, or None on any error."""
    try:
        segments = token.split(".")
        if len(segments) != 3:
            return None
        body = segments[1]
        # Restore the base64 padding JWTs strip off.
        body += "=" * (-len(body) % 4)
        return json.loads(base64.urlsafe_b64decode(body))
    except Exception:
        return None
uat/assertions.py UAT
Every assertion returns True on success, raises
UATAssertionError (with structured expected/actual) on failure.
Categories: status (ok/code/set), field (exists/equals/type/in/present/matches/not-empty), content-type, numeric (equals/greater), PDF (valid/size), collection (list length), performance (latency), string (contains/uuid).
"""UAT Assertion library — 22 reusable assertions.
Each returns True on success, raises UATAssertionError (with expected/actual)
on failure. Designed for structured reporting, not stdlib AssertionError.
"""
import re
class UATAssertionError(Exception):
    """Structured assertion failure with expected/actual for reporting.

    Raised by every assert_* helper so the runner can report what was
    expected versus what the response actually contained.
    """
    def __init__(self, message: str, expected=None, actual=None):
        super().__init__(message)
        self.expected = expected   # what the assertion wanted
        self.actual = actual       # what the response actually had
# ─── Status assertions ───────────────────────────────────────────────────────
def assert_ok(resp):
    """Assert response has 2xx status."""
    if resp.ok:
        return True
    raise UATAssertionError(
        f"Expected 2xx, got {resp.status_code}",
        expected="2xx", actual=resp.status_code)
def assert_status(resp, code):
    """Assert exact status code."""
    if resp.status_code == code:
        return True
    raise UATAssertionError(
        f"Expected status {code}, got {resp.status_code}",
        expected=code, actual=resp.status_code)
def assert_status_in(resp, codes):
    """Assert status code is in a set."""
    if resp.status_code in codes:
        return True
    raise UATAssertionError(
        f"Expected status in {codes}, got {resp.status_code}",
        expected=codes, actual=resp.status_code)
# ─── Field assertions ────────────────────────────────────────────────────────
def assert_field(resp, dot_path):
    """Assert a field exists and is not None."""
    try:
        value = resp.field(dot_path)
    except (KeyError, IndexError, TypeError) as e:
        raise UATAssertionError(f"Field '{dot_path}' not found: {e}",
            expected=f"field '{dot_path}' exists", actual="missing")
    if value is None:
        raise UATAssertionError(f"Field '{dot_path}' is None",
            expected=f"field '{dot_path}' not None", actual=None)
    return True
def assert_field_equals(resp, dot_path, expected):
    """Assert a field equals expected value."""
    try:
        actual = resp.field(dot_path)
    except (KeyError, IndexError, TypeError) as e:
        raise UATAssertionError(f"Field '{dot_path}' not found: {e}",
                                expected=expected, actual="missing")
    if actual == expected:
        return True
    raise UATAssertionError(
        f"Field '{dot_path}': expected {expected!r}, got {actual!r}",
        expected=expected, actual=actual)
def assert_field_type(resp, dot_path, expected_type):
    """Assert a field is of expected type."""
    try:
        value = resp.field(dot_path)
    except (KeyError, IndexError, TypeError) as e:
        raise UATAssertionError(f"Field '{dot_path}' not found: {e}",
                                expected=str(expected_type), actual="missing")
    if isinstance(value, expected_type):
        return True
    raise UATAssertionError(
        f"Field '{dot_path}': expected {expected_type}, got {type(value).__name__}",
        expected=str(expected_type), actual=type(value).__name__)
def assert_fields_present(resp, *paths):
    """Assert multiple fields exist."""
    missing = []
    for dot_path in paths:
        try:
            resp.field(dot_path)
        except (KeyError, IndexError, TypeError):
            missing.append(dot_path)
    if not missing:
        return True
    raise UATAssertionError(f"Missing fields: {missing}",
                            expected=list(paths), actual=f"missing: {missing}")
# ─── PDF assertions ──────────────────────────────────────────────────────────
def assert_pdf_valid(resp):
    """Assert response contains valid PDF (magic bytes + %%EOF marker)."""
    data = resp.raw_bytes
    if not data:
        raise UATAssertionError("No raw bytes", expected="PDF bytes", actual="empty")
    if not data.startswith(b'%PDF-'):
        raise UATAssertionError(f"Not a PDF: starts with {data[:10]!r}",
                                expected="%PDF- header", actual=data[:10])
    # The %%EOF marker must appear within the final 32 bytes.
    if data[-32:].find(b'%%EOF') < 0:
        raise UATAssertionError("PDF missing %%EOF",
                                expected="%%EOF", actual=data[-16:])
    return True
# ─── Latency assertion ───────────────────────────────────────────────────────
def assert_latency(resp, max_ms):
    """Assert response time within threshold."""
    if resp.elapsed_ms <= max_ms:
        return True
    raise UATAssertionError(
        f"Latency {resp.elapsed_ms:.0f}ms exceeds {max_ms}ms",
        expected=f"<= {max_ms}ms", actual=f"{resp.elapsed_ms:.0f}ms")
# ─── String assertions ───────────────────────────────────────────────────────
def assert_contains(value, substring, label="value"):
    """Assert a string contains a substring."""
    if not isinstance(value, str):
        raise UATAssertionError(f"{label}: expected string",
            expected=f"string containing '{substring}'", actual=type(value).__name__)
    if value.find(substring) >= 0:
        return True
    raise UATAssertionError(f"{label}: '{substring}' not found",
        expected=f"contains '{substring}'", actual=value[:200])
def assert_uuid(value, label="value"):
    """Assert a value is a valid UUID format."""
    pattern = r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
    # Lowercase first so upper-case hex digits are accepted too.
    if re.match(pattern, str(value).lower()) is None:
        raise UATAssertionError(f"{label}: '{value}' is not a valid UUID",
                                expected="UUID format", actual=value)
    return True
# ... plus assert_field_in, assert_content_type, assert_numeric_equals,
# assert_greater_than, assert_pdf_size, assert_list_length,
# assert_not_empty, assert_field_matches, assert_field_not_empty
uat/runner.py UAT
Test execution engine.
TestCase with dependency declarations. Suite collects them.
Runner executes with Kahn topological sort, structured reporting, and auto-persistence
to taxonomy.db (regression detection across runs).
discover_suites() auto-loads from uat/suites/ via get_suite() convention.
"""UAT Runner — TestCase, Suite, Runner, discovery, reporting."""
import importlib
import logging
import pkgutil
import time
from collections import defaultdict, deque
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
from hascom.uat.assertions import UATAssertionError
log = logging.getLogger("hascom.uat")
@dataclass
class TestCase:
    """Single test — a callable with optional dependency declarations."""
    name: str                 # unique within the suite; referenced by depends_on
    description: str
    run: Callable             # (session) -> None; raises on failure
    depends_on: List[str] = field(default_factory=list)  # test names that must pass first
    tags: List[str] = field(default_factory=list)
@dataclass
class Suite:
    """Named collection of test cases."""
    name: str
    description: str
    tests: List[TestCase]     # the Runner reorders these by depends_on
    tags: List[str] = field(default_factory=list)
@dataclass
class TestResult:
    """Outcome of a single test execution."""
    name: str
    status: str               # pass | fail | skip | error
    message: str = ""         # failure/skip reason; empty on pass
    elapsed_ms: float = 0.0
    # Structured expected/actual payload — presumably taken from
    # UATAssertionError on failure; confirm against Runner.run_suite.
    assertion_detail: Optional[Dict[str, Any]] = None
@dataclass
class SuiteResult:
    """Aggregate outcome of a full suite run."""
    suite_name: str
    results: List[TestResult]
    elapsed_ms: float = 0.0

    @property
    def ok(self) -> bool:
        """True when nothing failed or errored (skips do not count against)."""
        return not any(r.status not in ("pass", "skip") for r in self.results)

    @property
    def pass_count(self) -> int:
        """Number of results with status 'pass'."""
        return len([r for r in self.results if r.status == "pass"])

    @property
    def fail_count(self) -> int:
        """Number of results with status 'fail' (errors are not included)."""
        return len([r for r in self.results if r.status == "fail"])
# ─── Topological sort ────────────────────────────────────────────────────────
def _topo_sort(tests: List[TestCase]) -> List[TestCase]:
    """Order tests so each one follows everything in its depends_on (Kahn).

    Tests caught in a dependency cycle, or depending on a name not present
    in *tests*, are appended at the end rather than dropped.
    """
    by_name = {t.name: t for t in tests}
    pending: Dict[str, int] = {t.name: 0 for t in tests}  # unmet-dep counts
    dependents: Dict[str, List[str]] = defaultdict(list)  # dep -> who waits on it
    for t in tests:
        for dep in t.depends_on:
            dependents[dep].append(t.name)
            pending[t.name] += 1
    ready = deque(name for name, count in pending.items() if count == 0)
    ordered: List[TestCase] = []
    while ready:
        current = ready.popleft()
        if current in by_name:
            ordered.append(by_name[current])
        for child in dependents[current]:
            pending[child] -= 1
            if pending[child] == 0:
                ready.append(child)
    # Anything still unplaced is part of a cycle or has a missing dep.
    placed = {t.name for t in ordered}
    ordered.extend(t for t in tests if t.name not in placed)
    return ordered
# ─── Runner ──────────────────────────────────────────────────────────────────
class Runner:
    """Execute a suite with dependency ordering and structured reporting."""

    def run_suite(self, suite: Suite, session,
                  quiet: bool = False) -> SuiteResult:
        """Run every test in *suite* in dependency order and collect results.

        A test whose dependency failed, errored, or was skipped is itself
        skipped — and then treated as failed for its own dependents.
        """
        collected: List[TestResult] = []
        blocked: set = set()  # names that failed, errored, or were skipped
        suite_start = time.perf_counter()
        for case in _topo_sort(suite.tests):
            failed_deps = [dep for dep in case.depends_on if dep in blocked]
            if failed_deps:
                outcome = TestResult(
                    name=case.name, status="skip",
                    message=f"Skipped: dependency failed ({', '.join(failed_deps)})")
                collected.append(outcome)
                blocked.add(case.name)
                if not quiet:
                    print(f" [SKIP] {case.name} — {outcome.message}")
                continue
            start = time.perf_counter()
            try:
                case.run(session)
                took = (time.perf_counter() - start) * 1000
                outcome = TestResult(name=case.name, status="pass",
                                     elapsed_ms=took, message="OK")
                if not quiet:
                    print(f" [PASS] {case.name} ({took:.0f}ms)")
            except UATAssertionError as exc:
                took = (time.perf_counter() - start) * 1000
                outcome = TestResult(
                    name=case.name, status="fail",
                    elapsed_ms=took, message=str(exc),
                    assertion_detail={"expected": exc.expected,
                                      "actual": exc.actual})
                blocked.add(case.name)
                if not quiet:
                    print(f" [FAIL] {case.name} — {exc}")
            except Exception as exc:
                took = (time.perf_counter() - start) * 1000
                outcome = TestResult(
                    name=case.name, status="error",
                    elapsed_ms=took, message=f"{type(exc).__name__}: {exc}")
                blocked.add(case.name)
                if not quiet:
                    print(f" [ERR] {case.name} — {type(exc).__name__}: {exc}")
            collected.append(outcome)
        return SuiteResult(
            suite_name=suite.name, results=collected,
            elapsed_ms=(time.perf_counter() - suite_start) * 1000)
# ─── Discovery ───────────────────────────────────────────────────────────────
def discover_suites() -> Dict[str, Suite]:
    """Auto-discover suites from uat/suites/ (get_suite() convention).

    Modules whose name starts with '_' are ignored; a module that fails
    to import or whose get_suite() raises is logged and skipped.
    """
    search_path = str(Path(__file__).parent / "suites")
    suites: Dict[str, Suite] = {}
    for _finder, name, _is_pkg in pkgutil.iter_modules([search_path]):
        if name.startswith("_"):
            continue  # private/helper modules are not suites
        try:
            module = importlib.import_module(f"hascom.uat.suites.{name}")
            getter = getattr(module, "get_suite", None)
            if getter is not None:
                suite = getter()
                suites[suite.name] = suite
        except Exception as e:
            log.warning("Failed to load suite '%s': %s", name, e)
    return suites
# ─── Reporting ───────────────────────────────────────────────────────────────
def print_report(result: SuiteResult):
    """Formatted CLI report."""
    bar = "=" * 60
    labels = {"pass": "PASS", "fail": "FAIL",
              "skip": "SKIP", "error": "ERR "}
    print(f"\n{bar}")
    print(f" Suite: {result.suite_name}")
    print(f" Time: {result.elapsed_ms:.0f}ms")
    print(bar)
    for r in result.results:
        pieces = [f" [{labels.get(r.status, '????')}] {r.name}"]
        if r.elapsed_ms:
            pieces.append(f" ({r.elapsed_ms:.0f}ms)")
        # Surface the message for anything that didn't pass cleanly.
        if r.status != "pass" and r.message:
            pieces.append(f" — {r.message}")
        print("".join(pieces))
    print(bar)
    passed = result.pass_count
    total = len(result.results)
    print(f" {passed}/{total} passed")
    print(f" Verdict: {'ALL PASS' if result.ok else 'FAILURES'}")
    print(f"{bar}\n")
def to_json(result: SuiteResult) -> dict:
    """Machine-readable dict for structured reporting."""
    per_test = []
    for r in result.results:
        per_test.append({
            "name": r.name,
            "status": r.status,
            "message": r.message,
            "elapsed_ms": round(r.elapsed_ms, 1),
        })
    return {
        "suite": result.suite_name,
        "ok": result.ok,
        "elapsed_ms": round(result.elapsed_ms, 1),
        "summary": {
            "total": len(result.results),
            "pass": result.pass_count,
            "fail": result.fail_count,
        },
        "tests": per_test,
    }