clawdie-ai/scripts/memory/embed-builtin-knowledge.py
Operator & Codex f31f406fec Update docs for DNS doctor and artifact refresh
---
Build: pass | Tests: pass — 2260 passed (671 files)
2026-05-09 16:48:36 +02:00

521 lines
17 KiB
Python

#!/usr/bin/env python3
"""
scripts/memory/embed-builtin-knowledge.py — Generate built-in knowledge bootstrap artifacts.
Discovers all project docs and skill definitions, chunks by markdown headings,
generates embeddings via OpenRouter (BAAI/bge-m3), and outputs a complete
artifact.sql ready for import into the skills database.
Usage:
OPENROUTER_API_KEY=sk-or-... python3 scripts/memory/embed-builtin-knowledge.py
OPENROUTER_API_KEY=sk-or-... python3 scripts/memory/embed-builtin-knowledge.py --dry-run
OPENROUTER_API_KEY=sk-or-... python3 scripts/memory/embed-builtin-knowledge.py \\
--output-sql bootstrap/skills-memory/artifact.sql \\
--output-metadata bootstrap/skills-memory/metadata.json
Output:
stdout by default — complete artifact.sql with all INSERT statements
--output-sql / --output-metadata for maintainer artifact refresh
Progress (stderr): Discovery and embedding progress.
"""
import argparse
import hashlib
import io
import json
import os
import re
import subprocess
import sys
import time
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
# This script lives in scripts/memory/, so the project root is three parents
# above the resolved file path.
PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
# ── Config ────────────────────────────────────────────────────────────────────
# Embeddings endpoint that embed_batch() POSTs to.
OPENROUTER_URL = "https://openrouter.ai/api/v1/embeddings"
# Model id sent in the request body; written lower-cased into the artifact.
MODEL = "BAAI/bge-m3"
# Recorded as embedding_dimensions in the artifact row and metadata.
DIMENSIONS = 1024
# Maximum number of characters per chunk produced by chunk_markdown().
CHUNK_SIZE = 900
# Default for --artifact-version.
ARTIFACT_VERSION = "v1.0.0-complete"
SCHEMA_VERSION = "builtin-knowledge-base-v1"
SOURCE_SNAPSHOT = "Full project docs, internal docs, identity files, and skill definitions"
CHUNKER_NAME = "heading-split"
CHUNKER_VERSION = "v2"
# Free-text description stored in the artifact metadata.
NOTES = (
    "Refreshable built-in knowledge artifact including all public docs, "
    "internal docs, identity files, and skill definitions. Pre-generated "
    "embeddings via OpenRouter baai/bge-m3 for hybrid search."
)
# Files to skip — outdated, test, or meta-only content.
# Matched against the bare file name, regardless of directory.
SKIP_FILES = {
    'nanoclaw-architecture-final.md',
    'WIZARD-SIMULATION-TEST.md',
    'DOCUMENTATION-SYNC-RUNBOOK.md',
}
# Doc patterns: (glob relative to PROJECT_ROOT, source_type)
DOC_PATTERNS = [
    ('docs/public/**/*.md', 'doc'),
    ('docs/internal/**/*.md', 'doc'),
    ('SOUL.md', 'identity'),
    ('IDENTITY.md', 'identity'),
    ('USER.md', 'identity'),
    ('AGENTS.md', 'identity'),
    ('MEMORY.md', 'identity'),
    ('CLAWDIE-ISO.md', 'identity'),
    ('.agent/skills/*/SKILL.md', 'skill'),
]
# Project-relative paths excluded from discovery. The entry ending in '/'
# names a directory whose contents are skipped; the others are glob patterns.
SKIP_GLOBS = (
    'docs/internal/sessions/',
    'docs/internal/BUILD-TEST-REPORT-*.md',
    'docs/internal/test-results.md',
)
def parse_args() -> argparse.Namespace:
    """Parse the script's command-line options from ``sys.argv``.

    Returns:
        Namespace exposing ``dry_run``, ``output_sql``, ``output_metadata``
        and ``artifact_version``.
    """
    cli = argparse.ArgumentParser(
        description="Generate built-in knowledge SQL + metadata artifacts."
    )
    # (flag, add_argument keyword arguments), registered in --help order.
    option_table = (
        ("--dry-run", {
            "action": "store_true",
            "help": "Discover and chunk sources without generating embeddings or writing files.",
        }),
        ("--output-sql", {
            "help": "Write artifact SQL to this path instead of stdout.",
        }),
        ("--output-metadata", {
            "help": "Write metadata JSON to this path.",
        }),
        ("--artifact-version", {
            "default": ARTIFACT_VERSION,
            "help": f"Artifact version label (default: {ARTIFACT_VERSION}).",
        }),
    )
    for flag, options in option_table:
        cli.add_argument(flag, **options)
    return cli.parse_args()
