326 lines
12 KiB
Python
326 lines
12 KiB
Python
"""
Unit tests — auto_remediation_runbook / policy_engine routers

Coverage:
- RemediationRunbook ORM model: basic fields
- RemediationSession ORM model: basic fields
- PolicyRule ORM model: basic fields
- PolicyViolation ORM model: basic fields
- _simulate_steps: returns results for a normal step run
- _simulate_steps: handles a failure to parse the steps JSON
- _evaluate_rule: a rule without a condition passes
- _evaluate_rule: handles a failure to parse the condition JSON
- Seed data structure (5 runbooks, 5 policies)
- Policy template list structure
"""
|
|
import sys
|
|
import os
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
|
|
|
|
import json
|
|
import pytest
|
|
|
|
|
|
# ── ORM 모델 필드 테스트 ─────────────────────────────────────────────────────────
|
|
|
|
class TestRemediationRunbookModel:
    """Schema checks for the ``RemediationRunbook`` ORM model.

    ``models`` is imported inside each test rather than at module level, so
    the import only happens when the individual test runs.
    """

    def test_model_tablename(self):
        """The model maps to the ``tb_remediation_runbook`` table."""
        from models import RemediationRunbook as model

        expected_table = "tb_remediation_runbook"
        assert model.__tablename__ == expected_table

    def test_model_columns_exist(self):
        """Each of the six expected column names is declared on the table."""
        from models import RemediationRunbook as model

        declared = {column.name for column in model.__table__.columns}
        required = (
            "id",
            "name",
            "trigger_pattern",
            "steps",
            "auto_execute",
            "created_at",
        )
        for column_name in required:
            assert column_name in declared

    def test_auto_execute_default_false(self):
        """The ``auto_execute`` column default is exactly ``False``."""
        from models import RemediationRunbook as model

        default_value = model.__table__.columns["auto_execute"].default.arg
        assert default_value is False

    def test_relationship_sessions_exists(self):
        """The model class exposes a ``sessions`` attribute."""
        from models import RemediationRunbook as model

        has_sessions = hasattr(model, "sessions")
        assert has_sessions
|
|
|
|
|
|
class TestRemediationSessionModel:
    """Schema checks for the ``RemediationSession`` ORM model."""

    def test_model_tablename(self):
        """The model maps to the ``tb_remediation_session`` table."""
        from models import RemediationSession as model

        table_name = model.__tablename__
        assert table_name == "tb_remediation_session"

    def test_model_columns_exist(self):
        """Each of the five expected column names is declared on the table."""
        from models import RemediationSession as model

        declared = set()
        for column in model.__table__.columns:
            declared.add(column.name)

        for column_name in (
            "runbook_id",
            "trigger_data",
            "step_results",
            "status",
            "success",
        ):
            assert column_name in declared

    def test_status_default_running(self):
        """The ``status`` column default is the string ``"running"``."""
        from models import RemediationSession as model

        status_column = model.__table__.columns["status"]
        default_value = status_column.default.arg
        assert default_value == "running"

    def test_relationship_runbook_exists(self):
        """The model class exposes a ``runbook`` attribute."""
        from models import RemediationSession as model

        has_runbook = hasattr(model, "runbook")
        assert has_runbook
|
|
|
|
|
|
class TestPolicyRuleModel:
    """Schema checks for the ``PolicyRule`` ORM model."""

    def test_model_tablename(self):
        """The model maps to the ``tb_policy_rule`` table."""
        from models import PolicyRule as model

        expected_table = "tb_policy_rule"
        assert model.__tablename__ == expected_table

    def test_model_columns_exist(self):
        """Each of the seven expected column names is declared on the table."""
        from models import PolicyRule as model

        declared = {column.name for column in model.__table__.columns}
        required = (
            "id",
            "name",
            "category",
            "condition",
            "severity",
            "auto_remediate",
            "active",
        )
        for column_name in required:
            assert column_name in declared

    def test_severity_default_medium(self):
        """The ``severity`` column default is the string ``"MEDIUM"``."""
        from models import PolicyRule as model

        default_value = model.__table__.columns["severity"].default.arg
        assert default_value == "MEDIUM"

    def test_active_default_true(self):
        """The ``active`` column default is exactly ``True``."""
        from models import PolicyRule as model

        default_value = model.__table__.columns["active"].default.arg
        assert default_value is True

    def test_relationship_violations_exists(self):
        """The model class exposes a ``violations`` attribute."""
        from models import PolicyRule as model

        has_violations = hasattr(model, "violations")
        assert has_violations
