For ADS Subscribers
Set up an ADS Subscriber with the SDK to consume events from publishers and process them in just a few steps.
For n8n users, refer to this.
Prerequisites
- Details of the publisher's ADS Bridge. You can also connect your ADS Subscriber to multiple ADS publishers by using multiple ADSDataConnector objects.
- A valid ADS Subscribers Pool ID to group your subscriber instances
- Redis server for job processing (Node.js/n8n only)
- An API key for your LLM provider (e.g., Google AI API key for Gemini, OpenAI API key, etc.)
Environment Setup
Create a .env file with the required variables:
# Example: Google AI API Key for Gemini
GOOGLE_API_KEY=your_google_ai_api_key_here
# ADS Bridge Configuration
ADS_BRIDGE_URL=http://localhost:9999
SUBSCRIBER_POOL_ID=your-unique-uuid-here
# Redis Configuration (Node.js only)
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_USERNAME=admin
REDIS_PASSWORD=pass
# SMTP Configuration
SMTP_HOST=smtp.gmail.com
SMTP_PORT=587
SMTP_USERNAME=your_email_username
SMTP_PASSWORD=your_email_password
# Email Addresses
FROM_EMAIL=from@example.com
TO_EMAIL=to@example.com
The SUBSCRIBER_POOL_ID must be unique, ideally a UUID. Generate one using our Pool ID Generator (https://agentdatashuttle.knowyours.co/pool-id-generator).
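Before starting the subscriber, it can help to fail fast on an incomplete .env file. The following is a minimal sketch, not part of the ADS SDK: `checkEnv`, `REQUIRED_VARS`, and `UUID_PATTERN` are hypothetical names, and the choice of which variables count as required is an assumption based on the example file above.

```typescript
// Hypothetical helper (not part of the ADS SDK): reports missing variables
// and a pool ID that does not look like a UUID.
const REQUIRED_VARS = [
  "GOOGLE_API_KEY",
  "ADS_BRIDGE_URL",
  "SUBSCRIBER_POOL_ID",
] as const;

// Matches the canonical 8-4-4-4-12 hexadecimal UUID layout.
const UUID_PATTERN =
  /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

type Env = Record<string, string | undefined>;

function checkEnv(env: Env): string[] {
  const problems: string[] = [];
  for (const name of REQUIRED_VARS) {
    if (!env[name]) {
      problems.push(`${name} is missing or empty`);
    }
  }
  const poolId = env["SUBSCRIBER_POOL_ID"];
  if (poolId && !UUID_PATTERN.test(poolId)) {
    problems.push("SUBSCRIBER_POOL_ID is not a UUID");
  }
  return problems;
}

// Typical use after dotenv.config():
//   const problems = checkEnv(process.env);
//   if (problems.length > 0) throw new Error(problems.join("; "));
```

Returning a list of problems instead of throwing on the first one lets you report every misconfigured variable in a single run.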
Installation
Node.js
npm install @agentdatashuttle/adsjs @langchain/google-genai @langchain/langgraph @langchain/core zod dotenv
Complete Working Example
Node.js
import { ChatGoogleGenerativeAI } from "@langchain/google-genai";
import { HumanMessage } from "@langchain/core/messages";
import { createReactAgent } from "@langchain/langgraph/prebuilt";
import { tool } from "@langchain/core/tools";
import { z } from "zod";
import dotenv from "dotenv";
// Import ADS Subscriber and add ADS Data Connectors
import {
  types,
  dataconnector,
  subscriber,
  notifications,
} from "@agentdatashuttle/adsjs";

// Load the variables from .env into process.env
dotenv.config();

(async () => {
  // Define support ticket categorization tool
  //@ts-ignore
  const categorizeTicket = tool(
    async (args: any) => {
      // Simulate categorization logic by picking a random category
      const categories = ["Technical", "Billing", "General Inquiry"];
      const category =
        categories[Math.floor(Math.random() * categories.length)];
      return `Ticket categorized as: ${category}`;
    },
    {
      name: "categorize_ticket",
      description: "Categorizes a support ticket based on its content.",
      schema: z.object({
        ticket_content: z
          .string()
          .describe("The content of the support ticket."),
      }),
    }
  );

  // Define the tools for the agent to use
  const agentTools = [categorizeTicket];
  const llm = new ChatGoogleGenerativeAI({ model: "gemini-2.0-flash" });

  //@ts-ignore
  const agent = createReactAgent({
    llm,
    tools: agentTools,
  });

  // Step 1: Define callback function for ADS Subscriber to invoke agent
  const invoke_agent = async (
    prompt: string,
    payload: types.ADSDataPayload
  ) => {
    console.log("The payload was:", payload);

    // Filter specific events in/out as you desire
    if (payload.event_name === "support_ticket_closed") {
      return "NO INVOCATION FOR THIS EVENT - TICKET CLOSED";
    }

    // Invoke your agent with the context enriched prompt generated by Agent Data Shuttle
    const response = await agent.invoke({
      messages: [new HumanMessage(prompt)],
    });

    // Return final agent response - will be sent to all notification channels for later review
    return response.messages[response.messages.length - 1].content as string;
  };

  // Step 2: Define ADSBridgeClientParams and corresponding ADSDataConnector
  const adsBridgeClientParams: types.ADSBridgeClientParams = {
    connection_string: process.env.ADS_BRIDGE_URL || "http://localhost:9999",
    path_prefix: "/ads_bridge",
    // Replace with your actual pool ID to group horizontally scaled replicas
    // of ADS Subscribers. Use https://agentdatashuttle.knowyours.co/pool-id-generator
    // to make one if needed.
    ads_subscribers_pool_id:
      process.env.SUBSCRIBER_POOL_ID || "<a_random_uuid>",
  };

  const dataConnectorOne = new dataconnector.ADSDataConnector(
    "SupportTicketConnector",
    adsBridgeClientParams
  );

  const redisParams: types.RedisParams = {
    host: process.env.REDIS_HOST || "localhost",
    port: parseInt(process.env.REDIS_PORT || "6379", 10),
    username: process.env.REDIS_USERNAME || "",
    password: process.env.REDIS_PASSWORD || "",
  };

  const agentDescription =
    "This agent categorizes support tickets based on their content.";

  // Step 3: Optionally, add notification channels
  const emailChannel = new notifications.EmailNotificationChannel(
    agentDescription,
    process.env.SMTP_HOST || "smtp.gmail.com",
    parseInt(process.env.SMTP_PORT ?? "", 10) || 587,
    process.env.SMTP_USERNAME || "",
    process.env.SMTP_PASSWORD || "",
    process.env.FROM_EMAIL || "from@example.com",
    process.env.TO_EMAIL || "to@example.com"
  );

  // Step 4: Create the ADSSubscriber with the callback function, LLM, and Data Connectors.
  // The ADSSubscriber will listen for events from all the data connectors and invoke the agent.
  const adsSubscriber = new subscriber.ADSSubscriber(
    null,
    invoke_agent,
    llm,
    agentDescription,
    [dataConnectorOne],
    redisParams,
    [emailChannel]
  );

  // Step 5: Start the ADSSubscriber to listen for events and invoke the agent.
  await adsSubscriber.start();
})();
The ads_subscribers_pool_id must be a UUID to ensure proper event routing by the ADS Bridge. Generate a unique subscribers pool ID using our Pool ID Generator.
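If you prefer to create the pool ID locally rather than with the Pool ID Generator, a UUID can be produced with Node's built-in `crypto` module. This is a sketch under the assumption that any random (version 4) UUID is acceptable to the ADS Bridge; `newPoolId` is a hypothetical helper name, not part of the SDK.

```typescript
import { randomUUID } from "node:crypto";

// Hypothetical helper: returns a random version 4 UUID string.
function newPoolId(): string {
  return randomUUID();
}

// Run once, then copy the printed line into your .env file.
console.log(`SUBSCRIBER_POOL_ID=${newPoolId()}`);
```

Generate the value once and store it in .env instead of creating a new one on every start: the pool ID groups horizontally scaled replicas, so all replicas of the same subscriber need to share the same value.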
What Happens Next
Once your subscriber is running, it will:
- Listen for Events: Automatically receive ADS events from the configured ADS Publisher
- Process Events: Your AI agent will analyze each event using the optimized prompt generated by ADS
- Generate Responses: The agent processes the event information and provides intelligent responses
- Handle Different Event Types: Skip processing for certain events like closed tickets
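The event-skipping behavior described in the list above can be factored out of the callback so that the skip list is easy to extend. The following is a minimal sketch: `PayloadLike`, `SKIPPED_EVENTS`, `shouldInvokeAgent`, and `handleEvent` are hypothetical names, and only the `event_name` field is taken from the example; the rest of the payload shape is left open.

```typescript
// Only `event_name` comes from the example in this guide; other payload
// fields are not assumed here.
interface PayloadLike {
  event_name: string;
  [key: string]: unknown;
}

// Hypothetical skip list; adjust it to the events your publisher emits.
const SKIPPED_EVENTS = new Set<string>(["support_ticket_closed"]);

function shouldInvokeAgent(payload: PayloadLike): boolean {
  return !SKIPPED_EVENTS.has(payload.event_name);
}

// Wraps any agent call so that skipped events return early, before the
// agent (and therefore the LLM) is invoked.
async function handleEvent(
  prompt: string,
  payload: PayloadLike,
  runAgent: (prompt: string) => Promise<string>
): Promise<string> {
  if (!shouldInvokeAgent(payload)) {
    return `NO INVOCATION FOR THIS EVENT - ${payload.event_name}`;
  }
  return runAgent(prompt);
}
```

Passing the agent call in as `runAgent` keeps the filter testable with a stub function, without an LLM key or a running ADS Bridge.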
Test Your Setup
To test your subscriber, you can use the ADS Publisher Quickstart to send sample ADS events.
Your subscriber will automatically process these events and display the results in the console.