zioinfo-mail/workspace/guardia-itsm/routers/network_zone.py
DESKTOP-TKLFCPR\ython b8faec44e0 feat(advanced): GUARDiA 고급 확장 구현 — 20 routers + 754 endpoints
CMDB 자동 발견 (4개):
- autodiscovery.py: SSH 네트워크 스캔 + CMDB 자동 등록
- snmp_discovery.py: SNMP v2c/v3 장비 자동 발견
- dependency_map.py: 서비스 의존성 자동 매핑 (netstat)
- config_inventory.py: 서버 인벤토리 자동 수집 (SSH)

NL 쿼리 엔진 (3개):
- nlquery.py: Text-to-SQL (SELECT 전용, DML 차단)
- op_assistant.py: Multi-turn 대화형 운영 어시스턴트
- query_history.py: 쿼리 이력·즐겨찾기·공유

구성 드리프트 (3개):
- drift_detection.py: 골든 구성 vs 실제 비교·SR 자동 생성
- golden_config.py: 내장 CSAP 템플릿 + 버전 관리
- auto_remediation.py: 승인 기반 자동 교정 + 롤백

멀티클라우드 (4개):
- multicloud.py: 통합 관제 (NCloud+AWS+KT)
- aws_connector.py: AWS SigV4 직접 서명 연동
- cost_optimizer.py: AI 비용 최적화 권고
- cloud_migration.py: On-prem→K-Cloud 체크리스트

공공기관 특화 (6개):
- narasajang.py: 나라장터 OpenAPI 연동
- public_api_hub.py: data.go.kr KISA·기상청 허브
- isp_support.py: ISP 수립 지원 + AI 보고서
- network_zone.py: 행정망/인터넷망 분리 관리
- k_cloud.py: 정부 K-Cloud 전환 자동화
- e_procurement.py: 전자조달 계약·검수·납품

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-02 14:33:41 +09:00

125 lines
5.2 KiB
Python

"""
행정망/인터넷망 분리 운영 관리
공공기관 필수: 행정망과 인터넷망을 분리 관리하고 방화벽 정책을 제어.
엔드포인트:
GET /api/netzone/zones — 네트워크 존 목록
POST /api/netzone/zones — 존 등록
PUT /api/netzone/zones/{id} — 존 수정
GET /api/netzone/policies — 방화벽 정책 목록
POST /api/netzone/policies — 정책 추가
GET /api/netzone/audit — 정책 변경 이력
POST /api/netzone/verify — 존 분리 준수 여부 검증
"""
from __future__ import annotations
import ipaddress
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field
from sqlalchemy import select, desc
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user, require_admin_role
from database import get_db
from models import User, NetworkZone, NetworkPolicy, AuditLog
router = APIRouter(prefix="/api/netzone", tags=["네트워크 망 분리"])
class ZoneCreate(BaseModel):
name: str
zone_type: str = Field(..., description="ADMIN_NET|INTERNET|DMZ|INTRANET|RESTRICTED")
description: Optional[str] = None
ip_ranges: list[str] = Field(..., description="CIDR 목록")
class PolicyCreate(BaseModel):
src_zone_id: int
dst_zone_id: int
protocol: str = "TCP"
src_port: Optional[int] = None
dst_port: Optional[int] = None
action: str = Field("DENY", description="ALLOW|DENY")
description: Optional[str] = None
ZONE_TYPE_RULES = {
"ADMIN_NET": {"allowed_to": ["ADMIN_NET", "DMZ"], "denied_to": ["INTERNET"]},
"INTERNET": {"allowed_to": ["DMZ"], "denied_to": ["ADMIN_NET", "INTRANET"]},
"DMZ": {"allowed_to": ["ADMIN_NET", "INTERNET"],"denied_to": []},
"INTRANET": {"allowed_to": ["INTRANET", "DMZ"], "denied_to": ["INTERNET"]},
"RESTRICTED": {"allowed_to": [], "denied_to": ["INTERNET", "DMZ"]},
}
@router.post("/zones")
async def create_zone(req: ZoneCreate, db: AsyncSession = Depends(get_db), user: User = Depends(require_admin_role)):
for cidr in req.ip_ranges:
try: ipaddress.ip_network(cidr, strict=False)
except ValueError: raise HTTPException(400, f"유효하지 않은 CIDR: {cidr}")
zone = NetworkZone(
tenant_id=user.tenant_id, name=req.name, zone_type=req.zone_type,
description=req.description, ip_ranges=req.ip_ranges,
created_at=datetime.utcnow(),
)
db.add(zone); await db.commit(); await db.refresh(zone)
return {"ok": True, "id": zone.id}
@router.get("/zones")
async def list_zones(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)):
rows = await db.execute(select(NetworkZone).where(NetworkZone.tenant_id == user.tenant_id))
zones = rows.scalars().all()
return [
{"id": z.id, "name": z.name, "type": z.zone_type,
"ip_ranges": z.ip_ranges, "description": z.description}
for z in zones
]
@router.post("/policies")
async def create_policy(req: PolicyCreate, db: AsyncSession = Depends(get_db), user: User = Depends(require_admin_role)):
policy = NetworkPolicy(
tenant_id=user.tenant_id,
src_zone_id=req.src_zone_id, dst_zone_id=req.dst_zone_id,
protocol=req.protocol, dst_port=req.dst_port,
action=req.action, description=req.description,
created_by=user.id, created_at=datetime.utcnow(),
)
db.add(policy)
log = AuditLog(user_id=user.id, action="NETWORK_POLICY_CREATED",
detail=f"정책 추가: Zone {req.src_zone_id}{req.dst_zone_id} {req.action}",
created_at=datetime.utcnow())
db.add(log)
await db.commit()
return {"ok": True}
@router.get("/policies")
async def list_policies(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)):
rows = await db.execute(select(NetworkPolicy).where(NetworkPolicy.tenant_id == user.tenant_id))
return [{"id": p.id, "src": p.src_zone_id, "dst": p.dst_zone_id,
"protocol": p.protocol, "port": p.dst_port, "action": p.action} for p in rows.scalars().all()]
@router.post("/verify")
async def verify_zone_separation(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)):
rows = await db.execute(select(NetworkZone).where(NetworkZone.tenant_id == user.tenant_id))
zones = {z.zone_type: z for z in rows.scalars().all()}
issues = []
if "ADMIN_NET" not in zones: issues.append("행정망 존 미등록")
if "INTERNET" not in zones: issues.append("인터넷망 존 미등록")
policy_rows = await db.execute(select(NetworkPolicy).where(NetworkPolicy.tenant_id == user.tenant_id, NetworkPolicy.action == "ALLOW"))
for p in policy_rows.scalars().all():
src = next((z.zone_type for z in zones.values() if z.id == p.src_zone_id), None)
dst = next((z.zone_type for z in zones.values() if z.id == p.dst_zone_id), None)
if src and dst:
rules = ZONE_TYPE_RULES.get(src, {})
if dst in rules.get("denied_to", []):
issues.append(f"정책 위반: {src}{dst} ALLOW 설정")
return {"compliant": len(issues) == 0, "issues": issues, "zone_count": len(zones)}