Histórico persistente de execuções com visualização detalhada
- Novo executionsStore em db.js com cache in-memory e escrita debounced - Camada de cache (src/cache/index.js) com TTL e suporte opcional a Redis - Persistência de execuções de agentes e pipelines com metadados completos - Pipeline grava cada etapa com prompt, resultado, timestamps e status - 4 endpoints REST: listagem paginada com filtros, detalhe, exclusão individual e limpeza total - Componente frontend (history.js) com cards, filtros, paginação e modal de detalhe - Timeline visual para pipelines com prompts colapsáveis por etapa - Correção do executor: --max-turns em vez de --max-tokens, --permission-mode bypassPermissions - Refatoração do scheduler com persistência melhorada e graceful shutdown
This commit is contained in:
@@ -1,9 +1,23 @@
|
||||
import { v4 as uuidv4 } from 'uuid';
|
||||
import { pipelinesStore } from '../store/db.js';
|
||||
import { agentsStore } from '../store/db.js';
|
||||
import { pipelinesStore, agentsStore, executionsStore } from '../store/db.js';
|
||||
import * as executor from './executor.js';
|
||||
import { mem } from '../cache/index.js';
|
||||
|
||||
const activePipelines = new Map();
|
||||
const AGENT_MAP_TTL = 30_000;
|
||||
|
||||
function getAgentMap() {
|
||||
const hit = mem.get('agent:map');
|
||||
if (hit !== undefined) return hit;
|
||||
const agents = agentsStore.getAll();
|
||||
const map = new Map(agents.map((a) => [a.id, a.agent_name]));
|
||||
mem.set('agent:map', map, AGENT_MAP_TTL);
|
||||
return map;
|
||||
}
|
||||
|
||||
export function invalidateAgentMapCache() {
|
||||
mem.del('agent:map');
|
||||
}
|
||||
|
||||
function validatePipeline(data) {
|
||||
const errors = [];
|
||||
@@ -13,8 +27,8 @@ function validatePipeline(data) {
|
||||
if (!Array.isArray(data.steps) || data.steps.length === 0) {
|
||||
errors.push('steps é obrigatório e deve ser um array não vazio');
|
||||
} else {
|
||||
data.steps.forEach((step, index) => {
|
||||
if (!step.agentId) errors.push(`steps[${index}].agentId é obrigatório`);
|
||||
data.steps.forEach((step, i) => {
|
||||
if (!step.agentId) errors.push(`steps[${i}].agentId é obrigatório`);
|
||||
});
|
||||
}
|
||||
return errors;
|
||||
@@ -33,13 +47,8 @@ function buildSteps(steps) {
|
||||
}
|
||||
|
||||
function enrichStepsWithAgentNames(steps) {
|
||||
const agents = agentsStore.getAll();
|
||||
const agentMap = new Map(agents.map((a) => [a.id, a.agent_name]));
|
||||
|
||||
return steps.map((s) => ({
|
||||
...s,
|
||||
agentName: agentMap.get(s.agentId) || s.agentId,
|
||||
}));
|
||||
const agentMap = getAgentMap();
|
||||
return steps.map((s) => ({ ...s, agentName: agentMap.get(s.agentId) || s.agentId }));
|
||||
}
|
||||
|
||||
function applyTemplate(template, input) {
|
||||
@@ -64,9 +73,7 @@ function executeStepAsPromise(agentConfig, prompt, pipelineState, wsCallback, pi
|
||||
});
|
||||
}
|
||||
},
|
||||
onError: (err) => {
|
||||
reject(err);
|
||||
},
|
||||
onError: (err) => reject(err),
|
||||
onComplete: (result) => {
|
||||
if (result.exitCode !== 0 && !result.result) {
|
||||
reject(new Error(result.stderr || `Processo encerrado com código ${result.exitCode}`));
|
||||
@@ -87,18 +94,23 @@ function executeStepAsPromise(agentConfig, prompt, pipelineState, wsCallback, pi
|
||||
}
|
||||
|
||||
export async function executePipeline(pipelineId, initialInput, wsCallback) {
|
||||
const pipeline = pipelinesStore.getById(pipelineId);
|
||||
if (!pipeline) throw new Error(`Pipeline ${pipelineId} não encontrado`);
|
||||
|
||||
const pipelineState = {
|
||||
currentExecutionId: null,
|
||||
currentStep: 0,
|
||||
canceled: false,
|
||||
};
|
||||
const pl = pipelinesStore.getById(pipelineId);
|
||||
if (!pl) throw new Error(`Pipeline ${pipelineId} não encontrado`);
|
||||
|
||||
const pipelineState = { currentExecutionId: null, currentStep: 0, canceled: false };
|
||||
activePipelines.set(pipelineId, pipelineState);
|
||||
|
||||
const steps = buildSteps(pipeline.steps);
|
||||
const historyRecord = executionsStore.create({
|
||||
type: 'pipeline',
|
||||
pipelineId,
|
||||
pipelineName: pl.name,
|
||||
input: initialInput,
|
||||
status: 'running',
|
||||
startedAt: new Date().toISOString(),
|
||||
steps: [],
|
||||
});
|
||||
|
||||
const steps = buildSteps(pl.steps);
|
||||
const results = [];
|
||||
let currentInput = initialInput;
|
||||
|
||||
@@ -114,6 +126,7 @@ export async function executePipeline(pipelineId, initialInput, wsCallback) {
|
||||
if (agent.status !== 'active') throw new Error(`Agente ${agent.agent_name} está inativo`);
|
||||
|
||||
const prompt = applyTemplate(step.inputTemplate, currentInput);
|
||||
const stepStart = new Date().toISOString();
|
||||
|
||||
if (wsCallback) {
|
||||
wsCallback({
|
||||
@@ -133,6 +146,20 @@ export async function executePipeline(pipelineId, initialInput, wsCallback) {
|
||||
currentInput = result;
|
||||
results.push({ stepId: step.id, agentName: agent.agent_name, result });
|
||||
|
||||
const current = executionsStore.getById(historyRecord.id);
|
||||
const savedSteps = current ? (current.steps || []) : [];
|
||||
savedSteps.push({
|
||||
stepIndex: i,
|
||||
agentId: step.agentId,
|
||||
agentName: agent.agent_name,
|
||||
prompt: prompt.slice(0, 5000),
|
||||
result,
|
||||
startedAt: stepStart,
|
||||
endedAt: new Date().toISOString(),
|
||||
status: 'completed',
|
||||
});
|
||||
executionsStore.update(historyRecord.id, { steps: savedSteps });
|
||||
|
||||
if (wsCallback) {
|
||||
wsCallback({
|
||||
type: 'pipeline_step_complete',
|
||||
@@ -145,19 +172,23 @@ export async function executePipeline(pipelineId, initialInput, wsCallback) {
|
||||
}
|
||||
|
||||
activePipelines.delete(pipelineId);
|
||||
executionsStore.update(historyRecord.id, {
|
||||
status: pipelineState.canceled ? 'canceled' : 'completed',
|
||||
endedAt: new Date().toISOString(),
|
||||
});
|
||||
|
||||
if (!pipelineState.canceled && wsCallback) {
|
||||
wsCallback({
|
||||
type: 'pipeline_complete',
|
||||
pipelineId,
|
||||
results,
|
||||
});
|
||||
wsCallback({ type: 'pipeline_complete', pipelineId, results });
|
||||
}
|
||||
|
||||
return results;
|
||||
} catch (err) {
|
||||
activePipelines.delete(pipelineId);
|
||||
|
||||
executionsStore.update(historyRecord.id, {
|
||||
status: 'error',
|
||||
error: err.message,
|
||||
endedAt: new Date().toISOString(),
|
||||
});
|
||||
if (wsCallback) {
|
||||
wsCallback({
|
||||
type: 'pipeline_error',
|
||||
@@ -166,7 +197,6 @@ export async function executePipeline(pipelineId, initialInput, wsCallback) {
|
||||
error: err.message,
|
||||
});
|
||||
}
|
||||
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
@@ -174,13 +204,8 @@ export async function executePipeline(pipelineId, initialInput, wsCallback) {
|
||||
export function cancelPipeline(pipelineId) {
|
||||
const state = activePipelines.get(pipelineId);
|
||||
if (!state) return false;
|
||||
|
||||
state.canceled = true;
|
||||
|
||||
if (state.currentExecutionId) {
|
||||
executor.cancel(state.currentExecutionId);
|
||||
}
|
||||
|
||||
if (state.currentExecutionId) executor.cancel(state.currentExecutionId);
|
||||
activePipelines.delete(pipelineId);
|
||||
return true;
|
||||
}
|
||||
@@ -196,27 +221,22 @@ export function getActivePipelines() {
|
||||
export function createPipeline(data) {
|
||||
const errors = validatePipeline(data);
|
||||
if (errors.length > 0) throw new Error(errors.join('; '));
|
||||
|
||||
const pipelineData = {
|
||||
return pipelinesStore.create({
|
||||
name: data.name,
|
||||
description: data.description || '',
|
||||
steps: buildSteps(data.steps),
|
||||
status: data.status || 'active',
|
||||
};
|
||||
|
||||
return pipelinesStore.create(pipelineData);
|
||||
});
|
||||
}
|
||||
|
||||
export function updatePipeline(id, data) {
|
||||
const existing = pipelinesStore.getById(id);
|
||||
if (!existing) return null;
|
||||
|
||||
const updateData = {};
|
||||
if (data.name !== undefined) updateData.name = data.name;
|
||||
if (data.description !== undefined) updateData.description = data.description;
|
||||
if (data.status !== undefined) updateData.status = data.status;
|
||||
if (data.steps !== undefined) updateData.steps = buildSteps(data.steps);
|
||||
|
||||
return pipelinesStore.update(id, updateData);
|
||||
}
|
||||
|
||||
@@ -229,8 +249,7 @@ export function getPipeline(id) {
|
||||
}
|
||||
|
||||
export function getAllPipelines() {
|
||||
const pipelines = pipelinesStore.getAll();
|
||||
return pipelines.map((p) => ({
|
||||
return pipelinesStore.getAll().map((p) => ({
|
||||
...p,
|
||||
steps: enrichStepsWithAgentNames(p.steps || []),
|
||||
}));
|
||||
|
||||
Reference in New Issue
Block a user