guardia-itsm/routers/n2sf.py
2026-06-03 08:04:03 +09:00

144 lines
6.5 KiB
Python

"""N²SF — 국가 망 보안체계 준수 점검 (국정원 2026 의무)"""
from __future__ import annotations
import json, logging
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from sqlalchemy import select, desc
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user, require_admin_role
from database import get_db
from models import User, N2SFAssessment, N2SFZone
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router that collects the endpoints defined in this module; every route path
# is prefixed with /api/n2sf and tagged "N²SF".
router = APIRouter(prefix="/api/n2sf", tags=["N²SF"])
# N²SF three-tier security zone definitions, keyed by zone code.
# Each entry holds a display label, a hex color and the zone's requirement
# list. The Korean strings are runtime data returned to API clients.
ZONES = {
    "A": {
        "label": "고위험 (개인정보·국가기밀)",
        "color": "#EF4444",
        "requirements": [
            "완전 망 분리",
            "AES-256 암호화",
            "접근 로그 100% 기록",
            "2FA 필수",
            "USB 차단",
            "화면보호기 5분 이내",
        ],
    },
    "B": {
        "label": "중위험 (일반 업무 데이터)",
        "color": "#F59E0B",
        "requirements": [
            "논리적 망 분리",
            "TLS 1.2 이상",
            "접근 로그 기록",
            "MFA 권고",
            "바이러스 백신 필수",
        ],
    },
    "C": {
        "label": "저위험 (공개 정보)",
        "color": "#10B981",
        "requirements": [
            "기본 방화벽",
            "HTTPS 적용",
            "정기 보안 패치",
        ],
    },
}
# Assessment checklist items (abridged: the original note cites 50 items,
# 13 are defined here). Each row is (id, zone, category, item) and is expanded
# into a dict with exactly those keys, in that order.
CHECKLIST_ITEMS = [
    dict(zip(("id", "zone", "category", "item"), row))
    for row in (
        ("N001", "A", "망 분리", "개인정보 처리 시스템 물리적 망 분리"),
        ("N002", "A", "접근 통제", "특권 계정 2FA 인증 적용"),
        ("N003", "A", "암호화", "저장 데이터 AES-256 이상 암호화"),
        ("N004", "A", "감사 로그", "접근 로그 1년 이상 보존"),
        ("N005", "A", "미디어 통제", "USB 등 이동식 미디어 차단"),
        ("N006", "B", "망 분리", "업무 시스템 논리적 네트워크 분리"),
        ("N007", "B", "암호화", "전송 데이터 TLS 1.2 이상"),
        ("N008", "B", "접근 통제", "최소 권한 원칙 적용"),
        ("N009", "B", "보안 패치", "OS·미들웨어 정기 패치 (30일 이내)"),
        ("N010", "B", "바이러스 백신", "모든 단말 백신 설치 및 최신화"),
        ("N011", "C", "방화벽", "불필요 포트 차단"),
        ("N012", "C", "웹 보안", "HTTPS 적용 및 인증서 유효"),
        ("N013", "C", "취약점", "정기 취약점 점검 (분기 1회)"),
    )
]
class ZoneClassify(BaseModel):
    """Request body for POST /classify: traits of the system to be zoned."""

    # Name of the system; echoed back in the classification response.
    system_name: str

    # If either of these two flags is true the system is placed in zone "A".
    handles_personal_info: bool = False
    handles_classified: bool = False

    # Consulted only when neither flag above is set:
    # internet-facing -> zone "C", otherwise zone "B".
    internet_facing: bool = False
class AssessmentCreate(BaseModel):
    """Request body for POST /assess: one checklist submission for a system."""

    # Name of the system being assessed.
    system_name: str

    # Zone code the system is assessed against: "A", "B" or "C".
    zone: str

    # IDs of the checklist items reported as satisfied, e.g. ["N001", "N002"].
    checked_items: list = []

    # Free-form remarks stored alongside the assessment.
    notes: str = ""
@router.get("/policies")
async def list_policies(user: User = Depends(get_current_user)):
    """Return every zone policy as a flat list.

    Each element is a copy of the zone's ``ZONES`` entry with the zone code
    added under the ``"zone"`` key. Access goes through the
    ``get_current_user`` dependency; ``user`` itself is not used.
    """
    policies = []
    for code, spec in ZONES.items():
        entry = {"zone": code}
        entry.update(spec)
        policies.append(entry)
    return policies
@router.post("/classify")
async def classify_zone(body: ZoneClassify, user: User = Depends(get_current_user)):
    """Recommend an N²SF zone from the system's characteristics.

    Rules, in priority order:
      * handles classified or personal information -> zone "A"
      * otherwise, internet-facing                  -> zone "C"
      * otherwise                                   -> zone "B"

    Returns:
        A dict with the system name, the recommended zone code, and that
        zone's label and requirement list taken from ``ZONES``.
    """
    sensitive = body.handles_personal_info or body.handles_classified
    if sensitive:
        code = "A"
    else:
        code = "C" if body.internet_facing else "B"

    policy = ZONES[code]
    return {
        "system": body.system_name,
        "recommended_zone": code,
        "zone_label": policy["label"],
        "requirements": policy["requirements"],
    }
@router.get("/checklist")
async def get_checklist(zone: Optional[str] = None, user: User = Depends(get_current_user)):
    """Return the checklist, optionally restricted to one zone.

    Args:
        zone: Zone code to filter by, compared case-insensitively. When it is
            omitted or empty the whole checklist is returned.

    Returns:
        A dict with ``total`` (number of items returned) and ``items``.
    """
    if not zone:
        selected = CHECKLIST_ITEMS
    else:
        wanted = zone.upper()
        selected = [entry for entry in CHECKLIST_ITEMS if entry["zone"] == wanted]
    return {"total": len(selected), "items": selected}
@router.post("/assess")
async def create_assessment(body: AssessmentCreate, db: AsyncSession = Depends(get_db),
                            user: User = Depends(get_current_user)):
    """Score a checklist submission for one system and persist the result.

    The score is the percentage (rounded to an integer) of the zone's
    checklist items whose IDs appear in ``body.checked_items``. IDs that do
    not belong to the zone are ignored for scoring but are still stored in
    the serialized ``checked_items`` column.

    Args:
        body: Submission payload. ``zone`` is matched case-insensitively, the
            same way the checklist endpoint treats its ``zone`` filter.
        db: Async database session.
        user: Authenticated user; recorded as the assessor.

    Returns:
        A dict with the new assessment id, the score, passed/total counts and
        a grade: "적합" for 80 and above, "보완 필요" for 60-79, else "부적합".

    Raises:
        HTTPException: 422 when ``body.zone`` is not a known zone code.
    """
    # Normalize and validate the zone first. Without this, an unknown or
    # lower-case code would match no checklist items and a meaningless
    # zero-score assessment would be stored.
    zone = body.zone.upper()
    if zone not in ZONES:
        raise HTTPException(
            status_code=422,
            detail=f"Unknown zone {body.zone!r}; expected one of: {', '.join(ZONES)}",
        )

    # Checklist IDs are strings, so only string entries can ever match;
    # filtering keeps unhashable junk in the untyped list from breaking the set.
    checked = {entry for entry in body.checked_items if isinstance(entry, str)}

    zone_item_ids = [item["id"] for item in CHECKLIST_ITEMS if item["zone"] == zone]
    total = len(zone_item_ids)
    passed = sum(1 for item_id in zone_item_ids if item_id in checked)
    score = round(passed / total * 100) if total > 0 else 0

    assessment = N2SFAssessment(
        system_name=body.system_name,
        zone=zone,
        total_items=total,
        passed_items=passed,
        score=score,
        checked_items=json.dumps(body.checked_items),
        notes=body.notes,
        assessed_by=user.id,
        created_at=datetime.utcnow(),
    )
    db.add(assessment)
    await db.commit()
    # Reload so database-generated fields (the id) are populated.
    await db.refresh(assessment)

    if score >= 80:
        grade = "적합"
    elif score >= 60:
        grade = "보완 필요"
    else:
        grade = "부적합"

    return {
        "assessment_id": assessment.id,
        "score": score,
        "passed": passed,
        "total": total,
        "grade": grade,
    }
@router.get("/report")
async def get_report(limit: int = 10, db: AsyncSession = Depends(get_db),
                     user: User = Depends(get_current_user)):
    """Return the most recent assessments, newest first.

    Args:
        limit: Maximum number of assessments to return (default 10). Must not
            be negative.
        db: Async database session.
        user: Authenticated user (access gate only; not otherwise used).

    Returns:
        A list of dicts with id, system name, zone, score, passed/total
        counts, grade and creation time. The grade is "적합" for 80 and
        above, "보완 필요" for 60-79, otherwise "부적합".

    Raises:
        HTTPException: 422 when ``limit`` is negative.
    """
    # A negative LIMIT is backend-dependent (rejected by some databases,
    # treated as "no limit" by others), so refuse it up front.
    if limit < 0:
        raise HTTPException(status_code=422, detail="limit must be zero or greater")

    result = await db.execute(
        select(N2SFAssessment)
        .order_by(desc(N2SFAssessment.created_at))
        .limit(limit)
    )

    report = []
    for row in result.scalars().all():
        if row.score >= 80:
            grade = "적합"
        elif row.score >= 60:
            grade = "보완 필요"
        else:
            grade = "부적합"
        report.append({
            "id": row.id,
            "system_name": row.system_name,
            "zone": row.zone,
            "score": row.score,
            "passed": row.passed_items,
            "total": row.total_items,
            "grade": grade,
            "created_at": row.created_at,
        })
    return report
@router.get("/zones")
async def list_zones(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)):
    """List zones with their display labels.

    When ``N2SFZone`` rows exist they are returned ordered by zone code, each
    labelled from ``ZONES`` (empty label for a code ``ZONES`` does not know).
    When the table is empty, the built-in ``ZONES`` entries are returned
    instead.

    NOTE: ``system_count`` is hard-coded to 0 in both branches.
    """
    result = await db.execute(select(N2SFZone).order_by(N2SFZone.zone))
    stored = result.scalars().all()

    if stored:
        return [
            {
                "zone": row.zone,
                "label": ZONES.get(row.zone, {}).get("label", ""),
                "system_count": 0,
            }
            for row in stored
        ]

    # Nothing stored yet: fall back to the static zone table.
    return [
        {"zone": code, "label": spec["label"], "system_count": 0}
        for code, spec in ZONES.items()
    ]
@router.get("/violations")
async def list_violations(db: AsyncSession = Depends(get_db),
                          user: User = Depends(require_admin_role)):
    """Return every assessment scoring below 60, newest first.

    Access goes through the ``require_admin_role`` dependency. Each element
    carries the system name, zone, score and creation time.
    """
    failing = (
        select(N2SFAssessment)
        .where(N2SFAssessment.score < 60)
        .order_by(desc(N2SFAssessment.created_at))
    )
    result = await db.execute(failing)

    found = []
    for row in result.scalars().all():
        found.append({
            "system": row.system_name,
            "zone": row.zone,
            "score": row.score,
            "created_at": row.created_at,
        })
    return found