PyATS/Genie workflows are intended for Linux and macOS. If you're working from Windows, use WSL2 (recommended), a Linux VM, or a Linux container before following the examples.
# End-to-end example: open a session to one testbed device and inspect the
# structured result of a Genie parser.
import json

from pyats.topology import loader

# The testbed file supplies the device definitions and credentials.
testbed = loader.load('testbed.yaml')

# Devices are looked up by name in the testbed's device mapping.
device = testbed.devices['switch-01']

# Open the session to the device using the credentials from the testbed.
device.connect()

# parse() sends 'show vlan' to the device and returns the output converted
# into structured data by the OS-specific Genie parser.
vlan_output = device.parse('show vlan')

# Pretty-print the result as JSON to see the exact structure, so the keys
# needed to reach a given value are known before writing checks.
print(json.dumps(vlan_output, indent=2))
# Walk the parsed 'show vlan' result down to a single VLAN.
# Relies on `device` being an already-connected device object.
vlans = device.parse('show vlan')

# Layout used by this example: vlans['vlans'] maps VLAN ID strings to
# per-VLAN dicts, so two lookups reach VLAN 10, e.g.
#   {'name': 'DATA', 'status': 'active', 'interfaces': {...}}
# NOTE(review): confirm these key names against the parser schema for your OS.
vlan_table = vlans['vlans']
vlan_10 = vlan_table['10']

# VLAN name, e.g. "DATA"
print(vlan_10['name'])

# VLAN state, e.g. "active" (as opposed to suspended)
print(vlan_10['status'])

# Names of the interfaces assigned to this VLAN,
# e.g. ['Ethernet1/3', 'Ethernet1/4']
member_ports = list(vlan_10['interfaces'].keys())
print(member_ports)
# Simple validations on the parsed `vlans` dict from the previous example.
vlan_table = vlans['vlans']

# Existence check: report whether VLAN 20 is present.
print("✅ VLAN 20 found" if '20' in vlan_table else "❌ VLAN 20 NOT found")

# VLAN 10 must carry the expected name.
assert vlan_table['10']['name'] == 'DATA', "VLAN 10 name is wrong!"

# VLAN 10 must list Ethernet1/3 among its interfaces.
vlan_10_ports = vlan_table['10']['interfaces']
assert 'Ethernet1/3' in vlan_10_ports, "Ethernet1/3 not in VLAN 10!"
# Parse the routing table into structured data.
routes = device.parse('show ip route')

# The dictionary layout varies by OS. This example assumes routes['route']
# maps IP prefixes (such as "10.0.0.0/24" or "192.168.1.0/24") to route
# information: next hops, metrics and protocols.
# NOTE(review): confirm the top-level key against the parser schema for
# your platform before relying on it.
route_table = routes['route']

# Print every prefix together with its route information.
for prefix in route_table:
    route_data = route_table[prefix]
    print(f"{prefix}: {route_data}")

# Output example:
# 10.0.0.0/24: {'': [{'metric': '0', 'next_hop': {'192.168.1.1': {...}}}]}
# Parse 'show interfaces'; the result is keyed by interface name, so
# interfaces['Ethernet1/1'] holds all data for that interface.
interfaces = device.parse('show interfaces')

# Names of the interfaces whose operational status is 'up'.
interfaces_up = [
    name
    for name in interfaces
    if interfaces[name].get('oper_status') == 'up'
]
print(f"Interfaces up: {len(interfaces_up)}")
# Output example: Interfaces up: 48

# Per-interface detail report.
for iface_name, iface_data in interfaces.items():
    # .get() is used throughout so a missing key yields None (or the given
    # default) instead of raising KeyError.
    counters = iface_data.get('counters', {})
    status = iface_data.get('oper_status')   # 'up' or 'down'
    mtu = iface_data.get('mtu')              # interface MTU
    mac = iface_data.get('mac_address')      # MAC address
    errors = counters.get('in_errors', 0)    # input errors, 0 if absent
    print(f"{iface_name}: {status} (MTU:{mtu}, MAC:{mac}, Errors:{errors})")
