Implemented message deduplication for consecuitve received messages from one user in a specified timeframe
This commit is contained in:
+74
-39
@@ -5,12 +5,13 @@ from __future__ import annotations
|
|||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
import sqlite3
|
import sqlite3
|
||||||
|
import time
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from collections.abc import Awaitable, Callable
|
from collections.abc import Awaitable, Callable
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
from . import db
|
from . import db
|
||||||
from .commands import CommandContext, CommandRegistry
|
from .commands import CommandContext, CommandRegistry, CommandResult
|
||||||
from .config import Settings
|
from .config import Settings
|
||||||
from .llm import LLMClient
|
from .llm import LLMClient
|
||||||
from .transport import MeshTransport
|
from .transport import MeshTransport
|
||||||
@@ -23,6 +24,65 @@ def _now_iso() -> str:
|
|||||||
return datetime.now(timezone.utc).isoformat()
|
return datetime.now(timezone.utc).isoformat()
|
||||||
|
|
||||||
|
|
||||||
|
class _Deduplicator:
|
||||||
|
"""Drops consecutive identical messages from the same sender within a time window."""
|
||||||
|
|
||||||
|
def __init__(self, window: float = 15.0) -> None:
|
||||||
|
self._window = window
|
||||||
|
self._seen: dict[str, tuple[str, float]] = {}
|
||||||
|
|
||||||
|
def is_duplicate(self, key: str, text: str) -> bool:
|
||||||
|
now = time.monotonic()
|
||||||
|
prev_text, prev_time = self._seen.get(key, ("", 0.0))
|
||||||
|
if text == prev_text and now - prev_time < self._window:
|
||||||
|
return True
|
||||||
|
self._seen[key] = (text, now)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def _store_and_publish(
|
||||||
|
db_conn: sqlite3.Connection,
|
||||||
|
state: AppState,
|
||||||
|
public_key: str,
|
||||||
|
contact_name: str,
|
||||||
|
role: str,
|
||||||
|
text: str,
|
||||||
|
hidden_from_llm: bool,
|
||||||
|
) -> None:
|
||||||
|
db.add_message(db_conn, public_key, role, text, hidden_from_llm=hidden_from_llm)
|
||||||
|
state.publish("message", {
|
||||||
|
"public_key": public_key,
|
||||||
|
"contact_name": contact_name,
|
||||||
|
"role": role,
|
||||||
|
"content": text,
|
||||||
|
"created_at": _now_iso(),
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
|
async def _generate_reply(
|
||||||
|
registry: CommandRegistry,
|
||||||
|
llm: LLMClient,
|
||||||
|
db_conn: sqlite3.Connection,
|
||||||
|
public_key: str,
|
||||||
|
ctx: CommandContext,
|
||||||
|
text: str,
|
||||||
|
is_command: bool,
|
||||||
|
) -> tuple[str, CommandResult | None] | None:
|
||||||
|
"""Dispatch to command or LLM. Returns (reply, cmd_result), or None to abort."""
|
||||||
|
if is_command:
|
||||||
|
cmd_result = await registry.dispatch(ctx, text)
|
||||||
|
if cmd_result is None or cmd_result.reply is None:
|
||||||
|
return None
|
||||||
|
return cmd_result.reply, cmd_result
|
||||||
|
thinking = db.get_thinking_enabled(db_conn, public_key)
|
||||||
|
try:
|
||||||
|
reply = await llm.reply(db.get_history(db_conn, public_key), thinking=thinking)
|
||||||
|
except Exception:
|
||||||
|
log.exception("LLM call failed for %s", public_key[:12])
|
||||||
|
return None
|
||||||
|
return reply, None
|
||||||
|
|
||||||
|
|
||||||
def build_dm_handler(
|
def build_dm_handler(
|
||||||
*,
|
*,
|
||||||
db_conn: sqlite3.Connection,
|
db_conn: sqlite3.Connection,
|
||||||
@@ -34,9 +94,8 @@ def build_dm_handler(
|
|||||||
) -> Callable[[object], Awaitable[None]]:
|
) -> Callable[[object], Awaitable[None]]:
|
||||||
"""Return an ``on_dm(event)`` closure with all collaborators bound."""
|
"""Return an ``on_dm(event)`` closure with all collaborators bound."""
|
||||||
|
|
||||||
# One lock per sender so a burst of messages from the same peer is processed
|
|
||||||
# serially while different peers stay independent.
|
|
||||||
locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
|
locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
|
||||||
|
dedup = _Deduplicator(window=15.0)
|
||||||
|
|
||||||
async def on_dm(event) -> None:
|
async def on_dm(event) -> None:
|
||||||
data = event.payload or {}
|
data = event.payload or {}
|
||||||
@@ -52,23 +111,18 @@ def build_dm_handler(
|
|||||||
|
|
||||||
public_key = contact["public_key"]
|
public_key = contact["public_key"]
|
||||||
contact_name = contact.get("adv_name", "")
|
contact_name = contact.get("adv_name", "")
|
||||||
log.info("DM from %s (%s): %s", contact_name, public_key[:12], text)
|
|
||||||
|
|
||||||
# Decided up front so both turns of a command exchange are stored consistently
|
if dedup.is_duplicate(public_key, text):
|
||||||
# with hidden_from_llm. Commands and their replies stay out of LLM context;
|
log.info("dropping duplicate DM from %s (%s): %s", contact_name, public_key[:12], text)
|
||||||
# the web UI still shows them.
|
return
|
||||||
|
|
||||||
|
log.info("DM from %s (%s): %s", contact_name, public_key[:12], text)
|
||||||
is_command = registry.parse(text) is not None
|
is_command = registry.parse(text) is not None
|
||||||
|
|
||||||
async with locks[public_key]:
|
async with locks[public_key]:
|
||||||
db.upsert_conversation(db_conn, public_key, contact_name)
|
db.upsert_conversation(db_conn, public_key, contact_name)
|
||||||
db.add_message(db_conn, public_key, "user", text, hidden_from_llm=is_command)
|
_store_and_publish(db_conn, state, public_key, contact_name, "user", text,
|
||||||
state.publish("message", {
|
hidden_from_llm=is_command)
|
||||||
"public_key": public_key,
|
|
||||||
"contact_name": contact_name,
|
|
||||||
"role": "user",
|
|
||||||
"content": text,
|
|
||||||
"created_at": _now_iso(),
|
|
||||||
})
|
|
||||||
|
|
||||||
ctx = CommandContext(
|
ctx = CommandContext(
|
||||||
db_conn=db_conn,
|
db_conn=db_conn,
|
||||||
@@ -78,36 +132,17 @@ def build_dm_handler(
|
|||||||
state=state,
|
state=state,
|
||||||
)
|
)
|
||||||
|
|
||||||
if is_command:
|
result = await _generate_reply(registry, llm, db_conn, public_key, ctx, text, is_command)
|
||||||
cmd_result = await registry.dispatch(ctx, text)
|
if result is None:
|
||||||
if cmd_result is None or cmd_result.reply is None:
|
|
||||||
return
|
|
||||||
reply = cmd_result.reply
|
|
||||||
else:
|
|
||||||
cmd_result = None
|
|
||||||
thinking = db.get_thinking_enabled(db_conn, public_key)
|
|
||||||
try:
|
|
||||||
reply = await llm.reply(
|
|
||||||
db.get_history(db_conn, public_key),
|
|
||||||
thinking=thinking,
|
|
||||||
)
|
|
||||||
except Exception:
|
|
||||||
log.exception("LLM call failed for %s", public_key[:12])
|
|
||||||
return
|
return
|
||||||
|
reply, cmd_result = result
|
||||||
|
|
||||||
delivered = await transport.send_chunked(contact, reply, cfg.message.max_bytes)
|
delivered = await transport.send_chunked(contact, reply, cfg.message.max_bytes)
|
||||||
if not delivered:
|
if not delivered:
|
||||||
# Nothing made it onto the radio; don't persist anything
|
|
||||||
return
|
return
|
||||||
|
|
||||||
db.add_message(db_conn, public_key, "assistant", delivered, hidden_from_llm=is_command)
|
_store_and_publish(db_conn, state, public_key, contact_name, "assistant", delivered,
|
||||||
state.publish("message", {
|
hidden_from_llm=is_command)
|
||||||
"public_key": public_key,
|
|
||||||
"contact_name": contact_name,
|
|
||||||
"role": "assistant",
|
|
||||||
"content": delivered,
|
|
||||||
"created_at": _now_iso(),
|
|
||||||
})
|
|
||||||
|
|
||||||
if cmd_result is not None and cmd_result.after_send is not None:
|
if cmd_result is not None and cmd_result.after_send is not None:
|
||||||
await cmd_result.after_send(ctx)
|
await cmd_result.after_send(ctx)
|
||||||
|
|||||||
Reference in New Issue
Block a user