You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

196 lines
6.1 KiB

#!/usr/bin/env python3
"""
Layer 3 local search: the fallback used when Layer 4 (Qdrant) is unreachable.

Full-text retrieval built on SQLite FTS5, with zero extra memory overhead.

Usage:
    from local_search import LocalSearchFallback
    fb = LocalSearchFallback(agent_id='main')
    fb.rebuild_index()                  # rebuild the index (at startup or when MEMORY.md changes)
    results = fb.search("Qdrant 配置")  # search
"""
import sqlite3
import logging
import yaml
from pathlib import Path
from typing import List, Dict
logger = logging.getLogger(__name__)

# Workspace root; the agent registry file and the shared documents live
# directly underneath it.
WORKSPACE_ROOT = Path('/root/.openclaw/workspace')
AGENTS_YAML = WORKSPACE_ROOT / 'agents.yaml'

# Documents placed first in the path list by rebuild_index(), regardless of
# which agent the index is built for.
SHARED_PATHS = [
    WORKSPACE_ROOT / name
    for name in ('CORE_INDEX.md', 'IDENTITY.md', 'SOUL.md')
]
def _load_agent_memory_paths() -> Dict[str, List[Path]]:
    """Build the agent-id -> memory-paths mapping from agents.yaml.

    For every entry under the top-level ``agents`` mapping, the agent's
    ``workspace`` directory is probed for a ``MEMORY.md`` file and a
    ``memory`` sub-directory; whichever of the two exist are recorded.
    Agents without a usable ``workspace`` value, with a non-existent
    workspace, or with neither path present are omitted.

    Loading is best-effort: any failure while reading or parsing the file
    is logged at debug level and whatever was collected so far is kept.
    If no agent was collected at all, a single ``'main'`` entry pointing
    at ``MEMORY.md`` and ``memory`` under WORKSPACE_ROOT is returned
    (those two paths are not checked for existence here).

    Returns:
        Mapping of agent id to a list of paths (files and/or directories).
    """
    result: Dict[str, List[Path]] = {}
    try:
        with open(AGENTS_YAML, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f) or {}
        # ``agents:`` with no value parses to None; treat it like an empty mapping.
        for aid, agent in (data.get('agents') or {}).items():
            workspace = agent.get('workspace') if isinstance(agent, dict) else None
            if not isinstance(workspace, str) or not workspace:
                # Path('') is the current directory, which always exists;
                # never mistake the cwd for an agent workspace.
                continue
            ws = Path(workspace)
            if not ws.exists():
                continue
            paths = [p for p in (ws / 'MEMORY.md', ws / 'memory') if p.exists()]
            if paths:
                result[aid] = paths
    except Exception as e:
        # Deliberately broad: this runs at import time and must never
        # prevent the module from loading.
        logger.debug("Failed to load agents.yaml: %s", e)
    if not result:
        result['main'] = [WORKSPACE_ROOT / 'MEMORY.md', WORKSPACE_ROOT / 'memory']
    return result
# Agent id -> memory paths, computed once when this module is imported;
# nothing in this file refreshes it afterwards.
AGENT_MEMORY_PATHS = _load_agent_memory_paths()
def _is_cjk(ch: str) -> bool:
cp = ord(ch)
return (0x4E00 <= cp <= 0x9FFF
or 0x3400 <= cp <= 0x4DBF
or 0xF900 <= cp <= 0xFAFF)
def _tokenize_chinese(text: str) -> str:
    """Naive tokenizer for mixed Chinese / ASCII text.

    Every CJK character becomes a token of its own, each run of ASCII
    letters and digits is kept together as one token, and every other
    character (punctuation, whitespace, ...) only separates tokens and is
    dropped.

    Returns:
        The tokens joined by single spaces ('' when there are none).
    """
    pieces = []
    word_chars = []
    for char in text:
        if char.isalnum() and char.isascii():
            word_chars.append(char)
            continue
        # Anything that is not an ASCII letter/digit ends the current word.
        if word_chars:
            pieces.append(''.join(word_chars))
            word_chars = []
        if _is_cjk(char):
            pieces.append(char)
    # Flush a word that runs up to the end of the text.
    if word_chars:
        pieces.append(''.join(word_chars))
    return ' '.join(pieces)
