diff --git a/src/routes/admin.js b/src/routes/admin.js index 47c66f8..27467c3 100644 --- a/src/routes/admin.js +++ b/src/routes/admin.js @@ -233,3 +233,96 @@ router.delete('/promos/:id', async (req, res) => { res.json({ ok: true }); } catch (err) { res.status(500).json({ error: err.message }); } }); + +// ── GENERATION QUEUE ───────────────────────────────────────── + +// GET /api/admin/queue — статус очереди + последние задачи +router.get('/queue', async (req, res) => { + if (!await requireAdmin(req, res)) return; + try { + const [stats, recent, stuck] = await Promise.all([ + // Статистика по статусам + query(` + SELECT status, count(*)::int as cnt, + round(avg(extract(epoch from (updated_at - created_at)))::numeric,1) as avg_sec + FROM generation_jobs + GROUP BY status ORDER BY cnt DESC + `), + // Последние 30 задач + query(` + SELECT j.id, j.type, j.status, + left(j.topic,60) as topic, + left(j.error,120) as error, + j.tokens_in, j.tokens_out, + j.created_at, j.updated_at, + u.email as user_email, + c.name as channel_name + FROM generation_jobs j + LEFT JOIN users u ON u.id = j.user_id + LEFT JOIN channels c ON c.id = j.channel_id + ORDER BY j.created_at DESC LIMIT 30 + `), + // Застрявшие (processing > 5 мин) + query(` + SELECT id, type, topic, created_at, updated_at + FROM generation_jobs + WHERE status = 'processing' + AND updated_at < NOW() - INTERVAL '5 minutes' + `), + ]); + + res.json({ + stats: stats.rows, + recent: recent.rows, + stuck: stuck.rows, + }); + } catch (err) { res.status(500).json({ error: err.message }); } +}); + +// POST /api/admin/queue/:id/retry — перезапустить задачу +router.post('/queue/:id/retry', async (req, res) => { + if (!await requireAdmin(req, res)) return; + try { + const { rows: [job] } = await query( + 'SELECT * FROM generation_jobs WHERE id=$1', [req.params.id] + ); + if (!job) return res.status(404).json({ error: 'Job not found' }); + + // Сбрасываем в pending + await query(` + UPDATE generation_jobs + SET status='pending', error=NULL, updated_at=NOW() + WHERE id=$1 + `, [req.params.id]); + + // Добавляем в очередь + const generationQueue = require('../workers/generation'); + if (generationQueue?.add) { + await generationQueue.add({ + jobId: job.id, + type: job.type, + topic: job.topic, + channelId: job.channel_id, + rubric: job.rubric, + keywords: [], + useCritique: true, + }); + } + + res.json({ ok: true, message: `Job ${job.id} requeued` }); + } catch (err) { res.status(500).json({ error: err.message }); } +}); + +// DELETE /api/admin/queue/stuck — сбросить застрявшие processing → failed +router.delete('/queue/stuck', async (req, res) => { + if (!await requireAdmin(req, res)) return; + try { + const { rows } = await query(` + UPDATE generation_jobs + SET status='failed', error='Сброшено администратором (stuck)', updated_at=NOW() + WHERE status='processing' AND updated_at < NOW() - INTERVAL '5 minutes' + RETURNING id + `); + res.json({ ok: true, cleared: rows.length }); + } catch (err) { res.status(500).json({ error: err.message }); } +});