diff --git a/Dockerfile b/Dockerfile index 80380f5..542ac27 100644 --- a/Dockerfile +++ b/Dockerfile @@ -19,12 +19,12 @@ FROM python:3.12-slim ENV PYTHONDONTWRITEBYTECODE=1 \ PYTHONUNBUFFERED=1 \ - MESHBOT_CONFIG=/etc/meshbot/config.toml \ - MESHBOT_STORAGE__SQLITE_PATH=/data/meshbot.db + LORABOT_CONFIG=/etc/lorabot/config.toml \ + LORABOT_STORAGE__SQLITE_PATH=/data/lorabot.db -RUN useradd --system --home /app --shell /usr/sbin/nologin meshbot \ - && mkdir -p /data /etc/meshbot \ - && chown meshbot:meshbot /data +RUN useradd --system --home /app --shell /usr/sbin/nologin lorabot \ + && mkdir -p /data /etc/lorabot \ + && chown lorabot:lorabot /data WORKDIR /app @@ -32,8 +32,8 @@ COPY --from=builder /wheels/*.whl /tmp/wheels/ RUN pip install --no-cache-dir /tmp/wheels/*.whl \ && rm -rf /tmp/wheels -USER meshbot +USER lorabot -VOLUME ["/data", "/etc/meshbot"] +VOLUME ["/data", "/etc/lorabot"] -ENTRYPOINT ["meshbot"] +ENTRYPOINT ["lorabot"] diff --git a/README.md b/README.md index 43574b5..7cd5bf4 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# meshbot +# lorabot Bridges a [MeshCore](https://meshcore.io) companion radio to any OpenAI-compatible LLM endpoint (e.g. `llama-server`, vLLM, Ollama). Listens for direct messages on the device, runs each @@ -14,19 +14,19 @@ pip install -e . cp config.example.toml config.toml # edit serial_port and [llm] in config.toml -python -m meshbot +python -m lorabot ``` -Config file path defaults to `./config.toml` and can be overridden with `MESHBOT_CONFIG`. -Any field can be overridden via env vars, e.g. `MESHBOT_LLM__API_KEY=sk-...`. +Config file path defaults to `./config.toml` and can be overridden with `LORABOT_CONFIG`. +Any field can be overridden via env vars, e.g. `LORABOT_LLM__API_KEY=sk-...`. ## Layout -- `src/meshbot/bot.py` — connect, subscribe to `CONTACT_MSG_RECV`, dispatch each DM. -- `src/meshbot/db.py` — SQLite schema and per-conversation repo functions. -- `src/meshbot/llm.py` — `AsyncOpenAI` wrapper. -- `src/meshbot/messages.py` — UTF-8-safe byte-length trimming. -- `src/meshbot/config.py` — TOML + env-var settings (pydantic-settings). +- `src/lorabot/bot.py` — connect, subscribe to `CONTACT_MSG_RECV`, dispatch each DM. +- `src/lorabot/db.py` — SQLite schema and per-conversation repo functions. +- `src/lorabot/llm.py` — `AsyncOpenAI` wrapper. +- `src/lorabot/messages.py` — UTF-8-safe byte-length trimming. +- `src/lorabot/config.py` — TOML + env-var settings (pydantic-settings). ## Docker @@ -34,23 +34,23 @@ Build and push a multi-arch image (`linux/amd64` + `linux/arm64`): ```sh docker login registry.example.com # once -export MESHBOT_IMAGE=registry.example.com/team/meshbot +export LORABOT_IMAGE=registry.example.com/team/lorabot ./scripts/build-and-push.sh # tags: latest + EXTRA_TAGS="v0.1.0" ./scripts/build-and-push.sh # add explicit version PUSH=0 PLATFORMS=linux/amd64 ./scripts/build-and-push.sh # local load only ``` -Run via compose (set `MESHBOT_IMAGE`, `MESHBOT_LLM_BASE_URL`, `MESHBOT_LLM_MODEL`, -optionally `MESHBOT_DEVICE`): +Run via compose (set `LORABOT_IMAGE`, `LORABOT_LLM_BASE_URL`, `LORABOT_LLM_MODEL`, +optionally `LORABOT_DEVICE`): ```sh -export MESHBOT_IMAGE=registry.example.com/team/meshbot:latest -export MESHBOT_LLM_BASE_URL=http://llama:8080/v1 -export MESHBOT_LLM_MODEL=llama-3.1-8b-instruct -export MESHBOT_DEVICE=/dev/ttyUSB0 +export LORABOT_IMAGE=registry.example.com/team/lorabot:latest +export LORABOT_LLM_BASE_URL=http://llama:8080/v1 +export LORABOT_LLM_MODEL=llama-3.1-8b-instruct +export LORABOT_DEVICE=/dev/ttyUSB0 docker compose up -d ``` -The container expects `config.toml` mounted at `/etc/meshbot/config.toml` and +The container expects `config.toml` mounted at `/etc/lorabot/config.toml` and persists SQLite to a named volume at `/data`. Any field can still be overridden -via `MESHBOT_
__` env vars. +via `LORABOT_
__` env vars. diff --git a/config.example.toml b/config.example.toml index 1234447..91d1f47 100644 --- a/config.example.toml +++ b/config.example.toml @@ -1,8 +1,8 @@ # Copy this file to `config.toml` and edit. The path can be overridden with -# the MESHBOT_CONFIG environment variable. Any field can be overridden with -# environment variables of the form MESHBOT_
__, e.g. -# MESHBOT_LLM__BASE_URL=http://llama:8080/v1 -# MESHBOT_MESHCORE__SERIAL_PORT=/dev/ttyACM0 +# the LORABOT_CONFIG environment variable. Any field can be overridden with +# environment variables of the form LORABOT_
__, e.g. +# LORABOT_LLM__BASE_URL=http://llama:8080/v1 +# LORABOT_MESHCORE__SERIAL_PORT=/dev/ttyACM0 [meshcore] serial_port = "/dev/ttyUSB0" @@ -17,9 +17,25 @@ temperature = 0.7 request_timeout_seconds = 60 [storage] -sqlite_path = "data/meshbot.db" +sqlite_path = "data/lorabot.db" [message] # MeshCore MAX_PACKET_PAYLOAD is 184 bytes. Lower this if your text-frame # headers further constrain the usable payload on your device. max_bytes = 184 + +[web] +# Built-in read-only web UI: stored conversations + live status. +enabled = true +host = "0.0.0.0" +port = 8080 + +[advertise] +# MeshCore companions don't advertise on their own. Lorabot can do it for them +# at a fixed cadence so the node stays visible on the mesh. Set +# interval_seconds = 0 to disable auto-advert (the web UI button still works). +enabled = true +interval_seconds = 3600 +at_startup = true +# Flood = multi-hop advert across the mesh. False = zero-hop (neighbors only). +flood = false diff --git a/docker-compose.yml b/docker-compose.yml index 9df0eff..19537e8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,25 +1,28 @@ services: - meshbot: - image: ${MESHBOT_IMAGE:?set MESHBOT_IMAGE to your image reference} - container_name: meshbot + lorabot: + image: ${LORABOT_IMAGE:?set LORABOT_IMAGE to your image reference} + container_name: lorabot restart: unless-stopped # MeshCore companion is on a USB serial port. Map the host device through to - # the container. Override MESHBOT_DEVICE for ttyACM0 etc. + # the container. Override LORABOT_DEVICE for ttyACM0 etc. devices: - - "${MESHBOT_DEVICE:-/dev/ttyUSB0}:${MESHBOT_DEVICE:-/dev/ttyUSB0}" + - "${LORABOT_DEVICE:-/dev/ttyUSB0}:${LORABOT_DEVICE:-/dev/ttyUSB0}" # Some serial chipsets need access to the dialout group on the host. group_add: - dialout environment: - MESHBOT_MESHCORE__SERIAL_PORT: ${MESHBOT_DEVICE:-/dev/ttyUSB0} - MESHBOT_LLM__BASE_URL: ${MESHBOT_LLM_BASE_URL:?set MESHBOT_LLM_BASE_URL} - MESHBOT_LLM__API_KEY: ${MESHBOT_LLM_API_KEY:-not-needed} - MESHBOT_LLM__MODEL: ${MESHBOT_LLM_MODEL:?set MESHBOT_LLM_MODEL} + LORABOT_MESHCORE__SERIAL_PORT: ${LORABOT_DEVICE:-/dev/ttyUSB0} + LORABOT_LLM__BASE_URL: ${LORABOT_LLM_BASE_URL:?set LORABOT_LLM_BASE_URL} + LORABOT_LLM__API_KEY: ${LORABOT_LLM_API_KEY:-not-needed} + LORABOT_LLM__MODEL: ${LORABOT_LLM_MODEL:?set LORABOT_LLM_MODEL} + ports: + # Built-in read-only web UI. Override via LORABOT_WEB_PORT. + - "${LORABOT_WEB_PORT:-8080}:8080" volumes: - - meshbot-data:/data - # Mount your config.toml at /etc/meshbot/config.toml. Anything not set in the + - lorabot-data:/data + # Mount your config.toml at /etc/lorabot/config.toml. Anything not set in the # TOML will fall back to defaults; env vars above always win. - - ./config.toml:/etc/meshbot/config.toml:ro + - ./config.toml:/etc/lorabot/config.toml:ro volumes: - meshbot-data: + lorabot-data: diff --git a/pyproject.toml b/pyproject.toml index 7bb41d2..0edb3f4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,7 +3,7 @@ requires = ["setuptools>=68", "wheel"] build-backend = "setuptools.build_meta" [project] -name = "meshbot" +name = "lorabot" version = "0.1.0" description = "Bridge a MeshCore companion radio to an OpenAI-compatible LLM endpoint." readme = "README.md" @@ -11,6 +11,7 @@ requires-python = ">=3.11" license = { text = "MIT" } authors = [{ name = "Tobias Huttinger" }] dependencies = [ + "aiohttp>=3.9", "meshcore>=2.3", "openai>=1.40", "pydantic>=2.7", @@ -24,7 +25,7 @@ dev = [ ] [project.scripts] -meshbot = "meshbot.__main__:_cli" +lorabot = "lorabot.__main__:_cli" [tool.setuptools.packages.find] where = ["src"] diff --git a/scripts/build-and-push.sh b/scripts/build-and-push.sh index 1505bed..4e21f3c 100755 --- a/scripts/build-and-push.sh +++ b/scripts/build-and-push.sh @@ -2,25 +2,43 @@ # Build a multi-arch (linux/amd64, linux/arm64) image and push it to a registry. # # Required: -# MESHBOT_IMAGE Full image reference, e.g. registry.example.com/team/meshbot +# LORABOT_IMAGE Full image reference, e.g. registry.example.com/team/lorabot # # Optional: # PLATFORMS Comma-separated arch list (default: linux/amd64,linux/arm64) # EXTRA_TAGS Space-separated additional tags to push (e.g. "stable v0.1.0") # PUSH "1" (default) to push, "0" to build and load locally only # (note: --load only works with a single platform) -# BUILDER buildx builder name (default: meshbot-builder, auto-created) +# BUILDER buildx builder name (default: lorabot-builder, auto-created) set -euo pipefail -if [[ -z "${MESHBOT_IMAGE:-}" ]]; then - echo "error: MESHBOT_IMAGE is required (e.g. registry.example.com/team/meshbot)" >&2 +if [[ -z "${LORABOT_IMAGE:-}" ]]; then + echo "error: LORABOT_IMAGE is required (e.g. registry.example.com/team/lorabot)" >&2 + exit 1 +fi + +# Preflight: buildx must be installed (it's a separate package on Arch / some +# minimal distros). If it's missing, `docker buildx ...` is parsed by docker as +# `docker ...` with unknown flags, producing a confusing "unknown flag" error. +if ! docker buildx version >/dev/null 2>&1; then + echo "error: docker buildx is not available." >&2 + echo " Arch: sudo pacman -S docker-buildx" >&2 + echo " Debian/Ubu: sudo apt install docker-buildx-plugin" >&2 + echo " Other: https://github.com/docker/buildx#installing" >&2 + exit 1 +fi + +# Preflight: daemon reachable. +if ! docker info >/dev/null 2>&1; then + echo "error: cannot reach the docker daemon (is it running? are you in the 'docker' group?)" >&2 + echo " Arch: sudo systemctl start docker" >&2 exit 1 fi PLATFORMS="${PLATFORMS:-linux/amd64,linux/arm64}" PUSH="${PUSH:-1}" -BUILDER="${BUILDER:-meshbot-builder}" +BUILDER="${BUILDER:-lorabot-builder}" cd "$(dirname "$0")/.." @@ -34,9 +52,9 @@ else GIT_SHA="dev" fi -TAGS=(--tag "${MESHBOT_IMAGE}:latest" --tag "${MESHBOT_IMAGE}:${GIT_SHA}") +TAGS=(--tag "${LORABOT_IMAGE}:latest" --tag "${LORABOT_IMAGE}:${GIT_SHA}") for t in ${EXTRA_TAGS:-}; do - TAGS+=(--tag "${MESHBOT_IMAGE}:${t}") + TAGS+=(--tag "${LORABOT_IMAGE}:${t}") done # Make sure a buildx builder exists. Reuse if it's already there. @@ -51,7 +69,7 @@ docker buildx inspect --bootstrap >/dev/null OUTPUT_FLAG=() if [[ "${PUSH}" == "1" ]]; then OUTPUT_FLAG=(--push) - echo ">>> building & pushing ${MESHBOT_IMAGE} (${PLATFORMS}) tags: latest, ${GIT_SHA}${EXTRA_TAGS:+, $EXTRA_TAGS}" + echo ">>> building & pushing ${LORABOT_IMAGE} (${PLATFORMS}) tags: latest, ${GIT_SHA}${EXTRA_TAGS:+, $EXTRA_TAGS}" else # --load only works with a single platform; warn if user requested multi. if [[ "${PLATFORMS}" == *,* ]]; then @@ -59,7 +77,7 @@ else exit 1 fi OUTPUT_FLAG=(--load) - echo ">>> building ${MESHBOT_IMAGE} for ${PLATFORMS} (local load, no push)" + echo ">>> building ${LORABOT_IMAGE} for ${PLATFORMS} (local load, no push)" fi docker buildx build \ diff --git a/src/lorabot/__init__.py b/src/lorabot/__init__.py new file mode 100644 index 0000000..fedeee9 --- /dev/null +++ b/src/lorabot/__init__.py @@ -0,0 +1,3 @@ +"""lorabot — MeshCore ↔ LLM bridge.""" + +__version__ = "0.1.0" diff --git a/src/meshbot/__main__.py b/src/lorabot/__main__.py similarity index 85% rename from src/meshbot/__main__.py rename to src/lorabot/__main__.py index d7e977e..4a74da0 100644 --- a/src/meshbot/__main__.py +++ b/src/lorabot/__main__.py @@ -1,4 +1,4 @@ -"""Entry point: ``python -m meshbot`` and the ``meshbot`` console script.""" +"""Entry point: ``python -m lorabot`` and the ``lorabot`` console script.""" from __future__ import annotations diff --git a/src/lorabot/bot.py b/src/lorabot/bot.py new file mode 100644 index 0000000..2b4bd22 --- /dev/null +++ b/src/lorabot/bot.py @@ -0,0 +1,230 @@ +"""Main run loop: connect to the MeshCore device, route DMs through the LLM, reply.""" + +from __future__ import annotations + +import asyncio +import logging +from collections import defaultdict +from datetime import datetime, timezone + +from meshcore import EventType, MeshCore + +from . import db, web +from .config import Settings +from .llm import LLMClient +from .messages import split_to_bytes, trim_to_bytes + +log = logging.getLogger("lorabot") + + +def _now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +async def run() -> None: + cfg = Settings() + + db_conn = db.connect(cfg.storage.sqlite_path) + state = web.AppState( + db_conn, + model=cfg.llm.model, + serial_port=cfg.meshcore.serial_port, + loop=asyncio.get_running_loop(), + ) + + llm = LLMClient( + base_url=cfg.llm.base_url, + api_key=cfg.llm.api_key, + model=cfg.llm.model, + system_prompt=cfg.llm.system_prompt, + temperature=cfg.llm.temperature, + timeout=cfg.llm.request_timeout_seconds, + ) + + web_task: asyncio.Task | None = None + if cfg.web.enabled: + web_task = asyncio.create_task(web.serve(state, cfg.web.host, cfg.web.port)) + + log.info("connecting to MeshCore on %s @ %d baud", cfg.meshcore.serial_port, cfg.meshcore.baud_rate) + try: + mc = await MeshCore.create_serial(cfg.meshcore.serial_port, cfg.meshcore.baud_rate) + # Default is False: the lib only marks contacts dirty on ADVERTISEMENT and + # waits for the caller to re-pull. Turn it on so a fresh advert is reflected + # in the local cache before the peer's first DM lands. + mc.auto_update_contacts = True + await mc.ensure_contacts() + except BaseException: + state.set_connected(False) + if web_task is not None: + web_task.cancel() + try: + await web_task + except (asyncio.CancelledError, Exception): + pass + await llm.aclose() + db_conn.close() + raise + state.set_connected(True, node_name=_self_name(mc)) + + async def _advertise() -> bool: + result = await mc.commands.send_advert(flood=cfg.advertise.flood) + ok = result.type != EventType.ERROR + if ok: + log.info("advert sent") + else: + log.warning("advert failed: %s", result.payload) + return ok + + state.set_advertise_callback(_advertise) + + advert_task: asyncio.Task | None = None + if cfg.advertise.enabled: + advert_task = asyncio.create_task( + _advert_loop(state, cfg.advertise.interval_seconds, cfg.advertise.at_startup) + ) + + # One lock per sender so a burst of messages from the same peer is processed + # serially while different peers stay independent. + locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock) + + contacts_lock = asyncio.Lock() + + async def _resolve_contact(prefix: str): + """Look up a contact by pubkey prefix. Re-pulls contacts from the device on miss.""" + contact = mc.get_contact_by_key_prefix(prefix) + if contact is not None: + return contact + async with contacts_lock: + contact = mc.get_contact_by_key_prefix(prefix) + if contact is not None: + return contact + try: + await mc.commands.get_contacts() + except Exception: + log.exception("get_contacts refresh failed") + return None + return mc.get_contact_by_key_prefix(prefix) + + async def on_dm(event) -> None: + data = event.payload or {} + prefix = data.get("pubkey_prefix") + text = (data.get("text") or "").strip() + if not prefix or not text: + return + + contact = await _resolve_contact(prefix) + if contact is None: + log.info("ignoring DM from unknown sender %s (no contact after refresh)", prefix) + return + + public_key = contact["public_key"] + contact_name = contact.get("adv_name", "") + log.info("DM from %s (%s): %s", contact_name, public_key[:12], text) + + async with locks[public_key]: + db.upsert_conversation(db_conn, public_key, contact_name) + db.add_message(db_conn, public_key, "user", text) + state.publish("message", { + "public_key": public_key, + "contact_name": contact_name, + "role": "user", + "content": text, + "created_at": _now_iso(), + }) + + if text.strip().lower() == "/clear": + reply = "history cleared." + db.add_message(db_conn, public_key, "assistant", reply) + db.clear_history(db_conn, public_key) + state.publish("message", { + "public_key": public_key, + "contact_name": contact_name, + "role": "assistant", + "content": reply, + "created_at": _now_iso(), + }) + outgoing = trim_to_bytes(reply, cfg.message.max_bytes) + log.info("/clear from %s — context reset", public_key[:12]) + result = await mc.commands.send_msg(contact, outgoing) + if result.type == EventType.ERROR: + log.error("send_msg failed for %s: %s", public_key[:12], result.payload) + return + + history = db.get_history(db_conn, public_key) + + try: + reply = await llm.reply(history) + except Exception: + log.exception("LLM call failed for %s", public_key[:12]) + return + + chunks = split_to_bytes(reply, cfg.message.max_bytes, max_chunks=2) + delivered = "".join(chunks) + db.add_message(db_conn, public_key, "assistant", delivered) + state.publish("message", { + "public_key": public_key, + "contact_name": contact_name, + "role": "assistant", + "content": delivered, + "created_at": _now_iso(), + }) + dropped = len(reply.encode("utf-8")) - len(delivered.encode("utf-8")) + if dropped > 0: + log.info("reply to %s split into %d chunks, dropped %d trailing bytes", + public_key[:12], len(chunks), dropped) + + for i, chunk in enumerate(chunks, 1): + log.info("reply to %s (%d/%d, %d bytes): %s", + public_key[:12], i, len(chunks), len(chunk.encode("utf-8")), chunk) + result = await mc.commands.send_msg(contact, chunk) + if result.type == EventType.ERROR: + log.error("send_msg failed for %s chunk %d/%d: %s", + public_key[:12], i, len(chunks), result.payload) + break + + sub = mc.subscribe(EventType.CONTACT_MSG_RECV, on_dm) + await mc.start_auto_message_fetching() + log.info("lorabot listening on %s", cfg.meshcore.serial_port) + + try: + await asyncio.Event().wait() + finally: + state.set_connected(False) + state.set_advertise_callback(None) + mc.unsubscribe(sub) + await mc.stop_auto_message_fetching() + await mc.disconnect() + await llm.aclose() + for task in (advert_task, web_task): + if task is not None: + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + db_conn.close() + + +async def _advert_loop(state: web.AppState, interval: int, at_startup: bool) -> None: + """Periodically trigger adverts. ``interval == 0`` disables auto-repeat.""" + if at_startup: + await state.trigger_advertise() + if interval <= 0: + return + while True: + await asyncio.sleep(interval) + await state.trigger_advertise() + + +def _self_name(mc: MeshCore) -> str | None: + """Best-effort lookup of the local node's advertised name. Never raises.""" + for attr in ("self_info", "node_info", "device_info"): + try: + info = getattr(mc, attr, None) + except Exception: + continue + if isinstance(info, dict): + name = info.get("adv_name") or info.get("name") + if name: + return str(name) + return None diff --git a/src/meshbot/config.py b/src/lorabot/config.py similarity index 71% rename from src/meshbot/config.py rename to src/lorabot/config.py index 408c3e4..2d607eb 100644 --- a/src/meshbot/config.py +++ b/src/lorabot/config.py @@ -32,15 +32,31 @@ class LLMCfg(BaseModel): class StorageCfg(BaseModel): - sqlite_path: Path = Path("data/meshbot.db") + sqlite_path: Path = Path("data/lorabot.db") class MessageCfg(BaseModel): max_bytes: int = Field(default=184, gt=0) +class WebCfg(BaseModel): + enabled: bool = True + host: str = "0.0.0.0" + port: int = Field(default=8080, gt=0, lt=65536) + + +class AdvertiseCfg(BaseModel): + enabled: bool = True + # Seconds between automatic adverts. 0 = manual only (button still works). + interval_seconds: int = Field(default=3600, ge=0) + # Send one advert as soon as the device is connected. + at_startup: bool = True + # Flood the mesh (multi-hop) instead of a zero-hop advert. + flood: bool = False + + def _toml_path() -> Path: - return Path(os.environ.get("MESHBOT_CONFIG", "config.toml")) + return Path(os.environ.get("LORABOT_CONFIG", "config.toml")) class Settings(BaseSettings): @@ -48,9 +64,11 @@ class Settings(BaseSettings): llm: LLMCfg storage: StorageCfg = StorageCfg() message: MessageCfg = MessageCfg() + web: WebCfg = WebCfg() + advertise: AdvertiseCfg = AdvertiseCfg() model_config = SettingsConfigDict( - env_prefix="MESHBOT_", + env_prefix="LORABOT_", env_nested_delimiter="__", toml_file=_toml_path(), extra="ignore", diff --git a/src/meshbot/db.py b/src/lorabot/db.py similarity index 55% rename from src/meshbot/db.py rename to src/lorabot/db.py index 33b0c36..d3a23b2 100644 --- a/src/meshbot/db.py +++ b/src/lorabot/db.py @@ -7,10 +7,11 @@ from pathlib import Path SCHEMA = """ CREATE TABLE IF NOT EXISTS conversations ( - public_key TEXT PRIMARY KEY, - contact_name TEXT, - created_at TEXT NOT NULL DEFAULT (datetime('now')), - updated_at TEXT NOT NULL DEFAULT (datetime('now')) + public_key TEXT PRIMARY KEY, + contact_name TEXT, + cleared_at_id INTEGER NOT NULL DEFAULT 0, + created_at TEXT NOT NULL DEFAULT (datetime('now')), + updated_at TEXT NOT NULL DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS messages ( @@ -35,9 +36,17 @@ def connect(path: str | Path) -> sqlite3.Connection: conn.execute("PRAGMA journal_mode = WAL;") conn.execute("PRAGMA foreign_keys = ON;") conn.executescript(SCHEMA) + _migrate(conn) return conn +def _migrate(conn: sqlite3.Connection) -> None: + """Apply additive migrations for DBs created before later columns existed.""" + cols = {row["name"] for row in conn.execute("PRAGMA table_info(conversations)")} + if "cleared_at_id" not in cols: + conn.execute("ALTER TABLE conversations ADD COLUMN cleared_at_id INTEGER NOT NULL DEFAULT 0") + + def upsert_conversation(conn: sqlite3.Connection, public_key: str, contact_name: str) -> None: conn.execute( """ @@ -59,9 +68,39 @@ def add_message(conn: sqlite3.Connection, public_key: str, role: str, content: s def get_history(conn: sqlite3.Connection, public_key: str) -> list[dict[str, str]]: - """Return the conversation as OpenAI chat messages, oldest first.""" + """Return the conversation as OpenAI chat messages, oldest first. + + Messages with ``id <= conversations.cleared_at_id`` are excluded so a peer can + reset their LLM context with ``/clear`` without losing the audit trail. + """ rows = conn.execute( - "SELECT role, content FROM messages WHERE public_key = ? ORDER BY id ASC", + """ + SELECT m.role, m.content + FROM messages m + JOIN conversations c ON c.public_key = m.public_key + WHERE m.public_key = ? AND m.id > c.cleared_at_id + ORDER BY m.id ASC + """, (public_key,), ).fetchall() return [{"role": row["role"], "content": row["content"]} for row in rows] + + +def clear_history(conn: sqlite3.Connection, public_key: str) -> None: + """Bump the per-conversation watermark to the current max message id. + + Old messages stay in the table for the web UI; future ``get_history`` calls + skip them. + """ + conn.execute( + """ + UPDATE conversations + SET cleared_at_id = COALESCE( + (SELECT MAX(id) FROM messages WHERE public_key = ?), + 0 + ), + updated_at = datetime('now') + WHERE public_key = ? + """, + (public_key, public_key), + ) diff --git a/src/meshbot/llm.py b/src/lorabot/llm.py similarity index 100% rename from src/meshbot/llm.py rename to src/lorabot/llm.py diff --git a/src/lorabot/messages.py b/src/lorabot/messages.py new file mode 100644 index 0000000..84f7cea --- /dev/null +++ b/src/lorabot/messages.py @@ -0,0 +1,52 @@ +"""Helpers for shaping outgoing mesh messages.""" + +from __future__ import annotations + + +def trim_to_bytes(text: str, max_bytes: int) -> str: + """Return ``text`` truncated so its UTF-8 encoding is at most ``max_bytes`` bytes. + + Backs off if the cut lands inside a multi-byte UTF-8 sequence so we never emit + invalid UTF-8 to the radio. + """ + if max_bytes <= 0: + return "" + encoded = text.encode("utf-8") + if len(encoded) <= max_bytes: + return text + cut = encoded[:max_bytes] + # Continuation bytes start with bits 10xxxxxx; rewind past them. + while cut and (cut[-1] & 0xC0) == 0x80: + cut = cut[:-1] + return cut.decode("utf-8", errors="ignore") + + +def split_to_bytes(text: str, max_bytes: int, max_chunks: int = 2) -> list[str]: + """Split ``text`` into up to ``max_chunks`` UTF-8-safe chunks of ``max_bytes`` each. + + Anything past ``max_chunks * max_bytes`` is dropped. Splits never land inside a + multi-byte UTF-8 sequence on either side of the boundary. + """ + if max_bytes <= 0 or max_chunks <= 0 or not text: + return [] + encoded = text.encode("utf-8") + n = len(encoded) + chunks: list[str] = [] + pos = 0 + for _ in range(max_chunks): + if pos >= n: + break + end = min(pos + max_bytes, n) + if end < n: + # Rewind past trailing UTF-8 continuation bytes (10xxxxxx). + while end > pos and (encoded[end - 1] & 0xC0) == 0x80: + end -= 1 + # And past a dangling leader byte (11xxxxxx) whose continuations + # would fall outside the budget. + if end > pos and (encoded[end - 1] & 0xC0) == 0xC0: + end -= 1 + if end == pos: + break + chunks.append(encoded[pos:end].decode("utf-8")) + pos = end + return chunks diff --git a/src/lorabot/web.py b/src/lorabot/web.py new file mode 100644 index 0000000..9f3c016 --- /dev/null +++ b/src/lorabot/web.py @@ -0,0 +1,634 @@ +"""Lightweight aiohttp web UI for browsing stored conversations and live status. + +Endpoints: + GET / single-page UI + GET /api/status device + model + uptime snapshot + GET /api/conversations list of conversations with message counts + GET /api/conversations/{pk} conversation header + ordered messages + GET /api/events Server-Sent Events stream (status, message) +""" + +from __future__ import annotations + +import asyncio +import json +import logging +import sqlite3 +from collections.abc import Awaitable, Callable +from datetime import datetime, timezone + +from aiohttp import web + +AdvertCallback = Callable[[], Awaitable[bool]] + +log = logging.getLogger("lorabot.web") + + +class AppState: + """Shared state between the bot loop and the web server.""" + + def __init__( + self, + db_conn: sqlite3.Connection, + *, + model: str, + serial_port: str, + loop: asyncio.AbstractEventLoop | None = None, + ) -> None: + self.db = db_conn + self.model = model + self.serial_port = serial_port + self.connected = False + self.connected_since: datetime | None = None + self.node_name: str | None = None + self.last_advert_at: datetime | None = None + self.last_advert_ok: bool | None = None + self._subscribers: set[asyncio.Queue] = set() + self._loop = loop + self._advertise: AdvertCallback | None = None + self._advert_lock = asyncio.Lock() + + def status(self) -> dict: + return { + "connected": self.connected, + "connected_since": _iso(self.connected_since), + "node_name": self.node_name, + "model": self.model, + "serial_port": self.serial_port, + "last_advert_at": _iso(self.last_advert_at), + "last_advert_ok": self.last_advert_ok, + "advertise_available": self._advertise is not None, + } + + def set_advertise_callback(self, cb: AdvertCallback | None) -> None: + self._advertise = cb + self.publish("status", self.status()) + + async def trigger_advertise(self) -> bool: + """Run the injected advertise callback. Serialized; never raises.""" + if self._advertise is None: + return False + async with self._advert_lock: + try: + ok = bool(await self._advertise()) + except Exception: + log.exception("advertise callback raised") + ok = False + self.last_advert_at = datetime.now(timezone.utc) + self.last_advert_ok = ok + self.publish("status", self.status()) + return ok + + def set_connected(self, ok: bool, *, node_name: str | None = None) -> None: + if ok and not self.connected: + self.connected_since = datetime.now(timezone.utc) + if not ok: + self.connected_since = None + self.connected = ok + if node_name is not None: + self.node_name = node_name + self.publish("status", self.status()) + + def publish(self, event: str, data: dict) -> None: + """Thread-safe: callable from any thread; dispatch hops to the loop.""" + loop = self._loop + if loop is None: + try: + loop = asyncio.get_running_loop() + except RuntimeError: + return # no loop, nothing to deliver to + self._loop = loop + if loop.is_closed(): + return + try: + running = asyncio.get_running_loop() + except RuntimeError: + running = None + if running is loop: + self._dispatch(event, data) + else: + loop.call_soon_threadsafe(self._dispatch, event, data) + + def _dispatch(self, event: str, data: dict) -> None: + for q in list(self._subscribers): + try: + q.put_nowait((event, data)) + except asyncio.QueueFull: + log.warning("dropping SSE event %s — subscriber backed up", event) + + def subscribe(self) -> asyncio.Queue: + q: asyncio.Queue = asyncio.Queue(maxsize=128) + self._subscribers.add(q) + return q + + def unsubscribe(self, q: asyncio.Queue) -> None: + self._subscribers.discard(q) + + +def _iso(dt: datetime | None) -> str | None: + return dt.isoformat() if dt is not None else None + + +def _list_conversations(db: sqlite3.Connection) -> list[dict]: + rows = db.execute( + """ + SELECT c.public_key, + c.contact_name, + c.updated_at, + (SELECT COUNT(*) FROM messages m WHERE m.public_key = c.public_key) AS message_count, + (SELECT content FROM messages m WHERE m.public_key = c.public_key + ORDER BY m.id DESC LIMIT 1) AS last_message + FROM conversations c + ORDER BY c.updated_at DESC + """ + ).fetchall() + return [dict(r) for r in rows] + + +def _get_conversation(db: sqlite3.Connection, public_key: str) -> dict | None: + conv = db.execute( + "SELECT public_key, contact_name, cleared_at_id, created_at, updated_at " + "FROM conversations WHERE public_key = ?", + (public_key,), + ).fetchone() + if conv is None: + return None + msgs = db.execute( + "SELECT id, role, content, created_at FROM messages " + "WHERE public_key = ? ORDER BY id ASC", + (public_key,), + ).fetchall() + return {"conversation": dict(conv), "messages": [dict(m) for m in msgs]} + + +async def _index(_req: web.Request) -> web.Response: + return web.Response(text=INDEX_HTML, content_type="text/html") + + +async def _api_status(req: web.Request) -> web.Response: + return web.json_response(_state(req).status()) + + +async def _api_conversations(req: web.Request) -> web.Response: + return web.json_response(_list_conversations(_state(req).db)) + + +async def _api_advertise(req: web.Request) -> web.Response: + state = _state(req) + if state._advertise is None: + raise web.HTTPServiceUnavailable(text="device not connected") + ok = await state.trigger_advertise() + return web.json_response({ + "ok": ok, + "last_advert_at": _iso(state.last_advert_at), + }) + + +async def _api_conversation(req: web.Request) -> web.Response: + pk = req.match_info["pk"] + data = _get_conversation(_state(req).db, pk) + if data is None: + raise web.HTTPNotFound(text="unknown public_key") + return web.json_response(data) + + +async def _api_events(req: web.Request) -> web.StreamResponse: + state = _state(req) + resp = web.StreamResponse( + status=200, + headers={ + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", + "Connection": "keep-alive", + }, + ) + await resp.prepare(req) + await _sse_send(resp, "status", state.status()) + + q = state.subscribe() + try: + while True: + try: + event, data = await asyncio.wait_for(q.get(), timeout=15.0) + except asyncio.TimeoutError: + await resp.write(b": keepalive\n\n") + continue + await _sse_send(resp, event, data) + except (asyncio.CancelledError, ConnectionError): + pass + finally: + state.unsubscribe(q) + return resp + + +async def _sse_send(resp: web.StreamResponse, event: str, data: dict) -> None: + await resp.write(f"event: {event}\ndata: {json.dumps(data)}\n\n".encode()) + + +def _state(req: web.Request) -> AppState: + return req.app["state"] + + +def build_app(state: AppState) -> web.Application: + app = web.Application() + app["state"] = state + app.router.add_get("/", _index) + app.router.add_get("/api/status", _api_status) + app.router.add_get("/api/conversations", _api_conversations) + app.router.add_get("/api/conversations/{pk}", _api_conversation) + app.router.add_get("/api/events", _api_events) + app.router.add_post("/api/advertise", _api_advertise) + return app + + +async def serve(state: AppState, host: str, port: int) -> None: + """Run the web server until cancelled.""" + state._loop = asyncio.get_running_loop() + app = build_app(state) + runner = web.AppRunner(app, access_log=None) + await runner.setup() + site = web.TCPSite(runner, host, port) + await site.start() + log.info("web UI on http://%s:%d", host, port) + try: + await asyncio.Event().wait() + finally: + await runner.cleanup() + + +INDEX_HTML = r""" + + + + +lorabot + + + +
+ lorabot + + + port: + model: + node: + advert: + + +
+
+ +
+
— no conversation selected —
+
select a conversation on the left
+
+
+
+ stream: connecting + +
+ + + +""" diff --git a/src/meshbot/__init__.py b/src/meshbot/__init__.py deleted file mode 100644 index a5486bb..0000000 --- a/src/meshbot/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -"""meshbot — MeshCore ↔ LLM bridge.""" - -__version__ = "0.1.0" diff --git a/src/meshbot/bot.py b/src/meshbot/bot.py deleted file mode 100644 index 30ea231..0000000 --- a/src/meshbot/bot.py +++ /dev/null @@ -1,86 +0,0 @@ -"""Main run loop: connect to the MeshCore device, route DMs through the LLM, reply.""" - -from __future__ import annotations - -import asyncio -import logging -from collections import defaultdict - -from meshcore import EventType, MeshCore - -from . import db -from .config import Settings -from .llm import LLMClient -from .messages import trim_to_bytes - -log = logging.getLogger("meshbot") - - -async def run() -> None: - cfg = Settings() - - db_conn = db.connect(cfg.storage.sqlite_path) - llm = LLMClient( - base_url=cfg.llm.base_url, - api_key=cfg.llm.api_key, - model=cfg.llm.model, - system_prompt=cfg.llm.system_prompt, - temperature=cfg.llm.temperature, - timeout=cfg.llm.request_timeout_seconds, - ) - - log.info("connecting to MeshCore on %s @ %d baud", cfg.meshcore.serial_port, cfg.meshcore.baud_rate) - mc = await MeshCore.create_serial(cfg.meshcore.serial_port, cfg.meshcore.baud_rate) - await mc.ensure_contacts() - - # One lock per sender so a burst of messages from the same peer is processed - # serially while different peers stay independent. - locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock) - - async def on_dm(event) -> None: - data = event.payload or {} - prefix = data.get("pubkey_prefix") - text = (data.get("text") or "").strip() - if not prefix or not text: - return - - contact = mc.get_contact_by_key_prefix(prefix) - if contact is None: - log.info("ignoring DM from unknown sender %s", prefix) - return - - public_key = contact["public_key"] - contact_name = contact.get("adv_name", "") - log.info("DM from %s (%s): %s", contact_name, public_key[:12], text) - - async with locks[public_key]: - db.upsert_conversation(db_conn, public_key, contact_name) - db.add_message(db_conn, public_key, "user", text) - history = db.get_history(db_conn, public_key) - - try: - reply = await llm.reply(history) - except Exception: - log.exception("LLM call failed for %s", public_key[:12]) - return - - db.add_message(db_conn, public_key, "assistant", reply) - outgoing = trim_to_bytes(reply, cfg.message.max_bytes) - log.info("reply to %s (%d bytes): %s", public_key[:12], len(outgoing.encode("utf-8")), outgoing) - - result = await mc.commands.send_msg(contact, outgoing) - if result.type == EventType.ERROR: - log.error("send_msg failed for %s: %s", public_key[:12], result.payload) - - sub = mc.subscribe(EventType.CONTACT_MSG_RECV, on_dm) - await mc.start_auto_message_fetching() - log.info("meshbot listening on %s", cfg.meshcore.serial_port) - - try: - await asyncio.Event().wait() - finally: - mc.unsubscribe(sub) - await mc.stop_auto_message_fetching() - await mc.disconnect() - await llm.aclose() - db_conn.close() diff --git a/src/meshbot/messages.py b/src/meshbot/messages.py deleted file mode 100644 index 196b8be..0000000 --- a/src/meshbot/messages.py +++ /dev/null @@ -1,21 +0,0 @@ -"""Helpers for shaping outgoing mesh messages.""" - -from __future__ import annotations - - -def trim_to_bytes(text: str, max_bytes: int) -> str: - """Return ``text`` truncated so its UTF-8 encoding is at most ``max_bytes`` bytes. - - Backs off if the cut lands inside a multi-byte UTF-8 sequence so we never emit - invalid UTF-8 to the radio. - """ - if max_bytes <= 0: - return "" - encoded = text.encode("utf-8") - if len(encoded) <= max_bytes: - return text - cut = encoded[:max_bytes] - # Continuation bytes start with bits 10xxxxxx; rewind past them. - while cut and (cut[-1] & 0xC0) == 0x80: - cut = cut[:-1] - return cut.decode("utf-8", errors="ignore") diff --git a/tests/test_messages.py b/tests/test_messages.py index 715f4d1..dc63547 100644 --- a/tests/test_messages.py +++ b/tests/test_messages.py @@ -1,4 +1,4 @@ -from meshbot.messages import trim_to_bytes +from lorabot.messages import split_to_bytes, trim_to_bytes def test_short_ascii_passthrough(): @@ -38,3 +38,61 @@ def test_zero_or_negative_max_bytes(): def test_empty_input(): assert trim_to_bytes("", 184) == "" + + +# split_to_bytes + + +def test_split_short_input_single_chunk(): + assert split_to_bytes("hello", 184) == ["hello"] + + +def test_split_long_input_two_chunks_drops_rest(): + s = "x" * 500 + chunks = split_to_bytes(s, 180, max_chunks=2) + assert chunks == ["x" * 180, "x" * 180] + assert sum(len(c.encode("utf-8")) for c in chunks) == 360 + + +def test_split_exact_two_chunks_no_third(): + s = "x" * 360 + chunks = split_to_bytes(s, 180, max_chunks=2) + assert chunks == ["x" * 180, "x" * 180] + + +def test_split_does_not_break_multibyte(): + # 4 emoji × 4 bytes = 16 bytes total. Budget 5 bytes/chunk → 1 emoji per chunk. + chunks = split_to_bytes("🎉🎉🎉🎉", 5, max_chunks=2) + assert chunks == ["🎉", "🎉"] + for c in chunks: + assert len(c.encode("utf-8")) == 4 + + +def test_split_two_byte_char_at_boundary(): + # "abäcd" → bytes: a b ä(2) c d = 6 bytes. Budget 3/chunk: + # chunk1 must end at "ab" (3rd byte is start of ä, can't include without continuation). + # chunk2: "äc" = 3 bytes. + chunks = split_to_bytes("abäcd", 3, max_chunks=2) + assert chunks[0] == "ab" + assert chunks[1] == "äc" + # "d" is dropped (over the budget). + + +def test_split_empty_input(): + assert split_to_bytes("", 184) == [] + + +def test_split_zero_max_bytes(): + assert split_to_bytes("hi", 0) == [] + + +def test_split_zero_chunks(): + assert split_to_bytes("hi", 184, max_chunks=0) == [] + + +def test_split_concat_is_prefix_of_input(): + # The delivered text must always be a prefix of the original (no rearrangement). + src = "Hello world! 🎉 This is a longer message that should be split." + chunks = split_to_bytes(src, 20, max_chunks=2) + delivered = "".join(chunks) + assert src.startswith(delivered)