# guardia-itsm/tests/unit/test_auto_remediation_policy.py
# 2026-06-04 08:13:41 +09:00

# 326 lines
# 12 KiB
# Python

"""
Unit tests — auto_remediation_runbook / policy_engine routers
Coverage:
- RemediationRunbook ORM model: basic fields
- RemediationSession ORM model: basic fields
- PolicyRule ORM model: basic fields
- PolicyViolation ORM model: basic fields
- _simulate_steps: returns results for a normal step run
- _simulate_steps: handles a steps JSON parse failure
- _evaluate_rule: a rule without a condition passes
- _evaluate_rule: handles a condition JSON parse failure
- Seed data structure validation (5 runbooks, 5 policies)
- Policy template list structure validation
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
import json
import pytest
# ── ORM 모델 필드 테스트 ─────────────────────────────────────────────────────────
class TestRemediationRunbookModel:
    """Checks the table metadata declared on the RemediationRunbook ORM model."""

    def test_model_tablename(self):
        """The model's __tablename__ is tb_remediation_runbook."""
        from models import RemediationRunbook

        expected_table = "tb_remediation_runbook"
        assert RemediationRunbook.__tablename__ == expected_table

    def test_model_columns_exist(self):
        """Every column name the tests rely on is present on the table."""
        from models import RemediationRunbook

        declared = {column.name for column in RemediationRunbook.__table__.columns}
        required_columns = (
            "id",
            "name",
            "trigger_pattern",
            "steps",
            "auto_execute",
            "created_at",
        )
        # Checked one at a time so the first missing column is the one reported.
        for required in required_columns:
            assert required in declared

    def test_auto_execute_default_false(self):
        """The auto_execute column's default argument is the literal False."""
        from models import RemediationRunbook

        auto_execute = RemediationRunbook.__table__.columns["auto_execute"]
        assert auto_execute.default.arg is False

    def test_relationship_sessions_exists(self):
        """The model exposes a `sessions` attribute (only its presence is checked)."""
        from models import RemediationRunbook

        assert hasattr(RemediationRunbook, "sessions")
class TestRemediationSessionModel:
    """Checks the table metadata declared on the RemediationSession ORM model."""

    def test_model_tablename(self):
        """The model's __tablename__ is tb_remediation_session."""
        from models import RemediationSession

        expected_table = "tb_remediation_session"
        assert RemediationSession.__tablename__ == expected_table

    def test_model_columns_exist(self):
        """Every column name the tests rely on is present on the table."""
        from models import RemediationSession

        declared = {column.name for column in RemediationSession.__table__.columns}
        required_columns = (
            "runbook_id",
            "trigger_data",
            "step_results",
            "status",
            "success",
        )
        # Checked one at a time so the first missing column is the one reported.
        for required in required_columns:
            assert required in declared

    def test_status_default_running(self):
        """The status column's default argument is "running"."""
        from models import RemediationSession

        status_column = RemediationSession.__table__.columns["status"]
        assert status_column.default.arg == "running"

    def test_relationship_runbook_exists(self):
        """The model exposes a `runbook` attribute (only its presence is checked)."""
        from models import RemediationSession

        assert hasattr(RemediationSession, "runbook")
class TestPolicyRuleModel:
    """Checks the table metadata declared on the PolicyRule ORM model."""

    def test_model_tablename(self):
        """The model's __tablename__ is tb_policy_rule."""
        from models import PolicyRule

        expected_table = "tb_policy_rule"
        assert PolicyRule.__tablename__ == expected_table

    def test_model_columns_exist(self):
        """Every column name the tests rely on is present on the table."""
        from models import PolicyRule

        declared = {column.name for column in PolicyRule.__table__.columns}
        required_columns = (
            "id",
            "name",
            "category",
            "condition",
            "severity",
            "auto_remediate",
            "active",
        )
        # Checked one at a time so the first missing column is the one reported.
        for required in required_columns:
            assert required in declared

    def test_severity_default_medium(self):
        """The severity column's default argument is "MEDIUM"."""
        from models import PolicyRule

        severity_column = PolicyRule.__table__.columns["severity"]
        assert severity_column.default.arg == "MEDIUM"

    def test_active_default_true(self):
        """The active column's default argument is the literal True."""
        from models import PolicyRule

        active_column = PolicyRule.__table__.columns["active"]
        assert active_column.default.arg is True

    def test_relationship_violations_exists(self):
        """The model exposes a `violations` attribute (only its presence is checked)."""
        from models import PolicyRule

        assert hasattr(PolicyRule, "violations")