def atomic_write(target: str, content: str) -> None:
    """Write *content* to *target* so readers never observe a partial file.

    The text is first written to a hidden temporary file in the same
    directory as the target and then moved into place with ``os.replace``.
    Staging next to the target keeps both paths on one filesystem; a rename
    across filesystems would fail instead of being atomic.

    Args:
        target: Destination path. Missing parent directories are created.
        content: Text to write, encoded as UTF-8.

    Raises:
        OSError: If the directory, temporary file, or rename cannot be
            completed. The temporary file is removed before re-raising.
    """
    target_path = Path(target)
    target_path.parent.mkdir(parents=True, exist_ok=True)
    # The PID suffix keeps concurrent runs from clobbering each other's staging file.
    tmp_path = target_path.with_name(f".{target_path.name}.{os.getpid()}.tmp")
    try:
        tmp_path.write_text(content, encoding="utf-8")
        os.replace(tmp_path, target_path)
    finally:
        # After a successful replace the temp path no longer exists, so this
        # only removes leftovers from a failed write or rename.
        tmp_path.unlink(missing_ok=True)
# ── Chunking ──────────────────────────────────────────────────────────────────
def chunk_markdown(text: str, chunk_size: "int | None" = None) -> list[str]:
    """Split markdown into chunks of at most *chunk_size* characters.

    The text is first cut in front of every level 1-3 heading. A section that
    fits is kept whole. A longer section is re-packed paragraph by paragraph
    (paragraphs are separated by blank lines), and a single paragraph that is
    still too long is hard-split at the character limit.

    Args:
        text: Markdown source.
        chunk_size: Maximum characters per chunk. Defaults to ``CHUNK_SIZE``.

    Returns:
        Non-empty list of chunks in document order. If no section has any
        content, the single element is the first *chunk_size* characters of
        *text*.

    Raises:
        ValueError: If the effective chunk size is not positive (the
            hard-split loop could never make progress).
    """
    limit = CHUNK_SIZE if chunk_size is None else chunk_size
    if limit <= 0:
        raise ValueError(f"chunk_size must be positive, got {limit}")

    chunks: list[str] = []
    # The zero-width lookahead keeps each heading attached to its own section.
    for section in re.split(r'(?=\n#{1,3} )', text):
        section = section.strip()
        if not section:
            continue
        if len(section) <= limit:
            chunks.append(section)
            continue

        current = ''
        for para in re.split(r'\n\n+', section):
            para = para.strip()
            if not para:
                continue
            # +2 accounts for the blank-line separator re-inserted on merge.
            if len(current) + len(para) + 2 <= limit:
                # strip() matters: `current` may be the tail of a hard split
                # and start with whitespace.
                current = f'{current}\n\n{para}'.strip() if current else para
                continue
            if current:
                chunks.append(current)
            while len(para) > limit:
                chunks.append(para[:limit])
                para = para[limit:]
            current = para
        if current:
            chunks.append(current)
    return chunks or [text[:limit]]
# ── File discovery ────────────────────────────────────────────────────────────
def collect_files() -> list[tuple[Path, str]]:
    """Collect all embeddable files with their source_type.

    Every ``DOC_PATTERNS`` glob is expanded under ``PROJECT_ROOT``. A match is
    dropped when it is not a regular file, its name is in ``SKIP_FILES``, or
    its project-relative path is excluded by ``SKIP_GLOBS``. A ``SKIP_GLOBS``
    entry ending in ``/`` is treated as a directory prefix; any other entry is
    matched with ``PurePath.match``. Deriving both groups from the entries
    themselves means the tuple can be reordered or extended safely.

    Returns:
        ``(path, source_type)`` pairs, ordered by pattern and then by sorted
        path. A path matched by several patterns keeps the source_type of the
        first one.
    """
    skip_dir_prefixes = tuple(entry for entry in SKIP_GLOBS if entry.endswith('/'))
    skip_file_globs = tuple(entry for entry in SKIP_GLOBS if not entry.endswith('/'))

    files: list[tuple[Path, str]] = []
    seen: set[Path] = set()
    for pattern, source_type in DOC_PATTERNS:
        for path in sorted(PROJECT_ROOT.glob(pattern)):
            if path in seen or path.name in SKIP_FILES:
                continue
            # A directory named like "foo.md" also matches "*.md" but cannot be read.
            if not path.is_file():
                continue
            rel = path.relative_to(PROJECT_ROOT)
            # str.startswith accepts a tuple; an empty tuple never matches.
            if rel.as_posix().startswith(skip_dir_prefixes):
                continue
            if any(rel.match(skip_glob) for skip_glob in skip_file_globs):
                continue
            seen.add(path)
            files.append((path, source_type))
    return files
