# src/circuit_breaker.py
import time
from datetime import datetime, timedelta, timezone
from enum import Enum
from threading import Lock


def _utcnow() -> datetime:
    """Return the current UTC time as a naive datetime.

    Equivalent to the deprecated ``datetime.utcnow()``; the value stays naive
    so that ``get_state()["last_failure"]`` keeps its existing format.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


class CircuitState(Enum):
    """Circuit breaker state."""

    CLOSED = "closed"  # Normal operation
    OPEN = "open"  # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if recovered


class CircuitBreakerConfig:
    """Circuit breaker configuration.

    Args:
        failure_threshold: Failure count at which the circuit opens.
        success_threshold: Successes required while HALF_OPEN to close again.
        timeout: Seconds to wait after the last failure before a trial call
            is allowed through an OPEN circuit.
        name: Label used in log output and in ``get_state()``.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        success_threshold: int = 2,
        timeout: int = 60,
        name: str = "circuit_breaker",
    ):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout = timeout
        self.name = name


class CircuitBreakerOpen(Exception):
    """Raised when circuit breaker is open."""


class CircuitBreaker:
    """
    Circuit breaker for preventing cascade failures.

    Usage:
        breaker = CircuitBreaker(CircuitBreakerConfig(failure_threshold=5))

        for device in devices:
            try:
                result = breaker.call(configure_device, device)
            except CircuitBreakerOpen:
                print(f"Skipping {device} - circuit open")
    """

    def __init__(self, config: CircuitBreakerConfig):
        self.config = config
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None
        self.lock = Lock()

    def call(self, func, *args, **kwargs):
        """
        Execute function through circuit breaker.

        Args:
            func: Callable to invoke.
            *args: Positional arguments forwarded to ``func``.
            **kwargs: Keyword arguments forwarded to ``func``.

        Returns:
            Whatever ``func`` returns.

        Raises:
            CircuitBreakerOpen: If circuit is open and the timeout has not
                elapsed yet.
            Exception: Any exception raised by ``func`` is recorded as a
                failure and re-raised unchanged.
        """
        with self.lock:
            if self.state == CircuitState.OPEN:
                if not self._should_attempt_reset():
                    raise CircuitBreakerOpen(
                        f"Circuit breaker '{self.config.name}' is OPEN"
                    )
                # Timeout elapsed: let trial calls through.
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0

        # The lock is not held while func runs, so slow calls do not
        # serialize other callers.
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._record_failure()
            raise

        # Kept outside the try so an error while recording success is not
        # miscounted as a failure of func.
        self._record_success()
        return result

    def _record_success(self):
        """Record successful call."""
        with self.lock:
            self.failure_count = 0
            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                if self.success_count >= self.config.success_threshold:
                    self.state = CircuitState.CLOSED
                    print(f"✓ Circuit '{self.config.name}' closed (recovered)")

    def _record_failure(self):
        """Record failed call."""
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = _utcnow()
            self.success_count = 0

            # A failed trial call must re-open the circuit immediately, even
            # when an earlier trial success reset failure_count to zero.
            trial_failed = self.state == CircuitState.HALF_OPEN
            if trial_failed or self.failure_count >= self.config.failure_threshold:
                self.state = CircuitState.OPEN
                print(f"✗ Circuit '{self.config.name}' opened (threshold hit)")

    def _should_attempt_reset(self) -> bool:
        """Check if enough time has passed to try resetting.

        Called with ``self.lock`` already held by ``call``.
        """
        if not self.last_failure_time:
            return True

        elapsed = (_utcnow() - self.last_failure_time).total_seconds()
        return elapsed >= self.config.timeout

    def get_state(self) -> dict:
        """Get circuit breaker state.

        Returns:
            dict with keys ``name``, ``state``, ``failures``, ``successes``
            and ``last_failure`` (ISO-8601 string, or None if no failure has
            been recorded).
        """
        with self.lock:
            return {
                "name": self.config.name,
                "state": self.state.value,
                "failures": self.failure_count,
                "successes": self.success_count,
                "last_failure": (
                    self.last_failure_time.isoformat()
                    if self.last_failure_time
                    else None
                ),
            }
# ✅ GOOD - Limited concurrency
from concurrent.futures import ThreadPoolExecutor

# The pool never runs more than 10 submissions at once; anything beyond that
# waits in the executor's internal queue until a worker frees up.
executor = ThreadPoolExecutor(max_workers=10)  # Only 10 at a time

futures = [executor.submit(configure_device, device) for device in devices]
# This automatically queues requests when 10 are running
# src/backpressure_manager.py
from concurrent.futures import ThreadPoolExecutor, as_completed

from circuit_breaker import CircuitBreaker, CircuitBreakerOpen


class BackpressureManager:
    """Manage concurrency with circuit breaker protection.

    Owns a ThreadPoolExecutor; call ``shutdown()`` (or use the instance as a
    context manager) when finished so the worker threads are released.
    """

    def __init__(self, max_workers: int = 10, circuit_breaker: CircuitBreaker = None):
        self.max_workers = max_workers
        self.breaker = circuit_breaker
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.results = self._empty_results()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.shutdown()

    @staticmethod
    def _empty_results() -> dict:
        """Return a fresh, empty results structure."""
        return {"success": [], "failed": [], "circuit_open": []}

    def shutdown(self, wait: bool = True) -> None:
        """Shut down the underlying executor.

        Args:
            wait: Passed through to ``ThreadPoolExecutor.shutdown``.
        """
        self.executor.shutdown(wait=wait)

    def execute(self, tasks: list) -> dict:
        """
        Execute tasks with backpressure and circuit breaker.

        Args:
            tasks: List of (func, args, kwargs) tuples

        Returns:
            dict: Results organized by outcome. ``success`` holds
                ``{"task_id", "result"}`` dicts, ``failed`` holds
                ``{"task_id", "error"}`` dicts and ``circuit_open`` holds
                task ids. Entries appear in completion order. The same dict
                is also stored on ``self.results``.
        """
        # Start from a clean slate: task ids restart at 0 on every call, so
        # carrying over entries from a previous call would make them ambiguous.
        results = self._empty_results()
        self.results = results

        futures = {}

        # Submit all tasks (executor will queue them)
        for task_id, (func, args, kwargs) in enumerate(tasks):
            if self.breaker is not None:
                # Wrap function with circuit breaker
                future = self.executor.submit(
                    self._execute_with_breaker, func, args, kwargs, task_id
                )
            else:
                future = self.executor.submit(func, *args, **kwargs)
            futures[future] = task_id

        # Process completions as they arrive
        for future in as_completed(futures):
            task_id = futures[future]
            try:
                result = future.result()
            except CircuitBreakerOpen:
                results["circuit_open"].append(task_id)
            except Exception as e:
                results["failed"].append({"task_id": task_id, "error": str(e)})
            else:
                results["success"].append({"task_id": task_id, "result": result})

        return results

    def _execute_with_breaker(self, func, args, kwargs, task_id):
        """Execute function through circuit breaker.

        ``task_id`` is accepted for signature compatibility and is not used.
        """
        return self.breaker.call(func, *args, **kwargs)
# Build the device names r1 .. r100 explicitly; a literal `...` inside the
# list would be the Ellipsis object, not a placeholder for the missing names.
devices = [f"r{index}" for index in range(1, 101)]
tasks = [(configure_device, (device,), {}) for device in devices]

breaker = CircuitBreaker(
    config=CircuitBreakerConfig(failure_threshold=5, name="device_config")
)
manager = BackpressureManager(
    max_workers=10,  # Only 10 SSH connections at a time
    circuit_breaker=breaker,
)

results = manager.execute(tasks)
print(f"✓ Success: {len(results['success'])}")
print(f"✗ Failed: {len(results['failed'])}")
print(f"⏭ Circuit open: {len(results['circuit_open'])}")
# src/graceful_degradation.py
class DeploymentStrategy:
    """Deployment strategy with graceful degradation."""

    @staticmethod
    def incremental_deploy(
        devices: list,
        deploy_func,
        max_failures: int = 5,
        batch_size: int = 10,
        max_failure_rate: float = 0.5,
    ) -> dict:
        """
        Deploy incrementally, stopping if failure rate is too high.

        Devices are deployed one at a time, grouped into batches for progress
        reporting. After each failed device the run is aborted when either
        the failure count reaches ``max_failures`` or the share of failures
        among the devices attempted so far exceeds ``max_failure_rate``.

        Args:
            devices: List of devices
            deploy_func: Function to deploy to device; any exception it
                raises counts as a failure for that device
            max_failures: Stop once this many failures have occurred
            batch_size: Deploy in batches of this many devices (must be >= 1)
            max_failure_rate: Stop when failed / attempted exceeds this
                fraction

        Returns:
            dict: Counters ``total``, ``success``, ``failed`` and
                ``completed_batches`` plus ``failures_detail``, a list of
                ``{"device", "error"}`` dicts.

        Raises:
            ValueError: If ``batch_size`` is less than 1.
        """
        if batch_size < 1:
            # A negative step would yield an empty range and silently deploy
            # nothing while still reporting a normal-looking result.
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")

        results = {
            "total": len(devices),
            "success": 0,
            "failed": 0,
            "completed_batches": 0,
            "failures_detail": [],
        }

        # Deploy in batches
        for batch_num, start in enumerate(range(0, len(devices), batch_size)):
            batch = devices[start:start + batch_size]
            print(f"\nBatch {batch_num+1}: Deploying {len(batch)} devices...")

            for device in batch:
                try:
                    deploy_func(device)
                except Exception as e:
                    results["failed"] += 1
                    results["failures_detail"].append(
                        {"device": device, "error": str(e)}
                    )
                else:
                    results["success"] += 1
                    # A success cannot newly trip either stop condition.
                    continue

                # Stop if failures exceed threshold
                if results["failed"] >= max_failures:
                    print(
                        f"✗ Failure threshold ({max_failures}) reached, "
                        "stopping deployment"
                    )
                    return results

                # attempted >= 1 here because this device just failed.
                attempted = results["success"] + results["failed"]
                if results["failed"] / attempted > max_failure_rate:
                    print(
                        f"✗ Failure rate >{max_failure_rate:.0%}, "
                        "stopping deployment"
                    )
                    return results

            results["completed_batches"] += 1

            # Summary after batch
            print(f"  ✓ Success: {results['success']}")
            print(f"  ✗ Failed: {results['failed']}")

        return results
# Single source of truth for the failure budget used both by the deployment
# and by the acceptance check below.
max_failures = 3

results = DeploymentStrategy.incremental_deploy(
    devices=all_devices,
    deploy_func=safe_deploy_to_device,
    max_failures=max_failures,
    batch_size=10,
)

# The deployment can abort early with fewer than max_failures failures (the
# failure-rate rule), so also require that every device was attempted.
attempted = results["success"] + results["failed"]
finished = attempted == results["total"]

if finished and results["failed"] < max_failures:
    print("✓ Deployment acceptable, continuing...")
else:
    print("✗ Too many failures, please investigate before retrying")
# ✅ GOOD - Configured for your environment
#   failure_threshold=5  -> Open after 5 successive failures
#   success_threshold=3  -> Close after 3 successes
#   timeout=300          -> 5 minute retry period
config = CircuitBreakerConfig(failure_threshold=5, success_threshold=3, timeout=300)

# ❌ BAD - Magic numbers
config = CircuitBreakerConfig(
    failure_threshold=1,  # Too aggressive
)
# ✅ GOOD - Know when circuits open
state = circuit_breaker.get_state()
if state["state"] == "open":
    # Pass the name as a logging argument so the message is only formatted
    # when a handler actually emits the record.
    logger.warning("Circuit %s is OPEN - investigate!", state["name"])
def test_circuit_opens_on_threshold(mock_device):
    """Verify circuit breaker protection works.

    The first three calls must fail with the device error itself (not with
    CircuitBreakerOpen); only the fourth call may be rejected by the breaker.
    """
    breaker = CircuitBreaker(config=CircuitBreakerConfig(failure_threshold=3))
    mock_device.send_command.side_effect = Exception("Timeout")

    # Fail 3 times
    for attempt in range(1, 4):
        with pytest.raises(Exception) as excinfo:
            breaker.call(configure_device, mock_device)
        # A bare `except: pass` would hide a circuit that opens too early.
        assert not isinstance(excinfo.value, CircuitBreakerOpen), (
            f"circuit opened prematurely on attempt {attempt}"
        )

    assert breaker.get_state()["state"] == "open"

    # Fourth call should hit open circuit
    with pytest.raises(CircuitBreakerOpen):
        breaker.call(configure_device, mock_device)