Incident Response Automation
Why Automated Response Matters¶
Scenario: BGP session flaps at 2 AM.
Manual response:
- Alert fires at 2 AM
- Engineer gets paged
- Waits 15-30 minutes for engineer to respond
- Engineer investigates (15 minutes)
- Engineer identifies cause (10 minutes)
- Engineer applies fix (10 minutes)
- Total: 60-90 minutes of outage
With automated response:
- Alert fires at 2 AM
- Automated system identifies flapping pattern
- Runs diagnostics in parallel
- Identifies isolated peer with bad BGP config
- Withdraws peer routes, updates peer config, re-enables
- Total: 90 seconds of outage
- Engineer reviews incident history next morning
In this scenario, automated response cuts MTTR (Mean Time To Repair) by roughly 98%: from 60-90 minutes down to about 90 seconds.
Pattern 1: Event Detection Engine¶
The Implementation¶
# src/incident_detector.py
from dataclasses import dataclass
from typing import Dict, List, Set, Callable
from datetime import datetime
from enum import Enum
import json
class SeverityLevel(Enum):
    """Severity assigned to a detected incident.

    Members are declared from CRITICAL down to INFORMATIONAL; each value is
    the lowercase string used when an incident is serialised.
    """

    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFORMATIONAL = "info"
class RemediationType(Enum):
    """How an incident may be remediated."""

    # Can be fixed without approval
    AUTOMATIC = "automatic"
    # Requires engineer approval
    MANUAL = "manual"
    # Escalate to NOC
    ESCALATE = "escalate"
@dataclass
class Incident:
    """A single incident detected on a device.

    The fields are plain data; ``to_dict`` flattens the enums and the
    timestamp into strings so the record can be written to a log.
    """

    id: str  # Incident identifier (PatternMatcher emits "INC-00000" style IDs)
    name: str  # Name of the pattern / incident type that fired
    severity: SeverityLevel
    device: str  # Device the incident was detected on
    # Metrics observed when the incident fired. Values are arbitrary, hence
    # ``object`` (the original annotation used the builtin function ``any``,
    # which is not a type).
    symptoms: Dict[str, object]
    detected_at: datetime
    remediation_type: RemediationType
    likely_causes: List[str]

    def to_dict(self) -> Dict[str, object]:
        """Return the incident as a dict of basic Python values.

        Enum members are replaced by their ``.value`` and ``detected_at`` by
        its ISO-8601 string; ``symptoms`` and ``likely_causes`` are passed
        through unchanged.
        """
        return {
            "id": self.id,
            "name": self.name,
            "severity": self.severity.value,
            "device": self.device,
            "symptoms": self.symptoms,
            "detected_at": self.detected_at.isoformat(),
            "remediation_type": self.remediation_type.value,
            "likely_causes": self.likely_causes,
        }
class PatternMatcher:
    """Detect incidents from device metrics and events.

    Patterns are registered by name together with a predicate over a metrics
    dict. ``detect`` runs every registered predicate and returns one
    ``Incident`` per predicate that returns a truthy value.
    """

    def __init__(self):
        # Pattern name -> {"check_func", "severity", "remediation_type",
        # "likely_causes"}.
        self.patterns = {}
        # Source of the numeric part of incident IDs; incremented once per
        # incident created by ``detect``.
        self.incident_counter = 0

    def register_pattern(
        self,
        name: str,
        check_func: Callable,
        severity: SeverityLevel,
        remediation_type: RemediationType,
        likely_causes: List[str] = None
    ):
        """
        Register pattern to detect.

        Registering a name that already exists replaces the earlier pattern.

        Args:
            name: Pattern name
            check_func: Function to detect pattern (returns True if found)
            severity: Incident severity
            remediation_type: Type of remediation available
            likely_causes: List of possible root causes (``None`` means none)
        """
        self.patterns[name] = {
            "check_func": check_func,
            "severity": severity,
            "remediation_type": remediation_type,
            # Copy so that later mutation of the caller's list cannot change
            # the registered pattern.
            "likely_causes": list(likely_causes) if likely_causes else [],
        }

    def detect(self, device: str, metrics: Dict) -> List[Incident]:
        """
        Check all patterns against device metrics.

        A predicate that raises is reported on stdout and skipped, so one
        broken pattern does not stop the others from being evaluated.

        Args:
            device: Device name
            metrics: Device metrics/state

        Returns:
            List of detected incidents
        """
        incidents = []
        for pattern_name, pattern in self.patterns.items():
            # Only the user-supplied predicate is guarded; it is the only
            # call here that is expected to fail.
            try:
                matched = pattern["check_func"](metrics)
            except Exception as e:
                # Log pattern check failure, continue
                print(f"⚠ Pattern '{pattern_name}' check failed: {e}")
                continue
            if not matched:
                continue

            incidents.append(
                Incident(
                    id=f"INC-{self.incident_counter:05d}",
                    name=pattern_name,
                    severity=pattern["severity"],
                    device=device,
                    # Snapshot the metrics and causes: every incident gets
                    # its own copies, so editing one incident cannot corrupt
                    # the registry, the caller's dict, or other incidents.
                    symptoms=dict(metrics),
                    detected_at=datetime.now(),
                    remediation_type=pattern["remediation_type"],
                    likely_causes=list(pattern["likely_causes"]),
                )
            )
            self.incident_counter += 1
        return incidents
Pattern Registration Examples¶
# src/patterns.py
from incident_detector import PatternMatcher, SeverityLevel, RemediationType
detector = PatternMatcher()


def _bgp_session_flapping(m):
    """True when the session changed state more than 10 times in the last
    hour and is not currently reported as Established."""
    if not m.get("bgp_session_changes_last_hour", 0) > 10:
        return False
    return m.get("bgp_session_state") != "Established"


def _high_interface_error_rate(m):
    """True when CRC errors exceed 100 or the error percentage exceeds 5."""
    if m.get("interface_crc_errors", 0) > 100:
        return True
    return m.get("interface_errors_percent", 0) > 5


def _low_available_memory(m):
    """True when less than 10 percent of memory is available."""
    return m.get("available_memory_percent", 100) < 10


def _device_unreachable(m):
    """True when the metrics mark the device as not reachable."""
    return not m.get("reachable", True)


# Pattern: BGP session flapping
detector.register_pattern(
    name="BGP Session Flapping",
    check_func=_bgp_session_flapping,
    severity=SeverityLevel.HIGH,
    remediation_type=RemediationType.AUTOMATIC,
    likely_causes=[
        "BGP configuration mismatch",
        "Network connectivity issue",
        "Peer interface bouncing",
        "CPU overload",
    ],
)

# Pattern: High interface error rate
detector.register_pattern(
    name="High Interface Error Rate",
    check_func=_high_interface_error_rate,
    severity=SeverityLevel.CRITICAL,
    remediation_type=RemediationType.MANUAL,
    likely_causes=[
        "Duplex mismatch",
        "Defective cable",
        "Port flapping",
        "Controller issue",
    ],
)

# Pattern: Low memory
detector.register_pattern(
    name="Low Available Memory",
    check_func=_low_available_memory,
    severity=SeverityLevel.HIGH,
    remediation_type=RemediationType.MANUAL,
    likely_causes=[
        "Memory leak in process",
        "Process consuming too much memory",
        "Device needs restart",
    ],
)

# Pattern: Reachability loss
detector.register_pattern(
    name="Device Unreachable",
    check_func=_device_unreachable,
    severity=SeverityLevel.CRITICAL,
    remediation_type=RemediationType.ESCALATE,
    likely_causes=[
        "Network connectivity issue",
        "Device down",
        "Routing issues",
        "Management interface failure",
    ],
)
Pattern 2: Automatic Remediation Engine¶
# src/remediation.py
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Dict, Optional

