Skip to Content
Agent Data Shuttle 1.0 is out! 🎉
SDKsNode.jsADS Subscriber

ADS Subscriber

The ADS Subscriber enables you to consume events from ADS Publishers and process them with AI agents. This guide covers the essential methods and configuration options for setting up event subscribers using the Node.js SDK.

Getting Started

Initialize Your AI Agent

First, set up your AI agent

import { ChatGoogleGenerativeAI } from "@langchain/google-genai"; import { HumanMessage } from "@langchain/core/messages"; import { createReactAgent } from "@langchain/langgraph/prebuilt"; import { types, dataconnector, subscriber, notifications, } from "@agentdatashuttle/adsjs"; // Configure your AI agent const agentTools = [toolA, toolB, toolC]; const llm = new ChatGoogleGenerativeAI({ model: "gemini-2.0-flash" }); const agent = createReactAgent({ llm: llm, tools: agentTools, }); // Define agent invocation callback const invoke_agent = async ( prompt: string, payload: types.ADSDataPayload ) => { const response = await agent.invoke({ messages: [new HumanMessage(prompt)], }); return response.messages[response.messages.length - 1].content as string; };

Configure Data Connectors

Set up data connectors to receive events from ADS Publishers:

// Give the connection parameters of ADS Bridge hosted by the ADS Publisher const adsClientParamsNotion: types.ADSBridgeClientParams = { connection_string: "http://localhost:9999", path_prefix: "/ads_bridge", ads_subscribers_pool_id: "<pool_id>", // Use the Pool ID Generator tool or generate a random UUID for your subscriber pool and replace it here }; const adsClientParamsGithub: types.ADSBridgeClientParams = { connection_string: "http://localhost:9009", path_prefix: "/ads_bridge", ads_subscribers_pool_id: "<pool_id>", // Use the Pool ID Generator tool or generate a random UUID for your subscriber pool and replace it here }; // Create data connector const notionDataConnector = new dataconnector.ADSDataConnector( "NotionDataConnector", adsClientParamsNotion ); const githubDataConnector = new dataconnector.ADSDataConnector( "GithubDataConnector", adsClientParamsGithub );
⚠️

The ads_subscribers_pool_id must be a random UUID (or 2 of them concatenated for more randomness) to ensure proper event routing by the ADS Bridge.

Generate a unique subscribers pool ID using our Pool ID Generator.

For more details about subscriber pools and how this parameter works, check this section.

Each data connector corresponds to a specific event source. You can create multiple connectors to listen to different publishers. So, every time a publisher from either one of the data connectors publishes an event, the subscriber will act on it it.

Configure Redis for Job Processing

Provide Redis configuration for job processing:

// Configure Redis for job processing const redisParams: types.RedisParams = { host: "localhost", port: 6379, username: "admin", // Optional, if your Redis instance requires authentication password: "pass", // Optional, if your Redis instance requires authentication };

If you would like to know more about why Redis is used, refer to the Job Queue section.

Set Up Notification Channels (Optional)

// Email as a medium const emailChannel = new notifications.EmailNotificationChannel( agentDescription, "<smtp_host>", "<smtp_port>", "<smtp_username>", "<smtp_password>", "<from_address>", "<to_address>" ); // Slack as a medium const slackChannel = new notifications.SlackNotificationChannel( agentDescription, process.env.SLACK_BOT_TOKEN || "", "#ads-notifications-channel" );

Refer reports to learn more about what the notification channels are for

Create and Start Subscriber

// Create subscriber instance const adsSubscriber = new subscriber.ADSSubscriber( null, // Agent invocation function (synchronous) invoke_agent, // Agent invocation function (asynchronous) llm, // Language model instance agentDescription, // Detailed description of your agent [notionDataConnector, githubDataConnector], // Array of data connectors redisParams, // Redis configuration [emailChannel, slackChannel] // Notification channels (leave it empty if not needed) ); // Start listening for events await adsSubscriber.start();
đź’ˇ

Add a detailed agentDescription about what your AI agent does, to improve the results of the agents upon invocation at the subscriber end (To learn more about this refer here). This also helps provide better reports for the subscribers after the agent invocation is complete.

Configuration

ADS Bridge Client Parameters

The ADSBridgeClientParams interface defines the connection settings for the ADS Bridge:

const adsBridgeClientParams: types.ADSBridgeClientParams = { connection_string: string, // ADS Bridge URL path_prefix: string // API path prefix ads_subscribers_pool_id: string // A random UUID to represent your subscribers at the ADS Bridge };

Redis Parameters

The RedisParams interface configures the Redis connection for job processing:

const redisParams: types.RedisParams = { host: string, // Redis server hostname port: number, // Redis server port username: string, // Optional Redis username for authentication password: string, // Optional Redis password for authentication };

ADSDataConnector

Constructor

new dataconnector.ADSDataConnector(connectorName: string, clientParams: ADSBridgeClientParams)

Parameters:

  • connectorName (string): Identifier for your data connector
  • clientParams (ADSBridgeClientParams): ADS Bridge connection configuration

ADSSubscriber

Constructor

new subscriber.ADSSubscriber( agentCallbackFunctionSync: ((agentPrompt: string, ads_payload: ADSDataPayload) => string) | null, agentCallbackFunctionAsync: ((agentPrompt: string, ads_payload: ADSDataPayload) => Promise<string>) | null, llm: any, agentDescription: string, dataConnectors: ADSDataConnector[], redisParams: RedisParams, notificationChannels?: NotificationChannel[] )

Parameters:

  • agentCallbackFunctionSync: Optional context object for the subscriber
  • agentCallbackFunctionAsync: Function that invokes your AI agent with event data
  • llm: Language model instance (LangChain’s BaseChatModel)
  • agentDescription: Description of your agent’s capabilities
  • dataConnectors: Array of data connectors to listen to
  • redisParams: Redis configuration for job processing
  • notificationChannels: Optional notification channels for alerts

Methods

start()

Starts the subscriber to listen for events and process them with your agent.

await adsSubscriber.start(): Promise<void>

Agent Invocation Function

This is the callback function that invokes your AI agent when the ADS Subscriber receives an event from the ADS Publisher. The callback function has access to two parameters:

  • agentPrompt (string): Optimized prompt generated based on the event data. Refer to the Performance Improvement section for more details.
  • ads_payload (ADSDataPayload): The original event payload from the publisher

You can implement conditional logic in your agent invocation function to handle different event types differently or skip processing for certain events.

Error Handling

⚠️

Implement proper error handling in your agent invocation function to ensure the subscriber remains stable when processing events.

const invoke_agent = async (prompt: string, payload: types.ADSDataPayload) => { try { const response = await agent.invoke({ messages: [new HumanMessage(prompt)], }); return response.messages[response.messages.length - 1].content as string; } catch (error) { console.error("Agent invocation failed:", error.message); return "AGENT_INVOCATION_FAILED: " + error.message; } };

Next Steps

Last updated on