Skip to content

Structured Logging for Network Automation

Why Structured Logging Matters

Scenario: Your automation failed 3 hours ago. Now your boss asks: "What happened?"

Without Structured Logging:

  • ❌ Logs are scattered across different files
  • ❌ Each script logs differently
  • ❌ Simple grep searches are ineffective
  • ❌ Can't correlate events across devices
  • ❌ Timestamps don't line up
  • ❌ No emergency response capability

With Structured Logging:

  • ✅ Every log is JSON (machine-readable)
  • ✅ Centralized aggregation (ELK, Splunk, CloudWatch)
  • ✅ Powerful queries ("Show me all failures for device X")
  • ✅ Correlation IDs track operations across devices
  • ✅ Automatic alerting on errors
  • ✅ Compliance audit trails

Print statements and unstructured logs don't scale. Structured logging enables production automation.


Architecture: Log Flow

┌─────────────────────┐
│  Your Script        │
│  (Structured logs)  │
└────────┬────────────┘
         ↓ JSON format
┌─────────────────────┐
│  Log Shipper        │
│  (Filebeat/Logstash)│
└────────┬────────────┘
         ↓ HTTP/socket
┌─────────────────────────────┐
│  Log Aggregation            │
│  (Elasticsearch/Splunk/     │
│   CloudWatch)               │
└────────┬────────────────────┘
         ↓ Querying
┌─────────────────────┐
│  Analytics/         │
│  Dashboards         │
│  Alerts             │
└─────────────────────┘

Pattern 1: Structured Logging Basics

The Implementation

# src/structured_logger.py
import json
import logging
import sys
from datetime import datetime
from typing import Any, Dict
import uuid

class StructuredLogger:
    """Structured JSON logging for network automation."""

    def __init__(self, name: str, level=logging.INFO):
        """
        Initialize structured logger.

        Args:
            name: Logger name (usually __name__)
            level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
        """
        self.logger = logging.getLogger(name)
        self.logger.setLevel(level)

        # Remove default handlers
        self.logger.handlers = []

        # Create JSON formatter
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(JSONFormatter())
        self.logger.addHandler(handler)

        # Unique ID for this execution
        self.correlation_id = str(uuid.uuid4())

    def _add_context(self, extra: Dict[str, Any]) -> Dict[str, Any]:
        """Add standard context to log entry."""
        return {
            "timestamp": datetime.utcnow().isoformat(),
            "correlation_id": self.correlation_id,
            **extra
        }

    def info(self, message: str, **context):
        """Log info level with context."""
        self.logger.info(
            message,
            extra=self._add_context(context)
        )

    def warning(self, message: str, **context):
        """Log warning level with context."""
        self.logger.warning(
            message,
            extra=self._add_context(context)
        )

    def error(self, message: str, exception=None, **context):
        """Log error level with context and optional exception."""
        if exception:
            context["exception"] = str(exception)
            context["exception_type"] = type(exception).__name__

        self.logger.error(
            message,
            extra=self._add_context(context),
            exc_info=exception
        )

    def debug(self, message: str, **context):
        """Log debug level with context."""
        self.logger.debug(
            message,
            extra=self._add_context(context)
        )


class JSONFormatter(logging.Formatter):
    """Format logs as JSON for machine parsing."""

    def format(self, record):
        """Convert log record to JSON."""
        log_data = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }

        # Add extra fields
        if hasattr(record, "timestamp"):
            log_data["timestamp"] = record.timestamp
        if hasattr(record, "correlation_id"):
            log_data["correlation_id"] = record.correlation_id

        # Add any custom fields from extra={}
        for key, value in record.__dict__.items():
            if key not in ["name", "msg", "args", "created", "filename", "funcName", 
                          "levelname", "levelno", "lineno", "module", "msecs", 
                          "message", "pathname", "process", "processName", "relativeCreated",
                          "thread", "threadName", "exc_info", "exc_text", "stack_info"]:
                log_data[key] = value

        return json.dumps(log_data)

Usage Example

from structured_logger import StructuredLogger

logger = StructuredLogger(__name__)

# Simple log
logger.info("Deployment started", phase="validation")

# Log with context
logger.info(
    "Device configuration retrieved",
    device="router1",
    lines=245,
    duration_ms=1234
)

# Error logging
try:
    connect_to_device("10.0.0.1")
except Exception as e:
    logger.error(
        "Failed to connect to device",
        device="router1",
        exception=e
    )

# Output (pretty-printed):
# {
#   "level": "INFO",
#   "logger": "__main__",
#   "message": "Device configuration retrieved",
#   "timestamp": "2026-03-04T14:23:45.123456",
#   "correlation_id": "a1b2c3d4-e5f6-4g7h-8i9j-0k1l2m3n4o5p",
#   "device": "router1",
#   "lines": 245,
#   "duration_ms": 1234
# }

Pattern 2: Contextual Logging Across Operations

The Problem

When deploying to 50 devices, how do you track which device failed?

