clawdie-ai/scripts/memory/import-memories.py
Mevy Assistant b8fd655f02 Refactor V2 identity and platform ownership model
Make the multitenant branch use a clean PLATFORM_*/TENANT_* model, remove active AGENT_NAME runtime usage, collapse hostd ownership into the shared platform, add operator audit surfaces, and add read-only tenant lifecycle commands.

---
Build: pass | Tests: pass — 151 passed (14 files)
2026-04-24 07:49:09 +02:00

268 lines
10 KiB
Python
Executable file

#!/usr/bin/env python3
"""
scripts/memory/import-memories.py
Import session memories from a transfer JSON into the tenant memory database (MEMORY_DB_NAME, default {TENANT_ID}_brain).
Usage:
python3 scripts/memory/import-memories.py <transfer.json>
Each 'memories' entry is inserted into:
memories — one row per session
memory_chunks — one row per chunk (≤500 chars, sentence-split)
memory_embeddings — one row per chunk, embedded with EMBED_MODEL (default bge-m3) at EMBED_BASE_URL
"""
import hashlib
import json
import os
import re
import sys
import urllib.request
import urllib.error
from pathlib import Path
# ── Config ────────────────────────────────────────────────────────────────────
# Directory three levels above this file's resolved path. With the script at
# scripts/memory/import-memories.py that is: memory/ -> scripts/ -> project root.
# The project's .env file is loaded relative to this directory.
PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
def load_env(path: Path) -> dict[str, str]:
    """Parse a simple ``KEY=VALUE`` dotenv file into a dictionary.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored. Keys and values are whitespace-stripped, and one pair of
    matching surrounding quotes (single or double) is removed from the value.

    Args:
        path: Location of the dotenv file.

    Returns:
        Mapping of variable names to their string values; empty when *path*
        does not exist.
    """
    env: dict[str, str] = {}
    if not path.exists():
        return env
    # Decode explicitly as UTF-8 so parsing does not depend on the locale.
    for raw_line in path.read_text(encoding='utf-8').splitlines():
        line = raw_line.strip()
        if not line or line.startswith('#') or '=' not in line:
            continue
        key, _, value = line.partition('=')
        key = key.strip()
        if not key:
            # "=value" carries no variable name; nothing useful to store.
            continue
        value = value.strip()
        # Remove only a matching pair of quotes. A blanket strip('"\'') would
        # also eat quote characters that are part of the value itself
        # (for example a password that ends with an apostrophe).
        if len(value) >= 2 and value[0] == value[-1] and value[0] in '"\'':
            value = value[1:-1]
        env[key] = value
    return env
# All settings come from the project's .env file (os.environ is not consulted).
# A key that is present but blank ("DB_PORT=") is treated like a missing key, so
# the numeric conversions below cannot fail on int('') and the tenant-derived
# database name never degenerates to "_brain".
ENV = load_env(PROJECT_ROOT / '.env')
TENANT_ID = ENV.get('TENANT_ID') or 'clawdie'
# DB_HOST is left as-is: an explicitly empty value is passed through unchanged.
DB_HOST = ENV.get('WARDEN_DB_IP', '10.0.1.3')
DB_PORT = int(ENV.get('DB_PORT') or '5432')
# Database name and role default to "<tenant>_brain" unless overridden.
DB_NAME = ENV.get('MEMORY_DB_NAME') or f'{TENANT_ID}_brain'
DB_USER = ENV.get('MEMORY_DB_USER') or f'{TENANT_ID}_brain'
DB_PASSWORD = ENV.get('MEMORY_DB_PASSWORD', '')
EMBED_URL = ENV.get('EMBED_BASE_URL') or 'http://10.0.1.5:8080/v1'
EMBED_MODEL = ENV.get('EMBED_MODEL') or 'bge-m3'
EMBED_DIMS = int(ENV.get('EMBED_DIMENSIONS') or '1024')
# EMBED_API_KEY wins; OPENAI_API_KEY is the fallback. Empty means "send no auth header".
EMBED_KEY = ENV.get('EMBED_API_KEY', '') or ENV.get('OPENAI_API_KEY', '')
# ── PostgreSQL (psycopg2 or psql subprocess fallback) ─────────────────────────
# Optional dependency: record whether the psycopg2 driver can be imported so the
# rest of the script can choose between the driver and the psql CLI.
_HAVE_PSYCOPG2 = False
try:
    import psycopg2
    import psycopg2.extras
