From a1cfaca46cd0fc7ab02e6e0476525c125e6aa35f Mon Sep 17 00:00:00 2001 From: Nicoleta Lazar <2325898+nicolazar@users.noreply.github.com> Date: Tue, 30 Jun 2026 23:53:25 +0100 Subject: [PATCH] [elixir] feat: Support full table descriptor options in create_table MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit brings the Elixir binding's create_table descriptor to full parity with the native builder by also supporting :bucket_keys, :partition_keys, :custom_properties, and :comment. The optional fields are bundled into a Fluss.TableDescriptor.Options struct decoded across the NIF boundary as a NifStruct (NifTableOptions), rather than growing table_descriptor_new to seven positional arguments. This mirrors the existing NifDatabaseDescriptor pattern and the Python binding, which builds the core descriptor with unconditional .properties / .custom_properties / .partitioned_by / .distributed_by calls. :properties is now a map (was a list of {k, v} tuples), fixing a write/read asymmetry — get_table_info already reports properties as a map — and matching :custom_properties. 
--- bindings/elixir/lib/fluss/native.ex | 2 +- bindings/elixir/lib/fluss/table_descriptor.ex | 63 +++++++++++++- .../elixir/native/fluss_nif/src/schema.rs | 31 +++++-- .../elixir/test/integration/admin_test.exs | 65 ++++++++++++++ .../elixir/test/table_descriptor_test.exs | 86 +++++++++++++++++++ 5 files changed, 234 insertions(+), 13 deletions(-) create mode 100644 bindings/elixir/test/table_descriptor_test.exs diff --git a/bindings/elixir/lib/fluss/native.ex b/bindings/elixir/lib/fluss/native.ex index 6de93683..3713e720 100644 --- a/bindings/elixir/lib/fluss/native.ex +++ b/bindings/elixir/lib/fluss/native.ex @@ -61,7 +61,7 @@ defmodule Fluss.Native do do: :erlang.nif_error(:nif_not_loaded) # Schema / TableDescriptor - def table_descriptor_new(_schema, _bucket_count, _properties), + def table_descriptor_new(_schema, _options), do: :erlang.nif_error(:nif_not_loaded) # Table diff --git a/bindings/elixir/lib/fluss/table_descriptor.ex b/bindings/elixir/lib/fluss/table_descriptor.ex index b95b5a50..9bd8485f 100644 --- a/bindings/elixir/lib/fluss/table_descriptor.ex +++ b/bindings/elixir/lib/fluss/table_descriptor.ex @@ -19,25 +19,80 @@ defmodule Fluss.TableDescriptor do @moduledoc """ Descriptor for creating a Fluss table. - Options: `:bucket_count`, `:properties` (list of `{key, value}` string tuples). + Bundles a `Fluss.Schema` with optional table settings; pass the result to + `Fluss.Admin.create_table/5`. + + ## Options + + `new!/2` takes a keyword list: + + * `:bucket_count` - number of buckets to distribute the table into + (non-negative integer). Omitted means the server decides. + * `:bucket_keys` - column names to hash for bucketing. For primary-key + tables these must be a subset of the primary key (excluding partition + keys), defaulting to the primary key columns when omitted. For log + tables they are unconstrained. + * `:partition_keys` - column names to partition the table by. 
+ * `:properties` - map of recognized Fluss table properties that the engine + interprets, e.g. `%{"table.log.format" => "ARROW"}`. + * `:custom_properties` - map of arbitrary string metadata (e.g. ownership, + team). Stored verbatim and never interpreted by Fluss. + * `:comment` - table comment. ## Examples Fluss.TableDescriptor.new!(schema) Fluss.TableDescriptor.new!(schema, bucket_count: 3) + Fluss.TableDescriptor.new!(schema, + bucket_count: 4, + bucket_keys: ["id"], + partition_keys: ["dt"], + custom_properties: %{"owner" => "data-platform"}, + comment: "events table" + ) + """ + defmodule Options do + @moduledoc false + # Internal boundary struct decoded by the `table_descriptor_new` NIF. + # Users supply these as a keyword list to `Fluss.TableDescriptor.new!/2`; + # `new!/2` normalizes that into this struct before crossing into Rust. + + @type t :: %__MODULE__{ + bucket_count: non_neg_integer() | nil, + bucket_keys: list(), + partition_keys: list(), + properties: map(), + custom_properties: map(), + comment: String.t() | nil + } + + defstruct bucket_count: nil, + bucket_keys: [], + partition_keys: [], + properties: %{}, + custom_properties: %{}, + comment: nil + end + alias Fluss.Native @type t :: reference() + @doc """ + Builds a table descriptor from a `Fluss.Schema` and a keyword list of options. + + See the module documentation for the available options. Raises `Fluss.Error` + if the descriptor cannot be built (e.g. an invalid bucket-key/partition-key + combination), or `KeyError` if an unknown option key is given. 
+ """ @spec new!(Fluss.Schema.t(), keyword()) :: t() def new!(%Fluss.Schema{} = schema, opts \\ []) do - bucket_count = Keyword.get(opts, :bucket_count) - properties = Keyword.get(opts, :properties, []) + opts_struct = struct!(Fluss.TableDescriptor.Options, opts) - case Native.table_descriptor_new(schema, bucket_count, properties) do + case Native.table_descriptor_new(schema, opts_struct) do {:error, %Fluss.Error{} = err} -> raise err ref -> ref end diff --git a/bindings/elixir/native/fluss_nif/src/schema.rs b/bindings/elixir/native/fluss_nif/src/schema.rs index 410334fc..95b31bf2 100644 --- a/bindings/elixir/native/fluss_nif/src/schema.rs +++ b/bindings/elixir/native/fluss_nif/src/schema.rs @@ -19,6 +19,7 @@ use crate::atoms::to_nif_err; use fluss::error::Error; use fluss::metadata::{self, DataTypes, Schema, TableDescriptor}; use rustler::{NifStruct, NifTaggedEnum, ResourceArc}; +use std::collections::HashMap; pub struct TableDescriptorResource { pub inner: TableDescriptor, @@ -129,11 +130,21 @@ impl NifSchema { } } +#[derive(NifStruct)] +#[module = "Fluss.TableDescriptor.Options"] +pub struct NifTableOptions { + pub bucket_count: Option, + pub bucket_keys: Vec, + pub partition_keys: Vec, + pub properties: HashMap, + pub custom_properties: HashMap, + pub comment: Option, +} + #[rustler::nif] fn table_descriptor_new( schema: NifSchema, - bucket_count: Option, - properties: Vec<(String, String)>, + opts: NifTableOptions, ) -> Result, rustler::Error> { let mut schema_builder = Schema::builder(); for (name, dt) in &schema.columns { @@ -142,14 +153,18 @@ fn table_descriptor_new( if !schema.primary_key.is_empty() { schema_builder = schema_builder.primary_key(schema.primary_key); } + let built_schema = schema_builder.build().map_err(to_nif_err)?; - let mut builder = TableDescriptor::builder().schema(built_schema); - if let Some(count) = bucket_count { - builder = builder.distributed_by(Some(count), vec![]); - } - for (key, value) in properties { - builder = 
builder.property(&key, &value); + let mut builder = TableDescriptor::builder() + .schema(built_schema) + .properties(opts.properties) + .custom_properties(opts.custom_properties) + .partitioned_by(opts.partition_keys) + .distributed_by(opts.bucket_count, opts.bucket_keys); + + if let Some(comment) = opts.comment { + builder = builder.comment(comment); } let descriptor = builder.build().map_err(to_nif_err)?; Ok(ResourceArc::new(TableDescriptorResource { diff --git a/bindings/elixir/test/integration/admin_test.exs b/bindings/elixir/test/integration/admin_test.exs index 8c34187a..1de7a1e7 100644 --- a/bindings/elixir/test/integration/admin_test.exs +++ b/bindings/elixir/test/integration/admin_test.exs @@ -400,4 +400,69 @@ defmodule Fluss.Integration.AdminTest do Fluss.Admin.get_table_schema(admin, @database, table) end end + + describe "create_table/5 with descriptor options" do + test "round-trips comment and custom_properties", %{admin: admin} do + table = "fluss_table_#{:rand.uniform(100_000)}" + + schema = + Fluss.Schema.new() + |> Fluss.Schema.column("id", :int) + |> Fluss.Schema.column("name", :string) + + descriptor = + Fluss.TableDescriptor.new!(schema, + comment: "events table", + custom_properties: %{"owner" => "data-platform"} + ) + + :ok = Fluss.Admin.create_table(admin, @database, table, descriptor, true) + on_exit(fn -> Fluss.Admin.drop_table(admin, @database, table, true) end) + + assert {:ok, info} = Fluss.Admin.get_table_info(admin, @database, table) + assert info.comment == "events table" + assert info.custom_properties == %{"owner" => "data-platform"} + end + + test "round-trips explicit bucket_keys on a primary-key table", %{admin: admin} do + table = "fluss_table_#{:rand.uniform(100_000)}" + + schema = + Fluss.Schema.new() + |> Fluss.Schema.column("id", :int) + |> Fluss.Schema.column("region", :string) + |> Fluss.Schema.primary_key(["id", "region"]) + + descriptor = Fluss.TableDescriptor.new!(schema, bucket_count: 3, bucket_keys: ["id"]) + + :ok = 
Fluss.Admin.create_table(admin, @database, table, descriptor, true) + on_exit(fn -> Fluss.Admin.drop_table(admin, @database, table, true) end) + + assert {:ok, info} = Fluss.Admin.get_table_info(admin, @database, table) + assert info.num_buckets == 3 + assert info.bucket_keys == ["id"] + # An explicit, non-default bucket key (the default would be the full PK). + assert info.is_default_bucket_key == false + end + + test "round-trips partition_keys and reports the table as partitioned", %{admin: admin} do + table = "fluss_table_#{:rand.uniform(100_000)}" + + # Partition keys must be a subset of the primary key for PK tables. + schema = + Fluss.Schema.new() + |> Fluss.Schema.column("id", :int) + |> Fluss.Schema.column("dt", :string) + |> Fluss.Schema.primary_key(["id", "dt"]) + + descriptor = Fluss.TableDescriptor.new!(schema, partition_keys: ["dt"], bucket_count: 2) + + :ok = Fluss.Admin.create_table(admin, @database, table, descriptor, true) + on_exit(fn -> Fluss.Admin.drop_table(admin, @database, table, true) end) + + assert {:ok, info} = Fluss.Admin.get_table_info(admin, @database, table) + assert info.is_partitioned == true + assert info.partition_keys == ["dt"] + end + end end diff --git a/bindings/elixir/test/table_descriptor_test.exs b/bindings/elixir/test/table_descriptor_test.exs new file mode 100644 index 00000000..3c391319 --- /dev/null +++ b/bindings/elixir/test/table_descriptor_test.exs @@ -0,0 +1,86 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +defmodule Fluss.TableDescriptorTest do + use ExUnit.Case, async: true + + alias Fluss.Schema + alias Fluss.TableDescriptor + + describe "new!/2" do + test "returns a descriptor reference for a minimal schema" do + schema = + Schema.new() + |> Schema.column("id", :int) + |> Schema.column("name", :string) + + assert is_reference(TableDescriptor.new!(schema)) + end + + test "returns a descriptor reference with all options set" do + schema = + Schema.new() + |> Schema.column("id", :int) + |> Schema.column("dt", :string) + |> Schema.primary_key(["id", "dt"]) + + descriptor = + TableDescriptor.new!(schema, + bucket_count: 3, + bucket_keys: ["id"], + partition_keys: ["dt"], + properties: %{"table.replication.factor" => "1"}, + custom_properties: %{"owner" => "data-platform"}, + comment: "events table" + ) + + assert is_reference(descriptor) + end + + test "raises Fluss.Error when bucket keys are not a subset of the primary key" do + schema = + Schema.new() + |> Schema.column("id", :int) + |> Schema.column("region", :string) + |> Schema.primary_key(["id"]) + + assert_raise Fluss.Error, fn -> + TableDescriptor.new!(schema, bucket_keys: ["region"]) + end + end + + test "raises Fluss.Error when bucket keys overlap partition keys" do + schema = + Schema.new() + |> Schema.column("id", :int) + |> Schema.column("dt", :string) + + assert_raise Fluss.Error, fn -> + TableDescriptor.new!(schema, partition_keys: ["dt"], bucket_keys: ["dt"]) + end + end + + # `new!/2` rejects unknown keys rather than silently dropping them. 
+ test "raises KeyError on an unknown option" do + schema = Schema.new() |> Schema.column("id", :int) + + assert_raise KeyError, fn -> + TableDescriptor.new!(schema, bogus: 1) + end + end + end +end