#!/usr/bin/env python3
"""
Mem0 collection migration script.

Migrates from the old multi-collection layout to the single merged
collection layout.

Purpose:
- Copy the memories stored in mem0_v4_life into mem0_global_v4
- Automatically add an agent_id metadata tag

Before running:
1. Make sure Qdrant is running normally
2. Back up the existing data
3. Set the API key in the configuration
"""

import os
import sys
from datetime import datetime

from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue

# ========== Configuration ==========
QDRANT_HOST = "localhost"
QDRANT_PORT = 6333
DASHSCOPE_API_KEY = os.getenv('MEM0_DASHSCOPE_API_KEY', os.getenv('DASHSCOPE_API_KEY', ''))

# Old collection name -> agent_id to tag its points with.
# (Already cleaned up; the structure is kept for future migrations.)
OLD_COLLECTIONS = {
    # "mem0_v4_life": "life",  # removed -- agent decommissioned
}

# New collection name
NEW_COLLECTION = "mem0_global_v4"

# Embedding dimension
EMBEDDING_DIM = 1024  # text-embedding-v4

# Number of points requested per scroll page while reading an old collection.
SCROLL_PAGE_SIZE = 1000

# ========== Initialisation ==========
print("🔧 Mem0 Collection 迁移工具")
print("=" * 50)

client = QdrantClient(host=QDRANT_HOST, port=QDRANT_PORT)


# ========== Helpers ==========
def _collection_exists(name: str) -> bool:
    """Return True when a collection called *name* is listed by the server."""
    return any(c.name == name for c in client.get_collections().collections)


def _iter_points(collection_name: str):
    """Yield every point of *collection_name*, following scroll pagination."""
    offset = None
    while True:
        page, offset = client.scroll(
            collection_name=collection_name,
            limit=SCROLL_PAGE_SIZE,
            offset=offset,
            with_payload=True,
            with_vectors=True,
        )
        yield from page
        # The client returns no next-page offset once the last page was read;
        # an empty page is treated as the end as well to avoid looping forever.
        if offset is None or not page:
            return


def _confirm(prompt: str) -> bool:
    """Ask a y/N question on stdin; only an answer of 'y'/'Y' confirms."""
    return input(prompt).strip().lower() == 'y'


# ========== Functions ==========
def create_new_collection() -> bool:
    """Create the new global collection unless it already exists.

    Returns:
        True when the collection exists or was created, False when any
        step raised (the error is printed, not propagated).
    """
    print(f"\n📦 检查 Collection: {NEW_COLLECTION}")

    try:
        if _collection_exists(NEW_COLLECTION):
            print("✅ Collection 已存在")
            return True

        print("⏳ 创建 Collection...")
        client.create_collection(
            collection_name=NEW_COLLECTION,
            vectors_config=VectorParams(size=EMBEDDING_DIM, distance=Distance.COSINE),
            on_disk_payload=True,
        )
        print("✅ Collection 创建成功")
        return True
    except Exception as e:
        print(f"❌ 创建失败:{e}")
        return False


def migrate_collection(old_collection: str, agent_id: str) -> int:
    """Copy all points of one old collection into NEW_COLLECTION.

    Each point keeps its id and vector; its payload gains ``agent_id``,
    ``migrated_from`` and ``migrated_at`` keys. The old collection is read
    page by page so collections larger than one scroll page are copied
    completely.

    Args:
        old_collection: Name of the collection to read from.
        agent_id: Value stored under the ``agent_id`` payload key.

    Returns:
        Number of points successfully written. Errors are printed, never
        raised: a failing point is skipped, and a failure of the overall
        read stops the migration and returns the count reached so far.
    """
    print(f"\n🔄 迁移 Collection: {old_collection} → {NEW_COLLECTION} (agent_id={agent_id})")

    migrated_count = 0
    try:
        # Check that the old collection exists
        if not _collection_exists(old_collection):
            print("⚠️ 旧 Collection 不存在,跳过")
            return 0

        print("⏳ 读取旧数据...")
        count_result = client.count(collection_name=old_collection)
        total = count_result.count if count_result else 0
        if not total:
            print("⚠️ 旧 Collection 为空")
            return 0

        print(f"📊 找到 {total} 条记忆")

        # Convert and migrate
        print("⏳ 开始迁移...")
        seen = 0
        for point in _iter_points(old_collection):
            seen += 1
            try:
                # Start from the original payload and add the migration tags
                payload = point.payload or {}
                payload['agent_id'] = agent_id
                payload['migrated_from'] = old_collection
                payload['migrated_at'] = datetime.now().isoformat()

                new_point = PointStruct(
                    id=point.id,
                    vector=point.vector,
                    payload=payload,
                )

                # Upload to the new collection
                client.upsert(
                    collection_name=NEW_COLLECTION,
                    points=[new_point],
                )
                migrated_count += 1
            except Exception as e:
                # Best effort: report the failing point and keep going
                print(f"⚠️ 迁移单条失败:{e}")
                continue

        print(f"✅ 迁移完成:{migrated_count}/{seen} 条")
        return migrated_count
    except Exception as e:
        print(f"❌ 迁移失败:{e}")
        # Points written before the failure are already in the new collection
        return migrated_count


