Files
memex/scripts/wiki-harvest.py
Eric Turner ee54a2f5d4 Initial commit — memex
A compounding LLM-maintained knowledge wiki.

Synthesis of Andrej Karpathy's persistent-wiki gist and milla-jovovich's
mempalace, with an automation layer on top for conversation mining, URL
harvesting, human-in-the-loop staging, staleness decay, and hygiene.

Includes:
- 11 pipeline scripts (extract, summarize, index, harvest, stage,
  hygiene, maintain, sync, + shared library)
- Full docs: README, SETUP, ARCHITECTURE, DESIGN-RATIONALE, CUSTOMIZE
- Example CLAUDE.md files (wiki schema + global instructions) tuned for
  the three-collection qmd setup
- 171-test pytest suite (cross-platform, runs in ~1.3s)
- MIT licensed
2026-04-12 21:16:02 -06:00

879 lines
28 KiB
Python
Executable File

#!/usr/bin/env python3
"""Harvest external reference URLs from summarized conversations into the wiki.
Scans summarized conversation transcripts for URLs, classifies them, fetches
the content, stores the raw source under raw/harvested/, and optionally calls
`claude -p` to compile each raw file into a staging/ wiki page.
Usage:
python3 scripts/wiki-harvest.py # Process all summarized conversations
python3 scripts/wiki-harvest.py --project mc # One project only
python3 scripts/wiki-harvest.py --file PATH # One conversation file
python3 scripts/wiki-harvest.py --dry-run # Show what would be harvested
python3 scripts/wiki-harvest.py --no-compile # Fetch only, skip claude -p compile step
python3 scripts/wiki-harvest.py --limit 10 # Cap number of URLs processed
State is persisted in .harvest-state.json; existing URLs are deduplicated.
"""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import re
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
# Force line-buffered output so progress logs stream correctly into pipes
sys.stdout.reconfigure(line_buffering=True)
sys.stderr.reconfigure(line_buffering=True)
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# All wiki paths hang off WIKI_DIR, overridable via the WIKI_DIR env var.
WIKI_DIR = Path(os.environ.get("WIKI_DIR", str(Path.home() / "projects" / "wiki")))
CONVERSATIONS_DIR = WIKI_DIR / "conversations"
RAW_HARVESTED_DIR = WIKI_DIR / "raw" / "harvested"
STAGING_DIR = WIKI_DIR / "staging"
INDEX_FILE = WIKI_DIR / "index.md"
CLAUDE_MD = WIKI_DIR / "CLAUDE.md"
HARVEST_STATE_FILE = WIKI_DIR / ".harvest-state.json"
# ════════════════════════════════════════════════════════════════════════════
# CONFIGURE ME — URL classification rules
# ════════════════════════════════════════════════════════════════════════════
#
# Type D: always skip. Add your own internal/ephemeral/personal domains here.
# Patterns use `re.search` so unanchored suffixes like `\.example\.com$` work.
# Private IPs (10.x, 172.16-31.x, 192.168.x, 127.x) are detected separately.
SKIP_DOMAIN_PATTERNS = [
    # Generic: ephemeral / personal / chat / internal
    r"\.atlassian\.net$",
    r"^app\.asana\.com$",
    r"^(www\.)?slack\.com$",
    r"\.slack\.com$",
    r"^(www\.)?discord\.com$",
    r"^localhost$",
    r"^0\.0\.0\.0$",
    r"^mail\.google\.com$",
    r"^calendar\.google\.com$",
    r"^docs\.google\.com$",
    r"^drive\.google\.com$",
    r"^.+\.local$",
    r"^.+\.internal$",
    # Add your own internal domains below, for example:
    # r"\.mycompany\.com$",
    # r"^git\.mydomain\.com$",
]
# Type C — issue trackers / Q&A; only harvest if topic touches existing wiki
# (matched with `re.match` against the full URL, not just the host)
C_TYPE_URL_PATTERNS = [
    r"^https?://github\.com/[^/]+/[^/]+/issues/\d+",
    r"^https?://github\.com/[^/]+/[^/]+/pull/\d+",
    r"^https?://github\.com/[^/]+/[^/]+/discussions/\d+",
    r"^https?://(www\.)?stackoverflow\.com/questions/\d+",
    r"^https?://(www\.)?serverfault\.com/questions/\d+",
    r"^https?://(www\.)?superuser\.com/questions/\d+",
    r"^https?://.+\.stackexchange\.com/questions/\d+",
]
# Asset/image extensions to filter out (matched against the URL path suffix)
ASSET_EXTENSIONS = {
    ".png", ".jpg", ".jpeg", ".gif", ".svg", ".webp", ".ico", ".bmp",
    ".css", ".js", ".mjs", ".woff", ".woff2", ".ttf", ".eot",
    ".mp4", ".webm", ".mov", ".mp3", ".wav",
    ".zip", ".tar", ".gz", ".bz2",
}
# URL regex — HTTP(S), stops at whitespace, brackets, and common markdown delimiters
URL_REGEX = re.compile(
    r"https?://[^\s<>\"')\]}\\|`]+",
    re.IGNORECASE,
)
# Claude CLI models
CLAUDE_HAIKU_MODEL = "haiku"
CLAUDE_SONNET_MODEL = "sonnet"
SONNET_CONTENT_THRESHOLD = 20_000 # chars — larger than this → sonnet
# Fetch behavior
FETCH_DELAY_SECONDS = 2    # politeness delay between fetches
MAX_FAILED_ATTEMPTS = 3    # give up on a URL after this many failed fetches
MIN_CONTENT_LENGTH = 100   # chars — shorter extractions are treated as failures
FETCH_TIMEOUT = 45         # seconds, default per fetch subprocess
# HTML-leak detection — content containing any of these is treated as a failed extraction
HTML_LEAK_MARKERS = ["<div", "<script", "<nav", "<header", "<footer"]
# ---------------------------------------------------------------------------
# State management
# ---------------------------------------------------------------------------
def load_state() -> dict[str, Any]:
    """Load the harvest state file, filling in any missing top-level keys.

    Returns a fresh default state when the file is absent, unreadable,
    or not valid JSON.
    """
    base: dict[str, Any] = {
        "harvested_urls": {},
        "skipped_urls": {},
        "failed_urls": {},
        "rejected_urls": {},
        "last_run": None,
    }
    if not HARVEST_STATE_FILE.exists():
        return base
    try:
        with open(HARVEST_STATE_FILE) as handle:
            loaded = json.load(handle)
    except (OSError, json.JSONDecodeError):
        # Corrupt or unreadable state: start over rather than crash
        return base
    for key, default in base.items():
        loaded.setdefault(key, default)
    return loaded
