Streaming Telemetry in Network Automation: Real-Time Data for Modern Operations
This post is part of our ongoing series on network automation best practices, grounded in the PRIME Framework and PRIME Philosophy.
Transparency Note
Examples, scenarios, and any outcome figures in this article are provided for education and are based on enterprise delivery experience or anonymised composite scenarios unless explicitly identified as direct Nautomation Prime client outcomes.
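SNMP polling and CLI scraping are no longer enough on their own. Both depend on a collector asking each device for data on a fixed schedule, and CLI output is text formatted for humans that has to be parsed before it can be used. Streaming telemetry reverses the flow: devices push structured, model-driven data (for example over gNMI) at a configured sample interval or whenever a value changes. This post introduces the concept and its benefits, then walks through practical steps to get started.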
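To make the push model concrete, here is a small asyncio simulation. It is not a gNMI client: `simulated_device`, `collector`, and the 10 ms interval are hypothetical stand-ins. Each "device" pushes counter samples into a shared `asyncio.Queue` on its own schedule, and the collector simply reacts to whatever arrives instead of polling every device on a timer.

```python
import asyncio


async def simulated_device(name, queue, interval, samples):
    """Hypothetical device that pushes an error-counter sample every `interval` seconds."""
    errors = 0
    for _ in range(samples):
        errors += 3  # Cumulative counter: it only ever grows
        await queue.put({"device": name, "metric": "in_errors", "value": errors})
        await asyncio.sleep(interval)
    await queue.put(None)  # End-of-stream marker for this device


async def collector(queue, producers):
    """React to each sample as it arrives instead of polling devices on a timer."""
    received = []
    finished = 0
    while finished < producers:
        sample = await queue.get()
        if sample is None:
            finished += 1
        else:
            received.append(sample)
    return received


async def main():
    queue = asyncio.Queue()
    names = ["leaf1", "leaf2"]
    devices = [asyncio.create_task(simulated_device(n, queue, 0.01, 3)) for n in names]
    received = await collector(queue, producers=len(names))
    await asyncio.gather(*devices)
    return received


if __name__ == "__main__":
    for sample in asyncio.run(main()):
        print(sample)
```

The collector never decides when data is fetched; the sources do. That is the essential shift, and the real gNMI examples that follow keep the same shape with a network subscription in place of the simulated devices.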
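```python
import asyncio
import logging
from dataclasses import dataclass

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class TelemetryEvent:
    timestamp: float
    device: str
    interface: str
    metric: str
    value: int  # Assumed to be a per-interval delta, not a raw cumulative counter


# Per-interval thresholds that trigger an alert
THRESHOLDS = {
    'in_errors': 100,
    'out_errors': 100,
    'input_queue_drops': 50,
    'queue_depth': 200,
}


async def process_telemetry_stream(telemetry_queue: asyncio.Queue):
    """Process incoming telemetry and trigger automation."""
    while True:
        event = await telemetry_queue.get()
        try:
            # Parse telemetry
            try:
                metric_name, value = event.metric, event.value
                device, interface = event.device, event.interface
            except AttributeError:
                logger.error(f"Invalid telemetry event: {event!r}")
                continue

            # Check against thresholds
            threshold = THRESHOLDS.get(metric_name)
            if threshold is not None and value > threshold:
                logger.warning(
                    f"ALERT: {device} {interface} {metric_name}={value} (threshold={threshold})"
                )
                # Runs inline; for busy streams consider asyncio.create_task() instead
                await remediate_interface(device, interface, metric_name, value)
        finally:
            # Mark the item done on every path so queue.join() never hangs
            telemetry_queue.task_done()


# Hypothetical device-specific helpers: replace with your own driver code.
# The stubs raise, so nothing is changed on a device by accident.
async def bounce_interface(device, interface):
    raise NotImplementedError


async def increase_queue_buffer(device, interface):
    raise NotImplementedError


async def apply_qos(device, interface):
    raise NotImplementedError


async def get_current_metric(device, interface, metric_name):
    raise NotImplementedError


async def alert_human_ops(device, interface, metric_name, value):
    logger.critical(f"Escalating to on-call: {device} {interface} {metric_name}={value}")


REMEDIATION_ACTIONS = {
    'in_errors': bounce_interface,
    'input_queue_drops': increase_queue_buffer,
    'queue_depth': apply_qos,
}


async def remediate_interface(device, interface, metric_name, value):
    """Attempt automated remediation; escalate if it fails or cannot be verified."""
    action = REMEDIATION_ACTIONS.get(metric_name)
    if action is None:
        # No automated fix for this metric (e.g. out_errors): escalate straight away
        await alert_human_ops(device, interface, metric_name, value)
        return
    try:
        await action(device, interface)
        logger.info(f"Remediation applied: {device} {interface}")
        if not await verify_remediation(device, interface, metric_name):
            await alert_human_ops(device, interface, metric_name, value)
    except Exception as e:
        logger.error(f"Remediation failed: {e!r}")
        await alert_human_ops(device, interface, metric_name, value)


async def verify_remediation(device, interface, metric_name, max_retries=3, delay=5):
    """Verify that remediation brought the metric back under its threshold."""
    threshold = THRESHOLDS[metric_name]
    for _ in range(max_retries):
        await asyncio.sleep(delay)  # Give the fix time to take effect
        current_value = await get_current_metric(device, interface, metric_name)
        if current_value < threshold:  # Back to normal
            logger.info(f"Remediation verified on {device} {interface}")
            return True
    logger.error(f"Remediation could not be verified on {device} {interface}")
    return False
```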
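The thresholds above only make sense if each value is a per-interval change. Interface counters such as OpenConfig's `in-errors` are cumulative, so once a raw counter passes 100 it would stay above the threshold forever. A minimal sketch of converting raw samples into deltas before they reach `process_telemetry_stream` follows; `CounterDeltas` is a hypothetical helper, not part of any library, and it treats a decrease as a counter reset (for example after a reboot or a counter clear) rather than guessing at a wrap.

```python
from typing import Dict, Optional, Tuple

Key = Tuple[str, str, str]  # (device, interface, metric)


class CounterDeltas:
    """Turn cumulative counter samples into per-interval deltas (hypothetical helper)."""

    def __init__(self) -> None:
        self._last: Dict[Key, int] = {}

    def update(self, device: str, interface: str, metric: str, value: int) -> Optional[int]:
        key = (device, interface, metric)
        previous = self._last.get(key)
        self._last[key] = value
        if previous is None:
            return None  # First sample: nothing to compare against yet
        if value < previous:
            return None  # Counter went backwards: assume a reset and skip this interval
        return value - previous


deltas = CounterDeltas()
for raw in (1000, 1040, 1250, 12):
    print(deltas.update("leaf1", "Ethernet1", "in_errors", raw))
```

Here the raw counter is already above 100 on the first sample, yet only the jump of 210 in the third interval would cross the `in_errors` threshold of 100.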
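```python
import asyncio
import logging

from pygnmi.client import gNMIclient

logger = logging.getLogger(__name__)

# Sample interface counters every 10 seconds (gNMI intervals are in nanoseconds)
SUBSCRIPTION = {
    'subscription': [
        {
            'path': '/interfaces/interface/state/counters',
            'mode': 'sample',
            'sample_interval': 10_000_000_000,
        },
    ],
    'mode': 'stream',
    'encoding': 'json_ietf',
}
_STREAM_DONE = object()  # Sentinel marking the end of a subscription


async def collect_telemetry_async(device_ip, username, password):
    """Collect telemetry from a gNMI-enabled device without blocking the event loop.

    pygnmi's client is synchronous, so the subscription runs in a worker thread
    and hands each parsed message back to the event loop through a queue.
    """
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()

    def subscribe_blocking():
        try:
            with gNMIclient(
                target=(device_ip, 57400),
                username=username,
                password=password,
                insecure=True,  # Lab only: for TLS use insecure=False with path_cert, path_key, path_root
            ) as gc:
                for telemetry in gc.subscribe2(subscribe=SUBSCRIPTION):
                    loop.call_soon_threadsafe(queue.put_nowait, telemetry)
        finally:
            loop.call_soon_threadsafe(queue.put_nowait, _STREAM_DONE)

    reader = asyncio.create_task(asyncio.to_thread(subscribe_blocking))
    while (telemetry := await queue.get()) is not _STREAM_DONE:
        await process_telemetry(device_ip, telemetry)
    try:
        await reader  # Re-raise any error from the subscription thread
    except Exception:
        logger.exception(f"Telemetry subscription to {device_ip} failed")


async def process_telemetry(device_ip, telemetry_msg):
    """Parse one message, store it, and check it against thresholds."""
    # subscribe2() yields parsed dicts; sync_response messages have no 'update' key
    for update in telemetry_msg.get('update', {}).get('update', []):
        path = update.get('path')
        value = update.get('val')  # pygnmi has already decoded the JSON value
        try:
            await store_in_influx(device_ip, path, value)
            await check_thresholds(device_ip, path, value)
        except Exception:
            logger.exception(f"Failed to process telemetry from {device_ip}")


# Placeholders: wire these to your time-series database and alerting pipeline
async def store_in_influx(device, path, value):
    logger.debug(f"store {device} {path}={value}")


async def check_thresholds(device, path, value):
    pass


async def main():
    # Example credentials only; load real secrets from a vault or environment
    devices = [
        {'ip': '10.0.0.1', 'username': 'admin', 'password': 'pass'},
        {'ip': '10.0.0.2', 'username': 'admin', 'password': 'pass'},
    ]
    await asyncio.gather(
        *(collect_telemetry_async(d['ip'], d['username'], d['password']) for d in devices)
    )


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())
```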
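With a container path such as `/interfaces/interface/state/counters`, each update's value is usually a JSON object holding every counter. JSON encoding as defined in RFC 7951 (which `json_ietf` follows) sends 64-bit integers as strings, so `"in-errors": "42"` must be converted before any comparison. The sketch below flattens one update into per-metric records; the input shape is an assumption modelled on pygnmi's parsed output and will vary by platform and library version, and `flatten_counters` is a hypothetical name.

```python
import re
from typing import Any, Dict, List

# Matches the interface key in a path such as interfaces/interface[name=Ethernet1]/state/counters
INTERFACE_KEY = re.compile(r"interface\[name=([^\]]+)\]")


def flatten_counters(device: str, update: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Flatten one parsed gNMI update of interface counters into per-metric records."""
    match = INTERFACE_KEY.search(update.get("path", ""))
    interface = match.group(1) if match else "unknown"
    records = []
    for name, raw in update.get("val", {}).items():
        try:
            value = int(raw)  # RFC 7951 sends 64-bit integers as JSON strings
        except (TypeError, ValueError):
            continue  # Skip leaves that are not integers (e.g. nested containers)
        metric = name.split(":")[-1]  # Drop any RFC 7951 module prefix
        records.append({
            "device": device,
            "interface": interface,
            "metric": metric.replace("-", "_"),  # in-errors -> in_errors
            "value": value,
        })
    return records


sample_update = {
    "path": "interfaces/interface[name=Ethernet1]/state/counters",
    "val": {"in-errors": "42", "out-errors": "0", "in-octets": "18446744073709"},
}
for record in flatten_counters("10.0.0.1", sample_update):
    print(record)
```

Renaming `in-errors` to `in_errors` lines the records up with the metric names used in the threshold table earlier, so they can feed the same processing loop.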
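```python
async def check_thresholds_dynamic(device, metric, metric_value, z_limit=3.0):
    """Use a recent baseline to detect anomalies (not just static thresholds)."""
    # get_metric_baseline() and alert_ops() are assumed helpers for your
    # time-series database and paging system
    baseline = await get_metric_baseline(device, metric, hours=1)  # Last 1 hour
    mean = baseline['mean']
    stddev = baseline['stddev']
    if not stddev:
        # A flat (or empty) baseline makes the z-score undefined; skip instead of dividing by zero
        return
    # Alert if the current value is more than z_limit (default 3) standard deviations from the mean
    z_score = abs(metric_value - mean) / stddev
    if z_score > z_limit:
        await alert_ops(f"Anomaly detected on {device} {metric}: z-score={z_score:.2f}")
```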
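The baseline lookup is left abstract above. One way to compute it, assuming you can fetch the last hour of samples as a list of numbers, is Python's standard `statistics` module; `baseline_from_samples` and `z_score` are hypothetical helper names. A z-score assumes roughly normal data, which bursty counters often are not, so treat the 3-sigma cut-off as a starting point rather than a rule.

```python
import statistics
from typing import Dict, Optional, Sequence


def baseline_from_samples(samples: Sequence[float]) -> Dict[str, float]:
    """Mean and sample standard deviation of recent values (needs at least two samples)."""
    if len(samples) < 2:
        raise ValueError("need at least two samples for a baseline")
    return {"mean": statistics.fmean(samples), "stddev": statistics.stdev(samples)}


def z_score(value: float, baseline: Dict[str, float]) -> Optional[float]:
    """How many standard deviations `value` sits from the baseline mean, or None if undefined."""
    if baseline["stddev"] == 0:
        return None
    return abs(value - baseline["mean"]) / baseline["stddev"]


last_hour = [10, 12, 11, 9, 13, 10, 12, 11]
baseline = baseline_from_samples(last_hour)
print(baseline)
print(z_score(11, baseline), z_score(40, baseline))
```

With this sample the mean is 11 and the standard deviation is about 1.31, so a reading of 40 sits roughly 22 standard deviations out and would be flagged, while 11 scores zero.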
Advanced Patterns: Scaling, Security, and Observability
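```python
# Produce telemetry to Kafka for distributed processing (kafka-python client)
import asyncio
import json
from dataclasses import asdict

from kafka import KafkaConsumer, KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)


async def stream_telemetry_to_kafka(telemetry_queue: asyncio.Queue):
    """Buffer telemetry in Kafka so any number of consumers can process it."""
    while True:
        event = await telemetry_queue.get()  # A TelemetryEvent dataclass
        try:
            # Key by device so each device's events land on one partition, in order.
            # send() is asynchronous but can block up to max_block_ms if the buffer is full.
            producer.send('network_telemetry', key=event.device, value=asdict(event))
        finally:
            telemetry_queue.task_done()


# Consume from Kafka for processing; consumers sharing a group_id split the partitions
def process_telemetry_from_kafka():
    consumer = KafkaConsumer(
        'network_telemetry',
        bootstrap_servers=['localhost:9092'],
        group_id='telemetry-processors',
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    )
    for message in consumer:
        process_event(message.value)  # process_event() is your own handler
```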
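Because Kafka only ever sees bytes, it can help to keep serialisation in plain functions that are unit-testable without a broker. A sketch, assuming events are `TelemetryEvent` dataclasses as in the first example; `encode_event`, `decode_event`, and `event_key` are hypothetical names you could pass as `value_serializer`, `value_deserializer`, and the `key` argument respectively.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class TelemetryEvent:
    timestamp: float
    device: str
    interface: str
    metric: str
    value: int


def encode_event(event: TelemetryEvent) -> bytes:
    """Serialise an event to compact UTF-8 JSON (usable as a value_serializer)."""
    return json.dumps(asdict(event), separators=(",", ":")).encode("utf-8")


def decode_event(payload: bytes) -> TelemetryEvent:
    """Rebuild an event from JSON bytes; missing or unknown fields raise TypeError."""
    return TelemetryEvent(**json.loads(payload.decode("utf-8")))


def event_key(event: TelemetryEvent) -> bytes:
    """Partition key: one device's events share a key, so they stay ordered on one partition."""
    return event.device.encode("utf-8")


event = TelemetryEvent(1700000000.0, "leaf1", "Ethernet1", "in_errors", 142)
payload = encode_event(event)
print(payload)
print(decode_event(payload) == event)
```

Decoding back into the dataclass means a malformed message fails loudly at the consumer boundary instead of surfacing later as a `KeyError` deep inside remediation logic.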
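```python
# Secure gNMI example with TLS and client certificates (pygnmi)
from pygnmi.client import gNMIclient

with gNMIclient(
    target=('10.0.0.1', 57400),
    username='admin',
    password='pass',  # Example only; load real secrets from a vault or environment
    insecure=False,  # Enable TLS
    path_cert='/path/to/client.crt',
    path_key='/path/to/client.key',
    path_root='/path/to/ca.crt',
) as gc:
    # Confirm the TLS session works before subscribing
    print(gc.capabilities())
```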
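Hard-coded passwords and certificate paths are difficult to rotate. A small sketch of building the client arguments from environment variables instead: the `GNMI_*` variable names and the `gnmi_tls_kwargs` helper are hypothetical, and the keyword names (`path_cert`, `path_key`, `path_root`) are assumed to match the pygnmi version in use, so check them against its documentation.

```python
import os
from typing import Any, Dict, Mapping, Optional

REQUIRED = ("GNMI_USERNAME", "GNMI_PASSWORD", "GNMI_CLIENT_CERT", "GNMI_CLIENT_KEY", "GNMI_CA_CERT")


def gnmi_tls_kwargs(host: str, env: Optional[Mapping[str, str]] = None, port: int = 57400) -> Dict[str, Any]:
    """Build TLS-enabled gNMIclient keyword arguments from environment variables."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        # Fail closed: never fall back to insecure=True when TLS material is absent
        raise RuntimeError(f"missing gNMI settings: {', '.join(missing)}")
    return {
        "target": (host, port),
        "username": env["GNMI_USERNAME"],
        "password": env["GNMI_PASSWORD"],
        "insecure": False,
        "path_cert": env["GNMI_CLIENT_CERT"],
        "path_key": env["GNMI_CLIENT_KEY"],
        "path_root": env["GNMI_CA_CERT"],
    }


example_env = {
    "GNMI_USERNAME": "telemetry",
    "GNMI_PASSWORD": "example-only",
    "GNMI_CLIENT_CERT": "/etc/gnmi/client.crt",
    "GNMI_CLIENT_KEY": "/etc/gnmi/client.key",
    "GNMI_CA_CERT": "/etc/gnmi/ca.crt",
}
kwargs = gnmi_tls_kwargs("10.0.0.1", env=example_env)
print({k: v for k, v in kwargs.items() if k != "password"})
```

In production the call would typically be `with gNMIclient(**gnmi_tls_kwargs('10.0.0.1')) as gc:`, reading the real process environment.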
Additional PRIME Practices: Measurability, Safety, and Transparency
Establish baselines: collect one to two weeks of normal telemetry before enabling alerts, so thresholds reflect real daily and weekly traffic patterns.
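One way to act on this, sketched under assumptions: bucket history by hour of the week, so Monday 10:00 is compared with previous Mondays at 10:00 rather than with Sunday night, and stay silent until enough history exists. `HourOfWeekBaseline`, `min_days`, and the UTC bucketing are hypothetical choices for illustration.

```python
import statistics
from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, List, Optional, Tuple


class HourOfWeekBaseline:
    """Per-(weekday, hour) baseline that refuses to judge until enough history exists."""

    def __init__(self, min_days: int = 7) -> None:
        self.min_days = min_days
        self.buckets: Dict[Tuple[int, int], List[float]] = defaultdict(list)
        self.first_seen: Optional[float] = None
        self.last_seen: Optional[float] = None

    @staticmethod
    def bucket(ts: float) -> Tuple[int, int]:
        moment = datetime.fromtimestamp(ts, tz=timezone.utc)
        return moment.weekday(), moment.hour

    def add(self, ts: float, value: float) -> None:
        self.buckets[self.bucket(ts)].append(value)
        self.first_seen = ts if self.first_seen is None else min(self.first_seen, ts)
        self.last_seen = ts if self.last_seen is None else max(self.last_seen, ts)

    def ready(self) -> bool:
        """True once the collected history spans at least `min_days`."""
        if self.first_seen is None or self.last_seen is None:
            return False
        return self.last_seen - self.first_seen >= self.min_days * 86400

    def is_anomalous(self, ts: float, value: float, z_limit: float = 3.0) -> Optional[bool]:
        """None while still learning; otherwise whether `value` is unusual for this hour of week."""
        history = self.buckets.get(self.bucket(ts), [])
        if not self.ready() or len(history) < 2:
            return None
        stddev = statistics.stdev(history)
        if stddev == 0:
            return value != history[0]
        return abs(value - statistics.fmean(history)) / stddev > z_limit


start = datetime(2024, 1, 1, tzinfo=timezone.utc).timestamp()  # A Monday
model = HourOfWeekBaseline(min_days=7)
print(model.is_anomalous(start, 50))  # Still learning, so no verdict

# Two weeks of synthetic hourly samples: busy weekday office hours, quiet otherwise
for hour in range(14 * 24):
    ts = start + hour * 3600
    moment = datetime.fromtimestamp(ts, tz=timezone.utc)
    busy = moment.weekday() < 5 and 9 <= moment.hour < 17
    model.add(ts, (100 if busy else 10) + (hour // 24) % 3)

monday_10 = start + 14 * 86400 + 10 * 3600
print(model.is_anomalous(monday_10, 101), model.is_anomalous(monday_10, 10))
```

A value of 10 is normal at 03:00 on a Sunday but anomalous at 10:00 on a Monday; a single global threshold cannot express that difference.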