Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 60 additions & 79 deletions docs/arquitetura/fluxo-de-dados.md
Original file line number Diff line number Diff line change
@@ -1,48 +1,53 @@
# Fluxo de Dados

## Pipeline Diário
## Pipeline

O pipeline de dados é executado diariamente às **4AM UTC** (1AM Brasília) via GitHub Actions.
O pipeline de dados é composto por dois estágios independentes:

1. **Scraping** (repo `scraper`): Via Airflow DAGs, a cada 15 minutos
2. **Enrichment** (repo `data-platform`): Via GitHub Actions, diário às 4AM UTC

### Diagrama de Sequência Completo

```mermaid
sequenceDiagram
participant GH as GitHub Actions
participant SC as Scraper Container
participant AF as Airflow DAGs
participant API as Scraper API (Cloud Run)
participant GOV as Sites gov.br
participant EBC as Sites EBC
participant PG as PostgreSQL
participant GH as GitHub Actions
participant CF as Cogfy API
participant EMB as Embeddings API
participant TS as Typesense
participant AF as Airflow (6AM UTC)
participant AF2 as Airflow (6AM UTC)
participant HF as HuggingFace

rect rgb(227, 242, 253)
Note over GH,SC: ETAPA 1: Scraping gov.br
GH->>SC: Trigger main-workflow (4AM UTC)
SC->>GOV: Requisições HTTP (160+ sites)
GOV-->>SC: HTML das páginas
SC->>SC: Parse HTML → Markdown
SC->>SC: Gera unique_id (MD5)
SC->>PG: postgres.insert(new_articles)
Note over AF,API: ETAPA 1: Scraping (a cada 15min, via Airflow)
AF->>API: POST /scrape/agencies
API->>GOV: Requisições HTTP (~155 sites)
GOV-->>API: HTML das páginas
API->>API: Parse HTML → Markdown
API->>API: Gera unique_id (MD5)
API->>PG: postgres.insert(new_articles)
end

rect rgb(255, 243, 224)
Note over SC,EBC: ETAPA 2: Scraping EBC
SC->>EBC: Requisições HTTP (sites EBC)
EBC-->>SC: HTML das páginas
SC->>SC: Parse especializado EBC
SC->>PG: postgres.insert(ebc_articles, allow_update=True)
Note over AF,EBC: ETAPA 2: Scraping EBC (a cada 15min)
AF->>API: POST /scrape/ebc
API->>EBC: Requisições HTTP (sites EBC)
EBC-->>API: HTML das páginas
API->>API: Parse especializado EBC
API->>PG: postgres.insert(ebc_articles, allow_update=True)
end

rect rgb(255, 253, 231)
Note over SC,CF: ETAPA 3: Upload para Cogfy
SC->>PG: get_news(date_range)
PG-->>SC: Registros
SC->>CF: POST /records (batch 1000)
CF-->>SC: IDs dos registros
Note over GH,CF: ETAPA 3: Upload para Cogfy (diário 4AM UTC)
GH->>PG: get_news(date_range)
PG-->>GH: Registros
GH->>CF: POST /records (batch 1000)
CF-->>GH: IDs dos registros
end

rect rgb(232, 245, 233)
Expand All @@ -53,79 +58,58 @@ sequenceDiagram
end

rect rgb(252, 228, 236)
Note over SC,PG: ETAPA 5: Enriquecimento
SC->>CF: GET /records (busca por unique_id)
CF-->>SC: themes + summary
SC->>SC: Mapeia códigos → labels
SC->>SC: Calcula most_specific_theme
SC->>PG: postgres.update(enriched_data)
Note over GH,PG: ETAPA 5: Enriquecimento
GH->>CF: GET /records (busca por unique_id)
CF-->>GH: themes + summary
GH->>GH: Mapeia códigos → labels
GH->>GH: Calcula most_specific_theme
GH->>PG: postgres.update(enriched_data)
end

rect rgb(255, 248, 225)
Note over SC,EMB: ETAPA 6: Embeddings
SC->>PG: get_news_without_embeddings()
PG-->>SC: Notícias sem vetores
SC->>EMB: POST /embed (batch 100)
EMB-->>SC: Vetores 768-dim
SC->>PG: postgres.update(embeddings)
Note over GH,EMB: ETAPA 6: Embeddings
GH->>PG: get_news_without_embeddings()
PG-->>GH: Notícias sem vetores
GH->>EMB: POST /embed (batch 100)
EMB-->>GH: Vetores 768-dim
GH->>PG: postgres.update(embeddings)
end

rect rgb(243, 229, 245)
Note over TS,PG: ETAPA 7: Indexação Typesense
GH->>TS: Trigger typesense-sync
TS->>PG: iter_news_for_typesense()
PG-->>TS: Batches de 5000
TS->>TS: Upsert documentos
GH->>PG: iter_news_for_typesense()
PG-->>GH: Batches de 5000
GH->>TS: Upsert documentos
end

rect rgb(225, 245, 254)
Note over AF,HF: ETAPA 8: Sync HuggingFace
AF->>PG: Query novos registros
PG-->>AF: Registros do dia anterior
AF->>AF: Cria parquet shard
AF->>HF: Upload shard
Note over AF2,HF: ETAPA 8: Sync HuggingFace
AF2->>PG: Query novos registros
PG-->>AF2: Registros do dia anterior
AF2->>AF2: Cria parquet shard
AF2->>HF: Upload shard
end
```

