feat: generation queue admin

routes/admin.js: GET /queue (stats+recent30+stuck), POST /queue/:id/retry, DELETE /queue/stuck
  stuck = processing > 5 min → сбрасываем в failed
  retry = pending + requeue через Bull
This commit is contained in:
Ник (Claude)
2026-06-13 10:13:21 +03:00
parent ce74ac9909
commit 7994b0e73c
+93
View File
@@ -233,3 +233,96 @@ router.delete('/promos/:id', async (req, res) => {
res.json({ ok: true });
} catch (err) { res.status(500).json({ error: err.message }); }
});
// ── GENERATION QUEUE ─────────────────────────────────────────
// GET /api/admin/queue — статус очереди + последние задачи
router.get('/queue', async (req, res) => {
if (!await requireAdmin(req, res)) return;
try {
const [stats, recent, stuck] = await Promise.all([
// Статистика по статусам
query(`
SELECT status, count(*)::int as cnt,
round(avg(extract(epoch from (updated_at - created_at)))::numeric,1) as avg_sec
FROM generation_jobs
GROUP BY status ORDER BY cnt DESC
`),
// Последние 30 задач
query(`
SELECT j.id, j.type, j.status,
left(j.topic,60) as topic,
left(j.error,120) as error,
j.tokens_in, j.tokens_out,
j.created_at, j.updated_at,
u.email as user_email,
c.name as channel_name
FROM generation_jobs j
LEFT JOIN users u ON u.id = j.user_id
LEFT JOIN channels c ON c.id = j.channel_id
ORDER BY j.created_at DESC LIMIT 30
`),
// Застрявшие (processing > 5 мин)
query(`
SELECT id, type, topic, created_at, updated_at
FROM generation_jobs
WHERE status = 'processing'
AND updated_at < NOW() - INTERVAL '5 minutes'
`),
]);
res.json({
stats: stats.rows,
recent: recent.rows,
stuck: stuck.rows,
});
} catch (err) { res.status(500).json({ error: err.message }); }
});
// POST /api/admin/queue/:id/retry — перезапустить задачу
router.post('/queue/:id/retry', async (req, res) => {
if (!await requireAdmin(req, res)) return;
try {
const { rows: [job] } = await query(
'SELECT * FROM generation_jobs WHERE id=$1', [req.params.id]
);
if (!job) return res.status(404).json({ error: 'Job not found' });
// Сбрасываем в pending
await query(`
UPDATE generation_jobs
SET status='pending', error=NULL, updated_at=NOW()
WHERE id=$1
`, [req.params.id]);
// Добавляем в очередь
const generationQueue = require('../workers/generation');
if (generationQueue?.add) {
await generationQueue.add({
jobId: job.id,
type: job.type,
topic: job.topic,
channelId: job.channel_id,
rubric: job.rubric,
keywords: [],
useCritique: true,
});
}
res.json({ ok: true, message: `Job ${job.id} requeued` });
} catch (err) { res.status(500).json({ error: err.message }); }
});
// DELETE /api/admin/queue/stuck — сбросить застрявшие processing → failed
router.delete('/queue/stuck', async (req, res) => {
if (!await requireAdmin(req, res)) return;
try {
const { rows } = await query(`
UPDATE generation_jobs
SET status='failed', error='Сброшено администратором (stuck)', updated_at=NOW()
WHERE status='processing' AND updated_at < NOW() - INTERVAL '5 minutes'
RETURNING id
`);
res.json({ ok: true, cleared: rows.length });
} catch (err) { res.status(500).json({ error: err.message }); }
});