I'm working in a codebase where we use DataLoaders quite heavily with an SQLAlchemy-based database client. Often, we'd like to await two or more `load()` futures. We use FastAPI and rely on its dependency injection to manage the SQLAlchemy session, which is forcibly closed at the end of a request. Unfortunately, we're currently hitting a problem where using `asyncio.gather()` or even `asyncio.TaskGroup()` results in hard-to-debug errors: the dataloader task continues to run even if the `load()` future is cancelled.
import asyncio
from typing import List

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

async with AsyncSession(db_engine) as session:
    async def user_loader_fn(ids: List[int]) -> List[User]:
        result = await session.scalars(select(User).where(User.id.in_(ids)))
        return reorder_by_ids(result.all(), ids)

    user_dataloader = DataLoader(user_loader_fn)

    async def load_user(id):
        return await user_dataloader.load(id)

    async def load_post(id):
        raise ValueError("Post not found")

    async with asyncio.TaskGroup() as tg:
        # load_post raises before t2 is done (or even properly started), so t2 gets cancelled
        t1 = tg.create_task(load_post(1))
        t2 = tg.create_task(load_user(2))
# the error from `load_post` will bubble up, closing the session, but user_dataloader will still try to use the session somewhere in the background
To summarise, there's no way to cancel or wait for the underlying dataloader task, making it impossible to cleanly close the resources used by the dataloader.
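The problem summarised above can be reproduced with the standard library alone. The sketch below is a deliberately simplified illustration, not the real DataLoader: `FakeSession` and `TinyLoader` are hypothetical stand-ins, and the only assumption carried over is that `load()` returns a future which a separately scheduled background task resolves later.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for a database session; records use after close."""

    def __init__(self):
        self.closed = False
        self.used_after_close = False

    async def fetch(self, ids):
        await asyncio.sleep(0.01)  # simulate a database round trip
        if self.closed:
            self.used_after_close = True
        return [f"user-{i}" for i in ids]


class TinyLoader:
    """Hypothetical, simplified loader: a background task resolves the future."""

    def __init__(self, session):
        self.session = session
        self.tasks = []

    def load(self, key):
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        # The background task is independent of the future handed to the caller.
        self.tasks.append(loop.create_task(self._dispatch(key, future)))
        return future

    async def _dispatch(self, key, future):
        (value,) = await self.session.fetch([key])
        if not future.cancelled():
            future.set_result(value)


async def demo():
    session = FakeSession()
    loader = TinyLoader(session)

    async def load_user():
        return await loader.load(2)

    caller = asyncio.create_task(load_user())
    await asyncio.sleep(0)  # let load_user start and await the future
    caller.cancel()  # cancelling the caller also cancels the awaited future
    try:
        await caller
    except asyncio.CancelledError:
        pass

    session.closed = True  # the request ends and the session is closed
    await asyncio.gather(*loader.tasks)  # the background task is still running
    return session.used_after_close


print(asyncio.run(demo()))
```

Cancelling `caller` cancels the future it awaits, but nothing propagates that cancellation to the task created in `load()`, so the fetch still runs against the closed session.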
I've got a branch (diff) with a test case that shows this problem.
I've also got a very speculative branch that addresses this issue and improves general handling of task cancellations in the DataLoader. It's not 100% done yet. I'll try to finish it & open a PR in the next few days.
Somewhat like #3414, this issue is about keeping track of tasks/futures and cleaning up resources properly.
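To make "keeping track of tasks" concrete, here is one possible shape for it using only `asyncio`. This is a sketch under assumptions, not the approach taken in the branch mentioned above and not an existing DataLoader API: `TrackedLoader` and its `aclose()` method are hypothetical names invented for illustration.

```python
import asyncio


class TrackedLoader:
    """Hypothetical loader that remembers its background tasks for cleanup."""

    def __init__(self, load_fn):
        self._load_fn = load_fn
        self._tasks = set()

    def load(self, key):
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        task = loop.create_task(self._dispatch(key, future))
        self._tasks.add(task)
        # Forget the task once it finishes so the set does not grow forever.
        task.add_done_callback(self._tasks.discard)
        return future

    async def _dispatch(self, key, future):
        try:
            (value,) = await self._load_fn([key])
        except asyncio.CancelledError:
            future.cancel()  # propagate cancellation to anyone still waiting
            raise
        except Exception as exc:
            if not future.done():
                future.set_exception(exc)
            return
        if not future.done():
            future.set_result(value)

    async def aclose(self):
        """Cancel outstanding background tasks and wait until they have stopped."""
        tasks = list(self._tasks)
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def demo():
    calls = []

    async def load_fn(keys):
        calls.append("start")
        await asyncio.sleep(0.05)  # simulate a slow query
        calls.append("finish")
        return [f"user-{k}" for k in keys]

    loader = TrackedLoader(load_fn)
    future = loader.load(2)
    await asyncio.sleep(0)  # let the background task start
    future.cancel()  # the caller gives up on the result
    await loader.aclose()  # stop background work before closing resources
    await asyncio.sleep(0.1)  # long enough for load_fn to finish if it were still running
    return calls


print(asyncio.run(demo()))
```

With something like `aclose()` awaited before the session is closed, the query is interrupted instead of running on in the background; `load_fn` never reaches its `"finish"` step here.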