#!/usr/bin/env python3 """ Auditoria do ranking: recalcula A/B/C/D a partir das fontes (ES + Oracle PPG) para uma amostra e compara com a TB_RANKING_CONSULTOR. Saída: - Exibe divergências em tela - Gera CSV em backend/logs/auditoria_ranking.csv """ import asyncio import csv import os from pathlib import Path from typing import Dict, List, Tuple from dotenv import load_dotenv # Ajusta PATH para importar os módulos do backend ROOT_DIR = Path(__file__).resolve().parents[1] load_dotenv(ROOT_DIR / ".env") import sys sys.path.insert(0, str(ROOT_DIR)) from src.infrastructure.elasticsearch.client import ElasticsearchClient # noqa: E402 from src.infrastructure.oracle.client import OracleClient # noqa: E402 from src.infrastructure.repositories.consultor_repository_impl import ConsultorRepositoryImpl # noqa: E402 def _conectar_oracle_local() -> OracleClient: client = OracleClient( user=os.getenv("ORACLE_LOCAL_USER", "local123"), password=os.getenv("ORACLE_LOCAL_PASSWORD", "local123"), dsn=os.getenv("ORACLE_LOCAL_DSN", "oracle18c:1521/XEPDB1"), ) client.connect() return client def _conectar_oracle_remoto() -> OracleClient: client = OracleClient( user=os.getenv("ORACLE_REMOTE_USER"), password=os.getenv("ORACLE_REMOTE_PASSWORD"), dsn=os.getenv("ORACLE_REMOTE_DSN"), ) client.connect() return client def _amostra_ids(oracle_local: OracleClient) -> List[Tuple[int, int]]: """ Retorna uma amostra de IDs: top 20, meio (100k..) e cauda (últimos 20). """ amostra: List[Tuple[int, int]] = [] consultas = [ "SELECT ID_PESSOA, POSICAO FROM TB_RANKING_CONSULTOR WHERE POSICAO <= 20", """ SELECT ID_PESSOA, POSICAO FROM ( SELECT ID_PESSOA, POSICAO, ROW_NUMBER() OVER (ORDER BY POSICAO) RN FROM TB_RANKING_CONSULTOR ) WHERE RN BETWEEN 100000 AND 100020 """, """ SELECT ID_PESSOA, POSICAO FROM ( SELECT ID_PESSOA, POSICAO, ROW_NUMBER() OVER (ORDER BY POSICAO DESC) RN FROM TB_RANKING_CONSULTOR ) WHERE RN <= 20 """, ] for q in consultas: for r in oracle_local.executar_query(q): amostra.append((int(r["ID_PESSOA"]), int(r["POSICAO"] or 0))) # Remove duplicados mantendo ordem vistos = set() final = [] for item in amostra: if item[0] not in vistos: vistos.add(item[0]) final.append(item) return final def _mapa_ranking_db(oracle_local: OracleClient, ids: List[int]) -> Dict[int, dict]: if not ids: return {} placeholders = ", ".join(f":id{i}" for i in range(len(ids))) params = {f"id{i}": val for i, val in enumerate(ids)} q = f""" SELECT ID_PESSOA, POSICAO, PONTUACAO_TOTAL, COMPONENTE_A, COMPONENTE_B, COMPONENTE_C, COMPONENTE_D FROM TB_RANKING_CONSULTOR WHERE ID_PESSOA IN ({placeholders}) """ mapa = {} for r in oracle_local.executar_query(q, params): mapa[int(r["ID_PESSOA"])] = { "posicao": int(r["POSICAO"] or 0) if r["POSICAO"] is not None else None, "total": float(r["PONTUACAO_TOTAL"] or 0), "a": float(r["COMPONENTE_A"] or 0), "b": float(r["COMPONENTE_B"] or 0), "c": float(r["COMPONENTE_C"] or 0), "d": float(r["COMPONENTE_D"] or 0), } return mapa async def auditar(): oracle_local = _conectar_oracle_local() oracle_remoto = _conectar_oracle_remoto() es_client = ElasticsearchClient( url=os.getenv("ES_URL", "http://localhost:9200"), index=os.getenv("ES_INDEX", "atuacapes"), user=os.getenv("ES_USER", ""), password=os.getenv("ES_PASSWORD", ""), ) await es_client.connect() repo = ConsultorRepositoryImpl(es_client=es_client, oracle_client=oracle_remoto) ids_posicoes = _amostra_ids(oracle_local) ids = [i for i, _ in ids_posicoes] mapa_db = _mapa_ranking_db(oracle_local, ids) resultados = [] for id_pessoa, posicao_db in ids_posicoes: try: consultor = await repo.buscar_por_id(id_pessoa) if not consultor: resultados.append({ "id": id_pessoa, "pos_db": posicao_db, "pos_calc": None, "total_db": mapa_db.get(id_pessoa, {}).get("total", 0), "total_calc": None, "delta": None, "obs": "consultor não encontrado no ES" }) continue total_calc = consultor.pontuacao_total comp = consultor.pontuacao db_vals = mapa_db.get(id_pessoa, {}) resultados.append({ "id": id_pessoa, "pos_db": posicao_db, "pos_calc": None, # posição calculada precisa de ordenação global; deixamos None "total_db": db_vals.get("total", 0), "total_calc": total_calc, "delta": round(total_calc - db_vals.get("total", 0), 2), "comp_a_db": db_vals.get("a", 0), "comp_a_calc": comp.componente_a.total, "comp_b_db": db_vals.get("b", 0), "comp_b_calc": comp.componente_b.total, "comp_c_db": db_vals.get("c", 0), "comp_c_calc": comp.componente_c.total, "comp_d_db": db_vals.get("d", 0), "comp_d_calc": comp.componente_d.total, "obs": "", }) except Exception as e: resultados.append({ "id": id_pessoa, "pos_db": posicao_db, "pos_calc": None, "total_db": mapa_db.get(id_pessoa, {}).get("total", 0), "total_calc": None, "delta": None, "obs": f"erro: {e}", }) # Exporta CSV logs_dir = ROOT_DIR / "logs" logs_dir.mkdir(exist_ok=True) csv_path = logs_dir / "auditoria_ranking.csv" campos = [ "id", "pos_db", "pos_calc", "total_db", "total_calc", "delta", "comp_a_db", "comp_a_calc", "comp_b_db", "comp_b_calc", "comp_c_db", "comp_c_calc", "comp_d_db", "comp_d_calc", "obs" ] with csv_path.open("w", newline="", encoding="utf-8") as f: writer = csv.DictWriter(f, fieldnames=campos) writer.writeheader() for r in resultados: writer.writerow(r) # Mostra divergências relevantes divergentes = [r for r in resultados if r.get("delta") and abs(r["delta"]) > 0.01] print(f"Amostra auditada: {len(resultados)} IDs") print(f"Divergências (delta != 0): {len(divergentes)}") if divergentes: for r in divergentes[:10]: print( f"ID {r['id']} delta {r['delta']} | " f"A {r['comp_a_db']}->{r['comp_a_calc']} " f"B {r['comp_b_db']}->{r['comp_b_calc']} " f"C {r['comp_c_db']}->{r['comp_c_calc']} " f"D {r['comp_d_db']}->{r['comp_d_calc']} " f"{r.get('obs','')}" ) if len(divergentes) > 10: print(f"... +{len(divergentes)-10} divergências (veja {csv_path})") else: print("Nenhuma divergência encontrada na amostra.") print(f"CSV salvo em: {csv_path}") await es_client.close() if __name__ == "__main__": asyncio.run(auditar())