Secure Credential Vaulting for Network Automation: HashiCorp Vault, AWS Secrets Manager, and Beyond

Published: March 1, 2026
Author: Nautomation Prime Team

Why This Tutorial Exists

Hardcoded credentials are one of the most common security risks in network automation. This tutorial shows how to use enterprise-grade secrets managers from Python and how to integrate them with Nornir, Ansible, and CI/CD pipelines. It is aligned with the PRIME Framework for safety and compliance.


Prerequisites

  • Advanced Python (3.8+)
  • Understanding of authentication and authorization models
  • Experience with environment variables and secure API calls
  • Familiarity with network security concepts (SSH keys, OAuth, mTLS)

Why Use a Secrets Manager?

Feature            Hardcoded   Environment Vars   Secrets Manager
Centralized        ❌          ❌                 ✅
Auditable          ❌          ❌                 ✅
Rotation           ❌          Manual             ✅ Automatic
Dynamic Secrets    ❌          ❌                 ✅
RBAC               ❌          ❌                 ✅
Encryption         ❌          ⚠️                 ✅
Compliance Ready   ❌          ❌                 ✅

Architecture: Multi-Tier Vault Strategy

Application Layer
    ↓
Vault Manager (Abstract Interface)
    ├── HashiCorp Vault (On-Prem, High Security)
    ├── AWS Secrets Manager (Cloud-Native)
    ├── Azure Key Vault (Azure-Integrated)
    └── Local Keyring (Development)
    ↓
Local Cache (with TTL)
    ↓
Network Devices
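
The development tier in the diagram can be pictured with an in-memory stand-in. The following is a minimal sketch, not the tutorial's implementation: `InMemoryBackend` and `demo` are hypothetical names, the four async methods are assumed to mirror the abstract interface, and a real keyring backend would persist secrets in the operating system's credential store rather than a dict.

```python
import asyncio
import secrets
from typing import Any, Dict, Optional


class InMemoryBackend:
    """Hypothetical development-only backend: secrets live in a dict."""

    def __init__(self) -> None:
        self._store: Dict[str, Dict[str, Any]] = {}

    async def get_secret(self, path: str, key: Optional[str] = None) -> Any:
        data = self._store[path]  # KeyError if the path is unknown
        return data.get(key) if key else dict(data)

    async def put_secret(self, path: str, secret: Dict[str, Any]) -> None:
        self._store[path] = dict(secret)

    async def rotate_secret(self, path: str) -> None:
        # Replace only the password; other fields are kept
        self._store[path]["password"] = secrets.token_urlsafe(16)

    async def delete_secret(self, path: str) -> None:
        self._store.pop(path, None)


async def demo() -> Dict[str, Any]:
    backend = InMemoryBackend()
    await backend.put_secret(
        "network/credentials/router1",
        {"username": "admin", "password": "old"},
    )
    await backend.rotate_secret("network/credentials/router1")
    return await backend.get_secret("network/credentials/router1")
```

Because the stand-in exposes the same method names as the other tiers, application code written against the interface should not need to change when it is swapped for a production backend.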

Comprehensive Vault Manager Implementation

Multi-backend secrets management with abstraction layer:

import os
import json
import logging
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
import hashlib
import hmac
import base64
from pathlib import Path

logger = logging.getLogger(__name__)

class VaultBackend(Enum):
    """Supported vault backend types"""
    HASHICORP = "hashicorp_vault"
    AWS = "aws_secrets_manager"
    AZURE = "azure_keyvault"
    KEYRING = "local_keyring"

@dataclass
class CacheEntry:
    """Cached credential entry with TTL"""
    secret: Any  # Full secret dict, or a single value when a key was requested
    expiry: datetime

    def is_expired(self) -> bool:
        """Check if cache entry has expired"""
        return datetime.now() >= self.expiry

class BaseVaultBackend(ABC):
    """Abstract base for vault backends"""

    @abstractmethod
    async def get_secret(self, path: str, key: Optional[str] = None) -> Any:
        """Retrieve secret from vault"""
        pass

    @abstractmethod
    async def put_secret(self, path: str, secret: Dict[str, Any]) -> None:
        """Store secret in vault"""
        pass

    @abstractmethod
    async def rotate_secret(self, path: str) -> None:
        """Rotate secret (if supported)"""
        pass

    @abstractmethod
    async def delete_secret(self, path: str) -> None:
        """Delete secret from vault"""
        pass

class HashiCorpVaultBackend(BaseVaultBackend):
    """HashiCorp Vault backend implementation"""

    def __init__(self, vault_addr: str, vault_token: str, verify_ssl: bool = True):
        self.vault_addr = vault_addr
        self.vault_token = vault_token
        self.verify_ssl = verify_ssl
        self.client = None
        self._init_client()

    def _init_client(self):
        """Initialize Vault client"""
        import hvac

        self.client = hvac.Client(
            url=self.vault_addr,
            token=self.vault_token,
            verify=self.verify_ssl
        )

        try:
            self.client.auth.token.lookup_self()
            logger.info(f"Connected to Vault at {self.vault_addr}")
        except Exception as e:
            logger.error(f"Failed to connect to Vault: {e}")
            raise

    async def get_secret(self, path: str, key: Optional[str] = None) -> Any:
        """
        Retrieve secret from Vault KV2 engine

        Args:
            path: Path to secret (e.g., "network/credentials/router1")
            key: Optional specific key within secret

        Returns:
            Secret value or full secret dict
        """
        try:
            secret_response = self.client.secrets.kv.v2.read_secret_version(path=path)
            secret_data = secret_response['data']['data']

            logger.info(
                f"Retrieved secret from Vault: {path}",
                extra={'vault_path': path}
            )

            if key:
                return secret_data.get(key)
            return secret_data

        except Exception as e:
            logger.error(f"Failed to retrieve secret from Vault: {e}")
            raise

    async def put_secret(self, path: str, secret: Dict[str, Any]) -> None:
        """Store secret in Vault"""
        try:
            self.client.secrets.kv.v2.create_or_update_secret(
                path=path,
                secret_data=secret
            )
            logger.info(f"Stored secret in Vault: {path}")

        except Exception as e:
            logger.error(f"Failed to store secret in Vault: {e}")
            raise

    async def rotate_secret(self, path: str) -> None:
        """
        Implement secret rotation:
        1. Read the current secret
        2. Generate a new password
        3. Store the result in Vault as a new KV v2 version

        Note: this only updates Vault. Applying the new password on the
        device itself is a separate step that is not shown here.
        """
        import secrets as py_secrets

        try:
            # Start from the current data so fields such as username survive
            current = await self.get_secret(path)
            rotation_count = int(current.get('rotation_count', 0)) + 1

            # Writing to a KV v2 path creates a new version automatically.
            # Assumes the credential is stored under the 'password' key.
            new_secret = {
                **current,
                'password': py_secrets.token_urlsafe(32),
                'rotated_at': datetime.now().isoformat(),
                'rotation_count': rotation_count
            }

            await self.put_secret(path, new_secret)
            logger.info(f"Rotated secret: {path}")

        except Exception as e:
            logger.error(f"Failed to rotate secret: {e}")
            raise

    async def delete_secret(self, path: str) -> None:
        """Delete secret from Vault"""
        try:
            # Permanently removes the secret's metadata and every version
            self.client.secrets.kv.v2.delete_metadata_and_all_versions(path=path)
            logger.info(f"Deleted secret: {path}")

        except Exception as e:
            logger.error(f"Failed to delete secret: {e}")
            raise

class AWSSecretsManagerBackend(BaseVaultBackend):
    """AWS Secrets Manager backend implementation"""

    def __init__(self, region: str = "us-east-1"):
        self.region = region
        self.client = None
        self._init_client()

    def _init_client(self):
        """Initialize AWS Secrets Manager client"""
        import boto3

        self.client = boto3.client('secretsmanager', region_name=self.region)
        logger.info(f"Connected to AWS Secrets Manager in {self.region}")

    async def get_secret(self, path: str, key: Optional[str] = None) -> Any:
        """
        Retrieve secret from AWS Secrets Manager

        Args:
            path: Secret name (e.g., "network/router1/credentials")
            key: Optional JSON key within secret string
        """
        try:
            response = self.client.get_secret_value(SecretId=path)

            if 'SecretString' in response:
                secret_data = json.loads(response['SecretString'])
            else:
                # boto3 returns SecretBinary as raw bytes (already base64-decoded)
                secret_data = response['SecretBinary'].decode('utf-8')

            logger.info(f"Retrieved secret from AWS: {path}")

            if key and isinstance(secret_data, dict):
                return secret_data.get(key)
            return secret_data

        except Exception as e:
            logger.error(f"Failed to retrieve secret from AWS: {e}")
            raise

    async def put_secret(self, path: str, secret: Dict[str, Any]) -> None:
        """Store/update secret in AWS Secrets Manager"""
        try:
            self.client.put_secret_value(
                SecretId=path,
                SecretString=json.dumps(secret)
            )
            logger.info(f"Stored secret in AWS: {path}")

        except self.client.exceptions.ResourceNotFoundException:
            # Create if doesn't exist
            self.client.create_secret(
                Name=path,
                SecretString=json.dumps(secret)
            )
            logger.info(f"Created new secret in AWS: {path}")

        except Exception as e:
            logger.error(f"Failed to store secret in AWS: {e}")
            raise

    async def rotate_secret(self, path: str) -> None:
        """
        Trigger rotation in AWS Secrets Manager
        AWS performs the actual rotation through the rotation Lambda
        function configured on the secret, so one must already be attached
        """
        try:
            # Ask Secrets Manager to start a rotation for this secret
            self.client.rotate_secret(SecretId=path)

            logger.info(f"Triggered rotation for secret: {path}")

        except Exception as e:
            logger.error(f"Failed to initiate rotation: {e}")
            raise

    async def delete_secret(self, path: str) -> None:
        """Delete secret from AWS"""
        try:
            self.client.delete_secret(
                SecretId=path,
                RecoveryWindowInDays=7  # Minimum recovery window; the default is 30 days
            )
            logger.info(f"Deleted secret: {path}")

        except Exception as e:
            logger.error(f"Failed to delete secret: {e}")
            raise

class VaultManager:
    """
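    Central credential manager with:
    - Multi-backend support (HashiCorp Vault and AWS are implemented in
      this listing; Azure and Keyring are declared but not implemented)
    - Local caching with TTL
    - Secret rotation delegated to the backend
    - In-memory audit logging, exportable to JSON
    - Circuit breaker for vault failures (not implemented in this listing)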
    """

    def __init__(
        self,
        backend_type: VaultBackend = VaultBackend.HASHICORP,
        cache_ttl: int = 3600,
        enable_audit: bool = True,
        **backend_kwargs
    ):
        """
        Args:
            backend_type: Which vault backend to use
            cache_ttl: Cache time-to-live in seconds
            enable_audit: Enable audit logging for all operations
            **backend_kwargs: Credentials for vault backend
        """
        self.backend_type = backend_type
        self.cache_ttl = cache_ttl
        self.enable_audit = enable_audit
        self.cache: Dict[str, CacheEntry] = {}
        self.audit_log: List[Dict[str, Any]] = []

        # Initialize backend
        self.backend = self._init_backend(backend_type, **backend_kwargs)

    def _init_backend(self, backend_type: VaultBackend, **kwargs) -> BaseVaultBackend:
        """Initialize appropriate vault backend"""
        if backend_type == VaultBackend.HASHICORP:
            return HashiCorpVaultBackend(
                vault_addr=os.environ.get('VAULT_ADDR', kwargs.get('vault_addr')),
                vault_token=os.environ.get('VAULT_TOKEN', kwargs.get('vault_token')),
                verify_ssl=kwargs.get('verify_ssl', True)
            )

        elif backend_type == VaultBackend.AWS:
            return AWSSecretsManagerBackend(
                region=os.environ.get('AWS_REGION', kwargs.get('region', 'us-east-1'))
            )

        else:
            raise ValueError(f"Unsupported backend: {backend_type}")

    async def get_secret(self, path: str, key: Optional[str] = None) -> Any:
        """
        Retrieve secret with caching

        Args:
            path: Path to secret
            key: Optional specific key within secret

        Returns:
            Secret value
        """
        # Check cache
        cache_key = f"{path}:{key}" if key else path
        if cache_key in self.cache and not self.cache[cache_key].is_expired():
            logger.debug(f"Cache hit: {cache_key}")
            self._audit_log('cache_hit', cache_key)
            return self.cache[cache_key].secret

        # Fetch from vault
        try:
            secret = await self.backend.get_secret(path, key)

            # Cache result
            self.cache[cache_key] = CacheEntry(
                secret=secret,
                expiry=datetime.now() + timedelta(seconds=self.cache_ttl)
            )

            self._audit_log('secret_retrieved', path, {'key': key})

            return secret

        except Exception as e:
            logger.error(f"Failed to retrieve secret: {e}")
            self._audit_log('secret_retrieval_failed', path, {'error': str(e)})
            raise

    async def put_secret(self, path: str, secret: Dict[str, Any]) -> None:
        """Store secret in vault and invalidate cache"""
        try:
            await self.backend.put_secret(path, secret)

            # Invalidate cache
            for key in list(self.cache.keys()):
                if key.startswith(path):
                    del self.cache[key]

            self._audit_log('secret_stored', path)

        except Exception as e:
            logger.error(f"Failed to store secret: {e}")
            self._audit_log('secret_storage_failed', path, {'error': str(e)})
            raise

    async def rotate_secret(self, path: str) -> None:
        """Rotate secret with audit trail"""
        try:
            await self.backend.rotate_secret(path)

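            # Invalidate cache, including per-key entries such as "path:key"
            for cached in list(self.cache.keys()):
                if cached == path or cached.startswith(f"{path}:"):
                    del self.cache[cached]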

            self._audit_log('secret_rotated', path)
            logger.info(f"Secret rotated: {path}")

        except Exception as e:
            logger.error(f"Failed to rotate secret: {e}")
            self._audit_log('secret_rotation_failed', path, {'error': str(e)})
            raise

    async def delete_secret(self, path: str) -> None:
        """Delete secret and clear cache"""
        try:
            await self.backend.delete_secret(path)
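            # Clear cache, including per-key entries such as "path:key"
            for cached in list(self.cache.keys()):
                if cached == path or cached.startswith(f"{path}:"):
                    del self.cache[cached]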
            self._audit_log('secret_deleted', path)

        except Exception as e:
            logger.error(f"Failed to delete secret: {e}")
            self._audit_log('secret_deletion_failed', path, {'error': str(e)})
            raise

    def _audit_log(self, action: str, path: str, details: Optional[Dict] = None) -> None:
        """Log all secret operations for compliance"""
        if not self.enable_audit:
            return

        entry = {
            'timestamp': datetime.now().isoformat(),
            'action': action,
            'path': path,
            'user': os.environ.get('USER', 'unknown'),
            'backend': self.backend_type.value,
            'details': details or {}
        }

        self.audit_log.append(entry)
        logger.info(f"Audit: {action} on {path}")

    def export_audit_log(self, filepath: str) -> None:
        """Export audit log for compliance"""
        with open(filepath, 'w') as f:
            json.dump(self.audit_log, f, indent=2)
        logger.info(f"Audit log exported to {filepath}")
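
The VaultManager docstring names a circuit breaker for vault failures, but the listing does not include one. A minimal sketch of the idea follows, assuming a hypothetical `CircuitBreaker` helper with illustrative thresholds: after a number of consecutive failures it blocks calls for a cool-down period, then lets a trial call through.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Hypothetical helper: stop calling a failing vault for a cool-down period."""

    def __init__(
        self,
        max_failures: int = 3,
        reset_after: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_failures = max_failures
        self.reset_after = reset_after
        self._clock = clock  # Injectable so tests can control time
        self._failures = 0
        self._opened_at: Optional[float] = None

    def allow_request(self) -> bool:
        """Return True if a call to the vault should be attempted."""
        if self._opened_at is None:
            return True  # Closed: normal operation
        # Open: only allow a trial call once the cool-down has elapsed
        return self._clock() - self._opened_at >= self.reset_after

    def record_success(self) -> None:
        """Close the breaker and reset the failure counter."""
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        """Count a failure and open (or re-open) the breaker at the threshold."""
        self._failures += 1
        if self._failures >= self.max_failures:
            self._opened_at = self._clock()
```

One way to use it would be to check `allow_request()` at the top of `VaultManager.get_secret`, raise immediately when it returns False, and call `record_success()` or `record_failure()` depending on the backend call's outcome.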

Integration with Nornir

Dynamic credential injection with vault manager:

import asyncio
import os

from nornir import InitNornir
from nornir.core.task import Task, Result

def set_credentials_from_vault(task: Task) -> Result:
    """
    Nornir task: Inject credentials from vault

    Nornir calls tasks as plain (synchronous) functions in worker threads,
    so the async vault call is driven with asyncio.run().
    """
    vault_manager = task.host.get('vault_manager')

    if not vault_manager:
        return Result(host=task.host, failed=True, result="No vault manager configured")

    try:
        # Retrieve the full credential dict (no key filter)
        device_creds = asyncio.run(
            vault_manager.get_secret(f"network/credentials/{task.host.name}")
        )

        # Update host credentials
        task.host.username = device_creds.get('username')
        task.host.password = device_creds.get('password')

        return Result(host=task.host, result="Credentials injected from vault")

    except Exception as e:
        return Result(host=task.host, failed=True, result=f"Failed to inject credentials: {e}")

# Usage in nornir config
nr = InitNornir(config_file="config.yaml")

# Initialize vault manager
vault_manager = VaultManager(
    backend_type=VaultBackend.HASHICORP,
    vault_addr=os.environ['VAULT_ADDR'],
    vault_token=os.environ['VAULT_TOKEN']
)

# Add vault manager to all hosts
for host in nr.inventory.hosts.values():
    host['vault_manager'] = vault_manager

# Run credential injection task
results = nr.run(task=set_credentials_from_vault)

CI/CD Integration: GitHub Actions with Vault

Secure credential injection in CI/CD pipelines:

name: Network Automation with Vault
on:
  push:
    branches: [main]

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Fetch secrets from HashiCorp Vault
        uses: hashicorp/vault-action@v2
        with:
          url: ${{ secrets.VAULT_ADDR }}
          method: token
          token: ${{ secrets.VAULT_TOKEN }}
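          # Entries are separated by semicolons. For a KV v2 mount, the path
          # must include "data" after the mount name (<mount>/data/...).
          secrets: |
            network/credentials/routers username | ROUTER_USERNAME ;
            network/credentials/routers password | ROUTER_PASSWORD ;
            network/credentials/api-token token | API_TOKEN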

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install -r requirements.txt

      - name: Run network automation
        run: python scripts/deploy.py
        env:
          VAULT_ADDR: ${{ secrets.VAULT_ADDR }}
          VAULT_TOKEN: ${{ secrets.VAULT_TOKEN }}
          ROUTER_USERNAME: ${{ env.ROUTER_USERNAME }}
          ROUTER_PASSWORD: ${{ env.ROUTER_PASSWORD }}
          API_TOKEN: ${{ env.API_TOKEN }}
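
The workflow hands the secrets to `scripts/deploy.py` as environment variables. The contents of that script are not shown here, so the following is only a sketch of how such a script might read them: `load_pipeline_secrets` and `REQUIRED_VARS` are hypothetical names, and the variable names are taken from the workflow above.

```python
import os
from typing import Dict, Mapping

# Names exported by the vault-action step in the workflow
REQUIRED_VARS = ("ROUTER_USERNAME", "ROUTER_PASSWORD", "API_TOKEN")


def load_pipeline_secrets(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Return the injected secrets, failing fast if any are missing or empty."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        # Report names only; never echo secret values into CI logs
        raise RuntimeError(f"Missing required secrets: {', '.join(missing)}")
    return {name: env[name] for name in REQUIRED_VARS}
```

Failing before any device connection is attempted keeps a misconfigured pipeline from producing a partial deployment.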

Dynamic Secrets: Just-In-Time Access

Generate ephemeral SSH credentials with Vault SSH engine:

class VaultSSHSecrets:
    """Manage dynamic SSH credentials from Vault"""

    def __init__(self, vault_manager: VaultManager):
        self.vault = vault_manager

    async def generate_ssh_credentials(
        self,
        role: str,
        ip_address: str,
        ttl: int = 3600
    ) -> Dict[str, Any]:
        """
        Generate temporary SSH credentials

        Args:
            role: SSH role name in Vault
            ip_address: Target device IP
            ttl: Expected credential lifetime in seconds. The credentials
                request itself takes no TTL, so this value is only used
                to compute expires_at locally.

        Returns:
            Dict with username, password, ttl, and expires_at
        """
        try:
            # Generate dynamic SSH secret via hvac's SSH secrets engine API
            ssh_cred = self.vault.backend.client.secrets.ssh.generate_ssh_credentials(
                name=role,
                username='automation',
                ip=ip_address
            )

            return {
                'username': ssh_cred['data']['username'],
                'password': ssh_cred['data']['key'],
                'ttl': ttl,
                'expires_at': (
                    datetime.now() + timedelta(seconds=ttl)
                ).isoformat()
            }

        except Exception as e:
            logger.error(f"Failed to generate SSH credentials: {e}")
            raise

Credential Rotation Scheduler

Automatic rotation of aged credentials:

import asyncio
from datetime import datetime, timedelta, timezone

from apscheduler.schedulers.asyncio import AsyncIOScheduler

class CredentialRotationScheduler:
    """Automatically rotate credentials based on schedule"""

    def __init__(self, vault_manager: VaultManager):
        self.vault = vault_manager
        self.scheduler = AsyncIOScheduler()

    def schedule_rotation(
        self,
        secret_path: str,
        interval_days: int = 90,
        rotation_hour: int = 2  # Off-peak time
    ):
        """
        Schedule automatic secret rotation

        Args:
            secret_path: Path to secret to rotate
            interval_days: Rotation frequency
            rotation_hour: Hour of day to rotate (UTC)
        """
        # First run: the next occurrence of rotation_hour (UTC)
        now = datetime.now(timezone.utc)
        first_run = now.replace(hour=rotation_hour, minute=0, second=0, microsecond=0)
        if first_run <= now:
            first_run += timedelta(days=1)

        self.scheduler.add_job(
            self.vault.rotate_secret,
            'interval',
            days=interval_days,
            start_date=first_run,
            args=[secret_path],
            name=f"rotate_{secret_path}"
        )

        logger.info(f"Scheduled rotation for {secret_path} every {interval_days} days")

    async def start(self):
        """Start the rotation scheduler"""
        self.scheduler.start()
        logger.info("Credential rotation scheduler started")

    async def stop(self):
        """Stop the rotation scheduler"""
        self.scheduler.shutdown()
        logger.info("Credential rotation scheduler stopped")
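
A fixed interval does not by itself tell you whether a given credential is already overdue. The sketch below, assuming a hypothetical `is_rotation_due` helper, checks age against the `rotated_at` ISO timestamp that the HashiCorp backend writes during rotation. Treating timestamps without a timezone as UTC is an assumption made for this example.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_rotation_due(
    rotated_at: Optional[str],
    max_age_days: int = 90,
    now: Optional[datetime] = None,
) -> bool:
    """Return True if the secret was never rotated or is at least max_age_days old."""
    if not rotated_at:
        return True  # No rotation recorded yet
    now = now or datetime.now(timezone.utc)
    last = datetime.fromisoformat(rotated_at)
    if last.tzinfo is None:
        # Assumption: naive timestamps are interpreted as UTC
        last = last.replace(tzinfo=timezone.utc)
    return now - last >= timedelta(days=max_age_days)
```

A startup check along these lines could rotate overdue secrets immediately instead of waiting for the next scheduled run.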

Testing Vault Integration

import pytest
from unittest.mock import AsyncMock, MagicMock, patch

# The async tests below require the pytest-asyncio plugin

def make_manager(**kwargs) -> VaultManager:
    """Build a VaultManager with a mocked backend (no real Vault needed)"""
    with patch.object(VaultManager, '_init_backend', return_value=MagicMock()):
        return VaultManager(**kwargs)

@pytest.mark.asyncio
async def test_vault_manager_caching():
    """Test secret caching with TTL"""
    manager = make_manager(
        backend_type=VaultBackend.HASHICORP,
        cache_ttl=10
    )

    # Mock backend
    manager.backend.get_secret = AsyncMock(
        return_value={'username': 'admin', 'password': 'secret123'}
    )

    # First retrieval (from vault)
    result1 = await manager.get_secret('network/router1')
    assert manager.backend.get_secret.call_count == 1

    # Second retrieval (from cache)
    result2 = await manager.get_secret('network/router1')
    assert manager.backend.get_secret.call_count == 1  # Not called again
    assert result1 == result2

@pytest.mark.asyncio
async def test_secret_rotation():
    """Test secret rotation audit trail"""
    manager = make_manager(enable_audit=True)
    manager.backend.rotate_secret = AsyncMock()

    await manager.rotate_secret('network/router1')

    # Check audit log
    assert len(manager.audit_log) > 0
    assert manager.audit_log[-1]['action'] == 'secret_rotated'

Key Takeaways

✅ Never hardcode credentials - Use secrets managers
✅ Support multiple backends - HashiCorp Vault, AWS, Azure for portability
✅ Cache with TTL - Reduce vault load while maintaining security
✅ Audit all operations - Compliance and forensics
✅ Rotate regularly - Automated rotation reduces risk
✅ Use dynamic secrets - Ephemeral credentials for just-in-time access
✅ Integrate with CI/CD - Secrets injection at runtime


PRIME in Action: Safety, Compliance, and Ownership

  • ✅ Safety: Secrets never hardcoded, automatic rotation, audit trails
  • ✅ Measuring: Comprehensive audit logging for compliance audits
  • ✅ Ownership: Teams control credential policies per environment
  • ✅ Empowerment: Clear APIs for credential retrieval across tools
