PYTHON-924 - Use electionId to detect stale primaries.

This commit is contained in:
A. Jesse Jiryu Davis 2015-05-01 13:35:23 -04:00
parent 51cadce6fb
commit 3d1c20669c
17 changed files with 508 additions and 26 deletions

View File

@ -111,6 +111,10 @@ class IsMaster(object):
def max_wire_version(self):
return self._doc.get('maxWireVersion', common.MAX_WIRE_VERSION)
@property
def election_id(self):
return self._doc.get('electionId')
@property
def is_writable(self):
return self._is_writable

View File

@ -32,7 +32,8 @@ class ServerDescription(object):
'_address', '_server_type', '_all_hosts', '_tags', '_replica_set_name',
'_primary', '_max_bson_size', '_max_message_size',
'_max_write_batch_size', '_min_wire_version', '_max_wire_version',
'_round_trip_time', '_is_writable', '_is_readable', '_error')
'_round_trip_time', '_is_writable', '_is_readable', '_error',
'_election_id')
def __init__(
self,
@ -54,6 +55,7 @@ class ServerDescription(object):
self._max_write_batch_size = ismaster.max_write_batch_size
self._min_wire_version = ismaster.min_wire_version
self._max_wire_version = ismaster.max_wire_version
self._election_id = ismaster.election_id
self._is_writable = ismaster.is_writable
self._is_readable = ismaster.is_readable
self._round_trip_time = round_trip_time
@ -106,6 +108,10 @@ class ServerDescription(object):
def max_wire_version(self):
return self._max_wire_version
@property
def election_id(self):
return self._election_id
@property
def round_trip_time(self):
"""The current average latency or None."""

View File

@ -40,7 +40,8 @@ class Topology(object):
topology_description = TopologyDescription(
topology_settings.get_topology_type(),
topology_settings.get_server_descriptions(),
topology_settings.replica_set_name)
topology_settings.replica_set_name,
None)
self._description = topology_description
# Store the seed list to help diagnose errors in _error_message().

View File

