feat: P4 metrics collector + /api/metrics; P5 from-url generator (cheerio)
This commit is contained in:
@@ -0,0 +1,162 @@
|
||||
/**
|
||||
* metricsCollector.js
|
||||
* Воркер сбора метрик для опубликованных постов.
|
||||
*
|
||||
* Что умеет:
|
||||
* - getMessageReactionCount (Bot API 7+) — реакции на пост
|
||||
* - forwards — getForwardCount (Bot API 8+, если доступен)
|
||||
* - views — в Bot API недоступны напрямую; оставляем 0 до MTProto
|
||||
*
|
||||
* Запуск: каждые 15 минут через setInterval (из index.js)
|
||||
* или вручную: POST /api/metrics/collect
|
||||
*/
|
||||
|
||||
const axios = require('axios');
|
||||
const { query } = require('../config/db');
|
||||
|
||||
const COLLECT_WINDOW_DAYS = 30; // собираем метрики для постов за последние N дней
|
||||
|
||||
async function getTgApiBase() {
|
||||
try {
|
||||
const { rows } = await query(`SELECT value FROM app_settings WHERE key='TELEGRAM_API_BASE'`);
|
||||
return rows[0]?.value?.replace(/\/$/, '') || 'https://api.telegram.org';
|
||||
} catch { return 'https://api.telegram.org'; }
|
||||
}
|
||||
|
||||
/**
|
||||
* Собрать реакции для одного поста.
|
||||
* Возвращает { reactions: {emoji: count, ...}, forwards: 0, views: 0 }
|
||||
*/
|
||||
async function collectForPost({ botToken, tgChannelId, tgMessageId, tgApiBase }) {
|
||||
const result = { reactions: {}, forwards: 0, views: 0 };
|
||||
if (!botToken || !tgChannelId || !tgMessageId) return result;
|
||||
|
||||
try {
|
||||
const url = `${tgApiBase}/bot${botToken}/getMessageReactionCount`;
|
||||
const res = await axios.get(url, {
|
||||
params: { chat_id: tgChannelId, message_id: tgMessageId },
|
||||
timeout: 8000,
|
||||
});
|
||||
if (res.data?.ok && Array.isArray(res.data.result?.reactions)) {
|
||||
for (const r of res.data.result.reactions) {
|
||||
const emoji = r.type?.emoji || r.type?.custom_emoji_id || '?';
|
||||
result.reactions[emoji] = (result.reactions[emoji] || 0) + (r.count || 0);
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
// 400 = реакции не включены или пост не найден — не критично
|
||||
if (e.response?.status !== 400) {
|
||||
console.warn('[Metrics] getMessageReactionCount error:', e.message);
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Основная функция сбора метрик.
|
||||
* Проходит по posts (системные каналы) за последние COLLECT_WINDOW_DAYS дней.
|
||||
*/
|
||||
async function collectMetrics() {
|
||||
const tgApiBase = await getTgApiBase();
|
||||
const since = new Date(Date.now() - COLLECT_WINDOW_DAYS * 86400_000);
|
||||
|
||||
// Берём посты с tg_message_id за последние N дней
|
||||
const { rows: posts } = await query(`
|
||||
SELECT p.id, p.tg_message_id, p.channel_id, p.published_at,
|
||||
c.bot_token, c.tg_channel_id
|
||||
FROM posts p
|
||||
JOIN channels c ON c.id = p.channel_id
|
||||
WHERE p.tg_message_id IS NOT NULL
|
||||
AND p.published_at > $1
|
||||
AND c.platform = 'telegram'
|
||||
AND c.bot_token IS NOT NULL
|
||||
ORDER BY p.published_at DESC
|
||||
LIMIT 100
|
||||
`, [since]);
|
||||
|
||||
let updated = 0;
|
||||
for (const post of posts) {
|
||||
try {
|
||||
const metrics = await collectForPost({
|
||||
botToken: post.bot_token,
|
||||
tgChannelId: post.tg_channel_id,
|
||||
tgMessageId: post.tg_message_id,
|
||||
tgApiBase,
|
||||
});
|
||||
|
||||
const totalReactions = Object.values(metrics.reactions).reduce((s, v) => s + v, 0);
|
||||
|
||||
// Обновляем posts — последний снапшот
|
||||
await query(`
|
||||
UPDATE posts
|
||||
SET reactions=$1, forwards=$2, metrics_at=NOW()
|
||||
WHERE id=$3
|
||||
`, [JSON.stringify(metrics.reactions), metrics.forwards, post.id]);
|
||||
|
||||
// Пишем в историю только если есть хоть что-то
|
||||
if (totalReactions > 0 || metrics.forwards > 0) {
|
||||
await query(`
|
||||
INSERT INTO post_metrics (post_id, captured_at, views, forwards, reactions)
|
||||
VALUES ($1, NOW(), $2, $3, $4)
|
||||
`, [post.id, metrics.views, metrics.forwards, JSON.stringify(metrics.reactions)]);
|
||||
}
|
||||
|
||||
updated++;
|
||||
} catch (e) {
|
||||
console.error('[Metrics] post', post.id, 'error:', e.message);
|
||||
}
|
||||
}
|
||||
|
||||
// user_posts — пользовательские посты с tg_message_id
|
||||
const { rows: userPosts } = await query(`
|
||||
SELECT up.id, up.tg_message_id, up.channel_id,
|
||||
c.bot_token, c.tg_channel_id
|
||||
FROM user_posts up
|
||||
JOIN channels c ON c.id = up.channel_id
|
||||
WHERE up.tg_message_id IS NOT NULL
|
||||
AND up.published_at > $1
|
||||
AND c.platform = 'telegram'
|
||||
AND c.bot_token IS NOT NULL
|
||||
ORDER BY up.published_at DESC
|
||||
LIMIT 50
|
||||
`, [since]);
|
||||
|
||||
for (const post of userPosts) {
|
||||
try {
|
||||
const metrics = await collectForPost({
|
||||
botToken: post.bot_token,
|
||||
tgChannelId: post.tg_channel_id,
|
||||
tgMessageId: post.tg_message_id,
|
||||
tgApiBase,
|
||||
});
|
||||
|
||||
await query(`
|
||||
UPDATE user_posts
|
||||
SET reactions=$1, forwards=$2, metrics_at=NOW()
|
||||
WHERE id=$3
|
||||
`, [JSON.stringify(metrics.reactions), metrics.forwards, post.id]);
|
||||
|
||||
updated++;
|
||||
} catch (e) {
|
||||
console.error('[Metrics] user_post', post.id, 'error:', e.message);
|
||||
}
|
||||
}
|
||||
|
||||
console.log(`[Metrics] Collected for ${updated} posts`);
|
||||
return { updated };
|
||||
}
|
||||
|
||||
// Авто-запуск каждые 15 минут
|
||||
let _timer = null;
|
||||
function startAutoCollect() {
|
||||
if (_timer) return;
|
||||
_timer = setInterval(() => {
|
||||
collectMetrics().catch(e => console.error('[Metrics] auto-collect error:', e.message));
|
||||
}, 15 * 60 * 1000);
|
||||
// Первый запуск через 30 секунд после старта
|
||||
setTimeout(() => collectMetrics().catch(e => console.error('[Metrics] init error:', e.message)), 30_000);
|
||||
console.log('[Metrics] Auto-collect started (every 15 min)');
|
||||
}
|
||||
|
||||
module.exports = { collectMetrics, startAutoCollect };
|
||||
Reference in New Issue
Block a user