diff --git a/troubleshooting/251014_coldmail_ir_analysis_scenario.md b/troubleshooting/251014_coldmail_ir_analysis_scenario.md index 75a5326..b243f0e 100644 --- a/troubleshooting/251014_coldmail_ir_analysis_scenario.md +++ b/troubleshooting/251014_coldmail_ir_analysis_scenario.md @@ -70,23 +70,25 @@ ### 우선순위 4: LangGraph 워크플로우 - requirements.txt: langgraph 추가 -- app/workflows/__init__.py: 빈 파일 생성 -- app/workflows/coldmail_workflow.py 생성: - - from typing import TypedDict; from langgraph.graph import StateGraph, END; import aiohttp; from datetime import datetime +- app/services/coldmail_email_fetcher.py 생성 (coldmail_briefing.py:93-121 분리, 300줄 제한 준수) + - async def fetch_emails(user_id, start_time, now): skill-email API 호출, return emails +- app/services/coldmail_processor.py 생성 (coldmail_briefing.py:145-202 분리) + - async def process_coldmail(email, user_id): 첨부파일 처리, IR 분석, Slack Lists 생성, return result or None +- app/services/workflows/__init__.py: 빈 파일 생성 +- app/services/workflows/coldmail_workflow.py 생성: + - from typing import TypedDict; from langgraph.graph import StateGraph, END; from app.services import coldmail_email_fetcher, coldmail_processor, coldmail_hybrid_filter - class ColdmailState(TypedDict): emails: list, coldmail_candidates: list, processed_results: list, user_id: str, start_time: str, now: str - - async def fetch_emails_node(state): user_id = state["user_id"], start_time/now = state["start_time/now"], async with aiohttp.ClientSession() as session, params에 user_id/startSearchDate(start_time)/endSearchDate(now) 사용, coldmail_briefing.py:103-121 로직, return {"emails": emails} - - async def filter_coldmail_node(state): for email in state["emails"] 반복, coldmail_briefing.py:125-139 필터 로직, return {"coldmail_candidates": candidates} - - async def process_email_node(state): user_id = state["user_id"], async with aiohttp.ClientSession() as session, for email in state["coldmail_candidates"] 반복, detail_params/process_naverworks_attachments/get_team_uuid_from_user에 user_id 사용, coldmail_briefing.py:145-202 처리, try-except로 IR 실패 시 continue, return {"processed_results": results} - - async def send_summary_node(state): start_time = datetime.fromisoformat(state["start_time"]), now = datetime.fromisoformat(state["now"]), async with aiohttp.ClientSession() as session, summary_text 포맷에 start_time.strftime/now.strftime 사용, coldmail_briefing.py:205-215 요약 전송 + - async def fetch_emails_node(state): emails = await coldmail_email_fetcher.fetch_emails(state["user_id"], state["start_time"], state["now"]), return {"emails": emails} + - async def filter_coldmail_node(state): candidates = [], for email in state["emails"]: result = await coldmail_hybrid_filter.hybrid_coldmail_filter(...), if is_coldmail: candidates.append(email), return {"coldmail_candidates": candidates} + - async def process_email_node(state): results = [], for email in state["coldmail_candidates"]: result = await coldmail_processor.process_coldmail(email, state["user_id"]), if result: results.append(result), return {"processed_results": results} + - async def send_summary_node(state): start_time/now = datetime.fromisoformat(...), async with aiohttp.ClientSession() as session, summary 전송 - def route_after_filter(state): return "process" if state["coldmail_candidates"] else END - - graph = StateGraph(ColdmailState); graph.add_node("fetch", fetch_emails_node), add_node("filter", filter_coldmail_node), add_node("process", process_email_node), add_node("send", send_summary_node) - - graph.add_edge("fetch", "filter"), add_conditional_edges("filter", route_after_filter, {"process": "process", END: END}), add_edge("process", "send"), add_edge("send", END) + - graph = StateGraph(ColdmailState); graph.add_node/add_edge/add_conditional_edges - workflow = graph.compile() -- coldmail_briefing.py:78-302: _run_coldmail_briefing() 교체 +- coldmail_briefing.py:78-302 교체 (최종 206줄) - coldmail_briefing.py:86-101 유지 (now, start_time, user_id 계산) - - from app.workflows.coldmail_workflow import workflow - - initial_state = {"emails": [], "coldmail_candidates": [], "processed_results": [], "user_id": user_id, "start_time": start_time.isoformat(), "now": now.isoformat()} - - result = await workflow.ainvoke(initial_state) + - from app.services.workflows.coldmail_workflow import workflow + - result = await workflow.ainvoke({"emails": [], "coldmail_candidates": [], "processed_results": [], "user_id": user_id, "start_time": start_time.isoformat(), "now": now.isoformat()}) ---