Webhook and Callback Patterns for AI

Executive Summary

Asynchronous AI operations (document analysis, long-form generation, multi-step agent workflows) often take longer than a single HTTP request can reasonably stay open. Webhooks and callback patterns provide the delivery mechanism that bridges the gap between a client submitting an AI job and eventually receiving the result, whether that job takes 10 seconds or 10 minutes. This chapter covers the three async result delivery patterns that production clinical AI systems rely on: reliable, secure webhook delivery for AI results; polling APIs as a fallback mechanism; and Server-Sent Events (SSE) for near-real-time streaming.

Learning Objectives

  • Design webhook delivery with signature verification, retry policies, and idempotency
  • Implement polling APIs as a fallback for clients that cannot receive webhooks
  • Apply Server-Sent Events (SSE) for real-time streaming of AI outputs to interactive UIs
  • Secure webhook endpoints against spoofing and replay attacks

Business Problem

A Reference Healthcare Organization's clinical AI platform accepts discharge summary generation requests and returns AI-drafted summaries to the requesting clinician's EHR workflow. AI generation takes 8–15 seconds: too long for a synchronous REST response within a CDS Hooks flow, but too short to justify making the clinician check a status screen manually. The platform needs a delivery mechanism that pushes the result to the clinician's interface as soon as it is ready, without the clinician having to refresh, poll, or take any other action.

Webhooks, SSE, and polling each address this problem in a different context: SSE for interactive browser-based interfaces where the client maintains a persistent connection, webhooks for server-to-server result delivery (the EHR system receives the AI result at a registered callback endpoint), and polling for clients behind firewalls that cannot receive inbound webhook calls.

Webhook Delivery

python
import asyncio
import hashlib
import hmac
import json
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

import httpx

# Educational example — not for clinical use

@dataclass
class WebhookDeliveryAttempt:
    url: str
    payload: dict
    attempt_number: int
    status_code: Optional[int]
    success: bool
    error: Optional[str]
    attempted_at: str


class WebhookDeliveryService:
    """
    Reliable webhook delivery for AI job results.
    
    Security properties:
    - HMAC-SHA256 signature on every delivery (X-AI-Platform-Signature header)
    - The signature covers a per-attempt Unix timestamp (X-AI-Platform-Timestamp
      header) as well as the body, so receivers can reject stale replays
    - Retry with exponential backoff for transient failures
    - Dead letter logging for persistent failures
    
    Retries are held in memory with asyncio.sleep(); a production service
    would persist pending retries (queue or database) to survive restarts.
    
    Educational example — not for clinical use.
    """
    
    # The first attempt is immediate; each delay is the wait before one retry.
    RETRY_DELAYS = [5, 30, 120, 600, 1800]   # 5s, 30s, 2m, 10m, 30m
    MAX_ATTEMPTS = 1 + len(RETRY_DELAYS)      # 1 initial attempt + 5 retries
    NON_RETRYABLE_STATUSES = {400, 401, 403, 404, 410}
    
    def __init__(self, signing_secret_manager):
        self.signing_secret_manager = signing_secret_manager
    
    async def deliver(
        self,
        callback_url: str,
        job_id: str,
        payload: dict,
        webhook_secret_name: str,
    ) -> bool:
        """
        Deliver AI job result to registered callback URL.
        Returns True if delivery succeeded within retry budget.
        
        Educational example — not for clinical use.
        """
        # Add delivery metadata to payload
        delivery_payload = {
            **payload,
            "job_id": job_id,
            "delivered_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        }
        
        serialized = json.dumps(delivery_payload, sort_keys=True)
        signing_secret = await self.signing_secret_manager.get_secret(webhook_secret_name)
        
        for attempt in range(1, self.MAX_ATTEMPTS + 1):
            # Re-sign on every attempt so the timestamp stays fresh, even after 30-minute retries
            timestamp = str(int(time.time()))
            signature = self._compute_signature(timestamp, serialized, signing_secret)
            try:
                async with httpx.AsyncClient(timeout=10.0) as client:
                    response = await client.post(
                        callback_url,
                        content=serialized,
                        headers={
                            "Content-Type": "application/json",
                            "X-AI-Platform-Signature": f"sha256={signature}",
                            "X-AI-Platform-Timestamp": timestamp,
                            "X-AI-Platform-Job-ID": job_id,
                            "X-AI-Platform-Delivery-Attempt": str(attempt),
                        },
                    )
            except httpx.TransportError:
                pass  # Timeouts, connection failures, protocol errors: retry
            else:
                if 200 <= response.status_code < 300:
                    return True
                
                if response.status_code in self.NON_RETRYABLE_STATUSES:
                    # Non-retryable client errors
                    await self._log_failed_delivery(job_id, callback_url, response.status_code)
                    return False
                
                # 5xx, 429, or other unexpected status: retry
            
            if attempt < self.MAX_ATTEMPTS:
                await asyncio.sleep(self.RETRY_DELAYS[attempt - 1])
        
        await self._log_failed_delivery(job_id, callback_url, None, "Max retries exceeded")
        return False
    
    def _compute_signature(self, timestamp: str, payload: str, secret: str) -> str:
        """Compute HMAC-SHA256 over "<timestamp>.<payload body>"."""
        signed_content = f"{timestamp}.{payload}".encode()
        return hmac.new(secret.encode(), signed_content, hashlib.sha256).hexdigest()
    
    async def _log_failed_delivery(
        self,
        job_id: str,
        callback_url: str,
        status_code: Optional[int],
        error: Optional[str] = None,
    ) -> None:
        """Log permanently failed deliveries for investigation."""
        # Implementation: write to DLQ Kafka topic or database
        pass


def verify_webhook_signature(
    payload_body: bytes,
    signature_header: str,
    timestamp_header: str,
    webhook_secret: str,
    tolerance_seconds: int = 300,
) -> bool:
    """
    Verify the HMAC-SHA256 signature and timestamp on an incoming webhook.
    
    Call this at the receiving end, on the raw request body bytes, to confirm
    the webhook is from the AI platform, has not been tampered with, and is
    recent. The tolerance window limits replays; deduplicating on job_id
    (see Idempotency at the receiver) handles the rest.
    
    Educational example — not for clinical use.
    """
    if not signature_header.startswith("sha256="):
        return False
    
    # Reject missing, malformed, or stale timestamps (replay protection)
    try:
        timestamp = int(timestamp_header)
    except (TypeError, ValueError):
        return False
    if abs(time.time() - timestamp) > tolerance_seconds:
        return False
    
    signed_content = timestamp_header.encode() + b"." + payload_body
    expected_signature = hmac.new(
        webhook_secret.encode(),
        signed_content,
        hashlib.sha256
    ).hexdigest()
    
    received_signature = signature_header[len("sha256="):]
    
    # Use constant-time comparison to prevent timing attacks
    return hmac.compare_digest(expected_signature.encode(), received_signature.encode())
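The fixed RETRY_DELAYS schedule above is easy to reason about. A common alternative is capped exponential backoff with random "full jitter", which keeps many senders from retrying against a recovering receiver in lockstep. A minimal sketch; backoff_delays, base_seconds and cap_seconds are hypothetical names chosen for illustration:

```python
import random
from typing import List, Optional


def backoff_delays(
    max_retries: int,
    base_seconds: float = 5.0,
    cap_seconds: float = 1800.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return one randomized wait (seconds) per retry: capped exponential backoff with full jitter."""
    rng = rng or random.Random()
    delays = []
    for retry in range(max_retries):
        # The exponential ceiling doubles on each retry but never exceeds the cap
        ceiling = min(cap_seconds, base_seconds * (2 ** retry))
        # Full jitter: wait a uniformly random time between 0 and the ceiling
        delays.append(rng.uniform(0, ceiling))
    return delays
```

With the defaults, the ceilings for five retries are 5, 10, 20, 40 and 80 seconds, so a larger base_seconds (or a steeper multiplier) would be needed to spread retries across the tens of minutes the fixed schedule covers.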

Polling API

Some clients cannot receive inbound webhook calls (firewall restrictions, EHR vendor limitations). The polling API provides a fallback: the client submits a job, receives a job_id, and polls for status until complete.

python
from typing import Literal, Optional

from fastapi import FastAPI, HTTPException, Response
from pydantic import BaseModel

# Educational example — not for clinical use

app = FastAPI()

class JobStatus(BaseModel):
    job_id: str
    status: Literal["queued", "processing", "completed", "failed"]
    result: Optional[dict] = None          # Populated when status == "completed"
    error: Optional[str] = None            # Populated when status == "failed"
    created_at: str
    updated_at: str
    estimated_completion_seconds: Optional[int] = None


@app.get("/v1/jobs/{job_id}")
async def get_job_status(job_id: str, team_id: str, response: Response) -> JobStatus:
    """
    Poll for AI job status.
    
    Polling interval guidance (returned in the Retry-After response header):
    - status == "queued": retry after 5 seconds
    - status == "processing": retry after 3 seconds
    - status == "completed" | "failed": stop polling
    
    In production, derive team_id from the authenticated caller rather than
    a query parameter, so one tenant cannot poll another tenant's jobs.
    
    Educational example — not for clinical use.
    """
    job_store = _get_job_store()
    job = await job_store.get_job(job_id, team_id)
    
    if not job:
        raise HTTPException(status_code=404, detail=f"Job {job_id} not found")
    
    # Set Retry-After hint for clients that should keep polling
    retry_after = {"queued": 5, "processing": 3}.get(job.status, 0)
    if retry_after > 0:
        response.headers["Retry-After"] = str(retry_after)
    
    return job


def _get_job_store():
    """Return job store instance — Redis or database backed."""
    raise NotImplementedError  # Implementation omitted
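On the client side, polling needs a loop that stops at a terminal status and honors the Retry-After hint. A minimal sketch; poll_until_done is a hypothetical helper, fetch is a hypothetical hook that would wrap the real HTTP GET (for example with httpx) and return the parsed body plus the Retry-After header value, and max_wait is an illustrative overall budget:

```python
import asyncio
from typing import Awaitable, Callable, Optional, Tuple

# Hypothetical transport hook: fetch(job_id) -> (response JSON, Retry-After header or None)
FetchFn = Callable[[str], Awaitable[Tuple[dict, Optional[str]]]]
SleepFn = Callable[[float], Awaitable[None]]

TERMINAL_STATUSES = {"completed", "failed"}


async def poll_until_done(
    job_id: str,
    fetch: FetchFn,
    default_interval: float = 3.0,
    max_wait: float = 600.0,
    sleep: SleepFn = asyncio.sleep,
) -> dict:
    """Poll a job until it is completed or failed, honoring Retry-After hints."""
    waited = 0.0  # Counts only time spent sleeping, not request latency
    while True:
        body, retry_after = await fetch(job_id)
        if body["status"] in TERMINAL_STATUSES:
            return body
        # Prefer the server's hint; this sketch only parses the delta-seconds form
        try:
            interval = float(retry_after) if retry_after is not None else default_interval
        except ValueError:
            interval = default_interval
        if waited + interval > max_wait:
            raise TimeoutError(f"Job {job_id} still {body['status']} after {waited:.0f}s")
        await sleep(interval)
        waited += interval
```

Injecting sleep keeps the loop testable without real delays; in normal use the default asyncio.sleep applies.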

Server-Sent Events for Real-Time Streaming

SSE is the preferred delivery mechanism for interactive AI features where the user watches the response generate token-by-token in a browser interface.

python
import json
import logging

import anthropic
from fastapi import Request
from fastapi.responses import StreamingResponse

# Educational example — not for clinical use
# Reuses the `app` instance created in the polling example above.

logger = logging.getLogger(__name__)

@app.get("/v1/stream/{session_id}")
async def stream_ai_response(session_id: str, request: Request):
    """
    Server-Sent Events endpoint for real-time AI token streaming.
    
    The client opens a persistent SSE connection. As the AI generates tokens,
    the server pushes them through the SSE connection in real time.
    The client disconnects after the [DONE] event.
    
    Educational example — not for clinical use.
    """
    return StreamingResponse(
        _sse_generator(session_id, request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",    # Disable nginx response buffering
        }
    )


async def _sse_generator(session_id: str, request: Request):
    """
    Generate SSE events for an AI session.
    Streams tokens from the LLM and stops early if the client disconnects.
    Heartbeat pings are not sent here; see "SSE connection management"
    under Enterprise Considerations.
    
    Educational example — not for clinical use.
    """
    session = await _get_session(session_id)
    # Async client, so streaming does not block the event loop for other requests
    client = anthropic.AsyncAnthropic()
    
    try:
        async with client.messages.stream(
            model="claude-opus-4-8",    # verify current model IDs
            max_tokens=1024,
            messages=session["messages"],
        ) as stream:
            async for text in stream.text_stream:
                # Stop generating (and close the upstream stream) if the client left
                if await request.is_disconnected():
                    return
                
                event_data = json.dumps({"type": "token", "content": text, "session_id": session_id})
                yield f"data: {event_data}\n\n"
            
            # Usage totals are available once the stream has finished
            final_message = await stream.get_final_message()
        
        # Send completion event
        done_data = json.dumps({
            "type": "done",
            "session_id": session_id,
            "usage": {
                "prompt_tokens": final_message.usage.input_tokens,
                "completion_tokens": final_message.usage.output_tokens,
            }
        })
        yield f"data: {done_data}\n\n"
        yield "data: [DONE]\n\n"
    
    except Exception:
        # Log details server-side; never expose internal error details to the client
        logger.exception("AI streaming failed for session %s", session_id)
        error_data = json.dumps({
            "type": "error",
            "session_id": session_id,
            "error": "AI service error",
        })
        yield f"data: {error_data}\n\n"


async def _get_session(session_id: str) -> dict:
    """Retrieve session context from session store."""
    raise NotImplementedError  # Implementation omitted
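Browsers consume SSE through the built-in EventSource API, but server-side consumers and integration tests often read the stream themselves. A minimal parsing sketch for the event format produced above; iter_sse_data is a hypothetical helper, the lines would come from an HTTP client's streaming line iterator, and only data fields, comment lines (such as heartbeats) and the [DONE] sentinel are handled, not the event, id and retry fields that the full WHATWG specification defines:

```python
import json
from typing import Iterable, Iterator, List


def iter_sse_data(lines: Iterable[str]) -> Iterator[dict]:
    """Yield decoded JSON payloads from SSE lines, stopping at the [DONE] sentinel."""
    data_parts: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line ends one event; dispatch any accumulated data lines
            if data_parts:
                data = "\n".join(data_parts)
                data_parts = []
                if data == "[DONE]":
                    return
                yield json.loads(data)
            continue
        if line.startswith(":"):
            continue  # Comment line, e.g. a ": keep-alive" heartbeat
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # The spec strips a single leading space
        if field == "data":
            data_parts.append(value)
        # Other fields (event, id, retry) are ignored in this sketch
```

Note that a browser EventSource reconnects automatically when the server closes the stream, so browser code should call close() once it sees [DONE].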

Pattern Selection Guide

| Requirement | Webhook | Polling | SSE |
| --- | --- | --- | --- |
| Client is a server | Best | Fallback | Not applicable |
| Client is a browser | Not applicable | Possible | Best |
| Client behind firewall | Not applicable | Best | Not applicable |
| Response time: seconds | Webhook (push) | Poll every 3 s | SSE |
| Response time: minutes | Webhook (push) | Poll every 30 s | Connection may time out |
| Real-time token streaming | Not applicable | Not applicable | Best |
| EHR-to-EHR result delivery | Best | Fallback | Not applicable |

Enterprise Considerations

Webhook endpoint security: Webhook receiving endpoints must validate the HMAC signature on every delivery. Without signature verification, any HTTP client that knows the callback URL can POST a fabricated AI result. Clinical systems must never act on an unverified webhook payload.
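Verification must run against the exact bytes that arrived, before the body is parsed: re-serializing a parsed JSON object can change key order or whitespace and break the HMAC. A minimal, framework-neutral sketch of a receiver's first steps; handle_webhook and the verify hook are hypothetical names, and verify would wrap whatever signature check the platform defines (such as verify_webhook_signature above). In FastAPI or Starlette the raw bytes come from await request.body().

```python
import json
from typing import Callable, Mapping, Optional, Tuple

# Hypothetical hook: True only if the raw bytes carry a valid signature
VerifyFn = Callable[[bytes, Mapping[str, str]], bool]


def handle_webhook(
    raw_body: bytes,
    headers: Mapping[str, str],
    verify: VerifyFn,
) -> Tuple[int, Optional[dict]]:
    """Return (HTTP status, parsed payload) for one incoming webhook delivery."""
    # 1. Verify against the exact bytes received, before any parsing
    if not verify(raw_body, headers):
        return 401, None
    # 2. Only a verified body is parsed and handed on for processing
    try:
        payload = json.loads(raw_body)
    except ValueError:  # Covers JSONDecodeError and invalid UTF-8
        return 400, None
    if not isinstance(payload, dict):
        return 400, None
    return 202, payload
```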

Idempotency at the receiver: Webhook delivery services retry on failure. The receiving endpoint will receive the same job result multiple times if the first delivery acknowledgment was lost. Receiving endpoints must be idempotent: processing the same job_id twice must produce the same result as processing it once.
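One way to make the receiver idempotent is to record the outcome per job_id and return the stored outcome on duplicates instead of repeating the side effect. A minimal in-memory sketch; IdempotentResultStore and apply_result are hypothetical names, and a production receiver would instead rely on a database unique constraint on job_id, committed in the same transaction as the side effect, so deduplication survives restarts and works across replicas.

```python
import threading
from typing import Callable, Dict


class IdempotentResultStore:
    """Apply each job_id's result at most once; duplicates get the first outcome back."""

    def __init__(self, apply_result: Callable[[dict], str]):
        self._apply_result = apply_result  # Hypothetical side effect, e.g. filing a draft note
        self._outcomes: Dict[str, str] = {}
        self._lock = threading.Lock()

    def process(self, payload: dict) -> str:
        job_id = payload["job_id"]
        with self._lock:
            if job_id in self._outcomes:
                # Duplicate delivery: report the earlier outcome, no second side effect
                return self._outcomes[job_id]
            outcome = self._apply_result(payload)
            self._outcomes[job_id] = outcome
            return outcome
```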

SSE connection management: SSE connections are long-lived HTTP connections. Proxies and load balancers frequently close idle connections after 30–60 seconds, and an AI generation can go quiet for that long (for example, before the first token arrives). Send heartbeat pings as SSE comment lines (a line such as ": keep-alive" followed by a blank line) roughly every 15 seconds so the connection never looks idle; clients ignore comment lines.
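Heartbeats can be added by wrapping an existing SSE generator: whenever the wrapped generator stays silent for the heartbeat interval, the wrapper emits a comment frame instead. A minimal asyncio sketch; with_heartbeats and HEARTBEAT_FRAME are hypothetical names, and the 15-second default mirrors the interval suggested above.

```python
import asyncio
from typing import AsyncIterator

HEARTBEAT_FRAME = ": keep-alive\n\n"  # SSE comment line; clients ignore it
_END = object()  # Sentinel marking exhaustion of the wrapped generator


async def _next_or_end(iterator: AsyncIterator[str]):
    """Await the next frame, converting StopAsyncIteration into the _END sentinel."""
    try:
        return await iterator.__anext__()
    except StopAsyncIteration:
        return _END


async def with_heartbeats(events: AsyncIterator[str], interval: float = 15.0) -> AsyncIterator[str]:
    """Re-yield SSE frames from events, inserting a heartbeat after each silent interval."""
    iterator = events.__aiter__()
    pending = asyncio.create_task(_next_or_end(iterator))
    try:
        while True:
            # Wait for the next real frame, but no longer than the heartbeat interval
            done, _ = await asyncio.wait({pending}, timeout=interval)
            if not done:
                yield HEARTBEAT_FRAME
                continue
            frame = pending.result()
            if frame is _END:
                return
            yield frame
            pending = asyncio.create_task(_next_or_end(iterator))
    finally:
        # Stop waiting on the wrapped generator if the consumer goes away early
        pending.cancel()
```

In the endpoint above this would look like StreamingResponse(with_heartbeats(_sse_generator(session_id, request)), media_type="text/event-stream"). The timer restarts after every frame, so pings appear only during genuine gaps in generation.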

Common Mistakes

1. Not verifying webhook signatures. A webhook receiver that does not verify the HMAC signature will process fabricated payloads from any source that knows the endpoint URL. Always verify signatures.

2. Processing the webhook before acknowledging it. If the webhook receiver processes the payload synchronously and takes 10 seconds, the sender's timeout may expire before it receives the acknowledgment, and the sender will retry a delivery that actually succeeded. The receiver should: (1) verify the signature and durably record the payload, (2) immediately return 202 Accepted (or 200 OK), (3) process the payload asynchronously, (4) handle duplicate delivery via idempotency.

3. No retry policy on webhook delivery. If the receiving server is temporarily unavailable when the AI job completes, the webhook delivery fails and the result is lost. Always implement a retry policy with exponential backoff for webhook delivery.

4. Buffering SSE responses in the web framework or proxy. SSE requires that each event is flushed to the client as soon as it is produced. Framework response buffering or reverse-proxy buffering accumulates tokens and delivers them in bursts rather than streaming in real time. Disable framework response buffering for SSE endpoints, and disable proxy buffering too: behind nginx, set the X-Accel-Buffering: no response header (or proxy_buffering off); other proxies and load balancers have their own settings.
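The acknowledge-then-process pattern described in mistake 2 can be sketched with an asyncio.Queue standing in for the durable queue a real receiver would need. WebhookInbox and process are hypothetical names. Because an in-memory queue loses anything not yet processed if the process crashes, a production receiver should persist the verified payload (database row or message broker) before returning 202.

```python
import asyncio
import logging
from typing import Awaitable, Callable

logger = logging.getLogger(__name__)


class WebhookInbox:
    """Acknowledge deliveries immediately and process them on a background worker."""

    def __init__(self, process: Callable[[dict], Awaitable[None]]):
        self._queue: "asyncio.Queue[dict]" = asyncio.Queue()
        self._process = process  # Hypothetical slow handler, e.g. writing to the EHR

    async def receive(self, payload: dict) -> int:
        # Enqueue and return at once, well inside the sender's timeout
        await self._queue.put(payload)
        return 202

    async def run_worker(self) -> None:
        """Process queued payloads forever; one failure does not stop the worker."""
        while True:
            payload = await self._queue.get()
            try:
                await self._process(payload)
            except Exception:
                logger.exception("Webhook processing failed for job %s", payload.get("job_id"))
            finally:
                self._queue.task_done()

    async def drain(self) -> None:
        """Wait until every queued payload has been processed."""
        await self._queue.join()
```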

Key Takeaways

  • Webhooks (server push) are the preferred pattern for server-to-server AI result delivery; polling is the fallback for clients that cannot receive inbound connections
  • All webhook deliveries must be signed (HMAC-SHA256) and receiving endpoints must verify the signature
  • Webhook receiving endpoints must be idempotent; retry delivery means the same result may arrive multiple times
  • SSE is the preferred pattern for real-time token streaming to browser-based clinical interfaces
  • SSE connections need periodic heartbeat pings (15-second intervals are a common choice, well inside typical 30–60 second proxy idle timeouts) to prevent proxy disconnects

Further Reading