A compounding LLM-maintained knowledge wiki. Synthesis of Andrej Karpathy's persistent-wiki gist and milla-jovovich's mempalace, with an automation layer on top for conversation mining, URL harvesting, human-in-the-loop staging, staleness decay, and hygiene. Includes: - 11 pipeline scripts (extract, summarize, index, harvest, stage, hygiene, maintain, sync, + shared library) - Full docs: README, SETUP, ARCHITECTURE, DESIGN-RATIONALE, CUSTOMIZE - Example CLAUDE.md files (wiki schema + global instructions) tuned for the three-collection qmd setup - 171-test pytest suite (cross-platform, runs in ~1.3s) - MIT licensed
811 lines
28 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Extract Claude Code session JSONL files into clean markdown transcripts.
|
|
|
|
Phase A of the conversation mining pipeline. Deterministic, no LLM dependency.
|
|
Handles incremental extraction via byte offset tracking for sessions that span
|
|
hours or days.
|
|
|
|
Usage:
|
|
python3 extract-sessions.py # Extract all new sessions
|
|
python3 extract-sessions.py --project mc # Extract one project
|
|
python3 extract-sessions.py --session 0a543572 # Extract specific session
|
|
python3 extract-sessions.py --dry-run # Show what would be extracted
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# All locations are overridable via environment variables, so tests or an
# alternate wiki checkout can redirect the pipeline without editing this file.

# Where Claude Code stores per-project session JSONL files.
CLAUDE_PROJECTS_DIR = Path(os.environ.get("CLAUDE_PROJECTS_DIR", str(Path.home() / ".claude" / "projects")))

# Root of the wiki working copy that receives extracted transcripts.
WIKI_DIR = Path(os.environ.get("WIKI_DIR", str(Path.home() / "projects" / "wiki")))

# Extracted conversations land under <wiki>/conversations/<project-code>/.
CONVERSATIONS_DIR = WIKI_DIR / "conversations"

# JSON file tracking per-session byte offsets and message counts, enabling
# incremental extraction across runs (see load_state/save_state).
MINE_STATE_FILE = WIKI_DIR / ".mine-state.json"

# ════════════════════════════════════════════════════════════════════════════
# CONFIGURE ME — Map Claude project directory suffixes to wiki project codes
# ════════════════════════════════════════════════════════════════════════════
#
# Claude Code stores sessions under ~/.claude/projects/<hashed-path>/. The
# directory name is derived from the absolute path of your project, so it
# looks like `-Users-alice-projects-myapp` or `-home-alice-projects-myapp`.
#
# This map tells the extractor which suffix maps to which short wiki code
# (the "wing"). More specific suffixes should appear first — the extractor
# picks the first match. Everything unmatched goes into `general/`.
#
# NOTE(review): matching uses str.endswith (see resolve_project_code), so the
# "-home"/"-Users" entries below only catch directory names that literally
# END with those strings — a name like `-Users-alice-projects-foo` does not.
# Confirm this is the intended catch-all behavior.
#
# Examples — replace with your own projects:
PROJECT_MAP: dict[str, str] = {
    # More specific suffixes first
    "projects-wiki": "wiki",  # this wiki itself
    "-claude": "cl",  # ~/.claude config repo
    # Add your real projects here:
    # "my-webapp": "web",
    # "my-mobile-app": "mob",
    # "work-mono-repo": "work",
    # Catch-all — Claude sessions outside any tracked project
    "-home": "general",
    "-Users": "general",
}

# Per-tool transcript policies, consumed by format_tool_use/format_tool_result.

# Tool call names to keep full output for
KEEP_FULL_OUTPUT_TOOLS = {"Bash", "Skill"}

# Tool call names to summarize (just note what was accessed)
SUMMARIZE_TOOLS = {"Read", "Glob", "Grep"}

# Tool call names to keep with path + change summary
KEEP_CHANGE_TOOLS = {"Edit", "Write"}

# Tool call names to keep description + result summary
KEEP_SUMMARY_TOOLS = {"Agent"}

