307 lines
11 KiB
Python
307 lines
11 KiB
Python
#!/usr/bin/env python3
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import os
|
||
import re
|
||
import subprocess
|
||
from dataclasses import dataclass
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
|
||
# Repository root: this script sits two directory levels below it.
ROOT = Path(__file__).resolve().parents[2]

# Locations this tool reads from and writes to, all anchored at ROOT.
VERIFY_DIR = ROOT.joinpath("ansible", "playbooks", "verify")
DOCS_DIR = ROOT.joinpath("docs")
RESULTS_DIR = ROOT.joinpath(".status")
RESULTS_JSON = RESULTS_DIR.joinpath("verify-results.json")
BOARD_MD = DOCS_DIR.joinpath("00-04-验证状态板.md")

# Executable doc IDs have the form "NN-NN", each half in the range 01..99.
EXEC_ID_RE = re.compile(r"^(0[1-9]|[1-9][0-9])-(0[1-9]|[1-9][0-9])$")

# A line starting with the "[OC]" tag; the rest is captured as group "kv".
OC_LINE_RE = re.compile(r"^\[OC\]\s+(?P<kv>.+)$")
|
||
|
||
|
||
def utc_now() -> str:
    """Return the current UTC time as an ISO-8601 string with second precision."""
    now = datetime.now(tz=timezone.utc)
    # Zero the microseconds so isoformat() omits the fractional part.
    truncated = now.replace(microsecond=0)
    return truncated.isoformat()
|
||
|
||
|
||
def load_results() -> dict[str, Any]:
    """Load the local results cache from ``RESULTS_JSON``.

    Returns:
        The decoded JSON object. A fresh empty cache
        (``{"version": 1, "updated_at": None, "results": {}}``) is returned
        when the file is missing, unreadable, not valid JSON, or when its
        top-level value is not a JSON object.
    """
    empty: dict[str, Any] = {"version": 1, "updated_at": None, "results": {}}
    try:
        data = json.loads(RESULTS_JSON.read_text(encoding="utf-8"))
    except (OSError, ValueError, RecursionError):
        # OSError: missing/unreadable file. ValueError: bad UTF-8 or bad JSON.
        # RecursionError: pathologically nested JSON. The cache is
        # best-effort, so all of these fall back to an empty cache.
        return empty
    # Callers use ``.get`` on the result, so a list/str/null root is unusable.
    return data if isinstance(data, dict) else empty
|
||
|
||
|
||
def save_results(data: dict[str, Any]) -> None:
    """Write *data* to ``RESULTS_JSON`` as indented UTF-8 JSON.

    ``RESULTS_DIR`` is created if needed. The payload is first written to a
    sibling ``*.tmp`` file and then moved over the target, so an interrupted
    write cannot leave a truncated cache behind.

    Args:
        data: JSON-serialisable cache contents.
    """
    RESULTS_DIR.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(data, ensure_ascii=False, indent=2) + "\n"
    tmp_path = RESULTS_JSON.with_name(RESULTS_JSON.name + ".tmp")
    tmp_path.write_text(payload, encoding="utf-8")
    # Path.replace overwrites the destination in a single rename step.
    tmp_path.replace(RESULTS_JSON)
|
||
|
||
|
||
def list_exec_doc_ids() -> list[str]:
    """Return the sorted, de-duplicated doc IDs that have a verify playbook.

    A playbook counts when it is a ``*.yml`` entry of ``VERIFY_DIR`` whose
    stem fully matches ``EXEC_ID_RE``.
    """
    # A stem accepted by EXEC_ID_RE is exactly five characters long, so the
    # matching file name always has the length of "00-00.yml".
    found = {
        playbook.stem
        for playbook in VERIFY_DIR.glob("*.yml")
        if EXEC_ID_RE.fullmatch(playbook.stem)
    }
    return sorted(found)
