Reworked contact system and fixed ack sending

This commit is contained in:
2026-05-06 21:42:08 +02:00
parent 0bab658098
commit 136f15d35f
7 changed files with 202 additions and 54 deletions
+7 -4
View File
@@ -28,9 +28,12 @@ sqlite_path = "data/lorabot.db"
# MeshCore MAX_PACKET_PAYLOAD is 184 bytes. Lower this if your text-frame
# headers further constrain the usable payload on your device.
max_bytes = 184
# Seconds to wait for an ACK before treating a chunk as failed.
ack_timeout_seconds = 30
# How many times to retry a chunk after failure (0 = no retries).
# Per-attempt ACK wait. 0 = trust the device's path-aware suggestion (recommended).
# Set a positive value only if you need to override that suggestion.
ack_timeout_seconds = 0
# How many times to retry a chunk after failure (0 = no retries). Total attempts
# = send_retries + 1. With send_retries >= 2 the third attempt onwards is sent as
# a flood broadcast (multi-hop) instead of direct.
send_retries = 1
[web]
@@ -47,7 +50,7 @@ enabled = true
interval_seconds = 3600
at_startup = true
# Flood = multi-hop advert across the mesh. False = zero-hop (neighbors only).
flood = false
flood = true
# LLM tool calling. The weather tool (Open-Meteo, no key) is always on. Tools
# in this section are optional and only registered when configured. Requires a
+12 -3
View File
@@ -56,10 +56,17 @@ async def run(cfg: Settings | None = None) -> None:
mc = await MeshCore.create_serial(cfg.meshcore.serial_port, cfg.meshcore.baud_rate)
# Default is False: the lib only marks contacts dirty on ADVERTISEMENT and
# waits for the caller to re-pull. Turn it on so a fresh advert is reflected
# in the local cache before the peer's first DM lands.
mc.auto_update_contacts = True
# in the local cache before the peer's first DM lands. Not needed anymore since we're tracking
# contacts ourselves now.
# mc.auto_update_contacts = True
await mc.ensure_contacts()
transport = MeshTransport(mc, ack_timeout=cfg.message.ack_timeout_seconds, send_retries=cfg.message.send_retries)
transport = MeshTransport(
mc,
db_conn,
ack_timeout=cfg.message.ack_timeout_seconds,
send_retries=cfg.message.send_retries,
)
transport.sync_contacts()
except BaseException:
state.set_connected(False)
if web_task is not None:
@@ -102,6 +109,7 @@ async def run(cfg: Settings | None = None) -> None:
)
sub = mc.subscribe(EventType.CONTACT_MSG_RECV, on_dm)
sub_contacts = mc.subscribe(EventType.NEW_CONTACT, transport.on_new_contact)
await mc.start_auto_message_fetching()
log.info("lorabot listening on %s", cfg.meshcore.serial_port)
@@ -111,6 +119,7 @@ async def run(cfg: Settings | None = None) -> None:
state.set_connected(False)
state.set_advertise_callback(None)
mc.unsubscribe(sub)
mc.unsubscribe(sub_contacts)
await mc.stop_auto_message_fetching()
await mc.disconnect()
await llm.aclose()
+7 -1
View File
@@ -37,7 +37,13 @@ class StorageCfg(BaseModel):
class MessageCfg(BaseModel):
max_bytes: int = Field(default=184, gt=0)
ack_timeout_seconds: float = Field(default=30.0, gt=0)
# 0 = use the device's path-aware suggested timeout (recommended). Set a
# positive value to override per attempt — useful only when the device's
# suggestion is consistently wrong for your link conditions.
ack_timeout_seconds: float = Field(default=0.0, ge=0)
# Number of retries after the first attempt. Total attempts = send_retries + 1.
# With send_retries >= 2 the third attempt onwards is sent as a flood broadcast
# (multi-hop) instead of direct — see meshcore's send_msg_with_retry.
send_retries: int = Field(default=1, ge=0)
+31 -1
View File
@@ -1,11 +1,19 @@
"""SQLite persistence for per-sender conversation history."""
"""SQLite persistence for conversation history and contacts."""
from __future__ import annotations
import json
import sqlite3
from pathlib import Path
SCHEMA = """
CREATE TABLE IF NOT EXISTS contacts (
public_key TEXT PRIMARY KEY,
adv_name TEXT,
data TEXT NOT NULL,
seen_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS conversations (
public_key TEXT PRIMARY KEY,
contact_name TEXT,
@@ -126,6 +134,28 @@ def set_thinking_enabled(conn: sqlite3.Connection, public_key: str, enabled: boo
)
def upsert_contact(conn: sqlite3.Connection, contact: dict) -> None:
conn.execute(
"""
INSERT INTO contacts (public_key, adv_name, data)
VALUES (?, ?, ?)
ON CONFLICT(public_key) DO UPDATE SET
adv_name = excluded.adv_name,
data = excluded.data,
seen_at = datetime('now')
""",
(contact["public_key"], contact.get("adv_name"), json.dumps(contact)),
)
def get_contact_by_key_prefix(conn: sqlite3.Connection, prefix: str) -> dict | None:
row = conn.execute(
"SELECT data FROM contacts WHERE public_key LIKE ? || '%'",
(prefix,),
).fetchone()
return json.loads(row["data"]) if row is not None else None
def clear_history(conn: sqlite3.Connection, public_key: str) -> None:
"""Bump the per-conversation watermark to the current max message id.
+1 -1
View File
@@ -104,7 +104,7 @@ def build_dm_handler(
if not prefix or not text:
return
contact = await transport.resolve_contact(prefix)
contact = transport.resolve_contact(prefix)
if contact is None:
log.info("ignoring DM from unknown sender %s (no contact after refresh)", prefix)
return
+64 -38
View File
@@ -4,14 +4,17 @@ from __future__ import annotations
import asyncio
import logging
import sqlite3
from collections import defaultdict
from meshcore import EventType, MeshCore
from . import db
from .messages import split_to_bytes
log = logging.getLogger("lorabot")
class MeshTransport:
"""Owns the MeshCore connection reference; handles contact resolution and
reliable chunked message delivery.
@@ -19,28 +22,46 @@ class MeshTransport:
Create one instance per MeshCore connection and pass it to build_dm_handler.
"""
def __init__(self, mc: MeshCore, ack_timeout: float = 30.0, send_retries: int = 1) -> None:
def __init__(
self,
mc: MeshCore,
db_conn: sqlite3.Connection,
ack_timeout: float = 0.0,
send_retries: int = 1,
) -> None:
self._mc = mc
self._db = db_conn
self._ack_timeout = ack_timeout
self._send_retries = send_retries
self._contacts_lock = asyncio.Lock()
self._send_locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
async def resolve_contact(self, prefix: str):
"""Look up a contact by pubkey prefix; re-pull contacts from the device on miss."""
contact = self._mc.get_contact_by_key_prefix(prefix)
if contact is not None:
return contact
async with self._contacts_lock:
contact = self._mc.get_contact_by_key_prefix(prefix)
if contact is not None:
return contact
try:
await self._mc.commands.get_contacts()
except Exception:
log.exception("get_contacts refresh failed")
return None
return self._mc.get_contact_by_key_prefix(prefix)
# ------------------------------------------------------------------
# Contact management
# ------------------------------------------------------------------
def sync_contacts(self) -> None:
"""Bulk-upsert the device's current contact list into the DB.
Call once after the initial ensure_contacts() on connect.
"""
for contact in self._mc.contacts.values():
db.upsert_contact(self._db, contact)
log.info("synced %d contacts to DB", len(self._mc.contacts))
async def on_new_contact(self, event) -> None:
"""Subscriber for EventType.NEW_CONTACT — persists the contact to DB."""
contact = event.payload
db.upsert_contact(self._db, contact)
log.info("new contact stored: %s (%s)",
contact.get("adv_name", ""), contact["public_key"][:12])
def resolve_contact(self, prefix: str):
"""Look up a contact by pubkey prefix from the local DB."""
return db.get_contact_by_key_prefix(self._db, prefix)
# ------------------------------------------------------------------
# Chunked sending
# ------------------------------------------------------------------
async def send_chunked(
self,
@@ -74,27 +95,32 @@ class MeshTransport:
return "".join(sent)
async def _send_chunk(self, contact, chunk: str) -> bool:
"""Send one chunk, retrying once on failure."""
for attempt in range(self._send_retries + 1):
if await self._attempt_send(contact, chunk):
return True
if attempt < self._send_retries:
log.info("retrying chunk for %s (attempt %d/%d)",
contact["public_key"][:12], attempt + 2, self._send_retries + 1)
log.error("chunk delivery failed for %s after %d attempts",
contact["public_key"][:12], self._send_retries + 1)
return False
"""Send one chunk and wait for the recipient ACK.
async def _attempt_send(self, contact, chunk: str) -> bool:
"""One send attempt. Returns True on ACK, False on device error or timeout."""
Delegates to ``send_msg_with_retry`` which correlates the ACK event by
``expected_ack`` code and retries up to ``max_attempts`` times. Returns
``None`` when no ACK arrives across all attempts.
Note on the library's default ``flood_after=2``: from the third attempt
onwards the library resets the path and re-sends as a flood broadcast.
This only kicks in when ``send_retries >= 2`` (max_attempts >= 3); with
the default ``send_retries=1`` we stay direct-only.
"""
pk_short = contact["public_key"][:12]
try:
result = await asyncio.wait_for(
self._mc.commands.send_msg(contact, chunk), timeout=self._ack_timeout
result = await self._mc.commands.send_msg_with_retry(
contact, chunk,
max_attempts=self._send_retries + 1,
timeout=self._ack_timeout,
)
if result.type != EventType.ERROR:
return True
log.warning("send_msg error for %s: %s", contact["public_key"][:12], result.payload)
except asyncio.TimeoutError:
log.warning("ACK timeout for %s after %.1fs",
contact["public_key"][:12], self._ack_timeout)
return False
except Exception:
log.exception("send to %s raised", pk_short)
return False
if result is None:
log.error("no ACK from %s after %d attempts",
pk_short, self._send_retries + 1)
return False
if result.type == EventType.ERROR:
log.warning("send_msg error for %s: %s", pk_short, result.payload)
return False
return True
+80 -6
View File
@@ -58,6 +58,7 @@ class AppState:
"last_advert_at": _iso(self.last_advert_at),
"last_advert_ok": self.last_advert_ok,
"advertise_available": self._advertise is not None,
"contact_count": self.db.execute("SELECT COUNT(*) FROM contacts").fetchone()[0],
}
def set_advertise_callback(self, cb: AdvertCallback | None) -> None:
@@ -145,6 +146,13 @@ def _list_conversations(db: sqlite3.Connection) -> list[dict]:
return [dict(r) for r in rows]
def _list_contacts(db: sqlite3.Connection) -> list[dict]:
rows = db.execute(
"SELECT public_key, adv_name, seen_at FROM contacts ORDER BY adv_name ASC"
).fetchall()
return [dict(r) for r in rows]
def _get_conversation(db: sqlite3.Connection, public_key: str) -> dict | None:
conv = db.execute(
"SELECT public_key, contact_name, cleared_at_id, created_at, updated_at "
@@ -173,6 +181,10 @@ async def _api_conversations(req: web.Request) -> web.Response:
return web.json_response(_list_conversations(_state(req).db))
async def _api_contacts(req: web.Request) -> web.Response:
return web.json_response(_list_contacts(_state(req).db))
def _require_same_origin(req: web.Request) -> None:
"""Reject obvious cross-site browser requests on state-changing endpoints.
@@ -249,6 +261,7 @@ def build_app(state: AppState) -> web.Application:
app.router.add_get("/", _index)
app.router.add_get("/api/status", _api_status)
app.router.add_get("/api/conversations", _api_conversations)
app.router.add_get("/api/contacts", _api_contacts)
app.router.add_get("/api/conversations/{pk}", _api_conversation)
app.router.add_get("/api/events", _api_events)
app.router.add_post("/api/advertise", _api_advertise)
@@ -320,13 +333,34 @@ aside {
overflow-y: auto;
background: var(--bg-alt);
}
.aside-head {
padding: 8px 14px;
color: var(--dim);
.tabs {
display: flex;
border-bottom: 1px dashed var(--line);
text-transform: lowercase;
letter-spacing: 1px;
}
.tab-btn {
background: transparent;
color: var(--dim);
border: none;
border-right: 1px solid var(--line);
font: inherit;
font-size: 12px;
padding: 7px 12px;
cursor: pointer;
flex: 1;
text-align: left;
letter-spacing: 0.5px;
text-transform: lowercase;
}
.tab-btn:last-child { border-right: none; }
.tab-btn.active { color: var(--accent); }
.tab-btn:hover:not(.active) { color: var(--fg); }
.contact {
padding: 8px 14px;
border-bottom: 1px solid var(--line);
}
.contact .cname { white-space: nowrap; overflow: hidden; text-overflow: ellipsis; }
.contact .cname::before { content: "@ "; color: var(--dim); }
.contact .cdetail { color: var(--dim); font-size: 11px; display: flex; justify-content: space-between; gap: 8px; margin-top: 2px; }
.conv {
padding: 8px 14px;
cursor: pointer;
@@ -410,7 +444,10 @@ section { display: flex; flex-direction: column; min-height: 0; }
</header>
<main>
<aside>
<div class="aside-head">conversations</div>
<div class="tabs">
<button class="tab-btn active" id="tab-convs" type="button">conversations</button>
<button class="tab-btn" id="tab-contacts" type="button">contacts (<span id="contact-count">—</span>)</button>
</div>
<div id="sidebar"><div class="empty">none yet<span class="cursor"></span></div></div>
</aside>
<section>
@@ -426,8 +463,10 @@ section { display: flex; flex-direction: column; min-height: 0; }
"use strict";
const $ = (id) => document.getElementById(id);
let conversations = [];
let contacts = [];
let selectedKey = null;
let connectedSince = null;
let activeTab = "convs";
function escapeHTML(s) {
return (s == null ? "" : String(s)).replace(/[&<>"']/g, (c) => ({
@@ -565,6 +604,7 @@ function setStatus(s) {
const btn = $("advert-btn");
btn.disabled = !s.advertise_available;
connectedSince = s.connected_since;
if (s.contact_count != null) $("contact-count").textContent = s.contact_count;
tickUptime();
}
@@ -592,6 +632,38 @@ function tickUptime() {
}
setInterval(tickUptime, 1000);
function renderContacts() {
const el = $("sidebar");
if (!contacts.length) {
el.innerHTML = '<div class="empty">no contacts yet</div>';
return;
}
el.innerHTML = contacts.map((c) => `
<div class="contact">
<div class="cname">${escapeHTML(c.adv_name || "?")}</div>
<div class="cdetail">
<span>${escapeHTML(c.public_key.slice(0, 16))}…</span>
<span>${escapeHTML(fmtTime(c.seen_at))}</span>
</div>
</div>`).join("");
}
function setTab(tab) {
activeTab = tab;
$("tab-convs").classList.toggle("active", tab === "convs");
$("tab-contacts").classList.toggle("active", tab === "contacts");
if (tab === "convs") renderSidebar();
else refreshContacts();
}
async function refreshContacts() {
try {
contacts = await fetchJSON("/api/contacts");
$("contact-count").textContent = contacts.length;
} catch (e) { /* silently ignore */ }
if (activeTab === "contacts") renderContacts();
}
async function refreshList() {
conversations = await fetchJSON("/api/conversations");
renderSidebar();
@@ -633,6 +705,8 @@ function startStream() {
(async function init() {
$("advert-btn").addEventListener("click", sendAdvert);
$("tab-convs").addEventListener("click", () => setTab("convs"));
$("tab-contacts").addEventListener("click", () => setTab("contacts"));
try {
setStatus(await fetchJSON("/api/status"));
await refreshList();