Skip to content

Event-Driven Automation in the Network

Event-Driven Automation in the Network: Webhooks, Message Queues, and Real-Time Response


This post is part of our ongoing series on network automation best practices, grounded in the PRIME Framework and PRIME Philosophy.

Transparency Note

Examples, scenarios, and any outcome figures in this article are provided for education and are based on enterprise delivery experience or anonymised composite scenarios unless explicitly identified as direct Nautomation Prime client outcomes.

Why This Blog Exists

Polling is slow and inefficient. Event-driven automation enables real-time response to network changes. This post covers webhooks, message queues, and how to build event-driven workflows with the PRIME Framework.


What is Event-Driven Automation?

  • Automation triggered by events (not polling)
  • Uses webhooks, message queues (RabbitMQ, Kafka), SNMP traps, or streaming telemetry
  • Enables real-time, scalable, and decoupled workflows
  • Supports closed-loop automation and self-healing networks

Why Go Event-Driven? (Benefits & Use Cases)

  • Faster response: Immediate action on incidents, config changes, or security events
  • Resource efficiency: No more wasteful polling or constant API calls
  • Scalability: Decouple producers (devices, systems) from consumers (automation, monitoring)
  • Reliability: Buffer and retry events with queues; avoid missed changes

Common Use Cases:

  • Automated ticket creation on device failure
  • Real-time compliance checks on config changes
  • Closed-loop remediation (e.g., auto-remediate BGP flap)
  • Security alerting and quarantine

Event Sources in Network Automation

  • Webhooks: Direct HTTP callbacks from ITSM, monitoring, or network tools
  • Message Queues: RabbitMQ, Kafka, AWS SQS for scalable, decoupled event delivery
  • SNMP Traps & Syslog: Legacy but still useful for device events
  • Streaming Telemetry: Model-driven, high-frequency data for analytics

Example 1: Consuming a Webhook in Python (Flask)

from flask import Flask, request
import logging

app = Flask(__name__)
logging.basicConfig(level=logging.INFO)

@app.route('/webhook', methods=['POST'])
def webhook():
  data = request.json
  logging.info(f"Received event: {data}")
  # Trigger automation based on event type
  if data.get('event_type') == 'interface_down':
    remediate_interface(data['device'], data['interface'])
  return '', 200

def remediate_interface(device, interface):
  # Example: Push config or open ticket
  logging.info(f"Remediating {device} {interface}")

Example 2: Consuming Events from Kafka (Advanced)

import asyncio
import json
import aio_pika
from typing import Callable, List

class NetworkEventConsumer:
    """Consume network events asynchronously and trigger automation."""

    def __init__(self, amqp_url: str, handlers: dict):
        """
        Args:
            amqp_url: RabbitMQ connection URL
            handlers: Dict of event_type -> handler_function mappings
        """
        self.amqp_url = amqp_url
        self.handlers = handlers
        self.connection = None
        self.channel = None

    async def connect(self):
        """Establish connection to RabbitMQ."""
        self.connection = await aio_pika.connect_robust(self.amqp_url)
        self.channel = await self.connection.channel()
        print("Connected to RabbitMQ")

    async def on_message(self, message: aio_pika.IncomingMessage):
        """Handle incoming message."""
        async with message.process():
            try:
                body = json.loads(message.body.decode())
                event_type = body.get('event_type')

                # Route to appropriate handler
                handler = self.handlers.get(event_type)
                if handler:
                    await handler(body)
                else:
                    print(f"No handler for event type: {event_type}")
            except json.JSONDecodeError as e:
                print(f"Failed to decode message: {e}")
            except Exception as e:
                print(f"Error processing message: {e}")

    async def consume(self, queue_name: str):
        """Start consuming from queue."""
        queue = await self.channel.declare_queue(queue_name, durable=True)
        print(f"Consuming from queue: {queue_name}")

        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                await self.on_message(message)

