Files
cosmos-explorer/src/Common/dataAccess/dataTransfers.ts
T
2026-05-12 15:53:47 -07:00

272 lines
8.8 KiB
TypeScript

import { configContext } from "ConfigContext";
import { Keys, t } from "Localization";
import { ApiType, userContext } from "UserContext";
import * as NotificationConsoleUtils from "Utils/NotificationConsoleUtils";
import {
cancel,
complete,
create,
get,
listByDatabaseAccount,
pause,
resume,
} from "Utils/arm/generatedClients/dataTransferService/dataTransferJobs";
import {
CosmosCassandraDataTransferDataSourceSink,
CosmosMongoDataTransferDataSourceSink,
CosmosSqlDataTransferDataSourceSink,
CreateJobRequest,
DataTransferJobFeedResults,
DataTransferJobGetResults,
} from "Utils/arm/generatedClients/dataTransferService/types";
import { armRequest } from "Utils/arm/request";
import { addToPolling, removeFromPolling, updateDataTransferJob, useDataTransferJobs } from "hooks/useDataTransferJobs";
import promiseRetry, { AbortError, FailedAttemptError } from "p-retry";
/**
 * ARM `api-version` value used by `getDataTransferJobs` when it requests
 * follow-up pages of the data transfer job list through `armRequest`.
 */
export const DATA_TRANSFER_JOB_API_VERSION = "2025-05-01-preview";
/**
 * Input accepted by `initiateDataTransfer` to create a data transfer job.
 */
export interface DataTransferParams {
/** Name under which the job is created. */
jobName: string;
/** API flavour; selects the shape of the source and destination payloads. */
apiType: ApiType;
/** Subscription of the database account the job is created on. */
subscriptionId: string;
/** Resource group of the database account the job is created on. */
resourceGroupName: string;
/** Name of the database account the job is created on. */
accountName: string;
/** Database (keyspace for Cassandra) to copy from. */
sourceDatabaseName: string;
/** Container (collection for Mongo, table for Cassandra) to copy from. */
sourceCollectionName: string;
/** Database (keyspace for Cassandra) to copy into. */
targetDatabaseName: string;
/** Container (collection for Mongo, table for Cassandra) to copy into. */
targetCollectionName: string;
/** Optional transfer mode; only sent in the request body when provided. */
mode?: "Offline" | "Online";
}
/**
 * Fetches every data transfer job of a database account, following the
 * pagination links until the service stops returning a `nextLink`.
 *
 * @param subscriptionId - Subscription that owns the database account.
 * @param resourceGroup - Resource group that owns the database account.
 * @param accountName - Name of the database account.
 * @param signal - Optional abort signal handed to the first list request.
 * @returns All jobs collected across every page, in the order they were returned.
 */
export const getDataTransferJobs = async (
  subscriptionId: string,
  resourceGroup: string,
  accountName: string,
  signal?: AbortSignal,
): Promise<DataTransferJobGetResults[]> => {
  const collectedJobs: DataTransferJobGetResults[] = [];

  let page: DataTransferJobFeedResults = await listByDatabaseAccount(
    subscriptionId,
    resourceGroup,
    accountName,
    signal,
  );
  collectedJobs.push(...(page?.value || []));

  for (let nextLink = page?.nextLink; nextLink; nextLink = page?.nextLink) {
    // The service-provided `nextLink` has been observed to point at a
    // non-ARM host (e.g. https://cdbmgmtprodby.documents.azure.com:450/...),
    // which made follow-up requests time out. Only the path and query string
    // of that link are reused; the request itself is sent to
    // `configContext.ARM_ENDPOINT`, the base URL required for ARM operations.
    //
    // NOTE(review): `signal` is not forwarded to these follow-up requests, so
    // aborting only affects the first page — confirm whether that is intended.
    const { pathname, search } = new URL(nextLink);
    page = await armRequest({
      host: configContext.ARM_ENDPOINT,
      path: pathname + search,
      method: "GET",
      apiVersion: DATA_TRANSFER_JOB_API_VERSION,
    });
    collectedJobs.push(...(page?.value || []));
  }

  return collectedJobs;
};
/**
 * Creates a data transfer job that copies one container into another.
 *
 * The source and destination descriptions are built with `createPayload`
 * for the requested API type; `mode` is only included when it is set.
 *
 * @param params - Job name, account coordinates, API type, and the
 *   source/target database and container names.
 * @returns The job as returned by the create call.
 * @throws Error when `params.apiType` is not supported by `createPayload`.
 */
