"""Tests for config-driven platform access policies at the gateway layer.

Background (#34515): WeCom, Weixin, Yuanbao, QQBot, and WhatsApp expose a
documented config-driven access surface (``dm_policy`` / ``group_policy`` /
``allow_from`` / ``group_allow_from`` in ``PlatformConfig.extra``) and enforce
it at intake —
a message is dropped inside the adapter and never reaches the gateway unless it
already passed that policy.

The gateway's env-based allowlist check (``_is_user_authorized``) runs *after*
the adapter. Before the fix it fell through to an env-only default-deny when no
``PLATFORM_ALLOWED_USERS`` env var was set, silently rejecting ``dm_policy:
open`` and config-only allowlists even though the adapter had already
authorized the sender.

The fix is a single drift-proof contract: adapters that own their access policy
declare ``enforces_own_access_policy`` (a ``BasePlatformAdapter`` property,
default ``False``). The gateway trusts that flag and skips the env-only
default-deny for those platforms, rather than re-implementing each adapter's
policy logic a second time.
"""

from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock

import pytest

from gateway.config import GatewayConfig, Platform, PlatformConfig
from gateway.session import SessionSource


# Platforms whose adapters own their access policy at intake.
_OWN_POLICY_PLATFORMS = [
    Platform.WECOM,
    Platform.WEIXIN,
    Platform.YUANBAO,
    Platform.QQBOT,
    Platform.WHATSAPP,
]


def _clear_auth_env(monkeypatch) -> None:
    for key in (
        "WECOM_ALLOWED_USERS",
        "WEIXIN_ALLOWED_USERS",
        "YUANBAO_ALLOWED_USERS",
        "QQ_ALLOWED_USERS",
        "QQ_GROUP_ALLOWED_USERS",
        "WHATSAPP_ALLOWED_USERS",
        "TELEGRAM_ALLOWED_USERS",
        "GATEWAY_ALLOWED_USERS",
        "GATEWAY_ALLOW_ALL_USERS",
        "WECOM_ALLOW_ALL_USERS",
        "WEIXIN_ALLOW_ALL_USERS",
        "YUANBAO_ALLOW_ALL_USERS",
        "QQ_ALLOW_ALL_USERS",
        "WHATSAPP_ALLOW_ALL_USERS",
    ):
        monkeypatch.delenv(key, raising=False)


def _make_runner(platform: Platform, config: GatewayConfig, *, enforces: bool):
    """Build a bare GatewayRunner with one adapter for *platform*.

    ``enforces`` controls whether the adapter declares
    ``enforces_own_access_policy`` — i.e. whether it owns its access gate.
    """
    from gateway.run import GatewayRunner

    runner = object.__new__(GatewayRunner)
    runner.config = config
    adapter = SimpleNamespace(send=AsyncMock(), enforces_own_access_policy=enforces)
    runner.adapters = {platform: adapter}
    runner.pairing_store = MagicMock()
    runner.pairing_store.is_approved.return_value = False
    runner.pairing_store._is_rate_limited.return_value = False
    return runner, adapter


def _source(platform: Platform, *, chat_type: str = "dm") -> SessionSource:
    return SessionSource(
        platform=platform,
        user_id="some-user",
        chat_id="some-chat",
        user_name="tester",
        chat_type=chat_type,
    )


# ---------------------------------------------------------------------------
# Layer 1: the base-class contract and per-adapter overrides
# ---------------------------------------------------------------------------


def test_base_adapter_defaults_to_not_owning_access_policy():
    """Adapters that don't override the property delegate to the gateway."""
    from gateway.platforms.base import BasePlatformAdapter

    # The default lives on the base property descriptor.
    assert BasePlatformAdapter.enforces_own_access_policy.fget(object()) is False


@pytest.mark.parametrize(
    "module_path, class_name",
    [
        ("gateway.platforms.wecom", "WeComAdapter"),
        ("gateway.platforms.weixin", "WeixinAdapter"),
        ("gateway.platforms.yuanbao", "YuanbaoAdapter"),
        ("gateway.platforms.qqbot.adapter", "QQAdapter"),
        ("gateway.platforms.whatsapp", "WhatsAppAdapter"),
    ],
)
def test_own_policy_adapters_declare_the_flag(module_path, class_name):
    """The config-policy adapters override the flag to True."""
    import importlib

    module = importlib.import_module(module_path)
    adapter_cls = getattr(module, class_name)
    # Property is overridden on the subclass and returns True regardless of
    # instance state (it reflects a static capability, not runtime config).
    value = adapter_cls.enforces_own_access_policy.fget(object.__new__(adapter_cls))
    assert value is True


