mirror of
https://github.com/Spengreb/sync.git
synced 2026-06-09 14:52:05 +00:00
Gcalendar now can batch jobs and queue create and delete. Keeps a local record of gcalendar records.
This commit is contained in:
parent
6b40b9c500
commit
efd5fe0465
7 changed files with 561 additions and 17 deletions
|
|
@ -114,6 +114,15 @@ async function updateIntegrationSyncResult(id, patch) {
|
|||
.update(update);
|
||||
}
|
||||
|
||||
async function updateIntegrationConfig(id, config) {
|
||||
await knex()('channel_calendar_integrations')
|
||||
.where({ id })
|
||||
.update({
|
||||
config_json: JSON.stringify(config || {}),
|
||||
updated_at: Date.now()
|
||||
});
|
||||
}
|
||||
|
||||
function parseExternalRow(row) {
|
||||
if (!row) return null;
|
||||
return {
|
||||
|
|
@ -123,7 +132,8 @@ function parseExternalRow(row) {
|
|||
integration_id: row.integration_id,
|
||||
provider: row.provider,
|
||||
external_event_id: row.external_event_id,
|
||||
external_etag: row.external_etag || null
|
||||
external_etag: row.external_etag || null,
|
||||
last_pushed_at: row.last_pushed_at || null
|
||||
};
|
||||
}
|
||||
|
||||
|
|
@ -135,6 +145,13 @@ async function getExternalEvent(showId, integrationId) {
|
|||
return parseExternalRow(rows[0]);
|
||||
}
|
||||
|
||||
async function listExternalEventsForIntegration(channelId, integrationId) {
|
||||
const rows = await knex()('channel_show_external_events')
|
||||
.where({ channel_id: channelId, integration_id: integrationId, provider: 'google' })
|
||||
.select();
|
||||
return rows.map(parseExternalRow);
|
||||
}
|
||||
|
||||
async function upsertExternalEvent({ channelId, showId, integrationId, provider, externalEventId, externalEtag }) {
|
||||
const now = Date.now();
|
||||
const existing = await getExternalEvent(showId, integrationId);
|
||||
|
|
@ -163,6 +180,83 @@ async function upsertExternalEvent({ channelId, showId, integrationId, provider,
|
|||
});
|
||||
}
|
||||
|
||||
async function deleteExternalEvent(id, channelId, integrationId) {
|
||||
await knex()('channel_show_external_events')
|
||||
.where({ id, channel_id: channelId, integration_id: integrationId })
|
||||
.delete();
|
||||
}
|
||||
|
||||
function parseGoogleIndexRow(row) {
|
||||
if (!row) return null;
|
||||
return {
|
||||
id: row.id,
|
||||
channel_id: row.channel_id,
|
||||
integration_id: row.integration_id,
|
||||
show_id: row.show_id || null,
|
||||
external_event_id: row.external_event_id,
|
||||
external_etag: row.external_etag || null,
|
||||
start_at: row.start_at || null,
|
||||
updated_remote_at: row.updated_remote_at || null,
|
||||
last_seen_at: row.last_seen_at || null,
|
||||
deleted_remote: !!row.deleted_remote,
|
||||
created_at: row.created_at,
|
||||
updated_at: row.updated_at
|
||||
};
|
||||
}
|
||||
|
||||
async function listGoogleEventIndex(integrationId) {
|
||||
const rows = await knex()('channel_google_event_index')
|
||||
.where({ integration_id: integrationId })
|
||||
.select();
|
||||
return rows.map(parseGoogleIndexRow);
|
||||
}
|
||||
|
||||
async function upsertGoogleEventIndexRow(row) {
|
||||
const now = Date.now();
|
||||
const existing = await knex()('channel_google_event_index')
|
||||
.where({
|
||||
integration_id: row.integration_id,
|
||||
external_event_id: row.external_event_id
|
||||
})
|
||||
.limit(1)
|
||||
.select();
|
||||
|
||||
if (!existing || existing.length === 0) {
|
||||
await knex()('channel_google_event_index').insert({
|
||||
channel_id: row.channel_id,
|
||||
integration_id: row.integration_id,
|
||||
show_id: row.show_id || null,
|
||||
external_event_id: row.external_event_id,
|
||||
external_etag: row.external_etag || null,
|
||||
start_at: row.start_at || null,
|
||||
updated_remote_at: row.updated_remote_at || null,
|
||||
last_seen_at: row.last_seen_at || now,
|
||||
deleted_remote: row.deleted_remote ? 1 : 0,
|
||||
created_at: now,
|
||||
updated_at: now
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
await knex()('channel_google_event_index')
|
||||
.where({ id: existing[0].id })
|
||||
.update({
|
||||
show_id: row.show_id || null,
|
||||
external_etag: row.external_etag || null,
|
||||
start_at: row.start_at || null,
|
||||
updated_remote_at: row.updated_remote_at || null,
|
||||
last_seen_at: row.last_seen_at || now,
|
||||
deleted_remote: row.deleted_remote ? 1 : 0,
|
||||
updated_at: now
|
||||
});
|
||||
}
|
||||
|
||||
async function deleteGoogleEventIndexRow(id, integrationId) {
|
||||
await knex()('channel_google_event_index')
|
||||
.where({ id, integration_id: integrationId })
|
||||
.delete();
|
||||
}
|
||||
|
||||
function buildGoogleCalendarUrl(calendarId) {
|
||||
return `https://calendar.google.com/calendar/u/0/r?cid=${encodeURIComponent(calendarId)}`;
|
||||
}
|
||||
|
|
@ -227,7 +321,13 @@ module.exports = {
|
|||
upsertGoogleIntegration,
|
||||
disconnectIntegration,
|
||||
updateIntegrationSyncResult,
|
||||
updateIntegrationConfig,
|
||||
getExternalEvent,
|
||||
listExternalEventsForIntegration,
|
||||
upsertExternalEvent,
|
||||
deleteExternalEvent,
|
||||
listGoogleEventIndex,
|
||||
upsertGoogleEventIndexRow,
|
||||
deleteGoogleEventIndexRow,
|
||||
getGoogleLinksForShows
|
||||
};
|
||||
|
|
|
|||
|
|
@ -270,4 +270,30 @@ export async function initTables() {
|
|||
t.unique(['show_id', 'integration_id'], 'channel_show_external_event_unique');
|
||||
t.index(['integration_id', 'provider'], 'channel_show_external_event_integration_idx');
|
||||
});
|
||||
|
||||
await ensureTable('channel_google_event_index', t => {
|
||||
t.charset('utf8');
|
||||
t.increments('id').notNullable().primary();
|
||||
t.integer('channel_id')
|
||||
.unsigned()
|
||||
.notNullable()
|
||||
.references('id').inTable('channels')
|
||||
.onDelete('cascade');
|
||||
t.integer('integration_id')
|
||||
.unsigned()
|
||||
.notNullable()
|
||||
.references('id').inTable('channel_calendar_integrations')
|
||||
.onDelete('cascade');
|
||||
t.integer('show_id').unsigned().nullable();
|
||||
t.string('external_event_id', 255).notNullable();
|
||||
t.string('external_etag', 255).nullable();
|
||||
t.bigInteger('start_at').nullable();
|
||||
t.bigInteger('updated_remote_at').nullable();
|
||||
t.bigInteger('last_seen_at').nullable();
|
||||
t.boolean('deleted_remote').notNullable().defaultTo(false);
|
||||
t.bigInteger('created_at').notNullable();
|
||||
t.bigInteger('updated_at').notNullable();
|
||||
t.unique(['integration_id', 'external_event_id'], 'channel_google_event_index_event_unique');
|
||||
t.index(['integration_id', 'show_id'], 'channel_google_event_index_show_idx');
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ import Promise from 'bluebird';
|
|||
|
||||
const LOGGER = require('@calzoneman/jsli')('database/update');
|
||||
|
||||
const DB_VERSION = 16;
|
||||
const DB_VERSION = 17;
|
||||
var hasUpdates = [];
|
||||
|
||||
module.exports.checkVersion = function () {
|
||||
|
|
@ -61,6 +61,8 @@ function update(version, cb) {
|
|||
addCalendarIntegrationAuditColumns(cb);
|
||||
} else if (version < 16) {
|
||||
addShowsEstimatedEndColumn(cb);
|
||||
} else if (version < 17) {
|
||||
addGoogleEventIndexTable(cb);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -274,3 +276,34 @@ function addShowsEstimatedEndColumn(cb) {
|
|||
}
|
||||
);
|
||||
}
|
||||
|
||||
function addGoogleEventIndexTable(cb) {
|
||||
db.query(
|
||||
"CREATE TABLE IF NOT EXISTS channel_google_event_index (" +
|
||||
"id INT NOT NULL AUTO_INCREMENT PRIMARY KEY," +
|
||||
"channel_id INT UNSIGNED NOT NULL," +
|
||||
"integration_id INT UNSIGNED NOT NULL," +
|
||||
"show_id INT UNSIGNED NULL," +
|
||||
"external_event_id VARCHAR(255) NOT NULL," +
|
||||
"external_etag VARCHAR(255) NULL," +
|
||||
"start_at BIGINT NULL," +
|
||||
"updated_remote_at BIGINT NULL," +
|
||||
"last_seen_at BIGINT NULL," +
|
||||
"deleted_remote TINYINT(1) NOT NULL DEFAULT 0," +
|
||||
"created_at BIGINT NOT NULL," +
|
||||
"updated_at BIGINT NOT NULL," +
|
||||
"UNIQUE KEY channel_google_event_index_event_unique (integration_id, external_event_id)," +
|
||||
"KEY channel_google_event_index_show_idx (integration_id, show_id)," +
|
||||
"CONSTRAINT fk_google_event_index_channel FOREIGN KEY (channel_id) REFERENCES channels(id) ON DELETE CASCADE," +
|
||||
"CONSTRAINT fk_google_event_index_integration FOREIGN KEY (integration_id) REFERENCES channel_calendar_integrations(id) ON DELETE CASCADE" +
|
||||
") CHARACTER SET utf8",
|
||||
error => {
|
||||
if (error) {
|
||||
LOGGER.error(`Failed to create channel_google_event_index table: ${error}`);
|
||||
cb(error);
|
||||
return;
|
||||
}
|
||||
cb();
|
||||
}
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -111,7 +111,13 @@ function requestJson(method, baseUrl, path, body, headers = {}) {
|
|||
if (message && typeof message === 'object') {
|
||||
message = message.message || message.status || JSON.stringify(message);
|
||||
}
|
||||
return reject(new Error(String(message)));
|
||||
const err = new Error(String(message));
|
||||
err.statusCode = res.statusCode;
|
||||
const retryAfter = parseInt(res.headers['retry-after'], 10);
|
||||
if (!isNaN(retryAfter) && retryAfter > 0) {
|
||||
err.retryAfterMs = retryAfter * 1000;
|
||||
}
|
||||
return reject(err);
|
||||
}
|
||||
resolve(parsed);
|
||||
});
|
||||
|
|
@ -149,7 +155,13 @@ function requestForm(baseUrl, path, formBody) {
|
|||
if (message && typeof message === 'object') {
|
||||
message = message.message || message.status || JSON.stringify(message);
|
||||
}
|
||||
return reject(new Error(String(message)));
|
||||
const err = new Error(String(message));
|
||||
err.statusCode = res.statusCode;
|
||||
const retryAfter = parseInt(res.headers['retry-after'], 10);
|
||||
if (!isNaN(retryAfter) && retryAfter > 0) {
|
||||
err.retryAfterMs = retryAfter * 1000;
|
||||
}
|
||||
return reject(err);
|
||||
}
|
||||
resolve(parsed);
|
||||
});
|
||||
|
|
@ -218,6 +230,13 @@ async function upsertGoogleCalendarEvent(accessToken, calendarId, show) {
|
|||
dateTime: end.toISOString(),
|
||||
timeZone: show.timezone || 'UTC'
|
||||
},
|
||||
extendedProperties: {
|
||||
private: {
|
||||
source: 'veretube-sync',
|
||||
show_id: String(show.id),
|
||||
channel_id: String(show.channel_id || '')
|
||||
}
|
||||
},
|
||||
colorId: null
|
||||
};
|
||||
return requestJson(
|
||||
|
|
@ -243,6 +262,13 @@ async function updateGoogleCalendarEvent(accessToken, calendarId, eventId, show)
|
|||
end: {
|
||||
dateTime: end.toISOString(),
|
||||
timeZone: show.timezone || 'UTC'
|
||||
},
|
||||
extendedProperties: {
|
||||
private: {
|
||||
source: 'veretube-sync',
|
||||
show_id: String(show.id),
|
||||
channel_id: String(show.channel_id || '')
|
||||
}
|
||||
}
|
||||
};
|
||||
return requestJson(
|
||||
|
|
@ -254,6 +280,38 @@ async function updateGoogleCalendarEvent(accessToken, calendarId, eventId, show)
|
|||
);
|
||||
}
|
||||
|
||||
async function deleteGoogleCalendarEvent(accessToken, calendarId, eventId) {
|
||||
return requestJson(
|
||||
'DELETE',
|
||||
'https://www.googleapis.com',
|
||||
`/calendar/v3/calendars/${encodeURIComponent(calendarId)}/events/${encodeURIComponent(eventId)}`,
|
||||
null,
|
||||
{ Authorization: `Bearer ${accessToken}` }
|
||||
);
|
||||
}
|
||||
|
||||
async function listGoogleCalendarEvents(accessToken, calendarId, opts = {}) {
|
||||
const q = {
|
||||
singleEvents: 'true',
|
||||
showDeleted: 'true',
|
||||
maxResults: String(opts.maxResults || 2500)
|
||||
};
|
||||
if (opts.pageToken) {
|
||||
q.pageToken = opts.pageToken;
|
||||
}
|
||||
if (opts.syncToken) {
|
||||
q.syncToken = opts.syncToken;
|
||||
}
|
||||
const qs = querystring.stringify(q);
|
||||
return requestJson(
|
||||
'GET',
|
||||
'https://www.googleapis.com',
|
||||
`/calendar/v3/calendars/${encodeURIComponent(calendarId)}/events?${qs}`,
|
||||
null,
|
||||
{ Authorization: `Bearer ${accessToken}` }
|
||||
);
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
isEnabled,
|
||||
assertConfigured,
|
||||
|
|
@ -265,5 +323,7 @@ module.exports = {
|
|||
packTokens,
|
||||
unpackTokens,
|
||||
upsertGoogleCalendarEvent,
|
||||
updateGoogleCalendarEvent
|
||||
updateGoogleCalendarEvent,
|
||||
deleteGoogleCalendarEvent,
|
||||
listGoogleCalendarEvents
|
||||
};
|
||||
|
|
|
|||
|
|
@ -10,6 +10,11 @@ const router = express.Router({ mergeParams: true });
|
|||
|
||||
const PROVIDERS = new Set(['google']);
|
||||
const SYNCABLE_STATUSES = new Set(['scheduled', 'running', 'paused', 'completed']);
|
||||
const CHANNEL_SYNC_COOLDOWN_MS = 30 * 1000;
|
||||
const GOOGLE_QUEUE_MIN_INTERVAL_MS = 250;
|
||||
const CHANNEL_SYNC_STATE = new Map();
|
||||
let googleQueue = Promise.resolve();
|
||||
let nextGoogleRequestAt = 0;
|
||||
|
||||
function sanitizeIntegration(integration) {
|
||||
return {
|
||||
|
|
@ -72,6 +77,127 @@ async function ensureAccessToken(integration) {
|
|||
};
|
||||
}
|
||||
|
||||
function sleep(ms) {
|
||||
return new Promise(resolve => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
function shouldRetryGoogleError(err) {
|
||||
if (!err) return false;
|
||||
const code = Number(err.statusCode || 0);
|
||||
if (code === 429 || code === 503) return true;
|
||||
if (code === 403) {
|
||||
const msg = String(err.message || '').toLowerCase();
|
||||
return msg.indexOf('quota') !== -1 || msg.indexOf('rate') !== -1;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
async function withGoogleBackoff(op, maxRetries = 4) {
|
||||
let attempt = 0;
|
||||
while (true) {
|
||||
try {
|
||||
return await op();
|
||||
} catch (err) {
|
||||
if (!shouldRetryGoogleError(err) || attempt >= maxRetries) {
|
||||
throw err;
|
||||
}
|
||||
const retryAfterMs = Number(err.retryAfterMs || 0);
|
||||
const baseMs = Math.min(20000, 1000 * Math.pow(2, attempt));
|
||||
const jitterMs = Math.floor(Math.random() * 500);
|
||||
await sleep(Math.max(retryAfterMs, baseMs + jitterMs));
|
||||
attempt++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function runInGoogleQueue(task) {
|
||||
const runner = async () => {
|
||||
const wait = nextGoogleRequestAt - Date.now();
|
||||
if (wait > 0) {
|
||||
await sleep(wait);
|
||||
}
|
||||
try {
|
||||
return await task();
|
||||
} finally {
|
||||
nextGoogleRequestAt = Date.now() + GOOGLE_QUEUE_MIN_INTERVAL_MS;
|
||||
}
|
||||
};
|
||||
|
||||
const p = googleQueue.then(runner, runner);
|
||||
googleQueue = p.catch(() => {});
|
||||
return p;
|
||||
}
|
||||
|
||||
function toMillis(input) {
|
||||
if (!input) return null;
|
||||
const ms = Date.parse(input);
|
||||
return isNaN(ms) ? null : ms;
|
||||
}
|
||||
|
||||
async function refreshGoogleEventIndex({ integration, calendarId, accessToken }) {
|
||||
const now = Date.now();
|
||||
const config = integration.config || {};
|
||||
let syncToken = config.google_sync_token || null;
|
||||
let pageToken = null;
|
||||
let nextSyncToken = syncToken;
|
||||
|
||||
while (true) {
|
||||
let resp;
|
||||
try {
|
||||
resp = await withGoogleBackoff(() =>
|
||||
googleCalendar.listGoogleCalendarEvents(accessToken, calendarId, {
|
||||
syncToken,
|
||||
pageToken
|
||||
})
|
||||
);
|
||||
} catch (err) {
|
||||
if (Number(err.statusCode || 0) === 410 && syncToken) {
|
||||
syncToken = null;
|
||||
pageToken = null;
|
||||
nextSyncToken = null;
|
||||
continue;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
|
||||
const items = Array.isArray(resp.items) ? resp.items : [];
|
||||
for (const ev of items) {
|
||||
const p = ev && ev.extendedProperties && ev.extendedProperties.private
|
||||
? ev.extendedProperties.private
|
||||
: {};
|
||||
const source = String((p && p.source) || '').toLowerCase();
|
||||
const showId = p && p.show_id ? parseInt(p.show_id, 10) : null;
|
||||
if (source !== 'veretube-sync' && (isNaN(showId) || !showId)) {
|
||||
continue;
|
||||
}
|
||||
await calendarDB.upsertGoogleEventIndexRow({
|
||||
channel_id: integration.channel_id,
|
||||
integration_id: integration.id,
|
||||
show_id: isNaN(showId) ? null : showId,
|
||||
external_event_id: ev.id,
|
||||
external_etag: ev.etag || null,
|
||||
start_at: ev.start && ev.start.dateTime ? toMillis(ev.start.dateTime) : null,
|
||||
updated_remote_at: ev.updated ? toMillis(ev.updated) : null,
|
||||
last_seen_at: now,
|
||||
deleted_remote: ev.status === 'cancelled'
|
||||
});
|
||||
}
|
||||
|
||||
pageToken = resp.nextPageToken || null;
|
||||
if (resp.nextSyncToken) {
|
||||
nextSyncToken = resp.nextSyncToken;
|
||||
}
|
||||
if (!pageToken) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
const nextConfig = Object.assign({}, config, {
|
||||
google_sync_token: nextSyncToken || null
|
||||
});
|
||||
await calendarDB.updateIntegrationConfig(integration.id, nextConfig);
|
||||
}
|
||||
|
||||
async function syncGoogleIntegration({ integration, channelRow }) {
|
||||
const config = integration.config || {};
|
||||
const calendarId = config.calendar_id;
|
||||
|
|
@ -85,19 +211,63 @@ async function syncGoogleIntegration({ integration, channelRow }) {
|
|||
|
||||
const allShows = await showsDB.listShows(channelRow.id);
|
||||
const shows = allShows.filter(show => statuses.indexOf(show.status) >= 0);
|
||||
const localById = new Map();
|
||||
shows.forEach(show => localById.set(show.id, show));
|
||||
const tokenResult = await ensureAccessToken(integration);
|
||||
await refreshGoogleEventIndex({
|
||||
integration,
|
||||
calendarId,
|
||||
accessToken: tokenResult.accessToken
|
||||
});
|
||||
|
||||
const indexRows = await calendarDB.listGoogleEventIndex(integration.id);
|
||||
const existingMappings = await calendarDB.listExternalEventsForIntegration(channelRow.id, integration.id);
|
||||
const remoteByShowId = new Map();
|
||||
const staleRemote = [];
|
||||
indexRows.forEach(row => {
|
||||
if (row.deleted_remote) return;
|
||||
if (row.show_id && localById.has(row.show_id)) {
|
||||
remoteByShowId.set(row.show_id, row);
|
||||
return;
|
||||
}
|
||||
staleRemote.push(row);
|
||||
});
|
||||
|
||||
let syncedCount = 0;
|
||||
|
||||
for (const show of shows) {
|
||||
const mapping = await calendarDB.getExternalEvent(show.id, integration.id);
|
||||
const remote = remoteByShowId.get(show.id) || null;
|
||||
const mapping = existingMappings.find(m => m.show_id === show.id) ||
|
||||
await calendarDB.getExternalEvent(show.id, integration.id);
|
||||
const needsUpdate = !remote ||
|
||||
!remote.external_etag ||
|
||||
Number(remote.updated_at || 0) < Number(show.updated_at || 0);
|
||||
let event;
|
||||
if (!mapping) {
|
||||
event = await googleCalendar.upsertGoogleCalendarEvent(tokenResult.accessToken, calendarId, show);
|
||||
if (!remote) {
|
||||
event = await withGoogleBackoff(() =>
|
||||
googleCalendar.upsertGoogleCalendarEvent(tokenResult.accessToken, calendarId, show)
|
||||
);
|
||||
} else {
|
||||
event = await googleCalendar.updateGoogleCalendarEvent(
|
||||
tokenResult.accessToken,
|
||||
calendarId,
|
||||
mapping.external_event_id,
|
||||
show
|
||||
if (!needsUpdate) {
|
||||
if (!mapping) {
|
||||
await calendarDB.upsertExternalEvent({
|
||||
channelId: channelRow.id,
|
||||
showId: show.id,
|
||||
integrationId: integration.id,
|
||||
provider: 'google',
|
||||
externalEventId: remote.external_event_id,
|
||||
externalEtag: remote.external_etag || null
|
||||
});
|
||||
}
|
||||
continue;
|
||||
}
|
||||
event = await withGoogleBackoff(() =>
|
||||
googleCalendar.updateGoogleCalendarEvent(
|
||||
tokenResult.accessToken,
|
||||
calendarId,
|
||||
remote.external_event_id,
|
||||
show
|
||||
)
|
||||
);
|
||||
}
|
||||
await calendarDB.upsertExternalEvent({
|
||||
|
|
@ -108,10 +278,61 @@ async function syncGoogleIntegration({ integration, channelRow }) {
|
|||
externalEventId: event.id,
|
||||
externalEtag: event.etag || null
|
||||
});
|
||||
await calendarDB.upsertGoogleEventIndexRow({
|
||||
channel_id: channelRow.id,
|
||||
integration_id: integration.id,
|
||||
show_id: show.id,
|
||||
external_event_id: event.id,
|
||||
external_etag: event.etag || null,
|
||||
start_at: Number(show.scheduled_for || 0) || null,
|
||||
updated_remote_at: Date.now(),
|
||||
last_seen_at: Date.now(),
|
||||
deleted_remote: false
|
||||
});
|
||||
syncedCount++;
|
||||
}
|
||||
|
||||
for (const remote of staleRemote) {
|
||||
if (remote.external_event_id) {
|
||||
await withGoogleBackoff(() =>
|
||||
googleCalendar.deleteGoogleCalendarEvent(
|
||||
tokenResult.accessToken,
|
||||
calendarId,
|
||||
remote.external_event_id
|
||||
)
|
||||
);
|
||||
}
|
||||
await calendarDB.deleteGoogleEventIndexRow(remote.id, integration.id);
|
||||
if (remote.show_id) {
|
||||
const mapped = await calendarDB.getExternalEvent(remote.show_id, integration.id);
|
||||
if (mapped) {
|
||||
await calendarDB.deleteExternalEvent(mapped.id, channelRow.id, integration.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (const mapping of existingMappings) {
|
||||
if (localById.has(mapping.show_id)) {
|
||||
continue;
|
||||
}
|
||||
const remote = staleRemote.find(r => r.show_id === mapping.show_id);
|
||||
if (remote) {
|
||||
continue;
|
||||
}
|
||||
if (mapping.external_event_id) {
|
||||
await withGoogleBackoff(() =>
|
||||
googleCalendar.deleteGoogleCalendarEvent(
|
||||
tokenResult.accessToken,
|
||||
calendarId,
|
||||
mapping.external_event_id
|
||||
)
|
||||
);
|
||||
}
|
||||
await calendarDB.deleteExternalEvent(mapping.id, channelRow.id, integration.id);
|
||||
}
|
||||
|
||||
return {
|
||||
synced: shows.length,
|
||||
synced: syncedCount,
|
||||
tokenPatch: tokenResult.updatedPatch
|
||||
};
|
||||
}
|
||||
|
|
@ -183,10 +404,28 @@ router.post('/:provider/sync-now', async (req, res) => {
|
|||
return res.status(404).json({ error: 'Connected integration not found' });
|
||||
}
|
||||
|
||||
const lockKey = `${auth.channelRow.id}:${provider}`;
|
||||
const now = Date.now();
|
||||
const current = CHANNEL_SYNC_STATE.get(lockKey) || { inFlight: false, lastRunAt: 0 };
|
||||
if (current.inFlight) {
|
||||
return res.status(409).json({ error: 'Sync already in progress for this channel' });
|
||||
}
|
||||
if (now - current.lastRunAt < CHANNEL_SYNC_COOLDOWN_MS) {
|
||||
const retryAfterMs = CHANNEL_SYNC_COOLDOWN_MS - (now - current.lastRunAt);
|
||||
return res.status(429).json({
|
||||
error: 'Sync cooldown active for this channel',
|
||||
retry_after_ms: retryAfterMs
|
||||
});
|
||||
}
|
||||
|
||||
CHANNEL_SYNC_STATE.set(lockKey, { inFlight: true, lastRunAt: now });
|
||||
|
||||
try {
|
||||
let result;
|
||||
if (provider === 'google') {
|
||||
result = await syncGoogleIntegration({ integration, channelRow: auth.channelRow });
|
||||
result = await runInGoogleQueue(() =>
|
||||
syncGoogleIntegration({ integration, channelRow: auth.channelRow })
|
||||
);
|
||||
}
|
||||
const patch = Object.assign({
|
||||
status: 'connected',
|
||||
|
|
@ -202,6 +441,12 @@ router.post('/:provider/sync-now', async (req, res) => {
|
|||
last_error: err.message || 'Sync failed'
|
||||
});
|
||||
res.status(400).json({ error: err.message || 'Sync failed' });
|
||||
} finally {
|
||||
const st = CHANNEL_SYNC_STATE.get(lockKey);
|
||||
if (st) {
|
||||
st.inFlight = false;
|
||||
CHANNEL_SYNC_STATE.set(lockKey, st);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -356,6 +356,7 @@ mixin integrations
|
|||
button#cs-int-google-connect.btn.btn-primary(type="button", style="margin-left:8px") Connect Google
|
||||
button#cs-int-google-sync.btn.btn-default(type="button", style="margin-left:8px") Sync Now
|
||||
button#cs-int-google-disconnect.btn.btn-danger(type="button", style="margin-left:8px") Disconnect
|
||||
span#cs-int-google-sync-status.label.label-default(style="margin-left:8px") Idle
|
||||
hr
|
||||
table.table.table-striped.table-condensed
|
||||
thead
|
||||
|
|
|
|||
83
www/js/ui.js
83
www/js/ui.js
|
|
@ -2317,6 +2317,9 @@ var CSTShows = (function () {
|
|||
})();
|
||||
|
||||
var CSTIntegrations = (function () {
|
||||
var syncInFlight = false;
|
||||
var syncCooldownTimer = null;
|
||||
|
||||
function csrfField() {
|
||||
return (typeof CSRF_TOKEN === 'string' && CSRF_TOKEN.length > 0) ? CSRF_TOKEN : '';
|
||||
}
|
||||
|
|
@ -2337,10 +2340,49 @@ var CSTIntegrations = (function () {
|
|||
}
|
||||
}
|
||||
|
||||
function setSyncStatus(text, tone) {
|
||||
var el = $('#cs-int-google-sync-status');
|
||||
if (!el.length) return;
|
||||
el
|
||||
.removeClass('label-default label-info label-success label-danger label-warning')
|
||||
.addClass('label-' + (tone || 'default'))
|
||||
.text(text || 'Idle');
|
||||
}
|
||||
|
||||
function startSyncCooldown(ms) {
|
||||
var remaining = Math.max(0, parseInt(ms, 10) || 0);
|
||||
if (syncCooldownTimer) {
|
||||
clearInterval(syncCooldownTimer);
|
||||
syncCooldownTimer = null;
|
||||
}
|
||||
if (remaining <= 0) {
|
||||
$('#cs-int-google-sync').prop('disabled', false);
|
||||
return;
|
||||
}
|
||||
|
||||
$('#cs-int-google-sync').prop('disabled', true);
|
||||
setSyncStatus('Cooldown ' + Math.ceil(remaining / 1000) + 's', 'warning');
|
||||
syncCooldownTimer = setInterval(function () {
|
||||
remaining -= 1000;
|
||||
if (remaining <= 0) {
|
||||
clearInterval(syncCooldownTimer);
|
||||
syncCooldownTimer = null;
|
||||
if (!syncInFlight) {
|
||||
$('#cs-int-google-sync').prop('disabled', false);
|
||||
setSyncStatus('Connected', 'success');
|
||||
}
|
||||
return;
|
||||
}
|
||||
setSyncStatus('Cooldown ' + Math.ceil(remaining / 1000) + 's', 'warning');
|
||||
}, 1000);
|
||||
}
|
||||
|
||||
function render(rows) {
|
||||
var tbody = $('#cs-int-list').empty();
|
||||
var googleRow = null;
|
||||
if (!Array.isArray(rows) || rows.length === 0) {
|
||||
tbody.append('<tr><td colspan="6" class="text-muted">No integrations connected</td></tr>');
|
||||
setSyncStatus('Not connected', 'default');
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -2356,9 +2398,20 @@ var CSTIntegrations = (function () {
|
|||
tbody.append(tr);
|
||||
|
||||
if (row.provider === 'google') {
|
||||
googleRow = row;
|
||||
$('#cs-int-google-calendar-id').val(calendarId || '');
|
||||
}
|
||||
});
|
||||
|
||||
if (!googleRow) {
|
||||
setSyncStatus('Not connected', 'default');
|
||||
} else if (googleRow.status === 'error' && googleRow.last_error) {
|
||||
setSyncStatus('Error: ' + googleRow.last_error, 'danger');
|
||||
} else if (googleRow.status === 'connected') {
|
||||
setSyncStatus('Connected', 'success');
|
||||
} else {
|
||||
setSyncStatus(String(googleRow.status || 'Idle'), 'default');
|
||||
}
|
||||
}
|
||||
|
||||
function load() {
|
||||
|
|
@ -2396,18 +2449,43 @@ var CSTIntegrations = (function () {
|
|||
}
|
||||
|
||||
function syncGoogleNow() {
|
||||
if (syncInFlight) {
|
||||
setSyncStatus('Already syncing...', 'warning');
|
||||
return;
|
||||
}
|
||||
|
||||
syncInFlight = true;
|
||||
$('#cs-int-google-sync').prop('disabled', true);
|
||||
setSyncStatus('Queued...', 'info');
|
||||
|
||||
$.ajax({
|
||||
url: apiBase() + '/google/sync-now',
|
||||
method: 'POST',
|
||||
contentType: 'application/json',
|
||||
data: JSON.stringify({ _csrf: csrfField() })
|
||||
}).done(function (data) {
|
||||
setSyncStatus('Syncing...', 'info');
|
||||
load();
|
||||
if (data && typeof data.synced === 'number') {
|
||||
alert('Synced ' + data.synced + ' shows to Google Calendar');
|
||||
setSyncStatus('Synced ' + data.synced + ' shows', 'success');
|
||||
} else {
|
||||
setSyncStatus('Sync completed', 'success');
|
||||
}
|
||||
}).fail(function (xhr) {
|
||||
alert('Sync failed: ' + formatError(xhr, 'Sync failed'));
|
||||
if (xhr && xhr.status === 409) {
|
||||
setSyncStatus('Already syncing on server', 'warning');
|
||||
return;
|
||||
}
|
||||
if (xhr && xhr.status === 429 && xhr.responseJSON && xhr.responseJSON.retry_after_ms) {
|
||||
startSyncCooldown(xhr.responseJSON.retry_after_ms);
|
||||
return;
|
||||
}
|
||||
setSyncStatus('Sync failed: ' + formatError(xhr, 'Sync failed'), 'danger');
|
||||
}).always(function () {
|
||||
syncInFlight = false;
|
||||
if (!syncCooldownTimer) {
|
||||
$('#cs-int-google-sync').prop('disabled', false);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -2428,6 +2506,7 @@ var CSTIntegrations = (function () {
|
|||
$('#cs-int-google-connect').on('click', connectGoogle);
|
||||
$('#cs-int-google-sync').on('click', syncGoogleNow);
|
||||
$('#cs-int-google-disconnect').on('click', disconnectGoogle);
|
||||
setSyncStatus('Idle', 'default');
|
||||
|
||||
return { load: load };
|
||||
})();
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue