# ❌ ABSOLUTELY NOT
from netmiko import ConnectHandler

device = ConnectHandler(
    device_type="cisco_ios",
    host="10.0.0.1",
    username="admin",
    password="SuperSecret123!",  # 🚨 Now in Git history forever
)
When credentials are hardcoded:
- Anyone with code access can see production passwords
- Git history stores them forever (even after `git rm`)
- They appear in logs, backups, and screenshots
- Compliance audits fail
- When they leak (and they will), you have to rotate everything
# ✅ RIGHT - Credentials come from secure vault
from netmiko import ConnectHandler
from vault import get_secret

device = ConnectHandler(
    device_type="cisco_ios",
    host="10.0.0.1",
    username=get_secret("network/devices/admin/username"),
    password=get_secret("network/devices/admin/password"),
)
Benefits:
- Code contains NO secrets
- Credentials can be rotated without code changes
- Audit trail of who accessed what
- Granular permissions (this script only gets these credentials)
flowchart TD
A[Your Script]
B[Secrets Manager]
C[Your Script with credentials]
D[Network Device]
A -->|Request credentials| B
B -->|Return encrypted credentials| C
C -->|SSH connection| D
# src/credentials.py
import os


class CredentialsNotFoundError(Exception):
    """Raised when a device's credentials are missing from the environment."""


def get_device_credentials(device_name):
    """Get credentials from environment variables.

    Reads ``DEVICE_<NAME>_USERNAME`` and ``DEVICE_<NAME>_PASSWORD``, where
    ``<NAME>`` is ``device_name`` upper-cased.

    Args:
        device_name: Device identifier (e.g., 'router1').

    Returns:
        dict: Credentials with 'username' and 'password' keys.

    Raises:
        CredentialsNotFoundError: If either variable is unset or empty.
    """
    # Build the shared variable prefix once instead of once per lookup.
    prefix = f"DEVICE_{device_name.upper()}"
    username = os.getenv(f"{prefix}_USERNAME")
    password = os.getenv(f"{prefix}_PASSWORD")

    # An empty string is treated the same as a missing variable.
    if not username or not password:
        raise CredentialsNotFoundError(
            f"Credentials for {device_name} not found in environment"
        )

    return {"username": username, "password": password}
# Set environment variables (never in code)
export DEVICE_ROUTER1_USERNAME=admin
export DEVICE_ROUTER1_PASSWORD=SecurePassword123

# Or from .env file (never committed to Git)
# .env should be in .gitignore
# Install Vault CLI
# On macOS: brew install vault
# On Linux: https://www.vaultproject.io/downloads

# Start Vault dev server (development only)
vault server -dev
# In another terminal, login
export VAULT_ADDR="http://127.0.0.1:8200"
export VAULT_TOKEN="s.xxxxxxxx"  # From dev server output

# Store a credential
# (single quotes keep an interactive shell from history-expanding the "!")
vault kv put secret/network/devices/router1 \
    username=admin \
    password='SuperSecurePassword123!'

# Retrieve it
vault kv get secret/network/devices/router1
# src/vault_client.py
import hvac
import os


# Custom exceptions
class CredentialsNotFoundError(Exception):
    """Raised when no secret exists at the requested path."""


class CredentialRetrievalError(Exception):
    """Raised when reading a secret fails for any other reason."""


class CredentialRotationError(Exception):
    """Raised when writing a new secret version fails."""


