From a78bfdd49c45f67f917791e5645a9b005034580f Mon Sep 17 00:00:00 2001 From: mango Date: Sun, 22 Feb 2026 18:43:07 +0800 Subject: [PATCH] feat: batch request reporting + fix timestamp parsing --- agent/agent.sh | 71 ++++++++++++++++++++++++++++++++++++++++++++++++- server/index.js | 15 ++++++----- 2 files changed, 79 insertions(+), 7 deletions(-) diff --git a/agent/agent.sh b/agent/agent.sh index f704832..4a9b8c4 100755 --- a/agent/agent.sh +++ b/agent/agent.sh @@ -59,6 +59,18 @@ if cfg and os.path.exists(cfg): # Health check each provider import urllib.request,urllib.error +# macOS: use system proxy (Surge) if available +_opener = None +if mac: + try: + proxy = urllib.request.ProxyHandler({'http':'http://127.0.0.1:6152','https':'http://127.0.0.1:6152'}) + _opener = urllib.request.build_opener(proxy) + except: pass + +def _urlopen(req,timeout=10): + if _opener: return _opener.open(req,timeout=timeout) + return urllib.request.urlopen(req,timeout=timeout) + def check_provider(p): base,key,api = p.get('_base',''),p.get('_key',''),p.get('api','') if not base or not key: return {'ok':False,'ms':0,'err':'no config'} @@ -72,7 +84,7 @@ def check_provider(p): url = base.rstrip('/')+'/chat/completions' data = json.dumps({"model":p['model'],"max_tokens":1,"messages":[{"role":"user","content":"hi"}]}).encode() req = urllib.request.Request(url,data,{'Content-Type':'application/json','Authorization':'Bearer '+key}) - resp = urllib.request.urlopen(req,timeout=10) + resp = _urlopen(req,timeout=10) ms = int((time.time()-t0)*1000) return {'ok':True,'ms':ms,'err':''} except urllib.error.HTTPError as e: @@ -197,5 +209,62 @@ PYEOF && echo "[$(date +%H:%M:%S)] heartbeat ok" \ || echo "[$(date +%H:%M:%S)] heartbeat failed" fi + + # Report new API requests from session jsonl files + python3 - "$OC_CONFIG" "$NODE_ID" "$SERVER" "$TOKEN" << 'REQEOF' 2>/dev/null +import json,os,sys,glob,time,urllib.request,urllib.error +cfg,nid,server,token = sys.argv[1],sys.argv[2],sys.argv[3],sys.argv[4] +oc_dir = os.path.expanduser('~/.openclaw') +state_f = '/tmp/.oc-agent-req-state.json' +state = {} +if os.path.exists(state_f): + try: state = json.load(open(state_f)) + except: pass + +reqs = [] +for sd in glob.glob(os.path.join(oc_dir,'agents','*','sessions')): + for jf in glob.glob(os.path.join(sd,'*.jsonl')): + if os.path.getmtime(jf) < time.time()-86400: continue + prev_pos = state.get(jf,0) + try: + sz = os.path.getsize(jf) + if sz <= prev_pos: continue + with open(jf) as f: + f.seek(prev_pos) + for line in f: + if '"usage"' not in line: continue + try: + d = json.loads(line) + m = d.get('message',{}) + u = m.get('usage',{}) + if not u or not m.get('provider'): continue + ts_raw = d.get('timestamp',0) + if isinstance(ts_raw,str): + from datetime import datetime + ts_val = int(datetime.fromisoformat(ts_raw.replace('Z','+00:00')).timestamp()) + else: + ts_val = int(ts_raw) + reqs.append({'node_id':nid,'upstream':m.get('provider',''), + 'model':m.get('model',''),'status':200, + 'input_tokens':u.get('input',0),'output_tokens':u.get('output',0), + 'ttft_ms':0,'total_ms':0,'success':True, + 'ts':ts_val}) + except: pass + state[jf] = f.tell() + except: pass + +# Send new requests (batch) +if reqs: + try: + data = json.dumps(reqs).encode() + req = urllib.request.Request(server+'/api/request',data, + {'Content-Type':'application/json','Authorization':'Bearer '+token}) + urllib.request.urlopen(req,timeout=15) + except: pass + +json.dump(state,open(state_f,'w')) +if reqs: print(f'[{time.strftime("%H:%M:%S")}] reported {len(reqs)} requests') +REQEOF + sleep "$INTERVAL" done diff --git a/server/index.js b/server/index.js index b344c5b..198735c 100644 --- a/server/index.js +++ b/server/index.js @@ -140,16 +140,19 @@ const server = http.createServer((req, res) => { return json(200, { ok: true }); } - // POST /api/request - agent reports API call + // POST /api/request - agent reports API call (single or batch) if (url.pathname === '/api/request' && method === 'POST') { if (!auth()) return; const b = await readBody(); const now = Math.floor(Date.now()/1000); - insertReq.run(b.node_id,b.upstream,b.model,b.status||200, - b.input_tokens||0,b.output_tokens||0,b.ttft_ms||0,b.total_ms||0, - b.success!==false?1:0, b.ts||now); - broadcast({ type:'request', request: b }); - return json(200, { ok: true }); + const items = Array.isArray(b) ? b : [b]; + for (const r of items) { + insertReq.run(r.node_id,r.upstream,r.model,r.status||200, + r.input_tokens||0,r.output_tokens||0,r.ttft_ms||0,r.total_ms||0, + r.success!==false?1:0, r.ts||now); + } + if (items.length <= 5) items.forEach(r => broadcast({ type:'request', request: r })); + return json(200, { ok: true, count: items.length }); } // POST /api/node/rename