HASCOM Toolkit — Source Reference

Complete source code for John's system — 17 providers, 166 capabilities, 3,637 symbols

Generated 2026-02-14 from HASCOM v1.0.0 · taxonomy.db (14.7MB)

← Back to Consulting Handoff
17 Providers
166 Capabilities
3,637 Symbols
9,468 Files Indexed
357 Routes
905 Functions
168 Tables
1,865 Columns

Foundation Core Registry

core.py CORE 7.4 KB · 208 lines · Capability + Provider + HascomRegistry
The atomic foundation. Capability is the unit. Provider registers them. HascomRegistry is fractal — it is itself a capability. Auto-discovers providers from providers/ via pkgutil.iter_modules.
"""HASCOM Core — Capability, Provider, and HascomRegistry.

The fractal capability registry. A Capability is the atomic unit.
A Registry holds capabilities and is itself a capability.
"""

import asyncio
import importlib
import inspect
import json
import logging
import pkgutil
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional

log = logging.getLogger("hascom")

HASCOM_VERSION = "1.0.0"


@dataclass
class Capability:
    """One invokable unit — the smallest thing HASCOM registers.

    A capability pairs descriptive metadata with two callables: ``invoke``
    performs the work and ``status`` reports health.
    """
    name: str               # registry key, e.g. "mhsync.encrypt", "cf.d1_query"
    domain: str             # e.g. "local", "cloudflare", "syncropy", "system"
    description: str        # purpose, written for humans and AI readers
    interface: str          # "function" | "http" | "websocket" | "cli"
    invoke: Callable        # callable that does the work (sync or async)
    status: Callable        # () -> {"ok": bool, "detail": str}
    tags: List[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Return the manifest entry for this capability.

        The callables are left out. ``status`` is called once; if it raises,
        the entry reports "error" with the exception text as the detail.
        """
        try:
            health = self.status()
        except Exception as exc:
            health = {"ok": False, "detail": str(exc)}

        if health.get("ok"):
            status_label = "ok"
        else:
            status_label = "error"

        entry = {
            "domain": self.domain,
            "interface": self.interface,
            "description": self.description,
            "status": status_label,
            "status_detail": health.get("detail", ""),
            "tags": self.tags,
        }
        return entry


class Provider:
    """Base class for capability providers.

    Subclasses set ``domain`` and ``name`` and override :meth:`register`.
    ``HascomRegistry.register_provider`` stores the provider under ``name``.
    """
    domain: str = "unknown"
    name: str = "unknown"

    def register(self, registry: 'HascomRegistry') -> List[Capability]:
        """Return list of capabilities to register. Override in subclass.

        Args:
            registry: The registry that is collecting capabilities.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError


class HascomRegistry:
    """The fractal registry. Is itself a capability.

    Capabilities are stored by name and providers by provider name. The
    registry can describe itself as a capability via :meth:`as_capability`,
    which :meth:`load_providers` registers after provider discovery.
    """

    def __init__(self, node_id: str = "", machine: str = ""):
        """Create an empty registry.

        Args:
            node_id: Value written to the manifest as ``node_id``.
            machine: Value written to the manifest as ``machine``.
        """
        self.capabilities: Dict[str, "Capability"] = {}
        self.providers: Dict[str, "Provider"] = {}
        self.node_id = node_id
        self.machine = machine
        # The manifest file lives next to this module.
        self._manifest_path = Path(__file__).parent / "hascom_manifest.json"

    def register(self, cap: "Capability") -> None:
        """Register a single capability, replacing any same-named entry."""
        if cap.name in self.capabilities:
            log.warning("Overwriting capability: %s", cap.name)
        self.capabilities[cap.name] = cap
        log.debug("Registered: %s [%s/%s]", cap.name, cap.domain, cap.interface)

    def register_provider(self, provider: "Provider") -> None:
        """Register a provider and every capability it returns."""
        caps = provider.register(self)
        for cap in caps:
            self.register(cap)
        self.providers[provider.name] = provider
        log.info("Provider '%s' registered %d capabilities", provider.name, len(caps))

    def find(self, query: Optional[str] = None, domain: Optional[str] = None,
             tags: Optional[list] = None) -> List["Capability"]:
        """Search capabilities by keyword, domain, or tags.

        Filters are combined with AND; a filter that is empty or None is
        skipped.

        Args:
            query: Case-insensitive substring matched against name,
                description, and each tag.
            domain: Exact domain to keep.
            tags: Keep capabilities sharing at least one of these tags.

        Returns:
            Matching capabilities, in registration order.
        """
        results = list(self.capabilities.values())

        if domain:
            results = [c for c in results if c.domain == domain]

        if tags:
            wanted = set(tags)
            results = [c for c in results if wanted & set(c.tags)]

        if query:
            q = query.lower()
            results = [c for c in results
                       if q in c.name.lower()
                       or q in c.description.lower()
                       or any(q in t.lower() for t in c.tags)]

        return results

    def invoke(self, name: str, **kwargs) -> Any:
        """Invoke a capability by name and return its result.

        Coroutine functions are driven to completion so the caller always
        gets a plain value back.

        Args:
            name: Registered capability name.
            **kwargs: Passed through to the capability's ``invoke`` callable.

        Raises:
            KeyError: If no capability is registered under ``name``.
        """
        cap = self.capabilities.get(name)
        if cap is None:
            raise KeyError(f"Capability not found: {name}")

        fn = cap.invoke
        if not inspect.iscoroutinefunction(fn):
            return fn(**kwargs)

        # get_running_loop() only reports a loop running in *this* thread and
        # never creates one. get_event_loop() raised RuntimeError in worker
        # threads and in the main thread after an earlier asyncio.run().
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return asyncio.run(fn(**kwargs))

        # A loop is already running here, so asyncio.run() cannot be used in
        # this thread. Run the coroutine on a fresh loop in a helper thread;
        # .result() blocks the caller until it finishes.
        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, fn(**kwargs)).result()

    def status(self) -> dict:
        """Health of all registered capabilities.

        Each capability's ``status`` callable is invoked; one that raises is
        recorded as not ok with the exception text as detail.

        Returns:
            Dict with ``ok`` (True when nothing failed), ``total``,
            ``ok_count``, ``error_count`` and per-name ``capabilities``.
        """
        results = {}
        ok_count = 0
        err_count = 0
        for name, cap in sorted(self.capabilities.items()):
            try:
                st = cap.status()
            except Exception as e:
                st = {"ok": False, "detail": str(e)}
            results[name] = st
            if st.get("ok"):
                ok_count += 1
            else:
                err_count += 1
        return {
            "ok": err_count == 0,
            "total": len(self.capabilities),
            "ok_count": ok_count,
            "error_count": err_count,
            "capabilities": results,
        }

    def manifest(self, save: bool = True) -> dict:
        """Full JSON capability catalog.

        Args:
            save: When True, also write the catalog to the manifest file.
                A failed write is logged and does not raise.

        Returns:
            The catalog document.
        """
        caps = {name: cap.to_dict()
                for name, cap in sorted(self.capabilities.items())}

        doc = {
            "hascom_version": HASCOM_VERSION,
            "node_id": self.node_id,
            "machine": self.machine,
            "generated_at": time.strftime("%Y-%m-%dT%H:%M:%S%z"),
            "capabilities": caps,
            "providers": sorted(self.providers.keys()),
            "total_capabilities": len(caps),
        }

        if save:
            try:
                with open(self._manifest_path, "w", encoding="utf-8") as f:
                    json.dump(doc, f, indent=2)
                log.debug("Manifest written: %s", self._manifest_path)
            except OSError as e:
                log.warning("Could not write manifest: %s", e)

        return doc

    def as_capability(self) -> "Capability":
        """Fractal: the registry as a capability.

        Invoking it returns the manifest without writing it to disk; its
        status is always ok and reports the capability count.
        """
        return Capability(
            name="hascom.registry",
            domain="local",
            description="HASCOM capability registry — query, invoke, and discover capabilities",
            interface="function",
            invoke=lambda **kw: self.manifest(save=False),
            status=lambda: {"ok": True, "detail": f"{len(self.capabilities)} capabilities"},
            tags=["hascom", "registry", "meta"],
        )

    def load_providers(self) -> None:
        """Auto-discover and load all providers from the providers/ package.

        Modules whose name starts with "_" are skipped. A module contributes
        a provider through a module-level ``get_provider()``. A module that
        fails to import or register is logged and skipped.
        """
        providers_dir = Path(__file__).parent / "providers"
        if not providers_dir.exists():
            # NOTE(review): on this path the registry does not register
            # itself as a capability — confirm that is intended.
            log.warning("providers/ directory not found")
            return

        for _importer, modname, _ispkg in pkgutil.iter_modules([str(providers_dir)]):
            if modname.startswith("_"):
                continue
            try:
                mod = importlib.import_module(f"hascom.providers.{modname}")
                if hasattr(mod, "get_provider"):
                    provider = mod.get_provider()
                    self.register_provider(provider)
                else:
                    log.debug("Provider module '%s' has no get_provider()", modname)
            except Exception as e:
                # Best effort: one broken provider must not stop the others.
                log.warning("Failed to load provider '%s': %s", modname, e)

        # Register self as capability (fractal)
        self.register(self.as_capability())

