Python 中 DocumentProcessor 类逐行讲解

🔒 You must be logged in as an Administrator or Editor to listen to this audio.

Python 中 DocumentProcessor 类逐行讲解

我给你逐行、超简单、大白话讲这段 Python 代码,完全不绕弯,新手也能看懂。

逐行讲解:DocumentProcessor 类

这是一个专门处理 LangChain 文档(Document)的工具类,作用是:清洗文本、过滤垃圾文档、记录处理轨迹。


1. 类定义 + 文档说明

class DocumentProcessor:
    """
    LangChain Document专用处理器
    """
  • 定义一个类,名字叫 DocumentProcessor
  • 注释:这是给 LangChain 文档 用的处理器

2. 初始化方法 init(创建对象时自动运行)

def __init__(self,
             engine: Optional[CleaningEngine] = None,
             config: Optional[Dict] = None):
    self.engine = engine or CleaningEngine(config)
    self.logger = logging.getLogger(self.__class__.__name__)
    self.blacklist_rules: List[Callable[[LCDocument], bool] = []
    self._init_blacklist()
    self.lineage = LineageTracker()

逐行解释:

  1. __init__:创建这个类的对象时,自动执行
  2. engine:文本清洗引擎(没有就自动新建一个)
  3. logger:日志工具,用来打印报错/提示
  4. blacklist_rules黑名单规则列表,符合规则的文档会被扔掉
  5. _init_blacklist():加载预设的黑名单规则
  6. lineage数据血缘追踪器,记录文档从哪来、被怎么处理过

3. 初始化黑名单规则

def _init_blacklist(self):
    """初始化黑名单规则"""
    self.blacklist_rules = [
        lambda doc: len(doc.page_content) < 10,  # 文本太短
        lambda doc: "内部废弃" in doc.page_content,  # 包含废弃
        lambda doc: doc.page_content.startswith("广告:"),  # 广告开头
        lambda doc: "草稿版" in doc.page_content and "正式版" not in doc,  # 草稿无正式版
        lambda doc: doc.metadata.get("status") == "deleted",  # 元数据标记删除
    ]
  • 不需要的文档定义 5 条过滤规则
  • 满足任意一条,文档就会被直接丢弃

4. 动态添加黑名单规则

def add_blacklist_rule(self, rule: Callable[[LCDocument], bool]):
    """动态添加黑名单规则"""
    self.blacklist_rules.append(rule)
  • 你可以随时新加一条过滤规则,很灵活

5. 判断文档是否要被过滤

def _should_filter(self, doc: LCDocument) -> Tuple[bool, str]:
    """检查是否应该过滤"""
    for i, rule in enumerate(self.blacklist_rules):
        try:
            if rule(doc):
                return True, f"rule_{i}"
        except Exception as e:
            self.logger.warning(f"Blacklist rule error: {e}")
    return False, ""
  • 一条一条检查黑名单
  • 符合 → 返回 True(要过滤)+ 规则编号
  • 不符合 → 返回 False(保留)

6. 生成“数据血缘”记录(处理轨迹)

def _generate_lineage(self, original_doc: LCDocument, result: CleanResult) -> DataLineage:
    """生成血缘记录"""
    return DataLineage(
        doc_id=original_doc.metadata.get("id", "unknown"),
        transformation_type="document_cleaning",
        input_hash=result.original_hash,
        output_hash=hashlib.sha256(result.content.encode()).hexdigest()[:16],
        cleaner_version=self.engine.VERSION,
        rules_version="1.0",
        timestamp=datetime.utcnow().isoformat(),
        quality_metrics=result.metrics,
        parent_ids=[]
    )
  • 记录:文档ID、清洗前后内容、时间、版本、质量指标
  • 作用:可追溯、可审计,知道文档被怎么改过

7. 处理单个文档(核心方法)

def process(self, doc: LCDocument, options=None) -> Optional[CleanResult]:
    """处理单个Document"""
    should_filter, reason = self._should_filter(doc)
    if should_filter:
        self.logger.debug(f"Document filtered: {reason}")
        return None

    result = self.engine.clean(
        content=doc.page_content,
        content_type_hint=doc.metadata.get("content_type"),
        metadata={**doc.metadata, "processor": "DocumentProcessor"},
        options=options
    )

    lineage = self._generate_lineage(doc, result)
    self.lineage.record(lineage)

    return result

核心流程 3 步:

  1. 先过滤:黑名单匹配 → 符合就扔掉
  2. 再清洗:调用清洗引擎清理文本
  3. 记录血缘:保存处理轨迹
  4. 返回清洗结果

8. 异步处理单个文档

async def process_async(...):
  • 和上面 process 功能一样
  • 只是异步版本,不阻塞程序

9. 批量处理文档

def process_batch(self, docs: List[LCDocument], ...):
    results = []
    for doc in docs:
        try:
            result = self.process(doc, options)
            results.append((doc, result))
        except Exception as e:
            报错日志...
    return results
  • 给一个文档列表,循环一个个处理
  • 出错可以选择继续或停止
  • 返回:原文档 + 处理结果

10. 批量异步处理(高性能)

async def process_batch_async(...):
    # 先过滤
    # 再批量清洗
    # 最后记录血缘
  • 先过滤掉垃圾文档
  • 批量异步清洗(速度快)
  • 支持进度回调

11. 获取血缘报告 / 导出血缘

def get_lineage_report(self) -> Dict[str, Any]:
def export_lineage(self, filepath: str):
  • 查看所有文档处理轨迹
  • 导出成 JSON 文件

用一句话总结这个类

DocumentProcessor 是一个文档清洗 + 过滤 + 追踪工具:

  1. 过滤:太短、广告、废弃、草稿、已删除的文档直接扔掉
  2. 清洗:用清洗引擎清理文本
  3. 追踪:记录每篇文档的处理轨迹(血缘),可审计可回溯
  4. 支持同步/异步/批量处理

最简单的使用逻辑(你能马上看懂)

# 1. 创建处理器
processor = DocumentProcessor()

# 2. 处理一篇文档
result = processor.process(你的文档)

# 3. 被过滤 → result = None
# 没被过滤 → result = 清洗后的内容

总结

  • 这是企业级 RAG 系统里非常标准的文档预处理类
  • 核心功能:过滤垃圾 + 文本清洗 + 数据血缘
  • 支持同步、异步、批量,非常健壮
  • 代码结构清晰,可直接用于生产环境

核心流程图(极简版)及 20 行极简代码说明

  1. 核心流程图(极简版)

暂时无法在豆包文档外展示此内容 2. 20行极简代码(保留核心逻辑)

import logging
from typing import List, Callable, Optional

class DocumentProcessor:
    def __init__(self):
        self.blacklist = [lambda d: len(d.page_content)<10, lambda d:"广告:"in d.page_content]
        self.logger = logging.getLogger(self.__class__.__name__)

    def _should_filter(self, doc):
        return any(rule(doc) for rule in self.blacklist)

    def process(self, doc, clean_func):
        if self._should_filter(doc):
            self.logger.debug("Filtered")
            return None
        cleaned = clean_func(doc.page_content)
        return {"content": cleaned, "hash": hash(cleaned)}

    def process_batch(self, docs, clean_func):
        return [(d, self.process(d, clean_func)) for d in docs]

极简版说明

  • 保留:过滤 + 清洗 + 批量 三大核心
  • 去掉:异步、血缘、复杂配置、异常捕获(便于理解)
  • 外部传入 clean_func 即可使用