class VaultClient:
    """Wrapper for HashiCorp Vault credential retrieval."""

    def __init__(self, vault_addr=None, vault_token=None, mount_point="secret"):
        """
        Initialize Vault client.

        Args:
            vault_addr: Vault server URL (default: env var VAULT_ADDR)
            vault_token: Vault auth token (default: env var VAULT_TOKEN)
            mount_point: Name of the KV secrets engine mount (default: 'secret')

        Raises:
            ValueError: If the address or token is neither passed nor set
                in the environment.
            RuntimeError: If Vault rejects the token.
        """
        self.vault_addr = vault_addr or os.getenv("VAULT_ADDR")
        self.vault_token = vault_token or os.getenv("VAULT_TOKEN")
        self.mount_point = mount_point

        if not self.vault_addr:
            raise ValueError("VAULT_ADDR environment variable not set")
        if not self.vault_token:
            raise ValueError("VAULT_TOKEN environment variable not set")

        self.client = hvac.Client(url=self.vault_addr, token=self.vault_token)

        # Verify authentication
        if not self.client.is_authenticated():
            raise RuntimeError("Failed to authenticate with Vault")

    def _relative_path(self, secret_path):
        """Return ``secret_path`` without a leading '<mount_point>/' prefix.

        Callers use CLI-style paths such as 'secret/network/devices/router1'.
        hvac's KV methods expect the path relative to the mount and take the
        mount separately, so passing the full path would look the secret up
        under '<mount>/data/<mount>/...'.
        """
        prefix = f"{self.mount_point}/"
        if secret_path.startswith(prefix):
            return secret_path[len(prefix):]
        return secret_path

    def get_secret(self, secret_path):
        """
        Retrieve secret from Vault.

        Args:
            secret_path: Path in Vault (e.g., 'secret/network/devices/router1').
                The leading mount name is optional.

        Returns:
            dict: Secret data (username, password, etc.)

        Raises:
            CredentialsNotFoundError: If Vault reports an invalid path.
            CredentialRetrievalError: On any other failure.
        """
        try:
            response = self.client.secrets.kv.read_secret_version(
                path=self._relative_path(secret_path),
                mount_point=self.mount_point,
            )
            return response['data']['data']
        except hvac.exceptions.InvalidPath as e:
            raise CredentialsNotFoundError(
                f"Secret not found at {secret_path}"
            ) from e
        except Exception as e:
            raise CredentialRetrievalError(
                f"Failed to retrieve secret: {str(e)}"
            ) from e

    def get_device_credentials(self, device_name):
        """
        Retrieve device credentials from Vault.

        Args:
            device_name: Device identifier (e.g., 'router1')

        Returns:
            dict: Credentials with 'username' and 'password'
        """
        secret_path = f"secret/network/devices/{device_name}"
        return self.get_secret(secret_path)

    def rotate_secret(self, secret_path, new_data):
        """
        Update secret in Vault (credential rotation).

        Args:
            secret_path: Path to secret (leading mount name is optional)
            new_data: New secret data (dict)

        Raises:
            CredentialRotationError: If the write fails.
        """
        try:
            self.client.secrets.kv.create_or_update_secret(
                path=self._relative_path(secret_path),
                secret=new_data,
                mount_point=self.mount_point,
            )
        except Exception as e:
            raise CredentialRotationError(
                f"Failed to rotate credential: {str(e)}"
            ) from e
from netmiko import ConnectHandler
from vault_client import VaultClient

# Initialize Vault client (uses VAULT_ADDR and VAULT_TOKEN from env)
vault = VaultClient()

# Get credentials for a device
creds = vault.get_device_credentials("router1")
# Returns: {"username": "admin", "password": "SuperSecurePassword123!"}

# Connect to device. The context manager disconnects on exit, so the SSH
# session is closed even if a command raises.
with ConnectHandler(
    device_type="cisco_ios",
    host="10.0.0.1",
    username=creds["username"],
    password=creds["password"],
) as device:
    config = device.send_command("show running-config")
from nornir import InitNornir
from nornir.core.task import Task, Result
from vault_client import VaultClient

vault = VaultClient()


def get_device_credentials_task(task: Task) -> Result:
    """Nornir task to inject credentials from Vault.

    Looks up credentials under the host's inventory name and writes them
    onto ``task.host`` so later tasks connect with them.

    Args:
        task: The Nornir task context for one host.

    Returns:
        Result: A success message, or a failed Result carrying the error.
    """
    device_name = task.host.name

    try:
        creds = vault.get_device_credentials(device_name)

        # Inject credentials into Nornir inventory
        task.host.username = creds["username"]
        task.host.password = creds["password"]
    except Exception as e:
        # Report the failure through the Result instead of raising, and keep
        # the exception object so it is not reduced to a string.
        return Result(
            host=task.host,
            failed=True,
            exception=e,
            result=f"Failed to load credentials: {str(e)}",
        )

    return Result(
        host=task.host,
        result=f"Credentials loaded for {device_name}",
    )


# Usage
nr = InitNornir(config_file="config.yaml")
nr.run(task=get_device_credentials_task)
# src/aws_secrets.py
import boto3
import json


class CredentialsNotFoundError(Exception):
    """Raised when the named secret does not exist in AWS Secrets Manager."""


class CredentialRetrievalError(Exception):
    """Raised when reading a secret fails for any other reason."""


class AWSSecretsManager:
    """Retrieve credentials from AWS Secrets Manager."""

    def __init__(self, region_name="us-east-1"):
        """Initialize AWS Secrets Manager client.

        Args:
            region_name: AWS region to query (default: 'us-east-1').
        """
        self.client = boto3.client('secretsmanager', region_name=region_name)

    def get_secret(self, secret_name):
        """
        Retrieve secret from AWS Secrets Manager.

        Args:
            secret_name: Name of secret (e.g., 'network/router1')

        Returns:
            dict: Parsed secret data when the secret is stored as a JSON
                string; otherwise the raw 'SecretBinary' value.

        Raises:
            CredentialsNotFoundError: If the secret does not exist.
            CredentialRetrievalError: On any other failure, including a
                'SecretString' that is not valid JSON.
        """
        try:
            response = self.client.get_secret_value(SecretId=secret_name)

            # Parse JSON if stored as string
            if 'SecretString' in response:
                return json.loads(response['SecretString'])

            # Binary secret
            return response['SecretBinary']
        except self.client.exceptions.ResourceNotFoundException as e:
            raise CredentialsNotFoundError(
                f"Secret '{secret_name}' not found in AWS"
            ) from e
        except Exception as e:
            raise CredentialRetrievalError(str(e)) from e

    def get_device_credentials(self, device_name):
        """Retrieve device credentials from AWS.

        Args:
            device_name: Device identifier; looked up as 'network/<device_name>'.

        Returns:
            Whatever ``get_secret`` returns for that secret name.
        """
        secret_name = f"network/{device_name}"
        return self.get_secret(secret_name)
