HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux ip-10-0-8-47 6.8.0-1021-aws #23~22.04.1-Ubuntu SMP Tue Dec 10 16:31:58 UTC 2024 aarch64
User: ubuntu (1000)
PHP: 8.1.2-1ubuntu2.22
Disabled: NONE
Upload Files
File: /var/www/api.javaapp.co.uk/node_modules/@google-cloud/firestore/build/src/bulk-writer.js
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.BulkWriter = exports.BulkWriterError = exports.DEFAULT_JITTER_FACTOR = exports.DEFAULT_MAXIMUM_OPS_PER_SECOND_LIMIT = exports.DEFAULT_INITIAL_OPS_PER_SECOND_LIMIT = exports.RETRY_MAX_BATCH_SIZE = void 0;
const assert = require("assert");
const backoff_1 = require("./backoff");
const rate_limiter_1 = require("./rate-limiter");
const timestamp_1 = require("./timestamp");
const util_1 = require("./util");
const write_batch_1 = require("./write-batch");
const validate_1 = require("./validate");
const logger_1 = require("./logger");
/*!
 * The maximum number of writes that can be in a single batch.
 */
const MAX_BATCH_SIZE = 20;
/*!
 * The maximum number of writes can be can in a single batch that is being retried.
 */
exports.RETRY_MAX_BATCH_SIZE = 10;
/*!
 * The starting maximum number of operations per second as allowed by the
 * 500/50/5 rule.
 *
 * https://firebase.google.com/docs/firestore/best-practices#ramping_up_traffic.
 */
exports.DEFAULT_INITIAL_OPS_PER_SECOND_LIMIT = 500;
/*!
 * The maximum number of operations per second as allowed by the 500/50/5 rule.
 * By default the rate limiter will not exceed this value.
 *
 * https://firebase.google.com/docs/firestore/best-practices#ramping_up_traffic.
 */
exports.DEFAULT_MAXIMUM_OPS_PER_SECOND_LIMIT = 10000;
/*!
 * The default jitter to apply to the exponential backoff used in retries. For
 * example, a factor of 0.3 means a 30% jitter is applied.
 */
exports.DEFAULT_JITTER_FACTOR = 0.3;
/*!
 * The rate by which to increase the capacity as specified by the 500/50/5 rule.
 */
const RATE_LIMITER_MULTIPLIER = 1.5;
/*!
 * How often the operations per second capacity should increase in milliseconds
 * as specified by the 500/50/5 rule.
 */
const RATE_LIMITER_MULTIPLIER_MILLIS = 5 * 60 * 1000;
/*!
 * The default maximum number of pending operations that can be enqueued onto a
 * BulkWriter instance. An operation is considered pending if BulkWriter has
 * sent it via RPC and is awaiting the result. BulkWriter buffers additional
 * writes after this many pending operations in order to avoiding going OOM.
 */
const DEFAULT_MAXIMUM_PENDING_OPERATIONS_COUNT = 500;
/**
 * Represents a single write for BulkWriter, encapsulating operation dispatch
 * and error handling.
 * @private
 * @internal
 */
