#!/usr/bin/env python3
"""Mem0 collection migration script.

Migrates memories from the legacy multi-collection layout into the single
shared-collection layout.

What it does:
- Copies every point from each collection listed in ``OLD_COLLECTIONS``
  (e.g. ``mem0_v4_life``) into ``NEW_COLLECTION`` (``mem0_global_v4``).
- Tags every copied point with ``agent_id``, ``migrated_from`` and
  ``migrated_at`` payload fields.

Before running:
1. Make sure Qdrant is up and reachable.
2. Back up the existing data.
3. Provide the API key through the ``DASHSCOPE_API_KEY`` environment
   variable (never commit it to the repository).
"""

import os
import sys
from datetime import datetime
from typing import Optional, Tuple

from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue

# ========== Configuration ==========
QDRANT_HOST = "localhost"
QDRANT_PORT = 6333
# SECURITY: this key used to be hard-coded here. Any key that was committed
# must be treated as leaked and rotated. The value is not used by this script;
# the name is kept so that existing references to it keep working.
DASHSCOPE_API_KEY = os.environ.get("DASHSCOPE_API_KEY", "")

# Legacy collection name -> agent_id written onto the migrated points.
OLD_COLLECTIONS = {
    "mem0_v4_life": "life",  # Master Zhang's memories
    # More legacy collections can be added here.
}

# Target collection name.
NEW_COLLECTION = "mem0_global_v4"

# Embedding dimension (text-embedding-v4).
EMBEDDING_DIM = 1024

# Number of points fetched per scroll request and written per upsert request.
SCROLL_PAGE_SIZE = 256

# ========== Initialisation ==========
print("🔧 Mem0 Collection 迁移工具")
print("=" * 50)

client = QdrantClient(host=QDRANT_HOST, port=QDRANT_PORT)

# ========== Functions ==========


def _collection_exists(name: str) -> bool:
    """Return True when a collection called *name* is listed by the server."""
    return any(c.name == name for c in client.get_collections().collections)


def create_new_collection():
    """Create the shared target collection if it does not exist yet.

    Returns:
        True when the collection exists afterwards, False when the check or
        the creation raised (the error is printed, not propagated).
    """
    print(f"\n📦 检查 Collection: {NEW_COLLECTION}")

    try:
        if _collection_exists(NEW_COLLECTION):
            print("✅ Collection 已存在")
            return True

        print("⏳ 创建 Collection...")
        client.create_collection(
            collection_name=NEW_COLLECTION,
            vectors_config=VectorParams(size=EMBEDDING_DIM, distance=Distance.COSINE),
            on_disk_payload=True,
        )
        print("✅ Collection 创建成功")
        return True

    except Exception as e:
        print(f"❌ 创建失败:{e}")
        return False


def _tag_point(point, old_collection: str, agent_id: str) -> PointStruct:
    """Build the target point for *point*, adding the migration metadata."""
    # Copy so the record returned by scroll() is not mutated.
    payload = dict(point.payload or {})
    payload['agent_id'] = agent_id
    payload['migrated_from'] = old_collection
    payload['migrated_at'] = datetime.now().isoformat()
    # NOTE: the original id is kept, so two legacy collections that share a
    # point id would overwrite each other in the target collection.
    return PointStruct(id=point.id, vector=point.vector, payload=payload)


def _upsert_batch(points) -> int:
    """Write *points* to the target collection and return how many succeeded.

    The whole batch is sent in one request; if that fails, each point is
    retried on its own so one bad point does not sink the rest of the batch.
    """
    if not points:
        return 0

    try:
        client.upsert(collection_name=NEW_COLLECTION, points=points)
        return len(points)
    except Exception as e:
        print(f"⚠️ 批量写入失败,改为逐条重试:{e}")

    written = 0
    for new_point in points:
        try:
            client.upsert(collection_name=NEW_COLLECTION, points=[new_point])
            written += 1
        except Exception as e:
            print(f"⚠️ 迁移单条失败:{e}")
    return written


def _copy_collection(old_collection: str, agent_id: str) -> Tuple[int, Optional[int]]:
    """Copy one legacy collection into the target collection.

    Returns:
        ``(migrated, expected)`` where *migrated* is the number of points
        written and *expected* is the number of points counted in the source
        collection (0 when it is missing or empty, None when the count could
        not be obtained because of an error).
    """
    print(f"\n🔄 迁移 Collection: {old_collection} → {NEW_COLLECTION} (agent_id={agent_id})")

    migrated_count = 0
    expected = None

    try:
        if not _collection_exists(old_collection):
            print("⚠️ 旧 Collection 不存在,跳过")
            return 0, 0

        print("⏳ 读取旧数据...")
        expected = client.count(collection_name=old_collection, exact=True).count

        if not expected:
            print("⚠️ 旧 Collection 为空")
            return 0, 0

        print(f"📊 找到 {expected} 条记忆")
        print("⏳ 开始迁移...")

        # scroll() returns one page plus the offset of the next page; a single
        # call would silently drop everything beyond the first page.
        offset = None
        while True:
            records, offset = client.scroll(
                collection_name=old_collection,
                limit=SCROLL_PAGE_SIZE,
                offset=offset,
                with_payload=True,
                with_vectors=True,
            )

            batch = []
            for record in records:
                try:
                    batch.append(_tag_point(record, old_collection, agent_id))
                except Exception as e:
                    print(f"⚠️ 迁移单条失败:{e}")

            migrated_count += _upsert_batch(batch)

            if offset is None:
                break

        print(f"✅ 迁移完成:{migrated_count}/{expected} 条")

    except Exception as e:
        print(f"❌ 迁移失败:{e}")

    return migrated_count, expected


