You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

245 lines
7.3 KiB

#!/usr/bin/env python3
"""
Mem0 collection migration script.
Migrates memories from the old multi-collection layout to a single merged
(global) collection.
Purpose:
- Move the memories stored in mem0_v4_life into mem0_global_v4
- Tag every migrated memory with an agent_id metadata field (plus
  migrated_from / migrated_at bookkeeping fields)
The script is interactive: it asks for confirmation before migrating and
again before deleting the old collections.
Before running:
1. Make sure Qdrant is up and healthy
2. Back up the existing data
3. Set the API key in the configuration section
"""
import os
import sys
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue
# ========== Configuration ==========
# Connection settings can be overridden with environment variables of the same
# name; the defaults match a local Qdrant instance.
QDRANT_HOST = os.environ.get("QDRANT_HOST", "localhost")
QDRANT_PORT = int(os.environ.get("QDRANT_PORT", "6333"))
# DashScope API key (standard billing channel). Secrets must never be
# hard-coded in source: a key that was ever committed to a repository has to be
# considered leaked and rotated. It is not used by the migration logic below.
DASHSCOPE_API_KEY = os.environ.get("DASHSCOPE_API_KEY", "")
# Legacy collections to migrate, mapped to the agent_id stamped on their points.
OLD_COLLECTIONS = {
    "mem0_v4_life": "life",  # memories of the 张大师 (Master Zhang) agent
    # Add further legacy collections here.
}
# Target (merged) collection.
NEW_COLLECTION = "mem0_global_v4"
# Embedding dimension of text-embedding-v4; used for the target collection.
EMBEDDING_DIM = 1024
# ========== Initialisation ==========
print("🔧 Mem0 Collection 迁移工具")
print("=" * 50)
client = QdrantClient(host=QDRANT_HOST, port=QDRANT_PORT)
# ========== 函数定义 ==========
def create_new_collection():
    """Make sure the global target collection exists.

    When NEW_COLLECTION is missing it is created with EMBEDDING_DIM-sized
    vectors, cosine distance and payloads stored on disk. An existing
    collection is left untouched.

    NOTE(review): an existing collection's vector size/distance is not checked
    against EMBEDDING_DIM.

    Returns:
        True when the collection exists or was created; False if a client call
        raised (the error is printed).
    """
    print(f"\n📦 检查 Collection: {NEW_COLLECTION}")
    try:
        names = [c.name for c in client.get_collections().collections]
        if NEW_COLLECTION in names:
            print("✅ Collection 已存在")
        else:
            print("⏳ 创建 Collection...")
            client.create_collection(
                collection_name=NEW_COLLECTION,
                vectors_config=VectorParams(size=EMBEDDING_DIM, distance=Distance.COSINE),
                on_disk_payload=True,
            )
            print("✅ Collection 创建成功")
        return True
    except Exception as e:
        print(f"❌ 创建失败:{e}")
        return False
def _tag_record(record, agent_id: str, old_collection: str, migrated_at: str) -> PointStruct:
    """Return a copy of *record* for NEW_COLLECTION with the migration tags added."""
    # Copy the payload so the record object returned by the client is not mutated.
    payload = dict(record.payload or {})
    payload['agent_id'] = agent_id
    payload['migrated_from'] = old_collection
    payload['migrated_at'] = migrated_at
    return PointStruct(id=record.id, vector=record.vector, payload=payload)


def _upsert_points(points: list) -> int:
    """Write *points* to NEW_COLLECTION and return how many were stored.

    The whole batch goes in one request. If that request fails, the batch is
    retried point by point, so a single bad point only loses itself.
    """
    if not points:
        return 0
    try:
        client.upsert(collection_name=NEW_COLLECTION, points=points)
        return len(points)
    except Exception as e:
        print(f" 批量写入失败,逐条重试:{e}")
    stored = 0
    for point in points:
        try:
            client.upsert(collection_name=NEW_COLLECTION, points=[point])
            stored += 1
        except Exception as e:
            print(f" 迁移单条失败:{e}")
    return stored


