Adds retry logic to Upload JSON (#684)

This commit is contained in:
Steve Faulkner 2021-04-16 13:23:03 -05:00 committed by GitHub
parent 649b6a93b4
commit a264ea2275
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 34 additions and 11 deletions

View File

@ -22,9 +22,12 @@ export const bulkCreateDocument = async (
); );
const successCount = response.filter((r) => r.statusCode === 201).length; const successCount = response.filter((r) => r.statusCode === 201).length;
const throttledCount = response.filter((r) => r.statusCode === 429).length;
logConsoleInfo( logConsoleInfo(
`${documents.length} operations completed for container ${collection.id()}. ${successCount} operations succeeded` `${
documents.length
} operations completed for container ${collection.id()}. ${successCount} operations succeeded. ${throttledCount} operations throttled`
); );
return response; return response;
} catch (error) { } catch (error) {

View File

@ -18,6 +18,7 @@ import { UploadDetailsRecord } from "../../Contracts/ViewModels";
import { Action, ActionModifiers } from "../../Shared/Telemetry/TelemetryConstants"; import { Action, ActionModifiers } from "../../Shared/Telemetry/TelemetryConstants";
import * as TelemetryProcessor from "../../Shared/Telemetry/TelemetryProcessor"; import * as TelemetryProcessor from "../../Shared/Telemetry/TelemetryProcessor";
import { userContext } from "../../UserContext"; import { userContext } from "../../UserContext";
import { logConsoleInfo } from "../../Utils/NotificationConsoleUtils";
import Explorer from "../Explorer"; import Explorer from "../Explorer";
import { CassandraAPIDataClient, CassandraTableKey, CassandraTableKeys } from "../Tables/TableDataClient"; import { CassandraAPIDataClient, CassandraTableKey, CassandraTableKeys } from "../Tables/TableDataClient";
import ConflictsTab from "../Tabs/ConflictsTab"; import ConflictsTab from "../Tabs/ConflictsTab";
@ -1031,21 +1032,36 @@ export default class Collection implements ViewModels.Collection {
try { try {
const parsedContent = JSON.parse(documentContent); const parsedContent = JSON.parse(documentContent);
if (Array.isArray(parsedContent)) { if (Array.isArray(parsedContent)) {
const chunkSize = 50; // 100 is the max # of bulk operations the SDK currently accepts but usually results in throttles on 400RU collections const chunkSize = 100; // 100 is the max # of bulk operations the SDK currently accepts
const chunkedContent = Array.from({ length: Math.ceil(parsedContent.length / chunkSize) }, (_, index) => const chunkedContent = Array.from({ length: Math.ceil(parsedContent.length / chunkSize) }, (_, index) =>
parsedContent.slice(index * chunkSize, index * chunkSize + chunkSize) parsedContent.slice(index * chunkSize, index * chunkSize + chunkSize)
); );
for (const chunk of chunkedContent) { for (const chunk of chunkedContent) {
const responses = await bulkCreateDocument(this, chunk); let retryAttempts = 0;
for (const response of responses) { let chunkComplete = false;
let documentsToAttempt = chunk;
while (retryAttempts < 10 && !chunkComplete) {
const responses = await bulkCreateDocument(this, documentsToAttempt);
const attemptedDocuments = [...documentsToAttempt];
documentsToAttempt = [];
responses.forEach((response, index) => {
if (response.statusCode === 201) { if (response.statusCode === 201) {
record.numSucceeded++; record.numSucceeded++;
} else if (response.statusCode === 429) { } else if (response.statusCode === 429) {
record.numThrottled++; documentsToAttempt.push(attemptedDocuments[index]);
} else { } else {
record.numFailed++; record.numFailed++;
record.errors = [...record.errors, `${response.statusCode} ${response.resourceBody}`];
} }
});
if (documentsToAttempt.length === 0) {
chunkComplete = true;
break;
}
logConsoleInfo(
`${documentsToAttempt.length} document creations were throttled. Waiting ${retryAttempts} seconds and retrying throttled documents`
);
retryAttempts++;
await sleep(retryAttempts);
} }
} }
} else { } else {
@ -1145,3 +1161,7 @@ export default class Collection implements ViewModels.Collection {
} }
} }
} }
function sleep(seconds: number) {
return new Promise((resolve) => setTimeout(resolve, seconds * 1000));
}