PYTHON-4021 - Publish TopologyDescriptionChangedEvent on topology close (#1762)

This commit is contained in:
Noah Stapp 2024-08-02 09:42:49 -07:00 committed by GitHub
parent 5699f8029d
commit 294f10b724
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 595 additions and 121 deletions

View File

@ -664,9 +664,31 @@ class Topology:
# Publish only after releasing the lock.
if self._publish_tp:
assert self._events is not None
old_td = self._description
self._description = TopologyDescription(
TOPOLOGY_TYPE.Unknown,
{},
self._description.replica_set_name,
self._description.max_set_version,
self._description.max_election_id,
self._description._topology_settings,
)
self._events.put(
(
self._listeners.publish_topology_description_changed,
(
old_td,
self._description,
self._topology_id,
),
)
)
self._events.put((self._listeners.publish_topology_closed, (self._topology_id,)))
if self._publish_server or self._publish_tp:
# Make sure the events executor thread is fully closed before publishing the remaining events
self.__events_executor.close()
self.__events_executor.join(1)
process_events_queue(weakref.ref(self._events)) # type: ignore[arg-type]
@property
def description(self) -> TopologyDescription:

View File

@ -662,9 +662,31 @@ class Topology:
# Publish only after releasing the lock.
if self._publish_tp:
assert self._events is not None
old_td = self._description
self._description = TopologyDescription(
TOPOLOGY_TYPE.Unknown,
{},
self._description.replica_set_name,
self._description.max_set_version,
self._description.max_election_id,
self._description._topology_settings,
)
self._events.put(
(
self._listeners.publish_topology_description_changed,
(
old_td,
self._description,
self._topology_id,
),
)
)
self._events.put((self._listeners.publish_topology_closed, (self._topology_id,)))
if self._publish_server or self._publish_tp:
# Make sure the events executor thread is fully closed before publishing the remaining events
self.__events_executor.close()
self.__events_executor.join(1)
process_events_queue(weakref.ref(self._events)) # type: ignore[arg-type]
@property
def description(self) -> TopologyDescription:

View File

@ -16,7 +16,7 @@
"b:27017"
],
"minWireVersion": 0,
"maxWireVersion": 6
"maxWireVersion": 21
}
],
[

View File

@ -16,7 +16,7 @@
"b:27017"
],
"minWireVersion": 0,
"maxWireVersion": 6
"maxWireVersion": 21
}
]
],

View File

@ -23,7 +23,7 @@
"isWritablePrimary": true,
"msg": "isdbgrid",
"minWireVersion": 0,
"maxWireVersion": 6
"maxWireVersion": 21
}
]
],

View File

@ -11,7 +11,7 @@
"helloOk": true,
"isWritablePrimary": true,
"minWireVersion": 0,
"maxWireVersion": 6
"maxWireVersion": 21
}
]
],

View File

@ -1,5 +1,5 @@
{
"description": "Standalone with default maxWireVersion of 0 is upgraded to one with maxWireVersion 6",
"description": "Standalone with default maxWireVersion of 0 is upgraded to one with maxWireVersion 21",
"uri": "mongodb://a",
"phases": [
{
@ -35,7 +35,7 @@
"helloOk": true,
"isWritablePrimary": true,
"minWireVersion": 0,
"maxWireVersion": 6
"maxWireVersion": 21
}
]
],

View File