class LocalSearchFallback:
    """Local full-text search over memory files, backed by SQLite FTS5.

    A single FTS5 table (``memory_fts``) holds one row per indexed file.
    Every method opens and closes its own short-lived connection, so an
    instance only carries the agent id and the database path.
    """

    def __init__(self, agent_id: str = 'main', db_path: str = None):
        """Prepare the index database, creating the table if necessary.

        Args:
            agent_id: Agent whose memory files are indexed.
            db_path: SQLite file to use. When ``None``, a per-agent cache
                directory is created and ``fts5_index.sqlite`` inside it
                is used.
        """
        self.agent_id = agent_id
        if db_path is None:
            cache_dir = Path(f'/root/.openclaw/agents/{agent_id}/qmd/xdg-cache/qmd')
            cache_dir.mkdir(parents=True, exist_ok=True)
            db_path = str(cache_dir / 'fts5_index.sqlite')
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        """Create the ``memory_fts`` virtual table if it does not exist yet."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute('''
                CREATE VIRTUAL TABLE IF NOT EXISTS memory_fts
                USING fts5(
                    title,
                    content,
                    source_path,
                    agent_id UNINDEXED,
                    tokenize='unicode61'
                )
            ''')
            conn.commit()
        finally:
            # Close even when table creation fails (e.g. FTS5 unavailable).
            conn.close()

    def rebuild_index(self):
        """Rebuild the index from the shared files and this agent's memory files.

        All existing rows are deleted and every readable file is inserted
        again; the commit happens once at the end. A file that cannot be
        read or inserted is logged at debug level and skipped.

        Returns:
            Number of files that were indexed.
        """
        paths = list(SHARED_PATHS)
        for p in AGENT_MEMORY_PATHS.get(self.agent_id, []):
            if p.is_file():
                paths.append(p)
            elif p.is_dir():
                paths.extend(p.rglob('*.md'))

        indexed = 0
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute('DELETE FROM memory_fts')
            for filepath in paths:
                if not filepath.exists():
                    continue
                try:
                    text = filepath.read_text(encoding='utf-8')
                    title = filepath.stem
                    tokenized = _tokenize_chinese(text)
                    conn.execute(
                        'INSERT INTO memory_fts(title, content, source_path, agent_id) VALUES (?, ?, ?, ?)',
                        (title, tokenized, str(filepath), self.agent_id)
                    )
                    indexed += 1
                except Exception as e:
                    # Best-effort per file: one bad file must not abort the rebuild.
                    logger.debug(f"索引文件失败 {filepath}: {e}")
            conn.commit()
        finally:
            # Always release the connection, even if DELETE or COMMIT raises.
            conn.close()
        logger.info(f"FTS5 索引重建完成: {indexed} 个文件 (agent={self.agent_id})")
        return indexed

    @staticmethod
    def _build_match_query(tokenized: str) -> str:
        """Turn space-separated tokens into a safe FTS5 MATCH expression.

        Each token is wrapped in double quotes so FTS5 reads it as a string
        literal and never as a query operator such as AND / OR / NOT.
        Tokens stay implicitly AND-ed. Returns '' when there are no tokens.
        """
        return ' '.join(
            '"' + token.replace('"', '""') + '"' for token in tokenized.split()
        )

    def search(self, query: str, top_k: int = 5) -> List[Dict]:
        """Run a full-text search and return the best matches.

        Args:
            query: Free text; tokenized the same way as the indexed content.
            top_k: Maximum number of rows to return.

        Returns:
            A list of dicts with keys ``title``, ``snippet``, ``source`` and
            ``score`` (the negated FTS5 rank, so larger is better), ordered
            best first. An empty list is returned when the query contains
            no searchable token or when the lookup fails.
        """
        match_expr = self._build_match_query(_tokenize_chinese(query))
        if not match_expr:
            # An empty MATCH expression is an FTS5 syntax error; there is
            # nothing to search for anyway.
            return []
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.execute(
                '''SELECT title, snippet(memory_fts, 1, '>>>', '<<<', '...', 64) as snippet,
                          source_path, rank
                   FROM memory_fts
                   WHERE memory_fts MATCH ?
                   ORDER BY rank
                   LIMIT ?''',
                (match_expr, top_k)
            )
            return [
                {
                    'title': title,
                    'snippet': snippet,
                    'source': source,
                    'score': -rank,
                }
                for title, snippet, source, rank in cursor
            ]
        except Exception as e:
            # Deliberately broad: this is a fallback path and callers get
            # "no results" rather than an exception.
            logger.debug(f"FTS5 检索失败: {e}")
            return []
        finally:
            conn.close()

    def get_stats(self) -> Dict:
        """Return the number of indexed rows and the database path."""
        conn = sqlite3.connect(self.db_path)
        try:
            row = conn.execute('SELECT COUNT(*) FROM memory_fts').fetchone()
            return {'indexed_documents': row[0] if row else 0, 'db_path': self.db_path}
        finally:
            conn.close()
if __name__ == '__main__':
    import sys

    # Command line: [agent_id] [query words ...]; the agent defaults to 'main'.
    cli_args = sys.argv[1:]
    agent = cli_args[0] if cli_args else 'main'

    fb = LocalSearchFallback(agent_id=agent)
    count = fb.rebuild_index()
    print(f"Indexed {count} files for agent '{agent}'")

    # Everything after the agent id is joined into a single query string.
    query_words = cli_args[1:]
    if query_words:
        for r in fb.search(' '.join(query_words)):
            print(f" [{r['score']:.2f}] {r['title']}: {r['snippet'][:100]}")