A compounding LLM-maintained knowledge wiki. Synthesis of Andrej Karpathy's persistent-wiki gist and milla-jovovich's mempalace, with an automation layer on top for conversation mining, URL harvesting, human-in-the-loop staging, staleness decay, and hygiene. Includes: - 11 pipeline scripts (extract, summarize, index, harvest, stage, hygiene, maintain, sync, + shared library) - Full docs: README, SETUP, ARCHITECTURE, DESIGN-RATIONALE, CUSTOMIZE - Example CLAUDE.md files (wiki schema + global instructions) tuned for the three-collection qmd setup - 171-test pytest suite (cross-platform, runs in ~1.3s) - MIT licensed
1588 lines
58 KiB
Python
Executable File
1588 lines
58 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Automated wiki hygiene — quick (no LLM) and full (LLM) modes.
|
|
|
|
Implements Plan 02 (staleness & archive) and Plan 04 (automated hygiene).
|
|
|
|
Quick mode checks (daily, no LLM):
|
|
- Backfill missing last_verified
|
|
- Refresh last_verified from conversation references
|
|
- Auto-restore archived pages referenced again
|
|
- Confidence decay per thresholds
|
|
- Archive stale and superseded pages
|
|
- Frontmatter repair (missing required fields)
|
|
- Orphan pages (no inbound links)
|
|
- Broken cross-references (with fuzzy-match fix)
|
|
- Main index drift (missing/orphan entries)
|
|
- Empty stubs (report-only)
|
|
- State file drift (report-only)
|
|
- Staging/archive index resync
|
|
|
|
Full mode checks (weekly, LLM-powered, extends quick):
|
|
- Missing cross-references (haiku)
|
|
- Duplicate coverage (sonnet)
|
|
- Contradictions (sonnet, report-only)
|
|
- Technology lifecycle (haiku)
|
|
|
|
Usage:
|
|
python3 scripts/wiki-hygiene.py # Quick mode (default)
|
|
python3 scripts/wiki-hygiene.py --quick # Explicit quick
|
|
python3 scripts/wiki-hygiene.py --full # Full mode (quick + LLM)
|
|
python3 scripts/wiki-hygiene.py --dry-run # Show what would change
|
|
python3 scripts/wiki-hygiene.py --check-only # Report only, no auto-fixes
|
|
python3 scripts/wiki-hygiene.py --backfill # Backfill last_verified only
|
|
python3 scripts/wiki-hygiene.py --scan-refs # Refresh from conversation refs only
|
|
python3 scripts/wiki-hygiene.py --archive PATH # Manually archive a page
|
|
python3 scripts/wiki-hygiene.py --restore PATH # Manually restore an archived page
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import difflib
|
|
import json
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from dataclasses import dataclass, field
|
|
from datetime import date, datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent))
|
|
from wiki_lib import ( # noqa: E402
|
|
ARCHIVE_DIR,
|
|
ARCHIVE_INDEX,
|
|
CONVERSATIONS_DIR,
|
|
HARVEST_STATE_FILE,
|
|
INDEX_FILE,
|
|
LIVE_CONTENT_DIRS,
|
|
REPORTS_DIR,
|
|
STAGING_DIR,
|
|
STAGING_INDEX,
|
|
WIKI_DIR,
|
|
WikiPage,
|
|
iter_archived_pages,
|
|
iter_live_pages,
|
|
iter_staging_pages,
|
|
page_content_hash,
|
|
parse_date,
|
|
parse_page,
|
|
today,
|
|
write_page,
|
|
)
|
|
|
|
# Line-buffer both streams so progress output is visible immediately when this
# script runs under cron/launchd with piped output.
sys.stdout.reconfigure(line_buffering=True)
sys.stderr.reconfigure(line_buffering=True)

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Hidden state files at the wiki root, alongside the content directories.
HYGIENE_STATE_FILE = WIKI_DIR / ".hygiene-state.json"
MINE_STATE_FILE = WIKI_DIR / ".mine-state.json"

# Decay thresholds in days since last_verified
DECAY_HIGH_TO_MEDIUM = 180
DECAY_MEDIUM_TO_LOW = 270
DECAY_LOW_TO_STALE = 365

# Ordered lowest → highest; list index is the comparison key used by
# _min_confidence / bump_confidence.
CONFIDENCE_ORDER = ["stale", "low", "medium", "high"]
VALID_CONFIDENCE = {"high", "medium", "low", "stale"}
VALID_TYPES = {"pattern", "decision", "environment", "concept"}

EMPTY_STUB_THRESHOLD = 100  # body chars below which a page is a stub

# Required fields per type — missing → auto-fix
REQUIRED_FIELDS = ["title", "type", "confidence", "last_compiled", "last_verified"]

# LLM call defaults
CLAUDE_TIMEOUT = 300  # seconds allowed per `claude -p` invocation
CLAUDE_HAIKU = "haiku"
CLAUDE_SONNET = "sonnet"

# Tech version patterns for lifecycle check
VERSION_REGEX = re.compile(
    r"\b(?:Node(?:\.js)?|Python|Docker|PostgreSQL|MySQL|Redis|Next\.js|NestJS)\s+(\d+(?:\.\d+)?)",
    re.IGNORECASE,
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Hygiene state (.hygiene-state.json)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def load_hygiene_state() -> dict[str, Any]:
    """Read .hygiene-state.json; fall back to a fresh default on any error."""
    default: dict[str, Any] = {
        "last_quick_run": None,
        "last_full_run": None,
        "pages_checked": {},
        "deferred_issues": [],
    }
    if not HYGIENE_STATE_FILE.exists():
        return default
    try:
        return json.loads(HYGIENE_STATE_FILE.read_text())
    except (OSError, json.JSONDecodeError):
        # Corrupt or unreadable state: start over rather than crash.
        return default
|
|
|
|
|
|
def save_hygiene_state(state: dict[str, Any]) -> None:
    """Persist hygiene state atomically: write a temp file, then rename over."""
    tmp_path = HYGIENE_STATE_FILE.with_suffix(".json.tmp")
    tmp_path.write_text(json.dumps(state, indent=2, sort_keys=True))
    tmp_path.replace(HYGIENE_STATE_FILE)
|
|
|
|
|
|
def mark_page_checked(state: dict[str, Any], page: WikiPage, mode: str) -> None:
    """Record a quick/full check timestamp plus the page's content hash."""
    key = str(page.path.relative_to(WIKI_DIR))
    record = state.setdefault("pages_checked", {}).setdefault(key, {})
    stamp_field = {"quick": "last_checked_quick", "full": "last_checked_full"}.get(mode)
    if stamp_field:
        record[stamp_field] = datetime.now(timezone.utc).isoformat()
    # Hash is always refreshed so page_changed_since has a baseline.
    record["content_hash"] = page_content_hash(page)
|
|
|
|
|
|
def page_changed_since(state: dict[str, Any], page: WikiPage, mode: str) -> bool:
    """True if the page's content hash differs from the recorded one (or none
    was ever recorded).  *mode* is accepted for interface symmetry."""
    key = str(page.path.relative_to(WIKI_DIR))
    previous = state.get("pages_checked", {}).get(key, {}).get("content_hash")
    if not previous:
        return True
    return previous != page_content_hash(page)
|
|
|
|
|
|
def is_deferred(state: dict[str, Any], issue_type: str, pages: list[str]) -> bool:
    """True if this exact (type, page-set) issue was previously deferred.

    Page order is irrelevant: both sides are compared sorted.
    """
    wanted = sorted(pages)
    return any(
        issue.get("type") == issue_type and sorted(issue.get("pages", [])) == wanted
        for issue in state.get("deferred_issues", [])
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Date / git helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def git_first_commit_date(path: Path) -> date | None:
    """Date the file was first added to the wiki repo, or None on any failure.

    `git log --diff-filter=A` emits add-commits newest-first, so the last
    output line is the original addition.
    """
    cmd = [
        "git", "-C", str(WIKI_DIR), "log", "--diff-filter=A", "--format=%cs",
        "--", str(path.relative_to(WIKI_DIR)),
    ]
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
    except (subprocess.TimeoutExpired, OSError):
        return None
    out = proc.stdout.strip()
    if proc.returncode != 0 or not out:
        return None
    return parse_date(out.splitlines()[-1])
|
|
|
|
|
|
def file_mtime_date(path: Path) -> date:
    """UTC calendar date of the file's last modification."""
    mtime = path.stat().st_mtime
    return datetime.fromtimestamp(mtime, tz=timezone.utc).date()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Backfill last_verified
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def backfill_last_verified(dry_run: bool = False) -> list[tuple[Path, str, date]]:
    """Fill in a missing or unparseable last_verified on every live page.

    The best available signal wins, in order: last_compiled, the git add
    date, then the file mtime.  Returns (path, source_used, date) for each
    page touched; writes are skipped in dry-run.
    """
    touched: list[tuple[Path, str, date]] = []
    for page in iter_live_pages():
        fm = page.frontmatter
        if "last_verified" in fm and parse_date(fm["last_verified"]):
            continue  # already present and valid

        when = parse_date(fm.get("last_compiled"))
        if when:
            origin = "last_compiled"
        else:
            when = git_first_commit_date(page.path)
            if when:
                origin = "git"
            else:
                when = file_mtime_date(page.path)
                origin = "mtime"

        touched.append((page.path, origin, when))
        if not dry_run:
            fm["last_verified"] = when.isoformat()
            write_page(page)
    return touched
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Frontmatter repair
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def repair_frontmatter(dry_run: bool = False) -> list[tuple[Path, list[str]]]:
    """Add missing required fields with sensible defaults. Returns list of (page, fields_fixed)."""
    changes: list[tuple[Path, list[str]]] = []
    for page in iter_live_pages():
        fixes: list[str] = []
        fm = page.frontmatter

        # Title: derive from the file name (kebab-case → Title Case).
        if "title" not in fm:
            fm["title"] = page.path.stem.replace("-", " ").title()
            fixes.append("title")

        # Type: infer from the parent directory name ("patterns" → "pattern").
        # NOTE(review): rstrip("s") strips *all* trailing s's — fine for the
        # known directory names, but removesuffix("s") would be stricter.
        if "type" not in fm or fm["type"] not in VALID_TYPES:
            inferred = page.path.parent.name.rstrip("s")
            if inferred in VALID_TYPES:
                fm["type"] = inferred
                fixes.append("type")

        # Confidence: default to the middle of the scale when absent/invalid.
        if "confidence" not in fm or str(fm.get("confidence")) not in VALID_CONFIDENCE:
            fm["confidence"] = "medium"
            fixes.append("confidence")

        # last_compiled: prefer the git add date, fall back to file mtime.
        if "last_compiled" not in fm or not parse_date(fm.get("last_compiled")):
            d = git_first_commit_date(page.path) or file_mtime_date(page.path)
            fm["last_compiled"] = d.isoformat()
            fixes.append("last_compiled")

        # last_verified: reuse last_compiled (possibly just repaired above) or
        # today.  Order matters: this check must run after the last_compiled fix.
        if "last_verified" not in fm or not parse_date(fm.get("last_verified")):
            fm["last_verified"] = fm.get("last_compiled") or today().isoformat()
            fixes.append("last_verified")

        # Optional list fields default to empty lists so downstream code can
        # iterate without None checks.
        if "sources" not in fm:
            fm["sources"] = []
            fixes.append("sources")

        if "related" not in fm:
            fm["related"] = []
            fixes.append("related")

        if fixes:
            changes.append((page.path, fixes))
            if not dry_run:
                write_page(page)
    return changes
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Confidence decay
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def expected_confidence(current: str, last_verified: date | None, is_superseded: bool) -> str:
    """Confidence a page *should* carry given its age and supersession status.

    Supersession forces "stale"; otherwise age since last_verified caps the
    confidence per the DECAY_* thresholds (never raising it).
    """
    if is_superseded:
        return "stale"
    if last_verified is None:
        return current  # no age signal → leave unchanged
    age_days = (today() - last_verified).days
    if age_days >= DECAY_LOW_TO_STALE:
        return "stale"
    if age_days >= DECAY_MEDIUM_TO_LOW:
        return _min_confidence(current, "low")
    if age_days >= DECAY_HIGH_TO_MEDIUM:
        return _min_confidence(current, "medium")
    return current
|
|
|
|
|
|
def _min_confidence(a: str, b: str) -> str:
    """Lower (more cautious) of two confidence labels.

    Unknown labels rank past the top of CONFIDENCE_ORDER, matching the
    original dict.get(default=len) behavior.
    """
    def rank(label: str) -> int:
        try:
            return CONFIDENCE_ORDER.index(label)
        except ValueError:
            return len(CONFIDENCE_ORDER)

    return CONFIDENCE_ORDER[min(rank(a), rank(b))]
|
|
|
|
|
|
def bump_confidence(current: str) -> str:
    """One step up the confidence ladder; unknown labels restart at the bottom."""
    try:
        position = CONFIDENCE_ORDER.index(current)
    except ValueError:
        position = 0
    return CONFIDENCE_ORDER[min(position + 1, len(CONFIDENCE_ORDER) - 1)]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Archive / Restore
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def archive_page(page: WikiPage, reason: str, dry_run: bool = False) -> Path | None:
    """Move a live page under ARCHIVE_DIR, mark it stale, and update all indexes.

    Returns the destination path (also in dry-run), or None when *page* is
    not a live content page.  Side effects (skipped in dry-run): frontmatter
    is stamped with archive metadata, the file is renamed, the main index
    entry is removed, an archive-index row is appended, and wiki-wide
    cross-references are rewritten to point at archive/<original path>.
    """
    rel = page.path.relative_to(WIKI_DIR)
    parts = rel.parts
    # Only pages directly under a live content dir (patterns/, decisions/, …)
    # are archivable; anything else (index.md, staging, …) is refused.
    if len(parts) < 2 or parts[0] not in LIVE_CONTENT_DIRS:
        print(f" [warn] cannot archive {rel} — not a live content page", file=sys.stderr)
        return None

    dest = ARCHIVE_DIR / rel
    original_path = str(rel)  # remembered in frontmatter so restore_page can undo

    if dry_run:
        print(f" [dry-run] archive {rel} → {dest.relative_to(WIKI_DIR)} ({reason})")
        return dest

    dest.parent.mkdir(parents=True, exist_ok=True)
    # Stamp archive metadata before writing so the moved file carries it.
    page.frontmatter["archived_date"] = today().isoformat()
    page.frontmatter["archived_reason"] = reason
    page.frontmatter["original_path"] = original_path
    page.frontmatter["confidence"] = "stale"
    page.path.rename(dest)
    page.path = dest
    write_page(page)

    # Keep indexes and inbound links consistent with the move.
    _remove_from_main_index(original_path)
    _append_to_archive_index(dest, original_path, reason)
    _rewrite_cross_references(original_path, f"archive/{original_path}")
    return dest
|
|
|
|
|
|
def restore_page(page: WikiPage, dry_run: bool = False) -> Path | None:
    """Move an archived page back to its original live location.

    The destination comes from the `original_path` frontmatter stamped at
    archive time, falling back to the page's path relative to ARCHIVE_DIR
    (the archive mirrors the live layout).  Returns the restored path (also
    in dry-run).  Side effects (skipped in dry-run): archive metadata is
    stripped, confidence resets to "medium", last_verified becomes today,
    the archive-index row is removed, and archive/ cross-references are
    rewritten back to the live path.
    """
    original_path = page.frontmatter.get("original_path")
    if not original_path:
        # Fallback: relative position inside the archive IS the live path.
        rel = page.path.relative_to(ARCHIVE_DIR)
        original_path = str(rel)

    dest = WIKI_DIR / original_path
    if dry_run:
        print(f" [dry-run] restore {page.path.relative_to(WIKI_DIR)} → {original_path}")
        return dest

    dest.parent.mkdir(parents=True, exist_ok=True)
    # Drop archive-only metadata before writing the restored page.
    for key in ("archived_date", "archived_reason", "original_path"):
        page.frontmatter.pop(key, None)
    # Restored content starts back at medium confidence, verified today.
    page.frontmatter["confidence"] = "medium"
    page.frontmatter["last_verified"] = today().isoformat()
    old = page.path  # remember the archive location for index cleanup below
    page.path.rename(dest)
    page.path = dest
    write_page(page)

    _remove_from_archive_index(str(old.relative_to(ARCHIVE_DIR)))
    _rewrite_cross_references(f"archive/{original_path}", original_path)
    return dest
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Index I/O
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _remove_from_main_index(original_path: str) -> None:
    """Delete the bullet entry linking to *original_path* from index.md.

    Matches lines of the form `- [Title](path)` with or without a trailing
    " — summary".  Fix: the previous pattern required a literal space after
    the closing paren, which missed entries written without a summary
    (`_add_to_main_index` emits `- [Title](path)\\n` in that case), leaving
    them permanently stuck in the index.
    """
    if not INDEX_FILE.exists():
        return
    text = INDEX_FILE.read_text()
    lines = text.splitlines(keepends=True)
    # `(?:\s|$)` accepts both "- [T](p) — summary" and a bare "- [T](p)"
    # (the keepends newline satisfies \s).
    pattern = re.compile(rf"^- \[.+\]\({re.escape(original_path)}\)(?:\s|$)")
    new_lines = [line for line in lines if not pattern.match(line)]
    if len(new_lines) != len(lines):
        INDEX_FILE.write_text("".join(new_lines))
|
|
|
|
|
|
def _append_to_archive_index(archived_path: Path, original_path: str, reason: str) -> None:
    """Append a table row for *archived_path* to archive/index.md.

    Creates the index from the default skeleton if absent.  Fix: dedupe on
    the link target instead of the full row text — the old check embedded
    today's date and the reason in the comparison, so re-archiving the same
    page on a later date (or with a different reason) duplicated the row.
    """
    ARCHIVE_INDEX.parent.mkdir(parents=True, exist_ok=True)
    if not ARCHIVE_INDEX.exists():
        ARCHIVE_INDEX.write_text(_default_archive_index())
    text = ARCHIVE_INDEX.read_text()
    name = archived_path.stem.replace("-", " ").title()
    rel_in_archive = archived_path.relative_to(ARCHIVE_DIR)
    # Already listed? Match on the link target, not the whole row.
    if f"]({rel_in_archive})" in text:
        return
    row = f"| [{name}]({rel_in_archive}) | {original_path} | {today().isoformat()} | {reason} |\n"
    # Drop the "(none yet)" placeholder once a real row exists.
    text = text.replace("| _(none yet)_ | | | |\n", "")
    ARCHIVE_INDEX.write_text(text.rstrip() + "\n" + row)
|
|
|
|
|
|
def _remove_from_archive_index(rel_in_archive: str) -> None:
    """Strip the archive-index table row whose link targets *rel_in_archive*."""
    if not ARCHIVE_INDEX.exists():
        return
    original = ARCHIVE_INDEX.read_text()
    row_re = re.compile(rf"^\|\s*\[.+\]\({re.escape(rel_in_archive)}\).*\n", re.MULTILINE)
    stripped = row_re.sub("", original)
    if stripped != original:
        ARCHIVE_INDEX.write_text(stripped)
|
|
|
|
|
|
def _default_archive_index() -> str:
|
|
return (
|
|
"# Archived Wiki Pages\n\n"
|
|
"Pages archived due to staleness or obsolescence.\n\n"
|
|
"## Archived Pages\n\n"
|
|
"| Page | Original Location | Archived | Reason |\n"
|
|
"|------|-------------------|----------|--------|\n"
|
|
)
|
|
|
|
|
|
def _add_to_main_index(rel_path: str, title: str, summary: str = "") -> None:
    """Insert a `- [title](rel_path) — summary` bullet into index.md.

    The entry is appended to the end of the matching type section
    (## Patterns, ## Decisions, …) when one exists; otherwise it is
    appended to the end of the file.  No-ops if index.md is missing or
    already links rel_path.
    """
    if not INDEX_FILE.exists():
        return
    text = INDEX_FILE.read_text()
    if f"]({rel_path})" in text:
        return  # already indexed
    entry = f"- [{title}]({rel_path})"
    if summary:
        entry += f" — {summary}"
    entry += "\n"
    # Map the path's top-level dir (patterns/…, decisions/…) to a section.
    ptype = rel_path.split("/")[0]
    section_headers = {
        "patterns": "## Patterns",
        "decisions": "## Decisions",
        "concepts": "## Concepts",
        "environments": "## Environments",
    }
    header = section_headers.get(ptype)
    if header and header in text:
        idx = text.find(header)
        # The section runs until the next "## " heading (or EOF).
        next_header = text.find("\n## ", idx + len(header))
        if next_header == -1:
            next_header = len(text)
        section = text[idx:next_header]
        # Insert just after the section's last newline (ignoring a trailing
        # one) so the entry lands at the end of the section's list.
        last_nl = section.rfind("\n", 0, len(section) - 1) + 1
        INDEX_FILE.write_text(text[: idx + last_nl] + entry + text[idx + last_nl :])
    else:
        INDEX_FILE.write_text(text.rstrip() + "\n" + entry)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Cross-reference rewriting
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _rewrite_cross_references(old_path: str, new_path: str) -> int:
    """Rewrite references from *old_path* to *new_path* across the wiki.

    Scans index.md plus every live, staging, and archived content page.
    Handles three reference shapes: markdown links `](old)`, relative
    links `](../old)` (as used from inside a content dir), and bare
    frontmatter `related:` list items `- old`.  Returns the number of
    files modified.
    """
    targets: list[Path] = [INDEX_FILE]
    for sub in LIVE_CONTENT_DIRS:
        targets.extend((WIKI_DIR / sub).glob("*.md"))
    if STAGING_DIR.exists():
        for sub in LIVE_CONTENT_DIRS:
            targets.extend((STAGING_DIR / sub).glob("*.md"))
    if ARCHIVE_DIR.exists():
        for sub in LIVE_CONTENT_DIRS:
            targets.extend((ARCHIVE_DIR / sub).glob("*.md"))

    count = 0
    old_esc = re.escape(old_path)
    # Markdown link forms: `](old)` and dir-relative `](../old)`.
    link_patterns = [
        (re.compile(rf"\]\({old_esc}\)"), f"]({new_path})"),
        (re.compile(rf"\]\(\.\./{old_esc}\)"), f"](../{new_path})"),
    ]
    # Bare `- old` lines as written in frontmatter `related:` blocks.
    related_patterns = [
        (re.compile(rf"^(\s*-\s*){old_esc}$", re.MULTILINE), rf"\g<1>{new_path}"),
    ]
    for target in targets:
        if not target.exists():
            continue
        try:
            text = target.read_text()
        except OSError:
            continue  # unreadable file: skip rather than abort the sweep
        new_text = text
        for pat, repl in link_patterns + related_patterns:
            new_text = pat.sub(repl, new_text)
        if new_text != text:
            target.write_text(new_text)
            count += 1
    return count
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Conversation refresh signals
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def scan_conversation_references() -> dict[str, date]:
    """Map wiki page paths → newest conversation date that references them.

    Only conversations with frontmatter status == "summarized" and a
    parseable `date` count as signals.  References are collected both from
    the frontmatter `related:` list and from inline mentions in the body.
    """
    refs: dict[str, date] = {}
    if not CONVERSATIONS_DIR.exists():
        return refs

    # Matches live-page paths like "patterns/some-page.md".
    page_link_pattern = re.compile(
        r"(?:patterns|decisions|concepts|environments)/[\w\-]+\.md"
    )
    for project_dir in CONVERSATIONS_DIR.iterdir():
        if not project_dir.is_dir():
            continue
        for md in project_dir.glob("*.md"):
            page = parse_page(md)
            if not page:
                continue
            if page.frontmatter.get("status") != "summarized":
                continue  # unsummarized conversations carry no signal
            conv_date = parse_date(page.frontmatter.get("date"))
            if not conv_date:
                continue
            # Frontmatter `related:` entries (may be missing or non-list).
            related = page.frontmatter.get("related") or []
            if isinstance(related, list):
                for ref in related:
                    m = page_link_pattern.search(str(ref))
                    if m:
                        path = m.group(0)
                        # Keep only the newest conversation date per page.
                        if path not in refs or conv_date > refs[path]:
                            refs[path] = conv_date
            # Inline mentions in the conversation body.
            for m in page_link_pattern.finditer(page.body):
                path = m.group(0)
                if path not in refs or conv_date > refs[path]:
                    refs[path] = conv_date
    return refs
|
|
|
|
|
|
def apply_refresh_signals(refs: dict[str, date], dry_run: bool = False) -> list[tuple[Path, str, str, date]]:
    """Advance last_verified (and possibly confidence) for pages referenced
    in newer conversations.

    Returns (path, old_conf, new_conf, ref_date) per page updated.
    """
    updates: list[tuple[Path, str, str, date]] = []
    for page in iter_live_pages():
        ref_date = refs.get(str(page.path.relative_to(WIKI_DIR)))
        if ref_date is None:
            continue
        verified = parse_date(page.frontmatter.get("last_verified"))
        if verified and verified >= ref_date:
            continue  # already at least as fresh as the reference
        old_conf = str(page.frontmatter.get("confidence", "medium"))
        # A fresh mention bumps low/medium one step; high and stale stay put.
        new_conf = bump_confidence(old_conf) if old_conf in ("low", "medium") else old_conf
        updates.append((page.path, old_conf, new_conf, ref_date))
        if dry_run:
            continue
        page.frontmatter["last_verified"] = ref_date.isoformat()
        if new_conf != old_conf:
            page.frontmatter["confidence"] = new_conf
        write_page(page)
    return updates
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Auto-restoration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def auto_restore_archived(dry_run: bool = False) -> list[Path]:
    """Restore archived pages that are referenced again from live content.

    A mention anywhere in index.md, a live content page, or a conversation
    summary — by either the bare original path or its archive/ form —
    triggers restoration.  Returns the restored destination paths.
    """
    restored: list[Path] = []
    # Archived pages keyed by their path relative to ARCHIVE_DIR, which is
    # also their original live path (the archive mirrors the live layout).
    archived = {
        str(p.path.relative_to(ARCHIVE_DIR)): p
        for p in iter_archived_pages()
        if p.path.name != "index.md"
    }
    if not archived:
        return restored

    referenced: set[str] = set()
    scan_targets: list[Path] = [INDEX_FILE]
    for sub in LIVE_CONTENT_DIRS:
        scan_targets.extend((WIKI_DIR / sub).glob("*.md"))
    if CONVERSATIONS_DIR.exists():
        for project_dir in CONVERSATIONS_DIR.iterdir():
            if project_dir.is_dir():
                scan_targets.extend(project_dir.glob("*.md"))

    # Plain substring scan — either path form counts as a reference.
    for t in scan_targets:
        try:
            text = t.read_text()
        except OSError:
            continue
        for rel_archive in archived:
            if rel_archive in text or f"archive/{rel_archive}" in text:
                referenced.add(rel_archive)

    for rel_archive, page in archived.items():
        if rel_archive in referenced:
            restored_path = restore_page(page, dry_run=dry_run)
            if restored_path:
                restored.append(restored_path)
    return restored
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Orphan detection
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def find_orphan_pages() -> list[WikiPage]:
    """Pages with no inbound link from index.md or any other wiki page.

    Fixes two issues in the original: the result of iter_live_pages() was
    iterated twice (silently yielding zero pages on the second pass if it is
    a generator), and every page file was read from disk a second time
    during the orphan scan.  Pages are now materialized once and each file
    is read exactly once.
    """
    all_pages = list(iter_live_pages())
    texts: dict[Path, str] = {p.path: p.path.read_text() for p in all_pages}
    all_text = []
    if INDEX_FILE.exists():
        all_text.append(INDEX_FILE.read_text())
    all_text.extend(texts.values())
    combined = "\n".join(all_text)

    orphans: list[WikiPage] = []
    for page in all_pages:
        rel = str(page.path.relative_to(WIKI_DIR))
        # Occurrences inside the page's own file don't count as inbound links.
        own_count = texts[page.path].count(rel)
        if combined.count(rel) - own_count == 0:
            orphans.append(page)
    return orphans
|
|
|
|
|
|
def fix_orphan_page(page: WikiPage, dry_run: bool = False) -> bool:
    """Link an orphaned page from index.md. Returns True (always fixed/reported).

    The first non-empty, non-heading body line (truncated to 120 chars)
    serves as the index summary.
    """
    rel = str(page.path.relative_to(WIKI_DIR))
    title = str(page.frontmatter.get("title", page.path.stem))
    summary = next(
        (
            stripped[:120]
            for stripped in (ln.strip() for ln in page.body.strip().splitlines())
            if stripped and not stripped.startswith("#")
        ),
        "",
    )
    if dry_run:
        print(f" [dry-run] add orphan to index: {rel}")
        return True
    _add_to_main_index(rel, title, summary)
    return True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Broken cross-references
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# Markdown link targets `](...)`; includes archive/ paths so archived links
# are recognized (and can trigger restore) rather than reported as broken.
LINK_REGEX = re.compile(r"\]\(((?:patterns|decisions|concepts|environments|archive)/[\w\-/]+\.md)\)")
# Bare page paths written as `- path` lines (frontmatter `related:` style).
RELATED_LINE_REGEX = re.compile(r"^\s*-\s*((?:patterns|decisions|concepts|environments)/[\w\-]+\.md)\s*$", re.MULTILINE)


def find_broken_cross_refs() -> list[tuple[Path, str, str | None]]:
    """Return list of (page_path, bad_link, suggested_fix_or_None).

    `archived_paths` is keyed by the page's *original* live path (relative to
    ARCHIVE_DIR, not WIKI_DIR) so we can directly check whether a broken live
    link corresponds to an archived file at the same subpath.
    """
    results: list[tuple[Path, str, str | None]] = []
    live_names = {str(p.path.relative_to(WIKI_DIR)) for p in iter_live_pages()}
    archived_paths = {str(p.path.relative_to(ARCHIVE_DIR)) for p in iter_archived_pages()}

    # Scope: index.md plus every live content page.
    scan: list[Path] = [INDEX_FILE]
    for sub in LIVE_CONTENT_DIRS:
        scan.extend((WIKI_DIR / sub).glob("*.md"))

    for target in scan:
        try:
            text = target.read_text()
        except OSError:
            continue
        seen: set[str] = set()  # dedupe per file: one report per bad link
        for link in LINK_REGEX.findall(text):
            if link in seen:
                continue
            seen.add(link)
            if link in live_names:
                continue  # resolves to a live page — fine
            if link in archived_paths:
                # Reference to archive → trigger restore
                results.append((target, link, f"__RESTORE__:{link}"))
                continue
            # Fuzzy match
            suggestion = fuzzy_find_page(link, live_names)
            results.append((target, link, suggestion))
        # Also bare references in `related:`
        for m in RELATED_LINE_REGEX.finditer(text):
            link = m.group(1)
            if link in seen or link in live_names:
                continue
            seen.add(link)
            if link in archived_paths:
                results.append((target, link, f"__RESTORE__:{link}"))
                continue
            results.append((target, link, fuzzy_find_page(link, live_names)))
    return results
|
|
|
|
|
|
def fuzzy_find_page(bad_link: str, candidates: set[str]) -> str | None:
    """Closest valid page path to *bad_link* (difflib, 0.75 cutoff), or None."""
    best = difflib.get_close_matches(bad_link, list(candidates), n=1, cutoff=0.75)
    if not best:
        return None
    return best[0]
|
|
|
|
|
|
def fix_broken_cross_ref(target: Path, bad_link: str, suggested: str, dry_run: bool = False) -> bool:
    """Repair one broken reference in *target*.

    *suggested* is either a replacement page path or a `__RESTORE__:<path>`
    sentinel from find_broken_cross_refs; the sentinel restores the archived
    page instead of rewriting the link.  Returns True when a fix was applied
    (or would be, in dry-run) and False when nothing changed.  Fix: the
    previous version fell off the end and returned None (not False) in the
    no-change case despite the declared `-> bool`.
    """
    if suggested.startswith("__RESTORE__:"):
        archived_rel = suggested.split(":", 1)[1]
        archived_page = parse_page(ARCHIVE_DIR / archived_rel)
        if archived_page and not dry_run:
            restore_page(archived_page)
        return True
    if dry_run:
        print(f" [dry-run] fix {target.relative_to(WIKI_DIR)}: {bad_link} → {suggested}")
        return True
    text = target.read_text()
    # Markdown links `](bad)` …
    new_text = text.replace(f"]({bad_link})", f"]({suggested})")
    # … and bare `- bad` lines in frontmatter `related:` lists.
    new_text = re.sub(
        rf"^(\s*-\s*){re.escape(bad_link)}$",
        rf"\g<1>{suggested}",
        new_text,
        flags=re.MULTILINE,
    )
    if new_text != text:
        target.write_text(new_text)
        return True
    return False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Index drift
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def find_index_drift() -> tuple[list[str], list[str]]:
    """Return (missing_from_index, stale_index_entries).

    Archive links in the index are deliberately excluded from "stale" —
    they point outside the live page set by design.
    """
    disk_pages = {str(p.path.relative_to(WIKI_DIR)) for p in iter_live_pages()}
    indexed: set[str] = set()
    if INDEX_FILE.exists():
        indexed.update(LINK_REGEX.findall(INDEX_FILE.read_text()))
    archive_links = {entry for entry in indexed if entry.startswith("archive/")}
    missing = sorted(disk_pages - indexed)
    stale = sorted(indexed - disk_pages - archive_links)
    return missing, stale
|
|
|
|
|
|
def fix_index_drift(missing: list[str], stale: list[str], dry_run: bool = False) -> None:
    """Add missing pages to index.md and drop entries whose files are gone."""
    for rel in missing:
        page = parse_page(WIKI_DIR / rel)
        if not page:
            continue
        title = str(page.frontmatter.get("title", page.path.stem))
        # First non-empty, non-heading body line (truncated) as the summary.
        summary = next(
            (
                ln.strip()[:120]
                for ln in page.body.strip().splitlines()
                if ln.strip() and not ln.strip().startswith("#")
            ),
            "",
        )
        if dry_run:
            print(f" [dry-run] add to index: {rel}")
        else:
            _add_to_main_index(rel, title, summary)
    for rel in stale:
        if dry_run:
            print(f" [dry-run] remove from index: {rel}")
        else:
            _remove_from_main_index(rel)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Empty stubs
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def find_empty_stubs() -> list[WikiPage]:
    """Live pages whose body, headings removed, is under EMPTY_STUB_THRESHOLD chars."""
    heading_re = re.compile(r"^#+\s+.*$", re.MULTILINE)
    return [
        page
        for page in iter_live_pages()
        if len(heading_re.sub("", page.body).strip()) < EMPTY_STUB_THRESHOLD
    ]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# State drift
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def find_state_drift() -> list[str]:
    """Report state-file entries that point at files no longer on disk.

    Checks .mine-state.json, .harvest-state.json, and .hygiene-state.json.
    Report-only: nothing is modified.  Fix: JSON is now read via
    Path.read_text() instead of `json.load(open(...))`, which left the file
    handle open until garbage collection.
    """
    issues: list[str] = []

    def _load(path: Path, label: str) -> dict[str, Any] | None:
        # Reads and closes deterministically; records parse errors as issues.
        try:
            return json.loads(path.read_text())
        except (OSError, json.JSONDecodeError) as e:
            issues.append(f"{label}: could not parse {path.name} ({e})")
            return None

    # .mine-state.json → output_file existence
    if MINE_STATE_FILE.exists():
        mine = _load(MINE_STATE_FILE, "mine")
        if mine is not None:
            for sid, info in mine.get("sessions", {}).items():
                out = info.get("output_file")
                if out and not (WIKI_DIR / out).exists():
                    issues.append(f"mine: session {sid[:8]} references missing {out}")

    # .harvest-state.json → raw_file / wiki_pages existence
    if HARVEST_STATE_FILE.exists():
        harvest = _load(HARVEST_STATE_FILE, "harvest")
        if harvest is not None:
            for url, info in harvest.get("harvested_urls", {}).items():
                raw = info.get("raw_file")
                if raw and not (WIKI_DIR / raw).exists():
                    issues.append(f"harvest: {url[:60]} → missing raw file {raw}")
                for wiki_page in info.get("wiki_pages", []):
                    if wiki_page and not (WIKI_DIR / wiki_page).exists():
                        issues.append(f"harvest: {url[:60]} → missing wiki page {wiki_page}")

    # .hygiene-state.json → pages_checked existence
    if HYGIENE_STATE_FILE.exists():
        h = _load(HYGIENE_STATE_FILE, "hygiene")
        if h is not None:
            for rel in h.get("pages_checked", {}):
                if not (WIKI_DIR / rel).exists() and not (ARCHIVE_DIR / rel).exists():
                    issues.append(f"hygiene: pages_checked references missing {rel}")

    return issues
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Staging / archive index sync
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def sync_staging_index(dry_run: bool = False) -> bool:
    """Regenerate staging/index.md from disk. Returns True if any change was needed."""
    if not STAGING_DIR.exists():
        return False
    pending = [p for p in iter_staging_pages() if p.path.name != "index.md"]
    expected = _build_staging_index(pending)
    # Up to date already? Nothing to do.
    if STAGING_INDEX.exists() and STAGING_INDEX.read_text() == expected:
        return False
    if dry_run:
        print(" [dry-run] staging/index.md would be regenerated")
        return True
    STAGING_DIR.mkdir(parents=True, exist_ok=True)
    STAGING_INDEX.write_text(expected)
    return True
|
|
|
|
|
|
def _build_staging_index(pending: list[WikiPage]) -> str:
    """Render staging/index.md: review banner plus a table of pending pages."""
    header = [
        "# Staging — Pending Wiki Content",
        "",
        "Content awaiting human review. These pages were generated by automated scripts",
        "and need approval before joining the live wiki.",
        "",
        "**Review options**:",
        "- Browse in Obsidian and move files manually (then run `scripts/wiki-staging.py --sync`)",
        "- Run `python3 scripts/wiki-staging.py --list` for a summary",
        "- Start a Claude session: \"let's review what's in staging\"",
        "",
        f"**{len(pending)} pending item(s)** as of {today().isoformat()}",
        "",
        "## Pending Items",
        "",
    ]
    if not pending:
        body = ["_No pending items._"]
    else:
        body = [
            "| Page | Type | Source | Staged | Target |",
            "|------|------|--------|--------|--------|",
        ]
        for entry in pending:
            fm = entry.frontmatter
            rel = str(entry.path.relative_to(STAGING_DIR))
            body.append(
                "| [{title}]({rel}) | {ptype} | {by} | {when} | `{target}` |".format(
                    title=fm.get("title", entry.path.stem),
                    rel=rel,
                    ptype=fm.get("type", "unknown"),
                    by=fm.get("staged_by", "unknown"),
                    when=fm.get("staged_date", "—"),
                    target=fm.get("target_path", rel),
                )
            )
    return "\n".join(header + body) + "\n"
|
|
|
|
|
|
def sync_archive_index(dry_run: bool = False) -> bool:
    """Rebuild archive/index.md from disk if out of sync. Returns True if changed."""
    if not ARCHIVE_DIR.exists():
        return False
    pages = [p for p in iter_archived_pages() if p.path.name != "index.md"]
    expected = _build_archive_index(pages)
    # Up to date already? Nothing to do.
    if ARCHIVE_INDEX.exists() and ARCHIVE_INDEX.read_text() == expected:
        return False
    if dry_run:
        print(" [dry-run] archive/index.md would be regenerated")
        return True
    ARCHIVE_INDEX.write_text(expected)
    return True
|
|
|
|
|
|
def _build_archive_index(archived: list[WikiPage]) -> str:
    """Render the archive index markdown for the given archived pages."""
    out: list[str] = [
        "# Archived Wiki Pages",
        "",
        "Pages archived due to staleness or obsolescence. Excluded from default",
        "wiki searches but available via `qmd search \"topic\" -c wiki-archive`.",
        "",
        "## Archived Pages",
        "",
        "| Page | Original Location | Archived | Reason |",
        "|------|-------------------|----------|--------|",
    ]
    if archived:
        for page in archived:
            meta = page.frontmatter
            display = page.path.stem.replace("-", " ").title()
            rel = str(page.path.relative_to(ARCHIVE_DIR))
            out.append(
                f"| [{display}]({rel})"
                f" | {meta.get('original_path', rel)}"
                f" | {meta.get('archived_date', '—')}"
                f" | {meta.get('archived_reason', '—')} |"
            )
    else:
        out.append("| _(none yet)_ | | | |")
    return "\n".join(out) + "\n"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# LLM helpers (full mode)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def call_claude(prompt: str, model: str = CLAUDE_HAIKU) -> str | None:
    """Invoke `claude -p` with the prompt; return stripped stdout, or None on any failure."""
    cmd = ["claude", "-p", "--model", model, "--output-format", "text", prompt]
    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=CLAUDE_TIMEOUT,
        )
    except FileNotFoundError:
        print(" [warn] claude CLI not found", file=sys.stderr)
        return None
    except subprocess.TimeoutExpired:
        print(" [warn] claude -p timed out", file=sys.stderr)
        return None
    if proc.returncode != 0:
        print(f" [warn] claude -p failed: {proc.stderr.strip()[:200]}", file=sys.stderr)
        return None
    return proc.stdout.strip()
|
|
|
|
|
|
def _extract_json(text: str) -> Any:
|
|
match = re.search(r"(\{.*\}|\[.*\])", text, re.DOTALL)
|
|
if not match:
|
|
return None
|
|
try:
|
|
return json.loads(match.group(0))
|
|
except json.JSONDecodeError:
|
|
return None
|
|
|
|
|
|
def _page_header_snippet(page: WikiPage) -> str:
    """Short representation of a page for LLM prompts: rel path + title + first paragraph."""
    rel = str(page.path.relative_to(WIKI_DIR))
    title = str(page.frontmatter.get("title", page.path.stem))
    # First non-empty, non-heading body line, truncated to 400 chars.
    first_para = next(
        (
            stripped[:400]
            for stripped in (ln.strip() for ln in page.body.strip().splitlines())
            if stripped and not stripped.startswith("#")
        ),
        "",
    )
    return f"`{rel}` — {title}\n{first_para}"
|
|
|
|
|
|
def find_missing_cross_refs_llm(pages: list[WikiPage]) -> list[tuple[Path, list[str]]]:
    """For each page, ask haiku which other pages it should link to.

    Returns list of (page_path, suggested_rel_paths).
    """
    if not pages:
        return []
    # index.md serves as the catalog of linkable candidates.
    try:
        catalog = INDEX_FILE.read_text()[:10_000]
    except OSError:
        return []

    out: list[tuple[Path, list[str]]] = []
    batch_size = 5  # pages per LLM call
    for start in range(0, len(pages), batch_size):
        chunk = pages[start : start + batch_size]
        sections = []
        for idx, pg in enumerate(chunk):
            sections.append(
                f"### PAGE {idx+1}: {str(pg.path.relative_to(WIKI_DIR))}\n"
                f"title: {pg.frontmatter.get('title', pg.path.stem)}\n"
                f"current related: {pg.frontmatter.get('related', [])}\n"
                f"first paragraph:\n{_page_header_snippet(pg)}"
            )
        joined = "\n\n".join(sections)
        prompt = (
            "You are reviewing wiki pages for missing cross-references. For each PAGE below, "
            "identify OTHER wiki pages it should link to but currently doesn't. Only suggest "
            "pages listed in the INDEX. Be conservative — only suggest strong topical matches.\n\n"
            "Emit a single JSON object mapping the page's relative path to an array of relative "
            "paths it should link to. Omit pages with no suggestions. No prose.\n\n"
            f"### INDEX\n{catalog}\n\n"
            f"### PAGES TO REVIEW\n{joined}\n"
        )
        reply = call_claude(prompt, model=CLAUDE_HAIKU)
        if not reply:
            continue
        parsed = _extract_json(reply)
        if not isinstance(parsed, dict):
            continue
        for pg in chunk:
            rel = str(pg.path.relative_to(WIKI_DIR))
            proposed = parsed.get(rel)
            if not (isinstance(proposed, list) and proposed):
                continue
            # Drop anything already linked, and self-links.
            known = set(str(x) for x in (pg.frontmatter.get("related") or []))
            fresh = [s for s in proposed if s not in known and s != rel]
            if fresh:
                out.append((pg.path, fresh))
    return out
|
|
|
|
|
|
def find_duplicates_llm(pages: list[WikiPage]) -> list[tuple[Path, Path, str]]:
    """First pass (no LLM) groups by keyword overlap; second pass (sonnet) confirms duplicates.

    Returns list of (weaker_path, stronger_path, reason).
    """
    if len(pages) < 2:
        return []

    # Bucket by frontmatter type — duplicates only make sense within a type.
    buckets: dict[str, list[WikiPage]] = {}
    for pg in pages:
        buckets.setdefault(str(pg.frontmatter.get("type", "")), []).append(pg)

    # Candidate pairs: same type, at least two shared title keywords.
    pairs: list[tuple[WikiPage, WikiPage]] = []
    for bucket in buckets.values():
        for pos, first in enumerate(bucket):
            first_words = _title_keywords(first)
            for second in bucket[pos + 1 :]:
                if len(first_words & _title_keywords(second)) >= 2:
                    pairs.append((first, second))

    confirmed: list[tuple[Path, Path, str]] = []
    for first, second in pairs[:10]:  # cap to control LLM cost
        prompt = (
            "Are these two wiki pages duplicates (substantially the same topic)?\n\n"
            f"### PAGE A: {first.path.relative_to(WIKI_DIR)}\n{first.body[:3000]}\n\n"
            f"### PAGE B: {second.path.relative_to(WIKI_DIR)}\n{second.body[:3000]}\n\n"
            "Emit a single JSON object: "
            '{"duplicate": true|false, "stronger": "A"|"B", "reason": "..."}. '
            "No prose."
        )
        verdict = _extract_json(call_claude(prompt, model=CLAUDE_SONNET) or "")
        if not (isinstance(verdict, dict) and verdict.get("duplicate")):
            continue
        reason = str(verdict.get("reason", ""))
        if verdict.get("stronger", "A") == "A":
            confirmed.append((second.path, first.path, reason))
        else:
            confirmed.append((first.path, second.path, reason))
    return confirmed
|
|
|
|
|
|
def _title_keywords(page: WikiPage) -> set[str]:
|
|
title = str(page.frontmatter.get("title", page.path.stem)).lower()
|
|
return {w for w in re.split(r"[^a-z0-9]+", title) if len(w) > 3}
|
|
|
|
|
|
def find_contradictions_llm(pages: list[WikiPage]) -> list[tuple[Path, Path, str]]:
    """Report-only — pair up related pages and ask sonnet to find conflicting claims."""
    # Only decision/pattern pages carry claims worth cross-checking.
    focus = [pg for pg in pages if str(pg.frontmatter.get("type")) in ("decision", "pattern")]
    if len(focus) < 2:
        return []

    # Candidate pairs come from explicit related: links between focus pages.
    lookup = {str(pg.path.relative_to(WIKI_DIR)): pg for pg in focus}
    pairs: list[tuple[WikiPage, WikiPage]] = []
    done: set[tuple[str, str]] = set()
    for pg in focus:
        links = pg.frontmatter.get("related") or []
        if not isinstance(links, list):
            continue
        for link in links:
            partner = lookup.get(str(link))
            if partner is None:
                continue
            key = tuple(sorted([str(pg.path), str(partner.path)]))
            if key in done:
                continue
            done.add(key)
            pairs.append((pg, partner))

    flagged: list[tuple[Path, Path, str]] = []
    for first, second in pairs[:8]:  # cap
        prompt = (
            "Compare these two wiki pages for contradictions in their claims or recommendations. "
            "Only flag genuine contradictions, not complementary content.\n\n"
            f"### PAGE A: {first.path.relative_to(WIKI_DIR)}\n{first.body[:3000]}\n\n"
            f"### PAGE B: {second.path.relative_to(WIKI_DIR)}\n{second.body[:3000]}\n\n"
            "Emit a single JSON object: "
            '{"contradiction": true|false, "description": "..."}. No prose.'
        )
        verdict = _extract_json(call_claude(prompt, model=CLAUDE_SONNET) or "")
        if isinstance(verdict, dict) and verdict.get("contradiction"):
            flagged.append((first.path, second.path, str(verdict.get("description", ""))))
    return flagged
|
|
|
|
|
|
def find_tech_lifecycle_issues() -> list[tuple[Path, str]]:
    """Flag pages mentioning outdated versions when newer ones appear in recent conversations.

    Returns (page_path, note) tuples — at most one flag per page.
    """
    # Local import replaces the previous `__import__("datetime").timedelta` hack.
    from datetime import timedelta

    # Collect tool -> version mentions per live wiki page.
    page_versions: dict[Path, dict[str, str]] = {}
    for page in iter_live_pages():
        versions: dict[str, str] = {}
        for m in VERSION_REGEX.finditer(page.body):
            tool = m.group(0).split()[0].lower()
            versions[tool] = m.group(1)
        if versions:
            page_versions[page.path] = versions

    if not CONVERSATIONS_DIR.exists():
        return []

    # Scan recent conversations (last 90 days), keeping the newest version seen per tool.
    recent_versions: dict[str, str] = {}
    cutoff = today() - timedelta(days=90)
    for project_dir in CONVERSATIONS_DIR.iterdir():
        if not project_dir.is_dir():
            continue
        for md in project_dir.glob("*.md"):
            page = parse_page(md)
            if not page:
                continue
            d = parse_date(page.frontmatter.get("date"))
            if not d or d < cutoff:
                continue
            for m in VERSION_REGEX.finditer(page.body):
                tool = m.group(0).split()[0].lower()
                ver = m.group(1)
                if tool not in recent_versions or _version_gt(ver, recent_versions[tool]):
                    recent_versions[tool] = ver

    # Flag any page whose mentioned version lags the newest recent sighting.
    results: list[tuple[Path, str]] = []
    for path, versions in page_versions.items():
        for tool, page_ver in versions.items():
            recent = recent_versions.get(tool)
            if recent and _version_gt(recent, page_ver):
                results.append((path, f"{tool} {page_ver} in page; {recent} in recent conversations"))
                break  # one flag per page is enough
    return results
|
|
|
|
|
|
def _version_gt(a: str, b: str) -> bool:
|
|
try:
|
|
ap = [int(x) for x in a.split(".")]
|
|
bp = [int(x) for x in b.split(".")]
|
|
return ap > bp
|
|
except ValueError:
|
|
return False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Reports
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@dataclass
class HygieneReport:
    """Accumulated findings/fixes from one hygiene run.

    Populated by run_quick_hygiene / run_full_hygiene and rendered by
    write_fixed_report and write_needs_review_report.
    """

    # Quick-mode fields
    backfilled: list[tuple[Path, str, date]] = field(default_factory=list)  # (page, source, verified date)
    refreshed: list[tuple[Path, str, str, date]] = field(default_factory=list)  # (page, old conf, new conf, ref date)
    decayed: list[tuple[Path, str, str]] = field(default_factory=list)  # (page, old conf, new conf)
    archived: list[tuple[Path, str]] = field(default_factory=list)  # (page, reason)
    restored: list[Path] = field(default_factory=list)
    frontmatter_fixes: list[tuple[Path, list[str]]] = field(default_factory=list)  # (page, fields added)
    orphans_fixed: list[Path] = field(default_factory=list)
    orphans_unfixed: list[Path] = field(default_factory=list)
    xrefs_fixed: list[tuple[Path, str, str]] = field(default_factory=list)  # (page, bad link, replacement)
    xrefs_unfixed: list[tuple[Path, str]] = field(default_factory=list)  # (page, bad link)
    index_drift_added: list[str] = field(default_factory=list)
    index_drift_removed: list[str] = field(default_factory=list)
    staging_synced: bool = False
    archive_synced: bool = False
    # Report-only
    empty_stubs: list[Path] = field(default_factory=list)
    state_drift: list[str] = field(default_factory=list)
    # Full-mode fields
    missing_xrefs: list[tuple[Path, list[str]]] = field(default_factory=list)  # (page, suggested links)
    duplicates: list[tuple[Path, Path, str]] = field(default_factory=list)  # (weaker, stronger, reason)
    contradictions: list[tuple[Path, Path, str]] = field(default_factory=list)  # (page A, page B, description)
    tech_lifecycle: list[tuple[Path, str]] = field(default_factory=list)  # (page, note)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Hygiene orchestrator
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def run_quick_hygiene(dry_run: bool = False, check_only: bool = False) -> HygieneReport:
    """Run all non-LLM hygiene checks, applying fixes unless dry_run/check_only.

    Ordering note: confidence decay runs before the archive pass, so pages
    that just decayed to "stale" are archived in the same run when fixes are
    applied. In dry-run/check-only mode the archive pass re-reads pages with
    their pre-decay confidence, so would-be archives of newly-stale pages are
    not reported.
    """
    report = HygieneReport()
    # True only when we are allowed to write changes; helpers take the
    # inverse (`dry_run=not apply`) so check_only behaves like dry_run.
    apply = not (dry_run or check_only)

    print("[quick] backfilling missing last_verified")
    report.backfilled = backfill_last_verified(dry_run=not apply)

    print("[quick] scanning conversation references")
    refs = scan_conversation_references()
    report.refreshed = apply_refresh_signals(refs, dry_run=not apply)

    print("[quick] auto-restoring archived pages referenced again")
    report.restored = auto_restore_archived(dry_run=not apply)

    print("[quick] repairing frontmatter")
    report.frontmatter_fixes = repair_frontmatter(dry_run=not apply)

    print("[quick] applying confidence decay")
    for page in iter_live_pages():
        current = str(page.frontmatter.get("confidence", "medium"))
        last_verified = parse_date(page.frontmatter.get("last_verified"))
        # A "superseded by ..." status anywhere in the field counts.
        is_superseded = bool(re.search(r"superseded by", str(page.frontmatter.get("status", "")), re.IGNORECASE))
        expected = expected_confidence(current, last_verified, is_superseded)
        if expected != current:
            report.decayed.append((page.path, current, expected))
            if apply:
                page.frontmatter["confidence"] = expected
                write_page(page)

    print("[quick] archiving stale and superseded pages")
    for page in iter_live_pages():
        conf = str(page.frontmatter.get("confidence", "medium"))
        status_val = str(page.frontmatter.get("status", ""))
        is_superseded = bool(re.search(r"superseded by", status_val, re.IGNORECASE))
        last_verified = parse_date(page.frontmatter.get("last_verified"))
        if is_superseded:
            reason = "Explicitly superseded"
            if apply:
                archive_page(page, reason)
            report.archived.append((page.path, reason))
            continue
        if conf == "stale":
            # -1 signals "never verified" in the human-readable reason.
            days = (today() - last_verified).days if last_verified else -1
            reason = f"Confidence decayed to stale — no references in {days} days"
            if apply:
                archive_page(page, reason)
            report.archived.append((page.path, reason))

    print("[quick] checking index drift")
    missing, stale_entries = find_index_drift()
    report.index_drift_added = missing
    report.index_drift_removed = stale_entries
    if apply and (missing or stale_entries):
        fix_index_drift(missing, stale_entries)

    print("[quick] checking for orphan pages")
    orphans = find_orphan_pages()
    for o in orphans:
        if apply:
            fix_orphan_page(o)
            report.orphans_fixed.append(o.path)
        else:
            report.orphans_unfixed.append(o.path)

    print("[quick] checking for broken cross-references")
    broken = find_broken_cross_refs()
    for target, bad, suggested in broken:
        if suggested is None:
            # No fuzzy match — goes to the needs-review report.
            report.xrefs_unfixed.append((target, bad))
        else:
            if apply:
                fix_broken_cross_ref(target, bad, suggested)
            report.xrefs_fixed.append((target, bad, suggested))

    print("[quick] checking for empty stubs")
    report.empty_stubs = [p.path for p in find_empty_stubs()]

    print("[quick] checking state drift")
    report.state_drift = find_state_drift()

    print("[quick] syncing staging/archive indexes")
    report.staging_synced = sync_staging_index(dry_run=not apply)
    report.archive_synced = sync_archive_index(dry_run=not apply)

    # Update hygiene state
    if apply:
        state = load_hygiene_state()
        state["last_quick_run"] = datetime.now(timezone.utc).isoformat()
        for page in iter_live_pages():
            mark_page_checked(state, page, "quick")
        save_hygiene_state(state)

    return report
|
|
|
|
|
|
def run_full_hygiene(dry_run: bool = False, check_only: bool = False) -> HygieneReport:
    """Quick hygiene + LLM-powered checks.

    Runs the full quick pass first, then layers on the LLM checks: missing
    cross-references (haiku, only on pages changed since the last full run),
    duplicate coverage and contradictions (sonnet), and technology lifecycle.
    """
    print("[full] running quick hygiene first")
    report = run_quick_hygiene(dry_run=dry_run, check_only=check_only)

    apply = not (dry_run or check_only)

    # Only check pages that changed since last full run
    state = load_hygiene_state()
    # Materialize: this collection is len()-ed and iterated several times
    # below, which would silently misbehave if iter_live_pages() ever
    # returned a one-shot iterator. list() is a no-op on an existing list.
    all_pages = list(iter_live_pages())
    changed_pages = [p for p in all_pages if page_changed_since(state, p, "full")]
    print(f"[full] {len(changed_pages)}/{len(all_pages)} pages changed since last full run")

    print("[full] checking missing cross-references (haiku)")
    report.missing_xrefs = find_missing_cross_refs_llm(changed_pages)
    if apply:
        for path, suggestions in report.missing_xrefs:
            page = parse_page(path)
            if not page:
                continue
            # Append suggestions to related:, preserving existing order.
            existing = list(page.frontmatter.get("related") or [])
            for s in suggestions:
                if s not in existing:
                    existing.append(s)
            page.frontmatter["related"] = existing
            write_page(page)

    print("[full] checking for duplicate coverage (sonnet)")
    report.duplicates = find_duplicates_llm(all_pages)
    if apply:
        for weaker, stronger, reason in report.duplicates:
            wp = parse_page(weaker)
            if wp:
                archive_page(wp, f"Merged into {stronger.relative_to(WIKI_DIR)} — {reason}")

    print("[full] checking for contradictions (sonnet) — report-only")
    report.contradictions = find_contradictions_llm(all_pages)

    print("[full] checking technology lifecycle")
    report.tech_lifecycle = find_tech_lifecycle_issues()

    if apply:
        state["last_full_run"] = datetime.now(timezone.utc).isoformat()
        for page in iter_live_pages():
            mark_page_checked(state, page, "full")
        save_hygiene_state(state)

    return report
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Report writers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def write_fixed_report(report: HygieneReport, mode: str, dry_run: bool) -> Path:
    """Write the auto-fixed hygiene report markdown and return its path.

    The filename encodes the date and whether this was a dry run
    (`hygiene-YYYY-MM-DD-fixed.md` or `...-dry-run.md`).
    """
    REPORTS_DIR.mkdir(parents=True, exist_ok=True)
    suffix = "-dry-run" if dry_run else "-fixed"
    path = REPORTS_DIR / f"hygiene-{today().isoformat()}{suffix}.md"

    # Summary block first: one count line per check.
    lines = [
        f"# Hygiene Report — Auto-Fixed ({today().isoformat()})",
        "",
        f"Mode: {mode}{' (dry-run)' if dry_run else ''}",
        "",
        "## Summary",
        "",
        f"- Backfilled last_verified: {len(report.backfilled)}",
        f"- Refreshed from conversations: {len(report.refreshed)}",
        f"- Frontmatter repairs: {len(report.frontmatter_fixes)}",
        f"- Orphans linked: {len(report.orphans_fixed)}",
        f"- Broken xrefs fixed: {len(report.xrefs_fixed)}",
        f"- Index drift — added: {len(report.index_drift_added)}, removed: {len(report.index_drift_removed)}",
        f"- Decayed: {len(report.decayed)}",
        f"- Archived: {len(report.archived)}",
        f"- Restored: {len(report.restored)}",
        f"- Staging index resynced: {report.staging_synced}",
        f"- Archive index resynced: {report.archive_synced}",
    ]
    if mode == "full":
        lines.extend([
            f"- Missing xrefs added: {len(report.missing_xrefs)}",
            f"- Duplicates merged: {len(report.duplicates)}",
        ])
    lines.append("")

    # Appends a "## title" section; skipped entirely when there are no rows.
    def _section(title: str, rows: list[str]) -> None:
        if not rows:
            return
        lines.append(f"## {title}")
        lines.append("")
        lines.extend(rows)
        lines.append("")

    _section(
        "Backfilled last_verified",
        [f"- `{p.relative_to(WIKI_DIR)}` ← {src} ({d.isoformat()})" for p, src, d in report.backfilled],
    )
    _section(
        "Refreshed from conversations",
        [
            f"- `{p.relative_to(WIKI_DIR)}` confidence {old} → {new} (ref {d.isoformat()})"
            for p, old, new, d in report.refreshed
        ],
    )
    _section(
        "Frontmatter repairs",
        [f"- `{p.relative_to(WIKI_DIR)}` — added: {', '.join(fields)}" for p, fields in report.frontmatter_fixes],
    )
    _section(
        "Orphans linked",
        [f"- `{p.relative_to(WIKI_DIR)}`" for p in report.orphans_fixed],
    )
    _section(
        "Broken xrefs fixed",
        [f"- `{t.relative_to(WIKI_DIR)}` {bad} → {new}" for t, bad, new in report.xrefs_fixed],
    )
    _section(
        "Index drift — added",
        [f"- `{p}`" for p in report.index_drift_added],
    )
    _section(
        "Index drift — removed",
        [f"- `{p}`" for p in report.index_drift_removed],
    )
    _section(
        "Confidence decayed",
        [f"- `{p.relative_to(WIKI_DIR)}` {old} → {new}" for p, old, new in report.decayed],
    )
    _section(
        "Archived",
        [f"- `{p.relative_to(WIKI_DIR)}` — {reason}" for p, reason in report.archived],
    )
    _section(
        "Restored",
        [f"- `{p.relative_to(WIKI_DIR)}`" for p in report.restored],
    )
    if mode == "full":
        _section(
            "Missing xrefs added",
            [
                f"- `{p.relative_to(WIKI_DIR)}` ← added: {', '.join(s)}"
                for p, s in report.missing_xrefs
            ],
        )
        _section(
            "Duplicates merged",
            [
                f"- `{w.relative_to(WIKI_DIR)}` → merged into `{s.relative_to(WIKI_DIR)}` ({r})"
                for w, s, r in report.duplicates
            ],
        )

    path.write_text("\n".join(lines) + "\n")
    return path
|
|
|
|
|
|
def write_needs_review_report(report: HygieneReport, mode: str) -> Path | None:
    """Write needs-review report if there's anything to review. Returns path or None."""
    # Build the body first; only create a file if at least one section exists.
    items: list[str] = []

    if report.orphans_unfixed:
        items.append("## Orphan pages (no inbound links)")
        items.append("")
        items.extend(f"- `{p.relative_to(WIKI_DIR)}`" for p in report.orphans_unfixed)
        items.append("")

    if report.xrefs_unfixed:
        items.append("## Broken cross-references (no fuzzy match)")
        items.append("")
        items.extend(
            f"- `{t.relative_to(WIKI_DIR)}` → missing link `{bad}`"
            for t, bad in report.xrefs_unfixed
        )
        items.append("")

    if report.empty_stubs:
        items.append("## Empty stubs (body < 100 chars)")
        items.append("")
        items.extend(f"- `{p.relative_to(WIKI_DIR)}`" for p in report.empty_stubs)
        items.append("")

    if report.state_drift:
        items.append("## State file drift")
        items.append("")
        items.extend(f"- {msg}" for msg in report.state_drift)
        items.append("")

    # Full-mode-only findings (LLM-detected, never auto-fixed).
    if mode == "full":
        if report.contradictions:
            items.append("## Contradictions (LLM-detected — human judgment required)")
            items.append("")
            for a, b, desc in report.contradictions:
                items.append(f"### `{a.relative_to(WIKI_DIR)}` vs `{b.relative_to(WIKI_DIR)}`")
                items.append("")
                items.append(desc)
                items.append("")
        if report.tech_lifecycle:
            items.append("## Technology lifecycle flags")
            items.append("")
            items.extend(
                f"- `{p.relative_to(WIKI_DIR)}` — {note}"
                for p, note in report.tech_lifecycle
            )
            items.append("")

    if not items:
        return None

    REPORTS_DIR.mkdir(parents=True, exist_ok=True)
    path = REPORTS_DIR / f"hygiene-{today().isoformat()}-needs-review.md"
    # The item count tallies "## " sections plus "### " contradiction pairs.
    header = [
        f"# Hygiene Report — Needs Review ({today().isoformat()})",
        "",
        f"Mode: {mode}",
        f"Items requiring attention: {sum(1 for line in items if line.startswith(('## ', '### ')))}",
        "",
    ]
    path.write_text("\n".join(header + items) + "\n")
    return path
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def main() -> int:
    """CLI entry point: dispatch to a single-purpose action or the hygiene loop."""
    parser = argparse.ArgumentParser(description="Wiki hygiene — quick and full modes")
    group = parser.add_mutually_exclusive_group()
    group.add_argument("--quick", action="store_true", help="Run the quick hygiene loop (default)")
    group.add_argument("--full", action="store_true", help="Run full hygiene (quick + LLM checks)")
    group.add_argument("--backfill", action="store_true", help="Only run the last_verified backfill")
    group.add_argument("--scan-refs", action="store_true", help="Only apply conversation refresh signals")
    group.add_argument("--archive", metavar="PATH", help="Manually archive a live page")
    group.add_argument("--restore", metavar="PATH", help="Manually restore an archived page")
    parser.add_argument("--dry-run", action="store_true", help="Show what would change without writing")
    parser.add_argument("--check-only", action="store_true", help="Report issues without auto-fixing")
    parser.add_argument("--reason", default="Manual archive", help="Reason for --archive")
    args = parser.parse_args()

    def _resolve(raw: str) -> Path:
        # Relative paths are interpreted against the wiki root.
        candidate = Path(raw)
        return candidate if candidate.is_absolute() else WIKI_DIR / candidate

    if args.backfill:
        changed = backfill_last_verified(dry_run=args.dry_run)
        for pg, src, d in changed:
            print(f" {pg.relative_to(WIKI_DIR)} ← {src} ({d.isoformat()})")
        print(f"\n{len(changed)} page(s) backfilled")
        return 0

    if args.scan_refs:
        refs = scan_conversation_references()
        print(f"Found references to {len(refs)} wiki page(s)")
        changed = apply_refresh_signals(refs, dry_run=args.dry_run)
        for pg, old, new, d in changed:
            print(f" {pg.relative_to(WIKI_DIR)} {old}→{new} ({d.isoformat()})")
        print(f"\n{len(changed)} page(s) refreshed")
        return 0

    if args.archive:
        target = _resolve(args.archive)
        page = parse_page(target)
        if not page:
            print(f"Cannot parse page: {target}", file=sys.stderr)
            return 1
        archive_page(page, args.reason, dry_run=args.dry_run)
        return 0

    if args.restore:
        target = _resolve(args.restore)
        page = parse_page(target)
        if not page:
            print(f"Cannot parse page: {target}", file=sys.stderr)
            return 1
        restore_page(page, dry_run=args.dry_run)
        return 0

    # Default: quick or full hygiene loop
    mode_name = "full" if args.full else "quick"
    runner = run_full_hygiene if args.full else run_quick_hygiene
    report = runner(dry_run=args.dry_run, check_only=args.check_only)

    fixed_path = write_fixed_report(report, mode_name, args.dry_run)
    review_path = write_needs_review_report(report, mode_name)

    print(f"\nFixed report: {fixed_path.relative_to(WIKI_DIR)}")
    if review_path:
        print(f"Needs-review report: {review_path.relative_to(WIKI_DIR)}")
    else:
        print("No items need human review.")
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Propagate main()'s return code as the process exit status.
    sys.exit(main())
|