Home Nieuws 4 YAML-bestanden in plaats van PySpark: hoe we analisten datapijplijnen laten bouwen...

4 YAML-bestanden in plaats van PySpark: hoe we analisten datapijplijnen laten bouwen zonder ingenieurs

6
0
4 YAML-bestanden in plaats van PySpark: hoe we analisten datapijplijnen laten bouwen zonder ingenieurs

ons drie weken de tijd om één enkele datapijplijn te verzenden. Tegenwoordig doet een analist zonder Python-ervaring het in een dag. Hier is hoe we daar zijn gekomen.

Ik ben Kiril Kazlou, een data-ingenieur bij Mindbox. Ons team herberekent regelmatig bedrijfsstatistieken voor klanten, wat betekent dat we voortdurend datamarts bouwen voor facturering en analyse, waarbij we gebruik maken van tientallen verschillende bronnen.

Lange tijd vertrouwden we op PySpark voor al onze gegevensverwerking. Het probleem? Zonder Python-ervaring kun je niet echt met PySpark werken. Voor elke nieuwe pijplijn was een ontwikkelaar nodig. En dat betekende wachten – soms wekenlang.

In dit bericht laat ik zien hoe we een intern dataplatform hebben gebouwd waar een analist of productmanager een regelmatig bijgewerkte pipeline kan opzetten door slechts vier YAML-bestanden te schrijven.

Waarom PySpark ons ​​vertraagde

Laat me de pijn illustreren met een schoolvoorbeeld: het berekenen van MAU (maandelijks actieve gebruikers).

Oppervlakkig gezien voelt dit als een eenvoudige SQL-taak: COUNT(DISTINCT customerId) over een paar tafels gedurende een tijdvenster. Maar vanwege alle infrastructuuroverhead – PySpark, Airflow DAG-installatie, Spark-resourcetoewijzing, testen – moesten we het aan ontwikkelaars overdragen. Het resultaat? Een volledige week om een ​​MAU-teller te verzenden.

Het duurde één tot drie weken om elke nieuwe statistiek te leveren. En elke keer zag het proces er hetzelfde uit:

  1. Een analist definieerde de businessvereisten, vond een beschikbare ontwikkelaar en overhandigde de context.
  2. De ontwikkelaar verduidelijkte de details, schreef PySpark-code, doorliep de codebeoordeling, configureerde de DAG en implementeerde deze.

Wat we eigenlijk wilden, was dat analisten en productmanagers – de mensen die de bedrijfslogica het beste begrijpen en vloeiend zijn in SQL en YAML – dit zelf zouden afhandelen. Geen Python. Geen PySpark.

Hoe pijpleidingen werden gebouwd met PySpark

Waar we PySpark mee hebben vervangen: YAML en SQL zijn alles wat u nodig heeft

Om een ​​declaratieve benadering te hanteren, hebben we onze gegevenslaag in drie delen opgesplitst en voor elk deel de juiste tool gekozen:

  • dlt (tool voor het laden van gegevens) — neemt gegevens van externe API’s en databases op in objectopslag. Volledig geconfigureerd via een YAML-bestand. Geen code vereist.
  • dbt (tool voor het bouwen van gegevens) op Trino — transformeert gegevens met behulp van pure SQL. Het koppelt modellen via ref()bouwt automatisch een afhankelijkheidsgrafiek op en verwerkt incrementele updates.
  • Luchtstroom + Kosmos – orkestreert de pijpleidingen. De Airflow DAG wordt automatisch gegenereerd uit dag.yaml en het dbt-project.

We gebruikten Trino al als query-engine voor ad-hocquery’s en hadden deze aangesloten op Superset voor BI. Het had zichzelf al bewezen: voor queries met standaardlogica verwerkte het enorme datasets sneller en met minder middelen dan Spark. Bovendien ondersteunt Trino native federatieve toegang tot meerdere datastores vanuit één enkele SQL-query. Voor 90% van onze pijpleidingen paste Trino perfect.

