diff --git a/src/database/calendar-integrations.js b/src/database/calendar-integrations.js index 0c3f1bdd..fc1d5b2c 100644 --- a/src/database/calendar-integrations.js +++ b/src/database/calendar-integrations.js @@ -114,6 +114,15 @@ async function updateIntegrationSyncResult(id, patch) { .update(update); } +async function updateIntegrationConfig(id, config) { + await knex()('channel_calendar_integrations') + .where({ id }) + .update({ + config_json: JSON.stringify(config || {}), + updated_at: Date.now() + }); +} + function parseExternalRow(row) { if (!row) return null; return { @@ -123,7 +132,8 @@ function parseExternalRow(row) { integration_id: row.integration_id, provider: row.provider, external_event_id: row.external_event_id, - external_etag: row.external_etag || null + external_etag: row.external_etag || null, + last_pushed_at: row.last_pushed_at || null }; } @@ -135,6 +145,13 @@ async function getExternalEvent(showId, integrationId) { return parseExternalRow(rows[0]); } +async function listExternalEventsForIntegration(channelId, integrationId) { + const rows = await knex()('channel_show_external_events') + .where({ channel_id: channelId, integration_id: integrationId, provider: 'google' }) + .select(); + return rows.map(parseExternalRow); +} + async function upsertExternalEvent({ channelId, showId, integrationId, provider, externalEventId, externalEtag }) { const now = Date.now(); const existing = await getExternalEvent(showId, integrationId); @@ -163,6 +180,83 @@ async function upsertExternalEvent({ channelId, showId, integrationId, provider, }); } +async function deleteExternalEvent(id, channelId, integrationId) { + await knex()('channel_show_external_events') + .where({ id, channel_id: channelId, integration_id: integrationId }) + .delete(); +} + +function parseGoogleIndexRow(row) { + if (!row) return null; + return { + id: row.id, + channel_id: row.channel_id, + integration_id: row.integration_id, + show_id: row.show_id || null, + external_event_id: row.external_event_id, + external_etag: row.external_etag || null, + start_at: row.start_at || null, + updated_remote_at: row.updated_remote_at || null, + last_seen_at: row.last_seen_at || null, + deleted_remote: !!row.deleted_remote, + created_at: row.created_at, + updated_at: row.updated_at + }; +} + +async function listGoogleEventIndex(integrationId) { + const rows = await knex()('channel_google_event_index') + .where({ integration_id: integrationId }) + .select(); + return rows.map(parseGoogleIndexRow); +} + +async function upsertGoogleEventIndexRow(row) { + const now = Date.now(); + const existing = await knex()('channel_google_event_index') + .where({ + integration_id: row.integration_id, + external_event_id: row.external_event_id + }) + .limit(1) + .select(); + + if (!existing || existing.length === 0) { + await knex()('channel_google_event_index').insert({ + channel_id: row.channel_id, + integration_id: row.integration_id, + show_id: row.show_id || null, + external_event_id: row.external_event_id, + external_etag: row.external_etag || null, + start_at: row.start_at || null, + updated_remote_at: row.updated_remote_at || null, + last_seen_at: row.last_seen_at || now, + deleted_remote: row.deleted_remote ? 1 : 0, + created_at: now, + updated_at: now + }); + return; + } + + await knex()('channel_google_event_index') + .where({ id: existing[0].id }) + .update({ + show_id: row.show_id || null, + external_etag: row.external_etag || null, + start_at: row.start_at || null, + updated_remote_at: row.updated_remote_at || null, + last_seen_at: row.last_seen_at || now, + deleted_remote: row.deleted_remote ? 1 : 0, + updated_at: now + }); +} + +async function deleteGoogleEventIndexRow(id, integrationId) { + await knex()('channel_google_event_index') + .where({ id, integration_id: integrationId }) + .delete(); +} + function buildGoogleCalendarUrl(calendarId) { return `https://calendar.google.com/calendar/u/0/r?cid=${encodeURIComponent(calendarId)}`; } @@ -227,7 +321,13 @@ module.exports = { upsertGoogleIntegration, disconnectIntegration, updateIntegrationSyncResult, + updateIntegrationConfig, getExternalEvent, + listExternalEventsForIntegration, upsertExternalEvent, + deleteExternalEvent, + listGoogleEventIndex, + upsertGoogleEventIndexRow, + deleteGoogleEventIndexRow, getGoogleLinksForShows }; diff --git a/src/database/tables.js b/src/database/tables.js index 62318384..4a0d8f29 100644 --- a/src/database/tables.js +++ b/src/database/tables.js @@ -270,4 +270,30 @@ export async function initTables() { t.unique(['show_id', 'integration_id'], 'channel_show_external_event_unique'); t.index(['integration_id', 'provider'], 'channel_show_external_event_integration_idx'); }); + + await ensureTable('channel_google_event_index', t => { + t.charset('utf8'); + t.increments('id').notNullable().primary(); + t.integer('channel_id') + .unsigned() + .notNullable() + .references('id').inTable('channels') + .onDelete('cascade'); + t.integer('integration_id') + .unsigned() + .notNullable() + .references('id').inTable('channel_calendar_integrations') + .onDelete('cascade'); + t.integer('show_id').unsigned().nullable(); + t.string('external_event_id', 255).notNullable(); + t.string('external_etag', 255).nullable(); + t.bigInteger('start_at').nullable(); + t.bigInteger('updated_remote_at').nullable(); + t.bigInteger('last_seen_at').nullable(); + t.boolean('deleted_remote').notNullable().defaultTo(false); + t.bigInteger('created_at').notNullable(); + t.bigInteger('updated_at').notNullable(); + t.unique(['integration_id', 'external_event_id'], 'channel_google_event_index_event_unique'); + t.index(['integration_id', 'show_id'], 'channel_google_event_index_show_idx'); + }); } diff --git a/src/database/update.js b/src/database/update.js index 7fb8dff7..a1aa59a5 100644 --- a/src/database/update.js +++ b/src/database/update.js @@ -3,7 +3,7 @@ import Promise from 'bluebird'; const LOGGER = require('@calzoneman/jsli')('database/update'); -const DB_VERSION = 16; +const DB_VERSION = 17; var hasUpdates = []; module.exports.checkVersion = function () { @@ -61,6 +61,8 @@ function update(version, cb) { addCalendarIntegrationAuditColumns(cb); } else if (version < 16) { addShowsEstimatedEndColumn(cb); + } else if (version < 17) { + addGoogleEventIndexTable(cb); } } @@ -274,3 +276,34 @@ function addShowsEstimatedEndColumn(cb) { } ); } + +function addGoogleEventIndexTable(cb) { + db.query( + "CREATE TABLE IF NOT EXISTS channel_google_event_index (" + + "id INT NOT NULL AUTO_INCREMENT PRIMARY KEY," + + "channel_id INT UNSIGNED NOT NULL," + + "integration_id INT UNSIGNED NOT NULL," + + "show_id INT UNSIGNED NULL," + + "external_event_id VARCHAR(255) NOT NULL," + + "external_etag VARCHAR(255) NULL," + + "start_at BIGINT NULL," + + "updated_remote_at BIGINT NULL," + + "last_seen_at BIGINT NULL," + + "deleted_remote TINYINT(1) NOT NULL DEFAULT 0," + + "created_at BIGINT NOT NULL," + + "updated_at BIGINT NOT NULL," + + "UNIQUE KEY channel_google_event_index_event_unique (integration_id, external_event_id)," + + "KEY channel_google_event_index_show_idx (integration_id, show_id)," + + "CONSTRAINT fk_google_event_index_channel FOREIGN KEY (channel_id) REFERENCES channels(id) ON DELETE CASCADE," + + "CONSTRAINT fk_google_event_index_integration FOREIGN KEY (integration_id) REFERENCES channel_calendar_integrations(id) ON DELETE CASCADE" + + ") CHARACTER SET utf8", + error => { + if (error) { + LOGGER.error(`Failed to create channel_google_event_index table: ${error}`); + cb(error); + return; + } + cb(); + } + ); +} diff --git a/src/integrations/google-calendar.js b/src/integrations/google-calendar.js index cd7e4b44..2e3144ae 100644 --- a/src/integrations/google-calendar.js +++ b/src/integrations/google-calendar.js @@ -111,7 +111,13 @@ function requestJson(method, baseUrl, path, body, headers = {}) { if (message && typeof message === 'object') { message = message.message || message.status || JSON.stringify(message); } - return reject(new Error(String(message))); + const err = new Error(String(message)); + err.statusCode = res.statusCode; + const retryAfter = parseInt(res.headers['retry-after'], 10); + if (!isNaN(retryAfter) && retryAfter > 0) { + err.retryAfterMs = retryAfter * 1000; + } + return reject(err); } resolve(parsed); }); @@ -149,7 +155,13 @@ function requestForm(baseUrl, path, formBody) { if (message && typeof message === 'object') { message = message.message || message.status || JSON.stringify(message); } - return reject(new Error(String(message))); + const err = new Error(String(message)); + err.statusCode = res.statusCode; + const retryAfter = parseInt(res.headers['retry-after'], 10); + if (!isNaN(retryAfter) && retryAfter > 0) { + err.retryAfterMs = retryAfter * 1000; + } + return reject(err); } resolve(parsed); }); @@ -218,6 +230,13 @@ async function upsertGoogleCalendarEvent(accessToken, calendarId, show) { dateTime: end.toISOString(), timeZone: show.timezone || 'UTC' }, + extendedProperties: { + private: { + source: 'veretube-sync', + show_id: String(show.id), + channel_id: String(show.channel_id || '') + } + }, colorId: null }; return requestJson( @@ -243,6 +262,13 @@ async function updateGoogleCalendarEvent(accessToken, calendarId, eventId, show) end: { dateTime: end.toISOString(), timeZone: show.timezone || 'UTC' + }, + extendedProperties: { + private: { + source: 'veretube-sync', + show_id: String(show.id), + channel_id: String(show.channel_id || '') + } } }; return requestJson( @@ -254,6 +280,38 @@ async function updateGoogleCalendarEvent(accessToken, calendarId, eventId, show) ); } +async function deleteGoogleCalendarEvent(accessToken, calendarId, eventId) { + return requestJson( + 'DELETE', + 'https://www.googleapis.com', + `/calendar/v3/calendars/${encodeURIComponent(calendarId)}/events/${encodeURIComponent(eventId)}`, + null, + { Authorization: `Bearer ${accessToken}` } + ); +} + +async function listGoogleCalendarEvents(accessToken, calendarId, opts = {}) { + const q = { + singleEvents: 'true', + showDeleted: 'true', + maxResults: String(opts.maxResults || 2500) + }; + if (opts.pageToken) { + q.pageToken = opts.pageToken; + } + if (opts.syncToken) { + q.syncToken = opts.syncToken; + } + const qs = querystring.stringify(q); + return requestJson( + 'GET', + 'https://www.googleapis.com', + `/calendar/v3/calendars/${encodeURIComponent(calendarId)}/events?${qs}`, + null, + { Authorization: `Bearer ${accessToken}` } + ); +} + module.exports = { isEnabled, assertConfigured, @@ -265,5 +323,7 @@ module.exports = { packTokens, unpackTokens, upsertGoogleCalendarEvent, - updateGoogleCalendarEvent + updateGoogleCalendarEvent, + deleteGoogleCalendarEvent, + listGoogleCalendarEvents }; diff --git a/src/web/routes/api/integrations.js b/src/web/routes/api/integrations.js index 00471399..3d664e29 100644 --- a/src/web/routes/api/integrations.js +++ b/src/web/routes/api/integrations.js @@ -10,6 +10,11 @@ const router = express.Router({ mergeParams: true }); const PROVIDERS = new Set(['google']); const SYNCABLE_STATUSES = new Set(['scheduled', 'running', 'paused', 'completed']); +const CHANNEL_SYNC_COOLDOWN_MS = 30 * 1000; +const GOOGLE_QUEUE_MIN_INTERVAL_MS = 250; +const CHANNEL_SYNC_STATE = new Map(); +let googleQueue = Promise.resolve(); +let nextGoogleRequestAt = 0; function sanitizeIntegration(integration) { return { @@ -72,6 +77,127 @@ async function ensureAccessToken(integration) { }; } +function sleep(ms) { + return new Promise(resolve => setTimeout(resolve, ms)); +} + +function shouldRetryGoogleError(err) { + if (!err) return false; + const code = Number(err.statusCode || 0); + if (code === 429 || code === 503) return true; + if (code === 403) { + const msg = String(err.message || '').toLowerCase(); + return msg.indexOf('quota') !== -1 || msg.indexOf('rate') !== -1; + } + return false; +} + +async function withGoogleBackoff(op, maxRetries = 4) { + let attempt = 0; + while (true) { + try { + return await op(); + } catch (err) { + if (!shouldRetryGoogleError(err) || attempt >= maxRetries) { + throw err; + } + const retryAfterMs = Number(err.retryAfterMs || 0); + const baseMs = Math.min(20000, 1000 * Math.pow(2, attempt)); + const jitterMs = Math.floor(Math.random() * 500); + await sleep(Math.max(retryAfterMs, baseMs + jitterMs)); + attempt++; + } + } +} + +function runInGoogleQueue(task) { + const runner = async () => { + const wait = nextGoogleRequestAt - Date.now(); + if (wait > 0) { + await sleep(wait); + } + try { + return await task(); + } finally { + nextGoogleRequestAt = Date.now() + GOOGLE_QUEUE_MIN_INTERVAL_MS; + } + }; + + const p = googleQueue.then(runner, runner); + googleQueue = p.catch(() => {}); + return p; +} + +function toMillis(input) { + if (!input) return null; + const ms = Date.parse(input); + return isNaN(ms) ? null : ms; +} + +async function refreshGoogleEventIndex({ integration, calendarId, accessToken }) { + const now = Date.now(); + const config = integration.config || {}; + let syncToken = config.google_sync_token || null; + let pageToken = null; + let nextSyncToken = syncToken; + + while (true) { + let resp; + try { + resp = await withGoogleBackoff(() => + googleCalendar.listGoogleCalendarEvents(accessToken, calendarId, { + syncToken, + pageToken + }) + ); + } catch (err) { + if (Number(err.statusCode || 0) === 410 && syncToken) { + syncToken = null; + pageToken = null; + nextSyncToken = null; + continue; + } + throw err; + } + + const items = Array.isArray(resp.items) ? resp.items : []; + for (const ev of items) { + const p = ev && ev.extendedProperties && ev.extendedProperties.private + ? ev.extendedProperties.private + : {}; + const source = String((p && p.source) || '').toLowerCase(); + const showId = p && p.show_id ? parseInt(p.show_id, 10) : null; + if (source !== 'veretube-sync' && (isNaN(showId) || !showId)) { + continue; + } + await calendarDB.upsertGoogleEventIndexRow({ + channel_id: integration.channel_id, + integration_id: integration.id, + show_id: isNaN(showId) ? null : showId, + external_event_id: ev.id, + external_etag: ev.etag || null, + start_at: ev.start && ev.start.dateTime ? toMillis(ev.start.dateTime) : null, + updated_remote_at: ev.updated ? toMillis(ev.updated) : null, + last_seen_at: now, + deleted_remote: ev.status === 'cancelled' + }); + } + + pageToken = resp.nextPageToken || null; + if (resp.nextSyncToken) { + nextSyncToken = resp.nextSyncToken; + } + if (!pageToken) { + break; + } + } + + const nextConfig = Object.assign({}, config, { + google_sync_token: nextSyncToken || null + }); + await calendarDB.updateIntegrationConfig(integration.id, nextConfig); +} + async function syncGoogleIntegration({ integration, channelRow }) { const config = integration.config || {}; const calendarId = config.calendar_id; @@ -85,19 +211,63 @@ async function syncGoogleIntegration({ integration, channelRow }) { const allShows = await showsDB.listShows(channelRow.id); const shows = allShows.filter(show => statuses.indexOf(show.status) >= 0); + const localById = new Map(); + shows.forEach(show => localById.set(show.id, show)); const tokenResult = await ensureAccessToken(integration); + await refreshGoogleEventIndex({ + integration, + calendarId, + accessToken: tokenResult.accessToken + }); + + const indexRows = await calendarDB.listGoogleEventIndex(integration.id); + const existingMappings = await calendarDB.listExternalEventsForIntegration(channelRow.id, integration.id); + const remoteByShowId = new Map(); + const staleRemote = []; + indexRows.forEach(row => { + if (row.deleted_remote) return; + if (row.show_id && localById.has(row.show_id)) { + remoteByShowId.set(row.show_id, row); + return; + } + staleRemote.push(row); + }); + + let syncedCount = 0; for (const show of shows) { - const mapping = await calendarDB.getExternalEvent(show.id, integration.id); + const remote = remoteByShowId.get(show.id) || null; + const mapping = existingMappings.find(m => m.show_id === show.id) || + await calendarDB.getExternalEvent(show.id, integration.id); + const needsUpdate = !remote || + !remote.external_etag || + Number(remote.updated_at || 0) < Number(show.updated_at || 0); let event; - if (!mapping) { - event = await googleCalendar.upsertGoogleCalendarEvent(tokenResult.accessToken, calendarId, show); + if (!remote) { + event = await withGoogleBackoff(() => + googleCalendar.upsertGoogleCalendarEvent(tokenResult.accessToken, calendarId, show) + ); } else { - event = await googleCalendar.updateGoogleCalendarEvent( - tokenResult.accessToken, - calendarId, - mapping.external_event_id, - show + if (!needsUpdate) { + if (!mapping) { + await calendarDB.upsertExternalEvent({ + channelId: channelRow.id, + showId: show.id, + integrationId: integration.id, + provider: 'google', + externalEventId: remote.external_event_id, + externalEtag: remote.external_etag || null + }); + } + continue; + } + event = await withGoogleBackoff(() => + googleCalendar.updateGoogleCalendarEvent( + tokenResult.accessToken, + calendarId, + remote.external_event_id, + show + ) ); } await calendarDB.upsertExternalEvent({ @@ -108,10 +278,61 @@ async function syncGoogleIntegration({ integration, channelRow }) { externalEventId: event.id, externalEtag: event.etag || null }); + await calendarDB.upsertGoogleEventIndexRow({ + channel_id: channelRow.id, + integration_id: integration.id, + show_id: show.id, + external_event_id: event.id, + external_etag: event.etag || null, + start_at: Number(show.scheduled_for || 0) || null, + updated_remote_at: Date.now(), + last_seen_at: Date.now(), + deleted_remote: false + }); + syncedCount++; + } + + for (const remote of staleRemote) { + if (remote.external_event_id) { + await withGoogleBackoff(() => + googleCalendar.deleteGoogleCalendarEvent( + tokenResult.accessToken, + calendarId, + remote.external_event_id + ) + ); + } + await calendarDB.deleteGoogleEventIndexRow(remote.id, integration.id); + if (remote.show_id) { + const mapped = await calendarDB.getExternalEvent(remote.show_id, integration.id); + if (mapped) { + await calendarDB.deleteExternalEvent(mapped.id, channelRow.id, integration.id); + } + } + } + + for (const mapping of existingMappings) { + if (localById.has(mapping.show_id)) { + continue; + } + const remote = staleRemote.find(r => r.show_id === mapping.show_id); + if (remote) { + continue; + } + if (mapping.external_event_id) { + await withGoogleBackoff(() => + googleCalendar.deleteGoogleCalendarEvent( + tokenResult.accessToken, + calendarId, + mapping.external_event_id + ) + ); + } + await calendarDB.deleteExternalEvent(mapping.id, channelRow.id, integration.id); } return { - synced: shows.length, + synced: syncedCount, tokenPatch: tokenResult.updatedPatch }; } @@ -183,10 +404,28 @@ router.post('/:provider/sync-now', async (req, res) => { return res.status(404).json({ error: 'Connected integration not found' }); } + const lockKey = `${auth.channelRow.id}:${provider}`; + const now = Date.now(); + const current = CHANNEL_SYNC_STATE.get(lockKey) || { inFlight: false, lastRunAt: 0 }; + if (current.inFlight) { + return res.status(409).json({ error: 'Sync already in progress for this channel' }); + } + if (now - current.lastRunAt < CHANNEL_SYNC_COOLDOWN_MS) { + const retryAfterMs = CHANNEL_SYNC_COOLDOWN_MS - (now - current.lastRunAt); + return res.status(429).json({ + error: 'Sync cooldown active for this channel', + retry_after_ms: retryAfterMs + }); + } + + CHANNEL_SYNC_STATE.set(lockKey, { inFlight: true, lastRunAt: now }); + try { let result; if (provider === 'google') { - result = await syncGoogleIntegration({ integration, channelRow: auth.channelRow }); + result = await runInGoogleQueue(() => + syncGoogleIntegration({ integration, channelRow: auth.channelRow }) + ); } const patch = Object.assign({ status: 'connected', @@ -202,6 +441,12 @@ router.post('/:provider/sync-now', async (req, res) => { last_error: err.message || 'Sync failed' }); res.status(400).json({ error: err.message || 'Sync failed' }); + } finally { + const st = CHANNEL_SYNC_STATE.get(lockKey); + if (st) { + st.inFlight = false; + CHANNEL_SYNC_STATE.set(lockKey, st); + } } }); diff --git a/templates/channeloptions.pug b/templates/channeloptions.pug index 5422573b..2f63f23b 100644 --- a/templates/channeloptions.pug +++ b/templates/channeloptions.pug @@ -356,6 +356,7 @@ mixin integrations button#cs-int-google-connect.btn.btn-primary(type="button", style="margin-left:8px") Connect Google button#cs-int-google-sync.btn.btn-default(type="button", style="margin-left:8px") Sync Now button#cs-int-google-disconnect.btn.btn-danger(type="button", style="margin-left:8px") Disconnect + span#cs-int-google-sync-status.label.label-default(style="margin-left:8px") Idle hr table.table.table-striped.table-condensed thead diff --git a/www/js/ui.js b/www/js/ui.js index c9b95426..5d9bb9f3 100644 --- a/www/js/ui.js +++ b/www/js/ui.js @@ -2317,6 +2317,9 @@ var CSTShows = (function () { })(); var CSTIntegrations = (function () { + var syncInFlight = false; + var syncCooldownTimer = null; + function csrfField() { return (typeof CSRF_TOKEN === 'string' && CSRF_TOKEN.length > 0) ? CSRF_TOKEN : ''; } @@ -2337,10 +2340,49 @@ var CSTIntegrations = (function () { } } + function setSyncStatus(text, tone) { + var el = $('#cs-int-google-sync-status'); + if (!el.length) return; + el + .removeClass('label-default label-info label-success label-danger label-warning') + .addClass('label-' + (tone || 'default')) + .text(text || 'Idle'); + } + + function startSyncCooldown(ms) { + var remaining = Math.max(0, parseInt(ms, 10) || 0); + if (syncCooldownTimer) { + clearInterval(syncCooldownTimer); + syncCooldownTimer = null; + } + if (remaining <= 0) { + $('#cs-int-google-sync').prop('disabled', false); + return; + } + + $('#cs-int-google-sync').prop('disabled', true); + setSyncStatus('Cooldown ' + Math.ceil(remaining / 1000) + 's', 'warning'); + syncCooldownTimer = setInterval(function () { + remaining -= 1000; + if (remaining <= 0) { + clearInterval(syncCooldownTimer); + syncCooldownTimer = null; + if (!syncInFlight) { + $('#cs-int-google-sync').prop('disabled', false); + setSyncStatus('Connected', 'success'); + } + return; + } + setSyncStatus('Cooldown ' + Math.ceil(remaining / 1000) + 's', 'warning'); + }, 1000); + } + function render(rows) { var tbody = $('#cs-int-list').empty(); + var googleRow = null; if (!Array.isArray(rows) || rows.length === 0) { tbody.append('