class BulkWriterOperation {
    /**
     * @param ref The document reference being written to.
     * @param type The type of operation that created this write.
     * @param sendFn A callback to invoke when the operation should be sent.
     * @param errorFn The user provided global error callback.
     * @param successFn The user provided global success callback.
     */
    constructor(ref, type, sendFn, errorFn, successFn) {
        this.ref = ref;
        this.type = type;
        this.sendFn = sendFn;
        this.errorFn = errorFn;
        this.successFn = successFn;
        this.deferred = new util_1.Deferred();
        this.failedAttempts = 0;
        this._backoffDuration = 0;
        /** Whether flush() was called when this was the last enqueued operation. */
        this._flushed = false;
    }
    get promise() {
        return this.deferred.promise;
    }
    get backoffDuration() {
        return this._backoffDuration;
    }
    markFlushed() {
        this._flushed = true;
    }
    get flushed() {
        return this._flushed;
    }
    onError(error) {
        ++this.failedAttempts;
        try {
            const bulkWriterError = new BulkWriterError(error.code, error.message, this.ref, this.type, this.failedAttempts);
            const shouldRetry = this.errorFn(bulkWriterError);
            (0, logger_1.logger)('BulkWriter.errorFn', null, 'Ran error callback on error code:', error.code, ', shouldRetry:', shouldRetry, ' for document:', this.ref.path);
            if (shouldRetry) {
                this.lastStatus = error.code;
                this.updateBackoffDuration();
                this.sendFn(this);
            }
            else {
                this.deferred.reject(bulkWriterError);
            }
        }
        catch (userCallbackError) {
            this.deferred.reject(userCallbackError);
        }
    }
    updateBackoffDuration() {
        if (this.lastStatus === 8 /* StatusCode.RESOURCE_EXHAUSTED */) {
            this._backoffDuration = backoff_1.DEFAULT_BACKOFF_MAX_DELAY_MS;
        }
        else if (this._backoffDuration === 0) {
            this._backoffDuration = backoff_1.DEFAULT_BACKOFF_INITIAL_DELAY_MS;
        }
        else {
            this._backoffDuration *= backoff_1.DEFAULT_BACKOFF_FACTOR;
        }
    }
    onSuccess(result) {
        try {
            this.successFn(this.ref, result);
            this.deferred.resolve(result);
        }
        catch (userCallbackError) {
            this.deferred.reject(userCallbackError);
        }
    }
}
/**
 * Used to represent a batch on the BatchQueue.
 *
 * @private
 * @internal
 */
class BulkCommitBatch extends write_batch_1.WriteBatch {
    constructor(firestore, maxBatchSize) {
        super(firestore);
        // The set of document reference paths present in the WriteBatch.
        this.docPaths = new Set();
        // An array of pending write operations. Only contains writes that have not
        // been resolved.
        this.pendingOps = [];
        this._maxBatchSize = maxBatchSize;
    }
    get maxBatchSize() {
        return this._maxBatchSize;
    }
    setMaxBatchSize(size) {
        assert(this.pendingOps.length <= size, 'New batch size cannot be less than the number of enqueued writes');
        this._maxBatchSize = size;
    }
    has(documentRef) {
        return this.docPaths.has(documentRef.path);
    }
    async bulkCommit(options = {}) {
        var _a;
        const tag = (_a = options === null || options === void 0 ? void 0 : options.requestTag) !== null && _a !== void 0 ? _a : (0, util_1.requestTag)();
        // Capture the error stack to preserve stack tracing across async calls.
        const stack = Error().stack;
        let response;
        try {
            (0, logger_1.logger)('BulkCommitBatch.bulkCommit', tag, `Sending next batch with ${this._opCount} writes`);
            const retryCodes = (0, util_1.getRetryCodes)('batchWrite');
            response = await this._commit({ retryCodes, methodName: 'batchWrite', requestTag: tag });
        }
        catch (err) {
            // Map the failure to each individual write's result.
            const ops = Array.from({ length: this.pendingOps.length });
            response = {
                writeResults: ops.map(() => {
                    return {};
                }),
                status: ops.map(() => err),
            };
        }
        for (let i = 0; i < (response.writeResults || []).length; ++i) {
            // Since delete operations currently do not have write times, use a
            // sentinel Timestamp value.
            // TODO(b/158502664): Use actual delete timestamp.
            const DELETE_TIMESTAMP_SENTINEL = timestamp_1.Timestamp.fromMillis(0);
            const status = (response.status || [])[i];
            if (status.code === 0 /* StatusCode.OK */) {
                const updateTime = timestamp_1.Timestamp.fromProto(response.writeResults[i].updateTime || DELETE_TIMESTAMP_SENTINEL);
                this.pendingOps[i].onSuccess(new write_batch_1.WriteResult(updateTime));
            }
            else {
                const error = new (require('google-gax/build/src/fallback').GoogleError)(status.message || undefined);
                error.code = status.code;
                this.pendingOps[i].onError((0, util_1.wrapError)(error, stack));
            }
        }
    }
    /**
     * Helper to update data structures associated with the operation and returns
     * the result.
     */
    processLastOperation(op) {
        assert(!this.docPaths.has(op.ref.path), 'Batch should not contain writes to the same document');
        this.docPaths.add(op.ref.path);
        this.pendingOps.push(op);
    }
}
/**
 * Used to represent a buffered BulkWriterOperation.
 *
 * @private
 * @internal
 */
class BufferedOperation {
    constructor(operation, sendFn) {
        this.operation = operation;
        this.sendFn = sendFn;
    }
}
/**
 * The error thrown when a BulkWriter operation fails.
 *
 * @class BulkWriterError
 */
class BulkWriterError extends Error {
    /**
     * @private
     * @internal
     */
    constructor(
    /** The status code of the error. */
    code, 
    /** The error message of the error. */
    message, 
    /** The document reference the operation was performed on. */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    documentRef, 
    /** The type of operation performed. */
    operationType, 
    /** How many times this operation has been attempted unsuccessfully. */
    failedAttempts) {
        super(message);
        this.code = code;
        this.message = message;
        this.documentRef = documentRef;
        this.operationType = operationType;
        this.failedAttempts = failedAttempts;
    }
}
exports.BulkWriterError = BulkWriterError;
/**
 * A Firestore BulkWriter that can be used to perform a large number of writes
 * in parallel.
 *
 * @class BulkWriter
 */
class BulkWriter {
    // Visible for testing.
    /**
     * @private
     * @internal
     */
    _getBufferedOperationsCount() {
        return this._bufferedOperations.length;
    }
    // Visible for testing.
    /**
     * @private
     * @internal
     */
    _setMaxBatchSize(size) {
        assert(this._bulkCommitBatch.pendingOps.length === 0, 'BulkCommitBatch should be empty');
        this._maxBatchSize = size;
        this._bulkCommitBatch = new BulkCommitBatch(this.firestore, size);
    }
    // Visible for testing.
    /**
     * @private
     * @internal
     */
    _setMaxPendingOpCount(newMax) {
        this._maxPendingOpCount = newMax;
    }
    /** @private */
    constructor(firestore, options) {
        var _a, _b;
        this.firestore = firestore;
        /**
         * The maximum number of writes that can be in a single batch.
         * Visible for testing.
         * @private
         * @internal
         */
        this._maxBatchSize = MAX_BATCH_SIZE;
        /**
         * The batch that is currently used to schedule operations. Once this batch
         * reaches maximum capacity, a new batch is created.
         * @private
         * @internal
         */
        this._bulkCommitBatch = new BulkCommitBatch(this.firestore, this._maxBatchSize);
        /**
         * A pointer to the tail of all active BulkWriter operations. This pointer
         * is advanced every time a new write is enqueued.
         * @private
         * @internal
         */
        this._lastOp = Promise.resolve();
        /**
         * Whether this BulkWriter instance has started to close. Afterwards, no
         * new operations can be enqueued, except for retry operations scheduled by
         * the error handler.
         * @private
         * @internal
         */
        this._closing = false;
        /**
         * The number of pending operations enqueued on this BulkWriter instance.
         * An operation is considered pending if BulkWriter has sent it via RPC and
         * is awaiting the result.
         * @private
         * @internal
         */
        this._pendingOpsCount = 0;
        /**
         * An array containing buffered BulkWriter operations after the maximum number
         * of pending operations has been enqueued.
         * @private
         * @internal
         */
        this._bufferedOperations = [];
        /**
         * Whether a custom error handler has been set. BulkWriter only swallows
         * errors if an error handler is set. Otherwise, an UnhandledPromiseRejection
         * is thrown by Node if an operation promise is rejected without being
         * handled.
         * @private
         * @internal
         */
        this._errorHandlerSet = false;
        /**
         * The maximum number of pending operations that can be enqueued onto this
         * BulkWriter instance. Once the this number of writes have been enqueued,
         * subsequent writes are buffered.
         * @private
         * @internal
         */
        this._maxPendingOpCount = DEFAULT_MAXIMUM_PENDING_OPERATIONS_COUNT;
        /**
         * The user-provided callback to be run every time a BulkWriter operation
         * successfully completes.
         * @private
         * @internal
         */
        this._successFn = () => { };
        /**
         * The user-provided callback to be run every time a BulkWriter operation
         * fails.
         * @private
         * @internal
         */
        this._errorFn = error => {
            const isRetryableDeleteError = error.operationType === 'delete' &&
                error.code === 13 /* StatusCode.INTERNAL */;
            const retryCodes = (0, util_1.getRetryCodes)('batchWrite');
            return ((retryCodes.includes(error.code) || isRetryableDeleteError) &&
                error.failedAttempts < backoff_1.MAX_RETRY_ATTEMPTS);
        };
        this.firestore._incrementBulkWritersCount();
        validateBulkWriterOptions(options);
        if ((options === null || options === void 0 ? void 0 : options.throttling) === false) {
            this._rateLimiter = new rate_limiter_1.RateLimiter(Number.POSITIVE_INFINITY, Number.POSITIVE_INFINITY, Number.POSITIVE_INFINITY, Number.POSITIVE_INFINITY);
        }
        else {
            let startingRate = exports.DEFAULT_INITIAL_OPS_PER_SECOND_LIMIT;
            let maxRate = exports.DEFAULT_MAXIMUM_OPS_PER_SECOND_LIMIT;
            if (typeof (options === null || options === void 0 ? void 0 : options.throttling) !== 'boolean') {
                if (((_a = options === null || options === void 0 ? void 0 : options.throttling) === null || _a === void 0 ? void 0 : _a.maxOpsPerSecond) !== undefined) {
                    maxRate = options.throttling.maxOpsPerSecond;
                }
                if (((_b = options === null || options === void 0 ? void 0 : options.throttling) === null || _b === void 0 ? void 0 : _b.initialOpsPerSecond) !== undefined) {
                    startingRate = options.throttling.initialOpsPerSecond;
                }
                // The initial validation step ensures that the maxOpsPerSecond is
                // greater than initialOpsPerSecond. If this inequality is true, that
                // means initialOpsPerSecond was not set and maxOpsPerSecond is less
                // than the default starting rate.
                if (maxRate < startingRate) {
                    startingRate = maxRate;
                }
                // Ensure that the batch size is not larger than the number of allowed
                // operations per second.
                if (startingRate < this._maxBatchSize) {
                    this._maxBatchSize = startingRate;
                }
            }
            this._rateLimiter = new rate_limiter_1.RateLimiter(startingRate, RATE_LIMITER_MULTIPLIER, RATE_LIMITER_MULTIPLIER_MILLIS, maxRate);
        }
    }
    /**
     * Create a document with the provided data. This single operation will fail
     * if a document exists at its location.
     *
     * @param {DocumentReference} documentRef A reference to the document to be
     * created.
     * @param {T} data The object to serialize as the document.
     * @throws {Error} If the provided input is not a valid Firestore document.
     * @returns {Promise<WriteResult>} A promise that resolves with the result of
     * the write. If the write fails, the promise is rejected with a
     * [BulkWriterError]{@link BulkWriterError}.
     *
     * @example
     * ```
     * let bulkWriter = firestore.bulkWriter();
     * let documentRef = firestore.collection('col').doc();
     *
     * bulkWriter
     *  .create(documentRef, {foo: 'bar'})
     *  .then(result => {
     *    console.log('Successfully executed write at: ', result);
     *  })
     *  .catch(err => {
     *    console.log('Write failed with: ', err);
     *  });
     * });
     * ```
     */
    create(documentRef, data) {
        this._verifyNotClosed();
        return this._enqueue(documentRef, 'create', bulkCommitBatch => bulkCommitBatch.create(documentRef, data));
    }
    /**
     * Delete a document from the database.
     *
     * @param {DocumentReference} documentRef A reference to the document to be
     * deleted.
     * @param {Precondition=} precondition A precondition to enforce for this
     * delete.
     * @param {Timestamp=} precondition.lastUpdateTime If set, enforces that the
     * document was last updated at lastUpdateTime. Fails the batch if the
     * document doesn't exist or was last updated at a different time.
     * @returns {Promise<WriteResult>} A promise that resolves with the result of
     * the delete. If the delete fails, the promise is rejected with a
     * [BulkWriterError]{@link BulkWriterError}.
     *
     * @example
     * ```
     * let bulkWriter = firestore.bulkWriter();
     * let documentRef = firestore.doc('col/doc');
     *
     * bulkWriter
     *  .delete(documentRef)
     *  .then(result => {
     *    console.log('Successfully deleted document');
     *  })
     *  .catch(err => {
     *    console.log('Delete failed with: ', err);
     *  });
     * });
     * ```
     */
    delete(documentRef, precondition) {
        this._verifyNotClosed();
        return this._enqueue(documentRef, 'delete', bulkCommitBatch => bulkCommitBatch.delete(documentRef, precondition));
    }
    /**
     * Write to the document referred to by the provided
     * [DocumentReference]{@link DocumentReference}. If the document does not
     * exist yet, it will be created. If you pass [SetOptions]{@link SetOptions}.,
     * the provided data can be merged into the existing document.
     *
     * @param {DocumentReference} documentRef A reference to the document to be
     * set.
     * @param {T} data The object to serialize as the document.
     * @param {SetOptions=} options An object to configure the set behavior.
     * @throws {Error} If the provided input is not a valid Firestore document.
     * @param {boolean=} options.merge - If true, set() merges the values
     * specified in its data argument. Fields omitted from this set() call remain
     * untouched. If your input sets any field to an empty map, all nested fields
     * are overwritten.
     * @param {Array.<string|FieldPath>=} options.mergeFields - If provided, set()
     * only replaces the specified field paths. Any field path that is not
     * specified is ignored and remains untouched. If your input sets any field to
     * an empty map, all nested fields are overwritten.
     * @returns {Promise<WriteResult>} A promise that resolves with the result of
     * the write. If the write fails, the promise is rejected with a
     * [BulkWriterError]{@link BulkWriterError}.
     *
     *
     * @example
     * ```
     * let bulkWriter = firestore.bulkWriter();
     * let documentRef = firestore.collection('col').doc();
     *
     * bulkWriter
     *  .set(documentRef, {foo: 'bar'})
     *  .then(result => {
     *    console.log('Successfully executed write at: ', result);
     *  })
     *  .catch(err => {
     *    console.log('Write failed with: ', err);
     *  });
     * });
     * ```
     */
    set(documentRef, data, options) {
        this._verifyNotClosed();
        return this._enqueue(documentRef, 'set', bulkCommitBatch => {
            if (options) {
                return bulkCommitBatch.set(documentRef, data, options);
            }
            else {
                return bulkCommitBatch.set(documentRef, data);
            }
        });
    }
    /**
     * Update fields of the document referred to by the provided
     * [DocumentReference]{@link DocumentReference}. If the document doesn't yet
     * exist, the update fails and the entire batch will be rejected.
     *
     * The update() method accepts either an object with field paths encoded as
     * keys and field values encoded as values, or a variable number of arguments
     * that alternate between field paths and field values. Nested fields can be
     * updated by providing dot-separated field path strings or by providing
     * FieldPath objects.
     *
     *
     * A Precondition restricting this update can be specified as the last
     * argument.
     *
     * @param {DocumentReference} documentRef A reference to the document to be
     * updated.
     * @param {UpdateData|string|FieldPath} dataOrField An object containing the
     * fields and values with which to update the document or the path of the
     * first field to update.
     * @param {...(Precondition|*|string|FieldPath)} preconditionOrValues - An
     * alternating list of field paths and values to update or a Precondition to
     * restrict this update
     * @throws {Error} If the provided input is not valid Firestore data.
     * @returns {Promise<WriteResult>} A promise that resolves with the result of
     * the write. If the write fails, the promise is rejected with a
     * [BulkWriterError]{@link BulkWriterError}.
     *
     * @example
     * ```
     * let bulkWriter = firestore.bulkWriter();
     * let documentRef = firestore.doc('col/doc');
     *
     * bulkWriter
     *  .update(documentRef, {foo: 'bar'})
     *  .then(result => {
     *    console.log('Successfully executed write at: ', result);
     *  })
     *  .catch(err => {
     *    console.log('Write failed with: ', err);
     *  });
     * });
     * ```
     */
    update(documentRef, dataOrField, ...preconditionOrValues) {
        this._verifyNotClosed();
        return this._enqueue(documentRef, 'update', bulkCommitBatch => bulkCommitBatch.update(documentRef, dataOrField, ...preconditionOrValues));
    }
    /**
     * Callback function set by {@link BulkWriter#onWriteResult} that is run
     * every time a {@link BulkWriter} operation successfully completes.
     *
     * @callback BulkWriter~successCallback
     * @param {DocumentReference} documentRef The document reference the
     * operation was performed on
     * @param {WriteResult} result The server write time of the operation.
     */
    /**
     * Attaches a listener that is run every time a BulkWriter operation
     * successfully completes.
     *
     * @param {BulkWriter~successCallback} successCallback A callback to be
     * called every time a BulkWriter operation successfully completes.
     * @example
     * ```
     * let bulkWriter = firestore.bulkWriter();
     *
     * bulkWriter
     *   .onWriteResult((documentRef, result) => {
     *     console.log(
     *       'Successfully executed write on document: ',
     *       documentRef,
     *       ' at: ',
     *       result
     *     );
     *   });
     * ```
     */
    onWriteResult(successCallback) {
        this._successFn = successCallback;
    }
    /**
     * Callback function set by {@link BulkWriter#onWriteError} that is run when
     * a write fails in order to determine whether {@link BulkWriter} should
     * retry the operation.
     *
     * @callback BulkWriter~shouldRetryCallback
     * @param {BulkWriterError} error The error object with information about the
     * operation and error.
     * @returns {boolean} Whether or not to retry the failed operation. Returning
     * `true` retries the operation. Returning `false` will stop the retry loop.
     */
    /**
     * Attaches an error handler listener that is run every time a BulkWriter
     * operation fails.
     *
     * BulkWriter has a default error handler that retries UNAVAILABLE and
     * ABORTED errors up to a maximum of 10 failed attempts. When an error
     * handler is specified, the default error handler will be overwritten.
     *
     * @param shouldRetryCallback {BulkWriter~shouldRetryCallback} A callback to
     * be called every time a BulkWriter operation fails. Returning `true` will
     * retry the operation. Returning `false` will stop the retry loop.
     * @example
     * ```
     * let bulkWriter = firestore.bulkWriter();
     *
     * bulkWriter
     *   .onWriteError((error) => {
     *     if (
     *       error.code === GrpcStatus.UNAVAILABLE &&
     *       error.failedAttempts < MAX_RETRY_ATTEMPTS
     *     ) {
     *       return true;
     *     } else {
     *       console.log('Failed write at document: ', error.documentRef);
     *       return false;
     *     }
     *   });
     * ```
     */
    onWriteError(shouldRetryCallback) {
        this._errorHandlerSet = true;
        this._errorFn = shouldRetryCallback;
    }
    /**
     * Commits all writes that have been enqueued up to this point in parallel.
     *
     * Returns a Promise that resolves when all currently queued operations have
     * been committed. The Promise will never be rejected since the results for
     * each individual operation are conveyed via their individual Promises.
     *
     * The Promise resolves immediately if there are no pending writes. Otherwise,
     * the Promise waits for all previously issued writes, but it does not wait
     * for writes that were added after the method is called. If you want to wait
     * for additional writes, call `flush()` again.
     *
     * @return {Promise<void>} A promise that resolves when all enqueued writes
     * up to this point have been committed.
     *
     * @example
     * ```
     * let bulkWriter = firestore.bulkWriter();
     *
     * bulkWriter.create(documentRef, {foo: 'bar'});
     * bulkWriter.update(documentRef2, {foo: 'bar'});
     * bulkWriter.delete(documentRef3);
     * await flush().then(() => {
     *   console.log('Executed all writes');
     * });
     * ```
     */
    flush() {
        this._verifyNotClosed();
        this._scheduleCurrentBatch(/* flush= */ true);
        // Mark the most recent operation as flushed to ensure that the batch
        // containing it will be sent once it's popped from the buffer.
        if (this._bufferedOperations.length > 0) {
            this._bufferedOperations[this._bufferedOperations.length - 1].operation.markFlushed();
        }
        return this._lastOp;
    }
    /**
     * Commits all enqueued writes and marks the BulkWriter instance as closed.
     *
     * After calling `close()`, calling any method will throw an error. Any
     * retries scheduled as part of an `onWriteError()` handler will be run
     * before the `close()` promise resolves.
     *
     * Returns a Promise that resolves when there are no more pending writes. The
     * Promise will never be rejected. Calling this method will send all requests.
     * The promise resolves immediately if there are no pending writes.
     *
     * @return {Promise<void>} A promise that resolves when all enqueued writes
     * up to this point have been committed.
     *
     * @example
     * ```
     * let bulkWriter = firestore.bulkWriter();
     *
     * bulkWriter.create(documentRef, {foo: 'bar'});
     * bulkWriter.update(documentRef2, {foo: 'bar'});
     * bulkWriter.delete(documentRef3);
     * await close().then(() => {
     *   console.log('Executed all writes');
     * });
     * ```
     */
    close() {
        this._verifyNotClosed();
        this.firestore._decrementBulkWritersCount();
        const flushPromise = this.flush();
        this._closing = true;
        return flushPromise;
    }
    /**
     * Throws an error if the BulkWriter instance has been closed.
     * @private
     * @internal
     */
    _verifyNotClosed() {
        if (this._closing) {
            throw new Error('BulkWriter has already been closed.');
        }
    }
    /**
     * Sends the current batch and resets `this._bulkCommitBatch`.
     *
     * @param flush If provided, keeps re-sending operations until no more
     * operations are enqueued. This allows retries to resolve as part of a
     * `flush()` or `close()` call.
     * @private
     * @internal
     */
    _scheduleCurrentBatch(flush = false) {
        if (this._bulkCommitBatch._opCount === 0)
            return;
        const pendingBatch = this._bulkCommitBatch;
        this._bulkCommitBatch = new BulkCommitBatch(this.firestore, this._maxBatchSize);
        // Use the write with the longest backoff duration when determining backoff.
        const highestBackoffDuration = pendingBatch.pendingOps.reduce((prev, cur) => (prev.backoffDuration > cur.backoffDuration ? prev : cur)).backoffDuration;
        const backoffMsWithJitter = BulkWriter._applyJitter(highestBackoffDuration);
        const delayedExecution = new util_1.Deferred();
        if (backoffMsWithJitter > 0) {
            (0, backoff_1.delayExecution)(() => delayedExecution.resolve(), backoffMsWithJitter);
        }
        else {
            delayedExecution.resolve();
        }
        delayedExecution.promise.then(() => this._sendBatch(pendingBatch, flush));
    }
    /**
     * Sends the provided batch once the rate limiter does not require any delay.
     * @private
     * @internal
     */
    async _sendBatch(batch, flush = false) {
        const tag = (0, util_1.requestTag)();
        // Send the batch if it is does not require any delay, or schedule another
        // attempt after the appropriate timeout.
        const underRateLimit = this._rateLimiter.tryMakeRequest(batch._opCount);
        if (underRateLimit) {
            await batch.bulkCommit({ requestTag: tag });
            if (flush)
                this._scheduleCurrentBatch(flush);
        }
        else {
            const delayMs = this._rateLimiter.getNextRequestDelayMs(batch._opCount);
            (0, logger_1.logger)('BulkWriter._sendBatch', tag, `Backing off for ${delayMs} seconds`);
            (0, backoff_1.delayExecution)(() => this._sendBatch(batch, flush), delayMs);
        }
    }
    /**
     * Adds a 30% jitter to the provided backoff.
     *
     * @private
     * @internal
     */
    static _applyJitter(backoffMs) {
        if (backoffMs === 0)
            return 0;
        // Random value in [-0.3, 0.3].
        const jitter = exports.DEFAULT_JITTER_FACTOR * (Math.random() * 2 - 1);
        return Math.min(backoff_1.DEFAULT_BACKOFF_MAX_DELAY_MS, backoffMs + jitter * backoffMs);
    }
    /**
     * Schedules and runs the provided operation on the next available batch.
     * @private
     * @internal
     */
    _enqueue(ref, type, enqueueOnBatchCallback) {
        const bulkWriterOp = new BulkWriterOperation(ref, type, this._sendFn.bind(this, enqueueOnBatchCallback), this._errorFn.bind(this), this._successFn.bind(this));
        // Swallow the error if the developer has set an error listener. This
        // prevents UnhandledPromiseRejections from being thrown if a floating
        // BulkWriter operation promise fails when an error handler is specified.
        //
        // This is done here in order to chain the caught promise onto `lastOp`,
        // which ensures that flush() resolves after the operation promise.
        const userPromise = bulkWriterOp.promise.catch(err => {
            if (!this._errorHandlerSet) {
                throw err;
            }
            else {
                return bulkWriterOp.promise;
            }
        });
        // Advance the `_lastOp` pointer. This ensures that `_lastOp` only resolves
        // when both the previous and the current write resolve.
        this._lastOp = this._lastOp.then(() => (0, util_1.silencePromise)(userPromise));
        // Schedule the operation if the BulkWriter has fewer than the maximum
        // number of allowed pending operations, or add the operation to the
        // buffer.
        if (this._pendingOpsCount < this._maxPendingOpCount) {
            this._pendingOpsCount++;
            this._sendFn(enqueueOnBatchCallback, bulkWriterOp);
        }
        else {
            this._bufferedOperations.push(new BufferedOperation(bulkWriterOp, () => {
                this._pendingOpsCount++;
                this._sendFn(enqueueOnBatchCallback, bulkWriterOp);
            }));
        }
        // Chain the BulkWriter operation promise with the buffer processing logic
        // in order to ensure that it runs and that subsequent operations are
        // enqueued before the next batch is scheduled in `_sendBatch()`.
        return userPromise
            .then(res => {
            this._pendingOpsCount--;
            this._processBufferedOps();
            return res;
        })
            .catch(err => {
            this._pendingOpsCount--;
            this._processBufferedOps();
            throw err;
        });
    }
    /**
     * Manages the pending operation counter and schedules the next BulkWriter
     * operation if we're under the maximum limit.
     * @private
     * @internal
     */
    _processBufferedOps() {
        if (this._pendingOpsCount < this._maxPendingOpCount &&
            this._bufferedOperations.length > 0) {
            const nextOp = this._bufferedOperations.shift();
            nextOp.sendFn();
        }
    }
    /**
     * Schedules the provided operations on current BulkCommitBatch.
     * Sends the BulkCommitBatch if it reaches maximum capacity.
     *
     * @private
     * @internal
     */
    _sendFn(enqueueOnBatchCallback, op) {
        // A backoff duration greater than 0 implies that this batch is a retry.
        // Retried writes are sent with a batch size of 10 in order to guarantee
        // that the batch is under the 10MiB limit.
        if (op.backoffDuration > 0) {
            if (this._bulkCommitBatch.pendingOps.length >= exports.RETRY_MAX_BATCH_SIZE) {
                this._scheduleCurrentBatch(/* flush= */ false);
            }
            this._bulkCommitBatch.setMaxBatchSize(exports.RETRY_MAX_BATCH_SIZE);
        }
        if (this._bulkCommitBatch.has(op.ref)) {
            // Create a new batch since the backend doesn't support batches with two
            // writes to the same document.
            this._scheduleCurrentBatch();
        }
        enqueueOnBatchCallback(this._bulkCommitBatch);
        this._bulkCommitBatch.processLastOperation(op);
        if (this._bulkCommitBatch._opCount === this._bulkCommitBatch.maxBatchSize) {
            this._scheduleCurrentBatch();
        }
        else if (op.flushed) {
            // If flush() was called before this operation was enqueued into a batch,
            // we still need to schedule it.
            this._scheduleCurrentBatch(/* flush= */ true);
        }
    }
}
exports.BulkWriter = BulkWriter;
/**
 * Validates the use of 'value' as BulkWriterOptions.
 *
 * @private
 * @internal
 * @param value The BulkWriterOptions object to validate.
 * @throws if the input is not a valid BulkWriterOptions object.
 */
