From 8d9c675be7cc57b614f31f7a50ab712674612e50 Mon Sep 17 00:00:00 2001 From: mday-io Date: Wed, 3 Jun 2026 13:46:08 -0400 Subject: [PATCH 1/6] fix(clickhouse): support multi-gateway projects with catalog-aware engines ClickHouse's UNSUPPORTED catalog_support caused 2-level FQNs that broke sqlglot MappingSchema when mixed with 3-level FQNs from Trino or other catalog-aware gateways. Fix: auto-inject a virtual catalog (gateway name) for UNSUPPORTED adapters when catalog-aware peers exist, then strip it before any SQL reaches ClickHouse. Signed-off-by: mday-io --- sqlmesh/core/config/scheduler.py | 18 +++- sqlmesh/core/context.py | 6 ++ sqlmesh/core/engine_adapter/base.py | 23 ++++ sqlmesh/core/engine_adapter/clickhouse.py | 35 ++++++ tests/core/engine_adapter/test_clickhouse.py | 37 +++++++ tests/core/test_context.py | 108 +++++++++++++++++++ 6 files changed, 226 insertions(+), 1 deletion(-) diff --git a/sqlmesh/core/config/scheduler.py b/sqlmesh/core/config/scheduler.py index 9d9d1d3c79..0567592b11 100644 --- a/sqlmesh/core/config/scheduler.py +++ b/sqlmesh/core/config/scheduler.py @@ -138,9 +138,25 @@ def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator: def get_default_catalog_per_gateway(self, context: GenericContext) -> t.Dict[str, str]: default_catalogs_per_gateway: t.Dict[str, str] = {} + unsupported_gateways = [] + for gateway, adapter in context.engine_adapters.items(): - if catalog := adapter.default_catalog: + if adapter.catalog_support.is_unsupported: + unsupported_gateways.append((gateway, adapter)) + elif catalog := adapter.default_catalog: default_catalogs_per_gateway[gateway] = catalog + + # When catalog-aware gateways exist, assign the gateway name as a virtual catalog for + # catalog-unsupported gateways that opt in (e.g. ClickHouse) so that all models in the + # project have a uniform 3-level FQN and the MappingSchema nesting level check passes. + # Only adapters that explicitly return True from supports_virtual_catalog() are mutated; + # other UNSUPPORTED adapters are left unchanged to avoid silent breakage. + if default_catalogs_per_gateway and unsupported_gateways: + for gateway, adapter in unsupported_gateways: + if adapter.supports_virtual_catalog(): + adapter.inject_virtual_catalog(gateway) + default_catalogs_per_gateway[gateway] = gateway + return default_catalogs_per_gateway diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index 9d5fe2ff88..5d539c158a 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -487,6 +487,12 @@ def engine_adapter(self) -> EngineAdapter: @property def snapshot_evaluator(self) -> SnapshotEvaluator: if not self._snapshot_evaluator: + # Ensure virtual catalog injection (via default_catalog_per_gateway) has run before + # cloning adapters with with_settings(). Adapters that support virtual catalogs (e.g. + # ClickHouse alongside catalog-aware gateways) mutate _default_catalog during + # get_default_catalog_per_gateway. with_settings() forwards _default_catalog to the + # clone, so the mutation must happen first or the clones will miss the virtual catalog. + self.default_catalog_per_gateway # noqa: B018 self._snapshot_evaluator = SnapshotEvaluator( { gateway: adapter.with_settings(execute_log_level=logging.INFO) diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index 5465ea1197..e2ec6e07a7 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -218,6 +218,29 @@ def comments_enabled(self) -> bool: def catalog_support(self) -> CatalogSupport: return CatalogSupport.UNSUPPORTED + def supports_virtual_catalog(self) -> bool: + """Return True if this adapter can accept a virtual catalog for multi-gateway nesting alignment. + + When a project mixes catalog-aware gateways (e.g. DuckDB) with catalog-unsupported gateways + (e.g. ClickHouse), all adapters need a uniform 3-level FQN so MappingSchema nesting stays + consistent. Adapters that return True here opt in to receiving an injected virtual catalog + via inject_virtual_catalog(), which causes the set_catalog decorator to strip the catalog + from DDL expressions rather than raising UnsupportedCatalogOperationError. + """ + return False + + def inject_virtual_catalog(self, catalog: str) -> None: + """Inject a virtual catalog name for multi-gateway nesting alignment. + + Only call this on adapters that return True from supports_virtual_catalog(). After + injection, catalog_support should return SINGLE_CATALOG_ONLY so the set_catalog decorator + strips the virtual catalog from DDL expressions instead of raising an error. + """ + raise NotImplementedError( + f"{self.dialect} does not support virtual catalog injection. " + "Override supports_virtual_catalog() to return True and implement inject_virtual_catalog()." + ) + @cached_property def schema_differ(self) -> SchemaDiffer: return SchemaDiffer( diff --git a/sqlmesh/core/engine_adapter/clickhouse.py b/sqlmesh/core/engine_adapter/clickhouse.py index c41681ade2..946579ab6d 100644 --- a/sqlmesh/core/engine_adapter/clickhouse.py +++ b/sqlmesh/core/engine_adapter/clickhouse.py @@ -8,6 +8,7 @@ from sqlmesh.core.engine_adapter.mixins import LogicalMergeMixin from sqlmesh.core.engine_adapter.base import EngineAdapterWithIndexSupport from sqlmesh.core.engine_adapter.shared import ( + CatalogSupport, DataObject, DataObjectType, EngineRunMode, @@ -42,6 +43,22 @@ class ClickhouseEngineAdapter(EngineAdapterWithIndexSupport, LogicalMergeMixin): DEFAULT_TABLE_ENGINE = "MergeTree" ORDER_BY_TABLE_ENGINE_REGEX = "^.*?MergeTree.*$" + @property + def catalog_support(self) -> CatalogSupport: + # When a virtual catalog has been injected via inject_virtual_catalog() (to align + # nesting levels with catalog-aware gateways in the same project), treat ClickHouse as + # SINGLE_CATALOG_ONLY so the set_catalog decorator strips the virtual catalog from DDL + # expressions instead of raising UnsupportedCatalogOperationError. + if self._default_catalog: + return CatalogSupport.SINGLE_CATALOG_ONLY + return CatalogSupport.UNSUPPORTED + + def supports_virtual_catalog(self) -> bool: + return True + + def inject_virtual_catalog(self, catalog: str) -> None: + self._default_catalog = catalog + @property def engine_run_mode(self) -> EngineRunMode: if self._extra_config.get("cloud_mode"): @@ -172,10 +189,28 @@ def create_schema( Clickhouse has a two-level naming scheme [database].[table]. """ + from sqlmesh.utils.errors import SQLMeshError + properties_copy = properties.copy() if self.engine_run_mode.is_cluster: properties_copy.append(exp.OnCluster(this=exp.to_identifier(self.cluster))) + # ClickHouse does not support catalogs. When a virtual catalog has been injected + # (self._default_catalog is set), strip it from the schema name. This mirrors the + # SINGLE_CATALOG_ONLY branch in the set_catalog decorator, which does not apply here + # because this override is not wrapped by @set_catalog(). + if self._default_catalog: + schema_exp = to_schema(schema_name) + catalog_name = schema_exp.catalog + if catalog_name: + if catalog_name != self._default_catalog: + raise SQLMeshError( + f"clickhouse requires that all catalog operations be against a single catalog: " + f"{self._default_catalog}. Provided catalog: {catalog_name}" + ) + schema_exp.set("catalog", None) + schema_name = schema_exp + # can't call super() because it will try to set a catalog return self._create_schema( schema_name=schema_name, diff --git a/tests/core/engine_adapter/test_clickhouse.py b/tests/core/engine_adapter/test_clickhouse.py index b2ff0592d2..30f8d8c226 100644 --- a/tests/core/engine_adapter/test_clickhouse.py +++ b/tests/core/engine_adapter/test_clickhouse.py @@ -1407,3 +1407,40 @@ def test_exchange_tables( 'RENAME TABLE "table2" TO "table1"', 'DROP TABLE IF EXISTS "__temp_table1_abcd"', ] + + +def test_virtual_catalog_ddl_stripping(make_mocked_engine_adapter: t.Callable): + """After inject_virtual_catalog(), create_schema() with the virtual catalog prefix must strip + the catalog and execute without raising, and with a wrong catalog must raise SQLMeshError.""" + from sqlmesh.utils.errors import SQLMeshError + + adapter = make_mocked_engine_adapter(ClickhouseEngineAdapter) + + assert adapter.supports_virtual_catalog() is True + adapter.inject_virtual_catalog("clickhouse_gw") + + # catalog_support must switch to SINGLE_CATALOG_ONLY after injection + from sqlmesh.core.engine_adapter.shared import CatalogSupport + + assert adapter.catalog_support == CatalogSupport.SINGLE_CATALOG_ONLY + assert adapter._default_catalog == "clickhouse_gw" + + # create_schema with the virtual catalog prefix must strip the catalog and not raise + adapter.create_schema("clickhouse_gw.mydb") + assert to_sql_calls(adapter) == ['CREATE DATABASE IF NOT EXISTS "mydb"'] + + # create_schema with a wrong catalog must raise SQLMeshError + with pytest.raises(SQLMeshError, match="clickhouse_gw"): + adapter.create_schema("wrong_catalog.mydb") + + +def test_supports_virtual_catalog_returns_true(): + """ClickhouseEngineAdapter.supports_virtual_catalog() must return True without any connection.""" + from unittest.mock import MagicMock + + adapter = ClickhouseEngineAdapter( + lambda *a, **k: MagicMock(), + dialect="clickhouse", + ) + assert adapter.supports_virtual_catalog() is True + assert adapter._default_catalog is None diff --git a/tests/core/test_context.py b/tests/core/test_context.py index 49b7e56e55..b46e7f477c 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -399,6 +399,114 @@ def test_multiple_gateways(tmp_path: Path): assert context.dag._sorted == ['"db"."staging"."stg_model"', '"db"."main"."final_model"'] +def test_multi_gateway_catalog_aware_and_unsupported(tmp_path: Path, mocker): + """ClickHouse (catalog UNSUPPORTED) alongside DuckDB (catalog FULL_SUPPORT) must not raise a + nesting-level SchemaError when models are loaded. + + Expected behaviour after the fix: + - get_default_catalog_per_gateway assigns the gateway name as a virtual catalog for + catalog-unsupported gateways when catalog-aware gateways are present. + - ClickHouse models end up with a 3-level FQN so the MappingSchema nesting is uniform. + - The virtual catalog is stripped from DDL expressions (not raised as an error) because the + adapter's catalog_support flips to SINGLE_CATALOG_ONLY when _default_catalog is set. + """ + + from sqlmesh.core.config.scheduler import BuiltInSchedulerConfig + from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter + from sqlmesh.core.engine_adapter.duckdb import DuckDBEngineAdapter + from sqlmesh.core.engine_adapter.shared import CatalogSupport + + db_path = str(tmp_path / "db.db") + + # Build a real DuckDB adapter for the primary gateway. + duck_adapter = DuckDBEngineAdapter( + lambda *a, **k: __import__("duckdb").connect(db_path), + dialect="duckdb", + ) + + # Build a minimal ClickHouse adapter stub — no real connection needed. + ch_adapter = ClickhouseEngineAdapter( + lambda *a, **k: mocker.NonCallableMock(), + dialect="clickhouse", + ) + + # Simulate the context's engine_adapters dict and call the scheduler directly. + engine_adapters = { + "duckdb_gw": duck_adapter, + "clickhouse_gw": ch_adapter, + } + + ctx_mock = mocker.MagicMock() + ctx_mock.engine_adapters = engine_adapters + + scheduler = BuiltInSchedulerConfig() + catalog_per_gw = scheduler.get_default_catalog_per_gateway(ctx_mock) + + # DuckDB gateway must have a real catalog entry. + assert "duckdb_gw" in catalog_per_gw + # DuckDB's default catalog is the database filename without extension. + assert catalog_per_gw["duckdb_gw"] == "db" + # ClickHouse gateway must now also have a virtual catalog equal to its gateway name. + assert "clickhouse_gw" in catalog_per_gw + assert catalog_per_gw["clickhouse_gw"] == "clickhouse_gw" + + # The ClickHouse adapter's _default_catalog must be mutated to the virtual catalog name. + assert ch_adapter._default_catalog == "clickhouse_gw" + + # The adapter's catalog_support must now be SINGLE_CATALOG_ONLY (not UNSUPPORTED), + # so that the set_catalog decorator strips the virtual catalog instead of raising. + assert ch_adapter.catalog_support == CatalogSupport.SINGLE_CATALOG_ONLY + + # Loading models for both gateways must not raise a SchemaError. + duckdb_model = load_sql_based_model( + parse("MODEL(name main.duckdb_tbl, kind FULL, gateway duckdb_gw);\nSELECT 1 AS col"), + default_catalog="db", + ) + ch_model = load_sql_based_model( + parse("MODEL(name mydb.ch_tbl, kind FULL, gateway clickhouse_gw);\nSELECT 1 AS col"), + default_catalog="clickhouse_gw", + ) + + # Both models must have 3-level FQNs so MappingSchema nesting is uniform. + assert duckdb_model.fqn.count(".") == 2, ( + f"Expected 3-level FQN for duckdb model, got: {duckdb_model.fqn}" + ) + assert ch_model.fqn.count(".") == 2, f"Expected 3-level FQN for ch model, got: {ch_model.fqn}" + + # Both models loaded into the same MappingSchema must not raise a nesting SchemaError. + from sqlglot.schema import MappingSchema + + schema = MappingSchema(normalize=False) + schema.add_table(duckdb_model.fqn, duckdb_model.columns_to_types or {}) + schema.add_table(ch_model.fqn, ch_model.columns_to_types or {}) + + +def test_single_gateway_clickhouse_no_virtual_catalog(mocker): + """When ClickHouse is the only gateway (no catalog-aware peer), it must NOT receive a virtual + catalog. Models remain 2-level and catalog_support stays UNSUPPORTED.""" + from sqlmesh.core.config.scheduler import BuiltInSchedulerConfig + from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter + from sqlmesh.core.engine_adapter.shared import CatalogSupport + + ch_adapter = ClickhouseEngineAdapter( + lambda *a, **k: mocker.NonCallableMock(), + dialect="clickhouse", + ) + + ctx_mock = mocker.MagicMock() + ctx_mock.engine_adapters = {"clickhouse_gw": ch_adapter} + + scheduler = BuiltInSchedulerConfig() + catalog_per_gw = scheduler.get_default_catalog_per_gateway(ctx_mock) + + # With only a catalog-unsupported gateway there must be no entry at all. + assert "clickhouse_gw" not in catalog_per_gw + + # The adapter must remain unchanged — no virtual catalog injected. + assert ch_adapter._default_catalog is None + assert ch_adapter.catalog_support == CatalogSupport.UNSUPPORTED + + def test_plan_execution_time(): context = Context(config=Config()) context.upsert_model( From 72de11235ae125eda1897420c0e9f653cd797989 Mon Sep 17 00:00:00 2001 From: mday-io Date: Wed, 3 Jun 2026 15:30:10 -0400 Subject: [PATCH 2/6] Clarify virtual catalog Signed-off-by: mday-io --- docs/integrations/engines/clickhouse.md | 42 ++++++++++++++++++- sqlmesh/core/config/connection.py | 7 +++- sqlmesh/core/config/scheduler.py | 6 ++- sqlmesh/core/engine_adapter/clickhouse.py | 5 ++- tests/core/engine_adapter/test_clickhouse.py | 36 ++++++++++++++-- tests/core/test_context.py | 44 +++++++++++++++++--- 6 files changed, 127 insertions(+), 13 deletions(-) diff --git a/docs/integrations/engines/clickhouse.md b/docs/integrations/engines/clickhouse.md index 14e931b046..6074e75e48 100644 --- a/docs/integrations/engines/clickhouse.md +++ b/docs/integrations/engines/clickhouse.md @@ -420,6 +420,45 @@ If a model has many records in each partition, you may see additional performanc Choose a model's time partitioning granularity based on the characteristics of the data it will process, making sure the total number of partitions is 1000 or fewer. +## Multi-gateway setup + +ClickHouse does not have a catalog concept — its fully-qualified table names are two-level (`database.table`), not three-level (`catalog.database.table`). + +When a SQLMesh project uses ClickHouse alongside a catalog-aware gateway such as Trino or BigQuery, the two gateway types produce FQNs with different nesting depths. SQLMesh's internal schema tracking requires uniform nesting, so it assigns a **virtual catalog** to ClickHouse models at load time. + +### How the virtual catalog works + +- SQLMesh automatically detects the nesting mismatch and injects a virtual catalog into each ClickHouse adapter when a catalog-aware gateway is also present. +- ClickHouse models will appear with three-level FQNs in `sqlmesh plan` output and logs — for example, `__ch_prod__.mydb.mytable` for a gateway named `ch_prod`. +- The virtual catalog prefix is **never sent to ClickHouse**. It is stripped from every DDL and DML statement before execution. +- When ClickHouse is the only gateway in a project, no virtual catalog is assigned and models remain two-level. + +### Adding a second gateway to an existing ClickHouse-only project + +If your project previously used ClickHouse as the only gateway, your models were fingerprinted with 2-level FQNs (`db.table`). Adding a catalog-aware gateway for the first time causes all ClickHouse models to be treated as new versions (their FQNs change to `__{gateway_name}__.db.table`), triggering a full re-materialization on the next `sqlmesh apply`. This is a one-time cost. + +### Virtual catalog naming + +By default, the virtual catalog name is derived from **the gateway name you chose in your config**, wrapped in double underscores — for example, a gateway named `clickhouse` produces `__clickhouse__`, and a gateway named `ch_prod` produces `__ch_prod__`. The double-underscore wrapping makes it visually clear that this is an internal SQLMesh concept, not a real ClickHouse object. + +You can override the default name by setting `virtual_catalog` in your ClickHouse connection configuration: + +```yaml +gateways: + clickhouse: + connection: + type: clickhouse + host: my-clickhouse-host + username: default + virtual_catalog: ch_virtual # optional; defaults to __{gateway_name}__ (e.g. __clickhouse__) + trino: + connection: + type: trino + ... +``` + +With this configuration, ClickHouse models will appear as `ch_virtual.mydb.mytable` in plan output instead of `__clickhouse__.mydb.mytable`. + ## Local/Built-in Scheduler **Engine Adapter Type**: `clickhouse` @@ -446,4 +485,5 @@ If a model has many records in each partition, you may see additional performanc | `server_host_name` | The ClickHouse server hostname as identified by the CN or SNI of its TLS certificate. Set this to avoid SSL errors when connecting through a proxy or tunnel with a different hostname. | string | N | | `tls_mode` | Controls advanced TLS behavior. proxy and strict do not invoke ClickHouse mutual TLS connection, but do send client cert and key. mutual assumes ClickHouse mutual TLS auth with a client certificate. | string | N | | `connection_settings` | Additional [connection settings](https://clickhouse.com/docs/integrations/python#settings-argument) | dict | N | -| `connection_pool_options` | Additional [options](https://clickhouse.com/docs/integrations/python#customizing-the-http-connection-pool) for the HTTP connection pool | dict | N | \ No newline at end of file +| `connection_pool_options` | Additional [options](https://clickhouse.com/docs/integrations/python#customizing-the-http-connection-pool) for the HTTP connection pool | dict | N | +| `virtual_catalog` | Override the virtual catalog name used when ClickHouse runs alongside a catalog-aware gateway (e.g. Trino). Defaults to `__{gateway_name}__`. See [Multi-gateway setup](#multi-gateway-setup) for details. | string | N | \ No newline at end of file diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index 343414eab2..ff53ef8dab 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -2085,6 +2085,7 @@ class ClickhouseConnectionConfig(ConnectionConfig): password: t.Optional[str] = None port: t.Optional[int] = None cluster: t.Optional[str] = None + virtual_catalog: t.Optional[str] = None connect_timeout: int = 10 send_receive_timeout: int = 300 query_limit: int = 0 @@ -2180,7 +2181,11 @@ def cloud_mode(self) -> bool: @property def _extra_engine_config(self) -> t.Dict[str, t.Any]: - return {"cluster": self.cluster, "cloud_mode": self.cloud_mode} + return { + "cluster": self.cluster, + "cloud_mode": self.cloud_mode, + "virtual_catalog": self.virtual_catalog, + } @property def _static_connection_kwargs(self) -> t.Dict[str, t.Any]: diff --git a/sqlmesh/core/config/scheduler.py b/sqlmesh/core/config/scheduler.py index 0567592b11..4cce9b0f76 100644 --- a/sqlmesh/core/config/scheduler.py +++ b/sqlmesh/core/config/scheduler.py @@ -155,7 +155,11 @@ def get_default_catalog_per_gateway(self, context: GenericContext) -> t.Dict[str for gateway, adapter in unsupported_gateways: if adapter.supports_virtual_catalog(): adapter.inject_virtual_catalog(gateway) - default_catalogs_per_gateway[gateway] = gateway + # Read the actual virtual catalog name back from the adapter — it may differ + # from the gateway name if the user configured a custom virtual_catalog value. + # inject_virtual_catalog() always sets _default_catalog so default_catalog + # cannot return None at this point. + default_catalogs_per_gateway[gateway] = adapter.default_catalog # type: ignore[assignment] return default_catalogs_per_gateway diff --git a/sqlmesh/core/engine_adapter/clickhouse.py b/sqlmesh/core/engine_adapter/clickhouse.py index 946579ab6d..284f3dd485 100644 --- a/sqlmesh/core/engine_adapter/clickhouse.py +++ b/sqlmesh/core/engine_adapter/clickhouse.py @@ -56,8 +56,9 @@ def catalog_support(self) -> CatalogSupport: def supports_virtual_catalog(self) -> bool: return True - def inject_virtual_catalog(self, catalog: str) -> None: - self._default_catalog = catalog + def inject_virtual_catalog(self, gateway: str) -> None: + configured = self._extra_config.get("virtual_catalog") + self._default_catalog = f"__{gateway}__" if configured is None else configured @property def engine_run_mode(self) -> EngineRunMode: diff --git a/tests/core/engine_adapter/test_clickhouse.py b/tests/core/engine_adapter/test_clickhouse.py index 30f8d8c226..3075e6ecea 100644 --- a/tests/core/engine_adapter/test_clickhouse.py +++ b/tests/core/engine_adapter/test_clickhouse.py @@ -1423,14 +1423,15 @@ def test_virtual_catalog_ddl_stripping(make_mocked_engine_adapter: t.Callable): from sqlmesh.core.engine_adapter.shared import CatalogSupport assert adapter.catalog_support == CatalogSupport.SINGLE_CATALOG_ONLY - assert adapter._default_catalog == "clickhouse_gw" + # The default synthetic virtual catalog wraps the gateway name in double underscores. + assert adapter._default_catalog == "__clickhouse_gw__" # create_schema with the virtual catalog prefix must strip the catalog and not raise - adapter.create_schema("clickhouse_gw.mydb") + adapter.create_schema("__clickhouse_gw__.mydb") assert to_sql_calls(adapter) == ['CREATE DATABASE IF NOT EXISTS "mydb"'] # create_schema with a wrong catalog must raise SQLMeshError - with pytest.raises(SQLMeshError, match="clickhouse_gw"): + with pytest.raises(SQLMeshError, match="__clickhouse_gw__"): adapter.create_schema("wrong_catalog.mydb") @@ -1444,3 +1445,32 @@ def test_supports_virtual_catalog_returns_true(): ) assert adapter.supports_virtual_catalog() is True assert adapter._default_catalog is None + + +def test_inject_virtual_catalog_uses_custom_config(make_mocked_engine_adapter: t.Callable): + """When virtual_catalog is set in _extra_config, inject_virtual_catalog uses that value + instead of the synthetic __gateway_name__ default.""" + adapter = make_mocked_engine_adapter( + ClickhouseEngineAdapter, + virtual_catalog="my_custom_catalog", + ) + + adapter.inject_virtual_catalog("clickhouse_gw") + + # The user-configured value must take precedence over the synthetic default. + assert adapter._default_catalog == "my_custom_catalog" + + from sqlmesh.core.engine_adapter.shared import CatalogSupport + + assert adapter.catalog_support == CatalogSupport.SINGLE_CATALOG_ONLY + + +def test_clickhouse_connection_config_virtual_catalog_extra_engine_config(): + """virtual_catalog set on ClickhouseConnectionConfig must appear in _extra_engine_config + so that the value reaches the adapter's _extra_config dict.""" + from sqlmesh.core.config.connection import ClickhouseConnectionConfig + + config = ClickhouseConnectionConfig( + host="localhost", username="user", virtual_catalog="my_catalog" + ) + assert config._extra_engine_config.get("virtual_catalog") == "my_catalog" diff --git a/tests/core/test_context.py b/tests/core/test_context.py index b46e7f477c..19089ee8af 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -446,12 +446,12 @@ def test_multi_gateway_catalog_aware_and_unsupported(tmp_path: Path, mocker): assert "duckdb_gw" in catalog_per_gw # DuckDB's default catalog is the database filename without extension. assert catalog_per_gw["duckdb_gw"] == "db" - # ClickHouse gateway must now also have a virtual catalog equal to its gateway name. + # ClickHouse gateway must now also have a virtual catalog wrapped in double underscores. assert "clickhouse_gw" in catalog_per_gw - assert catalog_per_gw["clickhouse_gw"] == "clickhouse_gw" + assert catalog_per_gw["clickhouse_gw"] == "__clickhouse_gw__" - # The ClickHouse adapter's _default_catalog must be mutated to the virtual catalog name. - assert ch_adapter._default_catalog == "clickhouse_gw" + # The ClickHouse adapter's _default_catalog must be mutated to the synthetic virtual catalog. + assert ch_adapter._default_catalog == "__clickhouse_gw__" # The adapter's catalog_support must now be SINGLE_CATALOG_ONLY (not UNSUPPORTED), # so that the set_catalog decorator strips the virtual catalog instead of raising. @@ -464,7 +464,7 @@ def test_multi_gateway_catalog_aware_and_unsupported(tmp_path: Path, mocker): ) ch_model = load_sql_based_model( parse("MODEL(name mydb.ch_tbl, kind FULL, gateway clickhouse_gw);\nSELECT 1 AS col"), - default_catalog="clickhouse_gw", + default_catalog="__clickhouse_gw__", ) # Both models must have 3-level FQNs so MappingSchema nesting is uniform. @@ -507,6 +507,40 @@ def test_single_gateway_clickhouse_no_virtual_catalog(mocker): assert ch_adapter.catalog_support == CatalogSupport.UNSUPPORTED +def test_multi_gateway_clickhouse_custom_virtual_catalog(tmp_path: Path, mocker): + """When virtual_catalog is configured on the ClickHouse connection, that value is used as the + virtual catalog instead of the synthetic __gateway_name__ default.""" + from sqlmesh.core.config.scheduler import BuiltInSchedulerConfig + from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter + from sqlmesh.core.engine_adapter.duckdb import DuckDBEngineAdapter + from sqlmesh.core.engine_adapter.shared import CatalogSupport + + db_path = str(tmp_path / "db.db") + + duck_adapter = DuckDBEngineAdapter( + lambda *a, **k: __import__("duckdb").connect(db_path), + dialect="duckdb", + ) + + # Pass virtual_catalog via _extra_config (the same path used by ClickhouseConnectionConfig). + ch_adapter = ClickhouseEngineAdapter( + lambda *a, **k: mocker.NonCallableMock(), + dialect="clickhouse", + virtual_catalog="my_custom_catalog", + ) + + ctx_mock = mocker.MagicMock() + ctx_mock.engine_adapters = {"duckdb_gw": duck_adapter, "clickhouse_gw": ch_adapter} + + scheduler = BuiltInSchedulerConfig() + catalog_per_gw = scheduler.get_default_catalog_per_gateway(ctx_mock) + + # The configured virtual_catalog value must be used, not __clickhouse_gw__. + assert catalog_per_gw["clickhouse_gw"] == "my_custom_catalog" + assert ch_adapter._default_catalog == "my_custom_catalog" + assert ch_adapter.catalog_support == CatalogSupport.SINGLE_CATALOG_ONLY + + def test_plan_execution_time(): context = Context(config=Config()) context.upsert_model( From 33facd86ff996a126b2d695c4d5d56e9903d9286 Mon Sep 17 00:00:00 2001 From: mday-io Date: Wed, 3 Jun 2026 15:53:29 -0400 Subject: [PATCH 3/6] harden virtual catalog injection and config validation Signed-off-by: mday-io --- sqlmesh/core/config/connection.py | 9 +++++++++ sqlmesh/core/context.py | 16 ++++++++++------ sqlmesh/core/engine_adapter/clickhouse.py | 7 +++---- tests/core/engine_adapter/test_clickhouse.py | 16 ++++++++++++++++ tests/core/test_context.py | 17 +++++++++++++++++ 5 files changed, 55 insertions(+), 10 deletions(-) diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index ff53ef8dab..ca68f89b36 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -2120,6 +2120,15 @@ class ClickhouseConnectionConfig(ConnectionConfig): _engine_import_validator = _get_engine_import_validator("clickhouse_connect", "clickhouse") + @field_validator("virtual_catalog") + def validate_virtual_catalog(cls, v: t.Optional[str]) -> t.Optional[str]: + if v is not None and not v.strip(): + raise ConfigError( + "virtual_catalog cannot be an empty string. " + "Omit the field to use the default synthetic prefix (____)." + ) + return v + @property def _connection_kwargs_keys(self) -> t.Set[str]: kwargs = { diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index 5d539c158a..19271debc1 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -487,12 +487,7 @@ def engine_adapter(self) -> EngineAdapter: @property def snapshot_evaluator(self) -> SnapshotEvaluator: if not self._snapshot_evaluator: - # Ensure virtual catalog injection (via default_catalog_per_gateway) has run before - # cloning adapters with with_settings(). Adapters that support virtual catalogs (e.g. - # ClickHouse alongside catalog-aware gateways) mutate _default_catalog during - # get_default_catalog_per_gateway. with_settings() forwards _default_catalog to the - # clone, so the mutation must happen first or the clones will miss the virtual catalog. - self.default_catalog_per_gateway # noqa: B018 + self._ensure_virtual_catalog_injection() self._snapshot_evaluator = SnapshotEvaluator( { gateway: adapter.with_settings(execute_log_level=logging.INFO) @@ -503,6 +498,15 @@ def snapshot_evaluator(self) -> SnapshotEvaluator: ) return self._snapshot_evaluator + def _ensure_virtual_catalog_injection(self) -> None: + """Ensure virtual catalog injection has run before adapters are cloned for SnapshotEvaluator. + + Injection is a side effect of get_default_catalog_per_gateway. In normal usage it fires + earlier (default_catalog is accessed during model loading), but this guard covers the edge + case where snapshot_evaluator is accessed directly on a fresh context before any model ops. + """ + _ = self.default_catalog_per_gateway + def execution_context( self, deployability_index: t.Optional[DeployabilityIndex] = None, diff --git a/sqlmesh/core/engine_adapter/clickhouse.py b/sqlmesh/core/engine_adapter/clickhouse.py index 284f3dd485..8408ed73e8 100644 --- a/sqlmesh/core/engine_adapter/clickhouse.py +++ b/sqlmesh/core/engine_adapter/clickhouse.py @@ -45,10 +45,9 @@ class ClickhouseEngineAdapter(EngineAdapterWithIndexSupport, LogicalMergeMixin): @property def catalog_support(self) -> CatalogSupport: - # When a virtual catalog has been injected via inject_virtual_catalog() (to align - # nesting levels with catalog-aware gateways in the same project), treat ClickHouse as - # SINGLE_CATALOG_ONLY so the set_catalog decorator strips the virtual catalog from DDL - # expressions instead of raising UnsupportedCatalogOperationError. + # This property is intentionally dynamic: it transitions from UNSUPPORTED to + # SINGLE_CATALOG_ONLY after inject_virtual_catalog() sets _default_catalog. Callers must + # not cache the result — always read it live so they see the post-injection state. if self._default_catalog: return CatalogSupport.SINGLE_CATALOG_ONLY return CatalogSupport.UNSUPPORTED diff --git a/tests/core/engine_adapter/test_clickhouse.py b/tests/core/engine_adapter/test_clickhouse.py index 3075e6ecea..b72aa422a4 100644 --- a/tests/core/engine_adapter/test_clickhouse.py +++ b/tests/core/engine_adapter/test_clickhouse.py @@ -1474,3 +1474,19 @@ def test_clickhouse_connection_config_virtual_catalog_extra_engine_config(): host="localhost", username="user", virtual_catalog="my_catalog" ) assert config._extra_engine_config.get("virtual_catalog") == "my_catalog" + + +def test_clickhouse_connection_config_virtual_catalog_empty_string_rejected(): + """virtual_catalog: "" is a footgun — the empty string propagates to _default_catalog, + which is falsy, so catalog_support stays UNSUPPORTED and the nesting error persists. + Reject it at config parse time with a clear message.""" + import pytest + + from sqlmesh.core.config.connection import ClickhouseConnectionConfig + from sqlmesh.utils.errors import ConfigError + + with pytest.raises(ConfigError, match="virtual_catalog cannot be an empty string"): + ClickhouseConnectionConfig(host="localhost", username="user", virtual_catalog="") + + with pytest.raises(ConfigError, match="virtual_catalog cannot be an empty string"): + ClickhouseConnectionConfig(host="localhost", username="user", virtual_catalog=" ") diff --git a/tests/core/test_context.py b/tests/core/test_context.py index 19089ee8af..761207a0d3 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -541,6 +541,23 @@ def test_multi_gateway_clickhouse_custom_virtual_catalog(tmp_path: Path, mocker) assert ch_adapter.catalog_support == CatalogSupport.SINGLE_CATALOG_ONLY +def test_snapshot_evaluator_calls_ensure_virtual_catalog_injection(mocker): + """snapshot_evaluator must call _ensure_virtual_catalog_injection before cloning adapters. + + This guards the edge case where snapshot_evaluator is the first property accessed on a fresh + context — before default_catalog fires during model loading — and ensures virtual catalog + injection still happens even in that order. + """ + ctx = Context(config=Config()) + ctx._snapshot_evaluator = None # force re-initialization + + inject_spy = mocker.patch.object(ctx, "_ensure_virtual_catalog_injection") + + _ = ctx.snapshot_evaluator + + inject_spy.assert_called_once() + + def test_plan_execution_time(): context = Context(config=Config()) context.upsert_model( From a1f622676c0c4b3d9ba2628b1ec50f52a0cc2a0c Mon Sep 17 00:00:00 2001 From: mday-io Date: Fri, 19 Jun 2026 08:49:57 -0400 Subject: [PATCH 4/6] fix(clickhouse): complete virtual-catalog stripping across all overrides MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address reviewer feedback on PR #5826: - Add _strip_virtual_catalog helper to ClickhouseEngineAdapter; apply to delete_from, insert_overwrite_by_partition, and alter_table overrides that previously bypassed both the @set_catalog decorator and the create_schema manual strip — preventing __gateway__ prefix leaking into DELETE/INSERT OVERWRITE/ALTER TABLE SQL sent to ClickHouse - Deduplicate catalog-stripping logic in create_schema via the helper - Rename inject_virtual_catalog param catalog→gateway in base class and align docstring (adapter decides the final catalog name) - Reject virtual_catalog values containing '.' to prevent 4-level FQN - Add unit tests verifying SQL output is free of the virtual catalog prefix for each newly covered method - Add integration test exercising get_default_catalog_per_gateway + FQN uniformity + create_schema stripping end-to-end - Fix clickhouse.md trailing newline Signed-off-by: Michael Day Signed-off-by: mday-io --- docs/integrations/engines/clickhouse.md | 2 +- sqlmesh/core/config/connection.py | 4 + sqlmesh/core/engine_adapter/base.py | 14 ++- sqlmesh/core/engine_adapter/clickhouse.py | 23 +++- tests/core/engine_adapter/test_clickhouse.py | 85 +++++++++++++ tests/core/test_context.py | 121 ++++++++++++++++++- 6 files changed, 238 insertions(+), 11 deletions(-) diff --git a/docs/integrations/engines/clickhouse.md b/docs/integrations/engines/clickhouse.md index 6074e75e48..f699489357 100644 --- a/docs/integrations/engines/clickhouse.md +++ b/docs/integrations/engines/clickhouse.md @@ -486,4 +486,4 @@ With this configuration, ClickHouse models will appear as `ch_virtual.mydb.mytab | `tls_mode` | Controls advanced TLS behavior. proxy and strict do not invoke ClickHouse mutual TLS connection, but do send client cert and key. mutual assumes ClickHouse mutual TLS auth with a client certificate. | string | N | | `connection_settings` | Additional [connection settings](https://clickhouse.com/docs/integrations/python#settings-argument) | dict | N | | `connection_pool_options` | Additional [options](https://clickhouse.com/docs/integrations/python#customizing-the-http-connection-pool) for the HTTP connection pool | dict | N | -| `virtual_catalog` | Override the virtual catalog name used when ClickHouse runs alongside a catalog-aware gateway (e.g. Trino). Defaults to `__{gateway_name}__`. See [Multi-gateway setup](#multi-gateway-setup) for details. | string | N | \ No newline at end of file +| `virtual_catalog` | Override the virtual catalog name used when ClickHouse runs alongside a catalog-aware gateway (e.g. Trino). Defaults to `__{gateway_name}__`. See [Multi-gateway setup](#multi-gateway-setup) for details. | string | N | diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index ca68f89b36..ac31e31910 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -2127,6 +2127,10 @@ def validate_virtual_catalog(cls, v: t.Optional[str]) -> t.Optional[str]: "virtual_catalog cannot be an empty string. " "Omit the field to use the default synthetic prefix (____)." ) + if v is not None and "." in v: + raise ConfigError( + f"virtual_catalog must be a single identifier with no dots (got: {v!r})" + ) return v @property diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index e2ec6e07a7..b0cf3ff84c 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -229,12 +229,14 @@ def supports_virtual_catalog(self) -> bool: """ return False - def inject_virtual_catalog(self, catalog: str) -> None: - """Inject a virtual catalog name for multi-gateway nesting alignment. - - Only call this on adapters that return True from supports_virtual_catalog(). After - injection, catalog_support should return SINGLE_CATALOG_ONLY so the set_catalog decorator - strips the virtual catalog from DDL expressions instead of raising an error. + def inject_virtual_catalog(self, gateway: str) -> None: + """Inject a gateway name to configure the adapter's virtual catalog. + + The adapter determines the final catalog name from the gateway name (e.g. ClickHouse + wraps it as __{gateway}__). Only call this on adapters that return True from + supports_virtual_catalog(). After injection, catalog_support should return + SINGLE_CATALOG_ONLY so the set_catalog decorator strips the virtual catalog from DDL + expressions instead of raising an error. """ raise NotImplementedError( f"{self.dialect} does not support virtual catalog injection. " diff --git a/sqlmesh/core/engine_adapter/clickhouse.py b/sqlmesh/core/engine_adapter/clickhouse.py index 8408ed73e8..f5ef705b08 100644 --- a/sqlmesh/core/engine_adapter/clickhouse.py +++ b/sqlmesh/core/engine_adapter/clickhouse.py @@ -208,8 +208,7 @@ def create_schema( f"clickhouse requires that all catalog operations be against a single catalog: " f"{self._default_catalog}. Provided catalog: {catalog_name}" ) - schema_exp.set("catalog", None) - schema_name = schema_exp + schema_name = self._strip_virtual_catalog(schema_exp) # can't call super() because it will try to set a catalog return self._create_schema( @@ -481,6 +480,7 @@ def insert_overwrite_by_partition( target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, source_columns: t.Optional[t.List[str]] = None, ) -> None: + table_name = self._strip_virtual_catalog(table_name) source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types( query_or_df, target_columns_to_types, @@ -595,6 +595,20 @@ def _create_table( target_columns_to_types or self.columns(table_name), ) + def _strip_virtual_catalog(self, name: "TableName") -> exp.Table: + """Strip the virtual catalog prefix from a table name if present. + + When a virtual catalog has been injected, ClickHouse table names carry a + synthetic catalog prefix (e.g. ``__gw__``) so they match the 3-level FQN + depth of catalog-aware peers. This helper removes that prefix before any + SQL is sent to the wire, since ClickHouse only supports a two-level + ``[database].[table]`` naming scheme. + """ + table = exp.to_table(name) + if self._default_catalog and table.catalog == self._default_catalog: + table.set("catalog", None) + return table + def _exchange_tables( self, old_table_name: TableName, @@ -633,7 +647,7 @@ def _rename_table( self.execute(f"RENAME TABLE {old_table_sql} TO {new_table_sql}{self._on_cluster_sql()}") def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expr]) -> None: - delete_expr = exp.delete(table_name, where) + delete_expr = exp.delete(self._strip_virtual_catalog(table_name), where) if self.engine_run_mode.is_cluster: delete_expr.set("cluster", exp.OnCluster(this=exp.to_identifier(self.cluster))) self.execute(delete_expr) @@ -649,6 +663,9 @@ def alter_table( for alter_expression in [ x.expression if isinstance(x, TableAlterOperation) else x for x in alter_expressions ]: + if self._default_catalog and isinstance(alter_expression.this, exp.Table): + if alter_expression.this.catalog == self._default_catalog: + alter_expression.this.set("catalog", None) if self.engine_run_mode.is_cluster: alter_expression.set( "cluster", exp.OnCluster(this=exp.to_identifier(self.cluster)) diff --git a/tests/core/engine_adapter/test_clickhouse.py b/tests/core/engine_adapter/test_clickhouse.py index b72aa422a4..110f937ccb 100644 --- a/tests/core/engine_adapter/test_clickhouse.py +++ b/tests/core/engine_adapter/test_clickhouse.py @@ -1490,3 +1490,88 @@ def test_clickhouse_connection_config_virtual_catalog_empty_string_rejected(): with pytest.raises(ConfigError, match="virtual_catalog cannot be an empty string"): ClickhouseConnectionConfig(host="localhost", username="user", virtual_catalog=" ") + + +def test_virtual_catalog_stripped_in_delete_from(make_mocked_engine_adapter: t.Callable): + """delete_from() must strip the virtual catalog prefix before building the DELETE expression.""" + adapter = make_mocked_engine_adapter(ClickhouseEngineAdapter) + adapter.inject_virtual_catalog("ch_gw") + assert adapter._default_catalog == "__ch_gw__" + + adapter.delete_from("__ch_gw__.mydb.my_table", "a = 1") + + sql_calls = to_sql_calls(adapter) + assert len(sql_calls) == 1 + assert "__ch_gw__" not in sql_calls[0] + assert "mydb" in sql_calls[0] + assert "my_table" in sql_calls[0] + assert "DELETE FROM" in sql_calls[0] + + +def test_virtual_catalog_stripped_in_insert_overwrite_by_partition( + make_mocked_engine_adapter: t.Callable, mocker: MockerFixture +): + """insert_overwrite_by_partition() must strip the virtual catalog prefix before any SQL is sent.""" + adapter = make_mocked_engine_adapter(ClickhouseEngineAdapter) + adapter.inject_virtual_catalog("ch_gw") + assert adapter._default_catalog == "__ch_gw__" + + # Patch _insert_overwrite_by_condition so we can inspect how table_name was passed. + overwrite_mock = mocker.patch.object(adapter, "_insert_overwrite_by_condition") + # Also patch _get_source_queries_and_columns_to_types to avoid needing a real query. + source_query_mock = mocker.patch.object( + adapter, + "_get_source_queries_and_columns_to_types", + return_value=([], {}), + ) + + adapter.insert_overwrite_by_partition( + "__ch_gw__.mydb.my_table", + parse_one("SELECT 1 AS col"), + partitioned_by=[exp.column("ds")], + ) + + # The table_name passed to _insert_overwrite_by_condition must not contain the virtual catalog. + assert overwrite_mock.called + table_name_arg = overwrite_mock.call_args[0][0] + table_name_sql = ( + table_name_arg.sql("clickhouse") + if isinstance(table_name_arg, exp.Expression) + else str(table_name_arg) + ) + assert "__ch_gw__" not in table_name_sql + + # The target_table passed to _get_source_queries_and_columns_to_types must also be stripped. + assert source_query_mock.called + target_table_kwarg = source_query_mock.call_args[1].get( + "target_table", + source_query_mock.call_args[0][2] if len(source_query_mock.call_args[0]) > 2 else None, + ) + if target_table_kwarg is not None: + target_sql = ( + target_table_kwarg.sql("clickhouse") + if isinstance(target_table_kwarg, exp.Expression) + else str(target_table_kwarg) + ) + assert "__ch_gw__" not in target_sql + + +def test_virtual_catalog_stripped_in_alter_table(make_mocked_engine_adapter: t.Callable): + """alter_table() must strip the virtual catalog prefix from each ALTER TABLE statement.""" + adapter = make_mocked_engine_adapter(ClickhouseEngineAdapter) + adapter.inject_virtual_catalog("ch_gw") + assert adapter._default_catalog == "__ch_gw__" + + alter_expr = exp.Alter( + this=exp.to_table("__ch_gw__.mydb.my_table"), + kind="TABLE", + actions=[exp.Drop(this=exp.to_column("col_a"), kind="COLUMN")], + ) + adapter.alter_table([alter_expr]) + + sql_calls = to_sql_calls(adapter) + assert len(sql_calls) == 1 + assert "__ch_gw__" not in sql_calls[0] + assert "mydb" in sql_calls[0] + assert "my_table" in sql_calls[0] + assert "ALTER TABLE" in sql_calls[0] diff --git a/tests/core/test_context.py b/tests/core/test_context.py index 761207a0d3..9508f173d9 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -468,10 +468,13 @@ def test_multi_gateway_catalog_aware_and_unsupported(tmp_path: Path, mocker): ) # Both models must have 3-level FQNs so MappingSchema nesting is uniform. + # count(".") == 2 means 3 parts (catalog.db.table), i.e. a 3-level FQN. assert duckdb_model.fqn.count(".") == 2, ( f"Expected 3-level FQN for duckdb model, got: {duckdb_model.fqn}" ) - assert ch_model.fqn.count(".") == 2, f"Expected 3-level FQN for ch model, got: {ch_model.fqn}" + assert ch_model.fqn.count(".") == 2, ( + f"Expected 3-level FQN for ch model, got: {ch_model.fqn}" + ) # 3 parts = 2 dots # Both models loaded into the same MappingSchema must not raise a nesting SchemaError. from sqlglot.schema import MappingSchema @@ -558,6 +561,122 @@ def test_snapshot_evaluator_calls_ensure_virtual_catalog_injection(mocker): inject_spy.assert_called_once() +@pytest.mark.fast +def test_multi_gateway_virtual_catalog_create_schema_strips_prefix(tmp_path: Path, mocker): + """Integration test: create_schema with a 3-level virtual-catalog FQN must strip the synthetic + catalog prefix before sending DDL to ClickHouse. + + Flow exercised: + 1. get_default_catalog_per_gateway detects a catalog-aware gateway (DuckDB) alongside + a catalog-unsupported gateway (ClickHouse) and calls inject_virtual_catalog(). + 2. The ClickHouse adapter's _default_catalog is set to ``__clickhouse_gw__``. + 3. A ClickHouse model loaded with that virtual catalog gets a 3-level FQN. + 4. When create_schema is called with the 3-level schema name the virtual catalog prefix + is stripped, so the SQL that reaches the wire uses only a 2-level name. + 5. The DuckDB adapter's _default_catalog is NOT set to a synthetic value. + """ + from sqlmesh.core.config.scheduler import BuiltInSchedulerConfig + from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter + from sqlmesh.core.engine_adapter.duckdb import DuckDBEngineAdapter + from sqlmesh.core.engine_adapter.shared import CatalogSupport + + db_path = str(tmp_path / "db.db") + + duck_adapter = DuckDBEngineAdapter( + lambda *a, **k: __import__("duckdb").connect(db_path), + dialect="duckdb", + ) + + # ClickHouse adapter with a mocked connection — no real server needed. + ch_adapter = ClickhouseEngineAdapter( + lambda *a, **k: mocker.NonCallableMock(), + dialect="clickhouse", + ) + + ctx_mock = mocker.MagicMock() + ctx_mock.engine_adapters = { + "duckdb_gw": duck_adapter, + "clickhouse_gw": ch_adapter, + } + + scheduler = BuiltInSchedulerConfig() + catalog_per_gw = scheduler.get_default_catalog_per_gateway(ctx_mock) + + # --- Phase 1: virtual catalog injection assertions --- + + # DuckDB gateway must carry a real catalog entry. + assert "duckdb_gw" in catalog_per_gw + + # ClickHouse gateway must receive the synthetic ``__clickhouse_gw__`` virtual catalog. + assert catalog_per_gw["clickhouse_gw"] == "__clickhouse_gw__" + + # The ClickHouse adapter's _default_catalog must be mutated. + assert ch_adapter._default_catalog == "__clickhouse_gw__" + + # catalog_support must flip to SINGLE_CATALOG_ONLY so the set_catalog decorator strips + # the virtual catalog instead of raising when DDL is executed. + assert ch_adapter.catalog_support == CatalogSupport.SINGLE_CATALOG_ONLY + + # DuckDB adapter must be untouched — it already has real catalog support. + assert duck_adapter._default_catalog != "__duckdb_gw__" + + # --- Phase 2: FQN uniformity --- + + ch_model = load_sql_based_model( + parse("MODEL(name mydb.ch_tbl, kind FULL, gateway clickhouse_gw);\nSELECT 1 AS col"), + default_catalog="__clickhouse_gw__", + ) + duckdb_model = load_sql_based_model( + parse("MODEL(name main.duckdb_tbl, kind FULL, gateway duckdb_gw);\nSELECT 1 AS col"), + default_catalog=catalog_per_gw["duckdb_gw"], + ) + + # Both models must have 3-level FQNs (catalog.db.table → 2 dots) so MappingSchema nesting + # is uniform and does not raise a SchemaError. + assert ch_model.fqn.count(".") == 2, ( + f"Expected 3-level FQN for ClickHouse model, got: {ch_model.fqn}" + ) + assert duckdb_model.fqn.count(".") == 2, ( + f"Expected 3-level FQN for DuckDB model, got: {duckdb_model.fqn}" + ) + + from sqlglot.schema import MappingSchema + + schema = MappingSchema(normalize=False) + schema.add_table(ch_model.fqn, ch_model.columns_to_types or {}) + schema.add_table(duckdb_model.fqn, duckdb_model.columns_to_types or {}) + + # --- Phase 3: create_schema strips the virtual catalog prefix --- + + # Spy on _create_schema to inspect what schema name is passed after stripping. + create_schema_calls: t.List[str] = [] + + def _capture_create_schema( + schema_name, + ignore_if_exists, + warn_on_error, + properties, + kind, + ): + create_schema_calls.append( + schema_name.sql(dialect="clickhouse") + if hasattr(schema_name, "sql") + else str(schema_name) + ) + + mocker.patch.object(ch_adapter, "_create_schema", side_effect=_capture_create_schema) + + # Call create_schema with the 3-level virtual-catalog-prefixed schema name. + ch_adapter.create_schema("__clickhouse_gw__.mydb") + + assert len(create_schema_calls) == 1, "Expected exactly one _create_schema call" + passed_schema = create_schema_calls[0] + # The virtual catalog prefix must NOT appear in the SQL sent to the wire. + assert "__clickhouse_gw__" not in passed_schema, ( + f"Virtual catalog prefix should be stripped before reaching _create_schema, got: {passed_schema!r}" + ) + + def test_plan_execution_time(): context = Context(config=Config()) context.upsert_model( From 78da02376e3138fe9f60a00a48f5ebdeb5207a23 Mon Sep 17 00:00:00 2001 From: mday-io Date: Fri, 19 Jun 2026 08:53:08 -0400 Subject: [PATCH 5/6] docs(clickhouse): strengthen re-materialization warning for multi-gateway migration Expand the "Adding a second gateway" section to make the cost explicit: - Callout box flagging full re-materialization is required - Distinguish FULL (recreate once) vs INCREMENTAL_BY_TIME_RANGE (full historical backfill) impact - Note that old 2-level names appear as Removed and --forward-only does not help (models are treated as new, not modified) Signed-off-by: Michael Day Signed-off-by: mday-io --- docs/integrations/engines/clickhouse.md | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/docs/integrations/engines/clickhouse.md b/docs/integrations/engines/clickhouse.md index f699489357..4c2aab6e78 100644 --- a/docs/integrations/engines/clickhouse.md +++ b/docs/integrations/engines/clickhouse.md @@ -435,7 +435,16 @@ When a SQLMesh project uses ClickHouse alongside a catalog-aware gateway such as ### Adding a second gateway to an existing ClickHouse-only project -If your project previously used ClickHouse as the only gateway, your models were fingerprinted with 2-level FQNs (`db.table`). Adding a catalog-aware gateway for the first time causes all ClickHouse models to be treated as new versions (their FQNs change to `__{gateway_name}__.db.table`), triggering a full re-materialization on the next `sqlmesh apply`. This is a one-time cost. +!!! warning "Re-materialization required" + Adding a catalog-aware gateway (such as Trino or BigQuery) to a project that previously used ClickHouse as the only gateway triggers a **full re-materialization of every ClickHouse model** on the next `sqlmesh apply`. Plan for this before making the change. + +If your project previously used ClickHouse as the only gateway, your models were fingerprinted with 2-level FQNs (`db.table`). Adding a catalog-aware gateway causes all ClickHouse models to be treated as new versions (their FQNs change to `__{gateway_name}__.db.table`): + +- `FULL` models are recreated once — cost is proportional to the size of each table. +- `INCREMENTAL_BY_TIME_RANGE` models require a **full historical backfill** from the model's configured start date. +- The old 2-level model names appear as **Removed** in the plan and will be cleaned up after the environment TTL expires. + +This is a one-time cost at the transition point and does not recur. There is no way to skip it — `--forward-only` does not apply because SQLMesh treats the 3-level names as new models, not modified ones. ### Virtual catalog naming From 2d23216801a7fc3a682aa369ab31de433eb17d0c Mon Sep 17 00:00:00 2001 From: mday-io Date: Fri, 19 Jun 2026 09:16:15 -0400 Subject: [PATCH 6/6] fix(clickhouse): warn at plan time when virtual catalog triggers re-materialization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a catalog-aware gateway is added to an existing ClickHouse-only project, ClickHouse model FQNs change from 2-level to 3-level, causing SQLMesh to treat them as brand-new snapshots. This triggers full re-materialization — including historical backfills for incremental models — with no user-visible signal before apply. Add _warn_if_virtual_catalog_rematerialization() called from plan() between build() and console.plan(). It detects when new 3-level CH snapshots map to existing 2-level names in the current environment and emits a console warning listing the affected models with a plain-language explanation of the cost (FULL = recreate once, IBTR = full backfill). Signed-off-by: Michael Day Signed-off-by: mday-io --- sqlmesh/core/context.py | 57 ++++++++++++++++ tests/core/test_context.py | 131 +++++++++++++++++++++++++++++++++++++ 2 files changed, 188 insertions(+) diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index 19271debc1..4a0f871c01 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -1445,6 +1445,8 @@ def plan( plan = plan_builder.build() + self._warn_if_virtual_catalog_rematerialization(plan) + if no_auto_categorization or plan.uncategorized: # Prompts are required if the auto categorization is disabled # or if there are any uncategorized snapshots in the plan @@ -2743,6 +2745,61 @@ def _run_plan_tests(self, skip_tests: bool = False) -> t.Optional[ModelTextTestR return result return None + def _warn_if_virtual_catalog_rematerialization(self, plan: "Plan") -> None: + """Warn when ClickHouse models appear as new snapshots solely because a virtual catalog + prefix was added to their FQNs after a catalog-aware gateway joined the project. + + This situation causes every previously-applied ClickHouse model to be treated as brand-new + by SQLMesh, triggering full re-materialization and historical backfills. Emitting a warning + before the plan is displayed gives users a chance to understand the cost before applying. + """ + from sqlglot import exp + + # Collect the set of old 2-level snapshot names from the current environment so we can + # detect which new 3-level names are renames rather than genuinely new models. + old_names: t.Set[str] = set() + for s_id in plan.context_diff.removed_snapshots: + old_names.add(s_id.name) + for name in plan.context_diff.snapshots_by_name: + old_names.add(name) + + affected: t.List[t.Tuple[str, str]] = [] # (new_3level_name, old_2level_name) + + for gateway, adapter in self.engine_adapters.items(): + if not adapter.supports_virtual_catalog() or not adapter._default_catalog: + continue + virtual_catalog = adapter._default_catalog + + for snapshot in plan.new_snapshots: + table = exp.to_table(snapshot.name) + if table.catalog != virtual_catalog: + continue + # Reconstruct the 2-level name that would have been used before injection. + old_name = f"{table.db}.{table.name}" + if old_name in old_names: + affected.append((snapshot.name, old_name)) + + if not affected: + return + + max_display = 10 + model_lines = "\n".join( + f" - {new_name} (was: {old_name})" for new_name, old_name in affected[:max_display] + ) + if len(affected) > max_display: + model_lines += f"\n ... and {len(affected) - max_display} more" + + self.console.log_warning( + "ClickHouse models are being re-materialized due to virtual catalog FQN change.\n\n" + "The following ClickHouse models appear as new because their fully-qualified\n" + "names changed from 2-level (db.table) to 3-level (__gateway__.db.table):\n\n" + f"{model_lines}\n\n" + "FULL models will be recreated once. INCREMENTAL_BY_TIME_RANGE models will\n" + "require a full historical backfill from their configured start date.\n\n" + "This is a one-time cost when first adding a catalog-aware gateway to an\n" + "existing ClickHouse project. To proceed, run `sqlmesh apply`." + ) + @property def _model_tables(self) -> t.Dict[str, str]: """Mapping of model name to physical table name. diff --git a/tests/core/test_context.py b/tests/core/test_context.py index 9508f173d9..365d31d3fd 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -677,6 +677,137 @@ def _capture_create_schema( ) +@pytest.mark.fast +def test_warn_if_virtual_catalog_rematerialization_emits_warning(mocker): + """_warn_if_virtual_catalog_rematerialization must emit a log_warning when new snapshots have + 3-level FQNs that map to existing 2-level FQNs in the current environment, indicating that the + virtual catalog prefix was added to previously-applied ClickHouse models.""" + from unittest.mock import MagicMock + + from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter + from sqlmesh.core.snapshot.definition import SnapshotId + + # Build a minimal Context with no models. + ctx = Context(config=Config()) + + # Create a ClickHouse adapter with a virtual catalog already injected. + ch_adapter = ClickhouseEngineAdapter( + lambda *a, **k: mocker.NonCallableMock(), + dialect="clickhouse", + ) + ch_adapter._default_catalog = "__ch_gw__" + + # Override engine_adapters so the context sees our prepared adapter. + mocker.patch.object( + type(ctx), "engine_adapters", new_callable=PropertyMock, return_value={"ch_gw": ch_adapter} + ) + + # Build a mock snapshot with a 3-level name that has the virtual catalog prefix. + new_snapshot = MagicMock() + new_snapshot.name = "__ch_gw__.mydb.my_table" + + # The old 2-level name must appear in snapshots_by_name so we detect the rename. + old_snapshot_id = SnapshotId(name="mydb.my_table", identifier="abc123") + + context_diff = MagicMock() + context_diff.new_snapshots = {new_snapshot.name: new_snapshot} + context_diff.removed_snapshots = {} + context_diff.snapshots_by_name = {"mydb.my_table": MagicMock()} + + plan = MagicMock() + plan.new_snapshots = [new_snapshot] + plan.context_diff = context_diff + + warning_mock = mocker.patch.object(ctx.console, "log_warning") + + ctx._warn_if_virtual_catalog_rematerialization(plan) + + warning_mock.assert_called_once() + warning_text = warning_mock.call_args[0][0] + assert "__ch_gw__" in warning_text + assert "mydb.my_table" in warning_text + + +@pytest.mark.fast +def test_warn_if_virtual_catalog_rematerialization_no_warning_when_genuinely_new(mocker): + """_warn_if_virtual_catalog_rematerialization must NOT warn when there is no matching old + 2-level name — i.e. the model is a brand-new model, not a renamed existing one.""" + from unittest.mock import MagicMock + + from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter + + ctx = Context(config=Config()) + + ch_adapter = ClickhouseEngineAdapter( + lambda *a, **k: mocker.NonCallableMock(), + dialect="clickhouse", + ) + ch_adapter._default_catalog = "__ch_gw__" + + mocker.patch.object( + type(ctx), "engine_adapters", new_callable=PropertyMock, return_value={"ch_gw": ch_adapter} + ) + + new_snapshot = MagicMock() + new_snapshot.name = "__ch_gw__.mydb.brand_new_table" + + context_diff = MagicMock() + context_diff.new_snapshots = {new_snapshot.name: new_snapshot} + context_diff.removed_snapshots = {} + # No matching old name. + context_diff.snapshots_by_name = {} + + plan = MagicMock() + plan.new_snapshots = [new_snapshot] + plan.context_diff = context_diff + + warning_mock = mocker.patch.object(ctx.console, "log_warning") + + ctx._warn_if_virtual_catalog_rematerialization(plan) + + warning_mock.assert_not_called() + + +@pytest.mark.fast +def test_warn_if_virtual_catalog_rematerialization_no_warning_without_virtual_catalog(mocker): + """_warn_if_virtual_catalog_rematerialization must NOT warn when the ClickHouse adapter has no + virtual catalog injected (i.e. _default_catalog is None).""" + from unittest.mock import MagicMock + + from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter + + ctx = Context(config=Config()) + + ch_adapter = ClickhouseEngineAdapter( + lambda *a, **k: mocker.NonCallableMock(), + dialect="clickhouse", + ) + # No virtual catalog injected — adapter stays at 2-level mode. + assert ch_adapter._default_catalog is None + + mocker.patch.object( + type(ctx), "engine_adapters", new_callable=PropertyMock, return_value={"ch_gw": ch_adapter} + ) + + new_snapshot = MagicMock() + new_snapshot.name = "mydb.my_table" + + context_diff = MagicMock() + context_diff.new_snapshots = {new_snapshot.name: new_snapshot} + context_diff.removed_snapshots = {} + context_diff.snapshots_by_name = {} + + plan = MagicMock() + plan.new_snapshots = [new_snapshot] + plan.context_diff = context_diff + + warning_mock = mocker.patch.object(ctx.console, "log_warning") + + ctx._warn_if_virtual_catalog_rematerialization(plan) + + warning_mock.assert_not_called() + + def test_plan_execution_time(): context = Context(config=Config()) context.upsert_model(