feat(backend): adicionar script de validação do ranking

- Cria script validar_ranking.py para verificar integridade do ranking
- Valida critérios do código vs especificação oficial
- Verifica ordenação, posições duplicadas e gaps
- Recalcula pontuação de amostra e compara com banco
- Corrige CONS_FALECIDO para não pontuar por tempo (spec oficial)
This commit is contained in:
Frederico Castro
2025-12-15 12:16:54 -03:00
parent 99e59e2100
commit e785b9fc62
2 changed files with 512 additions and 3 deletions

View File

@@ -0,0 +1,509 @@
#!/usr/bin/env python3
"""
Validação completa do Ranking de Consultores CAPES.
Verifica:
1. PONTUAÇÃO: Recalcula a partir do ES e compara com o BD
2. ORDENAÇÃO: Verifica se as posições estão corretas (por pontuação decrescente)
3. CRITÉRIOS: Compara critérios do código com a especificação oficial
Uso:
python scripts/validar_ranking.py [--amostra N] [--top N] [--id ID1,ID2,...]
Saída:
- Relatório em tela
- CSV em backend/logs/validacao_ranking.csv
"""
import asyncio
import argparse
import csv
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from dotenv import load_dotenv
ROOT_DIR = Path(__file__).resolve().parents[1]
load_dotenv(ROOT_DIR / ".env")
sys.path.insert(0, str(ROOT_DIR))
from src.infrastructure.elasticsearch.client import ElasticsearchClient
from src.infrastructure.oracle.client import OracleClient
from src.infrastructure.repositories.consultor_repository_impl import ConsultorRepositoryImpl
from src.domain.value_objects.criterios_pontuacao import CRITERIOS
CRITERIOS_OFICIAIS = {
"CA": {"base": 200, "teto": 450, "tempo": 10, "teto_tempo": 100, "bonus_atual": 30, "bonus_retorno": 20},
"CAJ": {"base": 150, "teto": 370, "tempo": 8, "teto_tempo": 80, "bonus_atual": 20, "bonus_retorno": 15},
"CAJ_MP": {"base": 120, "teto": 315, "tempo": 6, "teto_tempo": 60, "bonus_atual": 15, "bonus_retorno": 10},
"CAM": {"base": 100, "teto": 280, "tempo": 5, "teto_tempo": 50, "bonus_atual": 20, "bonus_retorno": 10},
"CONS_ATIVO": {"base": 150, "teto": 230, "tempo": 5, "teto_tempo": 50, "bonus_continuidade": 20, "bonus_retorno": 15},
"CONS_HIST": {"base": 100, "teto": 230, "tempo": 5, "teto_tempo": 50, "bonus_continuidade": 20},
"CONS_FALECIDO": {"base": 100, "teto": 230, "tempo": 0, "teto_tempo": 0, "bonus_continuidade": 20},
"INSC_AUTOR": {"base": 10, "teto": 20},
"INSC_INST": {"base": 30, "teto": 60},
"AVAL_COMIS_PREMIO": {"base": 30, "teto": 60},
"AVAL_COMIS_GP": {"base": 50, "teto": 80},
"COORD_COMIS_PREMIO": {"base": 50, "teto": 100},
"COORD_COMIS_GP": {"base": 60, "teto": 120},
"BOL_BPQ_SUPERIOR": {"base": 30, "teto": 60},
"BOL_BPQ_INTERMEDIARIO": {"base": 50, "teto": 100},
"PREMIACAO": {"base": 100, "teto": 180},
"PREMIACAO_GP": {"base": 50, "teto": 60},
"MENCAO": {"base": 30, "teto": 30},
"EVENTO": {"base": 1, "teto": 5},
"PROJ": {"base": 10, "teto": 40},
"ORIENT_POS_DOC": {"base": 15, "teto": 100},
"ORIENT_TESE": {"base": 10, "teto": 50},
"ORIENT_DISS": {"base": 5, "teto": 25},
"CO_ORIENT_POS_DOC": {"base": 7, "teto": 35},
"CO_ORIENT_TESE": {"base": 5, "teto": 25},
"CO_ORIENT_DISS": {"base": 3, "teto": 15},
"MB_BANCA_POS_DOC": {"base": 3, "teto": 15},
"MB_BANCA_TESE": {"base": 3, "teto": 15},
"MB_BANCA_DISS": {"base": 2, "teto": 10},
}
@dataclass
class ResultadoValidacao:
id_pessoa: int
nome: str
posicao_db: int
pontuacao_db: float
pontuacao_calc: Optional[float]
delta: Optional[float]
bloco_a_db: float
bloco_a_calc: Optional[float]
bloco_c_db: float
bloco_c_calc: Optional[float]
bloco_d_db: float
bloco_d_calc: Optional[float]
status: str
observacao: str
def conectar_oracle_local() -> OracleClient:
client = OracleClient(
user=os.getenv("ORACLE_LOCAL_USER", "local123"),
password=os.getenv("ORACLE_LOCAL_PASSWORD", "local123"),
dsn=os.getenv("ORACLE_LOCAL_DSN", "oracle18c:1521/XEPDB1"),
)
client.connect()
return client
def obter_ranking_db(oracle: OracleClient, limite: int = 100, offset: int = 0) -> List[Dict]:
query = """
SELECT ID_PESSOA, NOME, POSICAO, PONTUACAO_TOTAL,
COMPONENTE_A, COMPONENTE_B, COMPONENTE_C, COMPONENTE_D
FROM TB_RANKING_CONSULTOR
ORDER BY POSICAO
OFFSET :offset ROWS FETCH NEXT :limite ROWS ONLY
"""
return oracle.executar_query(query, {"limite": limite, "offset": offset})
def obter_por_ids(oracle: OracleClient, ids: List[int]) -> Dict[int, Dict]:
if not ids:
return {}
placeholders = ", ".join(f":id{i}" for i in range(len(ids)))
params = {f"id{i}": val for i, val in enumerate(ids)}
query = f"""
SELECT ID_PESSOA, NOME, POSICAO, PONTUACAO_TOTAL,
COMPONENTE_A, COMPONENTE_B, COMPONENTE_C, COMPONENTE_D
FROM TB_RANKING_CONSULTOR
WHERE ID_PESSOA IN ({placeholders})
"""
resultado = {}
for r in oracle.executar_query(query, params):
resultado[int(r["ID_PESSOA"])] = r
return resultado
def verificar_ordenacao(oracle: OracleClient) -> Tuple[bool, List[str]]:
query = """
SELECT COUNT(*) as total FROM (
SELECT ID_PESSOA, POSICAO, PONTUACAO_TOTAL,
LAG(PONTUACAO_TOTAL) OVER (ORDER BY POSICAO) as pont_anterior,
LAG(POSICAO) OVER (ORDER BY POSICAO) as pos_anterior
FROM TB_RANKING_CONSULTOR
) WHERE pont_anterior < PONTUACAO_TOTAL
"""
resultado = oracle.executar_query(query)
inversoes = int(resultado[0]["TOTAL"]) if resultado else 0
erros = []
if inversoes > 0:
query_detalhes = """
SELECT * FROM (
SELECT ID_PESSOA, NOME, POSICAO, PONTUACAO_TOTAL,
LAG(PONTUACAO_TOTAL) OVER (ORDER BY POSICAO) as pont_anterior,
LAG(POSICAO) OVER (ORDER BY POSICAO) as pos_anterior,
LAG(ID_PESSOA) OVER (ORDER BY POSICAO) as id_anterior
FROM TB_RANKING_CONSULTOR
) WHERE pont_anterior < PONTUACAO_TOTAL
FETCH FIRST 10 ROWS ONLY
"""
for r in oracle.executar_query(query_detalhes):
erros.append(
f"Pos {r['POS_ANTERIOR']}{r['POSICAO']}: "
f"ID {r['ID_ANTERIOR']} ({r['PONT_ANTERIOR']} pts) < "
f"ID {r['ID_PESSOA']} ({r['PONTUACAO_TOTAL']} pts)"
)
return inversoes == 0, erros
def verificar_posicoes_duplicadas(oracle: OracleClient) -> Tuple[bool, List[str]]:
query = """
SELECT POSICAO, COUNT(*) as qtd
FROM TB_RANKING_CONSULTOR
GROUP BY POSICAO
HAVING COUNT(*) > 1
ORDER BY POSICAO
FETCH FIRST 10 ROWS ONLY
"""
resultado = oracle.executar_query(query)
erros = [f"Posição {r['POSICAO']} duplicada ({r['QTD']}x)" for r in resultado]
return len(erros) == 0, erros
def verificar_gaps_posicao(oracle: OracleClient) -> Tuple[bool, List[str]]:
query = """
SELECT COUNT(*) as total FROM (
SELECT POSICAO,
LAG(POSICAO) OVER (ORDER BY POSICAO) as pos_anterior
FROM TB_RANKING_CONSULTOR
) WHERE POSICAO - pos_anterior > 1
"""
resultado = oracle.executar_query(query)
gaps = int(resultado[0]["TOTAL"]) if resultado else 0
erros = []
if gaps > 0:
query_detalhes = """
SELECT * FROM (
SELECT POSICAO, LAG(POSICAO) OVER (ORDER BY POSICAO) as pos_anterior
FROM TB_RANKING_CONSULTOR
) WHERE POSICAO - pos_anterior > 1
FETCH FIRST 10 ROWS ONLY
"""
for r in oracle.executar_query(query_detalhes):
erros.append(f"Gap entre posições {r['POS_ANTERIOR']} e {r['POSICAO']}")
return gaps == 0, erros
def verificar_criterios_codigo() -> Tuple[bool, List[str]]:
divergencias = []
for codigo, oficial in CRITERIOS_OFICIAIS.items():
if codigo not in CRITERIOS:
if codigo not in ["BOL_BPQ_SUP", "BOL_BPQ_INT"]:
divergencias.append(f"{codigo}: não existe no código")
continue
impl = CRITERIOS[codigo]
if impl.base != oficial["base"]:
divergencias.append(f"{codigo}: base {impl.base} != {oficial['base']} (oficial)")
if impl.teto != oficial["teto"]:
divergencias.append(f"{codigo}: teto {impl.teto} != {oficial['teto']} (oficial)")
if "tempo" in oficial:
if impl.multiplicador_tempo != oficial["tempo"]:
divergencias.append(f"{codigo}: tempo {impl.multiplicador_tempo} != {oficial['tempo']} (oficial)")
if "teto_tempo" in oficial:
if impl.teto_tempo != oficial["teto_tempo"]:
divergencias.append(f"{codigo}: teto_tempo {impl.teto_tempo} != {oficial['teto_tempo']} (oficial)")
return len(divergencias) == 0, divergencias
async def validar_pontuacao_amostra(
oracle: OracleClient,
es_client: ElasticsearchClient,
ids: List[int]
) -> List[ResultadoValidacao]:
repo = ConsultorRepositoryImpl(es_client=es_client, oracle_client=None)
mapa_db = obter_por_ids(oracle, ids)
resultados = []
for id_pessoa in ids:
db_data = mapa_db.get(id_pessoa, {})
try:
consultor = await repo.buscar_por_id(id_pessoa)
if not consultor:
resultados.append(ResultadoValidacao(
id_pessoa=id_pessoa,
nome=db_data.get("NOME", "?"),
posicao_db=int(db_data.get("POSICAO") or 0),
pontuacao_db=float(db_data.get("PONTUACAO_TOTAL") or 0),
pontuacao_calc=None,
delta=None,
bloco_a_db=float(db_data.get("COMPONENTE_A") or 0),
bloco_a_calc=None,
bloco_c_db=float(db_data.get("COMPONENTE_C") or 0),
bloco_c_calc=None,
bloco_d_db=float(db_data.get("COMPONENTE_D") or 0),
bloco_d_calc=None,
status="ERRO",
observacao="Não encontrado no ES",
))
continue
total_calc = consultor.pontuacao_total
total_db = float(db_data.get("PONTUACAO_TOTAL") or 0)
delta = round(total_calc - total_db, 2)
status = "OK" if abs(delta) < 0.01 else "DIVERGENTE"
obs = ""
if abs(delta) >= 0.01:
obs = f"Delta: {delta:+.2f}"
resultados.append(ResultadoValidacao(
id_pessoa=id_pessoa,
nome=consultor.nome,
posicao_db=int(db_data.get("POSICAO") or 0),
pontuacao_db=total_db,
pontuacao_calc=total_calc,
delta=delta,
bloco_a_db=float(db_data.get("COMPONENTE_A") or 0),
bloco_a_calc=consultor.pontuacao_bloco_a,
bloco_c_db=float(db_data.get("COMPONENTE_C") or 0),
bloco_c_calc=consultor.pontuacao_bloco_c,
bloco_d_db=float(db_data.get("COMPONENTE_D") or 0),
bloco_d_calc=consultor.pontuacao_bloco_d,
status=status,
observacao=obs,
))
except Exception as e:
resultados.append(ResultadoValidacao(
id_pessoa=id_pessoa,
nome=db_data.get("NOME", "?"),
posicao_db=int(db_data.get("POSICAO") or 0),
pontuacao_db=float(db_data.get("PONTUACAO_TOTAL") or 0),
pontuacao_calc=None,
delta=None,
bloco_a_db=float(db_data.get("COMPONENTE_A") or 0),
bloco_a_calc=None,
bloco_c_db=float(db_data.get("COMPONENTE_C") or 0),
bloco_c_calc=None,
bloco_d_db=float(db_data.get("COMPONENTE_D") or 0),
bloco_d_calc=None,
status="ERRO",
observacao=str(e)[:100],
))
return resultados
def gerar_amostra_ids(oracle: OracleClient, tamanho: int = 100) -> List[int]:
ids = []
top_n = min(tamanho // 3, 50)
query_top = f"""
SELECT ID_PESSOA FROM TB_RANKING_CONSULTOR
ORDER BY POSICAO
FETCH FIRST {top_n} ROWS ONLY
"""
ids.extend([int(r["ID_PESSOA"]) for r in oracle.executar_query(query_top)])
query_total = "SELECT COUNT(*) as total FROM TB_RANKING_CONSULTOR"
total = int(oracle.executar_query(query_total)[0]["TOTAL"])
meio_n = min(tamanho // 3, 30)
meio_offset = total // 2
query_meio = f"""
SELECT ID_PESSOA FROM TB_RANKING_CONSULTOR
ORDER BY POSICAO
OFFSET {meio_offset} ROWS FETCH NEXT {meio_n} ROWS ONLY
"""
ids.extend([int(r["ID_PESSOA"]) for r in oracle.executar_query(query_meio)])
cauda_n = min(tamanho // 3, 20)
query_cauda = f"""
SELECT ID_PESSOA FROM (
SELECT ID_PESSOA FROM TB_RANKING_CONSULTOR ORDER BY POSICAO DESC
FETCH FIRST {cauda_n} ROWS ONLY
)
"""
ids.extend([int(r["ID_PESSOA"]) for r in oracle.executar_query(query_cauda)])
return list(set(ids))
def exportar_csv(resultados: List[ResultadoValidacao], caminho: Path):
campos = [
"id_pessoa", "nome", "posicao_db", "pontuacao_db", "pontuacao_calc", "delta",
"bloco_a_db", "bloco_a_calc", "bloco_c_db", "bloco_c_calc",
"bloco_d_db", "bloco_d_calc", "status", "observacao"
]
with caminho.open("w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=campos)
writer.writeheader()
for r in resultados:
writer.writerow({
"id_pessoa": r.id_pessoa,
"nome": r.nome,
"posicao_db": r.posicao_db,
"pontuacao_db": r.pontuacao_db,
"pontuacao_calc": r.pontuacao_calc,
"delta": r.delta,
"bloco_a_db": r.bloco_a_db,
"bloco_a_calc": r.bloco_a_calc,
"bloco_c_db": r.bloco_c_db,
"bloco_c_calc": r.bloco_c_calc,
"bloco_d_db": r.bloco_d_db,
"bloco_d_calc": r.bloco_d_calc,
"status": r.status,
"observacao": r.observacao,
})
async def main():
parser = argparse.ArgumentParser(description="Validação do Ranking de Consultores CAPES")
parser.add_argument("--amostra", type=int, default=100, help="Tamanho da amostra para validação")
parser.add_argument("--top", type=int, default=0, help="Validar apenas top N")
parser.add_argument("--id", type=str, default="", help="IDs específicos (separados por vírgula)")
args = parser.parse_args()
print("=" * 70)
print("VALIDAÇÃO DO RANKING DE CONSULTORES CAPES")
print(f"Data: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 70)
print("\n[1/4] Verificando critérios do código...")
criterios_ok, criterios_erros = verificar_criterios_codigo()
if criterios_ok:
print("✓ Critérios do código estão de acordo com a especificação")
else:
print(f"{len(criterios_erros)} divergência(s) nos critérios:")
for erro in criterios_erros[:10]:
print(f" - {erro}")
if len(criterios_erros) > 10:
print(f" ... e mais {len(criterios_erros) - 10}")
print("\n[2/4] Conectando aos bancos de dados...")
oracle = conectar_oracle_local()
print("✓ Oracle Local conectado")
es_client = ElasticsearchClient(
url=os.getenv("ES_URL", "http://localhost:9200"),
index=os.getenv("ES_INDEX", "atuacapes"),
user=os.getenv("ES_USER", ""),
password=os.getenv("ES_PASSWORD", ""),
)
await es_client.connect()
print("✓ Elasticsearch conectado")
print("\n[3/4] Verificando integridade da ordenação...")
ordem_ok, ordem_erros = verificar_ordenacao(oracle)
if ordem_ok:
print("✓ Ordenação por pontuação está correta")
else:
print(f"✗ Problemas na ordenação:")
for erro in ordem_erros:
print(f" - {erro}")
dup_ok, dup_erros = verificar_posicoes_duplicadas(oracle)
if dup_ok:
print("✓ Não há posições duplicadas")
else:
print(f"✗ Posições duplicadas encontradas:")
for erro in dup_erros:
print(f" - {erro}")
gaps_ok, gaps_erros = verificar_gaps_posicao(oracle)
if gaps_ok:
print("✓ Não há gaps nas posições")
else:
print(f"✗ Gaps encontrados:")
for erro in gaps_erros:
print(f" - {erro}")
print("\n[4/4] Validando pontuação da amostra...")
if args.id:
ids = [int(x.strip()) for x in args.id.split(",")]
print(f"Validando IDs específicos: {ids}")
elif args.top > 0:
query = f"""
SELECT ID_PESSOA FROM TB_RANKING_CONSULTOR
ORDER BY POSICAO FETCH FIRST {args.top} ROWS ONLY
"""
ids = [int(r["ID_PESSOA"]) for r in oracle.executar_query(query)]
print(f"Validando top {args.top}")
else:
ids = gerar_amostra_ids(oracle, args.amostra)
print(f"Amostra gerada: {len(ids)} consultores (top, meio, cauda)")
resultados = await validar_pontuacao_amostra(oracle, es_client, ids)
ok_count = sum(1 for r in resultados if r.status == "OK")
divergente_count = sum(1 for r in resultados if r.status == "DIVERGENTE")
erro_count = sum(1 for r in resultados if r.status == "ERRO")
print(f"\nResultados da validação de pontuação:")
print(f" ✓ OK: {ok_count}")
print(f" ✗ Divergentes: {divergente_count}")
print(f" ⚠ Erros: {erro_count}")
if divergente_count > 0:
print(f"\nDivergências encontradas:")
for r in [x for x in resultados if x.status == "DIVERGENTE"][:10]:
print(f" #{r.posicao_db} {r.nome[:30]:30} | DB: {r.pontuacao_db:.0f} → Calc: {r.pontuacao_calc:.0f} ({r.delta:+.0f})")
if r.bloco_a_db != r.bloco_a_calc:
print(f" Bloco A: {r.bloco_a_db:.0f}{r.bloco_a_calc:.0f}")
if r.bloco_c_db != r.bloco_c_calc:
print(f" Bloco C: {r.bloco_c_db:.0f}{r.bloco_c_calc:.0f}")
if r.bloco_d_db != r.bloco_d_calc:
print(f" Bloco D: {r.bloco_d_db:.0f}{r.bloco_d_calc:.0f}")
logs_dir = ROOT_DIR / "logs"
logs_dir.mkdir(exist_ok=True)
csv_path = logs_dir / "validacao_ranking.csv"
exportar_csv(resultados, csv_path)
print(f"\nCSV exportado: {csv_path}")
print("\n" + "=" * 70)
print("RESUMO DA VALIDAÇÃO")
print("=" * 70)
tudo_ok = criterios_ok and ordem_ok and dup_ok and gaps_ok and divergente_count == 0
if tudo_ok:
print("✓ RANKING VÁLIDO - Todas as verificações passaram")
else:
print("✗ RANKING COM PROBLEMAS:")
if not criterios_ok:
print(f" - {len(criterios_erros)} divergência(s) nos critérios")
if not ordem_ok:
print(f" - Problemas na ordenação")
if not dup_ok:
print(f" - Posições duplicadas")
if not gaps_ok:
print(f" - Gaps nas posições")
if divergente_count > 0:
print(f" - {divergente_count} pontuação(ões) divergente(s)")
await es_client.close()
return 0 if tudo_ok else 1
if __name__ == "__main__":
exit_code = asyncio.run(main())
sys.exit(exit_code)