"""Matrix gateway adapter.

Connects to any Matrix homeserver (self-hosted or matrix.org) via the
mautrix Python SDK.  Supports optional end-to-end encryption (E2EE)
when installed with ``pip install "mautrix[encryption]"``.

Environment variables:
    MATRIX_HOMESERVER           Homeserver URL (e.g. https://matrix.example.org)
    MATRIX_ACCESS_TOKEN         Access token (preferred auth method)
    MATRIX_USER_ID              Full user ID (@bot:server) — required for password login
    MATRIX_PASSWORD             Password (alternative to access token)
    MATRIX_ENCRYPTION           Set "true" to enable E2EE
    MATRIX_E2EE_MODE            off | optional | required. Overrides MATRIX_ENCRYPTION
                                when set. Legacy MATRIX_ENCRYPTION=true maps to required.
    MATRIX_DEVICE_ID            Stable device ID for E2EE persistence across restarts
    MATRIX_PROXY                HTTP(S) or SOCKS proxy URL for Matrix traffic
    MATRIX_ALLOWED_USERS    Comma-separated Matrix user IDs (@user:server)
    MATRIX_ALLOWED_ROOMS    Comma-separated Matrix room IDs allowed to trigger turns
    MATRIX_HOME_ROOM        Room ID for cron/notification delivery
    MATRIX_REACTIONS        Set "false" to disable processing lifecycle reactions
                            (eyes/checkmark/cross). Default: true
    MATRIX_REQUIRE_MENTION      Require @mention in rooms (default: true)
    MATRIX_FREE_RESPONSE_ROOMS  Comma-separated room IDs exempt from mention requirement
                                (alias of matrix.free_response_rooms)
    MATRIX_ALLOWED_ROOMS    Comma-separated room IDs; if set, bot ONLY responds
                            in these rooms (whitelist, DMs exempt; alias of
                            matrix.allowed_rooms)
    MATRIX_IGNORE_USER_PATTERNS Comma-separated regular expressions for appservice /
                                bridge ghost user IDs to ignore
    MATRIX_PROCESS_NOTICES      Set "true" to process inbound m.notice events
                                (default: false)
    MATRIX_ALLOW_ROOM_MENTIONS  Allow outbound @room mentions to notify whole rooms
                                (default: false)
    MATRIX_TOOLS_ALLOW_REDACTION
                              Allow Matrix redaction tool execution (default: false)
    MATRIX_TOOLS_ALLOW_INVITES Allow Matrix invite tool execution (default: false)
    MATRIX_TOOLS_ALLOW_ROOM_CREATE
                              Allow Matrix room creation tool execution (default: false)
    MATRIX_AUTO_THREAD          Auto-create threads for room messages (default: true)
    MATRIX_DM_AUTO_THREAD       Auto-create threads for DM messages (default: false)
    MATRIX_RECOVERY_KEY         Recovery key for cross-signing verification after device key rotation
    MATRIX_DM_MENTION_THREADS   Create a thread when bot is @mentioned in a DM (default: false)
    MATRIX_ALLOW_PUBLIC_ROOMS   Allow Matrix tools to create public rooms (default: false)
    MATRIX_APPROVAL_REQUIRE_SENDER
                              Require reaction controls to come from the original requester
                              when requester metadata is available (default: true)
    MATRIX_APPROVAL_TIMEOUT_SECONDS
                              Reaction approval/model-picker timeout (default: 300)
"""

from __future__ import annotations

import asyncio
import inspect
import logging
import mimetypes
import os
import re
import time
from urllib.parse import urlsplit, urlunsplit
from dataclasses import dataclass, field

from html import escape as _html_escape
from html.parser import HTMLParser
from pathlib import Path
from typing import Any, Dict, Optional, Set

try:
    from mautrix.types import (
        ContentURI,
        EventID,
        EventType,
        PaginationDirection,
        PresenceState,
        RoomCreatePreset,
        RoomID,
        SyncToken,
        TrustState,
        UserID,
    )
except ImportError:
    # Stubs so the module is importable without mautrix installed.
    # check_matrix_requirements() will return False and the adapter
    # won't be instantiated in production, but tests may exercise
    # adapter methods so stubs must have the right attributes.
    ContentURI = EventID = RoomID = SyncToken = UserID = str  # type: ignore[misc,assignment]

    class _EventTypeStub:  # type: ignore[no-redef]
        ROOM_MESSAGE = "m.room.message"
        REACTION = "m.reaction"
        ROOM_ENCRYPTED = "m.room.encrypted"
        ROOM_NAME = "m.room.name"

    EventType = _EventTypeStub  # type: ignore[misc,assignment]

    class _PaginationDirectionStub:  # type: ignore[no-redef]
        BACKWARD = "b"
        FORWARD = "f"

    PaginationDirection = _PaginationDirectionStub  # type: ignore[misc,assignment]

    class _PresenceStateStub:  # type: ignore[no-redef]
        ONLINE = "online"
        OFFLINE = "offline"
        UNAVAILABLE = "unavailable"

    PresenceState = _PresenceStateStub  # type: ignore[misc,assignment]

    class _RoomCreatePresetStub:  # type: ignore[no-redef]
        PRIVATE = "private_chat"
        PUBLIC = "public_chat"
        TRUSTED_PRIVATE = "trusted_private_chat"

    RoomCreatePreset = _RoomCreatePresetStub  # type: ignore[misc,assignment]

    class _TrustStateStub:  # type: ignore[no-redef]
        UNVERIFIED = 0
        VERIFIED = 1

    TrustState = _TrustStateStub  # type: ignore[misc,assignment]

from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import (
    BasePlatformAdapter,
    MessageEvent,
    MessageType,
    ProcessingOutcome,
    SendResult,
    resolve_proxy_url,
    proxy_kwargs_for_aiohttp,
)
from gateway.platforms.helpers import ThreadParticipationTracker

logger = logging.getLogger(__name__)

_MATRIX_BANG_COMMAND_RE = re.compile(
    r"^!([A-Za-z][A-Za-z0-9_-]*)(?=$|\s)(.*)$",
    re.DOTALL,
)


def _resolve_matrix_bang_command(name: str) -> str | None:
    """Resolve a ``!command`` token to a dispatchable Hermes command token.

    Matrix clients often reserve leading ``/`` for local client commands.
    Hermes accepts ``!command`` as a Matrix-friendly alias, but only for
    commands that the gateway can actually dispatch so ordinary exclamations
    remain normal chat text.

    Returns the token form that actually resolves (which may differ from
    *name* only by underscore→hyphen normalization, e.g. ``reload_skills`` →
    ``reload-skills``) so the emitted ``/command`` always resolves downstream,
    or ``None`` when *name* is not a known command. Aliases are intentionally
    left as-is — the gateway dispatcher resolves them to their canonical name.
    """
    if not name:
        return None
    # Try the raw lowercased token first, then its hyphenated variant, so
    # forms like ``!reload_skills`` resolve against ``reload-skills``. We emit
    # whichever candidate resolved (not a forced canonical form) to preserve
    # alias passthrough — the gateway dispatcher canonicalizes aliases itself.
    candidates = [name.lower()]
    hyphenated = name.lower().replace("_", "-")
    if hyphenated != candidates[0]:
        candidates.append(hyphenated)

    try:
        from hermes_cli.commands import is_gateway_known_command

        for candidate in candidates:
            if is_gateway_known_command(candidate):
                return candidate
    except Exception:
        logger.debug(
            "Matrix: is_gateway_known_command failed for %r", name, exc_info=True
        )

    try:
        from agent.skill_commands import get_skill_commands

        skill_commands = get_skill_commands() or {}
        # Skill command keys are stored slash-prefixed (e.g. "/arxiv"), so
        # compare against the "/candidate" form, not the bare token.
        for candidate in candidates:
            if f"/{candidate}" in skill_commands:
                return candidate
    except Exception:
        logger.debug("Matrix: get_skill_commands failed for %r", name, exc_info=True)

    return None


def _normalize_matrix_bang_command(text: str) -> str:
    """Convert Matrix ``!command`` aliases to normal Hermes ``/command`` text."""
    if not text or not text.startswith("!"):
        return text
    match = _MATRIX_BANG_COMMAND_RE.match(text)
    if not match:
        return text
    resolved = _resolve_matrix_bang_command(match.group(1))
    if resolved is None:
        return text
    return f"/{resolved}{match.group(2) or ''}"


class _MatrixHtmlSanitizer(HTMLParser):
    """Allowlist sanitizer for Matrix-compatible formatted HTML."""

    _ALLOWED_TAGS = {
        "a", "b", "blockquote", "br", "code", "del", "em", "h1", "h2", "h3",
        "h4", "h5", "h6", "hr", "i", "li", "ol", "p", "pre", "s", "strike",
        "strong", "ul",
    }
    _VOID_TAGS = {"br", "hr"}

    def __init__(self) -> None:
        super().__init__(convert_charrefs=False)
        self._parts: list[str] = []
        self._skip_depth = 0

    @staticmethod
    def _safe_url(value: str) -> str:
        stripped = re.sub(r"[\x00-\x1f\x7f]+", "", value or "").strip()
        match = re.match(r"^([A-Za-z][A-Za-z0-9+.-]*):", stripped)
        scheme = match.group(1).lower() if match else ""
        if scheme and scheme not in {"http", "https", "matrix", "mailto"}:
            return ""
        return stripped

    def _safe_attrs(self, tag: str, attrs: list[tuple[str, str | None]]) -> str:
        safe: list[str] = []
        for key, value in attrs:
            attr = str(key or "").lower()
            raw_value = "" if value is None else str(value)
            if attr.startswith("on"):
                continue
            if tag == "a" and attr == "href":
                href = self._safe_url(raw_value)
                if href:
                    safe.append(f' href="{_html_escape(href, quote=True)}"')
            elif tag == "code" and attr == "class":
                if re.fullmatch(r"language-[A-Za-z0-9_+.-]{1,64}", raw_value):
                    safe.append(f' class="{_html_escape(raw_value, quote=True)}"')
        return "".join(safe)

    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
        tag = tag.lower()
        if tag in {"script", "style"}:
            self._skip_depth += 1
            return
        if self._skip_depth:
            return
        if tag not in self._ALLOWED_TAGS:
            return
        if tag in self._VOID_TAGS:
            self._parts.append(f"<{tag}>")
            return
        self._parts.append(f"<{tag}{self._safe_attrs(tag, attrs)}>")

    def handle_endtag(self, tag: str) -> None:
        tag = tag.lower()
        if tag in {"script", "style"} and self._skip_depth:
            self._skip_depth -= 1
            return
        if self._skip_depth or tag not in self._ALLOWED_TAGS or tag in self._VOID_TAGS:
            return
        self._parts.append(f"</{tag}>")

    def handle_data(self, data: str) -> None:
        if not self._skip_depth:
            self._parts.append(_html_escape(data))

    def handle_entityref(self, name: str) -> None:
        if not self._skip_depth:
            self._parts.append(f"&{name};")

    def handle_charref(self, name: str) -> None:
        if not self._skip_depth:
            self._parts.append(f"&#{name};")

    def get_html(self) -> str:
        return "".join(self._parts)


@dataclass(frozen=True)
class MatrixRoomIdentity:
    """Resolved Matrix room identity for routing and prompt context."""

    room_id: str
    room_name: str | None
    room_topic: str | None
    canonical_alias: str | None
    server_name: str | None
    joined_member_count: int | None
    is_direct_account_data: bool
    display_name: str
    has_explicit_name: bool
    chat_type: str
    conflict: bool = False


@dataclass
class _MatrixApprovalPrompt:
    """Tracks a pending Matrix reaction-based exec approval prompt."""

    def __init__(
        self,
        session_key: str,
        chat_id: str,
        message_id: str,
        resolved: bool = False,
        requester_user_id: str | None = None,
        expires_at: float | None = None,
    ):
        self.session_key = session_key
        self.chat_id = chat_id
        self.message_id = message_id
        self.resolved = resolved
        self.requester_user_id = requester_user_id
        self.expires_at = expires_at
        self.bot_reaction_events: dict[str, str] = {}  # emoji -> event_id


@dataclass
class _MatrixModelPickerPrompt:
    """Tracks a pending Matrix reaction-based model picker prompt."""

    chat_id: str
    message_id: str
    session_key: str
    choices: dict[str, tuple[str, str]]
    on_model_selected: Any
    requester_user_id: str | None = None
    expires_at: float | None = None
    resolved: bool = False
    bot_reaction_events: dict[str, str] = field(default_factory=dict)


# Matrix message size limit (4000 chars practical, spec has no hard limit
# but clients render poorly above this).
MAX_MESSAGE_LENGTH = 4000

# Store directory for E2EE keys and sync state.
# Uses get_hermes_home() so each profile gets its own Matrix store.
from hermes_constants import get_hermes_dir as _get_hermes_dir

_STORE_DIR = _get_hermes_dir("platforms/matrix/store", "matrix/store")
_CRYPTO_DB_PATH = _STORE_DIR / "crypto.db"

# Grace period: ignore messages older than this many seconds before startup.
_STARTUP_GRACE_SECONDS = 5

_OUTBOUND_MENTION_RE = re.compile(
    r"(?<![\w/])(@[0-9A-Za-z._=/-]+:[0-9A-Za-z.-]+(?::\d+)?)"
)

_E2EE_INSTALL_HINT = (
    "Install with: pip install 'mautrix[encryption]' asyncpg aiosqlite  "
    "(requires libolm C library)"
)

_MATRIX_IMAGE_FILENAME_EXTS = frozenset({
    ".jpg",
    ".jpeg",
    ".png",
    ".gif",
    ".webp",
    ".bmp",
    ".svg",
    ".heic",
    ".heif",
    ".avif",
})

_MATRIX_MODEL_PICKER_REACTIONS = (
    "1\ufe0f\u20e3",
    "2\ufe0f\u20e3",
    "3\ufe0f\u20e3",
    "4\ufe0f\u20e3",
    "5\ufe0f\u20e3",
    "6\ufe0f\u20e3",
    "7\ufe0f\u20e3",
    "8\ufe0f\u20e3",
    "9\ufe0f\u20e3",
    "\U0001f51f",
)

_MATRIX_CAPABILITIES: Dict[str, str] = {
    "text": "yes",
    "threads": "yes",
    "reactions": "yes",
    "approvals": "yes",
    "model picker": "yes",
    "thinking panes": "yes",
    "images": "yes",
    "multiple images": "yes",
    "files": "yes",
    "voice/audio": "yes",
    "video": "yes",
    "E2EE": "off / optional / required",
    "diagnostics": "yes",
}


def get_matrix_capabilities() -> Dict[str, str]:
    """Return Matrix gateway capabilities for docs and release checks."""
    return dict(_MATRIX_CAPABILITIES)


def _looks_like_matrix_image_filename(text: str) -> bool:
    """Return True when Matrix image body text is probably just a transport filename.

    Matrix ``m.image`` events commonly populate ``content.body`` with the uploaded
    filename when the user did not add a caption. Treating that raw filename as
    user-authored text confuses downstream vision enrichment.
    """
    candidate = str(text or "").strip()
    if not candidate or "\n" in candidate or candidate.endswith("/"):
        return False

    name = Path(candidate).name
    if not name or name != candidate:
        return False

    suffix = Path(name).suffix.lower()
    if not suffix:
        return False

    guessed_type, _ = mimetypes.guess_type(name)
    if guessed_type and guessed_type.startswith("image/"):
        return True
    return suffix in _MATRIX_IMAGE_FILENAME_EXTS


def _matrix_event_timestamp_seconds(event: Any) -> float:
    """Return a Matrix event timestamp in seconds, accepting ms or sec values."""
    raw_ts = (
        getattr(event, "timestamp", None)
        or getattr(event, "server_timestamp", None)
        or 0
    )
    if not raw_ts:
        return 0.0
    try:
        ts = float(raw_ts)
    except (TypeError, ValueError):
        return 0.0
    # Matrix origin_server_ts is milliseconds. Some tests/fakes and SDK objects
    # expose seconds; do not turn those into 1970-era timestamps.
    if ts > 10_000_000_000:
        return ts / 1000.0
    return ts


def _create_matrix_session(proxy_url: str | None):
    """Create an ``aiohttp.ClientSession`` whose proxy applies to *all* requests.

    mautrix's ``HTTPAPI._send()`` calls ``session.request()`` without forwarding
    per-request ``proxy=`` kwargs.  For HTTP(S) proxies we use aiohttp's native
    ``proxy=`` session parameter which sets a default for every request.  For SOCKS
    we use ``aiohttp_socks.ProxyConnector`` (connector-level).
    When no proxy is configured we enable ``trust_env`` so standard env vars
    (``HTTP_PROXY`` / ``HTTPS_PROXY``) are honoured automatically.
    """
    import aiohttp

    if not proxy_url:
        return aiohttp.ClientSession(trust_env=True)

    if proxy_url.split("://")[0].lower().startswith("socks"):
        try:
            from aiohttp_socks import ProxyConnector

            return aiohttp.ClientSession(
                connector=ProxyConnector.from_url(proxy_url, rdns=True),
            )
        except ImportError:
            logger.warning(
                "aiohttp_socks not installed — SOCKS proxy %s ignored. "
                "Run: pip install aiohttp-socks",
                proxy_url,
            )
            return aiohttp.ClientSession(trust_env=True)

    return aiohttp.ClientSession(proxy=proxy_url)


