Make the multitenant branch use a clean PLATFORM_*/TENANT_* model, remove active AGENT_NAME runtime usage, collapse hostd ownership into the shared platform, add operator audit surfaces, and add read-only tenant lifecycle commands. --- Build: pass | Tests: pass — 151 passed (14 files)
268 lines
10 KiB
Python
Executable file
268 lines
10 KiB
Python
Executable file
#!/usr/bin/env python3
"""
scripts/memory/import-memories.py

Import session memories from a transfer JSON into the tenant memory database
(MEMORY_DB_NAME, defaulting to {TENANT_ID}_brain).

Usage:
    python3 scripts/memory/import-memories.py <transfer.json>

Configuration is read from <project root>/.env. Rows are written through
psycopg2 when it is installed and can connect, otherwise through the psql CLI.

Each 'memories' entry is inserted into:
    memories          — one row per session, keyed by session_id 'import:<date>'
                        (entries whose session_id already exists are skipped)
    memory_chunks     — one row per chunk (≤500 chars, sentence-split)
    memory_embeddings — one row per chunk, embedded with EMBED_MODEL
                        (default bge-m3) at EMBED_BASE_URL
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import urllib.request
|
|
import urllib.error
|
|
from pathlib import Path
|
|
|
|
# ── Config ────────────────────────────────────────────────────────────────────

# This script sits at <root>/scripts/memory/, so the project root is three
# directory levels above the resolved script path.
_SCRIPT_PATH = Path(__file__).resolve()
PROJECT_ROOT = _SCRIPT_PATH.parent.parent.parent
|
|
|
|
def load_env(path: Path) -> dict[str, str]:
    """Parse a dotenv-style file into a ``{key: value}`` dict.

    Blank lines, ``#`` comment lines and lines without ``=`` are skipped. A
    leading ``export `` is ignored, keys and values are whitespace-trimmed, and
    a value wrapped in one matching pair of single or double quotes has that
    pair removed. A missing file yields an empty dict.

    Args:
        path: location of the env file.

    Returns:
        The parsed settings; later duplicate keys override earlier ones.
    """
    env: dict[str, str] = {}
    if not path.exists():
        return env
    for raw in path.read_text(encoding='utf-8').splitlines():
        line = raw.strip()
        if not line or line.startswith('#') or '=' not in line:
            continue
        # Shell-style env files often prefix assignments with `export`.
        if line.startswith('export '):
            line = line[len('export '):].lstrip()
        key, _, value = line.partition('=')
        value = value.strip()
        # Remove exactly one matching pair of surrounding quotes; a blanket
        # .strip('"\'') would also eat unbalanced or nested quote characters.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in '"\'':
            value = value[1:-1]
        env[key.strip()] = value
    return env
|
|
|
|
ENV = load_env(PROJECT_ROOT / '.env')

# Settings use `ENV.get(key) or default` so an empty assignment (e.g. `DB_PORT=`)
# falls back to the default, as MEMORY_DB_NAME / MEMORY_DB_USER already did.
# A plain `.get(key, default)` returns '' there, which crashes int() or turns
# an empty TENANT_ID into the database name '_brain'.
TENANT_ID = ENV.get('TENANT_ID') or 'clawdie'
DB_HOST = ENV.get('WARDEN_DB_IP') or '10.0.1.3'
DB_PORT = int(ENV.get('DB_PORT') or '5432')
DB_NAME = ENV.get('MEMORY_DB_NAME') or f'{TENANT_ID}_brain'
DB_USER = ENV.get('MEMORY_DB_USER') or f'{TENANT_ID}_brain'
DB_PASSWORD = ENV.get('MEMORY_DB_PASSWORD', '')
# Trailing '/' removed so f'{EMBED_URL}/embeddings' never contains '//'.
EMBED_URL = (ENV.get('EMBED_BASE_URL') or 'http://10.0.1.5:8080/v1').rstrip('/')
EMBED_MODEL = ENV.get('EMBED_MODEL') or 'bge-m3'
EMBED_DIMS = int(ENV.get('EMBED_DIMENSIONS') or '1024')
EMBED_KEY = ENV.get('EMBED_API_KEY', '') or ENV.get('OPENAI_API_KEY', '')
|
|
|
|
# ── PostgreSQL (psycopg2 or psql subprocess fallback) ─────────────────────────

# psycopg2 is optional: without it, every database call goes through the
# psql CLI instead (see psql_exec).
try:
    import psycopg2
    import psycopg2.extras
except ImportError:
    _HAVE_PSYCOPG2 = False
else:
    _HAVE_PSYCOPG2 = True
|
|
|
|
def get_conn():
    """Open a new psycopg2 connection using the module-level DB_* settings.

    Raises:
        RuntimeError: when psycopg2 could not be imported.
    """
    if _HAVE_PSYCOPG2:
        settings = {
            'host': DB_HOST,
            'port': DB_PORT,
            'dbname': DB_NAME,
            'user': DB_USER,
            'password': DB_PASSWORD,
        }
        return psycopg2.connect(**settings)
    raise RuntimeError("psycopg2 not available — install it or use the psql fallback")
|
|
|
|
# ── psql subprocess fallback ──────────────────────────────────────────────────
|
|
|
|
import subprocess
|
|
import tempfile
|
|
|
|
def psql_exec(sql: str) -> str:
    """Run SQL via the psql CLI and return its trimmed stdout.

    Fallback used when psycopg2 is absent or cannot connect. Output is
    unaligned and tuples-only, so e.g. ``INSERT ... RETURNING id`` yields just
    the id.

    The SQL is fed on stdin (``-f -``) instead of as a ``-c`` argument. That
    keeps memory text out of process listings and avoids the OS limit on a
    single argument's size.

    Args:
        sql: the statement(s) to execute.

    Returns:
        psql's stdout with surrounding whitespace removed.

    Raises:
        RuntimeError: if psql exits non-zero; its stderr is included.
    """
    env = os.environ.copy()
    env['PGPASSWORD'] = DB_PASSWORD
    # We encode stdin as UTF-8, so tell the server that is the client encoding.
    env['PGCLIENTENCODING'] = 'UTF8'
    cmd = [
        'psql',
        '-X',                     # skip ~/.psqlrc: e.g. \timing would pollute stdout
        '-v', 'ON_ERROR_STOP=1',  # with -f, psql otherwise exits 0 after an SQL error
        '-h', DB_HOST, '-p', str(DB_PORT), '-U', DB_USER, '-d', DB_NAME,
        '--no-align', '--tuples-only', '--quiet',
        '-f', '-',                # read the SQL from stdin
    ]
    result = subprocess.run(
        cmd, input=sql, capture_output=True, encoding='utf-8', env=env,
    )
    if result.returncode != 0:
        raise RuntimeError(f"psql error: {result.stderr.strip()}")
    return result.stdout.strip()
|
|
|
|
# ── Chunking ──────────────────────────────────────────────────────────────────

MAX_CHUNK = 500


def chunk_text(text: str) -> list[str]:
    """Greedily pack sentences into chunks of at most MAX_CHUNK characters.

    Sentences are found by splitting on whitespace that follows '.', '!' or
    '?'. Consecutive sentences are joined with one space while they fit. A
    sentence longer than MAX_CHUNK is cut into MAX_CHUNK-sized slices, and its
    tail stays buffered for the next sentence. If nothing is produced, the
    first MAX_CHUNK characters of the raw input are the sole chunk.
    """
    pieces = [s for s in re.split(r'(?<=[.!?])\s+', text.strip()) if s]
    out: list[str] = []
    buf = ''
    for piece in pieces:
        # The +1 reserves room for the joining space.
        if len(buf) + len(piece) + 1 <= MAX_CHUNK:
            buf = piece if not buf else f'{buf} {piece}'.strip()
            continue
        if buf:
            out.append(buf)
        # Oversized sentence: emit full-width slices and keep the remainder.
        while len(piece) > MAX_CHUNK:
            out.append(piece[:MAX_CHUNK])
            piece = piece[MAX_CHUNK:]
        buf = piece
    if buf:
        out.append(buf)
    return out if out else [text[:MAX_CHUNK]]
|
|
|
|
# ── Embeddings ────────────────────────────────────────────────────────────────


def embed(texts: list[str]) -> list[list[float]]:
    """Embed *texts* through the OpenAI-compatible ``/embeddings`` endpoint.

    Sends ``{'input': texts, 'model': EMBED_MODEL}`` to
    ``{EMBED_URL}/embeddings``, with a bearer token when EMBED_KEY is set.
    ``dimensions`` (EMBED_DIMS) is only sent to openai.com, since other
    servers may reject it.

    Args:
        texts: the strings to embed.

    Returns:
        One vector per input text, in input order (response items are sorted
        by their ``index``). An empty *texts* list returns ``[]`` without
        making a request.

    Raises:
        RuntimeError: on an HTTP error status (the response body is included),
            or when the number of returned embeddings differs from len(texts).
        urllib.error.URLError: if the server cannot be reached.
    """
    if not texts:
        return []

    body: dict = {'input': texts, 'model': EMBED_MODEL}
    if 'openai.com' in EMBED_URL:  # OpenAI supports dimension truncation; others may not
        body['dimensions'] = EMBED_DIMS
    headers = {'Content-Type': 'application/json'}
    if EMBED_KEY:
        headers['Authorization'] = f'Bearer {EMBED_KEY}'

    # Tolerate a configured base URL ending in '/', which would give '//embeddings'.
    url = f"{EMBED_URL.rstrip('/')}/embeddings"
    req = urllib.request.Request(url, data=json.dumps(body).encode(), headers=headers)
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            reply = json.load(resp)
    except urllib.error.HTTPError as err:
        # Servers explain failures (unknown model, bad key) in the body; surface it.
        detail = err.read().decode('utf-8', errors='replace').strip()
        raise RuntimeError(
            f"embedding request to {url} failed: HTTP {err.code} {detail}"
        ) from err

    items = sorted(reply['data'], key=lambda d: d['index'])
    if len(items) != len(texts):
        raise RuntimeError(
            f"embedding service returned {len(items)} vectors for {len(texts)} inputs"
        )
    return [item['embedding'] for item in items]
|
|
|
|
# ── Import ────────────────────────────────────────────────────────────────────


def format_array(items: list[str]) -> str:
    """Format a Python list as a PostgreSQL text[] literal for inlining in SQL.

    Every element is double-quoted. Inside an element, backslashes and double
    quotes are backslash-escaped (array-literal syntax). Single quotes are
    doubled because the result is meant to be placed between '...' in a SQL
    statement. Non-string elements are converted with str(), and an empty or
    None input yields '{}'.
    """
    if not items:
        return '{}'
    escaped = [
        # Backslash first, so the escapes added for '"' are not doubled again.
        str(item).replace('\\', '\\\\').replace('"', '\\"').replace("'", "''")
        for item in items
    ]
    return '{' + ','.join(f'"{e}"' for e in escaped) + '}'
|
|
|
|
def _sql_quote(value: str) -> str:
    """Render *value* as a SQL string literal for the psql fallback.

    Only single quotes are doubled. This assumes the server runs with
    standard_conforming_strings = on (PostgreSQL's default since 9.1), where
    backslashes inside '...' are literal.
    """
    return "'" + str(value).replace("'", "''") + "'"


def _vector_literal(vector: list[float]) -> str:
    """Render an embedding in pgvector's text form, e.g. ``[0.1,0.2]``."""
    return '[' + ','.join(str(v) for v in vector) + ']'


def _embed_chunks(summary: str) -> list[tuple[str, list[float]]]:
    """Chunk *summary* and pair each chunk with its embedding vector.

    Raises:
        RuntimeError: if the embedding service returns a different number of
            vectors than chunks (zip() would otherwise silently drop chunks).
    """
    chunks = chunk_text(summary)
    vectors = embed(chunks)
    if len(vectors) != len(chunks):
        raise RuntimeError(
            f"embedding service returned {len(vectors)} vectors for {len(chunks)} chunks"
        )
    return list(zip(chunks, vectors))


def _import_via_psycopg2(conn, session_id, date_str, importance, summary, topics) -> None:
    """Insert one memory plus its chunks and embeddings in a single transaction."""
    cur = conn.cursor()
    try:
        # Duplicate check
        cur.execute("SELECT id FROM memories WHERE session_id = %s", (session_id,))
        if cur.fetchone():
            conn.rollback()  # end the transaction the SELECT implicitly opened
            print(f" skip {date_str}: already imported")
            return

        # Embed before writing anything, so an embedding failure leaves no rows.
        pairs = _embed_chunks(summary)

        cur.execute("""
            INSERT INTO memories (session_id, importance, summary, topics)
            VALUES (%s, %s, %s, %s)
            RETURNING id
        """, (session_id, importance, summary, topics))
        memory_id = cur.fetchone()[0]

        for order, (chunk, vector) in enumerate(pairs):
            content_hash = hashlib.sha256(chunk.encode()).hexdigest()
            cur.execute("""
                INSERT INTO memory_chunks (memory_id, chunk_order, chunk_text, content_hash)
                VALUES (%s, %s, %s, %s)
                RETURNING id
            """, (memory_id, order, chunk, content_hash))
            chunk_id = cur.fetchone()[0]
            cur.execute("""
                INSERT INTO memory_embeddings (chunk_id, embedding, embedding_provider, embedding_model)
                VALUES (%s, %s::vector, %s, %s)
            """, (chunk_id, _vector_literal(vector), 'llama-cpp', EMBED_MODEL))

        conn.commit()
    except BaseException:
        # Leave the connection usable and write nothing for this memory.
        conn.rollback()
        raise
    finally:
        cur.close()
    print(f" ✓ {date_str} importance={importance} chunks={len(pairs)}")


def _psql_discard_memory(mem_id: str) -> None:
    """Best-effort delete of a partially imported memory and its child rows."""
    mid = _sql_quote(mem_id)
    try:
        psql_exec(
            "DELETE FROM memory_embeddings WHERE chunk_id IN "
            f"(SELECT id FROM memory_chunks WHERE memory_id = {mid}); "
            f"DELETE FROM memory_chunks WHERE memory_id = {mid}; "
            f"DELETE FROM memories WHERE id = {mid}"
        )
    except (RuntimeError, OSError) as err:
        # The caller re-raises the original failure; only report the cleanup one.
        print(f" warning: could not remove partial memory {mem_id}: {err}", file=sys.stderr)


def _import_via_psql(session_id, date_str, importance, summary, topics) -> None:
    """Insert one memory through the psql CLI, removing partial rows on failure."""
    q = _sql_quote

    # Duplicate check
    existing = psql_exec(f"SELECT id FROM memories WHERE session_id = {q(session_id)} LIMIT 1")
    if existing:
        print(f" skip {date_str}: already imported")
        return

    # Embed before writing anything, so an embedding failure leaves no rows.
    pairs = _embed_chunks(summary)

    # None topics are stored as NULL, as psycopg2 does for None.
    topics_sql = 'NULL' if topics is None else f"'{format_array(topics)}'"
    mem_id = psql_exec(f"""
        INSERT INTO memories (session_id, importance, summary, topics)
        VALUES ({q(session_id)}, {importance}, {q(summary)}, {topics_sql})
        RETURNING id
    """)

    # Every psql_exec call is its own session and commits independently. A
    # failure part-way would leave a memory that the duplicate check then
    # skips forever, so remove it before propagating the error.
    try:
        for order, (chunk, vector) in enumerate(pairs):
            content_hash = hashlib.sha256(chunk.encode()).hexdigest()
            chunk_id = psql_exec(f"""
                INSERT INTO memory_chunks (memory_id, chunk_order, chunk_text, content_hash)
                VALUES ({q(mem_id)}, {order}, {q(chunk)}, {q(content_hash)})
                RETURNING id
            """)
            psql_exec(f"""
                INSERT INTO memory_embeddings (chunk_id, embedding, embedding_provider, embedding_model)
                VALUES ({q(chunk_id)}, {q(_vector_literal(vector))}::vector, 'llama-cpp', {q(EMBED_MODEL)})
            """)
    except BaseException:
        _psql_discard_memory(mem_id)
        raise

    print(f" ✓ {date_str} importance={importance} chunks={len(pairs)}")


def import_memory(mem: dict, conn=None) -> None:
    """Import one transfer-file memory with its chunks and embeddings.

    Entries with an empty or missing summary are skipped. A memory is keyed by
    ``session_id = 'import:<date>'``, and an entry whose session_id already
    exists is skipped. Re-running an import is therefore idempotent.

    The summary is chunked and embedded before any row is written. If a later
    step fails, nothing is left behind: psycopg2 rolls back, and the psql path
    deletes the rows it wrote.

    Args:
        mem: one item of the transfer file's ``memories`` list; reads the
            ``date``, ``importance`` (default 2), ``summary`` and ``topics`` keys.
        conn: an open psycopg2 connection, or None to use the psql CLI.

    Raises:
        RuntimeError: on an embedding/chunk count mismatch or a psql failure.
    """
    date_str = mem.get('date', '')
    raw_importance = mem.get('importance')
    importance = int(2 if raw_importance is None else raw_importance)
    summary = (mem.get('summary') or '').strip()
    topics = mem.get('topics', [])

    if not summary:
        print(f" skip: empty summary for {date_str}")
        return

    # NOTE(review): entries sharing a 'date' value map to the same session_id,
    # so only the first is imported; confirm the transfer format makes dates unique.
    session_id = f'import:{date_str}'

    if _HAVE_PSYCOPG2 and conn:
        _import_via_psycopg2(conn, session_id, date_str, importance, summary, topics)
    else:
        _import_via_psql(session_id, date_str, importance, summary, topics)
|
|
|
|
# ── Main ──────────────────────────────────────────────────────────────────────


def main() -> None:
    """Import every entry of a transfer file's ``memories`` list.

    Usage: ``import-memories.py <transfer.json>``. Exits with status 1 on a
    usage error, a missing or unreadable file, invalid JSON, or a top-level
    value that is not an object. Exits with status 0 when the file holds no
    memories. psycopg2 is tried first; the psql CLI is used when the driver is
    missing or cannot connect.
    """
    if len(sys.argv) < 2:
        print(f"Usage: python3 {sys.argv[0]} <transfer.json>", file=sys.stderr)
        sys.exit(1)

    transfer_path = Path(sys.argv[1])
    if not transfer_path.exists():
        print(f"File not found: {transfer_path}", file=sys.stderr)
        sys.exit(1)

    try:
        data = json.loads(transfer_path.read_text(encoding='utf-8'))
    except (OSError, ValueError) as err:  # ValueError covers JSON and UTF-8 decode errors
        print(f"Could not read {transfer_path}: {err}", file=sys.stderr)
        sys.exit(1)
    if not isinstance(data, dict):
        print(f"Expected a JSON object in {transfer_path}", file=sys.stderr)
        sys.exit(1)

    memories = data.get('memories', [])
    if not memories:
        print("No memories found in transfer file.")
        sys.exit(0)

    print(f"Importing {len(memories)} memories into {DB_NAME}@{DB_HOST}")
    print(f"Embedding via {EMBED_URL} ({EMBED_MODEL})")
    print()

    conn = None
    if _HAVE_PSYCOPG2:
        try:
            conn = get_conn()
            print("Connected via psycopg2")
        except Exception as e:  # deliberate: any connect failure means "use psql instead"
            print(f"psycopg2 connect failed: {e} — falling back to psql")

    try:
        for mem in memories:
            import_memory(mem, conn)
    finally:
        # Close the connection even when an import raises.
        if conn is not None:
            conn.close()

    print()
    print("Done.")


if __name__ == '__main__':
    main()
|