#!/usr/bin/env python3"""Get data from REST API and process JSON responseExample: Cisco Meraki Dashboard API"""importrequestsimportjson# API configurationAPI_KEY="your_meraki_api_key_here"ORG_ID="your_org_id_here"BASE_URL="https://api.meraki.com/api/v1"headers={'X-Cisco-Meraki-API-Key':API_KEY,'Content-Type':'application/json'}defget_networks(org_id):""" Get all networks in an organization Returns list of network dictionaries """url=f"{BASE_URL}/organizations/{org_id}/networks"try:response=requests.get(url,headers=headers)# Check if request was successfulresponse.raise_for_status()# Raises HTTPError for bad responses (4xx, 5xx)# Parse JSON responsenetworks=response.json()# .json() method automatically parses JSON responsereturnnetworksexceptrequests.exceptions.HTTPErrorase:print(f"โ HTTP Error: {e}")print(f" Response: {e.response.text}")returnNoneexceptrequests.exceptions.ConnectionError:print("โ Connection Error: Could not reach API")returnNoneexceptjson.JSONDecodeError:print("โ Invalid JSON response")returnNonedefmain():"""Main function"""print("Fetching networks from Meraki API...")networks=get_networks(ORG_ID)ifnetworks:print(f"\nโ Found {len(networks)} networks:")fornetworkinnetworks:# Access JSON fieldsprint(f"\n Network: {network['name']}")print(f" ID: {network['id']}")print(f" Type: {', '.join(network['productTypes'])}")print(f" Time Zone: {network.get('timeZone','Not set')}")# Save to file for later analysiswithopen('meraki_networks.json','w')asf:json.dump(networks,f,indent=2)print(f"\nโ Data saved to meraki_networks.json")else:print("โ Failed to retrieve networks")if__name__=="__main__":main()
#!/usr/bin/env python3"""Structured logging with JSON outputMakes logs machine-parsable for analysis"""importjsonimportloggingfromdatetimeimportdatetimeclassJSONFormatter(logging.Formatter):""" Custom logging formatter that outputs JSON Makes logs queryable by log aggregation systems """defformat(self,record):""" Format log record as JSON """log_data={'timestamp':datetime.utcnow().isoformat()+'Z','level':record.levelname,'logger':record.name,'message':record.getMessage(),'function':record.funcName,'line':record.lineno}# Add exception info if presentifrecord.exc_info:log_data['exception']=self.formatException(record.exc_info)# Add custom fields if presentifhasattr(record,'device'):log_data['device']=record.deviceifhasattr(record,'duration'):log_data['duration_ms']=record.durationreturnjson.dumps(log_data)# Configure logger with JSON formatterlogger=logging.getLogger('network_automation')handler=logging.StreamHandler()handler.setFormatter(JSONFormatter())logger.addHandler(handler)logger.setLevel(logging.INFO)defbackup_device_config(device_name,device_ip):""" Backup device configuration with structured logging """importtimestart_time=time.time()# Log startlogger.info(f"Starting backup for {device_name}",extra={'device':device_name,'ip':device_ip})try:# Simulate backup operationtime.sleep(2)# Simulate work# Calculate durationduration_ms=(time.time()-start_time)*1000# Log successlogger.info(f"Backup completed for {device_name}",extra={'device':device_name,'duration':duration_ms})returnTrueexceptExceptionase:# Log error with exception infologger.error(f"Backup failed for {device_name}: {str(e)}",extra={'device':device_name},exc_info=True)returnFalsedefmain():"""Main function"""devices=[{'name':'router1','ip':'10.1.1.1'},{'name':'router2','ip':'10.1.1.2'},{'name':'switch1','ip':'10.1.1.10'}]logger.info("Starting network backup job",extra={'device_count':len(devices)})fordeviceindevices:backup_device_config(device['name'],device['ip'])logger.info("Backup job completed")if__name__=="__main__":main()
{"timestamp":"2026-03-12T11:45:23.456789Z","level":"INFO","logger":"network_automation","message":"Starting network backup job","function":"main","line":78,"device_count":3}{"timestamp":"2026-03-12T11:45:23.457123Z","level":"INFO","logger":"network_automation","message":"Starting backup for router1","function":"backup_device_config","line":44,"device":"router1","ip":"10.1.1.1"}{"timestamp":"2026-03-12T11:45:25.458456Z","level":"INFO","logger":"network_automation","message":"Backup completed for router1","function":"backup_device_config","line":55,"device":"router1","duration":2001.5}
Why structured JSON logging:
โ Queryable โ Search by device, duration, error type
โ Machine-parsable โ Feed into Elasticsearch, Splunk, Datadog
โ Standardized โ Same format across all automation scripts
โ Aggregatable โ Combine logs from multiple sources easily
#!/usr/bin/env python3"""Store and compare device state in JSONTrack configuration drift over time"""importjsonfromdatetimeimportdatetimefromnetmikoimportConnectHandlerdefcapture_device_state(device_params):""" Capture device state and return as structured data """# Connect to deviceconnection=ConnectHandler(**device_params)# Gather device informationstate={'timestamp':datetime.utcnow().isoformat()+'Z','hostname':device_params['host'],'device_type':device_params['device_type'],'running_config':connection.send_command('show running-config'),'version':connection.send_command('show version'),'interfaces':connection.send_command('show ip interface brief'),'vlan_database':connection.send_command('show vlan brief'),'cdp_neighbors':connection.send_command('show cdp neighbors')}connection.disconnect()returnstatedefsave_device_state(device_name,state):""" Save device state to JSON file with timestamp """timestamp=datetime.now().strftime('%Y%m%d_%H%M%S')filename=f"state_{device_name}_{timestamp}.json"withopen(filename,'w')asf:json.dump(state,f,indent=2)print(f"โ State saved to {filename}")returnfilenamedefcompare_states(state1_file,state2_file):""" Compare two device states Returns differences """# Load stateswithopen(state1_file)asf:state1=json.load(f)withopen(state2_file)asf:state2=json.load(f)differences={}# Compare running configsifstate1['running_config']!=state2['running_config']:differences['running_config']='CHANGED'# Compare VLAN databasesifstate1['vlan_database']!=state2['vlan_database']:differences['vlan_database']='CHANGED'# Compare CDP neighborsifstate1['cdp_neighbors']!=state2['cdp_neighbors']:differences['cdp_neighbors']='CHANGED'returndifferencesdefmain():"""Main function"""device_params={'device_type':'cisco_ios','host':'10.1.1.1','username':'admin','password':'password'}# Capture current stateprint("Capturing device state...")state=capture_device_state(device_params)# Save to JSONfilename=save_device_state('router1',state)print(f"\nโ Device state captured and saved")print(f" Timestamp: {state['timestamp']}")print(f" Hostname: {state['hostname']}")if__name__=="__main__":main()
{"timestamp":"2026-03-12T11:45:30.123456Z","hostname":"10.1.1.1","device_type":"cisco_ios","running_config":"!\nversion 15.6\n...","version":"Cisco IOS Software, C2960 Software...","interfaces":"Interface IP-Address OK? Method Status Protocol\nGigabitEthernet0/1 192.168.1.1 YES NVRAM up up","vlan_database":"VLAN Name Status Ports\n---- -------------------------------- --------- -------------------------------\n1 default active Gi0/2, Gi0/3","cdp_neighbors":"Device ID Local Intrfce Holdtme Capability Platform Port ID\nRouter2 Gig 0/1 150 R ISR4331 Gig 0/0"}
#!/usr/bin/env python3"""Validate JSON data against a schemaEnsures API responses and config files match expected structure"""importjsonimportjsonschemafromjsonschemaimportvalidate# Define JSON schema (what structure we expect)device_schema={"type":"object","properties":{"hostname":{"type":"string"},"ip_address":{"type":"string","pattern":"^(?:[0-9]{1,3}\\.){3}[0-9]{1,3}$"# IP regex},"device_type":{"type":"string","enum":["cisco_ios","cisco_nxos","cisco_iosxr"]},"vlans":{"type":"array","items":{"type":"integer","minimum":1,"maximum":4094}},"enabled":{"type":"boolean"}},"required":["hostname","ip_address","device_type"]}defvalidate_device_data(data):""" Validate device data against schema Returns True if valid, raises exception if invalid """try:validate(instance=data,schema=device_schema)print("โ Data is valid")returnTrueexceptjsonschema.exceptions.ValidationErrorase:print(f"โ Validation error: {e.message}")print(f" Failed at: {' -> '.join(str(x)forxine.path)}")returnFalsedefmain():"""Main function"""# Valid datavalid_device={"hostname":"router1","ip_address":"10.1.1.1","device_type":"cisco_ios","vlans":[10,20,30],"enabled":True}print("Testing valid data:")validate_device_data(valid_device)# Invalid data (wrong IP format)invalid_device1={"hostname":"router2","ip_address":"invalid_ip",# โ Wrong format"device_type":"cisco_ios"}print("\nTesting invalid IP:")validate_device_data(invalid_device1)# Invalid data (missing required field)invalid_device2={"hostname":"router3",# Missing ip_address (required field)"device_type":"cisco_ios"}print("\nTesting missing required field:")validate_device_data(invalid_device2)# Invalid data (wrong VLAN range)invalid_device3={"hostname":"router4","ip_address":"10.1.1.4","device_type":"cisco_ios","vlans":[10,5000]# โ VLAN 5000 exceeds maximum}print("\nTesting invalid VLAN range:")validate_device_data(invalid_device3)if__name__=="__main__":main()
Testing valid data:
โ Data is valid
Testing invalid IP:
โ Validation error: 'invalid_ip' does not match '^(?:[0-9]{1,3}\\.){3}[0-9]{1,3}$'
Failed at: ip_address
Testing missing required field:
โ Validation error: 'ip_address' is a required property
Failed at:
Testing invalid VLAN range:
โ Validation error: 5000 is greater than the maximum of 4094
Failed at: vlans -> 1
Why schema validation matters:
โ API contract enforcement โ Ensure APIs return expected structure
โ Configuration validation โ Catch errors before deployment
โ Documentation โ Schema describes expected data structure
โ Testing โ Automated validation in CI/CD pipelines
#!/usr/bin/env python3"""Process large JSON files efficientlyUse streaming for memory-efficient parsing"""importjsonimportijson# pip install ijsondefprocess_large_json_standard(filename):""" Standard approach: Load entire file into memory Works for small files, fails for large files (>1GB) """withopen(filename)asf:data=json.load(f)# Loads entire file into memory# Process dataforitemindata:process_item(item)defprocess_large_json_streaming(filename):""" Streaming approach: Process items one at a time Memory-efficient for large files """withopen(filename,'rb')asf:# Parse items incrementallyitems=ijson.items(f,'item')# Assumes JSON arrayforiteminitems:# Process one item at a timeprocess_item(item)# Previous items are garbage collecteddefprocess_item(item):"""Process a single JSON item"""# Your processing logic herepass# Example: Process 10GB JSON file with streaming# process_large_json_streaming('huge_inventory.json')
#!/usr/bin/env python3"""JSON performance optimization tips"""importjsonimportujson# pip install ujson (faster JSON library)# Tip 1: Use ujson for faster parsingdata=ujson.loads(json_string)# ~2-4x faster than json.loads()# Tip 2: Minimize serialization round trips# Bad (serializes twice)json_str=json.dumps(data)data_again=json.loads(json_str)# Good (keep as dict until needed)data_copy=data.copy()# Tip 3: Use compact format for storagecompact=json.dumps(data,separators=(',',':'))# Smaller file size# Tip 4: Cache parsed results# Bad (parses every time)foriinrange(1000):data=json.loads(json_string)# Good (parse once, reuse)data=json.loads(json_string)foriinrange(1000):use_data(data)
importjsonimportmathdata={'value':math.nan}# This will failtry:json.dumps(data)exceptValueErrorase:print(f"Error: {e}")# Solution: Convert NaN to None or stringdata={'value':Noneifmath.isnan(data['value'])elsedata['value']}json.dumps(data)# Works
importloggingimportjson# Log as JSON for machine parsinglogger.info(json.dumps({'event':'device_backup','device':'router1','status':'success','duration_ms':2345,'config_size_bytes':45678}))
#!/usr/bin/env python3"""Complete REST API client with JSON handlingDemonstrates best practices for network automation"""importjsonimportrequestsimportloggingfromdatetimeimportdatetimeimporttime# Configure logging with JSON formatterlogging.basicConfig(level=logging.INFO,format='%(message)s')logger=logging.getLogger(__name__)classNetworkAPIClient:""" REST API client for network device management """def__init__(self,config_file='config.json'):"""Initialize client with configuration from JSON file"""# Load configurationwithopen(config_file)asf:self.config=json.load(f)self.base_url=self.config['api']['base_url']self.timeout=self.config['api']['timeout']self.retry_attempts=self.config['api']['retry_attempts']self.api_key=self.config['credentials']['api_key']# Setup sessionself.session=requests.Session()self.session.headers.update({'Content-Type':'application/json','Authorization':f'Bearer {self.api_key}'})def_make_request(self,method,endpoint,**kwargs):""" Make HTTP request with retry logic """url=f"{self.base_url}/{endpoint}"forattemptinrange(self.retry_attempts):try:response=self.session.request(method,url,timeout=self.timeout,**kwargs)response.raise_for_status()returnresponse.json()exceptrequests.exceptions.HTTPErrorase:logger.error(json.dumps({'event':'http_error','status_code':e.response.status_code,'url':url,'attempt':attempt+1}))ifattempt<self.retry_attempts-1:time.sleep(2**attempt)# Exponential backoffelse:raiseexceptrequests.exceptions.RequestExceptionase:logger.error(json.dumps({'event':'request_error','error':str(e),'url':url,'attempt':attempt+1}))ifattempt<self.retry_attempts-1:time.sleep(2**attempt)else:raisedefget_device_info(self,device_name):"""Get device information from API"""logger.info(json.dumps({'event':'get_device_info','device':device_name,'timestamp':datetime.utcnow().isoformat()}))data=self._make_request('GET',f'devices/{device_name}')returndatadefupdate_device_config(self,device_name,config):"""Update device configuration via API"""logger.info(json.dumps({'event':'update_device_config','device':device_name,'timestamp':datetime.utcnow().isoformat()}))# Prepare JSON payloadpayload={'device':device_name,'config':config,'timestamp':datetime.utcnow().isoformat()}data=self._make_request('PUT',f'devices/{device_name}/config',json=payload)returndatadefmain():"""Main function"""# Initialize API clientclient=NetworkAPIClient('config.json')# Process devices from configfordeviceinclient.config['devices']:try:# Get device infoinfo=client.get_device_info(device['name'])logger.info(json.dumps({'event':'device_info_retrieved','device':device['name'],'status':'success'}))exceptExceptionase:logger.error(json.dumps({'event':'device_info_failed','device':device['name'],'error':str(e)}))if__name__=="__main__":main()