
Partial Map-Reduce #491

Merged
merged 17 commits into master from gerold103/gh-442-map_part_callrw on Oct 10, 2024
Conversation

Gerold103 (Collaborator) commented:
Finalization of #442 from the community. The feature introduces map-reduce on a subset of the nodes, depending on the specified bucket IDs. The buckets are automatically located, and the map-reduce is executed only on the subset of replicasets that own them.

Compared to the full map_callrw(), this partial version can be faster when it covers not all the replicasets.
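As an illustration of the idea, here is a plain-Lua sketch with hypothetical names, not vshard's actual internals: the router effectively groups the requested bucket IDs by the replicaset owning them and visits only that subset.

```lua
-- Hypothetical sketch: group requested bucket IDs by owning
-- replicaset, so the map stage visits only those replicasets.
local bucket_to_rs = {[1] = 'rs_A', [2] = 'rs_A', [3] = 'rs_B'}

local function group_by_replicaset(bucket_ids)
    local grouped = {}
    for _, id in ipairs(bucket_ids) do
        local rs = bucket_to_rs[id]
        grouped[rs] = grouped[rs] or {}
        table.insert(grouped[rs], id)
    end
    return grouped
end

local grouped = group_by_replicaset({1, 3})
assert(grouped.rs_A[1] == 1 and grouped.rs_B[1] == 3)
assert(grouped.rs_C == nil)  -- rs_C is skipped entirely
```

A replicaset owning none of the requested buckets never appears in the result, which is exactly why the partial mode can be faster than the full one.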

Upgrade-tests use git to clone the current repository and
check the necessary versions out.

The cloned repository was always saved in vshard/test/var, even
when the actual --var argument for test-run.py was /tmp/var.

Better make the copied code also be stored at the path given
in --var.

NO_DOC=internal
@Gerold103 Gerold103 self-assigned this Sep 23, 2024
@Gerold103 Gerold103 force-pushed the gerold103/gh-442-map_part_callrw branch from 73fd42f to 8b2a301 Compare September 24, 2024 20:08
@Gerold103 Gerold103 marked this pull request as ready for review September 24, 2024 20:46
@Serpentian (Contributor) left a comment:
Looks great! Just a couple of philosophical questions and comments)

g.test_map_part_single_rs = function(cg)
    local bids = vtest.storage_get_n_buckets(cg.replica_1_a, 4)
    local res = router_do_map(cg.router, {bids[3], bids[2]},
                              {123}, {timeout = iwait_timeout})
Serpentian (Contributor) commented:
I'm curious how this test even starts on CI. On my machine it's completely broken: iwait_timeout is not defined on the test instance:

Test fails
[001] router-luatest/map_part_test.lua                                [ fail ]
[001] Test failed! Output from reject file /tmp/t/rejects/router-luatest/map_part.reject:
[001] Tarantool version is 3.3.0-entrypoint-99-gf1144c533
[001] TAP version 13
[001] 1..6
[001] # Started on Tue Oct  1 19:37:20 2024
[001] # Starting group: router
[001] not ok 1	router.test_map_part_single_rs
[001] #   ...ramming/tnt/vshard/test/router-luatest/map_part_test.lua:89: variable 'iwait_timeout' is not declared
[001] #   stack traceback:
[001] #   	builtin/strict.lua:32: in function '__index'
[001] #   	...ramming/tnt/vshard/test/router-luatest/map_part_test.lua:89: in function 'router.test_map_part_single_rs'
[001] #   	...
[001] #   	[C]: in function 'xpcall'
[001] not ok 2	router.test_map_part_multi_rs
[001] #   ...ramming/tnt/vshard/test/router-luatest/map_part_test.lua:101: variable 'iwait_timeout' is not declared
[001] #   stack traceback:
[001] #   	builtin/strict.lua:32: in function '__index'
[001] #   	...ramming/tnt/vshard/test/router-luatest/map_part_test.lua:101: in function 'router.test_map_part_multi_rs'
[001] #   	...
[001] #   	[C]: in function 'xpcall'
[001] not ok 3	router.test_map_part_ref
[001] #   ...ramming/tnt/vshard/test/router-luatest/map_part_test.lua:122: variable 'iwait_timeout' is not declared
[001] #   stack traceback:
[001] #   	builtin/strict.lua:32: in function '__index'
[001] #   	...ramming/tnt/vshard/test/router-luatest/map_part_test.lua:122: in function 'router.test_map_part_ref'
[001] #   	...
[001] #   	[C]: in function 'xpcall'
[001] not ok 4	router.test_map_part_double_ref
[001] #   ...entian/Programming/tnt/vshard/test/instances/storage.lua:101: Still have SENT buckets
[001] #   stack traceback:
[001] #   	...ramming/tnt/vshard/test/router-luatest/map_part_test.lua:177: in function 'router.test_map_part_double_ref'
[001] #   	...
[001] #   	[C]: in function 'xpcall'
[001] not ok 5	router.test_map_part_map
[001] #   ...ramming/tnt/vshard/test/router-luatest/map_part_test.lua:216: variable 'iwait_timeout' is not declared
[001] #   stack traceback:
[001] #   	builtin/strict.lua:32: in function '__index'
[001] #   	...ramming/tnt/vshard/test/router-luatest/map_part_test.lua:216: in function 'router.test_map_part_map'
[001] #   	...
[001] #   	[C]: in function 'xpcall'
[001] not ok 6	router.test_map_part_callrw_raw
[001] #   ...ramming/tnt/vshard/test/router-luatest/map_part_test.lua:249: variable 'iwait_timeout' is not declared
[001] #   stack traceback:
[001] #   	builtin/strict.lua:32: in function '__index'
[001] #   	...ramming/tnt/vshard/test/router-luatest/map_part_test.lua:249: in function 'router.test_map_part_callrw_raw'
[001] #   	...
[001] #   	[C]: in function 'xpcall'
[001] # Ran 6 tests in 51.957 seconds, 0 succeeded, 6 errored

Please, apply the following patch:

Diff
diff --git a/test/router-luatest/map_part_test.lua b/test/router-luatest/map_part_test.lua
index f393aea..322799d 100644
--- a/test/router-luatest/map_part_test.lua
+++ b/test/router-luatest/map_part_test.lua
@@ -1,6 +1,7 @@
 local t = require('luatest')
 local vtest = require('test.luatest_helpers.vtest')
 local vutil = require('vshard.util')
+local wait_timeout = vtest.wait_timeout
 
 local g = t.group('router')
 local cfg_template = {
@@ -86,7 +87,7 @@ end
 g.test_map_part_single_rs = function(cg)
     local bids = vtest.storage_get_n_buckets(cg.replica_1_a, 4)
     local res = router_do_map(cg.router, {bids[3], bids[2]},
-                              {123}, {timeout = iwait_timeout})
+                              {123}, {timeout = wait_timeout})
     t.assert_equals(res.err, nil)
     t.assert_equals(res.err_uuid, nil)
     t.assert_equals(res.val, {
@@ -98,7 +99,7 @@ g.test_map_part_multi_rs = function(cg)
     local bid1 = vtest.storage_first_bucket(cg.replica_1_a)
     local bid2 = vtest.storage_first_bucket(cg.replica_2_a)
     local res = router_do_map(cg.router, {bid1, bid2},
-                              {123}, {timeout = iwait_timeout})
+                              {123}, {timeout = wait_timeout})
     t.assert_equals(res.err, nil)
     t.assert_equals(res.err_uuid, nil)
     t.assert_equals(res.val, {
@@ -119,7 +120,7 @@ g.test_map_part_ref = function(cg)
     end, {bids1[1], bids1[2], cg.rs2_uuid})
     -- The buckets are ACTIVE on rs2, so the partial map should succeed.
     local res = router_do_map(cg.router, {bids1[1], bids1[2]},
-                              {42}, {timeout = iwait_timeout})
+                              {42}, {timeout = wait_timeout})
     t.assert_equals(res.err, nil)
     t.assert_equals(res.err_uuid, nil)
     t.assert_equals(res.val, {
@@ -148,7 +149,7 @@ g.test_map_part_ref = function(cg)
     end, {bids1[1], bids1[2], cg.rs1_uuid})
 
     res = router_do_map(cg.router, {bids1[1], bids1[2]},
-                        {42}, {timeout = iwait_timeout})
+                        {42}, {timeout = wait_timeout})
     t.assert_equals(res.err, nil)
     t.assert_equals(res.err_uuid, nil)
     t.assert_equals(res.val, {
@@ -183,7 +184,7 @@ g.test_map_part_double_ref = function(cg)
     -- 1. ref rs2 and returns the moved bucket;
     -- 2. discover the moved bucket on rs2 and avoid double reference;
     local res = router_do_map(cg.router, {bid1, bid2},
-                              {42}, {timeout = iwait_timeout})
+                              {42}, {timeout = wait_timeout})
     t.assert_equals(res.err, nil)
     t.assert_equals(res.err_uuid, nil)
     t.assert_equals(res.val, {
@@ -213,7 +214,7 @@ g.test_map_part_map = function(cg)
         end
     end)
     local res = router_do_map(cg.router, {bid1, bid2},
-                              {3}, {timeout = iwait_timeout})
+                              {3}, {timeout = wait_timeout})
     t.assert_equals(res.val, nil)
     t.assert_covers(res.err, {
         code = box.error.PROC_LUA,
@@ -229,7 +230,7 @@ g.test_map_part_map = function(cg)
         _G.do_map = _G.old_do_map
         _G.old_do_map = nil
     end)
-    res = router_do_map(cg.router, {bid1, bid2}, {3}, {timeout = iwait_timeout})
+    res = router_do_map(cg.router, {bid1, bid2}, {3}, {timeout = wait_timeout})
     t.assert_equals(res.err, nil, res.err)
     t.assert_equals(res.err_uuid, nil)
     t.assert_equals(res.val, {
@@ -246,7 +247,7 @@ g.test_map_part_callrw_raw = function(cg)
     local bid1 = vtest.storage_first_bucket(cg.replica_1_a)
     local bid2 = vtest.storage_first_bucket(cg.replica_2_a)
     local res = router_do_map(cg.router, {bid1, bid2}, {3},
-                              {timeout = iwait_timeout, return_raw = true})
+                              {timeout = wait_timeout, return_raw = true})
     t.assert_equals(res.val, {
         [cg.rs1_uuid] = {{cg.rs1_uuid, 3}},
         [cg.rs2_uuid] = {{cg.rs2_uuid, 3}},
@@ -263,7 +264,7 @@ g.test_map_part_callrw_raw = function(cg)
         end
     end)
     res = router_do_map(cg.router, {bid1, bid2}, {},
-                        {timeout = iwait_timeout, return_raw = true})
+                        {timeout = wait_timeout, return_raw = true})
     t.assert_equals(res.val, {
         [cg.rs1_uuid] = {{cg.rs1_uuid}},
     })
@@ -276,7 +277,7 @@ g.test_map_part_callrw_raw = function(cg)
         end
     end)
     res = router_do_map(cg.router, {bid1, bid2}, {},
-                        {timeout = iwait_timeout, return_raw = true})
+                        {timeout = wait_timeout, return_raw = true})
     t.assert_equals(res.val, nil)
     t.assert_covers(res.err, {
         code = box.error.PROC_LUA,

Gerold103 (Collaborator, Author) replied:
Ouch. I think it works in some mode in CI where unknown globals are nil. So those timeouts were all ignored and the default was used. Fixed.

Serpentian (Contributor) commented:
And one last time)

diff --git a/test/router-luatest/map_callrw_test.lua b/test/router-luatest/map_callrw_test.lua
index 6e308f2..3c64c0c 100644
--- a/test/router-luatest/map_callrw_test.lua
+++ b/test/router-luatest/map_callrw_test.lua
@@ -355,7 +355,7 @@ g.test_map_all_callrw_raw = function(cg)
     -- Successful map.
     --
     local res = router_do_map(cg.router, {3}, {
-        timeout = iwait_timeout,
+        timeout = vtest.wait_timeout,
         return_raw = true,
     })
     t.assert_equals(res.val, {

Gerold103 (Collaborator, Author) replied:
Hm, this is super wrong. Thanks for noticing. It can't keep happening, so I've created a ticket: #492.

Serpentian (Contributor) commented:
It still doesn't work. The test is broken on the debug version:

[001] not ok 6	router.test_map_part_ref_timeout
[001] #   ...mming/tnt/vshard/test/router-luatest/map_callrw_test.lua:319: variable 'test_ref' is not declared
[001] #   stack traceback:
[001] #   	...mming/tnt/vshard/test/router-luatest/map_callrw_test.lua:317: in function 'router.test_map_part_ref_timeout'
[001] #   	...
[001] #   	[C]: in function 'xpcall'

Please, let's do something like this:

diff --git a/test/luatest_helpers/vtest.lua b/test/luatest_helpers/vtest.lua
index 123a7c8..2418263 100644
--- a/test/luatest_helpers/vtest.lua
+++ b/test/luatest_helpers/vtest.lua
@@ -1,3 +1,5 @@
+require('strict').on()
+
 local t = require('luatest')
 local helpers = require('test.luatest_helpers')
 local cluster = require('test.luatest_helpers.cluster')

This way it will fail on the release version too.

Gerold103 (Collaborator, Author) replied:
Done! I've enabled it in a separate commit.

@Serpentian Serpentian removed their assignment Oct 1, 2024
@Gerold103 Gerold103 force-pushed the gerold103/gh-442-map_part_callrw branch 2 times, most recently from 623460d to 8b99814 Compare October 4, 2024 17:56
@Serpentian (Contributor) left a comment:
Solid work 💪! Let's figure out these comments and call it a day)

@Serpentian Serpentian removed their assignment Oct 9, 2024
@Gerold103 Gerold103 force-pushed the gerold103/gh-442-map_part_callrw branch 3 times, most recently from a90ccad to 78c6303 Compare October 9, 2024 22:24
@Serpentian (Contributor) left a comment:
Thank you for the patch! It seems I have no more objections.

For the testing we should definitely enable strict mode, though probably not in the scope of this patch. But let's at least fix the test so that it passes on debug tarantool (or on a strict release).

Gerold103 and others added 12 commits October 10, 2024 18:45
It makes unknown variables be treated as errors, not as 'nil's.
Otherwise it is too easy to use a wrong variable name somewhere,
get nil instead of an error, and the tests would still pass
without testing what they are supposed to.
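The mechanism can be sketched in a few lines of plain Lua (a greatly simplified stand-in for Tarantool's builtin strict module, not its actual implementation):

```lua
-- Simplified sketch of strict mode: reading a global that was
-- never assigned raises an error instead of silently returning nil.
setmetatable(_G, {
    __index = function(_, name)
        error("variable '" .. name .. "' is not declared", 2)
    end,
})

known = 1                          -- a normal assignment still works
assert(known == 1)
local ok = pcall(function() return iwait_timeout end)
assert(not ok)                     -- the typo is caught immediately
```

With this in place, a misspelled name like `iwait_timeout` fails loudly at first use rather than quietly evaluating to nil.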

NO_DOC=internal
The only map-reduce function is router.map_callrw(). At some point
there was a task to introduce a new mode of Map-Reduce: partial,
by bucket IDs, not on the whole cluster. For that task a new
function router.map_part_callrw() was introduced, which had the
same Map-Reduce and error-handling stages. Only the Ref stage was
different.

The new helpers in this commit were supposed to reuse some code
between those two map-call functions.

Later it was decided to leave just one map-call function and add a
new option to it. But these new helpers still look useful to have
as separate functions. They make the map-call function really
small and simple.

NO_DOC=internal
NO_TEST=refactoring
It ensures the ref is still in place. A read-only operation. It is
going to be used in future commits about partial map-reduce.
The router will potentially visit some storages more than once,
and each time it must ensure the ref is still in place.
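A minimal sketch of the idea, with illustrative names rather than vshard's actual internals:

```lua
-- Hypothetical ref registry on a storage: the router re-visits
-- the storage and verifies its earlier ref still exists before
-- trusting any previously confirmed buckets.
local refs = {}  -- rid -> true for refs currently held

local function ref_add(rid) refs[rid] = true end
local function ref_del(rid) refs[rid] = nil end

-- Read-only check: does not create or prolong anything.
local function ref_check(rid)
    if not refs[rid] then
        return nil, 'ref not found'
    end
    return true
end

ref_add(42)
assert(ref_check(42))
ref_del(42)
local ok, err = ref_check(42)
assert(not ok and err == 'ref not found')
```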

NO_DOC=internal
The storage-side of the Partial Map-Reduce feature.

NO_DOC=later
The previous commit was failing some tests. Let's patch them up.
That commit isn't amended so as to keep its original shape out of
respect for the external contributor.

NO_DOC=bugfix
Introduce a partial ref-map-reduce API for vshard. It guarantees
that in case of success the function is executed exactly once on
the storages that contain the given list of buckets.

NO_DOC=later
The previous commit was failing some tests. Let's patch them up.
That commit isn't amended so as to keep its original shape out of
respect for the external contributor.

NO_DOC=bugfix
This is useful for RW map-reduce requests which need to send
multiple network requests in parallel to multiple masters.

In parallel means using the is_async netbox feature. But it only
works if the connection is already established.

Which means that the connection establishment ideally must also
be parallel.
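The fan-out pattern can be sketched in plain Lua (a synchronous stand-in for netbox is_async futures; all names here are illustrative):

```lua
-- Start all requests first, then collect the results, instead of
-- calling each master and waiting for it one by one.
local function call_async(rs, fn)
    -- A future-like handle; real netbox futures would run the
    -- request in the background.
    return {wait_result = function() return fn(rs) end}
end

local replicasets = {'rs_A', 'rs_B', 'rs_C'}
local futures = {}
for _, rs in ipairs(replicasets) do
    futures[rs] = call_async(rs, function(name) return name .. ':ok' end)
end
local results = {}
for rs, fut in pairs(futures) do
    results[rs] = fut.wait_result()
end
assert(results.rs_B == 'rs_B:ok')
```

If establishing a connection is itself a blocking step, firing the async calls gains nothing; that is why the connections must also be established in parallel.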

NO_DOC=internal
NO_TEST=already covered
There were a number of minor issues with the previous several
commits: the tests running way too long, some cases not being
covered, and the code being non-critically suboptimal.

Let's fix them all. The original commits aren't amended so as to
keep their original shape out of respect for the external contributor.

NO_DOC=bugfix
There are 2 Ref-Map-Reduce functions right now: map_callrw() and
map_part_callrw(). Their only difference is that the former refs
the whole cluster, while the latter refs only a subset of
storages. The rest is the same.

The idea is that it is better to merge these functions into one
and make the bucket IDs an option.

The commit extracts the Ref stages of both functions into separate
helpers, which will allow keeping this future single function very
short and simple.

NO_DOC=internal
NO_TEST=refactoring
The behavior is regulated with the new bucket_ids option.

@TarantoolBot document
Title: vshard: `bucket_ids` option for `router.map_callrw()`

The option is an array of numeric bucket IDs. When specified, the
Ref-Map-Reduce is only performed on the masters having at least
one of these buckets. By default all the stages are done on all
masters in the cluster.

Example:
```Lua
-- Assume buckets 1, 2, 3 cover replicasets UUID_A and UUID_B.
res, err = vshard.router.map_callrw(func, args, {bucket_ids = {1, 2, 3}})
assert(res[UUID_A] == {func_result_from_A})
assert(res[UUID_B] == {func_result_from_B})
```
Let's merge all map_callrw() tests into a single file.

NO_DOC=internal
When there were only 2 replicasets, all cases would either cover a
single replicaset or "all" of them. Let's make it 3, so that some
tests actually cover a part of the cluster which is not just a
single replicaset.
NO_DOC=internal
The 'moved_buckets' function would treat as "moved" all the
buckets which are not strictly ACTIVE. But that isn't optimal.

Also, the 'moved_buckets' function would assume that the buckets
stay unchanged between the start and the end of ref creation. That
isn't true.

Thirdly, the moved buckets could contain the destination where
they moved to. Returning this to the router would make the
re-discovery faster.

Fourthly, PINNED buckets were not considered ACTIVE.

The commit fixes all these issues.

Firstly, when a bucket is SENDING, returning an error right away
isn't good. The router would just keep retrying then, without any
progress. The bucket is actually here; it is not moved yet.

Better let the storage try to take a ref. Then one of two results
is possible:
- It waits without useless active retries. Then the sending fails
    and the bucket becomes ACTIVE. The ref is taken, all good.
- It waits without useless active retries. SENDING turns into
    SENT. The ref is taken for the other buckets, and this one is
    reported as moved.

Similar logic applies to RECEIVING.

Secondly, after a ref is taken, the not-moved buckets could become
moved. They need to be re-checked before returning the ref.
Luckily, the storage can use bucket_generation to avoid this
double-check when nothing changed in _bucket.
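The double-check can be sketched like this (plain Lua; the names are illustrative, not vshard's actual internals):

```lua
-- bucket_generation increments whenever _bucket changes; the
-- recheck after taking the ref runs only if it moved.
local bucket_generation = 0
local buckets = {[1] = 'ACTIVE', [2] = 'SENT'}

local function moved_buckets(bucket_ids)
    local moved = {}
    for _, id in ipairs(bucket_ids) do
        local status = buckets[id]
        -- PINNED buckets count as present, per this commit.
        if status ~= 'ACTIVE' and status ~= 'PINNED' then
            table.insert(moved, id)
        end
    end
    return moved
end

local function ref_with_recheck(bucket_ids)
    local gen_before = bucket_generation
    local moved = moved_buckets(bucket_ids)
    if #moved > 0 then
        return nil, moved
    end
    -- ... take the ref here; this step might yield ...
    if bucket_generation ~= gen_before then
        -- _bucket changed while the ref was being taken - recheck.
        moved = moved_buckets(bucket_ids)
        if #moved > 0 then
            return nil, moved
        end
    end
    return true
end

assert(ref_with_recheck({1}))
local ok, moved = ref_with_recheck({1, 2})
assert(not ok and moved[1] == 2)
```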

NO_DOC=bugfix
During the partial Map-Reduce the router might visit some storages
more than once. It happens when, after a ref on storage A, another
storage B reports A as having taken some buckets.

Then the router would come back to A to confirm that. The storage
still must hold its previously created ref in order for such
checks to make any sense. Otherwise any of the previously
confirmed buckets could have escaped by now.

Without the ref-checking the router could reach the Map stage and
send some Map requests even though it could have detected earlier
that not all storages would succeed.

This wasn't strictly speaking a bug, but it was clearly suboptimal
behaviour, leading to the requests being executed not on all the
needed storages while the others would report errors.

NO_DOC=internal
It tests not only partial Map-Reduce. It covers a bit of the full
one as well.

NO_DOC=internal
@Gerold103 Gerold103 force-pushed the gerold103/gh-442-map_part_callrw branch from 78c6303 to 08aa380 Compare October 10, 2024 16:53
@Gerold103 Gerold103 merged commit 8283326 into master Oct 10, 2024
12 checks passed