#!/usr/bin/env python3 import paramiko, time, sys HOST = '101.79.17.164'; USER = 'root'; PASS = '1q2w3e!Q' client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect(HOST, username=USER, password=PASS, timeout=15) sftp = client.open_sftp() script = """ import urllib.request, json BASE = "http://localhost:8001" def api(method, path, data=None, token=None, form=False): url = f"{BASE}{path}" if form: import urllib.parse body = urllib.parse.urlencode(data).encode() if data else None ct = "application/x-www-form-urlencoded" else: body = json.dumps(data).encode() if data else None ct = "application/json" req = urllib.request.Request(url, data=body, method=method) req.add_header("Content-Type", ct) if token: req.add_header("Authorization", f"Bearer {token}") try: resp = urllib.request.urlopen(req, timeout=10) body_out = resp.read() return json.loads(body_out) if body_out else {}, resp.status except urllib.error.HTTPError as e: body_out = e.read() try: return json.loads(body_out), e.code except: return {"raw": body_out.decode()[:200]}, e.code # JSON 로그인 d, s = api("POST", "/api/auth/login", {"username":"admin","password":"1111"}) TOKEN = d.get("access_token","") print(f"Login HTTP {s}: {'OK' if TOKEN else 'FAIL'} token={TOKEN[:20]}...") # Trial d2, s2 = api("POST", "/api/license/trial", {"customer":"지오정보기술 체험판","days":7}, token=TOKEN) print(f"Trial HTTP {s2}: {json.dumps(d2, ensure_ascii=False)[:200]}") # 상태 d3, s3 = api("GET", "/api/license/status", token=TOKEN) print(f"Status HTTP {s3}: valid={d3.get('valid')}, edition={d3.get('edition')}, days={d3.get('days_remaining')}") print(f" message: {d3.get('message')}") # 이력 d4, s4 = api("GET", "/api/license/history", token=TOKEN) cnt = len(d4) if isinstance(d4,list) else "?" print(f"History HTTP {s4}: {cnt}건") if isinstance(d4,list) and d4: r = d4[0] print(f" 최신: edition={r.get('edition')}, customer={r.get('customer')}, is_trial={r.get('is_trial')}") """ with sftp.open('/tmp/trial2.py', 'w') as f: f.write(script) sftp.close() chan = client.get_transport().open_session() chan.set_combine_stderr(True) chan.exec_command('python3 /tmp/trial2.py 2>&1') start = time.time() while not chan.exit_status_ready(): if chan.recv_ready(): sys.stdout.buffer.write(chan.recv(4096)); sys.stdout.flush() if time.time() - start > 20: break time.sleep(0.2) while chan.recv_ready(): sys.stdout.buffer.write(chan.recv(4096)) sys.stdout.flush() chan.recv_exit_status() client.close()