Basic Workflow
A simple linear pipeline that fetches data, processes it, and formats the output. Each step returns a partial state patch that’s merged into the running state.
import { Workflow } from "@radaros/core";
/** State threaded through the DataPipeline workflow; each step patches a subset of these fields. */
interface PipelineState {
// Endpoint the "fetch" step downloads from.
url: string;
// Raw response body text stored by the "fetch" step.
rawData: string;
// Per-name totals computed by the "process" step.
processed: Record<string, number>;
// Newline-separated report produced by the "format" step.
report: string;
}
// Three-stage linear workflow: download the metrics payload, total the values
// per name, then render a text report ordered from largest to smallest total.
const pipeline = new Workflow<PipelineState>({
  name: "DataPipeline",
  initialState: {
    url: "https://api.example.com/metrics",
    rawData: "",
    processed: {},
    report: "",
  },
  steps: [
    {
      name: "fetch",
      // Downloads the payload as text; throws on any non-OK HTTP response.
      run: async ({ url }) => {
        const response = await fetch(url);
        if (!response.ok) {
          throw new Error(`HTTP ${response.status}: ${response.statusText}`);
        }
        return { rawData: await response.text() };
      },
    },
    {
      name: "process",
      // Sums `value` per `name` across every row of the JSON payload.
      run: async ({ rawData }) => {
        const rows = JSON.parse(rawData) as Array<{
          name: string;
          value: number;
        }>;
        const processed: Record<string, number> = {};
        for (const { name, value } of rows) {
          const runningTotal = processed[name] ?? 0;
          processed[name] = runningTotal + value;
        }
        return { processed };
      },
    },
    {
      name: "format",
      // Renders one "name: value" line per entry, highest value first.
      run: async ({ processed }) => {
        const ranked = Object.entries(processed);
        ranked.sort((left, right) => right[1] - left[1]);
        const report = ranked
          .map(([name, value]) => `${name}: ${value.toLocaleString()}`)
          .join("\n");
        return { report };
      },
    },
  ],
});

// Execute the workflow, then print the report and a one-line status summary.
const { state, stepResults } = await pipeline.run();
console.log(state.report);
console.log(
  "Steps:",
  stepResults.map((result) => `${result.stepName}: ${result.status}`).join(", ")
);
Agent Steps
Use AgentStep to embed an LLM agent inside a workflow step. The inputFrom function maps workflow state to the agent’s input, and the agent’s output is stored as <stepName>_output on the state.
import { Agent, Workflow, openai } from "@radaros/core";
// Agent that condenses the provided text into three single-sentence bullet points.
// NOTE(review): `register: false` presumably keeps the agent out of the global registry — confirm.
const summarizer = new Agent({
name: "Summarizer",
model: openai("gpt-4o-mini"),
instructions: `Summarize the provided text into 3 concise bullet points.
Each bullet should be a single sentence. Start each with a verb.`,
register: false,
});
// Agent that writes a short headline (at most 10 words) for a given summary.
const headlineWriter = new Agent({
name: "HeadlineWriter",
model: openai("gpt-4o-mini"),
instructions: `Write a compelling headline (max 10 words) for the
provided summary. Make it engaging and newsworthy.`,
register: false,
});
/** State for the ArticleDigest workflow. */
interface SummaryState {
// URL the "fetch_content" step downloads the article from.
articleUrl: string;
// Article body taken from the fetched JSON (`article.body`).
rawContent: string;
// Output of the "summarize" agent step.
summarize_output: string;
// Output of the "headline" agent step.
headline_output: string;
// Final digest assembled by the "compose" step.
result: { headline: string; summary: string };
}
// Article digest workflow: fetch an article, summarize it with one agent,
// write a headline with another, then combine both outputs into `result`.
// Each agent step's reply lands on the state as `<stepName>_output`.
const workflow = new Workflow<SummaryState>({
  name: "ArticleDigest",
  initialState: {
    articleUrl: "https://api.example.com/articles/latest",
    rawContent: "",
    summarize_output: "",
    headline_output: "",
    result: { headline: "", summary: "" },
  },
  steps: [
    {
      name: "fetch_content",
      run: async (state) => {
        const res = await fetch(state.articleUrl);
        // Fail fast on HTTP errors so an error body is never handed to the summarizer.
        if (!res.ok) throw new Error(`HTTP ${res.status}: ${res.statusText}`);
        const article = await res.json();
        return { rawContent: article.body };
      },
    },
    {
      name: "summarize",
      agent: summarizer,
      inputFrom: (state) => `Summarize this article:\n\n${state.rawContent}`,
    },
    {
      name: "headline",
      agent: headlineWriter,
      // Feeds the previous agent step's output into the headline writer.
      inputFrom: (state) =>
        `Write a headline for this summary:\n\n${state.summarize_output}`,
    },
    {
      name: "compose",
      // Trims both agent replies and packages them as the final result.
      run: async (state) => ({
        result: {
          headline: state.headline_output.trim(),
          summary: state.summarize_output.trim(),
        },
      }),
    },
  ],
});