def save_state(state: dict[str, Any]) -> None:
    """Persist harvest state atomically, stamping last_run with the current UTC time.

    Writes to a temp file first, then renames over the real state file so a
    crash mid-write cannot leave a truncated JSON file behind.
    """
    state["last_run"] = datetime.now(timezone.utc).isoformat()
    tmp_path = HARVEST_STATE_FILE.with_suffix(".json.tmp")
    with open(tmp_path, "w") as handle:
        json.dump(state, handle, indent=2, sort_keys=True)
    tmp_path.replace(HARVEST_STATE_FILE)
# ---------------------------------------------------------------------------
# URL extraction
# ---------------------------------------------------------------------------
def extract_urls_from_file(file_path: Path) -> list[str]:
    """Extract all HTTP(S) URLs from a conversation markdown file.

    Filters:
    - Asset URLs (images, CSS, JS, fonts, media, archives)
    - URLs shorter than 20 characters
    - Duplicates within the same file (first occurrence order preserved)
    """
    try:
        text = file_path.read_text(errors="replace")
    except OSError:
        return []
    found: list[str] = []
    unique: set[str] = set()
    for m in URL_REGEX.finditer(text):
        candidate = m.group(0).rstrip(".,;:!?")  # trailing sentence punctuation
        # Peel off markdown/code delimiters stuck to the tail
        # (NOTE: this also strips a legitimate closing paren in the URL itself)
        while candidate and candidate[-1] in "()[]{}\"'":
            candidate = candidate[:-1]
        if len(candidate) < 20:
            continue
        try:
            parts = urlparse(candidate)
        except ValueError:
            continue
        if not (parts.scheme and parts.netloc):
            continue
        lowered_path = parts.path.lower()
        if lowered_path.endswith(tuple(ASSET_EXTENSIONS)):
            continue
        if candidate not in unique:
            unique.add(candidate)
            found.append(candidate)
    return found