# Store credentials in AWS Secrets Manager
aws secretsmanager create-secret \
    --name network/router1 \
    --secret-string '{"username":"admin","password":"SecurePass123!"}'

# Retrieve
aws secretsmanager get-secret-value --secret-id network/router1
# src/encrypted_config.py
from cryptography.fernet import Fernet
import json
import os


class EncryptedConfig:
    """Store credentials in Fernet-encrypted JSON files."""

    def __init__(self, encryption_key=None):
        """
        Initialize with encryption key.

        Args:
            encryption_key: Fernet key as str or bytes
                (default: read from env var ENCRYPTION_KEY)

        Raises:
            ValueError: If no key is passed and ENCRYPTION_KEY is unset.
        """
        self.encryption_key = encryption_key or os.getenv("ENCRYPTION_KEY")

        if not self.encryption_key:
            raise ValueError("ENCRYPTION_KEY environment variable not set")

        # Fernet.generate_key() returns bytes, while the env var is a str.
        # Only encode the str form so both kinds of key are accepted.
        key = self.encryption_key
        if isinstance(key, str):
            key = key.encode()
        self.cipher = Fernet(key)

    def encrypt_credentials(self, credentials_dict, output_file):
        """
        Encrypt and save credentials.

        Args:
            credentials_dict: Dict with credentials (must be JSON-serializable)
            output_file: File to save encrypted data
        """
        json_str = json.dumps(credentials_dict)
        encrypted = self.cipher.encrypt(json_str.encode())

        with open(output_file, 'wb') as f:
            f.write(encrypted)

        # Even though it is encrypted, do not commit this file
        print(f"✓ Encrypted credentials saved to {output_file}")

    def decrypt_credentials(self, encrypted_file):
        """
        Decrypt and load credentials.

        Args:
            encrypted_file: File with encrypted credentials

        Returns:
            dict: Decrypted credentials
        """
        with open(encrypted_file, 'rb') as f:
            encrypted = f.read()

        decrypted = self.cipher.decrypt(encrypted)
        return json.loads(decrypted.decode())
# Generate encryption key (do this ONCE and save securely)
from cryptography.fernet import Fernet

key = Fernet.generate_key()
print(key.decode())  # Save this somewhere secure
# Set as: export ENCRYPTION_KEY="<key>"

# Encrypt credentials
from encrypted_config import EncryptedConfig

config = EncryptedConfig()
config.encrypt_credentials(
    {
        "router1": {"username": "admin", "password": "SecurePass123!"},
        "router2": {"username": "admin", "password": "SecurePass456!"},
    },
    "credentials.enc",
)

# Load encrypted credentials
creds = config.decrypt_credentials("credentials.enc")
# src/tacacs_credentials.py
import os

import paramiko
from tacacs_plus.client import TACACSClient
from tacacs_plus.flags import TAC_PLUS_AUTHEN_TYPE_PAP


class CredentialRetrievalError(Exception):
    """Raised when TACACS+ authentication or authorization fails."""


def get_tacacs_credentials(username, password, device_ip, *,
                           server=None, secret=None, port=49, timeout=5):
    """
    Authenticate with TACACS+ and get the session identifier.

    Args:
        username: Service account username
        password: Service account password
        device_ip: Target device IP
        server: TACACS+ server address
            (default: env var TACACS_SERVER, else '10.0.0.100')
        secret: TACACS+ shared secret (default: env var TACACS_SECRET)
        port: TACACS+ server port
        timeout: Socket timeout in seconds

    Returns:
        The session identifier of the authenticated TACACS+ client.

    Raises:
        CredentialRetrievalError: If the shared secret is not configured,
            or if authentication or authorization is rejected.
    """
    server = server or os.getenv("TACACS_SERVER", "10.0.0.100")
    # The shared secret is itself a credential, so it must not live in code.
    secret = secret or os.getenv("TACACS_SECRET")
    if not secret:
        raise CredentialRetrievalError(
            "TACACS_SECRET environment variable not set"
        )

    # NOTE(review): the calls below follow the published tacacs_plus API
    # (host=..., reply.valid, flags.TAC_PLUS_AUTHEN_TYPE_PAP) - confirm
    # against the installed version.
    tacacs = TACACSClient(host=server, port=port, secret=secret, timeout=timeout)

    # Authenticate service account
    authen = tacacs.authenticate(
        username,
        password,
        authen_type=TAC_PLUS_AUTHEN_TYPE_PAP,
    )
    if not authen.valid:
        raise CredentialRetrievalError("TACACS+ authentication failed")

    # Request authorization for specific device (arguments are sent as bytes)
    author = tacacs.authorize(
        username,
        arguments=[
            f"device={device_ip}".encode(),
            b"service=network",
        ],
    )
    if not author.valid:
        raise CredentialRetrievalError("TACACS+ authorization failed")

    # NOTE(review): presumably the session id is exposed on the client, not
    # on the reply object - verify before relying on this value.
    return tacacs.session_id
