文档分析教程¶
高效处理大量文档集合。
你将学到什么¶
本教程涵盖处理大型文档集合的策略: - 批处理技术 - 并行化策略 - 错误处理和恢复 - 性能优化 - 内存管理
用例¶
用例 1:文档档案处理¶
处理数千份历史文档: - 法律文档档案 - 医疗记录 - 研究论文集合
用例 2:实时文档管道¶
持续处理传入文档: - 新闻源分析 - 社交媒体监控 - 邮件处理
用例 3:大规模分析¶
一次性处理海量数据集: - 企业文档仓库 - 政府档案 - 学术数据库
架构¶
document-analysis/
├── input/ # 输入文档
│ ├── pending/ # 待处理文档
│ ├── processing/ # 正在处理
│ └── completed/ # 成功处理
├── output/ # 提取的知识库
│ ├── batch_001/
│ ├── batch_002/
│ └── combined/
├── logs/ # 处理日志
├── batch_processor.py # 主处理器
└── config.yaml # 配置
策略¶
策略 1:简单批处理¶
处理固定大小的批次文档。
策略 2:并行处理¶
使用多个工作线程并发处理文档。
策略 3:流式处理¶
文档到达时立即处理,无需等待批次。
完整示例¶
"""批处理文档处理器。"""
import os
import json
import logging
from pathlib import Path
from datetime import datetime
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import dataclass
from typing import List, Optional
from hyperextract import Template
# 设置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('logs/processor.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
@dataclass
class ProcessingResult:
document: str
success: bool
output_path: Optional[str]
error: Optional[str]
processing_time: float
class BatchDocumentProcessor:
"""处理大量文档集合。"""
def __init__(
self,
template: str = "general/graph",
language: str = "en",
batch_size: int = 10,
max_workers: int = 4
):
self.template = template
self.language = language
self.batch_size = batch_size
self.max_workers = max_workers
# 确保目录存在
for dir_name in ['input/pending', 'input/processing', 'input/completed',
'output', 'logs']:
Path(dir_name).mkdir(parents=True, exist_ok=True)
def get_pending_documents(self) -> List[Path]:
"""获取待处理文档列表。"""
pending_dir = Path("input/pending")
docs = []
docs.extend(pending_dir.glob("**/*.md"))
docs.extend(pending_dir.glob("**/*.txt"))
return sorted(docs)
def process_single_document(self, doc_path: Path) -> ProcessingResult:
"""处理单个文档。"""
import time
start_time = time.time()
try:
# 移至处理中
processing_path = Path("input/processing") / doc_path.name
doc_path.rename(processing_path)
# 读取文档
logger.info(f"处理: {doc_path.name}")
text = processing_path.read_text(encoding="utf-8")
# 提取知识
ka = Template.create(self.template, self.language)
result = ka.parse(text)
# 构建索引
result.build_index()
# 保存输出
output_name = doc_path.stem
output_path = Path("output") / output_name
result.dump(str(output_path))
# 移至已完成
completed_path = Path("input/completed") / doc_path.name
processing_path.rename(completed_path)
processing_time = time.time() - start_time
logger.info(f"✓ 完成: {doc_path.name} ({processing_time:.2f}s)")
return ProcessingResult(
document=str(doc_path),
success=True,
output_path=str(output_path),
error=None,
processing_time=processing_time
)
except Exception as e:
processing_time = time.time() - start_time
logger.error(f"✗ 失败: {doc_path.name} - {str(e)}")
# 移回待处理以便重试
if processing_path.exists():
processing_path.rename(doc_path)
return ProcessingResult(
document=str(doc_path),
success=False,
output_path=None,
error=str(e),
processing_time=processing_time
)
def process_batch(self, documents: List[Path]) -> List[ProcessingResult]:
"""处理一批文档。"""
results = []
# 顺序处理(内存效率)
for doc in documents:
result = self.process_single_document(doc)
results.append(result)
return results
def process_parallel(self, documents: List[Path]) -> List[ProcessingResult]:
"""并行处理文档。"""
results = []
with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
# 提交所有任务
future_to_doc = {
executor.submit(self.process_single_document, doc): doc
for doc in documents
}
# 收集完成的结果
for future in as_completed(future_to_doc):
result = future.result()
results.append(result)
return results
def run(self, parallel: bool = False):
"""运行批处理器。"""
# 获取待处理文档
documents = self.get_pending_documents()
if not documents:
logger.info("没有待处理文档")
return []
logger.info(f"发现 {len(documents)} 个文档待处理")
# 分批处理
all_results = []
for i in range(0, len(documents), self.batch_size):
batch = documents[i:i + self.batch_size]
logger.info(f"处理批次 {i//self.batch_size + 1}: {len(batch)} 个文档")
if parallel:
results = self.process_parallel(batch)
else:
results = self.process_batch(batch)
all_results.extend(results)
# 记录批次摘要
successes = sum(1 for r in results if r.success)
logger.info(f"批次完成: {successes}/{len(batch)} 成功")
# 最终摘要
total_success = sum(1 for r in all_results if r.success)
logger.info(f"处理完成: {total_success}/{len(documents)} 成功")
# 保存结果日志
self._save_results_log(all_results)
return all_results
def _save_results_log(self, results: List[ProcessingResult]):
"""保存处理结果到日志文件。"""
log_data = [
{
"document": r.document,
"success": r.success,
"output_path": r.output_path,
"error": r.error,
"processing_time": r.processing_time,
"timestamp": datetime.now().isoformat()
}
for r in results
]
log_file = f"logs/results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(log_file, "w") as f:
json.dump(log_data, f, indent=2)
logger.info(f"结果日志已保存: {log_file}")
def combine_results(self, output_name: str = "combined"):
"""将所有单独结果合并为一个知识库。"""
logger.info("合并结果...")
output_dirs = [d for d in Path("output").iterdir() if d.is_dir()]
if not output_dirs:
logger.warning("未找到输出目录")
return
# 加载第一个结果
ka = Template.create(self.template, self.language)
combined = ka.load(str(output_dirs[0]))
# 合并剩余结果
for output_dir in output_dirs[1:]:
ka = Template.create(self.template, self.language)
ka.load(str(output_dir))
# 合并实体和关系
for entity in ka.data.entities:
if entity not in combined.data.entities:
combined.data.entities.append(entity)
for relation in ka.data.relations:
if relation not in combined.data.relations:
combined.data.relations.append(relation)
# 重建索引并保存
combined.build_index()
combined_path = Path("output") / output_name
combined.dump(str(combined_path))
logger.info(f"合并知识库已保存: {combined_path}")
return combined
# CLI 接口
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="批处理文档处理器")
parser.add_argument("--template", default="general/graph", help="使用的模板")
parser.add_argument("--language", default="en", help="文档语言")
parser.add_argument("--batch-size", type=int, default=10, help="批次大小")
parser.add_argument("--parallel", action="store_true", help="启用并行处理")
parser.add_argument("--combine", action="store_true", help="处理后合并结果")
args = parser.parse_args()
processor = BatchDocumentProcessor(
template=args.template,
language=args.language,
batch_size=args.batch_size
)
# 处理文档
results = processor.run(parallel=args.parallel)
# 如需要则合并
if args.combine:
processor.combine_results()
使用说明¶
设置¶
# 创建目录结构
mkdir -p input/pending input/processing input/completed output logs
# 添加待处理文档
cp /path/to/documents/*.md input/pending/
运行¶
# 顺序处理
python batch_processor.py
# 并行处理
python batch_processor.py --parallel --max-workers 4
# 使用自定义模板
python batch_processor.py --template finance/earnings_summary
# 处理并合并结果
python batch_processor.py --combine
性能提示¶
1. 批次大小¶
# 内存受限环境使用小批次
processor = BatchDocumentProcessor(batch_size=5)
# I/O 密集型处理使用大批次
processor = BatchDocumentProcessor(batch_size=50)
2. 并行工作线程¶
# CPU 密集型: 匹配 CPU 核心数
processor = BatchDocumentProcessor(max_workers=os.cpu_count())
# API 密集型: 限制以避免速率限制
processor = BatchDocumentProcessor(max_workers=5)
3. 内存管理¶
# 分小块处理
def process_large_dataset(documents, chunk_size=100):
for i in range(0, len(documents), chunk_size):
chunk = documents[i:i + chunk_size]
results = processor.process_batch(chunk)
# 保存中间结果
save_checkpoint(results, i)
# 清理内存
gc.collect()
错误处理¶
重试逻辑¶
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
def process_with_retry(doc_path):
return processor.process_single_document(doc_path)
失败文档队列¶
def process_with_fallback(documents):
failed = []
for doc in documents:
result = processor.process_single_document(doc)
if not result.success:
failed.append(doc)
# 使用不同设置重试失败文档
for doc in failed:
# 尝试使用更小的分块
result = processor.process_single_document(
doc,
chunk_size=1024 # 更小的分块
)
监控¶
进度跟踪¶
from tqdm import tqdm
def process_with_progress(documents):
results = []
with tqdm(total=len(documents), desc="处理中") as pbar:
for doc in documents:
result = processor.process_single_document(doc)
results.append(result)
pbar.update(1)
# 更新描述
success_rate = sum(1 for r in results if r.success) / len(results)
pbar.set_postfix({"成功率": f"{success_rate:.1%}"})
return results
指标收集¶
import time
from dataclasses import asdict
def collect_metrics(results: List[ProcessingResult]):
metrics = {
"总文档数": len(results),
"成功": sum(1 for r in results if r.success),
"失败": sum(1 for r in results if not r.success),
"平均处理时间": sum(r.processing_time for r in results) / len(results),
"总时间": sum(r.processing_time for r in results),
"错误": [r.error for r in results if r.error]
}
with open("logs/metrics.json", "w") as f:
json.dump(metrics, f, indent=2)
return metrics
高级:流式处理器¶
用于实时文档处理:
import asyncio
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class StreamingProcessor(FileSystemEventHandler):
"""文档到达时立即处理。"""
def __init__(self):
self.processor = BatchDocumentProcessor()
self.queue = asyncio.Queue()
def on_created(self, event):
if not event.is_directory:
self.queue.put_nowait(event.src_path)
async def process_queue(self):
while True:
file_path = await self.queue.get()
result = self.processor.process_single_document(Path(file_path))
if result.success:
logger.info(f"已处理: {file_path}")
else:
logger.error(f"失败: {file_path} - {result.error}")
self.queue.task_done()
def start(self, watch_dir: str = "input/pending"):
# 启动文件监视器
observer = Observer()
observer.schedule(self, watch_dir, recursive=True)
observer.start()
# 启动处理循环
asyncio.run(self.process_queue())
总结¶
您现在拥有一个完整的批处理系统,可以:
✓ 处理大型文档集合
✓ 处理错误和重试
✓ 并行运行以提高速度
✓ 监控进度和收集指标
✓ 将结果合并为统一的知识库
下一步¶
- 部署为服务
- 添加 REST API 接口
- 实现实时流式处理
- 使用分布式处理扩展