""" 단위 테스트 — auto_remediation_runbook / policy_engine 라우터 커버리지: - RemediationRunbook ORM 모델 기본 필드 - RemediationSession ORM 모델 기본 필드 - PolicyRule ORM 모델 기본 필드 - PolicyViolation ORM 모델 기본 필드 - _simulate_steps: 정상 단계 실행 결과 반환 - _simulate_steps: steps JSON 파싱 실패 처리 - _evaluate_rule: condition 없는 규칙 통과 - _evaluate_rule: condition JSON 파싱 실패 처리 - 시드 데이터 구조 검증 (런북 5개, 정책 5개) - 정책 템플릿 목록 구조 검증 """ import sys import os sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) import json import pytest # ── ORM 모델 필드 테스트 ───────────────────────────────────────────────────────── class TestRemediationRunbookModel: """RemediationRunbook ORM 모델 기본 필드 검증.""" def test_model_tablename(self): from models import RemediationRunbook assert RemediationRunbook.__tablename__ == "tb_remediation_runbook" def test_model_columns_exist(self): from models import RemediationRunbook cols = {c.name for c in RemediationRunbook.__table__.columns} assert "id" in cols assert "name" in cols assert "trigger_pattern" in cols assert "steps" in cols assert "auto_execute" in cols assert "created_at" in cols def test_auto_execute_default_false(self): from models import RemediationRunbook col = RemediationRunbook.__table__.columns["auto_execute"] assert col.default.arg is False def test_relationship_sessions_exists(self): from models import RemediationRunbook assert hasattr(RemediationRunbook, "sessions") class TestRemediationSessionModel: """RemediationSession ORM 모델 기본 필드 검증.""" def test_model_tablename(self): from models import RemediationSession assert RemediationSession.__tablename__ == "tb_remediation_session" def test_model_columns_exist(self): from models import RemediationSession cols = {c.name for c in RemediationSession.__table__.columns} assert "runbook_id" in cols assert "trigger_data" in cols assert "step_results" in cols assert "status" in cols assert "success" in cols def test_status_default_running(self): from models import RemediationSession col = RemediationSession.__table__.columns["status"] assert col.default.arg == "running" def test_relationship_runbook_exists(self): from models import RemediationSession assert hasattr(RemediationSession, "runbook") class TestPolicyRuleModel: """PolicyRule ORM 모델 기본 필드 검증.""" def test_model_tablename(self): from models import PolicyRule assert PolicyRule.__tablename__ == "tb_policy_rule" def test_model_columns_exist(self): from models import PolicyRule cols = {c.name for c in PolicyRule.__table__.columns} assert "id" in cols assert "name" in cols assert "category" in cols assert "condition" in cols assert "severity" in cols assert "auto_remediate" in cols assert "active" in cols def test_severity_default_medium(self): from models import PolicyRule col = PolicyRule.__table__.columns["severity"] assert col.default.arg == "MEDIUM" def test_active_default_true(self): from models import PolicyRule col = PolicyRule.__table__.columns["active"] assert col.default.arg is True def test_relationship_violations_exists(self): from models import PolicyRule assert hasattr(PolicyRule, "violations") class TestPolicyViolationModel: """PolicyViolation ORM 모델 기본 필드 검증.""" def test_model_tablename(self): from models import PolicyViolation assert PolicyViolation.__tablename__ == "tb_policy_violation" def test_model_columns_exist(self): from models import PolicyViolation cols = {c.name for c in PolicyViolation.__table__.columns} assert "rule_id" in cols assert "target" in cols assert "detail" in cols assert "status" in cols assert "remediated_at" in cols def test_status_default_open(self): from models import PolicyViolation col = PolicyViolation.__table__.columns["status"] assert col.default.arg == "open" def test_relationship_rule_exists(self): from models import PolicyViolation assert hasattr(PolicyViolation, "rule") # ── auto_remediation_runbook 헬퍼 테스트 ──────────────────────────────────────── class TestSimulateSteps: """_simulate_steps 헬퍼 함수 단위 테스트.""" def _run(self, steps_json, trigger_data=None): from routers.auto_remediation_runbook import _simulate_steps return _simulate_steps(steps_json, trigger_data) def test_none_steps_returns_empty_success(self): results, success = self._run(None) assert results == [] assert success is True def test_valid_steps_returns_results(self): steps = json.dumps([ {"order": 1, "name": "상태확인", "cmd": "systemctl status nginx"}, {"order": 2, "name": "재시작", "cmd": "systemctl restart nginx"}, ]) results, success = self._run(steps) assert len(results) == 2 assert success is True assert results[0]["order"] == 1 assert results[0]["status"] == "success" def test_placeholder_replacement(self): steps = json.dumps([ {"order": 1, "name": "체크", "cmd": "systemctl status {service_name}"}, ]) results, success = self._run(steps, {"service_name": "nginx"}) assert "nginx" in results[0]["cmd"] assert "{service_name}" not in results[0]["cmd"] def test_invalid_json_returns_error(self): results, success = self._run("not-valid-json") assert success is False assert len(results) == 1 assert "error" in results[0] def test_empty_steps_array(self): results, success = self._run(json.dumps([])) assert results == [] assert success is True # ── policy_engine 헬퍼 테스트 ──────────────────────────────────────────────────── class TestEvaluateRule: """_evaluate_rule 헬퍼 함수 단위 테스트.""" def _make_rule(self, condition=None, name="테스트규칙"): from models import PolicyRule rule = PolicyRule.__new__(PolicyRule) rule.name = name rule.condition = condition return rule def test_no_condition_passes(self): from routers.policy_engine import _evaluate_rule rule = self._make_rule(condition=None) passed, detail = _evaluate_rule(rule, "server-01") assert passed is True assert "통과" in detail def test_invalid_json_condition_fails(self): from routers.policy_engine import _evaluate_rule rule = self._make_rule(condition="not-json") passed, detail = _evaluate_rule(rule, "server-01") assert passed is False assert "파싱 실패" in detail def test_valid_condition_passes(self): from routers.policy_engine import _evaluate_rule condition = json.dumps({ "type": "ssh_config_check", "key": "PermitRootLogin", "expected": "no", "description": "SSH root 접속 금지 확인", }) rule = self._make_rule(condition=condition) passed, detail = _evaluate_rule(rule, "server-01") # 시뮬레이션 모드: 항상 True assert passed is True assert "ssh_config_check" in detail def test_target_different_servers(self): """다른 서버를 대상으로 평가해도 독립적으로 동작.""" from routers.policy_engine import _evaluate_rule condition = json.dumps({"type": "patch_recency_check", "max_days": 30}) rule = self._make_rule(condition=condition) passed1, _ = _evaluate_rule(rule, "web-server-01") passed2, _ = _evaluate_rule(rule, "db-server-02") assert passed1 is True assert passed2 is True # ── 시드 데이터 구조 검증 ──────────────────────────────────────────────────────── class TestSeedData: """기본 시드 데이터 구조 및 개수 검증.""" def test_default_runbooks_count(self): from routers.auto_remediation_runbook import _DEFAULT_RUNBOOKS assert len(_DEFAULT_RUNBOOKS) == 5 def test_runbook_required_fields(self): from routers.auto_remediation_runbook import _DEFAULT_RUNBOOKS for rb in _DEFAULT_RUNBOOKS: assert "name" in rb assert "steps" in rb # steps는 유효한 JSON이어야 함 steps = json.loads(rb["steps"]) assert isinstance(steps, list) assert len(steps) > 0 def test_runbook_steps_have_required_keys(self): from routers.auto_remediation_runbook import _DEFAULT_RUNBOOKS for rb in _DEFAULT_RUNBOOKS: steps = json.loads(rb["steps"]) for step in steps: assert "order" in step assert "name" in step assert "cmd" in step def test_default_policies_count(self): from routers.policy_engine import _DEFAULT_POLICIES assert len(_DEFAULT_POLICIES) == 5 def test_policy_required_fields(self): from routers.policy_engine import _DEFAULT_POLICIES for p in _DEFAULT_POLICIES: assert "name" in p assert "category" in p assert "severity" in p assert "active" in p # condition은 유효한 JSON이어야 함 condition = json.loads(p["condition"]) assert "type" in condition assert "description" in condition def test_policy_severities_valid(self): from routers.policy_engine import _DEFAULT_POLICIES valid_severities = {"CRITICAL", "HIGH", "MEDIUM", "LOW"} for p in _DEFAULT_POLICIES: assert p["severity"] in valid_severities def test_policy_categories_present(self): from routers.policy_engine import _DEFAULT_POLICIES categories = {p["category"] for p in _DEFAULT_POLICIES} # 시드 데이터에 security, access, patch, backup 카테고리가 모두 포함 assert "security" in categories assert "access" in categories assert "patch" in categories assert "backup" in categories # ── 정책 템플릿 구조 검증 ──────────────────────────────────────────────────────── class TestPolicyTemplates: """공공기관 표준 정책 템플릿 목록 구조 검증.""" def test_templates_count(self): from routers.policy_engine import _POLICY_TEMPLATES assert len(_POLICY_TEMPLATES) == 5 def test_template_required_fields(self): from routers.policy_engine import _POLICY_TEMPLATES for t in _POLICY_TEMPLATES: assert "template_id" in t assert "name" in t assert "category" in t assert "severity" in t assert "description" in t assert "reference" in t assert "conditions" in t assert isinstance(t["conditions"], list) assert len(t["conditions"]) > 0 def test_template_ids_unique(self): from routers.policy_engine import _POLICY_TEMPLATES ids = [t["template_id"] for t in _POLICY_TEMPLATES] assert len(ids) == len(set(ids)) def test_template_ids_format(self): from routers.policy_engine import _POLICY_TEMPLATES for t in _POLICY_TEMPLATES: # T-XXX-NNN 형식 assert t["template_id"].startswith("T-")