NOUKAI

Step-by-step Iteration

Stream a flow's execution and iterate over its steps and events.

The SDKs provide iterators that drive a flow's execution one step at a time and surface typed events as they arrive.

Iterator      | Yields                              | Use when
flow.steps()  | One StepCompleted per finished step | You only care about step outputs
flow.events() | Every typed StreamEvent             | You want fine-grained progress UI or to drive tool calls manually

Both manage the execution ID and step-protocol cursor automatically, so you just iterate. Both are available on the async (AsyncNoukai / AsyncFlow) and sync (Noukai / Flow) clients with the same surface.

Simple step iteration

Use flow.steps() to receive one event per completed step:

import asyncio

from noukai_sdk import AsyncNoukai

async def main():
    async with AsyncNoukai(org="acme", project="spelling") as client:
        flow = client.flow("grade-3")
        # One StepCompleted event per finished step
        async for step in flow.steps(message="The cat sat on the mat."):
            print(f"{step.step_id}: {step.output}")

asyncio.run(main())

Sync equivalent (no asyncio required):

from noukai_sdk import Noukai
 
with Noukai(org="acme", project="spelling") as client:
    flow = client.flow("grade-3")
    for step in flow.steps(message="The cat sat on the mat."):
        print(f"{step.step_id}: {step.output}")

Each iteration yields a StepCompleted event with:

  • step_id / stepId — step identifier
  • name — optional human-readable name (may be None / undefined)
  • output — the step's output (string, dict, or whatever the step produced)
  • duration_ms / durationMs — how long the step took
  • tokens: {prompt, completion, total}, present if the step ran an LLM
  • cost_usd / costUsd — estimated cost as a decimal string ("0.001234")
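
Because cost_usd arrives as a decimal string, totals are best computed with Decimal and not float. The sketch below is only an illustration: it uses stand-in objects in place of real StepCompleted events, the step IDs and values are made up, and the None check assumes that some steps may report no cost, which the field list above does not confirm.

```python
from decimal import Decimal
from types import SimpleNamespace

# Stand-ins for StepCompleted events; only the fields used here are modelled.
steps = [
    SimpleNamespace(step_id="extract", cost_usd="0.001234", duration_ms=420),
    SimpleNamespace(step_id="grade", cost_usd="0.000766", duration_ms=910),
]

def total_cost(completed_steps):
    """Sum cost_usd strings exactly, skipping steps without a cost (assumption)."""
    return sum(
        (Decimal(s.cost_usd) for s in completed_steps if s.cost_usd is not None),
        Decimal("0"),
    )

total = total_cost(steps)
print(f"total cost: ${total}")
```

With the real SDK, the same helper could be fed the events collected from flow.steps().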

flow.steps() filters the underlying event stream down to only StepCompleted. For more granular events, use flow.events().
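
Conceptually, the relationship between the two iterators resembles a filter over the event stream. The sketch below illustrates that idea with stand-in event classes and a hypothetical completed_steps helper; it is not the SDK's implementation.

```python
from dataclasses import dataclass
from typing import Any, Iterable, Iterator

# Stand-in event classes for illustration; the real ones come from noukai_sdk.
@dataclass
class StepStarted:
    step_id: str

@dataclass
class StepCompleted:
    step_id: str
    output: Any

@dataclass
class FlowCompleted:
    result: Any

def completed_steps(events: Iterable[object]) -> Iterator[StepCompleted]:
    """Yield only the StepCompleted events from a mixed event stream."""
    for event in events:
        if isinstance(event, StepCompleted):
            yield event

stream = [
    StepStarted("s1"),
    StepCompleted("s1", "cat"),
    StepStarted("s2"),
    StepCompleted("s2", "mat"),
    FlowCompleted("done"),
]
outputs = [(s.step_id, s.output) for s in completed_steps(stream)]
print(outputs)
```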

Full event streaming

Use flow.events() to see every typed event and react to progress:

import asyncio

from noukai_sdk import (
    AsyncNoukai,
    StepStarted, StepOutput, StepCompleted, FlowCompleted,
)

async def main():
    async with AsyncNoukai(org="acme", project="spelling") as client:
        flow = client.flow("grade-3")
        async for event in flow.events(message="The cat sat on the mat."):
            if isinstance(event, StepStarted):
                print(f"▶ {event.step_id}")
            elif isinstance(event, StepOutput):
                # Partial output chunk
                print(event.output_data, end="", flush=True)
            elif isinstance(event, StepCompleted):
                print(f"\n{event.step_id} ({event.duration_ms}ms)")
            elif isinstance(event, FlowCompleted):
                print(f"Done: {event.result}")

asyncio.run(main())

In Python you can branch on event class with isinstance. In Node, the wire field eventType is the safe discriminator — narrowing by class isn't reliable because event payloads cross the wire as plain JSON.

Event types

The StreamEvent union covers:

  • RunStarted — flow execution started; carries run_id, execution_id, step_count
  • StepStarted — a step began; carries step_id, step_index
  • StepInput — input to the step (step_id, input_data)
  • StepOutput — streaming output chunk (step_id, output_data)
  • StepCompleted — step finished (step_id, output, duration_ms, tokens, cost_usd)
  • StepFailed — step errored (step_id, error)
  • StepPaused — step-protocol pause between steps (the SDK auto-advances)
  • ToolCallsRequired — flow paused awaiting tool results (see below)
  • FlowCompleted — the flow finished (result, summary)
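
A consumer typically needs only a few of these types. The sketch below, written against stand-in classes that reuse the field names listed above, joins StepOutput chunks per step, turns StepFailed into an exception, and captures the FlowCompleted result. The collect helper and the string type of error are assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Any

# Stand-in event classes with the field names listed above;
# the real classes are imported from noukai_sdk.
@dataclass
class StepOutput:
    step_id: str
    output_data: str

@dataclass
class StepFailed:
    step_id: str
    error: str  # assumed to be a string for this sketch

@dataclass
class FlowCompleted:
    result: Any

def collect(events):
    """Join streamed chunks per step and return (chunks, final result).

    Raises RuntimeError on the first StepFailed event.
    """
    chunks = {}
    result = None
    for event in events:
        if isinstance(event, StepOutput):
            chunks[event.step_id] = chunks.get(event.step_id, "") + event.output_data
        elif isinstance(event, StepFailed):
            raise RuntimeError(f"step {event.step_id} failed: {event.error}")
        elif isinstance(event, FlowCompleted):
            result = event.result
        # Other event types (RunStarted, StepPaused, ...) are ignored here.
    return chunks, result

chunks, result = collect([
    StepOutput("s1", "The cat "),
    StepOutput("s1", "sat."),
    FlowCompleted("ok"),
])
print(chunks, result)
```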

Tool calls in iterators

If your flow uses tools and you don't pass a tool_handler, the iterator yields ToolCallsRequired when the flow pauses. The event has an attached resume(tool_results=...) coroutine — await it from inside the loop, then the iterator continues.

Manual resume() is async only. On the sync client, pass tool_handler= instead (see the next section); the sync iterator does not surface ToolCallsRequired for manual handling.

import asyncio

from noukai_sdk import AsyncNoukai, ToolCallsRequired

async def main():
    async with AsyncNoukai() as client:
        flow = client.flow("acme/research/web-summarizer")

        async for event in flow.events(
            message="Summarize TLS 1.3",
            tools=[
                {
                    "type": "function",
                    "function": {
                        "name": "search",
                        "description": "Search the web",
                        "parameters": {"type": "object", "properties": {}},
                    },
                },
            ],
        ):
            if isinstance(event, ToolCallsRequired):
                # Build one tool-result message per requested call
                results = [
                    {
                        "role": "tool",
                        "tool_call_id": tc["id"],
                        "content": "Search result...",
                    }
                    for tc in event.tool_calls
                ]
                # Stage tool results; the next iterator step picks them up.
                await event.resume(tool_results=results)

asyncio.run(main())

Automatic tool handling

Pass tool_handler and the iterator handles tool calls transparently: ToolCallsRequired events are consumed inside the loop and never surface to the caller. This works on both sync and async clients; the sync client rejects async handlers at call time with a TypeError.

def my_tool_handler(tool_calls):
    return [
        {
            "role": "tool",
            "tool_call_id": tc["id"],
            "content": "...",
        }
        for tc in tool_calls
    ]
 
# Async client: run inside a coroutine; `flow` comes from an AsyncNoukai client as in the earlier examples
async for step in flow.steps(
    message="...",
    tools=[...],
    tool_handler=my_tool_handler,
    max_tool_rounds=5,
):
    print(step.step_id)
 
# Sync client: same handler, same arguments; `sync_flow` comes from a Noukai client
for step in sync_flow.steps(
    message="...",
    tools=[...],
    tool_handler=my_tool_handler,
):
    print(step.step_id)
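
A real handler usually routes each call to a local function. The following is a sketch under an explicit assumption: tool calls follow the OpenAI-style shape, with tc["function"]["name"] and a JSON-encoded tc["function"]["arguments"]. Only tc["id"] is confirmed by the examples above, so check the actual payload before relying on those keys. The search function and the TOOLS registry are hypothetical.

```python
import json

def search(query: str) -> str:
    # Hypothetical local tool; a real one would call a search backend.
    return f"results for {query!r}"

# Registry mapping tool names to local callables (illustrative).
TOOLS = {"search": search}

def dispatching_tool_handler(tool_calls):
    """Route each tool call to a local function and build tool-result messages.

    Assumes OpenAI-style calls: tc["function"]["name"] plus a JSON-encoded
    tc["function"]["arguments"].
    """
    results = []
    for tc in tool_calls:
        name = tc["function"]["name"]
        fn = TOOLS.get(name)
        if fn is None:
            content = f"unknown tool: {name}"
        else:
            args = json.loads(tc["function"]["arguments"] or "{}")
            content = fn(**args)
        results.append({"role": "tool", "tool_call_id": tc["id"], "content": content})
    return results

calls = [
    {"id": "call_1", "function": {"name": "search", "arguments": '{"query": "TLS 1.3"}'}},
    {"id": "call_2", "function": {"name": "lookup", "arguments": "{}"}},
]
results = dispatching_tool_handler(calls)
print(results)
```

Returning an explanatory message for unknown tools, instead of raising, lets the flow see the problem as a tool result; whether that is preferable depends on the flow.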

max_tool_rounds (default 10) is a safety bound on the auto-resume loop. Exceeding it raises ToolCallLimitError.
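
To make the bound concrete, here is a conceptual sketch of a capped auto-resume loop. It is not the SDK's code: the exception class is a local stand-in, run_with_tool_rounds is a hypothetical helper, and the exact point at which the real SDK counts a round may differ.

```python
class ToolCallLimitError(Exception):
    """Local stand-in for the SDK exception of the same name."""

def run_with_tool_rounds(pending_rounds, tool_handler, max_tool_rounds=10):
    """Answer queued tool-call batches, refusing to exceed max_tool_rounds.

    pending_rounds is a list of tool-call batches standing in for successive
    ToolCallsRequired pauses. Returns the results of every handled round.
    """
    handled = []
    for round_number, tool_calls in enumerate(pending_rounds, start=1):
        if round_number > max_tool_rounds:
            raise ToolCallLimitError(
                f"flow requested more than {max_tool_rounds} tool rounds"
            )
        handled.append(tool_handler(tool_calls))
    return handled

def echo_handler(tool_calls):
    # Minimal handler: acknowledge every call with a fixed payload.
    return [{"role": "tool", "tool_call_id": tc["id"], "content": "ok"} for tc in tool_calls]

rounds = [[{"id": f"call_{i}"}] for i in range(3)]
handled = run_with_tool_rounds(rounds, echo_handler, max_tool_rounds=5)
print(len(handled))
```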

run_remaining — server-side step pacing

By default, the server pauses between steps and the SDK advances the cursor on each iteration (one /step HTTP call per iteration). Pass run_remaining=True and the server runs all remaining steps without pausing — the SDK still sees per-step events on a single SSE stream:

async for event in flow.events(message="hi", run_remaining=True):
    print(event)

Use run_remaining=True when you want streaming output but don't need to interject between steps. Use the default (False) when you want to inspect intermediate outputs and potentially abandon the run early. The iterator terminates on FlowCompleted, StepFailed, or when the SSE stream closes.
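
The default pacing is what makes early abandonment cheap: a step that is never iterated is never requested. The sketch below models this with a plain generator standing in for flow.steps(); the fake_steps helper and its bookkeeping list are illustrative only, and how the real SDK cleans up an abandoned run is not covered here.

```python
from types import SimpleNamespace

advanced = []  # records which steps the stand-in "server" was asked to run

def fake_steps(outputs):
    """Stand-in for flow.steps(): each iteration advances exactly one step."""
    for index, output in enumerate(outputs):
        advanced.append(index)  # models one /step call per iteration
        yield SimpleNamespace(step_id=f"step-{index}", output=output)

seen = []
for step in fake_steps(["draft", "", "final"]):
    seen.append(step.step_id)
    if not step.output:
        # An empty intermediate output: stop here instead of running the rest.
        break

print(seen, advanced)
```

Because the generator is lazy, the third step is never advanced once the loop breaks.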

flow.steps() does not accept run_remaining — it's only meaningful when you can see the full event stream.
