mirror of
https://github.com/Spengreb/sync.git
synced 2026-06-09 23:02:05 +00:00
Gcalendar can sync as background task
This commit is contained in:
parent
efd5fe0465
commit
e917ea25eb
3 changed files with 131 additions and 40 deletions
|
|
@ -9,6 +9,8 @@ var Config = require("./config");
|
|||
var db = require("./database");
|
||||
var Promise = require("bluebird");
|
||||
const shows = require('./shows');
|
||||
const calendarDB = require('./database/calendar-integrations');
|
||||
const integrationsApi = require('./web/routes/api/integrations');
|
||||
|
||||
const LOGGER = require('@calzoneman/jsli')('bgtask');
|
||||
|
||||
|
|
@ -113,6 +115,58 @@ function initShowScheduler() {
|
|||
}, SCHEDULE_INTERVAL);
|
||||
}
|
||||
|
||||
function initCalendarAutoSyncScheduler() {
|
||||
const AUTO_SYNC_INTERVAL_MS = 60 * 1000;
|
||||
const AUTO_SYNC_PERIOD_MINUTES = 30;
|
||||
let running = false;
|
||||
|
||||
function inStaggerSlot(integrationId, now) {
|
||||
const slot = Number(integrationId || 0) % AUTO_SYNC_PERIOD_MINUTES;
|
||||
const minute = new Date(now).getUTCMinutes() % AUTO_SYNC_PERIOD_MINUTES;
|
||||
return slot === minute;
|
||||
}
|
||||
|
||||
setInterval(async () => {
|
||||
if (running) {
|
||||
return;
|
||||
}
|
||||
|
||||
running = true;
|
||||
try {
|
||||
const now = Date.now();
|
||||
const rows = await calendarDB.listConnectedByProvider('google');
|
||||
for (const integration of rows) {
|
||||
if (!inStaggerSlot(integration.id, now)) {
|
||||
continue;
|
||||
}
|
||||
if (integration.last_sync_at && now - integration.last_sync_at < AUTO_SYNC_PERIOD_MINUTES * 60 * 1000) {
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
await integrationsApi.syncIntegrationNow({
|
||||
provider: 'google',
|
||||
integration,
|
||||
channelRow: { id: integration.channel_id },
|
||||
enforceCooldown: false
|
||||
});
|
||||
} catch (err) {
|
||||
LOGGER.warn(
|
||||
'Auto calendar sync failed integration=%s channel=%s: %s',
|
||||
integration.id,
|
||||
integration.channel_id,
|
||||
err && (err.stack || err.message) || err
|
||||
);
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
LOGGER.error('Calendar auto-sync scheduler failure: %s', err.stack || err);
|
||||
} finally {
|
||||
running = false;
|
||||
}
|
||||
}, AUTO_SYNC_INTERVAL_MS);
|
||||
}
|
||||
|
||||
module.exports = function (Server) {
|
||||
if (init === Server) {
|
||||
LOGGER.warn("Attempted to re-init background tasks");
|
||||
|
|
@ -125,4 +179,5 @@ module.exports = function (Server) {
|
|||
initPasswordResetCleanup();
|
||||
initAccountCleanup();
|
||||
initShowScheduler();
|
||||
initCalendarAutoSyncScheduler();
|
||||
};
|
||||
|
|
|
|||
|
|
@ -39,6 +39,13 @@ async function listByChannel(channelId) {
|
|||
return rows.map(parseIntegrationRow);
|
||||
}
|
||||
|
||||
async function listConnectedByProvider(provider) {
|
||||
const rows = await knex()('channel_calendar_integrations')
|
||||
.where({ provider, status: 'connected' })
|
||||
.select();
|
||||
return rows.map(parseIntegrationRow);
|
||||
}
|
||||
|
||||
async function getByChannelProvider(channelId, provider) {
|
||||
const rows = await knex()('channel_calendar_integrations')
|
||||
.where({ channel_id: channelId, provider })
|
||||
|
|
@ -316,6 +323,7 @@ async function getGoogleLinksForShows(channelId, showIds) {
|
|||
}
|
||||
|
||||
module.exports = {
|
||||
listConnectedByProvider,
|
||||
listByChannel,
|
||||
getByChannelProvider,
|
||||
upsertGoogleIntegration,
|
||||
|
|
|
|||
|
|
@ -337,6 +337,57 @@ async function syncGoogleIntegration({ integration, channelRow }) {
|
|||
};
|
||||
}
|
||||
|
||||
async function syncIntegrationNow({ provider, integration, channelRow, enforceCooldown = true }) {
|
||||
const lockKey = `${channelRow.id}:${provider}`;
|
||||
const now = Date.now();
|
||||
const current = CHANNEL_SYNC_STATE.get(lockKey) || { inFlight: false, lastRunAt: 0 };
|
||||
if (current.inFlight) {
|
||||
const err = new Error('Sync already in progress for this channel');
|
||||
err.statusCode = 409;
|
||||
throw err;
|
||||
}
|
||||
if (enforceCooldown && (now - current.lastRunAt < CHANNEL_SYNC_COOLDOWN_MS)) {
|
||||
const retryAfterMs = CHANNEL_SYNC_COOLDOWN_MS - (now - current.lastRunAt);
|
||||
const err = new Error('Sync cooldown active for this channel');
|
||||
err.statusCode = 429;
|
||||
err.retryAfterMs = retryAfterMs;
|
||||
throw err;
|
||||
}
|
||||
|
||||
CHANNEL_SYNC_STATE.set(lockKey, { inFlight: true, lastRunAt: now });
|
||||
try {
|
||||
let result;
|
||||
if (provider === 'google') {
|
||||
result = await runInGoogleQueue(() =>
|
||||
syncGoogleIntegration({ integration, channelRow })
|
||||
);
|
||||
} else {
|
||||
throw new Error('Unsupported provider');
|
||||
}
|
||||
|
||||
const patch = Object.assign({
|
||||
status: 'connected',
|
||||
last_sync_at: Date.now(),
|
||||
last_error: null
|
||||
}, result && result.tokenPatch ? result.tokenPatch : {});
|
||||
await calendarDB.updateIntegrationSyncResult(integration.id, patch);
|
||||
return { success: true, synced: result ? result.synced : 0 };
|
||||
} catch (err) {
|
||||
await calendarDB.updateIntegrationSyncResult(integration.id, {
|
||||
status: 'error',
|
||||
last_sync_at: Date.now(),
|
||||
last_error: err.message || 'Sync failed'
|
||||
});
|
||||
throw err;
|
||||
} finally {
|
||||
const st = CHANNEL_SYNC_STATE.get(lockKey);
|
||||
if (st) {
|
||||
st.inFlight = false;
|
||||
CHANNEL_SYNC_STATE.set(lockKey, st);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
router.get('/', async (req, res) => {
|
||||
const auth = await authorizeChannelAdmin(req, res, 3);
|
||||
if (!auth) return;
|
||||
|
|
@ -404,50 +455,27 @@ router.post('/:provider/sync-now', async (req, res) => {
|
|||
return res.status(404).json({ error: 'Connected integration not found' });
|
||||
}
|
||||
|
||||
const lockKey = `${auth.channelRow.id}:${provider}`;
|
||||
const now = Date.now();
|
||||
const current = CHANNEL_SYNC_STATE.get(lockKey) || { inFlight: false, lastRunAt: 0 };
|
||||
if (current.inFlight) {
|
||||
return res.status(409).json({ error: 'Sync already in progress for this channel' });
|
||||
}
|
||||
if (now - current.lastRunAt < CHANNEL_SYNC_COOLDOWN_MS) {
|
||||
const retryAfterMs = CHANNEL_SYNC_COOLDOWN_MS - (now - current.lastRunAt);
|
||||
return res.status(429).json({
|
||||
error: 'Sync cooldown active for this channel',
|
||||
retry_after_ms: retryAfterMs
|
||||
});
|
||||
}
|
||||
|
||||
CHANNEL_SYNC_STATE.set(lockKey, { inFlight: true, lastRunAt: now });
|
||||
|
||||
try {
|
||||
let result;
|
||||
if (provider === 'google') {
|
||||
result = await runInGoogleQueue(() =>
|
||||
syncGoogleIntegration({ integration, channelRow: auth.channelRow })
|
||||
);
|
||||
}
|
||||
const patch = Object.assign({
|
||||
status: 'connected',
|
||||
last_sync_at: Date.now(),
|
||||
last_error: null
|
||||
}, result && result.tokenPatch ? result.tokenPatch : {});
|
||||
await calendarDB.updateIntegrationSyncResult(integration.id, patch);
|
||||
res.json({ success: true, synced: result ? result.synced : 0 });
|
||||
} catch (err) {
|
||||
await calendarDB.updateIntegrationSyncResult(integration.id, {
|
||||
status: 'error',
|
||||
last_sync_at: Date.now(),
|
||||
last_error: err.message || 'Sync failed'
|
||||
const result = await syncIntegrationNow({
|
||||
provider,
|
||||
integration,
|
||||
channelRow: auth.channelRow,
|
||||
enforceCooldown: true
|
||||
});
|
||||
res.status(400).json({ error: err.message || 'Sync failed' });
|
||||
} finally {
|
||||
const st = CHANNEL_SYNC_STATE.get(lockKey);
|
||||
if (st) {
|
||||
st.inFlight = false;
|
||||
CHANNEL_SYNC_STATE.set(lockKey, st);
|
||||
res.json(result);
|
||||
} catch (err) {
|
||||
if (err.statusCode === 409) {
|
||||
return res.status(409).json({ error: err.message });
|
||||
}
|
||||
if (err.statusCode === 429) {
|
||||
return res.status(429).json({
|
||||
error: err.message,
|
||||
retry_after_ms: err.retryAfterMs || CHANNEL_SYNC_COOLDOWN_MS
|
||||
});
|
||||
}
|
||||
res.status(400).json({ error: err.message || 'Sync failed' });
|
||||
}
|
||||
});
|
||||
|
||||
router.syncIntegrationNow = syncIntegrationNow;
|
||||
module.exports = router;
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue