From 0bab65809873266ed33c1f4097fb937004a2a1ce Mon Sep 17 00:00:00 2001 From: Tobias Huttinger Date: Wed, 6 May 2026 20:16:53 +0200 Subject: [PATCH] Implemented message deduplication for consecuitve received messages from one user in a specified timeframe --- src/lorabot/handler.py | 115 +++++++++++++++++++++++++++-------------- 1 file changed, 75 insertions(+), 40 deletions(-) diff --git a/src/lorabot/handler.py b/src/lorabot/handler.py index bfcf7aa..42b4259 100644 --- a/src/lorabot/handler.py +++ b/src/lorabot/handler.py @@ -5,12 +5,13 @@ from __future__ import annotations import asyncio import logging import sqlite3 +import time from collections import defaultdict from collections.abc import Awaitable, Callable from datetime import datetime, timezone from . import db -from .commands import CommandContext, CommandRegistry +from .commands import CommandContext, CommandRegistry, CommandResult from .config import Settings from .llm import LLMClient from .transport import MeshTransport @@ -23,6 +24,65 @@ def _now_iso() -> str: return datetime.now(timezone.utc).isoformat() +class _Deduplicator: + """Drops consecutive identical messages from the same sender within a time window.""" + + def __init__(self, window: float = 15.0) -> None: + self._window = window + self._seen: dict[str, tuple[str, float]] = {} + + def is_duplicate(self, key: str, text: str) -> bool: + now = time.monotonic() + prev_text, prev_time = self._seen.get(key, ("", 0.0)) + if text == prev_text and now - prev_time < self._window: + return True + self._seen[key] = (text, now) + return False + + +def _store_and_publish( + db_conn: sqlite3.Connection, + state: AppState, + public_key: str, + contact_name: str, + role: str, + text: str, + hidden_from_llm: bool, +) -> None: + db.add_message(db_conn, public_key, role, text, hidden_from_llm=hidden_from_llm) + state.publish("message", { + "public_key": public_key, + "contact_name": contact_name, + "role": role, + "content": text, + "created_at": _now_iso(), + }) + + +async def _generate_reply( + registry: CommandRegistry, + llm: LLMClient, + db_conn: sqlite3.Connection, + public_key: str, + ctx: CommandContext, + text: str, + is_command: bool, +) -> tuple[str, CommandResult | None] | None: + """Dispatch to command or LLM. Returns (reply, cmd_result), or None to abort.""" + if is_command: + cmd_result = await registry.dispatch(ctx, text) + if cmd_result is None or cmd_result.reply is None: + return None + return cmd_result.reply, cmd_result + thinking = db.get_thinking_enabled(db_conn, public_key) + try: + reply = await llm.reply(db.get_history(db_conn, public_key), thinking=thinking) + except Exception: + log.exception("LLM call failed for %s", public_key[:12]) + return None + return reply, None + + def build_dm_handler( *, db_conn: sqlite3.Connection, @@ -34,9 +94,8 @@ def build_dm_handler( ) -> Callable[[object], Awaitable[None]]: """Return an ``on_dm(event)`` closure with all collaborators bound.""" - # One lock per sender so a burst of messages from the same peer is processed - # serially while different peers stay independent. locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock) + dedup = _Deduplicator(window=15.0) async def on_dm(event) -> None: data = event.payload or {} @@ -52,23 +111,18 @@ def build_dm_handler( public_key = contact["public_key"] contact_name = contact.get("adv_name", "") - log.info("DM from %s (%s): %s", contact_name, public_key[:12], text) - # Decided up front so both turns of a command exchange are stored consistently - # with hidden_from_llm. Commands and their replies stay out of LLM context; - # the web UI still shows them. + if dedup.is_duplicate(public_key, text): + log.info("dropping duplicate DM from %s (%s): %s", contact_name, public_key[:12], text) + return + + log.info("DM from %s (%s): %s", contact_name, public_key[:12], text) is_command = registry.parse(text) is not None async with locks[public_key]: db.upsert_conversation(db_conn, public_key, contact_name) - db.add_message(db_conn, public_key, "user", text, hidden_from_llm=is_command) - state.publish("message", { - "public_key": public_key, - "contact_name": contact_name, - "role": "user", - "content": text, - "created_at": _now_iso(), - }) + _store_and_publish(db_conn, state, public_key, contact_name, "user", text, + hidden_from_llm=is_command) ctx = CommandContext( db_conn=db_conn, @@ -78,36 +132,17 @@ def build_dm_handler( state=state, ) - if is_command: - cmd_result = await registry.dispatch(ctx, text) - if cmd_result is None or cmd_result.reply is None: - return - reply = cmd_result.reply - else: - cmd_result = None - thinking = db.get_thinking_enabled(db_conn, public_key) - try: - reply = await llm.reply( - db.get_history(db_conn, public_key), - thinking=thinking, - ) - except Exception: - log.exception("LLM call failed for %s", public_key[:12]) - return + result = await _generate_reply(registry, llm, db_conn, public_key, ctx, text, is_command) + if result is None: + return + reply, cmd_result = result delivered = await transport.send_chunked(contact, reply, cfg.message.max_bytes) if not delivered: - # Nothing made it onto the radio; don't persist anything return - db.add_message(db_conn, public_key, "assistant", delivered, hidden_from_llm=is_command) - state.publish("message", { - "public_key": public_key, - "contact_name": contact_name, - "role": "assistant", - "content": delivered, - "created_at": _now_iso(), - }) + _store_and_publish(db_conn, state, public_key, contact_name, "assistant", delivered, + hidden_from_llm=is_command) if cmd_result is not None and cmd_result.after_send is not None: await cmd_result.after_send(ctx)