@ -4,11 +4,11 @@
"runOnRequirements": [
{
"minServerVersion": "4.9",
"serverless": "forbid",
"topologies": [
"replicaset",
"sharded"
],
"serverless": "forbid"
]
}
],
"createEntities": [
@ -39,13 +39,6 @@
"client": {
"id": "client",
"useMultipleMongoses": false,
"uriOptions": {
"connectTimeoutMS": 500,
"heartbeatFrequencyMS": 500,
"appname": "interruptInUse",
"retryReads": false,
"minPoolSize": 0
},
"observeEvents": [
"poolClearedEvent",
"connectionClosedEvent",
@ -54,7 +47,14 @@
"commandFailedEvent",
"connectionCheckedOutEvent",
"connectionCheckedInEvent"
]
],
"uriOptions": {
"connectTimeoutMS": 500,
"heartbeatFrequencyMS": 500,
"appname": "interruptInUse",
"retryReads": false,
"minPoolSize": 0
}
}
},
{
@ -83,7 +83,9 @@
"name": "insertOne",
"object": "collection",
"arguments": {
"document": { "_id" : 1 }
"document": {
"_id": 1
}
}
},
{
@ -92,14 +94,16 @@
"arguments": {
"thread": "thread1",
"operation": {
"name": "find",
"object": "collection",
"arguments": {
"filter": { "$where": "sleep(2000) || true" }
},
"expectError": {
"isError": true
"name": "find",
"object": "collection",
"arguments": {
"filter": {
"$where": "sleep(2000) || true"
}
},
"expectError": {
"isError": true
}
}
}
},
@ -107,6 +111,7 @@
"name": "failPoint",
"object": "testRunner",
"arguments": {
"client": "setupClient",
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
@ -114,22 +119,22 @@
},
"data": {
"failCommands": [
"hello", "isMaster"
"hello",
"isMaster"
],
"blockConnection": true,
"blockTimeMS": 1500,
"appName": "interruptInUse"
}
},
"client": "setupClient"
}
}
},
{
"name": "waitForThread",
"object": "testRunner",
"arguments": {
"thread": "thread1"
}
"name": "waitForThread",
"object": "testRunner",
"arguments": {
"thread": "thread1"
}
}
],
"expectEvents": [
@ -157,20 +162,20 @@
"commandName": "find"
}
}
]
},
{
]
},
{
"client": "client",
"eventType": "cmap",
"events": [
{
"connectionCheckedOutEvent": { }
"connectionCheckedOutEvent": {}
},
{
"connectionCheckedInEvent": { }
"connectionCheckedInEvent": {}
},
{
"connectionCheckedOutEvent": { }
"connectionCheckedOutEvent": {}
},
{
"poolClearedEvent": {
@ -181,16 +186,22 @@
"connectionCheckedInEvent": {}
},
{
"connectionClosedEvent": { }
"connectionClosedEvent": {}
}
]
}
]
}
],
"outcome": [{
"collectionName": "interruptInUse",
"databaseName": "sdam-tests",
"documents": [{ "_id": 1 }]
}]
"outcome": [
{
"collectionName": "interruptInUse",
"databaseName": "sdam-tests",
"documents": [
{
"_id": 1
}
]
}
]
},
{
"description": "Error returned from connection pool clear with interruptInUseConnections=true is retryable",
@ -204,22 +215,22 @@
"client": {
"id": "client",
"useMultipleMongoses": false,
"observeEvents": [
"poolClearedEvent",
"connectionClosedEvent",
"commandStartedEvent",
"commandFailedEvent",
"commandSucceededEvent",
"connectionCheckedOutEvent",
"connectionCheckedInEvent"
],
"uriOptions": {
"connectTimeoutMS": 500,
"heartbeatFrequencyMS": 500,
"appname": "interruptInUseRetryable",
"retryReads": true,
"minPoolSize": 0
},
"observeEvents": [
"poolClearedEvent",
"connectionClosedEvent",
"commandFailedEvent",
"commandStartedEvent",
"commandSucceededEvent",
"connectionCheckedOutEvent",
"connectionCheckedInEvent"
]
}
}
},
{
@ -248,7 +259,9 @@
"name": "insertOne",
"object": "collection",
"arguments": {
"document": { "_id" : 1 }
"document": {
"_id": 1
}
}
},
{
@ -257,11 +270,13 @@
"arguments": {
"thread": "thread1",
"operation": {
"name": "find",
"object": "collection",
"arguments": {
"filter": { "$where": "sleep(2000) || true" }
"name": "find",
"object": "collection",
"arguments": {
"filter": {
"$where": "sleep(2000) || true"
}
}
}
}
},
@ -269,6 +284,7 @@
"name": "failPoint",
"object": "testRunner",
"arguments": {
"client": "setupClient",
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
@ -276,22 +292,22 @@
},
"data": {
"failCommands": [
"hello", "isMaster"
"hello",
"isMaster"
],
"blockConnection": true,
"blockTimeMS": 1500,
"appName": "interruptInUseRetryable"
}
},
"client": "setupClient"
}
}
},
{
"name": "waitForThread",
"object": "testRunner",
"arguments": {
"thread": "thread1"
}
"name": "waitForThread",
"object": "testRunner",
"arguments": {
"thread": "thread1"
}
}
],
"expectEvents": [
@ -329,20 +345,20 @@
"commandName": "find"
}
}
]
},
{
]
},
{
"client": "client",
"eventType": "cmap",
"events": [
{
"connectionCheckedOutEvent": { }
"connectionCheckedOutEvent": {}
},
{
"connectionCheckedInEvent": { }
"connectionCheckedInEvent": {}
},
{
"connectionCheckedOutEvent": { }
"connectionCheckedOutEvent": {}
},
{
"poolClearedEvent": {
@ -353,7 +369,7 @@
"connectionCheckedInEvent": {}
},
{
"connectionClosedEvent": { }
"connectionClosedEvent": {}
},
{
"connectionCheckedOutEvent": {}
@ -361,14 +377,20 @@
{
"connectionCheckedInEvent": {}
}
]
}
]
}
],
"outcome": [{
"collectionName": "interruptInUse",
"databaseName": "sdam-tests",
"documents": [{ "_id": 1 }]
}]
"outcome": [
{
"collectionName": "interruptInUse",
"databaseName": "sdam-tests",
"documents": [
{
"_id": 1
}
]
}
]
},
{
"description": "Error returned from connection pool clear with interruptInUseConnections=true is retryable for write",
@ -382,22 +404,23 @@
"client": {
"id": "client",
"useMultipleMongoses": false,
"observeEvents": [
"poolClearedEvent",
"connectionClosedEvent",
"commandStartedEvent",
"commandFailedEvent",
"commandSucceededEvent",
"connectionCheckedOutEvent",
"connectionCheckedInEvent"
],
"uriOptions": {
"connectTimeoutMS": 500,
"heartbeatFrequencyMS": 500,
"appname": "interruptInUseRetryableWrite",
"retryWrites": true,
"minPoolSize": 0
},
"observeEvents": [
"poolClearedEvent",
"connectionClosedEvent",
"commandFailedEvent",
"commandStartedEvent",
"commandSucceededEvent",
"connectionCheckedOutEvent",
"connectionCheckedInEvent"
]}
}
}
},
{
"database": {
@ -425,7 +448,9 @@
"name": "insertOne",
"object": "collection",
"arguments": {
"document": { "_id": 1 }
"document": {
"_id": 1
}
}
},
{
@ -437,9 +462,15 @@
"name": "updateOne",
"object": "collection",
"arguments": {
"filter": { "$where": "sleep(2000) || true" },
"update": [ { "$set": { "a": "bar" } } ]
}
"filter": {
"$where": "sleep(2000) || true"
},
"update": {
"$set": {
"a": "bar"
}
}
}
}
}
},
@ -447,6 +478,7 @@
"name": "failPoint",
"object": "testRunner",
"arguments": {
"client": "setupClient",
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
@ -454,22 +486,22 @@
},
"data": {
"failCommands": [
"hello", "isMaster"
"hello",
"isMaster"
],
"blockConnection": true,
"blockTimeMS": 1500,
"appName": "interruptInUseRetryableWrite"
}
},
"client": "setupClient"
}
}
},
{
"name": "waitForThread",
"object": "testRunner",
"arguments": {
"thread": "thread1"
}
"name": "waitForThread",
"object": "testRunner",
"arguments": {
"thread": "thread1"
}
}
],
"expectEvents": [
@ -507,20 +539,20 @@
"commandName": "update"
}
}
]
},
{
]
},
{
"client": "client",
"eventType": "cmap",
"events": [
{
"connectionCheckedOutEvent": { }
"connectionCheckedOutEvent": {}
},
{
"connectionCheckedInEvent": { }
"connectionCheckedInEvent": {}
},
{
"connectionCheckedOutEvent": { }
"connectionCheckedOutEvent": {}
},
{
"poolClearedEvent": {
@ -531,7 +563,7 @@
"connectionCheckedInEvent": {}
},
{
"connectionClosedEvent": { }
"connectionClosedEvent": {}
},
{
"connectionCheckedOutEvent": {}
@ -539,14 +571,21 @@
{
"connectionCheckedInEvent": {}
}
]
}
]
}
],
"outcome": [{
"collectionName": "interruptInUse",
"databaseName": "sdam-tests",
"documents": [{ "_id": 1, "a" : "bar"}]
}]
"outcome": [
{
"collectionName": "interruptInUse",
"databaseName": "sdam-tests",
"documents": [
{
"_id": 1,
"a": "bar"
}
]
}
]
}
]
}
}

