Zero Trust Security Model for AWS Infrastructure: Complete Implementation Guide

TL;DR / Key Takeaways

  • Core Principle: “Never trust, always verify” – no implicit trust for any user, device, or network location
  • Four Pillars: Identity verification, device trust, network segmentation, and data protection
  • AWS Integration: Combines Identity Center, GuardDuty, WAF, KMS, and Config to cover the identity, network, data, and monitoring layers
  • Implementation Strategy: Phased approach with continuous verification and adaptive security policies
  • Enterprise Benefits: Reduced attack surface, improved compliance, enhanced visibility, and adaptive threat response
  • Practical Tools: Ready-to-use Python scripts, Terraform modules, and policy templates for immediate deployment

Introduction

Zero Trust security represents a fundamental shift from traditional perimeter-based security to a model where trust is never assumed and every request must be verified. In AWS environments, implementing Zero Trust requires orchestrating multiple services across identity, network, data, and monitoring layers to create a comprehensive security posture.

This guide provides enterprise-grade implementation patterns, practical code examples, and step-by-step deployment strategies for building a robust Zero Trust architecture in AWS.

Understanding Zero Trust Architecture

Core Principles

Zero Trust is built on three fundamental principles:

  1. Explicit Verification: Always authenticate and authorize based on data points including user identity, location, device health, service or workload, data classification, and anomalies
  2. Least Privilege Access: Limit user access with Just-In-Time and Just-Enough-Access (JIT/JEA), risk-based adaptive policies, and data protection
  3. Assume Breach: Verify end-to-end encryption, use analytics to gain visibility, drive threat detection, and improve defenses
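
As a rough illustration of how the three principles can combine in a single access decision, here is a minimal Python sketch. The `AccessRequest` fields, the `decide` function, and the 15-minute grant window are assumptions made for this example; they are not part of any AWS API.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass(frozen=True)
class AccessRequest:
    """Hypothetical request context; field names are illustrative only."""
    user_authenticated: bool
    mfa_passed: bool
    device_compliant: bool
    requested_action: str
    allowed_actions: frozenset


@dataclass(frozen=True)
class Decision:
    allowed: bool
    reason: str
    expires_at: Optional[datetime] = None


def decide(request: AccessRequest, now: datetime, grant_minutes: int = 15) -> Decision:
    # Explicit verification: every signal must be positively confirmed.
    if not (request.user_authenticated and request.mfa_passed):
        return Decision(False, "identity not verified")
    if not request.device_compliant:
        return Decision(False, "device not compliant")
    # Least privilege: only actions that were explicitly granted are allowed.
    if request.requested_action not in request.allowed_actions:
        return Decision(False, "action not granted")
    # Assume breach: the grant is short-lived, so a stolen session expires quickly.
    return Decision(True, "verified", now + timedelta(minutes=grant_minutes))


# Example: a verified request receives a grant that expires 15 minutes later.
now = datetime(2025, 1, 8, 12, 0, tzinfo=timezone.utc)
request = AccessRequest(True, True, True, "s3:GetObject", frozenset({"s3:GetObject"}))
print(decide(request, now))
```

The order of the checks is a design choice: identity and device signals are evaluated before the permission lookup so that an unverified caller learns nothing about which actions exist.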

Zero Trust Architecture Model

---
config:
  layout: elk
---
flowchart LR
 subgraph subGraph0["Policy Engine Core"]
    direction TB
        PE["🧠 Policy Engine<br>(Decision Orchestrator)"]
        PDP["⚖️ Policy Decision Point<br>(Evaluates Access)"]
        PAP["🔧 Policy Administration Point<br>(Manages Rules & Policies)"]
  end
 subgraph subGraph1["Identity Verification"]
        MFA["🔐 Multi-Factor Authentication<br>(TOTP/SMS/Hardware)"]
        IC["🏢 AWS Identity Center<br>(SSO Management)"]
        CERT["📜 Certificate Authority<br>(PKI Infrastructure)"]
        RISK["📊 Risk Assessment<br>(Behavioral Analysis)"]
  end
 subgraph subGraph2["Device Trust"]
        DEV["📱 Device Registration<br>(Hardware Fingerprint)"]
        COMP["✅ Device Compliance<br>(Security Posture)"]
        MDM["📋 Mobile Device Management<br>(Policy Enforcement)"]
        CERT_DEV["🔒 Device Certificates<br>(Mutual TLS)"]
  end
 subgraph subGraph3["Network Security"]
        VPC["🌐 Amazon VPC<br>(Network Segmentation)"]
        SG["🛡️ Security Groups<br>(Instance Firewall)"]
        NACL["🚧 Network ACLs<br>(Subnet Protection)"]
        WAF["🔥 AWS WAF<br>(Web Application Firewall)"]
        GWLB["⚖️ Gateway Load Balancer<br>(Traffic Inspection)"]
  end
 subgraph subGraph4["Data Protection"]
        KMS["🗝️ AWS KMS<br>(Encryption Keys)"]
        SECRETS["🔐 AWS Secrets Manager<br>(Credential Storage)"]
        MACIE["🔍 Amazon Macie<br>(Sensitive Data Discovery)"]
        DLP["🛡️ Data Loss Prevention<br>(Content Filtering)"]
  end
 subgraph subGraph5["Monitoring & Analytics"]
        GUARD["🚨 Amazon GuardDuty<br>(Threat Detection)"]
        CONFIG["⚙️ AWS Config<br>(Compliance Monitoring)"]
        TRAIL["📝 AWS CloudTrail<br>(Audit Logging)"]
        DETECTIVE["🔍 Amazon Detective<br>(Security Investigation)"]
  end
 subgraph subGraph6["Protected Resources"]
        EC2["💻 Amazon EC2<br>(Compute Instances)"]
        RDS["🗄️ Amazon RDS<br>(Managed Databases)"]
        S3["📦 Amazon S3<br>(Object Storage)"]
        LAMBDA["⚡ AWS Lambda<br>(Serverless Functions)"]
        EKS["☸️ Amazon EKS<br>(Kubernetes Clusters)"]
        WORKSPACES["🖥️ Amazon WorkSpaces<br>(Virtual Desktops)"]
  end
    PE --> PDP
    PDP --> PAP
    PAP --> PE
    USER["👤 User Request"] --> PE
    DEVICE_REQ["💻 Device Request"] --> PE
    APP["📱 Application Request"] --> PE
    PDP -.-> MFA & IC & CERT & RISK & DEV & COMP & MDM & CERT_DEV & VPC & SG & NACL & WAF & GWLB & KMS & SECRETS & MACIE & DLP
    VPC --> EC2
    SG --> RDS
    NACL --> S3
    WAF --> LAMBDA
    GWLB --> EKS
    KMS --> WORKSPACES
    EC2 --> GUARD
    RDS --> CONFIG
    S3 --> TRAIL
    LAMBDA --> DETECTIVE
    EKS --> GUARD
    WORKSPACES --> CONFIG
    GUARD --> TRAIL
    CONFIG --> DETECTIVE
    TRAIL --> CONFIG
    DETECTIVE --> RISK
     USER:::requests
     DEVICE_REQ:::requests
     APP:::requests
     PE:::policyCore
     PDP:::policyCore
     PAP:::policyCore
     MFA:::identity
     IC:::identity
     CERT:::identity
     RISK:::identity
     DEV:::device
     COMP:::device
     MDM:::device
     CERT_DEV:::device
     VPC:::network
     SG:::network
     NACL:::network
     WAF:::network
     GWLB:::network
     KMS:::data
     SECRETS:::data
     MACIE:::data
     DLP:::data
     GUARD:::monitoring
     CONFIG:::monitoring
     TRAIL:::monitoring
     DETECTIVE:::monitoring
     EC2:::resources
     RDS:::resources
     S3:::resources
     LAMBDA:::resources
     EKS:::resources
     WORKSPACES:::resources
    classDef policyCore fill:#ff6b6b,stroke:#d63031,stroke-width:3px,color:#fff
    classDef identity fill:#4ecdc4,stroke:#00b894,stroke-width:2px,color:#fff
    classDef device fill:#fd79a8,stroke:#e84393,stroke-width:2px,color:#fff
    classDef network fill:#45b7d1,stroke:#0984e3,stroke-width:2px,color:#fff
    classDef data fill:#96ceb4,stroke:#00b894,stroke-width:2px,color:#fff
    classDef monitoring fill:#fdcb6e,stroke:#e17055,stroke-width:2px,color:#000
    classDef resources fill:#dda0dd,stroke:#9932cc,stroke-width:2px,color:#000
    classDef requests fill:#a29bfe,stroke:#6c5ce7,stroke-width:2px,color:#fff

How to Read This Diagram

This Zero Trust Architecture Model uses a left-to-right flow design showing how security verification works across AWS environments. Here’s how to interpret the flow and components:

🔄 Access Flow Pattern

  1. Entry Points (Left Side): All access requests (User 👤, Device 💻, Application 📱) must enter through the Policy Engine
  2. Central Decision Making: Policy Engine orchestrates decisions through internal flow: Policy Engine → Policy Decision Point → Policy Administration Point → Policy Engine (continuous loop)
  3. Multi-Layer Evaluation: The Policy Decision Point (PDP) evaluates each request against the identity, device, network, and data layers in parallel (shown as dotted control lines)
  4. Resource Protection: Security layers protect specific AWS resources only after authorization is granted
  5. Continuous Monitoring: Protected resources feed monitoring data back to analytics services, which improve risk assessment
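
The evaluation step in this flow can be sketched as a deny-by-default function that consults one check per layer. This is only a sketch: the check functions, the context keys (`mfa_authenticated`, `source_network`, `cleared_for`, and so on), and the network names are hypothetical stand-ins for the identity, device, network, and data layers in the diagram.

```python
from typing import Callable, Dict, List, Tuple

# Each layer check takes the request context and returns True when satisfied.
LayerCheck = Callable[[Dict], bool]

# Hypothetical checks standing in for the layers the PDP consults.
LAYER_CHECKS: Dict[str, LayerCheck] = {
    "identity": lambda ctx: ctx.get("mfa_authenticated") is True,
    "device": lambda ctx: ctx.get("device_trusted") is True,
    "network": lambda ctx: ctx.get("source_network") in {"corp-vpn", "private-vpc"},
    "data": lambda ctx: ctx.get("data_classification") in ctx.get("cleared_for", ()),
}


def evaluate_request(ctx: Dict) -> Tuple[str, List[str]]:
    """Return ("ALLOW" or "DENY", failed_layers); deny unless every layer passes."""
    failed = [name for name, check in LAYER_CHECKS.items() if not check(ctx)]
    return ("DENY" if failed else "ALLOW", failed)


# Example: one failing layer is enough to deny the request.
context = {
    "mfa_authenticated": True,
    "device_trusted": False,
    "source_network": "corp-vpn",
    "data_classification": "internal",
    "cleared_for": ["internal", "public"],
}
print(evaluate_request(context))
```

Returning the list of failed layers alongside the decision gives the monitoring layer something concrete to log for each denial.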

🏗️ Architecture Components Explained

🧠 Policy Engine Core (Red – Highest Priority)

  • Policy Engine: Central orchestrator that receives all access requests and coordinates security decisions
  • Policy Decision Point (PDP): Evaluates access requests against policies from all security layers simultaneously
  • Policy Administration Point (PAP): Manages, updates, and distributes security policies across the architecture
  • Internal Flow: Forms a continuous decision loop ensuring dynamic policy updates and real-time decisions

🔐 Identity Verification (Teal – Trust Establishment)

  • Multi-Factor Authentication: Enforces TOTP, SMS, and hardware token requirements for all access attempts
  • AWS Identity Center: Provides centralized SSO and identity federation across AWS accounts and external systems
  • Certificate Authority: Manages PKI infrastructure for digital certificates and identity validation
  • Risk Assessment: Performs behavioral analysis, location verification, and anomaly detection for adaptive authentication

📱 Device Trust (Pink – Device Security Posture)

  • Device Registration: Creates unique hardware fingerprints and maintains device inventory
  • Device Compliance: Continuously monitors security posture including OS updates, antivirus status, and configuration compliance
  • Mobile Device Management: Enforces security policies on mobile devices including encryption, app whitelisting, and remote wipe capabilities
  • Device Certificates: Implements mutual TLS authentication ensuring only trusted devices can establish connections

🌐 Network Security (Blue – Perimeter & Segmentation)

  • Amazon VPC: Provides network-level isolation and micro-segmentation with software-defined boundaries
  • Security Groups: Apply stateful firewall rules at the instance level with automatic connection tracking
  • Network ACLs: Apply stateless allow and deny rules at the subnet level as an additional layer of network protection
  • AWS WAF: Protects web applications against common attacks such as SQL injection and XSS, and can throttle HTTP floods with rate-based rules (volumetric DDoS protection is handled by AWS Shield)
  • Gateway Load Balancer: Enables deep packet inspection and traffic filtering through third-party security appliances
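
One practical check for the network layer is to look for security group rules that admit traffic from any address. The sketch below assumes the rule dictionaries follow the `IpPermissions` shape returned by the EC2 `describe_security_groups` call; the function name `find_open_ingress` and the sample rules are made up for illustration.

```python
from typing import Dict, List

# CIDR blocks that match every IPv4 or IPv6 address.
OPEN_CIDRS = {"0.0.0.0/0", "::/0"}


def find_open_ingress(ip_permissions: List[Dict]) -> List[Dict]:
    """Return the ingress rules that admit traffic from any address."""
    open_rules = []
    for rule in ip_permissions:
        cidrs = [r.get("CidrIp") for r in rule.get("IpRanges", [])]
        cidrs += [r.get("CidrIpv6") for r in rule.get("Ipv6Ranges", [])]
        if OPEN_CIDRS.intersection(cidrs):
            open_rules.append(rule)
    return open_rules


# Example rules: SSH open to the world, HTTPS limited to one documentation range.
sample_rules = [
    {"IpProtocol": "tcp", "FromPort": 22, "ToPort": 22,
     "IpRanges": [{"CidrIp": "0.0.0.0/0"}], "Ipv6Ranges": []},
    {"IpProtocol": "tcp", "FromPort": 443, "ToPort": 443,
     "IpRanges": [{"CidrIp": "203.0.113.0/24"}], "Ipv6Ranges": []},
]
for rule in find_open_ingress(sample_rules):
    print(f"Open ingress on port {rule['FromPort']}")
```

In a real audit the input would come from the EC2 API, and rules referencing other security groups rather than CIDR ranges would need separate handling.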

🗝️ Data Protection (Green – Data Security)

  • AWS KMS: Creates and manages the encryption keys that integrated AWS services use to protect data at rest; data in transit is protected with TLS
  • AWS Secrets Manager: Provides secure credential storage, automatic rotation, and just-in-time access to sensitive data
  • Amazon Macie: Discovers, classifies, and protects sensitive data using machine learning and pattern recognition
  • Data Loss Prevention: Inspects content, monitors data movement, and prevents unauthorized data exfiltration

🚨 Monitoring & Analytics (Yellow/Orange – Visibility & Response)

  • Amazon GuardDuty: Uses AI/ML for threat detection, analyzing VPC flow logs, DNS logs, and CloudTrail events
  • AWS Config: Monitors resource configurations, tracks compliance drift, and provides configuration history
  • AWS CloudTrail: Records API activity across AWS services (management events by default, data events when enabled), providing an audit trail
  • Amazon Detective: Performs security investigation using graph analytics to identify root causes of security findings

🔍 Visual Design Elements

Connection Types:

  • Solid Arrows (→): Direct access flow and resource protection
  • Dotted Arrows (-.->): Policy control and evaluation signals from PDP to security layers
  • Feedback Arrows: Monitoring data flowing back to analytics and risk assessment systems

Color-Coded Priority:

  • Red: Core decision-making (Policy Engine) – Highest criticality
  • Teal: Identity verification – Trust establishment
  • Pink: Device trust – Device security posture
  • Blue: Network security – Perimeter protection and segmentation
  • Green: Data protection – Encryption and data security
  • Yellow/Orange: Monitoring and analytics – Visibility and threat response
  • Purple: Protected AWS resources – The assets being secured
  • Light Purple: Access request entry points – Potential threat vectors

🛡️ Zero Trust Implementation Principles

  1. Never Trust, Always Verify: Every request, regardless of source location or previous access, must be fully authenticated and authorized
  2. Principle of Least Privilege: Access grants are limited to specific resources, actions, and time windows with continuous re-validation
  3. Assume Breach: Architecture assumes that breaches have already occurred, implementing detection and containment at every layer
  4. Continuous Verification: Security decisions use real-time context including location, device posture, behavior patterns, and threat intelligence
  5. Comprehensive Monitoring: All access attempts, successful and failed, are logged and analyzed for patterns and anomalies
  6. Dynamic Policy Updates: Security policies are continuously updated based on threat intelligence, compliance requirements, and risk analysis
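
To make the "time windows" part of least privilege concrete, the following sketch builds an IAM policy document whose Allow statement is only valid for a limited period, using the `aws:CurrentTime` global condition key with date operators. The function name `build_time_bound_policy`, the `Sid`, and the bucket ARN are assumptions for this example, and the start time is expected to be timezone-aware.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List


def build_time_bound_policy(actions: List[str], resources: List[str],
                            start: datetime, minutes: int) -> Dict:
    """Build an IAM policy document whose Allow statement lapses after `minutes`."""
    start_utc = start.astimezone(timezone.utc)  # expects a timezone-aware datetime
    expiry_utc = start_utc + timedelta(minutes=minutes)
    fmt = "%Y-%m-%dT%H:%M:%SZ"  # ISO 8601 in UTC
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Sid": "TimeBoundAccess",
            "Effect": "Allow",
            "Action": actions,
            "Resource": resources,
            "Condition": {
                "DateGreaterThan": {"aws:CurrentTime": start_utc.strftime(fmt)},
                "DateLessThan": {"aws:CurrentTime": expiry_utc.strftime(fmt)},
            },
        }],
    }


# Example: read access to a hypothetical bucket for one hour.
window_start = datetime(2025, 1, 8, 14, 30, tzinfo=timezone.utc)
policy = build_time_bound_policy(
    ["s3:GetObject"], ["arn:aws:s3:::example-bucket/*"], window_start, 60
)
print(policy["Statement"][0]["Condition"])
```

A policy like this stops authorizing requests once the window closes, even if nobody remembers to detach it, which is the property Just-In-Time access relies on.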

🔄 Critical Feedback Loops

The architecture implements several feedback mechanisms:

  • Monitoring → Risk Assessment: Security events continuously update risk scoring algorithms
  • Detective → Risk Assessment: Investigation findings improve behavioral analysis models
  • Policy Administration → Policy Engine: Updated policies are immediately distributed across the architecture
  • Resource Activity → Monitoring: All resource access generates telemetry for threat detection and compliance monitoring

This Zero Trust model keeps security decisions dynamic: no single control is trusted on its own, and every component contributes to the overall security posture through continuous verification and adaptive policy enforcement.
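
The Monitoring → Risk Assessment loop could, for instance, be modeled as an exponential moving average that blends each new finding into a running risk score. This is a sketch under stated assumptions: the function name, the default weight of 0.3, and the normalization of severity from a 0 to 10 scale are illustrative choices, not a GuardDuty or Detective feature.

```python
def update_risk_score(current: float, finding_severity: float, weight: float = 0.3) -> float:
    """Blend a new finding into the running risk score (exponential moving average)."""
    if not 0.0 <= weight <= 1.0:
        raise ValueError("weight must be between 0 and 1")
    # Assumption: severity arrives on a 0-10 scale and is clamped into [0, 1].
    normalized = min(max(finding_severity / 10.0, 0.0), 1.0)
    return (1.0 - weight) * current + weight * normalized


# Example: a run of high-severity findings pushes the score upward step by step.
score = 0.2
for severity in (8.0, 8.0, 8.0):
    score = update_risk_score(score, severity)
    print(round(score, 3))
```

A higher weight makes the score react faster to new findings at the cost of more volatility, so the value is something to tune against real telemetry.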

Implementation Framework

Phase 1: Identity Verification Layer

AWS Identity Center Integration

# AWS Identity Center Zero Trust Configuration
import boto3
import json
from typing import Dict, List, Optional
from dataclasses import dataclass

@dataclass
class ZeroTrustIdentityConfig:
    organization_id: str
    identity_center_instance_arn: str
    mfa_required: bool = True
    session_duration_hours: int = 4
    risk_based_auth: bool = True
    device_trust_required: bool = True

class ZeroTrustIdentityManager:
    def __init__(self, config: ZeroTrustIdentityConfig):
        self.config = config
        self.identity_center = boto3.client('sso-admin')
        self.iam = boto3.client('iam')
        self.organizations = boto3.client('organizations')
    
    def create_conditional_access_policy(self, policy_name: str, conditions: Dict) -> Dict:
        """Create conditional access policy with Zero Trust requirements"""
        
        policy_document = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "RequireMFAForAllAccess",
                    "Effect": "Deny",
                    "Action": "*",
                    "Resource": "*",
                    "Condition": {
                        "BoolIfExists": {
                            "aws:MultiFactorAuthPresent": "false"
                        }
                    }
                },
                {
                    "Sid": "RequireSecureTransport",
                    "Effect": "Deny",
                    "Action": "*",
                    "Resource": "*",
                    "Condition": {
                        "Bool": {
                            "aws:SecureTransport": "false"
                        }
                    }
                },
                {
                    "Sid": "RestrictBySourceIP",
                    "Effect": "Deny",
                    "Action": "*",
                    "Resource": "*",
                    "Condition": {
                        "NotIpAddress": {
                            "aws:SourceIp": conditions.get("allowed_ip_ranges", [])
                        }
                    }
                # When IP restriction is disabled, add no statement at all: a blanket
                # Allow on "*" would grant full access instead of restricting it
                } if conditions.get("ip_restriction_enabled") else None,
                # Note: Session duration enforcement should be handled by AWS STS or
                # Identity Center session policies rather than IAM conditions
                #
                # aws:RequestedRegion checks the target Region, not the device, so the
                # Sid is named accordingly. Device posture has to be verified elsewhere
                # (for example by the identity provider) before credentials are issued.
                {
                    "Sid": "RestrictToApprovedRegions",
                    "Effect": "Deny",
                    "Action": "*",
                    "Resource": "*",
                    "Condition": {
                        "StringNotEquals": {
                            "aws:RequestedRegion": conditions.get("allowed_regions", ["us-east-1", "us-west-2"])
                        }
                    }
                # Strict mode off: add no statement rather than a blanket Allow
                } if self.config.device_trust_required else None
            ]
        }
        
        # Filter out None statements
        policy_document["Statement"] = [stmt for stmt in policy_document["Statement"] if stmt is not None]
        
        return {
            "policy_name": policy_name,
            "policy_document": policy_document,
            "conditions_applied": conditions,
            "zero_trust_level": "STRICT" if self.config.device_trust_required else "STANDARD"
        }
    
    def setup_permission_set_with_conditions(self, permission_set_name: str, 
                                           role_arn: str, conditions: Dict) -> Dict:
        """Create permission set with Zero Trust conditional access"""
        
        try:
            # Create permission set
            permission_set_response = self.identity_center.create_permission_set(
                Name=permission_set_name,
                Description=f"Zero Trust permission set for {permission_set_name}",
                InstanceArn=self.config.identity_center_instance_arn,
                SessionDuration=f"PT{self.config.session_duration_hours}H"
            )
            
            permission_set_arn = permission_set_response['PermissionSet']['PermissionSetArn']
            
            # Apply conditional access policy
            conditional_policy = self.create_conditional_access_policy(
                f"{permission_set_name}-conditions", conditions
            )
            
            # Attach inline policy to permission set
            self.identity_center.put_inline_policy_to_permission_set(
                InstanceArn=self.config.identity_center_instance_arn,
                PermissionSetArn=permission_set_arn,
                InlinePolicy=json.dumps(conditional_policy["policy_document"])
            )
            
            return {
                "permission_set_arn": permission_set_arn,
                "conditional_policy": conditional_policy,
                "status": "created",
                "zero_trust_enforcement": "enabled"
            }
            
        except Exception as e:
            return {"error": str(e), "status": "failed"}
    
    def implement_risk_based_authentication(self, user_attributes: Dict) -> Dict:
        """Implement risk-based authentication logic"""
        
        risk_factors = {
            "location_risk": self._assess_location_risk(user_attributes.get("source_ip")),
            "device_risk": self._assess_device_risk(user_attributes.get("device_id")),
            "behavior_risk": self._assess_behavioral_risk(user_attributes.get("user_id")),
            "time_risk": self._assess_time_risk(user_attributes.get("access_time"))
        }
        
        total_risk_score = sum(risk_factors.values()) / len(risk_factors)
        
        authentication_requirements = {
            "mfa_required": True,  # Always required in Zero Trust
            "additional_verification": total_risk_score > 0.7,
            "device_registration_required": total_risk_score > 0.5,
            "admin_approval_required": total_risk_score > 0.8,
            "risk_score": total_risk_score,
            "risk_factors": risk_factors
        }
        
        return authentication_requirements
    
    def _assess_location_risk(self, source_ip: str) -> float:
        """Assess risk based on source IP location"""
        # Simplified risk assessment - in production, integrate with threat intelligence
        known_safe_ranges = ["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"]
        # Implementation would check against known good/bad IP ranges
        return 0.3  # Placeholder risk score
    
    def _assess_device_risk(self, device_id: str) -> float:
        """Assess device trust level"""
        # Check device registration, compliance status, certificate validity
        return 0.2  # Placeholder risk score
    
    def _assess_behavioral_risk(self, user_id: str) -> float:
        """Assess user behavioral anomalies"""
        # Analyze historical access patterns, unusual activities
        return 0.1  # Placeholder risk score
    
    def _assess_time_risk(self, access_time: str) -> float:
        """Assess risk based on access time"""
        # Check if access is outside normal business hours
        return 0.15  # Placeholder risk score

# Example implementation
config = ZeroTrustIdentityConfig(
    organization_id="o-1234567890",
    identity_center_instance_arn="arn:aws:sso:::instance/ssoins-1234567890123456",
    mfa_required=True,
    session_duration_hours=4,
    risk_based_auth=True,
    device_trust_required=True
)

identity_manager = ZeroTrustIdentityManager(config)

# Create Zero Trust permission set
conditions = {
    "allowed_ip_ranges": ["203.0.113.0/24", "198.51.100.0/24"],
    "ip_restriction_enabled": True,
    "allowed_regions": ["us-east-1", "us-west-2", "eu-west-1"],
    "device_trust_level": "HIGH"
}

permission_set = identity_manager.setup_permission_set_with_conditions(
    "ZeroTrust-Developer-Access", 
    "arn:aws:iam::123456789012:role/DeveloperRole",
    conditions
)

print("Zero Trust Permission Set Configuration:")
print(json.dumps(permission_set, indent=2))

# Risk-based authentication example
user_context = {
    "user_id": "john.doe@company.com",
    "source_ip": "203.0.113.45",
    "device_id": "device-12345",
    "access_time": "2025-01-08T14:30:00Z"
}

auth_requirements = identity_manager.implement_risk_based_authentication(user_context)
print("\nRisk-Based Authentication Requirements:")
print(json.dumps(auth_requirements, indent=2))
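
The `_assess_location_risk` placeholder above always returns 0.3. One way it might be filled in is with the standard-library `ipaddress` module, as in the sketch below. The function signature, the `trusted_ranges` and `blocked_ranges` parameters, and the score values (0.1, 0.6, 1.0) are assumptions for illustration; a production version would draw on threat intelligence, as the original comment notes.

```python
import ipaddress
from typing import Iterable, Optional


def assess_location_risk(source_ip: Optional[str],
                         trusted_ranges: Iterable[str],
                         blocked_ranges: Iterable[str] = ()) -> float:
    """Return a risk score in [0, 1] for the caller's source IP."""
    if not source_ip:
        return 1.0  # No IP available: treat as highest risk
    try:
        address = ipaddress.ip_address(source_ip)
    except ValueError:
        return 1.0  # Unparseable input is treated as untrusted

    def in_any(ranges: Iterable[str]) -> bool:
        return any(address in ipaddress.ip_network(cidr) for cidr in ranges)

    if in_any(blocked_ranges):
        return 1.0  # Explicitly blocked ranges win over trusted ones
    if in_any(trusted_ranges):
        return 0.1  # Known corporate or partner range
    return 0.6      # Unknown location: elevated but not blocking


# Example using the allowed ranges from the permission set conditions.
trusted = ["203.0.113.0/24", "198.51.100.0/24"]
print(assess_location_risk("203.0.113.45", trusted))
print(assess_location_risk("192.0.2.10", trusted))
```

Checking the blocked list first means a range can be revoked quickly without editing the trusted list.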

Multi-Factor Authentication Implementation

# Advanced MFA Configuration for Zero Trust
# Note: Install required packages: pip install boto3
import json  # used by the example output at the end of this listing
import uuid
from typing import Dict, List

import boto3

class ZeroTrustMFAManager:
    def __init__(self):
        self.iam = boto3.client('iam')
        self.cognito = boto3.client('cognito-idp')
    
    def setup_adaptive_mfa(self, user_pool_id: str, risk_configuration: Dict) -> Dict:
        """Setup adaptive MFA based on risk assessment"""
        
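        mfa_configuration = {
            "UserPoolId": user_pool_id,
            "MfaConfiguration": "ON",  # Always require MFA in Zero Trust
            "SmsMfaConfiguration": {
                "SmsAuthenticationMessage": "Your Zero Trust verification code: {####}",
                "SmsConfiguration": {
                    "SnsCallerArn": risk_configuration.get("sns_role_arn"),
                    # ExternalId must match the sts:ExternalId condition in the SNS
                    # role's trust policy. "sns_external_id" is an assumed config key;
                    # the original random value is kept as a fallback.
                    "ExternalId": risk_configuration.get("sns_external_id", str(uuid.uuid4()))
                }
            },
            "SoftwareTokenMfaConfiguration": {
                "Enabled": True
            }
        }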
        
        try:
            response = self.cognito.set_user_pool_mfa_config(**mfa_configuration)
            
            # Configure risk-based authentication
            risk_config = {
                "UserPoolId": user_pool_id,
                "ClientId": risk_configuration.get("client_id"),
                "RiskConfiguration": {
                    "CompromisedCredentialsRiskConfiguration": {
                        "EventFilter": ["SIGN_IN", "PASSWORD_CHANGE", "SIGN_UP"],
                        "Actions": {
                            "EventAction": "BLOCK"
                        }
                    },
                    "AccountTakeoverRiskConfiguration": {
                        "NotifyConfiguration": {
                            "From": risk_configuration.get("notification_email"),
                            "ReplyTo": risk_configuration.get("notification_email"),
                            "SourceArn": risk_configuration.get("ses_source_arn"),
                            "BlockEmail": {
                                "Subject": "Zero Trust: Account Takeover Attempt Blocked",
                                "HtmlBody": "<h1>Suspicious activity blocked</h1><p>We detected and blocked a potential account takeover attempt.</p>",
                                "TextBody": "Suspicious activity blocked. We detected and blocked a potential account takeover attempt."
                            },
                            "NoActionEmail": {
                                "Subject": "Zero Trust: Security Alert",
                                "HtmlBody": "<h1>Security Alert</h1><p>We detected unusual activity on your account.</p>",
                                "TextBody": "Security Alert: We detected unusual activity on your account."
                            },
                            "MfaEmail": {
                                "Subject": "Zero Trust: Additional Verification Required",
                                "HtmlBody": "<h1>Additional Verification</h1><p>Please complete additional verification to access your account.</p>",
                                "TextBody": "Additional Verification: Please complete additional verification to access your account."
                            }
                        },
                        "Actions": {
                            "LowAction": {"Notify": True, "EventAction": "MFA_IF_CONFIGURED"},
                            "MediumAction": {"Notify": True, "EventAction": "MFA_REQUIRED"},
                            "HighAction": {"Notify": True, "EventAction": "BLOCK"}
                        }
                    }
                }
            }
            
            self.cognito.put_risk_configuration(**risk_config)
            
            return {
                "status": "configured",
                "mfa_configuration": mfa_configuration,
                "risk_configuration": risk_config,
                "zero_trust_compliance": "ENFORCED"
            }
            
        except Exception as e:
            return {"error": str(e), "status": "failed"}
    
    def generate_hardware_token_backup(self, user_id: str) -> Dict:
        """Generate backup codes for hardware token failures"""
        # Local imports keep this method self-contained
        import secrets
        from datetime import datetime, timezone

        # Generate 10 backup codes (8 uppercase hex characters each) from a CSPRNG
        backup_codes = [secrets.token_hex(4).upper() for _ in range(10)]

        # In production, encrypt and store these securely
        encrypted_codes = self._encrypt_backup_codes(backup_codes)

        return {
            "user_id": user_id,
            "backup_codes": encrypted_codes,
            # Record the actual generation time instead of a hard-coded timestamp
            "generation_time": datetime.now(timezone.utc).isoformat(),
            "usage_instructions": [
                "Use backup codes only when primary MFA is unavailable",
                "Each code can be used only once",
                "Generate new codes after using 3 or more codes",
                "Store codes in a secure location separate from your device"
            ]
        }
    
    def _encrypt_backup_codes(self, codes: List[str]) -> List[str]:
        """Protect backup codes for storage (demo: one-way hash, not encryption)"""
        import hashlib

        # In production, use AWS KMS for proper encryption, for example
        # (requires a KMS client on self.kms and proper KMS key setup):
        # response = self.kms.encrypt(KeyId='alias/backup-codes', Plaintext=code.encode())
        # encrypted_codes.append(response['CiphertextBlob'])
        encrypted_codes = []
        for code in codes:
            # For demo: store a truncated SHA-256 digest instead of the plaintext code
            encrypted_codes.append(hashlib.sha256(code.encode()).hexdigest()[:16])
        return encrypted_codes
    
    def validate_mfa_compliance(self, user_session: Dict) -> Dict:
        """Validate MFA compliance for Zero Trust"""
        
        compliance_checks = {
            "mfa_present": user_session.get("mfa_authenticated", False),
            "mfa_method_secure": user_session.get("mfa_method") in ["TOTP", "SMS", "HARDWARE_TOKEN"],
            "session_duration_valid": self._check_session_duration(user_session.get("login_time")),
            "device_trusted": user_session.get("device_trusted", False),
            "location_verified": user_session.get("location_verified", False)
        }
        
        overall_compliance = all(compliance_checks.values())
        
        return {
            "compliance_status": "COMPLIANT" if overall_compliance else "NON_COMPLIANT",
            "checks": compliance_checks,
            "required_actions": self._get_required_actions(compliance_checks) if not overall_compliance else [],
            "risk_level": "LOW" if overall_compliance else "HIGH"
        }
    
    def _check_session_duration(self, login_time: str) -> bool:
        """Check if session duration is within Zero Trust limits"""
        # Implementation would check actual session age
        return True  # Placeholder
    
    def _get_required_actions(self, compliance_checks: Dict) -> List[str]:
        """Get required actions to achieve compliance"""
        actions = []
        if not compliance_checks["mfa_present"]:
            actions.append("Complete multi-factor authentication")
        if not compliance_checks["device_trusted"]:
            actions.append("Register device or use trusted device")
        if not compliance_checks["location_verified"]:
            actions.append("Verify location or use approved location")
        return actions

# Example usage
mfa_manager = ZeroTrustMFAManager()

# Setup adaptive MFA
risk_config = {
    "client_id": "client-12345",
    "sns_role_arn": "arn:aws:iam::123456789012:role/CognitoSNSRole",
    "notification_email": "security@company.com",
    "ses_source_arn": "arn:aws:ses:us-east-1:123456789012:identity/security@company.com"
}

mfa_setup = mfa_manager.setup_adaptive_mfa("us-east-1_AbCdEfGhI", risk_config)
print("MFA Configuration:")
print(json.dumps(mfa_setup, indent=2))

# Generate backup codes
backup_codes = mfa_manager.generate_hardware_token_backup("user-12345")
print("\nBackup Codes Generated:")
print(json.dumps(backup_codes, indent=2))

# Validate session compliance
user_session = {
    "mfa_authenticated": True,
    "mfa_method": "TOTP",
    "login_time": "2025-01-08T12:00:00Z",
    "device_trusted": True,
    "location_verified": True
}

compliance = mfa_manager.validate_mfa_compliance(user_session)
print("\nMFA Compliance Check:")
print(json.dumps(compliance, indent=2))
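
The `_check_session_duration` placeholder above always returns `True`. A possible implementation is sketched here as a standalone function; the name, the `now` parameter (added to make the check testable), and the choice to treat naive timestamps as UTC are assumptions. The 4-hour default mirrors `session_duration_hours` in the identity configuration.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def check_session_duration(login_time: Optional[str], max_hours: int = 4,
                           now: Optional[datetime] = None) -> bool:
    """Return True while the session is no older than max_hours."""
    if not login_time:
        return False  # No login timestamp: fail closed
    try:
        # fromisoformat() on older Python versions rejects a trailing "Z"
        started = datetime.fromisoformat(login_time.replace("Z", "+00:00"))
    except ValueError:
        return False  # Malformed timestamp: fail closed
    if started.tzinfo is None:
        started = started.replace(tzinfo=timezone.utc)  # assume UTC when unspecified
    now = now or datetime.now(timezone.utc)
    age = now - started
    # A negative age means the login time lies in the future, which is also rejected.
    return timedelta(0) <= age <= timedelta(hours=max_hours)


# Example: a session that started three hours ago is still within the limit.
reference = datetime(2025, 1, 8, 15, 0, tzinfo=timezone.utc)
print(check_session_duration("2025-01-08T12:00:00Z", now=reference))
```

Failing closed on missing or malformed timestamps fits the Zero Trust stance: a session whose age cannot be established is treated as expired.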

Phase 2: Device Trust Layer

Device Registration and Compliance

# Device Trust Management for Zero Trust
import boto3
import hashlib
import json
from typing import Dict, List, Optional
from datetime import datetime, timedelta
import cryptography
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa

class ZeroTrustDeviceManager:
    def __init__(self, device_table_name: str = 'zero-trust-devices'):
        self.device_table_name = device_table_name
        self.secrets_manager = boto3.client('secretsmanager')
        self.s3 = boto3.client('s3')
        self.lambda_client = boto3.client('lambda')
        self.dynamodb = boto3.resource('dynamodb')
        
    def register_device(self, device_info: Dict, user_id: str) -> Dict:
        """Register device with Zero Trust requirements"""
        
        # Generate device fingerprint
        device_fingerprint = self._generate_device_fingerprint(device_info)
        
        # Create device certificate
        device_cert = self._generate_device_certificate(device_fingerprint, user_id)
        
        # Perform initial compliance check
        compliance_status = self._check_device_compliance(device_info)
        
        device_record = {
            "device_id": device_fingerprint,
            "user_id": user_id,
            "device_info": device_info,
            "certificate": device_cert["certificate_pem"],
            "private_key_ref": device_cert["private_key_secret_arn"],
            "registration_time": datetime.utcnow().isoformat(),
            "compliance_status": compliance_status,
            "trust_level": self._calculate_trust_level(device_info, compliance_status),
            "last_seen": datetime.utcnow().isoformat(),
            "access_count": 0,
            "status": "ACTIVE"
        }
        
        # Store device record in DynamoDB
        devices_table = self.dynamodb.Table(self.device_table_name)
        devices_table.put_item(Item=device_record)
        
        return {
            "device_id": device_fingerprint,
            "registration_status": "SUCCESS",
            "certificate_expires": device_cert["expiry_date"],
            "trust_level": device_record["trust_level"],
            "compliance_requirements": self._get_compliance_requirements(compliance_status),
            "next_compliance_check": (datetime.utcnow() + timedelta(hours=24)).isoformat()
        }
    
    def _generate_device_fingerprint(self, device_info: Dict) -> str:
        """Generate unique device fingerprint"""
        fingerprint_data = {
            "hardware_id": device_info.get("hardware_id"),
            "mac_address": device_info.get("mac_address"),
            "cpu_id": device_info.get("cpu_id"),
            "motherboard_serial": device_info.get("motherboard_serial"),
            "bios_version": device_info.get("bios_version")
        }
        
        # Create hash of device characteristics
        fingerprint_string = json.dumps(fingerprint_data, sort_keys=True)
        fingerprint_hash = hashlib.sha256(fingerprint_string.encode()).hexdigest()
        
        return f"device-{fingerprint_hash[:16]}"
    
    def _generate_device_certificate(self, device_id: str, user_id: str) -> Dict:
        """Generate X.509 certificate for device"""
        
        try:
            # Generate private key
            private_key = rsa.generate_private_key(
                public_exponent=65537,
                key_size=2048,
            )

            # Create a self-signed certificate (subject and issuer are identical).
            # In production, issue device certificates from a managed CA instead.
            subject = issuer = x509.Name([
                x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
                x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "State"),
                x509.NameAttribute(NameOID.LOCALITY_NAME, "City"),
                x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Zero Trust Organization"),
                x509.NameAttribute(NameOID.COMMON_NAME, device_id),
            ])

            # Compute the validity window once so the certificate and the
            # reported expiry date always agree
            not_before = datetime.utcnow()
            not_after = not_before + timedelta(days=365)

            cert = x509.CertificateBuilder().subject_name(
                subject
            ).issuer_name(
                issuer
            ).public_key(
                private_key.public_key()
            ).serial_number(
                x509.random_serial_number()
            ).not_valid_before(
                not_before
            ).not_valid_after(
                not_after
            ).add_extension(
                x509.SubjectAlternativeName([
                    x509.DNSName(f"{device_id}.zero-trust.local"),
                    x509.RFC822Name(user_id),
                ]),
                critical=False,
            ).sign(private_key, hashes.SHA256())

            # Serialize certificate and key
            cert_pem = cert.public_bytes(serialization.Encoding.PEM).decode()
            key_pem = private_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption()
            ).decode()

            # Store private key in AWS Secrets Manager
            secret_arn = self._store_private_key_securely(device_id, key_pem)

            return {
                "certificate_pem": cert_pem,
                "private_key_secret_arn": secret_arn,
                "expiry_date": not_after.isoformat()
            }
        except Exception as e:
            print(f"Certificate generation failed for {device_id}: {e}")
            return {
                "error": "Certificate generation failed",
                "certificate_pem": None,
                "private_key_secret_arn": None,
                "expiry_date": None
            }
    
    def _store_private_key_securely(self, device_id: str, private_key_pem: str) -> Optional[str]:
        """Store private key in AWS Secrets Manager"""
        
        secret_name = f"zero-trust/device-keys/{device_id}"
        
        try:
            response = self.secrets_manager.create_secret(
                Name=secret_name,
                Description=f"Private key for Zero Trust device {device_id}",
                SecretString=json.dumps({
                    "private_key": private_key_pem,
                    "device_id": device_id,
                    "created_at": datetime.utcnow().isoformat()
                }),
                KmsKeyId="alias/zero-trust-secrets"
            )
            
            return response['ARN']
            
        except Exception as e:
            print(f"Error storing private key: {e}")
            return None
    
    def _check_device_compliance(self, device_info: Dict) -> Dict:
        """Check device compliance against Zero Trust policies"""
        
        compliance_checks = {
            "os_version_current": self._check_os_version(device_info.get("os_version")),
            "antivirus_enabled": device_info.get("antivirus_enabled", False),
            "firewall_enabled": device_info.get("firewall_enabled", False),
            "disk_encryption_enabled": device_info.get("disk_encrypted", False),
            "patch_level_current": self._check_patch_level(device_info.get("last_patch_date")),
            "no_jailbreak_detected": not device_info.get("jailbroken", False),
            "remote_access_disabled": not device_info.get("remote_access_enabled", True),
            "screen_lock_enabled": device_info.get("screen_lock_enabled", False)
        }
        
        compliance_score = sum(compliance_checks.values()) / len(compliance_checks)
        
        return {
            "overall_score": compliance_score,
            "checks": compliance_checks,
            "compliance_level": self._get_compliance_level(compliance_score),
            "required_remediation": [
                check for check, passed in compliance_checks.items() if not passed
            ]
        }
    
    def _check_os_version(self, os_version: str) -> bool:
        """Check if OS version meets minimum requirements"""
        # Simplified check - in production, maintain database of approved versions
        if not os_version:
            return False
        
        # Example: Windows 10 build 19041 or later
        if "Windows" in os_version:
            try:
                build_number = int(os_version.split(".")[-1])
                return build_number >= 19041
            except (ValueError, IndexError):
                # Unparseable version string: treat as non-compliant
                return False
        
        # Example: macOS 11.0 or later
        if "macOS" in os_version:
            try:
                version_parts = os_version.split(".")
                major_version = int(version_parts[0].split()[-1])
                return major_version >= 11
            except (ValueError, IndexError):
                # Unparseable version string: treat as non-compliant
                return False
        
        return True  # Default to true for other OS types
    
    def _check_patch_level(self, last_patch_date: str) -> bool:
        """Check if device patches are current"""
        if not last_patch_date:
            return False
        
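        try:
            # Accept ISO 8601 timestamps with a trailing 'Z' and compare in naive UTC
            patch_date = datetime.fromisoformat(last_patch_date.replace('Z', '+00:00'))
            days_since_patch = (datetime.utcnow() - patch_date.replace(tzinfo=None)).days
            return days_since_patch <= 30  # Patches must be within 30 days
        except (ValueError, TypeError, AttributeError):
            # Malformed or non-string date: treat as non-compliant
            return False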
    
    def _calculate_trust_level(self, device_info: Dict, compliance_status: Dict) -> str:
        """Calculate overall device trust level"""
        compliance_score = compliance_status["overall_score"]
        
        # Additional factors
        device_age_factor = self._assess_device_age(device_info.get("registration_date"))
        usage_pattern_factor = self._assess_usage_patterns(device_info.get("usage_history", {}))
        
        overall_trust_score = (compliance_score * 0.6 + device_age_factor * 0.2 + usage_pattern_factor * 0.2)
        
        if overall_trust_score >= 0.8:
            return "HIGH"
        elif overall_trust_score >= 0.6:
            return "MEDIUM"
        else:
            return "LOW"
    
    def _assess_device_age(self, registration_date: str) -> float:
        """Assess trust based on device age and history"""
        # Newer devices have lower trust until proven
        if not registration_date:
            return 0.3  # New device, lower trust
        
        try:
            reg_date = datetime.fromisoformat(registration_date.replace('Z', '+00:00'))
            days_registered = (datetime.utcnow() - reg_date.replace(tzinfo=None)).days
            
            if days_registered < 7:
                return 0.3  # Very new
            elif days_registered < 30:
                return 0.6  # Somewhat established
            else:
                return 0.9  # Well established
        except (ValueError, TypeError, AttributeError):
            # Malformed or non-string date: fall back to the lowest trust factor
            return 0.3
    
    def _assess_usage_patterns(self, usage_history: Dict) -> float:
        """Assess trust based on usage patterns"""
        # Analyze for anomalous behavior
        normal_access_hours = usage_history.get("normal_hours", [9, 17])  # 9 AM to 5 PM
        unusual_locations = usage_history.get("unusual_locations", 0)
        failed_attempts = usage_history.get("failed_attempts", 0)
        
        pattern_score = 1.0
        
        if unusual_locations > 3:
            pattern_score -= 0.3
        if failed_attempts > 5:
            pattern_score -= 0.4
        
        return max(0.0, pattern_score)
    
    def _get_compliance_level(self, compliance_score: float) -> str:
        """Get compliance level based on score"""
        if compliance_score >= 0.9:
            return "FULLY_COMPLIANT"
        elif compliance_score >= 0.7:
            return "MOSTLY_COMPLIANT"
        elif compliance_score >= 0.5:
            return "PARTIALLY_COMPLIANT"
        else:
            return "NON_COMPLIANT"
    
    def _get_compliance_requirements(self, compliance_status: Dict) -> List[str]:
        """Get list of compliance requirements based on current status"""
        requirements = []
        
        for check, passed in compliance_status["checks"].items():
            if not passed:
                requirement_map = {
                    "os_version_current": "Update operating system to supported version",
                    "antivirus_enabled": "Install and enable antivirus software",
                    "firewall_enabled": "Enable device firewall",
                    "disk_encryption_enabled": "Enable full disk encryption",
                    "patch_level_current": "Install latest security patches",
                    "no_jailbreak_detected": "Remove jailbreak/root modifications",
                    "remote_access_disabled": "Disable unnecessary remote access tools",
                    "screen_lock_enabled": "Enable screen lock with strong PIN/password"
                }
                
                requirements.append(requirement_map.get(check, f"Address {check} compliance issue"))
        
        return requirements

    def continuous_compliance_monitoring(self, device_id: str) -> Dict:
        """Perform continuous compliance monitoring"""
        
        try:
            # Get current device record
            devices_table = self.dynamodb.Table(self.device_table_name)
            response = devices_table.get_item(Key={'device_id': device_id})
            
            if 'Item' not in response:
                return {"error": "Device not found", "device_id": device_id}
            
            device_record = response['Item']
            
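            # Capture the previously stored score before it is overwritten.
            # DynamoDB returns numbers as Decimal, so convert to float for
            # comparison and JSON output.
            previous_score = float(
                device_record.get('compliance_status', {}).get('overall_score', 0)
            )
            
            # Simulate real-time compliance check (in production, integrate with MDM/EDR)
            current_compliance = self._check_device_compliance(device_record['device_info'])
            current_score = current_compliance["overall_score"]
            
            # Update device record with new compliance status. The boto3 resource
            # API rejects Python floats, so convert them to Decimal first.
            from decimal import Decimal  # local import keeps this method self-contained
            device_record['compliance_status'] = json.loads(
                json.dumps(current_compliance), parse_float=Decimal
            )
            device_record['previous_compliance_score'] = Decimal(str(previous_score))
            device_record['last_compliance_check'] = datetime.utcnow().isoformat()
            device_record['trust_level'] = self._calculate_trust_level(
                device_record['device_info'], current_compliance
            )
            
            # Store updated record
            devices_table.put_item(Item=device_record)
            
            # Classify the score movement since the last check
            if current_score > previous_score:
                score_trend = "IMPROVING"
            elif current_score < previous_score:
                score_trend = "DECLINING"
            else:
                score_trend = "STABLE"
            
            # Generate compliance report
            compliance_report = {
                "device_id": device_id,
                "compliance_check_time": datetime.utcnow().isoformat(),
                "previous_score": previous_score,
                "current_score": current_score,
                "score_trend": score_trend,
                "trust_level": device_record['trust_level'],
                "action_required": len(current_compliance["required_remediation"]) > 0,
                "remediation_items": current_compliance["required_remediation"],
                "next_check": (datetime.utcnow() + timedelta(hours=24)).isoformat()
            }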
            
            # Trigger automated remediation if critical issues found
            if current_compliance["overall_score"] < 0.5:
                self._trigger_automated_remediation(device_id, current_compliance)
            
            return compliance_report
            
        except Exception as e:
            return {"error": str(e), "device_id": device_id}
    
    def _trigger_automated_remediation(self, device_id: str, compliance_status: Dict):
        """Trigger automated remediation for compliance issues"""
        
        remediation_actions = {
            "patch_level_current": {
                "action": "trigger_patch_deployment",
                "lambda_function": "zero-trust-patch-deployment"
            },
            "antivirus_enabled": {
                "action": "deploy_antivirus",
                "lambda_function": "zero-trust-av-deployment"
            },
            "firewall_enabled": {
                "action": "enable_firewall",
                "lambda_function": "zero-trust-firewall-config"
            }
        }
        
        for failed_check in compliance_status["required_remediation"]:
            if failed_check in remediation_actions:
                action = remediation_actions[failed_check]
                
                # Invoke remediation Lambda function
                try:
                    self.lambda_client.invoke(
                        FunctionName=action["lambda_function"],
                        InvocationType='Event',  # Asynchronous
                        Payload=json.dumps({
                            "device_id": device_id,
                            "action": action["action"],
                            "compliance_issue": failed_check
                        })
                    )
                    
                    print(f"Triggered automated remediation for {device_id}: {action['action']}")
                    
                except Exception as e:
                    print(f"Failed to trigger remediation for {device_id}: {e}")

# Example usage
device_manager = ZeroTrustDeviceManager()

# Register a new device
device_info = {
    "hardware_id": "HW-12345-ABCDE",
    "mac_address": "00:1B:44:11:3A:B7",
    "cpu_id": "CPU-67890",
    "motherboard_serial": "MB-54321",
    "bios_version": "BIOS-1.2.3",
    "os_version": "Windows 10.0.19041",
    "antivirus_enabled": True,
    "firewall_enabled": True,
    "disk_encrypted": True,
    "last_patch_date": "2025-01-05T10:30:00Z",
    "jailbroken": False,
    "remote_access_enabled": False,
    "screen_lock_enabled": True,
    "registration_date": "2025-01-01T00:00:00Z",
    "usage_history": {
        "normal_hours": [9, 17],
        "unusual_locations": 1,
        "failed_attempts": 2
    }
}

registration_result = device_manager.register_device(device_info, "john.doe@company.com")
print("Device Registration Result:")
print(json.dumps(registration_result, indent=2))

# Perform continuous compliance monitoring
compliance_report = device_manager.continuous_compliance_monitoring(registration_result["device_id"])
print("\nContinuous Compliance Monitoring:")
print(json.dumps(compliance_report, indent=2))
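
The trust calculation in _calculate_trust_level is a weighted sum followed by two thresholds. The sketch below isolates that arithmetic so it can be explored without any AWS dependencies. The function name score_trust and the WEIGHTS dictionary are illustrative names introduced here, not part of the class above; the weights and thresholds are copied from it.

```python
from typing import Tuple

# Weights mirror the 0.6 / 0.2 / 0.2 split used in _calculate_trust_level.
# WEIGHTS and score_trust are hypothetical names used only for this sketch.
WEIGHTS = {"compliance": 0.6, "device_age": 0.2, "usage": 0.2}


def score_trust(compliance: float, device_age: float, usage: float) -> Tuple[float, str]:
    """Return (score, level) for three factors that each lie in [0.0, 1.0]."""
    score = (
        compliance * WEIGHTS["compliance"]
        + device_age * WEIGHTS["device_age"]
        + usage * WEIGHTS["usage"]
    )
    # Same thresholds as the class: >= 0.8 is HIGH, >= 0.6 is MEDIUM
    if score >= 0.8:
        level = "HIGH"
    elif score >= 0.6:
        level = "MEDIUM"
    else:
        level = "LOW"
    return round(score, 4), level


if __name__ == "__main__":
    # Fully compliant, long-registered device with a clean usage history
    print(score_trust(1.0, 0.9, 1.0))
    # Fully compliant but newly registered device (age factor 0.3)
    print(score_trust(1.0, 0.3, 1.0))
    # Half of the compliance checks failing on an established device
    print(score_trust(0.5, 0.9, 1.0))
```

With these weights, a fully compliant device registered less than a week ago still lands in the HIGH band (0.6 + 0.06 + 0.2 = 0.86), because the age factor contributes at most 0.2 to the total. If new devices should start at MEDIUM, the weights or thresholds would need adjusting.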

Phase 3: Network Segmentation Layer

Zero Trust Network Architecture
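
Before applying a segmented network layout, it can help to sanity-check the address plan offline. The following is a minimal sketch, using only Python's standard ipaddress module, that checks the subnet CIDRs used in this phase for containment in the VPC range, overlaps, and alignment. The helper names validate_plan, is_aligned, and covering_blocks are hypothetical and exist only for this illustration.

```python
import ipaddress
from itertools import combinations
from typing import Dict, List

# Address plan taken from the Terraform configuration in this phase
VPC_CIDR = "10.0.0.0/16"
SUBNETS = {
    "public-az1": "10.0.1.0/24",
    "public-az2": "10.0.2.0/24",
    "web-az1": "10.0.10.0/24",
    "web-az2": "10.0.11.0/24",
    "api-az1": "10.0.20.0/24",
    "api-az2": "10.0.21.0/24",
    "data-az1": "10.0.30.0/24",
    "data-az2": "10.0.31.0/24",
}


def is_aligned(cidr: str) -> bool:
    """True if the address is a valid network address for its prefix length."""
    try:
        ipaddress.ip_network(cidr)  # strict mode rejects host bits
        return True
    except ValueError:
        return False


def validate_plan(vpc_cidr: str, subnets: Dict[str, str]) -> List[str]:
    """Return a list of problems; an empty list means the plan is consistent."""
    problems = []
    vpc = ipaddress.ip_network(vpc_cidr)
    parsed = {}
    for name, cidr in subnets.items():
        if not is_aligned(cidr):
            problems.append(f"{name}: {cidr} is not an aligned network address")
            continue
        parsed[name] = ipaddress.ip_network(cidr)
        if not parsed[name].subnet_of(vpc):
            problems.append(f"{name}: {cidr} is outside {vpc_cidr}")
    # Every pair of subnets must be disjoint
    for (name_a, net_a), (name_b, net_b) in combinations(parsed.items(), 2):
        if net_a.overlaps(net_b):
            problems.append(f"{name_a} overlaps {name_b}")
    return problems


def covering_blocks(cidrs: List[str]) -> List[str]:
    """Collapse adjacent subnets into the smallest set of covering CIDR blocks."""
    nets = [ipaddress.ip_network(c) for c in cidrs]
    return [str(n) for n in ipaddress.collapse_addresses(nets)]


if __name__ == "__main__":
    print(validate_plan(VPC_CIDR, SUBNETS))
    print(covering_blocks(["10.0.10.0/24", "10.0.11.0/24"]))
    print(is_aligned("10.0.10.0/22"))
```

The alignment check matters when several subnets are summarized into one network ACL rule: a value such as 10.0.10.0/22 is not a valid network address because host bits are set, whereas the two web subnets collapse cleanly into 10.0.10.0/23.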

# Terraform configuration for Zero Trust Network Architecture
# File: zero-trust-network.tf

terraform {
  required_version = ">= 1.0"
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
  }
}

variable "organization_name" {
  description = "Organization name for resource naming"
  type        = string
  default     = "zero-trust-org"
}

variable "environment" {
  description = "Environment (prod, staging, dev)"
  type        = string
  default     = "prod"
}

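# Availability zones referenced by the subnet definitions below
data "aws_availability_zones" "available" {
  state = "available"
}

# Zero Trust VPC with micro-segmentation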
resource "aws_vpc" "zero_trust_vpc" {
  cidr_block           = "10.0.0.0/16"
  enable_dns_hostnames = true
  enable_dns_support   = true
  
  tags = {
    Name        = "${var.organization_name}-zero-trust-vpc"
    Environment = var.environment
    ZeroTrust   = "enabled"
  }
}

# Internet Gateway
resource "aws_internet_gateway" "zero_trust_igw" {
  vpc_id = aws_vpc.zero_trust_vpc.id
  
  tags = {
    Name = "${var.organization_name}-zero-trust-igw"
  }
}

# NAT Gateway for outbound traffic
resource "aws_eip" "nat_gateway_eip" {
  domain = "vpc"
  
  tags = {
    Name = "${var.organization_name}-nat-gateway-eip"
  }
}

resource "aws_nat_gateway" "zero_trust_nat" {
  allocation_id = aws_eip.nat_gateway_eip.id
  subnet_id     = aws_subnet.public_subnet_az1.id
  
  tags = {
    Name = "${var.organization_name}-zero-trust-nat"
  }
  
  depends_on = [aws_internet_gateway.zero_trust_igw]
}

# Public subnets (for load balancers and NAT gateways only)
resource "aws_subnet" "public_subnet_az1" {
  vpc_id                  = aws_vpc.zero_trust_vpc.id
  cidr_block              = "10.0.1.0/24"
  availability_zone       = data.aws_availability_zones.available.names[0]
  map_public_ip_on_launch = false  # Zero Trust: no public IPs by default
  
  tags = {
    Name = "${var.organization_name}-public-subnet-az1"
    Tier = "public"
  }
}

resource "aws_subnet" "public_subnet_az2" {
  vpc_id                  = aws_vpc.zero_trust_vpc.id
  cidr_block              = "10.0.2.0/24"
  availability_zone       = data.aws_availability_zones.available.names[1]
  map_public_ip_on_launch = false
  
  tags = {
    Name = "${var.organization_name}-public-subnet-az2"
    Tier = "public"
  }
}

# Private subnets for applications (micro-segmented)
resource "aws_subnet" "app_subnet_web_az1" {
  vpc_id            = aws_vpc.zero_trust_vpc.id
  cidr_block        = "10.0.10.0/24"
  availability_zone = data.aws_availability_zones.available.names[0]
  
  tags = {
    Name = "${var.organization_name}-app-web-subnet-az1"
    Tier = "application"
    Layer = "web"
  }
}

resource "aws_subnet" "app_subnet_web_az2" {
  vpc_id            = aws_vpc.zero_trust_vpc.id
  cidr_block        = "10.0.11.0/24"
  availability_zone = data.aws_availability_zones.available.names[1]
  
  tags = {
    Name = "${var.organization_name}-app-web-subnet-az2"
    Tier = "application"
    Layer = "web"
  }
}

resource "aws_subnet" "app_subnet_api_az1" {
  vpc_id            = aws_vpc.zero_trust_vpc.id
  cidr_block        = "10.0.20.0/24"
  availability_zone = data.aws_availability_zones.available.names[0]
  
  tags = {
    Name = "${var.organization_name}-app-api-subnet-az1"
    Tier = "application"
    Layer = "api"
  }
}

resource "aws_subnet" "app_subnet_api_az2" {
  vpc_id            = aws_vpc.zero_trust_vpc.id
  cidr_block        = "10.0.21.0/24"
  availability_zone = data.aws_availability_zones.available.names[1]
  
  tags = {
    Name = "${var.organization_name}-app-api-subnet-az2"
    Tier = "application"
    Layer = "api"
  }
}

# Database subnets (highly restricted)
resource "aws_subnet" "data_subnet_az1" {
  vpc_id            = aws_vpc.zero_trust_vpc.id
  cidr_block        = "10.0.30.0/24"
  availability_zone = data.aws_availability_zones.available.names[0]
  
  tags = {
    Name = "${var.organization_name}-data-subnet-az1"
    Tier = "data"
  }
}

resource "aws_subnet" "data_subnet_az2" {
  vpc_id            = aws_vpc.zero_trust_vpc.id
  cidr_block        = "10.0.31.0/24"
  availability_zone = data.aws_availability_zones.available.names[1]
  
  tags = {
    Name = "${var.organization_name}-data-subnet-az2"
    Tier = "data"
  }
}

# Route tables
resource "aws_route_table" "public_route_table" {
  vpc_id = aws_vpc.zero_trust_vpc.id
  
  route {
    cidr_block = "0.0.0.0/0"
    gateway_id = aws_internet_gateway.zero_trust_igw.id
  }
  
  tags = {
    Name = "${var.organization_name}-public-rt"
  }
}

resource "aws_route_table" "private_route_table" {
  vpc_id = aws_vpc.zero_trust_vpc.id
  
  route {
    cidr_block     = "0.0.0.0/0"
    nat_gateway_id = aws_nat_gateway.zero_trust_nat.id
  }
  
  tags = {
    Name = "${var.organization_name}-private-rt"
  }
}

resource "aws_route_table" "data_route_table" {
  vpc_id = aws_vpc.zero_trust_vpc.id
  
  # No internet access for data tier
  tags = {
    Name = "${var.organization_name}-data-rt"
  }
}

# Route table associations
resource "aws_route_table_association" "public_subnet_association_az1" {
  subnet_id      = aws_subnet.public_subnet_az1.id
  route_table_id = aws_route_table.public_route_table.id
}

resource "aws_route_table_association" "public_subnet_association_az2" {
  subnet_id      = aws_subnet.public_subnet_az2.id
  route_table_id = aws_route_table.public_route_table.id
}

resource "aws_route_table_association" "app_web_subnet_association_az1" {
  subnet_id      = aws_subnet.app_subnet_web_az1.id
  route_table_id = aws_route_table.private_route_table.id
}

resource "aws_route_table_association" "app_web_subnet_association_az2" {
  subnet_id      = aws_subnet.app_subnet_web_az2.id
  route_table_id = aws_route_table.private_route_table.id
}

resource "aws_route_table_association" "app_api_subnet_association_az1" {
  subnet_id      = aws_subnet.app_subnet_api_az1.id
  route_table_id = aws_route_table.private_route_table.id
}

resource "aws_route_table_association" "app_api_subnet_association_az2" {
  subnet_id      = aws_subnet.app_subnet_api_az2.id
  route_table_id = aws_route_table.private_route_table.id
}

resource "aws_route_table_association" "data_subnet_association_az1" {
  subnet_id      = aws_subnet.data_subnet_az1.id
  route_table_id = aws_route_table.data_route_table.id
}

resource "aws_route_table_association" "data_subnet_association_az2" {
  subnet_id      = aws_subnet.data_subnet_az2.id
  route_table_id = aws_route_table.data_route_table.id
}

# Zero Trust Security Groups (micro-segmentation)

# Load Balancer Security Group (minimal access)
resource "aws_security_group" "alb_security_group" {
  name_prefix = "${var.organization_name}-alb-sg"
  vpc_id      = aws_vpc.zero_trust_vpc.id
  
  description = "Zero Trust ALB Security Group"
  
  ingress {
    from_port   = 443
    to_port     = 443
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]  # HTTPS only from anywhere
    description = "HTTPS traffic from internet"
  }
  
  ingress {
    from_port   = 80
    to_port     = 80
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
    description = "HTTP traffic for redirect to HTTPS"
  }
  
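  # Egress is scoped by web subnet CIDR rather than by security group ID.
  # The web tier group already references this group in its ingress rules,
  # and inline rules that reference each other in both directions create a
  # dependency cycle that Terraform cannot resolve.
  egress {
    from_port   = 80
    to_port     = 80
    protocol    = "tcp"
    cidr_blocks = [aws_subnet.app_subnet_web_az1.cidr_block, aws_subnet.app_subnet_web_az2.cidr_block]
    description = "HTTP to web tier subnets"
  }
  
  egress {
    from_port   = 443
    to_port     = 443
    protocol    = "tcp"
    cidr_blocks = [aws_subnet.app_subnet_web_az1.cidr_block, aws_subnet.app_subnet_web_az2.cidr_block]
    description = "HTTPS to web tier subnets"
  }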
  
  tags = {
    Name = "${var.organization_name}-alb-sg"
    Tier = "load-balancer"
  }
}

# Web Tier Security Group
resource "aws_security_group" "web_tier_security_group" {
  name_prefix = "${var.organization_name}-web-sg"
  vpc_id      = aws_vpc.zero_trust_vpc.id
  
  description = "Zero Trust Web Tier Security Group"
  
  ingress {
    from_port       = 80
    to_port         = 80
    protocol        = "tcp"
    security_groups = [aws_security_group.alb_security_group.id]
    description     = "HTTP from ALB"
  }
  
  ingress {
    from_port       = 443
    to_port         = 443
    protocol        = "tcp"
    security_groups = [aws_security_group.alb_security_group.id]
    description     = "HTTPS from ALB"
  }
  
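  # Scoped by API subnet CIDR to avoid a dependency cycle with the API tier
  # group, which references this group in its ingress rule
  egress {
    from_port   = 8080
    to_port     = 8080
    protocol    = "tcp"
    cidr_blocks = [aws_subnet.app_subnet_api_az1.cidr_block, aws_subnet.app_subnet_api_az2.cidr_block]
    description = "API calls to application tier subnets"
  }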
  
  egress {
    from_port   = 443
    to_port     = 443
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
    description = "HTTPS for external API calls and updates"
  }
  
  tags = {
    Name = "${var.organization_name}-web-sg"
    Tier = "web"
  }
}

# API Tier Security Group
resource "aws_security_group" "api_tier_security_group" {
  name_prefix = "${var.organization_name}-api-sg"
  vpc_id      = aws_vpc.zero_trust_vpc.id
  
  description = "Zero Trust API Tier Security Group"
  
  ingress {
    from_port       = 8080
    to_port         = 8080
    protocol        = "tcp"
    security_groups = [aws_security_group.web_tier_security_group.id]
    description     = "API traffic from web tier"
  }
  
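  # Scoped by data subnet CIDR to avoid a dependency cycle with the data tier
  # group, which references this group in its ingress rules
  egress {
    from_port   = 5432
    to_port     = 5432
    protocol    = "tcp"
    cidr_blocks = [aws_subnet.data_subnet_az1.cidr_block, aws_subnet.data_subnet_az2.cidr_block]
    description = "PostgreSQL to data tier subnets"
  }
  
  egress {
    from_port   = 3306
    to_port     = 3306
    protocol    = "tcp"
    cidr_blocks = [aws_subnet.data_subnet_az1.cidr_block, aws_subnet.data_subnet_az2.cidr_block]
    description = "MySQL to data tier subnets"
  }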
  
  egress {
    from_port   = 443
    to_port     = 443
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
    description = "HTTPS for external services"
  }
  
  tags = {
    Name = "${var.organization_name}-api-sg"
    Tier = "api"
  }
}

# Data Tier Security Group (most restrictive)
resource "aws_security_group" "data_tier_security_group" {
  name_prefix = "${var.organization_name}-data-sg"
  vpc_id      = aws_vpc.zero_trust_vpc.id
  
  description = "Zero Trust Data Tier Security Group"
  
  ingress {
    from_port       = 5432
    to_port         = 5432
    protocol        = "tcp"
    security_groups = [aws_security_group.api_tier_security_group.id]
    description     = "PostgreSQL from API tier"
  }
  
  ingress {
    from_port       = 3306
    to_port         = 3306
    protocol        = "tcp"
    security_groups = [aws_security_group.api_tier_security_group.id]
    description     = "MySQL from API tier"
  }
  
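  # No egress blocks are defined: Terraform removes the default allow-all
  # egress rule, so the data tier cannot initiate outbound connections.
  # Responses to the API tier still flow because security groups are stateful.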
  
  tags = {
    Name = "${var.organization_name}-data-sg"
    Tier = "data"
  }
}

# Network ACLs for additional layer of security

# Public Network ACL
resource "aws_network_acl" "public_nacl" {
  vpc_id     = aws_vpc.zero_trust_vpc.id
  subnet_ids = [aws_subnet.public_subnet_az1.id, aws_subnet.public_subnet_az2.id]
  
  # Allow HTTPS inbound
  ingress {
    rule_no    = 100
    protocol   = "tcp"
    action     = "allow"
    cidr_block = "0.0.0.0/0"
    from_port  = 443
    to_port    = 443
  }
  
  # Allow HTTP inbound (for redirect)
  ingress {
    rule_no    = 110
    protocol   = "tcp"
    action     = "allow"
    cidr_block = "0.0.0.0/0"
    from_port  = 80
    to_port    = 80
  }
  
  # Allow ephemeral ports for return traffic
  ingress {
    rule_no    = 120
    protocol   = "tcp"
    action     = "allow"
    cidr_block = "0.0.0.0/0"
    from_port  = 1024
    to_port    = 65535
  }
  
  # Allow all outbound traffic (ports must be 0 when protocol is "-1")
  egress {
    rule_no    = 100
    protocol   = "-1"
    action     = "allow"
    cidr_block = "0.0.0.0/0"
    from_port  = 0
    to_port    = 0
  }
  
  tags = {
    Name = "${var.organization_name}-public-nacl"
  }
}

# Private Network ACL (more restrictive)
resource "aws_network_acl" "private_nacl" {
  vpc_id = aws_vpc.zero_trust_vpc.id
  subnet_ids = [
    aws_subnet.app_subnet_web_az1.id,
    aws_subnet.app_subnet_web_az2.id,
    aws_subnet.app_subnet_api_az1.id,
    aws_subnet.app_subnet_api_az2.id
  ]
  
  # Allow HTTP/HTTPS from public subnets
  ingress {
    rule_no    = 100
    protocol   = "tcp"
    action     = "allow"
    cidr_block = "10.0.1.0/24"
    from_port  = 80
    to_port    = 80
  }
  
  ingress {
    rule_no    = 110
    protocol   = "tcp"
    action     = "allow"
    cidr_block = "10.0.2.0/24"
    from_port  = 80
    to_port    = 80
  }
  
  ingress {
    rule_no    = 120
    protocol   = "tcp"
    action     = "allow"
    cidr_block = "10.0.1.0/24"
    from_port  = 443
    to_port    = 443
  }
  
  ingress {
    rule_no    = 130
    protocol   = "tcp"
    action     = "allow"
    cidr_block = "10.0.2.0/24"
    from_port  = 443
    to_port    = 443
  }
  
  # Allow web tier to reach the API tier
  ingress {
    rule_no    = 140
    protocol   = "tcp"
    action     = "allow"
    cidr_block = "10.0.10.0/23"  # Web subnets (10.0.10.0/24 and 10.0.11.0/24)
    from_port  = 8080
    to_port    = 8080
  }
  
  # Allow ephemeral ports
  ingress {
    rule_no    = 150
    protocol   = "tcp"
    action     = "allow"
    cidr_block = "0.0.0.0/0"
    from_port  = 1024
    to_port    = 65535
  }
  
  # Allow all outbound (will be further restricted by security groups)
  egress {
    rule_no    = 100
    protocol   = "-1"
    action     = "allow"
    cidr_block = "0.0.0.0/0"
    from_port  = 0
    to_port    = 0
  }
  
  tags = {
    Name = "${var.organization_name}-private-nacl"
  }
}

# Data Network ACL (most restrictive)
resource "aws_network_acl" "data_nacl" {
  vpc_id     = aws_vpc.zero_trust_vpc.id
  subnet_ids = [aws_subnet.data_subnet_az1.id, aws_subnet.data_subnet_az2.id]
  
  # Only allow database traffic from the API subnets
  ingress {
    rule_no    = 100
    protocol   = "tcp"
    action     = "allow"
    cidr_block = "10.0.20.0/24"  # API subnet AZ1
    from_port  = 5432
    to_port    = 5432
  }
  
  ingress {
    rule_no    = 110
    protocol   = "tcp"
    action     = "allow"
    cidr_block = "10.0.21.0/24"  # API subnet AZ2
    from_port  = 5432
    to_port    = 5432
  }
  
  ingress {
    rule_no    = 120
    protocol   = "tcp"
    action     = "allow"
    cidr_block = "10.0.20.0/24"  # API subnet AZ1
    from_port  = 3306
    to_port    = 3306
  }
  
  ingress {
    rule_no    = 130
    protocol   = "tcp"
    action     = "allow"
    cidr_block = "10.0.21.0/24"  # API subnet AZ2
    from_port  = 3306
    to_port    = 3306
  }
  
  # Allow ephemeral ports for return traffic
  ingress {
    rule_no    = 140
    protocol   = "tcp"
    action     = "allow"
    cidr_block = "10.0.20.0/23"  # API subnets only (10.0.20.0/24 and 10.0.21.0/24)
    from_port  = 1024
    to_port    = 65535
  }
  
  # Very restricted outbound: only return traffic to the API subnets
  egress {
    rule_no    = 100
    protocol   = "tcp"
    action     = "allow"
    cidr_block = "10.0.20.0/23"  # API subnets only (10.0.20.0/24 and 10.0.21.0/24)
    from_port  = 1024
    to_port    = 65535
  }
  
  tags = {
    Name = "${var.organization_name}-data-nacl"
  }
}

# WAF for additional application layer protection
resource "aws_wafv2_web_acl" "zero_trust_waf" {
  name  = "${var.organization_name}-zero-trust-waf"
  description = "Zero Trust WAF with comprehensive protection"
  scope = "REGIONAL"
  
  default_action {
    allow {}
  }
  
  # AWS Managed Rules - Core Rule Set
  rule {
    name     = "AWS-AWSManagedRulesCommonRuleSet"
    priority = 1
    
    override_action {
      none {}
    }
    
    statement {
      managed_rule_group_statement {
        name        = "AWSManagedRulesCommonRuleSet"
        vendor_name = "AWS"
      }
    }
    
    visibility_config {
      cloudwatch_metrics_enabled = true
      metric_name                = "AWS-AWSManagedRulesCommonRuleSet"
      sampled_requests_enabled   = true
    }
  }
  
  # AWS Managed Rules - Known Bad Inputs
  rule {
    name     = "AWS-AWSManagedRulesKnownBadInputsRuleSet"
    priority = 2
    
    override_action {
      none {}
    }
    
    statement {
      managed_rule_group_statement {
        name        = "AWSManagedRulesKnownBadInputsRuleSet"
        vendor_name = "AWS"
      }
    }
    
    visibility_config {
      cloudwatch_metrics_enabled = true
      metric_name                = "AWS-AWSManagedRulesKnownBadInputsRuleSet"
      sampled_requests_enabled   = true
    }
  }
  
  # AWS Managed Rules - SQL Injection
  rule {
    name     = "AWS-AWSManagedRulesSQLiRuleSet"
    priority = 3
    
    override_action {
      none {}
    }
    
    statement {
      managed_rule_group_statement {
        name        = "AWSManagedRulesSQLiRuleSet"
        vendor_name = "AWS"
      }
    }
    
    visibility_config {
      cloudwatch_metrics_enabled = true
      metric_name                = "AWS-AWSManagedRulesSQLiRuleSet"
      sampled_requests_enabled   = true
    }
  }
  
  # Rate Limiting Rule
  rule {
    name     = "RateLimitRule"
    priority = 4
    
    action {
      block {}
    }
    
    statement {
      rate_based_statement {
        limit              = 1000
        aggregate_key_type = "IP"
      }
    }
    
    visibility_config {
      cloudwatch_metrics_enabled = true
      metric_name                = "RateLimitRule"
      sampled_requests_enabled   = true
    }
  }
  
  # Geo-blocking rule (block high-risk countries)
  rule {
    name     = "GeoBlockingRule"
    priority = 5
    
    action {
      block {}
    }
    
    statement {
      geo_match_statement {
        country_codes = ["CN", "RU", "KP", "IR"]  # Example high-risk countries
      }
    }
    
    visibility_config {
      cloudwatch_metrics_enabled = true
      metric_name                = "GeoBlockingRule"
      sampled_requests_enabled   = true
    }
  }
  
  tags = {
    Name = "${var.organization_name}-zero-trust-waf"
  }
  
  visibility_config {
    cloudwatch_metrics_enabled = true
    metric_name                = "zero-trust-waf"
    sampled_requests_enabled   = true
  }
}

# VPC Flow Logs for monitoring
resource "aws_flow_log" "zero_trust_vpc_flow_log" {
  iam_role_arn    = aws_iam_role.flow_log_role.arn
  log_destination = aws_cloudwatch_log_group.vpc_flow_log.arn
  traffic_type    = "ALL"
  vpc_id          = aws_vpc.zero_trust_vpc.id
  
  tags = {
    Name = "${var.organization_name}-vpc-flow-log"
  }
}

resource "aws_cloudwatch_log_group" "vpc_flow_log" {
  name              = "/zero-trust/vpc-flow-logs"
  retention_in_days = 30
  
  tags = {
    Name = "${var.organization_name}-vpc-flow-log-group"
  }
}

resource "aws_iam_role" "flow_log_role" {
  name = "${var.organization_name}-flow-log-role"
  
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "vpc-flow-logs.amazonaws.com"
        }
      }
    ]
  })
}

resource "aws_iam_role_policy" "flow_log_policy" {
  name = "${var.organization_name}-flow-log-policy"
  role = aws_iam_role.flow_log_role.id
  
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents",
          "logs:DescribeLogGroups",
          "logs:DescribeLogStreams"
        ]
        Effect = "Allow"
        Resource = "*"
      }
    ]
  })
}

# Data sources
data "aws_availability_zones" "available" {
  state = "available"
}

# Outputs
output "vpc_id" {
  description = "ID of the Zero Trust VPC"
  value       = aws_vpc.zero_trust_vpc.id
}

output "public_subnet_ids" {
  description = "IDs of the public subnets"
  value       = [aws_subnet.public_subnet_az1.id, aws_subnet.public_subnet_az2.id]
}

output "app_subnet_ids" {
  description = "IDs of the application subnets"
  value = {
    web = [aws_subnet.app_subnet_web_az1.id, aws_subnet.app_subnet_web_az2.id]
    api = [aws_subnet.app_subnet_api_az1.id, aws_subnet.app_subnet_api_az2.id]
  }
}

output "data_subnet_ids" {
  description = "IDs of the data subnets"
  value       = [aws_subnet.data_subnet_az1.id, aws_subnet.data_subnet_az2.id]
}

output "security_group_ids" {
  description = "IDs of the security groups"
  value = {
    alb  = aws_security_group.alb_security_group.id
    web  = aws_security_group.web_tier_security_group.id
    api  = aws_security_group.api_tier_security_group.id
    data = aws_security_group.data_tier_security_group.id
  }
}

output "waf_arn" {
  description = "ARN of the WAF WebACL"
  value       = aws_wafv2_web_acl.zero_trust_waf.arn
}
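
The NACL rules in this module rely on hand-written CIDR blocks lining up with the API subnets (10.0.20.0/24 and 10.0.21.0/24). The following is a minimal sketch, using only Python's standard `ipaddress` module, of how a rule's CIDR could be checked against the subnets it is meant to cover. The function name `check_rule_coverage` is hypothetical and not part of the module above.

```python
import ipaddress
from typing import List, Tuple


def check_rule_coverage(rule_cidr: str, subnet_cidrs: List[str]) -> Tuple[List[str], List[str]]:
    """Compare one NACL rule CIDR with the subnets it is meant to cover.

    Returns (missing, excess):
      missing - intended subnets that are not fully inside the rule
      excess  - address ranges the rule admits beyond the intended subnets
    """
    rule = ipaddress.ip_network(rule_cidr)
    subnets = [ipaddress.ip_network(c) for c in subnet_cidrs]

    missing = [str(s) for s in subnets if not s.subnet_of(rule)]

    # Carve each intended subnet out of the rule range; what remains is excess.
    remaining = [rule]
    for s in subnets:
        next_remaining = []
        for r in remaining:
            if s.subnet_of(r):
                next_remaining.extend(r.address_exclude(s))
            elif not r.subnet_of(s):
                next_remaining.append(r)
        remaining = next_remaining

    excess = [str(n) for n in sorted(remaining)]
    return missing, excess


API_SUBNETS = ["10.0.20.0/24", "10.0.21.0/24"]
print(check_rule_coverage("10.0.20.0/22", API_SUBNETS))
print(check_rule_coverage("10.0.20.0/23", API_SUBNETS))
```

A /22 starting at 10.0.20.0 also admits 10.0.22.0/23, while a /23 covers exactly the two API subnets. A check like this could run in CI before `terraform apply`.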

Phase 4: Data Protection Layer

Comprehensive Data Encryption and DLP

# Data Protection and Encryption Management for Zero Trust
import boto3
import json
import base64
import hashlib
from typing import Dict, List, Optional
from datetime import datetime, timedelta
import uuid

class ZeroTrustDataProtection:
    def __init__(self):
        self.kms = boto3.client('kms')
        self.s3 = boto3.client('s3')
        self.secrets_manager = boto3.client('secretsmanager')
        self.macie = boto3.client('macie2')
        self.guardduty = boto3.client('guardduty')
        
    def setup_comprehensive_encryption(self, organization_name: str) -> Dict:
        """Setup comprehensive encryption strategy for Zero Trust"""
        
        encryption_setup = {
            "kms_keys": {},
            "s3_encryption": {},
            "rds_encryption": {},
            "secrets_encryption": {},
            "status": "initializing"
        }
        
        try:
            # Create KMS keys for different data classifications
            data_classifications = [
                "public", "internal", "confidential", "restricted"
            ]
            
            for classification in data_classifications:
                key_policy = self._create_kms_key_policy(classification, organization_name)
                
                kms_key = self.kms.create_key(
                    Description=f"Zero Trust encryption key for {classification} data",
                    KeyUsage='ENCRYPT_DECRYPT',
                    Policy=json.dumps(key_policy),
                    Tags=[
                        {'TagKey': 'Organization', 'TagValue': organization_name},
                        {'TagKey': 'DataClassification', 'TagValue': classification},
                        {'TagKey': 'ZeroTrust', 'TagValue': 'enabled'},
                        {'TagKey': 'Purpose', 'TagValue': 'data-encryption'}
                    ]
                )
                
                key_id = kms_key['KeyMetadata']['KeyId']
                
                # Create alias for easier reference
                alias_name = f"alias/{organization_name}-{classification}-key"
                self.kms.create_alias(
                    AliasName=alias_name,
                    TargetKeyId=key_id
                )
                
                encryption_setup["kms_keys"][classification] = {
                    "key_id": key_id,
                    "alias": alias_name,
                    "arn": kms_key['KeyMetadata']['Arn']
                }
            
            # Setup S3 bucket encryption policies
            encryption_setup["s3_encryption"] = self._setup_s3_encryption_policies(
                encryption_setup["kms_keys"]
            )
            
            # Setup RDS encryption requirements
            encryption_setup["rds_encryption"] = self._setup_rds_encryption_policies(
                encryption_setup["kms_keys"]
            )
            
            # Setup secrets encryption
            encryption_setup["secrets_encryption"] = self._setup_secrets_encryption(
                encryption_setup["kms_keys"]
            )
            
            encryption_setup["status"] = "completed"
            
            return encryption_setup
            
        except Exception as e:
            encryption_setup["status"] = "failed"
            encryption_setup["error"] = str(e)
            return encryption_setup
    
    def _create_kms_key_policy(self, classification: str, organization_name: str) -> Dict:
        """Create KMS key policy based on data classification"""
        
        base_policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "EnableIAMUserPermissions",
                    "Effect": "Allow",
                    "Principal": {"AWS": f"arn:aws:iam::{boto3.client('sts').get_caller_identity()['Account']}:root"},
                    "Action": "kms:*",
                    "Resource": "*"
                }
            ]
        }
        
        # Add classification-specific policies
        if classification == "restricted":
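            # Most restrictive - requires MFA. BoolIfExists also denies requests
            # that carry no MFA context at all (for example AWS service calls
            # or long-term access keys), so test this before applying it.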
            base_policy["Statement"].append({
                "Sid": "RequireMFAForRestrictedData",
                "Effect": "Deny",
                "Principal": "*",
                "Action": [
                    "kms:Decrypt",
                    "kms:DescribeKey",
                    "kms:GenerateDataKey*"
                ],
                "Resource": "*",
                "Condition": {
                    "BoolIfExists": {
                        "aws:MultiFactorAuthPresent": "false"
                    }
                }
            })
            
        elif classification == "confidential":
            # Requires specific roles or groups
            base_policy["Statement"].append({
                "Sid": "RestrictConfidentialDataAccess",
                "Effect": "Allow",
                "Principal": {"AWS": [
                    f"arn:aws:iam::{boto3.client('sts').get_caller_identity()['Account']}:role/{organization_name}-confidential-access-role"
                ]},
                "Action": [
                    "kms:Decrypt",
                    "kms:DescribeKey",
                    "kms:GenerateDataKey*"
                ],
                "Resource": "*"
            })
        
        # KMS API calls are recorded by CloudTrail as management events,
        # so no additional policy statement is needed for audit logging.
        
        return base_policy
    
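    def _setup_s3_encryption_policies(self, kms_keys: Dict) -> Dict:
        """Setup S3 encryption policies for different data classifications"""
        
        # These are policy templates: a bucket policy may only reference its
        # own bucket, so replace the wildcard bucket pattern in Resource with
        # the target bucket's ARN before attaching a policy.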
        
        s3_policies = {}
        
        for classification, key_info in kms_keys.items():
            bucket_policy = {
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Sid": f"RequireEncryptionFor{classification.title()}Data",
                        "Effect": "Deny",
                        "Principal": "*",
                        "Action": "s3:PutObject",
                        "Resource": f"arn:aws:s3:::*-{classification}-*/*",
                        "Condition": {
                            "StringNotEquals": {
                                "s3:x-amz-server-side-encryption": "aws:kms"
                            }
                        }
                    },
                    {
                        "Sid": f"RequireSpecificKMSKeyFor{classification.title()}Data",
                        "Effect": "Deny",
                        "Principal": "*",
                        "Action": "s3:PutObject",
                        "Resource": f"arn:aws:s3:::*-{classification}-*/*",
                        "Condition": {
                            "StringNotEquals": {
                                "s3:x-amz-server-side-encryption-aws-kms-key-id": key_info["arn"]
                            }
                        }
                    },
                    {
                        "Sid": f"RequireSSLFor{classification.title()}Data",
                        "Effect": "Deny",
                        "Principal": "*",
                        "Action": "s3:*",
                        "Resource": [
                            f"arn:aws:s3:::*-{classification}-*",
                            f"arn:aws:s3:::*-{classification}-*/*"
                        ],
                        "Condition": {
                            "Bool": {
                                "aws:SecureTransport": "false"
                            }
                        }
                    }
                ]
            }
            
            s3_policies[classification] = bucket_policy
        
        return s3_policies
    
    def _setup_rds_encryption_policies(self, kms_keys: Dict) -> Dict:
        """Setup RDS encryption requirements"""
        
        rds_policies = {
            "encryption_required": True,
            "key_assignments": {},
            "backup_encryption": True,
            "performance_insights_encryption": True
        }
        
        for classification, key_info in kms_keys.items():
            rds_policies["key_assignments"][classification] = {
                "kms_key_id": key_info["arn"],
                "storage_encrypted": True,
                "backup_encryption_key": key_info["arn"],
                "performance_insights_kms_key_id": key_info["arn"]
            }
        
        return rds_policies
    
    def _setup_secrets_encryption(self, kms_keys: Dict) -> Dict:
        """Setup encryption for AWS Secrets Manager"""
        
        # Use the restricted key for all secrets by default
        restricted_key = kms_keys.get("restricted", {}).get("arn")
        
        secrets_config = {
            "default_kms_key": restricted_key,
            "rotation_enabled": True,
            "rotation_interval_days": 30,
            # KMS keys are regional: each replica Region needs a key that
            # exists in that Region (for example a multi-Region replica key).
            # The values below are placeholders to replace with per-Region ARNs.
            "cross_region_replica_kms_keys": {
                "us-east-1": restricted_key,
                "us-west-2": restricted_key,
                "eu-west-1": restricted_key
            }
        }
        
        return secrets_config
    
    def setup_data_loss_prevention(self, bucket_names: List[str]) -> Dict:
        """Setup Amazon Macie for data loss prevention"""
        
        try:
            # Enable Macie if not already enabled
            try:
                self.macie.enable_macie()
                print("Macie enabled successfully")
            except self.macie.exceptions.ConflictException:
                print("Macie already enabled")
            
            # Create classification jobs for each bucket
            classification_jobs = {}
            
            for bucket_name in bucket_names:
                job_name = f"zero-trust-classification-{bucket_name}-{uuid.uuid4().hex[:8]}"
                
                job_definition = {
                    'clientToken': str(uuid.uuid4()),
                    # To include custom data identifiers, add
                    # 'customDataIdentifierIds': [<identifier IDs>]
                    'description': f'Zero Trust data classification for {bucket_name}',
                    'jobType': 'ONE_TIME',
                    'name': job_name,
                    's3JobDefinition': {
                        'bucketDefinitions': [{
                            'accountId': boto3.client('sts').get_caller_identity()['Account'],
                            'buckets': [bucket_name]
                        }]
                    },
                    'samplingPercentage': 100,  # Scan all objects
                    'tags': {
                        'ZeroTrust': 'enabled',
                        'Purpose': 'data-classification',
                        'Bucket': bucket_name
                    }
                }
                
                response = self.macie.create_classification_job(**job_definition)
                classification_jobs[bucket_name] = {
                    'job_id': response['jobId'],
                    'job_arn': response['jobArn'],
                    'status': 'CREATED'
                }
            
            # Setup findings alerts
            findings_filter = self._create_macie_findings_filter()
            
            dlp_setup = {
                'macie_enabled': True,
                'classification_jobs': classification_jobs,
                'findings_filter': findings_filter,
                'monitoring': {
                    'high_sensitivity_alerts': True,
                    'pii_detection': True,
                    'financial_data_detection': True,
                    'healthcare_data_detection': True
                }
            }
            
            return dlp_setup
            
        except Exception as e:
            return {'error': str(e), 'status': 'failed'}
    
    def _create_macie_findings_filter(self) -> Dict:
        """Create Macie findings filter for high-risk data"""
        
        findings_filter = {
            'action': 'NOOP',  # keep matching findings active; 'ARCHIVE' would suppress them
            'description': 'Zero Trust high-sensitivity data findings',
            'name': 'zero-trust-high-sensitivity-filter',
            'position': 1,
            'findingCriteria': {
                'criterion': {
                    'severity.description': {
                        'eq': ['High']  # High severity findings
                    },
                    'type': {
                        'eq': ['SensitiveData:S3Object/Personal', 
                               'SensitiveData:S3Object/Financial',
                               'SensitiveData:S3Object/Credentials']
                    }
                }
            }
        }
        
        try:
            response = self.macie.create_findings_filter(**findings_filter)
            return {
                'filter_id': response['id'],
                'filter_arn': response['arn'],
                'status': 'created'
            }
        except Exception as e:
            return {'error': str(e)}
    
    def encrypt_sensitive_data(self, data: str, classification: str, 
                              context: Dict = None) -> Dict:
        """Encrypt sensitive data using appropriate KMS key"""
        
        try:
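            # Get the appropriate KMS key for the classification.
            # The alias assumes the keys were created with the organization
            # name "zero-trust-org", as in the example usage below.
            alias_name = f"alias/zero-trust-org-{classification}-key"
            
            # Add encryption context for audit trail. The identical context
            # (including the timestamp) must be supplied again to decrypt,
            # so store the returned context alongside the ciphertext.
            encryption_context = {
                'classification': classification,
                'timestamp': datetime.utcnow().isoformat(),
                'zero_trust': 'enabled'
            }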
            
            if context:
                encryption_context.update(context)
            
            # Encrypt the data
            response = self.kms.encrypt(
                KeyId=alias_name,
                Plaintext=data.encode('utf-8'),
                EncryptionContext=encryption_context
            )
            
            # Encode the ciphertext for storage/transmission
            encrypted_data = base64.b64encode(response['CiphertextBlob']).decode('utf-8')
            
            return {
                'encrypted_data': encrypted_data,
                'key_id': response['KeyId'],
                'encryption_context': encryption_context,
                'status': 'encrypted'
            }
            
        except Exception as e:
            return {'error': str(e), 'status': 'failed'}
    
    def decrypt_sensitive_data(self, encrypted_data: str, 
                              expected_context: Optional[Dict] = None) -> Dict:
        """Decrypt sensitive data; KMS enforces the encryption context match"""
        
        try:
            # Decode the base64 encrypted data
            ciphertext_blob = base64.b64decode(encrypted_data.encode('utf-8'))
            
            # KMS rejects the request unless the supplied context exactly
            # matches the one used at encryption time, so pass the full
            # context that encrypt_sensitive_data returned. The Decrypt
            # response does not echo the context back.
            response = self.kms.decrypt(
                CiphertextBlob=ciphertext_blob,
                EncryptionContext=expected_context or {}
            )
            
            decrypted_data = response['Plaintext'].decode('utf-8')
            
            return {
                'decrypted_data': decrypted_data,
                'key_id': response['KeyId'],
                'encryption_context': expected_context or {},
                'status': 'decrypted'
            }
            
        except Exception as e:
            return {'error': str(e), 'status': 'failed'}
    
    def audit_data_access(self, resource_arn: str, time_range_hours: int = 24) -> Dict:
        """Audit data access patterns for anomaly detection"""
        
        try:
            # This would integrate with CloudTrail and other audit services
            # For now, returning a structured audit report format
            
            audit_report = {
                'resource_arn': resource_arn,
                'audit_period': {
                    'start_time': (datetime.utcnow() - timedelta(hours=time_range_hours)).isoformat(),
                    'end_time': datetime.utcnow().isoformat()
                },
                'access_patterns': {
                    'total_access_attempts': 0,
                    'successful_accesses': 0,
                    'failed_accesses': 0,
                    'unique_users': 0,
                    'unique_ips': 0,
                    'unusual_locations': 0,
                    'off_hours_access': 0
                },
                'anomalies': [],
                'recommendations': [],
                'compliance_status': 'NOT_EVALUATED'  # placeholder until real audit data is analyzed
            }
            
            # In production, this would query actual CloudTrail logs
            # and use machine learning for anomaly detection
            
            return audit_report
            
        except Exception as e:
            return {'error': str(e), 'status': 'audit_failed'}

# Example usage
data_protection = ZeroTrustDataProtection()

# Setup comprehensive encryption
encryption_setup = data_protection.setup_comprehensive_encryption("zero-trust-org")
print("Encryption Setup:")
print(json.dumps(encryption_setup, indent=2, default=str))

# Setup data loss prevention
bucket_names = ["zero-trust-org-confidential-data", "zero-trust-org-customer-data"]
dlp_setup = data_protection.setup_data_loss_prevention(bucket_names)
print("\nDLP Setup:")
print(json.dumps(dlp_setup, indent=2, default=str))

# Encrypt sensitive data
sensitive_data = "Customer SSN: 123-45-6789, Credit Card: 4111-1111-1111-1111"
encryption_result = data_protection.encrypt_sensitive_data(
    sensitive_data, 
    "restricted", 
    {"data_type": "customer_pii", "source": "customer_database"}
)
print("\nEncryption Result:")
print(json.dumps(encryption_result, indent=2))

# Decrypt the data (KMS requires the exact encryption context used to encrypt)
decryption_result = data_protection.decrypt_sensitive_data(
    encryption_result["encrypted_data"],
    encryption_result["encryption_context"]
)
print("\nDecryption Result:")
print(json.dumps({k: v for k, v in decryption_result.items() if k != "decrypted_data"}, indent=2))

# Audit data access
audit_result = data_protection.audit_data_access("arn:aws:s3:::zero-trust-org-confidential-data")
print("\nAudit Result:")
print(json.dumps(audit_result, indent=2))
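
The `audit_data_access` method above returns an empty report structure. As a rough sketch of how its `access_patterns` counters might be filled in, the function below aggregates a list of access records. The record shape (`user`, `source_ip`, `event_time`, optional `error_code`) is a hypothetical, pre-parsed format chosen for illustration and is not the raw CloudTrail event schema. The failure threshold and business hours are also illustrative assumptions.

```python
from collections import Counter
from datetime import datetime
from typing import Dict, List


def summarize_access_events(events: List[Dict],
                            business_hours: range = range(8, 18),
                            failure_threshold: int = 3) -> Dict:
    """Aggregate access records into audit-report counters.

    Each record is a hypothetical dict with keys 'user', 'source_ip',
    'event_time' (datetime) and an optional 'error_code'.
    """
    failed = [e for e in events if e.get("error_code")]
    off_hours = [e for e in events
                 if e["event_time"].hour not in business_hours]

    # Flag users with repeated failures (illustrative threshold only).
    failures_by_user = Counter(e["user"] for e in failed)
    anomalies = [
        f"{user}: {count} failed attempts"
        for user, count in sorted(failures_by_user.items())
        if count >= failure_threshold
    ]

    return {
        "total_access_attempts": len(events),
        "successful_accesses": len(events) - len(failed),
        "failed_accesses": len(failed),
        "unique_users": len({e["user"] for e in events}),
        "unique_ips": len({e["source_ip"] for e in events}),
        "off_hours_access": len(off_hours),
        "anomalies": anomalies,
    }


sample_events = [
    {"user": "alice", "source_ip": "10.0.20.5",
     "event_time": datetime(2024, 1, 15, 9, 30)},
    {"user": "bob", "source_ip": "10.0.21.9",
     "event_time": datetime(2024, 1, 15, 23, 10), "error_code": "AccessDenied"},
    {"user": "bob", "source_ip": "10.0.21.9",
     "event_time": datetime(2024, 1, 15, 23, 11), "error_code": "AccessDenied"},
    {"user": "bob", "source_ip": "10.0.21.9",
     "event_time": datetime(2024, 1, 15, 23, 12), "error_code": "AccessDenied"},
]
print(summarize_access_events(sample_events))
```

In a real deployment the records would come from CloudTrail or S3 access logs after parsing, and simple thresholds like this one would likely be replaced by a proper anomaly detection service.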

Implementation Roadmap

Phase 1: Foundation (Weeks 1-4)

  • [ ] Identity Infrastructure
      • [ ] Deploy AWS IAM Identity Center
      • [ ] Configure conditional access policies
      • [ ] Implement MFA requirements
      • [ ] Setup risk-based authentication
  • [ ] Device Management
      • [ ] Implement device registration process
      • [ ] Deploy compliance monitoring
      • [ ] Setup device certificates
      • [ ] Configure automated remediation

Phase 2: Network Security (Weeks 5-8)

  • [ ] Network Segmentation
      • [ ] Deploy micro-segmented VPCs
      • [ ] Configure security groups and NACLs
      • [ ] Implement WAF protection
      • [ ] Setup VPC Flow Logs
  • [ ] Monitoring and Analytics
      • [ ] Enable GuardDuty
      • [ ] Configure Config rules
      • [ ] Setup CloudTrail
      • [ ] Deploy Detective

Phase 3: Data Protection (Weeks 9-12)

  • [ ] Encryption Strategy
      • [ ] Deploy KMS key hierarchy
      • [ ] Configure S3 encryption
      • [ ] Setup RDS encryption
      • [ ] Implement secrets encryption
  • [ ] Data Loss Prevention
      • [ ] Enable Amazon Macie
      • [ ] Configure classification jobs
      • [ ] Setup findings alerts
      • [ ] Implement DLP policies

Phase 4: Operations and Optimization (Weeks 13-16)

  • [ ] Continuous Monitoring
      • [ ] Implement real-time alerting
      • [ ] Setup automated responses
      • [ ] Configure compliance dashboards
      • [ ] Deploy threat hunting
  • [ ] Process Integration
      • [ ] Integrate with ITSM
      • [ ] Setup incident response
      • [ ] Configure change management
      • [ ] Implement regular reviews

Best Practices and Recommendations

1. Identity and Access Management

  • Principle of Least Privilege: Grant minimum necessary permissions
  • Regular Access Reviews: Quarterly review of all access permissions
  • Conditional Access: Implement context-aware access controls
  • Just-in-Time Access: Temporary elevation for privileged operations

2. Device Security

  • Device Registration: Mandatory registration for all devices
  • Continuous Compliance: Real-time monitoring of device health
  • Certificate-Based Authentication: Strong device identity
  • Automated Remediation: Quick response to compliance violations

3. Network Security

  • Micro-Segmentation: Isolate workloads with security groups
  • Defense in Depth: Multiple layers of network controls
  • Traffic Inspection: Layer 7 (HTTP/HTTPS) request inspection with AWS WAF
  • Anomaly Detection: ML-based traffic analysis

4. Data Protection

  • Classification-Based Encryption: Different keys for different data types
  • Encryption Everywhere: At rest, in transit, and in use
  • Key Rotation: Regular rotation of encryption keys
  • Access Logging: Comprehensive audit trails
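
The key rotation practice above can be sketched as a small helper that turns on automatic rotation for customer managed KMS keys. This is a minimal illustration, not a definitive implementation: the function name `ensure_key_rotation` is hypothetical, and `kms_client` is assumed to be a boto3 KMS client such as `boto3.client("kms")`. It uses the KMS `get_key_rotation_status` and `enable_key_rotation` operations, which apply to symmetric encryption keys.

```python
from typing import Dict, Iterable


def ensure_key_rotation(kms_client, key_ids: Iterable[str]) -> Dict[str, str]:
    """Enable automatic rotation on each key that does not already have it.

    kms_client is assumed to be a boto3 KMS client. Returns a map of
    key ID to "already_enabled" or "enabled".
    """
    results = {}
    for key_id in key_ids:
        status = kms_client.get_key_rotation_status(KeyId=key_id)
        if status.get("KeyRotationEnabled"):
            results[key_id] = "already_enabled"
        else:
            kms_client.enable_key_rotation(KeyId=key_id)
            results[key_id] = "enabled"
    return results
```

With the `ZeroTrustDataProtection` class from the data protection phase, this could be called with `data_protection.kms` and the key IDs returned by `setup_comprehensive_encryption`.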

Measuring Zero Trust Maturity

Maturity Levels

Level 1: Traditional (Baseline)

  • Perimeter-based security
  • Basic authentication
  • Limited monitoring
  • Manual processes

Level 2: Advanced (Developing)

  • Multi-factor authentication
  • Network segmentation
  • Enhanced monitoring
  • Some automation

Level 3: Optimal (Mature)

  • Zero Trust architecture
  • Continuous verification
  • Real-time analytics
  • Full automation

Level 4: Transformational (Leading)

  • AI/ML-driven security
  • Predictive analytics
  • Self-healing systems
  • Autonomous response

Key Metrics

# Zero Trust Maturity Assessment
import json
from typing import Dict, List

def assess_zero_trust_maturity(organization_metrics: Dict) -> Dict:
    """Assess Zero Trust maturity level (metrics are percentages from 0 to 100)"""

    criteria = {
        "identity_verification": {
            "mfa_adoption": organization_metrics.get("mfa_adoption_rate", 0),
            "sso_integration": organization_metrics.get("sso_integration", 0),
            "conditional_access": organization_metrics.get("conditional_access_policies", 0)
        },
        "device_trust": {
            "device_registration": organization_metrics.get("device_registration_rate", 0),
            "compliance_monitoring": organization_metrics.get("compliance_monitoring", 0),
            "certificate_auth": organization_metrics.get("certificate_auth_usage", 0)
        },
        "network_security": {
            "micro_segmentation": organization_metrics.get("micro_segmentation", 0),
            "traffic_inspection": organization_metrics.get("traffic_inspection", 0),
            "anomaly_detection": organization_metrics.get("anomaly_detection", 0)
        },
        "data_protection": {
            "encryption_coverage": organization_metrics.get("encryption_coverage", 0),
            "dlp_implementation": organization_metrics.get("dlp_implementation", 0),
            "access_monitoring": organization_metrics.get("access_monitoring", 0)
        }
    }

    # Calculate weighted scores
    weights = {"identity_verification": 0.3, "device_trust": 0.2, 
               "network_security": 0.25, "data_protection": 0.25}

    pillar_scores = {}
    overall_score = 0

    for pillar, metrics in criteria.items():
        pillar_score = sum(metrics.values()) / len(metrics)
        pillar_scores[pillar] = pillar_score
        overall_score += pillar_score * weights[pillar]

    # Determine maturity level
    if overall_score >= 90:
        maturity_level = "Transformational"
    elif overall_score >= 70:
        maturity_level = "Optimal"
    elif overall_score >= 50:
        maturity_level = "Advanced"
    else:
        maturity_level = "Traditional"

    return {
        "overall_score": overall_score,
        "maturity_level": maturity_level,
        "pillar_scores": pillar_scores,
        "recommendations": generate_recommendations(pillar_scores)
    }

def generate_recommendations(pillar_scores: Dict) -> List[str]:
    """Generate recommendations based on pillar scores"""
    recommendations = []

    for pillar, score in pillar_scores.items():
        if score < 70:
            pillar_recs = {
                "identity_verification": [
                    "Increase MFA adoption to 100%",
                    "Implement comprehensive SSO solution",
                    "Deploy conditional access policies"
                ],
                "device_trust": [
                    "Implement mandatory device registration",
                    "Deploy continuous compliance monitoring",
                    "Enable certificate-based authentication"
                ],
                "network_security": [
                    "Implement micro-segmentation",
                    "Deploy comprehensive traffic inspection",
                    "Enable ML-based anomaly detection"
                ],
                "data_protection": [
                    "Achieve 100% encryption coverage",
                    "Implement comprehensive DLP solution",
                    "Enable real-time access monitoring"
                ]
            }
            recommendations.extend(pillar_recs.get(pillar, []))

    return recommendations

# Example assessment
org_metrics = {
    "mfa_adoption_rate": 95,
    "sso_integration": 80,
    "conditional_access_policies": 70,
    "device_registration_rate": 85,
    "compliance_monitoring": 75,
    "certificate_auth_usage": 60,
    "micro_segmentation": 90,
    "traffic_inspection": 85,
    "anomaly_detection": 70,
    "encryption_coverage": 95,
    "dlp_implementation": 80,
    "access_monitoring": 85
}

maturity_assessment = assess_zero_trust_maturity(org_metrics)
print("Zero Trust Maturity Assessment:")
print(json.dumps(maturity_assessment, indent=2))
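
The maturity thresholds (90, 70, 50) only make sense if every metric is a percentage between 0 and 100. As a hedged sketch, a small validation step could reject malformed input before scoring. The helper name `validate_metrics` is hypothetical and not part of the assessment code above.

```python
from typing import Dict


def validate_metrics(metrics: Dict[str, float]) -> Dict[str, float]:
    """Return a copy of metrics after checking each value is a 0-100 number.

    Raises ValueError listing every offending metric.
    """
    problems = []
    for name, value in metrics.items():
        # bool is a subclass of int, so exclude it explicitly.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            problems.append(f"{name}: expected a number, got {type(value).__name__}")
        elif not 0 <= value <= 100:
            problems.append(f"{name}: {value} is outside 0-100")
    if problems:
        raise ValueError("; ".join(problems))
    return dict(metrics)
```

Calling `assess_zero_trust_maturity(validate_metrics(org_metrics))` would then fail fast on bad input. For the example metrics above, the pillar averages are roughly 81.7 (identity), 73.3 (device), 81.7 (network), and 86.7 (data), giving a weighted overall score of about 81.25, which falls in the "Optimal" band.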

Conclusion

Zero Trust security represents a paradigm shift from traditional perimeter-based security to a comprehensive “never trust, always verify” approach. In AWS environments, implementing Zero Trust requires careful orchestration of identity, device, network, and data protection controls.

Key Success Factors

  1. Executive Support: Zero Trust requires organizational commitment and investment
  2. Phased Implementation: Gradual rollout reduces risk and allows for learning
  3. User Experience: Balance security with usability to ensure adoption
  4. Continuous Improvement: Regular assessment and refinement of controls
  5. Integration: Seamless integration with existing systems and processes

Expected Outcomes

  • Reduced Attack Surface: Micro-segmentation limits blast radius
  • Enhanced Visibility: Comprehensive monitoring and analytics
  • Improved Compliance: Continuous verification meets regulatory requirements
  • Adaptive Security: Dynamic policy enforcement based on risk
  • Operational Efficiency: Automated responses and self-healing systems

The Zero Trust journey is ongoing, requiring continuous investment in technology, processes, and people. However, the benefits of improved security posture, reduced risk, and enhanced compliance make it essential for modern enterprise environments.

Related Resources

This guide provides a comprehensive foundation for implementing Zero Trust security in AWS environments. Regular updates and refinements should be made based on emerging threats, new AWS services, and organizational changes.
