guardia-itsm/routers/bid_watcher.py
2026-06-07 15:13:59 +09:00

518 lines
19 KiB
Python

"""
나라장터(G2B) 당일 입찰정보 크롤링 + 참가 의사결정 워크플로우 + 문서함
- g2b_opportunity.py(적합성 분석 도구)와 달리, "당일 신규 공고 수집 → 참가/보류/삭제
의사결정 + RFP 등 첨부문서 다운로드"를 담당하는 운영 모니터링 도구다.
- SI/SM 프로젝트(SW개발용역·유지보수·시스템구축·시스템고도화 등)에 한정해서만 수집한다.
- 개방망(GUARDIA_NETWORK_MODE=open)에서만 외부 공공데이터포털 API를 호출하고,
폐쇄망에서는 샘플 데이터로 동작한다 (g2b_opportunity.py의 _OPEN 분기 패턴 재사용).
"""
from __future__ import annotations
import os
from datetime import datetime, date, timedelta
from pathlib import Path
from typing import Optional, List
from urllib.parse import quote
import httpx
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, ConfigDict
from sqlalchemy import select, func as sa_func
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from database import get_db
from models import User, BidWatch, AuditLog
router = APIRouter(prefix="/api/bid-watcher", tags=["나라장터 입찰워처"])
_OPEN = os.environ.get("GUARDIA_NETWORK_MODE") == "open"
# 공공데이터포털 나라장터 입찰공고정보서비스 (개방망 전용)
BID_API_BASE = "https://apis.data.go.kr/1230000/ad/BidPublicInfoService"
BID_API_KEY = os.environ.get("G2B_API_KEY", "")
DOC_ROOT = Path(__file__).parent.parent / "uploads" / "bid_documents"
def _tenant(user: User) -> str:
return user.inst_code or str(user.id)
# ── SI/SM 한정 필터 (가장 중요한 제약 — 통과 못하면 절대 저장하지 않는다) ──────────
SI_SM_KEYWORDS = [
"소프트웨어개발", "SW개발", "소프트웨어 유지보수", "유지보수", "유지관리",
"시스템 구축", "시스템구축", "시스템 고도화", "시스템고도화",
"정보시스템 운영", "정보시스템운영", "ISP", "정보화전략계획",
"플랫폼 구축", "플랫폼구축", "솔루션 도입", "SI", "SM",
]
EXCLUDE_KEYWORDS = ["공사", "용역(청소)", "장비구매", "차량", "건축"]
def is_si_sm_project(title: str, category: str) -> bool:
"""제목/분류명에 화이트리스트 키워드가 있고 제외 키워드가 없을 때만 True."""
text = f"{title or ''} {category or ''}"
if any(ex in text for ex in EXCLUDE_KEYWORDS):
return False
return any(kw in text for kw in SI_SM_KEYWORDS)
# ── 폐쇄망 샘플 데이터 (당일 신규 공고 시뮬레이션) ─────────────────────────────
def _sample_bids_today() -> List[dict]:
today = date.today()
base = [
{
"bid_no": f"{today.strftime('%Y%m%d')}001",
"title": "2026년 OO시 통합 행정정보시스템 고도화 사업",
"category": "시스템 고도화",
"institution": "OO광역시청",
"budget": 850_000_000,
"deadline_days": 14,
"attachments": [
{"name": "제안요청서(RFP).hwp", "doc_type": "RFP"},
{"name": "과업지시서.pdf", "doc_type": "SOW"},
],
},
{
"bid_no": f"{today.strftime('%Y%m%d')}002",
"title": "공공데이터 플랫폼 구축 및 SW개발 용역",
"category": "소프트웨어개발",
"institution": "한국정보화진흥원",
"budget": 1_200_000_000,
"deadline_days": 21,
"attachments": [
{"name": "제안요청서.hwp", "doc_type": "RFP"},
{"name": "입찰공고문.pdf", "doc_type": "NOTICE"},
],
},
{
"bid_no": f"{today.strftime('%Y%m%d')}003",
"title": "차세대 정보시스템 운영 및 유지보수 사업",
"category": "정보시스템운영",
"institution": "OO도교육청",
"budget": 430_000_000,
"deadline_days": 10,
"attachments": [
{"name": "제안요청서(RFP).hwp", "doc_type": "RFP"},
],
},
{
"bid_no": f"{today.strftime('%Y%m%d')}004",
"title": "청사 시설 유지보수(전기·소방설비) 용역",
"category": "시설관리",
"institution": "OO군청",
"budget": 95_000_000,
"deadline_days": 7,
"attachments": [],
},
{
"bid_no": f"{today.strftime('%Y%m%d')}005",
"title": "스마트시티 통합플랫폼 구축 ISP 수립 용역",
"category": "ISP",
"institution": "국토교통부",
"budget": 280_000_000,
"deadline_days": 18,
"attachments": [
{"name": "ISP수립_제안요청서.hwp", "doc_type": "RFP"},
{"name": "참고자료.zip", "doc_type": "REFERENCE"},
],
},
{
"bid_no": f"{today.strftime('%Y%m%d')}006",
"title": "청사 신축 건축 공사 (1공구)",
"category": "건축공사",
"institution": "OO구청",
"budget": 5_400_000_000,
"deadline_days": 30,
"attachments": [],
},
]
out = []
for b in base:
out.append({
"bid_no": b["bid_no"],
"title": b["title"],
"category": b["category"],
"institution": b["institution"],
"budget": b["budget"],
"announce_date": today,
"deadline_date": today + timedelta(days=b["deadline_days"]),
"source_url": f"https://www.g2b.go.kr/pt/menu/selectSubFrame.do?bidNo={b['bid_no']}",
"attachments": [
{**a, "name": a["name"], "url": f"https://www.g2b.go.kr/docs/{b['bid_no']}/{quote(a['name'])}"}
for a in b["attachments"]
],
})
return out
async def _fetch_bid_api_today() -> List[dict]:
"""개방망: 공공데이터포털 나라장터 입찰공고정보서비스에서 당일 등록 공고 조회."""
if not BID_API_KEY:
return _sample_bids_today()
today_str = date.today().strftime("%Y%m%d")
params = {
"serviceKey": BID_API_KEY,
"type": "json",
"numOfRows": "100",
"pageNo": "1",
"inqryDiv": "1",
"inqryBgnDt": f"{today_str}0000",
"inqryEndDt": f"{today_str}2359",
}
try:
async with httpx.AsyncClient(timeout=15.0) as client:
resp = await client.get(f"{BID_API_BASE}/getBidPblancListInfoServc", params=params)
resp.raise_for_status()
data = resp.json()
items = (
data.get("response", {})
.get("body", {})
.get("items", [])
)
if isinstance(items, dict):
items = items.get("item", [])
out = []
for it in items if isinstance(items, list) else []:
bid_no = it.get("bidNtceNo") or it.get("bidNo")
if not bid_no:
continue
title = it.get("bidNtceNm", "")
category = it.get("ntceKindNm", "")
out.append({
"bid_no": bid_no,
"title": title,
"category": category,
"institution": it.get("ntceInsttNm", ""),
"budget": _safe_int(it.get("presmptPrce")),
"announce_date": _parse_date(it.get("bidNtceDt")) or date.today(),
"deadline_date": _parse_date(it.get("bidClseDt")),
"source_url": it.get("bidNtceUrl") or f"https://www.g2b.go.kr/pt/menu/selectSubFrame.do?bidNo={bid_no}",
"attachments": [],
})
return out
except Exception:
return _sample_bids_today()
def _safe_int(v) -> Optional[int]:
try:
return int(float(v)) if v not in (None, "") else None
except (ValueError, TypeError):
return None
def _parse_date(v: Optional[str]) -> Optional[date]:
if not v:
return None
for fmt in ("%Y%m%d%H%M", "%Y%m%d", "%Y-%m-%d"):
try:
return datetime.strptime(v[:len(fmt.replace("%", ""))] if False else v[:10].replace("-", "")[:8], "%Y%m%d").date()
except ValueError:
continue
return None
async def crawl_today_bids() -> List[dict]:
"""
당일(오늘) 등록 공고만 조회 → SI/SM 필터(is_si_sm_project) 통과분만 반환.
개방망: 공공데이터포털 나라장터 API. 폐쇄망: 샘플 데이터.
cron/APScheduler 등에서 직접 호출 가능하도록 의존성 없이 독립 작성.
"""
raw = await _fetch_bid_api_today() if _OPEN else _sample_bids_today()
return [b for b in raw if is_si_sm_project(b.get("title", ""), b.get("category", ""))]
async def fetch_bid_attachments(bid_no: str, fallback: Optional[List[dict]] = None) -> List[dict]:
"""공고 상세에서 첨부 문서(RFP·제안요청서·과업지시서 등) 목록 추출."""
if fallback is not None:
return fallback
return []
# ── Pydantic 스키마 ────────────────────────────────────────────────────────────
class BidWatchOut(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: int
bid_no: str
title: str
category: Optional[str]
institution: Optional[str]
budget: Optional[int]
announce_date: Optional[date]
deadline_date: Optional[date]
source_url: Optional[str]
attachments: Optional[list]
status: str
memo: Optional[str]
decided_by: Optional[int]
decided_at: Optional[datetime]
collected_at: Optional[datetime]
class StatusUpdateIn(BaseModel):
status: str # NEW|JOIN|HOLD|DELETED
memo: Optional[str] = None
class CrawlRunOut(BaseModel):
collected: int
skipped_non_si_sm: int
new_saved: int
updated: int
network_mode: str
_VALID_STATUSES = {"NEW", "JOIN", "HOLD", "DELETED"}
# ── 엔드포인트 ─────────────────────────────────────────────────────────────────
@router.post("/crawl/run", response_model=CrawlRunOut)
async def run_crawl(
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
"""당일 입찰정보 크롤링 실행 — SI/SM 필터 적용, 신규분만 등록(중복은 upsert)."""
raw_total = await (_fetch_bid_api_today() if _OPEN else _sample_bids_today())
filtered = [b for b in raw_total if is_si_sm_project(b.get("title", ""), b.get("category", ""))]
skipped = len(raw_total) - len(filtered)
new_saved = 0
updated = 0
for b in filtered:
r = await db.execute(select(BidWatch).where(BidWatch.bid_no == b["bid_no"]))
existing = r.scalars().first()
atts = b.get("attachments") or await fetch_bid_attachments(b["bid_no"])
if existing:
existing.title = b["title"]
existing.category = b.get("category")
existing.institution = b.get("institution")
existing.budget = b.get("budget")
existing.announce_date = b.get("announce_date")
existing.deadline_date = b.get("deadline_date")
existing.source_url = b.get("source_url")
existing.attachments = atts
# status/memo/decided_* 는 보존 (운영자 의사결정 유지)
updated += 1
else:
db.add(BidWatch(
bid_no=b["bid_no"],
title=b["title"],
category=b.get("category"),
institution=b.get("institution"),
budget=b.get("budget"),
announce_date=b.get("announce_date"),
deadline_date=b.get("deadline_date"),
source_url=b.get("source_url"),
attachments=atts,
status="NEW",
collected_at=datetime.utcnow(),
))
new_saved += 1
db.add(AuditLog(
actor=user.username if hasattr(user, "username") else str(user.id),
action="BID_WATCH_CRAWL_RUN",
detail=f"수집 {len(raw_total)}건 / SI·SM 필터 통과 {len(filtered)}건 / 신규 {new_saved} / 갱신 {updated}",
entity_type="BID_WATCH",
severity="INFO",
))
await db.commit()
return CrawlRunOut(
collected=len(raw_total),
skipped_non_si_sm=skipped,
new_saved=new_saved,
updated=updated,
network_mode="open" if _OPEN else "closed",
)
@router.get("/bids", response_model=List[BidWatchOut])
async def list_bids(
status: Optional[str] = Query(None, description="NEW|JOIN|HOLD|DELETED"),
q: Optional[str] = Query(None, description="제목/발주기관 검색"),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
db: AsyncSession = Depends(get_db),
_u: User = Depends(get_current_user),
):
"""입찰 목록 조회 — 그리드 (상태별 필터 NEW/JOIN/HOLD/DELETED)."""
stmt = select(BidWatch)
if status:
if status.upper() not in _VALID_STATUSES:
raise HTTPException(400, "status는 NEW|JOIN|HOLD|DELETED 중 하나여야 합니다.")
stmt = stmt.where(BidWatch.status == status.upper())
if q:
like = f"%{q}%"
stmt = stmt.where((BidWatch.title.ilike(like)) | (BidWatch.institution.ilike(like)))
stmt = stmt.order_by(BidWatch.collected_at.desc()).offset(offset).limit(limit)
r = await db.execute(stmt)
return r.scalars().all()
@router.get("/bids/{bid_id}", response_model=BidWatchOut)
async def get_bid(
bid_id: int,
db: AsyncSession = Depends(get_db),
_u: User = Depends(get_current_user),
):
"""상세 — 공고 원문 링크·발주기관·예산·마감일·첨부문서 목록."""
bid = await db.get(BidWatch, bid_id)
if not bid:
raise HTTPException(404, "입찰 정보를 찾을 수 없습니다.")
return bid
@router.patch("/bids/{bid_id}/status", response_model=BidWatchOut)
async def update_bid_status(
bid_id: int,
body: StatusUpdateIn,
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
"""상태 변경 — 참가(JOIN)/보류(HOLD)/삭제(DELETED)."""
bid = await db.get(BidWatch, bid_id)
if not bid:
raise HTTPException(404, "입찰 정보를 찾을 수 없습니다.")
new_status = body.status.upper()
if new_status not in _VALID_STATUSES:
raise HTTPException(400, "status는 NEW|JOIN|HOLD|DELETED 중 하나여야 합니다.")
prev = bid.status
bid.status = new_status
if body.memo is not None:
bid.memo = body.memo
bid.decided_by = user.id
bid.decided_at = datetime.utcnow()
db.add(AuditLog(
actor=user.username if hasattr(user, "username") else str(user.id),
action="BID_WATCH_STATUS_CHANGE",
detail=f"[{bid.bid_no}] {bid.title[:60]} : {prev}{new_status}" + (f" (메모: {body.memo[:80]})" if body.memo else ""),
entity_type="BID_WATCH",
entity_id=str(bid.id),
severity="INFO",
))
await db.commit()
await db.refresh(bid)
return bid
@router.get("/bids/{bid_id}/documents")
async def list_bid_documents(
bid_id: int,
db: AsyncSession = Depends(get_db),
_u: User = Depends(get_current_user),
):
"""첨부 문서(RFP 등) 목록."""
bid = await db.get(BidWatch, bid_id)
if not bid:
raise HTTPException(404, "입찰 정보를 찾을 수 없습니다.")
docs = bid.attachments or []
return [
{"doc_id": idx, "name": d.get("name"), "doc_type": d.get("doc_type", "ETC")}
for idx, d in enumerate(docs)
]
@router.get("/bids/{bid_id}/documents/{doc_id}/download")
async def download_bid_document(
bid_id: int,
doc_id: int,
db: AsyncSession = Depends(get_db),
_u: User = Depends(get_current_user),
):
"""RFP 등 문서 다운로드 — 외부 URL을 클라이언트에 직접 노출하지 않는 프록시 방식.
로컬 캐시 파일이 있으면 경로 검증 후 스트리밍, 없으면 안내 placeholder를 반환한다."""
bid = await db.get(BidWatch, bid_id)
if not bid:
raise HTTPException(404, "입찰 정보를 찾을 수 없습니다.")
docs = bid.attachments or []
if doc_id < 0 or doc_id >= len(docs):
raise HTTPException(404, "문서를 찾을 수 없습니다.")
doc = docs[doc_id]
name = doc.get("name", f"document_{doc_id}")
DOC_ROOT.mkdir(parents=True, exist_ok=True)
cache_path = DOC_ROOT / f"{bid.bid_no}_{doc_id}_{name}"
if cache_path.exists():
try:
cache_path.resolve().relative_to(DOC_ROOT.resolve())
except ValueError:
raise HTTPException(403, "접근이 거부되었습니다.")
encoded = quote(name, safe="")
def _iter():
with open(cache_path, "rb") as fh:
while chunk := fh.read(65536):
yield chunk
return StreamingResponse(
_iter(),
media_type="application/octet-stream",
headers={"Content-Disposition": f"attachment; filename*=UTF-8''{encoded}"},
)
# 캐시가 없는 경우: 외부 URL을 직접 노출하지 않고 안내만 제공
raise HTTPException(
503,
"문서가 아직 로컬에 캐시되지 않았습니다. 나라장터 원본 공고에서 직접 내려받아 주세요 "
"(보안 정책상 외부 다운로드 링크는 직접 노출하지 않습니다).",
)
@router.get("/bids/{bid_id}/link")
async def get_bid_link(
bid_id: int,
db: AsyncSession = Depends(get_db),
_u: User = Depends(get_current_user),
):
"""나라장터 원본 공고 링크 정보."""
bid = await db.get(BidWatch, bid_id)
if not bid:
raise HTTPException(404, "입찰 정보를 찾을 수 없습니다.")
return {
"bid_no": bid.bid_no,
"title": bid.title,
"source_url": bid.source_url,
"institution": bid.institution,
}
@router.get("/stats")
async def get_stats(
db: AsyncSession = Depends(get_db),
_u: User = Depends(get_current_user),
):
"""일자별/상태별 집계 (대시보드 카드용)."""
r = await db.execute(
select(BidWatch.status, sa_func.count(BidWatch.id)).group_by(BidWatch.status)
)
by_status = {row[0]: row[1] for row in r.all()}
today = date.today()
r2 = await db.execute(
select(sa_func.count(BidWatch.id)).where(BidWatch.collected_at >= datetime(today.year, today.month, today.day))
)
today_count = r2.scalar() or 0
return {
"today_collected": today_count,
"by_status": {
"NEW": by_status.get("NEW", 0),
"JOIN": by_status.get("JOIN", 0),
"HOLD": by_status.get("HOLD", 0),
"DELETED": by_status.get("DELETED", 0),
},
"total": sum(by_status.values()),
"network_mode": "open" if _OPEN else "closed",
}