Knowledge Symbol-Level Indexing Engine

analyzers.py CORE 25 KB · 663 lines · Route + function + schema + wiring extraction
Four analyzers that turn source files into searchable symbols: analyze_worker() — Cloudflare Worker routes + handlers, analyze_frontend() — API calls + event listeners, analyze_schema() — CREATE TABLE + columns from migrations, analyze_wiring() — cross-references api_call → route relationships. All general-purpose — not SubX-specific.
"""HASCOM Analyzers — symbol-level indexing for large source files.

Extracts routes, functions, API calls, tables, and columns from source files
and stores them in the taxonomy symbols table. Turns file-level knowledge
into function-level knowledge.

Analyzers:
  analyze_worker(path)    → routes + handlers from Cloudflare Worker .js files
  analyze_frontend(path)  → functions + API calls from frontend .html/.js files
  analyze_schema(path)    → tables + columns from migration .sql files
  analyze_wiring(db)      → cross-reference api_call → route relationships

Usage:
    from hascom.analyzers import analyze_all
    stats = analyze_all(db)  # runs all analyzers, stores results, returns a stats dict

All analyzers are general-purpose: they work on any Worker, any frontend,
any SQL migration. Not SubX-specific.
"""

import json
import logging
import re
from pathlib import Path
from typing import Dict, List, Optional, Tuple

log = logging.getLogger("hascom.analyzers")


# ─── Domain detection ────────────────────────────────────────────────────────

def _detect_domain(path: str) -> str:
    """Infer domain from file path."""
    lower = path.lower()
    for name in ["consenta", "hascom", "mhsync", "quanticfork", "athena"]:
        if name in lower:
            return name
    if "weyland" in lower or "subx" in lower or "prototype1" in lower:
        return "weyland"
    return "unknown"


# ─── Worker Route Analyzer ───────────────────────────────────────────────────

# Patterns for Cloudflare Worker route matching
# analyze_worker() tries every pattern below against each stripped,
# non-comment line. Only the first pattern captures (method, path); the
# others capture the path alone, so the method is inferred from nearby lines.
_ROUTE_PATTERNS = [
    # itty-router style: router.get('/api/...', ...)
    re.compile(
        r"""router\.(get|post|put|patch|delete|options|all)\s*\(\s*['"`](/[^'"`]+)['"`]""",
        re.IGNORECASE,
    ),
    # Direct equality: path === '/api/...'
    # (== or ===; only paths under /api/, /q/ or /quote/)
    re.compile(
        r"""(?:url\.pathname|path)\s*===?\s*['"`](/(?:api|q|quote)/[^'"`]+)['"`]""",
    ),
    # startsWith: url.pathname.startsWith('/api/...')
    re.compile(
        r"""(?:url\.pathname|path)\.startsWith\s*\(\s*['"`](/(?:api|q|quote)/[^'"`]+)['"`]\s*\)""",
    ),
    # .match with regex
    # (a JS regex literal; analyze_worker() strips backslashes from the capture)
    re.compile(
        r"""(?:url\.pathname|path)\.match\s*\(\s*/\^?\\?(/(?:api|q|quote)/[^\s/]+(?:/[^\s/]+)*)/""",
    ),
]

# An explicit HTTP-method comparison such as `request.method === 'POST'`.
# Case-sensitive, and OPTIONS is not among the recognised methods.
_METHOD_PATTERN = re.compile(
    r"""(?:method|request\.method)\s*===?\s*['"`](GET|POST|PUT|PATCH|DELETE)['"`]""",
)

# JavaScript function definitions; group 1 is the function name. Every
# pattern is anchored with ^, and the analyzers search already-stripped lines.
_FUNC_PATTERNS = [
    # function name(  /  async function name(
    re.compile(r"^(?:async\s+)?function\s+(\w+)\s*\("),
    # const name = function  /  const name = async function
    re.compile(r"^(?:const|let|var)\s+(\w+)\s*=\s*(?:async\s+)?function"),
    # const name = (  /  const name = async (
    re.compile(r"^(?:const|let|var)\s+(\w+)\s*=\s*(?:async\s+)?\("),
    # export function name(  /  export async function name(
    re.compile(r"^export\s+(?:async\s+)?function\s+(\w+)\s*\("),
]


def _nearest_method(lines: List[str], center: int, radius: int = 5) -> Optional[str]:
    """Return the HTTP method compared closest to ``lines[center]``, or None."""
    for distance in range(radius + 1):
        if distance == 0:
            candidates = (center,)
        else:
            # At equal distance the line above wins over the line below.
            candidates = (center - distance, center + distance)
        for idx in candidates:
            if 0 <= idx < len(lines):
                found = _METHOD_PATTERN.search(lines[idx])
                if found:
                    return found.group(1)
    return None


