🔒 You must be logged in as an Administrator or Editor to listen to this audio.
Python 中 DocumentProcessor 类逐行讲解
我给你逐行、超简单、大白话讲这段 Python 代码,完全不绕弯,新手也能看懂。
逐行讲解:DocumentProcessor 类
这是一个专门处理 LangChain 文档(Document)的工具类,作用是:清洗文本、过滤垃圾文档、记录处理轨迹。
1. 类定义 + 文档说明
class DocumentProcessor:
"""
LangChain Document专用处理器
"""
- 定义一个类,名字叫
DocumentProcessor - 注释:这是给 LangChain 文档 用的处理器
2. 初始化方法 init(创建对象时自动运行)
def __init__(self,
engine: Optional[CleaningEngine] = None,
config: Optional[Dict] = None):
self.engine = engine or CleaningEngine(config)
self.logger = logging.getLogger(self.__class__.__name__)
self.blacklist_rules: List[Callable[[LCDocument], bool] = []
self._init_blacklist()
self.lineage = LineageTracker()
逐行解释:
__init__:创建这个类的对象时,自动执行engine:文本清洗引擎(没有就自动新建一个)logger:日志工具,用来打印报错/提示blacklist_rules:黑名单规则列表,符合规则的文档会被扔掉_init_blacklist():加载预设的黑名单规则lineage:数据血缘追踪器,记录文档从哪来、被怎么处理过
3. 初始化黑名单规则
def _init_blacklist(self):
"""初始化黑名单规则"""
self.blacklist_rules = [
lambda doc: len(doc.page_content) < 10, # 文本太短
lambda doc: "内部废弃" in doc.page_content, # 包含废弃
lambda doc: doc.page_content.startswith("广告:"), # 广告开头
lambda doc: "草稿版" in doc.page_content and "正式版" not in doc, # 草稿无正式版
lambda doc: doc.metadata.get("status") == "deleted", # 元数据标记删除
]
- 给不需要的文档定义 5 条过滤规则
- 满足任意一条,文档就会被直接丢弃
4. 动态添加黑名单规则
def add_blacklist_rule(self, rule: Callable[[LCDocument], bool]):
"""动态添加黑名单规则"""
self.blacklist_rules.append(rule)
- 你可以随时新加一条过滤规则,很灵活
5. 判断文档是否要被过滤
def _should_filter(self, doc: LCDocument) -> Tuple[bool, str]:
"""检查是否应该过滤"""
for i, rule in enumerate(self.blacklist_rules):
try:
if rule(doc):
return True, f"rule_{i}"
except Exception as e:
self.logger.warning(f"Blacklist rule error: {e}")
return False, ""
- 一条一条检查黑名单
- 符合 → 返回
True(要过滤)+ 规则编号 - 不符合 → 返回
False(保留)
6. 生成“数据血缘”记录(处理轨迹)
def _generate_lineage(self, original_doc: LCDocument, result: CleanResult) -> DataLineage:
"""生成血缘记录"""
return DataLineage(
doc_id=original_doc.metadata.get("id", "unknown"),
transformation_type="document_cleaning",
input_hash=result.original_hash,
output_hash=hashlib.sha256(result.content.encode()).hexdigest()[:16],
cleaner_version=self.engine.VERSION,
rules_version="1.0",
timestamp=datetime.utcnow().isoformat(),
quality_metrics=result.metrics,
parent_ids=[]
)
- 记录:文档ID、清洗前后内容、时间、版本、质量指标
- 作用:可追溯、可审计,知道文档被怎么改过
7. 处理单个文档(核心方法)
def process(self, doc: LCDocument, options=None) -> Optional[CleanResult]:
"""处理单个Document"""
should_filter, reason = self._should_filter(doc)
if should_filter:
self.logger.debug(f"Document filtered: {reason}")
return None
result = self.engine.clean(
content=doc.page_content,
content_type_hint=doc.metadata.get("content_type"),
metadata={**doc.metadata, "processor": "DocumentProcessor"},
options=options
)
lineage = self._generate_lineage(doc, result)
self.lineage.record(lineage)
return result
核心流程 3 步:
- 先过滤:黑名单匹配 → 符合就扔掉
- 再清洗:调用清洗引擎清理文本
- 记录血缘:保存处理轨迹
- 返回清洗结果
8. 异步处理单个文档
async def process_async(...):
- 和上面
process功能一样 - 只是异步版本,不阻塞程序
9. 批量处理文档
def process_batch(self, docs: List[LCDocument], ...):
results = []
for doc in docs:
try:
result = self.process(doc, options)
results.append((doc, result))
except Exception as e:
报错日志...
return results
- 给一个文档列表,循环一个个处理
- 出错可以选择继续或停止
- 返回:原文档 + 处理结果
10. 批量异步处理(高性能)
async def process_batch_async(...):
# 先过滤
# 再批量清洗
# 最后记录血缘
- 先过滤掉垃圾文档
- 再批量异步清洗(速度快)
- 支持进度回调
11. 获取血缘报告 / 导出血缘
def get_lineage_report(self) -> Dict[str, Any]:
def export_lineage(self, filepath: str):
- 查看所有文档处理轨迹
- 导出成 JSON 文件
用一句话总结这个类
DocumentProcessor 是一个文档清洗 + 过滤 + 追踪工具:
- 过滤:太短、广告、废弃、草稿、已删除的文档直接扔掉
- 清洗:用清洗引擎清理文本
- 追踪:记录每篇文档的处理轨迹(血缘),可审计可回溯
- 支持同步/异步/批量处理
最简单的使用逻辑(你能马上看懂)
# 1. 创建处理器
processor = DocumentProcessor()
# 2. 处理一篇文档
result = processor.process(你的文档)
# 3. 被过滤 → result = None
# 没被过滤 → result = 清洗后的内容
总结
- 这是企业级 RAG 系统里非常标准的文档预处理类
- 核心功能:过滤垃圾 + 文本清洗 + 数据血缘
- 支持同步、异步、批量,非常健壮
- 代码结构清晰,可直接用于生产环境
核心流程图(极简版)及 20 行极简代码说明
- 核心流程图(极简版)
暂时无法在豆包文档外展示此内容 2. 20行极简代码(保留核心逻辑)
import logging
from typing import List, Callable, Optional
class DocumentProcessor:
def __init__(self):
self.blacklist = [lambda d: len(d.page_content)<10, lambda d:"广告:"in d.page_content]
self.logger = logging.getLogger(self.__class__.__name__)
def _should_filter(self, doc):
return any(rule(doc) for rule in self.blacklist)
def process(self, doc, clean_func):
if self._should_filter(doc):
self.logger.debug("Filtered")
return None
cleaned = clean_func(doc.page_content)
return {"content": cleaned, "hash": hash(cleaned)}
def process_batch(self, docs, clean_func):
return [(d, self.process(d, clean_func)) for d in docs]
极简版说明
- 保留:过滤 + 清洗 + 批量 三大核心
- 去掉:异步、血缘、复杂配置、异常捕获(便于理解)
- 外部传入 clean_func 即可使用