Execution State
The executor maintains a structured execution state that tracks every step's status, timing, output, errors, and retry attempts. The state is updated immutably through deltas and can be observed in real time via the onStateChange callback.
The State Model
interface ExecutionState {
runId: string; // Unique ID for this execution
status: "pending" | "running" | "completed" | "failed";
startedAt: string; // ISO 8601 timestamp
completedAt?: string; // Set when the run finishes
durationMs?: number; // Total wall-clock duration
stepRecords: StepExecutionRecord[]; // One record per step execution
output?: unknown; // Final workflow output
error?: ErrorSnapshot; // Error that caused failure
}The executionState field on ExecutionResult gives you the final snapshot after a run completes:
const result = await executeWorkflow(workflow, { tools });
const { executionState } = result;
console.log(`Run ${executionState.runId}: ${executionState.status}`);
console.log(`Duration: ${executionState.durationMs}ms`);
console.log(`Steps executed: ${executionState.stepRecords.length}`);Step Execution Records
Each step execution produces a StepExecutionRecord:
interface StepExecutionRecord {
stepId: string;
status: "pending" | "running" | "completed" | "failed" | "skipped";
startedAt?: string;
completedAt?: string;
durationMs?: number;
output?: unknown; // The step's output value
error?: ErrorSnapshot; // Error if the step failed
resolvedInputs?: unknown; // Resolved input expressions
retries: RetryRecord[]; // All retry attempts
path: ExecutionPathSegment[]; // Structural context (loop iteration, branch, etc.)
}A single step ID can appear in multiple records — for example, a step inside a for-each loop produces one record per iteration, each with a different path.
Querying Step Records
const { executionState } = result;
// Find all records for a specific step
const classifyRecords = executionState.stepRecords.filter(
(r) => r.stepId === "classify_ticket"
);
// Find failed steps
const failures = executionState.stepRecords.filter(
(r) => r.status === "failed"
);
// Find steps with retries
const retriedSteps = executionState.stepRecords.filter(
(r) => r.retries.length > 0
);
// Total execution time across all steps
const totalStepTime = executionState.stepRecords.reduce(
(sum, r) => sum + (r.durationMs ?? 0),
0
);Resolved Inputs
The resolvedInputs field captures the input values after JMESPath expression evaluation — showing what values the step actually received at runtime. This is useful for debugging and for the workflow viewer to display resolved values:
for (const record of executionState.stepRecords) {
if (record.resolvedInputs) {
console.log(`Step "${record.stepId}" received:`, record.resolvedInputs);
}
}Execution Path
The path field on each step record tracks structural context — which loop iteration, switch branch, or polling attempt the step is executing within. This enables correct attribution when the same step executes multiple times in different contexts.
type ExecutionPathSegment =
| {
type: "for-each";
stepId: string; // The for-each step's ID
iterationIndex: number; // 0-based iteration index
itemValue: unknown; // The current item value
}
| {
type: "switch-case";
stepId: string; // The switch-case step's ID
matchedCaseIndex: number; // Index of the matched case
matchedValue: unknown; // The value that was matched
}
| {
type: "wait-for-condition";
stepId: string; // The wait-for-condition step's ID
pollAttempt: number; // Current polling attempt number
};Path segments nest. A step inside a for-each loop that's inside a switch-case branch has a path with two segments:
// A step executing in the 3rd iteration of a loop, inside a switch branch
const record = executionState.stepRecords.find(
(r) =>
r.stepId === "process_item" &&
r.path.length === 2 &&
r.path[0].type === "switch-case" &&
r.path[1].type === "for-each" &&
r.path[1].iterationIndex === 2
);Filtering by Path
To find the latest execution of a step across all loop iterations:
const latestClassify = executionState.stepRecords
.filter((r) => r.stepId === "classify_ticket")
.sort((a, b) => {
const aTime = a.completedAt ? new Date(a.completedAt).getTime() : 0;
const bTime = b.completedAt ? new Date(b.completedAt).getTime() : 0;
return bTime - aTime;
})[0];To find all executions of a step within a specific loop iteration:
const iteration2Records = executionState.stepRecords.filter(
(r) =>
r.path.some(
(seg) =>
seg.type === "for-each" &&
seg.stepId === "process_each" &&
seg.iterationIndex === 2
)
);Retry Records
When a step is retried due to a recoverable error, each attempt is captured in the step record's retries array:
interface RetryRecord {
attempt: number; // 1-based attempt number
startedAt: string; // ISO 8601 timestamp
failedAt: string; // ISO 8601 timestamp
errorCode: string; // The error code that triggered the retry
errorMessage: string; // Human-readable error message
}for (const record of executionState.stepRecords) {
if (record.retries.length > 0) {
console.log(`Step "${record.stepId}" was retried ${record.retries.length} time(s):`);
for (const retry of record.retries) {
console.log(` Attempt ${retry.attempt}: ${retry.errorCode} — ${retry.errorMessage}`);
}
}
}Error Snapshots
Errors in the execution state are stored as serializable ErrorSnapshot objects rather than class instances:
interface ErrorSnapshot {
code: string;
category: string;
message: string;
stepId?: string;
statusCode?: number; // For HTTP errors
isRetryable?: boolean; // Whether the error was retryable
}You can convert a StepExecutionError class instance to a snapshot using snapshotError:
import { snapshotError } from "@remoraflow/core";
try {
// ...
} catch (e) {
if (e instanceof StepExecutionError) {
const snapshot = snapshotError(e);
// snapshot is JSON-serializable
await saveToDatabase(snapshot);
}
}Execution Deltas
State transitions are represented as immutable deltas. Each delta describes a single atomic change to the execution state:
type ExecutionDelta =
| { type: "run-started"; runId: string; startedAt: string }
| { type: "step-started"; stepId: string; path: ExecutionPathSegment[]; startedAt: string }
| { type: "step-completed"; stepId: string; path: ExecutionPathSegment[];
completedAt: string; durationMs: number; output: unknown; resolvedInputs?: unknown }
| { type: "step-failed"; stepId: string; path: ExecutionPathSegment[];
failedAt: string; durationMs: number; error: ErrorSnapshot; resolvedInputs?: unknown }
| { type: "step-retry"; stepId: string; path: ExecutionPathSegment[]; retry: RetryRecord }
| { type: "run-completed"; runId: string; completedAt: string;
durationMs: number; output?: unknown }
| { type: "run-failed"; runId: string; failedAt: string;
durationMs: number; error: ErrorSnapshot }Delta Lifecycle
A typical successful execution produces deltas in this order:
run-started— execution beginsstep-started→step-completed— for each step (possibly withstep-retryin between)run-completed— execution finishes
A failed execution ends with run-failed instead of run-completed.
Collecting State History
To build a complete history of all state transitions, collect deltas from onStateChange:
const deltas: ExecutionDelta[] = [];
const result = await executeWorkflow(workflow, {
tools,
onStateChange: (_state, delta) => {
deltas.push(delta);
},
});
// deltas now contains every state transition in order
console.log(`Total transitions: ${deltas.length}`);
console.log("Timeline:");
for (const delta of deltas) {
switch (delta.type) {
case "run-started":
console.log(` [${delta.startedAt}] Run started`);
break;
case "step-started":
console.log(` [${delta.startedAt}] Step "${delta.stepId}" started`);
break;
case "step-completed":
console.log(` [${delta.completedAt}] Step "${delta.stepId}" completed (${delta.durationMs}ms)`);
break;
case "step-failed":
console.log(` [${delta.failedAt}] Step "${delta.stepId}" failed: ${delta.error.message}`);
break;
case "step-retry":
console.log(` Step "${delta.stepId}" retry #${delta.retry.attempt}: ${delta.retry.errorCode}`);
break;
case "run-completed":
console.log(` [${delta.completedAt}] Run completed (${delta.durationMs}ms)`);
break;
case "run-failed":
console.log(` [${delta.failedAt}] Run failed: ${delta.error.message}`);
break;
}
}Replaying State
The applyDelta function is a pure reducer that applies a delta to an execution state, returning a new state without mutation:
import { applyDelta } from "@remoraflow/core";
// Replay a sequence of deltas to reconstruct state at any point
let state: ExecutionState = {
runId: "initial",
status: "pending",
startedAt: new Date().toISOString(),
stepRecords: [],
};
for (const delta of savedDeltas) {
state = applyDelta(state, delta);
// Inspect state at each point in time
console.log(`After ${delta.type}: ${state.status}, ${state.stepRecords.length} records`);
}This is useful for:
- Debugging — reconstruct the execution state at any point by replaying deltas up to that point
- Persistence — store deltas as an append-only log and reconstruct state on demand
- Testing — verify state transitions by applying deltas and asserting on the result
Streaming State to a UI
The onStateChange callback integrates naturally with React or other UI frameworks for live execution visualization:
// React example: stream execution state to a component
const [executionState, setExecutionState] = useState<ExecutionState | null>(null);
async function runWorkflow() {
const result = await executeWorkflow(workflow, {
tools,
onStateChange: (state) => {
setExecutionState(state); // Triggers re-render on every state change
},
});
}The WorkflowViewer component accepts executionState as a prop, automatically highlighting running, completed, and failed steps:
import { WorkflowViewer } from "@remoraflow/ui";
<WorkflowViewer
workflow={workflow}
executionState={executionState} // Live updates during execution
/>Deriving Step Summaries
The viewer exports a deriveStepSummaries helper that aggregates step execution records into per-step summaries. This is useful when a step executes multiple times (inside loops) and you want a single summary:
import { deriveStepSummaries } from "@remoraflow/ui";
const summaries = deriveStepSummaries(result.executionState);
for (const [stepId, summary] of Object.entries(summaries)) {
console.log(`${stepId}:`);
console.log(` Status: ${summary.status}`);
console.log(` Executions: ${summary.executionCount}`);
console.log(` Completed: ${summary.completedCount}`);
console.log(` Failed: ${summary.failedCount}`);
console.log(` Total retries: ${summary.totalRetries}`);
if (summary.latestOutput) {
console.log(` Latest output:`, summary.latestOutput);
}
if (summary.latestError) {
console.log(` Latest error: [${summary.latestError.code}] ${summary.latestError.message}`);
}
}