from incident_detector import Incident, RemediationType
@dataclass
class RemediationAction:
    """Single remediation step.

    Attributes:
        name: Human-readable step name (also used in result records).
        func: Zero-argument async callable that applies the change.
        validate_func: Optional zero-argument async callable; a falsy result
            marks the step as failed.
        rollback_func: Optional zero-argument async callable that undoes
            ``func``. When omitted, rollback falls back to awaiting ``func``
            again, which is the historical behaviour.
    """

    name: str
    func: Callable
    validate_func: Optional[Callable] = None  # Verify remediation worked
    rollback_func: Optional[Callable] = None  # Inverse of ``func``

    async def execute(self) -> bool:
        """Execute remediation and validate.

        Returns:
            True when ``func`` completed and, if a validator is set, the
            validator returned a truthy value. False when validation failed
            or anything raised.
        """
        try:
            print(f" Executing: {self.name}")
            await self.func()
            if self.validate_func:
                if await self.validate_func():
                    print(f" ✓ {self.name} successful")
                    return True
                print(f" ✗ {self.name} validation failed")
                return False
            return True
        except Exception as e:
            # Any failure is reported as an unsuccessful step rather than
            # propagated, so the runbook can decide to roll back.
            print(f" ✗ {self.name} failed: {e}")
            return False


class RemediationRunbook:
    """Define remediation steps for an incident type."""

    def __init__(self, incident_name: str):
        self.incident_name = incident_name
        # Steps in execution order.
        self.actions = []
        # Reversible steps, newest first (i.e. already in undo order).
        self.rollback_actions = []

    def add_action(
        self,
        name: str,
        func: Callable,
        validate_func: Callable = None,
        reversible: bool = True,
        rollback_func: Callable = None
    ):
        """Add remediation action.

        Args:
            name: Step name.
            func: Zero-argument async callable applying the change.
            validate_func: Optional async callable verifying the change.
            reversible: Whether the step takes part in rollback.
            rollback_func: Optional async callable that undoes ``func``.
                Without it, rollback re-runs ``func`` (legacy behaviour),
                which is only correct for idempotent "reset" style steps.
        """
        action = RemediationAction(name, func, validate_func, rollback_func)
        self.actions.append(action)
        if reversible:
            self.rollback_actions.insert(0, action)

    async def _rollback(self, attempted) -> list:
        """Undo the attempted reversible actions, most recent first.

        Returns a list of ``{"name", "success"[, "error"]}`` dicts, one per
        undo that was tried. A failing undo is recorded and the remaining
        undos still run.
        """
        # Compare by identity: dataclass equality would treat two distinct
        # steps with equal fields as the same step.
        attempted_ids = {id(action) for action in attempted}
        outcomes = []
        for action in self.rollback_actions:
            if id(action) not in attempted_ids:
                # Never ran, so there is nothing to undo.
                continue
            undo = action.rollback_func or action.func
            print(f" Undoing: {action.name}")
            try:
                await undo()
            except Exception as e:
                print(f" ✗ Undo of {action.name} failed: {e}")
                outcomes.append(
                    {"name": action.name, "success": False, "error": str(e)}
                )
            else:
                outcomes.append({"name": action.name, "success": True})
        return outcomes

    async def execute(self, incident: "Incident") -> Dict[str, Any]:
        """
        Execute all remediation actions.

        Actions run in registration order. On the first failure the runbook
        stops and rolls back the reversible actions that were attempted
        (including the failing one, whose ``func`` may have partly applied),
        newest first. Actions that never ran are not rolled back.

        Args:
            incident: Incident to remediate

        Returns:
            dict with ``incident_id``, ``actions_taken``, ``success`` and
            ``timestamp``; when a rollback happened, a ``rollback`` list
            describing each undo is added.
        """
        print(f"\n🔧 Remediating: {self.incident_name}")
        print(f" Device: {incident.device}")
        print(f" Likely causes: {', '.join(incident.likely_causes)}\n")
        results = {
            "incident_id": incident.id,
            "actions_taken": [],
            "success": True,
            "timestamp": datetime.now().isoformat()
        }
        attempted = []
        for action in self.actions:
            attempted.append(action)
            success = await action.execute()
            results["actions_taken"].append({
                "name": action.name,
                "success": success
            })
            if success:
                continue
            results["success"] = False
            print(f"\n✗ Remediation failed at: {action.name}")
            print(f" Rolling back to previous state...")
            results["rollback"] = await self._rollback(attempted)
            break
        return results
