diff --git a/local_dependencies/@azure/cosmos/dist-esm/src/queryExecutionContext/parallelQueryExecutionContextBase.js b/local_dependencies/@azure/cosmos/dist-esm/src/queryExecutionContext/parallelQueryExecutionContextBase.js index 7d823212a..406287266 100644 --- a/local_dependencies/@azure/cosmos/dist-esm/src/queryExecutionContext/parallelQueryExecutionContextBase.js +++ b/local_dependencies/@azure/cosmos/dist-esm/src/queryExecutionContext/parallelQueryExecutionContextBase.js @@ -7,7 +7,7 @@ import { StatusCodes, SubStatusCodes } from "../common"; import { MetadataLookUpType } from "../CosmosDiagnostics"; import { CosmosDbDiagnosticLevel } from "../diagnostics/CosmosDbDiagnosticLevel"; import { DiagnosticNodeInternal, DiagnosticNodeType } from "../diagnostics/DiagnosticNodeInternal"; -import { RUCapPerOperationExceededErrorCode, } from "../request/RUCapPerOperationExceededError"; +import { RUCapPerOperationExceededErrorCode } from "../request/RUCapPerOperationExceededError"; import { QueryRange } from "../routing/QueryRange"; import { SmartRoutingMapProvider } from "../routing/smartRoutingMapProvider"; import { addDignosticChild } from "../utils/diagnostics"; @@ -18,461 +18,474 @@ const logger = createClientLogger("parallelQueryExecutionContextBase"); /** @hidden */ export var ParallelQueryExecutionContextBaseStates; (function (ParallelQueryExecutionContextBaseStates) { - ParallelQueryExecutionContextBaseStates["started"] = "started"; - ParallelQueryExecutionContextBaseStates["inProgress"] = "inProgress"; - ParallelQueryExecutionContextBaseStates["ended"] = "ended"; + ParallelQueryExecutionContextBaseStates["started"] = "started"; + ParallelQueryExecutionContextBaseStates["inProgress"] = "inProgress"; + ParallelQueryExecutionContextBaseStates["ended"] = "ended"; })(ParallelQueryExecutionContextBaseStates || (ParallelQueryExecutionContextBaseStates = {})); /** @hidden */ export class ParallelQueryExecutionContextBase { - /** - * Provides the ParallelQueryExecutionContextBase. - * This is the base class that ParallelQueryExecutionContext and OrderByQueryExecutionContext will derive from. - * - * When handling a parallelized query, it instantiates one instance of - * DocumentProcuder per target partition key range and aggregates the result of each. - * - * @param clientContext - The service endpoint to use to create the client. - * @param collectionLink - The Collection Link - * @param options - Represents the feed options. - * @param partitionedQueryExecutionInfo - PartitionedQueryExecutionInfo - * @hidden - */ - constructor(clientContext, collectionLink, query, options, partitionedQueryExecutionInfo) { - this.clientContext = clientContext; - this.collectionLink = collectionLink; - this.query = query; - this.options = options; - this.partitionedQueryExecutionInfo = partitionedQueryExecutionInfo; - this.initializedPriorityQueue = false; - this.ruCapExceededError = undefined; - this.clientContext = clientContext; - this.collectionLink = collectionLink; - this.query = query; - this.options = options; - this.partitionedQueryExecutionInfo = partitionedQueryExecutionInfo; - this.diagnosticNodeWrapper = { - consumed: false, - diagnosticNode: new DiagnosticNodeInternal(clientContext.diagnosticLevel, DiagnosticNodeType.PARALLEL_QUERY_NODE, null), - }; - this.diagnosticNodeWrapper.diagnosticNode.addData({ stateful: true }); - this.err = undefined; - this.state = ParallelQueryExecutionContextBase.STATES.started; - this.routingProvider = new SmartRoutingMapProvider(this.clientContext); - this.sortOrders = this.partitionedQueryExecutionInfo.queryInfo.orderBy; - this.requestContinuation = options ? options.continuationToken || options.continuation : null; - // response headers of undergoing operation - this.respHeaders = getInitialHeader(); - // Make priority queue for documentProducers - // The comparator is supplied by the derived class - this.orderByPQ = new PriorityQueue((a, b) => this.documentProducerComparator(b, a)); - this.nextItemfetchSemaphore = semaphore(1); - } - _mergeWithActiveResponseHeaders(headers) { - mergeHeaders(this.respHeaders, headers); - } - _getAndResetActiveResponseHeaders() { - const ret = this.respHeaders; - this.respHeaders = getInitialHeader(); - return ret; - } - getDiagnosticNode() { - return this.diagnosticNodeWrapper.diagnosticNode; - } - async _onTargetPartitionRanges() { - // invokes the callback when the target partition ranges are ready - const parsedRanges = this.partitionedQueryExecutionInfo.queryRanges; - const queryRanges = parsedRanges.map((item) => QueryRange.parseFromDict(item)); - return this.routingProvider.getOverlappingRanges(this.collectionLink, queryRanges, this.getDiagnosticNode()); - } - /** - * Gets the replacement ranges for a partitionkeyrange that has been split - */ - async _getReplacementPartitionKeyRanges(documentProducer) { - const partitionKeyRange = documentProducer.targetPartitionKeyRange; - // Download the new routing map - this.routingProvider = new SmartRoutingMapProvider(this.clientContext); - // Get the queryRange that relates to this partitionKeyRange - const queryRange = QueryRange.parsePartitionKeyRange(partitionKeyRange); - return this.routingProvider.getOverlappingRanges(this.collectionLink, [queryRange], this.getDiagnosticNode()); - } - // TODO: P0 Code smell - can barely tell what this is doing - /** - * Removes the current document producer from the priqueue, - * replaces that document producer with child document producers, - * then reexecutes the originFunction with the corrrected executionContext - */ - async _repairExecutionContext(diagnosticNode, originFunction) { - // TODO: any - // Get the replacement ranges - // Removing the invalid documentProducer from the orderByPQ - const parentDocumentProducer = this.orderByPQ.deq(); + /** + * Provides the ParallelQueryExecutionContextBase. + * This is the base class that ParallelQueryExecutionContext and OrderByQueryExecutionContext will derive from. + * + * When handling a parallelized query, it instantiates one instance of + * DocumentProcuder per target partition key range and aggregates the result of each. + * + * @param clientContext - The service endpoint to use to create the client. + * @param collectionLink - The Collection Link + * @param options - Represents the feed options. + * @param partitionedQueryExecutionInfo - PartitionedQueryExecutionInfo + * @hidden + */ + constructor(clientContext, collectionLink, query, options, partitionedQueryExecutionInfo) { + this.clientContext = clientContext; + this.collectionLink = collectionLink; + this.query = query; + this.options = options; + this.partitionedQueryExecutionInfo = partitionedQueryExecutionInfo; + this.initializedPriorityQueue = false; + this.ruCapExceededError = undefined; + this.clientContext = clientContext; + this.collectionLink = collectionLink; + this.query = query; + this.options = options; + this.partitionedQueryExecutionInfo = partitionedQueryExecutionInfo; + this.diagnosticNodeWrapper = { + consumed: false, + diagnosticNode: new DiagnosticNodeInternal( + clientContext.diagnosticLevel, + DiagnosticNodeType.PARALLEL_QUERY_NODE, + null, + ), + }; + this.diagnosticNodeWrapper.diagnosticNode.addData({ stateful: true }); + this.err = undefined; + this.state = ParallelQueryExecutionContextBase.STATES.started; + this.routingProvider = new SmartRoutingMapProvider(this.clientContext); + this.sortOrders = this.partitionedQueryExecutionInfo.queryInfo.orderBy; + this.requestContinuation = options ? options.continuationToken || options.continuation : null; + // response headers of undergoing operation + this.respHeaders = getInitialHeader(); + // Make priority queue for documentProducers + // The comparator is supplied by the derived class + this.orderByPQ = new PriorityQueue((a, b) => this.documentProducerComparator(b, a)); + this.nextItemfetchSemaphore = semaphore(1); + } + _mergeWithActiveResponseHeaders(headers) { + mergeHeaders(this.respHeaders, headers); + } + _getAndResetActiveResponseHeaders() { + const ret = this.respHeaders; + this.respHeaders = getInitialHeader(); + return ret; + } + getDiagnosticNode() { + return this.diagnosticNodeWrapper.diagnosticNode; + } + async _onTargetPartitionRanges() { + // invokes the callback when the target partition ranges are ready + const parsedRanges = this.partitionedQueryExecutionInfo.queryRanges; + const queryRanges = parsedRanges.map((item) => QueryRange.parseFromDict(item)); + return this.routingProvider.getOverlappingRanges(this.collectionLink, queryRanges, this.getDiagnosticNode()); + } + /** + * Gets the replacement ranges for a partitionkeyrange that has been split + */ + async _getReplacementPartitionKeyRanges(documentProducer) { + const partitionKeyRange = documentProducer.targetPartitionKeyRange; + // Download the new routing map + this.routingProvider = new SmartRoutingMapProvider(this.clientContext); + // Get the queryRange that relates to this partitionKeyRange + const queryRange = QueryRange.parsePartitionKeyRange(partitionKeyRange); + return this.routingProvider.getOverlappingRanges(this.collectionLink, [queryRange], this.getDiagnosticNode()); + } + // TODO: P0 Code smell - can barely tell what this is doing + /** + * Removes the current document producer from the priqueue, + * replaces that document producer with child document producers, + * then reexecutes the originFunction with the corrrected executionContext + */ + async _repairExecutionContext(diagnosticNode, originFunction) { + // TODO: any + // Get the replacement ranges + // Removing the invalid documentProducer from the orderByPQ + const parentDocumentProducer = this.orderByPQ.deq(); + try { + const replacementPartitionKeyRanges = await this._getReplacementPartitionKeyRanges(parentDocumentProducer); + const replacementDocumentProducers = []; + // Create the replacement documentProducers + replacementPartitionKeyRanges.forEach((partitionKeyRange) => { + // Create replacment document producers with the parent's continuationToken + const replacementDocumentProducer = this._createTargetPartitionQueryExecutionContext( + partitionKeyRange, + parentDocumentProducer.continuationToken, + ); + replacementDocumentProducers.push(replacementDocumentProducer); + }); + // We need to check if the documentProducers even has anything left to fetch from before enqueing them + const checkAndEnqueueDocumentProducer = async (documentProducerToCheck, checkNextDocumentProducerCallback) => { try { - const replacementPartitionKeyRanges = await this._getReplacementPartitionKeyRanges(parentDocumentProducer); - const replacementDocumentProducers = []; - // Create the replacement documentProducers - replacementPartitionKeyRanges.forEach((partitionKeyRange) => { - // Create replacment document producers with the parent's continuationToken - const replacementDocumentProducer = this._createTargetPartitionQueryExecutionContext(partitionKeyRange, parentDocumentProducer.continuationToken); - replacementDocumentProducers.push(replacementDocumentProducer); - }); - // We need to check if the documentProducers even has anything left to fetch from before enqueing them - const checkAndEnqueueDocumentProducer = async (documentProducerToCheck, checkNextDocumentProducerCallback) => { - try { - const { result: afterItem } = await documentProducerToCheck.current(diagnosticNode); - if (afterItem === undefined) { - // no more results left in this document producer, so we don't enqueue it - } - else { - // Safe to put document producer back in the queue - this.orderByPQ.enq(documentProducerToCheck); - } - await checkNextDocumentProducerCallback(); - } - catch (err) { - this.err = err; - return; - } - }; - const checkAndEnqueueDocumentProducers = async (rdp) => { - if (rdp.length > 0) { - // We still have a replacementDocumentProducer to check - const replacementDocumentProducer = rdp.shift(); - await checkAndEnqueueDocumentProducer(replacementDocumentProducer, async () => { - await checkAndEnqueueDocumentProducers(rdp); - }); - } - else { - // reexecutes the originFunction with the corrrected executionContext - return originFunction(); - } - }; - // Invoke the recursive function to get the ball rolling - await checkAndEnqueueDocumentProducers(replacementDocumentProducers); + const { result: afterItem } = await documentProducerToCheck.current(diagnosticNode); + if (afterItem === undefined) { + // no more results left in this document producer, so we don't enqueue it + } else { + // Safe to put document producer back in the queue + this.orderByPQ.enq(documentProducerToCheck); + } + await checkNextDocumentProducerCallback(); + } catch (err) { + this.err = err; + return; } - catch (err) { + }; + const checkAndEnqueueDocumentProducers = async (rdp) => { + if (rdp.length > 0) { + // We still have a replacementDocumentProducer to check + const replacementDocumentProducer = rdp.shift(); + await checkAndEnqueueDocumentProducer(replacementDocumentProducer, async () => { + await checkAndEnqueueDocumentProducers(rdp); + }); + } else { + // reexecutes the originFunction with the corrrected executionContext + return originFunction(); + } + }; + // Invoke the recursive function to get the ball rolling + await checkAndEnqueueDocumentProducers(replacementDocumentProducers); + } catch (err) { + this.err = err; + throw err; + } + } + static _needPartitionKeyRangeCacheRefresh(error) { + // TODO: any error + return ( + error.code === StatusCodes.Gone && + "substatus" in error && + error["substatus"] === SubStatusCodes.PartitionKeyRangeGone + ); + } + /** + * Checks to see if the executionContext needs to be repaired. + * if so it repairs the execution context and executes the ifCallback, + * else it continues with the current execution context and executes the elseCallback + */ + async _repairExecutionContextIfNeeded(diagnosticNode, ifCallback, elseCallback, operationOptions, ruConsumedManager) { + const documentProducer = this.orderByPQ.peek(); + // Check if split happened + try { + await documentProducer.current(diagnosticNode, operationOptions, ruConsumedManager); + elseCallback(); + } catch (err) { + if (ParallelQueryExecutionContextBase._needPartitionKeyRangeCacheRefresh(err)) { + // Split has happened so we need to repair execution context before continueing + return addDignosticChild( + (childNode) => this._repairExecutionContext(childNode, ifCallback), + diagnosticNode, + DiagnosticNodeType.QUERY_REPAIR_NODE, + ); + } else { + // Something actually bad happened ... + this.err = err; + throw err; + } + } + } + /** + * Fetches the next element in the ParallelQueryExecutionContextBase. + */ + async nextItem(diagnosticNode, operationOptions, ruConsumedManager) { + if (this.err) { + // if there is a prior error return error + throw this.err; + } + return new Promise((resolve, reject) => { + this.nextItemfetchSemaphore.take(async () => { + // document producer queue initilization + if (!this.initializedPriorityQueue) { + try { + await this._createDocumentProducersAndFillUpPriorityQueue(operationOptions, ruConsumedManager); + this.initializedPriorityQueue = true; + } catch (err) { this.err = err; - throw err; + // release the lock before invoking callback + this.nextItemfetchSemaphore.leave(); + reject(this.err); + return; + } } - } - static _needPartitionKeyRangeCacheRefresh(error) { - // TODO: any error - return (error.code === StatusCodes.Gone && - "substatus" in error && - error["substatus"] === SubStatusCodes.PartitionKeyRangeGone); - } - /** - * Checks to see if the executionContext needs to be repaired. - * if so it repairs the execution context and executes the ifCallback, - * else it continues with the current execution context and executes the elseCallback - */ - async _repairExecutionContextIfNeeded(diagnosticNode, ifCallback, elseCallback, operationOptions, ruConsumedManager) { - const documentProducer = this.orderByPQ.peek(); - // Check if split happened - try { - await documentProducer.current(diagnosticNode, operationOptions, ruConsumedManager); - elseCallback(); + if (!this.diagnosticNodeWrapper.consumed) { + diagnosticNode.addChildNode( + this.diagnosticNodeWrapper.diagnosticNode, + CosmosDbDiagnosticLevel.debug, + MetadataLookUpType.QueryPlanLookUp, + ); + this.diagnosticNodeWrapper.diagnosticNode = undefined; + this.diagnosticNodeWrapper.consumed = true; + } else { + this.diagnosticNodeWrapper.diagnosticNode = diagnosticNode; } - catch (err) { - if (ParallelQueryExecutionContextBase._needPartitionKeyRangeCacheRefresh(err)) { - // Split has happened so we need to repair execution context before continueing - return addDignosticChild((childNode) => this._repairExecutionContext(childNode, ifCallback), diagnosticNode, DiagnosticNodeType.QUERY_REPAIR_NODE); - } - else { - // Something actually bad happened ... - this.err = err; - throw err; - } - } - } - /** - * Fetches the next element in the ParallelQueryExecutionContextBase. - */ - async nextItem(diagnosticNode, operationOptions, ruConsumedManager) { + // NOTE: lock must be released before invoking quitting if (this.err) { - // if there is a prior error return error - throw this.err; + // release the lock before invoking callback + this.nextItemfetchSemaphore.leave(); + this.err.headers = this._getAndResetActiveResponseHeaders(); + reject(this.err); + return; } - return new Promise((resolve, reject) => { - this.nextItemfetchSemaphore.take(async () => { - // document producer queue initilization - if (!this.initializedPriorityQueue) { - try { - await this._createDocumentProducersAndFillUpPriorityQueue(operationOptions, ruConsumedManager); - this.initializedPriorityQueue = true; - } - catch (err) { - this.err = err; - // release the lock before invoking callback - this.nextItemfetchSemaphore.leave(); - reject(this.err); - return; - } - } - if (!this.diagnosticNodeWrapper.consumed) { - diagnosticNode.addChildNode(this.diagnosticNodeWrapper.diagnosticNode, CosmosDbDiagnosticLevel.debug, MetadataLookUpType.QueryPlanLookUp); - this.diagnosticNodeWrapper.diagnosticNode = undefined; - this.diagnosticNodeWrapper.consumed = true; - } - else { - this.diagnosticNodeWrapper.diagnosticNode = diagnosticNode; - } - // NOTE: lock must be released before invoking quitting - if (this.err) { - // release the lock before invoking callback - this.nextItemfetchSemaphore.leave(); - this.err.headers = this._getAndResetActiveResponseHeaders(); - reject(this.err); - return; - } - if (this.orderByPQ.size() === 0) { - // there is no more results - this.state = ParallelQueryExecutionContextBase.STATES.ended; - // release the lock before invoking callback - this.nextItemfetchSemaphore.leave(); - return resolve({ - result: undefined, - headers: this._getAndResetActiveResponseHeaders(), - }); - } - const ifCallback = () => { - // Release the semaphore to avoid deadlock - this.nextItemfetchSemaphore.leave(); - // Reexcute the function - return resolve(this.nextItem(diagnosticNode, operationOptions, ruConsumedManager)); - }; - const elseCallback = async () => { - let documentProducer; - try { - documentProducer = this.orderByPQ.deq(); - } - catch (e) { - // if comparing elements of the priority queue throws exception - // set that error and return error - this.err = e; - // release the lock before invoking callback - this.nextItemfetchSemaphore.leave(); - this.err.headers = this._getAndResetActiveResponseHeaders(); - reject(this.err); - return; - } - let item; - let headers; - try { - const response = await documentProducer.nextItem(diagnosticNode, operationOptions, ruConsumedManager); - item = response.result; - headers = response.headers; - this._mergeWithActiveResponseHeaders(headers); - if (item === undefined) { - // this should never happen - // because the documentProducer already has buffered an item - // assert item !== undefined - this.err = new Error(`Extracted DocumentProducer from the priority queue \ - doesn't have any buffered item!`); - // release the lock before invoking callback - this.nextItemfetchSemaphore.leave(); - return resolve({ - result: undefined, - headers: this._getAndResetActiveResponseHeaders(), - }); - } - } - catch (err) { - if (err.code === RUCapPerOperationExceededErrorCode) { - this._updateErrorObjectWithBufferedData(err); - this.err = err; - } - else { - this.err = new Error(`Extracted DocumentProducer from the priority queue fails to get the \ - buffered item. Due to ${JSON.stringify(err)}`); - this.err.headers = this._getAndResetActiveResponseHeaders(); - } - // release the lock before invoking callback - this.nextItemfetchSemaphore.leave(); - reject(this.err); - return; - } - // we need to put back the document producer to the queue if it has more elements. - // the lock will be released after we know document producer must be put back in the queue or not - try { - const { result: afterItem, headers: otherHeaders } = await documentProducer.current(diagnosticNode, operationOptions, ruConsumedManager); - this._mergeWithActiveResponseHeaders(otherHeaders); - if (afterItem === undefined) { - // no more results is left in this document producer - } - else { - try { - const headItem = documentProducer.fetchResults[0]; - if (typeof headItem === "undefined") { - throw new Error("Extracted DocumentProducer from PQ is invalid state with no result!"); - } - this.orderByPQ.enq(documentProducer); - } - catch (e) { - // if comparing elements in priority queue throws exception - // set error - this.err = e; - } - } - } - catch (err) { - if (ParallelQueryExecutionContextBase._needPartitionKeyRangeCacheRefresh(err)) { - // We want the document producer enqueued - // So that later parts of the code can repair the execution context - this.orderByPQ.enq(documentProducer); - } - else if (err.code === RUCapPerOperationExceededErrorCode) { - this._updateErrorObjectWithBufferedData(err); - this.err = err; - reject(this.err); - } - else { - // Something actually bad happened - this.err = err; - reject(this.err); - } - } - finally { - // release the lock before returning - this.nextItemfetchSemaphore.leave(); - } - // invoke the callback on the item - return resolve({ - result: item, - headers: this._getAndResetActiveResponseHeaders(), - }); - }; - this._repairExecutionContextIfNeeded(diagnosticNode, ifCallback, elseCallback).catch(reject); - }); - }); - } - _updateErrorObjectWithBufferedData(err) { - this.orderByPQ.forEach((dp) => { - const bufferedItems = dp.peekBufferedItems(); - err.fetchedResults.push(...bufferedItems); - }); - } - /** - * Determine if there are still remaining resources to processs based on the value of the continuation - * token or the elements remaining on the current batch in the QueryIterator. - * @returns true if there is other elements to process in the ParallelQueryExecutionContextBase. - */ - hasMoreResults() { - return !(this.state === ParallelQueryExecutionContextBase.STATES.ended || this.err !== undefined); - } - /** - * Creates document producers - */ - _createTargetPartitionQueryExecutionContext(partitionKeyTargetRange, continuationToken) { - // TODO: any - // creates target partition range Query Execution Context - let rewrittenQuery = this.partitionedQueryExecutionInfo.queryInfo.rewrittenQuery; - let sqlQuerySpec; - const query = this.query; - if (typeof query === "string") { - sqlQuerySpec = { query }; + if (this.orderByPQ.size() === 0) { + // there is no more results + this.state = ParallelQueryExecutionContextBase.STATES.ended; + // release the lock before invoking callback + this.nextItemfetchSemaphore.leave(); + return resolve({ + result: undefined, + headers: this._getAndResetActiveResponseHeaders(), + }); } - else { - sqlQuerySpec = query; - } - const formatPlaceHolder = "{documentdb-formattableorderbyquery-filter}"; - if (rewrittenQuery) { - sqlQuerySpec = JSON.parse(JSON.stringify(sqlQuerySpec)); - // We hardcode the formattable filter to true for now - rewrittenQuery = rewrittenQuery.replace(formatPlaceHolder, "true"); - sqlQuerySpec["query"] = rewrittenQuery; - } - const options = Object.assign({}, this.options); - options.continuationToken = continuationToken; - return new DocumentProducer(this.clientContext, this.collectionLink, sqlQuerySpec, partitionKeyTargetRange, options); - } - async _createDocumentProducersAndFillUpPriorityQueue(operationOptions, ruConsumedManager) { - try { - const targetPartitionRanges = await this._onTargetPartitionRanges(); - const maxDegreeOfParallelism = this.options.maxDegreeOfParallelism === undefined || this.options.maxDegreeOfParallelism < 1 - ? targetPartitionRanges.length - : Math.min(this.options.maxDegreeOfParallelism, targetPartitionRanges.length); - logger.info("Query starting against " + - targetPartitionRanges.length + - " ranges with parallelism of " + - maxDegreeOfParallelism); - let filteredPartitionKeyRanges = []; - // The document producers generated from filteredPartitionKeyRanges - const targetPartitionQueryExecutionContextList = []; - if (this.requestContinuation) { - throw new Error("Continuation tokens are not yet supported for cross partition queries"); - } - else { - filteredPartitionKeyRanges = targetPartitionRanges; - } - // Create one documentProducer for each partitionTargetRange - filteredPartitionKeyRanges.forEach((partitionTargetRange) => { - // TODO: any partitionTargetRange - // no async callback - targetPartitionQueryExecutionContextList.push(this._createTargetPartitionQueryExecutionContext(partitionTargetRange)); - }); - // Fill up our priority queue with documentProducers - let inProgressPromises = []; - for (const documentProducer of targetPartitionQueryExecutionContextList) { - // Don't enqueue any new promise if RU cap exceeded - if (this.ruCapExceededError) { - break; - } - const promise = this._processAndEnqueueDocumentProducer(documentProducer, operationOptions, ruConsumedManager); - inProgressPromises.push(promise); - // Limit concurrent executions - if (inProgressPromises.length === maxDegreeOfParallelism) { - await Promise.all(inProgressPromises); - inProgressPromises = []; - } - } - // Wait for all promises to complete - await Promise.all(inProgressPromises); - if (this.err) { - if (this.ruCapExceededError) { - // merge the buffered items - this.orderByPQ.forEach((dp) => { - const bufferedItems = dp.peekBufferedItems(); - this.ruCapExceededError.fetchedResults.push(...bufferedItems); - }); - throw this.ruCapExceededError; - } - throw this.err; - } - } - catch (err) { - this.err = err; - throw err; - } - } - async _processAndEnqueueDocumentProducer(documentProducer, operationOptions, ruConsumedManager) { - try { - const { result: document, headers } = await documentProducer.current(this.getDiagnosticNode(), operationOptions, ruConsumedManager); + const ifCallback = () => { + // Release the semaphore to avoid deadlock + this.nextItemfetchSemaphore.leave(); + // Reexcute the function + return resolve(this.nextItem(diagnosticNode, operationOptions, ruConsumedManager)); + }; + const elseCallback = async () => { + let documentProducer; + try { + documentProducer = this.orderByPQ.deq(); + } catch (e) { + // if comparing elements of the priority queue throws exception + // set that error and return error + this.err = e; + // release the lock before invoking callback + this.nextItemfetchSemaphore.leave(); + this.err.headers = this._getAndResetActiveResponseHeaders(); + reject(this.err); + return; + } + let item; + let headers; + try { + const response = await documentProducer.nextItem(diagnosticNode, operationOptions, ruConsumedManager); + item = response.result; + headers = response.headers; this._mergeWithActiveResponseHeaders(headers); - if (document !== undefined) { - this.orderByPQ.enq(documentProducer); + if (item === undefined) { + // this should never happen + // because the documentProducer already has buffered an item + // assert item !== undefined + this.err = new Error(`Extracted DocumentProducer from the priority queue \ + doesn't have any buffered item!`); + // release the lock before invoking callback + this.nextItemfetchSemaphore.leave(); + return resolve({ + result: undefined, + headers: this._getAndResetActiveResponseHeaders(), + }); } - } - catch (err) { - this._mergeWithActiveResponseHeaders(err.headers); - this.err = err; + } catch (err) { if (err.code === RUCapPerOperationExceededErrorCode) { - // would be halting further execution of other promises - if (!this.ruCapExceededError) { - this.ruCapExceededError = err; - } - else { - // merge the buffered items - if (err.fetchedResults) { - this.ruCapExceededError.fetchedResults.push(...err.fetchedResults); - } - } + this._updateErrorObjectWithBufferedData(err); + this.err = err; + } else { + this.err = new Error(`Extracted DocumentProducer from the priority queue fails to get the \ + buffered item. Due to ${JSON.stringify(err)}`); + this.err.headers = this._getAndResetActiveResponseHeaders(); } - else { - throw err; + // release the lock before invoking callback + this.nextItemfetchSemaphore.leave(); + reject(this.err); + return; + } + // we need to put back the document producer to the queue if it has more elements. + // the lock will be released after we know document producer must be put back in the queue or not + try { + const { result: afterItem, headers: otherHeaders } = await documentProducer.current( + diagnosticNode, + operationOptions, + ruConsumedManager, + ); + this._mergeWithActiveResponseHeaders(otherHeaders); + if (afterItem === undefined) { + // no more results is left in this document producer + } else { + try { + const headItem = documentProducer.fetchResults[0]; + if (typeof headItem === "undefined") { + throw new Error("Extracted DocumentProducer from PQ is invalid state with no result!"); + } + this.orderByPQ.enq(documentProducer); + } catch (e) { + // if comparing elements in priority queue throws exception + // set error + this.err = e; + } } - } - return; + } catch (err) { + if (ParallelQueryExecutionContextBase._needPartitionKeyRangeCacheRefresh(err)) { + // We want the document producer enqueued + // So that later parts of the code can repair the execution context + this.orderByPQ.enq(documentProducer); + } else if (err.code === RUCapPerOperationExceededErrorCode) { + this._updateErrorObjectWithBufferedData(err); + this.err = err; + reject(this.err); + } else { + // Something actually bad happened + this.err = err; + reject(this.err); + } + } finally { + // release the lock before returning + this.nextItemfetchSemaphore.leave(); + } + // invoke the callback on the item + return resolve({ + result: item, + headers: this._getAndResetActiveResponseHeaders(), + }); + }; + this._repairExecutionContextIfNeeded(diagnosticNode, ifCallback, elseCallback).catch(reject); + }); + }); + } + _updateErrorObjectWithBufferedData(err) { + this.orderByPQ.forEach((dp) => { + const bufferedItems = dp.peekBufferedItems(); + err.fetchedResults.push(...bufferedItems); + }); + } + /** + * Determine if there are still remaining resources to processs based on the value of the continuation + * token or the elements remaining on the current batch in the QueryIterator. + * @returns true if there is other elements to process in the ParallelQueryExecutionContextBase. + */ + hasMoreResults() { + return !(this.state === ParallelQueryExecutionContextBase.STATES.ended || this.err !== undefined); + } + /** + * Creates document producers + */ + _createTargetPartitionQueryExecutionContext(partitionKeyTargetRange, continuationToken) { + // TODO: any + // creates target partition range Query Execution Context + let rewrittenQuery = this.partitionedQueryExecutionInfo.queryInfo.rewrittenQuery; + let sqlQuerySpec; + const query = this.query; + if (typeof query === "string") { + sqlQuerySpec = { query }; + } else { + sqlQuerySpec = query; } + const formatPlaceHolder = "{documentdb-formattableorderbyquery-filter}"; + if (rewrittenQuery) { + sqlQuerySpec = JSON.parse(JSON.stringify(sqlQuerySpec)); + // We hardcode the formattable filter to true for now + rewrittenQuery = rewrittenQuery.replace(formatPlaceHolder, "true"); + sqlQuerySpec["query"] = rewrittenQuery; + } + const options = Object.assign({}, this.options); + options.continuationToken = continuationToken; + return new DocumentProducer( + this.clientContext, + this.collectionLink, + sqlQuerySpec, + partitionKeyTargetRange, + options, + ); + } + async _createDocumentProducersAndFillUpPriorityQueue(operationOptions, ruConsumedManager) { + try { + const targetPartitionRanges = await this._onTargetPartitionRanges(); + const maxDegreeOfParallelism = + this.options.maxDegreeOfParallelism === undefined || this.options.maxDegreeOfParallelism < 1 + ? targetPartitionRanges.length + : Math.min(this.options.maxDegreeOfParallelism, targetPartitionRanges.length); + logger.info( + "Query starting against " + + targetPartitionRanges.length + + " ranges with parallelism of " + + maxDegreeOfParallelism, + ); + let filteredPartitionKeyRanges = []; + // The document producers generated from filteredPartitionKeyRanges + const targetPartitionQueryExecutionContextList = []; + if (this.requestContinuation) { + throw new Error("Continuation tokens are not yet supported for cross partition queries"); + } else { + filteredPartitionKeyRanges = targetPartitionRanges; + } + // Create one documentProducer for each partitionTargetRange + filteredPartitionKeyRanges.forEach((partitionTargetRange) => { + // TODO: any partitionTargetRange + // no async callback + targetPartitionQueryExecutionContextList.push( + this._createTargetPartitionQueryExecutionContext(partitionTargetRange), + ); + }); + // Fill up our priority queue with documentProducers + let inProgressPromises = []; + for (const documentProducer of targetPartitionQueryExecutionContextList) { + // Don't enqueue any new promise if RU cap exceeded + if (this.ruCapExceededError) { + break; + } + const promise = this._processAndEnqueueDocumentProducer(documentProducer, operationOptions, ruConsumedManager); + inProgressPromises.push(promise); + // Limit concurrent executions + if (inProgressPromises.length === maxDegreeOfParallelism) { + await Promise.all(inProgressPromises); + inProgressPromises = []; + } + } + // Wait for all promises to complete + await Promise.all(inProgressPromises); + if (this.err) { + if (this.ruCapExceededError) { + // merge the buffered items + this.orderByPQ.forEach((dp) => { + const bufferedItems = dp.peekBufferedItems(); + this.ruCapExceededError.fetchedResults.push(...bufferedItems); + }); + throw this.ruCapExceededError; + } + throw this.err; + } + } catch (err) { + this.err = err; + throw err; + } + } + async _processAndEnqueueDocumentProducer(documentProducer, operationOptions, ruConsumedManager) { + try { + const { result: document, headers } = await documentProducer.current( + this.getDiagnosticNode(), + operationOptions, + ruConsumedManager, + ); + this._mergeWithActiveResponseHeaders(headers); + if (document !== undefined) { + this.orderByPQ.enq(documentProducer); + } + } catch (err) { + this._mergeWithActiveResponseHeaders(err.headers); + this.err = err; + if (err.code === RUCapPerOperationExceededErrorCode) { + // would be halting further execution of other promises + if (!this.ruCapExceededError) { + this.ruCapExceededError = err; + } else { + // merge the buffered items + if (err.fetchedResults) { + this.ruCapExceededError.fetchedResults.push(...err.fetchedResults); + } + } + } else { + throw err; + } + } + return; + } } ParallelQueryExecutionContextBase.STATES = ParallelQueryExecutionContextBaseStates; -//# sourceMappingURL=parallelQueryExecutionContextBase.js.map \ No newline at end of file +//# sourceMappingURL=parallelQueryExecutionContextBase.js.map diff --git a/local_dependencies/@azure/cosmos/dist-esm/src/queryIterator.js b/local_dependencies/@azure/cosmos/dist-esm/src/queryIterator.js index efb2e0f1b..0f922feed 100644 --- a/local_dependencies/@azure/cosmos/dist-esm/src/queryIterator.js +++ b/local_dependencies/@azure/cosmos/dist-esm/src/queryIterator.js @@ -4,276 +4,313 @@ import { __asyncGenerator, __await } from "tslib"; import { getPathFromLink, ResourceType, RUConsumedManager, StatusCodes } from "./common"; import { MetadataLookUpType } from "./CosmosDiagnostics"; import { DiagnosticNodeInternal, DiagnosticNodeType } from "./diagnostics/DiagnosticNodeInternal"; -import { DefaultQueryExecutionContext, getInitialHeader, mergeHeaders, PipelinedQueryExecutionContext, } from "./queryExecutionContext"; +import { + DefaultQueryExecutionContext, + getInitialHeader, + mergeHeaders, + PipelinedQueryExecutionContext, +} from "./queryExecutionContext"; import { FeedResponse } from "./request/FeedResponse"; import { RUCapPerOperationExceededErrorCode } from "./request/RUCapPerOperationExceededError"; -import { getEmptyCosmosDiagnostics, withDiagnostics, withMetadataDiagnostics, } from "./utils/diagnostics"; +import { getEmptyCosmosDiagnostics, withDiagnostics, withMetadataDiagnostics } from "./utils/diagnostics"; /** * Represents a QueryIterator Object, an implementation of feed or query response that enables * traversal and iterating over the response * in the Azure Cosmos DB database service. */ export class QueryIterator { - /** - * @hidden - */ - constructor(clientContext, query, options, fetchFunctions, resourceLink, resourceType) { - this.clientContext = clientContext; - this.query = query; - this.options = options; - this.fetchFunctions = fetchFunctions; - this.resourceLink = resourceLink; - this.resourceType = resourceType; - this.nonStreamingOrderBy = false; - this.query = query; - this.fetchFunctions = fetchFunctions; - this.options = options || {}; - this.resourceLink = resourceLink; - this.fetchAllLastResHeaders = getInitialHeader(); - this.reset(); - this.isInitialized = false; - } - /** - * Gets an async iterator that will yield results until completion. - * - * NOTE: AsyncIterators are a very new feature and you might need to - * use polyfils/etc. in order to use them in your code. - * - * If you're using TypeScript, you can use the following polyfill as long - * as you target ES6 or higher and are running on Node 6 or higher. - * - * ```typescript - * if (!Symbol || !Symbol.asyncIterator) { - * (Symbol as any).asyncIterator = Symbol.for("Symbol.asyncIterator"); - * } - * ``` - * - * @example Iterate over all databases - * ```typescript - * for await(const { resources: db } of client.databases.readAll().getAsyncIterator()) { - * console.log(`Got ${db} from AsyncIterator`); - * } - * ``` - */ - getAsyncIterator(options) { - return __asyncGenerator(this, arguments, function* getAsyncIterator_1() { - this.reset(); - let diagnosticNode = new DiagnosticNodeInternal(this.clientContext.diagnosticLevel, DiagnosticNodeType.CLIENT_REQUEST_NODE, null); - this.queryPlanPromise = this.fetchQueryPlan(diagnosticNode); - let ruConsumedManager; - if (options && options.ruCapPerOperation) { - ruConsumedManager = new RUConsumedManager(); - } - while (this.queryExecutionContext.hasMoreResults()) { - let response; - try { - response = yield __await(this.queryExecutionContext.fetchMore(diagnosticNode, options, ruConsumedManager)); - } - catch (error) { - if (this.needsQueryPlan(error)) { - yield __await(this.createPipelinedExecutionContext()); - try { - response = yield __await(this.queryExecutionContext.fetchMore(diagnosticNode, options, ruConsumedManager)); - } - catch (queryError) { - this.handleSplitError(queryError); - } - } - else { - throw error; - } - } - const feedResponse = new FeedResponse(response.result, response.headers, this.queryExecutionContext.hasMoreResults(), diagnosticNode.toDiagnostic(this.clientContext.getClientConfig())); - diagnosticNode = new DiagnosticNodeInternal(this.clientContext.diagnosticLevel, DiagnosticNodeType.CLIENT_REQUEST_NODE, null); - if (response.result !== undefined) { - yield yield __await(feedResponse); - } - } - }); - } - /** - * Determine if there are still remaining resources to process based on the value of the continuation token or the - * elements remaining on the current batch in the QueryIterator. - * @returns true if there is other elements to process in the QueryIterator. - */ - hasMoreResults() { - return this.queryExecutionContext.hasMoreResults(); - } - /** - * Fetch all pages for the query and return a single FeedResponse. - */ - async fetchAll(options) { - return withDiagnostics(async (diagnosticNode) => { - return this.fetchAllInternal(diagnosticNode, options); - }, this.clientContext); - } - /** - * @hidden - */ - async fetchAllInternal(diagnosticNode, options) { - this.reset(); + /** + * @hidden + */ + constructor(clientContext, query, options, fetchFunctions, resourceLink, resourceType) { + this.clientContext = clientContext; + this.query = query; + this.options = options; + this.fetchFunctions = fetchFunctions; + this.resourceLink = resourceLink; + this.resourceType = resourceType; + this.nonStreamingOrderBy = false; + this.query = query; + this.fetchFunctions = fetchFunctions; + this.options = options || {}; + this.resourceLink = resourceLink; + this.fetchAllLastResHeaders = getInitialHeader(); + this.reset(); + this.isInitialized = false; + } + /** + * Gets an async iterator that will yield results until completion. + * + * NOTE: AsyncIterators are a very new feature and you might need to + * use polyfils/etc. in order to use them in your code. + * + * If you're using TypeScript, you can use the following polyfill as long + * as you target ES6 or higher and are running on Node 6 or higher. + * + * ```typescript + * if (!Symbol || !Symbol.asyncIterator) { + * (Symbol as any).asyncIterator = Symbol.for("Symbol.asyncIterator"); + * } + * ``` + * + * @example Iterate over all databases + * ```typescript + * for await(const { resources: db } of client.databases.readAll().getAsyncIterator()) { + * console.log(`Got ${db} from AsyncIterator`); + * } + * ``` + */ + getAsyncIterator(options) { + return __asyncGenerator(this, arguments, function* getAsyncIterator_1() { + this.reset(); + let diagnosticNode = new DiagnosticNodeInternal( + this.clientContext.diagnosticLevel, + DiagnosticNodeType.CLIENT_REQUEST_NODE, + null, + ); + this.queryPlanPromise = this.fetchQueryPlan(diagnosticNode); + let ruConsumedManager; + if (options && options.ruCapPerOperation) { + ruConsumedManager = new RUConsumedManager(); + } + while (this.queryExecutionContext.hasMoreResults()) { let response; try { - response = await this.toArrayImplementation(diagnosticNode, options); - } - catch (error) { - this.handleSplitError(error); - } - return response; - } - /** - * Retrieve the next batch from the feed. - * - * This may or may not fetch more pages from the backend depending on your settings - * and the type of query. Aggregate queries will generally fetch all backend pages - * before returning the first batch of responses. - */ - async fetchNext(options) { - return withDiagnostics(async (diagnosticNode) => { - // Enabling RU metrics for all the fetchNext operations - const ruConsumedManager = new RUConsumedManager(); - this.queryPlanPromise = withMetadataDiagnostics(async (metadataNode) => { - return this.fetchQueryPlan(metadataNode); - }, diagnosticNode, MetadataLookUpType.QueryPlanLookUp); - if (!this.isInitialized) { - await this.init(); - } - let response; + response = yield __await(this.queryExecutionContext.fetchMore(diagnosticNode, options, ruConsumedManager)); + } catch (error) { + if (this.needsQueryPlan(error)) { + yield __await(this.createPipelinedExecutionContext()); try { - response = await this.queryExecutionContext.fetchMore(diagnosticNode, options, ruConsumedManager); + response = yield __await( + this.queryExecutionContext.fetchMore(diagnosticNode, options, ruConsumedManager), + ); + } catch (queryError) { + this.handleSplitError(queryError); } - catch (error) { - if (this.needsQueryPlan(error)) { - await this.createPipelinedExecutionContext(); - try { - response = await this.queryExecutionContext.fetchMore(diagnosticNode, options, ruConsumedManager); - } - catch (queryError) { - this.handleSplitError(queryError); - } - } - else { - throw error; - } - } - return new FeedResponse(response.result, response.headers, this.queryExecutionContext.hasMoreResults(), getEmptyCosmosDiagnostics()); - }, this.clientContext); + } else { + throw error; + } + } + const feedResponse = new FeedResponse( + response.result, + response.headers, + this.queryExecutionContext.hasMoreResults(), + diagnosticNode.toDiagnostic(this.clientContext.getClientConfig()), + ); + diagnosticNode = new DiagnosticNodeInternal( + this.clientContext.diagnosticLevel, + DiagnosticNodeType.CLIENT_REQUEST_NODE, + null, + ); + if (response.result !== undefined) { + yield yield __await(feedResponse); + } + } + }); + } + /** + * Determine if there are still remaining resources to process based on the value of the continuation token or the + * elements remaining on the current batch in the QueryIterator. + * @returns true if there is other elements to process in the QueryIterator. + */ + hasMoreResults() { + return this.queryExecutionContext.hasMoreResults(); + } + /** + * Fetch all pages for the query and return a single FeedResponse. + */ + async fetchAll(options) { + return withDiagnostics(async (diagnosticNode) => { + return this.fetchAllInternal(diagnosticNode, options); + }, this.clientContext); + } + /** + * @hidden + */ + async fetchAllInternal(diagnosticNode, options) { + this.reset(); + let response; + try { + response = await this.toArrayImplementation(diagnosticNode, options); + } catch (error) { + this.handleSplitError(error); } - /** - * Reset the QueryIterator to the beginning and clear all the resources inside it - */ - reset() { - this.queryPlanPromise = undefined; - this.fetchAllLastResHeaders = getInitialHeader(); - this.fetchAllTempResources = []; - this.queryExecutionContext = new DefaultQueryExecutionContext(this.options, this.fetchFunctions); + return response; + } + /** + * Retrieve the next batch from the feed. + * + * This may or may not fetch more pages from the backend depending on your settings + * and the type of query. Aggregate queries will generally fetch all backend pages + * before returning the first batch of responses. + */ + async fetchNext(options) { + return withDiagnostics(async (diagnosticNode) => { + // Enabling RU metrics for all the fetchNext operations + const ruConsumedManager = new RUConsumedManager(); + this.queryPlanPromise = withMetadataDiagnostics( + async (metadataNode) => { + return this.fetchQueryPlan(metadataNode); + }, + diagnosticNode, + MetadataLookUpType.QueryPlanLookUp, + ); + if (!this.isInitialized) { + await this.init(); + } + let response; + try { + response = await this.queryExecutionContext.fetchMore(diagnosticNode, options, ruConsumedManager); + } catch (error) { + if (this.needsQueryPlan(error)) { + await this.createPipelinedExecutionContext(); + try { + response = await this.queryExecutionContext.fetchMore(diagnosticNode, options, ruConsumedManager); + } catch (queryError) { + this.handleSplitError(queryError); + } + } else { + throw error; + } + } + return new FeedResponse( + response.result, + response.headers, + this.queryExecutionContext.hasMoreResults(), + getEmptyCosmosDiagnostics(), + ); + }, this.clientContext); + } + /** + * Reset the QueryIterator to the beginning and clear all the resources inside it + */ + reset() { + this.queryPlanPromise = undefined; + this.fetchAllLastResHeaders = getInitialHeader(); + this.fetchAllTempResources = []; + this.queryExecutionContext = new DefaultQueryExecutionContext(this.options, this.fetchFunctions); + } + async toArrayImplementation(diagnosticNode, options) { + let ruConsumedManager; + if (options && options.ruCapPerOperation) { + ruConsumedManager = new RUConsumedManager(); } - async toArrayImplementation(diagnosticNode, options) { - let ruConsumedManager; - if (options && options.ruCapPerOperation) { - ruConsumedManager = new RUConsumedManager(); - } - this.queryPlanPromise = withMetadataDiagnostics(async (metadataNode) => { - return this.fetchQueryPlan(metadataNode); - }, diagnosticNode, MetadataLookUpType.QueryPlanLookUp); - // this.queryPlanPromise = this.fetchQueryPlan(diagnosticNode); - if (!this.isInitialized) { - await this.init(); - } - while (this.queryExecutionContext.hasMoreResults()) { - let response; - try { - response = await this.queryExecutionContext.nextItem(diagnosticNode, options, ruConsumedManager); - } - catch (error) { - if (this.needsQueryPlan(error)) { - await this.createPipelinedExecutionContext(); - response = await this.queryExecutionContext.nextItem(diagnosticNode, options, ruConsumedManager); - } - else if (error.code === RUCapPerOperationExceededErrorCode && error.fetchedResults) { - error.fetchedResults.forEach((item) => { - this.fetchAllTempResources.push(item); - }); - error.fetchedResults = this.fetchAllTempResources; - throw error; - } - else { - throw error; - } - } - const { result, headers } = response; - // concatenate the results and fetch more - mergeHeaders(this.fetchAllLastResHeaders, headers); - if (result !== undefined) { - if (this.nonStreamingOrderBy && - typeof result === "object" && - Object.keys(result).length === 0) { - // ignore empty results from NonStreamingOrderBy Endpoint components. - } - else - this.fetchAllTempResources.push(result); - } - } - return new FeedResponse(this.fetchAllTempResources, this.fetchAllLastResHeaders, this.queryExecutionContext.hasMoreResults(), getEmptyCosmosDiagnostics()); + this.queryPlanPromise = withMetadataDiagnostics( + async (metadataNode) => { + return this.fetchQueryPlan(metadataNode); + }, + diagnosticNode, + MetadataLookUpType.QueryPlanLookUp, + ); + // this.queryPlanPromise = this.fetchQueryPlan(diagnosticNode); + if (!this.isInitialized) { + await this.init(); } - async createPipelinedExecutionContext() { - const queryPlanResponse = await this.queryPlanPromise; - // We always coerce queryPlanPromise to resolved. So if it errored, we need to manually inspect the resolved value - if (queryPlanResponse instanceof Error) { - throw queryPlanResponse; + while (this.queryExecutionContext.hasMoreResults()) { + let response; + try { + response = await this.queryExecutionContext.nextItem(diagnosticNode, options, ruConsumedManager); + } catch (error) { + if (this.needsQueryPlan(error)) { + await this.createPipelinedExecutionContext(); + response = await this.queryExecutionContext.nextItem(diagnosticNode, options, ruConsumedManager); + } else if (error.code === RUCapPerOperationExceededErrorCode && error.fetchedResults) { + error.fetchedResults.forEach((item) => { + this.fetchAllTempResources.push(item); + }); + error.fetchedResults = this.fetchAllTempResources; + throw error; + } else { + throw error; } - const queryPlan = queryPlanResponse.result; - const queryInfo = queryPlan.queryInfo; - //this.nonStreamingOrderBy = queryInfo.hasNonStreamingOrderBy ? true : false; - this.nonStreamingOrderBy = false; - /*if (queryInfo.aggregates.length > 0 && queryInfo.hasSelectValue === false) { + } + const { result, headers } = response; + // concatenate the results and fetch more + mergeHeaders(this.fetchAllLastResHeaders, headers); + if (result !== undefined) { + if (this.nonStreamingOrderBy && typeof result === "object" && Object.keys(result).length === 0) { + // ignore empty results from NonStreamingOrderBy Endpoint components. + } else this.fetchAllTempResources.push(result); + } + } + return new FeedResponse( + this.fetchAllTempResources, + this.fetchAllLastResHeaders, + this.queryExecutionContext.hasMoreResults(), + getEmptyCosmosDiagnostics(), + ); + } + async createPipelinedExecutionContext() { + const queryPlanResponse = await this.queryPlanPromise; + // We always coerce queryPlanPromise to resolved. So if it errored, we need to manually inspect the resolved value + if (queryPlanResponse instanceof Error) { + throw queryPlanResponse; + } + const queryPlan = queryPlanResponse.result; + const queryInfo = queryPlan.queryInfo; + //this.nonStreamingOrderBy = queryInfo.hasNonStreamingOrderBy ? true : false; + this.nonStreamingOrderBy = false; + /*if (queryInfo.aggregates.length > 0 && queryInfo.hasSelectValue === false) { throw new Error("Aggregate queries must use the VALUE keyword"); } */ - this.queryExecutionContext = new PipelinedQueryExecutionContext(this.clientContext, this.resourceLink, this.query, this.options, queryPlan); + this.queryExecutionContext = new PipelinedQueryExecutionContext( + this.clientContext, + this.resourceLink, + this.query, + this.options, + queryPlan, + ); + } + async fetchQueryPlan(diagnosticNode) { + if (!this.queryPlanPromise && this.resourceType === ResourceType.item) { + return this.clientContext + .getQueryPlan( + getPathFromLink(this.resourceLink) + "/docs", + ResourceType.item, + this.resourceLink, + this.query, + this.options, + diagnosticNode, + ) + .catch((error) => error); // Without this catch, node reports an unhandled rejection. So we stash the promise as resolved even if it errored. } - async fetchQueryPlan(diagnosticNode) { - if (!this.queryPlanPromise && this.resourceType === ResourceType.item) { - return this.clientContext - .getQueryPlan(getPathFromLink(this.resourceLink) + "/docs", ResourceType.item, this.resourceLink, this.query, this.options, diagnosticNode) - .catch((error) => error); // Without this catch, node reports an unhandled rejection. So we stash the promise as resolved even if it errored. - } - return this.queryPlanPromise; + return this.queryPlanPromise; + } + needsQueryPlan(error) { + var _a; + let needsQueryPlanValue = false; + if ( + ((_a = error.body) === null || _a === void 0 ? void 0 : _a.additionalErrorInfo) || + error.message.includes("Cross partition query only supports") + ) { + needsQueryPlanValue = error.code === StatusCodes.BadRequest && this.resourceType === ResourceType.item; } - needsQueryPlan(error) { - var _a; - let needsQueryPlanValue = false; - if (((_a = error.body) === null || _a === void 0 ? void 0 : _a.additionalErrorInfo) || - error.message.includes("Cross partition query only supports")) { - needsQueryPlanValue = - error.code === StatusCodes.BadRequest && this.resourceType === ResourceType.item; - } - return needsQueryPlanValue; + return needsQueryPlanValue; + } + async init() { + if (this.isInitialized === true) { + return; } - async init() { - if (this.isInitialized === true) { - return; - } - if (this.initPromise === undefined) { - this.initPromise = this._init(); - } - return this.initPromise; + if (this.initPromise === undefined) { + this.initPromise = this._init(); } - async _init() { - if (this.options.forceQueryPlan === true && this.resourceType === ResourceType.item) { - await this.createPipelinedExecutionContext(); - } - this.isInitialized = true; + return this.initPromise; + } + async _init() { + if (this.options.forceQueryPlan === true && this.resourceType === ResourceType.item) { + await this.createPipelinedExecutionContext(); } - handleSplitError(err) { - if (err.code === 410) { - const error = new Error("Encountered partition split and could not recover. This request is retryable"); - error.code = 503; - error.originalError = err; - throw error; - } - else { - throw err; - } + this.isInitialized = true; + } + handleSplitError(err) { + if (err.code === 410) { + const error = new Error("Encountered partition split and could not recover. This request is retryable"); + error.code = 503; + error.originalError = err; + throw error; + } else { + throw err; } + } } -//# sourceMappingURL=queryIterator.js.map \ No newline at end of file +//# sourceMappingURL=queryIterator.js.map diff --git a/package-lock.json b/package-lock.json index d47dced71..054b4583c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -209,6 +209,57 @@ "extraneous": true, "license": "ISC" }, + "local_dependencies/@azure/cosmos": { + "version": "4.0.1-beta.3", + "license": "MIT", + "dependencies": { + "@azure/abort-controller": "^1.0.0", + "@azure/core-auth": "^1.3.0", + "@azure/core-rest-pipeline": "^1.2.0", + "@azure/core-tracing": "^1.0.0", + "debug": "^4.1.1", + "fast-json-stable-stringify": "^2.1.0", + "jsbi": "^3.1.3", + "node-abort-controller": "^3.0.0", + "priorityqueuejs": "^2.0.0", + "semaphore": "^1.0.5", + "tslib": "^2.2.0", + "universal-user-agent": "^6.0.0", + "uuid": "^8.3.0" + }, + "engines": { + "node": ">=18.0.0" + } + }, + "local_dependencies/@azure/cosmos/node_modules/tslib": { + "version": "2.7.0", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.7.0.tgz", + "integrity": "sha512-gLXCKdN1/j47AiHiOkJN69hJmcbGTHI0ImLmbYLHykhgeN0jVGola9yVjFgzCUklsZQMW55o+dW7IXv3RCXDzA==" + }, + "local_dependencies/cosmos": { + "name": "@azure/cosmos", + "version": "4.0.1-beta.3", + "extraneous": true, + "license": "MIT", + "dependencies": { + "@azure/abort-controller": "^1.0.0", + "@azure/core-auth": "^1.3.0", + "@azure/core-rest-pipeline": "^1.2.0", + "@azure/core-tracing": "^1.0.0", + "debug": "^4.1.1", + "fast-json-stable-stringify": "^2.1.0", + "jsbi": "^3.1.3", + "node-abort-controller": "^3.0.0", + "priorityqueuejs": "^2.0.0", + "semaphore": "^1.0.5", + "tslib": "^2.2.0", + "universal-user-agent": "^6.0.0", + "uuid": "^8.3.0" + }, + "engines": { + "node": ">=18.0.0" + } + }, "node_modules/@aashutoshrathi/word-wrap": { "version": "1.2.6", "license": "MIT", @@ -429,11 +480,6 @@ "node": ">=0.10.0" } }, - "node_modules/@azure/cosmos/node_modules/tslib": { - "version": "2.6.2", - "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.6.2.tgz", - "integrity": "sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==" - }, "node_modules/@azure/identity": { "version": "1.5.2", "license": "MIT", diff --git a/src/Utils/EndpointUtils.ts b/src/Utils/EndpointUtils.ts index c0617ec17..25c194439 100644 --- a/src/Utils/EndpointUtils.ts +++ b/src/Utils/EndpointUtils.ts @@ -131,7 +131,7 @@ export const CassandraProxyOutboundIPs: { [key: string]: string[] } = { [CassandraProxyEndpoints.Mooncake]: ["40.73.99.146", "143.64.62.47"], }; -export const allowedEmulatorEndpoints: ReadonlyArray = ["https://localhost:8081","http://localhost:8081"]; +export const allowedEmulatorEndpoints: ReadonlyArray = ["https://localhost:8081", "http://localhost:8081"]; export const allowedMongoBackendEndpoints: ReadonlyArray = ["https://localhost:1234"];