View File

@ -0,0 +1,88 @@
{
"description": "loadbalanced-emit-topology-description-changed-before-close",
"schemaVersion": "1.20",
"runOnRequirements": [
{
"topologies": [
"load-balanced"
],
"minServerVersion": "4.4"
}
],
"tests": [
{
"description": "Topology lifecycle",
"operations": [
{
"name": "createEntities",
"object": "testRunner",
"arguments": {
"entities": [
{
"client": {
"id": "client",
"observeEvents": [
"topologyDescriptionChangedEvent",
"topologyOpeningEvent",
"topologyClosedEvent"
]
}
}
]
}
},
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"client": "client",
"event": {
"topologyDescriptionChangedEvent": {}
},
"count": 2
}
},
{
"name": "close",
"object": "client"
}
],
"expectEvents": [
{
"client": "client",
"eventType": "sdam",
"events": [
{
"topologyOpeningEvent": {}
},
{
"topologyDescriptionChangedEvent": {
"previousDescription": {
"type": "Unknown"
},
"newDescription": {}
}
},
{
"topologyDescriptionChangedEvent": {
"newDescription": {
"type": "LoadBalanced"
}
}
},
{
"topologyDescriptionChangedEvent": {
"newDescription": {
"type": "Unknown"
}
}
},
{
"topologyClosedEvent": {}
}
]
}
]
}
]
}