Remediation Runbook Examples¶
# src/remediation_runbooks.py
from remediation import RemediationRunbook
from netmiko import ConnectHandler
# BGP Flapping Remediation
bgp_flapping_runbook = RemediationRunbook("BGP Session Flapping")


async def clear_bgp_session(device):
    """Issue a soft clear for all BGP peers on the device.

    Args:
        device: Keyword arguments for Netmiko's ``ConnectHandler``.

    The Netmiko calls are blocking, so they run in a worker thread to keep
    the event loop responsive. The connection is closed even when the
    command raises.
    """
    # Function-scope import: this module does not import asyncio at the top.
    import asyncio

    def _clear():
        conn = ConnectHandler(**device)
        try:
            conn.send_command("clear ip bgp * soft")
        finally:
            conn.disconnect()

    await asyncio.to_thread(_clear)
async def verify_bgp_stable(device):
    """Verify BGP session is stable.

    Args:
        device: Keyword arguments for Netmiko's ``ConnectHandler``.

    Returns:
        True when the ``show ip bgp summary`` output contains the word
        "Established", or when it has at least one IPv4 neighbor row and
        every such row ends in a number. Otherwise False.
    """
    # Function-scope imports: this module does not import them at the top.
    import asyncio
    import re

    def _fetch_summary():
        conn = ConnectHandler(**device)
        try:
            return conn.send_command("show ip bgp summary")
        finally:
            # Close the session even if the command fails.
            conn.disconnect()

    # Netmiko is blocking; run it off the event loop.
    output = await asyncio.to_thread(_fetch_summary)

    # Kept for platforms whose summary spells the state out.
    if "Established" in output:
        return True

    # NOTE(review): Cisco IOS prints the received-prefix count in the last
    # column ("State/PfxRcd") for an established peer and a state name such
    # as Idle/Active otherwise, so "Established" never appears there.
    # Confirm the column layout for the platforms in use.
    neighbor_rows = [
        line.split()
        for line in output.splitlines()
        if re.match(r"\s*\d{1,3}(?:\.\d{1,3}){3}\s", line)
    ]
    return bool(neighbor_rows) and all(row[-1].isdigit() for row in neighbor_rows)
# Register the BGP clear step, validated by re-reading the BGP summary.
# NOTE(review): `device_dict` is not defined anywhere in this snippet. The
# lambdas look it up only when they are called, so a module-level
# `device_dict` (the keyword arguments passed to ConnectHandler) must exist
# before the runbook executes; otherwise the step fails with NameError.
bgp_flapping_runbook.add_action(
    name="Clear BGP session",
    func=lambda: clear_bgp_session(device_dict),
    validate_func=lambda: verify_bgp_stable(device_dict),
    reversible=True
)
# Interface Error Remediation
interface_error_runbook = RemediationRunbook("High Interface Error Rate")


async def restart_interface(device, interface):
    """Bounce an interface by sending ``shutdown`` then ``no shutdown``.

    Args:
        device: Keyword arguments for Netmiko's ``ConnectHandler``.
        interface: Interface name as accepted by the ``interface`` command.

    The blocking Netmiko calls run in a worker thread, and the connection is
    closed even when the configuration push raises.
    """
    # Function-scope import: this module does not import asyncio at the top.
    import asyncio

    def _bounce():
        conn = ConnectHandler(**device)
        try:
            conn.send_config_set([
                f"interface {interface}",
                "shutdown",
                "no shutdown",
            ])
        finally:
            conn.disconnect()

    await asyncio.to_thread(_bounce)
