Skip to content

Commit

Permalink
feat(collection): add task split (#780)
Browse files Browse the repository at this point in the history
Because

- it makes it easier for pipelines to manipulate arrays

This commit

- add task split
  • Loading branch information
chuang8511 authored Oct 29, 2024
1 parent 9bc6998 commit 1719e48
Show file tree
Hide file tree
Showing 13 changed files with 503 additions and 204 deletions.
29 changes: 29 additions & 0 deletions pkg/component/generic/collection/v0/.compogen/bottom.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,32 @@ output:
title: Object
value: ${text-object.output.data}
```

### Use `TASK_SPLIT` to split an array of strings into groups of a specified size
```yaml
# This pipeline splits an array of elements into groups of a specified size.
# Examples:
# ["foo", "bar", "bat", "zot"], 2 -> [["foo", "bar"], ["bat", "zot"]]
# ["foo", "bar", "bat", "zot"], 3 -> [["foo", "bar", "bat"], ["zot"]]
variable:
texts:
instill-format: array:string
title: Text
group-size:
instill-format: number
title: Group Size
component:
split:
type: collection
input:
array: ${variable.texts}
group-size: ${variable.group-size}
condition:
task: TASK_SPLIT
output:
split:
title: Split
value: ${split.output.arrays}
```
55 changes: 55 additions & 0 deletions pkg/component/generic/collection/v0/README.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ It can carry out the following tasks:
- [Intersection](#intersection)
- [Difference](#difference)
- [Concat](#concat)
- [Split](#split)



Expand Down Expand Up @@ -179,6 +180,31 @@ Concatenate the arrays. i.e. `[1, 2] + [3, 4] = [1, 2, 3, 4]`
| Array | `array` | array | The concatenated arrays. |
</div>

### Split

Split the array into an array of arrays, each holding at most `group-size` elements, e.g. `[1, 2, 3, 4, 5, 6]` with group size 2 = `[[1, 2], [3, 4], [5, 6]]`. If the array length is not divisible by the group size, the last group will have fewer elements.

<div class="markdown-col-no-wrap" data-col-1 data-col-2>

| Input | ID | Type | Description |
| :--- | :--- | :--- | :--- |
| Task ID (required) | `task` | string | `TASK_SPLIT` |
| Array (required) | `array` | array | The array to be split. |
| Group Size (required) | `group-size` | integer | The size of each group. |
</div>






<div class="markdown-col-no-wrap" data-col-1 data-col-2>

| Output | ID | Type | Description |
| :--- | :--- | :--- | :--- |
| Arrays | `arrays` | array | The array of arrays with group size. |
</div>


## Example Recipes

Expand Down Expand Up @@ -251,3 +277,32 @@ output:
title: Object
value: ${text-object.output.data}
```

### Use `TASK_SPLIT` to split an array of strings into groups of a specified size
```yaml
# This pipeline splits an array of elements into groups of a specified size.
# Examples:
# ["foo", "bar", "bat", "zot"], 2 -> [["foo", "bar"], ["bat", "zot"]]
# ["foo", "bar", "bat", "zot"], 3 -> [["foo", "bar", "bat"], ["zot"]]
variable:
texts:
instill-format: array:string
title: Text
group-size:
instill-format: number
title: Group Size
component:
split:
type: collection
input:
array: ${variable.texts}
group-size: ${variable.group-size}
condition:
task: TASK_SPLIT
output:
split:
title: Split
value: ${split.output.arrays}
```
3 changes: 2 additions & 1 deletion pkg/component/generic/collection/v0/config/definition.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
"TASK_UNION",
"TASK_INTERSECTION",
"TASK_DIFFERENCE",
"TASK_CONCAT"
"TASK_CONCAT",
"TASK_SPLIT"
],
"custom": false,
"documentationUrl": "https://www.instill.tech/docs/component/generic/collection",
Expand Down
64 changes: 64 additions & 0 deletions pkg/component/generic/collection/v0/config/tasks.json
Original file line number Diff line number Diff line change
Expand Up @@ -373,5 +373,69 @@
"title": "Output",
"type": "object"
}
},
"TASK_SPLIT": {
"instillShortDescription": "Split the array into an array of arrays, each holding at most `group-size` elements, e.g. `[1, 2, 3, 4, 5, 6]` with group size 2 = `[[1, 2], [3, 4], [5, 6]]`. If the array length is not divisible by the group size, the last group will have fewer elements.",
"input": {
"description": "Input",
"instillUIOrder": 0,
"properties": {
"array": {
"description": "The array to be split.",
"instillAcceptFormats": [
"array:*"
],
"instillUIOrder": 0,
"instillUpstreamTypes": [
"value",
"reference",
"template"
],
"items": {},
"required": [],
"title": "Array",
"type": "array"
},
"group-size": {
"description": "The size of each group.",
"instillUIOrder": 1,
"instillAcceptFormats": [
"integer"
],
"instillUpstreamTypes": [
"value",
"reference",
"template"
],
"title": "Group Size",
"type": "integer"
}
},
"required": [
"array",
"group-size"
],
"title": "Input",
"type": "object"
},
"output": {
"description": "Output",
"instillUIOrder": 1,
"properties": {
"arrays": {
"description": "The array of arrays with group size.",
"instillFormat": "array:array:*",
"instillUIOrder": 0,
"required": [],
"title": "Arrays",
"type": "array"
}
},
"required": [
"arrays"
],
"title": "Output",
"type": "object"
}
}
}
172 changes: 6 additions & 166 deletions pkg/component/generic/collection/v0/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@ package collection

