EHR Integration Patterns

Executive Summary

The Electronic Health Record (EHR) is the primary data source for clinical AI and the primary system of record into which AI outputs are written. EHR integration for AI requires mastering three distinct protocols: FHIR R4 for structured data access, HL7 v2 for legacy ADT event feeds, and CDS Hooks for inline clinical decision support at the point of care. Each protocol has distinct authorization patterns, data models, and integration constraints. This chapter provides the integration patterns for connecting AI systems to EHR infrastructure, with working examples from the HMS scenario.

Learning Objectives

  • Design FHIR R4 read and write operations for clinical AI context retrieval and result persistence
  • Implement CDS Hooks services that meet the 5-second EHR timeout requirement
  • Handle SMART on FHIR authorization for AI-powered clinical applications
  • Map HL7 v2 ADT messages to Kafka events for real-time clinical AI triggers

Business Problem

A Reference Healthcare Organization deploying clinical decision support AI faces a fundamental integration problem: the clinical data needed to inform AI recommendations lives in the EHR (patient demographics, diagnoses, medications, lab results, encounters), but the AI inference layer runs outside the EHR. This data must be retrieved in real time and used to augment the AI prompt, and the AI's output must then be written back to the EHR in a format that clinicians can access within their normal workflow.

FHIR R4 is the standard solution for this read/write integration. CDS Hooks is the standard mechanism for surfacing AI recommendations inline within the EHR clinical workflow. HL7 v2 ADT feeds are the legacy (but still dominant) mechanism for real-time event notification from inpatient EHR systems.
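The "augment the AI prompt" step is easy to gloss over. Below is a minimal sketch of one way to turn assembled clinical context into a bounded, clearly delimited prompt section. The `ClinicalSnapshot` type, the `render_context_block` function, the `<clinical_context>` delimiters and the per-list cap are all illustrative assumptions. None of them comes from any standard.

```python
from dataclasses import dataclass, field


@dataclass
class ClinicalSnapshot:
    """Hypothetical, trimmed-down stand-in for FHIR-derived clinical context."""
    encounter_class: str
    conditions: list[str] = field(default_factory=list)
    medications: list[str] = field(default_factory=list)
    allergies: list[str] = field(default_factory=list)


def render_context_block(snapshot: ClinicalSnapshot, max_items: int = 10) -> str:
    """Render clinical context as a delimited prompt section (format is an assumption)."""

    def section(title: str, items: list[str]) -> str:
        # Cap each list so a long history cannot crowd out the rest of the prompt
        shown = items[:max_items] or ["none recorded"]
        lines = [f"{title}:"] + [f"- {item}" for item in shown]
        if len(items) > max_items:
            lines.append(f"- ... {len(items) - max_items} more omitted")
        return "\n".join(lines)

    parts = [
        f"Encounter class: {snapshot.encounter_class or 'unknown'}",
        section("Active conditions", snapshot.conditions),
        section("Active medications", snapshot.medications),
        section("Allergies", snapshot.allergies),
    ]
    return "<clinical_context>\n" + "\n".join(parts) + "\n</clinical_context>"
```

The sketch leaves out name and date of birth. This assumes the model does not need direct identifiers to reason about the encounter, and it keeps the prompt closer to minimum necessary.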

FHIR R4 Integration

python
import asyncio
import httpx
from dataclasses import dataclass
from typing import Optional
from datetime import datetime

# Educational example — not for clinical use

@dataclass
class PatientClinicalContext:
    """Structured clinical context assembled from FHIR R4 resources."""
    patient_id: str
    patient_name: str
    date_of_birth: str
    encounter_id: str
    encounter_class: str                # v3 ActCode from Encounter.class, e.g. "IMP" | "AMB" | "EMER"
    encounter_start: str
    primary_diagnosis_codes: list[str]  # "code (display)" from active Conditions, e.g. ICD-10-CM or SNOMED CT
    active_medications: list[dict]      # [{display, code}]
    recent_labs: list[dict]             # [{code, display, value, unit, date}]
    allergies: list[dict]               # [{substance, reactions, criticality}]


class FHIRContextClient:
    """
    FHIR R4 client for assembling patient clinical context for AI prompting.
    
    Educational Example — Not intended for clinical decision making.
    Requires SMART on FHIR authorization — see 06-identity-and-access.md.
    """
    
    def __init__(self, fhir_base_url: str, access_token: str):
        self.base_url = fhir_base_url.rstrip("/")
        self.headers = {
            "Authorization": f"Bearer {access_token}",
            "Accept": "application/fhir+json",
        }
    
    async def get_encounter_context(
        self,
        patient_id: str,
        encounter_id: str,
    ) -> PatientClinicalContext:
        """
        Assemble complete clinical context from FHIR for a given encounter.
        Makes parallel FHIR requests to minimize latency.
        
        Educational example — not for clinical use.
        """
        async with httpx.AsyncClient(
            base_url=self.base_url,
            headers=self.headers,
            timeout=10.0,
        ) as client:
            # Execute FHIR requests in parallel
            patient_resp, encounter_resp, conditions_resp, meds_resp, labs_resp, allergies_resp = (
                await asyncio.gather(
                    client.get(f"/Patient/{patient_id}"),
                    client.get(f"/Encounter/{encounter_id}"),
                    client.get(f"/Condition?patient={patient_id}&clinical-status=active&_count=20"),
                    client.get(f"/MedicationRequest?patient={patient_id}&status=active&_count=30"),
                    client.get(
                        f"/Observation?patient={patient_id}&category=laboratory"
                        f"&date=ge{_thirty_days_ago()}&_count=50&_sort=-date"
                    ),
                    client.get(f"/AllergyIntolerance?patient={patient_id}&_count=20"),
                    return_exceptions=True
                )
            )
            
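            # Patient and Encounter are mandatory: re-raise transport errors and HTTP 4xx/5xx.
            # The optional resources degrade to empty lists in the _extract_* helpers.
            for resp in (patient_resp, encounter_resp):
                if isinstance(resp, Exception):
                    raise resp
                resp.raise_for_status()

            patient = patient_resp.json()
            encounter = encounter_resp.json()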
            
            return PatientClinicalContext(
                patient_id=patient_id,
                patient_name=_extract_patient_name(patient),
                date_of_birth=patient.get("birthDate", ""),
                encounter_id=encounter_id,
                encounter_class=encounter.get("class", {}).get("code", ""),
                encounter_start=encounter.get("period", {}).get("start", ""),
                primary_diagnosis_codes=_extract_condition_codes(conditions_resp),
                active_medications=_extract_medications(meds_resp),
                recent_labs=_extract_observations(labs_resp),
                allergies=_extract_allergies(allergies_resp),
            )
    
    async def write_ai_document(
        self,
        patient_id: str,
        encounter_id: str,
        document_title: str,
        document_content: str,
        document_type_code: str,    # LOINC code, e.g., "18842-5" for Discharge Summary
    ) -> str:
        """
        Write an AI-generated document to the EHR as a FHIR DocumentReference.
        Returns the created resource ID.
        
        Educational Example — Not intended for clinical decision making.
        """
        import base64
        
        document_reference = {
            "resourceType": "DocumentReference",
            "status": "current",
            "docStatus": "preliminary",    # AI drafts are "preliminary" pending physician review
            "type": {
                "coding": [{
                    "system": "http://loinc.org",
                    "code": document_type_code,
                    "display": document_title,
                }]
            },
            "subject": {"reference": f"Patient/{patient_id}"},
            "context": {"encounter": [{"reference": f"Encounter/{encounter_id}"}]},
            "date": datetime.utcnow().isoformat() + "Z",
            "content": [{
                "attachment": {
                    "contentType": "text/plain",
                    "data": base64.b64encode(document_content.encode()).decode(),
                    "title": document_title,
                }
            }],
            "extension": [{
                "url": "https://reference-enterprise.example/fhir/StructureDefinition/ai-generated",
                "valueBoolean": True,
            }],
        }
        
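        async with httpx.AsyncClient(base_url=self.base_url, headers=self.headers) as client:
            response = await client.post(
                "/DocumentReference",
                json=document_reference,
                headers={
                    "Content-Type": "application/fhir+json",
                    "Prefer": "return=representation",  # ask the server to echo the created resource
                },
            )
            response.raise_for_status()
            return response.json()["id"]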


def _thirty_days_ago() -> str:
    from datetime import timedelta
    return (datetime.utcnow() - timedelta(days=30)).strftime("%Y-%m-%d")


def _extract_patient_name(patient: dict) -> str:
    names = patient.get("name", [])
    if names:
        name = names[0]
        family = name.get("family", "")
        given = " ".join(name.get("given", []))
        return f"{given} {family}".strip()
    return "Unknown"


def _extract_condition_codes(conditions_resp) -> list[str]:
    if isinstance(conditions_resp, Exception):
        return []
    bundle = conditions_resp.json()
    codes = []
    for entry in bundle.get("entry", []):
        resource = entry.get("resource", {})
        for coding in resource.get("code", {}).get("coding", []):
            codes.append(f"{coding.get('code', '')} ({coding.get('display', '')})")
    return codes


def _extract_medications(meds_resp) -> list[dict]:
    if isinstance(meds_resp, Exception):
        return []
    bundle = meds_resp.json()
    medications = []
    for entry in bundle.get("entry", []):
        resource = entry.get("resource", {})
        medication = resource.get("medicationCodeableConcept", {})
        medications.append({
            "display": medication.get("text", ""),
            "code": next((c.get("code") for c in medication.get("coding", [])), ""),
        })
    return medications


def _extract_observations(labs_resp) -> list[dict]:
    if isinstance(labs_resp, Exception):
        return []
    bundle = labs_resp.json()
    observations = []
    for entry in bundle.get("entry", []):
        resource = entry.get("resource", {})
        observations.append({
            "code": next(
                (c.get("code") for c in resource.get("code", {}).get("coding", [])), ""
            ),
            "display": resource.get("code", {}).get("text", ""),
            "value": resource.get("valueQuantity", {}).get("value"),
            "unit": resource.get("valueQuantity", {}).get("unit", ""),
            "date": resource.get("effectiveDateTime", ""),
        })
    return observations


def _extract_allergies(allergies_resp) -> list[dict]:
    if isinstance(allergies_resp, Exception):
        return []
    bundle = allergies_resp.json()
    allergies = []
    for entry in bundle.get("entry", []):
        resource = entry.get("resource", {})
        substance = resource.get("code", {}).get("text", "")
        reactions = [
            r.get("manifestation", [{}])[0].get("text", "")
            for r in resource.get("reaction", [])
        ]
        allergies.append({
            "substance": substance,
            "reactions": reactions,
            "criticality": resource.get("criticality", ""),
        })
    return allergies
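A DocumentReference POST is the point where AI output enters the legal record, so a pre-write check is a cheap safeguard. The sketch below uses a hypothetical `validate_ai_document_reference` helper that would run before `write_ai_document` posts. It checks `docStatus` and the `ai-generated` extension URL used above. A FHIR profile enforced by the server would be a stronger control; this is only a client-side sketch.

```python
AI_GENERATED_EXTENSION_URL = (
    "https://reference-enterprise.example/fhir/StructureDefinition/ai-generated"
)


def validate_ai_document_reference(resource: dict) -> list[str]:
    """Return problems found in an AI-drafted DocumentReference; empty means OK to POST."""
    problems: list[str] = []
    if resource.get("resourceType") != "DocumentReference":
        problems.append("resourceType must be DocumentReference")
    # AI drafts must stay preliminary until a clinician reviews and signs them
    if resource.get("docStatus") != "preliminary":
        problems.append('docStatus must be "preliminary" for AI-generated drafts')
    # The custom extension lets downstream systems identify AI-authored content
    if not any(
        ext.get("url") == AI_GENERATED_EXTENSION_URL and ext.get("valueBoolean") is True
        for ext in resource.get("extension", [])
    ):
        problems.append("missing ai-generated extension")
    if not resource.get("subject", {}).get("reference", "").startswith("Patient/"):
        problems.append("subject must reference a Patient")
    return problems
```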

CDS Hooks Integration

CDS Hooks is the EHR-native mechanism for surfacing AI recommendations inline within the clinical workflow. The AI system registers as a CDS Service, and the EHR calls it when workflow events fire. Examples are opening a patient's chart (the patient-view hook) or selecting an order (order-select).

python
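import asyncio
import logging
from typing import Optional

from fastapi import FastAPI
from pydantic import BaseModel

# FHIRContextClient is the FHIR R4 client defined in the previous example
logger = logging.getLogger(__name__)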

# Educational example — not for clinical use

class CDSRequest(BaseModel):
    """Incoming CDS Hooks request from the EHR."""
    hookInstance: str
    hook: str                          # "patient-view" | "order-select" | "order-sign"
    context: dict
    fhirServer: Optional[str] = None   # may be absent when the EHR grants no FHIR access
    fhirAuthorization: Optional[dict] = None
    prefetch: Optional[dict] = None


class CDSCard(BaseModel):
    summary: str
    detail: Optional[str] = None
    indicator: str      # "info" | "warning" | "critical"
    source: dict
    links: Optional[list[dict]] = None
    suggestions: Optional[list[dict]] = None


class CDSResponse(BaseModel):
    cards: list[CDSCard]


app = FastAPI()


@app.get("/cds-services")
async def cds_discovery():
    """
    CDS Hooks discovery endpoint.
    The EHR calls this at configuration time to discover available services.
    """
    return {
        "services": [
            {
                "hook": "patient-view",
                "id": "clinical-risk-assessment",
                "title": "AI Clinical Risk Assessment",
                "description": (
                    "Educational Example — Not for clinical use. "
                    "Provides AI-generated risk assessment when a patient chart is opened."
                ),
                "prefetch": {
                    "patient": "Patient/{{context.patientId}}",
                    "encounters": "Encounter?patient={{context.patientId}}&_count=5",
                }
            }
        ]
    }


@app.post(
    "/cds-services/clinical-risk-assessment",
    response_model_exclude_none=True,   # omit null optional card fields (links, suggestions)
)
async def patient_view_hook(request: CDSRequest) -> CDSResponse:
    """
    CDS Hook handler for patient-view.
    
    EHR SLA requirement: respond within 5 seconds or the EHR will time out the call.
    Implement timeout handling that returns empty cards on timeout.
    
    Educational Example — Not intended for clinical decision making.
    """
    import asyncio
    
    try:
        # Run AI assessment with strict 4.5-second timeout
        # (0.5 second buffer before EHR's 5-second hard timeout)
        cards = await asyncio.wait_for(
            _generate_risk_cards(request),
            timeout=4.5
        )
        return CDSResponse(cards=cards)
    
    except asyncio.TimeoutError:
        # Return empty cards — do NOT block the EHR workflow
        return CDSResponse(cards=[])
    
    except Exception:
        # Log for investigation, but keep patient identifiers (PHI) out of application logs;
        # never surface internal errors to the EHR
        logger.exception("CDS Hook error for hookInstance %s", request.hookInstance)
        return CDSResponse(cards=[])


async def _generate_risk_cards(request: CDSRequest) -> list[CDSCard]:
    """
    Generate CDS cards from AI risk assessment.
    
    Educational Example — Not intended for clinical decision making.
    """
    patient_id = request.context.get("patientId")
    
    # Use prefetched FHIR resources if available; avoid second FHIR round-trip
    patient = request.prefetch.get("patient") if request.prefetch else None
    
    # Build clinical context (from prefetch or FHIR call)
    if not patient:
        fhir_client = FHIRContextClient(
            request.fhirServer,
            request.fhirAuthorization.get("access_token", "") if request.fhirAuthorization else ""
        )
        context = await fhir_client.get_encounter_context(
            patient_id,
            request.context.get("encounterId", "")
        )
    else:
        # Parse from prefetch
        context = patient
    
    # Generate AI risk assessment (abbreviated example)
    risk_level = "info"   # AI determines: "info" | "warning" | "critical"
    summary = "AI risk assessment complete."  # AI generates actual summary
    
    cards = []
    if summary:
        cards.append(CDSCard(
            summary=summary,
            detail=(
                "Educational Example — Not for clinical decision making. "
                "This card illustrates the CDS Hooks integration pattern only."
            ),
            indicator=risk_level,
            source={
                "label": "Reference AI Platform",
                "url": "https://ai-platform.reference-enterprise.internal",
            }
        ))
    
    return cards
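The EHR fills the discovery document's `prefetch` templates by substituting hook context values. A service also has to cope when some of that data never arrives. The sketch below shows both halves under simplifying assumptions:

- Only `{{context.X}}` tokens are handled, although the CDS Hooks specification defines others.
- Any key that is absent or null is treated as needing a direct FHIR read.

`render_prefetch_query` and `missing_prefetch_queries` are hypothetical helper names.

```python
import re
from typing import Optional

# Matches {{context.someField}} tokens only; other CDS Hooks prefetch tokens are out of scope
_CONTEXT_TOKEN = re.compile(r"\{\{context\.(\w+)\}\}")


def render_prefetch_query(template: str, context: dict) -> str:
    """Substitute {{context.X}} tokens in a prefetch template with hook context values."""

    def substitute(match: re.Match) -> str:
        key = match.group(1)
        if key not in context:
            raise KeyError(f"hook context has no field {key!r}")
        return str(context[key])

    return _CONTEXT_TOKEN.sub(substitute, template)


def missing_prefetch_queries(
    templates: dict[str, str], prefetch: Optional[dict], context: dict
) -> dict[str, str]:
    """Map each prefetch key the EHR did not supply to the relative FHIR query to run instead."""
    supplied = prefetch or {}
    return {
        key: render_prefetch_query(template, context)
        for key, template in templates.items()
        # Absent or null: conservatively fall back to a direct FHIR read
        if supplied.get(key) is None
    }
```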

HL7 v2 ADT to Kafka Bridge

python
import hl7
from aiokafka import AIOKafkaProducer
import json

# Educational example — not for clinical use

class HL7ADTBridge:
    """
    Converts HL7 v2 ADT messages to structured JSON events on Kafka.
    
    HL7 v2 ADT messages are the primary real-time event feed from
    inpatient EHR systems. This bridge normalizes them into a
    structured event format suitable for AI event consumers.
    
    Educational example — not for clinical use.
    """
    
    ADT_EVENT_MAP = {
        "A01": "admission",
        "A02": "transfer",
        "A03": "discharge",
        "A08": "patient_update",
        "A11": "admission_cancelled",
        "A13": "discharge_cancelled",
    }
    
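    def __init__(self, kafka_producer: AIOKafkaProducer):
        self.producer = kafka_producer
    
    async def process_hl7_message(self, raw_hl7: str) -> None:
        """Parse HL7 v2 message and publish structured event to Kafka."""
        # python-hl7 expects "\r" segment separators; strip MLLP framing before calling this
        message = hl7.parse(raw_hl7)
        
        msh = message.segment("MSH")            # python-hl7 raises KeyError if absent
        pid = _optional_segment(message, "PID")
        pv1 = _optional_segment(message, "PV1")
        
        # python-hl7 keeps HL7 numbering: msh[9] is MSH-9, pid[3] is PID-3
        trigger_event = str(msh[9][0][1])       # MSH-9.2 = trigger event type
        event_type = self.ADT_EVENT_MAP.get(trigger_event, "unknown")
        
        # Trailing empty fields are often omitted, so check segment length before indexing
        patient_id = _first_component(pid[3]) if pid and len(pid) > 3 else None      # PID-3.1 = MRN
        encounter_id = _first_component(pv1[19]) if pv1 and len(pv1) > 19 else None  # PV1-19.1 = visit number
        control_id = str(msh[10])               # MSH-10 = message control ID, unique per message
        
        event = {
            "event_type": event_type,
            "hl7_trigger": trigger_event,
            "patient_id": patient_id,
            "encounter_id": encounter_id,
            "event_timestamp": str(msh[7]) if msh[7] else "",
            "event_id": control_id,             # stable ID lets consumers de-duplicate redeliveries
            "source_facility": str(msh[4]),
        }
        
        # send_and_wait returns only after the broker acknowledges the write
        await self.producer.send_and_wait(
            "hl7-adt-events",
            key=(patient_id or "unknown").encode(),     # Partition by patient_id
            value=json.dumps(event).encode(),
        )


def _optional_segment(message, segment_id: str):
    """Return the first segment with this ID, or None if the message lacks it."""
    try:
        return message.segment(segment_id)
    except KeyError:
        return None


def _first_component(field) -> str:
    """Component 1 of repetition 1, whether python-hl7 nested the field or kept a plain string."""
    first = field[0]
    return first if isinstance(first, str) else str(first[0])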
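The field positions the bridge relies on (MSH-9.2, MSH-10, PID-3.1, PV1-19) are easier to see without a parsing library. This dependency-free sketch assumes the default `|^~\&` delimiters and ignores escape sequences, which a production parser such as python-hl7 handles. `parse_adt_fields` is a hypothetical name.

Note the MSH off-by-one. MSH-1 is the field separator itself, so after splitting on `|`, MSH-n lands at index n-1. In every other segment, field n lands at index n.

```python
def parse_adt_fields(raw_hl7: str) -> dict[str, str]:
    """Extract bridge fields from an HL7 v2 ADT message with default delimiters (sketch)."""
    segments: dict[str, list[str]] = {}
    # Segments are separated by carriage returns; also tolerate "\n" from test fixtures
    for line in raw_hl7.replace("\n", "\r").split("\r"):
        if line:
            fields = line.split("|")
            segments.setdefault(fields[0], fields)   # keep the first segment of each type

    def get_field(segment_id: str, number: int) -> str:
        fields = segments.get(segment_id, [])
        # MSH-1 *is* the "|" separator, so after split() MSH-n sits at index n-1;
        # other segments have their name at index 0 and field n at index n
        index = number - 1 if segment_id == "MSH" else number
        return fields[index] if index < len(fields) else ""

    def component(value: str, number: int) -> str:
        # First repetition ("~") only, then the 1-based component ("^")
        parts = value.split("~")[0].split("^")
        return parts[number - 1] if number <= len(parts) else ""

    return {
        "trigger_event": component(get_field("MSH", 9), 2),       # MSH-9.2
        "message_control_id": get_field("MSH", 10),              # MSH-10
        "event_timestamp": get_field("MSH", 7),                   # MSH-7
        "source_facility": component(get_field("MSH", 4), 1),    # MSH-4.1
        "patient_id": component(get_field("PID", 3), 1),          # PID-3.1 (MRN)
        "encounter_id": component(get_field("PV1", 19), 1),       # PV1-19.1 (visit number)
    }
```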

Enterprise Considerations

FHIR versioning: FHIR R4 is the baseline version for US healthcare integrations, because the US Core Implementation Guide referenced by US EHR certification is built on R4. FHIR R4B and R5 introduce breaking changes relative to R4, so confirm the EHR vendor's FHIR version before building integration code.
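You can confirm a server's FHIR version from its CapabilityStatement. A FHIR server returns this resource from `GET [base]/metadata`, and it carries a `fhirVersion` element. The sketch below is a minimal, assumed mapping from that value to a release name. Fetching the statement is left out so the function stays testable; one way to fetch it is `httpx.get(f"{base}/metadata", headers={"Accept": "application/fhir+json"})`. `fhir_release` is a hypothetical helper name.

```python
# fhirVersion values reported in a CapabilityStatement, mapped to FHIR release names
FHIR_RELEASES = {
    "4.0.0": "R4",
    "4.0.1": "R4",   # R4 technical correction
    "4.3.0": "R4B",
    "5.0.0": "R5",
}


def fhir_release(capability_statement: dict) -> str:
    """Name the FHIR release a server implements, from its CapabilityStatement."""
    if capability_statement.get("resourceType") != "CapabilityStatement":
        raise ValueError("expected a CapabilityStatement resource")
    version = capability_statement.get("fhirVersion", "")
    return FHIR_RELEASES.get(version, f"unrecognized ({version or 'missing'})")
```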

CDS Hooks performance budget: The EHR enforces a hard timeout on CDS Hook calls. This chapter assumes 5 seconds, but the EHR vendor or site configuration sets the actual limit, so confirm it. AI systems must stay well within this budget. FHIR prefetch reduces round-trips. Semantic caching avoids repeat LLM calls for recurring clinical scenarios. Timeout handling returns empty cards instead of failing the EHR workflow.

SMART on FHIR scopes: Request the minimum necessary FHIR scopes. Clinical AI systems frequently over-request scopes "to be safe," but over-broad FHIR scopes run against the HIPAA minimum-necessary standard. See Identity and Access for SMART scope guidance.
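A scope audit can be a simple set comparison, run in CI or at registration time. The sketch below assumes SMART App Launch v1 scope syntax (`patient/Observation.read`); v2 uses suffixes such as `.rs` instead. `REQUIRED_SCOPES` is a hypothetical minimal set. It is derived from the FHIR calls that `FHIRContextClient` makes in this chapter, plus `launch` for EHR launch.

```python
# Hypothetical minimal scopes for this chapter's FHIRContextClient (SMART v1 syntax)
REQUIRED_SCOPES = frozenset({
    "launch",
    "patient/Patient.read",
    "patient/Encounter.read",
    "patient/Condition.read",
    "patient/MedicationRequest.read",
    "patient/Observation.read",
    "patient/AllergyIntolerance.read",
    "patient/DocumentReference.write",
})


def audit_smart_scopes(
    requested: str, required: frozenset[str] = REQUIRED_SCOPES
) -> dict[str, list[str]]:
    """Compare a space-delimited SMART scope string against what the integration needs."""
    scopes = set(requested.split())
    return {
        # "*" grants every resource type, broader than any single AI use case needs
        "wildcards": sorted(s for s in scopes if "/*." in s),
        "unneeded": sorted(scopes - required),
        "missing": sorted(required - scopes),
    }
```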

Common Mistakes

1. FHIR requests in series during CDS Hook. Making FHIR API calls sequentially within a CDS Hook (Patient → Conditions → Medications → Labs → Allergies) adds every call's latency to the total. At 1–2 seconds per call, the chain will time out. Always issue independent FHIR requests concurrently with asyncio.gather.

2. Not handling FHIR prefetch failures. If the EHR fails to populate prefetch resources (network error, permission), the CDS Hook receives null prefetch. The hook must detect this and fall back to direct FHIR calls or return empty cards.

3. Writing AI drafts to EHR without docStatus: "preliminary". An AI draft written with docStatus: "final" appears to clinicians as a finalized document. All AI-generated documents must use docStatus: "preliminary" pending physician review and signature.
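Serial FHIR calls, as in mistake 1, cost additive latency. The sketch below simulates five FHIR reads with `asyncio.sleep` and times them sequentially and with `asyncio.gather`. The 0.2-second latency is an arbitrary assumption, not a measured figure, and `fake_fhir_read` is a hypothetical stand-in.

```python
import asyncio
import time


async def fake_fhir_read(resource: str, latency_s: float) -> str:
    """Stand-in for one FHIR GET; the latency is an illustrative assumption."""
    await asyncio.sleep(latency_s)
    return resource


RESOURCES = ["Patient", "Condition", "MedicationRequest", "Observation", "AllergyIntolerance"]


async def sequential(latency_s: float) -> float:
    start = time.perf_counter()
    for resource in RESOURCES:
        await fake_fhir_read(resource, latency_s)   # each call waits for the previous one
    return time.perf_counter() - start


async def concurrent(latency_s: float) -> float:
    start = time.perf_counter()
    # All five reads are in flight at once, so total time tracks the slowest call
    await asyncio.gather(*(fake_fhir_read(r, latency_s) for r in RESOURCES))
    return time.perf_counter() - start


if __name__ == "__main__":
    seq = asyncio.run(sequential(0.2))
    con = asyncio.run(concurrent(0.2))
    print(f"sequential: {seq:.2f}s  concurrent: {con:.2f}s")
```

With concurrency, total time is bounded by the slowest call rather than the sum of all calls. Some FHIR servers may rate-limit concurrent requests from one client, so check vendor limits before fanning out widely.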

Key Takeaways

  • FHIR R4 parallel requests are required for CDS Hooks performance within the 5-second EHR timeout
  • AI-generated EHR documents must use docStatus: "preliminary" — never write as final
  • CDS Hooks must return empty cards on timeout, never block the EHR workflow with errors
  • HL7 v2 ADT messages are the dominant real-time event feed for inpatient AI triggers; bridge them to Kafka for scalable AI event consumption
  • SMART on FHIR scopes must be minimum-necessary; over-broad scopes put HIPAA compliance at risk

Further Reading