diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 57a8ef10aa..527564f36b 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -84,7 +84,7 @@ named_tuple_factory, dict_factory, tuple_factory, FETCH_SIZE_UNSET, HostTargetingStatement) from cassandra.marshal import int64_pack -from cassandra.tablets import Tablet +from cassandra.tablets import Tablet, Tablets, choose_tablet_version_block, random_tablet_version_block from cassandra.timestamps import MonotonicTimestampGenerator from cassandra.util import _resolve_contact_points_to_string_map, Version, maybe_add_timeout_to_query @@ -3058,6 +3058,8 @@ def _create_response_future(self, query, parameters, trace, custom_payload, continuous_paging_options, statement_keyspace) elif isinstance(query, BoundStatement): prepared_statement = query.prepared_statement + # The tablet_version_block is filled in per-target-host at send time + # (see ResponseFuture._query), because V2 is negotiated per connection. message = ExecuteMessage( prepared_statement.query_id, query.values, cl, serial_cl, fetch_size, paging_state, timestamp, @@ -3092,6 +3094,42 @@ def _create_response_future(self, query, parameters, trace, custom_payload, load_balancer=load_balancing_policy, start_time=start_time, speculative_execution_plan=spec_exec_plan, continuous_paging_state=None, host=host) + def _compute_tablet_version_block(self, query): + """ + Compute the tablet_version_block byte for a BoundStatement. + + Always returns an int in [0, 255]. When no cached tablet is known for the + routing key (unknown keyspace/table, vnode table, cold cache, or a + missing token map) a random block is returned; the server treats that as + a version miss and replies with fresh routing info. Callers invoke this + only for connections that negotiated TABLETS_ROUTING_V2. 
+ """ + routing_key = query.routing_key + if routing_key is None: + return random_tablet_version_block() + + keyspace = query.keyspace or self.keyspace + table = query.table + if not keyspace or not table: + return random_tablet_version_block() + + # Skip the Murmur3 token hash + tablet lookup when we have no cached + # tablets for this table (vnode tables, or tablet tables on cold start); + # both correctly fall back to a random block below. + if not self.cluster.metadata._tablets.table_has_tablets(keyspace, table): + return random_tablet_version_block() + + token_map = self.cluster.metadata.token_map + if token_map is None: + return random_tablet_version_block() + + t = query.routing_token(token_map.token_class) + tablet = self.cluster.metadata._tablets.get_tablet_for_key(keyspace, table, t) + if tablet is None or tablet.tablet_version is None: + return random_tablet_version_block() + + return choose_tablet_version_block(tablet.tablet_version) + def get_execution_profile(self, name): """ Returns the execution profile associated with the provided ``name``. 
@@ -3768,7 +3806,6 @@ class PeersQueryType(object): _schema_meta_page_size = 1000 _uses_peers_v2 = True - _tablets_routing_v1 = False # for testing purposes _time = time @@ -3902,8 +3939,6 @@ def _try_connect(self, endpoint): self._metadata_request_timeout = None if connection.features.sharding_info is None or not self._cluster.metadata_request_timeout \ else datetime.timedelta(seconds=self._cluster.metadata_request_timeout) - self._tablets_routing_v1 = connection.features.tablets_routing_v1 - # use weak references in both directions # _clear_watcher will be called when this ControlConnection is about to be finalized # _watch_callback will get the actual callback from the Connection and relay it to @@ -4717,6 +4752,7 @@ class ResponseFuture(object): _host = None _control_connection_query_attempted = False _TABLET_ROUTING_CTYPE = None + _TABLET_ROUTING_V2_CTYPE = None _warned_timeout = False @@ -4934,6 +4970,36 @@ def _handle_control_connection_response(self, connection, cb, response): connection.in_flight -= 1 cb(response) + def _prepare_message_for_connection(self, message, connection): + """ + Return the message to send on ``connection``, attaching the + tablet_version_block to ExecuteMessages according to the capability that + *this specific connection* negotiated. + + Keying off the borrowed connection (already in hand at every call site) + is both necessary and sufficient: a connection that negotiated + TABLETS_ROUTING_V2 always gets the block -- even if the pool was created, + and any cached flag latched, before the cluster feature was enabled + (e.g. mid rolling-upgrade) -- while a non-V2 connection never gets one, + even if a sibling shard connection in the same pool already negotiated + V2. The server reads the trailing byte only on V2 connections, so + attaching it to a non-V2 connection would leave an unread trailing byte + and desync the frame; a pool-level flag cannot get this right. 
+ + ExecuteMessage is copied per send because ``self.message`` is shared + across speculative executions and retries that may run concurrently on + different threads; mutating tablet_version_block on the shared instance + would race with another in-flight send encoding the same object. + """ + if not isinstance(message, ExecuteMessage): + return message + message = copy(message) + if connection.features.tablets_routing_v2: + message.tablet_version_block = self.session._compute_tablet_version_block(self.query) + else: + message.tablet_version_block = None + return message + def _query_control_connection(self, message=None, cb=None, connection=None, host=None): self._control_connection_query_attempted = True @@ -4961,6 +5027,9 @@ def _query_control_connection(self, message=None, cb=None, connection=None, host cb = partial(self._set_result, host, connection, None) cb = partial(self._handle_control_connection_response, connection, cb) + # The control connection may also be a V2 connection, so it needs the + # trailing tablet_version_block byte just like a pooled send. + message = self._prepare_message_for_connection(message, connection) log.debug("No usable node pools; falling back to control connection for host %s", host) self.request_encoded_size = connection.send_msg(message, request_id, cb=cb, encoder=self._protocol_handler.encode_message, @@ -5006,7 +5075,12 @@ def _query(self, host, message=None, cb=None): try: # TODO get connectTimeout from cluster settings if self.query: - connection, request_id = pool.borrow_connection(timeout=2.0, routing_key=self.query.routing_key, keyspace=self.query.keyspace, table=self.query.table) + # Pass the statement so the pool can reuse the ring token it + # memoized for this request instead of re-hashing the routing key. 
+ connection, request_id = pool.borrow_connection( + timeout=2.0, routing_key=self.query.routing_key, + keyspace=self.query.keyspace, table=self.query.table, + query=self.query) else: connection, request_id = pool.borrow_connection(timeout=2.0) self._connection = connection @@ -5015,6 +5089,8 @@ def _query(self, host, message=None, cb=None): if cb is None: cb = partial(self._set_result, host, connection, pool) + message = self._prepare_message_for_connection(message, connection) + self.request_encoded_size = connection.send_msg(message, request_id, cb=cb, encoder=self._protocol_handler.encode_message, decoder=self._protocol_handler.decode_message, @@ -5117,6 +5193,27 @@ def _reprepare(self, prepare_message, host, connection, pool): # try to submit the original prepared statement on some other host self.send_request() + def _cache_tablet_from_payload(self, payload_key, ctype): + """ + Parse a tablets-routing ``custom_payload`` entry and cache the Tablet. + + ``ctype`` is the tuple type for the negotiated extension. The V1 and V2 + layouts differ only by a trailing ``tablet_version`` field, and + ``Tablet.from_row`` accepts that as an optional final argument, so + unpacking the decoded tuple positionally serves both. The tablet is + cached under the effective keyspace (the statement's, else the + session's) so a prepared statement executed in a session keyspace lands + under the same key ``_compute_tablet_version_block`` looks it up by; + otherwise that lookup always misses. 
+ """ + info = self._custom_payload.get(payload_key) + protocol = self.session.cluster.protocol_version + tablet = Tablet.from_row(*ctype.from_binary(info, protocol)) + keyspace = self.query.keyspace or self.session.keyspace + table = self.query.table + if tablet and keyspace and table: + self.session.cluster.metadata._tablets.add_tablet(keyspace, table, tablet) + def _set_result(self, host, connection, pool, response): try: self.coordinator_host = host @@ -5132,21 +5229,23 @@ def _set_result(self, host, connection, pool, response): self._warnings = getattr(response, 'warnings', None) self._custom_payload = getattr(response, 'custom_payload', None) - if self._custom_payload and self.session.cluster.control_connection._tablets_routing_v1 and 'tablets-routing-v1' in self._custom_payload: - protocol = self.session.cluster.protocol_version - info = self._custom_payload.get('tablets-routing-v1') - ctype = ResponseFuture._TABLET_ROUTING_CTYPE - if ctype is None: - ctype = types.lookup_casstype('TupleType(LongType, LongType, ListType(TupleType(UUIDType, Int32Type)))') - ResponseFuture._TABLET_ROUTING_CTYPE = ctype - tablet_routing_info = ctype.from_binary(info, protocol) - first_token = tablet_routing_info[0] - last_token = tablet_routing_info[1] - tablet_replicas = tablet_routing_info[2] - tablet = Tablet.from_row(first_token, last_token, tablet_replicas) - keyspace = self.query.keyspace - table = self.query.table - self.session.cluster.metadata._tablets.add_tablet(keyspace, table, tablet) + if self._custom_payload and connection is not None: + # Parse the routing payload according to what the connection that + # *served this request* negotiated, not the control connection: + # during a rolling upgrade connections may differ, and each + # payload key matches the extension its own connection negotiated. 
+ if connection.features.tablets_routing_v2 and 'tablets-routing-v2' in self._custom_payload: + ctype = ResponseFuture._TABLET_ROUTING_V2_CTYPE + if ctype is None: + ctype = types.lookup_casstype('TupleType(LongType, LongType, ListType(TupleType(UUIDType, Int32Type)), LongType)') + ResponseFuture._TABLET_ROUTING_V2_CTYPE = ctype + self._cache_tablet_from_payload('tablets-routing-v2', ctype) + elif connection.features.tablets_routing_v1 and 'tablets-routing-v1' in self._custom_payload: + ctype = ResponseFuture._TABLET_ROUTING_CTYPE + if ctype is None: + ctype = types.lookup_casstype('TupleType(LongType, LongType, ListType(TupleType(UUIDType, Int32Type)))') + ResponseFuture._TABLET_ROUTING_CTYPE = ctype + self._cache_tablet_from_payload('tablets-routing-v1', ctype) if isinstance(response, ResultMessage): if response.kind == RESULT_KIND_SET_KEYSPACE: @@ -5560,7 +5659,7 @@ def add_callback(self, fn, *args, **kwargs): Note: in the case that the result is not available when the callback is added, the callback is executed by IO event thread. This means that the callback - should not block or attempt further synchronous requests, because no further + should not block or attempt further synchronous requests because no further IO will be processed until the callback returns. 
**Important**: if the callback you attach results in an exception being diff --git a/cassandra/metadata.py b/cassandra/metadata.py index 43399b7152..94f7f24abb 100644 --- a/cassandra/metadata.py +++ b/cassandra/metadata.py @@ -194,7 +194,8 @@ def _update_keyspace(self, keyspace_meta, new_user_types=None): keyspace_meta.functions = old_keyspace_meta.functions keyspace_meta.aggregates = old_keyspace_meta.aggregates keyspace_meta.views = old_keyspace_meta.views - if (keyspace_meta.replication_strategy != old_keyspace_meta.replication_strategy): + if (keyspace_meta.replication_strategy != old_keyspace_meta.replication_strategy or + keyspace_meta.strongly_consistent != old_keyspace_meta.strongly_consistent): self._keyspace_updated(ks_name) else: self._keyspace_added(ks_name) @@ -801,6 +802,14 @@ class KeyspaceMetadata(object): A string indicating whether a graph engine is enabled for this keyspace (Core/Classic). """ + strongly_consistent = False + """ + A boolean indicating whether this keyspace uses strongly-consistent (Raft-based) + tablets. ``True`` only for ScyllaDB keyspaces whose ``consistency`` option is + ``global``. ``False`` for every other keyspace (``eventual``, null or ``local``) + and for non-ScyllaDB clusters. + """ + _exc_info = None """ set if metadata parsing failed """ @@ -815,6 +824,7 @@ def __init__(self, name, durable_writes, strategy_class, strategy_options, graph self.aggregates = {} self.views = {} self.graph_engine = graph_engine + self.strongly_consistent = False @property def is_graph_enabled(self): @@ -2577,6 +2587,11 @@ class SchemaParserV3(SchemaParserV22): _SELECT_AGGREGATES = "SELECT * FROM system_schema.aggregates" _SELECT_VIEWS = "SELECT * FROM system_schema.views" + # ScyllaDB-only: per-keyspace consistency option. The column is null for + # eventually-consistent keyspaces (and the whole table is absent on Cassandra + # and on Scylla versions without strongly-consistent tablets). 
+ _SELECT_SCYLLA_KEYSPACES = "SELECT keyspace_name, consistency FROM system_schema.scylla_keyspaces" + def _is_not_scylla(self): """Check if NOT connected to ScyllaDB by checking for shard awareness.""" return getattr(getattr(self.connection, 'features', None), 'shard_id', None) is None @@ -2610,12 +2625,65 @@ def __init__(self, connection, timeout, fetch_size, metadata_request_timeout): self.indexes_result = [] self.keyspace_table_index_rows = defaultdict(lambda: defaultdict(list)) self.keyspace_view_rows = defaultdict(list) + self._scylla_consistency_cache = None + + @staticmethod + def _is_strongly_consistent(consistency): + # The server stores the keyspace consistency option as: + # 'eventual' (equivalently: null), 'local', or 'global'. + # Because Scylla only supports global consistency for now, + # we associate strongly consistent tables with those that + # use 'global' consistency. + return consistency == "global" + + def _get_scylla_keyspaces_consistency(self): + """ + Return a ``{keyspace_name: consistency}`` map read from + ``system_schema.scylla_keyspaces``. + + Only ScyllaDB has this table, and only for keyspaces with a non-default + consistency option. Returns ``{}`` on non-Scylla clusters or when the + table/column is unavailable (older Scylla), in which case every keyspace + is treated as eventually consistent. The result is cached per parser + instance so it is fetched at most once per schema refresh. + + A transient failure to read the table (timeout, connection or server + error) propagates like any other schema-refresh query. That aborts the + refresh, so the previously known metadata -- including each keyspace's + consistency mode -- is left in place and retried, rather than every + keyspace being silently reset to eventual (which would disable leader + routing and evict the tablet cache). A missing table/column is not an + error: _query_build_rows absorbs it via expected_failures and yields an + empty map. 
+ """ + if self._scylla_consistency_cache is not None: + return self._scylla_consistency_cache + + if self._is_not_scylla(): + self._scylla_consistency_cache = {} + return self._scylla_consistency_cache + + rows = self._query_build_rows(self._SELECT_SCYLLA_KEYSPACES, lambda row: row) + self._scylla_consistency_cache = {row["keyspace_name"]: row.get("consistency") for row in rows} + return self._scylla_consistency_cache + + def _set_strong_consistency(self, keyspace_meta): + consistency = self._get_scylla_keyspaces_consistency().get(keyspace_meta.name) + keyspace_meta.strongly_consistent = self._is_strongly_consistent(consistency) + return keyspace_meta + + def get_keyspace(self, keyspaces, keyspace): + keyspace_meta = super(SchemaParserV3, self).get_keyspace(keyspaces, keyspace) + if keyspace_meta is not None: + self._set_strong_consistency(keyspace_meta) + return keyspace_meta def get_all_keyspaces(self): for keyspace_meta in super(SchemaParserV3, self).get_all_keyspaces(): for row in self.keyspace_view_rows[keyspace_meta.name]: view_meta = self._build_view_metadata(row) keyspace_meta._add_view_metadata(view_meta) + self._set_strong_consistency(keyspace_meta) yield keyspace_meta def get_table(self, keyspaces, keyspace, table): diff --git a/cassandra/policies.py b/cassandra/policies.py index 14c79fd70e..debfcda758 100644 --- a/cassandra/policies.py +++ b/cassandra/policies.py @@ -503,14 +503,33 @@ def make_query_plan(self, working_keyspace=None, query=None): return replicas = [] - tablet = self._cluster_metadata._tablets.get_tablet_for_key( - keyspace, query.table, self._cluster_metadata.token_map.token_class.from_key(query.routing_key)) + leader_host = None + token = query.routing_token(self._cluster_metadata.token_map.token_class) + tablet = self._cluster_metadata._tablets.get_tablet_for_key(keyspace, query.table, token) if tablet is not None: replicas_mapped = set(map(lambda r: r[0], tablet.replicas)) child_plan = child.make_query_plan(keyspace, query) replicas 
= [host for host in child_plan if host.host_id in replicas_mapped] + + # The leader concept only exists for strongly-consistent keyspaces. + # TABLETS_ROUTING_V2 assigns a tablet_version to *every* tablet table + # (eventually- and strongly-consistent alike), so the version alone + # must not be used to infer a leader. Conversely, replicas[0] is only + # leader-ordered for a tablet that came from a V2 payload, so a + # versionless tablet (V1-sourced, or stale across a consistency flip) + # must not be treated as a leader hint either. Require both a + # strongly-consistent keyspace and a versioned tablet; otherwise keep + # normal token-aware/shuffled ordering. + ks_meta = self._cluster_metadata.keyspaces.get(keyspace) + if (ks_meta is not None and ks_meta.strongly_consistent + and tablet.tablet_version is not None and tablet.replicas): + leader_host_id = tablet.replicas[0][0] + for host in replicas: + if host.host_id == leader_host_id: + leader_host = host + break else: replicas = self._cluster_metadata.get_replicas(keyspace, query.routing_key) @@ -523,10 +542,21 @@ def yield_in_order(hosts): if replica.is_up and child.distance(replica) == distance: yield replica - # yield replicas: local_rack, local, remote - yield from yield_in_order(replicas) + # If we have a leader hint, yield it first -- but respect the child + # policy's own filter: never front-run a host the child policy would + # exclude (e.g. one a custom policy reports as IGNORED). 
+ if (leader_host is not None and leader_host.is_up + and child.distance(leader_host) != HostDistance.IGNORED): + yield leader_host + + # yield replicas: local_rack, local, remote (skipping leader already yielded) + for host in yield_in_order(replicas): + if host is not leader_host: + yield host # yield rest of the cluster: local_rack, local, remote - yield from yield_in_order([host for host in child.make_query_plan(keyspace, query) if host not in replicas]) + for host in yield_in_order([host for host in child.make_query_plan(keyspace, query) if host not in replicas]): + if host is not leader_host: + yield host def on_up(self, *args, **kwargs): return self._child_policy.on_up(*args, **kwargs) diff --git a/cassandra/pool.py b/cassandra/pool.py index 18bed1bbdc..5b2649439f 100644 --- a/cassandra/pool.py +++ b/cassandra/pool.py @@ -389,8 +389,6 @@ class HostConnection(object): # the number below, all excess connections will be closed. max_excess_connections_per_shard_multiplier = 3 - tablets_routing_v1 = False - def __init__(self, host, host_distance, session): self.host = host self.host_distance = host_distance @@ -436,11 +434,37 @@ def __init__(self, host, host_distance, session): if first_connection.features.sharding_info and not self._session.cluster.shard_aware_options.disable: self.host.sharding_info = first_connection.features.sharding_info self._open_connections_for_all_shards(first_connection.features.shard_id) - self.tablets_routing_v1 = first_connection.features.tablets_routing_v1 log.debug("Finished initializing connection for host %s", self.host) - def _get_connection_for_routing_key(self, routing_key=None, keyspace=None, table=None): + @property + def supports_tablet_routing(self): + # True when any live connection to this host negotiated a tablet-routing + # extension (V1 or V2). Both are treated identically here: the sole use + # is deciding whether to consult tablet metadata for shard selection + # below. 
This never affects EXECUTE framing -- that is gated + # per-connection on the connection's own negotiated feature -- so a + # stale value here can at worst pick a suboptimal shard, never desync a + # frame. + # + # Derive it from the live connections instead of latching a value at + # init time: the capability can flip from False to True after the pool + # is created (e.g. once the last node finishes a rolling upgrade), and + # latching would leave the pool stuck on the stale-low value until it is + # recreated. + # + # Snapshot the values first: iterating self._connections lazily lets a + # concurrent shard (re)connection that mutates the dict raise + # "dictionary changed size during iteration". Skip closed/defunct + # connections so a dead connection cannot keep the capability latched on + # while it is being torn down. any() short-circuits, so the common + # (enabled) case is cheap. + connections = tuple(self._connections.values()) + return any(c.features.tablets_routing_v1 or c.features.tablets_routing_v2 + for c in connections + if not c.is_closed and not c.is_defunct) + + def _get_connection_for_routing_key(self, routing_key=None, keyspace=None, table=None, query=None): if self.is_shutdown: raise ConnectionException( "Pool for %s is shutdown" % (self.host,), self.host) @@ -450,15 +474,22 @@ def _get_connection_for_routing_key(self, routing_key=None, keyspace=None, table shard_id = None if not self._session.cluster.shard_aware_options.disable and self.host.sharding_info and routing_key: - t = self._session.cluster.metadata.token_map.token_class.from_key(routing_key) - - shard_id = None - if self.tablets_routing_v1 and table is not None: + token_class = self._session.cluster.metadata.token_map.token_class + # Reuse the token the statement already memoized for this request when + # available, so the routing-key hash runs once per request instead of + # again here; fall back to hashing the routing key directly otherwise. 
+ t = query.routing_token(token_class) if query is not None else None + if t is None: + t = token_class.from_key(routing_key) + if self.supports_tablet_routing and table is not None: if keyspace is None: keyspace = self._keyspace tablet = self._session.cluster.metadata._tablets.get_tablet_for_key(keyspace, table, t) + # In both V1 and V2 the request is sent to this host, so we pick + # the shard that this host owns for the tablet. Leader-aware host + # selection (V2) happens earlier, in the load balancing policy. if tablet is not None: for replica in tablet.replicas: if replica[0] == self.host.host_id: @@ -506,15 +537,15 @@ def _get_connection_for_routing_key(self, routing_key=None, keyspace=None, table return random.choice(active_connections) return random.choice(list(self._connections.values())) - def borrow_connection(self, timeout, routing_key=None, keyspace=None, table=None): - conn = self._get_connection_for_routing_key(routing_key, keyspace, table) + def borrow_connection(self, timeout, routing_key=None, keyspace=None, table=None, query=None): + conn = self._get_connection_for_routing_key(routing_key, keyspace, table, query) start = time.time() remaining = timeout last_retry = False while True: if conn.is_closed: # The connection might have been closed in the meantime - if so, try again - conn = self._get_connection_for_routing_key(routing_key, keyspace, table) + conn = self._get_connection_for_routing_key(routing_key, keyspace, table, query) with conn.lock: if (not conn.is_closed or last_retry) and conn.in_flight < conn.max_request_id: # On last retry we ignore connection status, since it is better to return closed connection than diff --git a/cassandra/protocol.py b/cassandra/protocol.py index bb2865ee53..3b19d1b410 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -632,9 +632,11 @@ class ExecuteMessage(_QueryMessage): def __init__(self, query_id, query_params, consistency_level, serial_consistency_level=None, fetch_size=None, 
paging_state=None, timestamp=None, skip_meta=False, - continuous_paging_options=None, result_metadata_id=None): + continuous_paging_options=None, result_metadata_id=None, + tablet_version_block=None): self.query_id = query_id self.result_metadata_id = result_metadata_id + self.tablet_version_block = tablet_version_block super(ExecuteMessage, self).__init__(query_params, consistency_level, serial_consistency_level, fetch_size, paging_state, timestamp, skip_meta, continuous_paging_options) @@ -646,6 +648,8 @@ def send_body(self, f, protocol_version): if ProtocolVersion.uses_prepared_metadata(protocol_version): write_string(f, self.result_metadata_id) self._write_query_params(f, protocol_version) + if self.tablet_version_block is not None: + write_byte(f, self.tablet_version_block) CUSTOM_TYPE = object() diff --git a/cassandra/protocol_features.py b/cassandra/protocol_features.py index 877998be7d..af954f05e4 100644 --- a/cassandra/protocol_features.py +++ b/cassandra/protocol_features.py @@ -10,19 +10,26 @@ LWT_OPTIMIZATION_META_BIT_MASK = "LWT_OPTIMIZATION_META_BIT_MASK" RATE_LIMIT_ERROR_EXTENSION = "SCYLLA_RATE_LIMIT_ERROR" TABLETS_ROUTING_V1 = "TABLETS_ROUTING_V1" +# The server advertises and expects this exact extension name in SUPPORTED/STARTUP +# (see scylladb transport/cql_protocol_extension.cc). While the feature is gated +# behind the server's `strongly-consistent-tables` experimental flag, the wire +# name carries the `_EXPERIMENTAL` suffix. 
+TABLETS_ROUTING_V2 = "TABLETS_ROUTING_V2_EXPERIMENTAL" class ProtocolFeatures(object): rate_limit_error = None shard_id = 0 sharding_info = None tablets_routing_v1 = False + tablets_routing_v2 = False lwt_info = None - def __init__(self, rate_limit_error=None, shard_id=0, sharding_info=None, tablets_routing_v1=False, lwt_info=None): + def __init__(self, rate_limit_error=None, shard_id=0, sharding_info=None, tablets_routing_v1=False, tablets_routing_v2=False, lwt_info=None): self.rate_limit_error = rate_limit_error self.shard_id = shard_id self.sharding_info = sharding_info self.tablets_routing_v1 = tablets_routing_v1 + self.tablets_routing_v2 = tablets_routing_v2 self.lwt_info = lwt_info @staticmethod @@ -30,8 +37,9 @@ def parse_from_supported(supported): rate_limit_error = ProtocolFeatures.maybe_parse_rate_limit_error(supported) shard_id, sharding_info = ProtocolFeatures.parse_sharding_info(supported) tablets_routing_v1 = ProtocolFeatures.parse_tablets_info(supported) + tablets_routing_v2 = ProtocolFeatures.parse_tablets_v2_info(supported) lwt_info = ProtocolFeatures.parse_lwt_info(supported) - return ProtocolFeatures(rate_limit_error, shard_id, sharding_info, tablets_routing_v1, lwt_info) + return ProtocolFeatures(rate_limit_error, shard_id, sharding_info, tablets_routing_v1, tablets_routing_v2, lwt_info) @staticmethod def maybe_parse_rate_limit_error(supported): @@ -53,7 +61,11 @@ def get_cql_extension_field(vals, key): def add_startup_options(self, options): if self.rate_limit_error is not None: options[RATE_LIMIT_ERROR_EXTENSION] = "" - if self.tablets_routing_v1: + # Only one of TABLETS_ROUTING_V{1,2} should be negotiated + # per connection. Hence the if-else branch. 
+ if self.tablets_routing_v2: + options[TABLETS_ROUTING_V2] = "" + elif self.tablets_routing_v1: options[TABLETS_ROUTING_V1] = "" if self.lwt_info is not None: options[LWT_ADD_METADATA_MARK] = str(self.lwt_info.lwt_meta_bit_mask) @@ -81,6 +93,10 @@ def parse_sharding_info(options): def parse_tablets_info(options): return TABLETS_ROUTING_V1 in options + @staticmethod + def parse_tablets_v2_info(options): + return TABLETS_ROUTING_V2 in options + @staticmethod def parse_lwt_info(options): value_list = options.get(LWT_ADD_METADATA_MARK, [None]) diff --git a/cassandra/query.py b/cassandra/query.py index 6c6878fdb4..9e1bd7f930 100644 --- a/cassandra/query.py +++ b/cassandra/query.py @@ -275,6 +275,8 @@ class Statement(object): _serial_consistency_level = None _routing_key = None + _routing_token = None + _routing_token_class = None def __init__(self, retry_policy=None, consistency_level=None, routing_key=None, serial_consistency_level=None, fetch_size=FETCH_SIZE_UNSET, keyspace=None, custom_payload=None, @@ -314,9 +316,14 @@ def _set_routing_key(self, key): self._routing_key = b"".join(self._key_parts_packed(key)) else: self._routing_key = key + # The memoized ring token is derived from the routing key; invalidate it. + self._routing_token = None + self._routing_token_class = None def _del_routing_key(self): self._routing_key = None + self._routing_token = None + self._routing_token_class = None routing_key = property( _get_routing_key, @@ -331,6 +338,30 @@ def _del_routing_key(self): components should be strings. """) + def routing_token(self, token_class): + """ + Return the ring token for this statement's :attr:`routing_key`, computed + at most once per statement. + + The token is a pure function of the routing key and the cluster's + partitioner (``token_class``), so the (Murmur3) hash is memoized. 
A + single request would otherwise recompute it three times -- in the load + balancing policy, when selecting the target shard, and when building the + tablet_version_block -- so caching keeps that hot path to one hash. The + cache is keyed on ``token_class`` so reusing a statement against a + cluster with a different partitioner recomputes the token instead of + returning a stale one. Returns ``None`` when there is no routing key. + """ + routing_key = self.routing_key + if routing_key is None: + return None + token = self._routing_token + if token is None or self._routing_token_class is not token_class: + token = token_class.from_key(routing_key) + self._routing_token = token + self._routing_token_class = token_class + return token + def _get_serial_consistency_level(self): return self._serial_consistency_level diff --git a/cassandra/tablets.py b/cassandra/tablets.py index 96e61a50c2..3739b144a0 100644 --- a/cassandra/tablets.py +++ b/cassandra/tablets.py @@ -1,5 +1,6 @@ from bisect import bisect_left from operator import attrgetter +from random import getrandbits from threading import Lock from typing import Optional from uuid import UUID @@ -9,6 +10,32 @@ _get_last_token = attrgetter("last_token") +def choose_tablet_version_block(tablet_version): + """ + Encode a tablet_version_block byte from a cached tablet_version. + Picks a block index at random across calls. + Returns an int in [0, 255]. + + The byte layout: the high nibble is the block index, the low nibble is the value + of that block. Blocks are indexed from the least significant bits to the most + significant ones, so block `idx` occupies bits [idx*4, idx*4 + 4). + """ + # Pick the block index in [0, 15]; getrandbits(4) is a fast C call with no + # application-level shared state. + idx = getrandbits(4) + # Extract the 4-bit nibble at block index `idx` (0 = least significant). 
+ shift = idx * 4 + nibble = (tablet_version >> shift) & 0xF + return (idx << 4) | nibble + + +def random_tablet_version_block(): + """ + Generate a random tablet_version_block byte for cold start. + """ + return getrandbits(8) + + class Tablet(object): """ Represents a single ScyllaDB tablet. @@ -18,15 +45,17 @@ class Tablet(object): first_token = 0 last_token = 0 replicas = None + tablet_version = None # uint64 hash; None means unknown (cold start) - def __init__(self, first_token=0, last_token=0, replicas=None): + def __init__(self, first_token=0, last_token=0, replicas=None, tablet_version=None): self.first_token = first_token self.last_token = last_token self.replicas = replicas + self.tablet_version = tablet_version def __str__(self): - return "" \ - % (self.first_token, self.last_token, self.replicas) + return "" \ + % (self.first_token, self.last_token, self.replicas, self.tablet_version) __repr__ = __str__ @staticmethod @@ -34,9 +63,9 @@ def _is_valid_tablet(replicas): return replicas is not None and len(replicas) != 0 @staticmethod - def from_row(first_token, last_token, replicas): + def from_row(first_token, last_token, replicas, tablet_version=None): if Tablet._is_valid_tablet(replicas): - tablet = Tablet(first_token, last_token, replicas) + tablet = Tablet(first_token, last_token, replicas, tablet_version) return tablet return None diff --git a/docs/scylla-specific.rst b/docs/scylla-specific.rst index 4b28781f1c..0f40630183 100644 --- a/docs/scylla-specific.rst +++ b/docs/scylla-specific.rst @@ -148,7 +148,7 @@ For more details on paging, see :ref:`query-paging`. Tablet Awareness ---------------- -**scylla-driver** is tablet-aware, which means that it is able to parse `TABLETS_ROUTING_V1` extension to ProtocolFeatures, recieve tablet information sent by Scylla in the `custom_payload` part of the `RESULT` message, and utilize it. 
+**scylla-driver** is tablet-aware, which means that it is able to parse the `TABLETS_ROUTING_V1` and `TABLETS_ROUTING_V2` extensions to ProtocolFeatures, receive tablet information sent by Scylla in the `custom_payload` part of the `RESULT` message, and utilize it. Thanks to this, queries to tablet-based tables are still shard-aware. Details on the scylla cql protocol extensions @@ -156,3 +156,58 @@ https://github.com/scylladb/scylladb/blob/master/docs/dev/protocol-extensions.md Details on the sending tablet information to the drivers https://github.com/scylladb/scylladb/blob/master/docs/dev/protocol-extensions.md#sending-tablet-info-to-the-drivers + + +Tablet version tracking and leader-aware routing +------------------------------------------------ + +When the cluster offers it, the driver negotiates ``TABLETS_ROUTING_V2`` in +preference to V1. The negotiation happens per connection, so a mixed-version +cluster during a rolling upgrade is handled transparently. V2 adds two +capabilities on top of V1, both invisible to application code. + +**Tablet version tracking.** Every tablet now carries a ``tablet_version`` that +changes whenever its replica set is reconfigured. The driver caches the version +it last saw for each tablet and, on every prepared-statement execution over a V2 +connection, appends a single ``tablet_version_block`` byte derived from it. The +server returns updated routing information in the ``custom_payload`` only when +that byte shows the driver's cached view is stale, instead of attaching it to +every response. This keeps the cached routing information fresh while avoiding +the per-response overhead that V1 incurs. + +**Leader-aware routing for strongly-consistent tables.** Tables in a +strongly-consistent keyspace -- one created with a ``consistency`` option and +backed by Raft -- have a tablet leader that coordinates operations. 
For those +tables the driver sends each request directly to the leader, saving the extra +hop the coordinator would otherwise take to forward it. Eventually-consistent +tables are unaffected and keep their usual token-aware (optionally shuffled) +replica ordering. + +Leader-aware routing is best-effort and bounded by the load-balancing policy: +the leader is only targeted directly if the wrapped policy would consider it in +the first place. For example, a ``DCAwareRoundRobinPolicy`` configured with no +remote hosts will not send cross-datacenter traffic to a leader in another +datacenter; the request goes to a local replica and the server forwards it to +the leader, exactly as it would without V2. + +No configuration is required: as with V1, a ``TokenAwarePolicy`` is all that is +needed. Whether a keyspace is strongly consistent is also surfaced on its +metadata, should you need it: + +.. code:: python + + from cassandra.cluster import Cluster + + cluster = Cluster() + session = cluster.connect() + + ks = cluster.metadata.keyspaces["my_keyspace"] + if ks.strongly_consistent: + print("requests to this keyspace's tablet tables are routed to the leader") + +.. note:: + + While strongly-consistent tables are gated behind Scylla's + ``STRONGLY_CONSISTENT_TABLES`` cluster feature, the extension is advertised + on the wire as ``TABLETS_ROUTING_V2_EXPERIMENTAL``. Connecting to a cluster + that does not offer it transparently falls back to ``TABLETS_ROUTING_V1``. diff --git a/tests/integration/standard/test_tablets_routing_v2.py b/tests/integration/standard/test_tablets_routing_v2.py new file mode 100644 index 0000000000..038246e382 --- /dev/null +++ b/tests/integration/standard/test_tablets_routing_v2.py @@ -0,0 +1,545 @@ +""" +End-to-end tests for TABLETS_ROUTING_V2 against a V2-capable Scylla build. 
+ +Unlike the unit tests in tests/unit/test_tablets.py and tests/unit/test_policies.py, +these tests cross the driver<->server boundary: they validate that the driver +negotiates the extension, parses the server's `tablets-routing-v2` payload with +the correct field layout, and that the tablet_version_block it sends actually +matches the server's encoding. + +The whole module is opt-in: the server only advertises the extension when started +with the `strongly-consistent-tables` experimental feature, and it is exchanged on +the wire under the name `TABLETS_ROUTING_V2_EXPERIMENTAL`. When run against a +server that does not advertise it (e.g. a released Scylla), every test is skipped. +""" + +from contextlib import contextmanager + +import pytest + +import cassandra.cqltypes as types +from cassandra import ConsistencyLevel +from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT +from cassandra.policies import ConstantReconnectionPolicy, RoundRobinPolicy, TokenAwarePolicy +from cassandra.protocol import ExecuteMessage, ProtocolException +from cassandra.protocol_features import ( + ProtocolFeatures, RATE_LIMIT_ERROR_EXTENSION, LWT_ADD_METADATA_MARK, + TABLETS_ROUTING_V1, TABLETS_ROUTING_V2, +) + +from tests.integration import PROTOCOL_VERSION, use_cluster + + +def setup_module(module): + try: + use_cluster('tablets_routing_v2', [3], start=True, set_keyspace=False, + configuration_options={ + # `strongly-consistent-tables` is what gates the server's + # advertisement of TABLETS_ROUTING_V2_EXPERIMENTAL. 
+ 'experimental_features': ['lwt', 'udf', 'strongly-consistent-tables'], + }) + except Exception as exc: + pytest.skip("Could not start a Scylla cluster with the " + "'strongly-consistent-tables' experimental feature: {}".format(exc), + allow_module_level=True) + + +def _startup_with_both_extensions(self, options): + """ + Drop-in replacement for ProtocolFeatures.add_startup_options that negotiates + BOTH tablets_routing_v1 and tablets_routing_v2 on the same connection. + + The real driver makes the two mutually exclusive (V2 wins). Forcing both lets + us prove the server-side precedence rules: scylla checks V2 first and only + falls back to V1 when V2 is not set. + """ + if self.rate_limit_error is not None: + options[RATE_LIMIT_ERROR_EXTENSION] = "" + if self.tablets_routing_v2: + options[TABLETS_ROUTING_V2] = "" + if self.tablets_routing_v1: + options[TABLETS_ROUTING_V1] = "" + if self.lwt_info is not None: + options[LWT_ADD_METADATA_MARK] = str(self.lwt_info.lwt_meta_bit_mask) + + +class TestTabletsRoutingV2Integration: + @classmethod + def setup_class(cls): + cls.cluster = Cluster(contact_points=["127.0.0.1", "127.0.0.2", "127.0.0.3"], + protocol_version=PROTOCOL_VERSION, + execution_profiles={ + EXEC_PROFILE_DEFAULT: ExecutionProfile( + load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy())) + }, + reconnection_policy=ConstantReconnectionPolicy(1)) + cls.session = cls.cluster.connect() + cls._create_schema(cls.session) + + @classmethod + def teardown_class(cls): + cls.cluster.shutdown() + + @classmethod + def _create_schema(cls, session): + session.execute("DROP KEYSPACE IF EXISTS test_v2") + session.execute( + """ + CREATE KEYSPACE test_v2 + WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} + AND tablets = {'initial': 8} + """) + session.execute("CREATE TABLE test_v2.t (pk int PRIMARY KEY, v int)") + prepared = session.prepare("INSERT INTO test_v2.t (pk, v) VALUES (?, ?)") + for i in range(50): + 
session.execute(prepared.bind((i, i))) + + session.execute("DROP KEYSPACE IF EXISTS test_v2_sc") + session.execute( + """ + CREATE KEYSPACE test_v2_sc + WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} + AND tablets = {'initial': 8} + AND consistency = 'global' + """) + session.execute("CREATE TABLE test_v2_sc.t (pk int PRIMARY KEY, v int)") + prepared_sc = session.prepare("INSERT INTO test_v2_sc.t (pk, v) VALUES (?, ?)") + # Writes to a strongly-consistent (Raft) table are rejected unless they + # use QUORUM/LOCAL_QUORUM. The session default is LOCAL_ONE, so request + # LOCAL_QUORUM for these inserts; it propagates to every statement bound + # from this prepared one. + prepared_sc.consistency_level = ConsistencyLevel.LOCAL_QUORUM + for i in range(50): + session.execute(prepared_sc.bind((i, i))) + + # -- helpers ---------------------------------------------------------------- + + def _v2_negotiated(self): + connection = self.session.cluster.control_connection._connection + return bool(connection and connection.features.tablets_routing_v2) + + def _skip_if_no_v2(self): + if not self._v2_negotiated(): + pytest.skip("Server did not advertise TABLETS_ROUTING_V2_EXPERIMENTAL; " + "needs a build started with the 'strongly-consistent-tables' feature") + + def _cached_tablet(self, bound): + md = self.session.cluster.metadata + token = md.token_map.token_class.from_key(bound.routing_key) + tablet = md._tablets.get_tablet_for_key(bound.keyspace, bound.table, token) + return tablet, token + + def _ensure_cached(self, bound, attempts=30): + """ + Drive requests until the V2 routing cache is populated for `bound`. + + On a cold start the driver sends a *random* tablet_version_block, which + only matches the server ~1/16 of the time; on a mismatch the server + returns routing info and the cache is filled. We retry until that happens. 
+ """ + for _ in range(attempts): + self.session.execute(bound) + tablet, _token = self._cached_tablet(bound) + if tablet is not None and tablet.tablet_version is not None: + return tablet + raise AssertionError("V2 routing cache was never populated; the server " + "never returned a 'tablets-routing-v2' payload") + + # -- tests ------------------------------------------------------------------ + + def test_v2_is_negotiated(self): + self._skip_if_no_v2() + # Every per-host pool must report tablet-routing support, and every live + # connection in it must have negotiated V2 -- V2 gates the per-connection + # EXECUTE framing. + for pool in self.session._pools.values(): + assert pool.supports_tablet_routing is True + for conn in pool._connections.values(): + assert conn.features.tablets_routing_v2 is True + + def test_v2_payload_populates_cache_with_valid_fields(self): + """Test guarding against the payload tuple being decoded out of order.""" + self._skip_if_no_v2() + + select = self.session.prepare("SELECT v FROM test_v2.t WHERE pk = ?") + bound = select.bind([2]) + + tablet = self._ensure_cached(bound) + _, token = self._cached_tablet(bound) + + # If the tuple were decoded in the wrong order, first_token/last_token + # would actually carry the version / replica list and these invariants + # would not hold. + assert tablet.tablet_version is not None + assert tablet.first_token <= tablet.last_token + # get_tablet_for_key matches first_token < token <= last_token. + assert tablet.first_token < token.value and token.value <= tablet.last_token + + # Replicas must be real hosts known to the cluster with sane shard ids. 
+ known_host_ids = {h.host_id for h in self.session.cluster.metadata.all_hosts()} + assert tablet.replicas, "tablet has no replicas" + for host_id, shard in tablet.replicas: + assert host_id in known_host_ids, \ + "replica host_id {} is not a known host (corrupt payload?)".format(host_id) + assert isinstance(shard, int) and shard >= 0 + + def test_matching_block_yields_no_payload(self): + """Test guarding against a wrong tablet_version_block bit-shift.""" + self._skip_if_no_v2() + + select = self.session.prepare("SELECT v FROM test_v2.t WHERE pk = ?") + bound = select.bind([7]) + + # Populate the cache so the driver knows the current tablet_version. + self._ensure_cached(bound) + + # The next request carries a block derived from the cached version. If the + # driver's encoding agrees with the server, the versions match and NO + # routing payload is returned. A wrong shift would mismatch and the server + # would keep returning routing info. + result = self.session.execute(bound) + assert result.one() is not None + payload = result.response_future.custom_payload + assert not (payload and 'tablets-routing-v2' in payload), ( + "Server returned routing info despite a cached, up-to-date " + "tablet_version; the driver's tablet_version_block encoding likely " + "disagrees with the server (locator::compare_tablet_version_block)") + + # -- low-level helpers ------------------------------------------------------ + + @staticmethod + def _right_block(version, idx=0): + """ + Build a tablet_version_block that the server will accept as a match for + block `idx` of `version` (high nibble = index, low nibble = that nibble + of the version). + """ + idx &= 0xF + return (idx << 4) | ((version >> (idx * 4)) & 0xF) + + def _send_raw_execute(self, conn, bound, tablet_version_block): + """ + Send an EXECUTE directly on a specific shard connection with a chosen + tablet_version_block (or None to omit the byte entirely, i.e. 
behave like + the pre-V2 protocol), and return the decoded response message. + + This bypasses ResponseFuture/load balancing so we control exactly which + node+shard the request hits and which byte is on the wire; it also avoids + polluting the driver's tablet cache. + """ + ps = bound.prepared_statement + msg = ExecuteMessage( + ps.query_id, bound.values, ConsistencyLevel.LOCAL_ONE, + serial_consistency_level=None, fetch_size=None, paging_state=None, + timestamp=None, skip_meta=False, + result_metadata_id=ps.result_metadata_id, + tablet_version_block=tablet_version_block) + return conn.wait_for_response(msg, timeout=30) + + def _decode_v2_payload(self, payload): + ctype = types.lookup_casstype( + 'TupleType(LongType, LongType, ListType(TupleType(UUIDType, Int32Type)), LongType)') + info = ctype.from_binary(payload['tablets-routing-v2'], self.cluster.protocol_version) + return {'first_token': info[0], 'last_token': info[1], + 'replicas': info[2], 'tablet_version': info[3]} + + def _any_connection(self): + for pool in self.session._pools.values(): + for conn in pool._connections.values(): + return conn + raise AssertionError("no shard connections available") + + @staticmethod + def _all_shard_connections(session): + for host, pool in session._pools.items(): + for shard, conn in pool._connections.items(): + yield host, shard, conn + + @staticmethod + def _wait_for_shard_connections(session, timeout=15): + """Wait until each pool has filled its shard-aware connections (background).""" + import time + deadline = time.time() + timeout + while time.time() < deadline: + if all( + len(pool._connections) >= (min(host.sharding_info.shards_count, 2) + if host.sharding_info else 1) + for host, pool in session._pools.items() + ): + return + time.sleep(0.05) + raise AssertionError("Shard-aware connection pools did not fill within {}s".format(timeout)) + + @contextmanager + def _cluster_with_v1_and_v2(self): + """ + Yield a (cluster, session) whose connections negotiated BOTH V1 
and V2. + Restores the original startup behavior and shuts the cluster down on exit. + """ + original = ProtocolFeatures.add_startup_options + ProtocolFeatures.add_startup_options = _startup_with_both_extensions + cluster = Cluster(contact_points=["127.0.0.1", "127.0.0.2", "127.0.0.3"], + protocol_version=PROTOCOL_VERSION, + execution_profiles={ + EXEC_PROFILE_DEFAULT: ExecutionProfile( + load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy())) + }, + reconnection_policy=ConstantReconnectionPolicy(1)) + try: + session = cluster.connect('test_v2') + self._wait_for_shard_connections(session) + yield cluster, session + finally: + cluster.shutdown() + ProtocolFeatures.add_startup_options = original + + @staticmethod + def _find_replica_wrong_shard(session, tablet): + """ + Find a connection to a host that *is* a replica of `tablet` but on a shard + that the host does NOT own for it ("right node, wrong shard"). Returns + (host, owner_shard, wrong_shard, conn) or None if no host has >=2 shards. + """ + replica_shard = {host_id: shard for host_id, shard in tablet.replicas} + for host, pool in session._pools.items(): + owner = replica_shard.get(host.host_id) + if owner is None: + continue + for shard, conn in pool._connections.items(): + if shard != owner: + return host, owner, shard, conn + return None + + # -- scenario tests --------------------------------------------------------- + + def test_index0_all_block_values_exactly_one_match(self): + """ + Scenario 1: for block index 0, exactly one of the 16 possible values + matches the server's tablet_version; every other value is reported as a + mismatch carrying that same tablet_version, whose nibble 0 equals the + value that matched. 
+ """ + self._skip_if_no_v2() + select = self.session.prepare("SELECT v FROM test_v2.t WHERE pk = ?") + bound = select.bind([11]) + tablet = self._ensure_cached(bound) + version = tablet.tablet_version + + conn = self._any_connection() + + matched_values = [] + reported_versions = [] + for value in range(16): + block = value # index 0 -> high nibble 0, low nibble = value + resp = self._send_raw_execute(conn, bound, block) + payload = resp.custom_payload or {} + if 'tablets-routing-v2' in payload: + reported_versions.append(self._decode_v2_payload(payload)['tablet_version']) + else: + matched_values.append(value) + + # Exactly one value matches: the low nibble of the version. + assert matched_values == [version & 0xF], \ + "expected exactly one matching block value, got {}".format(matched_values) + # All 15 mismatches report the same tablet_version ... + assert len(reported_versions) == 15 + assert set(reported_versions) == {version} + # ... and that version's block-0 nibble is the value that matched. + assert (version & 0xF) == matched_values[0] + + def test_right_block_to_all_nodes_and_shards_never_returns_payload(self): + """ + Scenario 2: a correct tablet_version_block matches on every node and every + shard (the server's V2 check ignores shard), so no routing payload is ever + returned. + """ + self._skip_if_no_v2() + select = self.session.prepare("SELECT v FROM test_v2.t WHERE pk = ?") + bound = select.bind([13]) + tablet = self._ensure_cached(bound) + version = tablet.tablet_version + + sent = 0 + for host, shard, conn in self._all_shard_connections(self.session): + # Vary the block index per shard to also exercise non-zero indices. 
+ block = self._right_block(version, idx=shard) + resp = self._send_raw_execute(conn, bound, block) + payload = resp.custom_payload or {} + assert 'tablets-routing-v2' not in payload, ( + "host {} shard {} returned a routing payload for a correct " + "tablet_version_block".format(host, shard)) + sent += 1 + assert sent >= 1, "no shard connections were exercised" + + def test_v2_takes_precedence_over_v1_no_v1_payload_on_wrong_shard(self): + """ + Scenario 3: with BOTH V1 and V2 negotiated, send a correct V2 block to the + wrong shard. The server checks V2 first; since the block matches there is + no payload at all -- crucially no `tablets-routing-v1`, which V1 would have + emitted for a wrong-shard request. + """ + self._skip_if_no_v2() + select = self.session.prepare("SELECT v FROM test_v2.t WHERE pk = ?") + bound = select.bind([17]) + tablet = self._ensure_cached(bound) + version = tablet.tablet_version + + with self._cluster_with_v1_and_v2() as (_cluster, session): + target = self._find_replica_wrong_shard(session, tablet) + if target is None: + pytest.skip("need a replica host with >=2 shards to target a wrong shard") + _host, _owner_shard, _wrong_shard, conn = target + assert conn.features.tablets_routing_v1 and conn.features.tablets_routing_v2, \ + "test setup failed: connection did not negotiate both V1 and V2" + + resp = self._send_raw_execute(conn, bound, self._right_block(version)) + payload = resp.custom_payload or {} + assert 'tablets-routing-v1' not in payload, ( + "server emitted V1 routing info despite V2 being negotiated; " + "V2 must take precedence (select_statement.cc)") + # The correct V2 block also means no V2 payload. + assert 'tablets-routing-v2' not in payload + + def test_v2_negotiated_missing_block_returns_protocol_error(self): + """ + Scenario 4: same setup as scenario 3 (both V1 and V2 negotiated, wrong + shard) but follow the old V1 protocol and send NO tablet_version_block. 
+ + Because V2 is negotiated the server's request parser unconditionally reads + the mandatory trailing byte. Omitting it leaves the frame one byte short, + so the server rejects the request with a protocol error (code 0x000A) + rather than silently falling back to V1. + """ + self._skip_if_no_v2() + select = self.session.prepare("SELECT v FROM test_v2.t WHERE pk = ?") + bound = select.bind([19]) + tablet = self._ensure_cached(bound) + + with self._cluster_with_v1_and_v2() as (_cluster, session): + target = self._find_replica_wrong_shard(session, tablet) + if target is None: + pytest.skip("need a replica host with >=2 shards to target a wrong shard") + _host, _owner_shard, _wrong_shard, conn = target + assert conn.features.tablets_routing_v1 and conn.features.tablets_routing_v2, \ + "test setup failed: connection did not negotiate both V1 and V2" + + with pytest.raises(ProtocolException) as exc_info: + self._send_raw_execute(conn, bound, None) + assert exc_info.value.code == 0x000A, \ + "expected a protocol error, got: {!r}".format(exc_info.value) + + # -- strongly-consistent (leader-aware) routing ----------------------------- + + def test_strongly_consistent_keyspace_metadata(self): + """ + The driver must learn from system_schema.scylla_keyspaces which keyspaces + are strongly consistent: test_v2_sc (consistency='global') is, test_v2 + (no consistency clause) is not. This flag is the precondition for + leader-aware routing in TokenAwarePolicy.make_query_plan. + """ + self._skip_if_no_v2() + self.session.cluster.refresh_schema_metadata() + keyspaces = self.session.cluster.metadata.keyspaces + assert keyspaces['test_v2_sc'].strongly_consistent is True + assert keyspaces['test_v2'].strongly_consistent is False + + def test_strongly_consistent_flag_tracks_dynamic_keyspace_changes(self): + """ + The driver reads system_schema.scylla_keyspaces on every schema refresh + and sets KeyspaceMetadata.strongly_consistent for each keyspace. 
+ + The flag is not a one-off computed at connect time -- it has to track the + live schema. Because the driver refreshes its metadata synchronously in + response to a DDL it executes (ResponseFuture handles + RESULT_KIND_SCHEMA_CHANGE by refreshing before returning), a keyspace + created or dropped *after* connecting is immediately reflected in + cluster.metadata.keyspaces, with no sleep or manual refresh required. A + keyspace created by some *other* client is picked up through the very same + refresh path, driven by the control connection's schema-change events + (subject to the schema refresh window); this test drives the changes + through the connected session so the assertions stay deterministic. + """ + self._skip_if_no_v2() + + metadata = self.session.cluster.metadata + ec_ks = "dyn_ec_ks" # eventually consistent (no consistency clause) + sc_ks = "dyn_sc_ks" # strongly consistent (Raft, consistency='global') + + # Start from a known-clean slate so the test is repeatable. + self.session.execute("DROP KEYSPACE IF EXISTS {0}".format(ec_ks)) + self.session.execute("DROP KEYSPACE IF EXISTS {0}".format(sc_ks)) + try: + assert ec_ks not in metadata.keyspaces + assert sc_ks not in metadata.keyspaces + + # Create an eventually-consistent keyspace after connecting: it shows + # up in the map with strongly_consistent == False. This exercises the + # "absent from scylla_keyspaces -> eventual" path. + self.session.execute( + "CREATE KEYSPACE {0} WITH replication = " + "{{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} " + "AND tablets = {{'initial': 1}}".format(ec_ks)) + assert ec_ks in metadata.keyspaces + assert metadata.keyspaces[ec_ks].strongly_consistent is False + + # Create a strongly-consistent keyspace: same map, flag now True. + # (RF is irrelevant here -- the flag only tracks consistency='global'.) 
+ self.session.execute( + "CREATE KEYSPACE {0} WITH replication = " + "{{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} " + "AND tablets = {{'initial': 1}} AND consistency = 'global'".format(sc_ks)) + assert sc_ks in metadata.keyspaces + assert metadata.keyspaces[sc_ks].strongly_consistent is True + # The previously-created keyspace keeps its (False) flag. + assert metadata.keyspaces[ec_ks].strongly_consistent is False + + # A full refresh (the bulk get_all_keyspaces path, as opposed to the + # single-keyspace get_keyspace path the DDL above exercised) agrees. + self.session.cluster.refresh_schema_metadata() + assert metadata.keyspaces[ec_ks].strongly_consistent is False + assert metadata.keyspaces[sc_ks].strongly_consistent is True + + # Dropping a keyspace removes it from the map and leaves the other + # keyspace's flag untouched. + self.session.execute("DROP KEYSPACE {0}".format(sc_ks)) + assert sc_ks not in metadata.keyspaces + assert metadata.keyspaces[ec_ks].strongly_consistent is False + + self.session.execute("DROP KEYSPACE {0}".format(ec_ks)) + assert ec_ks not in metadata.keyspaces + finally: + self.session.execute("DROP KEYSPACE IF EXISTS {0}".format(ec_ks)) + self.session.execute("DROP KEYSPACE IF EXISTS {0}".format(sc_ks)) + + def test_leader_aware_routing_targets_the_raft_leader(self): + """ + For a strongly-consistent table, the server orders the replica list with + the Raft leader first. Once that payload is cached, a TokenAwarePolicy + must route every request for the tablet to replicas[0] (the leader), + saving the extra coordinator->leader hop. This is the strongly-consistent + counterpart to the eventually consistent test_v2 tests above, which never + assert *which* replica is hit. 
+ """ + self._skip_if_no_v2() + select = self.session.prepare("SELECT v FROM test_v2_sc.t WHERE pk = ?") + bound = select.bind([2]) + + tablet = self._ensure_cached(bound) + assert tablet.replicas, "strongly-consistent tablet has no replicas" + leader_host_id = tablet.replicas[0][0] + + # Leader-first routing only triggers when the keyspace is known to be + # strongly consistent. + ks_meta = self.session.cluster.metadata.keyspaces['test_v2_sc'] + assert ks_meta.strongly_consistent is True + + # With an up-to-date cache the block always matches, so the server returns + # no further payload and replicas[0] stays the leader; every request must + # therefore be coordinated by that leader. + for _ in range(10): + result = self.session.execute(bound) + coordinator = result.response_future.coordinator_host + assert coordinator is not None and coordinator.host_id == leader_host_id, ( + "request coordinated by {} but the Raft leader is replicas[0]={}; " + "leader-aware routing did not target the leader".format( + getattr(coordinator, 'host_id', None), leader_host_id)) diff --git a/tests/unit/test_metadata.py b/tests/unit/test_metadata.py index 15cf283777..ad799450bf 100644 --- a/tests/unit/test_metadata.py +++ b/tests/unit/test_metadata.py @@ -32,10 +32,11 @@ _UnknownStrategy, ColumnMetadata, TableMetadata, IndexMetadata, Function, Aggregate, Metadata, TokenMap, ReplicationFactor, - SchemaParserDSE68) + SchemaParserDSE68, SchemaParserV3) from cassandra.policies import SimpleConvictionPolicy from cassandra.pool import Host from cassandra.protocol import QueryMessage +from cassandra.tablets import Tablet from tests.util import assertCountEqual import pytest @@ -552,6 +553,100 @@ def test_export_as_string_user_types(self): );""" == keyspace.export_as_string() +class KeyspaceConsistencyTabletInvalidationTest(unittest.TestCase): + """ + Metadata._update_keyspace must drop cached tablets when a keyspace's + strong-consistency mode changes, not only when its replication strategy + 
changes. A tablet cached while the keyspace was eventually consistent has no + leader ordering, so it must not survive an eventual->global flip and then be + misread as a leader hint by TokenAwarePolicy.make_query_plan. + """ + + def _ks_meta(self, strongly_consistent): + meta = KeyspaceMetadata('ks', True, 'NetworkTopologyStrategy', {'replication_factor': '1'}) + meta.strongly_consistent = strongly_consistent + return meta + + def _add_cached_tablet(self, metadata): + tablet = Tablet(first_token=-100, last_token=100, + replicas=[(uuid.uuid4(), 0)], tablet_version=1) + metadata._tablets.add_tablet('ks', 'tbl', tablet) + + def test_consistency_flip_drops_tablets(self): + metadata = Metadata() + metadata._update_keyspace(self._ks_meta(strongly_consistent=False)) + self._add_cached_tablet(metadata) + assert metadata._tablets.table_has_tablets('ks', 'tbl') + + # Same replication strategy, consistency flips False -> True: the stale + # tablet cache must be dropped. + metadata._update_keyspace(self._ks_meta(strongly_consistent=True)) + assert not metadata._tablets.table_has_tablets('ks', 'tbl') + + def test_no_consistency_change_keeps_tablets(self): + metadata = Metadata() + metadata._update_keyspace(self._ks_meta(strongly_consistent=False)) + self._add_cached_tablet(metadata) + assert metadata._tablets.table_has_tablets('ks', 'tbl') + + # No replication change and no consistency change: cache is preserved. + metadata._update_keyspace(self._ks_meta(strongly_consistent=False)) + assert metadata._tablets.table_has_tablets('ks', 'tbl') + + +class ScyllaKeyspaceConsistencyParsingTest(unittest.TestCase): + """ + SchemaParserV3._set_strong_consistency maps the server's per-keyspace + consistency option to the strongly_consistent flag. A transient failure + reading the consistency table propagates so the schema refresh aborts and + the previously known metadata is retried, rather than being reset to + eventual. 
+ """ + + def _parser_with_consistency(self, consistency_by_ks): + # Build the parser without a connection: _set_strong_consistency only + # needs the cached consistency result. + parser = SchemaParserV3.__new__(SchemaParserV3) + parser._scylla_consistency_cache = consistency_by_ks + return parser + + def _flag_for(self, parser, ks_name): + meta = KeyspaceMetadata(ks_name, True, 'NetworkTopologyStrategy', {'replication_factor': '1'}) + parser._set_strong_consistency(meta) + return meta.strongly_consistent + + def test_only_global_is_strongly_consistent(self): + # For now, strong consistency can only be used with 'global' + # consistency. To play it safe, we use the equivalence: + # strongly consistent === uses global consistency, + # in the driver. This test verifies it. + # It should be modified or removed after 'local' consistency + # is implemented. + parser = self._parser_with_consistency( + {'g': 'global', 'l': 'local', 'e': 'eventual'}) + assert self._flag_for(parser, 'g') is True + assert self._flag_for(parser, 'l') is False + assert self._flag_for(parser, 'e') is False + # A keyspace absent from the (successful) read defaults to eventual. + assert self._flag_for(parser, 'absent') is False + + def test_read_failure_propagates(self): + # A transient failure reading system_schema.scylla_keyspaces must + # propagate (not be swallowed into "eventual"), so the schema refresh + # aborts and the previously known consistency modes are retried. 
+ parser = SchemaParserV3.__new__(SchemaParserV3) + parser._scylla_consistency_cache = None + parser._is_not_scylla = lambda: False + + def _raise_timeout(*args, **kwargs): + raise cassandra.OperationTimedOut("scylla_keyspaces read timed out") + parser._query_build_rows = _raise_timeout + + meta = KeyspaceMetadata('g', True, 'NetworkTopologyStrategy', {'replication_factor': '1'}) + with pytest.raises(cassandra.OperationTimedOut): + parser._set_strong_consistency(meta) + + class UserTypesTest(unittest.TestCase): def test_as_cql_query(self): diff --git a/tests/unit/test_policies.py b/tests/unit/test_policies.py index 63a3c3d12d..0949b4359f 100644 --- a/tests/unit/test_policies.py +++ b/tests/unit/test_policies.py @@ -943,6 +943,291 @@ def _assert_shuffle(self, patched_shuffle, cluster, keyspace, routing_key): child_policy.make_query_plan.assert_called_once_with(keyspace, query) assert patched_shuffle.call_count == 1 + def test_leader_aware_routing_with_tablet_version(self): + """ + For a strongly-consistent keyspace, the leader (first replica in the + tablet's replica list) must be yielded first in the query plan, even + when it is not the closest replica. + """ + hosts = [Host(DefaultEndPoint(str(i)), SimpleConvictionPolicy, host_id=uuid.uuid4()) for i in range(4)] + for i, host in enumerate(hosts): + host.set_up() + host.set_location_info("dc1", f"rack{i + 1}") + + # The leader is hosts[2] (first in tablet.replicas). 
+ leader = hosts[2] + other_replica = hosts[3] + tablet = Tablet( + first_token=-100, last_token=100, + replicas=[(leader.host_id, 0), (other_replica.host_id, 1)], + tablet_version=0xDEADBEEF12345678 + ) + + cluster = Mock(spec=Cluster) + cluster.metadata = Mock(spec=Metadata) + cluster.metadata._tablets = Mock(spec=Tablets) + cluster.metadata._tablets.get_tablet_for_key.return_value = tablet + cluster.metadata.get_replicas.return_value = [leader, other_replica] + cluster.metadata.keyspaces = {'ks': Mock(strongly_consistent=True)} + + child_policy = Mock() + # Put the leader last in the child plan and make it the farther replica + # by distance (LOCAL vs LOCAL_RACK). Without leader-first routing, + # other_replica would be yielded before the leader. + child_policy.make_query_plan.return_value = [hosts[0], hosts[1], other_replica, leader] + distances = { + leader: HostDistance.LOCAL, + other_replica: HostDistance.LOCAL_RACK, + hosts[0]: HostDistance.LOCAL, + hosts[1]: HostDistance.LOCAL, + } + child_policy.distance.side_effect = lambda host: distances.get(host, HostDistance.LOCAL) + + # shuffle_replicas=False keeps replica ordering deterministic so the only + # thing that can pull the leader to the front is the leader-first logic. + policy = TokenAwarePolicy(child_policy, shuffle_replicas=False) + policy.populate(cluster, hosts) + + query = Statement(routing_key=b'\x00\x00\x00\x01', keyspace='ks', table='tbl') + qplan = list(policy.make_query_plan(None, query)) + + # Leader must be first, even though other_replica is closer (LOCAL_RACK + # vs LOCAL) and the leader is last in the child plan. + self.assertEqual(qplan[0], leader) + # The closer replica follows, and the leader appears exactly once. + self.assertEqual(qplan[1], other_replica) + self.assertEqual(qplan.count(leader), 1) + + def test_leader_fallback_when_leader_is_down(self): + """ + When the leader host is down, the driver should fall back to other + replicas without crashing. 
The leader should NOT appear in the plan. + """ + hosts = [Host(DefaultEndPoint(str(i)), SimpleConvictionPolicy, host_id=uuid.uuid4()) for i in range(4)] + for i, host in enumerate(hosts): + host.set_up() + host.set_location_info("dc1", f"rack{i + 1}") + + leader = hosts[2] + other_replica = hosts[3] + leader.set_down() # Simulate leader being unreachable. + + tablet = Tablet( + first_token=-100, last_token=100, + replicas=[(leader.host_id, 0), (other_replica.host_id, 1)], + tablet_version=0xCAFEBABE00000001 + ) + + cluster = Mock(spec=Cluster) + cluster.metadata = Mock(spec=Metadata) + cluster.metadata._tablets = Mock(spec=Tablets) + cluster.metadata._tablets.get_tablet_for_key.return_value = tablet + cluster.metadata.get_replicas.return_value = [leader, other_replica] + cluster.metadata.keyspaces = {'ks': Mock(strongly_consistent=True)} + + child_policy = Mock() + child_policy.make_query_plan.return_value = hosts + child_policy.distance.return_value = HostDistance.LOCAL + + policy = TokenAwarePolicy(child_policy) + policy.populate(cluster, hosts) + + query = Statement(routing_key=b'\x00\x00\x00\x01', keyspace='ks', table='tbl') + qplan = list(policy.make_query_plan(None, query)) + + # Leader is down, should not appear in the plan. + self.assertNotIn(leader, qplan) + # Other replica should be first. + self.assertEqual(qplan[0], other_replica) + + def test_no_leader_routing_without_tablet_version(self): + """ + replicas[0] is only leader-ordered for a tablet that came from a + TABLETS_ROUTING_V2 payload. A versionless tablet (tablet_version=None, + e.g. cached from V1 or stale across a consistency flip) has arbitrary + replica order, so leader-first routing must NOT fire even for a + strongly-consistent keyspace. The version gate is the only thing that + should suppress it here. 
+ """ + hosts = [Host(DefaultEndPoint(str(i)), SimpleConvictionPolicy, host_id=uuid.uuid4()) for i in range(4)] + for i, host in enumerate(hosts): + host.set_up() + host.set_location_info("dc1", f"rack{i + 1}") + + first_replica = hosts[2] + second_replica = hosts[3] + # Strongly-consistent keyspace but a versionless (V1-style) tablet. + tablet = Tablet( + first_token=-100, last_token=100, + replicas=[(first_replica.host_id, 0), (second_replica.host_id, 1)], + tablet_version=None + ) + + cluster = Mock(spec=Cluster) + cluster.metadata = Mock(spec=Metadata) + cluster.metadata._tablets = Mock(spec=Tablets) + cluster.metadata._tablets.get_tablet_for_key.return_value = tablet + cluster.metadata.get_replicas.return_value = [first_replica, second_replica] + cluster.metadata.keyspaces = {'ks': Mock(strongly_consistent=True)} + + child_policy = Mock() + # Order the child plan so the second replica comes before replicas[0]; if + # leader-first wrongly triggered, first_replica would be forced to front. + child_policy.make_query_plan.return_value = [second_replica, first_replica, hosts[0], hosts[1]] + child_policy.distance.return_value = HostDistance.LOCAL + + # shuffle_replicas=False keeps replica ordering deterministic so we can + # assert that no leader is forced to the front. + policy = TokenAwarePolicy(child_policy, shuffle_replicas=False) + policy.populate(cluster, hosts) + + query = Statement(routing_key=b'\x00\x00\x00\x01', keyspace='ks', table='tbl') + qplan = list(policy.make_query_plan(None, query)) + + # Leader-first must NOT apply without a tablet_version: ordering follows + # the child plan, so the second replica (not replicas[0]) stays first. 
+ self.assertEqual(qplan[0], second_replica) + self.assertEqual(len(qplan), 4) + + def test_no_leader_routing_for_eventually_consistent_keyspace(self): + """ + A tablet_version is assigned to eventually-consistent tablet tables too + (TABLETS_ROUTING_V2), but the leader concept only exists for + strongly-consistent keyspaces. For an eventually-consistent keyspace the + leader-first optimization must NOT apply even when a tablet_version is + present. + """ + hosts = [Host(DefaultEndPoint(str(i)), SimpleConvictionPolicy, host_id=uuid.uuid4()) for i in range(4)] + for i, host in enumerate(hosts): + host.set_up() + host.set_location_info("dc1", f"rack{i + 1}") + + first_replica = hosts[2] + second_replica = hosts[3] + tablet = Tablet( + first_token=-100, last_token=100, + replicas=[(first_replica.host_id, 0), (second_replica.host_id, 1)], + tablet_version=0xDEADBEEF12345678 + ) + + cluster = Mock(spec=Cluster) + cluster.metadata = Mock(spec=Metadata) + cluster.metadata._tablets = Mock(spec=Tablets) + cluster.metadata._tablets.get_tablet_for_key.return_value = tablet + cluster.metadata.get_replicas.return_value = [first_replica, second_replica] + cluster.metadata.keyspaces = {'ks': Mock(strongly_consistent=False)} + + child_policy = Mock() + # Order the child plan so the second replica comes before the first; if + # leader-first logic wrongly triggered, first_replica would be forced to + # the front instead. + child_policy.make_query_plan.return_value = [second_replica, first_replica, hosts[0], hosts[1]] + child_policy.distance.return_value = HostDistance.LOCAL + + # shuffle_replicas=False keeps replica ordering deterministic so we can + # assert that no leader is forced to the front. 
+ policy = TokenAwarePolicy(child_policy, shuffle_replicas=False) + policy.populate(cluster, hosts) + + query = Statement(routing_key=b'\x00\x00\x00\x01', keyspace='ks', table='tbl') + qplan = list(policy.make_query_plan(None, query)) + + # Leader-first must NOT apply: ordering follows the child plan, so the + # second replica (not replicas[0]) stays first. + self.assertEqual(qplan[0], second_replica) + self.assertEqual(len(qplan), 4) + + def test_no_leader_routing_when_keyspace_metadata_missing(self): + """ + If keyspace metadata is unavailable (e.g. schema refresh disabled), the + policy must safely fall back to no leader-first routing rather than + crashing or guessing. + """ + hosts = [Host(DefaultEndPoint(str(i)), SimpleConvictionPolicy, host_id=uuid.uuid4()) for i in range(4)] + for i, host in enumerate(hosts): + host.set_up() + host.set_location_info("dc1", f"rack{i + 1}") + + first_replica = hosts[2] + second_replica = hosts[3] + tablet = Tablet( + first_token=-100, last_token=100, + replicas=[(first_replica.host_id, 0), (second_replica.host_id, 1)], + tablet_version=0xDEADBEEF12345678 + ) + + cluster = Mock(spec=Cluster) + cluster.metadata = Mock(spec=Metadata) + cluster.metadata._tablets = Mock(spec=Tablets) + cluster.metadata._tablets.get_tablet_for_key.return_value = tablet + cluster.metadata.get_replicas.return_value = [first_replica, second_replica] + cluster.metadata.keyspaces = {} # no metadata for 'ks' + + child_policy = Mock() + child_policy.make_query_plan.return_value = [second_replica, first_replica, hosts[0], hosts[1]] + child_policy.distance.return_value = HostDistance.LOCAL + + # shuffle_replicas=False keeps replica ordering deterministic so we can + # assert that no leader is forced to the front. 
+ policy = TokenAwarePolicy(child_policy, shuffle_replicas=False) + policy.populate(cluster, hosts) + + query = Statement(routing_key=b'\x00\x00\x00\x01', keyspace='ks', table='tbl') + qplan = list(policy.make_query_plan(None, query)) + + self.assertEqual(qplan[0], second_replica) + self.assertEqual(len(qplan), 4) + + def test_leader_skipped_when_child_policy_ignores_it(self): + """ + The leader is yielded first only if the child policy would actually use + it. If a (custom) child policy reports the leader as IGNORED, leader-first + routing must respect that and not front-run an excluded host. The leader + should not appear in the plan at all. + """ + hosts = [Host(DefaultEndPoint(str(i)), SimpleConvictionPolicy, host_id=uuid.uuid4()) for i in range(4)] + for i, host in enumerate(hosts): + host.set_up() + host.set_location_info("dc1", f"rack{i + 1}") + + leader = hosts[2] + other_replica = hosts[3] + tablet = Tablet( + first_token=-100, last_token=100, + replicas=[(leader.host_id, 0), (other_replica.host_id, 1)], + tablet_version=0xDEADBEEF12345678 + ) + + cluster = Mock(spec=Cluster) + cluster.metadata = Mock(spec=Metadata) + cluster.metadata._tablets = Mock(spec=Tablets) + cluster.metadata._tablets.get_tablet_for_key.return_value = tablet + cluster.metadata.get_replicas.return_value = [leader, other_replica] + cluster.metadata.keyspaces = {'ks': Mock(strongly_consistent=True)} + + child_policy = Mock() + # The child policy yields the leader but reports it as IGNORED, i.e. it + # would never actually route to it. + child_policy.make_query_plan.return_value = [leader, other_replica, hosts[0], hosts[1]] + distances = { + leader: HostDistance.IGNORED, + other_replica: HostDistance.LOCAL, + hosts[0]: HostDistance.LOCAL, + hosts[1]: HostDistance.LOCAL, + } + child_policy.distance.side_effect = lambda host: distances.get(host, HostDistance.LOCAL) + + # shuffle_replicas=False keeps replica ordering deterministic. 
+ policy = TokenAwarePolicy(child_policy, shuffle_replicas=False) + policy.populate(cluster, hosts) + + query = Statement(routing_key=b'\x00\x00\x00\x01', keyspace='ks', table='tbl') + qplan = list(policy.make_query_plan(None, query)) + + # The IGNORED leader must not be front-run, nor appear at all. + self.assertNotIn(leader, qplan) + self.assertEqual(qplan[0], other_replica) + @patch('cassandra.policies.shuffle') def test_no_shuffle_for_serial_consistency(self, patched_shuffle): diff --git a/tests/unit/test_protocol_features.py b/tests/unit/test_protocol_features.py index 895c384f7e..f754a677fe 100644 --- a/tests/unit/test_protocol_features.py +++ b/tests/unit/test_protocol_features.py @@ -2,7 +2,7 @@ import logging -from cassandra.protocol_features import ProtocolFeatures +from cassandra.protocol_features import ProtocolFeatures, TABLETS_ROUTING_V1, TABLETS_ROUTING_V2 LOGGER = logging.getLogger(__name__) @@ -22,3 +22,45 @@ class OptionsHolder(object): assert protocol_features.rate_limit_error == 123 assert protocol_features.shard_id == 0 assert protocol_features.sharding_info is None + + def test_tablets_routing_v2_negotiation(self): + """V2 is detected from SUPPORTED and subsumes V1 in STARTUP options.""" + options = { + TABLETS_ROUTING_V1: [''], + TABLETS_ROUTING_V2: [''], + } + features = ProtocolFeatures.parse_from_supported(options) + assert features.tablets_routing_v1 is True + assert features.tablets_routing_v2 is True + + # V2 subsumes V1: only TABLETS_ROUTING_V2 should appear in startup. 
+ startup = {} + features.add_startup_options(startup) + assert TABLETS_ROUTING_V2 in startup + assert TABLETS_ROUTING_V1 not in startup + + def test_tablets_routing_v1_only(self): + """When server only advertises V1, only V1 is negotiated.""" + options = { + TABLETS_ROUTING_V1: [''], + } + features = ProtocolFeatures.parse_from_supported(options) + assert features.tablets_routing_v1 is True + assert features.tablets_routing_v2 is False + + startup = {} + features.add_startup_options(startup) + assert TABLETS_ROUTING_V1 in startup + assert TABLETS_ROUTING_V2 not in startup + + def test_no_tablets_routing(self): + """When server advertises neither V1 nor V2.""" + options = {} + features = ProtocolFeatures.parse_from_supported(options) + assert features.tablets_routing_v1 is False + assert features.tablets_routing_v2 is False + + startup = {} + features.add_startup_options(startup) + assert TABLETS_ROUTING_V1 not in startup + assert TABLETS_ROUTING_V2 not in startup diff --git a/tests/unit/test_response_future.py b/tests/unit/test_response_future.py index 9673b0d634..306f3bb2c4 100644 --- a/tests/unit/test_response_future.py +++ b/tests/unit/test_response_future.py @@ -40,7 +40,6 @@ class ResponseFutureTests(unittest.TestCase): def make_basic_session(self): s = Mock(spec=Session) s.row_factory = lambda col_names, rows: [(col_names, rows)] - s.cluster.control_connection._tablets_routing_v1 = False s.cluster.allow_control_connection_query_fallback = ControlConnectionQueryFallback.Disabled return s @@ -64,6 +63,11 @@ def make_control_connection(self): connection.is_control_connection = True connection.get_request_id.return_value = 7 connection.send_msg.return_value = 128 + # These tests exercise control-connection query fallback, not tablet + # routing; default the tablet features off so _set_result skips + # tablet-payload parsing for the mocked responses. 
+ connection.features.tablets_routing_v2 = False + connection.features.tablets_routing_v1 = False return connection def make_session(self): @@ -93,7 +97,7 @@ def test_result_message(self): rf.send_request() rf.session._pools.get.assert_called_once_with('ip1') - pool.borrow_connection.assert_called_once_with(timeout=ANY, routing_key=ANY, keyspace=ANY, table=ANY) + pool.borrow_connection.assert_called_once_with(timeout=ANY, routing_key=ANY, keyspace=ANY, table=ANY, query=ANY) connection.send_msg.assert_called_once_with(rf.message, 1, cb=ANY, encoder=ProtocolHandler.encode_message, decoder=ProtocolHandler.decode_message, result_metadata=[]) @@ -137,6 +141,9 @@ def test_schema_change_result(self): kind=RESULT_KIND_SCHEMA_CHANGE, schema_change_event=event_results) connection = Mock() + # Skip tablet-payload parsing for this mocked response/connection pair. + connection.features.tablets_routing_v2 = False + connection.features.tablets_routing_v1 = False rf._set_result(None, connection, None, result) session.submit.assert_called_once_with(ANY, ANY, rf, connection, **event_results) @@ -284,7 +291,7 @@ def test_retry_policy_says_retry(self): rf.send_request() rf.session._pools.get.assert_called_once_with('ip1') - pool.borrow_connection.assert_called_once_with(timeout=ANY, routing_key=ANY, keyspace=ANY, table=ANY) + pool.borrow_connection.assert_called_once_with(timeout=ANY, routing_key=ANY, keyspace=ANY, table=ANY, query=ANY) connection.send_msg.assert_called_once_with(rf.message, 1, cb=ANY, encoder=ProtocolHandler.encode_message, decoder=ProtocolHandler.decode_message, result_metadata=[]) result = Mock(spec=UnavailableErrorMessage, info={}) @@ -303,7 +310,7 @@ def test_retry_policy_says_retry(self): # it should try again with the same host since this was # an UnavailableException rf.session._pools.get.assert_called_with(host) - pool.borrow_connection.assert_called_with(timeout=ANY, routing_key=ANY, keyspace=ANY, table=ANY) + 
pool.borrow_connection.assert_called_with(timeout=ANY, routing_key=ANY, keyspace=ANY, table=ANY, query=ANY) connection.send_msg.assert_called_with(rf.message, 2, cb=ANY, encoder=ProtocolHandler.encode_message, decoder=ProtocolHandler.decode_message, result_metadata=[]) def test_retry_with_different_host(self): @@ -318,7 +325,7 @@ def test_retry_with_different_host(self): rf.send_request() rf.session._pools.get.assert_called_once_with('ip1') - pool.borrow_connection.assert_called_once_with(timeout=ANY, routing_key=ANY, keyspace=ANY, table=ANY) + pool.borrow_connection.assert_called_once_with(timeout=ANY, routing_key=ANY, keyspace=ANY, table=ANY, query=ANY) connection.send_msg.assert_called_once_with(rf.message, 1, cb=ANY, encoder=ProtocolHandler.encode_message, decoder=ProtocolHandler.decode_message, result_metadata=[]) assert ConsistencyLevel.QUORUM == rf.message.consistency_level @@ -337,7 +344,7 @@ def test_retry_with_different_host(self): # it should try with a different host rf.session._pools.get.assert_called_with('ip2') - pool.borrow_connection.assert_called_with(timeout=ANY, routing_key=ANY, keyspace=ANY, table=ANY) + pool.borrow_connection.assert_called_with(timeout=ANY, routing_key=ANY, keyspace=ANY, table=ANY, query=ANY) connection.send_msg.assert_called_with(rf.message, 2, cb=ANY, encoder=ProtocolHandler.encode_message, decoder=ProtocolHandler.decode_message, result_metadata=[]) # the consistency level should be the same @@ -982,7 +989,7 @@ def test_single_host_query_plan_exhausted_after_one_retry(self): # Verify initial request was sent rf.session._pools.get.assert_called_once_with(specific_host) - pool.borrow_connection.assert_called_once_with(timeout=ANY, routing_key=ANY, keyspace=ANY, table=ANY) + pool.borrow_connection.assert_called_once_with(timeout=ANY, routing_key=ANY, keyspace=ANY, table=ANY, query=ANY) connection.send_msg.assert_called_once_with(rf.message, 1, cb=ANY, encoder=ProtocolHandler.encode_message, 
decoder=ProtocolHandler.decode_message, result_metadata=[]) # Simulate a ServerError response (which triggers RETRY_NEXT_HOST by default) diff --git a/tests/unit/test_tablets.py b/tests/unit/test_tablets.py index 7a40e7de4d..87478af46e 100644 --- a/tests/unit/test_tablets.py +++ b/tests/unit/test_tablets.py @@ -1,6 +1,6 @@ import unittest -from cassandra.tablets import Tablets, Tablet +from cassandra.tablets import Tablets, Tablet, choose_tablet_version_block, random_tablet_version_block class TabletsTest(unittest.TestCase): def compare_ranges(self, tablets, ranges): @@ -124,3 +124,68 @@ def __init__(self, v): # Token value 50 is not > first_token (100) of the tablet whose # last_token (200) is >= 50, so no match. self.assertIsNone(tablets.get_tablet_for_key("ks", "tb", Token(50))) + + +class TabletVersionBlockTest(unittest.TestCase): + """Tests for tablet_version_block encoding used by TABLETS_ROUTING_V2.""" + + def _server_block_matches(self, version, block): + """Reimplements the server's locator::compare_tablet_version_block.""" + block_value = block & 0x0F + block_index = (block & 0xF0) >> 4 + hash_block = (version >> (block_index * 4)) & 0x0F + return hash_block == block_value + + def test_choose_tablet_version_block_matches_server(self): + """Every block produced by the driver must match the server's check.""" + version = 0x0123456789ABCDEF + # The index is chosen randomly; sample enough times to exercise many indices. 
+ for _ in range(256): + block = choose_tablet_version_block(version) + self.assertTrue(self._server_block_matches(version, block), + f"Block 0x{block:02X} did not match server check for version 0x{version:016X}") + + def test_choose_tablet_version_block_covers_all_indices(self): + """Over many calls the random index selection should probe every block + index, so that any server-side version change is eventually detected.""" + version = 0xFFFFFFFFFFFFFFFF # All nibbles are 0xF + seen_indices = set() + # 16 indices; 1000 draws makes a missing index astronomically unlikely. + for _ in range(1000): + block = choose_tablet_version_block(version) + seen_indices.add((block >> 4) & 0xF) + self.assertEqual(seen_indices, set(range(16))) + + def test_choose_tablet_version_block_matches_server_for_signed_version(self): + """tablet_version is decoded as a *signed* 64-bit int (LongType), so a + version with the high bit set is stored negative in the driver while the + server treats it as unsigned. The block the driver emits must still match + the server's check computed on the unsigned value (sign-boundary guard).""" + for unsigned in (0x8000000000000000, 0xDEADBEEFCAFEBABE, 0xFFFFFFFFFFFFFFFF): + signed = unsigned - (1 << 64) # how LongType stores a high-bit value + self.assertLess(signed, 0) + # Sample enough times to exercise every one of the 16 block indices. 
+ for _ in range(256): + block = choose_tablet_version_block(signed) + self.assertTrue( + self._server_block_matches(unsigned, block), + f"signed version {signed} (unsigned 0x{unsigned:016X}) produced " + f"block 0x{block:02X} that failed the server check") + + def test_random_tablet_version_block_returns_byte(self): + """Verify random_tablet_version_block returns a value in [0, 255].""" + for _ in range(100): + block = random_tablet_version_block() + self.assertIsInstance(block, int) + self.assertGreaterEqual(block, 0) + self.assertLessEqual(block, 255) + + def test_from_row_stores_tablet_version(self): + """Tablet.from_row stores the tablet_version it is given (the V2 payload field).""" + version = 0xDEADBEEFCAFEBABE + tablet = Tablet.from_row(-100, 100, [("host1", 0), ("host2", 1)], tablet_version=version) + self.assertIsNotNone(tablet) + self.assertEqual(tablet.tablet_version, version) + self.assertEqual(tablet.first_token, -100) + self.assertEqual(tablet.last_token, 100) + self.assertEqual(len(tablet.replicas), 2)