# Example handlers
async def handle_interface_down(event):
    """Handle interface down event."""
    device = event['device']
    interface = event['interface']
    print(f"Interface down: {device} {interface}")

    # Check if device is reachable
    reachable = await check_device_reachability(device)
    if not reachable:
        await create_incident_ticket(device, f"Device unreachable")
    else:
        await create_work_ticket(device, interface, "Interface down")

async def handle_device_reboot(event):
    """Handle device reboot event."""
    device = event['device']
    reason = event.get('reason', 'Unknown')
    print(f"Device rebooted: {device} ({reason})")

    # Trigger post-reboot validation
    await validate_device_state(device)

async def handle_config_change(event):
    """Handle config change event."""
    device = event['device']
    change_id = event['change_id']
    print(f"Config change on {device}: {change_id}")

    # Validate change doesn't violate compliance
    await validate_compliance(device, change_id)

# Main
async def main():
    consumer = NetworkEventConsumer(
        amqp_url='amqp://guest:guest@localhost/',
        handlers={
            'interface_down': handle_interface_down,
            'device_reboot': handle_device_reboot,
            'config_change': handle_config_change,
        }
    )

    await consumer.connect()
    await consumer.consume('network_events')

asyncio.run(main())

Event-Driven Architecture Patterns

Pattern 1: Fan-Out (One Event โ†’ Multiple Actions)

# One interface_down event triggers multiple automation tasks
interface_down_event = {
    'device': 'switch-01',
    'interface': 'Eth1/1',
    'timestamp': 1640000000,
}

# All these handlers run in parallel
tasks = [
    handle_interface_down(interface_down_event),
    create_incident_ticket(interface_down_event),
    run_diagnostics(interface_down_event),
    trigger_failover_if_critical(interface_down_event),
]

await asyncio.gather(*tasks)

Pattern 2: Event Correlation (Multiple Events โ†’ One Action)

class EventCorrelator:
    """Correlate related events and trigger remediation only once."""

    def __init__(self):
        self.event_buffer = []
        self.correlation_window = 5  # seconds

    async def correlate_events(self, event):
        """Buffer events and check for correlation."""
        self.event_buffer.append(event)

        # Wait for more events in this time window
        await asyncio.sleep(self.correlation_window)

        # Check if we have a pattern (e.g., interface flap = multiple up/down)
        if self.is_flap(self.event_buffer):
            await handle_interface_flap(self.event_buffer)
        else:
            # Handle as individual events
            for evt in self.event_buffer:
                await handle_single_event(evt)

        self.event_buffer = []

    def is_flap(self, events):
        """Detect if events represent an interface flap."""
        if len(events) < 3:
            return False

        # Check if same interface alternates between up/down/up...
        interface = events[0]['interface']
        states = [e.get('state') for e in events]

        alternating = all(
            states[i] != states[i+1]
            for i in range(len(states) - 1)
        )
        return alternating

Pattern 3: Backpressure Handling (Queue Overload)

import asyncio
from asyncio import BoundedSemaphore

class BackpressureQueue:
    """Queue with backpressure to prevent overload."""

    def __init__(self, max_concurrent=5):
        self.semaphore = BoundedSemaphore(max_concurrent)
        self.queue = asyncio.Queue()

    async def put(self, item):
        """Add item, blocking if at max capacity."""
        await self.queue.put(item)

    async def process(self, handler):
        """Process queue items with concurrency limit."""
        while True:
            item = await self.queue.get()

            # Acquire semaphore slot
            async with self.semaphore:
                try:
                    await handler(item)
                except Exception as e:
                    print(f"Handler failed: {e}")
                finally:
                    self.queue.task_done()

Event Deduplication & Dead-Letter Queues

Deduplication Pattern

from datetime import datetime, timedelta

