diff --git a/src/bgtask.js b/src/bgtask.js index 75564715..1b37c48a 100644 --- a/src/bgtask.js +++ b/src/bgtask.js @@ -8,6 +8,7 @@ var Config = require("./config"); var db = require("./database"); var Promise = require("bluebird"); +const shows = require('./shows'); const LOGGER = require('@calzoneman/jsli')('bgtask'); @@ -92,6 +93,26 @@ function initAccountCleanup() { }, 3600 * 1000); } +function initShowScheduler() { + var SCHEDULE_INTERVAL = 15 * 1000; + var running = false; + + setInterval(async () => { + if (running) { + return; + } + + running = true; + try { + await shows.pollAndRunDueShows(); + } catch (error) { + LOGGER.error('Show scheduler failure: %s', error.stack || error); + } finally { + running = false; + } + }, SCHEDULE_INTERVAL); +} + module.exports = function (Server) { if (init === Server) { LOGGER.warn("Attempted to re-init background tasks"); @@ -103,4 +124,5 @@ module.exports = function (Server) { initChannelDumper(Server); initPasswordResetCleanup(); initAccountCleanup(); + initShowScheduler(); }; diff --git a/src/database/shows.js b/src/database/shows.js new file mode 100644 index 00000000..7f0baaed --- /dev/null +++ b/src/database/shows.js @@ -0,0 +1,193 @@ +const db = require('../database'); + +function knex() { + return db.getDB().knex; +} + +function parseShowRow(row) { + if (!row) return null; + + let playlist = []; + let recurrenceMeta = null; + try { + playlist = JSON.parse(row.playlist || '[]'); + } catch (_err) { + playlist = []; + } + + try { + recurrenceMeta = row.recurrence_meta ? JSON.parse(row.recurrence_meta) : null; + } catch (_err) { + recurrenceMeta = null; + } + + return { + id: row.id, + channel_name: row.channel_name, + channel_id: row.channel_id, + name: row.name, + playlist, + timezone: row.timezone, + scheduled_for: row.scheduled_for, + next_run_at: row.next_run_at, + status: row.status, + recurrence: row.recurrence, + recurrence_meta: recurrenceMeta, + fill_mode: row.fill_mode, + conflict_mode: row.conflict_mode, + start_playback: !!row.start_playback, + run_count: row.run_count, + last_run_at: row.last_run_at, + created_at: row.created_at, + updated_at: row.updated_at, + created_by: row.created_by, + updated_by: row.updated_by, + last_error: row.last_error + }; +} + +function serializeShowInput(input) { + return { + name: input.name, + playlist: JSON.stringify(input.playlist || []), + timezone: input.timezone, + scheduled_for: input.scheduled_for, + next_run_at: input.next_run_at, + status: input.status, + recurrence: input.recurrence, + recurrence_meta: input.recurrence_meta ? JSON.stringify(input.recurrence_meta) : null, + fill_mode: input.fill_mode, + conflict_mode: input.conflict_mode, + start_playback: !!input.start_playback, + last_error: input.last_error || null, + updated_by: input.updated_by, + updated_at: Date.now() + }; +} + +async function listShows(channelId) { + const rows = await knex()('channel_shows') + .where({ channel_id: channelId }) + .orderBy('next_run_at', 'asc') + .orderBy('created_at', 'desc') + .select(); + return rows.map(parseShowRow); +} + +async function getShowById(id, channelId) { + const rows = await knex()('channel_shows') + .where({ id, channel_id: channelId }) + .select(); + return parseShowRow(rows[0]); +} + +async function createShow({ channelId, createdBy, input }) { + const now = Date.now(); + const row = serializeShowInput({ + ...input, + updated_by: createdBy, + last_error: null + }); + + const [id] = await knex()('channel_shows').insert({ + channel_id: channelId, + created_by: createdBy, + created_at: now, + run_count: 0, + ...row + }); + + return id; +} + +async function updateShow({ id, channelId, input }) { + const row = serializeShowInput(input); + await knex()('channel_shows') + .where({ id, channel_id: channelId }) + .update(row); +} + +async function deleteShow(id, channelId) { + await knex()('channel_shows') + .where({ id, channel_id: channelId }) + .delete(); +} + +async function updateShowStatus({ id, channelId, status, updatedBy, lastError = null }) { + await knex()('channel_shows') + .where({ id, channel_id: channelId }) + .update({ + status, + last_error: lastError, + updated_by: updatedBy, + updated_at: Date.now() + }); +} + +async function claimDueShows(limit = 20) { + const now = Date.now(); + const rows = await knex()('channel_shows') + .join('channels', 'channel_shows.channel_id', 'channels.id') + .where({ status: 'scheduled' }) + .andWhere('next_run_at', '<=', now) + .orderBy('channel_shows.next_run_at', 'asc') + .limit(limit) + .select('channel_shows.*', 'channels.name as channel_name'); + + const claimed = []; + for (const row of rows) { + const updated = await knex()('channel_shows') + .where({ id: row.id, status: 'scheduled' }) + .andWhere('next_run_at', '<=', now) + .update({ + status: 'running', + updated_at: Date.now(), + last_error: null + }); + + if (updated > 0) { + claimed.push(parseShowRow({ ...row, status: 'running' })); + } + } + + return claimed; +} + +async function completeRun({ id, recurrence, nextRunAt, updatedBy }) { + const patch = { + status: recurrence === 'none' ? 'completed' : 'scheduled', + next_run_at: recurrence === 'none' ? nextRunAt : nextRunAt, + run_count: knex().raw('run_count + 1'), + last_run_at: Date.now(), + updated_by: updatedBy, + updated_at: Date.now(), + last_error: null + }; + + await knex()('channel_shows') + .where({ id }) + .update(patch); +} + +async function failRun({ id, updatedBy, error }) { + await knex()('channel_shows') + .where({ id }) + .update({ + status: 'failed', + last_error: error, + updated_by: updatedBy, + updated_at: Date.now() + }); +} + +module.exports = { + listShows, + getShowById, + createShow, + updateShow, + deleteShow, + updateShowStatus, + claimDueShows, + completeRun, + failRun +}; diff --git a/src/database/tables.js b/src/database/tables.js index 54762251..f8515be3 100644 --- a/src/database/tables.js +++ b/src/database/tables.js @@ -175,6 +175,36 @@ export async function initTables() { t.index('channel_id'); }); + await ensureTable('channel_shows', t => { + t.charset('utf8'); + t.increments('id').notNullable().primary(); + t.integer('channel_id') + .unsigned() + .notNullable() + .references('id').inTable('channels') + .onDelete('cascade'); + t.string('name', 100).notNullable(); + t.specificType('playlist', 'mediumtext character set utf8mb4 not null'); + t.string('timezone', 64).notNullable().defaultTo('UTC'); + t.bigInteger('scheduled_for').notNullable(); + t.bigInteger('next_run_at').notNullable(); + t.string('status', 20).notNullable().defaultTo('draft'); + t.string('recurrence', 20).notNullable().defaultTo('none'); + t.specificType('recurrence_meta', 'text character set utf8mb4'); + t.string('fill_mode', 20).notNullable().defaultTo('append'); + t.string('conflict_mode', 20).notNullable().defaultTo('force'); + t.boolean('start_playback').notNullable().defaultTo(false); + t.integer('run_count').notNullable().defaultTo(0); + t.bigInteger('last_run_at').nullable(); + t.bigInteger('created_at').notNullable(); + t.bigInteger('updated_at').notNullable(); + t.string('created_by', 20).notNullable(); + t.string('updated_by', 20).notNullable(); + t.specificType('last_error', 'text character set utf8mb4'); + t.index(['channel_id', 'status', 'next_run_at'], 'channel_shows_due_idx'); + t.index(['channel_id', 'created_at'], 'channel_shows_channel_created_idx'); + }); + await ensureTable('banned_channels', t => { t.charset('utf8mb4'); t.string('channel_name', 30) diff --git a/src/shows.js b/src/shows.js new file mode 100644 index 00000000..03914b57 --- /dev/null +++ b/src/shows.js @@ -0,0 +1,119 @@ +const LOGGER = require('@calzoneman/jsli')('shows'); +const util = require('./utilities'); +const showDB = require('./database/shows'); +const Server = require('./server'); + +function makeSystemProxy(name) { + const rank = 5; + return { + effectiveRank: rank, + account: { effectiveRank: rank }, + getName: () => name, + getLowerName: () => name.toLowerCase(), + is: () => true, + isAnonymous: () => false, + queueLimiter: util.newRateLimiter(), + socket: { + emit: () => {} + } + }; +} + +function computeNextRunAt(show) { + const base = Number(show.next_run_at || show.scheduled_for || Date.now()); + if (show.recurrence === 'daily') { + return base + 24 * 60 * 60 * 1000; + } + + if (show.recurrence === 'weekly') { + return base + 7 * 24 * 60 * 60 * 1000; + } + + return base; +} + +function normalizePlaylist(rawPlaylist) { + if (!Array.isArray(rawPlaylist)) return []; + return rawPlaylist + .map(item => ({ + id: item && item.id ? String(item.id).trim() : '', + type: item && item.type ? String(item.type).trim() : '', + pos: item && item.pos === 'next' ? 'next' : 'end' + })) + .filter(item => item.id && item.type); +} + +function applyShowToChannel(chan, show) { + const playlist = normalizePlaylist(show.playlist); + if (playlist.length === 0) { + throw new Error('Show playlist is empty'); + } + + const plmod = chan.modules.playlist; + + if (show.conflict_mode === 'skip' && plmod.items.length > 0) { + throw new Error('Conflict mode is skip and playlist is not empty'); + } + + const actorName = '[show:' + show.id + ']'; + const proxy = makeSystemProxy(actorName); + + if (show.fill_mode === 'replace') { + plmod.handleClear(proxy); + } + + playlist.forEach((entry, idx) => { + plmod.handleQueue(proxy, { + id: entry.id, + type: entry.type, + pos: idx === 0 && entry.pos === 'next' ? 'next' : 'end' + }); + }); + + if (show.start_playback) { + const first = plmod.items.first; + if (first) { + plmod.handleJumpTo(proxy, first.uid); + } + } +} + +async function runShow(show) { + const server = Server.getServer(); + if (!server || !server.isChannelLoaded(show.channel_name)) { + throw new Error('Channel is not currently active'); + } + + const chan = server.getChannel(show.channel_name); + applyShowToChannel(chan, show); +} + +async function pollAndRunDueShows() { + const due = await showDB.claimDueShows(20); + for (const show of due) { + try { + await runShow(show); + const nextRun = computeNextRunAt(show); + await showDB.completeRun({ + id: show.id, + recurrence: show.recurrence, + nextRunAt: nextRun, + updatedBy: '[scheduler]' + }); + LOGGER.info('Executed show %s on channel %s', show.id, show.channel_name); + } catch (error) { + await showDB.failRun({ + id: show.id, + updatedBy: '[scheduler]', + error: error.message || 'Unknown execution error' + }); + LOGGER.error('Failed to execute show %s: %s', show.id, error.stack || error.message || error); + } + } +} + +module.exports = { + pollAndRunDueShows, + runShow, + computeNextRunAt +}; diff --git a/src/web/routes/api/index.js b/src/web/routes/api/index.js index f6c57d11..79d8aee4 100644 --- a/src/web/routes/api/index.js +++ b/src/web/routes/api/index.js @@ -6,6 +6,7 @@ router.use('/channels/:channel/bots', require('./bots')); router.use('/channels/:channel/emotes', require('./emotes')); router.use('/channels/:channel/playlist', require('./playlist')); router.use('/channels/:channel/settings', require('./settings')); +router.use('/channels/:channel/shows', require('./shows')); router.use('/channels/:channel', require('./moderation')); module.exports = router; diff --git a/src/web/routes/api/shows.js b/src/web/routes/api/shows.js new file mode 100644 index 00000000..55334acd --- /dev/null +++ b/src/web/routes/api/shows.js @@ -0,0 +1,262 @@ +const express = require('express'); +const webserver = require('../../webserver'); +const showDB = require('../../../database/shows'); +const shows = require('../../../shows'); +const { getChannelRow, getUserEffectiveRank } = require('./middleware'); + +const router = express.Router({ mergeParams: true }); + +const SHOW_STATUSES = new Set(['draft', 'scheduled', 'paused', 'running', 'completed', 'failed', 'canceled']); +const RECURRENCES = new Set(['none', 'daily', 'weekly']); +const FILL_MODES = new Set(['append', 'replace']); +const CONFLICT_MODES = new Set(['force', 'skip']); + +function sanitizePlaylist(list) { + if (!Array.isArray(list)) return []; + return list + .map(item => ({ + id: item && item.id ? String(item.id).trim() : '', + type: item && item.type ? String(item.type).trim() : '', + pos: item && item.pos === 'next' ? 'next' : 'end' + })) + .filter(item => item.id && item.type); +} + +function parseSchedule(input) { + const ms = Date.parse(input); + if (isNaN(ms)) return null; + return ms; +} + +function validateShowPayload(body, old = null) { + const name = (body.name || (old && old.name) || '').trim(); + if (!name || name.length > 100) { + return { error: 'Show name must be 1-100 characters' }; + } + + const playlist = body.playlist !== undefined ? sanitizePlaylist(body.playlist) : (old ? old.playlist : []); + if (!Array.isArray(playlist) || playlist.length === 0) { + return { error: 'Show playlist must contain at least one item' }; + } + + const timezone = String(body.timezone || (old && old.timezone) || 'UTC').trim(); + const scheduledInput = body.scheduled_for !== undefined ? body.scheduled_for : (old ? old.scheduled_for : null); + const scheduledFor = typeof scheduledInput === 'number' ? scheduledInput : parseSchedule(scheduledInput); + if (!scheduledFor) { + return { error: 'scheduled_for must be a valid date or timestamp' }; + } + + const recurrence = String(body.recurrence || (old && old.recurrence) || 'none'); + if (!RECURRENCES.has(recurrence)) { + return { error: 'Invalid recurrence' }; + } + + const fillMode = String(body.fill_mode || (old && old.fill_mode) || 'append'); + if (!FILL_MODES.has(fillMode)) { + return { error: 'Invalid fill_mode' }; + } + + const conflictMode = String(body.conflict_mode || (old && old.conflict_mode) || 'force'); + if (!CONFLICT_MODES.has(conflictMode)) { + return { error: 'Invalid conflict_mode' }; + } + + const startPlayback = body.start_playback !== undefined + ? !!body.start_playback + : old + ? !!old.start_playback + : false; + + let status = String(body.status || (old && old.status) || 'scheduled'); + if (!SHOW_STATUSES.has(status)) { + return { error: 'Invalid status' }; + } + + if (status === 'running') { + status = 'scheduled'; + } + + const nextRunAt = status === 'scheduled' ? scheduledFor : (old ? old.next_run_at : scheduledFor); + + return { + value: { + name, + playlist, + timezone, + scheduled_for: scheduledFor, + next_run_at: nextRunAt, + status, + recurrence, + recurrence_meta: null, + fill_mode: fillMode, + conflict_mode: conflictMode, + start_playback: startPlayback + } + }; +} + +async function authorizeChannel(req, res) { + const user = await webserver.authorize(req); + if (!user) { + res.status(401).json({ error: 'Unauthorized' }); + return null; + } + + let channelRow; + try { + channelRow = await getChannelRow(req.params.channel); + } catch (_err) { + res.status(404).json({ error: 'Channel not found' }); + return null; + } + + const rank = await getUserEffectiveRank(user, channelRow); + if (rank < 2) { + res.status(403).json({ error: 'Insufficient rank' }); + return null; + } + + return { user, channelRow, rank }; +} + +router.get('/', async (req, res) => { + const auth = await authorizeChannel(req, res); + if (!auth) return; + + const showsList = await showDB.listShows(auth.channelRow.id); + res.json(showsList); +}); + +router.post('/', async (req, res) => { + const auth = await authorizeChannel(req, res); + if (!auth) return; + + const validated = validateShowPayload(req.body || null); + if (validated.error) return res.status(400).json({ error: validated.error }); + + const id = await showDB.createShow({ + channelId: auth.channelRow.id, + createdBy: auth.user.name, + input: validated.value + }); + + const row = await showDB.getShowById(id, auth.channelRow.id); + res.status(201).json(row); +}); + +router.put('/:id', async (req, res) => { + const auth = await authorizeChannel(req, res); + if (!auth) return; + + const id = parseInt(req.params.id, 10); + if (isNaN(id)) return res.status(400).json({ error: 'Invalid show id' }); + + const current = await showDB.getShowById(id, auth.channelRow.id); + if (!current) return res.status(404).json({ error: 'Show not found' }); + + const validated = validateShowPayload(req.body || {}, current); + if (validated.error) return res.status(400).json({ error: validated.error }); + + await showDB.updateShow({ + id, + channelId: auth.channelRow.id, + input: { + ...validated.value, + updated_by: auth.user.name + } + }); + + const row = await showDB.getShowById(id, auth.channelRow.id); + res.json(row); +}); + +router.delete('/:id', async (req, res) => { + const auth = await authorizeChannel(req, res); + if (!auth) return; + + const id = parseInt(req.params.id, 10); + if (isNaN(id)) return res.status(400).json({ error: 'Invalid show id' }); + + const current = await showDB.getShowById(id, auth.channelRow.id); + if (!current) return res.status(404).json({ error: 'Show not found' }); + + await showDB.deleteShow(id, auth.channelRow.id); + res.json({ success: true }); +}); + +router.post('/:id/action', async (req, res) => { + const auth = await authorizeChannel(req, res); + if (!auth) return; + + const id = parseInt(req.params.id, 10); + if (isNaN(id)) return res.status(400).json({ error: 'Invalid show id' }); + + const show = await showDB.getShowById(id, auth.channelRow.id); + if (!show) return res.status(404).json({ error: 'Show not found' }); + + const action = String((req.body && req.body.action) || '').toLowerCase(); + if (!action) return res.status(400).json({ error: 'action is required' }); + + if (action === 'pause') { + await showDB.updateShowStatus({ + id, + channelId: auth.channelRow.id, + status: 'paused', + updatedBy: auth.user.name + }); + } else if (action === 'resume') { + await showDB.updateShow({ + id, + channelId: auth.channelRow.id, + input: { + ...show, + status: 'scheduled', + next_run_at: Date.now(), + updated_by: auth.user.name + } + }); + } else if (action === 'cancel') { + await showDB.updateShowStatus({ + id, + channelId: auth.channelRow.id, + status: 'canceled', + updatedBy: auth.user.name + }); + } else if (action === 'run') { + try { + const forced = { + ...show, + channel_name: auth.channelRow.name, + id + }; + await shows.runShow(forced); + const nextRun = shows.computeNextRunAt(forced); + await showDB.completeRun({ + id, + recurrence: show.recurrence, + nextRunAt: nextRun, + updatedBy: auth.user.name + }); + } catch (error) { + return res.status(400).json({ error: error.message || 'Failed to execute show' }); + } + } else if (action === 'schedule') { + await showDB.updateShow({ + id, + channelId: auth.channelRow.id, + input: { + ...show, + status: 'scheduled', + next_run_at: show.scheduled_for, + updated_by: auth.user.name + } + }); + } else { + return res.status(400).json({ error: 'Unknown action' }); + } + + const row = await showDB.getShowById(id, auth.channelRow.id); + res.json(row); +}); + +module.exports = router; diff --git a/templates/channel.pug b/templates/channel.pug index 326e323b..9e9b2817 100644 --- a/templates/channel.pug +++ b/templates/channel.pug @@ -222,7 +222,8 @@ html(lang="en") li: a(href="#cs-permedit", data-toggle="tab", tabindex="-1") Permissions li: a(href="#cs-chanranks", data-toggle="tab", tabindex="-1", onclick="javascript:socket.emit('requestChannelRanks')") Moderators li: a(href="#cs-banlist", data-toggle="tab", tabindex="-1", onclick="javascript:socket.emit('requestBanlist')") Ban list - li: a(href="#cs-bots", data-toggle="tab", tabindex="-1", onclick="javascript:CSTBots.load()") Bots + li: a(href="#cs-bots", data-toggle="tab", tabindex="-1", onclick="javascript:if(window.CSTBots&&CSTBots.load){CSTBots.load()}") Bots + li: a(href="#cs-shows", data-toggle="tab", tabindex="-1", onclick="javascript:if(window.CSTShows&&CSTShows.load){CSTShows.load()}") Shows li: a(href="#cs-chanlog", data-toggle="tab", onclick="javascript:socket.emit('readChanLog')") Log .modal-body .tab-content @@ -238,6 +239,7 @@ html(lang="en") +chatfilters() +emotes() +bots() + +shows() +chanlog() +permeditor() .modal-footer diff --git a/templates/channeloptions.pug b/templates/channeloptions.pug index 9919616c..61274272 100644 --- a/templates/channeloptions.pug +++ b/templates/channeloptions.pug @@ -253,5 +253,74 @@ mixin bots th Last connected tbody#cs-bots-list +mixin shows + #cs-shows.tab-pane + h4 Shows + p Create scheduled playlist runs. Each show stores playlist items and executes at the scheduled time. + form.form-horizontal(action="javascript:void(0)", role="form") + .form-group + label.control-label.col-sm-3(for="cs-shows-name") Name + .col-sm-9 + input#cs-shows-name.form-control(type="text", placeholder="Show name", maxlength="100") + .form-group + label.control-label.col-sm-3(for="cs-shows-scheduled-for") Scheduled For + .col-sm-9 + input#cs-shows-scheduled-for.form-control(type="datetime-local") + .form-group + label.control-label.col-sm-3(for="cs-shows-timezone") Timezone + .col-sm-9 + input#cs-shows-timezone.form-control(type="text", placeholder="e.g. Europe/Berlin or America/New_York") + .form-group + label.control-label.col-sm-3(for="cs-shows-recurrence") Recurrence + .col-sm-9 + select#cs-shows-recurrence.form-control + option(value="none") One-time + option(value="daily") Daily + option(value="weekly") Weekly + .form-group + label.control-label.col-sm-3(for="cs-shows-fill-mode") Fill Mode + .col-sm-9 + select#cs-shows-fill-mode.form-control + option(value="append") Append to queue + option(value="replace") Replace queue + .form-group + label.control-label.col-sm-3(for="cs-shows-conflict-mode") Conflict Mode + .col-sm-9 + select#cs-shows-conflict-mode.form-control + option(value="force") Force run + option(value="skip") Skip if queue not empty + .form-group + .col-sm-9.col-sm-offset-3 + .checkbox + label(for="cs-shows-start-playback") + input#cs-shows-start-playback(type="checkbox") + | Start playback immediately + .form-group + label.control-label.col-sm-3(for="cs-shows-mediaurl") Show Playlist + .col-sm-9 + .input-group(style="margin-bottom:8px") + input#cs-shows-mediaurl.form-control(type="text", placeholder="Media URL") + span.input-group-btn + button#cs-shows-add-next.btn.btn-default(type="button") Add Next + span.input-group-btn + button#cs-shows-add-end.btn.btn-default(type="button") Add End + ul#cs-shows-playlist-list.videolist(style="max-height:220px; overflow:auto") + p.text-muted.small(style="margin-top:6px") Build the show playlist like the main channel queue. Drag to reorder, use controls to remove/move. + .form-group + .col-sm-9.col-sm-offset-3 + button#cs-shows-create.btn.btn-primary Create Show + button#cs-shows-update.btn.btn-default(type="button") Update Selected + button#cs-shows-clear.btn.btn-default(type="button") Clear Form + table.table.table-striped.table-condensed(style="margin-top:12px") + thead + tr + th Name + th Status + th Next Run + th Timezone + th Recurrence + th Actions + tbody#cs-shows-list + mixin permeditor #cs-permedit.tab-pane diff --git a/www/js/ui.js b/www/js/ui.js index 744d9c17..070fde34 100644 --- a/www/js/ui.js +++ b/www/js/ui.js @@ -1173,3 +1173,308 @@ var CSTBots = (function () { return { load: load }; })(); + +var CSTShows = (function () { + var selectedId = null; + var draftPlaylist = []; + + function apiBase() { + return '/api/v1/channels/' + CHANNEL.name + '/shows'; + } + + function toLocalDateInput(ms) { + if (!ms) return ''; + var d = new Date(ms); + var pad = function (n) { return String(n).padStart(2, '0'); }; + return d.getFullYear() + '-' + + pad(d.getMonth() + 1) + '-' + + pad(d.getDate()) + 'T' + + pad(d.getHours()) + ':' + + pad(d.getMinutes()); + } + + function renderDraftPlaylist() { + var ul = $('#cs-shows-playlist-list').empty(); + if (!draftPlaylist.length) { + ul.append('