A compounding LLM-maintained knowledge wiki. Synthesis of Andrej Karpathy's persistent-wiki gist and milla-jovovich's mempalace, with an automation layer on top for conversation mining, URL harvesting, human-in-the-loop staging, staleness decay, and hygiene. Includes: - 11 pipeline scripts (extract, summarize, index, harvest, stage, hygiene, maintain, sync, + shared library) - Full docs: README, SETUP, ARCHITECTURE, DESIGN-RATIONALE, CUSTOMIZE - Example CLAUDE.md files (wiki schema + global instructions) tuned for the three-collection qmd setup - 171-test pytest suite (cross-platform, runs in ~1.3s) - MIT licensed
879 lines · 28 KiB · Python · Executable File
#!/usr/bin/env python3
|
|
"""Harvest external reference URLs from summarized conversations into the wiki.
|
|
|
|
Scans summarized conversation transcripts for URLs, classifies them, fetches
|
|
the content, stores the raw source under raw/harvested/, and optionally calls
|
|
`claude -p` to compile each raw file into a staging/ wiki page.
|
|
|
|
Usage:
|
|
python3 scripts/wiki-harvest.py # Process all summarized conversations
|
|
python3 scripts/wiki-harvest.py --project mc # One project only
|
|
python3 scripts/wiki-harvest.py --file PATH # One conversation file
|
|
python3 scripts/wiki-harvest.py --dry-run # Show what would be harvested
|
|
python3 scripts/wiki-harvest.py --no-compile # Fetch only, skip claude -p compile step
|
|
python3 scripts/wiki-harvest.py --limit 10 # Cap number of URLs processed
|
|
|
|
State is persisted in .harvest-state.json; existing URLs are deduplicated.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
from urllib.parse import urlparse
|
|
|
|
# Force unbuffered output for pipe usage
sys.stdout.reconfigure(line_buffering=True)
sys.stderr.reconfigure(line_buffering=True)

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Wiki root; overridable via the WIKI_DIR environment variable.
WIKI_DIR = Path(os.environ.get("WIKI_DIR", str(Path.home() / "projects" / "wiki")))

CONVERSATIONS_DIR = WIKI_DIR / "conversations"  # summarized transcripts scanned for URLs
RAW_HARVESTED_DIR = WIKI_DIR / "raw" / "harvested"  # fetched page sources land here
STAGING_DIR = WIKI_DIR / "staging"  # AI-compiled pages awaiting human review
INDEX_FILE = WIKI_DIR / "index.md"  # fed to claude -p so it knows existing pages
CLAUDE_MD = WIKI_DIR / "CLAUDE.md"  # wiki schema; referenced by the compile prompt
HARVEST_STATE_FILE = WIKI_DIR / ".harvest-state.json"  # dedupe/retry bookkeeping

# ════════════════════════════════════════════════════════════════════════════
# CONFIGURE ME — URL classification rules
# ════════════════════════════════════════════════════════════════════════════
#
# Type D: always skip. Add your own internal/ephemeral/personal domains here.
# Patterns use `re.search` so unanchored suffixes like `\.example\.com$` work.
# Private IPs (10.x, 172.16-31.x, 192.168.x, 127.x) are detected separately.
SKIP_DOMAIN_PATTERNS = [
    # Generic: ephemeral / personal / chat / internal
    r"\.atlassian\.net$",
    r"^app\.asana\.com$",
    r"^(www\.)?slack\.com$",
    r"\.slack\.com$",
    r"^(www\.)?discord\.com$",
    r"^localhost$",
    r"^0\.0\.0\.0$",
    r"^mail\.google\.com$",
    r"^calendar\.google\.com$",
    r"^docs\.google\.com$",
    r"^drive\.google\.com$",
    r"^.+\.local$",
    r"^.+\.internal$",
    # Add your own internal domains below, for example:
    # r"\.mycompany\.com$",
    # r"^git\.mydomain\.com$",
]

# Type C — issue trackers / Q&A; only harvest if topic touches existing wiki
# (matched with `re.match`, i.e. anchored at the start of the full URL).
C_TYPE_URL_PATTERNS = [
    r"^https?://github\.com/[^/]+/[^/]+/issues/\d+",
    r"^https?://github\.com/[^/]+/[^/]+/pull/\d+",
    r"^https?://github\.com/[^/]+/[^/]+/discussions/\d+",
    r"^https?://(www\.)?stackoverflow\.com/questions/\d+",
    r"^https?://(www\.)?serverfault\.com/questions/\d+",
    r"^https?://(www\.)?superuser\.com/questions/\d+",
    r"^https?://.+\.stackexchange\.com/questions/\d+",
]

# Asset/image extensions to filter out (checked against the URL path suffix)
ASSET_EXTENSIONS = {
    ".png", ".jpg", ".jpeg", ".gif", ".svg", ".webp", ".ico", ".bmp",
    ".css", ".js", ".mjs", ".woff", ".woff2", ".ttf", ".eot",
    ".mp4", ".webm", ".mov", ".mp3", ".wav",
    ".zip", ".tar", ".gz", ".bz2",
}

# URL regex — HTTP(S), stops at whitespace, brackets, and common markdown delimiters
URL_REGEX = re.compile(
    r"https?://[^\s<>\"')\]}\\|`]+",
    re.IGNORECASE,
)

# Claude CLI models — cheap model by default, bigger model for large sources
CLAUDE_HAIKU_MODEL = "haiku"
CLAUDE_SONNET_MODEL = "sonnet"
SONNET_CONTENT_THRESHOLD = 20_000  # chars — larger than this → sonnet

# Fetch behavior
FETCH_DELAY_SECONDS = 2  # politeness delay between fetches
MAX_FAILED_ATTEMPTS = 3  # give up on a URL after this many failed fetches
MIN_CONTENT_LENGTH = 100  # extracted content shorter than this is rejected
FETCH_TIMEOUT = 45  # seconds, default per fetch subcommand

# HTML-leak detection — content containing any of these is treated as a failed extraction
HTML_LEAK_MARKERS = ["<div", "<script", "<nav", "<header", "<footer"]
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# State management
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def load_state() -> dict[str, Any]:
    """Load harvest state from disk, backfilling any missing top-level keys.

    Returns a fresh default structure when the state file is absent,
    unreadable, or not valid JSON.
    """
    empty: dict[str, Any] = {
        "harvested_urls": {},
        "skipped_urls": {},
        "failed_urls": {},
        "rejected_urls": {},
        "last_run": None,
    }
    if not HARVEST_STATE_FILE.exists():
        return empty
    try:
        state = json.loads(HARVEST_STATE_FILE.read_text())
    except (OSError, json.JSONDecodeError):
        return empty
    # Older state files may be missing newer buckets — backfill them.
    for key, value in empty.items():
        state.setdefault(key, value)
    return state
|
|
|
|
|
|
def save_state(state: dict[str, Any]) -> None:
    """Atomically persist state, stamping last_run with the current UTC time.

    Writes to a sibling .json.tmp file first and renames it into place so a
    crash mid-write never corrupts the real state file.
    """
    state["last_run"] = datetime.now(timezone.utc).isoformat()
    scratch = HARVEST_STATE_FILE.with_suffix(".json.tmp")
    scratch.write_text(json.dumps(state, indent=2, sort_keys=True))
    scratch.replace(HARVEST_STATE_FILE)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# URL extraction
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def extract_urls_from_file(file_path: Path) -> list[str]:
    """Extract all HTTP(S) URLs from a conversation markdown file.

    Filters:
    - Asset URLs (images, CSS, JS, fonts, media, archives)
    - URLs shorter than 20 characters
    - Duplicates within the same file
    """
    try:
        body = file_path.read_text(errors="replace")
    except OSError:
        return []

    # dict preserves insertion order → ordered de-duplication
    accepted: dict[str, None] = {}

    for hit in URL_REGEX.finditer(body):
        candidate = hit.group(0).rstrip(".,;:!?")  # strip trailing sentence punctuation
        # Peel off markdown/code artifacts left at the tail
        while candidate and candidate[-1] in "()[]{}\"'":
            candidate = candidate[:-1]
        if len(candidate) < 20 or candidate in accepted:
            continue
        try:
            parts = urlparse(candidate)
        except ValueError:
            continue
        if not (parts.scheme and parts.netloc):
            continue
        lowered_path = parts.path.lower()
        if any(lowered_path.endswith(ext) for ext in ASSET_EXTENSIONS):
            continue
        accepted[candidate] = None

    return list(accepted)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# URL classification
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _is_private_ip(host: str) -> bool:
|
|
"""Return True if host is an RFC1918 or loopback IP literal."""
|
|
if not re.match(r"^\d+\.\d+\.\d+\.\d+$", host):
|
|
return False
|
|
parts = [int(p) for p in host.split(".")]
|
|
if parts[0] == 10:
|
|
return True
|
|
if parts[0] == 127:
|
|
return True
|
|
if parts[0] == 172 and 16 <= parts[1] <= 31:
|
|
return True
|
|
if parts[0] == 192 and parts[1] == 168:
|
|
return True
|
|
return False
|
|
|
|
|
|
def classify_url(url: str) -> str:
    """Classify a URL as 'harvest' (A/B), 'check' (C), or 'skip' (D)."""
    try:
        parsed = urlparse(url)
    except ValueError:
        return "skip"

    host = (parsed.hostname or "").lower()

    # D: no usable host, private/loopback address, or a skip-listed domain
    if not host:
        return "skip"
    if _is_private_ip(host):
        return "skip"
    if any(re.search(pattern, host) for pattern in SKIP_DOMAIN_PATTERNS):
        return "skip"

    # C: issue-tracker / Q&A link — caller must confirm wiki coverage first
    if any(re.match(pattern, url) for pattern in C_TYPE_URL_PATTERNS):
        return "check"

    # A/B: everything else is harvestable reference material
    return "harvest"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Filename derivation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def slugify(text: str) -> str:
    """Lowercase text, collapsing non-alphanumeric runs into single hyphens."""
    return re.sub(r"[^a-z0-9]+", "-", text.lower()).strip("-")
|
|
|
|
|
|
def raw_filename_for_url(url: str) -> str:
    """Derive a stable markdown filename for a harvested URL.

    Built from the slugified host (minus a leading "www.") and the slugified
    path; an empty path maps to "index". The path slug is capped at 80 chars.
    """
    parsed = urlparse(url)
    # Fix: removeprefix (not str.replace) so "www." is stripped only at the
    # start of the host — "docs.www.example.com" must keep its inner "www.".
    host = parsed.netloc.lower().removeprefix("www.")
    path = parsed.path.rstrip("/")
    host_slug = slugify(host)
    path_slug = slugify(path) if path else "index"
    # Truncate overly long names
    if len(path_slug) > 80:
        path_slug = path_slug[:80].rstrip("-")
    return f"{host_slug}-{path_slug}.md"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fetch cascade
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def run_fetch_command(cmd: list[str], timeout: int = FETCH_TIMEOUT) -> tuple[bool, str]:
    """Run a fetch command and return (success, output).

    On success the second element is the command's stdout; on failure it is a
    short human-readable reason (stderr, "timeout", or an OS error message).
    """
    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return False, "timeout"
    except FileNotFoundError as exc:
        return False, f"command not found: {exc}"
    except OSError as exc:
        return False, str(exc)

    if proc.returncode == 0:
        return True, proc.stdout
    return False, proc.stderr.strip() or "non-zero exit"
|
|
|
|
|
|
def validate_content(content: str) -> bool:
    """Accept extracted content only if it is long enough and HTML-free."""
    if not content:
        return False
    if len(content.strip()) < MIN_CONTENT_LENGTH:
        return False
    lowered = content.lower()
    # Any leaked structural HTML tag means the extractor failed to clean up
    return not any(marker in lowered for marker in HTML_LEAK_MARKERS)
|
|
|
|
|
|
def fetch_with_trafilatura(url: str) -> tuple[bool, str]:
    """First-choice fetcher: the trafilatura CLI with markdown extraction."""
    ok, out = run_fetch_command(
        ["trafilatura", "-u", url, "--markdown", "--no-comments", "--precision"]
    )
    if not ok:
        return False, out
    if validate_content(out):
        return True, out
    return False, "content validation failed"
|
|
|
|
|
|
def fetch_with_crawl4ai(url: str, stealth: bool = False) -> tuple[bool, str]:
    """Second-choice fetcher: the crawl4ai CLI, optionally in stealth mode."""
    command = ["crwl", url, "-o", "markdown-fit"]
    if stealth:
        # Randomized UA + full-page scan for bot-hostile sites
        command.extend([
            "-b", "headless=true,user_agent_mode=random",
            "-c", "magic=true,scan_full_page=true,page_timeout=20000",
        ])
    else:
        command.extend(["-c", "page_timeout=15000"])

    ok, out = run_fetch_command(command, timeout=90)
    if not ok:
        return False, out
    if validate_content(out):
        return True, out
    return False, "content validation failed"
|
|
|
|
|
|
def fetch_from_conversation(url: str, conversation_file: Path) -> tuple[bool, str]:
    """Fallback: scrape a block of content near where the URL appears in the transcript.

    If the assistant fetched the URL during the session, some portion of the
    content is likely inline in the transcript.
    """
    try:
        transcript = conversation_file.read_text(errors="replace")
    except OSError:
        return False, "cannot read conversation file"

    position = transcript.find(url)
    if position < 0:
        return False, "url not found in conversation"

    # Take up to 2000 chars starting at the URL mention
    excerpt = transcript[position : position + 2000]
    if validate_content(excerpt):
        return True, excerpt
    return False, "snippet failed validation"
|
|
|
|
|
|
def fetch_cascade(url: str, conversation_file: Path) -> tuple[bool, str, str]:
    """Attempt the full fetch cascade. Returns (success, content, method_used)."""
    attempts = (
        ("trafilatura", lambda: fetch_with_trafilatura(url)),
        ("crawl4ai", lambda: fetch_with_crawl4ai(url, stealth=False)),
        ("crawl4ai-stealth", lambda: fetch_with_crawl4ai(url, stealth=True)),
        ("conversation-fallback", lambda: fetch_from_conversation(url, conversation_file)),
    )
    out = ""
    for method, attempt in attempts:
        ok, out = attempt()
        if ok:
            return True, out, method
    # All fetchers failed; `out` holds the last failure reason
    return False, out, "failed"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Raw file storage
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def content_hash(content: str) -> str:
    """Return a 'sha256:<hex>' digest of the UTF-8 encoded content."""
    digest = hashlib.sha256(content.encode("utf-8")).hexdigest()
    return f"sha256:{digest}"
|
|
|
|
|
|
def write_raw_file(
    url: str,
    content: str,
    method: str,
    discovered_in: Path,
) -> Path:
    """Persist fetched content under raw/harvested/ with provenance frontmatter.

    On a filename collision a short URL-hash suffix is appended so distinct
    URLs never overwrite each other. Returns the path written.
    """
    RAW_HARVESTED_DIR.mkdir(parents=True, exist_ok=True)
    destination = RAW_HARVESTED_DIR / raw_filename_for_url(url)
    if destination.exists():
        # Collision: disambiguate with a short hash of the URL
        short = hashlib.sha256(url.encode()).hexdigest()[:8]
        destination = RAW_HARVESTED_DIR / f"{destination.stem}-{short}.md"

    rel_discovered = discovered_in.relative_to(WIKI_DIR)
    header = "\n".join([
        "---",
        f"source_url: {url}",
        f"fetched_date: {datetime.now(timezone.utc).date().isoformat()}",
        f"fetch_method: {method}",
        f"discovered_in: {rel_discovered}",
        f"content_hash: {content_hash(content)}",
        "---",
        "",
    ])
    destination.write_text(header + content.strip() + "\n")
    return destination
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AI compilation via claude -p
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
COMPILE_PROMPT_TEMPLATE = """You are compiling a raw harvested source document into the LLM wiki at {wiki_dir}.
|
|
|
|
The wiki schema and conventions are defined in CLAUDE.md. The wiki has four
|
|
content directories: patterns/ (how), decisions/ (why), environments/ (where),
|
|
concepts/ (what). All pages require YAML frontmatter with title, type,
|
|
confidence, sources, related, last_compiled, last_verified.
|
|
|
|
IMPORTANT: Do NOT include `status`, `origin`, `staged_*`, `target_path`,
|
|
`modifies`, `harvest_source`, or `compilation_notes` fields in your page
|
|
frontmatter — the harvest script injects those automatically.
|
|
|
|
The raw source material is below. Decide what to do with it and emit the
|
|
result as a single JSON object on stdout (nothing else). Valid actions:
|
|
|
|
- "new_page" — create a new wiki page
|
|
- "update_page" — update an existing wiki page (add source, merge content)
|
|
- "both" — create a new page AND update an existing one
|
|
- "skip" — content isn't substantive enough to warrant a wiki page
|
|
|
|
JSON schema:
|
|
|
|
{{
|
|
"action": "new_page" | "update_page" | "both" | "skip",
|
|
"compilation_notes": "1-3 sentences explaining what you did and why",
|
|
"new_page": {{
|
|
"directory": "patterns" | "decisions" | "environments" | "concepts",
|
|
"filename": "kebab-case-name.md",
|
|
"content": "full markdown including frontmatter"
|
|
}},
|
|
"update_page": {{
|
|
"path": "patterns/existing-page.md",
|
|
"content": "full updated markdown including frontmatter"
|
|
}}
|
|
}}
|
|
|
|
Omit "new_page" if not applicable; omit "update_page" if not applicable. If
|
|
action is "skip", omit both. Do NOT include any prose outside the JSON.
|
|
|
|
Wiki index (so you know what pages exist):
|
|
|
|
{wiki_index}
|
|
|
|
Raw harvested source:
|
|
|
|
{raw_content}
|
|
|
|
Conversation context (the working session where this URL was cited):
|
|
|
|
{conversation_context}
|
|
"""
|
|
|
|
|
|
def call_claude_compile(
    raw_path: Path,
    raw_content: str,
    conversation_file: Path,
) -> dict[str, Any] | None:
    """Invoke `claude -p` to compile the raw source into a staging wiki page.

    Returns the parsed JSON decision dict, or None when the CLI is missing,
    times out, exits non-zero, or emits unparseable output.
    """

    # Route large sources to the bigger model
    model = CLAUDE_SONNET_MODEL if len(raw_content) > SONNET_CONTENT_THRESHOLD else CLAUDE_HAIKU_MODEL

    try:
        wiki_index = INDEX_FILE.read_text()[:20_000]
    except OSError:
        wiki_index = ""

    try:
        conversation_context = conversation_file.read_text(errors="replace")[:8_000]
    except OSError:
        conversation_context = ""

    prompt = COMPILE_PROMPT_TEMPLATE.format(
        wiki_dir=str(WIKI_DIR),
        wiki_index=wiki_index,
        raw_content=raw_content[:40_000],
        conversation_context=conversation_context,
    )

    try:
        proc = subprocess.run(
            ["claude", "-p", "--model", model, "--output-format", "text", prompt],
            capture_output=True,
            text=True,
            timeout=600,
        )
    except FileNotFoundError:
        print(" [warn] claude CLI not found — skipping compilation", file=sys.stderr)
        return None
    except subprocess.TimeoutExpired:
        print(" [warn] claude -p timed out", file=sys.stderr)
        return None

    if proc.returncode != 0:
        print(f" [warn] claude -p failed: {proc.stderr.strip()[:200]}", file=sys.stderr)
        return None

    # The model may wrap its JSON in prose or code fences — grab the
    # outermost brace-delimited span.
    stdout_text = proc.stdout.strip()
    found = re.search(r"\{.*\}", stdout_text, re.DOTALL)
    if found is None:
        print(f" [warn] no JSON found in claude output ({len(stdout_text)} chars)", file=sys.stderr)
        return None
    try:
        return json.loads(found.group(0))
    except json.JSONDecodeError as e:
        print(f" [warn] JSON parse failed: {e}", file=sys.stderr)
        return None
|
|
|
|
|
|
# Frontmatter prefix injected at the top of every staged page by
# _inject_staging_frontmatter(). Note: {modifies_line} is either empty or a
# complete "modifies: ...\n" line, so it carries its own newline; every other
# placeholder is a single-line scalar value.
STAGING_INJECT_TEMPLATE = (
    "---\n"
    "origin: automated\n"
    "status: pending\n"
    "staged_date: {staged_date}\n"
    "staged_by: wiki-harvest\n"
    "target_path: {target_path}\n"
    "{modifies_line}"
    "harvest_source: {source_url}\n"
    "compilation_notes: {compilation_notes}\n"
)
|
|
|
|
|
|
def _inject_staging_frontmatter(
    content: str,
    source_url: str,
    target_path: str,
    compilation_notes: str,
    modifies: str | None,
) -> str:
    """Insert staging metadata after the opening --- fence of the AI-generated content.

    Script-owned fields the AI may have emitted (status, origin, staged_*,
    target_path, modifies, harvest_source, compilation_notes) are stripped
    from the frontmatter first so the injected values are authoritative.
    """
    owned_fields = re.compile(
        r"^(status|origin|staged_\w+|target_path|modifies|harvest_source|compilation_notes):.*\n",
        flags=re.MULTILINE,
    )
    # Bug fix: only strip owned fields inside the frontmatter block. The old
    # code ran the MULTILINE sub over the whole document, deleting any BODY
    # line that happened to start with e.g. "status:" or "modifies:".
    if content.startswith("---\n"):
        fence_end = content.find("\n---", 4)
        if fence_end != -1:
            head = content[: fence_end + 1]  # opening fence + fields, incl. trailing \n
            content = owned_fields.sub("", head) + content[fence_end + 1 :]
        else:
            # Unterminated fence — treat everything as frontmatter, as before
            content = owned_fields.sub("", content)

    modifies_line = f"modifies: {modifies}\n" if modifies else ""
    # Collapse multi-line compilation notes to single line for safe YAML
    clean_notes = compilation_notes.replace("\n", " ").replace("\r", " ").strip()
    injection = STAGING_INJECT_TEMPLATE.format(
        staged_date=datetime.now(timezone.utc).date().isoformat(),
        target_path=target_path,
        modifies_line=modifies_line,
        source_url=source_url,
        compilation_notes=clean_notes or "(none provided)",
    )

    if content.startswith("---\n"):
        return injection + content[4:]
    # AI forgot the fence — prepend full frontmatter
    return injection + "---\n" + content
|
|
|
|
|
|
def _unique_staging_path(base: Path) -> Path:
|
|
"""Append a short hash if the target already exists."""
|
|
if not base.exists():
|
|
return base
|
|
suffix = hashlib.sha256(str(base).encode() + str(time.time()).encode()).hexdigest()[:6]
|
|
return base.with_stem(f"{base.stem}-{suffix}")
|
|
|
|
|
|
def apply_compile_result(
    result: dict[str, Any],
    source_url: str,
    raw_path: Path,
) -> list[Path]:
    """Write the AI compilation result into staging/. Returns paths written.

    The "new_page" and "update_page" halves are handled independently so the
    "both" action produces two staging files.
    """
    written: list[Path] = []
    action = result.get("action", "skip")
    if action == "skip":
        return written

    notes = result.get("compilation_notes", "")

    # New page
    new_page = result.get("new_page") or {}
    if action in ("new_page", "both") and new_page.get("filename") and new_page.get("content"):
        directory = new_page.get("directory", "patterns")
        filename = new_page["filename"]
        # Bug fix: use the AI-chosen filename in the target path. Previously a
        # literal placeholder was written here, so every new page landed at
        # the same name and `filename` was never used.
        target_rel = f"{directory}/{filename}"
        dest = _unique_staging_path(STAGING_DIR / target_rel)
        dest.parent.mkdir(parents=True, exist_ok=True)
        content = _inject_staging_frontmatter(
            new_page["content"],
            source_url=source_url,
            target_path=target_rel,
            compilation_notes=notes,
            modifies=None,
        )
        dest.write_text(content)
        written.append(dest)

    # Update to existing page
    update_page = result.get("update_page") or {}
    if action in ("update_page", "both") and update_page.get("path") and update_page.get("content"):
        target_rel = update_page["path"]
        dest = _unique_staging_path(STAGING_DIR / target_rel)
        dest.parent.mkdir(parents=True, exist_ok=True)
        content = _inject_staging_frontmatter(
            update_page["content"],
            source_url=source_url,
            target_path=target_rel,
            compilation_notes=notes,
            modifies=target_rel,  # marks this staged file as editing an existing page
        )
        dest.write_text(content)
        written.append(dest)

    return written
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Wiki topic coverage check (for C-type URLs)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def wiki_covers_topic(url: str) -> bool:
    """Quick heuristic: check if any wiki page mentions terms from the URL path.

    Used for C-type URLs (GitHub issues, SO questions) — only harvest if the
    wiki already covers the topic. Returns False when `qmd` is unavailable.
    """
    try:
        parsed = urlparse(url)
    except ValueError:
        return False

    # Candidate keywords: path words of 4+ characters
    keywords = [word for word in re.split(r"[/\-_]+", parsed.path.lower()) if len(word) >= 4]
    if not keywords:
        return False

    query = " ".join(keywords[:5])
    try:
        proc = subprocess.run(
            ["qmd", "search", query, "--json", "-n", "3"],
            capture_output=True,
            text=True,
            timeout=30,
        )
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return False

    if proc.returncode != 0 or not proc.stdout.strip():
        return False
    try:
        payload = json.loads(proc.stdout)
    except json.JSONDecodeError:
        return False
    hits = payload.get("results") if isinstance(payload, dict) else payload
    return bool(hits)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Conversation discovery
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def parse_frontmatter(file_path: Path) -> dict[str, str]:
    """Parse simple `key: value` YAML frontmatter from a markdown file.

    Returns {} when the file is unreadable, has no opening fence, or the
    closing fence is missing. Values are returned as raw stripped strings.
    """
    try:
        text = file_path.read_text(errors="replace")
    except OSError:
        return {}
    if not text.startswith("---\n"):
        return {}
    close = text.find("\n---\n", 4)
    if close == -1:
        return {}

    result: dict[str, str] = {}
    for raw_line in text[4:close].splitlines():
        key, sep, value = raw_line.partition(":")
        if sep:
            result[key.strip()] = value.strip()
    return result
|
|
|
|
|
|
def discover_summarized_conversations(
    project_filter: str | None = None,
    file_filter: str | None = None,
) -> list[Path]:
    """Find conversation files whose frontmatter says `status: summarized`.

    ``file_filter`` (absolute or wiki-relative path) short-circuits discovery
    to that single file; ``project_filter`` restricts the scan to one project
    directory under conversations/.
    """
    if file_filter:
        path = Path(file_filter)
        if not path.is_absolute():
            path = WIKI_DIR / path
        return [path] if path.exists() else []

    # Robustness fix: a fresh wiki may not have a conversations/ directory
    # yet — iterdir() would raise FileNotFoundError.
    if not CONVERSATIONS_DIR.is_dir():
        return []

    files: list[Path] = []
    for project_dir in sorted(CONVERSATIONS_DIR.iterdir()):
        if not project_dir.is_dir():
            continue
        if project_filter and project_dir.name != project_filter:
            continue
        for md in sorted(project_dir.glob("*.md")):
            fm = parse_frontmatter(md)
            if fm.get("status") == "summarized":
                files.append(md)
    return files
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main pipeline
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def process_url(
    url: str,
    conversation_file: Path,
    state: dict[str, Any],
    dry_run: bool,
    compile_enabled: bool,
) -> str:
    """Process a single URL. Returns a short status tag for logging.

    Pipeline per URL: dedupe against state → classify → (unless dry_run)
    fetch → store raw file → optionally compile via `claude -p`. Mutates
    ``state`` in place; tags starting with "dup-" mean nothing new happened,
    and main() skips the per-URL state save for those.
    """

    rel_conv = str(conversation_file.relative_to(WIKI_DIR))
    today = datetime.now(timezone.utc).date().isoformat()

    # Already harvested? Still record that this conversation also cited it.
    if url in state["harvested_urls"]:
        entry = state["harvested_urls"][url]
        if rel_conv not in entry.get("seen_in", []):
            entry.setdefault("seen_in", []).append(rel_conv)
        return "dup-harvested"

    # Already rejected by AI?
    if url in state["rejected_urls"]:
        return "dup-rejected"

    # Previously skipped?
    if url in state["skipped_urls"]:
        return "dup-skipped"

    # Previously failed too many times? (Below the cap it gets retried.)
    if url in state["failed_urls"]:
        if state["failed_urls"][url].get("attempts", 0) >= MAX_FAILED_ATTEMPTS:
            return "dup-failed"

    # Classify: 'skip' (D), 'check' (C — needs wiki coverage), or 'harvest'
    classification = classify_url(url)
    if classification == "skip":
        state["skipped_urls"][url] = {
            "reason": "domain-skip-list",
            "first_seen": today,
        }
        return "skip-domain"

    if classification == "check":
        # C-type (issue tracker / Q&A): only worth harvesting if the wiki
        # already has pages touching this topic.
        if not wiki_covers_topic(url):
            state["skipped_urls"][url] = {
                "reason": "c-type-no-wiki-match",
                "first_seen": today,
            }
            return "skip-c-type"

    if dry_run:
        # Dry run stops before any network or state-changing work
        return f"would-harvest ({classification})"

    # Fetch via the cascade (trafilatura → crawl4ai → stealth → transcript)
    print(f" [fetch] {url}")
    ok, content, method = fetch_cascade(url, conversation_file)
    time.sleep(FETCH_DELAY_SECONDS)

    if not ok:
        # Track attempts so MAX_FAILED_ATTEMPTS can retire the URL later
        entry = state["failed_urls"].setdefault(url, {
            "first_seen": today,
            "attempts": 0,
        })
        entry["attempts"] += 1
        entry["last_attempt"] = today
        entry["reason"] = content[:200] if content else "unknown"
        return f"fetch-failed ({method})"

    # Save raw file with provenance frontmatter
    raw_path = write_raw_file(url, content, method, conversation_file)
    rel_raw = str(raw_path.relative_to(WIKI_DIR))

    state["harvested_urls"][url] = {
        "first_seen": today,
        "seen_in": [rel_conv],
        "raw_file": rel_raw,
        "wiki_pages": [],
        "status": "raw",
        "fetch_method": method,
        "last_checked": today,
    }

    # Compile via claude -p (skipped with --no-compile)
    if compile_enabled:
        print(f" [compile] {rel_raw}")
        result = call_claude_compile(raw_path, content, conversation_file)
        if result is None:
            # Raw file is kept; a later run can retry compilation
            state["harvested_urls"][url]["status"] = "raw-compile-failed"
            return f"raw-saved ({method}) compile-failed"

        action = result.get("action", "skip")
        if action == "skip":
            # AI judged the content not wiki-worthy
            state["rejected_urls"][url] = {
                "reason": result.get("compilation_notes", "AI rejected"),
                "rejected_date": today,
            }
            # Remove from harvested; keep raw file for audit
            state["harvested_urls"].pop(url, None)
            return f"rejected ({method})"

        written = apply_compile_result(result, url, raw_path)
        state["harvested_urls"][url]["status"] = "compiled"
        state["harvested_urls"][url]["wiki_pages"] = [
            str(p.relative_to(WIKI_DIR)) for p in written
        ]
        return f"compiled ({method}) → {len(written)} staging file(s)"

    return f"raw-saved ({method})"
|
|
|
|
|
|
def main() -> int:
    """CLI entry point: scan summarized conversations and harvest their URLs.

    Returns a process exit code (always 0; individual URL failures are
    recorded in state and reported in the summary rather than failing the run).
    """
    # First paragraph of the module docstring doubles as the CLI description
    parser = argparse.ArgumentParser(description=__doc__.split("\n\n")[0])
    parser.add_argument("--project", help="Only process this project (wing) directory")
    parser.add_argument("--file", help="Only process this conversation file")
    parser.add_argument("--dry-run", action="store_true", help="Classify and report without fetching")
    parser.add_argument("--no-compile", action="store_true", help="Fetch raw only; skip claude -p compile")
    parser.add_argument("--limit", type=int, default=0, help="Stop after N new URLs processed (0 = no limit)")
    args = parser.parse_args()

    files = discover_summarized_conversations(args.project, args.file)
    print(f"Scanning {len(files)} summarized conversation(s) for URLs...")

    state = load_state()
    stats: dict[str, int] = {}  # status tag → count, for the final summary
    processed_new = 0  # counts non-duplicate URLs; drives --limit

    for file_path in files:
        urls = extract_urls_from_file(file_path)
        if not urls:
            continue
        rel = file_path.relative_to(WIKI_DIR)
        print(f"\n[{rel}] {len(urls)} URL(s)")

        for url in urls:
            status = process_url(
                url,
                file_path,
                state,
                dry_run=args.dry_run,
                compile_enabled=not args.no_compile,
            )
            stats[status] = stats.get(status, 0) + 1
            print(f" [{status}] {url}")

            # Persist state after each non-dry URL
            # (so an interrupted run loses at most one URL's worth of work)
            if not args.dry_run and not status.startswith("dup-"):
                processed_new += 1
                save_state(state)

            if args.limit and processed_new >= args.limit:
                # NOTE: in --dry-run, processed_new never increments, so
                # --limit only bounds runs that actually fetch.
                print(f"\nLimit reached ({args.limit}); stopping.")
                save_state(state)
                _print_summary(stats)
                return 0

    # Final save catches the last_run timestamp even when every URL was a dup
    if not args.dry_run:
        save_state(state)

    _print_summary(stats)
    return 0
|
|
|
|
|
|
def _print_summary(stats: dict[str, int]) -> None:
|
|
print("\nSummary:")
|
|
for status, count in sorted(stats.items()):
|
|
print(f" {status}: {count}")
|
|
|
|
|
|
# Script entry point — the process exit code is main()'s return value.
if __name__ == "__main__":
    sys.exit(main())
|