Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 120 additions & 21 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
named_tuple_factory, dict_factory, tuple_factory, FETCH_SIZE_UNSET,
HostTargetingStatement)
from cassandra.marshal import int64_pack
from cassandra.tablets import Tablet
from cassandra.tablets import Tablet, Tablets, choose_tablet_version_block, random_tablet_version_block
from cassandra.timestamps import MonotonicTimestampGenerator
from cassandra.util import _resolve_contact_points_to_string_map, Version, maybe_add_timeout_to_query

Expand Down Expand Up @@ -3058,6 +3058,8 @@ def _create_response_future(self, query, parameters, trace, custom_payload,
continuous_paging_options, statement_keyspace)
elif isinstance(query, BoundStatement):
prepared_statement = query.prepared_statement
# The tablet_version_block is filled in per-target-host at send time
# (see ResponseFuture._query), because V2 is negotiated per connection.
message = ExecuteMessage(
prepared_statement.query_id, query.values, cl,
serial_cl, fetch_size, paging_state, timestamp,
Expand Down Expand Up @@ -3092,6 +3094,42 @@ def _create_response_future(self, query, parameters, trace, custom_payload,
load_balancer=load_balancing_policy, start_time=start_time, speculative_execution_plan=spec_exec_plan,
continuous_paging_state=None, host=host)

def _compute_tablet_version_block(self, query):
    """
    Pick the tablet_version_block value to send with a BoundStatement.

    When a cached tablet with a known version owns the statement's routing
    token, the result is ``choose_tablet_version_block(tablet_version)``.
    In every other case the result is ``random_tablet_version_block()``:

    * the statement has no routing key;
    * neither the statement nor the session supplies a keyspace, or the
      statement has no table;
    * no tablets are cached for the table (vnode tables, or tablet tables
      on a cold cache);
    * the cluster metadata has no token map;
    * no cached tablet covers the token, or the cached tablet carries no
      version.

    Callers invoke this only for connections that negotiated
    TABLETS_ROUTING_V2.
    """
    if query.routing_key is None:
        return random_tablet_version_block()

    ks_name = query.keyspace or self.keyspace
    table_name = query.table
    if not (ks_name and table_name):
        return random_tablet_version_block()

    metadata = self.cluster.metadata
    tablets = metadata._tablets

    # Cheap membership test first: it avoids hashing the routing key and
    # searching the tablet list when nothing is cached for this table.
    if not tablets.table_has_tablets(ks_name, table_name):
        return random_tablet_version_block()

    token_map = metadata.token_map
    if token_map is None:
        return random_tablet_version_block()

    token = query.routing_token(token_map.token_class)
    cached_tablet = tablets.get_tablet_for_key(ks_name, table_name, token)
    version = None if cached_tablet is None else cached_tablet.tablet_version
    if version is None:
        return random_tablet_version_block()

    return choose_tablet_version_block(version)

def get_execution_profile(self, name):
"""
Returns the execution profile associated with the provided ``name``.
Expand Down Expand Up @@ -3768,7 +3806,6 @@ class PeersQueryType(object):
_schema_meta_page_size = 1000

_uses_peers_v2 = True
_tablets_routing_v1 = False

# for testing purposes
_time = time
Expand Down Expand Up @@ -3902,8 +3939,6 @@ def _try_connect(self, endpoint):
self._metadata_request_timeout = None if connection.features.sharding_info is None or not self._cluster.metadata_request_timeout \
else datetime.timedelta(seconds=self._cluster.metadata_request_timeout)

self._tablets_routing_v1 = connection.features.tablets_routing_v1

# use weak references in both directions
# _clear_watcher will be called when this ControlConnection is about to be finalized
# _watch_callback will get the actual callback from the Connection and relay it to
Expand Down Expand Up @@ -4717,6 +4752,7 @@ class ResponseFuture(object):
_host = None
_control_connection_query_attempted = False
_TABLET_ROUTING_CTYPE = None
_TABLET_ROUTING_V2_CTYPE = None

_warned_timeout = False

Expand Down Expand Up @@ -4934,6 +4970,36 @@ def _handle_control_connection_response(self, connection, cb, response):
connection.in_flight -= 1
cb(response)

def _prepare_message_for_connection(self, message, connection):
    """
    Build the message that should actually be written to ``connection``.

    Anything that is not an ExecuteMessage is returned untouched. An
    ExecuteMessage is shallow-copied and the copy's ``tablet_version_block``
    is set from the capability negotiated by *this* connection:

    * TABLETS_ROUTING_V2 negotiated: the block computed by the session for
      ``self.query``;
    * otherwise: ``None``, so no trailing byte is attached.

    The decision is made per connection rather than per pool because
    connections of one pool may have negotiated different extensions (for
    example during a rolling upgrade), and the server reads the trailing
    byte only on V2 connections; an unread byte would desync the frame.

    The copy exists because ``self.message`` is shared by retries and
    speculative executions that may encode it concurrently on other
    threads; mutating the shared instance would race with those sends.
    """
    if isinstance(message, ExecuteMessage):
        per_send = copy(message)
        per_send.tablet_version_block = (
            self.session._compute_tablet_version_block(self.query)
            if connection.features.tablets_routing_v2
            else None
        )
        return per_send
    return message

def _query_control_connection(self, message=None, cb=None, connection=None, host=None):
self._control_connection_query_attempted = True

Expand Down Expand Up @@ -4961,6 +5027,9 @@ def _query_control_connection(self, message=None, cb=None, connection=None, host
cb = partial(self._set_result, host, connection, None)
cb = partial(self._handle_control_connection_response, connection, cb)

# The control connection may also be a V2 connection, so it needs the
# trailing tablet_version_block byte just like a pooled send.
message = self._prepare_message_for_connection(message, connection)
log.debug("No usable node pools; falling back to control connection for host %s", host)
self.request_encoded_size = connection.send_msg(message, request_id, cb=cb,
encoder=self._protocol_handler.encode_message,
Expand Down Expand Up @@ -5006,7 +5075,12 @@ def _query(self, host, message=None, cb=None):
try:
# TODO get connectTimeout from cluster settings
if self.query:
connection, request_id = pool.borrow_connection(timeout=2.0, routing_key=self.query.routing_key, keyspace=self.query.keyspace, table=self.query.table)
# Pass the statement so the pool can reuse the ring token it
# memoized for this request instead of re-hashing the routing key.
connection, request_id = pool.borrow_connection(
timeout=2.0, routing_key=self.query.routing_key,
keyspace=self.query.keyspace, table=self.query.table,
query=self.query)
else:
connection, request_id = pool.borrow_connection(timeout=2.0)
self._connection = connection
Expand All @@ -5015,6 +5089,8 @@ def _query(self, host, message=None, cb=None):
if cb is None:
cb = partial(self._set_result, host, connection, pool)

message = self._prepare_message_for_connection(message, connection)

self.request_encoded_size = connection.send_msg(message, request_id, cb=cb,
encoder=self._protocol_handler.encode_message,
decoder=self._protocol_handler.decode_message,
Expand Down Expand Up @@ -5117,6 +5193,27 @@ def _reprepare(self, prepare_message, host, connection, pool):
# try to submit the original prepared statement on some other host
self.send_request()

def _cache_tablet_from_payload(self, payload_key, ctype):
    """
    Decode one tablets-routing ``custom_payload`` entry and cache its Tablet.

    ``payload_key`` selects the entry in ``self._custom_payload`` and
    ``ctype`` is the tuple type used to decode it. The decoded tuple is
    passed positionally to ``Tablet.from_row``; the V1 and V2 layouts differ
    only by a trailing ``tablet_version`` field, which ``from_row`` accepts
    as an optional last argument, so both go through the same call.

    The tablet is stored under the effective keyspace -- the statement's
    keyspace, else the session's -- which is the same key
    ``_compute_tablet_version_block`` uses for its lookup. Nothing is
    cached when the tablet, the keyspace or the table is missing.
    """
    raw_info = self._custom_payload.get(payload_key)
    protocol_version = self.session.cluster.protocol_version
    decoded = ctype.from_binary(raw_info, protocol_version)
    tablet = Tablet.from_row(*decoded)

    ks_name = self.query.keyspace or self.session.keyspace
    table_name = self.query.table
    if not (tablet and ks_name and table_name):
        return

    self.session.cluster.metadata._tablets.add_tablet(ks_name, table_name, tablet)

def _set_result(self, host, connection, pool, response):
try:
self.coordinator_host = host
Expand All @@ -5132,21 +5229,23 @@ def _set_result(self, host, connection, pool, response):
self._warnings = getattr(response, 'warnings', None)
self._custom_payload = getattr(response, 'custom_payload', None)

if self._custom_payload and self.session.cluster.control_connection._tablets_routing_v1 and 'tablets-routing-v1' in self._custom_payload:
protocol = self.session.cluster.protocol_version
info = self._custom_payload.get('tablets-routing-v1')
ctype = ResponseFuture._TABLET_ROUTING_CTYPE
if ctype is None:
ctype = types.lookup_casstype('TupleType(LongType, LongType, ListType(TupleType(UUIDType, Int32Type)))')
ResponseFuture._TABLET_ROUTING_CTYPE = ctype
tablet_routing_info = ctype.from_binary(info, protocol)
first_token = tablet_routing_info[0]
last_token = tablet_routing_info[1]
tablet_replicas = tablet_routing_info[2]
tablet = Tablet.from_row(first_token, last_token, tablet_replicas)
keyspace = self.query.keyspace
table = self.query.table
self.session.cluster.metadata._tablets.add_tablet(keyspace, table, tablet)
if self._custom_payload and connection is not None:
# Parse the routing payload according to what the connection that
# *served this request* negotiated, not the control connection:
# during a rolling upgrade connections may differ, and each
# payload key matches the extension its own connection negotiated.
if connection.features.tablets_routing_v2 and 'tablets-routing-v2' in self._custom_payload:
ctype = ResponseFuture._TABLET_ROUTING_V2_CTYPE
if ctype is None:
ctype = types.lookup_casstype('TupleType(LongType, LongType, ListType(TupleType(UUIDType, Int32Type)), LongType)')
ResponseFuture._TABLET_ROUTING_V2_CTYPE = ctype
self._cache_tablet_from_payload('tablets-routing-v2', ctype)
elif connection.features.tablets_routing_v1 and 'tablets-routing-v1' in self._custom_payload:
ctype = ResponseFuture._TABLET_ROUTING_CTYPE
if ctype is None:
ctype = types.lookup_casstype('TupleType(LongType, LongType, ListType(TupleType(UUIDType, Int32Type)))')
ResponseFuture._TABLET_ROUTING_CTYPE = ctype
self._cache_tablet_from_payload('tablets-routing-v1', ctype)

if isinstance(response, ResultMessage):
if response.kind == RESULT_KIND_SET_KEYSPACE:
Expand Down Expand Up @@ -5560,7 +5659,7 @@ def add_callback(self, fn, *args, **kwargs):

Note: in the case that the result is not available when the callback is added,
the callback is executed by IO event thread. This means that the callback
should not block or attempt further synchronous requests, because no further
should not block or attempt further synchronous requests because no further
IO will be processed until the callback returns.

**Important**: if the callback you attach results in an exception being
Expand Down
70 changes: 69 additions & 1 deletion cassandra/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ def _update_keyspace(self, keyspace_meta, new_user_types=None):
keyspace_meta.functions = old_keyspace_meta.functions
keyspace_meta.aggregates = old_keyspace_meta.aggregates
keyspace_meta.views = old_keyspace_meta.views
if (keyspace_meta.replication_strategy != old_keyspace_meta.replication_strategy):
if (keyspace_meta.replication_strategy != old_keyspace_meta.replication_strategy or
keyspace_meta.strongly_consistent != old_keyspace_meta.strongly_consistent):
self._keyspace_updated(ks_name)
else:
self._keyspace_added(ks_name)
Expand Down Expand Up @@ -801,6 +802,14 @@ class KeyspaceMetadata(object):
A string indicating whether a graph engine is enabled for this keyspace (Core/Classic).
"""

strongly_consistent = False
"""
A boolean indicating whether this keyspace uses strongly-consistent (Raft-based)
tablets. ``True`` only for ScyllaDB keyspaces whose ``consistency`` option is
``global``. ``False`` for keyspaces whose ``consistency`` is ``local`` or
eventual (including unset), and for non-ScyllaDB clusters.
Comment on lines +805 to +810

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📐 Maintainability & Code Quality | 🟡 Minor | ⚡ Quick win

Document strongly_consistent as global-only.

These lines still say local and global map to True, but _is_strongly_consistent() and the new tests only mark global as strongly consistent. Please tighten the docstring so the public metadata contract matches the implementation.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@cassandra/metadata.py` around lines 805 - 810, Update the documentation for
the strongly_consistent metadata field so it matches the behavior in
_is_strongly_consistent() and the new tests. In the keyspace metadata docstring
near strongly_consistent, change the description to state that only ScyllaDB
keyspaces with consistency set to global are marked True, while local, eventual,
and non-ScyllaDB cases remain False. Keep the public contract aligned with the
implementation by tightening the wording around strongly_consistent.

Comment on lines +808 to +810
"""

_exc_info = None
""" set if metadata parsing failed """

Expand All @@ -815,6 +824,7 @@ def __init__(self, name, durable_writes, strategy_class, strategy_options, graph
self.aggregates = {}
self.views = {}
self.graph_engine = graph_engine
self.strongly_consistent = False

@property
def is_graph_enabled(self):
Expand Down Expand Up @@ -2577,6 +2587,11 @@ class SchemaParserV3(SchemaParserV22):
_SELECT_AGGREGATES = "SELECT * FROM system_schema.aggregates"
_SELECT_VIEWS = "SELECT * FROM system_schema.views"

# ScyllaDB-only: per-keyspace consistency option. The column is null for
# eventually-consistent keyspaces (and the whole table is absent on Cassandra
# and on Scylla versions without strongly-consistent tablets).
_SELECT_SCYLLA_KEYSPACES = "SELECT keyspace_name, consistency FROM system_schema.scylla_keyspaces"

def _is_not_scylla(self):
"""Check if NOT connected to ScyllaDB by checking for shard awareness."""
return getattr(getattr(self.connection, 'features', None), 'shard_id', None) is None
Expand Down Expand Up @@ -2610,12 +2625,65 @@ def __init__(self, connection, timeout, fetch_size, metadata_request_timeout):
self.indexes_result = []
self.keyspace_table_index_rows = defaultdict(lambda: defaultdict(list))
self.keyspace_view_rows = defaultdict(list)
self._scylla_consistency_cache = None

@staticmethod
def _is_strongly_consistent(consistency):
# The server stores the keyspace consistency option as:
# 'eventual' (equivalently: null), 'local', or 'global'.
# Because Scylla only supports global consistency for now,
# we associate strongly consistent tables with those that
# use 'global' consistency.
return consistency == "global"

def _get_scylla_keyspaces_consistency(self):
    """
    Return a ``{keyspace_name: consistency}`` map built from
    ``system_schema.scylla_keyspaces``.

    The map is computed once and stored in ``_scylla_consistency_cache``;
    later calls on the same parser instance return the stored map without
    querying again.

    When ``_is_not_scylla()`` is true no query is issued and the map is
    empty, so every keyspace is treated as eventually consistent.

    Any exception raised while querying or reading the rows propagates to
    the caller and leaves the cache unset, so a transient failure aborts
    the schema refresh instead of silently resetting every keyspace to
    eventual consistency.

    NOTE(review): the original notes say a missing table/column is absorbed
    by ``_query_build_rows`` via ``expected_failures`` and yields an empty
    map; that handling is not visible in this call -- confirm.
    """
    consistency_map = self._scylla_consistency_cache
    if consistency_map is None:
        if self._is_not_scylla():
            consistency_map = {}
        else:
            rows = self._query_build_rows(self._SELECT_SCYLLA_KEYSPACES, lambda row: row)
            consistency_map = {}
            for row in rows:
                consistency_map[row["keyspace_name"]] = row.get("consistency")
        # Stored only after the map is fully built, so a failure above
        # leaves the cache unset.
        self._scylla_consistency_cache = consistency_map
    return consistency_map

def _set_strong_consistency(self, keyspace_meta):
    """
    Set ``keyspace_meta.strongly_consistent`` from the Scylla consistency map.

    The keyspace's entry is looked up by name; a keyspace missing from the
    map is looked up as ``None``. Returns the same ``keyspace_meta`` object
    that was passed in.
    """
    consistency_by_keyspace = self._get_scylla_keyspaces_consistency()
    option = consistency_by_keyspace.get(keyspace_meta.name)
    keyspace_meta.strongly_consistent = self._is_strongly_consistent(option)
    return keyspace_meta

def get_keyspace(self, keyspaces, keyspace):
    """
    Build keyspace metadata through the parent parser, then set its
    ``strongly_consistent`` flag.

    Returns ``None`` unchanged when the parent parser returns ``None``.
    """
    found = super(SchemaParserV3, self).get_keyspace(keyspaces, keyspace)
    if found is None:
        return None
    self._set_strong_consistency(found)
    return found

def get_all_keyspaces(self):
    """
    Yield every keyspace produced by the parent parser, enriched in place.

    Before a keyspace is yielded, view metadata is built from each of its
    rows in ``keyspace_view_rows`` and attached to it, and its
    ``strongly_consistent`` flag is set.
    """
    parent_keyspaces = super(SchemaParserV3, self).get_all_keyspaces()
    for ks_meta in parent_keyspaces:
        view_rows = self.keyspace_view_rows[ks_meta.name]
        for view_row in view_rows:
            ks_meta._add_view_metadata(self._build_view_metadata(view_row))
        self._set_strong_consistency(ks_meta)
        yield ks_meta

def get_table(self, keyspaces, keyspace, table):
Expand Down
Loading
Loading