glassflow/glassflow-python-sdk

GlassFlow Python SDK


A Python SDK for creating and managing data pipelines between Kafka and ClickHouse.

Features

  • Create and manage data pipelines between Kafka and ClickHouse
  • Ingest from Kafka sources or OTLP signals (logs, metrics, traces)
  • Unified transforms pipeline: dedup, filter, and stateless transformations
  • Temporal joins between sources based on a common key with a given time window
  • Per-source Schema Registry integration
  • Pipeline configuration via YAML or JSON
  • Schema validation and configuration management
  • Fine-grained resource control per pipeline component
  • Enterprise Edition client (glassflow.ee) with DLQ reprocessing and discard

Installation

pip install glassflow

Quick Start

Initialize client

from glassflow.etl import Client

client = Client(host="your-glassflow-etl-url")

Create a pipeline

The example below uses pipeline version v3. See Migrating from V2 to V3 if you have existing v2 configurations.

pipeline_config = {
    "version": "v3",
    "pipeline_id": "my-pipeline-id",
    "sources": [
        {
            "type": "kafka",
            "source_id": "users",
            "connection_params": {
                "brokers": ["my.kafka.broker:9093"],
                "protocol": "PLAINTEXT",
            },
            "topic": "users",
            "consumer_group_initial_offset": "latest",
            "schema_fields": [
                {"name": "event_id",   "type": "string"},
                {"name": "user_id",    "type": "string"},
                {"name": "created_at", "type": "string"},
                {"name": "name",       "type": "string"},
                {"name": "email",      "type": "string"},
            ],
        }
    ],
    "transforms": [
        {
            "type": "dedup",
            "source_id": "users",
            "config": {
                "key": "event_id",
                "time_window": "1h",
            },
        }
    ],
    "sink": {
        "type": "clickhouse",
        "connection_params": {
            "host": "my.clickhouse.server",
            "port": "9000",
            "database": "default",
            "username": "default",
            "password": "mysecret",
            "secure": False,
        },
        "table": "users",
        "mapping": [
            {"name": "event_id",   "column_name": "event_id",   "column_type": "UUID"},
            {"name": "user_id",    "column_name": "user_id",    "column_type": "UUID"},
            {"name": "created_at", "column_name": "created_at", "column_type": "DateTime"},
            {"name": "name",       "column_name": "name",       "column_type": "String"},
            {"name": "email",      "column_name": "email",      "column_type": "String"},
        ],
    },
}

pipeline = client.create_pipeline(pipeline_config)
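
Before sending a configuration to the backend, it can help to check that its parts refer to each other consistently. The following is a minimal sketch, not part of the SDK: `check_pipeline_config` is a hypothetical helper, and it assumes a pipeline where every sink mapping name comes straight from a source's `schema_fields` (no transform adds or renames fields).

```python
def check_pipeline_config(config: dict) -> list:
    """Return a list of human-readable problems found in a v3-style config dict.

    Hypothetical helper for illustration only; the SDK performs its own validation.
    """
    problems = []
    sources = {s["source_id"]: s for s in config.get("sources", [])}

    # Every transform must point at a declared source.
    for transform in config.get("transforms", []):
        if transform["source_id"] not in sources:
            problems.append(
                f"transform {transform['type']!r} references unknown source "
                f"{transform['source_id']!r}"
            )

    # Every sink mapping name should exist in some source's schema_fields.
    known_fields = {
        field["name"]
        for source in sources.values()
        for field in source.get("schema_fields", [])
    }
    for mapping in config.get("sink", {}).get("mapping", []):
        if mapping["name"] not in known_fields:
            problems.append(f"sink mapping uses unknown field {mapping['name']!r}")

    return problems


# A trimmed-down config in the same shape as the example above.
good_config = {
    "sources": [
        {"source_id": "users", "schema_fields": [{"name": "event_id", "type": "string"}]}
    ],
    "transforms": [{"type": "dedup", "source_id": "users", "config": {"key": "event_id"}}],
    "sink": {"mapping": [{"name": "event_id", "column_name": "event_id", "column_type": "UUID"}]},
}

# The same config with a misspelled source_id and an undeclared field.
bad_config = {
    "sources": good_config["sources"],
    "transforms": [{"type": "dedup", "source_id": "user", "config": {"key": "event_id"}}],
    "sink": {"mapping": [{"name": "email", "column_name": "email", "column_type": "String"}]},
}

good_problems = check_pipeline_config(good_config)
bad_problems = check_pipeline_config(bad_config)
```

Here `good_problems` is empty, while `bad_problems` contains one entry for the unknown source and one for the unknown field.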

You can also load configurations from YAML or JSON files:

pipeline = client.create_pipeline(
    pipeline_config_yaml_path="pipeline.yaml"
)
# or
pipeline = client.create_pipeline(
    pipeline_config_json_path="pipeline.json"
)

For the full configuration reference, including Schema Registry, joins, OTLP sources, and resource controls, see the GlassFlow docs.
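
If the configuration starts out as a Python dict, one way to get a file for `pipeline_config_json_path` is to write it out with the standard `json` module. This is a minimal sketch that uses a trimmed config and a temporary directory; the final `create_pipeline` call is left as a comment because it needs a reachable backend.

```python
import json
import tempfile
from pathlib import Path

# Trimmed for brevity; a real config also needs sources and a sink.
config = {"version": "v3", "pipeline_id": "my-pipeline-id"}

# Write the config to pipeline.json inside a fresh temporary directory.
config_path = Path(tempfile.mkdtemp()) / "pipeline.json"
config_path.write_text(json.dumps(config, indent=2), encoding="utf-8")

# Read it back to confirm the file round-trips to the same dict.
reloaded = json.loads(config_path.read_text(encoding="utf-8"))

# With a configured client, the file could then be passed as:
# pipeline = client.create_pipeline(pipeline_config_json_path=str(config_path))
```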

Get pipeline

pipeline = client.get_pipeline("my-pipeline-id")

List pipelines

pipelines = client.list_pipelines()
for pipeline in pipelines:
    print(f"Pipeline ID: {pipeline['pipeline_id']}, State: {pipeline['state']}")
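
Because each listed item can be read like a dict with `pipeline_id` and `state` keys, the result is easy to summarize. The sketch below groups pipeline IDs by state; the sample data and the state strings are illustrative stand-ins, not real backend output.

```python
from collections import defaultdict

# Sample data shaped like the items printed above (IDs and states are made up).
pipelines = [
    {"pipeline_id": "orders", "state": "Running"},
    {"pipeline_id": "users", "state": "Stopped"},
    {"pipeline_id": "clicks", "state": "Running"},
]


def group_by_state(items):
    """Map each state to the list of pipeline IDs currently in that state."""
    grouped = defaultdict(list)
    for item in items:
        grouped[item["state"]].append(item["pipeline_id"])
    return dict(grouped)


by_state = group_by_state(pipelines)
```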

Stop / Terminate / Resume pipeline

pipeline = client.get_pipeline("my-pipeline-id")

pipeline.stop()                                          # graceful stop → STOPPING
client.stop_pipeline("my-pipeline-id", terminate=True)  # ungraceful    → TERMINATING
pipeline.resume()                                        # restart       → RESUMING

Delete pipeline

Only stopped or terminated pipelines can be deleted.

client.delete_pipeline("my-pipeline-id")
# or
pipeline.delete()

Enterprise Edition

The GlassFlow Enterprise Edition adds capabilities on top of the Open Source engine. The SDK exposes them through a drop-in client that extends the Open Source one. Import Client from glassflow.ee instead of glassflow.etl:

from glassflow.ee import Client

client = Client(host="your-glassflow-etl-url")

The Enterprise client does everything the Open Source client does, plus the Enterprise-only features below. Entitlement is enforced by the backend: calling an Enterprise-only operation against a backend that is not licensed for it raises FeatureNotLicensedError.

DLQ message processing

When a pipeline component fails to process a message, that message lands in the pipeline's dead-letter queue (DLQ). On the Enterprise client, pipeline.dlq adds message management on top of the Open Source operations (state, consume, and purge):

  • list(batch_size, cursor, component): non-destructive paginated read. Returns a page dict with messages (each carrying a stable message_id, component, error, original_message, and received_at), has_more, and next_cursor. Pass component to filter to a single component (ingestor, join, sink, dedup, oltp-receiver), and pass next_cursor back as cursor to page.
  • list_iter(batch_size, component): lazily iterate over every message, paging via the cursor for you. Yields individual messages, so you do not manage the cursor by hand.
  • reprocess(message_ids) / reprocess_all(): move messages back into the pipeline input to be processed again.
  • discard(message_ids) / discard_all(): permanently remove messages.

pipeline = client.get_pipeline("my-pipeline-id")

# Inspect failed messages from the sink only (paged automatically)
ids = [m["message_id"] for m in pipeline.dlq.list_iter(component="sink")]

# Retry them after fixing the underlying issue
pipeline.dlq.reprocess(ids)         # or pipeline.dlq.reprocess_all()

# Or drop the ones you do not want
pipeline.dlq.discard(["seq_200"])   # or pipeline.dlq.discard_all()
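
For cases where `list_iter` is not a good fit, the cursor can be driven by hand. The sketch below shows the loop against `FakeDLQ`, a hypothetical in-memory stand-in that only mimics the page shape described above (`messages`, `has_more`, `next_cursor`); the integer-like cursor it uses is an assumption, and a real cursor should be treated as opaque.

```python
class FakeDLQ:
    """In-memory stand-in for pipeline.dlq (hypothetical, for illustration only)."""

    def __init__(self, messages):
        self._messages = messages

    def list(self, batch_size=100, cursor=None, component=None):
        # Filter by component first, then slice out one page.
        items = [
            m for m in self._messages
            if component is None or m["component"] == component
        ]
        start = int(cursor) if cursor else 0
        page = items[start:start + batch_size]
        end = start + len(page)
        has_more = end < len(items)
        return {
            "messages": page,
            "has_more": has_more,
            "next_cursor": str(end) if has_more else None,
        }


def collect_ids(dlq, component=None, batch_size=100):
    """Page through the DLQ and return every message_id, passing next_cursor back."""
    ids, cursor = [], None
    while True:
        page = dlq.list(batch_size=batch_size, cursor=cursor, component=component)
        ids.extend(m["message_id"] for m in page["messages"])
        if not page["has_more"]:
            return ids
        cursor = page["next_cursor"]


dlq = FakeDLQ([
    {"message_id": "seq_1", "component": "sink"},
    {"message_id": "seq_2", "component": "ingestor"},
    {"message_id": "seq_3", "component": "sink"},
    {"message_id": "seq_4", "component": "sink"},
    {"message_id": "seq_5", "component": "ingestor"},
])

sink_ids = collect_ids(dlq, component="sink", batch_size=2)
all_ids = collect_ids(dlq, batch_size=2)
```

With a real Enterprise client, `collect_ids(pipeline.dlq, component="sink")` would follow the same loop, assuming `list` accepts these keyword arguments as described.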

Reprocessing replays messages through the running pipeline, so the pipeline must be in the Running state. Calling reprocess on a stopped, terminated, or failed pipeline raises PipelineNotRunningError. Discard acts on the queue directly and works in any state.

reprocess and discard accept at most 1000 message_id values per call. For larger sets, use the *_all variants. See the DLQ documentation for the full reference.
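
When a selected subset of messages exceeds the limit and the *_all variants would be too broad, the IDs can be split into batches of at most 1000. A minimal sketch follows; `batched_ids` is a hypothetical helper, and the SDK calls are shown as comments because they need a running pipeline.

```python
def batched_ids(ids, size=1000):
    """Yield consecutive slices of ids, each holding at most `size` entries."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(ids), size):
        yield ids[start:start + size]


# 2500 illustrative message IDs.
ids = [f"seq_{n}" for n in range(2500)]
batches = list(batched_ids(ids))
batch_sizes = [len(batch) for batch in batches]

# With an Enterprise client, each batch could then be submitted separately:
# for batch in batches:
#     pipeline.dlq.reprocess(batch)
```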

Migrating from V2 to V3

Pipeline version v2 has been removed. Use Client.migrate_pipeline_v2_to_v3() to convert an existing configuration automatically:

from glassflow.etl import Client

client = Client(host="your-glassflow-etl-url")
v2_config = ...  # your existing v2 pipeline config dict
v3_config = client.migrate_pipeline_v2_to_v3(v2_config)
pipeline = client.create_pipeline(v3_config)

If you prefer to migrate manually, the key changes are:

| Area | V2 | V3 |
|---|---|---|
| version | "v2" | "v3" |
| Sources | source: {type, connection_params, topics: [...]} | sources: [{type, source_id, connection_params, topic, ...}] flat list |
| Schema | top-level schema.fields block | sources[].schema_fields per source |
| Deduplication | per-topic deduplication: {enabled, id_field, ...} | transforms: [{type: "dedup", source_id, config: {key, time_window}}] |
| Filter | top-level filter: {enabled, expression} | transforms: [{type: "filter", source_id, config: {expression}}] |
| Transformation | top-level stateless_transformation | transforms: [{type: "stateless", source_id, config: {transforms: [...]}}] |
| Join | join.sources: [{source_id, key, orientation}] | join: {left_source: {...}, right_source: {...}, output_fields: [...]} |
| Sink connection | flat fields (host, port, ...) at top level | nested sink.connection_params object |
| Sink field mapping | top-level schema.fields with source_id | sink.mapping list of {name, column_name, column_type} |
| Resources | pipeline_resources: {ingestor, transform, ...} | resources: {sources: [...], transform: [...], ...} |
| Sink password | base64-encoded | plain text |
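
As one concrete step of a manual migration, the sink password changes from base64-encoded to plain text. The sketch below decodes it with the standard `base64` module; `v2_password_to_v3` is a hypothetical helper name, and it assumes the v2 value is standard base64 of a UTF-8 string.

```python
import base64


def v2_password_to_v3(encoded: str) -> str:
    """Decode a base64-encoded v2 sink password into the plain text v3 expects."""
    return base64.b64decode(encoded).decode("utf-8")


# Build an example v2 value from a known password, then convert it back.
v2_password = base64.b64encode(b"mysecret").decode("ascii")
v3_password = v2_password_to_v3(v2_password)
```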

Tracking

The SDK includes anonymous usage stats collection to help improve the product. It collects non-identifying information such as SDK version, Python version, and feature flags (e.g., whether joins or deduplication are enabled). No personally identifiable information is collected.

Usage stats collection is enabled by default. To disable it, set an environment variable:

export GF_USAGESTATS_ENABLED=false

or disable it on the client:

client.disable_usagestats()
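
The environment variable can also be set from Python, for example in a script or test setup. This is a sketch under one assumption that is not confirmed here: that the SDK reads `GF_USAGESTATS_ENABLED` no earlier than import time, so the variable is set before the import.

```python
import os

# Assumption: the SDK checks this variable at import or client creation,
# so it is set before glassflow is imported.
os.environ["GF_USAGESTATS_ENABLED"] = "false"

usage_stats_setting = os.environ["GF_USAGESTATS_ENABLED"]

# The client would then be created as usual:
# from glassflow.etl import Client
# client = Client(host="your-glassflow-etl-url")
```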

Development

Setup

  1. Clone the repository
  2. Create a virtual environment
  3. Install dependencies:

uv venv
source .venv/bin/activate
uv pip install -e ".[dev]"

Testing

pytest