## Etapas Detalhadas

### Etapa 1: Scraping gov.br

**Workflow**: `main-workflow.yaml` → job `scraper`
**Repo**: `scraper` — via Airflow DAGs (a cada 15 min)

```bash
data-platform scrape --start-date YYYY-MM-DD --end-date YYYY-MM-DD
```
- ~158 DAGs dinâmicas chamam `POST /scrape/agencies` na Scraper API (Cloud Run)
- Cada agência é raspada independentemente
- Parse HTML → Markdown, gera `unique_id = MD5(agency + published_at + title)`
- Insert direto no PostgreSQL

**Processo**:

1. Carrega URLs de `src/data_platform/scrapers/site_urls.yaml` (~160+ URLs)
2. Para cada URL, instancia `WebScraper`
3. Navega por páginas com paginação (`?b_start:int=N`)
4. Extrai campos: title, date, url, image, category, tags
5. Faz fetch do conteúdo completo de cada notícia
6. Converte HTML → Markdown com `markdownify`
7. Gera `unique_id = MD5(agency + published_at + title)`
8. Insere no PostgreSQL via `PostgresManager.insert()`

**Retry Logic**:
```python
@retry(tries=5, delay=2, backoff=3, jitter=(1,3))
def fetch_page(url): ...
```
→ Veja [Módulo Scraper](../modulos/scraper.md) para detalhes.

### Etapa 2: Scraping EBC

**Workflow**: `main-workflow.yaml` → job `ebc-scraper`

```bash
data-platform scrape-ebc --start-date YYYY-MM-DD --end-date YYYY-MM-DD --allow-update
```

**Diferenças**:
**Repo**: `scraper` — via Airflow DAG `scrape_ebc`

- DAG chama `POST /scrape/ebc` na Scraper API
- Scraper especializado (`EBCWebScraper`)
- Estrutura HTML diferente dos sites gov.br
- `allow_update=True` permite sobrescrever registros existentes

### Etapa 3: Upload para Cogfy
Expand Down Expand Up @@ -303,15 +287,12 @@ data-platform sync-typesense --start-date YYYY-MM-DD

## Execução Manual

### Scraping de período específico
### Enrichment de período específico
```bash
# Via CLI
data-platform scrape --start-date 2024-01-01 --end-date 2024-01-31

# Via GitHub Actions
gh workflow run main-workflow.yaml \
-f start-date=2024-01-01 \
-f end-date=2024-01-31
# Via GitHub Actions (enrichment pipeline)
gh workflow run main-workflow.yaml -R destaquesgovbr/data-platform \
-f start_date=2024-01-01 \
-f end_date=2024-01-31
```

### Enriquecimento manual
Expand Down
78 changes: 12 additions & 66 deletions docs/modulos/data-platform.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ Repositório centralizado para toda a infraestrutura de dados do DestaquesGovBr.

## Visão Geral

