NOUKAI

Tracing & Debugging

Inspect execution traces with timing, tokens, and cost breakdowns.

After a flow executes, you can retrieve its trace: a run-level summary plus a per-step record of timing, token usage, and cost.

Get a trace

After execution, fetch the trace using the execution ID. A Trace contains a flow_run summary (flowRun in Node) and a list of steps. Totals such as cost and summed step duration are aggregated client-side from the steps array; there is no total_cost_usd on the trace object itself.

from noukai_sdk import AsyncNoukai
 
async def main():
    async with AsyncNoukai(org="acme", project="spelling") as client:
        flow = client.flow("grade-3")
        result = await flow.execute(message="The cat sat on the mat.", trace=True)
 
        # Fetch the trace
        trace = await flow.run(result.execution_id).trace()
 
        # Run-level summary
        print(f"Flow run: {trace.flow_run.id}")
        print(f"Status: {trace.flow_run.status}")
        print(f"Run duration: {trace.flow_run.duration_ms}ms")
        print(f"Step count: {trace.flow_run.step_count}")
 
        # Aggregate cost across steps (cost_usd is a decimal string, may be None)
        total_cost = sum(
            float(step.cost_usd) for step in trace.steps if step.cost_usd
        )
        print(f"Total cost: ${total_cost:.6f}")
 
import asyncio
asyncio.run(main())

The Trace object exposes:

  • flow_run / flowRun: A RunSummary (id, flow_id, status, trigger_type, started_at, completed_at, duration_ms, step_count)
  • steps: A list of StepTrace objects (latest attempt per step)

Pass trace=True (Python) / trace: true (Node) on execute to ensure the server captures input/output snapshots; without it, traces still include timing, tokens and cost.
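Because cost_usd arrives as a decimal string, the client-side total can also be computed without going through float. The following is a minimal sketch of that idea using Python's standard decimal module; total_cost_exact is a hypothetical helper (not part of the SDK), and the SimpleNamespace objects are stand-ins for trace.steps.

```python
from decimal import Decimal
from types import SimpleNamespace


def total_cost_exact(steps) -> Decimal:
    """Sum cost_usd across steps without converting to float.

    Steps whose cost_usd is None (or an empty string) are skipped.
    """
    return sum(
        (Decimal(step.cost_usd) for step in steps if step.cost_usd),
        Decimal("0"),
    )


# Stand-in objects for illustration; real code would pass trace.steps.
steps = [
    SimpleNamespace(cost_usd="0.001234"),
    SimpleNamespace(cost_usd=None),
    SimpleNamespace(cost_usd="0.000766"),
]
print(total_cost_exact(steps))  # 0.002000
```

Float sums are usually fine for display. A Decimal total is more useful when the numbers feed billing or budget checks.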

Inspect steps

Iterate over the per-step traces. Each StepTrace is identified by step_id (there is no name field on StepTrace; name only appears on streaming StepCompleted events).

from noukai_sdk import AsyncNoukai

async def main():
    async with AsyncNoukai() as client:
        flow = client.flow("acme/spelling/grade-3")
        result = await flow.execute(message="The cat sat on the mat.")
        trace = await flow.run(result.execution_id).trace()
 
        for step in trace.steps:
            print(f"Step: {step.step_id} (attempt {step.attempt})")
            print(f"  Status: {step.status}")
            print(f"  Duration: {step.duration_ms}ms")
            if step.cost_usd:
                print(f"  Cost: ${float(step.cost_usd):.6f}")
            if step.model_used:
                print(f"  Model: {step.model_used}")
            if step.tokens:
                print(f"  Prompt tokens: {step.tokens.prompt}")
                print(f"  Completion tokens: {step.tokens.completion}")
                print(f"  Total tokens: {step.tokens.total}")
 
import asyncio
asyncio.run(main())

Each StepTrace includes:

  • step_id / stepId: Unique step identifier within the flow
  • attempt: Attempt number (1 = first try; >1 if the step was retried server-side)
  • loop_index / loopIndex: Loop iteration index for steps inside a loop (otherwise null)
  • status: "running" | "completed" | "failed" | "skipped"
  • started_at / completed_at (ISO timestamps) and duration_ms / durationMs
  • model_used / modelUsed: LLM model identifier when the step ran an LLM block
  • tokens: TokenBreakdown with prompt, completion, total (all integers)
  • cost_usd / costUsd: Estimated cost as a decimal string (e.g. "0.001234"). Parse with float(...) / parseFloat(...) before formatting for display. The string form preserves full precision, so use a decimal type instead of float when you need exact sums without float drift.
  • input_context / output_context / error_context: Captured snapshots (only when trace=True on execution, or for failed steps)
  • input_size_bytes, output_size_bytes, truncated: Snapshot metadata
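The status and duration_ms fields listed above are enough for simple triage. The sketch below shows two hypothetical helpers (failed_steps and slowest_steps are not SDK functions) that work on any objects exposing those attributes. It assumes duration_ms may be None for steps that never ran; the step IDs are made up for illustration.

```python
from types import SimpleNamespace


def failed_steps(steps):
    """Return the steps whose status is "failed"."""
    return [s for s in steps if s.status == "failed"]


def slowest_steps(steps, n=3):
    """Return up to n steps with the largest duration_ms, slowest first.

    Steps without a recorded duration are ignored (assumption: duration_ms
    can be None, for example on skipped steps).
    """
    timed = [s for s in steps if s.duration_ms is not None]
    return sorted(timed, key=lambda s: s.duration_ms, reverse=True)[:n]


# Stand-in objects for illustration; real code would pass trace.steps.
steps = [
    SimpleNamespace(step_id="parse", status="completed", duration_ms=120),
    SimpleNamespace(step_id="grade", status="failed", duration_ms=840),
    SimpleNamespace(step_id="report", status="skipped", duration_ms=None),
]
print([s.step_id for s in failed_steps(steps)])      # ['grade']
print([s.step_id for s in slowest_steps(steps, 2)])  # ['grade', 'parse']
```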

Inspect token usage

The TokenBreakdown is a flat three-field model — it mirrors what the server records per LLM call (no separate cache fields at the SDK level; cache accounting is rolled into prompt server-side).

from noukai_sdk import AsyncNoukai

async def main():
    async with AsyncNoukai() as client:
        flow = client.flow("acme/spelling/grade-3")
        result = await flow.execute(message="...", trace=True)
        trace = await flow.run(result.execution_id).trace()
 
        for step in trace.steps:
            if step.tokens:
                t = step.tokens
                print(f"{step.step_id}: prompt={t.prompt} completion={t.completion} total={t.total}")
 
import asyncio
asyncio.run(main())
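When a flow mixes several models, it can help to roll the three TokenBreakdown fields up per model_used. The following is one possible sketch; tokens_by_model is a hypothetical helper, the "unknown" bucket name is an arbitrary choice, and the model names are placeholders.

```python
from collections import defaultdict
from types import SimpleNamespace


def tokens_by_model(steps):
    """Aggregate prompt/completion/total token counts per model_used.

    Steps without a token breakdown (non-LLM steps) are skipped.
    """
    totals = defaultdict(lambda: {"prompt": 0, "completion": 0, "total": 0})
    for step in steps:
        if not step.tokens:
            continue
        bucket = totals[step.model_used or "unknown"]
        bucket["prompt"] += step.tokens.prompt
        bucket["completion"] += step.tokens.completion
        bucket["total"] += step.tokens.total
    return dict(totals)


def _tok(prompt, completion):
    # Stand-in for a TokenBreakdown.
    return SimpleNamespace(prompt=prompt, completion=completion, total=prompt + completion)


# Stand-in objects for illustration; real code would pass trace.steps.
steps = [
    SimpleNamespace(model_used="model-a", tokens=_tok(100, 20)),
    SimpleNamespace(model_used="model-a", tokens=_tok(50, 10)),
    SimpleNamespace(model_used="model-b", tokens=_tok(10, 5)),
    SimpleNamespace(model_used=None, tokens=None),
]
print(tokens_by_model(steps))
```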

Track retries across attempts

Trace.steps only contains the latest attempt per step. To inspect every retry for a given step, call step_trace(step_id, attempt="all") — it returns a StepAttempts collection.

from noukai_sdk import AsyncNoukai

async def main():
    async with AsyncNoukai() as client:
        flow = client.flow("acme/spelling/grade-3")
        result = await flow.execute(message="...")
        run = flow.run(result.execution_id)
        trace = await run.trace()
 
        for step in trace.steps:
            if step.attempt > 1:
                # Server retried this step; fetch every attempt
                history = await run.step_trace(step.step_id, attempt="all")
                print(f"{step.step_id} was retried {len(history.attempts) - 1} times")
                for a in history.attempts:
                    print(f"  Attempt {a.attempt}: {a.status} in {a.duration_ms}ms")
                    if a.error_context:
                        print(f"    Error: {a.error_context}")
 
import asyncio
asyncio.run(main())

StepAttempts exposes:

  • step_id / stepId: The step being inspected
  • attempts: A list of StepTrace objects — one per attempt, ordered by attempt number
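Since attempts is ordered by attempt number, a small reducer can show how much time went into failed tries. This is a sketch under that ordering assumption; retry_summary is a hypothetical helper, and the stand-in objects carry only the fields it reads.

```python
from types import SimpleNamespace


def retry_summary(attempts):
    """Summarize a list of per-attempt StepTrace-like objects.

    Assumes attempts are ordered by attempt number, so the last entry
    reflects the final outcome.
    """
    failed = [a for a in attempts if a.status == "failed"]
    return {
        "attempts": len(attempts),
        "failed_attempts": len(failed),
        # Treat a missing duration as 0 ms.
        "failed_ms": sum(a.duration_ms or 0 for a in failed),
        "final_status": attempts[-1].status if attempts else None,
    }


# Stand-in objects for illustration; real code would pass history.attempts.
attempts = [
    SimpleNamespace(attempt=1, status="failed", duration_ms=300),
    SimpleNamespace(attempt=2, status="failed", duration_ms=450),
    SimpleNamespace(attempt=3, status="completed", duration_ms=200),
]
print(retry_summary(attempts))
```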

Live tracing

For long-running flows, live_trace() (Python) / liveTrace() (Node) replays trace history from the database then live-tails new events from the server. It yields typed StreamEvent objects — the same event union you'd get from flow.events() (RunStarted, StepStarted, StepCompleted, FlowCompleted, …), not full Trace snapshots.

from noukai_sdk import AsyncNoukai
 
async def main():
    async with AsyncNoukai() as client:
        flow = client.flow("acme/spelling/grade-3")
 
        # Submit asynchronously so we can tail without blocking
        job = await flow.execute_async(message="The cat sat on the mat.")
 
        # Replay history then live-tail until the run terminates
        async for event in flow.run(job.execution_id).live_trace():
            if event.event_type == "step_completed":
                print(f"step_completed: {event.step_id} ({event.duration_ms}ms)")
            elif event.event_type == "flow_completed":
                print(f"flow_completed: {event.result}")
 
import asyncio
asyncio.run(main())

The iterator terminates when the server closes the connection after the run reaches a terminal state. If you need a full Trace snapshot after streaming, call run.trace() once live_trace() returns.
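Event-handling logic is easier to test when it accepts any async iterator rather than calling live_trace() directly. The sketch below illustrates that pattern with a fake generator. collect_step_durations and fake_events are hypothetical names, and the "run_started" event type string is an assumption modeled on the "step_completed" and "flow_completed" values used above.

```python
import asyncio
from types import SimpleNamespace


async def collect_step_durations(events):
    """Consume an async iterator of events and return {step_id: duration_ms}.

    If a step_id appears more than once (for example inside a loop),
    the last completion wins.
    """
    durations = {}
    async for event in events:
        if event.event_type == "step_completed":
            durations[event.step_id] = event.duration_ms
    return durations


async def fake_events():
    """Hypothetical stand-in for flow.run(...).live_trace()."""
    yield SimpleNamespace(event_type="run_started")
    yield SimpleNamespace(event_type="step_completed", step_id="parse", duration_ms=120)
    yield SimpleNamespace(event_type="step_completed", step_id="grade", duration_ms=840)
    yield SimpleNamespace(event_type="flow_completed", result="ok")


print(asyncio.run(collect_step_durations(fake_events())))
# {'parse': 120, 'grade': 840}
```

In real use, the same function would be awaited with the live_trace() iterator in place of fake_events().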

Compare runs

To benchmark cost or latency between executions, aggregate over trace.steps:

from noukai_sdk import AsyncNoukai

def total_cost(trace) -> float:
    return sum(float(s.cost_usd) for s in trace.steps if s.cost_usd)
 
async def main():
    async with AsyncNoukai() as client:
        flow = client.flow("acme/spelling/grade-3")
 
        res_a = await flow.execute(message="Short text")
        res_b = await flow.execute(message="Very long essay...")
 
        trace_a = await flow.run(res_a.execution_id).trace()
        trace_b = await flow.run(res_b.execution_id).trace()
 
        print(f"Short: {trace_a.flow_run.duration_ms}ms ${total_cost(trace_a):.6f}")
        print(f"Long:  {trace_b.flow_run.duration_ms}ms ${total_cost(trace_b):.6f}")
 
import asyncio
asyncio.run(main())
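Run-level totals show that two executions differ, but not where. A per-step comparison keyed on step_id can narrow that down. The following is a rough sketch; step_deltas is a hypothetical helper, it only compares steps present in both traces, and it assumes each step_id appears once per trace (steps inside loops would also need loop_index in the key).

```python
from decimal import Decimal
from types import SimpleNamespace


def step_deltas(steps_a, steps_b):
    """Return {step_id: {"duration_ms": ..., "cost_usd": ...}} as b minus a.

    Missing durations count as 0 ms and missing costs as 0.
    """
    by_id_a = {s.step_id: s for s in steps_a}
    by_id_b = {s.step_id: s for s in steps_b}
    deltas = {}
    for step_id in sorted(by_id_a.keys() & by_id_b.keys()):
        a, b = by_id_a[step_id], by_id_b[step_id]
        deltas[step_id] = {
            "duration_ms": (b.duration_ms or 0) - (a.duration_ms or 0),
            "cost_usd": Decimal(b.cost_usd or "0") - Decimal(a.cost_usd or "0"),
        }
    return deltas


# Stand-in objects for illustration; real code would pass trace_a.steps and trace_b.steps.
steps_a = [
    SimpleNamespace(step_id="parse", duration_ms=100, cost_usd="0.001000"),
    SimpleNamespace(step_id="grade", duration_ms=500, cost_usd="0.002000"),
]
steps_b = [
    SimpleNamespace(step_id="parse", duration_ms=110, cost_usd="0.001500"),
    SimpleNamespace(step_id="grade", duration_ms=900, cost_usd=None),
]
for step_id, delta in step_deltas(steps_a, steps_b).items():
    print(step_id, delta["duration_ms"], delta["cost_usd"])
```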

Export traces

Traces are fully serializable to JSON:

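import json
 
# `trace` is a Trace fetched earlier with run.trace().
# mode="json" asks model_dump for JSON-safe values (e.g. datetimes as strings);
# this assumes Trace is a Pydantic v2 model, as model_dump suggests.
trace_dict = trace.model_dump(mode="json", by_alias=True)
with open("trace.json", "w") as f:
    json.dump(trace_dict, f, indent=2)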

Use this to archive execution history or feed into monitoring systems.
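Monitoring pipelines often prefer one record per line over a single nested document. The sketch below flattens a dumped trace into JSON Lines, one line per step. step_records and to_jsonl are hypothetical helpers, and the camelCase keys (flowRun, stepId, durationMs) are an assumption about what by_alias=True produces, based on the aliases listed earlier.

```python
import json


def step_records(trace_dict):
    """Yield one flat record per step, tagged with the run id."""
    run_id = trace_dict["flowRun"]["id"]
    for step in trace_dict["steps"]:
        yield {"runId": run_id, **step}


def to_jsonl(trace_dict) -> str:
    """Serialize the per-step records as JSON Lines (one object per line)."""
    return "\n".join(json.dumps(record) for record in step_records(trace_dict))


# Hand-written stand-in for the dict returned by trace.model_dump(by_alias=True).
trace_dict = {
    "flowRun": {"id": "run_123", "status": "completed"},
    "steps": [
        {"stepId": "parse", "durationMs": 120},
        {"stepId": "grade", "durationMs": 840},
    ],
}
print(to_jsonl(trace_dict))
```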
