feat: batch request reporting + fix timestamp parsing
This commit is contained in:
@@ -59,6 +59,18 @@ if cfg and os.path.exists(cfg):
|
||||
|
||||
# Health check each provider
|
||||
import urllib.request,urllib.error
|
||||
# macOS: use system proxy (Surge) if available
|
||||
_opener = None
|
||||
if mac:
|
||||
try:
|
||||
proxy = urllib.request.ProxyHandler({'http':'http://127.0.0.1:6152','https':'http://127.0.0.1:6152'})
|
||||
_opener = urllib.request.build_opener(proxy)
|
||||
except: pass
|
||||
|
||||
def _urlopen(req,timeout=10):
|
||||
if _opener: return _opener.open(req,timeout=timeout)
|
||||
return urllib.request.urlopen(req,timeout=timeout)
|
||||
|
||||
def check_provider(p):
|
||||
base,key,api = p.get('_base',''),p.get('_key',''),p.get('api','')
|
||||
if not base or not key: return {'ok':False,'ms':0,'err':'no config'}
|
||||
@@ -72,7 +84,7 @@ def check_provider(p):
|
||||
url = base.rstrip('/')+'/chat/completions'
|
||||
data = json.dumps({"model":p['model'],"max_tokens":1,"messages":[{"role":"user","content":"hi"}]}).encode()
|
||||
req = urllib.request.Request(url,data,{'Content-Type':'application/json','Authorization':'Bearer '+key})
|
||||
resp = urllib.request.urlopen(req,timeout=10)
|
||||
resp = _urlopen(req,timeout=10)
|
||||
ms = int((time.time()-t0)*1000)
|
||||
return {'ok':True,'ms':ms,'err':''}
|
||||
except urllib.error.HTTPError as e:
|
||||
@@ -197,5 +209,62 @@ PYEOF
|
||||
&& echo "[$(date +%H:%M:%S)] heartbeat ok" \
|
||||
|| echo "[$(date +%H:%M:%S)] heartbeat failed"
|
||||
fi
|
||||
|
||||
# Report new API requests from session jsonl files
|
||||
python3 - "$OC_CONFIG" "$NODE_ID" "$SERVER" "$TOKEN" << 'REQEOF' 2>/dev/null
|
||||
import json,os,sys,glob,time,urllib.request,urllib.error
|
||||
cfg,nid,server,token = sys.argv[1],sys.argv[2],sys.argv[3],sys.argv[4]
|
||||
oc_dir = os.path.expanduser('~/.openclaw')
|
||||
state_f = '/tmp/.oc-agent-req-state.json'
|
||||
state = {}
|
||||
if os.path.exists(state_f):
|
||||
try: state = json.load(open(state_f))
|
||||
except: pass
|
||||
|
||||
reqs = []
|
||||
for sd in glob.glob(os.path.join(oc_dir,'agents','*','sessions')):
|
||||
for jf in glob.glob(os.path.join(sd,'*.jsonl')):
|
||||
if os.path.getmtime(jf) < time.time()-86400: continue
|
||||
prev_pos = state.get(jf,0)
|
||||
try:
|
||||
sz = os.path.getsize(jf)
|
||||
if sz <= prev_pos: continue
|
||||
with open(jf) as f:
|
||||
f.seek(prev_pos)
|
||||
for line in f:
|
||||
if '"usage"' not in line: continue
|
||||
try:
|
||||
d = json.loads(line)
|
||||
m = d.get('message',{})
|
||||
u = m.get('usage',{})
|
||||
if not u or not m.get('provider'): continue
|
||||
ts_raw = d.get('timestamp',0)
|
||||
if isinstance(ts_raw,str):
|
||||
from datetime import datetime
|
||||
ts_val = int(datetime.fromisoformat(ts_raw.replace('Z','+00:00')).timestamp())
|
||||
else:
|
||||
ts_val = int(ts_raw)
|
||||
reqs.append({'node_id':nid,'upstream':m.get('provider',''),
|
||||
'model':m.get('model',''),'status':200,
|
||||
'input_tokens':u.get('input',0),'output_tokens':u.get('output',0),
|
||||
'ttft_ms':0,'total_ms':0,'success':True,
|
||||
'ts':ts_val})
|
||||
except: pass
|
||||
state[jf] = f.tell()
|
||||
except: pass
|
||||
|
||||
# Send new requests (batch)
|
||||
if reqs:
|
||||
try:
|
||||
data = json.dumps(reqs).encode()
|
||||
req = urllib.request.Request(server+'/api/request',data,
|
||||
{'Content-Type':'application/json','Authorization':'Bearer '+token})
|
||||
urllib.request.urlopen(req,timeout=15)
|
||||
except: pass
|
||||
|
||||
json.dump(state,open(state_f,'w'))
|
||||
if reqs: print(f'[{time.strftime("%H:%M:%S")}] reported {len(reqs)} requests')
|
||||
REQEOF
|
||||
|
||||
sleep "$INTERVAL"
|
||||
done
|
||||
|
||||
@@ -140,16 +140,19 @@ const server = http.createServer((req, res) => {
|
||||
return json(200, { ok: true });
|
||||
}
|
||||
|
||||
// POST /api/request - agent reports API call
|
||||
// POST /api/request - agent reports API call (single or batch)
|
||||
if (url.pathname === '/api/request' && method === 'POST') {
|
||||
if (!auth()) return;
|
||||
const b = await readBody();
|
||||
const now = Math.floor(Date.now()/1000);
|
||||
insertReq.run(b.node_id,b.upstream,b.model,b.status||200,
|
||||
b.input_tokens||0,b.output_tokens||0,b.ttft_ms||0,b.total_ms||0,
|
||||
b.success!==false?1:0, b.ts||now);
|
||||
broadcast({ type:'request', request: b });
|
||||
return json(200, { ok: true });
|
||||
const items = Array.isArray(b) ? b : [b];
|
||||
for (const r of items) {
|
||||
insertReq.run(r.node_id,r.upstream,r.model,r.status||200,
|
||||
r.input_tokens||0,r.output_tokens||0,r.ttft_ms||0,r.total_ms||0,
|
||||
r.success!==false?1:0, r.ts||now);
|
||||
}
|
||||
if (items.length <= 5) items.forEach(r => broadcast({ type:'request', request: r }));
|
||||
return json(200, { ok: true, count: items.length });
|
||||
}
|
||||
|
||||
// POST /api/node/rename
|
||||
|
||||
Reference in New Issue
Block a user