Compare commits

...

6 Commits

29 changed files with 2483 additions and 369 deletions
+8 -8
View File
@@ -19,12 +19,12 @@ FROM python:3.12-slim
ENV PYTHONDONTWRITEBYTECODE=1 \ ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \ PYTHONUNBUFFERED=1 \
MESHBOT_CONFIG=/etc/meshbot/config.toml \ LORABOT_CONFIG=/etc/lorabot/config.toml \
MESHBOT_STORAGE__SQLITE_PATH=/data/meshbot.db LORABOT_STORAGE__SQLITE_PATH=/data/lorabot.db
RUN useradd --system --home /app --shell /usr/sbin/nologin meshbot \ RUN useradd --system --home /app --shell /usr/sbin/nologin lorabot \
&& mkdir -p /data /etc/meshbot \ && mkdir -p /data /etc/lorabot \
&& chown meshbot:meshbot /data && chown lorabot:lorabot /data
WORKDIR /app WORKDIR /app
@@ -32,8 +32,8 @@ COPY --from=builder /wheels/*.whl /tmp/wheels/
RUN pip install --no-cache-dir /tmp/wheels/*.whl \ RUN pip install --no-cache-dir /tmp/wheels/*.whl \
&& rm -rf /tmp/wheels && rm -rf /tmp/wheels
USER meshbot USER lorabot
VOLUME ["/data", "/etc/meshbot"] VOLUME ["/data", "/etc/lorabot"]
ENTRYPOINT ["meshbot"] ENTRYPOINT ["lorabot"]
+18 -18
View File
@@ -1,4 +1,4 @@
# meshbot # lorabot
Bridges a [MeshCore](https://meshcore.io) companion radio to any OpenAI-compatible LLM endpoint Bridges a [MeshCore](https://meshcore.io) companion radio to any OpenAI-compatible LLM endpoint
(e.g. `llama-server`, vLLM, Ollama). Listens for direct messages on the device, runs each (e.g. `llama-server`, vLLM, Ollama). Listens for direct messages on the device, runs each
@@ -14,19 +14,19 @@ pip install -e .
cp config.example.toml config.toml cp config.example.toml config.toml
# edit serial_port and [llm] in config.toml # edit serial_port and [llm] in config.toml
python -m meshbot python -m lorabot
``` ```
Config file path defaults to `./config.toml` and can be overridden with `MESHBOT_CONFIG`. Config file path defaults to `./config.toml` and can be overridden with `LORABOT_CONFIG`.
Any field can be overridden via env vars, e.g. `MESHBOT_LLM__API_KEY=sk-...`. Any field can be overridden via env vars, e.g. `LORABOT_LLM__API_KEY=sk-...`.
## Layout ## Layout
- `src/meshbot/bot.py` — connect, subscribe to `CONTACT_MSG_RECV`, dispatch each DM. - `src/lorabot/bot.py` — connect, subscribe to `CONTACT_MSG_RECV`, dispatch each DM.
- `src/meshbot/db.py` — SQLite schema and per-conversation repo functions. - `src/lorabot/db.py` — SQLite schema and per-conversation repo functions.
- `src/meshbot/llm.py``AsyncOpenAI` wrapper. - `src/lorabot/llm.py``AsyncOpenAI` wrapper.
- `src/meshbot/messages.py` — UTF-8-safe byte-length trimming. - `src/lorabot/messages.py` — UTF-8-safe byte-length trimming.
- `src/meshbot/config.py` — TOML + env-var settings (pydantic-settings). - `src/lorabot/config.py` — TOML + env-var settings (pydantic-settings).
## Docker ## Docker
@@ -34,23 +34,23 @@ Build and push a multi-arch image (`linux/amd64` + `linux/arm64`):
```sh ```sh
docker login registry.example.com # once docker login registry.example.com # once
export MESHBOT_IMAGE=registry.example.com/team/meshbot export LORABOT_IMAGE=registry.example.com/team/lorabot
./scripts/build-and-push.sh # tags: latest + <git sha> ./scripts/build-and-push.sh # tags: latest + <git sha>
EXTRA_TAGS="v0.1.0" ./scripts/build-and-push.sh # add explicit version EXTRA_TAGS="v0.1.0" ./scripts/build-and-push.sh # add explicit version
PUSH=0 PLATFORMS=linux/amd64 ./scripts/build-and-push.sh # local load only PUSH=0 PLATFORMS=linux/amd64 ./scripts/build-and-push.sh # local load only
``` ```
Run via compose (set `MESHBOT_IMAGE`, `MESHBOT_LLM_BASE_URL`, `MESHBOT_LLM_MODEL`, Run via compose (set `LORABOT_IMAGE`, `LORABOT_LLM_BASE_URL`, `LORABOT_LLM_MODEL`,
optionally `MESHBOT_DEVICE`): optionally `LORABOT_DEVICE`):
```sh ```sh
export MESHBOT_IMAGE=registry.example.com/team/meshbot:latest export LORABOT_IMAGE=registry.example.com/team/lorabot:latest
export MESHBOT_LLM_BASE_URL=http://llama:8080/v1 export LORABOT_LLM_BASE_URL=http://llama:8080/v1
export MESHBOT_LLM_MODEL=llama-3.1-8b-instruct export LORABOT_LLM_MODEL=llama-3.1-8b-instruct
export MESHBOT_DEVICE=/dev/ttyUSB0 export LORABOT_DEVICE=/dev/ttyUSB0
docker compose up -d docker compose up -d
``` ```
The container expects `config.toml` mounted at `/etc/meshbot/config.toml` and The container expects `config.toml` mounted at `/etc/lorabot/config.toml` and
persists SQLite to a named volume at `/data`. Any field can still be overridden persists SQLite to a named volume at `/data`. Any field can still be overridden
via `MESHBOT_<SECTION>__<KEY>` env vars. via `LORABOT_<SECTION>__<KEY>` env vars.
+43 -6
View File
@@ -1,8 +1,13 @@
# Copy this file to `config.toml` and edit. The path can be overridden with # Copy this file to `config.toml` and edit. The path can be overridden with
# the MESHBOT_CONFIG environment variable. Any field can be overridden with # the LORABOT_CONFIG environment variable. Any field can be overridden with
# environment variables of the form MESHBOT_<SECTION>__<KEY>, e.g. # environment variables of the form LORABOT_<SECTION>__<KEY>, e.g.
# MESHBOT_LLM__BASE_URL=http://llama:8080/v1 # LORABOT_LLM__BASE_URL=http://llama:8080/v1
# MESHBOT_MESHCORE__SERIAL_PORT=/dev/ttyACM0 # LORABOT_MESHCORE__SERIAL_PORT=/dev/ttyACM0
[logging]
# DEBUG | INFO | WARNING | ERROR | CRITICAL (case-insensitive).
# DEBUG adds per-iteration LLM request logs and Tavily request details.
level = "INFO"
[meshcore] [meshcore]
serial_port = "/dev/ttyUSB0" serial_port = "/dev/ttyUSB0"
@@ -11,15 +16,47 @@ baud_rate = 115200
[llm] [llm]
base_url = "http://localhost:8080/v1" base_url = "http://localhost:8080/v1"
api_key = "not-needed" api_key = "not-needed"
model = "llama-3.1-8b-instruct" model = "gemma-4-E4B"
system_prompt = "You are a concise assistant on a low-bandwidth mesh radio. Replies must be brief — under 180 bytes." system_prompt = "You are a concise assistant on a low-bandwidth mesh radio. Replies must be brief — under 180 bytes."
temperature = 0.7 temperature = 0.7
request_timeout_seconds = 60 request_timeout_seconds = 60
[storage] [storage]
sqlite_path = "data/meshbot.db" sqlite_path = "data/lorabot.db"
[message] [message]
# MeshCore MAX_PACKET_PAYLOAD is 184 bytes. Lower this if your text-frame # MeshCore MAX_PACKET_PAYLOAD is 184 bytes. Lower this if your text-frame
# headers further constrain the usable payload on your device. # headers further constrain the usable payload on your device.
max_bytes = 184 max_bytes = 184
# Per-attempt ACK wait. 0 = trust the device's path-aware suggestion (recommended).
# Set a positive value only if you need to override that suggestion.
ack_timeout_seconds = 0
# How many times to retry a chunk after failure (0 = no retries). Total attempts
# = send_retries + 1. With send_retries >= 2 the third attempt onwards is sent as
# a flood broadcast (multi-hop) instead of direct.
send_retries = 1
[web]
# Built-in read-only web UI: stored conversations + live status.
enabled = true
host = "0.0.0.0"
port = 8080
[advertise]
# MeshCore companions don't advertise on their own. Lorabot can do it for them
# at a fixed cadence so the node stays visible on the mesh. Set
# interval_seconds = 0 to disable auto-advert (the web UI button still works).
enabled = true
interval_seconds = 3600
at_startup = true
# Flood = multi-hop advert across the mesh. False = zero-hop (neighbors only).
flood = true
# LLM tool calling. The weather tool (Open-Meteo, no key) is always on. Tools
# in this section are optional and only registered when configured. Requires a
# tool-capable model on the LLM server (Llama 3.1, Qwen, Hermes, …); models
# without tool support will simply ignore them.
[tools.tavily]
# Web search + page extraction via https://tavily.com (free tier available).
# Leave empty to disable both web_search and fetch_url tools.
api_key = ""
+19 -13
View File
@@ -1,25 +1,31 @@
services: services:
meshbot: lorabot:
image: ${MESHBOT_IMAGE:?set MESHBOT_IMAGE to your image reference} image: ${LORABOT_IMAGE:?set LORABOT_IMAGE to your image reference}
container_name: meshbot container_name: lorabot
restart: unless-stopped restart: unless-stopped
# MeshCore companion is on a USB serial port. Map the host device through to # MeshCore companion is on a USB serial port. Map the host device through to
# the container. Override MESHBOT_DEVICE for ttyACM0 etc. # the container. Override LORABOT_DEVICE for ttyACM0 etc.
devices: devices:
- "${MESHBOT_DEVICE:-/dev/ttyUSB0}:${MESHBOT_DEVICE:-/dev/ttyUSB0}" - "${LORABOT_DEVICE:-/dev/ttyUSB0}:${LORABOT_DEVICE:-/dev/ttyUSB0}"
# Some serial chipsets need access to the dialout group on the host. # Some serial chipsets need access to the dialout group on the host.
group_add: group_add:
- dialout - dialout
environment: environment:
MESHBOT_MESHCORE__SERIAL_PORT: ${MESHBOT_DEVICE:-/dev/ttyUSB0} LORABOT_MESHCORE__SERIAL_PORT: ${LORABOT_DEVICE:-/dev/ttyUSB0}
MESHBOT_LLM__BASE_URL: ${MESHBOT_LLM_BASE_URL:?set MESHBOT_LLM_BASE_URL} LORABOT_LLM__BASE_URL: ${LORABOT_LLM_BASE_URL:?set LORABOT_LLM_BASE_URL}
MESHBOT_LLM__API_KEY: ${MESHBOT_LLM_API_KEY:-not-needed} LORABOT_LLM__API_KEY: ${LORABOT_LLM_API_KEY:-not-needed}
MESHBOT_LLM__MODEL: ${MESHBOT_LLM_MODEL:?set MESHBOT_LLM_MODEL} LORABOT_LLM__MODEL: ${LORABOT_LLM_MODEL:?set LORABOT_LLM_MODEL}
# The app defaults to loopback; inside the container we need 0.0.0.0 so the
# docker port mapping below can reach it. Restrict exposure at the host port.
LORABOT_WEB__HOST: ${LORABOT_WEB_HOST:-0.0.0.0}
ports:
# Built-in read-only web UI. Override via LORABOT_WEB_PORT.
- "${LORABOT_WEB_PORT:-8080}:8080"
volumes: volumes:
- meshbot-data:/data - lorabot-data:/data
# Mount your config.toml at /etc/meshbot/config.toml. Anything not set in the # Mount your config.toml at /etc/lorabot/config.toml. Anything not set in the
# TOML will fall back to defaults; env vars above always win. # TOML will fall back to defaults; env vars above always win.
- ./config.toml:/etc/meshbot/config.toml:ro - ./config.toml:/etc/lorabot/config.toml:ro
volumes: volumes:
meshbot-data: lorabot-data:
+3 -2
View File
@@ -3,7 +3,7 @@ requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta" build-backend = "setuptools.build_meta"
[project] [project]
name = "meshbot" name = "lorabot"
version = "0.1.0" version = "0.1.0"
description = "Bridge a MeshCore companion radio to an OpenAI-compatible LLM endpoint." description = "Bridge a MeshCore companion radio to an OpenAI-compatible LLM endpoint."
readme = "README.md" readme = "README.md"
@@ -11,6 +11,7 @@ requires-python = ">=3.11"
license = { text = "MIT" } license = { text = "MIT" }
authors = [{ name = "Tobias Huttinger" }] authors = [{ name = "Tobias Huttinger" }]
dependencies = [ dependencies = [
"aiohttp>=3.9",
"meshcore>=2.3", "meshcore>=2.3",
"openai>=1.40", "openai>=1.40",
"pydantic>=2.7", "pydantic>=2.7",
@@ -24,7 +25,7 @@ dev = [
] ]
[project.scripts] [project.scripts]
meshbot = "meshbot.__main__:_cli" lorabot = "lorabot.__main__:_cli"
[tool.setuptools.packages.find] [tool.setuptools.packages.find]
where = ["src"] where = ["src"]
+27 -9
View File
@@ -2,25 +2,43 @@
# Build a multi-arch (linux/amd64, linux/arm64) image and push it to a registry. # Build a multi-arch (linux/amd64, linux/arm64) image and push it to a registry.
# #
# Required: # Required:
# MESHBOT_IMAGE Full image reference, e.g. registry.example.com/team/meshbot # LORABOT_IMAGE Full image reference, e.g. registry.example.com/team/lorabot
# #
# Optional: # Optional:
# PLATFORMS Comma-separated arch list (default: linux/amd64,linux/arm64) # PLATFORMS Comma-separated arch list (default: linux/amd64,linux/arm64)
# EXTRA_TAGS Space-separated additional tags to push (e.g. "stable v0.1.0") # EXTRA_TAGS Space-separated additional tags to push (e.g. "stable v0.1.0")
# PUSH "1" (default) to push, "0" to build and load locally only # PUSH "1" (default) to push, "0" to build and load locally only
# (note: --load only works with a single platform) # (note: --load only works with a single platform)
# BUILDER buildx builder name (default: meshbot-builder, auto-created) # BUILDER buildx builder name (default: lorabot-builder, auto-created)
set -euo pipefail set -euo pipefail
if [[ -z "${MESHBOT_IMAGE:-}" ]]; then if [[ -z "${LORABOT_IMAGE:-}" ]]; then
echo "error: MESHBOT_IMAGE is required (e.g. registry.example.com/team/meshbot)" >&2 echo "error: LORABOT_IMAGE is required (e.g. registry.example.com/team/lorabot)" >&2
exit 1
fi
# Preflight: buildx must be installed (it's a separate package on Arch / some
# minimal distros). If it's missing, `docker buildx ...` is parsed by docker as
# `docker ...` with unknown flags, producing a confusing "unknown flag" error.
if ! docker buildx version >/dev/null 2>&1; then
echo "error: docker buildx is not available." >&2
echo " Arch: sudo pacman -S docker-buildx" >&2
echo " Debian/Ubu: sudo apt install docker-buildx-plugin" >&2
echo " Other: https://github.com/docker/buildx#installing" >&2
exit 1
fi
# Preflight: daemon reachable.
if ! docker info >/dev/null 2>&1; then
echo "error: cannot reach the docker daemon (is it running? are you in the 'docker' group?)" >&2
echo " Arch: sudo systemctl start docker" >&2
exit 1 exit 1
fi fi
PLATFORMS="${PLATFORMS:-linux/amd64,linux/arm64}" PLATFORMS="${PLATFORMS:-linux/amd64,linux/arm64}"
PUSH="${PUSH:-1}" PUSH="${PUSH:-1}"
BUILDER="${BUILDER:-meshbot-builder}" BUILDER="${BUILDER:-lorabot-builder}"
cd "$(dirname "$0")/.." cd "$(dirname "$0")/.."
@@ -34,9 +52,9 @@ else
GIT_SHA="dev" GIT_SHA="dev"
fi fi
TAGS=(--tag "${MESHBOT_IMAGE}:latest" --tag "${MESHBOT_IMAGE}:${GIT_SHA}") TAGS=(--tag "${LORABOT_IMAGE}:latest" --tag "${LORABOT_IMAGE}:${GIT_SHA}")
for t in ${EXTRA_TAGS:-}; do for t in ${EXTRA_TAGS:-}; do
TAGS+=(--tag "${MESHBOT_IMAGE}:${t}") TAGS+=(--tag "${LORABOT_IMAGE}:${t}")
done done
# Make sure a buildx builder exists. Reuse if it's already there. # Make sure a buildx builder exists. Reuse if it's already there.
@@ -51,7 +69,7 @@ docker buildx inspect --bootstrap >/dev/null
OUTPUT_FLAG=() OUTPUT_FLAG=()
if [[ "${PUSH}" == "1" ]]; then if [[ "${PUSH}" == "1" ]]; then
OUTPUT_FLAG=(--push) OUTPUT_FLAG=(--push)
echo ">>> building & pushing ${MESHBOT_IMAGE} (${PLATFORMS}) tags: latest, ${GIT_SHA}${EXTRA_TAGS:+, $EXTRA_TAGS}" echo ">>> building & pushing ${LORABOT_IMAGE} (${PLATFORMS}) tags: latest, ${GIT_SHA}${EXTRA_TAGS:+, $EXTRA_TAGS}"
else else
# --load only works with a single platform; warn if user requested multi. # --load only works with a single platform; warn if user requested multi.
if [[ "${PLATFORMS}" == *,* ]]; then if [[ "${PLATFORMS}" == *,* ]]; then
@@ -59,7 +77,7 @@ else
exit 1 exit 1
fi fi
OUTPUT_FLAG=(--load) OUTPUT_FLAG=(--load)
echo ">>> building ${MESHBOT_IMAGE} for ${PLATFORMS} (local load, no push)" echo ">>> building ${LORABOT_IMAGE} for ${PLATFORMS} (local load, no push)"
fi fi
docker buildx build \ docker buildx build \
+3
View File
@@ -0,0 +1,3 @@
"""lorabot — MeshCore ↔ LLM bridge."""
__version__ = "0.1.0"
+34
View File
@@ -0,0 +1,34 @@
"""Entry point: ``python -m lorabot`` and the ``lorabot`` console script."""
from __future__ import annotations
import asyncio
import logging
from .bot import run
from .config import Settings
_LOG_FORMAT = "%(asctime)s %(levelname)-7s %(name)s: %(message)s"
def _cli() -> None:
# Bootstrap config so any error during Settings() loading is still logged
# nicely. ``force=True`` lets us reapply with the user's level afterwards.
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
try:
cfg = Settings()
except Exception:
logging.exception("failed to load configuration")
raise SystemExit(1) from None
level = getattr(logging, cfg.logging.level.upper(), logging.INFO)
logging.basicConfig(level=level, format=_LOG_FORMAT, force=True)
try:
asyncio.run(run(cfg))
except KeyboardInterrupt:
pass
if __name__ == "__main__":
_cli()
+159
View File
@@ -0,0 +1,159 @@
"""Main run loop: connect to the MeshCore device, wire collaborators, listen for DMs."""
from __future__ import annotations
import asyncio
import logging
from meshcore import EventType, MeshCore
from . import db, web
from .commands import build_default_registry
from .config import Settings
from .handler import build_dm_handler
from .llm import LLMClient
from .tools import build_default_registry as build_default_tool_registry
from .transport import MeshTransport
log = logging.getLogger("lorabot")
async def run(cfg: Settings | None = None) -> None:
# ``cfg`` is normally built by the entry point so logging can be configured
# from it before we get here; falling back to ``Settings()`` keeps direct
# ``run()`` calls (tests, embedding) working.
if cfg is None:
cfg = Settings()
db_conn = db.connect(cfg.storage.sqlite_path)
state = web.AppState(
db_conn,
model=cfg.llm.model,
serial_port=cfg.meshcore.serial_port,
loop=asyncio.get_running_loop(),
)
tool_registry = build_default_tool_registry(
tavily_api_key=cfg.tools.tavily.api_key,
)
llm = LLMClient(
base_url=cfg.llm.base_url,
api_key=cfg.llm.api_key,
model=cfg.llm.model,
system_prompt=cfg.llm.system_prompt,
temperature=cfg.llm.temperature,
timeout=cfg.llm.request_timeout_seconds,
tools=tool_registry,
)
web_task: asyncio.Task | None = None
if cfg.web.enabled:
web_task = asyncio.create_task(web.serve(state, cfg.web.host, cfg.web.port))
log.info("connecting to MeshCore on %s @ %d baud", cfg.meshcore.serial_port, cfg.meshcore.baud_rate)
try:
mc = await MeshCore.create_serial(cfg.meshcore.serial_port, cfg.meshcore.baud_rate)
# Default is False: the lib only marks contacts dirty on ADVERTISEMENT and
# waits for the caller to re-pull. Turn it on so a fresh advert is reflected
# in the local cache before the peer's first DM lands. Not needed anymore since we're tracking
# contacts ourselves now.
# mc.auto_update_contacts = True
await mc.ensure_contacts()
transport = MeshTransport(
mc,
db_conn,
ack_timeout=cfg.message.ack_timeout_seconds,
send_retries=cfg.message.send_retries,
)
transport.sync_contacts()
except BaseException:
state.set_connected(False)
if web_task is not None:
web_task.cancel()
try:
await web_task
except (asyncio.CancelledError, Exception):
pass
await llm.aclose()
await tool_registry.aclose()
db_conn.close()
raise
state.set_connected(True, node_name=_self_name(mc))
async def _advertise() -> bool:
result = await mc.commands.send_advert(flood=cfg.advertise.flood)
ok = result.type != EventType.ERROR
if ok:
log.info("advert sent")
else:
log.warning("advert failed: %s", result.payload)
return ok
state.set_advertise_callback(_advertise)
advert_task: asyncio.Task | None = None
if cfg.advertise.enabled:
advert_task = asyncio.create_task(
_advert_loop(state, cfg.advertise.interval_seconds, cfg.advertise.at_startup)
)
registry = build_default_registry()
on_dm = build_dm_handler(
db_conn=db_conn,
llm=llm,
registry=registry,
state=state,
cfg=cfg,
transport=transport,
)
sub = mc.subscribe(EventType.CONTACT_MSG_RECV, on_dm)
sub_contacts = mc.subscribe(EventType.NEW_CONTACT, transport.on_new_contact)
await mc.start_auto_message_fetching()
log.info("lorabot listening on %s", cfg.meshcore.serial_port)
try:
await asyncio.Event().wait()
finally:
state.set_connected(False)
state.set_advertise_callback(None)
mc.unsubscribe(sub)
mc.unsubscribe(sub_contacts)
await mc.stop_auto_message_fetching()
await mc.disconnect()
await llm.aclose()
await tool_registry.aclose()
for task in (advert_task, web_task):
if task is not None:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
db_conn.close()
async def _advert_loop(state: web.AppState, interval: int, at_startup: bool) -> None:
"""Periodically trigger adverts. ``interval == 0`` disables auto-repeat."""
if at_startup:
await state.trigger_advertise()
if interval <= 0:
return
while True:
await asyncio.sleep(interval)
await state.trigger_advertise()
def _self_name(mc: MeshCore) -> str | None:
"""Best-effort lookup of the local node's advertised name. Never raises."""
for attr in ("self_info", "node_info", "device_info"):
try:
info = getattr(mc, attr, None)
except Exception:
continue
if isinstance(info, dict):
name = info.get("adv_name") or info.get("name")
if name:
return str(name)
return None
+121
View File
@@ -0,0 +1,121 @@
"""Slash-command parser and registry for incoming DMs.
A command is any DM whose text starts with ``/``. The first whitespace-separated
token (case-insensitive) selects the handler; the rest is passed through as the
raw argument string. Handlers return the reply text; ``None`` means "no reply".
"""
from __future__ import annotations
import sqlite3
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
from . import db
if TYPE_CHECKING:
from .config import Settings
from .web import AppState
@dataclass
class CommandContext:
"""Everything a command handler might need to read state or react."""
db_conn: sqlite3.Connection
public_key: str
contact_name: str
cfg: Settings
state: AppState
@dataclass
class CommandResult:
"""What a command returns. ``after_send`` runs once the assistant turn is persisted."""
reply: str | None = None
after_send: Callable[[CommandContext], Awaitable[None]] | None = field(default=None, repr=False)
CommandHandler = Callable[[CommandContext, str], Awaitable[CommandResult | str | None]]
@dataclass
class Command:
name: str # without leading slash, lowercase
description: str
handler: CommandHandler
class CommandRegistry:
def __init__(self) -> None:
self._commands: dict[str, Command] = {}
def register(self, name: str, description: str) -> Callable[[CommandHandler], CommandHandler]:
def decorator(fn: CommandHandler) -> CommandHandler:
self._commands[name.lower()] = Command(name.lower(), description, fn)
return fn
return decorator
def list(self) -> list[Command]:
return sorted(self._commands.values(), key=lambda c: c.name)
@staticmethod
def parse(text: str) -> tuple[str, str] | None:
"""Return ``(name, args)`` if ``text`` is a slash command, else ``None``."""
stripped = text.strip()
if not stripped.startswith("/"):
return None
head, _, rest = stripped[1:].partition(" ")
if not head:
return None
return head.lower(), rest.strip()
async def dispatch(self, ctx: CommandContext, text: str) -> CommandResult | None:
"""If ``text`` is a known command, run it. ``None`` means "not a command"."""
parsed = self.parse(text)
if parsed is None:
return None
name, args = parsed
cmd = self._commands.get(name)
if cmd is None:
return CommandResult(reply=f"unknown command: /{name}")
out = await cmd.handler(ctx, args)
if isinstance(out, CommandResult):
return out
return CommandResult(reply=out)
def build_default_registry() -> CommandRegistry:
"""Registry with the built-in commands wired up."""
reg = CommandRegistry()
async def _clear_after_send(ctx: CommandContext) -> None:
# Bump the watermark *after* the "history cleared." reply is persisted so
# neither side of this exchange leaks into the next LLM context.
db.clear_history(ctx.db_conn, ctx.public_key)
@reg.register("clear", "reset LLM context for this conversation")
async def _clear(_ctx: CommandContext, _args: str) -> CommandResult:
return CommandResult(reply="history cleared.", after_send=_clear_after_send)
@reg.register("thinking", "show or set thinking mode: /thinking [on|off]")
async def _thinking(ctx: CommandContext, args: str) -> str:
arg = args.strip().lower()
if not arg:
current = db.get_thinking_enabled(ctx.db_conn, ctx.public_key)
return f"thinking is {'on' if current else 'off'}"
if arg in ("on", "1", "true", "yes"):
db.set_thinking_enabled(ctx.db_conn, ctx.public_key, True)
return "thinking on"
if arg in ("off", "0", "false", "no"):
db.set_thinking_enabled(ctx.db_conn, ctx.public_key, False)
return "thinking off"
return "usage: /thinking [on|off]"
@reg.register("help", "list available commands")
async def _help(_ctx: CommandContext, _args: str) -> str:
return "\n".join(f"/{c.name}{c.description}" for c in reg.list())
return reg
+119
View File
@@ -0,0 +1,119 @@
"""Application settings loaded from TOML with env-var overrides."""
from __future__ import annotations
import os
from pathlib import Path
from pydantic import BaseModel, Field
from pydantic_settings import (
BaseSettings,
PydanticBaseSettingsSource,
SettingsConfigDict,
TomlConfigSettingsSource,
)
class MeshCoreCfg(BaseModel):
serial_port: str
baud_rate: int = 115200
class LLMCfg(BaseModel):
base_url: str
api_key: str = "not-needed"
model: str
system_prompt: str = (
"You are a concise assistant on a low-bandwidth mesh radio. "
"Replies must be brief — under 180 bytes."
)
temperature: float = 0.7
request_timeout_seconds: float = 60.0
class StorageCfg(BaseModel):
sqlite_path: Path = Path("data/lorabot.db")
class MessageCfg(BaseModel):
max_bytes: int = Field(default=184, gt=0)
# 0 = use the device's path-aware suggested timeout (recommended). Set a
# positive value to override per attempt — useful only when the device's
# suggestion is consistently wrong for your link conditions.
ack_timeout_seconds: float = Field(default=0.0, ge=0)
# Number of retries after the first attempt. Total attempts = send_retries + 1.
# With send_retries >= 2 the third attempt onwards is sent as a flood broadcast
# (multi-hop) instead of direct — see meshcore's send_msg_with_retry.
send_retries: int = Field(default=1, ge=0)
class WebCfg(BaseModel):
enabled: bool = True
# Default to loopback. Conversation logs are unauthenticated; only set this to
# 0.0.0.0 (e.g. inside Docker) when you understand the exposure.
host: str = "127.0.0.1"
port: int = Field(default=8080, gt=0, lt=65536)
class LoggingCfg(BaseModel):
# Standard level names: DEBUG, INFO, WARNING, ERROR, CRITICAL. Case-insensitive.
level: str = "INFO"
class TavilyCfg(BaseModel):
# Sign up at https://tavily.com for a key. When empty, the web_search and
# fetch_url tools simply aren't registered (the rest of the bot is unaffected).
api_key: str = ""
class ToolsCfg(BaseModel):
tavily: TavilyCfg = TavilyCfg()
class AdvertiseCfg(BaseModel):
enabled: bool = True
# Seconds between automatic adverts. 0 = manual only (button still works).
interval_seconds: int = Field(default=3600, ge=0)
# Send one advert as soon as the device is connected.
at_startup: bool = True
# Flood the mesh (multi-hop) instead of a zero-hop advert.
flood: bool = False
def _toml_path() -> Path:
return Path(os.environ.get("LORABOT_CONFIG", "config.toml"))
class Settings(BaseSettings):
meshcore: MeshCoreCfg
llm: LLMCfg
storage: StorageCfg = StorageCfg()
message: MessageCfg = MessageCfg()
web: WebCfg = WebCfg()
advertise: AdvertiseCfg = AdvertiseCfg()
tools: ToolsCfg = ToolsCfg()
logging: LoggingCfg = LoggingCfg()
model_config = SettingsConfigDict(
env_prefix="LORABOT_",
env_nested_delimiter="__",
toml_file=_toml_path(),
extra="ignore",
)
@classmethod
def settings_customise_sources(
cls,
settings_cls: type[BaseSettings],
init_settings: PydanticBaseSettingsSource,
env_settings: PydanticBaseSettingsSource,
dotenv_settings: PydanticBaseSettingsSource,
file_secret_settings: PydanticBaseSettingsSource,
) -> tuple[PydanticBaseSettingsSource, ...]:
# Order = priority (highest first): init args > env > TOML > secrets.
return (
init_settings,
env_settings,
TomlConfigSettingsSource(settings_cls),
file_secret_settings,
)
+176
View File
@@ -0,0 +1,176 @@
"""SQLite persistence for conversation history and contacts."""
from __future__ import annotations
import json
import sqlite3
from pathlib import Path
SCHEMA = """
CREATE TABLE IF NOT EXISTS contacts (
public_key TEXT PRIMARY KEY,
adv_name TEXT,
data TEXT NOT NULL,
seen_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS conversations (
public_key TEXT PRIMARY KEY,
contact_name TEXT,
cleared_at_id INTEGER NOT NULL DEFAULT 0,
thinking_enabled INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
public_key TEXT NOT NULL REFERENCES conversations(public_key),
role TEXT NOT NULL CHECK (role IN ('user', 'assistant')),
content TEXT NOT NULL,
hidden_from_llm INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_messages_pubkey_id
ON messages(public_key, id);
"""
def connect(path: str | Path) -> sqlite3.Connection:
"""Open the SQLite DB, ensure the parent dir and schema exist, return the connection."""
db_path = Path(path)
db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(db_path, isolation_level=None) # autocommit
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode = WAL;")
conn.execute("PRAGMA foreign_keys = ON;")
conn.executescript(SCHEMA)
_migrate(conn)
return conn
def _migrate(conn: sqlite3.Connection) -> None:
"""Apply additive migrations for DBs created before later columns existed."""
conv_cols = {row["name"] for row in conn.execute("PRAGMA table_info(conversations)")}
if "cleared_at_id" not in conv_cols:
conn.execute("ALTER TABLE conversations ADD COLUMN cleared_at_id INTEGER NOT NULL DEFAULT 0")
if "thinking_enabled" not in conv_cols:
conn.execute(
"ALTER TABLE conversations ADD COLUMN thinking_enabled INTEGER NOT NULL DEFAULT 0"
)
msg_cols = {row["name"] for row in conn.execute("PRAGMA table_info(messages)")}
if "hidden_from_llm" not in msg_cols:
conn.execute(
"ALTER TABLE messages ADD COLUMN hidden_from_llm INTEGER NOT NULL DEFAULT 0"
)
def upsert_conversation(conn: sqlite3.Connection, public_key: str, contact_name: str) -> None:
conn.execute(
"""
INSERT INTO conversations (public_key, contact_name)
VALUES (?, ?)
ON CONFLICT(public_key) DO UPDATE SET
contact_name = excluded.contact_name,
updated_at = datetime('now')
""",
(public_key, contact_name),
)
def add_message(
conn: sqlite3.Connection,
public_key: str,
role: str,
content: str,
*,
hidden_from_llm: bool = False,
) -> None:
conn.execute(
"INSERT INTO messages (public_key, role, content, hidden_from_llm) VALUES (?, ?, ?, ?)",
(public_key, role, content, 1 if hidden_from_llm else 0),
)
def get_history(conn: sqlite3.Connection, public_key: str) -> list[dict[str, str]]:
"""Return the conversation as OpenAI chat messages, oldest first.
Messages with ``id <= conversations.cleared_at_id`` are excluded so a peer can
reset their LLM context with ``/clear`` without losing the audit trail.
"""
rows = conn.execute(
"""
SELECT m.role, m.content
FROM messages m
JOIN conversations c ON c.public_key = m.public_key
WHERE m.public_key = ?
AND m.id > c.cleared_at_id
AND m.hidden_from_llm = 0
ORDER BY m.id ASC
""",
(public_key,),
).fetchall()
return [{"role": row["role"], "content": row["content"]} for row in rows]
def get_thinking_enabled(conn: sqlite3.Connection, public_key: str) -> bool:
row = conn.execute(
"SELECT thinking_enabled FROM conversations WHERE public_key = ?",
(public_key,),
).fetchone()
return bool(row["thinking_enabled"]) if row is not None else False
def set_thinking_enabled(conn: sqlite3.Connection, public_key: str, enabled: bool) -> None:
conn.execute(
"""
UPDATE conversations
SET thinking_enabled = ?, updated_at = datetime('now')
WHERE public_key = ?
""",
(1 if enabled else 0, public_key),
)
def upsert_contact(conn: sqlite3.Connection, contact: dict) -> None:
conn.execute(
"""
INSERT INTO contacts (public_key, adv_name, data)
VALUES (?, ?, ?)
ON CONFLICT(public_key) DO UPDATE SET
adv_name = excluded.adv_name,
data = excluded.data,
seen_at = datetime('now')
""",
(contact["public_key"], contact.get("adv_name"), json.dumps(contact)),
)
def get_contact_by_key_prefix(conn: sqlite3.Connection, prefix: str) -> dict | None:
row = conn.execute(
"SELECT data FROM contacts WHERE public_key LIKE ? || '%'",
(prefix,),
).fetchone()
return json.loads(row["data"]) if row is not None else None
def clear_history(conn: sqlite3.Connection, public_key: str) -> None:
"""Bump the per-conversation watermark to the current max message id.
Old messages stay in the table for the web UI; future ``get_history`` calls
skip them.
"""
conn.execute(
"""
UPDATE conversations
SET cleared_at_id = COALESCE(
(SELECT MAX(id) FROM messages WHERE public_key = ?),
0
),
updated_at = datetime('now')
WHERE public_key = ?
""",
(public_key, public_key),
)
+150
View File
@@ -0,0 +1,150 @@
"""DM event handler: route incoming DMs through the command registry or the LLM."""
from __future__ import annotations
import asyncio
import logging
import sqlite3
import time
from collections import defaultdict
from collections.abc import Awaitable, Callable
from datetime import datetime, timezone
from . import db
from .commands import CommandContext, CommandRegistry, CommandResult
from .config import Settings
from .llm import LLMClient
from .transport import MeshTransport
from .web import AppState
log = logging.getLogger("lorabot")
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
class _Deduplicator:
"""Drops consecutive identical messages from the same sender within a time window."""
def __init__(self, window: float = 15.0) -> None:
self._window = window
self._seen: dict[str, tuple[str, float]] = {}
def is_duplicate(self, key: str, text: str) -> bool:
now = time.monotonic()
prev_text, prev_time = self._seen.get(key, ("", 0.0))
if text == prev_text and now - prev_time < self._window:
return True
self._seen[key] = (text, now)
return False
def _store_and_publish(
db_conn: sqlite3.Connection,
state: AppState,
public_key: str,
contact_name: str,
role: str,
text: str,
hidden_from_llm: bool,
) -> None:
db.add_message(db_conn, public_key, role, text, hidden_from_llm=hidden_from_llm)
state.publish("message", {
"public_key": public_key,
"contact_name": contact_name,
"role": role,
"content": text,
"created_at": _now_iso(),
})
async def _generate_reply(
registry: CommandRegistry,
llm: LLMClient,
db_conn: sqlite3.Connection,
public_key: str,
ctx: CommandContext,
text: str,
is_command: bool,
) -> tuple[str, CommandResult | None] | None:
"""Dispatch to command or LLM. Returns (reply, cmd_result), or None to abort."""
if is_command:
cmd_result = await registry.dispatch(ctx, text)
if cmd_result is None or cmd_result.reply is None:
return None
return cmd_result.reply, cmd_result
thinking = db.get_thinking_enabled(db_conn, public_key)
try:
reply = await llm.reply(db.get_history(db_conn, public_key), thinking=thinking)
except Exception:
log.exception("LLM call failed for %s", public_key[:12])
return None
return reply, None
def build_dm_handler(
*,
db_conn: sqlite3.Connection,
llm: LLMClient,
registry: CommandRegistry,
state: AppState,
cfg: Settings,
transport: MeshTransport,
) -> Callable[[object], Awaitable[None]]:
"""Return an ``on_dm(event)`` closure with all collaborators bound."""
locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
dedup = _Deduplicator(window=15.0)
async def on_dm(event) -> None:
data = event.payload or {}
prefix = data.get("pubkey_prefix")
text = (data.get("text") or "").strip()
if not prefix or not text:
return
contact = transport.resolve_contact(prefix)
if contact is None:
log.info("ignoring DM from unknown sender %s (no contact after refresh)", prefix)
return
public_key = contact["public_key"]
contact_name = contact.get("adv_name", "")
if dedup.is_duplicate(public_key, text):
log.info("dropping duplicate DM from %s (%s): %s", contact_name, public_key[:12], text)
return
log.info("DM from %s (%s): %s", contact_name, public_key[:12], text)
is_command = registry.parse(text) is not None
async with locks[public_key]:
db.upsert_conversation(db_conn, public_key, contact_name)
_store_and_publish(db_conn, state, public_key, contact_name, "user", text,
hidden_from_llm=is_command)
ctx = CommandContext(
db_conn=db_conn,
public_key=public_key,
contact_name=contact_name,
cfg=cfg,
state=state,
)
result = await _generate_reply(registry, llm, db_conn, public_key, ctx, text, is_command)
if result is None:
return
reply, cmd_result = result
delivered = await transport.send_chunked(contact, reply, cfg.message.max_bytes)
if not delivered:
return
_store_and_publish(db_conn, state, public_key, contact_name, "assistant", delivered,
hidden_from_llm=is_command)
if cmd_result is not None and cmd_result.after_send is not None:
await cmd_result.after_send(ctx)
return on_dm
+132
View File
@@ -0,0 +1,132 @@
"""Thin wrapper around the OpenAI Python SDK aimed at OpenAI-compatible servers."""
from __future__ import annotations
import logging
from typing import Any
from openai import AsyncOpenAI
from .tools import ToolRegistry
log = logging.getLogger("lorabot")
# Hard cap on assistant <-> tool round-trips per ``reply`` call. Stops a misbehaving
# model from looping on tool calls forever; in practice 1-2 iterations is normal.
_MAX_TOOL_ITERATIONS = 5
class LLMClient:
def __init__(
self,
*,
base_url: str,
api_key: str,
model: str,
system_prompt: str,
temperature: float,
timeout: float,
tools: ToolRegistry | None = None,
) -> None:
self._client = AsyncOpenAI(base_url=base_url, api_key=api_key, timeout=timeout)
self._model = model
self._system_prompt = system_prompt
self._temperature = temperature
self._tools = tools
async def reply(self, history: list[dict[str, str]], *, thinking: bool = False) -> str:
"""Send the system prompt + ``history`` and return the assistant's text.
``thinking`` toggles the llama.cpp/server chat-template kwarg ``enable_thinking``,
which controls whether the model prepends its hidden reasoning turn (Gemma-style).
Passed via OpenAI SDK ``extra_body`` since it is not part of the standard schema.
If a ``ToolRegistry`` is wired in, the model may emit ``tool_calls``; we
dispatch them, append the results, and re-call until the model produces
plain content (or the iteration cap kicks in).
"""
# Local working copy: we'll mutate this with assistant/tool turns across
# the tool loop without touching the persisted DB history.
messages: list[dict[str, Any]] = [
{"role": "system", "content": self._system_prompt},
*history,
]
# Build the tool spec list once. ``None`` (not ``[]``) means "don't send
# the tools field at all" — some servers reject an empty list.
tool_specs = self._tools.specs() if self._tools else None
for iteration in range(_MAX_TOOL_ITERATIONS):
# Only attach ``tools`` when we actually have any registered.
kwargs: dict[str, Any] = {}
if tool_specs:
kwargs["tools"] = tool_specs
log.debug(
"LLM request: model=%s iter=%d msgs=%d tools=%d thinking=%s",
self._model,
iteration + 1,
len(messages),
len(tool_specs) if tool_specs else 0,
thinking,
)
resp = await self._client.chat.completions.create(
model=self._model,
messages=messages,
temperature=self._temperature,
extra_body={"chat_template_kwargs": {"enable_thinking": thinking}},
**kwargs,
)
msg = resp.choices[0].message
# No tool calls → this is the final text answer; return it.
# Also bail if tools aren't even configured: nothing to dispatch to.
if not msg.tool_calls or self._tools is None:
return (msg.content or "").strip()
# Visible at INFO so a normal log lets you see when the model
# actually reached for a tool, and which one(s).
log.info(
"LLM requested tool calls (iter %d): %s",
iteration + 1,
[tc.function.name for tc in msg.tool_calls],
)
# Echo the assistant's tool-calling turn back into ``messages`` so the
# next round-trip sees it. The OpenAI protocol requires this exact
# shape (role=assistant + tool_calls list) before role=tool entries.
messages.append({
"role": "assistant",
"content": msg.content,
"tool_calls": [
{
"id": tc.id,
"type": "function",
"function": {
"name": tc.function.name,
"arguments": tc.function.arguments,
},
}
for tc in msg.tool_calls
],
})
# Run each tool the model asked for and append its result. Errors are
# surfaced as plain strings inside the registry so the model can see
# them and recover (e.g. retry with a different argument).
for tc in msg.tool_calls:
result = await self._tools.dispatch(
tc.function.name, tc.function.arguments
)
messages.append({
"role": "tool",
"tool_call_id": tc.id,
"content": result,
})
# Loop continues: re-call the model with the tool results in context.
log.warning("LLM tool loop exceeded %d iterations", _MAX_TOOL_ITERATIONS)
return "(tool loop limit exceeded)"
async def aclose(self) -> None:
await self._client.close()
+52
View File
@@ -0,0 +1,52 @@
"""Helpers for shaping outgoing mesh messages."""
from __future__ import annotations
def trim_to_bytes(text: str, max_bytes: int) -> str:
"""Return ``text`` truncated so its UTF-8 encoding is at most ``max_bytes`` bytes.
Backs off if the cut lands inside a multi-byte UTF-8 sequence so we never emit
invalid UTF-8 to the radio.
"""
if max_bytes <= 0:
return ""
encoded = text.encode("utf-8")
if len(encoded) <= max_bytes:
return text
cut = encoded[:max_bytes]
# Continuation bytes start with bits 10xxxxxx; rewind past them.
while cut and (cut[-1] & 0xC0) == 0x80:
cut = cut[:-1]
return cut.decode("utf-8", errors="ignore")
def split_to_bytes(text: str, max_bytes: int, max_chunks: int = 2) -> list[str]:
"""Split ``text`` into up to ``max_chunks`` UTF-8-safe chunks of ``max_bytes`` each.
Anything past ``max_chunks * max_bytes`` is dropped. Splits never land inside a
multi-byte UTF-8 sequence on either side of the boundary.
"""
if max_bytes <= 0 or max_chunks <= 0 or not text:
return []
encoded = text.encode("utf-8")
n = len(encoded)
chunks: list[str] = []
pos = 0
for _ in range(max_chunks):
if pos >= n:
break
end = min(pos + max_bytes, n)
if end < n:
# Rewind past trailing UTF-8 continuation bytes (10xxxxxx).
while end > pos and (encoded[end - 1] & 0xC0) == 0x80:
end -= 1
# And past a dangling leader byte (11xxxxxx) whose continuations
# would fall outside the budget.
if end > pos and (encoded[end - 1] & 0xC0) == 0xC0:
end -= 1
if end == pos:
break
chunks.append(encoded[pos:end].decode("utf-8"))
pos = end
return chunks
+29
View File
@@ -0,0 +1,29 @@
"""LLM tool calling: registry + built-in tools."""
from __future__ import annotations
from .base import Tool, ToolRegistry
from .weather import WeatherTool
from .web import FetchUrlTool, WebSearchTool
def build_default_registry(*, tavily_api_key: str = "") -> ToolRegistry:
"""Build the default registry. Tavily-backed tools are only registered
when an API key is configured — without one, the bot silently runs with
just the offline tools."""
reg = ToolRegistry()
reg.register(WeatherTool())
if tavily_api_key:
reg.register(WebSearchTool(api_key=tavily_api_key))
reg.register(FetchUrlTool(api_key=tavily_api_key))
return reg
__all__ = [
"Tool",
"ToolRegistry",
"WeatherTool",
"WebSearchTool",
"FetchUrlTool",
"build_default_registry",
]
+90
View File
@@ -0,0 +1,90 @@
"""Tool abstraction and registry for OpenAI-style function calling."""
from __future__ import annotations
import json
import logging
import time
from abc import ABC, abstractmethod
from typing import Any
log = logging.getLogger("lorabot")
def _truncate_for_log(s: str, max_len: int = 200) -> str:
"""Cap a log field so a giant tool argument doesn't blow up log lines."""
return s if len(s) <= max_len else s[:max_len] + ""
class Tool(ABC):
"""Single LLM-callable function. Subclasses set ``name``/``description``/
``parameters`` (JSON Schema) and implement ``run``."""
name: str
description: str
parameters: dict[str, Any]
@abstractmethod
async def run(self, **kwargs: Any) -> str:
...
async def aclose(self) -> None:
return None
def spec(self) -> dict[str, Any]:
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": self.parameters,
},
}
class ToolRegistry:
def __init__(self) -> None:
self._tools: dict[str, Tool] = {}
def register(self, tool: Tool) -> None:
self._tools[tool.name] = tool
def specs(self) -> list[dict[str, Any]]:
return [t.spec() for t in self._tools.values()]
def __bool__(self) -> bool:
return bool(self._tools)
async def dispatch(self, name: str, arguments_json: str) -> str:
"""Run the named tool with JSON-encoded arguments. Errors are returned
as plain strings so the LLM can see and react to them."""
tool = self._tools.get(name)
if tool is None:
log.warning("tool call rejected: unknown tool %r", name)
return f"error: unknown tool {name!r}"
try:
args = json.loads(arguments_json) if arguments_json else {}
except json.JSONDecodeError as exc:
log.warning("tool %s: bad arguments JSON: %s", name, exc)
return f"error: bad arguments JSON ({exc})"
# Log entry + exit so failures are easy to attribute and we get a
# rough timing picture per tool. Args are truncated to keep logs sane.
log.info("tool %s called: %s", name, _truncate_for_log(arguments_json or "{}"))
started = time.monotonic()
try:
result = await tool.run(**args)
except Exception as exc:
elapsed_ms = (time.monotonic() - started) * 1000
log.exception("tool %s failed after %.0f ms", name, elapsed_ms)
return f"error: {exc}"
elapsed_ms = (time.monotonic() - started) * 1000
log.info("tool %s ok in %.0f ms (%d chars)", name, elapsed_ms, len(result))
return result
async def aclose(self) -> None:
for tool in self._tools.values():
try:
await tool.aclose()
except Exception:
log.exception("aclose failed for tool %s", tool.name)
+201
View File
@@ -0,0 +1,201 @@
"""Weather tool backed by Open-Meteo (no API key).
Returns the current observation, and optionally a multi-day daily forecast
(min/max temperature + weather code per day, in the location's local timezone).
"""
from __future__ import annotations
from datetime import date
import aiohttp
from .base import Tool
GEOCODE_URL = "https://geocoding-api.open-meteo.com/v1/search"
FORECAST_URL = "https://api.open-meteo.com/v1/forecast"
# Hard upper bound on daily forecast length. Open-Meteo allows up to 16 days but
# bandwidth and model context favor keeping replies compact.
_MAX_FORECAST_DAYS = 7
# WMO weather interpretation codes — compact summaries to keep LoRa replies short.
_WMO: dict[int, str] = {
0: "clear",
1: "mostly clear", 2: "partly cloudy", 3: "overcast",
45: "fog", 48: "rime fog",
51: "light drizzle", 53: "drizzle", 55: "heavy drizzle",
56: "freezing drizzle", 57: "freezing drizzle",
61: "light rain", 63: "rain", 65: "heavy rain",
66: "freezing rain", 67: "freezing rain",
71: "light snow", 73: "snow", 75: "heavy snow", 77: "snow grains",
80: "rain showers", 81: "rain showers", 82: "violent showers",
85: "snow showers", 86: "heavy snow showers",
95: "thunderstorm", 96: "thunderstorm w/ hail", 99: "thunderstorm w/ hail",
}
class WeatherTool(Tool):
name = "get_weather"
description = (
"Current weather conditions for a place, optionally followed by a daily "
"forecast. Accepts a location name (e.g. 'Berlin', 'San Francisco, US') "
"or 'lat,lon' decimal coordinates. Set forecast_days to include the next "
"N days (today inclusive); 0 returns only the current observation."
)
parameters = {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "City/place name, or 'lat,lon' decimal coordinates.",
},
"forecast_days": {
"type": "integer",
"description": (
f"Number of daily-forecast days to include (0{_MAX_FORECAST_DAYS}, "
"today inclusive). Defaults to 0 (current weather only)."
),
"minimum": 0,
"maximum": _MAX_FORECAST_DAYS,
"default": 0,
},
},
"required": ["location"],
}
def __init__(self) -> None:
self._session: aiohttp.ClientSession | None = None
async def aclose(self) -> None:
if self._session is not None and not self._session.closed:
await self._session.close()
async def _get_session(self) -> aiohttp.ClientSession:
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=10)
)
return self._session
async def run(self, location: str = "", forecast_days: int = 0) -> str:
location = (location or "").strip()
if not location:
return "error: location is required"
# Clamp defensively: model may ignore the schema bounds.
try:
forecast_days = max(0, min(int(forecast_days), _MAX_FORECAST_DAYS))
except (TypeError, ValueError):
forecast_days = 0
coords = _parse_latlon(location)
if coords is not None:
lat, lon = coords
label = f"{lat:.3f},{lon:.3f}"
else:
geo = await self._geocode(location)
if geo is None:
return f"error: could not find location {location!r}"
lat, lon, label = geo
return await self._forecast(lat, lon, label, forecast_days)
async def _geocode(self, name: str) -> tuple[float, float, str] | None:
session = await self._get_session()
async with session.get(
GEOCODE_URL,
params={"name": name, "count": "1", "format": "json"},
) as resp:
resp.raise_for_status()
data = await resp.json()
results = data.get("results") or []
if not results:
return None
r = results[0]
bits = [r.get("name") or name]
if cc := r.get("country_code"):
bits.append(str(cc))
return float(r["latitude"]), float(r["longitude"]), ", ".join(bits)
async def _forecast(
self, lat: float, lon: float, label: str, forecast_days: int
) -> str:
# Build the request. ``timezone=auto`` makes daily aggregations align to
# the location's local calendar day instead of UTC.
params: dict[str, str] = {
"latitude": str(lat),
"longitude": str(lon),
"current": "temperature_2m,weather_code,wind_speed_10m",
"timezone": "auto",
}
if forecast_days > 0:
params["daily"] = "temperature_2m_max,temperature_2m_min,weather_code"
params["forecast_days"] = str(forecast_days)
session = await self._get_session()
async with session.get(FORECAST_URL, params=params) as resp:
resp.raise_for_status()
data = await resp.json()
# Current observation line (always present).
cur = data.get("current") or {}
temp = cur.get("temperature_2m")
code = cur.get("weather_code")
wind = cur.get("wind_speed_10m")
cond = _WMO.get(int(code), f"code {code}") if code is not None else "?"
cur_bits = [f"{label} now"]
if temp is not None:
cur_bits.append(f"{temp}°C")
cur_bits.append(cond)
if wind is not None:
cur_bits.append(f"wind {wind} km/h")
lines = [", ".join(cur_bits)]
# Daily forecast block (one line per day): "Mon 8-17°C, partly cloudy".
# Open-Meteo returns parallel arrays under ``daily`` keyed by date.
if forecast_days > 0:
daily = data.get("daily") or {}
days = daily.get("time") or []
tmax = daily.get("temperature_2m_max") or []
tmin = daily.get("temperature_2m_min") or []
codes = daily.get("weather_code") or []
for i, day_str in enumerate(days):
day_label = _short_day(day_str)
hi = tmax[i] if i < len(tmax) else None
lo = tmin[i] if i < len(tmin) else None
day_code = codes[i] if i < len(codes) else None
day_cond = (
_WMO.get(int(day_code), f"code {day_code}")
if day_code is not None
else "?"
)
if lo is not None and hi is not None:
lines.append(f"{day_label} {lo}{hi}°C, {day_cond}")
else:
lines.append(f"{day_label} {day_cond}")
return "; ".join(lines)
def _short_day(iso_date: str) -> str:
"""Render an ISO date (``YYYY-MM-DD``) as a short weekday (``Mon``).
Falls back to the raw string if parsing fails."""
try:
return date.fromisoformat(iso_date).strftime("%a")
except (TypeError, ValueError):
return iso_date
def _parse_latlon(text: str) -> tuple[float, float] | None:
if "," not in text:
return None
a, b = text.split(",", 1)
try:
lat = float(a.strip())
lon = float(b.strip())
except ValueError:
return None
if not (-90 <= lat <= 90 and -180 <= lon <= 180):
return None
return lat, lon
+193
View File
@@ -0,0 +1,193 @@
"""Web search + page-extraction tools backed by Tavily (https://tavily.com).
Exposes two tools to the LLM:
- ``web_search`` — query Tavily for a list of {title, url, snippet} hits.
- ``fetch_url`` — pull the readable body text of a single page via Tavily's
``/extract`` endpoint (so we don't have to parse HTML ourselves).
Tavily handles all the messy bits — URL fetching, redirects, boilerplate
stripping, robots — so this module is just a thin HTTP wrapper.
"""
from __future__ import annotations
import logging
import aiohttp
from .base import Tool
log = logging.getLogger("lorabot")
_SEARCH_URL = "https://api.tavily.com/search"
_EXTRACT_URL = "https://api.tavily.com/extract"
# Sane defaults for the search tool. Upper bound keeps the tool message small
# enough that the LoRa-side LLM can still reason over it.
_DEFAULT_RESULTS = 5
_MAX_RESULTS = 10
# Cap extracted-page body size so a giant article doesn't blow the model's
# context. ~4 KB is plenty to summarize from for a 180-byte LoRa reply.
_MAX_EXTRACT_CHARS = 4000
class _TavilyHTTP:
"""Tiny per-tool HTTP helper. Each tool instance owns one ``aiohttp``
session; lazily created on first request, closed via ``aclose``."""
def __init__(self, api_key: str) -> None:
self._api_key = api_key
self._session: aiohttp.ClientSession | None = None
async def post(self, url: str, payload: dict) -> dict:
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=20)
)
headers = {"Authorization": f"Bearer {self._api_key}"}
log.debug("tavily POST %s payload-keys=%s", url, list(payload))
async with self._session.post(url, json=payload, headers=headers) as resp:
# On non-2xx, capture the response body before raising so the log
# tells us *why* (auth, quota, bad query) — raise_for_status alone
# would only show the status code.
if resp.status >= 400:
body = await resp.text()
log.warning(
"tavily POST %s -> %d: %s", url, resp.status, body[:200]
)
resp.raise_for_status()
return await resp.json()
async def aclose(self) -> None:
if self._session is not None and not self._session.closed:
await self._session.close()
class WebSearchTool(Tool):
name = "web_search"
description = (
"Search the web. Returns titles, URLs and SHORT snippets — useful for "
"headlines, finding sources and orientation. Snippets are a few "
"sentences at most and are often truncated mid-thought. "
"If the user asked for any of: specific numbers, prices, dates, "
"percentages, technical specs, exact quotes, step-by-step "
"instructions, or 'current'/'latest' data — you MUST also call "
"fetch_url on the most relevant result before answering."
)
parameters = {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query.",
},
"max_results": {
"type": "integer",
"description": (
f"How many results to return (1{_MAX_RESULTS}). "
f"Default {_DEFAULT_RESULTS}."
),
"minimum": 1,
"maximum": _MAX_RESULTS,
"default": _DEFAULT_RESULTS,
},
},
"required": ["query"],
}
def __init__(self, api_key: str) -> None:
self._http = _TavilyHTTP(api_key)
async def aclose(self) -> None:
await self._http.aclose()
async def run(self, query: str = "", max_results: int = _DEFAULT_RESULTS) -> str:
query = (query or "").strip()
if not query:
return "error: query is required"
# Defensive clamp — model may ignore the JSON-schema bounds.
try:
n = max(1, min(int(max_results), _MAX_RESULTS))
except (TypeError, ValueError):
n = _DEFAULT_RESULTS
data = await self._http.post(
_SEARCH_URL,
{
"query": query,
"max_results": n,
# "basic" is fast and cheap; "advanced" costs more credits but
# returns higher-quality snippets. Stick with basic for now.
"search_depth": "basic",
},
)
results = data.get("results") or []
if not results:
return "no results"
# Format as a numbered list — readable for the model and compact enough
# to fit comfortably in context alongside the rest of the conversation.
lines: list[str] = []
for i, r in enumerate(results, 1):
title = (r.get("title") or "").strip() or "(no title)"
url = r.get("url") or ""
snippet = (r.get("content") or "").strip()
lines.append(f"{i}. {title}{url}\n {snippet}")
return "\n".join(lines)
class FetchUrlTool(Tool):
name = "fetch_url"
description = (
"Fetch the full readable body text of a single web page. Call this "
"after web_search whenever the user asked for specifics — exact "
"numbers, prices, dates, quotes, technical details, or current data — "
"even if a snippet looks like it might answer. Snippets are short and "
"often misleading; the full page is the source of truth. Long pages "
"are truncated to keep context manageable."
)
parameters = {
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "Absolute URL of the page to fetch.",
},
},
"required": ["url"],
}
def __init__(self, api_key: str) -> None:
self._http = _TavilyHTTP(api_key)
async def aclose(self) -> None:
await self._http.aclose()
async def run(self, url: str = "") -> str:
url = (url or "").strip()
if not url:
return "error: url is required"
data = await self._http.post(
_EXTRACT_URL,
{"urls": [url], "extract_depth": "basic"},
)
results = data.get("results") or []
if not results:
# Tavily reports unreachable / blocked pages here instead of in
# ``results``. Surface the reason so the model can react.
failed = data.get("failed_results") or []
if failed:
reason = (failed[0] or {}).get("error", "unknown")
return f"error: extract failed ({reason})"
return "error: no content extracted"
body = (results[0].get("raw_content") or "").strip()
if not body:
return "error: page returned empty content"
if len(body) > _MAX_EXTRACT_CHARS:
body = body[:_MAX_EXTRACT_CHARS] + " …[truncated]"
return body
+126
View File
@@ -0,0 +1,126 @@
"""MeshCore-side transport: contact resolution and reliable chunked sending."""
from __future__ import annotations
import asyncio
import logging
import sqlite3
from collections import defaultdict
from meshcore import EventType, MeshCore
from . import db
from .messages import split_to_bytes
log = logging.getLogger("lorabot")
class MeshTransport:
"""Owns the MeshCore connection reference; handles contact resolution and
reliable chunked message delivery.
Create one instance per MeshCore connection and pass it to build_dm_handler.
"""
def __init__(
self,
mc: MeshCore,
db_conn: sqlite3.Connection,
ack_timeout: float = 0.0,
send_retries: int = 1,
) -> None:
self._mc = mc
self._db = db_conn
self._ack_timeout = ack_timeout
self._send_retries = send_retries
self._send_locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
# ------------------------------------------------------------------
# Contact management
# ------------------------------------------------------------------
def sync_contacts(self) -> None:
"""Bulk-upsert the device's current contact list into the DB.
Call once after the initial ensure_contacts() on connect.
"""
for contact in self._mc.contacts.values():
db.upsert_contact(self._db, contact)
log.info("synced %d contacts to DB", len(self._mc.contacts))
async def on_new_contact(self, event) -> None:
"""Subscriber for EventType.NEW_CONTACT — persists the contact to DB."""
contact = event.payload
db.upsert_contact(self._db, contact)
log.info("new contact stored: %s (%s)",
contact.get("adv_name", ""), contact["public_key"][:12])
def resolve_contact(self, prefix: str):
"""Look up a contact by pubkey prefix from the local DB."""
return db.get_contact_by_key_prefix(self._db, prefix)
# ------------------------------------------------------------------
# Chunked sending
# ------------------------------------------------------------------
async def send_chunked(
self,
contact,
text: str,
max_bytes: int,
max_chunks: int = 2,
) -> str:
"""Split ``text`` and send chunks in order, waiting for ACK before each next one.
Returns the concatenation of chunks that were successfully delivered.
"""
chunks = split_to_bytes(text, max_bytes, max_chunks=max_chunks)
pk = contact["public_key"]
pk_short = pk[:12]
planned_bytes = sum(len(c.encode("utf-8")) for c in chunks)
dropped = len(text.encode("utf-8")) - planned_bytes
if dropped > 0:
log.info("reply to %s split into %d chunks, dropped %d trailing bytes",
pk_short, len(chunks), dropped)
sent: list[str] = []
async with self._send_locks[pk]:
for i, chunk in enumerate(chunks, 1):
log.info("reply to %s (%d/%d, %d bytes): %s",
pk_short, i, len(chunks), len(chunk.encode("utf-8")), chunk)
if not await self._send_chunk(contact, chunk):
break
sent.append(chunk)
return "".join(sent)
async def _send_chunk(self, contact, chunk: str) -> bool:
"""Send one chunk and wait for the recipient ACK.
Delegates to ``send_msg_with_retry`` which correlates the ACK event by
``expected_ack`` code and retries up to ``max_attempts`` times. Returns
``None`` when no ACK arrives across all attempts.
Note on the library's default ``flood_after=2``: from the third attempt
onwards the library resets the path and re-sends as a flood broadcast.
This only kicks in when ``send_retries >= 2`` (max_attempts >= 3); with
the default ``send_retries=1`` we stay direct-only.
"""
pk_short = contact["public_key"][:12]
try:
result = await self._mc.commands.send_msg_with_retry(
contact, chunk,
max_attempts=self._send_retries + 1,
timeout=self._ack_timeout,
)
except Exception:
log.exception("send to %s raised", pk_short)
return False
if result is None:
log.error("no ACK from %s after %d attempts",
pk_short, self._send_retries + 1)
return False
if result.type == EventType.ERROR:
log.warning("send_msg error for %s: %s", pk_short, result.payload)
return False
return True
+721
View File
@@ -0,0 +1,721 @@
"""Lightweight aiohttp web UI for browsing stored conversations and live status.
Endpoints:
GET / single-page UI
GET /api/status device + model + uptime snapshot
GET /api/conversations list of conversations with message counts
GET /api/conversations/{pk} conversation header + ordered messages
GET /api/events Server-Sent Events stream (status, message)
"""
from __future__ import annotations
import asyncio
import json
import logging
import sqlite3
from collections.abc import Awaitable, Callable
from datetime import datetime, timezone
from aiohttp import web
AdvertCallback = Callable[[], Awaitable[bool]]
log = logging.getLogger("lorabot.web")
class AppState:
"""Shared state between the bot loop and the web server."""
def __init__(
self,
db_conn: sqlite3.Connection,
*,
model: str,
serial_port: str,
loop: asyncio.AbstractEventLoop | None = None,
) -> None:
self.db = db_conn
self.model = model
self.serial_port = serial_port
self.connected = False
self.connected_since: datetime | None = None
self.node_name: str | None = None
self.last_advert_at: datetime | None = None
self.last_advert_ok: bool | None = None
self._subscribers: set[asyncio.Queue] = set()
self._loop = loop
self._advertise: AdvertCallback | None = None
self._advert_lock = asyncio.Lock()
def status(self) -> dict:
return {
"connected": self.connected,
"connected_since": _iso(self.connected_since),
"node_name": self.node_name,
"model": self.model,
"serial_port": self.serial_port,
"last_advert_at": _iso(self.last_advert_at),
"last_advert_ok": self.last_advert_ok,
"advertise_available": self._advertise is not None,
"contact_count": self.db.execute("SELECT COUNT(*) FROM contacts").fetchone()[0],
}
def set_advertise_callback(self, cb: AdvertCallback | None) -> None:
self._advertise = cb
self.publish("status", self.status())
async def trigger_advertise(self) -> bool:
"""Run the injected advertise callback. Serialized; never raises."""
if self._advertise is None:
return False
async with self._advert_lock:
try:
ok = bool(await self._advertise())
except Exception:
log.exception("advertise callback raised")
ok = False
self.last_advert_at = datetime.now(timezone.utc)
self.last_advert_ok = ok
self.publish("status", self.status())
return ok
def set_connected(self, ok: bool, *, node_name: str | None = None) -> None:
if ok and not self.connected:
self.connected_since = datetime.now(timezone.utc)
if not ok:
self.connected_since = None
self.connected = ok
if node_name is not None:
self.node_name = node_name
self.publish("status", self.status())
def publish(self, event: str, data: dict) -> None:
"""Thread-safe: callable from any thread; dispatch hops to the loop."""
loop = self._loop
if loop is None:
try:
loop = asyncio.get_running_loop()
except RuntimeError:
return # no loop, nothing to deliver to
self._loop = loop
if loop.is_closed():
return
try:
running = asyncio.get_running_loop()
except RuntimeError:
running = None
if running is loop:
self._dispatch(event, data)
else:
loop.call_soon_threadsafe(self._dispatch, event, data)
def _dispatch(self, event: str, data: dict) -> None:
for q in list(self._subscribers):
try:
q.put_nowait((event, data))
except asyncio.QueueFull:
log.warning("dropping SSE event %s — subscriber backed up", event)
def subscribe(self) -> asyncio.Queue:
q: asyncio.Queue = asyncio.Queue(maxsize=128)
self._subscribers.add(q)
return q
def unsubscribe(self, q: asyncio.Queue) -> None:
self._subscribers.discard(q)
def _iso(dt: datetime | None) -> str | None:
return dt.isoformat() if dt is not None else None
def _list_conversations(db: sqlite3.Connection) -> list[dict]:
rows = db.execute(
"""
SELECT c.public_key,
c.contact_name,
c.updated_at,
(SELECT COUNT(*) FROM messages m WHERE m.public_key = c.public_key) AS message_count,
(SELECT content FROM messages m WHERE m.public_key = c.public_key
ORDER BY m.id DESC LIMIT 1) AS last_message
FROM conversations c
ORDER BY c.updated_at DESC
"""
).fetchall()
return [dict(r) for r in rows]
def _list_contacts(db: sqlite3.Connection) -> list[dict]:
rows = db.execute(
"SELECT public_key, adv_name, seen_at FROM contacts ORDER BY adv_name ASC"
).fetchall()
return [dict(r) for r in rows]
def _get_conversation(db: sqlite3.Connection, public_key: str) -> dict | None:
conv = db.execute(
"SELECT public_key, contact_name, cleared_at_id, created_at, updated_at "
"FROM conversations WHERE public_key = ?",
(public_key,),
).fetchone()
if conv is None:
return None
msgs = db.execute(
"SELECT id, role, content, created_at FROM messages "
"WHERE public_key = ? ORDER BY id ASC",
(public_key,),
).fetchall()
return {"conversation": dict(conv), "messages": [dict(m) for m in msgs]}
async def _index(_req: web.Request) -> web.Response:
return web.Response(text=INDEX_HTML, content_type="text/html")
async def _api_status(req: web.Request) -> web.Response:
return web.json_response(_state(req).status())
async def _api_conversations(req: web.Request) -> web.Response:
return web.json_response(_list_conversations(_state(req).db))
async def _api_contacts(req: web.Request) -> web.Response:
return web.json_response(_list_contacts(_state(req).db))
def _require_same_origin(req: web.Request) -> None:
"""Reject obvious cross-site browser requests on state-changing endpoints.
``Sec-Fetch-Site`` is set by all current browsers on every request. ``same-origin``
is the only value we accept. Non-browser clients (curl, scripts) don't send the
header at all — those pass through, since they're not the CSRF threat model.
"""
site = req.headers.get("Sec-Fetch-Site")
if site is not None and site != "same-origin":
raise web.HTTPForbidden(text=f"cross-origin request rejected (Sec-Fetch-Site: {site})")
async def _api_advertise(req: web.Request) -> web.Response:
_require_same_origin(req)
state = _state(req)
if state._advertise is None:
raise web.HTTPServiceUnavailable(text="device not connected")
ok = await state.trigger_advertise()
return web.json_response({
"ok": ok,
"last_advert_at": _iso(state.last_advert_at),
})
async def _api_conversation(req: web.Request) -> web.Response:
pk = req.match_info["pk"]
data = _get_conversation(_state(req).db, pk)
if data is None:
raise web.HTTPNotFound(text="unknown public_key")
return web.json_response(data)
async def _api_events(req: web.Request) -> web.StreamResponse:
state = _state(req)
resp = web.StreamResponse(
status=200,
headers={
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
"Connection": "keep-alive",
},
)
await resp.prepare(req)
await _sse_send(resp, "status", state.status())
q = state.subscribe()
try:
while True:
try:
event, data = await asyncio.wait_for(q.get(), timeout=15.0)
except asyncio.TimeoutError:
await resp.write(b": keepalive\n\n")
continue
await _sse_send(resp, event, data)
except (asyncio.CancelledError, ConnectionError):
pass
finally:
state.unsubscribe(q)
return resp
async def _sse_send(resp: web.StreamResponse, event: str, data: dict) -> None:
await resp.write(f"event: {event}\ndata: {json.dumps(data)}\n\n".encode())
def _state(req: web.Request) -> AppState:
return req.app["state"]
def build_app(state: AppState) -> web.Application:
app = web.Application()
app["state"] = state
app.router.add_get("/", _index)
app.router.add_get("/api/status", _api_status)
app.router.add_get("/api/conversations", _api_conversations)
app.router.add_get("/api/contacts", _api_contacts)
app.router.add_get("/api/conversations/{pk}", _api_conversation)
app.router.add_get("/api/events", _api_events)
app.router.add_post("/api/advertise", _api_advertise)
return app
async def serve(state: AppState, host: str, port: int) -> None:
"""Run the web server until cancelled."""
state._loop = asyncio.get_running_loop()
app = build_app(state)
runner = web.AppRunner(app, access_log=None)
await runner.setup()
site = web.TCPSite(runner, host, port)
await site.start()
log.info("web UI on http://%s:%d", host, port)
try:
await asyncio.Event().wait()
finally:
await runner.cleanup()
INDEX_HTML = r"""<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>lorabot</title>
<style>
:root {
--bg: #0a0e0a;
--bg-alt: #0e140e;
--fg: #b6f5b6;
--dim: #4a7a4a;
--accent: #7fff7f;
--warn: #ffaa33;
--err: #ff5f5f;
--line: #1a2a1a;
}
* { box-sizing: border-box; }
html, body { height: 100%; margin: 0; }
body {
background: var(--bg);
color: var(--fg);
font: 13px/1.45 ui-monospace, "JetBrains Mono", "Fira Mono", "Cascadia Code", Menlo, Consolas, monospace;
display: grid;
grid-template-rows: auto 1fr auto;
height: 100vh;
overflow: hidden;
}
header, footer {
padding: 6px 14px;
border-bottom: 1px solid var(--line);
display: flex;
justify-content: space-between;
align-items: center;
gap: 12px;
flex-wrap: wrap;
}
footer { border-top: 1px solid var(--line); border-bottom: none; color: var(--dim); font-size: 12px; }
.title { color: var(--accent); letter-spacing: 0.5px; }
.title::before { content: ""; margin-right: 6px; }
.status-line { display: flex; gap: 16px; align-items: center; flex-wrap: wrap; color: var(--dim); }
.status-line .v { color: var(--fg); }
.dot { width: 8px; height: 8px; border-radius: 50%; background: var(--err); display: inline-block; vertical-align: middle; margin-right: 6px; }
.dot.ok { background: var(--accent); box-shadow: 0 0 6px var(--accent); }
main { display: grid; grid-template-columns: 280px 1fr; min-height: 0; }
aside {
border-right: 1px solid var(--line);
overflow-y: auto;
background: var(--bg-alt);
}
.tabs {
display: flex;
border-bottom: 1px dashed var(--line);
}
.tab-btn {
background: transparent;
color: var(--dim);
border: none;
border-right: 1px solid var(--line);
font: inherit;
font-size: 12px;
padding: 7px 12px;
cursor: pointer;
flex: 1;
text-align: left;
letter-spacing: 0.5px;
text-transform: lowercase;
}
.tab-btn:last-child { border-right: none; }
.tab-btn.active { color: var(--accent); }
.tab-btn:hover:not(.active) { color: var(--fg); }
.contact {
padding: 8px 14px;
border-bottom: 1px solid var(--line);
}
.contact .cname { white-space: nowrap; overflow: hidden; text-overflow: ellipsis; }
.contact .cname::before { content: "@ "; color: var(--dim); }
.contact .cdetail { color: var(--dim); font-size: 11px; display: flex; justify-content: space-between; gap: 8px; margin-top: 2px; }
.conv {
padding: 8px 14px;
cursor: pointer;
border-bottom: 1px solid var(--line);
display: flex;
flex-direction: column;
gap: 2px;
}
.conv:hover { background: #102010; }
.conv.active { background: #14241a; color: var(--accent); }
.conv .row { display: flex; justify-content: space-between; align-items: baseline; gap: 8px; }
.conv .name { white-space: nowrap; overflow: hidden; text-overflow: ellipsis; flex: 1; }
.conv .name::before { content: "> "; color: var(--dim); }
.conv.active .name::before { color: var(--accent); }
.conv .count { color: var(--dim); }
.conv .pk { color: var(--dim); font-size: 11px; }
.conv .preview { color: var(--dim); font-size: 11px; white-space: nowrap; overflow: hidden; text-overflow: ellipsis; }
section { display: flex; flex-direction: column; min-height: 0; }
.thread-head {
padding: 8px 16px;
color: var(--dim);
border-bottom: 1px dashed var(--line);
display: flex;
justify-content: space-between;
gap: 12px;
}
.thread-head .who { color: var(--accent); }
.thread { flex: 1; overflow-y: auto; padding: 12px 16px; }
.msg { margin: 10px 0; }
.msg .head { color: var(--dim); font-size: 11px; margin-bottom: 2px; }
.msg .body { white-space: pre-wrap; word-wrap: break-word; }
.msg.user .head .role { color: var(--accent); }
.msg.assistant .head .role { color: var(--warn); }
.empty { color: var(--dim); padding: 24px 14px; text-align: center; }
.divider {
color: var(--dim);
text-align: center;
margin: 14px 0;
letter-spacing: 2px;
font-size: 11px;
}
.divider::before, .divider::after { content: "─── "; }
.divider::after { content: " ───"; }
::-webkit-scrollbar { width: 8px; height: 8px; }
::-webkit-scrollbar-thumb { background: var(--line); }
::-webkit-scrollbar-thumb:hover { background: #2a4a2a; }
::selection { background: var(--accent); color: var(--bg); }
.btn {
background: transparent;
color: var(--accent);
border: 1px solid var(--line);
font: inherit;
padding: 2px 8px;
cursor: pointer;
letter-spacing: 0.5px;
}
.btn:hover:not(:disabled) { background: #14241a; border-color: var(--accent); }
.btn:active:not(:disabled) { background: #1a3020; }
.btn:disabled { color: var(--dim); cursor: not-allowed; }
.btn.busy { color: var(--warn); border-color: var(--warn); }
.fail { color: var(--err); }
.cursor::after { content: ""; color: var(--accent); animation: blink 1s steps(2) infinite; margin-left: 2px; }
@keyframes blink { 50% { opacity: 0; } }
@media (max-width: 720px) {
main { grid-template-columns: 1fr; grid-template-rows: 200px 1fr; }
aside { border-right: none; border-bottom: 1px solid var(--line); }
}
</style>
</head>
<body>
<header>
<span class="title">lorabot</span>
<span class="status-line">
<span><span class="dot" id="dot"></span><span id="conn">…</span></span>
<span>port: <span class="v" id="port">—</span></span>
<span>model: <span class="v" id="model">—</span></span>
<span>node: <span class="v" id="node">—</span></span>
<span>advert: <span class="v" id="advert">—</span></span>
<button id="advert-btn" class="btn" type="button" disabled>[ advertise ]</button>
</span>
</header>
<main>
<aside>
<div class="tabs">
<button class="tab-btn active" id="tab-convs" type="button">conversations</button>
<button class="tab-btn" id="tab-contacts" type="button">contacts (<span id="contact-count">—</span>)</button>
</div>
<div id="sidebar"><div class="empty">none yet<span class="cursor"></span></div></div>
</aside>
<section>
<div class="thread-head"><span id="who">— no conversation selected —</span><span id="pk"></span></div>
<div id="thread" class="thread"><div class="empty">select a conversation on the left</div></div>
</section>
</main>
<footer>
<span id="footinfo">stream: connecting<span class="cursor"></span></span>
<span id="uptime"></span>
</footer>
<script>
"use strict";
const $ = (id) => document.getElementById(id);
let conversations = [];
let contacts = [];
let selectedKey = null;
let connectedSince = null;
let activeTab = "convs";
function escapeHTML(s) {
return (s == null ? "" : String(s)).replace(/[&<>"']/g, (c) => ({
"&": "&amp;", "<": "&lt;", ">": "&gt;", '"': "&quot;", "'": "&#39;"
})[c]);
}
function parseTs(iso) {
if (!iso) return null;
// SQLite "YYYY-MM-DD HH:MM:SS" is UTC; ISO with offset is also fine.
const s = iso.includes("T") ? iso : iso.replace(" ", "T") + "Z";
const d = new Date(s);
return isNaN(d.getTime()) ? null : d;
}
function fmtTime(iso) {
const d = parseTs(iso);
return d ? d.toLocaleString(undefined, { hour12: false }) : "";
}
function fmtUptime(since) {
const d = parseTs(since);
if (!d) return "";
let s = Math.max(0, Math.floor((Date.now() - d.getTime()) / 1000));
const days = Math.floor(s / 86400); s -= days * 86400;
const hrs = Math.floor(s / 3600); s -= hrs * 3600;
const mins = Math.floor(s / 60); s -= mins * 60;
const pad = (n) => String(n).padStart(2, "0");
return (days ? days + "d " : "") + `${pad(hrs)}:${pad(mins)}:${pad(s)}`;
}
async function fetchJSON(url) {
const r = await fetch(url);
if (!r.ok) throw new Error(`${url}: ${r.status}`);
return r.json();
}
function renderSidebar() {
const el = $("sidebar");
if (!conversations.length) {
el.innerHTML = '<div class="empty">none yet</div>';
return;
}
el.innerHTML = conversations.map((c) => `
<div class="conv${c.public_key === selectedKey ? " active" : ""}" data-pk="${escapeHTML(c.public_key)}">
<div class="row">
<span class="name">${escapeHTML(c.contact_name || "?")}</span>
<span class="count">[${c.message_count}]</span>
</div>
<div class="pk">${escapeHTML(c.public_key.slice(0, 16))}…</div>
<div class="preview">${escapeHTML((c.last_message || "").slice(0, 80))}</div>
</div>`).join("");
el.querySelectorAll(".conv").forEach((n) => {
n.addEventListener("click", () => select(n.dataset.pk));
});
}
function setHead(conv) {
if (!conv) {
$("who").textContent = "— no conversation selected —";
$("pk").textContent = "";
return;
}
$("who").innerHTML = `<span class="who">${escapeHTML(conv.contact_name || "?")}</span>`;
$("pk").textContent = conv.public_key;
}
function renderThread(messages, clearedAt) {
const el = $("thread");
if (!messages.length) {
el.innerHTML = '<div class="empty">empty conversation</div>';
return;
}
const parts = [];
let dividerEmitted = false;
for (const m of messages) {
if (!dividerEmitted && clearedAt && m.id > clearedAt) {
parts.push('<div class="divider">history cleared</div>');
dividerEmitted = true;
}
parts.push(renderMsg(m));
}
el.innerHTML = parts.join("");
el.scrollTop = el.scrollHeight;
}
function renderMsg(m) {
const arrow = m.role === "user" ? "&lt;" : "&gt;";
return `
<div class="msg ${m.role}">
<div class="head"><span class="role">${arrow} ${m.role}</span> · ${escapeHTML(fmtTime(m.created_at))}</div>
<div class="body">${escapeHTML(m.content)}</div>
</div>`;
}
let clearedAtByKey = {};
async function select(pk) {
selectedKey = pk;
renderSidebar();
try {
const data = await fetchJSON(`/api/conversations/${encodeURIComponent(pk)}`);
setHead(data.conversation);
clearedAtByKey[pk] = data.conversation.cleared_at_id || 0;
renderThread(data.messages, clearedAtByKey[pk]);
} catch (e) {
$("thread").innerHTML = `<div class="empty">load failed: ${escapeHTML(e.message)}</div>`;
}
}
function appendMessage(m) {
const el = $("thread");
const empty = el.querySelector(".empty");
if (empty) el.innerHTML = "";
const wasAtBottom = el.scrollTop + el.clientHeight >= el.scrollHeight - 30;
el.insertAdjacentHTML("beforeend", renderMsg(m));
if (wasAtBottom) el.scrollTop = el.scrollHeight;
}
function setStatus(s) {
const ok = !!s.connected;
$("dot").className = "dot" + (ok ? " ok" : "");
$("conn").textContent = ok ? "connected" : "disconnected";
$("port").textContent = s.serial_port || "";
$("model").textContent = s.model || "";
$("node").textContent = s.node_name || "";
const ad = $("advert");
if (s.last_advert_at) {
ad.textContent = fmtTime(s.last_advert_at) + (s.last_advert_ok === false ? " (failed)" : "");
ad.className = "v" + (s.last_advert_ok === false ? " fail" : "");
} else {
ad.textContent = "";
ad.className = "v";
}
const btn = $("advert-btn");
btn.disabled = !s.advertise_available;
connectedSince = s.connected_since;
if (s.contact_count != null) $("contact-count").textContent = s.contact_count;
tickUptime();
}
async function sendAdvert() {
const btn = $("advert-btn");
if (btn.disabled) return;
btn.disabled = true;
btn.classList.add("busy");
btn.textContent = "[ advertising… ]";
try {
const r = await fetch("/api/advertise", { method: "POST" });
if (!r.ok) throw new Error("HTTP " + r.status);
} catch (e) {
$("footinfo").textContent = "advert failed: " + e.message;
} finally {
btn.classList.remove("busy");
btn.textContent = "[ advertise ]";
// re-enable on next status event; if SSE is dead, give it back manually
setTimeout(() => { btn.disabled = false; }, 1500);
}
}
function tickUptime() {
$("uptime").textContent = connectedSince ? "uptime " + fmtUptime(connectedSince) : "";
}
setInterval(tickUptime, 1000);
function renderContacts() {
const el = $("sidebar");
if (!contacts.length) {
el.innerHTML = '<div class="empty">no contacts yet</div>';
return;
}
el.innerHTML = contacts.map((c) => `
<div class="contact">
<div class="cname">${escapeHTML(c.adv_name || "?")}</div>
<div class="cdetail">
<span>${escapeHTML(c.public_key.slice(0, 16))}…</span>
<span>${escapeHTML(fmtTime(c.seen_at))}</span>
</div>
</div>`).join("");
}
function setTab(tab) {
activeTab = tab;
$("tab-convs").classList.toggle("active", tab === "convs");
$("tab-contacts").classList.toggle("active", tab === "contacts");
if (tab === "convs") renderSidebar();
else refreshContacts();
}
async function refreshContacts() {
try {
contacts = await fetchJSON("/api/contacts");
$("contact-count").textContent = contacts.length;
} catch (e) { /* silently ignore */ }
if (activeTab === "contacts") renderContacts();
}
async function refreshList() {
conversations = await fetchJSON("/api/conversations");
renderSidebar();
}
function bumpConv(m) {
let conv = conversations.find((c) => c.public_key === m.public_key);
if (!conv) {
conv = {
public_key: m.public_key,
contact_name: m.contact_name || "?",
message_count: 0,
updated_at: m.created_at,
last_message: m.content,
};
conversations.unshift(conv);
} else {
conv.message_count += 1;
conv.contact_name = m.contact_name || conv.contact_name;
conv.last_message = m.content;
conv.updated_at = m.created_at;
conversations = [conv, ...conversations.filter((c) => c !== conv)];
}
renderSidebar();
}
function startStream() {
const es = new EventSource("/api/events");
es.addEventListener("open", () => { $("footinfo").textContent = "stream: live"; });
es.addEventListener("error", () => { $("footinfo").textContent = "stream: reconnecting…"; });
es.addEventListener("status", (e) => setStatus(JSON.parse(e.data)));
es.addEventListener("message", (e) => {
const m = JSON.parse(e.data);
bumpConv(m);
if (m.public_key === selectedKey) appendMessage(m);
});
return es;
}
(async function init() {
$("advert-btn").addEventListener("click", sendAdvert);
$("tab-convs").addEventListener("click", () => setTab("convs"));
$("tab-contacts").addEventListener("click", () => setTab("contacts"));
try {
setStatus(await fetchJSON("/api/status"));
await refreshList();
} catch (e) {
$("footinfo").textContent = "init failed: " + e.message;
}
startStream();
})();
</script>
</body>
</html>
"""
-3
View File
@@ -1,3 +0,0 @@
"""meshbot — MeshCore ↔ LLM bridge."""
__version__ = "0.1.0"
-23
View File
@@ -1,23 +0,0 @@
"""Entry point: ``python -m meshbot`` and the ``meshbot`` console script."""
from __future__ import annotations
import asyncio
import logging
from .bot import run
def _cli() -> None:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)-7s %(name)s: %(message)s",
)
try:
asyncio.run(run())
except KeyboardInterrupt:
pass
if __name__ == "__main__":
_cli()
-86
View File
@@ -1,86 +0,0 @@
"""Main run loop: connect to the MeshCore device, route DMs through the LLM, reply."""
from __future__ import annotations
import asyncio
import logging
from collections import defaultdict
from meshcore import EventType, MeshCore
from . import db
from .config import Settings
from .llm import LLMClient
from .messages import trim_to_bytes
log = logging.getLogger("meshbot")
async def run() -> None:
cfg = Settings()
db_conn = db.connect(cfg.storage.sqlite_path)
llm = LLMClient(
base_url=cfg.llm.base_url,
api_key=cfg.llm.api_key,
model=cfg.llm.model,
system_prompt=cfg.llm.system_prompt,
temperature=cfg.llm.temperature,
timeout=cfg.llm.request_timeout_seconds,
)
log.info("connecting to MeshCore on %s @ %d baud", cfg.meshcore.serial_port, cfg.meshcore.baud_rate)
mc = await MeshCore.create_serial(cfg.meshcore.serial_port, cfg.meshcore.baud_rate)
await mc.ensure_contacts()
# One lock per sender so a burst of messages from the same peer is processed
# serially while different peers stay independent.
locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
async def on_dm(event) -> None:
data = event.payload or {}
prefix = data.get("pubkey_prefix")
text = (data.get("text") or "").strip()
if not prefix or not text:
return
contact = mc.get_contact_by_key_prefix(prefix)
if contact is None:
log.info("ignoring DM from unknown sender %s", prefix)
return
public_key = contact["public_key"]
contact_name = contact.get("adv_name", "")
log.info("DM from %s (%s): %s", contact_name, public_key[:12], text)
async with locks[public_key]:
db.upsert_conversation(db_conn, public_key, contact_name)
db.add_message(db_conn, public_key, "user", text)
history = db.get_history(db_conn, public_key)
try:
reply = await llm.reply(history)
except Exception:
log.exception("LLM call failed for %s", public_key[:12])
return
db.add_message(db_conn, public_key, "assistant", reply)
outgoing = trim_to_bytes(reply, cfg.message.max_bytes)
log.info("reply to %s (%d bytes): %s", public_key[:12], len(outgoing.encode("utf-8")), outgoing)
result = await mc.commands.send_msg(contact, outgoing)
if result.type == EventType.ERROR:
log.error("send_msg failed for %s: %s", public_key[:12], result.payload)
sub = mc.subscribe(EventType.CONTACT_MSG_RECV, on_dm)
await mc.start_auto_message_fetching()
log.info("meshbot listening on %s", cfg.meshcore.serial_port)
try:
await asyncio.Event().wait()
finally:
mc.unsubscribe(sub)
await mc.stop_auto_message_fetching()
await mc.disconnect()
await llm.aclose()
db_conn.close()
-74
View File
@@ -1,74 +0,0 @@
"""Application settings loaded from TOML with env-var overrides."""
from __future__ import annotations
import os
from pathlib import Path
from pydantic import BaseModel, Field
from pydantic_settings import (
BaseSettings,
PydanticBaseSettingsSource,
SettingsConfigDict,
TomlConfigSettingsSource,
)
class MeshCoreCfg(BaseModel):
serial_port: str
baud_rate: int = 115200
class LLMCfg(BaseModel):
base_url: str
api_key: str = "not-needed"
model: str
system_prompt: str = (
"You are a concise assistant on a low-bandwidth mesh radio. "
"Replies must be brief — under 180 bytes."
)
temperature: float = 0.7
request_timeout_seconds: float = 60.0
class StorageCfg(BaseModel):
sqlite_path: Path = Path("data/meshbot.db")
class MessageCfg(BaseModel):
max_bytes: int = Field(default=184, gt=0)
def _toml_path() -> Path:
return Path(os.environ.get("MESHBOT_CONFIG", "config.toml"))
class Settings(BaseSettings):
meshcore: MeshCoreCfg
llm: LLMCfg
storage: StorageCfg = StorageCfg()
message: MessageCfg = MessageCfg()
model_config = SettingsConfigDict(
env_prefix="MESHBOT_",
env_nested_delimiter="__",
toml_file=_toml_path(),
extra="ignore",
)
@classmethod
def settings_customise_sources(
cls,
settings_cls: type[BaseSettings],
init_settings: PydanticBaseSettingsSource,
env_settings: PydanticBaseSettingsSource,
dotenv_settings: PydanticBaseSettingsSource,
file_secret_settings: PydanticBaseSettingsSource,
) -> tuple[PydanticBaseSettingsSource, ...]:
# Order = priority (highest first): init args > env > TOML > secrets.
return (
init_settings,
env_settings,
TomlConfigSettingsSource(settings_cls),
file_secret_settings,
)
-67
View File
@@ -1,67 +0,0 @@
"""SQLite persistence for per-sender conversation history."""
from __future__ import annotations
import sqlite3
from pathlib import Path
SCHEMA = """
CREATE TABLE IF NOT EXISTS conversations (
public_key TEXT PRIMARY KEY,
contact_name TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
public_key TEXT NOT NULL REFERENCES conversations(public_key),
role TEXT NOT NULL CHECK (role IN ('user', 'assistant')),
content TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_messages_pubkey_id
ON messages(public_key, id);
"""
def connect(path: str | Path) -> sqlite3.Connection:
"""Open the SQLite DB, ensure the parent dir and schema exist, return the connection."""
db_path = Path(path)
db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(db_path, isolation_level=None) # autocommit
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode = WAL;")
conn.execute("PRAGMA foreign_keys = ON;")
conn.executescript(SCHEMA)
return conn
def upsert_conversation(conn: sqlite3.Connection, public_key: str, contact_name: str) -> None:
conn.execute(
"""
INSERT INTO conversations (public_key, contact_name)
VALUES (?, ?)
ON CONFLICT(public_key) DO UPDATE SET
contact_name = excluded.contact_name,
updated_at = datetime('now')
""",
(public_key, contact_name),
)
def add_message(conn: sqlite3.Connection, public_key: str, role: str, content: str) -> None:
conn.execute(
"INSERT INTO messages (public_key, role, content) VALUES (?, ?, ?)",
(public_key, role, content),
)
def get_history(conn: sqlite3.Connection, public_key: str) -> list[dict[str, str]]:
"""Return the conversation as OpenAI chat messages, oldest first."""
rows = conn.execute(
"SELECT role, content FROM messages WHERE public_key = ? ORDER BY id ASC",
(public_key,),
).fetchall()
return [{"role": row["role"], "content": row["content"]} for row in rows]
-38
View File
@@ -1,38 +0,0 @@
"""Thin wrapper around the OpenAI Python SDK aimed at OpenAI-compatible servers."""
from __future__ import annotations
from openai import AsyncOpenAI
class LLMClient:
def __init__(
self,
*,
base_url: str,
api_key: str,
model: str,
system_prompt: str,
temperature: float,
timeout: float,
) -> None:
self._client = AsyncOpenAI(base_url=base_url, api_key=api_key, timeout=timeout)
self._model = model
self._system_prompt = system_prompt
self._temperature = temperature
async def reply(self, history: list[dict[str, str]]) -> str:
"""Send the system prompt + ``history`` and return the assistant's text."""
messages: list[dict[str, str]] = [
{"role": "system", "content": self._system_prompt},
*history,
]
resp = await self._client.chat.completions.create(
model=self._model,
messages=messages,
temperature=self._temperature,
)
return (resp.choices[0].message.content or "").strip()
async def aclose(self) -> None:
await self._client.close()
-21
View File
@@ -1,21 +0,0 @@
"""Helpers for shaping outgoing mesh messages."""
from __future__ import annotations
def trim_to_bytes(text: str, max_bytes: int) -> str:
"""Return ``text`` truncated so its UTF-8 encoding is at most ``max_bytes`` bytes.
Backs off if the cut lands inside a multi-byte UTF-8 sequence so we never emit
invalid UTF-8 to the radio.
"""
if max_bytes <= 0:
return ""
encoded = text.encode("utf-8")
if len(encoded) <= max_bytes:
return text
cut = encoded[:max_bytes]
# Continuation bytes start with bits 10xxxxxx; rewind past them.
while cut and (cut[-1] & 0xC0) == 0x80:
cut = cut[:-1]
return cut.decode("utf-8", errors="ignore")
+59 -1
View File
@@ -1,4 +1,4 @@
from meshbot.messages import trim_to_bytes from lorabot.messages import split_to_bytes, trim_to_bytes
def test_short_ascii_passthrough(): def test_short_ascii_passthrough():
@@ -38,3 +38,61 @@ def test_zero_or_negative_max_bytes():
def test_empty_input(): def test_empty_input():
assert trim_to_bytes("", 184) == "" assert trim_to_bytes("", 184) == ""
# split_to_bytes
def test_split_short_input_single_chunk():
assert split_to_bytes("hello", 184) == ["hello"]
def test_split_long_input_two_chunks_drops_rest():
s = "x" * 500
chunks = split_to_bytes(s, 180, max_chunks=2)
assert chunks == ["x" * 180, "x" * 180]
assert sum(len(c.encode("utf-8")) for c in chunks) == 360
def test_split_exact_two_chunks_no_third():
s = "x" * 360
chunks = split_to_bytes(s, 180, max_chunks=2)
assert chunks == ["x" * 180, "x" * 180]
def test_split_does_not_break_multibyte():
# 4 emoji × 4 bytes = 16 bytes total. Budget 5 bytes/chunk → 1 emoji per chunk.
chunks = split_to_bytes("🎉🎉🎉🎉", 5, max_chunks=2)
assert chunks == ["🎉", "🎉"]
for c in chunks:
assert len(c.encode("utf-8")) == 4
def test_split_two_byte_char_at_boundary():
# "abäcd" → bytes: a b ä(2) c d = 6 bytes. Budget 3/chunk:
# chunk1 must end at "ab" (3rd byte is start of ä, can't include without continuation).
# chunk2: "äc" = 3 bytes.
chunks = split_to_bytes("abäcd", 3, max_chunks=2)
assert chunks[0] == "ab"
assert chunks[1] == "äc"
# "d" is dropped (over the budget).
def test_split_empty_input():
assert split_to_bytes("", 184) == []
def test_split_zero_max_bytes():
assert split_to_bytes("hi", 0) == []
def test_split_zero_chunks():
assert split_to_bytes("hi", 184, max_chunks=0) == []
def test_split_concat_is_prefix_of_input():
# The delivered text must always be a prefix of the original (no rearrangement).
src = "Hello world! 🎉 This is a longer message that should be split."
chunks = split_to_bytes(src, 20, max_chunks=2)
delivered = "".join(chunks)
assert src.startswith(delivered)