# ---------------------------------------------------------------------------
# URL classification
# ---------------------------------------------------------------------------
def _is_private_ip(host: str) -> bool:
"""Return True if host is an RFC1918 or loopback IP literal."""
if not re.match(r"^\d+\.\d+\.\d+\.\d+$", host):
return False
parts = [int(p) for p in host.split(".")]
if parts[0] == 10:
return True
if parts[0] == 127:
return True
if parts[0] == 172 and 16 <= parts[1] <= 31:
return True
if parts[0] == 192 and parts[1] == 168:
return True
return False
def classify_url(url: str) -> str:
    """Classify a URL as 'harvest' (A/B), 'check' (C), or 'skip' (D).

    Skips: unparseable URLs, empty hosts, private/loopback IPs, and hosts on
    the skip list. 'check' means a C-type tracker/Q&A URL that needs a wiki
    coverage test before harvesting.
    """
    try:
        parsed = urlparse(url)
    except ValueError:
        return "skip"
    host = (parsed.hostname or "").lower()
    if not host or _is_private_ip(host):
        return "skip"
    if any(re.search(pattern, host) for pattern in SKIP_DOMAIN_PATTERNS):
        return "skip"
    if any(re.match(pattern, url) for pattern in C_TYPE_URL_PATTERNS):
        return "check"
    return "harvest"
# ---------------------------------------------------------------------------
# Filename derivation
# ---------------------------------------------------------------------------
def slugify(text: str) -> str:
    """Lowercase *text*, collapse non-alphanumeric runs to hyphens, trim edges."""
    hyphenated = re.sub(r"[^a-z0-9]+", "-", text.lower())
    return hyphenated.strip("-")
def raw_filename_for_url(url: str) -> str:
    """Derive a deterministic markdown filename for *url*.

    The name is ``<host-slug>-<path-slug>.md``; a bare-root URL uses
    ``index`` as the path slug, and overlong path slugs are truncated.
    """
    parsed = urlparse(url)
    # BUG FIX: removeprefix, not str.replace — replace() would delete an
    # interior "www." substring too (e.g. "mywww.example.com" → "myexample.com")
    host = parsed.netloc.lower().removeprefix("www.")
    path = parsed.path.rstrip("/")
    host_slug = slugify(host)
    path_slug = slugify(path) if path else "index"
    # Keep filenames filesystem-friendly: cap the path component at 80 chars
    if len(path_slug) > 80:
        path_slug = path_slug[:80].rstrip("-")
    return f"{host_slug}-{path_slug}.md"
# ---------------------------------------------------------------------------
# Fetch cascade
# ---------------------------------------------------------------------------
def run_fetch_command(cmd: list[str], timeout: int = FETCH_TIMEOUT) -> tuple[bool, str]:
    """Run a fetch command and return (success, output).

    On success the output is stdout; on failure it is a short error reason
    (stderr, "timeout", "non-zero exit", or the OS error text).
    """
    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return False, "timeout"
    except FileNotFoundError as exc:  # must precede OSError (subclass)
        return False, f"command not found: {exc}"
    except OSError as exc:
        return False, str(exc)
    if proc.returncode == 0:
        return True, proc.stdout
    return False, proc.stderr.strip() or "non-zero exit"
def validate_content(content: str) -> bool:
    """Heuristically decide whether fetched content is usable extracted text."""
    if not content:
        return False
    if len(content.strip()) < MIN_CONTENT_LENGTH:
        return False
    lowered = content.lower()
    # Raw HTML tags leaking through mean the extractor failed upstream
    return not any(marker in lowered for marker in HTML_LEAK_MARKERS)
def fetch_with_trafilatura(url: str) -> tuple[bool, str]:
    """Fetch *url* via the trafilatura CLI. Returns (success, content-or-reason)."""
    ok, output = run_fetch_command(
        ["trafilatura", "-u", url, "--markdown", "--no-comments", "--precision"]
    )
    if not ok:
        return False, output
    if validate_content(output):
        return True, output
    return False, "content validation failed"
