# Event-Driven Automation in the Network: Webhooks, Message Queues, and Real-Time Response
This post is part of our ongoing series on network automation best practices, grounded in the PRIME Framework and PRIME Philosophy.
**Transparency Note:** Examples, scenarios, and any outcome figures in this article are provided for education and are based on enterprise delivery experience or anonymised composite scenarios unless explicitly identified as direct Nautomation Prime client outcomes.
Polling is slow and inefficient. Event-driven automation enables real-time response to network changes. This post covers webhooks, message queues, and how to build event-driven workflows with the PRIME Framework.
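To put a rough number on the difference (illustrative figures, not measurements): with interval polling, a change lands at a random point in the polling cycle, so the mean detection delay is half the interval. A five-minute cycle is assumed here purely for illustration.

```python
POLL_INTERVAL = 300  # seconds; a typical 5-minute SNMP/API polling cycle (assumed)

def avg_polling_latency(poll_interval: float) -> float:
    # A change occurs uniformly at random within a polling cycle,
    # so the mean wait until the next poll detects it is half the interval.
    return poll_interval / 2

print(avg_polling_latency(POLL_INTERVAL))  # 150.0 seconds, vs. near-instant delivery for a pushed event
```

A webhook or message-queue push removes that built-in delay entirely: the latency becomes network transit plus handler time, typically milliseconds.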
## Example 1: Receiving Webhooks with Flask (Basic)

```python
from flask import Flask, request
import logging

app = Flask(__name__)
logging.basicConfig(level=logging.INFO)

@app.route('/webhook', methods=['POST'])
def webhook():
    data = request.json
    logging.info(f"Received event: {data}")
    # Trigger automation based on event type
    if data.get('event_type') == 'interface_down':
        remediate_interface(data['device'], data['interface'])
    return '', 200

def remediate_interface(device, interface):
    # Example: push config or open a ticket
    logging.info(f"Remediating {device} {interface}")
```
## Example 2: Consuming Events from Kafka (Advanced)
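A minimal consumer sketch using the `kafka-python` package. The topic name, consumer group, and event types are assumptions for illustration; the deserialization and routing helpers are kept as pure functions so they can be tested without a broker.

```python
import json

def parse_event(raw: bytes) -> dict:
    """Deserialize a JSON-encoded event from a Kafka message value."""
    return json.loads(raw.decode('utf-8'))

def route(event: dict) -> str:
    """Map an event type to an automation action (event types are illustrative)."""
    actions = {
        'interface_down': 'remediate_interface',
        'bgp_neighbor_down': 'escalate_to_noc',
    }
    return actions.get(event.get('event_type'), 'log_only')

def consume(topic='network-events', bootstrap_servers='localhost:9092'):
    """Blocking consumer loop. Requires kafka-python and a reachable broker."""
    from kafka import KafkaConsumer  # imported here so the pure helpers above stay importable without Kafka
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        group_id='network-automation',
        auto_offset_reset='latest',
        value_deserializer=parse_event,
    )
    for message in consumer:
        event = message.value
        print(f"{event.get('device')}: {route(event)}")
```

Separating parsing and routing from the consumer loop also makes it easy to swap Kafka for another transport later without touching the automation logic.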
## Pattern 1: Event Fan-Out (One Event → Multiple Actions)

```python
import asyncio

# One interface_down event triggers multiple automation tasks
interface_down_event = {
    'device': 'switch-01',
    'interface': 'Eth1/1',
    'timestamp': 1640000000,
}

async def fan_out(event):
    # All of these handlers run concurrently
    tasks = [
        handle_interface_down(event),
        create_incident_ticket(event),
        run_diagnostics(event),
        trigger_failover_if_critical(event),
    ]
    await asyncio.gather(*tasks)
```
## Pattern 2: Event Correlation (Multiple Events → One Action)
```python
import asyncio

class EventCorrelator:
    """Correlate related events and trigger remediation only once."""

    def __init__(self):
        self.event_buffer = []
        self.correlation_window = 5  # seconds

    async def correlate_events(self, event):
        """Buffer events and check for correlation."""
        self.event_buffer.append(event)
        # Wait for more events in this time window
        await asyncio.sleep(self.correlation_window)
        # Check if we have a pattern (e.g., interface flap = multiple up/down)
        if self.is_flap(self.event_buffer):
            await handle_interface_flap(self.event_buffer)
        else:
            # Handle as individual events
            for evt in self.event_buffer:
                await handle_single_event(evt)
        self.event_buffer = []

    def is_flap(self, events):
        """Detect if events represent an interface flap."""
        if len(events) < 3:
            return False
        # Check if the same interface alternates between up/down/up...
        states = [e.get('state') for e in events]
        return all(states[i] != states[i + 1] for i in range(len(states) - 1))
```
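The alternation heuristic can be exercised in isolation. Here it is re-stated as a standalone function (same logic as the correlator's `is_flap`, duplicated only so this snippet runs on its own) with two hand-built event sequences:

```python
def is_flap(events, min_events=3):
    # At least three events whose states strictly alternate
    # (down/up/down...) indicate an interface flap.
    if len(events) < min_events:
        return False
    states = [e.get('state') for e in events]
    return all(states[i] != states[i + 1] for i in range(len(states) - 1))

flap = [{'interface': 'Eth1/1', 'state': s} for s in ('down', 'up', 'down', 'up')]
steady = [{'interface': 'Eth1/1', 'state': s} for s in ('down', 'down', 'down')]
print(is_flap(flap), is_flap(steady))  # True False
```

Note the heuristic is deliberately simple: two events are never treated as a flap, and any repeated state breaks the pattern.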
## Backpressure: Preventing Overload

```python
import asyncio
from asyncio import BoundedSemaphore

class BackpressureQueue:
    """Queue with backpressure to prevent overload."""

    def __init__(self, max_concurrent=5):
        self.semaphore = BoundedSemaphore(max_concurrent)
        # Bounded queue so put() actually blocks when consumers fall behind
        self.queue = asyncio.Queue(maxsize=max_concurrent * 2)

    async def put(self, item):
        """Add item, blocking if at max capacity."""
        await self.queue.put(item)

    async def process(self, handler):
        """Process queue items with a concurrency limit.

        Run several process() workers concurrently; the shared semaphore
        caps the total number of in-flight handlers.
        """
        while True:
            item = await self.queue.get()
            # Acquire a semaphore slot
            async with self.semaphore:
                try:
                    await handler(item)
                except Exception as e:
                    print(f"Handler failed: {e}")
                finally:
                    self.queue.task_done()
```
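The same concurrency cap can be applied without a dedicated class by wrapping each handler call in a shared semaphore. A runnable sketch (handler and sleep times are placeholders):

```python
import asyncio

async def limited(semaphore, coro):
    # Hold a semaphore slot for the duration of one handler call
    async with semaphore:
        return await coro

async def main():
    sem = asyncio.Semaphore(5)  # at most 5 handlers in flight at once

    async def handler(i):
        await asyncio.sleep(0.01)  # stand-in for real remediation work
        return i

    # 20 events queued, but never more than 5 running concurrently
    results = await asyncio.gather(*(limited(sem, handler(i)) for i in range(20)))
    return results

print(asyncio.run(main()))  # [0, 1, 2, ..., 19] (gather preserves order)
```

This is handy for one-off bursts; the queue-based version above is better when events arrive continuously and producers must be slowed down too.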
## Event Deduplication

```python
from datetime import datetime

class EventDeduplicator:
    """Prevent duplicate events from triggering multiple automations."""

    def __init__(self, dedup_window: int = 60):
        """
        Args:
            dedup_window: Time window in seconds to consider events as duplicates
        """
        self.dedup_window = dedup_window
        self.seen_events = {}  # event_hash -> timestamp

    def get_event_hash(self, event):
        """Create a hash representing event identity."""
        return f"{event['device']}:{event['event_type']}:{event.get('interface', '')}"

    async def should_process(self, event):
        """Check if this event has been seen recently."""
        event_hash = self.get_event_hash(event)
        now = datetime.now()
        # Clean up entries older than the dedup window
        self.seen_events = {
            k: v for k, v in self.seen_events.items()
            if (now - v).total_seconds() < self.dedup_window
        }
        # Anything still present was seen within the window
        if event_hash in self.seen_events:
            age = (now - self.seen_events[event_hash]).total_seconds()
            print(f"Duplicate event (age={age:.1f}s): {event_hash}")
            return False
        # First time seeing this event (or the old one expired)
        self.seen_events[event_hash] = now
        return True
```
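The windowing behaviour is easiest to sanity-check with explicit timestamps. A stripped-down, synchronous version of the same logic (hypothetical, using a plain dict and injectable clock so the outcomes are deterministic):

```python
import time

seen = {}
WINDOW = 60  # seconds

def should_process(event, now=None):
    now = now if now is not None else time.time()
    key = f"{event['device']}:{event['event_type']}:{event.get('interface', '')}"
    last = seen.get(key)
    if last is not None and now - last < WINDOW:
        return False  # duplicate within the window; timestamp is NOT refreshed
    seen[key] = now
    return True

e = {'device': 'switch-01', 'event_type': 'interface_down', 'interface': 'Eth1/1'}
print(should_process(e, now=1000.0))  # True  (first sighting)
print(should_process(e, now=1030.0))  # False (30 s later, inside the 60 s window)
print(should_process(e, now=1070.0))  # True  (70 s after the first; window expired)
```

One design choice worth noting: duplicates do not refresh the timestamp, so a steady stream of repeats still gets processed once per window rather than being suppressed forever.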
## Dead-Letter Queues for Failed Events

```python
import asyncio
from datetime import datetime

class RecoverableException(Exception):
    """Transient failure worth retrying (e.g. a timeout)."""

class UnrecoverableException(Exception):
    """Permanent failure; retrying will not help."""

class EventProcessor:
    """Process events with a dead-letter queue for failed events."""

    def __init__(self, main_queue, dlq):
        self.main_queue = main_queue
        self.dlq = dlq  # Dead-letter queue
        self.max_retries = 3

    async def process_with_retry(self, event):
        """Process with retry logic."""
        retries = 0
        while retries < self.max_retries:
            try:
                return await self.process_event(event)
            except RecoverableException as e:
                retries += 1
                print(f"Retrying (attempt {retries}): {e}")
                await asyncio.sleep(2 ** retries)  # Exponential backoff
            except UnrecoverableException as e:
                print(f"Unrecoverable error, sending to DLQ: {e}")
                await self.dlq.put({
                    'original_event': event,
                    'error': str(e),
                    'timestamp': datetime.now().isoformat(),
                })
                raise
        # Retries exhausted: park the event in the DLQ rather than dropping it
        await self.dlq.put({
            'original_event': event,
            'error': 'max retries exceeded',
            'timestamp': datetime.now().isoformat(),
        })

    async def process_event(self, event):
        """Your actual event processing logic."""
        # Implement your handling here
        pass
```
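The retry-with-backoff core can be isolated and exercised against a deliberately flaky handler. A minimal sketch (the backoff base is shrunk so the demo runs quickly; in production it would be seconds, as above):

```python
import asyncio

class RecoverableError(Exception):
    pass

async def retry(func, max_retries=3, base=0.01):
    # Same shape as the processor above: exponential backoff between
    # attempts, re-raising once retries are exhausted.
    for attempt in range(1, max_retries + 1):
        try:
            return await func()
        except RecoverableError:
            if attempt == max_retries:
                raise
            await asyncio.sleep(base * 2 ** attempt)

calls = {'n': 0}

async def flaky():
    # Fails twice with a transient error, then succeeds
    calls['n'] += 1
    if calls['n'] < 3:
        raise RecoverableError("transient")
    return "ok"

print(asyncio.run(retry(flaky)))  # ok
```

Exponential backoff (2, 4, 8 seconds with the production base) gives a struggling downstream system room to recover instead of hammering it with immediate retries.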