""" 문서 워크플로우 자동화 — OCR 결과 → ITSM 자동 연동 Upstage OCR 결과를 ITSM 기능에 자동 연동하는 7개 워크플로우. 엔드포인트: POST /api/docflow/contract — 나라장터 계약서 → 조달 자동 등록 POST /api/docflow/server-spec — 서버납품서 → CMDB 자동 등록 POST /api/docflow/invoice — 청구서/세금계산서 → 과금 연동 POST /api/docflow/audit-report — CSAP/감사보고서 → 준수율 업데이트 POST /api/docflow/incident-report — 장애보고서 이미지 → SR 자동 생성 POST /api/docflow/meeting-minutes — 회의록 → SR/액션아이템 생성 POST /api/docflow/brand-contract — 기업 브랜드 계약서 (현대백화점 등) GET /api/docflow/jobs — 작업 목록 GET /api/docflow/jobs/{id} — 작업 상세 """ from __future__ import annotations import json import logging import re from datetime import date, datetime from typing import Optional import httpx from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile from pydantic import BaseModel from sqlalchemy import select, desc from sqlalchemy.ext.asyncio import AsyncSession from core.auth import get_current_user from database import get_db from models import ( User, UpstageOCRConfig, OCRHistory, DocWorkflowJob, SRRequest, SRStatus, Server, ProcurementRecord, Invoice, ) logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/docflow", tags=["문서 워크플로우"]) UPSTAGE_BASE = "https://api.upstage.ai/v1/document-ai" MAX_FILE_SIZE = 20 * 1024 * 1024 # ── 내부 헬퍼 ──────────────────────────────────────────────────────────────── def _parse_amount(text: str) -> int: """금액 문자열 → 정수 (₩50,000,000 → 50000000).""" if not text: return 0 cleaned = re.sub(r'[^\d]', '', str(text)) return int(cleaned) if cleaned else 0 def _parse_date(text: str) -> Optional[date]: """날짜 문자열 → date (다양한 형식 지원).""" if not text: return None formats = ["%Y-%m-%d", "%Y.%m.%d", "%Y/%m/%d", "%Y년 %m월 %d일", "%Y%m%d"] cleaned = str(text).strip() for fmt in formats: try: return datetime.strptime(cleaned, fmt).date() except ValueError: continue return None async def _get_api_key(user: User, db: AsyncSession) -> str: row = await db.execute( select(UpstageOCRConfig).where( UpstageOCRConfig.tenant_id == user.tenant_id, UpstageOCRConfig.is_active == True, ) ) cfg = row.scalar_one_or_none() if not cfg: raise HTTPException(404, "Upstage API Key 미설정. POST /api/ocr/config 에서 설정하세요.") return cfg.api_key_enc async def _extract(api_key: str, file_bytes: bytes, filename: str, schema: dict) -> dict: """Upstage Information Extraction 호출.""" from pathlib import Path MIME = {".pdf": "application/pdf", ".png": "image/png", ".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".tiff": "image/tiff"} ext = Path(filename).suffix.lower() mime = MIME.get(ext, "application/octet-stream") async with httpx.AsyncClient(timeout=120) as client: r = await client.post( f"{UPSTAGE_BASE}/information-extraction", headers={"Authorization": f"Bearer {api_key}"}, files={"document": (filename, file_bytes, mime)}, data={"schema": json.dumps(schema, ensure_ascii=False)} ) if r.status_code != 200: raise HTTPException(502, f"Upstage API 오류: {r.text[:200]}") return r.json() async def _parse_doc(api_key: str, file_bytes: bytes, filename: str) -> dict: """Upstage Document Parse 호출.""" from pathlib import Path MIME = {".pdf": "application/pdf", ".png": "image/png", ".jpg": "image/jpeg", ".jpeg": "image/jpeg"} ext = Path(filename).suffix.lower() mime = MIME.get(ext, "application/octet-stream") async with httpx.AsyncClient(timeout=120) as client: r = await client.post( f"{UPSTAGE_BASE}/document-digitization", headers={"Authorization": f"Bearer {api_key}"}, files={"document": (filename, file_bytes, mime)}, data={"model": "document-parse-ocr", "ocr": "auto", "output_formats": '["text"]'} ) if r.status_code != 200: raise HTTPException(502, f"Upstage API 오류: {r.text[:200]}") return r.json() async def _save_job(db: AsyncSession, tenant_id: int, user_id: int, workflow: str, filename: str, template_id: Optional[int], extracted: dict, linked_table: str, linked_id: Optional[int], status: str = "DONE") -> int: job = DocWorkflowJob( tenant_id=tenant_id, workflow_type=workflow, filename=filename, template_id=template_id, status=status, extracted_data=extracted, linked_table=linked_table, linked_record_id=linked_id, created_by=user_id, created_at=datetime.utcnow(), completed_at=datetime.utcnow(), ) db.add(job) await db.commit() await db.refresh(job) return job.id def _simplify(result: dict) -> dict: """Upstage 추출 결과 → 단순 Key-Value.""" if "result" in result and isinstance(result["result"], dict): return {k: v.get("value", "") if isinstance(v, dict) else v for k, v in result["result"].items()} return {} # ── 워크플로우 엔드포인트 ─────────────────────────────────────────────────── @router.post("/contract") async def process_contract( file: UploadFile = File(...), auto_register: bool = Form(True), db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """나라장터 계약서 → 조달 이력 자동 등록.""" file_bytes = await file.read() api_key = await _get_api_key(user, db) schema = { "contract_no": "계약번호", "contract_name": "계약품명", "supplier": "공급사명", "supplier_biz_no": "공급사 사업자번호", "amount": "계약금액(원)", "vat": "부가세액", "start_date": "계약시작일", "end_date": "계약종료일", "institution": "발주기관명", "manager": "담당자명", "payment_terms": "납부조건", } result = await _extract(api_key, file_bytes, file.filename or "contract.pdf", schema) extracted = _simplify(result) record_id = None if auto_register and extracted.get("contract_no"): record = ProcurementRecord( tenant_id=user.tenant_id, contract_no=extracted.get("contract_no", ""), contract_name=extracted.get("contract_name", "미상"), supplier=extracted.get("supplier", ""), amount=_parse_amount(extracted.get("amount", "0")), category="IT계약", start_date=_parse_date(extracted.get("start_date")), end_date=_parse_date(extracted.get("end_date")), status="ACTIVE", created_at=datetime.utcnow(), ) db.add(record) await db.commit() await db.refresh(record) record_id = record.id job_id = await _save_job(db, user.tenant_id, user.id, "contract", file.filename or "", None, extracted, "tb_procurement_record", record_id) return { "ok": True, "workflow": "contract", "extracted": extracted, "record_id": record_id, "job_id": job_id, "message": f"계약 정보 추출 완료" + (f" → 조달 ID {record_id} 등록" if record_id else " (수동 확인 필요)"), } @router.post("/server-spec") async def process_server_spec( file: UploadFile = File(...), auto_register: bool = Form(True), db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """서버 납품 명세서 → CMDB 자동 등록.""" file_bytes = await file.read() api_key = await _get_api_key(user, db) schema = { "hostname": "호스트명/서버명", "manufacturer": "제조사", "model_no": "모델번호", "serial_no": "시리얼번호", "cpu_model": "CPU 모델명", "cpu_cores": "CPU 코어 수", "memory_gb": "메모리 용량(GB)", "disk_config": "스토리지 구성", "os": "운영체제", "ip_addr": "IP주소", "rack_location": "랙/위치", "warranty_until": "보증기간 만료일", "delivery_date": "납품일", } result = await _extract(api_key, file_bytes, file.filename or "spec.pdf", schema) extracted = _simplify(result) server_id = None if auto_register and extracted.get("hostname"): server = Server( hostname=extracted.get("hostname", ""), ip_addr=extracted.get("ip_addr", "0.0.0.0"), os_type=extracted.get("os", ""), cpu_cores=int(re.sub(r'[^\d]', '', extracted.get("cpu_cores", "0") or "0") or 0), memory_mb=int(re.sub(r'[^\d]', '', extracted.get("memory_gb", "0") or "0") or 0) * 1024, ssh_user="opsagent", discovered_at=datetime.utcnow(), ) db.add(server) await db.commit() await db.refresh(server) server_id = server.id job_id = await _save_job(db, user.tenant_id, user.id, "server_spec", file.filename or "", None, extracted, "tb_server_info", server_id) return { "ok": True, "workflow": "server_spec", "extracted": extracted, "server_id": server_id, "job_id": job_id, "message": f"서버 사양 추출 완료" + (f" → CMDB ID {server_id} 등록" if server_id else " (수동 확인 필요)"), } @router.post("/invoice") async def process_invoice( file: UploadFile = File(...), auto_register: bool = Form(True), db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """청구서/세금계산서 → 과금 Invoice 자동 등록.""" file_bytes = await file.read() api_key = await _get_api_key(user, db) schema = { "invoice_no": "세금계산서번호/청구번호", "issue_date": "발행일", "supplier_name": "공급자 상호", "supplier_biz_no": "공급자 사업자번호", "buyer_name": "공급받는자 상호", "supply_amount": "공급가액", "vat_amount": "세액", "total_amount": "합계금액", "items": "품목/내역", "payment_due": "결제기한", } result = await _extract(api_key, file_bytes, file.filename or "invoice.pdf", schema) extracted = _simplify(result) invoice_id = None if auto_register and extracted.get("total_amount"): today = date.today() invoice = Invoice( tenant_id=user.tenant_id, plan="OCR_IMPORT", period=today.strftime("%Y-%m"), amount=_parse_amount(extracted.get("total_amount", "0")), status="DRAFT", generated_by=user.id, created_at=datetime.utcnow(), ) db.add(invoice) await db.commit() await db.refresh(invoice) invoice_id = invoice.id job_id = await _save_job(db, user.tenant_id, user.id, "invoice", file.filename or "", None, extracted, "tb_invoice", invoice_id) return { "ok": True, "workflow": "invoice", "extracted": extracted, "invoice_id": invoice_id, "job_id": job_id, "total_amount": _parse_amount(extracted.get("total_amount", "0")), } @router.post("/audit-report") async def process_audit_report( file: UploadFile = File(...), db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """CSAP/감사 보고서 → 준수율 정보 추출.""" file_bytes = await file.read() api_key = await _get_api_key(user, db) schema = { "institution": "기관명", "check_date": "점검일", "auditor": "점검자/감사기관", "total_items": "총 점검항목 수", "passed_items": "적합(통과) 항목 수", "failed_items": "부적합 항목 수", "compliance_rate": "준수율(%)", "major_findings": "주요 발견사항", "recommendations": "권고사항", } result = await _extract(api_key, file_bytes, file.filename or "audit.pdf", schema) extracted = _simplify(result) job_id = await _save_job(db, user.tenant_id, user.id, "audit_report", file.filename or "", None, extracted, "audit", None) compliance_rate = float(re.sub(r'[^\d.]', '', extracted.get("compliance_rate", "0") or "0") or 0) return { "ok": True, "workflow": "audit_report", "extracted": extracted, "compliance_rate": compliance_rate, "job_id": job_id, "message": f"감사 보고서 분석 완료. 준수율: {compliance_rate}%", } @router.post("/incident-report") async def process_incident_report( file: UploadFile = File(...), auto_create_sr: bool = Form(True), db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """장애보고서 이미지/PDF → 에러 내용 추출 → SR 자동 생성.""" file_bytes = await file.read() api_key = await _get_api_key(user, db) # Document Parse로 텍스트 추출 parse_result = await _parse_doc(api_key, file_bytes, file.filename or "incident.png") text = parse_result.get("content", {}).get("text", "") if isinstance(parse_result.get("content"), dict) else "" # 추가로 정보 추출 schema = { "incident_date": "발생일시", "incident_type": "장애유형", "affected_system": "영향 시스템", "error_message": "오류 메시지", "severity": "심각도(P1/P2/P3/P4)", "reporter": "보고자", } extract_result = await _extract(api_key, file_bytes, file.filename or "incident.png", schema) extracted = _simplify(extract_result) sr_id = None if auto_create_sr: severity = extracted.get("severity", "P3") priority = {"P1": "HIGH", "P2": "HIGH", "P3": "MEDIUM", "P4": "LOW"}.get(severity.upper(), "MEDIUM") title = f"[장애보고서] {extracted.get('incident_type', '장애')} - {extracted.get('affected_system', '미상')}" description = ( f"OCR 추출 장애보고서\n\n" f"발생일시: {extracted.get('incident_date', '-')}\n" f"장애유형: {extracted.get('incident_type', '-')}\n" f"영향 시스템: {extracted.get('affected_system', '-')}\n" f"오류 메시지: {extracted.get('error_message', '-')}\n\n" f"원본 텍스트:\n{text[:500]}" ) sr = SRRequest( title=title[:100], description=description, category="INCIDENT", priority=priority, status=SRStatus.OPEN, created_at=datetime.utcnow(), ) db.add(sr) await db.commit() await db.refresh(sr) sr_id = sr.id job_id = await _save_job(db, user.tenant_id, user.id, "incident_report", file.filename or "", None, extracted, "tb_sr_request", sr_id) return { "ok": True, "workflow": "incident_report", "extracted": extracted, "sr_id": sr_id, "job_id": job_id, "message": f"장애 보고서 분석 완료" + (f" → SR-{sr_id} 생성" if sr_id else ""), } @router.post("/meeting-minutes") async def process_meeting_minutes( file: UploadFile = File(...), auto_create_sr: bool = Form(True), db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """회의록 → 결정사항/액션아이템 추출 → SR 자동 생성.""" file_bytes = await file.read() api_key = await _get_api_key(user, db) schema = { "meeting_date": "회의일시", "chairman": "의장/주관자", "participants": "참석자 목록", "agenda": "회의 안건", "decisions": "결정사항", "action_items": "액션아이템(담당자/기한 포함)", "next_meeting": "차기 회의 일정", } result = await _extract(api_key, file_bytes, file.filename or "meeting.pdf", schema) extracted = _simplify(result) sr_ids = [] if auto_create_sr and extracted.get("action_items"): # 액션아이템별로 SR 생성 action_text = extracted.get("action_items", "") items = [a.strip() for a in re.split(r'[,\n]', action_text) if a.strip()] for item in items[:5]: # 최대 5개 SR sr = SRRequest( title=f"[회의록 액션] {item[:80]}", description=f"회의일: {extracted.get('meeting_date', '-')}\n의장: {extracted.get('chairman', '-')}\n\n액션아이템: {item}", category="TASK", priority="MEDIUM", status=SRStatus.OPEN, created_at=datetime.utcnow(), ) db.add(sr) await db.commit() await db.refresh(sr) sr_ids.append(sr.id) job_id = await _save_job(db, user.tenant_id, user.id, "meeting_minutes", file.filename or "", None, extracted, "tb_sr_request", sr_ids[0] if sr_ids else None) return { "ok": True, "workflow": "meeting_minutes", "extracted": extracted, "sr_ids": sr_ids, "job_id": job_id, "message": f"회의록 분석 완료" + (f" → SR {sr_ids} 생성" if sr_ids else ""), } @router.post("/brand-contract") async def process_brand_contract( file: UploadFile = File(...), auto_register: bool = Form(True), brand_name: str = Form("", description="브랜드사명 (예: 현대백화점)"), db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """ 기업 브랜드 계약서 처리 — 현대백화점·롯데·신세계 등 유통/브랜드 계약. 나라장터 외 일반 B2B 계약서를 자동 파싱하여 계약 이력에 등록. """ file_bytes = await file.read() api_key = await _get_api_key(user, db) # 브랜드 계약서 전용 스키마 schema = { "contract_title": "계약서 제목", "party_a": "갑(발주사/브랜드사)", "party_a_biz_no": "갑 사업자번호", "party_b": "을(수주사/입점사/공급사)", "party_b_biz_no": "을 사업자번호", "contract_amount": "계약금액(숫자만)", "currency": "통화(KRW/USD/기타)", "effective_date": "계약체결일(YYYY-MM-DD)", "expiry_date": "계약만료일(YYYY-MM-DD)", "auto_renewal": "자동갱신여부(Y/N)", "payment_terms": "대금 지급조건", "contract_items": "계약 품목/서비스", "royalty_rate": "수수료율/로열티율", "territory": "적용지역/매장명", "exclusive": "독점여부(Y/N)", "termination": "계약 해지 조건", "penalty_clause": "위약금 조항", "contact_a": "갑 담당자명", "contact_b": "을 담당자명", "special_terms": "특약사항", } result = await _extract(api_key, file_bytes, file.filename or "brand_contract.pdf", schema) extracted = _simplify(result) # 브랜드사명 보완 if brand_name and not extracted.get("party_a"): extracted["party_a"] = brand_name record_id = None if auto_register: record = ProcurementRecord( tenant_id=user.tenant_id, contract_no=f"BRAND-{datetime.utcnow().strftime('%Y%m%d%H%M')}", contract_name=extracted.get("contract_title") or f"{extracted.get('party_a', '브랜드사')} 계약서", supplier=extracted.get("party_b", ""), amount=_parse_amount(extracted.get("contract_amount", "0")), category="브랜드계약", start_date=_parse_date(extracted.get("effective_date")), end_date=_parse_date(extracted.get("expiry_date")), status="ACTIVE", created_at=datetime.utcnow(), ) db.add(record) await db.commit() await db.refresh(record) record_id = record.id job_id = await _save_job(db, user.tenant_id, user.id, "brand_contract", file.filename or "", None, extracted, "tb_procurement_record", record_id) return { "ok": True, "workflow": "brand_contract", "brand_name": extracted.get("party_a", brand_name), "counterparty": extracted.get("party_b", ""), "contract_amount": _parse_amount(extracted.get("contract_amount", "0")), "currency": extracted.get("currency", "KRW"), "effective_date": extracted.get("effective_date", ""), "expiry_date": extracted.get("expiry_date", ""), "extracted": extracted, "record_id": record_id, "job_id": job_id, "message": f"브랜드 계약서 처리 완료" + (f" → 계약 ID {record_id} 등록" if record_id else ""), } @router.get("/jobs") async def list_workflow_jobs( limit: int = 50, workflow_type: Optional[str] = None, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): q = select(DocWorkflowJob).where(DocWorkflowJob.tenant_id == user.tenant_id) if workflow_type: q = q.where(DocWorkflowJob.workflow_type == workflow_type) q = q.order_by(desc(DocWorkflowJob.created_at)).limit(limit) rows = await db.execute(q) jobs = rows.scalars().all() return [ { "id": j.id, "workflow": j.workflow_type, "filename": j.filename, "status": j.status, "linked_table": j.linked_table, "linked_id": j.linked_record_id, "created_at": j.created_at, } for j in jobs ] @router.get("/jobs/{job_id}") async def get_workflow_job( job_id: int, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): row = await db.execute( select(DocWorkflowJob).where( DocWorkflowJob.id == job_id, DocWorkflowJob.tenant_id == user.tenant_id, ) ) job = row.scalar_one_or_none() if not job: raise HTTPException(404) return { "id": job.id, "workflow": job.workflow_type, "filename": job.filename, "status": job.status, "extracted_data": job.extracted_data, "linked_table": job.linked_table, "linked_id": job.linked_record_id, "error": job.error_message, "created_at": job.created_at, "completed_at": job.completed_at, }