// HEX
// Server: Apache/2.4.52 (Ubuntu)
// System: Linux ip-10-0-8-47 6.8.0-1021-aws #23~22.04.1-Ubuntu SMP Tue Dec 10 16:31:58 UTC 2024 aarch64
// User: ubuntu (1000)
// PHP: 8.1.2-1ubuntu2.22
// Disabled: NONE
// Upload Files
// File: /var/www/api.javaapp.co.uk/node_modules/@grpc/grpc-js/src/load-balancing-call.ts
/*
 * Copyright 2022 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

import { CallCredentials } from './call-credentials';
import {
  Call,
  DeadlineInfoProvider,
  InterceptingListener,
  MessageContext,
  StatusObject,
} from './call-interface';
import { SubchannelCall } from './subchannel-call';
import { ConnectivityState } from './connectivity-state';
import { LogVerbosity, Status } from './constants';
import { Deadline, formatDateDifference, getDeadlineTimeoutString } from './deadline';
import { InternalChannel } from './internal-channel';
import { Metadata } from './metadata';
import { PickResultType } from './picker';
import { CallConfig } from './resolver';
import { splitHostPort } from './uri-parser';
import * as logging from './logging';
import { restrictControlPlaneStatusCode } from './control-plane-status';
import * as http2 from 'http2';

/** Tracer name passed to `logging.trace` for every message emitted by `LoadBalancingCall`. */
const TRACER_NAME = 'load_balancing_call';

/**
 * Tag attached to the final status of a `LoadBalancingCall`.
 *
 * As produced in this file:
 * - `'NOT_STARTED'`: creating the child call on the picked subchannel threw.
 * - `'DROP'`: the pick result type was `DROP`.
 * - `'REFUSED'`: the child call ended with an HTTP/2 `REFUSED_STREAM` reset code.
 * - `'PROCESSED'`: every other final status.
 */
export type RpcProgress = 'NOT_STARTED' | 'DROP' | 'REFUSED' | 'PROCESSED';

/** A `StatusObject` that additionally carries an `RpcProgress` tag. */
export interface StatusObjectWithProgress extends StatusObject {
  /** Progress tag associated with this status. */
  progress: RpcProgress;
}

/**
 * Listener type accepted by `LoadBalancingCall.start`. It is an
 * `InterceptingListener` whose status callback receives a
 * `StatusObjectWithProgress` rather than a plain `StatusObject`.
 */
export interface LoadBalancingCallInterceptingListener
  extends InterceptingListener {
  /** Called with the final status, including its progress tag. */
  onReceiveStatus(status: StatusObjectWithProgress): void;
}

/**
 * A call that asks the channel for a pick and, once a subchannel has been
 * picked, delegates to a `SubchannelCall` created on that subchannel.
 *
 * Operations requested before the child call exists (`startRead`,
 * `sendMessageWithContext`, `halfClose`) are recorded and replayed as soon as
 * the child call has been created. The final status is delivered to the
 * listener at most once, tagged with an `RpcProgress` value.
 */
export class LoadBalancingCall implements Call, DeadlineInfoProvider {
  /** The call on the picked subchannel, or null until one has been created. */
  private child: SubchannelCall | null = null;
  /** True if `startRead` was requested before the child call existed. */
  private readPending = false;
  /** Message passed to `sendMessageWithContext` before the child call existed. */
  private pendingMessage: { context: MessageContext; message: Buffer } | null =
    null;
  /** True if `halfClose` was requested before the child call existed. */
  private pendingHalfClose = false;
  /** Set once a final status has been output; guards against double reporting. */
  private ended = false;
  /** Value passed as `service_url` when generating credentials metadata. */
  private serviceUrl: string;
  /** Metadata supplied to `start`; null until `start` is called. */
  private metadata: Metadata | null = null;
  /** Listener supplied to `start`; null until `start` is called. */
  private listener: InterceptingListener | null = null;
  /** Callback taken from the pick result, invoked with the final status code. */
  private onCallEnded: ((statusCode: Status) => void) | null = null;
  /** Time at which this object was constructed. */
  private startTime: Date;
  /** Time at which the child call was created, or null if there is none yet. */
  private childStartTime: Date | null = null;

