Adiciona script de auditoria do ranking
This commit is contained in:
212
backend/scripts/auditar_ranking.py
Normal file
212
backend/scripts/auditar_ranking.py
Normal file
@@ -0,0 +1,212 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Auditoria do ranking: recalcula A/B/C/D a partir das fontes (ES + Oracle PPG)
|
||||
para uma amostra e compara com a TB_RANKING_CONSULTOR.
|
||||
|
||||
Saída:
|
||||
- Exibe divergências em tela
|
||||
- Gera CSV em backend/logs/auditoria_ranking.csv
|
||||
"""
|
||||
import asyncio
|
||||
import csv
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Tuple
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# Ajusta PATH para importar os módulos do backend
|
||||
ROOT_DIR = Path(__file__).resolve().parents[1]
|
||||
load_dotenv(ROOT_DIR / ".env")
|
||||
import sys
|
||||
sys.path.insert(0, str(ROOT_DIR))
|
||||
|
||||
from src.infrastructure.elasticsearch.client import ElasticsearchClient # noqa: E402
|
||||
from src.infrastructure.oracle.client import OracleClient # noqa: E402
|
||||
from src.infrastructure.repositories.consultor_repository_impl import ConsultorRepositoryImpl # noqa: E402
|
||||
|
||||
|
||||
def _conectar_oracle_local() -> OracleClient:
|
||||
client = OracleClient(
|
||||
user=os.getenv("ORACLE_LOCAL_USER", "local123"),
|
||||
password=os.getenv("ORACLE_LOCAL_PASSWORD", "local123"),
|
||||
dsn=os.getenv("ORACLE_LOCAL_DSN", "oracle18c:1521/XEPDB1"),
|
||||
)
|
||||
client.connect()
|
||||
return client
|
||||
|
||||
|
||||
def _conectar_oracle_remoto() -> OracleClient:
|
||||
client = OracleClient(
|
||||
user=os.getenv("ORACLE_REMOTE_USER"),
|
||||
password=os.getenv("ORACLE_REMOTE_PASSWORD"),
|
||||
dsn=os.getenv("ORACLE_REMOTE_DSN"),
|
||||
)
|
||||
client.connect()
|
||||
return client
|
||||
|
||||
|
||||
def _amostra_ids(oracle_local: OracleClient) -> List[Tuple[int, int]]:
|
||||
"""
|
||||
Retorna uma amostra de IDs: top 20, meio (100k..) e cauda (últimos 20).
|
||||
"""
|
||||
amostra: List[Tuple[int, int]] = []
|
||||
consultas = [
|
||||
"SELECT ID_PESSOA, POSICAO FROM TB_RANKING_CONSULTOR WHERE POSICAO <= 20",
|
||||
"""
|
||||
SELECT ID_PESSOA, POSICAO FROM (
|
||||
SELECT ID_PESSOA, POSICAO, ROW_NUMBER() OVER (ORDER BY POSICAO) RN
|
||||
FROM TB_RANKING_CONSULTOR
|
||||
) WHERE RN BETWEEN 100000 AND 100020
|
||||
""",
|
||||
"""
|
||||
SELECT ID_PESSOA, POSICAO FROM (
|
||||
SELECT ID_PESSOA, POSICAO, ROW_NUMBER() OVER (ORDER BY POSICAO DESC) RN
|
||||
FROM TB_RANKING_CONSULTOR
|
||||
) WHERE RN <= 20
|
||||
""",
|
||||
]
|
||||
for q in consultas:
|
||||
for r in oracle_local.executar_query(q):
|
||||
amostra.append((int(r["ID_PESSOA"]), int(r["POSICAO"] or 0)))
|
||||
|
||||
# Remove duplicados mantendo ordem
|
||||
vistos = set()
|
||||
final = []
|
||||
for item in amostra:
|
||||
if item[0] not in vistos:
|
||||
vistos.add(item[0])
|
||||
final.append(item)
|
||||
return final
|
||||
|
||||
|
||||
def _mapa_ranking_db(oracle_local: OracleClient, ids: List[int]) -> Dict[int, dict]:
|
||||
if not ids:
|
||||
return {}
|
||||
placeholders = ", ".join(f":id{i}" for i in range(len(ids)))
|
||||
params = {f"id{i}": val for i, val in enumerate(ids)}
|
||||
q = f"""
|
||||
SELECT ID_PESSOA, POSICAO, PONTUACAO_TOTAL, COMPONENTE_A, COMPONENTE_B, COMPONENTE_C, COMPONENTE_D
|
||||
FROM TB_RANKING_CONSULTOR
|
||||
WHERE ID_PESSOA IN ({placeholders})
|
||||
"""
|
||||
mapa = {}
|
||||
for r in oracle_local.executar_query(q, params):
|
||||
mapa[int(r["ID_PESSOA"])] = {
|
||||
"posicao": int(r["POSICAO"] or 0) if r["POSICAO"] is not None else None,
|
||||
"total": float(r["PONTUACAO_TOTAL"] or 0),
|
||||
"a": float(r["COMPONENTE_A"] or 0),
|
||||
"b": float(r["COMPONENTE_B"] or 0),
|
||||
"c": float(r["COMPONENTE_C"] or 0),
|
||||
"d": float(r["COMPONENTE_D"] or 0),
|
||||
}
|
||||
return mapa
|
||||
|
||||
|
||||
async def auditar():
|
||||
oracle_local = _conectar_oracle_local()
|
||||
oracle_remoto = _conectar_oracle_remoto()
|
||||
|
||||
es_client = ElasticsearchClient(
|
||||
url=os.getenv("ES_URL", "http://localhost:9200"),
|
||||
index=os.getenv("ES_INDEX", "atuacapes"),
|
||||
user=os.getenv("ES_USER", ""),
|
||||
password=os.getenv("ES_PASSWORD", ""),
|
||||
)
|
||||
await es_client.connect()
|
||||
|
||||
repo = ConsultorRepositoryImpl(es_client=es_client, oracle_client=oracle_remoto)
|
||||
|
||||
ids_posicoes = _amostra_ids(oracle_local)
|
||||
ids = [i for i, _ in ids_posicoes]
|
||||
mapa_db = _mapa_ranking_db(oracle_local, ids)
|
||||
|
||||
resultados = []
|
||||
|
||||
for id_pessoa, posicao_db in ids_posicoes:
|
||||
try:
|
||||
consultor = await repo.buscar_por_id(id_pessoa)
|
||||
if not consultor:
|
||||
resultados.append({
|
||||
"id": id_pessoa,
|
||||
"pos_db": posicao_db,
|
||||
"pos_calc": None,
|
||||
"total_db": mapa_db.get(id_pessoa, {}).get("total", 0),
|
||||
"total_calc": None,
|
||||
"delta": None,
|
||||
"obs": "consultor não encontrado no ES"
|
||||
})
|
||||
continue
|
||||
|
||||
total_calc = consultor.pontuacao_total
|
||||
comp = consultor.pontuacao
|
||||
db_vals = mapa_db.get(id_pessoa, {})
|
||||
|
||||
resultados.append({
|
||||
"id": id_pessoa,
|
||||
"pos_db": posicao_db,
|
||||
"pos_calc": None, # posição calculada precisa de ordenação global; deixamos None
|
||||
"total_db": db_vals.get("total", 0),
|
||||
"total_calc": total_calc,
|
||||
"delta": round(total_calc - db_vals.get("total", 0), 2),
|
||||
"comp_a_db": db_vals.get("a", 0),
|
||||
"comp_a_calc": comp.componente_a.total,
|
||||
"comp_b_db": db_vals.get("b", 0),
|
||||
"comp_b_calc": comp.componente_b.total,
|
||||
"comp_c_db": db_vals.get("c", 0),
|
||||
"comp_c_calc": comp.componente_c.total,
|
||||
"comp_d_db": db_vals.get("d", 0),
|
||||
"comp_d_calc": comp.componente_d.total,
|
||||
"obs": "",
|
||||
})
|
||||
except Exception as e:
|
||||
resultados.append({
|
||||
"id": id_pessoa,
|
||||
"pos_db": posicao_db,
|
||||
"pos_calc": None,
|
||||
"total_db": mapa_db.get(id_pessoa, {}).get("total", 0),
|
||||
"total_calc": None,
|
||||
"delta": None,
|
||||
"obs": f"erro: {e}",
|
||||
})
|
||||
|
||||
# Exporta CSV
|
||||
logs_dir = ROOT_DIR / "logs"
|
||||
logs_dir.mkdir(exist_ok=True)
|
||||
csv_path = logs_dir / "auditoria_ranking.csv"
|
||||
campos = [
|
||||
"id", "pos_db", "pos_calc", "total_db", "total_calc", "delta",
|
||||
"comp_a_db", "comp_a_calc", "comp_b_db", "comp_b_calc",
|
||||
"comp_c_db", "comp_c_calc", "comp_d_db", "comp_d_calc", "obs"
|
||||
]
|
||||
with csv_path.open("w", newline="", encoding="utf-8") as f:
|
||||
writer = csv.DictWriter(f, fieldnames=campos)
|
||||
writer.writeheader()
|
||||
for r in resultados:
|
||||
writer.writerow(r)
|
||||
|
||||
# Mostra divergências relevantes
|
||||
divergentes = [r for r in resultados if r.get("delta") and abs(r["delta"]) > 0.01]
|
||||
print(f"Amostra auditada: {len(resultados)} IDs")
|
||||
print(f"Divergências (delta != 0): {len(divergentes)}")
|
||||
if divergentes:
|
||||
for r in divergentes[:10]:
|
||||
print(
|
||||
f"ID {r['id']} delta {r['delta']} | "
|
||||
f"A {r['comp_a_db']}->{r['comp_a_calc']} "
|
||||
f"B {r['comp_b_db']}->{r['comp_b_calc']} "
|
||||
f"C {r['comp_c_db']}->{r['comp_c_calc']} "
|
||||
f"D {r['comp_d_db']}->{r['comp_d_calc']} "
|
||||
f"{r.get('obs','')}"
|
||||
)
|
||||
if len(divergentes) > 10:
|
||||
print(f"... +{len(divergentes)-10} divergências (veja {csv_path})")
|
||||
else:
|
||||
print("Nenhuma divergência encontrada na amostra.")
|
||||
print(f"CSV salvo em: {csv_path}")
|
||||
|
||||
await es_client.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(auditar())
|
||||
Reference in New Issue
Block a user