#!/bin/bash
#
# OpenClaw Monitor Agent
#
# Reports this host's metrics, OpenClaw provider health and token usage to a
# monitoring server at a fixed interval.
#
# Usage: agent.sh -s SERVER_URL -t TOKEN [-n name] [-r role]
#                 [-i interval_seconds] [-d default_provider]
#   -s  monitoring server base URL (required)
#   -t  bearer token for the server (required)
#   -n  display name (default: hostname)
#   -r  role label (default: worker)
#   -i  seconds between heartbeats (default: 10)
#   -d  provider to flag as default (default: config models.default, else
#       auto-detected from recent sessions)

set -e

SERVER="" TOKEN="" NAME="" ROLE="worker" INTERVAL=10 DEFAULT_PROV=""

# Print usage to stderr and exit non-zero.
usage() {
  echo "Usage: $0 -s SERVER_URL -t TOKEN [-n name] [-r role] [-i interval_sec] [-d default_provider]" >&2
  exit 1
}

while getopts "s:t:n:r:i:d:" opt; do
  case "$opt" in
    s) SERVER="$OPTARG" ;;
    t) TOKEN="$OPTARG" ;;
    n) NAME="$OPTARG" ;;
    r) ROLE="$OPTARG" ;;
    i) INTERVAL="$OPTARG" ;;
    d) DEFAULT_PROV="$OPTARG" ;;
    *) usage ;;   # unknown option or missing argument (getopts already complained)
  esac
done

if [ -z "$SERVER" ] || [ -z "$TOKEN" ]; then
  usage
fi
# Avoid "//api/..." URLs when the server is given with a trailing slash.
SERVER="${SERVER%/}"

# The main loop runs `sleep "$INTERVAL"`; reject junk now rather than letting
# sleep fail (and set -e kill the agent) after the first heartbeat.
interval_re='^[0-9]+([.][0-9]+)?$'
[[ "$INTERVAL" =~ $interval_re ]] || usage

# Stable per-host id: first 12 hex chars of an MD5 of the hostname.
# md5sum is GNU coreutils (Linux); macOS ships md5 instead. The tool is chosen
# explicitly because in `a | b || c` the status tested is b's (here: cut), so
# an `||` fallback placed after the pipeline would never run.
if command -v md5sum >/dev/null 2>&1; then
  NODE_ID=$(hostname | md5sum | cut -c1-12)
elif command -v md5 >/dev/null 2>&1; then
  NODE_ID=$(md5 -qs "$(hostname)" | cut -c1-12)
else
  NODE_ID=""
fi
if [ -z "$NODE_ID" ]; then
  echo "error: cannot compute node id (need md5sum or md5)" >&2
  exit 1
fi

NAME="${NAME:-$(hostname)}"

# First OpenClaw config found (current user, then root); may stay empty.
OC_CONFIG=""
for p in "$HOME/.openclaw/openclaw.json" "/root/.openclaw/openclaw.json"; do
  if [ -f "$p" ]; then
    OC_CONFIG="$p"
    break
  fi
done

# Same pipeline-status caveat as above: test the captured output, not $?.
OC_VERSION=$(openclaw --version 2>/dev/null | head -n 1 || true)
OC_VERSION="${OC_VERSION:-unknown}"

echo "OC Monitor Agent: node=$NODE_ID name=$NAME server=$SERVER"

# ---------------------------------------------------------------------------
# Main loop.
#
# Every tick POSTs the cached payload to $SERVER/api/heartbeat. The first tick
# of every FULL_EVERY-tick cycle rebuilds that payload from scratch (host
# metrics, providers + health, sessions, token totals) and afterwards uploads
# per-request usage records; the other ticks only refresh cpu/mem/disk/swap.
#
# Globals read: SERVER TOKEN NAME ROLE INTERVAL DEFAULT_PROV NODE_ID
#               OC_CONFIG OC_VERSION (set by the option/setup section).
# ---------------------------------------------------------------------------

