Mirror of https://github.com/Azure/cosmos-explorer.git (synced 2026-01-08 20:17:03 +00:00)

add copy of cosmos node.js SDK as local dependency

committed by Chris Anderson
parent 679e4e56df
commit 2afb2d82e4
@@ -7,7 +7,7 @@ import { StatusCodes, SubStatusCodes } from "../common";
import { MetadataLookUpType } from "../CosmosDiagnostics";
import { CosmosDbDiagnosticLevel } from "../diagnostics/CosmosDbDiagnosticLevel";
import { DiagnosticNodeInternal, DiagnosticNodeType } from "../diagnostics/DiagnosticNodeInternal";
import { RUCapPerOperationExceededErrorCode } from "../request/RUCapPerOperationExceededError";
import { QueryRange } from "../routing/QueryRange";
import { SmartRoutingMapProvider } from "../routing/smartRoutingMapProvider";
import { addDignosticChild } from "../utils/diagnostics";
@@ -18,461 +18,474 @@ const logger = createClientLogger("parallelQueryExecutionContextBase");
/** @hidden */
export var ParallelQueryExecutionContextBaseStates;
(function (ParallelQueryExecutionContextBaseStates) {
  ParallelQueryExecutionContextBaseStates["started"] = "started";
  ParallelQueryExecutionContextBaseStates["inProgress"] = "inProgress";
  ParallelQueryExecutionContextBaseStates["ended"] = "ended";
})(ParallelQueryExecutionContextBaseStates || (ParallelQueryExecutionContextBaseStates = {}));
/** @hidden */
export class ParallelQueryExecutionContextBase {
  /**
   * Provides the ParallelQueryExecutionContextBase.
   * This is the base class that ParallelQueryExecutionContext and OrderByQueryExecutionContext will derive from.
   *
   * When handling a parallelized query, it instantiates one instance of
   * DocumentProcuder per target partition key range and aggregates the result of each.
   *
   * @param clientContext - The service endpoint to use to create the client.
   * @param collectionLink - The Collection Link
   * @param options - Represents the feed options.
   * @param partitionedQueryExecutionInfo - PartitionedQueryExecutionInfo
   * @hidden
   */
  constructor(clientContext, collectionLink, query, options, partitionedQueryExecutionInfo) {
    this.clientContext = clientContext;
    this.collectionLink = collectionLink;
    this.query = query;
    this.options = options;
    this.partitionedQueryExecutionInfo = partitionedQueryExecutionInfo;
    this.initializedPriorityQueue = false;
    this.ruCapExceededError = undefined;
    this.clientContext = clientContext;
    this.collectionLink = collectionLink;
    this.query = query;
    this.options = options;
    this.partitionedQueryExecutionInfo = partitionedQueryExecutionInfo;
    this.diagnosticNodeWrapper = {
      consumed: false,
      diagnosticNode: new DiagnosticNodeInternal(
        clientContext.diagnosticLevel,
        DiagnosticNodeType.PARALLEL_QUERY_NODE,
        null,
      ),
    };
    this.diagnosticNodeWrapper.diagnosticNode.addData({ stateful: true });
    this.err = undefined;
    this.state = ParallelQueryExecutionContextBase.STATES.started;
    this.routingProvider = new SmartRoutingMapProvider(this.clientContext);
    this.sortOrders = this.partitionedQueryExecutionInfo.queryInfo.orderBy;
    this.requestContinuation = options ? options.continuationToken || options.continuation : null;
    // response headers of undergoing operation
    this.respHeaders = getInitialHeader();
    // Make priority queue for documentProducers
    // The comparator is supplied by the derived class
    this.orderByPQ = new PriorityQueue((a, b) => this.documentProducerComparator(b, a));
    this.nextItemfetchSemaphore = semaphore(1);
  }
  _mergeWithActiveResponseHeaders(headers) {
    mergeHeaders(this.respHeaders, headers);
  }
  _getAndResetActiveResponseHeaders() {
    const ret = this.respHeaders;
    this.respHeaders = getInitialHeader();
    return ret;
  }
  getDiagnosticNode() {
    return this.diagnosticNodeWrapper.diagnosticNode;
  }
  async _onTargetPartitionRanges() {
    // invokes the callback when the target partition ranges are ready
    const parsedRanges = this.partitionedQueryExecutionInfo.queryRanges;
    const queryRanges = parsedRanges.map((item) => QueryRange.parseFromDict(item));
    return this.routingProvider.getOverlappingRanges(this.collectionLink, queryRanges, this.getDiagnosticNode());
  }
  /**
   * Gets the replacement ranges for a partitionkeyrange that has been split
   */
  async _getReplacementPartitionKeyRanges(documentProducer) {
    const partitionKeyRange = documentProducer.targetPartitionKeyRange;
    // Download the new routing map
    this.routingProvider = new SmartRoutingMapProvider(this.clientContext);
    // Get the queryRange that relates to this partitionKeyRange
    const queryRange = QueryRange.parsePartitionKeyRange(partitionKeyRange);
    return this.routingProvider.getOverlappingRanges(this.collectionLink, [queryRange], this.getDiagnosticNode());
  }
  // TODO: P0 Code smell - can barely tell what this is doing
  /**
   * Removes the current document producer from the priqueue,
   * replaces that document producer with child document producers,
   * then reexecutes the originFunction with the corrrected executionContext
   */
  async _repairExecutionContext(diagnosticNode, originFunction) {
    // TODO: any
    // Get the replacement ranges
    // Removing the invalid documentProducer from the orderByPQ
    const parentDocumentProducer = this.orderByPQ.deq();
    try {
      const replacementPartitionKeyRanges = await this._getReplacementPartitionKeyRanges(parentDocumentProducer);
      const replacementDocumentProducers = [];
      // Create the replacement documentProducers
      replacementPartitionKeyRanges.forEach((partitionKeyRange) => {
        // Create replacment document producers with the parent's continuationToken
        const replacementDocumentProducer = this._createTargetPartitionQueryExecutionContext(
          partitionKeyRange,
          parentDocumentProducer.continuationToken,
        );
        replacementDocumentProducers.push(replacementDocumentProducer);
      });
      // We need to check if the documentProducers even has anything left to fetch from before enqueing them
      const checkAndEnqueueDocumentProducer = async (documentProducerToCheck, checkNextDocumentProducerCallback) => {
        try {
          const { result: afterItem } = await documentProducerToCheck.current(diagnosticNode);
          if (afterItem === undefined) {
            // no more results left in this document producer, so we don't enqueue it
          } else {
            // Safe to put document producer back in the queue
            this.orderByPQ.enq(documentProducerToCheck);
          }
          await checkNextDocumentProducerCallback();
        } catch (err) {
          this.err = err;
          return;
        }
      };
      const checkAndEnqueueDocumentProducers = async (rdp) => {
        if (rdp.length > 0) {
          // We still have a replacementDocumentProducer to check
          const replacementDocumentProducer = rdp.shift();
          await checkAndEnqueueDocumentProducer(replacementDocumentProducer, async () => {
            await checkAndEnqueueDocumentProducers(rdp);
          });
        } else {
          // reexecutes the originFunction with the corrrected executionContext
          return originFunction();
        }
      };
      // Invoke the recursive function to get the ball rolling
      await checkAndEnqueueDocumentProducers(replacementDocumentProducers);
    } catch (err) {
      this.err = err;
      throw err;
    }
  }
  static _needPartitionKeyRangeCacheRefresh(error) {
    // TODO: any error
    return (
      error.code === StatusCodes.Gone &&
      "substatus" in error &&
      error["substatus"] === SubStatusCodes.PartitionKeyRangeGone
    );
  }
  /**
   * Checks to see if the executionContext needs to be repaired.
   * if so it repairs the execution context and executes the ifCallback,
   * else it continues with the current execution context and executes the elseCallback
   */
  async _repairExecutionContextIfNeeded(diagnosticNode, ifCallback, elseCallback, operationOptions, ruConsumedManager) {
    const documentProducer = this.orderByPQ.peek();
    // Check if split happened
    try {
      await documentProducer.current(diagnosticNode, operationOptions, ruConsumedManager);
      elseCallback();
    } catch (err) {
      if (ParallelQueryExecutionContextBase._needPartitionKeyRangeCacheRefresh(err)) {
        // Split has happened so we need to repair execution context before continueing
        return addDignosticChild(
          (childNode) => this._repairExecutionContext(childNode, ifCallback),
          diagnosticNode,
          DiagnosticNodeType.QUERY_REPAIR_NODE,
        );
      } else {
        // Something actually bad happened ...
        this.err = err;
        throw err;
      }
    }
  }
  /**
   * Fetches the next element in the ParallelQueryExecutionContextBase.
   */
  async nextItem(diagnosticNode, operationOptions, ruConsumedManager) {
    if (this.err) {
      // if there is a prior error return error
      throw this.err;
    }
    return new Promise((resolve, reject) => {
      this.nextItemfetchSemaphore.take(async () => {
        // document producer queue initilization
        if (!this.initializedPriorityQueue) {
          try {
            await this._createDocumentProducersAndFillUpPriorityQueue(operationOptions, ruConsumedManager);
            this.initializedPriorityQueue = true;
          } catch (err) {
            this.err = err;
            // release the lock before invoking callback
            this.nextItemfetchSemaphore.leave();
            reject(this.err);
            return;
          }
        }
        if (!this.diagnosticNodeWrapper.consumed) {
          diagnosticNode.addChildNode(
            this.diagnosticNodeWrapper.diagnosticNode,
            CosmosDbDiagnosticLevel.debug,
            MetadataLookUpType.QueryPlanLookUp,
          );
          this.diagnosticNodeWrapper.diagnosticNode = undefined;
          this.diagnosticNodeWrapper.consumed = true;
        } else {
          this.diagnosticNodeWrapper.diagnosticNode = diagnosticNode;
        }
        // NOTE: lock must be released before invoking quitting
        if (this.err) {
          // release the lock before invoking callback
          this.nextItemfetchSemaphore.leave();
          this.err.headers = this._getAndResetActiveResponseHeaders();
          reject(this.err);
          return;
        }
        if (this.orderByPQ.size() === 0) {
          // there is no more results
          this.state = ParallelQueryExecutionContextBase.STATES.ended;
          // release the lock before invoking callback
          this.nextItemfetchSemaphore.leave();
          return resolve({
            result: undefined,
            headers: this._getAndResetActiveResponseHeaders(),
          });
        }
        const ifCallback = () => {
          // Release the semaphore to avoid deadlock
          this.nextItemfetchSemaphore.leave();
          // Reexcute the function
          return resolve(this.nextItem(diagnosticNode, operationOptions, ruConsumedManager));
        };
        const elseCallback = async () => {
          let documentProducer;
          try {
            documentProducer = this.orderByPQ.deq();
          } catch (e) {
            // if comparing elements of the priority queue throws exception
            // set that error and return error
            this.err = e;
            // release the lock before invoking callback
            this.nextItemfetchSemaphore.leave();
            this.err.headers = this._getAndResetActiveResponseHeaders();
            reject(this.err);
            return;
          }
          let item;
          let headers;
          try {
            const response = await documentProducer.nextItem(diagnosticNode, operationOptions, ruConsumedManager);
            item = response.result;
            headers = response.headers;
            this._mergeWithActiveResponseHeaders(headers);
            if (item === undefined) {
              // this should never happen
              // because the documentProducer already has buffered an item
              // assert item !== undefined
              this.err = new Error(`Extracted DocumentProducer from the priority queue \
              doesn't have any buffered item!`);
              // release the lock before invoking callback
              this.nextItemfetchSemaphore.leave();
              return resolve({
                result: undefined,
                headers: this._getAndResetActiveResponseHeaders(),
              });
            }
          } catch (err) {
            if (err.code === RUCapPerOperationExceededErrorCode) {
              this._updateErrorObjectWithBufferedData(err);
              this.err = err;
            } else {
              this.err = new Error(`Extracted DocumentProducer from the priority queue fails to get the \
              buffered item. Due to ${JSON.stringify(err)}`);
              this.err.headers = this._getAndResetActiveResponseHeaders();
            }
            // release the lock before invoking callback
            this.nextItemfetchSemaphore.leave();
            reject(this.err);
            return;
          }
          // we need to put back the document producer to the queue if it has more elements.
          // the lock will be released after we know document producer must be put back in the queue or not
          try {
            const { result: afterItem, headers: otherHeaders } = await documentProducer.current(
              diagnosticNode,
              operationOptions,
              ruConsumedManager,
            );
            this._mergeWithActiveResponseHeaders(otherHeaders);
            if (afterItem === undefined) {
              // no more results is left in this document producer
            } else {
              try {
                const headItem = documentProducer.fetchResults[0];
                if (typeof headItem === "undefined") {
                  throw new Error("Extracted DocumentProducer from PQ is invalid state with no result!");
                }
                this.orderByPQ.enq(documentProducer);
              } catch (e) {
                // if comparing elements in priority queue throws exception
                // set error
                this.err = e;
              }
            }
          } catch (err) {
            if (ParallelQueryExecutionContextBase._needPartitionKeyRangeCacheRefresh(err)) {
              // We want the document producer enqueued
              // So that later parts of the code can repair the execution context
              this.orderByPQ.enq(documentProducer);
            } else if (err.code === RUCapPerOperationExceededErrorCode) {
              this._updateErrorObjectWithBufferedData(err);
              this.err = err;
              reject(this.err);
            } else {
              // Something actually bad happened
              this.err = err;
              reject(this.err);
            }
          } finally {
            // release the lock before returning
            this.nextItemfetchSemaphore.leave();
          }
          // invoke the callback on the item
          return resolve({
            result: item,
            headers: this._getAndResetActiveResponseHeaders(),
          });
        };
        this._repairExecutionContextIfNeeded(diagnosticNode, ifCallback, elseCallback).catch(reject);
      });
    });
  }
  _updateErrorObjectWithBufferedData(err) {
    this.orderByPQ.forEach((dp) => {
      const bufferedItems = dp.peekBufferedItems();
      err.fetchedResults.push(...bufferedItems);
    });
  }
  /**
   * Determine if there are still remaining resources to processs based on the value of the continuation
   * token or the elements remaining on the current batch in the QueryIterator.
   * @returns true if there is other elements to process in the ParallelQueryExecutionContextBase.
   */
  hasMoreResults() {
    return !(this.state === ParallelQueryExecutionContextBase.STATES.ended || this.err !== undefined);
  }
  /**
   * Creates document producers
   */
  _createTargetPartitionQueryExecutionContext(partitionKeyTargetRange, continuationToken) {
    // TODO: any
    // creates target partition range Query Execution Context
    let rewrittenQuery = this.partitionedQueryExecutionInfo.queryInfo.rewrittenQuery;
    let sqlQuerySpec;
    const query = this.query;
    if (typeof query === "string") {
      sqlQuerySpec = { query };
    } else {
      sqlQuerySpec = query;
    }
    const formatPlaceHolder = "{documentdb-formattableorderbyquery-filter}";
    if (rewrittenQuery) {
      sqlQuerySpec = JSON.parse(JSON.stringify(sqlQuerySpec));
      // We hardcode the formattable filter to true for now
      rewrittenQuery = rewrittenQuery.replace(formatPlaceHolder, "true");
      sqlQuerySpec["query"] = rewrittenQuery;
    }
    const options = Object.assign({}, this.options);
    options.continuationToken = continuationToken;
    return new DocumentProducer(
      this.clientContext,
      this.collectionLink,
      sqlQuerySpec,
      partitionKeyTargetRange,
      options,
    );
  }
  async _createDocumentProducersAndFillUpPriorityQueue(operationOptions, ruConsumedManager) {
    try {
      const targetPartitionRanges = await this._onTargetPartitionRanges();
      const maxDegreeOfParallelism =
        this.options.maxDegreeOfParallelism === undefined || this.options.maxDegreeOfParallelism < 1
          ? targetPartitionRanges.length
          : Math.min(this.options.maxDegreeOfParallelism, targetPartitionRanges.length);
      logger.info(
        "Query starting against " +
          targetPartitionRanges.length +
          " ranges with parallelism of " +
          maxDegreeOfParallelism,
      );
      let filteredPartitionKeyRanges = [];
      // The document producers generated from filteredPartitionKeyRanges
      const targetPartitionQueryExecutionContextList = [];
      if (this.requestContinuation) {
        throw new Error("Continuation tokens are not yet supported for cross partition queries");
      } else {
        filteredPartitionKeyRanges = targetPartitionRanges;
      }
      // Create one documentProducer for each partitionTargetRange
      filteredPartitionKeyRanges.forEach((partitionTargetRange) => {
        // TODO: any partitionTargetRange
        // no async callback
        targetPartitionQueryExecutionContextList.push(
          this._createTargetPartitionQueryExecutionContext(partitionTargetRange),
        );
      });
      // Fill up our priority queue with documentProducers
      let inProgressPromises = [];
      for (const documentProducer of targetPartitionQueryExecutionContextList) {
        // Don't enqueue any new promise if RU cap exceeded
        if (this.ruCapExceededError) {
          break;
        }
        const promise = this._processAndEnqueueDocumentProducer(documentProducer, operationOptions, ruConsumedManager);
        inProgressPromises.push(promise);
        // Limit concurrent executions
        if (inProgressPromises.length === maxDegreeOfParallelism) {
          await Promise.all(inProgressPromises);
          inProgressPromises = [];
        }
      }
      // Wait for all promises to complete
      await Promise.all(inProgressPromises);
      if (this.err) {
        if (this.ruCapExceededError) {
          // merge the buffered items
          this.orderByPQ.forEach((dp) => {
            const bufferedItems = dp.peekBufferedItems();
            this.ruCapExceededError.fetchedResults.push(...bufferedItems);
          });
          throw this.ruCapExceededError;
        }
        throw this.err;
      }
    } catch (err) {
      this.err = err;
      throw err;
    }
  }
  async _processAndEnqueueDocumentProducer(documentProducer, operationOptions, ruConsumedManager) {
    try {
      const { result: document, headers } = await documentProducer.current(
        this.getDiagnosticNode(),
        operationOptions,
        ruConsumedManager,
      );
      this._mergeWithActiveResponseHeaders(headers);
      if (document !== undefined) {
        this.orderByPQ.enq(documentProducer);
      }
    } catch (err) {
      this._mergeWithActiveResponseHeaders(err.headers);
      this.err = err;
      if (err.code === RUCapPerOperationExceededErrorCode) {
        // would be halting further execution of other promises
        if (!this.ruCapExceededError) {
          this.ruCapExceededError = err;
        } else {
          // merge the buffered items
          if (err.fetchedResults) {
            this.ruCapExceededError.fetchedResults.push(...err.fetchedResults);
          }
        }
      } else {
        throw err;
      }
    }
    return;
  }
}
ParallelQueryExecutionContextBase.STATES = ParallelQueryExecutionContextBaseStates;
//# sourceMappingURL=parallelQueryExecutionContextBase.js.map
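For orientation, the class above is driven by repeatedly awaiting `nextItem` until `hasMoreResults()` reports completion, merging the per-call headers as it goes. The following is a minimal sketch of that drain pattern against a hypothetical interface; `ExecutionContextLike`, `QueryResponseLike`, and `drainExecutionContext` are names invented here for illustration and are not exported by the SDK.

```typescript
// Hypothetical shapes for illustration only; they mirror the { result, headers }
// objects resolved by nextItem() in the class above, not a real SDK export.
interface QueryResponseLike<T> {
  result: T | undefined;
  headers: Record<string, unknown>;
}

interface ExecutionContextLike<T> {
  hasMoreResults(): boolean;
  nextItem(): Promise<QueryResponseLike<T>>;
}

// Pull items until the context reports completion; the class above resolves
// { result: undefined } once every DocumentProducer has been drained.
async function drainExecutionContext<T>(ctx: ExecutionContextLike<T>): Promise<T[]> {
  const items: T[] = [];
  while (ctx.hasMoreResults()) {
    const { result } = await ctx.nextItem();
    if (result !== undefined) {
      items.push(result);
    }
  }
  return items;
}
```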
@@ -4,276 +4,313 @@ import { __asyncGenerator, __await } from "tslib";
import { getPathFromLink, ResourceType, RUConsumedManager, StatusCodes } from "./common";
import { MetadataLookUpType } from "./CosmosDiagnostics";
import { DiagnosticNodeInternal, DiagnosticNodeType } from "./diagnostics/DiagnosticNodeInternal";
import {
  DefaultQueryExecutionContext,
  getInitialHeader,
  mergeHeaders,
  PipelinedQueryExecutionContext,
} from "./queryExecutionContext";
import { FeedResponse } from "./request/FeedResponse";
import { RUCapPerOperationExceededErrorCode } from "./request/RUCapPerOperationExceededError";
import { getEmptyCosmosDiagnostics, withDiagnostics, withMetadataDiagnostics } from "./utils/diagnostics";
/**
 * Represents a QueryIterator Object, an implementation of feed or query response that enables
 * traversal and iterating over the response
 * in the Azure Cosmos DB database service.
 */
export class QueryIterator {
  /**
   * @hidden
   */
  constructor(clientContext, query, options, fetchFunctions, resourceLink, resourceType) {
    this.clientContext = clientContext;
    this.query = query;
    this.options = options;
    this.fetchFunctions = fetchFunctions;
    this.resourceLink = resourceLink;
    this.resourceType = resourceType;
    this.nonStreamingOrderBy = false;
    this.query = query;
    this.fetchFunctions = fetchFunctions;
    this.options = options || {};
    this.resourceLink = resourceLink;
    this.fetchAllLastResHeaders = getInitialHeader();
    this.reset();
    this.isInitialized = false;
  }
  /**
   * Gets an async iterator that will yield results until completion.
   *
   * NOTE: AsyncIterators are a very new feature and you might need to
   * use polyfils/etc. in order to use them in your code.
   *
   * If you're using TypeScript, you can use the following polyfill as long
   * as you target ES6 or higher and are running on Node 6 or higher.
   *
   * ```typescript
   * if (!Symbol || !Symbol.asyncIterator) {
   *   (Symbol as any).asyncIterator = Symbol.for("Symbol.asyncIterator");
   * }
   * ```
   *
   * @example Iterate over all databases
   * ```typescript
   * for await(const { resources: db } of client.databases.readAll().getAsyncIterator()) {
   *   console.log(`Got ${db} from AsyncIterator`);
   * }
   * ```
   */
  getAsyncIterator(options) {
    return __asyncGenerator(this, arguments, function* getAsyncIterator_1() {
      this.reset();
      let diagnosticNode = new DiagnosticNodeInternal(
        this.clientContext.diagnosticLevel,
        DiagnosticNodeType.CLIENT_REQUEST_NODE,
        null,
      );
      this.queryPlanPromise = this.fetchQueryPlan(diagnosticNode);
      let ruConsumedManager;
      if (options && options.ruCapPerOperation) {
        ruConsumedManager = new RUConsumedManager();
      }
      while (this.queryExecutionContext.hasMoreResults()) {
        let response;
        try {
          response = yield __await(this.queryExecutionContext.fetchMore(diagnosticNode, options, ruConsumedManager));
        } catch (error) {
          if (this.needsQueryPlan(error)) {
            yield __await(this.createPipelinedExecutionContext());
            try {
              response = yield __await(
                this.queryExecutionContext.fetchMore(diagnosticNode, options, ruConsumedManager),
              );
            } catch (queryError) {
              this.handleSplitError(queryError);
            }
          } else {
            throw error;
          }
        }
        const feedResponse = new FeedResponse(
          response.result,
          response.headers,
          this.queryExecutionContext.hasMoreResults(),
          diagnosticNode.toDiagnostic(this.clientContext.getClientConfig()),
        );
        diagnosticNode = new DiagnosticNodeInternal(
          this.clientContext.diagnosticLevel,
          DiagnosticNodeType.CLIENT_REQUEST_NODE,
          null,
        );
        if (response.result !== undefined) {
          yield yield __await(feedResponse);
        }
      }
    });
  }
  /**
   * Determine if there are still remaining resources to process based on the value of the continuation token or the
   * elements remaining on the current batch in the QueryIterator.
   * @returns true if there is other elements to process in the QueryIterator.
   */
  hasMoreResults() {
    return this.queryExecutionContext.hasMoreResults();
  }
  /**
   * Fetch all pages for the query and return a single FeedResponse.
   */
  async fetchAll(options) {
    return withDiagnostics(async (diagnosticNode) => {
      return this.fetchAllInternal(diagnosticNode, options);
    }, this.clientContext);
  }
  /**
   * @hidden
   */
  async fetchAllInternal(diagnosticNode, options) {
    this.reset();
    let response;
    try {
      response = await this.toArrayImplementation(diagnosticNode, options);
    } catch (error) {
      this.handleSplitError(error);
    }
    return response;
  }
  /**
   * Retrieve the next batch from the feed.
   *
   * This may or may not fetch more pages from the backend depending on your settings
   * and the type of query. Aggregate queries will generally fetch all backend pages
   * before returning the first batch of responses.
   */
  async fetchNext(options) {
    return withDiagnostics(async (diagnosticNode) => {
      // Enabling RU metrics for all the fetchNext operations
      const ruConsumedManager = new RUConsumedManager();
      this.queryPlanPromise = withMetadataDiagnostics(
        async (metadataNode) => {
          return this.fetchQueryPlan(metadataNode);
        },
        diagnosticNode,
        MetadataLookUpType.QueryPlanLookUp,
      );
      if (!this.isInitialized) {
        await this.init();
      }
      let response;
      try {
        response = await this.queryExecutionContext.fetchMore(diagnosticNode, options, ruConsumedManager);
      } catch (error) {
        if (this.needsQueryPlan(error)) {
          await this.createPipelinedExecutionContext();
          try {
            response = await this.queryExecutionContext.fetchMore(diagnosticNode, options, ruConsumedManager);
          } catch (queryError) {
            this.handleSplitError(queryError);
          }
        } else {
          throw error;
        }
      }
      return new FeedResponse(
        response.result,
        response.headers,
        this.queryExecutionContext.hasMoreResults(),
        getEmptyCosmosDiagnostics(),
      );
    }, this.clientContext);
  }
  /**
   * Reset the QueryIterator to the beginning and clear all the resources inside it
   */
  reset() {
    this.queryPlanPromise = undefined;
    this.fetchAllLastResHeaders = getInitialHeader();
    this.fetchAllTempResources = [];
    this.queryExecutionContext = new DefaultQueryExecutionContext(this.options, this.fetchFunctions);
  }
  async toArrayImplementation(diagnosticNode, options) {
    let ruConsumedManager;
    if (options && options.ruCapPerOperation) {
      ruConsumedManager = new RUConsumedManager();
    }
    this.queryPlanPromise = withMetadataDiagnostics(
      async (metadataNode) => {
        return this.fetchQueryPlan(metadataNode);
      },
      diagnosticNode,
      MetadataLookUpType.QueryPlanLookUp,
    );
    // this.queryPlanPromise = this.fetchQueryPlan(diagnosticNode);
    if (!this.isInitialized) {
      await this.init();
    }
    while (this.queryExecutionContext.hasMoreResults()) {
      let response;
      try {
        response = await this.queryExecutionContext.nextItem(diagnosticNode, options, ruConsumedManager);
      } catch (error) {
        if (this.needsQueryPlan(error)) {
          await this.createPipelinedExecutionContext();
          response = await this.queryExecutionContext.nextItem(diagnosticNode, options, ruConsumedManager);
        } else if (error.code === RUCapPerOperationExceededErrorCode && error.fetchedResults) {
          error.fetchedResults.forEach((item) => {
            this.fetchAllTempResources.push(item);
          });
          error.fetchedResults = this.fetchAllTempResources;
          throw error;
        } else {
          throw error;
        }
      }
      const { result, headers } = response;
      // concatenate the results and fetch more
      mergeHeaders(this.fetchAllLastResHeaders, headers);
      if (result !== undefined) {
        if (this.nonStreamingOrderBy && typeof result === "object" && Object.keys(result).length === 0) {
          // ignore empty results from NonStreamingOrderBy Endpoint components.
        } else this.fetchAllTempResources.push(result);
      }
    }
    return new FeedResponse(this.fetchAllTempResources, this.fetchAllLastResHeaders, this.queryExecutionContext.hasMoreResults(), getEmptyCosmosDiagnostics());
  }
  async createPipelinedExecutionContext() {
    const queryPlanResponse = await this.queryPlanPromise;
    // We always coerce queryPlanPromise to resolved. So if it errored, we need to manually inspect the resolved value
    if (queryPlanResponse instanceof Error) {
      throw queryPlanResponse;
|
||||||
|
response = await this.queryExecutionContext.nextItem(diagnosticNode, options, ruConsumedManager);
|
||||||
|
} else if (error.code === RUCapPerOperationExceededErrorCode && error.fetchedResults) {
|
||||||
|
error.fetchedResults.forEach((item) => {
|
||||||
|
this.fetchAllTempResources.push(item);
|
||||||
|
});
|
||||||
|
error.fetchedResults = this.fetchAllTempResources;
|
||||||
|
throw error;
|
||||||
|
} else {
|
||||||
|
throw error;
|
||||||
}
|
}
|
||||||
const queryPlan = queryPlanResponse.result;
|
}
|
||||||
const queryInfo = queryPlan.queryInfo;
|
const { result, headers } = response;
|
||||||
//this.nonStreamingOrderBy = queryInfo.hasNonStreamingOrderBy ? true : false;
|
// concatenate the results and fetch more
|
||||||
this.nonStreamingOrderBy = false;
|
mergeHeaders(this.fetchAllLastResHeaders, headers);
|
||||||
/*if (queryInfo.aggregates.length > 0 && queryInfo.hasSelectValue === false) {
|
if (result !== undefined) {
|
||||||
|
if (this.nonStreamingOrderBy && typeof result === "object" && Object.keys(result).length === 0) {
|
||||||
|
// ignore empty results from NonStreamingOrderBy Endpoint components.
|
||||||
|
} else this.fetchAllTempResources.push(result);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return new FeedResponse(
|
||||||
|
this.fetchAllTempResources,
|
||||||
|
this.fetchAllLastResHeaders,
|
||||||
|
this.queryExecutionContext.hasMoreResults(),
|
||||||
|
getEmptyCosmosDiagnostics(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
async createPipelinedExecutionContext() {
|
||||||
|
const queryPlanResponse = await this.queryPlanPromise;
|
||||||
|
// We always coerce queryPlanPromise to resolved. So if it errored, we need to manually inspect the resolved value
|
||||||
|
if (queryPlanResponse instanceof Error) {
|
||||||
|
throw queryPlanResponse;
|
||||||
|
}
|
||||||
|
const queryPlan = queryPlanResponse.result;
|
||||||
|
const queryInfo = queryPlan.queryInfo;
|
||||||
|
//this.nonStreamingOrderBy = queryInfo.hasNonStreamingOrderBy ? true : false;
|
||||||
|
this.nonStreamingOrderBy = false;
|
||||||
|
/*if (queryInfo.aggregates.length > 0 && queryInfo.hasSelectValue === false) {
|
||||||
throw new Error("Aggregate queries must use the VALUE keyword");
|
throw new Error("Aggregate queries must use the VALUE keyword");
|
||||||
} */
|
} */
|
||||||
this.queryExecutionContext = new PipelinedQueryExecutionContext(this.clientContext, this.resourceLink, this.query, this.options, queryPlan);
|
this.queryExecutionContext = new PipelinedQueryExecutionContext(
|
||||||
|
this.clientContext,
|
||||||
|
this.resourceLink,
|
||||||
|
this.query,
|
||||||
|
this.options,
|
||||||
|
queryPlan,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
async fetchQueryPlan(diagnosticNode) {
|
||||||
|
if (!this.queryPlanPromise && this.resourceType === ResourceType.item) {
|
||||||
|
return this.clientContext
|
||||||
|
.getQueryPlan(
|
||||||
|
getPathFromLink(this.resourceLink) + "/docs",
|
||||||
|
ResourceType.item,
|
||||||
|
this.resourceLink,
|
||||||
|
this.query,
|
||||||
|
this.options,
|
||||||
|
diagnosticNode,
|
||||||
|
)
|
||||||
|
.catch((error) => error); // Without this catch, node reports an unhandled rejection. So we stash the promise as resolved even if it errored.
|
||||||
}
|
}
|
||||||
async fetchQueryPlan(diagnosticNode) {
|
return this.queryPlanPromise;
|
||||||
if (!this.queryPlanPromise && this.resourceType === ResourceType.item) {
|
}
|
||||||
return this.clientContext
|
needsQueryPlan(error) {
|
||||||
.getQueryPlan(getPathFromLink(this.resourceLink) + "/docs", ResourceType.item, this.resourceLink, this.query, this.options, diagnosticNode)
|
var _a;
|
||||||
.catch((error) => error); // Without this catch, node reports an unhandled rejection. So we stash the promise as resolved even if it errored.
|
let needsQueryPlanValue = false;
|
||||||
}
|
if (
|
||||||
return this.queryPlanPromise;
|
((_a = error.body) === null || _a === void 0 ? void 0 : _a.additionalErrorInfo) ||
|
||||||
|
error.message.includes("Cross partition query only supports")
|
||||||
|
) {
|
||||||
|
needsQueryPlanValue = error.code === StatusCodes.BadRequest && this.resourceType === ResourceType.item;
|
||||||
}
|
}
|
||||||
needsQueryPlan(error) {
|
return needsQueryPlanValue;
|
||||||
var _a;
|
}
|
||||||
let needsQueryPlanValue = false;
|
async init() {
|
||||||
if (((_a = error.body) === null || _a === void 0 ? void 0 : _a.additionalErrorInfo) ||
|
if (this.isInitialized === true) {
|
||||||
error.message.includes("Cross partition query only supports")) {
|
return;
|
||||||
needsQueryPlanValue =
|
|
||||||
error.code === StatusCodes.BadRequest && this.resourceType === ResourceType.item;
|
|
||||||
}
|
|
||||||
return needsQueryPlanValue;
|
|
||||||
}
|
}
|
||||||
async init() {
|
if (this.initPromise === undefined) {
|
||||||
if (this.isInitialized === true) {
|
this.initPromise = this._init();
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (this.initPromise === undefined) {
|
|
||||||
this.initPromise = this._init();
|
|
||||||
}
|
|
||||||
return this.initPromise;
|
|
||||||
}
|
}
|
||||||
async _init() {
|
return this.initPromise;
|
||||||
if (this.options.forceQueryPlan === true && this.resourceType === ResourceType.item) {
|
}
|
||||||
await this.createPipelinedExecutionContext();
|
async _init() {
|
||||||
}
|
if (this.options.forceQueryPlan === true && this.resourceType === ResourceType.item) {
|
||||||
this.isInitialized = true;
|
await this.createPipelinedExecutionContext();
|
||||||
}
|
}
|
||||||
handleSplitError(err) {
|
this.isInitialized = true;
|
||||||
if (err.code === 410) {
|
}
|
||||||
const error = new Error("Encountered partition split and could not recover. This request is retryable");
|
handleSplitError(err) {
|
||||||
error.code = 503;
|
if (err.code === 410) {
|
||||||
error.originalError = err;
|
const error = new Error("Encountered partition split and could not recover. This request is retryable");
|
||||||
throw error;
|
error.code = 503;
|
||||||
}
|
error.originalError = err;
|
||||||
else {
|
throw error;
|
||||||
throw err;
|
} else {
|
||||||
}
|
throw err;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
//# sourceMappingURL=queryIterator.js.map
|
//# sourceMappingURL=queryIterator.js.map
|
||||||
|
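For orientation, the methods above (fetchAll, fetchNext, hasMoreResults, getAsyncIterator) form the public surface of the SDK's QueryIterator. The sketch below shows how an application typically drives them; it is illustrative only, is not part of this commit, and assumes a placeholder endpoint, key, database "db", and container "items".

// Illustrative QueryIterator usage (not part of this diff); endpoint, key, and names are placeholders.
import { CosmosClient } from "@azure/cosmos";

async function runQuery(): Promise<void> {
  const client = new CosmosClient({ endpoint: "https://localhost:8081", key: "<emulator-key>" });
  const container = client.database("db").container("items");

  // fetchAll drains every page into a single FeedResponse (see fetchAll/fetchAllInternal above).
  const all = await container.items.query("SELECT * FROM c").fetchAll();
  console.log(`fetchAll returned ${all.resources.length} items`);

  // fetchNext + hasMoreResults page through results one batch at a time (see fetchNext above).
  const paged = container.items.query("SELECT * FROM c", { maxItemCount: 10 });
  while (paged.hasMoreResults()) {
    const { resources } = await paged.fetchNext();
    console.log(`page of ${resources.length} items`);
  }

  // getAsyncIterator exposes the same paging as an async iterable.
  for await (const page of container.items.query("SELECT * FROM c").getAsyncIterator()) {
    console.log(`async page of ${page.resources.length} items`);
  }
}

Note that handleSplitError above converts a partition-split 410 into a retryable 503, so a caller that sees that status can simply retry the fetchNext or fetchAll call.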
package-lock.json (generated): 56 changed lines
@@ -209,6 +209,57 @@
       "extraneous": true,
       "license": "ISC"
     },
+    "local_dependencies/@azure/cosmos": {
+      "version": "4.0.1-beta.3",
+      "license": "MIT",
+      "dependencies": {
+        "@azure/abort-controller": "^1.0.0",
+        "@azure/core-auth": "^1.3.0",
+        "@azure/core-rest-pipeline": "^1.2.0",
+        "@azure/core-tracing": "^1.0.0",
+        "debug": "^4.1.1",
+        "fast-json-stable-stringify": "^2.1.0",
+        "jsbi": "^3.1.3",
+        "node-abort-controller": "^3.0.0",
+        "priorityqueuejs": "^2.0.0",
+        "semaphore": "^1.0.5",
+        "tslib": "^2.2.0",
+        "universal-user-agent": "^6.0.0",
+        "uuid": "^8.3.0"
+      },
+      "engines": {
+        "node": ">=18.0.0"
+      }
+    },
+    "local_dependencies/@azure/cosmos/node_modules/tslib": {
+      "version": "2.7.0",
+      "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.7.0.tgz",
+      "integrity": "sha512-gLXCKdN1/j47AiHiOkJN69hJmcbGTHI0ImLmbYLHykhgeN0jVGola9yVjFgzCUklsZQMW55o+dW7IXv3RCXDzA=="
+    },
+    "local_dependencies/cosmos": {
+      "name": "@azure/cosmos",
+      "version": "4.0.1-beta.3",
+      "extraneous": true,
+      "license": "MIT",
+      "dependencies": {
+        "@azure/abort-controller": "^1.0.0",
+        "@azure/core-auth": "^1.3.0",
+        "@azure/core-rest-pipeline": "^1.2.0",
+        "@azure/core-tracing": "^1.0.0",
+        "debug": "^4.1.1",
+        "fast-json-stable-stringify": "^2.1.0",
+        "jsbi": "^3.1.3",
+        "node-abort-controller": "^3.0.0",
+        "priorityqueuejs": "^2.0.0",
+        "semaphore": "^1.0.5",
+        "tslib": "^2.2.0",
+        "universal-user-agent": "^6.0.0",
+        "uuid": "^8.3.0"
+      },
+      "engines": {
+        "node": ">=18.0.0"
+      }
+    },
     "node_modules/@aashutoshrathi/word-wrap": {
       "version": "1.2.6",
       "license": "MIT",
@@ -429,11 +480,6 @@
         "node": ">=0.10.0"
       }
     },
-    "node_modules/@azure/cosmos/node_modules/tslib": {
-      "version": "2.6.2",
-      "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.6.2.tgz",
-      "integrity": "sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q=="
-    },
     "node_modules/@azure/identity": {
       "version": "1.5.2",
       "license": "MIT",
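The local_dependencies/@azure/cosmos entries added above are what npm records in the lockfile when a package is consumed from a local folder. The matching package.json change is not part of this hunk; as a sketch only, such a dependency is normally declared with a file: specifier along these lines (the path is an assumption based on the lockfile keys, not a line taken from this commit):

// package.json fragment (sketch; the real entry is outside this hunk)
{
  "dependencies": {
    "@azure/cosmos": "file:local_dependencies/@azure/cosmos"
  }
}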
@@ -131,7 +131,7 @@ export const CassandraProxyOutboundIPs: { [key: string]: string[] } = {
   [CassandraProxyEndpoints.Mooncake]: ["40.73.99.146", "143.64.62.47"],
 };
 
-export const allowedEmulatorEndpoints: ReadonlyArray<string> = ["https://localhost:8081","http://localhost:8081"];
+export const allowedEmulatorEndpoints: ReadonlyArray<string> = ["https://localhost:8081", "http://localhost:8081"];
 
 export const allowedMongoBackendEndpoints: ReadonlyArray<string> = ["https://localhost:1234"];
 
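allowedEmulatorEndpoints and allowedMongoBackendEndpoints act as allow-lists of local development endpoints. How they are consumed is not shown in this hunk; a minimal sketch of such a check, assuming the constants above are in scope, might look like the following (hypothetical helper, not taken from this commit):

// Hypothetical guard using the allow-list above (illustrative only).
export function isAllowedEmulatorEndpoint(endpoint: string): boolean {
  return allowedEmulatorEndpoints.some((allowed) => endpoint === allowed || endpoint.startsWith(allowed + "/"));
}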