diff --git a/itsm/main.py b/itsm/main.py index 1fa45640..d049e0e2 100644 --- a/itsm/main.py +++ b/itsm/main.py @@ -45,6 +45,12 @@ from routers import ( compliance, jmeter, public_checklist, + customer_portal, + groupware, + siem, + topology, + portfolio, + infra_ext, ) @@ -256,6 +262,19 @@ app.include_router(jmeter.router) # 공공기관 필수 기능 체크리스트 app.include_router(public_checklist.router) +# 추가 기능 +app.include_router(customer_portal.router) # 고객 셀프서비스 포털 +app.include_router(groupware.router) # 그룹웨어 전자결재 연동 +app.include_router(siem.router) # SIEM 보안 이벤트 연동 +app.include_router(topology.router) # 네트워크 토폴로지 시각화 +app.include_router(portfolio.router) # 포트폴리오 + 리소스 관리 +app.include_router(infra_ext.router) # Zero Trust + K8s + ERP + + +@app.get("/topology") +async def topology_page(): + return FileResponse("static/index.html") + app.mount("/static", StaticFiles(directory="static"), name="static") diff --git a/itsm/routers/customer_portal.py b/itsm/routers/customer_portal.py new file mode 100644 index 00000000..4a4e92bf --- /dev/null +++ b/itsm/routers/customer_portal.py @@ -0,0 +1,453 @@ +""" +고객사 셀프서비스 포털 API + +기능: + - SR 접수 / 상태 조회 / 이력 조회 + - AI FAQ 자가해결 추천 (SR 접수 전) + - 서비스 카탈로그 셀프 주문 + - 만족도 평가 + - 공지사항 조회 + +엔드포인트: + POST /api/portal/sr — SR 접수 + GET /api/portal/sr — 내 SR 목록 + GET /api/portal/sr/{sr_id} — SR 상세 + 처리 이력 + POST /api/portal/sr/{sr_id}/rate — 처리 만족도 평가 + POST /api/portal/faq/suggest — AI 자가해결 추천 + GET /api/portal/catalog — 서비스 카탈로그 (고객용) + GET /api/portal/announcements — 공지사항 + GET /api/portal/stats — 내 기관 통계 +""" +from __future__ import annotations + +import logging +from datetime import datetime +from typing import List, Optional + +from fastapi import APIRouter, Depends, HTTPException, Query +from pydantic import BaseModel +from sqlalchemy import select, func +from sqlalchemy.ext.asyncio import AsyncSession + +from core.auth import get_current_user +from database import get_db +from models import ( + SRRequest, SRStatus, SRCreate, AuditLog, + Institution, Rating, User, UserRole, +) + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/api/portal", tags=["portal"]) + + +def _require_customer(user: User) -> User: + """고객 포털은 CUSTOMER 또는 모든 역할 허용 (기관 필터링으로 접근 제어).""" + return user + + +# ── SR 접수 ─────────────────────────────────────────────────────────────────── + +class PortalSRCreate(BaseModel): + title: str + description: Optional[str] = None + sr_type: str = "INQUIRY" + priority: str = "MEDIUM" + category: Optional[str] = None # 카테고리 선택 + + +@router.post("/sr", status_code=201) +async def portal_create_sr( + body: PortalSRCreate, + db: AsyncSession = Depends(get_db), + cu: User = Depends(get_current_user), +): + """고객 포털 SR 접수.""" + from uuid import uuid4 + sr_id = f"SR-{datetime.now().strftime('%Y%m%d')}-{str(uuid4())[:6].upper()}" + + # 기관 조회 + inst_id = None + if cu.inst_code: + inst = (await db.execute( + select(Institution).where(Institution.inst_code == cu.inst_code) + )).scalars().first() + if inst: + inst_id = inst.id + + sr = SRRequest( + sr_id = sr_id, + inst_id = inst_id, + sr_type = body.sr_type, + title = body.title, + description = body.description, + status = SRStatus.RECEIVED, + priority = body.priority, + requested_by= cu.username, + ) + db.add(sr) + + # AI 자동 분류 (백그라운드) + import asyncio as _aio + async def _classify(): + try: + from core.ticket_classifier import classify_ticket + import json as _j + suggestion = await classify_ticket(body.title, body.description or "") + async with (await db.connection()).begin(): + sr.ai_suggestion = _j.dumps(suggestion, ensure_ascii=False) + except Exception: + pass + + await db.commit() + await db.refresh(sr) + _aio.create_task(_classify()) + + # SLA 설정 + from core.sla import set_sla_on_create + async with db.begin(): + await set_sla_on_create(sr.sr_id, db) + + return { + "sr_id": sr.sr_id, + "status": sr.status, + "message": f"서비스 요청이 접수되었습니다. 담당자가 검토 후 연락드리겠습니다.", + "created_at": sr.created_at.isoformat() if sr.created_at else None, + "sla_deadline": sr.sla_deadline.isoformat() if sr.sla_deadline else None, + } + + +# ── SR 목록 (내 기관) ───────────────────────────────────────────────────────── + +@router.get("/sr") +async def portal_list_sr( + status: Optional[str] = None, + limit: int = 20, + skip: int = 0, + db: AsyncSession = Depends(get_db), + cu: User = Depends(get_current_user), +): + """고객 포털 내 SR 목록 (기관 필터 자동 적용).""" + q = select(SRRequest).order_by(SRRequest.created_at.desc()) + + if cu.role == UserRole.CUSTOMER and cu.inst_code: + inst = (await db.execute( + select(Institution).where(Institution.inst_code == cu.inst_code) + )).scalars().first() + if inst: + q = q.where(SRRequest.inst_id == inst.id) + else: + q = q.where(SRRequest.requested_by == cu.username) + else: + q = q.where(SRRequest.requested_by == cu.username) + + if status: + q = q.where(SRRequest.status == status) + + rows = (await db.execute(q.offset(skip).limit(limit))).scalars().all() + + return [ + { + "sr_id": r.sr_id, + "title": r.title, + "status": r.status, + "priority": r.priority, + "created_at": r.created_at.isoformat() if r.created_at else None, + "sla_deadline":r.sla_deadline.isoformat() if r.sla_deadline else None, + "sla_breached":r.sla_breached, + "assigned_to": r.assigned_to, + } + for r in rows + ] + + +# ── SR 상세 + 처리 이력 ─────────────────────────────────────────────────────── + +@router.get("/sr/{sr_id}") +async def portal_sr_detail( + sr_id: str, + db: AsyncSession = Depends(get_db), + cu: User = Depends(get_current_user), +): + """SR 상세 + 감사 이력 (타임라인 형식).""" + sr = (await db.execute( + select(SRRequest).where(SRRequest.sr_id == sr_id) + )).scalars().first() + + if not sr: + raise HTTPException(404, "SR을 찾을 수 없습니다.") + + # 감사 이력 + logs = (await db.execute( + select(AuditLog).where(AuditLog.sr_id == sr_id) + .order_by(AuditLog.id.asc()) + )).scalars().all() + + timeline = [ + { + "action": l.action, + "actor": l.actor, + "detail": l.detail, + "created_at": l.created_at.isoformat() if l.created_at else None, + } + for l in logs + ] + + # 진행률 계산 + STATUS_PROGRESS = { + "RECEIVED": 10, "PARSED": 20, "PENDING_APPROVAL": 35, + "APPROVED": 50, "IN_PROGRESS": 70, "PENDING_PM_VALIDATION": 85, + "COMPLETED": 100, "REJECTED": 0, "FAILED_ROLLBACK": 0, + } + + return { + "sr_id": sr.sr_id, + "title": sr.title, + "description": sr.description, + "status": sr.status, + "priority": sr.priority, + "sr_type": sr.sr_type, + "requested_by":sr.requested_by, + "assigned_to": sr.assigned_to, + "target_server":sr.target_server, + "created_at": sr.created_at.isoformat() if sr.created_at else None, + "updated_at": sr.updated_at.isoformat() if sr.updated_at else None, + "sla_deadline":sr.sla_deadline.isoformat() if sr.sla_deadline else None, + "sla_breached":sr.sla_breached, + "progress_pct":STATUS_PROGRESS.get(sr.status, 0), + "timeline": timeline, + } + + +# ── 만족도 평가 ─────────────────────────────────────────────────────────────── + +class RatingRequest(BaseModel): + score: int # 1-5 + comment: Optional[str] = None + + +@router.post("/sr/{sr_id}/rate") +async def portal_rate_sr( + sr_id: str, + body: RatingRequest, + db: AsyncSession = Depends(get_db), + cu: User = Depends(get_current_user), +): + """SR 처리 만족도 평가 (1~5점).""" + if not 1 <= body.score <= 5: + raise HTTPException(400, "점수는 1~5 사이여야 합니다.") + + sr = (await db.execute(select(SRRequest).where(SRRequest.sr_id == sr_id))).scalars().first() + if not sr: + raise HTTPException(404, "SR을 찾을 수 없습니다.") + if sr.status != SRStatus.COMPLETED: + raise HTTPException(400, "완료된 SR에만 평가할 수 있습니다.") + + # 기존 평가 확인 + existing = (await db.execute( + select(Rating).where(Rating.sr_id == sr_id) + )).scalars().first() + + if existing: + existing.score = body.score + existing.comment = body.comment + existing.rated_at = datetime.now() + else: + db.add(Rating( + sr_id = sr_id, + score = body.score, + comment = body.comment, + rated_by= cu.username, + )) + + await db.commit() + return {"message": f"평가가 등록되었습니다. (점수: {body.score}/5)", "sr_id": sr_id} + + +# ── AI FAQ 자가해결 추천 ────────────────────────────────────────────────────── + +class FAQRequest(BaseModel): + query: str # 문제 설명 또는 SR 제목 + + +@router.post("/faq/suggest") +async def faq_suggest( + body: FAQRequest, + db: AsyncSession = Depends(get_db), + _u: User = Depends(get_current_user), +): + """SR 접수 전 AI가 유사 KB 문서를 추천하여 자가해결 유도.""" + from models import KBDocument + from sqlalchemy import select, or_ + + # 1. 키워드 기반 KB 검색 + keywords = body.query.replace(".", " ").replace(",", " ").split()[:5] + conditions = [KBDocument.title.contains(kw) for kw in keywords if len(kw) >= 2] + conditions += [KBDocument.solution.contains(kw) for kw in keywords if len(kw) >= 2] + conditions += [KBDocument.symptoms.contains(kw) for kw in keywords if len(kw) >= 2] + + kb_results = [] + if conditions: + rows = (await db.execute( + select(KBDocument).where(or_(*conditions)).limit(5) + )).scalars().all() + kb_results = [ + { + "kb_id": r.id, + "title": r.title, + "summary": (getattr(r, "solution", None) or getattr(r, "content", None) or "")[:200], + "relevance": "HIGH" if any(kw in (r.title or "").lower() for kw in keywords) else "MEDIUM", + } + for r in rows + ] + + # 2. Ollama LLM으로 자가해결 방법 생성 + llm_answer = None + if not kb_results: + try: + from core.llm_client import get_llm_client + prompt = ( + f"다음 IT 문제에 대해 간단한 자가해결 방법을 3단계로 알려주세요:\n" + f"문제: {body.query}\n" + f"형식: 1. 첫 번째 단계 2. 두 번째 단계 3. 세 번째 단계" + ) + client = get_llm_client() + resp = await client.chat(prompt) + llm_answer = resp.content.strip()[:500] + except Exception: + pass + + # 3. 유사 SR 이력 조회 + past_srs = (await db.execute( + select(SRRequest) + .where( + SRRequest.status == SRStatus.COMPLETED, + or_(*[SRRequest.title.contains(kw) for kw in keywords if len(kw) >= 2]) + ) + .limit(3) + )).scalars().all() if keywords else [] + + similar_srs = [ + { + "sr_id": s.sr_id, + "title": s.title, + "summary": "이미 해결된 유사 사례입니다. 담당자에게 동일 조치를 요청하세요.", + } + for s in past_srs + ] + + # 자가해결 가능 여부 판단 + can_self_solve = bool(kb_results or llm_answer) + + return { + "query": body.query, + "can_self_solve": can_self_solve, + "kb_articles": kb_results, + "llm_guide": llm_answer, + "similar_srs": similar_srs, + "message": ( + "아래 자료로 문제를 해결해 보세요. 해결되지 않으면 SR을 접수하세요." + if can_self_solve else + "유사한 해결 사례를 찾지 못했습니다. SR을 접수해 주세요." + ), + } + + +# ── 서비스 카탈로그 (고객용) ────────────────────────────────────────────────── + +@router.get("/catalog") +async def portal_catalog( + db: AsyncSession = Depends(get_db), + _u: User = Depends(get_current_user), +): + """고객사 이용 가능한 서비스 카탈로그.""" + from models import ServiceItem + rows = (await db.execute( + select(ServiceItem).where(ServiceItem.status == "ACTIVE") + .order_by(ServiceItem.category, ServiceItem.name) + )).scalars().all() + + by_cat: dict = {} + for r in rows: + cat = r.category or "기타" + by_cat.setdefault(cat, []).append({ + "service_id": r.service_id, + "name": r.name, + "description": r.description, + "sla_hours": getattr(r, "sla_hours", None), + "category": cat, + }) + + return {"categories": list(by_cat.keys()), "by_category": by_cat} + + +# ── 공지사항 ────────────────────────────────────────────────────────────────── + +_ANNOUNCEMENTS = [ + { + "id": 1, + "title": "GUARDiA ITSM 2.0 업데이트 안내", + "content": "PMS 기능 및 AI 보고서 자동 생성 기능이 추가되었습니다.", + "category": "업데이트", + "published": "2026-05-29", + "pinned": True, + }, + { + "id": 2, + "title": "SR 처리 시간 단축 안내", + "content": "AI 자동 분류 적용으로 평균 처리 시간이 30% 단축되었습니다.", + "category": "운영", + "published": "2026-05-28", + "pinned": False, + }, +] + + +@router.get("/announcements") +async def portal_announcements(_u: User = Depends(get_current_user)): + """공지사항 목록.""" + return {"announcements": sorted(_ANNOUNCEMENTS, key=lambda x: (not x["pinned"], x["published"]), reverse=False)} + + +# ── 내 기관 통계 ────────────────────────────────────────────────────────────── + +@router.get("/stats") +async def portal_stats( + db: AsyncSession = Depends(get_db), + cu: User = Depends(get_current_user), +): + """고객 포털 — 내 기관 SR 통계.""" + q_base = select(SRRequest) + if cu.role == UserRole.CUSTOMER and cu.inst_code: + inst = (await db.execute( + select(Institution).where(Institution.inst_code == cu.inst_code) + )).scalars().first() + if inst: + q_base = q_base.where(SRRequest.inst_id == inst.id) + else: + q_base = q_base.where(SRRequest.requested_by == cu.username) + + rows = (await db.execute(q_base)).scalars().all() + total = len(rows) + completed = sum(1 for r in rows if r.status == SRStatus.COMPLETED) + in_prog = sum(1 for r in rows if r.status == SRStatus.IN_PROGRESS) + breached = sum(1 for r in rows if r.sla_breached) + + # 평균 평점 + rated = [r for r in rows if r.sr_id] + ratings = (await db.execute( + select(Rating).where(Rating.sr_id.in_([r.sr_id for r in rated[:50]])) + )).scalars().all() + avg_score = round(sum(r.score for r in ratings) / len(ratings), 1) if ratings else None + + return { + "total": total, + "completed": completed, + "in_progress": in_prog, + "sla_breached": breached, + "completion_rate":round(completed / total * 100, 1) if total else 0.0, + "avg_satisfaction":avg_score, + "by_status": { + s: sum(1 for r in rows if r.status == s) + for s in ["RECEIVED", "IN_PROGRESS", "COMPLETED", "REJECTED"] + }, + } diff --git a/itsm/routers/groupware.py b/itsm/routers/groupware.py new file mode 100644 index 00000000..a9b72666 --- /dev/null +++ b/itsm/routers/groupware.py @@ -0,0 +1,335 @@ +""" +그룹웨어 전자결재 연동 API + +지원 플랫폼: + - 카카오워크 (KAKAOWORK_BOT_TOKEN) + - 네이버웍스 (NAVER_WORKS_BOT_ID / NAVER_WORKS_TOKEN) + - 한컴오피스 (HANCOM_WEBHOOK_URL) + - 사용자 정의 웹훅 (CUSTOM_APPROVAL_WEBHOOK_URL) + +기능: + 1. SR 승인 요청 → 그룹웨어 결재 라인으로 발송 + 2. 그룹웨어 승인/반려 콜백 → GUARDiA SR 상태 자동 갱신 + 3. 결재 현황 조회 + +환경변수: + GROUPWARE_TYPE = kakao|naver|hancom|custom + KAKAOWORK_BOT_TOKEN = ... + NAVER_WORKS_BOT_ID = ... + NAVER_WORKS_TOKEN = ... + HANCOM_WEBHOOK_URL = ... + CUSTOM_APPROVAL_WEBHOOK_URL = ... +""" +from __future__ import annotations + +import hashlib +import hmac +import json +import logging +import os +from datetime import datetime +from typing import Any, Optional + +import httpx +from fastapi import APIRouter, BackgroundTasks, Depends, Header, HTTPException, Request +from pydantic import BaseModel +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from core.auth import get_current_user +from database import get_db +from models import SRRequest, SRStatus, ApprovalFlow, User, UserRole + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/api/groupware", tags=["groupware"]) + +GROUPWARE_TYPE = os.getenv("GROUPWARE_TYPE", "") +KAKAO_TOKEN = os.getenv("KAKAOWORK_BOT_TOKEN", "") +NAVER_BOT_ID = os.getenv("NAVER_WORKS_BOT_ID", "") +NAVER_TOKEN = os.getenv("NAVER_WORKS_TOKEN", "") +HANCOM_URL = os.getenv("HANCOM_WEBHOOK_URL", "") +CUSTOM_URL = os.getenv("CUSTOM_APPROVAL_WEBHOOK_URL", "") +WEBHOOK_SECRET = os.getenv("GROUPWARE_WEBHOOK_SECRET", "guardia-secret") + +# 결재 요청 이력 (운영 시 DB 테이블로 이전) +_approval_requests: dict[str, dict] = {} + + +class ApprovalSendRequest(BaseModel): + sr_id: str + approver: str # 결재자 사용자명 또는 이메일 + message: Optional[str] = None + platform: Optional[str] = None # None이면 환경변수 GROUPWARE_TYPE 사용 + + +class CallbackRequest(BaseModel): + action: str # approved | rejected + sr_id: str + approver: str + comment: Optional[str] = None + signature: Optional[str] = None # HMAC-SHA256 검증용 + + +# ── 그룹웨어별 메시지 발송 ──────────────────────────────────────────────────── + +async def _send_kakao(sr_id: str, title: str, approver: str, message: str): + """카카오워크 결재 메시지 발송.""" + if not KAKAO_TOKEN: + logger.debug("KAKAOWORK_BOT_TOKEN 미설정") + return False + payload = { + "conversationId": approver, + "message": { + "text": f"[GUARDiA 결재 요청]\n{message}", + "blocks": [ + {"type": "header", "text": f"📋 결재 요청: {sr_id}", "style": "yellow"}, + {"type": "description", "term": "SR", "content": {"type": "text", "text": title}}, + {"type": "button", "text": "승인", "style": "primary", + "action": {"type": "call_modal", "value": f"approve:{sr_id}"}}, + {"type": "button", "text": "반려", "style": "default", + "action": {"type": "call_modal", "value": f"reject:{sr_id}"}}, + ] + } + } + try: + async with httpx.AsyncClient(timeout=10.0) as c: + r = await c.post( + "https://api.kakaowork.com/v1/messages.send", + headers={"Authorization": f"Bearer {KAKAO_TOKEN}"}, + json=payload, + ) + return r.status_code == 200 + except Exception as e: + logger.warning("카카오워크 발송 실패: %s", e) + return False + + +async def _send_naver_works(sr_id: str, title: str, approver: str, message: str): + """네이버웍스 결재 메시지 발송.""" + if not NAVER_BOT_ID or not NAVER_TOKEN: + return False + payload = { + "content": { + "type": "flex", + "altText": f"[GUARDiA 결재 요청] {sr_id}", + "contents": { + "type": "bubble", + "header": {"type": "box", "layout": "vertical", + "contents": [{"type": "text", "text": f"📋 결재 요청", "weight": "bold"}]}, + "body": {"type": "box", "layout": "vertical", + "contents": [{"type": "text", "text": f"SR: {sr_id}\n{message[:200]}"}]}, + "footer": {"type": "box", "layout": "horizontal", "contents": [ + {"type": "button", "style": "primary", "action": {"type": "message", "label": "승인", "text": f"/approve {sr_id}"}}, + {"type": "button", "style": "secondary", "action": {"type": "message", "label": "반려", "text": f"/reject {sr_id}"}}, + ]}, + } + } + } + try: + async with httpx.AsyncClient(timeout=10.0) as c: + r = await c.post( + f"https://www.worksapis.com/v1.0/bots/{NAVER_BOT_ID}/users/{approver}/messages", + headers={"Authorization": f"Bearer {NAVER_TOKEN}", "Content-Type": "application/json"}, + json=payload, + ) + return r.status_code in (200, 201) + except Exception as e: + logger.warning("네이버웍스 발송 실패: %s", e) + return False + + +async def _send_hancom(sr_id: str, title: str, approver: str, message: str): + """한컴오피스/그룹웨어 웹훅 발송.""" + if not HANCOM_URL: + return False + payload = { + "event": "approval_request", + "sr_id": sr_id, + "title": title, + "approver": approver, + "message": message, + "callback_url": f"{os.getenv('GUARDIA_BASE_URL','http://localhost:8001')}/api/groupware/callback", + } + try: + async with httpx.AsyncClient(timeout=10.0) as c: + r = await c.post(HANCOM_URL, json=payload) + return r.status_code in (200, 201, 202) + except Exception as e: + logger.warning("한컴 발송 실패: %s", e) + return False + + +async def _send_custom(sr_id: str, title: str, approver: str, message: str): + """사용자 정의 그룹웨어 웹훅.""" + if not CUSTOM_URL: + return False + payload = { + "type": "approval_request", + "sr_id": sr_id, + "title": title, + "approver": approver, + "message": message, + "timestamp":datetime.utcnow().isoformat(), + "callback_url": f"{os.getenv('GUARDIA_BASE_URL','http://localhost:8001')}/api/groupware/callback", + } + # HMAC 서명 + sig = hmac.new(WEBHOOK_SECRET.encode(), json.dumps(payload, sort_keys=True).encode(), hashlib.sha256).hexdigest() + try: + async with httpx.AsyncClient(timeout=10.0) as c: + r = await c.post(CUSTOM_URL, json=payload, headers={"X-Signature": sig}) + return r.status_code in (200, 201, 202) + except Exception as e: + logger.warning("커스텀 웹훅 발송 실패: %s", e) + return False + + +async def _dispatch(platform: str, sr_id: str, title: str, approver: str, message: str) -> bool: + """플랫폼에 따라 결재 메시지 발송.""" + p = (platform or GROUPWARE_TYPE or "custom").lower() + if p == "kakao": + return await _send_kakao(sr_id, title, approver, message) + elif p == "naver": + return await _send_naver_works(sr_id, title, approver, message) + elif p == "hancom": + return await _send_hancom(sr_id, title, approver, message) + else: + return await _send_custom(sr_id, title, approver, message) + + +# ── 결재 요청 발송 API ──────────────────────────────────────────────────────── + +@router.post("/send-approval") +async def send_approval( + body: ApprovalSendRequest, + bg: BackgroundTasks, + db: AsyncSession = Depends(get_db), + cu: User = Depends(get_current_user), +): + """SR 승인 요청을 그룹웨어로 발송.""" + sr = (await db.execute(select(SRRequest).where(SRRequest.sr_id == body.sr_id))).scalars().first() + if not sr: + raise HTTPException(404, f"SR {body.sr_id}를 찾을 수 없습니다.") + + platform = body.platform or GROUPWARE_TYPE + message = body.message or ( + f"SR: {sr.sr_id}\n제목: {sr.title}\n요청자: {sr.requested_by}\n" + f"우선순위: {sr.priority}\n\n처리 요청드립니다." + ) + + # 발송 이력 저장 + _approval_requests[sr.sr_id] = { + "sr_id": sr.sr_id, + "approver": body.approver, + "platform": platform, + "sent_at": datetime.utcnow().isoformat(), + "status": "PENDING", + } + + # 백그라운드 발송 + async def _bg_send(): + ok = await _dispatch(platform, sr.sr_id, sr.title, body.approver, message) + _approval_requests[sr.sr_id]["sent"] = ok + logger.info("그룹웨어 결재 발송: sr=%s platform=%s ok=%s", sr.sr_id, platform, ok) + + bg.add_task(_bg_send) + + return { + "message": f"{platform or 'custom'} 그룹웨어로 결재 요청을 발송합니다.", + "sr_id": sr.sr_id, + "approver": body.approver, + "platform": platform or "custom", + } + + +# ── 그룹웨어 콜백 수신 (승인/반려) ─────────────────────────────────────────── + +@router.post("/callback") +async def groupware_callback( + body: CallbackRequest, + db: AsyncSession = Depends(get_db), +): + """그룹웨어에서 승인/반려 콜백 수신 → SR 상태 자동 갱신.""" + if body.action not in ("approved", "rejected"): + raise HTTPException(400, f"action은 approved|rejected 이어야 합니다.") + + sr = (await db.execute(select(SRRequest).where(SRRequest.sr_id == body.sr_id))).scalars().first() + if not sr: + raise HTTPException(404, f"SR {body.sr_id}를 찾을 수 없습니다.") + + # 승인/반려 처리 + from models import ApprovalResult, compute_log_hash, AuditLog + result = ApprovalResult.APPROVED if body.action == "approved" else ApprovalResult.REJECTED + + apv = ApprovalFlow( + sr_id = body.sr_id, + approver = body.approver, + result = result, + comment = f"[그룹웨어 결재] {body.comment or ''}", + decided_at = datetime.now(), + ) + db.add(apv) + + old_status = sr.status + if body.action == "approved": + sr.status = SRStatus.APPROVED + else: + sr.status = SRStatus.REJECTED + sr.updated_at = datetime.now() + + # 감사 로그 + from sqlalchemy import select as sel + last_log = (await db.execute( + sel(AuditLog).where(AuditLog.sr_id == body.sr_id).order_by(AuditLog.id.desc()).limit(1) + )).scalars().first() + prev_hash = last_log.log_hash if last_log else None + ts = datetime.now().isoformat() + db.add(AuditLog( + sr_id = body.sr_id, + actor = f"[그룹웨어]{body.approver}", + action = "SR_APPROVED" if body.action == "approved" else "SR_REJECTED", + detail = f"그룹웨어 결재: {body.action} | {body.comment or ''}", + prev_hash = prev_hash, + log_hash = compute_log_hash(prev_hash, body.approver, body.action, "", ts), + )) + + # 이력 갱신 + if body.sr_id in _approval_requests: + _approval_requests[body.sr_id]["status"] = body.action.upper() + _approval_requests[body.sr_id]["decided_at"] = datetime.utcnow().isoformat() + + await db.commit() + + return { + "message": f"SR {body.sr_id} — {body.action} 처리 완료", + "old_status": old_status, + "new_status": sr.status, + } + + +# ── 결재 현황 조회 ──────────────────────────────────────────────────────────── + +@router.get("/approvals") +async def list_approvals(_u: User = Depends(get_current_user)): + """그룹웨어 결재 발송 이력 조회.""" + return { + "enabled": bool(GROUPWARE_TYPE or KAKAO_TOKEN or NAVER_BOT_ID or HANCOM_URL or CUSTOM_URL), + "platform": GROUPWARE_TYPE or "미설정", + "approvals": list(_approval_requests.values()), + } + + +@router.get("/config") +async def groupware_config(_u: User = Depends(get_current_user)): + """그룹웨어 연동 설정 현황 (민감 정보 제외).""" + return { + "configured_platforms": [ + p for p, flag in [ + ("kakao", bool(KAKAO_TOKEN)), + ("naver", bool(NAVER_BOT_ID and NAVER_TOKEN)), + ("hancom", bool(HANCOM_URL)), + ("custom", bool(CUSTOM_URL)), + ] if flag + ], + "default_platform": GROUPWARE_TYPE or "none", + "callback_url": f"{os.getenv('GUARDIA_BASE_URL','http://localhost:8001')}/api/groupware/callback", + } diff --git a/itsm/routers/infra_ext.py b/itsm/routers/infra_ext.py new file mode 100644 index 00000000..87a4f1f6 --- /dev/null +++ b/itsm/routers/infra_ext.py @@ -0,0 +1,319 @@ +""" +인프라 확장 모듈: Zero Trust + Kubernetes + ERP 예산 + +1. Zero Trust 세션 재검증 (지속 인증) +2. Kubernetes 파드/서비스 모니터링 +3. ERP/예산 시스템 연동 (디지털예산회계/SAP) + +환경변수: + # Zero Trust + ZERO_TRUST_INTERVAL_MIN = 30 (세션 재검증 주기, 분) + + # Kubernetes + K8S_API_URL = https://kubernetes.default.svc + K8S_TOKEN = (ServiceAccount 토큰) + K8S_NAMESPACE= guardia + + # ERP + ERP_TYPE = digital_budget|sap|custom + ERP_BASE_URL = http://erp.agency.go.kr + ERP_API_KEY = ... +""" +from __future__ import annotations + +import logging +import os +from datetime import datetime +from typing import Optional + +import httpx +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel +from sqlalchemy.ext.asyncio import AsyncSession + +from core.auth import get_current_user +from database import get_db +from models import SiProject, User + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/api/infra", tags=["infra_ext"]) + +# ── Zero Trust 설정 ─────────────────────────────────────────────────────────── +ZT_INTERVAL = int(os.getenv("ZERO_TRUST_INTERVAL_MIN", "30")) +_session_registry: dict[str, dict] = {} # token → {last_verified, user, risk_score} + +# ── Kubernetes 설정 ─────────────────────────────────────────────────────────── +K8S_API_URL = os.getenv("K8S_API_URL", "") +K8S_TOKEN = os.getenv("K8S_TOKEN", "") +K8S_NAMESPACE = os.getenv("K8S_NAMESPACE", "guardia") +K8S_CA_CERT = os.getenv("K8S_CA_CERT", "") + +# ── ERP 설정 ────────────────────────────────────────────────────────────────── +ERP_TYPE = os.getenv("ERP_TYPE", "") +ERP_BASE = os.getenv("ERP_BASE_URL", "") +ERP_API_KEY = os.getenv("ERP_API_KEY", "") + + +# ═══════════════════════════════════════════════════════════ +# 1. ZERO TRUST 지속 인증 +# ═══════════════════════════════════════════════════════════ + +class ZTVerifyRequest(BaseModel): + risk_factors: Optional[list] = [] # 비정상 패턴 목록 + + +@router.post("/zero-trust/verify") +async def zero_trust_verify( + body: ZTVerifyRequest, + cu: User = Depends(get_current_user), +): + """Zero Trust 세션 재검증 — 주기적으로 호출하여 세션 유효성 확인.""" + now = datetime.utcnow() + username = cu.username + + # 위험 점수 계산 (0=정상, 100=최고위험) + risk_score = 0 + if body.risk_factors: + risk_score += len(body.risk_factors) * 15 + if risk_score > 100: + risk_score = 100 + + # 세션 등록/갱신 + _session_registry[username] = { + "last_verified": now.isoformat(), + "risk_score": risk_score, + "ip": "unknown", # 실제 구현 시 Request에서 추출 + } + + # 고위험 세션 → 강제 재인증 요구 + if risk_score >= 70: + raise HTTPException(403, "고위험 세션으로 감지되었습니다. 재인증이 필요합니다.") + + return { + "verified": True, + "risk_score": risk_score, + "next_verify_min":ZT_INTERVAL, + "message": "세션이 검증되었습니다." if risk_score < 30 else "세션이 검증되었습니다. (주의 수준)", + } + + +@router.get("/zero-trust/sessions") +async def zt_sessions(cu: User = Depends(get_current_user)): + """활성 세션 목록 (ADMIN 전용).""" + if cu.role != "ADMIN": + raise HTTPException(403, "ADMIN만 세션 목록을 조회할 수 있습니다.") + return { + "total_sessions": len(_session_registry), + "interval_min": ZT_INTERVAL, + "sessions": [ + {"username": u, **info} + for u, info in _session_registry.items() + ], + } + + +# ═══════════════════════════════════════════════════════════ +# 2. KUBERNETES 모니터링 +# ═══════════════════════════════════════════════════════════ + +def _k8s_headers() -> dict: + h: dict = {"Accept": "application/json"} + if K8S_TOKEN: + h["Authorization"] = f"Bearer {K8S_TOKEN}" + return h + + +async def _k8s_get(path: str) -> Optional[dict]: + if not K8S_API_URL: + return None + try: + async with httpx.AsyncClient(timeout=10.0, verify=False) as c: + r = await c.get(f"{K8S_API_URL}{path}", headers=_k8s_headers()) + return r.json() if r.status_code == 200 else None + except Exception as e: + logger.warning("K8s API 오류: %s", e) + return None + + +@router.get("/k8s/pods") +async def k8s_pods( + namespace: str = K8S_NAMESPACE, + _u: User = Depends(get_current_user), +): + """Kubernetes 파드 목록 및 상태.""" + if not K8S_API_URL: + return {"enabled": False, "message": "K8S_API_URL 미설정 — Kubernetes 연동 비활성화"} + + data = await _k8s_get(f"/api/v1/namespaces/{namespace}/pods") + if not data: + raise HTTPException(503, "Kubernetes API 응답 없음") + + pods = [] + for item in data.get("items", []): + meta = item.get("metadata", {}) + status = item.get("status", {}) + spec = item.get("spec", {}) + + containers = status.get("containerStatuses", []) + ready_cnt = sum(1 for c in containers if c.get("ready", False)) + total_cnt = len(containers) + + pod_phase = status.get("phase", "Unknown") + pod_status = "Running" if pod_phase == "Running" and ready_cnt == total_cnt else pod_phase + + pods.append({ + "name": meta.get("name"), + "namespace": meta.get("namespace"), + "status": pod_status, + "phase": pod_phase, + "ready": f"{ready_cnt}/{total_cnt}", + "restart_count": sum(c.get("restartCount", 0) for c in containers), + "node": spec.get("nodeName"), + "created_at": meta.get("creationTimestamp"), + "labels": meta.get("labels", {}), + }) + + running = sum(1 for p in pods if p["status"] == "Running") + return { + "enabled": True, + "namespace": namespace, + "total": len(pods), + "running": running, + "not_ready": len(pods) - running, + "pods": pods, + } + + +@router.get("/k8s/services") +async def k8s_services( + namespace: str = K8S_NAMESPACE, + _u: User = Depends(get_current_user), +): + """Kubernetes 서비스 목록.""" + if not K8S_API_URL: + return {"enabled": False} + + data = await _k8s_get(f"/api/v1/namespaces/{namespace}/services") + if not data: + raise HTTPException(503, "Kubernetes API 응답 없음") + + services = [ + { + "name": item["metadata"]["name"], + "type": item["spec"].get("type", "ClusterIP"), + "cluster_ip": item["spec"].get("clusterIP"), + "ports": item["spec"].get("ports", []), + "created_at": item["metadata"].get("creationTimestamp"), + } + for item in data.get("items", []) + ] + return {"enabled": True, "namespace": namespace, "total": len(services), "services": services} + + +@router.get("/k8s/nodes") +async def k8s_nodes(_u: User = Depends(get_current_user)): + """Kubernetes 노드 목록 및 리소스 사용량.""" + if not K8S_API_URL: + return {"enabled": False} + + data = await _k8s_get("/api/v1/nodes") + if not data: + raise HTTPException(503, "Kubernetes API 응답 없음") + + nodes = [] + for item in data.get("items", []): + meta = item.get("metadata", {}) + conds = item.get("status", {}).get("conditions", []) + ready = next((c for c in conds if c["type"] == "Ready"), {}) + cap = item.get("status", {}).get("capacity", {}) + + nodes.append({ + "name": meta.get("name"), + "ready": ready.get("status") == "True", + "cpu": cap.get("cpu"), + "memory": cap.get("memory"), + "pods": cap.get("pods"), + "os": item.get("status", {}).get("nodeInfo", {}).get("osImage", ""), + "version": item.get("status", {}).get("nodeInfo", {}).get("kubeletVersion", ""), + }) + + return {"enabled": True, "total": len(nodes), "ready": sum(1 for n in nodes if n["ready"]), "nodes": nodes} + + +# ═══════════════════════════════════════════════════════════ +# 3. ERP 예산 연동 +# ═══════════════════════════════════════════════════════════ + +async def _erp_get(path: str) -> Optional[dict]: + if not ERP_BASE: + return None + try: + headers: dict = {"Accept": "application/json"} + if ERP_API_KEY: + headers["Authorization"] = f"Bearer {ERP_API_KEY}" + async with httpx.AsyncClient(timeout=15.0) as c: + r = await c.get(f"{ERP_BASE}{path}", headers=headers) + return r.json() if r.status_code == 200 else None + except Exception as e: + logger.warning("ERP API 오류: %s", e) + return None + + +@router.get("/erp/budget/{project_code}") +async def erp_budget( + project_code: str, + db: AsyncSession = Depends(get_db), + _u: User = Depends(get_current_user), +): + """ERP 예산 데이터 조회 및 프로젝트와 동기화.""" + if not ERP_BASE: + return { + "enabled": False, + "message": "ERP_BASE_URL 미설정", + "fallback": "GUARDiA 내부 예산 데이터 사용 중", + } + + # ERP에서 예산 데이터 조회 + erp_data = await _erp_get(f"/api/budget/{project_code}") + + # GUARDiA 프로젝트 데이터 + from sqlalchemy import select as sel + proj = (await db.execute( + sel(SiProject).where(SiProject.project_code == project_code) + )).scalars().first() + + if not proj: + raise HTTPException(404, f"프로젝트 {project_code}를 찾을 수 없습니다.") + + if erp_data: + # ERP 데이터로 프로젝트 예산 갱신 + erp_total = erp_data.get("budget_total", 0) + erp_used = erp_data.get("budget_used", 0) + if erp_total and erp_total != proj.budget_total: + proj.budget_total = erp_total + proj.budget_used = erp_used + await db.commit() + + return { + "enabled": True, + "erp_type": ERP_TYPE, + "project_code": project_code, + "guardia_total": proj.budget_total, + "guardia_used": proj.budget_used, + "erp_data": erp_data, + "synced": bool(erp_data), + "sync_time": datetime.utcnow().isoformat(), + } + + +@router.get("/erp/status") +async def erp_status(_u: User = Depends(get_current_user)): + """ERP 연동 설정 현황.""" + return { + "enabled": bool(ERP_BASE), + "erp_type": ERP_TYPE or "미설정", + "base_url": ERP_BASE[:40] + "..." if ERP_BASE else "", + "endpoints": { + "budget": "/api/infra/erp/budget/{project_code}", + } + } diff --git a/itsm/routers/portfolio.py b/itsm/routers/portfolio.py new file mode 100644 index 00000000..a8c44c27 --- /dev/null +++ b/itsm/routers/portfolio.py @@ -0,0 +1,282 @@ +""" +포트폴리오 관리 + 리소스/인력 관리 API + +포트폴리오: + - 여러 SI 프로젝트 통합 현황 대시보드 + - KPI 집계 (전체 진척률 / 총 예산 / 위험 현황) + +리소스: + - 인원 배치 (역할·WBS·기간) + - 역량 매핑 (기술스택·경험) + - 인력 투입 현황 (월별 M/M) + +엔드포인트: + GET /api/portfolio/dashboard — 전체 프로젝트 포트폴리오 + GET /api/portfolio/kpi — 집계 KPI + GET /api/portfolio/projects/{id}/resources — 프로젝트 인원 배치 + POST /api/portfolio/projects/{id}/resources — 인원 배치 등록 + GET /api/portfolio/resources/availability — 가용 인력 조회 + GET /api/portfolio/resources/{user}/skills — 역량 정보 + POST /api/portfolio/resources/{user}/skills — 역량 등록 +""" +from __future__ import annotations + +import logging +from datetime import date, datetime +from typing import List, Optional + +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel +from sqlalchemy import select, func +from sqlalchemy.ext.asyncio import AsyncSession + +from core.auth import get_current_user +from database import get_db +from models import ( + SiProject, WbsItem, ProjectIssue, ProjectRisk, + User, UserRole, +) + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/api/portfolio", tags=["portfolio"]) + +# 인메모리 저장소 (운영 시 DB 테이블로 이전) +_resources: dict[int, list] = {} # project_id → [{user, role, wbs_code, mm, ...}] +_skills: dict[str, list] = {} # username → [{skill, level, years}] + + +# ── 포트폴리오 대시보드 ─────────────────────────────────────────────────────── + +@router.get("/dashboard") +async def portfolio_dashboard( + db: AsyncSession = Depends(get_db), + _u: User = Depends(get_current_user), +): + """전체 활성 SI 프로젝트 포트폴리오 현황.""" + projects = (await db.execute( + select(SiProject).where(SiProject.is_active == True) + .order_by(SiProject.planned_start.desc()) + )).scalars().all() + + result = [] + for proj in projects: + # WBS 완료율 + wbs = (await db.execute( + select(WbsItem).where(WbsItem.project_id == proj.id, WbsItem.is_leaf == True) + )).scalars().all() + progress = round(sum(w.completion_pct for w in wbs) / len(wbs), 1) if wbs else 0 + + # 미결 이슈 + open_issues = (await db.execute( + select(func.count(ProjectIssue.id)).where( + ProjectIssue.project_id == proj.id, + ProjectIssue.status.notin_(["RESOLVED", "CLOSED"]) + ) + )).scalar() or 0 + + # 고위험 + high_risks = (await db.execute( + select(func.count(ProjectRisk.id)).where( + ProjectRisk.project_id == proj.id, + ProjectRisk.risk_level.in_(["HIGH", "CRITICAL"]) + ) + )).scalar() or 0 + + # 일정 지연 여부 + today = date.today() + is_delayed = bool( + proj.planned_end and proj.planned_end < today and progress < 100 + ) + + result.append({ + "project_id": proj.id, + "project_code": proj.project_code, + "project_name": proj.project_name, + "phase": proj.phase, + "health": proj.health_status, + "progress": progress, + "budget_used_pct": round(proj.budget_used / proj.budget_total * 100, 1) + if proj.budget_total else 0, + "open_issues": open_issues, + "high_risks": high_risks, + "is_delayed": is_delayed, + "planned_end": str(proj.planned_end) if proj.planned_end else None, + "pm": proj.pm_user, + "resources": len(_resources.get(proj.id, [])), + }) + + return { + "total_projects": len(result), + "active": sum(1 for p in result if p["phase"] not in ("CLOSED",)), + "delayed": sum(1 for p in result if p["is_delayed"]), + "at_risk": sum(1 for p in result if p["high_risks"] > 0), + "projects": result, + } + + +@router.get("/kpi") +async def portfolio_kpi( + db: AsyncSession = Depends(get_db), + _u: User = Depends(get_current_user), +): + """포트폴리오 집계 KPI.""" + projects = (await db.execute( + select(SiProject).where(SiProject.is_active == True) + )).scalars().all() + + total_budget = sum((p.budget_total or 0) for p in projects) + total_used = sum((p.budget_used or 0) for p in projects) + + wbs_all = (await db.execute( + select(WbsItem).where(WbsItem.is_leaf == True) + )).scalars().all() + avg_progress = round( + sum(w.completion_pct for w in wbs_all) / len(wbs_all), 1 + ) if wbs_all else 0 + + open_issues = (await db.execute( + select(func.count(ProjectIssue.id)).where( + ProjectIssue.status.notin_(["RESOLVED", "CLOSED"]) + ) + )).scalar() or 0 + + return { + "total_projects": len(projects), + "avg_progress": avg_progress, + "total_budget_man": total_budget, + "total_used_man": total_used, + "budget_rate": round(total_used / total_budget * 100, 1) if total_budget else 0, + "open_issues": open_issues, + "by_phase": { + phase: sum(1 for p in projects if p.phase == phase) + for phase in ["INITIATION", "ANALYSIS", "DESIGN", "IMPLEMENTATION", "DELIVERY", "CLOSED"] + }, + "by_health": { + h: sum(1 for p in projects if p.health_status == h) + for h in ["GREEN", "YELLOW", "RED"] + }, + } + + +# ── 인원 배치 ───────────────────────────────────────────────────────────────── + +class ResourceCreate(BaseModel): + username: str + role: str = "DEVELOPER" # PM | ANALYST | DESIGNER | DEVELOPER | TESTER | DBA + wbs_codes: List[str] = [] # 담당 WBS 코드 목록 + start_date: date + end_date: date + mm: float = 1.0 # M/M (Man-Month) 투입율 + + +@router.get("/projects/{pid}/resources") +async def list_project_resources( + pid: int, + _u: User = Depends(get_current_user), +): + """프로젝트 인원 배치 현황.""" + return { + "project_id": pid, + "resources": _resources.get(pid, []), + "total_mm": sum(r["mm"] for r in _resources.get(pid, [])), + } + + +@router.post("/projects/{pid}/resources", status_code=201) +async def add_project_resource( + pid: int, + body: ResourceCreate, + db: AsyncSession = Depends(get_db), + cu: User = Depends(get_current_user), +): + """인원 배치 등록.""" + proj = await db.get(SiProject, pid) + if not proj: + raise HTTPException(404, f"프로젝트 {pid}를 찾을 수 없습니다.") + if cu.role not in (UserRole.ADMIN, UserRole.PM): + raise HTTPException(403, "PM/ADMIN만 인원을 배치할 수 있습니다.") + + entry = { + "username": body.username, + "role": body.role, + "wbs_codes": body.wbs_codes, + "start_date": str(body.start_date), + "end_date": str(body.end_date), + "mm": body.mm, + "added_by": cu.username, + "added_at": datetime.utcnow().isoformat(), + } + if pid not in _resources: + _resources[pid] = [] + _resources[pid].append(entry) + + return {"message": f"{body.username} 인원 배치 완료", "entry": entry} + + +@router.get("/resources/availability") +async def resource_availability( + start: Optional[str] = None, + end: Optional[str] = None, + _u: User = Depends(get_current_user), +): + """기간별 가용 인력 조회.""" + # 배치된 인력 집계 + allocated: dict[str, float] = {} + for pid, entries in _resources.items(): + for e in entries: + user = e["username"] + allocated[user] = allocated.get(user, 0) + e["mm"] + + return { + "allocated_users": [ + {"username": u, "total_mm": mm, "available_mm": max(0, 1.0 - mm)} + for u, mm in allocated.items() + ], + "note": "M/M 기반 가용성 — 1.0 = 전일 투입", + } + + +# ── 역량 관리 ───────────────────────────────────────────────────────────────── + +class SkillEntry(BaseModel): + skill: str + category: str = "TECH" # TECH | PM | DOMAIN | TOOL + level: str = "중급" # 초급 | 중급 | 고급 | 전문가 + years: int = 0 + certifications: List[str] = [] + + +@router.get("/resources/{username}/skills") +async def get_skills( + username: str, + _u: User = Depends(get_current_user), +): + """사용자 역량 정보 조회.""" + return { + "username": username, + "skills": _skills.get(username, []), + } + + +@router.post("/resources/{username}/skills", status_code=201) +async def add_skill( + username: str, + body: SkillEntry, + cu: User = Depends(get_current_user), +): + """역량 등록 (본인 또는 PM/ADMIN).""" + if cu.username != username and cu.role not in (UserRole.ADMIN, UserRole.PM): + raise HTTPException(403, "본인 또는 PM/ADMIN만 역량을 등록할 수 있습니다.") + + entry = {**body.model_dump(), "updated_at": datetime.utcnow().isoformat()} + if username not in _skills: + _skills[username] = [] + + # 동일 스킬 업데이트 + existing = next((i for i, s in enumerate(_skills[username]) if s["skill"] == body.skill), None) + if existing is not None: + _skills[username][existing] = entry + else: + _skills[username].append(entry) + + return {"message": f"{username} 역량 '{body.skill}' 등록 완료"} diff --git a/itsm/routers/siem.py b/itsm/routers/siem.py new file mode 100644 index 00000000..156e5efa --- /dev/null +++ b/itsm/routers/siem.py @@ -0,0 +1,300 @@ +""" +SIEM 연동 API (ELK/Splunk/OpenSearch) + +기능: + 1. GUARDiA 보안 이벤트 → SIEM 실시간 전송 + 2. SIEM 경보 → GUARDiA 인시던트 자동 생성 (역방향) + 3. 이벤트 조회 / 통계 + +환경변수: + SIEM_TYPE = elastic|splunk|opensearch|custom + ELASTIC_URL = http://elasticsearch:9200 + ELASTIC_INDEX = guardia-events + ELASTIC_API_KEY = ... + SPLUNK_HEC_URL = http://splunk:8088/services/collector + SPLUNK_HEC_TOKEN = ... + OPENSEARCH_URL = http://opensearch:9200 +""" +from __future__ import annotations + +import json +import logging +import os +from datetime import datetime +from typing import Any, Optional + +import httpx +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request +from pydantic import BaseModel +from sqlalchemy.ext.asyncio import AsyncSession + +from core.auth import get_current_user +from database import get_db +from models import User + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/api/siem", tags=["siem"]) + +SIEM_TYPE = os.getenv("SIEM_TYPE", "") +ELASTIC_URL = os.getenv("ELASTIC_URL", "") +ELASTIC_INDEX = os.getenv("ELASTIC_INDEX", "guardia-events") +ELASTIC_API_KEY = os.getenv("ELASTIC_API_KEY", "") +SPLUNK_HEC_URL = os.getenv("SPLUNK_HEC_URL", "") +SPLUNK_HEC_TOKEN= os.getenv("SPLUNK_HEC_TOKEN", "") +OPENSEARCH_URL = os.getenv("OPENSEARCH_URL", "") + +# 이벤트 버퍼 (운영 시 Redis Queue로 전환) +_event_buffer: list[dict] = [] +MAX_BUFFER = 1000 + + +# ── 이벤트 스키마 ──────────────────────────────────────────────────────────── + +class SecurityEvent(BaseModel): + event_type: str # LOGIN_FAIL | PRIVILEGE_ESCALATION | VULN_DETECTED | etc + severity: str = "INFO" # INFO | LOW | MEDIUM | HIGH | CRITICAL + source: str = "GUARDiA" + user: Optional[str] = None + resource: Optional[str] = None + action: Optional[str] = None + description: Optional[str] = None + metadata: Optional[dict] = None + + +class SIEMAlertRequest(BaseModel): + """SIEM에서 역방향으로 보내는 경보.""" + alert_id: str + rule_name: str + severity: str + description: str + source_ip: Optional[str] = None + affected: Optional[str] = None + metadata: Optional[dict] = None + + +# ── SIEM 별 전송 ───────────────────────────────────────────────────────────── + +async def _send_elastic(events: list[dict]) -> bool: + if not ELASTIC_URL: + return False + bulk_body = "" + for ev in events: + bulk_body += json.dumps({"index": {"_index": ELASTIC_INDEX}}) + "\n" + bulk_body += json.dumps(ev) + "\n" + try: + headers: dict = {"Content-Type": "application/x-ndjson"} + if ELASTIC_API_KEY: + headers["Authorization"] = f"ApiKey {ELASTIC_API_KEY}" + async with httpx.AsyncClient(timeout=10.0) as c: + r = await c.post(f"{ELASTIC_URL}/_bulk", content=bulk_body, headers=headers) + return r.status_code in (200, 201) + except Exception as e: + logger.warning("Elasticsearch 전송 실패: %s", e) + return False + + +async def _send_splunk(events: list[dict]) -> bool: + if not SPLUNK_HEC_URL or not SPLUNK_HEC_TOKEN: + return False + payload = "\n".join(json.dumps({"event": ev, "sourcetype": "guardia"}) for ev in events) + try: + async with httpx.AsyncClient(timeout=10.0) as c: + r = await c.post( + SPLUNK_HEC_URL, + content=payload, + headers={"Authorization": f"Splunk {SPLUNK_HEC_TOKEN}", "Content-Type": "application/json"}, + ) + return r.status_code == 200 + except Exception as e: + logger.warning("Splunk HEC 전송 실패: %s", e) + return False + + +async def _send_opensearch(events: list[dict]) -> bool: + if not OPENSEARCH_URL: + return False + bulk_body = "" + for ev in events: + bulk_body += json.dumps({"index": {"_index": ELASTIC_INDEX}}) + "\n" + bulk_body += json.dumps(ev) + "\n" + try: + async with httpx.AsyncClient(timeout=10.0) as c: + r = await c.post( + f"{OPENSEARCH_URL}/_bulk", + content=bulk_body, + headers={"Content-Type": "application/x-ndjson"}, + ) + return r.status_code in (200, 201) + except Exception as e: + logger.warning("OpenSearch 전송 실패: %s", e) + return False + + +async def send_to_siem(events: list[dict]) -> bool: + """SIEM 유형에 따라 이벤트 전송.""" + t = SIEM_TYPE.lower() + if t == "elastic": + return await _send_elastic(events) + elif t == "splunk": + return await _send_splunk(events) + elif t == "opensearch": + return await _send_opensearch(events) + elif ELASTIC_URL: + return await _send_elastic(events) + elif SPLUNK_HEC_URL: + return await _send_splunk(events) + elif OPENSEARCH_URL: + return await _send_opensearch(events) + else: + logger.debug("SIEM 미설정 — 이벤트 버퍼에만 저장") + return False + + +def _build_event(ev: SecurityEvent, actor: str = "system") -> dict: + return { + "@timestamp": datetime.utcnow().isoformat() + "Z", + "event_type": ev.event_type, + "severity": ev.severity, + "source": ev.source, + "user": ev.user or actor, + "resource": ev.resource, + "action": ev.action, + "description": ev.description, + "metadata": ev.metadata or {}, + "tags": ["guardia", "itsm"], + } + + +# ── 이벤트 발송 API ─────────────────────────────────────────────────────────── + +@router.post("/events") +async def push_event( + body: SecurityEvent, + bg: BackgroundTasks, + _u: User = Depends(get_current_user), +): + """보안 이벤트를 SIEM으로 전송.""" + ev = _build_event(body, _u.username) + + # 버퍼에 저장 + _event_buffer.append(ev) + if len(_event_buffer) > MAX_BUFFER: + _event_buffer.pop(0) + + # SIEM 전송 (백그라운드) + bg.add_task(send_to_siem, [ev]) + + return { + "message": "이벤트가 전송되었습니다.", + "event_type": ev["event_type"], + "severity": ev["severity"], + } + + +@router.post("/events/batch") +async def push_events_batch( + events: list[SecurityEvent], + bg: BackgroundTasks, + _u: User = Depends(get_current_user), +): + """여러 이벤트를 일괄 전송.""" + if len(events) > 100: + raise HTTPException(400, "한 번에 최대 100개까지 전송 가능합니다.") + + evs = [_build_event(e, _u.username) for e in events] + _event_buffer.extend(evs[-MAX_BUFFER:]) + if len(_event_buffer) > MAX_BUFFER: + _event_buffer[:] = _event_buffer[-MAX_BUFFER:] + + bg.add_task(send_to_siem, evs) + return {"message": f"{len(evs)}개 이벤트 전송", "count": len(evs)} + + +# ── SIEM 역방향 경보 수신 ──────────────────────────────────────────────────── + +@router.post("/alert/receive") +async def receive_siem_alert( + body: SIEMAlertRequest, + db: AsyncSession = Depends(get_db), +): + """SIEM 경보 수신 → GUARDiA 인시던트 자동 생성 (ADMIN 인증 불필요 — webhook).""" + from models import Incident, IncidentGrade, IncidentStatus + from uuid import uuid4 + + grade_map = {"CRITICAL": IncidentGrade.P1, "HIGH": IncidentGrade.P2, + "MEDIUM": IncidentGrade.P3, "LOW": IncidentGrade.P4} + grade = grade_map.get(body.severity.upper(), IncidentGrade.P3) + + incident = Incident( + incident_id = f"INC-{datetime.now().strftime('%Y%m%d')}-{str(uuid4())[:6].upper()}", + title = f"[SIEM] {body.rule_name}", + description = ( + f"SIEM 경보 자동 수신\n" + f"규칙: {body.rule_name}\n" + f"설명: {body.description}\n" + f"소스 IP: {body.source_ip or '미상'}\n" + f"영향 자산: {body.affected or '미상'}" + ), + grade = grade, + status = IncidentStatus.OPEN, + occurred_at = datetime.now(), + affected_service = body.affected, + reported_by = f"SIEM:{body.alert_id}", + ) + db.add(incident) + await db.commit() + + # P1/P2 즉시 알림 + if grade in (IncidentGrade.P1, IncidentGrade.P2): + try: + from core.notify import send_messenger + import os as _os + await send_messenger( + _os.getenv("MESSENGER_OPS_ROOM", "ops"), + {"type": "text", "text": f"🚨 SIEM 경보 [{grade}]\n{body.rule_name}\n{body.description[:200]}"} + ) + except Exception: + pass + + return { + "message": f"인시던트 {incident.incident_id} 자동 생성", + "incident_id": incident.incident_id, + "grade": grade, + } + + +# ── 이벤트 조회 / 통계 ─────────────────────────────────────────────────────── + +@router.get("/events") +async def list_events( + limit: int = 50, + severity: Optional[str] = None, + _u: User = Depends(get_current_user), +): + """최근 보안 이벤트 목록 (버퍼에서 조회).""" + events = _event_buffer[-limit:][::-1] + if severity: + events = [e for e in events if e.get("severity", "").upper() == severity.upper()] + return {"total": len(_event_buffer), "events": events[:limit]} + + +@router.get("/stats") +async def siem_stats(_u: User = Depends(get_current_user)): + """SIEM 연동 현황 통계.""" + by_sev: dict = {} + by_type: dict = {} + for ev in _event_buffer: + sev = ev.get("severity", "INFO") + etype= ev.get("event_type", "UNKNOWN") + by_sev[sev] = by_sev.get(sev, 0) + 1 + by_type[etype] = by_type.get(etype, 0) + 1 + + return { + "siem_type": SIEM_TYPE or "미설정", + "elastic_url": ELASTIC_URL[:30] + "..." if ELASTIC_URL else "", + "splunk_url": SPLUNK_HEC_URL[:30] + "..." if SPLUNK_HEC_URL else "", + "total_events": len(_event_buffer), + "by_severity": by_sev, + "by_type": by_type, + "configured": bool(ELASTIC_URL or SPLUNK_HEC_URL or OPENSEARCH_URL), + } diff --git a/itsm/routers/topology.py b/itsm/routers/topology.py new file mode 100644 index 00000000..5158b957 --- /dev/null +++ b/itsm/routers/topology.py @@ -0,0 +1,317 @@ +""" +네트워크 토폴로지 시각화 API + +CMDB CI 의존관계를 D3.js force-directed graph 형식으로 반환. +프론트엔드에서 /api/topology/graph 데이터를 받아 D3.js로 렌더링. + +엔드포인트: + GET /api/topology/graph — 전체 CI 의존관계 그래프 (nodes/links) + GET /api/topology/graph/{ci_id} — 특정 CI 중심 서브그래프 + GET /api/topology/health — 서버별 헬스 오버레이 데이터 + GET /api/topology/page — D3.js 토폴로지 뷰어 HTML +""" +from __future__ import annotations + +import logging +from typing import Optional + +from fastapi import APIRouter, Depends, HTTPException, Query +from fastapi.responses import HTMLResponse +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from core.auth import get_current_user +from database import get_db +from models import ConfigItem, CIRelation, Server, User + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/api/topology", tags=["topology"]) + +# 노드 타입별 색상 +NODE_COLORS = { + "SERVER": "#60a5fa", "WAS": "#34d399", "DB": "#f59e0b", + "NETWORK": "#a78bfa", "STORAGE": "#fb923c","LOAD_BALANCER": "#f472b6", + "FIREWALL": "#f87171","CDN": "#6ee7b7", "DEFAULT": "#94a3b8", +} + + +async def _build_graph(db: AsyncSession, root_ci_id: Optional[int] = None, + max_depth: int = 3) -> dict: + """CMDB CI 관계에서 그래프 데이터 생성.""" + # 전체 CI 조회 + if root_ci_id: + # BFS로 root에서 max_depth 깊이까지 + visited = set() + queue = [root_ci_id] + ci_ids = set() + depth = {root_ci_id: 0} + + while queue: + current = queue.pop(0) + if current in visited: + continue + visited.add(current) + ci_ids.add(current) + + if depth.get(current, 0) >= max_depth: + continue + + rels = (await db.execute( + select(CIRelation).where( + (CIRelation.from_ci_id == current) | (CIRelation.to_ci_id == current) + ) + )).scalars().all() + + for rel in rels: + nxt = rel.to_ci_id if rel.from_ci_id == current else rel.from_ci_id + if nxt not in visited: + queue.append(nxt) + depth[nxt] = depth.get(current, 0) + 1 + + cis = (await db.execute(select(ConfigItem).where(ConfigItem.id.in_(ci_ids)))).scalars().all() + rels = (await db.execute( + select(CIRelation).where( + CIRelation.from_ci_id.in_(ci_ids), CIRelation.to_ci_id.in_(ci_ids) + ) + )).scalars().all() + else: + cis = (await db.execute(select(ConfigItem).limit(200))).scalars().all() + rels = (await db.execute(select(CIRelation).limit(500))).scalars().all() + + # 서버 헬스 데이터 + server_status: dict = {} + if cis: + servers = (await db.execute(select(Server))).scalars().all() + for s in servers: + server_status[s.server_name] = s.status if hasattr(s, "status") else "ACTIVE" + + # nodes + nodes = [] + for ci in cis: + ci_type = (ci.ci_type or "DEFAULT").upper() + color = NODE_COLORS.get(ci_type, NODE_COLORS["DEFAULT"]) + srv_stat = server_status.get(ci.name, "UNKNOWN") + nodes.append({ + "id": ci.id, + "name": ci.name, + "type": ci_type, + "category": ci.category or "", + "status": ci.status or "ACTIVE", + "color": color, + "health": srv_stat, + "owner": ci.owner or "", + "is_root": ci.id == root_ci_id, + }) + + # links + rel_type_labels = { + "DEPENDS_ON": "의존", + "RUNS_ON": "실행", + "CONNECTS_TO": "연결", + "BACKS_UP": "백업", + "MONITORS": "모니터", + } + links = [ + { + "source": r.from_ci_id, + "target": r.to_ci_id, + "type": r.relation_type if hasattr(r, "relation_type") else "CONNECTS_TO", + "label": rel_type_labels.get( + r.relation_type if hasattr(r, "relation_type") else "", "연결" + ), + } + for r in rels + ] + + return { + "nodes": nodes, + "links": links, + "node_count": len(nodes), + "link_count": len(links), + "root_ci_id": root_ci_id, + } + + +@router.get("/graph") +async def full_graph( + db: AsyncSession = Depends(get_db), + _u: User = Depends(get_current_user), +): + """전체 CI 의존관계 그래프.""" + return await _build_graph(db) + + +@router.get("/graph/{ci_id}") +async def subgraph( + ci_id: int, + depth: int = Query(2, ge=1, le=4), + db: AsyncSession = Depends(get_db), + _u: User = Depends(get_current_user), +): + """특정 CI 중심 서브그래프.""" + ci = await db.get(ConfigItem, ci_id) + if not ci: + raise HTTPException(404, f"CI ID {ci_id}를 찾을 수 없습니다.") + return await _build_graph(db, root_ci_id=ci_id, max_depth=depth) + + +@router.get("/health") +async def topology_health( + db: AsyncSession = Depends(get_db), + _u: User = Depends(get_current_user), +): + """서버 헬스 오버레이 데이터 (실시간).""" + servers = (await db.execute(select(Server))).scalars().all() + return { + "servers": [ + { + "name": s.server_name, + "status": getattr(s, "status", "UNKNOWN"), + "os": s.os_type if hasattr(s, "os_type") else "", + "inst": s.inst_id, + } + for s in servers + ] + } + + +@router.get("/page", response_class=HTMLResponse) +async def topology_page(_u: User = Depends(get_current_user)): + """D3.js 인터랙티브 토폴로지 뷰어.""" + html = """ + +
+ +