Files
ranking/backend/src/application/jobs/processar_ranking.py
Frederico Castro f91651056a Refatoracao de qualidade de codigo
- Mover logica de routes.py para RankingMapper na camada de aplicacao
- Consolidar funcoes mesclar_periodos e anos_completos_periodos em periodo.py
- Extrair RankingCache para modulo separado em infrastructure/cache
- Substituir todos os print() por logging adequado
- Corrigir exception handlers genericos para tipos especificos
- Remover classe Atuacao e atributo atuacoes_raw nao utilizados
- Documentar status dos scripts utilitarios
2025-12-14 21:47:00 -03:00

197 lines
7.3 KiB
Python

import json
import logging
from datetime import datetime
from typing import Optional, Dict, Any
from ...infrastructure.elasticsearch.client import ElasticsearchClient
logger = logging.getLogger(__name__)
from ...infrastructure.oracle.client import OracleClient
from ...infrastructure.oracle.ranking_repository import RankingOracleRepository
from ...infrastructure.repositories.consultor_repository_impl import ConsultorRepositoryImpl
from ...domain.services.calculador_pontuacao import CalculadorPontuacao
from .job_status import job_status
class ProcessarRankingJob:
def __init__(
self,
es_client: ElasticsearchClient,
oracle_remote_client: OracleClient,
oracle_local_client: OracleClient,
ranking_repo: RankingOracleRepository,
):
self.es_client = es_client
self.oracle_remote_client = oracle_remote_client
self.oracle_local_client = oracle_local_client
self.ranking_repo = ranking_repo
self.consultor_repo = ConsultorRepositoryImpl(es_client, oracle_client=None)
self.calculador = CalculadorPontuacao()
async def executar(self, limpar_antes: bool = True) -> Dict[str, Any]:
if job_status.is_running:
raise RuntimeError("Job já está em execução")
try:
total = await self.es_client.contar_com_atuacoes()
job_status.iniciar(total_esperado=total)
if limpar_antes:
job_status.mensagem = "Limpando tabela de ranking..."
self.ranking_repo.limpar_tabela()
job_status.mensagem = "Iniciando processamento via Scroll API..."
resultado = await self.es_client.buscar_todos_consultores(
callback=self._processar_batch,
batch_size=5000
)
job_status.mensagem = "Atualizando posições no ranking..."
self.ranking_repo.atualizar_posicoes()
estatisticas = self.ranking_repo.obter_estatisticas()
job_status.finalizar(sucesso=True)
return {
"sucesso": True,
"total_processados": resultado["processados"],
"total_batches": resultado["batches"],
"tempo_decorrido": job_status.tempo_decorrido,
"estatisticas": estatisticas
}
except Exception as e:
job_status.finalizar(sucesso=False, erro=str(e))
raise RuntimeError(f"Erro ao processar ranking: {e}")
async def _processar_batch(self, docs: list, progress: dict) -> None:
consultores_para_inserir = []
for doc in docs:
try:
consultor = await self.consultor_repo._construir_consultor(doc)
consultor_dict = {
"id_pessoa": consultor.id_pessoa,
"nome": consultor.nome,
"pontuacao_total": consultor.pontuacao_total,
"componente_a": consultor.pontuacao_bloco_a,
"componente_b": 0,
"componente_c": consultor.pontuacao_bloco_c,
"componente_d": consultor.pontuacao_bloco_d,
"ativo": consultor.ativo,
"anos_atuacao": consultor.anos_atuacao,
"detalhes": self._gerar_json_detalhes(consultor)
}
consultores_para_inserir.append(consultor_dict)
except Exception as e:
import traceback
logger.warning(f"Erro ao processar consultor {doc.get('id')}: {e}")
logger.debug(f"Traceback: {traceback.format_exc()}")
continue
if consultores_para_inserir:
self.ranking_repo.inserir_batch(consultores_para_inserir)
job_status.atualizar_progresso(
processados=progress["processados"],
batch_atual=progress["batch_atual"],
mensagem=f"Processando batch {progress['batch_atual']} ({progress['percentual']}%)"
)
def _gerar_json_detalhes(self, consultor) -> dict:
return {
"id_pessoa": consultor.id_pessoa,
"nome": consultor.nome,
"cpf": consultor.cpf,
"coordenacoes_capes": [
{
"codigo": c.codigo,
"tipo": c.tipo,
"area_avaliacao": c.area_avaliacao,
"inicio": c.periodo.inicio.isoformat() if c.periodo.inicio else None,
"fim": c.periodo.fim.isoformat() if c.periodo.fim else None,
"ativo": c.periodo.ativo
}
for c in consultor.coordenacoes_capes
],
"consultoria": {
"codigo": consultor.consultoria.codigo,
"situacao": consultor.consultoria.situacao,
"inicio": consultor.consultoria.periodo.inicio.isoformat() if consultor.consultoria.periodo.inicio else None,
"fim": consultor.consultoria.periodo.fim.isoformat() if consultor.consultoria.periodo.fim else None,
"areas": consultor.consultoria.areas,
"anos_consecutivos": consultor.consultoria.anos_consecutivos,
"retornos": consultor.consultoria.retornos
} if consultor.consultoria else None,
"inscricoes": [
{
"codigo": i.codigo,
"tipo": i.tipo,
"premio": i.premio,
"ano": i.ano,
"situacao": i.situacao
}
for i in consultor.inscricoes
],
"avaliacoes_comissao": [
{
"codigo": a.codigo,
"tipo": a.tipo,
"premio": a.premio,
"ano": a.ano,
"comissao_tipo": a.comissao_tipo
}
for a in consultor.avaliacoes_comissao
],
"premiacoes": [
{
"codigo": p.codigo,
"tipo": p.tipo,
"nome_premio": p.nome_premio,
"ano": p.ano
}
for p in consultor.premiacoes
],
"bolsas_cnpq": [
{
"codigo": b.codigo,
"nivel": b.nivel,
"area": b.area
}
for b in consultor.bolsas_cnpq
],
"participacoes": [
{
"codigo": p.codigo,
"tipo": p.tipo,
"descricao": p.descricao,
"ano": p.ano
}
for p in consultor.participacoes
],
"orientacoes": [
{
"codigo": o.codigo,
"tipo": o.tipo,
"nivel": o.nivel,
"ano": o.ano
}
for o in consultor.orientacoes
],
"membros_banca": [
{
"codigo": m.codigo,
"tipo": m.tipo,
"nivel": m.nivel,
"ano": m.ano
}
for m in consultor.membros_banca
],
"pontuacao": consultor.pontuacao.to_dict() if consultor.pontuacao else None
}