class TestPolicyViolationModel:
    """Checks the table metadata declared on the PolicyViolation ORM model."""

    def test_model_tablename(self):
        """The model's __tablename__ is tb_policy_violation."""
        from models import PolicyViolation

        expected_table = "tb_policy_violation"
        assert PolicyViolation.__tablename__ == expected_table

    def test_model_columns_exist(self):
        """Every column name the tests rely on is present on the table."""
        from models import PolicyViolation

        declared = {column.name for column in PolicyViolation.__table__.columns}
        required_columns = (
            "rule_id",
            "target",
            "detail",
            "status",
            "remediated_at",
        )
        # Checked one at a time so the first missing column is the one reported.
        for required in required_columns:
            assert required in declared

    def test_status_default_open(self):
        """The status column's default argument is "open"."""
        from models import PolicyViolation

        status_column = PolicyViolation.__table__.columns["status"]
        assert status_column.default.arg == "open"

    def test_relationship_rule_exists(self):
        """The model exposes a `rule` attribute (only its presence is checked)."""
        from models import PolicyViolation

        assert hasattr(PolicyViolation, "rule")
# ── auto_remediation_runbook 헬퍼 테스트 ────────────────────────────────────────
class TestSimulateSteps:
    """Unit tests for the _simulate_steps helper of auto_remediation_runbook."""

    def _run(self, steps_json, trigger_data=None):
        """Call _simulate_steps with the given arguments and return its result.

        The helper is imported lazily so that merely collecting this module
        does not require the router package to be importable.
        """
        from routers.auto_remediation_runbook import _simulate_steps

        return _simulate_steps(steps_json, trigger_data)

    def test_none_steps_returns_empty_success(self):
        """Passing None as steps yields an empty result list and success."""
        step_results, ok = self._run(None)
        assert step_results == []
        assert ok is True

    def test_valid_steps_returns_results(self):
        """Two well-formed steps produce two results, the first one successful."""
        step_defs = [
            {"order": 1, "name": "상태확인", "cmd": "systemctl status nginx"},
            {"order": 2, "name": "재시작", "cmd": "systemctl restart nginx"},
        ]
        step_results, ok = self._run(json.dumps(step_defs))
        assert len(step_results) == 2
        assert ok is True
        first = step_results[0]
        assert first["order"] == 1
        assert first["status"] == "success"

    def test_placeholder_replacement(self):
        """A {service_name} placeholder in cmd is filled from the trigger data."""
        step_defs = [
            {"order": 1, "name": "체크", "cmd": "systemctl status {service_name}"},
        ]
        trigger = {"service_name": "nginx"}
        step_results, _ = self._run(json.dumps(step_defs), trigger)
        rendered_cmd = step_results[0]["cmd"]
        assert "nginx" in rendered_cmd
        assert "{service_name}" not in rendered_cmd

    def test_invalid_json_returns_error(self):
        """Unparseable steps JSON yields failure and a single entry with an "error" key."""
        step_results, ok = self._run("not-valid-json")
        assert ok is False
        assert len(step_results) == 1
        assert "error" in step_results[0]

    def test_empty_steps_array(self):
        """An empty JSON array of steps yields an empty result list and success."""
        empty_steps = json.dumps([])
        step_results, ok = self._run(empty_steps)
        assert step_results == []
        assert ok is True
# ── policy_engine 헬퍼 테스트 ────────────────────────────────────────────────────
class TestEvaluateRule:
    """Unit tests for the _evaluate_rule helper of policy_engine."""

    def _make_rule(self, condition=None, name="테스트규칙"):
        """Return an unsaved PolicyRule carrying the given name and condition.

        Args:
            condition: Value stored on the rule's ``condition`` attribute; the
                tests pass a JSON string or None.
            name: Value stored on the rule's ``name`` attribute.

        Returns:
            A PolicyRule instance that is not attached to any session.
        """
        from models import PolicyRule

        # Build through the mapped constructor rather than PolicyRule.__new__:
        # an instance created with __new__ skips SQLAlchemy's instrumented
        # __init__ and has no _sa_instance_state, so assigning to mapped
        # attributes such as .name raises AttributeError.
        # NOTE(review): assumes PolicyRule keeps the default declarative
        # keyword constructor — confirm if the model defines its own __init__.
        return PolicyRule(name=name, condition=condition)

    def test_no_condition_passes(self):
        """A rule without a condition passes and the detail text says so."""
        from routers.policy_engine import _evaluate_rule

        rule = self._make_rule(condition=None)
        passed, detail = _evaluate_rule(rule, "server-01")
        assert passed is True
        assert "통과" in detail

    def test_invalid_json_condition_fails(self):
        """A condition that is not valid JSON fails with a parse-failure detail."""
        from routers.policy_engine import _evaluate_rule

        rule = self._make_rule(condition="not-json")
        passed, detail = _evaluate_rule(rule, "server-01")
        assert passed is False
        assert "파싱 실패" in detail

    def test_valid_condition_passes(self):
        """A well-formed condition passes and its type appears in the detail."""
        from routers.policy_engine import _evaluate_rule

        condition = json.dumps({
            "type": "ssh_config_check",
            "key": "PermitRootLogin",
            "expected": "no",
            "description": "SSH root 접속 금지 확인",
        })
        rule = self._make_rule(condition=condition)
        passed, detail = _evaluate_rule(rule, "server-01")
        # Simulation mode: always True.
        assert passed is True
        assert "ssh_config_check" in detail

    def test_target_different_servers(self):
        """Evaluating the same rule against different targets works independently."""
        from routers.policy_engine import _evaluate_rule

        condition = json.dumps({"type": "patch_recency_check", "max_days": 30})
        rule = self._make_rule(condition=condition)
        passed1, _ = _evaluate_rule(rule, "web-server-01")
        passed2, _ = _evaluate_rule(rule, "db-server-02")
        assert passed1 is True
        assert passed2 is True
