#!/usr/bin/env python3
"""Harvest external reference URLs from summarized conversations into the wiki.

Scans summarized conversation transcripts for URLs, classifies them, fetches
the content, stores the raw source under raw/harvested/, and optionally calls
`claude -p` to compile each raw file into a staging/ wiki page.

Usage:
    python3 scripts/wiki-harvest.py               # Process all summarized conversations
    python3 scripts/wiki-harvest.py --project mc  # One project only
    python3 scripts/wiki-harvest.py --file PATH   # One conversation file
    python3 scripts/wiki-harvest.py --dry-run     # Show what would be harvested
    python3 scripts/wiki-harvest.py --no-compile  # Fetch only, skip claude -p compile step
    python3 scripts/wiki-harvest.py --limit 10    # Cap number of URLs processed

State is persisted in .harvest-state.json; existing URLs are deduplicated.
"""

from __future__ import annotations

import argparse
import hashlib
import ipaddress
import json
import os
import re
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import urlparse

# Force line-buffered output for pipe usage. Streams that were swapped for an
# object without `reconfigure` (e.g. an in-memory buffer) are left untouched.
for _stream in (sys.stdout, sys.stderr):
    _reconfigure = getattr(_stream, "reconfigure", None)
    if _reconfigure is not None:
        _reconfigure(line_buffering=True)

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

WIKI_DIR = Path(os.environ.get("WIKI_DIR", str(Path.home() / "projects" / "wiki")))
CONVERSATIONS_DIR = WIKI_DIR / "conversations"
RAW_HARVESTED_DIR = WIKI_DIR / "raw" / "harvested"
STAGING_DIR = WIKI_DIR / "staging"
INDEX_FILE = WIKI_DIR / "index.md"
CLAUDE_MD = WIKI_DIR / "CLAUDE.md"
HARVEST_STATE_FILE = WIKI_DIR / ".harvest-state.json"

# ════════════════════════════════════════════════════════════════════════════
# CONFIGURE ME — URL classification rules
# ════════════════════════════════════════════════════════════════════════════
#
# Type D: always skip. Add your own internal/ephemeral/personal domains here.
# Patterns use `re.search` so unanchored suffixes like `\.example\.com$` work.
# Private, loopback and link-local IP literals are detected separately
# (see `_is_private_ip`).
SKIP_DOMAIN_PATTERNS = [
    # Generic: ephemeral / personal / chat / internal
    r"\.atlassian\.net$",
    r"^app\.asana\.com$",
    r"^(www\.)?slack\.com$",
    r"\.slack\.com$",
    r"^(www\.)?discord\.com$",
    r"^localhost$",
    r"^0\.0\.0\.0$",
    r"^mail\.google\.com$",
    r"^calendar\.google\.com$",
    r"^docs\.google\.com$",
    r"^drive\.google\.com$",
    r"^.+\.local$",
    r"^.+\.internal$",
    # Add your own internal domains below, for example:
    # r"\.mycompany\.com$",
    # r"^git\.mydomain\.com$",
]

# Type C — issue trackers / Q&A; only harvest if topic touches existing wiki
C_TYPE_URL_PATTERNS = [
    r"^https?://github\.com/[^/]+/[^/]+/issues/\d+",
    r"^https?://github\.com/[^/]+/[^/]+/pull/\d+",
    r"^https?://github\.com/[^/]+/[^/]+/discussions/\d+",
    r"^https?://(www\.)?stackoverflow\.com/questions/\d+",
    r"^https?://(www\.)?serverfault\.com/questions/\d+",
    r"^https?://(www\.)?superuser\.com/questions/\d+",
    r"^https?://.+\.stackexchange\.com/questions/\d+",
]

# Asset/image extensions to filter out
ASSET_EXTENSIONS = {
    ".png", ".jpg", ".jpeg", ".gif", ".svg", ".webp", ".ico", ".bmp",
    ".css", ".js", ".mjs", ".woff", ".woff2", ".ttf", ".eot",
    ".mp4", ".webm", ".mov", ".mp3", ".wav",
    ".zip", ".tar", ".gz", ".bz2",
}

# URL regex — HTTP(S), stops at whitespace, brackets, and common markdown delimiters
URL_REGEX = re.compile(
    r"https?://[^\s<>\"')\]}\\|`]+",
    re.IGNORECASE,
)

# Claude CLI models
CLAUDE_HAIKU_MODEL = "haiku"
CLAUDE_SONNET_MODEL = "sonnet"
SONNET_CONTENT_THRESHOLD = 20_000  # chars — larger than this → sonnet

# Fetch behavior
FETCH_DELAY_SECONDS = 2
MAX_FAILED_ATTEMPTS = 3
MIN_CONTENT_LENGTH = 100
FETCH_TIMEOUT = 45

