# src/task_orchestrator.py
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Set
from enum import Enum


class TaskStatus(Enum):
    """Task execution status."""

    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"


@dataclass
class Task:
    """Represents a single task in the workflow.

    Attributes:
        name: Unique task name, used as the key in the orchestrator.
        func: Callable invoked as ``func(*args, **kwargs)``.
        args: Positional arguments for ``func``.
        kwargs: Keyword arguments for ``func`` (``None`` becomes ``{}``).
        dependencies: Names of tasks that must run first (``None`` becomes ``[]``).
        status: Current execution status.
        result: Return value of ``func`` after a successful run.
        error: String form of the exception raised by ``func``, if any.
    """

    name: str
    func: Callable
    args: tuple = ()
    kwargs: Optional[dict] = None
    dependencies: Optional[List[str]] = None
    status: TaskStatus = TaskStatus.PENDING
    result: Any = None
    error: Optional[str] = None

    def __post_init__(self):
        # None is the "not provided" marker so that instances never share
        # a mutable default container.
        if self.kwargs is None:
            self.kwargs = {}
        if self.dependencies is None:
            self.dependencies = []


class TaskOrchestrator:
    """Execute tasks in dependency order."""

    def __init__(self):
        self.tasks: Dict[str, Task] = {}
        # Order used by the most recent execute() call.
        self.execution_order: List[str] = []

    def add_task(
        self,
        name: str,
        func: Callable,
        dependencies: Optional[List[str]] = None,
        **kwargs,
    ):
        """
        Add task to DAG.

        Re-using an existing name replaces the earlier task.

        Args:
            name: Task name
            func: Function to execute
            dependencies: List of task names this depends on
            **kwargs: Arguments to pass to function
        """
        # Copy the list so later mutation by the caller cannot silently
        # change the graph.
        task = Task(
            name=name,
            func=func,
            kwargs=kwargs,
            dependencies=list(dependencies or []),
        )
        self.tasks[name] = task

    def _check_dependencies_exist(self) -> None:
        """Raise KeyError naming the first dependency that is not a registered task."""
        for name, task in self.tasks.items():
            for dep in task.dependencies:
                if dep not in self.tasks:
                    raise KeyError(
                        f"Task '{name}' depends on unknown task '{dep}'"
                    )

    def validate_dag(self) -> bool:
        """Check for cycles in dependency graph.

        Returns:
            True when the graph is acyclic, False when it contains a cycle.

        Raises:
            KeyError: If a task depends on a name that was never added.
        """
        self._check_dependencies_exist()

        visited = set()
        rec_stack = set()  # nodes on the current DFS path

        def has_cycle(node):
            visited.add(node)
            rec_stack.add(node)
            for dep in self.tasks[node].dependencies:
                if dep not in visited:
                    if has_cycle(dep):
                        return True
                elif dep in rec_stack:
                    # Back edge to a node still on the path: cycle.
                    return True
            rec_stack.remove(node)
            return False

        for task_name in self.tasks:
            if task_name not in visited:
                if has_cycle(task_name):
                    return False
        return True

    def topological_sort(self) -> List[str]:
        """
        Calculate execution order using topological sort.

        Returns:
            List of task names in execution order

        Raises:
            ValueError: If the dependency graph contains a cycle.
            KeyError: If a task depends on a name that was never added.
        """
        if not self.validate_dag():
            raise ValueError("DAG contains cycle - impossible to order")

        visited = set()
        order = []

        def visit(node):
            if node in visited:
                return
            visited.add(node)
            # Visit dependencies first
            for dep in self.tasks[node].dependencies:
                visit(dep)
            order.append(node)

        for task_name in self.tasks:
            visit(task_name)
        return order

    def execute(self) -> Dict[str, Any]:
        """
        Execute all tasks in dependency order.

        A task is skipped when any of its dependencies failed or was itself
        skipped, so a failure propagates through the whole downstream chain.

        Returns:
            dict: Results for each task, keyed by task name. Each value is a
            dict with "status", "result" and "error" entries; skipped tasks
            are included with status "skipped".

        Raises:
            ValueError: If the dependency graph contains a cycle.
            KeyError: If a task depends on a name that was never added.
        """
        execution_order = self.topological_sort()
        self.execution_order = execution_order
        results = {}
        print(f"Execution order: {' → '.join(execution_order)}\n")

        blocking = (TaskStatus.FAILED, TaskStatus.SKIPPED)
        for task_name in execution_order:
            task = self.tasks[task_name]
            # Clear leftovers from a previous execute() call.
            task.result = None
            task.error = None

            # Check if any dependencies failed (or were skipped because
            # something further upstream failed).
            deps_failed = [
                dep
                for dep in task.dependencies
                if self.tasks[dep].status in blocking
            ]
            if deps_failed:
                task.status = TaskStatus.SKIPPED
                print(f"⏭ Skipping '{task_name}' (depends on failed: {deps_failed})")
            else:
                task.status = TaskStatus.RUNNING
                print(f"▶ Running '{task_name}'...")
                try:
                    task.result = task.func(*task.args, **task.kwargs)
                except Exception as e:
                    # Deliberately broad: one failing task must not abort
                    # the run; the failure is recorded and reported.
                    task.status = TaskStatus.FAILED
                    task.error = str(e)
                    print(f"✗ '{task_name}' failed: {str(e)}\n")
                else:
                    task.status = TaskStatus.SUCCESS
                    print(f"✓ '{task_name}' completed\n")

            results[task_name] = {
                "status": task.status.value,
                "result": task.result,
                "error": task.error,
            }
        return results

    def get_dependency_graph(self) -> dict:
        """Get human-readable dependency graph."""
        return {name: task.dependencies for name, task in self.tasks.items()}