class EventDeduplicator:
    """Prevent duplicate events from triggering multiple automations."""

    def __init__(self, dedup_window: int = 60):
        """
        Args:
            dedup_window: Time window in seconds to consider events as duplicates
        """
        self.dedup_window = dedup_window
        self.seen_events = {}  # event_hash -> timestamp

    def get_event_hash(self, event):
        """Create a hash representing event identity."""
        return f"{event['device']}:{event['event_type']}:{event.get('interface', '')}"

    async def should_process(self, event):
        """Check if this event has been seen recently."""
        event_hash = self.get_event_hash(event)
        now = datetime.now()

        # Clean up old entries
        self.seen_events = {
            k: v for k, v in self.seen_events.items()
            if (now - v).total_seconds() < self.dedup_window
        }

        # Check if we've seen this event recently
        if event_hash in self.seen_events:
            age = (now - self.seen_events[event_hash]).total_seconds()
            if age < self.dedup_window:
                print(f"Duplicate event (age={age:.1f}s): {event_hash}")
                return False

        # First time seeing this event (or old one expired)
        self.seen_events[event_hash] = now
        return True

Dead-Letter Queue Pattern

class EventProcessor:
    """Process events with dead-letter queue for failed events."""

    def __init__(self, main_queue, dlq):
        self.main_queue = main_queue
        self.dlq = dlq  # Dead-Letter Queue
        self.max_retries = 3

    async def process_with_retry(self, event):
        """Process with retry logic."""
        retries = 0
        while retries < self.max_retries:
            try:
                result = await self.process_event(event)
                return result
            except RecoverableException as e:
                retries += 1
                print(f"Retrying (attempt {retries}): {e}")
                await asyncio.sleep(2 ** retries)  # Exponential backoff
            except UnrecoverableException as e:
                print(f"Unrecoverable error, sending to DLQ: {e}")
                await self.dlq.put({
                    'original_event': event,
                    'error': str(e),
                    'timestamp': datetime.now().isoformat(),
                })
                raise

    async def process_event(self, event):
        """Your actual event processing logic."""
        # Implement your handling
        pass

Monitoring Event-Driven Systems

from prometheus_client import Counter, Histogram, Gauge
import time

# Metrics for event-driven system
events_total = Counter('events_total', 'Total events received', ['event_type', 'status'])
event_processing_duration = Histogram('event_processing_seconds', 'Event processing time', ['event_type'])
queue_depth = Gauge('queue_depth', 'Current queue depth')

async def process_and_measure(event, handler):
    """Process event with metrics collection."""
    event_type = event.get('event_type')
    start = time.time()

    try:
        await handler(event)
        duration = time.time() - start
        events_total.labels(event_type=event_type, status='success').inc()
        event_processing_duration.labels(event_type=event_type).observe(duration)
        print(f"Event processed in {duration:.2f}s")
    except Exception as e:
        events_total.labels(event_type=event_type, status='failure').inc()
        raise

Additional PRIME Practices: Safety, Measurability, and Transparency

  • Log every event โ€” Maintain an immutable event log for audit and replay
  • Monitor queue health โ€” Alert if queue depth grows too large (indicates processing backlog)
  • Use unique event IDs โ€” Correlate events across systems with trace IDs
  • Document event schemas โ€” Define what fields each event contains and why
  • Test failure scenarios โ€” What happens if a handler crashes? Use dead-letter queues
  • Measure automation success โ€” Track success rates, response times, and business impact
  • Integrate with ITSM โ€” Link event automation to change tickets and incident records

PRIME in Action: Safety, Measurability, and Transparency

  • Validate and log every event
  • Monitor for missed, duplicate, or failed events
  • Document event sources, workflows, and outcomes
  • Build dashboards for event rates, automation actions, and success/failure metrics

Summary: Blog Takeaways

  • Event-driven automation enables real-time, scalable, and reliable network operations
  • Use webhooks, message queues, and telemetry for modern workflows
  • Apply advanced patterns for correlation, error handling, and observability
  • PRIME principles ensure safe, measurable, and transparent adoption


๐Ÿ“ฃ Want More?