|
||
|
||
|
||
def detect_noop(doc_id: str) -> bool:
    """Tell whether the verify playbook for *doc_id* is marked as a no-op.

    Returns ``True`` when ``VERIFY_DIR/<doc_id>.yml`` exists and its text
    contains ``noop verify`` or ``noop-doc-verify.yml``; ``False`` otherwise,
    including when the playbook does not exist.
    """
    playbook = VERIFY_DIR / f"{doc_id}.yml"
    if playbook.exists():
        # Undecodable bytes are dropped rather than raising.
        text = playbook.read_text(encoding="utf-8", errors="ignore")
        markers = ("noop verify", "noop-doc-verify.yml")
        return any(marker in text for marker in markers)
    return False
|
||
|
||
|
||
def doc_exists(doc_id: str) -> bool:
    """Return ``True`` if ``DOCS_DIR`` holds at least one ``<doc_id>-*.md`` file."""
    pattern = f"{doc_id}-*.md"
    # Only the first hit matters; stop globbing as soon as one is found.
    first_match = next(DOCS_DIR.glob(pattern), None)
    return first_match is not None
|
||
|
||
|
||
def files_dir_exists(doc_id: str) -> bool:
    """Return ``True`` if ``ROOT/ansible/files/<doc_id>`` is a directory."""
    files_dir = ROOT.joinpath("ansible", "files", doc_id)
    return files_dir.is_dir()
|
||
|
||
|
||
@dataclass
class RunResult:
    """Captured outcome of one verify run (see ``run_verify``)."""

    # Exit status of the verify process.
    rc: int
    # Full captured standard output, as text.
    stdout: str
    # Full captured standard error, as text.
    stderr: str
|
||
|
||
|
||
def run_verify(doc_id: str) -> RunResult:
    """Run ``bash ansible/bin/verify.sh run <doc_id>`` and capture its output.

    The command runs with ``ROOT`` as working directory and a copy of the
    current environment. Output is decoded as UTF-8 with undecodable bytes
    replaced, so stray bytes in the verify output cannot abort the caller
    with ``UnicodeDecodeError``.

    Args:
        doc_id: Doc ID passed through to ``verify.sh``.

    Returns:
        The process return code with its captured stdout and stderr.
    """
    script = ROOT / "ansible" / "bin" / "verify.sh"
    proc = subprocess.run(
        ["bash", str(script), "run", doc_id],
        cwd=str(ROOT),
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
        env=os.environ.copy(),
    )
    return RunResult(rc=proc.returncode, stdout=proc.stdout, stderr=proc.stderr)
|
||
|
||
|
||
# One ``key=value`` token of an [OC] line. A token must start at a whitespace
# boundary. The value is either a double-quoted string (which may contain
# spaces) ending at a whitespace boundary, or a run of non-space characters.
_OC_KV_RE = re.compile(r'(?<!\S)(?P<k>[^\s=]+)=(?:"(?P<q>[^"]*)"(?!\S)|(?P<v>\S*))')


def parse_oc_result(output: str, doc_id: str) -> dict[str, str] | None:
    """Parse the last ``[OC]`` line that reports a result for *doc_id*.

    Expected format (from verify.sh):
      [OC] doc_id=01-06 result=verified phase=verify assertion=...

    Values may be wrapped in double quotes; a quoted value is kept whole even
    when it contains spaces or ``key=value`` look-alikes, so it can neither be
    truncated nor overwrite another field.

    Args:
        output: Text to scan line by line; ``None`` or empty yields ``None``.
        doc_id: Only lines whose ``doc_id`` field equals this are considered.

    Returns:
        The key/value mapping of the last matching line that also carries a
        ``result`` field, or ``None`` when there is no such line.
    """
    last: dict[str, str] | None = None
    for line in (output or "").splitlines():
        tagged = OC_LINE_RE.match(line.strip())
        if not tagged:
            continue
        data: dict[str, str] = {}
        for token in _OC_KV_RE.finditer(tagged.group("kv")):
            quoted = token.group("q")
            if quoted is not None:
                value = quoted
            else:
                # Unquoted (or unbalanced) value: drop stray quote characters.
                value = token.group("v").strip('"')
            data[token.group("k")] = value
        if data.get("doc_id") == doc_id and "result" in data:
            last = data  # later lines win
    return last
