-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
92 lines (74 loc) · 3.1 KB
/
main.py
File metadata and controls
92 lines (74 loc) · 3.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import datetime
from typing import Annotated, Optional

import pandas as pd
from fastapi import Depends, FastAPI, HTTPException, Query, File, UploadFile
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Company(SQLModel, table=True):
    """A company row imported from a CVM open-data CSV upload.

    One row per CNPJ; `upload_companies` inserts new CNPJs and refreshes
    existing ones, stamping `updated_at` with the import time.
    """

    # Optional so new instances can be created with id=None; the database
    # assigns the primary key on INSERT. (The original annotation `int`
    # contradicted `default=None` — None is not an int.)
    id: Optional[int] = Field(default=None, primary_key=True)
    cnpj: str = Field(nullable=False)
    denom_social: str = Field(nullable=False)
    sit: str = Field(nullable=False)
    # Naive UTC timestamp of the last insert/refresh of this row.
    updated_at: datetime.datetime = Field(default_factory=datetime.datetime.utcnow, nullable=False)
# SQLite database configuration. check_same_thread=False is needed because
# FastAPI may run a request (and hence use the connection) on a different
# thread than the one that created it.
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, connect_args=connect_args)
def create_db_and_tables():
    """Create every table registered on SQLModel.metadata in the database."""
    SQLModel.metadata.create_all(bind=engine)
def get_session():
    """Yield a per-request database session, closed when the request ends."""
    with Session(engine) as db_session:
        yield db_session
# Dependency alias: endpoints declaring `session: SessionDep` receive a
# per-request Session from get_session().
SessionDep = Annotated[Session, Depends(get_session)]
app = FastAPI()
# NOTE(review): @app.on_event is deprecated in current FastAPI in favour of
# lifespan handlers; kept as-is here since switching would require changing
# how `app` is constructed.
@app.on_event("startup")
def on_startup():
    # Ensure all tables exist before the first request is served.
    create_db_and_tables()
@app.get("/")
def read_root():
    """Health-check endpoint: confirms the API is up."""
    payload = {"message": "API está funcionando"}
    return payload
@app.get("/companies/")
def read_companies(
    session: SessionDep,
    date: datetime.datetime,
    offset: int = 0,
    limit: Annotated[int, Query(le=100)] = 100
) -> list[Company]:
    """List companies whose `updated_at` equals `date`, paginated.

    NOTE(review): this filters on exact datetime equality — it only matches
    rows stamped with precisely this timestamp; presumably callers pass the
    value a previous upload stored. Confirm whether a date-range match was
    intended.
    """
    statement = (
        select(Company)
        .where(Company.updated_at == date)
        .offset(offset)
        .limit(limit)
    )
    results = session.exec(statement).all()
    return results
@app.post("/companies/upload/")
async def upload_companies(session: SessionDep, file: UploadFile = File(...)) -> str:
    """Import companies from an uploaded ';'-separated, latin-1 CSV.

    Expects the CVM open-data columns CNPJ_CIA, DENOM_SOCIAL and SIT.
    Rows whose CNPJ already exists are updated in place; new CNPJs are
    inserted. Returns a success message on completion.

    Raises:
        HTTPException(400): if the file is empty, cannot be parsed, or is
            missing one of the required columns.
    """
    try:
        # on_bad_lines='skip': ignore malformed rows instead of failing the
        # whole upload.
        df = pd.read_csv(file.file, sep=";", encoding='latin1', on_bad_lines='skip')
    except pd.errors.EmptyDataError:
        raise HTTPException(status_code=400, detail="O arquivo está vazio.")
    except pd.errors.ParserError as e:
        raise HTTPException(status_code=400, detail=f"Erro de parsing do arquivo CSV: {str(e)}")
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Erro ao ler o arquivo CSV: {str(e)}")

    # Validate the expected schema before touching the database.
    required_columns = ['CNPJ_CIA', 'DENOM_SOCIAL', 'SIT']
    if not all(col in df.columns for col in required_columns):
        raise HTTPException(status_code=400, detail="Colunas obrigatórias não encontradas no CSV.")

    # Keep only the needed columns, renamed to the model's field names.
    filtered_df = df[required_columns].rename(columns={
        'CNPJ_CIA': 'cnpj',
        'DENOM_SOCIAL': 'denom_social',
        'SIT': 'sit'
    })

    # Full datetime (not .date()) so updated_at matches the model's datetime
    # column and the default_factory; hoisted so every row of this upload
    # shares one timestamp.
    now = datetime.datetime.utcnow()
    for _, row in filtered_df.iterrows():
        existing_company = session.exec(
            select(Company).where(Company.cnpj == row['cnpj'])
        ).first()
        if existing_company is None:
            session.add(Company(
                cnpj=row['cnpj'],
                denom_social=row['denom_social'],
                sit=row['sit'],
                updated_at=now,
            ))
        else:
            # BUG FIX: the original called session.merge() on a transient
            # object whose id was None, which INSERTs a duplicate row instead
            # of updating the existing one. Mutate the loaded row instead.
            existing_company.denom_social = row['denom_social']
            existing_company.sit = row['sit']
            existing_company.updated_at = now
            session.add(existing_company)
    # Single commit so the whole upload is applied atomically.
    session.commit()
    return "Arquivo carregado com sucesso"