# src/state_management.pyimportjsonfromdatetimeimportdatetimefromnetmikoimportConnectHandlerclassDeviceStateManager:"""Capture and compare device states."""def__init__(self,device):""" Initialize with a connected device. Args: device: Netmiko ConnectHandler instance """self.device=deviceself.state_history={}defcapture_state(self,state_name="current",commands=None):""" Capture current device configuration state. Args: state_name: Label for this state (e.g., "pre_change", "post_change") commands: List of commands to run (default: common show commands) Returns: dict: Device state snapshot """ifcommandsisNone:commands=["show ip route summary","show ip interface brief","show running-config | include vlan","show ip bgp summary",]state={"timestamp":datetime.utcnow().isoformat(),"device":self.device.host,"outputs":{}}forcommandincommands:try:output=self.device.send_command(command)state["outputs"][command]=outputexceptExceptionase:state["outputs"][command]=f"ERROR: {str(e)}"# Store in historyself.state_history[state_name]=statereturnstatedefcompare_states(self,state1_name,state2_name):""" Compare two captured states. Args: state1_name: Name of first state (e.g., "pre_change") state2_name: Name of second state (e.g., "post_change") Returns: dict: Differences between states """state1=self.state_history.get(state1_name)state2=self.state_history.get(state2_name)ifnotstate1ornotstate2:raiseValueError("Both states must be captured first")differences={"state1":state1_name,"state2":state2_name,"changes":{}}# Compare command outputsforcommandinstate1["outputs"]:output1=state1["outputs"].get(command,"")output2=state2["outputs"].get(command,"")ifoutput1!=output2:differences["changes"][command]={"before":output1[:200]+"..."iflen(output1)>200elseoutput1,"after":output2[:200]+"..."iflen(output2)>200elseoutput2,}returndifferencesdefsave_state_to_file(self,state_name,filename):""" Save state snapshot to file for recovery. Args: state_name: Name of state to save filename: File path """state=self.state_history.get(state_name)ifnotstate:raiseValueError(f"State '{state_name}' not found")withopen(filename,'w')asf:json.dump(state,f,indent=2)print(f"✓ State saved to {filename}")defload_state_from_file(self,filename):"""Load previously saved state."""withopen(filename,'r')asf:state=json.load(f)self.state_history["loaded"]=statereturnstate
# src/validation.pyfromdataclassesimportdataclassfromtypingimportCallable,Any,List@dataclassclassValidationResult:"""Result of a validation check."""passed:boolmessage:strdetails:dict=NoneclassDeviceValidator:"""Validate device state and trigger rollback if needed."""def__init__(self,device,state_manager):""" Initialize validator. Args: device: Netmiko device state_manager: DeviceStateManager instance """self.device=deviceself.state_manager=state_managerself.validators=[]defadd_validator(self,name:str,validation_func:Callable):""" Register a validation function. Args: name: Validator name validation_func: Function that returns ValidationResult """self.validators.append((name,validation_func))defvalidate_all(self)->List[ValidationResult]:""" Run all registered validators. Returns: list: Validation results """results=[]forname,validator_funcinself.validators:try:result=validator_func()results.append(result)status="✓"ifresult.passedelse"✗"print(f"{status}{name}: {result.message}")exceptExceptionase:result=ValidationResult(passed=False,message=f"Validator failed: {str(e)}",details={"error":str(e)})results.append(result)returnresultsdefall_passed(self,results:List[ValidationResult])->bool:"""Check if all validations passed."""returnall(r.passedforrinresults)# Example validators for common checksdefvalidate_no_config_errors(device)->ValidationResult:"""Check device has no syntax errors."""output=device.send_command("show running-config")if"ERROR"inoutputor"invalid"inoutput.lower():returnValidationResult(passed=False,message="Configuration contains errors")returnValidationResult(passed=True,message="No configuration errors detected")defvalidate_interfaces_up(device,min_interfaces=2)->ValidationResult:"""Check minimum number of interfaces are up."""output=device.send_command("show ip interface brief")# Count 'up' interfacesup_count=output.count(" UP ")ifup_count<min_interfaces:returnValidationResult(passed=False,message=f"Only {up_count} interfaces up (expected {min_interfaces})",details={"up_interfaces":up_count,"required":min_interfaces})returnValidationResult(passed=True,message=f"{up_count} interfaces up (required {min_interfaces})")defvalidate_bgp_neighbors_established(device,min_neighbors=1)->ValidationResult:"""Check BGP neighbors are established."""output=device.send_command("show ip bgp summary")# Count 'Established' neighborsestablished=output.count("Established")ifestablished<min_neighbors:returnValidationResult(passed=False,message=f"Only {established} BGP neighbors established (expected {min_neighbors})")returnValidationResult(passed=True,message=f"{established} BGP neighbors established")defvalidate_vlans_created(device,expected_vlans:List[int])->ValidationResult:"""Check expected VLANs exist."""output=device.send_command("show vlan brief")missing=[]forvlan_idinexpected_vlans:iff"VLAN{vlan_id:04d}"notinoutput:missing.append(vlan_id)ifmissing:returnValidationResult(passed=False,message=f"VLANs missing: {missing}",details={"missing_vlans":missing})returnValidationResult(passed=True,message=f"All {len(expected_vlans)} expected VLANs created")
# src/rollback.pyfromstate_managementimportDeviceStateManagerfromvalidationimportDeviceValidatorclassRollbackManager:"""Manage configuration rollback."""def__init__(self,device):"""Initialize rollback manager."""self.device=deviceself.state_manager=DeviceStateManager(device)self.rollback_config=Nonedefsafe_config_deploy(self,change_func,validators:list)->bool:""" Apply configuration changes with automatic rollback on failure. Args: change_func: Function that applies changes (takes device as arg) validators: List of (validator_name, validator_func) tuples Returns: bool: True if successful, False if rolled back """print("Step 1: Capturing pre-change configuration...")self.state_manager.capture_state("pre_change")self.state_manager.save_state_to_file("pre_change","/tmp/pre_change.json")print("\nStep 2: Applying configuration changes...")try:change_func(self.device)exceptExceptionase:print(f"✗ Configuration deployment failed: {str(e)}")print("Performing emergency rollback...")self._rollback_to_saved_state()returnFalseprint("\nStep 3: Capturing post-change configuration...")self.state_manager.capture_state("post_change")print("\nStep 4: Validating changes...")validator=DeviceValidator(self.device,self.state_manager)# Register all validatorsforvalidator_name,validator_funcinvalidators:validator.add_validator(validator_name,lambdavf=validator_func:vf(self.device))results=validator.validate_all()ifnotvalidator.all_passed(results):print("\n✗ Post-deployment validation FAILED")print("Performing automatic rollback...")self._rollback_to_saved_state()returnFalseprint("\n✓ All validations PASSED")print("Configuration change successful!")returnTruedef_rollback_to_saved_state(self):"""Rollback to pre-change state."""print("Loading pre-change configuration...")# Get running config from before change# This is device-specific; example for Cisco IOS:pre_state=self.state_manager.state_history.get("pre_change")ifnotpre_state:print("✗ ERROR: No pre-change state available!")return# For full rollback, reload from backup or use NVRAMprint("Reloading from startup configuration...")self.device.send_command("reload")self.device.send_command("yes")# Confirm reload# Wait for reload (in production, use proper wait logic)importtimetime.sleep(60)print("✓ Rollback to pre-change state...")
# ✅ GOODstate_mgr.capture_state("pre_change")make_changes()state_mgr.capture_state("post_change")# ❌ BAD - No pre-state to rollback tomake_changes()state_mgr.capture_state("post_change")
# ✅ GOOD - Specific, testable conditiondefvalidate_vlan_100_exists(device):output=device.send_command("show vlan id 100")if"VLAN0100"inoutput:returnValidationResult(passed=True,message="VLAN 100 exists")returnValidationResult(passed=False,message="VLAN 100 missing")# ❌ BAD - Vague, unmeasurabledefvalidate_config_good(device):# "Good"? What does that mean?returnValidationResult(passed=True,message="Config looks good")
# ✅ GOOD - Deploy to 5% first, then 50%, then 100%devices=get_all_devices()deploy_to_devices(devices[0:len(devices)//20])# 5%validate_all()deploy_to_devices(devices[len(devices)//20:len(devices)//2])# 45%validate_all()deploy_to_devices(devices[len(devices)//2:])# 50%# ❌ BAD - All-or-nothing deploymentdeploy_to_devices(all_devices)
# ✅ GOOD - Rollback is straightforward# Option 1: Reload from NVRAMdevice.send_command("reload")# Option 2: Restore from backuprestore_from_backup(device)# ❌ BAD - Complex rollback that might failtry_to_undo_each_command()# Fragile!
# pytestdeftest_rollback_works(mock_device):"""Ensure rollback actually works."""rollback=RollbackManager(mock_device)# Capture staterollback.state_manager.capture_state("pre_change")# Make bad change that fails validationdefbad_change(dev):dev.send_command("shutdown all interfaces")# BAD!validators=[("interfaces_up",validate_interfaces_up)]success=rollback.safe_config_deploy(bad_change,validators)assertsuccessisFalse# Should have rolled backassertmock_device.reload.called# Verify reload was called