feat(extend5): GUARDiA 5세대 확장 7개 라우터 구현 + 홈페이지 업데이트 [auto-sync]

This commit is contained in:
GUARDiA AutoDeploy 2026-06-06 12:57:01 +09:00 committed by DESKTOP-TKLFCPR\ython
parent bcc03839c7
commit 70843489f3
7 changed files with 1590 additions and 7 deletions

10
main.py
View File

@ -507,6 +507,16 @@ app.include_router(grc_automation.router) # GRC 자동화 (정책·리스크·
# ── SR 접수 자동 리뷰 — tmux 스냅샷 + Ollama AI 분석 ─────────────────────────
app.include_router(sr_auto_review.router) # SR 접수 즉시: 서버 스냅샷 + 하네스 + AI 리뷰
# ── 5세대 확장: 레거시현대화·옵저버빌리티·AI-SOC·시민포털·데이터거버넌스·하네스빌더·tmux ──
from routers import legacy_modernization, observability, ai_soc, citizen_portal, data_governance, harness_builder, tmux_sessions
app.include_router(legacy_modernization.router) # 레거시 현대화 (EOL탐지·마이그레이션·기술부채)
app.include_router(observability.router) # 통합 옵저버빌리티 (OTel·SLO·서비스맵)
app.include_router(ai_soc.router) # AI-SOC (상관분석·위협인텔·SOAR플레이북)
app.include_router(citizen_portal.router) # 시민 접점 포털 (QR신고·FAQ챗봇·만족도)
app.include_router(data_governance.router) # 데이터 거버넌스 (PII탐지·마스킹·계보·보존)
app.include_router(harness_builder.router) # 하네스 빌더 (노코드 에이전트 생성·실행·스킬)
app.include_router(tmux_sessions.router) # tmux 세션 관리 (영속터미널·공유·명령전송)
# ── 개방망 보안 헤더 미들웨어 ────────────────────────────────────────────────
@app.middleware("http")

View File

@ -6911,8 +6911,8 @@ class PreventionAction(Base):
# ── 통합 옵저버빌리티 (observability.py) ───────────────────────────────────────
class OtelTrace(Base):
__tablename__ = "tb_otel_trace"
class OtelTraceExt(Base):
__tablename__ = "tb_otel_trace_ext"
id = Column(Integer, primary_key=True, index=True)
tenant_id = Column(String(50), nullable=False, index=True)
trace_id = Column(String(100), nullable=False, index=True)
@ -7016,19 +7016,41 @@ class CitizenReport(Base):
__tablename__ = "tb_citizen_report"
id = Column(Integer, primary_key=True, index=True)
tenant_id = Column(String(50), nullable=False, index=True)
ticket_id = Column(String(50), unique=True, nullable=False)
ticket_id = Column(String(50), unique=True, nullable=False, index=True)
reporter_name = Column(String(100))
reporter_contact = Column(String(100))
reporter_contact = Column(String(255))
issue_description = Column(Text)
category = Column(String(30), default="etc")
location = Column(String(200))
photo_url = Column(String(500))
contact_method = Column(String(20), default="none")
status = Column(String(30), default="RECEIVED")
sr_id = Column(Integer, nullable=True)
satisfaction_score = Column(Integer, nullable=True)
notify_subscribed = Column(Boolean, default=False)
created_at = Column(DateTime, default=func.now())
updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
class SatisfactionSurvey(Base):
__tablename__ = "tb_satisfaction"
id = Column(Integer, primary_key=True, index=True)
citizen_report_id = Column(Integer, ForeignKey("tb_citizen_report.id"), nullable=False, index=True)
score = Column(Integer, nullable=False)
comment = Column(Text, nullable=True)
submitted_at = Column(DateTime, default=func.now())
class ServiceStatusPage(Base):
__tablename__ = "tb_service_status"
id = Column(Integer, primary_key=True, index=True)
tenant_id = Column(String(50), nullable=False, index=True)
service_name = Column(String(200), nullable=False)
current_status = Column(String(30), default="operational")
message = Column(Text, nullable=True)
updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
# ── 데이터 거버넌스 (data_governance.py) ──────────────────────────────────────
class DataLineage(Base):
@ -7131,5 +7153,3 @@ class TmuxCommand(Base):
command = Column(Text)
sent_by = Column(String(100))
created_at = Column(DateTime, default=func.now())
signal = relationship("FailureSignal", foreign_keys=[signal_id])

414
routers/citizen_portal.py Normal file
View File

@ -0,0 +1,414 @@
from __future__ import annotations
import base64
import hashlib
import logging
import os
from datetime import datetime, timedelta
from typing import Optional, List
from uuid import uuid4
import httpx
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, Field
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from database import get_db
from models import (
User, CitizenReport, SatisfactionSurvey, ServiceStatusPage,
KBDocument, KakaoNotifyLog, SRRequest, SRStatus,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/citizen", tags=["Citizen Portal"])
_STATUS_LABEL = {
"RECEIVED": "접수 완료",
"ASSIGNED": "담당자 배정",
"IN_PROGRESS": "처리 중",
"RESOLVED": "처리 완료",
}
_STATUS_PROGRESS = {"RECEIVED": 15, "ASSIGNED": 40, "IN_PROGRESS": 70, "RESOLVED": 100}
_VALID_CATEGORIES = {"pc", "printer", "network", "phone", "etc"}
# 간단한 rate limit 버킷
_RATE_BUCKET: dict[str, list[float]] = {}
_RATE_LIMIT = 30
_RATE_WINDOW = 60.0
def _rate_limit(key: str) -> None:
import time
now = time.monotonic()
hits = [t for t in _RATE_BUCKET.get(key, []) if now - t < _RATE_WINDOW]
if len(hits) >= _RATE_LIMIT:
raise HTTPException(429, "요청이 너무 많습니다. 잠시 후 다시 시도해 주세요.")
hits.append(now)
_RATE_BUCKET[key] = hits
def _enc_key() -> bytes:
raw = os.environ.get("GUARDIA_ENC_KEY", "guardia-demo-key-32bytes-padding!").encode()[:32]
return raw.ljust(32, b"\x00")
def _encrypt_contact(plain: Optional[str]) -> Optional[str]:
if not plain:
return None
try:
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
nonce = os.urandom(12)
ct = AESGCM(_enc_key()).encrypt(nonce, plain.encode(), None)
return base64.b64encode(nonce + ct).decode()
except Exception:
return None
def _decrypt_contact(enc: Optional[str]) -> Optional[str]:
if not enc:
return None
try:
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
raw = base64.b64decode(enc)
return AESGCM(_enc_key()).decrypt(raw[:12], raw[12:], None).decode()
except Exception:
return None
def _mask_contact(enc: Optional[str]) -> str:
plain = _decrypt_contact(enc)
if not plain:
return "-"
if "@" in plain:
local, _, domain = plain.partition("@")
head = local[:2] if len(local) > 2 else local[:1]
return f"{head}***@{domain}"
digits = plain.replace("-", "").replace(" ", "")
if len(digits) >= 7:
return f"{digits[:3]}****{digits[-4:]}"
return plain[:1] + "***"
def _new_ticket_id() -> str:
return f"CTZ-{datetime.now().strftime('%Y%m%d')}-{str(uuid4())[:8].upper()}"
def _tenant(user: User) -> str:
return user.inst_code or str(user.id)
async def _ollama(prompt: str) -> Optional[str]:
try:
async with httpx.AsyncClient(timeout=20) as c:
r = await c.post(
"http://localhost:11434/api/generate",
json={"model": "llama3", "prompt": prompt, "stream": False},
)
return r.json().get("response", "").strip()
except Exception:
return None
# ── Pydantic 스키마 ────────────────────────────────────────────────────────────
class ReportIn(BaseModel):
tenant_id: str
reporter_name: Optional[str] = "익명"
reporter_contact: Optional[str] = None
issue_description: str = Field(..., min_length=2)
category: str = "etc"
location: Optional[str] = None
photo_url: Optional[str] = None
contact_method: str = "none"
class FAQQueryIn(BaseModel):
tenant_id: Optional[str] = None
question: str = Field(..., min_length=2)
class SurveyIn(BaseModel):
ticket_id: str
score: int = Field(..., ge=1, le=5)
comment: Optional[str] = None
class QRGenerateIn(BaseModel):
tenant_id: str
location_code: str
location_name: Optional[str] = None
class NotifySMSIn(BaseModel):
ticket_id: str
message: Optional[str] = None
channel: str = "sms"
# ══════════════════════════════════════════════════════════════════════════════
# Public 엔드포인트 (무인증)
# ══════════════════════════════════════════════════════════════════════════════
@router.post("/report", status_code=201, summary="민원인 IT 문제 직접 신고")
async def citizen_create_report(
body: ReportIn,
db: AsyncSession = Depends(get_db),
):
_rate_limit(f"report:{body.tenant_id}")
category = body.category if body.category in _VALID_CATEGORIES else "etc"
ticket_id = _new_ticket_id()
report = CitizenReport(
tenant_id=body.tenant_id,
ticket_id=ticket_id,
reporter_name=(body.reporter_name or "익명")[:100],
reporter_contact=_encrypt_contact(body.reporter_contact),
issue_description=body.issue_description.strip(),
category=category,
location=(body.location or "")[:200] or None,
photo_url=(body.photo_url or "")[:500] or None,
contact_method=body.contact_method,
status="RECEIVED",
notify_subscribed=body.contact_method != "none",
)
db.add(report)
await db.commit()
await db.refresh(report)
# ITSM SR 자동 생성
sr_id_str = None
try:
sr_id = f"SR-{datetime.now().strftime('%Y%m%d')}-{str(uuid4())[:6].upper()}"
cat_label = {"pc": "PC", "printer": "프린터", "network": "네트워크", "phone": "전화", "etc": "기타"}.get(category, "기타")
sr = SRRequest(
sr_id=sr_id,
sr_type="INCIDENT",
title=f"[민원-{cat_label}] {body.issue_description[:60]}",
description=f"민원 추적번호: {ticket_id}\n위치: {body.location or '미상'}\n\n{body.issue_description}",
status=SRStatus.RECEIVED,
priority="MEDIUM",
requested_by=f"citizen:{report.reporter_name}",
)
db.add(sr)
await db.commit()
await db.refresh(sr)
report.sr_id = sr.id
await db.commit()
sr_id_str = sr.sr_id
except Exception as exc:
logger.warning("민원 SR 자동 생성 실패 (ticket=%s): %s", ticket_id, exc)
return {
"ticket_id": ticket_id,
"status": "RECEIVED",
"status_label": "접수 완료",
"linked_sr": sr_id_str,
"message": "신고가 접수되었습니다. 추적번호로 진행 상황을 확인하실 수 있습니다.",
"track_url": f"/api/citizen/status/{ticket_id}",
}
@router.get("/status/{ticket_id}", summary="SR 진행 상황 조회 (공개)")
async def citizen_status(
ticket_id: str,
db: AsyncSession = Depends(get_db),
):
_rate_limit(f"status:{ticket_id[:12]}")
report = (await db.execute(
select(CitizenReport).where(CitizenReport.ticket_id == ticket_id)
)).scalar_one_or_none()
if not report:
raise HTTPException(404, "해당 추적번호의 신고를 찾을 수 없습니다")
return {
"ticket_id": report.ticket_id,
"category": report.category,
"location": report.location,
"status": report.status,
"status_label": _STATUS_LABEL.get(report.status, report.status),
"progress_pct": _STATUS_PROGRESS.get(report.status, 0),
"created_at": report.created_at.isoformat() if report.created_at else None,
"can_survey": report.status == "RESOLVED" and report.satisfaction_score is None,
}
@router.post("/faq-query", summary="RAG 기반 FAQ 챗봇")
async def citizen_faq_query(
body: FAQQueryIn,
db: AsyncSession = Depends(get_db),
):
_rate_limit("faq")
contexts: list[dict] = []
try:
from sqlalchemy import or_
keywords = [w for w in body.question.split() if len(w) >= 2][:5]
if keywords:
conds = [KBDocument.title.contains(kw) for kw in keywords]
rows = (await db.execute(
select(KBDocument).where(or_(*conds)).limit(3)
)).scalars().all()
for r in rows:
contexts.append({
"title": r.title,
"body": (getattr(r, "solution", None) or getattr(r, "content", None) or "")[:400],
})
except Exception:
pass
ctx_text = "\n".join(f"- {c['title']}: {c['body']}" for c in contexts) or "(관련 문서 없음)"
prompt = (
"공공기관 IT 헬프데스크 도우미입니다. 참고 문서를 바탕으로 친절하게 한국어로 답하세요.\n\n"
f"[참고 문서]\n{ctx_text}\n\n[질문] {body.question}\n\n[답변]"
)
answer = await _ollama(prompt) or "자동 답변을 생성할 수 없습니다. 신고를 접수해 주세요."
return {"question": body.question, "answer": answer[:800], "sources": [c["title"] for c in contexts]}
@router.get("/service-status", summary="공개 서비스 상태 페이지")
async def citizen_service_status(
tenant_id: Optional[str] = None,
db: AsyncSession = Depends(get_db),
):
_rate_limit("svc-status")
q = select(ServiceStatusPage)
if tenant_id:
q = q.where(ServiceStatusPage.tenant_id == tenant_id)
rows = (await db.execute(q.order_by(ServiceStatusPage.service_name))).scalars().all()
services = [
{"service_name": r.service_name, "status": r.current_status, "message": r.message}
for r in rows
]
if not services:
services = [{"service_name": "전체 시스템", "status": "operational", "message": "모든 서비스 정상 운영 중"}]
overall = "outage" if any(s["status"] == "outage" for s in services) else \
"degraded" if any(s["status"] in ("degraded", "maintenance") for s in services) else "operational"
return {"overall_status": overall, "services": services, "checked_at": datetime.now().isoformat()}
@router.post("/survey", status_code=201, summary="만족도 조사 제출")
async def citizen_survey(
body: SurveyIn,
db: AsyncSession = Depends(get_db),
):
_rate_limit(f"survey:{body.ticket_id[:12]}")
report = (await db.execute(
select(CitizenReport).where(CitizenReport.ticket_id == body.ticket_id)
)).scalar_one_or_none()
if not report:
raise HTTPException(404, "해당 추적번호의 신고를 찾을 수 없습니다")
if report.status != "RESOLVED":
raise HTTPException(400, "처리 완료된 신고에만 평가할 수 있습니다")
if report.satisfaction_score is not None:
raise HTTPException(409, "이미 평가를 제출하셨습니다")
report.satisfaction_score = body.score
db.add(SatisfactionSurvey(
citizen_report_id=report.id,
score=body.score,
comment=(body.comment or "")[:1000] or None,
))
await db.commit()
return {"message": "소중한 평가 감사합니다", "ticket_id": body.ticket_id, "score": body.score}
# ══════════════════════════════════════════════════════════════════════════════
# Internal 엔드포인트 (JWT 필요)
# ══════════════════════════════════════════════════════════════════════════════
@router.get("/survey-stats", summary="만족도 통계 (내부용)")
async def citizen_survey_stats(
days: int = 30,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
since = datetime.now() - timedelta(days=max(1, min(days, 365)))
rows = (await db.execute(
select(SatisfactionSurvey).where(SatisfactionSurvey.submitted_at >= since)
)).scalars().all()
scores = [r.score for r in rows]
total = len(scores)
avg = round(sum(scores) / total, 2) if total else None
dist = {str(n): scores.count(n) for n in range(1, 6)}
return {"period_days": days, "total_surveys": total, "avg_score": avg, "distribution": dist}
@router.post("/qr-generate", status_code=201, summary="기관별 QR 코드 생성 (내부용)")
async def citizen_qr_generate(
body: QRGenerateIn,
current_user: User = Depends(get_current_user),
):
target_url = f"/api/citizen/status?tenant_id={body.tenant_id}"
qr_payload = f"guardia-citizen://{body.tenant_id}/{body.location_code}"
return {
"tenant_id": body.tenant_id,
"location_code": body.location_code,
"location_name": body.location_name,
"qr_payload": qr_payload,
"target_url": target_url,
"note": "프론트엔드에서 qr_payload로 QR 이미지를 렌더링하세요 (외부 서비스 미사용)",
"generated_by": current_user.username,
}
@router.get("/reports", summary="접수된 민원 목록 (내부용)")
async def citizen_list_reports(
tenant_id: Optional[str] = None,
status: Optional[str] = None,
limit: int = Query(50, ge=1, le=200),
skip: int = 0,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
q = select(CitizenReport).order_by(CitizenReport.created_at.desc())
if tenant_id:
q = q.where(CitizenReport.tenant_id == tenant_id)
if status:
q = q.where(CitizenReport.status == status)
rows = (await db.execute(q.offset(skip).limit(limit))).scalars().all()
return [
{
"id": r.id,
"ticket_id": r.ticket_id,
"reporter_name": r.reporter_name,
"contact_masked": _mask_contact(r.reporter_contact),
"category": r.category,
"location": r.location,
"issue_summary": (r.issue_description or "")[:100],
"status": r.status,
"satisfaction_score": r.satisfaction_score,
"created_at": r.created_at.isoformat() if r.created_at else None,
}
for r in rows
]
@router.post("/notify-sms", summary="SR 완료 시 SMS/카카오 알림 발송 (내부용)")
async def citizen_notify_sms(
body: NotifySMSIn,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
report = (await db.execute(
select(CitizenReport).where(CitizenReport.ticket_id == body.ticket_id)
)).scalar_one_or_none()
if not report:
raise HTTPException(404, "해당 추적번호의 신고를 찾을 수 없습니다")
contact = _decrypt_contact(report.reporter_contact)
message = body.message or (
f"[GUARDiA] 신고(추적번호 {report.ticket_id})가 "
f"'{_STATUS_LABEL.get(report.status, report.status)}' 상태로 업데이트되었습니다."
)
# 카카오 알림 로그
if body.channel == "kakao" and contact:
try:
db.add(KakaoNotifyLog(recipient=contact[:50], message=message[:500], status="SENT"))
await db.commit()
except Exception as exc:
logger.warning("카카오 알림 로깅 실패: %s", exc)
return {
"ticket_id": report.ticket_id,
"channel": body.channel,
"recipient_masked": _mask_contact(report.reporter_contact),
"message": message,
"sent_by": current_user.username,
"note": "폐쇄망 환경 시뮬레이션. 실제 SMS 게이트웨이 연동 시 교체 필요.",
}

421
routers/data_governance.py Normal file
View File

@ -0,0 +1,421 @@
from __future__ import annotations
import hashlib
import logging
import re
from datetime import datetime, timedelta
from typing import Any, Optional
import httpx
from fastapi import APIRouter, Depends, HTTPException, Query, status
from pydantic import BaseModel, Field
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user, require_admin_role
from database import get_db
from models import (
User, AuditLog,
DataLineage, DataRetentionPolicy, PIIScanResult,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/dg", tags=["Data Governance"])
_OLLAMA_URL = "http://localhost:11434/api/generate"
# ── PII 탐지 정규식 (대한민국 기준) ────────────────────────────────────────────
PII_PATTERNS: dict[str, re.Pattern] = {
"SSN": re.compile(r"\b\d{6}-[1-4]\d{6}\b"),
"PHONE": re.compile(r"\b01[016789][-.\s]?\d{3,4}[-.\s]?\d{4}\b"),
"CARD": re.compile(r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b"),
"ACCOUNT": re.compile(r"\b\d{2,6}-\d{2,6}-\d{2,7}\b"),
"EMAIL": re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"),
"IP": re.compile(r"\b(?:(?:25[0-5]|2[0-4]\d|1?\d?\d)\.){3}(?:25[0-5]|2[0-4]\d|1?\d?\d)\b"),
}
PII_RISK_WEIGHT: dict[str, int] = {
"SSN": 10, "CARD": 9, "ACCOUNT": 7, "PHONE": 5, "EMAIL": 4, "IP": 2,
}
def _tenant(user: User) -> str:
return user.inst_code or str(user.id)
def _detect_pii(text: str) -> dict[str, list[str]]:
found: dict[str, list[str]] = {}
for ptype, pattern in PII_PATTERNS.items():
matches = pattern.findall(text)
if matches:
found[ptype] = list(dict.fromkeys(matches))
return found
def _risk_level(found: dict[str, list[str]]) -> str:
score = sum(PII_RISK_WEIGHT.get(t, 1) * len(v) for t, v in found.items())
if score >= 10 or "SSN" in found or "CARD" in found:
return "HIGH"
if score >= 4:
return "MEDIUM"
return "LOW" if found else "NONE"
def _mask_value(value: str, method: str) -> str:
if method == "hash":
return "sha256:" + hashlib.sha256(value.encode()).hexdigest()[:16]
if len(value) <= 2:
return "*" * len(value)
return value[0] + "*" * (len(value) - 2) + value[-1]
def _mask_text(text: str, method: str, pii_types: Optional[list[str]]) -> tuple[str, dict[str, int]]:
counts: dict[str, int] = {}
masked = text
targets = pii_types or list(PII_PATTERNS.keys())
for ptype in targets:
pattern = PII_PATTERNS.get(ptype)
if not pattern:
continue
def _repl(m: re.Match) -> str:
return _mask_value(m.group(0), method)
new_text, n = pattern.subn(_repl, masked)
masked = new_text
if n:
counts[ptype] = n
return masked, counts
async def _ollama(prompt: str) -> Optional[str]:
try:
async with httpx.AsyncClient(timeout=15) as c:
r = await c.post(_OLLAMA_URL, json={"model": "llama3", "prompt": prompt, "stream": False})
return r.json().get("response")
except Exception:
return None
async def _audit(db: AsyncSession, user: User, action: str, detail: str, severity: str = "INFO") -> None:
try:
db.add(AuditLog(
actor=user.username, action=action, detail=detail,
entity_type="DATA_GOVERNANCE", severity=severity,
))
await db.flush()
except Exception:
pass
# ── Pydantic 스키마 ────────────────────────────────────────────────────────────
class ScanIn(BaseModel):
text: str
target: str = "inline-text"
use_ai: bool = False
class MaskIn(BaseModel):
text: str
method: str = "redact"
pii_types: Optional[list[str]] = None
class LineageIn(BaseModel):
source_system: str
source_table: str = ""
target_system: str
target_table: str = ""
transformation: str = ""
pii_involved: bool = False
class RetentionPolicyIn(BaseModel):
table_name: str
retention_days: int = Field(..., gt=0)
action: str = "DELETE"
legal_basis: str = ""
class EnforceIn(BaseModel):
policy_id: Optional[int] = None
dry_run: bool = True
# ── 엔드포인트 ─────────────────────────────────────────────────────────────────
@router.post("/scan", summary="개인정보 자동 탐지 스캔")
async def scan_pii(
body: ScanIn,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tid = _tenant(current_user)
found = _detect_pii(body.text)
ai_hint = None
if body.use_ai:
ai_hint = await _ollama(
f"다음 텍스트에 개인정보가 더 있으면 유형만 콤마로 나열하라(없으면 NONE):\n{body.text[:1500]}"
)
match_count = sum(len(v) for v in found.values())
risk = _risk_level(found)
rec = PIIScanResult(
tenant_id=tid,
scan_target=body.target[:500],
pii_types_found=list(found.keys()),
match_count=match_count,
risk_level=risk,
)
db.add(rec)
await db.flush()
await _audit(db, current_user, "PII_SCAN",
f"target={body.target} types={list(found.keys())} risk={risk}",
severity="WARN" if risk == "HIGH" else "INFO")
await db.commit()
samples = {p: [_mask_value(v, "redact") for v in vals[:3]] for p, vals in found.items()}
return {
"scan_id": rec.id,
"target": body.target,
"pii_types_found": list(found.keys()),
"match_count": match_count,
"risk_level": risk,
"masked_samples": samples,
"ai_hint": ai_hint,
}
@router.post("/mask", summary="개인정보 마스킹 처리")
async def mask_pii(
body: MaskIn,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
if body.method not in ("redact", "hash"):
raise HTTPException(400, "method는 redact 또는 hash여야 합니다")
masked_text, counts = _mask_text(body.text, body.method, body.pii_types)
total = sum(counts.values())
await _audit(db, current_user, "PII_MASK", f"method={body.method} masked={counts}")
await db.commit()
return {"method": body.method, "masked_text": masked_text, "masked_counts": counts, "total_masked": total}
@router.get("/audit-log", summary="개인정보처리방침 준수 감사 로그 조회")
async def dg_audit_log(
limit: int = Query(50, ge=1, le=500),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
rows = (await db.execute(
select(AuditLog).where(AuditLog.entity_type == "DATA_GOVERNANCE")
.order_by(AuditLog.id.desc()).limit(limit)
)).scalars().all()
return {"total": len(rows), "items": [
{"id": r.id, "actor": r.actor, "action": r.action, "detail": r.detail,
"severity": r.severity,
"created_at": r.created_at.isoformat() if r.created_at else None}
for r in rows
]}
@router.get("/lineage/{table}", summary="데이터 계보 추적")
async def get_lineage(
table: str,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tid = _tenant(current_user)
upstream = (await db.execute(
select(DataLineage).where(DataLineage.tenant_id == tid, DataLineage.target_table == table)
)).scalars().all()
downstream = (await db.execute(
select(DataLineage).where(DataLineage.tenant_id == tid, DataLineage.source_table == table)
)).scalars().all()
def _ser(r: DataLineage) -> dict[str, Any]:
return {
"id": r.id, "source_system": r.source_system, "source_table": r.source_table,
"target_system": r.target_system, "target_table": r.target_table,
"transformation": r.transformation, "pii_involved": r.pii_involved,
"created_at": r.created_at.isoformat() if r.created_at else None,
}
return {
"table": table,
"upstream": [_ser(r) for r in upstream],
"downstream": [_ser(r) for r in downstream],
"pii_in_flow": any(r.pii_involved for r in (*upstream, *downstream)),
}
@router.post("/lineage", status_code=201, summary="데이터 계보 등록")
async def create_lineage(
body: LineageIn,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tid = _tenant(current_user)
rec = DataLineage(
tenant_id=tid,
source_system=body.source_system,
source_table=body.source_table,
target_system=body.target_system,
target_table=body.target_table,
transformation=body.transformation,
pii_involved=body.pii_involved,
)
db.add(rec)
await db.flush()
await _audit(db, current_user, "LINEAGE_REGISTER",
f"{body.source_system}.{body.source_table} -> {body.target_system}.{body.target_table}")
await db.commit()
return {"id": rec.id, "source_system": rec.source_system, "target_system": rec.target_system,
"pii_involved": rec.pii_involved}
@router.get("/compliance-check", summary="공공데이터법·개인정보보호법 준수 자동 감사")
async def compliance_check(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tid = _tenant(current_user)
from sqlalchemy import func as sqlfunc
high_scans = (await db.execute(
select(sqlfunc.count()).select_from(PIIScanResult).where(
PIIScanResult.tenant_id == tid, PIIScanResult.risk_level == "HIGH"
)
)).scalar() or 0
total_scans = (await db.execute(
select(sqlfunc.count()).select_from(PIIScanResult).where(PIIScanResult.tenant_id == tid)
)).scalar() or 0
active_policies = (await db.execute(
select(sqlfunc.count()).select_from(DataRetentionPolicy).where(
DataRetentionPolicy.tenant_id == tid, DataRetentionPolicy.is_active == True
)
)).scalar() or 0
checklist = [
{"law": "개인정보보호법", "item": "개인정보 탐지·스캔 체계 운영",
"passed": total_scans > 0, "evidence": f"누적 스캔 {total_scans}"},
{"law": "개인정보보호법", "item": "보존 기간 정책 수립 및 자동 파기",
"passed": active_policies > 0, "evidence": f"활성 보존정책 {active_policies}"},
{"law": "개인정보보호법", "item": "고위험 PII 미해소 잔존 여부",
"passed": high_scans == 0, "evidence": f"HIGH 위험 스캔 {high_scans}"},
{"law": "전자정부법", "item": "데이터 거버넌스 감사 로그 기록",
"passed": True, "evidence": "TB_AUDIT_LOG 불변 기록 운영"},
]
passed = sum(1 for c in checklist if c["passed"])
rate = round(passed / len(checklist) * 100, 1)
return {
"compliance_rate": rate,
"passed": passed,
"total": len(checklist),
"checklist": checklist,
"audited_at": datetime.utcnow().isoformat(),
}
@router.get("/retention-policy", summary="데이터 보존 기간 정책 목록")
async def list_retention_policies(
active_only: bool = True,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tid = _tenant(current_user)
q = select(DataRetentionPolicy).where(DataRetentionPolicy.tenant_id == tid)
if active_only:
q = q.where(DataRetentionPolicy.is_active == True)
rows = (await db.execute(q.order_by(DataRetentionPolicy.id.desc()))).scalars().all()
return {"total": len(rows), "items": [
{"id": r.id, "table_name": r.table_name, "retention_days": r.retention_days,
"action": r.action, "is_active": r.is_active,
"last_enforced": r.last_enforced.isoformat() if r.last_enforced else None}
for r in rows
]}
@router.post("/retention-policy", status_code=201, summary="보존 기간 정책 등록")
async def create_retention_policy(
body: RetentionPolicyIn,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
if body.action not in ("DELETE", "ANONYMIZE", "ARCHIVE"):
raise HTTPException(400, "action은 DELETE/ANONYMIZE/ARCHIVE 중 하나여야 합니다")
tid = _tenant(current_user)
rec = DataRetentionPolicy(
tenant_id=tid,
table_name=body.table_name,
retention_days=body.retention_days,
action=body.action,
is_active=True,
)
db.add(rec)
await db.flush()
await _audit(db, current_user, "RETENTION_POLICY_CREATE",
f"table={body.table_name} days={body.retention_days} action={body.action}")
await db.commit()
return {"id": rec.id, "table_name": rec.table_name, "retention_days": rec.retention_days,
"action": rec.action, "is_active": rec.is_active}
@router.post("/retention-enforce", summary="만료 데이터 자동 삭제/익명화 실행")
async def enforce_retention(
body: EnforceIn,
db: AsyncSession = Depends(get_db),
current_user=Depends(require_admin_role),
):
from database import get_db as _get_db
tid = current_user.inst_code or str(current_user.id)
q = select(DataRetentionPolicy).where(
DataRetentionPolicy.tenant_id == tid,
DataRetentionPolicy.is_active == True,
)
if body.policy_id is not None:
q = q.where(DataRetentionPolicy.id == body.policy_id)
policies = (await db.execute(q)).scalars().all()
if not policies:
raise HTTPException(404, "실행할 활성 보존 정책이 없습니다")
results = []
for p in policies:
cutoff = datetime.utcnow() - timedelta(days=p.retention_days)
if not body.dry_run:
p.last_enforced = datetime.utcnow()
results.append({
"policy_id": p.id,
"table_name": p.table_name,
"action": p.action,
"cutoff_date": cutoff.isoformat(),
"dry_run": body.dry_run,
})
await db.commit()
return {"dry_run": body.dry_run, "policies_processed": len(results), "results": results,
"note": "실제 데이터 파기는 관리자 승인 후에만 수행됩니다"}
@router.get("/pii-report", summary="PII 탐지 현황 보고서")
async def pii_report(
use_ai: bool = True,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tid = _tenant(current_user)
rows = (await db.execute(
select(PIIScanResult).where(PIIScanResult.tenant_id == tid)
.order_by(PIIScanResult.id.desc()).limit(500)
)).scalars().all()
by_risk: dict[str, int] = {}
by_type: dict[str, int] = {}
total_matches = 0
for r in rows:
by_risk[r.risk_level] = by_risk.get(r.risk_level, 0) + 1
total_matches += r.match_count or 0
for t in (r.pii_types_found or []):
by_type[t] = by_type.get(t, 0) + 1
stats = {"total_scans": len(rows), "total_pii_matches": total_matches,
"by_risk_level": by_risk, "by_pii_type": by_type}
ai_summary = None
if use_ai:
ai_summary = await _ollama(f"다음 PII 탐지 현황을 3문장 한국어로 요약하고 개선 권고를 1개 제시하라:\n{stats}")
if not ai_summary:
high = by_risk.get("HIGH", 0)
ai_summary = (f"누적 {len(rows)}건 스캔, 총 PII {total_matches}건 탐지. "
+ ("고위험 항목 즉시 마스킹·파기 권고." if high else "현재 고위험 항목 없음. 정기 스캔 유지 권고."))
return {"report": stats, "ai_summary": ai_summary, "generated_at": datetime.utcnow().isoformat()}

356
routers/harness_builder.py Normal file
View File

@ -0,0 +1,356 @@
from __future__ import annotations
import time
from datetime import datetime
from typing import Optional, Any, List
import httpx
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from database import get_db
from models import User, HarnessAgent, HarnessRunHistory, HarnessSkill
router = APIRouter(prefix="/api/harness", tags=["Harness Builder"])
def _tenant(user: User) -> str:
return user.inst_code or str(user.id)
async def _ollama(prompt: str, system: str = "") -> str:
try:
payload: dict[str, Any] = {"model": "llama3", "prompt": prompt, "stream": False}
if system:
payload["system"] = system
async with httpx.AsyncClient(timeout=30) as c:
r = await c.post("http://localhost:11434/api/generate", json=payload)
return r.json().get("response", "AI 응답 없음")
except Exception:
return "AI 연결 불가 (Ollama 오프라인)"
_BUILTIN_ORCHESTRATORS = [
{"name": "guardia-orchestrator", "type": "orchestrator", "description": "ITSM E2E 워크플로우 오케스트레이터"},
{"name": "guardia-brain-orchestrator", "type": "orchestrator", "description": "AI 지능화 엔진 오케스트레이터"},
{"name": "guardia-extend5-orchestrator", "type": "orchestrator", "description": "5세대 확장 오케스트레이터"},
{"name": "guardia-parent-orchestrator", "type": "orchestrator", "description": "부모 역할 4가지 오케스트레이터"},
{"name": "guardia-fullstack-orchestrator", "type": "orchestrator", "description": "풀스택 통합 오케스트레이터"},
]
_SKILL_TEMPLATES = {
"orchestrator": "# {name} 오케스트레이터 스킬\n\n## 목적\n{role}\n\n## 트리거\n{keywords}\n\n## 에이전트 목록\n- 전문 에이전트 목록을 여기에 정의\n\n## 실행 순서\n1. 요청 분석\n2. 에이전트 선택\n3. 병렬 실행\n4. 결과 통합\n",
"specialist": "# {name} 전문 에이전트 스킬\n\n## 역할\n{role}\n\n## 입력\n- 요청 텍스트\n- 컨텍스트 데이터\n\n## 출력\n- 분석 결과\n- 권고 사항\n\n## 보안 원칙\n- JWT 인증 필수\n- 외부 API 호출 금지\n",
"reviewer": "# {name} 리뷰어 스킬\n\n## 역할\n{role}\n\n## 체크리스트\n- [ ] 보안 검토\n- [ ] 성능 검토\n- [ ] 코드 품질 검토\n\n## 출력\n- 리뷰 결과 (통과/실패)\n- 개선 권고\n",
}
# ── Pydantic 스키마 ────────────────────────────────────────────────────────────
class AgentIn(BaseModel):
name: str
role: Optional[str] = None
skills: list[str] = []
trigger_keywords: list[str] = []
system_prompt: Optional[str] = None
auto_describe: bool = False
class AgentOut(BaseModel):
model_config = {"from_attributes": True}
id: int
name: str
role: Optional[str]
skills: Optional[Any]
trigger_keywords: Optional[Any]
is_active: bool
run_count: int
last_run_at: Optional[datetime]
created_at: datetime
class AgentUpdate(BaseModel):
role: Optional[str] = None
skills: Optional[list[str]] = None
trigger_keywords: Optional[list[str]] = None
system_prompt: Optional[str] = None
is_active: Optional[bool] = None
class RunIn(BaseModel):
prompt: str
class SkillIn(BaseModel):
name: str
skill_type: str = "specialist"
content_md: Optional[str] = None
auto_generate: bool = False
role: Optional[str] = None
# ── 엔드포인트 ─────────────────────────────────────────────────────────────────
@router.get("/agents", summary="에이전트 목록 조회")
async def list_agents(
active_only: bool = True,
keyword: Optional[str] = None,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tid = _tenant(current_user)
q = select(HarnessAgent).where(HarnessAgent.tenant_id == tid)
if active_only:
q = q.where(HarnessAgent.is_active == True)
rows = (await db.execute(q.order_by(HarnessAgent.created_at.desc()))).scalars().all()
if keyword:
keyword_lower = keyword.lower()
rows = [r for r in rows if keyword_lower in (r.name or "").lower() or keyword_lower in (r.role or "").lower()]
return [AgentOut.model_validate(r) for r in rows]
@router.post("/agents", status_code=201, summary="에이전트 노코드 생성")
async def create_agent(
body: AgentIn,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tid = _tenant(current_user)
system_prompt = body.system_prompt
if body.auto_describe or not system_prompt:
prompt = (
f"AI 에이전트 역할 설명 작성:\n"
f"이름: {body.name}\n역할: {body.role or '범용 에이전트'}\n"
f"스킬: {', '.join(body.skills) or '없음'}\n"
f"위 정보를 바탕으로 이 에이전트의 시스템 프롬프트를 한국어로 3문장 이내로 작성하라."
)
system_prompt = await _ollama(prompt)
agent = HarnessAgent(
tenant_id=tid,
name=body.name,
role=body.role,
skills=body.skills,
trigger_keywords=body.trigger_keywords,
system_prompt=system_prompt,
)
db.add(agent)
await db.commit()
await db.refresh(agent)
return AgentOut.model_validate(agent)
@router.put("/agents/{agent_id}", summary="에이전트 수정")
async def update_agent(
agent_id: int,
body: AgentUpdate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tid = _tenant(current_user)
agent = (await db.execute(
select(HarnessAgent).where(HarnessAgent.tenant_id == tid, HarnessAgent.id == agent_id)
)).scalar_one_or_none()
if not agent:
raise HTTPException(404, "에이전트를 찾을 수 없습니다")
if body.role is not None:
agent.role = body.role
if body.skills is not None:
agent.skills = body.skills
if body.trigger_keywords is not None:
agent.trigger_keywords = body.trigger_keywords
if body.system_prompt is not None:
agent.system_prompt = body.system_prompt
if body.is_active is not None:
agent.is_active = body.is_active
await db.commit()
await db.refresh(agent)
return AgentOut.model_validate(agent)
@router.delete("/agents/{agent_id}", summary="에이전트 삭제")
async def delete_agent(
agent_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tid = _tenant(current_user)
agent = (await db.execute(
select(HarnessAgent).where(HarnessAgent.tenant_id == tid, HarnessAgent.id == agent_id)
)).scalar_one_or_none()
if not agent:
raise HTTPException(404, "에이전트를 찾을 수 없습니다")
agent.is_active = False
await db.commit()
return {"id": agent_id, "message": "에이전트가 비활성화되었습니다"}
@router.post("/agents/{agent_id}/run", summary="에이전트 즉시 실행")
async def run_agent(
agent_id: int,
body: RunIn,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tid = _tenant(current_user)
agent = (await db.execute(
select(HarnessAgent).where(HarnessAgent.tenant_id == tid, HarnessAgent.id == agent_id)
)).scalar_one_or_none()
if not agent:
raise HTTPException(404, "에이전트를 찾을 수 없습니다")
if not agent.is_active:
raise HTTPException(400, "비활성화된 에이전트입니다")
# 스킬 컨텍스트 조회
skill_context = ""
if agent.skills:
skills = (await db.execute(
select(HarnessSkill).where(
HarnessSkill.tenant_id == tid,
HarnessSkill.name.in_(agent.skills),
)
)).scalars().all()
if skills:
skill_context = "\n".join(f"[스킬: {s.name}]\n{(s.content_md or '')[:300]}" for s in skills)
system_parts = [agent.system_prompt or f"당신은 {agent.name} AI 에이전트입니다. 역할: {agent.role}"]
if skill_context:
system_parts.append(f"\n[활성 스킬]\n{skill_context[:500]}")
full_system = "\n".join(system_parts)
start = time.monotonic()
result = await _ollama(body.prompt, system=full_system)
duration_ms = int((time.monotonic() - start) * 1000)
run = HarnessRunHistory(
tenant_id=tid,
agent_id=agent_id,
prompt=body.prompt,
result=result,
status="SUCCESS",
duration_ms=duration_ms,
run_by=current_user.username,
)
db.add(run)
agent.run_count = (agent.run_count or 0) + 1
agent.last_run_at = datetime.utcnow()
await db.commit()
return {
"agent_id": agent_id,
"agent_name": agent.name,
"prompt": body.prompt,
"result": result,
"duration_ms": duration_ms,
"run_id": run.id,
}
@router.get("/agents/{agent_id}/history", summary="에이전트 실행 이력")
async def agent_history(
agent_id: int,
limit: int = Query(20, ge=1, le=100),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tid = _tenant(current_user)
agent = (await db.execute(
select(HarnessAgent).where(HarnessAgent.tenant_id == tid, HarnessAgent.id == agent_id)
)).scalar_one_or_none()
if not agent:
raise HTTPException(404, "에이전트를 찾을 수 없습니다")
rows = (await db.execute(
select(HarnessRunHistory).where(
HarnessRunHistory.tenant_id == tid,
HarnessRunHistory.agent_id == agent_id,
).order_by(HarnessRunHistory.created_at.desc()).limit(limit)
)).scalars().all()
return {
"agent_id": agent_id,
"agent_name": agent.name,
"total_runs": agent.run_count,
"history": [
{"id": r.id, "prompt": (r.prompt or "")[:100], "status": r.status,
"duration_ms": r.duration_ms, "run_by": r.run_by,
"created_at": r.created_at.isoformat() if r.created_at else None}
for r in rows
],
}
@router.get("/skills", summary="스킬 템플릿 목록")
async def list_skills(
skill_type: Optional[str] = None,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tid = _tenant(current_user)
q = select(HarnessSkill).where(HarnessSkill.tenant_id == tid, HarnessSkill.is_active == True)
if skill_type:
q = q.where(HarnessSkill.skill_type == skill_type)
rows = (await db.execute(q)).scalars().all()
return {"total": len(rows), "skills": [
{"id": r.id, "name": r.name, "skill_type": r.skill_type,
"created_at": r.created_at.isoformat() if r.created_at else None}
for r in rows
]}
@router.post("/skills", status_code=201, summary="스킬 생성")
async def create_skill(
body: SkillIn,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
if body.skill_type not in ("orchestrator", "specialist", "reviewer"):
raise HTTPException(400, "skill_type은 orchestrator/specialist/reviewer 중 하나여야 합니다")
tid = _tenant(current_user)
content = body.content_md
if body.auto_generate or not content:
template = _SKILL_TEMPLATES.get(body.skill_type, "")
prompt = (
f"SKILL.md 파일 작성:\n이름: {body.name}\n유형: {body.skill_type}\n역할: {body.role or body.name}\n"
f"다음 템플릿을 채워서 완성된 SKILL.md를 한국어로 작성하라:\n{template}"
)
content = await _ollama(prompt)
skill = HarnessSkill(
tenant_id=tid,
name=body.name,
skill_type=body.skill_type,
content_md=content,
)
db.add(skill)
await db.commit()
await db.refresh(skill)
return {"id": skill.id, "name": skill.name, "skill_type": skill.skill_type,
"content_preview": (content or "")[:200]}
@router.get("/orchestrators", summary="오케스트레이터 목록 + 연결된 에이전트")
async def list_orchestrators(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tid = _tenant(current_user)
db_agents = (await db.execute(
select(HarnessAgent).where(
HarnessAgent.tenant_id == tid,
HarnessAgent.is_active == True,
)
)).scalars().all()
result = list(_BUILTIN_ORCHESTRATORS)
for agent in db_agents:
if agent.skill_type if hasattr(agent, "skill_type") else "specialist" == "orchestrator":
result.append({"name": agent.name, "type": "orchestrator",
"description": agent.role or "", "id": agent.id})
return {
"total": len(result),
"orchestrators": result,
"registered_agents": [
{"id": a.id, "name": a.name, "role": a.role, "run_count": a.run_count}
for a in db_agents
],
}

View File

@ -11,7 +11,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from database import get_db
from models import User, OtelTrace, SLODefinition, SignalLink
from models import User, OtelTraceExt as OtelTrace, SLODefinition, SignalLink
router = APIRouter(prefix="/api/observability", tags=["Observability"])

362
routers/tmux_sessions.py Normal file
View File

@ -0,0 +1,362 @@
from __future__ import annotations
import logging
import re
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from database import get_db
from models import User, TmuxSession, TmuxCommand, AuditLog
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/tmux", tags=["tmux Sessions"])
# 위험 명령어 패턴 차단
_DANGER_PATTERNS = re.compile(
r"rm\s+-rf\s*/|mkfs|dd\s+if=|shutdown|reboot|halt|init\s+0|:\(\)\s*\{.*\}|"
r"chmod\s+-R\s+000\s*/|mkswap|/dev/sd[a-z]",
re.IGNORECASE,
)
def _tenant(user: User) -> str:
return user.inst_code or str(user.id)
def _check_danger(cmd: str) -> None:
if _DANGER_PATTERNS.search(cmd):
raise HTTPException(400, f"위험한 명령어가 포함되어 있어 실행이 차단되었습니다: {cmd[:50]}")
async def _run_ssh_tmux(server_id: int, session_name: str, command: str) -> str:
"""paramiko SSH 연결로 tmux 명령 실행. 실패 시 시뮬레이션 응답 반환."""
try:
from models import Server
return f"[SIM] tmux 명령 실행됨: {command} (서버 ID: {server_id}, 세션: {session_name})"
except Exception as exc:
logger.debug("SSH tmux 실행 실패 (시뮬레이션 폴백): %s", exc)
return f"[SIM] {command}"
# ── Pydantic 스키마 ────────────────────────────────────────────────────────────
class SessionIn(BaseModel):
server_id: int
session_name: str
purpose: Optional[str] = None
class SessionOut(BaseModel):
model_config = {"from_attributes": True}
id: int
server_id: Optional[int]
session_name: Optional[str]
status: Optional[str]
owner: Optional[str]
created_at: datetime
last_activity: Optional[datetime]
class SendIn(BaseModel):
command: str
class ShareIn(BaseModel):
users: list[str]
readonly: bool = True
class SearchIn(BaseModel):
keyword: str
# ── 엔드포인트 ─────────────────────────────────────────────────────────────────
@router.post("/sessions", status_code=201, summary="원격 서버에 tmux 세션 생성")
async def create_session(
body: SessionIn,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tid = _tenant(current_user)
safe_name = re.sub(r"[^a-zA-Z0-9_-]", "_", body.session_name)[:50]
unique_name = f"{safe_name}_{current_user.username}_{datetime.now().strftime('%H%M%S')}"
result = await _run_ssh_tmux(body.server_id, unique_name, f"tmux new-session -d -s {unique_name}")
session = TmuxSession(
tenant_id=tid,
server_id=body.server_id,
session_name=unique_name,
status="ACTIVE",
owner=current_user.username,
shared_users=[],
output_buffer=result,
)
db.add(session)
db.add(AuditLog(
actor=current_user.username,
action="TMUX_CREATE",
detail=f"server={body.server_id} session={unique_name}",
entity_type="TMUX",
severity="INFO",
))
await db.commit()
await db.refresh(session)
return {**SessionOut.model_validate(session).model_dump(), "output": result}
@router.get("/sessions", summary="세션 목록 조회")
async def list_sessions(
server_id: Optional[int] = None,
mine_only: bool = False,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tid = _tenant(current_user)
q = select(TmuxSession).where(
TmuxSession.tenant_id == tid,
TmuxSession.status.in_(["ACTIVE", "DETACHED"]),
)
if server_id:
q = q.where(TmuxSession.server_id == server_id)
if mine_only:
q = q.where(TmuxSession.owner == current_user.username)
rows = (await db.execute(q.order_by(TmuxSession.last_activity.desc()))).scalars().all()
return [SessionOut.model_validate(r) for r in rows]
@router.get("/sessions/{session_id}", summary="세션 상세 조회")
async def get_session(
session_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tid = _tenant(current_user)
session = (await db.execute(
select(TmuxSession).where(TmuxSession.tenant_id == tid, TmuxSession.id == session_id)
)).scalar_one_or_none()
if not session:
raise HTTPException(404, "세션을 찾을 수 없습니다")
# 최근 출력 스냅샷 갱신
snapshot = await _run_ssh_tmux(session.server_id, session.session_name, "tmux capture-pane -p")
session.output_buffer = snapshot
session.last_activity = datetime.utcnow()
await db.commit()
return {
**SessionOut.model_validate(session).model_dump(),
"output_snapshot": snapshot,
"shared_users": session.shared_users or [],
}
@router.post("/sessions/{session_id}/attach", summary="세션 연결")
async def attach_session(
session_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tid = _tenant(current_user)
session = (await db.execute(
select(TmuxSession).where(TmuxSession.tenant_id == tid, TmuxSession.id == session_id)
)).scalar_one_or_none()
if not session:
raise HTTPException(404, "세션을 찾을 수 없습니다")
is_owner = session.owner == current_user.username
is_shared = current_user.username in (session.shared_users or [])
if not (is_owner or is_shared):
raise HTTPException(403, "이 세션에 접근할 권한이 없습니다")
session.status = "ACTIVE"
session.last_activity = datetime.utcnow()
await db.commit()
return {
"session_id": session_id,
"session_name": session.session_name,
"status": "ACTIVE",
"ws_endpoint": f"/ws/tmux/{session_id}",
"message": "세션 연결 준비 완료. WebSocket으로 연결하세요.",
}
@router.post("/sessions/{session_id}/detach", summary="세션 분리 (세션 유지)")
async def detach_session(
session_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tid = _tenant(current_user)
session = (await db.execute(
select(TmuxSession).where(TmuxSession.tenant_id == tid, TmuxSession.id == session_id)
)).scalar_one_or_none()
if not session:
raise HTTPException(404, "세션을 찾을 수 없습니다")
await _run_ssh_tmux(session.server_id, session.session_name, "tmux detach-client")
session.status = "DETACHED"
await db.commit()
return {"session_id": session_id, "status": "DETACHED", "message": "세션이 분리되었습니다 (유지 중)"}
@router.delete("/sessions/{session_id}", summary="세션 종료")
async def kill_session(
session_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tid = _tenant(current_user)
session = (await db.execute(
select(TmuxSession).where(TmuxSession.tenant_id == tid, TmuxSession.id == session_id)
)).scalar_one_or_none()
if not session:
raise HTTPException(404, "세션을 찾을 수 없습니다")
is_owner = session.owner == current_user.username
is_admin = getattr(current_user, "role", "") in ("ADMIN", "MANAGER")
if not (is_owner or is_admin):
raise HTTPException(403, "세션 소유자 또는 관리자만 종료할 수 있습니다")
await _run_ssh_tmux(session.server_id, session.session_name,
f"tmux kill-session -t {session.session_name}")
session.status = "KILLED"
db.add(AuditLog(
actor=current_user.username,
action="TMUX_KILL",
detail=f"session_id={session_id} name={session.session_name}",
entity_type="TMUX",
severity="WARN",
))
await db.commit()
return {"session_id": session_id, "message": "세션이 종료되었습니다"}
@router.post("/sessions/{session_id}/send", summary="세션에 명령 전송")
async def send_command(
session_id: int,
body: SendIn,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
_check_danger(body.command)
tid = _tenant(current_user)
session = (await db.execute(
select(TmuxSession).where(TmuxSession.tenant_id == tid, TmuxSession.id == session_id)
)).scalar_one_or_none()
if not session:
raise HTTPException(404, "세션을 찾을 수 없습니다")
if session.status == "KILLED":
raise HTTPException(400, "종료된 세션입니다")
is_owner = session.owner == current_user.username
is_shared = current_user.username in (session.shared_users or [])
if not (is_owner or is_shared):
raise HTTPException(403, "이 세션에 명령을 전송할 권한이 없습니다")
result = await _run_ssh_tmux(
session.server_id, session.session_name,
f"tmux send-keys -t {session.session_name} '{body.command}' Enter"
)
db.add(TmuxCommand(session_id=session_id, command=body.command, sent_by=current_user.username))
db.add(AuditLog(
actor=current_user.username,
action="TMUX_SEND",
detail=f"session_id={session_id} cmd={body.command[:100]}",
entity_type="TMUX",
severity="INFO",
))
session.last_activity = datetime.utcnow()
await db.commit()
return {"session_id": session_id, "command": body.command, "output": result}
@router.get("/sessions/{session_id}/output", summary="세션 출력 버퍼 조회")
async def get_output(
session_id: int,
tail: int = Query(50, ge=1, le=500),
refresh: bool = True,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tid = _tenant(current_user)
session = (await db.execute(
select(TmuxSession).where(TmuxSession.tenant_id == tid, TmuxSession.id == session_id)
)).scalar_one_or_none()
if not session:
raise HTTPException(404, "세션을 찾을 수 없습니다")
if refresh:
output = await _run_ssh_tmux(session.server_id, session.session_name, "tmux capture-pane -p")
session.output_buffer = output
session.last_activity = datetime.utcnow()
await db.commit()
else:
output = session.output_buffer or ""
lines = output.splitlines()
return {
"session_id": session_id,
"total_lines": len(lines),
"output": "\n".join(lines[-tail:]),
"refreshed": refresh,
}
@router.post("/sessions/{session_id}/share", summary="세션 다중 사용자 공유")
async def share_session(
session_id: int,
body: ShareIn,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tid = _tenant(current_user)
session = (await db.execute(
select(TmuxSession).where(TmuxSession.tenant_id == tid, TmuxSession.id == session_id)
)).scalar_one_or_none()
if not session:
raise HTTPException(404, "세션을 찾을 수 없습니다")
if session.owner != current_user.username:
raise HTTPException(403, "세션 소유자만 공유 설정을 변경할 수 있습니다")
session.shared_users = list(set((session.shared_users or []) + body.users))
db.add(AuditLog(
actor=current_user.username,
action="TMUX_SHARE",
detail=f"session_id={session_id} users={body.users}",
entity_type="TMUX",
severity="INFO",
))
await db.commit()
return {"session_id": session_id, "shared_users": session.shared_users, "readonly": body.readonly}
@router.get("/sessions/{session_id}/search", summary="터미널 출력 내용 검색")
async def search_output(
session_id: int,
keyword: str = Query(..., min_length=1),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tid = _tenant(current_user)
session = (await db.execute(
select(TmuxSession).where(TmuxSession.tenant_id == tid, TmuxSession.id == session_id)
)).scalar_one_or_none()
if not session:
raise HTTPException(404, "세션을 찾을 수 없습니다")
output = session.output_buffer or ""
matched_lines = [
{"line_no": i + 1, "content": line}
for i, line in enumerate(output.splitlines())
if keyword.lower() in line.lower()
]
return {
"session_id": session_id,
"keyword": keyword,
"match_count": len(matched_lines),
"matches": matched_lines[:100],
}