Corrigir E2BIG em pipelines, adicionar diretório de projeto e retomada

- Instalar Claude CLI no container Docker (npm install -g)
- Pipar prompt via stdin ao invés de argumento -p (resolve E2BIG)
- Adicionar campo workingDirectory na criação/edição de pipeline
- Pre-preencher com /home/projetos/ como base path
- Auto-criar diretório se não existir ao executar agente
- Salvar failedAtStep e lastStepInput quando pipeline falha
- Implementar retomada de pipeline a partir do passo que falhou
- Adicionar botão Retomar no histórico para pipelines com erro
- Configurar trust proxy para Express atrás de reverse proxy
This commit is contained in:
Frederico Castro
2026-02-27 23:45:36 -03:00
parent 38556f9bf5
commit 275d74b18c
11 changed files with 274 additions and 16 deletions

View File

@@ -1,5 +1,5 @@
import { spawn } from 'child_process';
import { existsSync } from 'fs';
import { existsSync, mkdirSync } from 'fs';
import path from 'path';
import { fileURLToPath } from 'url';
import { v4 as uuidv4 } from 'uuid';
@@ -58,9 +58,9 @@ function cleanEnv(agentSecrets) {
return env;
}
function buildArgs(agentConfig, prompt) {
function buildArgs(agentConfig) {
const model = agentConfig.model || 'claude-sonnet-4-6';
const args = ['-p', prompt, '--output-format', 'stream-json', '--verbose', '--model', model];
const args = ['--output-format', 'stream-json', '--verbose', '--model', model];
if (existsSync(AGENT_SETTINGS)) {
args.push('--settings', AGENT_SETTINGS);
@@ -290,9 +290,13 @@ function validateWorkingDirectory(agentConfig, executionId, onError) {
}
if (!existsSync(agentConfig.workingDirectory)) {
const err = new Error(`Diretório de trabalho não encontrado: ${agentConfig.workingDirectory}`);
if (onError) onError(err, executionId);
return false;
try {
mkdirSync(agentConfig.workingDirectory, { recursive: true });
} catch (e) {
const err = new Error(`Não foi possível criar o diretório: ${agentConfig.workingDirectory} (${e.message})`);
if (onError) onError(err, executionId);
return false;
}
}
return true;
@@ -311,11 +315,11 @@ export function execute(agentConfig, task, callbacks = {}, secrets = null) {
if (!validateWorkingDirectory(agentConfig, executionId, onError)) return null;
const prompt = buildPrompt(task.description || task, task.instructions);
const args = buildArgs(agentConfig, prompt);
const args = buildArgs(agentConfig);
const spawnOptions = {
env: cleanEnv(secrets),
stdio: ['ignore', 'pipe', 'pipe'],
stdio: ['pipe', 'pipe', 'pipe'],
};
if (agentConfig.workingDirectory && agentConfig.workingDirectory.trim()) {
@@ -325,6 +329,8 @@ export function execute(agentConfig, task, callbacks = {}, secrets = null) {
console.log(`[executor] Iniciando: ${executionId} | Modelo: ${agentConfig.model || 'claude-sonnet-4-6'}`);
const child = spawn(CLAUDE_BIN, args, spawnOptions);
child.stdin.write(prompt);
child.stdin.end();
activeExecutions.set(executionId, {
process: child,
@@ -356,7 +362,6 @@ export function resume(agentConfig, sessionId, message, callbacks = {}) {
const model = agentConfig.model || 'claude-sonnet-4-6';
const args = [
'--resume', sessionId,
'-p', sanitizeText(message),
'--output-format', 'stream-json',
'--verbose',
'--model', model,
@@ -373,7 +378,7 @@ export function resume(agentConfig, sessionId, message, callbacks = {}) {
const spawnOptions = {
env: cleanEnv(),
stdio: ['ignore', 'pipe', 'pipe'],
stdio: ['pipe', 'pipe', 'pipe'],
};
if (agentConfig.workingDirectory && agentConfig.workingDirectory.trim()) {
@@ -383,6 +388,8 @@ export function resume(agentConfig, sessionId, message, callbacks = {}) {
console.log(`[executor] Resumindo sessão: ${sessionId} | Execução: ${executionId}`);
const child = spawn(CLAUDE_BIN, args, spawnOptions);
child.stdin.write(sanitizeText(message));
child.stdin.end();
activeExecutions.set(executionId, {
process: child,
@@ -443,7 +450,6 @@ ${text}
</conteudo_para_resumir>`;
const args = [
'-p', prompt,
'--output-format', 'text',
'--model', 'claude-haiku-4-5-20251001',
'--max-turns', '1',
@@ -456,8 +462,10 @@ ${text}
const child = spawn(CLAUDE_BIN, args, {
env: cleanEnv(),
stdio: ['ignore', 'pipe', 'pipe'],
stdio: ['pipe', 'pipe', 'pipe'],
});
child.stdin.write(prompt);
child.stdin.end();
let output = '';
const timer = setTimeout(() => {

View File

@@ -214,8 +214,9 @@ export async function executePipeline(pipelineId, initialInput, wsCallback, opti
if (agent.status !== 'active') throw new Error(`Agente ${agent.agent_name} está inativo`);
const stepConfig = { ...agent.config };
if (options.workingDirectory) {
stepConfig.workingDirectory = options.workingDirectory;
const effectiveWorkdir = options.workingDirectory || pl.workingDirectory;
if (effectiveWorkdir) {
stepConfig.workingDirectory = effectiveWorkdir;
}
const prompt = applyTemplate(step.inputTemplate, currentInput);
@@ -318,6 +319,8 @@ export async function executePipeline(pipelineId, initialInput, wsCallback, opti
executionsStore.update(historyRecord.id, {
status: 'error',
error: err.message,
failedAtStep: pipelineState.currentStep,
lastStepInput: currentInput.slice(0, 50000),
endedAt: new Date().toISOString(),
totalCostUsd: totalCost,
});
@@ -359,6 +362,175 @@ export function cancelPipeline(id) {
return true;
}
export async function resumePipeline(executionId, wsCallback, options = {}) {
const prevExec = executionsStore.getById(executionId);
if (!prevExec) throw new Error('Execução não encontrada');
if (prevExec.status !== 'error') throw new Error('Só é possível retomar execuções com erro');
if (prevExec.type !== 'pipeline') throw new Error('Execução não é de pipeline');
const pl = pipelinesStore.getById(prevExec.pipelineId);
if (!pl) throw new Error(`Pipeline ${prevExec.pipelineId} não encontrado`);
const startStep = prevExec.failedAtStep || 0;
const initialInput = prevExec.lastStepInput || prevExec.input;
const newExecId = uuidv4();
const pipelineState = { pipelineId: prevExec.pipelineId, currentExecutionId: null, currentStep: startStep, canceled: false, pendingApproval: null };
activePipelines.set(newExecId, pipelineState);
const prevSteps = Array.isArray(prevExec.steps) ? [...prevExec.steps] : [];
const prevCost = prevExec.totalCostUsd || 0;
const historyRecord = executionsStore.create({
type: 'pipeline',
pipelineId: prevExec.pipelineId,
pipelineName: pl.name,
input: prevExec.input,
resumedFrom: executionId,
resumedAtStep: startStep,
status: 'running',
startedAt: new Date().toISOString(),
steps: prevSteps,
totalCostUsd: prevCost,
});
const steps = buildSteps(pl.steps);
const results = prevSteps.map(s => ({ stepId: s.stepId, agentId: s.agentId, agentName: s.agentName, result: s.result, sessionId: '' }));
let currentInput = initialInput;
let totalCost = prevCost;
try {
for (let i = startStep; i < steps.length; i++) {
if (pipelineState.canceled) break;
const step = steps[i];
pipelineState.currentStep = i;
if (step.requiresApproval && i > 0) {
const prevAgentName = results.length > 0 ? results[results.length - 1].agentName : '';
executionsStore.update(historyRecord.id, { status: 'awaiting_approval' });
if (wsCallback) wsCallback({ type: 'pipeline_status', pipelineId: prevExec.pipelineId, status: 'awaiting_approval', stepIndex: i });
const approved = await waitForApproval(newExecId, prevExec.pipelineId, i, currentInput, prevAgentName, wsCallback);
if (!approved) {
pipelineState.canceled = true;
executionsStore.update(historyRecord.id, { status: 'rejected', endedAt: new Date().toISOString(), totalCostUsd: totalCost });
if (wsCallback) wsCallback({ type: 'pipeline_rejected', pipelineId: prevExec.pipelineId, stepIndex: i });
break;
}
executionsStore.update(historyRecord.id, { status: 'running' });
}
if (pipelineState.canceled) break;
const agent = agentsStore.getById(step.agentId);
if (!agent) throw new Error(`Agente ${step.agentId} não encontrado no passo ${i}`);
if (agent.status !== 'active') throw new Error(`Agente ${agent.agent_name} está inativo`);
const stepConfig = { ...agent.config };
const effectiveWorkdir = options.workingDirectory || pl.workingDirectory;
if (effectiveWorkdir) stepConfig.workingDirectory = effectiveWorkdir;
const prompt = applyTemplate(step.inputTemplate, currentInput);
const stepStart = new Date().toISOString();
if (wsCallback) {
wsCallback({
type: 'pipeline_step_start',
pipelineId: prevExec.pipelineId,
stepIndex: i,
stepId: step.id,
agentName: agent.agent_name,
totalSteps: steps.length,
resumed: true,
});
}
const stepResult = await executeStepAsPromise(stepConfig, prompt, pipelineState, wsCallback, prevExec.pipelineId, i);
if (pipelineState.canceled) break;
totalCost += stepResult.costUsd;
currentInput = stepResult.text;
results.push({ stepId: step.id, agentId: step.agentId, agentName: agent.agent_name, result: stepResult.text, sessionId: stepResult.sessionId });
const current = executionsStore.getById(historyRecord.id);
const savedSteps = current ? (current.steps || []) : [];
savedSteps.push({
stepIndex: i,
agentId: step.agentId,
agentName: agent.agent_name,
prompt: prompt.slice(0, 5000),
result: stepResult.text,
startedAt: stepStart,
endedAt: new Date().toISOString(),
status: 'completed',
costUsd: stepResult.costUsd,
durationMs: stepResult.durationMs,
numTurns: stepResult.numTurns,
});
executionsStore.update(historyRecord.id, { steps: savedSteps, totalCostUsd: totalCost });
if (wsCallback) {
wsCallback({
type: 'pipeline_step_complete',
pipelineId: prevExec.pipelineId,
stepIndex: i,
stepId: step.id,
result: stepResult.text.slice(0, 500),
costUsd: stepResult.costUsd,
});
}
if (i < steps.length - 1 && !pipelineState.canceled) {
if (wsCallback) wsCallback({ type: 'pipeline_summarizing', pipelineId: prevExec.pipelineId, stepIndex: i, originalLength: currentInput.length });
const summarized = await executor.summarize(currentInput);
if (summarized !== currentInput) {
if (wsCallback) wsCallback({ type: 'pipeline_summarized', pipelineId: prevExec.pipelineId, stepIndex: i, originalLength: currentInput.length, summarizedLength: summarized.length });
currentInput = summarized;
}
}
}
activePipelines.delete(newExecId);
const finalStatus = pipelineState.canceled ? 'canceled' : 'completed';
executionsStore.update(historyRecord.id, { status: finalStatus, endedAt: new Date().toISOString(), totalCostUsd: totalCost });
if (!pipelineState.canceled) {
try {
const updated = executionsStore.getById(historyRecord.id);
if (updated) {
const report = generatePipelineReport(updated);
if (wsCallback) wsCallback({ type: 'report_generated', pipelineId: prevExec.pipelineId, reportFile: report.filename });
}
} catch (e) { console.error('[pipeline] Erro ao gerar relatório:', e.message); }
const lastResult = results.length > 0 ? results[results.length - 1] : null;
if (wsCallback) wsCallback({
type: 'pipeline_complete',
pipelineId: prevExec.pipelineId,
executionId: newExecId,
results,
totalCostUsd: totalCost,
lastAgentId: lastResult?.agentId || '',
lastAgentName: lastResult?.agentName || '',
lastSessionId: lastResult?.sessionId || '',
});
}
return { executionId: newExecId, results };
} catch (err) {
activePipelines.delete(newExecId);
executionsStore.update(historyRecord.id, {
status: 'error',
error: err.message,
failedAtStep: pipelineState.currentStep,
lastStepInput: currentInput.slice(0, 50000),
endedAt: new Date().toISOString(),
totalCostUsd: totalCost,
});
if (wsCallback) wsCallback({ type: 'pipeline_error', pipelineId: prevExec.pipelineId, stepIndex: pipelineState.currentStep, error: err.message });
throw err;
}
}
export function getActivePipelines() {
return Array.from(activePipelines.entries()).map(([id, state]) => ({
executionId: id,
@@ -375,6 +547,7 @@ export function createPipeline(data) {
return pipelinesStore.create({
name: data.name,
description: data.description || '',
workingDirectory: data.workingDirectory || '',
steps: buildSteps(data.steps),
status: data.status || 'active',
});
@@ -386,6 +559,7 @@ export function updatePipeline(id, data) {
const updateData = {};
if (data.name !== undefined) updateData.name = data.name;
if (data.description !== undefined) updateData.description = data.description;
if (data.workingDirectory !== undefined) updateData.workingDirectory = data.workingDirectory;
if (data.status !== undefined) updateData.status = data.status;
if (data.steps !== undefined) updateData.steps = buildSteps(data.steps);
return pipelinesStore.update(id, updateData);

View File

@@ -506,6 +506,18 @@ router.post('/pipelines/:id/reject', (req, res) => {
}
});
router.post('/pipelines/resume/:executionId', async (req, res) => {
try {
const clientId = req.headers['x-client-id'] || null;
const result = pipeline.resumePipeline(req.params.executionId, (msg) => wsCallback(msg, clientId));
result.catch(() => {});
res.status(202).json({ status: 'resumed' });
} catch (err) {
const status = err.message.includes('não encontrad') ? 404 : 400;
res.status(status).json({ error: err.message });
}
});
router.get('/webhooks', (req, res) => {
try {
res.json(webhooksStore.getAll());