#!/usr/bin/env python3 """ Validação completa do Ranking de Consultores CAPES. Verifica: 1. PONTUAÇÃO: Recalcula a partir do ES e compara com o BD 2. ORDENAÇÃO: Verifica se as posições estão corretas (por pontuação decrescente) 3. CRITÉRIOS: Compara critérios do código com a especificação oficial Uso: python scripts/validar_ranking.py [--amostra N] [--top N] [--id ID1,ID2,...] Saída: - Relatório em tela - CSV em backend/logs/validacao_ranking.csv """ import asyncio import argparse import csv import json import os import sys from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Tuple from dataclasses import dataclass from dotenv import load_dotenv ROOT_DIR = Path(__file__).resolve().parents[1] load_dotenv(ROOT_DIR / ".env") sys.path.insert(0, str(ROOT_DIR)) from src.infrastructure.elasticsearch.client import ElasticsearchClient from src.infrastructure.oracle.client import OracleClient from src.infrastructure.repositories.consultor_repository_impl import ConsultorRepositoryImpl from src.domain.value_objects.criterios_pontuacao import CRITERIOS CRITERIOS_OFICIAIS = { "CA": {"base": 200, "teto": 450, "tempo": 10, "teto_tempo": 100, "bonus_atual": 30, "bonus_retorno": 20}, "CAJ": {"base": 150, "teto": 370, "tempo": 8, "teto_tempo": 80, "bonus_atual": 20, "bonus_retorno": 15}, "CAJ_MP": {"base": 120, "teto": 315, "tempo": 6, "teto_tempo": 60, "bonus_atual": 15, "bonus_retorno": 10}, "CAM": {"base": 100, "teto": 280, "tempo": 5, "teto_tempo": 50, "bonus_atual": 20, "bonus_retorno": 10}, "PPG_COORD": {"base": 0, "teto": 0, "tempo": 0, "teto_tempo": 0, "bonus_atual": 15, "bonus_retorno": 10, "bonus_continuidade": 15}, "CONS_ATIVO": {"base": 150, "teto": 230, "tempo": 5, "teto_tempo": 50, "bonus_atual": 20, "bonus_retorno": 15, "bonus_continuidade": 20}, "CONS_HIST": {"base": 100, "teto": 230, "tempo": 5, "teto_tempo": 50, "bonus_retorno": 20, "bonus_continuidade": 20}, "CONS_FALECIDO": {"base": 100, "teto": 230, "tempo": 5, "teto_tempo": 50, "bonus_continuidade": 20}, "INSC_AUTOR": {"base": 10, "teto": 20}, "INSC_INST_AUTOR": {"base": 20, "teto": 50}, "AVAL_COMIS_PREMIO": {"base": 30, "teto": 60}, "AVAL_COMIS_GP": {"base": 40, "teto": 80}, "COORD_COMIS_PREMIO": {"base": 40, "teto": 100}, "COORD_COMIS_GP": {"base": 50, "teto": 120}, "BOL_BPQ_NIVEL": {"base": 30, "teto": 60}, "PREMIACAO_GP_AUTOR": {"base": 100, "teto": 300}, "PREMIACAO_AUTOR": {"base": 50, "teto": 150}, "MENCAO_AUTOR": {"base": 30, "teto": 90}, "EVENTO": {"base": 1, "teto": 5}, "PROJ": {"base": 10, "teto": 30}, "ORIENT_POS_DOC": {"base": 0, "teto": 0}, "ORIENT_TESE": {"base": 0, "teto": 0}, "ORIENT_DISS": {"base": 0, "teto": 0}, "CO_ORIENT_POS_DOC": {"base": 0, "teto": 0}, "CO_ORIENT_TESE": {"base": 0, "teto": 0}, "CO_ORIENT_DISS": {"base": 0, "teto": 0}, "MB_BANCA_POS_DOC": {"base": 0, "teto": 0}, "MB_BANCA_TESE": {"base": 0, "teto": 0}, "MB_BANCA_DISS": {"base": 0, "teto": 0}, } @dataclass class ResultadoValidacao: id_pessoa: int nome: str posicao_db: int pontuacao_db: float pontuacao_calc: Optional[float] delta: Optional[float] bloco_a_db: float bloco_a_calc: Optional[float] bloco_c_db: float bloco_c_calc: Optional[float] bloco_d_db: float bloco_d_calc: Optional[float] status: str observacao: str def conectar_oracle_local() -> OracleClient: client = OracleClient( user=os.getenv("ORACLE_LOCAL_USER", "local123"), password=os.getenv("ORACLE_LOCAL_PASSWORD", "local123"), dsn=os.getenv("ORACLE_LOCAL_DSN", "oracle18c:1521/XEPDB1"), ) client.connect() return client def obter_ranking_db(oracle: OracleClient, limite: int = 100, offset: int = 0) -> List[Dict]: query = """ SELECT ID_PESSOA, NOME, POSICAO, PONTUACAO_TOTAL, COMPONENTE_A, COMPONENTE_B, COMPONENTE_C, COMPONENTE_D FROM TB_RANKING_CONSULTOR ORDER BY POSICAO OFFSET :offset ROWS FETCH NEXT :limite ROWS ONLY """ return oracle.executar_query(query, {"limite": limite, "offset": offset}) def obter_por_ids(oracle: OracleClient, ids: List[int]) -> Dict[int, Dict]: if not ids: return {} placeholders = ", ".join(f":id{i}" for i in range(len(ids))) params = {f"id{i}": val for i, val in enumerate(ids)} query = f""" SELECT ID_PESSOA, NOME, POSICAO, PONTUACAO_TOTAL, COMPONENTE_A, COMPONENTE_B, COMPONENTE_C, COMPONENTE_D FROM TB_RANKING_CONSULTOR WHERE ID_PESSOA IN ({placeholders}) """ resultado = {} for r in oracle.executar_query(query, params): resultado[int(r["ID_PESSOA"])] = r return resultado def verificar_ordenacao(oracle: OracleClient) -> Tuple[bool, List[str]]: query = """ SELECT COUNT(*) as total FROM ( SELECT ID_PESSOA, POSICAO, PONTUACAO_TOTAL, LAG(PONTUACAO_TOTAL) OVER (ORDER BY POSICAO) as pont_anterior, LAG(POSICAO) OVER (ORDER BY POSICAO) as pos_anterior FROM TB_RANKING_CONSULTOR ) WHERE pont_anterior < PONTUACAO_TOTAL """ resultado = oracle.executar_query(query) inversoes = int(resultado[0]["TOTAL"]) if resultado else 0 erros = [] if inversoes > 0: query_detalhes = """ SELECT * FROM ( SELECT ID_PESSOA, NOME, POSICAO, PONTUACAO_TOTAL, LAG(PONTUACAO_TOTAL) OVER (ORDER BY POSICAO) as pont_anterior, LAG(POSICAO) OVER (ORDER BY POSICAO) as pos_anterior, LAG(ID_PESSOA) OVER (ORDER BY POSICAO) as id_anterior FROM TB_RANKING_CONSULTOR ) WHERE pont_anterior < PONTUACAO_TOTAL FETCH FIRST 10 ROWS ONLY """ for r in oracle.executar_query(query_detalhes): erros.append( f"Pos {r['POS_ANTERIOR']}→{r['POSICAO']}: " f"ID {r['ID_ANTERIOR']} ({r['PONT_ANTERIOR']} pts) < " f"ID {r['ID_PESSOA']} ({r['PONTUACAO_TOTAL']} pts)" ) return inversoes == 0, erros def verificar_posicoes_duplicadas(oracle: OracleClient) -> Tuple[bool, List[str]]: query = """ SELECT POSICAO, COUNT(*) as qtd FROM TB_RANKING_CONSULTOR GROUP BY POSICAO HAVING COUNT(*) > 1 ORDER BY POSICAO FETCH FIRST 10 ROWS ONLY """ resultado = oracle.executar_query(query) erros = [f"Posição {r['POSICAO']} duplicada ({r['QTD']}x)" for r in resultado] return len(erros) == 0, erros def verificar_gaps_posicao(oracle: OracleClient) -> Tuple[bool, List[str]]: query = """ SELECT COUNT(*) as total FROM ( SELECT POSICAO, LAG(POSICAO) OVER (ORDER BY POSICAO) as pos_anterior FROM TB_RANKING_CONSULTOR ) WHERE POSICAO - pos_anterior > 1 """ resultado = oracle.executar_query(query) gaps = int(resultado[0]["TOTAL"]) if resultado else 0 erros = [] if gaps > 0: query_detalhes = """ SELECT * FROM ( SELECT POSICAO, LAG(POSICAO) OVER (ORDER BY POSICAO) as pos_anterior FROM TB_RANKING_CONSULTOR ) WHERE POSICAO - pos_anterior > 1 FETCH FIRST 10 ROWS ONLY """ for r in oracle.executar_query(query_detalhes): erros.append(f"Gap entre posições {r['POS_ANTERIOR']} e {r['POSICAO']}") return gaps == 0, erros def verificar_criterios_codigo() -> Tuple[bool, List[str]]: divergencias = [] for codigo, oficial in CRITERIOS_OFICIAIS.items(): if codigo not in CRITERIOS: divergencias.append(f"{codigo}: não existe no código") continue impl = CRITERIOS[codigo] if impl.base != oficial["base"]: divergencias.append(f"{codigo}: base {impl.base} != {oficial['base']} (oficial)") if impl.teto != oficial["teto"]: divergencias.append(f"{codigo}: teto {impl.teto} != {oficial['teto']} (oficial)") if "tempo" in oficial: if impl.multiplicador_tempo != oficial["tempo"]: divergencias.append(f"{codigo}: tempo {impl.multiplicador_tempo} != {oficial['tempo']} (oficial)") if "teto_tempo" in oficial: if impl.teto_tempo != oficial["teto_tempo"]: divergencias.append(f"{codigo}: teto_tempo {impl.teto_tempo} != {oficial['teto_tempo']} (oficial)") return len(divergencias) == 0, divergencias async def validar_pontuacao_amostra( oracle: OracleClient, es_client: ElasticsearchClient, ids: List[int] ) -> List[ResultadoValidacao]: repo = ConsultorRepositoryImpl(es_client=es_client, oracle_client=None) mapa_db = obter_por_ids(oracle, ids) resultados = [] for id_pessoa in ids: db_data = mapa_db.get(id_pessoa, {}) try: consultor = await repo.buscar_por_id(id_pessoa) if not consultor: resultados.append(ResultadoValidacao( id_pessoa=id_pessoa, nome=db_data.get("NOME", "?"), posicao_db=int(db_data.get("POSICAO") or 0), pontuacao_db=float(db_data.get("PONTUACAO_TOTAL") or 0), pontuacao_calc=None, delta=None, bloco_a_db=float(db_data.get("COMPONENTE_A") or 0), bloco_a_calc=None, bloco_c_db=float(db_data.get("COMPONENTE_C") or 0), bloco_c_calc=None, bloco_d_db=float(db_data.get("COMPONENTE_D") or 0), bloco_d_calc=None, status="ERRO", observacao="Não encontrado no ES", )) continue total_calc = consultor.pontuacao_total total_db = float(db_data.get("PONTUACAO_TOTAL") or 0) delta = round(total_calc - total_db, 2) status = "OK" if abs(delta) < 0.01 else "DIVERGENTE" obs = "" if abs(delta) >= 0.01: obs = f"Delta: {delta:+.2f}" resultados.append(ResultadoValidacao( id_pessoa=id_pessoa, nome=consultor.nome, posicao_db=int(db_data.get("POSICAO") or 0), pontuacao_db=total_db, pontuacao_calc=total_calc, delta=delta, bloco_a_db=float(db_data.get("COMPONENTE_A") or 0), bloco_a_calc=consultor.pontuacao_bloco_a, bloco_c_db=float(db_data.get("COMPONENTE_C") or 0), bloco_c_calc=consultor.pontuacao_bloco_c, bloco_d_db=float(db_data.get("COMPONENTE_D") or 0), bloco_d_calc=consultor.pontuacao_bloco_d, status=status, observacao=obs, )) except Exception as e: resultados.append(ResultadoValidacao( id_pessoa=id_pessoa, nome=db_data.get("NOME", "?"), posicao_db=int(db_data.get("POSICAO") or 0), pontuacao_db=float(db_data.get("PONTUACAO_TOTAL") or 0), pontuacao_calc=None, delta=None, bloco_a_db=float(db_data.get("COMPONENTE_A") or 0), bloco_a_calc=None, bloco_c_db=float(db_data.get("COMPONENTE_C") or 0), bloco_c_calc=None, bloco_d_db=float(db_data.get("COMPONENTE_D") or 0), bloco_d_calc=None, status="ERRO", observacao=str(e)[:100], )) return resultados async def validar_selos(oracle: OracleClient, es_client: ElasticsearchClient, limite: int = 50) -> Tuple[bool, List[str]]: result = oracle.executar_query(f''' SELECT ID_PESSOA, NOME, JSON_DETALHES FROM TB_RANKING_CONSULTOR WHERE POSICAO <= {limite} ORDER BY POSICAO ''') erros = [] for r in result: id_pessoa = r['ID_PESSOA'] nome = r['NOME'] detalhes = json.loads(r['JSON_DETALHES']) db_bolsas = len(detalhes.get('bolsas_cnpq', [])) db_prem = len(detalhes.get('premiacoes', [])) db_orient = len(detalhes.get('orientacoes', [])) doc = await es_client.buscar_por_id(id_pessoa) if not doc: erros.append(f'{nome}: Não encontrado no ES') continue atuacoes = doc.get('atuacoes', []) es_bolsas = sum(1 for a in atuacoes if 'bolsista' in str(a.get('tipo', '')).lower() and 'cnpq' in str(a.get('tipo', '')).lower()) es_prem = sum(1 for a in atuacoes if a.get('tipo') == 'Premiação Prêmio') es_orient = 0 for a in atuacoes: if a.get('tipo') == 'Orientação de Discentes': dados = a.get('dadosOrientacaoDiscente', {}) es_orient += (dados.get('totalOrientacaoFinalizadaMestrado') or 0) es_orient += (dados.get('totalOrientacaoFinalizadaDoutorado') or 0) es_orient += (dados.get('totalAcompanhamentoPosDoutorado') or 0) diffs = [] if db_bolsas != es_bolsas: diffs.append(f'Bolsas DB:{db_bolsas} ES:{es_bolsas}') if db_prem != es_prem: diffs.append(f'Prem DB:{db_prem} ES:{es_prem}') if db_orient != es_orient: diffs.append(f'Orient DB:{db_orient} ES:{es_orient}') if diffs: erros.append(f'{nome}: {" | ".join(diffs)}') return len(erros) == 0, erros def gerar_amostra_ids(oracle: OracleClient, tamanho: int = 100) -> List[int]: ids = [] top_n = min(tamanho // 3, 50) query_top = f""" SELECT ID_PESSOA FROM TB_RANKING_CONSULTOR ORDER BY POSICAO FETCH FIRST {top_n} ROWS ONLY """ ids.extend([int(r["ID_PESSOA"]) for r in oracle.executar_query(query_top)]) query_total = "SELECT COUNT(*) as total FROM TB_RANKING_CONSULTOR" total = int(oracle.executar_query(query_total)[0]["TOTAL"]) meio_n = min(tamanho // 3, 30) meio_offset = total // 2 query_meio = f""" SELECT ID_PESSOA FROM TB_RANKING_CONSULTOR ORDER BY POSICAO OFFSET {meio_offset} ROWS FETCH NEXT {meio_n} ROWS ONLY """ ids.extend([int(r["ID_PESSOA"]) for r in oracle.executar_query(query_meio)]) cauda_n = min(tamanho // 3, 20) query_cauda = f""" SELECT ID_PESSOA FROM ( SELECT ID_PESSOA FROM TB_RANKING_CONSULTOR ORDER BY POSICAO DESC FETCH FIRST {cauda_n} ROWS ONLY ) """ ids.extend([int(r["ID_PESSOA"]) for r in oracle.executar_query(query_cauda)]) return list(set(ids)) def exportar_csv(resultados: List[ResultadoValidacao], caminho: Path): campos = [ "id_pessoa", "nome", "posicao_db", "pontuacao_db", "pontuacao_calc", "delta", "bloco_a_db", "bloco_a_calc", "bloco_c_db", "bloco_c_calc", "bloco_d_db", "bloco_d_calc", "status", "observacao" ] with caminho.open("w", newline="", encoding="utf-8") as f: writer = csv.DictWriter(f, fieldnames=campos) writer.writeheader() for r in resultados: writer.writerow({ "id_pessoa": r.id_pessoa, "nome": r.nome, "posicao_db": r.posicao_db, "pontuacao_db": r.pontuacao_db, "pontuacao_calc": r.pontuacao_calc, "delta": r.delta, "bloco_a_db": r.bloco_a_db, "bloco_a_calc": r.bloco_a_calc, "bloco_c_db": r.bloco_c_db, "bloco_c_calc": r.bloco_c_calc, "bloco_d_db": r.bloco_d_db, "bloco_d_calc": r.bloco_d_calc, "status": r.status, "observacao": r.observacao, }) async def main(): parser = argparse.ArgumentParser(description="Validação do Ranking de Consultores CAPES") parser.add_argument("--amostra", type=int, default=100, help="Tamanho da amostra para validação") parser.add_argument("--top", type=int, default=0, help="Validar apenas top N") parser.add_argument("--id", type=str, default="", help="IDs específicos (separados por vírgula)") args = parser.parse_args() print("=" * 70) print("VALIDAÇÃO DO RANKING DE CONSULTORES CAPES") print(f"Data: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print("=" * 70) print("\n[1/4] Verificando critérios do código...") criterios_ok, criterios_erros = verificar_criterios_codigo() if criterios_ok: print("✓ Critérios do código estão de acordo com a especificação") else: print(f"✗ {len(criterios_erros)} divergência(s) nos critérios:") for erro in criterios_erros[:10]: print(f" - {erro}") if len(criterios_erros) > 10: print(f" ... e mais {len(criterios_erros) - 10}") print("\n[2/4] Conectando aos bancos de dados...") oracle = conectar_oracle_local() print("✓ Oracle Local conectado") es_client = ElasticsearchClient( url=os.getenv("ES_URL", "http://localhost:9200"), index=os.getenv("ES_INDEX", "atuacapes"), user=os.getenv("ES_USER", ""), password=os.getenv("ES_PASSWORD", ""), ) await es_client.connect() print("✓ Elasticsearch conectado") print("\n[3/4] Verificando integridade da ordenação...") ordem_ok, ordem_erros = verificar_ordenacao(oracle) if ordem_ok: print("✓ Ordenação por pontuação está correta") else: print(f"✗ Problemas na ordenação:") for erro in ordem_erros: print(f" - {erro}") dup_ok, dup_erros = verificar_posicoes_duplicadas(oracle) if dup_ok: print("✓ Não há posições duplicadas") else: print(f"✗ Posições duplicadas encontradas:") for erro in dup_erros: print(f" - {erro}") gaps_ok, gaps_erros = verificar_gaps_posicao(oracle) if gaps_ok: print("✓ Não há gaps nas posições") else: print(f"✗ Gaps encontrados:") for erro in gaps_erros: print(f" - {erro}") print("\n[4/5] Validando selos (top 50)...") selos_ok, selos_erros = await validar_selos(oracle, es_client, limite=50) if selos_ok: print("✓ Selos validados com sucesso") else: print(f"✗ {len(selos_erros)} divergência(s) nos selos:") for erro in selos_erros[:5]: print(f" - {erro}") print("\n[5/5] Validando pontuação da amostra...") if args.id: ids = [int(x.strip()) for x in args.id.split(",")] print(f"Validando IDs específicos: {ids}") elif args.top > 0: query = f""" SELECT ID_PESSOA FROM TB_RANKING_CONSULTOR ORDER BY POSICAO FETCH FIRST {args.top} ROWS ONLY """ ids = [int(r["ID_PESSOA"]) for r in oracle.executar_query(query)] print(f"Validando top {args.top}") else: ids = gerar_amostra_ids(oracle, args.amostra) print(f"Amostra gerada: {len(ids)} consultores (top, meio, cauda)") resultados = await validar_pontuacao_amostra(oracle, es_client, ids) ok_count = sum(1 for r in resultados if r.status == "OK") divergente_count = sum(1 for r in resultados if r.status == "DIVERGENTE") erro_count = sum(1 for r in resultados if r.status == "ERRO") print(f"\nResultados da validação de pontuação:") print(f" ✓ OK: {ok_count}") print(f" ✗ Divergentes: {divergente_count}") print(f" ⚠ Erros: {erro_count}") if divergente_count > 0: print(f"\nDivergências encontradas:") for r in [x for x in resultados if x.status == "DIVERGENTE"][:10]: print(f" #{r.posicao_db} {r.nome[:30]:30} | DB: {r.pontuacao_db:.0f} → Calc: {r.pontuacao_calc:.0f} ({r.delta:+.0f})") if r.bloco_a_db != r.bloco_a_calc: print(f" Bloco A: {r.bloco_a_db:.0f} → {r.bloco_a_calc:.0f}") if r.bloco_c_db != r.bloco_c_calc: print(f" Bloco C: {r.bloco_c_db:.0f} → {r.bloco_c_calc:.0f}") if r.bloco_d_db != r.bloco_d_calc: print(f" Bloco D: {r.bloco_d_db:.0f} → {r.bloco_d_calc:.0f}") logs_dir = ROOT_DIR / "logs" logs_dir.mkdir(exist_ok=True) csv_path = logs_dir / "validacao_ranking.csv" exportar_csv(resultados, csv_path) print(f"\nCSV exportado: {csv_path}") print("\n" + "=" * 70) print("RESUMO DA VALIDAÇÃO") print("=" * 70) tudo_ok = criterios_ok and ordem_ok and dup_ok and gaps_ok and selos_ok and divergente_count == 0 if tudo_ok: print("✓ RANKING VÁLIDO - Todas as verificações passaram") else: print("✗ RANKING COM PROBLEMAS:") if not criterios_ok: print(f" - {len(criterios_erros)} divergência(s) nos critérios") if not ordem_ok: print(f" - Problemas na ordenação") if not dup_ok: print(f" - Posições duplicadas") if not gaps_ok: print(f" - Gaps nas posições") if not selos_ok: print(f" - {len(selos_erros)} divergência(s) nos selos") if divergente_count > 0: print(f" - {divergente_count} pontuação(ões) divergente(s)") await es_client.close() return 0 if tudo_ok else 1 if __name__ == "__main__": exit_code = asyncio.run(main()) sys.exit(exit_code)