CMDB 자동 발견 (4개): - autodiscovery.py: SSH 네트워크 스캔 + CMDB 자동 등록 - snmp_discovery.py: SNMP v2c/v3 장비 자동 발견 - dependency_map.py: 서비스 의존성 자동 매핑 (netstat) - config_inventory.py: 서버 인벤토리 자동 수집 (SSH) NL 쿼리 엔진 (3개): - nlquery.py: Text-to-SQL (SELECT 전용, DML 차단) - op_assistant.py: Multi-turn 대화형 운영 어시스턴트 - query_history.py: 쿼리 이력·즐겨찾기·공유 구성 드리프트 (3개): - drift_detection.py: 골든 구성 vs 실제 비교·SR 자동 생성 - golden_config.py: 내장 CSAP 템플릿 + 버전 관리 - auto_remediation.py: 승인 기반 자동 교정 + 롤백 멀티클라우드 (4개): - multicloud.py: 통합 관제 (NCloud+AWS+KT) - aws_connector.py: AWS SigV4 직접 서명 연동 - cost_optimizer.py: AI 비용 최적화 권고 - cloud_migration.py: On-prem→K-Cloud 체크리스트 공공기관 특화 (6개): - narasajang.py: 나라장터 OpenAPI 연동 - public_api_hub.py: data.go.kr KISA·기상청 허브 - isp_support.py: ISP 수립 지원 + AI 보고서 - network_zone.py: 행정망/인터넷망 분리 관리 - k_cloud.py: 정부 K-Cloud 전환 자동화 - e_procurement.py: 전자조달 계약·검수·납품 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
125 lines
5.2 KiB
Python
125 lines
5.2 KiB
Python
"""
|
|
행정망/인터넷망 분리 운영 관리
|
|
|
|
공공기관 필수: 행정망과 인터넷망을 분리 관리하고 방화벽 정책을 제어.
|
|
|
|
엔드포인트:
|
|
GET /api/netzone/zones — 네트워크 존 목록
|
|
POST /api/netzone/zones — 존 등록
|
|
PUT /api/netzone/zones/{id} — 존 수정
|
|
GET /api/netzone/policies — 방화벽 정책 목록
|
|
POST /api/netzone/policies — 정책 추가
|
|
GET /api/netzone/audit — 정책 변경 이력
|
|
POST /api/netzone/verify — 존 분리 준수 여부 검증
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import ipaddress
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from pydantic import BaseModel, Field
|
|
from sqlalchemy import select, desc
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from core.auth import get_current_user, require_admin_role
|
|
from database import get_db
|
|
from models import User, NetworkZone, NetworkPolicy, AuditLog
|
|
|
|
router = APIRouter(prefix="/api/netzone", tags=["네트워크 망 분리"])
|
|
|
|
|
|
class ZoneCreate(BaseModel):
|
|
name: str
|
|
zone_type: str = Field(..., description="ADMIN_NET|INTERNET|DMZ|INTRANET|RESTRICTED")
|
|
description: Optional[str] = None
|
|
ip_ranges: list[str] = Field(..., description="CIDR 목록")
|
|
|
|
|
|
class PolicyCreate(BaseModel):
|
|
src_zone_id: int
|
|
dst_zone_id: int
|
|
protocol: str = "TCP"
|
|
src_port: Optional[int] = None
|
|
dst_port: Optional[int] = None
|
|
action: str = Field("DENY", description="ALLOW|DENY")
|
|
description: Optional[str] = None
|
|
|
|
|
|
ZONE_TYPE_RULES = {
|
|
"ADMIN_NET": {"allowed_to": ["ADMIN_NET", "DMZ"], "denied_to": ["INTERNET"]},
|
|
"INTERNET": {"allowed_to": ["DMZ"], "denied_to": ["ADMIN_NET", "INTRANET"]},
|
|
"DMZ": {"allowed_to": ["ADMIN_NET", "INTERNET"],"denied_to": []},
|
|
"INTRANET": {"allowed_to": ["INTRANET", "DMZ"], "denied_to": ["INTERNET"]},
|
|
"RESTRICTED": {"allowed_to": [], "denied_to": ["INTERNET", "DMZ"]},
|
|
}
|
|
|
|
|
|
@router.post("/zones")
|
|
async def create_zone(req: ZoneCreate, db: AsyncSession = Depends(get_db), user: User = Depends(require_admin_role)):
|
|
for cidr in req.ip_ranges:
|
|
try: ipaddress.ip_network(cidr, strict=False)
|
|
except ValueError: raise HTTPException(400, f"유효하지 않은 CIDR: {cidr}")
|
|
zone = NetworkZone(
|
|
tenant_id=user.tenant_id, name=req.name, zone_type=req.zone_type,
|
|
description=req.description, ip_ranges=req.ip_ranges,
|
|
created_at=datetime.utcnow(),
|
|
)
|
|
db.add(zone); await db.commit(); await db.refresh(zone)
|
|
return {"ok": True, "id": zone.id}
|
|
|
|
|
|
@router.get("/zones")
|
|
async def list_zones(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)):
|
|
rows = await db.execute(select(NetworkZone).where(NetworkZone.tenant_id == user.tenant_id))
|
|
zones = rows.scalars().all()
|
|
return [
|
|
{"id": z.id, "name": z.name, "type": z.zone_type,
|
|
"ip_ranges": z.ip_ranges, "description": z.description}
|
|
for z in zones
|
|
]
|
|
|
|
|
|
@router.post("/policies")
|
|
async def create_policy(req: PolicyCreate, db: AsyncSession = Depends(get_db), user: User = Depends(require_admin_role)):
|
|
policy = NetworkPolicy(
|
|
tenant_id=user.tenant_id,
|
|
src_zone_id=req.src_zone_id, dst_zone_id=req.dst_zone_id,
|
|
protocol=req.protocol, dst_port=req.dst_port,
|
|
action=req.action, description=req.description,
|
|
created_by=user.id, created_at=datetime.utcnow(),
|
|
)
|
|
db.add(policy)
|
|
log = AuditLog(user_id=user.id, action="NETWORK_POLICY_CREATED",
|
|
detail=f"정책 추가: Zone {req.src_zone_id} → {req.dst_zone_id} {req.action}",
|
|
created_at=datetime.utcnow())
|
|
db.add(log)
|
|
await db.commit()
|
|
return {"ok": True}
|
|
|
|
|
|
@router.get("/policies")
|
|
async def list_policies(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)):
|
|
rows = await db.execute(select(NetworkPolicy).where(NetworkPolicy.tenant_id == user.tenant_id))
|
|
return [{"id": p.id, "src": p.src_zone_id, "dst": p.dst_zone_id,
|
|
"protocol": p.protocol, "port": p.dst_port, "action": p.action} for p in rows.scalars().all()]
|
|
|
|
|
|
@router.post("/verify")
|
|
async def verify_zone_separation(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)):
|
|
rows = await db.execute(select(NetworkZone).where(NetworkZone.tenant_id == user.tenant_id))
|
|
zones = {z.zone_type: z for z in rows.scalars().all()}
|
|
issues = []
|
|
if "ADMIN_NET" not in zones: issues.append("행정망 존 미등록")
|
|
if "INTERNET" not in zones: issues.append("인터넷망 존 미등록")
|
|
policy_rows = await db.execute(select(NetworkPolicy).where(NetworkPolicy.tenant_id == user.tenant_id, NetworkPolicy.action == "ALLOW"))
|
|
for p in policy_rows.scalars().all():
|
|
src = next((z.zone_type for z in zones.values() if z.id == p.src_zone_id), None)
|
|
dst = next((z.zone_type for z in zones.values() if z.id == p.dst_zone_id), None)
|
|
if src and dst:
|
|
rules = ZONE_TYPE_RULES.get(src, {})
|
|
if dst in rules.get("denied_to", []):
|
|
issues.append(f"정책 위반: {src} → {dst} ALLOW 설정")
|
|
return {"compliant": len(issues) == 0, "issues": issues, "zone_count": len(zones)}
|