#!/usr/bin/env python3
# Verify status board generator.
#
# Two subcommands are wired up by ``main``:
#   * ``update`` runs ``ansible/bin/verify.sh run <doc_id>`` and stores the
#     outcome in a local JSON cache (``.status/verify-results.json``).
#   * ``render`` turns the cache plus static repo information into the
#     Markdown status board under ``docs/``.
from __future__ import annotations

import argparse
import json
import os
import re
import subprocess
import sys
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

ROOT = Path(__file__).resolve().parents[2]
VERIFY_DIR = ROOT / "ansible" / "playbooks" / "verify"
DOCS_DIR = ROOT / "docs"
RESULTS_DIR = ROOT / ".status"
RESULTS_JSON = RESULTS_DIR / "verify-results.json"
BOARD_MD = DOCS_DIR / "00-04-验证状态板.md"

# Executable doc ids look like "NN-MM" where both parts are 01..99.
EXEC_ID_RE = re.compile(r"^(0[1-9]|[1-9][0-9])-(0[1-9]|[1-9][0-9])$")
# "[OC] key=value key=value ..." -- the payload is captured as group "kv",
# which parse_oc_result() reads back by name.
OC_LINE_RE = re.compile(r"^\[OC\]\s+(?P<kv>.+)$")

# Every status value the board understands.
KNOWN_STATUSES = frozenset(
    {"verified", "failed", "gated", "noop", "unknown", "broken"}
)

# Display label used in the status table for each known status.
STATUS_LABELS = {
    "verified": "✅ verified",
    "gated": "🟡 gated",
    "failed": "❌ failed",
    "noop": "⚪ noop",
    "unknown": "❓ unknown",
    "broken": "🚫 broken",
}

# Only this many trailing characters of stdout/stderr are kept in the cache.
TAIL_CHARS = 4000
# At most this many conflict lines are listed on the board.
MAX_CONFLICTS_SHOWN = 50


def utc_now() -> str:
    """Return the current UTC time as an ISO-8601 string without microseconds."""
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat()


def _empty_results() -> dict[str, Any]:
    """Return a fresh, empty results database."""
    return {"version": 1, "updated_at": None, "results": {}}