# HTML-leak detection — content containing any of these is treated as a failed extraction
# NOTE(review): the original marker list was lost to extraction damage in the
# copy this file was restored from; the entries below are a reconstruction —
# confirm against version control before relying on them.
HTML_LEAK_MARKERS = ["<div", "<script", "<nav", "<header", "<footer"]

# Characters that may trail a URL in prose/markdown but are not part of it.
_URL_TRAILING_JUNK = ".,;:!?()[]{}\"'"

# Tuple form of ASSET_EXTENSIONS so one `str.endswith` call covers them all.
_ASSET_SUFFIXES = tuple(sorted(ASSET_EXTENSIONS))


# ---------------------------------------------------------------------------
# State management
# ---------------------------------------------------------------------------


def load_state() -> dict[str, Any]:
    """Load the persisted harvest state, filling in any missing sections.

    Returns:
        The state dict read from HARVEST_STATE_FILE with every default key
        present. If the file is missing, unreadable, not valid JSON, or does
        not contain a JSON object, fresh defaults are returned instead (with a
        warning on stderr in the latter three cases).
    """
    defaults: dict[str, Any] = {
        "harvested_urls": {},
        "skipped_urls": {},
        "failed_urls": {},
        "rejected_urls": {},
        "last_run": None,
    }
    if not HARVEST_STATE_FILE.exists():
        return defaults

    try:
        with open(HARVEST_STATE_FILE, encoding="utf-8") as f:
            state = json.load(f)
    except (OSError, ValueError) as exc:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        print(
            f"[warn] could not read {HARVEST_STATE_FILE}: {exc}; starting with empty state",
            file=sys.stderr,
        )
        return defaults

    if not isinstance(state, dict):
        print(
            f"[warn] {HARVEST_STATE_FILE} does not contain a JSON object; starting with empty state",
            file=sys.stderr,
        )
        return defaults

    for key, value in defaults.items():
        state.setdefault(key, value)
    return state


def save_state(state: dict[str, Any]) -> None:
    """Stamp `last_run` and write the state to HARVEST_STATE_FILE.

    The JSON is written to a sibling temp file first and then moved over the
    real file with `Path.replace`, so a crash mid-write does not leave a
    truncated state file behind.
    """
    state["last_run"] = datetime.now(timezone.utc).isoformat()
    tmp = HARVEST_STATE_FILE.with_suffix(".json.tmp")
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2, sort_keys=True)
    tmp.replace(HARVEST_STATE_FILE)


# ---------------------------------------------------------------------------
# URL extraction
# ---------------------------------------------------------------------------


def extract_urls_from_file(file_path: Path) -> list[str]:
    """Extract all HTTP(S) URLs from a conversation markdown file.

    Filters:
    - Asset URLs (images, CSS, JS, fonts, media, archives)
    - URLs shorter than 20 characters
    - Duplicates within the same file

    Returns:
        URLs in order of first appearance; an empty list if the file cannot
        be read.
    """
    try:
        text = file_path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return []

    seen: set[str] = set()
    urls: list[str] = []
    for match in URL_REGEX.finditer(text):
        # Strip sentence punctuation and markdown/code artifacts in a single
        # pass so mixed tails such as ".[" or "(." are removed completely.
        url = match.group(0).rstrip(_URL_TRAILING_JUNK)
        if len(url) < 20:
            continue
        try:
            parsed = urlparse(url)
        except ValueError:
            continue
        if not parsed.scheme or not parsed.netloc:
            continue
        if parsed.path.lower().endswith(_ASSET_SUFFIXES):
            continue
        if url in seen:
            continue
        seen.add(url)
        urls.append(url)
    return urls


# ---------------------------------------------------------------------------
# URL classification
# ---------------------------------------------------------------------------


def _is_private_ip(host: str) -> bool:
    """Return True if host is a private, loopback, or link-local IP literal.

    Handles both IPv4 and IPv6 literals via `ipaddress`. Hostnames (anything
    that is not an IP literal) return False.
    """
    try:
        addr = ipaddress.ip_address(host)
    except ValueError:
        addr = None
    if addr is not None:
        return addr.is_private or addr.is_loopback or addr.is_link_local

    # Legacy fallback: dotted quads that `ipaddress` rejects (e.g. octets with
    # leading zeros) are still checked against the RFC1918/loopback ranges.
    if not re.match(r"^\d+\.\d+\.\d+\.\d+$", host):
        return False
    parts = [int(p) for p in host.split(".")]
    if parts[0] in (10, 127):
        return True
    if parts[0] == 172 and 16 <= parts[1] <= 31:
        return True
    return parts[0] == 192 and parts[1] == 168