Diagram van de nieuwe pijplijnworkflow: een analist schrijft rechtstreeks YAML-configuraties en SQL-modellen. dbt en Trino zorgen automatisch voor de uitvoering via Airflow. Geen betrokkenheid van de ontwikkelaar vereist. Het volledige proces duurt één dag.
Daarna: pijpleidingen in handen van analisten met dbt + Trino

Hoe we gegevens laden: dlt.yaml

Het eerste YAML-bestand beschrijft waar en hoe gegevens moeten worden geladen voor downstream-verwerking. Hier is een voorbeeld uit de praktijk: het laden van factureringsgegevens vanuit een interne API:

product: sg-team
feature: billing
schema: billing_tarification

dag:
  dag_id: dlt_billing_tarification
  schedule: "0 4 * * *"
  description: "Daily refresh of tarification data"
  tags:
    - billing

alerts:
  enabled: true
  severity: warning

source:
  type: rest_api
  client:
    base_url: "https://internal-api.example.com"
    auth:
      type: bearer
      token: dlt-billing.token
  resources:
    - name: tarification_data
      endpoint:
        path: /tarificationData
        method: POST
        json:
          firstPeriod: "{{ previous_month_date }}"
          lastPeriod: "{{ previous_month_date }}"
          pricingPlanLine: CurrentPlan
      write_disposition: replace
      processing_steps:
        - map: dlt_custom.billing_tarification_data.map

    - name: charges_raw
      columns:
        staffUserName:
          data_type: text
          nullable: true
      endpoint:
        path: /data-feed/charges
        method: POST
        json:
          firstPeriod: "{{ previous_month_date }}"
          lastPeriod: "{{ previous_month_date }}"
      write_disposition: replace

    - name: discounts_raw
      endpoint:
        path: /data-feed/discounts
        method: POST
        json:
          firstPeriod: "{{ previous_month_date }}"
          lastPeriod: "{{ previous_month_date }}"
      write_disposition: replace

Deze configuratie definieert vier bronnen van één enkele API. Voor elk daarvan specificeren we het eindpunt, de aanvraagparameters en een schrijfstrategie – in ons geval: replace betekent ‘elke keer overschrijven’. U kunt ook verwerkingsstappen toevoegen, kolomtypen definiëren en waarschuwingen configureren.

De volledige configuratie is 40 regels YAML. Zonder dlt zou elke connector een Python-script zijn dat verzoeken, paginering, nieuwe pogingen, serialisatie naar Delta Table-indeling en uploads naar opslag afhandelt.

Hoe we gegevens transformeren met SQL: dbt_project.yaml en source.yaml

De volgende stap is het configureren van het dbt-model. Bij Trino betekent dat SQL-query’s.

Hier is een voorbeeld van hoe we de MAU-berekening hebben opgezet. Zo ziet de voorbereiding van evenementen vanuit één bron eruit:

-- int_mau_events_visits.sql (simplified)
{{ config(materialized='table') }}

WITH period AS (
    -- Rolling window: last 5 months to current
    SELECT
        YEAR(CURRENT_DATE - INTERVAL '5' MONTH) AS start_year,
        MONTH(CURRENT_DATE - INTERVAL '5' MONTH) AS start_month,
        YEAR(CURRENT_DATE) AS end_year,
        MONTH(CURRENT_DATE) AS end_month
),

events AS (
    -- Pull visit events within the period window
    SELECT src._tenant, src.unmergedCustomerId,
           'visits' AS src_type, src.endpoint
    FROM {{ source('final', 'customerstracking_visits') }} src
    CROSS JOIN period p
    WHERE src.unmergedCustomerId IS NOT NULL
      AND /* ...timestamp filtering by year/month bounds... */
),

events_with_customer AS (
    -- Resolve merged customer IDs
    SELECT e._tenant,
           COALESCE(mc.mergedCustomerId, e.unmergedCustomerId) AS customerId,
           e.src_type, e.endpoint
    FROM events e
    LEFT JOIN {{ ref('int_merged_customers') }} mc
      ON e._tenant = mc._tenant
      AND e.unmergedCustomerId = mc.unmergedCustomerId
)