View File

@ -0,0 +1,89 @@
{
"description": "replicaset-emit-topology-description-changed-before-close",
"schemaVersion": "1.20",
"runOnRequirements": [
{
"topologies": [
"replicaset"
],
"minServerVersion": "4.4"
}
],
"tests": [
{
"description": "Topology lifecycle",
"operations": [
{
"name": "createEntities",
"object": "testRunner",
"arguments": {
"entities": [
{
"client": {
"id": "client",
"observeEvents": [
"topologyDescriptionChangedEvent",
"topologyOpeningEvent",
"topologyClosedEvent"
]
}
}
]
}
},
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"client": "client",
"event": {
"topologyDescriptionChangedEvent": {}
},
"count": 4
}
},
{
"name": "close",
"object": "client"
}
],
"expectEvents": [
{
"client": "client",
"eventType": "sdam",
"ignoreExtraEvents": false,
"events": [
{
"topologyOpeningEvent": {}
},
{
"topologyDescriptionChangedEvent": {}
},
{
"topologyDescriptionChangedEvent": {}
},
{
"topologyDescriptionChangedEvent": {}
},
{
"topologyDescriptionChangedEvent": {}
},
{
"topologyDescriptionChangedEvent": {
"previousDescription": {
"type": "ReplicaSetWithPrimary"
},
"newDescription": {
"type": "Unknown"
}
}
},
{
"topologyClosedEvent": {}
}
]
}
]
}
]
}

View File

