Orchestration and Workflow Automation for AI
Executive Summary
AI workloads differ from traditional software workloads in ways that change how orchestration must be designed: job durations range from seconds (batched inference) to hours (fine-tuning); GPU resources must be reserved and released promptly; calls to rate-limited external services must be retried with backoff; and failure modes include silent quality degradation rather than hard crashes. This chapter covers the orchestration layer for AI systems: scheduling recurring data pipelines, managing multi-step AI workflows, scaling inference infrastructure, and operating LLM workflows in production without accumulating technical debt from ad-hoc job management.
Learning Objectives
- Select the right orchestration approach (Kubernetes, Airflow, Temporal) for each class of AI workload
- Design durable, resumable workflows for multi-step AI pipelines
- Implement retry logic, timeout management, and failure handling appropriate for LLM API calls
- Operate AI infrastructure at scale: autoscaling, resource scheduling, and cost containment
Business Problem
AI pipelines run at the intersection of three scheduling domains that traditional job schedulers handle poorly: data operations (run nightly at 2 AM, skip if source unchanged), AI inference operations (variable latency, rate-limited external APIs, non-deterministic outputs), and model lifecycle operations (evaluate, promote, deploy new model versions). Without purpose-built orchestration, these pipelines are typically implemented as shell scripts, cron jobs, or fragile chains of dependent Lambda functions: systems that are difficult to monitor and impossible to retry reliably, and that produce no audit trail.
Why This Technology Exists
The first generation of AI pipelines at scale used Apache Airflow, designed for data warehousing batch jobs. Airflow works well for scheduled, partition-based pipelines but struggles with the durable execution requirements of agentic AI: a multi-hour LLM workflow that must survive process restarts, a human-in-the-loop approval step that pauses execution for days, or a branching workflow that dynamically routes to different agents based on document classification.
Temporal grew out of Cadence, a workflow engine built at Uber to coordinate long-running distributed workflows that must survive infrastructure failures. Its durable execution model, in which workflow state survives process crashes and is rebuilt by replaying a persisted event history, is a direct fit for AI pipelines with external API calls, human approval gates, and multi-step orchestration requirements.
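To make the replay idea concrete, here is a toy sketch of history-based replay. It is not how Temporal is implemented; the class `ToyDurableRun`, the `pipeline` function, and the step names are all hypothetical. It records each completed step in an append-only history and, after a simulated crash, returns the recorded results instead of re-running those steps.

```python
import json
from typing import Callable


class ToyDurableRun:
    """Replays completed steps from a persisted history instead of re-executing them."""

    def __init__(self, persisted_history: str = "[]"):
        # The history is the only state that has to survive a crash.
        self.history: list = json.loads(persisted_history)
        self._cursor = 0
        self.executed: list = []  # steps actually executed in this process

    def step(self, name: str, fn: Callable[[], object]) -> object:
        if self._cursor < len(self.history):
            # Replay: the step already completed in an earlier run.
            event = self.history[self._cursor]
            if event["step"] != name:
                raise RuntimeError(
                    f"non-deterministic replay: expected {event['step']!r}, got {name!r}"
                )
            self._cursor += 1
            return event["result"]
        # First execution: run the step and record its result.
        result = fn()
        self.history.append({"step": name, "result": result})
        self._cursor += 1
        self.executed.append(name)
        return result

    def persist(self) -> str:
        return json.dumps(self.history)


def pipeline(run: ToyDurableRun, crash_before_draft: bool = False) -> str:
    context = run.step("extract", lambda: {"encounter": "enc-1"})
    if crash_before_draft:
        raise ConnectionError("simulated process crash")
    return run.step("draft", lambda: f"draft for {context['encounter']}")


# First process: completes "extract", then crashes.
first = ToyDurableRun()
try:
    pipeline(first, crash_before_draft=True)
except ConnectionError:
    saved = first.persist()

# Second process: replays "extract" from history and executes only "draft".
second = ToyDurableRun(saved)
result = pipeline(second)
```

The replay check on step names hints at why durable workflow code is expected to be deterministic: the recorded history only makes sense if the code asks for the same steps in the same order.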
Core Architecture
Airflow for AI Data Pipelines
Apache Airflow is the standard choice for scheduled, batch-oriented AI data pipelines: nightly knowledge base refreshes, weekly embedding re-indexing, and periodic evaluation runs.
from datetime import datetime, timedelta, timezone

from airflow.decorators import dag, task

# Educational example, not for clinical use

default_args = {
    "owner": "ai-platform-team",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["ai-ops@reference-enterprise.example"],
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}


def _batch(items, size):
    """Yield successive fixed-size slices of a list."""
    for i in range(0, len(items), size):
        yield items[i:i + size]


@dag(
    dag_id="clinical_knowledge_base_refresh",
    default_args=default_args,
    schedule="0 2 * * *",  # Nightly at 2 AM UTC (Airflow 2.4+; older versions use schedule_interval)
    start_date=datetime(2024, 1, 1),  # Example value; prefer a fixed date over a moving one
    catchup=False,
    tags=["ai-infrastructure", "knowledge-base", "clinical"],
)
def clinical_knowledge_base_refresh_dag():
    """
    Nightly refresh of clinical knowledge base vector store.
    Detects changed documents, re-embeds, and updates the index.
    Educational example, not intended for clinical decision making.
    """

    @task()
    def detect_changed_documents() -> list[dict]:
        """Check source systems for new or updated documents."""
        from pipeline.source_connectors import GuidelineConnector
        connector = GuidelineConnector()
        changed = connector.get_documents_modified_since(
            since=datetime.now(timezone.utc) - timedelta(days=1)
        )
        return [{"source_id": d.source_id, "url": d.source_url} for d in changed]

    @task()
    def extract_and_chunk(changed_docs: list[dict]) -> list[dict]:
        """Extract text and create chunks for each changed document."""
        from pipeline.extractor import DocumentExtractor
        from pipeline.chunker import chunk_clinical_guideline
        all_chunks = []
        extractor = DocumentExtractor()
        for doc_ref in changed_docs:
            extracted = extractor.extract_from_url(doc_ref["url"])
            chunks = chunk_clinical_guideline(extracted)
            all_chunks.extend([c.__dict__ for c in chunks])
        return all_chunks

    @task()
    def embed_chunks(chunks: list[dict]) -> int:
        """Compute embeddings for all chunks and update vector store."""
        from pipeline.embedding_service import EmbeddingService
        from pipeline.vector_store import ClinicalVectorStore
        embedding_service = EmbeddingService()
        vector_store = ClinicalVectorStore()
        indexed_count = 0
        for batch in _batch(chunks, 20):
            embeddings = embedding_service.embed_batch([c["content"] for c in batch])
            vector_store.upsert_batch(list(zip(batch, embeddings)))
            indexed_count += len(batch)
        return indexed_count

    @task()
    def run_quality_evaluation(indexed_count: int) -> dict:
        """Run golden query set against updated index; fail the run if quality drops."""
        from pipeline.evaluation import KnowledgeBaseEvaluator
        evaluator = KnowledgeBaseEvaluator()
        results = evaluator.run_golden_queries()
        if results["mrr"] < 0.75:  # Mean Reciprocal Rank below threshold
            raise ValueError(
                "Knowledge base quality below threshold after refresh. "
                f"MRR={results['mrr']:.3f} < 0.75. Indexed {indexed_count} chunks."
            )
        return results

    # Task dependencies follow from the data passed between tasks
    changed = detect_changed_documents()
    chunks = extract_and_chunk(changed)
    count = embed_chunks(chunks)
    run_quality_evaluation(count)


dag = clinical_knowledge_base_refresh_dag()

Temporal for Durable AI Workflows
Temporal is the appropriate orchestration layer for multi-step AI workflows that must survive infrastructure failures, include human approval steps, or involve external API calls with complex retry semantics.
import asyncio
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.common import RetryPolicy
from temporalio.exceptions import ApplicationError

# Educational example, not for clinical use

# --- Activity definitions (units of work with retry semantics) ---

@activity.defn
async def extract_clinical_context(patient_id: str, encounter_id: str) -> dict:
    """Retrieve clinical context from EHR for document generation."""
    from integrations.fhir_client import FHIRClient
    client = FHIRClient()
    return await client.get_encounter_context(patient_id, encounter_id)


@activity.defn
async def generate_draft_summary(clinical_context: dict) -> str:
    """Generate AI draft of clinical document using LLM."""
    import anthropic
    # Async client, so the LLM call does not block the worker's event loop
    client = anthropic.AsyncAnthropic()
    response = await client.messages.create(
        model="claude-opus-4-8",  # verify current model IDs in official docs
        max_tokens=2048,
        system="You are assisting in generating a draft clinical summary for physician review.",
        messages=[{
            "role": "user",
            "content": f"Generate a draft discharge summary for the following encounter:\n{clinical_context}"
        }]
    )
    return response.content[0].text


@activity.defn
async def request_physician_review(draft_summary: str, encounter_id: str) -> str:
    """Send draft to physician inbox for review; return task ID."""
    from integrations.ehr_task_client import EHRTaskClient
    task_client = EHRTaskClient()
    task_id = await task_client.create_review_task(
        document=draft_summary,
        encounter_id=encounter_id,
        task_type="ai_discharge_summary_review"
    )
    return task_id


@activity.defn
async def poll_physician_approval(task_id: str) -> dict:
    """Check if physician has reviewed and approved the draft."""
    from integrations.ehr_task_client import EHRTaskClient
    task_client = EHRTaskClient()
    task = await task_client.get_task(task_id)
    return {"status": task.status, "approved": task.status == "completed"}


@activity.defn
async def write_to_ehr(summary: str, encounter_id: str) -> str:
    """Write approved document back to EHR as DocumentReference."""
    from integrations.fhir_client import FHIRClient
    client = FHIRClient()
    resource_id = await client.create_document_reference(
        content=summary,
        encounter_id=encounter_id,
        document_type="discharge-summary"
    )
    return resource_id


# --- Workflow definition (durable orchestration) ---

@workflow.defn
class DischargeSummaryWorkflow:
    """
    Durable workflow for AI-assisted discharge summary generation.
    Key durability properties:
    - Survives process restarts and infrastructure failures
    - Physician approval can take days; workflow remains paused safely
    - Activities are retried on transient failures with exponential backoff
      (explicit policies below; Temporal's default policy otherwise)
    - Full execution history for audit and compliance
    Educational example, not intended for clinical decision making.
    """

    @workflow.run
    async def run(self, patient_id: str, encounter_id: str) -> str:
        # Step 1: Extract clinical context from EHR
        clinical_context = await workflow.execute_activity(
            extract_clinical_context,
            args=[patient_id, encounter_id],
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(
                maximum_attempts=3,
                initial_interval=timedelta(seconds=2),
                backoff_coefficient=2.0,
            )
        )

        # Step 2: Generate AI draft (LLM call, may be slow)
        draft_summary = await workflow.execute_activity(
            generate_draft_summary,
            args=[clinical_context],
            start_to_close_timeout=timedelta(minutes=3),
            retry_policy=RetryPolicy(
                maximum_attempts=3,
                initial_interval=timedelta(seconds=5),
                backoff_coefficient=2.0,
            )
        )

        # Step 3: Send for physician review
        task_id = await workflow.execute_activity(
            request_physician_review,
            args=[draft_summary, encounter_id],
            start_to_close_timeout=timedelta(seconds=30),
        )

        # Step 4: Wait for physician approval (can take minutes to days)
        # Poll every 5 minutes; timeout after 7 days.
        # Every poll adds events to the workflow history; a signal-based wait avoids that growth.
        approved = False
        for _ in range(2016):  # 2016 * 5 min = 7 days
            result = await workflow.execute_activity(
                poll_physician_approval,
                args=[task_id],
                start_to_close_timeout=timedelta(seconds=10),
            )
            if result["approved"]:
                approved = True
                break
            # Inside a Temporal workflow this sleep is a durable timer, not a blocked thread
            await asyncio.sleep(300)  # 5 minutes between polls

        if not approved:
            # ApplicationError fails the workflow; a plain Exception would only
            # fail the current workflow task, which Temporal keeps retrying.
            raise ApplicationError(f"Physician approval timeout for encounter {encounter_id}")

        # Step 5: Write approved document back to EHR
        ehr_resource_id = await workflow.execute_activity(
            write_to_ehr,
            args=[draft_summary, encounter_id],
            start_to_close_timeout=timedelta(seconds=30),
        )
        return ehr_resource_id

Kubernetes for AI Inference Scaling
Kubernetes with the NVIDIA device plugin is the standard infrastructure layer for GPU-accelerated AI inference. Key patterns for AI workloads:
# Educational example: GPU inference deployment with autoscaling
apiVersion: apps/v1
kind: Deployment
metadata:
  name: clinical-inference-service
  namespace: ai-infrastructure
spec:
  replicas: 2  # Minimum replicas
  selector:
    matchLabels:
      app: clinical-inference
  template:
    metadata:
      labels:
        app: clinical-inference
    spec:
      tolerations:
        - key: "nvidia.com/gpu"
          operator: "Exists"
          effect: "NoSchedule"
      containers:
        - name: inference-server
          image: vllm/vllm-openai:latest  # verify current image tag
          # The vLLM OpenAI-compatible server takes its settings as command-line arguments
          args:
            - "--model"
            - "meta-llama/Meta-Llama-3.1-8B-Instruct"  # example
            - "--max-model-len"
            - "8192"
            - "--gpu-memory-utilization"
            - "0.85"  # Share of GPU memory vLLM may use (weights and KV cache); 15% stays free as headroom
          resources:
            requests:
              nvidia.com/gpu: "1"
              memory: "32Gi"
              cpu: "4"
            limits:
              nvidia.com/gpu: "1"  # GPU request and limit must be equal
              memory: "40Gi"
              cpu: "8"
          ports:
            - containerPort: 8000
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 60  # Model loading takes time
            periodSeconds: 10
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: clinical-inference-hpa
  namespace: ai-infrastructure
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: clinical-inference-service
  minReplicas: 2
  maxReplicas: 8
  metrics:
    - type: Pods
      pods:
        metric:
          # Queue depth from vLLM metrics. Assumes a custom metrics adapter
          # (for example Prometheus Adapter) exposes it under this name;
          # verify the metric name your vLLM version exports.
          name: vllm_num_waiting_seqs
        target:
          type: AverageValue
          averageValue: "5"  # Scale when >5 requests waiting per pod

Enterprise Considerations
Cost containment for GPU workloads: GPU compute is expensive (verify current cloud pricing for your instance types). GPU nodes should be provisioned on demand for batch inference jobs and released when complete. Use spot/preemptible instances for fault-tolerant batch workloads (embedding, evaluation) to reduce cost. Reserve on-demand capacity only for latency-sensitive inference serving.
Workflow observability: Temporal provides complete execution history for every workflow run, which serves dual purposes: debugging failed workflows and providing an audit trail for regulated workloads (clinical AI requires audit logs for every AI-assisted clinical document). Because workflow inputs and activity results are stored in that history, ensure the Temporal server is deployed in the same data residency region as clinical data.
Multi-tenancy in orchestration: In a Reference Healthcare Organization with multiple departments, isolate workloads using Kubernetes namespaces with resource quotas. Prevent one department's batch embedding job from starving clinical inference workloads during peak hours.
Healthcare Considerations
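The Temporal DischargeSummaryWorkflow above captures the physician review step as a durable workflow event. This is significant for compliance: every execution produces a complete, append-only history showing when the AI draft was generated, when the physician reviewed it, and when it was written to the EHR. This history can support documentation and audit requirements such as those of the Joint Commission, but it does not satisfy them on its own. Temporal keeps workflow history only for the namespace's configured retention period, so export or archive any history that must be kept longer, and confirm the specific requirements with your compliance team.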
Workflow timeout policy: Clinical workflows that include human approval steps should have explicit timeout policies. A discharge summary workflow that has not received physician approval within 7 days should escalate, not silently expire. Implement notification to department supervisor at 24 hours and 72 hours in addition to the 7-day hard timeout.
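One way to keep this policy explicit is to express it as data and compute which escalations are due. The sketch below is illustrative only: `EscalationStep`, `POLICY`, `due_actions`, and the action names are hypothetical, and the thresholds are the 24-hour, 72-hour, and 7-day values from the paragraph above.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class EscalationStep:
    after: timedelta  # time since the review task was created
    action: str       # illustrative action name


# Thresholds from the policy above, in increasing order.
POLICY = [
    EscalationStep(timedelta(hours=24), "notify_supervisor"),
    EscalationStep(timedelta(hours=72), "notify_supervisor"),
    EscalationStep(timedelta(days=7), "escalate_and_close"),
]


def due_actions(created_at: datetime, now: datetime, already_sent: int) -> list:
    """Return the steps whose deadline has passed and that have not been sent yet.

    `already_sent` is the number of steps already executed, in policy order.
    """
    elapsed = now - created_at
    return [step for step in POLICY[already_sent:] if elapsed >= step.after]


created = datetime(2025, 1, 1, 8, 0, tzinfo=timezone.utc)
at_30h = due_actions(created, created + timedelta(hours=30), already_sent=0)
at_80h = due_actions(created, created + timedelta(hours=80), already_sent=1)
at_8d = due_actions(created, created + timedelta(days=8), already_sent=2)
```

Inside a durable workflow, the same table could drive a sequence of timed waits, with one notification activity per step, so that the final timeout becomes an explicit escalation instead of a silent expiry.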
Common Mistakes
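1. Using Airflow for long-running workflows. A standard Airflow task occupies a worker slot for its entire duration, so a task that waits for physician approval either holds a slot for days or hits its execution timeout. Deferrable operators reduce the slot cost but do not provide per-workflow durable state and history. Use Temporal for workflows with human-in-the-loop steps.
2. Not declaring GPU resources. Kubernetes schedules GPUs through the extended resource nvidia.com/gpu, which must be set under limits (a request, if also set, must equal the limit). A container that omits it is not accounted for by the scheduler and, depending on how the container runtime is configured, may still be able to see every GPU on the node. Always declare nvidia.com/gpu explicitly for GPU workloads.
3. Over-broad retry policies on LLM activities. Retrying a non-idempotent LLM activity 10 times may produce 10 different outputs or consume significant API budget. LLM activities should retry on transient errors (network failures, timeouts), retry rate-limit responses only with backoff, and not retry model errors such as invalid requests.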
4. Scheduling embedding and inference jobs at the same time. Nightly knowledge base refresh (embedding-intensive) and peak clinical usage (inference-intensive) should not compete for the same GPU resources. Schedule batch embedding jobs during off-peak inference hours.
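The retry guidance in item 3 can be sketched in plain Python. The exception classes below are hypothetical stand-ins for whatever your LLM client raises, and `call_with_retry` is an illustrative helper, not part of any SDK. Temporal's `RetryPolicy` offers a `non_retryable_error_types` field for the same classification; check the SDK documentation for how error types are matched.

```python
import random
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical: network failure, timeout, or 5xx response."""


class RateLimitError(Exception):
    """Hypothetical: HTTP 429, optionally with a server-suggested wait in seconds."""

    def __init__(self, retry_after: Optional[float] = None):
        super().__init__("rate limited")
        self.retry_after = retry_after


class ModelError(Exception):
    """Hypothetical: invalid request or similar; retrying will not help."""


def call_with_retry(
    fn: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except ModelError:
            raise  # non-retryable: fail fast
        except (TransientError, RateLimitError) as exc:
            if attempt == max_attempts:
                raise
            # Exponential backoff with full jitter, capped at max_delay.
            delay = random.uniform(0, min(max_delay, base_delay * 2 ** (attempt - 1)))
            # Honor the server's suggested wait when it is longer.
            if isinstance(exc, RateLimitError) and exc.retry_after is not None:
                delay = max(delay, exc.retry_after)
            sleep(delay)
    raise AssertionError("unreachable")


# Usage: a call that fails twice with a transient error, then succeeds.
calls = {"flaky": 0, "bad": 0}
slept: list = []


def flaky() -> str:
    calls["flaky"] += 1
    if calls["flaky"] < 3:
        raise TransientError("timeout")
    return "ok"


def bad_request() -> str:
    calls["bad"] += 1
    raise ModelError("invalid request")


outcome = call_with_retry(flaky, sleep=slept.append)  # record delays instead of sleeping

try:
    call_with_retry(bad_request, sleep=slept.append)
    model_error_raised = False
except ModelError:
    model_error_raised = True
```

Injecting `sleep` keeps the helper testable without real delays; in production the default `time.sleep` applies.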
Best Practices
- Use Airflow for scheduled, partitioned data pipelines; use Temporal for durable, long-running AI workflows with human-in-the-loop steps
- Set explicit resource requests and limits for GPU workloads in Kubernetes
- Use Temporal's retry policy to control retry semantics per activity type; don't apply a single global retry policy
- Schedule batch embedding jobs during off-peak inference hours to avoid GPU contention
- Implement quality evaluation as the final step in every knowledge base refresh pipeline
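For the quality evaluation step, Mean Reciprocal Rank (the metric used as the gate in the Airflow example) averages, over all golden queries, the reciprocal of the rank at which the first relevant document appears. A minimal sketch follows; the document IDs and the `mean_reciprocal_rank` helper are hypothetical, and the 0.75 threshold is the one used earlier in this chapter.

```python
def mean_reciprocal_rank(ranked_results: list, relevant: list) -> float:
    """Mean over queries of 1/rank of the first relevant result (0 if none is retrieved).

    ranked_results: one ranked list of document IDs per query
    relevant: one set of relevant document IDs per query
    """
    if len(ranked_results) != len(relevant):
        raise ValueError("one relevance set is required per query")
    if not ranked_results:
        raise ValueError("golden query set is empty")
    total = 0.0
    for results, gold in zip(ranked_results, relevant):
        for rank, doc_id in enumerate(results, start=1):
            if doc_id in gold:
                total += 1.0 / rank
                break  # only the first relevant hit counts
    return total / len(ranked_results)


retrieved = [
    ["g-12", "g-07", "g-31"],  # first relevant at rank 1
    ["g-02", "g-44", "g-09"],  # first relevant at rank 2
    ["g-18", "g-03", "g-27"],  # no relevant document retrieved
    ["g-05", "g-66", "g-40"],  # first relevant at rank 1
]
gold = [{"g-12"}, {"g-44"}, {"g-99"}, {"g-05", "g-40"}]

mrr = mean_reciprocal_rank(retrieved, gold)  # (1 + 0.5 + 0 + 1) / 4 = 0.625
passes_gate = mrr >= 0.75                    # False: this refresh would be flagged
```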
Trade-offs
| Orchestrator | Strengths | Weaknesses | Best For |
|---|---|---|---|
| Airflow | Mature, widely deployed, rich UI | Workers block during task execution | Scheduled batch pipelines |
| Temporal | Durable execution, fine-grained retry | Operational complexity, infrastructure required | Long-running, event-driven AI workflows |
| Kubernetes Jobs | Native GPU support, simple for batch | No workflow logic | One-shot batch inference, fine-tuning runs |
| Step Functions (AWS) | Managed, integrates with AWS services | Vendor-locked; duration limits (5 minutes for Express workflows, 1 year for Standard) | AWS-native AI pipelines |
Interview Questions
Q: A clinical AI workflow has a step where a physician reviews an AI-generated document. The review may take anywhere from 10 minutes to 3 days. How would you design the orchestration for this workflow?
Category: System Design | Difficulty: Senior | Role: AI Architect
Answer Framework:
This is a durable workflow problem. The key constraint is that the workflow must remain paused for up to 3 days while consuming no compute resources — the physician review step is an external, human-triggered event, not a blocking computation.
Wrong approach: Using a time.sleep() loop in an Airflow task, or a chain of Lambda invocations that polls every minute for 3 days. This consumes compute resources continuously and produces no audit trail.
Correct approach: Temporal's durable execution model is built for exactly this pattern. The workflow suspends after creating the physician review task, persisting its state in the Temporal service. When the physician completes the review, the EHR task system notifies the workflow (for example, through a webhook handler that sends a Temporal signal), and the workflow resumes from its exact suspension point. The full execution history, including suspension time, resume time, and any reviewer identity passed in the signal payload, is recorded in Temporal's history log.
Key Points to Hit:
- Durable execution vs. stateful polling: the former suspends without consuming resources
- Audit trail requirement for clinical AI compliance
- Timeout policy: explicit escalation at 24h and 72h, not silent expiry at 7 days
- Separation of concerns: physician review task is created in the EHR workflow system, not managed by Temporal
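The difference between polling and an event-driven wait can be shown with plain asyncio. This is an analogy, not Temporal code: `ReviewGate` is a hypothetical class standing in for workflow state that a signal handler would update. In the Temporal Python SDK the corresponding building blocks are a signal handler and `workflow.wait_condition` with a timeout; consult the SDK documentation for exact usage.

```python
import asyncio
from typing import Optional


class ReviewGate:
    """Holds the reviewer's decision; stands in for workflow state set by a signal."""

    def __init__(self) -> None:
        self._decided = asyncio.Event()
        self.approved: Optional[bool] = None

    def submit(self, approved: bool) -> None:
        # Called when the external review system reports a decision.
        self.approved = approved
        self._decided.set()

    async def wait(self, timeout: float) -> str:
        # Suspends until a decision arrives or the timeout elapses; no polling loop.
        try:
            await asyncio.wait_for(self._decided.wait(), timeout=timeout)
        except asyncio.TimeoutError:
            return "escalate"
        return "approved" if self.approved else "rejected"


async def demo() -> tuple:
    answered = ReviewGate()
    # Simulate a physician decision arriving shortly after the wait begins.
    asyncio.get_running_loop().call_later(0.01, answered.submit, True)
    first = await answered.wait(timeout=1.0)

    unanswered = ReviewGate()
    second = await unanswered.wait(timeout=0.05)  # nobody responds in time
    return first, second


outcomes = asyncio.run(demo())
```

The timeout branch returns an explicit "escalate" outcome, which matches the key point above: a missed deadline should trigger escalation, not a silent expiry.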
Key Takeaways
- Airflow is appropriate for scheduled, batch-oriented AI data pipelines; Temporal is appropriate for durable, event-driven AI workflows
- GPU workloads require explicit resource limits in Kubernetes to prevent resource monopolization
- Human-in-the-loop steps require durable workflow orchestration — polling loops in Airflow workers are an antipattern
- Clinical AI workflows require complete audit trails; Temporal's execution history provides the raw record, provided retention and archival are configured to match the required retention period
- Quality evaluation (golden query evaluation) should be a first-class step in every knowledge base refresh pipeline
Further Reading
- Data Pipelines for AI — The pipelines that orchestration schedules
- LLM Serving Infrastructure — The inference infrastructure orchestration scales
- Agent Observability — Monitoring agentic workflows in production