"""N²SF — 국가 망 보안체계 준수 점검 (국정원 2026 의무)""" from __future__ import annotations import json, logging from datetime import datetime from typing import Optional from fastapi import APIRouter, Depends, HTTPException from fastapi.responses import HTMLResponse from pydantic import BaseModel from sqlalchemy import select, desc from sqlalchemy.ext.asyncio import AsyncSession from core.auth import get_current_user, require_admin_role from database import get_db from models import User, N2SFAssessment, N2SFZone logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/n2sf", tags=["N²SF"]) # N²SF 3단계 보안 구역 정의 ZONES = { "A": {"label": "고위험 (개인정보·국가기밀)", "color": "#EF4444", "requirements": ["완전 망 분리", "AES-256 암호화", "접근 로그 100% 기록", "2FA 필수", "USB 차단", "화면보호기 5분 이내"]}, "B": {"label": "중위험 (일반 업무 데이터)", "color": "#F59E0B", "requirements": ["논리적 망 분리", "TLS 1.2 이상", "접근 로그 기록", "MFA 권고", "바이러스 백신 필수"]}, "C": {"label": "저위험 (공개 정보)", "color": "#10B981", "requirements": ["기본 방화벽", "HTTPS 적용", "정기 보안 패치"]}, } # 평가 항목 50개 (축약) CHECKLIST_ITEMS = [ {"id": "N001", "zone": "A", "category": "망 분리", "item": "개인정보 처리 시스템 물리적 망 분리"}, {"id": "N002", "zone": "A", "category": "접근 통제", "item": "특권 계정 2FA 인증 적용"}, {"id": "N003", "zone": "A", "category": "암호화", "item": "저장 데이터 AES-256 이상 암호화"}, {"id": "N004", "zone": "A", "category": "감사 로그", "item": "접근 로그 1년 이상 보존"}, {"id": "N005", "zone": "A", "category": "미디어 통제", "item": "USB 등 이동식 미디어 차단"}, {"id": "N006", "zone": "B", "category": "망 분리", "item": "업무 시스템 논리적 네트워크 분리"}, {"id": "N007", "zone": "B", "category": "암호화", "item": "전송 데이터 TLS 1.2 이상"}, {"id": "N008", "zone": "B", "category": "접근 통제", "item": "최소 권한 원칙 적용"}, {"id": "N009", "zone": "B", "category": "보안 패치", "item": "OS·미들웨어 정기 패치 (30일 이내)"}, {"id": "N010", "zone": "B", "category": "바이러스 백신", "item": "모든 단말 백신 설치 및 최신화"}, {"id": "N011", "zone": "C", "category": "방화벽", "item": "불필요 포트 차단"}, {"id": "N012", "zone": "C", "category": "웹 보안", "item": "HTTPS 적용 및 인증서 유효"}, {"id": "N013", "zone": "C", "category": "취약점", "item": "정기 취약점 점검 (분기 1회)"}, ] class ZoneClassify(BaseModel): system_name: str handles_personal_info: bool = False handles_classified: bool = False internet_facing: bool = False class AssessmentCreate(BaseModel): system_name: str zone: str # A|B|C checked_items: list = [] # ["N001", "N002", ...] notes: str = "" @router.get("/policies") async def list_policies(user: User = Depends(get_current_user)): return [{"zone": z, **info} for z, info in ZONES.items()] @router.post("/classify") async def classify_zone(body: ZoneClassify, user: User = Depends(get_current_user)): """시스템 특성에 따른 N²SF 구역 자동 분류.""" if body.handles_classified or body.handles_personal_info: zone = "A" elif not body.internet_facing: zone = "B" else: zone = "C" return { "system": body.system_name, "recommended_zone": zone, "zone_label": ZONES[zone]["label"], "requirements": ZONES[zone]["requirements"], } @router.get("/checklist") async def get_checklist(zone: Optional[str] = None, user: User = Depends(get_current_user)): items = CHECKLIST_ITEMS if zone: items = [i for i in items if i["zone"] == zone.upper()] return {"total": len(items), "items": items} @router.post("/assess") async def create_assessment(body: AssessmentCreate, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): zone_items = [i for i in CHECKLIST_ITEMS if i["zone"] == body.zone] total = len(zone_items) passed = len([i for i in zone_items if i["id"] in body.checked_items]) score = round(passed / total * 100) if total > 0 else 0 assessment = N2SFAssessment( system_name=body.system_name, zone=body.zone, total_items=total, passed_items=passed, score=score, checked_items=json.dumps(body.checked_items), notes=body.notes, assessed_by=user.id, created_at=datetime.utcnow() ) db.add(assessment); await db.commit(); await db.refresh(assessment) return {"assessment_id": assessment.id, "score": score, "passed": passed, "total": total, "grade": "적합" if score >= 80 else "보완 필요" if score >= 60 else "부적합"} @router.get("/report") async def get_report(limit: int = 10, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): rows = await db.execute( select(N2SFAssessment).order_by(desc(N2SFAssessment.created_at)).limit(limit) ) assessments = rows.scalars().all() return [{"id": a.id, "system_name": a.system_name, "zone": a.zone, "score": a.score, "passed": a.passed_items, "total": a.total_items, "grade": "적합" if a.score >= 80 else "보완 필요" if a.score >= 60 else "부적합", "created_at": a.created_at} for a in assessments] @router.get("/zones") async def list_zones(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): rows = await db.execute(select(N2SFZone).order_by(N2SFZone.zone)) zones = rows.scalars().all() if not zones: return [{"zone": z, "label": info["label"], "system_count": 0} for z, info in ZONES.items()] return [{"zone": z.zone, "label": ZONES.get(z.zone, {}).get("label", ""), "system_count": 0} for z in zones] @router.get("/violations") async def list_violations(db: AsyncSession = Depends(get_db), user: User = Depends(require_admin_role)): rows = await db.execute( select(N2SFAssessment).where(N2SFAssessment.score < 60).order_by(desc(N2SFAssessment.created_at)) ) violations = rows.scalars().all() return [{"system": v.system_name, "zone": v.zone, "score": v.score, "created_at": v.created_at} for v in violations]