213 lines
7.3 KiB
Python
213 lines
7.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Auditoria do ranking: recalcula A/B/C/D a partir das fontes (ES + Oracle PPG)
|
|
para uma amostra e compara com a TB_RANKING_CONSULTOR.
|
|
|
|
Saída:
|
|
- Exibe divergências em tela
|
|
- Gera CSV em backend/logs/auditoria_ranking.csv
|
|
"""
|
|
import asyncio
|
|
import csv
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Dict, List, Tuple
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
# Ajusta PATH para importar os módulos do backend
|
|
ROOT_DIR = Path(__file__).resolve().parents[1]
|
|
load_dotenv(ROOT_DIR / ".env")
|
|
import sys
|
|
sys.path.insert(0, str(ROOT_DIR))
|
|
|
|
from src.infrastructure.elasticsearch.client import ElasticsearchClient # noqa: E402
|
|
from src.infrastructure.oracle.client import OracleClient # noqa: E402
|
|
from src.infrastructure.repositories.consultor_repository_impl import ConsultorRepositoryImpl # noqa: E402
|
|
|
|
|
|
def _conectar_oracle_local() -> OracleClient:
|
|
client = OracleClient(
|
|
user=os.getenv("ORACLE_LOCAL_USER", "local123"),
|
|
password=os.getenv("ORACLE_LOCAL_PASSWORD", "local123"),
|
|
dsn=os.getenv("ORACLE_LOCAL_DSN", "oracle18c:1521/XEPDB1"),
|
|
)
|
|
client.connect()
|
|
return client
|
|
|
|
|
|
def _conectar_oracle_remoto() -> OracleClient:
|
|
client = OracleClient(
|
|
user=os.getenv("ORACLE_REMOTE_USER"),
|
|
password=os.getenv("ORACLE_REMOTE_PASSWORD"),
|
|
dsn=os.getenv("ORACLE_REMOTE_DSN"),
|
|
)
|
|
client.connect()
|
|
return client
|
|
|
|
|
|
def _amostra_ids(oracle_local: OracleClient) -> List[Tuple[int, int]]:
|
|
"""
|
|
Retorna uma amostra de IDs: top 20, meio (100k..) e cauda (últimos 20).
|
|
"""
|
|
amostra: List[Tuple[int, int]] = []
|
|
consultas = [
|
|
"SELECT ID_PESSOA, POSICAO FROM TB_RANKING_CONSULTOR WHERE POSICAO <= 20",
|
|
"""
|
|
SELECT ID_PESSOA, POSICAO FROM (
|
|
SELECT ID_PESSOA, POSICAO, ROW_NUMBER() OVER (ORDER BY POSICAO) RN
|
|
FROM TB_RANKING_CONSULTOR
|
|
) WHERE RN BETWEEN 100000 AND 100020
|
|
""",
|
|
"""
|
|
SELECT ID_PESSOA, POSICAO FROM (
|
|
SELECT ID_PESSOA, POSICAO, ROW_NUMBER() OVER (ORDER BY POSICAO DESC) RN
|
|
FROM TB_RANKING_CONSULTOR
|
|
) WHERE RN <= 20
|
|
""",
|
|
]
|
|
for q in consultas:
|
|
for r in oracle_local.executar_query(q):
|
|
amostra.append((int(r["ID_PESSOA"]), int(r["POSICAO"] or 0)))
|
|
|
|
# Remove duplicados mantendo ordem
|
|
vistos = set()
|
|
final = []
|
|
for item in amostra:
|
|
if item[0] not in vistos:
|
|
vistos.add(item[0])
|
|
final.append(item)
|
|
return final
|
|
|
|
|
|
def _mapa_ranking_db(oracle_local: OracleClient, ids: List[int]) -> Dict[int, dict]:
|
|
if not ids:
|
|
return {}
|
|
placeholders = ", ".join(f":id{i}" for i in range(len(ids)))
|
|
params = {f"id{i}": val for i, val in enumerate(ids)}
|
|
q = f"""
|
|
SELECT ID_PESSOA, POSICAO, PONTUACAO_TOTAL, COMPONENTE_A, COMPONENTE_B, COMPONENTE_C, COMPONENTE_D
|
|
FROM TB_RANKING_CONSULTOR
|
|
WHERE ID_PESSOA IN ({placeholders})
|
|
"""
|
|
mapa = {}
|
|
for r in oracle_local.executar_query(q, params):
|
|
mapa[int(r["ID_PESSOA"])] = {
|
|
"posicao": int(r["POSICAO"] or 0) if r["POSICAO"] is not None else None,
|
|
"total": float(r["PONTUACAO_TOTAL"] or 0),
|
|
"a": float(r["COMPONENTE_A"] or 0),
|
|
"b": float(r["COMPONENTE_B"] or 0),
|
|
"c": float(r["COMPONENTE_C"] or 0),
|
|
"d": float(r["COMPONENTE_D"] or 0),
|
|
}
|
|
return mapa
|
|
|
|
|
|
async def auditar():
|
|
oracle_local = _conectar_oracle_local()
|
|
oracle_remoto = _conectar_oracle_remoto()
|
|
|
|
es_client = ElasticsearchClient(
|
|
url=os.getenv("ES_URL", "http://localhost:9200"),
|
|
index=os.getenv("ES_INDEX", "atuacapes"),
|
|
user=os.getenv("ES_USER", ""),
|
|
password=os.getenv("ES_PASSWORD", ""),
|
|
)
|
|
await es_client.connect()
|
|
|
|
repo = ConsultorRepositoryImpl(es_client=es_client, oracle_client=oracle_remoto)
|
|
|
|
ids_posicoes = _amostra_ids(oracle_local)
|
|
ids = [i for i, _ in ids_posicoes]
|
|
mapa_db = _mapa_ranking_db(oracle_local, ids)
|
|
|
|
resultados = []
|
|
|
|
for id_pessoa, posicao_db in ids_posicoes:
|
|
try:
|
|
consultor = await repo.buscar_por_id(id_pessoa)
|
|
if not consultor:
|
|
resultados.append({
|
|
"id": id_pessoa,
|
|
"pos_db": posicao_db,
|
|
"pos_calc": None,
|
|
"total_db": mapa_db.get(id_pessoa, {}).get("total", 0),
|
|
"total_calc": None,
|
|
"delta": None,
|
|
"obs": "consultor não encontrado no ES"
|
|
})
|
|
continue
|
|
|
|
total_calc = consultor.pontuacao_total
|
|
comp = consultor.pontuacao
|
|
db_vals = mapa_db.get(id_pessoa, {})
|
|
|
|
resultados.append({
|
|
"id": id_pessoa,
|
|
"pos_db": posicao_db,
|
|
"pos_calc": None, # posição calculada precisa de ordenação global; deixamos None
|
|
"total_db": db_vals.get("total", 0),
|
|
"total_calc": total_calc,
|
|
"delta": round(total_calc - db_vals.get("total", 0), 2),
|
|
"comp_a_db": db_vals.get("a", 0),
|
|
"comp_a_calc": comp.componente_a.total,
|
|
"comp_b_db": db_vals.get("b", 0),
|
|
"comp_b_calc": comp.componente_b.total,
|
|
"comp_c_db": db_vals.get("c", 0),
|
|
"comp_c_calc": comp.componente_c.total,
|
|
"comp_d_db": db_vals.get("d", 0),
|
|
"comp_d_calc": comp.componente_d.total,
|
|
"obs": "",
|
|
})
|
|
except Exception as e:
|
|
resultados.append({
|
|
"id": id_pessoa,
|
|
"pos_db": posicao_db,
|
|
"pos_calc": None,
|
|
"total_db": mapa_db.get(id_pessoa, {}).get("total", 0),
|
|
"total_calc": None,
|
|
"delta": None,
|
|
"obs": f"erro: {e}",
|
|
})
|
|
|
|
# Exporta CSV
|
|
logs_dir = ROOT_DIR / "logs"
|
|
logs_dir.mkdir(exist_ok=True)
|
|
csv_path = logs_dir / "auditoria_ranking.csv"
|
|
campos = [
|
|
"id", "pos_db", "pos_calc", "total_db", "total_calc", "delta",
|
|
"comp_a_db", "comp_a_calc", "comp_b_db", "comp_b_calc",
|
|
"comp_c_db", "comp_c_calc", "comp_d_db", "comp_d_calc", "obs"
|
|
]
|
|
with csv_path.open("w", newline="", encoding="utf-8") as f:
|
|
writer = csv.DictWriter(f, fieldnames=campos)
|
|
writer.writeheader()
|
|
for r in resultados:
|
|
writer.writerow(r)
|
|
|
|
# Mostra divergências relevantes
|
|
divergentes = [r for r in resultados if r.get("delta") and abs(r["delta"]) > 0.01]
|
|
print(f"Amostra auditada: {len(resultados)} IDs")
|
|
print(f"Divergências (delta != 0): {len(divergentes)}")
|
|
if divergentes:
|
|
for r in divergentes[:10]:
|
|
print(
|
|
f"ID {r['id']} delta {r['delta']} | "
|
|
f"A {r['comp_a_db']}->{r['comp_a_calc']} "
|
|
f"B {r['comp_b_db']}->{r['comp_b_calc']} "
|
|
f"C {r['comp_c_db']}->{r['comp_c_calc']} "
|
|
f"D {r['comp_d_db']}->{r['comp_d_calc']} "
|
|
f"{r.get('obs','')}"
|
|
)
|
|
if len(divergentes) > 10:
|
|
print(f"... +{len(divergentes)-10} divergências (veja {csv_path})")
|
|
else:
|
|
print("Nenhuma divergência encontrada na amostra.")
|
|
print(f"CSV salvo em: {csv_path}")
|
|
|
|
await es_client.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(auditar())
|