Basic Workflow
A simple linear pipeline that fetches data, processes it, and formats the output. Each step returns a partial state patch that’s merged into the running state.Copy
Ask AI
import { Workflow } from "@radaros/core";
interface PipelineState {
url: string;
rawData: string;
processed: Record<string, number>;
report: string;
}
const pipeline = new Workflow<PipelineState>({
name: "DataPipeline",
initialState: {
url: "https://api.example.com/metrics",
rawData: "",
processed: {},
report: "",
},
steps: [
{
name: "fetch",
run: async (state) => {
const res = await fetch(state.url);
if (!res.ok) throw new Error(`HTTP ${res.status}: ${res.statusText}`);
const rawData = await res.text();
return { rawData };
},
},
{
name: "process",
run: async (state) => {
const rows = JSON.parse(state.rawData) as Array<{
name: string;
value: number;
}>;
const processed: Record<string, number> = {};
for (const row of rows) {
processed[row.name] = (processed[row.name] ?? 0) + row.value;
}
return { processed };
},
},
{
name: "format",
run: async (state) => {
const lines = Object.entries(state.processed)
.sort(([, a], [, b]) => b - a)
.map(([name, value]) => `${name}: ${value.toLocaleString()}`);
return { report: lines.join("\n") };
},
},
],
});
const { state, stepResults } = await pipeline.run();
console.log(state.report);
console.log(
"Steps:",
stepResults.map((s) => `${s.stepName}: ${s.status}`).join(", ")
);
Agent Steps
UseAgentStep to embed an LLM agent inside a workflow step. The inputFrom function maps workflow state to the agent’s input, and the agent’s output is stored as <stepName>_output on the state.
Copy
Ask AI
import { Agent, Workflow, openai } from "@radaros/core";
const summarizer = new Agent({
name: "Summarizer",
model: openai("gpt-4o-mini"),
instructions: `Summarize the provided text into 3 concise bullet points.
Each bullet should be a single sentence. Start each with a verb.`,
register: false,
});
const headlineWriter = new Agent({
name: "HeadlineWriter",
model: openai("gpt-4o-mini"),
instructions: `Write a compelling headline (max 10 words) for the
provided summary. Make it engaging and newsworthy.`,
register: false,
});
interface SummaryState {
articleUrl: string;
rawContent: string;
summarize_output: string;
headline_output: string;
result: { headline: string; summary: string };
}
const workflow = new Workflow<SummaryState>({
name: "ArticleDigest",
initialState: {
articleUrl: "https://api.example.com/articles/latest",
rawContent: "",
summarize_output: "",
headline_output: "",
result: { headline: "", summary: "" },
},
steps: [
{
name: "fetch_content",
run: async (state) => {
const res = await fetch(state.articleUrl);
const article = await res.json();
return { rawContent: article.body };
},
},
{
name: "summarize",
agent: summarizer,
inputFrom: (state) => `Summarize this article:\n\n${state.rawContent}`,
},
{
name: "headline",
agent: headlineWriter,
inputFrom: (state) =>
`Write a headline for this summary:\n\n${state.summarize_output}`,
},
{
name: "compose",
run: async (state) => ({
result: {
headline: state.headline_output.trim(),
summary: state.summarize_output.trim(),
},
}),
},
],
});
const { state } = await workflow.run();
console.log(`# ${state.result.headline}`);
console.log(state.result.summary);
Conditional Branching
A step with acondition function only executes its nested steps when the condition returns true. This enables if/else-style branching based on runtime state.
Copy
Ask AI
import { Agent, Workflow, openai } from "@radaros/core";
const translator = new Agent({
name: "Translator",
model: openai("gpt-4o-mini"),
instructions: "Translate the provided text into English. Preserve formatting.",
register: false,
});
const toxicityChecker = new Agent({
name: "ToxicityChecker",
model: openai("gpt-4o-mini"),
instructions: `Analyze the text for toxic or harmful content.
Respond with exactly "SAFE" or "TOXIC: <reason>".`,
register: false,
});
interface ModerationState {
text: string;
language: string;
needsTranslation: boolean;
translate_output: string;
toxicity_check_output: string;
finalText: string;
isSafe: boolean;
}
const workflow = new Workflow<ModerationState>({
name: "ContentModeration",
initialState: {
text: "Bonjour le monde, ceci est un message amical.",
language: "fr",
needsTranslation: false,
translate_output: "",
toxicity_check_output: "",
finalText: "",
isSafe: false,
},
steps: [
{
name: "detect_language",
run: async (state) => ({
needsTranslation: state.language !== "en",
}),
},
{
name: "maybe_translate",
condition: (state) => state.needsTranslation,
steps: [
{
name: "translate",
agent: translator,
inputFrom: (state) =>
`Translate from ${state.language} to English:\n\n${state.text}`,
},
],
},
{
name: "set_english_text",
run: async (state) => ({
finalText: state.needsTranslation
? state.translate_output
: state.text,
}),
},
{
name: "check_toxicity",
agent: toxicityChecker,
inputFrom: (state) => state.finalText,
},
{
name: "evaluate_safety",
run: async (state) => ({
isSafe: state.toxicity_check_output.trim().startsWith("SAFE"),
}),
},
{
name: "block_if_toxic",
condition: (state) => !state.isSafe,
steps: [
{
name: "reject",
run: async (state) => ({
finalText: `[BLOCKED] ${state.toxicity_check_output}`,
}),
},
],
},
],
});
const { state } = await workflow.run();
console.log("Safe:", state.isSafe);
console.log("Output:", state.finalText);
Parallel Steps
A step with aparallel array runs multiple sub-steps concurrently, then merges all their state patches. Use this when steps are independent and can execute simultaneously.
Copy
Ask AI
import { Agent, Workflow, openai } from "@radaros/core";
const sentimentAgent = new Agent({
name: "SentimentAnalyzer",
model: openai("gpt-4o-mini"),
instructions: `Classify text sentiment as positive, negative, neutral, or
mixed. Respond with one word followed by a confidence percentage.`,
register: false,
});
const topicAgent = new Agent({
name: "TopicExtractor",
model: openai("gpt-4o-mini"),
instructions: `Extract the main topics from the text as a comma-separated
list. Limit to the top 5 most prominent topics.`,
register: false,
});
const entityAgent = new Agent({
name: "EntityExtractor",
model: openai("gpt-4o-mini"),
instructions: `Extract named entities (people, companies, products, locations)
from the text. Return as a JSON array of {name, type} objects.`,
register: false,
});
interface NLPState {
text: string;
sentiment: string;
topics: string;
entities: string;
SentimentAnalysis_output: string;
TopicExtraction_output: string;
EntityExtraction_output: string;
}
const workflow = new Workflow<NLPState>({
name: "ParallelNLP",
initialState: {
text: "Apple CEO Tim Cook announced the new Vision Pro 2 at WWDC in Cupertino. The device is faster and cheaper, but analysts at Goldman Sachs remain skeptical about mass adoption.",
sentiment: "",
topics: "",
entities: "",
SentimentAnalysis_output: "",
TopicExtraction_output: "",
EntityExtraction_output: "",
},
steps: [
{
name: "analyze",
parallel: [
{
name: "SentimentAnalysis",
agent: sentimentAgent,
inputFrom: (state) => state.text,
},
{
name: "TopicExtraction",
agent: topicAgent,
inputFrom: (state) => state.text,
},
{
name: "EntityExtraction",
agent: entityAgent,
inputFrom: (state) => state.text,
},
],
},
{
name: "merge_results",
run: async (state) => ({
sentiment: state.SentimentAnalysis_output,
topics: state.TopicExtraction_output,
entities: state.EntityExtraction_output,
}),
},
],
});
const { state } = await workflow.run();
console.log("Sentiment:", state.sentiment);
console.log("Topics:", state.topics);
console.log("Entities:", state.entities);
Retry Policy
Configure automatic retries with exponential backoff for steps that call unreliable external services. The retry policy applies to all steps in the workflow.Copy
Ask AI
import { Workflow } from "@radaros/core";
interface IngestionState {
source: string;
data: string;
recordCount: number;
ingested: boolean;
retryAttempts: number;
}
const workflow = new Workflow<IngestionState>({
name: "ReliableIngestion",
initialState: {
source: "https://flaky-partner-api.example.com/events",
data: "",
recordCount: 0,
ingested: false,
retryAttempts: 0,
},
retryPolicy: {
maxRetries: 3,
backoffMs: 500,
backoffMultiplier: 2,
},
steps: [
{
name: "fetch_events",
run: async (state) => {
const res = await fetch(state.source, {
headers: { Authorization: `Bearer ${process.env.PARTNER_API_KEY}` },
});
if (!res.ok) throw new Error(`HTTP ${res.status}: ${res.statusText}`);
const data = await res.text();
const parsed = JSON.parse(data);
return { data, recordCount: parsed.length };
},
},
{
name: "validate",
run: async (state) => {
const records = JSON.parse(state.data);
const valid = records.filter(
(r: any) => r.id && r.timestamp && r.payload
);
if (valid.length === 0) throw new Error("No valid records in payload");
return { data: JSON.stringify(valid), recordCount: valid.length };
},
},
{
name: "ingest",
run: async (state) => {
await fetch("https://warehouse.internal/ingest", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: state.data,
});
return { ingested: true };
},
},
],
});
const { state, stepResults } = await workflow.run();
console.log(`Ingested: ${state.ingested} (${state.recordCount} records)`);
for (const step of stepResults) {
console.log(` ${step.stepName}: ${step.status} (${step.durationMs}ms)`);
}
Workflow with State
Typed state flows between steps, providing full type safety. Each step returns a partial patch that’s shallow-merged into the running state, so steps can build on each other’s output.Copy
Ask AI
import { Workflow } from "@radaros/core";
interface OrderState {
orderId: string;
customerId: string;
items: Array<{ sku: string; qty: number; price: number }>;
subtotal: number;
discountCode: string;
discountAmount: number;
tax: number;
shippingCost: number;
total: number;
receiptUrl: string;
}
const checkout = new Workflow<OrderState>({
name: "Checkout",
initialState: {
orderId: "ORD-20260302-001",
customerId: "CUST-4492",
items: [
{ sku: "WIDGET-A", qty: 2, price: 19.99 },
{ sku: "GADGET-B", qty: 1, price: 49.99 },
{ sku: "CABLE-C", qty: 3, price: 9.99 },
],
subtotal: 0,
discountCode: "SPRING20",
discountAmount: 0,
tax: 0,
shippingCost: 0,
total: 0,
receiptUrl: "",
},
steps: [
{
name: "calculate_subtotal",
run: async (state) => {
const subtotal = state.items.reduce(
(sum, item) => sum + item.qty * item.price,
0
);
return { subtotal: +subtotal.toFixed(2) };
},
},
{
name: "apply_discount",
run: async (state) => {
const discounts: Record<string, number> = {
SPRING20: 0.2,
WELCOME10: 0.1,
VIP30: 0.3,
};
const rate = discounts[state.discountCode] ?? 0;
const discountAmount = +(state.subtotal * rate).toFixed(2);
return { discountAmount };
},
},
{
name: "calculate_shipping",
run: async (state) => {
const itemCount = state.items.reduce((sum, i) => sum + i.qty, 0);
const shippingCost = itemCount > 5 ? 0 : 5.99;
return { shippingCost };
},
},
{
name: "apply_tax",
run: async (state) => {
const taxableAmount = state.subtotal - state.discountAmount;
const taxRate = 0.0875;
const tax = +(taxableAmount * taxRate).toFixed(2);
const total = +(taxableAmount + tax + state.shippingCost).toFixed(2);
return { tax, total };
},
},
{
name: "generate_receipt",
run: async (state) => {
const res = await fetch("https://receipts.internal/generate", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
orderId: state.orderId,
customerId: state.customerId,
items: state.items,
subtotal: state.subtotal,
discount: state.discountAmount,
tax: state.tax,
shipping: state.shippingCost,
total: state.total,
}),
});
const { url } = await res.json();
return { receiptUrl: url };
},
},
],
});
const { state } = await checkout.run();
console.log(`Order ${state.orderId}`);
console.log(` Subtotal: $${state.subtotal}`);
console.log(` Discount: -$${state.discountAmount} (${state.discountCode})`);
console.log(` Shipping: $${state.shippingCost}`);
console.log(` Tax: $${state.tax}`);
console.log(` Total: $${state.total}`);
console.log(` Receipt: ${state.receiptUrl}`);
Complex Pipeline
A multi-stage content generation pipeline: research, outline, draft, review, and publish. Each stage is anAgentStep that passes its output forward through the state.
Copy
Ask AI
import { Agent, Workflow, openai } from "@radaros/core";
const researchAgent = new Agent({
name: "Researcher",
model: openai("gpt-4o"),
instructions: `Gather key facts, statistics, and expert opinions on the topic.
Return structured research notes with sections and citations.
Target at least 5 distinct data points.`,
register: false,
});
const outlineAgent = new Agent({
name: "Outliner",
model: openai("gpt-4o-mini"),
instructions: `Create a detailed article outline with section headers,
key points for each section, and a logical flow.
Include an intro hook and a conclusion with call-to-action.`,
register: false,
});
const writerAgent = new Agent({
name: "Drafter",
model: openai("gpt-4o"),
instructions: `Write a polished 800-word article from the outline and
research notes. Use a professional but engaging tone.
Incorporate data points naturally. Include subheadings.`,
register: false,
});
const reviewerAgent = new Agent({
name: "Reviewer",
model: openai("gpt-4o"),
instructions: `Review the article for accuracy, clarity, grammar, and
engagement. Provide a final edited version — don't just list suggestions.
Ensure all cited data matches the original research.`,
register: false,
});
interface ContentState {
topic: string;
targetAudience: string;
research_output: string;
outline_output: string;
draft_output: string;
review_output: string;
publishedUrl: string;
wordCount: number;
}
const contentPipeline = new Workflow<ContentState>({
name: "ContentPipeline",
initialState: {
topic: "The rise of edge computing in 2026",
targetAudience: "CTOs and VP Engineering at mid-size SaaS companies",
research_output: "",
outline_output: "",
draft_output: "",
review_output: "",
publishedUrl: "",
wordCount: 0,
},
steps: [
{
name: "research",
agent: researchAgent,
inputFrom: (state) =>
`Research this topic for ${state.targetAudience}:\n\n${state.topic}`,
},
{
name: "outline",
agent: outlineAgent,
inputFrom: (state) =>
`Create an outline for "${state.topic}" targeting ${state.targetAudience}.\n\nResearch notes:\n${state.research_output}`,
},
{
name: "draft",
agent: writerAgent,
inputFrom: (state) =>
`Write a full article.\n\nOutline:\n${state.outline_output}\n\nResearch:\n${state.research_output}`,
},
{
name: "review",
agent: reviewerAgent,
inputFrom: (state) =>
`Review and improve this article. Preserve meaning, fix issues:\n\n${state.draft_output}`,
},
{
name: "count_words",
run: async (state) => ({
wordCount: state.review_output.split(/\s+/).filter(Boolean).length,
}),
},
{
name: "publish",
run: async (state) => {
const res = await fetch("https://cms.internal/api/articles", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
title: state.topic,
body: state.review_output,
audience: state.targetAudience,
wordCount: state.wordCount,
}),
});
const { url } = await res.json();
return { publishedUrl: url };
},
},
],
});
const { state, stepResults } = await contentPipeline.run();
console.log(`Published: ${state.publishedUrl} (${state.wordCount} words)`);
for (const step of stepResults) {
console.log(
` ${step.stepName}: ${step.status} (${step.durationMs}ms)`
);
}
Workflow with Human Review
Pause the workflow at a specific step for human approval before continuing. The human review step calls an external system (Slack, email, dashboard) and blocks until a response is received.Copy
Ask AI
import { Agent, Workflow, EventBus, openai } from "@radaros/core";
const draftAgent = new Agent({
name: "DraftWriter",
model: openai("gpt-4o"),
instructions: `Write a marketing email draft based on the campaign brief.
Include subject line, preview text, and HTML body.
Keep it under 200 words. Use a compelling CTA.`,
register: false,
});
interface EmailCampaignState {
brief: string;
audience: string;
DraftWriter_output: string;
approved: boolean;
reviewerNotes: string;
sent: boolean;
sentCount: number;
}
async function requestHumanApproval(
draft: string
): Promise<{ approved: boolean; notes: string }> {
// In production: post to Slack, send a review email, or update a dashboard
// and wait for a webhook callback. This example simulates approval.
console.log("Draft pending review:", draft.slice(0, 200), "...");
return { approved: true, notes: "Looks good — ship it." };
}
const eventBus = new EventBus();
eventBus.on("workflow:step:complete", (event) => {
console.log(`[Event] Step "${event.stepName}" completed in ${event.durationMs}ms`);
});
const emailWorkflow = new Workflow<EmailCampaignState>({
name: "EmailCampaign",
eventBus,
initialState: {
brief:
"Announce our new API v2 with 3x throughput, 50ms P99 latency, and new streaming endpoints.",
audience: "Developer newsletter subscribers (14,200 contacts)",
DraftWriter_output: "",
approved: false,
reviewerNotes: "",
sent: false,
sentCount: 0,
},
steps: [
{
name: "write_draft",
agent: draftAgent,
inputFrom: (state) =>
`Campaign brief: ${state.brief}\nAudience: ${state.audience}`,
},
{
name: "human_review",
run: async (state) => {
const { approved, notes } = await requestHumanApproval(
state.DraftWriter_output
);
return { approved, reviewerNotes: notes };
},
},
{
name: "send_if_approved",
condition: (state) => state.approved,
steps: [
{
name: "send_email",
run: async (state) => {
const res = await fetch(
"https://email.internal/api/campaigns/send",
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
subject: "Introducing API v2",
html: state.DraftWriter_output,
audience: state.audience,
}),
}
);
const { recipientCount } = await res.json();
return { sent: true, sentCount: recipientCount };
},
},
],
},
{
name: "skip_if_rejected",
condition: (state) => !state.approved,
steps: [
{
name: "log_rejection",
run: async (state) => {
console.log(
`Campaign rejected. Reviewer notes: ${state.reviewerNotes}`
);
return {};
},
},
],
},
],
});
const { state } = await emailWorkflow.run();
console.log("Approved:", state.approved);
console.log("Sent:", state.sent, `(${state.sentCount} recipients)`);
console.log("Reviewer notes:", state.reviewerNotes);
Error Handling
Use try/catch inside function steps for fine-grained error handling. Combine with the workflow-levelretryPolicy as a safety net, and use conditional steps to handle failures gracefully.
Copy
Ask AI
import { Workflow } from "@radaros/core";
interface MigrationState {
sourceDb: string;
targetDb: string;
recordCount: number;
migratedCount: number;
errors: Array<{ batch: number; message: string }>;
status: "pending" | "partial" | "complete" | "failed";
rollbackPerformed: boolean;
}
const migration = new Workflow<MigrationState>({
name: "DatabaseMigration",
initialState: {
sourceDb: "postgres://source-host:5432/prod",
targetDb: "postgres://target-host:5432/prod_v2",
recordCount: 0,
migratedCount: 0,
errors: [],
status: "pending",
rollbackPerformed: false,
},
retryPolicy: { maxRetries: 2, backoffMs: 1000 },
steps: [
{
name: "count_records",
run: async (state) => {
const res = await fetch(`${state.sourceDb}/count`);
if (!res.ok) throw new Error("Failed to count source records");
const { count } = await res.json();
return { recordCount: count };
},
},
{
name: "migrate_batches",
run: async (state) => {
const errors: Array<{ batch: number; message: string }> = [];
let migrated = 0;
const batchSize = 1000;
for (
let offset = 0;
offset < state.recordCount;
offset += batchSize
) {
const batchNum = Math.floor(offset / batchSize) + 1;
try {
const batch = await fetch(
`${state.sourceDb}/records?offset=${offset}&limit=${batchSize}`
).then((r) => r.json());
const res = await fetch(`${state.targetDb}/bulk-insert`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(batch),
});
if (!res.ok) {
errors.push({
batch: batchNum,
message: `HTTP ${res.status}: ${res.statusText}`,
});
continue;
}
migrated += batch.length;
} catch (err) {
const message =
err instanceof Error ? err.message : String(err);
errors.push({ batch: batchNum, message });
}
}
return { migratedCount: migrated, errors };
},
},
{
name: "set_status",
run: async (state) => {
if (state.errors.length === 0) return { status: "complete" as const };
if (state.migratedCount > 0) return { status: "partial" as const };
return { status: "failed" as const };
},
},
{
name: "handle_failures",
condition: (state) => state.errors.length > 0,
steps: [
{
name: "log_errors",
run: async (state) => {
console.error(
`Migration finished with ${state.errors.length} batch error(s):`
);
for (const err of state.errors) {
console.error(` Batch ${err.batch}: ${err.message}`);
}
return {};
},
},
],
},
{
name: "rollback_on_total_failure",
condition: (state) => state.status === "failed",
steps: [
{
name: "rollback",
run: async (state) => {
console.error("Total failure — rolling back target database...");
await fetch(`${state.targetDb}/rollback`, { method: "POST" });
return { rollbackPerformed: true };
},
},
],
},
],
});
const { state, stepResults } = await migration.run();
console.log(`Status: ${state.status}`);
console.log(`Migrated: ${state.migratedCount}/${state.recordCount}`);
console.log(`Rollback: ${state.rollbackPerformed ? "yes" : "no"}`);
for (const step of stepResults) {
const errorSuffix = step.error ? ` — ${step.error}` : "";
console.log(` ${step.stepName}: ${step.status} (${step.durationMs}ms)${errorSuffix}`);
}
Workflow Registration
Workflows auto-register in the global registry by default. Use theregistry to list them, run them by name, or pass them to the transport layer’s serve() function for HTTP access.
Copy
Ask AI
import { Agent, Workflow, registry, openai } from "@radaros/core";
import { createExpressGateway } from "@radaros/transport";
const classifier = new Agent({
name: "TicketClassifier",
model: openai("gpt-4o-mini"),
instructions: `Classify the support ticket into exactly one category:
bug, feature-request, question, or billing.
Respond with only the category name, nothing else.`,
register: false,
});
interface TicketState {
ticketBody: string;
classify_output: string;
category: string;
priority: "low" | "medium" | "high";
routed: boolean;
queueUrl: string;
}
const ticketWorkflow = new Workflow<TicketState>({
name: "TicketRouter",
initialState: {
ticketBody: "",
classify_output: "",
category: "",
priority: "medium",
routed: false,
queueUrl: "",
},
steps: [
{
name: "classify",
agent: classifier,
inputFrom: (state) => `Classify this ticket:\n\n${state.ticketBody}`,
},
{
name: "normalize",
run: async (state) => ({
category: state.classify_output.trim().toLowerCase(),
}),
},
{
name: "assign_priority",
run: async (state) => {
const highPriorityKeywords = [
"urgent",
"down",
"outage",
"critical",
"security",
];
const isHigh = highPriorityKeywords.some((kw) =>
state.ticketBody.toLowerCase().includes(kw)
);
return { priority: isHigh ? ("high" as const) : ("medium" as const) };
},
},
{
name: "route",
run: async (state) => {
const queues: Record<string, string> = {
bug: "https://queues.internal/bugs",
"feature-request": "https://queues.internal/features",
question: "https://queues.internal/support",
billing: "https://queues.internal/billing",
};
const endpoint = queues[state.category] ?? queues.question;
await fetch(endpoint, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
ticket: state.ticketBody,
category: state.category,
priority: state.priority,
}),
});
return { routed: true, queueUrl: endpoint };
},
},
],
});
// The workflow is now in the global registry
console.log("Registered workflows:", registry.list().workflows);
// Run directly with initial state overrides
const result = await ticketWorkflow.run({
initialState: {
ticketBody:
"URGENT: Our production API is returning 500 errors on all /users endpoints since 3am.",
},
});
console.log("Category:", result.state.category);
console.log("Priority:", result.state.priority);
console.log("Routed:", result.state.routed, "→", result.state.queueUrl);
// Or expose via Express transport for HTTP access
// const app = createExpressGateway();
// app.listen(3000);
// POST /workflows/TicketRouter/run → executes the workflow