fix
This commit is contained in:
74
node_modules/mongodb/src/sdam/common.ts
generated
vendored
Normal file
74
node_modules/mongodb/src/sdam/common.ts
generated
vendored
Normal file
@@ -0,0 +1,74 @@
|
||||
import type { Binary, Long, Timestamp } from '../bson';
|
||||
import type { ClientSession } from '../sessions';
|
||||
import type { Topology } from './topology';
|
||||
|
||||
// shared state names
// Lifecycle states shared by SDAM components (Topology, Server, Monitor);
// monitor.ts layers additional monitor-only states on top of these.
export const STATE_CLOSING = 'closing';
export const STATE_CLOSED = 'closed';
export const STATE_CONNECTING = 'connecting';
export const STATE_CONNECTED = 'connected';
|
||||
|
||||
/**
 * An enumeration of topology types we know about
 * @public
 */
export const TopologyType = Object.freeze({
  Single: 'Single',
  ReplicaSetNoPrimary: 'ReplicaSetNoPrimary',
  ReplicaSetWithPrimary: 'ReplicaSetWithPrimary',
  Sharded: 'Sharded',
  Unknown: 'Unknown',
  LoadBalanced: 'LoadBalanced'
} as const);

/** @public */
// Companion type alias: the union of the literal values above
// ('Single' | 'ReplicaSetNoPrimary' | ...), sharing the runtime object's name.
export type TopologyType = (typeof TopologyType)[keyof typeof TopologyType];
|
||||
|
||||
/**
 * An enumeration of server types we know about
 * @public
 */
export const ServerType = Object.freeze({
  Standalone: 'Standalone',
  Mongos: 'Mongos',
  PossiblePrimary: 'PossiblePrimary',
  RSPrimary: 'RSPrimary',
  RSSecondary: 'RSSecondary',
  RSArbiter: 'RSArbiter',
  RSOther: 'RSOther',
  RSGhost: 'RSGhost',
  Unknown: 'Unknown',
  LoadBalancer: 'LoadBalancer'
} as const);

/** @public */
// Companion type alias: union of the literal server-type strings above.
export type ServerType = (typeof ServerType)[keyof typeof ServerType];
|
||||
|
||||
/**
 * @public
 * Gossiped in component for the cluster time tracking the state of user databases
 * across the cluster. It may optionally include a signature identifying the process that
 * generated such a value.
 */
export interface ClusterTime {
  /** The logical cluster time; compared via `greaterThan` in `_advanceClusterTime`. */
  clusterTime: Timestamp;
  /** Used to validate the identity of a request or response's ClusterTime. */
  signature?: {
    hash: Binary;
    keyId: Long;
  };
}
|
||||
|
||||
/** Shared function to determine clusterTime for a given topology or session */
|
||||
export function _advanceClusterTime(
|
||||
entity: Topology | ClientSession,
|
||||
$clusterTime: ClusterTime
|
||||
): void {
|
||||
if (entity.clusterTime == null) {
|
||||
entity.clusterTime = $clusterTime;
|
||||
} else {
|
||||
if ($clusterTime.clusterTime.greaterThan(entity.clusterTime.clusterTime)) {
|
||||
entity.clusterTime = $clusterTime;
|
||||
}
|
||||
}
|
||||
}
|
||||
219
node_modules/mongodb/src/sdam/events.ts
generated
vendored
Normal file
219
node_modules/mongodb/src/sdam/events.ts
generated
vendored
Normal file
@@ -0,0 +1,219 @@
|
||||
import type { Document } from '../bson';
|
||||
import {
|
||||
SERVER_CLOSED,
|
||||
SERVER_DESCRIPTION_CHANGED,
|
||||
SERVER_HEARTBEAT_FAILED,
|
||||
SERVER_HEARTBEAT_STARTED,
|
||||
SERVER_HEARTBEAT_SUCCEEDED,
|
||||
SERVER_OPENING,
|
||||
TOPOLOGY_CLOSED,
|
||||
TOPOLOGY_DESCRIPTION_CHANGED,
|
||||
TOPOLOGY_OPENING
|
||||
} from '../constants';
|
||||
import type { ServerDescription } from './server_description';
|
||||
import type { TopologyDescription } from './topology_description';
|
||||
|
||||
/**
 * Emitted when server description changes, but does NOT include changes to the RTT.
 * @public
 * @category Event
 */
export class ServerDescriptionChangedEvent {
  /** A unique identifier for the topology */
  topologyId: number;
  /** The address (host/port pair) of the server */
  address: string;
  /** The previous server description */
  previousDescription: ServerDescription;
  /** The new server description */
  newDescription: ServerDescription;
  /** @internal */
  name = SERVER_DESCRIPTION_CHANGED;