def verify_migration() -> bool:
    """Print point counts of NEW_COLLECTION, in total and per agent_id.

    Returns:
        True when the counts could be read, False when the new collection
        is missing or a request raised (the error is printed).
    """
    print("\n🔍 验证迁移结果...")

    try:
        if not _collection_exists(NEW_COLLECTION):
            print("❌ 新 Collection 不存在")
            return False

        # Total number of points
        count_result = client.count(collection_name=NEW_COLLECTION)
        total_count = count_result.count if count_result else 0
        print(f"📊 新 Collection 总记忆数:{total_count}")

        # Count per agent_id
        for agent_id in set(OLD_COLLECTIONS.values()):
            filter_result = client.count(
                collection_name=NEW_COLLECTION,
                count_filter=Filter(
                    must=[
                        FieldCondition(
                            key="agent_id",
                            match=MatchValue(value=agent_id),
                        )
                    ]
                ),
            )
            agent_count = filter_result.count if filter_result else 0
            print(f" - agent_id='{agent_id}': {agent_count} 条")

        return True
    except Exception as e:
        print(f"❌ 验证失败:{e}")
        return False


def cleanup_old_collections() -> None:
    """Delete every collection named in OLD_COLLECTIONS after confirmation.

    Use with care: the deletion cannot be undone. A failure to delete one
    collection is printed and does not stop the remaining deletions.
    """
    print("\n⚠️ 清理旧 Collection...")
    print("⚠️ 警告:此操作不可逆!")

    if not _confirm("确认删除旧 Collection? (y/N): "):
        print("❌ 已取消")
        return

    for old_collection in OLD_COLLECTIONS:
        try:
            print(f"🗑️ 删除 {old_collection}...")
            client.delete_collection(collection_name=old_collection)
            print("✅ 已删除")
        except Exception as e:
            print(f"❌ 删除失败:{e}")


# ========== Main flow ==========
def main() -> None:
    """Run the interactive migration: create, migrate, verify, clean up.

    Exits with status 0 when the user declines to start, and with status 1
    when the new collection cannot be created or verification fails.
    """
    print("\n📋 迁移计划:")
    print(f" 新 Collection: {NEW_COLLECTION}")
    print(f" 旧 Collection: {list(OLD_COLLECTIONS.keys())}")
    print(f" Embedding 维度:{EMBEDDING_DIM}")

    if not _confirm("\n开始迁移?(y/N): "):
        print("❌ 已取消")
        sys.exit(0)

    # 1. Create the new collection
    if not create_new_collection():
        print("❌ 无法创建新 Collection,终止迁移")
        sys.exit(1)

    # 2. Migrate the data
    total_migrated = 0
    for old_collection, agent_id in OLD_COLLECTIONS.items():
        total_migrated += migrate_collection(old_collection, agent_id)

    print(f"\n📊 总迁移:{total_migrated} 条记忆")

    # 3. Verify
    if not verify_migration():
        print("❌ 验证失败,请手动检查")
        sys.exit(1)

    # 4. Clean up (optional, asks for its own confirmation)
    cleanup_old_collections()

    print("\n✅ 迁移完成!")
    print("\n📝 下一步:")
    print(" 1. 重启 OpenClaw Gateway")
    print(" 2. 测试记忆检索功能")
    print(" 3. 监控日志确认无异常")


if __name__ == "__main__":
    main()