Data & AI Advisory

Dmitry Shirokov

Solutions Architect in Data and AI

I help companies turn data and AI investments into measurable outcomes—designing secure data platforms, operationalizing machine learning pipelines, and delivering agentic AI systems that work reliably in production.

  • 10+ yrs advisory & solutions engineering
  • Cloud migration and modernization: AWS | GCP | Azure
  • Lakehouse governance: Snowflake, Databricks & BigQuery

Practice Areas

Consulting Practice Areas

Deploying domain-specific architectures that satisfy strict scale, compliance, and decision-making requirements.

Healthcare & Biotech

HIPAA-compliant, multi-omics architectures bringing trust and reproducibility to clinical and genomic research data.

Data
  • PHI/PII Safeguarding: Secure, HIPAA-aligned ingestion and access controls.
  • Multi-Omics Pipelines: Genomics, proteomics, and clinical registries.
  • Interoperability: FHIR / HL7 standard metadata integrations.
AI
  • Variant Analytics: Scaling variant QC and genomic ML models.
  • Model Productization: Moving research notebooks to orchestrated workflows.
  • Decision Support: Analytics pipelines for clinical outcomes research.
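
One way to picture the PHI/PII safeguarding item above: direct identifiers are swapped for keyed hashes before records leave the ingestion layer, so datasets can still be joined on a stable token. The sketch below is a minimal illustration and not a compliance control on its own. The field list, the `pseudonymize` helper, and the demo key are assumptions made for the example; a real key would come from a secrets manager.

```python
import hashlib
import hmac

# Hypothetical field list for illustration; a real mapping comes from the data inventory.
DIRECT_IDENTIFIERS = {"mrn", "ssn", "email"}


def pseudonymize(record: dict, key: bytes) -> dict:
    """Return a copy of `record` with direct identifiers replaced by HMAC-SHA256 tokens."""
    masked = {}
    for field, value in record.items():
        if field in DIRECT_IDENTIFIERS and value is not None:
            digest = hmac.new(key, str(value).encode("utf-8"), hashlib.sha256)
            masked[field] = digest.hexdigest()
        else:
            # Non-identifying fields (and nulls) pass through unchanged.
            masked[field] = value
    return masked


if __name__ == "__main__":
    demo_key = b"demo-key-do-not-use"  # assumption: placeholder key for the example only
    row = {"mrn": "12345", "email": "a@example.org", "glucose_mg_dl": 98}
    print(pseudonymize(row, demo_key))
```

A keyed hash is used here rather than a bare SHA-256 because short identifiers such as MRNs are easy to enumerate; without the key, the tokens cannot be recomputed from a guessed identifier.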

Fintech & Banks

High-throughput, audit-ready data ecosystems built for low latency, secure mesh governance, and compliance auditing.

Data
  • Lakehouse Governance: Snowflake, Databricks, and BigQuery frameworks.
  • Data Mesh: Composable domain data products with robust SLAs.
  • Data Assets & Contracts: Governed data product cataloging, schema contracts, and SLA-backed access agreements across domains.
AI
  • Compliance Anomaly Models: AI auditing pipelines detecting unusual data flows.
  • Fraud Detection Prep: Real-time streaming inputs for fraud classification models.
  • Cost & FinOps Models: AI-driven cost projection and query optimizations.
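
A schema contract, as in the Data Assets & Contracts item above, can be as small as a declared set of columns and types that a producer's batch is checked against before publication. The following is a hedged sketch: the `CONTRACT` mapping, the column names, and the `validate_batch` helper are invented for illustration and are not tied to any particular catalog product.

```python
from typing import Any

# Hypothetical contract for a payments data product: column -> (type, nullable).
CONTRACT: dict[str, tuple[type, bool]] = {
    "transaction_id": (str, False),
    "amount_cents": (int, False),
    "currency": (str, False),
    "merchant_id": (str, True),
}


def validate_batch(rows: list[dict[str, Any]], contract=CONTRACT) -> list[str]:
    """Return readable violations; an empty list means the batch honors the contract."""
    violations = []
    for i, row in enumerate(rows):
        # Structural checks: every contracted column present, nothing undeclared.
        for col in sorted(contract.keys() - row.keys()):
            violations.append(f"row {i}: missing column '{col}'")
        for col in sorted(row.keys() - contract.keys()):
            violations.append(f"row {i}: unexpected column '{col}'")
        # Value checks: nullability first, then type.
        for col, (expected_type, nullable) in contract.items():
            if col not in row:
                continue
            value = row[col]
            if value is None:
                if not nullable:
                    violations.append(f"row {i}: '{col}' must not be null")
            elif not isinstance(value, expected_type):
                violations.append(
                    f"row {i}: '{col}' expected {expected_type.__name__}, "
                    f"got {type(value).__name__}"
                )
    return violations
```

In a pipeline, a non-empty result would typically block publication of the batch and notify the owning domain, which is what turns the contract into something enforceable rather than documentation.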

Retail & E-Commerce

Unified customer-intelligence and supply-chain pipelines that connect raw interaction signals to business metrics.

Data
  • Customer 360 Pipelines: Merging clickstreams, transactions, and CRM sources.
  • Semantic Modeling: Governed dbt models driving semantic analytics.
  • Inventory Systems: Low-latency streaming metrics for supply chain tracking.
AI
  • Decision Intelligence: AI-assisted insights and executive dashboard narratives.
  • Recommendation Models: Scaled feature stores and pipeline integrations.
  • Predictive Analytics: Churn and LTV prediction datasets for marketing.

Services

Core Advisory & Architecture Services

End-to-end guidance from initial regulatory data mapping to deployed, production-grade cloud environments.

Strategic Data & AI Advisory

Designing modern organizational frameworks that bridge engineering feasibility with regulatory and compliance targets.

Data
  • HIPAA & SOC 2 Blueprints: Architectural designs mapping regulatory boundaries.
  • Data Governance: Constructing domain-level data ownership and SLA models.
AI
  • ML Readiness: Evaluating legacy data structures for model training capabilities.
  • AI Risk Assessment: Establishing lineage for auditability and compliance.

Cloud Solutions Architecture

Building high-throughput, composable cloud platforms whose compute costs stay predictable as they scale.

Data
  • Lakehouse Deployment: Scaling Snowflake, Databricks, and BigQuery.
  • Ingestion Engines: Low-latency clickstreams, FHIR registries, and database CDC.
AI
  • Feature Store Design: Centralizing feature definitions for training and inference.
  • Dynamic Compute Scaling: FinOps strategies for model compute environments.

Operationalization & MLOps

Automating deployment pipelines to ensure reproducibility, data quality, and consistent metric definitions.

Data
  • dbt & Semantic Modeling: Restructuring models to serve unified metrics.
  • Quality Observability: Deploying anomaly checkers and data quality alerts.
AI
  • Notebook-to-Production: Packaging research notebooks into automated DAG pipelines.
  • LLM Observability: Evaluation metrics and auditing layers for generative models.
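
As a rough illustration of the anomaly checkers mentioned under Quality Observability, a daily row count can be compared with its recent history and flagged when it drifts too far. This is a minimal sketch under simple assumptions (roughly stable daily volumes, a z-score threshold of 3); the `is_anomalous` helper and its defaults are hypothetical, and production checks usually also account for seasonality.

```python
from statistics import mean, stdev


def is_anomalous(history: list[int], today: int, threshold: float = 3.0) -> bool:
    """Flag `today` when it is more than `threshold` sample standard deviations from the mean."""
    if len(history) < 2:
        return False  # not enough history to estimate spread
    mu = mean(history)
    sigma = stdev(history)
    if sigma == 0:
        return today != mu  # perfectly flat history: any change is notable
    return abs(today - mu) / sigma > threshold


if __name__ == "__main__":
    recent_counts = [1000, 1020, 980, 1010, 990]  # made-up daily row counts
    print(is_anomalous(recent_counts, 1005))
    print(is_anomalous(recent_counts, 400))
```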

Cloud Migration & Modernization

Structured hyperscaler migrations—AWS, Azure, and GCP—using MAP and partner frameworks to de-risk lift-and-shift, re-platforming, and full re-architecture engagements.

Migration
  • AWS MAP: Migration Acceleration Program readiness assessments, wave planning, and MGN/DMS tooling for large-scale fleet migrations.
  • Azure Migrate & Modernize: Landing-zone blueprints, Azure Migrate hub discovery, and Azure Site Recovery-based workload transfers.
  • GCP RAMP: Migrations for Enterprise discovery, MCMA tooling, and BigQuery Migration Service for analytics workload lift.
FinOps
  • Landing Zone Design: Account/subscription/project foundations, IAM, networking guardrails, and security baselines across AWS Control Tower, Azure Landing Zones, and GCP Landing Zone.
  • FinOps & Cost Governance: Rightsizing dashboards, committed-use and reserved-instance planning, and cloud billing anomaly detection post-migration.
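
The arithmetic behind committed-use and reserved-instance planning is simple enough to sketch: a commitment is billed for every hour, so it only pays off when the workload runs for a larger share of hours than the ratio of the committed rate to the on-demand rate. The helpers and the rates in the example are hypothetical and do not reflect any provider's actual pricing.

```python
HOURS_PER_YEAR = 24 * 365


def break_even_utilization(on_demand_hourly: float, committed_hourly: float) -> float:
    """Fraction of hours a workload must run for a commitment to match on-demand cost."""
    if on_demand_hourly <= 0:
        raise ValueError("on_demand_hourly must be positive")
    return committed_hourly / on_demand_hourly


def annual_savings(on_demand_hourly: float, committed_hourly: float, utilization: float) -> float:
    """Yearly savings (positive) or loss (negative) per instance from committing."""
    on_demand_cost = on_demand_hourly * HOURS_PER_YEAR * utilization
    committed_cost = committed_hourly * HOURS_PER_YEAR  # billed whether or not the instance runs
    return on_demand_cost - committed_cost


if __name__ == "__main__":
    # Made-up rates: 0.10 per hour on demand, 0.06 per hour committed.
    print(break_even_utilization(0.10, 0.06))
    print(annual_savings(0.10, 0.06, utilization=0.3))
```

With these illustrative rates, a workload that runs only 30% of the time loses money under a commitment, which is why rightsizing and utilization data normally come before any purchase decision.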

Data & AI Strategy & Governance

Defining the organizational frameworks, roadmaps, and ownership models that turn raw data and AI ambitions into measurable, governed business outcomes.

Strategy
  • Data Strategy: Maturity assessments, multi-year data platform roadmaps, and data product strategy aligned to business domains.
  • AI Strategy: LLM adoption frameworks, AI readiness evaluations, responsible AI policies, and build-vs-buy analysis for generative and predictive use cases.
  • Executive Alignment: Board-level narratives, KPI frameworks, and investment cases bridging engineering feasibility with C-suite priorities.
Governance
  • Data Governance: Metadata cataloging (Collibra, Atlan, Unity Catalog), lineage tracking, data stewardship models, and access policy enforcement.
  • AI Governance: Model risk management, bias auditing, explainability requirements, and auditability frameworks for regulated industries.
  • Regulatory Alignment: HIPAA, SOC 2, GDPR, and EU AI Act compliance mapping integrated into platform and pipeline design.

AI Agents & Agentic Systems

Designing and delivering autonomous multi-agent architectures that execute complex workflows, reason over enterprise data, and integrate safely into production systems.

Architecture
  • Agent Design: Composing task-specific agents with defined tool boundaries, memory, and guardrails aligned to business workflows.
  • Orchestration: Multi-agent coordination patterns—sequential, parallel, and supervisor-routed—using frameworks like LangGraph, CrewAI, and Vertex AI Agent Builder.
  • Data Integration: Connecting agents to enterprise datastores, APIs, and vector indexes with retrieval-augmented generation (RAG) pipelines.
AI
  • Operationalization: Deploying agents with evaluation harnesses, human-in-the-loop checkpoints, and observability layers for production trust.
  • Safety & Governance: Prompt hardening, tool-call auditing, and policy enforcement to meet enterprise security and compliance requirements.
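
Tool boundaries and tool-call auditing can be illustrated without any agent framework: every tool invocation passes through a wrapper that checks an allowlist and records what happened. The sketch below uses only the standard library; `ToolGuard`, `lookup_order`, and the agent name are hypothetical, and a real deployment would write the audit trail to durable storage rather than an in-memory list.

```python
import json
import time
from typing import Any, Callable


class ToolGuard:
    """Wraps agent tools with an allowlist check and an append-only audit trail."""

    def __init__(self, allowed: dict[str, Callable[..., Any]]):
        self._allowed = dict(allowed)
        self.audit_log: list[dict[str, Any]] = []

    def call(self, agent: str, tool: str, **kwargs: Any) -> Any:
        entry = {"ts": time.time(), "agent": agent, "tool": tool, "args": kwargs}
        if tool not in self._allowed:
            entry["outcome"] = "denied"
            self.audit_log.append(entry)
            raise PermissionError(f"tool '{tool}' is not allowlisted")
        try:
            result = self._allowed[tool](**kwargs)
        except Exception as exc:
            # Failed calls are logged too, then re-raised for the caller to handle.
            entry["outcome"] = f"error: {type(exc).__name__}"
            self.audit_log.append(entry)
            raise
        entry["outcome"] = "ok"
        self.audit_log.append(entry)
        return result


def lookup_order(order_id: str) -> dict:
    # Hypothetical read-only tool used only for this example.
    return {"order_id": order_id, "status": "shipped"}


if __name__ == "__main__":
    guard = ToolGuard({"lookup_order": lookup_order})
    print(guard.call("support-agent", "lookup_order", order_id="A-17"))
    print(json.dumps(guard.audit_log, indent=2))
```

Denied and failed calls are logged before the exception propagates, so the audit trail covers attempts as well as successes.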

Blueprint

How a typical engagement runs

  1. Discover: Discovery & alignment. Map stakeholders, regulatory constraints (HIPAA, SOC 2, GDPR), data domains, and cloud estate.
  2. Assess: Data & AI readiness. Score data maturity, platform gaps, and AI feasibility, covering governance posture, infrastructure, and agentic prerequisites.
  3. Design: Architecture & roadmap. Blueprint target-state platform, AI/ML pipelines, and agent layer. Define contracts, SLAs, cost model, and phased milestones.
  4. Build: Build & accelerate. Stand up ingestion, curation, and analytics layers. Apply Lakehouse, streaming, and RAG patterns with observability by default.
  5. Ship AI: AI & agent delivery. Ship predictive models, LLM workflows, or multi-agent systems with eval harnesses, HITL controls, and safety guardrails.
  6. Hand off: Productize & hand off. Deliver runbooks, governance playbooks, and monitoring dashboards. Embed knowledge transfer for team independence.

Highlights

Recent work

Patterns in practice
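# Multi-agent RAG pipeline: LangGraph + LangChain
# `lakehouse`, `ml_endpoint`, and `llm_summarizer` are placeholders for project-specific clients; `lakehouse.query` is assumed to accept bound parameters.
from typing import TypedDict

from langchain_core.tools import tool
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph

class AgentState(TypedDict, total=False):
    patient_id: str
    record: dict
    risk_score: float

@tool
def query_clinical_data(patient_id: str) -> dict:
    """Retrieve FHIR-compliant patient record from lakehouse."""
    # Bind the ID as a parameter rather than interpolating it into the SQL string.
    return lakehouse.query("SELECT * FROM fhir.patient WHERE id = ?", [patient_id])

@tool
def run_risk_model(record: dict) -> float:
    """Score readmission risk via deployed ML endpoint."""
    return ml_endpoint.predict(record)["risk_score"]

# Graph nodes receive the state and return a partial update, so each tool gets a thin wrapper.
def retrieve(state: AgentState) -> dict:
    return {"record": query_clinical_data.invoke({"patient_id": state["patient_id"]})}

def score(state: AgentState) -> dict:
    return {"risk_score": run_risk_model.invoke({"record": state["record"]})}

graph = StateGraph(AgentState)
graph.add_node("retrieve",  retrieve)
graph.add_node("score",     score)
graph.add_node("summarize", llm_summarizer)   # LLM synthesis step
graph.add_edge(START, "retrieve")
graph.add_edge("retrieve", "score")
graph.add_edge("score",    "summarize")
graph.add_edge("summarize", END)
# interrupt_before pauses for human review; the checkpointer persists state across the pause.
agent = graph.compile(checkpointer=MemorySaver(), interrupt_before=["summarize"])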
-- models/marts/clinical/fct_trial_enrollment.sql
-- Pattern: incremental merge + contract + multi-ref join

{{
  config(
    materialized         = 'incremental',
    unique_key           = ['trial_id', 'patient_key', 'enrollment_date'],
    incremental_strategy = 'merge',
    on_schema_change     = 'sync_all_columns',
    cluster_by           = ['enrollment_date'],
    contract             = { 'enforced': true },
    meta = {
      'owner'        : 'clinical-analytics',
      'domain'       : 'trials',
      'contains_phi' : true,
      'sla'          : '99.9%'
    }
  )
}}

with patients as (
  select * from {{ ref('dim_patient') }}
),
trials as (
  select * from {{ ref('dim_clinical_trial') }}
),
enrollments as (
  select * from {{ ref('stg_ctms__enrollments') }}
  {% if is_incremental() %}
    -- only process new records on incremental runs
    where enrolled_at > (select max(enrollment_date) from {{ this }})
  {% endif %}
),
final as (
  select
    {{ dbt_utils.generate_surrogate_key(['e.trial_id','e.patient_id']) }}
                                      as enrollment_sk,
    e.trial_id,
    p.patient_key,
    t.protocol_number,
    e.site_id,
    e.enrolled_at::date               as enrollment_date,
    e.arm_code,
    e.consent_version,
    e.dropped_at is not null          as is_dropped,
    {{ current_timestamp() }}         as dbt_updated_at
  from enrollments  e
  join patients     p using (patient_id)
  join trials       t using (trial_id)
  where e.status != 'SCREEN_FAIL'
)
select * from final
-- Data Vault 2.0 — Patient / Encounter domain
-- Hub + Satellite + Link pattern (Snowflake syntax; on BigQuery, swap sha2 for to_hex(sha256(...)) and concat_ws for array_to_string)

-- ── HUB_PATIENT: one row per unique business key ──────────────────
create or replace table dv.HUB_PATIENT as
select
  sha2(mrn, 256)       as patient_hk,     -- surrogate hash key
  mrn                  as patient_bk,     -- business key (MRN)
  'EHR_EPIC'           as record_source,
  current_timestamp()  as load_dts
from staging.raw_patient
qualify row_number() over (partition by mrn order by load_dts) = 1;


-- ── SAT_PATIENT_DEMOGRAPHICS: attributes with delta detection ─────
create or replace table dv.SAT_PATIENT_DEMOGRAPHICS as
select
  sha2(mrn, 256)                                   as patient_hk,
  sha2(concat_ws('|', gender, dob, race,
                 ethnicity, postal_code), 256)      as hashdiff,
  gender, dob, race, ethnicity, postal_code,
  'EHR_EPIC'                                       as record_source,
  current_timestamp()                              as load_dts
from staging.raw_patient
qualify hashdiff
     is distinct from lag(hashdiff) over (   -- null-safe, so each patient's first row is kept
          partition by patient_hk order by load_dts
        );


-- ── LNK_PATIENT_ENCOUNTER: foreign-key relationship ──────────────
create or replace table dv.LNK_PATIENT_ENCOUNTER as
select
  sha2(concat_ws('|', mrn, encounter_id), 256)  as pat_enc_hk,
  sha2(mrn, 256)                                as patient_hk,
  sha2(encounter_id, 256)                       as encounter_hk,
  'EHR_EPIC'                                   as record_source,
  current_timestamp()                          as load_dts
from staging.raw_encounter
qualify row_number() over (
  partition by mrn, encounter_id order by load_dts
) = 1;
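
The delta-detection idea behind the satellite table can also be shown outside SQL: hash the tracked attributes of each row, then keep the first version for each key and any later version whose hash differs from the one before it. The Python below is a small sketch of that logic only. The row layout mirrors the column names used in the satellite, while `hashdiff` and `changed_versions` are hypothetical helpers for the example.

```python
import hashlib

TRACKED = ("gender", "dob", "race", "ethnicity", "postal_code")


def hashdiff(row: dict) -> str:
    """Hash the tracked attributes; None becomes '' so one missing value does not void the hash."""
    payload = "|".join("" if row.get(c) is None else str(row[c]) for c in TRACKED)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def changed_versions(rows: list[dict]) -> list[dict]:
    """Keep the first row per MRN and every later row whose hashdiff differs from its predecessor."""
    kept = []
    previous: dict[str, str] = {}
    for row in sorted(rows, key=lambda r: (r["mrn"], r["load_dts"])):
        h = hashdiff(row)
        if previous.get(row["mrn"]) != h:
            kept.append(row)
        previous[row["mrn"]] = h  # compare against the immediately preceding row, like lag()
    return kept
```

Treating missing attributes as empty strings is one possible convention; whichever convention is chosen, it has to be applied identically on every load, otherwise unchanged rows will look like changes.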
// Structured Streaming — FHIR observation ingest (Scala / Spark)
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

val schema = spark.read
  .option("multiline", "true")
  .json("s3://landing/fhir-sample/")
  .schema

val rawStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers)
  .option("subscribe", "fhir.Observation")
  .load()
  .select(from_json(col("value").cast("string"), schema).as("obs"))
  .select("obs.*")

val enriched = rawStream
  .withColumn("ingested_at", current_timestamp())
  .withColumn("phi_masked",  sha2(col("subject.reference"), 256))
  .drop("subject")

enriched.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "s3://checkpoints/fhir-obs/")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .toTable("gold.fhir_observations")
// AWS CDK — HIPAA-aligned Lambda stack for patient data processing
import * as cdk  from 'aws-cdk-lib';
import * as lambda    from 'aws-cdk-lib/aws-lambda';
import * as events    from 'aws-cdk-lib/aws-events';
import * as targets   from 'aws-cdk-lib/aws-events-targets';
import * as sqs       from 'aws-cdk-lib/aws-sqs';
import * as kms       from 'aws-cdk-lib/aws-kms';

export class PatientDataStack extends cdk.Stack {
  constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // KMS key for PHI encryption at rest
    const phiKey = new kms.Key(this, 'PhiKey', {
      enableKeyRotation: true,
      description:       'CMK for patient PHI fields',
    });

    // Dead-letter queue for failed processing events
    const dlq = new sqs.Queue(this, 'PatientDlq', {
      encryptionMasterKey: phiKey,
      retentionPeriod:     cdk.Duration.days(14),
    });

    // Lambda — processes FHIR Observation events, masks PHI, writes to S3
    const processFn = new lambda.Function(this, 'ProcessPatientData', {
      runtime:     lambda.Runtime.PYTHON_3_12,
      handler:     'handler.process',
      code:        lambda.Code.fromAsset('lambda/patient_processor'),
      environment: { PHI_KEY_ARN: phiKey.keyArn, TARGET_BUCKET: 'phi-processed' },
      deadLetterQueue: dlq,
      tracing:     lambda.Tracing.ACTIVE,   // X-Ray enabled
    });
    phiKey.grantEncryptDecrypt(processFn);

    // EventBridge rule — trigger on FHIR Observation PUTs
    new events.Rule(this, 'FhirObsRule', {
      eventPattern: { source: ['fhir.gateway'], detailType: ['Observation'] },
      targets: [new targets.LambdaFunction(processFn)],
    });
  }
}
Publications

Previous contributions