export const initiateDataTransfer = async (params: DataTransferParams): Promise<DataTransferJobGetResults> => {
  const { apiType, mode } = params;

  const requestBody: CreateJobRequest = {
    properties: {
      source: createPayload(apiType, params.sourceDatabaseName, params.sourceCollectionName),
      destination: createPayload(apiType, params.targetDatabaseName, params.targetCollectionName),
      ...(mode ? { mode } : {}),
    },
  };

  return create(params.subscriptionId, params.resourceGroupName, params.accountName, params.jobName, requestBody);
};
/**
 * Polls a data transfer job until it reaches a terminal or paused state.
 *
 * Does nothing (resolves with `undefined`) when the job is already present
 * in the polling set. Otherwise a progress message is shown in the
 * notification console and `pollDataTransferJobOperation` is retried via
 * `p-retry`; each failed attempt replaces the progress message with the
 * attempt's error message.
 *
 * @param jobName - Name of the job to poll.
 * @param subscriptionId - Subscription that owns the database account.
 * @param resourceGroupName - Resource group that owns the database account.
 * @param accountName - Name of the database account.
 * @returns The final job payload, or `undefined` when polling was skipped.
 * @throws Rethrows whatever `p-retry` rejects with (abort, exhausted retries,
 *   or a non-retryable error).
 */
export const pollDataTransferJob = async (
  jobName: string,
  subscriptionId: string,
  resourceGroupName: string,
  accountName: string,
): Promise<unknown> => {
  const currentPollingJobs = useDataTransferJobs.getState().pollingDataTransferJobs;
  if (currentPollingJobs.has(jobName)) {
    return;
  }

  let clearMessage = NotificationConsoleUtils.logConsoleProgress(`Data transfer job ${jobName} in progress`);
  let messageCleared = false;
  // Clears the currently displayed progress message at most once, so the
  // polling operation and the cleanup below never clear the same message twice.
  const clearCurrentMessage = (): void => {
    if (!messageCleared) {
      messageCleared = true;
      clearMessage();
    }
  };

  try {
    return await promiseRetry(
      () => pollDataTransferJobOperation(jobName, subscriptionId, resourceGroupName, accountName, clearCurrentMessage),
      {
        retries: 500,
        maxTimeout: 5000,
        onFailedAttempt: (error: FailedAttemptError) => {
          clearCurrentMessage();
          clearMessage = NotificationConsoleUtils.logConsoleProgress(error.message);
          messageCleared = false;
        },
      },
    );
  } catch (error) {
    // Polling has stopped for good. Without this cleanup, an exhausted retry
    // budget or a non-retryable error would leave the last progress message
    // on screen and keep the job in the polling set, which makes every later
    // call for this job return early without polling.
    clearCurrentMessage();
    removeFromPolling(jobName);
    throw error;
  }
};
/**
 * Runs one polling attempt for a data transfer job.
 *
 * Fetches the job, pushes it into the job store, and then:
 * - `Cancelled`: stops polling, logs an error, and throws an `AbortError`.
 * - `Paused` / `Completed`: stops polling, logs an info message, and returns the job.
 * - `Failed` / `Faulted`: stops polling, logs the job error, and throws an `AbortError`.
 * - anything else: throws a plain `Error` carrying the progress text so the
 *   retry wrapper schedules another attempt.
 *
 * @param jobName - Name of the job to fetch.
 * @param subscriptionId - Subscription that owns the database account.
 * @param resourceGroupName - Resource group that owns the database account.
 * @param accountName - Name of the database account.
 * @param clearMessage - Optional callback that clears the progress message
 *   currently shown in the notification console.
 * @returns The job payload once it is paused or completed.
 * @throws Error when no authorization token is available or the job is still running.
 * @throws AbortError when the job was cancelled or has failed.
 */
const pollDataTransferJobOperation = async (
  jobName: string,
  subscriptionId: string,
  resourceGroupName: string,
  accountName: string,
  clearMessage?: () => void,
): Promise<DataTransferJobGetResults> => {
  if (!userContext.authorizationToken) {
    throw new Error("No authority token provided");
  }

  addToPolling(jobName);
  const body: DataTransferJobGetResults = await get(subscriptionId, resourceGroupName, accountName, jobName);
  const status = body?.properties?.status;
  updateDataTransferJob(body);

  // Shared teardown for every state in which polling must not continue.
  const stopPolling = (): void => {
    removeFromPolling(jobName);
    clearMessage?.();
  };

  if (status === "Cancelled") {
    stopPolling();
    const cancelMessage = t(Keys.containerCopy.dataTransfers.polling.cancelConsoleMessage);
    NotificationConsoleUtils.logConsoleError(cancelMessage);
    throw new AbortError(cancelMessage);
  }

  if (status === "Paused") {
    stopPolling();
    NotificationConsoleUtils.logConsoleInfo(t(Keys.containerCopy.dataTransfers.polling.pauseConsoleMessage));
    return body;
  }

  if (status === "Failed" || status === "Faulted") {
    stopPolling();
    const jobError = body?.properties?.error;
    const errorMessage = jobError
      ? JSON.stringify(jobError)
      : t(Keys.containerCopy.dataTransfers.polling.defaultErrorMessage);
    NotificationConsoleUtils.logConsoleError(
      t(Keys.containerCopy.dataTransfers.polling.errorConsoleMessage, {
        errorMessage: errorMessage,
        jobName: jobName,
      }),
    );
    throw new AbortError(new Error(errorMessage));
  }

  if (status === "Completed") {
    stopPolling();
    NotificationConsoleUtils.logConsoleInfo(
      t(Keys.containerCopy.dataTransfers.polling.completedConsoleMessage, {
        jobName: jobName,
      }),
    );
    return body;
  }

  // Still running: signal "try again" with a plain Error. The counters are
  // read with optional chaining like every other access to `body` above; an
  // unguarded read on a response without `properties` would raise a TypeError,
  // which p-retry does not retry, ending polling instead of retrying.
  throw new Error(
    t(Keys.containerCopy.dataTransfers.polling.retryConsoleMessage, {
      jobName: jobName,
      processedCount: body?.properties?.processedCount,
      totalCount: body?.properties?.totalCount,
    }),
  );
};
/**
 * Cancels a data transfer job, stores the returned job state, and removes
 * the job from the polling set.
 *
 * @param subscriptionId - Subscription that owns the database account.
 * @param resourceGroupName - Resource group that owns the database account.
 * @param accountName - Name of the database account.
 * @param jobName - Name of the job to cancel.
 */
