Skip to content

Document Analysis Tutorial

Process large collections of documents efficiently.


What You'll Learn

This tutorial covers strategies for processing large document collections:

- Batch processing techniques
- Parallelization strategies
- Error handling and recovery
- Performance optimization
- Memory management


Use Cases

Use Case 1: Document Archive Processing

Process thousands of historical documents:

- Legal document archives
- Medical records
- Research paper collections

Use Case 2: Real-time Document Pipeline

Continuous processing of incoming documents:

- News feed analysis
- Social media monitoring
- Email processing

Use Case 3: Large-scale Analysis

One-time processing of massive datasets:

- Corporate document repositories
- Government archives
- Academic databases


Architecture

document-analysis/
├── input/              # Input documents
│   ├── pending/       # Documents to process
│   ├── processing/    # Currently processing
│   └── completed/     # Successfully processed
├── output/            # Extracted knowledge abstracts
│   ├── batch_001/
│   ├── batch_002/
│   └── combined/
├── logs/              # Processing logs
├── batch_processor.py # Main processor
└── config.yaml        # Configuration

Strategies

Strategy 1: Simple Batching

Process documents in fixed-size batches.

Strategy 2: Parallel Processing

Use multiple workers to process documents concurrently.

Strategy 3: Streaming

Process documents as they arrive without waiting for batches.


Complete Example

"""Batch Document Processor."""

import json
import logging
import os
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import List, Optional

