feat(tests): adicionar suite completa de testes automatizados

- 198 testes cobrindo todos os módulos do backend
- Testes unitários para calculador de pontuação (56 testes)
- Testes para value objects de período (23 testes)
- Testes para cliente Elasticsearch com mocks (27 testes)
- Testes para repository de consultores (48 testes)
- Testes de integração ES + Repository (6 testes)
- Testes para API routes FastAPI (23 testes)
- Testes para job de processamento (16 testes)
- Cobertura de 54% do código
This commit is contained in:
Frederico Castro
2025-12-29 08:06:08 -03:00
parent e0692ee49c
commit 143ec401f5
19 changed files with 2899 additions and 0 deletions

View File

@@ -0,0 +1,278 @@
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from datetime import datetime
from src.application.jobs.processar_ranking import ProcessarRankingJob
from src.infrastructure.ranking_store import RankingEntry
@pytest.fixture
def mock_es_client():
client = AsyncMock()
client.contar_com_atuacoes.return_value = 100
client.buscar_todos_consultores.return_value = {"processados": 100, "batches": 10}
return client
@pytest.fixture
def mock_ranking_store():
store = AsyncMock()
store.set_entries = AsyncMock()
return store
@pytest.fixture
def mock_oracle_repo():
repo = MagicMock()
repo.limpar_tabela = MagicMock()
repo.inserir_batch = MagicMock()
repo.atualizar_posicoes = MagicMock()
return repo
@pytest.fixture
def job(mock_es_client, mock_ranking_store, mock_oracle_repo):
return ProcessarRankingJob(
es_client=mock_es_client,
ranking_store=mock_ranking_store,
ranking_oracle_repo=mock_oracle_repo,
)
class TestGerarEntriesOrdenadas:
def test_lista_vazia(self):
entries = ProcessarRankingJob._gerar_entries_ordenadas([])
assert len(entries) == 0
def test_ordenacao_por_pontuacao(self):
consultores = [
{"id_pessoa": 1, "nome": "A", "pontuacao_total": 100, "bloco_a": 50, "bloco_b": 20, "bloco_c": 20, "bloco_d": 5, "bloco_e": 5, "ativo": True, "anos_atuacao": 5},
{"id_pessoa": 2, "nome": "B", "pontuacao_total": 300, "bloco_a": 150, "bloco_b": 80, "bloco_c": 50, "bloco_d": 10, "bloco_e": 10, "ativo": True, "anos_atuacao": 10},
{"id_pessoa": 3, "nome": "C", "pontuacao_total": 200, "bloco_a": 100, "bloco_b": 50, "bloco_c": 30, "bloco_d": 10, "bloco_e": 10, "ativo": False, "anos_atuacao": 3},
]
entries = ProcessarRankingJob._gerar_entries_ordenadas(consultores)
assert len(entries) == 3
assert entries[0].id_pessoa == 2
assert entries[0].posicao == 1
assert entries[0].pontuacao_total == 300
assert entries[1].id_pessoa == 3
assert entries[1].posicao == 2
assert entries[2].id_pessoa == 1
assert entries[2].posicao == 3
def test_posicoes_atribuidas_sequencialmente(self):
consultores = [
{"id_pessoa": i, "nome": f"C{i}", "pontuacao_total": 100 - i, "bloco_a": 50, "bloco_b": 20, "bloco_c": 20, "bloco_d": 5, "bloco_e": 5, "ativo": True, "anos_atuacao": 5}
for i in range(1, 11)
]
entries = ProcessarRankingJob._gerar_entries_ordenadas(consultores)
for i, entry in enumerate(entries, start=1):
assert entry.posicao == i
def test_empate_desempata_por_id(self):
consultores = [
{"id_pessoa": 10, "nome": "A", "pontuacao_total": 100, "bloco_a": 50, "bloco_b": 20, "bloco_c": 20, "bloco_d": 5, "bloco_e": 5, "ativo": True, "anos_atuacao": 5},
{"id_pessoa": 5, "nome": "B", "pontuacao_total": 100, "bloco_a": 50, "bloco_b": 20, "bloco_c": 20, "bloco_d": 5, "bloco_e": 5, "ativo": True, "anos_atuacao": 5},
]
entries = ProcessarRankingJob._gerar_entries_ordenadas(consultores)
assert entries[0].id_pessoa == 5
assert entries[1].id_pessoa == 10
class TestObterEstatisticas:
def test_lista_vazia(self):
stats = ProcessarRankingJob._obter_estatisticas([])
assert stats["total_consultores"] == 0
assert stats["pontuacao_media"] == 0
assert stats["pontuacao_maxima"] == 0
def test_estatisticas_calculadas(self):
entries = [
RankingEntry(id_pessoa=1, nome="A", posicao=1, pontuacao_total=300, bloco_a=100, bloco_b=80, bloco_c=60, bloco_d=40, bloco_e=20, ativo=True, anos_atuacao=5, detalhes={}),
RankingEntry(id_pessoa=2, nome="B", posicao=2, pontuacao_total=200, bloco_a=80, bloco_b=50, bloco_c=40, bloco_d=20, bloco_e=10, ativo=True, anos_atuacao=3, detalhes={}),
RankingEntry(id_pessoa=3, nome="C", posicao=3, pontuacao_total=100, bloco_a=40, bloco_b=30, bloco_c=20, bloco_d=5, bloco_e=5, ativo=False, anos_atuacao=2, detalhes={}),
]
stats = ProcessarRankingJob._obter_estatisticas(entries)
assert stats["total_consultores"] == 3
assert stats["total_ativos"] == 2
assert stats["total_inativos"] == 1
assert stats["pontuacao_maxima"] == 300
assert stats["pontuacao_minima"] == 100
assert stats["pontuacao_media"] == 200.0
def test_media_componentes(self):
entries = [
RankingEntry(id_pessoa=1, nome="A", posicao=1, pontuacao_total=300, bloco_a=100, bloco_b=50, bloco_c=80, bloco_d=40, bloco_e=30, ativo=True, anos_atuacao=5, detalhes={}),
RankingEntry(id_pessoa=2, nome="B", posicao=2, pontuacao_total=200, bloco_a=80, bloco_b=30, bloco_c=60, bloco_d=20, bloco_e=10, ativo=True, anos_atuacao=3, detalhes={}),
]
stats = ProcessarRankingJob._obter_estatisticas(entries)
assert stats["media_componentes"]["a"] == 90.0
assert stats["media_componentes"]["b"] == 40.0
class TestGerarJsonDetalhes:
def test_gerar_json_consultor_completo(self, job):
from src.domain.entities.consultor import Consultor, CoordenacaoCapes, Consultoria
from src.domain.value_objects.periodo import Periodo
from src.domain.value_objects.pontuacao import PontuacaoBloco, PontuacaoCompleta
periodo = Periodo(inicio=datetime(2020, 1, 1), fim=None)
consultor = Consultor(
id_pessoa=123,
nome="João Silva",
coordenacoes_capes=[
CoordenacaoCapes(
codigo="CA",
tipo="Coordenador de Área",
area_avaliacao="CIÊNCIAS",
periodo=periodo,
)
],
consultoria=Consultoria(
codigo="CONS_ATIVO",
situacao="Atividade Contínua",
periodo=periodo,
anos_consecutivos=5,
retornos=0,
),
)
consultor.pontuacao = PontuacaoCompleta(
bloco_a=PontuacaoBloco(bloco="A", atuacoes=[]),
bloco_b=PontuacaoBloco(bloco="B", atuacoes=[]),
bloco_c=PontuacaoBloco(bloco="C", atuacoes=[]),
bloco_d=PontuacaoBloco(bloco="D", atuacoes=[]),
bloco_e=PontuacaoBloco(bloco="E", atuacoes=[]),
)
result = job._gerar_json_detalhes(consultor)
assert result["id_pessoa"] == 123
assert result["nome"] == "João Silva"
assert len(result["coordenacoes_capes"]) == 1
assert result["consultoria"] is not None
assert result["consultoria"]["codigo"] == "CONS_ATIVO"
def test_gerar_json_consultor_sem_consultoria(self, job):
from src.domain.entities.consultor import Consultor
from src.domain.value_objects.pontuacao import PontuacaoBloco, PontuacaoCompleta
consultor = Consultor(
id_pessoa=456,
nome="Maria Santos",
)
consultor.pontuacao = PontuacaoCompleta(
bloco_a=PontuacaoBloco(bloco="A", atuacoes=[]),
bloco_b=PontuacaoBloco(bloco="B", atuacoes=[]),
bloco_c=PontuacaoBloco(bloco="C", atuacoes=[]),
bloco_d=PontuacaoBloco(bloco="D", atuacoes=[]),
bloco_e=PontuacaoBloco(bloco="E", atuacoes=[]),
)
result = job._gerar_json_detalhes(consultor)
assert result["id_pessoa"] == 456
assert result["consultoria"] is None
class TestProcessarBatch:
@pytest.mark.asyncio
async def test_processar_batch_atualiza_progresso(self, job):
with patch.object(job.consultor_repo, "_construir_consultor") as mock_construir:
from src.domain.entities.consultor import Consultor
from src.domain.value_objects.pontuacao import PontuacaoBloco, PontuacaoCompleta
consultor = Consultor(id_pessoa=1, nome="Test")
consultor.pontuacao = PontuacaoCompleta(
bloco_a=PontuacaoBloco(bloco="A", atuacoes=[]),
bloco_b=PontuacaoBloco(bloco="B", atuacoes=[]),
bloco_c=PontuacaoBloco(bloco="C", atuacoes=[]),
bloco_d=PontuacaoBloco(bloco="D", atuacoes=[]),
bloco_e=PontuacaoBloco(bloco="E", atuacoes=[]),
)
mock_construir.return_value = consultor
docs = [{"id": 1, "dadosPessoais": {"nome": "Test"}, "atuacoes": []}]
progress = {"processados": 1, "batch_atual": 1, "percentual": 10}
with patch("src.application.jobs.processar_ranking.job_status"):
await job._processar_batch(docs, progress)
assert len(job._consultores) == 1
class TestExecutar:
@pytest.mark.asyncio
async def test_executar_job_ja_rodando(self, job):
with patch("src.application.jobs.processar_ranking.job_status") as mock_status:
mock_status.is_running = True
with pytest.raises(RuntimeError, match="Job já está em execução"):
await job.executar()
@pytest.mark.asyncio
async def test_executar_sucesso(self, job, mock_es_client, mock_ranking_store):
with patch("src.application.jobs.processar_ranking.job_status") as mock_status:
mock_status.is_running = False
mock_status.iniciar = MagicMock()
mock_status.atualizar_progresso = MagicMock()
mock_status.finalizar = MagicMock()
mock_status.mensagem = ""
mock_status.tempo_decorrido = 10.5
job._consultores = [
{"id_pessoa": 1, "nome": "Test", "pontuacao_total": 100, "bloco_a": 50, "bloco_b": 20, "bloco_c": 20, "bloco_d": 5, "bloco_e": 5, "ativo": True, "anos_atuacao": 5}
]
resultado = await job.executar()
assert resultado["sucesso"] is True
mock_status.iniciar.assert_called_once()
mock_status.finalizar.assert_called_once_with(sucesso=True)
@pytest.mark.asyncio
async def test_executar_erro(self, job, mock_es_client):
with patch("src.application.jobs.processar_ranking.job_status") as mock_status:
mock_status.is_running = False
mock_status.iniciar = MagicMock()
mock_status.finalizar = MagicMock()
mock_es_client.contar_com_atuacoes.side_effect = Exception("ES Error")
with pytest.raises(RuntimeError):
await job.executar()
mock_status.finalizar.assert_called_once()
args = mock_status.finalizar.call_args
assert args[1]["sucesso"] is False
class TestPersistirOracle:
@pytest.mark.asyncio
async def test_persistir_oracle_limpa_antes(self, job, mock_oracle_repo):
consultores = [
{"id_pessoa": 1, "nome": "Test", "pontuacao_total": 100, "posicao": 1}
]
await job._persistir_oracle(consultores, limpar_antes=True)
mock_oracle_repo.limpar_tabela.assert_called_once()
mock_oracle_repo.inserir_batch.assert_called()
mock_oracle_repo.atualizar_posicoes.assert_called_once()
@pytest.mark.asyncio
async def test_persistir_oracle_sem_limpar(self, job, mock_oracle_repo):
consultores = [
{"id_pessoa": 1, "nome": "Test", "pontuacao_total": 100, "posicao": 1}
]
await job._persistir_oracle(consultores, limpar_antes=False)
mock_oracle_repo.limpar_tabela.assert_not_called()
@pytest.mark.asyncio
async def test_persistir_oracle_em_batches(self, job, mock_oracle_repo):
consultores = [{"id_pessoa": i, "nome": f"Test{i}", "pontuacao_total": 100} for i in range(5000)]
await job._persistir_oracle(consultores, limpar_antes=True)
assert mock_oracle_repo.inserir_batch.call_count == 3