From 0dbe087198926198af2cce468b66b2c8a33ab21f Mon Sep 17 00:00:00 2001 From: Frederick Wijayadisusilo Date: Wed, 17 Jun 2026 17:22:07 +0700 Subject: [PATCH] feat: implement maxcompute extract logic for json string value --- plugins/extractors/maxcompute/README.md | 3 + .../extractors/maxcompute/client/client.go | 33 ++++- .../extractors/maxcompute/config/config.go | 13 +- plugins/extractors/maxcompute/maxcompute.go | 133 +++++++++++++++++- .../extractors/maxcompute/maxcompute_test.go | 94 +++++++++++-- 5 files changed, 247 insertions(+), 29 deletions(-) diff --git a/plugins/extractors/maxcompute/README.md b/plugins/extractors/maxcompute/README.md index 47b726f3..e3a0020f 100644 --- a/plugins/extractors/maxcompute/README.md +++ b/plugins/extractors/maxcompute/README.md @@ -36,6 +36,9 @@ source: | `exclude.tables` | `[]string` | `["schema_c.table_a"]` | List of tables to exclude | *optional* | | `exclude.min_table_lifecycle` | `int` | `8` | Exclude tables with a lifecycle less than this value (in days). Value must more than 1. | *optional* | | `concurrency` | `int` | `10` | Number of concurrent requests to MaxCompute | *optional* | +| `max_preview_rows` | `int` | `30` | Number of rows to fetch for the table preview. Set to `0` to disable preview entirely. | *optional* | +| `include_preview_rows` | `bool` | `true` | Whether to emit preview row data. When `false` (the default), `preview_fields` is still populated but `preview_rows` is omitted. | *optional* | +| `mix_values` | `bool` | `false` | Shuffle each preview column independently across rows to break value correlations. | *optional* | ### *Notes* diff --git a/plugins/extractors/maxcompute/client/client.go b/plugins/extractors/maxcompute/client/client.go index 318b874a..1893a9cf 100644 --- a/plugins/extractors/maxcompute/client/client.go +++ b/plugins/extractors/maxcompute/client/client.go @@ -2,6 +2,7 @@ package client import ( "context" + "sort" "github.com/aliyun/aliyun-odps-go-sdk/odps" "github.com/aliyun/aliyun-odps-go-sdk/odps/account" @@ -95,13 +96,15 @@ func (*Client) GetTableSchema(_ context.Context, table *odps.Table) (string, *ta return table.Type().String(), &tableSchema, nil } -func (c *Client) GetTablePreview(_ context.Context, partitionValue string, table *odps.Table, maxRows int) ( - previewFields []string, previewRows *structpb.ListValue, err error, -) { +func (c *Client) GetTablePreview(_ context.Context, partitionValue string, table *odps.Table, maxRows int) ([]string, *structpb.ListValue, error) { if table.Type().String() == config.TableTypeView { return nil, nil, nil } + if partitionValue == "" { + partitionValue = latestPartitionValue(table) + } + records, err := c.tunnel.Preview(table, partitionValue, int64(maxRows)) if err != nil { return nil, nil, err @@ -112,18 +115,34 @@ func (c *Client) GetTablePreview(_ context.Context, partitionValue string, table columnNames[i] = column.Name } - var protoRows []*structpb.Value + var previewRows []*structpb.Value for _, record := range records { var rowValues []*structpb.Value for _, value := range record { rowValues = append(rowValues, structpb.NewStringValue(value.String())) } - protoRows = append(protoRows, structpb.NewListValue(&structpb.ListValue{Values: rowValues})) + + previewRows = append(previewRows, structpb.NewListValue(&structpb.ListValue{Values: rowValues})) + } + + return columnNames, &structpb.ListValue{Values: previewRows}, nil +} + +func latestPartitionValue(table *odps.Table) string { + partitions, err := table.GetPartitions() + if err != nil || len(partitions) == 0 { + return "" } - protoList := &structpb.ListValue{Values: protoRows} + sort.Slice(partitions, func(first, second int) bool { + if !partitions[first].LastModifiedTime().Equal(partitions[second].LastModifiedTime()) { + return partitions[first].LastModifiedTime().After(partitions[second].LastModifiedTime()) + } + + return partitions[first].Value() > partitions[second].Value() + }) - return columnNames, protoList, nil + return partitions[0].Value() } func (*Client) GetMaskingPolicies(table *odps.Table) (maskingPolicies map[Column][]Policy, err error) { diff --git a/plugins/extractors/maxcompute/config/config.go b/plugins/extractors/maxcompute/config/config.go index 2896e12c..96603e57 100644 --- a/plugins/extractors/maxcompute/config/config.go +++ b/plugins/extractors/maxcompute/config/config.go @@ -15,10 +15,11 @@ type Config struct { Tables []string `mapstructure:"tables"` MinTableLifecycle int `mapstructure:"min_table_lifecycle"` } `mapstructure:"exclude,omitempty"` - MaxPreviewRows int `mapstructure:"max_preview_rows,omitempty"` - MixValues bool `mapstructure:"mix_values,omitempty"` - Concurrency int `mapstructure:"concurrency,omitempty"` - BuildViewLineage bool `mapstructure:"build_view_lineage,omitempty"` - ShieldHost string `mapstructure:"shield_host,omitempty" validate:"omitempty,url"` - ShieldHeader map[string]string `mapstructure:"shield_header,omitempty"` + MaxPreviewRows int `mapstructure:"max_preview_rows,omitempty"` + IncludePreviewRows bool `mapstructure:"include_preview_rows,omitempty"` + MixValues bool `mapstructure:"mix_values,omitempty"` + Concurrency int `mapstructure:"concurrency,omitempty"` + BuildViewLineage bool `mapstructure:"build_view_lineage,omitempty"` + ShieldHost string `mapstructure:"shield_host,omitempty" validate:"omitempty,url"` + ShieldHeader map[string]string `mapstructure:"shield_header,omitempty"` } diff --git a/plugins/extractors/maxcompute/maxcompute.go b/plugins/extractors/maxcompute/maxcompute.go index 64665528..a05bd6bb 100644 --- a/plugins/extractors/maxcompute/maxcompute.go +++ b/plugins/extractors/maxcompute/maxcompute.go @@ -8,6 +8,7 @@ import ( "math/rand" "net/http" "regexp" + "sort" "strings" "sync/atomic" "time" @@ -286,10 +287,7 @@ func (e *Extractor) buildAsset(ctx context.Context, schema *odps.Schema, tableURN := plugins.MaxComputeURN(e.config.ProjectName, schemaName, tableSchema.TableName) - var previewFields []string - var previewRows *structpb.ListValue - var err error - previewFields, previewRows, err = e.buildPreview(ctx, table, tableSchema) + previewFields, previewRows, err := e.buildPreview(ctx, table, tableSchema) if err != nil { e.logger.Warn("error building preview", "err", err, "table", tableSchema.TableName) } @@ -393,7 +391,9 @@ func (e *Extractor) buildAsset(ctx context.Context, schema *odps.Schema, maxPreviewRows := e.config.MaxPreviewRows if maxPreviewRows > 0 { tableData.PreviewFields = previewFields - tableData.PreviewRows = previewRows + if e.config.IncludePreviewRows { + tableData.PreviewRows = previewRows + } } ddl, err := getDDLStatement(tableSchema, e.config.ProjectName, schemaName) @@ -646,9 +646,132 @@ func (e *Extractor) buildPreview(ctx context.Context, t *odps.Table, tSchema *ta } } + previewFields, previewRows = flattenJSONColumns(previewFields, previewRows, stringColumnSet(tSchema)) + return previewFields, previewRows, nil } +func stringColumnSet(tableSchema *tableschema.TableSchema) map[string]bool { + if tableSchema == nil { + return nil + } + + stringCols := make(map[string]bool, len(tableSchema.Columns)) + for _, col := range tableSchema.Columns { + if dataTypeToString(col.Type) == datatype.STRING.String() { + stringCols[col.Name] = true + } + } + + return stringCols +} + +func flattenJSONColumns(fields []string, rows *structpb.ListValue, isStringCol map[string]bool) ([]string, *structpb.ListValue) { + if rows == nil || len(isStringCol) == 0 { + return fields, rows + } + + subKeysByColumn := jsonSubKeys(fields, rows, isStringCol) + if len(subKeysByColumn) == 0 { + return fields, rows + } + + newFields := make([]string, 0, len(fields)) + for colIdx, fieldName := range fields { + newFields = append(newFields, fieldName) + for _, subKey := range subKeysByColumn[colIdx] { + newFields = append(newFields, fieldName+"."+subKey) + } + } + + newRows := make([]*structpb.Value, len(rows.GetValues())) + for rowIdx, row := range rows.GetValues() { + newRows[rowIdx] = expandJSONRow(row, len(newFields), subKeysByColumn) + } + + return newFields, &structpb.ListValue{Values: newRows} +} + +func jsonSubKeys(fields []string, rows *structpb.ListValue, isStringCol map[string]bool) map[int][]string { + subKeysByColumn := map[int][]string{} + for colIdx, fieldName := range fields { + if !isStringCol[fieldName] { + continue + } + + keySet := map[string]struct{}{} + for _, row := range rows.GetValues() { + for key := range jsonObjectAt(row, colIdx) { + keySet[key] = struct{}{} + } + } + + if len(keySet) == 0 { + continue + } + + keys := make([]string, 0, len(keySet)) + for key := range keySet { + keys = append(keys, key) + } + + sort.Strings(keys) + subKeysByColumn[colIdx] = keys + } + + return subKeysByColumn +} + +func expandJSONRow(row *structpb.Value, width int, subKeysByColumn map[int][]string) *structpb.Value { + expandedCells := make([]*structpb.Value, 0, width) + for colIdx, cell := range row.GetListValue().GetValues() { + expandedCells = append(expandedCells, cell) + subKeys, ok := subKeysByColumn[colIdx] + if !ok { + continue + } + + jsonObj := jsonObjectAt(row, colIdx) + for _, subKey := range subKeys { + expandedCells = append(expandedCells, structpb.NewStringValue(jsonText(jsonObj[subKey]))) + } + } + + return structpb.NewListValue(&structpb.ListValue{Values: expandedCells}) +} + +func jsonObjectAt(row *structpb.Value, col int) map[string]json.RawMessage { + cells := row.GetListValue().GetValues() + if col >= len(cells) { + return nil + } + + text := strings.TrimSpace(cells[col].GetStringValue()) + if text == "" || text[0] != '{' { + return nil + } + + var obj map[string]json.RawMessage + if json.Unmarshal([]byte(text), &obj) != nil { + return nil + } + + return obj +} + +func jsonText(raw json.RawMessage) string { + if len(raw) == 0 || string(raw) == "null" { + return "" + } + + var text string + if json.Unmarshal(raw, &text) == nil { + return text + } + + return string(raw) +} + func (e *Extractor) mixValuesIfNeeded(rows []interface{}, rndSeed int64) ([]interface{}, error) { if !e.config.MixValues || len(rows) < 2 { return rows, nil diff --git a/plugins/extractors/maxcompute/maxcompute_test.go b/plugins/extractors/maxcompute/maxcompute_test.go index dde9f159..4cd64b5e 100644 --- a/plugins/extractors/maxcompute/maxcompute_test.go +++ b/plugins/extractors/maxcompute/maxcompute_test.go @@ -336,10 +336,11 @@ func TestExtract(t *testing.T) { "id": "access_key_id", "secret": "access_key_secret", }, - "endpoint_project": "https://example.com/some-api", - "max_preview_rows": 3, - "mix_values": false, - "build_view_lineage": false, + "endpoint_project": "https://example.com/some-api", + "max_preview_rows": 3, + "include_preview_rows": true, + "mix_values": false, + "build_view_lineage": false, }, }, func(mockClient *mocks.MaxComputeClient) { mockClient.EXPECT().ListSchema(mock.Anything).Return(schema1, nil) @@ -390,10 +391,11 @@ func TestExtract(t *testing.T) { "id": "access_key_id", "secret": "access_key_secret", }, - "endpoint_project": "https://example.com/some-api", - "max_preview_rows": 3, - "mix_values": false, - "build_view_lineage": true, + "endpoint_project": "https://example.com/some-api", + "max_preview_rows": 3, + "include_preview_rows": true, + "mix_values": false, + "build_view_lineage": true, }, }, func(mockClient *mocks.MaxComputeClient) { mockClient.EXPECT().ListSchema(mock.Anything).Return(schema1, nil) @@ -1083,9 +1085,10 @@ func TestExtract(t *testing.T) { "id": "access_key_id", "secret": "access_key_secret", }, - "endpoint_project": "https://example.com/some-api", - "max_preview_rows": 3, - "mix_values": true, + "endpoint_project": "https://example.com/some-api", + "max_preview_rows": 3, + "include_preview_rows": true, + "mix_values": true, }, }, func(mockClient *mocks.MaxComputeClient) { mockClient.EXPECT().ListSchema(mock.Anything).Return(schema1, nil) @@ -1132,4 +1135,73 @@ func TestExtract(t *testing.T) { assert.Equal(t, "1", secondRow[0].GetStringValue()) assert.Equal(t, "user1@example.com", secondRow[1].GetStringValue()) }) + + t.Run("should flatten JSON string columns in preview", func(t *testing.T) { + actual, err := runTest(t, plugins.Config{ + URNScope: "test-maxcompute", + RawConfig: map[string]interface{}{ + "project_name": projectID, + "access_key": map[string]interface{}{ + "id": "access_key_id", + "secret": "access_key_secret", + }, + "endpoint_project": "https://example.com/some-api", + "max_preview_rows": 3, + "include_preview_rows": true, + "mix_values": false, + }, + }, func(mockClient *mocks.MaxComputeClient) { + mockClient.EXPECT().ListSchema(mock.Anything).Return(schema1, nil) + mockClient.EXPECT().ListTable(mock.Anything, "my_schema").Return(table1[1:], nil) + mockClient.EXPECT().GetTableSchema(mock.Anything, table1[1]).Return("MANAGED_TABLE", schemaMapping[table1[1].Name()], nil) + mockClient.EXPECT().GetTablePreview(mock.Anything, "", table1[1], 3).Return( + []string{"user_id", "email"}, + &structpb.ListValue{ + Values: []*structpb.Value{ + structpb.NewListValue(&structpb.ListValue{ + Values: []*structpb.Value{ + structpb.NewStringValue("1"), + structpb.NewStringValue(`{"plan":"pro","active":"true"}`), + }, + }), + structpb.NewListValue(&structpb.ListValue{ + Values: []*structpb.Value{ + structpb.NewStringValue("2"), + structpb.NewStringValue(`{"plan":"free"}`), + }, + }), + }, + }, + nil, + ) + mockClient.EXPECT().GetMaskingPolicies(mock.Anything).Return(map[string][]string{}, nil) + }, nil) + + require.NoError(t, err) + require.Len(t, actual, 1) + + tableData := &v1beta2.Table{} + require.NoError(t, proto.Unmarshal(actual[0].GetData().GetValue(), tableData)) + + assert.Equal(t, []string{ + "user_id", "email", "email.active", "email.plan", + }, tableData.GetPreviewFields()) + + rows := tableData.GetPreviewRows().GetValues() + require.Len(t, rows, 2) + + firstRow := rows[0].GetListValue().GetValues() + require.Len(t, firstRow, 4) + assert.Equal(t, "1", firstRow[0].GetStringValue()) + assert.Equal(t, `{"plan":"pro","active":"true"}`, firstRow[1].GetStringValue()) + assert.Equal(t, "true", firstRow[2].GetStringValue()) + assert.Equal(t, "pro", firstRow[3].GetStringValue()) + + secondRow := rows[1].GetListValue().GetValues() + require.Len(t, secondRow, 4) + assert.Equal(t, "2", secondRow[0].GetStringValue()) + assert.Equal(t, `{"plan":"free"}`, secondRow[1].GetStringValue()) + assert.Equal(t, "", secondRow[2].GetStringValue()) + assert.Equal(t, "free", secondRow[3].GetStringValue()) + }) }