Creating a WebSocket chat in just 30 lines with Litestar
litestar-org
With the beta release of Litestar 2.0 just around the corner, here’s a little sneak peek of some of its exciting new features, in particular its extended WebSocket support and the newly introduced channels module.
The combination of these two makes for some nice and easy patterns, providing powerful features, for example, WebSocket broadcasting, supporting amongst other things inter-process communication and a configurable history.
Let’s leave the background information aside for now and dive straight into the code, so you can see what this is capable of.
The app
First, you’ll need to install the latest development version of Litestar, and a server to run it. You can do so by running
pip install litestar==2.0.0alpha7 uvicorn[standard]
The next step is to create the application and save it as app.py
:
from contextlib import asynccontextmanager
from typing import AsyncContextManager
from litestar import Litestar, WebSocket
from litestar.channels import ChannelsPlugin
from litestar.channels.backends.memory import MemoryChannelsBackend
from litestar.exceptions import WebSocketDisconnect
from litestar.handlers import websocket_listener
from litestar.static_files import StaticFilesConfig
@asynccontextmanager
async def chat_room_lifespan(socket: WebSocket, channels: ChannelsPlugin) -> AsyncContextManager[None]:
async with channels.start_subscription("chat", history=10) as subscriber:
try:
async with subscriber.run_in_background(socket.send_data):
yield
except WebSocketDisconnect:
return
@websocket_listener("/ws", connection_lifespan=chat_room_lifespan)
def chat_handler(data: str, channels: ChannelsPlugin) -> None:
channels.publish(data, channels=["chat"])
app = Litestar(
route_handlers=[chat_handler],
plugins=[ChannelsPlugin(channels=["chat"], backend=MemoryChannelsBackend(history=10))],
static_files_config=[StaticFilesConfig(directories=["static"], path="/", html_mode=True)],
)
As you can see, this is quite compact and comes in at just 30 lines (20 without the imports).
And the “frontend” part of our demo app, saved in an index.html
file:
<div id="chat"></div>
<input id="input" placeholder="Enter message">
<script>
const chat = document.getElementById("chat")
const input = document.getElementById("input")
const ws = new WebSocket("ws://127.0.0.1:8000/ws")
ws.onmessage = event => {
const messageEl = document.createElement("div")
messageEl.textContent = event.data
chat.appendChild(messageEl)
}
document.addEventListener("keydown", event => {
if (event.key === "Enter") {
ws.send(input.value)
input.value = ""
}
})
</script>
This can now be served using uvicorn by running uvicorn app:app
, making the app available at http://127.0.0.1:8000. You can now visit this address in your browser and send and receive messages. If you connect multiple clients, you’ll see that each of them will receive all messages, as well as the last 10 messages sent when they first connect.
How does this work?
@websocket_listener("/ws", connection_lifespan=chat_room_lifespan)
def chat_handler(data: str, channels: ChannelsPlugin) -> None:
channels.publish(data, channels=["chat"])
The websocket_listener
decorator crates a WebSocket handler, which will accept the connection, which it will then keep open, listen for incoming messages, and invoke the chat_handler
function every time the socket receives a message, passing the messaged received to the data
parameter.
Via dependency injection, we can receive an instance of the ChannelsPlugin
, which provides itself as a dependency once it’s registered. We then use the synchronous publish
method, to publish the data received via the socket to the channel chat
.
connection_lifespan
accepts a context manager, which will be entered once a socket connects, and is what we’re using to send the data that’s being published to the chat
channel.
@asynccontextmanager
async def chat_room_lifespan(socket: WebSocket, channels: ChannelsPlugin):
async with channels.start_subscription("chat", history=10) as subscriber:
try:
async with subscriber.run_in_background(socket.send_data):
yield
except WebSocketDisconnect:
return
Using dependency injection again, we can receive the WebSocket
connection, and the ChannelsPlugin
. The start_subscription
async context-manager is used to subscribe to one or more channels, and manages these subscriptions for us. We pass "chat"
as the first argument, defining the channels we want to subscribe to and history=10
, which will cause a new subscriber to receive the last 10 messages from the stream as soon as it starts the subscription.
When we exit this context manager, it will unsubscribe from the channels again, cleaning up resources as necessary. In our case, we also catch the WebSocketDisconnet
exception, which will be thrown by our socket.send_data
callback in the background worker after a client has disconnected, to handle disconnects more gracefully.
It returns a Subscriber
object, which can then be used to publish the messages received via the stream. subscriber.run_in_background(socket.send_data)
creates another context manager, which will start an asynchronous background task, iterating over all the channels previously subscribed by this subscriber, and invoking the callback provided, in this case socket.send_data
.
For a more in-depth explanation, check out these articles in the Litestar documentation:
Inter-process communication
In this example, the MemoryChannelsBackend
is used, which is good for testing or applications that are only every going to run on a single process, but most web applications will run multiple worker processes, or even on multiple independent machines.
For this, two Redis-based backends are provided: RedisChannelsPubSubBackend
and RedisChannelsStreamBackend
, making use of Redis’ PubSub and Streams respectively.
In the future, other backends such as RabbitMQ or Kafka might be added as well.