@ -28,7 +28,12 @@ TOPOLOGY_TYPE = namedtuple('TopologyType', ['Single', 'ReplicaSetNoPrimary',
class TopologyDescription(object):
def __init__(self, topology_type, server_descriptions, replica_set_name):
def __init__(
self,
topology_type,
server_descriptions,
replica_set_name,
max_election_id):
"""Represent a topology of servers.
:Parameters:
@ -36,10 +41,12 @@ class TopologyDescription(object):
- `server_descriptions`: dict of (address, ServerDescription) for
all seeds
- `replica_set_name`: replica set name or None
- `max_election_id`: greatest electionId seen from a primary, or None
"""
self._topology_type = topology_type
self._replica_set_name = replica_set_name
self._server_descriptions = server_descriptions
self._max_election_id = max_election_id
# Is PyMongo compatible with all servers' wire protocols?
self._incompatible_err = None
@ -96,7 +103,11 @@ class TopologyDescription(object):
sds = dict((address, ServerDescription(address))
for address in self._server_descriptions)
return TopologyDescription(topology_type, sds, self._replica_set_name)
return TopologyDescription(
topology_type,
sds,
self._replica_set_name,
self._max_election_id)
def server_descriptions(self):
"""Dict of (address, ServerDescription)."""
@ -111,6 +122,11 @@ class TopologyDescription(object):
"""The replica set name."""
return self._replica_set_name
@property
def max_election_id(self):
"""Greatest electionId seen from a primary, or None."""
return self._max_election_id
@property
def known_servers(self):
"""List of Servers of types besides Unknown."""
@ -146,6 +162,7 @@ def updated_topology_description(topology_description, server_description):
# TopologyDescription.
topology_type = topology_description.topology_type
set_name = topology_description.replica_set_name
max_election_id = topology_description.max_election_id
server_type = server_description.server_type
# Don't mutate the original dict of server descriptions; copy it.
@ -156,7 +173,11 @@ def updated_topology_description(topology_description, server_description):
if topology_type == TOPOLOGY_TYPE.Single:
# Single type never changes.
return TopologyDescription(TOPOLOGY_TYPE.Single, sds, set_name)
return TopologyDescription(
TOPOLOGY_TYPE.Single,
sds,
set_name,
max_election_id)
if topology_type == TOPOLOGY_TYPE.Unknown:
if server_type == SERVER_TYPE.Standalone:
@ -174,8 +195,8 @@ def updated_topology_description(topology_description, server_description):
sds.pop(address)
elif server_type == SERVER_TYPE.RSPrimary:
topology_type, set_name = _update_rs_from_primary(
sds, set_name, server_description)
topology_type, set_name, max_election_id = _update_rs_from_primary(
sds, set_name, server_description, max_election_id)
elif server_type in (
SERVER_TYPE.RSSecondary,
@ -190,8 +211,8 @@ def updated_topology_description(topology_description, server_description):
topology_type = _check_has_primary(sds)
elif server_type == SERVER_TYPE.RSPrimary:
topology_type, set_name = _update_rs_from_primary(
sds, set_name, server_description)
topology_type, set_name, max_election_id = _update_rs_from_primary(
sds, set_name, server_description, max_election_id)
elif server_type in (
SERVER_TYPE.RSSecondary,
@ -205,16 +226,21 @@ def updated_topology_description(topology_description, server_description):
topology_type = _check_has_primary(sds)
# Return updated copy.
return TopologyDescription(topology_type, sds, set_name)
return TopologyDescription(topology_type, sds, set_name, max_election_id)
def _update_rs_from_primary(sds, replica_set_name, server_description):
def _update_rs_from_primary(
sds,
replica_set_name,
server_description,
max_election_id):
"""Update topology description from a primary's ismaster response.
Pass in a dict of ServerDescriptions, current replica set name, and the
ServerDescription we are processing.
Pass in a dict of ServerDescriptions, current replica set name, the
ServerDescription we are processing, and the TopologyDescription's
max_election_id if any.
Returns (new topology type, new replica_set_name).
Returns (new topology type, new replica_set_name, new max_election_id).
"""
if replica_set_name is None:
replica_set_name = server_description.replica_set_name
@ -223,7 +249,16 @@ def _update_rs_from_primary(sds, replica_set_name, server_description):
# We found a primary but it doesn't have the replica_set_name
# provided by the user.
sds.pop(server_description.address)
return _check_has_primary(sds), replica_set_name
return _check_has_primary(sds), replica_set_name, max_election_id
if server_description.election_id is not None:
if max_election_id and max_election_id > server_description.election_id:
# Stale primary, set to type Unknown.
address = server_description.address
sds[address] = ServerDescription(address)
return _check_has_primary(sds), replica_set_name, max_election_id
max_election_id = server_description.election_id
# We've heard from the primary. Is it the same primary as before?
for server in sds.values():
@ -247,7 +282,7 @@ def _update_rs_from_primary(sds, replica_set_name, server_description):
# If the host list differs from the seed list, we may not have a primary
# after all.
return _check_has_primary(sds), replica_set_name
return _check_has_primary(sds), replica_set_name, max_election_id
def _update_rs_with_primary_from_member(

View File

@ -0,0 +1,58 @@
{
"description": "New primary with equal electionId",
"phases": [
{
"outcome": {
"servers": {
"a:27017": {
"electionId": null,
"setName": null,
"type": "Unknown"
},
"b:27017": {
"electionId": {
"$oid": "000000000000000000000001"
},
"setName": "rs",
"type": "RSPrimary"
}
},
"setName": "rs",
"topologyType": "ReplicaSetWithPrimary"
},
"responses": [
[
"a:27017",
{
"electionId": {
"$oid": "000000000000000000000001"
},
"hosts": [
"a:27017",
"b:27017"
],
"ismaster": true,
"ok": 1,
"setName": "rs"
}
],
[
"b:27017",
{
"electionId": {
"$oid": "000000000000000000000001"
},
"hosts": [
"a:27017",
"b:27017"
],
"ismaster": true,
"ok": 1,
"setName": "rs"
}
]
]
}
],
"uri": "mongodb://a/?replicaSet=rs"
}

View File

@ -0,0 +1,117 @@
{
"description": "New primary with greater electionId",
"phases": [
{
"outcome": {
"servers": {
"a:27017": {
"electionId": {
"$oid": "000000000000000000000001"
},
"setName": "rs",
"type": "RSPrimary"
},
"b:27017": {
"electionId": null,
"setName": null,
"type": "Unknown"
}
},
"setName": "rs",
"topologyType": "ReplicaSetWithPrimary"
},
"responses": [
[
"a:27017",
{
"electionId": {
"$oid": "000000000000000000000001"
},
"hosts": [
"a:27017",
"b:27017"
],
"ismaster": true,
"ok": 1,
"setName": "rs"
}
]
]
},
{
"outcome": {
"servers": {
"a:27017": {
"electionId": null,
"setName": null,
"type": "Unknown"
},
"b:27017": {
"electionId": {
"$oid": "000000000000000000000002"
},
"setName": "rs",
"type": "RSPrimary"
}
},
"setName": "rs",
"topologyType": "ReplicaSetWithPrimary"
},
"responses": [
[
"b:27017",
{
"electionId": {
"$oid": "000000000000000000000002"
},
"hosts": [
"a:27017",
"b:27017"
],
"ismaster": true,
"ok": 1,
"setName": "rs"
}
]
]
},
{
"outcome": {
"servers": {
"a:27017": {
"electionId": null,
"setName": null,
"type": "Unknown"
},
"b:27017": {
"electionId": {
"$oid": "000000000000000000000002"
},
"setName": "rs",
"type": "RSPrimary"
}
},
"setName": "rs",
"topologyType": "ReplicaSetWithPrimary"
},
"responses": [
[
"a:27017",
{
"electionId": {
"$oid": "000000000000000000000001"
},
"hosts": [
"a:27017",
"b:27017"
],
"ismaster": true,
"ok": 1,
"setName": "rs"
}
]
]
}
],
"uri": "mongodb://a/?replicaSet=rs"
}

View File

@ -0,0 +1,107 @@
{
"description": "Primary with no electionId, then a primary with electionId",
"phases": [
{
"outcome": {
"servers": {
"a:27017": {
"electionId": null,
"setName": "rs",
"type": "RSPrimary"
},
"b:27017": {
"electionId": null,
"setName": null,
"type": "Unknown"
}
},
"setName": "rs",
"topologyType": "ReplicaSetWithPrimary"
},
"responses": [
[
"a:27017",
{
"hosts": [
"a:27017",
"b:27017"
],
"ismaster": true,
"ok": 1,
"setName": "rs"
}
]
]
},
{
"outcome": {
"servers": {
"a:27017": {
"electionId": null,
"setName": null,
"type": "Unknown"
},
"b:27017": {
"electionId": {
"$oid": "000000000000000000000001"
},
"setName": "rs",
"type": "RSPrimary"
}
},
"setName": "rs",
"topologyType": "ReplicaSetWithPrimary"
},
"responses": [
[
"b:27017",
{
"electionId": {
"$oid": "000000000000000000000001"
},
"hosts": [
"a:27017",
"b:27017"
],
"ismaster": true,
"ok": 1,
"setName": "rs"
}
]
]
},
{
"outcome": {
"servers": {
"a:27017": {
"electionId": null,
"setName": "rs",
"type": "RSPrimary"
},
"b:27017": {
"electionId": null,
"setName": null,
"type": "Unknown"
}
},
"setName": "rs",
"topologyType": "ReplicaSetWithPrimary"
},
"responses": [
[
"a:27017",
{
"hosts": [
"a:27017",
"b:27017"
],
"ismaster": true,
"ok": 1,
"setName": "rs"
}
]
]
}
],
"uri": "mongodb://a/?replicaSet=rs"
}

View File

@ -0,0 +1,154 @@
{
"description": "Disconnected from primary, reject stale primary",
"phases": [
{
"outcome": {
"servers": {
"a:27017": {
"electionId": null,
"setName": null,
"type": "Unknown"
},
"b:27017": {
"electionId": {
"$oid": "000000000000000000000002"
},
"setName": "rs",
"type": "RSPrimary"
}
},
"setName": "rs",
"topologyType": "ReplicaSetWithPrimary"
},
"responses": [
[
"a:27017",
{
"electionId": {
"$oid": "000000000000000000000001"
},
"hosts": [
"a:27017",
"b:27017"
],
"ismaster": true,
"ok": 1,
"setName": "rs"
}
],
[
"b:27017",
{
"electionId": {
"$oid": "000000000000000000000002"
},
"hosts": [
"a:27017",
"b:27017"
],
"ismaster": true,
"ok": 1,
"setName": "rs"
}
]
]
},
{
"outcome": {
"servers": {
"a:27017": {
"electionId": null,
"setName": null,
"type": "Unknown"
},
"b:27017": {
"electionId": null,
"setName": null,
"type": "Unknown"
}
},
"setName": "rs",
"topologyType": "ReplicaSetNoPrimary"
},
"responses": [
[
"b:27017",
{}
]
]
},
{
"outcome": {
"servers": {
"a:27017": {
"electionId": null,
"setName": null,
"type": "Unknown"
},
"b:27017": {
"electionId": null,
"setName": null,
"type": "Unknown"
}
},
"setName": "rs",
"topologyType": "ReplicaSetNoPrimary"
},
"responses": [
[
"a:27017",
{
"electionId": {
"$oid": "000000000000000000000001"
},
"hosts": [
"a:27017",
"b:27017"
],
"ismaster": true,
"ok": 1,
"setName": "rs"
}
]
]
},
{
"outcome": {
"servers": {
"a:27017": {
"electionId": {
"$oid": "000000000000000000000003"
},
"setName": "rs",
"type": "RSPrimary"
},
"b:27017": {
"electionId": null,
"setName": null,
"type": "Unknown"
}
},
"setName": "rs",
"topologyType": "ReplicaSetWithPrimary"
},
"responses": [
[
"a:27017",
{
"electionId": {
"$oid": "000000000000000000000003"
},
"hosts": [
"a:27017",
"b:27017"
],
"ismaster": true,
"ok": 1,
"setName": "rs"
}
]
]
}
],
"uri": "mongodb://a/?replicaSet=rs"
}

View File

@ -27,5 +27,5 @@
]
}
],
"uri": "mongodb://a/?connect=direct"
"uri": "mongodb://a"
}

View File

@ -24,5 +24,5 @@
]
}
],
"uri": "mongodb://a/?connect=direct"
"uri": "mongodb://a"
}