  /**
   * @param channel Channel used to perform picks and to queue this call for a
   *     later pick.
   * @param callConfig Supplies `pickInformation` for the pick and an optional
   *     `onCommitted` callback invoked once the child call has been created.
   * @param methodName Request path; expected to look like
   *     "/{serviceName}/{methodName}".
   * @param host Host string passed to the child call and used to build the
   *     service URL.
   * @param credentials Call credentials used to generate extra metadata.
   * @param deadline Call deadline; `Infinity` means no `grpc-timeout` is set.
   * @param callNumber Identifier used as a prefix in trace output.
   */
  constructor(
    private readonly channel: InternalChannel,
    private readonly callConfig: CallConfig,
    private readonly methodName: string,
    private readonly host: string,
    private readonly credentials: CallCredentials,
    private readonly deadline: Deadline,
    private readonly callNumber: number
  ) {
    const splitPath: string[] = this.methodName.split('/');
    let serviceName = '';
    /* The standard path format is "/{serviceName}/{methodName}", so if we split
     * by '/', the first item should be empty and the second should be the
     * service name */
    if (splitPath.length >= 2) {
      serviceName = splitPath[1];
    }
    const hostname = splitHostPort(this.host)?.host ?? 'localhost';
    /* Currently, call credentials are only allowed on HTTPS connections, so we
     * can assume that the scheme is "https" */
    this.serviceUrl = `https://${hostname}/${serviceName}`;
    this.startTime = new Date();
  }

  /**
   * Describes where this call currently stands, for inclusion in deadline
   * diagnostics.
   *
   * @returns If a child call exists: an optional "LB pick" duration entry
   *     (preceded by 'wait_for_ready' when that option is set) followed by the
   *     child's own deadline info. Otherwise: an optional 'wait_for_ready'
   *     entry followed by 'Waiting for LB pick'.
   */
  getDeadlineInfo(): string[] {
    const deadlineInfo: string[] = [];
    if (this.childStartTime) {
      // Only report the pick duration when it is measurably non-zero.
      if (this.childStartTime > this.startTime) {
        if (this.metadata?.getOptions().waitForReady) {
          deadlineInfo.push('wait_for_ready');
        }
        deadlineInfo.push(`LB pick: ${formatDateDifference(this.startTime, this.childStartTime)}`);
      }
      // childStartTime is only assigned after child has been assigned.
      deadlineInfo.push(...this.child!.getDeadlineInfo());
      return deadlineInfo;
    } else {
      if (this.metadata?.getOptions().waitForReady) {
        deadlineInfo.push('wait_for_ready');
      }
      deadlineInfo.push('Waiting for LB pick');
    }
    return deadlineInfo;
  }

  /** Emits a DEBUG trace line prefixed with this call's number. */
  private trace(text: string): void {
    logging.trace(
      LogVerbosity.DEBUG,
      TRACER_NAME,
      '[' + this.callNumber + '] ' + text
    );
  }

  /**
   * Reports the final status to the listener and to the pick's `onCallEnded`
   * callback. Only the first invocation has any effect.
   *
   * @param status Final status of the call.
   * @param progress Progress tag attached to the status given to the listener.
   */
  private outputStatus(status: StatusObject, progress: RpcProgress) {
    if (!this.ended) {
      this.ended = true;
      this.trace(
        'ended with status: code=' +
          status.code +
          ' details="' +
          status.details +
          '" start time=' +
          this.startTime.toISOString()
      );
      const finalStatus = { ...status, progress };
      this.listener?.onReceiveStatus(finalStatus);
      this.onCallEnded?.(finalStatus.code);
    }
  }

