Initial commit
This commit is contained in:
@@ -0,0 +1,3 @@
|
||||
"""meshbot — MeshCore ↔ LLM bridge."""
|
||||
|
||||
__version__ = "0.1.0"
|
||||
@@ -0,0 +1,23 @@
|
||||
"""Entry point: ``python -m meshbot`` and the ``meshbot`` console script."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from .bot import run
|
||||
|
||||
|
||||
def _cli() -> None:
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s %(levelname)-7s %(name)s: %(message)s",
|
||||
)
|
||||
try:
|
||||
asyncio.run(run())
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
_cli()
|
||||
@@ -0,0 +1,86 @@
|
||||
"""Main run loop: connect to the MeshCore device, route DMs through the LLM, reply."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from collections import defaultdict
|
||||
|
||||
from meshcore import EventType, MeshCore
|
||||
|
||||
from . import db
|
||||
from .config import Settings
|
||||
from .llm import LLMClient
|
||||
from .messages import trim_to_bytes
|
||||
|
||||
log = logging.getLogger("meshbot")
|
||||
|
||||
|
||||
async def run() -> None:
|
||||
cfg = Settings()
|
||||
|
||||
db_conn = db.connect(cfg.storage.sqlite_path)
|
||||
llm = LLMClient(
|
||||
base_url=cfg.llm.base_url,
|
||||
api_key=cfg.llm.api_key,
|
||||
model=cfg.llm.model,
|
||||
system_prompt=cfg.llm.system_prompt,
|
||||
temperature=cfg.llm.temperature,
|
||||
timeout=cfg.llm.request_timeout_seconds,
|
||||
)
|
||||
|
||||
log.info("connecting to MeshCore on %s @ %d baud", cfg.meshcore.serial_port, cfg.meshcore.baud_rate)
|
||||
mc = await MeshCore.create_serial(cfg.meshcore.serial_port, cfg.meshcore.baud_rate)
|
||||
await mc.ensure_contacts()
|
||||
|
||||
# One lock per sender so a burst of messages from the same peer is processed
|
||||
# serially while different peers stay independent.
|
||||
locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
|
||||
|
||||
async def on_dm(event) -> None:
|
||||
data = event.payload or {}
|
||||
prefix = data.get("pubkey_prefix")
|
||||
text = (data.get("text") or "").strip()
|
||||
if not prefix or not text:
|
||||
return
|
||||
|
||||
contact = mc.get_contact_by_key_prefix(prefix)
|
||||
if contact is None:
|
||||
log.info("ignoring DM from unknown sender %s", prefix)
|
||||
return
|
||||
|
||||
public_key = contact["public_key"]
|
||||
contact_name = contact.get("adv_name", "")
|
||||
log.info("DM from %s (%s): %s", contact_name, public_key[:12], text)
|
||||
|
||||
async with locks[public_key]:
|
||||
db.upsert_conversation(db_conn, public_key, contact_name)
|
||||
db.add_message(db_conn, public_key, "user", text)
|
||||
history = db.get_history(db_conn, public_key)
|
||||
|
||||
try:
|
||||
reply = await llm.reply(history)
|
||||
except Exception:
|
||||
log.exception("LLM call failed for %s", public_key[:12])
|
||||
return
|
||||
|
||||
db.add_message(db_conn, public_key, "assistant", reply)
|
||||
outgoing = trim_to_bytes(reply, cfg.message.max_bytes)
|
||||
log.info("reply to %s (%d bytes): %s", public_key[:12], len(outgoing.encode("utf-8")), outgoing)
|
||||
|
||||
result = await mc.commands.send_msg(contact, outgoing)
|
||||
if result.type == EventType.ERROR:
|
||||
log.error("send_msg failed for %s: %s", public_key[:12], result.payload)
|
||||
|
||||
sub = mc.subscribe(EventType.CONTACT_MSG_RECV, on_dm)
|
||||
await mc.start_auto_message_fetching()
|
||||
log.info("meshbot listening on %s", cfg.meshcore.serial_port)
|
||||
|
||||
try:
|
||||
await asyncio.Event().wait()
|
||||
finally:
|
||||
mc.unsubscribe(sub)
|
||||
await mc.stop_auto_message_fetching()
|
||||
await mc.disconnect()
|
||||
await llm.aclose()
|
||||
db_conn.close()
|
||||
@@ -0,0 +1,74 @@
|
||||
"""Application settings loaded from TOML with env-var overrides."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
from pydantic_settings import (
|
||||
BaseSettings,
|
||||
PydanticBaseSettingsSource,
|
||||
SettingsConfigDict,
|
||||
TomlConfigSettingsSource,
|
||||
)
|
||||
|
||||
|
||||
class MeshCoreCfg(BaseModel):
|
||||
serial_port: str
|
||||
baud_rate: int = 115200
|
||||
|
||||
|
||||
class LLMCfg(BaseModel):
|
||||
base_url: str
|
||||
api_key: str = "not-needed"
|
||||
model: str
|
||||
system_prompt: str = (
|
||||
"You are a concise assistant on a low-bandwidth mesh radio. "
|
||||
"Replies must be brief — under 180 bytes."
|
||||
)
|
||||
temperature: float = 0.7
|
||||
request_timeout_seconds: float = 60.0
|
||||
|
||||
|
||||
class StorageCfg(BaseModel):
|
||||
sqlite_path: Path = Path("data/meshbot.db")
|
||||
|
||||
|
||||
class MessageCfg(BaseModel):
|
||||
max_bytes: int = Field(default=184, gt=0)
|
||||
|
||||
|
||||
def _toml_path() -> Path:
|
||||
return Path(os.environ.get("MESHBOT_CONFIG", "config.toml"))
|
||||
|
||||
|
||||
class Settings(BaseSettings):
|
||||
meshcore: MeshCoreCfg
|
||||
llm: LLMCfg
|
||||
storage: StorageCfg = StorageCfg()
|
||||
message: MessageCfg = MessageCfg()
|
||||
|
||||
model_config = SettingsConfigDict(
|
||||
env_prefix="MESHBOT_",
|
||||
env_nested_delimiter="__",
|
||||
toml_file=_toml_path(),
|
||||
extra="ignore",
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def settings_customise_sources(
|
||||
cls,
|
||||
settings_cls: type[BaseSettings],
|
||||
init_settings: PydanticBaseSettingsSource,
|
||||
env_settings: PydanticBaseSettingsSource,
|
||||
dotenv_settings: PydanticBaseSettingsSource,
|
||||
file_secret_settings: PydanticBaseSettingsSource,
|
||||
) -> tuple[PydanticBaseSettingsSource, ...]:
|
||||
# Order = priority (highest first): init args > env > TOML > secrets.
|
||||
return (
|
||||
init_settings,
|
||||
env_settings,
|
||||
TomlConfigSettingsSource(settings_cls),
|
||||
file_secret_settings,
|
||||
)
|
||||
@@ -0,0 +1,67 @@
|
||||
"""SQLite persistence for per-sender conversation history."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
|
||||
SCHEMA = """
|
||||
CREATE TABLE IF NOT EXISTS conversations (
|
||||
public_key TEXT PRIMARY KEY,
|
||||
contact_name TEXT,
|
||||
created_at TEXT NOT NULL DEFAULT (datetime('now')),
|
||||
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS messages (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
public_key TEXT NOT NULL REFERENCES conversations(public_key),
|
||||
role TEXT NOT NULL CHECK (role IN ('user', 'assistant')),
|
||||
content TEXT NOT NULL,
|
||||
created_at TEXT NOT NULL DEFAULT (datetime('now'))
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_pubkey_id
|
||||
ON messages(public_key, id);
|
||||
"""
|
||||
|
||||
|
||||
def connect(path: str | Path) -> sqlite3.Connection:
|
||||
"""Open the SQLite DB, ensure the parent dir and schema exist, return the connection."""
|
||||
db_path = Path(path)
|
||||
db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
conn = sqlite3.connect(db_path, isolation_level=None) # autocommit
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.execute("PRAGMA journal_mode = WAL;")
|
||||
conn.execute("PRAGMA foreign_keys = ON;")
|
||||
conn.executescript(SCHEMA)
|
||||
return conn
|
||||
|
||||
|
||||
def upsert_conversation(conn: sqlite3.Connection, public_key: str, contact_name: str) -> None:
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO conversations (public_key, contact_name)
|
||||
VALUES (?, ?)
|
||||
ON CONFLICT(public_key) DO UPDATE SET
|
||||
contact_name = excluded.contact_name,
|
||||
updated_at = datetime('now')
|
||||
""",
|
||||
(public_key, contact_name),
|
||||
)
|
||||
|
||||
|
||||
def add_message(conn: sqlite3.Connection, public_key: str, role: str, content: str) -> None:
|
||||
conn.execute(
|
||||
"INSERT INTO messages (public_key, role, content) VALUES (?, ?, ?)",
|
||||
(public_key, role, content),
|
||||
)
|
||||
|
||||
|
||||
def get_history(conn: sqlite3.Connection, public_key: str) -> list[dict[str, str]]:
|
||||
"""Return the conversation as OpenAI chat messages, oldest first."""
|
||||
rows = conn.execute(
|
||||
"SELECT role, content FROM messages WHERE public_key = ? ORDER BY id ASC",
|
||||
(public_key,),
|
||||
).fetchall()
|
||||
return [{"role": row["role"], "content": row["content"]} for row in rows]
|
||||
@@ -0,0 +1,38 @@
|
||||
"""Thin wrapper around the OpenAI Python SDK aimed at OpenAI-compatible servers."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from openai import AsyncOpenAI
|
||||
|
||||
|
||||
class LLMClient:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
base_url: str,
|
||||
api_key: str,
|
||||
model: str,
|
||||
system_prompt: str,
|
||||
temperature: float,
|
||||
timeout: float,
|
||||
) -> None:
|
||||
self._client = AsyncOpenAI(base_url=base_url, api_key=api_key, timeout=timeout)
|
||||
self._model = model
|
||||
self._system_prompt = system_prompt
|
||||
self._temperature = temperature
|
||||
|
||||
async def reply(self, history: list[dict[str, str]]) -> str:
|
||||
"""Send the system prompt + ``history`` and return the assistant's text."""
|
||||
messages: list[dict[str, str]] = [
|
||||
{"role": "system", "content": self._system_prompt},
|
||||
*history,
|
||||
]
|
||||
resp = await self._client.chat.completions.create(
|
||||
model=self._model,
|
||||
messages=messages,
|
||||
temperature=self._temperature,
|
||||
)
|
||||
return (resp.choices[0].message.content or "").strip()
|
||||
|
||||
async def aclose(self) -> None:
|
||||
await self._client.close()
|
||||
@@ -0,0 +1,21 @@
|
||||
"""Helpers for shaping outgoing mesh messages."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
def trim_to_bytes(text: str, max_bytes: int) -> str:
|
||||
"""Return ``text`` truncated so its UTF-8 encoding is at most ``max_bytes`` bytes.
|
||||
|
||||
Backs off if the cut lands inside a multi-byte UTF-8 sequence so we never emit
|
||||
invalid UTF-8 to the radio.
|
||||
"""
|
||||
if max_bytes <= 0:
|
||||
return ""
|
||||
encoded = text.encode("utf-8")
|
||||
if len(encoded) <= max_bytes:
|
||||
return text
|
||||
cut = encoded[:max_bytes]
|
||||
# Continuation bytes start with bits 10xxxxxx; rewind past them.
|
||||
while cut and (cut[-1] & 0xC0) == 0x80:
|
||||
cut = cut[:-1]
|
||||
return cut.decode("utf-8", errors="ignore")
|
||||
Reference in New Issue
Block a user