DOCS/_archive/gpt_docs/15_Polyglot_DB_선택_기준.md
happybell80 374a173e80 GPT_docs를 _archive로 이동 및 DB 테이블 활용 계획 문서 추가
- GPT_docs/ → _archive/gpt_docs/로 이동 (디렉토리 구조 정리)
- ideas/250818_conversation_logs_및_robing_stats_활용_계획.md 추가
  - conversation_logs, robing_stats, robing_settings 테이블 활용 방안
  - 현재 0개 레코드인 미사용 테이블들의 구현 가이드
  - 단계별 구현 계획 및 코드 예시 포함
2025-08-18 13:11:45 +09:00

9.6 KiB

Polyglot DB 선택 기준과 DB 역할 비교

Polyglot Persistence 전략

데이터 특성별 DB 선택 매트릭스

데이터 유형 추천 DB 선택 이유 대안
사용자 프로필 PostgreSQL ACID, 관계형 무결성 MySQL
실시간 상태 Redis 인메모리, 낮은 지연시간 KeyDB
대화 기록 MongoDB 문서 지향, 유연한 스키마 CouchDB
시계열 데이터 InfluxDB 시간 기반 최적화 TimescaleDB
그래프 관계 Neo4j 관계 중심 쿼리 ArangoDB
벡터 임베딩 ChromaDB 벡터 검색 최적화 Pinecone
로그 데이터 Elasticsearch 전문 검색, 분석 OpenSearch
이미지/파일 MinIO 객체 스토리지 S3

각 DB의 역할 정의

PostgreSQL - 핵심 트랜잭션 데이터

class PostgreSQLRole:
    """
    주 역할: 트랜잭션 데이터의 신뢰할 수 있는 저장소
    """
    
    use_cases = {
        'user_accounts': '사용자 계정 정보',
        'transactions': '토큰 거래 내역',
        'contracts': '스마트 컨트랙트',
        'audit_logs': '감사 로그'
    }
    
    strengths = [
        'ACID 보장',
        '복잡한 조인 쿼리',
        'JSON 지원',
        '풍부한 인덱싱 옵션'
    ]
    
    config = {
        'max_connections': 200,
        'shared_buffers': '4GB',
        'effective_cache_size': '12GB',
        'maintenance_work_mem': '1GB'
    }

Redis - 캐시와 실시간 상태

class RedisRole:
    """
    주 역할: 고속 캐시 및 실시간 상태 관리
    """
    
    use_cases = {
        'session_store': '세션 정보',
        'cache_layer': 'DB 쿼리 캐시',
        'real_time_state': '에이전트 현재 상태',
        'pub_sub': '실시간 메시징',
        'rate_limiting': 'API 속도 제한'
    }
    
    data_structures = {
        'strings': '단순 키-값',
        'hashes': '객체 저장',
        'lists': '큐, 스택',
        'sets': '유니크 항목',
        'sorted_sets': '순위 시스템',
        'streams': '이벤트 로그'
    }

MongoDB - 유연한 문서 저장소

class MongoDBRole:
    """
    주 역할: 반구조화 데이터 및 빠르게 변하는 스키마
    """
    
    collections = {
        'conversations': {
            'structure': 'nested documents',
            'indexes': ['user_id', 'timestamp'],
            'sharding_key': 'user_id'
        },
        'skills': {
            'structure': 'dynamic schema',
            'indexes': ['category', 'rating'],
            'ttl': None
        },
        'memories': {
            'structure': 'hierarchical',
            'indexes': ['agent_id', 'importance'],
            'capped': True
        }
    }

ChromaDB - 벡터 데이터베이스

class ChromaDBRole:
    """
    주 역할: 의미 검색 및 유사도 매칭
    """
    
    collections = {
        'knowledge_base': {
            'embedding_function': 'sentence-transformers',
            'dimension': 768,
            'distance_metric': 'cosine'
        },
        'memory_embeddings': {
            'embedding_function': 'custom',
            'dimension': 512,
            'distance_metric': 'l2'
        }
    }
    
    def similarity_search(self, query_vector, k=10):
        return self.collection.query(
            query_embeddings=[query_vector],
            n_results=k
        )

DB 선택 의사결정 트리

def select_database(data_requirements):
    """데이터 요구사항에 따른 DB 선택"""
    
    # 1. 트랜잭션 필요?
    if data_requirements['needs_transactions']:
        if data_requirements['complex_relations']:
            return 'PostgreSQL'
        else:
            return 'MySQL'
    
    # 2. 실시간 처리?
    if data_requirements['real_time']:
        if data_requirements['data_size'] < '1GB':
            return 'Redis'
        else:
            return 'Kafka + Redis'
    
    # 3. 검색 중심?
    if data_requirements['search_heavy']:
        if data_requirements['vector_search']:
            return 'ChromaDB'
        elif data_requirements['full_text']:
            return 'Elasticsearch'
        else:
            return 'PostgreSQL with indexes'
    
    # 4. 그래프 관계?
    if data_requirements['graph_relations']:
        return 'Neo4j'
    
    # 5. 시계열?
    if data_requirements['time_series']:
        return 'InfluxDB'
    
    # 기본값
    return 'PostgreSQL'