# Max lines of Bash output to keep
MAX_BASH_OUTPUT_LINES = 200
|
# ---------------------------------------------------------------------------
|
|
# State management
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def load_state() -> dict[str, Any]:
    """Load mining state from .mine-state.json.

    Returns:
        The persisted state dict, or a fresh default
        ``{"sessions": {}, "last_run": None}`` when no state file exists.
        A loaded state is normalized to always carry a ``"sessions"`` key,
        since every caller indexes ``state["sessions"]`` directly.
    """
    if MINE_STATE_FILE.exists():
        with open(MINE_STATE_FILE) as f:
            state = json.load(f)
        # Guard against hand-edited or partially-written state files:
        # a missing "sessions" key would raise KeyError in main()/extract_session().
        state.setdefault("sessions", {})
        return state
    return {"sessions": {}, "last_run": None}
|
|
|
|
|
|
def save_state(state: dict[str, Any]) -> None:
    """Persist mining state to .mine-state.json, stamping the run time.

    Sets ``state["last_run"]`` to the current UTC time (ISO-8601) before
    serializing, so the next run can tell when extraction last happened.
    """
    state["last_run"] = datetime.now(timezone.utc).isoformat()
    MINE_STATE_FILE.write_text(json.dumps(state, indent=2))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Project mapping
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def resolve_project_code(dir_name: str) -> str | None:
    """Map a Claude project directory name to a wiki project code.

    Directory names encode the project's absolute path, e.g.
    ``-Users-alice-projects-myapp`` or ``-home-alice-projects-myapp``.
    The first PROJECT_MAP entry (insertion order) whose key is a suffix of
    *dir_name* wins; None means the directory is not tracked.
    """
    return next(
        (code for suffix, code in PROJECT_MAP.items() if dir_name.endswith(suffix)),
        None,
    )
|
|
|
|
|
|
def discover_sessions(
    project_filter: str | None = None,
    session_filter: str | None = None,
) -> list[dict[str, Any]]:
    """Enumerate JSONL session files under CLAUDE_PROJECTS_DIR.

    Args:
        project_filter: If given, keep only sessions whose resolved wiki
            project code equals this value.
        session_filter: If given, keep only sessions whose ID starts with
            this prefix.

    Returns:
        One record per matching session file, with keys ``session_id``,
        ``project``, ``jsonl_path``, and ``file_size``. Directories that
        resolve to no project code are skipped entirely.
    """
    found: list[dict[str, Any]] = []

    if not CLAUDE_PROJECTS_DIR.exists():
        print(f"Claude projects directory not found: {CLAUDE_PROJECTS_DIR}", file=sys.stderr)
        return found

    for proj_dir in sorted(CLAUDE_PROJECTS_DIR.iterdir()):
        if not proj_dir.is_dir():
            continue

        code = resolve_project_code(proj_dir.name)
        if code is None or (project_filter and code != project_filter):
            continue

        for jsonl_file in sorted(proj_dir.glob("*.jsonl")):
            sid = jsonl_file.stem  # session ID is the filename sans extension
            if session_filter and not sid.startswith(session_filter):
                continue
            found.append({
                "session_id": sid,
                "project": code,
                "jsonl_path": jsonl_file,
                "file_size": jsonl_file.stat().st_size,
            })

    return found
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# JSONL parsing and filtering
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def extract_timestamp(obj: dict[str, Any]) -> str | None:
    """Return the record's timestamp as an ISO-8601 string, if present.

    String timestamps pass through untouched; numeric ones are interpreted
    as epoch milliseconds and rendered in UTC. Anything else yields None.
    """
    raw = obj.get("timestamp")
    if isinstance(raw, str):
        return raw
    if isinstance(raw, (int, float)):
        # Numeric timestamps are in milliseconds — scale down to seconds.
        return datetime.fromtimestamp(raw / 1000, tz=timezone.utc).isoformat()
    return None
|
|
|
|
|
|
def extract_session_date(obj: dict[str, Any]) -> str:
    """Return the record's date as ``YYYY-MM-DD``, defaulting to today (UTC).

    Falls back to the current UTC date when the record has no timestamp or
    the timestamp cannot be parsed.
    """
    ts = extract_timestamp(obj)
    if ts:
        try:
            # Normalize a trailing "Z" so fromisoformat accepts it.
            parsed = datetime.fromisoformat(ts.replace("Z", "+00:00"))
            return parsed.strftime("%Y-%m-%d")
        except (ValueError, TypeError):
            pass  # unparseable — use the fallback below
    return datetime.now(timezone.utc).strftime("%Y-%m-%d")