# src/nornir_orchestration.py
from nornir import InitNornir
from nornir.core.filter import F
from nornir.core.task import Task, Result


def orchestrate_multisite_deployment():
    """Orchestrate deployment across multiple sites.

    Sites are deployed only after every site they depend on has deployed
    successfully. A site whose dependency failed, was skipped, is unknown,
    or is part of a dependency cycle is recorded as "skipped".

    Returns:
        dict: Maps each site name to "success", "failed" or "skipped".
    """
    # Load inventory
    nr = InitNornir(config_file="config.yaml")

    # Define sites and dependencies
    sites = {
        "hub1": {
            "devices": ["hub1-router", "hub1-switch"],
            "depends_on": [],
        },
        "site1": {
            "devices": ["site1-router"],
            "depends_on": ["hub1"],
        },
        "site2": {
            "devices": ["site2-router"],
            "depends_on": ["hub1"],
        },
    }

    deployed_sites = set()
    results = {}

    # Deploy sites in dependency order. Sites whose dependencies are not
    # resolved yet are retried on the next pass, so the result does not
    # depend on the order in which `sites` happens to be written.
    pending = list(sites)
    while pending:
        deferred = []
        progressed = False

        for site in pending:
            config = sites[site]
            depends_on = config["depends_on"]

            # A dependency that failed, was skipped, or does not exist can
            # never become available: skip instead of waiting forever.
            blocked_by = [
                dep
                for dep in depends_on
                if dep not in sites or results.get(dep) in ("failed", "skipped")
            ]
            if blocked_by:
                results[site] = "skipped"
                print(f"⏭ Skipping {site} (blocked by: {blocked_by})")
                progressed = True
                continue

            # Wait for dependencies
            if not all(dep in deployed_sites for dep in depends_on):
                print(f"⏳ Waiting for dependencies for {site}...")
                deferred.append(site)
                continue

            print(f"\nDeploying site: {site}")

            # Filter inventory to this site's devices. The `__in` lookup is
            # only understood by F objects; plain keyword arguments to
            # filter() are compared for exact equality.
            site_inventory = nr.filter(F(name__in=config["devices"]))

            # Deploy to site
            # NOTE(review): deploy_to_device is not defined in the visible
            # source; it must be defined or imported elsewhere in the module.
            site_results = site_inventory.run(task=deploy_to_device)

            # Check results. An empty result set means no device matched,
            # which must not be reported as a successful deployment.
            if site_results and not site_results.failed:
                deployed_sites.add(site)
                results[site] = "success"
                print(f"✓ {site} deployment successful")
            else:
                results[site] = "failed"
                print(f"✗ {site} deployment failed")
            progressed = True

        if not progressed:
            # Every remaining site waits on another remaining site:
            # circular dependency, nothing more can be deployed.
            for site in deferred:
                results[site] = "skipped"
                print(f"⏭ Skipping {site} (circular dependency)")
            break
        pending = deferred

    return results
# src/conditional_execution.py
class ConditionalTask:
    """A named callable guarded by a zero-argument predicate.

    Attributes:
        name: Label for the task.
        func: Callable to invoke when the task is allowed to run.
        condition: Zero-argument callable returning a truthy value when the
            task should run; a falsy/omitted value means "always run".
        dependencies: Names of tasks this one depends on (stored only;
            this class does not evaluate them).
    """

    def __init__(self, name, func, condition=None, dependencies=None):
        self.name = name
        self.func = func
        # Fall back to an always-true predicate when none is supplied.
        self.condition = condition if condition else (lambda: True)
        self.dependencies = dependencies if dependencies else []

    def should_execute(self) -> bool:
        """Evaluate the predicate and report whether the task should run."""
        verdict = self.condition()
        return verdict


# Usage
tasks = [
    ConditionalTask(
        "update_hub",
        update_hub_config,
        condition=lambda: check_if_hub_needs_update(),
    ),
    ConditionalTask(
        "rollback_hub",
        rollback_hub_config,
        condition=lambda: hub_deployment_failed(),
        dependencies=["update_hub"],
    ),
]

# Run each task in list order, skipping those whose condition is not met
for task in tasks:
    if not task.should_execute():
        continue
    task.func()
# ✅ GOOD - Dependencies skip if upstream failsifhub_setup.failed:skip_all_spoke_deployments()# ❌ BAD - Attempt spoke setup despite hub failuredeploy_spokes()# Will fail anyway, wastes time
# ✅ GOOD - Validate before executingorchestrator=TaskOrchestrator()ifnotorchestrator.validate_dag():raiseValueError("Circular dependency detected!")# ❌ BAD - Discover at runtimeorchestrator.execute()# Hangs waiting for circular dependency