From a264ea22756de1407c2a87b8b003bceeb2b6293c Mon Sep 17 00:00:00 2001 From: Steve Faulkner Date: Fri, 16 Apr 2021 13:23:03 -0500 Subject: [PATCH] Adds retry logic to Upload JSON (#684) --- src/Common/dataAccess/bulkCreateDocument.ts | 5 ++- src/Explorer/Tree/Collection.ts | 40 +++++++++++++++------ 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/src/Common/dataAccess/bulkCreateDocument.ts b/src/Common/dataAccess/bulkCreateDocument.ts index 0eb0d4475..ec7ae7b8f 100644 --- a/src/Common/dataAccess/bulkCreateDocument.ts +++ b/src/Common/dataAccess/bulkCreateDocument.ts @@ -22,9 +22,12 @@ export const bulkCreateDocument = async ( ); const successCount = response.filter((r) => r.statusCode === 201).length; + const throttledCount = response.filter((r) => r.statusCode === 429).length; logConsoleInfo( - `${documents.length} operations completed for container ${collection.id()}. ${successCount} operations succeeded` + `${ + documents.length + } operations completed for container ${collection.id()}. ${successCount} operations succeeded. ${throttledCount} operations throttled` ); return response; } catch (error) { diff --git a/src/Explorer/Tree/Collection.ts b/src/Explorer/Tree/Collection.ts index 62670cf54..11ae8e3b4 100644 --- a/src/Explorer/Tree/Collection.ts +++ b/src/Explorer/Tree/Collection.ts @@ -18,6 +18,7 @@ import { UploadDetailsRecord } from "../../Contracts/ViewModels"; import { Action, ActionModifiers } from "../../Shared/Telemetry/TelemetryConstants"; import * as TelemetryProcessor from "../../Shared/Telemetry/TelemetryProcessor"; import { userContext } from "../../UserContext"; +import { logConsoleInfo } from "../../Utils/NotificationConsoleUtils"; import Explorer from "../Explorer"; import { CassandraAPIDataClient, CassandraTableKey, CassandraTableKeys } from "../Tables/TableDataClient"; import ConflictsTab from "../Tabs/ConflictsTab"; @@ -1031,21 +1032,36 @@ export default class Collection implements ViewModels.Collection { try { const parsedContent = JSON.parse(documentContent); if (Array.isArray(parsedContent)) { - const chunkSize = 50; // 100 is the max # of bulk operations the SDK currently accepts but usually results in throttles on 400RU collections + const chunkSize = 100; // 100 is the max # of bulk operations the SDK currently accepts const chunkedContent = Array.from({ length: Math.ceil(parsedContent.length / chunkSize) }, (_, index) => parsedContent.slice(index * chunkSize, index * chunkSize + chunkSize) ); for (const chunk of chunkedContent) { - const responses = await bulkCreateDocument(this, chunk); - for (const response of responses) { - if (response.statusCode === 201) { - record.numSucceeded++; - } else if (response.statusCode === 429) { - record.numThrottled++; - } else { - record.numFailed++; - record.errors = [...record.errors, `${response.statusCode} ${response.resourceBody}`]; + let retryAttempts = 0; + let chunkComplete = false; + let documentsToAttempt = chunk; + while (retryAttempts < 10 && !chunkComplete) { + const responses = await bulkCreateDocument(this, documentsToAttempt); + const attemptedDocuments = [...documentsToAttempt]; + documentsToAttempt = []; + responses.forEach((response, index) => { + if (response.statusCode === 201) { + record.numSucceeded++; + } else if (response.statusCode === 429) { + documentsToAttempt.push(attemptedDocuments[index]); + } else { + record.numFailed++; + } + }); + if (documentsToAttempt.length === 0) { + chunkComplete = true; + break; } + logConsoleInfo( + `${documentsToAttempt.length} document creations were throttled. Waiting ${retryAttempts} seconds and retrying throttled documents` + ); + retryAttempts++; + await sleep(retryAttempts); } } } else { @@ -1145,3 +1161,7 @@ export default class Collection implements ViewModels.Collection { } } } + +function sleep(seconds: number) { + return new Promise((resolve) => setTimeout(resolve, seconds * 1000)); +}