async def verify_interface_healthy(device, interface):
    """Verify interface is up and healthy.

    Args:
        device: Keyword arguments for Netmiko's ``ConnectHandler``.
        interface: Interface name to inspect.

    Returns:
        True when the ``show interface`` output reports the interface as up
        (and not "line protocol is down") and every CRC counter found in the
        output is zero.
    """
    # Function-scope imports: this module does not import them at the top.
    import asyncio
    import re

    def _fetch_status():
        conn = ConnectHandler(**device)
        try:
            return conn.send_command(f"show interface {interface}")
        finally:
            # Close the session even if the command fails.
            conn.disconnect()

    # Netmiko is blocking; run it off the event loop.
    output = (await asyncio.to_thread(_fetch_status)).lower()

    # Match the phrase "is up" rather than the substring "up", which also
    # occurs inside "output" and "duplex".
    link_up = (
        re.search(r"\bis up\b", output) is not None
        and "line protocol is down" not in output
    )

    # Read the CRC counter values instead of testing for the word "crc",
    # which is present in the counters line whether or not errors occurred.
    # NOTE(review): these counters are typically cumulative since the last
    # counter clear; confirm they are reset before relying on a zero check.
    crc_counts = [int(count) for count in re.findall(r"(\d+)\s+crc\b", output)]
    return link_up and not any(crc_counts)
