#!/usr/bin/env python3 """ 紧急恢复:手动写入"犇犇"部署记忆到 Qdrant """ import requests import json import time DASHSCOPE_API_KEY = "sk-4111c9dba5334510968f9ae72728944e" QDRANT_URL = "http://localhost:6333" COLLECTION = "mem0_v4_shared" def generate_embedding(text): """生成 1024 维向量""" resp = requests.post( "https://dashscope.aliyuncs.com/compatible-mode/v1/embeddings", headers={ "Content-Type": "application/json", "Authorization": f"Bearer {DASHSCOPE_API_KEY}" }, json={"model": "text-embedding-v4", "input": [text]} ) data = resp.json() if "data" in data and len(data["data"]) > 0: return data["data"][0]["embedding"] raise Exception(f"Embedding 生成失败:{data}") def write_memory(id, memory_text, user_id, agent_id, metadata=None): """写入单条记忆""" embedding = generate_embedding(memory_text) print(f"✅ 生成向量:{len(embedding)} 维") payload = { "memory": memory_text, "user_id": user_id, "agent_id": agent_id, "source": "manual_recovery", "recovered_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) } if metadata: payload.update(metadata) resp = requests.put( f"{QDRANT_URL}/collections/{COLLECTION}/points/{id}", headers={"Content-Type": "application/json"}, json={ "id": id, "vector": embedding, "payload": payload } ) result = resp.json() print(f"✅ 写入 Qdrant: {result}") return result def search_memories(query, limit=5): """检索记忆""" query_embedding = generate_embedding(query) resp = requests.post( f"{QDRANT_URL}/collections/{COLLECTION}/points/search", headers={"Content-Type": "application/json"}, json={ "vector": query_embedding, "limit": limit, "with_payload": True } ) result = resp.json() if "result" in result: return result["result"] return [] if __name__ == "__main__": print("=== 紧急恢复:犇犇部署记忆 ===\n") # 恢复的记忆内容 memories_to_restore = [ { "id": 1001, "text": "犇犇部署进度 - 企业微信插件挂载中,待完成", "user_id": "wang 院长", "agent_id": "main", "metadata": { "task_type": "deployment", "status": "pending", "component": "wecom_plugin", "priority": "high" } }, { "id": 1002, "text": "Collection 名称统一为 mem0_v4_shared - 已完成(2026-02-27)", "user_id": "wang 院长", "agent_id": "main", "metadata": { "task_type": "configuration", "status": "completed", "completed_at": "2026-02-27T11:30:00Z" } }, { "id": 1003, "text": "安全审计误报分析 - 已完成复核(2026-02-26)", "user_id": "wang 院长", "agent_id": "main", "metadata": { "task_type": "security", "status": "completed", "completed_at": "2026-02-26T21:05:00Z" } } ] # 写入记忆 for mem in memories_to_restore: print(f"\n--- 写入记忆 {mem['id']} ---") write_memory( id=mem["id"], memory_text=mem["text"], user_id=mem["user_id"], agent_id=mem["agent_id"], metadata=mem["metadata"] ) time.sleep(1) # 验证检索 print("\n=== 验证检索 ===") results = search_memories("犇犇 部署", limit=5) print(f"检索到 {len(results)} 条记忆:\n") for r in results: print(f" 分数:{r['score']:.4f}") print(f" 内容:{r['payload'].get('memory', 'N/A')}") print(f" 元数据:{r['payload'].get('metadata', {})}") print() print("✅ 记忆恢复完成")