def _check_e2ee_deps() -> bool:
    """Return True if mautrix E2EE dependencies are available.

    Verifies python-olm (via mautrix.crypto.OlmMachine), the SQLite crypto
    store backend (mautrix.crypto.store.asyncpg.PgCryptoStore — yes, the
    PgCryptoStore class also drives the sqlite backend in mautrix 0.21),
    and the database drivers actually used at connect time (``asyncpg`` for
    the underlying upgrade_table machinery, ``aiosqlite`` for the
    ``sqlite:///`` URL we pass to ``Database.create``).  Without all four,
    encrypted rooms fail at connect time with a confusing
    ``No module named 'asyncpg'`` (#31116).
    """
    try:
        from mautrix.crypto import OlmMachine  # noqa: F401
        from mautrix.crypto.store.asyncpg import PgCryptoStore  # noqa: F401
        import asyncpg  # noqa: F401
        import aiosqlite  # noqa: F401

        return True
    except (ImportError, AttributeError):
        return False


def _normalize_e2ee_mode(value: Any) -> str:
    """Normalize Matrix E2EE mode to off/optional/required."""
    raw = str(value or "").strip().lower()
    if raw in ("required", "require", "true", "1", "yes", "on"):
        return "required"
    if raw in ("optional", "prefer", "preferred"):
        return "optional"
    return "off"


def _resolve_e2ee_mode(extra: Optional[Dict[str, Any]] = None) -> str:
    """Resolve E2EE mode with MATRIX_ENCRYPTION backwards compatibility."""
    extra = extra or {}
    explicit = extra.get("e2ee_mode") or os.getenv("MATRIX_E2EE_MODE", "")
    if explicit:
        return _normalize_e2ee_mode(explicit)
    legacy_enabled = extra.get(
        "encryption",
        os.getenv("MATRIX_ENCRYPTION", "").lower() in ("true", "1", "yes"),
    )
    return "required" if legacy_enabled else "off"


def _redact_matrix_value(value: Any) -> str:
    """Return a safe, non-reversible preview for Matrix diagnostics."""
    text = str(value or "").strip()
    if not text:
        return ""
    return "***"


def _write_matrix_recovery_key_output_file(recovery_key: str) -> Optional[Path]:
    """Write a generated Matrix recovery key to an operator-chosen file.

    The file is created with mode 0600 and never overwritten. Returns the path
    when written, otherwise None.
    """
    output_file = os.getenv("MATRIX_RECOVERY_KEY_OUTPUT_FILE", "").strip()
    if not output_file:
        return None
    path = Path(output_file).expanduser()
    path.parent.mkdir(parents=True, exist_ok=True)
    flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
    fd = os.open(path, flags, 0o600)
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            fh.write(recovery_key)
            fh.write("\n")
    except Exception:
        try:
            os.close(fd)
        except OSError:
            pass
        raise
    return path


def _get_matrix_recovery_key_output_target() -> tuple[Optional[Path], str]:
    """Return a usable one-time recovery-key output path, or a redacted reason."""
    output_file = os.getenv("MATRIX_RECOVERY_KEY_OUTPUT_FILE", "").strip()
    if not output_file:
        return None, "not_configured"
    path = Path(output_file).expanduser()
    if path.exists():
        return None, "exists"
    try:
        path.parent.mkdir(parents=True, exist_ok=True)
    except Exception as exc:
        return None, f"unusable: {exc}"
    return path, ""


def _handle_generated_matrix_recovery_key(mxid: str, recovery_key: str) -> None:
    """Handle a freshly generated Matrix recovery key without logging it."""
    try:
        output_path = _write_matrix_recovery_key_output_file(recovery_key)
    except FileExistsError:
        logger.warning(
            "Matrix: bootstrapped cross-signing for %s. Recovery key output file "
            "already exists; refusing to overwrite. Store the generated key "
            "securely and set MATRIX_RECOVERY_KEY for future restarts.",
            mxid,
        )
        return
    except Exception as exc:
        logger.warning(
            "Matrix: bootstrapped cross-signing for %s, but failed to write "
            "MATRIX_RECOVERY_KEY_OUTPUT_FILE: %s. Store the generated key "
            "securely and set MATRIX_RECOVERY_KEY for future restarts.",
            mxid,
            exc,
        )
        return

    if output_path:
        logger.warning(
            "Matrix: bootstrapped cross-signing for %s. A new recovery key was "
            "written to %s with mode 0600. Move it to your secret store and set "
            "MATRIX_RECOVERY_KEY for future restarts.",
            mxid,
            output_path,
        )
    else:
        logger.warning(
            "Matrix: bootstrapped cross-signing for %s. A new recovery key was "
            "generated but will not be logged. Set MATRIX_RECOVERY_KEY_OUTPUT_FILE "
            "to write it once with mode 0600, or configure MATRIX_RECOVERY_KEY "
            "from your Matrix client before future restarts.",
            mxid,
        )


def _sanitize_matrix_html(html: str) -> str:
    sanitizer = _MatrixHtmlSanitizer()
    try:
        sanitizer.feed(html or "")
        sanitizer.close()
        return sanitizer.get_html()
    except Exception:
        return _html_escape(html or "")


def _redact_url_for_log(url: str) -> str:
    """Strip query/fragment from URLs before logging signed media links."""
    try:
        parts = urlsplit(str(url))
        if not parts.scheme and not parts.netloc:
            return str(url).split("?", 1)[0].split("#", 1)[0]
        return urlunsplit((parts.scheme, parts.netloc, parts.path, "", ""))
    except Exception:
        return "<url>"


def _pre_sanitize_matrix_markdown(text: str) -> str:
    """Remove unsafe raw HTML before Markdown conversion can escape it."""
    result = re.sub(
        r"(?is)<\s*(script|style)\b[^>]*>.*?<\s*/\s*\1\s*>",
        "",
        text or "",
    )
    result = re.sub(
        r"""(?is)\s+on[a-z0-9_-]+\s*=\s*("[^"]*"|'[^']*'|[^\s>]+)""",
        "",
        result,
    )
    result = re.sub(
        r"""(?is)\s+(href|src)\s*=\s*("[^"]*(?:javascript|data|vbscript):[^"]*"|'[^']*(?:javascript|data|vbscript):[^']*'|[^\s>]*(?:javascript|data|vbscript):[^\s>]*)""",
        "",
        result,
    )
    return result


def check_matrix_requirements() -> bool:
    """Return True if the Matrix adapter can be used.

    Lazy-installs the full ``platform.matrix`` feature group via
    ``tools.lazy_deps.ensure_and_bind`` whenever any of the declared
    packages (mautrix, Markdown, aiosqlite, asyncpg, aiohttp-socks) is
    missing — not just mautrix itself.  Previously this short-circuited on
    ``import mautrix``, which left the other four packages uninstalled
    forever and broke E2EE connect with ``No module named 'asyncpg'``
    (#31116).  Rebinds module-level type globals on success.
    """
    token = os.getenv("MATRIX_ACCESS_TOKEN", "")
    password = os.getenv("MATRIX_PASSWORD", "")
    homeserver = os.getenv("MATRIX_HOMESERVER", "")

    if not token and not password:
        logger.debug("Matrix: neither MATRIX_ACCESS_TOKEN nor MATRIX_PASSWORD set")
        return False
    if not homeserver:
        logger.warning("Matrix: MATRIX_HOMESERVER not set")
        return False

    # Check whether any package in the platform.matrix feature group is
    # missing.  ``feature_missing`` is cheap (per-spec importlib.metadata
    # lookups) and correctly handles ``mautrix[encryption]`` by stripping
    # the extras marker before checking the bare package.
    try:
        from tools.lazy_deps import feature_missing, ensure_and_bind
        missing = feature_missing("platform.matrix")
    except Exception as exc:  # pragma: no cover — defensive
        logger.debug("Matrix: lazy_deps lookup failed: %s", exc)
        missing = ()
        ensure_and_bind = None  # type: ignore[assignment]

    if missing or ensure_and_bind is None:
        def _import():
            from mautrix.types import (
                ContentURI, EventID, EventType, PaginationDirection,
                PresenceState, RoomCreatePreset, RoomID, SyncToken,
                TrustState, UserID,
            )
            return {
                "ContentURI": ContentURI,
                "EventID": EventID,
                "EventType": EventType,
                "PaginationDirection": PaginationDirection,
                "PresenceState": PresenceState,
                "RoomCreatePreset": RoomCreatePreset,
                "RoomID": RoomID,
                "SyncToken": SyncToken,
                "TrustState": TrustState,
                "UserID": UserID,
            }

        if ensure_and_bind is None:
            return False
        if not ensure_and_bind("platform.matrix", _import, globals(), prompt=False):
            logger.warning(
                "Matrix: required packages not installed (%s). "
                "Run: pip install 'mautrix[encryption]' asyncpg aiosqlite "
                "Markdown aiohttp-socks",
                ", ".join(missing) if missing else "platform.matrix",
            )
            return False

    e2ee_mode = _resolve_e2ee_mode()
    if e2ee_mode == "required" and not _check_e2ee_deps():
        logger.error(
            "Matrix: E2EE is required but dependencies are missing. %s. "
            "Without this, encrypted rooms will not work. "
            "Set MATRIX_E2EE_MODE=off to disable E2EE.",
            _E2EE_INSTALL_HINT,
        )
        return False
    if e2ee_mode == "optional" and not _check_e2ee_deps():
        logger.warning(
            "Matrix: E2EE optional but dependencies are missing. %s",
            _E2EE_INSTALL_HINT,
        )

    return True


class _CryptoStateStore:
    """Adapter that satisfies the mautrix crypto StateStore interface.

    OlmMachine requires a StateStore with ``is_encrypted``,
    ``get_encryption_info``, and ``find_shared_rooms``.  The basic
    ``MemoryStateStore`` from ``mautrix.client`` doesn't implement these,
    so we provide simple implementations that consult the client's room
    state.
    """

    def __init__(self, client_state_store: Any, joined_rooms: set):
        self._ss = client_state_store
        self._joined_rooms = joined_rooms

    async def is_encrypted(self, room_id: str) -> bool:
        return (await self.get_encryption_info(room_id)) is not None

    async def get_encryption_info(self, room_id: str):
        if hasattr(self._ss, "get_encryption_info"):
            return await self._ss.get_encryption_info(room_id)
        return None

    async def find_shared_rooms(self, user_id: str) -> list:
        # Return all joined rooms — simple but correct for a single-user bot.
        return list(self._joined_rooms)