|
||
|
||
|
||
def normalize_status(s: str | None) -> str | None:
    """Return *s* as a canonical status name, or ``None`` if it is not one.

    The value is stripped and lower-cased before being checked against the
    known statuses (verified, failed, gated, noop, unknown, broken).

    Args:
        s: Raw status text. ``None``, empty strings and non-string values
            (for example from a hand-edited cache file) all yield ``None``.
    """
    if not isinstance(s, str):
        return None
    candidate = s.strip().lower()
    known = {"verified", "failed", "gated", "noop", "unknown", "broken"}
    return candidate if candidate in known else None
|
||
|
||
|
||
def classify(doc_id: str, rr: RunResult | None, noop: bool) -> str:
    """Derive the status of *doc_id* from an optional verify run.

    Order of precedence:
      1. ``broken`` when the doc or its verify playbook is missing.
      2. Without a run: ``noop`` if *noop* is set, else ``unknown``.
      3. The ``result`` of the last matching ``[OC]`` line, if it is a
         recognised status.
      4. Legacy fallback: ``gated`` when the output contains ``[GATE]``,
         otherwise ``verified``/``failed`` depending on the return code.
    """
    playbook = VERIFY_DIR / f"{doc_id}.yml"
    if not (doc_exists(doc_id) and playbook.exists()):
        return "broken"

    if rr is None:
        if noop:
            return "noop"
        return "unknown"

    combined = "\n".join((rr.stdout or "", rr.stderr or ""))
    parsed = parse_oc_result(combined, doc_id)
    raw_result = parsed.get("result") if parsed else None
    reported = normalize_status(raw_result)
    if reported:
        return reported

    # fallback for legacy output
    if "[GATE]" in combined:
        return "gated"
    if rr.rc == 0:
        return "verified"
    return "failed"