|
|
|
|
|
|
def truncate_lines(text: str, max_lines: int) -> str:
    """Return *text* capped at *max_lines* lines, noting how many were cut.

    Text already within the limit is returned unchanged.
    """
    all_lines = text.splitlines()
    omitted = len(all_lines) - max_lines
    if omitted <= 0:
        return text
    # The note carries its own leading newline, producing a blank separator
    # line between the kept output and the truncation marker.
    return "\n".join(all_lines[:max_lines] + [f"\n[... {omitted} lines truncated ...]"])
|
|
|
|
|
|
def format_tool_use(name: str, input_data: dict[str, Any]) -> str | None:
    """Render a tool_use content block as a single transcript line.

    The rendering depends on which policy set (module-level constants) the
    tool belongs to. ToolSearch is dropped as noise (returns None); tools
    in no set fall through to a bare ``[Name]`` marker.
    """
    if name in KEEP_FULL_OUTPUT_TOOLS:
        if name == "Bash":
            command = input_data.get("command", "")
            description = input_data.get("description", "")
            # Prefer the human-written description; else the command itself.
            return f"**[Bash]**: `{description or command[:100]}`"
        if name == "Skill":
            skill = input_data.get("skill", "")
            args = input_data.get("args", "")
            return f"**[Skill]**: /{skill} {args}".strip()

    if name in SUMMARIZE_TOOLS:
        if name == "Read":
            return f"[Read: {input_data.get('file_path', '?')}]"
        if name == "Glob":
            return f"[Glob: {input_data.get('pattern', '?')}]"
        if name == "Grep":
            pattern = input_data.get("pattern", "?")
            location = input_data.get("path", "")
            if location:
                return f"[Grep: '{pattern}' in {location}]"
            return f"[Grep: '{pattern}']"

    if name in KEEP_CHANGE_TOOLS:
        if name == "Edit":
            target = input_data.get("file_path", "?")
            snippet = input_data.get("old_string", "")[:60]
            return f"**[Edit]**: {target} — replaced '{snippet}...'"
        if name == "Write":
            target = input_data.get("file_path", "?")
            size = len(input_data.get("content", ""))
            return f"**[Write]**: {target} ({size} chars)"

    if name in KEEP_SUMMARY_TOOLS and name == "Agent":
        return f"**[Agent]**: {input_data.get('description', '?')}"

    if name == "ToolSearch":
        return None  # pure noise — omit from transcript
    if name == "TaskCreate":
        return f"[TaskCreate: {input_data.get('subject', '?')}]"
    if name == "TaskUpdate":
        task_id = input_data.get("taskId", "?")
        new_status = input_data.get("status", "?")
        return f"[TaskUpdate: #{task_id} → {new_status}]"

    # Unrecognized tool: record only that it was invoked.
    return f"[{name}]"
|
|
|
|
|
|
def format_tool_result(
    tool_name: str | None,
    content: Any,
    is_error: bool = False,
) -> str | None:
    """Render a tool_result content block, or None if it should be dropped.

    Args:
        tool_name: Name of the tool that produced this result (resolved via
            the tool_use ID map), or None if unknown.
        content: Raw result payload — a string, or a list of content dicts
            whose ``text`` entries are concatenated.
        is_error: Whether the result is an error (always kept, fenced).

    The detail level mirrors the per-tool policy sets used by
    format_tool_use: full (truncated) output, a size note, or nothing.
    """
    if isinstance(content, str):
        text = content
    elif isinstance(content, list):
        text = "\n".join(
            item.get("text", "")
            for item in content
            if isinstance(item, dict) and item.get("type") == "text"
        )
    else:
        text = ""

    if not text.strip():
        return None

    if is_error:
        body = truncate_lines(text, MAX_BASH_OUTPUT_LINES)
        return f"**[ERROR]**:\n```\n{body}\n```"

    if tool_name in KEEP_FULL_OUTPUT_TOOLS:
        body = truncate_lines(text, MAX_BASH_OUTPUT_LINES)
        return f"```\n{body}\n```"

    if tool_name in SUMMARIZE_TOOLS:
        # Only record how big the result was, not its contents.
        return f"[→ {len(text.splitlines())} lines, {len(text)} chars]"

    if tool_name in KEEP_CHANGE_TOOLS:
        return None  # the tool_use line already describes the change

    if tool_name in KEEP_SUMMARY_TOOLS:
        head = text[:300]
        if len(text) > 300:
            head += "..."
        return f"> {head}"

    return None