const { state } = await workflow.run();
console.log(`# ${state.result.headline}`);
console.log(state.result.summary);
Conditional Branching
A step with a condition function only executes its nested steps when the condition returns true. This enables if/else-style branching based on runtime state.
import { Agent, Workflow, openai } from "@radaros/core";
// Agent that translates the provided text into English while preserving formatting.
const translator = new Agent({
name: "Translator",
model: openai("gpt-4o-mini"),
instructions: "Translate the provided text into English. Preserve formatting.",
register: false,
});
// Agent that classifies text as safe or toxic; instructed to reply with
// exactly "SAFE" or "TOXIC: <reason>" so the reply can be checked by prefix.
const toxicityChecker = new Agent({
name: "ToxicityChecker",
model: openai("gpt-4o-mini"),
instructions: `Analyze the text for toxic or harmful content.
Respond with exactly "SAFE" or "TOXIC: <reason>".`,
register: false,
});
/** State for the ContentModeration workflow. */
interface ModerationState {
// Text to moderate.
text: string;
// Language code of `text`; compared against "en" to decide on translation.
language: string;
// True when `language` is not "en".
needsTranslation: boolean;
// Output of the "translate" agent step.
translate_output: string;
// Verdict text from the toxicity checker agent ("SAFE" or "TOXIC: <reason>").
toxicity_check_output: string;
// English text that gets checked; replaced by a "[BLOCKED] ..." message when flagged.
finalText: string;
// True when the toxicity verdict starts with "SAFE".
isSafe: boolean;
}
// Content moderation workflow: translate non-English text, run a toxicity
// check on the English text, and replace the output with a block notice when
// the checker does not answer "SAFE".
const workflow = new Workflow<ModerationState>({
  name: "ContentModeration",
  initialState: {
    text: "Bonjour le monde, ceci est un message amical.",
    language: "fr",
    needsTranslation: false,
    translate_output: "",
    toxicity_check_output: "",
    finalText: "",
    isSafe: false,
  },
  steps: [
    {
      name: "detect_language",
      run: async (state) => ({
        needsTranslation: state.language !== "en",
      }),
    },
    {
      name: "maybe_translate",
      // Nested steps only run when translation is required.
      condition: (state) => state.needsTranslation,
      steps: [
        {
          name: "translate",
          agent: translator,
          inputFrom: (state) =>
            `Translate from ${state.language} to English:\n\n${state.text}`,
        },
      ],
    },
    {
      name: "set_english_text",
      // Picks the translated text when one was produced, otherwise the original.
      run: async (state) => ({
        finalText: state.needsTranslation
          ? state.translate_output
          : state.text,
      }),
    },
    {
      // The step name is the state-key prefix: agent output is stored as
      // `<stepName>_output`, so this must be "toxicity_check" for the verdict
      // to land in `toxicity_check_output`, which the later steps read.
      name: "toxicity_check",
      agent: toxicityChecker,
      inputFrom: (state) => state.finalText,
    },
    {
      name: "evaluate_safety",
      run: async (state) => ({
        isSafe: state.toxicity_check_output.trim().startsWith("SAFE"),
      }),
    },
    {
      name: "block_if_toxic",
      condition: (state) => !state.isSafe,
      steps: [
        {
          name: "reject",
          // Replaces the text with a block notice carrying the checker's verdict.
          run: async (state) => ({
            finalText: `[BLOCKED] ${state.toxicity_check_output}`,
          }),
        },
      ],
    },
  ],
});