-- Keep only actual (non-deleted) customers
SELECT ewc._tenant, ewc.customerId, ewc.src_type, ewc.endpoint
FROM events_with_customer ewc
WHERE EXISTS (
    SELECT 1 FROM {{ ref('int_actual_customers') }} ac
    WHERE ewc._tenant = ac._tenant
      AND ewc.customerId = ac.customerId
)

Alle tien gebeurtenisbronnen volgen exact hetzelfde patroon. De enige verschillen zijn de brontabel en de filters. Vervolgens worden de modellen samengevoegd tot één enkele stream:

-- int_mau_events.sql (union of all sources)
SELECT * FROM {{ ref('int_mau_events_inapps_targetings') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_inapps_clicks') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_visits') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_orders') }}
-- ...plus 6 more sources

En tot slot de datamart waar alles wordt samengevoegd:

-- mau_period_datamart.sql
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    unique_key=('_tenant', 'start_year', 'start_month', 'end_year', 'end_month')
) }}

{%- set months_back = var('months_back', 5) | int -%}

WITH period AS (
    SELECT
        YEAR(CURRENT_DATE - INTERVAL '{{ months_back }}' MONTH) AS start_year,
        MONTH(CURRENT_DATE - INTERVAL '{{ months_back }}' MONTH) AS start_month,
        YEAR(CURRENT_DATE) AS end_year,
        MONTH(CURRENT_DATE) AS end_month
),
events_resolved AS (
    SELECT * FROM {{ ref('int_mau_events') }}
),
metrics_by_tenant AS (
    SELECT
        er._tenant,
        COUNT(DISTINCT CASE WHEN src_type="visits"
              THEN customerId END) AS CustomersTracking_Visits,
        COUNT(DISTINCT CASE WHEN src_type="orders"
              THEN customerId END) AS ProcessingOrders_Orders,
        COUNT(DISTINCT CASE WHEN src_type="mailings"
              THEN customerId END) AS Mailings_MessageStatuses,
        -- ...other metrics
        COUNT(DISTINCT customerId) AS MAU
    FROM events_resolved er
    GROUP BY er._tenant
)
SELECT m.*, p.start_year, p.start_month, p.end_year, p.end_month
FROM metrics_by_tenant m
CROSS JOIN period p

Voor de datamart-configuratie gebruiken we incremental_strategy='merge'. dbt genereert automatisch de samenvoegquery, ter vervanging van de unique_key voor boven. Het is niet nodig om handmatig incrementeel laden te implementeren.

Om de modellen in één project te binden, hebben we een opzet gemaakt dbt_project.yaml:

name: mau_period
version: '1.0.0'

models:
  mau_period:
    +on_table_exists: replace
    +on_schema_change: append_new_columns

En sources.yamlwaarin de invoertabellen worden beschreven:

sources:
  - name: final
    database: data_platform
    schema: final
    tables:
      - name: inapps_targetings_v2
      - name: inapps_clicks_v2
      - name: customerstracking_visits
      - name: processingorders_orders
      - name: cdp_mergedcustomers_v2
      # ...

Het resultaat is dezelfde bedrijfslogica die we hadden in PySpark, maar dan in pure SQL: sources.yaml vervangt typedspark-schema’s, {{ ref() }} En {{ source() }} vervangen .get_table()en de automatische uitvoeringsvolgorde via de afhankelijkheidsgrafiek vervangt de handmatige afstemming van Spark-resources.

Hoe we de luchtstroom configureren: dag.yaml

Het vierde configuratiebestand definieert wanneer en hoe Airflow de pijplijn uitvoert:

product: sg-team
feature: billing
schema: mau
schedule: "15 21 * * *"  # every day at 00:15 MSK

