Skip to content

Commit

Permalink
utube: fix slow take on busy utubes
Browse files Browse the repository at this point in the history
If some of the utube for tasks at the top of the queue were busy
most of the time, `take` would slow down for every other task.
This problem is fixed by creating a new space `space_ready`. It
contains first task with `READY` status from each utube.

This solution shows great results for the stated problem, with the cost
of slowing the `put` method (it is ~3 times slower). Thus, this workaround is
disabled by default. To enable it, user should set the `v2 = true` as an
option while creating the tube. As example:
```lua
local test_queue = queue.create_tube('test_queue', 'utube',
        {temporary = true, v2 = true})
```

Part of #228
  • Loading branch information
DerekBum committed May 7, 2024
1 parent aa7c092 commit 5da7eca
Show file tree
Hide file tree
Showing 5 changed files with 411 additions and 41 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

### Added
- `v2` boolean option for creating a `utube` tube (#228). It enables the
workaround for slow takes while working with busy tubes.

### Fixed

- Stuck in `INIT` state if an instance failed to enter the `running` mode
in time (#226). This fix works only for Tarantool versions >= 2.10.0.
- Slow takes on busy `utube` tubes (#228). The workaround could be enabled by
passing the `v2 = true` option while creating the tube.

## [1.3.3] - 2023-09-13

Expand Down
194 changes: 187 additions & 7 deletions queue/abstract/driver/utube.lua
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,37 @@ function tube.create_space(space_name, opts)
type = 'tree',
parts = {2, str_type(), 3, str_type(), 1, num_type()}
})
space.v2 = opts.v2
return space
end

-- start tube on space
function tube.new(space, on_task_change)
validate_space(space)

local space_ready_name = space.name .. "_utube_ready"
local space_ready = box.space[space_ready_name]
if space.v2 and not space_ready then
-- Create a space for first ready tasks from each utube.
space_ready = box.schema.create_space(space_ready_name, space_opts)
space_ready:create_index('task_id', {
type = 'tree',
parts = {1, num_type()},
unique = true
})
space_ready:create_index('utube', {
type = 'tree',
parts = {2, str_type()},
unique = true
})
end

on_task_change = on_task_change or (function() end)
local self = setmetatable({
space = space,
space_ready = space_ready,
on_task_change = on_task_change,
v2 = space.v2 or false,
}, { __index = method })
return self
end
Expand All @@ -73,8 +93,25 @@ function method.normalize_task(self, task)
return task and task:transform(3, 1)
end

local function put_ready(self, utube)
local taken = self.space.index.utube:min{state.TAKEN, utube}
if taken == nil or taken[2] ~= state.TAKEN then
local next_task = self.space.index.utube:min{state.READY, utube}
if next_task == nil then
return
end
pcall(self.space_ready.insert, self.space_ready, {next_task[1], utube})
end
end

-- put task in space
function method.put(self, data, opts)
local commit_requirements = box.cfg.memtx_use_mvcc_engine and
(not box.is_in_txn())
if commit_requirements then
box.begin({txn_isolation = 'read-committed'})
end

local max

-- Taking the maximum of the index is an implicit transactions, so it is
Expand All @@ -88,22 +125,71 @@ function method.put(self, data, opts)
-- See https://github.com/tarantool/queue/issues/207
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/

if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then
box.begin({txn_isolation = 'read-committed'})
max = self.space.index.task_id:max()
box.commit()
else
max = self.space.index.task_id:max()
end
max = self.space.index.task_id:max()

local id = max and max[1] + 1 or 0
local task = self.space:insert{id, state.READY, tostring(opts.utube), data}
if self.v2 then
put_ready(self, task[3])
end

if commit_requirements then
box.commit()
end

self.on_task_change(task, 'put')
return task
end

local function take_ready(self)
local commit_requirements = box.cfg.memtx_use_mvcc_engine and
(not box.is_in_txn())

while true do
if commit_requirements then
box.begin({txn_isolation = 'read-committed'})
end

local task_ready = self.space_ready.index.task_id:min()
if task_ready == nil then
if commit_requirements then
box.commit()
end
return
end

local id = task_ready[1]
local task = self.space:get(id)

if task[2] == state.READY then
local taken = self.space.index.utube:min{state.TAKEN, task[3]}

if taken == nil or taken[2] ~= state.TAKEN then
task = self.space:update(id, { { '=', 2, state.TAKEN } })

self.space_ready:delete(id)

if commit_requirements then
box.commit()
end

self.on_task_change(task, 'take')
return task
end
end

if commit_requirements then
box.commit()
end
end
end

-- take task
function method.take(self)
if self.v2 then
return take_ready(self)
end

for s, task in self.space.index.status:pairs(state.READY,
{ iterator = 'GE' }) do
if task[2] ~= state.READY then
Expand Down Expand Up @@ -141,49 +227,127 @@ function method.touch(self, id, ttr)
error('utube queue does not support touch')
end

local function delete_ready(self, id, utube)
self.space_ready:delete(id)
put_ready(self, utube)
end

-- delete task
function method.delete(self, id)
local commit_requirements = box.cfg.memtx_use_mvcc_engine and
(not box.is_in_txn())
if commit_requirements then
box.begin({txn_isolation = 'read-committed'})
end

local task = self.space:get(id)
self.space:delete(id)
if task ~= nil then
if self.v2 then
if task[2] == state.TAKEN then
put_ready(self, task[3])
elseif task[2] == state.READY then
delete_ready(self, id, task[3])
end
end

task = task:transform(2, 1, state.DONE)

local neighbour = self.space.index.utube:min{state.READY, task[3]}

if commit_requirements then
box.commit()
end

self.on_task_change(task, 'delete')
if neighbour then
self.on_task_change(neighbour)
end
return task
end

if commit_requirements then
box.commit()
end
return task
end

-- release task
function method.release(self, id, opts)
local commit_requirements = box.cfg.memtx_use_mvcc_engine and
(not box.is_in_txn())
if commit_requirements then
box.begin({txn_isolation = 'read-committed'})
end

local task = self.space:update(id, {{ '=', 2, state.READY }})
if task ~= nil then
if self.v2 then
-- We guarantee that release is called only on TAKEN tasks.
self.space_ready:insert{id, task[3]}
end

if commit_requirements then
box.commit()
end

self.on_task_change(task, 'release')
return task
end

if commit_requirements then
box.commit()
end
return task
end

-- bury task
function method.bury(self, id)
local commit_requirements = box.cfg.memtx_use_mvcc_engine and
(not box.is_in_txn())
if commit_requirements then
box.begin({txn_isolation = 'read-committed'})
end

local task = self.space:update(id, {{ '=', 2, state.BURIED }})
if task ~= nil then
if self.v2 then
local ready_task = self.space_ready:get{task[1]}
if ready_task ~= nil then
delete_ready(self, id, task[3])
end
end

local neighbour = self.space.index.utube:min{state.READY, task[3]}

if commit_requirements then
box.commit()
end

self.on_task_change(task, 'bury')
if neighbour and neighbour[i_status] == state.READY then
self.on_task_change(neighbour)
end
else
if commit_requirements then
box.commit()
end

self.on_task_change(task, 'bury')
end
return task
end

-- unbury several tasks
function method.kick(self, count)
local commit_requirements = box.cfg.memtx_use_mvcc_engine and
(not box.is_in_txn())

for i = 1, count do
if commit_requirements then
box.begin({txn_isolation = 'read-committed'})
end

local task = self.space.index.status:min{ state.BURIED }
if task == nil then
return i - 1
Expand All @@ -193,6 +357,19 @@ function method.kick(self, count)
end

task = self.space:update(task[1], {{ '=', 2, state.READY }})
if self.v2 then
local prev_task = self.space_ready.index.utube:get{task[3]}
if prev_task ~= nil then
delete_ready(self, prev_task[1], task[3])
else
put_ready(self, task[3])
end
end

if commit_requirements then
box.commit()
end

self.on_task_change(task, 'kick')
end
return count
Expand All @@ -210,6 +387,9 @@ end

function method.truncate(self)
self.space:truncate()
if self.v2 then
self.space_ready:truncate()
end
end

-- This driver has no background activity.
Expand Down
Loading

0 comments on commit 5da7eca

Please sign in to comment.