def fetch_with_crawl4ai(url: str, stealth: bool = False) -> tuple[bool, str]:
    """Fetch *url* via the crawl4ai `crwl` CLI, optionally with stealth browser flags."""
    command = ["crwl", url, "-o", "markdown-fit"]
    if stealth:
        # Randomized UA + full-page scan for bot-hostile sites
        command.extend([
            "-b", "headless=true,user_agent_mode=random",
            "-c", "magic=true,scan_full_page=true,page_timeout=20000",
        ])
    else:
        command.extend(["-c", "page_timeout=15000"])
    ok, output = run_fetch_command(command, timeout=90)
    if not ok:
        return False, output
    if validate_content(output):
        return True, output
    return False, "content validation failed"
def fetch_from_conversation(url: str, conversation_file: Path) -> tuple[bool, str]:
    """Fallback: scrape a block of content near where the URL appears in the transcript.

    If the assistant fetched the URL during the session, some portion of the
    content is likely inline in the transcript.
    """
    try:
        transcript = conversation_file.read_text(errors="replace")
    except OSError:
        return False, "cannot read conversation file"
    position = transcript.find(url)
    if position < 0:
        return False, "url not found in conversation"
    # Take up to 2000 chars starting at the URL mention
    excerpt = transcript[position : position + 2000]
    if validate_content(excerpt):
        return True, excerpt
    return False, "snippet failed validation"
def fetch_cascade(url: str, conversation_file: Path) -> tuple[bool, str, str]:
    """Attempt the full fetch cascade. Returns (success, content, method_used).

    Tries trafilatura, then crawl4ai (normal, then stealth), then falls back
    to scraping the conversation transcript. On total failure the content
    field carries the last attempt's error reason.
    """
    attempts = (
        ("trafilatura", lambda: fetch_with_trafilatura(url)),
        ("crawl4ai", lambda: fetch_with_crawl4ai(url, stealth=False)),
        ("crawl4ai-stealth", lambda: fetch_with_crawl4ai(url, stealth=True)),
        ("conversation-fallback", lambda: fetch_from_conversation(url, conversation_file)),
    )
    output = ""
    for method, fetcher in attempts:
        ok, output = fetcher()
        if ok:
            return True, output, method
    return False, output, "failed"
# ---------------------------------------------------------------------------
# Raw file storage
# ---------------------------------------------------------------------------
def content_hash(content: str) -> str:
    """Return the SHA-256 digest of *content*, prefixed with its algorithm name."""
    digest = hashlib.sha256(content.encode("utf-8"))
    return f"sha256:{digest.hexdigest()}"
def write_raw_file(
    url: str,
    content: str,
    method: str,
    discovered_in: Path,
) -> Path:
    """Store fetched content under raw/harvested/ with provenance frontmatter.

    Returns the path written. A filename collision (a different URL mapping
    to the same slug) gets a short URL-hash suffix instead of overwriting.
    """
    RAW_HARVESTED_DIR.mkdir(parents=True, exist_ok=True)
    out_path = RAW_HARVESTED_DIR / raw_filename_for_url(url)
    if out_path.exists():
        tag = hashlib.sha256(url.encode()).hexdigest()[:8]
        out_path = RAW_HARVESTED_DIR / f"{out_path.stem}-{tag}.md"
    rel_discovered = discovered_in.relative_to(WIKI_DIR)
    header_lines = [
        "---",
        f"source_url: {url}",
        f"fetched_date: {datetime.now(timezone.utc).date().isoformat()}",
        f"fetch_method: {method}",
        f"discovered_in: {rel_discovered}",
        f"content_hash: {content_hash(content)}",
        "---",
        "",
    ]
    out_path.write_text("\n".join(header_lines) + content.strip() + "\n")
    return out_path
