#!/usr/bin/env python3
"""
scripts/memory/embed-builtin-knowledge.py — Generate built-in knowledge bootstrap artifacts.

Discovers all project docs and skill definitions, chunks by markdown headings,
generates embeddings via OpenRouter (BAAI/bge-m3), and outputs a complete
artifact.sql ready for import into the skills database.

Usage:
  OPENROUTER_API_KEY=sk-or-... python3 scripts/memory/embed-builtin-knowledge.py
  OPENROUTER_API_KEY=sk-or-... python3 scripts/memory/embed-builtin-knowledge.py --dry-run
  OPENROUTER_API_KEY=sk-or-... python3 scripts/memory/embed-builtin-knowledge.py \\
    --output-sql bootstrap/skills-memory/artifact.sql \\
    --output-metadata bootstrap/skills-memory/metadata.json

Output:
  stdout by default — complete artifact.sql with all INSERT statements
  --output-sql / --output-metadata for maintainer artifact refresh

Progress (stderr):
  Discovery and embedding progress.
"""

import argparse
import hashlib
import io
import json
import os
import re
import subprocess
import sys
import time
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent

# ── Config ────────────────────────────────────────────────────────────────────

OPENROUTER_URL = "https://openrouter.ai/api/v1/embeddings"
MODEL = "BAAI/bge-m3"
DIMENSIONS = 1024
CHUNK_SIZE = 900

ARTIFACT_VERSION = "v1.0.0-complete"
SCHEMA_VERSION = "builtin-knowledge-base-v1"
SOURCE_SNAPSHOT = "Full project docs, internal docs, identity files, and skill definitions"
CHUNKER_NAME = "heading-split"
CHUNKER_VERSION = "v2"
NOTES = (
    "Refreshable built-in knowledge artifact including all public docs, "
    "internal docs, identity files, and skill definitions. Pre-generated "
    "embeddings via OpenRouter baai/bge-m3 for hybrid search."
)

# Files to skip — outdated, test, or meta-only content
SKIP_FILES = {
    'nanoclaw-architecture-final.md',
    'WIZARD-SIMULATION-TEST.md',
    'DOCUMENTATION-SYNC-RUNBOOK.md',
}

# Doc patterns: (glob relative to PROJECT_ROOT, source_type)
DOC_PATTERNS = [
    ('docs/public/**/*.md', 'doc'),
    ('docs/internal/**/*.md', 'doc'),
    ('SOUL.md', 'identity'),
    ('IDENTITY.md', 'identity'),
    ('USER.md', 'identity'),
    ('AGENTS.md', 'identity'),
    ('MEMORY.md', 'identity'),
    ('CLAWDIE-ISO.md', 'identity'),
    ('.agent/skills/*/SKILL.md', 'skill'),
]

# Paths to exclude, relative to PROJECT_ROOT (POSIX separators). An entry
# ending in '/' is a directory prefix; any other entry is a glob matched with
# Path.match().
SKIP_GLOBS = (
    'docs/internal/sessions/',
    'docs/internal/BUILD-TEST-REPORT-*.md',
    'docs/internal/test-results.md',
)


def parse_args() -> argparse.Namespace:
    """Parse the command-line options of this script from sys.argv."""
    parser = argparse.ArgumentParser(
        description="Generate built-in knowledge SQL + metadata artifacts."
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Discover and chunk sources without generating embeddings or writing files.",
    )
    parser.add_argument(
        "--output-sql",
        help="Write artifact SQL to this path instead of stdout.",
    )
    parser.add_argument(
        "--output-metadata",
        help="Write metadata JSON to this path.",
    )
    parser.add_argument(
        "--artifact-version",
        default=ARTIFACT_VERSION,
        help=f"Artifact version label (default: {ARTIFACT_VERSION}).",
    )
    return parser.parse_args()


