# 데이터 접근 제어 정책 및 감사 로그 관리 ## 데이터 분류 체계 ### 데이터 민감도 레벨 ```python class DataClassification: LEVELS = { 'PUBLIC': { 'level': 0, 'description': '공개 정보', 'encryption': False, 'access_control': 'open', 'retention': 'indefinite', 'examples': ['공개 프로필', '통계 데이터'] }, 'INTERNAL': { 'level': 1, 'description': '내부 사용', 'encryption': 'at_rest', 'access_control': 'authenticated', 'retention': '2_years', 'examples': ['작업 로그', '성능 메트릭'] }, 'CONFIDENTIAL': { 'level': 2, 'description': '기밀', 'encryption': 'at_rest_and_transit', 'access_control': 'role_based', 'retention': '1_year', 'examples': ['사용자 대화', '개인 메모'] }, 'RESTRICTED': { 'level': 3, 'description': '제한됨', 'encryption': 'end_to_end', 'access_control': 'attribute_based', 'retention': '6_months', 'examples': ['금융 정보', '의료 기록'] }, 'TOP_SECRET': { 'level': 4, 'description': '극비', 'encryption': 'quantum_resistant', 'access_control': 'zero_trust', 'retention': '30_days', 'examples': ['인증 키', '개인정보'] } } ``` ## 접근 제어 모델 ### RBAC + ABAC 하이브리드 ```python class HybridAccessControl: def __init__(self): self.roles = {} self.attributes = {} self.policies = [] def define_role(self, role_name, permissions): """역할 정의""" self.roles[role_name] = { 'permissions': permissions, 'created': datetime.now(), 'constraints': [] } def define_attribute_policy(self, policy): """속성 기반 정책 정의""" # 예시: 시간 기반 접근 제어 time_based_policy = { 'name': 'business_hours_only', 'conditions': [ { 'attribute': 'current_time', 'operator': 'between', 'value': ['09:00', '18:00'] }, { 'attribute': 'user_location', 'operator': 'in', 'value': ['office', 'home'] } ], 'effect': 'allow' } self.policies.append(policy) def evaluate_access(self, subject, resource, action): """접근 권한 평가""" # 1. 역할 기반 체크 role_check = self.check_role_permissions(subject, resource, action) # 2. 속성 기반 체크 attribute_check = self.check_attributes(subject, resource, action) # 3. 컨텍스트 체크 context_check = self.check_context(subject, resource) # 최종 결정 if role_check and attribute_check and context_check: return True, "Access granted" else: return False, "Access denied" ``` ## 감사 로그 시스템 ### 불변 감사 로그 ```python class AuditLogger: def __init__(self): self.log_chain = [] # 블록체인 스타일 로그 self.current_hash = "0" def log_access_attempt(self, event): """접근 시도 로깅""" log_entry = { 'id': uuid.uuid4().hex, 'timestamp': datetime.now().isoformat(), 'event_type': event['type'], 'subject': { 'did': event['subject_did'], 'role': event.get('role'), 'ip': event.get('ip_address'), 'device': event.get('device_id') }, 'resource': { 'type': event['resource_type'], 'id': event['resource_id'], 'classification': event['data_classification'] }, 'action': event['action'], 'result': event['result'], 'reason': event.get('reason'), 'previous_hash': self.current_hash } # 해시 계산 (불변성 보장) log_hash = self.calculate_hash(log_entry) log_entry['hash'] = log_hash # 체인에 추가 self.log_chain.append(log_entry) self.current_hash = log_hash # 실시간 모니터링 알림 if event['result'] == 'denied' or event['data_classification'] >= 3: self.send_alert(log_entry) return log_entry def calculate_hash(self, entry): """로그 엔트리 해시 계산""" import hashlib entry_string = json.dumps(entry, sort_keys=True) return hashlib.sha256(entry_string.encode()).hexdigest() ``` ### 감사 로그 분석 ```python class AuditAnalyzer: def __init__(self): self.anomaly_detector = AnomalyDetector() def analyze_access_patterns(self, time_window='24h'): """접근 패턴 분석""" logs = self.get_logs(time_window) analysis = { 'total_attempts': len(logs), 'successful': sum(1 for l in logs if l['result'] == 'granted'), 'failed': sum(1 for l in logs if l['result'] == 'denied'), 'suspicious_patterns': [], 'top_accessors': {}, 'sensitive_data_access': [] } # 이상 패턴 감지 for log in logs: if self.is_suspicious(log): analysis['suspicious_patterns'].append({ 'log_id': log['id'], 'reason': self.get_suspicion_reason(log) }) return analysis def is_suspicious(self, log): """의심스러운 활동 감지""" suspicious_indicators = [ self.check_unusual_time(log), self.check_rapid_attempts(log), self.check_privilege_escalation(log), self.check_data_exfiltration(log) ] return any(suspicious_indicators) ``` ## 암호화 정책 ### 계층별 암호화 ```python class EncryptionPolicy: def __init__(self): self.encryption_methods = { 'at_rest': 'AES-256-GCM', 'in_transit': 'TLS 1.3', 'end_to_end': 'NaCl Box', 'quantum_resistant': 'CRYSTALS-Kyber' } def encrypt_data(self, data, classification_level): """데이터 분류에 따른 암호화""" if classification_level == 'PUBLIC': return data # 암호화 불필요 elif classification_level == 'INTERNAL': return self.aes_encrypt(data) elif classification_level == 'CONFIDENTIAL': encrypted = self.aes_encrypt(data) return self.add_integrity_check(encrypted) elif classification_level == 'RESTRICTED': encrypted = self.aes_encrypt(data) signed = self.sign_data(encrypted) return self.add_access_control_layer(signed) elif classification_level == 'TOP_SECRET': # 다중 암호화 layer1 = self.aes_encrypt(data) layer2 = self.chacha20_encrypt(layer1) layer3 = self.quantum_resistant_encrypt(layer2) return layer3 ``` ## 데이터 최소화 원칙 ### 자동 데이터 정제 ```python class DataMinimization: def __init__(self): self.retention_policies = {} self.anonymization_rules = {} def apply_data_minimization(self, data, purpose): """목적에 필요한 최소 데이터만 유지""" minimized = {} # 필요한 필드만 선택 required_fields = self.get_required_fields(purpose) for field in required_fields: if field in data: # 민감 데이터 마스킹 if self.is_sensitive(field): minimized[field] = self.mask_data(data[field]) else: minimized[field] = data[field] return minimized def auto_expire_data(self): """데이터 자동 만료""" current_time = datetime.now() for data_id, metadata in self.data_registry.items(): retention_period = self.get_retention_period( metadata['classification'] ) if current_time - metadata['created'] > retention_period: # 만료 처리 if metadata['classification'] >= 3: # 안전 삭제 (덮어쓰기) self.secure_delete(data_id) else: # 일반 삭제 self.delete_data(data_id) ``` ## 동의 관리 ### GDPR 준수 동의 시스템 ```python class ConsentManager: def __init__(self): self.consents = {} def record_consent(self, user_did, consent_details): """동의 기록""" consent = { 'id': uuid.uuid4().hex, 'user': user_did, 'timestamp': datetime.now().isoformat(), 'purposes': consent_details['purposes'], 'data_types': consent_details['data_types'], 'duration': consent_details.get('duration', '1_year'), 'withdrawal_method': 'immediate', 'version': '1.0', 'signature': self.sign_consent(consent_details) } self.consents[user_did] = consent # 블록체인에 해시 저장 (증명용) self.store_consent_proof(consent) return consent def check_consent(self, user_did, purpose, data_type): """동의 확인""" if user_did not in self.consents: return False consent = self.consents[user_did] # 만료 확인 if self.is_expired(consent): return False # 목적과 데이터 타입 확인 if purpose in consent['purposes'] and data_type in consent['data_types']: return True return False def withdraw_consent(self, user_did): """동의 철회""" if user_did in self.consents: # 철회 기록 withdrawal = { 'user': user_did, 'timestamp': datetime.now().isoformat(), 'consent_id': self.consents[user_did]['id'] } # 관련 데이터 처리 self.handle_data_on_withdrawal(user_did) # 동의 제거 del self.consents[user_did] return withdrawal ``` ## 실시간 모니터링 ### 접근 제어 대시보드 ```python class AccessControlDashboard: def __init__(self): self.metrics = {} self.alerts = [] def real_time_monitoring(self): """실시간 모니터링""" dashboard_data = { 'current_sessions': self.get_active_sessions(), 'access_attempts': { 'last_hour': self.count_attempts('1h'), 'success_rate': self.calculate_success_rate('1h'), 'denied_attempts': self.get_denied_attempts('1h') }, 'data_access': { 'sensitive_access': self.track_sensitive_access(), 'unusual_patterns': self.detect_anomalies(), 'high_volume_users': self.identify_high_volume() }, 'compliance': { 'gdpr_status': self.check_gdpr_compliance(), 'consent_coverage': self.calculate_consent_coverage(), 'retention_violations': self.find_retention_violations() }, 'security_events': { 'failed_authentications': self.count_failed_auth(), 'privilege_escalations': self.detect_escalations(), 'data_exfiltration_attempts': self.detect_exfiltration() } } return dashboard_data ```