# ---------------------------------------------------------------------------
# Layer 2: gateway trusts the adapter-enforced flag
# ---------------------------------------------------------------------------


@pytest.mark.parametrize("platform", _OWN_POLICY_PLATFORMS)
def test_own_policy_platform_authorized_without_env_allowlist(monkeypatch, platform):
    """A message reaching the gateway from an own-policy adapter is trusted.

    With no env allowlist set, the gateway must NOT default-deny — the adapter
    already authorized the sender at intake (e.g. ``dm_policy: open``).
    """
    _clear_auth_env(monkeypatch)
    config = GatewayConfig(
        platforms={platform: PlatformConfig(enabled=True, extra={"dm_policy": "open"})}
    )
    runner, _adapter = _make_runner(platform, config, enforces=True)

    assert runner._is_user_authorized(_source(platform)) is True


@pytest.mark.parametrize("platform", _OWN_POLICY_PLATFORMS)
def test_own_policy_platform_authorized_for_group_chat(monkeypatch, platform):
    """Group traffic from an own-policy adapter is trusted the same way."""
    _clear_auth_env(monkeypatch)
    config = GatewayConfig(
        platforms={platform: PlatformConfig(enabled=True, extra={"group_policy": "open"})}
    )
    runner, _adapter = _make_runner(platform, config, enforces=True)

    assert runner._is_user_authorized(_source(platform, chat_type="group")) is True


def test_non_owning_platform_still_default_denies(monkeypatch):
    """Adapters that don't own their policy keep the env-only default-deny."""
    _clear_auth_env(monkeypatch)
    config = GatewayConfig(
        platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="t")}
    )
    runner, _adapter = _make_runner(Platform.TELEGRAM, config, enforces=False)

    assert runner._is_user_authorized(_source(Platform.TELEGRAM)) is False


def test_env_allowlist_still_takes_precedence_for_own_policy_platform(monkeypatch):
    """When an env allowlist IS set, it governs — adapter trust is a fallback.

    The adapter-trust branch only fires when no env allowlist exists, so an
    operator who sets ``WECOM_ALLOWED_USERS`` still gets env-based gating and
    a non-listed user is denied.
    """
    _clear_auth_env(monkeypatch)
    monkeypatch.setenv("WECOM_ALLOWED_USERS", "allowed-user")
    config = GatewayConfig(
        platforms={Platform.WECOM: PlatformConfig(enabled=True, extra={"dm_policy": "open"})}
    )
    runner, _adapter = _make_runner(Platform.WECOM, config, enforces=True)

    listed = SessionSource(
        platform=Platform.WECOM, user_id="allowed-user", chat_id="c",
        user_name="t", chat_type="dm",
    )
    stranger = SessionSource(
        platform=Platform.WECOM, user_id="stranger", chat_id="c",
        user_name="t", chat_type="dm",
    )
    assert runner._is_user_authorized(listed) is True
    assert runner._is_user_authorized(stranger) is False


def test_unknown_adapter_does_not_crash_trust_check(monkeypatch):
    """No adapter registered for the platform → safe default-deny."""
    _clear_auth_env(monkeypatch)
    config = GatewayConfig(platforms={Platform.WECOM: PlatformConfig(enabled=True)})
    runner, _adapter = _make_runner(Platform.WECOM, config, enforces=True)
    runner.adapters = {}  # nothing registered

    assert runner._adapter_enforces_own_access_policy(Platform.WECOM) is False
    assert runner._is_user_authorized(_source(Platform.WECOM)) is False


# ---------------------------------------------------------------------------
# Layer 2b: `dm_policy: pairing` is NOT blanket-trusted
# ---------------------------------------------------------------------------
#
# Regression: WeCom/Weixin document ``dm_policy: pairing`` and declare
# ``enforces_own_access_policy=True``, but their intake helper only special-cases
# ``disabled`` / ``allowlist`` — ``pairing`` falls through and forwards the DM so
# the gateway can run its pairing handshake. With no env allowlist, the
# adapter-trust shortcut above then authorized *every* unpaired sender, silently
# degrading pairing mode to open access. The shortcut must skip pairing-mode DMs
# so an unpaired sender falls through to default-deny (and gets a pairing code).


