Compare commits

...

8 Commits

30 changed files with 2539 additions and 371 deletions
+8 -8
View File
@@ -19,12 +19,12 @@ FROM python:3.12-slim
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
MESHBOT_CONFIG=/etc/meshbot/config.toml \
MESHBOT_STORAGE__SQLITE_PATH=/data/meshbot.db
LORABOT_CONFIG=/etc/lorabot/config.toml \
LORABOT_STORAGE__SQLITE_PATH=/data/lorabot.db
RUN useradd --system --home /app --shell /usr/sbin/nologin meshbot \
&& mkdir -p /data /etc/meshbot \
&& chown meshbot:meshbot /data
RUN useradd --system --home /app --shell /usr/sbin/nologin lorabot \
&& mkdir -p /data /etc/lorabot \
&& chown lorabot:lorabot /data
WORKDIR /app
@@ -32,8 +32,8 @@ COPY --from=builder /wheels/*.whl /tmp/wheels/
RUN pip install --no-cache-dir /tmp/wheels/*.whl \
&& rm -rf /tmp/wheels
USER meshbot
USER lorabot
VOLUME ["/data", "/etc/meshbot"]
VOLUME ["/data", "/etc/lorabot"]
ENTRYPOINT ["meshbot"]
ENTRYPOINT ["lorabot"]
+21
View File
@@ -0,0 +1,21 @@
MIT License
Copyright (c) [year] [fullname]
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
+52 -19
View File
@@ -1,9 +1,28 @@
# meshbot
# lorabot
Bridges a [MeshCore](https://meshcore.io) companion radio to any OpenAI-compatible LLM endpoint
(e.g. `llama-server`, vLLM, Ollama). Listens for direct messages on the device, runs each
conversation through the LLM with full per-sender history stored in SQLite, and replies back
over the mesh — trimmed to the MeshCore packet payload limit.
over the mesh — split into UTF-8-safe chunks within the MeshCore packet payload limit.
## Features
- **LLM-backed DMs** with persistent per-sender chat history; replies are split across multiple
packets when needed.
- **Reliable chunked delivery** via meshcore's `send_msg_with_retry`: each chunk waits for the
recipient ACK (correlated by `expected_ack` code) before the next chunk goes out, with a
configurable retry budget.
- **Contact persistence**: contacts are mirrored from the device into SQLite at connect and
kept in sync via `NEW_CONTACT` events, so resolution is local and survives a flaky or
cleared device contact list.
- **Inbound deduplication**: consecutive identical DMs from the same sender within a short
window are dropped (handles radio retransmits and impatient peers).
- **Slash commands**: `/help`, `/clear` (reset LLM context), `/thinking` (toggle thinking-mode
prompts).
- **LLM tool calling** (when the model supports it): a built-in weather tool (Open-Meteo, no
key); web search and URL fetch via [Tavily](https://tavily.com) when an API key is set.
- **Web UI** (read-only by default): live status, conversations, contact list, manual advert
button. Listens on `127.0.0.1:8080` by default.
## Quick start
@@ -14,19 +33,33 @@ pip install -e .
cp config.example.toml config.toml
# edit serial_port and [llm] in config.toml
python -m meshbot
python -m lorabot
```
Config file path defaults to `./config.toml` and can be overridden with `MESHBOT_CONFIG`.
Any field can be overridden via env vars, e.g. `MESHBOT_LLM__API_KEY=sk-...`.
Config file path defaults to `./config.toml` and can be overridden with `LORABOT_CONFIG`.
Any field can be overridden via env vars, e.g. `LORABOT_LLM__API_KEY=sk-...`.
The web UI is at <http://localhost:8080> while the bot is running.
## Layout
- `src/meshbot/bot.py` — connect, subscribe to `CONTACT_MSG_RECV`, dispatch each DM.
- `src/meshbot/db.py` — SQLite schema and per-conversation repo functions.
- `src/meshbot/llm.py``AsyncOpenAI` wrapper.
- `src/meshbot/messages.py` — UTF-8-safe byte-length trimming.
- `src/meshbot/config.py`TOML + env-var settings (pydantic-settings).
- [src/lorabot/bot.py](src/lorabot/bot.py) — connect, wire collaborators, subscribe to
`CONTACT_MSG_RECV` and `NEW_CONTACT`.
- [src/lorabot/transport.py](src/lorabot/transport.py)`MeshTransport`: contact resolution
(DB-backed) and reliable chunked sending with per-contact serialization.
- [src/lorabot/handler.py](src/lorabot/handler.py)DM pipeline: dedup → command dispatch
or LLM call → send → persist.
- [src/lorabot/commands.py](src/lorabot/commands.py) — slash-command parser and registry.
- [src/lorabot/llm.py](src/lorabot/llm.py) — `AsyncOpenAI` wrapper with tool-call loop.
- [src/lorabot/tools/](src/lorabot/tools/) — pluggable LLM tools (weather, web search,
fetch URL).
- [src/lorabot/db.py](src/lorabot/db.py) — SQLite schema and repo functions for
conversations, messages, contacts.
- [src/lorabot/messages.py](src/lorabot/messages.py) — UTF-8-safe byte-length splitting.
- [src/lorabot/web.py](src/lorabot/web.py) — aiohttp web UI (status, conversations,
contacts, SSE).
- [src/lorabot/config.py](src/lorabot/config.py) — TOML + env-var settings
(pydantic-settings).
## Docker
@@ -34,23 +67,23 @@ Build and push a multi-arch image (`linux/amd64` + `linux/arm64`):
```sh
docker login registry.example.com # once
export MESHBOT_IMAGE=registry.example.com/team/meshbot
export LORABOT_IMAGE=registry.example.com/team/lorabot
./scripts/build-and-push.sh # tags: latest + <git sha>
EXTRA_TAGS="v0.1.0" ./scripts/build-and-push.sh # add explicit version
PUSH=0 PLATFORMS=linux/amd64 ./scripts/build-and-push.sh # local load only
```
Run via compose (set `MESHBOT_IMAGE`, `MESHBOT_LLM_BASE_URL`, `MESHBOT_LLM_MODEL`,
optionally `MESHBOT_DEVICE`):
Run via compose (set `LORABOT_IMAGE`, `LORABOT_LLM_BASE_URL`, `LORABOT_LLM_MODEL`,
optionally `LORABOT_DEVICE`):
```sh
export MESHBOT_IMAGE=registry.example.com/team/meshbot:latest
export MESHBOT_LLM_BASE_URL=http://llama:8080/v1
export MESHBOT_LLM_MODEL=llama-3.1-8b-instruct
export MESHBOT_DEVICE=/dev/ttyUSB0
export LORABOT_IMAGE=registry.example.com/team/lorabot:latest
export LORABOT_LLM_BASE_URL=http://llama:8080/v1
export LORABOT_LLM_MODEL=llama-3.1-8b-instruct
export LORABOT_DEVICE=/dev/ttyUSB0
docker compose up -d
```
The container expects `config.toml` mounted at `/etc/meshbot/config.toml` and
The container expects `config.toml` mounted at `/etc/lorabot/config.toml` and
persists SQLite to a named volume at `/data`. Any field can still be overridden
via `MESHBOT_<SECTION>__<KEY>` env vars.
via `LORABOT_<SECTION>__<KEY>` env vars.
+44 -7
View File
@@ -1,8 +1,13 @@
# Copy this file to `config.toml` and edit. The path can be overridden with
# the MESHBOT_CONFIG environment variable. Any field can be overridden with
# environment variables of the form MESHBOT_<SECTION>__<KEY>, e.g.
# MESHBOT_LLM__BASE_URL=http://llama:8080/v1
# MESHBOT_MESHCORE__SERIAL_PORT=/dev/ttyACM0
# the LORABOT_CONFIG environment variable. Any field can be overridden with
# environment variables of the form LORABOT_<SECTION>__<KEY>, e.g.
# LORABOT_LLM__BASE_URL=http://llama:8080/v1
# LORABOT_MESHCORE__SERIAL_PORT=/dev/ttyACM0
[logging]
# DEBUG | INFO | WARNING | ERROR | CRITICAL (case-insensitive).
# DEBUG adds per-iteration LLM request logs and Tavily request details.
level = "INFO"
[meshcore]
serial_port = "/dev/ttyUSB0"
@@ -11,15 +16,47 @@ baud_rate = 115200
[llm]
base_url = "http://localhost:8080/v1"
api_key = "not-needed"
model = "llama-3.1-8b-instruct"
model = "gemma-4-E4B"
system_prompt = "You are a concise assistant on a low-bandwidth mesh radio. Replies must be brief — under 180 bytes."
temperature = 0.7
temperature = 1.0
request_timeout_seconds = 60
[storage]
sqlite_path = "data/meshbot.db"
sqlite_path = "data/lorabot.db"
[message]
# MeshCore MAX_PACKET_PAYLOAD is 184 bytes. Lower this if your text-frame
# headers further constrain the usable payload on your device.
max_bytes = 184
# Per-attempt ACK wait. 0 = trust the device's path-aware suggestion (recommended).
# Set a positive value only if you need to override that suggestion.
ack_timeout_seconds = 0
# How many times to retry a chunk after failure (0 = no retries). Total attempts
# = send_retries + 1. With send_retries >= 2 the third attempt onwards is sent as
# a flood broadcast (multi-hop) instead of direct.
send_retries = 1
[web]
# Built-in read-only web UI: stored conversations + live status.
enabled = true
host = "127.0.0.1"
port = 8080
[advertise]
# MeshCore companions don't advertise on their own. Lorabot can do it for them
# at a fixed cadence so the node stays visible on the mesh. Set
# interval_seconds = 0 to disable auto-advert (the web UI button still works).
enabled = true
interval_seconds = 3600
at_startup = true
# Flood = multi-hop advert across the mesh. False = zero-hop (neighbors only).
flood = true
# LLM tool calling. The weather tool (Open-Meteo, no key) is always on. Tools
# in this section are optional and only registered when configured. Requires a
# tool-capable model on the LLM server (Llama 3.1, Qwen, Hermes, …); models
# without tool support will simply ignore them.
[tools.tavily]
# Web search + page extraction via https://tavily.com (free tier available).
# Leave empty to disable both web_search and fetch_url tools.
api_key = ""
+19 -13
View File
@@ -1,25 +1,31 @@
services:
meshbot:
image: ${MESHBOT_IMAGE:?set MESHBOT_IMAGE to your image reference}
container_name: meshbot
lorabot:
image: ${LORABOT_IMAGE:?set LORABOT_IMAGE to your image reference}
container_name: lorabot
restart: unless-stopped
# MeshCore companion is on a USB serial port. Map the host device through to
# the container. Override MESHBOT_DEVICE for ttyACM0 etc.
# the container. Override LORABOT_DEVICE for ttyACM0 etc.
devices:
- "${MESHBOT_DEVICE:-/dev/ttyUSB0}:${MESHBOT_DEVICE:-/dev/ttyUSB0}"
- "${LORABOT_DEVICE:-/dev/ttyUSB0}:${LORABOT_DEVICE:-/dev/ttyUSB0}"
# Some serial chipsets need access to the dialout group on the host.
group_add:
- dialout
environment:
MESHBOT_MESHCORE__SERIAL_PORT: ${MESHBOT_DEVICE:-/dev/ttyUSB0}
MESHBOT_LLM__BASE_URL: ${MESHBOT_LLM_BASE_URL:?set MESHBOT_LLM_BASE_URL}
MESHBOT_LLM__API_KEY: ${MESHBOT_LLM_API_KEY:-not-needed}
MESHBOT_LLM__MODEL: ${MESHBOT_LLM_MODEL:?set MESHBOT_LLM_MODEL}
LORABOT_MESHCORE__SERIAL_PORT: ${LORABOT_DEVICE:-/dev/ttyUSB0}
LORABOT_LLM__BASE_URL: ${LORABOT_LLM_BASE_URL:?set LORABOT_LLM_BASE_URL}
LORABOT_LLM__API_KEY: ${LORABOT_LLM_API_KEY:-not-needed}
LORABOT_LLM__MODEL: ${LORABOT_LLM_MODEL:?set LORABOT_LLM_MODEL}
# The app defaults to loopback; inside the container we need 0.0.0.0 so the
# docker port mapping below can reach it. Restrict exposure at the host port.
LORABOT_WEB__HOST: ${LORABOT_WEB_HOST:-0.0.0.0}
ports:
# Built-in read-only web UI. Override via LORABOT_WEB_PORT.
- "${LORABOT_WEB_PORT:-8080}:8080"
volumes:
- meshbot-data:/data
# Mount your config.toml at /etc/meshbot/config.toml. Anything not set in the
- lorabot-data:/data
# Mount your config.toml at /etc/lorabot/config.toml. Anything not set in the
# TOML will fall back to defaults; env vars above always win.
- ./config.toml:/etc/meshbot/config.toml:ro
- ./config.toml:/etc/lorabot/config.toml:ro
volumes:
meshbot-data:
lorabot-data:
+3 -2
View File
@@ -3,7 +3,7 @@ requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "meshbot"
name = "lorabot"
version = "0.1.0"
description = "Bridge a MeshCore companion radio to an OpenAI-compatible LLM endpoint."
readme = "README.md"
@@ -11,6 +11,7 @@ requires-python = ">=3.11"
license = { text = "MIT" }
authors = [{ name = "Tobias Huttinger" }]
dependencies = [
"aiohttp>=3.9",
"meshcore>=2.3",
"openai>=1.40",
"pydantic>=2.7",
@@ -24,7 +25,7 @@ dev = [
]
[project.scripts]
meshbot = "meshbot.__main__:_cli"
lorabot = "lorabot.__main__:_cli"
[tool.setuptools.packages.find]
where = ["src"]
+27 -9
View File
@@ -2,25 +2,43 @@
# Build a multi-arch (linux/amd64, linux/arm64) image and push it to a registry.
#
# Required:
# MESHBOT_IMAGE Full image reference, e.g. registry.example.com/team/meshbot
# LORABOT_IMAGE Full image reference, e.g. registry.example.com/team/lorabot
#
# Optional:
# PLATFORMS Comma-separated arch list (default: linux/amd64,linux/arm64)
# EXTRA_TAGS Space-separated additional tags to push (e.g. "stable v0.1.0")
# PUSH "1" (default) to push, "0" to build and load locally only
# (note: --load only works with a single platform)
# BUILDER buildx builder name (default: meshbot-builder, auto-created)
# BUILDER buildx builder name (default: lorabot-builder, auto-created)
set -euo pipefail
if [[ -z "${MESHBOT_IMAGE:-}" ]]; then
echo "error: MESHBOT_IMAGE is required (e.g. registry.example.com/team/meshbot)" >&2
if [[ -z "${LORABOT_IMAGE:-}" ]]; then
echo "error: LORABOT_IMAGE is required (e.g. registry.example.com/team/lorabot)" >&2
exit 1
fi
# Preflight: buildx must be installed (it's a separate package on Arch / some
# minimal distros). If it's missing, `docker buildx ...` is parsed by docker as
# `docker ...` with unknown flags, producing a confusing "unknown flag" error.
if ! docker buildx version >/dev/null 2>&1; then
echo "error: docker buildx is not available." >&2
echo " Arch: sudo pacman -S docker-buildx" >&2
echo " Debian/Ubu: sudo apt install docker-buildx-plugin" >&2
echo " Other: https://github.com/docker/buildx#installing" >&2
exit 1
fi
# Preflight: daemon reachable.
if ! docker info >/dev/null 2>&1; then
echo "error: cannot reach the docker daemon (is it running? are you in the 'docker' group?)" >&2
echo " Arch: sudo systemctl start docker" >&2
exit 1
fi
PLATFORMS="${PLATFORMS:-linux/amd64,linux/arm64}"
PUSH="${PUSH:-1}"
BUILDER="${BUILDER:-meshbot-builder}"
BUILDER="${BUILDER:-lorabot-builder}"
cd "$(dirname "$0")/.."
@@ -34,9 +52,9 @@ else
GIT_SHA="dev"
fi
TAGS=(--tag "${MESHBOT_IMAGE}:latest" --tag "${MESHBOT_IMAGE}:${GIT_SHA}")
TAGS=(--tag "${LORABOT_IMAGE}:latest" --tag "${LORABOT_IMAGE}:${GIT_SHA}")
for t in ${EXTRA_TAGS:-}; do
TAGS+=(--tag "${MESHBOT_IMAGE}:${t}")
TAGS+=(--tag "${LORABOT_IMAGE}:${t}")
done
# Make sure a buildx builder exists. Reuse if it's already there.
@@ -51,7 +69,7 @@ docker buildx inspect --bootstrap >/dev/null
OUTPUT_FLAG=()
if [[ "${PUSH}" == "1" ]]; then
OUTPUT_FLAG=(--push)
echo ">>> building & pushing ${MESHBOT_IMAGE} (${PLATFORMS}) tags: latest, ${GIT_SHA}${EXTRA_TAGS:+, $EXTRA_TAGS}"
echo ">>> building & pushing ${LORABOT_IMAGE} (${PLATFORMS}) tags: latest, ${GIT_SHA}${EXTRA_TAGS:+, $EXTRA_TAGS}"
else
# --load only works with a single platform; warn if user requested multi.
if [[ "${PLATFORMS}" == *,* ]]; then
@@ -59,7 +77,7 @@ else
exit 1
fi
OUTPUT_FLAG=(--load)
echo ">>> building ${MESHBOT_IMAGE} for ${PLATFORMS} (local load, no push)"
echo ">>> building ${LORABOT_IMAGE} for ${PLATFORMS} (local load, no push)"
fi
docker buildx build \
+3
View File
@@ -0,0 +1,3 @@
"""lorabot — MeshCore ↔ LLM bridge."""
__version__ = "0.1.0"
+34
View File
@@ -0,0 +1,34 @@
"""Entry point: ``python -m lorabot`` and the ``lorabot`` console script."""
from __future__ import annotations
import asyncio
import logging
from .bot import run
from .config import Settings
_LOG_FORMAT = "%(asctime)s %(levelname)-7s %(name)s: %(message)s"
def _cli() -> None:
# Bootstrap config so any error during Settings() loading is still logged
# nicely. ``force=True`` lets us reapply with the user's level afterwards.
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
try:
cfg = Settings()
except Exception:
logging.exception("failed to load configuration")
raise SystemExit(1) from None
level = getattr(logging, cfg.logging.level.upper(), logging.INFO)
logging.basicConfig(level=level, format=_LOG_FORMAT, force=True)
try:
asyncio.run(run(cfg))
except KeyboardInterrupt:
pass
if __name__ == "__main__":
_cli()
+159
View File
@@ -0,0 +1,159 @@
"""Main run loop: connect to the MeshCore device, wire collaborators, listen for DMs."""
from __future__ import annotations
import asyncio
import logging
from meshcore import EventType, MeshCore
from . import db, web
from .commands import build_default_registry
from .config import Settings
from .handler import build_dm_handler
from .llm import LLMClient
from .tools import build_default_registry as build_default_tool_registry
from .transport import MeshTransport
log = logging.getLogger("lorabot")
async def run(cfg: Settings | None = None) -> None:
# ``cfg`` is normally built by the entry point so logging can be configured
# from it before we get here; falling back to ``Settings()`` keeps direct
# ``run()`` calls (tests, embedding) working.
if cfg is None:
cfg = Settings()
db_conn = db.connect(cfg.storage.sqlite_path)
state = web.AppState(
db_conn,
model=cfg.llm.model,
serial_port=cfg.meshcore.serial_port,
loop=asyncio.get_running_loop(),
)
tool_registry = build_default_tool_registry(
tavily_api_key=cfg.tools.tavily.api_key,
)
llm = LLMClient(
base_url=cfg.llm.base_url,
api_key=cfg.llm.api_key,
model=cfg.llm.model,
system_prompt=cfg.llm.system_prompt,
temperature=cfg.llm.temperature,
timeout=cfg.llm.request_timeout_seconds,
tools=tool_registry,
)
web_task: asyncio.Task | None = None
if cfg.web.enabled:
web_task = asyncio.create_task(web.serve(state, cfg.web.host, cfg.web.port))
log.info("connecting to MeshCore on %s @ %d baud", cfg.meshcore.serial_port, cfg.meshcore.baud_rate)
try:
mc = await MeshCore.create_serial(cfg.meshcore.serial_port, cfg.meshcore.baud_rate)
# Default is False: the lib only marks contacts dirty on ADVERTISEMENT and
# waits for the caller to re-pull. Turn it on so a fresh advert is reflected
# in the local cache before the peer's first DM lands. Not needed anymore since we're tracking
# contacts ourselves now.
# mc.auto_update_contacts = True
await mc.ensure_contacts()
transport = MeshTransport(
mc,
db_conn,
ack_timeout=cfg.message.ack_timeout_seconds,
send_retries=cfg.message.send_retries,
)
transport.sync_contacts()
except BaseException:
state.set_connected(False)
if web_task is not None:
web_task.cancel()
try:
await web_task
except (asyncio.CancelledError, Exception):
pass
await llm.aclose()
await tool_registry.aclose()
db_conn.close()
raise
state.set_connected(True, node_name=_self_name(mc))
async def _advertise() -> bool:
result = await mc.commands.send_advert(flood=cfg.advertise.flood)
ok = result.type != EventType.ERROR
if ok:
log.info("advert sent")
else:
log.warning("advert failed: %s", result.payload)
return ok
state.set_advertise_callback(_advertise)
advert_task: asyncio.Task | None = None
if cfg.advertise.enabled:
advert_task = asyncio.create_task(
_advert_loop(state, cfg.advertise.interval_seconds, cfg.advertise.at_startup)
)
registry = build_default_registry()
on_dm = build_dm_handler(
db_conn=db_conn,
llm=llm,
registry=registry,
state=state,
cfg=cfg,
transport=transport,
)
sub = mc.subscribe(EventType.CONTACT_MSG_RECV, on_dm)
sub_contacts = mc.subscribe(EventType.NEW_CONTACT, transport.on_new_contact)
await mc.start_auto_message_fetching()
log.info("lorabot listening on %s", cfg.meshcore.serial_port)
try:
await asyncio.Event().wait()
finally:
state.set_connected(False)
state.set_advertise_callback(None)
mc.unsubscribe(sub)
mc.unsubscribe(sub_contacts)
await mc.stop_auto_message_fetching()
await mc.disconnect()
await llm.aclose()
await tool_registry.aclose()
for task in (advert_task, web_task):
if task is not None:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
db_conn.close()
async def _advert_loop(state: web.AppState, interval: int, at_startup: bool) -> None:
"""Periodically trigger adverts. ``interval == 0`` disables auto-repeat."""
if at_startup:
await state.trigger_advertise()
if interval <= 0:
return
while True:
await asyncio.sleep(interval)
await state.trigger_advertise()
def _self_name(mc: MeshCore) -> str | None:
"""Best-effort lookup of the local node's advertised name. Never raises."""
for attr in ("self_info", "node_info", "device_info"):
try:
info = getattr(mc, attr, None)
except Exception:
continue
if isinstance(info, dict):
name = info.get("adv_name") or info.get("name")
if name:
return str(name)
return None
+121
View File
@@ -0,0 +1,121 @@
"""Slash-command parser and registry for incoming DMs.
A command is any DM whose text starts with ``/``. The first whitespace-separated
token (case-insensitive) selects the handler; the rest is passed through as the
raw argument string. Handlers return the reply text; ``None`` means "no reply".
"""
from __future__ import annotations
import sqlite3
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
from . import db
if TYPE_CHECKING:
from .config import Settings
from .web import AppState
@dataclass
class CommandContext:
"""Everything a command handler might need to read state or react."""
db_conn: sqlite3.Connection
public_key: str
contact_name: str
cfg: Settings
state: AppState
@dataclass
class CommandResult:
"""What a command returns. ``after_send`` runs once the assistant turn is persisted."""
reply: str | None = None
after_send: Callable[[CommandContext], Awaitable[None]] | None = field(default=None, repr=False)
CommandHandler = Callable[[CommandContext, str], Awaitable[CommandResult | str | None]]
@dataclass
class Command:
name: str # without leading slash, lowercase
description: str
handler: CommandHandler
class CommandRegistry:
def __init__(self) -> None:
self._commands: dict[str, Command] = {}
def register(self, name: str, description: str) -> Callable[[CommandHandler], CommandHandler]:
def decorator(fn: CommandHandler) -> CommandHandler:
self._commands[name.lower()] = Command(name.lower(), description, fn)
return fn
return decorator
def list(self) -> list[Command]:
return sorted(self._commands.values(), key=lambda c: c.name)
@staticmethod
def parse(text: str) -> tuple[str, str] | None:
"""Return ``(name, args)`` if ``text`` is a slash command, else ``None``."""
stripped = text.strip()
if not stripped.startswith("/"):
return None
head, _, rest = stripped[1:].partition(" ")
if not head:
return None
return head.lower(), rest.strip()
async def dispatch(self, ctx: CommandContext, text: str) -> CommandResult | None:
"""If ``text`` is a known command, run it. ``None`` means "not a command"."""
parsed = self.parse(text)
if parsed is None:
return None
name, args = parsed
cmd = self._commands.get(name)
if cmd is None:
return CommandResult(reply=f"unknown command: /{name}")
out = await cmd.handler(ctx, args)
if isinstance(out, CommandResult):
return out
return CommandResult(reply=out)
def build_default_registry() -> CommandRegistry:
"""Registry with the built-in commands wired up."""
reg = CommandRegistry()
async def _clear_after_send(ctx: CommandContext) -> None:
# Bump the watermark *after* the "history cleared." reply is persisted so
# neither side of this exchange leaks into the next LLM context.
db.clear_history(ctx.db_conn, ctx.public_key)
@reg.register("clear", "reset LLM context for this conversation")
async def _clear(_ctx: CommandContext, _args: str) -> CommandResult:
return CommandResult(reply="history cleared.", after_send=_clear_after_send)
@reg.register("thinking", "show or set thinking mode: /thinking [on|off]")
async def _thinking(ctx: CommandContext, args: str) -> str:
arg = args.strip().lower()
if not arg:
current = db.get_thinking_enabled(ctx.db_conn, ctx.public_key)
return f"thinking is {'on' if current else 'off'}"
if arg in ("on", "1", "true", "yes"):
db.set_thinking_enabled(ctx.db_conn, ctx.public_key, True)
return "thinking on"
if arg in ("off", "0", "false", "no"):
db.set_thinking_enabled(ctx.db_conn, ctx.public_key, False)
return "thinking off"
return "usage: /thinking [on|off]"
@reg.register("help", "list available commands")
async def _help(_ctx: CommandContext, _args: str) -> str:
return "\n".join(f"/{c.name}{c.description}" for c in reg.list())
return reg
+119
View File
@@ -0,0 +1,119 @@
"""Application settings loaded from TOML with env-var overrides."""
from __future__ import annotations
import os
from pathlib import Path
from pydantic import BaseModel, Field
from pydantic_settings import (
BaseSettings,
PydanticBaseSettingsSource,
SettingsConfigDict,
TomlConfigSettingsSource,
)
class MeshCoreCfg(BaseModel):
serial_port: str
baud_rate: int = 115200
class LLMCfg(BaseModel):
base_url: str
api_key: str = "not-needed"
model: str
system_prompt: str = (
"You are a concise assistant on a low-bandwidth mesh radio. "
"Replies must be brief — under 180 bytes."
)
temperature: float = 0.7
request_timeout_seconds: float = 60.0
class StorageCfg(BaseModel):
sqlite_path: Path = Path("data/lorabot.db")
class MessageCfg(BaseModel):
max_bytes: int = Field(default=184, gt=0)
# 0 = use the device's path-aware suggested timeout (recommended). Set a
# positive value to override per attempt — useful only when the device's
# suggestion is consistently wrong for your link conditions.
ack_timeout_seconds: float = Field(default=0.0, ge=0)
# Number of retries after the first attempt. Total attempts = send_retries + 1.
# With send_retries >= 2 the third attempt onwards is sent as a flood broadcast
# (multi-hop) instead of direct — see meshcore's send_msg_with_retry.
send_retries: int = Field(default=1, ge=0)
class WebCfg(BaseModel):
enabled: bool = True
# Default to loopback. Conversation logs are unauthenticated; only set this to
# 0.0.0.0 (e.g. inside Docker) when you understand the exposure.
host: str = "127.0.0.1"
port: int = Field(default=8080, gt=0, lt=65536)
class LoggingCfg(BaseModel):
# Standard level names: DEBUG, INFO, WARNING, ERROR, CRITICAL. Case-insensitive.
level: str = "INFO"
class TavilyCfg(BaseModel):
# Sign up at https://tavily.com for a key. When empty, the web_search and
# fetch_url tools simply aren't registered (the rest of the bot is unaffected).
api_key: str = ""
class ToolsCfg(BaseModel):
tavily: TavilyCfg = TavilyCfg()
class AdvertiseCfg(BaseModel):
enabled: bool = True
# Seconds between automatic adverts. 0 = manual only (button still works).
interval_seconds: int = Field(default=3600, ge=0)
# Send one advert as soon as the device is connected.
at_startup: bool = True
# Flood the mesh (multi-hop) instead of a zero-hop advert.
flood: bool = False
def _toml_path() -> Path:
return Path(os.environ.get("LORABOT_CONFIG", "config.toml"))
class Settings(BaseSettings):
meshcore: MeshCoreCfg
llm: LLMCfg
storage: StorageCfg = StorageCfg()
message: MessageCfg = MessageCfg()
web: WebCfg = WebCfg()
advertise: AdvertiseCfg = AdvertiseCfg()
tools: ToolsCfg = ToolsCfg()
logging: LoggingCfg = LoggingCfg()
model_config = SettingsConfigDict(
env_prefix="LORABOT_",
env_nested_delimiter="__",
toml_file=_toml_path(),
extra="ignore",
)
@classmethod
def settings_customise_sources(
cls,
settings_cls: type[BaseSettings],
init_settings: PydanticBaseSettingsSource,
env_settings: PydanticBaseSettingsSource,
dotenv_settings: PydanticBaseSettingsSource,
file_secret_settings: PydanticBaseSettingsSource,
) -> tuple[PydanticBaseSettingsSource, ...]:
# Order = priority (highest first): init args > env > TOML > secrets.
return (
init_settings,
env_settings,
TomlConfigSettingsSource(settings_cls),
file_secret_settings,
)
+176
View File
@@ -0,0 +1,176 @@
"""SQLite persistence for conversation history and contacts."""
from __future__ import annotations
import json
import sqlite3
from pathlib import Path
SCHEMA = """
CREATE TABLE IF NOT EXISTS contacts (
public_key TEXT PRIMARY KEY,
adv_name TEXT,
data TEXT NOT NULL,
seen_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS conversations (
public_key TEXT PRIMARY KEY,
contact_name TEXT,
cleared_at_id INTEGER NOT NULL DEFAULT 0,
thinking_enabled INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
public_key TEXT NOT NULL REFERENCES conversations(public_key),
role TEXT NOT NULL CHECK (role IN ('user', 'assistant')),
content TEXT NOT NULL,
hidden_from_llm INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_messages_pubkey_id
ON messages(public_key, id);
"""
def connect(path: str | Path) -> sqlite3.Connection:
"""Open the SQLite DB, ensure the parent dir and schema exist, return the connection."""
db_path = Path(path)
db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(db_path, isolation_level=None) # autocommit
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode = WAL;")
conn.execute("PRAGMA foreign_keys = ON;")
conn.executescript(SCHEMA)
_migrate(conn)
return conn
def _migrate(conn: sqlite3.Connection) -> None:
"""Apply additive migrations for DBs created before later columns existed."""
conv_cols = {row["name"] for row in conn.execute("PRAGMA table_info(conversations)")}
if "cleared_at_id" not in conv_cols:
conn.execute("ALTER TABLE conversations ADD COLUMN cleared_at_id INTEGER NOT NULL DEFAULT 0")
if "thinking_enabled" not in conv_cols:
conn.execute(
"ALTER TABLE conversations ADD COLUMN thinking_enabled INTEGER NOT NULL DEFAULT 0"
)
msg_cols = {row["name"] for row in conn.execute("PRAGMA table_info(messages)")}
if "hidden_from_llm" not in msg_cols:
conn.execute(
"ALTER TABLE messages ADD COLUMN hidden_from_llm INTEGER NOT NULL DEFAULT 0"
)
def upsert_conversation(conn: sqlite3.Connection, public_key: str, contact_name: str) -> None:
conn.execute(
"""
INSERT INTO conversations (public_key, contact_name)
VALUES (?, ?)
ON CONFLICT(public_key) DO UPDATE SET
contact_name = excluded.contact_name,
updated_at = datetime('now')
""",
(public_key, contact_name),
)
def add_message(
conn: sqlite3.Connection,
public_key: str,
role: str,
content: str,
*,
hidden_from_llm: bool = False,
) -> None:
conn.execute(
"INSERT INTO messages (public_key, role, content, hidden_from_llm) VALUES (?, ?, ?, ?)",
(public_key, role, content, 1 if hidden_from_llm else 0),
)
def get_history(conn: sqlite3.Connection, public_key: str) -> list[dict[str, str]]:
"""Return the conversation as OpenAI chat messages, oldest first.
Messages with ``id <= conversations.cleared_at_id`` are excluded so a peer can
reset their LLM context with ``/clear`` without losing the audit trail.
"""
rows = conn.execute(
"""
SELECT m.role, m.content
FROM messages m
JOIN conversations c ON c.public_key = m.public_key
WHERE m.public_key = ?
AND m.id > c.cleared_at_id
AND m.hidden_from_llm = 0
ORDER BY m.id ASC
""",
(public_key,),
).fetchall()
return [{"role": row["role"], "content": row["content"]} for row in rows]
def get_thinking_enabled(conn: sqlite3.Connection, public_key: str) -> bool:
row = conn.execute(
"SELECT thinking_enabled FROM conversations WHERE public_key = ?",
(public_key,),
).fetchone()
return bool(row["thinking_enabled"]) if row is not None else False
def set_thinking_enabled(conn: sqlite3.Connection, public_key: str, enabled: bool) -> None:
conn.execute(
"""
UPDATE conversations
SET thinking_enabled = ?, updated_at = datetime('now')
WHERE public_key = ?
""",
(1 if enabled else 0, public_key),
)
def upsert_contact(conn: sqlite3.Connection, contact: dict) -> None:
conn.execute(
"""
INSERT INTO contacts (public_key, adv_name, data)
VALUES (?, ?, ?)
ON CONFLICT(public_key) DO UPDATE SET
adv_name = excluded.adv_name,
data = excluded.data,
seen_at = datetime('now')
""",
(contact["public_key"], contact.get("adv_name"), json.dumps(contact)),
)
def get_contact_by_key_prefix(conn: sqlite3.Connection, prefix: str) -> dict | None:
row = conn.execute(
"SELECT data FROM contacts WHERE public_key LIKE ? || '%'",
(prefix,),
).fetchone()
return json.loads(row["data"]) if row is not None else None
def clear_history(conn: sqlite3.Connection, public_key: str) -> None:
"""Bump the per-conversation watermark to the current max message id.
Old messages stay in the table for the web UI; future ``get_history`` calls
skip them.
"""
conn.execute(
"""
UPDATE conversations
SET cleared_at_id = COALESCE(
(SELECT MAX(id) FROM messages WHERE public_key = ?),
0
),
updated_at = datetime('now')
WHERE public_key = ?
""",
(public_key, public_key),
)
+150
View File
@@ -0,0 +1,150 @@
"""DM event handler: route incoming DMs through the command registry or the LLM."""
from __future__ import annotations
import asyncio
import logging
import sqlite3
import time
from collections import defaultdict
from collections.abc import Awaitable, Callable
from datetime import datetime, timezone
from . import db
from .commands import CommandContext, CommandRegistry, CommandResult
from .config import Settings
from .llm import LLMClient
from .transport import MeshTransport
from .web import AppState
log = logging.getLogger("lorabot")
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
class _Deduplicator:
"""Drops consecutive identical messages from the same sender within a time window."""
def __init__(self, window: float = 15.0) -> None:
self._window = window
self._seen: dict[str, tuple[str, float]] = {}
def is_duplicate(self, key: str, text: str) -> bool:
now = time.monotonic()
prev_text, prev_time = self._seen.get(key, ("", 0.0))
if text == prev_text and now - prev_time < self._window:
return True
self._seen[key] = (text, now)
return False
def _store_and_publish(
db_conn: sqlite3.Connection,
state: AppState,
public_key: str,
contact_name: str,
role: str,
text: str,
hidden_from_llm: bool,
) -> None:
db.add_message(db_conn, public_key, role, text, hidden_from_llm=hidden_from_llm)
state.publish("message", {
"public_key": public_key,
"contact_name": contact_name,
"role": role,
"content": text,
"created_at": _now_iso(),
})
async def _generate_reply(
registry: CommandRegistry,
llm: LLMClient,
db_conn: sqlite3.Connection,
public_key: str,
ctx: CommandContext,
text: str,
is_command: bool,
) -> tuple[str, CommandResult | None] | None:
"""Dispatch to command or LLM. Returns (reply, cmd_result), or None to abort."""
if is_command:
cmd_result = await registry.dispatch(ctx, text)
if cmd_result is None or cmd_result.reply is None:
return None
return cmd_result.reply, cmd_result
thinking = db.get_thinking_enabled(db_conn, public_key)
try:
reply = await llm.reply(db.get_history(db_conn, public_key), thinking=thinking)
except Exception:
log.exception("LLM call failed for %s", public_key[:12])
return None
return reply, None
def build_dm_handler(
*,
db_conn: sqlite3.Connection,
llm: LLMClient,
registry: CommandRegistry,
state: AppState,
cfg: Settings,
transport: MeshTransport,
) -> Callable[[object], Awaitable[None]]:
"""Return an ``on_dm(event)`` closure with all collaborators bound."""
locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
dedup = _Deduplicator(window=15.0)
async def on_dm(event) -> None:
data = event.payload or {}
prefix = data.get("pubkey_prefix")
text = (data.get("text") or "").strip()
if not prefix or not text:
return
contact = transport.resolve_contact(prefix)
if contact is None:
log.info("ignoring DM from unknown sender %s (no contact after refresh)", prefix)
return
public_key = contact["public_key"]
contact_name = contact.get("adv_name", "")
if dedup.is_duplicate(public_key, text):
log.info("dropping duplicate DM from %s (%s): %s", contact_name, public_key[:12], text)
return
log.info("DM from %s (%s): %s", contact_name, public_key[:12], text)
is_command = registry.parse(text) is not None
async with locks[public_key]:
db.upsert_conversation(db_conn, public_key, contact_name)
_store_and_publish(db_conn, state, public_key, contact_name, "user", text,
hidden_from_llm=is_command)
ctx = CommandContext(
db_conn=db_conn,
public_key=public_key,
contact_name=contact_name,
cfg=cfg,
state=state,
)
result = await _generate_reply(registry, llm, db_conn, public_key, ctx, text, is_command)
if result is None:
return
reply, cmd_result = result
delivered = await transport.send_chunked(contact, reply, cfg.message.max_bytes)
if not delivered:
return
_store_and_publish(db_conn, state, public_key, contact_name, "assistant", delivered,
hidden_from_llm=is_command)
if cmd_result is not None and cmd_result.after_send is not None:
await cmd_result.after_send(ctx)
return on_dm
+132
View File
@@ -0,0 +1,132 @@
"""Thin wrapper around the OpenAI Python SDK aimed at OpenAI-compatible servers."""
from __future__ import annotations
import logging
from typing import Any
from openai import AsyncOpenAI
from .tools import ToolRegistry
log = logging.getLogger("lorabot")
# Hard cap on assistant <-> tool round-trips per ``reply`` call. Stops a misbehaving
# model from looping on tool calls forever; in practice 1-2 iterations is normal.
_MAX_TOOL_ITERATIONS = 5
class LLMClient:
def __init__(
self,
*,
base_url: str,
api_key: str,
model: str,
system_prompt: str,
temperature: float,
timeout: float,
tools: ToolRegistry | None = None,
) -> None:
self._client = AsyncOpenAI(base_url=base_url, api_key=api_key, timeout=timeout)
self._model = model
self._system_prompt = system_prompt
self._temperature = temperature
self._tools = tools
async def reply(self, history: list[dict[str, str]], *, thinking: bool = False) -> str:
"""Send the system prompt + ``history`` and return the assistant's text.
``thinking`` toggles the llama.cpp/server chat-template kwarg ``enable_thinking``,
which controls whether the model prepends its hidden reasoning turn (Gemma-style).
Passed via OpenAI SDK ``extra_body`` since it is not part of the standard schema.
If a ``ToolRegistry`` is wired in, the model may emit ``tool_calls``; we
dispatch them, append the results, and re-call until the model produces
plain content (or the iteration cap kicks in).
"""
# Local working copy: we'll mutate this with assistant/tool turns across
# the tool loop without touching the persisted DB history.
messages: list[dict[str, Any]] = [
{"role": "system", "content": self._system_prompt},
*history,
]
# Build the tool spec list once. ``None`` (not ``[]``) means "don't send
# the tools field at all" — some servers reject an empty list.
tool_specs = self._tools.specs() if self._tools else None
for iteration in range(_MAX_TOOL_ITERATIONS):
# Only attach ``tools`` when we actually have any registered.
kwargs: dict[str, Any] = {}
if tool_specs:
kwargs["tools"] = tool_specs
log.debug(
"LLM request: model=%s iter=%d msgs=%d tools=%d thinking=%s",
self._model,
iteration + 1,
len(messages),
len(tool_specs) if tool_specs else 0,
thinking,
)
resp = await self._client.chat.completions.create(
model=self._model,
messages=messages,
temperature=self._temperature,
extra_body={"chat_template_kwargs": {"enable_thinking": thinking}},
**kwargs,
)
msg = resp.choices[0].message
# No tool calls → this is the final text answer; return it.
# Also bail if tools aren't even configured: nothing to dispatch to.
if not msg.tool_calls or self._tools is None:
return (msg.content or "").strip()
# Visible at INFO so a normal log lets you see when the model
# actually reached for a tool, and which one(s).
log.info(
"LLM requested tool calls (iter %d): %s",
iteration + 1,
[tc.function.name for tc in msg.tool_calls],
)
# Echo the assistant's tool-calling turn back into ``messages`` so the
# next round-trip sees it. The OpenAI protocol requires this exact
# shape (role=assistant + tool_calls list) before role=tool entries.
messages.append({
"role": "assistant",
"content": msg.content,
"tool_calls": [
{
"id": tc.id,
"type": "function",
"function": {
"name": tc.function.name,
"arguments": tc.function.arguments,
},
}
for tc in msg.tool_calls
],
})
# Run each tool the model asked for and append its result. Errors are
# surfaced as plain strings inside the registry so the model can see
# them and recover (e.g. retry with a different argument).
for tc in msg.tool_calls:
result = await self._tools.dispatch(
tc.function.name, tc.function.arguments
)
messages.append({
"role": "tool",
"tool_call_id": tc.id,
"content": result,
})
# Loop continues: re-call the model with the tool results in context.
log.warning("LLM tool loop exceeded %d iterations", _MAX_TOOL_ITERATIONS)
return "(tool loop limit exceeded)"
async def aclose(self) -> None:
await self._client.close()
+52
View File
@@ -0,0 +1,52 @@
"""Helpers for shaping outgoing mesh messages."""
from __future__ import annotations
def trim_to_bytes(text: str, max_bytes: int) -> str:
"""Return ``text`` truncated so its UTF-8 encoding is at most ``max_bytes`` bytes.
Backs off if the cut lands inside a multi-byte UTF-8 sequence so we never emit
invalid UTF-8 to the radio.
"""
if max_bytes <= 0:
return ""
encoded = text.encode("utf-8")
if len(encoded) <= max_bytes:
return text
cut = encoded[:max_bytes]
# Continuation bytes start with bits 10xxxxxx; rewind past them.
while cut and (cut[-1] & 0xC0) == 0x80:
cut = cut[:-1]
return cut.decode("utf-8", errors="ignore")
def split_to_bytes(text: str, max_bytes: int, max_chunks: int = 2) -> list[str]:
"""Split ``text`` into up to ``max_chunks`` UTF-8-safe chunks of ``max_bytes`` each.
Anything past ``max_chunks * max_bytes`` is dropped. Splits never land inside a
multi-byte UTF-8 sequence on either side of the boundary.
"""
if max_bytes <= 0 or max_chunks <= 0 or not text:
return []
encoded = text.encode("utf-8")
n = len(encoded)
chunks: list[str] = []
pos = 0
for _ in range(max_chunks):
if pos >= n:
break
end = min(pos + max_bytes, n)
if end < n:
# Rewind past trailing UTF-8 continuation bytes (10xxxxxx).
while end > pos and (encoded[end - 1] & 0xC0) == 0x80:
end -= 1
# And past a dangling leader byte (11xxxxxx) whose continuations
# would fall outside the budget.
if end > pos and (encoded[end - 1] & 0xC0) == 0xC0:
end -= 1
if end == pos:
break
chunks.append(encoded[pos:end].decode("utf-8"))
pos = end
return chunks
+29
View File
@@ -0,0 +1,29 @@
"""LLM tool calling: registry + built-in tools."""
from __future__ import annotations
from .base import Tool, ToolRegistry
from .weather import WeatherTool
from .web import FetchUrlTool, WebSearchTool
def build_default_registry(*, tavily_api_key: str = "") -> ToolRegistry:
"""Build the default registry. Tavily-backed tools are only registered
when an API key is configured — without one, the bot silently runs with
just the offline tools."""
reg = ToolRegistry()
reg.register(WeatherTool())
if tavily_api_key:
reg.register(WebSearchTool(api_key=tavily_api_key))
reg.register(FetchUrlTool(api_key=tavily_api_key))
return reg
__all__ = [
"Tool",
"ToolRegistry",
"WeatherTool",
"WebSearchTool",
"FetchUrlTool",
"build_default_registry",
]
+90
View File
@@ -0,0 +1,90 @@
"""Tool abstraction and registry for OpenAI-style function calling."""
from __future__ import annotations
import json
import logging
import time
from abc import ABC, abstractmethod
from typing import Any
log = logging.getLogger("lorabot")
def _truncate_for_log(s: str, max_len: int = 200) -> str:
"""Cap a log field so a giant tool argument doesn't blow up log lines."""
return s if len(s) <= max_len else s[:max_len] + ""
class Tool(ABC):
"""Single LLM-callable function. Subclasses set ``name``/``description``/
``parameters`` (JSON Schema) and implement ``run``."""
name: str
description: str
parameters: dict[str, Any]
@abstractmethod
async def run(self, **kwargs: Any) -> str:
...
async def aclose(self) -> None:
return None
def spec(self) -> dict[str, Any]:
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": self.parameters,
},
}
class ToolRegistry:
def __init__(self) -> None:
self._tools: dict[str, Tool] = {}
def register(self, tool: Tool) -> None:
self._tools[tool.name] = tool
def specs(self) -> list[dict[str, Any]]:
return [t.spec() for t in self._tools.values()]
def __bool__(self) -> bool:
return bool(self._tools)
async def dispatch(self, name: str, arguments_json: str) -> str:
"""Run the named tool with JSON-encoded arguments. Errors are returned
as plain strings so the LLM can see and react to them."""
tool = self._tools.get(name)
if tool is None:
log.warning("tool call rejected: unknown tool %r", name)
return f"error: unknown tool {name!r}"
try:
args = json.loads(arguments_json) if arguments_json else {}
except json.JSONDecodeError as exc:
log.warning("tool %s: bad arguments JSON: %s", name, exc)
return f"error: bad arguments JSON ({exc})"
# Log entry + exit so failures are easy to attribute and we get a
# rough timing picture per tool. Args are truncated to keep logs sane.
log.info("tool %s called: %s", name, _truncate_for_log(arguments_json or "{}"))
started = time.monotonic()
try:
result = await tool.run(**args)
except Exception as exc:
elapsed_ms = (time.monotonic() - started) * 1000
log.exception("tool %s failed after %.0f ms", name, elapsed_ms)
return f"error: {exc}"
elapsed_ms = (time.monotonic() - started) * 1000
log.info("tool %s ok in %.0f ms (%d chars)", name, elapsed_ms, len(result))
return result
async def aclose(self) -> None:
for tool in self._tools.values():
try:
await tool.aclose()
except Exception:
log.exception("aclose failed for tool %s", tool.name)
+201
View File
@@ -0,0 +1,201 @@
"""Weather tool backed by Open-Meteo (no API key).
Returns the current observation, and optionally a multi-day daily forecast
(min/max temperature + weather code per day, in the location's local timezone).
"""
from __future__ import annotations
from datetime import date
import aiohttp
from .base import Tool
GEOCODE_URL = "https://geocoding-api.open-meteo.com/v1/search"
FORECAST_URL = "https://api.open-meteo.com/v1/forecast"
# Hard upper bound on daily forecast length. Open-Meteo allows up to 16 days but
# bandwidth and model context favor keeping replies compact.
_MAX_FORECAST_DAYS = 7
# WMO weather interpretation codes — compact summaries to keep LoRa replies short.
_WMO: dict[int, str] = {
0: "clear",
1: "mostly clear", 2: "partly cloudy", 3: "overcast",
45: "fog", 48: "rime fog",
51: "light drizzle", 53: "drizzle", 55: "heavy drizzle",
56: "freezing drizzle", 57: "freezing drizzle",
61: "light rain", 63: "rain", 65: "heavy rain",
66: "freezing rain", 67: "freezing rain",
71: "light snow", 73: "snow", 75: "heavy snow", 77: "snow grains",
80: "rain showers", 81: "rain showers", 82: "violent showers",
85: "snow showers", 86: "heavy snow showers",
95: "thunderstorm", 96: "thunderstorm w/ hail", 99: "thunderstorm w/ hail",
}
class WeatherTool(Tool):
name = "get_weather"
description = (
"Current weather conditions for a place, optionally followed by a daily "
"forecast. Accepts a location name (e.g. 'Berlin', 'San Francisco, US') "
"or 'lat,lon' decimal coordinates. Set forecast_days to include the next "
"N days (today inclusive); 0 returns only the current observation."
)
parameters = {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "City/place name, or 'lat,lon' decimal coordinates.",
},
"forecast_days": {
"type": "integer",
"description": (
f"Number of daily-forecast days to include (0{_MAX_FORECAST_DAYS}, "
"today inclusive). Defaults to 0 (current weather only)."
),
"minimum": 0,
"maximum": _MAX_FORECAST_DAYS,
"default": 0,
},
},
"required": ["location"],
}
def __init__(self) -> None:
self._session: aiohttp.ClientSession | None = None
async def aclose(self) -> None:
if self._session is not None and not self._session.closed:
await self._session.close()
async def _get_session(self) -> aiohttp.ClientSession:
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=10)
)
return self._session
async def run(self, location: str = "", forecast_days: int = 0) -> str:
location = (location or "").strip()
if not location:
return "error: location is required"
# Clamp defensively: model may ignore the schema bounds.
try:
forecast_days = max(0, min(int(forecast_days), _MAX_FORECAST_DAYS))
except (TypeError, ValueError):
forecast_days = 0
coords = _parse_latlon(location)
if coords is not None:
lat, lon = coords
label = f"{lat:.3f},{lon:.3f}"
else:
geo = await self._geocode(location)
if geo is None:
return f"error: could not find location {location!r}"
lat, lon, label = geo
return await self._forecast(lat, lon, label, forecast_days)
async def _geocode(self, name: str) -> tuple[float, float, str] | None:
session = await self._get_session()
async with session.get(
GEOCODE_URL,
params={"name": name, "count": "1", "format": "json"},
) as resp:
resp.raise_for_status()
data = await resp.json()
results = data.get("results") or []
if not results:
return None
r = results[0]
bits = [r.get("name") or name]
if cc := r.get("country_code"):
bits.append(str(cc))
return float(r["latitude"]), float(r["longitude"]), ", ".join(bits)
async def _forecast(
self, lat: float, lon: float, label: str, forecast_days: int
) -> str:
# Build the request. ``timezone=auto`` makes daily aggregations align to
# the location's local calendar day instead of UTC.
params: dict[str, str] = {
"latitude": str(lat),
"longitude": str(lon),
"current": "temperature_2m,weather_code,wind_speed_10m",
"timezone": "auto",
}
if forecast_days > 0:
params["daily"] = "temperature_2m_max,temperature_2m_min,weather_code"
params["forecast_days"] = str(forecast_days)
session = await self._get_session()
async with session.get(FORECAST_URL, params=params) as resp:
resp.raise_for_status()
data = await resp.json()
# Current observation line (always present).
cur = data.get("current") or {}
temp = cur.get("temperature_2m")
code = cur.get("weather_code")
wind = cur.get("wind_speed_10m")
cond = _WMO.get(int(code), f"code {code}") if code is not None else "?"
cur_bits = [f"{label} now"]
if temp is not None:
cur_bits.append(f"{temp}°C")
cur_bits.append(cond)
if wind is not None:
cur_bits.append(f"wind {wind} km/h")
lines = [", ".join(cur_bits)]
# Daily forecast block (one line per day): "Mon 8-17°C, partly cloudy".
# Open-Meteo returns parallel arrays under ``daily`` keyed by date.
if forecast_days > 0:
daily = data.get("daily") or {}
days = daily.get("time") or []
tmax = daily.get("temperature_2m_max") or []
tmin = daily.get("temperature_2m_min") or []
codes = daily.get("weather_code") or []
for i, day_str in enumerate(days):
day_label = _short_day(day_str)
hi = tmax[i] if i < len(tmax) else None
lo = tmin[i] if i < len(tmin) else None
day_code = codes[i] if i < len(codes) else None
day_cond = (
_WMO.get(int(day_code), f"code {day_code}")
if day_code is not None
else "?"
)
if lo is not None and hi is not None:
lines.append(f"{day_label} {lo}{hi}°C, {day_cond}")
else:
lines.append(f"{day_label} {day_cond}")
return "; ".join(lines)
def _short_day(iso_date: str) -> str:
"""Render an ISO date (``YYYY-MM-DD``) as a short weekday (``Mon``).
Falls back to the raw string if parsing fails."""
try:
return date.fromisoformat(iso_date).strftime("%a")
except (TypeError, ValueError):
return iso_date
def _parse_latlon(text: str) -> tuple[float, float] | None:
if "," not in text:
return None
a, b = text.split(",", 1)
try:
lat = float(a.strip())
lon = float(b.strip())
except ValueError:
return None
if not (-90 <= lat <= 90 and -180 <= lon <= 180):
return None
return lat, lon
+193
View File
@@ -0,0 +1,193 @@
"""Web search + page-extraction tools backed by Tavily (https://tavily.com).
Exposes two tools to the LLM:
- ``web_search`` — query Tavily for a list of {title, url, snippet} hits.
- ``fetch_url`` — pull the readable body text of a single page via Tavily's
``/extract`` endpoint (so we don't have to parse HTML ourselves).
Tavily handles all the messy bits — URL fetching, redirects, boilerplate
stripping, robots — so this module is just a thin HTTP wrapper.
"""
from __future__ import annotations
import logging
import aiohttp
from .base import Tool
log = logging.getLogger("lorabot")
_SEARCH_URL = "https://api.tavily.com/search"
_EXTRACT_URL = "https://api.tavily.com/extract"
# Sane defaults for the search tool. Upper bound keeps the tool message small
# enough that the LoRa-side LLM can still reason over it.
_DEFAULT_RESULTS = 5
_MAX_RESULTS = 10
# Cap extracted-page body size so a giant article doesn't blow the model's
# context. ~4 KB is plenty to summarize from for a 180-byte LoRa reply.
_MAX_EXTRACT_CHARS = 4000
class _TavilyHTTP:
"""Tiny per-tool HTTP helper. Each tool instance owns one ``aiohttp``
session; lazily created on first request, closed via ``aclose``."""
def __init__(self, api_key: str) -> None:
self._api_key = api_key
self._session: aiohttp.ClientSession | None = None
async def post(self, url: str, payload: dict) -> dict:
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=20)
)
headers = {"Authorization": f"Bearer {self._api_key}"}
log.debug("tavily POST %s payload-keys=%s", url, list(payload))
async with self._session.post(url, json=payload, headers=headers) as resp:
# On non-2xx, capture the response body before raising so the log
# tells us *why* (auth, quota, bad query) — raise_for_status alone
# would only show the status code.
if resp.status >= 400:
body = await resp.text()
log.warning(
"tavily POST %s -> %d: %s", url, resp.status, body[:200]
)
resp.raise_for_status()
return await resp.json()
async def aclose(self) -> None:
if self._session is not None and not self._session.closed:
await self._session.close()
class WebSearchTool(Tool):
name = "web_search"
description = (
"Search the web. Returns titles, URLs and SHORT snippets — useful for "
"headlines, finding sources and orientation. Snippets are a few "
"sentences at most and are often truncated mid-thought. "
"If the user asked for any of: specific numbers, prices, dates, "
"percentages, technical specs, exact quotes, step-by-step "
"instructions, or 'current'/'latest' data — you MUST also call "
"fetch_url on the most relevant result before answering."
)
parameters = {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query.",
},
"max_results": {
"type": "integer",
"description": (
f"How many results to return (1{_MAX_RESULTS}). "
f"Default {_DEFAULT_RESULTS}."
),
"minimum": 1,
"maximum": _MAX_RESULTS,
"default": _DEFAULT_RESULTS,
},
},
"required": ["query"],
}
def __init__(self, api_key: str) -> None:
self._http = _TavilyHTTP(api_key)
async def aclose(self) -> None:
await self._http.aclose()
async def run(self, query: str = "", max_results: int = _DEFAULT_RESULTS) -> str:
query = (query or "").strip()
if not query:
return "error: query is required"
# Defensive clamp — model may ignore the JSON-schema bounds.
try:
n = max(1, min(int(max_results), _MAX_RESULTS))
except (TypeError, ValueError):
n = _DEFAULT_RESULTS
data = await self._http.post(
_SEARCH_URL,
{
"query": query,
"max_results": n,
# "basic" is fast and cheap; "advanced" costs more credits but
# returns higher-quality snippets. Stick with basic for now.
"search_depth": "basic",
},
)
results = data.get("results") or []
if not results:
return "no results"
# Format as a numbered list — readable for the model and compact enough
# to fit comfortably in context alongside the rest of the conversation.
lines: list[str] = []
for i, r in enumerate(results, 1):
title = (r.get("title") or "").strip() or "(no title)"
url = r.get("url") or ""
snippet = (r.get("content") or "").strip()
lines.append(f"{i}. {title}{url}\n {snippet}")
return "\n".join(lines)
class FetchUrlTool(Tool):
name = "fetch_url"
description = (
"Fetch the full readable body text of a single web page. Call this "
"after web_search whenever the user asked for specifics — exact "
"numbers, prices, dates, quotes, technical details, or current data — "
"even if a snippet looks like it might answer. Snippets are short and "
"often misleading; the full page is the source of truth. Long pages "
"are truncated to keep context manageable."
)
parameters = {
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "Absolute URL of the page to fetch.",
},
},
"required": ["url"],
}
def __init__(self, api_key: str) -> None:
self._http = _TavilyHTTP(api_key)
async def aclose(self) -> None:
await self._http.aclose()
async def run(self, url: str = "") -> str:
url = (url or "").strip()
if not url:
return "error: url is required"
data = await self._http.post(
_EXTRACT_URL,
{"urls": [url], "extract_depth": "basic"},
)
results = data.get("results") or []
if not results:
# Tavily reports unreachable / blocked pages here instead of in
# ``results``. Surface the reason so the model can react.
failed = data.get("failed_results") or []
if failed:
reason = (failed[0] or {}).get("error", "unknown")
return f"error: extract failed ({reason})"
return "error: no content extracted"
body = (results[0].get("raw_content") or "").strip()
if not body:
return "error: page returned empty content"
if len(body) > _MAX_EXTRACT_CHARS:
body = body[:_MAX_EXTRACT_CHARS] + " …[truncated]"
return body
+126
View File
@@ -0,0 +1,126 @@
"""MeshCore-side transport: contact resolution and reliable chunked sending."""
from __future__ import annotations
import asyncio
import logging
import sqlite3
from collections import defaultdict
from meshcore import EventType, MeshCore
from . import db
from .messages import split_to_bytes
log = logging.getLogger("lorabot")
class MeshTransport:
"""Owns the MeshCore connection reference; handles contact resolution and
reliable chunked message delivery.
Create one instance per MeshCore connection and pass it to build_dm_handler.
"""
def __init__(
self,
mc: MeshCore,
db_conn: sqlite3.Connection,
ack_timeout: float = 0.0,
send_retries: int = 1,
) -> None:
self._mc = mc
self._db = db_conn
self._ack_timeout = ack_timeout
self._send_retries = send_retries
self._send_locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
# ------------------------------------------------------------------
# Contact management
# ------------------------------------------------------------------
def sync_contacts(self) -> None:
"""Bulk-upsert the device's current contact list into the DB.
Call once after the initial ensure_contacts() on connect.
"""
for contact in self._mc.contacts.values():
db.upsert_contact(self._db, contact)
log.info("synced %d contacts to DB", len(self._mc.contacts))
async def on_new_contact(self, event) -> None:
"""Subscriber for EventType.NEW_CONTACT — persists the contact to DB."""
contact = event.payload
db.upsert_contact(self._db, contact)
log.info("new contact stored: %s (%s)",
contact.get("adv_name", ""), contact["public_key"][:12])
def resolve_contact(self, prefix: str):
"""Look up a contact by pubkey prefix from the local DB."""
return db.get_contact_by_key_prefix(self._db, prefix)
# ------------------------------------------------------------------
# Chunked sending
# ------------------------------------------------------------------
async def send_chunked(
self,
contact,
text: str,
max_bytes: int,
max_chunks: int = 2,
) -> str:
"""Split ``text`` and send chunks in order, waiting for ACK before each next one.
Returns the concatenation of chunks that were successfully delivered.
"""
chunks = split_to_bytes(text, max_bytes, max_chunks=max_chunks)
pk = contact["public_key"]
pk_short = pk[:12]
planned_bytes = sum(len(c.encode("utf-8")) for c in chunks)
dropped = len(text.encode("utf-8")) - planned_bytes
if dropped > 0:
log.info("reply to %s split into %d chunks, dropped %d trailing bytes",
pk_short, len(chunks), dropped)
sent: list[str] = []
async with self._send_locks[pk]:
for i, chunk in enumerate(chunks, 1):
log.info("reply to %s (%d/%d, %d bytes): %s",
pk_short, i, len(chunks), len(chunk.encode("utf-8")), chunk)
if not await self._send_chunk(contact, chunk):
break
sent.append(chunk)
return "".join(sent)
async def _send_chunk(self, contact, chunk: str) -> bool:
"""Send one chunk and wait for the recipient ACK.
Delegates to ``send_msg_with_retry`` which correlates the ACK event by
``expected_ack`` code and retries up to ``max_attempts`` times. Returns
``None`` when no ACK arrives across all attempts.
Note on the library's default ``flood_after=2``: from the third attempt
onwards the library resets the path and re-sends as a flood broadcast.
This only kicks in when ``send_retries >= 2`` (max_attempts >= 3); with
the default ``send_retries=1`` we stay direct-only.
"""
pk_short = contact["public_key"][:12]
try:
result = await self._mc.commands.send_msg_with_retry(
contact, chunk,
max_attempts=self._send_retries + 1,
timeout=self._ack_timeout,
)
except Exception:
log.exception("send to %s raised", pk_short)
return False
if result is None:
log.error("no ACK from %s after %d attempts",
pk_short, self._send_retries + 1)
return False
if result.type == EventType.ERROR:
log.warning("send_msg error for %s: %s", pk_short, result.payload)
return False
return True
+721
View File
@@ -0,0 +1,721 @@
"""Lightweight aiohttp web UI for browsing stored conversations and live status.
Endpoints:
GET / single-page UI
GET /api/status device + model + uptime snapshot
GET /api/conversations list of conversations with message counts
GET /api/conversations/{pk} conversation header + ordered messages
GET /api/events Server-Sent Events stream (status, message)
"""
from __future__ import annotations
import asyncio
import json
import logging
import sqlite3
from collections.abc import Awaitable, Callable
from datetime import datetime, timezone
from aiohttp import web
AdvertCallback = Callable[[], Awaitable[bool]]
log = logging.getLogger("lorabot.web")
class AppState:
"""Shared state between the bot loop and the web server."""
def __init__(
self,
db_conn: sqlite3.Connection,
*,
model: str,
serial_port: str,
loop: asyncio.AbstractEventLoop | None = None,
) -> None:
self.db = db_conn
self.model = model
self.serial_port = serial_port
self.connected = False
self.connected_since: datetime | None = None
self.node_name: str | None = None
self.last_advert_at: datetime | None = None
self.last_advert_ok: bool | None = None
self._subscribers: set[asyncio.Queue] = set()
self._loop = loop
self._advertise: AdvertCallback | None = None
self._advert_lock = asyncio.Lock()
def status(self) -> dict:
return {
"connected": self.connected,
"connected_since": _iso(self.connected_since),
"node_name": self.node_name,
"model": self.model,
"serial_port": self.serial_port,
"last_advert_at": _iso(self.last_advert_at),
"last_advert_ok": self.last_advert_ok,
"advertise_available": self._advertise is not None,
"contact_count": self.db.execute("SELECT COUNT(*) FROM contacts").fetchone()[0],
}
def set_advertise_callback(self, cb: AdvertCallback | None) -> None:
self._advertise = cb
self.publish("status", self.status())
async def trigger_advertise(self) -> bool:
"""Run the injected advertise callback. Serialized; never raises."""
if self._advertise is None:
return False
async with self._advert_lock:
try:
ok = bool(await self._advertise())
except Exception:
log.exception("advertise callback raised")
ok = False
self.last_advert_at = datetime.now(timezone.utc)
self.last_advert_ok = ok
self.publish("status", self.status())
return ok
def set_connected(self, ok: bool, *, node_name: str | None = None) -> None:
if ok and not self.connected:
self.connected_since = datetime.now(timezone.utc)
if not ok:
self.connected_since = None
self.connected = ok
if node_name is not None:
self.node_name = node_name
self.publish("status", self.status())
def publish(self, event: str, data: dict) -> None:
"""Thread-safe: callable from any thread; dispatch hops to the loop."""
loop = self._loop
if loop is None:
try:
loop = asyncio.get_running_loop()
except RuntimeError:
return # no loop, nothing to deliver to
self._loop = loop
if loop.is_closed():
return
try:
running = asyncio.get_running_loop()
except RuntimeError:
running = None
if running is loop:
self._dispatch(event, data)
else:
loop.call_soon_threadsafe(self._dispatch, event, data)
def _dispatch(self, event: str, data: dict) -> None:
for q in list(self._subscribers):
try:
q.put_nowait((event, data))
except asyncio.QueueFull:
log.warning("dropping SSE event %s — subscriber backed up", event)
def subscribe(self) -> asyncio.Queue:
q: asyncio.Queue = asyncio.Queue(maxsize=128)
self._subscribers.add(q)
return q
def unsubscribe(self, q: asyncio.Queue) -> None:
self._subscribers.discard(q)
def _iso(dt: datetime | None) -> str | None:
return dt.isoformat() if dt is not None else None
def _list_conversations(db: sqlite3.Connection) -> list[dict]:
rows = db.execute(
"""
SELECT c.public_key,
c.contact_name,
c.updated_at,
(SELECT COUNT(*) FROM messages m WHERE m.public_key = c.public_key) AS message_count,
(SELECT content FROM messages m WHERE m.public_key = c.public_key
ORDER BY m.id DESC LIMIT 1) AS last_message
FROM conversations c
ORDER BY c.updated_at DESC
"""
).fetchall()
return [dict(r) for r in rows]
def _list_contacts(db: sqlite3.Connection) -> list[dict]:
rows = db.execute(
"SELECT public_key, adv_name, seen_at FROM contacts ORDER BY adv_name ASC"
).fetchall()
return [dict(r) for r in rows]
def _get_conversation(db: sqlite3.Connection, public_key: str) -> dict | None:
conv = db.execute(
"SELECT public_key, contact_name, cleared_at_id, created_at, updated_at "
"FROM conversations WHERE public_key = ?",
(public_key,),
).fetchone()
if conv is None:
return None
msgs = db.execute(
"SELECT id, role, content, created_at FROM messages "
"WHERE public_key = ? ORDER BY id ASC",
(public_key,),
).fetchall()
return {"conversation": dict(conv), "messages": [dict(m) for m in msgs]}
async def _index(_req: web.Request) -> web.Response:
return web.Response(text=INDEX_HTML, content_type="text/html")
async def _api_status(req: web.Request) -> web.Response:
return web.json_response(_state(req).status())
async def _api_conversations(req: web.Request) -> web.Response:
return web.json_response(_list_conversations(_state(req).db))
async def _api_contacts(req: web.Request) -> web.Response:
return web.json_response(_list_contacts(_state(req).db))
def _require_same_origin(req: web.Request) -> None:
"""Reject obvious cross-site browser requests on state-changing endpoints.
``Sec-Fetch-Site`` is set by all current browsers on every request. ``same-origin``
is the only value we accept. Non-browser clients (curl, scripts) don't send the
header at all — those pass through, since they're not the CSRF threat model.
"""
site = req.headers.get("Sec-Fetch-Site")
if site is not None and site != "same-origin":
raise web.HTTPForbidden(text=f"cross-origin request rejected (Sec-Fetch-Site: {site})")
async def _api_advertise(req: web.Request) -> web.Response:
_require_same_origin(req)
state = _state(req)
if state._advertise is None:
raise web.HTTPServiceUnavailable(text="device not connected")
ok = await state.trigger_advertise()
return web.json_response({
"ok": ok,
"last_advert_at": _iso(state.last_advert_at),
})
async def _api_conversation(req: web.Request) -> web.Response:
pk = req.match_info["pk"]
data = _get_conversation(_state(req).db, pk)
if data is None:
raise web.HTTPNotFound(text="unknown public_key")
return web.json_response(data)
async def _api_events(req: web.Request) -> web.StreamResponse:
state = _state(req)
resp = web.StreamResponse(
status=200,
headers={
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
"Connection": "keep-alive",
},
)
await resp.prepare(req)
await _sse_send(resp, "status", state.status())
q = state.subscribe()
try:
while True:
try:
event, data = await asyncio.wait_for(q.get(), timeout=15.0)
except asyncio.TimeoutError:
await resp.write(b": keepalive\n\n")
continue
await _sse_send(resp, event, data)
except (asyncio.CancelledError, ConnectionError):
pass
finally:
state.unsubscribe(q)
return resp
async def _sse_send(resp: web.StreamResponse, event: str, data: dict) -> None:
await resp.write(f"event: {event}\ndata: {json.dumps(data)}\n\n".encode())
def _state(req: web.Request) -> AppState:
return req.app["state"]
def build_app(state: AppState) -> web.Application:
app = web.Application()
app["state"] = state
app.router.add_get("/", _index)
app.router.add_get("/api/status", _api_status)
app.router.add_get("/api/conversations", _api_conversations)
app.router.add_get("/api/contacts", _api_contacts)
app.router.add_get("/api/conversations/{pk}", _api_conversation)
app.router.add_get("/api/events", _api_events)
app.router.add_post("/api/advertise", _api_advertise)
return app
async def serve(state: AppState, host: str, port: int) -> None:
"""Run the web server until cancelled."""
state._loop = asyncio.get_running_loop()
app = build_app(state)
runner = web.AppRunner(app, access_log=None)
await runner.setup()
site = web.TCPSite(runner, host, port)
await site.start()
log.info("web UI on http://%s:%d", host, port)
try:
await asyncio.Event().wait()
finally:
await runner.cleanup()
INDEX_HTML = r"""<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>lorabot</title>
<style>
:root {
--bg: #0a0e0a;
--bg-alt: #0e140e;
--fg: #b6f5b6;
--dim: #4a7a4a;
--accent: #7fff7f;
--warn: #ffaa33;
--err: #ff5f5f;
--line: #1a2a1a;
}
* { box-sizing: border-box; }
html, body { height: 100%; margin: 0; }
body {
background: var(--bg);
color: var(--fg);
font: 13px/1.45 ui-monospace, "JetBrains Mono", "Fira Mono", "Cascadia Code", Menlo, Consolas, monospace;
display: grid;
grid-template-rows: auto 1fr auto;
height: 100vh;
overflow: hidden;
}
header, footer {
padding: 6px 14px;
border-bottom: 1px solid var(--line);
display: flex;
justify-content: space-between;
align-items: center;
gap: 12px;
flex-wrap: wrap;
}
footer { border-top: 1px solid var(--line); border-bottom: none; color: var(--dim); font-size: 12px; }
.title { color: var(--accent); letter-spacing: 0.5px; }
.title::before { content: ""; margin-right: 6px; }
.status-line { display: flex; gap: 16px; align-items: center; flex-wrap: wrap; color: var(--dim); }
.status-line .v { color: var(--fg); }
.dot { width: 8px; height: 8px; border-radius: 50%; background: var(--err); display: inline-block; vertical-align: middle; margin-right: 6px; }
.dot.ok { background: var(--accent); box-shadow: 0 0 6px var(--accent); }
main { display: grid; grid-template-columns: 280px 1fr; min-height: 0; }
aside {
border-right: 1px solid var(--line);
overflow-y: auto;
background: var(--bg-alt);
}
.tabs {
display: flex;
border-bottom: 1px dashed var(--line);
}
.tab-btn {
background: transparent;
color: var(--dim);
border: none;
border-right: 1px solid var(--line);
font: inherit;
font-size: 12px;
padding: 7px 12px;
cursor: pointer;
flex: 1;
text-align: left;
letter-spacing: 0.5px;
text-transform: lowercase;
}
.tab-btn:last-child { border-right: none; }
.tab-btn.active { color: var(--accent); }
.tab-btn:hover:not(.active) { color: var(--fg); }
.contact {
padding: 8px 14px;
border-bottom: 1px solid var(--line);
}
.contact .cname { white-space: nowrap; overflow: hidden; text-overflow: ellipsis; }
.contact .cname::before { content: "@ "; color: var(--dim); }
.contact .cdetail { color: var(--dim); font-size: 11px; display: flex; justify-content: space-between; gap: 8px; margin-top: 2px; }
.conv {
padding: 8px 14px;
cursor: pointer;
border-bottom: 1px solid var(--line);
display: flex;
flex-direction: column;
gap: 2px;
}
.conv:hover { background: #102010; }
.conv.active { background: #14241a; color: var(--accent); }
.conv .row { display: flex; justify-content: space-between; align-items: baseline; gap: 8px; }
.conv .name { white-space: nowrap; overflow: hidden; text-overflow: ellipsis; flex: 1; }
.conv .name::before { content: "> "; color: var(--dim); }
.conv.active .name::before { color: var(--accent); }
.conv .count { color: var(--dim); }
.conv .pk { color: var(--dim); font-size: 11px; }
.conv .preview { color: var(--dim); font-size: 11px; white-space: nowrap; overflow: hidden; text-overflow: ellipsis; }
section { display: flex; flex-direction: column; min-height: 0; }
.thread-head {
padding: 8px 16px;
color: var(--dim);
border-bottom: 1px dashed var(--line);
display: flex;
justify-content: space-between;
gap: 12px;
}
.thread-head .who { color: var(--accent); }
.thread { flex: 1; overflow-y: auto; padding: 12px 16px; }
.msg { margin: 10px 0; }
.msg .head { color: var(--dim); font-size: 11px; margin-bottom: 2px; }
.msg .body { white-space: pre-wrap; word-wrap: break-word; }
.msg.user .head .role { color: var(--accent); }
.msg.assistant .head .role { color: var(--warn); }
.empty { color: var(--dim); padding: 24px 14px; text-align: center; }
.divider {
color: var(--dim);
text-align: center;
margin: 14px 0;
letter-spacing: 2px;
font-size: 11px;
}
.divider::before, .divider::after { content: "─── "; }
.divider::after { content: " ───"; }
::-webkit-scrollbar { width: 8px; height: 8px; }
::-webkit-scrollbar-thumb { background: var(--line); }
::-webkit-scrollbar-thumb:hover { background: #2a4a2a; }
::selection { background: var(--accent); color: var(--bg); }
.btn {
background: transparent;
color: var(--accent);
border: 1px solid var(--line);
font: inherit;
padding: 2px 8px;
cursor: pointer;
letter-spacing: 0.5px;
}
.btn:hover:not(:disabled) { background: #14241a; border-color: var(--accent); }
.btn:active:not(:disabled) { background: #1a3020; }
.btn:disabled { color: var(--dim); cursor: not-allowed; }
.btn.busy { color: var(--warn); border-color: var(--warn); }
.fail { color: var(--err); }
.cursor::after { content: ""; color: var(--accent); animation: blink 1s steps(2) infinite; margin-left: 2px; }
@keyframes blink { 50% { opacity: 0; } }
@media (max-width: 720px) {
main { grid-template-columns: 1fr; grid-template-rows: 200px 1fr; }
aside { border-right: none; border-bottom: 1px solid var(--line); }
}
</style>
</head>
<body>
<header>
<span class="title">lorabot</span>
<span class="status-line">
<span><span class="dot" id="dot"></span><span id="conn">…</span></span>
<span>port: <span class="v" id="port">—</span></span>
<span>model: <span class="v" id="model">—</span></span>
<span>node: <span class="v" id="node">—</span></span>
<span>advert: <span class="v" id="advert">—</span></span>
<button id="advert-btn" class="btn" type="button" disabled>[ advertise ]</button>
</span>
</header>
<main>
<aside>
<div class="tabs">
<button class="tab-btn active" id="tab-convs" type="button">conversations</button>
<button class="tab-btn" id="tab-contacts" type="button">contacts (<span id="contact-count">—</span>)</button>
</div>
<div id="sidebar"><div class="empty">none yet<span class="cursor"></span></div></div>
</aside>
<section>
<div class="thread-head"><span id="who">— no conversation selected —</span><span id="pk"></span></div>
<div id="thread" class="thread"><div class="empty">select a conversation on the left</div></div>
</section>
</main>
<footer>
<span id="footinfo">stream: connecting<span class="cursor"></span></span>
<span id="uptime"></span>
</footer>
<script>
"use strict";
const $ = (id) => document.getElementById(id);
let conversations = [];
let contacts = [];
let selectedKey = null;
let connectedSince = null;
let activeTab = "convs";
function escapeHTML(s) {
return (s == null ? "" : String(s)).replace(/[&<>"']/g, (c) => ({
"&": "&amp;", "<": "&lt;", ">": "&gt;", '"': "&quot;", "'": "&#39;"
})[c]);
}
function parseTs(iso) {
if (!iso) return null;
// SQLite "YYYY-MM-DD HH:MM:SS" is UTC; ISO with offset is also fine.
const s = iso.includes("T") ? iso : iso.replace(" ", "T") + "Z";
const d = new Date(s);
return isNaN(d.getTime()) ? null : d;
}
function fmtTime(iso) {
const d = parseTs(iso);
return d ? d.toLocaleString(undefined, { hour12: false }) : "";
}
function fmtUptime(since) {
const d = parseTs(since);
if (!d) return "";
let s = Math.max(0, Math.floor((Date.now() - d.getTime()) / 1000));
const days = Math.floor(s / 86400); s -= days * 86400;
const hrs = Math.floor(s / 3600); s -= hrs * 3600;
const mins = Math.floor(s / 60); s -= mins * 60;
const pad = (n) => String(n).padStart(2, "0");
return (days ? days + "d " : "") + `${pad(hrs)}:${pad(mins)}:${pad(s)}`;
}
async function fetchJSON(url) {
const r = await fetch(url);
if (!r.ok) throw new Error(`${url}: ${r.status}`);
return r.json();
}
function renderSidebar() {
const el = $("sidebar");
if (!conversations.length) {
el.innerHTML = '<div class="empty">none yet</div>';
return;
}
el.innerHTML = conversations.map((c) => `
<div class="conv${c.public_key === selectedKey ? " active" : ""}" data-pk="${escapeHTML(c.public_key)}">
<div class="row">
<span class="name">${escapeHTML(c.contact_name || "?")}</span>
<span class="count">[${c.message_count}]</span>
</div>
<div class="pk">${escapeHTML(c.public_key.slice(0, 16))}…</div>
<div class="preview">${escapeHTML((c.last_message || "").slice(0, 80))}</div>
</div>`).join("");
el.querySelectorAll(".conv").forEach((n) => {
n.addEventListener("click", () => select(n.dataset.pk));
});
}
function setHead(conv) {
if (!conv) {
$("who").textContent = "— no conversation selected —";
$("pk").textContent = "";
return;
}
$("who").innerHTML = `<span class="who">${escapeHTML(conv.contact_name || "?")}</span>`;
$("pk").textContent = conv.public_key;
}
function renderThread(messages, clearedAt) {
const el = $("thread");
if (!messages.length) {
el.innerHTML = '<div class="empty">empty conversation</div>';
return;
}
const parts = [];
let dividerEmitted = false;
for (const m of messages) {
if (!dividerEmitted && clearedAt && m.id > clearedAt) {
parts.push('<div class="divider">history cleared</div>');
dividerEmitted = true;
}
parts.push(renderMsg(m));
}
el.innerHTML = parts.join("");
el.scrollTop = el.scrollHeight;
}
function renderMsg(m) {
const arrow = m.role === "user" ? "&lt;" : "&gt;";
return `
<div class="msg ${m.role}">
<div class="head"><span class="role">${arrow} ${m.role}</span> · ${escapeHTML(fmtTime(m.created_at))}</div>
<div class="body">${escapeHTML(m.content)}</div>
</div>`;
}
let clearedAtByKey = {};
async function select(pk) {
selectedKey = pk;
renderSidebar();
try {
const data = await fetchJSON(`/api/conversations/${encodeURIComponent(pk)}`);
setHead(data.conversation);
clearedAtByKey[pk] = data.conversation.cleared_at_id || 0;
renderThread(data.messages, clearedAtByKey[pk]);
} catch (e) {
$("thread").innerHTML = `<div class="empty">load failed: ${escapeHTML(e.message)}</div>`;
}
}
function appendMessage(m) {
const el = $("thread");
const empty = el.querySelector(".empty");
if (empty) el.innerHTML = "";
const wasAtBottom = el.scrollTop + el.clientHeight >= el.scrollHeight - 30;
el.insertAdjacentHTML("beforeend", renderMsg(m));
if (wasAtBottom) el.scrollTop = el.scrollHeight;
}
function setStatus(s) {
const ok = !!s.connected;
$("dot").className = "dot" + (ok ? " ok" : "");
$("conn").textContent = ok ? "connected" : "disconnected";
$("port").textContent = s.serial_port || "";
$("model").textContent = s.model || "";
$("node").textContent = s.node_name || "";
const ad = $("advert");
if (s.last_advert_at) {
ad.textContent = fmtTime(s.last_advert_at) + (s.last_advert_ok === false ? " (failed)" : "");
ad.className = "v" + (s.last_advert_ok === false ? " fail" : "");
} else {
ad.textContent = "";
ad.className = "v";
}
const btn = $("advert-btn");
btn.disabled = !s.advertise_available;
connectedSince = s.connected_since;
if (s.contact_count != null) $("contact-count").textContent = s.contact_count;
tickUptime();
}
async function sendAdvert() {
const btn = $("advert-btn");
if (btn.disabled) return;
btn.disabled = true;
btn.classList.add("busy");
btn.textContent = "[ advertising… ]";
try {
const r = await fetch("/api/advertise", { method: "POST" });
if (!r.ok) throw new Error("HTTP " + r.status);
} catch (e) {
$("footinfo").textContent = "advert failed: " + e.message;
} finally {
btn.classList.remove("busy");
btn.textContent = "[ advertise ]";
// re-enable on next status event; if SSE is dead, give it back manually
setTimeout(() => { btn.disabled = false; }, 1500);
}
}
function tickUptime() {
$("uptime").textContent = connectedSince ? "uptime " + fmtUptime(connectedSince) : "";
}
setInterval(tickUptime, 1000);
function renderContacts() {
const el = $("sidebar");
if (!contacts.length) {
el.innerHTML = '<div class="empty">no contacts yet</div>';
return;
}
el.innerHTML = contacts.map((c) => `
<div class="contact">
<div class="cname">${escapeHTML(c.adv_name || "?")}</div>
<div class="cdetail">
<span>${escapeHTML(c.public_key.slice(0, 16))}…</span>
<span>${escapeHTML(fmtTime(c.seen_at))}</span>
</div>
</div>`).join("");
}
function setTab(tab) {
activeTab = tab;
$("tab-convs").classList.toggle("active", tab === "convs");
$("tab-contacts").classList.toggle("active", tab === "contacts");
if (tab === "convs") renderSidebar();
else refreshContacts();
}
async function refreshContacts() {
try {
contacts = await fetchJSON("/api/contacts");
$("contact-count").textContent = contacts.length;
} catch (e) { /* silently ignore */ }
if (activeTab === "contacts") renderContacts();
}
async function refreshList() {
conversations = await fetchJSON("/api/conversations");
renderSidebar();
}
function bumpConv(m) {
let conv = conversations.find((c) => c.public_key === m.public_key);
if (!conv) {
conv = {
public_key: m.public_key,
contact_name: m.contact_name || "?",
message_count: 0,
updated_at: m.created_at,
last_message: m.content,
};
conversations.unshift(conv);
} else {
conv.message_count += 1;
conv.contact_name = m.contact_name || conv.contact_name;
conv.last_message = m.content;
conv.updated_at = m.created_at;
conversations = [conv, ...conversations.filter((c) => c !== conv)];
}
renderSidebar();
}
function startStream() {
const es = new EventSource("/api/events");
es.addEventListener("open", () => { $("footinfo").textContent = "stream: live"; });
es.addEventListener("error", () => { $("footinfo").textContent = "stream: reconnecting…"; });
es.addEventListener("status", (e) => setStatus(JSON.parse(e.data)));
es.addEventListener("message", (e) => {
const m = JSON.parse(e.data);
bumpConv(m);
if (m.public_key === selectedKey) appendMessage(m);
});
return es;
}
(async function init() {
$("advert-btn").addEventListener("click", sendAdvert);
$("tab-convs").addEventListener("click", () => setTab("convs"));
$("tab-contacts").addEventListener("click", () => setTab("contacts"));
try {
setStatus(await fetchJSON("/api/status"));
await refreshList();
} catch (e) {
$("footinfo").textContent = "init failed: " + e.message;
}
startStream();
})();
</script>
</body>
</html>
"""
-3
View File
@@ -1,3 +0,0 @@
"""meshbot — MeshCore ↔ LLM bridge."""
__version__ = "0.1.0"
-23
View File
@@ -1,23 +0,0 @@
"""Entry point: ``python -m meshbot`` and the ``meshbot`` console script."""
from __future__ import annotations
import asyncio
import logging
from .bot import run
def _cli() -> None:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)-7s %(name)s: %(message)s",
)
try:
asyncio.run(run())
except KeyboardInterrupt:
pass
if __name__ == "__main__":
_cli()
-86
View File
@@ -1,86 +0,0 @@
"""Main run loop: connect to the MeshCore device, route DMs through the LLM, reply."""
from __future__ import annotations
import asyncio
import logging
from collections import defaultdict
from meshcore import EventType, MeshCore
from . import db
from .config import Settings
from .llm import LLMClient
from .messages import trim_to_bytes
log = logging.getLogger("meshbot")
async def run() -> None:
cfg = Settings()
db_conn = db.connect(cfg.storage.sqlite_path)
llm = LLMClient(
base_url=cfg.llm.base_url,
api_key=cfg.llm.api_key,
model=cfg.llm.model,
system_prompt=cfg.llm.system_prompt,
temperature=cfg.llm.temperature,
timeout=cfg.llm.request_timeout_seconds,
)
log.info("connecting to MeshCore on %s @ %d baud", cfg.meshcore.serial_port, cfg.meshcore.baud_rate)
mc = await MeshCore.create_serial(cfg.meshcore.serial_port, cfg.meshcore.baud_rate)
await mc.ensure_contacts()
# One lock per sender so a burst of messages from the same peer is processed
# serially while different peers stay independent.
locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
async def on_dm(event) -> None:
data = event.payload or {}
prefix = data.get("pubkey_prefix")
text = (data.get("text") or "").strip()
if not prefix or not text:
return
contact = mc.get_contact_by_key_prefix(prefix)
if contact is None:
log.info("ignoring DM from unknown sender %s", prefix)
return
public_key = contact["public_key"]
contact_name = contact.get("adv_name", "")
log.info("DM from %s (%s): %s", contact_name, public_key[:12], text)
async with locks[public_key]:
db.upsert_conversation(db_conn, public_key, contact_name)
db.add_message(db_conn, public_key, "user", text)
history = db.get_history(db_conn, public_key)
try:
reply = await llm.reply(history)
except Exception:
log.exception("LLM call failed for %s", public_key[:12])
return
db.add_message(db_conn, public_key, "assistant", reply)
outgoing = trim_to_bytes(reply, cfg.message.max_bytes)
log.info("reply to %s (%d bytes): %s", public_key[:12], len(outgoing.encode("utf-8")), outgoing)
result = await mc.commands.send_msg(contact, outgoing)
if result.type == EventType.ERROR:
log.error("send_msg failed for %s: %s", public_key[:12], result.payload)
sub = mc.subscribe(EventType.CONTACT_MSG_RECV, on_dm)
await mc.start_auto_message_fetching()
log.info("meshbot listening on %s", cfg.meshcore.serial_port)
try:
await asyncio.Event().wait()
finally:
mc.unsubscribe(sub)
await mc.stop_auto_message_fetching()
await mc.disconnect()
await llm.aclose()
db_conn.close()
-74
View File
@@ -1,74 +0,0 @@
"""Application settings loaded from TOML with env-var overrides."""
from __future__ import annotations
import os
from pathlib import Path
from pydantic import BaseModel, Field
from pydantic_settings import (
BaseSettings,
PydanticBaseSettingsSource,
SettingsConfigDict,
TomlConfigSettingsSource,
)
class MeshCoreCfg(BaseModel):
serial_port: str
baud_rate: int = 115200
class LLMCfg(BaseModel):
base_url: str
api_key: str = "not-needed"
model: str
system_prompt: str = (
"You are a concise assistant on a low-bandwidth mesh radio. "
"Replies must be brief — under 180 bytes."
)
temperature: float = 0.7
request_timeout_seconds: float = 60.0
class StorageCfg(BaseModel):
sqlite_path: Path = Path("data/meshbot.db")
class MessageCfg(BaseModel):
max_bytes: int = Field(default=184, gt=0)
def _toml_path() -> Path:
return Path(os.environ.get("MESHBOT_CONFIG", "config.toml"))
class Settings(BaseSettings):
meshcore: MeshCoreCfg
llm: LLMCfg
storage: StorageCfg = StorageCfg()
message: MessageCfg = MessageCfg()
model_config = SettingsConfigDict(
env_prefix="MESHBOT_",
env_nested_delimiter="__",
toml_file=_toml_path(),
extra="ignore",
)
@classmethod
def settings_customise_sources(
cls,
settings_cls: type[BaseSettings],
init_settings: PydanticBaseSettingsSource,
env_settings: PydanticBaseSettingsSource,
dotenv_settings: PydanticBaseSettingsSource,
file_secret_settings: PydanticBaseSettingsSource,
) -> tuple[PydanticBaseSettingsSource, ...]:
# Order = priority (highest first): init args > env > TOML > secrets.
return (
init_settings,
env_settings,
TomlConfigSettingsSource(settings_cls),
file_secret_settings,
)
-67
View File
@@ -1,67 +0,0 @@
"""SQLite persistence for per-sender conversation history."""
from __future__ import annotations
import sqlite3
from pathlib import Path
SCHEMA = """
CREATE TABLE IF NOT EXISTS conversations (
public_key TEXT PRIMARY KEY,
contact_name TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
public_key TEXT NOT NULL REFERENCES conversations(public_key),
role TEXT NOT NULL CHECK (role IN ('user', 'assistant')),
content TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_messages_pubkey_id
ON messages(public_key, id);
"""
def connect(path: str | Path) -> sqlite3.Connection:
"""Open the SQLite DB, ensure the parent dir and schema exist, return the connection."""
db_path = Path(path)
db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(db_path, isolation_level=None) # autocommit
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode = WAL;")
conn.execute("PRAGMA foreign_keys = ON;")
conn.executescript(SCHEMA)
return conn
def upsert_conversation(conn: sqlite3.Connection, public_key: str, contact_name: str) -> None:
conn.execute(
"""
INSERT INTO conversations (public_key, contact_name)
VALUES (?, ?)
ON CONFLICT(public_key) DO UPDATE SET
contact_name = excluded.contact_name,
updated_at = datetime('now')
""",
(public_key, contact_name),
)
def add_message(conn: sqlite3.Connection, public_key: str, role: str, content: str) -> None:
conn.execute(
"INSERT INTO messages (public_key, role, content) VALUES (?, ?, ?)",
(public_key, role, content),
)
def get_history(conn: sqlite3.Connection, public_key: str) -> list[dict[str, str]]:
"""Return the conversation as OpenAI chat messages, oldest first."""
rows = conn.execute(
"SELECT role, content FROM messages WHERE public_key = ? ORDER BY id ASC",
(public_key,),
).fetchall()
return [{"role": row["role"], "content": row["content"]} for row in rows]
-38
View File
@@ -1,38 +0,0 @@
"""Thin wrapper around the OpenAI Python SDK aimed at OpenAI-compatible servers."""
from __future__ import annotations
from openai import AsyncOpenAI
class LLMClient:
def __init__(
self,
*,
base_url: str,
api_key: str,
model: str,
system_prompt: str,
temperature: float,
timeout: float,
) -> None:
self._client = AsyncOpenAI(base_url=base_url, api_key=api_key, timeout=timeout)
self._model = model
self._system_prompt = system_prompt
self._temperature = temperature
async def reply(self, history: list[dict[str, str]]) -> str:
"""Send the system prompt + ``history`` and return the assistant's text."""
messages: list[dict[str, str]] = [
{"role": "system", "content": self._system_prompt},
*history,
]
resp = await self._client.chat.completions.create(
model=self._model,
messages=messages,
temperature=self._temperature,
)
return (resp.choices[0].message.content or "").strip()
async def aclose(self) -> None:
await self._client.close()
-21
View File
@@ -1,21 +0,0 @@
"""Helpers for shaping outgoing mesh messages."""
from __future__ import annotations
def trim_to_bytes(text: str, max_bytes: int) -> str:
"""Return ``text`` truncated so its UTF-8 encoding is at most ``max_bytes`` bytes.
Backs off if the cut lands inside a multi-byte UTF-8 sequence so we never emit
invalid UTF-8 to the radio.
"""
if max_bytes <= 0:
return ""
encoded = text.encode("utf-8")
if len(encoded) <= max_bytes:
return text
cut = encoded[:max_bytes]
# Continuation bytes start with bits 10xxxxxx; rewind past them.
while cut and (cut[-1] & 0xC0) == 0x80:
cut = cut[:-1]
return cut.decode("utf-8", errors="ignore")
+59 -1
View File
@@ -1,4 +1,4 @@
from meshbot.messages import trim_to_bytes
from lorabot.messages import split_to_bytes, trim_to_bytes
def test_short_ascii_passthrough():
@@ -38,3 +38,61 @@ def test_zero_or_negative_max_bytes():
def test_empty_input():
assert trim_to_bytes("", 184) == ""
# split_to_bytes
def test_split_short_input_single_chunk():
assert split_to_bytes("hello", 184) == ["hello"]
def test_split_long_input_two_chunks_drops_rest():
s = "x" * 500
chunks = split_to_bytes(s, 180, max_chunks=2)
assert chunks == ["x" * 180, "x" * 180]
assert sum(len(c.encode("utf-8")) for c in chunks) == 360
def test_split_exact_two_chunks_no_third():
s = "x" * 360
chunks = split_to_bytes(s, 180, max_chunks=2)
assert chunks == ["x" * 180, "x" * 180]
def test_split_does_not_break_multibyte():
# 4 emoji × 4 bytes = 16 bytes total. Budget 5 bytes/chunk → 1 emoji per chunk.
chunks = split_to_bytes("🎉🎉🎉🎉", 5, max_chunks=2)
assert chunks == ["🎉", "🎉"]
for c in chunks:
assert len(c.encode("utf-8")) == 4
def test_split_two_byte_char_at_boundary():
# "abäcd" → bytes: a b ä(2) c d = 6 bytes. Budget 3/chunk:
# chunk1 must end at "ab" (3rd byte is start of ä, can't include without continuation).
# chunk2: "äc" = 3 bytes.
chunks = split_to_bytes("abäcd", 3, max_chunks=2)
assert chunks[0] == "ab"
assert chunks[1] == "äc"
# "d" is dropped (over the budget).
def test_split_empty_input():
assert split_to_bytes("", 184) == []
def test_split_zero_max_bytes():
assert split_to_bytes("hi", 0) == []
def test_split_zero_chunks():
assert split_to_bytes("hi", 184, max_chunks=0) == []
def test_split_concat_is_prefix_of_input():
# The delivered text must always be a prefix of the original (no rearrangement).
src = "Hello world! 🎉 This is a longer message that should be split."
chunks = split_to_bytes(src, 20, max_chunks=2)
delivered = "".join(chunks)
assert src.startswith(delivered)