params:
  - name: start_date
    description: "Start date (YYYY-MM-DD). Leave empty for auto"
    default: ""
  - name: end_date
    description: "End date (YYYY-MM-DD). Leave empty for auto"
    default: ""
  - name: months_back
    description: "Months to look back (default: 5)"
    default: 5

alerts:
  enabled: true
  severity: warning

Vervolgens parseert ons Python-script dag.yaml En dbt_project.yaml en gebruikt de Cosmos-bibliotheek om een ​​volledig functionele Airflow DAG te genereren. Dit is de enige stukje Python-code in de gehele opstelling. Het is één keer geschreven en werkt voor elk dbt-project. Dit is het belangrijkste onderdeel:

def _build_dbt_project_dags(project_path: Path, environ: dict) -> list(DbtDag):
    config_dict = yaml.safe_load(dag_config_path.read_text())
    config = DagConfig.model_validate(config_dict)

    # YAML params → Airflow Params
    params = {}
    operator_vars = {}
    for param in config.params:
        params(param.name) = Param(
            default=param.default if param.default is not None else "",
            description=param.description,
        )
        operator_vars(param.name) = f"{{{{ params.{param.name} }}}}"

    # Cosmos creates the DAG from the dbt project
    with DbtDag(
        dag_id=f"dbt_{project_path.name}",
        schedule=config.schedule,
        params=params,
        project_config=ProjectConfig(dbt_project_path=project_path),
        profile_config=ProfileConfig(
            profile_name="default",
            target_name=project_name,
            profile_mapping=TrinoLDAPProfileMapping(
                conn_id="trino_default",
                profile_args={
                    "database": profile_database,
                    "schema": profile_schema,
                },
            ),
        ),
        operator_args={"vars": operator_vars},
    ) as dag:
        # Create schema before running models
        create_schema = SQLExecuteQueryOperator(
            task_id="create_schema",
            conn_id="trino_default",
            sql=f"CREATE SCHEMA IF NOT EXISTS {profile_database}.{profile_schema} ...",
        )
        # Attach to root tasks
        for unique_id, _ in dag.dbt_graph.filtered_nodes.items():
            task = dag.tasks_map(unique_id)
            if not task.upstream_task_ids:
                create_schema >> task

Kosmos leest manifest.json uit het dbt-project, parseert de modelafhankelijkheidsgrafiek en maakt voor elk model een afzonderlijke Airflow-taak. Taakafhankelijkheden worden automatisch opgebouwd op basis van ref() oproepen in de SQL.

Hoe analisten pijplijnen bouwen zonder ontwikkelaars

Als een analist nu een nieuwe terugkerende pijplijn nodig heeft, kan hij deze in een paar stappen samenstellen:

Stap 1. Maak een map in de repository: dbt-projects/my_new_pipeline/.

Stap 2. Als externe gegevensopname nodig is, schrijft u een YAML-configuratie voor dlt.

Stap 3. Schrijf SQL-modellen in de models/ map en beschrijf de bronnen daarin sources.yaml.

Stap 4. Creëren dbt_project.yaml En dag.yaml.

Stap 5. Push naar Git, doorloop de beoordeling, voeg samen.

CI/CD bouwt het dbt-project en verzendt artefacten naar S3. Airflow leest van daaruit de DAG-bestanden, Cosmos parseert het dbt-project en genereert de taakgrafiek. Op schema laat dbt de modellen in de juiste volgorde op Trino draaien. Het eindresultaat is een bijgewerkte datamart in het magazijn, toegankelijk via Superset.

Wat er veranderde na de migratie

Voor-en-na-vergelijking laat zien dat de levertijd van de pijplijn daalt van één naar drie weken onder PySpark naar één dag met de op YAML gebaseerde stack, en dat het eigendom van de pijplijn verschuift van ontwikkelaars naar analisten.
Wat veranderde: van weken naar één dag, van ontwikkelaars naar analisten

