import { v4 as uuidv4 } from 'uuid';
import { pipelinesStore, agentsStore, executionsStore } from '../store/db.js';
import * as executor from './executor.js';
import * as gitIntegration from './git-integration.js';
import { mem } from '../cache/index.js';
import { generatePipelineReport } from '../reports/generator.js';

/** In-flight pipeline runs, keyed by the run's execution id. */
const activePipelines = new Map();

/** TTL handed to `mem.set` for the cached agent id -> name map. */
const AGENT_MAP_TTL = 30_000;

/** Character limits applied to text that is broadcast or persisted. */
const APPROVAL_PREVIEW_CHARS = 3000;
const SAVED_PROMPT_CHARS = 5000;
const STEP_RESULT_PREVIEW_CHARS = 500;
const LAST_STEP_INPUT_CHARS = 50000;

/**
 * Returns a Map of agent id -> agent name, served from the `mem` cache when
 * an entry is present and rebuilt from `agentsStore` otherwise.
 *
 * @returns {Map<string, string>}
 */
function getAgentMap() {
  const cached = mem.get('agent:map');
  if (cached !== undefined) return cached;

  const map = new Map(agentsStore.getAll().map((agent) => [agent.id, agent.agent_name]));
  mem.set('agent:map', map, AGENT_MAP_TTL);
  return map;
}

/** Drops the cached agent id -> name map so the next lookup rebuilds it. */
export function invalidateAgentMapCache() {
  mem.del('agent:map');
}

/**
 * Validates the payload used to create a pipeline.
 *
 * @param {object} data - Candidate pipeline definition.
 * @returns {string[]} Validation messages; empty when the payload is valid.
 */
function validatePipeline(data) {
  const errors = [];

  if (!data.name || typeof data.name !== 'string') {
    errors.push('name é obrigatório e deve ser uma string');
  }

  if (!Array.isArray(data.steps) || data.steps.length === 0) {
    errors.push('steps é obrigatório e deve ser um array não vazio');
  } else {
    data.steps.forEach((step, i) => {
      // `?.` so a null/undefined entry is reported as a validation error
      // instead of raising a TypeError.
      if (!step?.agentId) errors.push(`steps[${i}].agentId é obrigatório`);
    });
  }

  return errors;
}

/**
 * Normalizes raw step definitions: fills in missing ids, order, template and
 * description, then sorts by `order`.
 *
 * The step at input index 0 never requires approval.
 *
 * @param {object[]} steps - Raw step definitions.
 * @returns {object[]} New array of normalized steps sorted by `order`.
 */
function buildSteps(steps) {
  return steps
    .map((step, index) => ({
      id: step.id || uuidv4(),
      agentId: step.agentId,
      order: step.order !== undefined ? step.order : index,
      inputTemplate: step.inputTemplate || null,
      description: step.description || '',
      requiresApproval: index === 0 ? false : !!step.requiresApproval,
    }))
    .sort((a, b) => a.order - b.order);
}

/**
 * Adds `agentName` to each step, falling back to the agent id when the agent
 * is not in the name map.
 *
 * @param {object[]} steps
 * @returns {object[]} Shallow copies of the steps with `agentName` set.
 */
function enrichStepsWithAgentNames(steps) {
  const agentMap = getAgentMap();
  return steps.map((step) => ({ ...step, agentName: agentMap.get(step.agentId) || step.agentId }));
}

/**
 * Substitutes every `{{input}}` placeholder in `template` with `input`.
 *
 * @param {string|null} template - Step template; when falsy the input is returned untouched.
 * @param {string} input - Text to inject.
 * @returns {string} The rendered prompt.
 */
function applyTemplate(template, input) {
  if (!template) return input;
  // A replacer function is required here: with a plain replacement string,
  // sequences such as `$&`, `$1` or `$$` inside the input would be expanded
  // as replacement patterns and corrupt the prompt.
  return template.replace(/\{\{input\}\}/g, () => input);
}

/**
 * Returns at most `maxLength` characters of `value` without throwing on
 * non-string values (null/undefined become an empty string).
 *
 * @param {*} value
 * @param {number} maxLength
 * @returns {string}
 */
function truncateText(value, maxLength) {
  if (typeof value === 'string') return value.slice(0, maxLength);
  if (value === null || value === undefined) return '';
  return String(value).slice(0, maxLength);
}

/**
 * Extracts a human-readable message from a thrown/rejected value, which is
 * not guaranteed to be an Error instance.
 *
 * @param {*} err
 * @returns {string}
 */
