Skip to main content
The AgentWorker consumes jobs from the BullMQ queue and executes them using registered agents and workflows. Workers can run in the same process or as separate services.

Setup

import { Agent, openai } from "@radaros/core";
import { AgentWorker } from "@radaros/queue";

const assistant = new Agent({
  name: "assistant",
  model: openai("gpt-4o"),
  instructions: "You are a helpful assistant.",
});

const worker = new AgentWorker({
  connection: { host: "localhost", port: 6379 },
  agentRegistry: { assistant },
  concurrency: 5,
});

worker.start();
connection
{ host: string; port: number }
required
Redis connection details. Must match the producer’s connection.
queueName
string
default:"radaros:jobs"
Queue name to consume from. Must match the producer.
concurrency
number
default:"5"
Number of jobs to process simultaneously.
agentRegistry
Record<string, Agent>
required
Map of agent names to Agent instances. Names must match what the producer enqueues.
workflowRegistry
Record<string, Workflow>
Map of workflow names to Workflow instances.

How It Works

  1. Worker connects to Redis and listens for jobs on the configured queue
  2. When a job arrives, it looks up the agent/workflow by name in the registry
  3. Executes agent.run() or workflow.run() with the job’s input
  4. Reports progress via BullMQ job progress updates
  5. Stores the result (or error) back to Redis for the producer to retrieve

With Workflows

import { Agent, Workflow, openai } from "@radaros/core";
import { AgentWorker } from "@radaros/queue";

const researcher = new Agent({
  name: "researcher",
  model: openai("gpt-4o"),
  instructions: "Research the given topic thoroughly.",
});

const writer = new Agent({
  name: "writer",
  model: openai("gpt-4o"),
  instructions: "Write a blog post based on the research.",
});

const pipeline = new Workflow({
  name: "content-pipeline",
  initialState: { topic: "", research: "", article: "" },
  steps: [
    { name: "research", agent: researcher, inputFrom: (s) => s.topic },
    { name: "write", agent: writer, inputFrom: (s) => `Write about: ${s.research}` },
  ],
});

const worker = new AgentWorker({
  connection: { host: "localhost", port: 6379 },
  agentRegistry: { researcher, writer },
  workflowRegistry: { "content-pipeline": pipeline },
  concurrency: 3,
});

worker.start();

Graceful Shutdown

process.on("SIGTERM", async () => {
  console.log("Shutting down worker...");
  await worker.stop();
  process.exit(0);
});
The stop() method waits for currently active jobs to complete before shutting down.

Event Bridging

The bridgeEventBusToJob utility connects an agent’s EventBus to BullMQ’s job progress system, enabling real-time progress tracking:
import { bridgeEventBusToJob } from "@radaros/queue";

// This is used internally by AgentWorker, but you can use it
// for custom worker implementations
const cleanup = bridgeEventBusToJob(agent.eventBus, job, runId);

// cleanup() removes all listeners when done
Events bridged:
  • run.stream.chunk updates job progress
  • tool.call adds to job logs
  • tool.result adds to job logs

Scaling

Run multiple worker processes to scale horizontally. BullMQ handles job distribution automatically:
# Terminal 1
node worker.js

# Terminal 2
node worker.js

# Terminal 3
node worker.js
Each worker processes up to concurrency jobs simultaneously. With 3 workers at concurrency 5, you can process 15 jobs in parallel.

Full Producer + Worker Example

// producer.ts
import { AgentQueue } from "@radaros/queue";

const queue = new AgentQueue({
  connection: { host: "localhost", port: 6379 },
});

const { jobId } = await queue.enqueueAgentRun({
  agentName: "assistant",
  input: "Summarize the latest AI research papers",
});

queue.onCompleted((id, result) => {
  console.log(`Result: ${result.text}`);
  queue.close();
});
// worker.ts
import { Agent, openai } from "@radaros/core";
import { AgentWorker } from "@radaros/queue";

const assistant = new Agent({
  name: "assistant",
  model: openai("gpt-4o"),
  instructions: "Summarize research papers concisely.",
});

const worker = new AgentWorker({
  connection: { host: "localhost", port: 6379 },
  agentRegistry: { assistant },
});

worker.start();
console.log("Worker listening for jobs...");