export const cancelDataTransferJob = async (
  subscriptionId: string,
  resourceGroupName: string,
  accountName: string,
  jobName: string,
): Promise<void> => {
  const cancelResult: DataTransferJobGetResults = await cancel(subscriptionId, resourceGroupName, accountName, jobName);
  updateDataTransferJob(cancelResult);
  // Fall back to the requested name so the job still leaves the polling set
  // when the response does not echo `properties.jobName`.
  removeFromPolling(cancelResult?.properties?.jobName ?? jobName);
};
/**
 * Pauses a data transfer job, stores the returned job state, and removes
 * the job from the polling set.
 *
 * @param subscriptionId - Subscription that owns the database account.
 * @param resourceGroupName - Resource group that owns the database account.
 * @param accountName - Name of the database account.
 * @param jobName - Name of the job to pause.
 */
export const pauseDataTransferJob = async (
  subscriptionId: string,
  resourceGroupName: string,
  accountName: string,
  jobName: string,
): Promise<void> => {
  const pauseResult: DataTransferJobGetResults = await pause(subscriptionId, resourceGroupName, accountName, jobName);
  updateDataTransferJob(pauseResult);
  // Fall back to the requested name so the job still leaves the polling set
  // when the response does not echo `properties.jobName`.
  removeFromPolling(pauseResult?.properties?.jobName ?? jobName);
};
/**
 * Resumes a paused data transfer job and stores the returned job state.
 *
 * This function does not start polling and does not touch the polling set.
 *
 * @param subscriptionId - Subscription that owns the database account.
 * @param resourceGroupName - Resource group that owns the database account.
 * @param accountName - Name of the database account.
 * @param jobName - Name of the job to resume.
 */
export const resumeDataTransferJob = async (
  subscriptionId: string,
  resourceGroupName: string,
  accountName: string,
  jobName: string,
): Promise<void> => {
  const resumedJob: DataTransferJobGetResults = await resume(
    subscriptionId,
    resourceGroupName,
    accountName,
    jobName,
  );

  updateDataTransferJob(resumedJob);
};
/**
 * Completes a data transfer job, stores the returned job state, and removes
 * the job from the polling set.
 *
 * @param subscriptionId - Subscription that owns the database account.
 * @param resourceGroupName - Resource group that owns the database account.
 * @param accountName - Name of the database account.
 * @param jobName - Name of the job to complete.
 */
export const completeDataTransferJob = async (
  subscriptionId: string,
  resourceGroupName: string,
  accountName: string,
  jobName: string,
): Promise<void> => {
  const completeResult: DataTransferJobGetResults = await complete(
    subscriptionId,
    resourceGroupName,
    accountName,
    jobName,
  );
  updateDataTransferJob(completeResult);
  // Fall back to the requested name so the job still leaves the polling set
  // when the response does not echo `properties.jobName`.
  removeFromPolling(completeResult?.properties?.jobName ?? jobName);
};
/**
 * Builds the source/destination description for a data transfer job.
 *
 * The field names depend on the API type: SQL uses database/container,
 * Mongo uses database/collection, and Cassandra uses keyspace/table.
 *
 * @param apiType - API flavour of the account.
 * @param databaseName - Database (or keyspace) name.
 * @param containerName - Container (or collection/table) name.
 * @returns The payload matching the given API type.
 * @throws Error when the API type is not SQL, Mongo, or Cassandra.
 */
const createPayload = (
  apiType: ApiType,
  databaseName: string,
  containerName: string,
):
  | CosmosSqlDataTransferDataSourceSink
  | CosmosMongoDataTransferDataSourceSink
  | CosmosCassandraDataTransferDataSourceSink => {
  if (apiType === "SQL") {
    return {
      component: "CosmosDBSql",
      databaseName,
      containerName,
    } as CosmosSqlDataTransferDataSourceSink;
  }

  if (apiType === "Mongo") {
    return {
      component: "CosmosDBMongo",
      databaseName,
      collectionName: containerName,
    } as CosmosMongoDataTransferDataSourceSink;
  }

  if (apiType === "Cassandra") {
    return {
      component: "CosmosDBCassandra",
      keyspaceName: databaseName,
      tableName: containerName,
    };
  }

  throw new Error(`Unsupported API type for data transfer: ${apiType}`);
};