Data Pipelines for AI

Executive Summary

AI systems are only as good as the data they are built on. The data pipeline is the infrastructure that transforms raw source data — clinical notes, policy documents, transactional records, structured databases — into the embeddings, fine-tuning datasets, and evaluation corpora that AI systems consume. At enterprise scale, data pipelines for AI are significantly more complex than traditional ETL pipelines: they must handle unstructured text, manage document versioning, coordinate embedding computation across heterogeneous sources, and maintain data quality in a domain where pipeline failures produce silent quality degradation rather than system errors. This chapter covers the architecture of AI data pipelines from ingestion through embedding and indexing, with production patterns for the healthcare context.

Learning Objectives

  • Design an end-to-end AI data pipeline from source document ingestion through vector store indexing
  • Select appropriate chunking strategies for different document types and use cases
  • Implement pipeline orchestration with error handling, retry logic, and quality validation
  • Design pipeline monitoring that detects silent failures (quality degradation) as well as hard failures
  • Apply healthcare-specific pipeline requirements for PHI handling and document provenance

Business Problem

An AI system that retrieves from a stale or incomplete knowledge base produces worse outcomes than one that retrieves nothing — because the AI presents outdated information with apparent confidence. In clinical settings, a knowledge base that reflects guidelines from 18 months ago will produce clinical recommendations that no longer reflect current standard of care. The data pipeline is what keeps the knowledge base current, complete, and trustworthy.

Failures in AI data pipelines are often silent, which is why monitoring built for traditional ETL misses them: a missed document, an incorrect chunk boundary, or a stale embedding raises no error. It simply degrades the AI's output quality in ways that are difficult to detect without continuous evaluation.

Why This Technology Exists

The first RAG systems used manual, one-time document loading: engineers uploaded PDF files to a script that chunked and embedded them. This approach fails in production for three reasons: source documents change (guidelines are updated, formularies are revised), new documents must be added without full re-indexing, and the scale of document collections grows beyond manual management.

Purpose-built AI data pipelines address these requirements: scheduled ingestion that detects new and updated documents, incremental indexing that adds new chunks without re-embedding the entire collection, and quality validation that detects pipeline failures before they reach the production RAG system.
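The change-detection step can be sketched as a comparison between the content hashes recorded at the last successful ingest and the bytes fetched in the current run. This is a minimal sketch under stated assumptions. The helper names `sha256_hex` and `classify_changes` are hypothetical. It also assumes the pipeline keeps a `source_id -> hash` mapping. Note that a document missing from one run may reflect a fetch failure rather than a true removal, so "removed" IDs are only candidates for deletion.

```python
import hashlib


def sha256_hex(content: bytes) -> str:
    """Hex SHA-256 digest of raw document bytes."""
    return hashlib.sha256(content).hexdigest()


def classify_changes(
    stored_hashes: dict[str, str],      # source_id -> hash recorded at last successful ingest (assumed store)
    fetched_content: dict[str, bytes],  # source_id -> raw bytes fetched in this run
) -> dict[str, set[str]]:
    """Sort source IDs into new / updated / unchanged / removed (hypothetical helper)."""
    new, updated, unchanged = set(), set(), set()
    for source_id, content in fetched_content.items():
        previous = stored_hashes.get(source_id)
        if previous is None:
            new.add(source_id)            # never indexed: full processing
        elif previous != sha256_hex(content):
            updated.add(source_id)        # content changed: re-chunk, re-embed, replace old chunks
        else:
            unchanged.add(source_id)      # identical bytes: skip extraction and embedding
    # Indexed before but not seen this run; confirm against the source catalog before deleting
    removed = set(stored_hashes) - set(fetched_content)
    return {"new": new, "updated": updated, "unchanged": unchanged, "removed": removed}
```

Only the "new" and "updated" sets need extraction, chunking, and embedding, which is what makes the run incremental.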

Core Architecture

Components

Document Ingestion and Extraction

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
from urllib.parse import urlparse
import hashlib
import io

# Educational Example — Not for clinical use

@dataclass
class SourceDocument:
    """Represents a source document before processing."""
    source_id: str                    # Unique identifier for this document source
    title: str
    source_url: str                   # Where to retrieve the document
    source_organization: str          # e.g., "American Heart Association"
    document_type: str                # "guideline" | "formulary" | "protocol" | "policy"
    last_modified: datetime
    content_hash: Optional[str] = None  # SHA-256 of raw content; used for change detection

    @staticmethod
    def compute_content_hash(content: bytes) -> str:
        """Hex SHA-256 digest of the raw document bytes."""
        return hashlib.sha256(content).hexdigest()


@dataclass
class ExtractedDocument:
    """Raw text extracted from a source document."""
    source_document: SourceDocument
    raw_text: str
    extraction_method: str            # "pdf_parser" | "html_parser" | "docx_parser"
    extraction_timestamp: datetime
    page_count: Optional[int] = None
    extraction_warnings: list[str] = field(default_factory=list)


class DocumentExtractor:
    """
    Extracts plain text from various document formats.
    Educational example — adapt for specific formats.
    """

    def extract(self, source_doc: SourceDocument, content: bytes) -> ExtractedDocument:
        # Dispatch on the URL *path* so query strings ("...pdf?v=2") and
        # upper-case extensions (".PDF") are handled correctly
        path = urlparse(source_doc.source_url).path.lower()
        if path.endswith(".pdf"):
            return self._extract_pdf(source_doc, content)
        elif path.endswith((".html", ".htm")):
            return self._extract_html(source_doc, content)
        elif path.endswith(".docx"):
            return self._extract_docx(source_doc, content)
        else:
            raise ValueError(f"Unsupported document format: {source_doc.source_url}")

    def _extract_pdf(self, source_doc: SourceDocument, content: bytes) -> ExtractedDocument:
        import pdfplumber  # pdfplumber provides better layout detection than PyPDF2

        warnings = []
        pages = []

        with pdfplumber.open(io.BytesIO(content)) as pdf:
            page_count = len(pdf.pages)
            for page in pdf.pages:
                text = page.extract_text()
                if text:
                    pages.append(text)
                else:
                    # No text layer: the page probably needs OCR
                    warnings.append(f"Empty page {page.page_number} — may be scanned image")

        return ExtractedDocument(
            source_document=source_doc,
            raw_text="\n\n".join(pages),
            extraction_method="pdf_parser",
            extraction_timestamp=datetime.now(timezone.utc),  # datetime.utcnow() is deprecated in 3.12
            page_count=page_count,
            extraction_warnings=warnings,
        )

    def _extract_html(self, source_doc: SourceDocument, content: bytes) -> ExtractedDocument:
        from bs4 import BeautifulSoup

        soup = BeautifulSoup(content, "html.parser")
        # Remove navigation, headers, footers, scripts, styles
        for tag in soup.find_all(["nav", "header", "footer", "script", "style"]):
            tag.decompose()

        text = soup.get_text(separator="\n", strip=True)

        return ExtractedDocument(
            source_document=source_doc,
            raw_text=text,
            extraction_method="html_parser",
            extraction_timestamp=datetime.now(timezone.utc),
        )

    def _extract_docx(self, source_doc: SourceDocument, content: bytes) -> ExtractedDocument:
        import docx  # provided by the python-docx package

        doc = docx.Document(io.BytesIO(content))
        paragraphs = [para.text for para in doc.paragraphs if para.text.strip()]

        return ExtractedDocument(
            source_document=source_doc,
            raw_text="\n\n".join(paragraphs),
            extraction_method="docx_parser",
            extraction_timestamp=datetime.now(timezone.utc),
        )
```

Chunking Strategies

The chunking strategy determines the granularity of retrieval. Chunks that are too large retrieve too much irrelevant content; chunks that are too small lose surrounding context.
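The size-versus-context trade-off is easiest to see with the simplest strategy, fixed-size windows with overlap. This is a minimal sketch, not the chapter's production chunker. The function name is hypothetical, and it counts whitespace-separated words as a stand-in for model tokens, the same approximation the code below uses. Consecutive windows share `overlap_words` words, so a sentence cut at one boundary still appears intact at the start of the next chunk.

```python
def fixed_size_chunks(text: str, max_words: int = 400, overlap_words: int = 50) -> list[str]:
    """Split text into windows of at most max_words words; neighbours share overlap_words words."""
    if not 0 <= overlap_words < max_words:
        raise ValueError("overlap_words must satisfy 0 <= overlap_words < max_words")
    words = text.split()  # whitespace words approximate tokens
    step = max_words - overlap_words  # how far each window advances
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + max_words]))
        if start + max_words >= len(words):
            break  # this window already reaches the end of the text
    return chunks
```

For example, ten words with `max_words=4, overlap_words=1` yield three windows, each starting with the last word of the previous one. The windows ignore sentence and section boundaries entirely, which is why section-boundary chunking is preferred for structured guidelines.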

```python
import re
from dataclasses import dataclass
from enum import Enum

class ChunkingStrategy(Enum):
    FIXED_SIZE = "fixed_size"           # Split at fixed token count
    SENTENCE = "sentence"               # Split at sentence boundaries
    SECTION_BOUNDARY = "section_boundary"  # Split at document sections (headings)
    SEMANTIC = "semantic"               # Split at semantic shifts (embedding-based)
    RECOMMENDATION_UNIT = "recommendation_unit"  # Clinical: one recommendation per chunk

@dataclass
class DocumentChunk:
    """A chunk ready for embedding."""
    chunk_id: str
    parent_document_id: str
    content: str
    chunk_index: int
    token_count: int
    metadata: dict    # source, document_type, effective_date, section, etc.


def chunk_clinical_guideline(
    document: ExtractedDocument,
    strategy: ChunkingStrategy = ChunkingStrategy.SECTION_BOUNDARY,
    max_chunk_tokens: int = 400,
    overlap_tokens: int = 50,
) -> list[DocumentChunk]:
    """
    Chunk a clinical guideline document.

    Section-boundary chunking is preferred for guidelines: it keeps
    recommendation text (which is the unit of clinical utility) intact.

    Educational example — not for clinical use.
    """
    if strategy == ChunkingStrategy.SECTION_BOUNDARY:
        # Split on section headings (common patterns in clinical guidelines):
        # numbered headings ("4.2 "), ALL-CAPS labels ("RECOMMENDATIONS:"), "Section", "Chapter"
        section_pattern = r'\n(?=(?:\d+\.\d*\s|[A-Z][A-Z\s]{3,}:|\bSection\b|\bChapter\b))'
        sections = re.split(section_pattern, document.raw_text)

        chunks = []
        for i, section in enumerate(sections):
            if not section.strip():
                continue

            # Treat the first line of the section as its heading
            lines = section.strip().split('\n')
            heading = lines[0].strip() if lines else ""

            # Sections up to 1.5x the limit stay whole so a recommendation is not cut;
            # longer ones are split further at paragraph boundaries
            if len(section.split()) > max_chunk_tokens * 1.5:
                sub_chunks = _split_at_paragraphs(section, max_chunk_tokens, overlap_tokens)
            else:
                sub_chunks = [section]

            for j, chunk_text in enumerate(sub_chunks):
                chunks.append(DocumentChunk(
                    chunk_id=f"{document.source_document.source_id}_{i}_{j}",
                    parent_document_id=document.source_document.source_id,
                    content=chunk_text.strip(),
                    chunk_index=len(chunks),
                    token_count=len(chunk_text.split()),  # word count: approximates model tokens
                    metadata={
                        "source": document.source_document.source_organization,
                        "document_type": document.source_document.document_type,
                        # last_modified is a proxy; prefer the guideline's stated effective date if parsed
                        "effective_date": document.source_document.last_modified.isoformat(),
                        "section": heading,
                        "title": document.source_document.title,
                    }
                ))

        return chunks

    raise NotImplementedError(f"Strategy {strategy} not implemented in this example")


def _split_at_paragraphs(
    text: str,
    max_tokens: int,
    overlap_tokens: int
) -> list[str]:
    """Split long section text at paragraph boundaries, carrying overlap_tokens words forward."""
    paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]

    chunks = []
    current_chunk = []
    current_size = 0

    for para in paragraphs:
        para_size = len(para.split())

        if current_size + para_size > max_tokens and current_chunk:
            chunks.append('\n\n'.join(current_chunk))
            # Overlap: seed the next chunk with the last overlap_tokens words of this one
            # (guard needed because words[-0:] would return the whole list)
            tail = ' '.join(current_chunk).split()[-overlap_tokens:] if overlap_tokens > 0 else []
            current_chunk = [' '.join(tail)] if tail else []
            current_size = len(tail)

        current_chunk.append(para)
        current_size += para_size

    if current_chunk:
        chunks.append('\n\n'.join(current_chunk))

    return chunks
```

Pipeline Orchestration

```python
import time
from dataclasses import dataclass, field

@dataclass
class PipelineResult:
    documents_processed: int = 0
    chunks_created: int = 0
    chunks_rejected: int = 0          # failed validation; a key silent-failure signal
    chunks_embedded: int = 0
    chunks_indexed: int = 0
    skipped_unchanged: int = 0
    errors: list[dict] = field(default_factory=list)
    duration_seconds: float = 0.0


class AIDataPipeline:
    """
    Orchestrates the full ingestion pipeline:
    Source → Extract → Chunk → Embed → Validate → Index

    Educational example — not for clinical use.
    """

    def __init__(
        self,
        extractor: DocumentExtractor,
        embedding_client,
        vector_store,
        document_store,
        audit_log,
    ):
        self.extractor = extractor
        self.embedding_client = embedding_client
        self.vector_store = vector_store
        self.document_store = document_store
        self.audit_log = audit_log

    async def run_incremental(
        self,
        source_documents: list[SourceDocument],
        chunking_strategy: ChunkingStrategy = ChunkingStrategy.SECTION_BOUNDARY,
        batch_size: int = 10,       # Embed in batches to stay within API request limits
    ) -> PipelineResult:
        """
        Run incremental pipeline: only process new or changed documents.
        """
        start = time.monotonic()
        result = PipelineResult()

        for source_doc in source_documents:
            try:
                # Fetch first: the content hash can only be computed from the bytes
                content = await self._fetch_document(source_doc.source_url)
                source_doc.content_hash = source_doc.compute_content_hash(content)

                # Change detection: skip if unchanged. A new document has no stored
                # hash (None), which never equals a computed hash, so it is processed.
                existing_hash = await self.document_store.get_hash(source_doc.source_id)
                if existing_hash == source_doc.content_hash:
                    result.skipped_unchanged += 1
                    continue

                extracted = self.extractor.extract(source_doc, content)
                result.documents_processed += 1

                # Chunk and validate
                chunks = chunk_clinical_guideline(extracted, chunking_strategy)
                result.chunks_created += len(chunks)
                valid_chunks = self._validate_chunks(chunks)
                result.chunks_rejected += len(chunks) - len(valid_chunks)
                if not valid_chunks:
                    # Fail loudly rather than replacing a good version with nothing
                    raise ValueError("No valid chunks produced; keeping previous version")

                # Embed everything before touching the index, so a mid-document
                # failure leaves the previous version fully in place
                pairs = []
                for i in range(0, len(valid_chunks), batch_size):
                    batch = valid_chunks[i:i + batch_size]
                    embeddings = await self.embedding_client.embed_batch(
                        [c.content for c in batch]
                    )
                    pairs.extend(zip(batch, embeddings))
                    result.chunks_embedded += len(batch)

                # Replace the document's chunks to prevent orphans. delete_by_parent_document
                # is an assumed vector-store method; prefer the store's atomic or
                # versioned replacement where one exists.
                await self.vector_store.delete_by_parent_document(source_doc.source_id)
                for i in range(0, len(pairs), batch_size):
                    await self.vector_store.upsert_batch(pairs[i:i + batch_size])
                result.chunks_indexed += len(pairs)

                # Record the new hash only after indexing succeeds
                await self.document_store.upsert(source_doc, extracted)

            except Exception as e:
                result.errors.append({
                    "source_id": source_doc.source_id,
                    "error": str(e),
                    "error_type": type(e).__name__
                })
                await self.audit_log.record_error(source_doc.source_id, e)

        result.duration_seconds = time.monotonic() - start
        await self.audit_log.record_run(result)
        return result

    def _validate_chunks(self, chunks: list[DocumentChunk]) -> list[DocumentChunk]:
        """Validate chunk quality before embedding."""
        valid = []
        for chunk in chunks:
            if len(chunk.content.strip()) < 50:  # Skip very short chunks (< 50 characters)
                continue
            if chunk.token_count > 1000:  # Skip very long chunks
                continue
            required_metadata = ["source", "document_type", "effective_date"]
            if not all(k in chunk.metadata for k in required_metadata):
                continue
            valid.append(chunk)
        return valid

    async def _fetch_document(self, url: str) -> bytes:
        import httpx
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(url)
            response.raise_for_status()
            return response.content
```
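The orchestration code above records errors per document but does not retry transient failures, such as a dropped connection to the embedding API. Retry with exponential backoff can be sketched as a small wrapper. This is a minimal sketch under stated assumptions. `with_retries` is a hypothetical helper, not part of any client library. Its default `retry_on` set is only an example; with httpx you would pass the client's own transient exception types (for example `httpx.TransportError`).

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def with_retries(
    operation: Callable[[], Awaitable[T]],   # factory: each call must create a fresh awaitable
    max_attempts: int = 4,
    base_delay: float = 0.5,
    retry_on: tuple[type[BaseException], ...] = (ConnectionError, TimeoutError),
) -> T:
    """Await operation(), retrying the listed transient errors with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts; the caller records the error for this document
            # Delays of base, 2*base, 4*base, ... plus up to 10% jitter so that
            # parallel workers do not retry in lockstep
            delay = base_delay * 2 ** (attempt - 1)
            await asyncio.sleep(delay + random.uniform(0, delay * 0.1))
    raise AssertionError("unreachable")  # loop always returns or raises
```

Inside `run_incremental`, the embedding call could be wrapped as `await with_retries(lambda: self.embedding_client.embed_batch(texts))`. The lambda matters because a coroutine object can only be awaited once, so each attempt needs a new one. Errors not listed in `retry_on` (a malformed document, for instance) propagate immediately instead of being retried.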

Enterprise Considerations

Pipeline scheduling: Clinical knowledge bases require update cadences driven by clinical safety requirements:

  • Hospital formulary: within 5 business days of any formulary change (patient safety)
  • Clinical guidelines: quarterly or on publication of major updates
  • Hospital protocols: within 5 business days of committee approval
  • Drug interaction database: continuous (licensed data feed)
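Cadences stated in business days can be checked mechanically. The sketch below flags formulary and protocol changes whose indexing lag exceeds 5 business days, the figure from the list above. It makes two simplifying assumptions. First, business days are Monday to Friday, with holidays ignored. Second, the pipeline records both the date a source changed and the date it was indexed. The names `business_days_between`, `SLA_BUSINESS_DAYS`, and `is_sla_breached` are hypothetical.

```python
from datetime import date, timedelta
from typing import Optional


def business_days_between(start: date, end: date) -> int:
    """Count Mon-Fri days after `start` up to and including `end` (holidays ignored)."""
    days = 0
    current = start
    while current < end:
        current += timedelta(days=1)
        if current.weekday() < 5:  # 0=Monday ... 4=Friday
            days += 1
    return days


# Business-day limits from the cadence list; quarterly guidelines and the
# continuous drug-interaction feed need different checks and are omitted.
SLA_BUSINESS_DAYS = {"formulary": 5, "protocol": 5}


def is_sla_breached(
    document_type: str,
    changed_on: date,
    indexed_on: Optional[date],   # None means the change is not yet indexed
    today: date,
) -> bool:
    sla = SLA_BUSINESS_DAYS.get(document_type)
    if sla is None:
        return False
    # For changes still waiting to be indexed, measure the lag up to today
    end = indexed_on if indexed_on is not None else today
    return business_days_between(changed_on, end) > sla
```

For example, a formulary change made on Friday 1 March 2024 and indexed on Friday 8 March is exactly 5 business days old and within the limit. If it is still unindexed on Monday 11 March, the check reports a breach.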

Orphan chunk cleanup: When a source document is updated or removed, the old chunks must be deleted from the vector store. Failure to delete creates a mix of current and outdated content that degrades retrieval quality and may produce incorrect clinical recommendations.
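Upserting by chunk ID alone does not clean up orphans. The chunker above builds IDs as `{source_id}_{i}_{j}`, so when a revised guideline has fewer sections, the trailing IDs from the old version survive. A periodic reconciliation pass can find them. This is a minimal sketch that assumes you can list every indexed chunk ID with its parent document ID, and that the document store knows which chunk IDs the latest ingest produced. `find_orphan_chunks` is a hypothetical helper.

```python
def find_orphan_chunks(
    indexed_chunk_ids: dict[str, str],       # chunk_id -> parent_document_id, as stored in the vector store
    current_chunk_ids: dict[str, set[str]],  # parent_document_id -> chunk IDs from the latest ingest
) -> set[str]:
    """Return chunk IDs that should be deleted from the vector store."""
    orphans = set()
    for chunk_id, parent_id in indexed_chunk_ids.items():
        latest = current_chunk_ids.get(parent_id)
        if latest is None:
            orphans.add(chunk_id)   # parent document was removed from the source catalog
        elif chunk_id not in latest:
            orphans.add(chunk_id)   # parent was re-chunked and this chunk no longer exists
    return orphans
```

The result covers both cases the paragraph names: chunks from updated documents and chunks from removed ones.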

Embedding model migration: When the embedding model is upgraded, all existing chunks must be re-embedded with the new model before the new model is used at query time. A phased migration maintains a shadow index during re-embedding and switches query traffic atomically.
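The shadow-index pattern can be sketched with an in-memory stand-in for the vector store. The key design point is that the query path reads one "live target" holding *both* the index name and the query-encoder model. Switching that single value moves queries to the new index and the new model together. This is a sketch under stated assumptions: `IndexRouter`, `LiveTarget`, `migrate_embedding_model`, and the `chunks__<model>` naming are hypothetical. A real deployment would typically use the vector store's own alias or collection-swap feature instead.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class LiveTarget:
    index_name: str   # physical index that serves queries
    model_name: str   # embedding model that must encode queries for that index


class IndexRouter:
    """In-memory stand-in: named indexes plus the target the query path reads."""

    def __init__(self, index_name: str, model_name: str):
        self.indexes: dict[str, dict[str, list[float]]] = {index_name: {}}
        self.live = LiveTarget(index_name, model_name)


def migrate_embedding_model(
    router: IndexRouter,
    chunks: dict[str, str],                       # chunk_id -> text, from the document store
    new_model_name: str,
    embed_with_new_model: Callable[[str], list[float]],
) -> None:
    shadow_name = f"chunks__{new_model_name}"
    shadow = router.indexes.setdefault(shadow_name, {})
    for chunk_id, text in chunks.items():
        shadow[chunk_id] = embed_with_new_model(text)  # live index keeps serving meanwhile
    if set(shadow) != set(chunks):
        raise RuntimeError("Shadow index incomplete; not switching query traffic")
    # One assignment swaps index and query model together; a query that reads
    # router.live once never mixes new-model queries with old-model vectors.
    router.live = LiveTarget(shadow_name, new_model_name)
```

The old index is left in place after the switch, which keeps a rollback path open until the new model has been validated with production queries.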

Pipeline observability: Silent failures are the primary risk in AI data pipelines. Monitor document ingestion rate, chunk validation rejection rate, embedding API error rate, index size (growth should track documents added and removed; sudden drops or jumps warrant investigation), and query-time retrieval quality (via periodic golden query evaluation).
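Golden query evaluation can be as simple as a hit rate: for each curated query, check whether the expected source document appears in the top-k results, then alert when the rate drops below a recorded baseline. This is a minimal sketch under stated assumptions. `retrieve` stands in for whatever retrieval function the RAG system exposes and is assumed to return parent document IDs. The 0.05 tolerance is an illustrative default, not a recommendation.

```python
from typing import Callable


def golden_query_hit_rate(
    golden: list[tuple[str, str]],                # (query, source_id expected in the top-k)
    retrieve: Callable[[str, int], list[str]],    # assumed: returns parent document IDs of top-k chunks
    k: int = 5,
) -> float:
    """Fraction of golden queries whose expected document is retrieved in the top k."""
    if not golden:
        raise ValueError("golden query set is empty")
    hits = sum(1 for query, expected in golden if expected in retrieve(query, k))
    return hits / len(golden)


def quality_regressed(hit_rate: float, baseline: float, max_drop: float = 0.05) -> bool:
    """True when retrieval quality fell more than max_drop below the baseline."""
    return hit_rate < baseline - max_drop
```

Running this after every pipeline run catches failures that raise no exception, such as a guideline whose chunks were all rejected or indexed with the wrong metadata.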

Healthcare Considerations

PHI in source documents: Clinical documents may contain PHI. The pipeline must handle PHI appropriately: avoid indexing patient-identifying information in the vector store (policy documents and clinical guidelines generally do not contain patient PHI; patient-specific clinical notes should not be indexed in a shared knowledge base). If the source documents contain PHI that must be indexed, treat the entire vector store as a HIPAA data store.
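One way to enforce "never index patient-specific notes in the shared knowledge base" is a fail-closed gate in front of indexing. The sketch below is a policy gate, not PHI detection or de-identification. It assumes an upstream step, such as the source system or a reviewed de-identification process, sets a hypothetical `contains_phi` flag. The allowlist reuses the document types from `SourceDocument`. `assert_allowed_in_shared_index` and `IndexingPolicyError` are hypothetical names.

```python
# Document types permitted in the shared (non-PHI) knowledge base; anything
# else, including unknown types, is rejected (fail closed).
SHARED_INDEX_ALLOWED_TYPES = {"guideline", "formulary", "protocol", "policy"}


class IndexingPolicyError(Exception):
    """Raised when a document must not enter the shared index."""


def assert_allowed_in_shared_index(source_id: str, document_type: str, contains_phi: bool) -> None:
    if document_type not in SHARED_INDEX_ALLOWED_TYPES:
        raise IndexingPolicyError(
            f"{source_id}: document type {document_type!r} is not allowed in the shared index"
        )
    if contains_phi:
        # PHI-bearing content belongs in a separately governed, HIPAA-scoped store
        raise IndexingPolicyError(f"{source_id}: flagged as containing PHI")
```

Calling the gate before chunking means a misrouted clinical note fails the run visibly instead of silently becoming retrievable.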

Document provenance for citations: Clinical RAG citations must include the source organization, document title, section, and effective date so clinicians can verify the recommendation. All this metadata must be captured during ingestion and stored with each chunk.
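Because the chunker stores `source`, `title`, `section`, and `effective_date` in each chunk's metadata, a citation can be rendered directly from a retrieved chunk. The formatter should refuse to cite a chunk whose provenance is incomplete. This is a minimal sketch: the citation layout and the `format_citation` name are hypothetical. It also assumes `effective_date` is an ISO 8601 string, as produced by `isoformat()` in the chunker.

```python
REQUIRED_CITATION_FIELDS = ("source", "title", "section", "effective_date")


def format_citation(metadata: dict) -> str:
    """Render a clinician-verifiable citation from chunk metadata; fail if provenance is missing."""
    missing = [f for f in REQUIRED_CITATION_FIELDS if not metadata.get(f)]
    if missing:
        raise ValueError(f"chunk metadata missing citation fields: {missing}")
    source, title = metadata["source"], metadata["title"]
    section = metadata["section"]
    effective = metadata["effective_date"][:10]  # keep the YYYY-MM-DD part of the ISO timestamp
    return f'{source}. "{title}", section "{section}". Effective {effective}.'
```

For example, metadata from a chunk of a hypothetical "Heart Failure Guideline" by "Example Society" renders as `Example Society. "Heart Failure Guideline", section "4.2 Beta-blockers". Effective 2024-01-15.`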

Version management for clinical safety: When a guideline is superseded, both the new and old versions may temporarily exist in the index during the transition. The pipeline must atomically replace old chunks with new chunks to avoid mixed-version retrieval.
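When the vector store cannot delete and insert in one transaction, a common alternative is to tag each chunk with a document version and keep a per-document "current version" pointer that retrieval filters on. The replacement then happens in three steps. First, write the new chunks, which stay invisible. Second, move the pointer. Third, garbage-collect the old chunks. The in-memory sketch below illustrates that technique. `VersionedChunkIndex` and its ID format are hypothetical, not any product's API.

```python
class VersionedChunkIndex:
    """In-memory sketch: retrieval only sees chunks of each document's current version."""

    def __init__(self):
        self.chunks: dict[str, dict] = {}          # chunk_id -> {"doc", "version", "text"}
        self.current_version: dict[str, str] = {}  # doc_id -> version visible to retrieval

    def publish(self, doc_id: str, version: str, chunk_texts: list[str]) -> None:
        # 1. Write new-version chunks; they are invisible until the pointer moves
        for i, text in enumerate(chunk_texts):
            self.chunks[f"{doc_id}@{version}#{i}"] = {"doc": doc_id, "version": version, "text": text}
        # 2. Flip the pointer: retrieval switches from old to new in one step
        self.current_version[doc_id] = version
        # 3. Garbage-collect older versions of this document
        stale = [cid for cid, c in self.chunks.items() if c["doc"] == doc_id and c["version"] != version]
        for cid in stale:
            del self.chunks[cid]

    def visible_chunks(self) -> list[dict]:
        """Chunks retrieval may return: only those matching their document's current version."""
        return [c for c in self.chunks.values() if self.current_version.get(c["doc"]) == c["version"]]
```

Because visibility is decided by the pointer rather than by deletion, a failure during step 3 leaves extra storage behind but cannot cause mixed-version retrieval.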

Common Mistakes

1. Chunking at fixed token sizes without regard for semantic boundaries. Fixed-size chunking frequently splits a clinical recommendation in the middle, creating chunks that lack context. Always try section-boundary chunking first for structured clinical documents.

2. Not implementing orphan chunk cleanup. When a document is updated, the old chunks remain in the index. The vector store now contains both the old and new version of the guideline. The AI may retrieve either, producing inconsistent recommendations.

3. Not validating chunk metadata completeness. Chunks missing effective_date cannot be filtered by date in retrieval. Validate metadata completeness before indexing.

4. Embedding batch size too large. Large embedding batches take longer to process and are harder to retry on failure. A batch size of 10–20 chunks provides a good balance of throughput and error recovery.

5. No quality monitoring on pipeline output. Monitoring only for pipeline errors (exceptions, HTTP errors) misses quality failures: chunks that are too short, chunks with missing sections, chunks with incorrect metadata.

Best Practices

  • Use section-boundary chunking for structured clinical documents; fixed-size with overlap for unstructured text
  • Implement change detection (content hash) to skip unchanged documents in incremental runs
  • Validate chunk quality (size, metadata completeness) before embedding
  • Delete orphan chunks when source documents are updated or removed
  • Monitor pipeline output quality, not just pipeline errors
  • Establish update cadences driven by clinical safety requirements (formulary: 5 business days; guidelines: quarterly)
  • Never mix patient-specific clinical notes into shared knowledge base indexes

Trade-offs

Chunk size: Smaller chunks are more precise but lose context. Larger chunks retain context but reduce retrieval precision. The right chunk size is use-case and document-type specific; benchmark against the target use case.

Incremental vs. full re-index: Incremental updates are faster but accumulate complexity (orphan chunks, version mixing). Full re-indexing is slower but produces a clean index. Schedule full re-indexing quarterly for production clinical knowledge bases.

Interview Questions

Q: A clinical RAG system occasionally retrieves outdated treatment guidelines even though new guidelines have been indexed. What are the most likely causes and how would you fix them?

Category: System Design | Difficulty: Senior | Role: AI Architect

Answer Framework:

Three root causes in order of likelihood:

Orphan chunks: The old guideline chunks were not deleted when the new version was indexed. Both old and new exist in the vector store; depending on query similarity, either may be retrieved. Fix: implement atomic version replacement — delete all old chunks for a document ID before indexing new chunks, in a single transaction.

Metadata date filter not applied: The retrieval query does not filter by effective_date, so old chunks can rank higher than new chunks based on embedding similarity alone (old phrasing may match queries better if the terminology changed). Fix: apply an effective_date >= [cutoff] filter in retrieval, or version-tag chunks and filter by current version only.

Embedding similarity favoring older phrasing: If the guideline changed its recommended terminology (e.g., new dosing terminology), the new chunk may score lower on similarity to queries that use the old terminology. Fix: query expansion to include both old and new clinical terms, or a hybrid retrieval approach combining semantic and keyword search.
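The date-filter defense can be sketched as a post-retrieval filter that fails closed on undated chunks. This is a minimal sketch under stated assumptions. `apply_date_filter` is a hypothetical helper, and each result is assumed to be a dict carrying the chunk metadata written at ingestion. Where the vector store supports metadata filters at query time, applying the filter there is preferable. Filtering a top-k list afterwards can leave fewer than k results.

```python
from datetime import date


def apply_date_filter(results: list[dict], cutoff: date) -> list[dict]:
    """Keep retrieved chunks whose effective_date is on or after cutoff; drop undated ones."""
    kept = []
    for result in results:
        raw = result.get("metadata", {}).get("effective_date")
        if not raw:
            continue  # fail closed: an undated chunk cannot be shown to be current
        if date.fromisoformat(raw[:10]) >= cutoff:  # ISO 8601 date part, e.g. "2024-02-01"
            kept.append(result)
    return kept
```

This filter is defense in depth, not a replacement for atomic version replacement. A superseded guideline with a recent effective date on some unrelated chunk would still pass a pure date cutoff.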

Key Points to Hit:

  • Orphan chunks as the most common root cause
  • Atomic replacement pattern as the fix
  • Metadata date filtering as defense in depth
  • Embedding similarity as a secondary cause

Key Takeaways

  • AI data pipelines must handle document versioning, orphan chunk cleanup, and incremental updates — not just initial ingestion
  • Section-boundary chunking generally preserves recommendation context better than fixed-size chunking for structured clinical documents; confirm with benchmarks on the target use case
  • Silent quality failures are the primary risk: pipeline monitoring must include output quality checks, not just error rates
  • Clinical knowledge base update cadences are patient safety requirements, not operational preferences
  • PHI in source documents requires treating the entire pipeline (and vector store) as a HIPAA data store
  • Embedding model migration requires re-indexing all chunks before switching query traffic

Glossary

Incremental Pipeline: A pipeline that processes only new or changed documents, skipping unchanged documents using content hash comparison.

Orphan Chunk: A chunk in the vector store from a document version that has been superseded or removed but was not deleted during re-indexing.

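Content Hash: A SHA-256 (or similar) hash of a document's raw content, compared against the hash stored at the last ingest to detect changes so that unchanged documents skip extraction, chunking, and embedding. Computing it still requires fetching the document; avoiding the download itself requires source-side signals such as HTTP ETag or Last-Modified headers.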

Chunking Strategy: The algorithm used to split documents into retrievable units (chunks) for indexing in the vector store.

Further Reading