@pytest.mark.parametrize("platform", [Platform.WECOM, Platform.WEIXIN])
def test_pairing_dm_policy_not_blanket_authorized(monkeypatch, platform):
    """An unpaired sender in ``dm_policy: pairing`` is NOT authorized."""
    _clear_auth_env(monkeypatch)
    config = GatewayConfig(
        platforms={platform: PlatformConfig(enabled=True, extra={"dm_policy": "pairing"})}
    )
    runner, _adapter = _make_runner(platform, config, enforces=True)
    # pairing_store.is_approved already returns False (set in _make_runner).

    assert runner._is_user_authorized(_source(platform)) is False


def test_pairing_dm_policy_authorizes_paired_user(monkeypatch):
    """Once approved in the pairing store, the sender authorizes normally."""
    _clear_auth_env(monkeypatch)
    config = GatewayConfig(
        platforms={Platform.WECOM: PlatformConfig(enabled=True, extra={"dm_policy": "pairing"})}
    )
    runner, _adapter = _make_runner(Platform.WECOM, config, enforces=True)
    runner.pairing_store.is_approved.return_value = True

    assert runner._is_user_authorized(_source(Platform.WECOM)) is True


def test_pairing_carveout_reads_adapter_when_env_set(monkeypatch):
    """Env-only ``WECOM_DM_POLICY=pairing`` (absent from config.extra) is honored.

    The adapter resolves ``dm_policy`` from the env var, so its ``_dm_policy`` is
    authoritative even when ``config.extra`` is empty. The carve-out must read
    that, not just config.
    """
    _clear_auth_env(monkeypatch)
    config = GatewayConfig(
        platforms={Platform.WECOM: PlatformConfig(enabled=True, extra={})}
    )
    runner, adapter = _make_runner(Platform.WECOM, config, enforces=True)
    adapter._dm_policy = "pairing"  # as the adapter would resolve from the env var

    assert runner._is_user_authorized(_source(Platform.WECOM)) is False


def test_pairing_dm_policy_group_chat_still_trusted(monkeypatch):
    """Pairing is DM-only — group traffic keeps the adapter-trust path."""
    _clear_auth_env(monkeypatch)
    config = GatewayConfig(
        platforms={
            Platform.WECOM: PlatformConfig(
                enabled=True, extra={"dm_policy": "pairing", "group_policy": "open"}
            )
        }
    )
    runner, _adapter = _make_runner(Platform.WECOM, config, enforces=True)

    assert runner._is_user_authorized(_source(Platform.WECOM, chat_type="group")) is True


# ---------------------------------------------------------------------------
# Layer 3: unauthorized-DM behavior reads config dm_policy
# ---------------------------------------------------------------------------


@pytest.mark.parametrize(
    "dm_policy, expected",
    [
        ("allowlist", "ignore"),
        ("disabled", "ignore"),
        ("pairing", "pair"),
    ],
)
def test_unauthorized_dm_behavior_follows_config_dm_policy(monkeypatch, dm_policy, expected):
    """A restrictive dm_policy drops unauthorized DMs; pairing opts back in."""
    _clear_auth_env(monkeypatch)
    config = GatewayConfig(
        platforms={Platform.WECOM: PlatformConfig(enabled=True, extra={"dm_policy": dm_policy})}
    )
    runner, _adapter = _make_runner(Platform.WECOM, config, enforces=True)

    assert runner._get_unauthorized_dm_behavior(Platform.WECOM) == expected


def test_unauthorized_dm_behavior_open_policy_keeps_default(monkeypatch):
    """``dm_policy: open`` is not restrictive → falls through to the default."""
    _clear_auth_env(monkeypatch)
    config = GatewayConfig(
        platforms={Platform.WECOM: PlatformConfig(enabled=True, extra={"dm_policy": "open"})}
    )
    runner, _adapter = _make_runner(Platform.WECOM, config, enforces=True)

    # No allowlist + no restrictive policy → open-gateway pairing default.
    assert runner._get_unauthorized_dm_behavior(Platform.WECOM) == "pair"
