import asyncio
import html
import unicodedata
from io import BytesIO
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional, List
def normalizar_texto(texto: str) -> str:
if not texto:
return ""
texto = html.unescape(texto)
texto = unicodedata.normalize('NFD', texto)
texto = ''.join(c for c in texto if unicodedata.category(c) != 'Mn')
return texto.lower()
from ...application.use_cases.obter_ranking import ObterRankingUseCase
from ...application.use_cases.obter_consultor import ObterConsultorUseCase
from ...infrastructure.repositories.consultor_repository_impl import ConsultorRepositoryImpl
from ...application.mappers.ranking_mapper import RankingMapper
from ..schemas.consultor_schema import (
RankingResponseSchema,
RankingDetalhadoResponseSchema,
ConsultorDetalhadoSchema,
ConsultorResumoSchema,
)
from ..schemas.ranking_schema import (
RankingPaginadoResponseSchema,
ConsultorRankingResumoSchema,
EstatisticasRankingSchema,
JobStatusSchema,
ProcessarRankingRequestSchema,
ProcessarRankingResponseSchema,
ConsultaNomeSchema,
PosicaoRankingSchema,
SugestaoConsultorSchema,
SugerirConsultoresResponseSchema,
AreaAvaliacaoSchema,
)
from .dependencies import get_repository, get_ranking_store, get_processar_job, get_es_client
from ...infrastructure.elasticsearch.client import ElasticsearchClient
from ...application.jobs.job_status import job_status
router = APIRouter(prefix="/api/v1", tags=["ranking"])
@router.get("/ranking", response_model=RankingResponseSchema)
async def obter_ranking(
limite: int = Query(default=100, ge=1, le=1000, description="Limite de consultores"),
offset: int = Query(default=0, ge=0, description="Offset para paginação"),
componente: Optional[str] = Query(
default=None, description="Filtrar por bloco (a, c, d)"
),
repository: ConsultorRepositoryImpl = Depends(get_repository),
store = Depends(get_ranking_store),
):
if store.is_ready():
total, entries = store.get_slice(offset=offset, limit=limite)
consultores_schema = [
ConsultorResumoSchema(
id_pessoa=e.id_pessoa,
nome=e.nome,
anos_atuacao=e.anos_atuacao,
ativo=e.ativo,
veterano=e.anos_atuacao >= 10,
pontuacao_total=e.pontuacao_total,
bloco_a=e.bloco_a,
bloco_c=e.bloco_c,
bloco_d=e.bloco_d,
rank=e.posicao,
)
for e in entries
]
return RankingResponseSchema(
total=total, limite=limite, offset=offset, consultores=consultores_schema
)
use_case = ObterRankingUseCase(repository=repository)
consultores_dto = await use_case.executar(limite=limite, componente=componente)
total = await repository.contar_total()
consultores_schema = [ConsultorResumoSchema(**vars(dto)) for dto in consultores_dto]
return RankingResponseSchema(total=total, limite=limite, offset=offset, consultores=consultores_schema)
@router.get("/ranking/detalhado", response_model=RankingDetalhadoResponseSchema)
async def obter_ranking_detalhado(
limite: int = Query(default=100, ge=1, le=1000, description="Limite de consultores"),
componente: Optional[str] = Query(
default=None, description="Filtrar por bloco (a, c, d)"
),
repository: ConsultorRepositoryImpl = Depends(get_repository),
):
use_case = ObterRankingUseCase(repository=repository)
consultores_dto = await use_case.executar_detalhado(limite=limite, componente=componente)
total = await repository.contar_total()
consultores_schema = [
ConsultorDetalhadoSchema(**dto.to_dict()) for dto in consultores_dto
]
return RankingDetalhadoResponseSchema(total=total, limite=limite, consultores=consultores_schema)
@router.get("/consultor/{id_pessoa}", response_model=ConsultorDetalhadoSchema)
async def obter_consultor(
id_pessoa: int,
repository: ConsultorRepositoryImpl = Depends(get_repository),
store = Depends(get_ranking_store),
):
use_case = ObterConsultorUseCase(repository=repository)
rank = None
if store.is_ready():
found = store.get_by_id(id_pessoa)
rank = found.posicao if found else None
consultor = await use_case.executar(id_pessoa=id_pessoa, rank=rank)
if not consultor:
raise HTTPException(status_code=404, detail=f"Consultor {id_pessoa} não encontrado")
return consultor
@router.get("/health")
async def health_check():
return {"status": "ok", "message": "API Ranking CAPES funcionando"}
@router.get("/ranking/selos")
async def listar_selos():
from ...infrastructure.ranking_store import SELOS_DISPONIVEIS
selos_info = {
"PRESID_CAMARA": {"label": "Presidente Câmara", "icone": "👑", "grupo": "funcoes"},
"COORD_PPG": {"label": "Coord. PPG", "icone": "🎓", "grupo": "funcoes"},
"BPQ": {"label": "Bolsista PQ", "icone": "🏅", "grupo": "funcoes"},
"AUTOR_GP": {"label": "Autor Grande Prêmio", "icone": "🏆", "grupo": "premiacoes"},
"AUTOR_PREMIO": {"label": "Autor Prêmio", "icone": "🥇", "grupo": "premiacoes"},
"AUTOR_MENCAO": {"label": "Autor Menção", "icone": "🥈", "grupo": "premiacoes"},
"ORIENT_GP": {"label": "Orientador GP", "icone": "🏆", "grupo": "premiacoes"},
"ORIENT_PREMIO": {"label": "Orientador Prêmio", "icone": "🎖️", "grupo": "premiacoes"},
"ORIENT_MENCAO": {"label": "Orientador Menção", "icone": "📜", "grupo": "premiacoes"},
"COORIENT_GP": {"label": "Coorientador GP", "icone": "🏆", "grupo": "premiacoes"},
"COORIENT_PREMIO": {"label": "Coorientador Prêmio", "icone": "🎖️", "grupo": "premiacoes"},
"COORIENT_MENCAO": {"label": "Coorientador Menção", "icone": "📜", "grupo": "premiacoes"},
"ORIENT_POS_DOC": {"label": "Orient. Pós-Doc", "icone": "🔬", "grupo": "orientacoes"},
"ORIENT_TESE": {"label": "Orient. Tese", "icone": "📚", "grupo": "orientacoes"},
"ORIENT_DISS": {"label": "Orient. Dissertação", "icone": "📄", "grupo": "orientacoes"},
"CO_ORIENT_POS_DOC": {"label": "Coorient. Pós-Doc", "icone": "🔬", "grupo": "coorientacoes"},
"CO_ORIENT_TESE": {"label": "Coorient. Tese", "icone": "📚", "grupo": "coorientacoes"},
"CO_ORIENT_DISS": {"label": "Coorient. Dissertação", "icone": "📄", "grupo": "coorientacoes"},
}
return {
"selos": [
{"codigo": s, **selos_info.get(s, {"label": s, "icone": "🏷️", "grupo": "outros"})}
for s in SELOS_DISPONIVEIS
]
}
@router.get("/ranking/paginado", response_model=RankingPaginadoResponseSchema)
async def ranking_paginado(
page: int = Query(default=1, ge=1, description="Número da página"),
size: int = Query(default=50, ge=1, le=1000, description="Tamanho da página (máx 1000)"),
ativo: Optional[bool] = Query(default=None, description="Filtrar por status ativo"),
selos: Optional[str] = Query(default=None, description="Filtrar por selos (separados por vírgula)"),
store = Depends(get_ranking_store),
):
if not store.is_ready():
raise HTTPException(
status_code=503,
detail="Ranking ainda não foi processado. Execute POST /api/v1/ranking/processar.",
)
filtro_selos = [s.strip() for s in selos.split(",") if s.strip()] if selos else None
total, entries = store.get_page(page=page, size=size, filtro_ativo=ativo, filtro_selos=filtro_selos)
total_pages = (total + size - 1) // size
consultores_schema = []
for e in entries:
d = e.detalhes
tipos_atuacao = RankingMapper._extrair_tipos_atuacao(d)
consultores_schema.append(
ConsultorRankingResumoSchema(
id_pessoa=e.id_pessoa,
nome=e.nome,
posicao=e.posicao,
pontuacao_total=float(e.pontuacao_total),
bloco_a=float(e.bloco_a),
bloco_b=float(e.bloco_b),
bloco_c=float(e.bloco_c),
bloco_d=float(e.bloco_d),
ativo=e.ativo,
anos_atuacao=float(e.anos_atuacao),
tipos_atuacao=tipos_atuacao,
coordenador_ppg=bool(d.get("coordenador_ppg", False)),
consultoria=d.get("consultoria"),
coordenacoes_capes=d.get("coordenacoes_capes"),
inscricoes=d.get("inscricoes"),
avaliacoes_comissao=d.get("avaliacoes_comissao"),
premiacoes=d.get("premiacoes"),
bolsas_cnpq=d.get("bolsas_cnpq"),
participacoes=d.get("participacoes"),
orientacoes=d.get("orientacoes"),
membros_banca=d.get("membros_banca"),
pontuacao=d.get("pontuacao"),
)
)
return RankingPaginadoResponseSchema(
total=total,
page=page,
size=size,
total_pages=total_pages,
consultores=consultores_schema
)
@router.get("/ranking/busca", response_model=List[ConsultaNomeSchema])
async def buscar_por_nome(
nome: str = Query(..., min_length=3, description="Nome (ou parte) para buscar"),
limit: int = Query(default=5, ge=1, le=20, description="Limite de resultados"),
store = Depends(get_ranking_store),
):
if not store.is_ready():
raise HTTPException(
status_code=503,
detail="Ranking ainda não foi processado. Execute POST /api/v1/ranking/processar.",
)
resultados = store.buscar_por_nome(nome=nome, limit=limit)
return [
ConsultaNomeSchema(
id_pessoa=r["ID_PESSOA"],
nome=r["NOME"],
posicao=r["POSICAO"],
pontuacao_total=float(r["PONTUACAO_TOTAL"]),
)
for r in resultados
]
@router.get("/ranking/estatisticas", response_model=EstatisticasRankingSchema)
async def ranking_estatisticas(
store = Depends(get_ranking_store),
):
if not store.is_ready():
raise HTTPException(
status_code=503,
detail="Ranking ainda não foi processado. Execute POST /api/v1/ranking/processar.",
)
total = store.total()
ativos = store.total(filtro_ativo=True)
inativos = total - ativos
entries = store.get_page(page=1, size=total)[1] if total else []
totais = [e.pontuacao_total for e in entries]
distribuicao = []
if total:
buckets = [
("800+", lambda x: x >= 800),
("600-799", lambda x: 600 <= x < 800),
("400-599", lambda x: 400 <= x < 600),
("200-399", lambda x: 200 <= x < 400),
("0-199", lambda x: x < 200),
]
for faixa, pred in buckets:
qtd = sum(1 for x in totais if pred(x))
distribuicao.append(
{
"faixa": faixa,
"quantidade": qtd,
"percentual": round((qtd * 100.0 / total), 2) if total else 0,
}
)
estatisticas = {
"total_consultores": total,
"total_ativos": ativos,
"total_inativos": inativos,
"ultima_atualizacao": store.last_update.isoformat() if store.last_update else None,
"pontuacao_media": (sum(totais) / total) if total else 0,
"pontuacao_maxima": max(totais) if totais else 0,
"pontuacao_minima": min(totais) if totais else 0,
"media_componentes": {
"a": (sum(e.bloco_a for e in entries) / total) if total else 0,
"b": (sum(e.bloco_b for e in entries) / total) if total else 0,
"c": (sum(e.bloco_c for e in entries) / total) if total else 0,
"d": (sum(e.bloco_d for e in entries) / total) if total else 0,
},
}
return EstatisticasRankingSchema(
total_consultores=estatisticas.get("total_consultores", 0),
total_ativos=estatisticas.get("total_ativos", 0),
total_inativos=estatisticas.get("total_inativos", 0),
ultima_atualizacao=estatisticas.get("ultima_atualizacao"),
pontuacao_media=estatisticas.get("pontuacao_media", 0),
pontuacao_maxima=estatisticas.get("pontuacao_maxima", 0),
pontuacao_minima=estatisticas.get("pontuacao_minima", 0),
media_blocos=estatisticas.get("media_componentes", {}),
distribuicao=distribuicao
)
@router.get("/ranking/status", response_model=JobStatusSchema)
async def status_processamento():
return JobStatusSchema(**job_status.to_dict())
@router.post("/ranking/processar", response_model=ProcessarRankingResponseSchema)
async def processar_ranking(
request: ProcessarRankingRequestSchema = ProcessarRankingRequestSchema(),
job = Depends(get_processar_job),
):
if job_status.is_running:
raise HTTPException(status_code=409, detail="Job já está em execução")
asyncio.create_task(job.executar(limpar_antes=request.limpar_antes))
return ProcessarRankingResponseSchema(
sucesso=True,
mensagem="Processamento do ranking iniciado em background",
job_id="ranking_job"
)
@router.get("/ranking/posicao/{id_pessoa}", response_model=PosicaoRankingSchema)
async def obter_posicao_ranking(
id_pessoa: int,
store = Depends(get_ranking_store),
):
if not store.is_ready():
raise HTTPException(
status_code=503,
detail="Ranking ainda não foi processado. Execute POST /api/v1/ranking/processar.",
)
entry = store.get_by_id(id_pessoa)
total = store.total()
if not entry:
return PosicaoRankingSchema(
id_pessoa=id_pessoa,
nome="",
posicao=None,
total_consultores=total,
pontuacao_total=0,
bloco_a=0,
bloco_b=0,
bloco_c=0,
bloco_d=0,
ativo=False,
encontrado=False,
)
return PosicaoRankingSchema(
id_pessoa=entry.id_pessoa,
nome=entry.nome,
posicao=entry.posicao,
total_consultores=total,
pontuacao_total=float(entry.pontuacao_total),
bloco_a=float(entry.bloco_a),
bloco_b=float(entry.bloco_b),
bloco_c=float(entry.bloco_c),
bloco_d=float(entry.bloco_d),
ativo=entry.ativo,
encontrado=True,
)
@router.get("/consultor/{id_pessoa}/raw")
async def obter_consultor_raw(
id_pessoa: int,
es_client: ElasticsearchClient = Depends(get_es_client),
):
try:
documento = await es_client.buscar_documento_completo(id_pessoa)
if not documento:
raise HTTPException(status_code=404, detail=f"Consultor {id_pessoa} não encontrado no Elasticsearch")
return documento
except RuntimeError as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/consultor/{id_pessoa}/pdf")
async def exportar_ficha_pdf(
id_pessoa: int,
repository: ConsultorRepositoryImpl = Depends(get_repository),
es_client: ElasticsearchClient = Depends(get_es_client),
store = Depends(get_ranking_store),
):
from ...application.services.pdf_service import PDFService
use_case = ObterConsultorUseCase(repository=repository)
rank = None
if store.is_ready():
found = store.get_by_id(id_pessoa)
rank = found.posicao if found else None
consultor = await use_case.executar(id_pessoa=id_pessoa, rank=rank)
if not consultor:
raise HTTPException(status_code=404, detail=f"Consultor {id_pessoa} não encontrado")
try:
pdf_service = PDFService()
raw_documento = await es_client.buscar_documento_completo(id_pessoa)
raw_source = raw_documento.get("_source") if raw_documento else {}
pdf_bytes = pdf_service.gerar_ficha_consultor(consultor, raw_source)
except Exception as e:
import traceback
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"Erro ao gerar PDF: {str(e)}")
nome_sanitizado = "".join(c if c.isalnum() or c in " -_" else "_" for c in consultor.nome)
nome_arquivo = f"ficha_consultor_{id_pessoa}_{nome_sanitizado[:30]}_{datetime.now().strftime('%Y%m%d')}.pdf"
return StreamingResponse(
BytesIO(pdf_bytes),
media_type="application/pdf",
headers={
"Content-Disposition": f'attachment; filename="{nome_arquivo}"'
}
)
class ConsultorEquipeSchema(BaseModel):
id_pessoa: int
nome: str
ies: Optional[str] = None
areas_avaliacao: List[str] = []
areas_conhecimento: List[str] = []
linhas_pesquisa: List[str] = []
situacao: str = ""
foi_coordenador: bool = False
foi_premiado: bool = False
posicao_ranking: Optional[int] = None
pontuacao_ranking: float = 0
motivos_match: List[str] = []
class GerarEquipePdfRequest(BaseModel):
tema: str
area_avaliacao: Optional[str] = None
consultores: List[ConsultorEquipeSchema]
@router.post("/equipe/pdf")
async def gerar_pdf_equipe(
request: GerarEquipePdfRequest,
):
from ...application.services.pdf_service import PDFService
if not request.consultores:
raise HTTPException(status_code=400, detail="Nenhum consultor selecionado")
try:
pdf_service = PDFService()
consultores_dict = [c.model_dump() for c in request.consultores]
pdf_bytes = pdf_service.gerar_pdf_equipe(
tema=request.tema,
area_avaliacao=request.area_avaliacao,
consultores=consultores_dict
)
except Exception as e:
import traceback
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"Erro ao gerar PDF: {str(e)}")
tema_sanitizado = "".join(c if c.isalnum() or c in " -_" else "_" for c in request.tema)
nome_arquivo = f"equipe_{tema_sanitizado[:30]}_{datetime.now().strftime('%Y%m%d')}.pdf"
return StreamingResponse(
BytesIO(pdf_bytes),
media_type="application/pdf",
headers={
"Content-Disposition": f'attachment; filename="{nome_arquivo}"'
}
)
@router.get("/consultores/sugerir", response_model=SugerirConsultoresResponseSchema)
async def sugerir_consultores(
tema: str = Query(..., min_length=2, description="Tema ou assunto para buscar consultores"),
area_avaliacao: Optional[str] = Query(None, description="Filtrar por area de avaliacao especifica"),
apenas_ativos: bool = Query(True, description="Filtrar apenas consultores ativos"),
quantidade: int = Query(20, ge=1, le=100, description="Quantidade maxima de sugestoes"),
es_client: ElasticsearchClient = Depends(get_es_client),
store = Depends(get_ranking_store),
):
try:
resultados = await es_client.sugerir_consultores(
tema=tema,
area_avaliacao=area_avaliacao,
apenas_ativos=apenas_ativos,
size=quantidade * 3
)
consultores_raw = []
for doc in resultados:
id_pessoa = doc.get("id")
nome = doc.get("dadosPessoais", {}).get("nome", "")
score_match = doc.get("_score_match", 0)
areas_avaliacao = set()
areas_conhecimento = set()
linhas_pesquisa = set()
situacao = ""
ies = None
foi_coordenador = False
foi_premiado = False
motivos_match = set()
tema_norm = normalizar_texto(tema)
tema_palavras = tema_norm.split()
for atuacao in doc.get("atuacoes", []):
tipo = atuacao.get("tipo", "")
if tipo == "Consultor":
dados = atuacao.get("dadosConsultoria", {})
situacao = dados.get("situacaoConsultoria", "")
if dados.get("ies"):
ies = dados["ies"].get("sigla") or dados["ies"].get("nome")
for area in dados.get("areaConhecimentoPos", []):
if area.get("nome"):
area_nome = html.unescape(area["nome"])
areas_conhecimento.add(area_nome)
area_norm = normalizar_texto(area["nome"])
if tema_norm in area_norm or any(p in area_norm for p in tema_palavras if len(p) > 3):
motivos_match.add(f"Area: {area_nome}")
area_aval = area.get("areaAvaliacao", {})
if area_aval and area_aval.get("nome"):
aval_nome = html.unescape(area_aval["nome"])
areas_avaliacao.add(aval_nome)
aval_norm = normalizar_texto(area_aval["nome"])
if tema_norm in aval_norm or any(p in aval_norm for p in tema_palavras if len(p) > 3):
motivos_match.add(f"Area Avaliacao: {aval_nome}")
for pesq in dados.get("areaPesquisa", []):
if pesq.get("descricao"):
pesq_desc = html.unescape(pesq["descricao"])
linhas_pesquisa.add(pesq_desc)
pesq_norm = normalizar_texto(pesq["descricao"])
if tema_norm in pesq_norm or any(p in pesq_norm for p in tema_palavras if len(p) > 3):
motivos_match.add(f"Pesquisa: {pesq_desc[:50]}...")
elif "Coordenação" in tipo:
foi_coordenador = True
elif "Premiação" in tipo:
foi_premiado = True
posicao_ranking = None
pontuacao_ranking = 0
if store.is_ready():
entry = store.get_by_id(id_pessoa)
if entry:
posicao_ranking = entry.posicao
pontuacao_ranking = entry.pontuacao_total
score_final = score_match
if posicao_ranking:
bonus_ranking = max(0, (10000 - posicao_ranking) / 100)
score_final += bonus_ranking
if foi_coordenador:
score_final += 20
motivos_match.add("Foi Coordenador de Area")
if foi_premiado:
score_final += 10
motivos_match.add("Foi Premiado")
consultores_raw.append({
"id_pessoa": id_pessoa,
"nome": nome,
"score_match": score_match,
"score_final": score_final,
"posicao_ranking": posicao_ranking,
"pontuacao_ranking": pontuacao_ranking,
"areas_avaliacao": list(areas_avaliacao),
"areas_conhecimento": list(areas_conhecimento),
"linhas_pesquisa": list(linhas_pesquisa),
"situacao": situacao,
"ies": ies,
"foi_coordenador": foi_coordenador,
"foi_premiado": foi_premiado,
"motivos_match": list(motivos_match)[:5],
})
consultores_raw.sort(key=lambda x: x["score_final"], reverse=True)
ies_selecionadas = set()
consultores_finais = []
for c in consultores_raw:
if len(consultores_finais) >= quantidade:
break
if c["ies"] and c["ies"] in ies_selecionadas and len(consultores_finais) < quantidade // 2:
continue
consultores_finais.append(c)
if c["ies"]:
ies_selecionadas.add(c["ies"])
while len(consultores_finais) < quantidade and len(consultores_finais) < len(consultores_raw):
for c in consultores_raw:
if c not in consultores_finais:
consultores_finais.append(c)
if len(consultores_finais) >= quantidade:
break
consultores = [
SugestaoConsultorSchema(
id_pessoa=c["id_pessoa"],
nome=c["nome"],
score_match=c["score_final"],
areas_avaliacao=c["areas_avaliacao"],
areas_conhecimento=c["areas_conhecimento"],
linhas_pesquisa=c["linhas_pesquisa"],
situacao=c["situacao"],
ies=c["ies"],
foi_coordenador=c["foi_coordenador"],
foi_premiado=c["foi_premiado"],
posicao_ranking=c["posicao_ranking"],
pontuacao_ranking=c["pontuacao_ranking"],
motivos_match=c["motivos_match"],
)
for c in consultores_finais
]
return SugerirConsultoresResponseSchema(
tema_buscado=tema,
total_encontrados=len(consultores),
consultores=consultores
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erro ao sugerir consultores: {str(e)}")
@router.get("/consultores/areas-avaliacao", response_model=List[AreaAvaliacaoSchema])
async def listar_areas_avaliacao(
es_client: ElasticsearchClient = Depends(get_es_client),
):
try:
areas = await es_client.listar_areas_avaliacao()
return [AreaAvaliacaoSchema(**a) for a in areas]
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erro ao listar areas de avaliacao: {str(e)}")