# Validations built on `interfaces` / `interfaces_up` from the previous example.

# At least this many interfaces must be operationally up.
min_expected_up = 48
assert not len(interfaces_up) < min_expected_up, \
    f"Expected {min_expected_up} up, got {len(interfaces_up)}"

# Non-zero error counters on Ethernet1/1 point at hardware problems.
eth1_data = interfaces['Ethernet1/1']
eth1_counters = eth1_data.get('counters', {})
input_errors = eth1_counters.get('in_errors', 0)
output_errors = eth1_counters.get('out_errors', 0)

assert input_errors == 0, f"Ethernet1/1 has input errors: {input_errors}"
assert output_errors == 0, f"Ethernet1/1 has output errors: {output_errors}"
# Every required BGP neighbor must be present and established.
# `neighbors` is not defined in this snippet.
# NOTE(review): presumably a dict keyed by neighbor IP taken from a parsed
# BGP summary — confirm where it is built.
required_neighbors = ['10.0.0.1', '10.0.0.2']

for neighbor_ip in required_neighbors:
    # The neighbor has to appear in the summary at all.
    assert neighbor_ip in neighbors, \
        f"BGP neighbor {neighbor_ip} not in summary!"

    # ...and its state has to be 'up' (established).
    neighbor_entry = neighbors[neighbor_ip]
    state = neighbor_entry['state']
    assert state == 'up', \
        f"BGP neighbor {neighbor_ip} not up! State: {state}"
# List the available parser commands that mention "vlan" for one device.
from genie.libs.parser.utils.common import ParserLookup
from pyats.topology import loader

testbed = loader.load('testbed.yaml')
device = testbed.devices['switch-01']

# The lookup is built from the device's OS; its keys are command strings.
# NOTE(review): confirm ParserLookup's constructor signature against the
# installed Genie version.
lookup = ParserLookup(device.os)

# Keep only the commands containing "vlan" (case-insensitive), in sorted order.
vlan_commands = [
    command
    for command in sorted(lookup.keys())
    if 'vlan' in command.lower()
]
for command in vlan_commands:
    print(command)

# Output examples:
# show vlan
# show vlan access-log
# show vlan id
def test_access_port_compliance(device):
    """Check that the required ports are assigned to the required VLANs.

    Requirements encoded below:
      * VLAN 10 (DATA) must contain Ethernet1/1 through Ethernet1/4.
      * VLAN 20 (MGMT) must contain Ethernet1/48.

    Ports found in a VLAN beyond the required ones only produce a printed
    warning; they do not fail the test.

    Args:
        device: Connected device object exposing ``parse(command)``.

    Raises:
        AssertionError: If a required VLAN is absent from the parsed output
            or a required port is missing from its VLAN.
    """
    # Structured 'show vlan' output: VLAN IDs map to per-VLAN dicts.
    vlan_data = device.parse('show vlan')
    parsed_vlans = vlan_data['vlans']

    # Required mapping: VLAN ID -> ports that MUST be members.
    required_config = {
        '10': ['Ethernet1/1', 'Ethernet1/2', 'Ethernet1/3', 'Ethernet1/4'],
        '20': ['Ethernet1/48'],
    }

    for vlan_id, required_ports in required_config.items():
        # Fail with a descriptive message instead of a bare KeyError when
        # the VLAN is not configured on the device at all.
        assert vlan_id in parsed_vlans, \
            f"VLAN {vlan_id} not found on device!"

        # A VLAN with no member ports may have no 'interfaces' entry;
        # treat that as an empty membership rather than crashing.
        actual_ports = set(parsed_vlans[vlan_id].get('interfaces', {}))

        # Every required port must actually be a member.
        for port in required_ports:
            assert port in actual_ports, \
                f"{port} missing from VLAN {vlan_id}!"

        # Extra members are reported but tolerated.
        unexpected = actual_ports - set(required_ports)
        if unexpected:
            print(f"⚠️ Warning: Unexpected ports in VLAN {vlan_id}: {unexpected}")

    print("✅ VLAN compliance check passed")