# Register the interface bounce, validated by re-reading the interface status.
# NOTE(review): `device` and `interface` are not defined at module level in
# this snippet. The lambdas resolve them only when called, so both names must
# exist as module globals before the runbook executes; otherwise the step
# fails with NameError.
interface_error_runbook.add_action(
    name="Restart interface",
    func=lambda: restart_interface(device, interface),
    validate_func=lambda: verify_interface_healthy(device, interface),
    reversible=True
)
Pattern 3: Incident Tracking & History¶
# src/incident_tracker.py
import json
from datetime import datetime
from pathlib import Path
from incident_detector import Incident
class IncidentTracker:
    """Track incidents and remediation history.

    History is an append-only JSON Lines file: one JSON object per line,
    with ``type`` set to ``"incident_detected"`` or
    ``"remediation_executed"``.
    """

    def __init__(self, history_file: str = "incidents.jsonl"):
        self.history_file = Path(history_file)

    def _append(self, record: dict) -> None:
        """Append one record to the history file as a single JSON line."""
        with open(self.history_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(record) + "\n")

    def record_incident(self, incident: "Incident"):
        """Log detected incident."""
        self._append({
            "type": "incident_detected",
            "timestamp": datetime.now().isoformat(),
            "incident": incident.to_dict()
        })

    def record_remediation(self, incident_id: str, result: dict):
        """Log remediation attempt."""
        self._append({
            "type": "remediation_executed",
            "timestamp": datetime.now().isoformat(),
            "incident_id": incident_id,
            "result": result
        })

    def get_incident_history(self, device: str = None) -> list:
        """Get incident history for device.

        Args:
            device: When given, keep only detections on that device and the
                remediation records of those incidents. Records of any other
                type are kept. When omitted, every record is returned.

        Returns:
            Records in file order; an empty list when the file is missing.
        """
        if not self.history_file.exists():
            return []
        records = []
        # Incident id -> device of its most recent detection, so remediation
        # records (which carry only the id) can be attributed to a device.
        device_by_incident = {}
        with open(self.history_file, "r", encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    # Tolerate blank lines instead of failing in json.loads.
                    continue
                record = json.loads(line)
                if device:
                    kind = record.get("type")
                    if kind == "incident_detected":
                        detected = record["incident"]
                        device_by_incident[detected.get("id")] = detected["device"]
                        if detected["device"] != device:
                            continue
                    elif kind == "remediation_executed":
                        if device_by_incident.get(record.get("incident_id")) != device:
                            continue
                records.append(record)
        return records

    def get_mttr(self, days: int = 7) -> float:
        """
        Calculate Mean Time To Repair.

        Only incidents detected within the last ``days`` days are counted,
        and an incident counts as repaired at its first remediation record
        whose result is not marked unsuccessful (``result["success"]`` is
        truthy or absent). Failed attempts are ignored.

        Args:
            days: Number of days to analyze

        Returns:
            Average repair time in minutes (0.0 when nothing was repaired)
        """
        # Function-scope import: the module only imports ``datetime``.
        from datetime import timedelta

        cutoff = datetime.now() - timedelta(days=days)
        # Incident id -> detection time for incidents awaiting repair.
        pending = {}
        repair_minutes = []
        for record in self.get_incident_history():
            kind = record["type"]
            if kind == "incident_detected":
                incident_id = record["incident"]["id"]
                detected_at = datetime.fromisoformat(record["timestamp"])
                if detected_at >= cutoff:
                    pending[incident_id] = detected_at
                else:
                    # Outside the analysis window.
                    pending.pop(incident_id, None)
            elif kind == "remediation_executed":
                incident_id = record["incident_id"]
                if incident_id not in pending:
                    continue
                result = record.get("result")
                if isinstance(result, dict) and not result.get("success", True):
                    # A failed attempt did not repair anything.
                    continue
                repaired_at = datetime.fromisoformat(record["timestamp"])
                elapsed = repaired_at - pending.pop(incident_id)
                repair_minutes.append(elapsed.total_seconds() / 60)
        if repair_minutes:
            return sum(repair_minutes) / len(repair_minutes)
        return 0.0
Pattern 4: Adaptive Response Based on History¶
# src/adaptive_remediation.py
from incident_tracker import IncidentTracker
class AdaptiveRemediationEngine:
    """Learn from incident history and adjust remediation."""

    def __init__(self, tracker: "IncidentTracker"):
        self.tracker = tracker
        # Most recently computed success rate per action name.
        self.success_rates = {}

    def calculate_success_rate(self, remediation_name: str) -> float:
        """Calculate success rate of remediation type.

        Args:
            remediation_name: Action name as recorded in the
                ``actions_taken`` entries of remediation results.

        Returns:
            Percentage (0-100) of recorded executions of that action that
            succeeded; 0.0 when the action has never been recorded.
        """
        total = 0
        successful = 0
        for record in self.tracker.get_incident_history():
            if record["type"] != "remediation_executed":
                continue
            for action in record["result"].get("actions_taken", []):
                if action["name"] != remediation_name:
                    continue
                total += 1
                if action["success"]:
                    successful += 1
        rate = (successful / total) * 100 if total else 0.0
        self.success_rates[remediation_name] = rate
        return rate

    def _score_strategy(self, strategy) -> float:
        """Score a runbook as the mean success rate of its actions.

        History records action names, not runbook names, so the runbook is
        judged by the steps it would run. A runbook without actions falls
        back to a lookup by its ``incident_name``.
        """
        action_names = [action.name for action in getattr(strategy, "actions", [])]
        if not action_names:
            return self.calculate_success_rate(strategy.incident_name)
        rates = [self.calculate_success_rate(name) for name in action_names]
        return sum(rates) / len(rates)

    def select_remediation_strategy(
        self,
        incident_name: str,
        available_strategies: "List[RemediationRunbook]"
    ) -> "RemediationRunbook":
        """
        Select best remediation based on historical success.

        Args:
            incident_name: Type of incident
            available_strategies: Possible remediation approaches

        Returns:
            Best remediation strategy based on history; on a tie the
            earliest strategy in ``available_strategies`` wins.

        Raises:
            ValueError: If ``available_strategies`` is empty.
        """
        if not available_strategies:
            raise ValueError(
                f"No remediation strategies available for '{incident_name}'"
            )
        # Score by position: several strategies for the same incident share
        # one incident_name, so the name cannot be the key.
        scores = [self._score_strategy(s) for s in available_strategies]
        best_index = max(range(len(scores)), key=scores.__getitem__)
        best_strategy = available_strategies[best_index]
        print(f"Selected strategy: {best_strategy.incident_name} "
              f"(success rate: {scores[best_index]:.1f}%)")
        return best_strategy
Best Practices¶
1. Separate Automatic vs Manual Remediation¶
# โ
GOOD - Clear separation
case SeverityLevel.LOW:
await automatic_remediation.execute(incident)
case SeverityLevel.CRITICAL:
await escalate_to_noc(incident)
# โ BAD - Auto-fix critical issues
if incident.severity == SeverityLevel.CRITICAL:
await automatic_fix(incident) # Too risky!
2. Always Validate Before and After¶
# ✅ GOOD - Comprehensive validation
async def remediate():
    """Fix the issue only if it is confirmed, and roll back if the fix fails.

    Returns True when the fix was applied and verified, False otherwise.
    """
    # Validate current state
    if not await validate_incident():
        return False
    # Apply fix
    await fix_issue()
    # Validate fix worked
    if not await validate_fix():
        await rollback()
        return False
    return True


# ❌ BAD - No validation
async def remediate():
    await fix_issue()  # Hope it works
3. Design for Rollback¶
# ✅ GOOD - Keep rollback capability
# `reversible` is a parameter of RemediationRunbook.add_action, not of
# RemediationAction, so the step is registered through the runbook.
runbook = RemediationRunbook("update_config")
runbook.add_action(
    name="update_config",
    func=update_config,
    reversible=True  # Can rollback
)


# ❌ BAD - No way to undo
async def fix_issue():
    delete_old_config()  # Can't rollback
    apply_new_config()
    restart_service()
4. Track and Learn¶
# โ
GOOD - Record all activity
tracker.record_incident(incident)
result = await remediation.execute(incident)
tracker.record_remediation(incident.id, result)
# Analyze and improve
mttr = tracker.get_mttr()
print(f"MTTR: {mttr:.1f} minutes")
# โ BAD - Silent failures
await remediation.execute(incident) # No record of what happened
Production Deployment Example¶
# src/automation_engine.py
import asyncio
from netmiko import ConnectHandler
from incident_detector import PatternMatcher
from remediation import RemediationRunbook
from incident_tracker import IncidentTracker
class AutomationEngine:
    """Main orchestration for incident detection and response.

    Polls every device, turns the collected metrics into incidents, records
    them, and either runs the matching runbook or escalates.
    """

    def __init__(self, devices: "list[dict]"):
        # Each entry holds the keyword arguments for Netmiko's
        # ConnectHandler; the "host" key is used as the device name.
        self.devices = devices
        self.detector = PatternMatcher()
        self.tracker = IncidentTracker()
        # Incident name -> RemediationRunbook.
        self.runbooks = {}
        # Register patterns
        self._register_patterns()
        # Register runbooks
        self._register_runbooks()

    def _register_patterns(self):
        """Register all incident detection patterns."""
        # Function-scope import: the module header imports only
        # PatternMatcher from incident_detector.
        from incident_detector import RemediationType, SeverityLevel

        # BGP flapping
        self.detector.register_pattern(
            name="BGP Session Flapping",
            check_func=lambda m: (
                m.get("bgp_changes_hour", 0) > 10 and
                not m.get("bgp_established", True)
            ),
            severity=SeverityLevel.HIGH,
            remediation_type=RemediationType.AUTOMATIC
        )
        # ... more patterns

    def _register_runbooks(self):
        """Register remediation runbooks."""
        bgp_runbook = RemediationRunbook("BGP Session Flapping")
        # ... configure runbook
        self.runbooks["BGP Session Flapping"] = bgp_runbook

    async def check_device(self, device: dict) -> "list[Incident]":
        """Collect metrics and detect incidents.

        Args:
            device: ConnectHandler keyword arguments, including "host".

        Returns:
            Incidents detected from the device's metrics, or a single
            "Device Unreachable" incident when connecting or collecting
            metrics raised.
        """
        # Function-scope imports: these names are not imported at the top.
        from datetime import datetime
        from incident_detector import Incident, RemediationType, SeverityLevel

        def _collect_metrics():
            conn = ConnectHandler(**device)
            try:
                # NOTE(review): the three _count/_check/_get helpers are not
                # defined in this snippet; until they exist every call lands
                # in the "unreachable" branch below.
                return {
                    "reachable": True,
                    "bgp_changes_hour": self._count_bgp_changes(conn),
                    "bgp_established": self._check_bgp_established(conn),
                    "interface_errors": self._get_interface_errors(conn)
                }
            finally:
                # Close the session even when a collector raises.
                conn.disconnect()

        try:
            # Netmiko is blocking; run it off the event loop.
            metrics = await asyncio.to_thread(_collect_metrics)
        except Exception as e:
            return [Incident(
                # Include the host so incidents on different devices do not
                # share one identifier in the history.
                id=f"INC-UNREACHABLE-{device['host']}",
                name="Device Unreachable",
                severity=SeverityLevel.CRITICAL,
                device=device["host"],
                symptoms={"error": str(e)},
                detected_at=datetime.now(),
                remediation_type=RemediationType.ESCALATE,
                likely_causes=["Network down", "Device down"]
            )]
        return self.detector.detect(device["host"], metrics)

    async def _handle_incident(self, incident: "Incident") -> None:
        """Record one incident, then auto-remediate it or escalate."""
        from incident_detector import RemediationType

        print(f"\n{incident.severity.value.upper()}: {incident.name} on {incident.device}")
        self.tracker.record_incident(incident)

        # Only incidents flagged AUTOMATIC that have a registered runbook
        # are remediated; everything else goes to the NOC.
        runbook = None
        if incident.remediation_type == RemediationType.AUTOMATIC:
            runbook = self.runbooks.get(incident.name)
        if runbook is None:
            print(f" ⚠ Escalating to NOC: {incident.likely_causes}")
            return

        result = await runbook.execute(incident)
        self.tracker.record_remediation(incident.id, result)
        if not result.get("success", True):
            # A failed automatic fix still needs a human.
            print(f" ⚠ Escalating to NOC: {incident.likely_causes}")

    async def run(self, check_interval: int = 60):
        """Main event loop.

        Runs until cancelled: checks every device, handles the resulting
        incidents, then sleeps for ``check_interval`` seconds.
        """
        from datetime import datetime

        print("Starting incident response engine...")
        while True:
            print(f"\n[{datetime.now()}] Checking devices...")
            # Check all devices
            all_incidents = []
            for device in self.devices:
                all_incidents.extend(await self.check_device(device))
            # Process incidents
            for incident in all_incidents:
                await self._handle_incident(incident)
            # Wait for next check
            await asyncio.sleep(check_interval)
Summary¶
Incident automation provides:
- Detection – Pattern matching identifies problems
- Response – Automatic remediation for known issues
- History – Track what happened and why
- Learning – Improve over time based on results
- Visibility – Know MTTR and system health
Related Patterns¶
- Testing Patterns – Test remediation behaviours
- Health Checks – Detect issues early
- Circuit Breakers – Prevent cascade failures
Need help applying this in a live Cisco environment?
If you want this pattern implemented, governed, or adapted for your estate, use the contact page to start a discovery conversation or review how Nautomation Prime delivers engagements.