Skip to main content
This guide explains how to add a Server-Sent Events (SSE) endpoint to the Prowler API. SSE lets the backend push a one-way stream of events to a client over a single long-lived HTTP connection — ideal for live progress, token-by-token LLM output, or any “the server has news for you” use case where the client should not poll.
The platform ships the SSE infrastructure (api.sse) and wiring. No feature endpoint streams over SSE out of the box — this guide shows how to build one on top of the shared base.

When to use SSE

NeedUse
Server pushes incremental updates, client only readsSSE
Bidirectional, low-latency messaging (chat both ways, games)WebSocket
Client asks, server answers oncePlain REST
SSE is the right tool when the client only consumes: scan progress, long-running job checkpoints, streamed LLM tokens, cross-client resource-sync notifications. It rides on plain HTTP, reconnects automatically in the browser via the native EventSource API, and needs no extra protocol.

How it works

SSE is wired through django-eventstream and a small platform layer in api/src/backend/api/sse/:
PieceFileResponsibility
BaseSSEViewSetapi/sse/base_views.pyBase DRF viewset a feature subclasses. The feature implements get_channels; the base handles auth, the tenant transaction, and delegates streaming to django-eventstream.
SSEChannelManagerapi/sse/channelmanager.pyRegistered in settings.EVENTSTREAM_CHANNELMANAGER_CLASS. Reads the channel set off the request and enforces the platform-wide tenant gate.
SSEAuthenticationapi/authentication.pySame JWT/API-key stack as the rest of the API, plus an ?access_token=<jwt> fallback for browser EventSource clients. Lives with the other authentication classes, not in the sse package.
make_channel_name / tenant_id_from_channelapi/sse/utils.pySingle source of truth for the channel-name format, so publishers and the channel manager agree byte-for-byte.
Settingsconfig/settings/eventstream.pyValkey Pub/Sub backend (dedicated DB), channel manager, allowed headers.

Transport: the server runs on ASGI

SSE connections are long-lived. Holding one open per synchronous worker would exhaust the worker pool, so the API runs under Gunicorn’s native asgi worker (config.asgi:application). Streams are parked on the event loop while ordinary CRUD endpoints keep their synchronous execution (Django runs sync views in a thread-sensitive executor under ASGI). This is configured in config/guniconf.py and used by both the dev and production entrypoints — no separate server process is needed.

The data flow

publisher (Celery task / view)             subscriber (browser, CLI)
        │                                          │
        │  send_event(channel, "scan.progress", …) │  GET …/event-stream
        ▼                                          ▼
   Valkey Pub/Sub  ◄────────────────────►  BaseSSEViewSet.list
   (EVENTSTREAM_VALKEY_DB)                  → get_channels() (RLS-scoped)
                                            → SSEChannelManager (tenant gate)
                                            → StreamingHttpResponse (text/event-stream)
A publisher anywhere in the system (most often a Celery task) calls send_event(channel, event_type, payload). django-eventstream fans it out over Valkey Pub/Sub to every connection subscribed to that channel.

Adding an SSE endpoint to your feature

The example below streams progress for a long-running scan. Adapt the resource, prefix, and event names to your feature.
1

Pick a channel prefix

Channels follow the format <prefix>:<tenant_id>:<resource_id>, built only through make_channel_name. The prefix is owned by your feature and may contain hyphens but never colons (the parser splits on :).
CHANNEL_PREFIX = "scan-progress"
The tenant id is baked into every channel name. That is what lets the platform enforce cross-tenant isolation without knowing anything about your feature.
2

Subclass BaseSSEViewSet

Create the viewset for the SSE sub-resource. The only required method is get_channels; it runs inside the tenant transaction set up by the base class, so any database lookup inside it is automatically RLS-scoped.
# scans/event_streams.py
from api.sse import BaseSSEViewSet, make_channel_name
from django.shortcuts import get_object_or_404
from scans.models import Scan

CHANNEL_PREFIX = "scan-progress"


class ScanEventStreamViewSet(BaseSSEViewSet):
    def get_queryset(self):
        # RLS already scopes to the tenant; narrow further as needed
        # (e.g. only scans the requesting user may see).
        return Scan.objects.filter(tenant_id=self.request.tenant_id)

    def get_channels(self) -> set[str]:
        scan = get_object_or_404(self.get_queryset(), pk=self.kwargs["scan_pk"])
        return {make_channel_name(CHANNEL_PREFIX, scan.tenant_id, scan.id)}
get_channels must raise the relevant DRF exception (NotFound, PermissionDenied, NotAuthenticated) when authorization fails — get_object_or_404 does this for you. Returning an empty set surfaces as django-eventstream’s confusing “No channels specified” error instead of the real cause.
3

Wire the URL as a sub-resource

Mount the endpoint as an event-stream sub-resource. Keep it outside the DRF router, which would force the URL into a list/detail convention. Route the get method to the viewset’s list action.
# scans/urls.py
path(
    "scans/<uuid:scan_pk>/event-stream",
    ScanEventStreamViewSet.as_view({"get": "list"}),
    name="scan-event-stream",
),
4

Define your event vocabulary

A feature owns its event types in <app>/<domain>/events.py: one publish_<event> function per event type, each body a single send_event call so the wire-level string lives in exactly one place.
# scans/events.py
from django_eventstream import send_event