def classify_url(url: str) -> str:
    """Classify a URL as 'harvest' (A/B), 'check' (C), or 'skip' (D).

    'skip' is returned for unparseable URLs, URLs without a host, private IP
    literals, and hosts matching SKIP_DOMAIN_PATTERNS. 'check' is returned
    when the URL matches one of C_TYPE_URL_PATTERNS. Everything else is
    'harvest'.
    """
    try:
        parsed = urlparse(url)
    except ValueError:
        return "skip"

    host = (parsed.hostname or "").lower()
    if not host or _is_private_ip(host):
        return "skip"

    if any(re.search(pattern, host) for pattern in SKIP_DOMAIN_PATTERNS):
        return "skip"

    if any(re.match(pattern, url) for pattern in C_TYPE_URL_PATTERNS):
        return "check"

    return "harvest"


# ---------------------------------------------------------------------------
# Filename derivation
# ---------------------------------------------------------------------------


def slugify(text: str) -> str:
    """Lowercase text and collapse every non-alphanumeric run into one '-'."""
    text = text.lower()
    text = re.sub(r"[^a-z0-9]+", "-", text)
    return text.strip("-")


def raw_filename_for_url(url: str) -> str:
    """Derive the raw/harvested/ filename (`<host>-<path>.md`) for a URL.

    A leading "www." is dropped from the host, the path slug is capped at 80
    characters, and URLs whose path yields an empty slug use "index".
    The query string and fragment are not part of the name.
    """
    parsed = urlparse(url)
    # Only a *leading* "www." is noise; replacing it anywhere would corrupt
    # hosts such as "awww.example.com".
    host = parsed.netloc.lower().removeprefix("www.")
    path = parsed.path.rstrip("/")
    host_slug = slugify(host)
    path_slug = slugify(path) or "index"
    # Truncate overly long names
    if len(path_slug) > 80:
        path_slug = path_slug[:80].rstrip("-")
    return f"{host_slug}-{path_slug}.md"


# ---------------------------------------------------------------------------
# Fetch cascade
# ---------------------------------------------------------------------------


def run_fetch_command(cmd: list[str], timeout: int = FETCH_TIMEOUT) -> tuple[bool, str]:
    """Run a fetch command and return (success, output).

    On success the second element is the command's stdout; on failure it is a
    short reason (stderr text, "non-zero exit", "timeout", or the OS error).
    """
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            # Decode explicitly and leniently: fetched pages may contain bytes
            # that are invalid in the locale encoding, which would otherwise
            # raise UnicodeDecodeError out of subprocess.run.
            encoding="utf-8",
            errors="replace",
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return False, "timeout"
    except FileNotFoundError as e:
        return False, f"command not found: {e}"
    except OSError as e:
        return False, str(e)

    if result.returncode != 0:
        return False, result.stderr.strip() or "non-zero exit"
    return True, result.stdout


def validate_content(content: str) -> bool:
    """Return True if content is long enough and shows no HTML-leak marker."""
    if not content or len(content.strip()) < MIN_CONTENT_LENGTH:
        return False
    low = content.lower()
    if any(marker in low for marker in HTML_LEAK_MARKERS):
        return False
    return True


def fetch_with_trafilatura(url: str) -> tuple[bool, str]:
    """Fetch url with the `trafilatura` CLI; return (ok, markdown_or_reason)."""
    ok, out = run_fetch_command(
        ["trafilatura", "-u", url, "--markdown", "--no-comments", "--precision"]
    )
    if ok and validate_content(out):
        return True, out
    return False, out if not ok else "content validation failed"


def fetch_with_crawl4ai(url: str, stealth: bool = False) -> tuple[bool, str]:
    """Fetch url with the `crwl` CLI; return (ok, markdown_or_reason).

    With `stealth=True` the command is given the headless/random-user-agent
    browser options and the magic/full-page crawler options instead of the
    plain page timeout.
    """
    cmd = ["crwl", url, "-o", "markdown-fit"]
    if stealth:
        cmd += [
            "-b", "headless=true,user_agent_mode=random",
            "-c", "magic=true,scan_full_page=true,page_timeout=20000",
        ]
    else:
        cmd += ["-c", "page_timeout=15000"]
    ok, out = run_fetch_command(cmd, timeout=90)
    if ok and validate_content(out):
        return True, out
    return False, out if not ok else "content validation failed"


def fetch_from_conversation(url: str, conversation_file: Path) -> tuple[bool, str]:
    """Fallback: scrape a block of content near where the URL appears in the transcript.

    If the assistant fetched the URL during the session, some portion of the
    content is likely inline in the transcript.
    """
    try:
        text = conversation_file.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return False, "cannot read conversation file"
    idx = text.find(url)
    if idx == -1:
        return False, "url not found in conversation"
    # Grab up to 2000 chars starting at the URL mention
    snippet = text[idx : idx + 2000]
    if not validate_content(snippet):
        return False, "snippet failed validation"
    return True, snippet