from hyperextract import Template

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('logs/processor.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


@dataclass
class ProcessingResult:
    """Outcome of processing a single document."""

    # Original path of the source document (as found under input/pending)
    document: str
    # True when extraction completed and the file reached input/completed
    success: bool
    # Directory where the extracted knowledge was dumped; None on failure
    output_path: Optional[str]
    # Stringified exception on failure; None on success
    error: Optional[str]
    # Wall-clock seconds spent on this document (measured even on failure)
    processing_time: float


class BatchDocumentProcessor:
    """Process large collections of documents.

    Documents flow through a filesystem state machine:
    ``input/pending`` -> ``input/processing`` -> ``input/completed``,
    with extraction results written under ``output/``.
    """

    def __init__(
        self,
        template: str = "general/graph",
        language: str = "en",
        batch_size: int = 10,
        max_workers: int = 4
    ):
        """Configure the processor.

        Args:
            template: Template name passed to ``Template.create()``.
            language: Language code passed to ``Template.create()``.
            batch_size: Number of documents handled per batch in ``run()``.
            max_workers: Worker processes used by ``process_parallel()``.
        """
        self.template = template
        self.language = language
        self.batch_size = batch_size
        self.max_workers = max_workers

        # Create the working-directory layout up front so later renames
        # and writes cannot fail on a missing directory.
        for dir_name in ['input/pending', 'input/processing', 'input/completed',
                        'output', 'logs']:
            Path(dir_name).mkdir(parents=True, exist_ok=True)

    def get_pending_documents(self) -> List[Path]:
        """Return all pending .md/.txt documents, sorted for a stable order."""
        pending_dir = Path("input/pending")
        docs = []
        docs.extend(pending_dir.glob("**/*.md"))
        docs.extend(pending_dir.glob("**/*.txt"))
        return sorted(docs)

    def process_single_document(self, doc_path: Path) -> ProcessingResult:
        """Process one document end to end.

        Moves the file to input/processing, runs extraction, dumps the
        result under output/, then moves the file to input/completed.
        On failure the file is moved back to its original path so it can
        be retried. Never raises; failures are reported through
        ``ProcessingResult.error``.
        """
        start_time = time.time()

        # Compute the target path before entering the try block so the
        # except handler can always reference it.  Previously it was
        # assigned inside the try, which raised UnboundLocalError
        # whenever the initial rename itself failed.
        processing_path = Path("input/processing") / doc_path.name

        try:
            # Move to processing
            doc_path.rename(processing_path)

            # Read document
            logger.info(f"Processing: {doc_path.name}")
            text = processing_path.read_text(encoding="utf-8")

            # Extract knowledge
            ka = Template.create(self.template, self.language)
            result = ka.parse(text)

            # Build index
            result.build_index()

            # Save output.  NOTE(review): output is keyed by the document
            # stem, so two documents with the same stem (even in different
            # subdirectories) overwrite each other's output — confirm
            # inputs have unique stems.
            output_name = doc_path.stem
            output_path = Path("output") / output_name
            result.dump(str(output_path))

            # Move to completed
            completed_path = Path("input/completed") / doc_path.name
            processing_path.rename(completed_path)

            processing_time = time.time() - start_time
            logger.info(f"✓ Completed: {doc_path.name} ({processing_time:.2f}s)")

            return ProcessingResult(
                document=str(doc_path),
                success=True,
                output_path=str(output_path),
                error=None,
                processing_time=processing_time
            )

        except Exception as e:
            processing_time = time.time() - start_time
            logger.error(f"✗ Failed: {doc_path.name} - {str(e)}")

            # Move back to pending for retry
            if processing_path.exists():
                processing_path.rename(doc_path)

            return ProcessingResult(
                document=str(doc_path),
                success=False,
                output_path=None,
                error=str(e),
                processing_time=processing_time
            )

    def process_batch(self, documents: List[Path]) -> List[ProcessingResult]:
        """Process documents sequentially (lowest memory footprint)."""
        results = []

        for doc in documents:
            results.append(self.process_single_document(doc))

        return results

    def process_parallel(self, documents: List[Path]) -> List[ProcessingResult]:
        """Process documents concurrently in worker processes.

        NOTE(review): ProcessPoolExecutor pickles the bound method (and
        therefore ``self``) for each task, so the class must be
        importable and its attributes picklable in worker processes —
        confirm before deploying.
        """
        results = []

        with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            future_to_doc = {
                executor.submit(self.process_single_document, doc): doc
                for doc in documents
            }

            # Collect results as they complete (completion order, not
            # submission order)
            for future in as_completed(future_to_doc):
                results.append(future.result())

        return results

    def run(self, parallel: bool = False):
        """Process every pending document in batches of ``batch_size``.

        Args:
            parallel: When True, each batch is processed with
                ``process_parallel()`` instead of sequentially.

        Returns:
            List of ProcessingResult, one per document (empty list when
            nothing is pending).
        """
        documents = self.get_pending_documents()

        if not documents:
            logger.info("No pending documents to process")
            return []

        logger.info(f"Found {len(documents)} documents to process")

        all_results = []

        for i in range(0, len(documents), self.batch_size):
            batch = documents[i:i + self.batch_size]
            logger.info(f"Processing batch {i//self.batch_size + 1}: {len(batch)} documents")

            if parallel:
                results = self.process_parallel(batch)
            else:
                results = self.process_batch(batch)

            all_results.extend(results)

            # Log batch summary
            successes = sum(1 for r in results if r.success)
            logger.info(f"Batch complete: {successes}/{len(batch)} successful")

        # Final summary
        total_success = sum(1 for r in all_results if r.success)
        logger.info(f"Processing complete: {total_success}/{len(documents)} successful")

        # Persist a machine-readable record of this run
        self._save_results_log(all_results)

        return all_results

    def _save_results_log(self, results: List[ProcessingResult]):
        """Write per-document results to a timestamped JSON file in logs/."""
        log_data = [
            {
                "document": r.document,
                "success": r.success,
                "output_path": r.output_path,
                "error": r.error,
                "processing_time": r.processing_time,
                "timestamp": datetime.now().isoformat()
            }
            for r in results
        ]

        log_file = f"logs/results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(log_file, "w") as f:
            json.dump(log_data, f, indent=2)

        logger.info(f"Results log saved: {log_file}")

    def combine_results(self, output_name: str = "combined"):
        """Merge every per-document result into one knowledge abstract.

        Deduplicates entities and relations by equality, rebuilds the
        index, and dumps the merged abstract to ``output/<output_name>``.

        NOTE(review): the ``not in`` membership checks make the merge
        O(n*m); fine for moderate sizes, revisit for very large runs.
        A second invocation will also pick up the previous combined
        output directory itself — confirm that is intended.
        """
        logger.info("Combining results...")

        output_dirs = [d for d in Path("output").iterdir() if d.is_dir()]

        if not output_dirs:
            logger.warning("No output directories found")
            return

        # Load the first result as the merge base
        ka = Template.create(self.template, self.language)
        combined = ka.load(str(output_dirs[0]))

        # Merge remaining
        for output_dir in output_dirs[1:]:
            ka = Template.create(self.template, self.language)
            ka.load(str(output_dir))

            # Merge entities and relations, skipping duplicates
            for entity in ka.data.entities:
                if entity not in combined.data.entities:
                    combined.data.entities.append(entity)

            for relation in ka.data.relations:
                if relation not in combined.data.relations:
                    combined.data.relations.append(relation)

        # Rebuild index and save
        combined.build_index()
        combined_path = Path("output") / output_name
        combined.dump(str(combined_path))

        logger.info(f"Combined knowledge abstract saved: {combined_path}")
        return combined


# CLI Interface
# CLI Interface
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Batch Document Processor")
    parser.add_argument("--template", default="general/graph", help="Template to use")
    parser.add_argument("--language", default="en", help="Document language")
    parser.add_argument("--batch-size", type=int, default=10, help="Batch size")
    # This flag is documented in the usage section but was previously
    # never defined, so `--parallel` always ran with the constructor's
    # default worker count.
    parser.add_argument("--max-workers", type=int, default=4,
                        help="Worker processes for parallel processing")
    parser.add_argument("--parallel", action="store_true", help="Enable parallel processing")
    parser.add_argument("--combine", action="store_true", help="Combine results after processing")

    args = parser.parse_args()

    processor = BatchDocumentProcessor(
        template=args.template,
        language=args.language,
        batch_size=args.batch_size,
        max_workers=args.max_workers
    )

    # Process documents
    results = processor.run(parallel=args.parallel)

    # Combine if requested
    if args.combine:
        processor.combine_results()

Usage

Setup

# Create directory structure
mkdir -p input/pending input/processing input/completed output logs

# Add documents to process
cp /path/to/documents/*.md input/pending/

Run

# Sequential processing
python batch_processor.py

# Parallel processing
python batch_processor.py --parallel --max-workers 4

# With custom template
python batch_processor.py --template finance/earnings_summary

# Process and combine results
python batch_processor.py --combine

Performance Tips

1. Batch Size

# Small batches for memory-constrained environments
processor = BatchDocumentProcessor(batch_size=5)

# Larger batches for I/O bound processing
processor = BatchDocumentProcessor(batch_size=50)

2. Parallel Workers

# CPU-bound: Match CPU cores
processor = BatchDocumentProcessor(max_workers=os.cpu_count())

# API-bound: Limit to avoid rate limits
processor = BatchDocumentProcessor(max_workers=5)

3. Memory Management

# Process in smaller chunks
def process_large_dataset(documents, chunk_size=100):
    """Process *documents* in chunks of *chunk_size* to bound peak memory."""
    import gc  # local import: gc was used here but never imported

    for start in range(0, len(documents), chunk_size):
        chunk = documents[start:start + chunk_size]
        results = processor.process_batch(chunk)

        # Save intermediate results so a crash loses at most one chunk
        save_checkpoint(results, start)

        # Force a collection between chunks to release processed data
        gc.collect()

Error Handling

Retry Logic

from tenacity import retry, stop_after_attempt, wait_exponential

# Retry up to 3 times with exponential backoff bounded to 4-10 seconds.
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
def process_with_retry(doc_path):
    """Process one document, retrying transient failures.

    NOTE(review): process_single_document catches all exceptions and
    returns a ProcessingResult instead of raising, so tenacity may never
    see a failure and therefore never retry — confirm it actually raises
    before relying on this wrapper.
    """
    return processor.process_single_document(doc_path)

Failed Document Queue

def process_with_fallback(documents):
    """Process documents, then retry any failures a second time.

    Returns:
        The list of documents that still failed after the retry pass.
    """
    failed = []

    for doc in documents:
        result = processor.process_single_document(doc)
        if not result.success:
            failed.append(doc)

    # Retry failed documents once more.
    # NOTE: the original passed chunk_size=1024 here, but
    # process_single_document() accepts no such parameter, so the retry
    # loop raised TypeError before retrying anything.
    still_failed = []
    for doc in failed:
        result = processor.process_single_document(doc)
        if not result.success:
            still_failed.append(doc)

    return still_failed

Monitoring

Progress Tracking

from tqdm import tqdm

def process_with_progress(documents):
    """Process *documents* sequentially while showing a live progress bar."""
    outcomes = []

    with tqdm(total=len(documents), desc="Processing") as bar:
        for doc in documents:
            outcomes.append(processor.process_single_document(doc))
            bar.update(1)

            # Show the running success rate next to the bar
            ok = sum(1 for r in outcomes if r.success)
            bar.set_postfix({"success_rate": f"{ok / len(outcomes):.1%}"})

    return outcomes

Metrics Collection

import json
from pathlib import Path

def collect_metrics(results: "List[ProcessingResult]"):
    """Summarise a processing run and persist it to logs/metrics.json.

    Args:
        results: Per-document processing results.

    Returns:
        The metrics dict.  An empty *results* list yields zeroed metrics
        instead of raising ZeroDivisionError (the original divided by
        len(results) unconditionally).
    """
    total = len(results)
    metrics = {
        "total_documents": total,
        "successful": sum(1 for r in results if r.success),
        "failed": sum(1 for r in results if not r.success),
        "avg_processing_time": (
            sum(r.processing_time for r in results) / total if total else 0.0
        ),
        "total_time": sum(r.processing_time for r in results),
        "errors": [r.error for r in results if r.error]
    }

    # Make sure the log directory exists before writing
    Path("logs").mkdir(parents=True, exist_ok=True)
    with open("logs/metrics.json", "w") as f:
        json.dump(metrics, f, indent=2)

    return metrics

Advanced: Streaming Processor

For real-time document processing:

import asyncio
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class StreamingProcessor(FileSystemEventHandler):
    """Process documents as they arrive."""

    def __init__(self):
        # Reuse the batch machinery for one-document-at-a-time work
        # (its constructor also creates the directory layout)
        self.processor = BatchDocumentProcessor()
        # Hand-off queue between the watchdog thread and the asyncio loop
        self.queue = asyncio.Queue()

    def on_created(self, event):
        # Watchdog callback fired for each new filesystem entry.
        # NOTE(review): this runs on the watchdog observer thread while
        # the queue is consumed by an asyncio loop on the main thread;
        # asyncio.Queue is not thread-safe for cross-thread use — confirm,
        # and consider loop.call_soon_threadsafe for the hand-off.
        if not event.is_directory:
            self.queue.put_nowait(event.src_path)

    async def process_queue(self):
        # Consume queued file paths forever, processing each one.
        while True:
            file_path = await self.queue.get()
            # Synchronous call: blocks the event loop for the duration
            # of each document's processing
            result = self.processor.process_single_document(Path(file_path))

            if result.success:
                logger.info(f"Processed: {file_path}")
            else:
                logger.error(f"Failed: {file_path} - {result.error}")

            self.queue.task_done()

    def start(self, watch_dir: str = "input/pending"):
        # Watch *watch_dir* recursively and process new files as they appear.
        # Start file watcher
        observer = Observer()
        observer.schedule(self, watch_dir, recursive=True)
        observer.start()

        # Start processing loop (asyncio.run creates a fresh event loop
        # and blocks here until cancelled)
        asyncio.run(self.process_queue())

Summary

You now have a complete batch processing system that can:

✓ Process large document collections
✓ Handle errors and retries
✓ Run in parallel for speed
✓ Monitor progress and collect metrics
✓ Combine results into unified knowledge abstracts

Next Steps

  • Deploy as a service
  • Add REST API interface
  • Implement real-time streaming
  • Scale with distributed processing

See Also