function describeError(err) {
  return err?.message ?? String(err);
}

/**
 * Runs one agent through `executor.execute` and adapts its callbacks to a
 * Promise.
 *
 * Streams `pipeline_step_output` events through `wsCallback` and stores the
 * executor's id in `pipelineState.currentExecutionId`.
 *
 * @param {object} agentConfig - Agent configuration passed to the executor.
 * @param {string} prompt - Prompt sent as the task description.
 * @param {object} pipelineState - Mutable run state from `activePipelines`.
 * @param {Function|null} wsCallback - Optional event sink.
 * @param {string} pipelineId
 * @param {number} stepIndex
 * @returns {Promise<{text: string, costUsd: number, durationMs: number, numTurns: number, sessionId: string}>}
 */
function executeStepAsPromise(agentConfig, prompt, pipelineState, wsCallback, pipelineId, stepIndex) {
  return new Promise((resolve, reject) => {
    const executionId = executor.execute(
      agentConfig,
      { description: prompt },
      {
        onData: (parsed, execId) => {
          if (wsCallback) {
            wsCallback({
              type: 'pipeline_step_output',
              pipelineId,
              stepIndex,
              executionId: execId,
              data: parsed,
            });
          }
        },
        onError: (err) => reject(err),
        onComplete: (result) => {
          // A non-zero exit code only counts as a failure when no result
          // text was produced.
          if (result.exitCode !== 0 && !result.result) {
            reject(new Error(result.stderr || `Processo encerrado com código ${result.exitCode}`));
            return;
          }
          resolve({
            text: result.result || '',
            costUsd: result.costUsd || 0,
            durationMs: result.durationMs || 0,
            numTurns: result.numTurns || 0,
            sessionId: result.sessionId || '',
          });
        },
      }
    );

    // A falsy id is treated as the executor refusing the job because the
    // concurrent-execution limit was reached.
    if (!executionId) {
      reject(new Error('Limite de execuções simultâneas atingido'));
      return;
    }

    pipelineState.currentExecutionId = executionId;
  });
}

/**
 * Parks the run until the pending approval is settled by
 * `approvePipelineStep`, `rejectPipelineStep` or `cancelPipeline`.
 *
 * @param {string} executionId - Key of the run in `activePipelines`.
 * @param {string} pipelineId
 * @param {number} stepIndex - Step awaiting approval.
 * @param {string} previousOutput - Output of the previous step (previewed, truncated).
 * @param {string} agentName - Name of the agent that produced `previousOutput`.
 * @param {Function|null} wsCallback - Optional event sink.
 * @returns {Promise<boolean>} `true` when approved; `false` when rejected,
 *   canceled, or when the run is no longer registered.
 */
function waitForApproval(executionId, pipelineId, stepIndex, previousOutput, agentName, wsCallback) {
  return new Promise((resolve) => {
    const state = activePipelines.get(executionId);
    if (!state) {
      resolve(false);
      return;
    }

    const preview = truncateText(previousOutput, APPROVAL_PREVIEW_CHARS);
    state.pendingApproval = { stepIndex, previousOutput: preview, agentName, resolve };

    if (wsCallback) {
      wsCallback({
        type: 'pipeline_approval_required',
        pipelineId,
        executionId,
        stepIndex,
        agentName,
        previousOutput: preview,
      });
    }
  });
}

/**
 * Looks up an active run by execution id first, then by pipeline id.
 *
 * @param {string} idOrExecId
 * @returns {object|null} The run state, or `null` when nothing matches.
 */
function findPipelineState(idOrExecId) {
  if (activePipelines.has(idOrExecId)) return activePipelines.get(idOrExecId);

  for (const state of activePipelines.values()) {
    if (state.pipelineId === idOrExecId) return state;
  }
  return null;
}

/**
 * Settles the pending approval of a run with the given decision.
 *
 * @param {string} id - Execution id or pipeline id.
 * @param {boolean} approved
 * @returns {boolean} `false` when no approval was pending.
 */
function settlePendingApproval(id, approved) {
  const state = findPipelineState(id);
  if (!state?.pendingApproval) return false;

  const { resolve } = state.pendingApproval;
  state.pendingApproval = null;
  resolve(approved);
  return true;
}

/**
 * Approves the step currently awaiting approval.
 *
 * @param {string} id - Execution id or pipeline id.
 * @returns {boolean} `true` when a pending approval was resolved.
 */
export function approvePipelineStep(id) {
  return settlePendingApproval(id, true);
}