def fetch_cascade(url: str, conversation_file: Path) -> tuple[bool, str, str]:
    """Attempt the full fetch cascade. Returns (success, content, method_used).

    Order: trafilatura, crawl4ai, crawl4ai in stealth mode, then the
    conversation-transcript fallback. On total failure the content slot holds
    the last failure reason and the method is "failed".
    """
    ok, out = fetch_with_trafilatura(url)
    if ok:
        return True, out, "trafilatura"

    ok, out = fetch_with_crawl4ai(url, stealth=False)
    if ok:
        return True, out, "crawl4ai"

    ok, out = fetch_with_crawl4ai(url, stealth=True)
    if ok:
        return True, out, "crawl4ai-stealth"

    ok, out = fetch_from_conversation(url, conversation_file)
    if ok:
        return True, out, "conversation-fallback"

    return False, out, "failed"


# ---------------------------------------------------------------------------
# Raw file storage
# ---------------------------------------------------------------------------


def content_hash(content: str) -> str:
    """Return "sha256:<hex digest>" of the UTF-8 encoded content."""
    return "sha256:" + hashlib.sha256(content.encode("utf-8")).hexdigest()


def write_raw_file(
    url: str,
    content: str,
    method: str,
    discovered_in: Path,
) -> Path:
    """Write fetched content plus provenance frontmatter under raw/harvested/.

    Args:
        url: Source URL (recorded as `source_url`).
        content: Fetched text; stripped before writing.
        method: Fetch method tag (recorded as `fetch_method`).
        discovered_in: Conversation file the URL came from; must be inside
            WIKI_DIR because it is stored relative to it.

    Returns:
        Path of the file written.
    """
    RAW_HARVESTED_DIR.mkdir(parents=True, exist_ok=True)
    filename = raw_filename_for_url(url)
    out_path = RAW_HARVESTED_DIR / filename
    # Collision: append short hash
    if out_path.exists():
        suffix = hashlib.sha256(url.encode()).hexdigest()[:8]
        out_path = RAW_HARVESTED_DIR / f"{out_path.stem}-{suffix}.md"

    rel_discovered = discovered_in.relative_to(WIKI_DIR)
    frontmatter = [
        "---",
        f"source_url: {url}",
        f"fetched_date: {datetime.now(timezone.utc).date().isoformat()}",
        f"fetch_method: {method}",
        f"discovered_in: {rel_discovered}",
        f"content_hash: {content_hash(content)}",
        "---",
        "",
    ]
    out_path.write_text(
        "\n".join(frontmatter) + content.strip() + "\n",
        encoding="utf-8",
    )
    return out_path


# ---------------------------------------------------------------------------
# AI compilation via claude -p
# ---------------------------------------------------------------------------

# NOTE(review): line breaks and indentation inside this template were
# reconstructed from a whitespace-collapsed copy — confirm against version
# control if the exact prompt layout matters.
COMPILE_PROMPT_TEMPLATE = """You are compiling a raw harvested source document into the LLM wiki at {wiki_dir}.

The wiki schema and conventions are defined in CLAUDE.md. The wiki has four
content directories: patterns/ (how), decisions/ (why), environments/ (where),
concepts/ (what). All pages require YAML frontmatter with title, type, confidence,
sources, related, last_compiled, last_verified.

IMPORTANT: Do NOT include `status`, `origin`, `staged_*`, `target_path`,
`modifies`, `harvest_source`, or `compilation_notes` fields in your page
frontmatter — the harvest script injects those automatically.

The raw source material is below. Decide what to do with it and emit the
result as a single JSON object on stdout (nothing else). Valid actions:

- "new_page" — create a new wiki page
- "update_page" — update an existing wiki page (add source, merge content)
- "both" — create a new page AND update an existing one
- "skip" — content isn't substantive enough to warrant a wiki page

JSON schema:

{{
  "action": "new_page" | "update_page" | "both" | "skip",
  "compilation_notes": "1-3 sentences explaining what you did and why",
  "new_page": {{
    "directory": "patterns" | "decisions" | "environments" | "concepts",
    "filename": "kebab-case-name.md",
    "content": "full markdown including frontmatter"
  }},
  "update_page": {{
    "path": "patterns/existing-page.md",
    "content": "full updated markdown including frontmatter"
  }}
}}

Omit "new_page" if not applicable; omit "update_page" if not applicable. If
action is "skip", omit both. Do NOT include any prose outside the JSON.

Wiki index (so you know what pages exist):

{wiki_index}

Raw harvested source:

{raw_content}

Conversation context (the working session where this URL was cited):

{conversation_context}
"""