# ── 시드 데이터 구조 검증 ────────────────────────────────────────────────────────
class TestSeedData:
    """Validates the shape and size of the built-in seed data."""

    def test_default_runbooks_count(self):
        """Exactly five default runbooks are defined."""
        from routers.auto_remediation_runbook import _DEFAULT_RUNBOOKS

        runbook_total = len(_DEFAULT_RUNBOOKS)
        assert runbook_total == 5

    def test_runbook_required_fields(self):
        """Each runbook has name/steps, and steps decodes to a non-empty list."""
        from routers.auto_remediation_runbook import _DEFAULT_RUNBOOKS

        for runbook in _DEFAULT_RUNBOOKS:
            for key in ("name", "steps"):
                assert key in runbook
            # steps must be valid JSON.
            decoded = json.loads(runbook["steps"])
            assert isinstance(decoded, list)
            assert len(decoded) > 0

    def test_runbook_steps_have_required_keys(self):
        """Every step of every runbook carries order, name and cmd."""
        from routers.auto_remediation_runbook import _DEFAULT_RUNBOOKS

        # Lazily walk all steps in runbook order, decoding one runbook at a time.
        every_step = (
            step
            for runbook in _DEFAULT_RUNBOOKS
            for step in json.loads(runbook["steps"])
        )
        for step in every_step:
            for key in ("order", "name", "cmd"):
                assert key in step

    def test_default_policies_count(self):
        """Exactly five default policies are defined."""
        from routers.policy_engine import _DEFAULT_POLICIES

        policy_total = len(_DEFAULT_POLICIES)
        assert policy_total == 5

    def test_policy_required_fields(self):
        """Each policy has the required keys and a JSON condition with type/description."""
        from routers.policy_engine import _DEFAULT_POLICIES

        for policy in _DEFAULT_POLICIES:
            for key in ("name", "category", "severity", "active"):
                assert key in policy
            # condition must be valid JSON.
            decoded = json.loads(policy["condition"])
            for key in ("type", "description"):
                assert key in decoded

    def test_policy_severities_valid(self):
        """Every policy severity is one of CRITICAL, HIGH, MEDIUM or LOW."""
        from routers.policy_engine import _DEFAULT_POLICIES

        allowed = {"CRITICAL", "HIGH", "MEDIUM", "LOW"}
        assert all(policy["severity"] in allowed for policy in _DEFAULT_POLICIES)

    def test_policy_categories_present(self):
        """The seed policies cover the security, access, patch and backup categories."""
        from routers.policy_engine import _DEFAULT_POLICIES

        seen_categories = {policy["category"] for policy in _DEFAULT_POLICIES}
        # The seed data must include all four of these categories.
        for expected in ("security", "access", "patch", "backup"):
            assert expected in seen_categories
# ── 정책 템플릿 구조 검증 ────────────────────────────────────────────────────────
class TestPolicyTemplates:
    """Validates the structure of the standard policy template list."""

    def test_templates_count(self):
        """Exactly five policy templates are defined."""
        from routers.policy_engine import _POLICY_TEMPLATES

        template_total = len(_POLICY_TEMPLATES)
        assert template_total == 5

    def test_template_required_fields(self):
        """Each template has the required keys and a non-empty conditions list."""
        from routers.policy_engine import _POLICY_TEMPLATES

        required_keys = (
            "template_id",
            "name",
            "category",
            "severity",
            "description",
            "reference",
            "conditions",
        )
        for template in _POLICY_TEMPLATES:
            for key in required_keys:
                assert key in template
            conditions = template["conditions"]
            assert isinstance(conditions, list)
            assert len(conditions) > 0

    def test_template_ids_unique(self):
        """No two templates share the same template_id."""
        from routers.policy_engine import _POLICY_TEMPLATES

        template_ids = [template["template_id"] for template in _POLICY_TEMPLATES]
        distinct_ids = set(template_ids)
        assert len(template_ids) == len(distinct_ids)

    def test_template_ids_format(self):
        """Each template_id starts with the "T-" prefix."""
        from routers.policy_engine import _POLICY_TEMPLATES

        for template in _POLICY_TEMPLATES:
            # IDs follow the T-XXX-NNN format; only the prefix is checked here.
            template_id = template["template_id"]
            assert template_id.startswith("T-")