|
|
|
|
|
|
class TestPolicyViolationModel:
    """Schema checks for the ``PolicyViolation`` ORM model."""

    def test_model_tablename(self):
        """The model maps to the ``tb_policy_violation`` table."""
        from models import PolicyViolation as model

        table_name = model.__tablename__
        assert table_name == "tb_policy_violation"

    def test_model_columns_exist(self):
        """Each of the five expected column names is declared on the table."""
        from models import PolicyViolation as model

        declared = set()
        for column in model.__table__.columns:
            declared.add(column.name)

        for column_name in (
            "rule_id",
            "target",
            "detail",
            "status",
            "remediated_at",
        ):
            assert column_name in declared

    def test_status_default_open(self):
        """The ``status`` column default is the string ``"open"``."""
        from models import PolicyViolation as model

        status_column = model.__table__.columns["status"]
        default_value = status_column.default.arg
        assert default_value == "open"

    def test_relationship_rule_exists(self):
        """The model class exposes a ``rule`` attribute."""
        from models import PolicyViolation as model

        has_rule = hasattr(model, "rule")
        assert has_rule
|
|
|
|
|
|
# ── auto_remediation_runbook 헬퍼 테스트 ────────────────────────────────────────
|
|
|
|
class TestSimulateSteps:
    """Unit tests for the ``_simulate_steps`` helper."""

    def _run(self, steps_json, trigger_data=None):
        """Import ``_simulate_steps`` at call time and invoke it.

        Returns whatever the helper returns; the tests below unpack it as a
        ``(results, success)`` pair.
        """
        from routers.auto_remediation_runbook import _simulate_steps as simulate

        outcome = simulate(steps_json, trigger_data)
        return outcome

    def test_none_steps_returns_empty_success(self):
        """``None`` as the steps payload gives an empty, successful run."""
        step_results, ok = self._run(None)

        assert step_results == []
        assert ok is True

    def test_valid_steps_returns_results(self):
        """Two well-formed steps give two results and overall success."""
        step_defs = [
            {"order": 1, "name": "상태확인", "cmd": "systemctl status nginx"},
            {"order": 2, "name": "재시작", "cmd": "systemctl restart nginx"},
        ]
        payload = json.dumps(step_defs)

        step_results, ok = self._run(payload)

        assert len(step_results) == 2
        assert ok is True
        first = step_results[0]
        assert first["order"] == 1
        assert first["status"] == "success"

    def test_placeholder_replacement(self):
        """A ``{service_name}`` placeholder is filled from the trigger data."""
        step_defs = [
            {"order": 1, "name": "체크", "cmd": "systemctl status {service_name}"},
        ]
        payload = json.dumps(step_defs)
        trigger = {"service_name": "nginx"}

        step_results, _ = self._run(payload, trigger)

        assert "nginx" in step_results[0]["cmd"]
        assert "{service_name}" not in step_results[0]["cmd"]

    def test_invalid_json_returns_error(self):
        """Unparseable steps give a failed run with one ``error`` entry."""
        step_results, ok = self._run("not-valid-json")

        assert ok is False
        assert len(step_results) == 1
        assert "error" in step_results[0]

    def test_empty_steps_array(self):
        """An empty JSON array gives an empty, successful run."""
        payload = json.dumps([])

        step_results, ok = self._run(payload)

        assert step_results == []
        assert ok is True
|
|
|
|
|
|
# ── policy_engine 헬퍼 테스트 ────────────────────────────────────────────────────
|
|
|
|
class TestEvaluateRule:
    """Unit tests for the ``_evaluate_rule`` helper."""

    def _make_rule(self, condition=None, name="테스트규칙"):
        """Build a bare ``PolicyRule`` carrying only ``name`` and ``condition``.

        The instance is created with ``__new__``, so ``PolicyRule.__init__``
        is not called.
        """
        from models import PolicyRule as rule_cls

        bare_rule = rule_cls.__new__(rule_cls)
        bare_rule.name = name
        bare_rule.condition = condition
        return bare_rule

    def test_no_condition_passes(self):
        """A rule with no condition passes and reports "통과" in the detail."""
        from routers.policy_engine import _evaluate_rule as evaluate

        verdict, message = evaluate(self._make_rule(condition=None), "server-01")

        assert verdict is True
        assert "통과" in message

    def test_invalid_json_condition_fails(self):
        """A non-JSON condition fails and reports "파싱 실패" in the detail."""
        from routers.policy_engine import _evaluate_rule as evaluate

        verdict, message = evaluate(
            self._make_rule(condition="not-json"), "server-01"
        )

        assert verdict is False
        assert "파싱 실패" in message

    def test_valid_condition_passes(self):
        """A well-formed condition passes and its type appears in the detail."""
        from routers.policy_engine import _evaluate_rule as evaluate

        spec = {
            "type": "ssh_config_check",
            "key": "PermitRootLogin",
            "expected": "no",
            "description": "SSH root 접속 금지 확인",
        }
        rule = self._make_rule(condition=json.dumps(spec))

        verdict, message = evaluate(rule, "server-01")

        # Original author's note: simulation mode, the result is always True.
        assert verdict is True
        assert "ssh_config_check" in message

    def test_target_different_servers(self):
        """Evaluating the same rule against two servers passes for both."""
        from routers.policy_engine import _evaluate_rule as evaluate

        spec = {"type": "patch_recency_check", "max_days": 30}
        rule = self._make_rule(condition=json.dumps(spec))

        # Both evaluations run before anything is asserted.
        verdicts = [
            evaluate(rule, target)[0]
            for target in ("web-server-01", "db-server-02")
        ]

        assert verdicts[0] is True
        assert verdicts[1] is True