def analyze_worker(path: str) -> List[dict]:
    """Extract routes and handler functions from a Cloudflare Worker JS file.

    The file is scanned line by line. Lines that start with ``//``, ``/*``
    or ``*`` after stripping are skipped as comments.

    Args:
        path: Path of the worker source file.

    Returns:
        Symbol dicts with keys ``path``, ``name``, ``line`` (1-based),
        ``symbol_type`` ("function" or "route"), ``parent``, ``domain`` and
        ``metadata``. Empty when ``path`` is not an existing file.
    """
    path_obj = Path(path)
    # is_file() also rejects directories, which read_text() cannot open.
    if not path_obj.is_file():
        return []

    content = path_obj.read_text(encoding="utf-8", errors="replace")
    lines = content.split("\n")
    domain = _detect_domain(path)
    symbols = []
    current_function = None     # last recorded function; parent of later routes
    recent_method = None        # last method comparison seen while scanning
    recent_method_line = 0

    for i, line in enumerate(lines, 1):
        stripped = line.strip()
        if stripped.startswith(("//", "/*", "*")):
            continue

        for fp in _FUNC_PATTERNS:
            m = fp.search(stripped)
            if m:
                fname = m.group(1)
                if len(fname) > 2 and not fname.startswith("_U"):
                    current_function = fname
                    symbols.append({
                        "path": path, "name": fname, "line": i,
                        "symbol_type": "function", "parent": None,
                        "domain": domain, "metadata": None,
                    })
                break

        mm = _METHOD_PATTERN.search(stripped)
        if mm:
            recent_method = mm.group(1)
            recent_method_line = i

        for rp in _ROUTE_PATTERNS:
            rm = rp.search(stripped)
            if rm:
                if rm.lastindex >= 2:
                    # Router-style pattern: method and path are both captured.
                    method = rm.group(1).upper()
                    route = rm.group(2)
                else:
                    route = rm.group(1).replace("\\", "")
                    method = "?"
                    # Fallback: the last method comparison seen, if recent.
                    if recent_method and (i - recent_method_line) < 20:
                        method = recent_method
                    # Preferred: the comparison nearest to this line, checking
                    # the route's own line first. Scanning the window top-down
                    # used to pick up the previous route's method instead.
                    nearby = _nearest_method(lines, i - 1)
                    if nearby:
                        method = nearby

                symbols.append({
                    "path": path, "name": f"{method} {route}", "line": i,
                    "symbol_type": "route", "parent": current_function,
                    "domain": domain,
                    "metadata": {"method": method, "pattern": route},
                })

    log.info("analyze_worker(%s): %d symbols", path_obj.name, len(symbols))
    return symbols


# ─── Frontend Analyzer ───────────────────────────────────────────────────────

# Frontend API call sites; group 1 is the requested path. analyze_frontend()
# keeps a capture only if it contains "/api/" or starts with "/q".
_API_CALL_PATTERNS = [
    # callAPI('<path>') with a quoted argument.
    # NOTE(review): the opening character class also accepts a bare "/" in
    # place of a quote — confirm that is intended.
    re.compile(r"""callAPI\s*\(\s*['"`/]([^'"`\n]+?)['"`]"""),
    # fetch('<url>') where the quoted URL contains /api/.
    re.compile(r"""fetch\s*\(\s*['"`]([^'"`\n]*?/api/[^'"`\n]+?)['"`]"""),
    # fetch(`...`) template literal; the capture starts at /api/.
    re.compile(r"""fetch\s*\(\s*`[^`]*?(/api/[^`\n]+?)`"""),
    # callAPI(`...`) template literal.
    re.compile(r"""callAPI\s*\(\s*`([^`\n]+?)`"""),
]

# Event handler bindings.
_LISTENER_PATTERNS = [
    # Inline attribute: onclick="handler(" — group 1 is the handler name.
    re.compile(r"""onclick\s*=\s*["'](\w+)\s*\("""),
    # addEventListener('event', handler) — groups are (event, handler).
    re.compile(r"""addEventListener\s*\(\s*['"](\w+)['"],\s*(\w+)"""),
]


def analyze_frontend(path: str) -> List[dict]:
    """Collect functions, API calls and event listeners from a frontend file.

    Args:
        path: Path of the HTML/JS source file.

    Returns:
        Symbol dicts of type "function", "api_call" or "event_listener".
        API calls are de-duplicated per (name, enclosing function). Empty
        when ``path`` does not exist.
    """
    source = Path(path)
    if not source.exists():
        return []

    text = source.read_text(encoding="utf-8", errors="replace")
    domain = _detect_domain(path)
    found = []
    enclosing = None    # most recently recorded function name

    for lineno, raw_line in enumerate(text.split("\n"), 1):
        code = raw_line.strip()
        if code.startswith(("<!--", "*/")):
            continue

        # Only the first function pattern that matches is considered.
        fn_match = next(
            (m for m in (p.search(code) for p in _FUNC_PATTERNS) if m), None)
        if fn_match is not None:
            fn_name = fn_match.group(1)
            if len(fn_name) > 2:
                enclosing = fn_name
                found.append({
                    "path": path, "name": fn_name, "line": lineno,
                    "symbol_type": "function", "parent": None,
                    "domain": domain, "metadata": None,
                })

        for pattern in _API_CALL_PATTERNS:
            for hit in pattern.finditer(code):
                # Template placeholders ${x} become :x route parameters.
                endpoint = re.sub(r"\$\{([^}]+)\}", r":\1", hit.group(1))
                if "/api/" in endpoint or endpoint.startswith("/q"):
                    found.append({
                        "path": path, "name": endpoint, "line": lineno,
                        "symbol_type": "api_call", "parent": enclosing,
                        "domain": domain,
                        "metadata": {"caller": enclosing},
                    })

        for pattern in _LISTENER_PATTERNS:
            for hit in pattern.finditer(code):
                if hit.lastindex == 1:
                    # onclick="handler(": only the handler is captured.
                    handler, event = hit.group(1), "click"
                else:
                    # addEventListener: groups are (event, handler).
                    handler, event = hit.group(2), hit.group(1)
                if len(handler) > 2 and handler[0].islower():
                    found.append({
                        "path": path, "name": handler, "line": lineno,
                        "symbol_type": "event_listener", "parent": None,
                        "domain": domain,
                        "metadata": {"event": event},
                    })

    # Drop repeated api_calls; other symbol types pass through untouched.
    unique = []
    seen_calls = set()
    for sym in found:
        if sym["symbol_type"] == "api_call":
            ident = (sym["name"], sym.get("parent"))
            if ident in seen_calls:
                continue
            seen_calls.add(ident)
        unique.append(sym)

    log.info("analyze_frontend(%s): %d symbols", source.name, len(unique))
    return unique


# ─── Schema Analyzer ─────────────────────────────────────────────────────────

_CREATE_TABLE = re.compile(r"CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?(\w+)\s*\(", re.IGNORECASE)
_ALTER_TABLE_ADD = re.compile(r"ALTER\s+TABLE\s+(\w+)\s+ADD\s+(?:COLUMN\s+)?(\w+)\s+(\w+)", re.IGNORECASE)
_COLUMN_DEF = re.compile(
    r"^\s+(\w+)\s+(TEXT|INTEGER|REAL|BLOB|NUMERIC|INT|VARCHAR|BOOLEAN|DATETIME|TIMESTAMP)"
    r"(?:\s+(.*?))?(?:,\s*)?$",
    re.IGNORECASE,
)


