diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c39331a..ecd30154 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,10 +7,17 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +### Added +- `storage_mode` option for creating a `utube` and `utubettl` tube (#228). + It enables the workaround for slow takes while working with busy tubes. + ### Fixed - Stuck in `INIT` state if an instance failed to enter the `running` mode in time (#226). This fix works only for Tarantool versions >= 2.10.0. +- Slow takes on busy `utube` and `utubettl` tubes (#228). The workaround could + be enabled by passing the `storage_mode = "ready_buffer"` option while + creating the tube. ## [1.3.3] - 2023-09-13 diff --git a/README.md b/README.md index b156944f..95b98e5c 100644 --- a/README.md +++ b/README.md @@ -165,6 +165,10 @@ The main idea of this queue backend is the same as in a `fifo` queue: the tasks are executed in FIFO order. However, tasks may be grouped into sub-queues. +It is advised not to use `utube` methods inside transactions with +`read-confirmed` isolation level. It can lead to errors when trying to make +parallel tube methods calls with mvcc enabled. + The following options can be specified when creating a `utube` queue: * `temporary` - boolean - if true, the contents of the queue do not persist on disk @@ -172,6 +176,38 @@ on disk already exists * `on_task_change` - function name - a callback to be executed on every operation + * `storage_mode` - string - one of + * `queue.driver.utube.STORAGE_MODE_DEFAULT` ("default") - default + implementation of `utube` + * `queue.driver.utube.STORAGE_MODE_READY_BUFFER` + ("ready_buffer") - allows processing `take` requests faster, but + by the cost of `put` operations speed. Right now this option is supported + only for `memtx` engine. + WARNING: this is an experimental storage mode. + + Here is a benchmark comparison of these two modes: + * Benchmark for simple `put` and `take` methods. 30k utubes are created + with a single task each. Task creation time is calculated. After that + 30k consumers are calling `take` + `ack`, each in the separate fiber. + Time to ack all tasks is calculated. The results are as follows: + + | | put (30k) | take+ack | + |---------|-----------|----------| + | default | 180ms | 1.6s | + | ready | 270ms | 1.7s | + * Benchmark for the busy utubes. 10 tubes are created. + Each contains 1000 tasks. After that, 10 consumers are created (each works + on his tube only, one tube — one consumer). Each consumer will + `take`, then `yield` and then `ack` every task from their utube + (1000 tasks each). + After that, we can also run this benchmark with 10k tasks on each utube, + 100k tasks and 150k tasks. But all that with 10 utubes and 10 consumers. + The results are as follows: + + | | 1k | 10k | 50k | 150k | + |---------|-------|------|------|-------| + | default | 53s | 1.5h | 100h | 1000h | + | ready | 450ms | 4.7s | 26s | 72s | The following options can be specified when putting a task in a `utube` queue: @@ -208,6 +244,10 @@ in strict FIFO order. This queue type is effectively a combination of `fifottl` and `utube`. +It is advised not to use `utubettl` methods inside transactions with +`read-confirmed` isolation level. It can lead to errors when trying to make +parallel tube methods calls with mvcc enabled. + The following options can be specified when creating a `utubettl` queue: * `temporary` - boolean - if true, the contents of the queue do not persist on disk @@ -215,9 +255,42 @@ on disk already exists * `on_task_change` - function name - a callback to be executed on every operation + * `storage_mode` - string - one of + * `queue.driver.utubettl.STORAGE_MODE_DEFAULT` ("default") - default + implementation of `utubettl` + * `queue.driver.utubettl.STORAGE_MODE_READY_BUFFER` + ("ready_buffer") - allows processing `take` requests faster, but + by the cost of `put` operations speed. Right now this option is supported + only for `memtx` engine. + WARNING: this is an experimental storage mode. + + Here is a benchmark comparison of these two modes: + * Benchmark for simple `put` and `take` methods. 30k utubes are created + with a single task each. Task creation time is calculated. After that + 30k consumers are calling `take` + `ack`, each in the separate fiber. + Time to ack all tasks is calculated. The results are as follows: + + | | put (30k) | take+ack | + |---------|-----------|----------| + | default | 200ms | 1.7s | + | ready | 320ms | 1.8s | + * Benchmark for the busy utubes. 10 tubes are created. + Each contains 1000 tasks. After that, 10 consumers are created (each works + on his tube only, one tube — one consumer). Each consumer will + `take`, then `yield` and then `ack` every task from their utube + (1000 tasks each). + After that, we can also run this benchmark with 10k tasks on each utube, + 100k tasks and 140k tasks. But all that with 10 utubes and 10 consumers. + The results are as follows: + + | | 1k | 10k | 50k | 140k | + |---------|-------|------|------|-------| + | default | 80s | 1.6h | 100h | 1000h | + | ready | 520ms | 5.4s | 28s | 83s | The following options can be specified when putting a task in a `utubettl` queue: + * `pri` - task priority (`0` is the highest priority and is the default) * `utube` - the name of the sub-queue * `ttl` - numeric - time to live for a task put into the queue, in seconds. if `ttl` is not specified, it is set to infinity diff --git a/queue/abstract.lua b/queue/abstract.lua index d8dcff88..3e441d5e 100644 --- a/queue/abstract.lua +++ b/queue/abstract.lua @@ -295,9 +295,12 @@ function tube.drop(self) error("There are taken tasks in the tube") end - local space_name = tube[3] - - box.space[space_name]:drop() + if self.raw.drop ~= nil then + self.raw:drop() + else + local space_name = tube[3] + box.space[space_name]:drop() + end box.space._queue:delete{tube_name} -- drop queue queue.tube[tube_name] = nil diff --git a/queue/abstract/driver/utube.lua b/queue/abstract/driver/utube.lua index d3b7e006..b93a8db7 100644 --- a/queue/abstract/driver/utube.lua +++ b/queue/abstract/driver/utube.lua @@ -5,6 +5,9 @@ local str_type = require('queue.compat').str_type local tube = {} local method = {} +tube.STORAGE_MODE_DEFAULT = "default" +tube.STORAGE_MODE_READY_BUFFER = "ready_buffer" + local i_status = 2 -- validate space of queue @@ -19,6 +22,18 @@ local function validate_space(space) end end +-- validate ready buffer space of queue +local function validate_space_ready_buffer(space) + -- check indexes + local indexes = {'task_id', 'utube'} + for _, index in pairs(indexes) do + if space.index[index] == nil then + error(string.format('space "%s" does not have "%s" index', + space.name, index)) + end + end +end + -- create space function tube.create_space(space_name, opts) local space_opts = {} @@ -57,13 +72,56 @@ function tube.create_space(space_name, opts) end -- start tube on space -function tube.new(space, on_task_change) +function tube.new(space, on_task_change, opts) validate_space(space) + local space_opts = {} + space_opts.temporary = opts.temporary or false + space_opts.engine = opts.engine or 'memtx' + space_opts.format = { + {name = 'task_id', type = num_type()}, + {name = 'utube', type = str_type()} + } + + local space_ready_buffer_name = space.name .. "_ready_buffer" + local space_ready_buffer = box.space[space_ready_buffer_name] + -- Feature implemented only for memtx engine for now. + -- https://github.com/tarantool/queue/issues/230. + if opts.storage_mode == tube.STORAGE_MODE_READY_BUFFER and opts.engine == 'vinyl' then + error(string.format('"%s" storage mode cannot be used with vinyl engine', + tube.STORAGE_MODE_READY_BUFFER)) + end + + local ready_space_mode = (opts.storage_mode == tube.STORAGE_MODE_READY_BUFFER) + if ready_space_mode then + if space_ready_buffer == nil then + -- Create a space for first ready tasks from each utube. + space_ready_buffer = box.schema.create_space(space_ready_buffer_name, space_opts) + space_ready_buffer:create_index('task_id', { + type = 'tree', + parts = {1, num_type()}, + unique = true, + }) + space_ready_buffer:create_index('utube', { + type = 'tree', + parts = {2, str_type()}, + unique = true, + }) + else + validate_space_ready_buffer(space_ready_buffer) + if space:len() == 0 then + space_ready_buffer:truncate() + end + end + end + on_task_change = on_task_change or (function() end) local self = setmetatable({ - space = space, - on_task_change = on_task_change, + space = space, + space_ready_buffer = space_ready_buffer, + on_task_change = on_task_change, + ready_space_mode = ready_space_mode, + opts = opts, }, { __index = method }) return self end @@ -73,43 +131,128 @@ function method.normalize_task(self, task) return task and task:transform(3, 1) end +-- Find the first ready task for given 'utube'. +-- Utube is also checked for the absence of 'TAKEN' tasks. +local function put_next_ready(self, utube) + local taken = self.space.index.utube:min{state.TAKEN, utube} + if taken == nil or taken[2] ~= state.TAKEN then + local next_task = self.space.index.utube:min{state.READY, utube} + if next_task == nil or next_task[2] ~= state.READY then + return + end + -- Ignoring ER_TUPLE_FOUND error, if a tuple with the same task_id + -- or utube name is already in the space. + -- Note that both task_id and utube indexes are unique, so there will be + -- no duplicates: each task_id can occur in the space not more than once, + -- there can be no more than one task from each utube in a space. + pcall(self.space_ready_buffer.insert, self.space_ready_buffer, {next_task[1], utube}) + end +end + +-- Put this task into ready_buffer. +-- Utube is also checked for the absence of 'TAKEN' tasks. +local function put_ready(self, id, utube) + local taken = self.space.index.utube:min{state.TAKEN, utube} + if taken == nil or taken[2] ~= state.TAKEN then + -- Ignoring ER_TUPLE_FOUND error, if a tuple with the same task_id + -- or utube name is already in the space. + -- Note that both task_id and utube indexes are unique, so there will be + -- no duplicates: each task_id can occur in the space not more than once, + -- there can be no more than one task from each utube in a space. + pcall(self.space_ready_buffer.insert, self.space_ready_buffer, {id, utube}) + end +end + +local function commit() + box.commit() +end + +local function empty() +end + +-- Start transaction with the correct options, if the transaction is not already running. +local function begin_if_not_in_txn() + local transaction_opts = {} + if box.cfg.memtx_use_mvcc_engine then + transaction_opts = {txn_isolation = 'read-committed'} + end + + if not box.is_in_txn() then + box.begin(transaction_opts) + return commit + else + return empty + end +end + -- put task in space function method.put(self, data, opts) - local max - - -- Taking the maximum of the index is an implicit transactions, so it is + -- Taking the minimum is an implicit transactions, so it is -- always done with 'read-confirmed' mvcc isolation level. - -- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled. - -- It is hapenning because 'max' for several puts in parallel will be the same since + -- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled. + -- It is hapenning because 'min' for several takes in parallel will be the same since -- read confirmed isolation level makes visible all transactions that finished the commit. -- To fix it we wrap it with box.begin/commit and set right isolation level. -- Current fix does not resolve that bug in situations when we already are in transaction -- since it will open nested transactions. -- See https://github.com/tarantool/queue/issues/207 -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + local commit_func = begin_if_not_in_txn() - if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then - box.begin({txn_isolation = 'read-committed'}) - max = self.space.index.task_id:max() - box.commit() - else - max = self.space.index.task_id:max() - end + local max = self.space.index.task_id:max() local id = max and max[1] + 1 or 0 local task = self.space:insert{id, state.READY, tostring(opts.utube), data} + if self.ready_space_mode then + put_ready(self, task[1], task[3]) + end + + commit_func() + self.on_task_change(task, 'put') return task end --- take task -function method.take(self) +-- Take the first task form the ready_buffer. +local function take_ready(self) + while true do + local commit_func = begin_if_not_in_txn() + + local task_ready = self.space_ready_buffer.index.task_id:min() + if task_ready == nil then + commit_func() + return nil + end + + local id = task_ready[1] + local task = self.space:get(id) + local take_complete = false + + if task[2] == state.READY then + local taken = self.space.index.utube:min{state.TAKEN, task[3]} + + if taken == nil or taken[2] ~= state.TAKEN then + task = self.space:update(id, { { '=', 2, state.TAKEN } }) + self.space_ready_buffer:delete(id) + take_complete = true + end + end + + commit_func() + + if take_complete then + self.on_task_change(task, 'take') + return task + end + end +end + +local function take(self) for s, task in self.space.index.status:pairs(state.READY, - { iterator = 'GE' }) do + { iterator = 'GE' }) do if task[2] ~= state.READY then break end - local taken -- Taking the minimum is an implicit transactions, so it is -- always done with 'read-confirmed' mvcc isolation level. -- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled. @@ -120,62 +263,128 @@ function method.take(self) -- since it will open nested transactions. -- See https://github.com/tarantool/queue/issues/207 -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ - if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then - box.begin({txn_isolation = 'read-committed'}) - taken = self.space.index.utube:min{state.TAKEN, task[3]} - box.commit() - else - taken = self.space.index.utube:min{state.TAKEN, task[3]} - end + local commit_func = begin_if_not_in_txn() + local taken = self.space.index.utube:min{state.TAKEN, task[3]} + local take_complete = false if taken == nil or taken[2] ~= state.TAKEN then task = self.space:update(task[1], { { '=', 2, state.TAKEN } }) + take_complete = true + end + + commit_func() + if take_complete then self.on_task_change(task, 'take') return task end end end +-- take task +function method.take(self) + if self.ready_space_mode then + return take_ready(self) + end + return take(self) +end + -- touch task function method.touch(self, id, ttr) error('utube queue does not support touch') end +-- Delete task from the ready_buffer and find next ready task from the same 'utube' to replace it. +local function delete_ready(self, id, utube) + self.space_ready_buffer:delete(id) + put_next_ready(self, utube) +end + -- delete task function method.delete(self, id) + local commit_func = begin_if_not_in_txn() + local task = self.space:get(id) self.space:delete(id) if task ~= nil then + if self.ready_space_mode then + if task[2] == state.TAKEN then + put_next_ready(self, task[3]) + elseif task[2] == state.READY then + delete_ready(self, id, task[3]) + end + end + task = task:transform(2, 1, state.DONE) local neighbour = self.space.index.utube:min{state.READY, task[3]} + + commit_func() + self.on_task_change(task, 'delete') if neighbour then self.on_task_change(neighbour) end + return task end + + commit_func() return task end -- release task function method.release(self, id, opts) + local commit_func = begin_if_not_in_txn() + local task = self.space:update(id, {{ '=', 2, state.READY }}) if task ~= nil then + if self.ready_space_mode then + local inserted, err = + pcall(self.space_ready_buffer.insert, self.space_ready_buffer, {id, task[3]}) + if not inserted then + require('log').warn( + 'queue: [tube "utube"] insert after release error: %s', err) + delete_ready(self, task[1], task[3]) + end + end + + commit_func() + self.on_task_change(task, 'release') + return task end + + commit_func() return task end -- bury task function method.bury(self, id) + local commit_func = begin_if_not_in_txn() + + local current_task = self.space:get{id} local task = self.space:update(id, {{ '=', 2, state.BURIED }}) if task ~= nil then + if self.ready_space_mode then + local status = current_task[2] + local ready_task = self.space_ready_buffer:get{task[1]} + if ready_task ~= nil then + delete_ready(self, id, task[3]) + elseif status == state.TAKEN then + put_next_ready(self, task[3]) + end + end + local neighbour = self.space.index.utube:min{state.READY, task[3]} + + commit_func() + self.on_task_change(task, 'bury') if neighbour and neighbour[i_status] == state.READY then self.on_task_change(neighbour) end else + commit_func() + self.on_task_change(task, 'bury') end return task @@ -184,6 +393,8 @@ end -- unbury several tasks function method.kick(self, count) for i = 1, count do + local commit_func = begin_if_not_in_txn() + local task = self.space.index.status:min{ state.BURIED } if task == nil then return i - 1 @@ -193,6 +404,20 @@ function method.kick(self, count) end task = self.space:update(task[1], {{ '=', 2, state.READY }}) + if self.ready_space_mode then + local prev_task = self.space_ready_buffer.index.utube:get{task[3]} + if prev_task ~= nil then + if prev_task[1] > task[1] then + self.space_ready_buffer:delete(prev_task[1]) + self.space_ready_buffer:insert({task[1], task[2]}) + end + else + put_ready(self, task[3]) + end + end + + commit_func() + self.on_task_change(task, 'kick') end return count @@ -210,6 +435,9 @@ end function method.truncate(self) self.space:truncate() + if self.ready_space_mode then + self.space_ready_buffer:truncate() + end end -- This driver has no background activity. @@ -222,4 +450,12 @@ function method.stop() return end +function method.drop(self) + self:stop() + box.space[self.space.name]:drop() + if self.ready_space_mode then + box.space[self.space_ready_buffer.name]:drop() + end +end + return tube diff --git a/queue/abstract/driver/utubettl.lua b/queue/abstract/driver/utubettl.lua index 51b0b612..0bcdaf93 100644 --- a/queue/abstract/driver/utubettl.lua +++ b/queue/abstract/driver/utubettl.lua @@ -11,6 +11,9 @@ local str_type = qc.str_type local tube = {} local method = {} +tube.STORAGE_MODE_DEFAULT = "default" +tube.STORAGE_MODE_READY_BUFFER = "ready_buffer" + local i_id = 1 local i_status = 2 local i_next_event = 3 @@ -29,7 +32,7 @@ end -- validate space of queue local function validate_space(space) -- check indexes - local indexes = {'task_id', 'status', 'utube', 'watch'} + local indexes = {'task_id', 'status', 'utube', 'watch', 'utube_pri'} for _, index in pairs(indexes) do if space.index[index] == nil then error(string.format('space "%s" does not have "%s" index', @@ -38,6 +41,18 @@ local function validate_space(space) end end +-- validate ready buffer space of queue +local function validate_space_ready_buffer(space) + -- check indexes + local indexes = {'task_id', 'utube', 'pri'} + for _, index in pairs(indexes) do + if space.index[index] == nil then + error(string.format('space "%s" does not have "%s" index', + space.name, index)) + end + end +end + -- create space function tube.create_space(space_name, opts) opts.ttl = opts.ttl or util.MAX_TIMEOUT @@ -87,6 +102,10 @@ function tube.create_space(space_name, opts) type = 'tree', parts = {i_status, str_type(), i_utube, str_type(), i_id, num_type()} }) + space:create_index('utube_pri', { + type = 'tree', + parts = {i_status, str_type(), i_utube, str_type(), i_pri, num_type(), i_id, num_type()} + }) return space end @@ -94,11 +113,99 @@ local delayed_state = { state.DELAYED } local ttl_states = { state.READY, state.BURIED } local ttr_state = { state.TAKEN } +-- Find the first ready task for given 'utube'. +-- Utube is also checked for the absence of 'TAKEN' tasks. +local function put_next_ready(self, utube) + local taken = self.space.index.utube:min{state.TAKEN, utube} + if taken == nil or taken[i_status] ~= state.TAKEN then + local next_task = self.space.index.utube_pri:min{state.READY, utube} + if next_task == nil or next_task[i_status] ~= state.READY then + return + end + -- Ignoring ER_TUPLE_FOUND error, if a tuple with the same task_id + -- or utube name is already in the space. + -- Note that both task_id and utube indexes are unique, so there will be + -- no duplicates: each task_id can occur in the space not more than once, + -- there can be no more than one task from each utube in a space. + pcall(self.space_ready_buffer.insert, self.space_ready_buffer, {next_task[i_id], utube, next_task[i_pri]}) + end +end + +-- Check if given task has lowest priority in the ready_buffer for the given 'utube'. +-- If so, current task for the given 'utube' is replaced in the ready_buffer. +-- Utube is also checked for the absence of 'TAKEN' tasks. +local function put_ready(self, id, utube, pri) + local taken = self.space.index.utube:min{state.TAKEN, utube} + if taken == nil or taken[i_status] ~= state.TAKEN then + local current_task = self.space.index.utube_pri:min{state.READY, utube} + if current_task[i_status] ~= state.READY or + current_task[i_pri] < pri or (current_task[i_pri] == pri and current_task[i_id] < id) then + return + end + if current_task[i_pri] > pri then + self.space_ready_buffer:delete(current_task[id]) + end + -- Ignoring ER_TUPLE_FOUND error, if a tuple with the same task_id + -- or utube name is already in the space. + -- Note that both task_id and utube indexes are unique, so there will be + -- no duplicates: each task_id can occur in the space not more than once, + -- there can be no more than one task from each utube in a space. + pcall(self.space_ready_buffer.insert, self.space_ready_buffer, {id, utube, pri}) + end +end + +-- Delete task from the ready_buffer and find next ready task from the same 'utube' to replace it. +local function delete_ready(self, id, utube) + self.space_ready_buffer:delete(id) + put_next_ready(self, utube) +end + +-- Try to update the current task in ready_buffer for the given 'utube'. +local function update_ready(self, id, utube, pri) + local prev_task = self.space_ready_buffer.index.utube:get{utube} + if prev_task ~= nil then + if prev_task[3] > pri or (prev_task[3] == pri and prev_task[1] > id) then + self.space_ready_buffer:delete(prev_task[1]) + self.space_ready_buffer:insert({id, utube, pri}) + end + else + put_ready(self, id, utube, pri) + end +end + +local function commit() + box.commit() +end + +local function empty() +end + +-- Start transaction with the correct options, if the transaction is not already running +-- and current engine is not 'vinyl'. +local function begin_if_not_in_txn(self) + local transaction_opts = {} + if box.cfg.memtx_use_mvcc_engine then + transaction_opts = {txn_isolation = 'read-committed'} + end + + -- Implemented only for memtx engine for now. + -- https://github.com/tarantool/queue/issues/230. + if not box.is_in_txn() and self.opts.engine ~= 'vinyl' then + box.begin(transaction_opts) + return commit + else + return empty + end +end + local function utubettl_fiber_iteration(self, processed) local now = util.time() local task = nil local estimated = util.MAX_TIMEOUT + local commit_func = begin_if_not_in_txn(self) + local commited = false + -- delayed tasks task = self.space.index.watch:min(delayed_state) if task and task[i_status] == state.DELAYED then @@ -107,6 +214,13 @@ local function utubettl_fiber_iteration(self, processed) { '=', i_status, state.READY }, { '=', i_next_event, task[i_created] + task[i_ttl] } }) + + if self.ready_space_mode then + update_ready(self, task[i_id], task[i_utube], task[i_pri]) + end + commit_func() + commited = true + self:on_task_change(task, 'delayed') estimated = 0 processed = processed + 1 @@ -114,6 +228,9 @@ local function utubettl_fiber_iteration(self, processed) estimated = tonumber(task[i_next_event] - now) / 1000000 end end + if not commited then + commit_func() + end -- ttl tasks for _, state in pairs(ttl_states) do @@ -131,6 +248,8 @@ local function utubettl_fiber_iteration(self, processed) end end + commit_func = begin_if_not_in_txn(self) + commited = false -- ttr tasks task = self.space.index.watch:min(ttr_state) if task and task[i_status] == state.TAKEN then @@ -139,6 +258,13 @@ local function utubettl_fiber_iteration(self, processed) { '=', i_status, state.READY }, { '=', i_next_event, task[i_created] + task[i_ttl] } }) + + if self.ready_space_mode then + put_ready(self, task[i_id], task[i_utube], task[i_pri]) + end + commit_func() + commited = true + self:on_task_change(task, 'ttr') estimated = 0 processed = processed + 1 @@ -147,6 +273,9 @@ local function utubettl_fiber_iteration(self, processed) estimated = et < estimated and et or estimated end end + if not commited then + commit_func() + end if estimated > 0 or processed > 1000 then -- free refcounter @@ -176,12 +305,11 @@ local function utubettl_fiber(self) elseif stat then processed = err end - else - -- When switching the master to the replica, the fiber will be stopped. - if self.sync_chan:get(0.1) ~= nil then - log.info("Queue utubettl fiber was stopped") - break - end + end + + if self.sync_chan:get(0.1) ~= nil then + log.info("Queue utubettl fiber was stopped") + break end end end @@ -190,22 +318,69 @@ end function tube.new(space, on_task_change, opts) validate_space(space) + local space_ready_buffer_name = space.name .. "_ready_buffer" + local space_ready_buffer = box.space[space_ready_buffer_name] + -- Feature implemented only for memtx engine for now. + -- https://github.com/tarantool/queue/issues/230. + if opts.storage_mode == tube.STORAGE_MODE_READY_BUFFER and opts.engine == 'vinyl' then + error(string.format('"%s" storage mode cannot be used with vinyl engine', + tube.STORAGE_MODE_READY_BUFFER)) + end + + local ready_space_mode = (opts.storage_mode == tube.STORAGE_MODE_READY_BUFFER or false) + if ready_space_mode then + if space_ready_buffer == nil then + local space_opts = {} + space_opts.temporary = opts.temporary or false + space_opts.engine = opts.engine or 'memtx' + space_opts.format = { + {name = 'task_id', type = num_type()}, + {name = 'utube', type = str_type()}, + {name = 'pri', type = num_type()}, + } + + -- Create a space for first ready tasks from each utube. + space_ready_buffer = box.schema.create_space(space_ready_buffer_name, space_opts) + space_ready_buffer:create_index('task_id', { + type = 'tree', + parts = {1, num_type()}, + unique = true, + }) + space_ready_buffer:create_index('utube', { + type = 'tree', + parts = {2, str_type()}, + unique = true, + }) + space_ready_buffer:create_index('pri', { + type = 'tree', + parts = {3, num_type(), 1, num_type()}, + }) + else + validate_space_ready_buffer(space_ready_buffer) + if space:len() == 0 then + space_ready_buffer:truncate() + end + end + end + on_task_change = on_task_change or (function() end) local self = setmetatable({ - space = space, - on_task_change = function(self, task, stat_data) + space = space, + space_ready_buffer = space_ready_buffer, + on_task_change = function(self, task, stat_data) -- wakeup fiber if task ~= nil and self.fiber ~= nil then self.cond:signal(self.fiber:id()) end on_task_change(task, stat_data) end, - opts = opts, + opts = opts, + ready_space_mode = ready_space_mode, }, { __index = method }) self.cond = qc.waiter() self.fiber = fiber.create(utubettl_fiber, self) - self.sync_chan = fiber.channel() + self.sync_chan = fiber.channel(1) return self end @@ -217,27 +392,19 @@ end -- put task in space function method.put(self, data, opts) - local max - - -- Taking the maximum of the index is an implicit transactions, so it is + -- Taking the minimum is an implicit transactions, so it is -- always done with 'read-confirmed' mvcc isolation level. - -- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled. - -- It is hapenning because 'max' for several puts in parallel will be the same since + -- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled. + -- It is hapenning because 'min' for several takes in parallel will be the same since -- read confirmed isolation level makes visible all transactions that finished the commit. -- To fix it we wrap it with box.begin/commit and set right isolation level. -- Current fix does not resolve that bug in situations when we already are in transaction -- since it will open nested transactions. -- See https://github.com/tarantool/queue/issues/207 -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + local commit_func = begin_if_not_in_txn(self) - if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then - box.begin({txn_isolation = 'read-committed'}) - max = self.space.index.task_id:max() - box.commit() - else - max = self.space.index.task_id:max() - end - + local max = self.space.index.task_id:max() local id = max and max[i_id] + 1 or 0 local status @@ -267,6 +434,12 @@ function method.put(self, data, opts) tostring(opts.utube), data } + if self.ready_space_mode and status == state.READY then + put_ready(self, task[i_id], task[i_utube], task[i_pri]) + end + + commit_func() + self:on_task_change(task, 'put') return task end @@ -293,14 +466,60 @@ function method.touch(self, id, delta) return task end --- take task -function method.take(self) +-- Take the first task form the ready_buffer. +local function take_ready(self) + while true do + local commit_func = begin_if_not_in_txn(self) + + local task_ready = self.space_ready_buffer.index.pri:min() + if task_ready == nil then + commit_func() + return nil + end + + local id = task_ready[1] + local task = self.space:get(id) + local take_complete = false + local take_ttl = false + + if task[i_status] == state.READY then + if not is_expired(task) then + local next_event = util.time() + task[i_ttr] + + local taken = self.space.index.utube:min{state.TAKEN, task[i_utube]} + + if taken == nil or taken[i_status] ~= state.TAKEN then + task = self.space:update(task[1], { + { '=', i_status, state.TAKEN }, + { '=', i_next_event, next_event } + }) + + self.space_ready_buffer:delete(task[i_id]) + take_complete = true + end + else + task = self:delete(task[i_id]):transform(2, 1, state.DONE) + take_ttl = true + end + end + + commit_func() + + if take_complete then + self:on_task_change(task, 'take') + return task + elseif take_ttl then + self:on_task_change(task, 'ttl') + end + end +end + +local function take(self) for s, t in self.space.index.status:pairs(state.READY, {iterator = 'GE'}) do if t[2] ~= state.READY then break elseif not is_expired(t) then local next_event = util.time() + t[i_ttr] - local taken -- Taking the minimum is an implicit transactions, so it is -- always done with 'read-confirmed' mvcc isolation level. -- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled. @@ -311,19 +530,20 @@ function method.take(self) -- since it will open nested transactions. -- See https://github.com/tarantool/queue/issues/207 -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ - if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then - box.begin({txn_isolation = 'read-committed'}) - taken = self.space.index.utube:min{state.TAKEN, t[i_utube]} - box.commit() - else - taken = self.space.index.utube:min{state.TAKEN, t[i_utube]} - end + local commit_func = begin_if_not_in_txn(self) + local taken = self.space.index.utube:min{state.TAKEN, t[i_utube]} + local take_complete = false if taken == nil or taken[i_status] ~= state.TAKEN then t = self.space:update(t[1], { { '=', i_status, state.TAKEN }, { '=', i_next_event, next_event } }) + take_complete = true + end + + commit_func() + if take_complete then self:on_task_change(t, 'take') return t end @@ -331,6 +551,14 @@ function method.take(self) end end +-- take task +function method.take(self) + if self.ready_space_mode then + return take_ready(self) + end + return take(self) +end + local function process_neighbour(self, task, operation) self:on_task_change(task, operation) if task ~= nil then @@ -347,11 +575,25 @@ end -- delete task function method.delete(self, id) + local commit_func = begin_if_not_in_txn(self) + local task = self.space:get(id) if task ~= nil then local is_taken = task[i_status] == state.TAKEN self.space:delete(id) + + if self.ready_space_mode then + if task[i_status] == state.TAKEN then + put_next_ready(self, task[i_utube]) + elseif task[i_status] == state.READY then + delete_ready(self, id, task[i_utube]) + end + end + task = task:transform(i_status, 1, state.DONE) + + commit_func() + if is_taken then return process_neighbour(self, task, 'delete') else @@ -359,12 +601,17 @@ function method.delete(self, id) return task end end + + commit_func() end -- release task function method.release(self, id, opts) + local commit_func = begin_if_not_in_txn(self) + local task = self.space:get{id} if task == nil then + commit_func() return end if opts.delay ~= nil and opts.delay > 0 then @@ -374,6 +621,12 @@ function method.release(self, id, opts) { '+', i_ttl, util.time(opts.delay) } }) if task ~= nil then + if self.ready_space_mode then + put_next_ready(self, task[i_utube]) + end + + commit_func() + return process_neighbour(self, task, 'release') end else @@ -381,25 +634,45 @@ function method.release(self, id, opts) { '=', i_status, state.READY }, { '=', i_next_event, util.time(task[i_created] + task[i_ttl]) } }) + + if self.ready_space_mode and task ~= nil then + put_ready(self, task[i_id], task[i_utube], task[i_pri]) + end end + + commit_func() self:on_task_change(task, 'release') return task end -- bury task function method.bury(self, id) + local commit_func = begin_if_not_in_txn(self) + -- The `i_next_event` should be updated because if the task has been -- "buried" after it was "taken" (and the task has "ttr") when the time in -- `i_next_event` will be interpreted as "ttl" in `utubettl_fiber_iteration` -- and the task will be deleted. local task = self.space:get{id} if task == nil then + commit_func() return end + + local status = task[i_status] task = self.space:update(id, { { '=', i_status, state.BURIED }, { '=', i_next_event, task[i_created] + task[i_ttl] } }) + if self.ready_space_mode then + if status == state.READY then + delete_ready(self, id, task[i_utube]) + elseif status == state.TAKEN then + put_next_ready(self, task[i_utube]) + end + end + + commit_func() return process_neighbour( self, task:transform(i_status, 1, state.BURIED), 'bury' @@ -409,15 +682,27 @@ end -- unbury several tasks function method.kick(self, count) for i = 1, count do + local commit_func = begin_if_not_in_txn(self) + local task = self.space.index.status:min{ state.BURIED } if task == nil then + commit_func() + return i - 1 end if task[i_status] ~= state.BURIED then + commit_func() + return i - 1 end task = self.space:update(task[i_id], {{ '=', i_status, state.READY }}) + if self.ready_space_mode then + update_ready(self, task[i_id], task[i_utube], task[i_pri]) + end + + commit_func() + self:on_task_change(task, 'kick') end return count @@ -435,6 +720,9 @@ end function method.truncate(self) self.space:truncate() + if self.ready_space_mode then + self.space_ready_buffer:truncate() + end end function method.start(self) @@ -453,4 +741,12 @@ function method.stop(self) self.fiber = nil end +function method.drop(self) + self:stop() + box.space[self.space.name]:drop() + if self.ready_space_mode then + box.space[self.space_ready_buffer.name]:drop() + end +end + return tube diff --git a/t/030-utube.t b/t/030-utube.t index ffd92136..a5b5057d 100755 --- a/t/030-utube.t +++ b/t/030-utube.t @@ -18,125 +18,182 @@ test:ok(queue, 'queue is loaded') local tube = queue.create_tube('test', 'utube', { engine = engine }) local tube2 = queue.create_tube('test_stat', 'utube', { engine = engine }) +local tube_ready, tube2_ready +if engine ~= 'vinyl' then + tube_ready = queue.create_tube('test_ready', 'utube', + { engine = engine, storage_mode = queue.driver.utube.STORAGE_MODE_READY_BUFFER }) + tube2_ready = queue.create_tube('test_stat_ready', 'utube', + { engine = engine, storage_mode = queue.driver.utube.STORAGE_MODE_READY_BUFFER }) +end test:ok(tube, 'test tube created') test:is(tube.name, 'test', 'tube.name') test:is(tube.type, 'utube', 'tube.type') test:test('Utube statistics', function(test) - test:plan(13) - tube2:put('stat_0') - tube2:put('stat_1') - tube2:put('stat_2') - tube2:put('stat_3') - tube2:put('stat_4') - tube2:delete(4) - tube2:take(.001) - tube2:release(0) - tube2:take(.001) - tube2:ack(0) - tube2:bury(1) - tube2:bury(2) - tube2:kick(1) - tube2:take(.001) - - local stats = queue.statistics('test_stat') - - -- check tasks statistics - test:is(stats.tasks.taken, 1, 'tasks.taken') - test:is(stats.tasks.buried, 1, 'tasks.buried') - test:is(stats.tasks.ready, 1, 'tasks.ready') - test:is(stats.tasks.done, 2, 'tasks.done') - test:is(stats.tasks.delayed, 0, 'tasks.delayed') - test:is(stats.tasks.total, 3, 'tasks.total') - - -- check function call statistics - test:is(stats.calls.delete, 1, 'calls.delete') - test:is(stats.calls.ack, 1, 'calls.ack') - test:is(stats.calls.take, 3, 'calls.take') - test:is(stats.calls.kick, 1, 'calls.kick') - test:is(stats.calls.bury, 2, 'calls.bury') - test:is(stats.calls.put, 5, 'calls.put') - test:is(stats.calls.release, 1, 'calls.release') + if engine ~= 'vinyl' then + test:plan(13 * 2) + else + test:plan(13) + end + for _, tube_stat in ipairs({tube2, tube2_ready}) do + if tube_stat == nil then + break + end + + tube_stat:put('stat_0') + tube_stat:put('stat_1') + tube_stat:put('stat_2') + tube_stat:put('stat_3') + tube_stat:put('stat_4') + tube_stat:delete(4) + tube_stat:take(.001) + tube_stat:release(0) + tube_stat:take(.001) + tube_stat:ack(0) + tube_stat:bury(1) + tube_stat:bury(2) + tube_stat:kick(1) + tube_stat:take(.001) + + local stats = queue.statistics(tube_stat.name) + + -- check tasks statistics + test:is(stats.tasks.taken, 1, 'tasks.taken') + test:is(stats.tasks.buried, 1, 'tasks.buried') + test:is(stats.tasks.ready, 1, 'tasks.ready') + test:is(stats.tasks.done, 2, 'tasks.done') + test:is(stats.tasks.delayed, 0, 'tasks.delayed') + test:is(stats.tasks.total, 3, 'tasks.total') + + -- check function call statistics + test:is(stats.calls.delete, 1, 'calls.delete') + test:is(stats.calls.ack, 1, 'calls.ack') + test:is(stats.calls.take, 3, 'calls.take') + test:is(stats.calls.kick, 1, 'calls.kick') + test:is(stats.calls.bury, 2, 'calls.bury') + test:is(stats.calls.put, 5, 'calls.put') + test:is(stats.calls.release, 1, 'calls.release') + end end) test:test('Easy put/take/ack', function(test) - test:plan(12) - - test:ok(tube:put(123, {utube = 1}), 'task was put') - test:ok(tube:put(345, {utube = 1}), 'task was put') - local task = tube:take() - test:ok(task, 'task was taken') - test:is(task[2], state.TAKEN, 'task status') - test:is(task[3], 123, 'task.data') - test:ok(tube:take(.1) == nil, 'second task was not taken (the same tube)') - - task = tube:ack(task[1]) - test:ok(task, 'task was acked') - test:is(task[2], '-', 'task status') - test:is(task[3], 123, 'task.data') - - task = tube:take(.1) - test:ok(task, 'task2 was taken') - test:is(task[3], 345, 'task.data') - test:is(task[2], state.TAKEN, 'task.status') + if engine ~= 'vinyl' then + test:plan(12 * 2) + else + test:plan(12) + end + + for _, test_tube in ipairs({tube, tube_ready}) do + if test_tube == nil then + break + end + + test:ok(test_tube:put(123, {utube = 1}), 'task was put') + test:ok(test_tube:put(345, {utube = 1}), 'task was put') + local task = test_tube:take() + test:ok(task, 'task was taken') + test:is(task[2], state.TAKEN, 'task status') + test:is(task[3], 123, 'task.data') + test:ok(test_tube:take(.1) == nil, 'second task was not taken (the same tube)') + + task = test_tube:ack(task[1]) + test:ok(task, 'task was acked') + test:is(task[2], '-', 'task status') + test:is(task[3], 123, 'task.data') + + task = test_tube:take(.1) + test:ok(task, 'task2 was taken') + test:is(task[3], 345, 'task.data') + test:is(task[2], state.TAKEN, 'task.status') + end end) test:test('ack in utube', function(test) - test:plan(8) - - test:ok(tube:put(123, {utube = 'abc'}), 'task was put') - test:ok(tube:put(345, {utube = 'abc'}), 'task was put') + if engine ~= 'vinyl' then + test:plan(8 * 2) + else + test:plan(8) + end - local state = 0 - fiber.create(function() - fiber.sleep(0.1) - local taken = tube:take() - test:ok(taken, 'second task was taken') - test:is(taken[3], 345, 'task.data') - state = state + 1 - end) + for _, test_tube in ipairs({tube, tube_ready}) do + if test_tube == nil then + break + end - local taken = tube:take(.1) - state = 1 - test:ok(taken, 'task was taken') - test:is(taken[3], 123, 'task.data') - fiber.sleep(0.3) - test:is(state, 1, 'state was not changed') - tube:ack(taken[1]) - fiber.sleep(0.2) - test:is(state, 2, 'state was changed') + test:ok(test_tube:put(123, {utube = 'abc'}), 'task was put') + test:ok(test_tube:put(345, {utube = 'abc'}), 'task was put') + + local state = 0 + fiber.create(function() + fiber.sleep(0.1) + local taken = test_tube:take() + test:ok(taken, 'second task was taken') + test:is(taken[3], 345, 'task.data') + state = state + 1 + end) + + local taken = test_tube:take(.1) + state = 1 + test:ok(taken, 'task was taken') + test:is(taken[3], 123, 'task.data') + fiber.sleep(0.3) + test:is(state, 1, 'state was not changed') + test_tube:ack(taken[1]) + fiber.sleep(0.2) + test:is(state, 2, 'state was changed') + end end) test:test('bury in utube', function(test) - test:plan(8) - - test:ok(tube:put(567, {utube = 'cde'}), 'task was put') - test:ok(tube:put(789, {utube = 'cde'}), 'task was put') + if engine ~= 'vinyl' then + test:plan(8 * 2) + else + test:plan(8) + end - local state = 0 - fiber.create(function() - fiber.sleep(0.1) - local taken = tube:take() - test:ok(taken, 'second task was taken') - test:is(taken[3], 789, 'task.data') - state = state + 1 - end) + for _, test_tube in ipairs({tube, tube_ready}) do + if test_tube == nil then + break + end - local taken = tube:take(.1) - state = 1 - test:ok(taken, 'task was taken') - test:is(taken[3], 567, 'task.data') - fiber.sleep(0.3) - test:is(state, 1, 'state was not changed') - tube:bury(taken[1]) - fiber.sleep(0.2) - test:is(state, 2, 'state was changed') + test:ok(test_tube:put(567, {utube = 'cde'}), 'task was put') + test:ok(test_tube:put(789, {utube = 'cde'}), 'task was put') + + local state = 0 + fiber.create(function() + fiber.sleep(0.1) + local taken = test_tube:take() + test:ok(taken, 'second task was taken') + test:is(taken[3], 789, 'task.data') + state = state + 1 + end) + + local taken = test_tube:take(.1) + state = 1 + test:ok(taken, 'task was taken') + test:is(taken[3], 567, 'task.data') + fiber.sleep(0.3) + test:is(state, 1, 'state was not changed') + test_tube:bury(taken[1]) + fiber.sleep(0.2) + test:is(state, 2, 'state was changed') + end end) test:test('instant bury', function(test) - test:plan(1) + if engine ~= 'vinyl' then + test:plan(1 * 2) + else + test:plan(1) + end tube:put(1, {ttr=60}) local taken = tube:take(.1) test:is(tube:bury(taken[1])[2], '!', 'task is buried') + + if tube_ready ~= nil then + tube_ready:put(1, {ttr=60}) + local taken = tube_ready:take(.1) + test:is(tube_ready:bury(taken[1])[2], '!', 'task is buried') + end end) test:test('if_not_exists test', function(test) diff --git a/t/040-utubettl.t b/t/040-utubettl.t index 1d2c5f37..be0535b7 100755 --- a/t/040-utubettl.t +++ b/t/040-utubettl.t @@ -21,187 +21,280 @@ test:ok(queue, 'queue is loaded') local tube = queue.create_tube('test', 'utubettl', { engine = engine }) local tube2 = queue.create_tube('test_stat', 'utubettl', { engine = engine }) +local tube_ready, tube2_ready +if engine ~= 'vinyl' then + tube_ready = queue.create_tube('test_ready', 'utubettl', + { engine = engine, storage_mode = queue.driver.utubettl.STORAGE_MODE_READY_BUFFER }) + tube2_ready = queue.create_tube('test_stat_ready', 'utubettl', + { engine = engine, storage_mode = queue.driver.utubettl.STORAGE_MODE_READY_BUFFER }) +end test:ok(tube, 'test tube created') test:is(tube.name, 'test', 'tube.name') test:is(tube.type, 'utubettl', 'tube.type') test:test('Utubettl statistics', function(test) - test:plan(13) - tube2:put('stat_0') - tube2:put('stat_1') - tube2:put('stat_2') - tube2:put('stat_3') - tube2:put('stat_4') - tube2:put('stat_5', {delay=1000}) - tube2:delete(4) - tube2:take(.001) - tube2:release(0) - tube2:take(.001) - tube2:ack(0) - tube2:bury(1) - tube2:bury(2) - tube2:kick(1) - tube2:take(.001) - - local stats = queue.statistics('test_stat') - - -- check tasks statistics - test:is(stats.tasks.taken, 1, 'tasks.taken') - test:is(stats.tasks.buried, 1, 'tasks.buried') - test:is(stats.tasks.ready, 1, 'tasks.ready') - test:is(stats.tasks.done, 2, 'tasks.done') - test:is(stats.tasks.delayed, 1, 'tasks.delayed') - test:is(stats.tasks.total, 4, 'tasks.total') - - -- check function call statistics - test:is(stats.calls.delete, 1, 'calls.delete') - test:is(stats.calls.ack, 1, 'calls.ack') - test:is(stats.calls.take, 3, 'calls.take') - test:is(stats.calls.kick, 1, 'calls.kick') - test:is(stats.calls.bury, 2, 'calls.bury') - test:is(stats.calls.put, 6, 'calls.put') - test:is(stats.calls.release, 1, 'calls.release') + if engine ~= 'vinyl' then + test:plan(13 * 2) + else + test:plan(13) + end + for _, tube_stat in ipairs({tube2, tube2_ready}) do + if tube_stat == nil then + break + end + + tube_stat:put('stat_0') + tube_stat:put('stat_1') + tube_stat:put('stat_2') + tube_stat:put('stat_3') + tube_stat:put('stat_4') + tube_stat:put('stat_5', {delay=1000}) + tube_stat:delete(4) + tube_stat:take(.001) + tube_stat:release(0) + tube_stat:take(.001) + tube_stat:ack(0) + tube_stat:bury(1) + tube_stat:bury(2) + tube_stat:kick(1) + tube_stat:take(.001) + + local stats = queue.statistics(tube_stat.name) + + -- check tasks statistics + test:is(stats.tasks.taken, 1, 'tasks.taken') + test:is(stats.tasks.buried, 1, 'tasks.buried') + test:is(stats.tasks.ready, 1, 'tasks.ready') + test:is(stats.tasks.done, 2, 'tasks.done') + test:is(stats.tasks.delayed, 1, 'tasks.delayed') + test:is(stats.tasks.total, 4, 'tasks.total') + + -- check function call statistics + test:is(stats.calls.delete, 1, 'calls.delete') + test:is(stats.calls.ack, 1, 'calls.ack') + test:is(stats.calls.take, 3, 'calls.take') + test:is(stats.calls.kick, 1, 'calls.kick') + test:is(stats.calls.bury, 2, 'calls.bury') + test:is(stats.calls.put, 6, 'calls.put') + test:is(stats.calls.release, 1, 'calls.release') + end end) test:test('Easy put/take/ack', function(test) - test:plan(12) + if engine ~= 'vinyl' then + test:plan(12 * 2) + else + test:plan(12) + end - test:ok(tube:put(123, {utube = 1}), 'task was put') - test:ok(tube:put(345, {utube = 1}), 'task was put') - local task = tube:take() - test:ok(task, 'task was taken') - test:is(task[2], state.TAKEN, 'task status') - test:is(task[3], 123, 'task.data') - test:ok(tube:take(.1) == nil, 'second task was not taken (the same tube)') - - task = tube:ack(task[1]) - test:ok(task, 'task was acked') - test:is(task[2], '-', 'task status') - test:is(task[3], 123, 'task.data') - - task = tube:take(.1) - test:ok(task, 'task2 was taken') - test:is(task[3], 345, 'task.data') - test:is(task[2], state.TAKEN, 'task.status') + for _, test_tube in ipairs({tube, tube_ready}) do + if test_tube == nil then + break + end + + test:ok(test_tube:put(123, {utube = 1}), 'task was put') + test:ok(test_tube:put(345, {utube = 1}), 'task was put') + local task = test_tube:take() + test:ok(task, 'task was taken') + test:is(task[2], state.TAKEN, 'task status') + test:is(task[3], 123, 'task.data') + test:ok(test_tube:take(.1) == nil, 'second task was not taken (the same tube)') + + task = test_tube:ack(task[1]) + test:ok(task, 'task was acked') + test:is(task[2], '-', 'task status') + test:is(task[3], 123, 'task.data') + + task = test_tube:take(.1) + test:ok(task, 'task2 was taken') + test:is(task[3], 345, 'task.data') + test:is(task[2], state.TAKEN, 'task.status') + end end) test:test('ttr put/take', function(test) - test:plan(3) + if engine ~= 'vinyl' then + test:plan(3 * 2) + else + test:plan(3) + end + local my_queue = queue.create_tube('trr_test', 'utubettl', { engine = engine }) test:ok(my_queue:put('ttr1', { ttr = 1 }), 'put ttr task') test:ok(my_queue:take(0.1) ~= nil, 'take this task') - fiber.sleep(1.1) + fiber.sleep(1.5) local task = my_queue:peek(0) test:is(task[2], state.READY, 'Ready state returned after one second') + + if engine ~= 'vinyl' then + local my_queue_ready = queue.create_tube('trr_test_v2', 'utubettl', + { engine = engine, storage_mode = queue.driver.utubettl.STORAGE_MODE_READY_BUFFER }) + test:ok(my_queue_ready:put('ttr1', { ttr = 1 }), 'put ttr task') + test:ok(my_queue_ready:take(0.1) ~= nil, 'take this task') + fiber.sleep(1.5) + local task = my_queue_ready:peek(0) + test:is(task[2], state.READY, 'Ready state returned after one second') + end end) test:test('ack in utube', function(test) - test:plan(8) - - test:ok(tube:put(123, {utube = 'abc'}), 'task was put') - test:ok(tube:put(345, {utube = 'abc'}), 'task was put') + if engine ~= 'vinyl' then + test:plan(8 * 2) + else + test:plan(8) + end - local state = 0 - fiber.create(function() - fiber.sleep(0.1) - local taken = tube:take() - test:ok(taken, 'second task was taken') - test:is(taken[3], 345, 'task.data') - state = state + 1 - end) + for _, test_tube in ipairs({tube, tube_ready}) do + if test_tube == nil then + break + end - local taken = tube:take(.1) - state = 1 - test:ok(taken, 'task was taken') - test:is(taken[3], 123, 'task.data') - fiber.sleep(0.3) - test:is(state, 1, 'state was not changed') - tube:ack(taken[1]) - fiber.sleep(0.2) - test:is(state, 2, 'state was changed') + test:ok(test_tube:put(123, {utube = 'abc'}), 'task was put') + test:ok(test_tube:put(345, {utube = 'abc'}), 'task was put') + + local state = 0 + fiber.create(function() + fiber.sleep(0.1) + local taken = test_tube:take() + test:ok(taken, 'second task was taken') + test:is(taken[3], 345, 'task.data') + state = state + 1 + end) + + local taken = test_tube:take(.1) + state = 1 + test:ok(taken, 'task was taken') + test:is(taken[3], 123, 'task.data') + fiber.sleep(0.3) + test:is(state, 1, 'state was not changed') + test_tube:ack(taken[1]) + fiber.sleep(0.2) + test:is(state, 2, 'state was changed') + end end) test:test('bury in utube', function(test) - test:plan(8) - - test:ok(tube:put(567, {utube = 'cde'}), 'task was put') - test:ok(tube:put(789, {utube = 'cde'}), 'task was put') + if engine ~= 'vinyl' then + test:plan(8 * 2) + else + test:plan(8) + end - local state = 0 - fiber.create(function() - fiber.sleep(0.1) - local taken = tube:take() - test:ok(taken, 'second task was taken') - test:is(taken[3], 789, 'task.data') - state = state + 1 - end) + for _, test_tube in ipairs({tube, tube_ready}) do + if test_tube == nil then + break + end - local taken = tube:take(.1) - state = 1 - test:ok(taken, 'task was taken') - test:is(taken[3], 567, 'task.data') - fiber.sleep(0.3) - test:is(state, 1, 'state was not changed') - tube:bury(taken[1]) - fiber.sleep(0.2) - test:is(state, 2, 'state was changed') + test:ok(test_tube:put(567, {utube = 'cde'}), 'task was put') + test:ok(test_tube:put(789, {utube = 'cde'}), 'task was put') + + local state = 0 + fiber.create(function() + fiber.sleep(0.1) + local taken = test_tube:take() + test:ok(taken, 'second task was taken') + test:is(taken[3], 789, 'task.data') + state = state + 1 + end) + + local taken = test_tube:take(.1) + state = 1 + test:ok(taken, 'task was taken') + test:is(taken[3], 567, 'task.data') + fiber.sleep(0.3) + test:is(state, 1, 'state was not changed') + test_tube:bury(taken[1]) + fiber.sleep(0.2) + test:is(state, 2, 'state was changed') + end end) test:test('instant bury', function(test) - test:plan(1) + if engine ~= 'vinyl' then + test:plan(1 * 2) + else + test:plan(1) + end + tube:put(1, {ttr=60}) local taken = tube:take(.1) test:is(tube:bury(taken[1])[2], '!', 'task is buried') + + if tube_ready ~= nil then + tube_ready:put(1, {ttr=60}) + taken = tube_ready:take(.1) + test:is(tube_ready:bury(taken[1])[2], '!', 'task is buried') + end end) test:test('release in utube', function(test) - test:plan(8) + if engine ~= 'vinyl' then + test:plan(8 * 2) + else + test:plan(8) + end - test:ok(tube:put(678, {utube = 'def'}), 'task was put') - test:ok(tube:put(890, {utube = 'def'}), 'task was put') + for _, test_tube in ipairs({tube, tube_ready}) do + if test_tube == nil then + break + end - local state = 0 - fiber.create(function() - fiber.sleep(0.1) - local taken = tube:take() - test:ok(taken, 'first task was taken again') + test:ok(test_tube:put(678, {utube = 'def'}), 'task was put') + test:ok(test_tube:put(890, {utube = 'def'}), 'task was put') + + local state = 0 + fiber.create(function() + fiber.sleep(0.1) + local taken = test_tube:take() + test:ok(taken, 'first task was taken again') + test:is(taken[3], 678, 'task.data') + state = state + 1 + end) + + local taken = test_tube:take(.1) + state = 1 + test:ok(taken, 'task was taken ' .. taken[1]) test:is(taken[3], 678, 'task.data') - state = state + 1 - end) - - local taken = tube:take(.1) - state = 1 - test:ok(taken, 'task was taken ' .. taken[1]) - test:is(taken[3], 678, 'task.data') - fiber.sleep(0.3) - test:is(state, 1, 'state was not changed') - tube:release(taken[1]) - fiber.sleep(0.2) - test:is(state, 2, 'state was changed') + fiber.sleep(0.3) + test:is(state, 1, 'state was not changed') + test_tube:release(taken[1]) + fiber.sleep(0.2) + test:is(state, 2, 'state was changed') + end end) test:test('release[delay] in utube', function(test) - test:plan(8) - - test:ok(tube:put(789, {utube = 'efg'}), 'task was put') - test:ok(tube:put(901, {utube = 'efg'}), 'task was put') + if engine ~= 'vinyl' then + test:plan(8 * 2) + else + test:plan(8) + end - local state = 0 - fiber.create(function() - fiber.sleep(0.1) - local taken = tube:take() - test:ok(taken, 'second task was taken') - test:is(taken[3], 901, 'task.data') - state = state + 1 - end) + for _, test_tube in ipairs({tube, tube_ready}) do + if test_tube == nil then + break + end - local taken = tube:take(.1) - state = 1 - test:ok(taken, 'task was taken ' .. taken[1]) - test:is(taken[3], 789, 'task.data') - fiber.sleep(0.3) - test:is(state, 1, 'state was not changed') - tube:release(taken[1], { delay = 10 }) -- - fiber.sleep(0.2) - test:is(state, 2, 'state was changed') + test:ok(test_tube:put(789, {utube = 'efg'}), 'task was put') + test:ok(test_tube:put(901, {utube = 'efg'}), 'task was put') + + local state = 0 + fiber.create(function() + fiber.sleep(0.1) + local taken = test_tube:take() + test:ok(taken, 'second task was taken') + test:is(taken[3], 901, 'task.data') + state = state + 1 + end) + + local taken = test_tube:take(.1) + state = 1 + test:ok(taken, 'task was taken ' .. taken[1]) + test:is(taken[3], 789, 'task.data') + fiber.sleep(0.3) + test:is(state, 1, 'state was not changed') + test_tube:release(taken[1], { delay = 10 }) + fiber.sleep(0.2) + test:is(state, 2, 'state was changed') + end end) test:test('if_not_exists test', function(test) diff --git a/t/050-ttl.t b/t/050-ttl.t index 0695ffba..8f26b4fb 100755 --- a/t/050-ttl.t +++ b/t/050-ttl.t @@ -2,7 +2,7 @@ local fiber = require('fiber') local test = require('tap').test() -test:plan(5) +test:plan(7) local queue = require('queue') @@ -55,6 +55,21 @@ test:test('one message per queue utttl', function (test) end end) +test:test('one message per queue utttl_ready', function (test) + if engine == 'vinyl' then + return + end + + test:plan(20) + local tube = queue.create_tube('ompq_utttl_ready', 'utubettl', + { engine = engine, storage_mode = queue.driver.utubettl.STORAGE_MODE_READY_BUFFER }) + for i = 1, 20 do + tube:put('ompq_' .. i, {ttl=ttl}) + + test_take_after_ttl(test, tube, ttl) + end +end) + test:test('many messages, one queue ffttl', function (test) test:plan(20) for i = 1, 20 do @@ -75,6 +90,21 @@ test:test('many messages, one queue utttl', function (test) end end) +test:test('many messages, one queue utttl_ready', function (test) + if engine == 'vinyl' then + return + end + + test:plan(20) + for i = 1, 20 do + local tube = queue.create_tube('mmpq_utttl_ready_' .. i, 'utubettl', + { engine = engine, storage_mode = queue.driver.utubettl.STORAGE_MODE_READY_BUFFER }) + tube:put('mmpq_' .. i, {ttl=ttl}) + + test_take_after_ttl(test, tube, ttl) + end +end) + tnt.finish() os.exit(test:check() and 0 or 1) -- vim: set ft=lua : diff --git a/t/160-validate-space.t b/t/160-validate-space.t index 1fa0e30b..cca151c4 100755 --- a/t/160-validate-space.t +++ b/t/160-validate-space.t @@ -81,7 +81,7 @@ end) test:test('test corrupted space utubettl', function(test) test_corrupted_space(test, utubettl, - {'task_id', 'status', 'utube', 'watch'}) + {'task_id', 'status', 'utube', 'watch', 'utube_pri'}) end) test:test('Space name conflict fifo', function(test) diff --git a/t/benchmark/busy_utubes.lua b/t/benchmark/busy_utubes.lua new file mode 100644 index 00000000..3e95c3ba --- /dev/null +++ b/t/benchmark/busy_utubes.lua @@ -0,0 +1,90 @@ +#!/usr/bin/env tarantool + +local clock = require('clock') +local os = require('os') +local fiber = require('fiber') +local queue = require('queue') + +-- Set the number of consumers. +local consumers_count = 10 +-- Set the number of tasks processed by one consumer per iteration. +local batch_size = 150000 + +local barrier = fiber.cond() +local wait_count = 0 + +box.cfg() + +local test_queue = queue.create_tube('test_queue', 'utube', + {temporary = true, storage_mode = queue.driver.utube.STORAGE_MODE_READY_BUFFER}) + +local function prepare_tasks() + local test_data = 'test data' + + for i = 1, consumers_count do + for _ = 1, batch_size do + test_queue:put(test_data, {utube = tostring(i)}) + end + end +end + +local function prepare_consumers() + local consumers = {} + + for i = 1, consumers_count do + consumers[i] = fiber.create(function() + wait_count = wait_count + 1 + -- Wait for all consumers to start. + barrier:wait() + + -- Ack the tasks. + for _ = 1, batch_size do + local task = test_queue:take() + fiber.yield() + test_queue:ack(task[1]) + end + + wait_count = wait_count + 1 + end) + end + + return consumers +end + +local function multi_consumer_bench() + --- Wait for all consumer fibers. + local wait_all = function() + while (wait_count ~= consumers_count) do + fiber.yield() + end + wait_count = 0 + end + + fiber.set_max_slice(100) + + prepare_tasks() + + -- Wait for all consumers to start. + local consumers = prepare_consumers() + wait_all() + + -- Start timing of task confirmation. + local start_ack_time = clock.proc64() + barrier:broadcast() + -- Wait for all tasks to be acked. + wait_all() + -- Complete the timing of task confirmation. + local complete_time = clock.proc64() + + -- Print the result in milliseconds. + print(string.format("Time it takes to confirm the tasks: %i ms", + tonumber((complete_time - start_ack_time) / 10^6))) +end + +-- Start benchmark. +multi_consumer_bench() + +-- Cleanup. +test_queue:drop() + +os.exit(0) diff --git a/t/benchmark/many_utubes.lua b/t/benchmark/many_utubes.lua new file mode 100644 index 00000000..63a08b3d --- /dev/null +++ b/t/benchmark/many_utubes.lua @@ -0,0 +1,86 @@ +#!/usr/bin/env tarantool + +local clock = require('clock') +local os = require('os') +local fiber = require('fiber') +local queue = require('queue') + +-- Set the number of consumers. +local consumers_count = 30000 + +local barrier = fiber.cond() +local wait_count = 0 + +box.cfg() + +local test_queue = queue.create_tube('test_queue', 'utube', + {temporary = true, storage_mode = queue.driver.utube.STORAGE_MODE_READY_BUFFER}) + +local function prepare_tasks() + local test_data = 'test data' + + for i = 1, consumers_count do + test_queue:put(test_data, {utube = tostring(i)}) + end +end + +local function prepare_consumers() + local consumers = {} + + for i = 1, consumers_count do + consumers[i] = fiber.create(function() + wait_count = wait_count + 1 + -- Wait for all consumers to start. + barrier:wait() + + -- Ack the task. + local task = test_queue:take() + test_queue:ack(task[1]) + + wait_count = wait_count + 1 + end) + end + + return consumers +end + +local function multi_consumer_bench() + --- Wait for all consumer fibers. + local wait_all = function() + while (wait_count ~= consumers_count) do + fiber.yield() + end + wait_count = 0 + end + + fiber.set_max_slice(100) + + -- Wait for all consumers to start. + local consumers = prepare_consumers() + wait_all() + + -- Start timing creation of tasks. + local start_put_time = clock.proc64() + prepare_tasks() + -- Start timing of task confirmation. + local start_ack_time = clock.proc64() + barrier:broadcast() + -- Wait for all tasks to be acked. + wait_all() + -- Complete the timing of task confirmation. + local complete_time = clock.proc64() + + -- Print results in milliseconds. + print(string.format("Time it takes to fill the queue: %i ms", + tonumber((start_ack_time - start_put_time) / 10^6))) + print(string.format("Time it takes to confirm the tasks: %i ms", + tonumber((complete_time - start_ack_time) / 10^6))) +end + +-- Start benchmark. +multi_consumer_bench() + +-- Cleanup. +test_queue:drop() + +os.exit(0)