feat: Sistema de Ranking de Consultores CAPES - versão inicial

Backend (FastAPI + DDD):
- Arquitetura DDD com camadas Domain, Application, Infrastructure, Interface
- Integração com Elasticsearch (ATUACAPES) para dados de consultores
- Integração com Oracle (SUCUPIRA_PAINEL) para coordenações PPG
- Cálculo dos 4 componentes de pontuação (A, B, C, D)
- Cache em memória para otimização de performance
- API REST com endpoints /ranking, /ranking/detalhado, /consultor/{id}

Frontend (React + Vite):
- Interface responsiva com cards expansíveis
- Visualização detalhada de pontuação por componente
- Filtro por quantidade de consultores (Top 10, 50, 100, etc)

Docker:
- docker-compose com shared_network externa
- Backend com Oracle Instant Client
- Frontend com Vite dev server
This commit is contained in:
Frederico Castro
2025-12-09 01:24:35 -03:00
commit 9e6ba459a8
69 changed files with 4902 additions and 0 deletions

View File

@@ -0,0 +1,112 @@
import cx_Oracle
from typing import List, Dict, Any, Optional
from contextlib import contextmanager
class OracleClient:
def __init__(self, user: str, password: str, dsn: str):
self.user = user
self.password = password
self.dsn = dsn
self._pool: Optional[cx_Oracle.SessionPool] = None
self._connected = False
def connect(self) -> None:
try:
self._pool = cx_Oracle.SessionPool(
user=self.user,
password=self.password,
dsn=self.dsn,
min=2,
max=10,
increment=1,
encoding="UTF-8",
)
self._connected = True
except Exception as e:
print(f"AVISO Oracle: {e}")
self._connected = False
def close(self) -> None:
if self._pool:
try:
self._pool.close()
except:
pass
@property
def is_connected(self) -> bool:
return self._connected and self._pool is not None
@contextmanager
def get_connection(self):
if not self._pool:
raise RuntimeError("Pool Oracle não conectado. Execute connect() primeiro.")
conn = self._pool.acquire()
try:
yield conn
finally:
self._pool.release(conn)
def executar_query(self, query: str, params: Optional[dict] = None) -> List[Dict[str, Any]]:
if not self.is_connected:
return []
try:
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(query, params or {})
columns = [col[0] for col in cursor.description]
rows = cursor.fetchall()
cursor.close()
return [dict(zip(columns, row)) for row in rows]
except Exception as e:
print(f"AVISO Oracle: falha ao executar query: {e}")
self._connected = False
return []
def buscar_coordenacoes_programa(self, id_pessoa: int) -> List[Dict[str, Any]]:
query = """
SELECT
c.ID_PESSOA,
c.ID_PROGRAMA_SNPG,
p.NM_PROGRAMA,
p.CD_PROGRAMA_PPG,
p.DS_CONCEITO AS NOTA_PPG,
p.NM_PROGRAMA_MODALIDADE,
aa.NM_AREA_AVALIACAO,
c.DT_INICIO_VIGENCIA,
c.DT_FIM_VIGENCIA
FROM SUCUPIRA_PAINEL.VM_COORDENADOR c
INNER JOIN SUCUPIRA_PAINEL.VM_PROGRAMA_SUCUPIRA p
ON c.ID_PROGRAMA_SNPG = p.ID_PROGRAMA
LEFT JOIN SUCUPIRA_PAINEL.VM_AREA_CONHECIMENTO ac
ON p.ID_AREA_CONHECIMENTO_ATUAL = ac.ID_AREA_CONHECIMENTO
LEFT JOIN SUCUPIRA_PAINEL.VM_AREA_AVALIACAO aa
ON ac.ID_AREA_AVALIACAO = aa.ID_AREA_AVALIACAO
WHERE c.ID_PESSOA = :id_pessoa
ORDER BY c.DT_INICIO_VIGENCIA DESC
"""
return self.executar_query(query, {"id_pessoa": id_pessoa})
def buscar_todas_coordenacoes_programa(self) -> List[Dict[str, Any]]:
query = """
SELECT
c.ID_PESSOA,
c.ID_PROGRAMA_SNPG,
p.NM_PROGRAMA,
p.CD_PROGRAMA_PPG,
p.DS_CONCEITO AS NOTA_PPG,
p.NM_PROGRAMA_MODALIDADE,
aa.NM_AREA_AVALIACAO,
c.DT_INICIO_VIGENCIA,
c.DT_FIM_VIGENCIA
FROM SUCUPIRA_PAINEL.VM_COORDENADOR c
INNER JOIN SUCUPIRA_PAINEL.VM_PROGRAMA_SUCUPIRA p
ON c.ID_PROGRAMA_SNPG = p.ID_PROGRAMA
LEFT JOIN SUCUPIRA_PAINEL.VM_AREA_CONHECIMENTO ac
ON p.ID_AREA_CONHECIMENTO_ATUAL = ac.ID_AREA_CONHECIMENTO
LEFT JOIN SUCUPIRA_PAINEL.VM_AREA_AVALIACAO aa
ON ac.ID_AREA_AVALIACAO = aa.ID_AREA_AVALIACAO
ORDER BY c.ID_PESSOA, c.DT_INICIO_VIGENCIA DESC
"""
return self.executar_query(query)