diff --git a/cmd/lint/main.go b/cmd/lint/main.go
index 3cfd8b705a..80df86c9e3 100644
--- a/cmd/lint/main.go
+++ b/cmd/lint/main.go
@@ -126,6 +126,9 @@ var flagRules = []linter.FlagRule{
 			"source-bootstrap-server",
 			"update-schema-registry",
 			"worker-configurations",
+			"watermark-column-name",
+			"distributed-by-column-names",
+			"distributed-by-buckets",
 		),
 	),
 	linter.FlagFilter(
@@ -139,6 +142,7 @@ var flagRules = []linter.FlagRule{
 			"schema-registry-api-key",
 			"schema-registry-api-secret",
 			"skip-message-on-error",
+			"distributed-by-column-names",
 		),
 	),
 }
diff --git a/go.mod b/go.mod
index 76b93041dc..adc4c0639a 100644
--- a/go.mod
+++ b/go.mod
@@ -30,7 +30,7 @@ require (
 	github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.9
 	github.com/confluentinc/ccloud-sdk-go-v2/flink v0.9.0
 	github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.3.0
-	github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.17.0
+	github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.18.0
 	github.com/confluentinc/ccloud-sdk-go-v2/iam v0.15.0
 	github.com/confluentinc/ccloud-sdk-go-v2/iam-ip-filtering v0.5.0
 	github.com/confluentinc/ccloud-sdk-go-v2/identity-provider v0.3.0
@@ -164,6 +164,7 @@ require (
 	github.com/charmbracelet/x/term v0.1.1 // indirect
 	github.com/charmbracelet/x/windows v0.1.0 // indirect
 	github.com/cloudflare/circl v1.6.1 // indirect
+	github.com/confluentinc/ccloud-sdk-go-v2-internal/flink-gateway v0.27.0 // indirect
 	github.com/confluentinc/proto-go-setter v0.3.0 // indirect
 	github.com/cyphar/filepath-securejoin v0.2.5 // indirect
 	github.com/distribution/reference v0.6.0 // indirect
diff --git a/go.sum b/go.sum
index 3ed9d35988..b1a1a80221 100644
--- a/go.sum
+++ b/go.sum
@@ -188,6 +188,10 @@ github.com/compose-spec/compose-go/v2 v2.1.3 h1:bD67uqLuL/XgkAK6ir3xZvNLFPxPScEi
 github.com/compose-spec/compose-go/v2 v2.1.3/go.mod h1:lFN0DrMxIncJGYAXTfWuajfwj5haBJqrBkarHcnjJKc=
 github.com/confluentinc/ccloud-sdk-go-v1-public v0.0.0-20250521223017-0e8f6f971b52 h1:19qEGhkbZa5fopKCe0VPIV+Sasby4Pv10z9ZaktwWso=
 github.com/confluentinc/ccloud-sdk-go-v1-public v0.0.0-20250521223017-0e8f6f971b52/go.mod h1:62EMf+5uFEt1BJ2q8WMrUoI9VUSxAbDnmZCGRt/MbA0=
+github.com/confluentinc/ccloud-sdk-go-v2-internal/flink-gateway v0.26.0 h1:SpINYIWdqK1tRGjHsN5Fwq/k/+YIgbJdksnHzB796s0=
+github.com/confluentinc/ccloud-sdk-go-v2-internal/flink-gateway v0.26.0/go.mod h1:zNoX10GjoVTzo4dYii6eXaYhfIi7J61qCYiSNEGGbrI=
+github.com/confluentinc/ccloud-sdk-go-v2-internal/flink-gateway v0.27.0 h1:4nJe4mkLV/gHsYeVb0ZMXsgFbnastCiBv4JLkGTdN/4=
+github.com/confluentinc/ccloud-sdk-go-v2-internal/flink-gateway v0.27.0/go.mod h1:vN5BamRyiS+s+Jy3NNWJCDkgadC8xEKdK7wac970wOI=
 github.com/confluentinc/ccloud-sdk-go-v2/ai v0.1.0 h1:zSF4OQUJXWH2JeAo9rsq13ibk+JFdzITGR8S7cFMpzw=
 github.com/confluentinc/ccloud-sdk-go-v2/ai v0.1.0/go.mod h1:DoxqzzF3JzvJr3fWkvCiOHFlE0GoYpozWxFZ1Ud9ntA=
 github.com/confluentinc/ccloud-sdk-go-v2/apikeys v0.4.0 h1:8fWyLwMuy8ec0MVF5Avd54UvbIxhDFhZzanHBVwgxdw=
@@ -218,8 +222,8 @@ github.com/confluentinc/ccloud-sdk-go-v2/flink v0.9.0 h1:QqtIFEB5E3CIyGMJd7NQBEt
 github.com/confluentinc/ccloud-sdk-go-v2/flink v0.9.0/go.mod h1:GPj4sfR85OyiFQUMNEq1DtPOjYVAuE222Z6Mcapwa48=
 github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.3.0 h1:DVWL3Y4b5azgCADubtyp3EhGZuyJkleINaTy2V3iius=
 github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.3.0/go.mod h1:P4fdIkI1ynjSvhDEGX283KhtzG51eGHQc5Cqtp7bu1Q=
-github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.17.0 h1:8Y1uXjolI2d5mawcfLn4OfJ81WRMQpjMFWdBm3dLdrk=
-github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.17.0/go.mod h1:cJ6erfVlWSyz6L+2dR46cF2+s5I2r+pTNrPm2fNbcqU=
+github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.18.0 h1:KzlhRDrUsXbs4ZPZy6T9OWmFIVkZWHxNsDorHHSnwFs=
+github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.18.0/go.mod h1:CuvhIQpYj/LbQeYzp7Sw2LJkTKzLh8xlFdQoKq9ZQlY=
 github.com/confluentinc/ccloud-sdk-go-v2/iam v0.15.0 h1:37Gjdo+0Ev3g2NPEXyiVm7yTT85AlWbjXYRLvq6Aj9E=
 github.com/confluentinc/ccloud-sdk-go-v2/iam v0.15.0/go.mod h1:jnWqax4kM22sutPGMtGmHqe2usgfqYig4UtmHsLENz0=
 github.com/confluentinc/ccloud-sdk-go-v2/iam-ip-filtering v0.5.0 h1:xD7CXcyAqezFnSVFB4U27oWUY4FlbyciVP0ftDIiI18=
diff --git a/internal/flink/command.go b/internal/flink/command.go
index 75305c1439..9bab43c8c0 100644
--- a/internal/flink/command.go
+++ b/internal/flink/command.go
@@ -55,6 +55,7 @@ func New(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command {
 	cmd.AddCommand(c.newConnectivityTypeCommand())
 	cmd.AddCommand(c.newEndpointCommand())
 	cmd.AddCommand(c.newRegionCommand())
+	cmd.AddCommand(c.newMaterializedTableCommand())

 	return cmd
 }
diff --git a/internal/flink/command_materialized_table.go b/internal/flink/command_materialized_table.go
new file mode 100644
index 0000000000..93069d2546
--- /dev/null
+++ b/internal/flink/command_materialized_table.go
@@ -0,0 +1,86 @@
+package flink
+
+import (
+	"github.com/spf13/cobra"
+
+	pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
+)
+
+type materializedTableOut struct {
+	Name                     string   `human:"Name" serialized:"name"`
+	ClusterID                string   `human:"Kafka Cluster ID" serialized:"kafka_cluster_id"`
+	Environment              string   `human:"Environment" serialized:"environment"`
+	ComputePool              string   `human:"Compute Pool" serialized:"compute_pool"`
+	ServiceAccount           string   `human:"Service Account" serialized:"service_account"`
+	Query                    string   `human:"Query,omitempty" serialized:"query,omitempty"`
+	Columns                  []string `human:"Columns,omitempty" serialized:"columns,omitempty"`
+	WaterMarkColumnName      string   `human:"Watermark Column Name,omitempty" serialized:"watermark_column_name,omitempty"`
+	WaterMarkExpression      string   `human:"Watermark Expression,omitempty" serialized:"watermark_expression,omitempty"`
+	Constraints              []string `human:"Constraints,omitempty" serialized:"constraints,omitempty"`
+	DistributedByColumnNames []string `human:"Distributed By Column Names,omitempty" serialized:"distributed_by_column_names,omitempty"`
+	DistributedByBuckets     int      `human:"Distributed By Buckets,omitempty" serialized:"distributed_by_buckets,omitempty"`
+}
+
+func (c *command) newMaterializedTableCommand() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:         "materialized-table",
+		Short:       "Manage Flink materialized tables.",
+		Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin},
+	}
+
+	cmd.AddCommand(c.newMaterializedTableCreateCommand())
+	cmd.AddCommand(c.newMaterializedTableDeleteCommand())
+	cmd.AddCommand(c.newMaterializedTableDescribeCommand())
+	cmd.AddCommand(c.newMaterializedTableListCommand())
+	cmd.AddCommand(c.newMaterializedTableUpdateCommand())
+	cmd.AddCommand(c.newMaterializedTableStopCommand())
+	cmd.AddCommand(c.newMaterializedTableResumeCommand())
+
+	return cmd
+}
+
+func (c *command) validMaterializedTableArgs(cmd *cobra.Command, args []string) []string {
+	if len(args) > 0 {
+		return nil
+	}
+
+	return c.validMaterializedTablesArgsMultiple(cmd, args)
+}
+
+func (c *command) validMaterializedTablesArgsMultiple(cmd *cobra.Command, args []string) []string {
+	if err := c.PersistentPreRunE(cmd, args); err != nil {
+		return nil
+	}
+
+	environmentId, err := c.Context.EnvironmentId()
+	if err != nil {
+		return nil
+	}
+
+	client, err := c.GetFlinkGatewayClientInternal(false)
+	if err != nil {
+		return nil
+	}
+
+	tables, err := client.ListMaterializedTable(environmentId, c.Context.GetCurrentOrganization())
+	if err != nil {
+		return nil
+	}
+
+	suggestions := make([]string, len(tables))
+	for i, table := range tables {
+		suggestions[i] = table.GetName()
+	}
+	return suggestions
+}
+
+func (c *command) addOptionalMaterializedTableFlags(cmd *cobra.Command) {
+	cmd.Flags().String("column-physical", "", "Path to the columns data for type physical.")
+	cmd.Flags().String("column-metadata", "", "Path to the columns data for type metadata.")
+	cmd.Flags().String("column-computed", "", "Path to the columns data for type computed.")
+	cmd.Flags().String("watermark-column-name", "", "The name of the watermark column.")
+	cmd.Flags().String("watermark-expression", "", "The watermark expression.")
+	cmd.Flags().String("constraints", "", "Path to the constraints data.")
+	cmd.Flags().String("distributed-by-column-names", "", "A comma-separated list of the columns the table is distributed by.")
+	cmd.Flags().Int("distributed-by-buckets", 0, "The number of buckets the table is distributed into.")
+}
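A note on the file-based flags above: `--column-physical`, `--column-metadata`, `--column-computed`, and `--constraints` each take a path to a plain-text file with one entry per line, in the formats enforced by the parsers in `command_materialized_table_create.go` below. Based on those parsers and the integration-test fixtures at the end of this PR, a set of input files would look like (file names here are illustrative):

    columns-physical.txt:   name,type,comment,Physical
    columns-metadata.txt:   name,type,comment,Metadata,key,true
    columns-computed.txt:   name,type,comment,Computed,exp1,true
    constraints.txt:        name,type,colName1|colName2,true

Constraint column lists are pipe-separated so that a multi-column constraint can still live inside a single comma-separated line.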
diff --git a/internal/flink/command_materialized_table_create.go b/internal/flink/command_materialized_table_create.go
new file mode 100644
index 0000000000..22754987ae
--- /dev/null
+++ b/internal/flink/command_materialized_table_create.go
@@ -0,0 +1,406 @@
+package flink
+
+import (
+	"fmt"
+	"os"
+	"strconv"
+	"strings"
+
+	"github.com/spf13/cobra"
+
+	flinkgatewayv1 "github.com/confluentinc/ccloud-sdk-go-v2-internal/flink-gateway/v1"
+
+	pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
+	"github.com/confluentinc/cli/v4/pkg/errors"
+	"github.com/confluentinc/cli/v4/pkg/examples"
+	"github.com/confluentinc/cli/v4/pkg/output"
+	"github.com/confluentinc/cli/v4/pkg/properties"
+)
+
+func (c *command) newMaterializedTableCreateCommand() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:   "create <name>",
+		Short: "Create a Flink materialized table.",
+		Args:  cobra.ExactArgs(1),
+		RunE:  c.materializedTableCreate,
+		Example: examples.BuildExampleString(
+			examples.Example{
+				Text: `Create Flink materialized table "my-table" in AWS us-west-2.`,
+				Code: "confluent flink materialized-table create my-table --cloud aws --region us-west-2 --database lkc01 --compute-pool pool1 --service-account principal1 --query query1",
+			},
+		),
+	}
+
+	cmd.Flags().String("database", "", "The ID of the Kafka cluster hosting the Materialized Table's topic.")
+	cmd.Flags().String("compute-pool", "", "The ID of the compute pool used to run the Materialized Table's query.")
+	cmd.Flags().String("service-account", "", "The ID of the principal this Materialized Table's query runs as.")
+	cmd.Flags().String("query", "", "The SQL query of the Materialized Table.")
+
+	pcmd.AddCloudFlag(cmd)
+	pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand)
+
+	c.addOptionalMaterializedTableFlags(cmd)
+	pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
+	pcmd.AddOutputFlag(cmd)
+
+	cobra.CheckErr(cmd.MarkFlagRequired("database"))
+	cobra.CheckErr(cmd.MarkFlagRequired("compute-pool"))
+	cobra.CheckErr(cmd.MarkFlagRequired("service-account"))
+	cobra.CheckErr(cmd.MarkFlagRequired("query"))
+
+	return cmd
+}
+
+func (c *command) materializedTableCreate(cmd *cobra.Command, args []string) error {
+	kafkaId, err := cmd.Flags().GetString("database")
+	if err != nil {
+		return err
+	}
+
+	computePool, err := cmd.Flags().GetString("compute-pool")
+	if err != nil {
+		return err
+	}
+
+	serviceAccount, err := cmd.Flags().GetString("service-account")
+	if err != nil {
+		return err
+	}
+
+	query, err := cmd.Flags().GetString("query")
+	if err != nil {
+		return err
+	}
+
+	environmentId, err := c.Context.EnvironmentId()
+	if err != nil {
+		return err
+	}
+
+	if _, err := c.V2Client.GetOrgEnvironment(environmentId); err != nil {
+		return errors.NewErrorWithSuggestions(err.Error(), fmt.Sprintf(envNotFoundErrorMsg, environmentId))
+	}
+
+	columnComputed, err := cmd.Flags().GetString("column-computed")
+	if err != nil {
+		return err
+	}
+
+	columnPhysical, err := cmd.Flags().GetString("column-physical")
+	if err != nil {
+		return err
+	}
+
+	columnMetadata, err := cmd.Flags().GetString("column-metadata")
+	if err != nil {
+		return err
+	}
+
+	// Check each parser's error immediately so an earlier failure is not
+	// masked by a later successful call.
+	var colDetails []flinkgatewayv1.SqlV1MaterializedTableColumnDetails
+	if columnComputed != "" {
+		if colDetails, err = addComputedColumns(columnComputed, colDetails); err != nil {
+			return err
+		}
+	}
+	if columnPhysical != "" {
+		if colDetails, err = addPhysicalColumns(columnPhysical, colDetails); err != nil {
+			return err
+		}
+	}
+	if columnMetadata != "" {
+		if colDetails, err = addMetadataColumns(columnMetadata, colDetails); err != nil {
+			return err
+		}
+	}
+
+	watermarkColumnName, err := cmd.Flags().GetString("watermark-column-name")
+	if err != nil {
+		return err
+	}
+
+	watermarkExpression, err := cmd.Flags().GetString("watermark-expression")
+	if err != nil {
+		return err
+	}
+
+	constraints, err := cmd.Flags().GetString("constraints")
+	if err != nil {
+		return err
+	}
+
+	var constr []flinkgatewayv1.SqlV1MaterializedTableConstraint
+	if constraints != "" {
+		if constr, err = addConstraints(constraints, constr); err != nil {
+			return err
+		}
+	}
+
+	distributedByColumnNames, err := cmd.Flags().GetString("distributed-by-column-names")
+	if err != nil {
+		return err
+	}
+
+	distributedByColumnNamesArray := csvToStringSlicePtr(distributedByColumnNames)
+
+	distributedByBuckets, err := cmd.Flags().GetInt("distributed-by-buckets")
+	if err != nil {
+		return err
+	}
+
+	distributedByBucketsInt32 := int32(distributedByBuckets)
+
+	name := args[0]
+	orgId := c.Context.GetCurrentOrganization()
+
+	client, err := c.GetFlinkGatewayClientInternal(false)
+	if err != nil {
+		return err
+	}
+
+	table := flinkgatewayv1.SqlV1MaterializedTable{
+		Name:           name,
+		EnvironmentId:  environmentId,
+		OrganizationId: orgId,
+
+		Spec: flinkgatewayv1.SqlV1MaterializedTableSpec{
+			KafkaClusterId: &kafkaId,
+			ComputePoolId:  &computePool,
+			Principal:      &serviceAccount,
+			Query:          &query,
+			Columns:        &colDetails,
+			Watermark: &flinkgatewayv1.SqlV1MaterializedTableWatermark{
+				ColumnName: &watermarkColumnName,
+				Expression: &watermarkExpression,
+			},
+			DistributedBy: &flinkgatewayv1.SqlV1MaterializedTableDistribution{
+				ColumnNames: distributedByColumnNamesArray,
+				Buckets:     &distributedByBucketsInt32,
+			},
+			Constraints: &constr,
+		},
+	}
+
+	materializedTable, err := client.CreateMaterializedTable(table, environmentId, orgId, kafkaId)
+	if err != nil {
+		return err
+	}
+
+	outputTable := output.NewTable(cmd)
+	mtableOut := materializedTableOut{
+		Name:           materializedTable.GetName(),
+		ClusterID:      materializedTable.Spec.GetKafkaClusterId(),
+		Environment:    materializedTable.GetEnvironmentId(),
+		ComputePool:    materializedTable.Spec.GetComputePoolId(),
+		ServiceAccount: materializedTable.Spec.GetPrincipal(),
+		Query:          materializedTable.Spec.GetQuery(),
+		Columns:        convertToArrayColumns(materializedTable.Spec.GetColumns()),
+		Constraints:    convertToArrayConstraints(materializedTable.Spec.GetConstraints()),
+	}
+
+	if materializedTable.Spec.Watermark != nil {
+		wm := materializedTable.Spec.GetWatermark()
+		mtableOut.WaterMarkColumnName = wm.GetColumnName()
+		mtableOut.WaterMarkExpression = wm.GetExpression()
+	}
+
+	if materializedTable.Spec.DistributedBy != nil {
+		db := materializedTable.Spec.GetDistributedBy()
+		mtableOut.DistributedByColumnNames = db.GetColumnNames()
+		mtableOut.DistributedByBuckets = int(db.GetBuckets())
+	}
+
+	outputTable.Add(&mtableOut)
+	return outputTable.Print()
+}
+
+func addComputedColumns(path string, colDetails []flinkgatewayv1.SqlV1MaterializedTableColumnDetails) ([]flinkgatewayv1.SqlV1MaterializedTableColumnDetails, error) {
+	buf, err := os.ReadFile(path)
+	if err != nil {
+		return nil, err
+	}
+
+	tablesContent := properties.ParseLines(string(buf))
+	for _, line := range tablesContent {
+		values := strings.Split(line, ",")
+		if len(values) != 6 {
+			return nil, fmt.Errorf("the computed column must be in the format [name,type,comment,kind,expression,virtual]")
+		}
+		columnName := values[0]
+		columnType := values[1]
+		columnComment := values[2]
+		columnKind := values[3]
+		columnExpression := values[4]
+		virtual := values[5]
+		var columnVirtual bool
+		if virtual != "" {
+			columnVirtual, err = strconv.ParseBool(virtual)
+			if err != nil {
+				return nil, fmt.Errorf("please enter true or false for the virtual field")
+			}
+		}
+		computedColumn := flinkgatewayv1.SqlV1ComputedColumn{
+			Name:       columnName,
+			Type:       columnType,
+			Comment:    &columnComment,
+			Kind:       columnKind,
+			Expression: columnExpression,
+			Virtual:    &columnVirtual,
+		}
+		column := flinkgatewayv1.SqlV1MaterializedTableColumnDetails{
+			SqlV1ComputedColumn: &computedColumn,
+		}
+		colDetails = append(colDetails, column)
+	}
+	return colDetails, nil
+}
+
+func addMetadataColumns(path string, colDetails []flinkgatewayv1.SqlV1MaterializedTableColumnDetails) ([]flinkgatewayv1.SqlV1MaterializedTableColumnDetails, error) {
+	buf, err := os.ReadFile(path)
+	if err != nil {
+		return nil, err
+	}
+
+	tablesContent := properties.ParseLines(string(buf))
+	for _, line := range tablesContent {
+		values := strings.Split(line, ",")
+		if len(values) != 6 {
+			return nil, fmt.Errorf("the metadata column must be in the format [name,type,comment,kind,key,virtual]")
+		}
+		columnName := values[0]
+		columnType := values[1]
+		columnComment := values[2]
+		columnKind := values[3]
+		columnKey := values[4]
+		virtual := values[5]
+		var columnVirtual bool
+		if virtual != "" {
+			columnVirtual, err = strconv.ParseBool(virtual)
+			if err != nil {
+				return nil, fmt.Errorf("please enter true or false for the virtual field")
+			}
+		}
+		metadataColumn := flinkgatewayv1.SqlV1MetadataColumn{
+			Name:        columnName,
+			Type:        columnType,
+			Comment:     &columnComment,
+			Kind:        columnKind,
+			MetadataKey: columnKey,
+			Virtual:     &columnVirtual,
+		}
+		column := flinkgatewayv1.SqlV1MaterializedTableColumnDetails{
+			SqlV1MetadataColumn: &metadataColumn,
+		}
+		colDetails = append(colDetails, column)
+	}
+	return colDetails, nil
+}
+
+func addPhysicalColumns(path string, colDetails []flinkgatewayv1.SqlV1MaterializedTableColumnDetails) ([]flinkgatewayv1.SqlV1MaterializedTableColumnDetails, error) {
+	buf, err := os.ReadFile(path)
+	if err != nil {
+		return nil, err
+	}
+
+	tablesContent := properties.ParseLines(string(buf))
+	for _, line := range tablesContent {
+		values := strings.Split(line, ",")
+		if len(values) != 4 {
+			return nil, fmt.Errorf("the physical column must be in the format [name,type,comment,kind]")
+		}
+		columnName := values[0]
+		columnType := values[1]
+		columnComment := values[2]
+		columnKind := values[3]
+		physicalColumn := flinkgatewayv1.SqlV1PhysicalColumn{
+			Name:    columnName,
+			Type:    columnType,
+			Comment: &columnComment,
+			Kind:    columnKind,
+		}
+		column := flinkgatewayv1.SqlV1MaterializedTableColumnDetails{
+			SqlV1PhysicalColumn: &physicalColumn,
+		}
+		colDetails = append(colDetails, column)
+	}
+	return colDetails, nil
+}
+
+func addConstraints(path string, constr []flinkgatewayv1.SqlV1MaterializedTableConstraint) ([]flinkgatewayv1.SqlV1MaterializedTableConstraint, error) {
+	buf, err := os.ReadFile(path)
+	if err != nil {
+		return nil, err
+	}
+
+	tablesContent := properties.ParseLines(string(buf))
+	for _, line := range tablesContent {
+		values := strings.Split(line, ",")
+		if len(values) != 4 {
+			return nil, fmt.Errorf("the constraint must be in the format [name,type,colName1|colName2,enforced]")
+		}
+		constraintsName := values[0]
+		constraintsType := values[1]
+		constraintsColumnNames := values[2]
+		constraintsColumnNameArray := pipeToStringSlicePtr(constraintsColumnNames)
+		constraintsEnforced := values[3]
+		constraintsEnforcedBool, err := strconv.ParseBool(constraintsEnforced)
+		if err != nil {
+			return nil, err
+		}
+		constraint := flinkgatewayv1.SqlV1MaterializedTableConstraint{
+			Name:        &constraintsName,
+			Kind:        &constraintsType,
+			ColumnNames: constraintsColumnNameArray,
+			Enforced:    &constraintsEnforcedBool,
+		}
+		constr = append(constr, constraint)
+	}
+	return constr, nil
+}
+
+// csvToStringSlicePtr splits a comma-separated flag value into a string slice.
+func csvToStringSlicePtr(csv string) *[]string {
+	if csv == "" {
+		return &[]string{}
+	}
+	values := strings.Split(csv, ",")
+	return &values
+}
+
+// pipeToStringSlicePtr splits a pipe-separated constraint column list into a string slice.
+func pipeToStringSlicePtr(columns string) *[]string {
+	if columns == "" {
+		return &[]string{}
+	}
+	values := strings.Split(columns, "|")
+	return &values
+}
+
+func convertToArrayColumns(columns []flinkgatewayv1.SqlV1MaterializedTableColumnDetails) []string {
+	var cols []string
+	for _, value := range columns {
+		if value.SqlV1PhysicalColumn != nil {
+			cols = append(cols, fmt.Sprintf("{%s, %s, %s, %s}", value.SqlV1PhysicalColumn.GetName(), value.SqlV1PhysicalColumn.GetType(), value.SqlV1PhysicalColumn.GetComment(), value.SqlV1PhysicalColumn.GetKind()))
+		}
+		if value.SqlV1ComputedColumn != nil {
+			cols = append(cols, fmt.Sprintf("{%s, %s, %s, %s, %s, %t}", value.SqlV1ComputedColumn.GetName(), value.SqlV1ComputedColumn.GetType(), value.SqlV1ComputedColumn.GetComment(), value.SqlV1ComputedColumn.GetKind(), value.SqlV1ComputedColumn.GetExpression(), value.SqlV1ComputedColumn.GetVirtual()))
+		}
+		if value.SqlV1MetadataColumn != nil {
+			cols = append(cols, fmt.Sprintf("{%s, %s, %s, %s, %s, %t}", value.SqlV1MetadataColumn.GetName(), value.SqlV1MetadataColumn.GetType(), value.SqlV1MetadataColumn.GetComment(), value.SqlV1MetadataColumn.GetKind(), value.SqlV1MetadataColumn.GetMetadataKey(), value.SqlV1MetadataColumn.GetVirtual()))
+		}
+	}
+	return cols
+}
+
+func convertToArrayConstraints(constraints []flinkgatewayv1.SqlV1MaterializedTableConstraint) []string {
+	constr := make([]string, 0, len(constraints))
+	for _, value := range constraints {
+		constr = append(constr, fmt.Sprintf("{%s, %s, %s, %t}", value.GetName(), value.GetKind(), value.GetColumnNames(), value.GetEnforced()))
+	}
+	return constr
+}
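Putting the create path together, the integration tests at the end of this PR exercise it end to end; a representative invocation, adapted from `TestFlinkMaterializedTableCreate` (with `constraints.txt` standing in for the temp file the test generates), is:

    confluent flink materialized-table create my-table --cloud aws --region eu-west-1 \
      --database lkc01 --compute-pool pool1 --service-account principal1 \
      --query query1 --constraints constraints.txt --distributed-by-buckets 32 \
      --distributed-by-column-names col1,col2 \
      --watermark-column-name wname1 --watermark-expression wexp1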
diff --git a/internal/flink/command_materialized_table_delete.go b/internal/flink/command_materialized_table_delete.go
new file mode 100644
index 0000000000..e80b78caaf
--- /dev/null
+++ b/internal/flink/command_materialized_table_delete.go
@@ -0,0 +1,70 @@
+package flink
+
+import (
+	"fmt"
+
+	"github.com/spf13/cobra"
+
+	pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
+	"github.com/confluentinc/cli/v4/pkg/deletion"
+	"github.com/confluentinc/cli/v4/pkg/errors"
+	"github.com/confluentinc/cli/v4/pkg/resource"
+)
+
+func (c *command) newMaterializedTableDeleteCommand() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:               "delete <name-1> [name-2] ... [name-n]",
+		Short:             "Delete one or more materialized tables.",
+		Args:              cobra.MinimumNArgs(1),
+		ValidArgsFunction: pcmd.NewValidArgsFunction(c.validMaterializedTablesArgsMultiple),
+		RunE:              c.materializedTableDelete,
+	}
+
+	cmd.Flags().String("database", "", "The ID of the Kafka cluster hosting the Materialized Table's topic.")
+	pcmd.AddCloudFlag(cmd)
+	pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand)
+	pcmd.AddForceFlag(cmd)
+	pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
+	pcmd.AddContextFlag(cmd, c.CLICommand)
+
+	cobra.CheckErr(cmd.MarkFlagRequired("database"))
+
+	return cmd
+}
+
+func (c *command) materializedTableDelete(cmd *cobra.Command, args []string) error {
+	environmentId, err := c.Context.EnvironmentId()
+	if err != nil {
+		return err
+	}
+
+	if _, err := c.V2Client.GetOrgEnvironment(environmentId); err != nil {
+		return errors.NewErrorWithSuggestions(err.Error(), fmt.Sprintf(envNotFoundErrorMsg, environmentId))
+	}
+
+	client, err := c.GetFlinkGatewayClientInternal(false)
+	if err != nil {
+		return err
+	}
+
+	kafkaId, err := cmd.Flags().GetString("database")
+	if err != nil {
+		return err
+	}
+
+	existenceFunc := func(id string) bool {
+		_, err := client.DescribeMaterializedTable(environmentId, c.Context.GetCurrentOrganization(), kafkaId, id)
+		return err == nil
+	}
+
+	if err := deletion.ValidateAndConfirm(cmd, args, existenceFunc, resource.MaterializedTable); err != nil {
+		return err
+	}
+
+	deleteFunc := func(id string) error {
+		return client.DeleteMaterializedTable(environmentId, c.Context.GetCurrentOrganization(), kafkaId, id)
+	}
+
+	_, err = deletion.Delete(cmd, args, deleteFunc, resource.MaterializedTable)
+	return err
+}
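The delete flow follows the CLI's usual pattern: `deletion.ValidateAndConfirm` first probes each name with `existenceFunc` (a describe call), then prompts before `deletion.Delete` runs the per-name `deleteFunc`. The expected transcript is pinned by `delete.golden` further down:

    Are you sure you want to delete Materialized Table "my-table"? (y/n): Deleted Materialized Table "my-table".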
diff --git a/internal/flink/command_materialized_table_describe.go b/internal/flink/command_materialized_table_describe.go
new file mode 100644
index 0000000000..0d6e925681
--- /dev/null
+++ b/internal/flink/command_materialized_table_describe.go
@@ -0,0 +1,92 @@
+package flink
+
+import (
+	"fmt"
+
+	"github.com/spf13/cobra"
+
+	pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
+	"github.com/confluentinc/cli/v4/pkg/errors"
+	"github.com/confluentinc/cli/v4/pkg/output"
+)
+
+func (c *command) newMaterializedTableDescribeCommand() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:               "describe <name>",
+		Short:             "Describe a materialized table.",
+		Args:              cobra.ExactArgs(1),
+		ValidArgsFunction: pcmd.NewValidArgsFunction(c.validMaterializedTableArgs),
+		RunE:              c.materializedTableDescribe,
+	}
+
+	cmd.Flags().String("database", "", "The ID of the Kafka cluster hosting the Materialized Table's topic.")
+	pcmd.AddCloudFlag(cmd)
+	pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand)
+	pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
+	pcmd.AddContextFlag(cmd, c.CLICommand)
+	pcmd.AddOutputFlag(cmd)
+
+	cobra.CheckErr(cmd.MarkFlagRequired("database"))
+
+	return cmd
+}
+
+func (c *command) materializedTableDescribe(cmd *cobra.Command, args []string) error {
+	environmentId, err := c.Context.EnvironmentId()
+	if err != nil {
+		return err
+	}
+
+	if _, err := c.V2Client.GetOrgEnvironment(environmentId); err != nil {
+		return errors.NewErrorWithSuggestions(err.Error(), fmt.Sprintf(envNotFoundErrorMsg, environmentId))
+	}
+
+	client, err := c.GetFlinkGatewayClientInternal(false)
+	if err != nil {
+		return err
+	}
+
+	orgId := c.Context.GetCurrentOrganization()
+
+	kafkaId, err := cmd.Flags().GetString("database")
+	if err != nil {
+		return err
+	}
+
+	name := args[0]
+
+	materializedTable, err := client.DescribeMaterializedTable(environmentId, orgId, kafkaId, name)
+	if err != nil {
+		return err
+	}
+
+	outputTable := output.NewTable(cmd)
+	mtableOut := materializedTableOut{
+		Name:           materializedTable.GetName(),
+		ClusterID:      materializedTable.Spec.GetKafkaClusterId(),
+		Environment:    materializedTable.GetEnvironmentId(),
+		ComputePool:    materializedTable.Spec.GetComputePoolId(),
+		ServiceAccount: materializedTable.Spec.GetPrincipal(),
+		Query:          materializedTable.Spec.GetQuery(),
+		Columns:        convertToArrayColumns(materializedTable.Spec.GetColumns()),
+		Constraints:    convertToArrayConstraints(materializedTable.Spec.GetConstraints()),
+	}
+
+	if materializedTable.Spec.Watermark != nil {
+		wm := materializedTable.Spec.GetWatermark()
+		mtableOut.WaterMarkColumnName = wm.GetColumnName()
+		mtableOut.WaterMarkExpression = wm.GetExpression()
+	}
+
+	if materializedTable.Spec.DistributedBy != nil {
+		db := materializedTable.Spec.GetDistributedBy()
+		mtableOut.DistributedByColumnNames = db.GetColumnNames()
+		mtableOut.DistributedByBuckets = int(db.GetBuckets())
+	}
+
+	outputTable.Add(&mtableOut)
+	return outputTable.Print()
+}
diff --git a/internal/flink/command_materialized_table_list.go b/internal/flink/command_materialized_table_list.go
new file mode 100644
index 0000000000..347d30db2c
--- /dev/null
+++ b/internal/flink/command_materialized_table_list.go
@@ -0,0 +1,80 @@
+package flink
+
+import (
+	"fmt"
+
+	"github.com/spf13/cobra"
+
+	pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
+	"github.com/confluentinc/cli/v4/pkg/errors"
+	"github.com/confluentinc/cli/v4/pkg/output"
+)
+
+func (c *command) newMaterializedTableListCommand() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:   "list",
+		Short: "List materialized tables.",
+		Args:  cobra.NoArgs,
+		RunE:  c.materializedTableList,
+	}
+
+	cmd.Flags().String("database", "", "The ID of the Kafka cluster hosting the Materialized Table's topic.")
+	pcmd.AddCloudFlag(cmd)
+	pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand)
+	pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
+	pcmd.AddContextFlag(cmd, c.CLICommand)
+	pcmd.AddOutputFlag(cmd)
+
+	cobra.CheckErr(cmd.MarkFlagRequired("database"))
+
+	return cmd
+}
+
+func (c *command) materializedTableList(cmd *cobra.Command, _ []string) error {
+	environmentId, err := c.Context.EnvironmentId()
+	if err != nil {
+		return err
+	}
+
+	if _, err := c.V2Client.GetOrgEnvironment(environmentId); err != nil {
+		return errors.NewErrorWithSuggestions(err.Error(), fmt.Sprintf(envNotFoundErrorMsg, environmentId))
+	}
+
+	client, err := c.GetFlinkGatewayClientInternal(false)
+	if err != nil {
+		return err
+	}
+
+	tables, err := client.ListMaterializedTable(environmentId, c.Context.GetCurrentOrganization())
+	if err != nil {
+		return err
+	}
+
+	list := output.NewList(cmd)
+	for _, materializedTable := range tables {
+		mtableOut := materializedTableOut{
+			Name:           materializedTable.GetName(),
+			ClusterID:      materializedTable.Spec.GetKafkaClusterId(),
+			Environment:    materializedTable.GetEnvironmentId(),
+			ComputePool:    materializedTable.Spec.GetComputePoolId(),
+			ServiceAccount: materializedTable.Spec.GetPrincipal(),
+			Query:          materializedTable.Spec.GetQuery(),
+			Columns:        convertToArrayColumns(materializedTable.Spec.GetColumns()),
+			Constraints:    convertToArrayConstraints(materializedTable.Spec.GetConstraints()),
+		}
+
+		if materializedTable.Spec.Watermark != nil {
+			wm := materializedTable.Spec.GetWatermark()
+			mtableOut.WaterMarkColumnName = wm.GetColumnName()
+			mtableOut.WaterMarkExpression = wm.GetExpression()
+		}
+
+		if materializedTable.Spec.DistributedBy != nil {
+			db := materializedTable.Spec.GetDistributedBy()
+			mtableOut.DistributedByColumnNames = db.GetColumnNames()
+			mtableOut.DistributedByBuckets = int(db.GetBuckets())
+		}
+		list.Add(&mtableOut)
+	}
+	return list.Print()
+}
diff --git a/internal/flink/command_materialized_table_resume.go b/internal/flink/command_materialized_table_resume.go
new file mode 100644
index 0000000000..793d52dbf5
--- /dev/null
+++ b/internal/flink/command_materialized_table_resume.go
@@ -0,0 +1,69 @@
+package flink
+
+import (
+	"github.com/spf13/cobra"
+
+	flinkgatewayv1 "github.com/confluentinc/ccloud-sdk-go-v2-internal/flink-gateway/v1"
+
+	pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
+	"github.com/confluentinc/cli/v4/pkg/examples"
+	"github.com/confluentinc/cli/v4/pkg/output"
+	"github.com/confluentinc/cli/v4/pkg/resource"
+)
+
+func (c *command) newMaterializedTableResumeCommand() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:               "resume <name>",
+		Short:             "Resume a Flink materialized table.",
+		Args:              cobra.ExactArgs(1),
+		ValidArgsFunction: pcmd.NewValidArgsFunction(c.validMaterializedTableArgs),
+		RunE:              c.materializedTableResume,
+		Example: examples.BuildExampleString(
+			examples.Example{
+				Text: `Request to resume the materialized table "materialized-table-1".`,
+				Code: "confluent flink materialized-table resume materialized-table-1 --database lkc01",
+			},
+		),
+	}
+
+	cmd.Flags().String("database", "", "The ID of the Kafka cluster hosting the Materialized Table's topic.")
+
+	pcmd.AddCloudFlag(cmd)
+	pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand)
+	pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
+	pcmd.AddContextFlag(cmd, c.CLICommand)
+
+	cobra.CheckErr(cmd.MarkFlagRequired("database"))
+
+	return cmd
+}
+
+func (c *command) materializedTableResume(cmd *cobra.Command, args []string) error {
+	environmentId, err := c.Context.EnvironmentId()
+	if err != nil {
+		return err
+	}
+
+	client, err := c.GetFlinkGatewayClientInternal(false)
+	if err != nil {
+		return err
+	}
+
+	kafkaID, err := cmd.Flags().GetString("database")
+	if err != nil {
+		return err
+	}
+
+	table, err := client.DescribeMaterializedTable(environmentId, c.Context.GetCurrentOrganization(), kafkaID, args[0])
+	if err != nil {
+		return err
+	}
+	table.Spec.Stopped = flinkgatewayv1.PtrBool(false)
+
+	if _, err := client.UpdateMaterializedTable(table, environmentId, c.Context.GetCurrentOrganization(), kafkaID, args[0]); err != nil {
+		return err
+	}
+
+	output.Printf(c.Config.EnableColor, "Requested to resume %s \"%s\".\n", resource.MaterializedTable, args[0])
+	return nil
+}
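`resume` and `stop` (next file) are identical read-modify-write operations that differ only in the boolean written to `Spec.Stopped` before calling `UpdateMaterializedTable`. A shared helper would be one possible follow-up refactor; a minimal sketch, hypothetical and not part of this PR, built only from functions the PR adds:

    // setMaterializedTableStopped describes the table, flips Spec.Stopped,
    // and writes the full object back.
    func (c *command) setMaterializedTableStopped(cmd *cobra.Command, name string, stopped bool) error {
        environmentId, err := c.Context.EnvironmentId()
        if err != nil {
            return err
        }
        client, err := c.GetFlinkGatewayClientInternal(false)
        if err != nil {
            return err
        }
        kafkaId, err := cmd.Flags().GetString("database")
        if err != nil {
            return err
        }
        table, err := client.DescribeMaterializedTable(environmentId, c.Context.GetCurrentOrganization(), kafkaId, name)
        if err != nil {
            return err
        }
        table.Spec.Stopped = flinkgatewayv1.PtrBool(stopped)
        _, err = client.UpdateMaterializedTable(table, environmentId, c.Context.GetCurrentOrganization(), kafkaId, name)
        return err
    }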
flinkgatewayv1 "github.com/confluentinc/ccloud-sdk-go-v2-internal/flink-gateway/v1" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/examples" + "github.com/confluentinc/cli/v4/pkg/output" + "github.com/confluentinc/cli/v4/pkg/resource" +) + +func (c *command) newMaterializedTableStopCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "stop ", + Short: "Stop a Flink materialized table.", + Args: cobra.ExactArgs(1), + ValidArgsFunction: pcmd.NewValidArgsFunction(c.validMaterializedTableArgs), + RunE: c.materializedTableStop, + Example: examples.BuildExampleString( + examples.Example{ + Text: `Request to stop the materialized table "materialized-table-1".`, + Code: "confluent flink materialized-table stop materialized-table-1 --database lk01", + }, + ), + } + + cmd.Flags().String("database", "", "The ID of Kafka cluster hosting the Materialized Table's topic.") + + pcmd.AddCloudFlag(cmd) + pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand) + pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddContextFlag(cmd, c.CLICommand) + + cobra.CheckErr(cmd.MarkFlagRequired("database")) + + return cmd +} + +func (c *command) materializedTableStop(cmd *cobra.Command, args []string) error { + environmentId, err := c.Context.EnvironmentId() + if err != nil { + return err + } + + client, err := c.GetFlinkGatewayClientInternal(false) + if err != nil { + return err + } + + kafkaID, err := cmd.Flags().GetString("database") + if err != nil { + return err + } + + table, err := client.DescribeMaterializedTable(environmentId, c.Context.GetCurrentOrganization(), kafkaID, args[0]) + if err != nil { + return err + } + table.Spec.Stopped = flinkgatewayv1.PtrBool(true) + + if _, err := client.UpdateMaterializedTable(table, environmentId, c.Context.GetCurrentOrganization(), kafkaID, args[0]); err != nil { + return err + } + + output.Printf(c.Config.EnableColor, "Requested to stop %s \"%s\".\n", resource.MaterializedTable, args[0]) + return nil +} diff --git a/internal/flink/command_materialized_table_update.go b/internal/flink/command_materialized_table_update.go new file mode 100644 index 0000000000..6b2a3bd349 --- /dev/null +++ b/internal/flink/command_materialized_table_update.go @@ -0,0 +1,203 @@ +package flink + +import ( + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/output" +) + +func (c *command) newMaterializedTableUpdateCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "update ", + Short: "Update a Flink materialized table.", + Args: cobra.ExactArgs(1), + RunE: c.materializedTableUpdate, + } + + cmd.Flags().String("database", "", "The ID of Kafka cluster hosting the Materialized Table's topic.") + cmd.Flags().String("compute-pool", "", "The id associated with the compute pool in context.") + cmd.Flags().String("service-account", "", "The id of a principal this Materialized Table query runs as.") + cmd.Flags().String("query", "", "The query section of the latest Materialized Table.") + cmd.Flags().Bool("stopped", false, "Determine whether stopped or not.") + + c.addOptionalMaterializedTableFlags(cmd) + pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddCloudFlag(cmd) + pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand) + pcmd.AddOutputFlag(cmd) + + cobra.CheckErr(cmd.MarkFlagRequired("database")) + + return cmd +} + +func (c *command) materializedTableUpdate(cmd *cobra.Command, args []string) error { + environmentId, err := c.Context.EnvironmentId() + 
diff --git a/internal/flink/command_materialized_table_update.go b/internal/flink/command_materialized_table_update.go
new file mode 100644
index 0000000000..6b2a3bd349
--- /dev/null
+++ b/internal/flink/command_materialized_table_update.go
@@ -0,0 +1,203 @@
+package flink
+
+import (
+	"github.com/spf13/cobra"
+
+	flinkgatewayv1 "github.com/confluentinc/ccloud-sdk-go-v2-internal/flink-gateway/v1"
+
+	pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
+	"github.com/confluentinc/cli/v4/pkg/output"
+)
+
+func (c *command) newMaterializedTableUpdateCommand() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:   "update <name>",
+		Short: "Update a Flink materialized table.",
+		Args:  cobra.ExactArgs(1),
+		RunE:  c.materializedTableUpdate,
+	}
+
+	cmd.Flags().String("database", "", "The ID of the Kafka cluster hosting the Materialized Table's topic.")
+	cmd.Flags().String("compute-pool", "", "The ID of the compute pool used to run the Materialized Table's query.")
+	cmd.Flags().String("service-account", "", "The ID of the principal this Materialized Table's query runs as.")
+	cmd.Flags().String("query", "", "The SQL query of the Materialized Table.")
+	cmd.Flags().Bool("stopped", false, "Whether the materialized table is stopped.")
+
+	c.addOptionalMaterializedTableFlags(cmd)
+	pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
+	pcmd.AddCloudFlag(cmd)
+	pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand)
+	pcmd.AddOutputFlag(cmd)
+
+	cobra.CheckErr(cmd.MarkFlagRequired("database"))
+
+	return cmd
+}
+
+func (c *command) materializedTableUpdate(cmd *cobra.Command, args []string) error {
+	environmentId, err := c.Context.EnvironmentId()
+	if err != nil {
+		return err
+	}
+
+	client, err := c.GetFlinkGatewayClientInternal(false)
+	if err != nil {
+		return err
+	}
+
+	kafkaId, err := cmd.Flags().GetString("database")
+	if err != nil {
+		return err
+	}
+
+	table, err := client.DescribeMaterializedTable(environmentId, c.Context.GetCurrentOrganization(), kafkaId, args[0])
+	if err != nil {
+		return err
+	}
+
+	principal, err := cmd.Flags().GetString("service-account")
+	if err != nil {
+		return err
+	}
+	if principal != "" {
+		table.Spec.SetPrincipal(principal)
+	}
+
+	computePool, err := cmd.Flags().GetString("compute-pool")
+	if err != nil {
+		return err
+	}
+	if computePool != "" {
+		table.Spec.SetComputePoolId(computePool)
+	}
+
+	query, err := cmd.Flags().GetString("query")
+	if err != nil {
+		return err
+	}
+	if query != "" {
+		table.Spec.SetQuery(query)
+	}
+
+	if cmd.Flags().Changed("stopped") {
+		stopped, err := cmd.Flags().GetBool("stopped")
+		if err != nil {
+			return err
+		}
+		table.Spec.SetStopped(stopped)
+	}
+
+	columnComputed, err := cmd.Flags().GetString("column-computed")
+	if err != nil {
+		return err
+	}
+
+	columnPhysical, err := cmd.Flags().GetString("column-physical")
+	if err != nil {
+		return err
+	}
+
+	columnMetadata, err := cmd.Flags().GetString("column-metadata")
+	if err != nil {
+		return err
+	}
+
+	// Propagate parser errors instead of discarding them.
+	colDetails := table.Spec.GetColumns()
+	if columnComputed != "" {
+		if colDetails, err = addComputedColumns(columnComputed, colDetails); err != nil {
+			return err
+		}
+	}
+	if columnPhysical != "" {
+		if colDetails, err = addPhysicalColumns(columnPhysical, colDetails); err != nil {
+			return err
+		}
+	}
+	if columnMetadata != "" {
+		if colDetails, err = addMetadataColumns(columnMetadata, colDetails); err != nil {
+			return err
+		}
+	}
+	table.Spec.SetColumns(colDetails)
+
+	watermarkColumnName, err := cmd.Flags().GetString("watermark-column-name")
+	if err != nil {
+		return err
+	}
+
+	watermarkExpression, err := cmd.Flags().GetString("watermark-expression")
+	if err != nil {
+		return err
+	}
+
+	if watermarkColumnName != "" || watermarkExpression != "" {
+		// Guard against a nil Watermark on the fetched table to avoid a panic.
+		if table.Spec.Watermark == nil {
+			table.Spec.Watermark = &flinkgatewayv1.SqlV1MaterializedTableWatermark{}
+		}
+		if watermarkColumnName != "" {
+			table.Spec.Watermark.SetColumnName(watermarkColumnName)
+		}
+		if watermarkExpression != "" {
+			table.Spec.Watermark.SetExpression(watermarkExpression)
+		}
+	}
+
+	constraints, err := cmd.Flags().GetString("constraints")
+	if err != nil {
+		return err
+	}
+
+	constr := table.Spec.GetConstraints()
+	if constraints != "" {
+		constr, err = addConstraints(constraints, constr)
+		if err != nil {
+			return err
+		}
+	}
+	table.Spec.SetConstraints(constr)
+
+	distributedByColumnNames, err := cmd.Flags().GetString("distributed-by-column-names")
+	if err != nil {
+		return err
+	}
+
+	distributedByBuckets, err := cmd.Flags().GetInt("distributed-by-buckets")
+	if err != nil {
+		return err
+	}
+
+	if distributedByColumnNames != "" || distributedByBuckets > 0 {
+		// Same nil guard for the DistributedBy section.
+		if table.Spec.DistributedBy == nil {
+			table.Spec.DistributedBy = &flinkgatewayv1.SqlV1MaterializedTableDistribution{}
+		}
+		if distributedByColumnNames != "" {
+			table.Spec.DistributedBy.SetColumnNames(*csvToStringSlicePtr(distributedByColumnNames))
+		}
+		if distributedByBuckets > 0 {
+			table.Spec.DistributedBy.SetBuckets(int32(distributedByBuckets))
+		}
+	}
+
+	materializedTable, err := client.UpdateMaterializedTable(table, environmentId, c.Context.GetCurrentOrganization(), kafkaId, args[0])
+	if err != nil {
+		return err
+	}
+
+	outputTable := output.NewTable(cmd)
+	mtableOut := materializedTableOut{
+		Name:           materializedTable.GetName(),
+		ClusterID:      materializedTable.Spec.GetKafkaClusterId(),
+		Environment:    materializedTable.GetEnvironmentId(),
+		ComputePool:    materializedTable.Spec.GetComputePoolId(),
+		ServiceAccount: materializedTable.Spec.GetPrincipal(),
+		Query:          materializedTable.Spec.GetQuery(),
+		Columns:        convertToArrayColumns(materializedTable.Spec.GetColumns()),
+		Constraints:    convertToArrayConstraints(materializedTable.Spec.GetConstraints()),
+	}
+
+	if materializedTable.Spec.Watermark != nil {
+		wm := materializedTable.Spec.GetWatermark()
+		mtableOut.WaterMarkColumnName = wm.GetColumnName()
+		mtableOut.WaterMarkExpression = wm.GetExpression()
+	}
+
+	if materializedTable.Spec.DistributedBy != nil {
+		db := materializedTable.Spec.GetDistributedBy()
+		mtableOut.DistributedByColumnNames = db.GetColumnNames()
+		mtableOut.DistributedByBuckets = int(db.GetBuckets())
+	}
+
+	outputTable.Add(&mtableOut)
+
+	return outputTable.Print()
+}
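Update is also describe-then-patch: only flags that were explicitly set overwrite fields of the fetched table before the whole object is sent back via `UpdateMaterializedTable`. An illustrative invocation consistent with `update.golden` below (the exact test arguments are not shown in this excerpt):

    confluent flink materialized-table update table-1 --cloud aws --region eu-west-1 --database lkc01 --query query2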
diff --git a/pkg/ccloudv2/flink_gateway.go b/pkg/ccloudv2/flink_gateway.go
index 9575168579..b98df72546 100644
--- a/pkg/ccloudv2/flink_gateway.go
+++ b/pkg/ccloudv2/flink_gateway.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"net/http"

+	flinkgatewayinternalv1 "github.com/confluentinc/ccloud-sdk-go-v2-internal/flink-gateway/v1"
 	flinkgatewayv1 "github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway/v1"

 	"github.com/confluentinc/cli/v4/pkg/errors/flink"
@@ -33,6 +34,11 @@ type FlinkGatewayClient struct {
 	AuthToken string
 }

+type FlinkGatewayClientInternal struct {
+	*flinkgatewayinternalv1.APIClient
+	AuthToken string
+}
+
 func NewFlinkGatewayClient(url, userAgent string, unsafeTrace bool, authToken string) *FlinkGatewayClient {
 	cfg := flinkgatewayv1.NewConfiguration()
 	cfg.Debug = unsafeTrace
@@ -46,6 +52,19 @@ func NewFlinkGatewayClient(url, userAgent string, unsafeTrace bool, authToken st
 	}
 }

+func NewFlinkGatewayClientInternal(url, userAgent string, unsafeTrace bool, authToken string) *FlinkGatewayClientInternal {
+	cfg := flinkgatewayinternalv1.NewConfiguration()
+	cfg.Debug = unsafeTrace
+	cfg.HTTPClient = NewRetryableHttpClientWithRedirect(unsafeTrace, checkRedirect)
+	cfg.Servers = flinkgatewayinternalv1.ServerConfigurations{{URL: url}}
+	cfg.UserAgent = userAgent
+
+	return &FlinkGatewayClientInternal{
+		APIClient: flinkgatewayinternalv1.NewAPIClient(cfg),
+		AuthToken: authToken,
+	}
+}
+
 func checkRedirect(req *http.Request, via []*http.Request) error {
 	// Default net/http implementation allows 10 redirects - https://go.dev/src/net/http/client.go.
 	// Lowered the redirect limit to fail fast in case of redirect cycles
@@ -76,6 +95,10 @@ func (c *FlinkGatewayClient) flinkGatewayApiContext() context.Context {
 	return context.WithValue(context.Background(), flinkgatewayv1.ContextAccessToken, c.AuthToken)
 }

+func (c *FlinkGatewayClientInternal) flinkGatewayApiContextInternal() context.Context {
+	return context.WithValue(context.Background(), flinkgatewayinternalv1.ContextAccessToken, c.AuthToken)
+}
+
 func (c *FlinkGatewayClient) DeleteStatement(environmentId, statementName, orgId string) error {
 	httpResp, err := c.StatementsSqlV1Api.DeleteSqlv1Statement(c.flinkGatewayApiContext(), orgId, environmentId, statementName).Execute()
 	return flinkerror.CatchError(err, httpResp)
@@ -149,11 +172,61 @@ func (c *FlinkGatewayClient) GetExceptions(environmentId, statementName, orgId s
 	return resp.GetData(), nil
 }

+func (c *FlinkGatewayClientInternal) DescribeMaterializedTable(environmentId, orgId, kafkaId, tableName string) (flinkgatewayinternalv1.SqlV1MaterializedTable, error) {
+	resp, httpResp, err := c.MaterializedTablesSqlV1Api.GetSqlv1MaterializedTable(c.flinkGatewayApiContextInternal(), orgId, environmentId, kafkaId, tableName).Execute()
+	return resp, flinkerror.CatchError(err, httpResp)
+}
+
+func (c *FlinkGatewayClientInternal) ListMaterializedTable(environmentId, orgId string) ([]flinkgatewayinternalv1.SqlV1MaterializedTable, error) {
+	var allTables []flinkgatewayinternalv1.SqlV1MaterializedTable
+	pageToken := ""
+	done := false
+
+	for !done {
+		tableListResponse, err := c.executeMaterializedTablesList(environmentId, orgId, pageToken)
+		if err != nil {
+			return nil, err
+		}
+		allTables = append(allTables, tableListResponse.GetData()...)
+		nextUrl := tableListResponse.Metadata.GetNext()
+		pageToken, done, err = extractNextPageToken(flinkgatewayv1.NewNullableString(&nextUrl))
+		if err != nil {
+			return nil, err
+		}
+	}
+	return allTables, nil
+}
+
+func (c *FlinkGatewayClientInternal) executeMaterializedTablesList(environmentId, orgId, pageToken string) (flinkgatewayinternalv1.SqlV1MaterializedTableList, error) {
+	req := c.MaterializedTablesSqlV1Api.ListSqlv1MaterializedTables(c.flinkGatewayApiContextInternal(), orgId, environmentId).PageSize(ccloudV2ListPageSize)
+
+	if pageToken != "" {
+		req = req.PageToken(pageToken)
+	}
+	resp, httpResp, err := req.Execute()
+	return resp, flinkerror.CatchError(err, httpResp)
+}
+
+func (c *FlinkGatewayClientInternal) UpdateMaterializedTable(table flinkgatewayinternalv1.SqlV1MaterializedTable, environmentId, orgId, kafkaId, tableName string) (flinkgatewayinternalv1.SqlV1MaterializedTable, error) {
+	resp, httpResp, err := c.MaterializedTablesSqlV1Api.UpdateSqlv1MaterializedTable(c.flinkGatewayApiContextInternal(), orgId, environmentId, kafkaId, tableName).SqlV1MaterializedTable(table).Execute()
+	return resp, flinkerror.CatchError(err, httpResp)
+}
+
+func (c *FlinkGatewayClientInternal) DeleteMaterializedTable(environmentId, orgId, kafkaId, tableName string) error {
+	httpResp, err := c.MaterializedTablesSqlV1Api.DeleteSqlv1MaterializedTable(c.flinkGatewayApiContextInternal(), orgId, environmentId, kafkaId, tableName).Execute()
+	return flinkerror.CatchError(err, httpResp)
+}
+
 func (c *FlinkGatewayClient) CreateConnection(connection flinkgatewayv1.SqlV1Connection, environmentId, orgId string) (flinkgatewayv1.SqlV1Connection, error) {
 	resp, httpResp, err := c.ConnectionsSqlV1Api.CreateSqlv1Connection(c.flinkGatewayApiContext(), orgId, environmentId).SqlV1Connection(connection).Execute()
 	return resp, flinkerror.CatchError(err, httpResp)
 }

+func (c *FlinkGatewayClientInternal) CreateMaterializedTable(table flinkgatewayinternalv1.SqlV1MaterializedTable, environmentId, orgId, kafkaId string) (flinkgatewayinternalv1.SqlV1MaterializedTable, error) {
+	resp, httpResp, err := c.MaterializedTablesSqlV1Api.CreateSqlv1MaterializedTable(c.flinkGatewayApiContextInternal(), orgId, environmentId, kafkaId).SqlV1MaterializedTable(table).Execute()
+	return resp, flinkerror.CatchError(err, httpResp)
+}
+
 func (c *FlinkGatewayClient) executeListConnection(environmentId, orgId, pageToken, connectionType string) (flinkgatewayv1.SqlV1ConnectionList, error) {
 	req := c.ConnectionsSqlV1Api.ListSqlv1Connections(c.flinkGatewayApiContext(), orgId, environmentId).PageSize(ccloudV2ListPageSize)
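The internal client mirrors the public `FlinkGatewayClient`: one method per REST call, with `ListMaterializedTable` looping on `extractNextPageToken` until the `metadata.next` link runs out. Calling it from a command looks like this sketch, which uses only functions added in this PR:

    client, err := c.GetFlinkGatewayClientInternal(false)
    if err != nil {
        return err
    }
    tables, err := client.ListMaterializedTable(environmentId, c.Context.GetCurrentOrganization())
    if err != nil {
        return err
    }
    for _, table := range tables {
        fmt.Println(table.GetName()) // e.g. feed shell completion or the list output
    }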
diff --git a/pkg/cmd/authenticated_cli_command.go b/pkg/cmd/authenticated_cli_command.go
index aab6b390e3..4dde3782b1 100644
--- a/pkg/cmd/authenticated_cli_command.go
+++ b/pkg/cmd/authenticated_cli_command.go
@@ -36,10 +36,11 @@ type AuthenticatedCLICommand struct {
 	MDSv2Client *mdsv2alpha1.APIClient
 	V2Client    *ccloudv2.Client

-	flinkGatewayClient   *ccloudv2.FlinkGatewayClient
-	metricsClient        *ccloudv2.MetricsClient
-	schemaRegistryClient *schemaregistry.Client
-	usageLimitsClient    *kafkausagelimits.UsageLimitsClient
+	flinkGatewayClient         *ccloudv2.FlinkGatewayClient
+	flinkGatewayClientInternal *ccloudv2.FlinkGatewayClientInternal
+	metricsClient              *ccloudv2.MetricsClient
+	schemaRegistryClient       *schemaregistry.Client
+	usageLimitsClient          *kafkausagelimits.UsageLimitsClient

 	Context *config.Context
 }
@@ -98,6 +99,48 @@ func (c *AuthenticatedCLICommand) GetFlinkGatewayClient(computePoolOnly bool) (*
 	return c.flinkGatewayClient, nil
 }

+func (c *AuthenticatedCLICommand) GetFlinkGatewayClientInternal(computePoolOnly bool) (*ccloudv2.FlinkGatewayClientInternal, error) {
+	if c.flinkGatewayClientInternal == nil {
+		var url string
+		var err error
+
+		if c.Context.GetCurrentFlinkEndpoint() != "" {
+			url = c.Context.GetCurrentFlinkEndpoint()
+		} else if computePoolOnly {
+			if computePoolId := c.Context.GetCurrentFlinkComputePool(); computePoolId != "" {
+				url, err = c.getGatewayUrlForComputePool(c.Context.GetCurrentFlinkAccessType(), computePoolId)
+				if err != nil {
+					return nil, err
+				}
+			} else {
+				return nil, errors.NewErrorWithSuggestions("no compute pool selected", "Select a compute pool with `confluent flink compute-pool use` or `--compute-pool`.")
+			}
+		} else if c.Context.GetCurrentFlinkRegion() != "" && c.Context.GetCurrentFlinkCloudProvider() != "" {
+			url, err = c.getGatewayUrlForRegion(c.Context.GetCurrentFlinkAccessType(), c.Context.GetCurrentFlinkCloudProvider(), c.Context.GetCurrentFlinkRegion())
+			if err != nil {
+				return nil, err
+			}
+		} else {
+			return nil, errors.NewErrorWithSuggestions("no cloud provider and region selected", "Select a cloud provider and region with `confluent flink region use` or `--cloud` and `--region`.")
+		}
+
+		unsafeTrace, err := c.Flags().GetBool("unsafe-trace")
+		if err != nil {
+			return nil, err
+		}
+
+		dataplaneToken, err := auth.GetDataplaneToken(c.Context)
+		if err != nil {
+			return nil, err
+		}
+
+		log.CliLogger.Debugf("The final url used for setting up Flink dataplane client is: %s\n", url)
+		c.flinkGatewayClientInternal = ccloudv2.NewFlinkGatewayClientInternal(url, c.Version.UserAgent, unsafeTrace, dataplaneToken)
+	}
+
+	return c.flinkGatewayClientInternal, nil
+}
+
 func (c *AuthenticatedCLICommand) getGatewayUrlForComputePool(access, id string) (string, error) {
 	if c.Config.IsTest {
 		return testserver.TestFlinkGatewayUrl.String(), nil
diff --git a/pkg/resource/resource.go b/pkg/resource/resource.go
index a56592ddd9..093bb13571 100644
--- a/pkg/resource/resource.go
+++ b/pkg/resource/resource.go
@@ -58,6 +58,7 @@ const (
 	KafkaCluster        = "Kafka cluster"
 	Kek                 = "KEK"
 	KsqlCluster         = "KSQL cluster"
+	MaterializedTable   = "Materialized Table"
 	MirrorTopic         = "mirror topic"
 	Network             = "network"
 	NetworkLinkEndpoint = "network link endpoint"
diff --git a/test/fixtures/output/flink/help.golden b/test/fixtures/output/flink/help.golden
index 43943b39a1..36b80f38fc 100644
--- a/test/fixtures/output/flink/help.golden
+++ b/test/fixtures/output/flink/help.golden
@@ -9,6 +9,7 @@ Available Commands:
   connection          Manage Flink connections.
   connectivity-type   Manage Flink connectivity type.
   endpoint            Manage Flink endpoint.
+  materialized-table  Manage Flink materialized tables.
   region              Manage Flink regions.
   shell               Start Flink interactive SQL client.
   statement           Manage Flink SQL statements.
"my-table"? (y/n): Deleted Materialized Table "my-table". diff --git a/test/fixtures/output/flink/materialized-table/describe/describe-noKafka.golden b/test/fixtures/output/flink/materialized-table/describe/describe-noKafka.golden new file mode 100644 index 0000000000..55c382c32b --- /dev/null +++ b/test/fixtures/output/flink/materialized-table/describe/describe-noKafka.golden @@ -0,0 +1,17 @@ +Error: required flag(s) "database" not set +Usage: + confluent flink materialized-table describe [flags] + +Flags: + --database string REQUIRED: The ID of Kafka cluster hosting the Materialized Table's topic. + --cloud string Specify the cloud provider as "aws", "azure", or "gcp". + --region string Cloud region for Flink (use "confluent flink region list" to see all). + --environment string Environment ID. + --context string CLI context name. + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). + diff --git a/test/fixtures/output/flink/materialized-table/describe/describe.golden b/test/fixtures/output/flink/materialized-table/describe/describe.golden new file mode 100644 index 0000000000..94604b650a --- /dev/null +++ b/test/fixtures/output/flink/materialized-table/describe/describe.golden @@ -0,0 +1,16 @@ ++-----------------------------+--------------------------------+ +| Name | table-1 | +| Kafka Cluster ID | lkc01 | +| Environment | env-1 | +| Compute Pool | pool1 | +| Service Account | principal1 | +| Query | query | +| Columns | {Name1, Type1, Comment1, | +| | Computed, Expression1, true} | +| Watermark Column Name | Col1 | +| Watermark Expression | Expr1 | +| Constraints | {constr1, PRIMARY_KEY, | +| | [user_id region], true} | +| Distributed By Column Names | user_id, region | +| Distributed By Buckets | 8 | ++-----------------------------+--------------------------------+ diff --git a/test/fixtures/output/flink/materialized-table/list/list.golden b/test/fixtures/output/flink/materialized-table/list/list.golden new file mode 100644 index 0000000000..60a5f80499 --- /dev/null +++ b/test/fixtures/output/flink/materialized-table/list/list.golden @@ -0,0 +1,6 @@ + Name | Kafka Cluster ID | Environment | Compute Pool | Service Account | Query | Columns | Watermark Column Name | Watermark Expression | Constraints | Distributed By Column Names | Distributed By Buckets +----------+------------------+-------------+--------------+-----------------+-------+--------------------------------+-----------------------+----------------------+--------------------------------+-----------------------------+------------------------- + table-1 | lkc01 | env-1 | pool1 | principal1 | query | {Name1, Type1, Comment1, | Col1 | Expr1 | {constr1, PRIMARY_KEY, | user_id, region | 8 + | | | | | | Computed, Expression1, true}, | | | [user_id region], true}, | | + | | | | | | {Name2, Type2, Comment2, | | | {constr2, PRIMARY_KEY, | | + | | | | | | Physical} | | | [user_id1 region1], false} | | diff --git a/test/fixtures/output/flink/materialized-table/resume.golden b/test/fixtures/output/flink/materialized-table/resume.golden new file mode 100644 index 0000000000..d0e228d9b6 --- /dev/null +++ b/test/fixtures/output/flink/materialized-table/resume.golden @@ -0,0 +1 @@ +Requested to resume Materialized Table 
"my-table-1". diff --git a/test/fixtures/output/flink/materialized-table/stop.golden b/test/fixtures/output/flink/materialized-table/stop.golden new file mode 100644 index 0000000000..6b20940999 --- /dev/null +++ b/test/fixtures/output/flink/materialized-table/stop.golden @@ -0,0 +1 @@ +Requested to stop Materialized Table "my-table-1". diff --git a/test/fixtures/output/flink/materialized-table/update/no-cp.golden b/test/fixtures/output/flink/materialized-table/update/no-cp.golden new file mode 100644 index 0000000000..2bd5de1a1d --- /dev/null +++ b/test/fixtures/output/flink/materialized-table/update/no-cp.golden @@ -0,0 +1,16 @@ ++-----------------------------+--------------------------------+ +| Name | table-1 | +| Kafka Cluster ID | lkc01 | +| Environment | env-1 | +| Compute Pool | pool1 | +| Service Account | principal1 | +| Query | query1 | +| Columns | {Name1, Type1, Comment1, | +| | Computed, Expression1, true} | +| Watermark Column Name | Col1 | +| Watermark Expression | Expr1 | +| Constraints | {constr1, PRIMARY_KEY, | +| | [user_id region], true} | +| Distributed By Column Names | user_id, region | +| Distributed By Buckets | 8 | ++-----------------------------+--------------------------------+ diff --git a/test/fixtures/output/flink/materialized-table/update/update-2.golden b/test/fixtures/output/flink/materialized-table/update/update-2.golden new file mode 100644 index 0000000000..4214cc5262 --- /dev/null +++ b/test/fixtures/output/flink/materialized-table/update/update-2.golden @@ -0,0 +1,16 @@ ++-----------------------------+--------------------------------+ +| Name | table-1 | +| Kafka Cluster ID | lkc01 | +| Environment | env-1 | +| Compute Pool | pool2 | +| Service Account | principal1 | +| Query | query1 | +| Columns | {Name1, Type1, Comment1, | +| | Computed, Expression1, true} | +| Watermark Column Name | Col1 | +| Watermark Expression | Expr1 | +| Constraints | {constr1, PRIMARY_KEY, | +| | [user_id region], true} | +| Distributed By Column Names | user_id, region | +| Distributed By Buckets | 8 | ++-----------------------------+--------------------------------+ diff --git a/test/fixtures/output/flink/materialized-table/update/update-3.golden b/test/fixtures/output/flink/materialized-table/update/update-3.golden new file mode 100644 index 0000000000..50ea77e761 --- /dev/null +++ b/test/fixtures/output/flink/materialized-table/update/update-3.golden @@ -0,0 +1,20 @@ ++-----------------------------+--------------------------------+ +| Name | table-1 | +| Kafka Cluster ID | lkc01 | +| Environment | env-1 | +| Compute Pool | pool2 | +| Service Account | principal1 | +| Query | query1 | +| Columns | {Name1, Type1, Comment1, | +| | Computed, Expression1, | +| | true}, {name, type, comment2, | +| | Physical} | +| Watermark Column Name | Col1 | +| Watermark Expression | expNew | +| Constraints | {constr1, PRIMARY_KEY, | +| | [user_id region], true}, | +| | {name, type, [colName1 | +| | colName2], true} | +| Distributed By Column Names | user_id, region | +| Distributed By Buckets | 8 | ++-----------------------------+--------------------------------+ diff --git a/test/fixtures/output/flink/materialized-table/update/update.golden b/test/fixtures/output/flink/materialized-table/update/update.golden new file mode 100644 index 0000000000..04401de548 --- /dev/null +++ b/test/fixtures/output/flink/materialized-table/update/update.golden @@ -0,0 +1,16 @@ ++-----------------------------+--------------------------------+ +| Name | table-1 | +| Kafka Cluster ID | lkc01 | +| 
diff --git a/test/fixtures/output/flink/materialized-table/update/update.golden b/test/fixtures/output/flink/materialized-table/update/update.golden
new file mode 100644
index 0000000000..04401de548
--- /dev/null
+++ b/test/fixtures/output/flink/materialized-table/update/update.golden
@@ -0,0 +1,16 @@
++-----------------------------+--------------------------------+
+| Name                        | table-1                        |
+| Kafka Cluster ID            | lkc01                          |
+| Environment                 | env-1                          |
+| Compute Pool                | pool1                          |
+| Service Account             | principal1                     |
+| Query                       | query2                         |
+| Columns                     | {Name1, Type1, Comment1,       |
+|                             | Computed, Expression1, true}   |
+| Watermark Column Name       | Col1                           |
+| Watermark Expression        | Expr1                          |
+| Constraints                 | {constr1, PRIMARY_KEY,         |
+|                             | [user_id region], true}        |
+| Distributed By Column Names | user_id, region                |
+| Distributed By Buckets      | 8                              |
++-----------------------------+--------------------------------+
diff --git a/test/flink_test.go b/test/flink_test.go
index 1601d87215..594f88228b 100644
--- a/test/flink_test.go
+++ b/test/flink_test.go
@@ -107,6 +107,131 @@ func (s *CLITestSuite) TestFlinkConnection() {
 	}
 }
 
+func (s *CLITestSuite) TestFlinkMaterializedTableCreate() {
+	file, _ := os.CreateTemp(os.TempDir(), "test")
+	_, _ = file.Write([]byte("name,type,comment,Physical\n"))
+	_, _ = file.Write([]byte("name2,type2,comment2,Physical\n"))
+	defer func() {
+		_ = os.Remove(file.Name())
+	}()
+
+	fileMetadata, _ := os.CreateTemp(os.TempDir(), "test")
+	line := fmt.Sprintf("name,type,comment,Metadata,key,%t", true)
+	_, _ = fileMetadata.Write([]byte(line))
+	defer func() {
+		_ = os.Remove(fileMetadata.Name())
+	}()
+
+	fileComputed, _ := os.CreateTemp(os.TempDir(), "test")
+	lineComputed := fmt.Sprintf("name,type,comment,Computed,exp1,%t", true)
+	_, _ = fileComputed.Write([]byte(lineComputed))
+	defer func() {
+		_ = os.Remove(fileComputed.Name())
+	}()
+
+	fileConstraints, _ := os.CreateTemp(os.TempDir(), "test")
+	_, _ = fileConstraints.Write([]byte("name,type,colName1|colName2,true"))
+	defer func() {
+		_ = os.Remove(fileConstraints.Name())
+	}()
+
+	tests := []CLITest{
+		{args: "flink region use --cloud aws --region eu-west-1", fixture: "flink/region/use-aws.golden"},
+		{args: "flink endpoint use http://127.0.0.1:1026", fixture: "flink/endpoint/use-public.golden"},
+		{args: "flink materialized-table create my-table --cloud aws --region eu-west-1 --database lkc01 --compute-pool pool1 --service-account principal1 --query query1", fixture: "flink/materialized-table/create/create.golden"},
+		{args: fmt.Sprintf("flink materialized-table create my-table --cloud aws --region eu-west-1 "+
+			"--database lkc01 --compute-pool pool1 --service-account principal1 "+
+			"--query query1 --column-physical %s --column-metadata %s --column-computed %s --distributed-by-buckets 32", file.Name(),
+			fileMetadata.Name(), fileComputed.Name(),
+		), fixture: "flink/materialized-table/create/create-column.golden"},
+		{args: fmt.Sprintf("flink materialized-table create my-table --cloud aws --region eu-west-1 "+
+			"--database lkc01 --compute-pool pool1 --service-account principal1 "+
+			"--query query1 --constraints %s --distributed-by-buckets 32 --distributed-by-column-names col1,col2 --watermark-column-name wname1 "+
+			"--watermark-expression wexp1",
+			fileConstraints.Name(),
+		), fixture: "flink/materialized-table/create/create-filled.golden"},
+	}
+
+	for _, test := range tests {
+		test.workflow = true
+		test.login = "cloud"
+		s.runIntegrationTest(test)
+	}
+}
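+
+// The temp files above feed --column-physical/--column-metadata/--column-computed
+// and --constraints. Row layouts inferred from what the tests write (the real
+// parser lives in the command implementation, outside this hunk):
+//   physical:   name,type,comment,Physical
+//   metadata:   name,type,comment,Metadata,<key>,<virtual bool>
+//   computed:   name,type,comment,Computed,<expression>,<virtual bool>
+//   constraint: name,type,<col1|col2>,<enforced bool>
+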
+func (s *CLITestSuite) TestFlinkMaterializedTableDescribe() {
+	tests := []CLITest{
+		{args: "flink region use --cloud aws --region eu-west-1", fixture: "flink/region/use-aws.golden"},
+		{args: "flink endpoint use http://127.0.0.1:1026", fixture: "flink/endpoint/use-public.golden"},
+		{args: "flink materialized-table describe my-table --cloud aws --region eu-west-1 --database lkc01", fixture: "flink/materialized-table/describe/describe.golden"},
+		{args: "flink materialized-table describe my-table --cloud aws --region eu-west-1", exitCode: 1, fixture: "flink/materialized-table/describe/describe-noKafka.golden"},
+	}
+
+	for _, test := range tests {
+		test.workflow = true
+		test.login = "cloud"
+		s.runIntegrationTest(test)
+	}
+}
+
+func (s *CLITestSuite) TestFlinkMaterializedTableList() {
+	tests := []CLITest{
+		{args: "flink region use --cloud aws --region eu-west-1", fixture: "flink/region/use-aws.golden"},
+		{args: "flink endpoint use http://127.0.0.1:1026", fixture: "flink/endpoint/use-public.golden"},
+		{args: "flink materialized-table list --cloud aws --region eu-west-1 --database lkc01", fixture: "flink/materialized-table/list/list.golden"},
+	}
+
+	for _, test := range tests {
+		test.workflow = true
+		test.login = "cloud"
+		s.runIntegrationTest(test)
+	}
+}
+
+func (s *CLITestSuite) TestFlinkMaterializedTableDelete() {
+	tests := []CLITest{
+		{args: "flink region use --cloud aws --region eu-west-1", fixture: "flink/region/use-aws.golden"},
+		{args: "flink endpoint use http://127.0.0.1:1026", fixture: "flink/endpoint/use-public.golden"},
+		{args: "flink materialized-table delete my-table --cloud aws --region eu-west-1 --database lkc01", input: "y\n", fixture: "flink/materialized-table/delete/delete.golden"},
+	}
+
+	for _, test := range tests {
+		test.workflow = true
+		test.login = "cloud"
+		s.runIntegrationTest(test)
+	}
+}
+
+func (s *CLITestSuite) TestFlinkMaterializedTableUpdate() {
+	file, _ := os.CreateTemp(os.TempDir(), "test")
+	_, _ = file.Write([]byte("name,type,comment2,Physical"))
+	defer func() {
+		_ = os.Remove(file.Name())
+	}()
+
+	fileConstraints, _ := os.CreateTemp(os.TempDir(), "test")
+	_, _ = fileConstraints.Write([]byte("name,type,colName1|colName2,true"))
+	defer func() {
+		_ = os.Remove(fileConstraints.Name())
+	}()
+
+	tests := []CLITest{
+		{args: "flink region use --cloud aws --region eu-west-1", fixture: "flink/region/use-aws.golden"},
+		{args: "flink endpoint use http://127.0.0.1:1026", fixture: "flink/endpoint/use-public.golden"},
+		{args: "flink materialized-table update my-table-1 --cloud aws --region eu-west-1 --database lkc01 --compute-pool pool1 --service-account principal1 --query query2", fixture: "flink/materialized-table/update/update.golden"},
+		{args: "flink materialized-table update my-table-1 --cloud aws --region eu-west-1 --database lkc01 --compute-pool pool2 --service-account principal1 --query query1", fixture: "flink/materialized-table/update/update-2.golden"},
+		{args: "flink materialized-table update my-table-1 --cloud aws --region eu-west-1 --database lkc01 --service-account principal1 --query query1", fixture: "flink/materialized-table/update/no-cp.golden"},
+		{args: fmt.Sprintf("flink materialized-table update my-table-1 --cloud aws --region eu-west-1 --database lkc01 --compute-pool pool2 --service-account principal1 --query query1 --watermark-expression expNew --constraints %s --column-physical %s", fileConstraints.Name(), file.Name()), fixture: "flink/materialized-table/update/update-3.golden"},
+		{args: "flink materialized-table stop my-table-1 --cloud aws --region eu-west-1 --database lkc01", fixture: "flink/materialized-table/stop.golden"},
+		{args: "flink materialized-table resume my-table-1 --cloud aws --region eu-west-1 --database lkc01", fixture: "flink/materialized-table/resume.golden"},
+	}
+
+	for _, test := range tests {
+		test.workflow = true
+		test.login = "cloud"
+		s.runIntegrationTest(test)
+	}
+}
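+
+// Taken together, the workflows above cover the full lifecycle. Sketch of the
+// equivalent manual session (flags copied from the tests, not generated docs):
+//   confluent flink materialized-table create my-table --cloud aws --region eu-west-1 --database lkc01 --compute-pool pool1 --service-account principal1 --query query1
+//   confluent flink materialized-table describe my-table --cloud aws --region eu-west-1 --database lkc01
+//   confluent flink materialized-table list --cloud aws --region eu-west-1 --database lkc01
+//   confluent flink materialized-table stop my-table --cloud aws --region eu-west-1 --database lkc01
+//   confluent flink materialized-table resume my-table --cloud aws --region eu-west-1 --database lkc01
+//   confluent flink materialized-table delete my-table --cloud aws --region eu-west-1 --database lkc01
+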
 func (s *CLITestSuite) TestFlinkConnectionWrongEnv() {
 	tests := []CLITest{
 		{args: "flink connection create my-connection --cloud aws --region eu-west-1 --type openai --endpoint https://api.openai.com/v1/chat/completions --api-key 0000000000000000 --environment env-dne", fixture: "flink/connection/create/create-wrong-env.golden", exitCode: 1},
diff --git a/test/test-server/flink_gateway_router.go b/test/test-server/flink_gateway_router.go
index da626fe3ae..5c443b33ca 100644
--- a/test/test-server/flink_gateway_router.go
+++ b/test/test-server/flink_gateway_router.go
@@ -12,6 +12,7 @@ import (
 	"github.com/samber/lo"
 	"github.com/stretchr/testify/require"
 
+	flinkgatewayv1internal "github.com/confluentinc/ccloud-sdk-go-v2-internal/flink-gateway/v1"
 	flinkgatewayv1 "github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway/v1"
 )
 
@@ -26,6 +27,9 @@ var flinkGatewayRoutes = []route{
 	{"/sql/v1/organizations/{organization_id}/environments/{environment}/statements/{statement}/exceptions", handleSqlEnvironmentsEnvironmentStatementExceptions},
 	{"/sql/v1/organizations/{organization_id}/environments/{environment_id}/connections", handleSqlEnvironmentsEnvironmentConnections},
 	{"/sql/v1/organizations/{organization_id}/environments/{environment_id}/connections/{connection}", handleSqlEnvironmentsEnvironmentConnectionsConnection},
+	{"/sql/v1/organizations/{organization_id}/environments/{environment_id}/databases/{kafka_cluster_id}/materialized-tables", handleSqlMaterializedTables},
+	{"/sql/v1/organizations/{organization_id}/environments/{environment_id}/materialized-tables", handleSqlMaterializedTablesList},
+	{"/sql/v1/organizations/{organization_id}/environments/{environment_id}/databases/{kafka_cluster_id}/materialized-tables/{table_name}", handleSqlMaterializedTablesTable},
 }
 
 func NewFlinkGatewayRouter(t *testing.T) *mux.Router {
@@ -283,3 +287,212 @@ func handleStatementUpdate(t *testing.T) http.HandlerFunc {
 		w.WriteHeader(http.StatusAccepted)
 	}
 }
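+
+// Mock handlers for the materialized-table routes registered above. GETs
+// return canned SqlV1MaterializedTableList fixtures; POST and PUT decode the
+// request body and echo it back with server-populated metadata and status,
+// which is how the golden fixtures earlier in this diff stay deterministic.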
+func handleSqlMaterializedTables(t *testing.T) http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		switch r.Method {
+		case http.MethodGet:
+			tables := flinkgatewayv1internal.SqlV1MaterializedTableList{Data: []flinkgatewayv1internal.SqlV1MaterializedTable{{
+			Name:           "table-1",
+			OrganizationId: "org-1",
+			EnvironmentId:  "env-1",
+			Spec: flinkgatewayv1internal.SqlV1MaterializedTableSpec{
+				KafkaClusterId: flinkgatewayv1internal.PtrString("lkc01"),
+				ComputePoolId:  flinkgatewayv1internal.PtrString("pool1"),
+				Principal:      flinkgatewayv1internal.PtrString("principal1"),
+				Query:          flinkgatewayv1internal.PtrString("query"),
+				Columns: &[]flinkgatewayv1internal.SqlV1MaterializedTableColumnDetails{
+					{
+						SqlV1ComputedColumn: &flinkgatewayv1internal.SqlV1ComputedColumn{
+							Name:       "Name1",
+							Type:       "Type1",
+							Comment:    flinkgatewayv1internal.PtrString("Comment1"),
+							Kind:       "Computed",
+							Expression: "Expression1",
+							Virtual:    flinkgatewayv1internal.PtrBool(true),
+						},
+					},
+				},
+				Watermark: &flinkgatewayv1internal.SqlV1MaterializedTableWatermark{
+					ColumnName: flinkgatewayv1internal.PtrString("Col1"),
+					Expression: flinkgatewayv1internal.PtrString("Expr1"),
+				},
+				DistributedBy: &flinkgatewayv1internal.SqlV1MaterializedTableDistribution{
+					ColumnNames: &[]string{"user_id", "region"},
+					Buckets:     flinkgatewayv1internal.PtrInt32(int32(8)),
+				},
+				Constraints: &[]flinkgatewayv1internal.SqlV1MaterializedTableConstraint{
+					{
+						Name:        flinkgatewayv1internal.PtrString("constr1"),
+						Kind:        flinkgatewayv1internal.PtrString("PRIMARY_KEY"),
+						ColumnNames: &[]string{"user_id", "region"},
+						Enforced:    flinkgatewayv1internal.PtrBool(true),
+					},
+				},
+			},
+			Status: &flinkgatewayv1internal.SqlV1MaterializedTableStatus{
+				Phase:  flinkgatewayv1internal.PtrString("COMPLETED"),
+				Detail: flinkgatewayv1internal.PtrString("Table1 is completed"),
+			},
+			Metadata: flinkgatewayv1internal.ObjectMeta{CreatedAt: flinkgatewayv1.PtrTime(time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC))},
+			},
+			}}
+			setPageToken(&tables, &tables.Metadata, r.URL)
+			err := json.NewEncoder(w).Encode(tables)
+			require.NoError(t, err)
+		case http.MethodPost:
+			table := &flinkgatewayv1internal.SqlV1MaterializedTable{}
+			err := json.NewDecoder(r.Body).Decode(table)
+			require.NoError(t, err)
+
+			table.Metadata = flinkgatewayv1internal.ObjectMeta{CreatedAt: flinkgatewayv1.PtrTime(time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC))}
+			table.Status = &flinkgatewayv1internal.SqlV1MaterializedTableStatus{Phase: flinkgatewayv1internal.PtrString("COMPLETED")}
+			table.Spec.KafkaClusterId = flinkgatewayv1internal.PtrString("lkc01")
+			table.Spec.ComputePoolId = flinkgatewayv1internal.PtrString("pool1")
+			table.Spec.Principal = flinkgatewayv1internal.PtrString("principal1")
+			table.Spec.Query = flinkgatewayv1internal.PtrString("query1")
+			err = json.NewEncoder(w).Encode(table)
+			require.NoError(t, err)
+		}
+	}
+}
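+
+// SqlV1MaterializedTableColumnDetails behaves as a oneOf wrapper: each entry
+// sets exactly one embedded column struct (SqlV1ComputedColumn above,
+// SqlV1PhysicalColumn in the list fixture below), and the JSON encoder emits
+// whichever variant is non-nil.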
+func handleSqlMaterializedTablesList(t *testing.T) http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		switch r.Method {
+		case http.MethodGet:
+			tables := flinkgatewayv1internal.SqlV1MaterializedTableList{Data: []flinkgatewayv1internal.SqlV1MaterializedTable{{
+			Name:           "table-1",
+			OrganizationId: "org-1",
+			EnvironmentId:  "env-1",
+			Spec: flinkgatewayv1internal.SqlV1MaterializedTableSpec{
+				KafkaClusterId: flinkgatewayv1internal.PtrString("lkc01"),
+				ComputePoolId:  flinkgatewayv1internal.PtrString("pool1"),
+				Principal:      flinkgatewayv1internal.PtrString("principal1"),
+				Query:          flinkgatewayv1internal.PtrString("query"),
+				Columns: &[]flinkgatewayv1internal.SqlV1MaterializedTableColumnDetails{
+					{
+						SqlV1ComputedColumn: &flinkgatewayv1internal.SqlV1ComputedColumn{
+							Name:       "Name1",
+							Type:       "Type1",
+							Comment:    flinkgatewayv1internal.PtrString("Comment1"),
+							Kind:       "Computed",
+							Expression: "Expression1",
+							Virtual:    flinkgatewayv1internal.PtrBool(true),
+						},
+					},
+					{
+						SqlV1PhysicalColumn: &flinkgatewayv1internal.SqlV1PhysicalColumn{
+							Name:    "Name2",
+							Type:    "Type2",
+							Comment: flinkgatewayv1internal.PtrString("Comment2"),
+							Kind:    "Physical",
+						},
+					},
+				},
+				Watermark: &flinkgatewayv1internal.SqlV1MaterializedTableWatermark{
+					ColumnName: flinkgatewayv1internal.PtrString("Col1"),
+					Expression: flinkgatewayv1internal.PtrString("Expr1"),
+				},
+				DistributedBy: &flinkgatewayv1internal.SqlV1MaterializedTableDistribution{
+					ColumnNames: &[]string{"user_id", "region"},
+					Buckets:     flinkgatewayv1internal.PtrInt32(int32(8)),
+				},
+				Constraints: &[]flinkgatewayv1internal.SqlV1MaterializedTableConstraint{
+					{
+						Name:        flinkgatewayv1internal.PtrString("constr1"),
+						Kind:        flinkgatewayv1internal.PtrString("PRIMARY_KEY"),
+						ColumnNames: &[]string{"user_id", "region"},
+						Enforced:    flinkgatewayv1internal.PtrBool(true),
+					},
+					{
+						Name:        flinkgatewayv1internal.PtrString("constr2"),
+						Kind:        flinkgatewayv1internal.PtrString("PRIMARY_KEY"),
+						ColumnNames: &[]string{"user_id1", "region1"},
+						Enforced:    flinkgatewayv1internal.PtrBool(false),
+					},
+				},
+			},
+			Status: &flinkgatewayv1internal.SqlV1MaterializedTableStatus{
+				Phase:  flinkgatewayv1internal.PtrString("COMPLETED"),
+				Detail: flinkgatewayv1internal.PtrString("Table1 is completed"),
+			},
+			Metadata: flinkgatewayv1internal.ObjectMeta{CreatedAt: flinkgatewayv1.PtrTime(time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC))},
+			},
+			}}
+			setPageToken(&tables, &tables.Metadata, r.URL)
+			err := json.NewEncoder(w).Encode(tables)
+			require.NoError(t, err)
+		}
+	}
+}
+
+func handleSqlMaterializedTablesTable(t *testing.T) http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		switch r.Method {
+		case http.MethodGet:
+			tableName := mux.Vars(r)["table_name"]
+			if strings.Contains(tableName, "nonexist") {
+				err := writeResourceNotFoundError(w)
+				require.NoError(t, err)
+				return
+			}
+			table := flinkgatewayv1internal.SqlV1MaterializedTable{
+				Name:           "table-1",
+				OrganizationId: "org-1",
+				EnvironmentId:  "env-1",
+				Spec: flinkgatewayv1internal.SqlV1MaterializedTableSpec{
+					KafkaClusterId: flinkgatewayv1internal.PtrString("lkc01"),
+					ComputePoolId:  flinkgatewayv1internal.PtrString("pool1"),
+					Principal:      flinkgatewayv1internal.PtrString("principal1"),
+					Query:          flinkgatewayv1internal.PtrString("query"),
+					Columns: &[]flinkgatewayv1internal.SqlV1MaterializedTableColumnDetails{
+						{
+							SqlV1ComputedColumn: &flinkgatewayv1internal.SqlV1ComputedColumn{
+								Name:       "Name1",
+								Type:       "Type1",
+								Comment:    flinkgatewayv1internal.PtrString("Comment1"),
+								Kind:       "Computed",
+								Expression: "Expression1",
+								Virtual:    flinkgatewayv1internal.PtrBool(true),
+							},
+						},
+					},
+					Watermark: &flinkgatewayv1internal.SqlV1MaterializedTableWatermark{
+						ColumnName: flinkgatewayv1internal.PtrString("Col1"),
+						Expression: flinkgatewayv1internal.PtrString("Expr1"),
+					},
+					DistributedBy: &flinkgatewayv1internal.SqlV1MaterializedTableDistribution{
+						ColumnNames: &[]string{"user_id", "region"},
+						Buckets:     flinkgatewayv1internal.PtrInt32(int32(8)),
+					},
+					Constraints: &[]flinkgatewayv1internal.SqlV1MaterializedTableConstraint{
+						{
+							Name:        flinkgatewayv1internal.PtrString("constr1"),
+							Kind:        flinkgatewayv1internal.PtrString("PRIMARY_KEY"),
+							ColumnNames: &[]string{"user_id", "region"},
+							Enforced:    flinkgatewayv1internal.PtrBool(true),
+						},
+					},
+				},
+			}
+			err := json.NewEncoder(w).Encode(table)
+			require.NoError(t, err)
+		case http.MethodDelete:
+			w.WriteHeader(http.StatusNoContent)
+		case http.MethodPut:
+			handleTableUpdate(t)(w, r)
+		}
+	}
+}
+
+func handleTableUpdate(t *testing.T) http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		table := &flinkgatewayv1internal.SqlV1MaterializedTable{}
+		err := json.NewDecoder(r.Body).Decode(table)
+		require.NoError(t, err)
+
+		table.Metadata = flinkgatewayv1internal.ObjectMeta{CreatedAt: flinkgatewayv1.PtrTime(time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC))}
+		table.Status = &flinkgatewayv1internal.SqlV1MaterializedTableStatus{Phase: flinkgatewayv1internal.PtrString("COMPLETED")}
+		err = json.NewEncoder(w).Encode(table)
+		require.NoError(t, err)
+	}
+}
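For anyone extending these handlers, a minimal sketch of exercising the mock router in-process, assuming it compiles alongside the handlers in the test-server package (the test name and approach are illustrative, not part of this PR):

package testserver

import (
	"net/http"
	"net/http/httptest"
	"testing"
)

// Sketch only: stands up the gateway router from this diff and issues a raw
// GET against the new database-scoped materialized-tables route.
func TestMaterializedTablesRouteSketch(t *testing.T) {
	server := httptest.NewServer(NewFlinkGatewayRouter(t))
	defer server.Close()

	resp, err := http.Get(server.URL + "/sql/v1/organizations/org-1/environments/env-1/databases/lkc01/materialized-tables")
	if err != nil {
		t.Fatal(err)
	}
	defer func() { _ = resp.Body.Close() }()

	if resp.StatusCode != http.StatusOK {
		t.Fatalf("expected 200 OK, got %d", resp.StatusCode)
	}
}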