1
2
3
4
5
6
# ❌ BAD - No context
for device in devices:
    configure_device(device)

# Logs show errors but not WHICH device
# Result: "ERROR: Configuration failed"

The Solution: Operation Context

# ✅ GOOD - Rich context
logger = StructuredLogger(__name__)

for device in devices:
    logger.info(
        "Starting device configuration",
        device=device.host,
        sequence=devices.index(device) + 1,
        total_devices=len(devices)
    )

    try:
        configure_device(device)
        logger.info(
            "Device configuration completed",
            device=device.host,
            status="success"
        )
    except Exception as e:
        logger.error(
            "Device configuration failed",
            device=device.host,
            status="failed",
            exception=e
        )
        # Later, can query: "Show all failures for router1"

Implementation

# src/contextual_logging.py
from contextlib import contextmanager

class OperationLogger:
    """Logger with automatic operation context."""

    def __init__(self, logger: StructuredLogger):
        self.logger = logger
        self.operation_stack = []

    @contextmanager
    def operation(self, operation_name: str, **context):
        """
        Context manager for logging operations.

        Example:
            with op_logger.operation("deploy_to_device", device="router1"):
                configure_device()
                # All logs within this block include device context
        """
        # Add operation to stack
        self.operation_stack.append({
            "operation": operation_name,
            **context
        })

        try:
            self.logger.info(
                f"{operation_name} started",
                **context
            )
            yield

            self.logger.info(
                f"{operation_name} completed",
                status="success",
                **context
            )

        except Exception as e:
            self.logger.error(
                f"{operation_name} failed",
                status="failed",
                exception=e,
                **context
            )
            raise

        finally:
            self.operation_stack.pop()

Usage with Nornir

# src/nornir_logging_task.py
from nornir import InitNornir
from nornir.core.task import Task, Result
from contextual_logging import OperationLogger
from structured_logger import StructuredLogger

logger = StructuredLogger(__name__)
op_logger = OperationLogger(logger)

def deploy_with_logging(task: Task) -> Result:
    """Nornir task with structured logging."""
    with op_logger.operation("device_deployment", device=task.host.name):
        device = task.host.get_connection("netmiko")

        with op_logger.operation("get_current_state", device=task.host.name):
            current_config = device.send_command("show running-config")
            logger.debug(
                "Current configuration retrieved",
                device=task.host.name,
                config_lines=len(current_config.split('\n'))
            )

        with op_logger.operation("apply_changes", device=task.host.name):
            device.send_command("configure terminal")
            device.send_command("hostname " + task.host.name)
            device.send_command("end")
            logger.info(
                "Configuration changes applied",
                device=task.host.name
            )

        return Result(
            host=task.host,
            result="Deployment successful"
        )

# Usage
nr = InitNornir(config_file="config.yaml")
results = nr.run(task=deploy_with_logging)

Pattern 3: Performance Metrics Logging

The Implementation

# src/metrics_logger.py
import time
from structured_logger import StructuredLogger

class MetricsLogger:
    """Log operation metrics (duration, throughput, etc.)."""

    def __init__(self, logger: StructuredLogger):
        self.logger = logger

    def log_duration(self, operation: str, duration_ms: float, **context):
        """Log operation duration."""
        self.logger.info(
            f"{operation} duration",
            operation=operation,
            duration_ms=f"{duration_ms:.2f}",
            **context
        )

    def log_throughput(self, operation: str, items: int, duration_ms: float, **context):
        """Log throughput (items per second)."""
        throughput = (items / duration_ms) * 1000 if duration_ms > 0 else 0

        self.logger.info(
            f"{operation} throughput",
            operation=operation,
            items=items,
            duration_ms=f"{duration_ms:.2f}",
            throughput_per_sec=f"{throughput:.2f}",
            **context
        )

    def time_operation(self, operation: str, **context):
        """Context manager to automatically time operations."""
        class Timer:
            def __enter__(timer_self):
                timer_self.start = time.time()
                return timer_self

            def __exit__(timer_self, *args):
                duration = (time.time() - timer_self.start) * 1000  # ms
                self.log_duration(operation, duration, **context)

        return Timer()

Usage

logger = StructuredLogger(__name__)
metrics = MetricsLogger(logger)

# Automatic timing
with metrics.time_operation("device_configuration", device="router1"):
    configure_device(device)

# Manual metrics  
start = time.time()
devices_processed = 0
for device in devices:
    deploy_to_device(device)
    devices_processed += 1
duration = (time.time() - start) * 1000

metrics.log_throughput(
    "fleet_deployment",
    items=devices_processed,
    duration_ms=duration,
    total_devices=len(devices)
)

Pattern 4: Log Aggregation Integration

Sending Logs to Elasticsearch

# src/elasticsearch_logger.py
import json
from elasticsearch import Elasticsearch
from structured_logger import StructuredLogger

