Skip to main content

Installation

npm install @radaros/queue bullmq ioredis
Requires a running Redis server.

AgentQueue

The producer that enqueues agent and workflow jobs.
import { AgentQueue } from "@radaros/queue";

Constructor

new AgentQueue(config: QueueConfig)
connection
{ host: string; port: number }
required
Redis connection details.
queueName
string
default:"radaros:jobs"
Name of the BullMQ queue.
defaultJobOptions
Record<string, unknown>
Default options applied to all jobs (priority, attempts, delay, etc.).

Methods

enqueueAgentRun

async enqueueAgentRun(opts: {
  agentName: string;
  input: string;
  sessionId?: string;
  userId?: string;
  priority?: number;
  delay?: number;
}): Promise<{ jobId: string }>
Enqueue an agent execution job.

enqueueWorkflow

async enqueueWorkflow(opts: {
  workflowName: string;
  initialState?: Record<string, unknown>;
  sessionId?: string;
  priority?: number;
  delay?: number;
}): Promise<{ jobId: string }>
Enqueue a workflow execution job.

getJobStatus

async getJobStatus(jobId: string): Promise<JobStatus>
Get the current status of a job.

cancelJob

async cancelJob(jobId: string): Promise<void>
Cancel a pending or active job.

Event Handlers

queue.onCompleted((jobId: string, result: RunOutput) => {
  console.log(`Job ${jobId} completed:`, result.text);
});

queue.onFailed((jobId: string, error: Error) => {
  console.error(`Job ${jobId} failed:`, error.message);
});

close

async close(): Promise<void>
Close the queue connection.

AgentWorker

The consumer that processes jobs from the queue.
import { AgentWorker } from "@radaros/queue";

Constructor

new AgentWorker(config: WorkerConfig)
connection
{ host: string; port: number }
required
Redis connection details.
queueName
string
default:"radaros:jobs"
Queue name to consume from.
concurrency
number
default: 5
Number of jobs processed in parallel.
agentRegistry
Record<string, Agent>
required
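Map of agent name to Agent instance. The key is the name jobs refer to via `agentName` in `enqueueAgentRun` (e.g. `{ processor: agent }` serves jobs enqueued with `agentName: "processor"`).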
workflowRegistry
Record<string, Workflow>
Map of workflow name to Workflow instance.

Methods

start

start(): void
Begin processing jobs from the queue.

stop

async stop(): Promise<void>
Gracefully stop the worker.

Types

JobPayload

/**
 * Union of the two job payload shapes. The literal `type` field is the
 * discriminant: checking `payload.type` narrows to the matching interface.
 */
type JobPayload = AgentJobPayload | WorkflowJobPayload;

/**
 * Payload shape for an agent job (`type: "agent"`).
 *
 * Carries the same fields as the `enqueueAgentRun` options, minus the
 * scheduling options `priority` and `delay`.
 */
interface AgentJobPayload {
  /** Discriminant identifying this payload as an agent job. */
  type: "agent";
  /** Name of the agent to run; presumably looked up in the worker's `agentRegistry` (confirm against implementation). */
  agentName: string;
  /** Input string for the agent run. */
  input: string;
  /** Optional session identifier. */
  sessionId?: string;
  /** Optional user identifier. */
  userId?: string;
}

/**
 * Payload shape for a workflow job (`type: "workflow"`).
 *
 * Carries the same fields as the `enqueueWorkflow` options, minus the
 * scheduling options `priority` and `delay`.
 */
interface WorkflowJobPayload {
  /** Discriminant identifying this payload as a workflow job. */
  type: "workflow";
  /** Name of the workflow to run; presumably looked up in the worker's `workflowRegistry` (confirm against implementation). */
  workflowName: string;
  /** Optional initial state for the workflow. */
  initialState?: Record<string, unknown>;
  /** Optional session identifier. */
  sessionId?: string;
}

JobStatus

/**
 * Status snapshot of a job, as resolved by `AgentQueue.getJobStatus(jobId)`.
 */
interface JobStatus {
  /** Identifier of the job this status describes. */
  jobId: string;
  /** Current lifecycle state; exactly one of the five listed literals. */
  state: "waiting" | "active" | "completed" | "failed" | "delayed";
  /** Optional progress value. NOTE(review): scale/units are not stated on this page — confirm. */
  progress?: number;
  /** Optional run output; same `RunOutput` type that `onCompleted` handlers receive. Presumably set once the job has completed. */
  result?: RunOutput;
  /** Optional error text. Note this is a string, whereas `onFailed` handlers receive an `Error`; presumably the error message — confirm. */
  error?: string;
  /** Creation timestamp (always present). */
  createdAt: Date;
  /** Optional timestamp; presumably when a worker picked the job up — confirm. */
  processedAt?: Date;
  /** Optional timestamp; presumably when the job completed or failed — confirm. */
  finishedAt?: Date;
}

Utilities

bridgeEventBusToJob

import { bridgeEventBusToJob } from "@radaros/queue";

const cleanup = bridgeEventBusToJob(eventBus, job, runId);
// Returns cleanup function to remove listeners
Bridges agent EventBus events to BullMQ job progress and logs, enabling real-time job status tracking.

Example

import { Agent, openai } from "@radaros/core";
import { AgentQueue, AgentWorker } from "@radaros/queue";

// End-to-end example: define an agent, enqueue a run for it, start a worker
// that serves it, and log the result when the job completes.
// Uses top-level `await`, so it must run in an ES module context.
const agent = new Agent({
  name: "processor",
  model: openai("gpt-4o"),
  instructions: "Process the given data.",
});

// Producer
// `queueName` is omitted, so the default "radaros:jobs" applies.
const queue = new AgentQueue({
  connection: { host: "localhost", port: 6379 },
});

// `agentName` must name an entry in the worker's `agentRegistry` below.
const { jobId } = await queue.enqueueAgentRun({
  agentName: "processor",
  input: "Analyze this dataset",
});

console.log("Enqueued job:", jobId);

// Worker (can be in a separate process)
// `queueName` is omitted here too, so the worker consumes the same default queue.
const worker = new AgentWorker({
  connection: { host: "localhost", port: 6379 },
  // Registry key "processor" matches the `agentName` used when enqueueing.
  agentRegistry: { processor: agent },
  // Overrides the default concurrency of 5.
  concurrency: 3,
});

worker.start();

// Monitor
// Handler receives the job id and the run output (see Event Handlers).
queue.onCompleted((id, result) => {
  console.log(`Job ${id}:`, result.text);
});