from typing import List, Optional, Dict, Any from datetime import datetime import json from ...domain.entities.consultor_ranking import ConsultorRanking from .client import OracleClient class RankingOracleRepository: def __init__(self, oracle_client: OracleClient): self.client = oracle_client def inserir_batch(self, consultores: List[Dict[str, Any]]) -> int: """ Insere ou atualiza um batch de consultores usando MERGE. Retorna o número de registros processados. """ import oracledb if not consultores: return 0 merge_sql = """ MERGE INTO TB_RANKING_CONSULTOR t USING ( SELECT :id_pessoa AS ID_PESSOA, :nome AS NOME, :pontuacao_total AS PONTUACAO_TOTAL, :componente_a AS COMPONENTE_A, :componente_b AS COMPONENTE_B, :componente_c AS COMPONENTE_C, :componente_d AS COMPONENTE_D, :ativo AS ATIVO, :anos_atuacao AS ANOS_ATUACAO, TO_CLOB(:json_detalhes) AS JSON_DETALHES FROM DUAL ) s ON (t.ID_PESSOA = s.ID_PESSOA) WHEN MATCHED THEN UPDATE SET t.NOME = s.NOME, t.PONTUACAO_TOTAL = s.PONTUACAO_TOTAL, t.COMPONENTE_A = s.COMPONENTE_A, t.COMPONENTE_B = s.COMPONENTE_B, t.COMPONENTE_C = s.COMPONENTE_C, t.COMPONENTE_D = s.COMPONENTE_D, t.ATIVO = s.ATIVO, t.ANOS_ATUACAO = s.ANOS_ATUACAO, t.DT_CALCULO = CURRENT_TIMESTAMP, t.JSON_DETALHES = s.JSON_DETALHES WHEN NOT MATCHED THEN INSERT ( ID_PESSOA, NOME, PONTUACAO_TOTAL, COMPONENTE_A, COMPONENTE_B, COMPONENTE_C, COMPONENTE_D, ATIVO, ANOS_ATUACAO, JSON_DETALHES, DT_CALCULO ) VALUES ( s.ID_PESSOA, s.NOME, s.PONTUACAO_TOTAL, s.COMPONENTE_A, s.COMPONENTE_B, s.COMPONENTE_C, s.COMPONENTE_D, s.ATIVO, s.ANOS_ATUACAO, s.JSON_DETALHES, CURRENT_TIMESTAMP ) """ with self.client.get_connection() as conn: cursor = conn.cursor() try: for consultor in consultores: json_str = json.dumps(consultor, ensure_ascii=False) cursor.setinputsizes(json_detalhes=oracledb.DB_TYPE_CLOB) params = { "id_pessoa": consultor["id_pessoa"], "nome": consultor["nome"], "pontuacao_total": consultor["pontuacao_total"], "componente_a": consultor.get("bloco_a") or consultor.get("componente_a", 0), "componente_b": consultor.get("bloco_b") or consultor.get("componente_b", 0), "componente_c": consultor.get("bloco_c") or consultor.get("componente_c", 0), "componente_d": consultor.get("bloco_d") or consultor.get("componente_d", 0), "ativo": "S" if consultor.get("ativo") else "N", "anos_atuacao": consultor.get("anos_atuacao", 0), "json_detalhes": json_str } cursor.execute(merge_sql, params) conn.commit() return len(consultores) except Exception as e: conn.rollback() raise RuntimeError(f"Erro ao inserir batch no Oracle: {e}") finally: cursor.close() def buscar_paginado( self, page: int = 1, size: int = 50, filtro_ativo: Optional[bool] = None ) -> List[ConsultorRanking]: """ Busca ranking paginado ordenado por posição. """ if page < 1: page = 1 if size < 1 or size > 10000: size = 50 offset = (page - 1) * size limit_end = offset + size where_clause = "" params = { "offset": offset, "limit_end": limit_end, } if filtro_ativo is not None: where_clause = "AND ATIVO = :ativo" params["ativo"] = "S" if filtro_ativo else "N" query = f""" SELECT * FROM ( SELECT ID_PESSOA, NOME, POSICAO, PONTUACAO_TOTAL, COMPONENTE_A, COMPONENTE_B, COMPONENTE_C, COMPONENTE_D, ATIVO, ANOS_ATUACAO, DT_CALCULO, JSON_DETALHES, ROW_NUMBER() OVER (ORDER BY POSICAO NULLS LAST, PONTUACAO_TOTAL DESC) AS RN FROM TB_RANKING_CONSULTOR WHERE 1=1 {where_clause} ) WHERE RN > :offset AND RN <= :limit_end """ results = self.client.executar_query(query, params) consultores = [] for r in results: json_det = r["JSON_DETALHES"] if hasattr(json_det, "read"): json_det = json_det.read() else: json_det = str(json_det) if json_det else "{}" consultores.append( ConsultorRanking( id_pessoa=r["ID_PESSOA"], nome=r["NOME"], posicao=r["POSICAO"], pontuacao_total=float(r["PONTUACAO_TOTAL"]), componente_a=float(r["COMPONENTE_A"]), componente_b=float(r["COMPONENTE_B"]), componente_c=float(r["COMPONENTE_C"]), componente_d=float(r["COMPONENTE_D"]), ativo=r["ATIVO"] == "S", anos_atuacao=float(r["ANOS_ATUACAO"]), dt_calculo=r["DT_CALCULO"], json_detalhes=json_det ) ) return consultores def contar_total(self, filtro_ativo: Optional[bool] = None) -> int: """ Conta total de consultores no ranking. """ where_clause = "" params = {} if filtro_ativo is not None: where_clause = "WHERE ATIVO = :ativo" params["ativo"] = "S" if filtro_ativo else "N" query = f"SELECT COUNT(*) AS TOTAL FROM TB_RANKING_CONSULTOR {where_clause}" results = self.client.executar_query(query, params) return results[0]["TOTAL"] if results else 0 def buscar_por_nome(self, nome: str, limit: int = 5) -> List[Dict[str, Any]]: """ Busca consultores por nome, procurando cada palavra separadamente. """ palavras = [p.strip() for p in nome.upper().split() if len(p.strip()) >= 2] if not palavras: return [] conditions = [] params = {"limit": limit} for i, palavra in enumerate(palavras): param_name = f"p{i}" conditions.append(f"UPPER(NOME) LIKE :{param_name}") params[param_name] = f"%{palavra}%" where_clause = " AND ".join(conditions) query = f""" SELECT ID_PESSOA, NOME, POSICAO, PONTUACAO_TOTAL FROM TB_RANKING_CONSULTOR WHERE {where_clause} ORDER BY POSICAO NULLS LAST FETCH FIRST :limit ROWS ONLY """ return self.client.executar_query(query, params) def buscar_por_id(self, id_pessoa: int) -> Optional[ConsultorRanking]: """ Busca consultor específico com sua posição no ranking. """ query = """ SELECT ID_PESSOA, NOME, POSICAO, PONTUACAO_TOTAL, COMPONENTE_A, COMPONENTE_B, COMPONENTE_C, COMPONENTE_D, ATIVO, ANOS_ATUACAO, DT_CALCULO, JSON_DETALHES FROM TB_RANKING_CONSULTOR WHERE ID_PESSOA = :id_pessoa """ results = self.client.executar_query(query, {"id_pessoa": id_pessoa}) if not results: return None r = results[0] json_det = r["JSON_DETALHES"] if hasattr(json_det, "read"): json_det = json_det.read() else: json_det = str(json_det) if json_det else "{}" return ConsultorRanking( id_pessoa=r["ID_PESSOA"], nome=r["NOME"], posicao=r["POSICAO"], pontuacao_total=float(r["PONTUACAO_TOTAL"]), componente_a=float(r["COMPONENTE_A"]), componente_b=float(r["COMPONENTE_B"]), componente_c=float(r["COMPONENTE_C"]), componente_d=float(r["COMPONENTE_D"]), ativo=r["ATIVO"] == "S", anos_atuacao=float(r["ANOS_ATUACAO"]), dt_calculo=r["DT_CALCULO"], json_detalhes=json_det ) def atualizar_posicoes(self) -> None: """ Chama a procedure SP_ATUALIZAR_POSICOES para recalcular as posições. """ with self.client.get_connection() as conn: cursor = conn.cursor() try: cursor.callproc("SP_ATUALIZAR_POSICOES") conn.commit() except Exception as e: conn.rollback() raise RuntimeError(f"Erro ao atualizar posições: {e}") finally: cursor.close() def obter_estatisticas(self) -> Dict[str, Any]: """ Retorna estatísticas do ranking. """ query = """ SELECT COUNT(*) AS TOTAL_CONSULTORES, COUNT(CASE WHEN ATIVO = 'S' THEN 1 END) AS TOTAL_ATIVOS, COUNT(CASE WHEN ATIVO = 'N' THEN 1 END) AS TOTAL_INATIVOS, MAX(DT_CALCULO) AS ULTIMA_ATUALIZACAO, AVG(PONTUACAO_TOTAL) AS PONTUACAO_MEDIA, MAX(PONTUACAO_TOTAL) AS PONTUACAO_MAXIMA, MIN(PONTUACAO_TOTAL) AS PONTUACAO_MINIMA, AVG(COMPONENTE_A) AS MEDIA_COMP_A, AVG(COMPONENTE_B) AS MEDIA_COMP_B, AVG(COMPONENTE_C) AS MEDIA_COMP_C, AVG(COMPONENTE_D) AS MEDIA_COMP_D FROM TB_RANKING_CONSULTOR """ results = self.client.executar_query(query) if not results: return {} r = results[0] return { "total_consultores": r["TOTAL_CONSULTORES"], "total_ativos": r["TOTAL_ATIVOS"], "total_inativos": r["TOTAL_INATIVOS"], "ultima_atualizacao": r["ULTIMA_ATUALIZACAO"].isoformat() if r["ULTIMA_ATUALIZACAO"] else None, "pontuacao_media": float(r["PONTUACAO_MEDIA"]) if r["PONTUACAO_MEDIA"] else 0, "pontuacao_maxima": float(r["PONTUACAO_MAXIMA"]) if r["PONTUACAO_MAXIMA"] else 0, "pontuacao_minima": float(r["PONTUACAO_MINIMA"]) if r["PONTUACAO_MINIMA"] else 0, "media_componentes": { "a": float(r["MEDIA_COMP_A"]) if r["MEDIA_COMP_A"] else 0, "b": float(r["MEDIA_COMP_B"]) if r["MEDIA_COMP_B"] else 0, "c": float(r["MEDIA_COMP_C"]) if r["MEDIA_COMP_C"] else 0, "d": float(r["MEDIA_COMP_D"]) if r["MEDIA_COMP_D"] else 0 } } def obter_distribuicao(self) -> List[Dict[str, Any]]: """ Retorna distribuição de consultores por faixa de pontuação. """ query = """ SELECT CASE WHEN PONTUACAO_TOTAL >= 800 THEN '800+' WHEN PONTUACAO_TOTAL >= 600 THEN '600-799' WHEN PONTUACAO_TOTAL >= 400 THEN '400-599' WHEN PONTUACAO_TOTAL >= 200 THEN '200-399' ELSE '0-199' END AS FAIXA, COUNT(*) AS QUANTIDADE, ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM TB_RANKING_CONSULTOR), 2) AS PERCENTUAL FROM TB_RANKING_CONSULTOR GROUP BY CASE WHEN PONTUACAO_TOTAL >= 800 THEN '800+' WHEN PONTUACAO_TOTAL >= 600 THEN '600-799' WHEN PONTUACAO_TOTAL >= 400 THEN '400-599' WHEN PONTUACAO_TOTAL >= 200 THEN '200-399' ELSE '0-199' END ORDER BY CASE WHEN FAIXA = '800+' THEN 1 WHEN FAIXA = '600-799' THEN 2 WHEN FAIXA = '400-599' THEN 3 WHEN FAIXA = '200-399' THEN 4 ELSE 5 END """ results = self.client.executar_query(query) return [ { "faixa": r["FAIXA"], "quantidade": r["QUANTIDADE"], "percentual": float(r["PERCENTUAL"]) } for r in results ] def limpar_tabela(self) -> None: """ Limpa todos os registros da tabela de ranking. Usar apenas quando for reprocessar do zero. """ with self.client.get_connection() as conn: cursor = conn.cursor() try: cursor.execute("DELETE FROM TB_RANKING_CONSULTOR") conn.commit() except Exception as e: conn.rollback() raise RuntimeError(f"Erro ao limpar tabela: {e}") finally: cursor.close()