def load_results() -> dict[str, Any]:
    """Load the cached results database from ``RESULTS_JSON``.

    Returns:
        The decoded JSON object. An empty database is returned when the file
        is missing or unreadable, is not valid UTF-8 JSON, or does not hold a
        JSON object at the top level.
    """
    try:
        data = json.loads(RESULTS_JSON.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return _empty_results()
    # Callers use ``.get`` on the result, so anything but a dict is unusable.
    return data if isinstance(data, dict) else _empty_results()


def save_results(data: dict[str, Any]) -> None:
    """Write ``data`` to ``RESULTS_JSON`` as indented UTF-8 JSON.

    The results directory is created if it does not exist yet.
    """
    RESULTS_DIR.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(data, ensure_ascii=False, indent=2) + "\n"
    RESULTS_JSON.write_text(payload, encoding="utf-8")


def list_exec_doc_ids() -> list[str]:
    """Return the sorted, de-duplicated doc ids that have a verify playbook.

    A playbook counts when it is a ``*.yml`` file directly inside
    ``VERIFY_DIR`` whose stem fully matches ``EXEC_ID_RE``.
    """
    return sorted(
        {
            playbook.stem
            for playbook in VERIFY_DIR.glob("*.yml")
            if EXEC_ID_RE.fullmatch(playbook.stem)
        }
    )


def detect_noop(doc_id: str) -> bool:
    """Report whether the verify playbook of ``doc_id`` is marked as noop.

    The playbook text is searched for the markers ``noop verify`` or
    ``noop-doc-verify.yml``. A missing playbook is reported as not noop.
    """
    playbook = VERIFY_DIR / f"{doc_id}.yml"
    if not playbook.exists():
        return False
    text = playbook.read_text(encoding="utf-8", errors="ignore")
    return "noop verify" in text or "noop-doc-verify.yml" in text


def doc_exists(doc_id: str) -> bool:
    """Report whether ``DOCS_DIR`` holds at least one ``<doc_id>-*.md`` file."""
    return any(DOCS_DIR.glob(f"{doc_id}-*.md"))


def files_dir_exists(doc_id: str) -> bool:
    """Report whether ``ansible/files/<doc_id>`` exists as a directory."""
    return (ROOT / "ansible" / "files" / doc_id).is_dir()


@dataclass
class RunResult:
    """Captured outcome of one ``verify.sh`` invocation."""

    # Process exit code.
    rc: int
    # Captured standard output, decoded as text.
    stdout: str
    # Captured standard error, decoded as text.
    stderr: str


def run_verify(doc_id: str) -> RunResult:
    """Run ``ansible/bin/verify.sh run <doc_id>`` from the repo root.

    Returns:
        The exit code and the captured stdout/stderr of the script.
    """
    cmd = ["bash", str(ROOT / "ansible" / "bin" / "verify.sh"), "run", doc_id]
    proc = subprocess.run(
        cmd,
        cwd=str(ROOT),
        capture_output=True,
        text=True,
        env=os.environ.copy(),
    )
    return RunResult(rc=proc.returncode, stdout=proc.stdout, stderr=proc.stderr)


def parse_oc_result(output: str, doc_id: str) -> dict[str, str] | None:
    """Parse the last ``[OC]`` line for a given doc_id.

    Expected format (from verify.sh):
      [OC] doc_id=01-06 result=verified phase=verify assertion=...

    Args:
        output: Combined verify output; empty or ``None`` yields ``None``.
        doc_id: Only lines whose ``doc_id`` field equals this are considered.

    Returns:
        The key/value fields of the last matching line that also carries a
        ``result`` field, or ``None`` when there is no such line. Values are
        stripped of surrounding whitespace and double quotes. Fields are split
        on single spaces, so values containing spaces are not supported.
    """
    latest: dict[str, str] | None = None
    for raw_line in (output or "").splitlines():
        matched = OC_LINE_RE.match(raw_line.strip())
        if matched is None:
            continue
        fields: dict[str, str] = {}
        for token in matched.group("kv").split(" "):
            key, sep, value = token.partition("=")
            if not sep:
                # Empty tokens (repeated spaces) and bare words carry no field.
                continue
            fields[key.strip()] = value.strip().strip('"')
        if fields.get("doc_id") == doc_id and "result" in fields:
            latest = fields
    return latest


def normalize_status(s: str | None) -> str | None:
    """Return ``s`` trimmed and lower-cased if it is a known status, else ``None``."""
    if not s:
        return None
    candidate = s.strip().lower()
    return candidate if candidate in KNOWN_STATUSES else None


def _oc_status(output: str, doc_id: str) -> str | None:
    """Return the normalized ``result`` of the last ``[OC]`` line, if any."""
    oc = parse_oc_result(output, doc_id)
    return normalize_status(oc.get("result") if oc else None)


def classify(doc_id: str, rr: RunResult | None, noop: bool) -> str:
    """Derive the board status for ``doc_id``.

    Args:
        doc_id: Doc id being classified.
        rr: Result of a verify run, or ``None`` when nothing was run.
        noop: Whether the playbook is a noop playbook.

    Returns:
        ``"broken"`` when the doc or its playbook is missing; without a run,
        ``"noop"`` or ``"unknown"``; otherwise the status reported by the
        ``[OC]`` line, falling back to the legacy rules (``[GATE]`` marker,
        then exit code).
    """
    if not doc_exists(doc_id) or not (VERIFY_DIR / f"{doc_id}.yml").exists():
        return "broken"
    if rr is None:
        return "noop" if noop else "unknown"

    out = (rr.stdout or "") + "\n" + (rr.stderr or "")
    oc_status = _oc_status(out, doc_id)
    if oc_status:
        return oc_status

    # fallback for legacy output
    if "[GATE]" in out:
        return "gated"
    return "verified" if rr.rc == 0 else "failed"


def render_board(results_db: dict[str, Any], ids: list[str]) -> str:
    """Render the Markdown status board.

    Args:
        results_db: Results database as returned by ``load_results``. A
            missing or non-dict ``"results"`` entry is treated as empty.
        ids: Doc ids to list, in table order.

    Returns:
        The Markdown text of the board, ending with a single newline.
    """
    updated_at = utc_now()
    cached = results_db.get("results")
    if not isinstance(cached, dict):
        # Same tolerance as cmd_update: a malformed cache means "no results".
        cached = {}

    rows: list[dict[str, str]] = []
    conflicts: list[str] = []
    for doc_id in ids:
        entry = cached.get(doc_id)
        if not isinstance(entry, dict):
            entry = {}
        noop = detect_noop(doc_id)
        last = entry.get("updated_at")
        rc = entry.get("rc")
        stdout_tail = entry.get("stdout_tail")
        stderr_tail = entry.get("stderr_tail")

        # cache-first, but detect conflict with OC parsing when tails exist (EC6/OC6).
        cache_status = normalize_status(entry.get("status"))
        oc_status = None
        if isinstance(stdout_tail, str) or isinstance(stderr_tail, str):
            out = f"{stdout_tail or ''}\n{stderr_tail or ''}"
            oc_status = _oc_status(out, doc_id)
        if cache_status and oc_status and cache_status != oc_status:
            conflicts.append(
                f"{doc_id}: cache={cache_status} oc={oc_status} (last={last})"
            )

        if cache_status is None:
            # static fallback
            status = "noop" if noop else "unknown"
        else:
            status = cache_status

        rows.append(
            {
                "doc_id": doc_id,
                "status": status,
                "noop": "Y" if noop else "",
                "rc": "" if rc is None else str(rc),
                "last": "" if not last else str(last),
                "doc": "Y" if doc_exists(doc_id) else "N",
                "playbook": "Y" if (VERIFY_DIR / f"{doc_id}.yml").exists() else "N",
                "files": "Y" if files_dir_exists(doc_id) else "N",
            }
        )

    lines: list[str] = [
        "# 00-04-验证状态板(自动生成视图)",
        "",
        "> 本页为**只读视图**:用于快速查看「已验证/未验证/门控/失败」。",
        "> **执行真源**仍以 `ansible/playbooks/verify/*.yml` 为准;本页不承载执行逻辑。",
        "",
        f"- 最近生成时间(UTC):`{updated_at}`",
        f"- 本地结果缓存(不入库):`{RESULTS_JSON.relative_to(ROOT)}`",
        "",
    ]

    if conflicts:
        lines.extend(
            [
                "## ⚠️ 结果冲突告警(缓存 vs OC 解析)",
                "",
                "> 读源优先级:缓存优先;但若缓存状态与 OC 解析不一致,需排查 verify 输出或缓存一致性。",
                "",
            ]
        )
        lines.extend(f"- `{c}`" for c in conflicts[:MAX_CONFLICTS_SHOWN])
        if len(conflicts) > MAX_CONFLICTS_SHOWN:
            lines.append(f"- ...(共 {len(conflicts)} 条,已截断)")
        lines.append("")

    lines.extend(
        [
            "## 快速更新",
            "",
            "在仓库根执行:",
            "",
            "```bash",
            "# 仅渲染(不跑真机验证,按缓存/静态信息生成)",
            "python3 ansible/tools/status_board.py render",
            "",
            "# 真机跑一轮并写入缓存(会执行 verify playbook)",
            "python3 ansible/tools/status_board.py update --all",
            "python3 ansible/tools/status_board.py render",
            "```",
            "",
            "## 状态表",
            "",
            "| doc_id | 状态 | noop | rc | last_update | docs | playbook | files |",
            "|---|---|---:|---:|---|---:|---:|---:|",
        ]
    )
    for r in rows:
        label = STATUS_LABELS.get(r["status"], r["status"])
        lines.append(
            f"| {r['doc_id']} | {label} | {r['noop']} | {r['rc']} | {r['last']} | {r['doc']} | {r['playbook']} | {r['files']} |"
        )

    lines.extend(
        [
            "",
            "## 口径说明",
            "",
            "- **verified/gated/failed/noop/unknown**:以 verify 输出的 `[OC] ... result=` 为准;缺失 OC 时回退到 legacy 规则。",
            "- **gated**:必须附带 `missing_dependency` 与 `skip_scope`(见 Output Contract OC2)。",
            "- **noop**:该 doc_id 的 verify playbook 为 noop 模式(仅基线/存在性/结构检查)。",
            "- **unknown**:尚未在本机写入结果缓存(或仅静态生成)。",
            "",
        ]
    )
    return "\n".join(lines)


def cmd_update(args: argparse.Namespace) -> int:
    """Run verify for the requested doc ids and store the results in the cache.

    Args:
        args: Parsed arguments with ``all`` (bool) and ``doc_ids`` (list of
            str). With ``all`` set, every id from ``list_exec_doc_ids`` is
            run and ``doc_ids`` is ignored.

    Returns:
        Always ``0``. Ids that do not match ``EXEC_ID_RE`` are skipped with a
        warning on stderr.
    """
    db = load_results()
    target_ids = list_exec_doc_ids() if args.all else list(args.doc_ids)
    if not target_ids:
        print("WARN: no doc_ids to update (pass doc_ids or --all)", file=sys.stderr)

    existing = db.get("results")
    results: dict[str, Any] = existing if isinstance(existing, dict) else {}

    for doc_id in target_ids:
        if not EXEC_ID_RE.fullmatch(doc_id):
            print(f"WARN: skipping invalid doc_id: {doc_id!r}", file=sys.stderr)
            continue
        rr = run_verify(doc_id)
        noop = detect_noop(doc_id)
        out = (rr.stdout or "") + "\n" + (rr.stderr or "")
        results[doc_id] = {
            "updated_at": utc_now(),
            "rc": rr.rc,
            "status": classify(doc_id, rr, noop),
            "noop": noop,
            "oc": parse_oc_result(out, doc_id) or None,
            # keep small: store tail only
            "stdout_tail": (rr.stdout or "")[-TAIL_CHARS:],
            "stderr_tail": (rr.stderr or "")[-TAIL_CHARS:],
        }

    db["version"] = 1
    db["updated_at"] = utc_now()
    db["results"] = results
    save_results(db)
    return 0


def cmd_render(_: argparse.Namespace) -> int:
    """Render the board from the cache and write it to ``BOARD_MD``.

    Returns:
        Always ``0``.
    """
    db = load_results()
    md = render_board(db, list_exec_doc_ids())
    BOARD_MD.write_text(md + "\n", encoding="utf-8")
    return 0

def main() -> int:
    """Parse the command line and dispatch to the selected subcommand.

    Subcommands:
        update: run verify and write the local results cache.
        render: render the Markdown status board from cache/static data.

    Returns:
        The integer exit code returned by the subcommand handler.

    Raises:
        SystemExit: when the verify or docs directory is missing, i.e. the
            script is not located inside the expected repository layout.
    """
    layout_ok = VERIFY_DIR.is_dir() and DOCS_DIR.is_dir()
    if not layout_ok:
        raise SystemExit("ERR: not in repo root layout")

    parser = argparse.ArgumentParser(prog="status_board.py")
    commands = parser.add_subparsers(dest="cmd", required=True)

    update_cmd = commands.add_parser(
        "update",
        help="Run verify and write local cache (not committed)",
    )
    update_cmd.add_argument(
        "--all",
        action="store_true",
        help="Update all exec doc_ids from verify dir",
    )
    update_cmd.add_argument(
        "doc_ids",
        nargs="*",
        help="Doc IDs like 03-07 (ignored if --all)",
    )
    update_cmd.set_defaults(func=cmd_update)

    render_cmd = commands.add_parser(
        "render",
        help="Render docs/00-04-验证状态板.md from cache/static",
    )
    render_cmd.set_defaults(func=cmd_render)

    parsed = parser.parse_args()
    # Each subparser registered its handler as ``func`` via set_defaults.
    exit_code = parsed.func(parsed)
    return int(exit_code)


if __name__ == "__main__":
    raise SystemExit(main())