except ImportError:
    pass
else:
    _HAVE_PSYCOPG2 = True
def get_conn():
    """Open a psycopg2 connection built from the module-level DB_* settings.

    Returns:
        The connection object returned by ``psycopg2.connect``.

    Raises:
        RuntimeError: If psycopg2 could not be imported.
    """
    if _HAVE_PSYCOPG2:
        connect_kwargs = {
            'host': DB_HOST,
            'port': DB_PORT,
            'dbname': DB_NAME,
            'user': DB_USER,
            'password': DB_PASSWORD,
        }
        return psycopg2.connect(**connect_kwargs)
    raise RuntimeError("psycopg2 not available — install it or use the psql fallback")
# ── psql subprocess fallback ──────────────────────────────────────────────────
import subprocess
import tempfile
def psql_exec(sql: str) -> str:
    """Run SQL through the ``psql`` CLI and return its stripped stdout.

    Used when psycopg2 is absent or could not connect. Output is unaligned and
    tuples-only so a ``RETURNING id`` statement yields just the id text.

    Args:
        sql: SQL text passed to ``psql -c``.

    Returns:
        Whatever psql printed on stdout, with surrounding whitespace removed.

    Raises:
        RuntimeError: If psql cannot be started or exits with a non-zero status.
    """
    env = os.environ.copy()
    # Only override PGPASSWORD when a password is configured, so an empty
    # setting does not clobber one the caller already exported.
    if DB_PASSWORD:
        env['PGPASSWORD'] = DB_PASSWORD
    command = [
        'psql',
        # Skip ~/.psqlrc: user-level formatting settings would change the
        # output that callers parse.
        '--no-psqlrc',
        '-h', DB_HOST, '-p', str(DB_PORT), '-U', DB_USER, '-d', DB_NAME,
        '--no-align', '--tuples-only', '--quiet', '-c', sql,
    ]
    try:
        result = subprocess.run(command, capture_output=True, text=True, env=env)
    except FileNotFoundError as err:
        raise RuntimeError("psql error: psql executable not found") from err
    if result.returncode != 0:
        raise RuntimeError(f"psql error: {result.stderr.strip()}")
    return result.stdout.strip()
# ── Chunking ──────────────────────────────────────────────────────────────────
MAX_CHUNK = 500


def chunk_text(text: str, max_chunk: 'int | None' = None) -> list[str]:
    """Split text into sentence-boundary chunks of at most *max_chunk* chars.

    Sentences are detected as runs ending in ``.``, ``!`` or ``?`` followed by
    whitespace. Consecutive sentences are packed into one chunk (joined by a
    single space) while they fit; a single sentence longer than the limit is
    cut into fixed-size pieces.

    Args:
        text: The text to split.
        max_chunk: Maximum chunk length in characters. ``None`` (the default)
            uses the module-level ``MAX_CHUNK``.

    Returns:
        The list of chunks, never empty: text without any sentence content
        yields a single chunk holding the first *max_chunk* characters of
        the input (an empty string for empty input).

    Raises:
        ValueError: If the effective limit is smaller than 1.
    """
    limit = MAX_CHUNK if max_chunk is None else max_chunk
    if limit < 1:
        # A non-positive limit would make the hard-split loop below spin forever.
        raise ValueError("max_chunk must be a positive integer")

    sentences = re.split(r'(?<=[.!?])\s+', text.strip())
    chunks: list[str] = []
    current = ''
    for sent in sentences:
        if not sent:
            continue
        # +1 accounts for the joining space.
        if len(current) + len(sent) + 1 <= limit:
            # strip() matters when `current` is the tail of a hard split and
            # begins with whitespace.
            current = f'{current} {sent}'.strip() if current else sent
            continue
        if current:
            chunks.append(current)
        # A single sentence longer than the limit is split hard.
        while len(sent) > limit:
            chunks.append(sent[:limit])
            sent = sent[limit:]
        current = sent
    if current:
        chunks.append(current)
    return chunks or [text[:limit]]
