"From Sequential Scripts to Parallel Tasks β The Foundation of Enterprise Automation"ΒΆ
Now that you understand why Nornir matters (Tutorial #1), let's learn how to use it.
In this tutorial, we'll build your first Nornir automation. You'll take the same logic from Beginner Tutorial #3 and transform it into a parallel system that runs 10x faster.
Best part: The business logic (how to connect to devices and retrieve configs) is almost identical. Nornir just handles the parallelization automatically.
#!/usr/bin/env python3"""Absolute minimal Nornir examplePing 3 devices in parallel"""fromnornirimportInitNornirfromnornir.core.taskimportTask,Resultfromnornir_netmiko.tasksimportnetmiko_send_command@taskdefping_device(task:Task)->Result:""" Ping a device and return result This runs once per device, in parallel """device_name=task.host.nameresult=task.run(netmiko_send_command,command_string="ping 8.8.8.8 repeat 3")returnResult(host=task.host,result=result[task.host.name].result)# Initialize Nornirnr=InitNornir(config_file="nornir_config.yaml")# Run the task on all devicesresults=nr.run(task=ping_device)# Print resultsfordevice_name,result_objinresults.items():ifresult_obj.failed:print(f"β {device_name}: FAILED")else:print(f"β {device_name}: SUCCESS")
What just happened: All 3 devices ran in parallel. You can feel the difference if you time it (time python minimal_example.py on Linux/Mac, Measure-Command { python minimal_example.py } in PowerShell). With a sequential script, it would take 3x longer.
Key insight: nr.run() together with the num_workers setting in your Nornir config handled all the parallelization automatically. You focused on the logic, Nornir handled the concurrency.
Understanding Task Execution Flow in Nornir
The diagram below shows how your tasks execute in parallel:
```mermaid
flowchart TD
    Start([Nornir.run backup_config]) --> Pool["Connection Pool (up to 10 workers)"]
    Pool --> T1["Task Instance 1 Device: router1"]
    Pool --> T2["Task Instance 2 Device: switch1"]
    Pool --> T3["Task Instance 3 Device: switch2"]
    T1 --> Con1["Connect to Device"]
    T2 --> Con2["Connect to Device"]
    T3 --> Con3["Connect to Device"]
    Con1 --> Cmd1["Send Command"]
    Con2 --> Cmd2["Send Command"]
    Con3 --> Cmd3["Send Command"]
    Cmd1 --> Result1["Return Result for router1"]
    Cmd2 --> Result2["Return Result for switch1"]
    Cmd3 --> Result3["Return Result for switch2"]
    Result1 --> Aggregate["Aggregate All Results"]
    Result2 --> Aggregate
    Result3 --> Aggregate
    Aggregate --> End(["Return Combined Result Object"])
    style Pool fill:#ccffcc
    style T1 fill:#ffffcc
    style T2 fill:#ffffcc
    style T3 fill:#ffffcc
    style Aggregate fill:#ccffcc
    style End fill:#ccffcc
```
Key insight: Each task instance (T1, T2, T3) runs independently. While T1 waits for SSH, T2 and T3 are processing simultaneously.
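Nornir's default threaded runner behaves like a standard thread pool. A minimal sketch of the same timing effect using only the standard library (the fake_backup function simulates an SSH round-trip; it is not Nornir's actual internals):

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_backup(device: str) -> str:
    """Simulate a slow, I/O-bound device interaction (like SSH)."""
    time.sleep(1)  # stand-in for connect + command round-trip
    return f"{device}: ok"


devices = ["router1", "switch1", "switch2"]

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=3) as pool:
    # All three "devices" sleep concurrently, so total wall time is ~1s, not ~3s
    results = list(pool.map(fake_backup, devices))
elapsed = time.perf_counter() - start

print(results)
print(f"3 devices in ~{elapsed:.1f}s (sequential would be ~3s)")
```

Because the work is I/O-bound (waiting on the network), threads overlap almost perfectly, which is exactly why Nornir's `num_workers` gives near-linear speedups.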
```yaml
---
runner:
  plugin: threaded
  options:
    num_workers: 10  # How many tasks run in parallel
inventory:
  plugin: SimpleInventory  # Use file-based inventory
  options:
    host_file: "inventory/hosts.yaml"
    group_file: "inventory/groups.yaml"
    defaults_file: "inventory/defaults.yaml"
```
Key setting: num_workers: 10 means up to 10 devices run in parallel.
"""Nornir Tasks for Configuration BackupDescription: Parallel backup of running configs from Cisco devicesAuthor: Nautomation Prime"""fromnornir.core.taskimportTask,Resultfromnornir_netmiko.tasksimportnetmiko_send_commandimportosimportlogging# Configure logginglogging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')logger=logging.getLogger(__name__)@taskdefbackup_running_config(task:Task)->Result:""" Backup the running configuration from a device This is a Nornir task - it runs once per device, in parallel. Args: task: Nornir task object containing device info Returns: Result object with config data and metadata """device_name=task.host.namedevice_ip=task.host.hostnamedevice_type=task.host.data.get('device_type','cisco_ios')logger.info(f"Starting backup for {device_name} ({device_ip})")try:# Send the 'show running-config' command via Netmiko (handled by nornir-netmiko)result=task.run(netmiko_send_command,command_string="show running-config",use_textfsm=False,# We want raw config text, not parsedname="Retrieve running config")config=result[0].result# Extract the config from the result# Verify we got config dataifisinstance(config,str)andlen(config)>100:logger.info(f"β {device_name}: Retrieved {len(config)} bytes")returnResult(host=task.host,result={'config':config,'size':len(config),'status':'success'})else:logger.warning(f"β {device_name}: Config seems too small or invalid")returnResult(host=task.host,result={'config':None,'size':0,'status':'invalid-data'},failed=True)exceptExceptionase:logger.error(f"β {device_name}: {str(e)}")returnResult(host=task.host,result={'config':None,'size':0,'status':f'failed: {str(e)}'},failed=True)@taskdefsave_config_to_file(task:Task,config_data:dict,backup_dir:str="configs")->Result:""" Save configuration to a timestamped file Args: task: Nornir task object config_data: Dictionary with config, size, status from previous task backup_dir: Directory to save configs Returns: Result object with 
file info """device_name=task.host.nametry:# Create config directory if it doesn't existos.makedirs(backup_dir,exist_ok=True)# Only save if we have valid configifconfig_data.get('status')!='success':logger.warning(f"β {device_name}: Skipping save (status: {config_data.get('status')})")returnResult(host=task.host,result={'filename':None,'path':None,'status':config_data.get('status')},failed=True)# Create safe filenamesafe_name=device_name.replace('.','-')filename=f"{safe_name}_running-config.txt"filepath=os.path.join(backup_dir,filename)# Write config to filewithopen(filepath,'w')asf:f.write(config_data['config'])file_size=os.path.getsize(filepath)logger.info(f"β {device_name}: Saved to {filepath} ({file_size:,} bytes)")returnResult(host=task.host,result={'filename':filename,'path':filepath,'size':file_size,'status':'success'})exceptExceptionase:logger.error(f"β {device_name}: File save failed - {str(e)}")returnResult(host=task.host,result={'filename':None,'path':None,'status':f'save-failed: {str(e)}'},failed=True)
"""Main script to run Nornir backup tasks"""importosimportsysimportgetpassfromdatetimeimportdatetimefromnornirimportInitNornirfromnornir_utils.plugins.functionsimportprint_resultfromtasks.backupimportbackup_running_config,save_config_to_filedefmain():"""Main function to orchestrate backup"""print("="*70)print("Nornir Multi-Device Configuration Backup")print("="*70)# Get passworddevice_password=getpass.getpass('Enter device password: ')try:# Initialize Nornir from config filenornir=InitNornir(config_file="nornir_config.yaml")# Update passwords in inventoryforhostinnornir.inventory.hosts.values():host.password=device_passwordprint(f"\nβ Loaded {len(nornir.inventory.hosts)} device(s) from inventory\n")# Create timestamped backup directorytimestamp=datetime.now().strftime('%Y%m%d_%H%M%S')backup_dir=f"configs/{timestamp}"os.makedirs(backup_dir,exist_ok=True)print(f"β Created backup directory: {backup_dir}\n")# Run backup task on all devices in parallelprint(f"{'='*70}")print("Executing parallel backups...\n")# Task 1: Retrieve configs from all devices (in parallel)backup_results=nornir.run(task=backup_running_config,name="Backup Running Configs")# Task 2: Process results and save configsprint(f"\n{'='*70}")print("Processing results and saving to files...\n")file_results=nornir.run(task=save_config_to_file,config_data={host_name:backup_results[host_name][0].resultforhost_nameinbackup_results.keys()},backup_dir=backup_dir)# Summaryprint(f"\n{'='*70}")print("BACKUP SUMMARY")print(f"{'='*70}")print(f"Backup Location: {backup_dir}")# Count successes and failuressuccessful=sum(1forresultinbackup_results.values()ifresult[0].result.get('status')=='success')failed=len(backup_results)-successfulprint(f"Successful Backups: {successful}/{len(nornir.inventory.hosts)}")print(f"Failed Backups: {failed}/{len(nornir.inventory.hosts)}")print(f"{'='*70}")# Show backed-up devicesifsuccessful>0:print("\nBacked Up 
Devices:")forhost_name,resultinbackup_results.items():ifresult[0].result.get('status')=='success':size=result[0].result.get('size',0)print(f" β {host_name}: {size:,} bytes")iffailed>0:print("\nFailed Devices:")forhost_name,resultinbackup_results.items():ifresult[0].result.get('status')!='success':status=result[0].result.get('status','unknown')print(f" β {host_name}: {status}")print()exceptExceptionase:print(f"β Error: {str(e)}")sys.exit(1)if__name__=="__main__":main()
```yaml
---
my-router1:
  hostname: 10.1.1.1
  groups:
    - ios_devices
  data:
    device_type: cisco_ios
my-router2:
  hostname: 10.1.1.2
  groups:
    - ios_devices
  data:
    device_type: cisco_ios
# Add more devices here...
```
```python
task.host.name      # Device name from inventory (e.g., "router1")
task.host.hostname  # Device IP/DNS (e.g., "10.1.1.1")
task.host.username  # Device username
task.host.password  # Device password
task.host.data      # Custom data from inventory
task.host.groups    # Groups this device belongs to
```
Usage: Access device info like normal Python attributes.
```python
# In main.py
for host in nornir.inventory.hosts.values():
    if "ios" in [g.name for g in host.groups]:
        host.password = ios_password
    elif "nxos" in [g.name for g in host.groups]:
        host.password = nxos_password
```
```bash
# Check max open files (Linux/Mac)
ulimit -n

# Typical defaults range from ~256-1024
# If ulimit -n is 1024, keep num_workers <= 100
# Windows: no ulimit equivalent; start with 10-20 workers and increase slowly
```
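On Unix you can read the same limit from Python and derive a conservative worker cap before launching a big run. The divide-by-10 heuristic below is an assumption for illustration, not a Nornir rule:

```python
import resource  # Unix only; on Windows just use a fixed conservative cap

# Soft limit on open file descriptors for this process (what `ulimit -n` shows)
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)

# Heuristic: keep workers well under the descriptor limit, since each
# SSH session needs at least one socket plus log/backup file handles
suggested_workers = min(100, max(1, soft // 10))
print(f"Open-file soft limit: {soft} -> try num_workers <= {suggested_workers}")
```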
Solution: Reduce num_workers in nornir_config.yaml:
#!/usr/bin/env python3"""Test that Nornir can load your inventory"""fromnornirimportInitNornirnr=InitNornir(config_file="nornir_config.yaml")print(f"β Loaded {len(nr.inventory.hosts)} devices:")forhostinnr.inventory.hosts.values():print(f" - {host.name}: {host.hostname}")
#!/usr/bin/env python3"""Test SSH connectivity to each device"""fromnornirimportInitNornirfromnornir.core.taskimportTask,Result@taskdeftest_connectivity(task:Task)->Result:"""Try to connect to device, return success/failure"""try:# This triggers a connection attempttask.host.get_connection("netmiko",task.nornir.config)returnResult(host=task.host,result="β Connected")exceptExceptionase:returnResult(host=task.host,result=f"β Failed: {str(e)}",failed=True)nr=InitNornir(config_file="nornir_config.yaml")results=nr.run(task=test_connectivity)# Summarypassed=sum(1forrinresults.values()ifnotr.failed)failed=sum(1forrinresults.values()ifr.failed)print(f"\nβ Passed: {passed}/{len(results)}")iffailed>0:print(f"β Failed: {failed}/{len(results)}")print("\nFailed devices:")fordevice,resultinresults.items():ifresult.failed:print(f" - {device}: {result[device].result}")
#!/usr/bin/env python3"""Test that you can run real commands"""fromnornirimportInitNornirfromnornir.core.taskimportTask,Resultfromnornir_netmiko.tasksimportnetmiko_send_command@taskdeftest_commands(task:Task)->Result:"""Run a simple show command"""result=task.run(netmiko_send_command,command_string="show version | include Cisco")output=result[task.host.name].resultreturnResult(host=task.host,result=output)nr=InitNornir(config_file="nornir_config.yaml")results=nr.run(task=test_commands)fordevice,result_objinresults.items():output=result_obj[device].result[:50]+"..."# First 50 charsprint(f"{device}: {output}")
```python
import os

from nornir import InitNornir

# Password from environment (safer in CI/CD)
nr = InitNornir(config_file="nornir_config.yaml")

# Inject credentials from secure source
for host in nr.inventory.hosts.values():
    host.password = os.environ.get("DEVICE_PASSWORD")
```
```yaml
runner:
  plugin: threaded
  options:
    num_workers: 20  # Conservative, increase if needed
```
Rule of thumb: Start with 10, then increase in small steps while watching runtime and failures. If runtime keeps dropping without more timeouts, you can add workers. If timeouts or failures rise, you have exceeded what the network/devices can handle, so back off.
Example:
10 workers: 8 min, 0 failures
20 workers: 4 min, 0 failures
30 workers: 3.5 min, 5 timeouts
Result: Use 20 workers. The speedup from 30 is small and reliability drops.
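The tuning decision above can be captured in a tiny helper: among the trials that stay within your failure budget, pick the fastest. The pick_workers function is hypothetical, not part of Nornir:

```python
def pick_workers(trials: list[tuple[int, float, int]], max_failures: int = 0) -> int:
    """
    Pick a worker count from (workers, runtime_minutes, failures) trials:
    the fastest run that stays within the failure budget.
    """
    ok = [t for t in trials if t[2] <= max_failures]
    if not ok:
        raise ValueError("no trial met the failure budget")
    # Fastest acceptable trial wins
    return min(ok, key=lambda t: t[1])[0]


# The measurements from the example above
trials = [(10, 8.0, 0), (20, 4.0, 0), (30, 3.5, 5)]
print(pick_workers(trials))  # 30-worker trial is excluded by its 5 timeouts
```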
```python
results = nr.run(task=backup_config)  # All devices run, failures isolated

# Check results individually
for device, result_obj in results.items():
    if result_obj.failed:
        print(f"Device {device} failed, but others still completed")
```
Lesson: Framework-level error isolation is huge for enterprise reliability.
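A sketch of the same isolation pattern with a plain thread pool (the backup function and device names are made up; Nornir does this bookkeeping for you internally):

```python
from concurrent.futures import ThreadPoolExecutor


def backup(device: str) -> str:
    if device == "switch1":
        raise ConnectionError("SSH timeout")  # simulate one broken device
    return "config saved"


devices = ["router1", "switch1", "switch2"]
results = {}
with ThreadPoolExecutor(max_workers=3) as pool:
    futures = {d: pool.submit(backup, d) for d in devices}
    for device, fut in futures.items():
        try:
            # Exceptions surface here, per device, instead of crashing the run
            results[device] = {"failed": False, "result": fut.result()}
        except Exception as e:
            results[device] = {"failed": True, "result": str(e)}

for device, r in results.items():
    print(f"{device}: {'FAILED' if r['failed'] else 'OK'}")
```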
Gotcha 4: Logs Are Messy in a Parallel Environment
```python
def backup_config(task: Task) -> Result:
    # ... get config ...
    result = Result(host=task.host, result={'config': config})
    # Don't try to modify after return
    result.result['timestamp'] = time.time()  # Too late!
```
```python
def backup_config(task: Task) -> Result:
    # ... get config ...
    # Include everything BEFORE returning
    result_data = {
        'config': config,
        'timestamp': time.time(),
        'size': len(config)
    }
    return Result(host=task.host, result=result_data)
```
```bash
# Copy to .env and fill in your values
# NEVER commit .env to git!
DEVICE_USERNAME=admin
DEVICE_PASSWORD=your_password_here
NORNIR_NUM_WORKERS=10
LOG_LEVEL=INFO

# Optional: For advanced patterns
NETBOX_URL=https://netbox.example.com/api/
NETBOX_TOKEN=your_netbox_token
```
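A sketch of how a script might consume these variables once they are exported (the load_settings helper and its defaults are illustrative, not part of Nornir):

```python
import os


def load_settings() -> dict:
    """Collect automation settings from environment variables (e.g. exported from .env)."""
    return {
        "username": os.environ.get("DEVICE_USERNAME", "admin"),
        "password": os.environ.get("DEVICE_PASSWORD"),  # no default on purpose
        "num_workers": int(os.environ.get("NORNIR_NUM_WORKERS", "10")),
        "log_level": os.environ.get("LOG_LEVEL", "INFO"),
    }


settings = load_settings()
print(settings["num_workers"], settings["log_level"])
```

Tools like python-dotenv can load the .env file into the environment before this runs; in CI/CD the variables are usually injected by the pipeline instead.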
✓ Inventory Management – Organise devices in YAML
✓ Task Functions – Write once, Nornir runs on all devices
✓ Parallel Execution – Automatic parallelization, no threading headaches
✓ Result Aggregation – Unified results from all devices
✓ Error Handling – Failed devices don't stop the whole job
✓ Logging – Professional logging that works in parallel
✓ Performance – 5-20x speedup vs. sequential scripts