const { state } = await workflow.run();
console.log("Safe:", state.isSafe);
console.log("Output:", state.finalText);
Parallel Steps
A step with a parallel array runs multiple sub-steps concurrently, then merges all their state patches. Use this when steps are independent and can execute simultaneously.
import { Agent, Workflow, openai } from "@radaros/core";
// Agent that labels text sentiment (positive/negative/neutral/mixed) with a confidence percentage.
const sentimentAgent = new Agent({
name: "SentimentAnalyzer",
model: openai("gpt-4o-mini"),
instructions: `Classify text sentiment as positive, negative, neutral, or
mixed. Respond with one word followed by a confidence percentage.`,
register: false,
});
// Agent that lists up to five main topics as a comma-separated string.
const topicAgent = new Agent({
name: "TopicExtractor",
model: openai("gpt-4o-mini"),
instructions: `Extract the main topics from the text as a comma-separated
list. Limit to the top 5 most prominent topics.`,
register: false,
});
// Agent that extracts named entities and is instructed to answer with a JSON array of {name, type}.
const entityAgent = new Agent({
name: "EntityExtractor",
model: openai("gpt-4o-mini"),
instructions: `Extract named entities (people, companies, products, locations)
from the text. Return as a JSON array of {name, type} objects.`,
register: false,
});
/** State for the ParallelNLP workflow. */
interface NLPState {
// Input text analysed by all three agents.
text: string;
// Final fields copied from the agent outputs by the "merge_results" step.
sentiment: string;
topics: string;
entities: string;
// Outputs of the parallel agent steps, keyed as `<stepName>_output`.
SentimentAnalysis_output: string;
TopicExtraction_output: string;
EntityExtraction_output: string;
}
// Parallel NLP workflow: three agents analyse the same text concurrently, then
// a final step copies their outputs into the friendlier result fields.
const workflow = new Workflow<NLPState>({
  name: "ParallelNLP",
  initialState: {
    text: "Apple CEO Tim Cook announced the new Vision Pro 2 at WWDC in Cupertino. The device is faster and cheaper, but analysts at Goldman Sachs remain skeptical about mass adoption.",
    sentiment: "",
    topics: "",
    entities: "",
    SentimentAnalysis_output: "",
    TopicExtraction_output: "",
    EntityExtraction_output: "",
  },
  steps: [
    {
      name: "analyze",
      // Every sub-step receives the same input text.
      parallel: [
        {
          name: "SentimentAnalysis",
          agent: sentimentAgent,
          inputFrom: ({ text }) => text,
        },
        {
          name: "TopicExtraction",
          agent: topicAgent,
          inputFrom: ({ text }) => text,
        },
        {
          name: "EntityExtraction",
          agent: entityAgent,
          inputFrom: ({ text }) => text,
        },
      ],
    },
    {
      name: "merge_results",
      // Copies each agent's `<stepName>_output` into its result field.
      run: async ({
        SentimentAnalysis_output,
        TopicExtraction_output,
        EntityExtraction_output,
      }) => {
        return {
          sentiment: SentimentAnalysis_output,
          topics: TopicExtraction_output,
          entities: EntityExtraction_output,
        };
      },
    },
  ],
});

const { state } = await workflow.run();
console.log("Sentiment:", state.sentiment);
console.log("Topics:", state.topics);
console.log("Entities:", state.entities);
Retry Policy
Configure automatic retries with exponential backoff for steps that call unreliable external services. The retry policy applies to all steps in the workflow.
import { Workflow } from "@radaros/core";
/** State for the ReliableIngestion workflow. */
interface IngestionState {
// Partner API endpoint the events are fetched from.
source: string;
// Events as a JSON string: raw after "fetch_events", filtered after "validate".
data: string;
// Number of records currently held in `data`.
recordCount: number;
// Set to true by the "ingest" step.
ingested: boolean;
// NOTE(review): no step shown here writes this field — confirm whether it is still needed.
retryAttempts: number;
}
// Ingestion workflow with a workflow-level retry policy: fetch events from a
// flaky partner API, keep only well-formed records, then POST them to the
// warehouse. Steps signal failure by throwing, which is what lets the retry
// policy re-run them.
const workflow = new Workflow<IngestionState>({
  name: "ReliableIngestion",
  initialState: {
    source: "https://flaky-partner-api.example.com/events",
    data: "",
    recordCount: 0,
    ingested: false,
    retryAttempts: 0,
  },
  retryPolicy: {
    maxRetries: 3,
    backoffMs: 500,
    backoffMultiplier: 2,
  },
  steps: [
    {
      name: "fetch_events",
      run: async (state) => {
        const res = await fetch(state.source, {
          headers: { Authorization: `Bearer ${process.env.PARTNER_API_KEY}` },
        });
        if (!res.ok) throw new Error(`HTTP ${res.status}: ${res.statusText}`);
        const data = await res.text();
        // Validate the shape before trusting `.length`; a non-array payload
        // would otherwise store `undefined` as the record count.
        const parsed: unknown = JSON.parse(data);
        if (!Array.isArray(parsed)) {
          throw new Error("Expected a JSON array of events");
        }
        return { data, recordCount: parsed.length };
      },
    },
    {
      name: "validate",
      // Keeps only records that carry an id, a timestamp and a payload.
      run: async (state) => {
        const records = JSON.parse(state.data) as Array<{
          id?: unknown;
          timestamp?: unknown;
          payload?: unknown;
        }>;
        const valid = records.filter((r) => r.id && r.timestamp && r.payload);
        if (valid.length === 0) throw new Error("No valid records in payload");
        return { data: JSON.stringify(valid), recordCount: valid.length };
      },
    },
    {
      name: "ingest",
      run: async (state) => {
        const res = await fetch("https://warehouse.internal/ingest", {
          method: "POST",
          headers: { "Content-Type": "application/json" },
          body: state.data,
        });
        // fetch() resolves on HTTP error statuses, so check explicitly; without
        // this the step would report success and never be retried.
        if (!res.ok) throw new Error(`HTTP ${res.status}: ${res.statusText}`);
        return { ingested: true };
      },
    },
  ],
});

