# conversation_logs 및 robing_stats 테이블 활용 계획 작성일: 2025년 8월 18일 작성자: Claude (51123 서버) 상태: 미구현 테이블 활용 방안 ## 1. 문제 현황 ### 1.1 미사용 테이블 현황 ```sql -- main_db에 존재하지만 전혀 사용되지 않는 테이블들 conversation_logs: 0 records (0 KB) robing_stats: 0 records (0 KB) robing_settings: 0 records (0 KB) ``` ### 1.2 코드 검사 결과 - Auth 서버: 해당 테이블 관련 코드 없음 - rb8001, rb10508: 해당 테이블 사용 코드 없음 - 결론: **테이블만 생성되고 구현이 전혀 되지 않은 상태** ### 1.3 문제점 1. **대화 기록 누락**: 모든 사용자 대화가 휘발성으로만 처리됨 2. **성장 데이터 누락**: 로빙의 레벨, 경험치, 스탯이 저장되지 않음 3. **개인화 불가**: 사용자별 설정이 저장되지 않음 4. **분석 불가**: 사용 패턴, 성능 지표 추적 불가 ## 2. conversation_logs 테이블 활용 방안 ### 2.1 테이블 구조 (현재) ```sql CREATE TABLE conversation_logs ( id UUID PRIMARY KEY, robing_id VARCHAR(100), user_id UUID, message TEXT, response TEXT, created_at TIMESTAMP, metadata JSON ); ``` ### 2.2 구현 필요 사항 #### 로빙 서비스 (rb8001, rb10508 등)에서 구현 ```python # app/services/conversation_logger.py (신규) from typing import Optional import uuid from datetime import datetime import asyncpg class ConversationLogger: def __init__(self, db_url: str): self.db_url = db_url async def log_conversation( self, robing_id: str, user_id: str, message: str, response: str, metadata: Optional[dict] = None ): """대화 내용을 DB에 저장""" async with asyncpg.connect(self.db_url) as conn: await conn.execute(""" INSERT INTO conversation_logs (id, robing_id, user_id, message, response, created_at, metadata) VALUES ($1, $2, $3, $4, $5, $6, $7) """, uuid.uuid4(), robing_id, user_id, message, response, datetime.now(), metadata or {} ) ``` #### Slack 메시지 처리 부분에 통합 ```python # app/services/slack_handler.py (수정) async def handle_message(event: dict): user_message = event['text'] user_id = await get_user_id_from_slack(event['user']) # 기존 처리 response = await process_with_llm(user_message) # 신규: 대화 로그 저장 await conversation_logger.log_conversation( robing_id=ROBING_ID, # rb8001, rb10508 등 user_id=user_id, message=user_message, response=response, metadata={ 'slack_channel': event['channel'], 'slack_ts': event['ts'], 'thread_ts': event.get('thread_ts') } ) return response ``` ### 2.3 활용 방안 1. **대화 히스토리 조회**: 사용자별 과거 대화 내역 2. **컨텍스트 유지**: 이전 대화 참조하여 연속성 확보 3. **품질 분석**: 응답 품질 모니터링 4. **사용 패턴 분석**: 자주 묻는 질문, 피크 시간대 등 ## 3. robing_stats 테이블 활용 방안 ### 3.1 테이블 구조 (현재) ```sql CREATE TABLE robing_stats ( id UUID PRIMARY KEY, robing_id VARCHAR(100) UNIQUE, level INTEGER DEFAULT 1, experience INTEGER DEFAULT 0, memory_stat INTEGER DEFAULT 10, compute_stat INTEGER DEFAULT 10, empathy_stat INTEGER DEFAULT 10, leadership_stat INTEGER DEFAULT 10, ethics_stat INTEGER DEFAULT 10, total_conversations INTEGER DEFAULT 0, successful_tasks INTEGER DEFAULT 0, failed_tasks INTEGER DEFAULT 0, created_at TIMESTAMP, updated_at TIMESTAMP ); ``` ### 3.2 구현 필요 사항 #### 스탯 관리 서비스 ```python # app/services/stats_manager.py (신규) class StatsManager: def __init__(self, db_url: str, robing_id: str): self.db_url = db_url self.robing_id = robing_id async def initialize_stats(self): """로빙 스탯 초기화""" async with asyncpg.connect(self.db_url) as conn: await conn.execute(""" INSERT INTO robing_stats (id, robing_id, created_at, updated_at) VALUES ($1, $2, $3, $3) ON CONFLICT (robing_id) DO NOTHING """, uuid.uuid4(), self.robing_id, datetime.now()) async def add_experience(self, xp: int): """경험치 추가 및 레벨업 체크""" async with asyncpg.connect(self.db_url) as conn: # 현재 스탯 조회 stats = await conn.fetchrow( "SELECT * FROM robing_stats WHERE robing_id = $1", self.robing_id ) new_xp = stats['experience'] + xp new_level = self.calculate_level(new_xp) # 레벨업 시 스탯 포인트 분배 if new_level > stats['level']: stat_points = (new_level - stats['level']) * 3 await self.distribute_stat_points(stat_points) # 경험치와 레벨 업데이트 await conn.execute(""" UPDATE robing_stats SET experience = $1, level = $2, updated_at = $3 WHERE robing_id = $4 """, new_xp, new_level, datetime.now(), self.robing_id) def calculate_level(self, xp: int) -> int: """경험치로 레벨 계산""" # 레벨당 필요 경험치: 100 * level^1.5 level = 1 while xp >= 100 * (level ** 1.5): level += 1 return level ``` #### 작업 완료 시 경험치 획득 ```python # 작업 성공 시 호출 async def on_task_success(task_type: str, difficulty: int): xp_gained = calculate_xp(task_type, difficulty) await stats_manager.add_experience(xp_gained) # 성공 카운트 증가 await conn.execute(""" UPDATE robing_stats SET successful_tasks = successful_tasks + 1 WHERE robing_id = $1 """, ROBING_ID) ``` ### 3.3 스탯 활용 방안 1. **레벨 기반 기능 해금**: 특정 레벨 도달 시 새 기능 활성화 2. **스탯 기반 처리**: memory_stat이 높으면 더 긴 컨텍스트 유지 3. **성장 시각화**: 프론트엔드에서 레벨/스탯 표시 4. **게이미피케이션**: 업적, 뱃지 시스템 ## 4. robing_settings 테이블 활용 방안 ### 4.1 테이블 구조 (현재) ```sql CREATE TABLE robing_settings ( id UUID PRIMARY KEY, robing_id VARCHAR(100) UNIQUE, user_preferences JSON, system_settings JSON, skill_settings JSON, created_at TIMESTAMP, updated_at TIMESTAMP ); ``` ### 4.2 구현 필요 사항 #### 설정 관리 서비스 ```python # app/services/settings_manager.py (신규) class SettingsManager: def __init__(self, db_url: str, robing_id: str): self.db_url = db_url self.robing_id = robing_id self._cache = {} async def get_setting(self, category: str, key: str, default=None): """설정 값 조회""" if not self._cache: await self._load_settings() category_settings = self._cache.get(f"{category}_settings", {}) return category_settings.get(key, default) async def update_setting(self, category: str, key: str, value): """설정 값 업데이트""" settings_field = f"{category}_settings" async with asyncpg.connect(self.db_url) as conn: # 현재 설정 조회 current = await conn.fetchrow( f"SELECT {settings_field} FROM robing_settings WHERE robing_id = $1", self.robing_id ) settings = current[settings_field] or {} settings[key] = value # 업데이트 await conn.execute(f""" UPDATE robing_settings SET {settings_field} = $1, updated_at = $2 WHERE robing_id = $3 """, settings, datetime.now(), self.robing_id) # 캐시 갱신 self._cache[settings_field] = settings ``` ### 4.3 설정 예시 ```python # 사용자 선호 설정 user_preferences = { "response_style": "formal", # formal/casual/friendly "response_length": "detailed", # brief/normal/detailed "language": "ko", "timezone": "Asia/Seoul" } # 시스템 설정 system_settings = { "max_memory_size": 1000, # 최대 기억 개수 "memory_ttl_days": 30, # 기억 보존 기간 "auto_summarize": True, # 자동 요약 여부 "debug_mode": False } # 스킬 설정 skill_settings = { "email": { "enabled": True, "check_interval": 300, # 5분 "auto_reply": False }, "news": { "enabled": True, "categories": ["tech", "business"], "summary_length": 3 } } ``` ## 5. 구현 우선순위 ### Phase 1: 즉시 구현 (1주일) 1. **conversation_logs 저장 기능** - 모든 대화를 DB에 저장 - 메타데이터 포함 (채널, 시간 등) 2. **robing_stats 초기화** - 각 로빙별 스탯 레코드 생성 - 대화 카운트 업데이트 ### Phase 2: 핵심 기능 (2주차) 1. **경험치 시스템** - 작업 완료 시 XP 획득 - 레벨업 로직 2. **설정 시스템** - 기본 설정 저장/조회 - 사용자별 커스터마이징 ### Phase 3: 고급 기능 (3주차) 1. **대화 컨텍스트 활용** - 이전 대화 참조 - 장기 기억 형성 2. **스탯 기반 동작** - 레벨별 기능 차별화 - 스탯별 처리 로직 ## 6. 성능 고려사항 ### 6.1 인덱스 추가 필요 ```sql -- conversation_logs 빠른 조회 CREATE INDEX idx_conversation_user_time ON conversation_logs(user_id, created_at DESC); CREATE INDEX idx_conversation_robing_time ON conversation_logs(robing_id, created_at DESC); -- robing_stats 빠른 조회 CREATE INDEX idx_robing_stats_robing ON robing_stats(robing_id); ``` ### 6.2 배치 처리 - 대화 로그는 즉시 저장하되, 통계 업데이트는 배치로 처리 - 5분마다 집계하여 robing_stats 업데이트 ### 6.3 캐싱 - robing_settings는 메모리 캐시 유지 - robing_stats는 Redis 캐시 활용 ## 7. 모니터링 지표 ### 7.1 테이블 사용률 ```sql -- 일일 대화량 SELECT DATE(created_at), COUNT(*) FROM conversation_logs GROUP BY DATE(created_at); -- 로빙별 활동 SELECT robing_id, total_conversations, level FROM robing_stats; ``` ### 7.2 성능 지표 - 로그 저장 지연 시간 - 스탯 업데이트 주기 - 캐시 히트율 ## 8. 예상 효과 ### 8.1 즉시 효과 - 모든 대화 추적 가능 - 사용 패턴 분석 가능 - 문제 발생 시 디버깅 용이 ### 8.2 중기 효과 - 사용자별 맞춤 서비스 - 로빙 성장 시각화 - 데이터 기반 개선 ### 8.3 장기 효과 - AI 학습 데이터 축적 - 서비스 품질 지속 개선 - 사용자 만족도 향상 ## 9. 주의사항 ### 9.1 개인정보 보호 - conversation_logs에 민감 정보 마스킹 - GDPR 준수 (삭제 요청 처리) - 암호화 고려 ### 9.2 데이터 증가 대응 - 오래된 로그 아카이빙 - 파티셔닝 고려 - 정기적인 정리 작업 ### 9.3 동시성 제어 - 스탯 업데이트 시 락 처리 - 트랜잭션 격리 수준 설정 ## 10. 테스트 계획 ### 10.1 단위 테스트 - 각 테이블 CRUD 테스트 - 경험치 계산 로직 테스트 - 설정 캐싱 테스트 ### 10.2 통합 테스트 - Slack 메시지 → DB 저장 플로우 - 레벨업 → 스탯 증가 플로우 - 설정 변경 → 동작 변경 플로우 ### 10.3 부하 테스트 - 대량 대화 로그 저장 - 동시 다발적 스탯 업데이트 - 캐시 무효화 시나리오 --- *이 문서는 현재 미사용 중인 테이블들을 활성화하기 위한 구현 가이드입니다.* *로컬 개발자는 이 가이드를 참고하여 단계적으로 구현을 진행하시기 바랍니다.*