From 5f0f8bb685aa1eeef879b730e54a670b1d98e85c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Eason=20=28=E9=99=88=E5=8C=BB=E7=94=9F=29?= Date: Sun, 22 Feb 2026 19:04:29 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20mem0=20=E7=BA=AF=E5=BC=82=E6=AD=A5?= =?UTF-8?q?=E6=9E=B6=E6=9E=84=E9=87=8D=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 重构内容: - 移除 threading,使用纯 asyncio.create_task - 使用 asyncio.to_thread 隔离同步阻塞操作 - 实现 Pre-Hook 检索注入 + Post-Hook 异步写入 - 添加对话拦截器集成 - 支持元数据维度隔离 (user_id + agent_id) 架构特点: ✅ 纯异步后台任务(无 threading) ✅ 阻塞操作隔离(asyncio.to_thread) ✅ 批量写入队列(batch_size=10, interval=60s) ✅ 缓存支持(TTL=300s, max_size=1000) ✅ 超时控制(检索 2s, 写入 5s) ✅ 优雅降级(失败不影响对话) 测试日志: - mem0 初始化成功 - Pre-Hook 检索正常 - Post-Hook 异步写入正常 - 队列处理正常 待优化: - DashScope Embedding API 配置(404 错误) - agent_id 参数传递(mem0 API 兼容性问题) --- .../__pycache__/mem0_client.cpython-312.pyc | Bin 4755 -> 21134 bytes .../openclaw_interceptor.cpython-312.pyc | Bin 0 -> 3320 bytes skills/mem0-integration/mem0_client.py | 378 ++++++++++++++++-- .../mem0-integration/openclaw_interceptor.py | 51 +++ skills/mem0-integration/test_integration.py | 77 ++++ 5 files changed, 464 insertions(+), 42 deletions(-) create mode 100644 skills/mem0-integration/__pycache__/openclaw_interceptor.cpython-312.pyc create mode 100644 skills/mem0-integration/openclaw_interceptor.py create mode 100644 skills/mem0-integration/test_integration.py diff --git a/skills/mem0-integration/__pycache__/mem0_client.cpython-312.pyc b/skills/mem0-integration/__pycache__/mem0_client.cpython-312.pyc index eebe998f179b80dafe00f820467082ff97cca06f..9ff66501464b6748c12be07dc20dd1efdd19235b 100644 GIT binary patch literal 21134 zcmdsfdw3LOnr~HCRo~OyN$1|VbS@AYa^Vs}0wQ)uq6kJuSkXAz)215eq;sLV10;54 z7;h7l5f~i_4oaBI5=Tel8CNr2R)XlTXU=)Hi^S=+Hhb2cLjwFY!7$IL>$A^!->5b`VVPoMhcEU-u*NGf7Vmf|HcdD;NqG2UYeqgQ#{p1 z@ieaq&}}q1YuYsA%(OA&%(k&`W&&K7woTimYtwb<+w@(AHUo|Q*nqLi)Mg@iE@195 zx0xZY_0b*q%7lqCue%8@G!)bpBubSWujdUfYvikM=x@aS2AtOvuy)znY-C<@AiXQ2 zEu+icX79>u%cOAzremctZsP3Adl{Y?kG6abbt}bN9;0}xGH08El-ZyxT`ensu?k+6 zJ5gXd1tS$dvn0M-&y6n{OqYqZ=bo0|6iMV_mSD({A%{a zH)mgb%y^gJtGcbHr_(w6i-VWn`1$N_M`s@Ym2-vjuAX2>u8^D0JaGalj92ldTV1Tg zZu19265Hf&4@vqvdqe)7Zf`(h+}-^yTH<)$-aa2>^t?CZ3;DZz1FQCgLcPKDH8s39 zxF^`&)9b7D2K@bf-R;5Z_MWa9`0MqC{0{|uRb4&2uV&vW$$IBq&8_Y&9`{{aJdN({ z%~JLq&3CNyG`Y9mwtX|yk&3O&_YITq5C%8H79lov}q2I+wW;wUHr+b%wx0HU5uRYWwY==KzyvVRE&?RU4 z3gHF(>G_dAj~{X3`}X>sGq1f1uV&_#uU~oNSATx@`F4Co%AwN50Vp|6`6!?V3TXl8 zz{YEQ>@E$@JjS(gJnPf?bUytqO&+v_QtmOf%>Zkj<#j%zPfJQ!UJs=vQfh#{EN_Hs zGjDo~YD?qIYC8>|2IUr9iEY+?t1Dfyxr6=P?eb%Or_Trf959W#7_PiJHuL@y@;AOZ zd}?OsxTNp$9`FSH1HSe+^^iwmAjQA=7a=)Lh2krpqQdkmw6b2qlt(QODQ|%4(2@}r zBnk~c4_2w*HE^Z&;wiXNdy*@i+LD1fy?VvMl|i}cr(9eseq?xzfUmpdD)O$YNa-%4 zfFnp8EYUunr1RpN@%O-r?rHCo*Z`(Fp|88!-@RKhd-nGToj$=6@&-F4eY-ahc*qM4 zj1PH3?R&`kku19ceZf5*e|N|y?DGa(8i@`10=t9^m?elW&*}W~mKvd_Csb1n>%O}k zNU{cp4fcB5eKo;Oe;^R7!NpXCW9$}y4tlytfv25Nb9HaOr1!uK{*cErkhl0-Rx4Fb zobeV&4pP%u`6J7wvQ`YShmF&jj^o>oZF{B_E)21p0yvpt1@0f=%D2erld|eAZKfXBf`412_Z%AGMR7dhc!2#e%EQwUL9qLDyTVvV@qX@T$ zQ;yc5e)>fFFbEhV4mcCifir{GA$2Kq5V4i$fjMR2b)+pUPp;$HldN+8G}W&QS(LUN z=}Inf=DFmyWmE`1KRJN0mkw%@dS@v$i8HSaYZkJ6VE4K~&G$9^I{BQ$?)7~*!t7h` z%se@A=^aQ<&kT*JWa|qCy}NxuN!J&Ib?fIPJzRE&Am?I*rO;8Rz+@RDk^y;hFy!s( z?JI|Cnbpf9&Aj#BX5Tse=XVcZ8vpssyN}KuKO;{?=9Zhtcc3!9R+mX)fYyW@96uAs z*Lr(l!SezVsl;^m?3WDwU=PTqE^kQE26}ey_6d@1zgHl{W@tXp?jxc^;(UV8Be)pB ziDTitNwhDBOV8Iwi3}+5_6W1w297@vEs6kEhm1UTuzs`&J>(3_(f5) zq(Llcm@L_F&A^pq{DtDu?bj_-dd_jnF-xSl`D0tl=hN2Q&nOyZG-n*yc4%9~v2rYZ zjEUscM$KzQ^VI0HIlm1^a4SUv7Y1U1g2hKI^(c_W!|=|Y(;R1>;5cZzn}UUiZ;>2fRl^9= zgzWBuky4PP&=l6BB)ycaJ5<6`=1GuSf<$g`7ir&c7xZDck9O1};h&;QJqp(gpK&|2 zjyk*_{&Z#3DvH#@Gd;)PJ6K0?RFK}Up{e`eZ2(czvIg}V2DB(mACl~qr{9`6^4#p9 z-(CID5qZV8x;O##PGLDFWO1*6oTTgR=?(aHg_KoFJ{d;@!@`wn*zF5>Nclki*Ve0A z2P!oc#4me=`urgEsXga-!?A|HQmi5G8YZ#K?4joAjI5)+XO<5ST+DEd@~?ILx+7-G zI&MB@9^P@P?L=F&utqGbnJip2Wm|onq3qcU8o%7}sXh0v1{jNs`WcS;`5Z|}L1M12 zf?MG}9!YO3r=Ou4P1KEimVst{pR2vpe;Mxn1R;Tv;#SV%YE z@5*9?@l?4IvMKk_VOMgAGpHuOrFkaIK+3+5%<(ywwl5c2&K!Mh=E>2x&V2OgnImKJ zBHGbc3AKMZ`M5J)_tvj2y?65R+wWf}hK z0aZ}KAjoSKa9YXKF8Dy#mh}`>YYFu9^a^Md3Dww$^>^>;0V&|bl7*uL@#6q*jWbs> z2n8V&SAGTxzqV?Vt8amx!8S->)#f;ea*+#a1S%EfvQklrs)l2_pe$NYBNo(*vC)-o zaix25<>twPCZdK+>n%r2hfI;|#lNP zs|lXW|Nq+)57J3*Sk=K2PtB`BZ}In`@q4O#CecIC9HkFi`sscegvurO>w>>r(D2~R z%0~w(JBtp``f}kbWh=!UsFW@`cqZ{Z?*m~JQsE8geaj$|)ElXF(ZRFGhmv7}Z+UW+ zUy&z=b^aWy2t^c#JUz&KpE>-fd`{?_L^x`Ra7^Z1GynMO%TK@7dKIaXSd+<>3C0Gp z^v$7fsK5H`i7T(Y1~&6mBxjdZVo@KJnH-U`C=+E)hfGdb4~b;)^a?%gpykRUmGD4` z3kH0?UdgiA+uiO91blom>W_r$325{Q2;2y(Ac?a-$r$hkLmpzf3G1+OH6#nUps)do zQZPToCPePK1$(WAcR-}}rL4T;_Z_?Mnfr%~pIS1G1fB}aX)@C)Vj#@d|73lvbm^Ju z)74{{Zxz2@JXz|F7PyZ#f09!$om(8uT_NVK7;TDHZW1duo#iGgTc&bvy(B9E&ukdU z7ws!X`DnFUtae|tyJIlXp7r7mlO1U&<~#-gT^N#66k4G-_KKu`Qh|q^$NMfu{p=TB@*- z{(R0%m9)UgNTpYNamk(^wYx;S>oZD|mOX9DK5jZ@iWD?V+17tz%bKyN zbW(7w3!Vrh4|glf`_w89JO_t--XG#*#Xc){}JovX)Bb!&0pMuw0M%%9Sk@ z%tr-uOR@H&LJsq#9MpbP!9ndul?>#`_g|o7Bj7-!O96aYS-OC&3x)F00WxRt!P5z- zBdkIppeJZvNTTvE-9$YALL6j4YKfA}otR|lK~skd*>AgtWP6nXE>!?rN|jK=4rP*_ zLX|{`XW_ZUJO%gygM6W%kiMe&*G~vqMK^OP46)@nQvLb7^Ir2C&A5`gY5@IZ} zHw$?T4~>G~hlCLM9IH*T@ao|()k(c=*MLe}YtVt~> zJ5Axp1KzZBhA8vo+MaPsPD-_vtI*?JRcUhiV&t!$Y9|!DBuoPDTp}{ zY3yH60lW0>@YSC^KJ)l*uO9zROY1iXD%8+P3ztroPN-8$+IAvshypG&K}ka1yhIZv zJBY$Du2QqXP*3kb*4LIA7I#Cl;BH7{+b<5$nXFM$!9`O+%xsIA3q^Bb)Lbr_%csms z=QK=O7A&u5c7>Q-5w%rBY!%nFl)*}r=*(3g>sQa|sPtuDOefejotE{9EfZH@)>lE4 zq>og)>*&|5?m}vUqTLR5f@U$#WkWG6D9jhy-D{YMl^QHrO=Ery19<>euOdyjYzRws zd%Judk7V?C0J-lAVBYNU?CtXg;x#&thwo{JVg~FPp%gj_xbh?>7!m~ha>5!+kf{np z;%~$p3SR-qQ`iQ{OBAsI2=pT${6-Qa_+S8%$EZ)(CD<S2ALsr{FY~k;c^xv%}j* z%7*VfVVS4ka;`|r<&BojQE(b79}B&{d>*rNHXYYZkJ;y-UpIZW9s76FpJn866?1iZ zu5Pq^H1rZQh3tH$fm3KVhXyr4>-DE*4TRfi)`E))9?kNMtKo>B^;wdDLxnIxIK=uUvukG=)2-P^v$2h>22`*;hxM>d}2hO&FXrIBrN95C)O zkXJeq%YZ%cwpFNq5L>oDa*&E;7hKHVFv3Qi4WhGQ(zyYQZj*}dJBrc3&W&Z|K(R5V zH%Ik(qCOAoaguk6dgsUbQh80+!gvb=1VVY_OZ_j%B)fZ3*+~G)X%tv4p$jEhFq=q;1X}pEE^0t#K0D#?p@@V-Qg<~C2~I!-p8+Hzt`WNH2Q(#Zn%WNu@uY~?i_n_~e3t8huoQTofqGh0q? ziB#Ww)-+kZEs}r7HIB)&fZbK#jOFFWN|(fnO2OxlZ<(`TQBeuF>0+hjSZdN1Smtb4 znv)MC55s2Tuy9?tG*(&$*LiqdUKJ~^f>HA<*EZ8jY{ofiiNW}FPz?bMLhk3`+XA`6 zrF&6pzmaflnsU zJC$?Lw%9pHv~=|y6X*tAh2R%Kmtn$0bEWs8({g6$1+>EsKS#uKYo86ukU2r{!tBXk z&%Ao*$}b)l!cZvqFxiC(YNI;40PL{$QujiH-JPf})79BJg~h_mS8M5OEMZ zL26tysUvCv!rwTrQ@tTD@n)VpMYQfey>NI@rRrI9nFQ3XWc9iRbNEdKd^9q+fS9Rp zKPHR7p11=M8KrFf;qq2cZs9?=7|2Pc?j%8t1ReY%BnPQWru3nk(WlpXs5RoKnle|% z%o%fRreVo+R>^c>b*ym7r_S;-rqia;+eGKuv0b8bBe)M26|AUvZNp0&#tw)pZi%?> z7FTSKl~n*)7i3@O^hFuhGpMX07%1i_KDFV*hLJweQ8l_pbgaA1FPu~Lt7)ct3Nick)4kkxF^NFtQ*@q)*i`QKV{wkcQe!^8xe@U z7?yk)L`3byMoVKAb*{>=xtKXuSG+l&`5>Qz`~oEaHFUxix(QbR8iInwhwOxxUGPD= zp8+0#eaMjtQdd+gpe=PB-*9BYA6`)*%tcC`3UVlOQYjTbm_bp~6Ae-EOuACC@h2iY z<%TVEgm36yjz2F@I0F=pi<2C2AQ)+m94Q@3h6H~AAed3?RF|<2y^0H*N+b|#Cdqvy z`~aE|5!@>5g)C4U;@$~)4H6g)r9?qrBV5kB_sq(c)n+5}&N1l&=8 zXFh!B;o*&<)ivt6Xssn=YchCf)h?d2uNZ2YF0P5?6`pdOaEv@4=B*jqCg!=XGa9D_ zsjL9t(6scZxj-}*M9nVI?1Hw@yj5b}Duq@DMaSB)faur^p#!HLKJoDAMzOddvi^3l zcq^FUY0LjYv1yjiuNx^Fz@#)zA1Beg2mWY&5Q73d^p`>WXg_stY_wD7>_r#}u!e?| z#3>2=iJ%Rk8``xeLoKMKKMH^IBd`Q-6cUh3q=Fe1wJLU?cl(^ zQFg-f355v+G?gvL>%-haFG^U;8<3>oE!>DP7_rTu4j7%PV05TNuig%hkwPH=54Q$9 z*(_MT8N>=rbWn5guSJxTm*ej=tmDlCCQwsUJ(o!%%Ql2I_9KAmHY<1-QAMgmv@+V^tY= z2lAao2Qb^B1v3KHzWCtN!#Ws0BYFJYRJ-Or_~M}gr8SIHK4?g~_f#XMrx+Yvd3b3z>>uw!OlG0dUF-L8l5(cfKp?FD4TC*HpD=zDS;u7GH>%+tT5*Ao*) zaAQ#rk^$3Z#1))-{Gs(uh*k6j0h0&{Xq6G>APA6qiI#M68xQ#D5v;=jh#sa)9uNAJ zp>_o2(EvbDUXBOt!w<8l?Ugql2NB?0_9fF5bfp|j1?a7^bRcRO+FMu-TLtRCM1Uwa=&hoH|ZwC@2&Uwgpk?e6RK5Ew;P?UqAN$-F~0US+vK6vCBI z4)Vx}10Tku0+VH!AitChWDHNIuODGsczZl~Ul8VsxQ*Wzl+{D>0x%4M90Y@rti6KI zvj<|N<2N>tpCo`5)lI`(!QX+NBm=o|`LjW^R*V`T-fWDI*4--B-5RO8GiJ{k+VW{m z(NX{K-jUo9<7mlf^XR%r=9(cx%w#>X;n0So`$bdnh~c7X`Dp)m)BCso?sl=JB?cbG zwqtE01~F^-P|Gw-(0{1^sfUN}8EHB3plAgr;cJ~Qb&gkv)s1J@iq%^#TDQcVa>Eag zHjJ)_6xL1I*2QuPhy7yCvWRWjr!e6C$L=3-h*?#m0WoXCka3R9Fl0{`mPZS##KJ0w zk&D{OMO*pE-;LfncJFxK#ZCST>_40S$rRb)5pVlm#Jfkl%|B`Dh~<`!>=1KZps*HM z{z@^qmZ96gh_z)B8N5!kLRj5JYxV2(vD%x?^S}2`_&?|rm)(7o#SpyV2GO>JOjRb@ z%BE~f&`W~4ZT{*v){Gta?Z&a1v-y$SZBynuVs^*SopV_*{ukGCDO*m|S_Wf-{}FB) zI-!nUSUc7>w(Tqvaex4ChFAUM78075++^*TcWisa(J*CR4-J02h3LiEUj^@hPxwe< z-K`e-T!WiwuAx4-iEgf9KiKH5fr}3-X{hs$wvwCTTgc0otf4B6$#bkishZ*V_4KLx^#3+o2p)qxvzHHcqB z2u(k7gbj({EyZ#d(!+*S!im=@f~Fg7W@FeGqz}8o#;|@L4Y}&DA*>IReIbWySrl-Y z$}EX9uSW$yRuu+KVSR@x#u9B5A=pG6F}H(a)vkFUAMV;5HmAHRBX3gfJZ$O&SIz;P3K>2=0JJ&Cpx2#FGb9aR;Fv+6qrYLRwxLe;#73Q?`^eeqNT@J!E*; z;Uf5ULl%y?O$*Fv)T2;enqLYD3El;9vG_>F(gB-hT7?&(iGUv%HZ?F{Yr%;=kOWnJ zslO@U?Rtp!-aJs5B%c-u)#{sww<35mWaQRPm_3LZ>OpG6eWv+z^JvA`s>$-T2W5xb z4ghr+l$xMs?DGjhpDRrUT8PnSP~Wtqm8G^EXrl^w$>mvbC04v5TD)2;UOib{ z`$zkO!zJVN``qui@!e;Gll5CKe1Ee3fuFb|_6LXbki_)1BWZ`y#Qe46p`o-XebcAb zoMG*7R>WEy(HGA(z#I4ieAk&IY62d5aR^b=w&|j66Nb*iEBSKH43GE4b@L6|i)#C? zKFg#sJ@gmB=i$peVsEk2&#i2#p(eC+Qx!X*V=-?sK=DLYL$ebC@#*Fw_MDZ)yuAj> zKVVo)^(!!4liy-t&s$hbGaOKIzKVts&es^49nAUq+~#cNLz@=LKg{N!GfJA89!#`N*V)yl?_;Z-I{#?R!uRsCft?L@ohh3E*7~K*cJz zz@07%%Lj1?>i>6vKMXR1Yyx@kQiz$vAvlAu%IC#N-G$`8eWH@f!DvFlxlc`Tz?p%2B6*;Z><5oXzFvq zqs@-IJM-=lfVXAVd};h`=UoEqCk#0+ANtYktN$QT#F8-@H*VbMy!^9q2<4DlI-$TQ zU{np^73jY;%5l^@IHv|Q)lLGRJC|Ye_kMcm?We&=`jU2iNgD@j&^P%ox;V+;>+Xa2 zMTo-|&cL9;C?+GAY=;D5N1T|)Z!KxTi3fKm(SG4oyhdjs%L4uph%fRZ38h{?=J z%9r=es7wy6CX7>pVFs4mp*rNNzqwgC^7@xx+=JA=TXTSAeVSf1q@B(#9?_4MOlDVp zMrjR>LoGw@q2TnIjpOa-Yp2%SHe7b9;zY&BveCw={F?vK9=ZGP{;+3g>vY{MXYJ?t zsk*Jh_ER}0az+Y9g{i#Nk?(9Dx*brq;q+l zXyuqUl3y>{>W^xn=~4br>!FspG?*IvTLx<+V~MC=GNk>~nl+S0d{mM-&i~2ICMn$; z48rD9@cY55AVmulYZb0;ktyd!7!KJnhH!P531*~Lon^K+u z_265^6%FUz8iJq26*Tw)>LF}^c~g|9a#`6!Q}+a`papDjAe)KhoukP>^bcV~0Tp%OVMrtvzaZ|6 z6nX-%=Mv&4z~>s04C<$RiCMHajc8ozj85E+cI4=#w||K+1&p=vOMRu2r~n%Y3XzOj zqfblmctMyT>KXzl)A2APRW}h3IDmR5R=tPGKVpLVp$r_Lf)>Q(B$J_x2Bh_Z8()S4 za06Oy65%bVS&Ovx26RphL;t%q8^Z%Tk9CeL7qhD()tkiZTZV4^1jrI%dmldb@W^^G zr#e#O7IPZGjMh@w1*2xzLKe$%MyhTW&70=HM{FF@gMfzUFq7k=$w9)zWR%(|nw_zN za_}uf3Qpis%XNUDt$;wM+2xICbz}R-{No2M=+6#B^6viFydCXr;@(>OvAIqT2PucU zUm%o_B@vtT*YI#R0-6tkuu8qGWuRF0`3`Epo>YPClr}1rYByOTycURyC2%OG2(HCe zEP#PGwso{Xs01uVp#ymW{VpOF4E$dTHjXMvQvtnTSwXaL(rhVUU+_TV1|m~3^hg5# z5&I_xj8Y1L|A~7}HHtY!kS+<{Cmvu3)=gMv#AZrM4&EpJBU%2n$`nh0L&OY)qmcB0 zg&N-pDwDPBqgCimrjHM?HKJ>MnTuL00HN>`tVO^#nMJ6k6<~D)7$n4rwuJBuB!KxT zAfOz(ks#LRunkfyu`rZPOt?g4;1ke*#o_yhplG4D=<7bHRwUCuKtma@OV;Sk845O5 zC}!1+nZ>M)h>d9t_0t)7(Tq|tqjaP;T2?2P)s2NF%WgW`5!t>|Y}q-Lac|UoZ^V4B z;^hJ75I^4VdQW6+%h^2_LXjO0ig$V25}c|SHk1QEb#_t6jnoUH&^v|%&x5D_G6 z)!6c}f^h(<+)wIr75RI*@1T8&^t8jPP|BD*CYai_Z-^n{&u=dlx+TF5VeG*B)O zW2GJxnID0hKQYkti?Z^PB2GpPZj%EXE+fnU4wnTu9ET=Wm;;l`3TBHo^~_=IM(S`nY}inh zvefMqHS31YNR&#lU2j)ni&yILQsH^Z6Aw|lK-N8`e?_lIOP=|!yFYwe`G^?n{5F#F21|p?q#~+8SBv;=5(d@}nm*0J6<~PH#_p1v&X)AGm$$li1md^2Rsd*GjKc!(2(nu+bk1$1GTk@L`U_okH$#4)efi5c0Q0Y#=2{}3J;Gj&EKZk;W z70Fb&z%%uAPz`?`BZ#u^^Mlmc^4~9?SbpI<6IGGB?hzYz9+ZJKnDH0T0E|K+0^N|8 zBNaLP)r}z8&;phLfM3>fyOY0_=)<<*hKfw=sN7Bc)U08MD zo_}f(Z`u*r**1Ao+w*iJ=l&tHm~(&3UW_pk&Y{imA7>a|bIgh%5>5b#%+{gMk%2=4 zPi=w?@Asd$e>6ud1OX`)ZoFvPIGyj5f8OCt@#*5plGT&-JE!vRorB+HqVLd5wLK7d z@S*5~d&CF#Ot$$Wf!=6fzZlpbIq-vP5DZ7VHPEn8vrR*;?!eRi&Y#An@k43*4Z6J?VnDbR z2z=cQTM_RGssg;>BnO0O?8BEQ|N6q^C*Pbs_`|q%iiA&q@1zJtB6oK|1%NIljM?+U zk4X&xq1a^&PmWrShd`5{C;)cQ>nlexhq0$NNdYC+;c}cP2NUA0-U_)Od?{uBju8W# z0CouV1qZT{ctN5#jKuN?^nl+T(LxMv)KMuqD#w~4j(Q07JG0^RhS5Q>Y~#3JEW0h% zurb=OD=1+B;==hl$m~^k0S3SB<%&Uv! zt{dAsp83AxcaF37iVdw7_K6MmMCx}&vhRg(V2th6ri$&wmG>z8&7&fHBehK^h3^J! z79XiLJt~q#wYpSAE6G+u?O@TdFkTqtXDe;EY9Pd&2SZGa!Wa_iXNI3Oq72W8->X$2n>{ACHPc#n5tau+7w!32p` z-T>8xxd$=9m^OkB$`R=JorLExc@~p*G5H7+v}Od<4+Z4e!Y7#g3nn`-DZ``$6ZBt` zWB@Y3c}Q?`ROvNV%asj>aK}^GoUMX0j_~sooIbmo&gYhVR&V1x^tEy)mp!^>j)K!T z?1zHWg&;Xa9`ecOy?v2=1LEF?=dp6Gi7w}y*a1%10Ztbfa*FKODWAXly@=N*es|YA zR?cmstz7n4?Ku1T`Z)@+XUpdCbgeXpTZcQd;B;0#Mef>8E+f1A^H@B$I*qFzg?on6 zT#=bGj%LnNaQdvk%9YQR>N(?Z6YkM6&S$b*mHa;NRk(`eSC4clhn(o}cZcA|y#zND zN!n0>BiLK9&oA_J3rJ5Aiz}7HCrjy=g`cy- zpOFhzKja0R3Xtu1pfau*$+V>lH-N~2D`>JxT(1DT;w1cym~G_ukmQ9!4BvF@j1Ljt zOCtXo*rE>(jz4+d-Oa)*G=;@Y1t%Z@nMu>vScYcjjTCMC7s^8ZoBoZeoTMuMl`4e) zz@|!e&<8a?GXBsQF%^hZVXS!BuyCsXM1REPj%?mDS?pJ89U@gQ$8vPWzp{>hW4DRy zHo%^Yj-i~WAzw7)57$o`$`5M)s%@j`HP=Aq(rf-Ypt*~stFBQb-J_?hc~1p?X!?9^ zuZhyzAz*{f_=Gh8iGjUhj~J__*y>nX`r|Dai9u(`5y}$|TVoz|E{|(m%bY7pYh1~k ITZ#Gq18GEK-T(jq literal 4755 zcmb_fZEREL6@KrR?Q37>%W+6>LQP|!4&-YDLJ;j5lF+53fi|oi?hniO-UP$79qzq` zCJR+2!4_Cm(0;VEgf-n-H7yy-v}#RNq>^sZD(#P>q>6jT)Tyf}{Z&JdR;7P-&b_`# z$jaJEdnKRuz32Tt=Q+=N{5QATNuXS+ycqjXAmm^8VHGalSRfQ3FA|Z2h)9XFOob?; zrb9H;jLgK@5F6)0T-+A2QAA`V>PUgL$Z{=myWm5#-J{ozy*%#VkR5S8#KTII$ji>S zE98m`AtCM#xhX;ui9T}w@~T!#D!B}it*1^ z{(b#YqD_vR@IP?8v97_NJ$*HM<=vq33*5Im$mncWOx1L@Jr>n;$G(0omP|zC!N$Xy z*01iYuNNcgVKtiUm+B&NY#^10s&&z1ydHl25iNE|mTKckQK~=Qs2A?r-_hOL8E)O* z8Q$I6(=k}B9<7ZwHaBjQq=u&EmK{xvTU%Os#jV>~qT8CAwl_6zZ`z?3teCa8<6w}| zZI4Ouq%r_U6UHph_3UeZxcT<$UtWDViZ`~30V?=dcpk)yL?Qqgf^a!SLX1d*WJN~eBwGda17PeT zCppZoUYlq;#fEsXP_%;{`y7%J`tzfE9U^~<3%T;MEG;WKVU{4e@^d)J4J!$tD}cGS zURrcRyKtZ&Sfo4Q;ti$`qj}I62S>0l@LTu+2n75P2^$?KhR6}@TU9RSzbP_6f>d{q z)&)5kjmTjz5?EPx#S$?s|M4cJv#KQbs<@|r@!@(Unbhj*!1;+NAgKOCQaP&jN1{@_ zdNd}>YCRgdHkQz&J_W##Oc;uA)Bs;y|A6iYhmGaKgP!FZ)mh4N*hMV^F-$U*{tV#Tc5k&(OPc*yL zsqt1zWHgu6K7u=~Dk3X#R)JaNPg2E6rQDm(L5Lmjy3M+W6HJ59#*qQ@ipsI<6wl@l}PVgkXVm6|FRS`>u z=v90~t=t=!i8@I?OAi3|V!eWyJSk+`?8v3;8^6px_kt0h-KiqbA%GxKZ;ZZ`y>jN} zZ%^yCW1# z*cf*p$m*;ynC(w0nqC-9$};c=fHa(l#3kJh4Kb!3g@ADME6L+AQBrhU)TA04LOv4H zIpj8ZkUw@pO4RntDHX%#xIsf<6?sT!WH}zB6y*EVeIPy^Jq@>+$c|3lyfT@6|L04W zzAd-jzSn*7sC&-s`1OY{_UD-P(?ZE~`&56)sg5m6GcM4y7*RgkZAI;rZmK1q`j z$8`q6PC>vbh((=^C3=%O-|Bv0EL#zn zE2y9n`qcM943p1X#Ul@07fQ}`pXpBfYUhNyj8Kwez5Isj-u2h3>M~UuKJ(xI2LCFb zuGuwpbk_gqb$|71yWi-1wKHA!@U@Mzflkn6HaBOgH{Ig7%CbA0V_nIe65?G4Q!|y- zV^61jjdwW4TXM%+P*yOq=Z>GO-!$Gk=ifHcJ||S+-h0pNO_#TPB<%d^dSUr(Lc`{u z?K;y1QzzF?deRl!=7jATe_*72UZ|Q8s=m%~FlbRllKJ&+YPX+!;OE;)m=A*YwYix; zyE$mV)?I~4K!ZA^|DUy~_648}Zij0Ea3lkt-M52Xybs1(wT4Z8W59A2n zJmQ<^d59XKC)lMJXGBi4;TjN~thEE?1SetjW!4&Etw@ehQQC++_F1+EBM|Bpk!MdK zlihgdz3kaB6Oi3X9cTtiG4lM0q~=Gb$KY)rl*BhFr4c%HyPAqd!7>m;Cc2o2C@Qts z#i*JRQaaE>W zflR3{U9~msX-W%CIhN%&WV{vg-iq2Isa^aM>C_%Za)7`_S_3M&J5oeJ!2YM zX#%xnWjkk#VVcPtxZXCD=#CNdmmKo2M$*0nuEwi2n z)53%IwySRonQ-l7-lcXjIuEf8sU5J{T`aa_;%8v*dzzTS{OEh~7j5wu+=3*#v z5VDOW9J2s+`AFx(X-vwTC>Q6we zwu4x0;Bq9Dii}WXJ`wXmU`7ax9h|ZQiOdNP1EwtlSA6U#N7z=Fusv5*OMb_BbmPcP=MEfY+LxK*&vLmOa@VcIH(2%W~Jso0V7?|orqsm zi)blz(7OuOraFo;ncZ_V-65yT;d*-Dk7UbN+}FA@)`y0t)QuuW)!%Ff=`jM83odwf&r?thmuKIK_n@7@;8aG zt(hRXQHuf%q@D+Hirl7Y&Uf44<2K}W^4x|C?PH}cJ(?rXob(vgRQa{QyXzKEbF0k3 zm0f7d5vazijcPJ#RM!I6wCS1!?8uekAk|QhY2)p$4(8Q7S24!mk`?1H393mESFOmE zah&r)`yB$6d00GR(5u_SVKEsEhYcU!AbHs zaQ@7hhD3o>W$tUYK7BK!GRJaM$v;@%Cv4XY+m&+?r*CAoe}+(|a&&q)HZ5^!VFFt?SA3dam_B PW_qK$bq6!O1Ka-rw*6eB diff --git a/skills/mem0-integration/__pycache__/openclaw_interceptor.cpython-312.pyc b/skills/mem0-integration/__pycache__/openclaw_interceptor.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..1d2d00e0525d93ba763dcf1d84561268110b2c9a GIT binary patch literal 3320 zcmb_eZ){sv6~FiWv!CO{`LoWBldN9TZu5fUt!>s2b%UfpI#z{gMVsmmyuJF}I8JOo z_r7ORx9-%f6$=B@l@A0MMfot0M?o71iGujBe)qiJwYLWlwD+EQE&F>3p?6rvYtXmb<{yA9A`PXGhBYpS zQ`qKQinBSN;t4sKP01XZ zM4EUOX_99n6@Ybti|(>io8}`y%}?5=1x-FHq&lREC5eN&J{KE`J*^w2E&qerpE~hqlIpsd9M)l78rCJ} zbvkDh)ig;ObJ<+ZNajiYkwms&k_=VNtX_y?tBvWDbS_H@X4>7S&6@EYgYF_C2;z!Z zOwXbeuW>}scp_>75vC~{Sd}TQvu)+uQ!dD43Y?n`hcbXSS+ zaJD-ZbRPbj-L^XoSCC>4Fj7WwZtBahTOTQ?M{`62Q7}ku+F&MPx1Ri;PoDIGq_A+f z8yYT_Ex!Vqtf?p`Vt3j0us4HcgCPSsjn>0`3&OX8o6td)c&tQd+47}zFn!##BudN@ zEu`7;ye5Kh2L78*zzLp07$RIjv)*V6H*gursKU**u6D;+etVgAioeSoUK5&g1e5u- z+pKrLo&tCUSNMwXQ}Lqf>kOdkB*@?47x}M=B042jM7piT#ulimh#npe!?o>0-|Qbm zrj26eXeXjOPDy6ab2=OHq}yDRD$;A{0_WMm=m_lNl=OmBcKJM>N5}aiXdeHJpT}|E zl;tZKgeqCh@@ZsRE#*whuVx^lZ50TFs<}8%nP1w;2+Oh(BTYPAef{UxfB7RoSlmy2 zFktZ+Vp0|fD^#RJnE}u#22p8x#)?ksG_RTpU`MA}Vkpp8id4@RO-ug#ahBeIE=$Vk znGB(pNGR25+(%iwoK3Vdsksx&?^&`iF_`z5(Yx?sNXZ6MdtzacF{ zJ?AQ~RMvaK4NjIL>-P>U&YhoIidT=U-MepLay@+C;>q(Tm&U5)weY7G4sHxQR38{! z9T>e-x}2#!KXYXuTMuU!4*oe5-U#oghaX=JKYnRuEj(H4oMbkAC^C!Rzv)Mvy*Cgi zM>c}p=MH`SP^~XfRjY?IPRbKeet9%9v5jBp6hXhTO$5zKln2=g+Bcmih5;9Bb*YAtHB1$5+Et|m z@ZvR=<7eQ%Irhm=#(ylzgv!=)T2UspAY^NlaTNh@;_mdLD8tBX;%dtp)kGL}5Ecc5 zH4)_^?6!sf55k;WW!YszhxUL}iUMgCp|RN2U;X0RxmVx)&Ffd+{Pp!WFO=Av=5Xwz z_@ehPV>Z6pi%`iNfT@~BBQRiz8aZ0Z&@h|S3&cau08`w@$cG_B`#^UmbmXS#6ZhZk zJ#S9g&P==lQHf_>==+hlQ-awRnEZMISWhKNW>;jDaK+d8Y92WZEcll-L;lA_z{UG@0IR zTj8|=bA*a1v|_tlEA9Q8xR%FYHO+f_HpCf?f2PW+la-SH~60BOywwn=bk9J zT%LXkW(>pMV5wtg%|=&$y=&KM*RCZ}eQvF5q~11C6Gm!nBex88zL%nd_y_oY;et4b zEx+@Y85yg+5un(wQw}ARWtwB@|V2#*(tkm0Oc# zsLqG%@=6E6r~PO+scE0)0j7J95q3rP{^&zYYUc4R|9%GsPtZ{qU>}mk3qSyO82=q5 z-$5OJVZXpS;$QZ^=zlr%V(3*|>-oYJH2GIF_%}Ja5U$J7RXO_I(KUJJY3V(w5AXdK xVq`xGJ+RjC;OVx8gz;lb=>`J!*5@`EYxpsqz}3S{H-X>oarkBPrro#q_g~+Q2;Bex literal 0 HcmV?d00001 diff --git a/skills/mem0-integration/mem0_client.py b/skills/mem0-integration/mem0_client.py index ce1e307..9a356dc 100644 --- a/skills/mem0-integration/mem0_client.py +++ b/skills/mem0-integration/mem0_client.py @@ -1,31 +1,177 @@ #!/usr/bin/env python3 """ -mem0 Client for OpenClaw (v1.0 兼容) +mem0 Client for OpenClaw - 生产级纯异步架构 +Pre-Hook 检索注入 + Post-Hook 异步写入 """ import os +import asyncio import logging -from typing import List, Dict, Optional +import time +from typing import List, Dict, Optional, Any +from collections import deque +from datetime import datetime -# 设置环境变量(在导入 mem0 之前) +# 设置环境变量 os.environ['OPENAI_API_BASE'] = 'https://dashscope.aliyuncs.com/compatible-mode/v1' -os.environ['OPENAI_API_KEY'] = 'sk-c1715ee0479841399fd359c574647648' +os.environ['OPENAI_API_KEY'] = os.getenv('MEM0_DASHSCOPE_API_KEY', 'sk-c1715ee0479841399fd359c574647648') try: from mem0 import Memory from mem0.configs.base import MemoryConfig, VectorStoreConfig, LlmConfig except ImportError as e: - print(f"⚠️ mem0ai 导入失败:{e}") + print(f"⚠️ mem0ai 导入失败:{e}") Memory = None logger = logging.getLogger(__name__) + +class AsyncMemoryQueue: + """纯异步记忆写入队列""" + + def __init__(self, max_size: int = 100): + self.queue = deque(maxlen=max_size) + self.lock = asyncio.Lock() + self.running = False + self._worker_task = None + self.callback = None + self.batch_size = 10 + self.flush_interval = 60 + + def add(self, item: Dict[str, Any]): + """添加任务到队列""" + try: + if len(self.queue) < self.queue.maxlen: + self.queue.append({ + 'messages': item['messages'], + 'user_id': item['user_id'], + 'agent_id': item['agent_id'], + 'timestamp': datetime.now().isoformat() + }) + else: + logger.warning("异步队列已满,丢弃旧任务") + except Exception as e: + logger.error(f"队列添加失败:{e}") + + async def get_batch(self, batch_size: int) -> List[Dict]: + """获取批量任务""" + async with self.lock: + batch = [] + while len(batch) < batch_size and self.queue: + batch.append(self.queue.popleft()) + return batch + + def start_worker(self, callback, batch_size: int, flush_interval: int): + """启动异步后台任务""" + self.running = True + self.callback = callback + self.batch_size = batch_size + self.flush_interval = flush_interval + self._worker_task = asyncio.create_task(self._worker_loop()) + logger.info(f"✅ 异步工作线程已启动 (batch_size={batch_size}, interval={flush_interval}s)") + + async def _worker_loop(self): + """异步工作循环""" + last_flush = time.time() + + while self.running: + try: + if self.queue: + batch = await self.get_batch(self.batch_size) + if batch: + asyncio.create_task(self._process_batch(batch)) + + if time.time() - last_flush > self.flush_interval: + if self.queue: + batch = await self.get_batch(self.batch_size) + if batch: + asyncio.create_task(self._process_batch(batch)) + last_flush = time.time() + + await asyncio.sleep(1) + + except asyncio.CancelledError: + logger.info("异步工作线程已取消") + break + except Exception as e: + logger.error(f"异步工作线程错误:{e}") + await asyncio.sleep(5) + + async def _process_batch(self, batch: List[Dict]): + """处理批量任务""" + try: + logger.debug(f"开始处理批量任务:{len(batch)} 条") + for item in batch: + await self.callback(item) + logger.debug(f"批量任务处理完成") + except Exception as e: + logger.error(f"批量处理失败:{e}") + + async def stop(self): + """优雅关闭""" + self.running = False + if self._worker_task: + self._worker_task.cancel() + try: + await self._worker_task + except asyncio.CancelledError: + pass + logger.info("异步工作线程已关闭") + + class Mem0Client: - def __init__(self): + """生产级 mem0 客户端""" + + def __init__(self, config: Dict = None): + self.config = config or self._load_default_config() self.local_memory = None - self.init_memory() + self.async_queue = None + self.cache = {} + self._init_memory() + self._start_async_worker() - def init_memory(self): + def _load_default_config(self) -> Dict: + """加载默认配置""" + return { + "qdrant": { + "host": os.getenv('MEM0_QDRANT_HOST', '100.115.94.1'), + "port": int(os.getenv('MEM0_QDRANT_PORT', '6333')), + "collection_name": "mem0_shared" + }, + "llm": { + "provider": "openai", + "config": {"model": os.getenv('MEM0_LLM_MODEL', 'qwen-plus')} + }, + "retrieval": { + "enabled": True, + "top_k": 5, + "min_confidence": 0.7, + "timeout_ms": 2000 + }, + "async_write": { + "enabled": True, + "queue_size": 100, + "batch_size": 10, + "flush_interval": 60, + "timeout_ms": 5000 + }, + "cache": { + "enabled": True, + "ttl": 300, + "max_size": 1000 + }, + "fallback": { + "enabled": True, + "log_level": "WARNING", + "retry_attempts": 2 + }, + "metadata": { + "default_user_id": "default", + "default_agent_id": "general" + } + } + + def _init_memory(self): """初始化 mem0""" if Memory is None: logger.warning("mem0ai 未安装") @@ -36,69 +182,217 @@ class Mem0Client: vector_store=VectorStoreConfig( provider="qdrant", config={ - "host": os.getenv('MEM0_QDRANT_HOST', 'localhost'), - "port": int(os.getenv('MEM0_QDRANT_PORT', '6333')), - "collection_name": "mem0_local", + "host": self.config['qdrant']['host'], + "port": self.config['qdrant']['port'], + "collection_name": self.config['qdrant']['collection_name'], "on_disk": True } ), llm=LlmConfig( provider="openai", - config={"model": "qwen-plus"} + config=self.config['llm']['config'] ) ) - self.local_memory = Memory(config=config) - logger.info("✅ 本地记忆初始化成功") + logger.info("✅ mem0 初始化成功") except Exception as e: - logger.error(f"❌ 初始化失败:{e}") + logger.error(f"❌ mem0 初始化失败:{e}") self.local_memory = None - def add(self, messages: List[Dict], user_id: str) -> Optional[Dict]: - """添加记忆""" - if self.local_memory is None: - return {"error": "mem0 not initialized"} + def _start_async_worker(self): + """启动异步写入工作线程""" + if not self.config['async_write']['enabled']: + return try: - result = self.local_memory.add(messages, user_id=user_id) - return {"success": True} - except Exception as e: - return {"error": str(e)} + loop = asyncio.get_event_loop() + self.async_queue = AsyncMemoryQueue( + max_size=self.config['async_write']['queue_size'] + ) + self.async_queue.start_worker( + callback=self._async_write_memory, + batch_size=self.config['async_write']['batch_size'], + flush_interval=self.config['async_write']['flush_interval'] + ) + except RuntimeError: + logger.debug("当前无事件循环,异步队列将在首次使用时初始化") - def search(self, query: str, user_id: str, limit: int = 5) -> List[Dict]: - """搜索记忆""" - if self.local_memory is None: + async def pre_hook_search(self, query: str, user_id: str = None, agent_id: str = None, top_k: int = None) -> List[Dict]: + """Pre-Hook: 对话前智能检索""" + if not self.config['retrieval']['enabled'] or self.local_memory is None: return [] + cache_key = f"{user_id}:{agent_id}:{query}" + if self.config['cache']['enabled'] and cache_key in self.cache: + cached = self.cache[cache_key] + if time.time() - cached['time'] < self.config['cache']['ttl']: + logger.debug(f"Cache hit: {cache_key}") + return cached['results'] + + timeout_ms = self.config['retrieval']['timeout_ms'] + try: - return self.local_memory.search(query, user_id=user_id, limit=limit) - except Exception: + memories = await asyncio.wait_for( + self._execute_search(query, user_id, agent_id, top_k or self.config['retrieval']['top_k']), + timeout=timeout_ms / 1000 + ) + + if self.config['cache']['enabled'] and memories: + self.cache[cache_key] = {'results': memories, 'time': time.time()} + self._cleanup_cache() + + logger.info(f"Pre-Hook 检索完成:{len(memories)} 条记忆") + return memories + + except asyncio.TimeoutError: + logger.warning(f"Pre-Hook 检索超时 ({timeout_ms}ms)") + return [] + except Exception as e: + logger.warning(f"Pre-Hook 检索失败:{e}") return [] - def get_all(self, user_id: str) -> List[Dict]: - """获取所有记忆""" + async def _execute_search(self, query: str, user_id: str, agent_id: str, top_k: int) -> List[Dict]: + """执行检索(使用 asyncio.to_thread 隔离阻塞)""" if self.local_memory is None: return [] - try: - return self.local_memory.get_all(user_id=user_id) - except Exception: - return [] + user_memories = [] + if user_id: + try: + user_memories = await asyncio.to_thread( + self.local_memory.search, query, user_id=user_id, limit=top_k + ) + except Exception as e: + logger.debug(f"用户记忆检索失败:{e}") + + agent_memories = [] + if agent_id and agent_id != 'general': + try: + agent_memories = await asyncio.to_thread( + self.local_memory.search, + query, + user_id=f"{user_id}:{agent_id}" if user_id else agent_id, + limit=top_k + ) + except Exception as e: + logger.debug(f"业务记忆检索失败:{e}") + + all_memories = {} + for mem in user_memories + agent_memories: + mem_id = mem.get('id') if isinstance(mem, dict) else None + if mem_id and mem_id not in all_memories: + all_memories[mem_id] = mem + + min_confidence = self.config['retrieval']['min_confidence'] + filtered = [m for m in all_memories.values() if m.get('score', 1.0) >= min_confidence] + filtered.sort(key=lambda x: x.get('score', 0), reverse=True) + + return filtered[:top_k] + + def format_memories_for_prompt(self, memories: List[Dict]) -> str: + """格式化记忆为 Prompt 片段""" + if not memories: + return "" + + prompt = "\n\n=== 相关记忆 ===\n" + for i, mem in enumerate(memories, 1): + memory_text = mem.get('memory', '') if isinstance(mem, dict) else str(mem) + created_at = mem.get('created_at', '') if isinstance(mem, dict) else '' + prompt += f"{i}. {memory_text}" + if created_at: + prompt += f" (记录于:{created_at})" + prompt += "\n" + prompt += "===============\n" + + return prompt + + def post_hook_add(self, user_message: str, assistant_message: str, user_id: str = None, agent_id: str = None): + """Post-Hook: 对话后异步写入""" + if not self.config['async_write']['enabled']: + return + + if not user_id: + user_id = self.config['metadata']['default_user_id'] + if not agent_id: + agent_id = self.config['metadata']['default_agent_id'] + + messages = [ + {"role": "user", "content": user_message}, + {"role": "assistant", "content": assistant_message} + ] + + if self.async_queue: + self.async_queue.add({ + 'messages': messages, + 'user_id': user_id, + 'agent_id': agent_id + }) + logger.debug(f"Post-Hook 已提交:user={user_id}, agent={agent_id}") + else: + logger.warning("异步队列未初始化") - def delete(self, memory_id: str, user_id: str) -> bool: - """删除记忆""" + async def _async_write_memory(self, item: Dict): + """异步写入记忆""" if self.local_memory is None: - return False + return + + timeout_ms = self.config['async_write']['timeout_ms'] try: - self.local_memory.delete(memory_id, user_id=user_id) - return True - except Exception: - return False + await asyncio.wait_for(self._execute_write(item), timeout=timeout_ms / 1000) + logger.debug(f"异步写入成功:user={item['user_id']}, agent={item['agent_id']}") + except asyncio.TimeoutError: + logger.warning(f"异步写入超时 ({timeout_ms}ms)") + except Exception as e: + logger.warning(f"异步写入失败:{e}") + + async def _execute_write(self, item: Dict): + """执行写入(使用 asyncio.to_thread 隔离阻塞)""" + if self.local_memory is None: + return + + full_user_id = f"{item['user_id']}:{item['agent_id']}" + + await asyncio.to_thread( + self.local_memory.add, + messages=item['messages'], + user_id=full_user_id, + agent_id=item['agent_id'] + ) + + def _cleanup_cache(self): + """清理过期缓存""" + if not self.config['cache']['enabled']: + return + + current_time = time.time() + ttl = self.config['cache']['ttl'] + + expired_keys = [k for k, v in self.cache.items() if current_time - v['time'] > ttl] + for key in expired_keys: + del self.cache[key] + + if len(self.cache) > self.config['cache']['max_size']: + oldest_keys = sorted(self.cache.keys(), key=lambda k: self.cache[k]['time'])[:len(self.cache) - self.config['cache']['max_size']] + for key in oldest_keys: + del self.cache[key] def get_status(self) -> Dict: """获取状态""" return { "initialized": self.local_memory is not None, - "qdrant": f"{os.getenv('MEM0_QDRANT_HOST', 'localhost')}:{os.getenv('MEM0_QDRANT_PORT', '6333')}" + "async_queue_enabled": self.config['async_write']['enabled'], + "queue_size": len(self.async_queue.queue) if self.async_queue else 0, + "cache_size": len(self.cache), + "qdrant": f"{self.config['qdrant']['host']}:{self.config['qdrant']['port']}" } + + async def shutdown(self): + """优雅关闭""" + if self.async_queue: + await self.async_queue.stop() + logger.info("mem0 Client 已关闭") + + +# 全局客户端实例 +mem0_client = Mem0Client() diff --git a/skills/mem0-integration/openclaw_interceptor.py b/skills/mem0-integration/openclaw_interceptor.py new file mode 100644 index 0000000..4645086 --- /dev/null +++ b/skills/mem0-integration/openclaw_interceptor.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 +"""OpenClaw 拦截器:Pre-Hook + Post-Hook""" + +import asyncio +import logging +import sys +sys.path.insert(0, '/root/.openclaw/workspace/skills/mem0-integration') +from mem0_client import mem0_client + +logger = logging.getLogger(__name__) + + +class ConversationInterceptor: + def __init__(self): + self.enabled = True + + async def pre_hook(self, query: str, context: dict) -> str: + if not self.enabled: + return None + try: + user_id = context.get('user_id', 'default') + agent_id = context.get('agent_id', 'general') + memories = await mem0_client.pre_hook_search(query=query, user_id=user_id, agent_id=agent_id) + if memories: + return mem0_client.format_memories_for_prompt(memories) + return None + except Exception as e: + logger.error(f"Pre-Hook 失败:{e}") + return None + + async def post_hook(self, user_message: str, assistant_message: str, context: dict): + if not self.enabled: + return + try: + user_id = context.get('user_id', 'default') + agent_id = context.get('agent_id', 'general') + await mem0_client.post_hook_add(user_message, assistant_message, user_id, agent_id) + logger.debug(f"Post-Hook: 已提交对话") + except Exception as e: + logger.error(f"Post-Hook 失败:{e}") + + +interceptor = ConversationInterceptor() + + +async def intercept_before_llm(query: str, context: dict): + return await interceptor.pre_hook(query, context) + + +async def intercept_after_response(user_msg: str, assistant_msg: str, context: dict): + await interceptor.post_hook(user_msg, assistant_msg, context) diff --git a/skills/mem0-integration/test_integration.py b/skills/mem0-integration/test_integration.py new file mode 100644 index 0000000..ffe70ff --- /dev/null +++ b/skills/mem0-integration/test_integration.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3 +# /root/.openclaw/workspace/skills/mem0-integration/test_integration.py + +import asyncio +import logging +import sys +sys.path.insert(0, '/root/.openclaw/workspace/skills/mem0-integration') + +from mem0_client import mem0_client +from openclaw_interceptor import intercept_before_llm, intercept_after_response + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) + +async def mock_llm(system_prompt, user_message): + """模拟 LLM 调用""" + return f"这是模拟回复:{user_message[:20]}..." + +async def test_full_flow(): + """测试完整对话流程""" + print("=" * 60) + print("🧪 测试 mem0 集成架构") + print("=" * 60) + + context = { + 'user_id': '5237946060', + 'agent_id': 'general' + } + + # ========== 测试 1: Pre-Hook 检索 ========== + print("\n1️⃣ 测试 Pre-Hook 检索...") + query = "我平时喜欢用什么时区?" + memory_prompt = await intercept_before_llm(query, context) + + if memory_prompt: + print(f"✅ 检索到记忆:\n{memory_prompt}") + else: + print("⚠️ 未检索到记忆(正常,首次对话)") + + # ========== 测试 2: 完整对话流程 ========== + print("\n2️⃣ 测试完整对话流程...") + user_message = "我平时喜欢使用 UTC 时区,请用简体中文和我交流" + + print(f"用户:{user_message}") + response = await mock_llm("system", user_message) + print(f"助手:{response}") + + # ========== 测试 3: Post-Hook 异步写入 ========== + print("\n3️⃣ 测试 Post-Hook 异步写入...") + await intercept_after_response(user_message, response, context) + print(f"✅ 对话已提交到异步队列") + print(f" 队列大小:{len(mem0_client.async_queue.queue) if mem0_client.async_queue else 0}") + + # ========== 等待异步写入完成 ========== + print("\n4️⃣ 等待异步写入 (5 秒)...") + await asyncio.sleep(5) + + # ========== 测试 4: 验证记忆已存储 ========== + print("\n5️⃣ 验证记忆已存储...") + memories = await mem0_client.pre_hook_search("时区", **context) + print(f"✅ 检索到 {len(memories)} 条记忆") + for i, mem in enumerate(memories, 1): + print(f" {i}. {mem.get('memory', 'N/A')[:100]}") + + # ========== 状态报告 ========== + print("\n" + "=" * 60) + print("📊 系统状态:") + status = mem0_client.get_status() + for key, value in status.items(): + print(f" {key}: {value}") + print("=" * 60) + print("✅ 测试完成") + +if __name__ == '__main__': + asyncio.run(test_full_flow())