  /** @internal */
  constructor(
    topologyId: number,
    address: string,
    previousDescription: ServerDescription,
    newDescription: ServerDescription
  ) {
    this.topologyId = topologyId;
    this.address = address;
    this.previousDescription = previousDescription;
    this.newDescription = newDescription;
  }
}
|
||||
|
||||
/**
|
||||
* Emitted when server is initialized.
|
||||
* @public
|
||||
* @category Event
|
||||
*/
|
||||
export class ServerOpeningEvent {
|
||||
/** A unique identifier for the topology */
|
||||
topologyId: number;
|
||||
/** The address (host/port pair) of the server */
|
||||
address: string;
|
||||
/** @internal */
|
||||
name = SERVER_OPENING;
|
||||
|
||||
/** @internal */
|
||||
constructor(topologyId: number, address: string) {
|
||||
this.topologyId = topologyId;
|
||||
this.address = address;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Emitted when server is closed.
|
||||
* @public
|
||||
* @category Event
|
||||
*/
|
||||
export class ServerClosedEvent {
|
||||
/** A unique identifier for the topology */
|
||||
topologyId: number;
|
||||
/** The address (host/port pair) of the server */
|
||||
address: string;
|
||||
/** @internal */
|
||||
name = SERVER_CLOSED;
|
||||
|
||||
/** @internal */
|
||||
constructor(topologyId: number, address: string) {
|
||||
this.topologyId = topologyId;
|
||||
this.address = address;
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Emitted when topology description changes.
 * Carries both the old and the new description so observers can diff them.
 * @public
 * @category Event
 */
export class TopologyDescriptionChangedEvent {
  /** A unique identifier for the topology */
  topologyId: number;
  /** The old topology description */
  previousDescription: TopologyDescription;
  /** The new topology description */
  newDescription: TopologyDescription;
  /** @internal */
  name = TOPOLOGY_DESCRIPTION_CHANGED;

  /** @internal */
  constructor(
    topologyId: number,
    previousDescription: TopologyDescription,
    newDescription: TopologyDescription
  ) {
    this.topologyId = topologyId;
    this.previousDescription = previousDescription;
    this.newDescription = newDescription;
  }
}
|
||||
|
||||
/**
|
||||
* Emitted when topology is initialized.
|
||||
* @public
|
||||
* @category Event
|
||||
*/
|
||||
export class TopologyOpeningEvent {
|
||||
/** A unique identifier for the topology */
|
||||
topologyId: number;
|
||||
/** @internal */
|
||||
name = TOPOLOGY_OPENING;
|
||||
|
||||
/** @internal */
|
||||
constructor(topologyId: number) {
|
||||
this.topologyId = topologyId;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Emitted when topology is closed.
|
||||
* @public
|
||||
* @category Event
|
||||
*/
|
||||
export class TopologyClosedEvent {
|
||||
/** A unique identifier for the topology */
|
||||
topologyId: number;
|
||||
/** @internal */
|
||||
name = TOPOLOGY_CLOSED;
|
||||
|
||||
/** @internal */
|
||||
constructor(topologyId: number) {
|
||||
this.topologyId = topologyId;
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Emitted when the server monitor’s hello command is started - immediately before
 * the hello command is serialized into raw BSON and written to the socket.
 *
 * @public
 * @category Event
 */
export class ServerHeartbeatStartedEvent {
  /** The connection id for the command */
  connectionId: string;
  /** Is true when using the streaming protocol */
  awaited: boolean;
  /** @internal */
  name = SERVER_HEARTBEAT_STARTED;

  /** @internal */
  constructor(connectionId: string, awaited: boolean) {
    this.connectionId = connectionId;
    this.awaited = awaited;
  }
}
|
||||
|
||||
/**
 * Emitted when the server monitor’s hello succeeds.
 * @public
 * @category Event
 */
export class ServerHeartbeatSucceededEvent {
  /** The connection id for the command */
  connectionId: string;
  /** The execution time of the event in ms */
  duration: number;
  /** The command reply */
  reply: Document;
  /** Is true when using the streaming protocol */
  awaited: boolean;
  /** @internal */
  name = SERVER_HEARTBEAT_SUCCEEDED;

  /** @internal */
  constructor(connectionId: string, duration: number, reply: Document | null, awaited: boolean) {
    this.connectionId = connectionId;
    this.duration = duration;
    // A null reply is normalized to an empty document so consumers never see null.
    this.reply = reply ?? {};
    this.awaited = awaited;
  }
}
|
||||
|
||||
/**
 * Emitted when the server monitor’s hello fails, either with an “ok: 0” or a socket exception.
 * @public
 * @category Event
 */
export class ServerHeartbeatFailedEvent {
  /** The connection id for the command */
  connectionId: string;
  /** The execution time of the event in ms */
  duration: number;
  /** The command failure */
  failure: Error;
  /** Is true when using the streaming protocol */
  awaited: boolean;
  /** @internal */
  name = SERVER_HEARTBEAT_FAILED;

  /** @internal */
  constructor(connectionId: string, duration: number, failure: Error, awaited: boolean) {
    this.connectionId = connectionId;
    this.duration = duration;
    this.failure = failure;
    this.awaited = awaited;
  }
}
|
||||
771
node_modules/mongodb/src/sdam/monitor.ts
generated
vendored
Normal file
771
node_modules/mongodb/src/sdam/monitor.ts
generated
vendored
Normal file
@@ -0,0 +1,771 @@
|
||||
import { clearTimeout, setTimeout } from 'timers';
|
||||
|
||||
import { type Document, Long } from '../bson';
|
||||
import { connect, makeConnection, makeSocket, performInitialHandshake } from '../cmap/connect';
|
||||
import type { Connection, ConnectionOptions } from '../cmap/connection';
|
||||
import { getFAASEnv } from '../cmap/handshake/client_metadata';
|
||||
import { LEGACY_HELLO_COMMAND } from '../constants';
|
||||
import { MongoError, MongoErrorLabel, MongoNetworkTimeoutError } from '../error';
|
||||
import { MongoLoggableComponent } from '../mongo_logger';
|
||||
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
|
||||
import {
|
||||
calculateDurationInMs,
|
||||
type Callback,
|
||||
type EventEmitterWithState,
|
||||
makeStateMachine,
|
||||
noop,
|
||||
now,
|
||||
ns
|
||||
} from '../utils';
|
||||
import { ServerType, STATE_CLOSED, STATE_CLOSING } from './common';
|
||||
import {
|
||||
ServerHeartbeatFailedEvent,
|
||||
ServerHeartbeatStartedEvent,
|
||||
ServerHeartbeatSucceededEvent
|
||||
} from './events';
|
||||
import { Server } from './server';
|
||||
import type { TopologyVersion } from './server_description';
|
||||
|
||||
// Monitor-only lifecycle states, in addition to the shared closing/closed states.
const STATE_IDLE = 'idle';
const STATE_MONITORING = 'monitoring';
// Allowed state transitions (key -> permitted next states). makeStateMachine
// comes from ../utils; presumably it validates transitions — see its definition.
const stateTransition = makeStateMachine({
  [STATE_CLOSING]: [STATE_CLOSING, STATE_IDLE, STATE_CLOSED],
  [STATE_CLOSED]: [STATE_CLOSED, STATE_MONITORING],
  [STATE_IDLE]: [STATE_IDLE, STATE_MONITORING, STATE_CLOSING],
  [STATE_MONITORING]: [STATE_MONITORING, STATE_IDLE, STATE_CLOSING]
});

// States in which an explicit requestCheck() is a no-op: already checking, or shut down.
const INVALID_REQUEST_CHECK_STATES = new Set([STATE_CLOSING, STATE_CLOSED, STATE_MONITORING]);
// True when the monitor is closing or already closed.
function isInCloseState(monitor: Monitor) {
  return monitor.s.state === STATE_CLOSED || monitor.s.state === STATE_CLOSING;
}
|
||||
|
||||
/** @public */
// How heartbeats are issued (see useStreamingProtocol below):
// 'poll'   - always issue discrete hello checks,
// 'stream' - always use the awaitable/streaming protocol,
// 'auto'   - stream unless a FaaS environment was detected.
export const ServerMonitoringMode = Object.freeze({
  auto: 'auto',
  poll: 'poll',
  stream: 'stream'
} as const);

/** @public */
export type ServerMonitoringMode = (typeof ServerMonitoringMode)[keyof typeof ServerMonitoringMode];
|
||||
|
||||
/** @internal */
export interface MonitorPrivate {
  /** Current lifecycle state; one of the STATE_* constants. */
  state: string;
}

/** @public */
// Connection options minus the fields the Monitor supplies itself
// (it uses a fixed id, the pool's generation, and the server's host address).
export interface MonitorOptions
  extends Omit<ConnectionOptions, 'id' | 'generation' | 'hostAddress'> {
  connectTimeoutMS: number;
  heartbeatFrequencyMS: number;
  minHeartbeatFrequencyMS: number;
  serverMonitoringMode: ServerMonitoringMode;
}

/** @public */
// Event map for the Monitor's TypedEventEmitter.
export type MonitorEvents = {
  serverHeartbeatStarted(event: ServerHeartbeatStartedEvent): void;
  serverHeartbeatSucceeded(event: ServerHeartbeatSucceededEvent): void;
  serverHeartbeatFailed(event: ServerHeartbeatFailedEvent): void;
  /** Emitted after a failed heartbeat; carries the labeled error (see checkServer). */
  resetServer(error?: MongoError): void;
  resetConnectionPool(): void;
  close(): void;
} & EventEmitterWithState;
|
||||
|
||||
/**
 * Monitors a single server with periodic `hello` checks over a dedicated,
 * unauthenticated connection, emitting heartbeat events and recording RTT
 * samples.
 * @internal
 */
export class Monitor extends TypedEventEmitter<MonitorEvents> {
  /** @internal */
  s: MonitorPrivate;
  /** Address (host/port pair) of the monitored server. */
  address: string;
  /** Frozen monitor-relevant options with defaults applied in the constructor. */
  options: Readonly<
    Pick<
      MonitorOptions,
      | 'connectTimeoutMS'
      | 'heartbeatFrequencyMS'
      | 'minHeartbeatFrequencyMS'
      | 'serverMonitoringMode'
    >
  >;
  /** Options for the dedicated monitoring connection (credentials removed). */
  connectOptions: ConnectionOptions;
  /** Cached at construction time from getFAASEnv(). */
  isRunningInFaasEnv: boolean;
  server: Server;
  /** The dedicated monitoring connection, or null when not connected. */
  connection: Connection | null;
  /** Emits 'cancel' to abort in-flight monitoring work (see resetMonitorState). */
  cancellationToken: CancellationToken;
  /** @internal */
  monitorId?: MonitorInterval;
  rttPinger?: RTTPinger;
  /** @internal */
  override component = MongoLoggableComponent.TOPOLOGY;
  /** @internal */
  private rttSampler: RTTSampler;

  constructor(server: Server, options: MonitorOptions) {
    super();
    // Prevent unhandled 'error' events from throwing.
    this.on('error', noop);

    this.server = server;
    this.connection = null;
    this.cancellationToken = new CancellationToken();
    this.cancellationToken.setMaxListeners(Infinity);
    this.monitorId = undefined;
    this.s = {
      state: STATE_CLOSED
    };
    this.address = server.description.address;
    this.options = Object.freeze({
      connectTimeoutMS: options.connectTimeoutMS ?? 10000,
      heartbeatFrequencyMS: options.heartbeatFrequencyMS ?? 10000,
      minHeartbeatFrequencyMS: options.minHeartbeatFrequencyMS ?? 500,
      serverMonitoringMode: options.serverMonitoringMode
    });
    this.isRunningInFaasEnv = getFAASEnv() != null;
    // mongoLogger is presumably inherited from TypedEventEmitter — confirm there.
    this.mongoLogger = this.server.topology.client?.mongoLogger;
    this.rttSampler = new RTTSampler(10);

    const cancellationToken = this.cancellationToken;
    // TODO: refactor this to pull it directly from the pool, requires new ConnectionPool integration
    const connectOptions = {
      id: '<monitor>' as const,
      generation: server.pool.generation,
      cancellationToken,
      hostAddress: server.description.hostAddress,
      ...options,
      // force BSON serialization options
      raw: false,
      useBigInt64: false,
      promoteLongs: true,
      promoteValues: true,
      promoteBuffers: true
    };

    // ensure no authentication is used for monitoring
    delete connectOptions.credentials;
    if (connectOptions.autoEncrypter) {
      delete connectOptions.autoEncrypter;
    }

    this.connectOptions = Object.freeze(connectOptions);
  }

  /** Starts monitoring; no-op unless the monitor is currently closed. */
  connect(): void {
    if (this.s.state !== STATE_CLOSED) {
      return;
    }

    // start — schedule the monitoring loop, firing the first check immediately.
    const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS;
    const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS;
    this.monitorId = new MonitorInterval(monitorServer(this), {
      heartbeatFrequencyMS: heartbeatFrequencyMS,
      minHeartbeatFrequencyMS: minHeartbeatFrequencyMS,
      immediate: true
    });
  }

  /** Requests an expedited check; ignored while checking, closing or closed. */
  requestCheck(): void {
    if (INVALID_REQUEST_CHECK_STATES.has(this.s.state)) {
      return;
    }

    this.monitorId?.wake();
  }

  /**
   * Tears monitoring down and restarts it. No-op when already closing/closed
   * or when the server has no topologyVersion.
   */
  reset(): void {
    const topologyVersion = this.server.description.topologyVersion;
    if (isInCloseState(this) || topologyVersion == null) {
      return;
    }

    stateTransition(this, STATE_CLOSING);
    resetMonitorState(this);

    // restart monitor
    stateTransition(this, STATE_IDLE);

    // restart monitoring (not immediate, unlike connect())
    const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS;
    const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS;
    this.monitorId = new MonitorInterval(monitorServer(this), {
      heartbeatFrequencyMS: heartbeatFrequencyMS,
      minHeartbeatFrequencyMS: minHeartbeatFrequencyMS
    });
  }

  /** Permanently stops monitoring and emits 'close'. Idempotent. */
  close(): void {
    if (isInCloseState(this)) {
      return;
    }

    stateTransition(this, STATE_CLOSING);
    resetMonitorState(this);

    // close monitor
    this.emit('close');
    stateTransition(this, STATE_CLOSED);
  }

  /** Mean of the collected RTT samples (ms). */
  get roundTripTime(): number {
    return this.rttSampler.average();
  }

  /** Minimum of the collected RTT samples (ms); 0 until enough samples exist. */
  get minRoundTripTime(): number {
    return this.rttSampler.min();
  }

  /** Most recent RTT sample, or null when none have been recorded. */
  get latestRtt(): number | null {
    return this.rttSampler.last;
  }

  /** Records one RTT sample (ms). */
  addRttSample(rtt: number) {
    this.rttSampler.addSample(rtt);
  }

  /** Discards all recorded RTT samples. */
  clearRttSamples() {
    this.rttSampler.clear();
  }
}
|
||||
|
||||
/**
 * Releases everything the monitor holds: the interval timer, the RTT pinger,
 * any in-flight work (via the cancellation token) and the dedicated
 * monitoring connection; finally clears collected RTT samples.
 */
function resetMonitorState(monitor: Monitor) {
  monitor.monitorId?.stop();
  monitor.monitorId = undefined;

  monitor.rttPinger?.close();
  monitor.rttPinger = undefined;

  // Abort any in-flight connect/check listening on this token.
  monitor.cancellationToken.emit('cancel');

  monitor.connection?.destroy();
  monitor.connection = null;

  monitor.clearRttSamples();
}
|
||||
|
||||
function useStreamingProtocol(monitor: Monitor, topologyVersion: TopologyVersion | null): boolean {
|
||||
// If we have no topology version we always poll no matter
|
||||
// what the user provided, since the server does not support
|
||||
// the streaming protocol.
|
||||
if (topologyVersion == null) return false;
|
||||
|
||||
const serverMonitoringMode = monitor.options.serverMonitoringMode;
|
||||
if (serverMonitoringMode === ServerMonitoringMode.poll) return false;
|
||||
if (serverMonitoringMode === ServerMonitoringMode.stream) return true;
|
||||
|
||||
// If we are in auto mode, we need to figure out if we're in a FaaS
|
||||
// environment or not and choose the appropriate mode.
|
||||
if (monitor.isRunningInFaasEnv) return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
 * Performs a single heartbeat check against the monitored server.
 *
 * If a healthy monitoring connection exists, it issues a `hello` (or legacy
 * hello) command — using the exhaust/streaming form when the streaming
 * protocol applies. Otherwise it establishes a new connection, whose initial
 * handshake doubles as the heartbeat. Emits started/succeeded/failed
 * heartbeat events throughout; on failure it labels the error, emits
 * 'resetServer', and reports the error via `callback`.
 */
function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
  let start: number;
  let awaited: boolean;
  const topologyVersion = monitor.server.description.topologyVersion;
  const isAwaitable = useStreamingProtocol(monitor, topologyVersion);
  // NOTE(review): the event's first constructor arg is declared `connectionId`
  // but receives the monitor's address here — confirm this is intentional upstream.
  monitor.emitAndLogHeartbeat(
    Server.SERVER_HEARTBEAT_STARTED,
    monitor.server.topology.s.id,
    undefined,
    new ServerHeartbeatStartedEvent(monitor.address, isAwaitable)
  );

  // Shared failure path: tear down the connection, emit the failed heartbeat,
  // label the error so the pool is reset, and surface it to the caller.
  function onHeartbeatFailed(err: Error) {
    monitor.connection?.destroy();
    monitor.connection = null;
    monitor.emitAndLogHeartbeat(
      Server.SERVER_HEARTBEAT_FAILED,
      monitor.server.topology.s.id,
      undefined,
      new ServerHeartbeatFailedEvent(monitor.address, calculateDurationInMs(start), err, awaited)
    );

    // Wrap non-MongoError failures so error labels can be attached.
    const error = !(err instanceof MongoError)
      ? new MongoError(MongoError.buildErrorMessage(err), { cause: err })
      : err;
    error.addErrorLabel(MongoErrorLabel.ResetPool);
    if (error instanceof MongoNetworkTimeoutError) {
      error.addErrorLabel(MongoErrorLabel.InterruptInUseConnections);
    }

    monitor.emit('resetServer', error);
    callback(err);
  }

  // Shared success path for command-based checks (not the fresh-connection path).
  function onHeartbeatSucceeded(hello: Document) {
    if (!('isWritablePrimary' in hello)) {
      // Provide hello-style response document.
      hello.isWritablePrimary = hello[LEGACY_HELLO_COMMAND];
    }

    // NOTE: here we use the latestRtt as this measurement corresponds with the value
    // obtained for this successful heartbeat, if there is no latestRtt, then we calculate the
    // duration
    const duration =
      isAwaitable && monitor.rttPinger
        ? (monitor.rttPinger.latestRtt ?? calculateDurationInMs(start))
        : calculateDurationInMs(start);

    monitor.addRttSample(duration);

    monitor.emitAndLogHeartbeat(
      Server.SERVER_HEARTBEAT_SUCCEEDED,
      monitor.server.topology.s.id,
      hello.connectionId,
      new ServerHeartbeatSucceededEvent(monitor.address, duration, hello, isAwaitable)
    );

    if (isAwaitable) {
      // If we are using the streaming protocol then we immediately issue another 'started'
      // event, otherwise the "check" is complete and return to the main monitor loop
      monitor.emitAndLogHeartbeat(
        Server.SERVER_HEARTBEAT_STARTED,
        monitor.server.topology.s.id,
        undefined,
        new ServerHeartbeatStartedEvent(monitor.address, true)
      );
      // We have not actually sent an outgoing handshake, but when we get the next response we
      // want the duration to reflect the time since we last heard from the server
      start = now();
    } else {
      monitor.rttPinger?.close();
      monitor.rttPinger = undefined;

      callback(undefined, hello);
    }
  }

  // Fast path: reuse the existing monitoring connection.
  const { connection } = monitor;
  if (connection && !connection.closed) {
    const { serverApi, helloOk } = connection;
    const connectTimeoutMS = monitor.options.connectTimeoutMS;
    const maxAwaitTimeMS = monitor.options.heartbeatFrequencyMS;

    // Prefer 'hello' when the server API or helloOk negotiation allows it;
    // streaming checks attach maxAwaitTimeMS + the known topologyVersion.
    const cmd = {
      [serverApi?.version || helloOk ? 'hello' : LEGACY_HELLO_COMMAND]: 1,
      ...(isAwaitable && topologyVersion
        ? { maxAwaitTimeMS, topologyVersion: makeTopologyVersion(topologyVersion) }
        : {})
    };

    // Streaming checks need a socket timeout that outlasts the awaitable wait
    // (0 = no timeout when connectTimeoutMS is 0) and exhaust permission.
    const options = isAwaitable
      ? {
          socketTimeoutMS: connectTimeoutMS ? connectTimeoutMS + maxAwaitTimeMS : 0,
          exhaustAllowed: true
        }
      : { socketTimeoutMS: connectTimeoutMS };

    // Streaming mode measures RTT with a separate pinger connection.
    if (isAwaitable && monitor.rttPinger == null) {
      monitor.rttPinger = new RTTPinger(monitor);
    }

    // Record new start time before sending handshake
    start = now();

    if (isAwaitable) {
      awaited = true;
      return connection.exhaustCommand(ns('admin.$cmd'), cmd, options, (error, hello) => {
        if (error) return onHeartbeatFailed(error);
        return onHeartbeatSucceeded(hello);
      });
    }

    awaited = false;
    connection
      .command(ns('admin.$cmd'), cmd, options)
      .then(onHeartbeatSucceeded, onHeartbeatFailed);

    return;
  }

  // connecting does an implicit `hello`
  (async () => {
    const socket = await makeSocket(monitor.connectOptions);
    const connection = makeConnection(monitor.connectOptions, socket);
    // The start time is after socket creation but before the handshake
    start = now();
    try {
      await performInitialHandshake(connection, monitor.connectOptions);
      return connection;
    } catch (error) {
      connection.destroy();
      throw error;
    }
  })().then(
    connection => {
      // The monitor may have been closed while we were connecting.
      if (isInCloseState(monitor)) {
        connection.destroy();
        return;
      }
      const duration = calculateDurationInMs(start);
      monitor.addRttSample(duration);

      monitor.connection = connection;
      monitor.emitAndLogHeartbeat(
        Server.SERVER_HEARTBEAT_SUCCEEDED,
        monitor.server.topology.s.id,
        connection.hello?.connectionId,
        new ServerHeartbeatSucceededEvent(
          monitor.address,
          duration,
          connection.hello,
          useStreamingProtocol(monitor, connection.hello?.topologyVersion)
        )
      );

      callback(undefined, connection.hello);
    },
    error => {
      monitor.connection = null;
      awaited = false;
      onHeartbeatFailed(error);
    }
  );
}
|
||||
|
||||
/**
 * Builds the function executed by the MonitorInterval: runs one
 * checkServer(), bracketing it with MONITORING/IDLE state transitions, and —
 * when streaming applies — immediately wakes the interval for the next check.
 */
function monitorServer(monitor: Monitor) {
  return (callback: Callback) => {
    // A check is already in flight; yield back to the interval.
    if (monitor.s.state === STATE_MONITORING) {
      process.nextTick(callback);
      return;
    }
    stateTransition(monitor, STATE_MONITORING);
    function done() {
      if (!isInCloseState(monitor)) {
        stateTransition(monitor, STATE_IDLE);
      }

      callback();
    }

    checkServer(monitor, (err, hello) => {
      if (err) {
        // otherwise an error occurred on initial discovery, also bail
        if (monitor.server.description.type === ServerType.Unknown) {
          return done();
        }
      }

      // if the check indicates streaming is supported, immediately reschedule monitoring
      if (useStreamingProtocol(monitor, hello?.topologyVersion)) {
        setTimeout(() => {
          if (!isInCloseState(monitor)) {
            monitor.monitorId?.wake();
          }
        }, 0);
      }

      done();
    });
  };
}
|
||||
|
||||
function makeTopologyVersion(tv: TopologyVersion) {
|
||||
return {
|
||||
processId: tv.processId,
|
||||
// tests mock counter as just number, but in a real situation counter should always be a Long
|
||||
// TODO(NODE-2674): Preserve int64 sent from MongoDB
|
||||
counter: Long.isLong(tv.counter) ? tv.counter : Long.fromNumber(tv.counter)
|
||||
};
|
||||
}
|
||||
|
||||
/** @internal */
export interface RTTPingerOptions extends ConnectionOptions {
  /** Interval (ms) between RTT measurements. */
  heartbeatFrequencyMS: number;
}
|
||||
|
||||
/**
 * Measures round-trip time on a separate connection while the monitor uses
 * the streaming protocol (whose long-lived hello cannot be timed directly).
 * Repeatedly issues a hello on its own schedule and records the latest RTT.
 * @internal
 */
export class RTTPinger {
  /** Lazily-established connection used only for RTT measurements. */
  connection?: Connection;
  /** @internal */
  cancellationToken: CancellationToken;
  /** @internal */
  monitorId: NodeJS.Timeout;
  /** @internal */
  monitor: Monitor;
  closed: boolean;
  /** @internal */
  latestRtt?: number;

  constructor(monitor: Monitor) {
    this.connection = undefined;
    this.cancellationToken = monitor.cancellationToken;
    this.closed = false;
    this.monitor = monitor;
    // Seed with the monitor's last sample so consumers have a value immediately.
    this.latestRtt = monitor.latestRtt ?? undefined;

    const heartbeatFrequencyMS = monitor.options.heartbeatFrequencyMS;
    this.monitorId = setTimeout(() => this.measureRoundTripTime(), heartbeatFrequencyMS);
  }

  /** Delegates to the monitor's RTT sampler (mean). */
  get roundTripTime(): number {
    return this.monitor.roundTripTime;
  }

  /** Delegates to the monitor's RTT sampler (minimum). */
  get minRoundTripTime(): number {
    return this.monitor.minRoundTripTime;
  }

  /** Stops measuring and destroys the pinger's connection. */
  close(): void {
    this.closed = true;
    clearTimeout(this.monitorId);

    this.connection?.destroy();
    this.connection = undefined;
  }

  /** Records the elapsed time for one measurement and schedules the next. */
  private measureAndReschedule(start: number, conn?: Connection) {
    if (this.closed) {
      conn?.destroy();
      return;
    }

    // Adopt a freshly-established connection for subsequent measurements.
    if (this.connection == null) {
      this.connection = conn;
    }

    this.latestRtt = calculateDurationInMs(start);
    this.monitorId = setTimeout(
      () => this.measureRoundTripTime(),
      this.monitor.options.heartbeatFrequencyMS
    );
  }

  private measureRoundTripTime() {
    const start = now();

    if (this.closed) {
      return;
    }

    const connection = this.connection;
    if (connection == null) {
      // No connection yet: establishing one serves as this round's measurement.
      // On failure the pinger silently retries on the next scheduled run.
      connect(this.monitor.connectOptions).then(
        connection => {
          this.measureAndReschedule(start, connection);
        },
        () => {
          this.connection = undefined;
        }
      );
      return;
    }

    // Prefer 'hello' when the server API or helloOk negotiation allows it.
    const commandName =
      connection.serverApi?.version || connection.helloOk ? 'hello' : LEGACY_HELLO_COMMAND;

    connection.command(ns('admin.$cmd'), { [commandName]: 1 }, undefined).then(
      () => this.measureAndReschedule(start),
      () => {
        // Command failed: drop the connection; a new one is made next round.
        this.connection?.destroy();
        this.connection = undefined;
        return;
      }
    );
  }
}
|
||||
|
||||
/**
 * @internal
 */
export interface MonitorIntervalOptions {
  /** The interval to execute a method on */
  heartbeatFrequencyMS: number;
  /** A minimum interval that must elapse before the method is called */
  minHeartbeatFrequencyMS: number;
  /** Whether the method should be called immediately when the interval is started */
  immediate: boolean;
}
|
||||
|
||||
/**
 * Repeatedly runs an async function on a fixed interval, with support for
 * expedited "wake" requests that are debounced so the function never runs
 * more often than `minHeartbeatFrequencyMS`.
 * @internal
 */
export class MonitorInterval {
  /** The function to run; it signals completion by invoking its callback. */
  fn: (callback: Callback) => void;
  timerId: NodeJS.Timeout | undefined;
  /** Timestamp of the last completed execution; -Infinity before the first. */
  lastExecutionEnded: number;
  isExpeditedCallToFnScheduled = false;
  stopped = false;
  isExecutionInProgress = false;
  // NOTE(review): never written in this file — possibly vestigial; confirm upstream.
  hasExecutedOnce = false;

  heartbeatFrequencyMS: number;
  minHeartbeatFrequencyMS: number;

  constructor(fn: (callback: Callback) => void, options: Partial<MonitorIntervalOptions> = {}) {
    this.fn = fn;
    this.lastExecutionEnded = -Infinity;

    this.heartbeatFrequencyMS = options.heartbeatFrequencyMS ?? 1000;
    this.minHeartbeatFrequencyMS = options.minHeartbeatFrequencyMS ?? 500;

    if (options.immediate) {
      this._executeAndReschedule();
    } else {
      this._reschedule(undefined);
    }
  }

  /** Requests an expedited run, honoring the minimum-frequency debounce. */
  wake() {
    const currentTime = now();
    const timeSinceLastCall = currentTime - this.lastExecutionEnded;

    // TODO(NODE-4674): Add error handling and logging to the monitor
    // Clock went backwards relative to the last run; just execute now.
    if (timeSinceLastCall < 0) {
      return this._executeAndReschedule();
    }

    if (this.isExecutionInProgress) {
      return;
    }

    // debounce multiple calls to wake within the `minInterval`
    if (this.isExpeditedCallToFnScheduled) {
      return;
    }

    // reschedule a call as soon as possible, ensuring the call never happens
    // faster than the `minInterval`
    if (timeSinceLastCall < this.minHeartbeatFrequencyMS) {
      this.isExpeditedCallToFnScheduled = true;
      this._reschedule(this.minHeartbeatFrequencyMS - timeSinceLastCall);
      return;
    }

    this._executeAndReschedule();
  }

  /** Cancels any pending run and resets the interval's bookkeeping. */
  stop() {
    this.stopped = true;
    if (this.timerId) {
      clearTimeout(this.timerId);
      this.timerId = undefined;
    }

    this.lastExecutionEnded = -Infinity;
    this.isExpeditedCallToFnScheduled = false;
  }

  toString() {
    return JSON.stringify(this);
  }

  /** Diagnostic snapshot of the interval's state (used by toString). */
  toJSON() {
    const currentTime = now();
    const timeSinceLastCall = currentTime - this.lastExecutionEnded;
    return {
      timerId: this.timerId != null ? 'set' : 'cleared',
      lastCallTime: this.lastExecutionEnded,
      isExpeditedCheckScheduled: this.isExpeditedCallToFnScheduled,
      stopped: this.stopped,
      heartbeatFrequencyMS: this.heartbeatFrequencyMS,
      minHeartbeatFrequencyMS: this.minHeartbeatFrequencyMS,
      currentTime,
      timeSinceLastCall
    };
  }

  // Replaces any pending timer with one firing in `ms` (default: the full interval).
  private _reschedule(ms?: number) {
    if (this.stopped) return;
    if (this.timerId) {
      clearTimeout(this.timerId);
    }

    this.timerId = setTimeout(this._executeAndReschedule, ms || this.heartbeatFrequencyMS);
  }

  // Runs `fn` now; when its callback fires, records the end time and
  // schedules the next regular run. Arrow function so it can be passed
  // directly to setTimeout with `this` bound.
  private _executeAndReschedule = () => {
    if (this.stopped) return;
    if (this.timerId) {
      clearTimeout(this.timerId);
    }

    this.isExpeditedCallToFnScheduled = false;
    this.isExecutionInProgress = true;

    this.fn(() => {
      this.lastExecutionEnded = now();
      this.isExecutionInProgress = false;
      this._reschedule(this.heartbeatFrequencyMS);
    });
  };
}
|
||||
|
||||
/** @internal
|
||||
* This class implements the RTT sampling logic specified for [CSOT](https://github.com/mongodb/specifications/blob/bbb335e60cd7ea1e0f7cd9a9443cb95fc9d3b64d/source/client-side-operations-timeout/client-side-operations-timeout.md#drivers-use-minimum-rtt-to-short-circuit-operations)
|
||||
*
|
||||
* This is implemented as a [circular buffer](https://en.wikipedia.org/wiki/Circular_buffer) keeping
|
||||
* the most recent `windowSize` samples
|
||||
* */
|
||||
export class RTTSampler {
|
||||
/** Index of the next slot to be overwritten */
|
||||
private writeIndex: number;
|
||||
private length: number;
|
||||
private rttSamples: Float64Array;
|
||||
|
||||
constructor(windowSize = 10) {
|
||||
this.rttSamples = new Float64Array(windowSize);
|
||||
this.length = 0;
|
||||
this.writeIndex = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds an rtt sample to the end of the circular buffer
|
||||
* When `windowSize` samples have been collected, `addSample` overwrites the least recently added
|
||||
* sample
|
||||
*/
|
||||
addSample(sample: number) {
|
||||
this.rttSamples[this.writeIndex++] = sample;
|
||||
if (this.length < this.rttSamples.length) {
|
||||
this.length++;
|
||||
}
|
||||
|
||||
this.writeIndex %= this.rttSamples.length;
|
||||
}
|
||||
|
||||
/**
|
||||
* When \< 2 samples have been collected, returns 0
|
||||
* Otherwise computes the minimum value samples contained in the buffer
|
||||
*/
|
||||
min(): number {
|
||||
if (this.length < 2) return 0;
|
||||
let min = this.rttSamples[0];
|
||||
for (let i = 1; i < this.length; i++) {
|
||||
if (this.rttSamples[i] < min) min = this.rttSamples[i];
|
||||
}
|
||||
|
||||
return min;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns mean of samples contained in the buffer
|
||||
*/
|
||||
average(): number {
|
||||
if (this.length === 0) return 0;
|
||||
let sum = 0;
|
||||
for (let i = 0; i < this.length; i++) {
|
||||
sum += this.rttSamples[i];
|
||||
}
|
||||
|
||||
return sum / this.length;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns most recently inserted element in the buffer
|
||||
* Returns null if the buffer is empty
|
||||
* */
|
||||
get last(): number | null {
|
||||
if (this.length === 0) return null;
|
||||
return this.rttSamples[this.writeIndex === 0 ? this.length - 1 : this.writeIndex - 1];
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear the buffer
|
||||
* NOTE: this does not overwrite the data held in the internal array, just the pointers into
|
||||
* this array
|
||||
*/
|
||||
clear() {
|
||||
this.length = 0;
|
||||
this.writeIndex = 0;
|
||||
}
|
||||
}
|
||||
577
node_modules/mongodb/src/sdam/server.ts
generated
vendored
Normal file
577
node_modules/mongodb/src/sdam/server.ts
generated
vendored
Normal file
@@ -0,0 +1,577 @@
|
||||
import type { Document } from '../bson';
|
||||
import { type AutoEncrypter } from '../client-side-encryption/auto_encrypter';
|
||||
import { type CommandOptions, Connection } from '../cmap/connection';
|
||||
import {
|
||||
ConnectionPool,
|
||||
type ConnectionPoolEvents,
|
||||
type ConnectionPoolOptions
|
||||
} from '../cmap/connection_pool';
|
||||
import { PoolClearedError } from '../cmap/errors';
|
||||
import {
|
||||
APM_EVENTS,
|
||||
CLOSED,
|
||||
CMAP_EVENTS,
|
||||
CONNECT,
|
||||
DESCRIPTION_RECEIVED,
|
||||
ENDED,
|
||||
HEARTBEAT_EVENTS,
|
||||
SERVER_HEARTBEAT_FAILED,
|
||||
SERVER_HEARTBEAT_STARTED,
|
||||
SERVER_HEARTBEAT_SUCCEEDED
|
||||
} from '../constants';
|
||||
import {
|
||||
type AnyError,
|
||||
isNodeShuttingDownError,
|
||||
isSDAMUnrecoverableError,
|
||||
MONGODB_ERROR_CODES,
|
||||
MongoError,
|
||||
MongoErrorLabel,
|
||||
MongoNetworkError,
|
||||
MongoNetworkTimeoutError,
|
||||
MongoRuntimeError,
|
||||
MongoServerClosedError,
|
||||
type MongoServerError,
|
||||
needsRetryableWriteLabel
|
||||
} from '../error';
|
||||
import type { ServerApi } from '../mongo_client';
|
||||
import { type Abortable, TypedEventEmitter } from '../mongo_types';
|
||||
import { AggregateOperation } from '../operations/aggregate';
|
||||
import type { GetMoreOptions } from '../operations/get_more';
|
||||
import { type AbstractOperation } from '../operations/operation';
|
||||
import type { ClientSession } from '../sessions';
|
||||
import { type TimeoutContext } from '../timeout';
|
||||
import { isTransactionCommand } from '../transactions';
|
||||
import {
|
||||
abortable,
|
||||
type EventEmitterWithState,
|
||||
makeStateMachine,
|
||||
maxWireVersion,
|
||||
noop,
|
||||
squashError,
|
||||
supportsRetryableWrites
|
||||
} from '../utils';
|
||||
import { throwIfWriteConcernError } from '../write_concern';
|
||||
import {
|
||||
type ClusterTime,
|
||||
STATE_CLOSED,
|
||||
STATE_CLOSING,
|
||||
STATE_CONNECTED,
|
||||
STATE_CONNECTING,
|
||||
TopologyType
|
||||
} from './common';
|
||||
import type {
|
||||
ServerHeartbeatFailedEvent,
|
||||
ServerHeartbeatStartedEvent,
|
||||
ServerHeartbeatSucceededEvent
|
||||
} from './events';
|
||||
import { Monitor, type MonitorOptions } from './monitor';
|
||||
import { compareTopologyVersion, ServerDescription } from './server_description';
|
||||
import { MIN_SECONDARY_WRITE_WIRE_VERSION } from './server_selection';
|
||||
import type { Topology } from './topology';
|
||||
|
||||
// Legal server state transitions, keyed by current state. Every state may
// self-transition, and "closed" is reachable from every state. The machine
// returned by makeStateMachine (../utils) presumably rejects transitions not
// listed here — confirm against its implementation.
const stateTransition = makeStateMachine({
  [STATE_CLOSED]: [STATE_CLOSED, STATE_CONNECTING],
  [STATE_CONNECTING]: [STATE_CONNECTING, STATE_CLOSING, STATE_CONNECTED, STATE_CLOSED],
  [STATE_CONNECTED]: [STATE_CONNECTED, STATE_CLOSING, STATE_CLOSED],
  [STATE_CLOSING]: [STATE_CLOSING, STATE_CLOSED]
});
|
||||
|
||||
/** @internal */
// Server construction options: the connection-pool options (minus the fields the
// Server supplies itself) combined with the monitor's options.
export type ServerOptions = Omit<ConnectionPoolOptions, 'id' | 'generation' | 'hostAddress'> &
  MonitorOptions;

/** @internal */
export interface ServerPrivate {
  /** The server description for this server */
  description: ServerDescription;
  /** A copy of the options used to construct this instance */
  options: ServerOptions;
  /** The current state of the Server */
  state: string;
  /** MongoDB server API version */
  serverApi?: ServerApi;
  /** A count of the operations currently running against the server. */
  operationCount: number;
}

/** @public */
// Events emitted by a Server: heartbeat/lifecycle events plus all re-emitted
// connection-pool events (see the pool event forwarding in the constructor).
export type ServerEvents = {
  serverHeartbeatStarted(event: ServerHeartbeatStartedEvent): void;
  serverHeartbeatSucceeded(event: ServerHeartbeatSucceededEvent): void;
  serverHeartbeatFailed(event: ServerHeartbeatFailedEvent): void;
  /** Top level MongoClient doesn't emit this so it is marked: @internal */
  connect(server: Server): void;
  descriptionReceived(description: ServerDescription): void;
  closed(): void;
  ended(): void;
} & ConnectionPoolEvents &
  EventEmitterWithState;

/** @internal */
// Options accepted by Server.command: like CommandOptions but a timeoutContext
// is required rather than optional.
export type ServerCommandOptions = Omit<CommandOptions, 'timeoutContext' | 'socketTimeoutMS'> & {
  timeoutContext: TimeoutContext;
  returnFieldSelector?: Document | null;
} & Abortable;
|
||||
|
||||
/** @internal */
export class Server extends TypedEventEmitter<ServerEvents> {
  /** @internal */
  s: ServerPrivate;
  /** @internal */
  topology: Topology;
  /** @internal */
  pool: ConnectionPool;
  serverApi?: ServerApi;
  hello?: Document;
  // null when the topology is load balanced — such servers are never monitored
  monitor: Monitor | null;

  /** @event */
  static readonly SERVER_HEARTBEAT_STARTED = SERVER_HEARTBEAT_STARTED;
  /** @event */
  static readonly SERVER_HEARTBEAT_SUCCEEDED = SERVER_HEARTBEAT_SUCCEEDED;
  /** @event */
  static readonly SERVER_HEARTBEAT_FAILED = SERVER_HEARTBEAT_FAILED;
  /** @event */
  static readonly CONNECT = CONNECT;
  /** @event */
  static readonly DESCRIPTION_RECEIVED = DESCRIPTION_RECEIVED;
  /** @event */
  static readonly CLOSED = CLOSED;
  /** @event */
  static readonly ENDED = ENDED;

  /**
   * Create a server
   */
  constructor(topology: Topology, description: ServerDescription, options: ServerOptions) {
    super();
    this.on('error', noop);

    this.serverApi = options.serverApi;

    const poolOptions = { hostAddress: description.hostAddress, ...options };

    this.topology = topology;
    this.pool = new ConnectionPool(this, poolOptions);

    this.s = {
      description,
      options,
      state: STATE_CLOSED,
      operationCount: 0
    };

    // re-emit all CMAP and APM events from the pool on this server
    for (const event of [...CMAP_EVENTS, ...APM_EVENTS]) {
      this.pool.on(event, (e: any) => this.emit(event, e));
    }

    // gossip $clusterTime received on connections up to the owning topology
    this.pool.on(Connection.CLUSTER_TIME_RECEIVED, (clusterTime: ClusterTime) => {
      this.clusterTime = clusterTime;
    });

    if (this.loadBalanced) {
      this.monitor = null;
      // monitoring is disabled in load balancing mode
      return;
    }

    // create the monitor
    this.monitor = new Monitor(this, this.s.options);

    // re-emit heartbeat events from the monitor on this server
    for (const event of HEARTBEAT_EVENTS) {
      this.monitor.on(event, (e: any) => this.emit(event, e));
    }

    this.monitor.on('resetServer', (error: MongoServerError) => markServerUnknown(this, error));
    this.monitor.on(Server.SERVER_HEARTBEAT_SUCCEEDED, (event: ServerHeartbeatSucceededEvent) => {
      // every successful heartbeat yields a fresh ServerDescription carrying RTT stats
      this.emit(
        Server.DESCRIPTION_RECEIVED,
        new ServerDescription(this.description.hostAddress, event.reply, {
          roundTripTime: this.monitor?.roundTripTime,
          minRoundTripTime: this.monitor?.minRoundTripTime
        })
      );

      // the first successful heartbeat completes the connect sequence
      if (this.s.state === STATE_CONNECTING) {
        stateTransition(this, STATE_CONNECTED);
        this.emit(Server.CONNECT, this);
      }
    });
  }

  // cluster time is shared topology-wide; both accessors delegate to the topology
  get clusterTime(): ClusterTime | undefined {
    return this.topology.clusterTime;
  }

  set clusterTime(clusterTime: ClusterTime | undefined) {
    this.topology.clusterTime = clusterTime;
  }

  get description(): ServerDescription {
    return this.s.description;
  }

  get name(): string {
    return this.s.description.address;
  }

  get autoEncrypter(): AutoEncrypter | undefined {
    if (this.s.options && this.s.options.autoEncrypter) {
      return this.s.options.autoEncrypter;
    }
    return;
  }

  // derived from the topology's type, not stored locally
  get loadBalanced(): boolean {
    return this.topology.description.type === TopologyType.LoadBalanced;
  }

  /**
   * Initiate server connect
   */
  connect(): void {
    if (this.s.state !== STATE_CLOSED) {
      return;
    }

    stateTransition(this, STATE_CONNECTING);

    // If in load balancer mode we automatically set the server to
    // a load balancer. It never transitions out of this state and
    // has no monitor.
    if (!this.loadBalanced) {
      this.monitor?.connect();
    } else {
      stateTransition(this, STATE_CONNECTED);
      this.emit(Server.CONNECT, this);
    }
  }

  closeCheckedOutConnections() {
    return this.pool.closeCheckedOutConnections();
  }

  /** Destroy the server connection */
  close(): void {
    if (this.s.state === STATE_CLOSED) {
      return;
    }

    stateTransition(this, STATE_CLOSING);

    if (!this.loadBalanced) {
      this.monitor?.close();
    }

    this.pool.close();
    stateTransition(this, STATE_CLOSED);
    this.emit('closed');
  }

  /**
   * Immediately schedule monitoring of this server. If there already an attempt being made
   * this will be a no-op.
   */
  requestCheck(): void {
    if (!this.loadBalanced) {
      this.monitor?.requestCheck();
    }
  }

  /**
   * Execute an operation's command against this server: checks a connection out
   * of the pool (or reuses the session's pinned connection), runs the command,
   * transparently retries once after a Reauthenticate error, and routes any
   * failure through decorateCommandError for SDAM/retryability bookkeeping.
   *
   * @param operation - The operation providing command, options and namespace
   * @param timeoutContext - CSOT timeout context applied to checkout and command
   * @throws MongoServerClosedError when the server is closing or closed
   */
  public async command<TResult>(
    operation: AbstractOperation<TResult>,
    timeoutContext: TimeoutContext
  ): Promise<InstanceType<typeof operation.SERVER_COMMAND_RESPONSE_TYPE>> {
    if (this.s.state === STATE_CLOSING || this.s.state === STATE_CLOSED) {
      throw new MongoServerClosedError();
    }
    const session = operation.session;

    // reuse the session's pinned connection when one exists
    let conn = session?.pinnedConnection;

    this.incrementOperationCount();
    if (conn == null) {
      try {
        conn = await this.pool.checkOut({ timeoutContext, signal: operation.options.signal });
      } catch (checkoutError) {
        this.decrementOperationCount();
        // PoolClearedError is excluded: the pool was already cleared
        if (!(checkoutError instanceof PoolClearedError)) this.handleError(checkoutError);
        throw checkoutError;
      }
    }

    let reauthPromise: Promise<void> | null = null;
    // runs in the finally below: returns the connection unless it is pinned,
    // waiting for any in-flight reauthentication to settle first
    const cleanup = () => {
      this.decrementOperationCount();
      if (session?.pinnedConnection !== conn) {
        if (reauthPromise != null) {
          // The reauth promise only exists if it hasn't thrown.
          const checkBackIn = () => {
            this.pool.checkIn(conn);
          };
          void reauthPromise.then(checkBackIn, checkBackIn);
        } else {
          this.pool.checkIn(conn);
        }
      }
    };

    let cmd;
    try {
      cmd = operation.buildCommand(conn, session);
    } catch (e) {
      // command construction failed before anything was sent; release resources
      cleanup();
      throw e;
    }

    const options = operation.buildOptions(timeoutContext);
    const ns = operation.ns;

    if (this.loadBalanced && isPinnableCommand(cmd, session) && !session?.pinnedConnection) {
      session?.pin(conn);
    }

    options.directConnection = this.topology.s.options.directConnection;

    // aggregates with a write stage cannot carry a read preference on wire
    // versions below MIN_SECONDARY_WRITE_WIRE_VERSION
    const omitReadPreference =
      operation instanceof AggregateOperation &&
      operation.hasWriteStage &&
      maxWireVersion(conn) < MIN_SECONDARY_WRITE_WIRE_VERSION;
    if (omitReadPreference) {
      delete options.readPreference;
    }

    if (this.description.iscryptd) {
      options.omitMaxTimeMS = true;
    }

    try {
      try {
        const res = await conn.command(ns, cmd, options, operation.SERVER_COMMAND_RESPONSE_TYPE);
        throwIfWriteConcernError(res);
        return res;
      } catch (commandError) {
        throw this.decorateCommandError(conn, cmd, options, commandError);
      }
    } catch (operationError) {
      // a Reauthenticate server error triggers one transparent reauth + retry
      if (
        operationError instanceof MongoError &&
        operationError.code === MONGODB_ERROR_CODES.Reauthenticate
      ) {
        reauthPromise = this.pool.reauthenticate(conn);
        reauthPromise.then(undefined, error => {
          reauthPromise = null;
          squashError(error);
        });

        await abortable(reauthPromise, options);
        reauthPromise = null; // only reachable if reauth succeeds

        try {
          const res = await conn.command(ns, cmd, options, operation.SERVER_COMMAND_RESPONSE_TYPE);
          throwIfWriteConcernError(res);
          return res;
        } catch (commandError) {
          throw this.decorateCommandError(conn, cmd, options, commandError);
        }
      } else {
        throw operationError;
      }
    } finally {
      cleanup();
    }
  }

  /**
   * Handle SDAM error
   * @internal
   */
  handleError(error: AnyError, connection?: Connection) {
    if (!(error instanceof MongoError)) {
      return;
    }

    // errors from a previous pool generation have already been acted upon
    const isStaleError =
      error.connectionGeneration && error.connectionGeneration < this.pool.generation;
    if (isStaleError) {
      return;
    }

    const isNetworkNonTimeoutError =
      error instanceof MongoNetworkError && !(error instanceof MongoNetworkTimeoutError);
    const isNetworkTimeoutBeforeHandshakeError =
      error instanceof MongoNetworkError && error.beforeHandshake;
    const isAuthHandshakeError = error.hasErrorLabel(MongoErrorLabel.HandshakeError);
    if (isNetworkNonTimeoutError || isNetworkTimeoutBeforeHandshakeError || isAuthHandshakeError) {
      // In load balanced mode we never mark the server as unknown and always
      // clear for the specific service id.
      if (!this.loadBalanced) {
        error.addErrorLabel(MongoErrorLabel.ResetPool);
        markServerUnknown(this, error);
      } else if (connection) {
        this.pool.clear({ serviceId: connection.serviceId });
      }
    } else {
      if (isSDAMUnrecoverableError(error)) {
        if (shouldHandleStateChangeError(this, error)) {
          const shouldClearPool = isNodeShuttingDownError(error);
          if (this.loadBalanced && connection && shouldClearPool) {
            this.pool.clear({ serviceId: connection.serviceId });
          }

          if (!this.loadBalanced) {
            if (shouldClearPool) {
              error.addErrorLabel(MongoErrorLabel.ResetPool);
            }
            markServerUnknown(this, error);
            // request an immediate check so the server can recover quickly
            process.nextTick(() => this.requestCheck());
          }
        }
      }
    }
  }

  /**
   * Ensure that error is properly decorated and internal state is updated before throwing
   * @internal
   */
  private decorateCommandError(
    connection: Connection,
    cmd: Document,
    options: CommandOptions | GetMoreOptions | undefined,
    error: unknown
  ): Error {
    if (typeof error !== 'object' || error == null || !('name' in error)) {
      throw new MongoRuntimeError('An unexpected error type: ' + typeof error);
    }

    // unwrap an AbortError raised on behalf of an underlying MongoError
    if (error.name === 'AbortError' && 'cause' in error && error.cause instanceof MongoError) {
      error = error.cause;
    }

    if (!(error instanceof MongoError)) {
      // Node.js or some other error we have not special handling for
      return error as Error;
    }

    // errors from a stale connection carry no SDAM meaning for the current pool
    if (connectionIsStale(this.pool, connection)) {
      return error;
    }

    const session = options?.session;
    if (error instanceof MongoNetworkError) {
      if (session && !session.hasEnded && session.serverSession) {
        session.serverSession.isDirty = true;
      }

      // inActiveTransaction check handles commit and abort.
      if (
        inActiveTransaction(session, cmd) &&
        !error.hasErrorLabel(MongoErrorLabel.TransientTransactionError)
      ) {
        error.addErrorLabel(MongoErrorLabel.TransientTransactionError);
      }

      if (
        (isRetryableWritesEnabled(this.topology) || isTransactionCommand(cmd)) &&
        supportsRetryableWrites(this) &&
        !inActiveTransaction(session, cmd)
      ) {
        error.addErrorLabel(MongoErrorLabel.RetryableWriteError);
      }
    } else {
      if (
        (isRetryableWritesEnabled(this.topology) || isTransactionCommand(cmd)) &&
        needsRetryableWriteLabel(error, maxWireVersion(this), this.description.type) &&
        !inActiveTransaction(session, cmd)
      ) {
        error.addErrorLabel(MongoErrorLabel.RetryableWriteError);
      }
    }

    if (
      session &&
      session.isPinned &&
      error.hasErrorLabel(MongoErrorLabel.TransientTransactionError)
    ) {
      session.unpin({ force: true });
    }

    // update SDAM state (may mark this server Unknown and/or clear the pool)
    this.handleError(error, connection);

    return error;
  }

  /**
   * Decrement the operation count, returning the new count.
   */
  private decrementOperationCount(): number {
    return (this.s.operationCount -= 1);
  }

  /**
   * Increment the operation count, returning the new count.
   */
  private incrementOperationCount(): number {
    return (this.s.operationCount += 1);
  }
}
|
||||
|
||||
function markServerUnknown(server: Server, error?: MongoError) {
|
||||
// Load balancer servers can never be marked unknown.
|
||||
if (server.loadBalanced) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (error instanceof MongoNetworkError && !(error instanceof MongoNetworkTimeoutError)) {
|
||||
server.monitor?.reset();
|
||||
}
|
||||
|
||||
server.emit(
|
||||
Server.DESCRIPTION_RECEIVED,
|
||||
new ServerDescription(server.description.hostAddress, undefined, { error })
|
||||
);
|
||||
}
|
||||
|
||||
function isPinnableCommand(cmd: Document, session?: ClientSession): boolean {
|
||||
if (session) {
|
||||
return (
|
||||
session.inTransaction() ||
|
||||
(session.transaction.isCommitted && 'commitTransaction' in cmd) ||
|
||||
'aggregate' in cmd ||
|
||||
'find' in cmd ||
|
||||
'getMore' in cmd ||
|
||||
'listCollections' in cmd ||
|
||||
'listIndexes' in cmd ||
|
||||
'bulkWrite' in cmd
|
||||
);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
function connectionIsStale(pool: ConnectionPool, connection: Connection) {
|
||||
if (connection.serviceId) {
|
||||
return (
|
||||
connection.generation !== pool.serviceGenerations.get(connection.serviceId.toHexString())
|
||||
);
|
||||
}
|
||||
|
||||
return connection.generation !== pool.generation;
|
||||
}
|
||||
|
||||
function shouldHandleStateChangeError(server: Server, err: MongoError) {
|
||||
const etv = err.topologyVersion;
|
||||
const stv = server.description.topologyVersion;
|
||||
return compareTopologyVersion(stv, etv) < 0;
|
||||
}
|
||||
|
||||
function inActiveTransaction(session: ClientSession | undefined, cmd: Document) {
|
||||
return session && session.inTransaction() && !isTransactionCommand(cmd);
|
||||
}
|
||||
|
||||
/** this checks the retryWrites option passed down from the client options, it
|
||||
* does not check if the server supports retryable writes */
|
||||
function isRetryableWritesEnabled(topology: Topology) {
|
||||
return topology.s.options.retryWrites !== false;
|
||||
}
|
||||
291
node_modules/mongodb/src/sdam/server_description.ts
generated
vendored
Normal file
291
node_modules/mongodb/src/sdam/server_description.ts
generated
vendored
Normal file
@@ -0,0 +1,291 @@
|
||||
import { type Document, Long, type ObjectId } from '../bson';
|
||||
import { type MongoError, MongoRuntimeError } from '../error';
|
||||
import { arrayStrictEqual, compareObjectId, errorStrictEqual, HostAddress, now } from '../utils';
|
||||
import { type ClusterTime, ServerType } from './common';
|
||||
|
||||
// Server types the driver may route writes to (see isWritable below).
const WRITABLE_SERVER_TYPES = new Set<ServerType>([
  ServerType.RSPrimary,
  ServerType.Standalone,
  ServerType.Mongos,
  ServerType.LoadBalancer
]);

// Server types that hold data (see isDataBearing below) — notably excludes
// arbiters, ghosts, RSOther and Unknown servers.
const DATA_BEARING_SERVER_TYPES = new Set<ServerType>([
  ServerType.RSPrimary,
  ServerType.RSSecondary,
  ServerType.Mongos,
  ServerType.Standalone,
  ServerType.LoadBalancer
]);
|
||||
|
||||
/** @public */
export interface TopologyVersion {
  // identity of the server process that produced this version
  processId: ObjectId;
  // counter used to order versions from the same process (see compareTopologyVersion)
  counter: Long;
}

/** @public */
// Free-form replica set member tags, e.g. { region: 'east' }.
export type TagSet = { [key: string]: string };

/** @internal */
export interface ServerDescriptionOptions {
  /** An Error used for better reporting debugging */
  error?: MongoError;

  /** The average round trip time to ping this server (in ms) */
  roundTripTime?: number;
  /** The minimum round trip time to ping this server over the past 10 samples(in ms) */
  minRoundTripTime?: number;

  /** If the client is in load balancing mode. */
  loadBalanced?: boolean;
}
|
||||
|
||||
/**
 * The client's view of a single server, based on the most recent hello outcome.
 *
 * Internal type, not meant to be directly instantiated
 * @public
 */
export class ServerDescription {
  address: string;
  type: ServerType;
  hosts: string[];
  passives: string[];
  arbiters: string[];
  tags: TagSet;
  error: MongoError | null;
  topologyVersion: TopologyVersion | null;
  minWireVersion: number;
  maxWireVersion: number;
  roundTripTime: number;
  /** The minimum measurement of the last 10 measurements of roundTripTime that have been collected */
  minRoundTripTime: number;
  lastUpdateTime: number;
  lastWriteDate: number;
  me: string | null;
  primary: string | null;
  setName: string | null;
  setVersion: number | null;
  electionId: ObjectId | null;
  logicalSessionTimeoutMinutes: number | null;
  /** The max message size in bytes for the server. */
  maxMessageSizeBytes: number | null;
  /** The max number of writes in a bulk write command. */
  maxWriteBatchSize: number | null;
  /** The max bson object size. */
  maxBsonObjectSize: number | null;
  /** Indicates server is a mongocryptd instance. */
  iscryptd: boolean;

  // NOTE: does this belong here? It seems we should gossip the cluster time at the CMAP level
  $clusterTime?: ClusterTime;

  /**
   * Create a ServerDescription
   * @internal
   *
   * @param address - The address of the server
   * @param hello - An optional hello response for this server
   * @param options - RTT measurements plus error/load-balanced context
   * @throws MongoRuntimeError when address is null or empty
   */
  constructor(
    address: HostAddress | string,
    hello?: Document,
    options: ServerDescriptionOptions = {}
  ) {
    if (address == null || address === '') {
      throw new MongoRuntimeError('ServerDescription must be provided with a non-empty address');
    }

    this.address =
      typeof address === 'string'
        ? HostAddress.fromString(address).toString() // Use HostAddress to normalize
        : address.toString();
    this.type = parseServerType(hello, options);
    // host lists are lower-cased so address comparisons are case-insensitive
    this.hosts = hello?.hosts?.map((host: string) => host.toLowerCase()) ?? [];
    this.passives = hello?.passives?.map((host: string) => host.toLowerCase()) ?? [];
    this.arbiters = hello?.arbiters?.map((host: string) => host.toLowerCase()) ?? [];
    this.tags = hello?.tags ?? {};
    this.minWireVersion = hello?.minWireVersion ?? 0;
    this.maxWireVersion = hello?.maxWireVersion ?? 0;
    this.roundTripTime = options?.roundTripTime ?? -1;
    this.minRoundTripTime = options?.minRoundTripTime ?? 0;
    this.lastUpdateTime = now();
    this.lastWriteDate = hello?.lastWrite?.lastWriteDate ?? 0;
    // NOTE: This actually builds the stack string instead of holding onto the getter and all its
    // associated references. This is done to prevent a memory leak.
    this.error = options.error ?? null;
    this.error?.stack;
    // TODO(NODE-2674): Preserve int64 sent from MongoDB
    // an error's topologyVersion takes precedence over the hello reply's
    this.topologyVersion = this.error?.topologyVersion ?? hello?.topologyVersion ?? null;
    this.setName = hello?.setName ?? null;
    this.setVersion = hello?.setVersion ?? null;
    this.electionId = hello?.electionId ?? null;
    this.logicalSessionTimeoutMinutes = hello?.logicalSessionTimeoutMinutes ?? null;
    this.maxMessageSizeBytes = hello?.maxMessageSizeBytes ?? null;
    this.maxWriteBatchSize = hello?.maxWriteBatchSize ?? null;
    this.maxBsonObjectSize = hello?.maxBsonObjectSize ?? null;
    this.primary = hello?.primary ?? null;
    this.me = hello?.me?.toLowerCase() ?? null;
    this.$clusterTime = hello?.$clusterTime ?? null;
    this.iscryptd = Boolean(hello?.iscryptd);
  }

  get hostAddress(): HostAddress {
    return HostAddress.fromString(this.address);
  }

  // union of every advertised member address: data-bearing, arbiters, passives
  get allHosts(): string[] {
    return this.hosts.concat(this.arbiters).concat(this.passives);
  }

  /** Is this server available for reads*/
  get isReadable(): boolean {
    return this.type === ServerType.RSSecondary || this.isWritable;
  }

  /** Is this server data bearing */
  get isDataBearing(): boolean {
    return DATA_BEARING_SERVER_TYPES.has(this.type);
  }

  /** Is this server available for writes */
  get isWritable(): boolean {
    return WRITABLE_SERVER_TYPES.has(this.type);
  }

  // the address minus its trailing ':port' suffix
  get host(): string {
    const chopLength = `:${this.port}`.length;
    return this.address.slice(0, -chopLength);
  }

  // the port parsed from the address; falls back to 27017
  get port(): number {
    const port = this.address.split(':').pop();
    return port ? Number.parseInt(port, 10) : 27017;
  }

  /**
   * Determines if another `ServerDescription` is equal to this one per the rules defined in the SDAM specification.
   * @see https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.md
   */
  equals(other?: ServerDescription | null): boolean {
    // Despite using the comparator that would determine a nullish topologyVersion as greater than
    // for equality we should only always perform direct equality comparison
    const topologyVersionsEqual =
      this.topologyVersion === other?.topologyVersion ||
      compareTopologyVersion(this.topologyVersion, other?.topologyVersion) === 0;

    const electionIdsEqual =
      this.electionId != null && other?.electionId != null
        ? compareObjectId(this.electionId, other.electionId) === 0
        : this.electionId === other?.electionId;

    return (
      other != null &&
      other.iscryptd === this.iscryptd &&
      errorStrictEqual(this.error, other.error) &&
      this.type === other.type &&
      this.minWireVersion === other.minWireVersion &&
      arrayStrictEqual(this.hosts, other.hosts) &&
      tagsStrictEqual(this.tags, other.tags) &&
      this.setName === other.setName &&
      this.setVersion === other.setVersion &&
      electionIdsEqual &&
      this.primary === other.primary &&
      this.logicalSessionTimeoutMinutes === other.logicalSessionTimeoutMinutes &&
      topologyVersionsEqual
    );
  }
}
|
||||
|
||||
// Parses a `hello` message and determines the server type
|
||||
export function parseServerType(hello?: Document, options?: ServerDescriptionOptions): ServerType {
|
||||
if (options?.loadBalanced) {
|
||||
return ServerType.LoadBalancer;
|
||||
}
|
||||
|
||||
if (!hello || !hello.ok) {
|
||||
return ServerType.Unknown;
|
||||
}
|
||||
|
||||
if (hello.isreplicaset) {
|
||||
return ServerType.RSGhost;
|
||||
}
|
||||
|
||||
if (hello.msg && hello.msg === 'isdbgrid') {
|
||||
return ServerType.Mongos;
|
||||
}
|
||||
|
||||
if (hello.setName) {
|
||||
if (hello.hidden) {
|
||||
return ServerType.RSOther;
|
||||
} else if (hello.isWritablePrimary) {
|
||||
return ServerType.RSPrimary;
|
||||
} else if (hello.secondary) {
|
||||
return ServerType.RSSecondary;
|
||||
} else if (hello.arbiterOnly) {
|
||||
return ServerType.RSArbiter;
|
||||
} else {
|
||||
return ServerType.RSOther;
|
||||
}
|
||||
}
|
||||
|
||||
return ServerType.Standalone;
|
||||
}
|
||||
|
||||
function tagsStrictEqual(tags: TagSet, tags2: TagSet): boolean {
|
||||
const tagsKeys = Object.keys(tags);
|
||||
const tags2Keys = Object.keys(tags2);
|
||||
|
||||
return (
|
||||
tagsKeys.length === tags2Keys.length &&
|
||||
tagsKeys.every((key: string) => tags2[key] === tags[key])
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Compares two topology versions.
|
||||
*
|
||||
* 1. If the response topologyVersion is unset or the ServerDescription's
|
||||
* topologyVersion is null, the client MUST assume the response is more recent.
|
||||
* 1. If the response's topologyVersion.processId is not equal to the
|
||||
* ServerDescription's, the client MUST assume the response is more recent.
|
||||
* 1. If the response's topologyVersion.processId is equal to the
|
||||
* ServerDescription's, the client MUST use the counter field to determine
|
||||
* which topologyVersion is more recent.
|
||||
*
|
||||
* ```ts
|
||||
* currentTv < newTv === -1
|
||||
* currentTv === newTv === 0
|
||||
* currentTv > newTv === 1
|
||||
* ```
|
||||
*/
|
||||
export function compareTopologyVersion(
|
||||
currentTv?: TopologyVersion | null,
|
||||
newTv?: TopologyVersion | null
|
||||
): 0 | -1 | 1 {
|
||||
if (currentTv == null || newTv == null) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (!currentTv.processId.equals(newTv.processId)) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// TODO(NODE-2674): Preserve int64 sent from MongoDB
|
||||
const currentCounter =
|
||||
typeof currentTv.counter === 'bigint'
|
||||
? Long.fromBigInt(currentTv.counter)
|
||||
: Long.isLong(currentTv.counter)
|
||||
? currentTv.counter
|
||||
: Long.fromNumber(currentTv.counter);
|
||||
|
||||
const newCounter =
|
||||
typeof newTv.counter === 'bigint'
|
||||
? Long.fromBigInt(newTv.counter)
|
||||
: Long.isLong(newTv.counter)
|
||||
? newTv.counter
|
||||
: Long.fromNumber(newTv.counter);
|
||||
|
||||
return currentCounter.compare(newCounter);
|
||||
}
|
||||
323
node_modules/mongodb/src/sdam/server_selection.ts
generated
vendored
Normal file
323
node_modules/mongodb/src/sdam/server_selection.ts
generated
vendored
Normal file
@@ -0,0 +1,323 @@
|
||||
import { MongoInvalidArgumentError } from '../error';
|
||||
import { ReadPreference } from '../read_preference';
|
||||
import { ServerType, TopologyType } from './common';
|
||||
import type { ServerDescription, TagSet } from './server_description';
|
||||
import type { TopologyDescription } from './topology_description';
|
||||
|
||||
// max staleness constants
const IDLE_WRITE_PERIOD = 10000; // ms a primary is assumed to go between writes when idle
const SMALLEST_MAX_STALENESS_SECONDS = 90; // spec-mandated lower bound for maxStalenessSeconds

// Minimum version to try writes on secondaries.
export const MIN_SECONDARY_WRITE_WIRE_VERSION = 13;

/**
 * @internal
 * A function that reduces a list of candidate servers to those eligible for an
 * operation given the current topology description. `deprioritized` servers
 * are only used when no other candidate remains.
 */
export type ServerSelector = (
  topologyDescription: TopologyDescription,
  servers: ServerDescription[],
  deprioritized?: ServerDescription[]
) => ServerDescription[];
|
||||
|
||||
/**
|
||||
* Returns a server selector that selects for writable servers
|
||||
*/
|
||||
export function writableServerSelector(): ServerSelector {
|
||||
return function writableServer(
|
||||
topologyDescription: TopologyDescription,
|
||||
servers: ServerDescription[]
|
||||
): ServerDescription[] {
|
||||
return latencyWindowReducer(
|
||||
topologyDescription,
|
||||
servers.filter((s: ServerDescription) => s.isWritable)
|
||||
);
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* The purpose of this selector is to select the same server, only
|
||||
* if it is in a state that it can have commands sent to it.
|
||||
*/
|
||||
export function sameServerSelector(description?: ServerDescription): ServerSelector {
|
||||
return function sameServerSelector(
|
||||
topologyDescription: TopologyDescription,
|
||||
servers: ServerDescription[]
|
||||
): ServerDescription[] {
|
||||
if (!description) return [];
|
||||
// Filter the servers to match the provided description only if
|
||||
// the type is not unknown.
|
||||
return servers.filter(sd => {
|
||||
return sd.address === description.address && sd.type !== ServerType.Unknown;
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a server selector that uses a read preference to select a
|
||||
* server potentially for a write on a secondary.
|
||||
*/
|
||||
export function secondaryWritableServerSelector(
|
||||
wireVersion?: number,
|
||||
readPreference?: ReadPreference
|
||||
): ServerSelector {
|
||||
// If server version < 5.0, read preference always primary.
|
||||
// If server version >= 5.0...
|
||||
// - If read preference is supplied, use that.
|
||||
// - If no read preference is supplied, use primary.
|
||||
if (
|
||||
!readPreference ||
|
||||
!wireVersion ||
|
||||
(wireVersion && wireVersion < MIN_SECONDARY_WRITE_WIRE_VERSION)
|
||||
) {
|
||||
return readPreferenceServerSelector(ReadPreference.primary);
|
||||
}
|
||||
return readPreferenceServerSelector(readPreference);
|
||||
}
|
||||
|
||||
/**
 * Reduces the passed in array of servers by the rules of the "Max Staleness" specification
 * found here:
 *
 * @see https://github.com/mongodb/specifications/blob/master/source/max-staleness/max-staleness.md
 *
 * @param readPreference - The read preference providing max staleness guidance
 * @param topologyDescription - The topology description
 * @param servers - The list of server descriptions to be reduced
 * @returns The list of servers that satisfy the requirements of max staleness
 */
function maxStalenessReducer(
  readPreference: ReadPreference,
  topologyDescription: TopologyDescription,
  servers: ServerDescription[]
): ServerDescription[] {
  // Unset (or negative) maxStalenessSeconds disables staleness filtering entirely.
  if (readPreference.maxStalenessSeconds == null || readPreference.maxStalenessSeconds < 0) {
    return servers;
  }

  const maxStaleness = readPreference.maxStalenessSeconds;
  // The requested staleness cannot be smaller than one heartbeat interval plus
  // the idle write period, otherwise no healthy secondary could ever qualify.
  const maxStalenessVariance =
    (topologyDescription.heartbeatFrequencyMS + IDLE_WRITE_PERIOD) / 1000;
  if (maxStaleness < maxStalenessVariance) {
    throw new MongoInvalidArgumentError(
      `Option "maxStalenessSeconds" must be at least ${maxStalenessVariance} seconds`
    );
  }

  // Absolute lower bound from the Max Staleness specification (90 seconds).
  if (maxStaleness < SMALLEST_MAX_STALENESS_SECONDS) {
    throw new MongoInvalidArgumentError(
      `Option "maxStalenessSeconds" must be at least ${SMALLEST_MAX_STALENESS_SECONDS} seconds`
    );
  }

  if (topologyDescription.type === TopologyType.ReplicaSetWithPrimary) {
    const primary: ServerDescription = Array.from(topologyDescription.servers.values()).filter(
      primaryFilter
    )[0];

    return servers.reduce((result: ServerDescription[], server: ServerDescription) => {
      // Staleness is measured relative to the primary's last observed write.
      const stalenessMS =
        server.lastUpdateTime -
        server.lastWriteDate -
        (primary.lastUpdateTime - primary.lastWriteDate) +
        topologyDescription.heartbeatFrequencyMS;

      const staleness = stalenessMS / 1000;
      const maxStalenessSeconds = readPreference.maxStalenessSeconds ?? 0;
      if (staleness <= maxStalenessSeconds) {
        result.push(server);
      }

      return result;
    }, []);
  }

  if (topologyDescription.type === TopologyType.ReplicaSetNoPrimary) {
    if (servers.length === 0) {
      return servers;
    }

    // With no primary, staleness is measured against the member with the most
    // recent lastWriteDate.
    const sMax = servers.reduce((max: ServerDescription, s: ServerDescription) =>
      s.lastWriteDate > max.lastWriteDate ? s : max
    );

    return servers.reduce((result: ServerDescription[], server: ServerDescription) => {
      const stalenessMS =
        sMax.lastWriteDate - server.lastWriteDate + topologyDescription.heartbeatFrequencyMS;

      const staleness = stalenessMS / 1000;
      const maxStalenessSeconds = readPreference.maxStalenessSeconds ?? 0;
      if (staleness <= maxStalenessSeconds) {
        result.push(server);
      }

      return result;
    }, []);
  }

  // Other topology types are not subject to staleness filtering.
  return servers;
}
|
||||
|
||||
/**
|
||||
* Determines whether a server's tags match a given set of tags
|
||||
*
|
||||
* @param tagSet - The requested tag set to match
|
||||
* @param serverTags - The server's tags
|
||||
*/
|
||||
function tagSetMatch(tagSet: TagSet, serverTags: TagSet) {
|
||||
const keys = Object.keys(tagSet);
|
||||
const serverTagKeys = Object.keys(serverTags);
|
||||
for (let i = 0; i < keys.length; ++i) {
|
||||
const key = keys[i];
|
||||
if (serverTagKeys.indexOf(key) === -1 || serverTags[key] !== tagSet[key]) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reduces a set of server descriptions based on tags requested by the read preference
|
||||
*
|
||||
* @param readPreference - The read preference providing the requested tags
|
||||
* @param servers - The list of server descriptions to reduce
|
||||
* @returns The list of servers matching the requested tags
|
||||
*/
|
||||
function tagSetReducer(
|
||||
readPreference: ReadPreference,
|
||||
servers: ServerDescription[]
|
||||
): ServerDescription[] {
|
||||
if (
|
||||
readPreference.tags == null ||
|
||||
(Array.isArray(readPreference.tags) && readPreference.tags.length === 0)
|
||||
) {
|
||||
return servers;
|
||||
}
|
||||
|
||||
for (let i = 0; i < readPreference.tags.length; ++i) {
|
||||
const tagSet = readPreference.tags[i];
|
||||
const serversMatchingTagset = servers.reduce(
|
||||
(matched: ServerDescription[], server: ServerDescription) => {
|
||||
if (tagSetMatch(tagSet, server.tags)) matched.push(server);
|
||||
return matched;
|
||||
},
|
||||
[]
|
||||
);
|
||||
|
||||
if (serversMatchingTagset.length) {
|
||||
return serversMatchingTagset;
|
||||
}
|
||||
}
|
||||
|
||||
return [];
|
||||
}
|
||||
|
||||
/**
|
||||
* Reduces a list of servers to ensure they fall within an acceptable latency window. This is
|
||||
* further specified in the "Server Selection" specification, found here:
|
||||
*
|
||||
* @see https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.md
|
||||
*
|
||||
* @param topologyDescription - The topology description
|
||||
* @param servers - The list of servers to reduce
|
||||
* @returns The servers which fall within an acceptable latency window
|
||||
*/
|
||||
function latencyWindowReducer(
|
||||
topologyDescription: TopologyDescription,
|
||||
servers: ServerDescription[]
|
||||
): ServerDescription[] {
|
||||
const low = servers.reduce(
|
||||
(min: number, server: ServerDescription) => Math.min(server.roundTripTime, min),
|
||||
Infinity
|
||||
);
|
||||
|
||||
const high = low + topologyDescription.localThresholdMS;
|
||||
return servers.reduce((result: ServerDescription[], server: ServerDescription) => {
|
||||
if (server.roundTripTime <= high && server.roundTripTime >= low) result.push(server);
|
||||
return result;
|
||||
}, []);
|
||||
}
|
||||
|
||||
// filters

/** Selects replica-set primaries only. */
function primaryFilter(server: ServerDescription): boolean {
  return server.type === ServerType.RSPrimary;
}
|
||||
|
||||
/** Selects replica-set secondaries only. */
function secondaryFilter(server: ServerDescription): boolean {
  return server.type === ServerType.RSSecondary;
}
|
||||
|
||||
/** Selects any data-bearing replica-set member (primary or secondary). */
function nearestFilter(server: ServerDescription): boolean {
  return server.type === ServerType.RSSecondary || server.type === ServerType.RSPrimary;
}
|
||||
|
||||
/** Selects servers whose type has already been determined. */
function knownFilter(server: ServerDescription): boolean {
  return server.type !== ServerType.Unknown;
}
|
||||
|
||||
/** Selects load balancer "servers" (only present in load-balanced mode). */
function loadBalancerFilter(server: ServerDescription): boolean {
  return server.type === ServerType.LoadBalancer;
}
|
||||
|
||||
/**
 * Returns a function which selects servers based on a provided read preference
 *
 * @param readPreference - The read preference to select with
 * @throws MongoInvalidArgumentError if the read preference fails its own validation
 */
export function readPreferenceServerSelector(readPreference: ReadPreference): ServerSelector {
  if (!readPreference.isValid()) {
    throw new MongoInvalidArgumentError('Invalid read preference specified');
  }

  return function readPreferenceServers(
    topologyDescription: TopologyDescription,
    servers: ServerDescription[],
    deprioritized: ServerDescription[] = []
  ): ServerDescription[] {
    // Load-balanced mode: the load balancer is the only eligible "server".
    if (topologyDescription.type === TopologyType.LoadBalanced) {
      return servers.filter(loadBalancerFilter);
    }

    // Nothing is selectable until the topology type is known.
    if (topologyDescription.type === TopologyType.Unknown) {
      return [];
    }

    // Direct connection: read preference is ignored; any known server works.
    if (topologyDescription.type === TopologyType.Single) {
      return latencyWindowReducer(topologyDescription, servers.filter(knownFilter));
    }

    if (topologyDescription.type === TopologyType.Sharded) {
      // Prefer mongoses that have not been deprioritized; fall back to the
      // deprioritized list only when nothing else remains.
      const filtered = servers.filter(server => {
        return !deprioritized.includes(server);
      });
      const selectable = filtered.length > 0 ? filtered : deprioritized;
      return latencyWindowReducer(topologyDescription, selectable.filter(knownFilter));
    }

    // Replica-set selection follows.
    const mode = readPreference.mode;
    if (mode === ReadPreference.PRIMARY) {
      return servers.filter(primaryFilter);
    }

    if (mode === ReadPreference.PRIMARY_PREFERRED) {
      const result = servers.filter(primaryFilter);
      if (result.length) {
        return result;
      }
      // No primary available: fall through to secondary-style selection below.
    }

    // 'nearest' considers primaries and secondaries; remaining modes consider
    // secondaries only. Apply staleness, tag-set, then latency-window filters.
    const filter = mode === ReadPreference.NEAREST ? nearestFilter : secondaryFilter;
    const selectedServers = latencyWindowReducer(
      topologyDescription,
      tagSetReducer(
        readPreference,
        maxStalenessReducer(readPreference, topologyDescription, servers.filter(filter))
      )
    );

    // secondaryPreferred falls back to the primary when no secondary qualifies.
    if (mode === ReadPreference.SECONDARY_PREFERRED && selectedServers.length === 0) {
      return servers.filter(primaryFilter);
    }

    return selectedServers;
  };
}
|
||||
142
node_modules/mongodb/src/sdam/server_selection_events.ts
generated
vendored
Normal file
142
node_modules/mongodb/src/sdam/server_selection_events.ts
generated
vendored
Normal file
@@ -0,0 +1,142 @@
|
||||
import { HostAddress } from '.././utils';
|
||||
import {
|
||||
SERVER_SELECTION_FAILED,
|
||||
SERVER_SELECTION_STARTED,
|
||||
SERVER_SELECTION_SUCCEEDED,
|
||||
WAITING_FOR_SUITABLE_SERVER
|
||||
} from '../constants';
|
||||
import { type ReadPreference } from '../read_preference';
|
||||
import { type ServerSelector } from './server_selection';
|
||||
import type { TopologyDescription } from './topology_description';
|
||||
|
||||
/**
 * The base export class for all logs published from server selection
 * @internal
 * @category Log Type
 */
export abstract class ServerSelectionEvent {
  /** String representation of the selector being used to select the server.
   * Defaults to 'custom selector' for application-provided custom selector case.
   */
  selector: string | ReadPreference | ServerSelector;
  /** The name of the operation for which a server is being selected. */
  operation: string;
  /** The current topology description. */
  topologyDescription: TopologyDescription;

  /** @internal */
  // Discriminates the concrete event subclass; each subclass fixes one value.
  abstract name:
    | typeof SERVER_SELECTION_STARTED
    | typeof SERVER_SELECTION_SUCCEEDED
    | typeof SERVER_SELECTION_FAILED
    | typeof WAITING_FOR_SUITABLE_SERVER;

  /** Human-readable log message supplied by the concrete subclass. */
  abstract message: string;

  /** @internal */
  constructor(
    selector: string | ReadPreference | ServerSelector,
    topologyDescription: TopologyDescription,
    operation: string
  ) {
    this.selector = selector;
    this.operation = operation;
    this.topologyDescription = topologyDescription;
  }
}
|
||||
|
||||
/**
|
||||
* An event published when server selection starts
|
||||
* @internal
|
||||
* @category Event
|
||||
*/
|
||||
export class ServerSelectionStartedEvent extends ServerSelectionEvent {
|
||||
/** @internal */
|
||||
name = SERVER_SELECTION_STARTED;
|
||||
message = 'Server selection started';
|
||||
|
||||
/** @internal */
|
||||
constructor(
|
||||
selector: string | ReadPreference | ServerSelector,
|
||||
topologyDescription: TopologyDescription,
|
||||
operation: string
|
||||
) {
|
||||
super(selector, topologyDescription, operation);
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * An event published when a server selection fails
 * @internal
 * @category Event
 */
export class ServerSelectionFailedEvent extends ServerSelectionEvent {
  /** @internal */
  name = SERVER_SELECTION_FAILED;
  message = 'Server selection failed';
  /** Representation of the error the driver will throw regarding server selection failing. */
  failure: Error;

  /** @internal */
  constructor(
    selector: string | ReadPreference | ServerSelector,
    topologyDescription: TopologyDescription,
    error: Error,
    operation: string
  ) {
    // selector/topologyDescription/operation are recorded by the base class.
    super(selector, topologyDescription, operation);
    this.failure = error;
  }
}
|
||||
|
||||
/**
 * An event published when server selection succeeds
 * @internal
 * @category Event
 */
export class ServerSelectionSucceededEvent extends ServerSelectionEvent {
  /** @internal */
  name = SERVER_SELECTION_SUCCEEDED;
  message = 'Server selection succeeded';
  /** The hostname, IP address, or Unix domain socket path for the selected server. */
  serverHost: string;
  /** The port for the selected server. Optional; not present for Unix domain sockets. When the user does not specify a port and the default (27017) is used, the driver SHOULD include it here. */
  serverPort: number | undefined;

  /** @internal */
  constructor(
    selector: string | ReadPreference | ServerSelector,
    topologyDescription: TopologyDescription,
    address: string,
    operation: string
  ) {
    super(selector, topologyDescription, operation);
    // Split the raw address string into host/port for structured logging.
    const { host, port } = HostAddress.fromString(address).toHostPort();
    this.serverHost = host;
    this.serverPort = port;
  }
}
|
||||
|
||||
/**
 * An event published when server selection is waiting for a suitable server to become available
 * @internal
 * @category Event
 */
export class WaitingForSuitableServerEvent extends ServerSelectionEvent {
  /** @internal */
  name = WAITING_FOR_SUITABLE_SERVER;
  message = 'Waiting for suitable server to become available';
  /** The remaining time left until server selection will time out. */
  // Milliseconds, per the MS suffix.
  remainingTimeMS: number;

  /** @internal */
  constructor(
    selector: string | ReadPreference | ServerSelector,
    topologyDescription: TopologyDescription,
    remainingTimeMS: number,
    operation: string
  ) {
    super(selector, topologyDescription, operation);
    this.remainingTimeMS = remainingTimeMS;
  }
}
|
||||
146
node_modules/mongodb/src/sdam/srv_polling.ts
generated
vendored
Normal file
146
node_modules/mongodb/src/sdam/srv_polling.ts
generated
vendored
Normal file
@@ -0,0 +1,146 @@
|
||||
import * as dns from 'dns';
|
||||
import { clearTimeout, setTimeout } from 'timers';
|
||||
|
||||
import { MongoRuntimeError } from '../error';
|
||||
import { TypedEventEmitter } from '../mongo_types';
|
||||
import { checkParentDomainMatch, HostAddress, noop, squashError } from '../utils';
|
||||
|
||||
/**
|
||||
* @internal
|
||||
* @category Event
|
||||
*/
|
||||
export class SrvPollingEvent {
|
||||
srvRecords: dns.SrvRecord[];
|
||||
constructor(srvRecords: dns.SrvRecord[]) {
|
||||
this.srvRecords = srvRecords;
|
||||
}
|
||||
|
||||
hostnames(): Set<string> {
|
||||
return new Set(this.srvRecords.map(r => HostAddress.fromSrvRecord(r).toString()));
|
||||
}
|
||||
}
|
||||
|
||||
/** @internal Construction options for the SrvPoller. */
export interface SrvPollerOptions {
  /** SRV service name to query (the poller falls back to 'mongodb'). */
  srvServiceName: string;
  /** Maximum hosts to keep from SRV results; the poller treats 0 as unlimited. */
  srvMaxHosts: number;
  /** Hostname whose SRV records are polled. */
  srvHost: string;
  /** Poll interval used while the poller is in failure-recovery (ha) mode. */
  heartbeatFrequencyMS: number;
}

/** @internal Event map for the SrvPoller emitter. */
export type SrvPollerEvents = {
  srvRecordDiscovery(event: SrvPollingEvent): void;
};
|
||||
|
||||
/**
 * Periodically re-resolves the SRV records for a mongodb+srv deployment and
 * emits `srvRecordDiscovery` whenever a scan yields usable records.
 * @internal
 */
export class SrvPoller extends TypedEventEmitter<SrvPollerEvents> {
  srvHost: string;
  // Interval between scans during normal operation (fixed at 60s below).
  rescanSrvIntervalMS: number;
  // Faster interval used while in "ha" mode after a failed scan.
  heartbeatFrequencyMS: number;
  // True after a failed poll; polling speeds up until a scan succeeds.
  haMode: boolean;
  // Incremented by stop() so in-flight polls can detect they were cancelled.
  generation: number;
  srvMaxHosts: number;
  srvServiceName: string;
  _timeout?: NodeJS.Timeout;

  /** @event */
  static readonly SRV_RECORD_DISCOVERY = 'srvRecordDiscovery' as const;

  constructor(options: SrvPollerOptions) {
    super();
    // Swallow 'error' events so an unhandled emit cannot crash the process.
    this.on('error', noop);

    if (!options || !options.srvHost) {
      throw new MongoRuntimeError('Options for SrvPoller must exist and include srvHost');
    }

    this.srvHost = options.srvHost;
    this.srvMaxHosts = options.srvMaxHosts ?? 0;
    this.srvServiceName = options.srvServiceName ?? 'mongodb';
    this.rescanSrvIntervalMS = 60000;
    this.heartbeatFrequencyMS = options.heartbeatFrequencyMS ?? 10000;

    this.haMode = false;
    this.generation = 0;

    this._timeout = undefined;
  }

  /** The full SRV name queried in DNS, e.g. `_mongodb._tcp.example.com`. */
  get srvAddress(): string {
    return `_${this.srvServiceName}._tcp.${this.srvHost}`;
  }

  /** Current poll interval: shorter while recovering from a failed scan. */
  get intervalMS(): number {
    return this.haMode ? this.heartbeatFrequencyMS : this.rescanSrvIntervalMS;
  }

  /** Begin polling; a no-op when a poll is already scheduled. */
  start(): void {
    if (!this._timeout) {
      this.schedule();
    }
  }

  /** Cancel the scheduled poll and invalidate in-flight ones via generation. */
  stop(): void {
    if (this._timeout) {
      clearTimeout(this._timeout);
      this.generation += 1;
      this._timeout = undefined;
    }
  }

  // TODO(NODE-4994): implement new logging logic for SrvPoller failures
  /** (Re)schedule the next poll to run after `intervalMS`. */
  schedule(): void {
    if (this._timeout) {
      clearTimeout(this._timeout);
    }

    this._timeout = setTimeout(() => {
      // Rejections are intentionally swallowed; failures re-enter ha mode.
      this._poll().then(undefined, squashError);
    }, this.intervalMS);
  }

  /** Record a successful scan: leave ha mode, reschedule, emit the records. */
  success(srvRecords: dns.SrvRecord[]): void {
    this.haMode = false;
    this.schedule();
    this.emit(SrvPoller.SRV_RECORD_DISCOVERY, new SrvPollingEvent(srvRecords));
  }

  /** Record a failed scan: enter ha mode (faster polling) and reschedule. */
  failure(): void {
    this.haMode = true;
    this.schedule();
  }

  /** Resolve SRV records once and report success/failure accordingly. */
  async _poll(): Promise<void> {
    const generation = this.generation;
    let srvRecords;

    try {
      srvRecords = await dns.promises.resolveSrv(this.srvAddress);
    } catch {
      this.failure();
      return;
    }

    // stop() was called while we were waiting on DNS; discard the result.
    if (generation !== this.generation) {
      return;
    }

    // Keep only records that belong to the original SRV host's parent domain.
    const finalAddresses: dns.SrvRecord[] = [];
    for (const record of srvRecords) {
      try {
        checkParentDomainMatch(record.name, this.srvHost);
        finalAddresses.push(record);
      } catch (error) {
        squashError(error);
      }
    }

    if (!finalAddresses.length) {
      this.failure();
      return;
    }

    this.success(finalAddresses);
  }
}
|
||||
1154
node_modules/mongodb/src/sdam/topology.ts
generated
vendored
Normal file
1154
node_modules/mongodb/src/sdam/topology.ts
generated
vendored
Normal file
File diff suppressed because it is too large
Load Diff
548
node_modules/mongodb/src/sdam/topology_description.ts
generated
vendored
Normal file
548
node_modules/mongodb/src/sdam/topology_description.ts
generated
vendored
Normal file
@@ -0,0 +1,548 @@
|
||||
import { EJSON, type ObjectId } from '../bson';
|
||||
import * as WIRE_CONSTANTS from '../cmap/wire_protocol/constants';
|
||||
import { type MongoError, MongoRuntimeError, MongoStalePrimaryError } from '../error';
|
||||
import { compareObjectId, shuffle } from '../utils';
|
||||
import { ServerType, TopologyType } from './common';
|
||||
import { ServerDescription } from './server_description';
|
||||
import type { SrvPollingEvent } from './srv_polling';
|
||||
|
||||
// constants related to compatibility checks
const MIN_SUPPORTED_SERVER_VERSION = WIRE_CONSTANTS.MIN_SUPPORTED_SERVER_VERSION;
const MAX_SUPPORTED_SERVER_VERSION = WIRE_CONSTANTS.MAX_SUPPORTED_SERVER_VERSION;
const MIN_SUPPORTED_WIRE_VERSION = WIRE_CONSTANTS.MIN_SUPPORTED_WIRE_VERSION;
const MAX_SUPPORTED_WIRE_VERSION = WIRE_CONSTANTS.MAX_SUPPORTED_WIRE_VERSION;

// Server-type sets used by the SDAM topology transition rules below.
const MONGOS_OR_UNKNOWN = new Set<ServerType>([ServerType.Mongos, ServerType.Unknown]);
const MONGOS_OR_STANDALONE = new Set<ServerType>([ServerType.Mongos, ServerType.Standalone]);
const NON_PRIMARY_RS_MEMBERS = new Set<ServerType>([
  ServerType.RSSecondary,
  ServerType.RSArbiter,
  ServerType.RSOther
]);

/** @public */
export interface TopologyDescriptionOptions {
  /** Expected interval between server heartbeats, in milliseconds. */
  heartbeatFrequencyMS?: number;
  /** Size of the latency window for server selection, in milliseconds. */
  localThresholdMS?: number;
}
|
||||
|
||||
/**
 * Representation of a deployment of servers
 * @public
 */
export class TopologyDescription {
  type: TopologyType;
  setName: string | null;
  maxSetVersion: number | null;
  maxElectionId: ObjectId | null;
  // Known servers keyed by address.
  servers: Map<string, ServerDescription>;
  stale: boolean;
  // False when any known server's wire-version range is incompatible.
  compatible: boolean;
  compatibilityError?: string;
  logicalSessionTimeoutMinutes: number | null;
  heartbeatFrequencyMS: number;
  localThresholdMS: number;
  // Smallest maxWireVersion observed across servers (0 until one reports).
  commonWireVersion: number;
|
||||
  /**
   * Create a TopologyDescription
   */
  constructor(
    topologyType: TopologyType,
    serverDescriptions: Map<string, ServerDescription> | null = null,
    setName: string | null = null,
    maxSetVersion: number | null = null,
    maxElectionId: ObjectId | null = null,
    commonWireVersion: number | null = null,
    options: TopologyDescriptionOptions | null = null
  ) {
    options = options ?? {};

    this.type = topologyType ?? TopologyType.Unknown;
    this.servers = serverDescriptions ?? new Map();
    this.stale = false;
    this.compatible = true;
    this.heartbeatFrequencyMS = options.heartbeatFrequencyMS ?? 0;
    this.localThresholdMS = options.localThresholdMS ?? 15;
    this.setName = setName ?? null;
    this.maxElectionId = maxElectionId ?? null;
    this.maxSetVersion = maxSetVersion ?? null;
    this.commonWireVersion = commonWireVersion ?? 0;

    // determine server compatibility
    for (const serverDescription of this.servers.values()) {
      // Load balancer mode is always compatible.
      if (
        serverDescription.type === ServerType.Unknown ||
        serverDescription.type === ServerType.LoadBalancer
      ) {
        continue;
      }

      // Server is too new for this driver version.
      if (serverDescription.minWireVersion > MAX_SUPPORTED_WIRE_VERSION) {
        this.compatible = false;
        this.compatibilityError = `Server at ${serverDescription.address} requires wire version ${serverDescription.minWireVersion}, but this version of the driver only supports up to ${MAX_SUPPORTED_WIRE_VERSION} (MongoDB ${MAX_SUPPORTED_SERVER_VERSION})`;
      }

      // Server is too old for this driver version.
      if (serverDescription.maxWireVersion < MIN_SUPPORTED_WIRE_VERSION) {
        this.compatible = false;
        this.compatibilityError = `Server at ${serverDescription.address} reports wire version ${serverDescription.maxWireVersion}, but this version of the driver requires at least ${MIN_SUPPORTED_WIRE_VERSION} (MongoDB ${MIN_SUPPORTED_SERVER_VERSION}).`;
        break;
      }
    }

    // Whenever a client updates the TopologyDescription from a hello response, it MUST set
    // TopologyDescription.logicalSessionTimeoutMinutes to the smallest logicalSessionTimeoutMinutes
    // value among ServerDescriptions of all data-bearing server types. If any have a null
    // logicalSessionTimeoutMinutes, then TopologyDescription.logicalSessionTimeoutMinutes MUST be
    // set to null.
    this.logicalSessionTimeoutMinutes = null;
    for (const [, server] of this.servers) {
      if (server.isReadable) {
        if (server.logicalSessionTimeoutMinutes == null) {
          // If any of the servers have a null logicalSessionsTimeout, then the whole topology does
          this.logicalSessionTimeoutMinutes = null;
          break;
        }

        if (this.logicalSessionTimeoutMinutes == null) {
          // First server with a non null logicalSessionsTimeout
          this.logicalSessionTimeoutMinutes = server.logicalSessionTimeoutMinutes;
          continue;
        }

        // Always select the smaller of the:
        // current server logicalSessionsTimeout and the topologies logicalSessionsTimeout
        this.logicalSessionTimeoutMinutes = Math.min(
          this.logicalSessionTimeoutMinutes,
          server.logicalSessionTimeoutMinutes
        );
      }
    }
  }
|
||||
|
||||
  /**
   * Returns a new TopologyDescription based on the SrvPollingEvent
   * @internal
   *
   * @param ev - SRV polling event carrying the latest resolved record set
   * @param srvMaxHosts - cap on total hosts to track; 0 means unlimited
   */
  updateFromSrvPollingEvent(ev: SrvPollingEvent, srvMaxHosts = 0): TopologyDescription {
    /** The SRV addresses defines the set of addresses we should be using */
    const incomingHostnames = ev.hostnames();
    const currentHostnames = new Set(this.servers.keys());

    const hostnamesToAdd = new Set<string>(incomingHostnames);
    const hostnamesToRemove = new Set<string>();
    for (const hostname of currentHostnames) {
      // filter hostnamesToAdd (made from incomingHostnames) down to what is *not* present in currentHostnames
      hostnamesToAdd.delete(hostname);
      if (!incomingHostnames.has(hostname)) {
        // If the SRV Records no longer include this hostname
        // we have to stop using it
        hostnamesToRemove.add(hostname);
      }
    }

    if (hostnamesToAdd.size === 0 && hostnamesToRemove.size === 0) {
      // No new hosts to add and none to remove
      return this;
    }

    // Work on a copy of the server map; this description is immutable.
    const serverDescriptions = new Map(this.servers);
    for (const removedHost of hostnamesToRemove) {
      serverDescriptions.delete(removedHost);
    }

    if (hostnamesToAdd.size > 0) {
      if (srvMaxHosts === 0) {
        // Add all!
        for (const hostToAdd of hostnamesToAdd) {
          serverDescriptions.set(hostToAdd, new ServerDescription(hostToAdd));
        }
      } else if (serverDescriptions.size < srvMaxHosts) {
        // Add only the amount needed to get us back to srvMaxHosts
        const selectedHosts = shuffle(hostnamesToAdd, srvMaxHosts - serverDescriptions.size);
        for (const selectedHostToAdd of selectedHosts) {
          serverDescriptions.set(selectedHostToAdd, new ServerDescription(selectedHostToAdd));
        }
      }
    }

    // Same topology metadata; only the server map changed.
    return new TopologyDescription(
      this.type,
      serverDescriptions,
      this.setName,
      this.maxSetVersion,
      this.maxElectionId,
      this.commonWireVersion,
      { heartbeatFrequencyMS: this.heartbeatFrequencyMS, localThresholdMS: this.localThresholdMS }
    );
  }
|
||||
|
||||
  /**
   * Returns a copy of this description updated with a given ServerDescription
   * @internal
   *
   * Implements the SDAM topology-type transition rules: the incoming server
   * description may change the topology type, set name, max set version and
   * max election id, and may evict servers that cannot belong to the topology.
   */
  update(serverDescription: ServerDescription): TopologyDescription {
    const address = serverDescription.address;

    // potentially mutated values
    let { type: topologyType, setName, maxSetVersion, maxElectionId, commonWireVersion } = this;

    const serverType = serverDescription.type;
    // Work on a copy of the server map; this description is immutable.
    const serverDescriptions = new Map(this.servers);

    // update common wire version
    if (serverDescription.maxWireVersion !== 0) {
      if (commonWireVersion == null) {
        commonWireVersion = serverDescription.maxWireVersion;
      } else {
        commonWireVersion = Math.min(commonWireVersion, serverDescription.maxWireVersion);
      }
    }

    // A server reporting a setName different from the topology's cannot be a member.
    if (
      typeof serverDescription.setName === 'string' &&
      typeof setName === 'string' &&
      serverDescription.setName !== setName
    ) {
      if (topologyType === TopologyType.Single) {
        // "Single" Topology with setName mismatch is direct connection usage, mark unknown do not remove
        serverDescription = new ServerDescription(address);
      } else {
        serverDescriptions.delete(address);
      }
    }

    // update the actual server description
    serverDescriptions.set(address, serverDescription);

    if (topologyType === TopologyType.Single) {
      // once we are defined as single, that never changes
      return new TopologyDescription(
        TopologyType.Single,
        serverDescriptions,
        setName,
        maxSetVersion,
        maxElectionId,
        commonWireVersion,
        { heartbeatFrequencyMS: this.heartbeatFrequencyMS, localThresholdMS: this.localThresholdMS }
      );
    }

    if (topologyType === TopologyType.Unknown) {
      if (serverType === ServerType.Standalone && this.servers.size !== 1) {
        // A standalone only defines the topology when it is the lone seed.
        serverDescriptions.delete(address);
      } else {
        topologyType = topologyTypeForServerType(serverType);
      }
    }

    if (topologyType === TopologyType.Sharded) {
      // Only mongos (or still-unknown) servers belong in a sharded topology.
      if (!MONGOS_OR_UNKNOWN.has(serverType)) {
        serverDescriptions.delete(address);
      }
    }

    if (topologyType === TopologyType.ReplicaSetNoPrimary) {
      // Mongos and standalone servers are never replica-set members.
      if (MONGOS_OR_STANDALONE.has(serverType)) {
        serverDescriptions.delete(address);
      }

      if (serverType === ServerType.RSPrimary) {
        // A primary appeared: recompute set metadata and topology type.
        const result = updateRsFromPrimary(
          serverDescriptions,
          serverDescription,
          setName,
          maxSetVersion,
          maxElectionId
        );

        topologyType = result[0];
        setName = result[1];
        maxSetVersion = result[2];
        maxElectionId = result[3];
      } else if (NON_PRIMARY_RS_MEMBERS.has(serverType)) {
        const result = updateRsNoPrimaryFromMember(serverDescriptions, serverDescription, setName);
        topologyType = result[0];
        setName = result[1];
      }
    }

    if (topologyType === TopologyType.ReplicaSetWithPrimary) {
      if (MONGOS_OR_STANDALONE.has(serverType)) {
        serverDescriptions.delete(address);
        topologyType = checkHasPrimary(serverDescriptions);
      } else if (serverType === ServerType.RSPrimary) {
        const result = updateRsFromPrimary(
          serverDescriptions,
          serverDescription,
          setName,
          maxSetVersion,
          maxElectionId
        );

        topologyType = result[0];
        setName = result[1];
        maxSetVersion = result[2];
        maxElectionId = result[3];
      } else if (NON_PRIMARY_RS_MEMBERS.has(serverType)) {
        topologyType = updateRsWithPrimaryFromMember(
          serverDescriptions,
          serverDescription,
          setName
        );
      } else {
        // Any other type may mean the primary went away; re-check for one.
        topologyType = checkHasPrimary(serverDescriptions);
      }
    }

    return new TopologyDescription(
      topologyType,
      serverDescriptions,
      setName,
      maxSetVersion,
      maxElectionId,
      commonWireVersion,
      { heartbeatFrequencyMS: this.heartbeatFrequencyMS, localThresholdMS: this.localThresholdMS }
    );
  }
|
||||
|
||||
/**
 * The first error recorded on any server description in this topology, or `null`
 * when no server currently carries an error.
 */
get error(): MongoError | null {
  for (const description of this.servers.values()) {
    if (description.error) {
      return description.error;
    }
  }
  return null;
}
|
||||
|
||||
/**
 * Determines if the topology description has any known servers
 * (i.e. at least one server whose type is not `Unknown`).
 */
get hasKnownServers(): boolean {
  for (const description of this.servers.values()) {
    if (description.type !== ServerType.Unknown) {
      return true;
    }
  }
  return false;
}
|
||||
|
||||
/**
 * Determines if this topology description has a data-bearing server available.
 */
get hasDataBearingServers(): boolean {
  for (const description of this.servers.values()) {
    if (description.isDataBearing) {
      return true;
    }
  }
  return false;
}
|
||||
|
||||
/**
 * Determines if the topology has a definition for the provided address.
 * @internal
 */
hasServer(address: string): boolean {
  const { servers } = this;
  return servers.has(address);
}
|
||||
|
||||
/**
 * Returns a JSON-serializable representation of the TopologyDescription. This is primarily
 * intended for use with JSON.stringify().
 *
 * This method will not throw.
 */
toJSON() {
  const serialized = EJSON.serialize(this);
  return serialized;
}
|
||||
}
|
||||
|
||||
function topologyTypeForServerType(serverType: ServerType): TopologyType {
|
||||
switch (serverType) {
|
||||
case ServerType.Standalone:
|
||||
return TopologyType.Single;
|
||||
case ServerType.Mongos:
|
||||
return TopologyType.Sharded;
|
||||
case ServerType.RSPrimary:
|
||||
return TopologyType.ReplicaSetWithPrimary;
|
||||
case ServerType.RSOther:
|
||||
case ServerType.RSSecondary:
|
||||
return TopologyType.ReplicaSetNoPrimary;
|
||||
default:
|
||||
return TopologyType.Unknown;
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Incorporates a primary's reported description into the replica-set view of the topology.
 *
 * Updates `serverDescriptions` in place (marking stale primaries Unknown, discovering new
 * hosts, removing hosts the primary no longer reports) and returns the recomputed
 * `[topologyType, setName, maxSetVersion, maxElectionId]` tuple.
 *
 * NOTE(review): this appears to implement the SDAM "updateRSFromPrimary" rules — the two
 * branches below handle the wire-version >= 17 (new, electionId-first) and legacy
 * (setVersion-first) staleness comparisons; confirm against the SDAM specification.
 */
function updateRsFromPrimary(
  serverDescriptions: Map<string, ServerDescription>,
  serverDescription: ServerDescription,
  setName: string | null = null,
  maxSetVersion: number | null = null,
  maxElectionId: ObjectId | null = null
): [TopologyType, string | null, number | null, ObjectId | null] {
  // Builds the diagnostic message attached to a primary marked stale because its
  // electionId/setVersion tuple lost the comparison against the topology's maximums.
  const setVersionElectionIdMismatch = (
    serverDescription: ServerDescription,
    maxSetVersion: number | null,
    maxElectionId: ObjectId | null
  ) => {
    return (
      `primary marked stale due to electionId/setVersion mismatch:` +
      ` server setVersion: ${serverDescription.setVersion},` +
      ` server electionId: ${serverDescription.electionId},` +
      ` topology setVersion: ${maxSetVersion},` +
      ` topology electionId: ${maxElectionId}`
    );
  };
  // Adopt the primary's setName if none was known; a mismatch means this server does not
  // belong to our replica set, so drop it and re-derive the topology type.
  setName = setName || serverDescription.setName;
  if (setName !== serverDescription.setName) {
    serverDescriptions.delete(serverDescription.address);
    return [checkHasPrimary(serverDescriptions), setName, maxSetVersion, maxElectionId];
  }

  if (serverDescription.maxWireVersion >= 17) {
    // Modern comparison: electionId is the primary ordering key, setVersion breaks ties.
    const electionIdComparison = compareObjectId(maxElectionId, serverDescription.electionId);
    const maxElectionIdIsEqual = electionIdComparison === 0;
    const maxElectionIdIsLess = electionIdComparison === -1;
    // `?? -1` treats a missing setVersion as lower than any real one.
    const maxSetVersionIsLessOrEqual =
      (maxSetVersion ?? -1) <= (serverDescription.setVersion ?? -1);

    if (maxElectionIdIsLess || (maxElectionIdIsEqual && maxSetVersionIsLessOrEqual)) {
      // The reported electionId was greater
      // or the electionId was equal and reported setVersion was greater
      // Always update both values, they are a tuple
      maxElectionId = serverDescription.electionId;
      maxSetVersion = serverDescription.setVersion;
    } else {
      // Stale primary
      // replace serverDescription with a default ServerDescription of type "Unknown"
      serverDescriptions.set(
        serverDescription.address,
        new ServerDescription(serverDescription.address, undefined, {
          error: new MongoStalePrimaryError(
            setVersionElectionIdMismatch(serverDescription, maxSetVersion, maxElectionId)
          )
        })
      );

      return [checkHasPrimary(serverDescriptions), setName, maxSetVersion, maxElectionId];
    }
  } else {
    // Legacy comparison (pre-wire-version-17): setVersion dominates, electionId breaks ties.
    const electionId = serverDescription.electionId ? serverDescription.electionId : null;
    if (serverDescription.setVersion && electionId) {
      if (maxSetVersion && maxElectionId) {
        if (
          maxSetVersion > serverDescription.setVersion ||
          compareObjectId(maxElectionId, electionId) > 0
        ) {
          // this primary is stale, we must remove it
          serverDescriptions.set(
            serverDescription.address,
            new ServerDescription(serverDescription.address, undefined, {
              error: new MongoStalePrimaryError(
                setVersionElectionIdMismatch(serverDescription, maxSetVersion, maxElectionId)
              )
            })
          );

          return [checkHasPrimary(serverDescriptions), setName, maxSetVersion, maxElectionId];
        }
      }

      maxElectionId = serverDescription.electionId;
    }

    // Track the highest setVersion seen, independent of the electionId bookkeeping above.
    if (
      serverDescription.setVersion != null &&
      (maxSetVersion == null || serverDescription.setVersion > maxSetVersion)
    ) {
      maxSetVersion = serverDescription.setVersion;
    }
  }

  // We've heard from the primary. Is it the same primary as before?
  for (const [address, server] of serverDescriptions) {
    if (server.type === ServerType.RSPrimary && server.address !== serverDescription.address) {
      // Reset old primary's type to Unknown.
      serverDescriptions.set(
        address,
        new ServerDescription(server.address, undefined, {
          error: new MongoStalePrimaryError(
            'primary marked stale due to discovery of newer primary'
          )
        })
      );

      // There can only be one primary
      break;
    }
  }

  // Discover new hosts from this primary's response.
  serverDescription.allHosts.forEach((address: string) => {
    if (!serverDescriptions.has(address)) {
      serverDescriptions.set(address, new ServerDescription(address));
    }
  });

  // Remove hosts not in the response.
  const currentAddresses = Array.from(serverDescriptions.keys());
  const responseAddresses = serverDescription.allHosts;
  currentAddresses
    .filter((addr: string) => responseAddresses.indexOf(addr) === -1)
    .forEach((address: string) => {
      serverDescriptions.delete(address);
    });

  return [checkHasPrimary(serverDescriptions), setName, maxSetVersion, maxElectionId];
}
|
||||
|
||||
function updateRsWithPrimaryFromMember(
|
||||
serverDescriptions: Map<string, ServerDescription>,
|
||||
serverDescription: ServerDescription,
|
||||
setName: string | null = null
|
||||
): TopologyType {
|
||||
if (setName == null) {
|
||||
// TODO(NODE-3483): should be an appropriate runtime error
|
||||
throw new MongoRuntimeError('Argument "setName" is required if connected to a replica set');
|
||||
}
|
||||
|
||||
if (
|
||||
setName !== serverDescription.setName ||
|
||||
(serverDescription.me && serverDescription.address !== serverDescription.me)
|
||||
) {
|
||||
serverDescriptions.delete(serverDescription.address);
|
||||
}
|
||||
|
||||
return checkHasPrimary(serverDescriptions);
|
||||
}
|
||||
|
||||
function updateRsNoPrimaryFromMember(
|
||||
serverDescriptions: Map<string, ServerDescription>,
|
||||
serverDescription: ServerDescription,
|
||||
setName: string | null = null
|
||||
): [TopologyType, string | null] {
|
||||
const topologyType = TopologyType.ReplicaSetNoPrimary;
|
||||
setName = setName ?? serverDescription.setName;
|
||||
if (setName !== serverDescription.setName) {
|
||||
serverDescriptions.delete(serverDescription.address);
|
||||
return [topologyType, setName];
|
||||
}
|
||||
|
||||
serverDescription.allHosts.forEach((address: string) => {
|
||||
if (!serverDescriptions.has(address)) {
|
||||
serverDescriptions.set(address, new ServerDescription(address));
|
||||
}
|
||||
});
|
||||
|
||||
if (serverDescription.me && serverDescription.address !== serverDescription.me) {
|
||||
serverDescriptions.delete(serverDescription.address);
|
||||
}
|
||||
|
||||
return [topologyType, setName];
|
||||
}
|
||||
|
||||
function checkHasPrimary(serverDescriptions: Map<string, ServerDescription>): TopologyType {
|
||||
for (const serverDescription of serverDescriptions.values()) {
|
||||
if (serverDescription.type === ServerType.RSPrimary) {
|
||||
return TopologyType.ReplicaSetWithPrimary;
|
||||
}
|
||||
}
|
||||
|
||||
return TopologyType.ReplicaSetNoPrimary;
|
||||
}
|
||||
Reference in New Issue
Block a user