Skip to Content
Agent Data Shuttle 1.0 is out! 🎉
SDKsPythonADS Subscriber

ADS Subscriber

The ADS Subscriber allows you to receive events from ADS Publishers and process them using AI agents. This guide covers the essential methods and configuration options for subscribing to events and invoking AI agents using the Python SDK.

Getting Started

Prerequisites

Before using the ADS Subscriber, ensure you have:

  • Python 3.8+
  • Email/Slack credentials (if using notification channels)
  • AI Agent and LLM instance (for integrating with an AI model and triggering agentic workflows)

Installation

pip install agentdatashuttle-adspy

Initialize Your AI Agent

First, set up your AI agent

import os from langchain_google_genai import ChatGoogleGenerativeAI from langchain_core.messages import HumanMessage from langgraph.prebuilt import create_react_agent from agentdatashuttle_adspy.core.subscriber import ADSSubscriber from agentdatashuttle_adspy.core.dataconnector import ADSDataConnector from agentdatashuttle_adspy.core.client import ADSBridgeClientParams from agentdatashuttle_adspy.models.models import ADSDataPayload # Set up your LLM and agent tools agent_tools = [toolA, toolB, toolC] llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash") # Create the agent agent = create_react_agent(llm, agent_tools, debug=True) # Define your agent callback function def invoke_agent(prompt: str, payload: ADSDataPayload) -> str: print("The payload was:", payload) # Add conditional logic based on event type if payload.event_name == "website_up": return "NO INVOCATION FOR THIS EVENT - WEBSITE UP" # Invoke the agent with the prompt response = agent.invoke({"messages": [HumanMessage(prompt)]}) return response["messages"][-1].content

The invoke_agent function is your main callback that processes incoming events. You can customize this function to handle different event types and invoke your AI agent accordingly.

Configure Data Connectors

Set up data connectors to receive events from ADS Publishers:

# Configure ADS Bridge client parameters ads_bridge_client_params = ADSBridgeClientParams( connection_string="http://localhost:9999", path_prefix="/ads_bridge", ads_subscribers_pool_id="<pool_id>" # Use the Pool ID Generator tool or generate a random UUID for your subscriber pool and replace it here ) # Create data connector uptime_kuma_connector = ADSDataConnector( connector_name="UptimeKumaConnector", bridge_client_params=ads_bridge_client_params )
⚠️

The ads_subscribers_pool_id must be a UUID (or 2 of them concatenated for more randomness) to ensure proper event routing by the ADS Bridge.

Generate a unique subscribers pool ID using our Pool ID Generator.

For more details about subscriber pools and how this parameter works, check this section.

Each data connector corresponds to a specific event source. You can create multiple connectors to listen to different publishers. So, every time a publisher from either one of the data connectors publishes an event, the subscriber will act on it it.

Set Up Notification Channels (Optional)

To find out supported channels, refer to the reports section

from agentdatashuttle_adspy.core.notifications import EmailNotificationChannel, SlackNotificationChannel # Email as a medium email_channel = EmailNotificationChannel( "<agent_description>", "<smtp_host>", "<smtp_port>", "<smtp_username>", "<smtp_password>", "[email protected]", "[email protected]" ) # Slack as a medium slack_channel = SlackNotificationChannel( "<agent_description>", "<slack_bot_token>", "#<your-channel>" )

Refer reports to learn more about what the notification channels are for

Create and Start Subscriber

Create the subscriber instance and start listening for events:

# Create subscriber instance ads_subscriber = ADSSubscriber( agent_callback_function=invoke_agent, llm=llm, agent_description="<a detailed description of your agent>", data_connectors=[uptime_kuma_connector], notification_channels=[email_channel, slack_channel] ) # Start the subscriber ads_subscriber.start()
💡

Add a detailed agent_description about what your AI agent does, to improve the results of the agents upon invocation at the subscriber end (To learn more about this refer here). This also helps provide better reports for the subscribers after the agent invocation is complete.

Configuration

ADS Bridge Client Parameters

The ADSBridgeClientParams class defines the connection settings for your ADS Bridge:

from agentdatashuttle_adspy.core.client import ADSBridgeClientParams ads_bridge_client_params = ADSBridgeClientParams( connection_string="http://your-ads-bridge-host:9999", # ADS Bridge URL path_prefix="/ads_bridge" # API path prefix ads_subscribers_pool_id="<pool_id>" # A random UUID to represent your subscribers at the ADS Bridge )

Data Connectors

Data connectors handle the connection to specific event sources:

from agentdatashuttle_adspy.core.dataconnector import ADSDataConnector data_connector = ADSDataConnector( connector_name="YourConnectorName", # Unique identifier for this connector bridge_client_params=ads_bridge_client_params )

You can configure multiple data connectors to receive events from different sources simultaneously.

ADSSubscriber

Constructor

ADSSubscriber( agent_callback_function: callable, llm: BaseChatModel, agent_description: str, data_connectors: List[ADSDataConnector], notification_channels: List[NotificationChannel] = None )

Parameters:

  • agent_callback_function (callable): Function that processes incoming events and invokes your AI agent
  • llm (BaseChatModel): Your language model instance (LangChain’s BaseChatModel)
  • agent_description (str): Description of what your agent does
  • data_connectors (List[ADSDataConnector]): List of data connectors to receive events from
  • notification_channels (List[NotificationChannel], optional): List of notification channels for alerts

Methods

start()

Starts the subscriber to begin listening for events.

ads_subscriber.start()

Agent Callback Function

The agent callback function is the core of your subscriber. It receives events and processes them with your AI agent:

def invoke_agent(prompt: str, payload: ADSDataPayload) -> str: """ Process incoming events and invoke AI agent Args: prompt (str): Generated prompt based on the event payload (ADSDataPayload): The original event payload Returns: str: Response from the AI agent """ print(f"Processing event: {payload.event_name}") # Add your custom logic here if payload.event_name == "website_up": return "NO INVOCATION FOR THIS EVENT - WEBSITE UP" # Invoke your AI agent response = agent.invoke({"messages": [HumanMessage(prompt)]}) return response["messages"][-1].content

You can implement conditional logic in your agent invocation function to handle different event types differently or skip processing for certain events.

Error Handling

⚠️

Always implement proper error handling in your agent callback function to ensure the subscriber remains stable.

def invoke_agent(prompt: str, payload: ADSDataPayload) -> str: try: print(f"Processing event: {payload.event_name}") # Your agent logic here response = agent.invoke({"messages": [HumanMessage(prompt)]}) return response["messages"][-1].content except Exception as error: print(f"Agent invocation failed: {error}") return f"Error processing event: {str(error)}"

Logging

Configure logging level via the LOG_LEVEL environment variable:

LevelDescription
errorCritical errors that may cause the app to crash
warnWarnings about potentially harmful situations
infoGeneral operational information
debugDebug-level logs for development

Next Steps

Last updated on