# State files keep their historical /tmp names (so existing request offsets
# stay valid); set OC_AGENT_STATE_DIR to keep them in a private directory.
STATE_DIR="${OC_AGENT_STATE_DIR:-/tmp}"
PAYLOAD_F="$STATE_DIR/.oc-agent-payload.json"
REQ_STATE_F="$STATE_DIR/.oc-agent-req-state.json"
FULL_EVERY=6
mkdir -p "$STATE_DIR" 2>/dev/null || true

# Wall-clock HH:MM:SS for log lines.
now_hms() { date +%H:%M:%S; }

#######################################
# Host/provider metrics collector (Python 3, stdlib only).
# Arguments:
#   full CONFIG NODE_ID NAME VERSION ROLE DEFAULT_PROV
#       print a complete heartbeat payload as one JSON line on stdout
#   light PAYLOAD_FILE
#       refresh cpu/mem/disk/swap inside an existing payload file
# Returns: python's exit status
#######################################
run_collector() {
  python3 - "$@" <<'PYEOF'
import glob, json, os, platform, re, socket, subprocess, sys, tempfile, time
import urllib.error, urllib.request
from collections import Counter
from datetime import datetime

MAC = sys.platform == 'darwin'
OC_DIR = os.path.expanduser('~/.openclaw')
# Local Surge proxy used for provider health checks on macOS, if listening.
MAC_PROXY = ('127.0.0.1', 6152)


def run(cmd):
    """Run a shell command; return its stripped stdout, or '' on any failure."""
    try:
        return subprocess.check_output(cmd, shell=True, stderr=subprocess.DEVNULL).decode().strip()
    except Exception:
        return ''


def num(text, default=0.0):
    """float(text), or `default` when text is empty or not a number."""
    try:
        return float(text)
    except (TypeError, ValueError):
        return default


def pct(part, whole):
    """part/whole as a percentage rounded to 0.1; 0 when whole is 0."""
    return round(part / whole * 100, 1) if whole else 0


def session_files():
    """All session logs: ~/.openclaw/agents/*/sessions/*.jsonl."""
    return glob.glob(os.path.join(OC_DIR, 'agents', '*', 'sessions', '*.jsonl'))


def to_epoch(ts):
    """Record timestamp (ISO-8601 string or number) -> epoch seconds."""
    if isinstance(ts, str):
        return datetime.fromisoformat(ts.replace('Z', '+00:00')).timestamp()
    ts = float(ts)
    # Epoch seconds stay below 1e12 for ~30,000 years; larger values are ms.
    return ts / 1000.0 if ts > 1e12 else ts


def write_json_atomic(path, obj):
    """Write obj as JSON via temp file + rename so readers never see a partial file."""
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path) or '.', prefix='.oc-agent-')
    try:
        with os.fdopen(fd, 'w') as fh:
            json.dump(obj, fh)
        os.replace(tmp, path)
    except BaseException:
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise


# ---- host metrics (shared by full and light runs) --------------------------

def free_row(label):
    """(total, used) bytes from the `free -b` row whose first column is label."""
    for line in run('free -b').splitlines():
        cols = line.split()
        if len(cols) > 2 and cols[0] == label:
            try:
                return int(cols[1]), int(cols[2])
            except ValueError:
                break
    return 0, 0


def cpu_percent():
    if MAC:
        # Sum of per-process %CPU, normalised by core count.
        raw = run('ps -A -o %cpu | tail -n +2')
        try:
            return round(sum(float(x) for x in raw.split()) / (os.cpu_count() or 1), 1)
        except ValueError:
            return 0
    # First summary line only (per-CPU mode prints several), e.g.
    # "%Cpu(s):  1.2 us,  0.3 sy,  0.0 ni, 98.3 id, ..."; some locales use a
    # decimal comma and older procps prints "98.3%id".
    line = run("top -bn1 | grep -m1 'Cpu'")
    m = re.search(r'(\d+(?:[.,]\d+)?)\s*%?\s*id\b', line)
    if not m:
        return 0
    return round(100 - num(m.group(1).replace(',', '.'), 100), 1)