@ -0,0 +1,108 @@
{
"description": "sharded-emit-topology-description-changed-before-close",
"schemaVersion": "1.20",
"runOnRequirements": [
{
"topologies": [
"sharded"
],
"minServerVersion": "4.4"
}
],
"tests": [
{
"description": "Topology lifecycle",
"operations": [
{
"name": "createEntities",
"object": "testRunner",
"arguments": {
"entities": [
{
"client": {
"id": "client",
"observeEvents": [
"topologyDescriptionChangedEvent",
"topologyOpeningEvent",
"topologyClosedEvent"
],
"useMultipleMongoses": true
}
}
]
}
},
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"client": "client",
"event": {
"topologyDescriptionChangedEvent": {}
},
"count": 3
}
},
{
"name": "close",
"object": "client"
}
],
"expectEvents": [
{
"client": "client",
"eventType": "sdam",
"ignoreExtraEvents": false,
"events": [
{
"topologyOpeningEvent": {}
},
{
"topologyDescriptionChangedEvent": {
"previousDescription": {
"type": "Unknown"
},
"newDescription": {
"type": "Unknown"
}
}
},
{
"topologyDescriptionChangedEvent": {
"previousDescription": {
"type": "Unknown"
},
"newDescription": {
"type": "Sharded"
}
}
},
{
"topologyDescriptionChangedEvent": {
"previousDescription": {
"type": "Sharded"
},
"newDescription": {
"type": "Sharded"
}
}
},
{
"topologyDescriptionChangedEvent": {
"previousDescription": {
"type": "Sharded"
},
"newDescription": {
"type": "Unknown"
}
}
},
{
"topologyClosedEvent": {}
}
]
}
]
}
]
}

View File

@ -0,0 +1,97 @@
{
"description": "standalone-emit-topology-description-changed-before-close",
"schemaVersion": "1.20",
"runOnRequirements": [
{
"topologies": [
"single"
],
"minServerVersion": "4.4"
}
],
"tests": [
{
"description": "Topology lifecycle",
"operations": [
{
"name": "createEntities",
"object": "testRunner",
"arguments": {
"entities": [
{
"client": {
"id": "client",
"observeEvents": [
"topologyDescriptionChangedEvent",
"topologyOpeningEvent",
"topologyClosedEvent"
]
}
}
]
}
},
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"client": "client",
"event": {
"topologyDescriptionChangedEvent": {}
},
"count": 2
}
},
{
"name": "close",
"object": "client"
}
],
"expectEvents": [
{
"client": "client",
"eventType": "sdam",
"ignoreExtraEvents": false,
"events": [
{
"topologyOpeningEvent": {}
},
{
"topologyDescriptionChangedEvent": {
"previousDescription": {
"type": "Unknown"
},
"newDescription": {
"type": "Unknown"
}
}
},
{
"topologyDescriptionChangedEvent": {
"previousDescription": {
"type": "Unknown"
},
"newDescription": {
"type": "Single"
}
}
},
{
"topologyDescriptionChangedEvent": {
"previousDescription": {
"type": "Single"
},
"newDescription": {
"type": "Unknown"
}
}
},
{
"topologyClosedEvent": {}
}
]
}
]
}
]
}

View File

@ -122,7 +122,7 @@ def compare_events(expected_dict, actual):
elif expected_type == "topology_opening_event":
if not isinstance(actual, monitoring.TopologyOpenedEvent):
return False, "Expected TopologyOpeningEvent, got %s" % (actual.__class__)
return False, "Expected TopologyOpenedEvent, got %s" % (actual.__class__)
elif expected_type == "topology_description_changed_event":
if not isinstance(actual, monitoring.TopologyDescriptionChangedEvent):

View File

@ -475,6 +475,11 @@ class EntityMapUtil:
if entity_type == "client":
kwargs: dict = {}
observe_events = spec.get("observeEvents", [])
# The unified tests use topologyOpeningEvent, we use topologyOpenedEvent
for i in range(len(observe_events)):
if "topologyOpeningEvent" == observe_events[i]:
observe_events[i] = "topologyOpenedEvent"
ignore_commands = spec.get("ignoreCommandMonitoringEvents", [])
observe_sensitive_commands = spec.get("observeSensitiveCommands", False)
ignore_commands = [cmd.lower() for cmd in ignore_commands]
@ -924,6 +929,10 @@ class MatchEvaluatorUtil:
self.test.assertIsInstance(actual, ServerHeartbeatFailedEvent)
elif name == "topologyDescriptionChangedEvent":
self.test.assertIsInstance(actual, TopologyDescriptionChangedEvent)
elif name == "topologyOpeningEvent":
self.test.assertIsInstance(actual, TopologyOpenedEvent)
elif name == "topologyClosedEvent":
self.test.assertIsInstance(actual, TopologyClosedEvent)
else:
raise Exception(f"Unsupported event type {name}")