def call_claude_compile(
    raw_path: Path,
    raw_content: str,
    conversation_file: Path,
) -> dict[str, Any] | None:
    """Invoke `claude -p` to compile the raw source into a staging wiki page.

    Args:
        raw_path: Path of the stored raw file (currently unused here; kept
            for the caller-facing signature).
        raw_content: Fetched text; only the first 40,000 chars are sent.
        conversation_file: Transcript the URL was found in; its first 8,000
            chars are sent as context.

    Returns:
        The JSON object parsed from the CLI's stdout, or None if the CLI is
        missing, fails, times out, or prints no parseable JSON object.
    """
    # Pick model by size
    model = CLAUDE_SONNET_MODEL if len(raw_content) > SONNET_CONTENT_THRESHOLD else CLAUDE_HAIKU_MODEL

    try:
        wiki_index = INDEX_FILE.read_text(encoding="utf-8", errors="replace")[:20_000]
    except OSError:
        wiki_index = ""

    try:
        conversation_context = conversation_file.read_text(
            encoding="utf-8", errors="replace"
        )[:8_000]
    except OSError:
        conversation_context = ""

    prompt = COMPILE_PROMPT_TEMPLATE.format(
        wiki_dir=str(WIKI_DIR),
        wiki_index=wiki_index,
        raw_content=raw_content[:40_000],
        conversation_context=conversation_context,
    )

    try:
        result = subprocess.run(
            ["claude", "-p", "--model", model, "--output-format", "text", prompt],
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            timeout=600,
        )
    except FileNotFoundError:
        print(" [warn] claude CLI not found — skipping compilation", file=sys.stderr)
        return None
    except subprocess.TimeoutExpired:
        print(" [warn] claude -p timed out", file=sys.stderr)
        return None
    except OSError as e:
        # e.g. the prompt exceeding the OS argument-size limit
        print(f" [warn] could not run claude -p: {e}", file=sys.stderr)
        return None

    if result.returncode != 0:
        print(f" [warn] claude -p failed: {result.stderr.strip()[:200]}", file=sys.stderr)
        return None

    # Extract JSON from output (may be wrapped in fences)
    output = result.stdout.strip()
    match = re.search(r"\{.*\}", output, re.DOTALL)
    if not match:
        print(f" [warn] no JSON found in claude output ({len(output)} chars)", file=sys.stderr)
        return None
    try:
        parsed = json.loads(match.group(0))
    except json.JSONDecodeError as e:
        print(f" [warn] JSON parse failed: {e}", file=sys.stderr)
        return None
    if not isinstance(parsed, dict):
        print(" [warn] claude output was not a JSON object", file=sys.stderr)
        return None
    return parsed


STAGING_INJECT_TEMPLATE = (
    "---\n"
    "origin: automated\n"
    "status: pending\n"
    "staged_date: {staged_date}\n"
    "staged_by: wiki-harvest\n"
    "target_path: {target_path}\n"
    "{modifies_line}"
    "harvest_source: {source_url}\n"
    "compilation_notes: {compilation_notes}\n"
)

# Frontmatter keys owned by this script; stripped from AI output before injection.
_RESERVED_FIELD_RE = re.compile(
    r"^(status|origin|staged_\w+|target_path|modifies|harvest_source|compilation_notes):.*\n",
    re.MULTILINE,
)

# A line consisting only of the closing frontmatter fence.
_FENCE_LINE_RE = re.compile(r"^---[ \t]*$", re.MULTILINE)


def _inject_staging_frontmatter(
    content: str,
    source_url: str,
    target_path: str,
    compilation_notes: str,
    modifies: str | None,
) -> str:
    """Insert staging metadata after the opening --- fence of the AI-generated content.

    Reserved fields the AI may have emitted are removed from the frontmatter
    only; the page body is left untouched. If the content has no opening
    fence, the injected block becomes its entire frontmatter.
    """
    # Tolerate a BOM, leading blank lines and CRLF endings before the fence.
    content = content.replace("\r\n", "\n").lstrip("\ufeff").lstrip()

    modifies_line = f"modifies: {modifies}\n" if modifies else ""
    # Collapse multi-line compilation notes to single line for safe YAML
    clean_notes = compilation_notes.replace("\n", " ").replace("\r", " ").strip()

    injection = STAGING_INJECT_TEMPLATE.format(
        staged_date=datetime.now(timezone.utc).date().isoformat(),
        target_path=target_path,
        modifies_line=modifies_line,
        source_url=source_url,
        compilation_notes=clean_notes or "(none provided)",
    )

    if not content.startswith("---\n"):
        # AI forgot the fence — prepend full frontmatter
        return injection + "---\n" + content

    after_open = content[4:]
    closing = _FENCE_LINE_RE.search(after_open)
    if closing is None:
        # Unterminated frontmatter: there is no body to protect, so strip
        # reserved fields from everything that follows the opening fence.
        return injection + _RESERVED_FIELD_RE.sub("", after_open)

    header = _RESERVED_FIELD_RE.sub("", after_open[: closing.start()])
    return injection + header + after_open[closing.start():]


