""" 배치 작업 관리 API. 엔드포인트: GET /api/batch/jobs — 배치 작업 목록 POST /api/batch/jobs — 배치 작업 등록 GET /api/batch/jobs/{id} — 배치 작업 상세 PATCH /api/batch/jobs/{id} — 배치 작업 수정 DELETE /api/batch/jobs/{id} — 배치 작업 삭제 (비활성화) POST /api/batch/jobs/{id}/run — 수동 즉시 실행 (SSH 비동기) POST /api/batch/jobs/{id}/enable — 활성화 POST /api/batch/jobs/{id}/disable — 비활성화 GET /api/batch/jobs/{id}/runs — 실행 이력 목록 GET /api/batch/runs/{run_id} — 실행 이력 상세 보안: 명령어 위험 패턴 차단, SSH 실행 결과에서 민감정보 제거. """ from __future__ import annotations import asyncio import logging from datetime import datetime from typing import List, Optional from uuid import uuid4 from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query from fastapi.responses import StreamingResponse from sqlalchemy import select, desc from sqlalchemy.ext.asyncio import AsyncSession from core.auth import get_current_user from database import get_db from models import ( Institution, Server, SRRequest, SRStatus, SRType, Priority, BatchJob, BatchJobCreate, BatchJobOut, BatchJobStatus, BatchRun, BatchRunOut, BatchRunResult, User, UserRole, ) logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/batch", tags=["batch"]) # 위험 명령어 패턴 (SSH 명령어 안전성 검증) _DANGEROUS_PATTERNS = [ "rm -rf /", "rm -rf /*", "mkfs", "dd if=", "shutdown", ":(){:|:&};:", "chmod -R 777 /", "chown -R root /", ">(", "curl.*|.*sh", "wget.*|.*sh", ] def _validate_command(cmd: str) -> None: for pattern in _DANGEROUS_PATTERNS: if pattern.lower() in cmd.lower(): raise HTTPException( 422, f"위험한 명령어 패턴이 감지되었습니다: '{pattern}'. 
" f"보안 정책에 의해 차단되었습니다.", ) def _new_sr() -> str: return f"SR-{datetime.now().strftime('%Y%m%d')}-{str(uuid4())[:6].upper()}" # ── 배치 작업 목록 ──────────────────────────────────────────────────────────── @router.get("/jobs", response_model=List[BatchJobOut]) async def list_jobs( inst_id: Optional[int] = Query(None), status: Optional[str] = Query(None), keyword: Optional[str] = Query(None), skip: int = 0, limit: int = 100, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): q = select(BatchJob) if inst_id: q = q.where(BatchJob.inst_id == inst_id) if current_user.role == UserRole.CUSTOMER and current_user.inst_code: r_i = await db.execute( select(Institution).where(Institution.inst_code == current_user.inst_code) ) own = r_i.scalars().first() q = q.where(BatchJob.inst_id == own.id) if own else q.where(BatchJob.id == -1) if status: q = q.where(BatchJob.status == status) if keyword: q = q.where(BatchJob.job_name.contains(keyword)) q = q.order_by(BatchJob.job_name).offset(skip).limit(limit) result = await db.execute(q) return result.scalars().all() # ── 배치 작업 등록 ──────────────────────────────────────────────────────────── @router.post("/jobs", response_model=BatchJobOut, status_code=201) async def create_job( payload: BatchJobCreate, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): if current_user.role not in (UserRole.ADMIN, UserRole.PM, UserRole.ENGINEER): raise HTTPException(403, "권한이 없습니다.") _validate_command(payload.command) # cron 표현식 기본 검증 (5 필드) parts = payload.cron_expr.strip().split() if len(parts) != 5: raise HTTPException(422, "cron_expr은 5개의 필드여야 합니다 (예: '0 2 * * *').") # 중복 job_name 확인 dup = (await db.execute( select(BatchJob).where(BatchJob.job_name == payload.job_name) )).scalars().first() if dup: raise HTTPException(409, f"배치 작업 이름 '{payload.job_name}'이 이미 존재합니다.") job = BatchJob(**payload.model_dump()) db.add(job) await db.commit() await db.refresh(job) return job # ── 배치 작업 상세 
# ── Batch job detail / update / enable / disable / delete ────────────────────

async def _get_job_or_404(db: AsyncSession, job_id: int) -> BatchJob:
    """Load the BatchJob with primary key *job_id*, or raise HTTP 404."""
    job = (await db.execute(
        select(BatchJob).where(BatchJob.id == job_id)
    )).scalars().first()
    if not job:
        raise HTTPException(404, "배치 작업을 찾을 수 없습니다.")
    return job


async def _ensure_job_visible(
    db: AsyncSession,
    user: User,
    job_id: int,
    not_found_detail: str,
) -> None:
    """Hide another institution's job (and its runs) from a CUSTOMER user.

    Non-CUSTOMER roles are never restricted. A CUSTOMER may only access a job
    whose ``inst_id`` equals the id of the Institution matching
    ``user.inst_code``. A CUSTOMER whose institution cannot be resolved is
    denied.

    The response is 404 with *not_found_detail* rather than 403, so other
    tenants' ids are not revealed.
    """
    if user.role != UserRole.CUSTOMER:
        return
    own_inst_id: Optional[int] = None
    if user.inst_code:
        own = (await db.execute(
            select(Institution).where(Institution.inst_code == user.inst_code)
        )).scalars().first()
        own_inst_id = own.id if own else None
    job_inst_id = (await db.execute(
        select(BatchJob.inst_id).where(BatchJob.id == job_id)
    )).scalars().first()
    if own_inst_id is None or job_inst_id != own_inst_id:
        raise HTTPException(404, not_found_detail)


@router.get("/jobs/{job_id}", response_model=BatchJobOut)
async def get_job(
    job_id: int,
    db: AsyncSession = Depends(get_db),
    _u: User = Depends(get_current_user),
):
    """Return one batch job (404 if missing or outside a CUSTOMER's institution)."""
    job = await _get_job_or_404(db, job_id)
    await _ensure_job_visible(db, _u, job.id, "배치 작업을 찾을 수 없습니다.")
    return job


@router.patch("/jobs/{job_id}", response_model=BatchJobOut)
async def update_job(
    job_id: int,
    payload: BatchJobCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Update a batch job (ADMIN / PM / ENGINEER only).

    Only fields explicitly set in the payload are applied. Changed values go
    through the same validation as creation:
    - dangerous-command filter,
    - 5-field ``cron_expr``,
    - unique ``job_name``.

    NOTE(review): if ``cron_expr`` changes on an ACTIVE job, the scheduler is
    not re-registered here — confirm whether ``core.scheduler`` picks it up.
    """
    if current_user.role not in (UserRole.ADMIN, UserRole.PM, UserRole.ENGINEER):
        raise HTTPException(403, "권한이 없습니다.")
    job = await _get_job_or_404(db, job_id)

    if payload.command:
        _validate_command(payload.command)

    changes = payload.model_dump(exclude_unset=True)

    cron_expr = changes.get("cron_expr")
    if cron_expr is not None and len(cron_expr.strip().split()) != 5:
        raise HTTPException(422, "cron_expr은 5개의 필드여야 합니다 (예: '0 2 * * *').")

    new_name = changes.get("job_name")
    if new_name is not None and new_name != job.job_name:
        duplicate = (await db.execute(
            select(BatchJob).where(BatchJob.job_name == new_name, BatchJob.id != job_id)
        )).scalars().first()
        if duplicate:
            raise HTTPException(409, f"배치 작업 이름 '{new_name}'이 이미 존재합니다.")

    for field_name, value in changes.items():
        setattr(job, field_name, value)
    job.updated_at = datetime.now()
    await db.commit()
    await db.refresh(job)
    return job


@router.post("/jobs/{job_id}/enable", response_model=BatchJobOut)
async def enable_job(
    job_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Mark a job ACTIVE and register it with the scheduler (ADMIN / PM only)."""
    if current_user.role not in (UserRole.ADMIN, UserRole.PM):
        raise HTTPException(403, "ADMIN 또는 PM 권한이 필요합니다.")
    job = await _get_job_or_404(db, job_id)
    job.status = BatchJobStatus.ACTIVE
    job.updated_at = datetime.now()
    await db.commit()
    await db.refresh(job)

    # Dynamic registration with APScheduler.
    from core.scheduler import enable_batch_job
    enable_batch_job(job.id, job.cron_expr)
    return job


@router.post("/jobs/{job_id}/disable", response_model=BatchJobOut)
async def disable_job(
    job_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Mark a job DISABLED and remove it from the scheduler (ADMIN / PM only)."""
    if current_user.role not in (UserRole.ADMIN, UserRole.PM):
        raise HTTPException(403, "ADMIN 또는 PM 권한이 필요합니다.")
    job = await _get_job_or_404(db, job_id)
    job.status = BatchJobStatus.DISABLED
    job.updated_at = datetime.now()
    await db.commit()
    await db.refresh(job)

    # Dynamic removal from APScheduler.
    from core.scheduler import disable_batch_job
    disable_batch_job(job.id)
    return job


@router.delete("/jobs/{job_id}", status_code=204)
async def delete_job(
    job_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Soft-delete a job: mark it DISABLED and unschedule it (ADMIN / PM only)."""
    if current_user.role not in (UserRole.ADMIN, UserRole.PM):
        raise HTTPException(403, "ADMIN 또는 PM 권한이 필요합니다.")
    job = await _get_job_or_404(db, job_id)
    was_active = job.status == BatchJobStatus.ACTIVE
    job.status = BatchJobStatus.DISABLED  # soft delete
    job.updated_at = datetime.now()
    await db.commit()

    # The disable endpoint unschedules the job via core.scheduler. Do the same
    # here so a "deleted" job is not left registered there. Only previously
    # ACTIVE jobs are expected to be registered.
    if was_active:
        from core.scheduler import disable_batch_job
        disable_batch_job(job_id)


# ── Manual immediate run ─────────────────────────────────────────────────────

@router.post("/jobs/{job_id}/run", status_code=202)
async def run_job_now(
    job_id: int,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Run a batch job immediately over SSH in the background.

    Creates a BatchRun in RUNNING state and returns 202 Accepted with its
    ``run_id``. The outcome is recorded asynchronously; read it from
    ``GET /api/batch/runs/{run_id}`` or ``GET /api/batch/jobs/{id}/runs``.

    Raises:
        HTTPException: 403 for roles other than ADMIN / PM / ENGINEER.
        HTTPException: 404 for an unknown job.
        HTTPException: 422 for a non-ACTIVE job or a job without a target server.
    """
    if current_user.role not in (UserRole.ADMIN, UserRole.PM, UserRole.ENGINEER):
        raise HTTPException(403, "권한이 없습니다.")
    job = await _get_job_or_404(db, job_id)
    if job.status != BatchJobStatus.ACTIVE:
        raise HTTPException(422, "비활성화된 배치 작업은 실행할 수 없습니다.")
    if not job.server_id:
        raise HTTPException(422, "배치 작업에 대상 서버가 지정되지 않았습니다.")

    run = BatchRun(
        job_id=job_id,
        result=BatchRunResult.RUNNING,
        started_at=datetime.now(),
    )
    db.add(run)
    await db.commit()
    await db.refresh(run)
    run_id = run.id

    background_tasks.add_task(
        _execute_batch_run,
        job_id=job_id,
        run_id=run_id,
        triggered_by=current_user.username,
    )
    return {
        "run_id": run_id,
        "job_id": job_id,
        "status": "RUNNING",
        "message": f"배치 실행 시작 (run_id={run_id}). 결과는 GET /api/batch/runs/{run_id} 에서 확인하세요.",
    }


# ── Run history ──────────────────────────────────────────────────────────────

@router.get("/jobs/{job_id}/runs", response_model=List[BatchRunOut])
async def list_runs(
    job_id: int,
    skip: int = 0,
    limit: int = 50,
    db: AsyncSession = Depends(get_db),
    _u: User = Depends(get_current_user),
):
    """List a job's runs, newest ``started_at`` first.

    For non-CUSTOMER users an unknown job simply yields an empty list.
    """
    await _ensure_job_visible(db, _u, job_id, "배치 작업을 찾을 수 없습니다.")
    result = await db.execute(
        select(BatchRun)
        .where(BatchRun.job_id == job_id)
        .order_by(desc(BatchRun.started_at))
        .offset(skip).limit(limit)
    )
    return result.scalars().all()


@router.get("/runs/{run_id}", response_model=BatchRunOut)
async def get_run(
    run_id: int,
    db: AsyncSession = Depends(get_db),
    _u: User = Depends(get_current_user),
):
    """Return one run (404 if missing or its job is outside a CUSTOMER's institution)."""
    run = (await db.execute(
        select(BatchRun).where(BatchRun.id == run_id)
    )).scalars().first()
    if not run:
        raise HTTPException(404, "실행 이력을 찾을 수 없습니다.")
    await _ensure_job_visible(db, _u, run.job_id, "실행 이력을 찾을 수 없습니다.")
    return run


# ── Batch run log SSE streaming ──────────────────────────────────────────────

@router.get("/runs/{run_id}/stream")
async def stream_run_log(
    run_id: int,
    db: AsyncSession = Depends(get_db),
    _u: User = Depends(get_current_user),
):
    """Stream a batch run's log in real time as Server-Sent Events.

    Client usage (EventSource):
        const es = new EventSource('/api/batch/runs/{run_id}/stream');
        es.onmessage = (e) => {
            const d = JSON.parse(e.data);
            if (d.type === 'log') console.log(d.line);
            if (d.type === 'done') { es.close(); showResult(d); }
            if (d.type === 'timeout') es.close();
        };

    Event types (produced by ``core.events.stream_batch_log``):
        log       — {"type":"log", "run_id":N, "line":"..."}
        heartbeat — {"type":"heartbeat", "elapsed":N}
        done      — {"type":"done", "run_id":N, "result":"...", "exit_code":N, ...}
        error     — {"type":"error", "message":"..."}
        timeout   — {"type":"timeout", "message":"..."}
    """
    # Check that the run exists (and is visible) before opening the stream.
    run = (await db.execute(
        select(BatchRun).where(BatchRun.id == run_id)
    )).scalars().first()
    if not run:
        raise HTTPException(404, "실행 이력을 찾을 수 없습니다.")
    await _ensure_job_visible(db, _u, run.job_id, "실행 이력을 찾을 수 없습니다.")

    from core.events import stream_batch_log
    return StreamingResponse(
        stream_batch_log(run_id),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # disable nginx buffering for SSE
            # NOTE(review): a hard-coded wildcard may bypass any app-level
            # CORS policy on this authenticated endpoint — confirm intent.
            "Access-Control-Allow-Origin": "*",
        },
    )


# ── Background execution ─────────────────────────────────────────────────────

async def _run_job_command(
    srv: Server,
    command: str,
    timeout_sec,
) -> tuple[str, int, BatchRunResult, Optional[str]]:
    """Run *command* on *srv* under a timeout and classify the outcome.

    Returns:
        ``(stdout, exit_code, result, error_msg)``. ``exit_code`` is 0 on
        success and -1 on timeout or error. Only stdout is returned by
        ``run_command_on_server`` here, so the real remote exit status is not
        available.

    NOTE(review): on timeout, ``asyncio.wait_for`` cancels the local coroutine;
    whether the remote process is also stopped depends on ``core.ssh_exec`` —
    TODO confirm.
    """
    try:
        from core.ssh_exec import run_command_on_server
        stdout = await asyncio.wait_for(
            run_command_on_server(srv, command),
            timeout=timeout_sec,
        )
    except asyncio.TimeoutError:
        return "", -1, BatchRunResult.TIMEOUT, f"타임아웃 ({timeout_sec}초)"
    except Exception as exc:
        return "", -1, BatchRunResult.FAILED, f"SSH 실행 오류: {str(exc)[:200]}"
    return stdout or "", 0, BatchRunResult.SUCCESS, None


async def _open_failure_sr(
    job: BatchJob,
    srv: Server,
    triggered_by: str,
    stdout: str,
    error_msg: Optional[str],
    result: BatchRunResult,
) -> Optional[str]:
    """Create a HIGH-priority SR describing a failed or timed-out run.

    Returns:
        The new ``sr_id``, or None if the SR could not be saved. A failure here
        is logged but does not prevent the run result from being recorded.
    """
    from database import SessionLocal

    sr_id = _new_sr()
    last_20_lines = "\n".join(stdout.splitlines()[-20:])
    try:
        async with SessionLocal() as db:
            db.add(SRRequest(
                sr_id=sr_id,
                inst_id=job.inst_id,
                sr_type=SRType.LOG,
                title=f"[배치 실패] {job.job_name}",
                description=(
                    f"배치 작업 실패\n"
                    f"작업명: {job.job_name}\n"
                    f"실행자: {triggered_by}\n"
                    f"오류: {error_msg or ''}\n"
                    f"출력(마지막 20줄):\n{last_20_lines}"
                ),
                status=SRStatus.RECEIVED,
                priority=Priority.HIGH,
                requested_by="batch-scheduler",
                target_server=srv.server_name,
            ))
            await db.commit()
    except Exception:
        logger.exception("배치 실패 SR 생성 실패: job=%s", job.job_name)
        return None

    logger.warning(
        "배치 실패 SR 자동 생성: job=%s sr_id=%s result=%s",
        job.job_name, sr_id, result,
    )
    return sr_id


async def _finalize_run(
    run_id: int,
    job_id: int,
    result: BatchRunResult,
    *,
    touch_job: bool = True,
    **run_fields,
) -> None:
    """Mark a BatchRun finished and optionally stamp its parent job.

    Sets ``run.result``, ``run.ended_at`` and every extra column passed in
    *run_fields* (e.g. ``error_msg``, ``exit_code``). When *touch_job* is true,
    it also sets ``job.last_run_at`` / ``job.last_result``. Everything is
    written in a single commit using one timestamp.
    """
    from database import SessionLocal

    finished_at = datetime.now()
    async with SessionLocal() as db:
        run = (await db.execute(
            select(BatchRun).where(BatchRun.id == run_id)
        )).scalars().first()
        if run:
            run.result = result
            run.ended_at = finished_at
            for column, value in run_fields.items():
                setattr(run, column, value)
        if touch_job:
            job = (await db.execute(
                select(BatchJob).where(BatchJob.id == job_id)
            )).scalars().first()
            if job:
                job.last_run_at = finished_at
                job.last_result = result
        await db.commit()


async def _execute_batch_run(job_id: int, run_id: int, triggered_by: str) -> None:
    """Run a batch job's command over SSH and record the outcome on its BatchRun.

    Scheduled as a background task by ``run_job_now``. Steps:
    1. Load the job and its target server in a short-lived session that is
       closed before the SSH call.
    2. Execute ``job.command`` with ``job.timeout_sec`` as the timeout.
    3. On FAILED/TIMEOUT with ``job.alert_on_fail`` set, open an SR.
    4. Finalize the BatchRun (last 100 stdout lines, exit code, error, SR id)
       and stamp the job's ``last_run_at`` / ``last_result``.

    A missing job or server marks the run FAILED instead of leaving it
    RUNNING. Unexpected exceptions are logged and the run is marked FAILED on
    a best-effort basis.
    """
    from database import SessionLocal

    try:
        async with SessionLocal() as db:
            job = (await db.execute(
                select(BatchJob).where(BatchJob.id == job_id)
            )).scalars().first()
            srv = None
            if job is not None and job.server_id:
                srv = (await db.execute(
                    select(Server).where(Server.id == job.server_id)
                )).scalars().first()

        if job is None:
            await _finalize_run(
                run_id, job_id, BatchRunResult.FAILED,
                touch_job=False,
                error_msg="배치 작업을 찾을 수 없습니다.",
            )
            return
        if srv is None:
            await _finalize_run(
                run_id, job_id, BatchRunResult.FAILED,
                error_msg="대상 서버를 찾을 수 없습니다.",
            )
            return

        stdout, exit_code, result, error_msg = await _run_job_command(
            srv, job.command, job.timeout_sec,
        )
        # Keep only the last 100 lines of output on the run record.
        stdout_tail = "\n".join(stdout.splitlines()[-100:])

        sr_id = None
        if result in (BatchRunResult.FAILED, BatchRunResult.TIMEOUT) and job.alert_on_fail:
            sr_id = await _open_failure_sr(job, srv, triggered_by, stdout, error_msg, result)

        await _finalize_run(
            run_id, job_id, result,
            exit_code=exit_code,
            stdout_tail=stdout_tail,
            error_msg=error_msg,
            sr_id=sr_id,
        )
        logger.info(
            "배치 실행 완료: job=%s run_id=%d result=%s",
            job.job_name, run_id, result,
        )
    except Exception as exc:
        logger.error(
            "배치 실행 중 예외 발생: job_id=%d run_id=%d err=%s",
            job_id, run_id, exc, exc_info=True,
        )
        try:
            await _finalize_run(
                run_id, job_id, BatchRunResult.FAILED,
                touch_job=False,
                error_msg=f"내부 오류: {str(exc)[:200]}",
            )
        except Exception:
            # Best effort only, but never silently: record why the run could not be closed.
            logger.exception("실행 이력 실패 상태 저장 실패: run_id=%d", run_id)