# ── Embeddings ────────────────────────────────────────────────────────────────
def embed(texts: list[str]) -> list[list[float]]:
    """Request embeddings for *texts* from the ``/embeddings`` endpoint.

    The request goes to ``EMBED_URL`` with model ``EMBED_MODEL``; a bearer
    token is sent when ``EMBED_KEY`` is set.

    Args:
        texts: Strings to embed.

    Returns:
        One vector per input, ordered by the ``index`` field of the response.
        An empty input returns ``[]`` without contacting the server.

    Raises:
        RuntimeError: If the server returns a different number of vectors
            than inputs.
        urllib.error.URLError: On transport or HTTP errors (raised by urllib).
    """
    if not texts:
        return []
    body: dict = {'input': texts, 'model': EMBED_MODEL}
    if 'openai.com' in EMBED_URL:  # OpenAI supports dimension truncation; others may not
        body['dimensions'] = EMBED_DIMS
    headers = {'Content-Type': 'application/json'}
    if EMBED_KEY:
        headers['Authorization'] = f'Bearer {EMBED_KEY}'
    # Tolerate a trailing slash on the configured base URL.
    endpoint = f"{EMBED_URL.rstrip('/')}/embeddings"
    req = urllib.request.Request(endpoint, data=json.dumps(body).encode(), headers=headers)
    with urllib.request.urlopen(req, timeout=30) as resp:
        body_resp = json.load(resp)
    ordered = sorted(body_resp['data'], key=lambda d: d['index'])
    # Callers pair vectors with inputs positionally; a short response would
    # otherwise silently drop the unmatched inputs.
    if len(ordered) != len(texts):
        raise RuntimeError(
            f"embedding server returned {len(ordered)} vectors for {len(texts)} inputs"
        )
    return [d['embedding'] for d in ordered]
# ── Import ────────────────────────────────────────────────────────────────────
def format_array(items: list[str]) -> str:
    """Format a Python list as a PostgreSQL ``text[]`` literal body.

    Every element is double-quoted. Two escaping layers are applied:

    * array level: backslashes and double quotes inside an element are
      backslash-escaped so they cannot terminate the element;
    * SQL-string level: single quotes are doubled, because the result is meant
      to be embedded between single quotes in a SQL statement.

    Args:
        items: Elements of the array; non-string elements are converted with
            ``str()``.

    Returns:
        The literal without the surrounding single quotes, e.g. ``{"a","b"}``.
    """
    def escape(item) -> str:
        text = str(item)
        # Backslashes first, so the ones added for '"' are not doubled again.
        text = text.replace('\\', '\\\\').replace('"', '\\"')
        return text.replace("'", "''")

    return '{' + ','.join(f'"{escape(item)}"' for item in items) + '}'
