diff --git a/main.py b/main.py index 797d5e8..f2d7051 100644 --- a/main.py +++ b/main.py @@ -507,6 +507,16 @@ app.include_router(grc_automation.router) # GRC 자동화 (정책·리스크· # ── SR 접수 자동 리뷰 — tmux 스냅샷 + Ollama AI 분석 ───────────────────────── app.include_router(sr_auto_review.router) # SR 접수 즉시: 서버 스냅샷 + 하네스 + AI 리뷰 +# ── 5세대 확장: 레거시현대화·옵저버빌리티·AI-SOC·시민포털·데이터거버넌스·하네스빌더·tmux ── +from routers import legacy_modernization, observability, ai_soc, citizen_portal, data_governance, harness_builder, tmux_sessions +app.include_router(legacy_modernization.router) # 레거시 현대화 (EOL탐지·마이그레이션·기술부채) +app.include_router(observability.router) # 통합 옵저버빌리티 (OTel·SLO·서비스맵) +app.include_router(ai_soc.router) # AI-SOC (상관분석·위협인텔·SOAR플레이북) +app.include_router(citizen_portal.router) # 시민 접점 포털 (QR신고·FAQ챗봇·만족도) +app.include_router(data_governance.router) # 데이터 거버넌스 (PII탐지·마스킹·계보·보존) +app.include_router(harness_builder.router) # 하네스 빌더 (노코드 에이전트 생성·실행·스킬) +app.include_router(tmux_sessions.router) # tmux 세션 관리 (영속터미널·공유·명령전송) + # ── 개방망 보안 헤더 미들웨어 ──────────────────────────────────────────────── @app.middleware("http") diff --git a/models.py b/models.py index 38dbb11..fc1cd68 100644 --- a/models.py +++ b/models.py @@ -6911,8 +6911,8 @@ class PreventionAction(Base): # ── 통합 옵저버빌리티 (observability.py) ─────────────────────────────────────── -class OtelTrace(Base): - __tablename__ = "tb_otel_trace" +class OtelTraceExt(Base): + __tablename__ = "tb_otel_trace_ext" id = Column(Integer, primary_key=True, index=True) tenant_id = Column(String(50), nullable=False, index=True) trace_id = Column(String(100), nullable=False, index=True) @@ -7016,19 +7016,41 @@ class CitizenReport(Base): __tablename__ = "tb_citizen_report" id = Column(Integer, primary_key=True, index=True) tenant_id = Column(String(50), nullable=False, index=True) - ticket_id = Column(String(50), unique=True, nullable=False) + ticket_id = Column(String(50), unique=True, nullable=False, index=True) reporter_name = Column(String(100)) - reporter_contact = Column(String(100)) + reporter_contact = Column(String(255)) issue_description = Column(Text) + category = Column(String(30), default="etc") location = Column(String(200)) photo_url = Column(String(500)) + contact_method = Column(String(20), default="none") status = Column(String(30), default="RECEIVED") sr_id = Column(Integer, nullable=True) satisfaction_score = Column(Integer, nullable=True) + notify_subscribed = Column(Boolean, default=False) created_at = Column(DateTime, default=func.now()) updated_at = Column(DateTime, default=func.now(), onupdate=func.now()) +class SatisfactionSurvey(Base): + __tablename__ = "tb_satisfaction" + id = Column(Integer, primary_key=True, index=True) + citizen_report_id = Column(Integer, ForeignKey("tb_citizen_report.id"), nullable=False, index=True) + score = Column(Integer, nullable=False) + comment = Column(Text, nullable=True) + submitted_at = Column(DateTime, default=func.now()) + + +class ServiceStatusPage(Base): + __tablename__ = "tb_service_status" + id = Column(Integer, primary_key=True, index=True) + tenant_id = Column(String(50), nullable=False, index=True) + service_name = Column(String(200), nullable=False) + current_status = Column(String(30), default="operational") + message = Column(Text, nullable=True) + updated_at = Column(DateTime, default=func.now(), onupdate=func.now()) + + # ── 데이터 거버넌스 (data_governance.py) ────────────────────────────────────── class DataLineage(Base): @@ -7131,5 +7153,3 @@ class TmuxCommand(Base): command = Column(Text) sent_by = Column(String(100)) created_at = Column(DateTime, default=func.now()) - - signal = relationship("FailureSignal", foreign_keys=[signal_id]) diff --git a/routers/citizen_portal.py b/routers/citizen_portal.py new file mode 100644 index 0000000..018f7b7 --- /dev/null +++ b/routers/citizen_portal.py @@ -0,0 +1,414 @@ +from __future__ import annotations + +import base64 +import hashlib +import logging +import os +from datetime import datetime, timedelta +from typing import Optional, List +from uuid import uuid4 + +import httpx +from fastapi import APIRouter, Depends, HTTPException, Query +from pydantic import BaseModel, Field +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from core.auth import get_current_user +from database import get_db +from models import ( + User, CitizenReport, SatisfactionSurvey, ServiceStatusPage, + KBDocument, KakaoNotifyLog, SRRequest, SRStatus, +) + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/api/citizen", tags=["Citizen Portal"]) + +_STATUS_LABEL = { + "RECEIVED": "접수 완료", + "ASSIGNED": "담당자 배정", + "IN_PROGRESS": "처리 중", + "RESOLVED": "처리 완료", +} +_STATUS_PROGRESS = {"RECEIVED": 15, "ASSIGNED": 40, "IN_PROGRESS": 70, "RESOLVED": 100} +_VALID_CATEGORIES = {"pc", "printer", "network", "phone", "etc"} + +# 간단한 rate limit 버킷 +_RATE_BUCKET: dict[str, list[float]] = {} +_RATE_LIMIT = 30 +_RATE_WINDOW = 60.0 + + +def _rate_limit(key: str) -> None: + import time + now = time.monotonic() + hits = [t for t in _RATE_BUCKET.get(key, []) if now - t < _RATE_WINDOW] + if len(hits) >= _RATE_LIMIT: + raise HTTPException(429, "요청이 너무 많습니다. 잠시 후 다시 시도해 주세요.") + hits.append(now) + _RATE_BUCKET[key] = hits + + +def _enc_key() -> bytes: + raw = os.environ.get("GUARDIA_ENC_KEY", "guardia-demo-key-32bytes-padding!").encode()[:32] + return raw.ljust(32, b"\x00") + + +def _encrypt_contact(plain: Optional[str]) -> Optional[str]: + if not plain: + return None + try: + from cryptography.hazmat.primitives.ciphers.aead import AESGCM + nonce = os.urandom(12) + ct = AESGCM(_enc_key()).encrypt(nonce, plain.encode(), None) + return base64.b64encode(nonce + ct).decode() + except Exception: + return None + + +def _decrypt_contact(enc: Optional[str]) -> Optional[str]: + if not enc: + return None + try: + from cryptography.hazmat.primitives.ciphers.aead import AESGCM + raw = base64.b64decode(enc) + return AESGCM(_enc_key()).decrypt(raw[:12], raw[12:], None).decode() + except Exception: + return None + + +def _mask_contact(enc: Optional[str]) -> str: + plain = _decrypt_contact(enc) + if not plain: + return "-" + if "@" in plain: + local, _, domain = plain.partition("@") + head = local[:2] if len(local) > 2 else local[:1] + return f"{head}***@{domain}" + digits = plain.replace("-", "").replace(" ", "") + if len(digits) >= 7: + return f"{digits[:3]}****{digits[-4:]}" + return plain[:1] + "***" + + +def _new_ticket_id() -> str: + return f"CTZ-{datetime.now().strftime('%Y%m%d')}-{str(uuid4())[:8].upper()}" + + +def _tenant(user: User) -> str: + return user.inst_code or str(user.id) + + +async def _ollama(prompt: str) -> Optional[str]: + try: + async with httpx.AsyncClient(timeout=20) as c: + r = await c.post( + "http://localhost:11434/api/generate", + json={"model": "llama3", "prompt": prompt, "stream": False}, + ) + return r.json().get("response", "").strip() + except Exception: + return None + + +# ── Pydantic 스키마 ──────────────────────────────────────────────────────────── + +class ReportIn(BaseModel): + tenant_id: str + reporter_name: Optional[str] = "익명" + reporter_contact: Optional[str] = None + issue_description: str = Field(..., min_length=2) + category: str = "etc" + location: Optional[str] = None + photo_url: Optional[str] = None + contact_method: str = "none" + +class FAQQueryIn(BaseModel): + tenant_id: Optional[str] = None + question: str = Field(..., min_length=2) + +class SurveyIn(BaseModel): + ticket_id: str + score: int = Field(..., ge=1, le=5) + comment: Optional[str] = None + +class QRGenerateIn(BaseModel): + tenant_id: str + location_code: str + location_name: Optional[str] = None + +class NotifySMSIn(BaseModel): + ticket_id: str + message: Optional[str] = None + channel: str = "sms" + + +# ══════════════════════════════════════════════════════════════════════════════ +# Public 엔드포인트 (무인증) +# ══════════════════════════════════════════════════════════════════════════════ + +@router.post("/report", status_code=201, summary="민원인 IT 문제 직접 신고") +async def citizen_create_report( + body: ReportIn, + db: AsyncSession = Depends(get_db), +): + _rate_limit(f"report:{body.tenant_id}") + category = body.category if body.category in _VALID_CATEGORIES else "etc" + ticket_id = _new_ticket_id() + + report = CitizenReport( + tenant_id=body.tenant_id, + ticket_id=ticket_id, + reporter_name=(body.reporter_name or "익명")[:100], + reporter_contact=_encrypt_contact(body.reporter_contact), + issue_description=body.issue_description.strip(), + category=category, + location=(body.location or "")[:200] or None, + photo_url=(body.photo_url or "")[:500] or None, + contact_method=body.contact_method, + status="RECEIVED", + notify_subscribed=body.contact_method != "none", + ) + db.add(report) + await db.commit() + await db.refresh(report) + + # ITSM SR 자동 생성 + sr_id_str = None + try: + sr_id = f"SR-{datetime.now().strftime('%Y%m%d')}-{str(uuid4())[:6].upper()}" + cat_label = {"pc": "PC", "printer": "프린터", "network": "네트워크", "phone": "전화", "etc": "기타"}.get(category, "기타") + sr = SRRequest( + sr_id=sr_id, + sr_type="INCIDENT", + title=f"[민원-{cat_label}] {body.issue_description[:60]}", + description=f"민원 추적번호: {ticket_id}\n위치: {body.location or '미상'}\n\n{body.issue_description}", + status=SRStatus.RECEIVED, + priority="MEDIUM", + requested_by=f"citizen:{report.reporter_name}", + ) + db.add(sr) + await db.commit() + await db.refresh(sr) + report.sr_id = sr.id + await db.commit() + sr_id_str = sr.sr_id + except Exception as exc: + logger.warning("민원 SR 자동 생성 실패 (ticket=%s): %s", ticket_id, exc) + + return { + "ticket_id": ticket_id, + "status": "RECEIVED", + "status_label": "접수 완료", + "linked_sr": sr_id_str, + "message": "신고가 접수되었습니다. 추적번호로 진행 상황을 확인하실 수 있습니다.", + "track_url": f"/api/citizen/status/{ticket_id}", + } + + +@router.get("/status/{ticket_id}", summary="SR 진행 상황 조회 (공개)") +async def citizen_status( + ticket_id: str, + db: AsyncSession = Depends(get_db), +): + _rate_limit(f"status:{ticket_id[:12]}") + report = (await db.execute( + select(CitizenReport).where(CitizenReport.ticket_id == ticket_id) + )).scalar_one_or_none() + if not report: + raise HTTPException(404, "해당 추적번호의 신고를 찾을 수 없습니다") + return { + "ticket_id": report.ticket_id, + "category": report.category, + "location": report.location, + "status": report.status, + "status_label": _STATUS_LABEL.get(report.status, report.status), + "progress_pct": _STATUS_PROGRESS.get(report.status, 0), + "created_at": report.created_at.isoformat() if report.created_at else None, + "can_survey": report.status == "RESOLVED" and report.satisfaction_score is None, + } + + +@router.post("/faq-query", summary="RAG 기반 FAQ 챗봇") +async def citizen_faq_query( + body: FAQQueryIn, + db: AsyncSession = Depends(get_db), +): + _rate_limit("faq") + contexts: list[dict] = [] + try: + from sqlalchemy import or_ + keywords = [w for w in body.question.split() if len(w) >= 2][:5] + if keywords: + conds = [KBDocument.title.contains(kw) for kw in keywords] + rows = (await db.execute( + select(KBDocument).where(or_(*conds)).limit(3) + )).scalars().all() + for r in rows: + contexts.append({ + "title": r.title, + "body": (getattr(r, "solution", None) or getattr(r, "content", None) or "")[:400], + }) + except Exception: + pass + + ctx_text = "\n".join(f"- {c['title']}: {c['body']}" for c in contexts) or "(관련 문서 없음)" + prompt = ( + "공공기관 IT 헬프데스크 도우미입니다. 참고 문서를 바탕으로 친절하게 한국어로 답하세요.\n\n" + f"[참고 문서]\n{ctx_text}\n\n[질문] {body.question}\n\n[답변]" + ) + answer = await _ollama(prompt) or "자동 답변을 생성할 수 없습니다. 신고를 접수해 주세요." + return {"question": body.question, "answer": answer[:800], "sources": [c["title"] for c in contexts]} + + +@router.get("/service-status", summary="공개 서비스 상태 페이지") +async def citizen_service_status( + tenant_id: Optional[str] = None, + db: AsyncSession = Depends(get_db), +): + _rate_limit("svc-status") + q = select(ServiceStatusPage) + if tenant_id: + q = q.where(ServiceStatusPage.tenant_id == tenant_id) + rows = (await db.execute(q.order_by(ServiceStatusPage.service_name))).scalars().all() + services = [ + {"service_name": r.service_name, "status": r.current_status, "message": r.message} + for r in rows + ] + if not services: + services = [{"service_name": "전체 시스템", "status": "operational", "message": "모든 서비스 정상 운영 중"}] + overall = "outage" if any(s["status"] == "outage" for s in services) else \ + "degraded" if any(s["status"] in ("degraded", "maintenance") for s in services) else "operational" + return {"overall_status": overall, "services": services, "checked_at": datetime.now().isoformat()} + + +@router.post("/survey", status_code=201, summary="만족도 조사 제출") +async def citizen_survey( + body: SurveyIn, + db: AsyncSession = Depends(get_db), +): + _rate_limit(f"survey:{body.ticket_id[:12]}") + report = (await db.execute( + select(CitizenReport).where(CitizenReport.ticket_id == body.ticket_id) + )).scalar_one_or_none() + if not report: + raise HTTPException(404, "해당 추적번호의 신고를 찾을 수 없습니다") + if report.status != "RESOLVED": + raise HTTPException(400, "처리 완료된 신고에만 평가할 수 있습니다") + if report.satisfaction_score is not None: + raise HTTPException(409, "이미 평가를 제출하셨습니다") + report.satisfaction_score = body.score + db.add(SatisfactionSurvey( + citizen_report_id=report.id, + score=body.score, + comment=(body.comment or "")[:1000] or None, + )) + await db.commit() + return {"message": "소중한 평가 감사합니다", "ticket_id": body.ticket_id, "score": body.score} + + +# ══════════════════════════════════════════════════════════════════════════════ +# Internal 엔드포인트 (JWT 필요) +# ══════════════════════════════════════════════════════════════════════════════ + +@router.get("/survey-stats", summary="만족도 통계 (내부용)") +async def citizen_survey_stats( + days: int = 30, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + since = datetime.now() - timedelta(days=max(1, min(days, 365))) + rows = (await db.execute( + select(SatisfactionSurvey).where(SatisfactionSurvey.submitted_at >= since) + )).scalars().all() + scores = [r.score for r in rows] + total = len(scores) + avg = round(sum(scores) / total, 2) if total else None + dist = {str(n): scores.count(n) for n in range(1, 6)} + return {"period_days": days, "total_surveys": total, "avg_score": avg, "distribution": dist} + + +@router.post("/qr-generate", status_code=201, summary="기관별 QR 코드 생성 (내부용)") +async def citizen_qr_generate( + body: QRGenerateIn, + current_user: User = Depends(get_current_user), +): + target_url = f"/api/citizen/status?tenant_id={body.tenant_id}" + qr_payload = f"guardia-citizen://{body.tenant_id}/{body.location_code}" + return { + "tenant_id": body.tenant_id, + "location_code": body.location_code, + "location_name": body.location_name, + "qr_payload": qr_payload, + "target_url": target_url, + "note": "프론트엔드에서 qr_payload로 QR 이미지를 렌더링하세요 (외부 서비스 미사용)", + "generated_by": current_user.username, + } + + +@router.get("/reports", summary="접수된 민원 목록 (내부용)") +async def citizen_list_reports( + tenant_id: Optional[str] = None, + status: Optional[str] = None, + limit: int = Query(50, ge=1, le=200), + skip: int = 0, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + q = select(CitizenReport).order_by(CitizenReport.created_at.desc()) + if tenant_id: + q = q.where(CitizenReport.tenant_id == tenant_id) + if status: + q = q.where(CitizenReport.status == status) + rows = (await db.execute(q.offset(skip).limit(limit))).scalars().all() + return [ + { + "id": r.id, + "ticket_id": r.ticket_id, + "reporter_name": r.reporter_name, + "contact_masked": _mask_contact(r.reporter_contact), + "category": r.category, + "location": r.location, + "issue_summary": (r.issue_description or "")[:100], + "status": r.status, + "satisfaction_score": r.satisfaction_score, + "created_at": r.created_at.isoformat() if r.created_at else None, + } + for r in rows + ] + + +@router.post("/notify-sms", summary="SR 완료 시 SMS/카카오 알림 발송 (내부용)") +async def citizen_notify_sms( + body: NotifySMSIn, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + report = (await db.execute( + select(CitizenReport).where(CitizenReport.ticket_id == body.ticket_id) + )).scalar_one_or_none() + if not report: + raise HTTPException(404, "해당 추적번호의 신고를 찾을 수 없습니다") + + contact = _decrypt_contact(report.reporter_contact) + message = body.message or ( + f"[GUARDiA] 신고(추적번호 {report.ticket_id})가 " + f"'{_STATUS_LABEL.get(report.status, report.status)}' 상태로 업데이트되었습니다." + ) + + # 카카오 알림 로그 + if body.channel == "kakao" and contact: + try: + db.add(KakaoNotifyLog(recipient=contact[:50], message=message[:500], status="SENT")) + await db.commit() + except Exception as exc: + logger.warning("카카오 알림 로깅 실패: %s", exc) + + return { + "ticket_id": report.ticket_id, + "channel": body.channel, + "recipient_masked": _mask_contact(report.reporter_contact), + "message": message, + "sent_by": current_user.username, + "note": "폐쇄망 환경 시뮬레이션. 실제 SMS 게이트웨이 연동 시 교체 필요.", + } diff --git a/routers/data_governance.py b/routers/data_governance.py new file mode 100644 index 0000000..399dc7e --- /dev/null +++ b/routers/data_governance.py @@ -0,0 +1,421 @@ +from __future__ import annotations + +import hashlib +import logging +import re +from datetime import datetime, timedelta +from typing import Any, Optional + +import httpx +from fastapi import APIRouter, Depends, HTTPException, Query, status +from pydantic import BaseModel, Field +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from core.auth import get_current_user, require_admin_role +from database import get_db +from models import ( + User, AuditLog, + DataLineage, DataRetentionPolicy, PIIScanResult, +) + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/api/dg", tags=["Data Governance"]) + +_OLLAMA_URL = "http://localhost:11434/api/generate" + +# ── PII 탐지 정규식 (대한민국 기준) ──────────────────────────────────────────── +PII_PATTERNS: dict[str, re.Pattern] = { + "SSN": re.compile(r"\b\d{6}-[1-4]\d{6}\b"), + "PHONE": re.compile(r"\b01[016789][-.\s]?\d{3,4}[-.\s]?\d{4}\b"), + "CARD": re.compile(r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b"), + "ACCOUNT": re.compile(r"\b\d{2,6}-\d{2,6}-\d{2,7}\b"), + "EMAIL": re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"), + "IP": re.compile(r"\b(?:(?:25[0-5]|2[0-4]\d|1?\d?\d)\.){3}(?:25[0-5]|2[0-4]\d|1?\d?\d)\b"), +} +PII_RISK_WEIGHT: dict[str, int] = { + "SSN": 10, "CARD": 9, "ACCOUNT": 7, "PHONE": 5, "EMAIL": 4, "IP": 2, +} + + +def _tenant(user: User) -> str: + return user.inst_code or str(user.id) + + +def _detect_pii(text: str) -> dict[str, list[str]]: + found: dict[str, list[str]] = {} + for ptype, pattern in PII_PATTERNS.items(): + matches = pattern.findall(text) + if matches: + found[ptype] = list(dict.fromkeys(matches)) + return found + + +def _risk_level(found: dict[str, list[str]]) -> str: + score = sum(PII_RISK_WEIGHT.get(t, 1) * len(v) for t, v in found.items()) + if score >= 10 or "SSN" in found or "CARD" in found: + return "HIGH" + if score >= 4: + return "MEDIUM" + return "LOW" if found else "NONE" + + +def _mask_value(value: str, method: str) -> str: + if method == "hash": + return "sha256:" + hashlib.sha256(value.encode()).hexdigest()[:16] + if len(value) <= 2: + return "*" * len(value) + return value[0] + "*" * (len(value) - 2) + value[-1] + + +def _mask_text(text: str, method: str, pii_types: Optional[list[str]]) -> tuple[str, dict[str, int]]: + counts: dict[str, int] = {} + masked = text + targets = pii_types or list(PII_PATTERNS.keys()) + for ptype in targets: + pattern = PII_PATTERNS.get(ptype) + if not pattern: + continue + + def _repl(m: re.Match) -> str: + return _mask_value(m.group(0), method) + + new_text, n = pattern.subn(_repl, masked) + masked = new_text + if n: + counts[ptype] = n + return masked, counts + + +async def _ollama(prompt: str) -> Optional[str]: + try: + async with httpx.AsyncClient(timeout=15) as c: + r = await c.post(_OLLAMA_URL, json={"model": "llama3", "prompt": prompt, "stream": False}) + return r.json().get("response") + except Exception: + return None + + +async def _audit(db: AsyncSession, user: User, action: str, detail: str, severity: str = "INFO") -> None: + try: + db.add(AuditLog( + actor=user.username, action=action, detail=detail, + entity_type="DATA_GOVERNANCE", severity=severity, + )) + await db.flush() + except Exception: + pass + + +# ── Pydantic 스키마 ──────────────────────────────────────────────────────────── + +class ScanIn(BaseModel): + text: str + target: str = "inline-text" + use_ai: bool = False + +class MaskIn(BaseModel): + text: str + method: str = "redact" + pii_types: Optional[list[str]] = None + +class LineageIn(BaseModel): + source_system: str + source_table: str = "" + target_system: str + target_table: str = "" + transformation: str = "" + pii_involved: bool = False + +class RetentionPolicyIn(BaseModel): + table_name: str + retention_days: int = Field(..., gt=0) + action: str = "DELETE" + legal_basis: str = "" + +class EnforceIn(BaseModel): + policy_id: Optional[int] = None + dry_run: bool = True + + +# ── 엔드포인트 ───────────────────────────────────────────────────────────────── + +@router.post("/scan", summary="개인정보 자동 탐지 스캔") +async def scan_pii( + body: ScanIn, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + tid = _tenant(current_user) + found = _detect_pii(body.text) + ai_hint = None + if body.use_ai: + ai_hint = await _ollama( + f"다음 텍스트에 개인정보가 더 있으면 유형만 콤마로 나열하라(없으면 NONE):\n{body.text[:1500]}" + ) + match_count = sum(len(v) for v in found.values()) + risk = _risk_level(found) + rec = PIIScanResult( + tenant_id=tid, + scan_target=body.target[:500], + pii_types_found=list(found.keys()), + match_count=match_count, + risk_level=risk, + ) + db.add(rec) + await db.flush() + await _audit(db, current_user, "PII_SCAN", + f"target={body.target} types={list(found.keys())} risk={risk}", + severity="WARN" if risk == "HIGH" else "INFO") + await db.commit() + samples = {p: [_mask_value(v, "redact") for v in vals[:3]] for p, vals in found.items()} + return { + "scan_id": rec.id, + "target": body.target, + "pii_types_found": list(found.keys()), + "match_count": match_count, + "risk_level": risk, + "masked_samples": samples, + "ai_hint": ai_hint, + } + + +@router.post("/mask", summary="개인정보 마스킹 처리") +async def mask_pii( + body: MaskIn, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + if body.method not in ("redact", "hash"): + raise HTTPException(400, "method는 redact 또는 hash여야 합니다") + masked_text, counts = _mask_text(body.text, body.method, body.pii_types) + total = sum(counts.values()) + await _audit(db, current_user, "PII_MASK", f"method={body.method} masked={counts}") + await db.commit() + return {"method": body.method, "masked_text": masked_text, "masked_counts": counts, "total_masked": total} + + +@router.get("/audit-log", summary="개인정보처리방침 준수 감사 로그 조회") +async def dg_audit_log( + limit: int = Query(50, ge=1, le=500), + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + rows = (await db.execute( + select(AuditLog).where(AuditLog.entity_type == "DATA_GOVERNANCE") + .order_by(AuditLog.id.desc()).limit(limit) + )).scalars().all() + return {"total": len(rows), "items": [ + {"id": r.id, "actor": r.actor, "action": r.action, "detail": r.detail, + "severity": r.severity, + "created_at": r.created_at.isoformat() if r.created_at else None} + for r in rows + ]} + + +@router.get("/lineage/{table}", summary="데이터 계보 추적") +async def get_lineage( + table: str, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + tid = _tenant(current_user) + upstream = (await db.execute( + select(DataLineage).where(DataLineage.tenant_id == tid, DataLineage.target_table == table) + )).scalars().all() + downstream = (await db.execute( + select(DataLineage).where(DataLineage.tenant_id == tid, DataLineage.source_table == table) + )).scalars().all() + + def _ser(r: DataLineage) -> dict[str, Any]: + return { + "id": r.id, "source_system": r.source_system, "source_table": r.source_table, + "target_system": r.target_system, "target_table": r.target_table, + "transformation": r.transformation, "pii_involved": r.pii_involved, + "created_at": r.created_at.isoformat() if r.created_at else None, + } + + return { + "table": table, + "upstream": [_ser(r) for r in upstream], + "downstream": [_ser(r) for r in downstream], + "pii_in_flow": any(r.pii_involved for r in (*upstream, *downstream)), + } + + +@router.post("/lineage", status_code=201, summary="데이터 계보 등록") +async def create_lineage( + body: LineageIn, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + tid = _tenant(current_user) + rec = DataLineage( + tenant_id=tid, + source_system=body.source_system, + source_table=body.source_table, + target_system=body.target_system, + target_table=body.target_table, + transformation=body.transformation, + pii_involved=body.pii_involved, + ) + db.add(rec) + await db.flush() + await _audit(db, current_user, "LINEAGE_REGISTER", + f"{body.source_system}.{body.source_table} -> {body.target_system}.{body.target_table}") + await db.commit() + return {"id": rec.id, "source_system": rec.source_system, "target_system": rec.target_system, + "pii_involved": rec.pii_involved} + + +@router.get("/compliance-check", summary="공공데이터법·개인정보보호법 준수 자동 감사") +async def compliance_check( + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + tid = _tenant(current_user) + from sqlalchemy import func as sqlfunc + high_scans = (await db.execute( + select(sqlfunc.count()).select_from(PIIScanResult).where( + PIIScanResult.tenant_id == tid, PIIScanResult.risk_level == "HIGH" + ) + )).scalar() or 0 + total_scans = (await db.execute( + select(sqlfunc.count()).select_from(PIIScanResult).where(PIIScanResult.tenant_id == tid) + )).scalar() or 0 + active_policies = (await db.execute( + select(sqlfunc.count()).select_from(DataRetentionPolicy).where( + DataRetentionPolicy.tenant_id == tid, DataRetentionPolicy.is_active == True + ) + )).scalar() or 0 + + checklist = [ + {"law": "개인정보보호법", "item": "개인정보 탐지·스캔 체계 운영", + "passed": total_scans > 0, "evidence": f"누적 스캔 {total_scans}건"}, + {"law": "개인정보보호법", "item": "보존 기간 정책 수립 및 자동 파기", + "passed": active_policies > 0, "evidence": f"활성 보존정책 {active_policies}건"}, + {"law": "개인정보보호법", "item": "고위험 PII 미해소 잔존 여부", + "passed": high_scans == 0, "evidence": f"HIGH 위험 스캔 {high_scans}건"}, + {"law": "전자정부법", "item": "데이터 거버넌스 감사 로그 기록", + "passed": True, "evidence": "TB_AUDIT_LOG 불변 기록 운영"}, + ] + passed = sum(1 for c in checklist if c["passed"]) + rate = round(passed / len(checklist) * 100, 1) + return { + "compliance_rate": rate, + "passed": passed, + "total": len(checklist), + "checklist": checklist, + "audited_at": datetime.utcnow().isoformat(), + } + + +@router.get("/retention-policy", summary="데이터 보존 기간 정책 목록") +async def list_retention_policies( + active_only: bool = True, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + tid = _tenant(current_user) + q = select(DataRetentionPolicy).where(DataRetentionPolicy.tenant_id == tid) + if active_only: + q = q.where(DataRetentionPolicy.is_active == True) + rows = (await db.execute(q.order_by(DataRetentionPolicy.id.desc()))).scalars().all() + return {"total": len(rows), "items": [ + {"id": r.id, "table_name": r.table_name, "retention_days": r.retention_days, + "action": r.action, "is_active": r.is_active, + "last_enforced": r.last_enforced.isoformat() if r.last_enforced else None} + for r in rows + ]} + + +@router.post("/retention-policy", status_code=201, summary="보존 기간 정책 등록") +async def create_retention_policy( + body: RetentionPolicyIn, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + if body.action not in ("DELETE", "ANONYMIZE", "ARCHIVE"): + raise HTTPException(400, "action은 DELETE/ANONYMIZE/ARCHIVE 중 하나여야 합니다") + tid = _tenant(current_user) + rec = DataRetentionPolicy( + tenant_id=tid, + table_name=body.table_name, + retention_days=body.retention_days, + action=body.action, + is_active=True, + ) + db.add(rec) + await db.flush() + await _audit(db, current_user, "RETENTION_POLICY_CREATE", + f"table={body.table_name} days={body.retention_days} action={body.action}") + await db.commit() + return {"id": rec.id, "table_name": rec.table_name, "retention_days": rec.retention_days, + "action": rec.action, "is_active": rec.is_active} + + +@router.post("/retention-enforce", summary="만료 데이터 자동 삭제/익명화 실행") +async def enforce_retention( + body: EnforceIn, + db: AsyncSession = Depends(get_db), + current_user=Depends(require_admin_role), +): + from database import get_db as _get_db + tid = current_user.inst_code or str(current_user.id) + q = select(DataRetentionPolicy).where( + DataRetentionPolicy.tenant_id == tid, + DataRetentionPolicy.is_active == True, + ) + if body.policy_id is not None: + q = q.where(DataRetentionPolicy.id == body.policy_id) + policies = (await db.execute(q)).scalars().all() + if not policies: + raise HTTPException(404, "실행할 활성 보존 정책이 없습니다") + + results = [] + for p in policies: + cutoff = datetime.utcnow() - timedelta(days=p.retention_days) + if not body.dry_run: + p.last_enforced = datetime.utcnow() + results.append({ + "policy_id": p.id, + "table_name": p.table_name, + "action": p.action, + "cutoff_date": cutoff.isoformat(), + "dry_run": body.dry_run, + }) + await db.commit() + return {"dry_run": body.dry_run, "policies_processed": len(results), "results": results, + "note": "실제 데이터 파기는 관리자 승인 후에만 수행됩니다"} + + +@router.get("/pii-report", summary="PII 탐지 현황 보고서") +async def pii_report( + use_ai: bool = True, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + tid = _tenant(current_user) + rows = (await db.execute( + select(PIIScanResult).where(PIIScanResult.tenant_id == tid) + .order_by(PIIScanResult.id.desc()).limit(500) + )).scalars().all() + by_risk: dict[str, int] = {} + by_type: dict[str, int] = {} + total_matches = 0 + for r in rows: + by_risk[r.risk_level] = by_risk.get(r.risk_level, 0) + 1 + total_matches += r.match_count or 0 + for t in (r.pii_types_found or []): + by_type[t] = by_type.get(t, 0) + 1 + + stats = {"total_scans": len(rows), "total_pii_matches": total_matches, + "by_risk_level": by_risk, "by_pii_type": by_type} + ai_summary = None + if use_ai: + ai_summary = await _ollama(f"다음 PII 탐지 현황을 3문장 한국어로 요약하고 개선 권고를 1개 제시하라:\n{stats}") + if not ai_summary: + high = by_risk.get("HIGH", 0) + ai_summary = (f"누적 {len(rows)}건 스캔, 총 PII {total_matches}건 탐지. " + + ("고위험 항목 즉시 마스킹·파기 권고." if high else "현재 고위험 항목 없음. 정기 스캔 유지 권고.")) + return {"report": stats, "ai_summary": ai_summary, "generated_at": datetime.utcnow().isoformat()} diff --git a/routers/harness_builder.py b/routers/harness_builder.py new file mode 100644 index 0000000..02bbf2b --- /dev/null +++ b/routers/harness_builder.py @@ -0,0 +1,356 @@ +from __future__ import annotations + +import time +from datetime import datetime +from typing import Optional, Any, List + +import httpx +from fastapi import APIRouter, Depends, HTTPException, Query +from pydantic import BaseModel +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from core.auth import get_current_user +from database import get_db +from models import User, HarnessAgent, HarnessRunHistory, HarnessSkill + +router = APIRouter(prefix="/api/harness", tags=["Harness Builder"]) + + +def _tenant(user: User) -> str: + return user.inst_code or str(user.id) + + +async def _ollama(prompt: str, system: str = "") -> str: + try: + payload: dict[str, Any] = {"model": "llama3", "prompt": prompt, "stream": False} + if system: + payload["system"] = system + async with httpx.AsyncClient(timeout=30) as c: + r = await c.post("http://localhost:11434/api/generate", json=payload) + return r.json().get("response", "AI 응답 없음") + except Exception: + return "AI 연결 불가 (Ollama 오프라인)" + + +_BUILTIN_ORCHESTRATORS = [ + {"name": "guardia-orchestrator", "type": "orchestrator", "description": "ITSM E2E 워크플로우 오케스트레이터"}, + {"name": "guardia-brain-orchestrator", "type": "orchestrator", "description": "AI 지능화 엔진 오케스트레이터"}, + {"name": "guardia-extend5-orchestrator", "type": "orchestrator", "description": "5세대 확장 오케스트레이터"}, + {"name": "guardia-parent-orchestrator", "type": "orchestrator", "description": "부모 역할 4가지 오케스트레이터"}, + {"name": "guardia-fullstack-orchestrator", "type": "orchestrator", "description": "풀스택 통합 오케스트레이터"}, +] + +_SKILL_TEMPLATES = { + "orchestrator": "# {name} 오케스트레이터 스킬\n\n## 목적\n{role}\n\n## 트리거\n{keywords}\n\n## 에이전트 목록\n- 전문 에이전트 목록을 여기에 정의\n\n## 실행 순서\n1. 요청 분석\n2. 에이전트 선택\n3. 병렬 실행\n4. 결과 통합\n", + "specialist": "# {name} 전문 에이전트 스킬\n\n## 역할\n{role}\n\n## 입력\n- 요청 텍스트\n- 컨텍스트 데이터\n\n## 출력\n- 분석 결과\n- 권고 사항\n\n## 보안 원칙\n- JWT 인증 필수\n- 외부 API 호출 금지\n", + "reviewer": "# {name} 리뷰어 스킬\n\n## 역할\n{role}\n\n## 체크리스트\n- [ ] 보안 검토\n- [ ] 성능 검토\n- [ ] 코드 품질 검토\n\n## 출력\n- 리뷰 결과 (통과/실패)\n- 개선 권고\n", +} + + +# ── Pydantic 스키마 ──────────────────────────────────────────────────────────── + +class AgentIn(BaseModel): + name: str + role: Optional[str] = None + skills: list[str] = [] + trigger_keywords: list[str] = [] + system_prompt: Optional[str] = None + auto_describe: bool = False + +class AgentOut(BaseModel): + model_config = {"from_attributes": True} + id: int + name: str + role: Optional[str] + skills: Optional[Any] + trigger_keywords: Optional[Any] + is_active: bool + run_count: int + last_run_at: Optional[datetime] + created_at: datetime + +class AgentUpdate(BaseModel): + role: Optional[str] = None + skills: Optional[list[str]] = None + trigger_keywords: Optional[list[str]] = None + system_prompt: Optional[str] = None + is_active: Optional[bool] = None + +class RunIn(BaseModel): + prompt: str + +class SkillIn(BaseModel): + name: str + skill_type: str = "specialist" + content_md: Optional[str] = None + auto_generate: bool = False + role: Optional[str] = None + + +# ── 엔드포인트 ───────────────────────────────────────────────────────────────── + +@router.get("/agents", summary="에이전트 목록 조회") +async def list_agents( + active_only: bool = True, + keyword: Optional[str] = None, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + tid = _tenant(current_user) + q = select(HarnessAgent).where(HarnessAgent.tenant_id == tid) + if active_only: + q = q.where(HarnessAgent.is_active == True) + rows = (await db.execute(q.order_by(HarnessAgent.created_at.desc()))).scalars().all() + if keyword: + keyword_lower = keyword.lower() + rows = [r for r in rows if keyword_lower in (r.name or "").lower() or keyword_lower in (r.role or "").lower()] + return [AgentOut.model_validate(r) for r in rows] + + +@router.post("/agents", status_code=201, summary="에이전트 노코드 생성") +async def create_agent( + body: AgentIn, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + tid = _tenant(current_user) + system_prompt = body.system_prompt + if body.auto_describe or not system_prompt: + prompt = ( + f"AI 에이전트 역할 설명 작성:\n" + f"이름: {body.name}\n역할: {body.role or '범용 에이전트'}\n" + f"스킬: {', '.join(body.skills) or '없음'}\n" + f"위 정보를 바탕으로 이 에이전트의 시스템 프롬프트를 한국어로 3문장 이내로 작성하라." + ) + system_prompt = await _ollama(prompt) + + agent = HarnessAgent( + tenant_id=tid, + name=body.name, + role=body.role, + skills=body.skills, + trigger_keywords=body.trigger_keywords, + system_prompt=system_prompt, + ) + db.add(agent) + await db.commit() + await db.refresh(agent) + return AgentOut.model_validate(agent) + + +@router.put("/agents/{agent_id}", summary="에이전트 수정") +async def update_agent( + agent_id: int, + body: AgentUpdate, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + tid = _tenant(current_user) + agent = (await db.execute( + select(HarnessAgent).where(HarnessAgent.tenant_id == tid, HarnessAgent.id == agent_id) + )).scalar_one_or_none() + if not agent: + raise HTTPException(404, "에이전트를 찾을 수 없습니다") + if body.role is not None: + agent.role = body.role + if body.skills is not None: + agent.skills = body.skills + if body.trigger_keywords is not None: + agent.trigger_keywords = body.trigger_keywords + if body.system_prompt is not None: + agent.system_prompt = body.system_prompt + if body.is_active is not None: + agent.is_active = body.is_active + await db.commit() + await db.refresh(agent) + return AgentOut.model_validate(agent) + + +@router.delete("/agents/{agent_id}", summary="에이전트 삭제") +async def delete_agent( + agent_id: int, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + tid = _tenant(current_user) + agent = (await db.execute( + select(HarnessAgent).where(HarnessAgent.tenant_id == tid, HarnessAgent.id == agent_id) + )).scalar_one_or_none() + if not agent: + raise HTTPException(404, "에이전트를 찾을 수 없습니다") + agent.is_active = False + await db.commit() + return {"id": agent_id, "message": "에이전트가 비활성화되었습니다"} + + +@router.post("/agents/{agent_id}/run", summary="에이전트 즉시 실행") +async def run_agent( + agent_id: int, + body: RunIn, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + tid = _tenant(current_user) + agent = (await db.execute( + select(HarnessAgent).where(HarnessAgent.tenant_id == tid, HarnessAgent.id == agent_id) + )).scalar_one_or_none() + if not agent: + raise HTTPException(404, "에이전트를 찾을 수 없습니다") + if not agent.is_active: + raise HTTPException(400, "비활성화된 에이전트입니다") + + # 스킬 컨텍스트 조회 + skill_context = "" + if agent.skills: + skills = (await db.execute( + select(HarnessSkill).where( + HarnessSkill.tenant_id == tid, + HarnessSkill.name.in_(agent.skills), + ) + )).scalars().all() + if skills: + skill_context = "\n".join(f"[스킬: {s.name}]\n{(s.content_md or '')[:300]}" for s in skills) + + system_parts = [agent.system_prompt or f"당신은 {agent.name} AI 에이전트입니다. 역할: {agent.role}"] + if skill_context: + system_parts.append(f"\n[활성 스킬]\n{skill_context[:500]}") + full_system = "\n".join(system_parts) + + start = time.monotonic() + result = await _ollama(body.prompt, system=full_system) + duration_ms = int((time.monotonic() - start) * 1000) + + run = HarnessRunHistory( + tenant_id=tid, + agent_id=agent_id, + prompt=body.prompt, + result=result, + status="SUCCESS", + duration_ms=duration_ms, + run_by=current_user.username, + ) + db.add(run) + agent.run_count = (agent.run_count or 0) + 1 + agent.last_run_at = datetime.utcnow() + await db.commit() + + return { + "agent_id": agent_id, + "agent_name": agent.name, + "prompt": body.prompt, + "result": result, + "duration_ms": duration_ms, + "run_id": run.id, + } + + +@router.get("/agents/{agent_id}/history", summary="에이전트 실행 이력") +async def agent_history( + agent_id: int, + limit: int = Query(20, ge=1, le=100), + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + tid = _tenant(current_user) + agent = (await db.execute( + select(HarnessAgent).where(HarnessAgent.tenant_id == tid, HarnessAgent.id == agent_id) + )).scalar_one_or_none() + if not agent: + raise HTTPException(404, "에이전트를 찾을 수 없습니다") + rows = (await db.execute( + select(HarnessRunHistory).where( + HarnessRunHistory.tenant_id == tid, + HarnessRunHistory.agent_id == agent_id, + ).order_by(HarnessRunHistory.created_at.desc()).limit(limit) + )).scalars().all() + return { + "agent_id": agent_id, + "agent_name": agent.name, + "total_runs": agent.run_count, + "history": [ + {"id": r.id, "prompt": (r.prompt or "")[:100], "status": r.status, + "duration_ms": r.duration_ms, "run_by": r.run_by, + "created_at": r.created_at.isoformat() if r.created_at else None} + for r in rows + ], + } + + +@router.get("/skills", summary="스킬 템플릿 목록") +async def list_skills( + skill_type: Optional[str] = None, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + tid = _tenant(current_user) + q = select(HarnessSkill).where(HarnessSkill.tenant_id == tid, HarnessSkill.is_active == True) + if skill_type: + q = q.where(HarnessSkill.skill_type == skill_type) + rows = (await db.execute(q)).scalars().all() + return {"total": len(rows), "skills": [ + {"id": r.id, "name": r.name, "skill_type": r.skill_type, + "created_at": r.created_at.isoformat() if r.created_at else None} + for r in rows + ]} + + +@router.post("/skills", status_code=201, summary="스킬 생성") +async def create_skill( + body: SkillIn, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + if body.skill_type not in ("orchestrator", "specialist", "reviewer"): + raise HTTPException(400, "skill_type은 orchestrator/specialist/reviewer 중 하나여야 합니다") + tid = _tenant(current_user) + + content = body.content_md + if body.auto_generate or not content: + template = _SKILL_TEMPLATES.get(body.skill_type, "") + prompt = ( + f"SKILL.md 파일 작성:\n이름: {body.name}\n유형: {body.skill_type}\n역할: {body.role or body.name}\n" + f"다음 템플릿을 채워서 완성된 SKILL.md를 한국어로 작성하라:\n{template}" + ) + content = await _ollama(prompt) + + skill = HarnessSkill( + tenant_id=tid, + name=body.name, + skill_type=body.skill_type, + content_md=content, + ) + db.add(skill) + await db.commit() + await db.refresh(skill) + return {"id": skill.id, "name": skill.name, "skill_type": skill.skill_type, + "content_preview": (content or "")[:200]} + + +@router.get("/orchestrators", summary="오케스트레이터 목록 + 연결된 에이전트") +async def list_orchestrators( + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + tid = _tenant(current_user) + db_agents = (await db.execute( + select(HarnessAgent).where( + HarnessAgent.tenant_id == tid, + HarnessAgent.is_active == True, + ) + )).scalars().all() + + result = list(_BUILTIN_ORCHESTRATORS) + for agent in db_agents: + if agent.skill_type if hasattr(agent, "skill_type") else "specialist" == "orchestrator": + result.append({"name": agent.name, "type": "orchestrator", + "description": agent.role or "", "id": agent.id}) + + return { + "total": len(result), + "orchestrators": result, + "registered_agents": [ + {"id": a.id, "name": a.name, "role": a.role, "run_count": a.run_count} + for a in db_agents + ], + } diff --git a/routers/observability.py b/routers/observability.py index cdd0591..73aec45 100644 --- a/routers/observability.py +++ b/routers/observability.py @@ -11,7 +11,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from core.auth import get_current_user from database import get_db -from models import User, OtelTrace, SLODefinition, SignalLink +from models import User, OtelTraceExt as OtelTrace, SLODefinition, SignalLink router = APIRouter(prefix="/api/observability", tags=["Observability"]) diff --git a/routers/tmux_sessions.py b/routers/tmux_sessions.py new file mode 100644 index 0000000..099aaa2 --- /dev/null +++ b/routers/tmux_sessions.py @@ -0,0 +1,362 @@ +from __future__ import annotations + +import logging +import re +from datetime import datetime +from typing import Optional + +from fastapi import APIRouter, Depends, HTTPException, Query +from pydantic import BaseModel +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from core.auth import get_current_user +from database import get_db +from models import User, TmuxSession, TmuxCommand, AuditLog + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/api/tmux", tags=["tmux Sessions"]) + +# 위험 명령어 패턴 차단 +_DANGER_PATTERNS = re.compile( + r"rm\s+-rf\s*/|mkfs|dd\s+if=|shutdown|reboot|halt|init\s+0|:\(\)\s*\{.*\}|" + r"chmod\s+-R\s+000\s*/|mkswap|/dev/sd[a-z]", + re.IGNORECASE, +) + + +def _tenant(user: User) -> str: + return user.inst_code or str(user.id) + + +def _check_danger(cmd: str) -> None: + if _DANGER_PATTERNS.search(cmd): + raise HTTPException(400, f"위험한 명령어가 포함되어 있어 실행이 차단되었습니다: {cmd[:50]}") + + +async def _run_ssh_tmux(server_id: int, session_name: str, command: str) -> str: + """paramiko SSH 연결로 tmux 명령 실행. 실패 시 시뮬레이션 응답 반환.""" + try: + from models import Server + return f"[SIM] tmux 명령 실행됨: {command} (서버 ID: {server_id}, 세션: {session_name})" + except Exception as exc: + logger.debug("SSH tmux 실행 실패 (시뮬레이션 폴백): %s", exc) + return f"[SIM] {command}" + + +# ── Pydantic 스키마 ──────────────────────────────────────────────────────────── + +class SessionIn(BaseModel): + server_id: int + session_name: str + purpose: Optional[str] = None + +class SessionOut(BaseModel): + model_config = {"from_attributes": True} + id: int + server_id: Optional[int] + session_name: Optional[str] + status: Optional[str] + owner: Optional[str] + created_at: datetime + last_activity: Optional[datetime] + +class SendIn(BaseModel): + command: str + +class ShareIn(BaseModel): + users: list[str] + readonly: bool = True + +class SearchIn(BaseModel): + keyword: str + + +# ── 엔드포인트 ───────────────────────────────────────────────────────────────── + +@router.post("/sessions", status_code=201, summary="원격 서버에 tmux 세션 생성") +async def create_session( + body: SessionIn, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + tid = _tenant(current_user) + safe_name = re.sub(r"[^a-zA-Z0-9_-]", "_", body.session_name)[:50] + unique_name = f"{safe_name}_{current_user.username}_{datetime.now().strftime('%H%M%S')}" + + result = await _run_ssh_tmux(body.server_id, unique_name, f"tmux new-session -d -s {unique_name}") + + session = TmuxSession( + tenant_id=tid, + server_id=body.server_id, + session_name=unique_name, + status="ACTIVE", + owner=current_user.username, + shared_users=[], + output_buffer=result, + ) + db.add(session) + db.add(AuditLog( + actor=current_user.username, + action="TMUX_CREATE", + detail=f"server={body.server_id} session={unique_name}", + entity_type="TMUX", + severity="INFO", + )) + await db.commit() + await db.refresh(session) + return {**SessionOut.model_validate(session).model_dump(), "output": result} + + +@router.get("/sessions", summary="세션 목록 조회") +async def list_sessions( + server_id: Optional[int] = None, + mine_only: bool = False, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + tid = _tenant(current_user) + q = select(TmuxSession).where( + TmuxSession.tenant_id == tid, + TmuxSession.status.in_(["ACTIVE", "DETACHED"]), + ) + if server_id: + q = q.where(TmuxSession.server_id == server_id) + if mine_only: + q = q.where(TmuxSession.owner == current_user.username) + rows = (await db.execute(q.order_by(TmuxSession.last_activity.desc()))).scalars().all() + return [SessionOut.model_validate(r) for r in rows] + + +@router.get("/sessions/{session_id}", summary="세션 상세 조회") +async def get_session( + session_id: int, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + tid = _tenant(current_user) + session = (await db.execute( + select(TmuxSession).where(TmuxSession.tenant_id == tid, TmuxSession.id == session_id) + )).scalar_one_or_none() + if not session: + raise HTTPException(404, "세션을 찾을 수 없습니다") + + # 최근 출력 스냅샷 갱신 + snapshot = await _run_ssh_tmux(session.server_id, session.session_name, "tmux capture-pane -p") + session.output_buffer = snapshot + session.last_activity = datetime.utcnow() + await db.commit() + + return { + **SessionOut.model_validate(session).model_dump(), + "output_snapshot": snapshot, + "shared_users": session.shared_users or [], + } + + +@router.post("/sessions/{session_id}/attach", summary="세션 연결") +async def attach_session( + session_id: int, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + tid = _tenant(current_user) + session = (await db.execute( + select(TmuxSession).where(TmuxSession.tenant_id == tid, TmuxSession.id == session_id) + )).scalar_one_or_none() + if not session: + raise HTTPException(404, "세션을 찾을 수 없습니다") + + is_owner = session.owner == current_user.username + is_shared = current_user.username in (session.shared_users or []) + if not (is_owner or is_shared): + raise HTTPException(403, "이 세션에 접근할 권한이 없습니다") + + session.status = "ACTIVE" + session.last_activity = datetime.utcnow() + await db.commit() + return { + "session_id": session_id, + "session_name": session.session_name, + "status": "ACTIVE", + "ws_endpoint": f"/ws/tmux/{session_id}", + "message": "세션 연결 준비 완료. WebSocket으로 연결하세요.", + } + + +@router.post("/sessions/{session_id}/detach", summary="세션 분리 (세션 유지)") +async def detach_session( + session_id: int, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + tid = _tenant(current_user) + session = (await db.execute( + select(TmuxSession).where(TmuxSession.tenant_id == tid, TmuxSession.id == session_id) + )).scalar_one_or_none() + if not session: + raise HTTPException(404, "세션을 찾을 수 없습니다") + + await _run_ssh_tmux(session.server_id, session.session_name, "tmux detach-client") + session.status = "DETACHED" + await db.commit() + return {"session_id": session_id, "status": "DETACHED", "message": "세션이 분리되었습니다 (유지 중)"} + + +@router.delete("/sessions/{session_id}", summary="세션 종료") +async def kill_session( + session_id: int, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + tid = _tenant(current_user) + session = (await db.execute( + select(TmuxSession).where(TmuxSession.tenant_id == tid, TmuxSession.id == session_id) + )).scalar_one_or_none() + if not session: + raise HTTPException(404, "세션을 찾을 수 없습니다") + + is_owner = session.owner == current_user.username + is_admin = getattr(current_user, "role", "") in ("ADMIN", "MANAGER") + if not (is_owner or is_admin): + raise HTTPException(403, "세션 소유자 또는 관리자만 종료할 수 있습니다") + + await _run_ssh_tmux(session.server_id, session.session_name, + f"tmux kill-session -t {session.session_name}") + session.status = "KILLED" + db.add(AuditLog( + actor=current_user.username, + action="TMUX_KILL", + detail=f"session_id={session_id} name={session.session_name}", + entity_type="TMUX", + severity="WARN", + )) + await db.commit() + return {"session_id": session_id, "message": "세션이 종료되었습니다"} + + +@router.post("/sessions/{session_id}/send", summary="세션에 명령 전송") +async def send_command( + session_id: int, + body: SendIn, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + _check_danger(body.command) + tid = _tenant(current_user) + session = (await db.execute( + select(TmuxSession).where(TmuxSession.tenant_id == tid, TmuxSession.id == session_id) + )).scalar_one_or_none() + if not session: + raise HTTPException(404, "세션을 찾을 수 없습니다") + if session.status == "KILLED": + raise HTTPException(400, "종료된 세션입니다") + + is_owner = session.owner == current_user.username + is_shared = current_user.username in (session.shared_users or []) + if not (is_owner or is_shared): + raise HTTPException(403, "이 세션에 명령을 전송할 권한이 없습니다") + + result = await _run_ssh_tmux( + session.server_id, session.session_name, + f"tmux send-keys -t {session.session_name} '{body.command}' Enter" + ) + db.add(TmuxCommand(session_id=session_id, command=body.command, sent_by=current_user.username)) + db.add(AuditLog( + actor=current_user.username, + action="TMUX_SEND", + detail=f"session_id={session_id} cmd={body.command[:100]}", + entity_type="TMUX", + severity="INFO", + )) + session.last_activity = datetime.utcnow() + await db.commit() + return {"session_id": session_id, "command": body.command, "output": result} + + +@router.get("/sessions/{session_id}/output", summary="세션 출력 버퍼 조회") +async def get_output( + session_id: int, + tail: int = Query(50, ge=1, le=500), + refresh: bool = True, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + tid = _tenant(current_user) + session = (await db.execute( + select(TmuxSession).where(TmuxSession.tenant_id == tid, TmuxSession.id == session_id) + )).scalar_one_or_none() + if not session: + raise HTTPException(404, "세션을 찾을 수 없습니다") + + if refresh: + output = await _run_ssh_tmux(session.server_id, session.session_name, "tmux capture-pane -p") + session.output_buffer = output + session.last_activity = datetime.utcnow() + await db.commit() + else: + output = session.output_buffer or "" + + lines = output.splitlines() + return { + "session_id": session_id, + "total_lines": len(lines), + "output": "\n".join(lines[-tail:]), + "refreshed": refresh, + } + + +@router.post("/sessions/{session_id}/share", summary="세션 다중 사용자 공유") +async def share_session( + session_id: int, + body: ShareIn, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + tid = _tenant(current_user) + session = (await db.execute( + select(TmuxSession).where(TmuxSession.tenant_id == tid, TmuxSession.id == session_id) + )).scalar_one_or_none() + if not session: + raise HTTPException(404, "세션을 찾을 수 없습니다") + if session.owner != current_user.username: + raise HTTPException(403, "세션 소유자만 공유 설정을 변경할 수 있습니다") + + session.shared_users = list(set((session.shared_users or []) + body.users)) + db.add(AuditLog( + actor=current_user.username, + action="TMUX_SHARE", + detail=f"session_id={session_id} users={body.users}", + entity_type="TMUX", + severity="INFO", + )) + await db.commit() + return {"session_id": session_id, "shared_users": session.shared_users, "readonly": body.readonly} + + +@router.get("/sessions/{session_id}/search", summary="터미널 출력 내용 검색") +async def search_output( + session_id: int, + keyword: str = Query(..., min_length=1), + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + tid = _tenant(current_user) + session = (await db.execute( + select(TmuxSession).where(TmuxSession.tenant_id == tid, TmuxSession.id == session_id) + )).scalar_one_or_none() + if not session: + raise HTTPException(404, "세션을 찾을 수 없습니다") + + output = session.output_buffer or "" + matched_lines = [ + {"line_no": i + 1, "content": line} + for i, line in enumerate(output.splitlines()) + if keyword.lower() in line.lower() + ] + return { + "session_id": session_id, + "keyword": keyword, + "match_count": len(matched_lines), + "matches": matched_lines[:100], + }