diff --git a/backend/scripts/auditar_ranking.py b/backend/scripts/auditar_ranking.py new file mode 100644 index 0000000..fb9c904 --- /dev/null +++ b/backend/scripts/auditar_ranking.py @@ -0,0 +1,212 @@ +#!/usr/bin/env python3 +""" +Auditoria do ranking: recalcula A/B/C/D a partir das fontes (ES + Oracle PPG) +para uma amostra e compara com a TB_RANKING_CONSULTOR. + +Saída: +- Exibe divergências em tela +- Gera CSV em backend/logs/auditoria_ranking.csv +""" +import asyncio +import csv +import os +from pathlib import Path +from typing import Dict, List, Tuple + +from dotenv import load_dotenv + +# Ajusta PATH para importar os módulos do backend +ROOT_DIR = Path(__file__).resolve().parents[1] +load_dotenv(ROOT_DIR / ".env") +import sys +sys.path.insert(0, str(ROOT_DIR)) + +from src.infrastructure.elasticsearch.client import ElasticsearchClient # noqa: E402 +from src.infrastructure.oracle.client import OracleClient # noqa: E402 +from src.infrastructure.repositories.consultor_repository_impl import ConsultorRepositoryImpl # noqa: E402 + + +def _conectar_oracle_local() -> OracleClient: + client = OracleClient( + user=os.getenv("ORACLE_LOCAL_USER", "local123"), + password=os.getenv("ORACLE_LOCAL_PASSWORD", "local123"), + dsn=os.getenv("ORACLE_LOCAL_DSN", "oracle18c:1521/XEPDB1"), + ) + client.connect() + return client + + +def _conectar_oracle_remoto() -> OracleClient: + client = OracleClient( + user=os.getenv("ORACLE_REMOTE_USER"), + password=os.getenv("ORACLE_REMOTE_PASSWORD"), + dsn=os.getenv("ORACLE_REMOTE_DSN"), + ) + client.connect() + return client + + +def _amostra_ids(oracle_local: OracleClient) -> List[Tuple[int, int]]: + """ + Retorna uma amostra de IDs: top 20, meio (100k..) e cauda (últimos 20). + """ + amostra: List[Tuple[int, int]] = [] + consultas = [ + "SELECT ID_PESSOA, POSICAO FROM TB_RANKING_CONSULTOR WHERE POSICAO <= 20", + """ + SELECT ID_PESSOA, POSICAO FROM ( + SELECT ID_PESSOA, POSICAO, ROW_NUMBER() OVER (ORDER BY POSICAO) RN + FROM TB_RANKING_CONSULTOR + ) WHERE RN BETWEEN 100000 AND 100020 + """, + """ + SELECT ID_PESSOA, POSICAO FROM ( + SELECT ID_PESSOA, POSICAO, ROW_NUMBER() OVER (ORDER BY POSICAO DESC) RN + FROM TB_RANKING_CONSULTOR + ) WHERE RN <= 20 + """, + ] + for q in consultas: + for r in oracle_local.executar_query(q): + amostra.append((int(r["ID_PESSOA"]), int(r["POSICAO"] or 0))) + + # Remove duplicados mantendo ordem + vistos = set() + final = [] + for item in amostra: + if item[0] not in vistos: + vistos.add(item[0]) + final.append(item) + return final + + +def _mapa_ranking_db(oracle_local: OracleClient, ids: List[int]) -> Dict[int, dict]: + if not ids: + return {} + placeholders = ", ".join(f":id{i}" for i in range(len(ids))) + params = {f"id{i}": val for i, val in enumerate(ids)} + q = f""" + SELECT ID_PESSOA, POSICAO, PONTUACAO_TOTAL, COMPONENTE_A, COMPONENTE_B, COMPONENTE_C, COMPONENTE_D + FROM TB_RANKING_CONSULTOR + WHERE ID_PESSOA IN ({placeholders}) + """ + mapa = {} + for r in oracle_local.executar_query(q, params): + mapa[int(r["ID_PESSOA"])] = { + "posicao": int(r["POSICAO"] or 0) if r["POSICAO"] is not None else None, + "total": float(r["PONTUACAO_TOTAL"] or 0), + "a": float(r["COMPONENTE_A"] or 0), + "b": float(r["COMPONENTE_B"] or 0), + "c": float(r["COMPONENTE_C"] or 0), + "d": float(r["COMPONENTE_D"] or 0), + } + return mapa + + +async def auditar(): + oracle_local = _conectar_oracle_local() + oracle_remoto = _conectar_oracle_remoto() + + es_client = ElasticsearchClient( + url=os.getenv("ES_URL", "http://localhost:9200"), + index=os.getenv("ES_INDEX", "atuacapes"), + user=os.getenv("ES_USER", ""), + password=os.getenv("ES_PASSWORD", ""), + ) + await es_client.connect() + + repo = ConsultorRepositoryImpl(es_client=es_client, oracle_client=oracle_remoto) + + ids_posicoes = _amostra_ids(oracle_local) + ids = [i for i, _ in ids_posicoes] + mapa_db = _mapa_ranking_db(oracle_local, ids) + + resultados = [] + + for id_pessoa, posicao_db in ids_posicoes: + try: + consultor = await repo.buscar_por_id(id_pessoa) + if not consultor: + resultados.append({ + "id": id_pessoa, + "pos_db": posicao_db, + "pos_calc": None, + "total_db": mapa_db.get(id_pessoa, {}).get("total", 0), + "total_calc": None, + "delta": None, + "obs": "consultor não encontrado no ES" + }) + continue + + total_calc = consultor.pontuacao_total + comp = consultor.pontuacao + db_vals = mapa_db.get(id_pessoa, {}) + + resultados.append({ + "id": id_pessoa, + "pos_db": posicao_db, + "pos_calc": None, # posição calculada precisa de ordenação global; deixamos None + "total_db": db_vals.get("total", 0), + "total_calc": total_calc, + "delta": round(total_calc - db_vals.get("total", 0), 2), + "comp_a_db": db_vals.get("a", 0), + "comp_a_calc": comp.componente_a.total, + "comp_b_db": db_vals.get("b", 0), + "comp_b_calc": comp.componente_b.total, + "comp_c_db": db_vals.get("c", 0), + "comp_c_calc": comp.componente_c.total, + "comp_d_db": db_vals.get("d", 0), + "comp_d_calc": comp.componente_d.total, + "obs": "", + }) + except Exception as e: + resultados.append({ + "id": id_pessoa, + "pos_db": posicao_db, + "pos_calc": None, + "total_db": mapa_db.get(id_pessoa, {}).get("total", 0), + "total_calc": None, + "delta": None, + "obs": f"erro: {e}", + }) + + # Exporta CSV + logs_dir = ROOT_DIR / "logs" + logs_dir.mkdir(exist_ok=True) + csv_path = logs_dir / "auditoria_ranking.csv" + campos = [ + "id", "pos_db", "pos_calc", "total_db", "total_calc", "delta", + "comp_a_db", "comp_a_calc", "comp_b_db", "comp_b_calc", + "comp_c_db", "comp_c_calc", "comp_d_db", "comp_d_calc", "obs" + ] + with csv_path.open("w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=campos) + writer.writeheader() + for r in resultados: + writer.writerow(r) + + # Mostra divergências relevantes + divergentes = [r for r in resultados if r.get("delta") and abs(r["delta"]) > 0.01] + print(f"Amostra auditada: {len(resultados)} IDs") + print(f"Divergências (delta != 0): {len(divergentes)}") + if divergentes: + for r in divergentes[:10]: + print( + f"ID {r['id']} delta {r['delta']} | " + f"A {r['comp_a_db']}->{r['comp_a_calc']} " + f"B {r['comp_b_db']}->{r['comp_b_calc']} " + f"C {r['comp_c_db']}->{r['comp_c_calc']} " + f"D {r['comp_d_db']}->{r['comp_d_calc']} " + f"{r.get('obs','')}" + ) + if len(divergentes) > 10: + print(f"... +{len(divergentes)-10} divergências (veja {csv_path})") + else: + print("Nenhuma divergência encontrada na amostra.") + print(f"CSV salvo em: {csv_path}") + + await es_client.close() + + +if __name__ == "__main__": + asyncio.run(auditar())