|
|
|
|
|
|
def parse_content_blocks(
    content: list[dict[str, Any]],
    role: str,
    tool_id_to_name: dict[str, str],
) -> list[str]:
    """Convert a message's content blocks into transcript fragments.

    Thinking blocks and unknown block types are dropped; tool_use and
    tool_result blocks are delegated to the per-tool formatters.
    ``tool_id_to_name`` is updated in place so later tool_result blocks can
    be matched to the tool that produced them.

    Note: *role* is currently unused; it is kept for interface stability.
    """
    fragments: list[str] = []

    for block in content:
        kind = block.get("type")

        if kind == "text":
            text = block.get("text", "").strip()
            if not text:
                continue
            if "<system-reminder>" in text:
                # Strip injected system-reminder payloads entirely.
                text = re.sub(
                    r"<system-reminder>.*?</system-reminder>",
                    "",
                    text,
                    flags=re.DOTALL,
                ).strip()
            if text.startswith("<local-command"):
                continue  # local-command echo — noise
            if text:
                fragments.append(text)

        elif kind == "tool_use":
            tool_name = block.get("name", "unknown")
            # Remember the tool behind this ID for result correlation.
            tool_id_to_name[block.get("id", "")] = tool_name
            rendered = format_tool_use(tool_name, block.get("input", {}))
            if rendered:
                fragments.append(rendered)

        elif kind == "tool_result":
            matched_name = tool_id_to_name.get(block.get("tool_use_id", ""))
            rendered = format_tool_result(
                matched_name,
                block.get("content", ""),
                block.get("is_error", False),
            )
            if rendered:
                fragments.append(rendered)

        # "thinking" blocks (and anything else) are intentionally skipped.

    return fragments
