manual-deploy 2026-06-06 18:13

This commit is contained in:
DESKTOP-TKLFCPR\ython 2026-06-06 18:13:48 +09:00
parent 70843489f3
commit 20f6e23c4a
10 changed files with 1621 additions and 2 deletions

13
main.py
View File

@ -64,6 +64,11 @@ from routers import (
conversational_ops,
ux_analytics,
sr_auto_review,
alert_rules,
search as search_router,
sr_chat,
inventory,
system as system_router,
)
@ -375,6 +380,7 @@ from routers import (
app.include_router(autodiscovery.router) # CMDB SSH 자동 발견
app.include_router(snmp_discovery.router) # SNMP 네트워크 장비 발견
app.include_router(dependency_map.router) # 서비스 의존성 자동 매핑
app.include_router(inventory.router) # #62 부품 재고 (/api/inventory/parts — config_inventory보다 먼저 등록)
app.include_router(config_inventory.router) # 서버 구성 인벤토리 자동 수집
app.include_router(nlquery.router) # Text-to-SQL 자연어 쿼리
app.include_router(op_assistant.router) # 대화형 운영 어시스턴트
@ -517,6 +523,13 @@ app.include_router(data_governance.router) # 데이터 거버넌스 (PII
app.include_router(harness_builder.router) # 하네스 빌더 (노코드 에이전트 생성·실행·스킬)
app.include_router(tmux_sessions.router) # tmux 세션 관리 (영속터미널·공유·명령전송)
# ── 모바일 100기능 백엔드 API ────────────────────────────────────────────────
app.include_router(alert_rules.router) # #45 알림 규칙 CRUD
app.include_router(search_router.router) # #50 통합 검색
app.include_router(sr_chat.router) # #98 SR 채팅 (REST + WebSocket)
app.include_router(system_router.router) # #77 시스템 정보/릴리즈 노트
app.include_router(approvals.changes_router) # #68 변경 달력 (/api/changes)
# ── 개방망 보안 헤더 미들웨어 ────────────────────────────────────────────────
@app.middleware("http")

120
models.py
View File

@ -7153,3 +7153,123 @@ class TmuxCommand(Base):
command = Column(Text)
sent_by = Column(String(100))
created_at = Column(DateTime, default=func.now())
# ════════════════════════════════════════════════════════════════════════════════
# ── 모바일 100기능 백엔드 모델 (Mobile API) ─────────────────────────────────────
# ════════════════════════════════════════════════════════════════════════════════
class AlertRule(Base):
"""알림 규칙 — 서버/서비스/SR 지표 임계치 기반 노코드 알림 정의 (#45)."""
__tablename__ = "tb_alert_rule"
id = Column(Integer, primary_key=True, index=True)
tenant_id = Column(String(50), nullable=False, index=True)
target_type = Column(String(20), nullable=False) # server | service | sr
target_id = Column(String(100))
metric = Column(String(30), nullable=False) # cpu | memory | disk | sla
threshold = Column(Float, nullable=False)
operator = Column(String(5), default=">") # > | < | =
channel = Column(String(20), default="inapp") # push | inapp | sms
enabled = Column(Boolean, default=True)
created_by = Column(String(100))
created_at = Column(DateTime, default=func.now())
updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
class InventoryPart(Base):
"""부품 재고 — 현장 서비스 교체 부품 관리 (#62)."""
__tablename__ = "tb_inventory_part"
id = Column(Integer, primary_key=True, index=True)
tenant_id = Column(String(50), nullable=False, index=True)
name = Column(String(200), nullable=False)
model = Column(String(100))
quantity = Column(Integer, default=0)
min_quantity = Column(Integer, default=1)
location = Column(String(200))
created_at = Column(DateTime, default=func.now())
updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
class SRChatMessage(Base):
"""SR 채팅 메시지 — SR별 실시간 협업 대화 (#98)."""
__tablename__ = "tb_sr_chat_message"
id = Column(Integer, primary_key=True, index=True)
task_id = Column(String(30), nullable=False, index=True) # SRRequest.sr_id
sender_id = Column(String(100), nullable=False)
content = Column(Text, nullable=False)
msg_type = Column(String(20), default="text") # text | image | sr_update
read_by = Column(Text) # JSON list of usernames who read
created_at = Column(DateTime, default=func.now(), index=True)
class SRSubscription(Base):
"""SR 구독/팔로우 — 사용자가 SR 변경 알림을 받도록 구독 (#14)."""
__tablename__ = "tb_sr_subscription"
id = Column(Integer, primary_key=True, index=True)
task_id = Column(String(30), nullable=False, index=True) # SRRequest.sr_id
username = Column(String(100), nullable=False, index=True)
created_at = Column(DateTime, default=func.now())
class SRRating(Base):
"""SR 완료 만족도 평가 (#53)."""
__tablename__ = "tb_sr_rating"
id = Column(Integer, primary_key=True, index=True)
task_id = Column(String(30), nullable=False, index=True)
rater = Column(String(100), nullable=False)
score = Column(Integer, nullable=False) # 1~5
comment = Column(Text)
created_at = Column(DateTime, default=func.now())
class SRSignature(Base):
"""현장 전자서명 — 작업 완료 현장 확인 서명 (#70)."""
__tablename__ = "tb_sr_signature"
id = Column(Integer, primary_key=True, index=True)
task_id = Column(String(30), nullable=False, index=True)
signed_by = Column(String(100), nullable=False)
signature_b64 = Column(Text, nullable=False) # base64 PNG 서명 데이터
created_at = Column(DateTime, default=func.now())
class SRCheckin(Base):
"""현장 체크인 — 엔지니어 현장 도착 GPS 기록 (#93)."""
__tablename__ = "tb_sr_checkin"
id = Column(Integer, primary_key=True, index=True)
task_id = Column(String(30), nullable=False, index=True)
username = Column(String(100), nullable=False)
lat = Column(Float)
lng = Column(Float)
created_at = Column(DateTime, default=func.now())
class UserDevice(Base):
"""등록 디바이스 — 모바일 앱 디바이스/푸시 토큰 관리 (#33)."""
__tablename__ = "tb_user_device"
id = Column(Integer, primary_key=True, index=True)
username = Column(String(100), nullable=False, index=True)
device_name = Column(String(150))
device_type = Column(String(30)) # android | ios | web
push_token = Column(String(300))
last_seen_at = Column(DateTime, default=func.now())
created_at = Column(DateTime, default=func.now())
class LoginEvent(Base):
"""보안 이벤트 로그 — 로그인 성공/실패/디바이스 등록 등 (#34)."""
__tablename__ = "tb_login_event"
id = Column(Integer, primary_key=True, index=True)
username = Column(String(100), index=True)
event_type = Column(String(40), nullable=False) # LOGIN_SUCCESS | LOGIN_FAIL | DEVICE_ADDED | ...
detail = Column(Text)
ip_addr_hash= Column(String(64)) # SHA-256, 원본 미저장
created_at = Column(DateTime, default=func.now(), index=True)

198
routers/alert_rules.py Normal file
View File

@ -0,0 +1,198 @@
"""
알림 규칙 CRUD API (모바일 기능 #45).
엔드포인트:
GET /api/alert-rules/ 알림 규칙 목록 (tenant 필터)
POST /api/alert-rules/ 알림 규칙 생성
PUT /api/alert-rules/{id} 알림 규칙 수정
DELETE /api/alert-rules/{id} 알림 규칙 삭제
PATCH /api/alert-rules/{id}/toggle 활성/비활성 토글
AlertRule: target_type(server/service/sr), metric(cpu/memory/disk/sla),
threshold, operator(>/</=), channel(push/inapp/sms), enabled
"""
from __future__ import annotations
from datetime import datetime
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, ConfigDict, field_validator
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from database import get_db
from models import AlertRule, User
router = APIRouter(prefix="/api/alert-rules", tags=["Alert Rules"])
_VALID_TARGET = {"server", "service", "sr"}
_VALID_METRIC = {"cpu", "memory", "disk", "sla"}
_VALID_OPERATOR = {">", "<", "="}
_VALID_CHANNEL = {"push", "inapp", "sms"}
def _tenant_of(user: User) -> str:
"""사용자의 테넌트 식별자 — inst_code 우선, 없으면 username 단위 격리."""
return user.inst_code or f"user:{user.username}"
class AlertRuleCreate(BaseModel):
target_type: str
target_id: Optional[str] = None
metric: str
threshold: float
operator: str = ">"
channel: str = "inapp"
enabled: bool = True
@field_validator("target_type")
@classmethod
def _v_target(cls, v: str) -> str:
if v not in _VALID_TARGET:
raise ValueError(f"target_type은 {_VALID_TARGET} 중 하나여야 합니다.")
return v
@field_validator("metric")
@classmethod
def _v_metric(cls, v: str) -> str:
if v not in _VALID_METRIC:
raise ValueError(f"metric은 {_VALID_METRIC} 중 하나여야 합니다.")
return v
@field_validator("operator")
@classmethod
def _v_op(cls, v: str) -> str:
if v not in _VALID_OPERATOR:
raise ValueError(f"operator는 {_VALID_OPERATOR} 중 하나여야 합니다.")
return v
@field_validator("channel")
@classmethod
def _v_ch(cls, v: str) -> str:
if v not in _VALID_CHANNEL:
raise ValueError(f"channel은 {_VALID_CHANNEL} 중 하나여야 합니다.")
return v
class AlertRuleUpdate(BaseModel):
target_type: Optional[str] = None
target_id: Optional[str] = None
metric: Optional[str] = None
threshold: Optional[float] = None
operator: Optional[str] = None
channel: Optional[str] = None
enabled: Optional[bool] = None
class AlertRuleOut(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: int
target_type: str
target_id: Optional[str]
metric: str
threshold: float
operator: str
channel: str
enabled: bool
created_by: Optional[str]
created_at: Optional[datetime]
@router.get("/", response_model=List[AlertRuleOut])
async def list_alert_rules(
enabled: Optional[bool] = None,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""내 테넌트의 알림 규칙 목록."""
q = select(AlertRule).where(AlertRule.tenant_id == _tenant_of(current_user))
if enabled is not None:
q = q.where(AlertRule.enabled == enabled)
q = q.order_by(AlertRule.created_at.desc())
rows = (await db.execute(q)).scalars().all()
return rows
@router.post("/", response_model=AlertRuleOut, status_code=201)
async def create_alert_rule(
payload: AlertRuleCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
rule = AlertRule(
tenant_id=_tenant_of(current_user),
target_type=payload.target_type,
target_id=payload.target_id,
metric=payload.metric,
threshold=payload.threshold,
operator=payload.operator,
channel=payload.channel,
enabled=payload.enabled,
created_by=current_user.username,
)
db.add(rule)
await db.commit()
await db.refresh(rule)
return rule
async def _get_owned_rule(rule_id: int, db: AsyncSession, user: User) -> AlertRule:
rule = await db.get(AlertRule, rule_id)
if not rule or rule.tenant_id != _tenant_of(user):
raise HTTPException(404, "알림 규칙을 찾을 수 없습니다.")
return rule
@router.put("/{rule_id}", response_model=AlertRuleOut)
async def update_alert_rule(
rule_id: int,
payload: AlertRuleUpdate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
rule = await _get_owned_rule(rule_id, db, current_user)
data = payload.model_dump(exclude_unset=True)
# 유효성 검증
if "target_type" in data and data["target_type"] not in _VALID_TARGET:
raise HTTPException(422, f"target_type은 {_VALID_TARGET} 중 하나여야 합니다.")
if "metric" in data and data["metric"] not in _VALID_METRIC:
raise HTTPException(422, f"metric은 {_VALID_METRIC} 중 하나여야 합니다.")
if "operator" in data and data["operator"] not in _VALID_OPERATOR:
raise HTTPException(422, f"operator는 {_VALID_OPERATOR} 중 하나여야 합니다.")
if "channel" in data and data["channel"] not in _VALID_CHANNEL:
raise HTTPException(422, f"channel은 {_VALID_CHANNEL} 중 하나여야 합니다.")
for k, v in data.items():
setattr(rule, k, v)
rule.updated_at = datetime.now()
await db.commit()
await db.refresh(rule)
return rule
@router.delete("/{rule_id}", status_code=204)
async def delete_alert_rule(
rule_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
rule = await _get_owned_rule(rule_id, db, current_user)
await db.delete(rule)
await db.commit()
@router.patch("/{rule_id}/toggle", response_model=AlertRuleOut)
async def toggle_alert_rule(
rule_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""알림 규칙 활성/비활성 토글."""
rule = await _get_owned_rule(rule_id, db, current_user)
rule.enabled = not rule.enabled
rule.updated_at = datetime.now()
await db.commit()
await db.refresh(rule)
return rule

View File

@ -34,6 +34,66 @@ async def _write_audit(db: AsyncSession, sr_id: str, actor: str, action: str, de
))
class DelegateByUsernameRequest(BaseModel):
delegate_to: str # 대리 결재자 username
from_date: str # ISO date
to_date: str # ISO date
reason: Optional[str] = None
@router.post("/delegate")
async def delegate_global(
body: DelegateByUsernameRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""기간 기반 대리 결재 위임 — 현재 사용자의 대기 결재를 일괄 위임 (#65).
주의: 라우트는 POST /{sr_id} 보다 먼저 정의되어야 'delegate'
sr_id 경로 변수로 잘못 매칭되지 않는다.
"""
try:
from_dt = datetime.fromisoformat(body.from_date)
to_dt = datetime.fromisoformat(body.to_date)
except ValueError:
raise HTTPException(422, "from_date / to_date는 ISO 날짜 형식이어야 합니다.")
if to_dt < from_dt:
raise HTTPException(422, "to_date는 from_date 이후여야 합니다.")
delegate_user = (await db.execute(
select(User).where(User.username == body.delegate_to)
)).scalars().first()
if not delegate_user:
raise HTTPException(404, f"대리 결재자 '{body.delegate_to}'를 찾을 수 없습니다.")
pending = (await db.execute(
select(ApprovalFlow).where(
ApprovalFlow.approver == current_user.username,
ApprovalFlow.result == ApprovalResult.PENDING,
)
)).scalars().all()
delegated = 0
for apv in pending:
apv.delegate_to = delegate_user.id
apv.delegate_until = to_dt
delegated += 1
if pending:
await _write_audit(
db, pending[0].sr_id, current_user.username, "APPROVAL_DELEGATED_BULK",
f"대리 결재 → {delegate_user.username} ({body.from_date}~{body.to_date}) "
f"| {delegated}건 | 사유: {body.reason or ''}"
)
await db.commit()
return {
"delegate_to": delegate_user.username,
"from_date": body.from_date,
"to_date": body.to_date,
"delegated_count": delegated,
"message": f"{delegated}건의 결재가 위임되었습니다.",
}
@router.get("/{sr_id}", response_model=List[ApprovalOut])
async def list_approvals(sr_id: str, db: AsyncSession = Depends(get_db),
_u: User = Depends(get_current_user)):
@ -245,3 +305,99 @@ async def extend_deadline(
"new_deadline": body.new_deadline,
"message": "승인 마감이 연장되었습니다.",
}
# ════════════════════════════════════════════════════════════════════════════════
# ── 모바일 100기능: 다단계 승인 현황 ────────────────────────────────────────────
# ════════════════════════════════════════════════════════════════════════════════
@router.get("/{approval_id}/stages")
async def approval_stages(
approval_id: int,
db: AsyncSession = Depends(get_db),
_u: User = Depends(get_current_user),
):
"""다단계 승인 현황 — 동일 SR의 모든 승인 단계를 순서대로 반환 (#67)."""
apv = await db.get(ApprovalFlow, approval_id)
if not apv:
raise HTTPException(404, "승인 레코드를 찾을 수 없습니다.")
rows = (await db.execute(
select(ApprovalFlow).where(ApprovalFlow.sr_id == apv.sr_id)
.order_by(ApprovalFlow.created_at, ApprovalFlow.id)
)).scalars().all()
stages = []
for level, r in enumerate(rows, start=1):
stages.append({
"level": level,
"approval_id": r.id,
"approver": r.approver,
"status": r.result,
"approved_at": r.decided_at.isoformat() if r.decided_at else None,
"delegate_to": r.delegate_to,
"signed": bool(r.signature),
})
return {"sr_id": apv.sr_id, "stages": stages, "total_stages": len(stages)}
# ════════════════════════════════════════════════════════════════════════════════
# ── 변경 달력 (#68) — /api/changes/calendar ─────────────────────────────────────
# ════════════════════════════════════════════════════════════════════════════════
changes_router = APIRouter(prefix="/api/changes", tags=["changes"])
@changes_router.get("/calendar")
async def changes_calendar(
month: str, # YYYY-MM
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""변경 달력 — 해당 월 날짜별 변경(SR) 목록 (#68)."""
try:
year, mon = month.split("-")
year_i, mon_i = int(year), int(mon)
if not (1 <= mon_i <= 12):
raise ValueError()
except (ValueError, AttributeError):
raise HTTPException(422, "month는 YYYY-MM 형식이어야 합니다.")
from datetime import date as _date
start = _date(year_i, mon_i, 1)
end = _date(year_i + (1 if mon_i == 12 else 0),
1 if mon_i == 12 else mon_i + 1, 1)
start_dt = datetime(start.year, start.month, start.day)
end_dt = datetime(end.year, end.month, end.day)
# CHANGE / DEPLOY 유형 SR을 변경으로 취급, created_at 기준 그룹핑
from models import SRType
q = select(SRRequest).where(
SRRequest.created_at >= start_dt,
SRRequest.created_at < end_dt,
)
# CUSTOMER 기관 필터
from models import UserRole
if current_user.role == UserRole.CUSTOMER and current_user.inst_code:
from models import Institution as _Inst
inst = (await db.execute(
select(_Inst).where(_Inst.inst_code == current_user.inst_code)
)).scalars().first()
q = q.where(SRRequest.inst_id == (inst.id if inst else -1))
srs = (await db.execute(q.order_by(SRRequest.created_at))).scalars().all()
by_date: dict = {}
for s in srs:
if not s.created_at:
continue
key = s.created_at.date().isoformat()
by_date.setdefault(key, []).append({
"sr_id": s.sr_id,
"title": s.title,
"sr_type": s.sr_type,
"status": s.status,
"priority": s.priority,
})
return {"month": month, "calendar": by_date, "total": len(srs)}

View File

@ -10,7 +10,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import (create_access_token, get_current_user, hash_password,
verify_password, MAX_FAILED_ATTEMPTS, LOCKOUT_MINUTES)
from database import get_db
from models import User
from models import User, UserDevice, LoginEvent, Institution
router = APIRouter(prefix="/api/auth", tags=["auth"])
@ -453,6 +453,158 @@ async def admin_user_lock_status(
}
# ════════════════════════════════════════════════════════════════════════════════
# ── 모바일 100기능: 디바이스 / 보안 이벤트 / 네트워크 상태 / 기관 전환 ───────────
# ════════════════════════════════════════════════════════════════════════════════
class DeviceRegister(BaseModel):
device_name: Optional[str] = None
device_type: Optional[str] = None # android | ios | web
push_token: Optional[str] = None
class SwitchTenantRequest(BaseModel):
tenant_id: str # 전환 대상 inst_code
@router.get("/devices")
async def list_devices(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""등록 디바이스 목록 (#33)."""
rows = (await db.execute(
select(UserDevice).where(UserDevice.username == current_user.username)
.order_by(UserDevice.last_seen_at.desc())
)).scalars().all()
return [
{
"id": d.id,
"device_name": d.device_name,
"device_type": d.device_type,
"last_seen_at": d.last_seen_at.isoformat() if d.last_seen_at else None,
"created_at": d.created_at.isoformat() if d.created_at else None,
}
for d in rows
]
@router.post("/devices", status_code=201)
async def register_device(
body: DeviceRegister,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""디바이스 등록/갱신 (#33)."""
dev = UserDevice(
username=current_user.username,
device_name=body.device_name,
device_type=body.device_type,
push_token=body.push_token,
last_seen_at=datetime.now(),
)
db.add(dev)
db.add(LoginEvent(username=current_user.username, event_type="DEVICE_ADDED",
detail=f"{body.device_type or '?'} / {body.device_name or '?'}"))
await db.commit()
await db.refresh(dev)
return {"id": dev.id, "message": "디바이스가 등록되었습니다."}
@router.delete("/devices/{device_id}", status_code=204)
async def unregister_device(
device_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""디바이스 등록 해제 (#33)."""
dev = await db.get(UserDevice, device_id)
if not dev or dev.username != current_user.username:
raise HTTPException(404, "디바이스를 찾을 수 없습니다.")
await db.delete(dev)
db.add(LoginEvent(username=current_user.username, event_type="DEVICE_REMOVED",
detail=f"device_id={device_id}"))
await db.commit()
@router.get("/events")
async def list_security_events(
limit: int = 50,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""보안 이벤트 로그 — 로그인 이력/실패/디바이스 변경 등 (#34)."""
rows = (await db.execute(
select(LoginEvent).where(LoginEvent.username == current_user.username)
.order_by(LoginEvent.created_at.desc())
.limit(min(limit, 200))
)).scalars().all()
return [
{
"id": e.id,
"event_type": e.event_type,
"detail": e.detail,
"created_at": e.created_at.isoformat() if e.created_at else None,
}
for e in rows
]
@router.get("/network-status")
async def network_status(
current_user: User = Depends(get_current_user),
):
"""접속 경로 상태 — VPN / 개방망 / 내부망 (#37)."""
import os as _os
mode = _os.environ.get("GUARDIA_NETWORK_MODE", "internal")
if mode == "open":
via, level = "opennet", 2
elif mode == "vpn":
via, level = "vpn", 2
else:
via, level = "internal", 3 # 내부망이 가장 신뢰 수준 높음
return {
"via": via,
"level": level,
"username": current_user.username,
"checked_at": datetime.now().isoformat(),
}
@router.post("/switch-tenant")
async def switch_tenant(
body: SwitchTenantRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""기관 전환 — 새 inst_code로 전환 후 새 JWT 발급 (#33/멀티기관)."""
inst = (await db.execute(
select(Institution).where(Institution.inst_code == body.tenant_id)
)).scalars().first()
if not inst:
raise HTTPException(404, "전환 대상 기관을 찾을 수 없습니다.")
# CUSTOMER는 자기 기관으로만 제한, ADMIN/PM/ENGINEER는 자유 전환
from models import UserRole
if current_user.role == UserRole.CUSTOMER and current_user.inst_code != body.tenant_id:
raise HTTPException(403, "해당 기관으로 전환할 권한이 없습니다.")
token = create_access_token({
"sub": current_user.username,
"role": current_user.role,
"tenant": body.tenant_id,
})
db.add(LoginEvent(username=current_user.username, event_type="TENANT_SWITCH",
detail=f"{body.tenant_id}"))
await db.commit()
return {
"access_token": token,
"token_type": "bearer",
"tenant_id": body.tenant_id,
"inst_name": inst.inst_name,
}
# ── OAuth2 소셜 로그인 ────────────────────────────────────────────────────────
@router.get("/oauth/providers")

171
routers/inventory.py Normal file
View File

@ -0,0 +1,171 @@
"""
부품 재고 API (모바일 기능 #62).
GET /api/inventory/parts 부품 목록 (tenant 필터)
GET /api/inventory/parts/{id} 부품 상세
POST /api/inventory/parts 부품 등록
POST /api/inventory/parts/{id}/request 부품 요청 SR 자동 생성
"""
from __future__ import annotations
from datetime import datetime
from typing import List, Optional
from uuid import uuid4
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, ConfigDict
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from database import get_db
from models import Institution, InventoryPart, SRRequest, SRStatus, SRType, User
router = APIRouter(prefix="/api/inventory", tags=["Inventory"])
def _tenant_of(user: User) -> str:
return user.inst_code or f"user:{user.username}"
class PartCreate(BaseModel):
name: str
model: Optional[str] = None
quantity: int = 0
min_quantity: int = 1
location: Optional[str] = None
class PartOut(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: int
name: str
model: Optional[str]
quantity: int
min_quantity: int
location: Optional[str]
low_stock: bool = False
class PartRequest(BaseModel):
quantity: int = 1
reason: Optional[str] = None
target_server: Optional[str] = None
def _to_out(p: InventoryPart) -> dict:
return {
"id": p.id,
"name": p.name,
"model": p.model,
"quantity": p.quantity,
"min_quantity": p.min_quantity,
"location": p.location,
"low_stock": (p.quantity or 0) <= (p.min_quantity or 0),
}
@router.get("/parts", response_model=List[PartOut])
async def list_parts(
low_stock_only: bool = False,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""내 테넌트의 부품 목록."""
q = select(InventoryPart).where(
InventoryPart.tenant_id == _tenant_of(current_user)
).order_by(InventoryPart.name)
rows = (await db.execute(q)).scalars().all()
out = [_to_out(p) for p in rows]
if low_stock_only:
out = [p for p in out if p["low_stock"]]
return out
@router.post("/parts", response_model=PartOut, status_code=201)
async def create_part(
payload: PartCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
part = InventoryPart(
tenant_id=_tenant_of(current_user),
name=payload.name,
model=payload.model,
quantity=payload.quantity,
min_quantity=payload.min_quantity,
location=payload.location,
)
db.add(part)
await db.commit()
await db.refresh(part)
return _to_out(part)
async def _get_owned_part(part_id: int, db: AsyncSession, user: User) -> InventoryPart:
part = await db.get(InventoryPart, part_id)
if not part or part.tenant_id != _tenant_of(user):
raise HTTPException(404, "부품을 찾을 수 없습니다.")
return part
@router.get("/parts/{part_id}", response_model=PartOut)
async def get_part(
part_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
part = await _get_owned_part(part_id, db, current_user)
return _to_out(part)
@router.post("/parts/{part_id}/request", status_code=201)
async def request_part(
part_id: int,
payload: PartRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""부품 요청 → SR 자동 생성."""
part = await _get_owned_part(part_id, db, current_user)
if payload.quantity < 1:
raise HTTPException(422, "요청 수량은 1 이상이어야 합니다.")
# 소속 기관 id 매핑 (있으면)
inst_id = None
if current_user.inst_code:
inst = (await db.execute(
select(Institution).where(Institution.inst_code == current_user.inst_code)
)).scalars().first()
if inst:
inst_id = inst.id
sr_id = f"SR-{datetime.now().strftime('%Y%m%d')}-{str(uuid4())[:6].upper()}"
desc = (
f"부품 요청\n"
f"- 부품명: {part.name}\n"
f"- 모델: {part.model or '-'}\n"
f"- 요청수량: {payload.quantity}\n"
f"- 보관위치: {part.location or '-'}\n"
f"- 사유: {payload.reason or '-'}"
)
sr = SRRequest(
sr_id=sr_id,
inst_id=inst_id,
sr_type=SRType.OTHER,
title=f"[부품요청] {part.name} x{payload.quantity}",
description=desc,
status=SRStatus.RECEIVED,
requested_by=current_user.username,
target_server=payload.target_server,
)
db.add(sr)
await db.commit()
return {
"sr_id": sr_id,
"part_id": part.id,
"part_name": part.name,
"requested_quantity": payload.quantity,
"message": "부품 요청 SR이 생성되었습니다.",
}

138
routers/search.py Normal file
View File

@ -0,0 +1,138 @@
"""
통합 검색 API (모바일 기능 #50).
GET /api/search/?q={query}&types=sr,server,kb,institution
SR, 서버(CMDB), KB 문서, 기관을 동시에 검색하여 타입별 결과를 반환.
보안: 서버 결과는 ServerOut 안전 필드만 반환(ip_addr/ssh_user/os_pw_enc 제외).
CUSTOMER 역할은 자신의 기관 SR/서버만 조회.
"""
from __future__ import annotations
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import select, or_
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from database import get_db
from models import (
Institution, KBDocument, Server, SRRequest, User, UserRole,
)
router = APIRouter(prefix="/api/search", tags=["Search"])
_PER_TYPE_LIMIT = 5
async def _customer_inst_id(user: User, db: AsyncSession) -> Optional[int]:
"""CUSTOMER 역할이면 소속 기관 id 반환, 아니면 None."""
if user.role == UserRole.CUSTOMER and user.inst_code:
inst = (await db.execute(
select(Institution).where(Institution.inst_code == user.inst_code)
)).scalars().first()
return inst.id if inst else -1
return None
@router.get("/")
async def global_search(
q: str = Query(..., min_length=1, description="검색어"),
types: str = Query("sr,server,kb,institution", description="콤마 구분 검색 대상"),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""SR + 서버 + KB + 기관 통합 검색."""
wanted = {t.strip() for t in types.split(",") if t.strip()}
if not wanted:
raise HTTPException(422, "types에 최소 하나의 검색 대상을 지정하세요.")
like = f"%{q}%"
results: dict = {}
cust_inst_id = await _customer_inst_id(current_user, db)
# ── SR 검색 ──────────────────────────────────────────────────────────
if "sr" in wanted:
sr_q = select(SRRequest).where(
or_(SRRequest.title.ilike(like),
SRRequest.description.ilike(like),
SRRequest.sr_id.ilike(like))
)
if cust_inst_id is not None:
sr_q = sr_q.where(SRRequest.inst_id == cust_inst_id)
sr_q = sr_q.order_by(SRRequest.created_at.desc()).limit(_PER_TYPE_LIMIT)
srs = (await db.execute(sr_q)).scalars().all()
results["sr"] = [
{
"sr_id": s.sr_id,
"title": s.title,
"status": s.status,
"priority": s.priority,
"sr_type": s.sr_type,
}
for s in srs
]
# ── 서버(CMDB) 검색 — 자격증명 필드 절대 제외 ──────────────────────────
if "server" in wanted:
srv_q = select(Server).where(
or_(Server.server_name.ilike(like),
Server.server_role.ilike(like),
Server.os_type.ilike(like))
)
if cust_inst_id is not None:
srv_q = srv_q.where(Server.inst_id == cust_inst_id)
srv_q = srv_q.limit(_PER_TYPE_LIMIT)
servers = (await db.execute(srv_q)).scalars().all()
results["server"] = [
{
"id": s.id,
"server_name": s.server_name,
"server_role": s.server_role,
"os_type": s.os_type,
"inst_id": s.inst_id,
# ip_addr / ssh_user / os_pw_enc 절대 미포함
}
for s in servers
]
# ── KB 검색 ──────────────────────────────────────────────────────────
if "kb" in wanted:
kb_q = select(KBDocument).where(
or_(KBDocument.title.ilike(like),
KBDocument.symptoms.ilike(like),
KBDocument.tags.ilike(like))
).limit(_PER_TYPE_LIMIT)
kbs = (await db.execute(kb_q)).scalars().all()
results["kb"] = [
{
"doc_id": k.doc_id,
"title": k.title,
"category": k.category,
"tags": k.tags,
}
for k in kbs
]
# ── 기관 검색 ────────────────────────────────────────────────────────
if "institution" in wanted:
inst_q = select(Institution).where(
or_(Institution.inst_name.ilike(like),
Institution.inst_code.ilike(like))
)
if cust_inst_id is not None and cust_inst_id != -1:
inst_q = inst_q.where(Institution.id == cust_inst_id)
inst_q = inst_q.limit(_PER_TYPE_LIMIT)
insts = (await db.execute(inst_q)).scalars().all()
results["institution"] = [
{
"id": i.id,
"inst_code": i.inst_code,
"inst_name": i.inst_name,
}
for i in insts
]
total = sum(len(v) for v in results.values())
return {"query": q, "total": total, "results": results}

280
routers/sr_chat.py Normal file
View File

@ -0,0 +1,280 @@
"""
SR 채팅방 API + WebSocket (모바일 기능 #98).
WS /ws/sr-chat/{sr_id}?token={jwt} SR별 실시간 채팅
POST /api/sr-chat/{sr_id}/messages 메시지 전송 (REST)
GET /api/sr-chat/{sr_id}/messages 메시지 이력
POST /api/sr-chat/{sr_id}/read 읽음 처리
메시지 타입: text | image | sr_update
"""
from __future__ import annotations
import json
import logging
from datetime import datetime
from typing import Dict, List, Optional, Set
from fastapi import (
APIRouter, Depends, HTTPException, Query, WebSocket, WebSocketDisconnect,
)
from pydantic import BaseModel, ConfigDict
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from database import get_db, SessionLocal
from models import SRChatMessage, SRRequest, User
logger = logging.getLogger(__name__)
router = APIRouter(tags=["SR Chat"])
_VALID_MSG_TYPE = {"text", "image", "sr_update"}
# ── WebSocket 연결 관리 (SR방별 그룹) ─────────────────────────────────────────
class _ChatRooms:
def __init__(self) -> None:
# { sr_id: set(WebSocket) }
self._rooms: Dict[str, Set[WebSocket]] = {}
def join(self, sr_id: str, ws: WebSocket) -> None:
self._rooms.setdefault(sr_id, set()).add(ws)
def leave(self, sr_id: str, ws: WebSocket) -> None:
room = self._rooms.get(sr_id)
if room:
room.discard(ws)
if not room:
self._rooms.pop(sr_id, None)
async def broadcast(self, sr_id: str, payload: dict) -> None:
room = self._rooms.get(sr_id)
if not room:
return
msg = json.dumps(payload, ensure_ascii=False)
dead = []
for ws in list(room):
try:
await ws.send_text(msg)
except Exception:
dead.append(ws)
for ws in dead:
room.discard(ws)
rooms = _ChatRooms()
# ── 스키마 ────────────────────────────────────────────────────────────────────
class ChatMessageCreate(BaseModel):
content: str
msg_type: str = "text"
class ChatMessageOut(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: int
task_id: str
sender_id: str
content: str
msg_type: str
created_at: Optional[datetime]
# ── 헬퍼 ──────────────────────────────────────────────────────────────────────
async def _ensure_sr(sr_id: str, db: AsyncSession) -> SRRequest:
sr = (await db.execute(
select(SRRequest).where(SRRequest.sr_id == sr_id)
)).scalars().first()
if not sr:
raise HTTPException(404, "SR을 찾을 수 없습니다.")
return sr
async def _save_message(db: AsyncSession, sr_id: str, sender: str,
content: str, msg_type: str) -> SRChatMessage:
if msg_type not in _VALID_MSG_TYPE:
raise HTTPException(422, f"msg_type은 {_VALID_MSG_TYPE} 중 하나여야 합니다.")
if not content or not content.strip():
raise HTTPException(422, "메시지 내용이 비어 있습니다.")
m = SRChatMessage(
task_id=sr_id,
sender_id=sender,
content=content,
msg_type=msg_type,
read_by=json.dumps([sender], ensure_ascii=False),
)
db.add(m)
await db.commit()
await db.refresh(m)
return m
# ── REST: 메시지 전송 ─────────────────────────────────────────────────────────
@router.post("/api/sr-chat/{sr_id}/messages", response_model=ChatMessageOut, status_code=201)
async def send_message(
sr_id: str,
payload: ChatMessageCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""SR 채팅 메시지 전송 (REST). 연결된 WebSocket 구독자에게도 브로드캐스트."""
await _ensure_sr(sr_id, db)
m = await _save_message(db, sr_id, current_user.username,
payload.content, payload.msg_type)
await rooms.broadcast(sr_id, {
"type": "message",
"id": m.id,
"task_id": sr_id,
"sender_id": m.sender_id,
"content": m.content,
"msg_type": m.msg_type,
"created_at": m.created_at.isoformat() if m.created_at else None,
})
return m
# ── REST: 메시지 이력 ─────────────────────────────────────────────────────────
@router.get("/api/sr-chat/{sr_id}/messages", response_model=List[ChatMessageOut])
async def list_messages(
sr_id: str,
skip: int = 0,
limit: int = 100,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""SR 채팅 메시지 이력 (오래된 순)."""
await _ensure_sr(sr_id, db)
rows = (await db.execute(
select(SRChatMessage)
.where(SRChatMessage.task_id == sr_id)
.order_by(SRChatMessage.created_at.asc())
.offset(skip).limit(min(limit, 500))
)).scalars().all()
return rows
# ── REST: 읽음 처리 ───────────────────────────────────────────────────────────
@router.post("/api/sr-chat/{sr_id}/read")
async def mark_read(
sr_id: str,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""현재 사용자가 SR 채팅의 모든 메시지를 읽음 처리."""
await _ensure_sr(sr_id, db)
rows = (await db.execute(
select(SRChatMessage).where(SRChatMessage.task_id == sr_id)
)).scalars().all()
updated = 0
for m in rows:
try:
readers = json.loads(m.read_by) if m.read_by else []
except Exception:
readers = []
if current_user.username not in readers:
readers.append(current_user.username)
m.read_by = json.dumps(readers, ensure_ascii=False)
updated += 1
await db.commit()
return {"sr_id": sr_id, "marked_read": updated, "reader": current_user.username}
# ── WebSocket: 실시간 채팅 ────────────────────────────────────────────────────
async def _authenticate_ws(token: str, db: AsyncSession) -> Optional[User]:
if not token:
return None
try:
from core.auth import SECRET_KEY, ALGORITHM
from jose import jwt
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
if payload.get("mfa_pending"):
return None
username = payload.get("sub")
if not username:
return None
user = (await db.execute(
select(User).where(User.username == username)
)).scalars().first()
return user if (user and user.is_active) else None
except Exception:
return None
@router.websocket("/ws/sr-chat/{sr_id}")
async def sr_chat_ws(
websocket: WebSocket,
sr_id: str,
token: str = Query(..., description="JWT access_token"),
db: AsyncSession = Depends(get_db),
):
"""SR별 실시간 채팅 WebSocket."""
user = await _authenticate_ws(token, db)
if not user:
await websocket.close(code=4001, reason="인증 실패: 유효한 토큰이 필요합니다.")
return
# SR 존재 확인
sr = (await db.execute(
select(SRRequest).where(SRRequest.sr_id == sr_id)
)).scalars().first()
if not sr:
await websocket.close(code=4004, reason="SR을 찾을 수 없습니다.")
return
await websocket.accept()
rooms.join(sr_id, websocket)
await websocket.send_text(json.dumps({
"type": "connected",
"sr_id": sr_id,
"username": user.username,
"server_time": datetime.now().isoformat(),
}, ensure_ascii=False))
try:
while True:
raw = await websocket.receive_text()
try:
data = json.loads(raw)
except Exception:
await websocket.send_text(json.dumps(
{"type": "error", "message": "JSON 형식이 아닙니다."},
ensure_ascii=False))
continue
if data.get("type") == "ping":
await websocket.send_text(json.dumps(
{"type": "pong", "server_time": datetime.now().isoformat()},
ensure_ascii=False))
continue
content = (data.get("content") or "").strip()
msg_type = data.get("msg_type", "text")
if not content or msg_type not in _VALID_MSG_TYPE:
await websocket.send_text(json.dumps(
{"type": "error", "message": "content 또는 msg_type이 올바르지 않습니다."},
ensure_ascii=False))
continue
# DB 저장 (독립 세션) + 구독자에게 브로드캐스트
async with SessionLocal() as _db:
m = await _save_message(_db, sr_id, user.username, content, msg_type)
payload = {
"type": "message",
"id": m.id,
"task_id": sr_id,
"sender_id": m.sender_id,
"content": m.content,
"msg_type": m.msg_type,
"created_at": m.created_at.isoformat() if m.created_at else None,
}
await rooms.broadcast(sr_id, payload)
except WebSocketDisconnect:
pass
except Exception as exc:
logger.debug("SR 채팅 WS 오류: sr=%s err=%s", sr_id, exc)
finally:
rooms.leave(sr_id, websocket)

92
routers/system.py Normal file
View File

@ -0,0 +1,92 @@
"""
시스템 정보 API (모바일 기능 #77).
GET /api/system/release-notes 버전별 릴리즈 노트 목록
GET /api/system/version 현재 버전 정보
릴리즈 노트는 정적 정의(코드 내장) 제공한다.
"""
from __future__ import annotations
from typing import List, Optional
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from core.auth import get_current_user
from models import User
router = APIRouter(prefix="/api/system", tags=["System"])
CURRENT_VERSION = "2.0.0"
# 최신 → 과거 순서
_RELEASE_NOTES: List[dict] = [
{
"version": "2.0.0",
"date": "2026-06-06",
"changes": [
"모바일 100기능 백엔드 API 추가 (알림규칙·통합검색·SR채팅·부품재고)",
"SR 에스컬레이션/구독/만족도/현장서명/체크인 지원",
"보안 이벤트 로그 및 디바이스 관리 추가",
"다단계 승인 현황 및 변경 달력 API 추가",
],
"breaking_changes": [],
},
{
"version": "1.5.0",
"date": "2026-06-01",
"changes": [
"CI/CD 배포 트리거 연동",
"tmux 세션 관리·하네스 빌더 추가",
"AI-SOC·데이터 거버넌스 영역 확장",
],
"breaking_changes": [],
},
{
"version": "1.0.0",
"date": "2026-05-20",
"changes": [
"GUARDiA ITSM 정식 출시",
"SR 라이프사이클·CMDB·KB·SLA·승인 워크플로우",
],
"breaking_changes": [],
},
]
class ReleaseNote(BaseModel):
version: str
date: str
changes: List[str]
breaking_changes: List[str] = []
@router.get("/release-notes", response_model=List[ReleaseNote])
async def list_release_notes(
since: Optional[str] = None,
_u: User = Depends(get_current_user),
):
"""버전별 릴리즈 노트 목록 (최신순). since 지정 시 해당 버전 이후만."""
notes = _RELEASE_NOTES
if since:
# since 버전 이후(미포함)만 반환
filtered = []
for n in notes:
if n["version"] == since:
break
filtered.append(n)
notes = filtered
return notes
@router.get("/version")
async def get_version(_u: User = Depends(get_current_user)):
"""현재 버전 정보."""
latest = _RELEASE_NOTES[0] if _RELEASE_NOTES else None
return {
"name": "GUARDiA ITSM",
"version": CURRENT_VERSION,
"latest_release_date": latest["date"] if latest else None,
"release_count": len(_RELEASE_NOTES),
}

View File

@ -13,7 +13,8 @@ from core.events import broadcast
from database import get_db
from models import (
AuditLog, Institution, SRCreate, SROut, SRRequest, SRStatus,
SRStatusUpdate, SRType, User, compute_log_hash
SRStatusUpdate, SRType, User, compute_log_hash,
SRSubscription, SRRating, SRSignature, SRCheckin,
)
router = APIRouter(prefix="/api/tasks", tags=["tasks"])
@ -507,3 +508,301 @@ async def bulk_sr_action(
"results": results,
}
# ════════════════════════════════════════════════════════════════════════════════
# ── 모바일 100기능: SR 액션 + 통계 ──────────────────────────────────────────────
# ════════════════════════════════════════════════════════════════════════════════
class EscalateRequest(BaseModel):
reason: Optional[str] = None
escalate_to: Optional[str] = None # 미지정 시 온콜 에스컬레이션 체인 사용
class RatingRequest(BaseModel):
score: int
comment: Optional[str] = None
class SignatureRequest(BaseModel):
signature_base64: str
class SlaExceptionRequest(BaseModel):
reason: str
new_deadline: str # ISO datetime/date
class CheckinRequest(BaseModel):
lat: float
lng: float
async def _get_sr(sr_id: str, db: AsyncSession) -> SRRequest:
sr = (await db.execute(
select(SRRequest).where(SRRequest.sr_id == sr_id)
)).scalars().first()
if not sr:
raise HTTPException(404, detail="SR을 찾을 수 없습니다.")
return sr
@router.post("/{sr_id}/escalate")
async def escalate_task(
sr_id: str,
payload: EscalateRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""SR 에스컬레이션 — 상위 담당자로 재배정 + 알림 (#8)."""
from models import UserRole
if current_user.role == UserRole.CUSTOMER:
raise HTTPException(403, "에스컬레이션 권한이 없습니다.")
sr = await _get_sr(sr_id, db)
target = payload.escalate_to
if not target:
# 온콜 에스컬레이션 체인에서 대상 도출 시도
try:
from core.oncall_rotate import get_current_oncall
sched = await get_current_oncall(db)
if sched:
target = sched.escalation_to or sched.backup_engineer or sched.engineer
except Exception:
target = None
sr.escalated_at = datetime.now()
sr.escalated_to = target
if target:
sr.assigned_to = target
sr.updated_at = datetime.now()
await _write_audit(db, sr_id, current_user.username, "SR_ESCALATED",
f"에스컬레이션 → {target or '미정'} | 사유: {payload.reason or ''}")
await db.commit()
await broadcast("sla_escalated", {
"sr_id": sr_id, "escalated_to": target, "by": current_user.username,
})
return {
"sr_id": sr_id,
"escalated_to": target,
"escalated_at": sr.escalated_at.isoformat(),
"message": f"'{target or '대상 미정'}'(으)로 에스컬레이션되었습니다.",
}
@router.post("/{sr_id}/subscribe")
async def toggle_subscribe(
sr_id: str,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""SR 구독/팔로우 토글 (#14)."""
await _get_sr(sr_id, db)
existing = (await db.execute(
select(SRSubscription).where(
SRSubscription.task_id == sr_id,
SRSubscription.username == current_user.username,
)
)).scalars().first()
if existing:
await db.delete(existing)
await db.commit()
return {"sr_id": sr_id, "subscribed": False, "message": "구독을 해제했습니다."}
sub = SRSubscription(task_id=sr_id, username=current_user.username)
db.add(sub)
await db.commit()
return {"sr_id": sr_id, "subscribed": True, "message": "구독했습니다."}
@router.post("/{sr_id}/rating", status_code=201)
async def rate_task(
sr_id: str,
payload: RatingRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""완료 SR 만족도 평가 (#53)."""
if not (1 <= payload.score <= 5):
raise HTTPException(422, "score는 1~5 범위여야 합니다.")
sr = await _get_sr(sr_id, db)
if sr.status != SRStatus.COMPLETED:
raise HTTPException(400, "완료된 SR만 평가할 수 있습니다.")
rating = SRRating(
task_id=sr_id, rater=current_user.username,
score=payload.score, comment=payload.comment,
)
db.add(rating)
await _write_audit(db, sr_id, current_user.username, "SR_RATED",
f"만족도 {payload.score}")
await db.commit()
await db.refresh(rating)
return {"sr_id": sr_id, "score": payload.score, "rating_id": rating.id}
@router.post("/{sr_id}/signature", status_code=201)
async def save_signature(
sr_id: str,
payload: SignatureRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""현장 전자서명 저장 (#70)."""
if not payload.signature_base64 or len(payload.signature_base64) < 10:
raise HTTPException(422, "signature_base64 데이터가 올바르지 않습니다.")
await _get_sr(sr_id, db)
sig = SRSignature(
task_id=sr_id, signed_by=current_user.username,
signature_b64=payload.signature_base64,
)
db.add(sig)
await _write_audit(db, sr_id, current_user.username, "SR_SIGNED", "현장 서명 등록")
await db.commit()
await db.refresh(sig)
return {"sr_id": sr_id, "signature_id": sig.id, "signed_by": current_user.username}
@router.post("/{sr_id}/sla-exception")
async def request_sla_exception(
sr_id: str,
payload: SlaExceptionRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""SLA 예외 승인 요청 — SLA 마감 연장 요청 (#94)."""
sr = await _get_sr(sr_id, db)
try:
new_dl = datetime.fromisoformat(payload.new_deadline)
except ValueError:
raise HTTPException(422, "new_deadline은 ISO 날짜/시간 형식이어야 합니다.")
old_dl = sr.sla_deadline
sr.sla_deadline = new_dl
sr.sla_breached = False
sr.updated_at = datetime.now()
await _write_audit(db, sr_id, current_user.username, "SLA_EXCEPTION_REQUESTED",
f"SLA 마감 {old_dl}{new_dl} | 사유: {payload.reason}")
await db.commit()
return {
"sr_id": sr_id,
"old_deadline": old_dl.isoformat() if old_dl else None,
"new_deadline": new_dl.isoformat(),
"reason": payload.reason,
"message": "SLA 예외(마감 연장)가 적용되었습니다.",
}
@router.post("/{sr_id}/checkin", status_code=201)
async def checkin_task(
sr_id: str,
payload: CheckinRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""현장 체크인 — GPS 좌표 기록 (#93)."""
if not (-90 <= payload.lat <= 90) or not (-180 <= payload.lng <= 180):
raise HTTPException(422, "좌표 값이 올바르지 않습니다.")
await _get_sr(sr_id, db)
chk = SRCheckin(
task_id=sr_id, username=current_user.username,
lat=payload.lat, lng=payload.lng,
)
db.add(chk)
await _write_audit(db, sr_id, current_user.username, "SR_CHECKIN",
f"현장 체크인 ({payload.lat}, {payload.lng})")
await db.commit()
await db.refresh(chk)
return {
"sr_id": sr_id,
"checkin_id": chk.id,
"lat": payload.lat,
"lng": payload.lng,
"checked_in_at": chk.created_at.isoformat() if chk.created_at else None,
}
@router.get("/stats/mine")
async def my_sr_stats(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""내 SR 처리 통계 — total / closed / avg_resolve_hours / sla_met_rate (#3)."""
rows = (await db.execute(
select(SRRequest).where(SRRequest.assigned_to == current_user.username)
)).scalars().all()
total = len(rows)
terminal = {SRStatus.COMPLETED, SRStatus.REJECTED, SRStatus.FAILED_ROLLBACK}
closed_rows = [r for r in rows if r.status in terminal]
closed = len(closed_rows)
# 평균 해결 시간 (완료 건의 created_at→updated_at)
completed = [r for r in rows if r.status == SRStatus.COMPLETED
and r.created_at and r.updated_at]
avg_hours = 0.0
if completed:
secs = sum((r.updated_at - r.created_at).total_seconds() for r in completed)
avg_hours = round(secs / len(completed) / 3600, 2)
# SLA 준수율 (마감 시각이 있는 건 중 미위반 비율)
sla_rows = [r for r in rows if r.sla_deadline is not None]
sla_met = len([r for r in sla_rows if not r.sla_breached])
sla_met_rate = round(sla_met / len(sla_rows) * 100, 1) if sla_rows else 100.0
return {
"username": current_user.username,
"total": total,
"closed": closed,
"open": total - closed,
"avg_resolve_hours": avg_hours,
"sla_met_rate": sla_met_rate,
}
@router.get("/stats/by-institution")
async def stats_by_institution(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""기관별 SR 현황 비교 (#4). CUSTOMER는 자기 기관만."""
from models import UserRole
q = select(SRRequest)
cust_inst_id = None
if current_user.role == UserRole.CUSTOMER and current_user.inst_code:
inst = (await db.execute(
select(Institution).where(Institution.inst_code == current_user.inst_code)
)).scalars().first()
cust_inst_id = inst.id if inst else -1
q = q.where(SRRequest.inst_id == cust_inst_id)
srs = (await db.execute(q)).scalars().all()
# 기관 이름 매핑
insts = (await db.execute(select(Institution))).scalars().all()
name_map = {i.id: i.inst_name for i in insts}
terminal = {SRStatus.COMPLETED, SRStatus.REJECTED, SRStatus.FAILED_ROLLBACK}
agg: dict = {}
for s in srs:
key = s.inst_id
bucket = agg.setdefault(key, {
"inst_id": key,
"inst_name": name_map.get(key, "미지정"),
"total": 0, "closed": 0, "open": 0, "sla_breached": 0,
})
bucket["total"] += 1
if s.status in terminal:
bucket["closed"] += 1
else:
bucket["open"] += 1
if s.sla_breached:
bucket["sla_breached"] += 1
result = sorted(agg.values(), key=lambda x: x["total"], reverse=True)
return {"institutions": result, "count": len(result)}