def migrate_collection(old_collection: str, agent_id: str):
    """Migrate a single legacy collection into the target collection.

    Args:
        old_collection: Name of the legacy collection to read from.
        agent_id: Value stored in the ``agent_id`` payload field of every
            migrated point.

    Returns:
        The number of points written to the target collection. Errors are
        printed rather than raised; points written before an error still
        count.
    """
    migrated_count, _ = _copy_collection(old_collection, agent_id)
    return migrated_count


def verify_migration():
    """Print point counts of the target collection, in total and per agent_id.

    Returns:
        True when the target collection exists and the counts could be read,
        False otherwise (the error is printed, not propagated).
    """
    print("\n🔍 验证迁移结果...")

    try:
        if not _collection_exists(NEW_COLLECTION):
            print("❌ 新 Collection 不存在")
            return False

        count_result = client.count(collection_name=NEW_COLLECTION)
        total_count = count_result.count if count_result else 0

        print(f"📊 新 Collection 总记忆数:{total_count}")

        # Per-agent breakdown; several legacy collections may share an agent_id.
        for agent_id in set(OLD_COLLECTIONS.values()):
            filter_result = client.count(
                collection_name=NEW_COLLECTION,
                count_filter=Filter(
                    must=[
                        FieldCondition(
                            key="agent_id",
                            match=MatchValue(value=agent_id),
                        )
                    ]
                ),
            )
            agent_count = filter_result.count if filter_result else 0
            print(f" - agent_id='{agent_id}': {agent_count} 条")

        return True

    except Exception as e:
        print(f"❌ 验证失败:{e}")
        return False


def cleanup_old_collections():
    """Delete the legacy collections after interactive confirmation.

    This is irreversible. Anything other than ``y``/``Y`` at the prompt
    cancels. A failure to delete one collection is printed and does not stop
    the remaining deletions.
    """
    print("\n⚠️ 清理旧 Collection...")
    print("⚠️ 警告:此操作不可逆!")

    response = input("确认删除旧 Collection? (y/N): ")
    if response.strip().lower() != 'y':
        print("❌ 已取消")
        return

    for old_collection in OLD_COLLECTIONS:
        try:
            print(f"🗑️ 删除 {old_collection}...")
            client.delete_collection(collection_name=old_collection)
            print("✅ 已删除")
        except Exception as e:
            print(f"❌ 删除失败:{e}")


# ========== Main flow ==========

def main():
    """Run the interactive migration: create, copy, verify, then clean up.

    Exits with status 0 when the user declines to start, and with status 1
    when the target collection cannot be created, verification fails, or any
    legacy collection was not copied completely (cleanup is skipped then).
    """
    print("\n📋 迁移计划:")
    print(f" 新 Collection: {NEW_COLLECTION}")
    print(f" 旧 Collection: {list(OLD_COLLECTIONS)}")
    print(f" Embedding 维度:{EMBEDDING_DIM}")

    response = input("\n开始迁移?(y/N): ")
    if response.strip().lower() != 'y':
        print("❌ 已取消")
        sys.exit(0)

    # 1. Create the target collection.
    if not create_new_collection():
        print("❌ 无法创建新 Collection,终止迁移")
        sys.exit(1)

    # 2. Copy the data, remembering which collections came up short.
    total_migrated = 0
    incomplete = []
    for old_collection, agent_id in OLD_COLLECTIONS.items():
        migrated, expected = _copy_collection(old_collection, agent_id)
        total_migrated += migrated
        if expected is None or migrated < expected:
            incomplete.append(old_collection)

    print(f"\n📊 总迁移:{total_migrated} 条记忆")

    # 3. Verify.
    if not verify_migration():
        print("❌ 验证失败,请手动检查")
        sys.exit(1)

    # Never offer to delete the source data when part of it was not copied.
    if incomplete:
        print(f"❌ 以下 Collection 未完整迁移,已跳过清理:{incomplete}")
        sys.exit(1)

    # 4. Clean up (optional, asks for confirmation).
    cleanup_old_collections()

    print("\n✅ 迁移完成!")
    print("\n📝 下一步:")
    print(" 1. 重启 OpenClaw Gateway")
    print(" 2. 测试记忆检索功能")
    print(" 3. 监控日志确认无异常")


if __name__ == "__main__":
    main()