How this works, step by step:
1. Parse `show vlan` to get the current port-to-VLAN mapping.
2. Define the required mapping (which ports SHOULD be in each VLAN).
3. For each VLAN, extract the actual ports from the parsed output.
4. Compare: the actual ports must include all required ports.
5. Warn if extra ports are found that shouldn't be there.
def test_interface_health(device, expected_up_count=48):
    """Check interface health from parsed 'show interfaces' output.

    Interfaces whose name contains 'Loopback' or 'Management' are ignored.
    For the remaining ones the test requires that:
      1. at least ``expected_up_count`` are operationally up,
      2. none reports input or output errors,
      3. none is in any state other than up.

    Args:
        device: Connected device object exposing ``parse(command)``.
        expected_up_count: Minimum number of interfaces that must be up.

    Raises:
        AssertionError: If any of the three conditions above is not met.
    """
    ignored_kinds = ('Loopback', 'Management')

    interfaces_up = 0        # data ports with oper_status == 'up'
    interfaces_errors = 0    # data ports with a non-zero error counter
    interfaces_down = []     # names of data ports that are not up

    interfaces = device.parse('show interfaces')
    for iface_name, iface_data in interfaces.items():
        # Only data ports are checked.
        if any(kind in iface_name for kind in ignored_kinds):
            continue

        # Anything other than 'up' (including a missing status) counts as down.
        if iface_data.get('oper_status') == 'up':
            interfaces_up += 1
        else:
            interfaces_down.append(iface_name)

        # Missing counters default to 0 so absent data is not flagged.
        counters = iface_data.get('counters', {})
        input_errors = counters.get('in_errors', 0)
        output_errors = counters.get('out_errors', 0)
        if input_errors > 0 or output_errors > 0:
            interfaces_errors += 1
            print(f"⚠️ {iface_name}: errors detected (in:{input_errors}, out:{output_errors})")

    # 1. Enough interfaces are up.
    assert interfaces_up >= expected_up_count, \
        f"Expected {expected_up_count} interfaces up, got {interfaces_up}"

    # 2. No interface reported errors.
    assert interfaces_errors == 0, \
        f"Found {interfaces_errors} interfaces with errors"

    # 3. No checked interface is down.
    assert not interfaces_down, \
        f"Expected all interfaces up, but these are down: {interfaces_down}"

    print(f"✅ Interface health check passed ({interfaces_up} interfaces up)")
How this works, step by step:
1. Parse `show interfaces` to get the status of every interface.
2. Skip management and loopback interfaces (they are not data ports).
3. Count the interfaces that are up.
4. Check the error counters (non-zero values indicate hardware problems).
5. Assert: the minimum number of interfaces is up, there are no errors, and no interfaces are unexpectedly down.
def test_critical_routes(device):
    """Check that every critical prefix is present in the routing table.

    Only the presence of each prefix in the parsed 'show ip route' output
    is verified; next-hop reachability is not tested here.

    Args:
        device: Connected device object exposing ``parse(command)``.

    Raises:
        AssertionError: If a required prefix is absent from the table.
    """
    routes = device.parse('show ip route')

    required_routes = [
        '10.0.0.0/8',      # Corporate network
        '192.168.0.0/16',  # Management network
        '172.16.0.0/12',   # WAN network
    ]

    # Membership is tested on the mapping itself; no intermediate list of
    # keys is needed.
    route_table = routes['route']
    for required in required_routes:
        assert required in route_table, \
            f"Critical route {required} not found in routing table!"
        print(f"✅ Route {required} exists")

    print("✅ Routing validation passed")
import pytest
from pyats.topology import loader