def mem_percent():
    if MAC:
        vm = run('vm_stat')
        pages = {}
        for line in vm.splitlines()[1:]:
            label, sep, value = line.partition(':')
            if not sep:
                continue
            try:
                pages[label.strip()] = int(value.strip().rstrip('.'))
            except ValueError:
                pass
        # vm_stat counts in its own page size (16 KiB on Apple Silicon, 4 KiB
        # on Intel), announced in its header line.
        m = re.search(r'page size of (\d+) bytes', vm)
        page_size = int(m.group(1)) if m else int(num(run('sysctl -n hw.pagesize'), 4096))
        used = (pages.get('Pages active', 0) + pages.get('Pages wired down', 0)) * page_size
        return pct(used, int(num(run('sysctl -n hw.memsize'))))
    total, used = free_row('Mem:')
    return pct(used, total)


def disk_percent():
    # -P forces one line per filesystem (no wrapping on long device names).
    lines = run('df -P /').splitlines()
    cols = lines[-1].split() if len(lines) > 1 else []
    return num(cols[4].rstrip('%')) if len(cols) > 4 else 0


def swap_percent():
    if MAC:
        out = run('sysctl vm.swapusage')
        used = re.search(r'used\s*=\s*([\d.]+)M', out)
        total = re.search(r'total\s*=\s*([\d.]+)M', out)
        if used and total:
            return pct(float(used.group(1)), float(total.group(1)))
        return 0
    total, used = free_row('Swap:')
    return pct(used, total)


def uptime_seconds():
    try:
        if MAC:
            # Parsed as "... sec = <boot epoch>, ..." from kern.boottime.
            boot = run('sysctl -n kern.boottime')
            return int(time.time()) - int(boot.split('sec = ')[1].split(',')[0])
        with open('/proc/uptime') as fh:
            return int(float(fh.read().split()[0]))
    except Exception:
        return 0


def host_ip():
    if MAC:
        # First IPv4 address that is not in 127.0.0.0/8.
        ip = run("ifconfig | awk '/inet / && $2 !~ /^127\\./ {print $2; exit}'")
    else:
        ip = run("hostname -I | awk '{print $1}'")
    return ip or 'unknown'


def openclaw_running():
    """True if a process other than this collector/agent has 'openclaw' in its command line."""
    try:
        out = subprocess.check_output(['pgrep', '-f', 'openclaw'], stderr=subprocess.DEVNULL).decode()
    except Exception:
        return False  # pgrep exits 1 when nothing matches
    # Our own argv contains the config path (~/.openclaw/openclaw.json) and the
    # agent script (our parent) may be named after openclaw: exclude both.
    mine = {os.getpid(), os.getppid()}
    return any(int(p) not in mine for p in out.split() if p.isdigit())


# ---- providers --------------------------------------------------------------

def detect_default_provider(models_cfg, override):
    """-d value, else config models.default, else the provider seen most often
    in session logs modified within the last 7 days ('' if none)."""
    dp = override or models_cfg.get('default', '')
    if dp:
        return dp
    counts = Counter()
    cutoff = time.time() - 7 * 86400
    for path in session_files():
        try:
            if os.path.getmtime(path) < cutoff:
                continue
            with open(path, encoding='utf-8', errors='replace') as fh:
                for line in fh:
                    if '"provider"' not in line:
                        continue
                    try:
                        prov = json.loads(line).get('message', {}).get('provider', '')
                    except Exception:
                        continue
                    if isinstance(prov, str) and prov:
                        counts[prov] += 1
        except OSError:
            continue
    return counts.most_common(1)[0][0] if counts else ''


