144 lines
6.5 KiB
Python
144 lines
6.5 KiB
Python
"""N²SF — 국가 망 보안체계 준수 점검 (국정원 2026 의무)"""
|
|
from __future__ import annotations
|
|
import json, logging
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from fastapi.responses import HTMLResponse
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import select, desc
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from core.auth import get_current_user, require_admin_role
|
|
from database import get_db
|
|
from models import User, N2SFAssessment, N2SFZone
|
|
|
|
logger = logging.getLogger(__name__)
|
|
router = APIRouter(prefix="/api/n2sf", tags=["N²SF"])
|
|
|
|
# N²SF 3단계 보안 구역 정의
|
|
ZONES = {
|
|
"A": {"label": "고위험 (개인정보·국가기밀)", "color": "#EF4444",
|
|
"requirements": ["완전 망 분리", "AES-256 암호화", "접근 로그 100% 기록",
|
|
"2FA 필수", "USB 차단", "화면보호기 5분 이내"]},
|
|
"B": {"label": "중위험 (일반 업무 데이터)", "color": "#F59E0B",
|
|
"requirements": ["논리적 망 분리", "TLS 1.2 이상", "접근 로그 기록",
|
|
"MFA 권고", "바이러스 백신 필수"]},
|
|
"C": {"label": "저위험 (공개 정보)", "color": "#10B981",
|
|
"requirements": ["기본 방화벽", "HTTPS 적용", "정기 보안 패치"]},
|
|
}
|
|
|
|
# 평가 항목 50개 (축약)
|
|
CHECKLIST_ITEMS = [
|
|
{"id": "N001", "zone": "A", "category": "망 분리", "item": "개인정보 처리 시스템 물리적 망 분리"},
|
|
{"id": "N002", "zone": "A", "category": "접근 통제", "item": "특권 계정 2FA 인증 적용"},
|
|
{"id": "N003", "zone": "A", "category": "암호화", "item": "저장 데이터 AES-256 이상 암호화"},
|
|
{"id": "N004", "zone": "A", "category": "감사 로그", "item": "접근 로그 1년 이상 보존"},
|
|
{"id": "N005", "zone": "A", "category": "미디어 통제", "item": "USB 등 이동식 미디어 차단"},
|
|
{"id": "N006", "zone": "B", "category": "망 분리", "item": "업무 시스템 논리적 네트워크 분리"},
|
|
{"id": "N007", "zone": "B", "category": "암호화", "item": "전송 데이터 TLS 1.2 이상"},
|
|
{"id": "N008", "zone": "B", "category": "접근 통제", "item": "최소 권한 원칙 적용"},
|
|
{"id": "N009", "zone": "B", "category": "보안 패치", "item": "OS·미들웨어 정기 패치 (30일 이내)"},
|
|
{"id": "N010", "zone": "B", "category": "바이러스 백신", "item": "모든 단말 백신 설치 및 최신화"},
|
|
{"id": "N011", "zone": "C", "category": "방화벽", "item": "불필요 포트 차단"},
|
|
{"id": "N012", "zone": "C", "category": "웹 보안", "item": "HTTPS 적용 및 인증서 유효"},
|
|
{"id": "N013", "zone": "C", "category": "취약점", "item": "정기 취약점 점검 (분기 1회)"},
|
|
]
|
|
|
|
|
|
class ZoneClassify(BaseModel):
|
|
system_name: str
|
|
handles_personal_info: bool = False
|
|
handles_classified: bool = False
|
|
internet_facing: bool = False
|
|
|
|
|
|
class AssessmentCreate(BaseModel):
|
|
system_name: str
|
|
zone: str # A|B|C
|
|
checked_items: list = [] # ["N001", "N002", ...]
|
|
notes: str = ""
|
|
|
|
|
|
@router.get("/policies")
|
|
async def list_policies(user: User = Depends(get_current_user)):
|
|
return [{"zone": z, **info} for z, info in ZONES.items()]
|
|
|
|
|
|
@router.post("/classify")
|
|
async def classify_zone(body: ZoneClassify, user: User = Depends(get_current_user)):
|
|
"""시스템 특성에 따른 N²SF 구역 자동 분류."""
|
|
if body.handles_classified or body.handles_personal_info:
|
|
zone = "A"
|
|
elif not body.internet_facing:
|
|
zone = "B"
|
|
else:
|
|
zone = "C"
|
|
return {
|
|
"system": body.system_name,
|
|
"recommended_zone": zone,
|
|
"zone_label": ZONES[zone]["label"],
|
|
"requirements": ZONES[zone]["requirements"],
|
|
}
|
|
|
|
|
|
@router.get("/checklist")
|
|
async def get_checklist(zone: Optional[str] = None, user: User = Depends(get_current_user)):
|
|
items = CHECKLIST_ITEMS
|
|
if zone:
|
|
items = [i for i in items if i["zone"] == zone.upper()]
|
|
return {"total": len(items), "items": items}
|
|
|
|
|
|
@router.post("/assess")
|
|
async def create_assessment(body: AssessmentCreate, db: AsyncSession = Depends(get_db),
|
|
user: User = Depends(get_current_user)):
|
|
zone_items = [i for i in CHECKLIST_ITEMS if i["zone"] == body.zone]
|
|
total = len(zone_items)
|
|
passed = len([i for i in zone_items if i["id"] in body.checked_items])
|
|
score = round(passed / total * 100) if total > 0 else 0
|
|
|
|
assessment = N2SFAssessment(
|
|
system_name=body.system_name, zone=body.zone,
|
|
total_items=total, passed_items=passed, score=score,
|
|
checked_items=json.dumps(body.checked_items),
|
|
notes=body.notes, assessed_by=user.id, created_at=datetime.utcnow()
|
|
)
|
|
db.add(assessment); await db.commit(); await db.refresh(assessment)
|
|
return {"assessment_id": assessment.id, "score": score, "passed": passed, "total": total,
|
|
"grade": "적합" if score >= 80 else "보완 필요" if score >= 60 else "부적합"}
|
|
|
|
|
|
@router.get("/report")
|
|
async def get_report(limit: int = 10, db: AsyncSession = Depends(get_db),
|
|
user: User = Depends(get_current_user)):
|
|
rows = await db.execute(
|
|
select(N2SFAssessment).order_by(desc(N2SFAssessment.created_at)).limit(limit)
|
|
)
|
|
assessments = rows.scalars().all()
|
|
return [{"id": a.id, "system_name": a.system_name, "zone": a.zone,
|
|
"score": a.score, "passed": a.passed_items, "total": a.total_items,
|
|
"grade": "적합" if a.score >= 80 else "보완 필요" if a.score >= 60 else "부적합",
|
|
"created_at": a.created_at}
|
|
for a in assessments]
|
|
|
|
|
|
@router.get("/zones")
|
|
async def list_zones(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)):
|
|
rows = await db.execute(select(N2SFZone).order_by(N2SFZone.zone))
|
|
zones = rows.scalars().all()
|
|
if not zones:
|
|
return [{"zone": z, "label": info["label"], "system_count": 0} for z, info in ZONES.items()]
|
|
return [{"zone": z.zone, "label": ZONES.get(z.zone, {}).get("label", ""), "system_count": 0}
|
|
for z in zones]
|
|
|
|
|
|
@router.get("/violations")
|
|
async def list_violations(db: AsyncSession = Depends(get_db),
|
|
user: User = Depends(require_admin_role)):
|
|
rows = await db.execute(
|
|
select(N2SFAssessment).where(N2SFAssessment.score < 60).order_by(desc(N2SFAssessment.created_at))
|
|
)
|
|
violations = rows.scalars().all()
|
|
return [{"system": v.system_name, "zone": v.zone, "score": v.score,
|
|
"created_at": v.created_at} for v in violations]
|