From 294f10b72423fda7aefd0167471d6109b0dc548f Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Fri, 2 Aug 2024 09:42:49 -0700 Subject: [PATCH] PYTHON-4021 - Publish TopologyDescriptionChangedEvent on topology close (#1762) --- pymongo/asynchronous/topology.py | 22 ++ pymongo/synchronous/topology.py | 22 ++ .../rs/compatible.json | 2 +- .../rs/compatible_unknown.json | 2 +- .../sharded/compatible.json | 2 +- .../single/compatible.json | 2 +- .../single/too_old_then_upgraded.json | 4 +- .../unified/interruptInUse-pool-clear.json | 267 ++++++++++-------- ...ed-emit-topology-changed-before-close.json | 88 ++++++ ...et-emit-topology-changed-before-close.json | 89 ++++++ ...ed-emit-topology-changed-before-close.json | 108 +++++++ ...ne-emit-topology-changed-before-close.json | 97 +++++++ test/test_sdam_monitoring_spec.py | 2 +- test/unified_format.py | 9 + 14 files changed, 595 insertions(+), 121 deletions(-) create mode 100644 test/discovery_and_monitoring/unified/loadbalanced-emit-topology-changed-before-close.json create mode 100644 test/discovery_and_monitoring/unified/replicaset-emit-topology-changed-before-close.json create mode 100644 test/discovery_and_monitoring/unified/sharded-emit-topology-changed-before-close.json create mode 100644 test/discovery_and_monitoring/unified/standalone-emit-topology-changed-before-close.json diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 16cdd0eba..183c459f2 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -664,9 +664,31 @@ class Topology: # Publish only after releasing the lock. if self._publish_tp: assert self._events is not None + old_td = self._description + self._description = TopologyDescription( + TOPOLOGY_TYPE.Unknown, + {}, + self._description.replica_set_name, + self._description.max_set_version, + self._description.max_election_id, + self._description._topology_settings, + ) + self._events.put( + ( + self._listeners.publish_topology_description_changed, + ( + old_td, + self._description, + self._topology_id, + ), + ) + ) self._events.put((self._listeners.publish_topology_closed, (self._topology_id,))) if self._publish_server or self._publish_tp: + # Make sure the events executor thread is fully closed before publishing the remaining events self.__events_executor.close() + self.__events_executor.join(1) + process_events_queue(weakref.ref(self._events)) # type: ignore[arg-type] @property def description(self) -> TopologyDescription: diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index b2c102ae0..eda5f01d3 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -662,9 +662,31 @@ class Topology: # Publish only after releasing the lock. if self._publish_tp: assert self._events is not None + old_td = self._description + self._description = TopologyDescription( + TOPOLOGY_TYPE.Unknown, + {}, + self._description.replica_set_name, + self._description.max_set_version, + self._description.max_election_id, + self._description._topology_settings, + ) + self._events.put( + ( + self._listeners.publish_topology_description_changed, + ( + old_td, + self._description, + self._topology_id, + ), + ) + ) self._events.put((self._listeners.publish_topology_closed, (self._topology_id,))) if self._publish_server or self._publish_tp: + # Make sure the events executor thread is fully closed before publishing the remaining events self.__events_executor.close() + self.__events_executor.join(1) + process_events_queue(weakref.ref(self._events)) # type: ignore[arg-type] @property def description(self) -> TopologyDescription: diff --git a/test/discovery_and_monitoring/rs/compatible.json b/test/discovery_and_monitoring/rs/compatible.json index 444b13e9d..dfd5d57df 100644 --- a/test/discovery_and_monitoring/rs/compatible.json +++ b/test/discovery_and_monitoring/rs/compatible.json @@ -16,7 +16,7 @@ "b:27017" ], "minWireVersion": 0, - "maxWireVersion": 6 + "maxWireVersion": 21 } ], [ diff --git a/test/discovery_and_monitoring/rs/compatible_unknown.json b/test/discovery_and_monitoring/rs/compatible_unknown.json index cf92dd1ed..95e03ea95 100644 --- a/test/discovery_and_monitoring/rs/compatible_unknown.json +++ b/test/discovery_and_monitoring/rs/compatible_unknown.json @@ -16,7 +16,7 @@ "b:27017" ], "minWireVersion": 0, - "maxWireVersion": 6 + "maxWireVersion": 21 } ] ], diff --git a/test/discovery_and_monitoring/sharded/compatible.json b/test/discovery_and_monitoring/sharded/compatible.json index e531db97f..ceb0ec24c 100644 --- a/test/discovery_and_monitoring/sharded/compatible.json +++ b/test/discovery_and_monitoring/sharded/compatible.json @@ -23,7 +23,7 @@ "isWritablePrimary": true, "msg": "isdbgrid", "minWireVersion": 0, - "maxWireVersion": 6 + "maxWireVersion": 21 } ] ], diff --git a/test/discovery_and_monitoring/single/compatible.json b/test/discovery_and_monitoring/single/compatible.json index 302927598..493d9b748 100644 --- a/test/discovery_and_monitoring/single/compatible.json +++ b/test/discovery_and_monitoring/single/compatible.json @@ -11,7 +11,7 @@ "helloOk": true, "isWritablePrimary": true, "minWireVersion": 0, - "maxWireVersion": 6 + "maxWireVersion": 21 } ] ], diff --git a/test/discovery_and_monitoring/single/too_old_then_upgraded.json b/test/discovery_and_monitoring/single/too_old_then_upgraded.json index 58ae7d9de..c3dd98cf6 100644 --- a/test/discovery_and_monitoring/single/too_old_then_upgraded.json +++ b/test/discovery_and_monitoring/single/too_old_then_upgraded.json @@ -1,5 +1,5 @@ { - "description": "Standalone with default maxWireVersion of 0 is upgraded to one with maxWireVersion 6", + "description": "Standalone with default maxWireVersion of 0 is upgraded to one with maxWireVersion 21", "uri": "mongodb://a", "phases": [ { @@ -35,7 +35,7 @@ "helloOk": true, "isWritablePrimary": true, "minWireVersion": 0, - "maxWireVersion": 6 + "maxWireVersion": 21 } ] ], diff --git a/test/discovery_and_monitoring/unified/interruptInUse-pool-clear.json b/test/discovery_and_monitoring/unified/interruptInUse-pool-clear.json index 6fdef55b4..a20d79030 100644 --- a/test/discovery_and_monitoring/unified/interruptInUse-pool-clear.json +++ b/test/discovery_and_monitoring/unified/interruptInUse-pool-clear.json @@ -4,11 +4,11 @@ "runOnRequirements": [ { "minServerVersion": "4.9", + "serverless": "forbid", "topologies": [ "replicaset", "sharded" - ], - "serverless": "forbid" + ] } ], "createEntities": [ @@ -39,13 +39,6 @@ "client": { "id": "client", "useMultipleMongoses": false, - "uriOptions": { - "connectTimeoutMS": 500, - "heartbeatFrequencyMS": 500, - "appname": "interruptInUse", - "retryReads": false, - "minPoolSize": 0 - }, "observeEvents": [ "poolClearedEvent", "connectionClosedEvent", @@ -54,7 +47,14 @@ "commandFailedEvent", "connectionCheckedOutEvent", "connectionCheckedInEvent" - ] + ], + "uriOptions": { + "connectTimeoutMS": 500, + "heartbeatFrequencyMS": 500, + "appname": "interruptInUse", + "retryReads": false, + "minPoolSize": 0 + } } }, { @@ -83,7 +83,9 @@ "name": "insertOne", "object": "collection", "arguments": { - "document": { "_id" : 1 } + "document": { + "_id": 1 + } } }, { @@ -92,14 +94,16 @@ "arguments": { "thread": "thread1", "operation": { - "name": "find", - "object": "collection", - "arguments": { - "filter": { "$where": "sleep(2000) || true" } - }, - "expectError": { - "isError": true + "name": "find", + "object": "collection", + "arguments": { + "filter": { + "$where": "sleep(2000) || true" } + }, + "expectError": { + "isError": true + } } } }, @@ -107,6 +111,7 @@ "name": "failPoint", "object": "testRunner", "arguments": { + "client": "setupClient", "failPoint": { "configureFailPoint": "failCommand", "mode": { @@ -114,22 +119,22 @@ }, "data": { "failCommands": [ - "hello", "isMaster" + "hello", + "isMaster" ], "blockConnection": true, "blockTimeMS": 1500, "appName": "interruptInUse" } - }, - "client": "setupClient" + } } }, { - "name": "waitForThread", - "object": "testRunner", - "arguments": { - "thread": "thread1" - } + "name": "waitForThread", + "object": "testRunner", + "arguments": { + "thread": "thread1" + } } ], "expectEvents": [ @@ -157,20 +162,20 @@ "commandName": "find" } } - ] - }, - { + ] + }, + { "client": "client", "eventType": "cmap", "events": [ { - "connectionCheckedOutEvent": { } + "connectionCheckedOutEvent": {} }, { - "connectionCheckedInEvent": { } + "connectionCheckedInEvent": {} }, { - "connectionCheckedOutEvent": { } + "connectionCheckedOutEvent": {} }, { "poolClearedEvent": { @@ -181,16 +186,22 @@ "connectionCheckedInEvent": {} }, { - "connectionClosedEvent": { } + "connectionClosedEvent": {} } - ] - } + ] + } ], - "outcome": [{ - "collectionName": "interruptInUse", - "databaseName": "sdam-tests", - "documents": [{ "_id": 1 }] - }] + "outcome": [ + { + "collectionName": "interruptInUse", + "databaseName": "sdam-tests", + "documents": [ + { + "_id": 1 + } + ] + } + ] }, { "description": "Error returned from connection pool clear with interruptInUseConnections=true is retryable", @@ -204,22 +215,22 @@ "client": { "id": "client", "useMultipleMongoses": false, + "observeEvents": [ + "poolClearedEvent", + "connectionClosedEvent", + "commandStartedEvent", + "commandFailedEvent", + "commandSucceededEvent", + "connectionCheckedOutEvent", + "connectionCheckedInEvent" + ], "uriOptions": { "connectTimeoutMS": 500, "heartbeatFrequencyMS": 500, "appname": "interruptInUseRetryable", "retryReads": true, "minPoolSize": 0 - }, - "observeEvents": [ - "poolClearedEvent", - "connectionClosedEvent", - "commandFailedEvent", - "commandStartedEvent", - "commandSucceededEvent", - "connectionCheckedOutEvent", - "connectionCheckedInEvent" - ] + } } }, { @@ -248,7 +259,9 @@ "name": "insertOne", "object": "collection", "arguments": { - "document": { "_id" : 1 } + "document": { + "_id": 1 + } } }, { @@ -257,11 +270,13 @@ "arguments": { "thread": "thread1", "operation": { - "name": "find", - "object": "collection", - "arguments": { - "filter": { "$where": "sleep(2000) || true" } + "name": "find", + "object": "collection", + "arguments": { + "filter": { + "$where": "sleep(2000) || true" } + } } } }, @@ -269,6 +284,7 @@ "name": "failPoint", "object": "testRunner", "arguments": { + "client": "setupClient", "failPoint": { "configureFailPoint": "failCommand", "mode": { @@ -276,22 +292,22 @@ }, "data": { "failCommands": [ - "hello", "isMaster" + "hello", + "isMaster" ], "blockConnection": true, "blockTimeMS": 1500, "appName": "interruptInUseRetryable" } - }, - "client": "setupClient" + } } }, { - "name": "waitForThread", - "object": "testRunner", - "arguments": { - "thread": "thread1" - } + "name": "waitForThread", + "object": "testRunner", + "arguments": { + "thread": "thread1" + } } ], "expectEvents": [ @@ -329,20 +345,20 @@ "commandName": "find" } } - ] - }, - { + ] + }, + { "client": "client", "eventType": "cmap", "events": [ { - "connectionCheckedOutEvent": { } + "connectionCheckedOutEvent": {} }, { - "connectionCheckedInEvent": { } + "connectionCheckedInEvent": {} }, { - "connectionCheckedOutEvent": { } + "connectionCheckedOutEvent": {} }, { "poolClearedEvent": { @@ -353,7 +369,7 @@ "connectionCheckedInEvent": {} }, { - "connectionClosedEvent": { } + "connectionClosedEvent": {} }, { "connectionCheckedOutEvent": {} @@ -361,14 +377,20 @@ { "connectionCheckedInEvent": {} } - ] - } + ] + } ], - "outcome": [{ - "collectionName": "interruptInUse", - "databaseName": "sdam-tests", - "documents": [{ "_id": 1 }] - }] + "outcome": [ + { + "collectionName": "interruptInUse", + "databaseName": "sdam-tests", + "documents": [ + { + "_id": 1 + } + ] + } + ] }, { "description": "Error returned from connection pool clear with interruptInUseConnections=true is retryable for write", @@ -382,22 +404,23 @@ "client": { "id": "client", "useMultipleMongoses": false, + "observeEvents": [ + "poolClearedEvent", + "connectionClosedEvent", + "commandStartedEvent", + "commandFailedEvent", + "commandSucceededEvent", + "connectionCheckedOutEvent", + "connectionCheckedInEvent" + ], "uriOptions": { "connectTimeoutMS": 500, "heartbeatFrequencyMS": 500, "appname": "interruptInUseRetryableWrite", "retryWrites": true, "minPoolSize": 0 - }, - "observeEvents": [ - "poolClearedEvent", - "connectionClosedEvent", - "commandFailedEvent", - "commandStartedEvent", - "commandSucceededEvent", - "connectionCheckedOutEvent", - "connectionCheckedInEvent" - ]} + } + } }, { "database": { @@ -425,7 +448,9 @@ "name": "insertOne", "object": "collection", "arguments": { - "document": { "_id": 1 } + "document": { + "_id": 1 + } } }, { @@ -437,9 +462,15 @@ "name": "updateOne", "object": "collection", "arguments": { - "filter": { "$where": "sleep(2000) || true" }, - "update": [ { "$set": { "a": "bar" } } ] - } + "filter": { + "$where": "sleep(2000) || true" + }, + "update": { + "$set": { + "a": "bar" + } + } + } } } }, @@ -447,6 +478,7 @@ "name": "failPoint", "object": "testRunner", "arguments": { + "client": "setupClient", "failPoint": { "configureFailPoint": "failCommand", "mode": { @@ -454,22 +486,22 @@ }, "data": { "failCommands": [ - "hello", "isMaster" + "hello", + "isMaster" ], "blockConnection": true, "blockTimeMS": 1500, "appName": "interruptInUseRetryableWrite" } - }, - "client": "setupClient" + } } }, { - "name": "waitForThread", - "object": "testRunner", - "arguments": { - "thread": "thread1" - } + "name": "waitForThread", + "object": "testRunner", + "arguments": { + "thread": "thread1" + } } ], "expectEvents": [ @@ -507,20 +539,20 @@ "commandName": "update" } } - ] - }, - { + ] + }, + { "client": "client", "eventType": "cmap", "events": [ { - "connectionCheckedOutEvent": { } + "connectionCheckedOutEvent": {} }, { - "connectionCheckedInEvent": { } + "connectionCheckedInEvent": {} }, { - "connectionCheckedOutEvent": { } + "connectionCheckedOutEvent": {} }, { "poolClearedEvent": { @@ -531,7 +563,7 @@ "connectionCheckedInEvent": {} }, { - "connectionClosedEvent": { } + "connectionClosedEvent": {} }, { "connectionCheckedOutEvent": {} @@ -539,14 +571,21 @@ { "connectionCheckedInEvent": {} } - ] - } + ] + } ], - "outcome": [{ - "collectionName": "interruptInUse", - "databaseName": "sdam-tests", - "documents": [{ "_id": 1, "a" : "bar"}] - }] + "outcome": [ + { + "collectionName": "interruptInUse", + "databaseName": "sdam-tests", + "documents": [ + { + "_id": 1, + "a": "bar" + } + ] + } + ] } ] -} \ No newline at end of file +} diff --git a/test/discovery_and_monitoring/unified/loadbalanced-emit-topology-changed-before-close.json b/test/discovery_and_monitoring/unified/loadbalanced-emit-topology-changed-before-close.json new file mode 100644 index 000000000..30c065763 --- /dev/null +++ b/test/discovery_and_monitoring/unified/loadbalanced-emit-topology-changed-before-close.json @@ -0,0 +1,88 @@ +{ + "description": "loadbalanced-emit-topology-description-changed-before-close", + "schemaVersion": "1.20", + "runOnRequirements": [ + { + "topologies": [ + "load-balanced" + ], + "minServerVersion": "4.4" + } + ], + "tests": [ + { + "description": "Topology lifecycle", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeEvents": [ + "topologyDescriptionChangedEvent", + "topologyOpeningEvent", + "topologyClosedEvent" + ] + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "topologyDescriptionChangedEvent": {} + }, + "count": 2 + } + }, + { + "name": "close", + "object": "client" + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "sdam", + "events": [ + { + "topologyOpeningEvent": {} + }, + { + "topologyDescriptionChangedEvent": { + "previousDescription": { + "type": "Unknown" + }, + "newDescription": {} + } + }, + { + "topologyDescriptionChangedEvent": { + "newDescription": { + "type": "LoadBalanced" + } + } + }, + { + "topologyDescriptionChangedEvent": { + "newDescription": { + "type": "Unknown" + } + } + }, + { + "topologyClosedEvent": {} + } + ] + } + ] + } + ] +} diff --git a/test/discovery_and_monitoring/unified/replicaset-emit-topology-changed-before-close.json b/test/discovery_and_monitoring/unified/replicaset-emit-topology-changed-before-close.json new file mode 100644 index 000000000..066a4ffee --- /dev/null +++ b/test/discovery_and_monitoring/unified/replicaset-emit-topology-changed-before-close.json @@ -0,0 +1,89 @@ +{ + "description": "replicaset-emit-topology-description-changed-before-close", + "schemaVersion": "1.20", + "runOnRequirements": [ + { + "topologies": [ + "replicaset" + ], + "minServerVersion": "4.4" + } + ], + "tests": [ + { + "description": "Topology lifecycle", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeEvents": [ + "topologyDescriptionChangedEvent", + "topologyOpeningEvent", + "topologyClosedEvent" + ] + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "topologyDescriptionChangedEvent": {} + }, + "count": 4 + } + }, + { + "name": "close", + "object": "client" + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "sdam", + "ignoreExtraEvents": false, + "events": [ + { + "topologyOpeningEvent": {} + }, + { + "topologyDescriptionChangedEvent": {} + }, + { + "topologyDescriptionChangedEvent": {} + }, + { + "topologyDescriptionChangedEvent": {} + }, + { + "topologyDescriptionChangedEvent": {} + }, + { + "topologyDescriptionChangedEvent": { + "previousDescription": { + "type": "ReplicaSetWithPrimary" + }, + "newDescription": { + "type": "Unknown" + } + } + }, + { + "topologyClosedEvent": {} + } + ] + } + ] + } + ] +} diff --git a/test/discovery_and_monitoring/unified/sharded-emit-topology-changed-before-close.json b/test/discovery_and_monitoring/unified/sharded-emit-topology-changed-before-close.json new file mode 100644 index 000000000..98fb58553 --- /dev/null +++ b/test/discovery_and_monitoring/unified/sharded-emit-topology-changed-before-close.json @@ -0,0 +1,108 @@ +{ + "description": "sharded-emit-topology-description-changed-before-close", + "schemaVersion": "1.20", + "runOnRequirements": [ + { + "topologies": [ + "sharded" + ], + "minServerVersion": "4.4" + } + ], + "tests": [ + { + "description": "Topology lifecycle", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeEvents": [ + "topologyDescriptionChangedEvent", + "topologyOpeningEvent", + "topologyClosedEvent" + ], + "useMultipleMongoses": true + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "topologyDescriptionChangedEvent": {} + }, + "count": 3 + } + }, + { + "name": "close", + "object": "client" + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "sdam", + "ignoreExtraEvents": false, + "events": [ + { + "topologyOpeningEvent": {} + }, + { + "topologyDescriptionChangedEvent": { + "previousDescription": { + "type": "Unknown" + }, + "newDescription": { + "type": "Unknown" + } + } + }, + { + "topologyDescriptionChangedEvent": { + "previousDescription": { + "type": "Unknown" + }, + "newDescription": { + "type": "Sharded" + } + } + }, + { + "topologyDescriptionChangedEvent": { + "previousDescription": { + "type": "Sharded" + }, + "newDescription": { + "type": "Sharded" + } + } + }, + { + "topologyDescriptionChangedEvent": { + "previousDescription": { + "type": "Sharded" + }, + "newDescription": { + "type": "Unknown" + } + } + }, + { + "topologyClosedEvent": {} + } + ] + } + ] + } + ] +} diff --git a/test/discovery_and_monitoring/unified/standalone-emit-topology-changed-before-close.json b/test/discovery_and_monitoring/unified/standalone-emit-topology-changed-before-close.json new file mode 100644 index 000000000..27b5444d5 --- /dev/null +++ b/test/discovery_and_monitoring/unified/standalone-emit-topology-changed-before-close.json @@ -0,0 +1,97 @@ +{ + "description": "standalone-emit-topology-description-changed-before-close", + "schemaVersion": "1.20", + "runOnRequirements": [ + { + "topologies": [ + "single" + ], + "minServerVersion": "4.4" + } + ], + "tests": [ + { + "description": "Topology lifecycle", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeEvents": [ + "topologyDescriptionChangedEvent", + "topologyOpeningEvent", + "topologyClosedEvent" + ] + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "topologyDescriptionChangedEvent": {} + }, + "count": 2 + } + }, + { + "name": "close", + "object": "client" + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "sdam", + "ignoreExtraEvents": false, + "events": [ + { + "topologyOpeningEvent": {} + }, + { + "topologyDescriptionChangedEvent": { + "previousDescription": { + "type": "Unknown" + }, + "newDescription": { + "type": "Unknown" + } + } + }, + { + "topologyDescriptionChangedEvent": { + "previousDescription": { + "type": "Unknown" + }, + "newDescription": { + "type": "Single" + } + } + }, + { + "topologyDescriptionChangedEvent": { + "previousDescription": { + "type": "Single" + }, + "newDescription": { + "type": "Unknown" + } + } + }, + { + "topologyClosedEvent": {} + } + ] + } + ] + } + ] +} diff --git a/test/test_sdam_monitoring_spec.py b/test/test_sdam_monitoring_spec.py index 5faee9b10..63281c987 100644 --- a/test/test_sdam_monitoring_spec.py +++ b/test/test_sdam_monitoring_spec.py @@ -122,7 +122,7 @@ def compare_events(expected_dict, actual): elif expected_type == "topology_opening_event": if not isinstance(actual, monitoring.TopologyOpenedEvent): - return False, "Expected TopologyOpeningEvent, got %s" % (actual.__class__) + return False, "Expected TopologyOpenedEvent, got %s" % (actual.__class__) elif expected_type == "topology_description_changed_event": if not isinstance(actual, monitoring.TopologyDescriptionChangedEvent): diff --git a/test/unified_format.py b/test/unified_format.py index df9e2af7f..d311d9729 100644 --- a/test/unified_format.py +++ b/test/unified_format.py @@ -475,6 +475,11 @@ class EntityMapUtil: if entity_type == "client": kwargs: dict = {} observe_events = spec.get("observeEvents", []) + + # The unified tests use topologyOpeningEvent, we use topologyOpenedEvent + for i in range(len(observe_events)): + if "topologyOpeningEvent" == observe_events[i]: + observe_events[i] = "topologyOpenedEvent" ignore_commands = spec.get("ignoreCommandMonitoringEvents", []) observe_sensitive_commands = spec.get("observeSensitiveCommands", False) ignore_commands = [cmd.lower() for cmd in ignore_commands] @@ -924,6 +929,10 @@ class MatchEvaluatorUtil: self.test.assertIsInstance(actual, ServerHeartbeatFailedEvent) elif name == "topologyDescriptionChangedEvent": self.test.assertIsInstance(actual, TopologyDescriptionChangedEvent) + elif name == "topologyOpeningEvent": + self.test.assertIsInstance(actual, TopologyOpenedEvent) + elif name == "topologyClosedEvent": + self.test.assertIsInstance(actual, TopologyClosedEvent) else: raise Exception(f"Unsupported event type {name}")