""" 행정망/인터넷망 분리 운영 관리 공공기관 필수: 행정망과 인터넷망을 분리 관리하고 방화벽 정책을 제어. 엔드포인트: GET /api/netzone/zones — 네트워크 존 목록 POST /api/netzone/zones — 존 등록 PUT /api/netzone/zones/{id} — 존 수정 GET /api/netzone/policies — 방화벽 정책 목록 POST /api/netzone/policies — 정책 추가 GET /api/netzone/audit — 정책 변경 이력 POST /api/netzone/verify — 존 분리 준수 여부 검증 """ from __future__ import annotations import ipaddress from datetime import datetime from typing import Optional from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel, Field from sqlalchemy import select, desc from sqlalchemy.ext.asyncio import AsyncSession from core.auth import get_current_user, require_admin_role from database import get_db from models import User, NetworkZone, NetworkPolicy, AuditLog router = APIRouter(prefix="/api/netzone", tags=["네트워크 망 분리"]) class ZoneCreate(BaseModel): name: str zone_type: str = Field(..., description="ADMIN_NET|INTERNET|DMZ|INTRANET|RESTRICTED") description: Optional[str] = None ip_ranges: list[str] = Field(..., description="CIDR 목록") class PolicyCreate(BaseModel): src_zone_id: int dst_zone_id: int protocol: str = "TCP" src_port: Optional[int] = None dst_port: Optional[int] = None action: str = Field("DENY", description="ALLOW|DENY") description: Optional[str] = None ZONE_TYPE_RULES = { "ADMIN_NET": {"allowed_to": ["ADMIN_NET", "DMZ"], "denied_to": ["INTERNET"]}, "INTERNET": {"allowed_to": ["DMZ"], "denied_to": ["ADMIN_NET", "INTRANET"]}, "DMZ": {"allowed_to": ["ADMIN_NET", "INTERNET"],"denied_to": []}, "INTRANET": {"allowed_to": ["INTRANET", "DMZ"], "denied_to": ["INTERNET"]}, "RESTRICTED": {"allowed_to": [], "denied_to": ["INTERNET", "DMZ"]}, } @router.post("/zones") async def create_zone(req: ZoneCreate, db: AsyncSession = Depends(get_db), user: User = Depends(require_admin_role)): for cidr in req.ip_ranges: try: ipaddress.ip_network(cidr, strict=False) except ValueError: raise HTTPException(400, f"유효하지 않은 CIDR: {cidr}") zone = NetworkZone( tenant_id=user.tenant_id, name=req.name, zone_type=req.zone_type, description=req.description, ip_ranges=req.ip_ranges, created_at=datetime.utcnow(), ) db.add(zone); await db.commit(); await db.refresh(zone) return {"ok": True, "id": zone.id} @router.get("/zones") async def list_zones(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): rows = await db.execute(select(NetworkZone).where(NetworkZone.tenant_id == user.tenant_id)) zones = rows.scalars().all() return [ {"id": z.id, "name": z.name, "type": z.zone_type, "ip_ranges": z.ip_ranges, "description": z.description} for z in zones ] @router.post("/policies") async def create_policy(req: PolicyCreate, db: AsyncSession = Depends(get_db), user: User = Depends(require_admin_role)): policy = NetworkPolicy( tenant_id=user.tenant_id, src_zone_id=req.src_zone_id, dst_zone_id=req.dst_zone_id, protocol=req.protocol, dst_port=req.dst_port, action=req.action, description=req.description, created_by=user.id, created_at=datetime.utcnow(), ) db.add(policy) log = AuditLog(user_id=user.id, action="NETWORK_POLICY_CREATED", detail=f"정책 추가: Zone {req.src_zone_id} → {req.dst_zone_id} {req.action}", created_at=datetime.utcnow()) db.add(log) await db.commit() return {"ok": True} @router.get("/policies") async def list_policies(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): rows = await db.execute(select(NetworkPolicy).where(NetworkPolicy.tenant_id == user.tenant_id)) return [{"id": p.id, "src": p.src_zone_id, "dst": p.dst_zone_id, "protocol": p.protocol, "port": p.dst_port, "action": p.action} for p in rows.scalars().all()] @router.post("/verify") async def verify_zone_separation(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): rows = await db.execute(select(NetworkZone).where(NetworkZone.tenant_id == user.tenant_id)) zones = {z.zone_type: z for z in rows.scalars().all()} issues = [] if "ADMIN_NET" not in zones: issues.append("행정망 존 미등록") if "INTERNET" not in zones: issues.append("인터넷망 존 미등록") policy_rows = await db.execute(select(NetworkPolicy).where(NetworkPolicy.tenant_id == user.tenant_id, NetworkPolicy.action == "ALLOW")) for p in policy_rows.scalars().all(): src = next((z.zone_type for z in zones.values() if z.id == p.src_zone_id), None) dst = next((z.zone_type for z in zones.values() if z.id == p.dst_zone_id), None) if src and dst: rules = ZONE_TYPE_RULES.get(src, {}) if dst in rules.get("denied_to", []): issues.append(f"정책 위반: {src} → {dst} ALLOW 설정") return {"compliant": len(issues) == 0, "issues": issues, "zone_count": len(zones)}