diff --git a/src/bgtask.js b/src/bgtask.js index 1b37c48a..c1697a64 100644 --- a/src/bgtask.js +++ b/src/bgtask.js @@ -9,6 +9,8 @@ var Config = require("./config"); var db = require("./database"); var Promise = require("bluebird"); const shows = require('./shows'); +const calendarDB = require('./database/calendar-integrations'); +const integrationsApi = require('./web/routes/api/integrations'); const LOGGER = require('@calzoneman/jsli')('bgtask'); @@ -113,6 +115,58 @@ function initShowScheduler() { }, SCHEDULE_INTERVAL); } +function initCalendarAutoSyncScheduler() { + const AUTO_SYNC_INTERVAL_MS = 60 * 1000; + const AUTO_SYNC_PERIOD_MINUTES = 30; + let running = false; + + function inStaggerSlot(integrationId, now) { + const slot = Number(integrationId || 0) % AUTO_SYNC_PERIOD_MINUTES; + const minute = new Date(now).getUTCMinutes() % AUTO_SYNC_PERIOD_MINUTES; + return slot === minute; + } + + setInterval(async () => { + if (running) { + return; + } + + running = true; + try { + const now = Date.now(); + const rows = await calendarDB.listConnectedByProvider('google'); + for (const integration of rows) { + if (!inStaggerSlot(integration.id, now)) { + continue; + } + if (integration.last_sync_at && now - integration.last_sync_at < AUTO_SYNC_PERIOD_MINUTES * 60 * 1000) { + continue; + } + + try { + await integrationsApi.syncIntegrationNow({ + provider: 'google', + integration, + channelRow: { id: integration.channel_id }, + enforceCooldown: false + }); + } catch (err) { + LOGGER.warn( + 'Auto calendar sync failed integration=%s channel=%s: %s', + integration.id, + integration.channel_id, + err && (err.stack || err.message) || err + ); + } + } + } catch (err) { + LOGGER.error('Calendar auto-sync scheduler failure: %s', err.stack || err); + } finally { + running = false; + } + }, AUTO_SYNC_INTERVAL_MS); +} + module.exports = function (Server) { if (init === Server) { LOGGER.warn("Attempted to re-init background tasks"); @@ -125,4 +179,5 @@ module.exports = function (Server) { initPasswordResetCleanup(); initAccountCleanup(); initShowScheduler(); + initCalendarAutoSyncScheduler(); }; diff --git a/src/database/calendar-integrations.js b/src/database/calendar-integrations.js index fc1d5b2c..439708a3 100644 --- a/src/database/calendar-integrations.js +++ b/src/database/calendar-integrations.js @@ -39,6 +39,13 @@ async function listByChannel(channelId) { return rows.map(parseIntegrationRow); } +async function listConnectedByProvider(provider) { + const rows = await knex()('channel_calendar_integrations') + .where({ provider, status: 'connected' }) + .select(); + return rows.map(parseIntegrationRow); +} + async function getByChannelProvider(channelId, provider) { const rows = await knex()('channel_calendar_integrations') .where({ channel_id: channelId, provider }) @@ -316,6 +323,7 @@ async function getGoogleLinksForShows(channelId, showIds) { } module.exports = { + listConnectedByProvider, listByChannel, getByChannelProvider, upsertGoogleIntegration, diff --git a/src/web/routes/api/integrations.js b/src/web/routes/api/integrations.js index 3d664e29..c84f57f5 100644 --- a/src/web/routes/api/integrations.js +++ b/src/web/routes/api/integrations.js @@ -337,6 +337,57 @@ async function syncGoogleIntegration({ integration, channelRow }) { }; } +async function syncIntegrationNow({ provider, integration, channelRow, enforceCooldown = true }) { + const lockKey = `${channelRow.id}:${provider}`; + const now = Date.now(); + const current = CHANNEL_SYNC_STATE.get(lockKey) || { inFlight: false, lastRunAt: 0 }; + if (current.inFlight) { + const err = new Error('Sync already in progress for this channel'); + err.statusCode = 409; + throw err; + } + if (enforceCooldown && (now - current.lastRunAt < CHANNEL_SYNC_COOLDOWN_MS)) { + const retryAfterMs = CHANNEL_SYNC_COOLDOWN_MS - (now - current.lastRunAt); + const err = new Error('Sync cooldown active for this channel'); + err.statusCode = 429; + err.retryAfterMs = retryAfterMs; + throw err; + } + + CHANNEL_SYNC_STATE.set(lockKey, { inFlight: true, lastRunAt: now }); + try { + let result; + if (provider === 'google') { + result = await runInGoogleQueue(() => + syncGoogleIntegration({ integration, channelRow }) + ); + } else { + throw new Error('Unsupported provider'); + } + + const patch = Object.assign({ + status: 'connected', + last_sync_at: Date.now(), + last_error: null + }, result && result.tokenPatch ? result.tokenPatch : {}); + await calendarDB.updateIntegrationSyncResult(integration.id, patch); + return { success: true, synced: result ? result.synced : 0 }; + } catch (err) { + await calendarDB.updateIntegrationSyncResult(integration.id, { + status: 'error', + last_sync_at: Date.now(), + last_error: err.message || 'Sync failed' + }); + throw err; + } finally { + const st = CHANNEL_SYNC_STATE.get(lockKey); + if (st) { + st.inFlight = false; + CHANNEL_SYNC_STATE.set(lockKey, st); + } + } +} + router.get('/', async (req, res) => { const auth = await authorizeChannelAdmin(req, res, 3); if (!auth) return; @@ -404,50 +455,27 @@ router.post('/:provider/sync-now', async (req, res) => { return res.status(404).json({ error: 'Connected integration not found' }); } - const lockKey = `${auth.channelRow.id}:${provider}`; - const now = Date.now(); - const current = CHANNEL_SYNC_STATE.get(lockKey) || { inFlight: false, lastRunAt: 0 }; - if (current.inFlight) { - return res.status(409).json({ error: 'Sync already in progress for this channel' }); - } - if (now - current.lastRunAt < CHANNEL_SYNC_COOLDOWN_MS) { - const retryAfterMs = CHANNEL_SYNC_COOLDOWN_MS - (now - current.lastRunAt); - return res.status(429).json({ - error: 'Sync cooldown active for this channel', - retry_after_ms: retryAfterMs - }); - } - - CHANNEL_SYNC_STATE.set(lockKey, { inFlight: true, lastRunAt: now }); - try { - let result; - if (provider === 'google') { - result = await runInGoogleQueue(() => - syncGoogleIntegration({ integration, channelRow: auth.channelRow }) - ); - } - const patch = Object.assign({ - status: 'connected', - last_sync_at: Date.now(), - last_error: null - }, result && result.tokenPatch ? result.tokenPatch : {}); - await calendarDB.updateIntegrationSyncResult(integration.id, patch); - res.json({ success: true, synced: result ? result.synced : 0 }); - } catch (err) { - await calendarDB.updateIntegrationSyncResult(integration.id, { - status: 'error', - last_sync_at: Date.now(), - last_error: err.message || 'Sync failed' + const result = await syncIntegrationNow({ + provider, + integration, + channelRow: auth.channelRow, + enforceCooldown: true }); - res.status(400).json({ error: err.message || 'Sync failed' }); - } finally { - const st = CHANNEL_SYNC_STATE.get(lockKey); - if (st) { - st.inFlight = false; - CHANNEL_SYNC_STATE.set(lockKey, st); + res.json(result); + } catch (err) { + if (err.statusCode === 409) { + return res.status(409).json({ error: err.message }); } + if (err.statusCode === 429) { + return res.status(429).json({ + error: err.message, + retry_after_ms: err.retryAfterMs || CHANNEL_SYNC_COOLDOWN_MS + }); + } + res.status(400).json({ error: err.message || 'Sync failed' }); } }); +router.syncIntegrationNow = syncIntegrationNow; module.exports = router;