const { state, stepResults } = await workflow.run();
console.log(`Ingested: ${state.ingested} (${state.recordCount} records)`);
for (const step of stepResults) {
  console.log(` ${step.stepName}: ${step.status} (${step.durationMs}ms)`);
}
Workflow with State
Typed state flows between steps, providing full type safety. Each step returns a partial patch that’s shallow-merged into the running state, so steps can build on each other’s output.
import { Workflow } from "@radaros/core";
/** State for the Checkout workflow; monetary fields are plain numbers rounded to 2 decimals by the steps. */
interface OrderState {
orderId: string;
customerId: string;
// Line items; `price` is the per-unit price.
items: Array<{ sku: string; qty: number; price: number }>;
// Sum of qty * price over all items.
subtotal: number;
// Code looked up in the discount table of the "apply_discount" step.
discountCode: string;
// Amount subtracted from the subtotal (subtotal * discount rate).
discountAmount: number;
// Tax computed on (subtotal - discountAmount).
tax: number;
shippingCost: number;
// Taxable amount + tax + shipping.
total: number;
// URL returned by the receipt service.
receiptUrl: string;
}
// Checkout workflow: each step derives one part of the order total from the
// state left by the previous steps, and the last step requests a receipt.
const checkout = new Workflow<OrderState>({
  name: "Checkout",
  initialState: {
    orderId: "ORD-20260302-001",
    customerId: "CUST-4492",
    items: [
      { sku: "WIDGET-A", qty: 2, price: 19.99 },
      { sku: "GADGET-B", qty: 1, price: 49.99 },
      { sku: "CABLE-C", qty: 3, price: 9.99 },
    ],
    subtotal: 0,
    discountCode: "SPRING20",
    discountAmount: 0,
    tax: 0,
    shippingCost: 0,
    total: 0,
    receiptUrl: "",
  },
  steps: [
    {
      name: "calculate_subtotal",
      run: async (state) => {
        const subtotal = state.items.reduce(
          (sum, item) => sum + item.qty * item.price,
          0
        );
        // Round to cents to keep floating-point noise out of later steps.
        return { subtotal: +subtotal.toFixed(2) };
      },
    },
    {
      name: "apply_discount",
      run: async (state) => {
        const discounts: Record<string, number> = {
          SPRING20: 0.2,
          WELCOME10: 0.1,
          VIP30: 0.3,
        };
        // Unknown codes fall back to no discount.
        const rate = discounts[state.discountCode] ?? 0;
        const discountAmount = +(state.subtotal * rate).toFixed(2);
        return { discountAmount };
      },
    },
    {
      name: "calculate_shipping",
      run: async (state) => {
        const itemCount = state.items.reduce((sum, i) => sum + i.qty, 0);
        // Orders with more than 5 units ship free.
        const shippingCost = itemCount > 5 ? 0 : 5.99;
        return { shippingCost };
      },
    },
    {
      name: "apply_tax",
      run: async (state) => {
        // Tax applies to the discounted subtotal; shipping is added untaxed.
        const taxableAmount = state.subtotal - state.discountAmount;
        const taxRate = 0.0875;
        const tax = +(taxableAmount * taxRate).toFixed(2);
        const total = +(taxableAmount + tax + state.shippingCost).toFixed(2);
        return { tax, total };
      },
    },
    {
      name: "generate_receipt",
      run: async (state) => {
        const res = await fetch("https://receipts.internal/generate", {
          method: "POST",
          headers: { "Content-Type": "application/json" },
          body: JSON.stringify({
            orderId: state.orderId,
            customerId: state.customerId,
            items: state.items,
            subtotal: state.subtotal,
            discount: state.discountAmount,
            tax: state.tax,
            shipping: state.shippingCost,
            total: state.total,
          }),
        });
        // Surface receipt-service failures instead of storing an undefined URL.
        if (!res.ok) throw new Error(`HTTP ${res.status}: ${res.statusText}`);
        const { url } = await res.json();
        return { receiptUrl: url };
      },
    },
  ],
});

const { state } = await checkout.run();
// Amounts are numbers, so format with toFixed(2) to avoid output like "$8.4" or "$0".
console.log(`Order ${state.orderId}`);
console.log(` Subtotal: $${state.subtotal.toFixed(2)}`);
console.log(
  ` Discount: -$${state.discountAmount.toFixed(2)} (${state.discountCode})`
);
console.log(` Shipping: $${state.shippingCost.toFixed(2)}`);
console.log(` Tax: $${state.tax.toFixed(2)}`);
console.log(` Total: $${state.total.toFixed(2)}`);
console.log(` Receipt: ${state.receiptUrl}`);
Complex Pipeline
A multi-stage content generation pipeline: research, outline, draft, review, and publish. Each stage is an AgentStep that passes its output forward through the state.
import { Agent, Workflow, openai } from "@radaros/core";
// Agent that gathers facts, statistics and expert opinions as structured research notes.
const researchAgent = new Agent({
name: "Researcher",
model: openai("gpt-4o"),
instructions: `Gather key facts, statistics, and expert opinions on the topic.
Return structured research notes with sections and citations.
Target at least 5 distinct data points.`,
register: false,
});
// Agent that turns research notes into a sectioned article outline.
const outlineAgent = new Agent({
name: "Outliner",
model: openai("gpt-4o-mini"),
instructions: `Create a detailed article outline with section headers,
key points for each section, and a logical flow.
Include an intro hook and a conclusion with call-to-action.`,
register: false,
});
// Agent that drafts an 800-word article from the outline and research notes.
const writerAgent = new Agent({
name: "Drafter",
model: openai("gpt-4o"),
instructions: `Write a polished 800-word article from the outline and
research notes. Use a professional but engaging tone.
Incorporate data points naturally. Include subheadings.`,
register: false,
});
// Agent that reviews the draft and is instructed to return a final edited
// version, not a list of suggestions.
const reviewerAgent = new Agent({
name: "Reviewer",
model: openai("gpt-4o"),
instructions: `Review the article for accuracy, clarity, grammar, and
engagement. Provide a final edited version — don't just list suggestions.
Ensure all cited data matches the original research.`,
register: false,
});
/** State for the ContentPipeline workflow. */
interface ContentState {
// Article subject; also sent as the title when publishing.
topic: string;
// Audience description passed to the research and outline prompts.
targetAudience: string;
// Outputs of the "research", "outline", "draft" and "review" agent steps.
research_output: string;
outline_output: string;
draft_output: string;
review_output: string;
// URL returned by the CMS after publishing.
publishedUrl: string;
// Whitespace-separated word count of the reviewed article.
wordCount: number;
}
// Content pipeline: research → outline → draft → review (agent steps, each
// reading earlier `<stepName>_output` fields), then count words and publish
// the reviewed article to the CMS.
const contentPipeline = new Workflow<ContentState>({
  name: "ContentPipeline",
  initialState: {
    topic: "The rise of edge computing in 2026",
    targetAudience: "CTOs and VP Engineering at mid-size SaaS companies",
    research_output: "",
    outline_output: "",
    draft_output: "",
    review_output: "",
    publishedUrl: "",
    wordCount: 0,
  },
  steps: [
    {
      name: "research",
      agent: researchAgent,
      inputFrom: (state) =>
        `Research this topic for ${state.targetAudience}:\n\n${state.topic}`,
    },
    {
      name: "outline",
      agent: outlineAgent,
      inputFrom: (state) =>
        `Create an outline for "${state.topic}" targeting ${state.targetAudience}.\n\nResearch notes:\n${state.research_output}`,
    },
    {
      name: "draft",
      agent: writerAgent,
      inputFrom: (state) =>
        `Write a full article.\n\nOutline:\n${state.outline_output}\n\nResearch:\n${state.research_output}`,
    },
    {
      name: "review",
      agent: reviewerAgent,
      inputFrom: (state) =>
        `Review and improve this article. Preserve meaning, fix issues:\n\n${state.draft_output}`,
    },
    {
      name: "count_words",
      // filter(Boolean) drops the empty strings split() yields for leading/trailing whitespace.
      run: async (state) => ({
        wordCount: state.review_output.split(/\s+/).filter(Boolean).length,
      }),
    },
    {
      name: "publish",
      run: async (state) => {
        const res = await fetch("https://cms.internal/api/articles", {
          method: "POST",
          headers: { "Content-Type": "application/json" },
          body: JSON.stringify({
            title: state.topic,
            body: state.review_output,
            audience: state.targetAudience,
            wordCount: state.wordCount,
          }),
        });
        // A rejected publish must fail the step rather than store an undefined URL.
        if (!res.ok) throw new Error(`HTTP ${res.status}: ${res.statusText}`);
        const { url } = await res.json();
        return { publishedUrl: url };
      },
    },
  ],
});

const { state, stepResults } = await contentPipeline.run();
console.log(`Published: ${state.publishedUrl} (${state.wordCount} words)`);
for (const step of stepResults) {
  console.log(
    ` ${step.stepName}: ${step.status} (${step.durationMs}ms)`
  );
}
Workflow with Human Review
Pause the workflow at a specific step for human approval before continuing. The human review step calls an external system (Slack, email, dashboard) and blocks until a response is received.
import { Agent, Workflow, EventBus, openai } from "@radaros/core";
// Agent that drafts a short marketing email (subject, preview text, HTML body) from a campaign brief.
const draftAgent = new Agent({
name: "DraftWriter",
model: openai("gpt-4o"),
instructions: `Write a marketing email draft based on the campaign brief.
Include subject line, preview text, and HTML body.
Keep it under 200 words. Use a compelling CTA.`,
register: false,
});
/** State for the EmailCampaign workflow. */
interface EmailCampaignState {
// Campaign brief given to the draft-writing agent.
brief: string;
// Audience description; also forwarded to the send API.
audience: string;
// Email draft produced by the draft-writing agent.
DraftWriter_output: string;
// Reviewer's decision and free-form notes from the human review step.
approved: boolean;
reviewerNotes: string;
// Set once the campaign has been sent, with the recipient count reported by the send API.
sent: boolean;
sentCount: number;
}
/**
 * Asks a human reviewer to approve an email draft.
 *
 * This example implementation only simulates the review: it logs the first
 * 200 characters of the draft and approves unconditionally. A production
 * version would notify a reviewer (Slack, email, dashboard) and wait for the
 * webhook callback carrying the decision.
 *
 * @param draft - Draft text to be reviewed.
 * @returns The reviewer's decision together with their notes.
 */
async function requestHumanApproval(
  draft: string
): Promise<{ approved: boolean; notes: string }> {
  const preview = draft.slice(0, 200);
  console.log("Draft pending review:", preview, "...");

  const decision = { approved: true, notes: "Looks good — ship it." };
  return decision;
}
// Event bus handed to the workflow; logs each completed step with its duration.
const eventBus = new EventBus();
eventBus.on("workflow:step:complete", ({ stepName, durationMs }) => {
  console.log(`[Event] Step "${stepName}" completed in ${durationMs}ms`);
});
// Email campaign workflow: an agent drafts the email, a human review step
// approves or rejects it, and exactly one of the two conditional branches
// runs afterwards (send on approval, log on rejection).
const emailWorkflow = new Workflow<EmailCampaignState>({
  name: "EmailCampaign",
  eventBus,
  initialState: {
    brief:
      "Announce our new API v2 with 3x throughput, 50ms P99 latency, and new streaming endpoints.",
    audience: "Developer newsletter subscribers (14,200 contacts)",
    DraftWriter_output: "",
    approved: false,
    reviewerNotes: "",
    sent: false,
    sentCount: 0,
  },
  steps: [
    {
      // The step name is the state-key prefix: agent output is stored as
      // `<stepName>_output`, so this must be "DraftWriter" for the draft to
      // land in `DraftWriter_output`, which the review and send steps read.
      name: "DraftWriter",
      agent: draftAgent,
      inputFrom: (state) =>
        `Campaign brief: ${state.brief}\nAudience: ${state.audience}`,
    },
    {
      name: "human_review",
      // Waits for the reviewer's decision before the workflow continues.
      run: async (state) => {
        const { approved, notes } = await requestHumanApproval(
          state.DraftWriter_output
        );
        return { approved, reviewerNotes: notes };
      },
    },
    {
      name: "send_if_approved",
      condition: (state) => state.approved,
      steps: [
        {
          name: "send_email",
          run: async (state) => {
            const res = await fetch(
              "https://email.internal/api/campaigns/send",
              {
                method: "POST",
                headers: { "Content-Type": "application/json" },
                body: JSON.stringify({
                  subject: "Introducing API v2",
                  html: state.DraftWriter_output,
                  audience: state.audience,
                }),
              }
            );
            // Never mark the campaign as sent when the send API rejected it.
            if (!res.ok) {
              throw new Error(`HTTP ${res.status}: ${res.statusText}`);
            }
            const { recipientCount } = await res.json();
            return { sent: true, sentCount: recipientCount };
          },
        },
      ],
    },
    {
      name: "skip_if_rejected",
      condition: (state) => !state.approved,
      steps: [
        {
          name: "log_rejection",
          // Logging only; returns an empty patch so the state is unchanged.
          run: async (state) => {
            console.log(
              `Campaign rejected. Reviewer notes: ${state.reviewerNotes}`
            );
            return {};
          },
        },
      ],
    },
  ],
});

