clawdie-ai/scripts/memory/embed-docs.py

287 lines
11 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
scripts/memory/embed-docs.py Embed project docs into {agent}_brain.
Reads markdown files, splits by heading sections, embeds via bge-m3,
and inserts into the memories/memory_chunks/memory_embeddings tables.
Usage:
python3 scripts/memory/embed-docs.py # embed all docs
python3 scripts/memory/embed-docs.py --dry-run # show what would be embedded
python3 scripts/memory/embed-docs.py --force # re-embed even if already present
2026-04-06 13:22:24 +00:00
session_id pattern: doc:<relative-path> e.g. doc:docs/public/install/install.md
"""
import hashlib
import json
import os
import re
import subprocess
import sys
from pathlib import Path
from typing import Optional
# ── Config ─────────────────────────────────────────────────────────────────────
# Three directory levels above this file's resolved path. With the script at
# scripts/memory/embed-docs.py (see the module docstring) that is the
# repository root.
PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
def load_env(path: Path) -> dict[str, str]:
    """Parse a dotenv-style file into a dict.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored. Keys and values are whitespace-trimmed, an optional leading
    ``export `` on the key is dropped, and ONE pair of matching surrounding
    quotes (single or double) is removed from the value.

    Args:
        path: Location of the env file.

    Returns:
        Mapping of variable name to value; empty when *path* is not an
        existing regular file.
    """
    env: dict[str, str] = {}
    if not path.is_file():
        return env
    # Decode explicitly as UTF-8 so parsing does not depend on the locale.
    for raw_line in path.read_text(encoding='utf-8').splitlines():
        line = raw_line.strip()
        if not line or line.startswith('#') or '=' not in line:
            continue
        key, _, value = line.partition('=')
        key = key.strip()
        if key.startswith('export '):
            key = key[len('export '):].strip()
        value = value.strip()
        # Only strip a balanced pair of quotes: stripping any leading/trailing
        # quote characters would corrupt values that legitimately end in one.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in '"\'':
            value = value[1:-1]
        env[key] = value
    return env
# All settings below are read from the project's .env file only; the process
# environment (os.environ) is not consulted here.
ENV = load_env(PROJECT_ROOT / '.env')
TENANT_ID = ENV.get('TENANT_ID', 'clawdie')
DB_HOST = ENV.get('WARDEN_DB_IP', '10.0.1.3')
DB_PORT = ENV.get('DB_PORT', '5432')
# `or` instead of a .get() default: an empty value in .env also falls back to
# "<TENANT_ID>_brain".
DB_NAME = ENV.get('MEMORY_DB_NAME') or f'{TENANT_ID}_brain'
DB_USER = ENV.get('MEMORY_DB_USER') or f'{TENANT_ID}_brain'
DB_PASSWORD = ENV.get('MEMORY_DB_PASSWORD', '')
EMBED_URL = ENV.get('EMBED_BASE_URL', 'http://10.0.1.5:8080/v1')
EMBED_MODEL = ENV.get('EMBED_MODEL', 'bge-m3')
# Raises ValueError at import time if EMBED_DIMENSIONS is not an integer.
EMBED_DIMS = int(ENV.get('EMBED_DIMENSIONS', '1024'))
# EMBED_API_KEY wins; OPENAI_API_KEY is used when it is missing or empty.
EMBED_KEY = ENV.get('EMBED_API_KEY', '') or ENV.get('OPENAI_API_KEY', '')
# Files to skip — outdated, test, or meta-only content.
# Matched against the bare file name (not the path) in collect_files().
SKIP_FILES = {
    'nanoclaw-architecture-final.md', # old NanoClaw predecessor, pre-FreeBSD
    'WIZARD-SIMULATION-TEST.md', # test fixture, not knowledge
    'DOCUMENTATION-SYNC-RUNBOOK.md', # meta-docs about managing docs
}
# Docs to embed: docs/public + docs/internal + root identity files
DOC_PATTERNS = [
    ('docs/public/**/*.md', 3), # (glob relative to project root, importance)
    ('docs/internal/**/*.md', 2),
    ('SOUL.md', 5),
    ('IDENTITY.md', 5),
    ('USER.md', 5),
    ('AGENTS.md', 4),
    ('MEMORY.md', 4),
    ('CLAWDIE-ISO.md', 3),
]
MAX_CHUNK = 900 # chars — larger than session memories for doc retrieval
# ── Chunking ───────────────────────────────────────────────────────────────────
def chunk_markdown(text: str, max_chunk: Optional[int] = None) -> list[str]:
    """Split markdown into chunks of at most *max_chunk* characters.

    The text is first split in front of every heading line of level 1-3
    (``#``, ``##``, ``###``). A section that fits is kept whole. A longer
    section is packed paragraph by paragraph (paragraphs are separated by
    blank lines), and a single paragraph that is still too long is cut into
    fixed-size slices.

    Args:
        text: Markdown source.
        max_chunk: Upper bound on chunk length in characters. ``None``
            (the default) uses the module-level ``MAX_CHUNK``.

    Returns:
        The chunks in document order. If the text yields no non-blank
        section, a single chunk holding the first *max_chunk* characters of
        *text* is returned, so the result is never empty.

    Raises:
        ValueError: If the effective limit is not positive (the hard-split
            loop could never make progress).
    """
    limit = MAX_CHUNK if max_chunk is None else max_chunk
    if limit <= 0:
        raise ValueError(f'max_chunk must be positive, got {limit}')

    chunks: list[str] = []
    # Zero-width lookahead: split *before* the newline that precedes a
    # heading, so the heading stays attached to the section it introduces.
    for section in re.split(r'(?=\n#{1,3} )', text):
        section = section.strip()
        if not section:
            continue
        if len(section) <= limit:
            chunks.append(section)
            continue

        # Section too long: pack whole paragraphs greedily.
        current = ''
        for para in re.split(r'\n\n+', section):
            para = para.strip()
            if not para:
                continue
            # +2 accounts for the blank-line separator re-inserted on join.
            if len(current) + len(para) + 2 <= limit:
                current = f'{current}\n\n{para}' if current else para
                continue
            if current:
                chunks.append(current)
            # A single paragraph that is still too long is hard-split.
            while len(para) > limit:
                chunks.append(para[:limit])
                para = para[limit:]
            current = para
        if current:
            chunks.append(current)
    return chunks or [text[:limit]]
# ── Embeddings ─────────────────────────────────────────────────────────────────
import urllib.request
def embed(texts: list[str]) -> list[list[float]]:
    """Embed a batch of texts with one call to the embeddings endpoint.

    Args:
        texts: Strings to embed; all of them are sent in a single request.

    Returns:
        One embedding per input text, ordered by the ``index`` field of the
        response items. An empty *texts* returns ``[]`` without contacting
        the server.

    Raises:
        RuntimeError: If the response does not contain exactly one embedding
            per input text.
        Exception: Network, HTTP and JSON errors from ``urllib`` / ``json``
            propagate unchanged.
    """
    if not texts:
        return []

    body: dict = {'input': texts, 'model': EMBED_MODEL}
    if 'openai.com' in EMBED_URL:  # OpenAI supports dimension truncation; others may not
        body['dimensions'] = EMBED_DIMS
    headers = {'Content-Type': 'application/json'}
    if EMBED_KEY:
        headers['Authorization'] = f'Bearer {EMBED_KEY}'

    # Tolerate a trailing slash in the configured base URL.
    base_url = EMBED_URL.rstrip('/')
    req = urllib.request.Request(
        f'{base_url}/embeddings',
        data=json.dumps(body).encode(),
        headers=headers,
    )
    with urllib.request.urlopen(req, timeout=60) as resp:
        data = json.load(resp)

    items = sorted(data['data'], key=lambda d: d['index'])
    # Callers pair vectors with chunks positionally; a short response would
    # otherwise silently drop chunks.
    if len(items) != len(texts):
        raise RuntimeError(
            f'embedding endpoint returned {len(items)} vectors for {len(texts)} inputs'
        )
    return [item['embedding'] for item in items]
# ── PostgreSQL via psql ────────────────────────────────────────────────────────
def psql(sql: str) -> str:
    """Run *sql* through the ``psql`` command-line client and return its output.

    The command is passed with ``-c`` and executed against the configured
    host, port, user and database. Output is unaligned and tuples-only, so a
    single-value query yields just that value.

    Args:
        sql: SQL text to execute.

    Returns:
        psql's standard output with surrounding whitespace removed.

    Raises:
        RuntimeError: If psql exits with a non-zero status; the message
            carries psql's stderr.
    """
    env = os.environ.copy()
    # Only override PGPASSWORD when a password is configured, so an empty
    # setting does not wipe out one inherited from the calling environment.
    if DB_PASSWORD:
        env['PGPASSWORD'] = DB_PASSWORD
    result = subprocess.run(
        [
            'psql',
            # Ignore the user's ~/.psqlrc: settings such as \timing would add
            # text to the output that callers parse as returned ids.
            '--no-psqlrc',
            '-h', DB_HOST, '-p', DB_PORT, '-U', DB_USER, '-d', DB_NAME,
            '--no-align', '--tuples-only', '--quiet', '-c', sql,
        ],
        capture_output=True, text=True, env=env, check=False,
    )
    if result.returncode != 0:
        raise RuntimeError(f"psql: {result.stderr.strip()}")
    return result.stdout.strip()
def session_exists(session_id: str) -> bool:
    """Return True when the memories table has at least one row for *session_id*."""
    # Double single quotes so the id can sit inside a SQL string literal.
    quoted = session_id.replace("'", "''")
    output = psql(f"SELECT 1 FROM memories WHERE session_id = '{quoted}' LIMIT 1")
    return output.strip() != ''
def _sql_quote(value: str) -> str:
    """Return *value* as a single-quoted SQL string literal ('' escapes ')."""
    return "'" + value.replace("'", "''") + "'"


def _pg_text_array(items: list[str]) -> str:
    """Return *items* as a PostgreSQL array literal, e.g. {"a","b"}."""
    # Inside a double-quoted array element, backslash and double quote must
    # be backslash-escaped.
    escaped = (item.replace('\\', '\\\\').replace('"', '\\"') for item in items)
    return '{' + ','.join(f'"{item}"' for item in escaped) + '}'


def _discard_memory(mem_id: str) -> None:
    """Best-effort removal of one memories row with its chunks and embeddings."""
    quoted = _sql_quote(mem_id)
    try:
        psql(f"""
            DELETE FROM memory_embeddings
            WHERE chunk_id IN (SELECT id FROM memory_chunks WHERE memory_id = {quoted});
            DELETE FROM memory_chunks WHERE memory_id = {quoted};
            DELETE FROM memories WHERE id = {quoted};
        """)
    except (RuntimeError, OSError) as err:
        print(f" WARNING: could not remove partial memory {mem_id}: {err}", file=sys.stderr)


def insert_doc(session_id: str, summary: str, topics: list[str],
               importance: int, chunks: list[str], vectors: list[list[float]]) -> None:
    """Store one document: a memories row, then a chunk and embedding row per chunk.

    Args:
        session_id: Value for memories.session_id.
        summary: Value for memories.summary.
        topics: Values for the memories.topics array.
        importance: Value for memories.importance.
        chunks: Chunk texts; their position becomes chunk_order.
        vectors: Embeddings, one per chunk, in the same order as *chunks*.

    Raises:
        ValueError: If *chunks* and *vectors* differ in length (checked
            before anything is written).
        RuntimeError: If a psql call fails. Rows already written for this
            document are removed (best effort) before the error propagates,
            so a later run does not treat a partial document as present.
    """
    if len(chunks) != len(vectors):
        raise ValueError(
            f'{len(chunks)} chunks but {len(vectors)} vectors for {session_id}'
        )

    # NOTE(review): string escaping here relies on the server running with
    # standard_conforming_strings = on (backslashes literal inside '...').
    topics_literal = _sql_quote(_pg_text_array(topics))
    mem_id = psql(f"""
        INSERT INTO memories (session_id, importance, summary, topics)
        VALUES ({_sql_quote(session_id)}, {int(importance)}, {_sql_quote(summary)}, {topics_literal})
        RETURNING id
    """)
    try:
        for order, (chunk, vector) in enumerate(zip(chunks, vectors)):
            content_hash = hashlib.sha256(chunk.encode()).hexdigest()
            chunk_id = psql(f"""
                INSERT INTO memory_chunks (memory_id, chunk_order, chunk_text, content_hash)
                VALUES ({_sql_quote(mem_id)}, {order}, {_sql_quote(chunk)}, '{content_hash}')
                RETURNING id
            """)
            vector_str = '[' + ','.join(str(v) for v in vector) + ']'
            psql(f"""
                INSERT INTO memory_embeddings (chunk_id, embedding, embedding_provider, embedding_model)
                VALUES ({_sql_quote(chunk_id)}, '{vector_str}'::vector, 'llama-cpp', {_sql_quote(EMBED_MODEL)})
            """)
    except Exception:
        # Each statement is its own psql invocation, so there is no
        # transaction to roll back; undo the partial write by hand.
        _discard_memory(mem_id)
        raise
def delete_doc(session_id: str) -> None:
    """Delete every row stored for *session_id*.

    Removes the embeddings, then the chunks, then the memories rows. All
    three statements are sent in one ``psql -c`` command string, which the
    server executes as a single transaction, so a failure cannot leave the
    document half-deleted.

    Args:
        session_id: The memories.session_id whose rows are removed.

    Raises:
        RuntimeError: If psql reports an error.
    """
    safe = session_id.replace("'", "''")
    psql(f"""
        DELETE FROM memory_embeddings
        WHERE chunk_id IN (
            SELECT mc.id FROM memory_chunks mc
            JOIN memories m ON mc.memory_id = m.id
            WHERE m.session_id = '{safe}'
        );
        DELETE FROM memory_chunks
        WHERE memory_id IN (
            SELECT id FROM memories WHERE session_id = '{safe}'
        );
        DELETE FROM memories WHERE session_id = '{safe}';
    """)
# ── Topic extraction ───────────────────────────────────────────────────────────
def extract_topics(path: Path, text: str) -> list[str]:
    """Derive up to five topic strings for a document.

    The first topic is the file stem, lower-cased with ``-`` and ``_``
    replaced by spaces. The rest are the document's first-level headings
    (lines starting with ``# ``), lower-cased and cut to 30 characters, in
    order of appearance and without duplicates. Lines inside fenced code
    blocks are ignored, so shell or Python comments are not mistaken for
    headings.

    Args:
        path: Path of the document; only its stem is used.
        text: Markdown content of the document.

    Returns:
        At most five topics, the file-stem topic first.
    """
    topics = [path.stem.lower().replace('-', ' ').replace('_', ' ')]
    open_fence = None  # the marker ("```" or "~~~") of the fence we are inside
    for line in text.splitlines():
        marker = line.lstrip()[:3]
        if marker in ('```', '~~~'):
            if open_fence is None:
                open_fence = marker
            elif open_fence == marker:
                open_fence = None
            continue
        if open_fence is not None or not line.startswith('# '):
            continue
        topic = line[2:].strip().lower()[:30]
        if topic and topic not in topics:
            topics.append(topic)
    return topics[:5]
# ── Main ───────────────────────────────────────────────────────────────────────
def collect_files() -> list[tuple[Path, int]]:
    """Resolve DOC_PATTERNS to a list of (path, importance) pairs.

    Patterns are expanded relative to PROJECT_ROOT in the order listed, with
    each pattern's matches sorted. Files whose name is in SKIP_FILES are
    left out. A path matched by more than one pattern is reported once,
    with the importance of the first pattern that matched it.
    """
    # A dict keeps insertion order and setdefault keeps the first importance,
    # so it serves as both the de-duplication set and the result list.
    selected: dict[Path, int] = {}
    for glob_pattern, weight in DOC_PATTERNS:
        for candidate in sorted(PROJECT_ROOT.glob(glob_pattern)):
            if candidate.name not in SKIP_FILES:
                selected.setdefault(candidate, weight)
    return list(selected.items())
def main() -> None:
    """Embed every collected document and print a per-file report.

    Command-line flags (read from sys.argv):
        --dry-run  Only chunk the files and report what would be embedded;
                   nothing is sent to the embedding endpoint or the database.
        --force    Re-embed documents that already have rows, deleting the
                   old rows once the new embeddings have been computed.

    A failure on one document is reported and the run continues with the
    next one. If any document failed, the process exits with status 1 after
    the summary so that calling scripts can detect it.
    """
    dry_run = '--dry-run' in sys.argv
    force = '--force' in sys.argv
    files = collect_files()
    print(f"Embedding {len(files)} documents into {DB_NAME}@{DB_HOST}")
    print(f"Embed: {EMBED_URL} ({EMBED_MODEL}) chunk_size={MAX_CHUNK}")
    if dry_run:
        print("DRY RUN — no writes\n")
    print()

    total_chunks = 0
    skipped = 0
    embedded = 0
    failed = 0
    for path, importance in files:
        rel = str(path.relative_to(PROJECT_ROOT))
        session_id = f'doc:{rel}'
        text = path.read_text(encoding='utf-8', errors='replace').strip()
        if not text:
            print(f" skip (empty): {rel}")
            skipped += 1
            continue
        if not dry_run and not force and session_exists(session_id):
            print(f" skip (exists): {rel}")
            skipped += 1
            continue

        chunks = chunk_markdown(text)
        # Summary: file name plus the first line with heading markers removed.
        summary = f"{path.name}: {text.splitlines()[0].lstrip('#').strip()[:80]}"
        topics = extract_topics(path, text)
        if dry_run:
            print(f" would embed: {rel} ({len(chunks)} chunks, importance={importance})")
            total_chunks += len(chunks)
            continue

        # Embed in one batch per file
        try:
            vectors = embed(chunks)
        except Exception as e:
            print(f" ERROR embedding {rel}: {e}")
            failed += 1
            continue

        # Old rows are removed only after embedding succeeded, so a failing
        # embed service never costs us the existing data.
        try:
            if force and session_exists(session_id):
                delete_doc(session_id)
        except Exception as e:
            print(f" ERROR deleting {rel}: {e}")
            failed += 1
            continue

        try:
            insert_doc(session_id, summary, topics, importance, chunks, vectors)
        except Exception as e:
            print(f" ERROR inserting {rel}: {e}")
            failed += 1
        else:
            print(f"{rel} ({len(chunks)} chunks)")
            total_chunks += len(chunks)
            embedded += 1

    print()
    if dry_run:
        print(f"Would embed {len(files) - skipped} docs → {total_chunks} chunks")
    else:
        print(f"Done. {embedded} docs embedded, {skipped} skipped, {total_chunks} total chunks.")
    if failed:
        print(f"{failed} docs failed; see the errors above.", file=sys.stderr)
        sys.exit(1)


if __name__ == '__main__':
    main()