def publish_progress(channel: str, checked: int, total: int) -> None:
    send_event(channel, "scan.progress", {"checked": checked, "total": total})


def publish_end(channel: str, scan_id: str) -> None:
    # Terminal event carries the canonical id so reconnecting clients
    # can refetch the persisted resource over REST.
    send_event(channel, "scan.end", {"scan_id": scan_id})


def publish_error(channel: str, code: str, detail: str) -> None:
    send_event(channel, "scan.error", {"code": code, "detail": detail})
There is no platform-side enum, registry, or dispatch table — the naming convention is the contract (see below).
5

Publish from the producer

Wherever the work happens — usually a Celery task — build the channel the same way and publish:
from api.sse import make_channel_name
from scans.events import publish_progress, publish_end

channel = make_channel_name("scan-progress", scan.tenant_id, scan.id)
publish_progress(channel, checked=42, total=100)
...
publish_end(channel, scan_id=str(scan.id))

Event naming convention

Every event uses an event type of the form <resource>.<verb> (lowercased, dot-separated). The verb comes from this platform-wide vocabulary — if you need a verb that is not listed, document the addition in this guide so the catalog stays discoverable.
VerbWhen to use
deltaAn incremental piece of a stream the client concatenates (LLM text tokens, audio chunks). Standard term across OpenAI / Anthropic / LiteLLM / Vercel AI SDK.
startBegin marker for a compound operation (e.g. a tool call whose execution will be reported by a matching end).
endTerminal marker. Carries the canonical resource id so reconnecting clients can refetch persisted state via REST.
progressPeriodic checkpoint with quantifiable completion, e.g. {"checked": 42, "total": 100}.
created / updated / deletedResource-lifecycle events for cross-client sync streams.
errorTerminal failure. Carries a stable code for client switching and a human-readable detail.
Payloads are flat JSON. The wire-level event: field already names the event type, so do not wrap the payload in {"type": ..., "data": ...}. Include the canonical resource UUID on terminal events so reconnecting clients can reconcile via REST.

Authentication

SSE endpoints use the same authentication stack as the rest of the API. Non-browser clients (CLI, programmatic) send the standard Authorization header — JWT or API key. Browser EventSource is the only widely available SSE client API and it cannot set custom headers. For that case only, the endpoint accepts a JWT via the ?access_token=<jwt> query parameter. The header always wins when present — a header is intentional, while a query parameter can leak into referers and logs, so it is consulted only as a fallback.
// Browser
const es = new EventSource(
  `/api/v1/scans/${scanId}/event-stream?access_token=${jwt}`
);
# CLI / programmatic — header, exactly like every other endpoint
curl -N -H "Authorization: Bearer $JWT" \
  https://<host>/api/v1/scans/$SCAN_ID/event-stream

Tenant isolation & security model

Authorization is enforced at two layers:
  1. At connect, get_channels runs under the regular DRF stack inside the tenant transaction (rls_transaction). Resource lookups are RLS-scoped, so a user cannot even resolve a channel for a resource they cannot see. Narrow the queryset further (e.g. created_by=request.user) when a resource is per-user within a tenant.
  2. After connect, SSEChannelManager.can_read_channel re-verifies tenant membership by parsing the tenant id embedded in the channel name. Cross-tenant subscription is rejected even if a URL-level check ever has a bug. A malformed channel name is treated as “not authorized”.
Because the tenant id lives inside the channel name, this gate works for any feature without the platform knowing anything about it.

Reconnect & state recovery

The platform deliberately ships without server-side replay (is_channel_reliable returns False). When a client reconnects, it does not receive missed events. Instead:
  • Terminal events (*.end) carry the canonical resource UUID.
  • On reconnect, the client refetches the authoritative state from the normal REST endpoint using that id.
Design your event payloads accordingly: deltas are ephemeral and concatenated in-flight; the durable truth always lives behind a REST resource.

Local development

  • The dev and production entrypoints both launch Gunicorn with the asgi worker (config.asgi:application). In dev, DJANGO_DEBUG=True enables hot reload; preload_app is automatically disabled under debug so edited code is picked up.
  • SSE uses a dedicated Valkey database (EVENTSTREAM_VALKEY_DB, default 2) kept separate from the Celery broker so a noisy broker cannot crowd out streaming traffic. It reuses the same VALKEY_* connection settings as the rest of the platform.
Env varDefaultPurpose
EVENTSTREAM_VALKEY_DB2Valkey DB index for the SSE Pub/Sub bus
DJANGO_WORKER_CLASSasgiGunicorn worker class
Test the stream end to end with curl -N (disable buffering) and an auth header:
curl -N -H "Authorization: Bearer $JWT" \
  http://localhost:8080/api/v1/scans/$SCAN_ID/event-stream

Testing

The platform basis is covered by api/tests/test_sse.py (channel parsing, the tenant gate, and auth precedence). For a feature endpoint, test:
  • get_channels returns the expected channel for an authorized resource and raises NotFound/PermissionDenied otherwise.
  • Each publish_<event> helper emits the correct event type and flat payload (mock send_event).
  • The producer builds the channel with make_channel_name using the resource’s own tenant_id.