|
|
|
|
|
|
def process_jsonl(
    jsonl_path: Path,
    byte_offset: int = 0,
) -> tuple[list[str], dict[str, Any]]:
    """Process a JSONL session file and return transcript lines + metadata.

    Reads the file in binary mode so byte offsets are exact, skipping any
    malformed JSON lines. Only "user"/"assistant" records contribute to the
    transcript; command invocations and large injected skill prompts are
    collapsed to one-line summaries rather than reproduced in full.

    Args:
        jsonl_path: Path to the JSONL file
        byte_offset: Start reading from this byte position (for incremental)

    Returns:
        Tuple of (transcript_lines, metadata_dict)
    """
    transcript_lines: list[str] = []
    metadata: dict[str, Any] = {
        "first_date": None,       # date of the first record seen (YYYY-MM-DD)
        "last_date": None,        # date of the most recent record seen
        "message_count": 0,       # user + assistant records processed
        "human_messages": 0,
        "assistant_messages": 0,
        "git_branch": None,       # first non-empty gitBranch encountered
        "new_byte_offset": 0,     # file position after this pass (for resume)
    }

    # Map tool_use IDs to tool names for correlating results
    tool_id_to_name: dict[str, str] = {}

    # Track when a command/skill was just invoked so the next user message
    # (the skill prompt injection) gets labeled correctly
    last_command_name: str | None = None

    # Binary mode: f.tell() must report a true byte offset for resume.
    with open(jsonl_path, "rb") as f:
        if byte_offset > 0:
            f.seek(byte_offset)

        for raw_line in f:
            try:
                obj = json.loads(raw_line)
            except json.JSONDecodeError:
                continue  # tolerate partial/corrupt lines (file may be mid-write)

            record_type = obj.get("type")

            # Skip non-message types
            if record_type not in ("user", "assistant"):
                continue

            msg = obj.get("message", {})
            role = msg.get("role", record_type)
            content = msg.get("content", "")

            # Track metadata
            date = extract_session_date(obj)
            if metadata["first_date"] is None:
                metadata["first_date"] = date
            metadata["last_date"] = date
            metadata["message_count"] += 1

            if not metadata["git_branch"]:
                metadata["git_branch"] = obj.get("gitBranch")

            if role == "user":
                metadata["human_messages"] += 1
            elif role == "assistant":
                metadata["assistant_messages"] += 1

            # Process content — plain-string messages and block-list messages
            # take different paths below.
            if isinstance(content, str):
                text = content.strip()
                # Skip system-reminder and local-command noise
                if "<system-reminder>" in text:
                    text = re.sub(
                        r"<system-reminder>.*?</system-reminder>",
                        "",
                        text,
                        flags=re.DOTALL,
                    ).strip()
                if text.startswith("<local-command"):
                    continue
                if text.startswith("<command-name>/exit"):
                    continue

                # Detect command/skill invocation: <command-name>/foo</command-name>
                cmd_match = re.search(
                    r"<command-name>/([^<]+)</command-name>", text,
                )
                if cmd_match:
                    last_command_name = cmd_match.group(1)
                    # Keep just a brief note about the command invocation
                    transcript_lines.append(
                        f"**Human**: /{last_command_name}"
                    )
                    transcript_lines.append("")
                    continue

                # Detect skill prompt injection (large structured text after a command)
                if (
                    last_command_name
                    and role == "user"
                    and len(text) > 500
                ):
                    # This is the skill's injected prompt — summarize it
                    transcript_lines.append(
                        f"[Skill prompt: /{last_command_name} — {len(text)} chars]"
                    )
                    transcript_lines.append("")
                    last_command_name = None
                    continue

                # Also detect skill prompts by content pattern (catches cases
                # where the command-name message wasn't separate, or where the
                # prompt arrives without a preceding command-name tag)
                if (
                    role == "user"
                    and len(text) > 500
                    and re.match(
                        r"^##\s*(Tracking|Step|Context|Instructions|Overview|Goal)",
                        text,
                    )
                ):
                    # Structured skill prompt — try to extract command name
                    cmd_in_text = re.search(
                        r'--command\s+"([^"]+)"', text,
                    )
                    prompt_label = cmd_in_text.group(1) if cmd_in_text else (last_command_name or "unknown")
                    transcript_lines.append(
                        f"[Skill prompt: /{prompt_label} — {len(text)} chars]"
                    )
                    transcript_lines.append("")
                    last_command_name = None
                    continue

                last_command_name = None  # Reset after non-matching message

                if text:
                    label = "**Human**" if role == "user" else "**Assistant**"
                    transcript_lines.append(f"{label}: {text}")
                    transcript_lines.append("")

            elif isinstance(content, list):
                # Check if this is a skill prompt in list form
                is_skill_prompt = False
                skill_prompt_name = last_command_name
                if role == "user":
                    for block in content:
                        if block.get("type") == "text":
                            block_text = block.get("text", "").strip()
                            # Detect by preceding command name
                            if last_command_name and len(block_text) > 500:
                                is_skill_prompt = True
                                break
                            # Detect by content pattern (## Tracking, etc.)
                            if (
                                len(block_text) > 500
                                and re.match(
                                    r"^##\s*(Tracking|Step|Context|Instructions|Overview|Goal)",
                                    block_text,
                                )
                            ):
                                is_skill_prompt = True
                                # Try to extract command name from content
                                cmd_in_text = re.search(
                                    r'--command\s+"([^"]+)"', block_text,
                                )
                                if cmd_in_text:
                                    skill_prompt_name = cmd_in_text.group(1)
                                break

                if is_skill_prompt:
                    # Summarize the total size of all text blocks instead of
                    # reproducing the injected prompt.
                    total_len = sum(
                        len(b.get("text", ""))
                        for b in content
                        if b.get("type") == "text"
                    )
                    label = skill_prompt_name or "unknown"
                    transcript_lines.append(
                        f"[Skill prompt: /{label} — {total_len} chars]"
                    )
                    transcript_lines.append("")
                    last_command_name = None
                    continue

                last_command_name = None

                parts = parse_content_blocks(content, role, tool_id_to_name)
                if parts:
                    # Determine if this is a tool result message (user role but
                    # contains only tool_result blocks — these are tool outputs,
                    # not human input)
                    has_only_tool_results = all(
                        b.get("type") in ("tool_result",)
                        for b in content
                        if b.get("type") != "text" or b.get("text", "").strip()
                    ) and any(b.get("type") == "tool_result" for b in content)

                    if has_only_tool_results:
                        # Tool results — no speaker label, just the formatted output
                        for part in parts:
                            transcript_lines.append(part)
                    elif role == "user":
                        # Check if there's actual human text (not just tool results)
                        has_human_text = any(
                            b.get("type") == "text"
                            and b.get("text", "").strip()
                            and "<system-reminder>" not in b.get("text", "")
                            for b in content
                        )
                        label = "**Human**" if has_human_text else "**Assistant**"
                        if len(parts) == 1:
                            transcript_lines.append(f"{label}: {parts[0]}")
                        else:
                            transcript_lines.append(f"{label}:")
                            for part in parts:
                                transcript_lines.append(part)
                    else:
                        label = "**Assistant**"
                        if len(parts) == 1:
                            transcript_lines.append(f"{label}: {parts[0]}")
                        else:
                            transcript_lines.append(f"{label}:")
                            for part in parts:
                                transcript_lines.append(part)
                    transcript_lines.append("")

        # Record how far we read so the next run can resume from here.
        metadata["new_byte_offset"] = f.tell()

    return transcript_lines, metadata
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Markdown generation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def build_frontmatter(
    session_id: str,
    project: str,
    date: str,
    message_count: int,
    git_branch: str | None = None,
) -> str:
    """Render the YAML frontmatter block for a conversation markdown file.

    ``git_branch`` is emitted only when truthy; ``status`` always starts as
    ``extracted``. The returned string has no trailing newline.
    """
    header = (
        "---\n"
        f"title: Session {session_id[:8]}\n"
        "type: conversation\n"
        f"project: {project}\n"
        f"date: {date}\n"
        f"session_id: {session_id}\n"
        f"messages: {message_count}\n"
        "status: extracted\n"
    )
    if git_branch:
        header += f"git_branch: {git_branch}\n"
    return header + "---"
