#!/usr/bin/env python3 """Extract Claude Code session JSONL files into clean markdown transcripts. Phase A of the conversation mining pipeline. Deterministic, no LLM dependency. Handles incremental extraction via byte offset tracking for sessions that span hours or days. Usage: python3 extract-sessions.py # Extract all new sessions python3 extract-sessions.py --project mc # Extract one project python3 extract-sessions.py --session 0a543572 # Extract specific session python3 extract-sessions.py --dry-run # Show what would be extracted """ from __future__ import annotations import argparse import json import os import re import sys from datetime import datetime, timezone from pathlib import Path from typing import Any # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- CLAUDE_PROJECTS_DIR = Path(os.environ.get("CLAUDE_PROJECTS_DIR", str(Path.home() / ".claude" / "projects"))) WIKI_DIR = Path(os.environ.get("WIKI_DIR", str(Path.home() / "projects" / "wiki"))) CONVERSATIONS_DIR = WIKI_DIR / "conversations" MINE_STATE_FILE = WIKI_DIR / ".mine-state.json" # ════════════════════════════════════════════════════════════════════════════ # CONFIGURE ME — Map Claude project directory suffixes to wiki project codes # ════════════════════════════════════════════════════════════════════════════ # # Claude Code stores sessions under ~/.claude/projects//. The # directory name is derived from the absolute path of your project, so it # looks like `-Users-alice-projects-myapp` or `-home-alice-projects-myapp`. # # This map tells the extractor which suffix maps to which short wiki code # (the "wing"). More specific suffixes should appear first — the extractor # picks the first match. Everything unmatched goes into `general/`. 
# # Examples — replace with your own projects: PROJECT_MAP: dict[str, str] = { # More specific suffixes first "projects-wiki": "wiki", # this wiki itself "-claude": "cl", # ~/.claude config repo # Add your real projects here: # "my-webapp": "web", # "my-mobile-app": "mob", # "work-mono-repo": "work", # Catch-all — Claude sessions outside any tracked project "-home": "general", "-Users": "general", } # Tool call names to keep full output for KEEP_FULL_OUTPUT_TOOLS = {"Bash", "Skill"} # Tool call names to summarize (just note what was accessed) SUMMARIZE_TOOLS = {"Read", "Glob", "Grep"} # Tool call names to keep with path + change summary KEEP_CHANGE_TOOLS = {"Edit", "Write"} # Tool call names to keep description + result summary KEEP_SUMMARY_TOOLS = {"Agent"} # Max lines of Bash output to keep MAX_BASH_OUTPUT_LINES = 200 # --------------------------------------------------------------------------- # State management # --------------------------------------------------------------------------- def load_state() -> dict[str, Any]: """Load mining state from .mine-state.json.""" if MINE_STATE_FILE.exists(): with open(MINE_STATE_FILE) as f: return json.load(f) return {"sessions": {}, "last_run": None} def save_state(state: dict[str, Any]) -> None: """Save mining state to .mine-state.json.""" state["last_run"] = datetime.now(timezone.utc).isoformat() with open(MINE_STATE_FILE, "w") as f: json.dump(state, f, indent=2) # --------------------------------------------------------------------------- # Project mapping # --------------------------------------------------------------------------- def resolve_project_code(dir_name: str) -> str | None: """Map a Claude project directory name to a wiki project code. 
Directory names look like: -Users-alice-projects-myapp or -home-alice-projects-myapp """ for suffix, code in PROJECT_MAP.items(): if dir_name.endswith(suffix): return code return None def discover_sessions( project_filter: str | None = None, session_filter: str | None = None, ) -> list[dict[str, Any]]: """Discover JSONL session files from Claude projects directory.""" sessions = [] if not CLAUDE_PROJECTS_DIR.exists(): print(f"Claude projects directory not found: {CLAUDE_PROJECTS_DIR}", file=sys.stderr) return sessions for proj_dir in sorted(CLAUDE_PROJECTS_DIR.iterdir()): if not proj_dir.is_dir(): continue code = resolve_project_code(proj_dir.name) if code is None: continue if project_filter and code != project_filter: continue for jsonl_file in sorted(proj_dir.glob("*.jsonl")): session_id = jsonl_file.stem if session_filter and not session_id.startswith(session_filter): continue sessions.append({ "session_id": session_id, "project": code, "jsonl_path": jsonl_file, "file_size": jsonl_file.stat().st_size, }) return sessions # --------------------------------------------------------------------------- # JSONL parsing and filtering # --------------------------------------------------------------------------- def extract_timestamp(obj: dict[str, Any]) -> str | None: """Get timestamp from a JSONL record.""" ts = obj.get("timestamp") if isinstance(ts, str): return ts if isinstance(ts, (int, float)): return datetime.fromtimestamp(ts / 1000, tz=timezone.utc).isoformat() return None def extract_session_date(obj: dict[str, Any]) -> str: """Get date string (YYYY-MM-DD) from a JSONL record timestamp.""" ts = extract_timestamp(obj) if ts: try: dt = datetime.fromisoformat(ts.replace("Z", "+00:00")) return dt.strftime("%Y-%m-%d") except (ValueError, TypeError): pass return datetime.now(timezone.utc).strftime("%Y-%m-%d") def truncate_lines(text: str, max_lines: int) -> str: """Truncate text to max_lines, adding a note if truncated.""" lines = text.splitlines() if len(lines) <= 
max_lines: return text kept = lines[:max_lines] omitted = len(lines) - max_lines kept.append(f"\n[... {omitted} lines truncated ...]") return "\n".join(kept) def format_tool_use(name: str, input_data: dict[str, Any]) -> str | None: """Format a tool_use content block for the transcript.""" if name in KEEP_FULL_OUTPUT_TOOLS: if name == "Bash": cmd = input_data.get("command", "") desc = input_data.get("description", "") label = desc if desc else cmd[:100] return f"**[Bash]**: `{label}`" if name == "Skill": skill = input_data.get("skill", "") args = input_data.get("args", "") return f"**[Skill]**: /{skill} {args}".strip() if name in SUMMARIZE_TOOLS: if name == "Read": fp = input_data.get("file_path", "?") return f"[Read: {fp}]" if name == "Glob": pattern = input_data.get("pattern", "?") return f"[Glob: {pattern}]" if name == "Grep": pattern = input_data.get("pattern", "?") path = input_data.get("path", "") return f"[Grep: '{pattern}' in {path}]" if path else f"[Grep: '{pattern}']" if name in KEEP_CHANGE_TOOLS: if name == "Edit": fp = input_data.get("file_path", "?") old = input_data.get("old_string", "")[:60] return f"**[Edit]**: {fp} — replaced '{old}...'" if name == "Write": fp = input_data.get("file_path", "?") content_len = len(input_data.get("content", "")) return f"**[Write]**: {fp} ({content_len} chars)" if name in KEEP_SUMMARY_TOOLS: if name == "Agent": desc = input_data.get("description", "?") return f"**[Agent]**: {desc}" if name == "ToolSearch": return None # noise if name == "TaskCreate": subj = input_data.get("subject", "?") return f"[TaskCreate: {subj}]" if name == "TaskUpdate": tid = input_data.get("taskId", "?") status = input_data.get("status", "?") return f"[TaskUpdate: #{tid} → {status}]" # Default: note the tool was called return f"[{name}]" def format_tool_result( tool_name: str | None, content: Any, is_error: bool = False, ) -> str | None: """Format a tool_result content block for the transcript.""" text = "" if isinstance(content, str): text = 
content elif isinstance(content, list): parts = [] for item in content: if isinstance(item, dict) and item.get("type") == "text": parts.append(item.get("text", "")) text = "\n".join(parts) if not text.strip(): return None if is_error: return f"**[ERROR]**:\n```\n{truncate_lines(text, MAX_BASH_OUTPUT_LINES)}\n```" if tool_name in KEEP_FULL_OUTPUT_TOOLS: return f"```\n{truncate_lines(text, MAX_BASH_OUTPUT_LINES)}\n```" if tool_name in SUMMARIZE_TOOLS: # Just note the result size line_count = len(text.splitlines()) char_count = len(text) return f"[→ {line_count} lines, {char_count} chars]" if tool_name in KEEP_CHANGE_TOOLS: return None # The tool_use already captured what changed if tool_name in KEEP_SUMMARY_TOOLS: # Keep a summary of agent results summary = text[:300] if len(text) > 300: summary += "..." return f"> {summary}" return None def parse_content_blocks( content: list[dict[str, Any]], role: str, tool_id_to_name: dict[str, str], ) -> list[str]: """Parse content blocks from a message into transcript lines.""" parts: list[str] = [] for block in content: block_type = block.get("type") if block_type == "text": text = block.get("text", "").strip() if not text: continue # Skip system-reminder content if "" in text: # Strip system reminder tags and their content text = re.sub( r".*?", "", text, flags=re.DOTALL, ).strip() # Skip local-command noise if text.startswith(" tuple[list[str], dict[str, Any]]: """Process a JSONL session file and return transcript lines + metadata. 
Args: jsonl_path: Path to the JSONL file byte_offset: Start reading from this byte position (for incremental) Returns: Tuple of (transcript_lines, metadata_dict) """ transcript_lines: list[str] = [] metadata: dict[str, Any] = { "first_date": None, "last_date": None, "message_count": 0, "human_messages": 0, "assistant_messages": 0, "git_branch": None, "new_byte_offset": 0, } # Map tool_use IDs to tool names for correlating results tool_id_to_name: dict[str, str] = {} # Track when a command/skill was just invoked so the next user message # (the skill prompt injection) gets labeled correctly last_command_name: str | None = None with open(jsonl_path, "rb") as f: if byte_offset > 0: f.seek(byte_offset) for raw_line in f: try: obj = json.loads(raw_line) except json.JSONDecodeError: continue record_type = obj.get("type") # Skip non-message types if record_type not in ("user", "assistant"): continue msg = obj.get("message", {}) role = msg.get("role", record_type) content = msg.get("content", "") # Track metadata date = extract_session_date(obj) if metadata["first_date"] is None: metadata["first_date"] = date metadata["last_date"] = date metadata["message_count"] += 1 if not metadata["git_branch"]: metadata["git_branch"] = obj.get("gitBranch") if role == "user": metadata["human_messages"] += 1 elif role == "assistant": metadata["assistant_messages"] += 1 # Process content if isinstance(content, str): text = content.strip() # Skip system-reminder and local-command noise if "" in text: text = re.sub( r".*?", "", text, flags=re.DOTALL, ).strip() if text.startswith("/exit"): continue # Detect command/skill invocation: /foo cmd_match = re.search( r"/([^<]+)", text, ) if cmd_match: last_command_name = cmd_match.group(1) # Keep just a brief note about the command invocation transcript_lines.append( f"**Human**: /{last_command_name}" ) transcript_lines.append("") continue # Detect skill prompt injection (large structured text after a command) if ( last_command_name and role == 
"user" and len(text) > 500 ): # This is the skill's injected prompt — summarize it transcript_lines.append( f"[Skill prompt: /{last_command_name} — {len(text)} chars]" ) transcript_lines.append("") last_command_name = None continue # Also detect skill prompts by content pattern (catches cases # where the command-name message wasn't separate, or where the # prompt arrives without a preceding command-name tag) if ( role == "user" and len(text) > 500 and re.match( r"^##\s*(Tracking|Step|Context|Instructions|Overview|Goal)", text, ) ): # Structured skill prompt — try to extract command name cmd_in_text = re.search( r'--command\s+"([^"]+)"', text, ) prompt_label = cmd_in_text.group(1) if cmd_in_text else (last_command_name or "unknown") transcript_lines.append( f"[Skill prompt: /{prompt_label} — {len(text)} chars]" ) transcript_lines.append("") last_command_name = None continue last_command_name = None # Reset after non-matching message if text: label = "**Human**" if role == "user" else "**Assistant**" transcript_lines.append(f"{label}: {text}") transcript_lines.append("") elif isinstance(content, list): # Check if this is a skill prompt in list form is_skill_prompt = False skill_prompt_name = last_command_name if role == "user": for block in content: if block.get("type") == "text": block_text = block.get("text", "").strip() # Detect by preceding command name if last_command_name and len(block_text) > 500: is_skill_prompt = True break # Detect by content pattern (## Tracking, etc.) 
if ( len(block_text) > 500 and re.match( r"^##\s*(Tracking|Step|Context|Instructions|Overview|Goal)", block_text, ) ): is_skill_prompt = True # Try to extract command name from content cmd_in_text = re.search( r'--command\s+"([^"]+)"', block_text, ) if cmd_in_text: skill_prompt_name = cmd_in_text.group(1) break if is_skill_prompt: total_len = sum( len(b.get("text", "")) for b in content if b.get("type") == "text" ) label = skill_prompt_name or "unknown" transcript_lines.append( f"[Skill prompt: /{label} — {total_len} chars]" ) transcript_lines.append("") last_command_name = None continue last_command_name = None parts = parse_content_blocks(content, role, tool_id_to_name) if parts: # Determine if this is a tool result message (user role but # contains only tool_result blocks — these are tool outputs, # not human input) has_only_tool_results = all( b.get("type") in ("tool_result",) for b in content if b.get("type") != "text" or b.get("text", "").strip() ) and any(b.get("type") == "tool_result" for b in content) if has_only_tool_results: # Tool results — no speaker label, just the formatted output for part in parts: transcript_lines.append(part) elif role == "user": # Check if there's actual human text (not just tool results) has_human_text = any( b.get("type") == "text" and b.get("text", "").strip() and "" not in b.get("text", "") for b in content ) label = "**Human**" if has_human_text else "**Assistant**" if len(parts) == 1: transcript_lines.append(f"{label}: {parts[0]}") else: transcript_lines.append(f"{label}:") for part in parts: transcript_lines.append(part) else: label = "**Assistant**" if len(parts) == 1: transcript_lines.append(f"{label}: {parts[0]}") else: transcript_lines.append(f"{label}:") for part in parts: transcript_lines.append(part) transcript_lines.append("") metadata["new_byte_offset"] = f.tell() return transcript_lines, metadata # --------------------------------------------------------------------------- # Markdown generation # 
# ---------------------------------------------------------------------------


def build_frontmatter(
    session_id: str,
    project: str,
    date: str,
    message_count: int,
    git_branch: str | None = None,
) -> str:
    """Build YAML frontmatter for a conversation markdown file.

    Args:
        session_id: Full session UUID; the title uses its first 8 chars.
        project: Wiki project code (the "wing").
        date: YYYY-MM-DD date of the session's first message.
        message_count: Total messages extracted so far.
        git_branch: Branch recorded in the session, if any (optional line).

    Returns:
        The frontmatter text including the closing "---" line, no trailing
        newline.
    """
    lines = [
        "---",
        f"title: Session {session_id[:8]}",
        "type: conversation",
        f"project: {project}",
        f"date: {date}",
        f"session_id: {session_id}",
        f"messages: {message_count}",
        "status: extracted",
    ]
    if git_branch:
        lines.append(f"git_branch: {git_branch}")
    lines.append("---")
    return "\n".join(lines)


def write_new_conversation(
    output_path: Path,
    session_id: str,
    project: str,
    transcript_lines: list[str],
    metadata: dict[str, Any],
) -> None:
    """Write a new conversation markdown file.

    Creates parent directories as needed; overwrites any existing file at
    output_path.
    """
    # Fall back to today (UTC) when the session had no usable timestamps.
    date = metadata["first_date"] or datetime.now(timezone.utc).strftime("%Y-%m-%d")
    frontmatter = build_frontmatter(
        session_id=session_id,
        project=project,
        date=date,
        message_count=metadata["message_count"],
        git_branch=metadata.get("git_branch"),
    )
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, "w") as f:
        f.write(frontmatter)
        f.write("\n\n## Transcript\n\n")
        f.write("\n".join(transcript_lines))
        f.write("\n")


def append_to_conversation(
    output_path: Path,
    transcript_lines: list[str],
    new_message_count: int,
) -> None:
    """Append new transcript content to an existing conversation file.

    Updates the message count in frontmatter and appends new transcript
    lines.
    """
    content = output_path.read_text()
    # Update message count in frontmatter
    content = re.sub(
        r"^messages: \d+$",
        f"messages: {new_message_count}",
        content,
        count=1,
        flags=re.MULTILINE,
    )
    # Add last_updated
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    if "last_updated:" in content:
        content = re.sub(
            r"^last_updated: .+$",
            f"last_updated: {today}",
            content,
            count=1,
            flags=re.MULTILINE,
        )
    else:
        # First append: inject last_updated just above the status line.
        content = content.replace(
            "\nstatus: extracted",
            f"\nlast_updated: {today}\nstatus: extracted",
        )
    # Append new transcript (rewrite the whole file so the updated
    # frontmatter is persisted too).
    with open(output_path, "w") as f:
        f.write(content)
        if not content.endswith("\n"):
            f.write("\n")
        f.write("\n".join(transcript_lines))
        f.write("\n")


# ---------------------------------------------------------------------------
# Main extraction logic
# ---------------------------------------------------------------------------


def extract_session(
    session_info: dict[str, Any],
    state: dict[str, Any],
    dry_run: bool = False,
) -> bool:
    """Extract a single session. Returns True if work was done.

    Mutates state["sessions"][session_id] with the new byte offset, output
    file, and message counts (except in dry_run mode, which only reports).
    """
    session_id = session_info["session_id"]
    project = session_info["project"]
    jsonl_path = session_info["jsonl_path"]
    file_size = session_info["file_size"]
    # Check state for prior extraction
    session_state = state["sessions"].get(session_id, {})
    last_offset = session_state.get("byte_offset", 0)
    # Skip if no new content
    if file_size <= last_offset:
        return False
    is_incremental = last_offset > 0
    if dry_run:
        mode = "append" if is_incremental else "new"
        new_bytes = file_size - last_offset
        print(f" [{mode}] {project}/{session_id[:8]} — {new_bytes:,} new bytes")
        return True
    # Parse the JSONL
    transcript_lines, metadata = process_jsonl(jsonl_path, byte_offset=last_offset)
    if not transcript_lines:
        # Update offset even if no extractable content, so the next run
        # does not re-read the same noise-only bytes.
        state["sessions"][session_id] = {
            "project": project,
            "byte_offset": metadata["new_byte_offset"],
            "message_count": session_state.get("message_count", 0),
            "last_extracted": datetime.now(timezone.utc).isoformat(),
            "summarized_through_msg": session_state.get("summarized_through_msg", 0),
        }
        return False
    # Determine output path
    date = metadata["first_date"] or datetime.now(timezone.utc).strftime("%Y-%m-%d")
    if is_incremental:
        # Use existing output file
        output_file = session_state.get("output_file", "")
        output_path = WIKI_DIR / output_file if output_file else None
    else:
        output_path = None
    if output_path is None or not output_path.exists():
        filename = f"{date}-{session_id[:8]}.md"
        output_path = CONVERSATIONS_DIR / project / filename
    # Write or append
    total_messages = session_state.get("message_count", 0) + metadata["message_count"]
    if is_incremental and output_path.exists():
        append_to_conversation(output_path, transcript_lines, total_messages)
        print(f" [append] {project}/{output_path.name} — +{metadata['message_count']} messages")
    else:
        write_new_conversation(output_path, session_id, project, transcript_lines, metadata)
        print(f" [new] {project}/{output_path.name} — {metadata['message_count']} messages")
    # Update state
    state["sessions"][session_id] = {
        "project": project,
        "output_file": str(output_path.relative_to(WIKI_DIR)),
        "byte_offset": metadata["new_byte_offset"],
        "message_count": total_messages,
        "last_extracted": datetime.now(timezone.utc).isoformat(),
        "summarized_through_msg": session_state.get("summarized_through_msg", 0),
    }
    return True


def main() -> None:
    """CLI entry point: discover sessions, extract new content, save state."""
    parser = argparse.ArgumentParser(
        description="Extract Claude Code sessions into markdown transcripts",
    )
    parser.add_argument(
        "--project",
        help="Only extract sessions for this project code (e.g., mc, if, lp)",
    )
    parser.add_argument(
        "--session",
        help="Only extract this specific session (prefix match on session ID)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be extracted without writing files",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Re-extract from the beginning, ignoring saved byte offsets",
    )
    args = parser.parse_args()
    state = load_state()
    if args.force:
        # Reset all byte offsets
        for sid in state["sessions"]:
            state["sessions"][sid]["byte_offset"] = 0
    # Discover sessions
    sessions = discover_sessions(
        project_filter=args.project,
        session_filter=args.session,
    )
    if not sessions:
        print("No sessions found matching filters.")
        return
    print(f"Found {len(sessions)} session(s) to check...")
    if args.dry_run:
        print("DRY RUN — no files will be written\n")
    extracted = 0
    for session_info in sessions:
        if extract_session(session_info, state, dry_run=args.dry_run):
            extracted += 1
    if extracted == 0:
        print("No new content to extract.")
    else:
        print(f"\nExtracted {extracted} session(s).")
    # Dry runs must not persist the (possibly --force-reset) offsets.
    if not args.dry_run:
        save_state(state)


if __name__ == "__main__":
    main()