# ---------------------------------------------------------------------------
# AI compilation via claude -p
# ---------------------------------------------------------------------------
# Prompt for `claude -p`; placeholders ({wiki_dir}, {wiki_index}, {raw_content},
# {conversation_context}) are filled in by call_claude_compile(). Literal JSON
# braces are doubled ({{ }}) to survive str.format().
COMPILE_PROMPT_TEMPLATE = """You are compiling a raw harvested source document into the LLM wiki at {wiki_dir}.
The wiki schema and conventions are defined in CLAUDE.md. The wiki has four
content directories: patterns/ (how), decisions/ (why), environments/ (where),
concepts/ (what). All pages require YAML frontmatter with title, type,
confidence, sources, related, last_compiled, last_verified.
IMPORTANT: Do NOT include `status`, `origin`, `staged_*`, `target_path`,
`modifies`, `harvest_source`, or `compilation_notes` fields in your page
frontmatter — the harvest script injects those automatically.
The raw source material is below. Decide what to do with it and emit the
result as a single JSON object on stdout (nothing else). Valid actions:
- "new_page" — create a new wiki page
- "update_page" — update an existing wiki page (add source, merge content)
- "both" — create a new page AND update an existing one
- "skip" — content isn't substantive enough to warrant a wiki page
JSON schema:
{{
"action": "new_page" | "update_page" | "both" | "skip",
"compilation_notes": "1-3 sentences explaining what you did and why",
"new_page": {{
"directory": "patterns" | "decisions" | "environments" | "concepts",
"filename": "kebab-case-name.md",
"content": "full markdown including frontmatter"
}},
"update_page": {{
"path": "patterns/existing-page.md",
"content": "full updated markdown including frontmatter"
}}
}}
Omit "new_page" if not applicable; omit "update_page" if not applicable. If
action is "skip", omit both. Do NOT include any prose outside the JSON.
Wiki index (so you know what pages exist):
{wiki_index}
Raw harvested source:
{raw_content}
Conversation context (the working session where this URL was cited):
{conversation_context}
"""
def call_claude_compile(
    raw_path: Path,
    raw_content: str,
    conversation_file: Path,
) -> dict[str, Any] | None:
    """Invoke `claude -p` to compile the raw source into a staging wiki page.

    Returns the parsed JSON result dict, or None when the CLI is missing,
    times out, exits non-zero, or emits unparseable output.
    """
    # Larger sources get the stronger model
    if len(raw_content) > SONNET_CONTENT_THRESHOLD:
        model = CLAUDE_SONNET_MODEL
    else:
        model = CLAUDE_HAIKU_MODEL
    try:
        wiki_index = INDEX_FILE.read_text()[:20_000]
    except OSError:
        wiki_index = ""
    try:
        conversation_context = conversation_file.read_text(errors="replace")[:8_000]
    except OSError:
        conversation_context = ""
    prompt = COMPILE_PROMPT_TEMPLATE.format(
        wiki_dir=str(WIKI_DIR),
        wiki_index=wiki_index,
        raw_content=raw_content[:40_000],
        conversation_context=conversation_context,
    )
    try:
        proc = subprocess.run(
            ["claude", "-p", "--model", model, "--output-format", "text", prompt],
            capture_output=True,
            text=True,
            timeout=600,
        )
    except FileNotFoundError:
        print(" [warn] claude CLI not found — skipping compilation", file=sys.stderr)
        return None
    except subprocess.TimeoutExpired:
        print(" [warn] claude -p timed out", file=sys.stderr)
        return None
    if proc.returncode != 0:
        print(f" [warn] claude -p failed: {proc.stderr.strip()[:200]}", file=sys.stderr)
        return None
    # The JSON may arrive wrapped in prose/fences — grab the outermost braces
    body = proc.stdout.strip()
    found = re.search(r"\{.*\}", body, re.DOTALL)
    if found is None:
        print(f" [warn] no JSON found in claude output ({len(body)} chars)", file=sys.stderr)
        return None
    try:
        return json.loads(found.group(0))
    except json.JSONDecodeError as exc:
        print(f" [warn] JSON parse failed: {exc}", file=sys.stderr)
        return None
