From 1719e48fbea205ed8a8ac84c6d6c262fd89ab86e Mon Sep 17 00:00:00 2001
From: ChunHao <64747455+chuang8511@users.noreply.github.com>
Date: Tue, 29 Oct 2024 02:19:01 +0000
Subject: [PATCH] feat(collection): add task split (#780)
Because
- it make pipeline easier to manipulate array
This commit
- add task split
---
.../collection/v0/.compogen/bottom.mdx | 29 +++
.../generic/collection/v0/README.mdx | 55 ++++++
.../collection/v0/config/definition.json | 3 +-
.../generic/collection/v0/config/tasks.json | 64 +++++++
pkg/component/generic/collection/v0/main.go | 172 +-----------------
.../generic/collection/v0/main_test.go | 128 +++++++++----
.../generic/collection/v0/task_append.go | 19 ++
.../generic/collection/v0/task_assign.go | 14 ++
.../generic/collection/v0/task_concat.go | 22 +++
.../generic/collection/v0/task_difference.go | 56 ++++++
.../collection/v0/task_intersection.go | 65 +++++++
.../generic/collection/v0/task_split.go | 32 ++++
.../generic/collection/v0/task_union.go | 48 +++++
13 files changed, 503 insertions(+), 204 deletions(-)
create mode 100644 pkg/component/generic/collection/v0/task_append.go
create mode 100644 pkg/component/generic/collection/v0/task_assign.go
create mode 100644 pkg/component/generic/collection/v0/task_concat.go
create mode 100644 pkg/component/generic/collection/v0/task_difference.go
create mode 100644 pkg/component/generic/collection/v0/task_intersection.go
create mode 100644 pkg/component/generic/collection/v0/task_split.go
create mode 100644 pkg/component/generic/collection/v0/task_union.go
diff --git a/pkg/component/generic/collection/v0/.compogen/bottom.mdx b/pkg/component/generic/collection/v0/.compogen/bottom.mdx
index e1029031c..fa017ea61 100644
--- a/pkg/component/generic/collection/v0/.compogen/bottom.mdx
+++ b/pkg/component/generic/collection/v0/.compogen/bottom.mdx
@@ -69,3 +69,32 @@ output:
title: Object
value: ${text-object.output.data}
```
+
+### Use `TASK_SPLIT` to split an array of strings into groups of a specified size
+```yaml
+# This pipeline splits an array of elements into groups of a specified size.
+# Examples:
+# ["foo", "bar", "bat", "zot"], 2 -> [["foo", "bar"], ["bat", "zot"]]
+# ["foo", "bar", "bat", "zot"], 3 -> [["foo", "bar", "bat"], ["zot"]]
+variable:
+ texts:
+ instill-format: array:string
+ title: Text
+ group-size:
+ instill-format: number
+ title: Group Size
+
+component:
+ split:
+ type: collection
+ input:
+ array: ${variable.texts}
+ group-size: ${variable.group-size}
+ condition:
+ task: TASK_SPLIT
+
+output:
+ split:
+ title: Split
+ value: ${split.output}
+```
\ No newline at end of file
diff --git a/pkg/component/generic/collection/v0/README.mdx b/pkg/component/generic/collection/v0/README.mdx
index c6bf8c8b3..b6ed3da8c 100644
--- a/pkg/component/generic/collection/v0/README.mdx
+++ b/pkg/component/generic/collection/v0/README.mdx
@@ -13,6 +13,7 @@ It can carry out the following tasks:
- [Intersection](#intersection)
- [Difference](#difference)
- [Concat](#concat)
+- [Split](#split)
@@ -179,6 +180,31 @@ Concatenate the arrays. i.e. `[1, 2] + [3, 4] = [1, 2, 3, 4]`
| Array | `array` | array | The concatenated arrays. |
+### Split
+
+Split the array into an array of arrays with group size. i.e. `[1, 2, 3, 4, 5, 6]` with group size 2 = `[[1, 2], [3, 4], [5, 6]]`. If the array length is not divisible by the group size, the last group will have fewer elements.
+
+
+
+| Input | ID | Type | Description |
+| :--- | :--- | :--- | :--- |
+| Task ID (required) | `task` | string | `TASK_SPLIT` |
+| Array (required) | `array` | array | The array to be split. |
+| Group Size (required) | `group-size` | integer | The size of each group. |
+
+
+
+
+
+
+
+
+
+| Output | ID | Type | Description |
+| :--- | :--- | :--- | :--- |
+| Arrays | `arrays` | array | The array of arrays with group size. |
+
+
## Example Recipes
@@ -251,3 +277,32 @@ output:
title: Object
value: ${text-object.output.data}
```
+
+### Use `TASK_SPLIT` to split an array of strings into groups of a specified size
+```yaml
+# This pipeline splits an array of elements into groups of a specified size.
+# Examples:
+# ["foo", "bar", "bat", "zot"], 2 -> [["foo", "bar"], ["bat", "zot"]]
+# ["foo", "bar", "bat", "zot"], 3 -> [["foo", "bar", "bat"], ["zot"]]
+variable:
+ texts:
+ instill-format: array:string
+ title: Text
+ group-size:
+ instill-format: number
+ title: Group Size
+
+component:
+ split:
+ type: collection
+ input:
+ array: ${variable.texts}
+ group-size: ${variable.group-size}
+ condition:
+ task: TASK_SPLIT
+
+output:
+ split:
+ title: Split
+ value: ${split.output}
+```
\ No newline at end of file
diff --git a/pkg/component/generic/collection/v0/config/definition.json b/pkg/component/generic/collection/v0/config/definition.json
index 42ef31092..b072cd936 100644
--- a/pkg/component/generic/collection/v0/config/definition.json
+++ b/pkg/component/generic/collection/v0/config/definition.json
@@ -5,7 +5,8 @@
"TASK_UNION",
"TASK_INTERSECTION",
"TASK_DIFFERENCE",
- "TASK_CONCAT"
+ "TASK_CONCAT",
+ "TASK_SPLIT"
],
"custom": false,
"documentationUrl": "https://www.instill.tech/docs/component/generic/collection",
diff --git a/pkg/component/generic/collection/v0/config/tasks.json b/pkg/component/generic/collection/v0/config/tasks.json
index 33aaaacd4..7eccf4860 100644
--- a/pkg/component/generic/collection/v0/config/tasks.json
+++ b/pkg/component/generic/collection/v0/config/tasks.json
@@ -373,5 +373,69 @@
"title": "Output",
"type": "object"
}
+ },
+ "TASK_SPLIT": {
+ "instillShortDescription": "Split the array into an array of arrays with group size. i.e. `[1, 2, 3, 4, 5, 6]` with group size 2 = `[[1, 2], [3, 4], [5, 6]]`. If the array length is not divisible by the group size, the last group will have fewer elements.",
+ "input": {
+ "description": "Input",
+ "instillUIOrder": 0,
+ "properties": {
+ "array": {
+ "description": "The array to be split.",
+ "instillAcceptFormats": [
+ "array:*"
+ ],
+ "instillUIOrder": 0,
+ "instillUpstreamTypes": [
+ "value",
+ "reference",
+ "template"
+ ],
+ "items": {},
+ "required": [],
+ "title": "Array",
+ "type": "array"
+ },
+ "group-size": {
+ "description": "The size of each group.",
+ "instillUIOrder": 1,
+ "instillAcceptFormats": [
+ "integer"
+ ],
+ "instillUpstreamTypes": [
+ "value",
+ "reference",
+ "template"
+ ],
+ "title": "Group Size",
+ "type": "integer"
+ }
+ },
+ "required": [
+ "array",
+ "group-size"
+ ],
+ "title": "Input",
+ "type": "object"
+ },
+ "output": {
+ "description": "Output",
+ "instillUIOrder": 1,
+ "properties": {
+ "arrays": {
+ "description": "The array of arrays with group size.",
+ "instillFormat": "array:array:*",
+ "instillUIOrder": 0,
+ "required": [],
+ "title": "Arrays",
+ "type": "array"
+ }
+ },
+ "required": [
+ "arrays"
+ ],
+ "title": "Output",
+ "type": "object"
+ }
}
}
diff --git a/pkg/component/generic/collection/v0/main.go b/pkg/component/generic/collection/v0/main.go
index 63b6a0c8d..33f989c46 100644
--- a/pkg/component/generic/collection/v0/main.go
+++ b/pkg/component/generic/collection/v0/main.go
@@ -3,14 +3,11 @@ package collection
import (
"context"
- "encoding/json"
"fmt"
"sync"
_ "embed"
- "github.com/samber/lo"
- "google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/structpb"
"github.com/instill-ai/pipeline-backend/pkg/component/base"
@@ -24,6 +21,7 @@ const (
taskDifference = "TASK_DIFFERENCE"
taskAppend = "TASK_APPEND"
taskConcat = "TASK_CONCAT"
+ taskSplit = "TASK_SPLIT"
)
var (
@@ -43,7 +41,7 @@ type component struct {
type execution struct {
base.ComponentExecution
- execute func(*structpb.Struct) (*structpb.Struct, error)
+ execute func(*structpb.Struct, *base.Job, context.Context) (*structpb.Struct, error)
}
// Init returns an implementation of IOperator that processes JSON objects.
@@ -76,6 +74,8 @@ func (c *component) CreateExecution(x base.ComponentExecution) (base.IExecution,
e.execute = e.append
case taskConcat:
e.execute = e.concat
+ case taskSplit:
+ e.execute = e.split
default:
return nil, errmsg.AddMessage(
fmt.Errorf("not supported task: %s", x.Task),
@@ -85,167 +85,7 @@ func (c *component) CreateExecution(x base.ComponentExecution) (base.IExecution,
return e, nil
}
-func (e *execution) concat(in *structpb.Struct) (*structpb.Struct, error) {
- arrays := in.Fields["arrays"].GetListValue().Values
- concat := &structpb.ListValue{Values: []*structpb.Value{}}
-
- for _, a := range arrays {
- concat.Values = append(concat.Values, a.GetListValue().Values...)
- }
-
- out := &structpb.Struct{Fields: make(map[string]*structpb.Value)}
- out.Fields["array"] = structpb.NewListValue(concat)
- return out, nil
-}
-
-func (e *execution) union(in *structpb.Struct) (*structpb.Struct, error) {
- sets := in.Fields["sets"].GetListValue().Values
- cache := [][]string{}
-
- for _, s := range sets {
- c := []string{}
- for _, v := range s.GetListValue().Values {
- b, err := protojson.Marshal(v)
- if err != nil {
- return nil, err
- }
- c = append(c, string(b))
- }
- cache = append(cache, c)
- }
-
- set := &structpb.ListValue{Values: []*structpb.Value{}}
- un := lo.Union(cache...)
- for _, u := range un {
- var a any
- err := json.Unmarshal([]byte(u), &a)
- if err != nil {
- return nil, err
- }
- v, err := structpb.NewValue(a)
- if err != nil {
- return nil, err
- }
- set.Values = append(set.Values, v)
- }
-
- out := &structpb.Struct{Fields: make(map[string]*structpb.Value)}
- out.Fields["set"] = structpb.NewListValue(set)
- return out, nil
-}
-
-func (e *execution) intersection(in *structpb.Struct) (*structpb.Struct, error) {
- sets := in.Fields["sets"].GetListValue().Values
-
- if len(sets) == 1 {
- out := &structpb.Struct{Fields: make(map[string]*structpb.Value)}
- out.Fields["set"] = structpb.NewListValue(sets[0].GetListValue())
- return out, nil
- }
-
- curr := make([]string, len(sets[0].GetListValue().Values))
- for idx, v := range sets[0].GetListValue().Values {
- b, err := protojson.Marshal(v)
- if err != nil {
- return nil, err
- }
- curr[idx] = string(b)
- }
-
- for _, s := range sets[1:] {
- next := make([]string, len(s.GetListValue().Values))
- for idx, v := range s.GetListValue().Values {
- b, err := protojson.Marshal(v)
- if err != nil {
- return nil, err
- }
- next[idx] = string(b)
- }
-
- i := lo.Intersect(curr, next)
- curr = i
-
- }
-
- set := &structpb.ListValue{Values: make([]*structpb.Value, len(curr))}
-
- for idx, c := range curr {
- var a any
- err := json.Unmarshal([]byte(c), &a)
- if err != nil {
- return nil, err
- }
- v, err := structpb.NewValue(a)
- if err != nil {
- return nil, err
- }
- set.Values[idx] = v
- }
-
- out := &structpb.Struct{Fields: make(map[string]*structpb.Value)}
- out.Fields["set"] = structpb.NewListValue(set)
- return out, nil
-}
-
-func (e *execution) difference(in *structpb.Struct) (*structpb.Struct, error) {
- setA := in.Fields["set-a"]
- setB := in.Fields["set-b"]
-
- valuesA := make([]string, len(setA.GetListValue().Values))
- for idx, v := range setA.GetListValue().Values {
- b, err := protojson.Marshal(v)
- if err != nil {
- return nil, err
- }
- valuesA[idx] = string(b)
- }
-
- valuesB := make([]string, len(setB.GetListValue().Values))
- for idx, v := range setB.GetListValue().Values {
- b, err := protojson.Marshal(v)
- if err != nil {
- return nil, err
- }
- valuesB[idx] = string(b)
- }
- dif, _ := lo.Difference(valuesA, valuesB)
-
- set := &structpb.ListValue{Values: make([]*structpb.Value, len(dif))}
-
- for idx, c := range dif {
- var a any
-
- err := json.Unmarshal([]byte(c), &a)
- if err != nil {
- return nil, err
- }
- v, err := structpb.NewValue(a)
- if err != nil {
- return nil, err
- }
- set.Values[idx] = v
- }
-
- out := &structpb.Struct{Fields: make(map[string]*structpb.Value)}
- out.Fields["set"] = structpb.NewListValue(set)
- return out, nil
-}
-
-func (e *execution) assign(in *structpb.Struct) (*structpb.Struct, error) {
- out := in
- return out, nil
-}
-
-func (e *execution) append(in *structpb.Struct) (*structpb.Struct, error) {
- arr := in.Fields["array"]
- element := in.Fields["element"]
- arr.GetListValue().Values = append(arr.GetListValue().Values, element)
-
- out := &structpb.Struct{Fields: make(map[string]*structpb.Value)}
- out.Fields["array"] = arr
- return out, nil
-}
-
+// Execute processes the input JSON object and returns the result.
func (e *execution) Execute(ctx context.Context, jobs []*base.Job) error {
- return base.SequentialExecutor(ctx, jobs, e.execute)
+ return base.ConcurrentExecutor(ctx, jobs, e.execute)
}
diff --git a/pkg/component/generic/collection/v0/main_test.go b/pkg/component/generic/collection/v0/main_test.go
index 79aed5fd4..8676e1db0 100644
--- a/pkg/component/generic/collection/v0/main_test.go
+++ b/pkg/component/generic/collection/v0/main_test.go
@@ -3,60 +3,114 @@ package collection
import (
"testing"
+ "google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
qt "github.com/frankban/quicktest"
)
+const (
+ arrays1 = `
+{
+ "arrays": [
+ ["a", "b"],
+ ["c", "d"]
+ ]
+}`
+
+ arrays2 = `
+{
+ "arrays": [
+ ["a", "b"],
+ ["c", "d"],
+ ["e"]
+ ]
+}`
+
+ array1 = `
+{
+ "array": ["a", "b", "c", "d"]
+}`
+
+ array2 = `
+{
+ "array": ["a", "b", "c", "d", "e"]
+}`
+)
+
func Test_Concat(t *testing.T) {
c := qt.New(t)
c.Run("Concat", func(c *qt.C) {
e := &execution{}
- inputArrays := []*structpb.Value{
- structpb.NewListValue(&structpb.ListValue{
- Values: []*structpb.Value{
- structpb.NewStringValue("a"),
- structpb.NewStringValue("b"),
- },
- }),
- structpb.NewListValue(&structpb.ListValue{
- Values: []*structpb.Value{
- structpb.NewStringValue("c"),
- structpb.NewStringValue("d"),
- },
- }),
- }
-
- inputStruct := &structpb.Struct{
- Fields: map[string]*structpb.Value{
- "arrays": structpb.NewListValue(&structpb.ListValue{
- Values: inputArrays,
- }),
- },
- }
-
- expectedOutput := &structpb.Struct{
- Fields: map[string]*structpb.Value{
- "array": structpb.NewListValue(&structpb.ListValue{
- Values: []*structpb.Value{
- structpb.NewStringValue("a"),
- structpb.NewStringValue("b"),
- structpb.NewStringValue("c"),
- structpb.NewStringValue("d"),
- },
- }),
- },
- }
-
- out, err := e.concat(inputStruct)
+ inputStruct := &structpb.Struct{}
+ err := protojson.Unmarshal([]byte(arrays1), inputStruct)
+ c.Assert(err, qt.IsNil)
+
+ expectedOutput := &structpb.Struct{}
+ err = protojson.Unmarshal([]byte(array1), expectedOutput)
+ c.Assert(err, qt.IsNil)
+
+ out, err := e.concat(inputStruct, nil, nil)
c.Assert(err, qt.IsNil)
c.Assert(proto.Equal(out, expectedOutput), qt.IsTrue)
})
+}
+
+func Test_Split(t *testing.T) {
+ c := qt.New(t)
+
+ testcases := []struct {
+ name string
+ groupSize int
+ arrayInput string
+ output string
+ }{
+ {
+ name: "Split without remaining",
+ groupSize: 2,
+ arrayInput: array1,
+ output: arrays1,
+ },
+ {
+ name: "Split with remaining",
+ groupSize: 2,
+ arrayInput: array2,
+ output: arrays2,
+ },
+ }
+
+ for _, tc := range testcases {
+
+ c.Run(tc.name, func(c *qt.C) {
+ e := &execution{}
+
+ inputStruct := &structpb.Struct{}
+ err := protojson.Unmarshal([]byte(tc.arrayInput), inputStruct)
+
+ c.Assert(err, qt.IsNil)
+
+ // set group-size to 2
+ inputStruct.Fields["group-size"] = &structpb.Value{
+ Kind: &structpb.Value_NumberValue{NumberValue: float64(tc.groupSize)},
+ }
+
+ expectedOutput := &structpb.Struct{}
+ err = protojson.Unmarshal([]byte(tc.output), expectedOutput)
+ c.Assert(err, qt.IsNil)
+
+ out, err := e.split(inputStruct, nil, nil)
+
+ c.Assert(err, qt.IsNil)
+
+ c.Assert(proto.Equal(out, expectedOutput), qt.IsTrue)
+ })
+
+ }
}
diff --git a/pkg/component/generic/collection/v0/task_append.go b/pkg/component/generic/collection/v0/task_append.go
new file mode 100644
index 000000000..5ff04ea28
--- /dev/null
+++ b/pkg/component/generic/collection/v0/task_append.go
@@ -0,0 +1,19 @@
+package collection
+
+import (
+ "context"
+
+ "google.golang.org/protobuf/types/known/structpb"
+
+ "github.com/instill-ai/pipeline-backend/pkg/component/base"
+)
+
+func (e *execution) append(in *structpb.Struct, _ *base.Job, _ context.Context) (*structpb.Struct, error) {
+ arr := in.Fields["array"]
+ element := in.Fields["element"]
+ arr.GetListValue().Values = append(arr.GetListValue().Values, element)
+
+ out := &structpb.Struct{Fields: make(map[string]*structpb.Value)}
+ out.Fields["array"] = arr
+ return out, nil
+}
diff --git a/pkg/component/generic/collection/v0/task_assign.go b/pkg/component/generic/collection/v0/task_assign.go
new file mode 100644
index 000000000..23c7546e8
--- /dev/null
+++ b/pkg/component/generic/collection/v0/task_assign.go
@@ -0,0 +1,14 @@
+package collection
+
+import (
+ "context"
+
+ "google.golang.org/protobuf/types/known/structpb"
+
+ "github.com/instill-ai/pipeline-backend/pkg/component/base"
+)
+
+func (e *execution) assign(in *structpb.Struct, _ *base.Job, _ context.Context) (*structpb.Struct, error) {
+ out := in
+ return out, nil
+}
diff --git a/pkg/component/generic/collection/v0/task_concat.go b/pkg/component/generic/collection/v0/task_concat.go
new file mode 100644
index 000000000..62b46a51c
--- /dev/null
+++ b/pkg/component/generic/collection/v0/task_concat.go
@@ -0,0 +1,22 @@
+package collection
+
+import (
+ "context"
+
+ "google.golang.org/protobuf/types/known/structpb"
+
+ "github.com/instill-ai/pipeline-backend/pkg/component/base"
+)
+
+func (e *execution) concat(in *structpb.Struct, _ *base.Job, _ context.Context) (*structpb.Struct, error) {
+ arrays := in.Fields["arrays"].GetListValue().Values
+ concat := &structpb.ListValue{Values: []*structpb.Value{}}
+
+ for _, a := range arrays {
+ concat.Values = append(concat.Values, a.GetListValue().Values...)
+ }
+
+ out := &structpb.Struct{Fields: make(map[string]*structpb.Value)}
+ out.Fields["array"] = structpb.NewListValue(concat)
+ return out, nil
+}
diff --git a/pkg/component/generic/collection/v0/task_difference.go b/pkg/component/generic/collection/v0/task_difference.go
new file mode 100644
index 000000000..1cf869dd2
--- /dev/null
+++ b/pkg/component/generic/collection/v0/task_difference.go
@@ -0,0 +1,56 @@
+package collection
+
+import (
+ "context"
+ "encoding/json"
+
+ "github.com/samber/lo"
+ "google.golang.org/protobuf/encoding/protojson"
+ "google.golang.org/protobuf/types/known/structpb"
+
+ "github.com/instill-ai/pipeline-backend/pkg/component/base"
+)
+
+func (e *execution) difference(in *structpb.Struct, _ *base.Job, _ context.Context) (*structpb.Struct, error) {
+ setA := in.Fields["set-a"]
+ setB := in.Fields["set-b"]
+
+ valuesA := make([]string, len(setA.GetListValue().Values))
+ for idx, v := range setA.GetListValue().Values {
+ b, err := protojson.Marshal(v)
+ if err != nil {
+ return nil, err
+ }
+ valuesA[idx] = string(b)
+ }
+
+ valuesB := make([]string, len(setB.GetListValue().Values))
+ for idx, v := range setB.GetListValue().Values {
+ b, err := protojson.Marshal(v)
+ if err != nil {
+ return nil, err
+ }
+ valuesB[idx] = string(b)
+ }
+ dif, _ := lo.Difference(valuesA, valuesB)
+
+ set := &structpb.ListValue{Values: make([]*structpb.Value, len(dif))}
+
+ for idx, c := range dif {
+ var a any
+
+ err := json.Unmarshal([]byte(c), &a)
+ if err != nil {
+ return nil, err
+ }
+ v, err := structpb.NewValue(a)
+ if err != nil {
+ return nil, err
+ }
+ set.Values[idx] = v
+ }
+
+ out := &structpb.Struct{Fields: make(map[string]*structpb.Value)}
+ out.Fields["set"] = structpb.NewListValue(set)
+ return out, nil
+}
diff --git a/pkg/component/generic/collection/v0/task_intersection.go b/pkg/component/generic/collection/v0/task_intersection.go
new file mode 100644
index 000000000..4bddfc36a
--- /dev/null
+++ b/pkg/component/generic/collection/v0/task_intersection.go
@@ -0,0 +1,65 @@
+package collection
+
+import (
+ "context"
+ "encoding/json"
+
+ "github.com/samber/lo"
+ "google.golang.org/protobuf/encoding/protojson"
+ "google.golang.org/protobuf/types/known/structpb"
+
+ "github.com/instill-ai/pipeline-backend/pkg/component/base"
+)
+
+func (e *execution) intersection(in *structpb.Struct, _ *base.Job, _ context.Context) (*structpb.Struct, error) {
+ sets := in.Fields["sets"].GetListValue().Values
+
+ if len(sets) == 1 {
+ out := &structpb.Struct{Fields: make(map[string]*structpb.Value)}
+ out.Fields["set"] = structpb.NewListValue(sets[0].GetListValue())
+ return out, nil
+ }
+
+ curr := make([]string, len(sets[0].GetListValue().Values))
+ for idx, v := range sets[0].GetListValue().Values {
+ b, err := protojson.Marshal(v)
+ if err != nil {
+ return nil, err
+ }
+ curr[idx] = string(b)
+ }
+
+ for _, s := range sets[1:] {
+ next := make([]string, len(s.GetListValue().Values))
+ for idx, v := range s.GetListValue().Values {
+ b, err := protojson.Marshal(v)
+ if err != nil {
+ return nil, err
+ }
+ next[idx] = string(b)
+ }
+
+ i := lo.Intersect(curr, next)
+ curr = i
+
+ }
+
+ set := &structpb.ListValue{Values: make([]*structpb.Value, len(curr))}
+
+ for idx, c := range curr {
+ var a any
+ err := json.Unmarshal([]byte(c), &a)
+ if err != nil {
+ return nil, err
+ }
+ v, err := structpb.NewValue(a)
+ if err != nil {
+ return nil, err
+ }
+ set.Values[idx] = v
+ }
+
+ out := &structpb.Struct{Fields: make(map[string]*structpb.Value)}
+ out.Fields["set"] = structpb.NewListValue(set)
+ return out, nil
+}
diff --git a/pkg/component/generic/collection/v0/task_split.go b/pkg/component/generic/collection/v0/task_split.go
new file mode 100644
index 000000000..84d4bbc19
--- /dev/null
+++ b/pkg/component/generic/collection/v0/task_split.go
@@ -0,0 +1,32 @@
+package collection
+
+import (
+ "context"
+
+ "google.golang.org/protobuf/types/known/structpb"
+
+ "github.com/instill-ai/pipeline-backend/pkg/component/base"
+)
+
+func (e *execution) split(in *structpb.Struct, _ *base.Job, _ context.Context) (*structpb.Struct, error) {
+ arr := in.Fields["array"].GetListValue().Values
+ size := int(in.Fields["group-size"].GetNumberValue())
+ groups := make([][]*structpb.Value, 0)
+
+ for i := 0; i < len(arr); i += size {
+ end := i + size
+ if end > len(arr) {
+ end = len(arr)
+ }
+ groups = append(groups, arr[i:end])
+ }
+
+ out := &structpb.Struct{Fields: make(map[string]*structpb.Value)}
+ out.Fields["arrays"] = structpb.NewListValue(&structpb.ListValue{Values: make([]*structpb.Value, len(groups))})
+
+ for idx, g := range groups {
+ out.Fields["arrays"].GetListValue().Values[idx] = structpb.NewListValue(&structpb.ListValue{Values: g})
+ }
+
+ return out, nil
+}
diff --git a/pkg/component/generic/collection/v0/task_union.go b/pkg/component/generic/collection/v0/task_union.go
new file mode 100644
index 000000000..839d72135
--- /dev/null
+++ b/pkg/component/generic/collection/v0/task_union.go
@@ -0,0 +1,48 @@
+package collection
+
+import (
+ "context"
+ "encoding/json"
+
+ "github.com/samber/lo"
+ "google.golang.org/protobuf/encoding/protojson"
+ "google.golang.org/protobuf/types/known/structpb"
+
+ "github.com/instill-ai/pipeline-backend/pkg/component/base"
+)
+
+func (e *execution) union(in *structpb.Struct, _ *base.Job, _ context.Context) (*structpb.Struct, error) {
+ sets := in.Fields["sets"].GetListValue().Values
+ cache := [][]string{}
+
+ for _, s := range sets {
+ c := []string{}
+ for _, v := range s.GetListValue().Values {
+ b, err := protojson.Marshal(v)
+ if err != nil {
+ return nil, err
+ }
+ c = append(c, string(b))
+ }
+ cache = append(cache, c)
+ }
+
+ set := &structpb.ListValue{Values: []*structpb.Value{}}
+ un := lo.Union(cache...)
+ for _, u := range un {
+ var a any
+ err := json.Unmarshal([]byte(u), &a)
+ if err != nil {
+ return nil, err
+ }
+ v, err := structpb.NewValue(a)
+ if err != nil {
+ return nil, err
+ }
+ set.Values = append(set.Values, v)
+ }
+
+ out := &structpb.Struct{Fields: make(map[string]*structpb.Value)}
+ out.Fields["set"] = structpb.NewListValue(set)
+ return out, nil
+}