2015-09-26 14:21:42 -07:00
|
|
|
import Promise from 'bluebird';
|
|
|
|
|
import { ChannelStateSizeError,
|
2015-09-26 15:33:13 -07:00
|
|
|
ChannelNotFoundError } from '../errors';
|
2015-09-26 14:21:42 -07:00
|
|
|
import db from '../database';
|
|
|
|
|
import Logger from '../logger';
|
|
|
|
|
|
|
|
|
|
// Largest combined length of JSON-serialized values that save() accepts
// before it raises ChannelStateSizeError (1024 * 1024 === 1048576).
const SIZE_LIMIT = 1024 * 1024;

// Finds the id of the channel row whose name matches the single placeholder.
const QUERY_CHANNEL_ID_FOR_NAME = 'SELECT id FROM channels WHERE name = ?';

// Fetches every key/value row stored for one channel id.
const QUERY_CHANNEL_DATA = 'SELECT `key`, `value` FROM channel_data WHERE channel_id = ?';

// Single-row upsert taking four placeholders: three inserted values plus the
// value applied on a duplicate key.
// NOTE(review): nothing in the visible part of this file references this
// constant; the multi-row statement from buildUpdateQuery() is used instead.
const QUERY_UPDATE_CHANNEL_DATA =
    'INSERT INTO channel_data VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE `value` = ?';
|
|
|
|
|
|
|
|
|
|
/**
 * Promise adapter for the callback-based `db.query`.
 *
 * @param {string} query - SQL text, passed to `db.query` unchanged
 * @param {Array} substitutions - placeholder values, passed to `db.query` unchanged
 * @returns {Promise} resolves with the second callback argument; rejects with
 *     the first callback argument when it is truthy, wrapping it in an `Error`
 *     if it is not one already
 */
function queryAsync(query, substitutions) {
    return new Promise((resolve, reject) => {
        const onResult = (err, res) => {
            if (!err) {
                resolve(res);
                return;
            }

            // Callers always get a real Error instance, even if the
            // database layer reported the failure as some other value.
            reject(err instanceof Error ? err : new Error(err));
        };

        db.query(query, substitutions, onResult);
    });
}
|
|
|
|
|
|
2015-09-26 15:33:13 -07:00
|
|
|
/**
 * Builds a multi-row upsert statement for `channel_data`.
 *
 * @param {number} numEntries - how many `(?, ?, ?)` tuples to emit; the
 *     statement therefore expects three substitutions per entry
 * @returns {string} the INSERT ... ON DUPLICATE KEY UPDATE statement
 */
function buildUpdateQuery(numEntries) {
    const tuples = [];
    let index = 0;
    while (index < numEntries) {
        tuples[index] = '(?, ?, ?)';
        index += 1;
    }

    const valueList = tuples.join(', ');

    // VALUES(`value`) makes a conflicting row take the value that this same
    // statement tried to insert for it.
    return `INSERT INTO channel_data VALUES ${valueList} ` +
            'ON DUPLICATE KEY UPDATE `value` = VALUES(`value`)';
}
|
|
|
|
|
|
2015-09-26 14:21:42 -07:00
|
|
|
/**
 * Channel state store backed by the `channels` and `channel_data` tables.
 *
 * Both methods return the promise chain started by `queryAsync`, so callers
 * receive whatever promise type that helper produces.
 */
export class DatabaseStore {
    /**
     * Loads every stored key/value pair for a channel.
     *
     * @param {string} channelName - name matched against `channels.name`
     * @returns {Promise<Object>} resolves with an object mapping each stored
     *     key to its parsed JSON value; a row whose value is not valid JSON
     *     is logged through `Logger.errlog` and left out of the result.
     *     Rejects with `ChannelNotFoundError` when no channel row matches.
     */
    load(channelName) {
        return queryAsync(QUERY_CHANNEL_ID_FOR_NAME, [channelName]).then((channels) => {
            if (channels.length === 0) {
                throw new ChannelNotFoundError(`Channel does not exist: "${channelName}"`);
            }

            return queryAsync(QUERY_CHANNEL_DATA, [channels[0].id]);
        }).then((rows) => {
            const data = {};

            for (const row of rows) {
                try {
                    data[row.key] = JSON.parse(row.value);
                } catch (e) {
                    // One corrupt row must not prevent the rest of the
                    // channel state from loading.
                    Logger.errlog.log(`Channel data for channel "${channelName}", ` +
                            `key "${row.key}" is invalid: ${e}`);
                }
            }

            return data;
        });
    }

    /**
     * Upserts every own enumerable key of `data` for a channel in a single
     * statement.
     *
     * Keys whose value has no JSON representation (`JSON.stringify` returns
     * `undefined`, e.g. for `undefined` or a function) are skipped. When
     * nothing remains to write, no statement is issued and the promise
     * resolves with `undefined`.
     *
     * @param {string} channelName - name matched against `channels.name`
     * @param {Object} data - key/value pairs; each value is stored as JSON
     * @returns {Promise} resolves with the result of the upsert query, or
     *     `undefined` when there was nothing to write. Rejects with
     *     `ChannelNotFoundError` when no channel row matches, or with
     *     `ChannelStateSizeError` when the combined length of the serialized
     *     values exceeds `SIZE_LIMIT`.
     */
    save(channelName, data) {
        return queryAsync(QUERY_CHANNEL_ID_FOR_NAME, [channelName]).then((channels) => {
            if (channels.length === 0) {
                throw new ChannelNotFoundError(`Channel does not exist: "${channelName}"`);
            }

            const id = channels[0].id;
            const substitutions = [];
            let totalSize = 0;
            let rowCount = 0;

            for (const key of Object.keys(data)) {
                const value = JSON.stringify(data[key]);
                if (value === undefined) {
                    // Nothing to store for this key; reading `.length` here
                    // would throw and fail the whole save.
                    continue;
                }

                rowCount++;
                // String length (UTF-16 code units), not encoded byte size.
                totalSize += value.length;
                substitutions.push(id, key, value);
            }

            if (totalSize > SIZE_LIMIT) {
                throw new ChannelStateSizeError('Channel state size is too large', {
                    limit: SIZE_LIMIT,
                    actual: totalSize
                });
            }

            if (rowCount === 0) {
                // An INSERT with an empty VALUES list is a SQL syntax error,
                // so there is nothing to send.
                return undefined;
            }

            return queryAsync(buildUpdateQuery(rowCount), substitutions);
        });
    }
}
|