def analyze_schema(migrations_source: str) -> List[dict]:
    """Collect table and column symbols from SQL migrations.

    Args:
        migrations_source: A single file, or a directory whose ``*.sql``
            files are processed in sorted order.

    Returns:
        Symbol dicts of type "table" or "column", one per
        (name, symbol_type); when a symbol is defined more than once the
        last definition's data is kept. Empty when the source is missing.
    """
    source = Path(migrations_source)
    if not source.exists():
        return []

    domain = _detect_domain(migrations_source)
    if source.is_file():
        sql_files = [source]
    else:
        sql_files = sorted(source.glob("*.sql"))

    # Words that can start a table-level clause and so are not column names.
    non_column_words = ("PRIMARY", "UNIQUE", "FOREIGN",
                        "CHECK", "CONSTRAINT", "CREATE",
                        "INDEX", "ON", "IF", "NOT")
    symbols = []

    for sql_file in sql_files:
        file_path = str(sql_file)
        file_name = sql_file.name
        text = sql_file.read_text(encoding="utf-8", errors="replace")
        open_table = None   # table whose column list is currently being read

        for lineno, line in enumerate(text.split("\n"), 1):
            created = _CREATE_TABLE.search(line)
            if created:
                open_table = created.group(1)
                symbols.append({
                    "path": file_path, "name": open_table, "line": lineno,
                    "symbol_type": "table", "parent": None,
                    "domain": domain,
                    "metadata": {"source": file_name},
                })
                continue

            if open_table:
                if line.strip().startswith(")"):
                    # Closing parenthesis ends the column list.
                    open_table = None
                    continue
                column = _COLUMN_DEF.match(line)
                if column:
                    col_name = column.group(1)
                    if col_name.upper() in non_column_words:
                        continue
                    constraints = (column.group(3) or "").strip()
                    upper_constraints = constraints.upper()
                    meta = {"column_type": column.group(2).upper(),
                            "source": file_name}
                    if "NOT NULL" in upper_constraints:
                        meta["not_null"] = True
                    if "DEFAULT" in upper_constraints:
                        default = re.search(r"DEFAULT\s+(\S+)", constraints, re.I)
                        if default:
                            meta["default"] = default.group(1).strip("'\"")
                    symbols.append({
                        "path": file_path, "name": f"{open_table}.{col_name}",
                        "line": lineno, "symbol_type": "column",
                        "parent": open_table, "domain": domain,
                        "metadata": meta,
                    })

            added = _ALTER_TABLE_ADD.search(line)
            if added:
                table, col_name, col_type = added.group(1), added.group(2), added.group(3)
                symbols.append({
                    "path": file_path,
                    "name": f"{table}.{col_name}",
                    "line": lineno, "symbol_type": "column",
                    "parent": table, "domain": domain,
                    "metadata": {"column_type": col_type.upper(),
                                 "source": file_name, "added_by": "ALTER TABLE"},
                })

    # Later definitions overwrite earlier ones but keep the first position.
    by_identity = {(s["name"], s["symbol_type"]): s for s in symbols}
    deduped = list(by_identity.values())

    log.info("analyze_schema(%s): %d symbols from %d files",
             source.name, len(deduped), len(sql_files))
    return deduped


# ─── Wiring Analyzer ─────────────────────────────────────────────────────────

def analyze_wiring(db) -> dict:
    """Link frontend api_call symbols to the backend routes they hit.

    Route and api_call rows are read from the ``symbols`` table. ``:param``
    segments are normalised to ``*`` on both sides, and each call is linked
    to the first route that ``_routes_match`` accepts by storing a "calls"
    relationship through ``db.add_relationship``.

    Args:
        db: Object exposing ``connect()`` and ``add_relationship(...)``.

    Returns:
        Counters: ``connected``, ``orphaned_routes``, ``orphaned_calls``
        and ``relationships_created``.
    """
    conn = db.connect()
    routes = conn.execute("SELECT * FROM symbols WHERE symbol_type = 'route'").fetchall()
    api_calls = conn.execute("SELECT * FROM symbols WHERE symbol_type = 'api_call'").fetchall()

    stats = {"connected": 0, "orphaned_routes": 0, "orphaned_calls": 0,
             "relationships_created": 0}

    # Normalised route path -> route row. Names look like "<METHOD> <path>";
    # a later row with the same normalised path replaces an earlier one.
    route_map = {}
    for row in routes:
        record = dict(row)
        label = record.get("name", "")
        _method, space, remainder = label.partition(" ")
        route_path = remainder if space else label
        route_map[re.sub(r":\w+", "*", route_path)] = record

    matched_routes = set()
    for row in api_calls:
        call = dict(row)
        call_norm = re.sub(r":\w+", "*", call["name"])
        for route_norm, route_row in route_map.items():
            if not _routes_match(call_norm, route_norm):
                continue
            matched_routes.add(route_norm)
            stats["connected"] += 1
            db.add_relationship(
                source=f"{call['path']}:{call['line']}",
                target=f"{route_row['path']}:{route_row['line']}",
                rel_type="calls",
                metadata={"caller": call.get("parent"),
                          "route": route_row["name"],
                          "api_path": call["name"]},
            )
            stats["relationships_created"] += 1
            break       # only the first matching route is linked
        else:
            stats["orphaned_calls"] += 1

    stats["orphaned_routes"] += len(route_map.keys() - matched_routes)

    log.info("analyze_wiring: %d connected, %d orphaned routes, %d orphaned calls",
             stats["connected"], stats["orphaned_routes"], stats["orphaned_calls"])
    return stats


def _routes_match(call_pattern: str, route_pattern: str) -> bool:
    """Check if a frontend API call matches a backend route."""
    if call_pattern == route_pattern:
        return True
    call_parts = [p for p in call_pattern.split("/") if p]
    route_parts = [p for p in route_pattern.split("/") if p]
    min_len = min(len(call_parts), len(route_parts))
    if min_len < 2:
        return False
    for j in range(min_len):
        cp, rp = call_parts[j], route_parts[j]
        if cp == "*" or rp == "*":
            continue
        if cp != rp:
            return False
    return True


# ─── Main entry points ───────────────────────────────────────────────────────

# Worker scripts that analyze_all() passes to analyze_worker().
# NOTE(review): these are absolute Windows paths, and the "..." segment is
# part of the literal string — presumably elided in this listing; confirm the
# real paths before relying on them.
KNOWN_WORKERS = [
    r"C:\AthenaSystem\...\Prototype1\weyland-worker.js",
    r"C:\MHS\Ventures\Consenta\consenta-worker.js",
]

# Frontend pages that analyze_all() passes to analyze_frontend().
KNOWN_FRONTENDS = [
    r"C:\AthenaSystem\...\deployment\public\subx.html",
    r"C:\MHS\Ventures\Consenta\deployment\public\consenta.html",
]

# Migration directories that analyze_all() passes to analyze_schema().
KNOWN_MIGRATIONS = [
    r"C:\AthenaSystem\...\Prototype1\migrations",
    r"C:\MHS\Ventures\Consenta\migrations",
]

def analyze_all(db=None) -> dict:
    """Run every analyzer over the known sources and store the symbols.

    For each existing source that yields symbols, its previously stored
    symbols are cleared and the new ones inserted. The wiring analysis then
    runs and a "symbols_analyzed" change entry is logged.

    Args:
        db: Taxonomy database; a ``TaxonomyDB`` is created and connected
            when omitted.

    Returns:
        Stats dict with per-file counts under ``workers``, ``frontends`` and
        ``schema``, the ``wiring`` counters, and ``total_symbols``.
    """
    from hascom.taxonomy import TaxonomyDB
    if db is None:
        db = TaxonomyDB()
        db.connect()

    stats = {"workers": {}, "frontends": {}, "schema": {}, "wiring": {},
             "total_symbols": 0}

    # NOTE(review): KNOWN_MODULES is not defined in the visible part of this
    # module — presumably declared elsewhere in the file; verify.
    single_file_jobs = (
        ("workers", KNOWN_WORKERS + KNOWN_MODULES, analyze_worker),
        ("frontends", KNOWN_FRONTENDS, analyze_frontend),
    )
    for bucket, paths, analyzer in single_file_jobs:
        for path in paths:
            if not Path(path).exists():
                continue
            symbols = analyzer(path)
            if not symbols:
                continue
            db.clear_symbols(path)
            db.insert_symbols(symbols)
            stats[bucket][Path(path).name] = len(symbols)
            stats["total_symbols"] += len(symbols)

    for mdir in KNOWN_MIGRATIONS:
        migrations = Path(mdir)
        if not migrations.exists():
            continue
        symbols = analyze_schema(mdir)
        if not symbols:
            continue
        # Symbols are stored per SQL file, so each file is cleared separately.
        for sql_file in migrations.glob("*.sql"):
            db.clear_symbols(str(sql_file))
        db.insert_symbols(symbols)
        stats["schema"][migrations.name] = len(symbols)
        stats["total_symbols"] += len(symbols)

    stats["wiring"] = analyze_wiring(db)
    db.log_change("symbols_analyzed", detail=stats)
    log.info("analyze_all complete: %d total symbols", stats["total_symbols"])
    return stats