|
|
|
|
|
|
def write_new_conversation(
    output_path: Path,
    session_id: str,
    project: str,
    transcript_lines: list[str],
    metadata: dict[str, Any],
) -> None:
    """Create a fresh conversation markdown file (frontmatter + transcript).

    Parent directories are created as needed. The file date comes from the
    session's first record, falling back to today (UTC).
    """
    first_date = metadata["first_date"] or datetime.now(timezone.utc).strftime("%Y-%m-%d")
    header = build_frontmatter(
        session_id=session_id,
        project=project,
        date=first_date,
        message_count=metadata["message_count"],
        git_branch=metadata.get("git_branch"),
    )

    output_path.parent.mkdir(parents=True, exist_ok=True)
    body = "\n".join(transcript_lines)
    output_path.write_text(f"{header}\n\n## Transcript\n\n{body}\n")
|
|
|
|
|
def append_to_conversation(
    output_path: Path,
    transcript_lines: list[str],
    new_message_count: int,
) -> None:
    """Append newly extracted transcript lines to an existing conversation.

    Also refreshes the ``messages`` count and stamps today's date as
    ``last_updated`` in the YAML frontmatter (inserted before the
    ``status: extracted`` line when the field is not present yet).
    """
    text = output_path.read_text()

    # Refresh the message count in frontmatter.
    text = re.sub(
        r"^messages: \d+$",
        f"messages: {new_message_count}",
        text,
        count=1,
        flags=re.MULTILINE,
    )

    # Stamp today's date, adding the last_updated field if needed.
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    if "last_updated:" in text:
        text = re.sub(
            r"^last_updated: .+$",
            f"last_updated: {today}",
            text,
            count=1,
            flags=re.MULTILINE,
        )
    else:
        text = text.replace(
            "\nstatus: extracted",
            f"\nlast_updated: {today}\nstatus: extracted",
        )

    # Rewrite the whole file: updated frontmatter + old body + new lines.
    if not text.endswith("\n"):
        text += "\n"
    output_path.write_text(text + "\n".join(transcript_lines) + "\n")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main extraction logic
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def extract_session(
    session_info: dict[str, Any],
    state: dict[str, Any],
    dry_run: bool = False,
) -> bool:
    """Extract a single session. Returns True if work was done.

    Compares the session file's size against the byte offset saved in
    *state* to decide between three outcomes: skip (no new bytes), append
    (prior extraction exists), or write a new conversation file. Mutates
    ``state["sessions"][session_id]`` in place; the caller persists state.

    Args:
        session_info: A record from discover_sessions (session_id, project,
            jsonl_path, file_size).
        state: The mining state dict (see load_state).
        dry_run: When True, only print what would happen — no files or
            state entries are written.
    """
    session_id = session_info["session_id"]
    project = session_info["project"]
    jsonl_path = session_info["jsonl_path"]
    file_size = session_info["file_size"]

    # Check state for prior extraction
    session_state = state["sessions"].get(session_id, {})
    last_offset = session_state.get("byte_offset", 0)

    # Skip if no new content
    if file_size <= last_offset:
        return False

    is_incremental = last_offset > 0

    if dry_run:
        mode = "append" if is_incremental else "new"
        new_bytes = file_size - last_offset
        print(f" [{mode}] {project}/{session_id[:8]} — {new_bytes:,} new bytes")
        return True

    # Parse the JSONL
    transcript_lines, metadata = process_jsonl(jsonl_path, byte_offset=last_offset)

    if not transcript_lines:
        # Update offset even if no extractable content, so the same bytes
        # are not re-scanned on the next run.
        state["sessions"][session_id] = {
            "project": project,
            "byte_offset": metadata["new_byte_offset"],
            "message_count": session_state.get("message_count", 0),
            "last_extracted": datetime.now(timezone.utc).isoformat(),
            "summarized_through_msg": session_state.get("summarized_through_msg", 0),
        }
        return False

    # Determine output path
    date = metadata["first_date"] or datetime.now(timezone.utc).strftime("%Y-%m-%d")
    if is_incremental:
        # Use existing output file (path stored relative to WIKI_DIR)
        output_file = session_state.get("output_file", "")
        output_path = WIKI_DIR / output_file if output_file else None
    else:
        output_path = None

    # Fall back to a fresh date-prefixed filename when there is no prior
    # output file, or the recorded one has gone missing.
    if output_path is None or not output_path.exists():
        filename = f"{date}-{session_id[:8]}.md"
        output_path = CONVERSATIONS_DIR / project / filename

    # Write or append
    total_messages = session_state.get("message_count", 0) + metadata["message_count"]

    if is_incremental and output_path.exists():
        append_to_conversation(output_path, transcript_lines, total_messages)
        print(f" [append] {project}/{output_path.name} — +{metadata['message_count']} messages")
    else:
        write_new_conversation(output_path, session_id, project, transcript_lines, metadata)
        print(f" [new] {project}/{output_path.name} — {metadata['message_count']} messages")

    # Update state (preserving the summarizer's progress marker)
    state["sessions"][session_id] = {
        "project": project,
        "output_file": str(output_path.relative_to(WIKI_DIR)),
        "byte_offset": metadata["new_byte_offset"],
        "message_count": total_messages,
        "last_extracted": datetime.now(timezone.utc).isoformat(),
        "summarized_through_msg": session_state.get("summarized_through_msg", 0),
    }

    return True