def load_providers(cfg, default_prov):
    """Provider entries from the OpenClaw config, default provider first.
    Keys starting with '_' hold health-check details and are stripped before sending."""
    if not cfg or not os.path.exists(cfg):
        return []
    try:
        with open(cfg) as fh:
            models_cfg = json.load(fh).get('models', {})
        dp = detect_default_provider(models_cfg, default_prov)
        default_name = dp.split('/')[0]  # "provider/model" -> "provider"
        provider_cfgs = list(models_cfg.get('providers', {}).items())
    except Exception:
        return []
    providers = []
    for pname, pcfg in provider_cfgs:
        if not isinstance(pcfg, dict):
            continue
        try:
            model_ids = [m.get('id', '') for m in pcfg.get('models', [])]
            providers.append({
                'name': pname,
                'model': ' | '.join(model_ids),
                '_test_model': model_ids[0] if model_ids else '',
                'api': pcfg.get('api', ''),
                'default': pname == default_name,
                '_base': pcfg.get('baseUrl', ''),
                '_key': pcfg.get('apiKey', ''),
            })
        except Exception:
            continue  # malformed provider entry: skip it, keep the others
    providers.sort(key=lambda p: (not p['default'], p['name']))
    return providers


def make_opener():
    """urllib opener for health checks: the local Surge proxy on macOS when it
    accepts connections, otherwise None (urllib's default handling)."""
    if not MAC:
        return None
    try:
        socket.create_connection(MAC_PROXY, timeout=0.5).close()
    except OSError:
        return None
    url = 'http://%s:%d' % MAC_PROXY
    return urllib.request.build_opener(urllib.request.ProxyHandler({'http': url, 'https': url}))


def check_provider(p, opener):
    """Probe a provider with a 1-token request -> {'ok', 'ms', 'err'}.
    Any HTTP answer except 429/402 proves the endpoint is reachable, so
    e.g. 400/401/403/422 still count as ok."""
    if not p.get('_base') or not p.get('_key'):
        return {'ok': False, 'ms': 0, 'err': 'no config'}
    t0 = time.time()

    def elapsed():
        return int((time.time() - t0) * 1000)

    try:
        base, key = p['_base'].rstrip('/'), p['_key']
        body = json.dumps({"model": p.get('_test_model', p['model']), "max_tokens": 1,
                           "messages": [{"role": "user", "content": "hi"}]}).encode()
        if 'anthropic' in p.get('api', ''):
            url = base + '/v1/messages'
            headers = {'Content-Type': 'application/json', 'x-api-key': key,
                       'anthropic-version': '2023-06-01'}
        else:
            url = base + '/chat/completions'
            headers = {'Content-Type': 'application/json', 'Authorization': 'Bearer ' + key}
        req = urllib.request.Request(url, body, headers)
        with (opener.open if opener else urllib.request.urlopen)(req, timeout=10):
            pass
        return {'ok': True, 'ms': elapsed(), 'err': ''}
    except urllib.error.HTTPError as e:
        e.close()
        if e.code == 429:
            return {'ok': False, 'ms': elapsed(), 'err': '限额'}  # rate limited
        if e.code == 402:
            return {'ok': False, 'ms': elapsed(), 'err': '余额不足'}  # insufficient balance
        return {'ok': True, 'ms': elapsed(), 'err': ''}
    except Exception:
        return {'ok': False, 'ms': 0, 'err': 'timeout'}


# ---- token usage -------------------------------------------------------------

def token_usage():
    """[last 24h, last 7d, last 30d] sums of input+output+cacheRead+cacheWrite
    over session-log records that carry message.usage."""
    now = time.time()
    cutoffs = (now - 86400, now - 7 * 86400, now - 30 * 86400)
    totals = [0, 0, 0]
    for path in session_files():
        try:
            if os.path.getmtime(path) < cutoffs[2]:
                continue
            with open(path, encoding='utf-8', errors='replace') as fh:
                for line in fh:
                    if '"usage"' not in line:
                        continue
                    try:
                        rec = json.loads(line)
                        usage = rec.get('message', {}).get('usage', {})
                        if not usage:
                            continue
                        ts = to_epoch(rec.get('timestamp', 0))
                        n = sum(usage.get(k, 0) for k in ('input', 'output', 'cacheRead', 'cacheWrite'))
                    except Exception:
                        continue
                    for i, cutoff in enumerate(cutoffs):
                        if ts > cutoff:
                            totals[i] += n
        except OSError:
            continue
    return totals