하이브리드 아키텍처

다중 DB 통합 패턴

class HybridDataArchitecture:
    def __init__(self):
        self.primary_db = PostgreSQL()     # 주 데이터
        self.cache_db = Redis()            # 캐시
        self.search_db = Elasticsearch()   # 검색
        self.vector_db = ChromaDB()        # 벡터
        self.graph_db = Neo4j()           # 관계
        
    async def write_operation(self, data):
        """쓰기 작업: 여러 DB에 분산"""
        
        # 1. 주 데이터는 PostgreSQL
        await self.primary_db.insert(data['core'])
        
        # 2. 캐시 무효화
        await self.cache_db.invalidate(data['cache_keys'])
        
        # 3. 검색 인덱스 업데이트
        await self.search_db.index(data['searchable'])
        
        # 4. 벡터 임베딩 저장
        if data.get('embeddings'):
            await self.vector_db.upsert(data['embeddings'])
        
        # 5. 관계 업데이트
        if data.get('relationships'):
            await self.graph_db.create_edges(data['relationships'])
    
    async def read_operation(self, query):
        """읽기 작업: 최적 DB 선택"""
        
        # 1. 캐시 확인
        cached = await self.cache_db.get(query['cache_key'])
        if cached:
            return cached
        
        # 2. 쿼리 유형별 DB 선택
        if query['type'] == 'similarity':
            result = await self.vector_db.search(query)
        elif query['type'] == 'relationship':
            result = await self.graph_db.traverse(query)
        elif query['type'] == 'full_text':
            result = await self.search_db.search(query)
        else:
            result = await self.primary_db.query(query)
        
        # 3. 캐시 저장
        await self.cache_db.set(query['cache_key'], result)
        
        return result

데이터 일관성 전략

Eventually Consistent 패턴

class DataConsistencyManager:
    def __init__(self):
        self.sync_queue = asyncio.Queue()
        
    async def eventual_consistency_sync(self):
        """최종 일관성 동기화"""
        
        while True:
            sync_task = await self.sync_queue.get()
            
            try:
                # 비동기로 다른 DB 업데이트
                if sync_task['type'] == 'propagate':
                    await self.propagate_changes(sync_task)
                elif sync_task['type'] == 'reconcile':
                    await self.reconcile_differences(sync_task)
                    
            except Exception as e:
                # 실패 시 재시도 큐에 추가
                await self.retry_queue.put(sync_task)
    
    async def propagate_changes(self, task):
        """변경사항 전파"""
        
        source_db = task['source']
        target_dbs = task['targets']
        data = task['data']
        
        for target in target_dbs:
            await target.update(data)

성능 최적화 전략

DB별 최적화 기법

class DatabaseOptimization:
    
    @staticmethod
    def optimize_postgresql():
        return {
            'indexes': 'CREATE INDEX CONCURRENTLY',
            'partitioning': 'PARTITION BY RANGE',
            'vacuum': 'VACUUM ANALYZE',
            'connection_pooling': 'pgbouncer'
        }
    
    @staticmethod
    def optimize_mongodb():
        return {
            'indexes': 'compound indexes on frequent queries',
            'sharding': 'horizontal scaling',
            'aggregation_pipeline': 'optimize stages order',
            'projection': 'return only needed fields'
        }
    
    @staticmethod
    def optimize_redis():
        return {
            'memory': 'use appropriate data structures',
            'persistence': 'AOF vs RDB tradeoff',
            'eviction': 'LRU or LFU policy',
            'clustering': 'Redis Cluster for scaling'
        }

모니터링과 관리

통합 모니터링 대시보드

class PolyglotMonitoring:
    def __init__(self):
        self.metrics = {}
        
    def collect_metrics(self):
        """모든 DB에서 메트릭 수집"""
        
        metrics = {
            'postgresql': {
                'connections': self.pg_client.get_connection_count(),
                'query_time': self.pg_client.get_avg_query_time(),
                'cache_hit_ratio': self.pg_client.get_cache_hit_ratio()
            },
            'redis': {
                'memory_usage': self.redis_client.memory_usage(),
                'hit_rate': self.redis_client.hit_rate(),
                'evicted_keys': self.redis_client.evicted_keys()
            },
            'mongodb': {
                'operations_per_sec': self.mongo_client.get_ops_per_sec(),
                'document_count': self.mongo_client.count_documents(),
                'index_usage': self.mongo_client.get_index_usage()
            },
            'chromadb': {
                'vector_count': self.chroma_client.count(),
                'search_latency': self.chroma_client.get_search_latency(),
                'index_size': self.chroma_client.get_index_size()
            }
        }
        
        return metrics