Als analisten zelf pijpleidingen willen bouwen, moeten ze dit begrijpen ref() En source() concepten, het verschil tussen table En incremental materialisatie en de basisprincipes van Git. We hebben een aantal interne workshops georganiseerd en stapsgewijze handleidingen voor elk taaktype samengesteld.

Waarom de nieuwe stack PySpark niet volledig vervangt

Voor ongeveer 10% van onze pipelines is PySpark nog steeds de enige optie – wanneer een transformatie simpelweg niet in SQL past. dbt ondersteunt Jinja-macro’s, maar dat is geen vervanging voor volwaardige Python. En het zou oneerlijk zijn om de beperkingen van de nieuwe tools over het hoofd te zien.

dlt + Delta: experimentele upsert-ondersteuning. We gebruiken het Delta-formaat in onze opslaglaag. De Delta-connector van dlt is gemarkeerd als experimenteel, dus de samenvoegstrategie werkte niet standaard. We moesten oplossingen vinden – in sommige gevallen gebruikten we dat ook replace in plaats van merge (het opofferen van incrementaliteit), en in andere schreven we maatwerk processing_steps.

Trino’s beperkte fouttolerantie. Trino heeft een fouttolerantiemechanisme, maar het werkt door tussenresultaten naar S3 te schrijven. Bij onze datavolumes op terabyteschaal is dit onpraktisch; het enorme aantal S3-bewerkingen maakt het onbetaalbaar. Als fouttolerantie niet is ingeschakeld, mislukt de hele query als een Trino-werknemer uitvalt. Spark daarentegen herstart alleen de mislukte taak. We hebben dit aangepakt met nieuwe pogingen op DAG-niveau en door zware modellen op te splitsen in ketens van tussenliggende modellen.

UDF’s en aangepaste logica. In Spark kun je aangepaste logica in Python rechtstreeks in de pijplijn schrijven – superhandig. Met de nieuwe architectuur is dit veel moeilijker. dbt bovenop Trino helpt niet: Jinja genereert alleen SQL, en de Python-modellen van dbt werken alleen met Snowflake, Databricks en BigQuery. Je kunt UDF’s schrijven in Trino, maar alleen in Java, met alle overhead die dat met zich meebrengt: een afzonderlijke opslagplaats, een build-pijplijn, het inzetten van JAR’s voor alle werkers. Dus als een transformatie niet in SQL past, krijg je óf een onhoudbaar SQL-monster óf een op zichzelf staand script dat de lijn doorbreekt.

Wat nu: tests, modelsjablonen en training

Beter testen. We hebben solide pijplijntests gehad in PySpark, maar de nieuwe architectuur is nog steeds bezig met een inhaalslag. Recente dbt-versies introduceerden unit-tests: u kunt nu de SQL-modellogica valideren aan de hand van nepgegevens zonder de volledige pijplijn op gang te brengen. Zowel op modelniveau als als aparte monitoringlaag willen we dbt-testen toevoegen.

Herbruikbare sjablonen voor veelgebruikte patronen. Veel van onze dbt-modellen lijken op elkaar. Eén enkele configuratie zou een tiental modellen met hetzelfde patroon kunnen beschrijven; alleen de brontabel en filters verschillen. We zijn van plan de gedeelde logica in dbt-macro’s te extraheren.

Het uitbreiden van de gebruikersbasis van het platform. We willen dat meer engineers en analisten zelfstandig met data kunnen werken. We plannen regelmatig interne trainingssessies, documentatie en introductiehandleidingen, zodat nieuwe gebruikers snel aan de slag kunnen en hun eigen modellen kunnen gaan bouwen.

Als uw team vastzit in dezelfde lus van ‘analisten wachten op ontwikkelaars’, zou ik graag willen horen hoe u dit oplost. Connecteer met mij op LinkedIn en laten we de notities vergelijken.


Alle afbeeldingen in dit artikel zijn van de auteur, tenzij anders vermeld.

Nieuwsbron

LAAT EEN REACTIE ACHTER

Vul alstublieft uw commentaar in!
Vul hier uw naam in