forked from admin/zeropost-engine
feat: P6 inbox — TG webhook + AI classify + reply
DB: inbox_messages (text, ai_type, ai_reply, status), channels.tg_webhook_enabled
Engine routes/inbox.js:
POST /api/tg-webhook/:channelId — получаем комментарии от TG (публичный)
GET /api/inbox/:channelId — список сообщений с фильтром
GET /api/inbox/stats/:channelId — статистика по типам
POST /api/inbox/:id/reply — отправить ответ в TG
POST /api/inbox/:id/status — изменить статус
POST /api/inbox/:channelId/setup-webhook — зарегистрировать TG webhook
AI: claude haiku → {type, reply} JSON
This commit is contained in:
@@ -56,6 +56,10 @@ app.post('/api/billing/webhook',
|
||||
})
|
||||
);
|
||||
|
||||
// TG webhook — публичный (TG не шлёт internal secret)
|
||||
const inboxRoutes = require('./src/routes/inbox');
|
||||
app.use('/api', inboxRoutes); // включает /api/tg-webhook/:channelId
|
||||
|
||||
// Simple internal auth middleware
|
||||
app.use((req, res, next) => {
|
||||
const secret = req.headers['x-internal-secret'];
|
||||
@@ -102,6 +106,7 @@ app.use('/api/metrics', metricsRoutes);
|
||||
app.use('/api/usage', usageRoutes);
|
||||
app.use('/api/billing', require('./src/routes/billing'));
|
||||
app.use('/api/channels', require('./src/routes/polls'));
|
||||
app.use('/api', inboxRoutes); // /inbox/:id, /inbox/:id/reply, /tg-webhook/:channelId
|
||||
|
||||
app.get('/health', (req, res) => {
|
||||
res.json({ ok: true, service: 'zeropost-engine', time: new Date() });
|
||||
|
||||
@@ -0,0 +1,260 @@
|
||||
/**
|
||||
* inbox.js — входящие комментарии и сообщения из TG.
|
||||
*
|
||||
* Флоу:
|
||||
* 1. TG шлёт webhook на POST /api/tg-webhook/:channelId
|
||||
* 2. Сохраняем сообщение в inbox_messages
|
||||
* 3. Фоново AI классифицирует + предлагает ответ (Claude Haiku)
|
||||
* 4. UI показывает inbox с предложенными ответами
|
||||
*/
|
||||
const express = require('express');
|
||||
const router = express.Router();
|
||||
const { query } = require('../config/db');
|
||||
const channelsSvc = require('../services/channels');
|
||||
const settings = require('../services/settings');
|
||||
const axios = require('axios');
|
||||
const config = require('../config');
|
||||
|
||||
// ─────────────────────────────────────────────
|
||||
// Webhook от Telegram
|
||||
// ─────────────────────────────────────────────
|
||||
|
||||
// POST /api/tg-webhook/:channelId (публичный — без internal secret)
|
||||
router.post('/tg-webhook/:channelId', express.json(), async (req, res) => {
|
||||
res.json({ ok: true }); // TG требует быстрый ответ
|
||||
|
||||
const { channelId } = req.params;
|
||||
const update = req.body;
|
||||
|
||||
try {
|
||||
await processUpdate(channelId, update);
|
||||
} catch (err) {
|
||||
console.error(`[inbox] webhook processing error: ${err.message}`);
|
||||
}
|
||||
});
|
||||
|
||||
async function processUpdate(channelId, update) {
|
||||
// Интересуют только channel_post и message (ответы на посты)
|
||||
const msg = update.channel_post || update.message;
|
||||
if (!msg || !msg.text) return;
|
||||
|
||||
// Игнорируем сообщения от самого бота
|
||||
if (msg.from?.is_bot) return;
|
||||
|
||||
// Только ответы (комментарии), не оригинальные посты канала
|
||||
const isComment = !!msg.reply_to_message || update.message;
|
||||
if (!isComment) return;
|
||||
|
||||
const { rows: [channel] } = await query(
|
||||
'SELECT * FROM channels WHERE id=$1', [channelId]
|
||||
);
|
||||
if (!channel || !channel.tg_webhook_enabled) return;
|
||||
|
||||
const msgId = String(msg.message_id);
|
||||
const fromUser = msg.from || {};
|
||||
|
||||
// Дедупликация
|
||||
const existing = await query(
|
||||
'SELECT id FROM inbox_messages WHERE channel_id=$1 AND external_msg_id=$2',
|
||||
[channelId, msgId]
|
||||
);
|
||||
if (existing.rows.length) return;
|
||||
|
||||
// Сохраняем
|
||||
const { rows: [saved] } = await query(`
|
||||
INSERT INTO inbox_messages
|
||||
(channel_id, platform, external_msg_id, from_user_id, from_username, from_name,
|
||||
reply_to_msg_id, text, raw)
|
||||
VALUES ($1,'telegram',$2,$3,$4,$5,$6,$7,$8)
|
||||
RETURNING id
|
||||
`, [
|
||||
channelId, msgId,
|
||||
String(fromUser.id || ''),
|
||||
fromUser.username || null,
|
||||
[fromUser.first_name, fromUser.last_name].filter(Boolean).join(' ') || null,
|
||||
msg.reply_to_message ? String(msg.reply_to_message.message_id) : null,
|
||||
msg.text,
|
||||
JSON.stringify(update),
|
||||
]);
|
||||
|
||||
// Фоновая AI обработка
|
||||
classifyAndSuggest(saved.id, channel, msg.text).catch(e =>
|
||||
console.error(`[inbox] AI classify error msg=${saved.id}: ${e.message}`)
|
||||
);
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────
|
||||
// AI классификация + предложенный ответ
|
||||
// ─────────────────────────────────────────────
|
||||
|
||||
const TYPE_LABELS = {
|
||||
question: '❓ Вопрос',
|
||||
praise: '👍 Похвала',
|
||||
complaint: '😤 Жалоба/критика',
|
||||
spam: '🚫 Спам',
|
||||
other: '💬 Другое',
|
||||
};
|
||||
|
||||
async function classifyAndSuggest(msgId, channel, text) {
|
||||
const niche = channel.niche || '';
|
||||
const systemPrompt = `Ты ассистент SMM-менеджера канала "${channel.name || 'канал'}" (ниша: ${niche || 'общая'}).
|
||||
Проанализируй комментарий к посту и верни JSON с двумя полями:
|
||||
- type: одно из ["question","praise","complaint","spam","other"]
|
||||
- reply: предлагаемый ответ (2-3 предложения, дружелюбно, от имени автора канала). Если spam — reply: null.
|
||||
|
||||
Отвечай ТОЛЬКО JSON, без markdown.`;
|
||||
|
||||
try {
|
||||
const ai = require('../services/ai');
|
||||
const result = await ai.chat(
|
||||
config.ai.models.topics || 'claude-haiku-4-5-20251001',
|
||||
systemPrompt,
|
||||
`Комментарий: "${text.slice(0, 500)}"`,
|
||||
0.4, 400
|
||||
);
|
||||
const clean = result.replace(/```json|```/g, '').trim();
|
||||
const parsed = JSON.parse(clean);
|
||||
const type = ['question','praise','complaint','spam','other'].includes(parsed.type)
|
||||
? parsed.type : 'other';
|
||||
const reply = typeof parsed.reply === 'string' ? parsed.reply.trim() : null;
|
||||
|
||||
await query(`
|
||||
UPDATE inbox_messages
|
||||
SET ai_type=$1, ai_reply=$2, ai_processed_at=NOW(),
|
||||
status = CASE WHEN $1='spam' THEN 'spam' ELSE status END
|
||||
WHERE id=$3
|
||||
`, [type, reply, msgId]);
|
||||
} catch (err) {
|
||||
console.error(`[inbox] classify failed: ${err.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────
|
||||
// REST API для UI
|
||||
// ─────────────────────────────────────────────
|
||||
|
||||
function uid(req) { return req.headers['x-user-id'] ? parseInt(req.headers['x-user-id']) : null; }
|
||||
|
||||
// GET /api/inbox/:channelId — список сообщений
|
||||
router.get('/inbox/:channelId', async (req, res) => {
|
||||
const userId = uid(req);
|
||||
if (!userId) return res.status(401).json({ error: 'x-user-id required' });
|
||||
const { status = 'new', limit = 30, offset = 0 } = req.query;
|
||||
try {
|
||||
const { rows } = await query(`
|
||||
SELECT * FROM inbox_messages
|
||||
WHERE channel_id = $1 ${status !== 'all' ? 'AND status = $4' : ''}
|
||||
ORDER BY created_at DESC
|
||||
LIMIT $2 OFFSET $3
|
||||
`, status !== 'all'
|
||||
? [req.params.channelId, limit, offset, status]
|
||||
: [req.params.channelId, limit, offset]
|
||||
);
|
||||
const { rows: [{ total }] } = await query(
|
||||
`SELECT count(*)::int as total FROM inbox_messages WHERE channel_id=$1 ${status !== 'all' ? 'AND status=$2' : ''}`,
|
||||
status !== 'all' ? [req.params.channelId, status] : [req.params.channelId]
|
||||
);
|
||||
res.json({ messages: rows, total, type_labels: TYPE_LABELS });
|
||||
} catch (err) {
|
||||
res.status(500).json({ error: err.message });
|
||||
}
|
||||
});
|
||||
|
||||
// GET /api/inbox/stats/:channelId — статистика по типам
|
||||
router.get('/inbox/stats/:channelId', async (req, res) => {
|
||||
try {
|
||||
const { rows } = await query(`
|
||||
SELECT status, ai_type, count(*)::int as cnt
|
||||
FROM inbox_messages WHERE channel_id=$1
|
||||
GROUP BY status, ai_type
|
||||
`, [req.params.channelId]);
|
||||
res.json(rows);
|
||||
} catch (err) {
|
||||
res.status(500).json({ error: err.message });
|
||||
}
|
||||
});
|
||||
|
||||
// POST /api/inbox/:id/reply — отправить ответ в TG
|
||||
router.post('/inbox/:id/reply', async (req, res) => {
|
||||
const userId = uid(req);
|
||||
if (!userId) return res.status(401).json({ error: 'x-user-id required' });
|
||||
const { text } = req.body;
|
||||
if (!text?.trim()) return res.status(400).json({ error: 'text required' });
|
||||
|
||||
try {
|
||||
const { rows: [msg] } = await query(
|
||||
'SELECT im.*, c.bot_token, c.tg_channel_id FROM inbox_messages im JOIN channels c ON c.id=im.channel_id WHERE im.id=$1',
|
||||
[req.params.id]
|
||||
);
|
||||
if (!msg) return res.status(404).json({ error: 'Not found' });
|
||||
if (!msg.bot_token) return res.status(400).json({ error: 'Бот не настроен' });
|
||||
|
||||
const base = await settings.get('TELEGRAM_API_BASE', 'https://api.telegram.org');
|
||||
|
||||
// Отправляем ответ (reply на конкретное сообщение если есть chat_id пользователя)
|
||||
const raw = msg.raw || {};
|
||||
const chatId = raw.message?.chat?.id || raw.channel_post?.chat?.id || msg.tg_channel_id;
|
||||
|
||||
const tgRes = await axios.post(`${base}/bot${msg.bot_token}/sendMessage`, {
|
||||
chat_id: chatId,
|
||||
text: text.trim(),
|
||||
reply_to_message_id: parseInt(msg.external_msg_id) || undefined,
|
||||
}, { timeout: 15_000 });
|
||||
|
||||
if (!tgRes.data?.ok) throw new Error(tgRes.data?.description || 'TG error');
|
||||
|
||||
await query(`
|
||||
UPDATE inbox_messages
|
||||
SET status='replied', replied_text=$1, replied_at=NOW()
|
||||
WHERE id=$2
|
||||
`, [text.trim(), req.params.id]);
|
||||
|
||||
res.json({ ok: true });
|
||||
} catch (err) {
|
||||
res.status(500).json({ error: err.message });
|
||||
}
|
||||
});
|
||||
|
||||
// POST /api/inbox/:id/status — изменить статус (ignore, spam)
|
||||
router.post('/inbox/:id/status', async (req, res) => {
|
||||
const { status } = req.body;
|
||||
if (!['new','replied','ignored','spam'].includes(status))
|
||||
return res.status(400).json({ error: 'Invalid status' });
|
||||
try {
|
||||
await query('UPDATE inbox_messages SET status=$1 WHERE id=$2', [status, req.params.id]);
|
||||
res.json({ ok: true });
|
||||
} catch (err) {
|
||||
res.status(500).json({ error: err.message });
|
||||
}
|
||||
});
|
||||
|
||||
// POST /api/inbox/:channelId/setup-webhook — настроить TG webhook
|
||||
router.post('/inbox/:channelId/setup-webhook', async (req, res) => {
|
||||
const userId = uid(req);
|
||||
if (!userId) return res.status(401).json({ error: 'x-user-id required' });
|
||||
|
||||
try {
|
||||
const { rows: [channel] } = await query('SELECT * FROM channels WHERE id=$1', [req.params.channelId]);
|
||||
if (!channel?.bot_token) return res.status(400).json({ error: 'bot_token не настроен' });
|
||||
|
||||
const base = await settings.get('TELEGRAM_API_BASE', 'https://api.telegram.org');
|
||||
const engineBase = process.env.ENGINE_PUBLIC_URL || 'https://engine.zeropost.ru';
|
||||
const webhookUrl = `${engineBase}/api/tg-webhook/${channel.id}`;
|
||||
|
||||
const tgRes = await axios.post(`${base}/bot${channel.bot_token}/setWebhook`, {
|
||||
url: webhookUrl,
|
||||
allowed_updates: ['message', 'channel_post'],
|
||||
drop_pending_updates: false,
|
||||
}, { timeout: 10_000 });
|
||||
|
||||
if (!tgRes.data?.ok) throw new Error(tgRes.data?.description || 'TG error');
|
||||
|
||||
await query('UPDATE channels SET tg_webhook_enabled=true WHERE id=$1', [channel.id]);
|
||||
|
||||
res.json({ ok: true, webhook_url: webhookUrl });
|
||||
} catch (err) {
|
||||
res.status(500).json({ error: err.message });
|
||||
}
|
||||
});
|
||||
|
||||
module.exports = router;
|
||||
Reference in New Issue
Block a user