import logging
from typing import Optional

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

from .processar_ranking import ProcessarRankingJob

logger = logging.getLogger(__name__)


class RankingScheduler:
    """Wrap an ``AsyncIOScheduler`` that runs a ``ProcessarRankingJob``.

    The scheduler instance is created lazily by :meth:`iniciar`; until then
    ``self.scheduler`` is ``None``.
    """

    def __init__(self, job: ProcessarRankingJob):
        """Store the job to schedule; no scheduler is created yet."""
        self.job = job
        self.scheduler: Optional[AsyncIOScheduler] = None

    def iniciar(self) -> None:
        """Start the scheduler and register the daily ranking job.

        The job is registered with a cron trigger for hour 3, minute 0
        (no explicit timezone is passed here). Calling this while the
        scheduler is already running is a no-op.
        """
        if self.scheduler and self.scheduler.running:
            return

        # A fresh scheduler is built on every (re)start, so anything that was
        # registered on a previously stopped instance is discarded.
        self.scheduler = AsyncIOScheduler()
        self.scheduler.add_job(
            self.job.executar,
            trigger=CronTrigger(hour=3, minute=0),
            id='ranking_diario',
            name='Processamento diário do ranking de consultores',
            replace_existing=True,
            kwargs={"limpar_antes": True},
        )
        self.scheduler.start()

    def parar(self) -> None:
        """Shut the scheduler down without waiting for running jobs.

        Does nothing when the scheduler was never started or is already
        stopped.
        """
        if self.scheduler and self.scheduler.running:
            self.scheduler.shutdown(wait=False)

    def executar_agora(self) -> None:
        """Queue a one-off run of the job, outside the daily schedule.

        The job is added without a trigger, which in APScheduler means a
        single run at the current time. If the scheduler has not been started
        (or has been stopped), nothing is queued and a warning is logged:
        a job added to a stopped scheduler would never execute, because
        :meth:`iniciar` replaces the scheduler instance on restart.
        """
        if not (self.scheduler and self.scheduler.running):
            logger.warning(
                "executar_agora ignored: scheduler is not running; "
                "call iniciar() first"
            )
            return

        self.scheduler.add_job(
            self.job.executar,
            id='ranking_manual',
            replace_existing=True,
            kwargs={"limpar_antes": True},
        )