class ElasticsearchHandler(logging.Handler):
    """Send logs to Elasticsearch."""

    def __init__(self, hosts: list, index_name: str = "network-automation"):
        """
        Initialize Elasticsearch handler.

        Args:
            hosts: Elasticsearch hosts (e.g., ["localhost:9200"])
            index_name: Index name for logs
        """
        super().__init__()
        self.es = Elasticsearch(hosts=hosts)
        self.index_name = index_name

    def emit(self, record):
        """Send log record to Elasticsearch."""
        try:
            log_data = json.loads(self.format(record))

            # Add timestamp for Elasticsearch
            log_data["@timestamp"] = log_data.get("timestamp", datetime.utcnow().isoformat())

            # Send to Elasticsearch
            self.es.index(
                index=f"{self.index_name}-{datetime.utcnow():%Y.%m.%d}",
                doc_type="_doc",
                body=log_data
            )
        except Exception as e:
            self.handleError(record)

Setup

# In your application startup
logger = StructuredLogger(__name__)

# Add Elasticsearch handler
es_handler = ElasticsearchHandler(hosts=["localhost:9200"])
es_handler.setFormatter(JSONFormatter())
logger.logger.addHandler(es_handler)

# Now all logs are sent to Elasticsearch
logger.info("Application started")

Elasticsearch Queries

# Find all failures for device router1
GET network-automation-*/_search
{
  "query": {
    "bool": {
      "must": [
        {"term": {"device": "router1"}},
        {"term": {"level": "ERROR"}}
      ]
    }
  }
}

# Find slow operations
GET network-automation-*/_search
{
  "query": {
    "range": {
      "duration_ms": {"gte": 5000}
    }
  }
}

# Group errors by device
GET network-automation-*/_search
{
  "aggs": {
    "errors_by_device": {
      "terms": {"field": "device"}
    }
  }
}

Pattern 5: Audit Logging for Compliance

# src/audit_logger.py
from structured_logger import StructuredLogger

class AuditLogger:
    """Compliance-grade audit logging."""

    def __init__(self):
        self.logger = StructuredLogger("audit", level=logging.INFO)

    def log_action(self, action: str, actor: str, resource: str, result: str, **context):
        """
        Log an action for compliance.

        Args:
            action: What was done (e.g., "configure_vlan")
            actor: Who did it (user, service account)
            resource: What was affected (device, interface)
            result: success or failure
        """
        self.logger.info(
            f"Action: {action}",
            action=action,
            actor=actor,
            resource=resource,
            result=result,
            **context
        )

    def log_configuration_change(self, device: str, change_type: str, change_details: dict):
        """Log configuration change."""
        self.logger.info(
            "Configuration change",
            device=device,
            change_type=change_type,
            change_details=json.dumps(change_details),
            audit_required=True
        )

    def log_authentication(self, user: str, resource: str, success: bool):
        """Log authentication attempt."""
        self.logger.info(
            "Authentication attempt",
            user=user,
            resource=resource,
            success=success,
            audit_required=True
        )

Usage

audit = AuditLogger()

# Log before making changes
audit.log_action(
    action="deploy_vlan_config",
    actor="automation_service",
    resource="router1",
    result="pending",
    vlan_ids=[100, 101, 102]
)

# Deploy configuration...

# Log after completion
audit.log_action(
    action="deploy_vlan_config",
    actor="automation_service",
    resource="router1",
    result="success",
    vlan_ids=[100, 101, 102],
    duration_ms=1234
)

Best Practices

1. Log at the Right Level

# ✅ GOOD
logger.debug("Verbose details for developers")
logger.info("Normal operations")
logger.warning("Something might be wrong")
logger.error("Something is definitely wrong")

# ❌ BAD - Everything at INFO
logger.info("Detailed debug info")
logger.info("Normal operation")
logger.info("ERROR happening here")

2. Include Actionable Context

# ✅ GOOD - Easy to find and fix
logger.error(
    "Device configuration failed",
    device="router1",
    module="bgp",
    expected_neighbors=3,
    actual_neighbors=1
)

# ❌ BAD - Vague
logger.error("Configuration failed")

3. Never Log Sensitive Data

1
2
3
4
5
# ❌ ABSOLUTELY NOT
logger.info(f"Connecting with username={username} password={password}")

# ✅ GOOD
logger.info("Authenticating to device", device=host)

4. Use Correlation IDs

# All logs from one operation get same correlation_id
logger = StructuredLogger(__name__)

# Correlation ID automatically injected
logger.info("Starting deployment")
logger.info("Device 1 configured")
logger.info("Device 2 configured")

# Later: Query all logs for one deployment
# GET logs where correlation_id = "..."

Summary

Concept Why It Matters
Structured JSON Machines can parse, aggregate, and analyze
Correlation IDs Track operations across multiple devices
Log Levels Find important information quickly
Context Know which device, what operation, when
Aggregation Centralized visibility across all automation
Audit Trail Compliance and forensics

Structured logging transforms logs from debugging tools into operational intelligence.


Next Steps