@pytest.fixture
def device_with_timeout(testbed):
    """Yield the 'slow_switch' device, connected with a 30-second timeout.

    Depends on a ``testbed`` fixture being available. A connection failure
    is reported as a test failure. Disconnection is always attempted
    afterwards and is best-effort: an error while disconnecting is printed,
    not raised.
    """
    device = testbed.devices['slow_switch']
    try:
        # Only the connect call is guarded by the broad handler, so the
        # yield below is not inside an ``except Exception`` scope.
        try:
            device.connect(timeout=30)  # 30-second timeout
        except Exception as e:
            pytest.fail(f"Failed to connect to device: {e}")
        yield device
    finally:
        # The connection may already be closed or may never have opened.
        # ``except Exception`` (not a bare ``except``) lets
        # KeyboardInterrupt/SystemExit propagate.
        try:
            device.disconnect()
        except Exception as exc:
            print(f"⚠️ Ignoring error during disconnect: {exc}")


def test_with_retry(device_with_timeout):
    """Parse 'show vlan', retrying up to three times if parsing raises.

    Attempts are made back-to-back with no delay between them. After the
    final failed attempt the test fails with the last error.
    """
    max_retries = 3
    for attempt in range(1, max_retries + 1):
        try:
            device_with_timeout.parse('show vlan')
        except Exception as e:
            if attempt == max_retries:
                pytest.fail(f"Failed after {max_retries} retries: {e}")
            print(f"⚠️ Retry {attempt}/{max_retries}")
        else:
            break  # Success
def test_with_graceful_fallback(device):
    """Validate routing, and BGP only when BGP data can be parsed.

    A failure to parse the BGP summary is treated as "BGP not configured"
    and does not fail the test. The routing table parse is always required.

    Args:
        device: Connected device object exposing ``parse(command)``.
    """
    # BGP is optional: any parse error just leaves bgp_data as None.
    try:
        bgp_data = device.parse('show ip bgp summary')
    except Exception:
        bgp_data = None
        print("⚠️ BGP not configured on this device")

    # Routing is mandatory.
    routes = device.parse('show ip route')
    assert routes is not None, "No routing table?"

    # Only look at neighbors when BGP data was actually returned.
    # NOTE(review): the key path and AS number '65000' are hard-coded —
    # confirm they match the parser output for your platform.
    if bgp_data:
        bgp_instance = bgp_data['device']['bgp_id']['65000']
        neighbors = bgp_instance['neighbors']
        assert len(neighbors) > 0, "BGP neighbors not established"

    print("✅ Validation passed")
def test_with_vault_credentials(device):
    """Connect without passing a password explicitly, then disconnect.

    No credentials are supplied in code; ``device.connect()`` is called with
    no arguments. Per the original example, the credentials are expected to
    be resolved by pyATS (e.g. decrypted from a vault) — confirm this
    against your testbed configuration.

    Args:
        device: Device object exposing ``connect()`` and ``disconnect()``.
    """
    # No need to pass a password here.
    device.connect()
    try:
        # ... run your tests
        pass
    finally:
        # Always release the session, even if the checks above raise.
        device.disconnect()
import pytest
from pyats.topology import loader


@pytest.fixture(scope='session')
def testbed():
    """Load the testbed from 'testbed.yaml' once per test session."""
    return loader.load('testbed.yaml')


# The mark must be spelled "parametrize"; pytest rejects the misspelling
# "parameterise" instead of generating one test per device.
@pytest.mark.parametrize('device_name', ['switch-01', 'switch-02', 'switch-03'])
def test_vlan_on_all_switches(testbed, device_name):
    """Check that VLAN 10 exists on each listed switch.

    pytest runs this test three times, once per ``device_name`` value.

    Args:
        testbed: Loaded testbed (session fixture).
        device_name: Name of the switch to check, supplied by parametrize.

    Raises:
        AssertionError: If VLAN 10 is absent from the parsed 'show vlan' data.
    """
    device = testbed.devices[device_name]
    device.connect()
    try:
        # Validate VLAN 10 exists
        vlans = device.parse('show vlan')
        assert '10' in vlans['vlans'], f"VLAN 10 missing on {device_name}!"
    finally:
        # Release the session even when the assertion or the parse fails.
        device.disconnect()
