From 3ea6a4409eefefe00df598bc13b0014ac4bd8f68 Mon Sep 17 00:00:00 2001 From: Frederico Castro Date: Wed, 10 Dec 2025 01:33:00 -0300 Subject: [PATCH] feat: Implementa job de ranking para 300k consultores MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Backend: - Adiciona Scroll API no cliente Elasticsearch para processar todos os 300k+ consultores - Cria tabela TB_RANKING_CONSULTOR no Oracle para ranking pré-calculado - Implementa job de processamento com APScheduler (diário às 3h) - Adiciona endpoints: /ranking/paginado, /ranking/status, /ranking/processar, /ranking/estatisticas - Repository Oracle com paginação eficiente via ROW_NUMBER - Status do job com progresso em tempo real (polling) - Leitura automática de LOBs no OracleClient Frontend: - Componente RankingPaginado com paginação completa - Barra de progresso do job em tempo real - Botão para reprocessar ranking - Alternância entre Top N (rápido) e Ranking Completo (300k) Infraestrutura: - Docker compose com depends_on para garantir Oracle disponível - Schema SQL com procedure SP_ATUALIZAR_POSICOES - Índices otimizados para paginação --- backend/requirements.txt | 1 + backend/sql/schema_ranking.sql | 109 ++++++ backend/src/application/jobs/__init__.py | 0 backend/src/application/jobs/job_status.py | 87 +++++ .../src/application/jobs/processar_ranking.py | 162 +++++++++ backend/src/application/jobs/scheduler.py | 50 +++ .../src/domain/entities/consultor_ranking.py | 19 + .../infrastructure/elasticsearch/client.py | 126 +++++++ backend/src/infrastructure/oracle/client.py | 13 +- .../oracle/ranking_repository.py | 341 ++++++++++++++++++ backend/src/interface/api/app.py | 20 +- backend/src/interface/api/dependencies.py | 22 ++ backend/src/interface/api/routes.py | 98 ++++- .../src/interface/schemas/ranking_schema.py | 61 ++++ docker-compose.yml | 29 ++ frontend/src/App.css | 30 ++ frontend/src/App.jsx | 55 ++- frontend/src/components/RankingPaginado.css | 197 ++++++++++ frontend/src/components/RankingPaginado.jsx | 196 ++++++++++ 19 files changed, 1596 insertions(+), 20 deletions(-) create mode 100644 backend/sql/schema_ranking.sql create mode 100644 backend/src/application/jobs/__init__.py create mode 100644 backend/src/application/jobs/job_status.py create mode 100644 backend/src/application/jobs/processar_ranking.py create mode 100644 backend/src/application/jobs/scheduler.py create mode 100644 backend/src/domain/entities/consultor_ranking.py create mode 100644 backend/src/infrastructure/oracle/ranking_repository.py create mode 100644 backend/src/interface/schemas/ranking_schema.py create mode 100644 frontend/src/components/RankingPaginado.css create mode 100644 frontend/src/components/RankingPaginado.jsx diff --git a/backend/requirements.txt b/backend/requirements.txt index ca29160..ae680ec 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -7,3 +7,4 @@ python-dateutil==2.8.2 httpx==0.26.0 python-dotenv==1.0.0 aiohttp==3.9.1 +apscheduler==3.10.4 diff --git a/backend/sql/schema_ranking.sql b/backend/sql/schema_ranking.sql new file mode 100644 index 0000000..87d54a2 --- /dev/null +++ b/backend/sql/schema_ranking.sql @@ -0,0 +1,109 @@ +-- Schema para Ranking de Consultores CAPES +-- Versão: 1.0 +-- Data: 2025-01-15 + +-- Tabela principal de ranking +CREATE TABLE TB_RANKING_CONSULTOR ( + ID_PESSOA NUMBER(10) NOT NULL, + NOME VARCHAR2(200) NOT NULL, + POSICAO NUMBER(10), + PONTUACAO_TOTAL NUMBER(10,2) NOT NULL, + COMPONENTE_A NUMBER(10,2) DEFAULT 0, + COMPONENTE_B NUMBER(10,2) DEFAULT 0, + COMPONENTE_C NUMBER(10,2) DEFAULT 0, + COMPONENTE_D NUMBER(10,2) DEFAULT 0, + ATIVO CHAR(1) DEFAULT 'N', + ANOS_ATUACAO NUMBER(5,1) DEFAULT 0, + DT_CALCULO TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + JSON_DETALHES CLOB, + CONSTRAINT PK_RANKING_CONSULTOR PRIMARY KEY (ID_PESSOA), + CONSTRAINT CHK_ATIVO CHECK (ATIVO IN ('S', 'N')) +); + +-- Índices para performance +CREATE INDEX IDX_RANKING_POSICAO ON TB_RANKING_CONSULTOR(POSICAO); +CREATE INDEX IDX_RANKING_PONTUACAO ON TB_RANKING_CONSULTOR(PONTUACAO_TOTAL DESC); +CREATE INDEX IDX_RANKING_ATIVO ON TB_RANKING_CONSULTOR(ATIVO); +CREATE INDEX IDX_RANKING_DT_CALCULO ON TB_RANKING_CONSULTOR(DT_CALCULO DESC); + +-- Procedure para atualizar posições após processamento +CREATE OR REPLACE PROCEDURE SP_ATUALIZAR_POSICOES AS +BEGIN + MERGE INTO TB_RANKING_CONSULTOR t + USING ( + SELECT ID_PESSOA, + ROW_NUMBER() OVER (ORDER BY PONTUACAO_TOTAL DESC, ID_PESSOA) AS NOVA_POSICAO + FROM TB_RANKING_CONSULTOR + ) s + ON (t.ID_PESSOA = s.ID_PESSOA) + WHEN MATCHED THEN UPDATE SET t.POSICAO = s.NOVA_POSICAO; + COMMIT; +END SP_ATUALIZAR_POSICOES; +/ + +-- View para estatísticas do ranking +CREATE OR REPLACE VIEW VW_RANKING_ESTATISTICAS AS +SELECT + COUNT(*) AS TOTAL_CONSULTORES, + COUNT(CASE WHEN ATIVO = 'S' THEN 1 END) AS TOTAL_ATIVOS, + COUNT(CASE WHEN ATIVO = 'N' THEN 1 END) AS TOTAL_INATIVOS, + MAX(DT_CALCULO) AS ULTIMA_ATUALIZACAO, + AVG(PONTUACAO_TOTAL) AS PONTUACAO_MEDIA, + MAX(PONTUACAO_TOTAL) AS PONTUACAO_MAXIMA, + MIN(PONTUACAO_TOTAL) AS PONTUACAO_MINIMA, + AVG(COMPONENTE_A) AS MEDIA_COMP_A, + AVG(COMPONENTE_B) AS MEDIA_COMP_B, + AVG(COMPONENTE_C) AS MEDIA_COMP_C, + AVG(COMPONENTE_D) AS MEDIA_COMP_D +FROM TB_RANKING_CONSULTOR; + +-- View para distribuição por faixas de pontuação +CREATE OR REPLACE VIEW VW_RANKING_DISTRIBUICAO AS +SELECT + CASE + WHEN PONTUACAO_TOTAL >= 800 THEN '800+' + WHEN PONTUACAO_TOTAL >= 600 THEN '600-799' + WHEN PONTUACAO_TOTAL >= 400 THEN '400-599' + WHEN PONTUACAO_TOTAL >= 200 THEN '200-399' + ELSE '0-199' + END AS FAIXA, + COUNT(*) AS QUANTIDADE, + ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM TB_RANKING_CONSULTOR), 2) AS PERCENTUAL +FROM TB_RANKING_CONSULTOR +GROUP BY + CASE + WHEN PONTUACAO_TOTAL >= 800 THEN '800+' + WHEN PONTUACAO_TOTAL >= 600 THEN '600-799' + WHEN PONTUACAO_TOTAL >= 400 THEN '400-599' + WHEN PONTUACAO_TOTAL >= 200 THEN '200-399' + ELSE '0-199' + END +ORDER BY + CASE + WHEN FAIXA = '800+' THEN 1 + WHEN FAIXA = '600-799' THEN 2 + WHEN FAIXA = '400-599' THEN 3 + WHEN FAIXA = '200-399' THEN 4 + ELSE 5 + END; + +-- Comentários nas tabelas e colunas +COMMENT ON TABLE TB_RANKING_CONSULTOR IS 'Tabela de ranking pré-calculado de consultores CAPES'; +COMMENT ON COLUMN TB_RANKING_CONSULTOR.ID_PESSOA IS 'ID da pessoa no sistema AtuaCAPES'; +COMMENT ON COLUMN TB_RANKING_CONSULTOR.NOME IS 'Nome completo do consultor'; +COMMENT ON COLUMN TB_RANKING_CONSULTOR.POSICAO IS 'Posição no ranking (1 = primeiro lugar)'; +COMMENT ON COLUMN TB_RANKING_CONSULTOR.PONTUACAO_TOTAL IS 'Pontuação total calculada (soma dos 4 componentes)'; +COMMENT ON COLUMN TB_RANKING_CONSULTOR.COMPONENTE_A IS 'Pontuação do Componente A (Coordenação CAPES)'; +COMMENT ON COLUMN TB_RANKING_CONSULTOR.COMPONENTE_B IS 'Pontuação do Componente B (Coordenação PPG)'; +COMMENT ON COLUMN TB_RANKING_CONSULTOR.COMPONENTE_C IS 'Pontuação do Componente C (Consultoria)'; +COMMENT ON COLUMN TB_RANKING_CONSULTOR.COMPONENTE_D IS 'Pontuação do Componente D (Premiações)'; +COMMENT ON COLUMN TB_RANKING_CONSULTOR.ATIVO IS 'Indicador se o consultor está ativo (S/N)'; +COMMENT ON COLUMN TB_RANKING_CONSULTOR.ANOS_ATUACAO IS 'Anos de atuação do consultor'; +COMMENT ON COLUMN TB_RANKING_CONSULTOR.DT_CALCULO IS 'Data/hora do último cálculo'; +COMMENT ON COLUMN TB_RANKING_CONSULTOR.JSON_DETALHES IS 'Dados completos do consultor em JSON para exibição'; + +-- Grant de permissões para o usuário de aplicação (ajustar conforme necessário) +-- GRANT SELECT, INSERT, UPDATE, DELETE ON TB_RANKING_CONSULTOR TO app_user; +-- GRANT SELECT ON VW_RANKING_ESTATISTICAS TO app_user; +-- GRANT SELECT ON VW_RANKING_DISTRIBUICAO TO app_user; +-- GRANT EXECUTE ON SP_ATUALIZAR_POSICOES TO app_user; diff --git a/backend/src/application/jobs/__init__.py b/backend/src/application/jobs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/src/application/jobs/job_status.py b/backend/src/application/jobs/job_status.py new file mode 100644 index 0000000..b7b5d22 --- /dev/null +++ b/backend/src/application/jobs/job_status.py @@ -0,0 +1,87 @@ +from dataclasses import dataclass, field +from datetime import datetime +from typing import Optional + + +@dataclass +class JobStatus: + is_running: bool = False + progress: int = 0 + total_processados: int = 0 + total_esperado: int = 0 + mensagem: str = "" + inicio: Optional[datetime] = None + fim: Optional[datetime] = None + erro: Optional[str] = None + batch_atual: int = 0 + total_batches: int = 0 + + def iniciar(self, total_esperado: int) -> None: + self.is_running = True + self.progress = 0 + self.total_processados = 0 + self.total_esperado = total_esperado + self.mensagem = "Iniciando processamento..." + self.inicio = datetime.now() + self.fim = None + self.erro = None + self.batch_atual = 0 + self.total_batches = 0 + + def atualizar_progresso(self, processados: int, batch_atual: int, mensagem: str = "") -> None: + self.total_processados = processados + self.batch_atual = batch_atual + if self.total_esperado > 0: + self.progress = int((processados / self.total_esperado) * 100) + self.mensagem = mensagem or f"Processando batch {batch_atual}" + + def finalizar(self, sucesso: bool = True, erro: Optional[str] = None) -> None: + self.is_running = False + self.fim = datetime.now() + self.progress = 100 if sucesso else self.progress + self.erro = erro + if sucesso: + self.mensagem = f"Processamento concluído: {self.total_processados} consultores" + else: + self.mensagem = f"Processamento falhou: {erro}" + + @property + def tempo_decorrido(self) -> Optional[str]: + if not self.inicio: + return None + fim = self.fim or datetime.now() + delta = fim - self.inicio + minutos, segundos = divmod(int(delta.total_seconds()), 60) + return f"{minutos}m {segundos}s" + + @property + def tempo_estimado(self) -> Optional[str]: + if not self.inicio or self.total_processados == 0 or self.total_esperado == 0: + return None + if self.progress >= 100: + return "0m 0s" + + decorrido = (datetime.now() - self.inicio).total_seconds() + taxa = self.total_processados / decorrido + restante = (self.total_esperado - self.total_processados) / taxa + minutos, segundos = divmod(int(restante), 60) + return f"{minutos}m {segundos}s" + + def to_dict(self) -> dict: + return { + "running": self.is_running, + "progress": self.progress, + "processados": self.total_processados, + "total": self.total_esperado, + "mensagem": self.mensagem, + "batch_atual": self.batch_atual, + "total_batches": self.total_batches, + "tempo_decorrido": self.tempo_decorrido, + "tempo_estimado": self.tempo_estimado, + "inicio": self.inicio.isoformat() if self.inicio else None, + "fim": self.fim.isoformat() if self.fim else None, + "erro": self.erro + } + + +job_status = JobStatus() diff --git a/backend/src/application/jobs/processar_ranking.py b/backend/src/application/jobs/processar_ranking.py new file mode 100644 index 0000000..fe29b10 --- /dev/null +++ b/backend/src/application/jobs/processar_ranking.py @@ -0,0 +1,162 @@ +import json +from datetime import datetime +from typing import Optional, Dict, Any + +from ...infrastructure.elasticsearch.client import ElasticsearchClient +from ...infrastructure.oracle.client import OracleClient +from ...infrastructure.oracle.ranking_repository import RankingOracleRepository +from ...infrastructure.repositories.consultor_repository_impl import ConsultorRepositoryImpl +from ...domain.services.calculador_pontuacao import CalculadorPontuacao +from .job_status import job_status + + +class ProcessarRankingJob: + def __init__( + self, + es_client: ElasticsearchClient, + oracle_client: OracleClient, + ranking_repo: RankingOracleRepository, + ): + self.es_client = es_client + self.oracle_client = oracle_client + self.ranking_repo = ranking_repo + self.consultor_repo = ConsultorRepositoryImpl(es_client, oracle_client) + self.calculador = CalculadorPontuacao() + + async def executar(self, limpar_antes: bool = True) -> Dict[str, Any]: + """ + Executa o processamento completo do ranking: + 1. Limpa tabela (se solicitado) + 2. Scroll por todos os documentos ES + 3. Para cada batch: calcula pontuação e insere no Oracle + 4. Atualiza posições + 5. Retorna estatísticas + """ + if job_status.is_running: + raise RuntimeError("Job já está em execução") + + try: + total = await self.es_client.contar_com_atuacoes() + job_status.iniciar(total_esperado=total) + + if limpar_antes: + job_status.mensagem = "Limpando tabela de ranking..." + self.ranking_repo.limpar_tabela() + + job_status.mensagem = "Iniciando processamento via Scroll API..." + + resultado = await self.es_client.buscar_todos_consultores( + callback=self._processar_batch, + batch_size=1000 + ) + + job_status.mensagem = "Atualizando posições no ranking..." + self.ranking_repo.atualizar_posicoes() + + estatisticas = self.ranking_repo.obter_estatisticas() + + job_status.finalizar(sucesso=True) + + return { + "sucesso": True, + "total_processados": resultado["processados"], + "total_batches": resultado["batches"], + "tempo_decorrido": job_status.tempo_decorrido, + "estatisticas": estatisticas + } + + except Exception as e: + job_status.finalizar(sucesso=False, erro=str(e)) + raise RuntimeError(f"Erro ao processar ranking: {e}") + + async def _processar_batch(self, docs: list, progress: dict) -> None: + """ + Processa um batch de documentos: + 1. Constrói consultores + 2. Calcula pontuação + 3. Insere no Oracle + 4. Atualiza status + """ + consultores_para_inserir = [] + + for doc in docs: + try: + consultor = await self.consultor_repo._construir_consultor(doc) + + consultor_dict = { + "id_pessoa": consultor.id_pessoa, + "nome": consultor.nome, + "pontuacao_total": consultor.pontuacao_total, + "componente_a": consultor.pontuacao.componente_a.total, + "componente_b": consultor.pontuacao.componente_b.total, + "componente_c": consultor.pontuacao.componente_c.total, + "componente_d": consultor.pontuacao.componente_d.total, + "ativo": consultor.ativo, + "anos_atuacao": consultor.anos_atuacao, + "detalhes": self._gerar_json_detalhes(consultor) + } + + consultores_para_inserir.append(consultor_dict) + + except Exception as e: + print(f"AVISO: Erro ao processar consultor {doc.get('id')}: {e}") + continue + + if consultores_para_inserir: + self.ranking_repo.inserir_batch(consultores_para_inserir) + + job_status.atualizar_progresso( + processados=progress["processados"], + batch_atual=progress["batch_atual"], + mensagem=f"Processando batch {progress['batch_atual']} ({progress['percentual']}%)" + ) + + def _gerar_json_detalhes(self, consultor) -> dict: + """ + Gera JSON com detalhes completos do consultor para armazenar no CLOB. + """ + return { + "id_pessoa": consultor.id_pessoa, + "nome": consultor.nome, + "cpf": consultor.cpf, + "coordenacoes_capes": [ + { + "tipo": c.tipo, + "area_avaliacao": c.area_avaliacao, + "inicio": c.periodo.inicio.isoformat() if c.periodo.inicio else None, + "fim": c.periodo.fim.isoformat() if c.periodo.fim else None, + "ativo": c.periodo.ativo + } + for c in consultor.coordenacoes_capes + ], + "coordenacoes_programas": [ + { + "id_programa": c.id_programa, + "nome_programa": c.nome_programa, + "codigo_programa": c.codigo_programa, + "nota_ppg": c.nota_ppg, + "modalidade": c.modalidade, + "area_avaliacao": c.area_avaliacao, + "inicio": c.periodo.inicio.isoformat() if c.periodo.inicio else None, + "fim": c.periodo.fim.isoformat() if c.periodo.fim else None + } + for c in consultor.coordenacoes_programas + ], + "consultoria": { + "total_eventos": consultor.consultoria.total_eventos, + "eventos_recentes": consultor.consultoria.eventos_recentes, + "situacao": consultor.consultoria.situacao, + "anos_completos": consultor.consultoria.anos_completos, + "areas": consultor.consultoria.areas + } if consultor.consultoria else None, + "premiacoes": [ + { + "tipo": p.tipo, + "nome_premio": p.nome_premio, + "ano": p.ano, + "pontos": p.pontos + } + for p in consultor.premiacoes + ], + "pontuacao": consultor.pontuacao.detalhamento + } diff --git a/backend/src/application/jobs/scheduler.py b/backend/src/application/jobs/scheduler.py new file mode 100644 index 0000000..818c7ae --- /dev/null +++ b/backend/src/application/jobs/scheduler.py @@ -0,0 +1,50 @@ +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.cron import CronTrigger +from typing import Optional + +from .processar_ranking import ProcessarRankingJob + + +class RankingScheduler: + def __init__(self, job: ProcessarRankingJob): + self.job = job + self.scheduler: Optional[AsyncIOScheduler] = None + + def iniciar(self) -> None: + """ + Inicia o scheduler e agenda o job para rodar diariamente às 3h. + """ + if self.scheduler and self.scheduler.running: + return + + self.scheduler = AsyncIOScheduler() + + self.scheduler.add_job( + self.job.executar, + trigger=CronTrigger(hour=3, minute=0), + id='ranking_diario', + name='Processamento diário do ranking de consultores', + replace_existing=True, + kwargs={"limpar_antes": True} + ) + + self.scheduler.start() + + def parar(self) -> None: + """ + Para o scheduler. + """ + if self.scheduler and self.scheduler.running: + self.scheduler.shutdown(wait=False) + + def executar_agora(self) -> None: + """ + Executa o job imediatamente (fora do agendamento). + """ + if self.scheduler: + self.scheduler.add_job( + self.job.executar, + id='ranking_manual', + replace_existing=True, + kwargs={"limpar_antes": True} + ) diff --git a/backend/src/domain/entities/consultor_ranking.py b/backend/src/domain/entities/consultor_ranking.py new file mode 100644 index 0000000..6ca0612 --- /dev/null +++ b/backend/src/domain/entities/consultor_ranking.py @@ -0,0 +1,19 @@ +from dataclasses import dataclass +from datetime import datetime +from typing import Optional + + +@dataclass +class ConsultorRanking: + id_pessoa: int + nome: str + posicao: Optional[int] + pontuacao_total: float + componente_a: float + componente_b: float + componente_c: float + componente_d: float + ativo: bool + anos_atuacao: float + dt_calculo: datetime + json_detalhes: str diff --git a/backend/src/infrastructure/elasticsearch/client.py b/backend/src/infrastructure/elasticsearch/client.py index 1e623ac..d45b1b6 100644 --- a/backend/src/infrastructure/elasticsearch/client.py +++ b/backend/src/infrastructure/elasticsearch/client.py @@ -359,3 +359,129 @@ class ElasticsearchClient: return results except Exception as e: raise RuntimeError(f"Erro ao buscar ranking com score: {e}") + + async def iniciar_scroll(self, size: int = 1000, scroll_timeout: str = "5m") -> dict: + """ + Inicia um scroll para iterar por todos os documentos com atuações. + Retorna o scroll_id e a primeira página de resultados. + """ + query = { + "query": { + "nested": { + "path": "atuacoes", + "query": {"exists": {"field": "atuacoes.tipo"}} + } + }, + "_source": ["id", "dadosPessoais", "atuacoes"], + "size": size, + "sort": [{"id": "asc"}] + } + + try: + response = await self.client.post( + f"{self.url}/{self.index}/_search?scroll={scroll_timeout}", + json=query, + timeout=120.0 + ) + response.raise_for_status() + + data = response.json() + return { + "scroll_id": data.get("_scroll_id"), + "hits": [hit["_source"] for hit in data.get("hits", {}).get("hits", [])], + "total": data.get("hits", {}).get("total", {}).get("value", 0) + } + except Exception as e: + raise RuntimeError(f"Erro ao iniciar scroll: {e}") + + async def continuar_scroll(self, scroll_id: str, scroll_timeout: str = "5m") -> dict: + """ + Continua um scroll existente usando o scroll_id. + Retorna a próxima página de resultados. + """ + try: + response = await self.client.post( + f"{self.url}/_search/scroll", + json={ + "scroll": scroll_timeout, + "scroll_id": scroll_id + }, + timeout=120.0 + ) + response.raise_for_status() + + data = response.json() + return { + "scroll_id": data.get("_scroll_id"), + "hits": [hit["_source"] for hit in data.get("hits", {}).get("hits", [])] + } + except Exception as e: + raise RuntimeError(f"Erro ao continuar scroll: {e}") + + async def limpar_scroll(self, scroll_id: str) -> None: + """ + Limpa o contexto de scroll após uso. + """ + try: + await self.client.delete( + f"{self.url}/_search/scroll", + json={"scroll_id": scroll_id}, + timeout=30.0 + ) + except Exception: + pass + + async def buscar_todos_consultores(self, callback, batch_size: int = 1000): + """ + Itera por TODOS os consultores do índice usando Scroll API. + Chama callback(batch, progress) para cada batch de documentos. + + Args: + callback: função assíncrona que recebe (docs: list, progress: dict) + batch_size: tamanho do batch (padrão 1000) + + Progress dict contém: + - total: total de documentos + - processados: documentos processados até agora + - batch_atual: número do batch atual + - percentual: percentual de progresso (0-100) + """ + scroll_id = None + try: + result = await self.iniciar_scroll(size=batch_size) + scroll_id = result["scroll_id"] + total = result["total"] + hits = result["hits"] + + processados = 0 + batch_atual = 1 + + while hits: + processados += len(hits) + percentual = int((processados / total) * 100) if total > 0 else 100 + + progress = { + "total": total, + "processados": processados, + "batch_atual": batch_atual, + "percentual": percentual + } + + await callback(hits, progress) + + if len(hits) < batch_size: + break + + result = await self.continuar_scroll(scroll_id) + scroll_id = result["scroll_id"] + hits = result["hits"] + batch_atual += 1 + + return { + "total": total, + "processados": processados, + "batches": batch_atual + } + finally: + if scroll_id: + await self.limpar_scroll(scroll_id) diff --git a/backend/src/infrastructure/oracle/client.py b/backend/src/infrastructure/oracle/client.py index 3443cbd..9ef0649 100644 --- a/backend/src/infrastructure/oracle/client.py +++ b/backend/src/infrastructure/oracle/client.py @@ -57,8 +57,19 @@ class OracleClient: cursor.execute(query, params or {}) columns = [col[0] for col in cursor.description] rows = cursor.fetchall() + + results = [] + for row in rows: + row_dict = {} + for i, col in enumerate(columns): + value = row[i] + if hasattr(value, 'read'): + value = value.read() + row_dict[col] = value + results.append(row_dict) + cursor.close() - return [dict(zip(columns, row)) for row in rows] + return results except Exception as e: print(f"AVISO Oracle: falha ao executar query: {e}") self._connected = False diff --git a/backend/src/infrastructure/oracle/ranking_repository.py b/backend/src/infrastructure/oracle/ranking_repository.py new file mode 100644 index 0000000..27af8c5 --- /dev/null +++ b/backend/src/infrastructure/oracle/ranking_repository.py @@ -0,0 +1,341 @@ +from typing import List, Optional, Dict, Any +from datetime import datetime +import json + +from ...domain.entities.consultor_ranking import ConsultorRanking +from .client import OracleClient + + +class RankingOracleRepository: + def __init__(self, oracle_client: OracleClient): + self.client = oracle_client + + def inserir_batch(self, consultores: List[Dict[str, Any]]) -> int: + """ + Insere ou atualiza um batch de consultores usando MERGE. + Retorna o número de registros processados. + """ + if not consultores: + return 0 + + merge_sql = """ + MERGE INTO TB_RANKING_CONSULTOR t + USING ( + SELECT + :id_pessoa AS ID_PESSOA, + :nome AS NOME, + :pontuacao_total AS PONTUACAO_TOTAL, + :componente_a AS COMPONENTE_A, + :componente_b AS COMPONENTE_B, + :componente_c AS COMPONENTE_C, + :componente_d AS COMPONENTE_D, + :ativo AS ATIVO, + :anos_atuacao AS ANOS_ATUACAO, + :json_detalhes AS JSON_DETALHES + FROM DUAL + ) s + ON (t.ID_PESSOA = s.ID_PESSOA) + WHEN MATCHED THEN + UPDATE SET + t.NOME = s.NOME, + t.PONTUACAO_TOTAL = s.PONTUACAO_TOTAL, + t.COMPONENTE_A = s.COMPONENTE_A, + t.COMPONENTE_B = s.COMPONENTE_B, + t.COMPONENTE_C = s.COMPONENTE_C, + t.COMPONENTE_D = s.COMPONENTE_D, + t.ATIVO = s.ATIVO, + t.ANOS_ATUACAO = s.ANOS_ATUACAO, + t.DT_CALCULO = CURRENT_TIMESTAMP, + t.JSON_DETALHES = s.JSON_DETALHES + WHEN NOT MATCHED THEN + INSERT ( + ID_PESSOA, NOME, PONTUACAO_TOTAL, + COMPONENTE_A, COMPONENTE_B, COMPONENTE_C, COMPONENTE_D, + ATIVO, ANOS_ATUACAO, JSON_DETALHES, DT_CALCULO + ) + VALUES ( + s.ID_PESSOA, s.NOME, s.PONTUACAO_TOTAL, + s.COMPONENTE_A, s.COMPONENTE_B, s.COMPONENTE_C, s.COMPONENTE_D, + s.ATIVO, s.ANOS_ATUACAO, s.JSON_DETALHES, CURRENT_TIMESTAMP + ) + """ + + with self.client.get_connection() as conn: + cursor = conn.cursor() + try: + for consultor in consultores: + params = { + "id_pessoa": consultor["id_pessoa"], + "nome": consultor["nome"], + "pontuacao_total": consultor["pontuacao_total"], + "componente_a": consultor["componente_a"], + "componente_b": consultor["componente_b"], + "componente_c": consultor["componente_c"], + "componente_d": consultor["componente_d"], + "ativo": "S" if consultor["ativo"] else "N", + "anos_atuacao": consultor["anos_atuacao"], + "json_detalhes": json.dumps(consultor.get("detalhes", {}), ensure_ascii=False) + } + cursor.execute(merge_sql, params) + + conn.commit() + return len(consultores) + except Exception as e: + conn.rollback() + raise RuntimeError(f"Erro ao inserir batch no Oracle: {e}") + finally: + cursor.close() + + def buscar_paginado( + self, + page: int = 1, + size: int = 50, + filtro_ativo: Optional[bool] = None + ) -> List[ConsultorRanking]: + """ + Busca ranking paginado ordenado por posição. + """ + offset = (page - 1) * size + limit_end = offset + size + + where_clause = "" + params = {} + + if filtro_ativo is not None: + where_clause = "AND ATIVO = :ativo" + params["ativo"] = "S" if filtro_ativo else "N" + + query = f""" + SELECT * FROM ( + SELECT + ID_PESSOA, + NOME, + POSICAO, + PONTUACAO_TOTAL, + COMPONENTE_A, + COMPONENTE_B, + COMPONENTE_C, + COMPONENTE_D, + ATIVO, + ANOS_ATUACAO, + DT_CALCULO, + JSON_DETALHES, + ROW_NUMBER() OVER (ORDER BY POSICAO NULLS LAST, PONTUACAO_TOTAL DESC) AS RN + FROM TB_RANKING_CONSULTOR + WHERE 1=1 {where_clause} + ) + WHERE RN > {offset} AND RN <= {limit_end} + """ + + results = self.client.executar_query(query, params) + + consultores = [] + for r in results: + json_det = r["JSON_DETALHES"] + if hasattr(json_det, "read"): + json_det = json_det.read() + else: + json_det = str(json_det) if json_det else "{}" + + consultores.append( + ConsultorRanking( + id_pessoa=r["ID_PESSOA"], + nome=r["NOME"], + posicao=r["POSICAO"], + pontuacao_total=float(r["PONTUACAO_TOTAL"]), + componente_a=float(r["COMPONENTE_A"]), + componente_b=float(r["COMPONENTE_B"]), + componente_c=float(r["COMPONENTE_C"]), + componente_d=float(r["COMPONENTE_D"]), + ativo=r["ATIVO"] == "S", + anos_atuacao=float(r["ANOS_ATUACAO"]), + dt_calculo=r["DT_CALCULO"], + json_detalhes=json_det + ) + ) + + return consultores + + def contar_total(self, filtro_ativo: Optional[bool] = None) -> int: + """ + Conta total de consultores no ranking. + """ + where_clause = "" + params = {} + + if filtro_ativo is not None: + where_clause = "WHERE ATIVO = :ativo" + params["ativo"] = "S" if filtro_ativo else "N" + + query = f"SELECT COUNT(*) AS TOTAL FROM TB_RANKING_CONSULTOR {where_clause}" + results = self.client.executar_query(query, params) + + return results[0]["TOTAL"] if results else 0 + + def buscar_por_id(self, id_pessoa: int) -> Optional[ConsultorRanking]: + """ + Busca consultor específico com sua posição no ranking. + """ + query = """ + SELECT + ID_PESSOA, + NOME, + POSICAO, + PONTUACAO_TOTAL, + COMPONENTE_A, + COMPONENTE_B, + COMPONENTE_C, + COMPONENTE_D, + ATIVO, + ANOS_ATUACAO, + DT_CALCULO, + JSON_DETALHES + FROM TB_RANKING_CONSULTOR + WHERE ID_PESSOA = :id_pessoa + """ + + results = self.client.executar_query(query, {"id_pessoa": id_pessoa}) + + if not results: + return None + + r = results[0] + + json_det = r["JSON_DETALHES"] + if hasattr(json_det, "read"): + json_det = json_det.read() + else: + json_det = str(json_det) if json_det else "{}" + + return ConsultorRanking( + id_pessoa=r["ID_PESSOA"], + nome=r["NOME"], + posicao=r["POSICAO"], + pontuacao_total=float(r["PONTUACAO_TOTAL"]), + componente_a=float(r["COMPONENTE_A"]), + componente_b=float(r["COMPONENTE_B"]), + componente_c=float(r["COMPONENTE_C"]), + componente_d=float(r["COMPONENTE_D"]), + ativo=r["ATIVO"] == "S", + anos_atuacao=float(r["ANOS_ATUACAO"]), + dt_calculo=r["DT_CALCULO"], + json_detalhes=json_det + ) + + def atualizar_posicoes(self) -> None: + """ + Chama a procedure SP_ATUALIZAR_POSICOES para recalcular as posições. + """ + with self.client.get_connection() as conn: + cursor = conn.cursor() + try: + cursor.callproc("SP_ATUALIZAR_POSICOES") + conn.commit() + except Exception as e: + conn.rollback() + raise RuntimeError(f"Erro ao atualizar posições: {e}") + finally: + cursor.close() + + def obter_estatisticas(self) -> Dict[str, Any]: + """ + Retorna estatísticas do ranking. + """ + query = """ + SELECT + COUNT(*) AS TOTAL_CONSULTORES, + COUNT(CASE WHEN ATIVO = 'S' THEN 1 END) AS TOTAL_ATIVOS, + COUNT(CASE WHEN ATIVO = 'N' THEN 1 END) AS TOTAL_INATIVOS, + MAX(DT_CALCULO) AS ULTIMA_ATUALIZACAO, + AVG(PONTUACAO_TOTAL) AS PONTUACAO_MEDIA, + MAX(PONTUACAO_TOTAL) AS PONTUACAO_MAXIMA, + MIN(PONTUACAO_TOTAL) AS PONTUACAO_MINIMA, + AVG(COMPONENTE_A) AS MEDIA_COMP_A, + AVG(COMPONENTE_B) AS MEDIA_COMP_B, + AVG(COMPONENTE_C) AS MEDIA_COMP_C, + AVG(COMPONENTE_D) AS MEDIA_COMP_D + FROM TB_RANKING_CONSULTOR + """ + + results = self.client.executar_query(query) + + if not results: + return {} + + r = results[0] + return { + "total_consultores": r["TOTAL_CONSULTORES"], + "total_ativos": r["TOTAL_ATIVOS"], + "total_inativos": r["TOTAL_INATIVOS"], + "ultima_atualizacao": r["ULTIMA_ATUALIZACAO"].isoformat() if r["ULTIMA_ATUALIZACAO"] else None, + "pontuacao_media": float(r["PONTUACAO_MEDIA"]) if r["PONTUACAO_MEDIA"] else 0, + "pontuacao_maxima": float(r["PONTUACAO_MAXIMA"]) if r["PONTUACAO_MAXIMA"] else 0, + "pontuacao_minima": float(r["PONTUACAO_MINIMA"]) if r["PONTUACAO_MINIMA"] else 0, + "media_componentes": { + "a": float(r["MEDIA_COMP_A"]) if r["MEDIA_COMP_A"] else 0, + "b": float(r["MEDIA_COMP_B"]) if r["MEDIA_COMP_B"] else 0, + "c": float(r["MEDIA_COMP_C"]) if r["MEDIA_COMP_C"] else 0, + "d": float(r["MEDIA_COMP_D"]) if r["MEDIA_COMP_D"] else 0 + } + } + + def obter_distribuicao(self) -> List[Dict[str, Any]]: + """ + Retorna distribuição de consultores por faixa de pontuação. + """ + query = """ + SELECT + CASE + WHEN PONTUACAO_TOTAL >= 800 THEN '800+' + WHEN PONTUACAO_TOTAL >= 600 THEN '600-799' + WHEN PONTUACAO_TOTAL >= 400 THEN '400-599' + WHEN PONTUACAO_TOTAL >= 200 THEN '200-399' + ELSE '0-199' + END AS FAIXA, + COUNT(*) AS QUANTIDADE, + ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM TB_RANKING_CONSULTOR), 2) AS PERCENTUAL + FROM TB_RANKING_CONSULTOR + GROUP BY + CASE + WHEN PONTUACAO_TOTAL >= 800 THEN '800+' + WHEN PONTUACAO_TOTAL >= 600 THEN '600-799' + WHEN PONTUACAO_TOTAL >= 400 THEN '400-599' + WHEN PONTUACAO_TOTAL >= 200 THEN '200-399' + ELSE '0-199' + END + ORDER BY + CASE + WHEN FAIXA = '800+' THEN 1 + WHEN FAIXA = '600-799' THEN 2 + WHEN FAIXA = '400-599' THEN 3 + WHEN FAIXA = '200-399' THEN 4 + ELSE 5 + END + """ + + results = self.client.executar_query(query) + + return [ + { + "faixa": r["FAIXA"], + "quantidade": r["QUANTIDADE"], + "percentual": float(r["PERCENTUAL"]) + } + for r in results + ] + + def limpar_tabela(self) -> None: + """ + Limpa todos os registros da tabela de ranking. + Usar apenas quando for reprocessar do zero. + """ + with self.client.get_connection() as conn: + cursor = conn.cursor() + try: + cursor.execute("DELETE FROM TB_RANKING_CONSULTOR") + conn.commit() + except Exception as e: + conn.rollback() + raise RuntimeError(f"Erro ao limpar tabela: {e}") + finally: + cursor.close() diff --git a/backend/src/interface/api/app.py b/backend/src/interface/api/app.py index f3002c4..23b1daf 100644 --- a/backend/src/interface/api/app.py +++ b/backend/src/interface/api/app.py @@ -4,7 +4,8 @@ from contextlib import asynccontextmanager from .routes import router from .config import settings -from .dependencies import es_client, oracle_client +from .dependencies import es_client, oracle_client, get_processar_job +from ...application.jobs.scheduler import RankingScheduler @asynccontextmanager @@ -14,7 +15,24 @@ async def lifespan(app: FastAPI): oracle_client.connect() except Exception as e: print(f"AVISO: Oracle não conectou: {e}. Sistema rodando sem Coordenação PPG.") + + scheduler = None + try: + job = get_processar_job() + scheduler = RankingScheduler(job) + scheduler.iniciar() + print("Scheduler do ranking iniciado: job rodará diariamente às 3h") + except Exception as e: + print(f"AVISO: Scheduler não iniciou: {e}") + yield + + if scheduler: + try: + scheduler.parar() + except: + pass + await es_client.close() try: oracle_client.close() diff --git a/backend/src/interface/api/dependencies.py b/backend/src/interface/api/dependencies.py index 7f9e879..0e5a2b9 100644 --- a/backend/src/interface/api/dependencies.py +++ b/backend/src/interface/api/dependencies.py @@ -1,6 +1,8 @@ from ...infrastructure.elasticsearch.client import ElasticsearchClient from ...infrastructure.oracle.client import OracleClient +from ...infrastructure.oracle.ranking_repository import RankingOracleRepository from ...infrastructure.repositories.consultor_repository_impl import ConsultorRepositoryImpl +from ...application.jobs.processar_ranking import ProcessarRankingJob from .config import settings @@ -16,6 +18,8 @@ oracle_client = OracleClient( ) _repository: ConsultorRepositoryImpl = None +_ranking_repository: RankingOracleRepository = None +_processar_job: ProcessarRankingJob = None def get_repository() -> ConsultorRepositoryImpl: @@ -23,3 +27,21 @@ def get_repository() -> ConsultorRepositoryImpl: if _repository is None: _repository = ConsultorRepositoryImpl(es_client=es_client, oracle_client=oracle_client) return _repository + + +def get_ranking_repository() -> RankingOracleRepository: + global _ranking_repository + if _ranking_repository is None: + _ranking_repository = RankingOracleRepository(oracle_client=oracle_client) + return _ranking_repository + + +def get_processar_job() -> ProcessarRankingJob: + global _processar_job + if _processar_job is None: + _processar_job = ProcessarRankingJob( + es_client=es_client, + oracle_client=oracle_client, + ranking_repo=get_ranking_repository() + ) + return _processar_job diff --git a/backend/src/interface/api/routes.py b/backend/src/interface/api/routes.py index 03b9ffc..0e74270 100644 --- a/backend/src/interface/api/routes.py +++ b/backend/src/interface/api/routes.py @@ -1,4 +1,4 @@ -from fastapi import APIRouter, Depends, HTTPException, Query +from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks from typing import Optional from ...application.use_cases.obter_ranking import ObterRankingUseCase @@ -10,7 +10,16 @@ from ..schemas.consultor_schema import ( ConsultorDetalhadoSchema, ConsultorResumoSchema, ) -from .dependencies import get_repository +from ..schemas.ranking_schema import ( + RankingPaginadoResponseSchema, + ConsultorRankingResumoSchema, + EstatisticasRankingSchema, + JobStatusSchema, + ProcessarRankingRequestSchema, + ProcessarRankingResponseSchema, +) +from .dependencies import get_repository, get_ranking_repository, get_processar_job +from ...application.jobs.job_status import job_status router = APIRouter(prefix="/api/v1", tags=["ranking"]) @@ -75,3 +84,88 @@ async def obter_consultor( @router.get("/health") async def health_check(): return {"status": "ok", "message": "API Ranking CAPES funcionando"} + + +@router.get("/ranking/paginado", response_model=RankingPaginadoResponseSchema) +async def ranking_paginado( + page: int = Query(default=1, ge=1, description="Número da página"), + size: int = Query(default=50, ge=1, le=100, description="Tamanho da página"), + ativo: Optional[bool] = Query(default=None, description="Filtrar por status ativo"), + ranking_repo = Depends(get_ranking_repository), +): + """ + Retorna ranking paginado do Oracle (pré-calculado). + """ + total = ranking_repo.contar_total(filtro_ativo=ativo) + consultores = ranking_repo.buscar_paginado(page=page, size=size, filtro_ativo=ativo) + + total_pages = (total + size - 1) // size + + consultores_schema = [ + ConsultorRankingResumoSchema( + id_pessoa=c.id_pessoa, + nome=c.nome, + posicao=c.posicao, + pontuacao_total=c.pontuacao_total, + componente_a=c.componente_a, + componente_b=c.componente_b, + componente_c=c.componente_c, + componente_d=c.componente_d, + ativo=c.ativo, + anos_atuacao=c.anos_atuacao + ) + for c in consultores + ] + + return RankingPaginadoResponseSchema( + total=total, + page=page, + size=size, + total_pages=total_pages, + consultores=consultores_schema + ) + + +@router.get("/ranking/estatisticas", response_model=EstatisticasRankingSchema) +async def ranking_estatisticas( + ranking_repo = Depends(get_ranking_repository), +): + """ + Retorna estatísticas do ranking. + """ + estatisticas = ranking_repo.obter_estatisticas() + distribuicao = ranking_repo.obter_distribuicao() + + return EstatisticasRankingSchema( + **estatisticas, + distribuicao=distribuicao + ) + + +@router.get("/ranking/status", response_model=JobStatusSchema) +async def status_processamento(): + """ + Retorna o status do job de processamento do ranking. + """ + return JobStatusSchema(**job_status.to_dict()) + + +@router.post("/ranking/processar", response_model=ProcessarRankingResponseSchema) +async def processar_ranking( + background_tasks: BackgroundTasks, + request: ProcessarRankingRequestSchema = ProcessarRankingRequestSchema(), + job = Depends(get_processar_job), +): + """ + Dispara o processamento do ranking em background. + """ + if job_status.is_running: + raise HTTPException(status_code=409, detail="Job já está em execução") + + background_tasks.add_task(job.executar, limpar_antes=request.limpar_antes) + + return ProcessarRankingResponseSchema( + sucesso=True, + mensagem="Processamento do ranking iniciado em background", + job_id="ranking_job" + ) diff --git a/backend/src/interface/schemas/ranking_schema.py b/backend/src/interface/schemas/ranking_schema.py new file mode 100644 index 0000000..559c9f9 --- /dev/null +++ b/backend/src/interface/schemas/ranking_schema.py @@ -0,0 +1,61 @@ +from pydantic import BaseModel, Field +from typing import Optional, List +from datetime import datetime + + +class ConsultorRankingResumoSchema(BaseModel): + id_pessoa: int + nome: str + posicao: Optional[int] + pontuacao_total: float + componente_a: float + componente_b: float + componente_c: float + componente_d: float + ativo: bool + anos_atuacao: float + + +class RankingPaginadoResponseSchema(BaseModel): + total: int + page: int + size: int + total_pages: int + consultores: List[ConsultorRankingResumoSchema] + + +class EstatisticasRankingSchema(BaseModel): + total_consultores: int + total_ativos: int + total_inativos: int + ultima_atualizacao: Optional[str] + pontuacao_media: float + pontuacao_maxima: float + pontuacao_minima: float + media_componentes: dict + distribuicao: List[dict] + + +class JobStatusSchema(BaseModel): + running: bool + progress: int + processados: int + total: int + mensagem: str + batch_atual: int + total_batches: int + tempo_decorrido: Optional[str] + tempo_estimado: Optional[str] + inicio: Optional[str] + fim: Optional[str] + erro: Optional[str] + + +class ProcessarRankingRequestSchema(BaseModel): + limpar_antes: bool = Field(default=True, description="Se deve limpar a tabela antes de processar") + + +class ProcessarRankingResponseSchema(BaseModel): + sucesso: bool + mensagem: str + job_id: Optional[str] = None diff --git a/docker-compose.yml b/docker-compose.yml index 1716692..5a2c89f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -17,6 +17,9 @@ services: volumes: - ./backend/src:/app/src - /etc/localtime:/etc/localtime:ro + depends_on: + oracle18c: + condition: service_healthy networks: - shared_network restart: unless-stopped @@ -40,7 +43,33 @@ services: networks: - shared_network restart: unless-stopped + + oracle18c: + container_name: mqapilc_oracle18c + image: gvenzl/oracle-xe:18-slim + environment: + - ORACLE_PASSWORD=local123 + - ORACLE_CHARACTERSET=AL32UTF8 + - APP_USER=locasl123 + - APP_USER_PASSWORD=local123 + - TZ=America/Sao_Paulo + ports: + - "1521:1521" + - "5500:5500" + volumes: + - oracle_data:/opt/oracle/oradata + healthcheck: + test: ["CMD", "bash", "-c", "echo 'SELECT 1 FROM DUAL;' | sqlplus -s SYSTEM/\"$${ORACLE_PASSWORD}\"@localhost:1521/XEPDB1"] + interval: 30s + timeout: 10s + retries: 20 + networks: + - shared_network networks: shared_network: external: true + +volumes: + oracle_data: + driver: local \ No newline at end of file diff --git a/frontend/src/App.css b/frontend/src/App.css index abd08dd..c562dea 100644 --- a/frontend/src/App.css +++ b/frontend/src/App.css @@ -46,6 +46,36 @@ background: var(--accent-2); } +.mode-selector { + display: flex; + gap: 0.5rem; + margin: 1.5rem 0; + justify-content: center; +} + +.mode-selector button { + padding: 0.75rem 1.5rem; + background: rgba(255,255,255,0.06); + border: 1px solid var(--stroke); + border-radius: 8px; + color: var(--muted); + font-size: 0.95rem; + font-weight: 500; + cursor: pointer; + transition: all 200ms; +} + +.mode-selector button:hover { + border-color: var(--accent-2); + color: var(--text); +} + +.mode-selector button.active { + background: var(--accent); + border-color: var(--accent); + color: white; +} + .controls { margin: 1.5rem 0; display: flex; diff --git a/frontend/src/App.jsx b/frontend/src/App.jsx index baa5c29..12d7b67 100644 --- a/frontend/src/App.jsx +++ b/frontend/src/App.jsx @@ -1,6 +1,7 @@ import { useState, useEffect } from 'react'; import Header from './components/Header'; import ConsultorCard from './components/ConsultorCard'; +import RankingPaginado from './components/RankingPaginado'; import { rankingService } from './services/api'; import './App.css'; @@ -10,6 +11,7 @@ function App() { const [error, setError] = useState(null); const [total, setTotal] = useState(0); const [limite, setLimite] = useState(10); + const [modo, setModo] = useState('completo'); useEffect(() => { loadRanking(); @@ -54,24 +56,45 @@ function App() {
-
- +
+ +
-
- {consultores.map((consultor) => ( - - ))} -
+ {modo === 'top' ? ( + <> +
+ +
+ +
+ {consultores.map((consultor) => ( + + ))} +
+ + ) : ( + + )}