def migrate_collection(old_collection: str, agent_id: str, batch_size: int = 1000) -> int:
    """Copy every point of one legacy collection into NEW_COLLECTION.

    Each copy keeps the original point id and vector. Its payload is extended
    with three fields:
    - ``agent_id``
    - ``migrated_from``: the legacy collection name
    - ``migrated_at``: local time in ISO 8601 with UTC offset, taken once per page

    The legacy collection is read page by page and each page is written with
    a single upsert, so collections of any size are migrated completely.

    Args:
        old_collection: name of the legacy collection to read.
        agent_id: value stored in the ``agent_id`` payload field of every copy.
        batch_size: number of points fetched and written per round trip.

    Returns:
        Number of points written to NEW_COLLECTION. This is 0 when the legacy
        collection is missing or empty, or when a client call failed (the
        error is printed).

    Raises:
        ValueError: if *batch_size* is smaller than 1.

    Note:
        Point ids are reused as-is. If two legacy collections share an id, the
        one migrated later overwrites the earlier copy in NEW_COLLECTION.
    """
    from datetime import datetime

    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    print(f"\n🔄 迁移 Collection: {old_collection}{NEW_COLLECTION} (agent_id={agent_id})")
    try:
        # Skip legacy collections that do not exist.
        collections = client.get_collections().collections
        if not any(c.name == old_collection for c in collections):
            print(" 旧 Collection 不存在,跳过")
            return 0

        print("⏳ 读取旧数据...")
        total = client.count(collection_name=old_collection, exact=True).count
        if not total:
            print(" 旧 Collection 为空")
            return 0
        print(f"📊 找到 {total} 条记忆")

        print("⏳ 开始迁移...")
        migrated_count = 0
        read_count = 0
        offset = None
        while True:
            # scroll returns at most `limit` records plus the offset of the next
            # page (None when exhausted). The offset must be followed, otherwise
            # everything past the first page would be silently skipped.
            records, offset = client.scroll(
                collection_name=old_collection,
                limit=batch_size,
                offset=offset,
                with_payload=True,
                with_vectors=True,
            )
            read_count += len(records)
            migrated_at = datetime.now().astimezone().isoformat()
            points = []
            for record in records:
                try:
                    points.append(_tag_record(record, agent_id, old_collection, migrated_at))
                except Exception as e:
                    print(f" 迁移单条失败:{e}")
            migrated_count += _upsert_points(points)
            # The empty-page check guards against an endless loop if the server
            # ever returns a non-None offset together with no records.
            if offset is None or not records:
                break

        print(f"✅ 迁移完成:{migrated_count}/{read_count}")
        return migrated_count
    except Exception as e:
        print(f"❌ 迁移失败:{e}")
        return 0
def verify_migration():
    """Report point counts of the global collection, overall and per agent_id.

    The per-agent counts cover every distinct agent_id in OLD_COLLECTIONS.
    Counts are only printed; they are not compared against anything.

    Returns:
        False if NEW_COLLECTION does not exist or a client call raised (the
        error is printed); True otherwise.
    """
    print("\n🔍 验证迁移结果...")
    try:
        if all(c.name != NEW_COLLECTION for c in client.get_collections().collections):
            print("❌ 新 Collection 不存在")
            return False

        overall = client.count(collection_name=NEW_COLLECTION)
        print(f"📊 新 Collection 总记忆数:{overall.count if overall else 0}")

        for tag in {*OLD_COLLECTIONS.values()}:
            tag_filter = Filter(
                must=[FieldCondition(key="agent_id", match=MatchValue(value=tag))]
            )
            per_agent = client.count(collection_name=NEW_COLLECTION, count_filter=tag_filter)
            print(f" - agent_id='{tag}': {per_agent.count if per_agent else 0}")
        return True
    except Exception as e:
        print(f"❌ 验证失败:{e}")
        return False
