Enterprise Integration Patterns for AI
Executive Summary
AI systems do not exist in isolation: every AI feature that adds value to an enterprise reads from and writes to existing systems such as EHRs, data warehouses, message queues, identity providers, and business applications. The integration architecture between the AI layer and these systems is frequently the highest-risk, longest-lead component of an enterprise AI project. The pattern chosen (synchronous, asynchronous, or batch) has direct consequences for system reliability, latency, and the complexity of failure handling. This chapter establishes the foundational integration patterns that all subsequent chapters in this section build upon.
Learning Objectives
- Classify AI integration requirements as synchronous, asynchronous, or batch and justify the selection
- Identify the failure modes specific to each integration pattern
- Design an integration architecture that isolates AI system failures from upstream source systems
- Apply the integration patterns to the Hospital Management System (HMS) scenario
Business Problem
A Reference Healthcare Organization deploying clinical decision support AI faces an integration challenge that is orthogonal to the AI capability itself: how does the AI system receive patient context from the EHR in time for a clinician's decision? How does it return recommendations without blocking the EHR workflow? How does it recover when the AI inference service is unavailable during a critical clinical encounter?
Answering these questions requires choosing and implementing the right integration pattern before any AI model is selected, because the pattern determines end-to-end latency, failure isolation, and operational complexity.
Why Integration Patterns Matter for AI
Traditional application integration patterns are well-established: REST for synchronous web services, message queues for decoupled event processing, ETL for batch data movement. AI introduces two properties that challenge direct application of these patterns:
Variable and long latency: An LLM inference call may return in 1 second for a short completion or take 30 seconds for a long analysis. Traditional synchronous REST architectures are typically sized for responses under 2 seconds. A synchronous AI call with a 30-second timeout holds a pooled connection, and often a worker thread, for the full wait. Under load, slow responses therefore exhaust the pool and propagate latency throughout the calling system.
Non-deterministic outputs: AI responses are probabilistic. Integration patterns for AI must account for the possibility that the AI returns a high-confidence incorrect answer, not just an error. This changes the validation logic at the integration boundary, because a successful HTTP status no longer implies a usable result.
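Because a well-formed response can still be wrong, boundary validation has to go beyond checking the HTTP status. The following is a minimal sketch that assumes a hypothetical response shape with `recommendation`, `confidence`, and `source_ids` fields, plus an illustrative 0.7 threshold; none of these come from a specific product. A confidence threshold alone cannot catch a high-confidence wrong answer, so the sketch also rejects answers whose citations do not match the context the caller actually supplied. It returns a rejection outcome that the caller can route to human review or to a fallback.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ValidationOutcome:
    accepted: bool
    reasons: list[str] = field(default_factory=list)


def validate_ai_response(
    response: dict[str, Any],
    allowed_source_ids: set[str],
    min_confidence: float = 0.7,  # illustrative threshold; tune per use case
) -> ValidationOutcome:
    """Boundary checks for a hypothetical AI response schema."""
    reasons: list[str] = []

    # 1. Structural checks: required fields with the expected types
    rec = response.get("recommendation")
    conf = response.get("confidence")
    sources = response.get("source_ids")
    if not isinstance(rec, str) or not rec.strip():
        reasons.append("missing_recommendation")
    if not isinstance(conf, (int, float)) or not 0.0 <= conf <= 1.0:
        reasons.append("invalid_confidence")
    if not isinstance(sources, list):
        reasons.append("missing_sources")
    if reasons:
        return ValidationOutcome(False, reasons)

    # 2. Semantic checks: a well-formed answer can still be wrong
    if conf < min_confidence:
        reasons.append("low_confidence")
    # Grounding check: every cited source must be one the caller supplied
    unknown = [s for s in sources if s not in allowed_source_ids]
    if not sources or unknown:
        reasons.append("unsupported_citation")

    return ValidationOutcome(not reasons, reasons)
```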
Core Architecture
Pattern 1: Synchronous Request-Response
When to use: The user is waiting for the response. Latency requirements are under 5 to 10 seconds. The AI response is needed in real time to drive the next user action.
Examples in HMS: CDS Hooks clinical decision support (the physician must receive a recommendation within 5 seconds or the EHR times out), medication interaction checks at order entry, and search-augmented clinical knowledge queries.
```python
import httpx

# Educational example: not for clinical use


async def synchronous_ai_call(
    query: str,
    patient_context: dict,
    timeout_seconds: float = 5.0,
) -> dict:
    """
    Synchronous AI integration with timeout and fallback.

    For CDS Hooks, the EHR mandates a 5-second response SLA.
    If the AI service exceeds this or is unreachable, return a safe
    fallback response (empty cards) rather than causing the EHR to error.
    Educational example: not intended for clinical decision making.
    """
    try:
        # httpx applies this timeout per phase (connect/read/write/pool),
        # not as a total deadline; wrap the call in asyncio.wait_for if a
        # hard end-to-end cap is required.
        async with httpx.AsyncClient(timeout=timeout_seconds) as client:
            response = await client.post(
                "https://ai-gateway.reference-enterprise.internal/v1/clinical-rag",
                json={
                    "query": query,
                    "context": patient_context,
                },
                headers={"Authorization": f"Bearer {_get_service_token()}"},
            )
            response.raise_for_status()
            return response.json()
    except httpx.TimeoutException:
        # CDS Hooks fallback: return empty cards (do not block EHR workflow)
        return {"cards": [], "fallback_reason": "ai_timeout"}
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 429:
            return {"cards": [], "fallback_reason": "ai_rate_limited"}
        return {"cards": [], "fallback_reason": "ai_service_error"}
    except httpx.RequestError:
        # Connection refused, DNS failure, etc.: the AI service is unavailable
        return {"cards": [], "fallback_reason": "ai_unavailable"}
    except ValueError:
        # Response body was not valid JSON: degrade instead of crashing
        return {"cards": [], "fallback_reason": "ai_invalid_response"}


def _get_service_token() -> str:
    """Retrieve service account token from secrets manager."""
    # Implementation: read from AWS Secrets Manager, Azure Key Vault, etc.
    raise NotImplementedError("connect to the organization's secrets manager")
```

Pattern 2: Asynchronous Event-Driven
When to use: The AI processing time exceeds user-tolerable wait times. The caller does not need to remain connected while AI processing occurs. The use case is naturally event-driven (an admission triggers an AI summary; an order triggers a prior authorization workflow).
Examples in HMS: Discharge summary generation (triggered by the discharge order; the physician reviews the AI draft within the next hour), prior authorization (triggered by an order; the insurance response is expected within hours), and overnight clinical note summarization.
```python
import json
import uuid
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Optional

# Educational example: not for clinical use


def _utc_now() -> str:
    # datetime.utcnow() is deprecated since Python 3.12; use an aware timestamp
    return datetime.now(timezone.utc).isoformat()


@dataclass
class AIJobRequest:
    job_id: str
    job_type: str  # "discharge_summary" | "prior_auth" | "note_summary"
    payload: dict
    submitted_at: str
    callback_url: str  # Where to POST the result when complete


@dataclass
class AIJobResult:
    job_id: str
    status: str  # "completed" | "failed" | "timeout"
    result: Optional[dict]
    error: Optional[str]
    completed_at: str


async def submit_async_ai_job(
    job_type: str,
    payload: dict,
    callback_url: str,
    kafka_producer,
) -> str:
    """
    Submit an AI job to the async processing queue.
    Returns job_id once the broker has accepted the message;
    the result is delivered later via callback_url.
    """
    job_id = str(uuid.uuid4())
    job_request = AIJobRequest(
        job_id=job_id,
        job_type=job_type,
        payload=payload,
        submitted_at=_utc_now(),
        callback_url=callback_url,
    )
    # Publish to Kafka (or SQS, Azure Service Bus, etc.). Assuming an aiokafka
    # AIOKafkaProducer, send_and_wait() returns only after the broker acks, so
    # a job_id is never handed back for a message that was not accepted.
    await kafka_producer.send_and_wait(
        f"ai-jobs.{job_type}",
        value=json.dumps(asdict(job_request)).encode(),
        key=job_id.encode(),
    )
    return job_id


async def ai_job_worker(kafka_consumer, ai_service, http_client):
    """
    Consumer side: process jobs from the queue and deliver results.
    Educational example: not for clinical use.
    """
    # NOTE: production workers also need an idempotency check on job_id and
    # dead letter queue routing (see Enterprise Considerations).
    async for message in kafka_consumer:
        job_request = AIJobRequest(**json.loads(message.value))
        try:
            result = await ai_service.process(job_request.job_type, job_request.payload)
            job_result = AIJobResult(
                job_id=job_request.job_id,
                status="completed",
                result=result,
                error=None,
                completed_at=_utc_now(),
            )
        except Exception as e:
            job_result = AIJobResult(
                job_id=job_request.job_id,
                status="failed",
                result=None,
                error=str(e),
                completed_at=_utc_now(),
            )
        # Deliver result via callback (delivery failures are not retried here)
        await http_client.post(
            job_request.callback_url,
            json=asdict(job_result),
        )
```

Pattern 3: Batch Processing
When to use: Large volumes of records must be processed on a schedule. Results are consumed analytically, not in real-time. Cost optimization through bulk processing is important.
Examples in HMS: Overnight population health risk scoring, monthly quality measure extraction from clinical notes, weekly prior authorization outcome analysis.
```python
from typing import Iterable

# Educational example: not for clinical use


def batch_ai_pipeline(
    source_records: Iterable[dict],
    ai_processor,
    output_sink,
    batch_size: int = 50,
    checkpoint_store=None,
) -> dict:
    """
    Batch AI processing with checkpointing for resumability.
    If the pipeline fails midway, it can resume from the last checkpoint.
    Educational example: not for clinical use.
    """
    processed = 0
    failed = 0
    batch: list[dict] = []

    def flush(records: list[dict]) -> None:
        nonlocal processed, failed
        results = _process_batch(records, ai_processor)
        output_sink.write_batch(results["successes"])
        if checkpoint_store:
            # Checkpoint only successful records so failures are retried on resume
            failed_ids = {f["record_id"] for f in results["failures"]}
            checkpoint_store.mark_processed(
                [r["id"] for r in records if r["id"] not in failed_ids]
            )
        processed += len(results["successes"])
        failed += len(results["failures"])

    for record in source_records:
        # Skip already-processed records (checkpoint-based deduplication)
        if checkpoint_store and checkpoint_store.is_processed(record["id"]):
            continue
        batch.append(record)
        if len(batch) >= batch_size:
            flush(batch)
            batch = []

    # Process (and checkpoint) the final partial batch
    if batch:
        flush(batch)

    return {"processed": processed, "failed": failed}


def _process_batch(records: list[dict], ai_processor) -> dict:
    """Process a batch of records; separate successes from failures."""
    successes, failures = [], []
    for record in records:
        try:
            result = ai_processor.process(record)
            successes.append(result)
        except Exception as e:
            failures.append({"record_id": record.get("id"), "error": str(e)})
    return {"successes": successes, "failures": failures}
```

Pattern Selection Guide
| Requirement | Synchronous | Asynchronous | Batch |
|---|---|---|---|
| User is waiting for response | Yes | No | No |
| Response time < 5 s | Yes | N/A | N/A |
| Response time 5 s to 5 min | With caution | Yes | N/A |
| Response time > 5 min | No | Yes | N/A |
| Triggered by user action | Yes | Yes | No |
| Triggered by schedule or event | N/A | Yes | Yes |
| Volume: single record | Yes | Yes | N/A |
| Volume: 1,000+ records | No | With queuing | Yes |
| PHI handling required | Yes (with TLS) | Yes (with encryption) | Yes (with access control) |
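The selection table can be read as a decision procedure. The following is a minimal sketch that encodes its main rows. The thresholds (5 seconds, 1,000 records) come from the table. The function, the enum, and the choice to resolve the "with caution" band toward asynchronous are illustrative assumptions.

```python
from enum import Enum


class Pattern(Enum):
    SYNCHRONOUS = "synchronous"
    ASYNCHRONOUS = "asynchronous"
    BATCH = "batch"


def select_pattern(
    user_waiting: bool,
    expected_latency_s: float,
    record_count: int,
    scheduled: bool,
) -> Pattern:
    """Map integration requirements to a pattern using the table's rules."""
    # Scheduled, high-volume work is the batch case
    if scheduled and record_count >= 1000:
        return Pattern.BATCH
    # A waiting user, a single record, and a sub-5-second budget: synchronous
    if user_waiting and record_count == 1 and expected_latency_s < 5:
        return Pattern.SYNCHRONOUS
    # Long latency, event triggers, or queued volume: asynchronous
    # (the "with caution" 5 s to 5 min band is resolved toward async here)
    return Pattern.ASYNCHRONOUS
```

For example, a CDS Hooks medication check maps to `select_pattern(True, 2.0, 1, False)`, which returns synchronous. Overnight population risk scoring maps to `select_pattern(False, 3600, 50000, True)`, which returns batch.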
Enterprise Considerations
Failure isolation: The AI layer must never take down the source system. Always implement circuit breakers on synchronous AI calls, and design async consumers so that queue depth growth does not cause backpressure on the source system.
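A circuit breaker stops calling an AI service that is already failing. Callers then receive the fallback immediately instead of waiting out a timeout on every request. The following is a minimal single-process sketch that assumes consecutive-failure counting and a fixed cool-down; the class and parameter names are illustrative. Production breakers typically add sliding failure windows, shared state across instances, and a limit on concurrent trial calls while half-open.

```python
import time
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


class CircuitBreaker:
    """Closed -> open after N consecutive failures; half-open after a cool-down."""

    def __init__(
        self,
        failure_threshold: int = 5,
        cooldown_s: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.failure_threshold = failure_threshold
        self.cooldown_s = cooldown_s
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.cooldown_s:
            return "half_open"  # cool-down elapsed: let a trial call through
        return "open"

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self.state == "half_open" or self._failures >= self.failure_threshold:
            # Trip (or re-trip) the breaker and restart the cool-down
            self._opened_at = self._clock()

    async def call(self, fn: Callable[[], Awaitable[T]], fallback: T) -> T:
        if self.state == "open":
            return fallback  # fail fast without touching the AI service
        try:
            result = await fn()
        except Exception:
            self.record_failure()
            return fallback
        self.record_success()
        return result
```

`synchronous_ai_call` above already converts errors into fallback responses. A breaker wrapped around it would therefore see only successes unless it treats results carrying a `fallback_reason` as failures. Wrapping the raw HTTP call is the simpler option.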
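Idempotency in async patterns: Queues with at-least-once delivery may deliver the same message more than once, so AI workers must be idempotent. Processing the same job_id a second time must have no additional effect. Model outputs are non-deterministic, so re-running inference would not even reproduce the same result. The worker should therefore detect the duplicate and skip it rather than write a second output (e.g., a second discharge summary in the EHR).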
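One way to make a worker idempotent is to claim each job_id atomically in a durable store before processing. The worker then records completion afterwards. The following is a minimal sketch that uses SQLite from the Python standard library as a stand-in for whatever shared store the deployment actually uses, such as a database table keyed on job_id. The table and function names are illustrative.

```python
import sqlite3
from typing import Callable, Optional


def init_store(conn: sqlite3.Connection) -> None:
    conn.execute(
        "CREATE TABLE IF NOT EXISTS processed_jobs ("
        " job_id TEXT PRIMARY KEY,"
        " status TEXT NOT NULL)"
    )
    conn.commit()


def process_once(
    conn: sqlite3.Connection,
    job_id: str,
    handler: Callable[[], dict],
) -> Optional[dict]:
    """Run handler at most once per job_id; return None for duplicates."""
    # INSERT OR IGNORE is atomic: only the first delivery claims the job_id
    cur = conn.execute(
        "INSERT OR IGNORE INTO processed_jobs (job_id, status) "
        "VALUES (?, 'in_progress')",
        (job_id,),
    )
    conn.commit()
    if cur.rowcount == 0:
        return None  # duplicate delivery: skip, do not write a second document
    try:
        result = handler()
    except Exception:
        # Release the claim so a retry (or DLQ replay) can process the job
        conn.execute("DELETE FROM processed_jobs WHERE job_id = ?", (job_id,))
        conn.commit()
        raise
    conn.execute(
        "UPDATE processed_jobs SET status = 'completed' WHERE job_id = ?",
        (job_id,),
    )
    conn.commit()
    return result
```

This sketch leaves a job stuck in `in_progress` if the worker crashes mid-processing. A production version would add a lease or timeout on claims and, where possible, make the downstream write itself keyed by job_id.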
Dead letter queues: Async pipelines must route failed jobs to a dead letter queue (DLQ) after exhausting retries. Without a DLQ, failed jobs are silently dropped. The DLQ must be monitored and produce alerts when messages accumulate.
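The retry-then-dead-letter flow can be sketched independently of the broker. The following is a minimal sketch that assumes exponential backoff with a fixed retry budget. An in-memory list stands in for the dead letter topic or queue. All names are illustrative.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional


@dataclass
class DeadLetter:
    job_id: str
    payload: Any
    attempts: int
    last_error: str


@dataclass
class RetryPolicy:
    max_attempts: int = 3
    base_delay_s: float = 1.0  # delay doubles after each failed attempt


async def process_with_dlq(
    job_id: str,
    payload: Any,
    handler: Callable[[Any], Awaitable[Any]],
    dlq: list[DeadLetter],
    policy: Optional[RetryPolicy] = None,
) -> Optional[Any]:
    """Retry handler with exponential backoff; dead-letter the job once the
    retry budget is exhausted instead of dropping it."""
    policy = policy or RetryPolicy()
    last_error = ""
    for attempt in range(1, policy.max_attempts + 1):
        try:
            return await handler(payload)
        except Exception as exc:
            last_error = f"{type(exc).__name__}: {exc}"
            if attempt < policy.max_attempts:
                await asyncio.sleep(policy.base_delay_s * 2 ** (attempt - 1))
    # Retries exhausted: park the job (with its error) for inspection and replay
    dlq.append(DeadLetter(job_id, payload, policy.max_attempts, last_error))
    return None


def dlq_needs_alert(dlq: list[DeadLetter], threshold: int = 1) -> bool:
    """Signal an alert once messages start accumulating in the DLQ."""
    return len(dlq) >= threshold
```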
Common Mistakes
1. Using synchronous calls for long-running AI operations. A 30-second AI response on a synchronous call holds an HTTP connection and a thread for the entire duration. Under load, this exhausts connection pools and cascades failures to the calling system.
2. Not implementing fallback for synchronous AI calls. If the AI service is unavailable and the calling system has no fallback, the calling system fails too. For clinical systems, the fallback must be defined before the feature is deployed.
3. No idempotency in async workers. A Kafka consumer that processes the same message twice writes two AI-generated documents to the EHR. Every async AI worker must check whether a job_id has already been processed before executing.
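The connection-pool exhaustion in mistake 1 can be quantified with Little's law: requests in flight = arrival rate × average latency. With in-flight requests capped by the pool size, throughput is bounded by pool size divided by latency. The following is a minimal sketch; the pool size and latencies are illustrative values, not measurements.

```python
import math


def max_throughput_rps(pool_size: int, avg_latency_s: float) -> float:
    """Little's law (L = lambda * W): with at most pool_size requests in
    flight, sustainable throughput is bounded by pool_size / latency."""
    if pool_size <= 0 or avg_latency_s <= 0:
        raise ValueError("pool_size and avg_latency_s must be positive")
    return pool_size / avg_latency_s


def required_pool_size(target_rps: float, avg_latency_s: float) -> int:
    """Concurrent connections needed to sustain target_rps at a given latency."""
    return math.ceil(target_rps * avg_latency_s)


if __name__ == "__main__":
    # Illustrative numbers only: a 100-connection pool
    for latency_s in (0.5, 2.0, 30.0):
        print(f"{latency_s:5.1f} s per call -> at most "
              f"{max_throughput_rps(100, latency_s):.1f} req/s")
    print("pool needed for 20 req/s at 30 s:", required_pool_size(20, 30.0))
```

At 0.5 seconds per call, a 100-connection pool sustains up to 200 requests per second. At 30 seconds per call, the same pool sustains only about 3.3, and sustaining 20 requests per second would need 600 concurrent connections.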
Key Takeaways
- The integration pattern (synchronous, asynchronous, batch) must be selected before the AI model, because it determines end-to-end architecture
- Synchronous AI integration requires explicit timeout and fallback behavior; the AI system must never take down the calling system
- Asynchronous AI integration requires idempotent workers and dead letter queues
- Batch AI integration requires checkpointing for resumability on large datasets
- PHI handling requirements apply to all three patterns: encryption in transit, audit logging, least-privilege access
Further Reading
- Event-Driven AI: deep dive on Kafka and event-triggered AI workflows
- EHR Integration Patterns: FHIR R4 and CDS Hooks integration
- Webhook and Callback Patterns: async result delivery
- Healthcare AI Landscape: HMS integration context