Testing UAT Toolkit

uat/__init__.py UAT 1.3 KB · 37 lines · Package exports
Public API surface of the UAT toolkit. Exports all assertions, the runner, and session management. from hascom.uat import * gives you everything.
"""HASCOM UAT Toolkit — general-purpose API testing framework.

Drop test suites in uat/suites/ → auto-discovered → run via CLI or capability.
"""

from hascom.uat.http import (
    UATSession, Response, api_call,
    decode_jwt_expiry, decode_jwt_claims,
)
from hascom.uat.assertions import (
    UATAssertionError,
    assert_ok, assert_status, assert_status_in,
    assert_field, assert_field_equals, assert_field_type, assert_field_in,
    assert_fields_present, assert_content_type,
    assert_numeric_equals, assert_greater_than,
    assert_pdf_valid, assert_pdf_size,
    assert_list_length, assert_latency,
)
from hascom.uat.runner import (
    TestCase, Suite, TestResult, SuiteResult,
    Runner, discover_suites, print_report, to_json,
)

# Names re-exported by ``from hascom.uat import *``: the HTTP layer, the
# assertion helpers, and the runner types imported above.
# NOTE(review): 15 assert_* helpers are listed here while the assertions
# module describes itself as having 22 — confirm whether the remaining ones
# should be exported too.
__all__ = [
    "UATSession", "Response", "api_call",
    "decode_jwt_expiry", "decode_jwt_claims",
    "UATAssertionError",
    "assert_ok", "assert_status", "assert_status_in",
    "assert_field", "assert_field_equals", "assert_field_type", "assert_field_in",
    "assert_fields_present", "assert_content_type",
    "assert_numeric_equals", "assert_greater_than",
    "assert_pdf_valid", "assert_pdf_size",
    "assert_list_length", "assert_latency",
    "TestCase", "Suite", "TestResult", "SuiteResult",
    "Runner", "discover_suites", "print_report", "to_json",
]
uat/http.py UAT 13.3 KB · 382 lines · UATSession + Response + api_call + JWT auto-refresh
HTTP layer for API testing. UATSession holds base URL + JWT token. Response has structured field access (resp.field("data.quotes[0].id")). api_call() is the generic caller (urllib, no external deps). Auto-token-refresh via daemon bridge or devtools fallback.
"""UAT HTTP layer — session, response, and generic API caller.

Includes auto-token-refresh: decode JWT, check expiry, refresh via
daemon bridge (HTTP eval) or devtools.get_token fallback.
"""

import base64
import json
import logging
import time
import urllib.request
import urllib.error
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

log = logging.getLogger("hascom.uat.http")


@dataclass
class Response:
    """Structured HTTP response for UAT assertions."""
    status_code: int
    body: Any               # parsed JSON or None
    headers: Dict[str, str]
    elapsed_ms: float
    raw_bytes: Optional[bytes] = None
    error: Optional[str] = None

    @property
    def ok(self) -> bool:
        """True for any 2xx status code."""
        return 200 <= self.status_code < 300

    def field(self, dot_path: str) -> Any:
        """Navigate nested fields of ``body``.

        Segments are separated by dots. A segment is a dict key (or a list
        position when the current value is a list), optionally followed by
        one or more ``[n]`` indexes. A segment may also consist of indexes
        only, which allows a list body to be addressed directly.

        Examples: ``data.quotes[0].id``, ``data.quotes.0.id``,
        ``data.grid[1][0]``, ``[0].id``.

        Raises:
            KeyError: A key is missing, or the current value is neither a
                dict nor a list.
            IndexError: A list index is out of range.
            ValueError: An index is not an integer.
        """
        obj = self.body
        for part in dot_path.split('.'):
            # "quotes[0][1]" -> key "quotes", indexes ["0", "1"];
            # "[0]" -> key "", indexes ["0"].
            key, *indexes = part.replace(']', '').split('[')
            if key or not indexes:
                if isinstance(obj, dict):
                    obj = obj[key]
                elif isinstance(obj, list):
                    obj = obj[int(key)]
                else:
                    raise KeyError(f"Cannot traverse {part!r} on {type(obj).__name__}")
            for idx in indexes:
                obj = obj[int(idx)]
        return obj


class UATSession:
    """Connection settings shared by every call in one UAT run.

    Attributes:
        base_url: API root with trailing slashes removed.
        token: Auth token; may be empty.
        context: Dict for values shared across a test run.
    """

    def __init__(self, base_url: str, token: str = ""):
        self.context: Dict[str, Any] = {}
        self.token = token
        self.base_url = base_url.rstrip('/')

    @classmethod
    def from_config(cls, auto_refresh: bool = True,
                    site: str = "subx") -> 'UATSession':
        """Build a session from the HASCOM config.

        Reads ``uat_base_url`` and ``uat_token``. With ``auto_refresh`` on
        and a token present, an expired token triggers a refresh attempt for
        ``site``; a token with under an hour left is only logged.
        """
        from hascom.config import load_hascom_config
        settings = load_hascom_config()
        base_url = settings.get("uat_base_url",
                                "https://weyland.johnmobley99.workers.dev")
        token = settings.get("uat_token", "")
        session = cls(base_url=base_url, token=token)

        if not (auto_refresh and token):
            return session

        expiry = decode_jwt_expiry(token)
        if expiry is None:
            # No usable expiry claim, so there is nothing to check.
            return session

        if expiry < time.time():
            log.info("JWT expired — attempting auto-refresh")
            refreshed = _auto_refresh_token(site)
            if refreshed:
                session.token = refreshed
            return session

        remaining = expiry - time.time()
        if remaining < 3600:
            log.info("JWT expires in %.0f minutes", remaining / 60)
        return session

    def validate_token(self) -> bool:
        """Return True when GET /api/auth/me answers with a 2xx status."""
        return api_call(self, "/api/auth/me", timeout=10).ok


