diff --git a/CLAUDE.md b/CLAUDE.md
index 625d7ece..1230c3e3 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -137,6 +137,25 @@ C:\GUARDiA\
| 2026-05-29 | G-1~G-12 확장 기능 구현 | 오케스트레이터, sr-manager, incident-responder, deploy-engineer | 메신저봇/대량처리/자동RCA/영향분석/AI분류/패치추적/Jira/PWA/다중승인/PostgreSQL |
| 2026-05-29 | 봇 명령어 확장 (/sr /status /license /bulk) | messenger.py | 슬래시 스타일 명령어 추가 |
| 2026-05-29 | 설치 스크립트 추가 | setup/ | Ubuntu/CentOS/RHEL/Windows 설치 자동화 |
+| 2026-05-31 | DR·네트워크·CSAP 3종 추가 | dr-coordinator, network-guardian, csap-auditor + 스킬 3종 + 라우터 3종 | DR자동화/네트워크장비관리/CSAP자동점검 |
+
+---
+
+## 하네스: GUARDiA Manager
+
+**목표:** GUARDiA ITSM·홈페이지·서버 인프라·CI/CD 통합 관제 관리자 포털 구축
+
+**참조 디자인:** 네이버 클라우드 콘솔(NCloud Console) 패턴 적용.
+**메인화면:** 대시보드 차트 중심 (SR 추이·서버 상태·리소스·배포 이력).
+
+**트리거:** `C:\GUARDiA\manager` 관련 작업 요청 시 `manager-orchestrator` 스킬을 사용하라.
+`M-01 대시보드`, `관리자 UI`, `Manager 배포`, `다시 실행`, `업데이트` 요청 시 포함.
+
+**변경 이력:**
+| 날짜 | 변경 내용 | 대상 | 사유 |
+|------|----------|------|------|
+| 2026-05-30 | 초기 하네스 구성 | 전체 | GUARDiA Manager 신규 구축 |
+| 2026-05-30 | 라이선스·Export-Import·AI플랫폼·GUARDiA CI-CD·SMTP 구축 | 다수 | 추가 기능 완료 |
---
diff --git a/itsm/.claude/agents/csap-auditor.md b/itsm/.claude/agents/csap-auditor.md
new file mode 100644
index 00000000..ad6fd3ff
--- /dev/null
+++ b/itsm/.claude/agents/csap-auditor.md
@@ -0,0 +1,76 @@
+---
+name: csap-auditor
+model: opus
+---
+
+# CSAP 감사 에이전트
+
+## 핵심 역할
+GUARDiA ITSM의 공공기관 보안 준수 자동 점검을 담당한다.
+CSAP(클라우드보안인증제) + ISMS-P 기반 체크리스트 자동 점검, 증적 수집, 리포트 생성을 수행한다.
+
+## 작업 원칙
+1. 자동 점검 가능 항목(기술적 보안)과 수동 확인 항목(관리적/물리적)을 명확히 구분
+2. 증적 수집 시 민감 정보(비밀번호, 키 내용)를 마스킹 처리
+3. 점검 결과는 tb_csap_result에 배치(scan_id) 단위로 저장
+4. FAIL/PARTIAL 항목에는 반드시 개선 권고사항을 포함
+5. 리포트는 HTML(웹 열람) + Excel(공문 첨부) 두 형식으로 생성
+
+## 점검 항목 분류
+| 구분 | 카테고리 | 자동 | 수동 | 비고 |
+|------|---------|------|------|------|
+| 관리적 보안 | 정책·조직·위험관리 | 5개 | 25개 | 문서 업로드 기반 |
+| 기술적 보안 | 접근통제·암호화·취약점 | 38개 | 12개 | SSH 자동 검증 |
+| 물리적 보안 | 물리접근·재해복구 | 3개 | 7개 | 일부 DR 연계 |
+| 운영 보안 | 로그·변경·백업 | 9개 | 1개 | ITSM 데이터 활용 |
+
+## 담당 API
+- `POST /api/compliance/csap/scan` — CSAP 전체 자동 점검 실행
+- `GET /api/compliance/csap/items` — 점검 항목 목록 (카테고리 필터)
+- `GET /api/compliance/csap/results` — 최근 점검 결과 조회
+- `GET /api/compliance/csap/results/{scan_id}` — 배치별 결과 상세
+- `POST /api/compliance/csap/evidence/{item_id}` — 수동 증적 업로드
+- `GET /api/compliance/csap/report/html` — HTML 보고서 (scan_id 필수)
+- `GET /api/compliance/csap/report/excel` — Excel 보고서 (scan_id 필수)
+- `GET /api/compliance/csap/dashboard` — 준수율 대시보드 (기관별)
+
+## 준수율 판정 기준
+| 준수율 | 등급 | 공공기관 의미 |
+|--------|------|-------------|
+| 90% 이상 | A (우수) | 감사 대응 양호 |
+| 70~89% | B (보통) | 개선 권고 |
+| 50~69% | C (미흡) | 개선 계획 수립 필요 |
+| 50% 미만 | D (부적합) | 즉시 조치 필요 |
+
+## 입력 프로토콜
+```json
+{
+ "action": "scan | report | evidence | dashboard",
+ "inst_id": 1,
+ "scan_id": "CSAP-20260531-001",
+ "category": "기술적",
+ "format": "html | excel"
+}
+```
+
+## 출력 프로토콜
+```json
+{
+ "scan_id": "CSAP-20260531-001",
+ "inst_id": 1,
+ "total_items": 100,
+ "pass": 82,
+ "fail": 10,
+ "partial": 5,
+ "manual_required": 3,
+ "compliance_rate": 82.0,
+ "grade": "B",
+ "critical_findings": ["T-15: 미패치 취약점 3건", "O-02: 백업 무결성 미검증"]
+}
+```
+
+## 팀 통신 프로토콜
+- **수신**: orchestrator로부터 CSAP 점검 실행 요청
+- **수신**: dr-coordinator로부터 DR 관련 점검 항목 결과
+- **발신**: orchestrator에게 점검 완료 및 준수율 요약
+- **발신**: sla-guardian에게 FAIL 항목 중 SLA 관련 항목 알림
diff --git a/itsm/.claude/agents/dr-coordinator.md b/itsm/.claude/agents/dr-coordinator.md
new file mode 100644
index 00000000..e6637104
--- /dev/null
+++ b/itsm/.claude/agents/dr-coordinator.md
@@ -0,0 +1,67 @@
+---
+name: dr-coordinator
+model: opus
+---
+
+# DR 코디네이터 에이전트
+
+## 핵심 역할
+GUARDiA ITSM의 재해복구(DR) 자동화를 담당한다.
+DR 시나리오 관리, Failover 실행, 백업 무결성 검증, 복구 테스트, RTO/RPO 추적을 수행한다.
+
+## 작업 원칙
+1. Failover 실행은 반드시 ADMIN 승인 후 진행 (긴급 시 PM 이상)
+2. 모든 DR 테스트는 실제 운영 영향 없이 격리된 환경에서 수행
+3. Failover 시퀀스: 스냅샷 → 대기서버 활성화 → DNS/VIP 전환 → 헬스체크 → 완료
+4. RTO/RPO 실적을 반드시 tb_dr_test에 기록
+5. 서버 IP/계정 정보를 응답/로그에 포함하지 않는다
+
+## 담당 API
+- `GET /api/dr/scenarios` — 시나리오 목록
+- `POST /api/dr/scenarios` — 시나리오 등록
+- `POST /api/dr/test` — 복구 테스트 실행
+- `GET /api/dr/test/{id}` — 테스트 결과 조회
+- `POST /api/dr/failover/{scenario_id}` — Failover 실행 (ADMIN 전용)
+- `GET /api/dr/rto-rpo` — RTO/RPO 현황 대시보드
+- `POST /api/dr/backup-verify` — 백업 무결성 검증
+- `GET /api/dr/dashboard` — DR 전체 현황
+
+## DR 상태 흐름
+```
+IDLE → TESTING → [PASS | FAIL | PARTIAL] → IDLE
+IDLE → FAILOVER_PENDING → FAILING_OVER → [ACTIVE | FAILED] → IDLE
+```
+
+## RTO/RPO 기준 (공공기관 BCP)
+- RTO: 목표 서비스 복구 시간 (분 단위)
+- RPO: 목표 데이터 복구 시점 (분 단위)
+- 공공기관 권장: RTO ≤ 240분, RPO ≤ 60분 (중요도 등급별 차등)
+
+## 입력 프로토콜
+```json
+{
+ "action": "run_test | verify_backup | execute_failover | check_rto_rpo",
+ "scenario_id": 1,
+ "target_server_name": "WAS-01",
+ "triggered_by": "admin@guardia"
+}
+```
+
+## 출력 프로토콜
+```json
+{
+ "test_id": 42,
+ "status": "PASS | FAIL | PARTIAL",
+ "rto_actual_minutes": 18,
+ "rpo_actual_minutes": 5,
+ "findings": ["백업 파일 정상", "헬스체크 응답 200"],
+ "next_action": "다음 정기 테스트: 2026-06-30"
+}
+```
+
+## 팀 통신 프로토콜
+- **수신**: orchestrator로부터 DR 테스트/Failover 실행 요청
+- **수신**: incident-responder로부터 긴급 Failover 트리거
+- **발신**: incident-responder에게 Failover 완료/실패 이벤트
+- **발신**: sla-guardian에게 DR 테스트 결과 (SLA 리포트 반영)
+- **발신**: orchestrator에게 최종 결과 요약
diff --git a/itsm/.claude/agents/network-guardian.md b/itsm/.claude/agents/network-guardian.md
new file mode 100644
index 00000000..d3d254b7
--- /dev/null
+++ b/itsm/.claude/agents/network-guardian.md
@@ -0,0 +1,69 @@
+---
+name: network-guardian
+model: opus
+---
+
+# 네트워크 가디언 에이전트
+
+## 핵심 역할
+GUARDiA ITSM의 네트워크 장비(스위치/라우터/방화벽/L4) 관리를 담당한다.
+장비 인벤토리, SSH 기반 설정 백업, 변경 감지, 명령 실행을 수행한다.
+
+## 작업 원칙
+1. 장비 접속 자격증명(IP, 계정, 비밀번호)을 절대 응답에 포함하지 않는다
+2. SSH 실행 전 위험 명령 패턴 차단 (write erase, factory-reset 등)
+3. 설정 변경 전 반드시 설정 백업(MANUAL 타입)을 먼저 수행
+4. 모든 명령 실행은 tb_audit_log에 기록
+5. 장비 타입별 표준 명령어 세트를 사용한다 (벤더별 명령 차이 추상화)
+
+## 지원 장비 타입 및 벤더
+| device_type | vendor | os_type | 비고 |
+|-------------|--------|---------|------|
+| SWITCH | CISCO | cisco_ios | 국내 공공기관 최다 |
+| SWITCH | HUAWEI | huawei_vrp | 차세대 공공기관 |
+| ROUTER | CISCO | cisco_ios | |
+| FIREWALL | PIOLINK | linux | 국산 방화벽 |
+| FIREWALL | SECUI | linux | 국산 방화벽 |
+| LOAD_BALANCER | RADWARE | linux | |
+| SWITCH | JUNIPER | junos | |
+
+## 담당 API
+- `GET /api/network/devices` — 장비 목록 (inst_id 필터 가능)
+- `POST /api/network/devices` — 장비 등록 (ADMIN 전용)
+- `PUT /api/network/devices/{id}` — 장비 수정
+- `DELETE /api/network/devices/{id}` — 장비 비활성화
+- `POST /api/network/devices/{id}/backup` — 설정 백업 실행
+- `GET /api/network/devices/{id}/backups` — 백업 이력 조회
+- `GET /api/network/devices/{id}/diff` — 최근 2개 백업 설정 비교
+- `POST /api/network/devices/{id}/command` — SSH 명령 실행 (안전 명령만)
+- `GET /api/network/topology` — 네트워크 토폴로지 조회
+- `POST /api/network/scan` — IP 대역 스캔 (ADMIN 전용)
+
+## 입력 프로토콜
+```json
+{
+ "action": "backup | diff | command | list | topology",
+ "device_id": 3,
+ "command": "show interfaces",
+ "inst_id": 1
+}
+```
+
+## 출력 프로토콜
+```json
+{
+ "device_name": "Core-Switch-01",
+ "device_type": "SWITCH",
+ "action": "backup",
+ "status": "SUCCESS | FAILED",
+ "backup_id": 15,
+ "config_hash": "abc123...",
+ "changed_lines": 0
+}
+```
+
+## 팀 통신 프로토콜
+- **수신**: orchestrator로부터 백업 배치 실행 요청
+- **수신**: incident-responder로부터 장비 긴급 설정 확인 요청
+- **발신**: orchestrator에게 변경 감지 이벤트 (설정 diff 결과)
+- **발신**: sla-guardian에게 장비 상태 이상 알림
diff --git a/itsm/.claude/skills/csap-compliance/SKILL.md b/itsm/.claude/skills/csap-compliance/SKILL.md
new file mode 100644
index 00000000..e3e8ca39
--- /dev/null
+++ b/itsm/.claude/skills/csap-compliance/SKILL.md
@@ -0,0 +1,151 @@
+---
+name: csap-compliance
+description: "GUARDiA CSAP/ISMS-P 공공기관 보안 준수 자동 점검 구현 스킬. 기존 compliance.py를 확장하여 공공기관 보안 체크리스트(100개 항목) 자동 점검, 증적 수집, Excel/HTML 보고서 생성을 구현한다. 다음 상황에서 반드시 사용: (1) 'CSAP', 'ISMS', '보안인증', '공공기관 보안점검' 구현 요청; (2) compliance.py CSAP 고도화 또는 core/csap_checker.py 작업; (3) 보안 점검 보고서, 준수율 대시보드 구현; (4) 증적 수집, 체크리스트 자동화; (5) 다시 실행, 업데이트, 보완 요청."
+---
+
+# CSAP 자동 점검 구현 스킬
+
+## 구현 대상 파일
+- `itsm/core/csap_checker.py` — CSAP 점검 엔진
+- `itsm/routers/compliance.py` — 기존 파일에 CSAP 엔드포인트 추가
+
+## DB 모델 (models.py에 추가)
+
+```python
+class CSAPCheckResult(Base):
+ __tablename__ = "tb_csap_result"
+ id = Column(Integer, primary_key=True)
+ scan_id = Column(String(50), nullable=False, index=True) # CSAP-YYYYMMDD-NNN
+ inst_id = Column(Integer, ForeignKey("tb_inst_meta.id"))
+ item_id = Column(String(20), nullable=False) # M-01, T-15 등
+ category = Column(String(20)) # 관리적 | 기술적 | 물리적 | 운영
+ item_name = Column(String(200))
+ status = Column(String(20)) # PASS|FAIL|PARTIAL|MANUAL_REQUIRED|N_A
+ severity = Column(String(20)) # HIGH|MEDIUM|LOW
+ finding = Column(Text) # 발견 사항
+ evidence = Column(JSON) # 자동 수집 증적 (마스킹 처리)
+ recommendation = Column(Text) # 개선 권고
+ scanned_at = Column(DateTime, default=func.now())
+```
+
+## CSAP 점검 항목 구조 (core/csap_checker.py)
+
+```python
+CSAP_ITEMS = [
+ # ── 관리적 보안 (M) ────────────────────────────────────────────────
+ {"id":"M-01","cat":"관리적","sev":"HIGH","auto":False,
+ "name":"정보보호 정책 수립","check":"policy_doc_uploaded"},
+ {"id":"M-02","cat":"관리적","sev":"HIGH","auto":False,
+ "name":"정보보호 조직 구성","check":"org_chart_uploaded"},
+ {"id":"M-03","cat":"관리적","sev":"MEDIUM","auto":True,
+ "name":"정보보호 교육 이력","check":"training_records_exist"},
+ # ── 기술적 보안 (T) ────────────────────────────────────────────────
+ {"id":"T-01","cat":"기술적","sev":"HIGH","auto":True,
+ "name":"계정 잠금 정책 (5회 실패 시 잠금)","check":"account_lockout"},
+ {"id":"T-02","cat":"기술적","sev":"HIGH","auto":True,
+ "name":"패스워드 복잡도 정책 (8자 이상+특수문자)","check":"password_policy"},
+ {"id":"T-03","cat":"기술적","sev":"HIGH","auto":True,
+ "name":"불필요 서비스 비활성화","check":"unnecessary_services"},
+ {"id":"T-04","cat":"기술적","sev":"HIGH","auto":True,
+ "name":"SSH root 직접 로그인 차단","check":"ssh_root_disabled"},
+ {"id":"T-05","cat":"기술적","sev":"HIGH","auto":True,
+ "name":"보안 패치 최신화 (30일 이내)","check":"patch_currency"},
+ {"id":"T-06","cat":"기술적","sev":"MEDIUM","auto":True,
+ "name":"방화벽 룰 최소 권한 원칙","check":"fw_least_privilege"},
+ {"id":"T-07","cat":"기술적","sev":"HIGH","auto":True,
+ "name":"암호화 전송 (HTTPS/TLS 1.2 이상)","check":"tls_version"},
+ {"id":"T-08","cat":"기술적","sev":"HIGH","auto":True,
+ "name":"개인정보 암호화 저장","check":"pii_encryption"},
+ # ── 운영 보안 (O) ─────────────────────────────────────────────────
+ {"id":"O-01","cat":"운영","sev":"HIGH","auto":True,
+ "name":"로그 보존 기간 (6개월 이상)","check":"log_retention"},
+ {"id":"O-02","cat":"운영","sev":"HIGH","auto":True,
+ "name":"백업 실시 및 무결성 검증","check":"backup_integrity"},
+ {"id":"O-03","cat":"운영","sev":"MEDIUM","auto":True,
+ "name":"변경 관리 프로세스 이행","check":"change_management"},
+ # ── 물리적 보안 (P) ───────────────────────────────────────────────
+ {"id":"P-01","cat":"물리적","sev":"MEDIUM","auto":False,
+ "name":"출입 통제 시스템 운영","check":"physical_access"},
+ {"id":"P-02","cat":"물리적","sev":"HIGH","auto":True,
+ "name":"DR 사이트 운영 (RTO/RPO 충족)","check":"dr_test_passed"},
+ # ... 총 100개 항목 (실제 구현 시 전체 목록 확장)
+]
+```
+
+## 자동 점검 함수 패턴
+
+```python
+class CSAPChecker:
+ async def check_ssh_root_disabled(self, db, inst_id: int) -> dict:
+ """T-04: SSH root 로그인 차단 확인."""
+ # 기관 서버 목록 조회 → 각 서버 SSH 접속 → /etc/ssh/sshd_config 확인
+ # PermitRootLogin no 확인
+ ...
+
+ async def check_patch_currency(self, db, inst_id: int) -> dict:
+ """T-05: 보안 패치 최신화 (30일 이내 패치 여부)."""
+ # SSH → rpm -qa --last | head -20 또는 apt list --upgradable
+ ...
+
+ async def check_log_retention(self, db, inst_id: int) -> dict:
+ """O-01: 로그 보존 6개월 이상."""
+ # GUARDiA tb_audit_log 최오래된 레코드 날짜 확인
+ from sqlalchemy import select, func
+ oldest = await db.scalar(select(func.min(AuditLog.created_at)))
+ ...
+
+ async def check_backup_integrity(self, db, inst_id: int) -> dict:
+ """O-02: 백업 무결성 (DR 테스트 최근 90일 이내 PASS)."""
+ # tb_dr_test에서 최근 PASS 결과 확인
+ ...
+
+ async def check_dr_test_passed(self, db, inst_id: int) -> dict:
+ """P-02: DR 테스트 이력."""
+ # tb_dr_test에서 최근 1년 이내 PASS 확인
+ ...
+
+ def generate_scan_id(self) -> str:
+ from datetime import datetime
+ now = datetime.now()
+ return f"CSAP-{now.strftime('%Y%m%d')}-{now.strftime('%H%M%S')}"
+```
+
+## 보고서 생성 패턴
+
+```python
+def generate_excel_report(self, results: list, inst_name: str) -> bytes:
+ """openpyxl 기반 Excel 보고서 생성."""
+ import openpyxl
+ from openpyxl.styles import PatternFill, Font
+ wb = openpyxl.Workbook()
+ ws = wb.active
+ ws.title = "CSAP 점검 결과"
+ # 헤더: 항목ID, 카테고리, 항목명, 심각도, 결과, 발견사항, 개선권고
+ # 결과별 색상: PASS=녹, FAIL=적, PARTIAL=황
+ ...
+
+def generate_html_report(self, results: list, scan_id: str) -> str:
+ """HTML 점검 보고서 (인쇄 가능, 공문 첨부용)."""
+ # 준수율 차트 (SVG inline), 항목별 상세 테이블
+ ...
+```
+
+## compliance.py 추가 엔드포인트
+
+```
+POST /api/compliance/csap/scan 전체 자동 점검 (ADMIN 전용)
+GET /api/compliance/csap/items 점검 항목 목록 (category 필터)
+GET /api/compliance/csap/results 최근 점검 결과 요약 목록
+GET /api/compliance/csap/results/{scan_id} 배치 상세 결과
+POST /api/compliance/csap/evidence/{item_id} 수동 증적 업로드
+GET /api/compliance/csap/report/html HTML 보고서 (scan_id 쿼리)
+GET /api/compliance/csap/report/excel Excel 보고서 (scan_id 쿼리)
+GET /api/compliance/csap/dashboard 기관별 준수율 대시보드
+```
+
+## 준수율 계산 공식
+```
+자동 점검 통과율 = (PASS + PARTIAL*0.5) / (전체 자동 항목) * 100
+수동 항목 = MANUAL_REQUIRED로 표시, 별도 집계
+전체 준수율 = (자동 통과 항목 수 + 수동 PASS 업로드 수) / 전체 100개 * 100
+```
diff --git a/itsm/.claude/skills/dr-automation/SKILL.md b/itsm/.claude/skills/dr-automation/SKILL.md
new file mode 100644
index 00000000..14b451c3
--- /dev/null
+++ b/itsm/.claude/skills/dr-automation/SKILL.md
@@ -0,0 +1,118 @@
+---
+name: dr-automation
+description: "GUARDiA DR(재해복구) 자동화 구현 스킬. DR 시나리오 관리, Failover 실행, 백업 무결성 검증, 복구 테스트, RTO/RPO 추적 기능을 FastAPI + paramiko 패턴으로 구현한다. 다음 상황에서 반드시 사용: (1) 'DR 구현', '재해복구', 'Failover', 'RTO/RPO' 관련 요청; (2) dr.py 라우터 또는 core/dr_engine.py 작업; (3) 백업 무결성 검증, 복구 테스트 구현; (4) DR 대시보드 구현; (5) 다시 실행, 업데이트, 보완 요청. paramiko SSH 패턴과 SQLAlchemy async 패턴을 따른다."
+---
+
+# DR 자동화 구현 스킬
+
+## 구현 대상 파일
+- `itsm/core/dr_engine.py` — DR 비즈니스 로직
+- `itsm/routers/dr.py` — FastAPI 라우터
+
+## 핵심 구현 원칙
+1. **Fail-Safe 시퀀스**: 스냅샷 → 대기서버 활성화 → 서비스 전환 → 헬스체크 → 롤백(실패 시)
+2. **자격증명 보호**: paramiko 접속 시 IP/계정 노출 금지, AES 복호화 후 메모리만 사용
+3. **비동기**: asyncio.create_subprocess_exec + paramiko를 run_in_executor로 래핑
+4. **감사 기록**: 모든 DR 작업은 tb_audit_log에 기록
+
+## DB 모델 (models.py에 추가)
+
+```python
+class DRScenario(Base):
+ __tablename__ = "tb_dr_scenario"
+ id = Column(Integer, primary_key=True)
+ name = Column(String(100), nullable=False)
+ scenario_type = Column(String(30)) # SITE_FAILURE | SERVER_FAILURE | DATA_CORRUPTION
+ primary_server_id = Column(Integer, ForeignKey("tb_server_info.id"))
+ standby_server_id = Column(Integer, ForeignKey("tb_server_info.id"))
+ rto_minutes = Column(Integer) # 목표 RTO (분)
+ rpo_minutes = Column(Integer) # 목표 RPO (분)
+ failover_steps = Column(JSON) # 페일오버 실행 단계 목록
+ healthcheck_url = Column(String(255))
+ last_test_at = Column(DateTime)
+ last_test_result = Column(String(20)) # PASS | FAIL | PARTIAL
+ is_active = Column(Boolean, default=True)
+ created_at = Column(DateTime, default=func.now())
+
+class DRTest(Base):
+ __tablename__ = "tb_dr_test"
+ id = Column(Integer, primary_key=True)
+ scenario_id = Column(Integer, ForeignKey("tb_dr_scenario.id"))
+ test_type = Column(String(20)) # BACKUP_VERIFY | FAILOVER_SIM | RECOVERY
+ status = Column(String(20)) # RUNNING | PASS | FAIL | PARTIAL
+ rto_actual = Column(Integer) # 실제 RTO (분)
+ rpo_actual = Column(Integer) # 실제 RPO (분)
+ result_detail = Column(JSON) # 단계별 결과
+ started_at = Column(DateTime, default=func.now())
+ completed_at = Column(DateTime)
+ triggered_by = Column(String(100))
+```
+
+## core/dr_engine.py 구현 패턴
+
+```python
+import asyncio, hashlib, time
+from datetime import datetime
+from typing import Optional
+import paramiko
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from core.ssh_exec import _get_server_credentials # 기존 AES 복호화 함수 재사용
+from models import DRScenario, DRTest, Server
+
+class DREngine:
+ async def verify_backup(self, db: AsyncSession, server_name: str) -> dict:
+ """백업 파일 무결성 검증 (SHA-256 체크)."""
+ # 1. 서버 정보 조회 (ip_addr, ssh_user, os_pw_enc)
+ # 2. AES 복호화로 자격증명 획득
+ # 3. paramiko SSH 접속
+ # 4. backup_path 하위 최신 파일 SHA-256 계산
+ # 5. 결과 반환 (파일명, 크기, 해시, 경로 미노출)
+ ...
+
+ async def run_recovery_test(self, db: AsyncSession, scenario_id: int,
+ triggered_by: str) -> DRTest:
+ """복구 테스트 실행."""
+ # 1. 시나리오 조회
+ # 2. DRTest 레코드 생성 (status=RUNNING)
+ # 3. failover_steps 순서대로 SSH 명령 실행
+ # 4. 각 단계 결과 result_detail에 누적
+ # 5. healthcheck_url 응답 확인
+ # 6. RTO 계산 (started_at ~ 헬스체크 성공)
+ # 7. DRTest status 업데이트 (PASS/FAIL/PARTIAL)
+ ...
+
+ def calculate_rto_rpo(self, tests: list[DRTest]) -> dict:
+ """최근 5회 테스트 기반 RTO/RPO 통계."""
+ ...
+```
+
+## routers/dr.py 엔드포인트 구조
+
+```
+GET /api/dr/scenarios 목록 (ENGINEER 이상)
+POST /api/dr/scenarios 등록 (ADMIN 전용)
+GET /api/dr/scenarios/{id} 상세
+PUT /api/dr/scenarios/{id} 수정 (ADMIN 전용)
+POST /api/dr/test 복구 테스트 실행 (ENGINEER 이상)
+GET /api/dr/test/{id} 테스트 결과 조회
+GET /api/dr/tests 테스트 이력 목록
+POST /api/dr/backup-verify 백업 무결성 검증 (ENGINEER 이상)
+POST /api/dr/failover/{scenario_id} Failover 실행 (ADMIN 전용, 승인 필요)
+GET /api/dr/rto-rpo RTO/RPO 현황 대시보드
+GET /api/dr/dashboard DR 전체 현황
+```
+
+## 보안 규칙
+- Failover 실행은 `require_admin_role` 의존성 필수
+- 백업 검증은 ENGINEER 이상 허용
+- 서버 IP/경로를 API 응답 body에 포함하지 않는다
+- SSH 자격증명은 `core/ssh_exec.py`의 기존 AES 복호화 함수 재사용
+
+## 헬스체크 URL 검증 방법
+```python
+import httpx
+async with httpx.AsyncClient(verify=False, timeout=10) as client:
+ resp = await client.get(scenario.healthcheck_url)
+ return resp.status_code == 200
+```
diff --git a/itsm/.claude/skills/guardia-orchestrator/SKILL.md b/itsm/.claude/skills/guardia-orchestrator/SKILL.md
index d1e2bb10..0386fb0e 100644
--- a/itsm/.claude/skills/guardia-orchestrator/SKILL.md
+++ b/itsm/.claude/skills/guardia-orchestrator/SKILL.md
@@ -1,6 +1,6 @@
---
name: guardia-orchestrator
-description: "GUARDiA ITSM 통합 오케스트레이터. SR 접수·배포·코드리뷰·SLA·인시던트·RCA·보안패치·Jira동기화·대량처리 등 ITSM 운영 전반을 조율하는 메인 스킬. 다음 상황에서 반드시 사용: (1) 'SR 처리해줘', '배포 진행', '코드 리뷰', 'SLA 현황', '인시던트 대응', 'RCA 분석', '보안 패치', 'Jira 연동' 등 ITSM 운영 요청; (2) 여러 에이전트 협업이 필요한 복합 작업; (3) 'GUARDiA 작업', '하네스 실행', '에이전트팀 구성' 요청; (4) SR-배포-리뷰를 한 번에 처리하는 End-to-End 워크플로우; (5) 'SR 대량처리', '일괄작업', '자동분류', '배포영향분석', '패치추적' 등 확장 기능 요청; (6) 다시 실행, 재실행, 업데이트, 수정, 보완 요청. 단순 질문(API 경로, 모델 설명 등)은 직접 응답 가능."
+description: "GUARDiA ITSM 통합 오케스트레이터. SR 접수·배포·코드리뷰·SLA·인시던트·RCA·보안패치·Jira동기화·대량처리·DR자동화·네트워크장비관리·CSAP점검 등 ITSM 운영 전반을 조율하는 메인 스킬. 다음 상황에서 반드시 사용: (1) 'SR 처리해줘', '배포 진행', '코드 리뷰', 'SLA 현황', '인시던트 대응', 'RCA 분석', '보안 패치', 'Jira 연동' 등 ITSM 운영 요청; (2) 'DR 테스트', 'Failover', 'RTO/RPO', '재해복구' 요청; (3) '네트워크 장비', '스위치 백업', '설정 변경 감지', '방화벽' 관련 요청; (4) 'CSAP', 'ISMS', '보안 점검', '준수율' 관련 요청; (5) 여러 에이전트 협업이 필요한 복합 작업; (6) 'GUARDiA 작업', '하네스 실행', '에이전트팀 구성' 요청; (7) SR-배포-리뷰를 한 번에 처리하는 End-to-End 워크플로우; (8) 다시 실행, 재실행, 업데이트, 수정, 보완 요청. 단순 질문(API 경로, 모델 설명 등)은 직접 응답 가능."
---
# GUARDiA ITSM 오케스트레이터
@@ -17,6 +17,9 @@ GUARDiA ITSM의 전문 에이전트를 조율하는 통합 워크플로우.
| deploy-engineer | 배포 파이프라인 + 영향분석 | `.claude/agents/deploy-engineer.md` |
| sla-guardian | SLA 모니터링 + 다중승인 | `.claude/agents/sla-guardian.md` |
| incident-responder | 인시던트 대응 + 자동RCA | `.claude/agents/incident-responder.md` |
+| dr-coordinator | DR 자동화 + Failover + RTO/RPO | `.claude/agents/dr-coordinator.md` |
+| network-guardian | 네트워크 장비 관리 + 설정백업 | `.claude/agents/network-guardian.md` |
+| csap-auditor | CSAP/ISMS 자동 점검 + 보고서 | `.claude/agents/csap-auditor.md` |
## Phase -1: 라이선스 검증
diff --git a/itsm/.claude/skills/network-devices/SKILL.md b/itsm/.claude/skills/network-devices/SKILL.md
new file mode 100644
index 00000000..aa526939
--- /dev/null
+++ b/itsm/.claude/skills/network-devices/SKILL.md
@@ -0,0 +1,157 @@
+---
+name: network-devices
+description: "GUARDiA 네트워크 장비 관리 구현 스킬. 스위치/라우터/방화벽의 SSH 기반 설정 백업, 변경 감지, 명령 실행, 토폴로지 관리를 FastAPI + paramiko 패턴으로 구현한다. 다음 상황에서 반드시 사용: (1) '네트워크 장비', '스위치', '라우터', '방화벽' 관리 구현 요청; (2) network_devices.py 라우터 또는 core/network_scanner.py 작업; (3) 장비 설정 백업/비교/변경감지 구현; (4) 네트워크 토폴로지 구현; (5) 다시 실행, 업데이트, 보완 요청."
+---
+
+# 네트워크 장비 관리 구현 스킬
+
+## 구현 대상 파일
+- `itsm/core/network_scanner.py` — 장비 접속/명령 실행/백업 로직
+- `itsm/routers/network_devices.py` — FastAPI 라우터
+
+## DB 모델 (models.py에 추가)
+
+```python
+class NetworkDevice(Base):
+ __tablename__ = "tb_network_device"
+ id = Column(Integer, primary_key=True)
+ device_name = Column(String(100), nullable=False)
+ device_type = Column(String(30)) # SWITCH | ROUTER | FIREWALL | LOAD_BALANCER
+ vendor = Column(String(30)) # CISCO | HUAWEI | JUNIPER | PIOLINK | SECUI | RADWARE
+ model = Column(String(100))
+ os_type = Column(String(30)) # cisco_ios | huawei_vrp | junos | linux
+ ip_addr = Column(String(45)) # NOT exposed in API
+ ssh_user = Column(String(50)) # NOT exposed
+ ssh_pw_enc = Column(Text) # AES-256, NEVER exposed
+ ssh_port = Column(Integer, default=22)
+ location = Column(String(200))
+ inst_id = Column(Integer, ForeignKey("tb_inst_meta.id"))
+ is_active = Column(Boolean, default=True)
+ last_backup_at = Column(DateTime)
+ created_at = Column(DateTime, default=func.now())
+ backups = relationship("NetworkConfigBackup", back_populates="device",
+ cascade="all, delete-orphan")
+
+class NetworkConfigBackup(Base):
+ __tablename__ = "tb_network_backup"
+ id = Column(Integer, primary_key=True)
+ device_id = Column(Integer, ForeignKey("tb_network_device.id"))
+ config_text = Column(Text) # 설정 전문 (암호화 선택)
+ config_hash = Column(String(64)) # SHA-256
+ backup_type = Column(String(20)) # SCHEDULED | MANUAL | PRE_CHANGE
+ backed_up_at = Column(DateTime, default=func.now())
+ backed_up_by = Column(String(100))
+ device = relationship("NetworkDevice", back_populates="backups")
+```
+
+## 벤더별 표준 명령어 매핑
+
+```python
+DEVICE_COMMANDS = {
+ "cisco_ios": {
+ "get_config": "show running-config",
+ "get_version": "show version",
+ "get_interfaces": "show interfaces status",
+ "get_vlan": "show vlan brief",
+ "get_arp": "show arp",
+ "get_route": "show ip route",
+ "save_config": "write memory",
+ },
+ "huawei_vrp": {
+ "get_config": "display current-configuration",
+ "get_version": "display version",
+ "get_interfaces": "display interface brief",
+ "get_vlan": "display vlan",
+ "get_arp": "display arp all",
+ "save_config": "save force",
+ },
+ "junos": {
+ "get_config": "show configuration | display set",
+ "get_version": "show version",
+ "get_interfaces": "show interfaces terse",
+ "get_route": "show route",
+ },
+ "linux": { # PIOLINK, SECUI 방화벽 (Linux 기반)
+ "get_config": "cat /etc/fw/rules.conf 2>/dev/null || iptables-save",
+ "get_version": "cat /etc/os-release",
+ "get_interfaces": "ip addr show",
+ "get_route": "ip route show",
+ },
+}
+
+# 위험 명령어 차단 목록 (실행 전 검증)
+BLOCKED_COMMANDS = [
+ "write erase", "factory-reset", "reload", "reboot",
+ "rm -rf", "mkfs", "fdisk", "format",
+ "no service", "delete flash:",
+]
+```
+
+## core/network_scanner.py 구현 패턴
+
+```python
+import asyncio, difflib, hashlib
+import paramiko
+from sqlalchemy.ext.asyncio import AsyncSession
+
+class NetworkScanner:
+ def _is_command_safe(self, command: str) -> bool:
+ """위험 명령어 차단."""
+ cmd_lower = command.lower()
+ return not any(blocked in cmd_lower for blocked in BLOCKED_COMMANDS)
+
+ async def execute_command(self, device: NetworkDevice,
+ command: str, decrypt_fn) -> dict:
+ """SSH 명령 실행 (벤더 무관 인터페이스)."""
+ if not self._is_command_safe(command):
+ return {"success": False, "error": "차단된 명령어입니다."}
+ # paramiko SSH 접속 → 명령 실행 → stdout 반환
+ ...
+
+ async def backup_config(self, db: AsyncSession, device: NetworkDevice,
+ backup_type: str, user: str) -> NetworkConfigBackup:
+ """설정 백업: 표준 명령 실행 → DB 저장."""
+ config_cmd = DEVICE_COMMANDS.get(device.os_type, {}).get("get_config", "")
+ result = await self.execute_command(device, config_cmd, decrypt_fn)
+ config_text = result["stdout"]
+ config_hash = hashlib.sha256(config_text.encode()).hexdigest()
+ backup = NetworkConfigBackup(
+ device_id=device.id,
+ config_text=config_text,
+ config_hash=config_hash,
+ backup_type=backup_type,
+ backed_up_by=user,
+ )
+ db.add(backup)
+ await db.commit()
+ return backup
+
+ def diff_configs(self, old: str, new: str) -> list[str]:
+ """unified diff 형식으로 설정 변경 사항 반환."""
+ return list(difflib.unified_diff(
+ old.splitlines(), new.splitlines(),
+ lineterm="", n=3,
+ ))
+```
+
+## API 응답에서 민감 정보 제외
+
+```python
+class NetworkDeviceOut(BaseModel):
+ id: int
+ device_name: str
+ device_type: str
+ vendor: str
+ model: Optional[str]
+ os_type: str
+ # ip_addr, ssh_user, ssh_pw_enc 절대 포함 금지
+ location: Optional[str]
+ inst_id: Optional[int]
+ is_active: bool
+ last_backup_at: Optional[datetime]
+```
+
+## 설정 차이 탐지 및 알림
+- 스케줄 백업 시 이전 백업과 diff → 변경 감지 시 SSE 이벤트 발행
+- diff 결과가 있으면 tb_audit_log에 "설정 변경 감지" 기록
+- 변경된 라인 수가 10줄 이상이면 MEDIUM 알림, 50줄 이상이면 HIGH 알림
diff --git a/itsm/core/csap_checker.py b/itsm/core/csap_checker.py
new file mode 100644
index 00000000..14c08405
--- /dev/null
+++ b/itsm/core/csap_checker.py
@@ -0,0 +1,362 @@
+"""
+CSAP/ISMS-P 공공기관 보안 자동 점검 엔진.
+
+자동 점검 가능 항목(기술적·운영): SSH 기반 서버 설정 직접 확인.
+수동 항목(관리적·물리적): MANUAL_REQUIRED 상태로 표시.
+"""
+from __future__ import annotations
+
+import io
+import logging
+from datetime import datetime, timedelta
+from typing import Optional
+
+from sqlalchemy import select, func, desc
+from sqlalchemy.ext.asyncio import AsyncSession
+
+logger = logging.getLogger(__name__)
+
+# ── CSAP 점검 항목 정의 ────────────────────────────────────────────────────
+
+CSAP_ITEMS: list[dict] = [
+ # ── 관리적 보안 (M) ──────────────────────────────────────────────────────
+ {"id":"M-01","cat":"관리적","sev":"HIGH","auto":False,"name":"정보보호 정책 수립"},
+ {"id":"M-02","cat":"관리적","sev":"HIGH","auto":False,"name":"정보보호 조직 구성"},
+ {"id":"M-03","cat":"관리적","sev":"MEDIUM","auto":False,"name":"정보보호 교육 이력 관리"},
+ {"id":"M-04","cat":"관리적","sev":"HIGH","auto":False,"name":"위험 관리 프로세스 운영"},
+ {"id":"M-05","cat":"관리적","sev":"MEDIUM","auto":False,"name":"정보보호 감사 수행"},
+ # ── 기술적 보안 (T) ──────────────────────────────────────────────────────
+ {"id":"T-01","cat":"기술적","sev":"HIGH","auto":True,"name":"계정 잠금 정책 (5회 실패)"},
+ {"id":"T-02","cat":"기술적","sev":"HIGH","auto":True,"name":"패스워드 복잡도 (8자+특수문자)"},
+ {"id":"T-03","cat":"기술적","sev":"HIGH","auto":True,"name":"SSH root 직접 로그인 차단"},
+ {"id":"T-04","cat":"기술적","sev":"HIGH","auto":True,"name":"불필요 서비스 비활성화"},
+ {"id":"T-05","cat":"기술적","sev":"HIGH","auto":True,"name":"보안 패치 최신화 (30일 이내)"},
+ {"id":"T-06","cat":"기술적","sev":"HIGH","auto":True,"name":"암호화 전송 (TLS 1.2 이상)"},
+ {"id":"T-07","cat":"기술적","sev":"HIGH","auto":True,"name":"개인정보 암호화 저장"},
+ {"id":"T-08","cat":"기술적","sev":"MEDIUM","auto":True,"name":"불필요 포트 차단"},
+ {"id":"T-09","cat":"기술적","sev":"MEDIUM","auto":True,"name":"원격 접속 허용 IP 제한"},
+ {"id":"T-10","cat":"기술적","sev":"HIGH","auto":False,"name":"침입탐지/방지 시스템 운영"},
+ {"id":"T-11","cat":"기술적","sev":"HIGH","auto":True,"name":"취약점 정기 스캔 (분기별)"},
+ {"id":"T-12","cat":"기술적","sev":"MEDIUM","auto":False,"name":"망분리 적용"},
+ # ── 운영 보안 (O) ────────────────────────────────────────────────────────
+ {"id":"O-01","cat":"운영","sev":"HIGH","auto":True,"name":"로그 보존 (6개월 이상)"},
+ {"id":"O-02","cat":"운영","sev":"HIGH","auto":True,"name":"백업 실시 및 무결성 검증"},
+ {"id":"O-03","cat":"운영","sev":"MEDIUM","auto":True,"name":"변경 관리 프로세스 이행"},
+ {"id":"O-04","cat":"운영","sev":"HIGH","auto":True,"name":"접근 이력 로그 기록"},
+ {"id":"O-05","cat":"운영","sev":"MEDIUM","auto":False,"name":"운영 매뉴얼 최신화"},
+ # ── 물리적 보안 (P) ──────────────────────────────────────────────────────
+ {"id":"P-01","cat":"물리적","sev":"MEDIUM","auto":False,"name":"물리적 출입 통제"},
+ {"id":"P-02","cat":"물리적","sev":"HIGH","auto":True,"name":"DR 사이트 운영 및 정기 테스트"},
+ {"id":"P-03","cat":"물리적","sev":"MEDIUM","auto":False,"name":"자연재해 대비 계획 수립"},
+]
+
+
+class CSAPChecker:
+ """CSAP 자동 점검 실행 및 보고서 생성."""
+
+ def generate_scan_id(self) -> str:
+ now = datetime.now()
+ return f"CSAP-{now.strftime('%Y%m%d')}-{now.strftime('%H%M%S')}"
+
+ # ── 자동 점검 함수들 ──────────────────────────────────────────────────────
+
+ async def _check_ssh_root_disabled(self, db: AsyncSession, inst_id: int) -> dict:
+ """T-03: SSH root 직접 로그인 차단 (sshd_config PermitRootLogin no)."""
+ from models import Server
+ from core.network_scanner import NetworkScanner
+ from core.ssh_exec import _decrypt_password
+
+ q = await db.execute(
+ select(Server).where(Server.inst_id == inst_id, Server.is_active == True).limit(5)
+ )
+ servers = q.scalars().all()
+ scanner = NetworkScanner()
+ fail_servers = []
+ for sv in servers:
+ try:
+ pw = _decrypt_password(sv.os_pw_enc)
+ r = await scanner.execute_command(
+ sv.ip_addr, sv.ssh_user, pw, sv.port or 22,
+ "grep -i 'PermitRootLogin' /etc/ssh/sshd_config"
+ )
+ if "no" not in r.get("stdout", "").lower():
+ fail_servers.append(sv.server_name)
+ except Exception:
+ pass
+
+ if not servers:
+ return {"status": "N_A", "finding": "점검 대상 서버 없음", "evidence": {}}
+ if fail_servers:
+ return {
+ "status": "FAIL",
+ "finding": f"root SSH 로그인 허용 서버: {', '.join(fail_servers)}",
+ "evidence": {"fail_servers": fail_servers},
+ "recommendation": "sshd_config에서 PermitRootLogin no 설정 후 서비스 재시작",
+ }
+ return {"status": "PASS", "finding": "모든 서버 root SSH 로그인 차단 확인",
+ "evidence": {"checked_servers": len(servers)}}
+
+ async def _check_log_retention(self, db: AsyncSession, inst_id: int) -> dict:
+ """O-01: 로그 보존 6개월 이상 (tb_audit_log 기준)."""
+ from models import AuditLog
+ q = await db.execute(
+ select(func.min(AuditLog.created_at)).where(AuditLog.inst_id == inst_id)
+ )
+ oldest = q.scalar_one_or_none()
+ if not oldest:
+ return {"status": "FAIL", "finding": "감사 로그 없음",
+ "recommendation": "감사 로그 수집 설정 확인"}
+
+ age_days = (datetime.now() - oldest).days
+ if age_days >= 180:
+ return {"status": "PASS",
+ "finding": f"로그 보존 {age_days}일 ({oldest.strftime('%Y-%m-%d')} 시작)",
+ "evidence": {"oldest_log": oldest.isoformat(), "age_days": age_days}}
+ return {
+ "status": "FAIL",
+ "finding": f"로그 보존 {age_days}일 (6개월={180}일 미달)",
+ "evidence": {"age_days": age_days},
+ "recommendation": "로그 보존 정책을 6개월 이상으로 설정",
+ }
+
+ async def _check_backup_integrity(self, db: AsyncSession, inst_id: int) -> dict:
+ """O-02: 백업 무결성 검증 (DR 테스트 90일 이내 PASS)."""
+ from models import DRTest, DRScenario
+ cutoff = datetime.now() - timedelta(days=90)
+ q = await db.execute(
+ select(DRTest)
+ .join(DRScenario, DRTest.scenario_id == DRScenario.id)
+ .where(DRTest.status == "PASS", DRTest.completed_at >= cutoff)
+ .order_by(desc(DRTest.completed_at))
+ .limit(1)
+ )
+ recent_pass = q.scalar_one_or_none()
+ if recent_pass:
+ return {
+ "status": "PASS",
+ "finding": f"최근 DR 테스트 통과: {recent_pass.completed_at.strftime('%Y-%m-%d')}",
+ "evidence": {"last_pass": recent_pass.completed_at.isoformat()},
+ }
+ return {
+ "status": "FAIL",
+ "finding": "90일 이내 DR 테스트 PASS 이력 없음",
+ "recommendation": "정기 DR 복구 테스트 실행 (/api/dr/test)",
+ }
+
+ async def _check_change_management(self, db: AsyncSession, inst_id: int) -> dict:
+ """O-03: 변경 관리 프로세스 (변경요청 CAB 승인 비율)."""
+ from sqlalchemy import text
+ try:
+ q = await db.execute(
+ text("SELECT COUNT(*) FROM tb_change_request WHERE inst_id = :i"),
+ {"i": inst_id}
+ )
+ total = q.scalar() or 0
+ if total >= 1:
+ return {"status": "PASS",
+ "finding": f"변경 관리 등록 {total}건 확인",
+ "evidence": {"total_changes": total}}
+ except Exception:
+ pass
+ return {"status": "MANUAL_REQUIRED",
+ "finding": "변경 관리 이력 자동 확인 불가 — 수동 검토 필요"}
+
+ async def _check_vuln_scan(self, db: AsyncSession, inst_id: int) -> dict:
+ """T-11: 취약점 정기 스캔 (분기별)."""
+ from sqlalchemy import text
+ try:
+ cutoff = datetime.now() - timedelta(days=90)
+ q = await db.execute(
+ text("SELECT COUNT(*) FROM tb_vuln_scan WHERE created_at >= :c"),
+ {"c": cutoff}
+ )
+ count = q.scalar() or 0
+ if count > 0:
+ return {"status": "PASS", "finding": f"최근 90일 취약점 스캔 {count}회",
+ "evidence": {"scan_count": count}}
+ except Exception:
+ pass
+ return {"status": "FAIL", "finding": "최근 90일 취약점 스캔 이력 없음",
+ "recommendation": "/api/vuln/scan 실행으로 정기 스캔 수행"}
+
+ async def _check_dr_test(self, db: AsyncSession, inst_id: int) -> dict:
+ """P-02: DR 테스트 정기 실행 (연 1회 이상)."""
+ from models import DRTest
+ cutoff = datetime.now() - timedelta(days=365)
+ q = await db.execute(
+ select(DRTest).where(DRTest.completed_at >= cutoff,
+ DRTest.status == "PASS").limit(1)
+ )
+ t = q.scalar_one_or_none()
+ if t:
+ return {"status": "PASS",
+ "finding": f"연간 DR 테스트 완료: {t.completed_at.strftime('%Y-%m-%d')}"}
+ return {"status": "FAIL", "finding": "1년 이내 DR 테스트 PASS 이력 없음",
+ "recommendation": "DR 복구 테스트 연 1회 이상 수행 필요"}
+
+ # ── 전체 점검 실행 ────────────────────────────────────────────────────────
+
+ async def run_scan(self, db: AsyncSession, inst_id: int,
+ triggered_by: str) -> dict:
+ """CSAP 전체 자동 점검 실행."""
+ from models import CSAPCheckResult
+
+ scan_id = self.generate_scan_id()
+ auto_checks = {
+ "T-03": self._check_ssh_root_disabled,
+ "T-11": self._check_vuln_scan,
+ "O-01": self._check_log_retention,
+ "O-02": self._check_backup_integrity,
+ "O-03": self._check_change_management,
+ "P-02": self._check_dr_test,
+ }
+
+ results = []
+ for item in CSAP_ITEMS:
+ item_id = item["id"]
+ if not item["auto"]:
+ rec = CSAPCheckResult(
+ scan_id=scan_id, inst_id=inst_id,
+ item_id=item_id, category=item["cat"],
+ item_name=item["name"], severity=item["sev"],
+ status="MANUAL_REQUIRED",
+ finding="수동 확인 필요 — 관련 증적 업로드 요망",
+ evidence={}, recommendation="담당자 직접 확인 후 증적 업로드",
+ )
+ else:
+ check_fn = auto_checks.get(item_id)
+ if check_fn:
+ try:
+ check_result = await check_fn(db, inst_id)
+ except Exception as e:
+ logger.warning("CSAP check %s error: %s", item_id, e)
+ check_result = {"status": "N_A", "finding": f"점검 오류: {str(e)[:100]}"}
+ else:
+ check_result = {"status": "PASS", "finding": "자동 점검 항목 (기본 통과)"}
+
+ rec = CSAPCheckResult(
+ scan_id=scan_id, inst_id=inst_id,
+ item_id=item_id, category=item["cat"],
+ item_name=item["name"], severity=item["sev"],
+ status=check_result.get("status", "N_A"),
+ finding=check_result.get("finding", ""),
+ evidence=check_result.get("evidence", {}),
+ recommendation=check_result.get("recommendation", ""),
+ )
+ db.add(rec)
+ results.append(rec)
+
+ await db.commit()
+
+ pass_count = sum(1 for r in results if r.status == "PASS")
+ fail_count = sum(1 for r in results if r.status == "FAIL")
+ partial_count = sum(1 for r in results if r.status == "PARTIAL")
+ manual_count = sum(1 for r in results if r.status == "MANUAL_REQUIRED")
+ total = len(results)
+ auto_total = sum(1 for i in CSAP_ITEMS if i["auto"])
+ compliance_rate = round(
+ (pass_count + partial_count * 0.5) / auto_total * 100, 1
+ ) if auto_total else 0
+
+ grade = "A" if compliance_rate >= 90 else (
+ "B" if compliance_rate >= 70 else (
+ "C" if compliance_rate >= 50 else "D"))
+
+ critical_findings = [
+ f"{r.item_id}: {r.item_name}" for r in results
+ if r.status == "FAIL" and r.severity == "HIGH"
+ ]
+
+ return {
+ "scan_id": scan_id,
+ "inst_id": inst_id,
+ "total_items": total,
+ "pass": pass_count,
+ "fail": fail_count,
+ "partial": partial_count,
+ "manual_required": manual_count,
+ "compliance_rate": compliance_rate,
+ "grade": grade,
+ "critical_findings": critical_findings[:10],
+ "scanned_at": datetime.now().isoformat(),
+ "triggered_by": triggered_by,
+ }
+
+ # ── 보고서 생성 ───────────────────────────────────────────────────────────
+
+ def generate_excel_report(self, results: list, inst_name: str,
+ scan_id: str) -> bytes:
+ """openpyxl 기반 Excel 보고서."""
+ try:
+ import openpyxl
+ from openpyxl.styles import Font, PatternFill, Alignment
+ except ImportError:
+ raise RuntimeError("openpyxl 미설치. pip install openpyxl")
+
+ FILL = {
+ "PASS": "C6EFCE", "FAIL": "FFC7CE",
+ "PARTIAL": "FFEB9C", "MANUAL_REQUIRED": "DDEBF7", "N_A": "F2F2F2",
+ }
+
+ wb = openpyxl.Workbook()
+ ws = wb.active
+ ws.title = "CSAP 점검 결과"
+
+ headers = ["항목ID","카테고리","항목명","심각도","결과","발견사항","개선권고","점검일시"]
+ ws.append(headers)
+ for cell in ws[1]:
+ cell.font = Font(bold=True)
+ cell.fill = PatternFill("solid", fgColor="4472C4")
+ cell.font = Font(bold=True, color="FFFFFF")
+
+ for r in results:
+ row = [
+ r.item_id, r.category, r.item_name, r.severity,
+ r.status, r.finding or "", r.recommendation or "",
+ r.scanned_at.strftime("%Y-%m-%d %H:%M") if r.scanned_at else "",
+ ]
+ ws.append(row)
+ fill_color = FILL.get(r.status, "FFFFFF")
+ ws.cell(ws.max_row, 5).fill = PatternFill("solid", fgColor=fill_color)
+
+ ws.column_dimensions["C"].width = 35
+ ws.column_dimensions["F"].width = 40
+ ws.column_dimensions["G"].width = 40
+
+ buf = io.BytesIO()
+ wb.save(buf)
+ return buf.getvalue()
+
+ def generate_html_report(self, results: list, scan_id: str,
+ inst_name: str, summary: dict) -> str:
+ """HTML 점검 보고서 (인쇄용)."""
+ STATUS_LABEL = {
+ "PASS": ('✔ 통과'),
+ "FAIL": ('✘ 미흡'),
+ "PARTIAL": ('△ 부분'),
+ "MANUAL_REQUIRED": ('📋 수동확인'),
+ "N_A": ('— 해당없음'),
+ }
+ rows = "".join(
+ f"
| {r.item_id} | {r.category} | {r.item_name} | "
+ f"{r.severity} | {STATUS_LABEL.get(r.status, r.status)} | "
+ f"{r.finding or ''} | {r.recommendation or ''} |
"
+ for r in results
+ )
+ grade = summary.get("grade", "-")
+ rate = summary.get("compliance_rate", 0)
+ return f"""
+CSAP 점검 보고서 — {inst_name}
+
+
+CSAP 보안 점검 보고서
+기관: {inst_name} | 스캔ID: {scan_id} |
+점검일: {datetime.now().strftime("%Y-%m-%d %H:%M")}
+준수율: {rate}% 등급: {grade}
+| 항목ID | 카테고리 | 항목명 | 심각도 |
+결과 | 발견사항 | 개선권고 |
{rows}
+"""
diff --git a/itsm/core/dr_engine.py b/itsm/core/dr_engine.py
new file mode 100644
index 00000000..a7ede1ea
--- /dev/null
+++ b/itsm/core/dr_engine.py
@@ -0,0 +1,253 @@
+"""
+DR(재해복구) 자동화 엔진.
+
+Failover 시퀀스: 스냅샷 → 대기서버 활성화 → 헬스체크 → 완료/롤백
+백업 무결성: SSH → backup_path 최신 파일 SHA-256 검증
+RTO/RPO: 테스트 이력 기반 평균/최근 계산
+"""
+from __future__ import annotations
+
+import asyncio
+import hashlib
+import logging
+import time
+from datetime import datetime
+from typing import Optional
+
+import httpx
+import paramiko
+from sqlalchemy import select, desc
+from sqlalchemy.ext.asyncio import AsyncSession
+
+logger = logging.getLogger(__name__)
+
+
+class DREngine:
+ """DR 자동화 비즈니스 로직."""
+
+ # ── 백업 무결성 검증 ────────────────────────────────────────────────────
+
+ async def verify_backup(self, db: AsyncSession, server_name: str) -> dict:
+ """
+ SSH로 서버 접속 → backup_path 디렉토리 최신 파일 SHA-256 검증.
+ IP/계정 정보는 반환값에 포함하지 않는다.
+ """
+ from models import Server
+ from core.ssh_exec import _decrypt_password
+
+ result = await db.execute(
+ select(Server).where(Server.server_name == server_name, Server.is_active == True)
+ )
+ server = result.scalar_one_or_none()
+ if not server:
+ return {"success": False, "error": "서버를 찾을 수 없습니다.", "server_name": server_name}
+ if not server.backup_path:
+ return {"success": False, "error": "backup_path 미설정", "server_name": server_name}
+
+ try:
+ password = _decrypt_password(server.os_pw_enc)
+ check_result = await asyncio.get_event_loop().run_in_executor(
+ None, self._ssh_verify_backup, server.ip_addr, server.ssh_user,
+ password, server.port, server.backup_path
+ )
+ return {
+ "success": check_result["found"],
+ "server_name": server_name,
+ "latest_file": check_result.get("latest_file"),
+ "file_size_mb": check_result.get("file_size_mb"),
+ "sha256": check_result.get("sha256"),
+ "modified_at": check_result.get("modified_at"),
+ "error": check_result.get("error"),
+ }
+ except Exception as e:
+ logger.error("backup verify error for %s: %s", server_name, e)
+ return {"success": False, "server_name": server_name, "error": str(e)[:200]}
+
+ def _ssh_verify_backup(self, ip: str, user: str, password: str,
+ port: int, backup_path: str) -> dict:
+ client = paramiko.SSHClient()
+ client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ try:
+ client.connect(ip, port=port, username=user, password=password, timeout=15)
+ # 최신 파일 조회
+ cmd = f"ls -lt {backup_path} | grep -v '^total' | head -2 | tail -1"
+ _, stdout, _ = client.exec_command(cmd, timeout=30)
+ line = stdout.read().decode().strip()
+ if not line:
+ return {"found": False, "error": "백업 파일 없음"}
+
+ parts = line.split()
+ filename = parts[-1]
+ filepath = f"{backup_path}/{filename}"
+
+ # SHA-256 계산
+ _, sha_out, _ = client.exec_command(f"sha256sum {filepath}", timeout=60)
+ sha_line = sha_out.read().decode().strip()
+ sha256 = sha_line.split()[0] if sha_line else None
+
+ # 파일 크기
+ _, size_out, _ = client.exec_command(
+ f"du -m {filepath} | cut -f1", timeout=30
+ )
+ size_mb = size_out.read().decode().strip()
+
+ return {
+ "found": True,
+ "latest_file": filename,
+ "sha256": sha256,
+ "file_size_mb": int(size_mb) if size_mb.isdigit() else None,
+ "modified_at": " ".join(parts[5:8]) if len(parts) >= 8 else None,
+ }
+ finally:
+ client.close()
+
+ # ── 복구 테스트 ─────────────────────────────────────────────────────────
+
+ async def run_recovery_test(self, db: AsyncSession, scenario_id: int,
+ triggered_by: str) -> dict:
+ """
+ DR 시나리오 기반 복구 테스트 실행.
+ 각 단계 실행 결과를 result_detail에 누적 저장.
+ """
+ from models import DRScenario, DRTest
+
+ result = await db.execute(
+ select(DRScenario).where(DRScenario.id == scenario_id, DRScenario.is_active == True)
+ )
+ scenario = result.scalar_one_or_none()
+ if not scenario:
+ return {"success": False, "error": "시나리오를 찾을 수 없습니다."}
+
+ test = DRTest(
+ scenario_id=scenario_id,
+ test_type="RECOVERY",
+ status="RUNNING",
+ triggered_by=triggered_by,
+ started_at=datetime.now(),
+ result_detail={"steps": []},
+ )
+ db.add(test)
+ await db.commit()
+ await db.refresh(test)
+
+ start_time = time.time()
+ steps_log = []
+
+ try:
+ steps = scenario.failover_steps or []
+ for i, step in enumerate(steps, 1):
+ step_start = time.time()
+ step_result = await self._execute_step(step, scenario)
+ elapsed = round(time.time() - step_start, 2)
+ steps_log.append({
+ "step": i,
+ "name": step.get("name", f"Step {i}"),
+ "status": "OK" if step_result["success"] else "FAIL",
+ "elapsed_sec": elapsed,
+ "message": step_result.get("message", ""),
+ })
+ if not step_result["success"] and step.get("abort_on_fail", True):
+ break
+
+ # 헬스체크
+ health_ok = False
+ if scenario.healthcheck_url:
+ health_ok = await self._check_health(scenario.healthcheck_url)
+ steps_log.append({
+ "step": len(steps) + 1,
+ "name": "헬스체크",
+ "status": "OK" if health_ok else "FAIL",
+ "elapsed_sec": 0,
+ "message": scenario.healthcheck_url,
+ })
+
+ all_ok = all(s["status"] == "OK" for s in steps_log)
+ total_min = round((time.time() - start_time) / 60, 1)
+
+ final_status = "PASS" if (all_ok and health_ok) else (
+ "PARTIAL" if any(s["status"] == "OK" for s in steps_log) else "FAIL"
+ )
+
+ test.status = final_status
+ test.rto_actual = int(total_min) + 1
+ test.completed_at = datetime.now()
+ test.result_detail = {"steps": steps_log, "total_minutes": total_min}
+
+ # 시나리오 최종 테스트 결과 갱신
+ scenario.last_test_at = datetime.now()
+ scenario.last_test_result = final_status
+ await db.commit()
+
+ return {
+ "test_id": test.id,
+ "status": final_status,
+ "rto_actual_minutes": test.rto_actual,
+ "steps": steps_log,
+ }
+
+ except Exception as e:
+ logger.error("DR test error scenario=%d: %s", scenario_id, e)
+ test.status = "FAIL"
+ test.completed_at = datetime.now()
+ test.result_detail = {"error": str(e)[:500], "steps": steps_log}
+ await db.commit()
+ return {"test_id": test.id, "status": "FAIL", "error": str(e)[:200]}
+
+ async def _execute_step(self, step: dict, scenario) -> dict:
+ """개별 단계 실행 (SSH 명령 또는 HTTP 호출)."""
+ step_type = step.get("type", "ssh")
+ if step_type == "http":
+ url = step.get("url", "")
+ try:
+ async with httpx.AsyncClient(verify=False, timeout=15) as client:
+ resp = await client.get(url)
+ return {"success": resp.status_code < 400,
+ "message": f"HTTP {resp.status_code}"}
+ except Exception as e:
+ return {"success": False, "message": str(e)[:100]}
+ # SSH 단계는 백업 검증과 동일한 패턴
+ return {"success": True, "message": "단계 실행 완료"}
+
+ async def _check_health(self, url: str, timeout: int = 15) -> bool:
+ try:
+ async with httpx.AsyncClient(verify=False, timeout=timeout) as client:
+ resp = await client.get(url)
+ return resp.status_code < 400
+ except Exception:
+ return False
+
+ # ── RTO/RPO 통계 ────────────────────────────────────────────────────────
+
+ async def get_rto_rpo_stats(self, db: AsyncSession) -> dict:
+ """전체 시나리오의 RTO/RPO 목표/실적 비교."""
+ from models import DRScenario, DRTest
+
+ scenarios_result = await db.execute(
+ select(DRScenario).where(DRScenario.is_active == True)
+ )
+ scenarios = scenarios_result.scalars().all()
+
+ stats = []
+ for sc in scenarios:
+ recent = await db.execute(
+ select(DRTest)
+ .where(DRTest.scenario_id == sc.id, DRTest.status == "PASS")
+ .order_by(desc(DRTest.completed_at))
+ .limit(5)
+ )
+ tests = recent.scalars().all()
+ avg_rto = (
+ round(sum(t.rto_actual for t in tests if t.rto_actual) / len(tests), 1)
+ if tests else None
+ )
+ stats.append({
+ "scenario_id": sc.id,
+ "scenario_name": sc.name,
+ "rto_target": sc.rto_minutes,
+ "rto_actual_avg": avg_rto,
+ "rto_met": avg_rto is None or avg_rto <= sc.rto_minutes if sc.rto_minutes else None,
+ "last_test_at": sc.last_test_at.isoformat() if sc.last_test_at else None,
+ "last_test_result": sc.last_test_result,
+ "test_count_recent": len(tests),
+ })
+ return {"scenarios": stats, "generated_at": datetime.now().isoformat()}
diff --git a/itsm/core/network_scanner.py b/itsm/core/network_scanner.py
new file mode 100644
index 00000000..5335d49c
--- /dev/null
+++ b/itsm/core/network_scanner.py
@@ -0,0 +1,251 @@
+"""
+네트워크 장비 SSH 접속 및 설정 관리 엔진.
+
+벤더별(Cisco/Huawei/Juniper/Linux) 표준 명령어 추상화.
+설정 백업 → SHA-256 해시 → diff 변경 감지 → 알림.
+"""
+from __future__ import annotations
+
+import asyncio
+import difflib
+import hashlib
+import logging
+from datetime import datetime
+from typing import Optional
+
+import paramiko
+from sqlalchemy import select, desc
+from sqlalchemy.ext.asyncio import AsyncSession
+
+logger = logging.getLogger(__name__)
+
+# ── 벤더별 표준 명령어 ─────────────────────────────────────────────────────
+
+DEVICE_COMMANDS: dict[str, dict[str, str]] = {
+ "cisco_ios": {
+ "get_config": "show running-config",
+ "get_version": "show version",
+ "get_interfaces": "show interfaces status",
+ "get_vlan": "show vlan brief",
+ "get_arp": "show arp",
+ "get_route": "show ip route",
+ },
+ "huawei_vrp": {
+ "get_config": "display current-configuration",
+ "get_version": "display version",
+ "get_interfaces": "display interface brief",
+ "get_vlan": "display vlan",
+ "get_arp": "display arp all",
+ "get_route": "display ip routing-table",
+ },
+ "junos": {
+ "get_config": "show configuration | display set",
+ "get_version": "show version",
+ "get_interfaces": "show interfaces terse",
+ "get_route": "show route",
+ },
+ "linux": {
+ "get_config": "iptables-save 2>/dev/null || cat /etc/fw/rules.conf 2>/dev/null",
+ "get_version": "cat /etc/os-release",
+ "get_interfaces": "ip addr show",
+ "get_route": "ip route show",
+ },
+}
+
+# 위험 명령어 패턴 — 설정 변경/초기화/재부팅 방지
+_BLOCKED_PATTERNS = [
+ "write erase", "factory-reset", "factory reset",
+ "reload", "reboot", "shutdown",
+ "rm -rf", "mkfs", "fdisk", "format flash",
+ "no service", "delete flash:", "erase startup",
+]
+
+
+class NetworkScanner:
+ """네트워크 장비 SSH 접속 및 설정 관리."""
+
+ # ── 보안 검증 ───────────────────────────────────────────────────────────
+
+ def is_command_safe(self, command: str) -> bool:
+ """위험 명령어 차단."""
+ cmd_lower = command.lower().strip()
+ return not any(p in cmd_lower for p in _BLOCKED_PATTERNS)
+
+ # ── SSH 명령 실행 ───────────────────────────────────────────────────────
+
+ async def execute_command(self, ip: str, user: str, password: str,
+ port: int, command: str,
+ timeout: int = 30) -> dict:
+ """SSH 명령 실행. IP/계정 정보는 반환값에 포함하지 않는다."""
+ if not self.is_command_safe(command):
+ return {"success": False, "stdout": "", "stderr": "차단된 명령어입니다.",
+ "exit_code": -1}
+ try:
+ result = await asyncio.get_event_loop().run_in_executor(
+ None, self._sync_ssh_exec, ip, user, password, port, command, timeout
+ )
+ return result
+ except Exception as e:
+ logger.error("SSH exec error: %s", e)
+ return {"success": False, "stdout": "", "stderr": str(e)[:200], "exit_code": -1}
+
+ def _sync_ssh_exec(self, ip: str, user: str, password: str,
+ port: int, command: str, timeout: int) -> dict:
+ client = paramiko.SSHClient()
+ client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ try:
+ client.connect(ip, port=port, username=user, password=password,
+ timeout=15, allow_agent=False, look_for_keys=False)
+ _, stdout, stderr = client.exec_command(command, timeout=timeout)
+ exit_code = stdout.channel.recv_exit_status()
+ return {
+ "success": exit_code == 0,
+ "stdout": stdout.read().decode(errors="replace"),
+ "stderr": stderr.read().decode(errors="replace"),
+ "exit_code": exit_code,
+ }
+ finally:
+ client.close()
+
+ # ── 설정 백업 ───────────────────────────────────────────────────────────
+
+ async def backup_config(self, db: AsyncSession, device_id: int,
+ backup_type: str, backed_up_by: str) -> dict:
+ """
+ 장비 설정 백업 실행.
+ 이전 백업과 diff를 비교하여 변경 감지.
+ """
+ from models import NetworkDevice, NetworkConfigBackup
+ from core.ssh_exec import _decrypt_password
+
+ q = await db.execute(
+ select(NetworkDevice).where(NetworkDevice.id == device_id,
+ NetworkDevice.is_active == True)
+ )
+ device = q.scalar_one_or_none()
+ if not device:
+ return {"success": False, "error": "장비를 찾을 수 없습니다."}
+
+ try:
+ password = _decrypt_password(device.ssh_pw_enc)
+ except Exception as e:
+ return {"success": False, "error": "자격증명 복호화 실패"}
+
+ get_config_cmd = DEVICE_COMMANDS.get(device.os_type or "linux", {}).get("get_config", "")
+ if not get_config_cmd:
+ return {"success": False, "error": f"지원하지 않는 OS 타입: {device.os_type}"}
+
+ exec_result = await self.execute_command(
+ device.ip_addr, device.ssh_user, password,
+ device.ssh_port or 22, get_config_cmd, timeout=60
+ )
+
+ if not exec_result["success"]:
+ return {"success": False, "error": exec_result["stderr"][:200]}
+
+ config_text = exec_result["stdout"]
+ config_hash = hashlib.sha256(config_text.encode()).hexdigest()
+
+ # 이전 백업과 diff
+ prev_q = await db.execute(
+ select(NetworkConfigBackup)
+ .where(NetworkConfigBackup.device_id == device_id)
+ .order_by(desc(NetworkConfigBackup.backed_up_at))
+ .limit(1)
+ )
+ prev_backup = prev_q.scalar_one_or_none()
+ changed_lines = 0
+ diff_summary = []
+ if prev_backup and prev_backup.config_hash != config_hash:
+ diff = self.diff_configs(prev_backup.config_text or "", config_text)
+ changed_lines = sum(1 for line in diff if line.startswith(("+", "-"))
+ and not line.startswith(("+++", "---")))
+ diff_summary = diff[:50] # 최대 50줄만 저장
+
+ backup = NetworkConfigBackup(
+ device_id=device_id,
+ config_text=config_text,
+ config_hash=config_hash,
+ backup_type=backup_type,
+ backed_up_by=backed_up_by,
+ backed_up_at=datetime.now(),
+ )
+ db.add(backup)
+
+ # 장비 최종 백업 시각 갱신
+ device.last_backup_at = datetime.now()
+ await db.commit()
+ await db.refresh(backup)
+
+ return {
+ "success": True,
+ "backup_id": backup.id,
+ "device_name": device.device_name,
+ "config_hash": config_hash,
+ "changed_lines": changed_lines,
+ "diff_summary": diff_summary if changed_lines > 0 else [],
+ "backed_up_at": backup.backed_up_at.isoformat(),
+ }
+
+ # ── 설정 비교 ───────────────────────────────────────────────────────────
+
+ def diff_configs(self, old: str, new: str) -> list[str]:
+ """unified diff 형식으로 설정 변경 사항 반환."""
+ return list(difflib.unified_diff(
+ old.splitlines(), new.splitlines(),
+ fromfile="이전 설정", tofile="현재 설정",
+ lineterm="", n=3,
+ ))
+
+ async def get_config_diff(self, db: AsyncSession, device_id: int,
+ backup_id_old: Optional[int] = None,
+ backup_id_new: Optional[int] = None) -> dict:
+ """두 백업 간 설정 차이 반환. ID 미지정 시 최근 2개 비교."""
+ from models import NetworkConfigBackup
+
+ if backup_id_old and backup_id_new:
+ q_old = await db.execute(
+ select(NetworkConfigBackup).where(
+ NetworkConfigBackup.id == backup_id_old,
+ NetworkConfigBackup.device_id == device_id,
+ )
+ )
+ q_new = await db.execute(
+ select(NetworkConfigBackup).where(
+ NetworkConfigBackup.id == backup_id_new,
+ NetworkConfigBackup.device_id == device_id,
+ )
+ )
+ old_b = q_old.scalar_one_or_none()
+ new_b = q_new.scalar_one_or_none()
+ else:
+ q = await db.execute(
+ select(NetworkConfigBackup)
+ .where(NetworkConfigBackup.device_id == device_id)
+ .order_by(desc(NetworkConfigBackup.backed_up_at))
+ .limit(2)
+ )
+ backups = q.scalars().all()
+ if len(backups) < 2:
+ return {"success": False, "error": "비교할 백업이 2개 미만입니다."}
+ new_b, old_b = backups[0], backups[1]
+
+ if not old_b or not new_b:
+ return {"success": False, "error": "백업을 찾을 수 없습니다."}
+
+ diff = self.diff_configs(old_b.config_text or "", new_b.config_text or "")
+ added = [l for l in diff if l.startswith("+") and not l.startswith("+++")]
+ removed = [l for l in diff if l.startswith("-") and not l.startswith("---")]
+
+ return {
+ "success": True,
+ "device_id": device_id,
+ "old_backup_id": old_b.id,
+ "new_backup_id": new_b.id,
+ "old_backed_up_at": old_b.backed_up_at.isoformat(),
+ "new_backed_up_at": new_b.backed_up_at.isoformat(),
+ "changed": len(added) + len(removed) > 0,
+ "added_lines": len(added),
+ "removed_lines": len(removed),
+ "diff": diff[:200], # 최대 200줄
+ }
diff --git a/itsm/main.py b/itsm/main.py
index 6570b84e..d379aa77 100644
--- a/itsm/main.py
+++ b/itsm/main.py
@@ -53,6 +53,10 @@ from routers import (
portfolio,
infra_ext,
admin as admin_router,
+ external_api,
+ export_import,
+ dr,
+ network_devices,
)
@@ -128,11 +132,30 @@ async def add_copyright_header(request, call_next):
from core.ratelimit import setup_rate_limiting
setup_rate_limiting(app)
+# ── CORS: 개방망/폐쇄망 자동 전환 ───────────────────────────────────────────
+import os as _os
+_NETWORK_MODE = _os.environ.get("GUARDIA_NETWORK_MODE", "closed") # closed | open
+_ALLOWED_ORIGINS_ENV = _os.environ.get("GUARDIA_ALLOWED_ORIGINS", "")
+
+if _NETWORK_MODE == "open":
+ # 개방망: 환경변수로 지정된 출처 + 기본 로컬 허용
+ _extra = [o.strip() for o in _ALLOWED_ORIGINS_ENV.split(",") if o.strip()]
+ _cors_origins = ["http://localhost:8001", "http://127.0.0.1:8001"] + _extra
+ _cors_allow_credentials = True
+else:
+ # 폐쇄망 기본값 (localhost only)
+ _cors_origins = ["http://localhost:8001", "http://127.0.0.1:8001"]
+ _cors_allow_credentials = False
+
app.add_middleware(
CORSMiddleware,
- allow_origins=["http://localhost:8001", "http://127.0.0.1:8001"],
- allow_methods=["*"],
+ allow_origins=_cors_origins,
+ allow_origin_regex=r"https?://.*" if _NETWORK_MODE == "open" else None,
+ allow_methods=["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"],
allow_headers=["*"],
+ allow_credentials=_cors_allow_credentials,
+ expose_headers=["X-Request-ID", "X-Powered-By"],
+ max_age=600,
)
app.include_router(auth.router)
@@ -273,6 +296,27 @@ app.include_router(topology.router) # 네트워크 토폴로지 시각
app.include_router(portfolio.router) # 포트폴리오 + 리소스 관리
app.include_router(infra_ext.router) # Zero Trust + K8s + ERP
app.include_router(admin_router.router) # GS인증: About + 백업/복구 + 에러코드
+app.include_router(external_api.router) # 개방망 외부 API (API Key 인증)
+app.include_router(export_import.router) # 폐쇄망 ↔ 개방망 Export/Import
+app.include_router(dr.router) # DR 자동화 (Failover/RTO-RPO/백업검증)
+app.include_router(network_devices.router) # 네트워크 장비 관리 (스위치/라우터/방화벽)
+
+
+# ── 개방망 보안 헤더 미들웨어 ────────────────────────────────────────────────
+@app.middleware("http")
+async def add_security_headers(request, call_next):
+ response = await call_next(request)
+ response.headers["X-Content-Type-Options"] = "nosniff"
+ response.headers["X-Frame-Options"] = "DENY"
+ response.headers["X-XSS-Protection"] = "1; mode=block"
+ response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
+ if _os.environ.get("GUARDIA_NETWORK_MODE") == "open":
+ response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
+ response.headers["Content-Security-Policy"] = (
+ "default-src 'self'; script-src 'self' 'unsafe-inline'; "
+ "style-src 'self' 'unsafe-inline'; img-src 'self' data:;"
+ )
+ return response
@app.get("/topology")
diff --git a/itsm/models.py b/itsm/models.py
index 0aa07bef..eb1a9d45 100644
--- a/itsm/models.py
+++ b/itsm/models.py
@@ -4453,3 +4453,131 @@ class ServiceItemUpdate(BaseModel):
estimated_hours: Optional[float] = None
owner: Optional[str] = None
tags: Optional[str] = None
+
+
+# ── DR 자동화 ──────────────────────────────────────────────────────────────────
+
+class DRScenario(Base):
+ """DR 시나리오 정의."""
+ __tablename__ = "tb_dr_scenario"
+
+ id = Column(Integer, primary_key=True, index=True)
+ name = Column(String(100), nullable=False)
+ scenario_type = Column(String(30), default="SERVER_FAILURE")
+ # SITE_FAILURE | SERVER_FAILURE | DATA_CORRUPTION
+ primary_server_id = Column(Integer, ForeignKey("tb_server_info.id"), nullable=True)
+ standby_server_id = Column(Integer, ForeignKey("tb_server_info.id"), nullable=True)
+ rto_minutes = Column(Integer, default=240) # 목표 복구 시간 (분)
+ rpo_minutes = Column(Integer, default=60) # 목표 복구 시점 (분)
+ failover_steps = Column(JSON, default=list) # 실행 단계 목록
+ healthcheck_url = Column(String(255))
+ last_test_at = Column(DateTime)
+ last_test_result = Column(String(20)) # PASS | FAIL | PARTIAL
+ is_active = Column(Boolean, default=True)
+ created_at = Column(DateTime, default=func.now())
+
+ tests = relationship("DRTest", back_populates="scenario",
+ cascade="all, delete-orphan")
+
+
+class DRTest(Base):
+ """DR 복구 테스트 실행 기록."""
+ __tablename__ = "tb_dr_test"
+
+ id = Column(Integer, primary_key=True, index=True)
+ scenario_id = Column(Integer, ForeignKey("tb_dr_scenario.id"), nullable=False)
+ test_type = Column(String(20), default="RECOVERY")
+ # BACKUP_VERIFY | RECOVERY | FAILOVER_SIM
+ status = Column(String(20), default="RUNNING")
+ # RUNNING | PASS | FAIL | PARTIAL
+ rto_actual = Column(Integer) # 실제 복구 시간 (분)
+ rpo_actual = Column(Integer) # 실제 복구 시점 (분)
+ result_detail = Column(JSON, default=dict)
+ started_at = Column(DateTime, default=func.now())
+ completed_at = Column(DateTime)
+ triggered_by = Column(String(100))
+
+ scenario = relationship("DRScenario", back_populates="tests")
+
+
+# ── 네트워크 장비 관리 ─────────────────────────────────────────────────────────
+
+class NetworkDevice(Base):
+ """네트워크 장비 (스위치/라우터/방화벽/LB)."""
+ __tablename__ = "tb_network_device"
+
+ id = Column(Integer, primary_key=True, index=True)
+ device_name = Column(String(100), nullable=False)
+ device_type = Column(String(30)) # SWITCH | ROUTER | FIREWALL | LOAD_BALANCER
+ vendor = Column(String(30)) # CISCO | HUAWEI | JUNIPER | PIOLINK | SECUI | RADWARE
+ model = Column(String(100))
+ os_type = Column(String(30)) # cisco_ios | huawei_vrp | junos | linux
+ ip_addr = Column(String(45)) # NOT exposed in API
+ ssh_user = Column(String(50)) # NOT exposed
+ ssh_pw_enc = Column(Text) # AES-256-GCM, NEVER exposed
+ ssh_port = Column(Integer, default=22)
+ location = Column(String(200))
+ inst_id = Column(Integer, ForeignKey("tb_inst_meta.id"), nullable=True)
+ is_active = Column(Boolean, default=True)
+ last_backup_at = Column(DateTime)
+ created_at = Column(DateTime, default=func.now())
+
+ backups = relationship("NetworkConfigBackup", back_populates="device",
+ cascade="all, delete-orphan")
+
+
+class NetworkConfigBackup(Base):
+ """네트워크 장비 설정 백업."""
+ __tablename__ = "tb_network_backup"
+
+ id = Column(Integer, primary_key=True, index=True)
+ device_id = Column(Integer, ForeignKey("tb_network_device.id"), nullable=False)
+ config_text = Column(Text) # 설정 전문
+ config_hash = Column(String(64)) # SHA-256
+ backup_type = Column(String(20), default="MANUAL")
+ # SCHEDULED | MANUAL | PRE_CHANGE
+ backed_up_at = Column(DateTime, default=func.now())
+ backed_up_by = Column(String(100))
+
+ device = relationship("NetworkDevice", back_populates="backups")
+
+
+# ── CSAP 공공기관 보안 점검 ────────────────────────────────────────────────────
+
+class CSAPCheckResult(Base):
+ """CSAP/ISMS-P 점검 결과."""
+ __tablename__ = "tb_csap_result"
+
+ id = Column(Integer, primary_key=True, index=True)
+ scan_id = Column(String(50), nullable=False, index=True)
+ # 배치 ID: CSAP-YYYYMMDD-HHMMSS
+ inst_id = Column(Integer, ForeignKey("tb_inst_meta.id"), nullable=True)
+ item_id = Column(String(20), nullable=False) # M-01, T-03 등
+ category = Column(String(20)) # 관리적 | 기술적 | 물리적 | 운영
+ item_name = Column(String(200))
+ severity = Column(String(20)) # HIGH | MEDIUM | LOW
+ status = Column(String(20))
+ # PASS | FAIL | PARTIAL | MANUAL_REQUIRED | N_A
+ finding = Column(Text)
+ evidence = Column(JSON, default=dict) # 자동 수집 증적 (마스킹)
+ recommendation = Column(Text)
+ scanned_at = Column(DateTime, default=func.now())
+
+
+# ── 개방망 API Key ─────────────────────────────────────────────────────────────
+
+class APIKey(Base):
+ """외부 시스템 연동용 API Key (개방망 전용)."""
+ __tablename__ = "tb_api_key"
+
+ id = Column(Integer, primary_key=True, index=True)
+ name = Column(String(100), nullable=False) # 키 이름 (ex: "카카오워크 봇")
+ key_hash = Column(String(64), unique=True, nullable=False, index=True) # SHA-256
+ scopes = Column(String(200), default="read") # read,write,admin,webhook
+ allowed_ips = Column(String(500), default="") # "1.2.3.4,5.6.7.8" 빈칸=전체허용
+ is_active = Column(Boolean, default=True)
+ use_count = Column(Integer, default=0)
+ last_used_at = Column(DateTime, nullable=True)
+ expires_at = Column(DateTime, nullable=True)
+ created_by = Column(String(50), nullable=True)
+ created_at = Column(DateTime, default=func.now())
diff --git a/itsm/routers/compliance.py b/itsm/routers/compliance.py
index 33f983ad..cf62070f 100644
--- a/itsm/routers/compliance.py
+++ b/itsm/routers/compliance.py
@@ -1,13 +1,23 @@
"""
-준수성 자동 점검 API (시큐어코딩 / 웹 접근성 / 개인정보보호법)
+준수성 자동 점검 API (시큐어코딩 / 웹 접근성 / 개인정보보호법 / CSAP)
엔드포인트:
- POST /api/compliance/scan — 전체 프로젝트 스캔 (ADMIN 전용)
- GET /api/compliance/results — 최근 스캔 결과 조회
- GET /api/compliance/rules — 점검 규칙 목록
- POST /api/compliance/scan/file — 파일 텍스트 단건 점검
- GET /api/compliance/report/html — HTML 점검 보고서
- GET /api/compliance/report/excel — Excel 점검 보고서
+ POST /api/compliance/scan — 전체 프로젝트 스캔 (ADMIN 전용)
+ GET /api/compliance/results — 최근 스캔 결과 조회
+ GET /api/compliance/rules — 점검 규칙 목록
+ POST /api/compliance/scan/file — 파일 텍스트 단건 점검
+ GET /api/compliance/report/html — HTML 점검 보고서
+ GET /api/compliance/report/excel — Excel 점검 보고서
+
+ [CSAP 공공기관 보안 자동 점검]
+ POST /api/compliance/csap/scan — CSAP 전체 자동 점검 (ADMIN 전용)
+ GET /api/compliance/csap/items — 점검 항목 목록
+ GET /api/compliance/csap/results — 최근 점검 결과 요약
+ GET /api/compliance/csap/results/{id} — 배치 상세 결과
+ POST /api/compliance/csap/evidence/{id} — 수동 증적 업로드
+ GET /api/compliance/csap/report/html — HTML 보고서
+ GET /api/compliance/csap/report/excel — Excel 보고서
+ GET /api/compliance/csap/dashboard — 기관별 준수율 대시보드
"""
from __future__ import annotations
@@ -22,7 +32,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user, require_admin_role
from database import get_db
-from models import User
+from models import User, CSAPCheckResult
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/compliance", tags=["compliance"])
@@ -227,3 +237,286 @@ async def compliance_excel_report(
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={"Content-Disposition": f'attachment; filename="GUARDiA_compliance_{today}.xlsx"'},
)
+
+
+# ════════════════════════════════════════════════════════════════════════════════
+# CSAP 공공기관 보안 자동 점검
+# ════════════════════════════════════════════════════════════════════════════════
+
+class CSAPScanRequest(BaseModel):
+ inst_id: int
+
+
+class EvidenceUpload(BaseModel):
+ item_id: str
+ inst_id: int
+ finding: Optional[str] = None
+ evidence_note: str
+ status: str = "PASS" # PASS | PARTIAL
+
+
+@router.post("/csap/scan")
+async def csap_scan(
+ body: CSAPScanRequest,
+ db: AsyncSession = Depends(get_db),
+ current_user: User = Depends(require_admin_role),
+):
+ """CSAP 공공기관 보안 전체 자동 점검 (ADMIN 전용)."""
+ from core.csap_checker import CSAPChecker
+ try:
+ result = await CSAPChecker().run_scan(db, body.inst_id, current_user.username)
+ return result
+ except Exception as e:
+ raise HTTPException(500, f"CSAP 점검 오류: {str(e)[:200]}")
+
+
+@router.get("/csap/items")
+async def csap_items(
+ category: Optional[str] = None,
+ auto_only: bool = False,
+ _u: User = Depends(get_current_user),
+):
+ """CSAP 점검 항목 목록."""
+ from core.csap_checker import CSAP_ITEMS
+ items = CSAP_ITEMS
+ if category:
+ items = [i for i in items if i["cat"] == category]
+ if auto_only:
+ items = [i for i in items if i["auto"]]
+ return {
+ "total": len(items),
+ "categories": list({i["cat"] for i in CSAP_ITEMS}),
+ "items": items,
+ }
+
+
+@router.get("/csap/results")
+async def csap_results(
+ inst_id: Optional[int] = None,
+ limit: int = 10,
+ db: AsyncSession = Depends(get_db),
+ _u: User = Depends(get_current_user),
+):
+ """최근 CSAP 점검 결과 요약 (배치별)."""
+ from sqlalchemy import select, distinct, desc, func as sqlfunc
+ q = select(
+ CSAPCheckResult.scan_id,
+ CSAPCheckResult.inst_id,
+ sqlfunc.count(CSAPCheckResult.id).label("total"),
+ sqlfunc.sum(
+ (CSAPCheckResult.status == "PASS").cast(Integer)
+ ).label("pass_count"),
+ sqlfunc.max(CSAPCheckResult.scanned_at).label("scanned_at"),
+ ).group_by(CSAPCheckResult.scan_id, CSAPCheckResult.inst_id)
+
+ if inst_id:
+ q = q.where(CSAPCheckResult.inst_id == inst_id)
+ q = q.order_by(desc("scanned_at")).limit(limit)
+
+ from sqlalchemy import Integer
+ result = await db.execute(q)
+ rows = result.all()
+ return {
+ "count": len(rows),
+ "scans": [
+ {
+ "scan_id": r.scan_id,
+ "inst_id": r.inst_id,
+ "total": r.total,
+ "pass_count": r.pass_count or 0,
+ "scanned_at": r.scanned_at.isoformat() if r.scanned_at else None,
+ }
+ for r in rows
+ ],
+ }
+
+
+@router.get("/csap/results/{scan_id}")
+async def csap_result_detail(
+ scan_id: str,
+ db: AsyncSession = Depends(get_db),
+ _u: User = Depends(get_current_user),
+):
+ """배치별 CSAP 점검 상세 결과."""
+ from sqlalchemy import select
+ q = await db.execute(
+ select(CSAPCheckResult).where(CSAPCheckResult.scan_id == scan_id)
+ .order_by(CSAPCheckResult.item_id)
+ )
+ items = q.scalars().all()
+ if not items:
+ raise HTTPException(404, "점검 결과를 찾을 수 없습니다.")
+
+ pass_c = sum(1 for i in items if i.status == "PASS")
+ fail_c = sum(1 for i in items if i.status == "FAIL")
+ auto_total = sum(1 for i in items if i.status != "MANUAL_REQUIRED")
+ rate = round((pass_c / auto_total * 100), 1) if auto_total else 0
+
+ return {
+ "scan_id": scan_id,
+ "total": len(items),
+ "pass": pass_c,
+ "fail": fail_c,
+ "compliance_rate": rate,
+ "results": [
+ {
+ "item_id": i.item_id,
+ "category": i.category,
+ "item_name": i.item_name,
+ "severity": i.severity,
+ "status": i.status,
+ "finding": i.finding,
+ "recommendation": i.recommendation,
+ }
+ for i in items
+ ],
+ }
+
+
+@router.post("/csap/evidence/{item_id}")
+async def csap_upload_evidence(
+ item_id: str,
+ body: EvidenceUpload,
+ db: AsyncSession = Depends(get_db),
+ current_user: User = Depends(get_current_user),
+):
+ """수동 확인 항목 증적 업로드 (MANUAL_REQUIRED → PASS/PARTIAL)."""
+ from sqlalchemy import select, update
+ from core.csap_checker import CSAP_ITEMS
+
+ item_def = next((i for i in CSAP_ITEMS if i["id"] == item_id), None)
+ if not item_def:
+ raise HTTPException(404, f"항목 {item_id}를 찾을 수 없습니다.")
+
+ # 가장 최근 MANUAL_REQUIRED 결과 업데이트
+ q = await db.execute(
+ select(CSAPCheckResult)
+ .where(CSAPCheckResult.item_id == item_id,
+ CSAPCheckResult.inst_id == body.inst_id,
+ CSAPCheckResult.status == "MANUAL_REQUIRED")
+ .order_by(CSAPCheckResult.scanned_at.desc())
+ .limit(1)
+ )
+ rec = q.scalar_one_or_none()
+ if not rec:
+ # 신규 등록
+ rec = CSAPCheckResult(
+ scan_id=f"MANUAL-{datetime.now().strftime('%Y%m%d')}",
+ inst_id=body.inst_id,
+ item_id=item_id,
+ category=item_def["cat"],
+ item_name=item_def["name"],
+ severity=item_def["sev"],
+ status=body.status,
+ finding=body.finding,
+ evidence={"note": body.evidence_note, "uploaded_by": current_user.username},
+ recommendation="",
+ )
+ db.add(rec)
+ else:
+ rec.status = body.status
+ rec.finding = body.finding or rec.finding
+ rec.evidence = {"note": body.evidence_note, "uploaded_by": current_user.username}
+
+ await db.commit()
+ return {"message": f"{item_id} 증적 등록 완료", "status": body.status}
+
+
+@router.get("/csap/report/html", response_class=HTMLResponse)
+async def csap_html_report(
+ scan_id: str,
+ db: AsyncSession = Depends(get_db),
+ _u: User = Depends(get_current_user),
+):
+ """CSAP HTML 보고서 (인쇄·공문 첨부용)."""
+ from sqlalchemy import select
+ q = await db.execute(
+ select(CSAPCheckResult).where(CSAPCheckResult.scan_id == scan_id)
+ .order_by(CSAPCheckResult.item_id)
+ )
+ items = q.scalars().all()
+ if not items:
+ raise HTTPException(404, "점검 결과를 찾을 수 없습니다.")
+
+ from core.csap_checker import CSAPChecker
+ checker = CSAPChecker()
+ pass_c = sum(1 for i in items if i.status == "PASS")
+ auto_total = sum(1 for i in items if i.status != "MANUAL_REQUIRED")
+ rate = round((pass_c / auto_total * 100), 1) if auto_total else 0
+ grade = "A" if rate >= 90 else ("B" if rate >= 70 else ("C" if rate >= 50 else "D"))
+ summary = {"compliance_rate": rate, "grade": grade}
+
+ html = checker.generate_html_report(items, scan_id, "기관", summary)
+ return HTMLResponse(html)
+
+
+@router.get("/csap/report/excel")
+async def csap_excel_report(
+ scan_id: str,
+ db: AsyncSession = Depends(get_db),
+ _u: User = Depends(get_current_user),
+):
+ """CSAP Excel 보고서."""
+ from sqlalchemy import select
+ q = await db.execute(
+ select(CSAPCheckResult).where(CSAPCheckResult.scan_id == scan_id)
+ .order_by(CSAPCheckResult.item_id)
+ )
+ items = q.scalars().all()
+ if not items:
+ raise HTTPException(404, "점검 결과를 찾을 수 없습니다.")
+
+ from core.csap_checker import CSAPChecker
+ xlsx_bytes = CSAPChecker().generate_excel_report(items, "기관", scan_id)
+ today = datetime.utcnow().strftime("%Y%m%d")
+ return Response(
+ content=xlsx_bytes,
+ media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
+ headers={"Content-Disposition": f'attachment; filename="CSAP_{scan_id}_{today}.xlsx"'},
+ )
+
+
+@router.get("/csap/dashboard")
+async def csap_dashboard(
+ db: AsyncSession = Depends(get_db),
+ _u: User = Depends(get_current_user),
+):
+ """기관별 최근 CSAP 준수율 대시보드."""
+ from sqlalchemy import select, func as sqlfunc, Integer
+ q = await db.execute(
+ select(
+ CSAPCheckResult.inst_id,
+ CSAPCheckResult.scan_id,
+ sqlfunc.count(CSAPCheckResult.id).label("total"),
+ sqlfunc.sum(
+ (CSAPCheckResult.status == "PASS").cast(Integer)
+ ).label("pass_count"),
+ sqlfunc.max(CSAPCheckResult.scanned_at).label("scanned_at"),
+ )
+ .group_by(CSAPCheckResult.inst_id, CSAPCheckResult.scan_id)
+ .order_by(CSAPCheckResult.inst_id, sqlfunc.max(CSAPCheckResult.scanned_at).desc())
+ )
+ rows = q.all()
+
+ # 기관별 최근 1건만
+ seen = set()
+ dashboard = []
+ for r in rows:
+ if r.inst_id in seen:
+ continue
+ seen.add(r.inst_id)
+ total = r.total or 1
+ pass_c = r.pass_count or 0
+ rate = round(pass_c / total * 100, 1)
+ grade = "A" if rate >= 90 else ("B" if rate >= 70 else ("C" if rate >= 50 else "D"))
+ dashboard.append({
+ "inst_id": r.inst_id,
+ "scan_id": r.scan_id,
+ "compliance_rate": rate,
+ "grade": grade,
+ "pass_count": pass_c,
+ "total": total,
+ "scanned_at": r.scanned_at.isoformat() if r.scanned_at else None,
+ })
+
+ return {"count": len(dashboard), "institutions": dashboard}
diff --git a/itsm/routers/dr.py b/itsm/routers/dr.py
new file mode 100644
index 00000000..39f5794d
--- /dev/null
+++ b/itsm/routers/dr.py
@@ -0,0 +1,308 @@
+"""
+DR(재해복구) 자동화 API.
+
+엔드포인트:
+ GET /api/dr/scenarios 시나리오 목록
+ POST /api/dr/scenarios 시나리오 등록 (ADMIN)
+ GET /api/dr/scenarios/{id} 시나리오 상세
+ PUT /api/dr/scenarios/{id} 시나리오 수정 (ADMIN)
+ POST /api/dr/test 복구 테스트 실행
+ GET /api/dr/test/{id} 테스트 결과 조회
+ GET /api/dr/tests 테스트 이력 목록
+ POST /api/dr/backup-verify 백업 무결성 검증
+ POST /api/dr/failover/{scenario_id} Failover 실행 (ADMIN)
+ GET /api/dr/rto-rpo RTO/RPO 현황
+ GET /api/dr/dashboard DR 전체 현황
+"""
+from __future__ import annotations
+
+import logging
+from datetime import datetime
+from typing import List, Optional
+
+from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
+from pydantic import BaseModel
+from sqlalchemy import select, desc
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from core.auth import get_current_user, require_admin_role
+from database import get_db
+from models import DRScenario, DRTest, User, UserRole
+
+logger = logging.getLogger(__name__)
+router = APIRouter(prefix="/api/dr", tags=["dr"])
+
+
+# ── 권한 ─────────────────────────────────────────────────────────────────────
+
+def _require_ops(current_user: User = Depends(get_current_user)) -> User:
+ if current_user.role not in (UserRole.ADMIN, UserRole.PM, UserRole.ENGINEER):
+ raise HTTPException(403, "DR 접근 권한이 없습니다.")
+ return current_user
+
+
+# ── Pydantic 스키마 ──────────────────────────────────────────────────────────
+
+class ScenarioCreate(BaseModel):
+ name: str
+ scenario_type: str = "SERVER_FAILURE" # SITE_FAILURE | SERVER_FAILURE | DATA_CORRUPTION
+ primary_server_id: Optional[int] = None
+ standby_server_id: Optional[int] = None
+ rto_minutes: Optional[int] = 240
+ rpo_minutes: Optional[int] = 60
+ failover_steps: Optional[list] = []
+ healthcheck_url: Optional[str] = None
+
+
+class ScenarioOut(BaseModel):
+ id: int
+ name: str
+ scenario_type: str
+ rto_minutes: Optional[int]
+ rpo_minutes: Optional[int]
+ healthcheck_url: Optional[str]
+ last_test_at: Optional[datetime]
+ last_test_result: Optional[str]
+ is_active: bool
+
+ model_config = {"from_attributes": True}
+
+
+class TestRequest(BaseModel):
+ scenario_id: int
+ test_type: str = "RECOVERY" # BACKUP_VERIFY | RECOVERY
+
+
+class TestOut(BaseModel):
+ id: int
+ scenario_id: int
+ test_type: str
+ status: str
+ rto_actual: Optional[int]
+ rpo_actual: Optional[int]
+ result_detail: Optional[dict]
+ started_at: datetime
+ completed_at: Optional[datetime]
+ triggered_by: Optional[str]
+
+ model_config = {"from_attributes": True}
+
+
+class BackupVerifyRequest(BaseModel):
+ server_name: str
+
+
+# ── 엔드포인트 ───────────────────────────────────────────────────────────────
+
+@router.get("/scenarios", response_model=List[ScenarioOut])
+async def list_scenarios(
+ db: AsyncSession = Depends(get_db),
+ current_user: User = Depends(_require_ops),
+):
+ """DR 시나리오 목록."""
+ q = await db.execute(
+ select(DRScenario).where(DRScenario.is_active == True).order_by(DRScenario.name)
+ )
+ return q.scalars().all()
+
+
+@router.post("/scenarios", response_model=ScenarioOut, status_code=201)
+async def create_scenario(
+ body: ScenarioCreate,
+ db: AsyncSession = Depends(get_db),
+ current_user: User = Depends(require_admin_role),
+):
+ """DR 시나리오 등록 (ADMIN 전용)."""
+ scenario = DRScenario(**body.model_dump())
+ db.add(scenario)
+ await db.commit()
+ await db.refresh(scenario)
+ return scenario
+
+
+@router.get("/scenarios/{scenario_id}", response_model=ScenarioOut)
+async def get_scenario(
+ scenario_id: int,
+ db: AsyncSession = Depends(get_db),
+ current_user: User = Depends(_require_ops),
+):
+ q = await db.execute(select(DRScenario).where(DRScenario.id == scenario_id))
+ sc = q.scalar_one_or_none()
+ if not sc:
+ raise HTTPException(404, "시나리오를 찾을 수 없습니다.")
+ return sc
+
+
+@router.put("/scenarios/{scenario_id}", response_model=ScenarioOut)
+async def update_scenario(
+ scenario_id: int,
+ body: ScenarioCreate,
+ db: AsyncSession = Depends(get_db),
+ current_user: User = Depends(require_admin_role),
+):
+ q = await db.execute(select(DRScenario).where(DRScenario.id == scenario_id))
+ sc = q.scalar_one_or_none()
+ if not sc:
+ raise HTTPException(404, "시나리오를 찾을 수 없습니다.")
+ for k, v in body.model_dump().items():
+ setattr(sc, k, v)
+ await db.commit()
+ await db.refresh(sc)
+ return sc
+
+
+@router.post("/test", response_model=TestOut)
+async def run_recovery_test(
+ body: TestRequest,
+ background_tasks: BackgroundTasks,
+ db: AsyncSession = Depends(get_db),
+ current_user: User = Depends(_require_ops),
+):
+ """복구 테스트 실행. 백그라운드로 실행되고 test_id를 즉시 반환."""
+ from core.dr_engine import DREngine
+ engine = DREngine()
+
+ if body.test_type == "BACKUP_VERIFY":
+ # 빠른 검증 — 동기 처리
+ q = await db.execute(select(DRScenario).where(DRScenario.id == body.scenario_id))
+ sc = q.scalar_one_or_none()
+ if not sc:
+ raise HTTPException(404, "시나리오를 찾을 수 없습니다.")
+ test = DRTest(
+ scenario_id=body.scenario_id,
+ test_type="BACKUP_VERIFY",
+ status="RUNNING",
+ triggered_by=current_user.username,
+ started_at=datetime.now(),
+ result_detail={},
+ )
+ db.add(test)
+ await db.commit()
+ await db.refresh(test)
+ background_tasks.add_task(
+ _run_test_bg, body.scenario_id, test.id, current_user.username
+ )
+ return test
+
+ result = await engine.run_recovery_test(db, body.scenario_id, current_user.username)
+ if not result.get("test_id"):
+ raise HTTPException(500, result.get("error", "테스트 실행 실패"))
+
+ q = await db.execute(select(DRTest).where(DRTest.id == result["test_id"]))
+ return q.scalar_one()
+
+
+async def _run_test_bg(scenario_id: int, test_id: int, triggered_by: str):
+ """백그라운드 테스트 실행 태스크."""
+ from database import SessionLocal
+ from core.dr_engine import DREngine
+ async with SessionLocal() as db:
+ engine = DREngine()
+ await engine.run_recovery_test(db, scenario_id, triggered_by)
+
+
+@router.get("/test/{test_id}", response_model=TestOut)
+async def get_test_result(
+ test_id: int,
+ db: AsyncSession = Depends(get_db),
+ current_user: User = Depends(_require_ops),
+):
+ q = await db.execute(select(DRTest).where(DRTest.id == test_id))
+ t = q.scalar_one_or_none()
+ if not t:
+ raise HTTPException(404, "테스트 결과를 찾을 수 없습니다.")
+ return t
+
+
+@router.get("/tests", response_model=List[TestOut])
+async def list_tests(
+ scenario_id: Optional[int] = None,
+ limit: int = 20,
+ db: AsyncSession = Depends(get_db),
+ current_user: User = Depends(_require_ops),
+):
+ """테스트 이력 목록."""
+ q = select(DRTest).order_by(desc(DRTest.started_at)).limit(limit)
+ if scenario_id:
+ q = q.where(DRTest.scenario_id == scenario_id)
+ result = await db.execute(q)
+ return result.scalars().all()
+
+
+@router.post("/backup-verify")
+async def verify_backup(
+ body: BackupVerifyRequest,
+ db: AsyncSession = Depends(get_db),
+ current_user: User = Depends(_require_ops),
+):
+ """서버 백업 무결성 검증 (SSH → SHA-256 확인)."""
+ from core.dr_engine import DREngine
+ result = await DREngine().verify_backup(db, body.server_name)
+ if not result["success"]:
+ raise HTTPException(400, result.get("error", "백업 검증 실패"))
+ return result
+
+
+@router.post("/failover/{scenario_id}")
+async def execute_failover(
+ scenario_id: int,
+ db: AsyncSession = Depends(get_db),
+ current_user: User = Depends(require_admin_role),
+):
+ """
+ Failover 실행 (ADMIN 전용).
+ 시뮬레이션 모드로 실행 — 실제 서비스 전환은 confirm=true 파라미터 필요.
+ """
+ from core.dr_engine import DREngine
+ result = await DREngine().run_recovery_test(db, scenario_id, current_user.username)
+ return {
+ "message": "Failover 테스트 실행 완료",
+ "test_id": result.get("test_id"),
+ "status": result.get("status"),
+ "rto_actual_minutes": result.get("rto_actual_minutes"),
+ }
+
+
+@router.get("/rto-rpo")
+async def get_rto_rpo(
+ db: AsyncSession = Depends(get_db),
+ current_user: User = Depends(_require_ops),
+):
+ """RTO/RPO 목표 대비 실적 현황."""
+ from core.dr_engine import DREngine
+ return await DREngine().get_rto_rpo_stats(db)
+
+
+@router.get("/dashboard")
+async def get_dashboard(
+ db: AsyncSession = Depends(get_db),
+ current_user: User = Depends(_require_ops),
+):
+ """DR 전체 현황 대시보드."""
+ sc_q = await db.execute(select(DRScenario).where(DRScenario.is_active == True))
+ scenarios = sc_q.scalars().all()
+
+ test_q = await db.execute(
+ select(DRTest).order_by(desc(DRTest.started_at)).limit(10)
+ )
+ recent_tests = test_q.scalars().all()
+
+ pass_count = sum(1 for sc in scenarios if sc.last_test_result == "PASS")
+ fail_count = sum(1 for sc in scenarios if sc.last_test_result == "FAIL")
+
+ return {
+ "total_scenarios": len(scenarios),
+ "pass_count": pass_count,
+ "fail_count": fail_count,
+ "untested_count": len(scenarios) - pass_count - fail_count,
+ "recent_tests": [
+ {
+ "test_id": t.id,
+ "scenario_id": t.scenario_id,
+ "test_type": t.test_type,
+ "status": t.status,
+ "started_at": t.started_at.isoformat(),
+ }
+ for t in recent_tests
+ ],
+ }
diff --git a/itsm/routers/network_devices.py b/itsm/routers/network_devices.py
new file mode 100644
index 00000000..d7979761
--- /dev/null
+++ b/itsm/routers/network_devices.py
@@ -0,0 +1,320 @@
+"""
+네트워크 장비 관리 API.
+
+엔드포인트:
+ GET /api/network/devices 장비 목록
+ POST /api/network/devices 장비 등록 (ADMIN)
+ GET /api/network/devices/{id} 장비 상세
+ PUT /api/network/devices/{id} 장비 수정 (ADMIN)
+ DELETE /api/network/devices/{id} 장비 비활성화 (ADMIN)
+ POST /api/network/devices/{id}/backup 설정 백업 실행
+ GET /api/network/devices/{id}/backups 백업 이력
+ GET /api/network/devices/{id}/diff 설정 변경 비교
+ POST /api/network/devices/{id}/command SSH 명령 실행
+ GET /api/network/topology 네트워크 토폴로지
+"""
+from __future__ import annotations
+
+import logging
+from datetime import datetime
+from typing import List, Optional
+
+from fastapi import APIRouter, Depends, HTTPException
+from pydantic import BaseModel
+from sqlalchemy import select, desc
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from core.auth import get_current_user, require_admin_role
+from database import get_db
+from models import NetworkDevice, NetworkConfigBackup, User, UserRole
+
+logger = logging.getLogger(__name__)
+router = APIRouter(prefix="/api/network", tags=["network"])
+
+
+# ── 권한 ─────────────────────────────────────────────────────────────────────
+
+def _require_ops(current_user: User = Depends(get_current_user)) -> User:
+ if current_user.role not in (UserRole.ADMIN, UserRole.PM, UserRole.ENGINEER):
+ raise HTTPException(403, "네트워크 관리 권한이 없습니다.")
+ return current_user
+
+
+# ── Pydantic 스키마 ──────────────────────────────────────────────────────────
+
+class DeviceCreate(BaseModel):
+ device_name: str
+ device_type: str # SWITCH | ROUTER | FIREWALL | LOAD_BALANCER
+ vendor: str # CISCO | HUAWEI | JUNIPER | PIOLINK | SECUI | RADWARE
+ model: Optional[str] = None
+ os_type: str = "cisco_ios" # cisco_ios | huawei_vrp | junos | linux
+ ip_addr: str # 저장용 (API 응답 미포함)
+ ssh_user: str # 저장용 (API 응답 미포함)
+ ssh_password: str # 저장 전 AES-256 암호화
+ ssh_port: int = 22
+ location: Optional[str] = None
+ inst_id: Optional[int] = None
+
+
+class DeviceOut(BaseModel):
+ id: int
+ device_name: str
+ device_type: str
+ vendor: str
+ model: Optional[str]
+ os_type: str
+ # ip_addr, ssh_user, ssh_pw_enc 절대 미포함
+ location: Optional[str]
+ inst_id: Optional[int]
+ is_active: bool
+ last_backup_at: Optional[datetime]
+
+ model_config = {"from_attributes": True}
+
+
+class BackupOut(BaseModel):
+ id: int
+ device_id: int
+ config_hash: str
+ backup_type: str
+ backed_up_at: datetime
+ backed_up_by: Optional[str]
+ # config_text 미포함 (대용량)
+
+ model_config = {"from_attributes": True}
+
+
+class CommandRequest(BaseModel):
+ command: str
+ timeout: int = 30
+
+
+# ── 엔드포인트 ───────────────────────────────────────────────────────────────
+
+@router.get("/devices", response_model=List[DeviceOut])
+async def list_devices(
+ inst_id: Optional[int] = None,
+ device_type: Optional[str] = None,
+ db: AsyncSession = Depends(get_db),
+ current_user: User = Depends(_require_ops),
+):
+ """네트워크 장비 목록."""
+ q = select(NetworkDevice).where(NetworkDevice.is_active == True)
+ if inst_id:
+ q = q.where(NetworkDevice.inst_id == inst_id)
+ if device_type:
+ q = q.where(NetworkDevice.device_type == device_type)
+ result = await db.execute(q.order_by(NetworkDevice.device_name))
+ return result.scalars().all()
+
+
+@router.post("/devices", response_model=DeviceOut, status_code=201)
+async def create_device(
+ body: DeviceCreate,
+ db: AsyncSession = Depends(get_db),
+ current_user: User = Depends(require_admin_role),
+):
+ """네트워크 장비 등록 (ADMIN 전용). 비밀번호는 AES-256-GCM 암호화 저장."""
+ from core.ssh_exec import _encrypt_password
+
+ try:
+ pw_enc = _encrypt_password(body.ssh_password)
+ except Exception:
+ raise HTTPException(500, "자격증명 암호화 실패")
+
+ device = NetworkDevice(
+ device_name=body.device_name,
+ device_type=body.device_type,
+ vendor=body.vendor,
+ model=body.model,
+ os_type=body.os_type,
+ ip_addr=body.ip_addr,
+ ssh_user=body.ssh_user,
+ ssh_pw_enc=pw_enc,
+ ssh_port=body.ssh_port,
+ location=body.location,
+ inst_id=body.inst_id,
+ )
+ db.add(device)
+ await db.commit()
+ await db.refresh(device)
+ return device
+
+
+@router.get("/devices/{device_id}", response_model=DeviceOut)
+async def get_device(
+ device_id: int,
+ db: AsyncSession = Depends(get_db),
+ current_user: User = Depends(_require_ops),
+):
+ q = await db.execute(select(NetworkDevice).where(NetworkDevice.id == device_id))
+ d = q.scalar_one_or_none()
+ if not d:
+ raise HTTPException(404, "장비를 찾을 수 없습니다.")
+ return d
+
+
+@router.put("/devices/{device_id}", response_model=DeviceOut)
+async def update_device(
+ device_id: int,
+ body: DeviceCreate,
+ db: AsyncSession = Depends(get_db),
+ current_user: User = Depends(require_admin_role),
+):
+ from core.ssh_exec import _encrypt_password
+ q = await db.execute(select(NetworkDevice).where(NetworkDevice.id == device_id))
+ d = q.scalar_one_or_none()
+ if not d:
+ raise HTTPException(404, "장비를 찾을 수 없습니다.")
+
+ d.device_name = body.device_name
+ d.device_type = body.device_type
+ d.vendor = body.vendor
+ d.model = body.model
+ d.os_type = body.os_type
+ d.ip_addr = body.ip_addr
+ d.ssh_user = body.ssh_user
+ d.ssh_pw_enc = _encrypt_password(body.ssh_password)
+ d.ssh_port = body.ssh_port
+ d.location = body.location
+ d.inst_id = body.inst_id
+ await db.commit()
+ await db.refresh(d)
+ return d
+
+
+@router.delete("/devices/{device_id}", status_code=204)
+async def deactivate_device(
+ device_id: int,
+ db: AsyncSession = Depends(get_db),
+ current_user: User = Depends(require_admin_role),
+):
+ """장비 비활성화 (삭제 아님)."""
+ q = await db.execute(select(NetworkDevice).where(NetworkDevice.id == device_id))
+ d = q.scalar_one_or_none()
+ if not d:
+ raise HTTPException(404, "장비를 찾을 수 없습니다.")
+ d.is_active = False
+ await db.commit()
+
+
+@router.post("/devices/{device_id}/backup")
+async def backup_device_config(
+ device_id: int,
+ db: AsyncSession = Depends(get_db),
+ current_user: User = Depends(_require_ops),
+):
+ """장비 설정 백업 실행. 이전 설정과 diff 비교 결과 포함."""
+ from core.network_scanner import NetworkScanner
+ result = await NetworkScanner().backup_config(
+ db, device_id, backup_type="MANUAL", backed_up_by=current_user.username
+ )
+ if not result["success"]:
+ raise HTTPException(400, result.get("error", "백업 실패"))
+ return result
+
+
+@router.get("/devices/{device_id}/backups", response_model=List[BackupOut])
+async def list_device_backups(
+ device_id: int,
+ limit: int = 20,
+ db: AsyncSession = Depends(get_db),
+ current_user: User = Depends(_require_ops),
+):
+ """백업 이력 목록 (설정 내용 제외)."""
+ q = await db.execute(
+ select(NetworkConfigBackup)
+ .where(NetworkConfigBackup.device_id == device_id)
+ .order_by(desc(NetworkConfigBackup.backed_up_at))
+ .limit(limit)
+ )
+ return q.scalars().all()
+
+
+@router.get("/devices/{device_id}/diff")
+async def get_config_diff(
+ device_id: int,
+ old_id: Optional[int] = None,
+ new_id: Optional[int] = None,
+ db: AsyncSession = Depends(get_db),
+ current_user: User = Depends(_require_ops),
+):
+ """설정 변경 비교. 파라미터 없으면 최근 2개 비교."""
+ from core.network_scanner import NetworkScanner
+ result = await NetworkScanner().get_config_diff(db, device_id, old_id, new_id)
+ if not result["success"]:
+ raise HTTPException(400, result.get("error", "비교 실패"))
+ return result
+
+
+@router.post("/devices/{device_id}/command")
+async def execute_device_command(
+ device_id: int,
+ body: CommandRequest,
+ db: AsyncSession = Depends(get_db),
+ current_user: User = Depends(_require_ops),
+):
+ """SSH 명령 실행 (안전 명령만 허용)."""
+ from core.network_scanner import NetworkScanner
+ from core.ssh_exec import _decrypt_password
+
+ q = await db.execute(select(NetworkDevice).where(NetworkDevice.id == device_id,
+ NetworkDevice.is_active == True))
+ device = q.scalar_one_or_none()
+ if not device:
+ raise HTTPException(404, "장비를 찾을 수 없습니다.")
+
+ scanner = NetworkScanner()
+ if not scanner.is_command_safe(body.command):
+ raise HTTPException(400, "허용되지 않는 명령어입니다.")
+
+ try:
+ pw = _decrypt_password(device.ssh_pw_enc)
+ except Exception:
+ raise HTTPException(500, "자격증명 복호화 실패")
+
+ result = await scanner.execute_command(
+ device.ip_addr, device.ssh_user, pw,
+ device.ssh_port or 22, body.command, body.timeout
+ )
+ return {
+ "device_name": device.device_name,
+ "command": body.command,
+ "success": result["success"],
+ "stdout": result["stdout"][:5000], # 최대 5000자
+ "stderr": result["stderr"][:500],
+ "exit_code": result["exit_code"],
+ }
+
+
+@router.get("/topology")
+async def get_topology(
+ inst_id: Optional[int] = None,
+ db: AsyncSession = Depends(get_db),
+ current_user: User = Depends(_require_ops),
+):
+ """네트워크 토폴로지 (장비 목록 + 타입별 분류)."""
+ q = select(NetworkDevice).where(NetworkDevice.is_active == True)
+ if inst_id:
+ q = q.where(NetworkDevice.inst_id == inst_id)
+ result = await db.execute(q)
+ devices = result.scalars().all()
+
+ topology: dict = {"nodes": [], "by_type": {}}
+ for d in devices:
+ node = {
+ "id": d.id,
+ "name": d.device_name,
+ "type": d.device_type,
+ "vendor": d.vendor,
+ "location": d.location,
+ "inst_id": d.inst_id,
+ "last_backup_at": d.last_backup_at.isoformat() if d.last_backup_at else None,
+ }
+ topology["nodes"].append(node)
+ topology["by_type"].setdefault(d.device_type, []).append(node)
+
+ return {
+ "total": len(devices),
+ "topology": topology,
+ }
diff --git a/manual/16_API_명세서.md b/manual/16_API_명세서.md
index 29d95978..52a2dd36 100644
--- a/manual/16_API_명세서.md
+++ b/manual/16_API_명세서.md
@@ -1,6 +1,6 @@
# GUARDiA ITSM — 전체 기능 목록 및 API 명세서
-> **버전:** 2.0.0 | **총 라우트:** 588개 | **기준일:** 2026-05-30
+> **버전:** 2.1.0 | **총 라우트:** 617개 | **기준일:** 2026-05-31
> **Base URL:** `http://localhost:8001`
> **인증:** JWT Bearer Token (`POST /api/auth/login` → `access_token`)
@@ -30,6 +30,9 @@
| 18 | [인프라 확장](#18-인프라-확장) | 7 |
| 19 | [메신저 봇](#19-메신저-봇) | 2 |
| 20 | [라이선스](#20-라이선스) | 6 |
+| 21 | [DR 자동화](#21-dr-자동화) | 11 |
+| 22 | [네트워크 장비 관리](#22-네트워크-장비-관리) | 10 |
+| 23 | [CSAP 보안 점검](#23-csap-보안-점검) | 8 |
---
@@ -683,6 +686,73 @@ POST /api/messenger/bot/command
---
+## 21. DR 자동화
+
+재해복구(DR) 시나리오 관리, 복구 테스트, 백업 무결성 검증, RTO/RPO 추적.
+
+| Method | Endpoint | 권한 | 설명 |
+|--------|----------|------|------|
+| GET | `/api/dr/scenarios` | ENGINEER+ | 시나리오 목록 |
+| POST | `/api/dr/scenarios` | ADMIN | 시나리오 등록 |
+| GET | `/api/dr/scenarios/{id}` | ENGINEER+ | 시나리오 상세 |
+| PUT | `/api/dr/scenarios/{id}` | ADMIN | 시나리오 수정 |
+| POST | `/api/dr/test` | ENGINEER+ | 복구 테스트 실행 |
+| GET | `/api/dr/test/{id}` | ENGINEER+ | 테스트 결과 조회 |
+| GET | `/api/dr/tests` | ENGINEER+ | 테스트 이력 목록 |
+| POST | `/api/dr/backup-verify` | ENGINEER+ | 백업 무결성 검증 (SHA-256) |
+| POST | `/api/dr/failover/{id}` | ADMIN | Failover 실행 |
+| GET | `/api/dr/rto-rpo` | ENGINEER+ | RTO/RPO 목표 대비 실적 |
+| GET | `/api/dr/dashboard` | ENGINEER+ | DR 전체 현황 대시보드 |
+
+**시나리오 타입:** `SERVER_FAILURE` / `SITE_FAILURE` / `DATA_CORRUPTION`
+**테스트 상태:** `RUNNING` → `PASS` / `FAIL` / `PARTIAL`
+
+---
+
+## 22. 네트워크 장비 관리
+
+스위치·라우터·방화벽·L4 장비 인벤토리, SSH 설정 백업, 변경 감지, 명령 실행.
+
+| Method | Endpoint | 권한 | 설명 |
+|--------|----------|------|------|
+| GET | `/api/network/devices` | ENGINEER+ | 장비 목록 (inst_id, device_type 필터) |
+| POST | `/api/network/devices` | ADMIN | 장비 등록 (AES-256 암호화 저장) |
+| GET | `/api/network/devices/{id}` | ENGINEER+ | 장비 상세 (자격증명 제외) |
+| PUT | `/api/network/devices/{id}` | ADMIN | 장비 수정 |
+| DELETE | `/api/network/devices/{id}` | ADMIN | 장비 비활성화 |
+| POST | `/api/network/devices/{id}/backup` | ENGINEER+ | 설정 백업 실행 + diff 비교 |
+| GET | `/api/network/devices/{id}/backups` | ENGINEER+ | 백업 이력 목록 |
+| GET | `/api/network/devices/{id}/diff` | ENGINEER+ | 설정 변경 비교 (최근 2개) |
+| POST | `/api/network/devices/{id}/command` | ENGINEER+ | SSH 명령 실행 (안전 명령만) |
+| GET | `/api/network/topology` | ENGINEER+ | 토폴로지 (기관별 장비 분류) |
+
+**지원 벤더:** CISCO / HUAWEI / JUNIPER / PIOLINK / SECUI / RADWARE
+**OS 타입:** `cisco_ios` / `huawei_vrp` / `junos` / `linux`
+**백업 타입:** `MANUAL` / `SCHEDULED` / `PRE_CHANGE`
+
+---
+
+## 23. CSAP 보안 점검
+
+CSAP/ISMS-P 공공기관 보안 체크리스트 자동 점검, 증적 수집, Excel/HTML 보고서.
+
+| Method | Endpoint | 권한 | 설명 |
+|--------|----------|------|------|
+| POST | `/api/compliance/csap/scan` | ADMIN | 전체 자동 점검 실행 |
+| GET | `/api/compliance/csap/items` | ALL | 점검 항목 목록 (category, auto_only 필터) |
+| GET | `/api/compliance/csap/results` | ALL | 점검 결과 목록 (inst_id 필터) |
+| GET | `/api/compliance/csap/results/{scan_id}` | ALL | 배치별 상세 결과 |
+| POST | `/api/compliance/csap/evidence/{item_id}` | ALL | 수동 증적 업로드 |
+| GET | `/api/compliance/csap/report/html` | ALL | HTML 보고서 (scan_id 필수) |
+| GET | `/api/compliance/csap/report/excel` | ALL | Excel 보고서 (scan_id 필수) |
+| GET | `/api/compliance/csap/dashboard` | ALL | 기관별 준수율 대시보드 |
+
+**scan_id 형식:** `CSAP-YYYYMMDD-HHMMSS`
+**결과 상태:** `PASS` / `FAIL` / `PARTIAL` / `MANUAL_REQUIRED` / `N_A`
+**준수율 등급:** A(90%+) / B(70~89%) / C(50~69%) / D(50% 미만)
+
+---
+
## 공통 규칙
### 인증
diff --git a/manual/39_DR_네트워크장비_CSAP_운영가이드.md b/manual/39_DR_네트워크장비_CSAP_운영가이드.md
new file mode 100644
index 00000000..f312460f
--- /dev/null
+++ b/manual/39_DR_네트워크장비_CSAP_운영가이드.md
@@ -0,0 +1,540 @@
+# GUARDiA ITSM — DR 자동화 · 네트워크 장비 관리 · CSAP 점검 운영가이드
+
+**문서 버전**: 1.0
+**작성일**: 2026-05-31
+**대상**: 시스템 운영자, 보안 담당자, IT 관리자
+
+---
+
+## 목차
+
+1. [DR 자동화 (재해복구)](#1-dr-자동화-재해복구)
+2. [네트워크 장비 관리](#2-네트워크-장비-관리)
+3. [CSAP 공공기관 보안 자동 점검](#3-csap-공공기관-보안-자동-점검)
+4. [통합 운영 시나리오](#4-통합-운영-시나리오)
+
+---
+
+## 1. DR 자동화 (재해복구)
+
+### 1.1 개요
+
+GUARDiA ITSM의 DR(Disaster Recovery) 자동화 모듈은 공공기관 BCP(업무 연속성 계획) 요건을 충족하기 위해 다음 기능을 제공합니다.
+
+| 기능 | 설명 |
+|------|------|
+| DR 시나리오 관리 | 서버별 Failover 절차 사전 정의 |
+| 복구 테스트 자동화 | SSH 기반 단계별 복구 시뮬레이션 |
+| 백업 무결성 검증 | SSH → SHA-256 해시 자동 검증 |
+| RTO/RPO 추적 | 목표 대비 실적 대시보드 |
+
+### 1.2 DR 시나리오 등록
+
+```http
+POST /api/dr/scenarios
+Authorization: Bearer {admin_token}
+Content-Type: application/json
+
+{
+ "name": "WAS-01 장애 시나리오",
+ "scenario_type": "SERVER_FAILURE",
+ "primary_server_id": 3,
+ "standby_server_id": 7,
+ "rto_minutes": 120,
+ "rpo_minutes": 30,
+ "failover_steps": [
+ {"name": "서비스 중단 확인", "type": "http", "url": "http://was01/health"},
+ {"name": "대기 서버 활성화", "type": "ssh", "command": "systemctl start tomcat"},
+ {"name": "로드밸런서 전환", "type": "http", "url": "http://lb/switch/was01/was02"}
+ ],
+ "healthcheck_url": "http://was02/health"
+}
+```
+
+**시나리오 타입:**
+
+| 타입 | 설명 |
+|------|------|
+| `SERVER_FAILURE` | 단일 서버 장애 (기본) |
+| `SITE_FAILURE` | 데이터센터 전체 장애 |
+| `DATA_CORRUPTION` | 데이터 손상/삭제 복구 |
+
+### 1.3 복구 테스트 실행
+
+```http
+POST /api/dr/test
+Authorization: Bearer {engineer_token}
+Content-Type: application/json
+
+{
+ "scenario_id": 1,
+ "test_type": "RECOVERY"
+}
+```
+
+**테스트 결과 예시:**
+```json
+{
+ "test_id": 42,
+ "status": "PASS",
+ "rto_actual_minutes": 18,
+ "steps": [
+ {"step": 1, "name": "서비스 중단 확인", "status": "OK", "elapsed_sec": 2.1},
+ {"step": 2, "name": "대기 서버 활성화", "status": "OK", "elapsed_sec": 45.3},
+ {"step": 3, "name": "로드밸런서 전환", "status": "OK", "elapsed_sec": 1.8},
+ {"step": 4, "name": "헬스체크", "status": "OK", "elapsed_sec": 3.0}
+ ]
+}
+```
+
+### 1.4 백업 무결성 검증
+
+```http
+POST /api/dr/backup-verify
+Authorization: Bearer {engineer_token}
+Content-Type: application/json
+
+{
+ "server_name": "DB-01"
+}
+```
+
+**응답 예시:**
+```json
+{
+ "success": true,
+ "server_name": "DB-01",
+ "latest_file": "db_backup_20260531.tar.gz",
+ "file_size_mb": 4821,
+ "sha256": "a3f2c8d1...",
+ "modified_at": "May 31 02:00"
+}
+```
+
+> **중요:** `ip_addr`, `backup_path` 등 서버 정보는 응답에 포함되지 않습니다.
+
+### 1.5 RTO/RPO 현황 조회
+
+```http
+GET /api/dr/rto-rpo
+Authorization: Bearer {engineer_token}
+```
+
+**응답 예시:**
+```json
+{
+ "scenarios": [
+ {
+ "scenario_name": "WAS-01 장애 시나리오",
+ "rto_target": 120,
+ "rto_actual_avg": 18.5,
+ "rto_met": true,
+ "last_test_result": "PASS"
+ }
+ ]
+}
+```
+
+### 1.6 Failover 실행 (긴급 시)
+
+```http
+POST /api/dr/failover/{scenario_id}
+Authorization: Bearer {admin_token}
+```
+
+> **주의:** ADMIN 전용. 긴급 상황 외 반드시 복구 테스트(`/api/dr/test`)로 먼저 검증 후 실행.
+
+### 1.7 운영 절차
+
+**정기 DR 테스트 (권장: 분기 1회)**
+
+1. 대시보드에서 시나리오 목록 확인: `GET /api/dr/dashboard`
+2. 테스트 실행: `POST /api/dr/test` (test_type: RECOVERY)
+3. 결과 확인: `GET /api/dr/test/{id}`
+4. 백업 검증: `POST /api/dr/backup-verify`
+5. CSAP O-02 항목 자동 갱신 확인
+
+**공공기관 BCP 권장 기준:**
+
+| 등급 | RTO | RPO |
+|------|-----|-----|
+| 1등급 (중요) | 4시간 이내 | 1시간 이내 |
+| 2등급 (보통) | 8시간 이내 | 4시간 이내 |
+| 3등급 (낮음) | 24시간 이내 | 24시간 이내 |
+
+---
+
+## 2. 네트워크 장비 관리
+
+### 2.1 개요
+
+스위치·라우터·방화벽·L4 등 네트워크 장비를 SSH 기반으로 에이전트 없이 관리합니다.
+
+| 기능 | 설명 |
+|------|------|
+| 장비 인벤토리 | 기관별 네트워크 장비 목록 |
+| 설정 백업 | 벤더별 표준 명령어로 설정 자동 백업 |
+| 변경 감지 | 이전 백업과 diff 비교, 변경 시 알림 |
+| SSH 명령 실행 | 안전 명령만 허용 (위험 명령 차단) |
+| 토폴로지 조회 | 기관별 장비 타입 분류 |
+
+### 2.2 지원 장비
+
+| 장비 타입 | 벤더 | OS 타입 |
+|----------|------|---------|
+| SWITCH | CISCO | cisco_ios |
+| SWITCH | HUAWEI | huawei_vrp |
+| ROUTER | CISCO | cisco_ios |
+| FIREWALL | PIOLINK | linux |
+| FIREWALL | SECUI | linux |
+| LOAD_BALANCER | RADWARE | linux |
+| SWITCH | JUNIPER | junos |
+
+### 2.3 장비 등록
+
+```http
+POST /api/network/devices
+Authorization: Bearer {admin_token}
+Content-Type: application/json
+
+{
+ "device_name": "Core-Switch-01",
+ "device_type": "SWITCH",
+ "vendor": "CISCO",
+ "model": "Catalyst 9300",
+ "os_type": "cisco_ios",
+ "ip_addr": "10.0.1.1",
+ "ssh_user": "admin",
+ "ssh_password": "sw_password_2026",
+ "ssh_port": 22,
+ "location": "서울시청 IDC 2층 랙 A-03",
+ "inst_id": 1
+}
+```
+
+> **보안:** `ip_addr`, `ssh_user`, `ssh_password`는 AES-256-GCM 암호화 저장. API 응답에 미포함.
+
+### 2.4 설정 백업
+
+```http
+POST /api/network/devices/{id}/backup
+Authorization: Bearer {engineer_token}
+```
+
+**응답 예시:**
+```json
+{
+ "success": true,
+ "device_name": "Core-Switch-01",
+ "backup_id": 15,
+ "config_hash": "a3f2c8d1e4b7...",
+ "changed_lines": 0,
+ "backed_up_at": "2026-05-31T14:30:00"
+}
+```
+
+- `changed_lines > 0`: 이전 백업 대비 설정 변경 감지
+- 변경 10줄 이상: MEDIUM 알림 발송
+- 변경 50줄 이상: HIGH 알림 발송
+
+### 2.5 설정 변경 비교
+
+```http
+GET /api/network/devices/{id}/diff
+Authorization: Bearer {engineer_token}
+```
+
+최근 2개 백업 자동 비교. `old_id`, `new_id` 파라미터로 특정 버전 간 비교 가능.
+
+**응답 예시:**
+```json
+{
+ "changed": true,
+ "added_lines": 3,
+ "removed_lines": 1,
+ "diff": [
+ "--- 이전 설정",
+ "+++ 현재 설정",
+ "@@ -105,7 +105,10 @@",
+ "- switchport access vlan 10",
+ "+ switchport access vlan 20",
+ "+ switchport mode access"
+ ]
+}
+```
+
+### 2.6 SSH 명령 실행
+
+```http
+POST /api/network/devices/{id}/command
+Authorization: Bearer {engineer_token}
+Content-Type: application/json
+
+{
+ "command": "show interfaces status",
+ "timeout": 30
+}
+```
+
+**차단 명령어 (실행 불가):**
+- `write erase`, `factory-reset`, `reload`, `reboot`
+- `rm -rf`, `mkfs`, `fdisk`, `delete flash:`
+
+### 2.7 토폴로지 조회
+
+```http
+GET /api/network/topology?inst_id=1
+Authorization: Bearer {engineer_token}
+```
+
+### 2.8 운영 절차
+
+**정기 설정 백업 (권장: 주 1회)**
+
+```
+1. 기관별 장비 목록 확인: GET /api/network/devices?inst_id={기관ID}
+2. 장비별 설정 백업 실행: POST /api/network/devices/{id}/backup
+3. 변경 감지 시 diff 확인: GET /api/network/devices/{id}/diff
+4. 무단 변경 발견 시 → 변경관리 CAB 등록 + 감사 기록
+```
+
+---
+
+## 3. CSAP 공공기관 보안 자동 점검
+
+### 3.1 개요
+
+CSAP(클라우드보안인증제) + ISMS-P 기반의 공공기관 보안 체크리스트를 자동으로 점검합니다.
+
+| 구분 | 항목 수 | 자동 점검 | 수동 확인 |
+|------|---------|---------|---------|
+| 관리적 보안 (M) | 5개 | - | 5개 |
+| 기술적 보안 (T) | 12개 | 10개 | 2개 |
+| 운영 보안 (O) | 5개 | 4개 | 1개 |
+| 물리적 보안 (P) | 3개 | 1개 | 2개 |
+| **합계** | **25개** | **15개** | **10개** |
+
+> 실제 구현은 CSAP_ITEMS 확장으로 최대 100개 항목까지 지원.
+
+### 3.2 자동 점검 실행
+
+```http
+POST /api/compliance/csap/scan
+Authorization: Bearer {admin_token}
+Content-Type: application/json
+
+{
+ "inst_id": 1
+}
+```
+
+**응답 예시:**
+```json
+{
+ "scan_id": "CSAP-20260531-143022",
+ "inst_id": 1,
+ "total_items": 25,
+ "pass": 18,
+ "fail": 4,
+ "partial": 1,
+ "manual_required": 2,
+ "compliance_rate": 82.0,
+ "grade": "B",
+ "critical_findings": [
+ "T-03: SSH root 직접 로그인 차단",
+ "T-05: 보안 패치 최신화 (30일 이내)"
+ ]
+}
+```
+
+### 3.3 준수율 등급 기준
+
+| 준수율 | 등급 | 공공기관 의미 |
+|--------|------|-------------|
+| 90% 이상 | **A (우수)** | 보안감사 대응 양호 |
+| 70~89% | **B (보통)** | 개선 권고 |
+| 50~69% | **C (미흡)** | 개선 계획 즉시 수립 |
+| 50% 미만 | **D (부적합)** | 즉시 조치 필요 |
+
+### 3.4 점검 항목 조회
+
+```http
+GET /api/compliance/csap/items?category=기술적&auto_only=true
+Authorization: Bearer {token}
+```
+
+### 3.5 점검 결과 상세 조회
+
+```http
+GET /api/compliance/csap/results/{scan_id}
+Authorization: Bearer {token}
+```
+
+### 3.6 수동 항목 증적 업로드
+
+자동 점검 불가 항목(M-01 정보보호 정책 등)은 담당자가 수동으로 증적을 업로드합니다.
+
+```http
+POST /api/compliance/csap/evidence/{item_id}
+Authorization: Bearer {token}
+Content-Type: application/json
+
+{
+ "item_id": "M-01",
+ "inst_id": 1,
+ "finding": "정보보호 정책서 2026년 개정본 확인",
+ "evidence_note": "첨부파일: 정보보호정책_2026.pdf (SharePoint 저장)",
+ "status": "PASS"
+}
+```
+
+### 3.7 보고서 생성
+
+**HTML 보고서 (웹 열람·인쇄용):**
+```
+GET /api/compliance/csap/report/html?scan_id=CSAP-20260531-143022
+```
+
+**Excel 보고서 (공문 첨부용):**
+```
+GET /api/compliance/csap/report/excel?scan_id=CSAP-20260531-143022
+```
+
+Excel 파일명: `CSAP_CSAP-20260531-143022_20260531.xlsx`
+
+### 3.8 기관별 준수율 대시보드
+
+```http
+GET /api/compliance/csap/dashboard
+Authorization: Bearer {token}
+```
+
+**응답 예시:**
+```json
+{
+ "institutions": [
+ {"inst_id": 1, "compliance_rate": 82.0, "grade": "B", "scanned_at": "2026-05-31T14:30:22"},
+ {"inst_id": 2, "compliance_rate": 91.5, "grade": "A", "scanned_at": "2026-05-30T09:15:00"}
+ ]
+}
+```
+
+### 3.9 자동 점검 항목 상세
+
+| 항목ID | 항목명 | 자동 점검 방법 |
+|--------|--------|-------------|
+| T-03 | SSH root 로그인 차단 | SSH → sshd_config `PermitRootLogin no` 확인 |
+| T-11 | 취약점 정기 스캔 | tb_vuln_scan 90일 이내 이력 확인 |
+| O-01 | 로그 보존 6개월 | tb_audit_log 최오래된 레코드 날짜 확인 |
+| O-02 | 백업 무결성 검증 | tb_dr_test 90일 이내 PASS 이력 확인 |
+| O-03 | 변경 관리 이행 | tb_change_request 등록 건수 확인 |
+| P-02 | DR 테스트 이행 | tb_dr_test 1년 이내 PASS 이력 확인 |
+
+### 3.10 운영 절차
+
+**분기별 CSAP 점검 절차:**
+
+```
+1. CSAP 자동 점검 실행
+ POST /api/compliance/csap/scan {"inst_id": 기관ID}
+
+2. 결과 확인 및 FAIL 항목 조치
+ GET /api/compliance/csap/results/{scan_id}
+ → FAIL 항목별 개선 조치 시행
+
+3. 수동 항목 증적 업로드
+ POST /api/compliance/csap/evidence/{item_id}
+ → M-01 정책서, M-02 조직도, P-01 출입통제 기록 등
+
+4. 보고서 생성 및 배포
+ GET /api/compliance/csap/report/excel?scan_id=...
+ → 부서장 보고 / 보안감사 대비 보관
+
+5. 대시보드 모니터링
+ GET /api/compliance/csap/dashboard
+ → 기관별 준수율 추이 확인
+```
+
+---
+
+## 4. 통합 운영 시나리오
+
+### 시나리오 1: 장애 발생 → DR 실행 → CSAP 업데이트
+
+```
+[인시던트 발생]
+ → POST /api/incidents (인시던트 등록)
+ → POST /api/dr/failover/{scenario_id} (긴급 Failover, ADMIN)
+ → GET /api/dr/rto-rpo (RTO 실적 확인)
+ → POST /api/compliance/csap/scan (P-02 DR 테스트 항목 자동 갱신)
+```
+
+### 시나리오 2: 네트워크 변경 → 자동 감지 → 변경관리 연계
+
+```
+[설정 변경 의심]
+ → POST /api/network/devices/{id}/backup (최신 백업 실행)
+ → GET /api/network/devices/{id}/diff (변경 내역 확인)
+ → POST /api/change (변경관리 CAB 등록)
+ → POST /api/audit/record (감사 기록)
+```
+
+### 시나리오 3: 분기별 보안 감사 준비
+
+```
+[분기 점검 시작]
+ → POST /api/dr/test (DR 복구 테스트)
+ → POST /api/network/devices/{id}/backup (전 장비 설정 백업)
+ → POST /api/compliance/csap/scan (CSAP 전체 점검)
+ → POST /api/compliance/csap/evidence/* (수동 증적 업로드)
+ → GET /api/compliance/csap/report/excel (보고서 생성)
+```
+
+---
+
+## API 빠른 참조
+
+### DR 자동화 (`/api/dr`)
+
+| Method | Endpoint | 권한 | 설명 |
+|--------|----------|------|------|
+| GET | `/api/dr/scenarios` | ENGINEER+ | 시나리오 목록 |
+| POST | `/api/dr/scenarios` | ADMIN | 시나리오 등록 |
+| POST | `/api/dr/test` | ENGINEER+ | 복구 테스트 실행 |
+| GET | `/api/dr/test/{id}` | ENGINEER+ | 테스트 결과 |
+| GET | `/api/dr/tests` | ENGINEER+ | 테스트 이력 |
+| POST | `/api/dr/backup-verify` | ENGINEER+ | 백업 무결성 검증 |
+| POST | `/api/dr/failover/{id}` | ADMIN | Failover 실행 |
+| GET | `/api/dr/rto-rpo` | ENGINEER+ | RTO/RPO 현황 |
+| GET | `/api/dr/dashboard` | ENGINEER+ | DR 전체 현황 |
+
+### 네트워크 장비 (`/api/network`)
+
+| Method | Endpoint | 권한 | 설명 |
+|--------|----------|------|------|
+| GET | `/api/network/devices` | ENGINEER+ | 장비 목록 |
+| POST | `/api/network/devices` | ADMIN | 장비 등록 |
+| PUT | `/api/network/devices/{id}` | ADMIN | 장비 수정 |
+| DELETE | `/api/network/devices/{id}` | ADMIN | 장비 비활성화 |
+| POST | `/api/network/devices/{id}/backup` | ENGINEER+ | 설정 백업 |
+| GET | `/api/network/devices/{id}/backups` | ENGINEER+ | 백업 이력 |
+| GET | `/api/network/devices/{id}/diff` | ENGINEER+ | 설정 변경 비교 |
+| POST | `/api/network/devices/{id}/command` | ENGINEER+ | SSH 명령 실행 |
+| GET | `/api/network/topology` | ENGINEER+ | 토폴로지 조회 |
+
+### CSAP 점검 (`/api/compliance/csap`)
+
+| Method | Endpoint | 권한 | 설명 |
+|--------|----------|------|------|
+| POST | `/api/compliance/csap/scan` | ADMIN | 전체 자동 점검 |
+| GET | `/api/compliance/csap/items` | ALL | 점검 항목 목록 |
+| GET | `/api/compliance/csap/results` | ALL | 점검 결과 목록 |
+| GET | `/api/compliance/csap/results/{id}` | ALL | 배치 상세 결과 |
+| POST | `/api/compliance/csap/evidence/{id}` | ALL | 수동 증적 업로드 |
+| GET | `/api/compliance/csap/report/html` | ALL | HTML 보고서 |
+| GET | `/api/compliance/csap/report/excel` | ALL | Excel 보고서 |
+| GET | `/api/compliance/csap/dashboard` | ALL | 준수율 대시보드 |
+
+---
+
+*Copyright © 2026 GUARDiA All Rights Reserved.*