def derive_title(path: Path, text: str) -> str:
    """Return a display title for a document, capped at 120 characters.

    The first level-1 heading (``# ...``) found within the first ten lines of
    *text* is used. Otherwise the title is built from the file stem, with
    dashes and underscores turned into spaces and the result title-cased.
    """
    leading_lines = (raw.strip() for raw in text.splitlines()[:10])
    heading = next((ln for ln in leading_lines if ln.startswith('# ')), None)
    if heading is not None:
        return heading.lstrip('#').strip()[:120]
    stem_words = path.stem.replace('-', ' ').replace('_', ' ')
    return stem_words.title()[:120]
# ── Embeddings ────────────────────────────────────────────────────────────────
def embed_batch(texts: list[str]) -> list[list[float]]:
    """Embed a batch of texts via OpenRouter.

    Args:
        texts: Strings to embed. An empty list returns ``[]`` without any
            network request.

    Returns:
        One embedding per input text, ordered by the ``index`` field of the
        response items.

    Raises:
        SystemExit: If ``OPENROUTER_API_KEY`` is not set (exit status 1).
        RuntimeError: If the response has no ``data`` list, or the number of
            returned embeddings differs from ``len(texts)``.
        urllib.error.URLError: Propagated from the HTTP request.
    """
    if not texts:
        return []

    api_key = os.environ.get("OPENROUTER_API_KEY", "")
    if not api_key:
        print("ERROR: OPENROUTER_API_KEY not set", file=sys.stderr)
        sys.exit(1)

    payload = json.dumps({"input": texts, "model": MODEL}).encode()
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
    }
    req = urllib.request.Request(OPENROUTER_URL, data=payload, headers=headers)
    with urllib.request.urlopen(req, timeout=120) as resp:
        data = json.load(resp)

    # Fail with a readable message instead of a bare KeyError when the
    # response carries no embeddings.
    items = data.get("data") if isinstance(data, dict) else None
    if not isinstance(items, list):
        detail = data.get("error") if isinstance(data, dict) else data
        raise RuntimeError(f"Embedding API returned no data: {detail!r}")
    # A short response would otherwise be silently truncated by callers that
    # zip chunks with vectors.
    if len(items) != len(texts):
        raise RuntimeError(
            f"Embedding API returned {len(items)} embeddings for {len(texts)} inputs"
        )
    return [item["embedding"] for item in sorted(items, key=lambda item: item["index"])]
# ── SQL helpers ───────────────────────────────────────────────────────────────
def sql_escape(text: str) -> str:
    """Return *text* exactly as given; no characters are escaped.

    NOTE(review): despite its name this is an identity function. Callers must
    quote the value themselves (for example by dollar-quoting chunk text,
    which may contain single quotes) — confirm before using it on values that
    end up inside a single-quoted SQL literal.
    """
    # Pass-through: nothing is quoted or escaped here.
    return text
def format_vector(vec: list[float]) -> str:
    """Render *vec* as ``[v1,v2,...]`` with eight decimal places per component."""
    rendered = [format(component, ".8f") for component in vec]
    return "[%s]" % ",".join(rendered)
# ── Main ──────────────────────────────────────────────────────────────────────
def _sql_literal(value: str) -> str:
    """Return *value* as a single-quoted SQL string literal, doubling embedded quotes."""
    return "'" + value.replace("'", "''") + "'"


