# Multi-agent RAG pipeline — LangGraph + LangChain
from langgraph.graph import StateGraph
from langchain_core.tools import tool
@tool
def query_clinical_data(patient_id: str) -> dict:
    """Retrieve FHIR-compliant patient record from lakehouse."""
    # The docstring above is intentionally left untouched: the @tool decorator
    # picks it up as the tool description, so editing it would change what the
    # model is told about this tool.
    #
    # patient_id: identifier of the patient row to fetch from fhir.patient.
    # Returns:    whatever lakehouse.query() returns for the lookup.
    # Raises:     ValueError if patient_id is not a well-formed FHIR id.
    #
    # The id arrives from the agent and is spliced into the SQL text below, so
    # it must never contain quotes or other SQL syntax. The FHIR `id` datatype
    # only allows 1-64 characters from [A-Za-z0-9.-]; anything else is refused
    # before a query string is built.
    # NOTE(review): assumes fhir.patient.id stores FHIR logical ids -- widen the
    # pattern (or switch to bound parameters if lakehouse.query supports them)
    # should that not hold.
    import re  # function-scope import keeps this block self-contained

    if not isinstance(patient_id, str) or re.fullmatch(r"[A-Za-z0-9.\-]{1,64}", patient_id) is None:
        # The rejected value is deliberately not echoed back into the message.
        raise ValueError("patient_id is not a valid FHIR id")

    return lakehouse.query(f"SELECT * FROM fhir.patient WHERE id='{patient_id}'")
@tool
def run_risk_model(record: dict) -> float:
    """Score readmission risk via deployed ML endpoint."""
    # record: the patient record handed to ml_endpoint.predict() unchanged.
    # Returns the value stored under "risk_score" in the prediction payload;
    # a payload without that key surfaces as KeyError.
    prediction = ml_endpoint.predict(record)
    risk_score = prediction["risk_score"]
    return risk_score
# Assemble the linear workflow: retrieve -> score -> summarize.
graph = StateGraph(AgentState)
graph.add_node("retrieve", query_clinical_data)
graph.add_node("score", run_risk_model)
graph.add_node("summarize", llm_summarizer)  # LLM synthesis step

# A StateGraph needs an explicit entry point; without one there is no node to
# start from and compile() rejects the graph.
graph.set_entry_point("retrieve")
graph.add_edge("retrieve", "score")
graph.add_edge("score", "summarize")
# Mark the last step as terminal so the run ends once the summary is produced.
graph.set_finish_point("summarize")

# interrupt_before=["summarize"] asks the runtime to pause ahead of the LLM
# synthesis step; the checkpointer is what lets a paused run be resumed.
agent = graph.compile(checkpointer=memory, interrupt_before=["summarize"])
-- models/marts/clinical/fct_trial_enrollment.sql
-- Pattern: incremental merge + contract + multi-ref join
--
-- Builds the trial-enrollment fact: staged CTMS enrollments joined to the
-- patient and clinical-trial dimensions, excluding screen failures.
-- Rows are merged on (trial_id, patient_key, enrollment_date).
--
-- on_schema_change is 'append_new_columns' because the contract is enforced:
-- dbt expects contracted incremental models to use 'append_new_columns' or
-- 'fail', so that the existing table cannot drift from the declared columns.
--
-- NOTE(review): enrollment_sk hashes only trial_id + patient_id while the
-- merge key also includes enrollment_date; confirm whether a patient can
-- enroll in the same trial more than once.
{{
    config(
        materialized = 'incremental',
        unique_key = ['trial_id', 'patient_key', 'enrollment_date'],
        incremental_strategy = 'merge',
        on_schema_change = 'append_new_columns',
        cluster_by = ['enrollment_date'],
        contract = { 'enforced': true },
        meta = {
            'owner' : 'clinical-analytics',
            'domain' : 'trials',
            'contains_phi' : true,
            'sla' : '99.9%'
        }
    )
}}

with patients as (

    select * from {{ ref('dim_patient') }}

),

trials as (

    select * from {{ ref('dim_clinical_trial') }}

),

enrollments as (

    select * from {{ ref('stg_ctms__enrollments') }}
    {% if is_incremental() %}
    -- Only process new records on incremental runs. The watermark is a date,
    -- so rows from the latest loaded day are re-read; the merge on unique_key
    -- keeps that idempotent. coalesce() covers an existing-but-empty target,
    -- where max() is NULL and the comparison would otherwise match nothing.
    -- NOTE(review): filtering on enrolled_at means a later change to
    -- dropped_at on an old enrollment is not picked up; confirm intent.
    where enrolled_at > (
        select coalesce(max(enrollment_date), '1900-01-01'::date)
        from {{ this }}
    )
    {% endif %}

),

final as (

    select
        {{ dbt_utils.generate_surrogate_key(['e.trial_id','e.patient_id']) }}
            as enrollment_sk,
        e.trial_id,
        p.patient_key,
        t.protocol_number,
        e.site_id,
        e.enrolled_at::date as enrollment_date,
        e.arm_code,
        e.consent_version,
        e.dropped_at is not null as is_dropped,
        {{ current_timestamp() }} as dbt_updated_at
    from enrollments e
    join patients p using (patient_id)
    join trials t using (trial_id)
    -- Screen failures never became enrollments. Rows with a NULL status are
    -- also excluded by this comparison.
    where e.status != 'SCREEN_FAIL'

)

select * from final
-- Data Vault 2.0 — Patient / Encounter domain
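-- Hub + Satellite + Link pattern (Snowflake SQL; BigQuery has no sha2() or
-- concat_ws() and would need TO_HEX(SHA256(...)) and CONCAT/ARRAY_TO_STRING instead)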
-- ── HUB_PATIENT: one row per unique business key ──────────────────
-- Rebuilds the hub from staging.raw_patient and keeps a single row per MRN:
-- the one ranked first by load_dts inside its MRN partition.
-- NOTE(review): load_dts in the window may bind to a staging column of that
-- name or to the alias defined in this select; confirm which is intended.
create or replace table dv.HUB_PATIENT as
select
    sha2(mrn, 256)      as patient_hk,     -- hash key derived from the MRN
    mrn                 as patient_bk,     -- business key (MRN)
    'EHR_EPIC'          as record_source,  -- constant source-system tag
    current_timestamp() as load_dts        -- time this load ran
from staging.raw_patient
qualify
    row_number() over (partition by mrn order by load_dts) = 1;
-- ── SAT_PATIENT_DEMOGRAPHICS: attributes with delta detection ─────
-- Keeps a demographics row only when its hashdiff differs from the previous
-- row of the same patient (ordered by load_dts).
--
-- Two NULL-handling details matter here:
--   * lag() is NULL on each patient's first row, and `hashdiff != NULL` is
--     NULL, so a plain inequality would discard every initial record.
--     `is distinct from` treats NULL as a comparable value and keeps it.
--   * concat_ws() returns NULL as soon as one argument is NULL, which would
--     blank the whole hashdiff. Each attribute is therefore cast to varchar
--     and coalesced to '' before hashing (NULL and '' hash alike).
create or replace table dv.SAT_PATIENT_DEMOGRAPHICS as
select
    sha2(mrn, 256) as patient_hk,
    sha2(
        concat_ws(
            '|',
            coalesce(gender::varchar, ''),
            coalesce(dob::varchar, ''),
            coalesce(race::varchar, ''),
            coalesce(ethnicity::varchar, ''),
            coalesce(postal_code::varchar, '')
        ),
        256
    ) as hashdiff,
    gender, dob, race, ethnicity, postal_code,
    'EHR_EPIC' as record_source,
    current_timestamp() as load_dts
from staging.raw_patient
qualify hashdiff is distinct from lag(hashdiff) over (
    partition by patient_hk order by load_dts
);
-- ── LNK_PATIENT_ENCOUNTER: foreign-key relationship ──────────────
-- Rebuilds the patient/encounter link from staging.raw_encounter with one row
-- per (mrn, encounter_id) pair: the row ranked first by load_dts in that pair.
create or replace table dv.LNK_PATIENT_ENCOUNTER as
select
    sha2(concat_ws('|', mrn, encounter_id), 256) as pat_enc_hk,    -- link hash key over both business keys
    sha2(mrn, 256)                               as patient_hk,    -- same expression as the patient hub key
    sha2(encounter_id, 256)                      as encounter_hk,  -- hash of the encounter business key
    'EHR_EPIC'                                   as record_source,
    current_timestamp()                          as load_dts
from staging.raw_encounter
qualify
    row_number() over (
        partition by mrn, encounter_id
        order by load_dts
    ) = 1;
// Structured Streaming — FHIR observation ingest (Scala / Spark)
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
// Schema is inferred once, from the multi-line JSON sample under the landing
// prefix, and then reused to parse every streamed record.
val schema = {
  val sampleReader = spark.read.option("multiline", "true")
  sampleReader.json("s3://landing/fhir-sample/").schema
}

// Kafka source for the fhir.Observation topic: the record value is cast to a
// string, parsed with the inferred schema, and its fields are flattened to
// top-level columns.
val rawStream = {
  val kafkaRecords = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafkaBrokers)
    .option("subscribe", "fhir.Observation")
    .load()
  val parsedObservation = from_json(col("value").cast("string"), schema).as("obs")
  kafkaRecords.select(parsedObservation).select("obs.*")
}

// Stamp the ingest time, keep only a SHA-256 digest of subject.reference, and
// drop the original subject struct.
val enriched = {
  val subjectDigest = sha2(col("subject.reference"), 256)
  rawStream
    .withColumn("ingested_at", current_timestamp())
    .withColumn("phi_masked", subjectDigest)
    .drop("subject")
}

// Append to the Delta table in 30-second micro-batches; toTable starts the query.
enriched.writeStream
  .format("delta")
  .option("checkpointLocation", "s3://checkpoints/fhir-obs/")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .toTable("gold.fhir_observations")
// AWS CDK — HIPAA-aligned Lambda stack for patient data processing
import * as cdk from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as events from 'aws-cdk-lib/aws-events';
import * as targets from 'aws-cdk-lib/aws-events-targets';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as kms from 'aws-cdk-lib/aws-kms';
/**
 * Stack that wires an EventBridge rule to a Python Lambda for patient data
 * processing, with a customer-managed KMS key and an encrypted dead-letter
 * queue.
 */
export class PatientDataStack extends cdk.Stack {
  constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // Customer-managed key (rotation on) used for PHI encryption at rest.
    const phiKey = new kms.Key(this, 'PhiKey', {
      description: 'CMK for patient PHI fields',
      enableKeyRotation: true,
    });

    // Failed processing events land here; encrypted with the PHI key and
    // retained for 14 days.
    const dlq = new sqs.Queue(this, 'PatientDlq', {
      retentionPeriod: cdk.Duration.days(14),
      encryptionMasterKey: phiKey,
    });

    // Environment handed to the handler: the key ARN and the target bucket name.
    // NOTE(review): no S3 bucket or S3 grant is defined in this stack; confirm
    // that write access to the target bucket is provided elsewhere.
    const processorEnv: { [name: string]: string } = {
      PHI_KEY_ARN: phiKey.keyArn,
      TARGET_BUCKET: 'phi-processed',
    };

    // Lambda for FHIR Observation events. Per the original note it masks PHI
    // and writes to S3; the handler code lives in lambda/patient_processor.
    const processFn = new lambda.Function(this, 'ProcessPatientData', {
      code: lambda.Code.fromAsset('lambda/patient_processor'),
      handler: 'handler.process',
      runtime: lambda.Runtime.PYTHON_3_12,
      environment: processorEnv,
      deadLetterQueue: dlq,
      tracing: lambda.Tracing.ACTIVE, // X-Ray enabled
    });

    // The function must be able to use the key it is told about.
    phiKey.grantEncryptDecrypt(processFn);

    // EventBridge rule: match Observation events from the FHIR gateway and
    // route them to the processing function.
    const fhirObsRule = new events.Rule(this, 'FhirObsRule', {
      eventPattern: { source: ['fhir.gateway'], detailType: ['Observation'] },
    });
    fhirObsRule.addTarget(new targets.LambdaFunction(processFn));
  }
}