Enterprise Integration Patterns for AI

Executive Summary

AI systems do not exist in isolation: every AI feature that adds value to an enterprise reads from and writes to existing systems — EHRs, data warehouses, message queues, identity providers, and business applications. The integration architecture between the AI layer and these systems is frequently the highest-risk, longest-lead component of an enterprise AI project, and the pattern chosen (synchronous, asynchronous, or batch) has direct consequences for system reliability, latency, and the complexity of failure handling. This chapter establishes the foundational integration patterns that all subsequent chapters in this section build upon.

Learning Objectives

  • Classify AI integration requirements as synchronous, asynchronous, or batch and justify the selection
  • Identify the failure modes specific to each integration pattern
  • Design an integration architecture that isolates AI system failures from upstream source systems
  • Apply the integration patterns to the Hospital Management System (HMS) scenario

Business Problem

A Reference Healthcare Organization deploying clinical decision support AI faces an integration challenge that is orthogonal to the AI capability itself: how does the AI system receive patient context from the EHR in time for a clinician's decision? How does it return recommendations without blocking the EHR workflow? How does it recover when the AI inference service is unavailable during a critical clinical encounter?

Answering these questions requires choosing and implementing the right integration pattern before any AI model is selected, because the pattern determines end-to-end latency, failure isolation, and operational complexity.

Why Integration Patterns Matter for AI

Traditional application integration patterns are well-established: REST for synchronous web services, message queues for decoupled event processing, ETL for batch data movement. AI introduces two properties that challenge direct application of these patterns:

Variable and long latency: An LLM inference call may return in 1 second for a short completion or 30 seconds for a long analysis. Synchronous REST integrations are usually sized for responses that arrive within a second or two. A synchronous AI call with a 30-second timeout holds its connection for the full wait, so under load the caller's connection pool is exhausted and the delay propagates to every upstream request queued behind it.
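
The connection-pool effect follows from Little's law: the average number of requests in flight equals the arrival rate multiplied by the average time each request is held. A minimal sketch, using an illustrative load of 20 requests per second (an assumption, not a measured figure):

```python
def in_flight_requests(arrival_rate_per_s: float, latency_s: float) -> float:
    """Little's law: average concurrency L = arrival rate (lambda) x time held (W)."""
    return arrival_rate_per_s * latency_s


# Illustrative load of 20 requests/second (assumed for the example)
for latency in (1.0, 30.0):
    held = in_flight_requests(20, latency)
    print(f"{latency:.0f} s latency -> {held:.0f} connections held on average")
```

At 1 second of latency the caller holds about 20 connections; at 30 seconds it holds about 600. For comparison, httpx's default connection limit is 100, so the same traffic that fits comfortably at 1 second saturates the pool at 30.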

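Non-deterministic outputs: AI responses are probabilistic, so the same request can return different answers. Integration patterns for AI must account for the possibility that the AI returns a confident but incorrect answer, not just an error. This changes the validation logic at the integration boundary: a successful HTTP status is no longer enough, and the content of each response must be checked before it is passed on.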
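
One way to apply this is to validate every successful AI response before it reaches the EHR. The sketch below assumes CDS Hooks-style cards and checks a subset of the spec's card rules (a `summary` shorter than 140 characters and an `indicator` of `info`, `warning`, or `critical`). It also assumes a hypothetical `evidence_ids` field in which the AI service lists the patient-context resource IDs each card relies on. Cards that fail any check are dropped rather than passed through:

```python
ALLOWED_INDICATORS = {"info", "warning", "critical"}
MAX_SUMMARY_CHARS = 140  # CDS Hooks: summary must be under 140 characters


def validate_cards(response: dict, context_ids: set[str]) -> list[dict]:
    """Return only the cards that pass structural and grounding checks.

    `evidence_ids` is a hypothetical field: the AI service is assumed to list
    the patient-context resource IDs each card is based on.
    """
    valid = []
    for card in response.get("cards") or []:
        summary = card.get("summary")
        if not isinstance(summary, str) or not summary or len(summary) >= MAX_SUMMARY_CHARS:
            continue  # missing, empty, or over-long summary
        if card.get("indicator") not in ALLOWED_INDICATORS:
            continue  # unknown urgency level
        evidence = card.get("evidence_ids") or []
        if not evidence or not set(evidence) <= context_ids:
            continue  # ungrounded: cites nothing, or cites data the AI was never given
        valid.append(card)
    return valid
```

These checks catch malformed and ungrounded output; they cannot prove that a well-formed, grounded card is clinically correct, which is why clinician review remains part of the workflow.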

Core Architecture

Pattern 1 — Synchronous Request-Response

When to use: The user is waiting for the response. The latency budget is under 5–10 seconds. The AI response is needed in real time to drive the next user action.

Examples in HMS: CDS Hooks clinical decision support (in this scenario the EHR waits at most 5 seconds for a recommendation before timing out), medication interaction check at order entry, search-augmented clinical knowledge queries.

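```python
import asyncio

import httpx

# Educational example — not for clinical use

AI_GATEWAY_URL = "https://ai-gateway.reference-enterprise.internal/v1/clinical-rag"


async def synchronous_ai_call(
    query: str,
    patient_context: dict,
    timeout_seconds: float = 4.5,
) -> dict:
    """
    Synchronous AI integration with timeout and fallback.

    In this HMS scenario the EHR enforces a 5-second timeout on CDS Hooks
    calls. If the AI service cannot answer in time, return a safe fallback
    response (empty cards) rather than causing the EHR to error.

    Educational Example — Not intended for clinical decision making.
    """
    try:
        # httpx applies timeout_seconds to each phase (connect, read, write,
        # pool) separately, so asyncio.wait_for enforces the end-to-end budget.
        # The 4.5 s default leaves headroom below the EHR's 5 s timeout.
        return await asyncio.wait_for(
            _call_gateway(query, patient_context, timeout_seconds),
            timeout=timeout_seconds,
        )

    except (httpx.TimeoutException, asyncio.TimeoutError):
        # CDS Hooks fallback: return empty cards (do not block EHR workflow).
        # fallback_reason is a non-standard field used here for logging.
        return {"cards": [], "fallback_reason": "ai_timeout"}

    except httpx.HTTPStatusError as e:
        if e.response.status_code == 429:
            return {"cards": [], "fallback_reason": "ai_rate_limited"}
        return {"cards": [], "fallback_reason": "ai_service_error"}

    except (httpx.RequestError, ValueError):
        # Connection refused, DNS failure, or a non-JSON body: the AI service
        # is effectively unavailable, so fall back instead of raising.
        return {"cards": [], "fallback_reason": "ai_unavailable"}


async def _call_gateway(query: str, patient_context: dict, timeout_seconds: float) -> dict:
    # A shared, long-lived AsyncClient would reuse pooled connections across
    # calls; one client per call keeps this example self-contained.
    async with httpx.AsyncClient(timeout=timeout_seconds) as client:
        response = await client.post(
            AI_GATEWAY_URL,
            json={"query": query, "context": patient_context},
            headers={"Authorization": f"Bearer {_get_service_token()}"},
        )
        response.raise_for_status()
        return response.json()


def _get_service_token() -> str:
    """Retrieve service account token from secrets manager."""
    # Implementation: read from AWS Secrets Manager, Azure Key Vault, etc.
    raise NotImplementedError("Connect this to your secrets manager")
```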

Pattern 2 — Asynchronous Event-Driven

When to use: The AI processing time exceeds user-tolerable wait times. The caller does not need to remain connected while AI processing occurs. The use case is naturally event-driven (admission triggers AI summary, order triggers prior authorization workflow).

Examples in HMS: Discharge summary generation (triggered by discharge order; physician reviews AI draft within the next hour), prior authorization (triggered by an order; insurance response expected within hours), overnight clinical note summarization.

```python
import json
import uuid
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Optional

# Educational example — not for clinical use


def _utc_now() -> str:
    """Timezone-aware UTC timestamp (datetime.utcnow() is deprecated)."""
    return datetime.now(timezone.utc).isoformat()


@dataclass
class AIJobRequest:
    job_id: str
    job_type: str           # "discharge_summary" | "prior_auth" | "note_summary"
    payload: dict
    submitted_at: str
    callback_url: str       # Where to POST the result when complete


@dataclass
class AIJobResult:
    job_id: str
    status: str             # "completed" | "failed" | "timeout"
    result: Optional[dict]
    error: Optional[str]
    completed_at: str


async def submit_async_ai_job(
    job_type: str,
    payload: dict,
    callback_url: str,
    kafka_producer,
) -> str:
    """
    Submit an AI job to the async processing queue.
    Returns job_id once the message is accepted; result delivered via callback_url.
    """
    job_id = str(uuid.uuid4())

    job_request = AIJobRequest(
        job_id=job_id,
        job_type=job_type,
        payload=payload,
        submitted_at=_utc_now(),
        callback_url=callback_url,
    )

    # Publish to Kafka (or SQS, Azure Service Bus, etc.). This line assumes an
    # aiokafka AIOKafkaProducer, whose send_and_wait returns after the broker
    # acknowledges the write; adapt it for other clients.
    await kafka_producer.send_and_wait(
        f"ai-jobs.{job_type}",
        value=json.dumps(asdict(job_request)).encode(),
        key=job_id.encode(),
    )

    return job_id


async def ai_job_worker(kafka_consumer, ai_service, http_client):
    """
    Consumer side: process jobs from the queue and deliver results.
    Simplified: omits the idempotency check, retries, and dead letter
    queue routing described under Enterprise Considerations.
    Educational example — not for clinical use.
    """
    async for message in kafka_consumer:
        job_request = AIJobRequest(**json.loads(message.value))

        try:
            result = await ai_service.process(job_request.job_type, job_request.payload)

            job_result = AIJobResult(
                job_id=job_request.job_id,
                status="completed",
                result=result,
                error=None,
                completed_at=_utc_now(),
            )
        except Exception as e:
            job_result = AIJobResult(
                job_id=job_request.job_id,
                status="failed",
                result=None,
                error=str(e),
                completed_at=_utc_now(),
            )

        # Deliver result via callback. Production code should check the
        # response status and retry or dead-letter failed deliveries.
        await http_client.post(
            job_request.callback_url,
            json=asdict(job_result),
        )
```

Pattern 3 — Batch Processing

When to use: Large volumes of records must be processed on a schedule. Results are consumed analytically, not in real-time. Cost optimization through bulk processing is important.

Examples in HMS: Overnight population health risk scoring, monthly quality measure extraction from clinical notes, weekly prior authorization outcome analysis.

```python
from typing import Iterable

# Educational example — not for clinical use

def batch_ai_pipeline(
    source_records: Iterable[dict],
    ai_processor,
    output_sink,
    batch_size: int = 50,
    checkpoint_store=None,
) -> dict:
    """
    Batch AI processing with checkpointing for resumability.
    If the pipeline fails midway, it can resume from the last checkpoint.

    Educational example — not for clinical use.
    """
    totals = {"processed": 0, "failed": 0}

    batch = []
    for record in source_records:
        # Skip already-processed records (checkpoint-based deduplication)
        if checkpoint_store is not None and checkpoint_store.is_processed(record["id"]):
            continue

        batch.append(record)

        if len(batch) >= batch_size:
            _flush_batch(batch, ai_processor, output_sink, checkpoint_store, totals)
            batch = []

    # Process remaining records, checkpointed the same way as full batches
    if batch:
        _flush_batch(batch, ai_processor, output_sink, checkpoint_store, totals)

    return totals


def _flush_batch(batch, ai_processor, output_sink, checkpoint_store, totals: dict) -> None:
    """Process one batch, write the successes, then checkpoint them."""
    results = _process_batch(batch, ai_processor)
    output_sink.write_batch(results["successes"])

    if checkpoint_store is not None:
        # Checkpoint after the write: a crash in between means the batch is
        # reprocessed on resume, so the sink should tolerate duplicates.
        # Failed records are not checkpointed, so the next run retries them.
        failed_ids = {f["record_id"] for f in results["failures"]}
        checkpoint_store.mark_processed(
            [r["id"] for r in batch if r["id"] not in failed_ids]
        )

    totals["processed"] += len(results["successes"])
    totals["failed"] += len(results["failures"])


def _process_batch(records: list[dict], ai_processor) -> dict:
    """Process a batch of records; separate successes from failures."""
    successes, failures = [], []
    for record in records:
        try:
            result = ai_processor.process(record)
            successes.append(result)
        except Exception as e:
            failures.append({"record_id": record.get("id"), "error": str(e)})
    return {"successes": successes, "failures": failures}
```
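
The pipeline only assumes that `checkpoint_store` offers `is_processed(record_id)` and `mark_processed(record_ids)`. For resumability the checkpoints must survive a crash, so they need durable storage. A minimal sketch of such a store on the standard-library `sqlite3` module; the class name and table layout are illustrative assumptions, and a distributed job would use a shared database instead of a local file:

```python
import sqlite3


class SQLiteCheckpointStore:
    """Durable checkpoint store with the is_processed / mark_processed
    interface that batch_ai_pipeline expects (hypothetical implementation)."""

    def __init__(self, path: str = "checkpoints.db"):
        self._conn = sqlite3.connect(path)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS processed (record_id TEXT PRIMARY KEY)"
        )
        self._conn.commit()

    def is_processed(self, record_id) -> bool:
        row = self._conn.execute(
            "SELECT 1 FROM processed WHERE record_id = ?", (str(record_id),)
        ).fetchone()
        return row is not None

    def mark_processed(self, record_ids) -> None:
        # INSERT OR IGNORE makes re-marking an ID harmless; using the
        # connection as a context manager commits the batch as one transaction.
        with self._conn:
            self._conn.executemany(
                "INSERT OR IGNORE INTO processed (record_id) VALUES (?)",
                [(str(rid),) for rid in record_ids],
            )

    def close(self) -> None:
        self._conn.close()
```

Because each `mark_processed` call is a single transaction, a crash leaves either the whole batch checkpointed or none of it, never a partial batch.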

Pattern Selection Guide

| Requirement | Synchronous | Asynchronous | Batch |
|---|---|---|---|
| User is waiting for response | Yes | No | No |
| Response time < 5 seconds | Yes | - | - |
| Response time 5 s–5 min | With caution | Yes | - |
| Response time > 5 minutes | No | Yes | - |
| Triggered by user action | Yes | Yes | No |
| Triggered by schedule or event | - | Yes | Yes |
| Volume: single record | Yes | Yes | - |
| Volume: 1,000+ records | No | With queuing | Yes |
| PHI handling supported | Yes (with TLS) | Yes (with encryption) | Yes (with access control) |
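
The table can be read as a decision procedure. The function below is one illustrative encoding of it: the thresholds come from the table, while the tie-breaking (for example, preferring asynchronous over "synchronous with caution" in the 5 s to 5 min band) is an assumption:

```python
def select_pattern(user_waiting: bool, expected_latency_s: float, record_count: int) -> str:
    """Suggest an integration pattern from the selection guide (heuristic sketch)."""
    if record_count >= 1_000:
        # Synchronous is ruled out at this volume; batch unless someone is waiting,
        # in which case an asynchronous job with queuing is the remaining option.
        return "asynchronous" if user_waiting else "batch"
    if user_waiting and expected_latency_s < 5:
        return "synchronous"
    # 5 s or more, or nobody waiting (event- or schedule-triggered single records)
    return "asynchronous"
```

For example, a CDS Hooks check (`select_pattern(True, 2, 1)`) maps to synchronous, a discharge summary taking 45 seconds maps to asynchronous, and overnight scoring of 50,000 patients maps to batch.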

Enterprise Considerations

Failure isolation: The AI layer must never take down the source system. Always implement circuit breakers on synchronous AI calls, and design async consumers so that queue depth growth does not cause backpressure on the source system.
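
A circuit breaker stops calling a failing AI service for a cool-down period, so requests fail fast to the fallback instead of each waiting out a timeout. A minimal, single-threaded sketch; the thresholds are illustrative assumptions, and the clock is injectable so the behavior can be tested:

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Closed -> open after `failure_threshold` consecutive failures;
    half-open (trial calls allowed) once `reset_timeout_s` has elapsed."""

    def __init__(
        self,
        failure_threshold: int = 5,
        reset_timeout_s: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.failure_threshold = failure_threshold
        self.reset_timeout_s = reset_timeout_s
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None  # None means the circuit is closed

    def allow_request(self) -> bool:
        if self._opened_at is None:
            return True
        # Open: permit a trial call only after the cool-down (half-open).
        return self._clock() - self._opened_at >= self.reset_timeout_s

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None  # close the circuit

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.failure_threshold:
            # Open the circuit, or re-open it after a failed trial call.
            self._opened_at = self._clock()


def call_with_breaker(breaker: CircuitBreaker, call: Callable[[], dict], fallback: dict) -> dict:
    """Return the AI result, or the fallback if the circuit is open or the call fails."""
    if not breaker.allow_request():
        return fallback  # fail fast without contacting the AI service
    try:
        result = call()
    except Exception:
        breaker.record_failure()
        return fallback
    breaker.record_success()
    return result
```

The same logic wraps an async call by awaiting it inside the `try`; in a concurrent server the breaker's state would also need synchronization so that only one trial call runs while half-open.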

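Idempotency in async patterns: Messages may be delivered more than once in at-least-once delivery queues. AI workers must therefore be idempotent: processing the same job_id twice must have the same effect as processing it once. Because AI outputs are non-deterministic, re-running inference on a duplicate could produce a different result, so the worker should detect the duplicate and reuse the stored result rather than duplicate the output (e.g., writing two discharge summaries to the EHR).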
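
A minimal sketch of an idempotent worker step, assuming a result store keyed by `job_id`. A dict stands in for that store here; in production the check-and-record must be atomic (for example, a unique constraint on `job_id` in a database), otherwise two concurrent deliveries could both pass the check. The function and parameter names are illustrative:

```python
from typing import Callable


def process_job_once(
    job_id: str,
    payload: dict,
    result_store: dict,
    run_inference: Callable[[dict], dict],
    write_output: Callable[[str, dict], None],
) -> dict:
    """Run inference and write output at most once per job_id.

    A redelivered message returns the stored result instead of re-running
    non-deterministic inference or writing a second document.
    """
    if job_id in result_store:
        return result_store[job_id]  # duplicate delivery: no new side effects

    result = run_inference(payload)
    # The downstream write should itself be keyed by job_id (e.g. an upsert),
    # so a crash between writing and recording cannot create a duplicate.
    write_output(job_id, result)
    result_store[job_id] = result
    return result
```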

Dead letter queues: Async pipelines must route failed jobs to a dead letter queue (DLQ) after exhausting retries. Without a DLQ, failed jobs are silently dropped. The DLQ must be monitored and produce alerts when messages accumulate.
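
The retry-then-dead-letter flow can be sketched as follows. `publish_to_dlq` stands in for whatever the queue offers (a dedicated Kafka topic, an SQS redrive policy, a Service Bus dead-letter subqueue); the attempt count and backoff values are illustrative assumptions, and `sleep` is injectable for testing:

```python
import time
from typing import Callable, Optional


def process_with_dlq(
    job: dict,
    process: Callable[[dict], dict],
    publish_to_dlq: Callable[[dict], None],
    max_attempts: int = 3,
    base_delay_s: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[dict]:
    """Retry with exponential backoff; route the job to the DLQ once retries are exhausted."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return process(job)
        except Exception as e:
            last_error = e
            if attempt < max_attempts:
                sleep(base_delay_s * 2 ** (attempt - 1))  # 1 s, 2 s, 4 s, ...
    # Keep the original job plus failure context so an operator can replay it.
    publish_to_dlq({"job": job, "error": repr(last_error), "attempts": max_attempts})
    return None
```

In a real consumer, the message offset would be committed only after the job either succeeds or lands in the DLQ, so a crash mid-retry leads to redelivery rather than loss.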

Common Mistakes

1. Using synchronous calls for long-running AI operations. A 30-second AI response on a synchronous call holds an HTTP connection (and, in thread-per-request servers, a worker thread) for the entire duration. Under load, this exhausts connection pools and cascades failures to the calling system.

2. Not implementing fallback for synchronous AI calls. If the AI service is unavailable and the calling system has no fallback, the calling system fails too. For clinical systems, the fallback must be defined before the feature is deployed.

3. No idempotency in async workers. A Kafka consumer that processes the same message twice writes two AI-generated documents to the EHR. Every async AI worker must check whether a job_id has already been processed before executing.

Key Takeaways

  • The integration pattern (synchronous, asynchronous, batch) must be selected before the AI model, because it determines the end-to-end architecture
  • Synchronous AI integration requires explicit timeout and fallback behavior; the AI system must never take down the calling system
  • Asynchronous AI integration requires idempotent workers and dead letter queues
  • Batch AI integration requires checkpointing for resumability on large datasets
  • PHI handling requirements apply to all three patterns: encryption in transit, audit logging, least-privilege access

Further Reading