const { state } = await emailWorkflow.run();
console.log("Approved:", state.approved);
console.log("Sent:", state.sent, `(${state.sentCount} recipients)`);
console.log("Reviewer notes:", state.reviewerNotes);
Error Handling
Use try/catch inside function steps for fine-grained error handling. Combine with the workflow-level retryPolicy as a safety net, and use conditional steps to handle failures gracefully.
import { Workflow } from "@radaros/core";
/** State for the DatabaseMigration workflow. */
interface MigrationState {
// Base URLs the steps append paths such as /count, /records and /bulk-insert to.
sourceDb: string;
targetDb: string;
// Total number of source records reported by the count endpoint.
recordCount: number;
// Number of records in batches that were inserted successfully.
migratedCount: number;
// One entry per failed batch (1-based batch number plus error message).
errors: Array<{ batch: number; message: string }>;
// "complete": no batch errors; "partial": some errors but some records migrated; "failed": errors and nothing migrated.
status: "pending" | "partial" | "complete" | "failed";
// True once the rollback step has run.
rollbackPerformed: boolean;
}
// Batch migration workflow: count the source records, copy them in batches
// while collecting per-batch errors instead of aborting, derive an overall
// status, then log errors and roll back if nothing could be migrated.
// NOTE(review): fetch() cannot load postgres:// URLs, so the connection
// strings below are placeholders — point them at an HTTP migration API before
// running this example.
const migration = new Workflow<MigrationState>({
  name: "DatabaseMigration",
  initialState: {
    sourceDb: "postgres://source-host:5432/prod",
    targetDb: "postgres://target-host:5432/prod_v2",
    recordCount: 0,
    migratedCount: 0,
    errors: [],
    status: "pending",
    rollbackPerformed: false,
  },
  retryPolicy: { maxRetries: 2, backoffMs: 1000 },
  steps: [
    {
      name: "count_records",
      run: async (state) => {
        const res = await fetch(`${state.sourceDb}/count`);
        if (!res.ok) throw new Error("Failed to count source records");
        const { count } = await res.json();
        return { recordCount: count };
      },
    },
    {
      name: "migrate_batches",
      // Errors are recorded per batch and the loop continues, so one bad
      // batch does not abort the whole migration.
      run: async (state) => {
        const errors: Array<{ batch: number; message: string }> = [];
        let migrated = 0;
        const batchSize = 1000;
        for (
          let offset = 0;
          offset < state.recordCount;
          offset += batchSize
        ) {
          const batchNum = Math.floor(offset / batchSize) + 1;
          try {
            const sourceRes = await fetch(
              `${state.sourceDb}/records?offset=${offset}&limit=${batchSize}`
            );
            // Check the read as well as the write: otherwise an error body
            // from the source would be bulk-inserted into the target.
            if (!sourceRes.ok) {
              errors.push({
                batch: batchNum,
                message: `Source HTTP ${sourceRes.status}: ${sourceRes.statusText}`,
              });
              continue;
            }
            const batch: unknown = await sourceRes.json();
            // Guard the shape so `batch.length` cannot turn the counter into NaN.
            if (!Array.isArray(batch)) {
              errors.push({
                batch: batchNum,
                message: "Source returned a non-array batch",
              });
              continue;
            }
            const res = await fetch(`${state.targetDb}/bulk-insert`, {
              method: "POST",
              headers: { "Content-Type": "application/json" },
              body: JSON.stringify(batch),
            });
            if (!res.ok) {
              errors.push({
                batch: batchNum,
                message: `HTTP ${res.status}: ${res.statusText}`,
              });
              continue;
            }
            migrated += batch.length;
          } catch (err) {
            // Network failures and JSON parse errors end up here.
            const message =
              err instanceof Error ? err.message : String(err);
            errors.push({ batch: batchNum, message });
          }
        }
        return { migratedCount: migrated, errors };
      },
    },
    {
      name: "set_status",
      run: async (state) => {
        if (state.errors.length === 0) return { status: "complete" as const };
        if (state.migratedCount > 0) return { status: "partial" as const };
        return { status: "failed" as const };
      },
    },
    {
      name: "handle_failures",
      condition: (state) => state.errors.length > 0,
      steps: [
        {
          name: "log_errors",
          run: async (state) => {
            console.error(
              `Migration finished with ${state.errors.length} batch error(s):`
            );
            for (const err of state.errors) {
              console.error(` Batch ${err.batch}: ${err.message}`);
            }
            return {};
          },
        },
      ],
    },
    {
      name: "rollback_on_total_failure",
      condition: (state) => state.status === "failed",
      steps: [
        {
          name: "rollback",
          run: async (state) => {
            console.error("Total failure — rolling back target database...");
            const res = await fetch(`${state.targetDb}/rollback`, {
              method: "POST",
            });
            // Only report the rollback as performed when the target confirmed it.
            if (!res.ok) {
              throw new Error(
                `Rollback failed — HTTP ${res.status}: ${res.statusText}`
              );
            }
            return { rollbackPerformed: true };
          },
        },
      ],
    },
  ],
});