def api_call(session: UATSession, path: str, method: str = "GET",
             data: Any = None, timeout: int = 30,
             raw: bool = False) -> Response:
    """Generic HTTP call using session credentials.

    Args:
        session: Supplies ``base_url`` and, when non-empty, the bearer token.
        path: Request path appended to ``session.base_url``.
        method: HTTP method.
        data: When not None, sent as a JSON body.
        timeout: Timeout in seconds passed to ``urlopen``.
        raw: When True, a successful response is returned unparsed
            (``body`` is None and the payload is in ``raw_bytes``).

    Returns:
        A ``Response``. HTTP error statuses are returned with their status
        code and parsed body; any other failure while opening or reading
        the response is returned with ``status_code`` 0 and ``error`` set.
        Errors raised while building the request propagate.
    """
    url = f"{session.base_url}{path}"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                      "AppleWebKit/537.36 Chrome/131.0.0.0 Safari/537.36",
        "Accept": "application/json, text/html, */*",
    }
    if session.token:
        headers["Authorization"] = f"Bearer {session.token}"

    body_bytes = None
    if data is not None:
        headers["Content-Type"] = "application/json"
        body_bytes = json.dumps(data).encode("utf-8")

    req = urllib.request.Request(url, data=body_bytes, headers=headers,
                                method=method)
    t0 = time.perf_counter()

    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            # Elapsed time is taken before the body is read.
            elapsed = (time.perf_counter() - t0) * 1000
            raw_data = resp.read()
            resp_headers = {k: v for k, v in resp.getheaders()}
            if raw:
                return Response(status_code=resp.status, body=None,
                                headers=resp_headers, elapsed_ms=elapsed,
                                raw_bytes=raw_data)
            try:
                parsed = json.loads(raw_data.decode("utf-8"))
            except (json.JSONDecodeError, UnicodeDecodeError):
                parsed = None
            return Response(status_code=resp.status, body=parsed,
                            headers=resp_headers, elapsed_ms=elapsed,
                            raw_bytes=raw_data)

    except urllib.error.HTTPError as e:
        elapsed = (time.perf_counter() - t0) * 1000
        raw_err = None
        text = ""
        try:
            raw_err = e.read()
            text = raw_err.decode("utf-8", errors="replace")
        except Exception as read_exc:
            # Best effort: the status code is still worth reporting.
            log.debug("Could not read HTTP %s error body: %s", e.code, read_exc)
        # Only the human-readable detail is truncated. JSON is parsed from
        # the full text, because a truncated document would never parse.
        detail = text[:2000]
        try:
            parsed = json.loads(text)
        except (json.JSONDecodeError, TypeError):
            parsed = {"error": detail}
        return Response(status_code=e.code, body=parsed,
                        headers={k: v for k, v in e.headers.items()} if e.headers else {},
                        elapsed_ms=elapsed, raw_bytes=raw_err, error=detail)

    except Exception as e:
        # Transport-level failure (DNS, refused connection, timeout, ...).
        elapsed = (time.perf_counter() - t0) * 1000
        return Response(status_code=0, body=None, headers={},
                        elapsed_ms=elapsed, error=str(e))


# ─── JWT decode + auto-refresh ───────────────────────────────────────────────

def decode_jwt_expiry(token: str) -> Optional[float]:
    """Return the ``exp`` claim of a JWT as a float, or None.

    Only the middle (payload) segment is inspected: it is base64url-decoded
    and parsed as JSON.  The signature is not checked.  None is returned
    when the token does not consist of exactly three dot-separated
    segments, when decoding or parsing fails for any reason, or when
    ``exp`` is absent or zero.
    """
    try:
        segments = token.split(".")
        if len(segments) != 3:
            return None
        encoded = segments[1]
        # JWT segments omit base64 padding; top up to a multiple of four.
        encoded += "=" * (-len(encoded) % 4)
        claims = json.loads(base64.urlsafe_b64decode(encoded))
        expiry = float(claims.get("exp", 0))
        return expiry if expiry else None
    except Exception:
        # Best-effort decoder: any malformed input simply yields None.
        return None


def decode_jwt_claims(token: str) -> Optional[dict]:
    """Decode the payload segment of a JWT and return its claims.

    The middle segment is base64url-decoded and parsed as JSON.  The
    signature is not checked.

    Returns:
        The claims dict, or None when the token does not have exactly
        three dot-separated segments, when decoding/parsing fails, or when
        the payload is valid JSON but not an object (so the declared
        ``Optional[dict]`` return type always holds).
    """
    try:
        parts = token.split(".")
        if len(parts) != 3:
            return None
        payload_b64 = parts[1]
        # JWT segments omit base64 padding; restore it before decoding.
        payload_b64 += "=" * (-len(payload_b64) % 4)
        claims = json.loads(base64.urlsafe_b64decode(payload_b64))
    except Exception:
        # Best-effort decoder: malformed input yields None, never a crash.
        return None
    # A JSON array/string/number is not a claims set; do not hand it to
    # callers that expect dict methods such as .get().
    return claims if isinstance(claims, dict) else None
uat/assertions.py UAT 13 KB · 343 lines · 22 reusable assertions
Every assertion returns True on success, raises UATAssertionError (with structured expected/actual) on failure. Categories: status (ok/code/set), field (exists/equals/type/in/present/matches/not-empty), content-type, numeric (equals/greater), PDF (valid/size), collection (list length), performance (latency), string (contains/uuid).
"""UAT Assertion library — 22 reusable assertions.

Each returns True on success, raises UATAssertionError (with expected/actual)
on failure.  Designed for structured reporting, not stdlib AssertionError.
"""

import re


class UATAssertionError(Exception):
    """Assertion failure that carries the expected and actual values.

    ``message`` becomes the exception text; ``expected`` and ``actual``
    are stored as attributes so a reporter can read them back.
    """

    def __init__(self, message: str, expected=None, actual=None):
        self.expected, self.actual = expected, actual
        super().__init__(message)


# ─── Status assertions ───────────────────────────────────────────────────────

def assert_ok(resp):
    """Pass when ``resp.ok`` is truthy (reported as a 2xx status).

    Returns True on success; otherwise raises UATAssertionError with the
    observed status code as ``actual``.
    """
    if resp.ok:
        return True
    observed = resp.status_code
    raise UATAssertionError(
        f"Expected 2xx, got {observed}",
        expected="2xx", actual=observed)

def assert_status(resp, code):
    """Pass when ``resp.status_code`` is exactly ``code``.

    Returns True on success; otherwise raises UATAssertionError with the
    wanted and observed codes.
    """
    observed = resp.status_code
    mismatch = observed != code
    if mismatch:
        raise UATAssertionError(
            f"Expected status {code}, got {observed}",
            expected=code, actual=observed)
    return True

def assert_status_in(resp, codes):
    """Pass when ``resp.status_code`` is a member of ``codes``.

    ``codes`` may be any container that supports ``in``.  Returns True on
    success; otherwise raises UATAssertionError with ``codes`` as the
    expectation.
    """
    observed = resp.status_code
    rejected = observed not in codes
    if rejected:
        raise UATAssertionError(
            f"Expected status in {codes}, got {observed}",
            expected=codes, actual=observed)
    return True


# ─── Field assertions ────────────────────────────────────────────────────────

def assert_field(resp, dot_path):
    """Pass when ``resp.field(dot_path)`` resolves to a non-None value.

    A KeyError, IndexError or TypeError from the lookup is reported as a
    missing field; a None value is reported separately.  Returns True on
    success, raises UATAssertionError otherwise.
    """
    try:
        found = resp.field(dot_path)
    except (KeyError, IndexError, TypeError) as lookup_error:
        reason = f"Field '{dot_path}' not found: {lookup_error}"
        raise UATAssertionError(
            reason, expected=f"field '{dot_path}' exists", actual="missing")
    if found is not None:
        return True
    raise UATAssertionError(
        f"Field '{dot_path}' is None",
        expected=f"field '{dot_path}' not None", actual=None)

def assert_field_equals(resp, dot_path, expected):
    """Pass when ``resp.field(dot_path)`` compares equal to ``expected``.

    A KeyError, IndexError or TypeError from the lookup is reported as a
    missing field.  Returns True on success, raises UATAssertionError
    (with the expected and actual values) otherwise.
    """
    try:
        found = resp.field(dot_path)
    except (KeyError, IndexError, TypeError) as lookup_error:
        reason = f"Field '{dot_path}' not found: {lookup_error}"
        raise UATAssertionError(reason, expected=expected, actual="missing")
    differs = found != expected
    if differs:
        raise UATAssertionError(
            f"Field '{dot_path}': expected {expected!r}, got {found!r}",
            expected=expected, actual=found)
    return True