def atomic_write(target: str, content: str) -> None:
    """Write *content* to *target* as UTF-8 via a temp file plus rename.

    The temporary file is created in the target's own directory: os.replace()
    can fail when source and destination are on different filesystems, so the
    temp file must not live in an unrelated directory. The temp file is
    removed if writing or renaming fails.
    """
    target_path = Path(target)
    target_path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = target_path.with_name(f".{target_path.name}.{os.getpid()}.tmp")
    try:
        tmp_path.write_text(content, encoding="utf-8")
        os.replace(tmp_path, target_path)
    except BaseException:
        # Do not leave a partial temp file behind; re-raise the real error.
        tmp_path.unlink(missing_ok=True)
        raise


# ── Chunking ──────────────────────────────────────────────────────────────────

def _split_long_section(section: str, chunk_size: int) -> list[str]:
    """Pack the paragraphs of one oversized section into <= chunk_size pieces."""
    pieces: list[str] = []
    current = ''
    for para in re.split(r'\n\n+', section):
        para = para.strip()
        if not para:
            continue
        # +2 accounts for the blank-line separator re-inserted between paragraphs.
        if len(current) + len(para) + 2 <= chunk_size:
            current = f'{current}\n\n{para}'.strip() if current else para
            continue
        if current:
            pieces.append(current)
        # A single paragraph longer than the limit is hard-split by length.
        while len(para) > chunk_size:
            pieces.append(para[:chunk_size])
            para = para[chunk_size:]
        current = para
    if current:
        pieces.append(current)
    return pieces


def chunk_markdown(text: str, chunk_size: int = CHUNK_SIZE) -> list[str]:
    """Split markdown by headings, then further split long sections.

    Sections start at level 1-3 headings that begin a line. A section longer
    than *chunk_size* characters is split on blank lines, and any paragraph
    still too long is cut by length.

    Args:
        text: Markdown source.
        chunk_size: Maximum characters per chunk (default CHUNK_SIZE).

    Returns:
        The chunks in document order; never an empty list (text without any
        non-blank section yields one chunk holding the first chunk_size chars).

    Raises:
        ValueError: If chunk_size is not positive (it would never terminate).
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    chunks: list[str] = []
    for section in re.split(r'(?=\n#{1,3} )', text):
        section = section.strip()
        if not section:
            continue
        if len(section) <= chunk_size:
            chunks.append(section)
        else:
            chunks.extend(_split_long_section(section, chunk_size))
    return chunks or [text[:chunk_size]]


# ── File discovery ────────────────────────────────────────────────────────────

def _is_skipped(rel: Path) -> bool:
    """Return True when the project-relative path matches a SKIP_GLOBS entry."""
    rel_posix = rel.as_posix()
    for entry in SKIP_GLOBS:
        if entry.endswith('/'):
            if rel_posix.startswith(entry):
                return True
        elif rel.match(entry):
            return True
    return False


def collect_files() -> list[tuple[Path, str]]:
    """Collect all embeddable files with their source_type.

    Walks DOC_PATTERNS in order (sorted within each pattern), dropping names
    in SKIP_FILES, paths excluded by SKIP_GLOBS, and paths already collected
    by an earlier pattern.
    """
    files: list[tuple[Path, str]] = []
    seen: set[Path] = set()
    for pattern, source_type in DOC_PATTERNS:
        for path in sorted(PROJECT_ROOT.glob(pattern)):
            if path.name in SKIP_FILES:
                continue
            if _is_skipped(path.relative_to(PROJECT_ROOT)):
                continue
            if path in seen:
                continue
            seen.add(path)
            files.append((path, source_type))
    return files


def derive_title(path: Path, text: str) -> str:
    """Extract title from first heading or filename.

    Only the first 10 lines are searched for a '# ' heading; otherwise the
    file stem is title-cased. The result is capped at 120 characters.
    """
    for line in text.splitlines()[:10]:
        line = line.strip()
        if line.startswith('# '):
            return line.lstrip('#').strip()[:120]
    return path.stem.replace('-', ' ').replace('_', ' ').title()[:120]


# ── Embeddings ────────────────────────────────────────────────────────────────

def embed_batch(texts: list[str]) -> list[list[float]]:
    """Embed a batch of texts via OpenRouter.

    Args:
        texts: Chunk texts to embed in a single request.

    Returns:
        One vector per input text, in input order.

    Raises:
        RuntimeError: If the response has no "data" list, returns a different
            number of embeddings than inputs, or a vector whose length is not
            DIMENSIONS.

    Exits the process with status 1 when OPENROUTER_API_KEY is not set.
    """
    api_key = os.environ.get("OPENROUTER_API_KEY", "")
    if not api_key:
        print("ERROR: OPENROUTER_API_KEY not set", file=sys.stderr)
        sys.exit(1)
    if not texts:
        # Nothing to embed; avoid sending an empty request.
        return []

    body = {"input": texts, "model": MODEL}
    payload = json.dumps(body).encode()
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
    }
    req = urllib.request.Request(OPENROUTER_URL, data=payload, headers=headers)
    with urllib.request.urlopen(req, timeout=120) as resp:
        data = json.load(resp)

    items = data.get("data") if isinstance(data, dict) else None
    if not isinstance(items, list):
        raise RuntimeError(f"unexpected embeddings response: {str(data)[:200]}")

    vectors = [
        item["embedding"]
        for item in sorted(items, key=lambda item: item["index"])
    ]
    # Callers pair chunks with vectors positionally; a short response would
    # otherwise silently leave chunks without embeddings.
    if len(vectors) != len(texts):
        raise RuntimeError(
            f"expected {len(texts)} embeddings, got {len(vectors)}"
        )
    for vector in vectors:
        if len(vector) != DIMENSIONS:
            raise RuntimeError(
                f"expected {DIMENSIONS}-dimensional embedding, got {len(vector)}"
            )
    return vectors


# ── SQL helpers ───────────────────────────────────────────────────────────────

def sql_escape(text: str) -> str:
    """Escape text for use inside a single-quoted SQL string literal."""
    return text.replace("'", "''")


def sql_literal(value: Optional[str]) -> str:
    """Render *value* as a single-quoted SQL literal, or NULL for None."""
    if value is None:
        return "NULL"
    return f"'{sql_escape(value)}'"


def dollar_quote(text: str, tag: str = "chunk") -> str:
    """Wrap *text* in a dollar-quoted SQL string ($tag$...$tag$).

    If the delimiter would appear inside the quoted body (including a match
    straddling the end of the text and the closing delimiter), a numeric
    suffix is added to the tag until it no longer collides.
    """
    delimiter = f"${tag}$"
    suffix = 0
    # text + delimiter[:-1] also covers a match that overlaps the closing tag.
    while delimiter in text + delimiter[:-1]:
        suffix += 1
        delimiter = f"${tag}{suffix}$"
    return f"{delimiter}{text}{delimiter}"


def format_vector(vec: list[float]) -> str:
    """Format a vector as '[v1,v2,...]' with 8 decimal places per component."""
    return "[" + ",".join(f"{v:.8f}" for v in vec) + "]"


def render_artifact_sql(
    all_docs: list[dict],
    artifact_version: str,
    generated_at: str,
    git_commit: Optional[str],
) -> str:
    """Render the complete artifact.sql text for the given documents.

    Args:
        all_docs: Dicts with 'source_path', 'source_type', 'title',
            'checksum', 'chunks' and (optionally) 'vectors'.
        artifact_version: Version label stored on the artifact row.
        generated_at: Timestamp text placed in a TIMESTAMPTZ literal.
        git_commit: Short commit hash, or None to store NULL.

    Returns:
        One transaction (BEGIN … COMMIT) upserting the artifact, its
        documents, chunks and embeddings.
    """
    total_chunks = sum(len(doc['chunks']) for doc in all_docs)
    embedding_count = sum(len(doc.get('vectors', [])) for doc in all_docs)
    model_name = MODEL.lower()

    metadata_json = json.dumps(
        {
            "notes": NOTES,
            "search_mode": "hybrid",
            "embedding_provider": "openrouter",
            "embedding_model": model_name,
            "embedding_dimensions": DIMENSIONS,
        },
        indent=2,
    )

    # Every string value goes through sql_literal so that a quote in e.g. a
    # user-supplied --artifact-version cannot break out of its literal.
    version_sql = sql_literal(artifact_version)
    schema_sql = sql_literal(SCHEMA_VERSION)
    snapshot_sql = sql_literal(SOURCE_SNAPSHOT)
    chunker_name_sql = sql_literal(CHUNKER_NAME)
    chunker_version_sql = sql_literal(CHUNKER_VERSION)
    model_sql = sql_literal(model_name)
    generated_sql = sql_literal(generated_at)
    commit_sql = sql_literal(git_commit)
    metadata_sql = sql_literal(metadata_json)

    sql_out = io.StringIO()

    def out(line: str = "") -> None:
        print(line, file=sql_out)

    out("-- Built-in knowledge bootstrap artifact")
    out("-- Imported into the agent-system skills database.")
    out()
    out("BEGIN;")
    out()
    out(f"""INSERT INTO builtin_knowledge_artifacts (
  artifact_version, schema_version, source_snapshot,
  chunker_name, chunker_version, chunk_size, chunk_overlap,
  embedding_provider, embedding_model, embedding_dimensions,
  generated_at, document_count, chunk_count, embedding_count,
  git_commit, metadata
) VALUES (
  {version_sql},
  {schema_sql},
  {snapshot_sql},
  {chunker_name_sql},
  {chunker_version_sql},
  {CHUNK_SIZE},
  0,
  'openrouter',
  {model_sql},
  {DIMENSIONS},
  TIMESTAMPTZ {generated_sql},
  {len(all_docs)},
  {total_chunks},
  {embedding_count},
  {commit_sql},
  {metadata_sql}::jsonb
)
ON CONFLICT (artifact_version) DO UPDATE SET
  schema_version = EXCLUDED.schema_version,
  source_snapshot = EXCLUDED.source_snapshot,
  chunker_name = EXCLUDED.chunker_name,
  chunker_version = EXCLUDED.chunker_version,
  chunk_size = EXCLUDED.chunk_size,
  chunk_overlap = EXCLUDED.chunk_overlap,
  embedding_provider = EXCLUDED.embedding_provider,
  embedding_model = EXCLUDED.embedding_model,
  embedding_dimensions = EXCLUDED.embedding_dimensions,
  generated_at = EXCLUDED.generated_at,
  document_count = EXCLUDED.document_count,
  chunk_count = EXCLUDED.chunk_count,
  embedding_count = EXCLUDED.embedding_count,
  git_commit = EXCLUDED.git_commit,
  metadata = EXCLUDED.metadata;""")
    out()

    # Documents
    out(f"-- {len(all_docs)} documents")
    doc_values = [
        "    ("
        + ", ".join(
            [
                sql_literal(doc['source_path']),
                sql_literal(doc['source_type']),
                sql_literal(doc['title']),
                "'en'",
                sql_literal(doc['checksum']),
                "'{}'::jsonb",
            ]
        )
        + ")"
        for doc in all_docs
    ]
    # Joined outside the f-string: a backslash inside an f-string expression
    # is a SyntaxError before Python 3.12.
    doc_rows = ",\n".join(doc_values)
    out(f"""WITH artifact AS (
  SELECT id FROM builtin_knowledge_artifacts WHERE artifact_version = {version_sql}
),
docs(source_path, source_type, title, locale, checksum, metadata_json) AS (
  VALUES
{doc_rows}
)
INSERT INTO builtin_knowledge_documents (
  artifact_id, source_path, source_type, title, locale, checksum, metadata
)
SELECT
  artifact.id, docs.source_path, docs.source_type, docs.title,
  docs.locale, docs.checksum, docs.metadata_json
