For ADS Subscribers

Set up an ADS Subscriber with the SDK to consume events from publishers and process them in a few steps.

For n8n users, refer to this.

Prerequisites

  • Connection details for the publisher’s ADS Bridge

    You can also connect one ADS Subscriber to multiple ADS Publishers by creating multiple ADSDataConnector objects.

  • A valid ADS Subscribers Pool ID to group your subscriber instances
  • Redis server for job processing (Node.js/n8n only)
  • API key for your LLM provider (e.g., Google AI API key for Gemini, OpenAI API key, etc.)
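To make the multi-publisher note concrete, here is a minimal sketch that prepares one connection-parameter object per publisher bridge. The field names mirror those passed as types.ADSBridgeClientParams in the complete example on this page; the BridgeParams interface, the buildBridgeParams helper, the second bridge URL, and the reuse of one pool ID across publishers are assumptions made for illustration, not part of the SDK.

```typescript
// Local mirror of the fields used for types.ADSBridgeClientParams in the
// complete example. Defined here only so the sketch is self-contained.
interface BridgeParams {
  connection_string: string;
  path_prefix: string;
  ads_subscribers_pool_id: string;
}

// Hypothetical helper: build one params object per publisher bridge URL.
// Assumes the same pool ID is reused for every publisher.
function buildBridgeParams(
  bridgeUrls: string[],
  poolId: string,
  pathPrefix: string = "/ads_bridge"
): BridgeParams[] {
  return bridgeUrls.map((url) => ({
    connection_string: url,
    path_prefix: pathPrefix,
    ads_subscribers_pool_id: poolId,
  }));
}

// Two example bridges; the second URL is a placeholder.
const bridgeParams = buildBridgeParams(
  ["http://localhost:9999", "http://localhost:9998"],
  "your-unique-uuid-here"
);
console.log(`Prepared ${bridgeParams.length} bridge configurations`);
```

Each resulting object would then be passed to its own ADSDataConnector, and all connectors handed to the subscriber as an array, in the same way the complete example passes a single connector.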

Environment Setup

Create a .env file with the required variables:

    # Example: Google AI API Key for Gemini
    GOOGLE_API_KEY=your_google_ai_api_key_here

    # ADS Bridge Configuration
    ADS_BRIDGE_URL=http://localhost:9999
    SUBSCRIBER_POOL_ID=your-unique-uuid-here

    # Redis Configuration (Node.js only)
    REDIS_HOST=localhost
    REDIS_PORT=6379
    REDIS_USERNAME=admin
    REDIS_PASSWORD=pass

    # SMTP Configuration
    SMTP_HOST=smtp.gmail.com
    SMTP_PORT=587
    SMTP_USERNAME=your_email_username
    SMTP_PASSWORD=your_email_password

    # Email Addresses
    FROM_EMAIL=[email protected]
    TO_EMAIL=[email protected]
⚠️ The SUBSCRIBER_POOL_ID must be unique, ideally a UUID. Generate one with the Pool ID Generator (https://agentdatashuttle.knowyours.co/pool-id-generator).
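Because a missing bridge URL or a placeholder pool ID is easy to overlook, it can help to check these values before starting the subscriber. The following is a minimal sketch, not part of the SDK: loadSubscriberEnv, SubscriberEnv, and UUID_PATTERN are hypothetical names, and the function takes the environment as a plain object so it can be tried without a .env file.

```typescript
// Matches the canonical 8-4-4-4-12 hexadecimal UUID layout.
const UUID_PATTERN =
  /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

// Hypothetical shape for the two values checked here.
interface SubscriberEnv {
  bridgeUrl: string;
  poolId: string;
}

// Hypothetical helper: fail fast if a required variable is missing
// or the pool ID is not UUID-shaped.
function loadSubscriberEnv(
  env: Record<string, string | undefined>
): SubscriberEnv {
  const bridgeUrl = env.ADS_BRIDGE_URL;
  const poolId = env.SUBSCRIBER_POOL_ID;
  if (!bridgeUrl) {
    throw new Error("ADS_BRIDGE_URL is not set");
  }
  if (!poolId || !UUID_PATTERN.test(poolId)) {
    throw new Error("SUBSCRIBER_POOL_ID must be a UUID");
  }
  return { bridgeUrl, poolId };
}

// Example with literal values; in an application you would pass process.env.
const subscriberEnv = loadSubscriberEnv({
  ADS_BRIDGE_URL: "http://localhost:9999",
  SUBSCRIBER_POOL_ID: "123e4567-e89b-12d3-a456-426614174000",
});
console.log("Pool ID accepted:", subscriberEnv.poolId);
```

If you prefer to generate the pool ID locally rather than with the online tool, Node.js provides randomUUID() in its built-in crypto module.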

Installation

    npm install @agentdatashuttle/adsjs @langchain/google-genai @langchain/langgraph @langchain/core zod dotenv

Complete Working Example

    import { ChatGoogleGenerativeAI } from "@langchain/google-genai";
    import { HumanMessage } from "@langchain/core/messages";
    import { createReactAgent } from "@langchain/langgraph/prebuilt";
    import { tool } from "@langchain/core/tools";
    import { z } from "zod";
    import dotenv from "dotenv";

    // Import ADS Subscriber and add ADS Data Connectors
    import {
      types,
      dataconnector,
      subscriber,
      notifications,
    } from "@agentdatashuttle/adsjs";

    // Load the variables from .env into process.env
    dotenv.config();

    (async () => {
      // Define support ticket categorization tool
      //@ts-ignore
      const categorizeTicket = tool(
        async (args: any) => {
          // Simulate categorization logic by picking a random category
          const categories = ["Technical", "Billing", "General Inquiry"];
          const category =
            categories[Math.floor(Math.random() * categories.length)];
          return `Ticket categorized as: ${category}`;
        },
        {
          name: "categorize_ticket",
          description: "Categorizes a support ticket based on its content.",
          schema: z.object({
            ticket_content: z
              .string()
              .describe("The content of the support ticket."),
          }),
        }
      );

      // Define the tools for the agent to use
      const agentTools = [categorizeTicket];

      const llm = new ChatGoogleGenerativeAI({ model: "gemini-2.0-flash" });

      //@ts-ignore
      const agent = createReactAgent({
        llm: llm,
        tools: agentTools,
      });

      // Step 1: Define callback function for ADS Subscriber to invoke agent
      const invoke_agent = async (
        prompt: string,
        payload: types.ADSDataPayload
      ) => {
        console.log("The payload was:", payload);

        // Filter specific events in/out as you desire
        if (payload.event_name === "support_ticket_closed") {
          return "NO INVOCATION FOR THIS EVENT - TICKET CLOSED";
        }

        // Invoke your agent with the context enriched prompt generated by Agent Data Shuttle
        const response = await agent.invoke({
          messages: [new HumanMessage(prompt)],
        });

        // Return final agent response - will be sent to all notification channels for later review
        return response.messages[response.messages.length - 1].content as string;
      };

      // Step 2: Define ADSBridgeClientParams and corresponding ADSDataConnector
      const adsBridgeClientParams: types.ADSBridgeClientParams = {
        connection_string: process.env.ADS_BRIDGE_URL || "http://localhost:9999",
        path_prefix: "/ads_bridge",
        // Replace with your actual pool ID to group horizontally scaled replicas of ADS Subscribers.
        // Use https://agentdatashuttle.knowyours.co/pool-id-generator to make one if needed.
        ads_subscribers_pool_id:
          process.env.SUBSCRIBER_POOL_ID || "<a_random_uuid>",
      };

      const dataConnectorOne = new dataconnector.ADSDataConnector(
        "SupportTicketConnector",
        adsBridgeClientParams
      );

      const redisParams: types.RedisParams = {
        host: process.env.REDIS_HOST || "localhost",
        port: parseInt(process.env.REDIS_PORT || "6379", 10),
        username: process.env.REDIS_USERNAME || "",
        password: process.env.REDIS_PASSWORD || "",
      };

      const agentDescription =
        "This agent categorizes support tickets based on their content.";

      // Step 3: Optionally, add notification channels
      const emailChannel = new notifications.EmailNotificationChannel(
        agentDescription,
        process.env.SMTP_HOST || "smtp.gmail.com",
        parseInt(process.env.SMTP_PORT ?? "", 10) || 587,
        process.env.SMTP_USERNAME || "",
        process.env.SMTP_PASSWORD || "",
        process.env.FROM_EMAIL || "[email protected]",
        process.env.TO_EMAIL || "[email protected]"
      );

      // Step 4: Create the ADSSubscriber with the callback function, LLM, and Data Connectors.
      // The ADSSubscriber will listen for events from all the data connectors and invoke the agent.
      const adsSubscriber = new subscriber.ADSSubscriber(
        null,
        invoke_agent,
        llm,
        agentDescription,
        [dataConnectorOne],
        redisParams,
        [emailChannel]
      );

      // Step 5: Start the ADSSubscriber to listen for events and invoke the agent.
      await adsSubscriber.start();
    })();
⚠️ The ads_subscribers_pool_id must be a UUID so that the ADS Bridge can route events correctly. Generate a unique subscribers pool ID with the Pool ID Generator (https://agentdatashuttle.knowyours.co/pool-id-generator).
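The callback in the example returns the last message's content with an `as string` cast. In LangChain JS, message content can be either a plain string or an array of content parts, so the cast may hide a non-string value for some models. A minimal sketch of a safer conversion follows; contentToText and the two local types are hypothetical names that only approximate LangChain's content shape and are not imported from any library.

```typescript
// Simplified stand-ins for LangChain's message content shape.
type ContentPart = { type: string; text?: string };
type MessageContentLike = string | ContentPart[];

// Hypothetical helper: return plain strings unchanged and join the text
// parts of structured content, ignoring non-text parts such as images.
function contentToText(content: MessageContentLike): string {
  if (typeof content === "string") {
    return content;
  }
  return content
    .filter((part) => part.type === "text" && typeof part.text === "string")
    .map((part) => part.text as string)
    .join("");
}

console.log(contentToText("Ticket categorized as: Billing"));
console.log(
  contentToText([
    { type: "text", text: "Ticket categorized as: " },
    { type: "text", text: "Technical" },
  ])
);
```

Inside the callback, the return statement could then wrap the last message's content in contentToText(...) instead of casting it.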

What Happens Next

Once your subscriber is running, it will:

  1. Listen for Events: Automatically receive ADS events from the configured ADS Publisher
  2. Process Events: Invoke your AI agent with the context-enriched prompt that ADS generates for each event
  3. Generate Responses: Return the agent's final response, which is sent to every configured notification channel for later review
  4. Handle Different Event Types: Skip events that the callback filters out, such as support_ticket_closed in the example
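The event filtering in step 4 can be sketched on its own, which is useful once more than one event type needs to be skipped. This is an illustrative sketch only: EventPayload is a local stand-in that carries just the event_name field used in the example callback, and SKIPPED_EVENTS, shouldInvokeAgent, and the support_ticket_opened event name are hypothetical.

```typescript
// Local stand-in carrying only the field the example callback inspects.
interface EventPayload {
  event_name: string;
}

// Event names that should not trigger the agent.
const SKIPPED_EVENTS = new Set<string>(["support_ticket_closed"]);

// Hypothetical helper: decide whether the agent should run for an event.
function shouldInvokeAgent(payload: EventPayload): boolean {
  return !SKIPPED_EVENTS.has(payload.event_name);
}

// "support_ticket_opened" is a made-up event name for illustration.
const sampleEvents: EventPayload[] = [
  { event_name: "support_ticket_opened" },
  { event_name: "support_ticket_closed" },
];

for (const event of sampleEvents) {
  const action = shouldInvokeAgent(event) ? "invoke agent" : "skip";
  console.log(`${event.event_name}: ${action}`);
}
```

Keeping the skip list in a Set means adding another filtered event is a one-line change, and the callback itself stays a simple early return.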

Test Your Setup

To test your subscriber, you can use the ADS Publisher Quickstart to send sample ADS events.

Your subscriber will automatically process these events and display the results in the console.