# ---- entry points --------------------------------------------------------------

def collect_full(args):
    cfg, nid, name, ver, role = args[:5]
    default_prov = args[5] if len(args) > 5 else ''
    providers = load_providers(cfg, default_prov)
    opener = make_opener()
    for p in providers:
        r = check_provider(p, opener)
        p['status'] = 'ok' if r['ok'] else 'err'
        p['ms'] = r['ms']
        p['err'] = r['err']
        for private in ('_base', '_key', '_test_model'):  # never send credentials
            p.pop(private, None)
    gw = openclaw_running()
    tok_today, tok_week, tok_month = token_usage()
    print(json.dumps({
        'id': nid, 'name': name, 'host': host_ip(),
        'os': platform.system() + ' ' + platform.release() + ' ' + platform.machine(),
        'oc_version': ver, 'role': role, 'providers': providers,
        'cpu': cpu_percent(), 'mem': mem_percent(), 'disk': disk_percent(), 'swap': swap_percent(),
        # No separate daemon probe: daemon is reported up whenever openclaw runs.
        'sessions': len(session_files()), 'gw_ok': gw, 'daemon_ok': gw, 'uptime': uptime_seconds(),
        'tok_today': tok_today, 'tok_week': tok_week, 'tok_month': tok_month,
    }))


def collect_light(payload_path):
    try:
        with open(payload_path) as fh:
            payload = json.load(fh)
    except Exception:
        return  # nothing usable cached yet; the next full run recreates it
    if not isinstance(payload, dict):
        return
    payload.update(cpu=cpu_percent(), mem=mem_percent(), disk=disk_percent(), swap=swap_percent())
    try:
        write_json_atomic(payload_path, payload)
    except Exception:
        pass


mode = sys.argv[1] if len(sys.argv) > 1 else ''
if mode == 'full':
    collect_full(sys.argv[2:])
elif mode == 'light':
    collect_light(sys.argv[2])
else:
    sys.exit('usage: collector full|light ...')
PYEOF
}

#######################################
# Rebuild the heartbeat payload. Output goes to a temp file that replaces
# PAYLOAD_F only on success, so a failed run keeps the previous payload
# instead of leaving an empty file.
#######################################
collect_full() {
  local tmp
  tmp=$(mktemp "${PAYLOAD_F}.XXXXXX") || return 0
  if run_collector full "$OC_CONFIG" "$NODE_ID" "$NAME" "$OC_VERSION" "$ROLE" "$DEFAULT_PROV" >"$tmp"; then
    mv -f -- "$tmp" "$PAYLOAD_F" || rm -f -- "$tmp"
  else
    rm -f -- "$tmp"
    echo "[$(now_hms)] full collection failed; keeping previous payload" >&2
  fi
}

# Refresh cpu/mem/disk/swap in the cached payload; best effort, never fatal.
collect_light() {
  run_collector light "$PAYLOAD_F" 2>/dev/null || true
}

#######################################
# POST the cached payload (if any) to $SERVER/api/heartbeat.
# --fail makes HTTP 4xx/5xx count as failures. NOTE(review): the token is
# still visible in curl's argv; `-H @file` would avoid that but needs
# curl >= 7.55.
#######################################
send_heartbeat() {
  [ -s "$PAYLOAD_F" ] || return 0
  if curl -fsS -X POST "$SERVER/api/heartbeat" \
      -H "Authorization: Bearer $TOKEN" \
      -H "Content-Type: application/json" \
      --data-binary @"$PAYLOAD_F" -m 10 >/dev/null 2>&1; then
    echo "[$(now_hms)] heartbeat ok"
  else
    echo "[$(now_hms)] heartbeat failed"
  fi
}