import (
"context"
"encoding/json"
"fmt"
"sync"

_ "embed"

"github.com/samber/lo"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/structpb"

"github.com/instill-ai/pipeline-backend/pkg/component/base"
Expand All @@ -24,6 +21,7 @@ const (
taskDifference = "TASK_DIFFERENCE"
taskAppend = "TASK_APPEND"
taskConcat = "TASK_CONCAT"
taskSplit = "TASK_SPLIT"
)

var (
Expand All @@ -43,7 +41,7 @@ type component struct {
type execution struct {
base.ComponentExecution

execute func(*structpb.Struct) (*structpb.Struct, error)
execute func(*structpb.Struct, *base.Job, context.Context) (*structpb.Struct, error)
}

// Init returns an implementation of IOperator that processes JSON objects.
Expand Down Expand Up @@ -76,6 +74,8 @@ func (c *component) CreateExecution(x base.ComponentExecution) (base.IExecution,
e.execute = e.append
case taskConcat:
e.execute = e.concat
case taskSplit:
e.execute = e.split
default:
return nil, errmsg.AddMessage(
fmt.Errorf("not supported task: %s", x.Task),
Expand All @@ -85,167 +85,7 @@ func (c *component) CreateExecution(x base.ComponentExecution) (base.IExecution,
return e, nil
}

// concat joins every array found under the "arrays" input field into a single
// list, preserving order, and returns it under the "array" output field.
//
// The generated protobuf getters are used instead of direct field access so
// that a missing "arrays" field, or an entry that is not a list, contributes
// no elements instead of dereferencing a nil *structpb.ListValue.
func (e *execution) concat(in *structpb.Struct) (*structpb.Struct, error) {
	arrays := in.GetFields()["arrays"].GetListValue().GetValues()
	concat := &structpb.ListValue{Values: []*structpb.Value{}}

	for _, a := range arrays {
		concat.Values = append(concat.Values, a.GetListValue().GetValues()...)
	}

	out := &structpb.Struct{Fields: make(map[string]*structpb.Value)}
	out.Fields["array"] = structpb.NewListValue(concat)
	return out, nil
}

// union merges the arrays found under the "sets" input field into one list
// without duplicates and returns it under the "set" output field.
//
// Elements are compared through their protojson encoding: each value is
// marshalled to a JSON string, the strings are merged with lo.Union, and the
// surviving strings are decoded back into structpb values.
//
// The generated protobuf getters are used instead of direct field access so
// that a missing "sets" field, or an entry that is not a list, contributes no
// elements instead of dereferencing a nil *structpb.ListValue.
//
// An error is returned when an element cannot be marshalled, decoded, or
// converted back into a structpb value.
func (e *execution) union(in *structpb.Struct) (*structpb.Struct, error) {
	sets := in.GetFields()["sets"].GetListValue().GetValues()
	cache := make([][]string, 0, len(sets))

	for _, s := range sets {
		values := s.GetListValue().GetValues()
		c := make([]string, 0, len(values))
		for _, v := range values {
			b, err := protojson.Marshal(v)
			if err != nil {
				return nil, fmt.Errorf("encoding set element: %w", err)
			}
			c = append(c, string(b))
		}
		cache = append(cache, c)
	}

	un := lo.Union(cache...)
	set := &structpb.ListValue{Values: make([]*structpb.Value, 0, len(un))}
	for _, u := range un {
		var a any
		if err := json.Unmarshal([]byte(u), &a); err != nil {
			return nil, fmt.Errorf("decoding set element: %w", err)
		}
		v, err := structpb.NewValue(a)
		if err != nil {
			return nil, fmt.Errorf("converting set element: %w", err)
		}
		set.Values = append(set.Values, v)
	}

	out := &structpb.Struct{Fields: make(map[string]*structpb.Value)}
	out.Fields["set"] = structpb.NewListValue(set)
	return out, nil
}

// intersection computes the elements shared by all arrays found under the
// "sets" input field and returns them under the "set" output field.
//
// Elements are compared through their protojson encoding: each value is
// marshalled to a JSON string, the string slices are folded pairwise with
// lo.Intersect, and the surviving strings are decoded back into structpb
// values.
//
// Edge cases:
//   - no sets at all (or a missing "sets" field) yields an empty set instead
//     of indexing past the end of the slice;
//   - a single set is returned as is, without the encode/decode round trip.
//
// An error is returned when an element cannot be marshalled, decoded, or
// converted back into a structpb value.
func (e *execution) intersection(in *structpb.Struct) (*structpb.Struct, error) {
	sets := in.GetFields()["sets"].GetListValue().GetValues()
	out := &structpb.Struct{Fields: make(map[string]*structpb.Value)}

	switch len(sets) {
	case 0:
		// Nothing to intersect: answer with an empty set rather than reading sets[0].
		out.Fields["set"] = structpb.NewListValue(&structpb.ListValue{Values: []*structpb.Value{}})
		return out, nil
	case 1:
		out.Fields["set"] = structpb.NewListValue(sets[0].GetListValue())
		return out, nil
	}

	// encode turns each value into its protojson string so values can be compared.
	encode := func(values []*structpb.Value) ([]string, error) {
		encoded := make([]string, len(values))
		for idx, v := range values {
			b, err := protojson.Marshal(v)
			if err != nil {
				return nil, fmt.Errorf("encoding set element: %w", err)
			}
			encoded[idx] = string(b)
		}
		return encoded, nil
	}

	curr, err := encode(sets[0].GetListValue().GetValues())
	if err != nil {
		return nil, err
	}

	for _, s := range sets[1:] {
		next, err := encode(s.GetListValue().GetValues())
		if err != nil {
			return nil, err
		}
		curr = lo.Intersect(curr, next)
	}

	set := &structpb.ListValue{Values: make([]*structpb.Value, len(curr))}
	for idx, c := range curr {
		var a any
		if err := json.Unmarshal([]byte(c), &a); err != nil {
			return nil, fmt.Errorf("decoding set element: %w", err)
		}
		v, err := structpb.NewValue(a)
		if err != nil {
			return nil, fmt.Errorf("converting set element: %w", err)
		}
		set.Values[idx] = v
	}

	out.Fields["set"] = structpb.NewListValue(set)
	return out, nil
}

// difference returns, under the "set" output field, the result of subtracting
// the "set-b" input array from the "set-a" input array.
//
// Elements are compared through their protojson encoding: both inputs are
// marshalled to JSON strings, lo.Difference is applied, and its first (left)
// result is decoded back into structpb values.
//
// The generated protobuf getters are used instead of direct field access so
// that a missing or non-list "set-a"/"set-b" is treated as an empty array
// instead of dereferencing a nil *structpb.ListValue.
//
// An error is returned when an element cannot be marshalled, decoded, or
// converted back into a structpb value.
func (e *execution) difference(in *structpb.Struct) (*structpb.Struct, error) {
	fields := in.GetFields()

	// encode turns each value into its protojson string so values can be compared.
	encode := func(values []*structpb.Value) ([]string, error) {
		encoded := make([]string, len(values))
		for idx, v := range values {
			b, err := protojson.Marshal(v)
			if err != nil {
				return nil, fmt.Errorf("encoding set element: %w", err)
			}
			encoded[idx] = string(b)
		}
		return encoded, nil
	}

	valuesA, err := encode(fields["set-a"].GetListValue().GetValues())
	if err != nil {
		return nil, err
	}
	valuesB, err := encode(fields["set-b"].GetListValue().GetValues())
	if err != nil {
		return nil, err
	}

	// Only the left-hand result is used; the right-hand one is deliberately discarded.
	dif, _ := lo.Difference(valuesA, valuesB)

	set := &structpb.ListValue{Values: make([]*structpb.Value, len(dif))}
	for idx, c := range dif {
		var a any
		if err := json.Unmarshal([]byte(c), &a); err != nil {
			return nil, fmt.Errorf("decoding set element: %w", err)
		}
		v, err := structpb.NewValue(a)
		if err != nil {
			return nil, fmt.Errorf("converting set element: %w", err)
		}
		set.Values[idx] = v
	}

	out := &structpb.Struct{Fields: make(map[string]*structpb.Value)}
	out.Fields["set"] = structpb.NewListValue(set)
	return out, nil
}

// assign is the identity task: the input struct is handed back, untouched, as
// the output. It never returns an error.
func (e *execution) assign(in *structpb.Struct) (*structpb.Struct, error) {
	return in, nil
}

// append returns, under the "array" output field, a list made of the elements
// of the "array" input field followed by the "element" input field.
//
// The result is built in a fresh list so the caller's input is not modified.
//
// An error is returned when the "array" input is missing or is not a list;
// previously that case dereferenced a nil *structpb.ListValue.
func (e *execution) append(in *structpb.Struct) (*structpb.Struct, error) {
	fields := in.GetFields()

	list := fields["array"].GetListValue()
	if list == nil {
		return nil, fmt.Errorf("input field %q must be an array", "array")
	}

	// Copy into a new backing slice so the input list is left untouched.
	values := make([]*structpb.Value, 0, len(list.GetValues())+1)
	values = append(values, list.GetValues()...)
	values = append(values, fields["element"])

	out := &structpb.Struct{Fields: make(map[string]*structpb.Value)}
	out.Fields["array"] = structpb.NewListValue(&structpb.ListValue{Values: values})
	return out, nil
}

// Execute processes the input JSON object and returns the result.
func (e *execution) Execute(ctx context.Context, jobs []*base.Job) error {
return base.SequentialExecutor(ctx, jobs, e.execute)
return base.ConcurrentExecutor(ctx, jobs, e.execute)
}
Loading

0 comments on commit 1719e48

Please sign in to comment.