class MatrixAdapter(BasePlatformAdapter):
    """Gateway adapter for Matrix (any homeserver)."""

    supports_code_blocks = True  # Matrix renders fenced code blocks (HTML/markdown)

    # Matrix clients commonly reserve typed "/" for client-local commands;
    # the adapter accepts "!command" as the alias that always reaches Hermes
    # (see _normalize_matrix_bang_command), so instruction text shows "!".
    typed_command_prefix = "!"

    # Threshold for detecting Matrix client-side message splits.
    # When a chunk is near the ~4000-char practical limit, a continuation
    # is almost certain.
    _SPLIT_THRESHOLD = 3900

    def __init__(self, config: PlatformConfig):
        super().__init__(config, Platform.MATRIX)

        self._homeserver: str = (
            config.extra.get("homeserver", "") or os.getenv("MATRIX_HOMESERVER", "")
        ).rstrip("/")
        self._access_token: str = config.token or os.getenv("MATRIX_ACCESS_TOKEN", "")
        self._user_id: str = config.extra.get("user_id", "") or os.getenv(
            "MATRIX_USER_ID", ""
        )
        self._password: str = config.extra.get("password", "") or os.getenv(
            "MATRIX_PASSWORD", ""
        )
        self._e2ee_mode: str = _resolve_e2ee_mode(config.extra)
        self._encryption: bool = self._e2ee_mode != "off"
        self._device_id: str = config.extra.get("device_id", "") or os.getenv(
            "MATRIX_DEVICE_ID", ""
        )

        self._client: Any = None  # mautrix.client.Client
        self._crypto_db: Any = None  # mautrix.util.async_db.Database
        self._sync_task: Optional[asyncio.Task] = None
        self._closing = False
        self._startup_ts: float = 0.0
        # Clock-skew detection: count grace-check drops that happen well
        # after startup (i.e. not initial-sync backfill).  If the host's
        # system clock is set ahead of real time, the startup grace check
        # `event_ts < startup_ts - 5` silently drops every live message.
        # See #12614 — the symptom is "bot joins rooms but never replies".
        # Drops only count when their skew matches the first sampled drop
        # (within 60s), so varied-age backfill from freshly-invited rooms
        # doesn't trip the heuristic.
        self._late_grace_drops: int = 0
        self._late_grace_skew: float = 0.0
        self._clock_skew_warned: bool = False
        self._last_sync_ts: float = 0.0

        # Cache: room_id → bool (is DM)
        self._dm_rooms: Dict[str, bool] = {}
        self._room_identities: Dict[str, MatrixRoomIdentity] = {}
        self._room_identity_cached_at: Dict[str, float] = {}
        try:
            self._room_identity_ttl_seconds = float(
                os.getenv("MATRIX_ROOM_IDENTITY_TTL_SECONDS", "60")
            )
        except ValueError:
            self._room_identity_ttl_seconds = 60.0
        self._room_identity_cache_max = 256
        # Set of room IDs we've joined
        self._joined_rooms: Set[str] = set()
        # Event deduplication (bounded deque keeps newest entries)
        from collections import deque

        self._processed_events: deque = deque(maxlen=1000)
        self._processed_events_set: set = set()

        # Buffer for undecrypted events pending key receipt.
        # Each entry: (room_id, event, timestamp)

        # Thread participation tracking (for require_mention bypass)
        self._threads = ThreadParticipationTracker("matrix")

        # Mention/thread gating — parsed once from config.extra or env vars.
        self._require_mention: bool = self._parse_require_mention(config)
        self._thread_require_mention: bool = self._parse_thread_require_mention(config)
        free_rooms_raw = config.extra.get("free_response_rooms")
        if free_rooms_raw is None:
            free_rooms_raw = os.getenv("MATRIX_FREE_RESPONSE_ROOMS", "")
        if isinstance(free_rooms_raw, list):
            self._free_rooms: Set[str] = {
                str(r).strip() for r in free_rooms_raw if str(r).strip()
            }
        else:
            self._free_rooms: Set[str] = {
                r.strip() for r in str(free_rooms_raw).split(",") if r.strip()
            }
        # If non-empty, bot ONLY responds in these rooms (whitelist); DMs exempt.
        allowed_rooms_raw = config.extra.get("allowed_rooms")
        if allowed_rooms_raw is None:
            allowed_rooms_raw = os.getenv("MATRIX_ALLOWED_ROOMS", "")
        if isinstance(allowed_rooms_raw, list):
            self._allowed_rooms: Set[str] = {
                str(r).strip() for r in allowed_rooms_raw if str(r).strip()
            }
        else:
            self._allowed_rooms: Set[str] = {
                r.strip() for r in str(allowed_rooms_raw).split(",") if r.strip()
            }
        self._allow_room_mentions: bool = os.getenv(
            "MATRIX_ALLOW_ROOM_MENTIONS", "false"
        ).lower() in ("true", "1", "yes")
        self._auto_thread: bool = os.getenv("MATRIX_AUTO_THREAD", "true").lower() in (
            "true",
            "1",
            "yes",
        )
        self._dm_auto_thread: bool = os.getenv(
            "MATRIX_DM_AUTO_THREAD", "false"
        ).lower() in {"true", "1", "yes"}
        self._dm_mention_threads: bool = os.getenv(
            "MATRIX_DM_MENTION_THREADS", "false"
        ).lower() in ("true", "1", "yes")
        raw_session_scope = os.getenv("MATRIX_SESSION_SCOPE", "auto").strip().lower()
        self._matrix_session_scope = (
            raw_session_scope if raw_session_scope in {"auto", "room", "thread"} else "auto"
        )
        self._process_notices: bool = os.getenv(
            "MATRIX_PROCESS_NOTICES", "false"
        ).lower() in ("true", "1", "yes")

        # Reactions: configurable via MATRIX_REACTIONS (default: true).
        self._reactions_enabled: bool = os.getenv(
            "MATRIX_REACTIONS", "true"
        ).lower() not in {"false", "0", "no"}
        self._pending_reactions: dict[tuple[str, str], str] = {}
        # Delay before redacting reactions so Matrix homeservers have time to
        # deliver the final message event without tripping "missing event"
        # errors in some clients.  5s is empirically safe; not user-tunable —
        # if that changes, add a config.yaml entry rather than an env var.
        self._reaction_redaction_delay_seconds = 5.0
        self._reaction_redaction_tasks: Set[asyncio.Task] = set()

        # Proxy support — resolve once at init, reuse for all HTTP traffic.
        self._proxy_url: str | None = resolve_proxy_url(platform_env_var="MATRIX_PROXY")
        if self._proxy_url:
            logger.info("Matrix: proxy configured — %s", self._proxy_url)
        try:
            self._max_media_bytes = int(os.getenv("MATRIX_MAX_MEDIA_BYTES", str(100 * 1024 * 1024)))
        except ValueError:
            self._max_media_bytes = 100 * 1024 * 1024

        # Text batching: merge rapid successive messages (Telegram-style).
        # Matrix clients split long messages around 4000 chars.
        self._text_batch_delay_seconds = float(
            os.getenv("HERMES_MATRIX_TEXT_BATCH_DELAY_SECONDS", "0.6")
        )
        self._text_batch_split_delay_seconds = float(
            os.getenv("HERMES_MATRIX_TEXT_BATCH_SPLIT_DELAY_SECONDS", "2.0")
        )
        self._pending_text_batches: Dict[str, MessageEvent] = {}
        self._pending_text_batch_tasks: Dict[str, asyncio.Task] = {}

        # Matrix reaction-based dangerous command approvals.
        self._approval_reaction_map = {
            "✅": "once",
            "♾️": "always",
            "♾": "always",
            "\u267e\ufe0f": "always",
            "\u267e": "always",
            "❌": "deny",
            "❎": "deny",
        }
        self._approval_prompts_by_event: Dict[str, _MatrixApprovalPrompt] = {}
        self._approval_prompt_by_session: Dict[str, str] = {}
        self._approval_require_sender: bool = os.getenv(
            "MATRIX_APPROVAL_REQUIRE_SENDER", "true"
        ).lower() in ("true", "1", "yes")
        try:
            self._approval_timeout_seconds = int(
                os.getenv("MATRIX_APPROVAL_TIMEOUT_SECONDS", "300")
            )
        except ValueError:
            self._approval_timeout_seconds = 300
        self._model_picker_prompts_by_event: Dict[str, _MatrixModelPickerPrompt] = {}
        allowed_users_raw = os.getenv("MATRIX_ALLOWED_USERS", "")
        self._allowed_user_ids: Set[str] = {
            u.strip() for u in allowed_users_raw.split(",") if u.strip()
        }
        self._allowed_room_ids: Set[str] = set(self._allowed_rooms)
        ignore_patterns_raw = os.getenv("MATRIX_IGNORE_USER_PATTERNS", "")
        self._ignored_user_patterns: list[re.Pattern[str]] = []
        for pattern in (p.strip() for p in ignore_patterns_raw.split(",") if p.strip()):
            try:
                self._ignored_user_patterns.append(re.compile(pattern))
            except re.error as exc:
                logger.warning(
                    "Matrix: ignoring invalid MATRIX_IGNORE_USER_PATTERNS entry %r: %s",
                    pattern,
                    exc,
                )

    def _is_duplicate_event(self, event_id) -> bool:
        """Return True if this event was already processed. Tracks the ID otherwise."""
        if not event_id:
            return False
        if event_id in self._processed_events_set:
            return True
        if len(self._processed_events) == self._processed_events.maxlen:
            evicted = self._processed_events[0]
            self._processed_events_set.discard(evicted)
        self._processed_events.append(event_id)
        self._processed_events_set.add(event_id)
        return False

    @staticmethod
    def _parse_require_mention(config) -> bool:
        """Parse require_mention from config.extra or env var.

        Handles both YAML booleans and string values (``\"true\"``, ``\"false\"``,
        ``\"yes\"``, ``\"no\"``, ``\"on\"``, ``\"off\"``, ``\"1\"``, ``\"0\"``).
        Falls back to ``MATRIX_REQUIRE_MENTION`` env var, default ``true``.
        """
        configured = config.extra.get("require_mention")
        if configured is not None:
            if isinstance(configured, bool):
                return configured
            if isinstance(configured, str):
                return configured.lower() not in {"false", "0", "no", "off"}
            return bool(configured)
        return os.getenv(
            "MATRIX_REQUIRE_MENTION", "true"
        ).lower() not in {"false", "0", "no", "off"}

    @staticmethod
    def _parse_thread_require_mention(config) -> bool:
        """Parse thread_require_mention from config.extra or env var.

        Handles both YAML booleans and string values (``\"true\"``, ``\"false\"``,
        ``\"yes\"``, ``\"no\"``, ``\"on\"``, ``\"off\"``, ``\"1\"``, ``\"0\"``).
        Falls back to ``MATRIX_THREAD_REQUIRE_MENTION`` env var, default ``false``.
        Mirrors Discord adapter's parsing pattern.
        """
        configured = config.extra.get("thread_require_mention")
        if configured is not None:
            if isinstance(configured, bool):
                return configured
            if isinstance(configured, str):
                return configured.lower() not in {"false", "0", "no", "off"}
            # int, float, etc. — truthiness fallback
            return bool(configured)
        return os.getenv(
            "MATRIX_THREAD_REQUIRE_MENTION", "false"
        ).lower() in {"true", "1", "yes", "on"}

    # ------------------------------------------------------------------
    # E2EE helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _extract_server_ed25519(device_keys_obj: Any) -> Optional[str]:
        """Extract the ed25519 identity key from a DeviceKeys object."""
        for kid, kval in (getattr(device_keys_obj, "keys", {}) or {}).items():
            if str(kid).startswith("ed25519:"):
                return str(kval)
        return None

    async def _reverify_keys_after_upload(
        self, client: Any, local_ed25519: str
    ) -> bool:
        """Re-query the server after share_keys() and verify our ed25519 key matches."""
        try:
            resp = await client.query_keys({client.mxid: [client.device_id]})
            dk = getattr(resp, "device_keys", {}) or {}
            ud = dk.get(str(client.mxid)) or {}
            dev = ud.get(str(client.device_id))
            if dev:
                server_ed = self._extract_server_ed25519(dev)
                if server_ed != local_ed25519:
                    logger.error(
                        "Matrix: device %s has immutable identity keys that "
                        "don't match this installation. Generate a new access "
                        "token with a fresh device.",
                        client.device_id,
                    )
                    return False
        except Exception as exc:
            logger.error("Matrix: post-upload key verification failed: %s", exc, exc_info=True)
            return False
        return True

    async def _verify_device_keys_on_server(self, client: Any, olm: Any) -> bool:
        """Verify our device keys are on the homeserver after loading crypto state.

        Returns True if keys are valid or were successfully re-uploaded.
        Returns False if verification fails (caller should refuse E2EE).
        """
        try:
            resp = await client.query_keys({client.mxid: [client.device_id]})
        except Exception as exc:
            logger.error(
                "Matrix: cannot verify device keys on server: %s — refusing E2EE",
                exc,
                exc_info=True,
            )
            return False

        device_keys_map = getattr(resp, "device_keys", {}) or {}
        our_user_devices = device_keys_map.get(str(client.mxid)) or {}
        our_keys = our_user_devices.get(str(client.device_id))
        local_ed25519 = olm.account.identity_keys.get("ed25519")

        if not our_keys:
            logger.warning("Matrix: device keys missing from server — re-uploading")
            olm.account.shared = False
            try:
                await olm.share_keys()
            except Exception as exc:
                logger.error("Matrix: failed to re-upload device keys: %s", exc, exc_info=True)
                return False
            return await self._reverify_keys_after_upload(client, local_ed25519)

        server_ed25519 = self._extract_server_ed25519(our_keys)

        if server_ed25519 != local_ed25519:
            if olm.account.shared:
                logger.error(
                    "Matrix: server has different identity keys for device %s — "
                    "local crypto state is stale. Delete %s and restart.",
                    client.device_id,
                    _CRYPTO_DB_PATH,
                )
                return False

            logger.warning(
                "Matrix: server has stale keys for device %s — attempting re-upload",
                client.device_id,
            )
            try:
                await client.api.request(
                    client.api.Method.DELETE
                    if hasattr(client.api, "Method")
                    else "DELETE",
                    f"/_matrix/client/v3/devices/{client.device_id}",
                )
                logger.info(
                    "Matrix: deleted stale device %s from server", client.device_id
                )
            except Exception:
                pass
            try:
                await olm.share_keys()
            except Exception as exc:
                logger.error(
                    "Matrix: cannot upload device keys for %s: %s. "
                    "Try generating a new access token to get a fresh device.",
                    client.device_id,
                    exc,
                    exc_info=True,
                )
                return False
            return await self._reverify_keys_after_upload(client, local_ed25519)

        return True

    # ------------------------------------------------------------------
    # Required overrides
    # ------------------------------------------------------------------

    async def connect(self) -> bool:
        """Connect to the Matrix homeserver and start syncing."""
        from mautrix.api import HTTPAPI
        from mautrix.client import Client
        from mautrix.client.state_store import MemoryStateStore, MemorySyncStore

        if not self._homeserver:
            logger.error("Matrix: homeserver URL not configured")
            return False

        # Ensure store dir exists for E2EE key persistence.
        _STORE_DIR.mkdir(parents=True, exist_ok=True)

        # Create the HTTP API layer.
        client_session = _create_matrix_session(self._proxy_url)
        api = HTTPAPI(
            base_url=self._homeserver,
            token=self._access_token or "",
            client_session=client_session,
        )

        # Create the client.
        state_store = MemoryStateStore()
        sync_store = MemorySyncStore()
        client = Client(
            mxid=UserID(self._user_id) if self._user_id else UserID(""),
            device_id=self._device_id or None,
            api=api,
            state_store=state_store,
            sync_store=sync_store,
        )

        self._client = client

        # Authenticate.
        if self._access_token:
            api.token = self._access_token

            # Validate the token and learn user_id / device_id.
            try:
                resp = await client.whoami()
                resolved_user_id = getattr(resp, "user_id", "") or self._user_id
                resolved_device_id = getattr(resp, "device_id", "")
                if resolved_user_id:
                    self._user_id = str(resolved_user_id)
                    client.mxid = UserID(self._user_id)

                # Prefer user-configured device_id for stable E2EE identity.
                effective_device_id = self._device_id or resolved_device_id
                if effective_device_id:
                    client.device_id = effective_device_id

                logger.info(
                    "Matrix: using access token for %s%s",
                    self._user_id or "(unknown user)",
                    f" (device {effective_device_id})" if effective_device_id else "",
                )
            except Exception as exc:
                logger.error(
                    "Matrix: whoami failed — check MATRIX_ACCESS_TOKEN and MATRIX_HOMESERVER: %s",
                    exc,
                    exc_info=True,
                )
                await api.session.close()
                return False
        elif self._password and self._user_id:
            try:
                resp = await client.login(
                    identifier=self._user_id,
                    password=self._password,
                    device_name="Hermes Agent",
                    device_id=self._device_id or None,
                )
                if resp and hasattr(resp, "device_id"):
                    client.device_id = resp.device_id
                logger.info("Matrix: logged in as %s", self._user_id)
            except Exception as exc:
                logger.error("Matrix: login failed — %s", exc)
                await api.session.close()
                return False
        else:
            logger.error(
                "Matrix: need MATRIX_ACCESS_TOKEN or MATRIX_USER_ID + MATRIX_PASSWORD"
            )
            await api.session.close()
            return False

        # Set up E2EE if requested.
        if self._encryption:
            if not _check_e2ee_deps():
                if self._e2ee_mode == "optional":
                    logger.warning(
                        "Matrix: E2EE optional but dependencies are missing. "
                        "Continuing without encrypted-room support. %s",
                        _E2EE_INSTALL_HINT,
                    )
                    self._encryption = False
                else:
                    logger.error(
                        "Matrix: E2EE is required but dependencies are missing. %s. "
                        "Refusing to connect — encrypted rooms would silently fail.",
                        _E2EE_INSTALL_HINT,
                    )
                    await api.session.close()
                    return False
            if not self._encryption:
                pass
            else:
                try:
                    from mautrix.crypto import OlmMachine
                    from mautrix.crypto.store.asyncpg import PgCryptoStore
                    from mautrix.util.async_db import Database

                    _STORE_DIR.mkdir(parents=True, exist_ok=True)
                except Exception as exc:
                    if self._e2ee_mode == "optional":
                        logger.warning(
                            "Matrix: failed to import optional E2EE client; "
                            "continuing without encrypted-room support: %s. %s",
                            exc,
                            _E2EE_INSTALL_HINT,
                        )
                        self._encryption = False
                    else:
                        logger.error(
                            "Matrix: failed to import E2EE client: %s. %s",
                            exc,
                            _E2EE_INSTALL_HINT,
                        )
                        await api.session.close()
                        return False
            if self._encryption:
                try:
                    # Remove legacy pickle file from pre-SQLite era.
                    legacy_pickle = _STORE_DIR / "crypto_store.pickle"
                    if legacy_pickle.exists():
                        logger.info(
                            "Matrix: removing legacy crypto_store.pickle (migrated to SQLite)"
                        )
                        legacy_pickle.unlink()

                    crypto_db = Database.create(
                        f"sqlite:///{_CRYPTO_DB_PATH}",
                        upgrade_table=PgCryptoStore.upgrade_table,
                    )
                    await crypto_db.start()
                    self._crypto_db = crypto_db

                    _acct_id = self._user_id or "hermes"
                    _pickle_key = f"{_acct_id}:{self._device_id or 'default'}"
                    crypto_store = PgCryptoStore(
                        account_id=_acct_id,
                        pickle_key=_pickle_key,
                        db=crypto_db,
                    )
                    await crypto_store.open()

                    if client.device_id:
                        await crypto_store.put_device_id(client.device_id)

                    crypto_state = _CryptoStateStore(state_store, self._joined_rooms)
                    olm = OlmMachine(client, crypto_store, crypto_state)
                    olm.share_keys_min_trust = TrustState.UNVERIFIED
                    olm.send_keys_min_trust = TrustState.UNVERIFIED

                    await olm.load()

                    if not await self._verify_device_keys_on_server(client, olm):
                        await crypto_db.stop()
                        await api.session.close()
                        return False

                    try:
                        await olm.share_keys()
                    except Exception as exc:
                        exc_str = str(exc)
                        if "already exists" in exc_str:
                            logger.error(
                                "Matrix: device %s has stale one-time keys on the "
                                "server signed with a previous identity key. "
                                "Delete the device from the homeserver and restart, "
                                "or generate a new access token to get a fresh device ID.",
                                client.device_id,
                            )
                            await crypto_db.stop()
                            await api.session.close()
                            return False
                        logger.warning("Matrix: share_keys() warning during startup: %s", exc)

                    recovery_key = os.getenv("MATRIX_RECOVERY_KEY", "").strip()
                    if recovery_key:
                        try:
                            await olm.verify_with_recovery_key(recovery_key)
                            logger.info("Matrix: cross-signing verified via recovery key")
                        except Exception as exc:
                            logger.warning("Matrix: recovery key verification failed: %s", exc)
                    else:
                        try:
                            own_xsign = await olm.get_own_cross_signing_public_keys()
                        except Exception as exc:
                            own_xsign = None
                            logger.warning("Matrix: cross-signing key lookup failed: %s", exc)
                        if own_xsign is None:
                            _, output_error = _get_matrix_recovery_key_output_target()
                            if output_error == "not_configured":
                                logger.warning(
                                    "Matrix: cross-signing keys are missing, but "
                                    "automatic bootstrap is skipped because "
                                    "MATRIX_RECOVERY_KEY_OUTPUT_FILE is not configured. "
                                    "Configure MATRIX_RECOVERY_KEY from your Matrix client "
                                    "or set MATRIX_RECOVERY_KEY_OUTPUT_FILE to write a new "
                                    "recovery key once with mode 0600."
                                )
                            elif output_error == "exists":
                                logger.warning(
                                    "Matrix: cross-signing keys are missing, but "
                                    "automatic bootstrap is skipped because "
                                    "MATRIX_RECOVERY_KEY_OUTPUT_FILE already exists and "
                                    "will not be overwritten."
                                )
                            elif output_error:
                                logger.warning(
                                    "Matrix: cross-signing keys are missing, but "
                                    "automatic bootstrap is skipped because "
                                    "MATRIX_RECOVERY_KEY_OUTPUT_FILE is not usable: %s",
                                    output_error,
                                )
                            else:
                                try:
                                    new_recovery_key = await olm.generate_recovery_key()
                                    _handle_generated_matrix_recovery_key(
                                        str(client.mxid),
                                        new_recovery_key,
                                    )
                                except Exception as exc:
                                    logger.warning(
                                        "Matrix: cross-signing bootstrap failed "
                                        "(non-fatal — Element will show 'not verified by its owner'): %s",
                                        exc,
                                    )

                    client.crypto = olm
                    logger.info(
                        "Matrix: E2EE enabled (store: %s%s)",
                        str(_CRYPTO_DB_PATH),
                        f", device_id={client.device_id}" if client.device_id else "",
                    )
                except Exception as exc:
                    if self._e2ee_mode == "optional":
                        logger.warning(
                            "Matrix: failed to create optional E2EE client; "
                            "continuing without encrypted-room support: %s. %s",
                            exc,
                            _E2EE_INSTALL_HINT,
                        )
                        self._encryption = False
                    else:
                        logger.error(
                            "Matrix: failed to create E2EE client: %s. %s",
                            exc,
                            _E2EE_INSTALL_HINT,
                        )
                        await api.session.close()
                        return False

        # Register event handlers.
        from mautrix.client import InternalEventType as IntEvt
        from mautrix.client.dispatcher import MembershipEventDispatcher

        # Without this the INVITE handler below never fires.
        client.add_dispatcher(MembershipEventDispatcher)

        client.add_event_handler(EventType.ROOM_MESSAGE, self._on_room_message)
        client.add_event_handler(EventType.REACTION, self._on_reaction)
        client.add_event_handler(IntEvt.INVITE, self._on_invite)

        # Initial sync to catch up, then start background sync.
        self._startup_ts = time.time()
        # Reset clock-skew detector for each connect cycle so a reconnect
        # after the user fixes NTP doesn't inherit stale counters.
        self._late_grace_drops = 0
        self._late_grace_skew = 0.0
        self._clock_skew_warned = False
        self._closing = False

        try:
            sync_data = await client.sync(timeout=10000, full_state=True)
            if isinstance(sync_data, dict):
                self._last_sync_ts = time.time()
                rooms_join = sync_data.get("rooms", {}).get("join", {})
                self._joined_rooms.clear()
                self._joined_rooms.update(rooms_join.keys())
                self._room_identities.clear()
                self._room_identity_cached_at.clear()
                # Store the next_batch token so incremental syncs start
                # from where the initial sync left off.
                nb = sync_data.get("next_batch")
                if nb:
                    await client.sync_store.put_next_batch(nb)
                logger.info(
                    "Matrix: initial sync complete, joined %d rooms",
                    len(self._joined_rooms),
                )
                # Build DM room cache from m.direct account data.
                await self._refresh_dm_cache()

                # Dispatch events from the initial sync so the OlmMachine
                # receives to-device key shares queued while we were offline.
                try:
                    await self._dispatch_sync(sync_data)
                except Exception as exc:
                    logger.warning("Matrix: initial sync event dispatch error: %s", exc)
                await self._join_pending_invites(sync_data)
            else:
                logger.warning(
                    "Matrix: initial sync returned unexpected type %s",
                    type(sync_data).__name__,
                )
        except Exception as exc:
            logger.warning("Matrix: initial sync error: %s", exc)

        # Share keys after initial sync if E2EE is enabled.
        if self._encryption and getattr(client, "crypto", None):
            try:
                await client.crypto.share_keys()
            except Exception as exc:
                logger.warning("Matrix: initial key share failed: %s", exc)

        # Start the sync loop.
        self._sync_task = asyncio.create_task(self._sync_loop())
        self._mark_connected()
        return True

    async def disconnect(self) -> None:
        """Disconnect from Matrix."""
        self._closing = True

        if self._sync_task and not self._sync_task.done():
            self._sync_task.cancel()
            try:
                await self._sync_task
            except (asyncio.CancelledError, Exception):
                pass

        redaction_tasks = list(self._reaction_redaction_tasks)
        for task in redaction_tasks:
            if not task.done():
                task.cancel()
        if redaction_tasks:
            await asyncio.gather(*redaction_tasks, return_exceptions=True)
        self._reaction_redaction_tasks.clear()

        # Close the SQLite crypto store database.
        if hasattr(self, "_crypto_db") and self._crypto_db:
            try:
                await self._crypto_db.stop()
            except Exception as exc:
                logger.debug("Matrix: could not close crypto DB on disconnect: %s", exc)

        if self._client:
            try:
                await self._client.api.session.close()
            except Exception:
                pass
            self._client = None

        logger.info("Matrix: disconnected")

    async def send(
        self,
        chat_id: str,
        content: str,
        reply_to: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> SendResult:
        """Send a message to a Matrix room."""

        if not content:
            return SendResult(success=True)

        formatted = self.format_message(content)
        chunks = self.truncate_message(formatted, MAX_MESSAGE_LENGTH)

        last_event_id = None
        for i, chunk in enumerate(chunks):
            msg_content = self._build_text_message_content(chunk)

            self._apply_relation_metadata(msg_content, reply_to=reply_to, metadata=metadata)

            try:
                event_id = await asyncio.wait_for(
                    self._client.send_message_event(
                        RoomID(chat_id),
                        EventType.ROOM_MESSAGE,
                        msg_content,
                    ),
                    timeout=45,
                )
                last_event_id = str(event_id)
                logger.info("Matrix: sent event %s to %s", last_event_id, chat_id)
            except Exception as exc:
                # On E2EE errors, retry after sharing keys.
                if self._encryption and getattr(self._client, "crypto", None):
                    try:
                        await self._client.crypto.share_keys()
                        event_id = await asyncio.wait_for(
                            self._client.send_message_event(
                                RoomID(chat_id),
                                EventType.ROOM_MESSAGE,
                                msg_content,
                            ),
                            timeout=45,
                        )
                        last_event_id = str(event_id)
                        logger.info(
                            "Matrix: sent event %s to %s (after key share)",
                            last_event_id,
                            chat_id,
                        )
                        continue
                    except Exception as retry_exc:
                        logger.error(
                            "Matrix: failed to send to %s after retry: %s",
                            chat_id,
                            retry_exc,
                        )
                        return SendResult(success=False, error=str(retry_exc))
                logger.error("Matrix: failed to send to %s: %s", chat_id, exc)
                return SendResult(success=False, error=str(exc))

        return SendResult(success=True, message_id=last_event_id)

    async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
        """Return room name and type (dm/group)."""
        identity = await self._resolve_room_identity(chat_id)
        chat_type = "dm" if identity.chat_type == "dm" else "group"
        return {"name": identity.display_name, "type": chat_type}

    def get_diagnostics(self) -> Dict[str, Any]:
        """Return redacted Matrix readiness/status diagnostics."""
        now = time.time()
        token_present = bool(self._access_token)
        user_id = self._user_id or getattr(self._client, "mxid", "") or ""
        device_id = self._device_id or getattr(self._client, "device_id", "") or ""
        return {
            "platform": "matrix",
            "homeserver": self._homeserver,
            "auth": {
                "access_token_present": token_present,
                "password_present": bool(self._password),
                "token_preview": "***" if token_present else "",
                "user_id": user_id,
                "device_id_present": bool(device_id),
                "device_id_preview": _redact_matrix_value(device_id),
            },
            "sync": {
                "connected": self._client is not None,
                "joined_room_count": len(self._joined_rooms),
                "last_sync_age_seconds": (
                    max(0.0, now - self._last_sync_ts) if self._last_sync_ts else None
                ),
            },
            "e2ee": {
                "mode": self._e2ee_mode,
                "enabled": bool(self._encryption),
                "deps_available": _check_e2ee_deps(),
                "crypto_store_path": str(_CRYPTO_DB_PATH),
                "recovery_key_configured": bool(os.getenv("MATRIX_RECOVERY_KEY", "").strip()),
            },
            "policy": {
                "allowed_user_count": len(self._allowed_user_ids),
                "allowed_room_count": len(self._allowed_room_ids),
                "ignored_user_pattern_count": len(self._ignored_user_patterns),
                "require_mention": self._require_mention,
                "free_response_room_count": len(self._free_rooms),
                "allow_room_mentions": self._allow_room_mentions,
                "process_notices": self._process_notices,
                "allow_public_rooms": os.getenv("MATRIX_ALLOW_PUBLIC_ROOMS", "").lower()
                in ("true", "1", "yes"),
            },
            "media": {
                "max_media_bytes": self._max_media_bytes,
            },
        }

    # ------------------------------------------------------------------
    # Optional overrides
    # ------------------------------------------------------------------

    async def send_typing(
        self, chat_id: str, metadata: Optional[Dict[str, Any]] = None
    ) -> None:
        """Send a typing indicator."""
        if self._client:
            try:
                await self._client.set_typing(RoomID(chat_id), timeout=30000)
            except Exception:
                pass

    async def stop_typing(self, chat_id: str) -> None:
        """Clear the typing indicator."""
        if self._client:
            try:
                await self._client.set_typing(RoomID(chat_id), timeout=0)
            except Exception:
                pass


    async def edit_message(
        self, chat_id: str, message_id: str, content: str, *, finalize: bool = False
    ) -> SendResult:
        """Edit an existing message (via m.replace)."""

        formatted = self.format_message(content)
        new_content = self._build_text_message_content(formatted)
        msg_content: Dict[str, Any] = {
            "msgtype": "m.text",
            "body": f"* {formatted}",
            "m.new_content": new_content,
        }
        if "m.mentions" in new_content:
            msg_content["m.mentions"] = new_content["m.mentions"]
        if "formatted_body" in new_content:
            msg_content["format"] = "org.matrix.custom.html"
            msg_content["formatted_body"] = f'* {new_content["formatted_body"]}'
        msg_content["m.relates_to"] = {
            "rel_type": "m.replace",
            "event_id": message_id,
        }

        try:
            event_id = await self._client.send_message_event(
                RoomID(chat_id),
                EventType.ROOM_MESSAGE,
                msg_content,
            )
            return SendResult(success=True, message_id=str(event_id))
        except Exception as exc:
            return SendResult(success=False, error=str(exc))

    async def send_image(
        self,
        chat_id: str,
        image_url: str,
        caption: Optional[str] = None,
        reply_to: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> SendResult:
        """Download an image URL and upload it to Matrix."""
        from tools.url_safety import is_safe_url

        if not is_safe_url(image_url):
            logger.warning("Matrix: blocked unsafe image URL (SSRF protection)")
            return await super().send_image(
                chat_id, image_url, caption, reply_to, metadata=metadata
            )

        try:
            data, ct, fname = await self._download_external_media_with_cap(image_url)
        except Exception as exc:
            logger.warning(
                "Matrix: failed to download image %s: %s",
                _redact_url_for_log(image_url),
                exc,
            )
            fallback = (
                "I couldn't download and upload the image to Matrix. "
                "The source URL was not shown because it may contain private tokens."
            )
            if caption:
                fallback = f"{caption}\n{fallback}"
            return await self.send(
                chat_id,
                fallback,
                reply_to,
            )

        return await self._upload_and_send(
            chat_id, data, fname, ct, "m.image", caption, reply_to, metadata
        )

    async def _download_external_media_with_cap(self, url: str) -> tuple[bytes, str, str]:
        """Download external media while enforcing redirect safety and size caps."""
        from tools.url_safety import is_safe_url

        if not is_safe_url(url):
            raise ValueError("blocked unsafe media URL")

        def _check_content_length(headers: Any) -> None:
            raw = None
            try:
                raw = headers.get("Content-Length") or headers.get("content-length")
            except Exception:
                raw = None
            if raw is None:
                return
            try:
                size = int(raw)
            except (TypeError, ValueError):
                return
            if size > self._max_media_bytes:
                raise ValueError(
                    f"media exceeds Matrix limit ({size} > {self._max_media_bytes} bytes)"
                )

        def _check_image_content_type(content_type: str) -> str:
            content_type = str(content_type or "").split(";", 1)[0].strip().lower()
            if not content_type.startswith("image/"):
                raise ValueError("external media is not an image")
            return content_type

        def _append_chunk(parts: list[bytes], total: int, chunk: bytes) -> int:
            total += len(chunk)
            if total > self._max_media_bytes:
                raise ValueError(
                    f"media exceeds Matrix limit (> {self._max_media_bytes} bytes)"
                )
            parts.append(chunk)
            return total

        fname = url.rsplit("/", 1)[-1].split("?")[0] or "image.png"

        try:
            import aiohttp as _aiohttp

            _sess_kw, _req_kw = proxy_kwargs_for_aiohttp(self._proxy_url)
            async with _aiohttp.ClientSession(**_sess_kw) as http:
                async with http.get(
                    url,
                    timeout=_aiohttp.ClientTimeout(total=30),
                    allow_redirects=True,
                    **_req_kw,
                ) as resp:
                    resp.raise_for_status()
                    if not is_safe_url(str(resp.url)):
                        raise ValueError("blocked unsafe redirect URL")
                    _check_content_length(resp.headers)
                    parts: list[bytes] = []
                    total = 0
                    async for chunk in resp.content.iter_chunked(65536):
                        total = _append_chunk(parts, total, bytes(chunk))
                    ct = _check_image_content_type(
                        getattr(resp, "content_type", None)
                        or resp.headers.get("content-type", "application/octet-stream")
                    )
                    return b"".join(parts), ct, fname
        except ImportError:
            import httpx

            _httpx_kw: dict = {}
            if self._proxy_url:
                _httpx_kw["proxy"] = self._proxy_url
            async with httpx.AsyncClient(**_httpx_kw) as http:
                async with http.stream(
                    "GET",
                    url,
                    follow_redirects=True,
                    timeout=30,
                ) as resp:
                    resp.raise_for_status()
                    if not is_safe_url(str(resp.url)):
                        raise ValueError("blocked unsafe redirect URL")
                    _check_content_length(resp.headers)
                    parts: list[bytes] = []
                    total = 0
                    async for chunk in resp.aiter_bytes():
                        total = _append_chunk(parts, total, bytes(chunk))
                    ct = _check_image_content_type(
                        resp.headers.get("content-type", "application/octet-stream")
                    )
                    return b"".join(parts), ct, fname

    async def send_image_file(
        self,
        chat_id: str,
        image_path: str,
        caption: Optional[str] = None,
        reply_to: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> SendResult:
        """Upload a local image file to Matrix."""
        return await self._send_local_file(
            chat_id, image_path, "m.image", caption, reply_to, metadata=metadata
        )

    async def send_multiple_images(
        self,
        chat_id: str,
        images: list[tuple[str, str]],
        metadata: Optional[Dict[str, Any]] = None,
        human_delay: float = 0.0,
    ) -> None:
        """Send multiple Matrix images as one ordered logical batch."""
        if not images:
            return
        from urllib.parse import unquote as _unquote

        total = len(images)
        for idx, (image_url, alt_text) in enumerate(images, start=1):
            if human_delay > 0 and idx > 1:
                await asyncio.sleep(human_delay)
            caption = alt_text or None
            if total > 1 and caption:
                caption = f"{caption} ({idx}/{total})"
            if image_url.startswith("file://"):
                result = await self.send_image_file(
                    chat_id=chat_id,
                    image_path=_unquote(image_url[7:]),
                    caption=caption,
                    metadata=metadata,
                )
            else:
                result = await self.send_image(
                    chat_id=chat_id,
                    image_url=image_url,
                    caption=caption,
                    metadata=metadata,
                )
            if not result.success:
                logger.warning("Matrix: failed to send image %d/%d: %s", idx, total, result.error)

    async def send_document(
        self,
        chat_id: str,
        file_path: str,
        caption: Optional[str] = None,
        file_name: Optional[str] = None,
        reply_to: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> SendResult:
        """Upload a local file as a document."""
        return await self._send_local_file(
            chat_id, file_path, "m.file", caption, reply_to, file_name, metadata
        )

    async def send_voice(
        self,
        chat_id: str,
        audio_path: str,
        caption: Optional[str] = None,
        reply_to: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> SendResult:
        """Upload an audio file as a voice message (MSC3245 native voice)."""
        return await self._send_local_file(
            chat_id,
            audio_path,
            "m.audio",
            caption,
            reply_to,
            metadata=metadata,
            is_voice=True,
        )

    async def send_video(
        self,
        chat_id: str,
        video_path: str,
        caption: Optional[str] = None,
        reply_to: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> SendResult:
        """Upload a video file."""
        return await self._send_local_file(
            chat_id, video_path, "m.video", caption, reply_to, metadata=metadata
        )

    async def send_exec_approval(
        self,
        chat_id: str,
        command: str,
        session_key: str,
        description: str = "dangerous command",
        metadata: Optional[dict] = None,
    ) -> SendResult:
        """Send a reaction-based exec approval prompt for Matrix."""
        if not self._client:
            return SendResult(success=False, error="Not connected")

        requester_user_id = str((metadata or {}).get("requester_user_id") or "") or None
        cmd_preview = command[:2000] + "..." if len(command) > 2000 else command
        text = (
            "⚠️ **Dangerous command requires approval**\n"
            f"```\n{cmd_preview}\n```\n"
            f"Reason: {description}\n\n"
            "Reply `!approve` to execute, `!approve session` to approve this pattern for the session, "
            "`!approve always` to approve permanently, or `!deny` to cancel.\n\n"
            "You can also click the reaction to approve:\n"
            "✅ = approve\n"
            "❎ = deny"
        )

        result = await self.send(chat_id, text, metadata=metadata)
        if not result.success or not result.message_id:
            return result

        prompt = _MatrixApprovalPrompt(
            session_key=session_key,
            chat_id=chat_id,
            message_id=result.message_id,
            requester_user_id=requester_user_id,
            expires_at=time.monotonic() + max(self._approval_timeout_seconds, 0),
        )
        old_event = self._approval_prompt_by_session.get(session_key)
        if old_event:
            self._approval_prompts_by_event.pop(old_event, None)
        self._approval_prompts_by_event[result.message_id] = prompt
        self._approval_prompt_by_session[session_key] = result.message_id

        for emoji in ("✅", "♾️", "❌"):
            try:
                reaction_result = await self._send_reaction(chat_id, result.message_id, emoji)
                # Save the bot's reaction event_id for later cleanup
                if reaction_result:
                    prompt.bot_reaction_events[emoji] = str(reaction_result)
            except Exception as exc:
                logger.debug("Matrix: failed to add approval reaction %s: %s", emoji, exc)

        return result

    async def send_model_picker(
        self,
        chat_id: str,
        providers: list,
        current_model: str,
        current_provider: str,
        session_key: str,
        on_model_selected,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> SendResult:
        """Send a Matrix reaction-based model picker."""
        if not self._client:
            return SendResult(success=False, error="Not connected")

        flat_choices: list[tuple[str, str, str, str]] = []
        for provider in providers or []:
            provider_slug = str(provider.get("slug") or "")
            provider_name = str(provider.get("name") or provider_slug)
            models = provider.get("models") or []
            for model_id in models:
                if len(flat_choices) >= len(_MATRIX_MODEL_PICKER_REACTIONS):
                    break
                flat_choices.append((
                    _MATRIX_MODEL_PICKER_REACTIONS[len(flat_choices)],
                    str(model_id),
                    provider_slug,
                    provider_name,
                ))
            if len(flat_choices) >= len(_MATRIX_MODEL_PICKER_REACTIONS):
                break

        if not flat_choices:
            return await self.send(
                chat_id,
                "No authenticated models are available for this session.",
                metadata=metadata,
            )

        try:
            from hermes_cli.providers import get_label
            provider_label = get_label(current_provider)
        except Exception:
            provider_label = current_provider

        lines = [
            "⚙ **Model Configuration**",
            f"Current model: `{current_model or 'unknown'}`",
            f"Provider: {provider_label or 'unknown'}",
            "",
            "React to choose a model:",
        ]
        choices: dict[str, tuple[str, str]] = {}
        for emoji, model_id, provider_slug, provider_name in flat_choices:
            choices[emoji] = (model_id, provider_slug)
            lines.append(f"{emoji} `{model_id}` — {provider_name}")

        result = await self.send(chat_id, "\n".join(lines), metadata=metadata)
        if not result.success or not result.message_id:
            return result

        prompt = _MatrixModelPickerPrompt(
            chat_id=chat_id,
            message_id=result.message_id,
            session_key=session_key,
            choices=choices,
            on_model_selected=on_model_selected,
            requester_user_id=str((metadata or {}).get("requester_user_id") or "") or None,
            expires_at=time.monotonic() + max(self._approval_timeout_seconds, 0),
        )
        self._model_picker_prompts_by_event[result.message_id] = prompt

        for emoji in choices:
            try:
                reaction_event_id = await self._send_reaction(chat_id, result.message_id, emoji)
                if reaction_event_id:
                    prompt.bot_reaction_events[emoji] = str(reaction_event_id)
            except Exception as exc:
                logger.debug("Matrix: failed to add model picker reaction %s: %s", emoji, exc)

        return result

    def format_message(self, content: str) -> str:
        """Pass-through — Matrix supports standard Markdown natively."""
        # Strip image markdown; media is uploaded separately.
        content = re.sub(r"!\[([^\]]*)\]\(([^)]+)\)", r"\2", content)
        return content

    # ------------------------------------------------------------------
    # File helpers
    # ------------------------------------------------------------------

    async def _upload_and_send(
        self,
        room_id: str,
        data: bytes,
        filename: str,
        content_type: str,
        msgtype: str,
        caption: Optional[str] = None,
        reply_to: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        is_voice: bool = False,
    ) -> SendResult:
        """Upload bytes to Matrix and send as a media message."""
        if len(data) > self._max_media_bytes:
            return SendResult(
                success=False,
                error=f"Media file exceeds Matrix limit ({len(data)} > {self._max_media_bytes} bytes)",
            )

        upload_data = data
        encrypted_file = None
        if self._encryption and getattr(self._client, "crypto", None):
            state_store = getattr(self._client, "state_store", None)
            if state_store:
                try:
                    room_encrypted = bool(await state_store.is_encrypted(RoomID(room_id)))
                except Exception:
                    room_encrypted = False
                if room_encrypted:
                    try:
                        from mautrix.crypto.attachments import encrypt_attachment
                        upload_data, encrypted_file = encrypt_attachment(data)
                    except Exception as exc:
                        logger.error("Matrix: attachment encryption failed: %s", exc)
                        return SendResult(success=False, error=str(exc))

        # Upload to homeserver.
        try:
            mxc_url = await self._client.upload_media(
                upload_data,
                mime_type=content_type,
                filename=filename,
                size=len(upload_data),
            )
        except Exception as exc:
            logger.error("Matrix: upload failed: %s", exc)
            return SendResult(success=False, error=str(exc))

        # Build media message content.
        msg_content: Dict[str, Any] = {
            "msgtype": msgtype,
            "body": caption or filename,
            "info": {
                "mimetype": content_type,
                "size": len(data),
            },
        }
        if encrypted_file is not None:
            file_payload = encrypted_file.serialize()
            file_payload["url"] = str(mxc_url)
            msg_content["file"] = file_payload
        else:
            msg_content["url"] = str(mxc_url)

        # Add MSC3245 voice flag for native voice messages.
        if is_voice:
            msg_content["org.matrix.msc3245.voice"] = {}

        self._apply_relation_metadata(msg_content, reply_to=reply_to, metadata=metadata)

        try:
            event_id = await self._client.send_message_event(
                RoomID(room_id),
                EventType.ROOM_MESSAGE,
                msg_content,
            )
            return SendResult(success=True, message_id=str(event_id))
        except Exception as exc:
            return SendResult(success=False, error=str(exc))

    async def _send_local_file(
        self,
        room_id: str,
        file_path: str,
        msgtype: str,
        caption: Optional[str] = None,
        reply_to: Optional[str] = None,
        file_name: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        is_voice: bool = False,
    ) -> SendResult:
        """Read a local file and upload it."""
        p = Path(file_path).expanduser()
        if not p.exists():
            return await self.send(
                room_id, f"{caption or ''}\n(file not found: {file_path})", reply_to
            )
        try:
            file_size = p.stat().st_size
        except OSError:
            file_size = 0
        if file_size > self._max_media_bytes:
            return SendResult(
                success=False,
                error=f"Media file exceeds Matrix limit ({file_size} > {self._max_media_bytes} bytes)",
            )

        fname = file_name or p.name
        ct = mimetypes.guess_type(fname)[0] or "application/octet-stream"
        data = p.read_bytes()

        return await self._upload_and_send(
            room_id, data, fname, ct, msgtype, caption, reply_to, metadata, is_voice
        )

    # ------------------------------------------------------------------
    # Sync loop
    # ------------------------------------------------------------------

    async def _sync_loop(self) -> None:
        """Continuously sync with the homeserver."""
        client = self._client
        # Resume from the token stored during the initial sync.
        next_batch = await client.sync_store.get_next_batch()
        while not self._closing:
            try:
                # Wrap in asyncio.wait_for to guard against TCP-level hangs
                # that the Matrix long-poll timeout cannot catch. Long-poll
                # is 30s, so 45s gives 15s slack for network drain.
                sync_data = await asyncio.wait_for(
                    client.sync(
                        since=next_batch,
                        timeout=30000,
                    ),
                    timeout=45.0,
                )

                # nio returns SyncError objects (not exceptions) for auth
                # failures like M_UNKNOWN_TOKEN.  Detect and stop immediately.
                _sync_msg = getattr(sync_data, "message", None)
                if _sync_msg and isinstance(_sync_msg, str):
                    _lower = _sync_msg.lower()
                    if "m_unknown_token" in _lower or "unknown_token" in _lower:
                        logger.error(
                            "Matrix: permanent auth error from sync: %s — stopping",
                            _sync_msg,
                        )
                        return

                if isinstance(sync_data, dict):
                    self._last_sync_ts = time.time()
                    # Update joined rooms from sync response.
                    rooms_join = sync_data.get("rooms", {}).get("join", {})
                    if rooms_join:
                        self._joined_rooms.update(rooms_join.keys())
                        self._room_identities.clear()
                        self._room_identity_cached_at.clear()

                    # Advance the sync token so the next request is
                    # incremental instead of a full initial sync.
                    nb = sync_data.get("next_batch")
                    if nb:
                        next_batch = nb
                        await client.sync_store.put_next_batch(nb)

                    # Dispatch events to registered handlers so that
                    # _on_room_message / _on_reaction / _on_invite fire.
                    try:
                        await self._dispatch_sync(sync_data)
                    except Exception as exc:
                        logger.warning("Matrix: sync event dispatch error: %s", exc)
                    await self._join_pending_invites(sync_data)

            except asyncio.CancelledError:
                return
            except Exception as exc:
                if self._closing:
                    return
                # Detect permanent auth/permission failures.
                err_str = str(exc).lower()
                if (
                    "401" in err_str
                    or "403" in err_str
                    or "unauthorized" in err_str
                    or "forbidden" in err_str
                ):
                    logger.error(
                        "Matrix: permanent auth error: %s — stopping sync", exc
                    )
                    return
                logger.warning("Matrix: sync error: %s — retrying in 5s", exc)
                await asyncio.sleep(5)

    # ------------------------------------------------------------------
    # Event callbacks
    # ------------------------------------------------------------------

    async def _dispatch_sync(self, sync_data: Dict[str, Any]) -> None:
        """Dispatch a sync response through the mautrix event machinery."""
        client = self._client
        if not client or not hasattr(client, "handle_sync"):
            return
        tasks = client.handle_sync(sync_data)
        if inspect.isawaitable(tasks):
            tasks = await tasks
        if tasks:
            await asyncio.gather(*tasks)

    def _is_self_sender(self, sender: str) -> bool:
        """Return True if the sender refers to the bot's own account.

        Matrix user IDs are byte-compared after trimming whitespace and
        lowercasing — some homeservers normalize the localpart case
        differently at different API surfaces, and the reply-loop tail
        of the "hall of mirrors" bug (#15763) has been observed with the
        bot's own account bypassing a case-sensitive equality check.

        When ``self._user_id`` is empty (whoami hasn't resolved yet, or
        login failed), we cannot prove a sender is NOT us, so we return
        True defensively — an unidentified bot dropping its own events
        is always preferable to falling into an echo loop.
        """
        own = (self._user_id or "").strip().lower()
        if not own:
            return True
        return sender.strip().lower() == own

    @staticmethod
    def _is_system_or_bridge_sender(sender: str) -> bool:
        """Return True if the sender looks like a system / bridge / appservice
        identity rather than a real user.

        Appservice namespaces on Matrix conventionally prefix bot / puppet
        user IDs with an underscore (e.g. ``@_telegram_12345:server``,
        ``@_discord_999:server``, ``@_slack_...:server``).  Server-notices
        bots and bridge-controller bots on many homeservers use the same
        pattern.

        We treat these as system identities for pairing purposes: they
        should never be offered a pairing code, because an operator
        approving the code would hand the bridge itself permanent
        authorization — and every outbound message relayed by the bridge
        would then loop back into the agent as an "authorized user
        message", which is the root of issue #15763.

        Matches:
            ``@_something:server``   — appservice namespace convention
            ``@:server``             — malformed / empty localpart
            ``:server``              — malformed, no leading ``@``
        """
        s = (sender or "").strip()
        if not s:
            return True
        # Localpart is everything between leading '@' and ':'
        if s.startswith("@"):
            s = s[1:]
        if ":" in s:
            localpart, _, _ = s.partition(":")
        else:
            localpart = s
        if not localpart:
            return True
        return localpart.startswith("_")

    def _matches_ignored_user_pattern(self, sender: str) -> bool:
        """Return True when sender matches configured Matrix ignore patterns."""
        return any(pattern.search(sender or "") for pattern in self._ignored_user_patterns)

    def _is_allowed_matrix_room(self, room_id: str) -> bool:
        """Return True when MATRIX_ALLOWED_ROOMS permits the room."""
        return not self._allowed_room_ids or room_id in self._allowed_room_ids

    async def _is_allowed_matrix_room_event(self, room_id: str) -> bool:
        """Return True when a room event may proceed past intake filters.

        MATRIX_ALLOWED_ROOMS constrains shared rooms. Matrix DMs are exempt so
        personal chats still work when operators use a room allowlist for
        project rooms.
        """
        if self._is_allowed_matrix_room(room_id):
            return True
        try:
            return await self._is_dm_room(room_id)
        except Exception as exc:
            logger.debug(
                "Matrix: could not resolve room identity for allowlist check in %s: %s",
                room_id,
                exc,
            )
            return False

    async def _on_room_message(self, event: Any) -> None:
        """Handle incoming room message events (text, media)."""
        room_id = str(getattr(event, "room_id", ""))
        sender = str(getattr(event, "sender", ""))

        # Diagnostic: confirm the callback is firing at all when DEBUG is on.
        # Helps users troubleshoot silent inbound issues like #5819, #7914, #12614.
        logger.debug(
            "Matrix: callback fired — event %s from %s in %s",
            getattr(event, "event_id", "?"),
            sender,
            room_id,
        )

        # Ignore own messages (case-insensitive; also drops when our own
        # user_id hasn't been resolved yet — see _is_self_sender docstring
        # and issue #15763).
        if self._is_self_sender(sender):
            return

        # Ignore appservice / bridge / system identities so they never
        # trigger the pairing flow.  Once a bridge user is paired, every
        # outbound message it relays would loop back as an authorized
        # user message (the "hall of mirrors" in #15763).
        if self._is_system_or_bridge_sender(sender):
            logger.debug(
                "Matrix: ignoring system/bridge sender %s in %s",
                sender,
                room_id,
            )
            return
        if self._matches_ignored_user_pattern(sender):
            logger.debug(
                "Matrix: ignoring sender %s in %s due to configured ignore pattern",
                sender,
                room_id,
            )
            return
        if not await self._is_allowed_matrix_room_event(room_id):
            logger.info(
                "Matrix: ignoring message from unauthorized room %s",
                room_id,
            )
            return

        # Deduplicate by event ID.
        event_id = str(getattr(event, "event_id", ""))
        if self._is_duplicate_event(event_id):
            return

        # Startup grace: ignore old messages from initial sync.
        event_ts = _matrix_event_timestamp_seconds(event)
        if event_ts and event_ts < self._startup_ts - _STARTUP_GRACE_SECONDS:
            # If we are well past startup but events are still being dropped
            # by the grace check, the host clock is probably set ahead of
            # real time — every live event then looks "older than startup".
            # Warn once so users can fix NTP instead of chasing a ghost.
            # See #12614 (Schnurzel700, April 2026).
            #
            # Filter out backfill (events legitimately old) by requiring:
            #  - we are >30s past startup (initial-sync replay window closed)
            #  - the skew is *consistent* across consecutive drops, which is
            #    the signature of a constant clock offset rather than a
            #    variable-age room history.  Backfill from a freshly invited
            #    room can deliver events spanning hours/days — those skews
            #    will be all over the place and reset the counter.
            if not self._clock_skew_warned and (
                time.time() - self._startup_ts > 30
            ):
                skew = self._startup_ts - event_ts
                # Sanity bound: malformed events with negative or absurd
                # timestamps shouldn't count.
                if 5 < skew < 86400:
                    if self._late_grace_drops == 0:
                        self._late_grace_skew = skew
                        self._late_grace_drops = 1
                    elif abs(skew - self._late_grace_skew) < 60:
                        # Consistent offset → likely real clock skew.
                        self._late_grace_drops += 1
                    else:
                        # Varied skew → likely backfill, restart sampling.
                        self._late_grace_skew = skew
                        self._late_grace_drops = 1
                    if self._late_grace_drops >= 3:
                        logger.warning(
                            "Matrix: dropped %d consecutive live events as "
                            "'too old' more than 30s after startup (skew "
                            "≈ %.0fs). The host system clock is likely set "
                            "ahead of real time, which causes the startup "
                            "grace filter to silently discard every incoming "
                            "message. Run `timedatectl set-ntp true` (or "
                            "sync NTP) and restart the bot.",
                            self._late_grace_drops,
                            skew,
                        )
                        self._clock_skew_warned = True
            return

        # Extract content from the event.
        content = getattr(event, "content", None)
        if content is None:
            return

        # Get msgtype — either from content object or raw dict.
        if hasattr(content, "msgtype"):
            msgtype = str(content.msgtype)
        elif isinstance(content, dict):
            msgtype = content.get("msgtype", "")
        else:
            msgtype = ""

        # Determine source content dict for relation/thread extraction.
        if isinstance(content, dict):
            source_content = content
        elif hasattr(content, "serialize"):
            source_content = content.serialize()
        else:
            source_content = {}

        relates_to = source_content.get("m.relates_to", {})

        # Skip edits (m.replace relation).
        if relates_to.get("rel_type") == "m.replace":
            return

        # Ignore m.notice to prevent bot-to-bot loops (m.notice is the
        # conventional msgtype for bot responses in the Matrix ecosystem).
        if msgtype == "m.notice" and not self._process_notices:
            return

        # Dispatch by msgtype.
        media_msgtypes = ("m.image", "m.audio", "m.video", "m.file")
        if msgtype in media_msgtypes:
            await self._handle_media_message(
                room_id, sender, event_id, event_ts, source_content, relates_to, msgtype
            )
        elif msgtype in ("m.text", "m.notice"):
            await self._handle_text_message(
                room_id, sender, event_id, event_ts, source_content, relates_to
            )

    async def _resolve_message_context(
        self,
        room_id: str,
        sender: str,
        event_id: str,
        body: str,
        source_content: dict,
        relates_to: dict,
    ) -> Optional[tuple]:
        """Shared mention/thread/DM gating for text and media handlers.

        Returns (body, is_dm, chat_type, thread_id, display_name, source)
        or None if the message should be dropped (mention gating).
        """
        identity = await self._resolve_room_identity(room_id)
        is_dm = await self._is_dm_room(room_id)
        chat_type = "dm" if is_dm else "group"

        thread_id = None
        if relates_to.get("rel_type") == "m.thread":
            thread_id = relates_to.get("event_id")

        formatted_body = source_content.get("formatted_body")
        # m.mentions.user_ids (MSC3952 / Matrix v1.7) — authoritative mention signal.
        mentions_block = source_content.get("m.mentions") or {}
        mention_user_ids = (
            mentions_block.get("user_ids") if isinstance(mentions_block, dict) else None
        )
        is_mentioned = self._is_bot_mentioned(body, formatted_body, mention_user_ids)

        # Require-mention gating.
        if not is_dm:
            # allowed_rooms check (whitelist — must pass before other gating).
            # When set, messages from rooms NOT in this whitelist are silently
            # ignored, even if @mentioned.  DMs are already excluded above.
            if self._allowed_rooms and room_id not in self._allowed_rooms:
                logger.debug(
                    "Matrix: ignoring message %s in %s — room not in "
                    "MATRIX_ALLOWED_ROOMS whitelist",
                    event_id,
                    room_id,
                )
                return None

            is_free_room = room_id in self._free_rooms
            in_bot_thread = bool(thread_id and thread_id in self._threads)
            is_command = body.startswith("/")
            if self._require_mention and not is_free_room and not in_bot_thread:
                if not is_mentioned and not is_command:
                    logger.debug(
                        "Matrix: ignoring message %s in %s — no @mention "
                        "(set MATRIX_REQUIRE_MENTION=false to disable)",
                        event_id,
                        room_id,
                    )
                    return None

            # Thread-level @mention gating: even in a bot-participated thread,
            # require @mention when thread_require_mention is enabled.
            # Prevents infinite reply loops in multi-agent shared rooms
            # where multiple bots all participate in the same thread.
            elif (self._thread_require_mention and in_bot_thread
                  and not is_free_room):
                if not is_mentioned:
                    logger.debug(
                        "Matrix: ignoring message %s in thread %s — "
                        "no @mention (thread_require_mention=true)",
                        event_id,
                        thread_id,
                    )
                    return None

        # DM mention-thread.
        if is_dm and not thread_id and self._dm_mention_threads and is_mentioned:
            thread_id = event_id
            self._threads.mark(thread_id)

        # Strip mention from body (only when mention-gating is active).
        if is_mentioned and self._require_mention:
            body = self._strip_mention(body)

        # Auto-thread/session-scope policy. Real Matrix thread roots are
        # preserved above; synthetic thread roots are policy-driven.
        if not thread_id:
            if is_dm:
                if self._dm_auto_thread:
                    thread_id = event_id
                    self._threads.mark(thread_id)
            elif self._matrix_session_scope == "room":
                thread_id = None
            elif self._matrix_session_scope == "thread":
                thread_id = event_id
                self._threads.mark(thread_id)
            elif self._auto_thread:
                thread_id = event_id
                self._threads.mark(thread_id)

        display_name = await self._get_display_name(room_id, sender)
        source = self.build_source(
            chat_id=room_id,
            chat_name=identity.display_name,
            chat_type=chat_type,
            user_id=sender,
            user_name=display_name,
            thread_id=thread_id,
            chat_topic=identity.room_topic,
            guild_id=identity.server_name,
            parent_chat_id=room_id if thread_id else None,
            message_id=event_id,
        )

        if thread_id:
            self._threads.mark(thread_id)

        self._background_read_receipt(room_id, event_id)

        return body, is_dm, chat_type, thread_id, display_name, source

    async def _handle_text_message(
        self,
        room_id: str,
        sender: str,
        event_id: str,
        event_ts: float,
        source_content: dict,
        relates_to: dict,
    ) -> None:
        """Process a text message event."""
        body = source_content.get("body", "") or ""
        if not body:
            return
        body = _normalize_matrix_bang_command(body)

        ctx = await self._resolve_message_context(
            room_id,
            sender,
            event_id,
            body,
            source_content,
            relates_to,
        )
        if ctx is None:
            return
        body, is_dm, chat_type, thread_id, display_name, source = ctx

        # Reply-to detection.
        reply_to = None
        in_reply_to = relates_to.get("m.in_reply_to", {})
        if in_reply_to:
            reply_to = in_reply_to.get("event_id")

        # Strip reply fallback from body.
        if reply_to and body.startswith("> "):
            lines = body.split("\n")
            stripped = []
            past_fallback = False
            for line in lines:
                if not past_fallback:
                    if line.startswith("> ") or line == ">":
                        continue
                    if line == "":
                        past_fallback = True
                        continue
                    past_fallback = True
                stripped.append(line)
            body = "\n".join(stripped) if stripped else body

        # Re-run bang normalization after reply-fallback stripping so a quoted
        # reply whose actual content is a bang command (e.g. ``> quoted\n\n!model``)
        # is treated as a command, matching how ``/command`` is recognized below.
        body = _normalize_matrix_bang_command(body)

        msg_type = MessageType.TEXT
        if body.startswith("/"):
            msg_type = MessageType.COMMAND

        msg_event = MessageEvent(
            text=body,
            message_type=msg_type,
            source=source,
            raw_message=source_content,
            message_id=event_id,
            reply_to_message_id=reply_to,
        )

        if msg_type == MessageType.TEXT and self._text_batch_delay_seconds > 0:
            self._enqueue_text_event(msg_event)
        else:
            await self.handle_message(msg_event)

    async def _handle_media_message(
        self,
        room_id: str,
        sender: str,
        event_id: str,
        event_ts: float,
        source_content: dict,
        relates_to: dict,
        msgtype: str,
    ) -> None:
        """Process a media message event (image, audio, video, file)."""
        body = source_content.get("body", "") or ""
        url = source_content.get("url", "")
        if url and not str(url).startswith("mxc://"):
            logger.warning(
                "[Matrix] Rejecting inbound media %s with non-MXC URL",
                event_id,
            )
            return

        # Convert mxc:// to HTTP URL for downstream processing.
        http_url = ""
        if url and url.startswith("mxc://"):
            http_url = self._mxc_to_http(url)

        # Extract MIME type from content info.
        content_info = source_content.get("info", {})
        if not isinstance(content_info, dict):
            content_info = {}
        event_mimetype = content_info.get("mimetype", "")
        event_size = content_info.get("size")
        try:
            event_size_int = int(event_size) if event_size is not None else 0
        except (TypeError, ValueError):
            event_size_int = 0
        if event_size_int and event_size_int > self._max_media_bytes:
            logger.warning(
                "[Matrix] Rejecting oversized inbound media %s (%d > %d bytes)",
                event_id,
                event_size_int,
                self._max_media_bytes,
            )
            return

        # For encrypted media, the URL may be in file.url.
        file_content = source_content.get("file", {})
        if not url and isinstance(file_content, dict):
            url = file_content.get("url", "") or ""
            if url and not str(url).startswith("mxc://"):
                logger.warning(
                    "[Matrix] Rejecting inbound encrypted media %s with non-MXC URL",
                    event_id,
                )
                return
            if url and url.startswith("mxc://"):
                http_url = self._mxc_to_http(url)

        is_encrypted_media = bool(
            file_content and isinstance(file_content, dict) and file_content.get("url")
        )

        media_type = "application/octet-stream"
        msg_type = MessageType.DOCUMENT
        is_voice_message = False

        if msgtype == "m.image":
            msg_type = MessageType.PHOTO
            media_type = event_mimetype or "image/png"
        elif msgtype == "m.audio":
            if source_content.get("org.matrix.msc3245.voice") is not None:
                is_voice_message = True
                msg_type = MessageType.VOICE
            else:
                msg_type = MessageType.AUDIO
            media_type = event_mimetype or "audio/ogg"
        elif msgtype == "m.video":
            msg_type = MessageType.VIDEO
            media_type = event_mimetype or "video/mp4"
        elif event_mimetype:
            media_type = event_mimetype

        # Cache media locally when downstream tools need a real file path.
        cached_path = None
        should_cache_locally = msg_type in {
            MessageType.PHOTO, MessageType.AUDIO, MessageType.VIDEO, MessageType.DOCUMENT,
        } or is_voice_message or is_encrypted_media
        if should_cache_locally and url:
            try:
                file_bytes = await self._client.download_media(ContentURI(url))
                if file_bytes is not None:
                    if is_encrypted_media:
                        from mautrix.crypto.attachments import decrypt_attachment

                        hashes_value = (
                            file_content.get("hashes")
                            if isinstance(file_content, dict)
                            else None
                        )
                        hash_value = (
                            hashes_value.get("sha256")
                            if isinstance(hashes_value, dict)
                            else None
                        )

                        key_value = (
                            file_content.get("key")
                            if isinstance(file_content, dict)
                            else None
                        )
                        if isinstance(key_value, dict):
                            key_value = key_value.get("k")

                        iv_value = (
                            file_content.get("iv")
                            if isinstance(file_content, dict)
                            else None
                        )

                        if key_value and hash_value and iv_value:
                            file_bytes = decrypt_attachment(
                                file_bytes, key_value, hash_value, iv_value
                            )
                        else:
                            logger.warning(
                                "[Matrix] Encrypted media event missing decryption metadata for %s",
                                event_id,
                            )
                            file_bytes = None

                    if file_bytes is not None:
                        from gateway.platforms.base import (
                            cache_audio_from_bytes,
                            cache_document_from_bytes,
                            cache_image_from_bytes,
                        )

                        if msg_type == MessageType.PHOTO:
                            ext_map = {
                                "image/jpeg": ".jpg",
                                "image/png": ".png",
                                "image/gif": ".gif",
                                "image/webp": ".webp",
                            }
                            ext = ext_map.get(media_type, ".jpg")
                            cached_path = cache_image_from_bytes(file_bytes, ext=ext)
                            logger.info("[Matrix] Cached user image at %s", cached_path)
                        elif msg_type in {MessageType.AUDIO, MessageType.VOICE}:
                            ext = (
                                Path(
                                    body
                                    or (
                                        "voice.ogg" if is_voice_message else "audio.ogg"
                                    )
                                ).suffix
                                or ".ogg"
                            )
                            cached_path = cache_audio_from_bytes(file_bytes, ext=ext)
                        else:
                            filename = body or (
                                "video.mp4"
                                if msg_type == MessageType.VIDEO
                                else "document"
                            )
                            cached_path = cache_document_from_bytes(
                                file_bytes, filename
                            )
            except Exception as e:
                logger.warning("[Matrix] Failed to cache media: %s", e)

        ctx = await self._resolve_message_context(
            room_id,
            sender,
            event_id,
            body,
            source_content,
            relates_to,
        )
        if ctx is None:
            return
        body, is_dm, chat_type, thread_id, display_name, source = ctx

        if msgtype == "m.image" and _looks_like_matrix_image_filename(body):
            body = ""

        allow_http_fallback = bool(http_url) and not is_encrypted_media
        media_urls = (
            [cached_path]
            if cached_path
            else ([http_url] if allow_http_fallback else None)
        )
        media_types = [media_type] if media_urls else None

        msg_event = MessageEvent(
            text=body,
            message_type=msg_type,
            source=source,
            raw_message=source_content,
            message_id=event_id,
            media_urls=media_urls,
            media_types=media_types,
        )

        await self.handle_message(msg_event)

    async def _on_invite(self, event: Any) -> None:
        """Auto-join rooms when invited."""

        room_id = str(getattr(event, "room_id", ""))

        logger.info(
            "Matrix: invited to %s — joining",
            room_id,
        )
        await self._join_room_by_id(room_id)

    async def _join_room_by_id(self, room_id: str) -> bool:
        """Join a room by ID and refresh local caches on success."""
        if not room_id:
            return False
        if room_id in self._joined_rooms:
            return True
        try:
            await self._client.join_room(RoomID(room_id))
            self._joined_rooms.add(room_id)
            self._room_identities.pop(room_id, None)
            self._room_identity_cached_at.pop(room_id, None)
            logger.info("Matrix: joined %s", room_id)
            await self._refresh_dm_cache()
            return True
        except Exception as exc:
            logger.warning("Matrix: error joining %s: %s", room_id, exc)
            return False

    async def _join_pending_invites(self, sync_data: Dict[str, Any]) -> None:
        """Join rooms still present in rooms.invite after sync processing."""
        rooms = sync_data.get("rooms", {}) if isinstance(sync_data, dict) else {}
        invites = rooms.get("invite", {})
        if not isinstance(invites, dict):
            return
        for room_id in invites:
            if room_id in self._joined_rooms:
                continue
            logger.info("Matrix: reconciling pending invite for %s", room_id)
            await self._join_room_by_id(str(room_id))

    # ------------------------------------------------------------------
    # Reactions (send, receive, processing lifecycle)
    # ------------------------------------------------------------------

    async def _send_reaction(
        self,
        room_id: str,
        event_id: str,
        emoji: str,
    ) -> Optional[str]:
        """Send an emoji reaction to a message in a room.
        Returns the reaction event_id on success, None on failure.
        """

        if not self._client:
            return None
        content = {
            "m.relates_to": {
                "rel_type": "m.annotation",
                "event_id": event_id,
                "key": emoji,
            }
        }
        try:
            resp_event_id = await self._client.send_message_event(
                RoomID(room_id),
                EventType.REACTION,
                content,
            )
            logger.debug("Matrix: sent reaction %s to %s", emoji, event_id)
            return str(resp_event_id)
        except Exception as exc:
            logger.debug("Matrix: reaction send error: %s", exc)
            return None

    async def _redact_reaction(
        self,
        room_id: str,
        reaction_event_id: str,
        reason: str = "",
    ) -> bool:
        """Remove a reaction by redacting its event."""
        return await self.redact_message(room_id, reaction_event_id, reason)

    def _schedule_reaction_redaction(
        self,
        room_id: str,
        reaction_event_id: str,
        reason: str = "",
    ) -> None:
        """Redact a reaction after a short delay so message delivery settles."""

        async def _redact_later() -> None:
            try:
                if self._reaction_redaction_delay_seconds:
                    await asyncio.sleep(self._reaction_redaction_delay_seconds)
                if not await self._redact_reaction(room_id, reaction_event_id, reason):
                    logger.debug(
                        "Matrix: failed to redact reaction %s", reaction_event_id
                    )
            except asyncio.CancelledError:
                raise
            except Exception as exc:
                logger.debug(
                    "Matrix: delayed reaction redaction failed for %s: %s",
                    reaction_event_id,
                    exc,
                )

        task = asyncio.create_task(_redact_later())
        self._reaction_redaction_tasks.add(task)
        task.add_done_callback(self._reaction_redaction_tasks.discard)

    async def on_processing_start(self, event: MessageEvent) -> None:
        """Add eyes reaction when the agent starts processing a message."""
        if not self._reactions_enabled:
            return
        msg_id = event.message_id
        room_id = event.source.chat_id
        if msg_id and room_id:
            reaction_event_id = await self._send_reaction(room_id, msg_id, "\U0001f440")
            if reaction_event_id:
                self._pending_reactions[(room_id, msg_id)] = reaction_event_id

    async def on_processing_complete(
        self,
        event: MessageEvent,
        outcome: ProcessingOutcome,
    ) -> None:
        """Replace eyes with checkmark (success) or cross (failure)."""
        if not self._reactions_enabled:
            return
        msg_id = event.message_id
        room_id = event.source.chat_id
        if not msg_id or not room_id:
            return
        if outcome == ProcessingOutcome.CANCELLED:
            return
        reaction_key = (room_id, msg_id)
        if reaction_key in self._pending_reactions:
            eyes_event_id = self._pending_reactions.pop(reaction_key)
            self._schedule_reaction_redaction(
                room_id,
                eyes_event_id,
                "processing complete",
            )
        await self._send_reaction(
            room_id,
            msg_id,
            "\u2705" if outcome == ProcessingOutcome.SUCCESS else "\u274c",
        )

    async def _on_reaction(self, event: Any) -> None:
        """Handle incoming reaction events."""
        sender = str(getattr(event, "sender", ""))
        if self._is_self_sender(sender):
            return
        event_id = str(getattr(event, "event_id", ""))
        if self._is_duplicate_event(event_id):
            return

        room_id = str(getattr(event, "room_id", ""))
        content = getattr(event, "content", None)
        if content:
            relates_to = (
                content.get("m.relates_to", {})
                if isinstance(content, dict)
                else getattr(content, "relates_to", {})
            )
            reacts_to = ""
            key = ""
            if isinstance(relates_to, dict):
                reacts_to = relates_to.get("event_id", "")
                key = relates_to.get("key", "")
            elif hasattr(relates_to, "event_id"):
                reacts_to = str(getattr(relates_to, "event_id", ""))
                key = str(getattr(relates_to, "key", ""))
            logger.info(
                "Matrix: reaction %s from %s on %s in %s",
                key,
                sender,
                reacts_to,
                room_id,
            )

            # Check if this reaction resolves a pending approval prompt.
            prompt = self._approval_prompts_by_event.get(reacts_to)
            if prompt and not prompt.resolved:
                if room_id != prompt.chat_id:
                    return
                if self._matrix_prompt_expired(prompt):
                    await self._expire_matrix_approval_prompt(room_id, reacts_to, prompt)
                    return
                if not await self._validate_matrix_prompt_reactor(
                    room_id, reacts_to, sender, prompt, "approval"
                ):
                    return
                choice = self._approval_reaction_map.get(key)
                if not choice:
                    await self._send_invalid_reaction_feedback(
                        room_id,
                        reacts_to,
                        "That reaction is not valid for this approval prompt.",
                    )
                    return
                try:
                    from tools.approval import resolve_gateway_approval

                    count = resolve_gateway_approval(prompt.session_key, choice)
                    if count:
                        prompt.resolved = True
                        self._approval_prompts_by_event.pop(reacts_to, None)
                        self._approval_prompt_by_session.pop(prompt.session_key, None)
                        logger.info(
                            "Matrix reaction resolved %d approval(s) for session %s "
                            "(choice=%s, user=%s)",
                            count, prompt.session_key, choice, sender,
                        )
                        # Redact bot's seed reactions, leaving only the user's
                        await self._redact_bot_approval_reactions(room_id, prompt)
                except Exception as exc:
                    logger.error("Failed to resolve gateway approval from Matrix reaction: %s", exc)
                return

            model_prompt = self._model_picker_prompts_by_event.get(reacts_to)
            if model_prompt and not model_prompt.resolved:
                if room_id != model_prompt.chat_id:
                    return
                if self._matrix_prompt_expired(model_prompt):
                    await self._expire_matrix_model_picker_prompt(room_id, reacts_to, model_prompt)
                    return
                if not await self._validate_matrix_prompt_reactor(
                    room_id, reacts_to, sender, model_prompt, "model picker"
                ):
                    return
                selection = model_prompt.choices.get(key)
                if not selection:
                    await self._send_invalid_reaction_feedback(
                        room_id,
                        reacts_to,
                        "That reaction is not one of the available model choices.",
                    )
                    return
                model_prompt.resolved = True
                self._model_picker_prompts_by_event.pop(reacts_to, None)
                model_id, provider_slug = selection
                try:
                    confirmation = await model_prompt.on_model_selected(
                        room_id, model_id, provider_slug
                    )
                    await self._redact_bot_model_picker_reactions(room_id, model_prompt)
                    if confirmation:
                        await self.send(room_id, confirmation, reply_to=reacts_to)
                except Exception as exc:
                    logger.error("Failed to switch model from Matrix reaction: %s", exc)
                    await self.send(
                        room_id,
                        f"Failed to switch model: {exc}",
                        reply_to=reacts_to,
                    )
                return

    def _matrix_prompt_expired(self, prompt: Any) -> bool:
        expires_at = getattr(prompt, "expires_at", None)
        return expires_at is not None and time.monotonic() > float(expires_at)

    async def _validate_matrix_prompt_reactor(
        self,
        room_id: str,
        target_event_id: str,
        sender: str,
        prompt: Any,
        prompt_label: str,
    ) -> bool:
        allow_all = os.getenv("GATEWAY_ALLOW_ALL_USERS", "").lower() in {
            "true",
            "1",
            "yes",
        }
        if not allow_all and not (
            self._allowed_user_ids and sender in self._allowed_user_ids
        ):
            logger.info(
                "Matrix: ignoring %s reaction from unauthorized user %s on %s",
                prompt_label, sender, target_event_id,
            )
            await self._send_invalid_reaction_feedback(
                room_id,
                target_event_id,
                "Only an authorized Matrix user can use these controls.",
            )
            return False

        requester = getattr(prompt, "requester_user_id", None)
        approval_require_sender = getattr(self, "_approval_require_sender", True)
        if approval_require_sender and requester and sender != requester:
            logger.info(
                "Matrix: ignoring %s reaction from %s; requester is %s",
                prompt_label, sender, requester,
            )
            await self._send_invalid_reaction_feedback(
                room_id,
                target_event_id,
                "Only the user who requested this action can use these controls.",
            )
            return False
        return True

    async def _send_invalid_reaction_feedback(
        self,
        room_id: str,
        target_event_id: str,
        text: str,
    ) -> None:
        try:
            await self.send(room_id, text, reply_to=target_event_id)
        except Exception as exc:
            logger.debug("Matrix: failed to send invalid reaction feedback: %s", exc)

    async def _expire_matrix_approval_prompt(
        self,
        room_id: str,
        target_event_id: str,
        prompt: "_MatrixApprovalPrompt",
    ) -> None:
        prompt.resolved = True
        self._approval_prompts_by_event.pop(target_event_id, None)
        self._approval_prompt_by_session.pop(prompt.session_key, None)
        await self._redact_bot_approval_reactions(room_id, prompt)
        await self._send_invalid_reaction_feedback(
            room_id,
            target_event_id,
            "This approval prompt has expired. Run the command again if you still want to approve it.",
        )

    async def _expire_matrix_model_picker_prompt(
        self,
        room_id: str,
        target_event_id: str,
        prompt: "_MatrixModelPickerPrompt",
    ) -> None:
        prompt.resolved = True
        self._model_picker_prompts_by_event.pop(target_event_id, None)
        await self._redact_bot_model_picker_reactions(room_id, prompt)
        await self._send_invalid_reaction_feedback(
            room_id,
            target_event_id,
            "This model picker has expired. Run `/model` again to choose a model.",
        )

    async def _redact_bot_approval_reactions(
        self,
        room_id: str,
        prompt: "_MatrixApprovalPrompt",
    ) -> None:
        """Redact the bot's seeded approval reactions, leaving only the user's reaction."""
        for emoji, evt_id in prompt.bot_reaction_events.items():
            self._schedule_reaction_redaction(room_id, evt_id, "approval resolved")
            logger.debug("Matrix: scheduled bot reaction redaction %s (%s)", emoji, evt_id)

    async def _redact_bot_model_picker_reactions(
        self,
        room_id: str,
        prompt: "_MatrixModelPickerPrompt",
    ) -> None:
        """Redact the bot's seeded model picker reactions."""
        for emoji, evt_id in prompt.bot_reaction_events.items():
            try:
                await self.redact_message(room_id, evt_id, "model picker resolved")
                logger.debug("Matrix: redacted model picker reaction %s (%s)", emoji, evt_id)
            except Exception as exc:
                logger.debug("Matrix: failed to redact model picker reaction %s: %s", emoji, exc)

    # ------------------------------------------------------------------
    # Text message aggregation (handles Matrix client-side splits)
    # ------------------------------------------------------------------

    def _text_batch_key(self, event: MessageEvent) -> str:
        """Session-scoped key for text message batching."""
        from gateway.session import build_session_key

        return build_session_key(
            event.source,
            group_sessions_per_user=self.config.extra.get(
                "group_sessions_per_user", True
            ),
            thread_sessions_per_user=self.config.extra.get(
                "thread_sessions_per_user", False
            ),
        )

    def _enqueue_text_event(self, event: MessageEvent) -> None:
        """Buffer a text event and reset the flush timer."""
        key = self._text_batch_key(event)
        existing = self._pending_text_batches.get(key)
        chunk_len = len(event.text or "")
        if existing is None:
            event._last_chunk_len = chunk_len  # type: ignore[attr-defined]
            self._pending_text_batches[key] = event
        else:
            if event.text:
                existing.text = (
                    f"{existing.text}\n{event.text}" if existing.text else event.text
                )
            existing._last_chunk_len = chunk_len  # type: ignore[attr-defined]
            if event.media_urls:
                existing.media_urls.extend(event.media_urls)
                existing.media_types.extend(event.media_types)

        prior_task = self._pending_text_batch_tasks.get(key)
        if prior_task and not prior_task.done():
            prior_task.cancel()
        self._pending_text_batch_tasks[key] = asyncio.create_task(
            self._flush_text_batch(key)
        )

    async def _flush_text_batch(self, key: str) -> None:
        """Wait for the quiet period then dispatch the aggregated text."""
        current_task = asyncio.current_task()
        try:
            pending = self._pending_text_batches.get(key)
            last_len = getattr(pending, "_last_chunk_len", 0) if pending else 0
            if last_len >= self._SPLIT_THRESHOLD:
                delay = self._text_batch_split_delay_seconds
            else:
                delay = self._text_batch_delay_seconds
            await asyncio.sleep(delay)
            event = self._pending_text_batches.pop(key, None)
            if not event:
                return
            logger.info(
                "[Matrix] Flushing text batch %s (%d chars)",
                key,
                len(event.text or ""),
            )
            await self.handle_message(event)
        finally:
            if self._pending_text_batch_tasks.get(key) is current_task:
                self._pending_text_batch_tasks.pop(key, None)

    # ------------------------------------------------------------------
    # Read receipts
    # ------------------------------------------------------------------

    def _background_read_receipt(self, room_id: str, event_id: str) -> None:
        """Fire-and-forget read receipt with error logging."""

        async def _send() -> None:
            try:
                await self.send_read_receipt(room_id, event_id)
            except Exception as exc:  # pragma: no cover — defensive
                logger.debug("Matrix: background read receipt failed: %s", exc)

        asyncio.ensure_future(_send())

    async def send_read_receipt(self, room_id: str, event_id: str) -> bool:
        """Send a read receipt (m.read) for an event."""
        if not self._client:
            return False
        try:
            room = RoomID(room_id)
            event = EventID(event_id)
            if hasattr(self._client, "set_fully_read_marker"):
                await self._client.set_fully_read_marker(room, event, event)
            elif hasattr(self._client, "send_receipt"):
                await self._client.send_receipt(room, event)
            elif hasattr(self._client, "set_read_markers"):
                await self._client.set_read_markers(
                    room,
                    fully_read_event=event,
                    read_receipt=event,
                )
            else:
                logger.debug("Matrix: client has no read receipt method")
                return False
            logger.debug("Matrix: sent read receipt for %s in %s", event_id, room_id)
            return True
        except Exception as exc:
            logger.debug("Matrix: read receipt failed: %s", exc)
            return False

    # ------------------------------------------------------------------
    # Message redaction
    # ------------------------------------------------------------------

    async def redact_message(
        self,
        room_id: str,
        event_id: str,
        reason: str = "",
    ) -> bool:
        """Redact (delete) a message or event from a room."""
        if not self._client:
            return False
        try:
            await self._client.redact(
                RoomID(room_id),
                EventID(event_id),
                reason=reason or None,
            )
            logger.info("Matrix: redacted %s in %s", event_id, room_id)
            return True
        except Exception as exc:
            logger.warning("Matrix: redact error: %s", exc)
            return False

    # ------------------------------------------------------------------
    # Room creation & management
    # ------------------------------------------------------------------

    async def create_room(
        self,
        name: str = "",
        topic: str = "",
        invite: Optional[list] = None,
        is_direct: bool = False,
        preset: str = "private_chat",
    ) -> Optional[str]:
        """Create a new Matrix room."""
        if not self._client:
            return None
        if preset == "public_chat" and os.getenv("MATRIX_ALLOW_PUBLIC_ROOMS", "").lower() not in (
            "true",
            "1",
            "yes",
        ):
            logger.warning("Matrix: refusing to create public room without MATRIX_ALLOW_PUBLIC_ROOMS=true")
            return None
        try:
            preset_enum = {
                "private_chat": RoomCreatePreset.PRIVATE,
                "public_chat": RoomCreatePreset.PUBLIC,
                "trusted_private_chat": RoomCreatePreset.TRUSTED_PRIVATE,
            }.get(preset, RoomCreatePreset.PRIVATE)
            invitees = [UserID(u) for u in (invite or [])]
            room_id = await self._client.create_room(
                name=name or None,
                topic=topic or None,
                invitees=invitees,
                is_direct=is_direct,
                preset=preset_enum,
            )
            room_id_str = str(room_id)
            self._joined_rooms.add(room_id_str)
            logger.info("Matrix: created room %s (%s)", room_id_str, name or "unnamed")
            return room_id_str
        except Exception as exc:
            logger.warning("Matrix: create_room error: %s", exc)
            return None

    async def invite_user(self, room_id: str, user_id: str) -> bool:
        """Invite a user to a room."""
        if not self._client:
            return False
        try:
            await self._client.invite_user(RoomID(room_id), UserID(user_id))
            logger.info("Matrix: invited %s to %s", user_id, room_id)
            return True
        except Exception as exc:
            logger.warning("Matrix: invite error: %s", exc)
            return False

    async def fetch_history(
        self,
        room_id: str,
        limit: int = 20,
        from_token: str = "",
    ) -> list[dict[str, Any]]:
        """Fetch recent Matrix room history using the live client."""
        if not self._client:
            return []
        limit = max(1, min(int(limit or 20), 100))
        try:
            direction = getattr(PaginationDirection, "BACKWARD", "b")
            if hasattr(self._client, "messages"):
                response = await self._client.messages(
                    RoomID(room_id),
                    from_token=SyncToken(from_token) if from_token else None,
                    direction=direction,
                    limit=limit,
                )
            elif hasattr(self._client, "get_messages"):
                response = await self._client.get_messages(
                    RoomID(room_id),
                    start=SyncToken(from_token) if from_token else None,
                    direction=direction,
                    limit=limit,
                )
            else:
                logger.debug("Matrix: client has no messages/get_messages method")
                return []
            chunk = getattr(response, "chunk", None)
            if chunk is None and isinstance(response, dict):
                chunk = response.get("chunk")
            return [self._serialize_history_event(evt) for evt in (chunk or [])]
        except Exception as exc:
            logger.warning("Matrix: fetch history error: %s", exc)
            return []

    def _serialize_history_event(self, event: Any) -> dict[str, Any]:
        content = getattr(event, "content", None)
        if content is None and isinstance(event, dict):
            content = event.get("content", {})
        if not isinstance(content, dict):
            content = dict(content) if hasattr(content, "items") else {}
        return {
            "event_id": str(
                getattr(event, "event_id", "")
                or (event.get("event_id", "") if isinstance(event, dict) else "")
            ),
            "sender": str(
                getattr(event, "sender", "")
                or (event.get("sender", "") if isinstance(event, dict) else "")
            ),
            "timestamp": _matrix_event_timestamp_seconds(event),
            "msgtype": str(content.get("msgtype", "")),
            "body": str(content.get("body", "")),
        }

    # ------------------------------------------------------------------
    # Presence
    # ------------------------------------------------------------------

    _VALID_PRESENCE_STATES = frozenset(("online", "offline", "unavailable"))

    async def set_presence(self, state: str = "online", status_msg: str = "") -> bool:
        """Set the bot's presence status."""
        if not self._client:
            return False
        if state not in self._VALID_PRESENCE_STATES:
            logger.warning("Matrix: invalid presence state %r", state)
            return False
        try:
            presence_map = {
                "online": PresenceState.ONLINE,
                "offline": PresenceState.OFFLINE,
                "unavailable": PresenceState.UNAVAILABLE,
            }
            await self._client.set_presence(
                presence=presence_map[state],
                status=status_msg or None,
            )
            logger.debug("Matrix: presence set to %s", state)
            return True
        except Exception as exc:
            logger.debug("Matrix: set_presence failed: %s", exc)
            return False

    # ------------------------------------------------------------------
    # Emote & notice message types
    # ------------------------------------------------------------------

    async def _send_simple_message(
        self,
        chat_id: str,
        text: str,
        msgtype: str,
    ) -> SendResult:
        """Send a simple message (emote, notice) with optional HTML formatting."""
        if not self._client or not text:
            return SendResult(success=False, error="No client or empty text")

        msg_content = self._build_text_message_content(text, msgtype=msgtype)

        try:
            event_id = await self._client.send_message_event(
                RoomID(chat_id),
                EventType.ROOM_MESSAGE,
                msg_content,
            )
            return SendResult(success=True, message_id=str(event_id))
        except Exception as exc:
            return SendResult(success=False, error=str(exc))

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _state_event_value(event: Any, key: str) -> Optional[str]:
        """Extract a simple value from a Matrix state event object or dict."""
        if event is None:
            return None
        value = getattr(event, key, None)
        if value:
            return str(value)
        if isinstance(event, dict):
            if event.get(key):
                return str(event[key])
            content = event.get("content")
            if isinstance(content, dict) and content.get(key):
                return str(content[key])
        content = getattr(event, "content", None)
        if isinstance(content, dict) and content.get(key):
            return str(content[key])
        if content is not None and getattr(content, key, None):
            return str(getattr(content, key))
        return None

    async def _get_room_member_count(self, room_id: str) -> Optional[int]:
        state_store = (
            getattr(self._client, "state_store", None) if self._client else None
        )
        if not state_store:
            return None
        try:
            members = await state_store.get_members(room_id)
        except Exception:
            return None
        if members is None:
            return None
        try:
            return len(members)
        except TypeError:
            return None

    async def _get_room_name(self, room_id: str) -> Optional[str]:
        if not self._client or not hasattr(self._client, "get_state_event"):
            return None
        try:
            event = await self._client.get_state_event(
                RoomID(room_id),
                "m.room.name",
            )
        except Exception:
            return None
        value = self._state_event_value(event, "name")
        return value.strip() if value and value.strip() else None

    async def _get_room_canonical_alias(self, room_id: str) -> Optional[str]:
        if not self._client or not hasattr(self._client, "get_state_event"):
            return None
        try:
            event = await self._client.get_state_event(
                RoomID(room_id),
                "m.room.canonical_alias",
            )
        except Exception:
            return None
        value = self._state_event_value(event, "alias")
        return value.strip() if value and value.strip() else None

    async def _get_room_topic(self, room_id: str) -> Optional[str]:
        if not self._client or not hasattr(self._client, "get_state_event"):
            return None
        try:
            event = await self._client.get_state_event(
                RoomID(room_id),
                "m.room.topic",
            )
        except Exception:
            return None
        value = self._state_event_value(event, "topic")
        return value.strip() if value and value.strip() else None

    @staticmethod
    def _room_server_name(room_id: str) -> Optional[str]:
        if ":" not in room_id:
            return None
        server = room_id.rsplit(":", 1)[-1].strip()
        return server or None

    def _cache_room_identity(self, room_id: str, identity: MatrixRoomIdentity) -> None:
        if len(self._room_identities) >= self._room_identity_cache_max:
            oldest = min(
                self._room_identity_cached_at,
                key=self._room_identity_cached_at.get,
                default=None,
            )
            if oldest:
                self._room_identities.pop(oldest, None)
                self._room_identity_cached_at.pop(oldest, None)
        self._room_identities[room_id] = identity
        self._room_identity_cached_at[room_id] = time.monotonic()

    async def _resolve_room_identity(
        self,
        room_id: str,
        *,
        force_refresh: bool = False,
    ) -> MatrixRoomIdentity:
        """Resolve Matrix room identity without member-count DM heuristics.

        Matrix ``m.direct`` account data is the authoritative DM signal, but
        explicitly named rooms win over stale/conflicting DM account data.
        """
        cached = self._room_identities.get(room_id)
        cached_at = self._room_identity_cached_at.get(room_id, 0.0)
        cache_fresh = (
            self._room_identity_ttl_seconds <= 0
            or time.monotonic() - cached_at <= self._room_identity_ttl_seconds
        )
        if cached is not None and cache_fresh and not force_refresh:
            return cached

        room_name = await self._get_room_name(room_id)
        room_topic = await self._get_room_topic(room_id)
        canonical_alias = await self._get_room_canonical_alias(room_id)
        member_count = await self._get_room_member_count(room_id)
        has_explicit_name = bool(room_name)
        is_direct = bool(self._dm_rooms.get(room_id, False))
        conflict = bool(is_direct and has_explicit_name)
        chat_type = "dm" if is_direct and not has_explicit_name else "room"
        display_name = room_name or canonical_alias or room_id

        identity = MatrixRoomIdentity(
            room_id=room_id,
            room_name=room_name,
            room_topic=room_topic,
            canonical_alias=canonical_alias,
            server_name=self._room_server_name(room_id),
            joined_member_count=member_count,
            is_direct_account_data=is_direct,
            display_name=display_name,
            has_explicit_name=has_explicit_name,
            chat_type=chat_type,
            conflict=conflict,
        )
        self._cache_room_identity(room_id, identity)
        return identity

    async def _is_dm_room(self, room_id: str) -> bool:
        """Check if a room is a DM."""
        return (await self._resolve_room_identity(room_id)).chat_type == "dm"

    async def _refresh_dm_cache(self) -> None:
        """Refresh the DM room cache from m.direct account data."""
        if not self._client:
            return

        dm_data: Optional[Dict] = None

        try:
            resp = await self._client.get_account_data("m.direct")
            if hasattr(resp, "content"):
                dm_data = resp.content
            elif isinstance(resp, dict):
                dm_data = resp
        except Exception as exc:
            logger.debug("Matrix: get_account_data('m.direct') failed: %s", exc)

        if dm_data is None:
            return

        dm_room_ids: Set[str] = set()
        for user_id, rooms in dm_data.items():
            if isinstance(rooms, list):
                dm_room_ids.update(str(r) for r in rooms if isinstance(r, str))

        self._dm_rooms = {rid: (rid in dm_room_ids) for rid in self._joined_rooms}
        self._room_identities.clear()
        self._room_identity_cached_at.clear()

    # ------------------------------------------------------------------
    # Mention detection helpers
    # ------------------------------------------------------------------

    def _build_text_message_content(self, text: str, msgtype: str = "m.text") -> Dict[str, Any]:
        """Build Matrix text content with HTML and outbound mention metadata."""
        msg_content: Dict[str, Any] = {"msgtype": msgtype, "body": text}
        mention_user_ids = self._extract_outbound_mentions(text)
        room_mentioned = self._allow_room_mentions and self._has_outbound_room_mention(text)
        if mention_user_ids:
            msg_content["m.mentions"] = {"user_ids": mention_user_ids}
        if room_mentioned:
            msg_content.setdefault("m.mentions", {})["room"] = True

        html_source = self._inject_outbound_mention_links(text)
        html = self._markdown_to_html(html_source)
        if html and html != text:
            msg_content["format"] = "org.matrix.custom.html"
            msg_content["formatted_body"] = html

        return msg_content

    def _apply_relation_metadata(
        self,
        msg_content: Dict[str, Any],
        *,
        reply_to: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Apply Matrix reply/thread relation metadata to an outbound payload."""
        thread_id = str((metadata or {}).get("thread_id") or "")
        if reply_to:
            msg_content["m.relates_to"] = {"m.in_reply_to": {"event_id": reply_to}}
        if thread_id:
            relates_to = msg_content.get("m.relates_to", {})
            relates_to["rel_type"] = "m.thread"
            relates_to["event_id"] = thread_id
            relates_to["is_falling_back"] = True
            # Matrix clients that do not render threads still use reply
            # fallback. If no explicit reply target is available, fall back
            # to the thread root.
            relates_to.setdefault(
                "m.in_reply_to",
                {"event_id": reply_to or thread_id},
            )
            msg_content["m.relates_to"] = relates_to

    def _extract_outbound_mentions(self, text: str) -> list[str]:
        """Return unique Matrix user IDs mentioned in outbound text."""
        protected, _ = self._protect_outbound_mention_regions(text)
        seen: Set[str] = set()
        mentions: list[str] = []
        for match in _OUTBOUND_MENTION_RE.finditer(protected):
            user_id = match.group(1)
            if user_id not in seen:
                seen.add(user_id)
                mentions.append(user_id)
        return mentions

    def _has_outbound_room_mention(self, text: str) -> bool:
        """Return True when outbound text contains @room outside protected spans."""
        protected, _ = self._protect_outbound_mention_regions(text)
        return bool(re.search(r"(?<![\w/])@room(?![\w:.-])", protected))

    def _inject_outbound_mention_links(self, text: str) -> str:
        """Wrap outbound Matrix mentions in markdown links outside code spans."""
        if not text:
            return text

        protected, placeholders = self._protect_outbound_mention_regions(text)

        linked = _OUTBOUND_MENTION_RE.sub(
            lambda match: f"[{match.group(1)}](https://matrix.to/#/{match.group(1)})",
            protected,
        )

        for idx, original in enumerate(placeholders):
            linked = linked.replace(f"\x00MENTION_PROTECTED{idx}\x00", original)

        return linked

    def _protect_outbound_mention_regions(self, text: str) -> tuple[str, list[str]]:
        """Protect markdown regions where outbound mentions should stay literal."""
        placeholders: list[str] = []

        def _protect(fragment: str) -> str:
            idx = len(placeholders)
            placeholders.append(fragment)
            return f"\x00MENTION_PROTECTED{idx}\x00"

        protected = re.sub(
            r"```[\s\S]*?```",
            lambda match: _protect(match.group(0)),
            text or "",
        )
        protected = re.sub(
            r"`[^`\n]+`",
            lambda match: _protect(match.group(0)),
            protected,
        )
        protected = re.sub(
            r"\[[^\]]+\]\([^)]+\)",
            lambda match: _protect(match.group(0)),
            protected,
        )

        return protected, placeholders

    def _is_bot_mentioned(
        self,
        body: str,
        formatted_body: Optional[str] = None,
        mention_user_ids: Optional[list] = None,
    ) -> bool:
        """Return True if the bot is mentioned in the message.

        Per MSC3952, ``m.mentions.user_ids`` is the authoritative mention
        signal in the Matrix spec.  When the sender's client populates that
        field with the bot's user-id, we trust it — even when the visible
        body text does not contain an explicit ``@bot`` string (some clients
        only render mention "pills" in ``formatted_body`` or use display
        names).
        """
        # m.mentions.user_ids — authoritative per MSC3952 / Matrix v1.7.
        if mention_user_ids and self._user_id and self._user_id in mention_user_ids:
            return True
        if not body and not formatted_body:
            return False
        if self._user_id and self._user_id in body:
            return True
        if self._user_id and ":" in self._user_id:
            localpart = self._user_id.split(":")[0].lstrip("@")
            if localpart and re.search(
                r"\b" + re.escape(localpart) + r"\b", body, re.IGNORECASE
            ):
                return True
        if formatted_body and self._user_id:
            if f"matrix.to/#/{self._user_id}" in formatted_body:
                return True
        return False

    def _strip_mention(self, body: str) -> str:
        """Remove explicit bot mentions from message body.

        Important: only strip explicit mention tokens (``@user:server`` or
        ``@localpart``). Do NOT strip bare words matching the bot localpart,
        otherwise normal phrases like "Hermes Agent" become "Agent".
        """
        if not body:
            return ""

        # Strip explicit full MXID mentions.
        if self._user_id:
            body = body.replace(self._user_id, "")

        # Strip explicit @localpart mentions only (not bare localpart words).
        if self._user_id and ":" in self._user_id:
            localpart = self._user_id.split(":")[0].lstrip("@")
            if localpart:
                body = re.sub(
                    r'(?<![\w])@' + re.escape(localpart) + r'\b',
                    '',
                    body,
                    flags=re.IGNORECASE,
                )

        # Normalize spacing after mention removal.
        body = re.sub(r'[ \t]{2,}', ' ', body)
        body = re.sub(r'\s+([,.;:!?])', r'\1', body)
        return body.strip()

    async def _get_display_name(self, room_id: str, user_id: str) -> str:
        """Get a user's display name in a room, falling back to user_id."""
        state_store = (
            getattr(self._client, "state_store", None) if self._client else None
        )
        if state_store:
            try:
                member = await state_store.get_member(room_id, user_id)
                if member and getattr(member, "displayname", None):
                    return member.displayname
            except Exception:
                pass
        # Strip the @...:server format to just the localpart.
        if user_id.startswith("@") and ":" in user_id:
            return user_id[1:].split(":")[0]
        return user_id

    def _mxc_to_http(self, mxc_url: str) -> str:
        """Convert mxc://server/media_id to an HTTP download URL."""
        if not mxc_url.startswith("mxc://"):
            return mxc_url
        parts = mxc_url[6:]  # strip mxc://
        return f"{self._homeserver}/_matrix/client/v1/media/download/{parts}"

    def _markdown_to_html(self, text: str) -> str:
        """Convert Markdown to Matrix-compatible HTML (org.matrix.custom.html).

        Uses the ``markdown`` library (a core dependency) when available.
        Falls back to a comprehensive regex converter that handles fenced
        code blocks, inline code, headers, bold, italic, strikethrough,
        links, blockquotes, lists, and horizontal rules — everything the
        Matrix HTML spec allows.
        """
        text = _pre_sanitize_matrix_markdown(text)
        try:
            import markdown as _md

            md = _md.Markdown(
                extensions=["fenced_code", "tables", "nl2br", "sane_lists"],
            )
            if "html_block" in md.preprocessors:
                md.preprocessors.deregister("html_block")

            html = md.convert(text)
            md.reset()

            if html.count("<p>") == 1:
                html = html.replace("<p>", "").replace("</p>", "")
            return _sanitize_matrix_html(html)
        except ImportError:
            pass

        return _sanitize_matrix_html(self._markdown_to_html_fallback(text))

    # ------------------------------------------------------------------
    # Regex-based Markdown -> HTML (no extra dependencies)
    # ------------------------------------------------------------------

    @staticmethod
    def _sanitize_link_url(url: str) -> str:
        """Sanitize a URL for use in an href attribute."""
        stripped = url.strip()
        scheme = stripped.split(":", 1)[0].lower().strip() if ":" in stripped else ""
        if scheme in {"javascript", "data", "vbscript"}:
            return ""
        return stripped.replace('"', "&quot;")

    @staticmethod
    def _markdown_to_html_fallback(text: str) -> str:
        """Comprehensive regex Markdown-to-HTML for Matrix."""
        placeholders: list = []

        def _protect_html(html_fragment: str) -> str:
            idx = len(placeholders)
            placeholders.append(html_fragment)
            return f"\x00PROTECTED{idx}\x00"

        # Fenced code blocks: ```lang\n...\n```
        result = re.sub(
            r"```(\w*)\n(.*?)```",
            lambda m: _protect_html(
                f'<pre><code class="language-{_html_escape(m.group(1))}">'
                f"{_html_escape(m.group(2))}</code></pre>"
                if m.group(1)
                else f"<pre><code>{_html_escape(m.group(2))}</code></pre>"
            ),
            text,
            flags=re.DOTALL,
        )

        # Inline code: `code`
        result = re.sub(
            r"`([^`\n]+)`",
            lambda m: _protect_html(f"<code>{_html_escape(m.group(1))}</code>"),
            result,
        )

        # Extract and protect markdown links before escaping.
        result = re.sub(
            r"\[([^\]]+)\]\(([^)]+)\)",
            lambda m: _protect_html(
                '<a href="{}">{}</a>'.format(
                    MatrixAdapter._sanitize_link_url(m.group(2)),
                    _html_escape(m.group(1)),
                )
            ),
            result,
        )

        # HTML-escape remaining text.
        parts = re.split(r"(\x00PROTECTED\d+\x00)", result)
        for idx, part in enumerate(parts):
            if not part.startswith("\x00PROTECTED"):
                parts[idx] = _html_escape(part)
        result = "".join(parts)

        # Block-level transforms (line-oriented).
        lines = result.split("\n")
        out_lines: list = []
        i = 0
        while i < len(lines):
            line = lines[i]

            # Horizontal rule
            if re.match(r"^[\s]*([-*_])\s*\1\s*\1[\s\-*_]*$", line):
                out_lines.append("<hr>")
                i += 1
                continue

            # Headers
            hdr = re.match(r"^(#{1,6})\s+(.+)$", line)
            if hdr:
                level = len(hdr.group(1))
                out_lines.append(f"<h{level}>{hdr.group(2).strip()}</h{level}>")
                i += 1
                continue

            # Blockquote
            if (
                line.startswith("&gt; ")
                or line == "&gt;"
                or line.startswith("> ")
                or line == ">"
            ):
                bq_lines = []
                while i < len(lines) and (
                    lines[i].startswith("&gt; ")
                    or lines[i] == "&gt;"
                    or lines[i].startswith("> ")
                    or lines[i] == ">"
                ):
                    ln = lines[i]
                    if ln.startswith("&gt; "):
                        bq_lines.append(ln[5:])
                    elif ln.startswith("> "):
                        bq_lines.append(ln[2:])
                    else:
                        bq_lines.append("")
                    i += 1
                out_lines.append(f"<blockquote>{'<br>'.join(bq_lines)}</blockquote>")
                continue

            # Unordered list
            ul_match = re.match(r"^[\s]*[-*+]\s+(.+)$", line)
            if ul_match:
                items = []
                while i < len(lines) and re.match(r"^[\s]*[-*+]\s+(.+)$", lines[i]):
                    items.append(re.match(r"^[\s]*[-*+]\s+(.+)$", lines[i]).group(1))
                    i += 1
                li = "".join(f"<li>{item}</li>" for item in items)
                out_lines.append(f"<ul>{li}</ul>")
                continue

            # Ordered list
            ol_match = re.match(r"^[\s]*\d+[.)]\s+(.+)$", line)
            if ol_match:
                items = []
                while i < len(lines) and re.match(r"^[\s]*\d+[.)]\s+(.+)$", lines[i]):
                    items.append(re.match(r"^[\s]*\d+[.)]\s+(.+)$", lines[i]).group(1))
                    i += 1
                li = "".join(f"<li>{item}</li>" for item in items)
                out_lines.append(f"<ol>{li}</ol>")
                continue

            out_lines.append(line)
            i += 1

        result = "\n".join(out_lines)

        # Inline transforms.
        result = re.sub(
            r"\*\*(.+?)\*\*", r"<strong>\1</strong>", result, flags=re.DOTALL
        )
        result = re.sub(r"__(.+?)__", r"<strong>\1</strong>", result, flags=re.DOTALL)
        result = re.sub(r"\*(.+?)\*", r"<em>\1</em>", result, flags=re.DOTALL)
        result = re.sub(
            r"(?<!\w)_(.+?)_(?!\w)", r"<em>\1</em>", result, flags=re.DOTALL
        )
        result = re.sub(r"~~(.+?)~~", r"<del>\1</del>", result, flags=re.DOTALL)
        result = re.sub(r"\n", "<br>\n", result)
        result = re.sub(
            r"<br>\n(</?(?:pre|blockquote|h[1-6]|ul|ol|li|hr))", r"\n\1", result
        )
        result = re.sub(r"(</(?:pre|blockquote|h[1-6]|ul|ol|li)>)<br>", r"\1", result)

        # Restore protected regions.
        for idx, original in enumerate(placeholders):
            result = result.replace(f"\x00PROTECTED{idx}\x00", original)

        return result