# Frontmatter header injected by _inject_staging_frontmatter(); opens a new
# "---" fence (the AI page's own opening fence is stripped/replaced).
# {modifies_line} is either empty or a full "modifies: ...\n" line.
STAGING_INJECT_TEMPLATE = (
    "---\n"
    "origin: automated\n"
    "status: pending\n"
    "staged_date: {staged_date}\n"
    "staged_by: wiki-harvest\n"
    "target_path: {target_path}\n"
    "{modifies_line}"
    "harvest_source: {source_url}\n"
    "compilation_notes: {compilation_notes}\n"
)
def _inject_staging_frontmatter(
    content: str,
    source_url: str,
    target_path: str,
    compilation_notes: str,
    modifies: str | None,
) -> str:
    """Insert staging metadata after the opening --- fence of the AI-generated content."""
    # Drop any status/origin/staged fields the model added despite instructions.
    # NOTE(review): MULTILINE matches anywhere in the document, so a body line
    # that happens to start with e.g. "status:" is also removed — confirm intended.
    content = re.sub(
        r"^(status|origin|staged_\w+|target_path|modifies|harvest_source|compilation_notes):.*\n",
        "",
        content,
        flags=re.MULTILINE,
    )
    # Compilation notes must collapse to one line to stay valid YAML
    flat_notes = compilation_notes.replace("\n", " ").replace("\r", " ").strip()
    header = STAGING_INJECT_TEMPLATE.format(
        staged_date=datetime.now(timezone.utc).date().isoformat(),
        target_path=target_path,
        modifies_line=(f"modifies: {modifies}\n" if modifies else ""),
        source_url=source_url,
        compilation_notes=flat_notes or "(none provided)",
    )
    if content.startswith("---\n"):
        # Replace the model's opening fence with our metadata header
        return header + content[4:]
    # Model omitted the fence entirely — synthesize the full frontmatter
    return header + "---\n" + content
def _unique_staging_path(base: Path) -> Path:
"""Append a short hash if the target already exists."""
if not base.exists():
return base
suffix = hashlib.sha256(str(base).encode() + str(time.time()).encode()).hexdigest()[:6]
return base.with_stem(f"{base.stem}-{suffix}")
def apply_compile_result(
    result: dict[str, Any],
    source_url: str,
    raw_path: Path,
) -> list[Path]:
    """Write the AI compilation result into staging/. Returns paths written.

    Handles "new_page", "update_page", and "both"; a "skip" (or malformed)
    result writes nothing. Each staged file gets injected staging frontmatter
    recording provenance and the intended wiki target path.
    """
    written: list[Path] = []
    action = result.get("action", "skip")
    if action == "skip":
        return written
    notes = result.get("compilation_notes", "")
    # New page
    new_page = result.get("new_page") or {}
    if action in ("new_page", "both") and new_page.get("filename") and new_page.get("content"):
        directory = new_page.get("directory", "patterns")
        filename = new_page["filename"]
        # BUG FIX: use the AI-chosen filename; previously a literal placeholder
        # string was written, so `filename` was assigned but never used and
        # every new page in a directory collided on the same name.
        target_rel = f"{directory}/{filename}"
        dest = _unique_staging_path(STAGING_DIR / target_rel)
        dest.parent.mkdir(parents=True, exist_ok=True)
        content = _inject_staging_frontmatter(
            new_page["content"],
            source_url=source_url,
            target_path=target_rel,
            compilation_notes=notes,
            modifies=None,
        )
        dest.write_text(content)
        written.append(dest)
    # Update to existing page
    update_page = result.get("update_page") or {}
    if action in ("update_page", "both") and update_page.get("path") and update_page.get("content"):
        target_rel = update_page["path"]
        dest = _unique_staging_path(STAGING_DIR / target_rel)
        dest.parent.mkdir(parents=True, exist_ok=True)
        content = _inject_staging_frontmatter(
            update_page["content"],
            source_url=source_url,
            target_path=target_rel,
            compilation_notes=notes,
            modifies=target_rel,  # marks this staged file as modifying an existing page
        )
        dest.write_text(content)
        written.append(dest)
    return written
# ---------------------------------------------------------------------------
# Wiki topic coverage check (for C-type URLs)
# ---------------------------------------------------------------------------
def wiki_covers_topic(url: str) -> bool:
    """Quick heuristic: check if any wiki page mentions terms from the URL path.

    Used for C-type URLs (GitHub issues, SO questions) — only harvest if the
    wiki already covers the topic. Returns False whenever the qmd search tool
    is unavailable, fails, or finds no hits.
    """
    try:
        parsed = urlparse(url)
    except ValueError:
        return False
    # Candidate keywords: path segments of 4+ chars
    terms = [part for part in re.split(r"[/\-_]+", parsed.path.lower()) if len(part) >= 4]
    if not terms:
        return False
    query = " ".join(terms[:5])
    try:
        proc = subprocess.run(
            ["qmd", "search", query, "--json", "-n", "3"],
            capture_output=True,
            text=True,
            timeout=30,
        )
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return False
    if proc.returncode != 0 or not proc.stdout.strip():
        return False
    try:
        payload = json.loads(proc.stdout)
    except json.JSONDecodeError:
        return False
    hits = payload.get("results") if isinstance(payload, dict) else payload
    return bool(hits)
