You've written automation that connects to devices, extracts data, and makes changes.
Now consider these real-world scenarios:
A network device times out mid-operation—should you retry automatically?
A configuration change fails on one of 50 devices—how do you audit what happened?
Your API is rate-limited—how do you throttle requests without rewriting logic?
You need to measure how long each network operation takes—how do you do that without cluttering your code?
Decorators are Python's answer to these problems.
Decorators let you wrap functions with cross-cutting behaviour—retry logic, logging, error handling, rate limiting—without modifying the original function. In network automation, this pattern separates your core device logic from your operational concerns.
import functools
import time


def retry(max_attempts=3, delay=1, backoff=2, exceptions=(Exception,)):
    """Retry a function with exponential backoff.

    Args:
        max_attempts: Maximum number of attempts
        delay: Initial delay between retries (seconds)
        backoff: Multiplier for exponential backoff
        exceptions: Tuple of exceptions to catch

    Raises:
        The last caught exception if every attempt fails.

    Example:
        @retry(max_attempts=3, delay=2, backoff=2)
        def connect_to_device(host):
            return connect(host)
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            current_delay = delay
            for attempt in range(1, max_attempts + 1):
                try:
                    print(f"[{func.__name__}] Attempt {attempt}/{max_attempts}")
                    return func(*args, **kwargs)
                except exceptions as e:
                    # Last attempt: surface the failure to the caller.
                    if attempt == max_attempts:
                        print(f"[{func.__name__}] All {max_attempts} attempts failed")
                        raise
                    print(f"[{func.__name__}] Failed: {e}. Retrying in {current_delay}s...")
                    time.sleep(current_delay)
                    # Grow the wait exponentially between attempts.
                    current_delay *= backoff

        return wrapper

    return decorator
from netmiko import ConnectHandler
from netmiko import NetmikoTimeoutException


@retry(max_attempts=3, delay=2, backoff=2,
       exceptions=(NetmikoTimeoutException, OSError))
def connect_to_device(host, username, password):
    """Connect to network device with automatic retry."""
    # Connection parameters are fixed for Cisco IOS here; transient
    # timeouts/socket errors are retried by the @retry decorator above.
    params = {
        "device_type": "cisco_ios",
        "host": host,
        "username": username,
        "password": password,
        "timeout": 10,
    }
    return ConnectHandler(**params)


# Usage
try:
    device = connect_to_device("10.0.0.1", "admin", "password")
    output = device.send_command("show version")
    device.disconnect()
except Exception as e:
    print(f"Connection failed after retries: {e}")
@log_audit(action="enable_interface")
def enable_interface(device, interface):
    """Enable a network interface."""
    # Select the interface, then bring it up.
    for command in (f"interface {interface}", "no shutdown"):
        device.send_command(command)
    return True


@log_audit(action="provision_vlan")
def provision_vlan(device, vlan_id, vlan_name):
    """Create and configure a VLAN."""
    # Create the VLAN, name it, and leave VLAN configuration mode.
    for command in (f"vlan {vlan_id}", f"name {vlan_name}", "exit"):
        device.send_command(command)
    return vlan_id


# Usage
device = connect_to_device("10.0.0.1", "admin", "password")
enable_interface(device, "Gi0/0/1")
provision_vlan(device, 100, "Management")
# Without decorators—lots of repetitiontry:result1=operation_1()exceptSomeErrorase:log_error(e)notify_team(e)raisetry:result2=operation_2()exceptSomeErrorase:log_error(e)notify_team(e)raise
import functools
import logging


def handle_errors(default_return=None, notify=True, reraise=True):
    """Unified error handling for network operations.

    Args:
        default_return: What to return if error occurs (if reraise=False)
        notify: Whether to notify critical errors
        reraise: Whether to re-raise the exception after handling

    Example:
        @handle_errors(notify=True, reraise=True)
        def deploy_config(device, config):
            ...
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                error_msg = f"Error in {func.__name__}: {str(e)}"
                logging.error(error_msg)
                if notify:
                    # Send critical alert to operations team
                    send_alert(f"AUTOMATION FAILURE: {error_msg}")
                if reraise:
                    raise
                return default_return

        return wrapper

    return decorator


def send_alert(message):
    """Placeholder for actual alerting (email, Slack, PagerDuty, etc.)"""
    print(f"[ALERT] {message}")
import functools
import time
from collections import deque
from threading import Lock


