NOUKAI

Flow

Flow execution proxy with execute, streaming, and run methods.

Flow (Sync)

Proxy object for executing a single flow synchronously.

Methods

execute(*, message, ...) -> ExecuteResult | PausedResult

Execute the flow to completion and return the result.

Parameters:

  • message (str, optional): User message — the flow's primary input. Can be omitted if parameters contains all required inputs.
  • parameters (dict, optional): Flat dict of flow-level initial inputs (not nested by block). Default: {}
  • block_overrides (dict, optional): Per-step config overrides, format {"step_id": {"field": value}}. Default: {}
  • attachments (list, optional): Up to 10 media attachments (HTTPS URLs). Default: []
  • tools (list, optional): OpenAI-format tool definitions. Max 64. Default: []
  • tool_choice (str or dict, optional): "auto" | "none" | "required" | {"type": "function", "function": {"name": ...}}. Default: None
  • tool_handler (callable, optional): Sync function (tool_calls) -> tool_results for auto-resume. Async handlers raise TypeError. Default: None
  • max_tool_rounds (int, optional): Safety bound on the tool-handler loop. Raises ToolCallLimitError if exceeded. Default: 10
  • trace (bool, optional): Capture full input/output snapshots in trace. Timing/tokens recorded either way. Default: False
  • version (str or int, optional): "draft", "production", or a published version number (positive int). "production" is not yet implemented server-side; see Version Values. Default: "draft"
  • timeout (float, optional): Request timeout in seconds. Default: None
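
The documented limits (up to 10 HTTPS attachments, at most 64 tools) can be checked before a request is sent. The sketch below uses a hypothetical helper, check_execute_args, which is not part of the SDK. It assumes attachments are plain URL strings; whether the SDK performs equivalent validation itself is not stated here.

```python
from urllib.parse import urlparse

MAX_ATTACHMENTS = 10  # documented attachment limit
MAX_TOOLS = 64        # documented tool-definition limit


def check_execute_args(attachments=(), tools=()):
    """Hypothetical pre-flight check; raises ValueError on a limit violation."""
    if len(attachments) > MAX_ATTACHMENTS:
        raise ValueError(
            f"at most {MAX_ATTACHMENTS} attachments allowed, got {len(attachments)}"
        )
    for url in attachments:
        # Assumption: each attachment is a plain URL string.
        if urlparse(url).scheme != "https":
            raise ValueError(f"attachment must be an HTTPS URL: {url!r}")
    if len(tools) > MAX_TOOLS:
        raise ValueError(f"at most {MAX_TOOLS} tools allowed, got {len(tools)}")


# Passes silently: one HTTPS attachment, no tools.
check_execute_args(attachments=["https://example.com/essay.pdf"])
```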

Returns:

  • ExecuteResult on successful completion
  • PausedResult if the flow pauses for tool calls and no tool_handler is provided

Raises:

  • FlowNotFoundError: Flow does not exist
  • InsufficientCreditsError: Account balance insufficient
  • FlowExecutionError: Server-side execution failure
  • ToolCallLimitError: max_tool_rounds exhausted
  • TypeError: tool_handler is an async function
  • NotImplementedError: version="production" (not yet implemented server-side)
  • ValueError: Invalid version value
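
To make the relationship between tool_handler, max_tool_rounds, and ToolCallLimitError concrete, here is a rough sketch of an auto-resume loop. It is an illustration of the assumed behavior, not the SDK's source: start and resume are hypothetical callables, the exception class is a local stand-in, and the exact round at which the real SDK raises may differ.

```python
class ToolCallLimitError(Exception):
    """Local stand-in for the SDK exception of the same name."""


def run_with_tool_rounds(start, resume, tool_handler, max_tool_rounds=10):
    """Illustrative auto-resume loop (assumed behavior, not SDK code).

    start() and resume(tool_results) are hypothetical callables that
    return either ("paused", tool_calls) or ("completed", output).
    """
    state, payload = start()
    rounds = 0
    while state == "paused":
        if rounds >= max_tool_rounds:
            # The flow still wants tools after the allowed number of rounds.
            raise ToolCallLimitError(
                f"still paused after {max_tool_rounds} tool rounds"
            )
        rounds += 1
        # Hand the pending tool calls to the handler, then resume the flow.
        state, payload = resume(tool_handler(payload))
    return payload
```

Under these assumptions the handler runs at most max_tool_rounds times, and a flow that keeps requesting tools beyond that bound fails instead of looping indefinitely.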

Examples:

Basic execution:

result = flow.execute(message="The cat sat on the mat.")
print(result.output)

With parameters and version:

result = flow.execute(
    message="Grade this essay",
    parameters={"target_grade": "college", "locale": "en-US"},
    version=3,  # pinned published version ("production" is not yet implemented server-side)
)
print(result.status)  # "completed" | "failed"

With tools and auto-resume:

def my_tool_handler(tool_calls):
    # Process tool_calls and return results
    return [{"tool_call_id": "...", "content": "..."}]
 
result = flow.execute(
    message="hello",
    tools=[...],
    tool_handler=my_tool_handler
)
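
A fuller handler might dispatch each call to a local function by name. The sketch below shows one tool definition in the OpenAI function-calling format and a sync handler that returns results in the {"tool_call_id", "content"} shape used above. The get_weather tool and the registry are hypothetical, and the handler assumes each incoming call is a dict shaped like an OpenAI tool call; the SDK may pass objects with a different shape.

```python
import json

# One tool definition in the OpenAI function-calling format.
WEATHER_TOOL = {
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Look up the current weather for a city.",
        "parameters": {
            "type": "object",
            "properties": {"city": {"type": "string"}},
            "required": ["city"],
        },
    },
}


def get_weather(city):
    # Placeholder implementation; a real tool would call a weather service.
    return {"city": city, "forecast": "sunny"}


# Map tool names to local Python callables (hypothetical registry).
TOOL_REGISTRY = {"get_weather": get_weather}


def dispatch_tool_calls(tool_calls):
    """Sync handler: run each requested tool and return one result per call.

    Assumes each call is a dict shaped like an OpenAI tool call:
    {"id": ..., "function": {"name": ..., "arguments": "<JSON string>"}}.
    """
    results = []
    for call in tool_calls:
        name = call["function"]["name"]
        args = json.loads(call["function"]["arguments"] or "{}")
        func = TOOL_REGISTRY.get(name)
        if func is None:
            # Report unknown tools back to the flow instead of raising.
            content = json.dumps({"error": f"unknown tool: {name}"})
        else:
            content = json.dumps(func(**args))
        results.append({"tool_call_id": call["id"], "content": content})
    return results
```

Such a handler would be passed as flow.execute(message=..., tools=[WEATHER_TOOL], tool_handler=dispatch_tool_calls). It is a plain function rather than a coroutine because async handlers raise TypeError on the sync Flow.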

steps(*, message, ...) -> Iterator[StepCompleted]

Stream completed steps as they finish. Iteration is not supported on the sync Flow; use AsyncFlow.steps() instead.

See AsyncFlow.steps() for details and examples.

events(*, message, ...) -> Iterator[StreamEvent]

Stream all execution events. Iteration is not supported on the sync Flow; use AsyncFlow.events() instead.

See AsyncFlow.events() for details and examples.

run(execution_id) -> Run

Get a proxy to inspect a previous execution.

Parameters:

  • execution_id (str): Execution ID from a previous execute() result

Returns: Run proxy for tracing and inspection

Example:

result = flow.execute(message="hello")
trace = flow.run(result.execution_id).trace()
total_cost = sum(float(s.cost_usd) for s in trace.steps if s.cost_usd)
print(f"Total cost: ${total_cost:.6f}")
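
When many small per-step costs are summed, decimal arithmetic avoids accumulated float rounding. The sketch below assumes cost_usd is a decimal string or None, which the float() call and truthiness check above suggest but do not confirm. The steps are stand-in objects so the snippet runs without the SDK.

```python
from decimal import Decimal
from types import SimpleNamespace

# Stand-in trace steps; real ones would come from flow.run(...).trace().steps
steps = [
    SimpleNamespace(step_id="extract", cost_usd="0.000120"),
    SimpleNamespace(step_id="grade", cost_usd="0.001305"),
    SimpleNamespace(step_id="format", cost_usd=None),  # no recorded cost
]

# Skip steps without a cost and sum the rest exactly.
total_cost = sum(
    (Decimal(s.cost_usd) for s in steps if s.cost_usd),
    Decimal("0"),
)
print(f"Total cost: ${total_cost:.6f}")
```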

AsyncFlow

Async equivalent of Flow. execute() and execute_async() are awaitable; steps() and events() are consumed with async for.

Methods

async execute(...) -> ExecuteResult | PausedResult

Async version of Flow.execute(). Accepts the same parameters and must be awaited.

Example:

result = await flow.execute(message="hello")
print(result.output)
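
Because execute() is awaitable, several executions can run concurrently with asyncio.gather. The sketch below uses FakeAsyncFlow, a hypothetical stand-in for AsyncFlow, so that it runs without the SDK or network access; with a real flow object only execute_many would be needed.

```python
import asyncio
from types import SimpleNamespace


class FakeAsyncFlow:
    """Stand-in for AsyncFlow so the sketch runs without the SDK."""

    async def execute(self, *, message):
        await asyncio.sleep(0)  # yield to the event loop like a real request
        return SimpleNamespace(output=message.upper(), status="completed")


async def execute_many(flow, messages):
    # gather() runs the coroutines concurrently and preserves input order.
    return await asyncio.gather(*(flow.execute(message=m) for m in messages))


results = asyncio.run(execute_many(FakeAsyncFlow(), ["first", "second"]))
print([r.output for r in results])
```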

async for step in steps(*, message, ...)

Stream completed steps as they finish.

Yields: StepCompleted events

Example:

async for step in flow.steps(message="hello"):
    print(f"{step.step_id}: {step.duration_ms}ms")

async for event in events(*, message, ...)

Stream all execution events from the flow.

Yields: StreamEvent — discriminated union of RunStarted, StepStarted, StepInput, StepOutput, StepCompleted, StepFailed, StepPaused, ToolCallsRequired, FlowCompleted

Example:

async for event in flow.events(message="hello"):
    if event.event_type == "step_completed":
        print(f"Step {event.step_id} took {event.duration_ms}ms")
    elif event.event_type == "flow_completed":
        print(f"Result: {event.result}")
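
The same dispatch pattern can be wrapped in a reusable consumer that collects per-step timings and the final result. In the sketch below, fake_events is a hypothetical stand-in for flow.events(...); the "step_completed" and "flow_completed" values come from the example above, while "run_started" and the field values are assumptions for illustration.

```python
import asyncio
from types import SimpleNamespace


async def fake_events():
    """Stand-in for flow.events(...); yields objects with an event_type."""
    yield SimpleNamespace(event_type="run_started")  # assumed type name
    yield SimpleNamespace(
        event_type="step_completed", step_id="extract", duration_ms=120
    )
    yield SimpleNamespace(
        event_type="step_completed", step_id="grade", duration_ms=340
    )
    yield SimpleNamespace(event_type="flow_completed", result="B+")


async def summarize(events):
    """Collect per-step durations and the final result from a stream."""
    durations, result = {}, None
    async for event in events:
        if event.event_type == "step_completed":
            durations[event.step_id] = event.duration_ms
        elif event.event_type == "flow_completed":
            result = event.result
        # Other event types are ignored in this sketch.
    return durations, result


durations, result = asyncio.run(summarize(fake_events()))
print(durations, result)
```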

async run(execution_id) -> AsyncRun

Get an async proxy to inspect a previous execution.

Example:

trace = await flow.run(result.execution_id).trace()

async execute_async(*, message, ...) -> Job

Submit the flow to the server queue for asynchronous execution (long-running flows). Returns immediately with a Job handle.

Accepts the same parameters as execute(), except that tool_handler, max_tool_rounds, and attachments are not supported.

Returns: Job with execution_id available immediately

Example:

job = await flow.execute_async(message="Long-running input")
# job.execution_id available immediately; flow runs server-side
 
status = await job.wait(timeout=300)  # block until terminal
print(status.status)  # "completed" | "failed"

Streaming Parameters

Both steps() and events() accept the same parameters as execute():

async for event in flow.events(
    message="input text",
    parameters={"locale": "en-US"},
    block_overrides={"essay_grader": {"rubric": "college"}},
    tools=[...],
    tool_handler=handler,
    version="draft"
):
    pass

Version Values

The version parameter accepts:

Value               Behavior
"draft" (default)   Execute the in-progress draft version
"production"        Execute the currently promoted production version
N (int, e.g., 3)    Execute a specific published version number

Strings other than "draft" and "production" (e.g., "main", "dev") raise ValueError.

"production" is currently not implemented server-side and raises NotImplementedError at runtime. Use "draft" or a pinned version number for now.

Examples:

await flow.execute(message="test", version="draft")        # In-progress draft
await flow.execute(message="test", version=5)              # Version 5 (if published)
# await flow.execute(message="test", version="main")       # ERROR: ValueError
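
The acceptance rules above can be sketched as a small validator. normalize_version is a hypothetical helper, not an SDK function. Rejecting booleans and raising ValueError for non-positive integers are assumptions based on the "positive int" wording, not confirmed SDK behavior.

```python
def normalize_version(version="draft"):
    """Hypothetical validator mirroring the documented rules for version."""
    # bool is a subclass of int in Python, so reject it explicitly (assumption).
    if isinstance(version, bool):
        raise ValueError(f"invalid version: {version!r}")
    if isinstance(version, int):
        if version < 1:
            raise ValueError(f"version number must be positive, got {version}")
        return version
    if version in ("draft", "production"):
        return version
    # Any other string, such as "main" or "dev", is rejected.
    raise ValueError(f"invalid version: {version!r}")
```

A check like this only covers the argument's shape. Per the note above, "production" passes validation but currently fails at runtime with NotImplementedError.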