"From Working Scripts to Enterprise Systems โ Advanced Patterns for Real Deployments"¶
You've now built functional Nornir automation (Tutorial #2) and enterprise-grade systems (Tutorial #3). But there's still a gap between "working for your test network" and "reliable across thousands of devices managed by multiple teams."
This tutorial covers the advanced patterns used in production Nornir deployments at scale.
"""Custom Nornir inventory plugin for NetboxFetches devices from Netbox API instead of YAML files"""fromnornir.core.inventoryimport(Inventory,Group,Host,Groups,Hosts,Defaults,)importrequestsfromtypingimportAny,Dict,OptionalclassNetboxInventory:""" Fetch inventory from Netbox Credentials from environment variables """def__init__(self,nb_url:str,nb_token:str,filters:Optional[Dict[str,str]]=None,):""" Args: nb_url: Netbox API URL (e.g., https://netbox.yourcompany.com/api/) nb_token: Netbox API token filters: Query filters (e.g., {"site": "New York"}) """self.nb_url=nb_urlself.nb_token=nb_tokenself.filters=filtersor{}defload(self)->Inventory:"""Fetch devices from Netbox and return Nornir Inventory"""# Fetch devices from Netbox APIheaders={"Authorization":f"Token {self.nb_token}"}params=self.filtersresponse=requests.get(f"{self.nb_url}dcim/devices/",headers=headers,params=params)response.raise_for_status()devices=response.json()['results']# Build Nornir inventoryhosts={}groups={}defaults=Defaults()fordeviceindevices:name=device['name']ip=device.get('primary_ip',{}).get('address','').split('/')[0]device_type=device.get('device_type',{}).get('model','').lower()site=device.get('site',{}).get('name','unknown')# Determine Netmiko device type from Netbox device typeif'cat'indevice_typeor'switch'indevice_type:nornir_device_type='cisco_ios'elif'router'indevice_type:nornir_device_type='cisco_ios'elif'3850'indevice_type:nornir_device_type='cisco_ios'else:nornir_device_type='cisco_ios'# Default# Create groups if neededifsitenotingroups:groups[site]=Group(name=site)# Create hosthosts[name]=Host(name=name,hostname=ip,groups=[groups[site]],data={'device_type':nornir_device_type,'netbox_id':device['id'],'device_type_model':device_type,'serial':device.get('serial_number',''),})returnInventory(hosts=Hosts(hosts),groups=Groups(groups),defaults=defaults)# Usage in nornir_config.yaml:# inventory:# plugin: plugins.netbox_inventory.NetboxInventory# options:# nb_url: ${NETBOX_URL}# nb_token: 
${NETBOX_TOKEN}# filters:# site: "New York"
Gotcha 1A: "Token Expired" error during backup
- Root cause: Netbox token rotated while Nornir was running
- Solution: Reload inventory on each run instead of caching
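The "reload on each run" fix can be sketched as a loop that rebuilds the inventory each cycle. `run_with_fresh_inventory` and the stand-in loader below are illustrative helpers, not Nornir API; a real job would call the `NetboxInventory.load()` method from the plugin above.

```python
def run_with_fresh_inventory(load_inventory, run_task, cycles=1):
    """Rebuild the inventory at the start of every cycle instead of
    caching one Inventory for the lifetime of the process, so a rotated
    Netbox token (or a newly added device) is picked up next run."""
    results = []
    for _ in range(cycles):
        inventory = load_inventory()  # fresh API call each cycle
        results.append(run_task(inventory))
    return results


# Stand-in loader for illustration; a real job would pass something like
# lambda: NetboxInventory(nb_url, nb_token).load() here.
calls = []

def fake_loader():
    calls.append("load")
    return ["r1", "r2"]

out = run_with_fresh_inventory(fake_loader, run_task=len, cycles=2)
```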
Gotcha 1B: Missing "primary_ip" in Netbox
- Root cause: Device added to Netbox but IP not assigned
- Solution: Add fallback: ip = device.get('primary_ip', {}).get('address', device['name'])
Gotcha 1C: Device types don't map correctly
- Root cause: Netbox device type names don't match vendor expectations
- Solution: Build mapping table or use device role instead of type
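A minimal sketch of the mapping-table fix; the model names and platform strings in `PLATFORM_MAP` are illustrative examples, not a complete table:

```python
# Illustrative Netbox model -> Netmiko platform table; extend for your hardware
PLATFORM_MAP = {
    "c9300-48p": "cisco_ios",
    "ex4300-48t": "juniper_junos",
    "7280r3": "arista_eos",
}


def resolve_platform(device_type_model, default="cisco_ios"):
    """Look up the Netmiko platform for a Netbox device-type model,
    falling back to a default so one unmapped model doesn't break the
    whole inventory load."""
    return PLATFORM_MAP.get(device_type_model.lower(), default)
```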
Pattern 2: Middleware for Cross-Cutting Concerns
Middleware runs before and after each task, which makes it the right place for cross-cutting concerns: logging, metrics, validation, and alerting.
"""Nornir middleware for logging, metrics, and validation"""fromnornir.core.inventoryimportHostfromnornir.core.taskimportTask,Resultimportloggingimporttimelogger=logging.getLogger(__name__)# =====================================================================# PRE-TASK MIDDLEWARE: Validation and setup# =====================================================================defvalidate_device(task:Task)->None:""" Pre-flight check before each task Validate device is reachable """host=task.hostlogger.debug(f"[Pre-task] Validating {host.name}")# Example: Check if device credentials are setifnothost.password:raiseValueError(f"No password configured for {host.name}")# Could also do ping check, device type validation, etc.# =====================================================================# POST-TASK MIDDLEWARE: Logging and metrics# =====================================================================deflog_results(task:Task,result:Result)->None:""" Post-task logging and metrics """host=task.hoststatus="โ Success"ifnotresult.failedelse"โ Failed"logger.info(f"[Post-task] {host.name}: {status}")ifresult.failed:logger.error(f"[Error] {host.name}: {result.exception}")defalert_on_failure(task:Task,result:Result)->None:""" Alert (send Slack/email) if task fails """ifresult.failed:# Example: Send Slack notification# send_slack_alert(f"Task failed on {task.host.name}")logger.warning(f"Alert: Task failed on {task.host.name}")# =====================================================================# Using Middleware in Nornir# =====================================================================# In your main.py:fromnornir.core.taskimportTaskdefmain():nornir=InitNornir(config_file="nornir_config.yaml")# Register middleware (runs on ALL tasks)nornir.config.hooks['task_start']=[validate_device]nornir.config.hooks['task_ok']=[log_results]nornir.config.hooks['task_failed']=[log_results,alert_on_failure]# Now all tasks get pre/post processing automaticallyresults=nornir.run(task=my_task)
Pattern 3: Error Handling with Exponential Backoff
"""Resilient tasks with automatic retry logic"""importtimeimportloggingfromfunctoolsimportwrapsfromnornir.core.taskimportTask,Resultlogger=logging.getLogger(__name__)defretry_on_failure(max_retries:int=3,backoff_factor:float=2.0):""" Decorator for automatic retry with exponential backoff Usage: @task @retry_on_failure(max_retries=3, backoff_factor=2.0) def my_task(task): # This will retry 3 times if it fails """defdecorator(func):@wraps(func)defwrapper(task:Task,*args,**kwargs)->Result:host=task.hostattempt=0last_exception=Nonewhileattempt<max_retries:try:attempt+=1logger.info(f"[{host.name}] Attempt {attempt}/{max_retries}")# Execute taskresult=func(task,*args,**kwargs)ifnotresult.failed:ifattempt>1:logger.info(f"[{host.name}] Succeeded on attempt {attempt}")returnresultelse:last_exception=result.exceptionexceptExceptionase:last_exception=e# Wait before retry (exponential backoff)ifattempt<max_retries:wait_time=backoff_factor**(attempt-1)logger.warning(f"[{host.name}] Retry in {wait_time}s...")time.sleep(wait_time)# All retries failedlogger.error(f"[{host.name}] Failed after {max_retries} attempts")returnResult(host=task.host,result={'success':False,'error':str(last_exception)},failed=True)returnwrapperreturndecorator# Usage:@task@retry_on_failure(max_retries=3,backoff_factor=1.5)defresilient_backup(task:Task)->Result:# This automatically retries on failure# Waits: 1.5^0=1s, then 1.5^1=1.5s, then 1.5^2=2.25s between retriespass
Gotcha 3A: Retrying non-idempotent tasks
- Problem: If a task partially succeeds (config saved but validation failed), retry saves duplicate
- Solution: Make tasks idempotent (safe to run twice) OR track state (is this already done?)
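The state-tracking idea can be as simple as a guard that compares desired lines against the current config before pushing anything; `needs_change` is an illustrative helper, not part of Nornir:

```python
def needs_change(current_config, desired_snippet):
    """Idempotency guard: True only if some desired line is missing from
    the running config, so a retry after partial success is a no-op."""
    current_lines = {line.strip() for line in current_config.splitlines()}
    return any(
        line.strip() not in current_lines
        for line in desired_snippet.splitlines()
        if line.strip()
    )


current = "hostname r1\nntp server 10.0.0.1\n"
first_run = needs_change(current, "ntp server 10.0.0.2")   # change needed
second_run = needs_change(current, "ntp server 10.0.0.1")  # already applied
```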
Gotcha 3B: Exponential backoff is too aggressive
- Problem: Waiting 2^5=32 seconds between retries = slow job
- Solution: Use backoff_factor=1.2 (20% increase) instead of 2.0 (100% increase)
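The difference is easy to quantify. `total_backoff` below is a throwaway helper that sums the sleep times `backoff_factor ** (attempt - 1)` used by the retry decorator above:

```python
def total_backoff(factor, max_retries):
    """Total seconds slept across a retry cycle: the decorator sleeps
    factor ** (attempt - 1) after every failed attempt except the last."""
    return sum(factor ** a for a in range(max_retries - 1))


# factor 2.0 over 6 attempts: 1 + 2 + 4 + 8 + 16 = 31s of waiting
# factor 1.2 over 6 attempts: roughly 7.44s of waiting
slow = total_backoff(2.0, 6)
fast = total_backoff(1.2, 6)
```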
Gotcha 3C: Retrying won't help if issue is permanent
- Problem: Device password expired = will never work, just wastes time
- Solution: Add circuit breaker pattern (stop retrying if error is permanent)
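A minimal circuit-breaker sketch: classify the exception before sleeping, and break out of the retry loop when it signals a permanent failure. The error markers are illustrative; tune them to the messages your platforms actually emit.

```python
# Illustrative markers for errors a retry cannot fix; tune per platform
PERMANENT_ERRORS = ("authentication failed", "password expired", "unknown host")


def is_permanent(error):
    """Circuit-breaker check: True when the error will not be fixed by
    retrying, so the retry loop should fail fast instead of sleeping."""
    message = str(error).lower()
    return any(marker in message for marker in PERMANENT_ERRORS)


# Inside the retry loop from Pattern 3 you would add:
#     except Exception as e:
#         if is_permanent(e):
#             break  # stop retrying immediately
auth_err = is_permanent(ValueError("Authentication failed for r1"))
net_err = is_permanent(TimeoutError("Connection timed out"))
```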
Gotcha 5B: Command outputs differently between vendors
- Problem: show running-config vs show configuration = different formats
- Solution: Normalize output parser (strip vendor-specific headers)
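One way to sketch the normaliser, assuming a small list of vendor header patterns (the patterns shown are examples to extend, not an exhaustive set):

```python
import re

# Illustrative vendor header patterns; extend for your platforms
HEADER_PATTERNS = [
    re.compile(r"^Building configuration"),   # Cisco IOS
    re.compile(r"^Current configuration"),    # Cisco IOS
    re.compile(r"^## Last commit:"),          # Junos
    re.compile(r"^! Command: show"),          # Arista EOS
]


def normalise_config(raw):
    """Strip vendor-specific headers and blank lines so output from
    'show running-config' and 'show configuration' can be fed to the
    same parser and diffed meaningfully."""
    kept = []
    for line in raw.splitlines():
        if any(p.match(line) for p in HEADER_PATTERNS):
            continue
        if line.strip():
            kept.append(line.rstrip())
    return "\n".join(kept)


cleaned = normalise_config("Building configuration...\n\nhostname r1\n")
```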
Gotcha 5C: Not all vendors support all features
- Problem: You check for spanning-tree on a Junos router (doesn't use STP)
- Solution: Make compliance checks vendor-aware
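A vendor-aware check can be as simple as a registry keyed on platform; the check names here are illustrative:

```python
# Illustrative registry of which compliance checks apply per platform
CHECKS_BY_PLATFORM = {
    "cisco_ios": ["ntp", "logging", "spanning_tree"],
    "juniper_junos": ["ntp", "logging"],  # no STP check on Junos routers
}


def applicable_checks(platform, baseline=("ntp", "logging")):
    """Return only the checks that make sense for this vendor, so a
    Junos router is never failed for a 'missing' spanning-tree config."""
    return CHECKS_BY_PLATFORM.get(platform, list(baseline))
```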
Pattern 6: Memory Optimisation for 10k+ Devices
When managing thousands of devices, memory becomes critical:
"""Memory-efficient processing for large-scale operations"""fromnornirimportInitNornirimportgcdefbackup_large_network():"""Process 10,000+ devices without memory issues"""nornir=InitNornir(config_file="nornir_config.yaml")# Batch processing instead of loading all at oncebatch_size=100total_devices=len(nornir.inventory.hosts)foriinrange(0,total_devices,batch_size):# Process one batchdevice_names=list(nornir.inventory.hosts.keys())[i:i+batch_size]batch=nornir.filter(func=lambdah:h.nameindevice_names)results=batch.run(task=backup_config)# Process results immediately (don't accumulate)fordevice_name,resultinresults.items():save_to_database(device_name,result)# Clear memorydelresultsgc.collect()logger.info(f"Completed backup of {total_devices} devices")defsave_to_database(device_name,result):"""Stream results to database instead of holding in memory"""# Write to database immediatelyconn=sqlite3.connect("backup.db")cursor=conn.cursor()# ... save logic ...conn.close()
Gotcha 6A: Batch size is wrong
- Problem: Batch size of 1000 = memory spike again
- Solution: Start with 100, monitor memory. Formula: batch_size = available_ram_mb / (config_size_mb * 2)
Gotcha 6B: Losing progress on failure
- Problem: Batch 50 of 100 fails, entire batch lost
- Solution: Save backup_id to database immediately, mark status as "saved" even if later steps fail
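A sketch of that checkpointing idea using SQLite; the table and function names are illustrative:

```python
import sqlite3


def init_progress_db(path=":memory:"):
    """Create a tiny progress table so a crashed run can resume."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS backup_progress "
        "(device TEXT PRIMARY KEY, status TEXT)"
    )
    return conn


def mark_saved(conn, device):
    """Record success immediately, before any later step can fail."""
    conn.execute(
        "INSERT OR REPLACE INTO backup_progress VALUES (?, 'saved')",
        (device,),
    )
    conn.commit()


def pending_devices(conn, all_devices):
    """Return only the devices not yet marked 'saved' (the resume point)."""
    done = {
        row[0]
        for row in conn.execute(
            "SELECT device FROM backup_progress WHERE status = 'saved'"
        )
    }
    return [d for d in all_devices if d not in done]


# Resuming after a crash only re-runs the unfinished devices:
conn = init_progress_db()
mark_saved(conn, "r1")
remaining = pending_devices(conn, ["r1", "r2", "r3"])
```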
Gotcha 6C: Database writes become the bottleneck
- Problem: Fast backups, slow database writes = queue backs up
- Solution: Use connection pooling, batch database inserts (50 at a time), or use async DB driver
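A sketch of batched inserts with `executemany`, committing once per batch of 50 instead of once per row (the `backups` table schema is illustrative):

```python
import sqlite3


def flush_batch(conn, rows, batch_size=50):
    """Insert backup rows in batches with executemany, committing once
    per batch instead of once per row, so slow commits don't queue up
    behind fast backups."""
    cursor = conn.cursor()
    for i in range(0, len(rows), batch_size):
        cursor.executemany(
            "INSERT INTO backups (device, config) VALUES (?, ?)",
            rows[i:i + batch_size],
        )
        conn.commit()  # one commit per batch of 50


# Demo against an in-memory database:
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE backups (device TEXT, config TEXT)")
rows = [(f"r{i}", f"hostname r{i}") for i in range(120)]
flush_batch(conn, rows)
saved = conn.execute("SELECT COUNT(*) FROM backups").fetchone()[0]
```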
"""Unit tests for Nornir tasksUsing pytest and mocking"""importpytestfromunittest.mockimportMock,patch,MagicMockfromnornir.core.taskimportResultfromnornir.core.inventoryimportHost,Groupfromtasks.enterprise_backupimportbackup_config,compliance_check@pytest.fixturedefmock_host():"""Create a mock host for testing"""host=Mock(spec=Host)host.name="test-router"host.hostname="192.168.1.1"host.password="testpass"host.data={'device_type':'cisco_ios'}host.groups=[]returnhost@pytest.fixturedefmock_task(mock_host):"""Create a mock Nornir task"""fromnornir.core.taskimportTasktask=Mock(spec=Task)task.host=mock_hosttask.run=Mock()returntaskdeftest_backup_config_success(mock_task):"""Test successful config backup"""# Mock the netmiko responsetest_config="hostname test-router\n"*100# Simulated configmock_result=Mock()mock_result.result=test_configmock_task.run.return_value=[mock_result]# Call the taskresult=backup_config(mock_task)# Assertionsassertresult.result['success']==Trueassertresult.result['config']==test_configassertlen(result.result['hash'])==64# SHA256 hash lengthdeftest_backup_config_failure(mock_task):"""Test backup failure handling"""# Mock a failed connectionmock_task.run.side_effect=Exception("Connection timeout")# Call the taskresult=backup_config(mock_task)# Assertionsassertresult.failed==Trueassertresult.result['success']==Falsedeftest_compliance_check():"""Test compliance scoring"""# Create a compliant configcompliant_config=""" banner motd # Authorized Access Only # logging 10.1.1.1 enable secret 5 $1$12345... 
access-list 1 permit any ntp server 8.8.8.8 snmp-server host 10.1.1.2 """# Create a non-compliant confignon_compliant_config="hostname test-device\n"# Test with mock taskfromunittest.mockimportpatchwithpatch('sqlite3.connect'):mock_task=Mock()mock_task.host.name="test"# Example assertions (call the real function in your test suite)result_good=compliance_check(mock_task,compliant_config)result_bad=compliance_check(mock_task,non_compliant_config)assertresult_good.result['score']>result_bad.result['score']assertresult_bad.result['score']<70if__name__=="__main__":pytest.main([__file__,"-v"])
"""Profile Nornir task performance"""importcProfileimportpstatsimportiofromcontextlibimportcontextmanager@contextmanagerdefprofile_task(task_name:str):"""Context manager for profiling tasks"""profiler=cProfile.Profile()profiler.enable()try:yieldprofilerfinally:profiler.disable()# Print statss=io.StringIO()ps=pstats.Stats(profiler,stream=s).sort_stats('cumulative')ps.print_stats(20)# Top 20 functionsprint(f"\nProfile Results for {task_name}:")print(s.getvalue())# Usage:defmain():withprofile_task("backup_operation")asprofiler:nornir=InitNornir(config_file="nornir_config.yaml")results=nornir.run(task=backup_config)# Output shows slowest operations -> optimise those first
```python
"""Save benchmark results for historical comparison."""
import json
from datetime import datetime


def save_benchmark_history(metrics, filename="benchmark_history.json"):
    """Append benchmark results to a history file."""
    try:
        with open(filename, "r") as f:
            history = json.load(f)
    except FileNotFoundError:
        history = []

    metrics["timestamp"] = datetime.now().isoformat()
    history.append(metrics)

    with open(filename, "w") as f:
        json.dump(history, f, indent=2)


# In main:
benchmark_results = runner.metrics["backup_config (10 workers)"]
save_benchmark_history(benchmark_results)

# Later, compare:
# Was performance 2 weeks ago 5 devices/sec? Is it now 4 devices/sec?
# Something changed - investigate!
```
```python
# Before addition: devices: 500, duration: 50s, throughput: 10 dev/sec
# After addition:  devices: 600, duration: 55s, throughput: 10.9 dev/sec
# Analysis: throughput stayed the same -> the network is the bottleneck, not the code
```
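That comparison can be automated with a small threshold check; the function name and the 80% tolerance are illustrative choices:

```python
def throughput_regressed(history, latest, tolerance=0.8):
    """Return True when the latest throughput (devices/sec) falls below
    `tolerance` (here 80%) of the historical average."""
    if not history:
        return False
    baseline = sum(history) / len(history)
    return latest < baseline * tolerance


# Historical runs averaged ~10 dev/sec; 10.9 is fine, 4.0 is a regression
healthy = throughput_regressed([10, 10.2, 9.8], latest=10.9)
regressed = throughput_regressed([10, 10.2, 9.8], latest=4.0)
```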
Connection to PRIME Framework & Consulting Services
These advanced patterns are what enable the Implement stage of the PRIME Framework to scale:
Pragmatic: Use proven patterns, not experimental approaches
Transparent: Logging, profiling, and metrics built-in
Reliable: Error handling, retry logic, and testing ensure production readiness
This is where consulting engagements live: organisations pay for someone who knows these patterns and can architect systems correctly from the start.
The patterns in this tutorial are battle-tested in real enterprises managing thousands of devices. They exist because they solve real problems. But the best pattern is the one that fits YOUR network, YOUR team, and YOUR constraints.
Use what works. Ignore what doesn't. Build systematically.