Automatiza componente B e ajuste frontend do ranking
This commit is contained in:
159
backend/src/application/jobs/popular_componente_b_job.py
Normal file
159
backend/src/application/jobs/popular_componente_b_job.py
Normal file
@@ -0,0 +1,159 @@
|
||||
from datetime import datetime
|
||||
from itertools import islice
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
from ...infrastructure.oracle.client import OracleClient
|
||||
|
||||
|
||||
class PopularComponenteBJob:
|
||||
"""
|
||||
Preenche COMPONENTE_B na TB_RANKING_CONSULTOR usando PPGs do Oracle remoto.
|
||||
Usa lotes (IN) para reduzir round-trips.
|
||||
"""
|
||||
|
||||
def __init__(self, oracle_local_client: OracleClient, oracle_remote_client: OracleClient):
|
||||
self.oracle_local = oracle_local_client
|
||||
self.oracle_remote = oracle_remote_client
|
||||
|
||||
@staticmethod
|
||||
def _chunked(iterable, size):
|
||||
it = iter(iterable)
|
||||
while True:
|
||||
bloco = list(islice(it, size))
|
||||
if not bloco:
|
||||
break
|
||||
yield bloco
|
||||
|
||||
@staticmethod
|
||||
def _calcular_componente_b(coordenacoes: List[Dict]) -> int:
|
||||
if not coordenacoes:
|
||||
return 0
|
||||
|
||||
base = 70
|
||||
anos_totais = 0
|
||||
for c in coordenacoes:
|
||||
inicio = c.get("DT_INICIO_VIGENCIA")
|
||||
fim = c.get("DT_FIM_VIGENCIA")
|
||||
if not inicio:
|
||||
continue
|
||||
if fim and fim < inicio:
|
||||
fim = None
|
||||
fim_ref = fim or datetime.now()
|
||||
anos_totais += (fim_ref - inicio).days // 365
|
||||
|
||||
tempo = min(int(anos_totais * 5), 50)
|
||||
|
||||
programas_distintos = len({c.get("ID_PROGRAMA_SNPG") for c in coordenacoes})
|
||||
extras = min((programas_distintos - 1) * 20, 40) if programas_distintos > 1 else 0
|
||||
|
||||
maior_nota = 0
|
||||
for c in coordenacoes:
|
||||
nota_str = str(c.get("DS_CONCEITO") or "").strip()
|
||||
if nota_str in ("7", "6", "5", "4", "3"):
|
||||
maior_nota = max(maior_nota, int(nota_str))
|
||||
|
||||
bonus_nota = {7: 20, 6: 15, 5: 10, 4: 5, 3: 0}.get(maior_nota, 0)
|
||||
|
||||
return base + tempo + extras + bonus_nota
|
||||
|
||||
def _buscar_ids_pendentes(self) -> List[int]:
|
||||
query = "SELECT ID_PESSOA FROM TB_RANKING_CONSULTOR WHERE NVL(COMPONENTE_B,0) = 0"
|
||||
resultados = self.oracle_local.executar_query(query)
|
||||
return [int(r["ID_PESSOA"]) for r in resultados] if resultados else []
|
||||
|
||||
def _buscar_ppg_lote(self, ids: List[int]) -> List[Dict]:
|
||||
placeholders = ", ".join(f":id{i}" for i in range(len(ids)))
|
||||
params = {f"id{i}": val for i, val in enumerate(ids)}
|
||||
query = f"""
|
||||
SELECT
|
||||
c.ID_PESSOA,
|
||||
c.ID_PROGRAMA_SNPG,
|
||||
p.NM_PROGRAMA,
|
||||
p.CD_PROGRAMA_PPG,
|
||||
p.DS_CONCEITO,
|
||||
p.NM_PROGRAMA_MODALIDADE,
|
||||
aa.NM_AREA_AVALIACAO,
|
||||
c.DT_INICIO_VIGENCIA,
|
||||
c.DT_FIM_VIGENCIA
|
||||
FROM SUCUPIRA_PAINEL.VM_COORDENADOR c
|
||||
INNER JOIN SUCUPIRA_PAINEL.VM_PROGRAMA_SUCUPIRA p ON c.ID_PROGRAMA_SNPG = p.ID_PROGRAMA
|
||||
LEFT JOIN SUCUPIRA_PAINEL.VM_AREA_CONHECIMENTO ac ON p.ID_AREA_CONHECIMENTO_ATUAL = ac.ID_AREA_CONHECIMENTO
|
||||
LEFT JOIN SUCUPIRA_PAINEL.VM_AREA_AVALIACAO aa ON ac.ID_AREA_AVALIACAO = aa.ID_AREA_AVALIACAO
|
||||
WHERE c.ID_PESSOA IN ({placeholders})
|
||||
"""
|
||||
return self.oracle_remote.executar_query(query, params)
|
||||
|
||||
def executar(self, batch_ids: int = 500, batch_updates: int = 1000) -> None:
|
||||
"""
|
||||
Executa a rotina de preenchimento do Componente B.
|
||||
Este método é síncrono; use asyncio.to_thread quando chamá-lo em corrotina.
|
||||
"""
|
||||
if not self.oracle_local.is_connected:
|
||||
print("PopularComponenteB: Oracle LOCAL não conectado, abortando.")
|
||||
return
|
||||
if not self.oracle_remote.is_connected:
|
||||
print("PopularComponenteB: Oracle REMOTO não conectado, abortando.")
|
||||
return
|
||||
|
||||
ids_pessoas = self._buscar_ids_pendentes()
|
||||
total_ids = len(ids_pessoas)
|
||||
print(f"PopularComponenteB: {total_ids} consultores pendentes para COMPONENTE_B")
|
||||
|
||||
processados = 0
|
||||
com_ppg = 0
|
||||
batch = []
|
||||
|
||||
for lote in self._chunked(ids_pessoas, batch_ids):
|
||||
try:
|
||||
registros = self._buscar_ppg_lote(lote)
|
||||
except Exception as e:
|
||||
print(f"PopularComponenteB: erro ao buscar lote {lote[:3]}... -> {e}")
|
||||
continue
|
||||
|
||||
por_pessoa: Dict[int, List[Dict]] = {}
|
||||
for r in registros:
|
||||
por_pessoa.setdefault(int(r["ID_PESSOA"]), []).append(r)
|
||||
|
||||
for id_pessoa in lote:
|
||||
coordenacoes = por_pessoa.get(id_pessoa, [])
|
||||
if not coordenacoes:
|
||||
processados += 1
|
||||
continue
|
||||
|
||||
comp_b = self._calcular_componente_b(coordenacoes)
|
||||
batch.append({"comp_b": comp_b, "id_pessoa": id_pessoa})
|
||||
com_ppg += 1
|
||||
processados += 1
|
||||
|
||||
if len(batch) >= batch_updates:
|
||||
self._aplicar_batch(batch)
|
||||
print(f"PopularComponenteB: Processados {processados}/{total_ids} | Com PPG: {com_ppg}")
|
||||
batch = []
|
||||
|
||||
if batch:
|
||||
self._aplicar_batch(batch)
|
||||
|
||||
self._atualizar_posicoes()
|
||||
print(f"PopularComponenteB: Finalizado. Processados={processados} Com PPG={com_ppg}")
|
||||
|
||||
def _aplicar_batch(self, batch: List[Dict[str, int]]) -> None:
|
||||
if not batch:
|
||||
return
|
||||
update_sql = """
|
||||
UPDATE TB_RANKING_CONSULTOR
|
||||
SET COMPONENTE_B = :comp_b,
|
||||
PONTUACAO_TOTAL = COMPONENTE_A + :comp_b + COMPONENTE_C + COMPONENTE_D
|
||||
WHERE ID_PESSOA = :id_pessoa
|
||||
"""
|
||||
with self.oracle_local.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.executemany(update_sql, batch)
|
||||
conn.commit()
|
||||
cursor.close()
|
||||
|
||||
def _atualizar_posicoes(self) -> None:
|
||||
with self.oracle_local.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.callproc("SP_ATUALIZAR_POSICOES")
|
||||
conn.commit()
|
||||
cursor.close()
|
||||
@@ -3,11 +3,13 @@ from datetime import datetime, time, timedelta
|
||||
from typing import Optional
|
||||
|
||||
from .processar_ranking import ProcessarRankingJob
|
||||
from .popular_componente_b_job import PopularComponenteBJob
|
||||
|
||||
|
||||
class RankingScheduler:
|
||||
def __init__(self, job: ProcessarRankingJob):
|
||||
def __init__(self, job: ProcessarRankingJob, job_componente_b: PopularComponenteBJob | None = None):
|
||||
self.job = job
|
||||
self.job_componente_b = job_componente_b
|
||||
self.task: Optional[asyncio.Task] = None
|
||||
self.running = False
|
||||
|
||||
@@ -40,6 +42,10 @@ class RankingScheduler:
|
||||
print(f"[{datetime.now().strftime('%d/%m/%Y %H:%M:%S')}] Executando job de ranking automático")
|
||||
await self.job.executar(limpar_antes=True)
|
||||
|
||||
if self.job_componente_b:
|
||||
print(f"[{datetime.now().strftime('%d/%m/%Y %H:%M:%S')}] Executando popular_componente_b após ranking")
|
||||
await asyncio.to_thread(self.job_componente_b.executar)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
print("Scheduler cancelado")
|
||||
break
|
||||
|
||||
@@ -4,7 +4,13 @@ from contextlib import asynccontextmanager
|
||||
|
||||
from .routes import router
|
||||
from .config import settings
|
||||
from .dependencies import es_client, oracle_local_client, oracle_remote_client, get_processar_job
|
||||
from .dependencies import (
|
||||
es_client,
|
||||
oracle_local_client,
|
||||
oracle_remote_client,
|
||||
get_processar_job,
|
||||
get_popular_componente_b_job,
|
||||
)
|
||||
from ...application.jobs.scheduler import RankingScheduler
|
||||
|
||||
|
||||
@@ -29,7 +35,8 @@ async def lifespan(app: FastAPI):
|
||||
scheduler = None
|
||||
try:
|
||||
job = get_processar_job()
|
||||
scheduler = RankingScheduler(job)
|
||||
job_b = get_popular_componente_b_job()
|
||||
scheduler = RankingScheduler(job, job_componente_b=job_b)
|
||||
await scheduler.iniciar()
|
||||
except Exception as e:
|
||||
print(f"AVISO: Scheduler não iniciou: {e}")
|
||||
|
||||
@@ -3,6 +3,7 @@ from ...infrastructure.oracle.client import OracleClient
|
||||
from ...infrastructure.oracle.ranking_repository import RankingOracleRepository
|
||||
from ...infrastructure.repositories.consultor_repository_impl import ConsultorRepositoryImpl
|
||||
from ...application.jobs.processar_ranking import ProcessarRankingJob
|
||||
from ...application.jobs.popular_componente_b_job import PopularComponenteBJob
|
||||
from .config import settings
|
||||
|
||||
|
||||
@@ -30,6 +31,7 @@ oracle_remote_client = OracleClient(
|
||||
_repository: ConsultorRepositoryImpl = None
|
||||
_ranking_repository: RankingOracleRepository = None
|
||||
_processar_job: ProcessarRankingJob = None
|
||||
_popular_b_job: PopularComponenteBJob = None
|
||||
|
||||
|
||||
def get_repository() -> ConsultorRepositoryImpl:
|
||||
@@ -56,3 +58,13 @@ def get_processar_job() -> ProcessarRankingJob:
|
||||
ranking_repo=get_ranking_repository()
|
||||
)
|
||||
return _processar_job
|
||||
|
||||
|
||||
def get_popular_componente_b_job() -> PopularComponenteBJob:
|
||||
global _popular_b_job
|
||||
if _popular_b_job is None:
|
||||
_popular_b_job = PopularComponenteBJob(
|
||||
oracle_local_client=oracle_local_client,
|
||||
oracle_remote_client=oracle_remote_client
|
||||
)
|
||||
return _popular_b_job
|
||||
|
||||
@@ -20,6 +20,7 @@ from ..schemas.ranking_schema import (
|
||||
)
|
||||
from .dependencies import get_repository, get_ranking_repository, get_processar_job
|
||||
from ...application.jobs.job_status import job_status
|
||||
import json
|
||||
|
||||
router = APIRouter(prefix="/api/v1", tags=["ranking"])
|
||||
|
||||
@@ -102,19 +103,7 @@ async def ranking_paginado(
|
||||
total_pages = (total + size - 1) // size
|
||||
|
||||
consultores_schema = [
|
||||
ConsultorRankingResumoSchema(
|
||||
id_pessoa=c.id_pessoa,
|
||||
nome=c.nome,
|
||||
posicao=c.posicao,
|
||||
pontuacao_total=c.pontuacao_total,
|
||||
componente_a=c.componente_a,
|
||||
componente_b=c.componente_b,
|
||||
componente_c=c.componente_c,
|
||||
componente_d=c.componente_d,
|
||||
ativo=c.ativo,
|
||||
anos_atuacao=c.anos_atuacao
|
||||
)
|
||||
for c in consultores
|
||||
_consultor_resumo_from_ranking(c) for c in consultores
|
||||
]
|
||||
|
||||
return RankingPaginadoResponseSchema(
|
||||
@@ -126,6 +115,29 @@ async def ranking_paginado(
|
||||
)
|
||||
|
||||
|
||||
def _consultor_resumo_from_ranking(c):
|
||||
consultoria = None
|
||||
try:
|
||||
jd = json.loads(c.json_detalhes) if c.json_detalhes else {}
|
||||
consultoria = jd.get("consultoria") if isinstance(jd, dict) else None
|
||||
except Exception:
|
||||
consultoria = None
|
||||
|
||||
return ConsultorRankingResumoSchema(
|
||||
id_pessoa=c.id_pessoa,
|
||||
nome=c.nome,
|
||||
posicao=c.posicao,
|
||||
pontuacao_total=c.pontuacao_total,
|
||||
componente_a=c.componente_a,
|
||||
componente_b=c.componente_b,
|
||||
componente_c=c.componente_c,
|
||||
componente_d=c.componente_d,
|
||||
ativo=c.ativo,
|
||||
anos_atuacao=c.anos_atuacao,
|
||||
consultoria=consultoria,
|
||||
)
|
||||
|
||||
|
||||
@router.get("/ranking/estatisticas", response_model=EstatisticasRankingSchema)
|
||||
async def ranking_estatisticas(
|
||||
ranking_repo = Depends(get_ranking_repository),
|
||||
|
||||
@@ -14,6 +14,7 @@ class ConsultorRankingResumoSchema(BaseModel):
|
||||
componente_d: float
|
||||
ativo: bool
|
||||
anos_atuacao: float
|
||||
consultoria: Optional[dict] = None
|
||||
|
||||
|
||||
class RankingPaginadoResponseSchema(BaseModel):
|
||||
|
||||
Reference in New Issue
Block a user