ExportEtherpad: Parallelize record reads

Author: Richard Hansen, 2022-04-16 22:28:59 -04:00
parent 88c0ab8255
commit 6a183db850
2 changed files with 34 additions and 21 deletions

View File

@ -37,6 +37,8 @@
of database records for managing browser sessions.
* When copying a pad, the pad's records are copied in batches to avoid database
timeouts with large pads.
* Exporting a large pad to `.etherpad` format should be faster thanks to bulk
database record fetches.
#### For plugin authors

View File

@ -15,34 +15,45 @@
* limitations under the License.
*/
const Stream = require('./Stream');
const assert = require('assert').strict;
const authorManager = require('../db/AuthorManager');
const hooks = require('../../static/js/pluginfw/hooks');
const padManager = require('../db/PadManager');
// NOTE(review): this span is a unified-diff hunk with the +/- markers stripped by
// the page rendering, so the OLD (pre-commit) and NEW (post-commit) bodies of
// getPadRaw are interleaved below. It is NOT runnable JavaScript as-is. The
// comments mark which version each run of lines appears to belong to — the exact
// removed/added boundary is inferred from the duplicated declarations (`data`,
// `dstPfx`) and should be confirmed against the original patch.
exports.getPadRaw = async (padId, readOnlyId) => {
// --- OLD body (removed): every record fetched with a sequential await ---
// Loads the pad, then its authors, revisions, and chat messages one at a time,
// writing each into `data` keyed by `pad:<readOnlyId || padId>`.
const pad = await padManager.getPad(padId);
const pfx = `pad:${readOnlyId || padId}`;
const data = {[pfx]: pad};
for (const authorId of pad.getAllAuthors()) {
const authorEntry = await authorManager.getAuthor(authorId);
if (!authorEntry) continue;
data[`globalAuthor:${authorId}`] = authorEntry;
if (!authorEntry.padIDs) continue;
// Rewrites the author's pad list so the export references only this pad.
authorEntry.padIDs = readOnlyId || padId;
}
// One awaited DB read per revision and per chat message — the serial hot spot
// this commit removes.
for (let i = 0; i <= pad.head; ++i) data[`${pfx}:revs:${i}`] = await pad.getRevision(i);
for (let i = 0; i <= pad.chatHead; ++i) data[`${pfx}:chat:${i}`] = await pad.getChatMessage(i);
const prefixes = await hooks.aCallAll('exportEtherpadAdditionalContent');
await Promise.all(prefixes.map(async (prefix) => {
const srcPfx = `${prefix}:${padId}`;
const dstPfx = `${prefix}:${readOnlyId || padId}`;
data[dstPfx] = await pad.db.get(srcPfx);
// --- NEW body (added): pad lookup and plugin-prefix hook run concurrently ---
const dstPfx = `pad:${readOnlyId || padId}`;
const [pad, customPrefixes] = await Promise.all([
padManager.getPad(padId),
hooks.aCallAll('exportEtherpadAdditionalContent'),
]);
// For each plugin-registered prefix, list its keys up front and return a
// generator of [destinationKey, promise] pairs (reads are started lazily).
const pluginRecords = await Promise.all(customPrefixes.map(async (customPrefix) => {
const srcPfx = `${customPrefix}:${padId}`;
const dstPfx = `${customPrefix}:${readOnlyId || padId}`;
// Guard: a '*' in the prefix would corrupt the findKeys glob below.
// (This assert line appears to be shared context between both versions.)
assert(!srcPfx.includes('*'));
// OLD inner loop (removed): awaited each pad.db.get inside the iteration.
for (const k of await pad.db.findKeys(`${srcPfx}:*`, null)) {
assert(k.startsWith(`${srcPfx}:`));
data[`${dstPfx}:${k.slice(srcPfx.length + 1)}`] = await pad.db.get(k);
}
// NEW replacement: yield un-awaited promises so Stream can pipeline them.
const srcKeys = await pad.db.findKeys(`${srcPfx}:*`, null);
return (function* () {
yield [dstPfx, pad.db.get(srcPfx)];
for (const k of srcKeys) {
assert(k.startsWith(`${srcPfx}:`));
yield [`${dstPfx}${k.slice(srcPfx.length)}`, pad.db.get(k)];
}
})();
}));
// NEW: one lazy stream of [key, promise] pairs covering authors, revisions,
// chat messages, and plugin records.
const records = (function* () {
for (const authorId of pad.getAllAuthors()) {
yield [`globalAuthor:${authorId}`, (async () => {
const authorEntry = await authorManager.getAuthor(authorId);
if (!authorEntry) return undefined; // Becomes unset when converted to JSON.
if (authorEntry.padIDs) authorEntry.padIDs = readOnlyId || padId;
return authorEntry;
})()];
}
// Promises are yielded un-awaited; Stream controls how many are in flight.
for (let i = 0; i <= pad.head; ++i) yield [`${dstPfx}:revs:${i}`, pad.getRevision(i)];
for (let i = 0; i <= pad.chatHead; ++i) yield [`${dstPfx}:chat:${i}`, pad.getChatMessage(i)];
for (const gen of pluginRecords) yield* gen;
})();
const data = {[dstPfx]: pad};
// Bulk fetch: batch(100)/buffer(99) keeps up to ~100 reads in flight while
// results are awaited in order — the parallelism this commit introduces.
// (Semantics of batch/buffer come from the project's Stream helper; confirm there.)
for (const [dstKey, p] of new Stream(records).batch(100).buffer(99)) data[dstKey] = await p;
return data;
};