|
|
|
|
|
|
def main() -> None:
    """CLI entry point: discover sessions, extract new content, save state."""
    parser = argparse.ArgumentParser(
        description="Extract Claude Code sessions into markdown transcripts",
    )
    parser.add_argument(
        "--project",
        help="Only extract sessions for this project code (e.g., mc, if, lp)",
    )
    parser.add_argument(
        "--session",
        help="Only extract this specific session (prefix match on session ID)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be extracted without writing files",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Re-extract from the beginning, ignoring saved byte offsets",
    )
    opts = parser.parse_args()

    state = load_state()

    if opts.force:
        # Zero every saved offset so each session is re-read from the start.
        for entry in state["sessions"].values():
            entry["byte_offset"] = 0

    candidates = discover_sessions(
        project_filter=opts.project,
        session_filter=opts.session,
    )
    if not candidates:
        print("No sessions found matching filters.")
        return

    print(f"Found {len(candidates)} session(s) to check...")
    if opts.dry_run:
        print("DRY RUN — no files will be written\n")

    done = sum(
        1 for info in candidates
        if extract_session(info, state, dry_run=opts.dry_run)
    )

    if done == 0:
        print("No new content to extract.")
    else:
        print(f"\nExtracted {done} session(s).")

    # A dry run must leave the saved offsets untouched.
    if not opts.dry_run:
        save_state(state)


if __name__ == "__main__":
    main()
|