const { state, stepResults } = await migration.run();
console.log(`Status: ${state.status}`);
console.log(`Migrated: ${state.migratedCount}/${state.recordCount}`);
console.log(`Rollback: ${state.rollbackPerformed ? "yes" : "no"}`);
for (const step of stepResults) {
  const errorSuffix = step.error ? ` — ${step.error}` : "";
  console.log(` ${step.stepName}: ${step.status} (${step.durationMs}ms)${errorSuffix}`);
}
Workflow Registration
Workflows auto-register in the global registry by default. Use the registry to list them, run them by name, or pass them to the transport layer’s serve() function for HTTP access.
import { Agent, Workflow, registry, openai } from "@radaros/core";
import { createExpressGateway } from "@radaros/transport";
// Agent that classifies a support ticket as bug, feature-request, question or
// billing, and is instructed to answer with the category name only.
const classifier = new Agent({
name: "TicketClassifier",
model: openai("gpt-4o-mini"),
instructions: `Classify the support ticket into exactly one category:
bug, feature-request, question, or billing.
Respond with only the category name, nothing else.`,
register: false,
});
/** State for the TicketRouter workflow. */
interface TicketState {
// Raw ticket text; supplied when the workflow is run.
ticketBody: string;
// Output of the "classify" agent step.
classify_output: string;
// Trimmed, lower-cased form of the classifier output.
category: string;
// Derived from urgency keywords found in the ticket body.
priority: "low" | "medium" | "high";
// Set once the ticket has been posted to a queue, together with that queue's URL.
routed: boolean;
queueUrl: string;
}
// Ticket routing workflow: classify the ticket with an agent, normalize the
// category, derive a priority from urgency keywords, then POST the ticket to
// the queue for its category.
const ticketWorkflow = new Workflow<TicketState>({
  name: "TicketRouter",
  initialState: {
    ticketBody: "",
    classify_output: "",
    category: "",
    priority: "medium",
    routed: false,
    queueUrl: "",
  },
  steps: [
    {
      name: "classify",
      agent: classifier,
      inputFrom: (state) => `Classify this ticket:\n\n${state.ticketBody}`,
    },
    {
      name: "normalize",
      // Trim and lower-case so the category matches the queue table keys.
      run: async (state) => ({
        category: state.classify_output.trim().toLowerCase(),
      }),
    },
    {
      name: "assign_priority",
      run: async (state) => {
        const highPriorityKeywords = [
          "urgent",
          "down",
          "outage",
          "critical",
          "security",
        ];
        // Lower-case once instead of once per keyword.
        const body = state.ticketBody.toLowerCase();
        const isHigh = highPriorityKeywords.some((kw) => body.includes(kw));
        return { priority: isHigh ? ("high" as const) : ("medium" as const) };
      },
    },
    {
      name: "route",
      run: async (state) => {
        // Unrecognized categories fall back to the general support queue.
        const defaultQueue = "https://queues.internal/support";
        const queues: Record<string, string> = {
          bug: "https://queues.internal/bugs",
          "feature-request": "https://queues.internal/features",
          question: defaultQueue,
          billing: "https://queues.internal/billing",
        };
        const endpoint = queues[state.category] ?? defaultQueue;
        const res = await fetch(endpoint, {
          method: "POST",
          headers: { "Content-Type": "application/json" },
          body: JSON.stringify({
            ticket: state.ticketBody,
            category: state.category,
            priority: state.priority,
          }),
        });
        // Only mark the ticket as routed when the queue accepted it.
        if (!res.ok) throw new Error(`HTTP ${res.status}: ${res.statusText}`);
        return { routed: true, queueUrl: endpoint };
      },
    },
  ],
});

// The workflow is now in the global registry
console.log("Registered workflows:", registry.list().workflows);

// Run directly with initial state overrides
const result = await ticketWorkflow.run({
  initialState: {
    ticketBody:
      "URGENT: Our production API is returning 500 errors on all /users endpoints since 3am.",
  },
});
console.log("Category:", result.state.category);
console.log("Priority:", result.state.priority);
console.log("Routed:", result.state.routed, "→", result.state.queueUrl);

// Or expose via Express transport for HTTP access
// const app = createExpressGateway();
// app.listen(3000);
// POST /workflows/TicketRouter/run → executes the workflow