DOCS/_archive/gpt_docs/18_데이터_접근_제어_정책.md
happybell80 374a173e80 GPT_docs를 _archive로 이동 및 DB 테이블 활용 계획 문서 추가
- GPT_docs/ → _archive/gpt_docs/로 이동 (디렉토리 구조 정리)
- ideas/250818_conversation_logs_및_robing_stats_활용_계획.md 추가
  - conversation_logs, robing_stats, robing_settings 테이블 활용 방안
  - 현재 0개 레코드인 미사용 테이블들의 구현 가이드
  - 단계별 구현 계획 및 코드 예시 포함
2025-08-18 13:11:45 +09:00

12 KiB

데이터 접근 제어 정책 및 감사 로그 관리

데이터 분류 체계

데이터 민감도 레벨

class DataClassification:
    LEVELS = {
        'PUBLIC': {
            'level': 0,
            'description': '공개 정보',
            'encryption': False,
            'access_control': 'open',
            'retention': 'indefinite',
            'examples': ['공개 프로필', '통계 데이터']
        },
        'INTERNAL': {
            'level': 1,
            'description': '내부 사용',
            'encryption': 'at_rest',
            'access_control': 'authenticated',
            'retention': '2_years',
            'examples': ['작업 로그', '성능 메트릭']
        },
        'CONFIDENTIAL': {
            'level': 2,
            'description': '기밀',
            'encryption': 'at_rest_and_transit',
            'access_control': 'role_based',
            'retention': '1_year',
            'examples': ['사용자 대화', '개인 메모']
        },
        'RESTRICTED': {
            'level': 3,
            'description': '제한됨',
            'encryption': 'end_to_end',
            'access_control': 'attribute_based',
            'retention': '6_months',
            'examples': ['금융 정보', '의료 기록']
        },
        'TOP_SECRET': {
            'level': 4,
            'description': '극비',
            'encryption': 'quantum_resistant',
            'access_control': 'zero_trust',
            'retention': '30_days',
            'examples': ['인증 키', '개인정보']
        }
    }

접근 제어 모델

RBAC + ABAC 하이브리드

class HybridAccessControl:
    def __init__(self):
        self.roles = {}
        self.attributes = {}
        self.policies = []
        
    def define_role(self, role_name, permissions):
        """역할 정의"""
        self.roles[role_name] = {
            'permissions': permissions,
            'created': datetime.now(),
            'constraints': []
        }
    
    def define_attribute_policy(self, policy):
        """속성 기반 정책 정의"""
        
        # 예시: 시간 기반 접근 제어
        time_based_policy = {
            'name': 'business_hours_only',
            'conditions': [
                {
                    'attribute': 'current_time',
                    'operator': 'between',
                    'value': ['09:00', '18:00']
                },
                {
                    'attribute': 'user_location',
                    'operator': 'in',
                    'value': ['office', 'home']
                }
            ],
            'effect': 'allow'
        }
        
        self.policies.append(policy)
    
    def evaluate_access(self, subject, resource, action):
        """접근 권한 평가"""
        
        # 1. 역할 기반 체크
        role_check = self.check_role_permissions(subject, resource, action)
        
        # 2. 속성 기반 체크
        attribute_check = self.check_attributes(subject, resource, action)
        
        # 3. 컨텍스트 체크
        context_check = self.check_context(subject, resource)
        
        # 최종 결정
        if role_check and attribute_check and context_check:
            return True, "Access granted"
        else:
            return False, "Access denied"

감사 로그 시스템

불변 감사 로그

class AuditLogger:
    def __init__(self):
        self.log_chain = []  # 블록체인 스타일 로그
        self.current_hash = "0"
        
    def log_access_attempt(self, event):
        """접근 시도 로깅"""
        
        log_entry = {
            'id': uuid.uuid4().hex,
            'timestamp': datetime.now().isoformat(),
            'event_type': event['type'],
            'subject': {
                'did': event['subject_did'],
                'role': event.get('role'),
                'ip': event.get('ip_address'),
                'device': event.get('device_id')
            },
            'resource': {
                'type': event['resource_type'],
                'id': event['resource_id'],
                'classification': event['data_classification']
            },
            'action': event['action'],
            'result': event['result'],
            'reason': event.get('reason'),
            'previous_hash': self.current_hash
        }
        
        # 해시 계산 (불변성 보장)
        log_hash = self.calculate_hash(log_entry)
        log_entry['hash'] = log_hash
        
        # 체인에 추가
        self.log_chain.append(log_entry)
        self.current_hash = log_hash
        
        # 실시간 모니터링 알림
        if event['result'] == 'denied' or event['data_classification'] >= 3:
            self.send_alert(log_entry)
        
        return log_entry
    
    def calculate_hash(self, entry):
        """로그 엔트리 해시 계산"""
        import hashlib
        
        entry_string = json.dumps(entry, sort_keys=True)
        return hashlib.sha256(entry_string.encode()).hexdigest()

감사 로그 분석

class AuditAnalyzer:
    def __init__(self):
        self.anomaly_detector = AnomalyDetector()
        
    def analyze_access_patterns(self, time_window='24h'):
        """접근 패턴 분석"""
        
        logs = self.get_logs(time_window)
        
        analysis = {
            'total_attempts': len(logs),
            'successful': sum(1 for l in logs if l['result'] == 'granted'),
            'failed': sum(1 for l in logs if l['result'] == 'denied'),
            'suspicious_patterns': [],
            'top_accessors': {},
            'sensitive_data_access': []
        }
        
        # 이상 패턴 감지
        for log in logs:
            if self.is_suspicious(log):
                analysis['suspicious_patterns'].append({
                    'log_id': log['id'],
                    'reason': self.get_suspicion_reason(log)
                })
        
        return analysis
    
    def is_suspicious(self, log):
        """의심스러운 활동 감지"""
        
        suspicious_indicators = [
            self.check_unusual_time(log),
            self.check_rapid_attempts(log),
            self.check_privilege_escalation(log),
            self.check_data_exfiltration(log)
        ]
        
        return any(suspicious_indicators)