def assert_field_type(resp, dot_path, expected_type):
    """Pass when ``resp.field(dot_path)`` is an instance of ``expected_type``.

    ``expected_type`` is handed to ``isinstance``.  A KeyError, IndexError
    or TypeError from the lookup is reported as a missing field.  Returns
    True on success, raises UATAssertionError otherwise.
    """
    try:
        found = resp.field(dot_path)
    except (KeyError, IndexError, TypeError) as lookup_error:
        reason = f"Field '{dot_path}' not found: {lookup_error}"
        raise UATAssertionError(
            reason, expected=str(expected_type), actual="missing")
    if isinstance(found, expected_type):
        return True
    wanted = str(expected_type)
    got = type(found).__name__
    raise UATAssertionError(
        f"Field '{dot_path}': expected {wanted}, got {got}",
        expected=wanted, actual=got)

def assert_fields_present(resp, *paths):
    """Pass when every path in ``paths`` can be resolved by ``resp.field``.

    A path counts as missing when the lookup raises KeyError, IndexError
    or TypeError; the resolved value itself is not inspected.  Returns
    True on success, raises UATAssertionError listing the missing paths
    otherwise.
    """
    def resolvable(path):
        try:
            resp.field(path)
        except (KeyError, IndexError, TypeError):
            return False
        return True

    missing = [path for path in paths if not resolvable(path)]
    if not missing:
        return True
    raise UATAssertionError(
        f"Missing fields: {missing}",
        expected=list(paths), actual=f"missing: {missing}")


# ─── PDF assertions ──────────────────────────────────────────────────────────

def assert_pdf_valid(resp):
    """Pass when ``resp.raw_bytes`` looks like a complete PDF document.

    Three checks run in order: the body is non-empty, it begins with the
    ``%PDF-`` header, and ``%%EOF`` occurs within the final 32 bytes.
    Returns True on success, raises UATAssertionError at the first check
    that fails.
    """
    payload = resp.raw_bytes
    if not payload:
        raise UATAssertionError(
            "No raw bytes", expected="PDF bytes", actual="empty")

    header_ok = payload[:5] == b'%PDF-'
    if not header_ok:
        leading = payload[:10]
        raise UATAssertionError(
            f"Not a PDF: starts with {leading!r}",
            expected="%PDF- header", actual=leading)

    trailer = payload[-32:]
    if b'%%EOF' in trailer:
        return True
    raise UATAssertionError(
        "PDF missing %%EOF", expected="%%EOF", actual=payload[-16:])


# ─── Latency assertion ───────────────────────────────────────────────────────

def assert_latency(resp, max_ms):
    """Pass when ``resp.elapsed_ms`` does not exceed ``max_ms``.

    A latency exactly equal to ``max_ms`` passes.  Returns True on
    success, raises UATAssertionError with the rounded latency otherwise.
    """
    took = resp.elapsed_ms
    too_slow = took > max_ms
    if too_slow:
        shown = f"{took:.0f}ms"
        raise UATAssertionError(
            f"Latency {shown} exceeds {max_ms}ms",
            expected=f"<= {max_ms}ms", actual=shown)
    return True


# ─── String assertions ───────────────────────────────────────────────────────

def assert_contains(value, substring, label="value"):
    """Pass when ``value`` is a str that contains ``substring``.

    ``label`` prefixes the failure message.  Returns True on success;
    raises UATAssertionError when ``value`` is not a string or when the
    substring is absent (the reported actual is capped at 200 characters).
    """
    if isinstance(value, str):
        absent = substring not in value
        if absent:
            raise UATAssertionError(
                f"{label}: '{substring}' not found",
                expected=f"contains '{substring}'", actual=value[:200])
        return True
    raise UATAssertionError(
        f"{label}: expected string",
        expected=f"string containing '{substring}'",
        actual=type(value).__name__)

def assert_uuid(value, label="value"):
    """Assert that ``value`` is formatted as a UUID (8-4-4-4-12 hex digits).

    ``value`` is converted with ``str()`` and lower-cased before matching,
    so upper-case hex and non-string objects whose string form is a UUID
    are accepted.  The whole string must match: surrounding whitespace or
    a trailing newline is rejected.

    Args:
        value: Candidate UUID.
        label: Prefix used in the failure message.

    Returns:
        True on success.

    Raises:
        UATAssertionError: if the string form is not a UUID.
    """
    uuid_re = r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}'
    # fullmatch rather than match with '$': '$' also matches just before a
    # trailing newline, which let "<uuid>\n" through.
    if not re.fullmatch(uuid_re, str(value).lower()):
        raise UATAssertionError(f"{label}: '{value}' is not a valid UUID",
            expected="UUID format", actual=value)
    return True

# ... plus assert_field_in, assert_content_type, assert_numeric_equals,
#     assert_greater_than, assert_pdf_size, assert_list_length,
#     assert_not_empty, assert_field_matches, assert_field_not_empty
uat/runner.py UAT 8.9 KB · 291 lines · TestCase + Suite + Runner + Kahn topo-sort
Test execution engine. TestCase with dependency declarations. Suite collects them. Runner executes with Kahn topological sort, structured reporting, and auto-persistence to taxonomy.db (regression detection across runs). discover_suites() auto-loads from uat/suites/ via get_suite() convention.
"""UAT Runner — TestCase, Suite, Runner, discovery, reporting."""

import importlib
import logging
import pkgutil
import time
from collections import defaultdict, deque
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional

from hascom.uat.assertions import UATAssertionError

log = logging.getLogger("hascom.uat")


@dataclass
class TestCase:
    """Single test — a callable with optional dependency declarations.

    The runner in this module calls ``run(session)``.  A raised
    UATAssertionError is recorded as "fail" and any other Exception as
    "error".  ``depends_on`` holds the names of other tests: it influences
    execution order, and the test is skipped when a listed dependency has
    already failed, errored, or been skipped.
    """
    name: str                  # identifier; other tests refer to it in depends_on
    description: str           # free-text summary; not read by the runner code shown here
    run: Callable              # (session) -> None;  raises on failure
    depends_on: List[str] = field(default_factory=list)  # names of prerequisite tests
    tags: List[str] = field(default_factory=list)  # labels; not consulted by the runner code shown here


@dataclass
class Suite:
    """Named collection of test cases.

    ``name`` is the key under which ``discover_suites()`` registers the
    suite and is copied into the suite result.  ``tests`` are executed in
    dependency order, which may differ from list order.
    """
    name: str                  # registry key and report title
    description: str           # free-text summary; not read by the runner code shown here
    tests: List[TestCase]      # the cases to run; reordered by depends_on before running
    tags: List[str] = field(default_factory=list)  # labels; not consulted by the runner code shown here


@dataclass
class TestResult:
    """Outcome of a single test execution.

    The runner sets ``status`` to "pass", "fail" (UATAssertionError),
    "error" (any other Exception) or "skip" (a dependency did not pass).
    """
    name: str                  # name of the TestCase this result belongs to
    status: str                # pass | fail | skip | error
    message: str = ""          # "OK" on pass, otherwise the failure/skip explanation
    elapsed_ms: float = 0.0    # wall-clock duration in milliseconds; stays 0.0 for skips
    # Filled only for "fail": {"expected": ..., "actual": ...} taken from
    # the UATAssertionError.
    assertion_detail: Optional[Dict[str, Any]] = None