View File

@ -28,5 +28,5 @@
]
}
],
"uri": "mongodb://a/?connect=direct"
"uri": "mongodb://a"
}

View File

@ -27,5 +27,5 @@
]
}
],
"uri": "mongodb://a/?connect=direct"
"uri": "mongodb://a"
}

View File

@ -28,5 +28,5 @@
]
}
],
"uri": "mongodb://a/?connect=direct"
"uri": "mongodb://a"
}

View File

@ -23,5 +23,5 @@
]
}
],
"uri": "mongodb://a/?connect=direct"
"uri": "mongodb://a"
}

View File

@ -23,5 +23,5 @@
]
}
],
"uri": "mongodb://a/?connect=direct"
"uri": "mongodb://a"
}

View File

@ -20,5 +20,5 @@
]
}
],
"uri": "mongodb://a/?connect=direct"
"uri": "mongodb://a"
}

View File

@ -14,13 +14,13 @@
"""Test the topology module."""
import json
import os
import sys
import threading
sys.path[0:0] = [""]
from bson import json_util
from pymongo import common
from pymongo.topology import Topology
from pymongo.topology_description import TOPOLOGY_TYPE
@ -175,7 +175,7 @@ def create_tests():
for filename in filenames:
with open(os.path.join(dirpath, filename)) as scenario_stream:
scenario_def = json.load(scenario_stream)
scenario_def = json_util.loads(scenario_stream.read())
# Construct test from scenario.
new_test = create_test(scenario_def)