class RateLimiter:
    """Thread-safe rate limiter using sliding window."""

    def __init__(self, max_calls, time_window):
        """
        Args:
            max_calls: Maximum number of calls allowed
            time_window: Time window in seconds
        """
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = deque()  # timestamps of recent calls, oldest first
        self.lock = Lock()

    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with self.lock:
                now = time.time()
                cutoff = now - self.time_window
                # Evict timestamps that have aged out of the sliding window.
                while self.calls and self.calls[0] <= cutoff:
                    self.calls.popleft()
                # At capacity: sleep until the oldest call leaves the window.
                if len(self.calls) >= self.max_calls:
                    sleep_time = self.time_window - (now - self.calls[0])
                    print(f"Rate limit reached. Waiting {sleep_time:.2f}s...")
                    time.sleep(sleep_time)
                self.calls.append(time.time())
                return func(*args, **kwargs)

        return wrapper


# Create rate limiters for different APIs
dns_limiter = RateLimiter(max_calls=100, time_window=60)
api_limiter = RateLimiter(max_calls=50, time_window=10)
# 100 requests per minute to DNS API@dns_limiterdefquery_dns_api(hostname):"""Query external DNS API."""# API call herereturnresolve_hostname(hostname)# 50 requests per 10 seconds to device API@api_limiterdeffetch_device_data(device_id):"""Fetch data from rate-limited device API."""# API call herereturnget_device_stats(device_id)# Usagedevices=["device1","device2",...,"device200"]# 200 devicesfordevice_idindevices:stats=fetch_device_data(device_id)# Automatically rate-limitedprint(stats)# Completes in 40+ seconds (respecting 50 req/10sec limit), # not crashing with rate limit errors
# NOTE(review): `perf` is assumed to be a performance-monitoring helper with
# a `.monitor` decorator and a `.report()` method, defined elsewhere — confirm.
@perf.monitor
def collect_show_version(device):
    """Collect show version from device."""
    return device.send_command("show version")


@perf.monitor
def collect_show_interfaces(device):
    """Collect interface status from device."""
    return device.send_command("show interfaces brief")


@perf.monitor
def parse_and_store(data):
    """Parse device data and store in database."""
    # parse_device_output / store_in_db are assumed helpers — TODO confirm.
    parsed = parse_device_output(data)
    store_in_db(parsed)
    return parsed


# Usage
devices = [connect_to_device(ip) for ip in device_ips]
for device in devices:
    version = collect_show_version(device)
    interfaces = collect_show_interfaces(device)
    parse_and_store(interfaces)

# Print performance report
perf.report()
# All the previous decorators: @retry, @log_audit, @handle_errors, @measure_performance
@retry(max_attempts=3, delay=2)
@log_audit(action="configure_device")
@handle_errors(notify=True, reraise=True)
@perf.monitor
def configure_device(device, config):
    """
    Pure business logic focused on the network operation.

    All infrastructure concerns are handled by decorators:
    - Retry: Automatic retry on failure
    - Log audit: Compliance logging
    - Handle errors: Unified error handling and alerting
    - Performance: Execution time tracking
    """
    # Enter config mode, push every command, then leave config mode.
    for command in ["configure terminal", *config, "end"]:
        device.send_command(command)
    return True
# GOOD: Retry is the outermost (catches all failures)@retry(max_attempts=3)@log_audit(action="deploy")defdeploy_config(device,config):pass# LESS IDEAL: Audit logging wraps retry@log_audit(action="deploy")@retry(max_attempts=3)defdeploy_config(device,config):pass# This still works, but retry failures aren't logged as "final" failures
General principle: Put decorators that handle exceptional cases (retry, error handling) on the outside, and decorators that monitor normal execution (logging, performance) on the inside.
# Rate limit API calls per device (10 sec between each device)device_api_limiter=RateLimiter(max_calls=1,time_window=10)@device_api_limiter@retry(max_attempts=2)defquery_device_api(task:Task)->Result:"""Query device API with rate limiting (one per 10 seconds)."""# This automatically throttles: process 100 devices in 1000 secondsapi_response=task.host["api_endpoint"].query_status()returnResult(host=task.host,result=api_response)# This scales safely—no rate limit errorsnr.run(task=query_device_api)
import functools
import os


def conditional_retry(condition, **retry_kwargs):
    """Apply retry only if condition is True.

    Args:
        condition: When truthy, wrap the function with ``retry``;
            otherwise the function is returned untouched.
        **retry_kwargs: Forwarded to ``retry`` (max_attempts, delay, ...).
    """

    def decorator(func):
        # Decide once, at decoration time. The previous version rebuilt the
        # retry wrapper on every single call — paying the decoration cost
        # per invocation for an identical observable result.
        if condition:
            return retry(**retry_kwargs)(func)
        return func

    return decorator


# Use environment variable to control behaviour
PRODUCTION = os.getenv("ENV") == "production"


@conditional_retry(condition=PRODUCTION, max_attempts=3, delay=2)
def configure_device(device, config):
    """In production: retry 3 times. Otherwise: fail fast."""
    pass