function validateBulkWriterOptions(value) {
    if ((0, validate_1.validateOptional)(value, { optional: true })) {
        return;
    }
    const argName = 'options';
    if (!(0, util_1.isObject)(value)) {
        throw new Error(`${(0, validate_1.invalidArgumentMessage)(argName, 'bulkWriter() options argument')} Input is not an object.`);
    }
    const options = value;
    if (options.throttling === undefined ||
        typeof options.throttling === 'boolean') {
        return;
    }
    if (options.throttling.initialOpsPerSecond !== undefined) {
        (0, validate_1.validateInteger)('initialOpsPerSecond', options.throttling.initialOpsPerSecond, {
            minValue: 1,
        });
    }
    if (options.throttling.maxOpsPerSecond !== undefined) {
        (0, validate_1.validateInteger)('maxOpsPerSecond', options.throttling.maxOpsPerSecond, {
            minValue: 1,
        });
        if (options.throttling.initialOpsPerSecond !== undefined &&
            options.throttling.initialOpsPerSecond >
                options.throttling.maxOpsPerSecond) {
            throw new Error(`${(0, validate_1.invalidArgumentMessage)(argName, 'bulkWriter() options argument')} "maxOpsPerSecond" cannot be less than "initialOpsPerSecond".`);
        }
    }
}
//# sourceMappingURL=bulk-writer.js.map