ADS Subscriber
The ADS Subscriber allows you to receive events from ADS Publishers and process them using AI agents. This guide covers the essential methods and configuration options for subscribing to events and invoking AI agents using the Python SDK.
Getting Started
Prerequisites
Before using the ADS Subscriber, ensure you have:
- Python 3.8+
- Email/Slack credentials (if using notification channels)
- AI Agent and LLM instance (for integrating with an AI model and triggering agentic workflows)
Installation
pip install agentdatashuttle-adspy
Initialize Your AI Agent
First, set up your AI agent:
import os
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage
from langgraph.prebuilt import create_react_agent
from agentdatashuttle_adspy.core.subscriber import ADSSubscriber
from agentdatashuttle_adspy.core.dataconnector import ADSDataConnector
from agentdatashuttle_adspy.core.client import ADSBridgeClientParams
from agentdatashuttle_adspy.models.models import ADSDataPayload
# Set up your LLM and agent tools (toolA, toolB, and toolC are placeholders for your own tools)
agent_tools = [toolA, toolB, toolC]
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash")
# Create the agent
agent = create_react_agent(llm, agent_tools, debug=True)
# Define your agent callback function
def invoke_agent(prompt: str, payload: ADSDataPayload) -> str:
    print("The payload was:", payload)
    # Add conditional logic based on event type
    if payload.event_name == "website_up":
        return "NO INVOCATION FOR THIS EVENT - WEBSITE UP"
    # Invoke the agent with the prompt
    response = agent.invoke({"messages": [HumanMessage(prompt)]})
    return response["messages"][-1].content
The invoke_agent function is your main callback that processes incoming events. You can customize this function to handle different event types and invoke your AI agent accordingly.
Configure Data Connectors
Set up data connectors to receive events from ADS Publishers:
# Configure ADS Bridge client parameters
ads_bridge_client_params = ADSBridgeClientParams(
    connection_string="http://localhost:9999",
    path_prefix="/ads_bridge",
    # Use the Pool ID Generator tool or generate a random UUID for your subscriber pool and replace it here
    ads_subscribers_pool_id="<pool_id>"
)
# Create data connector
uptime_kuma_connector = ADSDataConnector(
    connector_name="UptimeKumaConnector",
    bridge_client_params=ads_bridge_client_params
)
The ads_subscribers_pool_id must be a UUID (or two UUIDs concatenated for more randomness) to ensure proper event routing by the ADS Bridge.
Generate a unique subscribers pool ID using our Pool ID Generator.
For more details about subscriber pools and how this parameter works, check this section.
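If you prefer to generate the pool ID yourself rather than use the Pool ID Generator, a minimal sketch with Python's standard uuid module might look like the following. The helper name generate_pool_id is hypothetical, and the exact string format the ADS Bridge accepts (hyphenated UUID, no separator between two UUIDs) is an assumption, so check the output against the Pool ID Generator before relying on it.

```python
import uuid


def generate_pool_id(double: bool = False) -> str:
    """Return a random UUID string, or two concatenated when double=True."""
    first = str(uuid.uuid4())
    if not double:
        return first
    # Assumption: the two UUIDs are joined directly, with no separator
    return first + str(uuid.uuid4())


pool_id = generate_pool_id()
print(pool_id)
```

Generate the value once and reuse it wherever ads_subscribers_pool_id is set, instead of generating a new one on every start.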
Each data connector corresponds to a specific event source. You can create multiple connectors to listen to different publishers. Whenever a publisher behind any of the configured data connectors publishes an event, the subscriber acts on it.
Set Up Notification Channels (Optional)
To find out which channels are supported, refer to the reports section.
from agentdatashuttle_adspy.core.notifications import EmailNotificationChannel, SlackNotificationChannel
# Email as a medium
email_channel = EmailNotificationChannel(
    "<agent_description>",
    "<smtp_host>",
    "<smtp_port>",
    "<smtp_username>",
    "<smtp_password>",
    "<email_address>",
    "<email_address>"
)
# Slack as a medium
slack_channel = SlackNotificationChannel(
    "<agent_description>",
    "<slack_bot_token>",
    "#<your-channel>"
)
Refer to the reports section to learn more about what the notification channels are for.
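Channel credentials such as the SMTP password are usually better kept out of source code. The sketch below shows one possible way to read them from environment variables. The variable names (ADS_SMTP_HOST and so on), the SmtpSettings type, and the load_smtp_settings helper are all hypothetical and not part of the SDK. The port stays a string because the example above passes it as a string placeholder; whether the SDK also accepts an integer is not confirmed here.

```python
import os
from typing import Mapping, NamedTuple


class SmtpSettings(NamedTuple):
    host: str
    port: str
    username: str
    password: str


def load_smtp_settings(env: Mapping[str, str] = os.environ) -> SmtpSettings:
    """Read SMTP settings from env; the variable names are hypothetical."""
    names = (
        "ADS_SMTP_HOST",
        "ADS_SMTP_PORT",
        "ADS_SMTP_USERNAME",
        "ADS_SMTP_PASSWORD",
    )
    # Fail early with a clear message if anything is unset or empty
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return SmtpSettings(*(env[name] for name in names))


# Example values only; in practice these come from the real environment
example_env = {
    "ADS_SMTP_HOST": "smtp.example.com",
    "ADS_SMTP_PORT": "587",
    "ADS_SMTP_USERNAME": "agent",
    "ADS_SMTP_PASSWORD": "secret",
}
settings = load_smtp_settings(example_env)
print(settings.host, settings.port)
```

The resulting values can then be passed to EmailNotificationChannel in the positions shown in the example above.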
Create and Start Subscriber
Create the subscriber instance and start listening for events:
# Create subscriber instance
ads_subscriber = ADSSubscriber(
    agent_callback_function=invoke_agent,
    llm=llm,
    agent_description="<a detailed description of your agent>",
    data_connectors=[uptime_kuma_connector],
    notification_channels=[email_channel, slack_channel]
)
# Start the subscriber
ads_subscriber.start()
Configuration
ADS Bridge Client Parameters
The ADSBridgeClientParams class defines the connection settings for your ADS Bridge:
from agentdatashuttle_adspy.core.client import ADSBridgeClientParams
ads_bridge_client_params = ADSBridgeClientParams(
    connection_string="http://your-ads-bridge-host:9999",  # ADS Bridge URL
    path_prefix="/ads_bridge",  # API path prefix
    ads_subscribers_pool_id="<pool_id>"  # A random UUID to represent your subscribers at the ADS Bridge
)
Data Connectors
Data connectors handle the connection to specific event sources:
from agentdatashuttle_adspy.core.dataconnector import ADSDataConnector
data_connector = ADSDataConnector(
    connector_name="YourConnectorName",  # Unique identifier for this connector
    bridge_client_params=ads_bridge_client_params
)
You can configure multiple data connectors to receive events from different sources simultaneously.
ADSSubscriber
Constructor
ADSSubscriber(
    agent_callback_function: callable,
    llm: BaseChatModel,
    agent_description: str,
    data_connectors: List[ADSDataConnector],
    notification_channels: List[NotificationChannel] = None
)
Parameters:
- agent_callback_function (callable): Function that processes incoming events and invokes your AI agent
- llm (BaseChatModel): Your language model instance (LangChain's BaseChatModel)
- agent_description (str): Description of what your agent does
- data_connectors (List[ADSDataConnector]): List of data connectors to receive events from
- notification_channels (List[NotificationChannel], optional): List of notification channels for alerts
Methods
start()
Starts the subscriber to begin listening for events.
ads_subscriber.start()
Agent Callback Function
The agent callback function is the core of your subscriber. It receives events and processes them with your AI agent:
def invoke_agent(prompt: str, payload: ADSDataPayload) -> str:
    """
    Process incoming events and invoke AI agent
    Args:
        prompt (str): Generated prompt based on the event
        payload (ADSDataPayload): The original event payload
    Returns:
        str: Response from the AI agent
    """
    print(f"Processing event: {payload.event_name}")
    # Add your custom logic here
    if payload.event_name == "website_up":
        return "NO INVOCATION FOR THIS EVENT - WEBSITE UP"
    # Invoke your AI agent
    response = agent.invoke({"messages": [HumanMessage(prompt)]})
    return response["messages"][-1].content
You can implement conditional logic in your agent invocation function to handle different event types differently or skip processing for certain events.
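As the number of event types grows, a chain of if statements can become hard to follow. One possible alternative is a dispatch table keyed by event name, sketched below. This is an illustration only: HANDLERS, skip_event, and run_agent are hypothetical names, "website_down" is a made-up event name, SimpleNamespace stands in for ADSDataPayload, and run_agent returns a marker string where your real code would call agent.invoke.

```python
from types import SimpleNamespace
from typing import Callable, Dict

Handler = Callable[[str, object], str]


def skip_event(prompt: str, payload) -> str:
    """Return a fixed message without invoking the agent."""
    return f"NO INVOCATION FOR THIS EVENT - {payload.event_name}"


def run_agent(prompt: str, payload) -> str:
    """Placeholder for agent.invoke(...); returns a marker string here."""
    return f"AGENT INVOKED: {prompt}"


# Event names mapped to handlers; anything not listed falls back to run_agent
HANDLERS: Dict[str, Handler] = {
    "website_up": skip_event,
}


def invoke_agent(prompt: str, payload) -> str:
    handler = HANDLERS.get(payload.event_name, run_agent)
    return handler(prompt, payload)


up = invoke_agent("Site recovered", SimpleNamespace(event_name="website_up"))
down = invoke_agent("Site unreachable", SimpleNamespace(event_name="website_down"))
print(up)
print(down)
```

Adding a new event type then means adding one entry to the table rather than editing the callback body.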
Error Handling
Always implement proper error handling in your agent callback function to ensure the subscriber remains stable.
def invoke_agent(prompt: str, payload: ADSDataPayload) -> str:
    try:
        print(f"Processing event: {payload.event_name}")
        # Your agent logic here
        response = agent.invoke({"messages": [HumanMessage(prompt)]})
        return response["messages"][-1].content
    except Exception as error:
        print(f"Agent invocation failed: {error}")
        return f"Error processing event: {error}"
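If you maintain several callbacks, the same try/except pattern can be factored into a decorator. The sketch below is one way to do that with the standard library; safe_callback is a hypothetical helper, not part of the SDK, and the decorated function fails on purpose so the wrapper's behavior is visible. SimpleNamespace stands in for ADSDataPayload and "website_down" is a made-up event name.

```python
import functools
import logging
from types import SimpleNamespace

logger = logging.getLogger("ads_subscriber_example")


def safe_callback(func):
    """Wrap an agent callback so exceptions become an error string."""
    @functools.wraps(func)
    def wrapper(prompt, payload):
        try:
            return func(prompt, payload)
        except Exception as error:
            # logger.exception records the traceback along with the message
            logger.exception("Agent invocation failed for %s", payload.event_name)
            return f"Error processing event: {error}"
    return wrapper


@safe_callback
def invoke_agent(prompt, payload):
    # Stand-in for agent.invoke(...); raises deliberately for the demo
    raise ValueError("model unavailable")


result = invoke_agent("Site unreachable", SimpleNamespace(event_name="website_down"))
print(result)
```

Using logger.exception instead of print keeps the traceback, which tends to make failures easier to diagnose later.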
Logging
Configure the logging level via the LOG_LEVEL environment variable:
Level | Description |
---|---|
error | Critical errors that may cause the app to crash |
warn | Warnings about potentially harmful situations |
info | General operational information |
debug | Debug-level logs for development |
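If your own application code also logs, you may want its verbosity to follow the same variable. The sketch below maps the level names from the table to Python's logging levels. It is an assumption-laden illustration: resolve_log_level and the "my_app" logger name are hypothetical, the fallback to info when the variable is unset or unrecognized is a choice made here, and none of this describes how the SDK itself reads LOG_LEVEL.

```python
import logging
import os

# Map the level names from the table to Python logging levels
LEVELS = {
    "error": logging.ERROR,
    "warn": logging.WARNING,
    "info": logging.INFO,
    "debug": logging.DEBUG,
}


def resolve_log_level(value, default=logging.INFO):
    """Translate a LOG_LEVEL string into a logging level constant."""
    if not value:
        return default
    # Unknown names fall back to the default instead of raising
    return LEVELS.get(value.strip().lower(), default)


level = resolve_log_level(os.environ.get("LOG_LEVEL"))
logging.basicConfig(level=level)
logging.getLogger("my_app").info("Application logger configured")
```

Set the variable in the environment before starting the process, for example LOG_LEVEL=debug during development.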
Next Steps
- Explore the SDK usage for the ADS Publisher to send events
- Explore Node.js SDK for Node.js applications
- Learn about Performance Improvement techniques