#######################################
# Upload usage records appended to session logs (modified in the last 24h)
# since the previous run. Per-file byte offsets live in REQ_STATE_F and are
# advanced only after a successful upload, and only past complete lines.
# SERVER/TOKEN are passed via the environment so the token stays out of `ps`.
#######################################
report_requests() {
  OC_AGENT_SERVER="$SERVER" OC_AGENT_TOKEN="$TOKEN" \
    python3 - "$NODE_ID" "$REQ_STATE_F" <<'REQEOF'
import glob, json, os, sys, tempfile, time, urllib.request
from datetime import datetime

nid, state_f = sys.argv[1], sys.argv[2]
server = os.environ.get('OC_AGENT_SERVER', '')
token = os.environ.get('OC_AGENT_TOKEN', '')
oc_dir = os.path.expanduser('~/.openclaw')


def stamp():
    return time.strftime("%H:%M:%S")


def epoch(raw):
    """Record timestamp (ISO-8601 string or number) -> int, sent to the server as-is."""
    if isinstance(raw, str):
        return int(datetime.fromisoformat(raw.replace('Z', '+00:00')).timestamp())
    return int(raw)


def to_request(raw_line):
    """Request record for one session-log line, or None without usage/provider."""
    try:
        d = json.loads(raw_line)
        m = d.get('message', {})
        u = m.get('usage', {})
        if not u or not m.get('provider'):
            return None
        return {'node_id': nid, 'upstream': m.get('provider', ''),
                'model': m.get('model', ''), 'status': 200,
                'input_tokens': u.get('input', 0), 'output_tokens': u.get('output', 0),
                'cache_read': u.get('cacheRead', 0), 'cache_write': u.get('cacheWrite', 0),
                'ttft_ms': 0, 'total_ms': 0, 'success': True,
                'ts': epoch(d.get('timestamp', 0))}
    except Exception:
        return None


try:
    with open(state_f) as fh:
        state = json.load(fh)
    if not isinstance(state, dict):
        state = {}
except Exception:
    state = {}

offsets = dict(state)
reqs = []
cutoff = time.time() - 86400
for path in glob.glob(os.path.join(oc_dir, 'agents', '*', 'sessions', '*.jsonl')):
    try:
        if os.path.getmtime(path) < cutoff:
            continue
        pos = int(state.get(path, 0))
        if os.path.getsize(path) <= pos:
            continue
        with open(path, 'rb') as fh:
            fh.seek(pos)
            for raw_line in fh:
                # No trailing newline = record still being written; stop before
                # it so it is read whole next time instead of being skipped.
                if not raw_line.endswith(b'\n'):
                    break
                pos += len(raw_line)
                if b'"usage"' in raw_line:
                    rec = to_request(raw_line)
                    if rec:
                        reqs.append(rec)
        offsets[path] = pos
    except Exception:
        continue

sent = True
if reqs:
    try:
        req = urllib.request.Request(server + '/api/request', json.dumps(reqs).encode(),
                                     {'Content-Type': 'application/json', 'Authorization': 'Bearer ' + token})
        with urllib.request.urlopen(req, timeout=15):
            pass
    except Exception as e:
        sent = False
        print(f'[{stamp()}] request report failed: {e}')

# Persist offsets only after a successful upload so a failed batch is retried
# next cycle. Entries for deleted session files are pruned.
if sent:
    offsets = {k: v for k, v in offsets.items() if os.path.exists(k)}
    tmp = None
    try:
        fd, tmp = tempfile.mkstemp(dir=os.path.dirname(state_f) or '.', prefix='.oc-agent-req-state.')
        with os.fdopen(fd, 'w') as fh:
            json.dump(offsets, fh)
        os.replace(tmp, state_f)
    except Exception as e:
        if tmp and os.path.exists(tmp):
            os.unlink(tmp)
        print(f'[{stamp()}] cannot save request state: {e}')
    if reqs:
        print(f'[{stamp()}] reported {len(reqs)} requests')
REQEOF
}

TICK=0
while true; do
  if (( TICK == 0 )); then
    collect_full
  else
    collect_light
  fi

  send_heartbeat

  # Request records are uploaded on full runs only.
  if (( TICK == 0 )); then
    report_requests || echo "[$(now_hms)] request reporter exited with an error"
  fi

  TICK=$(( (TICK + 1) % FULL_EVERY ))
  sleep "$INTERVAL"
done