# ---------------------------------------------------------------------------
# Conversation discovery
# ---------------------------------------------------------------------------
def parse_frontmatter(file_path: Path) -> dict[str, str]:
    """Parse the leading YAML-ish frontmatter of *file_path* into a flat dict.

    Only simple ``key: value`` lines are recognized. Returns {} when the file
    is unreadable or lacks a properly fenced ("---" ... "---") header.
    """
    result: dict[str, str] = {}
    try:
        text = file_path.read_text(errors="replace")
    except OSError:
        return result
    if not text.startswith("---\n"):
        return result
    close = text.find("\n---\n", 4)
    if close == -1:
        return result
    for raw_line in text[4:close].splitlines():
        if ":" not in raw_line:
            continue
        key, _, value = raw_line.partition(":")
        result[key.strip()] = value.strip()
    return result
def discover_summarized_conversations(
    project_filter: str | None = None,
    file_filter: str | None = None,
) -> list[Path]:
    """Find conversation files whose frontmatter status is "summarized".

    ``file_filter`` (absolute or WIKI_DIR-relative) short-circuits discovery
    to that single file; ``project_filter`` restricts discovery to one
    project directory. Results are sorted (by directory, then filename).
    """
    if file_filter:
        path = Path(file_filter)
        if not path.is_absolute():
            path = WIKI_DIR / path
        return [path] if path.exists() else []
    # Robustness fix: a fresh wiki may not have a conversations/ directory
    # yet — iterdir() on a missing path raises FileNotFoundError.
    if not CONVERSATIONS_DIR.is_dir():
        return []
    files: list[Path] = []
    for project_dir in sorted(CONVERSATIONS_DIR.iterdir()):
        if not project_dir.is_dir():
            continue
        if project_filter and project_dir.name != project_filter:
            continue
        for md in sorted(project_dir.glob("*.md")):
            fm = parse_frontmatter(md)
            if fm.get("status") == "summarized":
                files.append(md)
    return files
