
Runtime Stream

Applies to: 4.0.8.1+

runtime_stream is a side-channel output in TriggerFlow:

  • independent of the final result
  • emits progress, token deltas, and stage states in real time

It is well suited to SSE, WebSocket, CLI progress output, and operational dashboards.

1. Mental model

2. Basic pattern: emit + stop

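```python
import asyncio
from agently import TriggerFlow, TriggerFlowEventData

flow = TriggerFlow()

@flow.chunk
async def stream_steps(data: TriggerFlowEventData):
    # Emit three progress events into the runtime stream.
    for i in range(3):
        await data.async_put_into_stream({"step": i + 1, "status": "working"})
        await asyncio.sleep(0.05)
    # Close the stream so the consumer loop below can finish.
    await data.async_stop_stream()
    # The return value is the final result, separate from the stream.
    return "done"

flow.to(stream_steps).end()

# Run the flow and print each stream event as it arrives.
for event in flow.get_runtime_stream("start", timeout=None):
    print("[stream]", event)
```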

3. Key APIs

Common APIs in production:

  • data.put_into_stream(...) / await data.async_put_into_stream(...)
  • data.stop_stream() / await data.async_stop_stream()
  • flow.get_runtime_stream(...)
  • flow.get_async_runtime_stream(...)

4. Streaming and final result together

Common event contract:

  • emit multiple delta events
  • emit one final event
  • call stop_stream

This maps naturally to FastAPIHelper SSE/WS endpoints.
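
The delta / final / stop contract above can be sketched in a handler as follows. This is a minimal illustration under assumptions, not a reference pattern from the library. `FakeStreamData` is a hypothetical stand-in that only mimics the `async_put_into_stream` and `async_stop_stream` methods named on this page, so the example runs without a flow. The `type`, `text`, and `result` field names are also assumptions.

```python
import asyncio


class FakeStreamData:
    """Hypothetical stand-in for TriggerFlowEventData; it only records calls."""

    def __init__(self):
        self.events = []
        self.stopped = False

    async def async_put_into_stream(self, event):
        self.events.append(event)

    async def async_stop_stream(self):
        self.stopped = True


async def answer_handler(data):
    parts = ["Hel", "lo", "!"]
    # 1) Emit multiple delta events (field names are illustrative).
    for part in parts:
        await data.async_put_into_stream({"type": "delta", "text": part})
    final_text = "".join(parts)
    # 2) Emit exactly one final event.
    await data.async_put_into_stream({"type": "final", "result": final_text})
    # 3) Close the stream so consumers stop waiting.
    await data.async_stop_stream()
    # The return value is separate from anything put into the stream.
    return final_text


if __name__ == "__main__":
    data = FakeStreamData()
    print(asyncio.run(answer_handler(data)), data.events)
```

In a real flow, the same handler body would receive the `data` object that TriggerFlow passes to a chunk, as in the basic pattern in section 2.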

5. Sync generator bridge (new)

v4.0.8.1 adds FunctionShifter.asyncify_sync_generator() for safely bridging sync generators into async pipelines.

Why it matters:

  • reuse legacy sync streaming code in async runtime streams and HTTP streaming
  • reduce migration effort during async adoption
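
The signature of `FunctionShifter.asyncify_sync_generator()` is not shown on this page, so the sketch below does not call it. Instead it illustrates the general bridging idea with the standard library only (Python 3.9+ for `asyncio.to_thread`): each `next()` call on the sync generator runs in a worker thread, so blocking work inside the generator does not stall the event loop. The names `iterate_in_thread` and `legacy_tokens` are hypothetical, and the library's actual implementation may differ.

```python
import asyncio
import time
from typing import AsyncIterator, Iterator, TypeVar

T = TypeVar("T")
_DONE = object()  # sentinel marking generator exhaustion


async def iterate_in_thread(gen: Iterator[T]) -> AsyncIterator[T]:
    """Yield items from a sync iterator without blocking the event loop."""
    while True:
        # next(gen, _DONE) runs in a worker thread; the default value avoids
        # raising StopIteration across the thread boundary.
        item = await asyncio.to_thread(next, gen, _DONE)
        if item is _DONE:
            break
        yield item


def legacy_tokens() -> Iterator[str]:
    """Hypothetical legacy sync streaming code."""
    for token in ["a", "b", "c"]:
        time.sleep(0.01)  # simulates blocking I/O
        yield token


async def collect() -> list:
    # Consume the bridged generator with ordinary async iteration.
    return [token async for token in iterate_in_thread(legacy_tokens())]


if __name__ == "__main__":
    print(asyncio.run(collect()))
```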

6. Common issues

6.1 Consumer blocks forever

The most likely cause is a missing stop_stream() call. Ensure every branch, including error branches, terminates the stream.
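
One way to cover every branch is to stop the stream in a `finally` block. The sketch below is illustrative only. `FakeStreamData` is a hypothetical stand-in exposing the `async_put_into_stream` and `async_stop_stream` methods named on this page, and the event shape and the `fail` parameter are assumptions made for the demonstration.

```python
import asyncio


class FakeStreamData:
    """Hypothetical stand-in for TriggerFlowEventData; it only records calls."""

    def __init__(self):
        self.events = []
        self.stopped = False

    async def async_put_into_stream(self, event):
        self.events.append(event)

    async def async_stop_stream(self):
        self.stopped = True


async def risky_handler(data, fail: bool):
    try:
        await data.async_put_into_stream({"status": "working"})
        if fail:
            raise RuntimeError("upstream failed")
        return "done"
    except RuntimeError as exc:
        # Surface the failure to stream consumers (event shape is illustrative).
        await data.async_put_into_stream({"status": "error", "message": str(exc)})
        return None
    finally:
        # Runs on both success and failure, so the consumer is always released.
        await data.async_stop_stream()


if __name__ == "__main__":
    data = FakeStreamData()
    print(asyncio.run(risky_handler(data, fail=True)), data.events, data.stopped)
```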

6.2 Stream emits but final result is empty

runtime_stream and the final result are independent, so emitting stream events does not set the result. Return the final business result explicitly from the handler.

6.3 Event order under high concurrency

In fan-out scenarios, events from parallel branches can interleave. Include fields such as source, chunk_id, and seq in each stream event so that clients can order them deterministically.
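
A client-side ordering step might look like the following sketch. It assumes each event is a dict carrying a `source` field and a per-source `seq` counter. The helper name `order_by_source` and the `text` field are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List


def order_by_source(events: List[dict]) -> Dict[str, List[dict]]:
    """Group events by source, then sort each group by its seq number."""
    grouped = defaultdict(list)
    for event in events:
        grouped[event["source"]].append(event)
    return {
        source: sorted(items, key=lambda e: e["seq"])
        for source, items in grouped.items()
    }


# Events as they might arrive when two branches interleave.
received = [
    {"source": "b", "seq": 2, "text": "world"},
    {"source": "a", "seq": 1, "text": "foo"},
    {"source": "b", "seq": 1, "text": "hello"},
    {"source": "a", "seq": 2, "text": "bar"},
]

ordered = order_by_source(received)
```

Sorting after the fact is the simplest option. A client that renders events live would instead buffer out-of-order events per source until the next expected seq arrives.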