O **data-platform** unifica toda a lógica de dados que anteriormente estava distribuída em múltiplos repositórios (`scraper`, `typesense`). Ele é responsável por:
O **data-platform** é responsável por enriquecimento, embeddings e armazenamento de dados do DestaquesGovBr. A coleta (scraping) é feita pelo repo standalone [scraper](https://github.com/destaquesgovbr/scraper).

- **Coleta**: Scrapers para gov.br e EBC
- **Armazenamento**: Gerenciamento do PostgreSQL (fonte de verdade) e HuggingFace (distribuição)
- **Enriquecimento**: Integração com Cogfy para classificação temática e sumários
- **Embeddings**: Geração de vetores para busca semântica
Expand All @@ -19,27 +18,27 @@ O **data-platform** unifica toda a lógica de dados que anteriormente estava dis

```mermaid
graph TB
subgraph "Coleta"
S1[Scraper Gov.br]
S2[Scraper EBC]
subgraph "Coleta (repo scraper)"
S[Scraper API<br/>Cloud Run]
DAG_S[DAGs Airflow<br/>~158 agências + EBC]
end

subgraph "Armazenamento"
PG[(PostgreSQL<br/>Fonte de Verdade)]
HF[(HuggingFace<br/>Dados Abertos)]
end

subgraph "Processamento"
subgraph "Processamento (data-platform)"
COG[Cogfy<br/>Enriquecimento]
EMB[Embeddings API<br/>Vetores 768-dim]
end

subgraph "Indexação"
subgraph "Indexação (data-platform)"
TS[Typesense<br/>Busca]
end

S1 --> PG
S2 --> PG
DAG_S -->|HTTP POST| S
S -->|INSERT| PG
PG --> COG
COG --> PG
PG --> EMB
Expand All @@ -56,10 +55,7 @@ data-platform/
│ ├── managers/ # Gerenciadores de storage
│ │ ├── postgres_manager.py # Acesso ao PostgreSQL
│ │ ├── dataset_manager.py # Acesso ao HuggingFace
│ │ └── storage_adapter.py # Abstração dual-write
│ ├── scrapers/ # Scrapers de notícias
│ │ ├── scrape_manager.py # Gov.br
│ │ └── ebc_scrape_manager.py
│ │ └── storage_adapter.py
│ ├── cogfy/ # Integração Cogfy
│ │ ├── cogfy_manager.py
│ │ ├── upload_manager.py
Expand All @@ -69,8 +65,9 @@ data-platform/
│ │ ├── collection.py
│ │ └── indexer.py
│ ├── jobs/ # Jobs de processamento
│ │ ├── enrichment/
│ │ ├── embeddings/
│ │ ├── typesense/sync_job.py
│ │ ├── embeddings/embedding_generator.py
│ │ └── hf_sync/
│ ├── models/ # Modelos Pydantic
│ │ └── news.py
Expand All @@ -84,16 +81,6 @@ data-platform/

## CLI - Comandos Disponíveis

### Scraping

```bash
# Raspar sites gov.br
data-platform scrape --start-date 2025-01-01 --end-date 2025-01-31

# Raspar sites EBC (Agência Brasil, TV Brasil)
data-platform scrape-ebc --start-date 2025-01-01
```

### Enriquecimento

```bash
Expand Down Expand Up @@ -134,43 +121,6 @@ data-platform typesense-delete --confirm
data-platform sync-hf --start-date 2025-01-01
```

## Storage Adapter

O **StorageAdapter** é a camada de abstração que permite transição gradual entre backends de armazenamento.

### Modos de Operação

| Modo | Descrição |
|------|-----------|
| `POSTGRES` | Escreve apenas no PostgreSQL |
| `HUGGINGFACE` | Escreve apenas no HuggingFace (legado) |
| `DUAL_WRITE` | Escreve em ambos para transição segura |

### Configuração

```bash
# Variáveis de ambiente
STORAGE_BACKEND=postgres # postgres | huggingface | dual_write
STORAGE_READ_FROM=postgres # De onde ler os dados
```

### Uso no Código

```python
from data_platform.managers.storage_adapter import StorageAdapter, StorageBackend

adapter = StorageAdapter(
backend=StorageBackend.DUAL_WRITE,
read_from=StorageBackend.POSTGRES
)

# Insert transparente - escreve em ambos
inserted = adapter.insert(news_list, allow_update=False)

# Read - sempre do backend configurado em read_from
news = adapter.get(filters={"agency_key": "mec"}, limit=100)
```

## PostgresManager

Gerenciador de acesso ao PostgreSQL com connection pooling e cache.
Expand Down Expand Up @@ -226,17 +176,13 @@ COGFY_COLLECTION_ID=xxx
# Embeddings
EMBEDDINGS_API_URL=https://embeddings-api-xxx.run.app
EMBEDDINGS_API_KEY=xxx

# Storage
STORAGE_BACKEND=postgres
STORAGE_READ_FROM=postgres
```

## Workflows GitHub Actions

| Workflow | Trigger | Descrição |
|----------|---------|-----------|
| `main-workflow.yaml` | Diário (4AM UTC) | Pipeline completo: scrape → enrich → embed → sync |
| `main-workflow.yaml` | Diário (4AM UTC) | Pipeline de enrichment: upload-cogfy → enrich → embed → sync |
| `typesense-maintenance-sync.yaml` | Diário (10AM UTC) | Sync incremental Typesense |
| `composer-deploy-dags.yaml` | Push | Deploy de DAGs no Airflow |

Expand Down
Loading