def import_memory(mem: dict, conn=None) -> None:
date_str = mem.get('date', '')
importance = int(mem.get('importance', 2))
summary = mem.get('summary', '').strip()
topics = mem.get('topics', [])
if not summary:
print(f" skip: empty summary for {date_str}")
return
session_id = f'import:{date_str}'
if _HAVE_PSYCOPG2 and conn:
cur = conn.cursor()
# Duplicate check
cur.execute("SELECT id FROM memories WHERE session_id = %s", (session_id,))
existing = cur.fetchone()
if existing:
print(f" skip {date_str}: already imported")
cur.close()
return
# Insert memory
cur.execute("""
INSERT INTO memories (session_id, importance, summary, topics)
VALUES (%s, %s, %s, %s)
RETURNING id
""", (session_id, importance, summary, topics))
memory_id = cur.fetchone()[0]
chunks = chunk_text(summary)
vectors = embed(chunks)
for order, (chunk, vector) in enumerate(zip(chunks, vectors)):
content_hash = hashlib.sha256(chunk.encode()).hexdigest()
cur.execute("""
INSERT INTO memory_chunks (memory_id, chunk_order, chunk_text, content_hash)
VALUES (%s, %s, %s, %s)
RETURNING id
""", (memory_id, order, chunk, content_hash))
chunk_id = cur.fetchone()[0]
vector_str = '[' + ','.join(str(v) for v in vector) + ']'
cur.execute("""
INSERT INTO memory_embeddings (chunk_id, embedding, embedding_provider, embedding_model)
VALUES (%s, %s::vector, %s, %s)
""", (chunk_id, vector_str, 'llama-cpp', EMBED_MODEL))
conn.commit()
cur.close()
print(f"{date_str} importance={importance} chunks={len(chunks)}")
else:
# psql fallback — build SQL and execute
safe_summary = summary.replace("'", "''")
safe_session = session_id.replace("'", "''")
topics_arr = format_array(topics)
# Duplicate check
existing = psql_exec(f"SELECT id FROM memories WHERE session_id = '{safe_session}' LIMIT 1")
if existing:
print(f" skip {date_str}: already imported")
return
mem_id = psql_exec(f"""
INSERT INTO memories (session_id, importance, summary, topics)
VALUES ('{safe_session}', {importance}, '{safe_summary}', '{topics_arr}')
RETURNING id
""")
chunks = chunk_text(summary)
vectors = embed(chunks)
for order, (chunk, vector) in enumerate(zip(chunks, vectors)):
content_hash = hashlib.sha256(chunk.encode()).hexdigest()
safe_chunk = chunk.replace("'", "''")
chunk_id = psql_exec(f"""
INSERT INTO memory_chunks (memory_id, chunk_order, chunk_text, content_hash)
VALUES ('{mem_id}', {order}, '{safe_chunk}', '{content_hash}')
RETURNING id
""")
vector_str = '[' + ','.join(str(v) for v in vector) + ']'
psql_exec(f"""
INSERT INTO memory_embeddings (chunk_id, embedding, embedding_provider, embedding_model)
VALUES ('{chunk_id}', '{vector_str}'::vector, 'llama-cpp', '{EMBED_MODEL}')
""")
print(f"{date_str} importance={importance} chunks={len(chunks)}")
# ── Main ──────────────────────────────────────────────────────────────────────
def main() -> None:
    """Command-line entry point: import every memory from a transfer file.

    Reads the path from ``sys.argv[1]``, loads its ``memories`` list and
    imports each entry, using psycopg2 when it is importable and can connect,
    otherwise the psql fallback.

    Exits with status 1 on a missing argument or file, and with status 0 when
    the file contains no memories.
    """
    if len(sys.argv) < 2:
        print(f"Usage: python3 {sys.argv[0]} <transfer.json>")
        sys.exit(1)
    transfer_path = Path(sys.argv[1])
    if not transfer_path.exists():
        print(f"File not found: {transfer_path}")
        sys.exit(1)
    # Decode explicitly as UTF-8 rather than with the locale's encoding.
    data = json.loads(transfer_path.read_text(encoding='utf-8'))
    memories = data.get('memories', [])
    if not memories:
        print("No memories found in transfer file.")
        sys.exit(0)
    print(f"Importing {len(memories)} memories into {DB_NAME}@{DB_HOST}")
    print(f"Embedding via {EMBED_URL} ({EMBED_MODEL})")
    print()
    conn = None
    if _HAVE_PSYCOPG2:
        try:
            conn = get_conn()
            print("Connected via psycopg2")
        except Exception as e:
            # Deliberate best-effort: any connect failure switches to psql.
            print(f"psycopg2 connect failed: {e} — falling back to psql")
    try:
        for mem in memories:
            import_memory(mem, conn)
    finally:
        # Close the connection even when an import raises.
        if conn:
            conn.close()
    print()
    print("Done.")


if __name__ == '__main__':
    main()