# tests/test_credential_management.py
import hvac
import pytest
from unittest.mock import MagicMock, patch

from src.vault_client import VaultClient, CredentialsNotFoundError


class TestVaultClient:
    """Test credential retrieval without real Vault."""

    @patch('hvac.Client')
    def test_get_secret_success(self, mock_hvac_client):
        """Successfully retrieve secret from Vault."""
        mock_client = MagicMock()
        mock_client.is_authenticated.return_value = True
        mock_client.secrets.kv.read_secret_version.return_value = {
            'data': {
                'data': {
                    'username': 'admin',
                    'password': 'secret',
                }
            }
        }
        mock_hvac_client.return_value = mock_client

        vault = VaultClient(vault_addr="http://localhost:8200", vault_token="token")
        creds = vault.get_secret("secret/network/devices/router1")

        assert creds['username'] == 'admin'
        assert creds['password'] == 'secret'

    @patch('hvac.Client')
    def test_get_secret_not_found(self, mock_hvac_client):
        """Handle missing secret gracefully."""
        mock_client = MagicMock()
        mock_client.is_authenticated.return_value = True
        # Raise the real hvac exception: a plain Exception would be mapped to
        # CredentialRetrievalError and this test could never pass.
        mock_client.secrets.kv.read_secret_version.side_effect = (
            hvac.exceptions.InvalidPath("InvalidPath")
        )
        mock_hvac_client.return_value = mock_client

        vault = VaultClient(vault_addr="http://localhost:8200", vault_token="token")

        with pytest.raises(CredentialsNotFoundError):
            vault.get_secret("secret/nonexistent")

    def test_missing_vault_token_raises_error(self, monkeypatch):
        """Fail fast if Vault token not set."""
        # VaultClient falls back to the environment, so clear any real token
        # to keep the test independent of the developer's shell.
        monkeypatch.delenv("VAULT_TOKEN", raising=False)

        with pytest.raises(ValueError, match="VAULT_TOKEN"):
            VaultClient(vault_addr="http://localhost:8200", vault_token=None)
import os

# ✅ GOOD - Service account with minimal permissions
# Created: 2026-03-04
# Permissions: read-only on network/devices/* in Vault
# Rotation: every 90 days
service_account_token = os.getenv("VAULT_SERVICE_TOKEN")

# ❌ BAD - Admin account for every script
admin_token = "hvs.CAwEB_secret_admin_token_xyzabc"
import logging
import os
from datetime import datetime, timezone

logger = logging.getLogger(__name__)


# All credential retrievals should be logged
def get_device_credentials(device_name):
    """
    Retrieve credentials (with audit logging).

    Writes an audit record after the lookup, so "Retrieved credentials" is
    only logged on success; a failed lookup is logged as a warning and the
    original exception is re-raised.

    Args:
        device_name: Device to get credentials for

    Returns:
        dict: Credentials
    """
    audit = {
        "device": device_name,
        "user": os.getenv("USER"),
    }

    try:
        creds = vault.get_device_credentials(device_name)
    except Exception:
        # Failed attempts belong in the audit trail too.
        logger.warning(
            "Credential retrieval failed",
            extra={**audit, "timestamp": datetime.now(timezone.utc).isoformat()},
        )
        raise

    logger.info(
        "Retrieved credentials",
        # Timezone-aware UTC timestamp; datetime.utcnow() is naive and
        # deprecated since Python 3.12.
        extra={**audit, "timestamp": datetime.now(timezone.utc).isoformat()},
    )
    return creds
from vault_client import VaultClient

# ✅ GOOD - Script only gets credentials it needs
vault = VaultClient()
device_creds = vault.get_secret("secret/network/devices/router1")
# Can only access this ONE secret

# ❌ BAD - Script can access entire Vault
vault_token = "hvs.CBwQv28r_FULL_ADMIN_ACCESS"