def _dollar_quote(text: str, tag: str = "chunk") -> str:
    """Return *text* as a dollar-quoted SQL string whose tag cannot occur inside it."""
    candidate = tag
    suffix = 0
    # "$tag" anywhere in the text could be read as (the start of) the closing
    # delimiter, so append a counter until the tag is collision-free.
    while f"${candidate}" in text:
        suffix += 1
        candidate = f"{tag}{suffix}"
    return f"${candidate}${text}${candidate}$"


def _content_hash(chunk_text: str) -> str:
    """Return the 32-hex-character SHA-256 prefix that identifies a chunk's text."""
    return hashlib.sha256(chunk_text.encode()).hexdigest()[:32]


def _git_short_commit() -> "str | None":
    """Return the short HEAD commit of PROJECT_ROOT, or None when unavailable."""
    try:
        result = subprocess.run(
            ['git', 'rev-parse', '--short', 'HEAD'],
            capture_output=True, text=True, cwd=PROJECT_ROOT,
        )
    except OSError:
        # git is not installed or cannot be executed.
        return None
    if result.returncode != 0:
        return None
    return result.stdout.strip() or None


def _chunk_documents(files: list) -> list:
    """Read and chunk every discovered file, skipping files that are empty."""
    docs: list = []
    for path, source_type in files:
        # as_posix() keeps stored source paths identical across platforms.
        rel = path.relative_to(PROJECT_ROOT).as_posix()
        text = path.read_text(encoding='utf-8', errors='replace').strip()
        if not text:
            print(f" skip (empty): {rel}", file=sys.stderr)
            continue
        chunks = chunk_markdown(text)
        docs.append({
            'source_path': rel,
            'source_type': source_type,
            'title': derive_title(path, text),
            'checksum': hashlib.sha256(text.encode()).hexdigest()[:16],
            'chunks': chunks,
        })
        print(f" {rel}: {len(chunks)} chunks", file=sys.stderr)
    return docs


def _embed_documents(all_docs: list) -> None:
    """Attach a 'vectors' list to every document; exit with status 1 on any failure."""
    print(f"-- Generating embeddings via {MODEL}...", file=sys.stderr)
    last_index = len(all_docs) - 1
    for i, doc in enumerate(all_docs):
        chunks = doc['chunks']
        try:
            vectors = embed_batch(chunks)
            # zip() during SQL rendering would silently drop unmatched chunks,
            # so reject incomplete or mis-sized results here.
            if len(vectors) != len(chunks):
                raise ValueError(f"expected {len(chunks)} embeddings, got {len(vectors)}")
            if any(len(vector) != DIMENSIONS for vector in vectors):
                raise ValueError(f"embedding dimension mismatch (expected {DIMENSIONS})")
        except Exception as e:
            print(f" ERROR embedding {doc['source_path']}: {e}", file=sys.stderr)
            sys.exit(1)
        doc['vectors'] = vectors
        print(f" [{i+1}/{len(all_docs)}] {doc['source_path']}: {len(vectors)} embeddings", file=sys.stderr)
        # Rate limit: ~2 requests/sec to avoid 429s
        if i < last_index:
            time.sleep(0.5)