FROM artifact
CROSS JOIN docs
ON CONFLICT (artifact_id, source_path) DO UPDATE SET
  source_type = EXCLUDED.source_type,
  title = EXCLUDED.title,
  locale = EXCLUDED.locale,
  checksum = EXCLUDED.checksum,
  metadata = EXCLUDED.metadata;""")
    out()

    # Chunks
    out(f"-- {total_chunks} chunks")
    chunk_values = []
    for doc in all_docs:
        path_sql = sql_literal(doc['source_path'])
        for order, chunk_text in enumerate(doc['chunks']):
            content_hash = hashlib.sha256(chunk_text.encode()).hexdigest()[:32]
            # Dollar-quoting keeps quotes and backslashes in chunk text verbatim.
            chunk_values.append(
                f"    ({path_sql}, {order}, {dollar_quote(chunk_text)}, "
                f"'{content_hash}', '{{}}'::jsonb)"
            )
    chunk_rows = ",\n".join(chunk_values)
    out(f"""WITH artifact AS (
  SELECT id FROM builtin_knowledge_artifacts WHERE artifact_version = {version_sql}
),
docs AS (
  SELECT id, source_path FROM builtin_knowledge_documents
  WHERE artifact_id = (SELECT id FROM artifact)
),
chunks(source_path, chunk_order, chunk_text, content_hash, metadata_json) AS (
  VALUES
{chunk_rows}
)
INSERT INTO builtin_knowledge_chunks (
  document_id, chunk_order, chunk_text, content_hash, metadata
)
SELECT
  docs.id, chunks.chunk_order, chunks.chunk_text,
  chunks.content_hash, chunks.metadata_json
