- GPT_docs/ → _archive/gpt_docs/로 이동 (디렉토리 구조 정리) - ideas/250818_conversation_logs_및_robing_stats_활용_계획.md 추가 - conversation_logs, robing_stats, robing_settings 테이블 활용 방안 - 현재 0개 레코드인 미사용 테이블들의 구현 가이드 - 단계별 구현 계획 및 코드 예시 포함
12 KiB
12 KiB
데이터 접근 제어 정책 및 감사 로그 관리
데이터 분류 체계
데이터 민감도 레벨
class DataClassification:
LEVELS = {
'PUBLIC': {
'level': 0,
'description': '공개 정보',
'encryption': False,
'access_control': 'open',
'retention': 'indefinite',
'examples': ['공개 프로필', '통계 데이터']
},
'INTERNAL': {
'level': 1,
'description': '내부 사용',
'encryption': 'at_rest',
'access_control': 'authenticated',
'retention': '2_years',
'examples': ['작업 로그', '성능 메트릭']
},
'CONFIDENTIAL': {
'level': 2,
'description': '기밀',
'encryption': 'at_rest_and_transit',
'access_control': 'role_based',
'retention': '1_year',
'examples': ['사용자 대화', '개인 메모']
},
'RESTRICTED': {
'level': 3,
'description': '제한됨',
'encryption': 'end_to_end',
'access_control': 'attribute_based',
'retention': '6_months',
'examples': ['금융 정보', '의료 기록']
},
'TOP_SECRET': {
'level': 4,
'description': '극비',
'encryption': 'quantum_resistant',
'access_control': 'zero_trust',
'retention': '30_days',
'examples': ['인증 키', '개인정보']
}
}
접근 제어 모델
RBAC + ABAC 하이브리드
class HybridAccessControl:
def __init__(self):
self.roles = {}
self.attributes = {}
self.policies = []
def define_role(self, role_name, permissions):
"""역할 정의"""
self.roles[role_name] = {
'permissions': permissions,
'created': datetime.now(),
'constraints': []
}
def define_attribute_policy(self, policy):
"""속성 기반 정책 정의"""
# 예시: 시간 기반 접근 제어
time_based_policy = {
'name': 'business_hours_only',
'conditions': [
{
'attribute': 'current_time',
'operator': 'between',
'value': ['09:00', '18:00']
},
{
'attribute': 'user_location',
'operator': 'in',
'value': ['office', 'home']
}
],
'effect': 'allow'
}
self.policies.append(policy)
def evaluate_access(self, subject, resource, action):
"""접근 권한 평가"""
# 1. 역할 기반 체크
role_check = self.check_role_permissions(subject, resource, action)
# 2. 속성 기반 체크
attribute_check = self.check_attributes(subject, resource, action)
# 3. 컨텍스트 체크
context_check = self.check_context(subject, resource)
# 최종 결정
if role_check and attribute_check and context_check:
return True, "Access granted"
else:
return False, "Access denied"
감사 로그 시스템
불변 감사 로그
class AuditLogger:
def __init__(self):
self.log_chain = [] # 블록체인 스타일 로그
self.current_hash = "0"
def log_access_attempt(self, event):
"""접근 시도 로깅"""
log_entry = {
'id': uuid.uuid4().hex,
'timestamp': datetime.now().isoformat(),
'event_type': event['type'],
'subject': {
'did': event['subject_did'],
'role': event.get('role'),
'ip': event.get('ip_address'),
'device': event.get('device_id')
},
'resource': {
'type': event['resource_type'],
'id': event['resource_id'],
'classification': event['data_classification']
},
'action': event['action'],
'result': event['result'],
'reason': event.get('reason'),
'previous_hash': self.current_hash
}
# 해시 계산 (불변성 보장)
log_hash = self.calculate_hash(log_entry)
log_entry['hash'] = log_hash
# 체인에 추가
self.log_chain.append(log_entry)
self.current_hash = log_hash
# 실시간 모니터링 알림
if event['result'] == 'denied' or event['data_classification'] >= 3:
self.send_alert(log_entry)
return log_entry
def calculate_hash(self, entry):
"""로그 엔트리 해시 계산"""
import hashlib
entry_string = json.dumps(entry, sort_keys=True)
return hashlib.sha256(entry_string.encode()).hexdigest()
감사 로그 분석
class AuditAnalyzer:
def __init__(self):
self.anomaly_detector = AnomalyDetector()
def analyze_access_patterns(self, time_window='24h'):
"""접근 패턴 분석"""
logs = self.get_logs(time_window)
analysis = {
'total_attempts': len(logs),
'successful': sum(1 for l in logs if l['result'] == 'granted'),
'failed': sum(1 for l in logs if l['result'] == 'denied'),
'suspicious_patterns': [],
'top_accessors': {},
'sensitive_data_access': []
}
# 이상 패턴 감지
for log in logs:
if self.is_suspicious(log):
analysis['suspicious_patterns'].append({
'log_id': log['id'],
'reason': self.get_suspicion_reason(log)
})
return analysis
def is_suspicious(self, log):
"""의심스러운 활동 감지"""
suspicious_indicators = [
self.check_unusual_time(log),
self.check_rapid_attempts(log),
self.check_privilege_escalation(log),
self.check_data_exfiltration(log)
]
return any(suspicious_indicators)
암호화 정책
계층별 암호화
class EncryptionPolicy:
def __init__(self):
self.encryption_methods = {
'at_rest': 'AES-256-GCM',
'in_transit': 'TLS 1.3',
'end_to_end': 'NaCl Box',
'quantum_resistant': 'CRYSTALS-Kyber'
}
def encrypt_data(self, data, classification_level):
"""데이터 분류에 따른 암호화"""
if classification_level == 'PUBLIC':
return data # 암호화 불필요
elif classification_level == 'INTERNAL':
return self.aes_encrypt(data)
elif classification_level == 'CONFIDENTIAL':
encrypted = self.aes_encrypt(data)
return self.add_integrity_check(encrypted)
elif classification_level == 'RESTRICTED':
encrypted = self.aes_encrypt(data)
signed = self.sign_data(encrypted)
return self.add_access_control_layer(signed)
elif classification_level == 'TOP_SECRET':
# 다중 암호화
layer1 = self.aes_encrypt(data)
layer2 = self.chacha20_encrypt(layer1)
layer3 = self.quantum_resistant_encrypt(layer2)
return layer3
데이터 최소화 원칙
자동 데이터 정제
class DataMinimization:
def __init__(self):
self.retention_policies = {}
self.anonymization_rules = {}
def apply_data_minimization(self, data, purpose):
"""목적에 필요한 최소 데이터만 유지"""
minimized = {}
# 필요한 필드만 선택
required_fields = self.get_required_fields(purpose)
for field in required_fields:
if field in data:
# 민감 데이터 마스킹
if self.is_sensitive(field):
minimized[field] = self.mask_data(data[field])
else:
minimized[field] = data[field]
return minimized
def auto_expire_data(self):
"""데이터 자동 만료"""
current_time = datetime.now()
for data_id, metadata in self.data_registry.items():
retention_period = self.get_retention_period(
metadata['classification']
)
if current_time - metadata['created'] > retention_period:
# 만료 처리
if metadata['classification'] >= 3:
# 안전 삭제 (덮어쓰기)
self.secure_delete(data_id)
else:
# 일반 삭제
self.delete_data(data_id)
동의 관리
GDPR 준수 동의 시스템
class ConsentManager:
def __init__(self):
self.consents = {}
def record_consent(self, user_did, consent_details):
"""동의 기록"""
consent = {
'id': uuid.uuid4().hex,
'user': user_did,
'timestamp': datetime.now().isoformat(),
'purposes': consent_details['purposes'],
'data_types': consent_details['data_types'],
'duration': consent_details.get('duration', '1_year'),
'withdrawal_method': 'immediate',
'version': '1.0',
'signature': self.sign_consent(consent_details)
}
self.consents[user_did] = consent
# 블록체인에 해시 저장 (증명용)
self.store_consent_proof(consent)
return consent
def check_consent(self, user_did, purpose, data_type):
"""동의 확인"""
if user_did not in self.consents:
return False
consent = self.consents[user_did]
# 만료 확인
if self.is_expired(consent):
return False
# 목적과 데이터 타입 확인
if purpose in consent['purposes'] and data_type in consent['data_types']:
return True
return False
def withdraw_consent(self, user_did):
"""동의 철회"""
if user_did in self.consents:
# 철회 기록
withdrawal = {
'user': user_did,
'timestamp': datetime.now().isoformat(),
'consent_id': self.consents[user_did]['id']
}
# 관련 데이터 처리
self.handle_data_on_withdrawal(user_did)
# 동의 제거
del self.consents[user_did]
return withdrawal
실시간 모니터링
접근 제어 대시보드
class AccessControlDashboard:
def __init__(self):
self.metrics = {}
self.alerts = []
def real_time_monitoring(self):
"""실시간 모니터링"""
dashboard_data = {
'current_sessions': self.get_active_sessions(),
'access_attempts': {
'last_hour': self.count_attempts('1h'),
'success_rate': self.calculate_success_rate('1h'),
'denied_attempts': self.get_denied_attempts('1h')
},
'data_access': {
'sensitive_access': self.track_sensitive_access(),
'unusual_patterns': self.detect_anomalies(),
'high_volume_users': self.identify_high_volume()
},
'compliance': {
'gdpr_status': self.check_gdpr_compliance(),
'consent_coverage': self.calculate_consent_coverage(),
'retention_violations': self.find_retention_violations()
},
'security_events': {
'failed_authentications': self.count_failed_auth(),
'privilege_escalations': self.detect_escalations(),
'data_exfiltration_attempts': self.detect_exfiltration()
}
}
return dashboard_data