"""Tests for cross-loop client cache isolation fix (#2681).

Verifies that _get_cached_client() returns different AsyncOpenAI clients
when called from different event loops, preventing the httpx deadlock
that occurs when a cached async client bound to loop A is reused on loop B.

This test file is self-contained and does not import the full tool chain,
so it can run without optional dependencies like firecrawl.
"""

import asyncio
import threading
from unittest.mock import patch, MagicMock

import pytest


# ---------------------------------------------------------------------------
# Minimal stubs so we can import _get_cached_client without the full tree
# ---------------------------------------------------------------------------

def _stub_resolve_provider_client(provider, model, async_mode, **kw):
    """Return a unique mock client each time, simulating AsyncOpenAI creation."""
    client = MagicMock(name=f"client-{provider}-async={async_mode}")
    client.api_key = "test"
    client.base_url = kw.get("explicit_base_url", "http://localhost:8081/v1")
    return client, model or "test-model"


@pytest.fixture(autouse=True)
def _clean_client_cache():
    """Clear the client cache before each test."""
    # We need to patch before importing
    with patch.dict("sys.modules", {}):
        pass
    # Import and clear
    import agent.auxiliary_client as ac
    ac._client_cache.clear()
    yield
    ac._client_cache.clear()


class TestCrossLoopCacheIsolation:
    """Verify async clients are cached per-event-loop, not globally."""

    def test_same_loop_reuses_client(self):
        """Within a single event loop, the same client should be returned."""
        from agent.auxiliary_client import _get_cached_client

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        with patch("agent.auxiliary_client.resolve_provider_client",
                    side_effect=_stub_resolve_provider_client):
            client1, _ = _get_cached_client("custom", "m1", async_mode=True,
                                             base_url="http://localhost:8081/v1")
            client2, _ = _get_cached_client("custom", "m1", async_mode=True,
                                             base_url="http://localhost:8081/v1")

        assert client1 is client2, (
            "Same loop should return the same cached client"
        )
        loop.close()

    def test_different_loops_get_different_clients(self):
        """Different event loops must get separate client instances."""
        from agent.auxiliary_client import _get_cached_client

        results = {}

        def _get_client_on_new_loop(name):
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            with patch("agent.auxiliary_client.resolve_provider_client",
                        side_effect=_stub_resolve_provider_client):
                client, _ = _get_cached_client("custom", "m1", async_mode=True,
                                                 base_url="http://localhost:8081/v1")
            results[name] = (id(client), id(loop))
            # Don't close loop — simulates real usage where loops persist

        t1 = threading.Thread(target=_get_client_on_new_loop, args=("a",))
        t2 = threading.Thread(target=_get_client_on_new_loop, args=("b",))
        t1.start(); t1.join()
        t2.start(); t2.join()

        client_id_a, loop_id_a = results["a"]
        client_id_b, loop_id_b = results["b"]

        assert loop_id_a != loop_id_b, "Test setup error: same loop on both threads"
        assert client_id_a != client_id_b, (
            "Different event loops got the SAME cached client — this causes "
            "httpx cross-loop deadlocks in gateway mode (#2681)"
        )

    def test_sync_clients_not_affected(self):
        """Sync clients (async_mode=False) should still be cached globally,
        since httpx.Client (sync) doesn't bind to an event loop."""
        from agent.auxiliary_client import _get_cached_client

        results = {}

        def _get_sync_client(name):
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            with patch("agent.auxiliary_client.resolve_provider_client",
                        side_effect=_stub_resolve_provider_client):
                client, _ = _get_cached_client("custom", "m1", async_mode=False,
                                                 base_url="http://localhost:8081/v1")
            results[name] = id(client)

        t1 = threading.Thread(target=_get_sync_client, args=("a",))
        t2 = threading.Thread(target=_get_sync_client, args=("b",))
        t1.start(); t1.join()
        t2.start(); t2.join()

        assert results["a"] == results["b"], (
            "Sync clients should be shared across threads (no loop binding)"
        )

    def test_gateway_simulation_no_deadlock(self):
        """Simulate gateway mode: _run_async spawns a thread with asyncio.run(),
        which creates a new loop. The cached client must be created on THAT loop,
        not reused from a different one."""
        from agent.auxiliary_client import _get_cached_client

        # Simulate: first call on "gateway loop"
        gateway_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(gateway_loop)

        with patch("agent.auxiliary_client.resolve_provider_client",
                    side_effect=_stub_resolve_provider_client):
            gateway_client, _ = _get_cached_client("custom", "m1", async_mode=True,
                                                     base_url="http://localhost:8081/v1")

        # Simulate: _run_async spawns a thread with asyncio.run()
        worker_client_id = [None]
        def _worker():
            async def _inner():
                with patch("agent.auxiliary_client.resolve_provider_client",
                            side_effect=_stub_resolve_provider_client):
                    client, _ = _get_cached_client("custom", "m1", async_mode=True,
                                                     base_url="http://localhost:8081/v1")
                worker_client_id[0] = id(client)
            asyncio.run(_inner())

        t = threading.Thread(target=_worker)
        t.start()
        t.join()

        assert worker_client_id[0] != id(gateway_client), (
            "Worker thread (asyncio.run) got the gateway's cached client — "
            "this is the exact cross-loop scenario that causes httpx deadlocks. "
            "The cache key must include the event loop identity (#2681)"
        )
        gateway_loop.close()

    def test_closed_loop_client_discarded(self):
        """A cached client whose loop has closed should be replaced."""
        from agent.auxiliary_client import _get_cached_client

        loop1 = asyncio.new_event_loop()
        asyncio.set_event_loop(loop1)

        with patch("agent.auxiliary_client.resolve_provider_client",
                    side_effect=_stub_resolve_provider_client):
            client1, _ = _get_cached_client("custom", "m1", async_mode=True,
                                             base_url="http://localhost:8081/v1")

        loop1.close()

        # New loop on same thread
        loop2 = asyncio.new_event_loop()
        asyncio.set_event_loop(loop2)

        with patch("agent.auxiliary_client.resolve_provider_client",
                    side_effect=_stub_resolve_provider_client):
            client2, _ = _get_cached_client("custom", "m1", async_mode=True,
                                             base_url="http://localhost:8081/v1")

        assert client1 is not client2, (
            "Client from closed loop should not be reused"
        )
        loop2.close()