FROM chunks
JOIN docs ON docs.source_path = chunks.source_path
ON CONFLICT (document_id, chunk_order) DO UPDATE SET
  chunk_text = EXCLUDED.chunk_text,
  content_hash = EXCLUDED.content_hash,
  metadata = EXCLUDED.metadata;""")
    out()

    # Embeddings
    # NOTE(review): rows are matched on content_hash alone, so the statement
    # touches every chunk row carrying that hash, not only this artifact's —
    # confirm that is intended before scoping it more tightly.
    out(f"-- {embedding_count} embeddings")
    out()
    for doc in all_docs:
        for chunk_text, vector in zip(doc['chunks'], doc.get('vectors', [])):
            content_hash = hashlib.sha256(chunk_text.encode()).hexdigest()[:32]
            vec_sql = format_vector(vector)
            out(f"""INSERT INTO builtin_knowledge_embeddings (chunk_id, embedding, embedding_provider, embedding_model)
SELECT c.id, '{vec_sql}'::vector, 'openrouter', {model_sql}
FROM builtin_knowledge_chunks c
WHERE c.content_hash = '{content_hash}'
ON CONFLICT (chunk_id) DO UPDATE SET
  embedding = EXCLUDED.embedding,
  embedding_provider = EXCLUDED.embedding_provider,
  embedding_model = EXCLUDED.embedding_model;""")
            out()

    out("COMMIT;")
    return sql_out.getvalue()


# ── Main ──────────────────────────────────────────────────────────────────────

def _read_git_commit() -> Optional[str]:
    """Return the short HEAD commit of PROJECT_ROOT, or None if unavailable."""
    try:
        result = subprocess.run(
            ['git', 'rev-parse', '--short', 'HEAD'],
            capture_output=True,
            text=True,
            cwd=PROJECT_ROOT,
        )
    except OSError:
        # git is not installed or not executable; the commit is optional.
        return None
    return result.stdout.strip() or None


def _chunk_documents(files: list[tuple[Path, str]]) -> list[dict]:
    """Read and chunk every source file, skipping empty ones (progress on stderr)."""
    all_docs: list[dict] = []
    for path, source_type in files:
        # POSIX separators keep source_path stable across platforms.
        rel = path.relative_to(PROJECT_ROOT).as_posix()
        text = path.read_text(encoding='utf-8', errors='replace').strip()
        if not text:
            print(f"  skip (empty): {rel}", file=sys.stderr)
            continue
        chunks = chunk_markdown(text)
        all_docs.append({
            'source_path': rel,
            'source_type': source_type,
            'title': derive_title(path, text),
            'checksum': hashlib.sha256(text.encode()).hexdigest()[:16],
            'chunks': chunks,
        })
        print(f"  {rel}: {len(chunks)} chunks", file=sys.stderr)
    return all_docs


def _embed_documents(all_docs: list[dict]) -> None:
    """Attach 'vectors' to each doc, one request per doc; exit 1 on failure."""
    last_index = len(all_docs) - 1
    for i, doc in enumerate(all_docs):
        try:
            vectors = embed_batch(doc['chunks'])
        except Exception as e:
            # Top-level boundary: a partial artifact is useless, so stop here.
            print(f"  ERROR embedding {doc['source_path']}: {e}", file=sys.stderr)
            sys.exit(1)
        doc['vectors'] = vectors
        print(
            f"  [{i+1}/{len(all_docs)}] {doc['source_path']}: {len(vectors)} embeddings",
            file=sys.stderr,
        )
        # Rate limit: ~2 requests/sec to avoid 429s
        if i < last_index:
            time.sleep(0.5)


def main():
    """Discover, chunk and embed the sources, then emit SQL and metadata."""
    args = parse_args()

    files = collect_files()
    print(f"-- Discovered {len(files)} source files", file=sys.stderr)
    if not files:
        print("ERROR: No files found", file=sys.stderr)
        sys.exit(1)

    # Phase 1: Chunk all files
    all_docs = _chunk_documents(files)
    total_chunks = sum(len(doc['chunks']) for doc in all_docs)
    print(f"-- Total: {len(all_docs)} docs, {total_chunks} chunks", file=sys.stderr)

    if args.dry_run:
        print(
            f"\nDRY RUN — would embed {total_chunks} chunks from {len(all_docs)} docs",
            file=sys.stderr,
        )
        sys.exit(0)

    if not all_docs:
        # An empty VALUES list would make the generated SQL invalid.
        print("ERROR: All discovered files are empty", file=sys.stderr)
        sys.exit(1)

    # Phase 2: Generate embeddings (batch per file, with rate limiting)
    print(f"-- Generating embeddings via {MODEL}...", file=sys.stderr)
    _embed_documents(all_docs)

    # Phase 3: Output artifact.sql
    git_commit = _read_git_commit()
    # One clock reading, so the SQL row and the metadata file always agree.
    generated = datetime.now(timezone.utc)
    embedding_count = sum(len(doc.get('vectors', [])) for doc in all_docs)
    artifact_version = args.artifact_version

    sql_content = render_artifact_sql(
        all_docs,
        artifact_version=artifact_version,
        generated_at=generated.strftime('%Y-%m-%d %H:%M:%S+00'),
        git_commit=git_commit,
    )

    metadata_contract = {
        "artifact_version": artifact_version,
        "schema_version": SCHEMA_VERSION,
        "source_snapshot": SOURCE_SNAPSHOT,
        "chunker_name": CHUNKER_NAME,
        "chunker_version": CHUNKER_VERSION,
        "chunk_size": CHUNK_SIZE,
        "chunk_overlap": 0,
        "embedding_provider": "openrouter",
        "embedding_model": MODEL.lower(),
        "embedding_dimensions": DIMENSIONS,
        "generated_at": generated.isoformat(timespec="seconds").replace("+00:00", "Z"),
        "document_count": len(all_docs),
        "chunk_count": total_chunks,
        "embedding_count": embedding_count,
        "git_commit": git_commit,
        "notes": NOTES,
    }

    if args.output_sql:
        atomic_write(args.output_sql, sql_content)
        print(f"-- Wrote SQL artifact: {args.output_sql}", file=sys.stderr)
    else:
        sys.stdout.write(sql_content)

    if args.output_metadata:
        atomic_write(
            args.output_metadata,
            json.dumps(metadata_contract, indent=2) + "\n",
        )
        print(f"-- Wrote metadata: {args.output_metadata}", file=sys.stderr)

    print(
        f"-- Generation complete: {len(all_docs)} docs, {total_chunks} chunks, "
        f"{embedding_count} embeddings",
        file=sys.stderr,
    )


if __name__ == "__main__":
    main()