The AgentQueue class is the producer side of the queue system. It enqueues jobs for agents and workflows to be processed asynchronously by workers.
Setup
import { AgentQueue } from "@radaros/queue";
const queue = new AgentQueue({
  connection: { host: "localhost", port: 6379 },
  queueName: "radaros:jobs", // optional, this is the default
});
connection ({ host: string; port: number }, required): Redis connection details.
queueName (string, default: "radaros:jobs"): BullMQ queue name.
Default options for all jobs (priority, attempts, backoff, etc.).
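In deployed environments the connection details usually come from configuration rather than literals. The following is a minimal sketch of building the `connection` object from environment-style variables. The helper `connectionFromEnv` and the variable names `REDIS_HOST` and `REDIS_PORT` are assumptions made for illustration and are not part of `@radaros/queue`.

```typescript
// Shape of the `connection` option as documented above.
interface RedisConnection {
  host: string;
  port: number;
}

// Hypothetical helper: builds a connection object from a map of environment
// variables. REDIS_HOST and REDIS_PORT are assumed names, not library settings.
function connectionFromEnv(env: Record<string, string | undefined>): RedisConnection {
  const host = env["REDIS_HOST"] ?? "localhost";
  const port = Number(env["REDIS_PORT"] ?? "6379");
  if (!Number.isInteger(port) || port <= 0 || port > 65535) {
    throw new Error(`Invalid Redis port: ${env["REDIS_PORT"]}`);
  }
  return { host, port };
}

console.log(connectionFromEnv({ REDIS_HOST: "redis.internal", REDIS_PORT: "6380" }));
```

The returned object could then be passed as the `connection` option when constructing the queue, for example with `connectionFromEnv(process.env)` under Node.js.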
Enqueue Agent Runs
const { jobId } = await queue.enqueueAgentRun({
  agentName: "assistant",
  input: "Analyze this quarterly report",
  sessionId: "user-123",
  userId: "user-123",
});
console.log("Job enqueued:", jobId);
With Priority and Delay
const { jobId } = await queue.enqueueAgentRun({
  agentName: "processor",
  input: "High priority task",
  priority: 1, // Lower number = higher priority
  delay: 5000, // Delay 5 seconds before processing
});
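To make the two options concrete, here is a simplified in-memory model of how `priority` and `delay` interact: a job is not eligible until its delay has elapsed, and among eligible jobs the lowest priority number goes first. This is a sketch for intuition only, not the BullMQ implementation. The `PendingJob` type, the `processingOrder` function, and the tie-break by enqueue time are all assumptions.

```typescript
// Hypothetical model of a queued job; not a type exported by the library.
interface PendingJob {
  id: string;
  priority: number;   // lower number = higher priority
  enqueuedAt: number; // timestamp in ms
  delay: number;      // ms to wait before the job becomes eligible
}

// Returns the ids of jobs eligible at time `now`, in the order a worker
// would be expected to pick them up under this simplified model.
function processingOrder(jobs: PendingJob[], now: number): string[] {
  return jobs
    .filter((job) => job.enqueuedAt + job.delay <= now)
    // Assumed tie-break: equal priorities are ordered by enqueue time.
    .sort((a, b) => a.priority - b.priority || a.enqueuedAt - b.enqueuedAt)
    .map((job) => job.id);
}

const pending: PendingJob[] = [
  { id: "report", priority: 5, enqueuedAt: 0, delay: 0 },
  { id: "urgent", priority: 1, enqueuedAt: 10, delay: 0 },
  { id: "later", priority: 3, enqueuedAt: 0, delay: 5000 },
];

console.log(processingOrder(pending, 100));  // ["urgent", "report"]
console.log(processingOrder(pending, 5000)); // ["urgent", "later", "report"]
```

At time 100 the delayed job is still held back, so only two jobs are eligible. Once the 5 seconds have passed, it joins the eligible set and is slotted in by its priority.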
Enqueue Workflows
const { jobId } = await queue.enqueueWorkflow({
  workflowName: "content-pipeline",
  initialState: {
    topic: "AI in healthcare",
    format: "blog-post",
  },
  sessionId: "session-abc",
});
Check Job Status
const status = await queue.getJobStatus(jobId);
console.log(status.state); // "waiting" | "active" | "completed" | "failed" | "delayed"
console.log(status.progress); // 0-100
console.log(status.result); // RunOutput (when completed)
console.log(status.error); // Error message (when failed)
JobStatus Fields
| Field | Type | Description |
|---|---|---|
| jobId | string | Unique job identifier |
| state | string | Current state |
| progress | number | Completion percentage |
| result | RunOutput | Agent output (when completed) |
| error | string | Error message (when failed) |
| createdAt | Date | When the job was enqueued |
| processedAt | Date | When processing started |
| finishedAt | Date | When processing finished |
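Because `getJobStatus` returns a snapshot, a caller that needs the final outcome has to check repeatedly. Below is a minimal polling sketch written against a small interface rather than the real class, so it runs without Redis. The helper `waitForJob`, the `StatusSource` and `JobStatusLike` types, and the default interval and attempt budget are assumptions. Only the `state`, `progress`, `result`, and `error` fields come from the table above.

```typescript
// Subset of the documented JobStatus fields that the helper relies on.
interface JobStatusLike {
  state: string;
  progress?: number;
  result?: unknown;
  error?: string;
}

// Anything exposing getJobStatus; an AgentQueue instance is assumed to fit.
interface StatusSource {
  getJobStatus(jobId: string): Promise<JobStatusLike>;
}

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Hypothetical helper: polls until the job is "completed" or "failed",
// or throws once the attempt budget is used up.
async function waitForJob(
  source: StatusSource,
  jobId: string,
  intervalMs = 1000,
  maxAttempts = 60,
): Promise<JobStatusLike> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const status = await source.getJobStatus(jobId);
    if (status.state === "completed" || status.state === "failed") {
      return status;
    }
    await sleep(intervalMs);
  }
  throw new Error(`Job ${jobId} did not finish after ${maxAttempts} checks`);
}
```

A call such as `const final = await waitForJob(queue, jobId)` would then resolve with the terminal status. Polling adds load on Redis, so a longer interval is preferable for long-running jobs.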
Cancel Jobs
await queue.cancelJob(jobId);
Event Handlers
Listen for job completion or failure:
queue.onCompleted((jobId, result) => {
  console.log(`Job ${jobId} completed:`, result.text);
  // Send notification, update database, etc.
});
queue.onFailed((jobId, error) => {
  console.error(`Job ${jobId} failed:`, error.message);
  // Alert, retry logic, etc.
});
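The handlers above fire for every job on the queue. When only one job matters, the two callbacks can be wrapped in a promise that settles for that job alone. This is a sketch under stated assumptions: `resultOf` and `CompletionEvents` are hypothetical names, and the handler signatures are inferred from the example above rather than from the library's type definitions.

```typescript
// Minimal event surface, modelled on the onCompleted/onFailed calls above.
interface CompletionEvents<R> {
  onCompleted(handler: (jobId: string, result: R) => void): void;
  onFailed(handler: (jobId: string, error: Error) => void): void;
}

// Hypothetical helper: resolves with the result of one specific job,
// or rejects with its error, ignoring events for every other job.
function resultOf<R>(events: CompletionEvents<R>, targetJobId: string): Promise<R> {
  return new Promise<R>((resolve, reject) => {
    events.onCompleted((jobId, result) => {
      if (jobId === targetJobId) resolve(result);
    });
    events.onFailed((jobId, error) => {
      if (jobId === targetJobId) reject(error);
    });
  });
}
```

Two caveats apply to this sketch. The handlers stay registered after the promise settles, since no way to unregister them is documented here. And if the job finishes before `resultOf` is called, the event may already have been missed, so checking `getJobStatus` first is a reasonable safeguard.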
Cleanup
Full Example
import { Agent, openai } from "@radaros/core";
import { AgentQueue } from "@radaros/queue";
const queue = new AgentQueue({
  connection: { host: "localhost", port: 6379 },
});
// Enqueue multiple jobs
const jobs = await Promise.all([
  queue.enqueueAgentRun({ agentName: "summarizer", input: "Long article text..." }),
  queue.enqueueAgentRun({ agentName: "translator", input: "Translate this to Spanish" }),
  queue.enqueueAgentRun({ agentName: "classifier", input: "Classify this email" }),
]);
// Check the current state of each job (a one-time snapshot)
for (const { jobId } of jobs) {
  const status = await queue.getJobStatus(jobId);
  console.log(`${jobId}: ${status.state}`);
}
// Log each result as jobs complete
queue.onCompleted((id, result) => {
  console.log(`Done: ${id} -> ${result.text.slice(0, 100)}`);
});