Files
zeropost-engine/src/services/metricsCollector.js
T

163 lines
5.6 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* metricsCollector.js
* Воркер сбора метрик для опубликованных постов.
*
* Что умеет:
* - getMessageReactionCount (Bot API 7+) — реакции на пост
* - forwards — getForwardCount (Bot API 8+, если доступен)
* - views — в Bot API недоступны напрямую; оставляем 0 до MTProto
*
* Запуск: каждые 15 минут через setInterval (из index.js)
* или вручную: POST /api/metrics/collect
*/
const axios = require('axios');
const { query } = require('../config/db');
const COLLECT_WINDOW_DAYS = 30; // собираем метрики для постов за последние N дней
async function getTgApiBase() {
try {
const { rows } = await query(`SELECT value FROM app_settings WHERE key='TELEGRAM_API_BASE'`);
return rows[0]?.value?.replace(/\/$/, '') || 'https://api.telegram.org';
} catch { return 'https://api.telegram.org'; }
}
/**
* Собрать реакции для одного поста.
* Возвращает { reactions: {emoji: count, ...}, forwards: 0, views: 0 }
*/
async function collectForPost({ botToken, tgChannelId, tgMessageId, tgApiBase }) {
const result = { reactions: {}, forwards: 0, views: 0 };
if (!botToken || !tgChannelId || !tgMessageId) return result;
try {
const url = `${tgApiBase}/bot${botToken}/getMessageReactionCount`;
const res = await axios.get(url, {
params: { chat_id: tgChannelId, message_id: tgMessageId },
timeout: 8000,
});
if (res.data?.ok && Array.isArray(res.data.result?.reactions)) {
for (const r of res.data.result.reactions) {
const emoji = r.type?.emoji || r.type?.custom_emoji_id || '?';
result.reactions[emoji] = (result.reactions[emoji] || 0) + (r.count || 0);
}
}
} catch (e) {
// 400 = реакции не включены или пост не найден — не критично
if (e.response?.status !== 400) {
console.warn('[Metrics] getMessageReactionCount error:', e.message);
}
}
return result;
}
/**
* Основная функция сбора метрик.
* Проходит по posts (системные каналы) за последние COLLECT_WINDOW_DAYS дней.
*/
async function collectMetrics() {
const tgApiBase = await getTgApiBase();
const since = new Date(Date.now() - COLLECT_WINDOW_DAYS * 86400_000);
// Берём посты с tg_message_id за последние N дней
const { rows: posts } = await query(`
SELECT p.id, p.tg_message_id, p.channel_id, p.published_at,
c.bot_token, c.tg_channel_id
FROM posts p
JOIN channels c ON c.id = p.channel_id
WHERE p.tg_message_id IS NOT NULL
AND p.published_at > $1
AND c.platform = 'telegram'
AND c.bot_token IS NOT NULL
ORDER BY p.published_at DESC
LIMIT 100
`, [since]);
let updated = 0;
for (const post of posts) {
try {
const metrics = await collectForPost({
botToken: post.bot_token,
tgChannelId: post.tg_channel_id,
tgMessageId: post.tg_message_id,
tgApiBase,
});
const totalReactions = Object.values(metrics.reactions).reduce((s, v) => s + v, 0);
// Обновляем posts — последний снапшот
await query(`
UPDATE posts
SET reactions=$1, forwards=$2, metrics_at=NOW()
WHERE id=$3
`, [JSON.stringify(metrics.reactions), metrics.forwards, post.id]);
// Пишем в историю только если есть хоть что-то
if (totalReactions > 0 || metrics.forwards > 0) {
await query(`
INSERT INTO post_metrics (post_id, captured_at, views, forwards, reactions)
VALUES ($1, NOW(), $2, $3, $4)
`, [post.id, metrics.views, metrics.forwards, JSON.stringify(metrics.reactions)]);
}
updated++;
} catch (e) {
console.error('[Metrics] post', post.id, 'error:', e.message);
}
}
// user_posts — пользовательские посты с tg_message_id
const { rows: userPosts } = await query(`
SELECT up.id, up.tg_message_id, up.channel_id,
c.bot_token, c.tg_channel_id
FROM user_posts up
JOIN channels c ON c.id = up.channel_id
WHERE up.tg_message_id IS NOT NULL
AND up.published_at > $1
AND c.platform = 'telegram'
AND c.bot_token IS NOT NULL
ORDER BY up.published_at DESC
LIMIT 50
`, [since]);
for (const post of userPosts) {
try {
const metrics = await collectForPost({
botToken: post.bot_token,
tgChannelId: post.tg_channel_id,
tgMessageId: post.tg_message_id,
tgApiBase,
});
await query(`
UPDATE user_posts
SET reactions=$1, forwards=$2, metrics_at=NOW()
WHERE id=$3
`, [JSON.stringify(metrics.reactions), metrics.forwards, post.id]);
updated++;
} catch (e) {
console.error('[Metrics] user_post', post.id, 'error:', e.message);
}
}
console.log(`[Metrics] Collected for ${updated} posts`);
return { updated };
}
// Авто-запуск каждые 15 минут
let _timer = null;
function startAutoCollect() {
if (_timer) return;
_timer = setInterval(() => {
collectMetrics().catch(e => console.error('[Metrics] auto-collect error:', e.message));
}, 15 * 60 * 1000);
// Первый запуск через 30 секунд после старта
setTimeout(() => collectMetrics().catch(e => console.error('[Metrics] init error:', e.message)), 30_000);
console.log('[Metrics] Auto-collect started (every 15 min)');
}
module.exports = { collectMetrics, startAutoCollect };