|
||
|
||
|
||
def render_board(results_db: dict[str, Any], ids: list[str]) -> str:
    """Render the Markdown status board for *ids*.

    For every doc ID the cached status is used when it is a recognised
    status; otherwise the row falls back to ``noop`` (playbook marked as
    no-op) or ``unknown``. When cached output tails are present they are
    re-parsed for an ``[OC]`` result, and any disagreement with the cached
    status is listed in a warning section (at most 50 entries shown).

    Malformed cache content (a non-dict ``results`` value, non-dict entries,
    non-string statuses) is treated as "no cached data" instead of raising.

    Args:
        results_db: Cache object as returned by ``load_results``.
        ids: Doc IDs to list, in output order.

    Returns:
        The board as a single string, lines joined by ``"\\n"``.
    """
    updated_at = utc_now()

    cached = results_db.get("results") if isinstance(results_db, dict) else None
    if not isinstance(cached, dict):
        cached = {}

    # Display label per status; unknown statuses are shown verbatim.
    labels = {
        "verified": "✅ verified",
        "gated": "🟡 gated",
        "failed": "❌ failed",
        "noop": "⚪ noop",
        "unknown": "❓ unknown",
        "broken": "🚫 broken",
    }

    rows: list[dict[str, str]] = []
    conflicts: list[str] = []
    for doc_id in ids:
        entry = cached.get(doc_id)
        if not isinstance(entry, dict):
            entry = {}
        noop = detect_noop(doc_id)
        last = entry.get("updated_at")
        rc = entry.get("rc")
        stdout_tail = entry.get("stdout_tail")
        stderr_tail = entry.get("stderr_tail")

        doc_ok = doc_exists(doc_id)
        files_ok = files_dir_exists(doc_id)
        pb_ok = (VERIFY_DIR / f"{doc_id}.yml").exists()

        # cache-first, but detect conflict with OC parsing when tails exist (EC6/OC6).
        raw_status = entry.get("status")
        cache_status = normalize_status(raw_status if isinstance(raw_status, str) else None)
        oc_status = None
        if isinstance(stdout_tail, str) or isinstance(stderr_tail, str):
            out = f"{stdout_tail or ''}\n{stderr_tail or ''}"
            oc = parse_oc_result(out, doc_id)
            oc_status = normalize_status(oc.get("result") if oc else None)
        if cache_status and oc_status and cache_status != oc_status:
            conflicts.append(f"{doc_id}: cache={cache_status} oc={oc_status} (last={last})")

        if cache_status is None:
            # static fallback
            status = "noop" if noop else "unknown"
        else:
            status = cache_status

        rows.append(
            {
                "doc_id": doc_id,
                "status": status,
                "noop": "Y" if noop else "",
                "rc": "" if rc is None else str(rc),
                "last": "" if not last else str(last),
                "doc": "Y" if doc_ok else "N",
                "playbook": "Y" if pb_ok else "N",
                "files": "Y" if files_ok else "N",
            }
        )

    # Header.
    lines: list[str] = [
        "# 00-04-验证状态板(自动生成视图)",
        "",
        "> 本页为**只读视图**:用于快速查看「已验证/未验证/门控/失败」。",
        "> **执行真源**仍以 `ansible/playbooks/verify/*.yml` 为准;本页不承载执行逻辑。",
        "",
        f"- 最近生成时间(UTC):`{updated_at}`",
        f"- 本地结果缓存(不入库):`{RESULTS_JSON.relative_to(ROOT)}`",
        "",
    ]

    # Optional conflict warning section, truncated to 50 entries.
    if conflicts:
        lines.extend(
            [
                "## ⚠️ 结果冲突告警(缓存 vs OC 解析)",
                "",
                "> 读源优先级:缓存优先;但若缓存状态与 OC 解析不一致,需排查 verify 输出或缓存一致性。",
                "",
            ]
        )
        lines.extend(f"- `{c}`" for c in conflicts[:50])
        if len(conflicts) > 50:
            lines.append(f"- ...(共 {len(conflicts)} 条,已截断)")
        lines.append("")

    # Usage instructions.
    lines.extend(
        [
            "## 快速更新",
            "",
            "在仓库根执行:",
            "",
            "```bash",
            "# 仅渲染(不跑真机验证,按缓存/静态信息生成)",
            "python3 ansible/tools/status_board.py render",
            "",
            "# 真机跑一轮并写入缓存(会执行 verify playbook)",
            "python3 ansible/tools/status_board.py update --all",
            "python3 ansible/tools/status_board.py render",
            "```",
            "",
        ]
    )

    # Status table.
    lines.extend(
        [
            "## 状态表",
            "",
            "| doc_id | 状态 | noop | rc | last_update | docs | playbook | files |",
            "|---|---|---:|---:|---|---:|---:|---:|",
        ]
    )
    for r in rows:
        label = labels.get(r["status"], r["status"])
        lines.append(
            f"| {r['doc_id']} | {label} | {r['noop']} | {r['rc']} | {r['last']} | {r['doc']} | {r['playbook']} | {r['files']} |"
        )
    lines.append("")

    # Legend.
    lines.extend(
        [
            "## 口径说明",
            "",
            "- **verified/gated/failed/noop/unknown**:以 verify 输出的 `[OC] ... result=<status>` 为准;缺失 OC 时回退到 legacy 规则。",
            "- **gated**:必须附带 `missing_dependency` 与 `skip_scope`(见 Output Contract OC2)。",
            "- **noop**:该 doc_id 的 verify playbook 为 noop 模式(仅基线/存在性/结构检查)。",
            "- **unknown**:尚未在本机写入结果缓存(或仅静态生成)。",
            "",
        ]
    )
    return "\n".join(lines)
