master
.Upgrading starlette>=0.15.0 breaks current testing strategy. The setup is mocking a nats subscription by actually using the nats server.
The code works with starlette 0.14.2, upgradign to 0.15.0 gives RunTumeError got Future <Future pending> attached to a different loop
. When upgrading to starlette 0.16.0 it gives TimeOut errors. I would love to keep tests sync.
requirements.txt
starlette
requests
pytest
asyncio-nats-client
code
from starlette.routing import Route
from starlette.testclient import TestClient
import pytest
from starlette.applications import Starlette
from starlette.responses import PlainTextResponse
import asyncio
from nats.aio.client import Client as NATS
"""
Test work with starlette 0.14.2
Error with starlette 0.15.0: RunTimeError: got Future <Future pending> attached to a different loop
Error with starlette 0.16.0: Nats timeout
The test is that the client code makes a nats request to a mocked nats service over nats itself.
Requirement a running nats server `docker run -d -p 4222:4222 nats:latest`
"""
HOST_NATS = "localhost:4222"
# =======================================================================
# CODE
# =======================================================================
def create_app():
async def index(request):
r = await request.app.state.nc.request("subject1", timeout=1, payload=b"PING")
return PlainTextResponse(content=r.data.decode())
async def setup() -> None:
await app.state.nc.connect(HOST_NATS)
print("Connected to nats")
app = Starlette(debug=True, routes=[Route('/', index)], on_startup=[setup])
app.state.nc: NATS = NATS()
return app
app = create_app()
# =======================================================================
# MOCKS & TESTS
# =======================================================================
class NatsServiceMock:
def __init__(self) -> None:
self.nc: NATS = NATS()
async def lifespan(self) -> None:
await self.nc.connect(HOST_NATS)
await self.nc.subscribe("subject1", cb=self.handle)
async def handle(self, msg):
await self.nc.publish(msg.reply, b"PONG")
def __enter__(self):
loop = asyncio.get_event_loop()
loop.run_until_complete(self.lifespan())
return self
def __exit__(self, *args) -> None:
pass
@pytest.fixture(scope="session")
def nats_service() -> None:
with NatsServiceMock() as nc:
yield nc
@pytest.fixture(scope="session")
def test_app(nats_service) -> None:
with TestClient(create_app()) as client:
yield client
# =======================================================================
# TESTS
# =======================================================================
def test_index_should_give_a_succesful_response(test_app):
r = test_app.get("/")
assert r.status_code == 200
assert r.text == "PONG"
Run:
pytest <file>
The test to work.
Test does not work.
output running with starlette 0.15.0:
try:
# wait until the future completes or the timeout
try:
> await waiter
E RuntimeError: Task <Task pending name='anyio.from_thread.BlockingPortal._call_func' coro=<BlockingPortal._call_func() running at /home/sevaho/.local/share/virtualenvs/testje-KTUsWEz0/lib/python3.8/site-packages/anyio/from_thread.py:177> cb=[TaskGroup._spawn.<locals>.task_done() at /home/sevaho/.local/share/virtualenvs/testje-KTUsWEz0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py:622]> got Future <Future pending> attached to a different loop
/usr/lib/python3.8/asyncio/tasks.py:481: RuntimeError
output when running with starlette 0.16.0:
# Wait for the response or give up on timeout.
try:
msg = await asyncio.wait_for(future, timeout, loop=self._loop)
return msg
except asyncio.TimeoutError:
del self._resp_map[token.decode()]
future.cancel()
> raise ErrTimeout
E nats.aio.errors.ErrTimeout: nats: Timeout
/home/sevaho/.local/share/virtualenvs/testje-KTUsWEz0/lib/python3.8/site-packages/nats/aio/client.py:945: ErrTimeout
Pay now to fund the work behind this issue.
Get updates on progress being made.
Maintainer is rewarded once the issue is completed.
You're funding impactful open source efforts
You want to contribute to this effort
You want to get funding like this too