  /**
   * Performs a pick on the channel and acts on the result:
   * - COMPLETE: generates credentials metadata, then creates the child call
   *   on the picked subchannel and replays any pending operations.
   * - DROP: ends the call with the pick's status and progress 'DROP'.
   * - TRANSIENT_FAILURE: queues the call for another pick if `waitForReady`
   *   is set, otherwise ends the call with the pick's status.
   * - QUEUE: queues the call for another pick.
   *
   * Does nothing if the call has already ended.
   *
   * @throws Error if called before `start`.
   */
  doPick() {
    if (this.ended) {
      return;
    }
    if (!this.metadata) {
      throw new Error('doPick called before start');
    }
    this.trace('Pick called');
    // Work on a copy so that a retried pick starts from the original metadata.
    const finalMetadata = this.metadata.clone();
    const pickResult = this.channel.doPick(
      finalMetadata,
      this.callConfig.pickInformation
    );
    const subchannelString = pickResult.subchannel
      ? '(' +
        pickResult.subchannel.getChannelzRef().id +
        ') ' +
        pickResult.subchannel.getAddress()
      : '' + pickResult.subchannel;
    this.trace(
      'Pick result: ' +
        PickResultType[pickResult.pickResultType] +
        ' subchannel: ' +
        subchannelString +
        ' status: ' +
        pickResult.status?.code +
        ' ' +
        pickResult.status?.details
    );
    switch (pickResult.pickResultType) {
      case PickResultType.COMPLETE:
        this.credentials
          .generateMetadata({ service_url: this.serviceUrl })
          .then(
            credsMetadata => {
              /* If this call was cancelled (e.g. by the deadline) before
               * metadata generation finished, we shouldn't do anything with
               * it. */
              if (this.ended) {
                this.trace(
                  'Credentials metadata generation finished after call ended'
                );
                return;
              }
              finalMetadata.merge(credsMetadata);
              if (finalMetadata.get('authorization').length > 1) {
                this.outputStatus(
                  {
                    code: Status.INTERNAL,
                    details:
                      '"authorization" metadata cannot have multiple values',
                    metadata: new Metadata(),
                  },
                  'PROCESSED'
                );
                /* The call has ended with an error, so stop here. Without
                 * this return a child call would still be created on the
                 * picked subchannel after the status was reported. */
                return;
              }
              if (
                pickResult.subchannel!.getConnectivityState() !==
                ConnectivityState.READY
              ) {
                this.trace(
                  'Picked subchannel ' +
                    subchannelString +
                    ' has state ' +
                    ConnectivityState[
                      pickResult.subchannel!.getConnectivityState()
                    ] +
                    ' after getting credentials metadata. Retrying pick'
                );
                this.doPick();
                return;
              }

              if (this.deadline !== Infinity) {
                finalMetadata.set(
                  'grpc-timeout',
                  getDeadlineTimeoutString(this.deadline)
                );
              }
              try {
                this.child = pickResult
                  .subchannel!.getRealSubchannel()
                  .createCall(finalMetadata, this.host, this.methodName, {
                    onReceiveMetadata: metadata => {
                      this.trace('Received metadata');
                      this.listener!.onReceiveMetadata(metadata);
                    },
                    onReceiveMessage: message => {
                      this.trace('Received message');
                      this.listener!.onReceiveMessage(message);
                    },
                    onReceiveStatus: status => {
                      this.trace('Received status');
                      if (
                        status.rstCode ===
                        http2.constants.NGHTTP2_REFUSED_STREAM
                      ) {
                        this.outputStatus(status, 'REFUSED');
                      } else {
                        this.outputStatus(status, 'PROCESSED');
                      }
                    },
                  });
                this.childStartTime = new Date();
              } catch (error) {
                this.trace(
                  'Failed to start call on picked subchannel ' +
                    subchannelString +
                    ' with error ' +
                    (error as Error).message
                );
                this.outputStatus(
                  {
                    code: Status.INTERNAL,
                    details:
                      'Failed to start HTTP/2 stream with error ' +
                      (error as Error).message,
                    metadata: new Metadata(),
                  },
                  'NOT_STARTED'
                );
                return;
              }
              this.callConfig.onCommitted?.();
              pickResult.onCallStarted?.();
              this.onCallEnded = pickResult.onCallEnded;
              this.trace(
                'Created child call [' + this.child.getCallNumber() + ']'
              );
              // Replay operations that were requested before the child existed.
              if (this.readPending) {
                this.child.startRead();
              }
              if (this.pendingMessage) {
                this.child.sendMessageWithContext(
                  this.pendingMessage.context,
                  this.pendingMessage.message
                );
              }
              if (this.pendingHalfClose) {
                this.child.halfClose();
              }
            },
            (error: Error & { code: number }) => {
              // We assume the error code isn't 0 (Status.OK)
              const { code, details } = restrictControlPlaneStatusCode(
                typeof error.code === 'number' ? error.code : Status.UNKNOWN,
                `Getting metadata from plugin failed with error: ${error.message}`
              );
              this.outputStatus(
                {
                  code: code,
                  details: details,
                  metadata: new Metadata(),
                },
                'PROCESSED'
              );
            }
          );
        break;
      case PickResultType.DROP: {
        const { code, details } = restrictControlPlaneStatusCode(
          pickResult.status!.code,
          pickResult.status!.details
        );
        // Deferred so the status is not delivered synchronously from doPick.
        setImmediate(() => {
          this.outputStatus(
            { code, details, metadata: pickResult.status!.metadata },
            'DROP'
          );
        });
        break;
      }
      case PickResultType.TRANSIENT_FAILURE:
        if (this.metadata.getOptions().waitForReady) {
          this.channel.queueCallForPick(this);
        } else {
          const { code, details } = restrictControlPlaneStatusCode(
            pickResult.status!.code,
            pickResult.status!.details
          );
          // Deferred so the status is not delivered synchronously from doPick.
          setImmediate(() => {
            this.outputStatus(
              { code, details, metadata: pickResult.status!.metadata },
              'PROCESSED'
            );
          });
        }
        break;
      case PickResultType.QUEUE:
        this.channel.queueCallForPick(this);
        break;
    }
  }