/**
 * Rejects the step currently awaiting approval, which stops the run.
 *
 * @param {string} id - Execution id or pipeline id.
 * @returns {boolean} `true` when a pending approval was resolved.
 */
export function rejectPipelineStep(id) {
  return settlePendingApproval(id, false);
}

/**
 * Shared step loop used by both `executePipeline` and `resumePipeline`.
 *
 * Runs steps `startStep..end` sequentially, persisting progress to the
 * history record and emitting progress events, then finalizes the record.
 *
 * @param {object} run
 * @param {object} run.pl - Pipeline definition.
 * @param {string} run.pipelineId
 * @param {string} run.executionId - Key of the run in `activePipelines`.
 * @param {object} run.pipelineState - Mutable run state.
 * @param {string} run.historyId - Id of the execution history record.
 * @param {object[]} run.steps - Normalized steps.
 * @param {number} run.startStep - Index of the first step to run.
 * @param {object[]} run.results - Results accumulated so far (mutated).
 * @param {*} run.initialInput - Input fed to the first executed step.
 * @param {number} run.initialCost - Cost carried over from a previous run.
 * @param {Function|null} run.wsCallback - Optional event sink.
 * @param {object} run.options - Caller options (`workingDirectory`, `repoName`).
 * @param {boolean} run.resumed - Adds `resumed: true` to step-start events.
 * @param {boolean} run.autoCommit - Whether `options.repoName` triggers a git commit.
 * @returns {Promise<{executionId: string, results: object[]}>}
 * @throws Rethrows any step failure after recording it on the history record.
 */
async function runPipelineSteps(run) {
  const {
    pl,
    pipelineId,
    executionId,
    pipelineState,
    historyId,
    steps,
    startStep,
    results,
    wsCallback,
    options,
    resumed,
    autoCommit,
  } = run;

  const emit = (event) => {
    if (wsCallback) wsCallback(event);
  };

  let currentInput = run.initialInput;
  let totalCost = run.initialCost;
  // Distinguishes an explicit user rejection from a cancel, both of which
  // set `pipelineState.canceled`.
  let rejected = false;

  try {
    for (let i = startStep; i < steps.length; i++) {
      if (pipelineState.canceled) break;

      const step = steps[i];
      pipelineState.currentStep = i;

      if (step.requiresApproval && i > 0) {
        const prevAgentName = results.length > 0 ? results[results.length - 1].agentName : '';
        executionsStore.update(historyId, { status: 'awaiting_approval' });
        emit({ type: 'pipeline_status', pipelineId, status: 'awaiting_approval', stepIndex: i });

        const approved = await waitForApproval(executionId, pipelineId, i, currentInput, prevAgentName, wsCallback);
        if (!approved) {
          // cancelPipeline also resolves the approval with `false`, but sets
          // `canceled` beforehand; only an untouched flag means a rejection.
          rejected = !pipelineState.canceled;
          pipelineState.canceled = true;
          executionsStore.update(historyId, {
            status: 'rejected',
            endedAt: new Date().toISOString(),
            totalCostUsd: totalCost,
          });
          emit({ type: 'pipeline_rejected', pipelineId, stepIndex: i });
          break;
        }
        executionsStore.update(historyId, { status: 'running' });
      }

      if (pipelineState.canceled) break;

      const agent = agentsStore.getById(step.agentId);
      if (!agent) throw new Error(`Agente ${step.agentId} não encontrado no passo ${i}`);
      if (agent.status !== 'active') throw new Error(`Agente ${agent.agent_name} está inativo`);

      const stepConfig = { ...agent.config };
      const effectiveWorkdir = options.workingDirectory || pl.workingDirectory;
      if (effectiveWorkdir) stepConfig.workingDirectory = effectiveWorkdir;

      const prompt = applyTemplate(step.inputTemplate, currentInput);
      const stepStart = new Date().toISOString();

      emit({
        type: 'pipeline_step_start',
        pipelineId,
        stepIndex: i,
        stepId: step.id,
        agentName: agent.agent_name,
        totalSteps: steps.length,
        ...(resumed ? { resumed: true } : {}),
      });

      const stepResult = await executeStepAsPromise(stepConfig, prompt, pipelineState, wsCallback, pipelineId, i);
      if (pipelineState.canceled) break;

      totalCost += stepResult.costUsd;
      currentInput = stepResult.text;
      results.push({
        stepId: step.id,
        agentId: step.agentId,
        agentName: agent.agent_name,
        result: stepResult.text,
        sessionId: stepResult.sessionId,
      });

      // Re-read the record so steps saved so far are extended, not replaced.
      const current = executionsStore.getById(historyId);
      const savedSteps = current ? (current.steps || []) : [];
      savedSteps.push({
        stepIndex: i,
        // Stored so a later resume can rebuild results with their step ids.
        stepId: step.id,
        agentId: step.agentId,
        agentName: agent.agent_name,
        prompt: truncateText(prompt, SAVED_PROMPT_CHARS),
        result: stepResult.text,
        startedAt: stepStart,
        endedAt: new Date().toISOString(),
        status: 'completed',
        costUsd: stepResult.costUsd,
        durationMs: stepResult.durationMs,
        numTurns: stepResult.numTurns,
      });
      executionsStore.update(historyId, { steps: savedSteps, totalCostUsd: totalCost });

      emit({
        type: 'pipeline_step_complete',
        pipelineId,
        stepIndex: i,
        stepId: step.id,
        result: truncateText(stepResult.text, STEP_RESULT_PREVIEW_CHARS),
        costUsd: stepResult.costUsd,
      });

      // The output is passed through executor.summarize before being handed
      // to the next step; the last step's output is left as-is.
      // NOTE(review): if summarize throws, the failure is recorded against
      // step i, which has already completed — confirm resume handles that.
      if (i < steps.length - 1 && !pipelineState.canceled) {
        emit({ type: 'pipeline_summarizing', pipelineId, stepIndex: i, originalLength: currentInput.length });
        const summarized = await executor.summarize(currentInput);
        if (summarized !== currentInput) {
          emit({
            type: 'pipeline_summarized',
            pipelineId,
            stepIndex: i,
            originalLength: currentInput.length,
            summarizedLength: summarized.length,
          });
          currentInput = summarized;
        }
      }
    }

    activePipelines.delete(executionId);

    let finalStatus = 'completed';
    if (rejected) {
      finalStatus = 'rejected';
    } else if (pipelineState.canceled) {
      finalStatus = 'canceled';
    }
    executionsStore.update(historyId, {
      status: finalStatus,
      endedAt: new Date().toISOString(),
      totalCostUsd: totalCost,
    });

    if (!pipelineState.canceled) {
      if (autoCommit && options.repoName) {
        // Best effort: a git failure is logged and does not fail the run.
        try {
          const repoDir = gitIntegration.getProjectDir(options.repoName);
          const gitResult = await gitIntegration.commitAndPush(repoDir, pl.name, `Pipeline: ${pl.name}`);
          if (gitResult.changed) {
            emit({
              type: 'pipeline_step_output',
              pipelineId,
              stepIndex: steps.length - 1,
              data: {
                type: 'success',
                content: `Git: commit ${gitResult.commitHash} pushed para ${options.repoName} (${gitResult.filesChanged} arquivos) → ${gitResult.commitUrl}`,
              },
            });
          }
        } catch (e) {
          console.error('[pipeline] Erro no auto-commit:', e.message);
        }
      }

      // Best effort: a report failure is logged and does not fail the run.
      try {
        const updated = executionsStore.getById(historyId);
        if (updated) {
          const report = generatePipelineReport(updated);
          emit({ type: 'report_generated', pipelineId, reportFile: report.filename });
        }
      } catch (e) {
        console.error('[pipeline] Erro ao gerar relatório:', e.message);
      }

      const lastResult = results.length > 0 ? results[results.length - 1] : null;
      emit({
        type: 'pipeline_complete',
        pipelineId,
        executionId,
        results,
        totalCostUsd: totalCost,
        lastAgentId: lastResult?.agentId || '',
        lastAgentName: lastResult?.agentName || '',
        lastSessionId: lastResult?.sessionId || '',
      });
    }

    return { executionId, results };
  } catch (err) {
    // NOTE(review): a step that rejects after cancelPipeline() also lands
    // here and is recorded as 'error' — confirm that is the intended status.
    activePipelines.delete(executionId);
    const message = describeError(err);
    executionsStore.update(historyId, {
      status: 'error',
      error: message,
      failedAtStep: pipelineState.currentStep,
      // truncateText never throws, so the original error is not masked when
      // the input is not a string.
      lastStepInput: truncateText(currentInput, LAST_STEP_INPUT_CHARS),
      endedAt: new Date().toISOString(),
      totalCostUsd: totalCost,
    });
    emit({ type: 'pipeline_error', pipelineId, stepIndex: pipelineState.currentStep, error: message });
    throw err;
  }
}

/**
 * Runs a pipeline from its first step.
 *
 * @param {string} pipelineId
 * @param {string} initialInput - Input for the first step.
 * @param {Function} [wsCallback] - Optional sink for progress events.
 * @param {object} [options]
 * @param {string} [options.workingDirectory] - Overrides the pipeline's working directory.
 * @param {string} [options.repoName] - When set, changes are committed and pushed after success.
 * @returns {Promise<{executionId: string, results: object[]}>}
 * @throws {Error} When the pipeline is missing or inactive, or when a step fails.
 */
export async function executePipeline(pipelineId, initialInput, wsCallback, options = {}) {
  const pl = pipelinesStore.getById(pipelineId);
  if (!pl) throw new Error(`Pipeline ${pipelineId} não encontrado`);
  if (pl.status !== 'active') throw new Error(`Pipeline "${pl.name}" está desativado`);

  // Built before anything is registered so that a malformed definition
  // cannot leave an orphaned 'running' record or active-run entry behind.
  const steps = buildSteps(pl.steps);

  const historyRecord = executionsStore.create({
    type: 'pipeline',
    pipelineId,
    pipelineName: pl.name,
    input: initialInput,
    status: 'running',
    startedAt: new Date().toISOString(),
    steps: [],
    totalCostUsd: 0,
  });

  const executionId = uuidv4();
  const pipelineState = {
    pipelineId,
    historyId: historyRecord.id,
    currentExecutionId: null,
    currentStep: 0,
    canceled: false,
    pendingApproval: null,
  };
  activePipelines.set(executionId, pipelineState);

  return runPipelineSteps({
    pl,
    pipelineId,
    executionId,
    pipelineState,
    historyId: historyRecord.id,
    steps,
    startStep: 0,
    results: [],
    initialInput,
    initialCost: 0,
    wsCallback,
    options,
    resumed: false,
    autoCommit: true,
  });
}

/**
 * Cancels an active run: flags it, settles any pending approval as refused,
 * cancels the current agent execution and marks the history record canceled.
 *
 * @param {string} id - Execution id or pipeline id.
 * @returns {boolean} `false` when no matching active run exists.
 */
export function cancelPipeline(id) {
  let executionId = id;
  let state = activePipelines.get(id);

  if (!state) {
    for (const [execId, candidate] of activePipelines) {
      if (candidate.pipelineId === id) {
        state = candidate;
        executionId = execId;
        break;
      }
    }
  }
  if (!state) return false;

  state.canceled = true;

  if (state.pendingApproval) {
    state.pendingApproval.resolve(false);
    state.pendingApproval = null;
  }

  if (state.currentExecutionId) executor.cancel(state.currentExecutionId);
  activePipelines.delete(executionId);

  const isOpen = (record) => record.status === 'running' || record.status === 'awaiting_approval';

  // Prefer the record owned by this exact run: matching on pipelineId alone
  // can hit another concurrent run of the same pipeline.
  const record = state.historyId
    ? executionsStore.getById(state.historyId)
    : executionsStore.getAll().find((e) => e.pipelineId === state.pipelineId && isOpen(e));

  if (record && isOpen(record)) {
    executionsStore.update(record.id, { status: 'canceled', endedAt: new Date().toISOString() });
  }

  return true;
}

/**
 * Resumes a failed pipeline execution from the step that failed, creating a
 * new history record that carries over the completed steps and their cost.
 *
 * NOTE(review): unlike `executePipeline`, this path does not auto-commit to
 * git when `options.repoName` is set — kept as-is; confirm it is intended.
 *
 * @param {string} executionId - Id of the failed history record.
 * @param {Function} [wsCallback] - Optional sink for progress events.
 * @param {object} [options]
 * @param {string} [options.workingDirectory] - Overrides the pipeline's working directory.
 * @returns {Promise<{executionId: string, results: object[]}>}
 * @throws {Error} When the record is missing, is not a failed pipeline run,
 *   the pipeline no longer exists, or a step fails.
 */