def _render_sql(
    all_docs: list,
    artifact_version: str,
    generated_at_sql: str,
    git_commit: "str | None",
    metadata_json: str,
    total_chunks: int,
    embedding_count: int,
) -> str:
    """Render the complete artifact SQL (artifact row, documents, chunks, embeddings)."""
    sql_out = io.StringIO()

    def out(line: str = "") -> None:
        print(line, file=sql_out)

    version_sql = _sql_literal(artifact_version)
    git_commit_sql = _sql_literal(git_commit) if git_commit else 'NULL'
    model_sql = _sql_literal(MODEL.lower())

    out("-- Built-in knowledge bootstrap artifact")
    out("-- Imported into the agent-system skills database.")
    out()
    out("BEGIN;")
    out()
    out(f"""INSERT INTO builtin_knowledge_artifacts (
artifact_version,
schema_version,
source_snapshot,
chunker_name,
chunker_version,
chunk_size,
chunk_overlap,
embedding_provider,
embedding_model,
embedding_dimensions,
generated_at,
document_count,
chunk_count,
embedding_count,
git_commit,
metadata
)
VALUES (
{version_sql},
{_sql_literal(SCHEMA_VERSION)},
{_sql_literal(SOURCE_SNAPSHOT)},
{_sql_literal(CHUNKER_NAME)},
{_sql_literal(CHUNKER_VERSION)},
{CHUNK_SIZE},
0,
'openrouter',
{model_sql},
{DIMENSIONS},
TIMESTAMPTZ '{generated_at_sql}',
{len(all_docs)},
{total_chunks},
{embedding_count},
{git_commit_sql},
{_sql_literal(metadata_json)}::jsonb
)
ON CONFLICT (artifact_version) DO UPDATE
SET
schema_version = EXCLUDED.schema_version,
source_snapshot = EXCLUDED.source_snapshot,
chunker_name = EXCLUDED.chunker_name,
chunker_version = EXCLUDED.chunker_version,
chunk_size = EXCLUDED.chunk_size,
chunk_overlap = EXCLUDED.chunk_overlap,
embedding_provider = EXCLUDED.embedding_provider,
embedding_model = EXCLUDED.embedding_model,
embedding_dimensions = EXCLUDED.embedding_dimensions,
generated_at = EXCLUDED.generated_at,
document_count = EXCLUDED.document_count,
chunk_count = EXCLUDED.chunk_count,
embedding_count = EXCLUDED.embedding_count,
git_commit = EXCLUDED.git_commit,
metadata = EXCLUDED.metadata;""")
    out()

    # Documents
    out(f"-- {len(all_docs)} documents")
    # Joined outside the f-string: a backslash inside an f-string expression
    # is a syntax error before Python 3.12.
    doc_rows = ',\n'.join(
        f" ({_sql_literal(doc['source_path'])}, {_sql_literal(doc['source_type'])}, "
        f"{_sql_literal(doc['title'])}, 'en', {_sql_literal(doc['checksum'])}, '{{}}'::jsonb)"
        for doc in all_docs
    )
    out(f"""WITH artifact AS (
SELECT id
FROM builtin_knowledge_artifacts
WHERE artifact_version = {version_sql}
),
docs(source_path, source_type, title, locale, checksum, metadata_json) AS (
VALUES
{doc_rows}
)
INSERT INTO builtin_knowledge_documents (
artifact_id,
source_path,
source_type,
title,
locale,
checksum,
metadata
)
SELECT
artifact.id,
docs.source_path,
docs.source_type,
docs.title,
docs.locale,
docs.checksum,
docs.metadata_json
FROM artifact
CROSS JOIN docs
ON CONFLICT (artifact_id, source_path) DO UPDATE
SET
source_type = EXCLUDED.source_type,
title = EXCLUDED.title,
locale = EXCLUDED.locale,
checksum = EXCLUDED.checksum,
metadata = EXCLUDED.metadata;""")
    out()

    # Chunks
    out(f"-- {total_chunks} chunks")
    chunk_values = []
    for doc in all_docs:
        source_path_sql = _sql_literal(doc['source_path'])
        for order, chunk_text in enumerate(doc['chunks']):
            # Dollar-quoting avoids escaping the chunk body itself.
            chunk_values.append(
                f" ({source_path_sql}, {order}, {_dollar_quote(chunk_text)}, "
                f"'{_content_hash(chunk_text)}', '{{}}'::jsonb)"
            )
    chunk_rows = ',\n'.join(chunk_values)
    out(f"""WITH artifact AS (
SELECT id
FROM builtin_knowledge_artifacts
WHERE artifact_version = {version_sql}
),
docs AS (
SELECT id, source_path
FROM builtin_knowledge_documents
WHERE artifact_id = (SELECT id FROM artifact)
),
chunks(source_path, chunk_order, chunk_text, content_hash, metadata_json) AS (
VALUES
{chunk_rows}
)
INSERT INTO builtin_knowledge_chunks (
document_id,
chunk_order,
chunk_text,
content_hash,
metadata
)
SELECT
docs.id,
chunks.chunk_order,
chunks.chunk_text,
chunks.content_hash,
chunks.metadata_json
FROM chunks
JOIN docs ON docs.source_path = chunks.source_path
ON CONFLICT (document_id, chunk_order) DO UPDATE
SET
chunk_text = EXCLUDED.chunk_text,
content_hash = EXCLUDED.content_hash,
metadata = EXCLUDED.metadata;""")
    out()

    # Embeddings: one statement per chunk, located by its content hash.
    out(f"-- {embedding_count} embeddings")
    out()
    for doc in all_docs:
        for chunk_text, vector in zip(doc['chunks'], doc['vectors']):
            content_hash = _content_hash(chunk_text)
            vec_sql = format_vector(vector)
            out(f"""INSERT INTO builtin_knowledge_embeddings (chunk_id, embedding, embedding_provider, embedding_model)
SELECT c.id, '{vec_sql}'::vector, 'openrouter', {model_sql}
FROM builtin_knowledge_chunks c
WHERE c.content_hash = '{content_hash}'
ON CONFLICT (chunk_id) DO UPDATE
SET embedding = EXCLUDED.embedding,
embedding_provider = EXCLUDED.embedding_provider,
embedding_model = EXCLUDED.embedding_model;""")
            out()
    out("COMMIT;")
    return sql_out.getvalue()


