#!/usr/bin/env python3 """ Memory cleanup and audit script. 此脚本只操作 Qdrant,无需 LLM/Embedding API key。 Stats mode (default / --dry-run): python3 memory_cleanup.py --dry-run Cleanup mode (requires --execute): python3 memory_cleanup.py --execute Retention policy (aligned with EXPIRATION_MAP in mem0_client.py): session -> 7 days chat_summary -> 30 days preference -> never auto-delete knowledge -> never auto-delete 删除逻辑优先级: 1. payload 中存在 expiration_date 字段 → 按该字段判断是否过期 2. 无 expiration_date → 按 timestamp(写入时间)+ RETENTION_DAYS 判断 3. --max-age-days 可强制覆盖所有类型的阈值(用于紧急清理) """ import os import sys import argparse import logging import yaml from pathlib import Path from datetime import datetime, timedelta, timezone try: from qdrant_client import QdrantClient from qdrant_client.models import ( Filter, FieldCondition, MatchValue, PointIdsList, ) except ImportError: print("qdrant-client not installed") sys.exit(1) logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s') logger = logging.getLogger(__name__) QDRANT_HOST = os.getenv('MEM0_QDRANT_HOST', 'localhost') QDRANT_PORT = int(os.getenv('MEM0_QDRANT_PORT', '6333')) COLLECTION = 'mem0_v4_shared' RETENTION_DAYS = { 'session': 7, 'chat_summary': 30, } AUDIT_LOG_DIR = Path('/root/.openclaw/workspace/logs/security') def _load_agent_ids(): try: agents_yaml = Path('/root/.openclaw/workspace/agents.yaml') with open(agents_yaml, 'r', encoding='utf-8') as f: data = yaml.safe_load(f) or {} return list(data.get('agents', {}).keys()) + ['general'] except Exception: return ['main', 'general'] def get_stats(client: QdrantClient): """Print memory counts by agent_id, visibility, and memory_type.""" total = client.count(collection_name=COLLECTION) logger.info(f"Collection '{COLLECTION}' total: {total.count}") agent_ids = _load_agent_ids() for field, values in [ ('agent_id', agent_ids), ('visibility', ['public', 'project', 'private']), ('memory_type', ['session', 'chat_summary', 'preference', 'knowledge']), ]: logger.info(f"\n--- {field} ---") for val in values: try: result = client.count( collection_name=COLLECTION, count_filter=Filter(must=[ FieldCondition(key=field, match=MatchValue(value=val)) ]) ) if result.count > 0: logger.info(f" {field}={val}: {result.count}") except Exception: pass return total.count def _parse_dt(s: str) -> datetime: """将 ISO 字符串解析为 aware datetime(UTC)。""" s = s.replace('Z', '+00:00') dt = datetime.fromisoformat(s) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt def _find_expired_points(client: QdrantClient, memory_type: str, cutoff_days: int): """ 扫描指定 memory_type 的记录,返回已过期的 ID 列表。 优先级: 1. payload['expiration_date'] 存在 → 与当前时间比较 2. 不存在 → 用 payload['timestamp'] + cutoff_days 判断 """ now = datetime.now(timezone.utc) cutoff_by_age = now - timedelta(days=cutoff_days) expired_ids = [] offset = None while True: results, next_offset = client.scroll( collection_name=COLLECTION, scroll_filter=Filter(must=[ FieldCondition(key="memory_type", match=MatchValue(value=memory_type)) ]), limit=500, offset=offset, with_payload=True, with_vectors=False, ) if not results: break for point in results: payload = point.payload or {} # 优先读取 expiration_date 字段(mem0 写入时设置) exp_raw = (payload.get('expiration_date') or payload.get('data', {}).get('expiration_date')) if exp_raw: try: if now > _parse_dt(str(exp_raw)): expired_ids.append(point.id) except (TypeError, ValueError): pass continue # 有 expiration_date 就不再走 timestamp fallback # Fallback:按写入时间 + RETENTION_DAYS 判断 ts_raw = payload.get('timestamp') or payload.get('created_at', '') if not ts_raw: continue try: if _parse_dt(str(ts_raw)) < cutoff_by_age: expired_ids.append(point.id) except (TypeError, ValueError): continue if next_offset is None: break offset = next_offset return expired_ids def cleanup_expired(client: QdrantClient, max_age_days: int | None, execute: bool): """ 识别并可选删除过期的 session/chat_summary 记忆。 cutoff_days 逻辑: - max_age_days 为 None → 使用 RETENTION_DAYS 中的每类型默认值 - max_age_days 有值 → 强制覆盖所有类型(用于紧急清理) """ total_deleted = 0 results_summary = [] for memory_type, default_days in RETENTION_DAYS.items(): cutoff_days = max_age_days if max_age_days is not None else default_days logger.info(f"\nScanning memory_type={memory_type} (cutoff: {cutoff_days} days)...") expired_ids = _find_expired_points(client, memory_type, cutoff_days) if not expired_ids: logger.info(f" No expired {memory_type} memories found.") results_summary.append((memory_type, 0)) continue logger.info(f" Found {len(expired_ids)} expired {memory_type} memories") if execute: batch_size = 100 for i in range(0, len(expired_ids), batch_size): batch = expired_ids[i:i + batch_size] client.delete( collection_name=COLLECTION, points_selector=PointIdsList(points=batch), ) logger.info(f" DELETED {len(expired_ids)} {memory_type} memories") total_deleted += len(expired_ids) else: logger.info(f" [dry-run] Would delete {len(expired_ids)} {memory_type} memories") results_summary.append((memory_type, len(expired_ids))) return total_deleted, results_summary def _write_audit_log(total_before, total_deleted, results_summary, max_age_days, execute): AUDIT_LOG_DIR.mkdir(parents=True, exist_ok=True) log_file = AUDIT_LOG_DIR / f"memory-cleanup-{datetime.now().strftime('%Y-%m-%d')}.log" with open(log_file, 'a', encoding='utf-8') as f: f.write(f"\n{'='*60}\n") f.write(f"Memory Cleanup - {datetime.now().isoformat()}\n") f.write(f"Mode: {'EXECUTE' if execute else 'DRY-RUN'}\n") f.write(f"Max age: {max_age_days} days\n") f.write(f"Total before: {total_before}\n") for mtype, count in results_summary: f.write(f" {mtype}: {count} expired\n") f.write(f"Total deleted: {total_deleted}\n") f.write(f"Total after: {total_before - total_deleted}\n") logger.info(f"Audit log: {log_file}") def main(): parser = argparse.ArgumentParser(description='Mem0 memory cleanup and audit') parser.add_argument('--dry-run', action='store_true', help='Show stats and expired counts without deleting (default behavior)') parser.add_argument('--execute', action='store_true', help='Actually delete expired memories (requires this flag)') parser.add_argument('--max-age-days', type=int, default=None, help='强制覆盖所有类型的阈值(天数);不指定则按 RETENTION_DAYS 每类型策略') args = parser.parse_args() if args.execute and args.dry_run: logger.error("Cannot use --execute and --dry-run together") sys.exit(1) execute = args.execute if not execute: args.dry_run = True client = QdrantClient(host=QDRANT_HOST, port=QDRANT_PORT) logger.info("=" * 60) logger.info(f"Memory Cleanup - {datetime.now().strftime('%Y-%m-%d %H:%M')}") logger.info(f"Mode: {'EXECUTE' if execute else 'DRY-RUN (use --execute to delete)'}") if args.max_age_days is not None: logger.info(f"Max age override: {args.max_age_days} days (all types)") else: logger.info(f"Retention: session={RETENTION_DAYS['session']}d, " f"chat_summary={RETENTION_DAYS['chat_summary']}d, " f"preference=permanent, knowledge=permanent") logger.info("=" * 60) total_before = get_stats(client) total_deleted, results_summary = cleanup_expired( client, args.max_age_days, execute ) if execute and total_deleted > 0: logger.info(f"\nPost-cleanup stats:") get_stats(client) _write_audit_log(total_before, total_deleted, results_summary, args.max_age_days, execute) logger.info(f"\nSummary: {total_deleted} memories " f"{'deleted' if execute else 'would be deleted (dry-run)'}.") if __name__ == '__main__': main()