feat(backend): ranking 100% Elasticsearch e critérios do PDF
This commit is contained in:
@@ -1,15 +1,11 @@
|
||||
import json
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from typing import Optional, Dict, Any
|
||||
from typing import Dict, Any, List
|
||||
|
||||
from ...infrastructure.elasticsearch.client import ElasticsearchClient
|
||||
from ...infrastructure.ranking_store import RankingEntry, RankingStore
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
from ...infrastructure.oracle.client import OracleClient
|
||||
from ...infrastructure.oracle.ranking_repository import RankingOracleRepository
|
||||
from ...infrastructure.repositories.consultor_repository_impl import ConsultorRepositoryImpl
|
||||
from ...domain.services.calculador_pontuacao import CalculadorPontuacao
|
||||
from .job_status import job_status
|
||||
|
||||
|
||||
@@ -17,16 +13,12 @@ class ProcessarRankingJob:
|
||||
def __init__(
|
||||
self,
|
||||
es_client: ElasticsearchClient,
|
||||
oracle_remote_client: OracleClient,
|
||||
oracle_local_client: OracleClient,
|
||||
ranking_repo: RankingOracleRepository,
|
||||
ranking_store: RankingStore,
|
||||
):
|
||||
self.es_client = es_client
|
||||
self.oracle_remote_client = oracle_remote_client
|
||||
self.oracle_local_client = oracle_local_client
|
||||
self.ranking_repo = ranking_repo
|
||||
self.ranking_store = ranking_store
|
||||
self.consultor_repo = ConsultorRepositoryImpl(es_client, oracle_client=None)
|
||||
self.calculador = CalculadorPontuacao()
|
||||
self._consultores: List[dict] = []
|
||||
|
||||
async def executar(self, limpar_antes: bool = True) -> Dict[str, Any]:
|
||||
if job_status.is_running:
|
||||
@@ -36,28 +28,26 @@ class ProcessarRankingJob:
|
||||
total = await self.es_client.contar_com_atuacoes()
|
||||
job_status.iniciar(total_esperado=total)
|
||||
|
||||
if limpar_antes:
|
||||
job_status.mensagem = "Limpando tabela de ranking..."
|
||||
self.ranking_repo.limpar_tabela()
|
||||
|
||||
job_status.mensagem = "Iniciando processamento via Scroll API..."
|
||||
self._consultores = []
|
||||
job_status.mensagem = "Iniciando processamento do ranking via Scroll API (Elasticsearch)..."
|
||||
|
||||
resultado = await self.es_client.buscar_todos_consultores(
|
||||
callback=self._processar_batch,
|
||||
batch_size=5000
|
||||
)
|
||||
|
||||
job_status.mensagem = "Atualizando posições no ranking..."
|
||||
self.ranking_repo.atualizar_posicoes()
|
||||
job_status.mensagem = "Ordenando e gerando posições..."
|
||||
entries = self._gerar_entries_ordenadas(self._consultores)
|
||||
await self.ranking_store.set_entries(entries)
|
||||
|
||||
estatisticas = self.ranking_repo.obter_estatisticas()
|
||||
estatisticas = self._obter_estatisticas(entries)
|
||||
|
||||
job_status.finalizar(sucesso=True)
|
||||
|
||||
return {
|
||||
"sucesso": True,
|
||||
"total_processados": resultado["processados"],
|
||||
"total_batches": resultado["batches"],
|
||||
"total_processados": resultado.get("processados", len(entries)),
|
||||
"total_batches": resultado.get("batches", 0),
|
||||
"tempo_decorrido": job_status.tempo_decorrido,
|
||||
"estatisticas": estatisticas
|
||||
}
|
||||
@@ -67,26 +57,11 @@ class ProcessarRankingJob:
|
||||
raise RuntimeError(f"Erro ao processar ranking: {e}")
|
||||
|
||||
async def _processar_batch(self, docs: list, progress: dict) -> None:
|
||||
consultores_para_inserir = []
|
||||
|
||||
for doc in docs:
|
||||
try:
|
||||
consultor = await self.consultor_repo._construir_consultor(doc)
|
||||
|
||||
consultor_dict = {
|
||||
"id_pessoa": consultor.id_pessoa,
|
||||
"nome": consultor.nome,
|
||||
"pontuacao_total": consultor.pontuacao_total,
|
||||
"componente_a": consultor.pontuacao_bloco_a,
|
||||
"componente_b": 0,
|
||||
"componente_c": consultor.pontuacao_bloco_c,
|
||||
"componente_d": consultor.pontuacao_bloco_d,
|
||||
"ativo": consultor.ativo,
|
||||
"anos_atuacao": consultor.anos_atuacao,
|
||||
"detalhes": self._gerar_json_detalhes(consultor)
|
||||
}
|
||||
|
||||
consultores_para_inserir.append(consultor_dict)
|
||||
self._consultores.append(self._gerar_json_detalhes(consultor))
|
||||
|
||||
except Exception as e:
|
||||
import traceback
|
||||
@@ -94,9 +69,6 @@ class ProcessarRankingJob:
|
||||
logger.debug(f"Traceback: {traceback.format_exc()}")
|
||||
continue
|
||||
|
||||
if consultores_para_inserir:
|
||||
self.ranking_repo.inserir_batch(consultores_para_inserir)
|
||||
|
||||
job_status.atualizar_progresso(
|
||||
processados=progress["processados"],
|
||||
batch_atual=progress["batch_atual"],
|
||||
@@ -104,10 +76,29 @@ class ProcessarRankingJob:
|
||||
)
|
||||
|
||||
def _gerar_json_detalhes(self, consultor) -> dict:
|
||||
bloco_b = 0 # reservado no V1 (dados incompletos)
|
||||
pontuacao = consultor.pontuacao.to_dict() if consultor.pontuacao else None
|
||||
if isinstance(pontuacao, dict):
|
||||
pontuacao = dict(pontuacao)
|
||||
pontuacao["bloco_b"] = {"bloco": "B", "total": bloco_b, "atuacoes": []}
|
||||
pontuacao["pontuacao_total"] = (
|
||||
pontuacao.get("pontuacao_total", 0) + bloco_b
|
||||
if isinstance(pontuacao.get("pontuacao_total"), (int, float))
|
||||
else consultor.pontuacao_total + bloco_b
|
||||
)
|
||||
|
||||
return {
|
||||
"id_pessoa": consultor.id_pessoa,
|
||||
"nome": consultor.nome,
|
||||
"cpf": consultor.cpf,
|
||||
"posicao": None,
|
||||
"pontuacao_total": consultor.pontuacao_total + bloco_b,
|
||||
"bloco_a": consultor.pontuacao_bloco_a,
|
||||
"bloco_b": bloco_b,
|
||||
"bloco_c": consultor.pontuacao_bloco_c,
|
||||
"bloco_d": consultor.pontuacao_bloco_d,
|
||||
"ativo": consultor.ativo,
|
||||
"anos_atuacao": consultor.anos_atuacao,
|
||||
"coordenador_ppg": consultor.coordenador_ppg,
|
||||
"coordenacoes_capes": [
|
||||
{
|
||||
"codigo": c.codigo,
|
||||
@@ -115,7 +106,8 @@ class ProcessarRankingJob:
|
||||
"area_avaliacao": c.area_avaliacao,
|
||||
"inicio": c.periodo.inicio.isoformat() if c.periodo.inicio else None,
|
||||
"fim": c.periodo.fim.isoformat() if c.periodo.fim else None,
|
||||
"ativo": c.periodo.ativo
|
||||
"ativo": c.periodo.ativo,
|
||||
"presidente": c.presidente,
|
||||
}
|
||||
for c in consultor.coordenacoes_capes
|
||||
],
|
||||
@@ -153,7 +145,8 @@ class ProcessarRankingJob:
|
||||
"codigo": p.codigo,
|
||||
"tipo": p.tipo,
|
||||
"nome_premio": p.nome_premio,
|
||||
"ano": p.ano
|
||||
"ano": p.ano,
|
||||
"papel": p.papel,
|
||||
}
|
||||
for p in consultor.premiacoes
|
||||
],
|
||||
@@ -179,7 +172,10 @@ class ProcessarRankingJob:
|
||||
"codigo": o.codigo,
|
||||
"tipo": o.tipo,
|
||||
"nivel": o.nivel,
|
||||
"ano": o.ano
|
||||
"ano": o.ano,
|
||||
"coorientacao": o.coorientacao,
|
||||
"premiada": o.premiada,
|
||||
"premiacao_tipo": o.premiacao_tipo,
|
||||
}
|
||||
for o in consultor.orientacoes
|
||||
],
|
||||
@@ -192,5 +188,68 @@ class ProcessarRankingJob:
|
||||
}
|
||||
for m in consultor.membros_banca
|
||||
],
|
||||
"pontuacao": consultor.pontuacao.to_dict() if consultor.pontuacao else None
|
||||
"pontuacao": pontuacao,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _gerar_entries_ordenadas(consultores: List[dict]) -> List[RankingEntry]:
|
||||
consultores_ordenados = sorted(
|
||||
consultores,
|
||||
key=lambda c: (int(c.get("pontuacao_total", 0)), -int(c.get("id_pessoa", 0))),
|
||||
reverse=True,
|
||||
)
|
||||
entries: List[RankingEntry] = []
|
||||
for idx, c in enumerate(consultores_ordenados, start=1):
|
||||
c["posicao"] = idx
|
||||
entries.append(
|
||||
RankingEntry(
|
||||
id_pessoa=int(c["id_pessoa"]),
|
||||
nome=str(c.get("nome", "")),
|
||||
posicao=idx,
|
||||
pontuacao_total=int(c.get("pontuacao_total", 0)),
|
||||
bloco_a=int(c.get("bloco_a", 0)),
|
||||
bloco_b=int(c.get("bloco_b", 0)),
|
||||
bloco_c=int(c.get("bloco_c", 0)),
|
||||
bloco_d=int(c.get("bloco_d", 0)),
|
||||
ativo=bool(c.get("ativo", False)),
|
||||
anos_atuacao=float(c.get("anos_atuacao", 0) or 0),
|
||||
detalhes=c,
|
||||
)
|
||||
)
|
||||
return entries
|
||||
|
||||
@staticmethod
|
||||
def _obter_estatisticas(entries: List[RankingEntry]) -> Dict[str, Any]:
|
||||
if not entries:
|
||||
return {
|
||||
"total_consultores": 0,
|
||||
"total_ativos": 0,
|
||||
"total_inativos": 0,
|
||||
"ultima_atualizacao": None,
|
||||
"pontuacao_media": 0,
|
||||
"pontuacao_maxima": 0,
|
||||
"pontuacao_minima": 0,
|
||||
"media_componentes": {"a": 0, "b": 0, "c": 0, "d": 0},
|
||||
}
|
||||
|
||||
total = len(entries)
|
||||
ativos = sum(1 for e in entries if e.ativo)
|
||||
inativos = total - ativos
|
||||
totais = [e.pontuacao_total for e in entries]
|
||||
media_total = sum(totais) / total if total else 0
|
||||
|
||||
return {
|
||||
"total_consultores": total,
|
||||
"total_ativos": ativos,
|
||||
"total_inativos": inativos,
|
||||
"ultima_atualizacao": None,
|
||||
"pontuacao_media": float(round(media_total, 2)),
|
||||
"pontuacao_maxima": float(max(totais) if totais else 0),
|
||||
"pontuacao_minima": float(min(totais) if totais else 0),
|
||||
"media_componentes": {
|
||||
"a": float(round(sum(e.bloco_a for e in entries) / total, 2)),
|
||||
"b": float(round(sum(e.bloco_b for e in entries) / total, 2)),
|
||||
"c": float(round(sum(e.bloco_c for e in entries) / total, 2)),
|
||||
"d": float(round(sum(e.bloco_d for e in entries) / total, 2)),
|
||||
},
|
||||
}
|
||||
|
||||
@@ -4,15 +4,13 @@ from datetime import datetime, time, timedelta
|
||||
from typing import Optional
|
||||
|
||||
from .processar_ranking import ProcessarRankingJob
|
||||
from .popular_componente_b_job import PopularComponenteBJob
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RankingScheduler:
|
||||
def __init__(self, job: ProcessarRankingJob, job_componente_b: PopularComponenteBJob | None = None):
|
||||
def __init__(self, job: ProcessarRankingJob):
|
||||
self.job = job
|
||||
self.job_componente_b = job_componente_b
|
||||
self.task: Optional[asyncio.Task] = None
|
||||
self.running = False
|
||||
|
||||
@@ -45,10 +43,6 @@ class RankingScheduler:
|
||||
logger.info("Executando job de ranking automático")
|
||||
await self.job.executar(limpar_antes=True)
|
||||
|
||||
if self.job_componente_b:
|
||||
logger.info("Executando popular_componente_b após ranking")
|
||||
await asyncio.to_thread(self.job_componente_b.executar)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info("Scheduler cancelado")
|
||||
break
|
||||
|
||||
Reference in New Issue
Block a user