def _unique_staging_path(base: Path) -> Path:
    """Append a short hash if the target already exists."""
    if not base.exists():
        return base
    suffix = hashlib.sha256(str(base).encode() + str(time.time()).encode()).hexdigest()[:6]
    return base.with_stem(f"{base.stem}-{suffix}")


def _staging_destination(target_rel: Any) -> Path | None:
    """Map an AI-supplied relative path into STAGING_DIR, or None if unsafe.

    Absolute paths, drive-qualified paths and anything containing a ".."
    component are rejected so model output can never write outside staging/.
    """
    if not isinstance(target_rel, str) or not target_rel.strip():
        return None
    rel = Path(target_rel)
    if rel.is_absolute() or rel.drive or ".." in rel.parts:
        return None
    return STAGING_DIR / rel


def _stage_page(
    page_content: str,
    target_rel: str,
    source_url: str,
    notes: str,
    modifies: str | None,
) -> Path | None:
    """Write one AI-generated page into staging/; return its path or None if refused."""
    base = _staging_destination(target_rel)
    if base is None:
        print(f" [warn] refusing unsafe staging path: {target_rel!r}", file=sys.stderr)
        return None
    dest = _unique_staging_path(base)
    dest.parent.mkdir(parents=True, exist_ok=True)
    staged = _inject_staging_frontmatter(
        page_content,
        source_url=source_url,
        target_path=target_rel,
        compilation_notes=notes,
        modifies=modifies,
    )
    dest.write_text(staged, encoding="utf-8")
    return dest


def apply_compile_result(
    result: dict[str, Any],
    source_url: str,
    raw_path: Path,
) -> list[Path]:
    """Write the AI compilation result into staging/. Returns paths written.

    Args:
        result: Parsed JSON from `call_claude_compile` ("action", optional
            "new_page" / "update_page" objects, "compilation_notes").
        source_url: URL the content was harvested from.
        raw_path: Path of the stored raw file (currently unused here).

    Returns:
        Paths of the staging files created. Entries that are missing fields,
        have non-string content, or name a path outside staging/ are skipped.
    """
    written: list[Path] = []
    action = result.get("action", "skip")
    if action == "skip":
        return written

    # The model output is untrusted: coerce notes to text, ignore odd shapes.
    notes = str(result.get("compilation_notes") or "")

    # New page
    new_page = result.get("new_page")
    if (
        action in ("new_page", "both")
        and isinstance(new_page, dict)
        and new_page.get("filename")
        and isinstance(new_page.get("content"), str)
        and new_page["content"]
    ):
        directory = new_page.get("directory") or "patterns"
        target_rel = f"{directory}/{new_page['filename']}"
        dest = _stage_page(new_page["content"], target_rel, source_url, notes, None)
        if dest is not None:
            written.append(dest)

    # Update to existing page
    update_page = result.get("update_page")
    if (
        action in ("update_page", "both")
        and isinstance(update_page, dict)
        and update_page.get("path")
        and isinstance(update_page.get("content"), str)
        and update_page["content"]
    ):
        target_rel = update_page["path"]
        dest = _stage_page(update_page["content"], target_rel, source_url, notes, target_rel)
        if dest is not None:
            written.append(dest)

    return written


# ---------------------------------------------------------------------------
# Wiki topic coverage check (for C-type URLs)
# ---------------------------------------------------------------------------


def wiki_covers_topic(url: str) -> bool:
    """Quick heuristic: check if any wiki page mentions terms from the URL path.

    Used for C-type URLs (GitHub issues, SO questions) — only harvest if
    the wiki already covers the topic. The check runs `qmd search`; if `qmd`
    is unavailable, fails, times out, or returns no hits, the answer is False.
    """
    try:
        parsed = urlparse(url)
    except ValueError:
        return False

    # Derive candidate keywords from path
    path_terms = [t for t in re.split(r"[/\-_]+", parsed.path.lower()) if len(t) >= 4]
    if not path_terms:
        return False

    query = " ".join(path_terms[:5])
    try:
        result = subprocess.run(
            ["qmd", "search", query, "--json", "-n", "3"],
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            timeout=30,
        )
    except (OSError, subprocess.TimeoutExpired):
        # OSError includes FileNotFoundError (qmd not installed).
        return False

    if result.returncode != 0 or not result.stdout.strip():
        return False
    try:
        data = json.loads(result.stdout)
    except json.JSONDecodeError:
        return False
    hits = data.get("results") if isinstance(data, dict) else data
    return bool(hits)


