NOUKAI

Run

Inspect execution traces and results.

Run (Sync)

Proxy for inspecting a specific flow execution by ID. Constructing the proxy makes no network call; each method issues its request only when it is called.
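To illustrate what lazy construction means in practice, here is a minimal sketch of the pattern using a stand-in class. `LazyRun` and `fake_fetch` are hypothetical names and not part of the SDK; the sketch only shows that building the proxy does no I/O and that the request happens on the first method call.

```python
from typing import Callable, Dict, List


class LazyRun:
    """Hypothetical stand-in for a lazy run proxy (not the SDK class)."""

    def __init__(self, execution_id: str, fetch: Callable[[str], Dict]):
        # Only store what is needed to make the request later; no I/O here.
        self.execution_id = execution_id
        self._fetch = fetch

    def trace(self) -> Dict:
        # The request is issued only when a method is actually called.
        return self._fetch(self.execution_id)


calls: List[str] = []


def fake_fetch(execution_id: str) -> Dict:
    # Records each simulated network call so laziness can be observed.
    calls.append(execution_id)
    return {"flow_run": {"id": execution_id}, "steps": []}


run = LazyRun("exec_123", fake_fetch)
calls_after_construction = len(calls)  # nothing fetched yet
result = run.trace()
calls_after_trace = len(calls)  # fetched on the first method call
```

One likely consequence of this design is that an unknown execution ID surfaces when a method such as trace() is called, not when the proxy is created.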

Methods

trace(*, timeout=None) -> Trace

Fetch the complete execution trace, containing the latest attempt of each step.

Parameters:

  • timeout (float, optional): Request timeout in seconds. Default: None

Returns: Trace with flow_run (RunSummary) and steps (list of StepTrace)

Raises:

  • FlowNotFoundError: Execution ID not found
  • APITimeoutError: Request timed out
  • APIConnectionError: Network error

Example:

trace = flow.run(result.execution_id).trace()
print(f"Run: {trace.flow_run.id}")
print(f"Duration: {trace.flow_run.duration_ms}ms")
 
# Aggregate cost across steps (cost_usd is a decimal string)
total_cost = sum(float(s.cost_usd) for s in trace.steps if s.cost_usd)
print(f"Total cost: ${total_cost:.6f}")
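The transient errors listed under Raises can be handled with a small retry wrapper. The sketch below is one possible approach, not SDK behavior: `fetch_with_retry` and `flaky_fetch` are hypothetical helpers, and the two exception classes are local stand-ins named after the SDK errors so the block runs on its own.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class APITimeoutError(Exception):
    """Local stand-in for the SDK exception of the same name."""


class APIConnectionError(Exception):
    """Local stand-in for the SDK exception of the same name."""


def fetch_with_retry(fetch: Callable[[], T], max_attempts: int = 3,
                     base_delay: float = 0.5) -> T:
    """Call fetch(), retrying transient errors with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fetch()
        except (APITimeoutError, APIConnectionError):
            if attempt == max_attempts:
                raise  # out of attempts: let the caller see the error
            time.sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("max_attempts must be at least 1")


# Simulated fetch that times out twice, then succeeds.
outcomes = [APITimeoutError(), APITimeoutError(), {"flow_run": {"id": "exec_123"}}]


def flaky_fetch():
    outcome = outcomes.pop(0)
    if isinstance(outcome, Exception):
        raise outcome
    return outcome


trace = fetch_with_retry(flaky_fetch, base_delay=0.0)
```

With the SDK, the callable would be something like `lambda: flow.run(result.execution_id).trace(timeout=10.0)`. FlowNotFoundError is deliberately not retried here, since a missing execution ID is unlikely to resolve on its own.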

step_trace(step_id, *, attempt="latest", loop_index=None, timeout=None) -> StepTrace | StepAttempts

Fetch the trace for a single step: its latest attempt, one specific attempt, or all attempts.

Parameters:

  • step_id (str): The step identifier to fetch
  • attempt (str or int, optional): "latest", "all", or an attempt number N. Default: "latest"
  • loop_index (int, optional): Loop iteration index for steps inside a loop. Default: None
  • timeout (float, optional): Request timeout in seconds. Default: None

Returns:

  • StepTrace when attempt="latest" (default) or attempt=N
  • StepAttempts when attempt="all"

Raises:

  • FlowNotFoundError: Execution or step not found
  • APITimeoutError: Request timed out

Example:

# Latest attempt only
step = flow.run(result.execution_id).step_trace("my_step")
print(f"Step {step.step_id}: {step.status}")
 
# All attempts
history = flow.run(result.execution_id).step_trace("my_step", attempt="all")
for attempt in history.attempts:
    print(f"  Attempt {attempt.attempt}: {attempt.status}")
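Because the return type depends on the attempt argument, calling code that accepts either shape may want to normalize it first. The following is a sketch under assumptions: the two dataclasses are stand-ins carrying only a few of the documented fields, and `as_attempt_list` is a hypothetical helper, not an SDK function.

```python
from dataclasses import dataclass, field
from typing import List, Union


@dataclass
class StepTrace:
    """Stand-in with only the fields this sketch needs."""
    step_id: str
    attempt: int
    status: str


@dataclass
class StepAttempts:
    """Stand-in with only the fields this sketch needs."""
    step_id: str
    attempts: List[StepTrace] = field(default_factory=list)


def as_attempt_list(result: Union[StepTrace, StepAttempts]) -> List[StepTrace]:
    """Return a list of attempts regardless of which shape was returned."""
    if isinstance(result, StepAttempts):
        return list(result.attempts)
    return [result]


single = StepTrace("my_step", attempt=2, status="completed")
history = StepAttempts("my_step", [
    StepTrace("my_step", attempt=1, status="failed"),
    StepTrace("my_step", attempt=2, status="completed"),
])

single_statuses = [a.status for a in as_attempt_list(single)]
history_statuses = [a.status for a in as_attempt_list(history)]
```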

live_trace() -> Generator[StreamEvent, None, None]

Stream trace events: first replay the events already stored in the database for this run, then tail new events live until the run terminates.

Yields: StreamEvent, a union of typed events such as RunStarted, StepStarted, StepCompleted, and FlowCompleted. Each event is incremental, not a full Trace snapshot.

Example:

for event in flow.run(result.execution_id).live_trace():
    if event.event_type == "step_completed":
        print(f"Step {event.step_id}: {event.duration_ms}ms")
    elif event.event_type == "flow_completed":
        print(f"Done: {event.result}")
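Since the stream carries incremental events and not snapshots, a consumer usually folds them into its own state. The sketch below simulates this with a stand-in `Event` class and a fake generator; both are hypothetical, the real events are separate typed classes, and the "step_started" value of event_type is an assumption.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Iterator, Optional


@dataclass
class Event:
    """Stand-in for a StreamEvent; real events are separate typed classes."""
    event_type: str
    step_id: Optional[str] = None
    duration_ms: Optional[int] = None
    result: Any = None


def fake_live_trace() -> Iterator[Event]:
    # Simulated stream. "step_started" is an assumed event_type value.
    yield Event("step_started", step_id="fetch")
    yield Event("step_completed", step_id="fetch", duration_ms=120)
    yield Event("step_completed", step_id="summarize", duration_ms=840)
    yield Event("flow_completed", result={"ok": True})


def collect(events: Iterable[Event]) -> Dict[str, Any]:
    """Fold a stream of events into per-step durations and a final result."""
    durations: Dict[str, Any] = {}
    final = None
    for event in events:
        if event.event_type == "step_completed":
            durations[event.step_id] = event.duration_ms
        elif event.event_type == "flow_completed":
            final = event.result
            break  # terminal event: stop consuming
        # Other event types are ignored so new ones do not break the loop.
    return {"durations": durations, "result": final}


summary = collect(fake_live_trace())
```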

AsyncRun

Async equivalent of Run. trace() and step_trace() are awaitable; live_trace() is async-iterable.

Methods

async trace(*, timeout=None) -> Trace

Async version of Run.trace().

Example:

trace = await flow.run(result.execution_id).trace()

async step_trace(step_id, *, attempt="latest", loop_index=None, timeout=None) -> StepTrace | StepAttempts

Async version of Run.step_trace().

Example:

step = await flow.run(result.execution_id).step_trace("my_step")

live_trace() -> AsyncIterator[StreamEvent]

Async version of Run.live_trace(). Iterate it with async for.

Yields: StreamEvent objects

Example:

async for event in flow.run(result.execution_id).live_trace():
    if event.event_type == "step_completed":
        print(f"Step done: {event.step_id}")
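live_trace() takes no timeout argument in the signature above, so one way to bound how long a consumer waits is to wrap the whole loop in asyncio.wait_for. This is a sketch with a simulated stream: `Event`, `fake_live_trace`, and `completed_steps` are hypothetical stand-ins, not SDK names.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, List, Optional


@dataclass
class Event:
    """Stand-in for a StreamEvent with only the fields used here."""
    event_type: str
    step_id: Optional[str] = None


async def fake_live_trace() -> AsyncIterator[Event]:
    # Simulated stream standing in for AsyncRun.live_trace().
    for event in (
        Event("step_completed", "fetch"),
        Event("step_completed", "summarize"),
        Event("flow_completed"),
    ):
        await asyncio.sleep(0)  # hand control back to the event loop
        yield event


async def completed_steps(stream: AsyncIterator[Event]) -> List[str]:
    """Collect the IDs of completed steps until the stream ends."""
    done: List[str] = []
    async for event in stream:
        if event.event_type == "step_completed" and event.step_id is not None:
            done.append(event.step_id)
    return done


async def main() -> List[str]:
    # wait_for cancels the consumer and raises a timeout error if the
    # stream has not finished within the limit.
    return await asyncio.wait_for(completed_steps(fake_live_trace()), timeout=5.0)


steps_done = asyncio.run(main())
```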

Trace (Type)

Complete execution trace with run summary and per-step details. The Trace object has no total_cost_usd or total_duration_ms field; aggregate cost and duration totals client-side from steps.

Attributes:

  • flow_run (RunSummary): Run-level metadata
  • steps (list[StepTrace]): One StepTrace per step, holding that step's latest attempt

Example:

trace = await flow.run(execution_id).trace()
 
# Run summary
print(f"Flow: {trace.flow_run.flow_id}")
print(f"Status: {trace.flow_run.status}")
print(f"Duration: {trace.flow_run.duration_ms}ms")
print(f"Steps: {trace.flow_run.step_count}")
 
# Aggregate totals
total_cost = sum(float(s.cost_usd) for s in trace.steps if s.cost_usd)
total_duration = sum(s.duration_ms or 0 for s in trace.steps)
print(f"Total: ${total_cost:.6f}, {total_duration}ms")
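Since cost_usd arrives as a decimal string, the client-side aggregation can also be done with decimal.Decimal to avoid float rounding. A minimal sketch, assuming a stand-in `Step` class with only the relevant fields; `totals` is a hypothetical helper.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import List, Optional, Tuple


@dataclass
class Step:
    """Stand-in for StepTrace with only the fields used here."""
    step_id: str
    cost_usd: Optional[str] = None
    duration_ms: Optional[int] = None


def totals(steps: List[Step]) -> Tuple[Decimal, int]:
    """Sum cost exactly via Decimal and sum duration, skipping missing values."""
    cost = sum((Decimal(s.cost_usd) for s in steps if s.cost_usd), Decimal("0"))
    duration = sum(s.duration_ms or 0 for s in steps)
    return cost, duration


steps = [
    Step("fetch", cost_usd=None, duration_ms=120),
    Step("summarize", cost_usd="0.001234", duration_ms=840),
    Step("classify", cost_usd="0.000100", duration_ms=None),
]
total_cost, total_duration = totals(steps)
```

Note that a sum of step durations is not necessarily the same number as flow_run.duration_ms, which is reported separately at the run level.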

RunSummary (Type)

High-level metadata about a flow execution.

Attributes:

  • id (str): Unique execution ID
  • flow_id (str): Flow ID this execution ran against
  • status (str): Execution status (e.g., "completed", "failed")
  • trigger_type (str or None): How the flow was triggered (e.g., "api", "job")
  • started_at (str or None): ISO timestamp when execution started
  • completed_at (str or None): ISO timestamp when execution completed
  • duration_ms (int or None): Total execution time in milliseconds
  • step_count (int or None): Number of steps in the flow

StepTrace (Type)

Execution metrics for a single step (one attempt).

Attributes:

  • step_id: Unique step identifier (StepTrace has no separate name field)
  • attempt: Attempt number (1 = first try; greater than 1 if the server retried this step)
  • loop_index: Iteration index if the step is inside a loop (optional)
  • status: One of "running", "completed", "failed", "skipped"
  • started_at: ISO timestamp when the step started (optional)
  • completed_at: ISO timestamp when the step completed (optional)
  • duration_ms: Step execution time in milliseconds (optional)
  • model_used: LLM model identifier if the step ran an LLM block (optional)
  • tokens: Token usage object with prompt, completion, and total counts (optional)
  • cost_usd: Estimated cost as a decimal string (e.g., "0.001234"); parse with float() (optional)
  • input_context: Captured input snapshot (optional; present only with trace=True or for failed steps)
  • output_context: Captured output snapshot (optional)
  • error_context: Error details if the step failed (optional)
  • input_size_bytes: Size of the input snapshot in bytes (optional)
  • output_size_bytes: Size of the output snapshot in bytes (optional)
  • truncated: Whether the snapshot was truncated due to size

Example:

for step in trace.steps:
    print(f"Step {step.step_id}:")
    print(f"  Attempt: {step.attempt}")
    print(f"  Status: {step.status}")
    print(f"  Duration: {step.duration_ms}ms")
    if step.model_used:
        print(f"  Model: {step.model_used}")
    if step.cost_usd:
        print(f"  Cost: ${float(step.cost_usd):.6f}")
    if step.tokens:
        print(f"  Tokens: {step.tokens.prompt} prompt + {step.tokens.completion} completion")
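The per-step fields also lend themselves to simple grouping, such as cost per model or a list of failed steps. A sketch under assumptions: `Step` is a stand-in with a subset of the documented fields, `cost_by_model` is a hypothetical helper, and the model identifiers are placeholders.

```python
from collections import defaultdict
from dataclasses import dataclass
from decimal import Decimal
from typing import Dict, List, Optional


@dataclass
class Step:
    """Stand-in for StepTrace with only the fields used here."""
    step_id: str
    status: str
    model_used: Optional[str] = None
    cost_usd: Optional[str] = None


def cost_by_model(steps: List[Step]) -> Dict[str, Decimal]:
    """Group step costs by model; steps without a model or cost are skipped."""
    totals: Dict[str, Decimal] = defaultdict(Decimal)  # Decimal() is zero
    for step in steps:
        if step.model_used and step.cost_usd:
            totals[step.model_used] += Decimal(step.cost_usd)
    return dict(totals)


# "model-a" and "model-b" are placeholder model identifiers.
steps = [
    Step("plan", "completed", "model-a", "0.001000"),
    Step("lookup", "skipped"),
    Step("draft", "completed", "model-b", "0.000500"),
    Step("review", "failed", "model-a", "0.002000"),
]
costs = cost_by_model(steps)
failed_ids = [s.step_id for s in steps if s.status == "failed"]
```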

TokenBreakdown (Type)

Token usage for a single LLM call. The model is flat, with three fields and no cache fields at the SDK level; cache accounting is rolled into prompt server-side.

Attributes:

  • prompt (int): Input tokens sent to the model
  • completion (int): Output tokens returned by the model
  • total (int): Sum of prompt and completion

Example:

if step.tokens:
    t = step.tokens
    print(f"Tokens: {t.prompt} prompt, {t.completion} completion, {t.total} total")
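Token usage for a whole run can be obtained by summing the per-step breakdowns and skipping steps where tokens is None. A minimal sketch: the dataclass mirrors the three documented fields, and `sum_tokens` is a hypothetical helper, not part of the SDK.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass
class TokenBreakdown:
    """Stand-in mirroring the three documented fields."""
    prompt: int
    completion: int
    total: int


def sum_tokens(breakdowns: Iterable[Optional[TokenBreakdown]]) -> TokenBreakdown:
    """Add up token usage, skipping steps that report no tokens (None)."""
    prompt = completion = total = 0
    for t in breakdowns:
        if t is None:
            continue
        prompt += t.prompt
        completion += t.completion
        total += t.total
    return TokenBreakdown(prompt, completion, total)


usage = sum_tokens([
    TokenBreakdown(prompt=200, completion=50, total=250),
    None,  # e.g. a step that did not call an LLM
    TokenBreakdown(prompt=100, completion=25, total=125),
])
```

Against a real trace, the call would look like `sum_tokens(s.tokens for s in trace.steps)`.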

StepAttempts (Type)

Collection of all attempts for a single step (returned when attempt="all").

Attributes:

  • step_id (str): The step being inspected
  • attempts (list[StepTrace]): One StepTrace per attempt, ordered by attempt number

Example:

history = await flow.run(execution_id).step_trace("my_step", attempt="all")
print(f"Step {history.step_id} had {len(history.attempts)} attempts:")
for attempt in history.attempts:
    print(f"  Attempt {attempt.attempt}: {attempt.status} ({attempt.duration_ms}ms)")
    if attempt.error_context:
        print(f"    Error: {attempt.error_context}")
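A retry history can be condensed into a few numbers for logging or alerting. The sketch below assumes a stand-in `Attempt` class with a subset of the StepTrace fields; `summarize_attempts` is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Attempt:
    """Stand-in for StepTrace with only the fields used here."""
    attempt: int
    status: str
    duration_ms: Optional[int] = None


def summarize_attempts(attempts: List[Attempt]) -> Dict[str, object]:
    """Summarize a retry history: retry count, final status, time spent."""
    if not attempts:
        raise ValueError("expected at least one attempt")
    # Sort defensively rather than relying on the documented ordering.
    ordered = sorted(attempts, key=lambda a: a.attempt)
    return {
        "retries": len(ordered) - 1,
        "final_status": ordered[-1].status,
        "total_duration_ms": sum(a.duration_ms or 0 for a in ordered),
        "failed_attempts": [a.attempt for a in ordered if a.status == "failed"],
    }


summary = summarize_attempts([
    Attempt(attempt=1, status="failed", duration_ms=300),
    Attempt(attempt=2, status="failed", duration_ms=450),
    Attempt(attempt=3, status="completed", duration_ms=280),
])
```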