import os

PRODUCTION = os.getenv("ENV") == "production"
DRY_RUN = os.getenv("DRY_RUN", "false").lower() == "true"


def conditional_decorator(condition=True, decorator_func=None):
    """Conditionally apply a decorator."""

    def wrapper(func):
        # Wrap only when the condition held at decoration time.
        return decorator_func(func) if condition else func

    return wrapper


# The lambdas defer the lookup of retry/log_audit until (and unless) the
# condition is true, so non-production runs never touch them.
@conditional_decorator(condition=PRODUCTION,
                       decorator_func=lambda f: retry(max_attempts=3)(f))
@conditional_decorator(condition=PRODUCTION,
                       decorator_func=lambda f: log_audit(action="deploy")(f))
def deploy_config(device, config):
    """
    - In production: auto-retry, audit log, and execute
    - In testing: fail fast, no logging, execute
    """
    if DRY_RUN:
        print(f"[DRY RUN] Would execute: {config}")
        return True
    # Actual deployment
    device.apply_config(config)
    return True
# NOTE(review): documentation-focused illustration — the body is
# intentionally just the docstring; the full implementation appears earlier.
def retry(max_attempts=3, delay=1, backoff=2, exceptions=(Exception,)):
    """
    Retry a function with exponential backoff.

    Args:
        max_attempts (int): Maximum number of attempts. Default: 3
        delay (float): Initial delay between retries in seconds. Default: 1
        backoff (float): Multiplier for exponential backoff. Default: 2
        exceptions (tuple): Exception types to catch and retry on.
            Default: (Exception,)

    Raises:
        The last caught exception if all retries fail.

    Example:
        @retry(max_attempts=3, delay=2, backoff=1.5)
        def connect_to_device(host):
            return device_connection(host)
    """
# General rule: exceptional cases on the outside, observability on the inside# ✅ GOOD@retry(max_attempts=3)# Exception handler (outermost)@log_audit(action="deploy")# Logging (middle)@perf.monitor# Observability (innermost)defdeploy():pass# This order makes sense:# 1. Try the operation (retry wrapper is outermost)# 2. Log what happened# 3. Measure how long it took
import functools
import logging
import time
from typing import Any, Callable, Optional, Tuple, Type


def production_retry(
    max_attempts: int = 3,
    delay: float = 1,
    backoff: float = 2,
    exceptions: Tuple[Type[Exception], ...] = (Exception,),
    logger: Optional[logging.Logger] = None,
):
    """
    Production-grade retry decorator with logging and metrics.

    Args:
        max_attempts: Maximum number of attempts
        delay: Initial delay between retries
        backoff: Exponential backoff multiplier
        exceptions: Tuple of exceptions to retry on
        logger: Logger instance for recording retries (uses module logger if None)

    Raises:
        The last caught exception once all attempts are exhausted.
    """
    if logger is None:
        logger = logging.getLogger(__name__)

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            current_delay = delay
            for attempt in range(1, max_attempts + 1):
                try:
                    # Lazy %-style args: no string formatting when the
                    # level is disabled.
                    logger.debug("Attempt %d/%d for %s",
                                 attempt, max_attempts, func.__name__)
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts:
                        logger.error("Failed after %d attempts: %s",
                                     max_attempts, func.__name__,
                                     exc_info=True)
                        raise
                    logger.warning("Attempt %d failed: %s. Retrying in %ss...",
                                   attempt, str(e), current_delay)
                    time.sleep(current_delay)
                    current_delay *= backoff

        return wrapper

    return decorator
# ❌ Catches Exception but device raises Timeout (subclass)@retry(exceptions=(Exception,))defconnect():pass# ✅ Catch the specific exceptionfromnetmikoimportNetmikoTimeoutException@retry(exceptions=(NetmikoTimeoutException,))defconnect():pass
# This:@decorator_a@decorator_b@decorator_cdeffunc():pass# Is equivalent to:func=decorator_a(decorator_b(decorator_c(func)))# So decorator_c runs first (closest to func), then decorator_b, then decorator_a# If you want decorator_a to run first, put it closest to the function
# ❌ Overhead for simple, fast operations@log_audit(action="send_data")@perf.monitor@retry(max_attempts=3)defsend_packet(data):returnsocket.send(data)# Takes 1ms# For 100,000 calls, decorator overhead becomes significant# ✅ Use conditional decorators for high-frequency operations@conditional_retry(condition=nothigh_frequency_mode,max_attempts=3)@conditional_decorator(condition=nothigh_frequency_mode,decorator_func=perf.monitor)defsend_packet(data):returnsocket.send(data)