51 changes: 50 additions & 1 deletion docs/integrations/engines/clickhouse.md
@@ -420,6 +420,54 @@ If a model has many records in each partition, you may see additional performanc

Choose a model's time partitioning granularity based on the characteristics of the data it will process, making sure the total number of partitions is 1000 or fewer.

## Multi-gateway setup

ClickHouse does not have a catalog concept — its fully-qualified table names are two-level (`database.table`), not three-level (`catalog.database.table`).

When a SQLMesh project uses ClickHouse alongside a catalog-aware gateway such as Trino or BigQuery, the two gateway types produce FQNs with different nesting depths. SQLMesh's internal schema tracking requires uniform nesting, so it assigns a **virtual catalog** to ClickHouse models at load time.
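
To make the nesting mismatch concrete, here is a minimal sketch in plain Python. It is not SQLMesh's schema-tracking code: the `nesting_depth` helper and the sample model names are hypothetical, and it assumes identifiers contain no quoted dots.

```python
def nesting_depth(fqn: str) -> int:
    """Count the dot-separated parts of a fully-qualified name."""
    return len(fqn.split("."))


# Hypothetical model names from a mixed-gateway project.
trino_model = "lake.analytics.orders"  # catalog.database.table
clickhouse_model = "mydb.mytable"      # database.table

# The two gateway types disagree on depth (3 vs 2).
depths = {nesting_depth(trino_model), nesting_depth(clickhouse_model)}
mixed = len(depths) > 1

# Prefixing a virtual catalog brings the ClickHouse name to three levels.
aligned = f"__ch_prod__.{clickhouse_model}"
uniform = nesting_depth(aligned) == nesting_depth(trino_model)
```

After the prefix is added, both names have the same depth, which is the property the uniform-nesting requirement needs.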

### How the virtual catalog works

- SQLMesh automatically detects the nesting mismatch and injects a virtual catalog into each ClickHouse adapter when a catalog-aware gateway is also present.
- ClickHouse models will appear with three-level FQNs in `sqlmesh plan` output and logs — for example, `__ch_prod__.mydb.mytable` for a gateway named `ch_prod`.
- The virtual catalog prefix is **never sent to ClickHouse**. It is stripped from every DDL and DML statement before execution.
- When ClickHouse is the only gateway in a project, no virtual catalog is assigned and models remain two-level.
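
The stripping behavior described above can be sketched on plain dotted strings. This is a simplified illustration only: the adapter in this change operates on parsed table expressions rather than strings, and the `strip_virtual_catalog` function below is a hypothetical stand-in that assumes unquoted identifiers.

```python
from typing import Optional


def strip_virtual_catalog(fqn: str, virtual_catalog: Optional[str]) -> str:
    """Drop the leading catalog part when it equals the virtual catalog.

    Names without a catalog, or with a different catalog, are returned unchanged.
    """
    parts = fqn.split(".")
    if virtual_catalog and len(parts) == 3 and parts[0] == virtual_catalog:
        return ".".join(parts[1:])
    return fqn


# The three-level name shown in plan output becomes a two-level ClickHouse name.
sent_to_clickhouse = strip_virtual_catalog("__ch_prod__.mydb.mytable", "__ch_prod__")
```

When no virtual catalog has been assigned (the ClickHouse-only case), passing `None` leaves two-level names untouched.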

### Adding a second gateway to an existing ClickHouse-only project

!!! warning "Re-materialization required"
    Adding a catalog-aware gateway (such as Trino or BigQuery) to a project that previously used ClickHouse as the only gateway triggers a **full re-materialization of every ClickHouse model** the next time a `sqlmesh plan` is applied. Plan for this before making the change.

If your project previously used ClickHouse as the only gateway, your models were fingerprinted with 2-level FQNs (`db.table`). Adding a catalog-aware gateway causes all ClickHouse models to be treated as new versions (their FQNs change to `__{gateway_name}__.db.table`):

- `FULL` models are recreated once — cost is proportional to the size of each table.
- `INCREMENTAL_BY_TIME_RANGE` models require a **full historical backfill** from the model's configured start date.
- The old 2-level model names appear as **Removed** in the plan and will be cleaned up after the environment TTL expires.

This is a one-time cost at the transition point and does not recur. There is no way to skip it — `--forward-only` does not apply because SQLMesh treats the 3-level names as new models, not modified ones.
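
The following sketch illustrates, at the level of names only, why the plan reports removals and additions rather than modifications. It does not reproduce how SQLMesh fingerprints or diffs models; `classify` and the sample names are hypothetical.

```python
from typing import Dict, List, Set


def classify(before: Set[str], after: Set[str]) -> Dict[str, List[str]]:
    """Compare two sets of model names the way a name-keyed diff would."""
    return {
        "removed": sorted(before - after),
        "added": sorted(after - before),
        "unchanged": sorted(before & after),
    }


# Names before the second gateway was added (two-level).
before = {"db.orders", "db.users"}
# Names after a virtual catalog is assigned to a gateway named "clickhouse".
after = {f"__clickhouse__.{name}" for name in before}

diff = classify(before, after)
```

Because no name survives the transition, every model lands in either `removed` or `added`, and nothing is left to treat as a modified version.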

### Virtual catalog naming

By default, the virtual catalog name is derived from **the gateway name you chose in your config**, wrapped in double underscores — for example, a gateway named `clickhouse` produces `__clickhouse__`, and a gateway named `ch_prod` produces `__ch_prod__`. The double-underscore wrapping makes it visually clear that this is an internal SQLMesh concept, not a real ClickHouse object.

You can override the default name by setting `virtual_catalog` in your ClickHouse connection configuration:

```yaml
gateways:
  clickhouse:
    connection:
      type: clickhouse
      host: my-clickhouse-host
      username: default
      virtual_catalog: ch_virtual  # optional; defaults to __{gateway_name}__ (e.g. __clickhouse__)
  trino:
    connection:
      type: trino
      ...
```

With this configuration, ClickHouse models will appear as `ch_virtual.mydb.mytable` in plan output instead of `__clickhouse__.mydb.mytable`.
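
The naming and validation rules can be summarized in a standalone sketch. It mirrors the behavior described in this change (default of `__{gateway_name}__`, no empty strings, no dots) but is not the SQLMesh implementation: `resolve_virtual_catalog` is a hypothetical helper, and it raises `ValueError` where SQLMesh raises its own configuration error.

```python
from typing import Optional


def resolve_virtual_catalog(gateway: str, configured: Optional[str] = None) -> str:
    """Return the virtual catalog name for a gateway, honoring an override."""
    if configured is None:
        # Default: wrap the gateway name in double underscores.
        return f"__{gateway}__"
    if not configured.strip():
        raise ValueError("virtual_catalog cannot be an empty string")
    if "." in configured:
        raise ValueError(f"virtual_catalog must be a single identifier (got: {configured!r})")
    return configured


default_name = resolve_virtual_catalog("clickhouse")
custom_name = resolve_virtual_catalog("clickhouse", "ch_virtual")
```

An override is used verbatim, without the double-underscore wrapping, so pick a name that cannot be mistaken for a real catalog in another gateway.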

## Local/Built-in Scheduler

**Engine Adapter Type**: `clickhouse`
@@ -446,4 +494,5 @@ If a model has many records in each partition, you may see additional performanc
| `server_host_name` | The ClickHouse server hostname as identified by the CN or SNI of its TLS certificate. Set this to avoid SSL errors when connecting through a proxy or tunnel with a different hostname. | string | N |
| `tls_mode` | Controls advanced TLS behavior. `proxy` and `strict` do not invoke a ClickHouse mutual TLS connection but do send the client certificate and key. `mutual` assumes ClickHouse mutual TLS authentication with a client certificate. | string | N |
| `connection_settings` | Additional [connection settings](https://clickhouse.com/docs/integrations/python#settings-argument) | dict | N |
| `connection_pool_options` | Additional [options](https://clickhouse.com/docs/integrations/python#customizing-the-http-connection-pool) for the HTTP connection pool | dict | N |
| `virtual_catalog` | Override the virtual catalog name used when ClickHouse runs alongside a catalog-aware gateway (e.g. Trino). Defaults to `__{gateway_name}__`. See [Multi-gateway setup](#multi-gateway-setup) for details. | string | N |
20 changes: 19 additions & 1 deletion sqlmesh/core/config/connection.py
@@ -2085,6 +2085,7 @@ class ClickhouseConnectionConfig(ConnectionConfig):
    password: t.Optional[str] = None
    port: t.Optional[int] = None
    cluster: t.Optional[str] = None
    virtual_catalog: t.Optional[str] = None
    connect_timeout: int = 10
    send_receive_timeout: int = 300
    query_limit: int = 0
@@ -2119,6 +2120,19 @@ class ClickhouseConnectionConfig(ConnectionConfig):

    _engine_import_validator = _get_engine_import_validator("clickhouse_connect", "clickhouse")

    @field_validator("virtual_catalog")
    def validate_virtual_catalog(cls, v: t.Optional[str]) -> t.Optional[str]:
        if v is not None and not v.strip():
            raise ConfigError(
                "virtual_catalog cannot be an empty string. "
                "Omit the field to use the default synthetic prefix (__<gateway_name>__)."
            )
        if v is not None and "." in v:
            raise ConfigError(
                f"virtual_catalog must be a single identifier with no dots (got: {v!r})"
            )
        return v

    @property
    def _connection_kwargs_keys(self) -> t.Set[str]:
        kwargs = {
@@ -2180,7 +2194,11 @@ def cloud_mode(self) -> bool:

    @property
    def _extra_engine_config(self) -> t.Dict[str, t.Any]:
        return {"cluster": self.cluster, "cloud_mode": self.cloud_mode}
        return {
            "cluster": self.cluster,
            "cloud_mode": self.cloud_mode,
            "virtual_catalog": self.virtual_catalog,
        }

    @property
    def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:
22 changes: 21 additions & 1 deletion sqlmesh/core/config/scheduler.py
@@ -138,9 +138,29 @@ def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:

    def get_default_catalog_per_gateway(self, context: GenericContext) -> t.Dict[str, str]:
        default_catalogs_per_gateway: t.Dict[str, str] = {}
        unsupported_gateways = []

        for gateway, adapter in context.engine_adapters.items():
            if catalog := adapter.default_catalog:
            if adapter.catalog_support.is_unsupported:
                unsupported_gateways.append((gateway, adapter))
            elif catalog := adapter.default_catalog:
                default_catalogs_per_gateway[gateway] = catalog

        # When catalog-aware gateways exist, assign the gateway name as a virtual catalog for
        # catalog-unsupported gateways that opt in (e.g. ClickHouse) so that all models in the
        # project have a uniform 3-level FQN and the MappingSchema nesting level check passes.
        # Only adapters that explicitly return True from supports_virtual_catalog() are mutated;
        # other UNSUPPORTED adapters are left unchanged to avoid silent breakage.
        if default_catalogs_per_gateway and unsupported_gateways:
            for gateway, adapter in unsupported_gateways:
                if adapter.supports_virtual_catalog():
                    adapter.inject_virtual_catalog(gateway)
                    # Read the actual virtual catalog name back from the adapter; it may differ
                    # from the gateway name if the user configured a custom virtual_catalog value.
                    # inject_virtual_catalog() always sets _default_catalog so default_catalog
                    # cannot return None at this point.
                    default_catalogs_per_gateway[gateway] = adapter.default_catalog  # type: ignore[assignment]

        return default_catalogs_per_gateway


10 changes: 10 additions & 0 deletions sqlmesh/core/context.py
@@ -487,6 +487,7 @@ def engine_adapter(self) -> EngineAdapter:
    @property
    def snapshot_evaluator(self) -> SnapshotEvaluator:
        if not self._snapshot_evaluator:
            self._ensure_virtual_catalog_injection()
            self._snapshot_evaluator = SnapshotEvaluator(
                {
                    gateway: adapter.with_settings(execute_log_level=logging.INFO)
@@ -497,6 +498,15 @@ def snapshot_evaluator(self) -> SnapshotEvaluator:
            )
        return self._snapshot_evaluator

    def _ensure_virtual_catalog_injection(self) -> None:
        """Ensure virtual catalog injection has run before adapters are cloned for SnapshotEvaluator.

        Injection is a side effect of get_default_catalog_per_gateway. In normal usage it fires
        earlier (default_catalog is accessed during model loading), but this guard covers the edge
        case where snapshot_evaluator is accessed directly on a fresh context before any model ops.
        """
        _ = self.default_catalog_per_gateway

    def execution_context(
        self,
        deployability_index: t.Optional[DeployabilityIndex] = None,
25 changes: 25 additions & 0 deletions sqlmesh/core/engine_adapter/base.py
@@ -218,6 +218,31 @@ def comments_enabled(self) -> bool:
    def catalog_support(self) -> CatalogSupport:
        return CatalogSupport.UNSUPPORTED

    def supports_virtual_catalog(self) -> bool:
        """Return True if this adapter can accept a virtual catalog for multi-gateway nesting alignment.

        When a project mixes catalog-aware gateways (e.g. DuckDB) with catalog-unsupported gateways
        (e.g. ClickHouse), all adapters need a uniform 3-level FQN so MappingSchema nesting stays
        consistent. Adapters that return True here opt in to receiving an injected virtual catalog
        via inject_virtual_catalog(), which causes the set_catalog decorator to strip the catalog
        from DDL expressions rather than raising UnsupportedCatalogOperationError.
        """
        return False

    def inject_virtual_catalog(self, gateway: str) -> None:
        """Inject a gateway name to configure the adapter's virtual catalog.

        The adapter determines the final catalog name from the gateway name (e.g. ClickHouse
        wraps it as __{gateway}__). Only call this on adapters that return True from
        supports_virtual_catalog(). After injection, catalog_support should return
        SINGLE_CATALOG_ONLY so the set_catalog decorator strips the virtual catalog from DDL
        expressions instead of raising an error.
        """
        raise NotImplementedError(
            f"{self.dialect} does not support virtual catalog injection. "
            "Override supports_virtual_catalog() to return True and implement inject_virtual_catalog()."
        )

    @cached_property
    def schema_differ(self) -> SchemaDiffer:
        return SchemaDiffer(
54 changes: 53 additions & 1 deletion sqlmesh/core/engine_adapter/clickhouse.py
@@ -8,6 +8,7 @@
from sqlmesh.core.engine_adapter.mixins import LogicalMergeMixin
from sqlmesh.core.engine_adapter.base import EngineAdapterWithIndexSupport
from sqlmesh.core.engine_adapter.shared import (
    CatalogSupport,
    DataObject,
    DataObjectType,
    EngineRunMode,
@@ -42,6 +43,22 @@ class ClickhouseEngineAdapter(EngineAdapterWithIndexSupport, LogicalMergeMixin):
    DEFAULT_TABLE_ENGINE = "MergeTree"
    ORDER_BY_TABLE_ENGINE_REGEX = "^.*?MergeTree.*$"

    @property
    def catalog_support(self) -> CatalogSupport:
        # This property is intentionally dynamic: it transitions from UNSUPPORTED to
        # SINGLE_CATALOG_ONLY after inject_virtual_catalog() sets _default_catalog. Callers must
        # not cache the result; always read it live so they see the post-injection state.
        if self._default_catalog:
            return CatalogSupport.SINGLE_CATALOG_ONLY
        return CatalogSupport.UNSUPPORTED

    def supports_virtual_catalog(self) -> bool:
        return True

    def inject_virtual_catalog(self, gateway: str) -> None:
        configured = self._extra_config.get("virtual_catalog")
        self._default_catalog = f"__{gateway}__" if configured is None else configured

    @property
    def engine_run_mode(self) -> EngineRunMode:
        if self._extra_config.get("cloud_mode"):
@@ -172,10 +189,27 @@ def create_schema(

        Clickhouse has a two-level naming scheme [database].[table].
        """
        from sqlmesh.utils.errors import SQLMeshError

        properties_copy = properties.copy()
        if self.engine_run_mode.is_cluster:
            properties_copy.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))

        # ClickHouse does not support catalogs. When a virtual catalog has been injected
        # (self._default_catalog is set), strip it from the schema name. This mirrors the
        # SINGLE_CATALOG_ONLY branch in the set_catalog decorator, which does not apply here
        # because this override is not wrapped by @set_catalog().
        if self._default_catalog:
            schema_exp = to_schema(schema_name)
            catalog_name = schema_exp.catalog
            if catalog_name:
                if catalog_name != self._default_catalog:
                    raise SQLMeshError(
                        f"clickhouse requires that all catalog operations be against a single catalog: "
                        f"{self._default_catalog}. Provided catalog: {catalog_name}"
                    )
                schema_name = self._strip_virtual_catalog(schema_exp)

        # can't call super() because it will try to set a catalog
        return self._create_schema(
            schema_name=schema_name,
@@ -446,6 +480,7 @@ def insert_overwrite_by_partition(
        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        source_columns: t.Optional[t.List[str]] = None,
    ) -> None:
        table_name = self._strip_virtual_catalog(table_name)
        source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
            query_or_df,
            target_columns_to_types,
@@ -560,6 +595,20 @@ def _create_table(
                target_columns_to_types or self.columns(table_name),
            )

    def _strip_virtual_catalog(self, name: "TableName") -> exp.Table:
        """Strip the virtual catalog prefix from a table name if present.

        When a virtual catalog has been injected, ClickHouse table names carry a
        synthetic catalog prefix (e.g. ``__gw__``) so they match the 3-level FQN
        depth of catalog-aware peers. This helper removes that prefix before any
        SQL is sent to the wire, since ClickHouse only supports a two-level
        ``[database].[table]`` naming scheme.
        """
        table = exp.to_table(name)
        if self._default_catalog and table.catalog == self._default_catalog:
            table.set("catalog", None)
        return table

    def _exchange_tables(
        self,
        old_table_name: TableName,
@@ -598,7 +647,7 @@ def _rename_table(
        self.execute(f"RENAME TABLE {old_table_sql} TO {new_table_sql}{self._on_cluster_sql()}")

    def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expr]) -> None:
        delete_expr = exp.delete(table_name, where)
        delete_expr = exp.delete(self._strip_virtual_catalog(table_name), where)
        if self.engine_run_mode.is_cluster:
            delete_expr.set("cluster", exp.OnCluster(this=exp.to_identifier(self.cluster)))
        self.execute(delete_expr)
@@ -614,6 +663,9 @@ def alter_table(
        for alter_expression in [
            x.expression if isinstance(x, TableAlterOperation) else x for x in alter_expressions
        ]:
            if self._default_catalog and isinstance(alter_expression.this, exp.Table):
                if alter_expression.this.catalog == self._default_catalog:
                    alter_expression.this.set("catalog", None)
            if self.engine_run_mode.is_cluster:
                alter_expression.set(
                    "cluster", exp.OnCluster(this=exp.to_identifier(self.cluster))