# ---------------------------------------------------------------------------
# Conversation discovery
# ---------------------------------------------------------------------------


def parse_frontmatter(file_path: Path) -> dict[str, str]:
    """Return the flat `key: value` pairs from a file's leading --- block.

    This is a line-based reader, not a YAML parser: each frontmatter line is
    split at its first colon. An unreadable file or one without a complete
    frontmatter block yields an empty dict.
    """
    fm: dict[str, str] = {}
    try:
        text = file_path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return fm
    if not text.startswith("---\n"):
        return fm
    end = text.find("\n---\n", 4)
    if end == -1:
        return fm
    for line in text[4:end].splitlines():
        if ":" in line:
            key, _, value = line.partition(":")
            fm[key.strip()] = value.strip()
    return fm


def discover_summarized_conversations(
    project_filter: str | None = None,
    file_filter: str | None = None,
) -> list[Path]:
    """List the conversation files to scan.

    Args:
        project_filter: If given, only this project directory name under
            conversations/ is scanned.
        file_filter: If given, only this file is returned (relative paths are
            taken relative to WIKI_DIR) and no status check is applied.

    Returns:
        Sorted paths of conversation files whose frontmatter has
        `status: summarized`, or the single filtered file. Every returned
        path is located under WIKI_DIR. Missing files, files outside the
        wiki, and a missing conversations/ directory yield an empty list.
    """
    if file_filter:
        path = Path(file_filter)
        if not path.is_absolute():
            path = WIKI_DIR / path
        if not path.exists():
            return []
        # Downstream code stores paths relative to WIKI_DIR, so a file outside
        # the wiki would crash later; reject it here with a clear message.
        try:
            path.relative_to(WIKI_DIR)
            return [path]
        except ValueError:
            pass
        try:
            rel = path.resolve().relative_to(WIKI_DIR.resolve())
        except ValueError:
            print(
                f"[warn] {path} is outside the wiki directory {WIKI_DIR}; skipping",
                file=sys.stderr,
            )
            return []
        return [WIKI_DIR / rel]

    if not CONVERSATIONS_DIR.is_dir():
        return []

    files: list[Path] = []
    for project_dir in sorted(CONVERSATIONS_DIR.iterdir()):
        if not project_dir.is_dir():
            continue
        if project_filter and project_dir.name != project_filter:
            continue
        for md in sorted(project_dir.glob("*.md")):
            if parse_frontmatter(md).get("status") == "summarized":
                files.append(md)
    return files


# ---------------------------------------------------------------------------
# Main pipeline
# ---------------------------------------------------------------------------