@dataclass
class SuiteResult:
    """Collected results of one suite run plus its total duration."""
    suite_name: str
    results: List[TestResult]
    elapsed_ms: float = 0.0

    def _tally(self, status: str) -> int:
        """Count the results whose status equals ``status``."""
        return len([entry for entry in self.results if entry.status == status])

    @property
    def ok(self) -> bool:
        """True when no result has a status other than "pass" or "skip"."""
        acceptable = ("pass", "skip")
        return not any(entry.status not in acceptable
                       for entry in self.results)

    @property
    def pass_count(self) -> int:
        """Number of results with status "pass"."""
        return self._tally("pass")

    @property
    def fail_count(self) -> int:
        """Number of results with status "fail" ("error" is not included)."""
        return self._tally("fail")


# ─── Topological sort ────────────────────────────────────────────────────────

def _topo_sort(tests: List[TestCase]) -> List[TestCase]:
    """Order tests by depends_on using Kahn's algorithm."""
    by_name = {t.name: t for t in tests}
    in_degree: Dict[str, int] = defaultdict(int)
    graph: Dict[str, List[str]] = defaultdict(list)

    for t in tests:
        in_degree.setdefault(t.name, 0)
        for dep in t.depends_on:
            graph[dep].append(t.name)
            in_degree[t.name] += 1

    queue = deque(n for n in in_degree if in_degree[n] == 0)
    ordered: List[TestCase] = []

    while queue:
        name = queue.popleft()
        if name in by_name:
            ordered.append(by_name[name])
        for neighbor in graph[name]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)

    # Append any remaining (cycle / missing dep)
    seen = {t.name for t in ordered}
    for t in tests:
        if t.name not in seen:
            ordered.append(t)

    return ordered


# ─── Runner ──────────────────────────────────────────────────────────────────

class Runner:
    """Runs the tests of a suite in dependency order and collects outcomes."""

    def run_suite(self, suite: Suite, session,
                  quiet: bool = False) -> SuiteResult:
        """Execute every test in ``suite`` and return the aggregate result.

        Tests are ordered with ``_topo_sort`` and each one receives
        ``session``.  A test with a dependency that already failed,
        errored, or was skipped is recorded as "skip" without being run.
        Unless ``quiet`` is set, one progress line per test is printed.
        """
        outcomes: List[TestResult] = []
        # Names dependents must not rely on: failed, errored or skipped.
        blocked: set = set()
        suite_started = time.perf_counter()

        for case in _topo_sort(suite.tests):
            broken = [dep for dep in case.depends_on if dep in blocked]
            if broken:
                skipped = TestResult(
                    name=case.name, status="skip",
                    message=f"Skipped: dependency failed ({', '.join(broken)})")
                outcomes.append(skipped)
                blocked.add(case.name)  # propagate the skip downstream
                if not quiet:
                    print(f"  [SKIP] {case.name} — {skipped.message}")
                continue

            outcome = self._execute(case, session, quiet)
            if outcome.status != "pass":
                blocked.add(case.name)
            outcomes.append(outcome)

        total_ms = (time.perf_counter() - suite_started) * 1000
        return SuiteResult(suite_name=suite.name, results=outcomes,
                           elapsed_ms=total_ms)

    def _execute(self, case, session, quiet):
        """Run one test and classify the outcome as pass, fail, or error.

        UATAssertionError maps to "fail" (with expected/actual detail);
        any other Exception maps to "error".
        """
        started = time.perf_counter()
        try:
            case.run(session)
            took = (time.perf_counter() - started) * 1000
            outcome = TestResult(name=case.name, status="pass",
                                 elapsed_ms=took, message="OK")
            if not quiet:
                print(f"  [PASS] {case.name} ({took:.0f}ms)")
            return outcome

        except UATAssertionError as failure:
            took = (time.perf_counter() - started) * 1000
            outcome = TestResult(
                name=case.name, status="fail",
                elapsed_ms=took, message=str(failure),
                assertion_detail={"expected": failure.expected,
                                  "actual": failure.actual})
            if not quiet:
                print(f"  [FAIL] {case.name} — {failure}")
            return outcome

        except Exception as crash:
            took = (time.perf_counter() - started) * 1000
            outcome = TestResult(
                name=case.name, status="error",
                elapsed_ms=took,
                message=f"{type(crash).__name__}: {crash}")
            if not quiet:
                print(f"  [ERR]  {case.name} — {type(crash).__name__}: {crash}")
            return outcome


# ─── Discovery ───────────────────────────────────────────────────────────────

def discover_suites() -> Dict[str, Suite]:
    """Load every suite module found in the ``suites`` directory beside this file.

    Each module whose name does not start with ``_`` is imported as
    ``hascom.uat.suites.<name>``.  If it exposes ``get_suite``, the suite
    that call returns is registered under its ``name``.  A module that
    fails to import or build its suite is logged as a warning and left
    out, so one broken suite does not hide the others.
    """
    search_path = [str(Path(__file__).parent / "suites")]
    registry: Dict[str, Suite] = {}

    public_modules = (
        name
        for _finder, name, _is_pkg in pkgutil.iter_modules(search_path)
        if not name.startswith("_")
    )
    for short_name in public_modules:
        try:
            module = importlib.import_module(f"hascom.uat.suites.{short_name}")
            if not hasattr(module, "get_suite"):
                continue
            suite = module.get_suite()
            registry[suite.name] = suite
        except Exception as exc:
            log.warning("Failed to load suite '%s': %s", short_name, exc)

    return registry


# ─── Reporting ───────────────────────────────────────────────────────────────

def print_report(result: SuiteResult):
    """Print a human-readable summary of a suite run to stdout.

    The report has a header (suite name, total time), one line per test
    with its status tag, duration when non-zero and message when the test
    did not pass, then the pass ratio and an overall verdict.
    """
    rule = "=" * 60
    labels = {"pass": "PASS", "fail": "FAIL",
              "skip": "SKIP", "error": "ERR "}

    print(f"\n{rule}")
    print(f"  Suite: {result.suite_name}")
    print(f"  Time:  {result.elapsed_ms:.0f}ms")
    print(rule)

    for entry in result.results:
        pieces = [f"  [{labels.get(entry.status, '????')}] {entry.name}"]
        if entry.elapsed_ms:
            pieces.append(f" ({entry.elapsed_ms:.0f}ms)")
        if entry.status != "pass" and entry.message:
            pieces.append(f" — {entry.message}")
        print("".join(pieces))

    print(rule)
    passed = result.pass_count
    count = len(result.results)
    outcome = "ALL PASS" if result.ok else "FAILURES"
    print(f"  {passed}/{count} passed")
    print(f"  Verdict: {outcome}")
    print(f"{rule}\n")


def to_json(result: SuiteResult) -> dict:
    """Build a machine-readable dict describing a suite run.

    The ``summary`` section counts every status the runner produces
    (pass, fail, error, skip), so the per-status counts add up to
    ``total`` and errored or skipped tests are visible without scanning
    the ``tests`` list.  Durations are rounded to 0.1 ms.

    Args:
        result: The suite result to serialize.

    Returns:
        A dict with keys ``suite``, ``ok``, ``elapsed_ms``, ``summary``
        and ``tests`` (one entry per test with name, status, message and
        elapsed_ms).
    """
    def count(status: str) -> int:
        # SuiteResult only exposes pass/fail counters; tally the rest here.
        return sum(1 for r in result.results if r.status == status)

    return {
        "suite": result.suite_name,
        "ok": result.ok,
        "elapsed_ms": round(result.elapsed_ms, 1),
        "summary": {
            "total": len(result.results),
            "pass": result.pass_count,
            "fail": result.fail_count,
            "error": count("error"),
            "skip": count("skip"),
        },
        "tests": [
            {"name": r.name, "status": r.status, "message": r.message,
             "elapsed_ms": round(r.elapsed_ms, 1)}
            for r in result.results
        ],
    }