  /**
   * Cancels the child call, if there is one, and ends this call with the
   * given status and empty metadata.
   *
   * @param status Status code to report.
   * @param details Human-readable status details.
   */
  cancelWithStatus(status: Status, details: string): void {
    this.trace(
      'cancelWithStatus code: ' + status + ' details: "' + details + '"'
    );
    this.child?.cancelWithStatus(status, details);
    this.outputStatus(
      { code: status, details: details, metadata: new Metadata() },
      'PROCESSED'
    );
  }

  /**
   * @returns The child call's peer when a child call exists, otherwise the
   *     channel's target.
   */
  getPeer(): string {
    return this.child?.getPeer() ?? this.channel.getTarget();
  }

  /**
   * Stores the metadata and listener, then performs the first pick.
   *
   * @param metadata Initial metadata for the call.
   * @param listener Receives metadata, messages and the final status.
   */
  start(
    metadata: Metadata,
    listener: LoadBalancingCallInterceptingListener
  ): void {
    this.trace('start called');
    this.listener = listener;
    this.metadata = metadata;
    this.doPick();
  }

  /**
   * Sends a message on the child call, or stores it for replay if the child
   * call does not exist yet. Only one pending message is retained; a later
   * call made before the child exists replaces the stored one.
   */
  sendMessageWithContext(context: MessageContext, message: Buffer): void {
    this.trace('write() called with message of length ' + message.length);
    if (this.child) {
      this.child.sendMessageWithContext(context, message);
    } else {
      this.pendingMessage = { context, message };
    }
  }

  /**
   * Starts a read on the child call, or records the request for replay if the
   * child call does not exist yet.
   */
  startRead(): void {
    this.trace('startRead called');
    if (this.child) {
      this.child.startRead();
    } else {
      this.readPending = true;
    }
  }

  /**
   * Half-closes the child call, or records the request for replay if the
   * child call does not exist yet.
   */
  halfClose(): void {
    this.trace('halfClose called');
    if (this.child) {
      this.child.halfClose();
    } else {
      this.pendingHalfClose = true;
    }
  }

  /**
   * Not supported on this call type.
   *
   * @throws Error always.
   */
  setCredentials(credentials: CallCredentials): void {
    throw new Error('Method not implemented.');
  }

  /** @returns The call number supplied at construction. */
  getCallNumber(): number {
    return this.callNumber;
  }
}