암호화 정책

계층별 암호화

class EncryptionPolicy:
    def __init__(self):
        self.encryption_methods = {
            'at_rest': 'AES-256-GCM',
            'in_transit': 'TLS 1.3',
            'end_to_end': 'NaCl Box',
            'quantum_resistant': 'CRYSTALS-Kyber'
        }
    
    def encrypt_data(self, data, classification_level):
        """데이터 분류에 따른 암호화"""
        
        if classification_level == 'PUBLIC':
            return data  # 암호화 불필요
            
        elif classification_level == 'INTERNAL':
            return self.aes_encrypt(data)
            
        elif classification_level == 'CONFIDENTIAL':
            encrypted = self.aes_encrypt(data)
            return self.add_integrity_check(encrypted)
            
        elif classification_level == 'RESTRICTED':
            encrypted = self.aes_encrypt(data)
            signed = self.sign_data(encrypted)
            return self.add_access_control_layer(signed)
            
        elif classification_level == 'TOP_SECRET':
            # 다중 암호화
            layer1 = self.aes_encrypt(data)
            layer2 = self.chacha20_encrypt(layer1)
            layer3 = self.quantum_resistant_encrypt(layer2)
            return layer3

데이터 최소화 원칙

자동 데이터 정제

class DataMinimization:
    def __init__(self):
        self.retention_policies = {}
        self.anonymization_rules = {}
        
    def apply_data_minimization(self, data, purpose):
        """목적에 필요한 최소 데이터만 유지"""
        
        minimized = {}
        
        # 필요한 필드만 선택
        required_fields = self.get_required_fields(purpose)
        
        for field in required_fields:
            if field in data:
                # 민감 데이터 마스킹
                if self.is_sensitive(field):
                    minimized[field] = self.mask_data(data[field])
                else:
                    minimized[field] = data[field]
        
        return minimized
    
    def auto_expire_data(self):
        """데이터 자동 만료"""
        
        current_time = datetime.now()
        
        for data_id, metadata in self.data_registry.items():
            retention_period = self.get_retention_period(
                metadata['classification']
            )
            
            if current_time - metadata['created'] > retention_period:
                # 만료 처리
                if metadata['classification'] >= 3:
                    # 안전 삭제 (덮어쓰기)
                    self.secure_delete(data_id)
                else:
                    # 일반 삭제
                    self.delete_data(data_id)

동의 관리

GDPR 준수 동의 시스템

class ConsentManager:
    def __init__(self):
        self.consents = {}
        
    def record_consent(self, user_did, consent_details):
        """동의 기록"""
        
        consent = {
            'id': uuid.uuid4().hex,
            'user': user_did,
            'timestamp': datetime.now().isoformat(),
            'purposes': consent_details['purposes'],
            'data_types': consent_details['data_types'],
            'duration': consent_details.get('duration', '1_year'),
            'withdrawal_method': 'immediate',
            'version': '1.0',
            'signature': self.sign_consent(consent_details)
        }
        
        self.consents[user_did] = consent
        
        # 블록체인에 해시 저장 (증명용)
        self.store_consent_proof(consent)
        
        return consent
    
    def check_consent(self, user_did, purpose, data_type):
        """동의 확인"""
        
        if user_did not in self.consents:
            return False
        
        consent = self.consents[user_did]
        
        # 만료 확인
        if self.is_expired(consent):
            return False
        
        # 목적과 데이터 타입 확인
        if purpose in consent['purposes'] and data_type in consent['data_types']:
            return True
        
        return False
    
    def withdraw_consent(self, user_did):
        """동의 철회"""
        
        if user_did in self.consents:
            # 철회 기록
            withdrawal = {
                'user': user_did,
                'timestamp': datetime.now().isoformat(),
                'consent_id': self.consents[user_did]['id']
            }
            
            # 관련 데이터 처리
            self.handle_data_on_withdrawal(user_did)
            
            # 동의 제거
            del self.consents[user_did]
            
            return withdrawal

실시간 모니터링

접근 제어 대시보드

class AccessControlDashboard:
    def __init__(self):
        self.metrics = {}
        self.alerts = []
        
    def real_time_monitoring(self):
        """실시간 모니터링"""
        
        dashboard_data = {
            'current_sessions': self.get_active_sessions(),
            'access_attempts': {
                'last_hour': self.count_attempts('1h'),
                'success_rate': self.calculate_success_rate('1h'),
                'denied_attempts': self.get_denied_attempts('1h')
            },
            'data_access': {
                'sensitive_access': self.track_sensitive_access(),
                'unusual_patterns': self.detect_anomalies(),
                'high_volume_users': self.identify_high_volume()
            },
            'compliance': {
                'gdpr_status': self.check_gdpr_compliance(),
                'consent_coverage': self.calculate_consent_coverage(),
                'retention_violations': self.find_retention_violations()
            },
            'security_events': {
                'failed_authentications': self.count_failed_auth(),
                'privilege_escalations': self.detect_escalations(),
                'data_exfiltration_attempts': self.detect_exfiltration()
            }
        }
        
        return dashboard_data