diff --git a/config.example.toml b/config.example.toml index c8051b7..5430566 100644 --- a/config.example.toml +++ b/config.example.toml @@ -28,9 +28,12 @@ sqlite_path = "data/lorabot.db" # MeshCore MAX_PACKET_PAYLOAD is 184 bytes. Lower this if your text-frame # headers further constrain the usable payload on your device. max_bytes = 184 -# Seconds to wait for an ACK before treating a chunk as failed. -ack_timeout_seconds = 30 -# How many times to retry a chunk after failure (0 = no retries). +# Per-attempt ACK wait. 0 = trust the device's path-aware suggestion (recommended). +# Set a positive value only if you need to override that suggestion. +ack_timeout_seconds = 0 +# How many times to retry a chunk after failure (0 = no retries). Total attempts +# = send_retries + 1. With send_retries >= 2 the third attempt onwards is sent as +# a flood broadcast (multi-hop) instead of direct. send_retries = 1 [web] @@ -47,7 +50,7 @@ enabled = true interval_seconds = 3600 at_startup = true # Flood = multi-hop advert across the mesh. False = zero-hop (neighbors only). -flood = false +flood = true # LLM tool calling. The weather tool (Open-Meteo, no key) is always on. Tools # in this section are optional and only registered when configured. Requires a diff --git a/src/lorabot/bot.py b/src/lorabot/bot.py index d4f0d9b..b5c1a35 100644 --- a/src/lorabot/bot.py +++ b/src/lorabot/bot.py @@ -56,10 +56,17 @@ async def run(cfg: Settings | None = None) -> None: mc = await MeshCore.create_serial(cfg.meshcore.serial_port, cfg.meshcore.baud_rate) # Default is False: the lib only marks contacts dirty on ADVERTISEMENT and # waits for the caller to re-pull. Turn it on so a fresh advert is reflected - # in the local cache before the peer's first DM lands. - mc.auto_update_contacts = True + # in the local cache before the peer's first DM lands. Not needed anymore since we're tracking + # contacts ourselves now. + # mc.auto_update_contacts = True await mc.ensure_contacts() - transport = MeshTransport(mc, ack_timeout=cfg.message.ack_timeout_seconds, send_retries=cfg.message.send_retries) + transport = MeshTransport( + mc, + db_conn, + ack_timeout=cfg.message.ack_timeout_seconds, + send_retries=cfg.message.send_retries, + ) + transport.sync_contacts() except BaseException: state.set_connected(False) if web_task is not None: @@ -102,6 +109,7 @@ async def run(cfg: Settings | None = None) -> None: ) sub = mc.subscribe(EventType.CONTACT_MSG_RECV, on_dm) + sub_contacts = mc.subscribe(EventType.NEW_CONTACT, transport.on_new_contact) await mc.start_auto_message_fetching() log.info("lorabot listening on %s", cfg.meshcore.serial_port) @@ -111,6 +119,7 @@ async def run(cfg: Settings | None = None) -> None: state.set_connected(False) state.set_advertise_callback(None) mc.unsubscribe(sub) + mc.unsubscribe(sub_contacts) await mc.stop_auto_message_fetching() await mc.disconnect() await llm.aclose() diff --git a/src/lorabot/config.py b/src/lorabot/config.py index a96051b..e510baf 100644 --- a/src/lorabot/config.py +++ b/src/lorabot/config.py @@ -37,7 +37,13 @@ class StorageCfg(BaseModel): class MessageCfg(BaseModel): max_bytes: int = Field(default=184, gt=0) - ack_timeout_seconds: float = Field(default=30.0, gt=0) + # 0 = use the device's path-aware suggested timeout (recommended). Set a + # positive value to override per attempt — useful only when the device's + # suggestion is consistently wrong for your link conditions. + ack_timeout_seconds: float = Field(default=0.0, ge=0) + # Number of retries after the first attempt. Total attempts = send_retries + 1. + # With send_retries >= 2 the third attempt onwards is sent as a flood broadcast + # (multi-hop) instead of direct — see meshcore's send_msg_with_retry. send_retries: int = Field(default=1, ge=0) diff --git a/src/lorabot/db.py b/src/lorabot/db.py index 45c32ce..598cf79 100644 --- a/src/lorabot/db.py +++ b/src/lorabot/db.py @@ -1,11 +1,19 @@ -"""SQLite persistence for per-sender conversation history.""" +"""SQLite persistence for conversation history and contacts.""" from __future__ import annotations +import json import sqlite3 from pathlib import Path SCHEMA = """ +CREATE TABLE IF NOT EXISTS contacts ( + public_key TEXT PRIMARY KEY, + adv_name TEXT, + data TEXT NOT NULL, + seen_at TEXT NOT NULL DEFAULT (datetime('now')) +); + CREATE TABLE IF NOT EXISTS conversations ( public_key TEXT PRIMARY KEY, contact_name TEXT, @@ -126,6 +134,28 @@ def set_thinking_enabled(conn: sqlite3.Connection, public_key: str, enabled: boo ) +def upsert_contact(conn: sqlite3.Connection, contact: dict) -> None: + conn.execute( + """ + INSERT INTO contacts (public_key, adv_name, data) + VALUES (?, ?, ?) + ON CONFLICT(public_key) DO UPDATE SET + adv_name = excluded.adv_name, + data = excluded.data, + seen_at = datetime('now') + """, + (contact["public_key"], contact.get("adv_name"), json.dumps(contact)), + ) + + +def get_contact_by_key_prefix(conn: sqlite3.Connection, prefix: str) -> dict | None: + row = conn.execute( + "SELECT data FROM contacts WHERE public_key LIKE ? || '%'", + (prefix,), + ).fetchone() + return json.loads(row["data"]) if row is not None else None + + def clear_history(conn: sqlite3.Connection, public_key: str) -> None: """Bump the per-conversation watermark to the current max message id. diff --git a/src/lorabot/handler.py b/src/lorabot/handler.py index 42b4259..bdebe96 100644 --- a/src/lorabot/handler.py +++ b/src/lorabot/handler.py @@ -104,7 +104,7 @@ def build_dm_handler( if not prefix or not text: return - contact = await transport.resolve_contact(prefix) + contact = transport.resolve_contact(prefix) if contact is None: log.info("ignoring DM from unknown sender %s (no contact after refresh)", prefix) return diff --git a/src/lorabot/transport.py b/src/lorabot/transport.py index 42c3a39..cea6964 100644 --- a/src/lorabot/transport.py +++ b/src/lorabot/transport.py @@ -4,14 +4,17 @@ from __future__ import annotations import asyncio import logging +import sqlite3 from collections import defaultdict from meshcore import EventType, MeshCore +from . import db from .messages import split_to_bytes log = logging.getLogger("lorabot") + class MeshTransport: """Owns the MeshCore connection reference; handles contact resolution and reliable chunked message delivery. @@ -19,28 +22,46 @@ class MeshTransport: Create one instance per MeshCore connection and pass it to build_dm_handler. """ - def __init__(self, mc: MeshCore, ack_timeout: float = 30.0, send_retries: int = 1) -> None: + def __init__( + self, + mc: MeshCore, + db_conn: sqlite3.Connection, + ack_timeout: float = 0.0, + send_retries: int = 1, + ) -> None: self._mc = mc + self._db = db_conn self._ack_timeout = ack_timeout self._send_retries = send_retries - self._contacts_lock = asyncio.Lock() self._send_locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock) - async def resolve_contact(self, prefix: str): - """Look up a contact by pubkey prefix; re-pull contacts from the device on miss.""" - contact = self._mc.get_contact_by_key_prefix(prefix) - if contact is not None: - return contact - async with self._contacts_lock: - contact = self._mc.get_contact_by_key_prefix(prefix) - if contact is not None: - return contact - try: - await self._mc.commands.get_contacts() - except Exception: - log.exception("get_contacts refresh failed") - return None - return self._mc.get_contact_by_key_prefix(prefix) + # ------------------------------------------------------------------ + # Contact management + # ------------------------------------------------------------------ + + def sync_contacts(self) -> None: + """Bulk-upsert the device's current contact list into the DB. + + Call once after the initial ensure_contacts() on connect. + """ + for contact in self._mc.contacts.values(): + db.upsert_contact(self._db, contact) + log.info("synced %d contacts to DB", len(self._mc.contacts)) + + async def on_new_contact(self, event) -> None: + """Subscriber for EventType.NEW_CONTACT — persists the contact to DB.""" + contact = event.payload + db.upsert_contact(self._db, contact) + log.info("new contact stored: %s (%s)", + contact.get("adv_name", ""), contact["public_key"][:12]) + + def resolve_contact(self, prefix: str): + """Look up a contact by pubkey prefix from the local DB.""" + return db.get_contact_by_key_prefix(self._db, prefix) + + # ------------------------------------------------------------------ + # Chunked sending + # ------------------------------------------------------------------ async def send_chunked( self, @@ -74,27 +95,32 @@ class MeshTransport: return "".join(sent) async def _send_chunk(self, contact, chunk: str) -> bool: - """Send one chunk, retrying once on failure.""" - for attempt in range(self._send_retries + 1): - if await self._attempt_send(contact, chunk): - return True - if attempt < self._send_retries: - log.info("retrying chunk for %s (attempt %d/%d)", - contact["public_key"][:12], attempt + 2, self._send_retries + 1) - log.error("chunk delivery failed for %s after %d attempts", - contact["public_key"][:12], self._send_retries + 1) - return False + """Send one chunk and wait for the recipient ACK. - async def _attempt_send(self, contact, chunk: str) -> bool: - """One send attempt. Returns True on ACK, False on device error or timeout.""" + Delegates to ``send_msg_with_retry`` which correlates the ACK event by + ``expected_ack`` code and retries up to ``max_attempts`` times. Returns + ``None`` when no ACK arrives across all attempts. + + Note on the library's default ``flood_after=2``: from the third attempt + onwards the library resets the path and re-sends as a flood broadcast. + This only kicks in when ``send_retries >= 2`` (max_attempts >= 3); with + the default ``send_retries=1`` we stay direct-only. + """ + pk_short = contact["public_key"][:12] try: - result = await asyncio.wait_for( - self._mc.commands.send_msg(contact, chunk), timeout=self._ack_timeout + result = await self._mc.commands.send_msg_with_retry( + contact, chunk, + max_attempts=self._send_retries + 1, + timeout=self._ack_timeout, ) - if result.type != EventType.ERROR: - return True - log.warning("send_msg error for %s: %s", contact["public_key"][:12], result.payload) - except asyncio.TimeoutError: - log.warning("ACK timeout for %s after %.1fs", - contact["public_key"][:12], self._ack_timeout) - return False + except Exception: + log.exception("send to %s raised", pk_short) + return False + if result is None: + log.error("no ACK from %s after %d attempts", + pk_short, self._send_retries + 1) + return False + if result.type == EventType.ERROR: + log.warning("send_msg error for %s: %s", pk_short, result.payload) + return False + return True diff --git a/src/lorabot/web.py b/src/lorabot/web.py index 40318f6..e288bdb 100644 --- a/src/lorabot/web.py +++ b/src/lorabot/web.py @@ -58,6 +58,7 @@ class AppState: "last_advert_at": _iso(self.last_advert_at), "last_advert_ok": self.last_advert_ok, "advertise_available": self._advertise is not None, + "contact_count": self.db.execute("SELECT COUNT(*) FROM contacts").fetchone()[0], } def set_advertise_callback(self, cb: AdvertCallback | None) -> None: @@ -145,6 +146,13 @@ def _list_conversations(db: sqlite3.Connection) -> list[dict]: return [dict(r) for r in rows] +def _list_contacts(db: sqlite3.Connection) -> list[dict]: + rows = db.execute( + "SELECT public_key, adv_name, seen_at FROM contacts ORDER BY adv_name ASC" + ).fetchall() + return [dict(r) for r in rows] + + def _get_conversation(db: sqlite3.Connection, public_key: str) -> dict | None: conv = db.execute( "SELECT public_key, contact_name, cleared_at_id, created_at, updated_at " @@ -173,6 +181,10 @@ async def _api_conversations(req: web.Request) -> web.Response: return web.json_response(_list_conversations(_state(req).db)) +async def _api_contacts(req: web.Request) -> web.Response: + return web.json_response(_list_contacts(_state(req).db)) + + def _require_same_origin(req: web.Request) -> None: """Reject obvious cross-site browser requests on state-changing endpoints. @@ -249,6 +261,7 @@ def build_app(state: AppState) -> web.Application: app.router.add_get("/", _index) app.router.add_get("/api/status", _api_status) app.router.add_get("/api/conversations", _api_conversations) + app.router.add_get("/api/contacts", _api_contacts) app.router.add_get("/api/conversations/{pk}", _api_conversation) app.router.add_get("/api/events", _api_events) app.router.add_post("/api/advertise", _api_advertise) @@ -320,13 +333,34 @@ aside { overflow-y: auto; background: var(--bg-alt); } -.aside-head { - padding: 8px 14px; - color: var(--dim); +.tabs { + display: flex; border-bottom: 1px dashed var(--line); - text-transform: lowercase; - letter-spacing: 1px; } +.tab-btn { + background: transparent; + color: var(--dim); + border: none; + border-right: 1px solid var(--line); + font: inherit; + font-size: 12px; + padding: 7px 12px; + cursor: pointer; + flex: 1; + text-align: left; + letter-spacing: 0.5px; + text-transform: lowercase; +} +.tab-btn:last-child { border-right: none; } +.tab-btn.active { color: var(--accent); } +.tab-btn:hover:not(.active) { color: var(--fg); } +.contact { + padding: 8px 14px; + border-bottom: 1px solid var(--line); +} +.contact .cname { white-space: nowrap; overflow: hidden; text-overflow: ellipsis; } +.contact .cname::before { content: "@ "; color: var(--dim); } +.contact .cdetail { color: var(--dim); font-size: 11px; display: flex; justify-content: space-between; gap: 8px; margin-top: 2px; } .conv { padding: 8px 14px; cursor: pointer; @@ -410,7 +444,10 @@ section { display: flex; flex-direction: column; min-height: 0; }
@@ -426,8 +463,10 @@ section { display: flex; flex-direction: column; min-height: 0; } "use strict"; const $ = (id) => document.getElementById(id); let conversations = []; +let contacts = []; let selectedKey = null; let connectedSince = null; +let activeTab = "convs"; function escapeHTML(s) { return (s == null ? "" : String(s)).replace(/[&<>"']/g, (c) => ({ @@ -565,6 +604,7 @@ function setStatus(s) { const btn = $("advert-btn"); btn.disabled = !s.advertise_available; connectedSince = s.connected_since; + if (s.contact_count != null) $("contact-count").textContent = s.contact_count; tickUptime(); } @@ -592,6 +632,38 @@ function tickUptime() { } setInterval(tickUptime, 1000); +function renderContacts() { + const el = $("sidebar"); + if (!contacts.length) { + el.innerHTML = '
no contacts yet
'; + return; + } + el.innerHTML = contacts.map((c) => ` +
+
${escapeHTML(c.adv_name || "?")}
+
+ ${escapeHTML(c.public_key.slice(0, 16))}… + ${escapeHTML(fmtTime(c.seen_at))} +
+
`).join(""); +} + +function setTab(tab) { + activeTab = tab; + $("tab-convs").classList.toggle("active", tab === "convs"); + $("tab-contacts").classList.toggle("active", tab === "contacts"); + if (tab === "convs") renderSidebar(); + else refreshContacts(); +} + +async function refreshContacts() { + try { + contacts = await fetchJSON("/api/contacts"); + $("contact-count").textContent = contacts.length; + } catch (e) { /* silently ignore */ } + if (activeTab === "contacts") renderContacts(); +} + async function refreshList() { conversations = await fetchJSON("/api/conversations"); renderSidebar(); @@ -633,6 +705,8 @@ function startStream() { (async function init() { $("advert-btn").addEventListener("click", sendAdvert); + $("tab-convs").addEventListener("click", () => setTab("convs")); + $("tab-contacts").addEventListener("click", () => setTab("contacts")); try { setStatus(await fetchJSON("/api/status")); await refreshList();