def process_url(
    url: str,
    conversation_file: Path,
    state: dict[str, Any],
    dry_run: bool,
    compile_enabled: bool,
) -> str:
    """Process a single URL. Returns a short status tag for logging.

    Args:
        url: The URL to handle.
        conversation_file: Transcript the URL was found in (inside WIKI_DIR).
        state: Harvest state dict; updated in place.
        dry_run: If True, stop after classification without fetching.
        compile_enabled: If True, run the `claude -p` compile step after a
            successful fetch.

    Returns:
        A tag such as "dup-harvested", "skip-domain", "fetch-failed (...)",
        "raw-saved (...)", "rejected (...)" or "compiled (...) → N staging
        file(s)". Tags starting with "dup-" mean no new work was done.
    """
    rel_conv = str(conversation_file.relative_to(WIKI_DIR))
    today = datetime.now(timezone.utc).date().isoformat()

    harvested = state["harvested_urls"]
    failed = state["failed_urls"]
    skipped = state["skipped_urls"]

    # Already harvested? Just record the extra sighting.
    if url in harvested:
        seen_in = harvested[url].setdefault("seen_in", [])
        if rel_conv not in seen_in:
            seen_in.append(rel_conv)
        return "dup-harvested"

    # Already rejected by AI?
    if url in state["rejected_urls"]:
        return "dup-rejected"

    # Previously skipped?
    if url in skipped:
        return "dup-skipped"

    # Previously failed too many times?
    if url in failed and failed[url].get("attempts", 0) >= MAX_FAILED_ATTEMPTS:
        return "dup-failed"

    # Classify
    classification = classify_url(url)
    if classification == "skip":
        skipped[url] = {
            "reason": "domain-skip-list",
            "first_seen": today,
        }
        return "skip-domain"

    if classification == "check" and not wiki_covers_topic(url):
        skipped[url] = {
            "reason": "c-type-no-wiki-match",
            "first_seen": today,
        }
        return "skip-c-type"

    if dry_run:
        return f"would-harvest ({classification})"

    # Fetch
    print(f" [fetch] {url}")
    ok, content, method = fetch_cascade(url, conversation_file)
    time.sleep(FETCH_DELAY_SECONDS)

    if not ok:
        entry = failed.setdefault(url, {
            "first_seen": today,
            "attempts": 0,
        })
        entry["attempts"] += 1
        entry["last_attempt"] = today
        entry["reason"] = content[:200] if content else "unknown"
        return f"fetch-failed ({method})"

    # A retry succeeded: the URL must not stay listed as failed as well.
    failed.pop(url, None)

    # Save raw file
    raw_path = write_raw_file(url, content, method, conversation_file)
    rel_raw = str(raw_path.relative_to(WIKI_DIR))

    record: dict[str, Any] = {
        "first_seen": today,
        "seen_in": [rel_conv],
        "raw_file": rel_raw,
        "wiki_pages": [],
        "status": "raw",
        "fetch_method": method,
        "last_checked": today,
    }
    harvested[url] = record

    if not compile_enabled:
        return f"raw-saved ({method})"

    # Compile via claude -p
    print(f" [compile] {rel_raw}")
    result = call_claude_compile(raw_path, content, conversation_file)
    if result is None:
        record["status"] = "raw-compile-failed"
        return f"raw-saved ({method}) compile-failed"

    if result.get("action", "skip") == "skip":
        state["rejected_urls"][url] = {
            "reason": result.get("compilation_notes", "AI rejected"),
            "rejected_date": today,
        }
        # Remove from harvested; keep raw file for audit
        harvested.pop(url, None)
        return f"rejected ({method})"

    written = apply_compile_result(result, url, raw_path)
    if not written:
        # The model asked for a page but nothing could be staged; do not
        # record the URL as compiled when no wiki page exists for it.
        record["status"] = "raw-compile-failed"
        return f"raw-saved ({method}) compile-failed"

    record["status"] = "compiled"
    record["wiki_pages"] = [str(p.relative_to(WIKI_DIR)) for p in written]
    return f"compiled ({method}) → {len(written)} staging file(s)"


def main() -> int:
    """CLI entry point: scan conversations, process their URLs, print a summary.

    State is saved after every URL that did real work (non-"dup-" status) and
    once more at the end, except in --dry-run mode where nothing is written.

    Returns:
        Process exit code (always 0; argument errors exit via argparse).
    """
    # __doc__ is None when docstrings are stripped (python -OO).
    description = (__doc__ or "").split("\n\n")[0]
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument("--project", help="Only process this project (wing) directory")
    parser.add_argument("--file", help="Only process this conversation file")
    parser.add_argument("--dry-run", action="store_true",
                        help="Classify and report without fetching")
    parser.add_argument("--no-compile", action="store_true",
                        help="Fetch raw only; skip claude -p compile")
    parser.add_argument("--limit", type=int, default=0,
                        help="Stop after N new URLs processed (0 = no limit)")
    args = parser.parse_args()
    if args.limit < 0:
        parser.error("--limit must be 0 or a positive integer")

    files = discover_summarized_conversations(args.project, args.file)
    print(f"Scanning {len(files)} summarized conversation(s) for URLs...")

    state = load_state()
    stats: dict[str, int] = {}
    processed_new = 0

    for file_path in files:
        urls = extract_urls_from_file(file_path)
        if not urls:
            continue
        rel = file_path.relative_to(WIKI_DIR)
        print(f"\n[{rel}] {len(urls)} URL(s)")

        for url in urls:
            status = process_url(
                url,
                file_path,
                state,
                dry_run=args.dry_run,
                compile_enabled=not args.no_compile,
            )
            stats[status] = stats.get(status, 0) + 1
            print(f" [{status}] {url}")

            # Persist state after each non-dry URL
            if args.dry_run or status.startswith("dup-"):
                continue
            processed_new += 1
            save_state(state)

            if args.limit and processed_new >= args.limit:
                # State was saved just above, so no second write is needed.
                print(f"\nLimit reached ({args.limit}); stopping.")
                _print_summary(stats)
                return 0

    if not args.dry_run:
        save_state(state)

    _print_summary(stats)
    return 0


def _print_summary(stats: dict[str, int]) -> None:
    """Print one `status: count` line per status tag, sorted by tag."""
    print("\nSummary:")
    for status, count in sorted(stats.items()):
        print(f" {status}: {count}")


if __name__ == "__main__":
    sys.exit(main())