|
|
|
|
|
|
# ── 시드 데이터 구조 검증 ────────────────────────────────────────────────────────
|
|
|
|
class TestSeedData:
    """Checks the shape and size of the built-in seed data."""

    def test_default_runbooks_count(self):
        """Exactly five default runbooks are defined."""
        from routers.auto_remediation_runbook import _DEFAULT_RUNBOOKS as runbooks

        total = len(runbooks)
        assert total == 5

    def test_runbook_required_fields(self):
        """Every runbook has ``name`` and a non-empty JSON list of ``steps``."""
        from routers.auto_remediation_runbook import _DEFAULT_RUNBOOKS as runbooks

        for runbook in runbooks:
            for key in ("name", "steps"):
                assert key in runbook
            # The steps field must decode as JSON; json.loads raises otherwise.
            parsed = json.loads(runbook["steps"])
            assert isinstance(parsed, list)
            assert len(parsed) >= 1

    def test_runbook_steps_have_required_keys(self):
        """Every step of every runbook has ``order``, ``name`` and ``cmd``."""
        from routers.auto_remediation_runbook import _DEFAULT_RUNBOOKS as runbooks

        every_step = (
            step
            for runbook in runbooks
            for step in json.loads(runbook["steps"])
        )
        for step in every_step:
            for key in ("order", "name", "cmd"):
                assert key in step

    def test_default_policies_count(self):
        """Exactly five default policies are defined."""
        from routers.policy_engine import _DEFAULT_POLICIES as policies

        total = len(policies)
        assert total == 5

    def test_policy_required_fields(self):
        """Every policy has its core fields and a JSON ``condition``."""
        from routers.policy_engine import _DEFAULT_POLICIES as policies

        for policy in policies:
            for key in ("name", "category", "severity", "active"):
                assert key in policy
            # The condition must decode as JSON; json.loads raises otherwise.
            parsed = json.loads(policy["condition"])
            for key in ("type", "description"):
                assert key in parsed

    def test_policy_severities_valid(self):
        """Every policy severity is CRITICAL, HIGH, MEDIUM or LOW."""
        from routers.policy_engine import _DEFAULT_POLICIES as policies

        allowed = {"LOW", "MEDIUM", "HIGH", "CRITICAL"}
        for policy in policies:
            severity = policy["severity"]
            assert severity in allowed

    def test_policy_categories_present(self):
        """The policies cover security, access, patch and backup categories."""
        from routers.policy_engine import _DEFAULT_POLICIES as policies

        present = set()
        for policy in policies:
            present.add(policy["category"])

        # All four categories must appear somewhere in the seed data.
        for category in ("security", "access", "patch", "backup"):
            assert category in present
|
|
|
|
|
|
# ── 정책 템플릿 구조 검증 ────────────────────────────────────────────────────────
|
|
|
|
class TestPolicyTemplates:
    """Checks the structure of the standard policy template list."""

    def test_templates_count(self):
        """Exactly five policy templates are defined."""
        from routers.policy_engine import _POLICY_TEMPLATES as templates

        total = len(templates)
        assert total == 5

    def test_template_required_fields(self):
        """Every template has its required keys and a non-empty condition list."""
        from routers.policy_engine import _POLICY_TEMPLATES as templates

        required = (
            "template_id",
            "name",
            "category",
            "severity",
            "description",
            "reference",
            "conditions",
        )
        for template in templates:
            for key in required:
                assert key in template
            conditions = template["conditions"]
            assert isinstance(conditions, list)
            assert len(conditions) >= 1

    def test_template_ids_unique(self):
        """No two templates share a ``template_id``."""
        from routers.policy_engine import _POLICY_TEMPLATES as templates

        all_ids = []
        for template in templates:
            all_ids.append(template["template_id"])

        distinct_ids = set(all_ids)
        assert len(all_ids) == len(distinct_ids)
|
|
|
|
def test_template_ids_format(self):
|
|
from routers.policy_engine import _POLICY_TEMPLATES
|
|
for t in _POLICY_TEMPLATES:
|
|
# T-XXX-NNN 형식
|
|
assert t["template_id"].startswith("T-")
|