125 lines
5.2 KiB
Python
125 lines
5.2 KiB
Python
"""
|
|
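Administrative-network / Internet-network separation management.

Required for public institutions: the administrative network and the
Internet network are managed as separate zones, and the firewall policies
between them are controlled here.

Endpoints:
    GET  /api/netzone/zones       - list network zones
    POST /api/netzone/zones       - register a zone
    PUT  /api/netzone/zones/{id}  - update a zone
    GET  /api/netzone/policies    - list firewall policies
    POST /api/netzone/policies    - add a policy
    GET  /api/netzone/audit       - policy change history
    POST /api/netzone/verify      - verify zone-separation compliance

NOTE(review): no handler for PUT /zones/{id} or GET /audit is visible in
this module - confirm they are implemented elsewhere or drop them from
this list.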
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import ipaddress
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from pydantic import BaseModel, Field
|
|
from sqlalchemy import select, desc
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from core.auth import get_current_user, require_admin_role
|
|
from database import get_db
|
|
from models import User, NetworkZone, NetworkPolicy, AuditLog
|
|
|
|
# Router shared by every handler in this module; all routes are mounted
# under the /api/netzone prefix and grouped under a single tag.
router = APIRouter(
    prefix="/api/netzone",
    tags=["네트워크 망 분리"],
)
|
|
|
|
|
|
class ZoneCreate(BaseModel):
    # Request body accepted by the zone-registration handler (create_zone).

    # Name stored on the new zone.
    name: str

    # Zone category. Typed as a plain string: the model itself does not
    # restrict it to the values listed in the field description.
    zone_type: str = Field(
        ...,
        description="ADMIN_NET|INTERNET|DMZ|INTRANET|RESTRICTED",
    )

    # Optional free-text note.
    description: Optional[str] = None

    # Address ranges belonging to the zone; create_zone parses each entry
    # with ipaddress.ip_network before storing the list.
    ip_ranges: list[str] = Field(
        ...,
        description="CIDR 목록",
    )
|
|
|
|
|
|
class PolicyCreate(BaseModel):
    # Request body accepted by the policy-creation handler (create_policy).

    # Ids of the source and destination NetworkZone rows.
    src_zone_id: int
    dst_zone_id: int

    # Protocol label; defaults to "TCP" when omitted.
    protocol: str = "TCP"

    # Optional ports.
    # NOTE(review): create_policy forwards dst_port only; src_port is
    # accepted here but never passed to NetworkPolicy - confirm whether
    # that is intentional.
    src_port: Optional[int] = None
    dst_port: Optional[int] = None

    # Policy verdict; defaults to "DENY" when omitted.
    action: str = Field(
        "DENY",
        description="ALLOW|DENY",
    )

    # Optional free-text note.
    description: Optional[str] = None
|
|
|
|
|
|
# Separation rules keyed by the SOURCE zone type. Each entry lists the
# destination zone types under "allowed_to" and "denied_to".
# Only "denied_to" is consulted by verify_zone_separation; "allowed_to" is
# not read anywhere in this file.
ZONE_TYPE_RULES = {
    "ADMIN_NET": {
        "allowed_to": ["ADMIN_NET", "DMZ"],
        "denied_to": ["INTERNET"],
    },
    "INTERNET": {
        "allowed_to": ["DMZ"],
        "denied_to": ["ADMIN_NET", "INTRANET"],
    },
    "DMZ": {
        "allowed_to": ["ADMIN_NET", "INTERNET"],
        "denied_to": [],
    },
    "INTRANET": {
        "allowed_to": ["INTRANET", "DMZ"],
        "denied_to": ["INTERNET"],
    },
    "RESTRICTED": {
        "allowed_to": [],
        "denied_to": ["INTERNET", "DMZ"],
    },
}
|
|
|
|
|
|
@router.post("/zones")
async def create_zone(
    req: ZoneCreate,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(require_admin_role),
):
    # Register a network zone under the caller's tenant.
    #
    # The caller is resolved through the require_admin_role dependency.
    # Returns {"ok": True, "id": <id of the new zone>}.
    # Raises HTTPException(400) when zone_type is not a key of
    # ZONE_TYPE_RULES or when any ip_ranges entry is not a parseable network.

    # An unknown zone type would be stored but match no entry in
    # ZONE_TYPE_RULES, so verify_zone_separation would never flag policies
    # that involve it. Reject it up front instead.
    if req.zone_type not in ZONE_TYPE_RULES:
        raise HTTPException(400, f"유효하지 않은 존 유형: {req.zone_type}")

    for cidr in req.ip_ranges:
        try:
            # strict=False accepts a network written with host bits set.
            ipaddress.ip_network(cidr, strict=False)
        except ValueError:
            # The parser's traceback adds nothing for the API client.
            raise HTTPException(400, f"유효하지 않은 CIDR: {cidr}") from None

    zone = NetworkZone(
        tenant_id=user.tenant_id,
        name=req.name,
        zone_type=req.zone_type,
        description=req.description,
        ip_ranges=req.ip_ranges,
        created_at=datetime.utcnow(),
    )
    db.add(zone)
    await db.commit()
    # Reload the row so the generated id is available for the response.
    await db.refresh(zone)
    return {"ok": True, "id": zone.id}
|
|
|
|
|
|
@router.get("/zones")
async def list_zones(
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    # List the network zones whose tenant_id matches the caller's tenant.
    # Each zone is returned as a dict with the keys
    # id / name / type / ip_ranges / description.
    tenant_filter = NetworkZone.tenant_id == user.tenant_id
    result = await db.execute(select(NetworkZone).where(tenant_filter))

    payload = []
    for zone in result.scalars().all():
        payload.append(
            {
                "id": zone.id,
                "name": zone.name,
                "type": zone.zone_type,
                "ip_ranges": zone.ip_ranges,
                "description": zone.description,
            }
        )
    return payload
|
|
|
|
|
|
@router.post("/policies")
async def create_policy(
    req: PolicyCreate,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(require_admin_role),
):
    # Add a firewall policy between two zones of the caller's tenant and
    # record the change in the audit log (both rows are committed together).
    #
    # The caller is resolved through the require_admin_role dependency.
    # Returns {"ok": True}.
    # Raises HTTPException(400) for an action other than ALLOW/DENY and
    # HTTPException(404) when either zone id does not belong to the tenant.

    # verify_zone_separation only inspects rows whose action is exactly
    # "ALLOW"; any other spelling would slip past the compliance check.
    if req.action not in ("ALLOW", "DENY"):
        raise HTTPException(400, f"유효하지 않은 action: {req.action}")

    # Both endpoints must be zones owned by this tenant; otherwise the
    # policy could point at a nonexistent zone or another tenant's zone.
    wanted_ids = {req.src_zone_id, req.dst_zone_id}
    owned = await db.execute(
        select(NetworkZone.id).where(
            NetworkZone.tenant_id == user.tenant_id,
            NetworkZone.id.in_(list(wanted_ids)),
        )
    )
    missing_ids = wanted_ids - set(owned.scalars().all())
    if missing_ids:
        raise HTTPException(404, f"존을 찾을 수 없음: {sorted(missing_ids)}")

    # NOTE(review): req.src_port is accepted by PolicyCreate but not stored
    # here - confirm whether NetworkPolicy has a src_port column.
    policy = NetworkPolicy(
        tenant_id=user.tenant_id,
        src_zone_id=req.src_zone_id,
        dst_zone_id=req.dst_zone_id,
        protocol=req.protocol,
        dst_port=req.dst_port,
        action=req.action,
        description=req.description,
        created_by=user.id,
        created_at=datetime.utcnow(),
    )
    db.add(policy)

    audit_entry = AuditLog(
        user_id=user.id,
        action="NETWORK_POLICY_CREATED",
        detail=f"정책 추가: Zone {req.src_zone_id} → {req.dst_zone_id} {req.action}",
        created_at=datetime.utcnow(),
    )
    db.add(audit_entry)

    await db.commit()
    return {"ok": True}
|
|
|
|
|
|
@router.get("/policies")
async def list_policies(
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    # List the firewall policies whose tenant_id matches the caller's tenant.
    # Each policy is returned as a dict with the keys
    # id / src / dst / protocol / port / action ("port" is the dst_port).
    tenant_filter = NetworkPolicy.tenant_id == user.tenant_id
    result = await db.execute(select(NetworkPolicy).where(tenant_filter))

    payload = []
    for policy in result.scalars().all():
        payload.append(
            {
                "id": policy.id,
                "src": policy.src_zone_id,
                "dst": policy.dst_zone_id,
                "protocol": policy.protocol,
                "port": policy.dst_port,
                "action": policy.action,
            }
        )
    return payload
|
|
|
|
|
|
@router.post("/verify")
async def verify_zone_separation(
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    # Check the caller's tenant for zone-separation problems.
    #
    # Reports an issue when no ADMIN_NET or no INTERNET zone is registered,
    # and one issue per ALLOW policy whose destination zone type appears in
    # the source zone type's "denied_to" list in ZONE_TYPE_RULES.
    # Returns {"compliant": bool, "issues": [str, ...], "zone_count": int}.
    zone_rows = await db.execute(
        select(NetworkZone).where(NetworkZone.tenant_id == user.tenant_id)
    )
    tenant_zones = zone_rows.scalars().all()

    # Index by zone id, not by zone type: a tenant may register several
    # zones of the same type, and keying by type would keep only the last
    # one, leaving policies on the others unchecked.
    type_by_zone_id = {zone.id: zone.zone_type for zone in tenant_zones}
    present_types = set(type_by_zone_id.values())

    issues = []
    if "ADMIN_NET" not in present_types:
        issues.append("행정망 존 미등록")
    if "INTERNET" not in present_types:
        issues.append("인터넷망 존 미등록")

    policy_rows = await db.execute(
        select(NetworkPolicy).where(
            NetworkPolicy.tenant_id == user.tenant_id,
            NetworkPolicy.action == "ALLOW",
        )
    )
    for policy in policy_rows.scalars().all():
        src = type_by_zone_id.get(policy.src_zone_id)
        dst = type_by_zone_id.get(policy.dst_zone_id)
        # Policies whose zones are not among this tenant's zones (or whose
        # zone type is empty) are skipped, as before.
        if not (src and dst):
            continue
        denied_targets = ZONE_TYPE_RULES.get(src, {}).get("denied_to", [])
        if dst in denied_targets:
            issues.append(f"정책 위반: {src} → {dst} ALLOW 설정")

    return {
        "compliant": not issues,
        "issues": issues,
        # Number of registered zones (not the number of distinct zone types).
        "zone_count": len(tenant_zones),
    }
|