export async function resumePipeline(executionId, wsCallback, options = {}) {
  const prevExec = executionsStore.getById(executionId);
  if (!prevExec) throw new Error('Execução não encontrada');
  if (prevExec.status !== 'error') throw new Error('Só é possível retomar execuções com erro');
  if (prevExec.type !== 'pipeline') throw new Error('Execução não é de pipeline');

  const pl = pipelinesStore.getById(prevExec.pipelineId);
  if (!pl) throw new Error(`Pipeline ${prevExec.pipelineId} não encontrado`);

  // Built before anything is registered so that a malformed definition
  // cannot leave an orphaned 'running' record or active-run entry behind.
  const steps = buildSteps(pl.steps);

  const startStep = prevExec.failedAtStep || 0;
  const prevSteps = Array.isArray(prevExec.steps) ? [...prevExec.steps] : [];
  const prevCost = prevExec.totalCostUsd || 0;

  const historyRecord = executionsStore.create({
    type: 'pipeline',
    pipelineId: prevExec.pipelineId,
    pipelineName: pl.name,
    input: prevExec.input,
    resumedFrom: executionId,
    resumedAtStep: startStep,
    status: 'running',
    startedAt: new Date().toISOString(),
    steps: prevSteps,
    totalCostUsd: prevCost,
  });

  const newExecId = uuidv4();
  const pipelineState = {
    pipelineId: prevExec.pipelineId,
    historyId: historyRecord.id,
    currentExecutionId: null,
    currentStep: startStep,
    canceled: false,
    pendingApproval: null,
  };
  activePipelines.set(newExecId, pipelineState);

  // Session ids of earlier steps are not persisted, hence the empty string.
  const results = prevSteps.map((s) => ({
    stepId: s.stepId,
    agentId: s.agentId,
    agentName: s.agentName,
    result: s.result,
    sessionId: '',
  }));

  return runPipelineSteps({
    pl,
    pipelineId: prevExec.pipelineId,
    executionId: newExecId,
    pipelineState,
    historyId: historyRecord.id,
    steps,
    startStep,
    results,
    initialInput: prevExec.lastStepInput || prevExec.input,
    initialCost: prevCost,
    wsCallback,
    options,
    resumed: true,
    autoCommit: false,
  });
}

/**
 * Lists the runs currently in flight.
 *
 * @returns {{executionId: string, pipelineId: string, currentStep: number, currentExecutionId: (string|null), pendingApproval: boolean}[]}
 */
export function getActivePipelines() {
  return Array.from(activePipelines, ([executionId, state]) => ({
    executionId,
    pipelineId: state.pipelineId,
    currentStep: state.currentStep,
    currentExecutionId: state.currentExecutionId,
    pendingApproval: !!state.pendingApproval,
  }));
}

/**
 * Validates and persists a new pipeline.
 *
 * @param {object} data - Pipeline definition (`name`, `steps`, optional `description`, `workingDirectory`, `status`).
 * @returns {object} Whatever `pipelinesStore.create` returns.
 * @throws {Error} With all validation messages joined by `; `.
 */
export function createPipeline(data) {
  const errors = validatePipeline(data);
  if (errors.length > 0) throw new Error(errors.join('; '));

  return pipelinesStore.create({
    name: data.name,
    description: data.description || '',
    workingDirectory: data.workingDirectory || '',
    steps: buildSteps(data.steps),
    status: data.status || 'active',
  });
}

/**
 * Applies a partial update to a pipeline; only fields present in `data`
 * (i.e. not `undefined`) are written.
 *
 * @param {string} id
 * @param {object} data - Any of `name`, `description`, `workingDirectory`, `status`, `steps`.
 * @returns {object|null} Whatever `pipelinesStore.update` returns, or `null` when the pipeline does not exist.
 */
export function updatePipeline(id, data) {
  if (!pipelinesStore.getById(id)) return null;

  const updateData = {};
  for (const field of ['name', 'description', 'workingDirectory', 'status']) {
    if (data[field] !== undefined) updateData[field] = data[field];
  }
  if (data.steps !== undefined) updateData.steps = buildSteps(data.steps);

  return pipelinesStore.update(id, updateData);
}

/**
 * Deletes a pipeline.
 *
 * @param {string} id
 * @returns {*} Whatever `pipelinesStore.delete` returns.
 */
export function deletePipeline(id) {
  return pipelinesStore.delete(id);
}

/**
 * Fetches a single pipeline by id.
 *
 * @param {string} id
 * @returns {object|undefined|null} Whatever `pipelinesStore.getById` returns.
 */
export function getPipeline(id) {
  return pipelinesStore.getById(id);
}

/**
 * Lists all pipelines with each step enriched with its agent's name.
 *
 * @returns {object[]}
 */
export function getAllPipelines() {
  return pipelinesStore.getAll().map((pipeline) => ({
    ...pipeline,
    steps: enrichStepsWithAgentNames(pipeline.steps || []),
  }));
}