def _is_fully_migrated(old_collection: str) -> bool:
    """Return True if NEW_COLLECTION holds at least as many points tagged
    ``migrated_from == old_collection`` as *old_collection* itself contains."""
    old_total = client.count(collection_name=old_collection, exact=True).count
    copied = client.count(
        collection_name=NEW_COLLECTION,
        count_filter=Filter(
            must=[FieldCondition(key="migrated_from", match=MatchValue(value=old_collection))]
        ),
        exact=True,
    ).count
    return copied >= old_total


def cleanup_old_collections():
    """Interactively delete the legacy collections listed in OLD_COLLECTIONS.

    This is irreversible. After a "y" confirmation, each legacy collection is
    deleted only when both of these hold:
    - it still exists;
    - NEW_COLLECTION already contains at least as many points whose
      ``migrated_from`` payload equals its name (the tag the migration writes).

    A collection that fails this check is kept, so a failed or partial
    migration cannot turn into data loss. Errors are printed, not raised.
    """
    print("\n 清理旧 Collection...")
    print(" 警告:此操作不可逆!")
    response = input("确认删除旧 Collection? (y/N): ")
    if response.strip().lower() != 'y':
        print("❌ 已取消")
        return
    try:
        existing = {c.name for c in client.get_collections().collections}
    except Exception as e:
        print(f"❌ 删除失败:{e}")
        return
    for old_collection in OLD_COLLECTIONS:
        if old_collection not in existing:
            print(f" {old_collection} 不存在,跳过")
            continue
        try:
            if not _is_fully_migrated(old_collection):
                print(f" {old_collection} 未完全迁移,保留不删除")
                continue
            print(f"🗑 删除 {old_collection}...")
            # delete_collection reports success as a boolean instead of raising.
            if client.delete_collection(collection_name=old_collection):
                print("✅ 已删除")
            else:
                print(f"❌ 删除失败:{old_collection}")
        except Exception as e:
            print(f"❌ 删除失败:{e}")
# ========== Main flow ==========
def main():
    """Run the interactive migration end to end.

    The steps are: confirm, create the target collection, migrate every
    legacy collection, verify, and optionally clean up. Cleanup is only
    offered when at least one memory was migrated.

    Exits with status 0 when the user declines to start. Exits with status 1
    when the target collection cannot be created or verification fails.
    """
    print("\n📋 迁移计划:")
    print(f" 新 Collection: {NEW_COLLECTION}")
    print(f" 旧 Collection: {list(OLD_COLLECTIONS.keys())}")
    print(f" Embedding 维度:{EMBEDDING_DIM}")
    response = input("\n开始迁移?(y/N): ")
    if response.strip().lower() != 'y':
        print("❌ 已取消")
        sys.exit(0)

    # 1. Create the target collection.
    if not create_new_collection():
        print("❌ 无法创建新 Collection,终止迁移")
        sys.exit(1)

    # 2. Migrate every legacy collection.
    total_migrated = sum(
        migrate_collection(old_collection, agent_id)
        for old_collection, agent_id in OLD_COLLECTIONS.items()
    )
    print(f"\n📊 总迁移:{total_migrated} 条记忆")

    # 3. Verify.
    if not verify_migration():
        print("❌ 验证失败,请手动检查")
        sys.exit(1)

    # 4. Optional cleanup. migrate_collection returns 0 both for empty sources
    # and for failures it swallows, so never offer deletion when nothing was copied.
    if total_migrated > 0:
        cleanup_old_collections()
    else:
        print("\n 未迁移任何记忆,跳过清理旧 Collection")

    print("\n✅ 迁移完成!")
    print("\n📝 下一步:")
    print(" 1. 重启 OpenClaw Gateway")
    print(" 2. 测试记忆检索功能")
    print(" 3. 监控日志确认无异常")


if __name__ == "__main__":
    main()