|
||
|
||
|
||
def cmd_update(args: argparse.Namespace) -> int:
    """Run verify for the selected doc IDs and store the outcomes in the cache.

    With ``args.all`` every ID from ``list_exec_doc_ids()`` is run; otherwise
    the IDs in ``args.doc_ids``. IDs that do not match ``EXEC_ID_RE`` are
    skipped with a warning on stderr. Each cache entry keeps the status,
    return code, parsed ``[OC]`` fields and the last 4000 characters of
    stdout/stderr.

    The cache is written in a ``finally`` block, so results gathered before
    an exception or Ctrl-C are not lost.

    Returns:
        Always ``0``.
    """
    import sys  # local import: only needed for the warnings below

    db = load_results()
    if not isinstance(db, dict):
        db = {}
    target_ids = list_exec_doc_ids() if args.all else list(args.doc_ids)

    stored = db.get("results")
    results: dict[str, Any] = stored if isinstance(stored, dict) else {}

    invalid = [doc_id for doc_id in target_ids if not EXEC_ID_RE.fullmatch(doc_id)]
    if invalid:
        print(f"WARN: ignoring invalid doc_id(s): {', '.join(invalid)}", file=sys.stderr)
    if len(invalid) == len(target_ids):
        print("WARN: no valid doc_id to update", file=sys.stderr)

    try:
        for doc_id in target_ids:
            if not EXEC_ID_RE.fullmatch(doc_id):
                continue
            rr = run_verify(doc_id)
            noop = detect_noop(doc_id)
            status = classify(doc_id, rr, noop)
            out = (rr.stdout or "") + "\n" + (rr.stderr or "")
            oc = parse_oc_result(out, doc_id)
            results[doc_id] = {
                "updated_at": utc_now(),
                "rc": rr.rc,
                "status": status,
                "noop": noop,
                "oc": oc or None,
                # keep small: store tail only
                "stdout_tail": (rr.stdout or "")[-4000:],
                "stderr_tail": (rr.stderr or "")[-4000:],
            }
    finally:
        # Persist whatever finished, even if a later run was interrupted.
        db["version"] = 1
        db["updated_at"] = utc_now()
        db["results"] = results
        save_results(db)
    return 0
|
||
|
||
|
||
def cmd_render(_: argparse.Namespace) -> int:
    """Regenerate ``BOARD_MD`` from the cached results; always returns ``0``."""
    board = render_board(load_results(), list_exec_doc_ids())
    # The rendered text has no final newline; add one when writing the file.
    BOARD_MD.write_text(f"{board}\n", encoding="utf-8")
    return 0
|
||
|
||
|
||
def main() -> int:
    """Parse the command line and dispatch to ``update`` or ``render``.

    Returns:
        The integer returned by the selected sub-command handler.

    Raises:
        SystemExit: When ``VERIFY_DIR`` or ``DOCS_DIR`` is not a directory
            (argparse may also exit on invalid arguments).
    """
    layout_ok = VERIFY_DIR.is_dir() and DOCS_DIR.is_dir()
    if not layout_ok:
        raise SystemExit("ERR: not in repo root layout")

    parser = argparse.ArgumentParser(prog="status_board.py")
    commands = parser.add_subparsers(dest="cmd", required=True)

    # "update": run verify playbooks and refresh the local cache.
    update_cmd = commands.add_parser("update", help="Run verify and write local cache (not committed)")
    update_cmd.add_argument("--all", action="store_true", help="Update all exec doc_ids from verify dir")
    update_cmd.add_argument("doc_ids", nargs="*", help="Doc IDs like 03-07 (ignored if --all)")
    update_cmd.set_defaults(func=cmd_update)

    # "render": write the Markdown board from cached/static information.
    render_cmd = commands.add_parser("render", help="Render docs/00-04-验证状态板.md from cache/static")
    render_cmd.set_defaults(func=cmd_render)

    namespace = parser.parse_args()
    handler = namespace.func
    return int(handler(namespace))
|
||
|
||
|
||
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|
||
|