def test_consistent_vlan_config(testbed):
    """Check that all listed switches have the same set of VLAN IDs.

    The VLAN IDs of switch-01, switch-02 and switch-03 are collected from
    'show vlan' and each set is compared against switch-01's.

    Args:
        testbed: Loaded testbed whose ``devices`` mapping contains the switches.

    Raises:
        AssertionError: If any switch's VLAN ID set differs from switch-01's.
    """
    switch_names = ['switch-01', 'switch-02', 'switch-03']
    all_devices_vlans = {}

    # Collect the VLAN ID set from every switch.
    for device_name in switch_names:
        device = testbed.devices[device_name]
        device.connect()
        try:
            vlans = device.parse('show vlan')
            all_devices_vlans[device_name] = set(vlans['vlans'].keys())
        finally:
            # Do not leave a session open if parsing fails.
            device.disconnect()

    # Compare every switch against the reference switch.
    reference_vlans = all_devices_vlans['switch-01']
    for device_name, vlans in all_devices_vlans.items():
        assert vlans == reference_vlans, \
            f"{device_name} VLAN config differs from switch-01!"

    print("✅ All switches have consistent VLAN configuration")
from pyats.topology import loader
from netmiko import ConnectHandler
import pytest


@pytest.fixture
def testbed():
    """Load the testbed from 'testbed.yaml'."""
    return loader.load('testbed.yaml')


@pytest.fixture
def device(testbed):
    """Yield switch-01 connected; disconnect it once the test is done."""
    dev = testbed.devices['switch-01']
    dev.connect()
    yield dev
    dev.disconnect()


def _snapshot_state(device):
    """Return the VLAN ID set and the count of operationally-up interfaces."""
    vlan_ids = set(device.parse('show vlan')['vlans'].keys())
    interfaces_up = sum(
        1
        for iface_data in device.parse('show interfaces').values()
        if iface_data.get('oper_status') == 'up'
    )
    return {'vlans': vlan_ids, 'interfaces_up': interfaces_up}


def capture_baseline(device):
    """Capture state BEFORE automation.

    Returns:
        dict with 'vlans' (set of VLAN IDs) and 'interfaces_up' (int).
    """
    return _snapshot_state(device)


def run_automation(device):
    """Execute the automation: create VLAN 100 named AUTOMATION-TEST.

    A separate Netmiko session is opened to the address taken from
    ``device.connections.cli.ip``. The username and password below are
    placeholder literals from the example; supply real credentials from a
    secure source.
    """
    net_connect = ConnectHandler(
        device_type='cisco_ios',
        host=device.connections.cli.ip,
        username='admin',
        password='...',
    )
    try:
        # Configure new VLAN
        net_connect.send_config_set([
            'vlan 100',
            'name AUTOMATION-TEST',
        ])
    finally:
        # Close the session even if pushing the configuration fails.
        net_connect.disconnect()


def validate_automation(device, baseline):
    """Validate state AFTER automation against the captured baseline.

    Args:
        device: Connected device object exposing ``parse(command)``.
        baseline: Result of ``capture_baseline`` taken before the change.

    Returns:
        The post-change state, in the same shape as the baseline.

    Raises:
        AssertionError: If VLAN 100 is not among the newly added VLANs, or
            the number of up interfaces changed.
    """
    after = _snapshot_state(device)

    # VLAN 100 must be new relative to the baseline.
    new_vlans = after['vlans'] - baseline['vlans']
    assert '100' in new_vlans, "VLAN 100 not created!"

    # The change must not alter how many interfaces are up.
    assert after['interfaces_up'] == baseline['interfaces_up'], \
        "Interfaces changed state!"
    return after


def test_vlan_automation_with_validation(device):
    """Complete automation with before/after validation."""
    baseline = capture_baseline(device)
    print(f"Before: {len(baseline['vlans'])} VLANs, {baseline['interfaces_up']} interfaces up")

    run_automation(device)

    after = validate_automation(device, baseline)
    print(f"After: {len(after['vlans'])} VLANs, {after['interfaces_up']} interfaces up")
    print("✅ Automation validated successfully")