"""SQLite persistence for per-sender conversation history.""" from __future__ import annotations import sqlite3 from pathlib import Path SCHEMA = """ CREATE TABLE IF NOT EXISTS conversations ( public_key TEXT PRIMARY KEY, contact_name TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')), updated_at TEXT NOT NULL DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, public_key TEXT NOT NULL REFERENCES conversations(public_key), role TEXT NOT NULL CHECK (role IN ('user', 'assistant')), content TEXT NOT NULL, created_at TEXT NOT NULL DEFAULT (datetime('now')) ); CREATE INDEX IF NOT EXISTS idx_messages_pubkey_id ON messages(public_key, id); """ def connect(path: str | Path) -> sqlite3.Connection: """Open the SQLite DB, ensure the parent dir and schema exist, return the connection.""" db_path = Path(path) db_path.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(db_path, isolation_level=None) # autocommit conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode = WAL;") conn.execute("PRAGMA foreign_keys = ON;") conn.executescript(SCHEMA) return conn def upsert_conversation(conn: sqlite3.Connection, public_key: str, contact_name: str) -> None: conn.execute( """ INSERT INTO conversations (public_key, contact_name) VALUES (?, ?) ON CONFLICT(public_key) DO UPDATE SET contact_name = excluded.contact_name, updated_at = datetime('now') """, (public_key, contact_name), ) def add_message(conn: sqlite3.Connection, public_key: str, role: str, content: str) -> None: conn.execute( "INSERT INTO messages (public_key, role, content) VALUES (?, ?, ?)", (public_key, role, content), ) def get_history(conn: sqlite3.Connection, public_key: str) -> list[dict[str, str]]: """Return the conversation as OpenAI chat messages, oldest first.""" rows = conn.execute( "SELECT role, content FROM messages WHERE public_key = ? ORDER BY id ASC", (public_key,), ).fetchall() return [{"role": row["role"], "content": row["content"]} for row in rows]