# ---------------------------------------------------------------------------
# Main pipeline
# ---------------------------------------------------------------------------
def process_url(
    url: str,
    conversation_file: Path,
    state: dict[str, Any],
    dry_run: bool,
    compile_enabled: bool,
) -> str:
    """Process a single URL. Returns a short status tag for logging.

    Mutates *state* in place: records new sightings for already-harvested
    URLs, and adds skip/fail/reject/harvest entries as it progresses. Side
    effects (unless dry_run): network fetch, raw-file write, and — when
    compile_enabled — a `claude -p` compile into staging/.
    """
    rel_conv = str(conversation_file.relative_to(WIKI_DIR))
    today = datetime.now(timezone.utc).date().isoformat()
    # Already harvested? Still record the new sighting for provenance.
    if url in state["harvested_urls"]:
        entry = state["harvested_urls"][url]
        if rel_conv not in entry.get("seen_in", []):
            entry.setdefault("seen_in", []).append(rel_conv)
        return "dup-harvested"
    # Already rejected by AI?
    if url in state["rejected_urls"]:
        return "dup-rejected"
    # Previously skipped?
    if url in state["skipped_urls"]:
        return "dup-skipped"
    # Previously failed too many times? (fewer attempts → retry below)
    if url in state["failed_urls"]:
        if state["failed_urls"][url].get("attempts", 0) >= MAX_FAILED_ATTEMPTS:
            return "dup-failed"
    # Classify
    classification = classify_url(url)
    if classification == "skip":
        state["skipped_urls"][url] = {
            "reason": "domain-skip-list",
            "first_seen": today,
        }
        return "skip-domain"
    if classification == "check":
        # C-type (tracker/Q&A): only harvest when the wiki already covers it
        if not wiki_covers_topic(url):
            state["skipped_urls"][url] = {
                "reason": "c-type-no-wiki-match",
                "first_seen": today,
            }
            return "skip-c-type"
    if dry_run:
        return f"would-harvest ({classification})"
    # Fetch
    print(f" [fetch] {url}")
    ok, content, method = fetch_cascade(url, conversation_file)
    time.sleep(FETCH_DELAY_SECONDS)  # politeness delay between fetches
    if not ok:
        entry = state["failed_urls"].setdefault(url, {
            "first_seen": today,
            "attempts": 0,
        })
        entry["attempts"] += 1
        entry["last_attempt"] = today
        entry["reason"] = content[:200] if content else "unknown"
        return f"fetch-failed ({method})"
    # Save raw file
    raw_path = write_raw_file(url, content, method, conversation_file)
    rel_raw = str(raw_path.relative_to(WIKI_DIR))
    state["harvested_urls"][url] = {
        "first_seen": today,
        "seen_in": [rel_conv],
        "raw_file": rel_raw,
        "wiki_pages": [],
        "status": "raw",
        "fetch_method": method,
        "last_checked": today,
    }
    # Compile via claude -p
    if compile_enabled:
        print(f" [compile] {rel_raw}")
        result = call_claude_compile(raw_path, content, conversation_file)
        if result is None:
            state["harvested_urls"][url]["status"] = "raw-compile-failed"
            return f"raw-saved ({method}) compile-failed"
        action = result.get("action", "skip")
        if action == "skip":
            state["rejected_urls"][url] = {
                "reason": result.get("compilation_notes", "AI rejected"),
                "rejected_date": today,
            }
            # Remove from harvested; keep raw file for audit
            state["harvested_urls"].pop(url, None)
            return f"rejected ({method})"
        written = apply_compile_result(result, url, raw_path)
        state["harvested_urls"][url]["status"] = "compiled"
        state["harvested_urls"][url]["wiki_pages"] = [
            str(p.relative_to(WIKI_DIR)) for p in written
        ]
        return f"compiled ({method}) → {len(written)} staging file(s)"
    return f"raw-saved ({method})"
def main() -> int:
    """CLI entry point: scan summarized conversations and process their URLs.

    Returns an exit code (always 0 on a completed run; per-URL failures are
    recorded in state rather than failing the whole run). State is persisted
    after every new (non-duplicate) URL unless --dry-run is given.
    """
    parser = argparse.ArgumentParser(description=__doc__.split("\n\n")[0])
    parser.add_argument("--project", help="Only process this project (wing) directory")
    parser.add_argument("--file", help="Only process this conversation file")
    parser.add_argument("--dry-run", action="store_true", help="Classify and report without fetching")
    parser.add_argument("--no-compile", action="store_true", help="Fetch raw only; skip claude -p compile")
    parser.add_argument("--limit", type=int, default=0, help="Stop after N new URLs processed (0 = no limit)")
    args = parser.parse_args()
    files = discover_summarized_conversations(args.project, args.file)
    print(f"Scanning {len(files)} summarized conversation(s) for URLs...")
    state = load_state()
    stats: dict[str, int] = {}  # status tag → count, for the summary
    processed_new = 0  # counts non-duplicate URLs toward --limit
    for file_path in files:
        urls = extract_urls_from_file(file_path)
        if not urls:
            continue
        rel = file_path.relative_to(WIKI_DIR)
        print(f"\n[{rel}] {len(urls)} URL(s)")
        for url in urls:
            status = process_url(
                url,
                file_path,
                state,
                dry_run=args.dry_run,
                compile_enabled=not args.no_compile,
            )
            stats[status] = stats.get(status, 0) + 1
            print(f" [{status}] {url}")
            # Persist state after each non-dry URL
            if not args.dry_run and not status.startswith("dup-"):
                processed_new += 1
                save_state(state)
                if args.limit and processed_new >= args.limit:
                    print(f"\nLimit reached ({args.limit}); stopping.")
                    save_state(state)
                    _print_summary(stats)
                    return 0
    if not args.dry_run:
        save_state(state)
    _print_summary(stats)
    return 0
def _print_summary(stats: dict[str, int]) -> None:
print("\nSummary:")
for status, count in sorted(stats.items()):
print(f" {status}: {count}")
if __name__ == "__main__":
sys.exit(main())