def main():
    """Generate the built-in knowledge artifact.

    Discovers and chunks the source files, embeds every chunk unless
    ``--dry-run`` is given, then writes the SQL to ``--output-sql`` or stdout
    and, when requested, the metadata JSON to ``--output-metadata``. Progress
    and errors go to stderr. Exits with status 1 when nothing can be embedded
    or an embedding request fails, and with status 0 after a dry run.
    """
    args = parse_args()
    files = collect_files()
    print(f"-- Discovered {len(files)} source files", file=sys.stderr)
    if not files:
        print("ERROR: No files found", file=sys.stderr)
        sys.exit(1)

    # Phase 1: Chunk all files
    all_docs = _chunk_documents(files)
    total_chunks = sum(len(doc['chunks']) for doc in all_docs)
    print(f"-- Total: {len(all_docs)} docs, {total_chunks} chunks", file=sys.stderr)
    if args.dry_run:
        print(
            f"\nDRY RUN — would embed {total_chunks} chunks from {len(all_docs)} docs",
            file=sys.stderr,
        )
        sys.exit(0)
    if not all_docs:
        # Every discovered file was empty; an empty VALUES list is invalid SQL.
        print("ERROR: No non-empty documents found", file=sys.stderr)
        sys.exit(1)

    # Phase 2: Generate embeddings (batch per file, with rate limiting)
    _embed_documents(all_docs)

    # Phase 3: Output artifact.sql
    git_commit = _git_short_commit()
    # One timestamp for both outputs so the SQL row and metadata file agree.
    generated = datetime.now(timezone.utc)
    embedding_count = sum(len(doc['vectors']) for doc in all_docs)
    artifact_version = args.artifact_version

    # Artifact metadata
    metadata_json = json.dumps(
        {
            "notes": NOTES,
            "search_mode": "hybrid",
            "embedding_provider": "openrouter",
            "embedding_model": MODEL.lower(),
            "embedding_dimensions": DIMENSIONS,
        },
        indent=2,
    )
    metadata_contract = {
        "artifact_version": artifact_version,
        "schema_version": SCHEMA_VERSION,
        "source_snapshot": SOURCE_SNAPSHOT,
        "chunker_name": CHUNKER_NAME,
        "chunker_version": CHUNKER_VERSION,
        "chunk_size": CHUNK_SIZE,
        "chunk_overlap": 0,
        "embedding_provider": "openrouter",
        "embedding_model": MODEL.lower(),
        "embedding_dimensions": DIMENSIONS,
        "generated_at": generated.isoformat(timespec="seconds").replace("+00:00", "Z"),
        "document_count": len(all_docs),
        "chunk_count": total_chunks,
        "embedding_count": embedding_count,
        "git_commit": git_commit,
        "notes": NOTES,
    }

    sql_content = _render_sql(
        all_docs,
        artifact_version,
        generated.strftime('%Y-%m-%d %H:%M:%S+00'),
        git_commit,
        metadata_json,
        total_chunks,
        embedding_count,
    )
    if args.output_sql:
        atomic_write(args.output_sql, sql_content)
        print(f"-- Wrote SQL artifact: {args.output_sql}", file=sys.stderr)
    else:
        sys.stdout.write(sql_content)
    if args.output_metadata:
        atomic_write(
            args.output_metadata,
            json.dumps(metadata_contract, indent=2) + "\n",
        )
        print(f"-- Wrote metadata: {args.output_metadata}", file=sys.stderr)
    print(f"-- Generation complete: {len(all_docs)} docs, {total_chunks} chunks, {embedding_count} embeddings", file=sys.stderr)


if __name__ == "__main__":
    main()