Skip to content

Feat/prefect3 flows migration #1553

Draft
folhesgabriel wants to merge 57 commits into feat/prefect3 from feat/prefect3-flows-migration

@folhesgabriel (Collaborator) commented May 21, 2026

Centralize the migration of flows from Prefect 0.15 to 3.0.

Summary by CodeRabbit

  • New Features

    • Migrated pipeline flows to Prefect 3 orchestration platform with automated deployments via GitHub Actions
    • Added Docker containerization for Prefect 3 workers with system and Python dependencies pre-installed
    • Enabled automated flow deployment triggered on code changes to pipelines
  • Documentation

    • Added comprehensive migration guide documenting flow updates and best practices
    • Created tracking documentation for migration progress
  • Chores

    • Updated Python version support to 3.10–3.12
    • Updated project dependencies including Prefect 3, dbt-core 1.8+, and Pydantic 2.0+


- ibge_inflacao/_run_ibge_inflacao: gate is_outdated check behind force_run
- br_ibge_ipca, br_ibge_inpc factories: propagate force_run
- migration guide: documents the pattern and the verbosity caveat
- migrated-flows tracker: marks the 8 IBGE flows as pending re-deploy
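A minimal sketch of the force_run gating in the first two bullets, assuming the freshness check and the pipeline body can be passed in as plain callables. `run_ibge_inflacao`, `make_table_flow`, `is_outdated` and `run_pipeline` are hypothetical stand-ins written without Prefect so the control flow is easy to see; they are not the PR's actual signatures.

```python
from typing import Callable, Optional


def run_ibge_inflacao(
    table_id: str,
    is_outdated: Callable[[str], bool],
    run_pipeline: Callable[[str], str],
    force_run: bool = False,
) -> Optional[str]:
    """Hypothetical stand-in for the shared IBGE orchestration helper."""
    # With force_run=True the freshness check is skipped entirely (short-circuit).
    if not force_run and not is_outdated(table_id):
        return None  # nothing new at the source: skip download, upload and dbt
    return run_pipeline(table_id)


def make_table_flow(table_id: str) -> Callable[..., Optional[str]]:
    """Hypothetical per-table factory that forwards force_run unchanged."""

    def table_flow(
        is_outdated: Callable[[str], bool],
        run_pipeline: Callable[[str], str],
        force_run: bool = False,
    ) -> Optional[str]:
        return run_ibge_inflacao(table_id, is_outdated, run_pipeline, force_run=force_run)

    return table_flow
```

The point of the second bullet is that a factory which drops the parameter would silently make force_run a no-op for every generated flow.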

coderabbitai Bot commented May 21, 2026

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: cfba81c4-af38-4c2a-9ae9-21084eea4205

📝 Walkthrough

This PR migrates the pipelines' orchestration from Prefect 0.15 to Prefect 3. Changes span dependency updates, a refactor of the utilities into focused modules, deployment infrastructure (Docker image, CI/CD workflows, deployment scripts), shared task updates, and individual flow migrations with renamed flow objects and inline scheduling.

Changes

Prefect 3 Migration

Layer / File(s) Summary
Dependency and Runtime Setup
pyproject.toml, Dockerfile.prefect3, entrypoint.sh, dbt_project.yml
Pinned dependencies updated to Prefect 3.x, dbt 1.8+, and Pydantic 2.0+. Container image based on prefecthq/prefect:3-python3.12 with Chrome, OCR, and dbt deps; startup entrypoint decodes base64 GCS credentials into dev/prod credential files.
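The real entrypoint.sh is a shell script; the Python sketch below only illustrates the same decode-and-write idea. The environment variable names and target paths in `CREDENTIAL_TARGETS` are hypothetical.

```python
import base64
import os
from pathlib import Path
from typing import List, Mapping, Optional

# Hypothetical env var -> file mapping; the real entrypoint.sh may use other names.
CREDENTIAL_TARGETS = {
    "GCS_CREDENTIALS_DEV_B64": "credentials/dev.json",
    "GCS_CREDENTIALS_PROD_B64": "credentials/prod.json",
}


def write_credentials(base_dir: Path, env: Optional[Mapping[str, str]] = None) -> List[Path]:
    """Decode each base64 credential variable that is set into its target file."""
    env = os.environ if env is None else env
    written: List[Path] = []
    for var, rel_path in CREDENTIAL_TARGETS.items():
        raw = env.get(var)
        # Strip line breaks that tools like `base64` insert when wrapping output.
        encoded = "".join(raw.split()) if raw else ""
        if not encoded:
            continue  # this mode (dev or prod) is not configured in the container
        target = base_dir / rel_path
        target.parent.mkdir(parents=True, exist_ok=True)
        # validate=True raises on non-base64 input instead of silently discarding it.
        target.write_bytes(base64.b64decode(encoded, validate=True))
        target.chmod(0o600)  # service-account keys should not be world-readable
        written.append(target)
    return written
```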
Utilities Refactoring - Core Modules
pipelines/utils/__init__.py, pipelines/utils/vault.py, pipelines/utils/discord.py, pipelines/utils/gcs.py, pipelines/utils/utils.py
Monolithic utils.py split into single-responsibility modules: vault.py (HashiCorp Vault client, secret reading), discord.py (webhook notifications, sent only in production), gcs.py (credentials, blob listing, header dumping, artifact uploading). The core utils.py retains logging, partition writing, and dataframe cleaning, and now integrates with the Prefect logger.
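A sketch of a production-gated Discord notifier using only the standard library. The `environment` argument, the `"prod"` value and the injectable `send` parameter are assumptions made so the gate can be tested; Discord webhooks do accept a JSON body with a `content` field capped at 2000 characters.

```python
import json
import urllib.request
from typing import Callable

DISCORD_CONTENT_LIMIT = 2000  # Discord rejects message content longer than this


def build_webhook_request(webhook_url: str, message: str) -> urllib.request.Request:
    body = json.dumps({"content": message[:DISCORD_CONTENT_LIMIT]}).encode("utf-8")
    return urllib.request.Request(
        webhook_url,
        data=body,
        headers={
            "Content-Type": "application/json",
            # Some webhook endpoints reject urllib's default User-Agent.
            "User-Agent": "pipelines-notifier/0.1",
        },
        method="POST",
    )


def notify_discord(
    message: str,
    webhook_url: str,
    environment: str,
    send: Callable[..., object] = urllib.request.urlopen,
) -> bool:
    """Send only from production; return whether a request was made."""
    if environment != "prod":
        return False  # dev/staging runs stay silent
    response = send(build_webhook_request(webhook_url, message), timeout=10)
    # Close the HTTP response if the sender returned one (fakes may return None).
    close = getattr(response, "close", None)
    if callable(close):
        close()
    return True
```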
Utilities Refactoring - Tasks and Metadata
pipelines/utils/tasks.py, pipelines/utils/execute_dbt_model/flows.py, pipelines/utils/metadata/tasks.py, pipelines/utils/metadata/utils.py
Shared Prefect 3 tasks refactored: get_credentials, rename_flow_run_dataset_table (now async), upload_to_gcs, run_dbt (default command "run/test"; dbt artifacts uploaded in a finally block), download_data_to_gcs (simplified size-based policies). The dbt/GCS flow task was updated, and the metadata task retry config moved from max_retries/timedelta to retries/retry_delay_seconds.
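Prefect 0.15 tasks took `max_retries` plus a `retry_delay` timedelta, while Prefect 3's `@task` takes `retries` and `retry_delay_seconds`. The hypothetical helper below shows the mapping as a dictionary rewrite; it illustrates the rename and is not code from the PR.

```python
from datetime import timedelta
from typing import Any, Dict


def to_prefect3_retry_kwargs(legacy: Dict[str, Any]) -> Dict[str, Any]:
    """Rename Prefect 0.15 retry options to their Prefect 3 @task equivalents."""
    # Keep every unrelated option (name, tags, ...) untouched.
    converted = {k: v for k, v in legacy.items() if k not in ("max_retries", "retry_delay")}
    if "max_retries" in legacy:
        converted["retries"] = legacy["max_retries"]
    delay = legacy.get("retry_delay")
    if delay is not None:
        # Prefect 3 expects a number of seconds rather than a timedelta.
        converted["retry_delay_seconds"] = (
            delay.total_seconds() if isinstance(delay, timedelta) else delay
        )
    return converted
```

The result could then be unpacked into the decorator, e.g. `@task(**to_prefect3_retry_kwargs({...}))`.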
Deployment Infrastructure
.github/scripts/deploy_flows.py, .github/workflows/build-docker-prefect3.yaml, .github/workflows/cd-prefect3.yaml, .github/workflows/cd-prefect3-staging.yaml, scripts/deploy_dbt_model_flow.py
Dynamic flow discovery and deployment script (deploy_flows.py) loads Python modules, extracts Prefect Flow objects, deploys via flow.from_source().deploy() with branch/pool selection and schedule handling. Docker build workflow computes image tags, pushes to GCR, updates Prefect 3 work pool config. Prod/staging CD workflows trigger on pipeline changes, invoking deploy script with API credentials.
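A minimal sketch of file-based flow discovery with `importlib`, assuming flows are bound to public module-level names. `load_module` and `discover_flows` are hypothetical, and the `is_flow` predicate stands in for what would presumably be `lambda obj: isinstance(obj, prefect.Flow)` in the real script. The repository root would also have to be on `sys.path` for the loaded module's own `pipelines.*` imports to resolve.

```python
import importlib.util
import re
import sys
from pathlib import Path
from types import ModuleType
from typing import Any, Callable, List, Tuple, Union


def load_module(path: Path) -> ModuleType:
    """Import a file as a module without it needing to be on sys.path."""
    module_name = "deploy_" + re.sub(r"\W", "_", str(path.resolve().with_suffix("")))
    spec = importlib.util.spec_from_file_location(module_name, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot build an import spec for {path}")
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module  # lets code in the module look itself up
    try:
        spec.loader.exec_module(module)
    except BaseException:
        sys.modules.pop(module_name, None)  # do not leave a half-initialised module behind
        raise
    return module


def discover_flows(path: Union[str, Path], is_flow: Callable[[Any], bool]) -> List[Tuple[str, Any]]:
    """Return (attribute name, object) pairs for public module-level flows."""
    module = load_module(Path(path))
    return [
        (name, obj)
        for name, obj in vars(module).items()
        if not name.startswith("_") and is_flow(obj)
    ]
```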
Flow Migrations - BCB Datasets
pipelines/crawler/bcb_estban/{tasks.py,utils.py}, pipelines/datasets/br_bcb_estban/{flows.py,schedules.py}, pipelines/datasets/br_bcb_taxa_selic/flows.py
bcb_estban tasks updated with Prefect 3 retry params, safer empty-metadata handling, typed return types; utils import source switched to crawler path. Flows converted to @flow factories with inline deploy_schedules (cron + timezone), shared _run_* orchestration helpers, Discord failure notification, and force-run parameter for bypassing outdated checks. Flow object names changed to br_bcb_estban__* pattern; schedules.py removed.
Flow Migrations - IBGE Inflation Flows
pipelines/crawler/ibge_inflacao/{flows.py,utils.py}, pipelines/datasets/br_ibge_ipca/flows.py, pipelines/datasets/br_ibge_inpc/flows.py
Shared _run_ibge_inflacao orchestration helper factored out for IBGE flows; factory pattern _ipca_flow(table_id, cron) and _inpc_flow(...) generate per-table flows with inline schedule deployment. Task invocation method changed from .run() to .fn() in utils callsites. Flow object names changed to br_ibge_*__* double-underscore pattern.
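The factory pattern can be sketched without Prefect as a function that derives the double-underscore name from `dataset_id` and `table_id` and carries its own cron schedule. `ibge_table_flow`, `TableFlowSpec` and the `America/Sao_Paulo` default are hypothetical; in the PR the inner function would presumably be wrapped with Prefect's `flow(name=...)` decorator instead of having `__name__` set by hand.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class TableFlowSpec:
    name: str
    cron: str
    timezone: str
    run: Callable[..., str]


def ibge_table_flow(
    dataset_id: str, table_id: str, cron: str, timezone: str = "America/Sao_Paulo"
) -> TableFlowSpec:
    """Hypothetical factory: one flow per table, named <dataset_id>__<table_id>."""
    if len(cron.split()) != 5:
        raise ValueError(f"expected a 5-field cron expression, got {cron!r}")
    name = f"{dataset_id}__{table_id}"

    def run(force_run: bool = False) -> str:
        # Placeholder body: the real flow would call the shared _run_* helper here.
        return f"{name} force_run={force_run}"

    run.__name__ = name
    return TableFlowSpec(name=name, cron=cron, timezone=timezone, run=run)
```

Each generated object still has to be bound to its own module-level name (for example `br_ibge_ipca__mes_brasil = ibge_table_flow("br_ibge_ipca", "mes_brasil", ...)`, with a hypothetical table id) so that a discovery step scanning module attributes can find it.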
Flow Migrations - Câmara and Test Flows
pipelines/datasets/br_camara_dados_abertos/flows.py, pipelines/test_prefect3.py
Câmara flow migrated for deputado table only (other tables removed from executable code, preserved as comments); implements URL check, download/delimiter conversion, dev/prod upload, dbt execution tasks within a single @flow. Simple test flow demonstrates Prefect 3 @task/@flow syntax.
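The delimiter conversion step can be sketched with the standard `csv` module. The assumption that the source files are `;`-delimited UTF-8 is illustrative, as is the `convert_delimiter` name.

```python
import csv
from pathlib import Path
from typing import Union


def convert_delimiter(
    src: Union[str, Path],
    dst: Union[str, Path],
    src_delimiter: str = ";",
    dst_delimiter: str = ",",
    encoding: str = "utf-8",
) -> int:
    """Stream rows from src to dst, changing only the field delimiter.

    Using the csv module (rather than str.replace) keeps quoted fields that
    contain the new delimiter intact. Returns the number of rows written.
    """
    rows = 0
    # newline="" is what the csv module docs require for correct quote handling.
    with open(src, newline="", encoding=encoding) as fin, open(
        dst, "w", newline="", encoding=encoding
    ) as fout:
        writer = csv.writer(fout, delimiter=dst_delimiter)
        for row in csv.reader(fin, delimiter=src_delimiter):
            writer.writerow(row)
            rows += 1
    return rows
```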
Migration Documentation
Prefect 3 Guia de Migração de Flows.md, Prefect 3 Flows Migrados.md
Comprehensive guide covering utils refactoring, Prefect 0→3 construct mapping, canonical flow template, factory patterns, JWT/GraphQL metadata authentication, deployment commands, known pitfalls (task.fn vs task.run, import issues, env-var stalls). Tracking document lists migration criteria, flow status (deployed/tested dev/prod), and prioritized next-to-migrate list.

Sequence Diagrams

sequenceDiagram
  participant dev as Developer
  participant gh as GitHub
  participant deploy as deploy_flows.py
  participant prefect as Prefect API
  participant gcs as GCS/Storage

  gh->>gh: Push to main or feat/prefect3
  gh->>deploy: Trigger cd-prefect3 workflow
  deploy->>deploy: load_flows_from_file(path)
  deploy->>prefect: flow.from_source(repo, branch).deploy(pool_name, schedule)
  prefect->>gcs: Store flow definition
  prefect-->>deploy: deployment_id
  deploy-->>gh: Success/fail exit status
sequenceDiagram
  participant Flow as Prefect Flow
  participant Task as run_dbt task
  participant dbtRunner as dbtRunner
  participant GCS as GCS Artifacts

  Flow->>Task: run_dbt(dataset_id, model)
  Task->>Task: Validate dbt_command
  Task->>Task: Resolve model path
  Task->>dbtRunner: invoke(run_results_path, ...)
  dbtRunner-->>Task: InvocationResult
  Task->>GCS: Upload dbt artifacts (finally block)
  Task-->>Flow: Success or raise exception
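The control flow in the second diagram (validate, invoke, upload in `finally`) can be sketched with the dbt invocation injected as a callable. `run_dbt_sketch` and `ALLOWED_COMMANDS` are hypothetical; with dbt-core 1.5+ a real `invoke` could be `lambda args: dbtRunner().invoke(args).success`, using `from dbt.cli.main import dbtRunner`.

```python
from typing import Callable, List

# Hypothetical allow-list; "run/test" means "run, then test".
ALLOWED_COMMANDS = {"run", "test", "run/test"}


def run_dbt_sketch(
    select: str,
    invoke: Callable[[List[str]], bool],
    upload_artifacts: Callable[[str], None],
    dbt_command: str = "run/test",
) -> None:
    # Validate before doing any work, so a typo fails fast and uploads nothing.
    if dbt_command not in ALLOWED_COMMANDS:
        raise ValueError(f"unsupported dbt_command: {dbt_command!r}")
    try:
        for step in dbt_command.split("/"):
            args = [step, "--select", select]
            if not invoke(args):
                raise RuntimeError(f"dbt {' '.join(args)} failed")
    finally:
        # Runs whether dbt succeeded or failed, so artifacts from a failed
        # run can still be inspected.
        upload_artifacts(select)
```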

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related issues

Suggested labels

check-metadata


🐰 Prefect 3 hops into the frame,
With tasks and flows in modern shape!
Utilities split, pipelines bright,
From zero-point-x to version-three-height! 🚀

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (2 warnings)

Check name Status Explanation Resolution
Description check ⚠️ Warning The PR description is incomplete and does not follow the required template structure. It lacks essential sections: naming with bracketed keywords, motivation/context, technical details, tests/validations, risks/mitigations, and dependencies. Add the required template structure: use [Feature] or another appropriate keyword in brackets, explain motivation/context, detail technical changes, specify testing status, identify risks and rollback plans, and list dependencies.
Docstring Coverage ⚠️ Warning Docstring coverage is 28.77% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The PR title 'Feat/prefect3 flows migration' clearly summarizes the main change: migrating Prefect flows from version 0.15 to 3.0. It is concise, specific, and directly related to the primary objective of the changeset.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.


@folhesgabriel folhesgabriel marked this pull request as draft May 21, 2026 17:48
mergify Bot (Contributor) commented May 21, 2026

@folhesgabriel this pull request has conflicts 😩

@mergify mergify Bot added the conflict ("[PR] Merge conflict to resolve") label May 21, 2026
@folhesgabriel folhesgabriel changed the base branch from main to feat/prefect3 May 21, 2026 17:48
folhesgabriel and others added 21 commits May 21, 2026 15:28
- Refactor br_inmet_bdmep__microdados flow with canonical Prefect 3 template
  (force_run guard, on_failure Discord hook, deploy_schedules cron)
- Move tasks/utils/constants from pipelines/datasets/br_inmet_bdmep/ to
  pipelines/crawler/inmet_bdmep/ to avoid pipelines.datasets.__init__ poisoning
- Drop legacy schedules.py (replaced by flow.deploy_schedules)
- Add README to pipelines/crawler/ibge_inflacao explaining the IBGE SIDRA API
  request cap (100k values) and why force_run=false is required for inflation
  flows
- Update prefect3-flows-migrados.md: mark IPCA/INPC flows as Dev OK (validated
  via 4a only); add br_inmet_bdmep__microdados as Pronto p/ deploy (ready to deploy)
- Refactor to factory pattern over 4 tables (microdados, densidade_municipio,
  densidade_brasil, densidade_uf).
- Shared _run_anatel_banda_larga_fixa in pipelines/crawler/anatel/banda_larga_fixa/flows.py.
- Update tracking and runbook (dbt failures after staging upload are
  non-blocking for migration validation).
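One way to stay under the SIDRA cap mentioned in the commits above is to group periods so that each request returns at most 100,000 values. This batching helper is a hypothetical illustration, not the approach the README describes; `values_per_period` (for example, locations × variables) is an assumption about how a request's size would be estimated.

```python
from typing import List, Sequence

SIDRA_MAX_VALUES = 100_000  # per-request cap noted in the crawler README


def batch_periods(
    periods: Sequence[str], values_per_period: int, max_values: int = SIDRA_MAX_VALUES
) -> List[List[str]]:
    """Group periods so each request returns at most max_values values."""
    if values_per_period <= 0:
        raise ValueError("values_per_period must be positive")
    if values_per_period > max_values:
        raise ValueError("a single period already exceeds the per-request cap")
    size = max_values // values_per_period  # periods that fit in one request
    return [list(periods[i : i + size]) for i in range(0, len(periods), size)]
```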
folhesgabriel and others added 29 commits May 25, 2026 08:21
…age to avoid 403 on requester-pays user_project