ADS Subscriber
The ADS Subscriber enables you to consume events from ADS Publishers and process them with AI agents. This guide covers the essential methods and configuration options for setting up event subscribers using the Node.js SDK.
Getting Started
Initialize Your AI Agent
First, set up your AI agent
import { ChatGoogleGenerativeAI } from "@langchain/google-genai";
import { HumanMessage } from "@langchain/core/messages";
import { createReactAgent } from "@langchain/langgraph/prebuilt";
import {
types,
dataconnector,
subscriber,
notifications,
} from "@agentdatashuttle/adsjs";
// Configure your AI agent
const agentTools = [toolA, toolB, toolC];
const llm = new ChatGoogleGenerativeAI({ model: "gemini-2.0-flash" });
const agent = createReactAgent({
llm: llm,
tools: agentTools,
});
// Define agent invocation callback
const invoke_agent = async (
prompt: string,
payload: types.ADSDataPayload
) => {
const response = await agent.invoke({
messages: [new HumanMessage(prompt)],
});
return response.messages[response.messages.length - 1].content as string;
};
Configure Data Connectors
Set up data connectors to receive events from ADS Publishers:
// Give the connection parameters of ADS Bridge hosted by the ADS Publisher
const adsClientParamsNotion: types.ADSBridgeClientParams = {
connection_string: "http://localhost:9999",
path_prefix: "/ads_bridge",
ads_subscribers_pool_id: "<pool_id>", // Use the Pool ID Generator tool or generate a random UUID for your subscriber pool and replace it here
};
const adsClientParamsGithub: types.ADSBridgeClientParams = {
connection_string: "http://localhost:9009",
path_prefix: "/ads_bridge",
ads_subscribers_pool_id: "<pool_id>", // Use the Pool ID Generator tool or generate a random UUID for your subscriber pool and replace it here
};
// Create data connector
const notionDataConnector = new dataconnector.ADSDataConnector(
"NotionDataConnector",
adsClientParamsNotion
);
const githubDataConnector = new dataconnector.ADSDataConnector(
"GithubDataConnector",
adsClientParamsGithub
);
The ads_subscribers_pool_id
must be a random UUID (or 2 of them concatenated for more randomness) to ensure proper event routing by the ADS Bridge.
Generate a unique subscribers pool ID using our Pool ID Generator.
For more details about subscriber pools and how this parameter works, check this section.
Each data connector corresponds to a specific event source. You can create multiple connectors to listen to different publishers. So, every time a publisher from either one of the data connectors publishes an event, the subscriber will act on it it.
Configure Redis for Job Processing
Provide Redis configuration for job processing:
// Configure Redis for job processing
const redisParams: types.RedisParams = {
host: "localhost",
port: 6379,
username: "admin", // Optional, if your Redis instance requires authentication
password: "pass", // Optional, if your Redis instance requires authentication
};
If you would like to know more about why Redis is used, refer to the Job Queue section.
Set Up Notification Channels (Optional)
// Email as a medium
const emailChannel = new notifications.EmailNotificationChannel(
agentDescription,
"<smtp_host>",
"<smtp_port>",
"<smtp_username>",
"<smtp_password>",
"<from_address>",
"<to_address>"
);
// Slack as a medium
const slackChannel = new notifications.SlackNotificationChannel(
agentDescription,
process.env.SLACK_BOT_TOKEN || "",
"#ads-notifications-channel"
);
Refer reports to learn more about what the notification channels are for
Create and Start Subscriber
// Create subscriber instance
const adsSubscriber = new subscriber.ADSSubscriber(
null, // Agent invocation function (synchronous)
invoke_agent, // Agent invocation function (asynchronous)
llm, // Language model instance
agentDescription, // Detailed description of your agent
[notionDataConnector, githubDataConnector], // Array of data connectors
redisParams, // Redis configuration
[emailChannel, slackChannel] // Notification channels (leave it empty if not needed)
);
// Start listening for events
await adsSubscriber.start();
Configuration
ADS Bridge Client Parameters
The ADSBridgeClientParams
interface defines the connection settings for the ADS Bridge:
const adsBridgeClientParams: types.ADSBridgeClientParams = {
connection_string: string, // ADS Bridge URL
path_prefix: string // API path prefix
ads_subscribers_pool_id: string // A random UUID to represent your subscribers at the ADS Bridge
};
Redis Parameters
The RedisParams
interface configures the Redis connection for job processing:
const redisParams: types.RedisParams = {
host: string, // Redis server hostname
port: number, // Redis server port
username: string, // Optional Redis username for authentication
password: string, // Optional Redis password for authentication
};
ADSDataConnector
Constructor
new dataconnector.ADSDataConnector(connectorName: string, clientParams: ADSBridgeClientParams)
Parameters:
connectorName
(string): Identifier for your data connectorclientParams
(ADSBridgeClientParams): ADS Bridge connection configuration
ADSSubscriber
Constructor
new subscriber.ADSSubscriber(
agentCallbackFunctionSync: ((agentPrompt: string, ads_payload: ADSDataPayload) => string) | null,
agentCallbackFunctionAsync: ((agentPrompt: string, ads_payload: ADSDataPayload) => Promise<string>) | null,
llm: any,
agentDescription: string,
dataConnectors: ADSDataConnector[],
redisParams: RedisParams,
notificationChannels?: NotificationChannel[]
)
Parameters:
agentCallbackFunctionSync
: Optional context object for the subscriberagentCallbackFunctionAsync
: Function that invokes your AI agent with event datallm
: Language model instance (LangChain’sBaseChatModel
)agentDescription
: Description of your agent’s capabilitiesdataConnectors
: Array of data connectors to listen toredisParams
: Redis configuration for job processingnotificationChannels
: Optional notification channels for alerts
Methods
start()
Starts the subscriber to listen for events and process them with your agent.
await adsSubscriber.start(): Promise<void>
Agent Invocation Function
This is the callback function that invokes your AI agent when the ADS Subscriber receives an event from the ADS Publisher. The callback function has access to two parameters:
agentPrompt
(string): Optimized prompt generated based on the event data. Refer to the Performance Improvement section for more details.ads_payload
(ADSDataPayload): The original event payload from the publisher
You can implement conditional logic in your agent invocation function to handle different event types differently or skip processing for certain events.
Error Handling
Implement proper error handling in your agent invocation function to ensure the subscriber remains stable when processing events.
const invoke_agent = async (prompt: string, payload: types.ADSDataPayload) => {
try {
const response = await agent.invoke({
messages: [new HumanMessage(prompt)],
});
return response.messages[response.messages.length - 1].content as string;
} catch (error) {
console.error("Agent invocation failed:", error.message);
return "AGENT_INVOCATION_FAILED: " + error.message;
}
};
Next Steps
- Explore the ADS Publisher documentation to understand event publishing
- Check out Python SDK for Python applications
- Get your hands dirty with the quickstart for subscribers to see how everything works together