Files
oc-monitor/agent/agent.sh

332 lines
13 KiB
Bash
Executable File

#!/bin/bash
# OpenClaw Monitor Agent
set -e
SERVER="" TOKEN="" NAME="" ROLE="worker" INTERVAL=10 DEFAULT_PROV=""
while getopts "s:t:n:r:i:d:" opt; do
case $opt in s)SERVER="$OPTARG";;t)TOKEN="$OPTARG";;n)NAME="$OPTARG";;r)ROLE="$OPTARG";;i)INTERVAL="$OPTARG";;d)DEFAULT_PROV="$OPTARG";;esac
done
[ -z "$SERVER" ] || [ -z "$TOKEN" ] && echo "Usage: $0 -s SERVER_URL -t TOKEN [-n name] [-r role]" && exit 1
NODE_ID=$(hostname | md5sum 2>/dev/null | cut -c1-12 || md5 -qs "$(hostname)" | cut -c1-12)
NAME="${NAME:-$(hostname)}"
OC_CONFIG=""
for p in "$HOME/.openclaw/openclaw.json" "/root/.openclaw/openclaw.json"; do
[ -f "$p" ] && OC_CONFIG="$p" && break
done
OC_VERSION=$(openclaw --version 2>/dev/null | head -1 || echo "unknown")
echo "OC Monitor Agent: node=$NODE_ID name=$NAME server=$SERVER"
TICK=0
while true; do
if [ $((TICK % 6)) -eq 0 ]; then
# Full run: collect everything
python3 - "$OC_CONFIG" "$NODE_ID" "$NAME" "$OC_VERSION" "$ROLE" "$DEFAULT_PROV" << 'PYEOF' > /tmp/.oc-agent-payload.json
import json,subprocess,os,platform,time,sys,glob
def run(cmd):
try: return subprocess.check_output(cmd,shell=True,stderr=subprocess.DEVNULL).decode().strip()
except: return ''
cfg,nid,name,ver,role = sys.argv[1],sys.argv[2],sys.argv[3],sys.argv[4],sys.argv[5]
default_prov = sys.argv[6] if len(sys.argv)>6 else ''
full_run = sys.argv[7]=='1' if len(sys.argv)>7 else True
mac = sys.platform=='darwin'
# Host IP - try multiple interfaces on macOS
if mac:
host = run("ifconfig | grep 'inet ' | grep -v 127 | head -1 | awk '{print $2}'")
else:
host = run("hostname -I | awk '{print $1}'")
if not host: host = 'unknown'
# Providers
providers = []
if cfg and os.path.exists(cfg):
try:
c = json.load(open(cfg))
m = c.get('models',{})
# Auto-detect default: most used provider from recent sessions
dp = default_prov or m.get('default','')
if not dp:
from collections import Counter
prov_count = Counter()
for sd2 in glob.glob(os.path.join(os.path.expanduser('~/.openclaw'),'agents','*','sessions')):
for jf2 in glob.glob(os.path.join(sd2,'*.jsonl')):
if os.path.getmtime(jf2) < time.time()-7*86400: continue
try:
with open(jf2) as f2:
for ln in f2:
if '"provider"' in ln:
try: prov_count[json.loads(ln).get('message',{}).get('provider','')] += 1
except: pass
except: pass
if prov_count: dp = prov_count.most_common(1)[0][0]
default_name = dp.split('/')[0] if '/' in dp else dp
for n,p in m.get('providers',{}).items():
if not isinstance(p,dict): continue
base = p.get('baseUrl','')
key = p.get('apiKey','')
api_type = p.get('api','')
models = [mod.get('id','') for mod in p.get('models',[])]
providers.append({'name':n,'model':' | '.join(models),'_test_model':models[0] if models else '',
'api':api_type,'default':n==default_name,'_base':base,'_key':key})
providers.sort(key=lambda x: (not x.get('default',False), x['name']))
except: pass
# Health check each provider
import urllib.request,urllib.error
# macOS: use system proxy (Surge) if available
_opener = None
if mac:
try:
proxy = urllib.request.ProxyHandler({'http':'http://127.0.0.1:6152','https':'http://127.0.0.1:6152'})
_opener = urllib.request.build_opener(proxy)
except: pass
def _urlopen(req,timeout=10):
if _opener: return _opener.open(req,timeout=timeout)
return urllib.request.urlopen(req,timeout=timeout)
def check_provider(p):
base,key,api = p.get('_base',''),p.get('_key',''),p.get('api','')
if not base or not key: return {'ok':False,'ms':0,'err':'no config'}
try:
t0 = time.time()
if 'anthropic' in api:
url = base.rstrip('/')+'/v1/messages'
data = json.dumps({"model":p.get('_test_model',p['model']),"max_tokens":1,"messages":[{"role":"user","content":"hi"}]}).encode()
req = urllib.request.Request(url,data,{'Content-Type':'application/json','x-api-key':key,'anthropic-version':'2023-06-01'})
else:
url = base.rstrip('/')+'/chat/completions'
data = json.dumps({"model":p.get('_test_model',p['model']),"max_tokens":1,"messages":[{"role":"user","content":"hi"}]}).encode()
req = urllib.request.Request(url,data,{'Content-Type':'application/json','Authorization':'Bearer '+key})
resp = _urlopen(req,timeout=10)
ms = int((time.time()-t0)*1000)
return {'ok':True,'ms':ms,'err':''}
except urllib.error.HTTPError as e:
ms = int((time.time()-t0)*1000)
code = e.code
if code == 429: return {'ok':False,'ms':ms,'err':'限额'}
if code in (402,): return {'ok':False,'ms':ms,'err':'余额不足'}
return {'ok':True,'ms':ms,'err':''} # 400/401/403/422 means API is reachable
except Exception as e:
return {'ok':False,'ms':0,'err':'timeout'}
for p in providers:
r = check_provider(p)
p['status'] = 'ok' if r['ok'] else 'err'
p['ms'] = r['ms']
p['err'] = r['err']
del p['_base'], p['_key']
p.pop('_test_model',None)
# CPU
if mac:
raw = run("ps -A -o %cpu | tail -n +2")
try: cpu = round(sum(float(x) for x in raw.split() if x) / os.cpu_count(), 1)
except: cpu = 0
else:
cpu = float(run("top -bn1 | grep 'Cpu' | awk '{print 100-$8}'") or 0)
# Memory
if mac:
try:
vm = run('vm_stat')
d = {}
for l in vm.split('\n')[1:]:
if ':' not in l: continue
k,v = l.split(':',1)
d[k.strip()] = int(v.strip().rstrip('.'))
used = d.get('Pages active',0) + d.get('Pages wired down',0)
total = used + d.get('Pages free',0) + d.get('Pages inactive',0) + d.get('Pages speculative',0)
mem = round(used/total*100,1) if total else 0
except: mem = 0
else:
mem = float(run("free | awk '/Mem/{printf \"%.1f\",$3/$2*100}'") or 0)
# Disk
disk = float(run("df / | awk 'NR==2{gsub(/%/,\"\",$5);print $5}'") or 0)
# Swap
if mac:
try:
sw = run('sysctl vm.swapusage')
used_sw = float(sw.split('used = ')[1].split('M')[0])
total_sw = float(sw.split('total = ')[1].split('M')[0])
swap = round(used_sw/total_sw*100,1) if total_sw > 0 else 0
except: swap = 0
else:
swap = float(run("free | awk '/Swap/{if($2>0)printf \"%.1f\",$3/$2*100;else print 0}'") or 0)
# Uptime
if mac:
try:
b = run('sysctl -n kern.boottime')
uptime = int(time.time()) - int(b.split('sec = ')[1].split(',')[0])
except: uptime = 0
else:
try: uptime = int(float(open('/proc/uptime').read().split()[0]))
except: uptime = 0
# Gateway/daemon - check by process name
gw = bool(run('pgrep -f "openclaw"'))
daemon = gw # if openclaw is running, both are likely up
# Sessions - count jsonl files in agents/*/sessions/
sessions = 0
oc_dir = os.path.expanduser('~/.openclaw')
for sd in glob.glob(os.path.join(oc_dir,'agents','*','sessions')):
sessions += len(glob.glob(os.path.join(sd,'*.jsonl')))
# Token usage from session jsonl files
tok_today=tok_week=tok_month=0
now_ts = time.time()
day_ago = now_ts - 86400
week_ago = now_ts - 7*86400
month_ago = now_ts - 30*86400
sess_dirs = glob.glob(os.path.join(oc_dir,'agents','*','sessions'))
for sd in sess_dirs:
for jf in glob.glob(os.path.join(sd,'*.jsonl')):
try:
mtime = os.path.getmtime(jf)
if mtime < month_ago: continue
with open(jf) as f:
for line in f:
if '"usage"' not in line: continue
try:
d = json.loads(line)
u = d.get('message',{}).get('usage',{})
if not u: continue
ts = d.get('timestamp',0)
if isinstance(ts,str):
from datetime import datetime
ts = datetime.fromisoformat(ts.replace('Z','+00:00')).timestamp()
total = u.get('input',0) + u.get('output',0) + u.get('cacheRead',0) + u.get('cacheWrite',0)
if ts > day_ago: tok_today += total
if ts > week_ago: tok_week += total
if ts > month_ago: tok_month += total
except: pass
except: pass
print(json.dumps({
'id':nid,'name':name,'host':host,
'os':platform.system()+' '+platform.release()+' '+platform.machine(),
'oc_version':ver,'role':role,'providers':providers,
'cpu':cpu,'mem':mem,'disk':disk,'swap':swap,
'sessions':sessions,'gw_ok':gw,'daemon_ok':daemon,'uptime':uptime,
'tok_today':tok_today,'tok_week':tok_week,'tok_month':tok_month
}))
PYEOF
else
# Light run: only update metrics in cached payload
python3 -c "
import json,subprocess,os,platform
def run(cmd):
try: return subprocess.check_output(cmd,shell=True,stderr=subprocess.DEVNULL).decode().strip()
except: return ''
mac=platform.system()=='Darwin'
if mac:
raw=run('ps -A -o %cpu | tail -n +2')
try: cpu=round(sum(float(x) for x in raw.split() if x)/os.cpu_count(),1)
except: cpu=0
else:
cpu=float(run(\"top -bn1 | grep 'Cpu' | awk '{print 100-\$8}'\") or 0)
ds=run('df -h / | tail -1').split()
disk=int(ds[4].replace('%','')) if len(ds)>4 else 0
if mac:
try:
vm=run('vm_stat');d={}
for l in vm.split(chr(10))[1:]:
if ':' in l: k,v=l.split(':',1);d[k.strip()]=int(v.strip().rstrip('.'))
ps=16384;used=(d.get('Pages active',0)+d.get('Pages wired down',0))*ps;total=int(run('sysctl -n hw.memsize'))
mem=round(used/total*100,1)
except: mem=0
si=run('sysctl vm.swapusage');swap=0
if si:
import re;u=re.search(r'used\s*=\s*([\d.]+)M',si);t=re.search(r'total\s*=\s*([\d.]+)M',si)
if u and t and float(t.group(1))>0: swap=round(float(u.group(1))/float(t.group(1))*100,1)
else:
mi=run('free -b | grep Mem').split();mem=round(int(mi[2])/int(mi[1])*100,1) if len(mi)>2 else 0
si=run('free -b | grep Swap').split();swap=round(int(si[2])/int(si[1])*100,1) if len(si)>2 and int(si[1])>0 else 0
try:
p=json.load(open('/tmp/.oc-agent-payload.json'))
p['cpu']=cpu;p['mem']=mem;p['disk']=disk;p['swap']=swap
json.dump(p,open('/tmp/.oc-agent-payload.json','w'))
except: pass
" 2>/dev/null
fi
if [ -s /tmp/.oc-agent-payload.json ]; then
curl -sS -X POST "$SERVER/api/heartbeat" \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d @/tmp/.oc-agent-payload.json -m 10 >/dev/null 2>&1 \
&& echo "[$(date +%H:%M:%S)] heartbeat ok" \
|| echo "[$(date +%H:%M:%S)] heartbeat failed"
fi
# Report new API requests only on full runs
if [ $((TICK % 6)) -eq 0 ]; then
python3 - "$OC_CONFIG" "$NODE_ID" "$SERVER" "$TOKEN" << 'REQEOF' 2>/dev/null
import json,os,sys,glob,time,urllib.request,urllib.error
cfg,nid,server,token = sys.argv[1],sys.argv[2],sys.argv[3],sys.argv[4]
oc_dir = os.path.expanduser('~/.openclaw')
state_f = '/tmp/.oc-agent-req-state.json'
state = {}
if os.path.exists(state_f):
try: state = json.load(open(state_f))
except: pass
reqs = []
for sd in glob.glob(os.path.join(oc_dir,'agents','*','sessions')):
for jf in glob.glob(os.path.join(sd,'*.jsonl')):
if os.path.getmtime(jf) < time.time()-86400: continue
prev_pos = state.get(jf,0)
try:
sz = os.path.getsize(jf)
if sz <= prev_pos: continue
with open(jf) as f:
f.seek(prev_pos)
for line in f:
if '"usage"' not in line: continue
try:
d = json.loads(line)
m = d.get('message',{})
u = m.get('usage',{})
if not u or not m.get('provider'): continue
ts_raw = d.get('timestamp',0)
if isinstance(ts_raw,str):
from datetime import datetime
ts_val = int(datetime.fromisoformat(ts_raw.replace('Z','+00:00')).timestamp())
else:
ts_val = int(ts_raw)
reqs.append({'node_id':nid,'upstream':m.get('provider',''),
'model':m.get('model',''),'status':200,
'input_tokens':u.get('input',0),'output_tokens':u.get('output',0),
'cache_read':u.get('cacheRead',0),'cache_write':u.get('cacheWrite',0),
'ttft_ms':0,'total_ms':0,'success':True,
'ts':ts_val})
except: pass
state[jf] = f.tell()
except: pass
# Send new requests (batch)
if reqs:
try:
data = json.dumps(reqs).encode()
req = urllib.request.Request(server+'/api/request',data,
{'Content-Type':'application/json','Authorization':'Bearer '+token})
urllib.request.urlopen(req,timeout=15)
except: pass
json.dump(state,open(state_f,'w'))
if reqs: print(f'[{time.strftime("%H:%M:%S")}] reported {len(reqs)} requests')
REQEOF
fi
TICK=$((TICK + 1))
sleep "$INTERVAL"
done