Skip to content

Commit

Permalink
refactor(clustering/rpc): reshape do_sync (#14199)
Browse files Browse the repository at this point in the history
  • Loading branch information
StarlightIbuki authored Jan 22, 2025
1 parent d80ff4b commit 1d0f212
Showing 1 changed file with 73 additions and 60 deletions.
133 changes: 73 additions & 60 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,63 @@ local function update_status(ver)
end


local function lmdb_update(db, t, delta, opts, is_full_sync)
local delta_type = delta.type
local delta_entity = delta.entity

-- upsert the entity
-- delete if exists
local old_entity, err = db[delta_type]:select(delta_entity)
if err then
return nil, err
end

if old_entity and not is_full_sync then
local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts)
if not res then
return nil, err
end
end

local res, err = insert_entity_for_txn(t, delta_type, delta_entity, opts)
if not res then
return nil, err
end

if is_full_sync then
return nil
end

return { delta_type, old_entity and "update" or "create", delta_entity, old_entity, }
end


local function lmdb_delete(db, t, delta, opts, is_full_sync)
local delta_type = delta.type

local old_entity, err = db[delta_type]:select(delta.pk, opts)
if err then
return nil, err
end

-- full sync requires extra torlerance for missing entities
if not old_entity then
return nil
end

local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts)
if not res then
return nil, err
end

if is_full_sync then
return nil
end

return { delta_type, "delete", old_entity, }
end


local function do_sync()
if not is_rpc_ready() then
return nil, "rpc is not ready"
Expand Down Expand Up @@ -257,72 +314,28 @@ local function do_sync()
local delta_version = delta.version
local delta_type = delta.type
local delta_entity = delta.entity
local ev

-- delta should have ws_id to generate the correct lmdb key
-- if entity is workspaceable
-- set the correct workspace for item
opts.workspace = delta.ws_id

if delta_entity ~= nil and delta_entity ~= ngx_null then
-- upsert the entity
-- does the entity already exists?
local old_entity, err = db[delta_type]:select(delta_entity)
if err then
return nil, err
end

-- If we will wipe lmdb, we don't need to delete it from lmdb.
if old_entity and not wipe then
local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts)
if not res then
return nil, err
end
end

local res, err = insert_entity_for_txn(t, delta_type, delta_entity, opts)
if not res then
return nil, err
end

ngx_log(ngx_DEBUG,
"[kong.sync.v2] update entity",
", version: ", delta_version,
", type: ", delta_type)

-- wipe the whole lmdb, should not have events
if not wipe then
ev = { delta_type, old_entity and "update" or "create", delta_entity, old_entity, }
end

else
-- delete the entity, opts for getting correct lmdb key
local old_entity, err = db[delta_type]:select(delta.pk, opts) -- composite key
if err then
return nil, err
end

-- If we will wipe lmdb, we don't need to delete it from lmdb.
if old_entity and not wipe then
local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts)
if not res then
return nil, err
end
end

ngx_log(ngx_DEBUG,
"[kong.sync.v2] delete entity",
", version: ", delta_version,
", type: ", delta_type)

-- wipe the whole lmdb, should not have events
if not wipe then
ev = { delta_type, "delete", old_entity, }
end
end -- if delta_entity ~= nil and delta_entity ~= ngx_null

-- wipe the whole lmdb, should not have events
if not wipe then
local is_update = delta_entity ~= nil and delta_entity ~= ngx_null
local operation_name = is_update and "update" or "delete"
local operation = is_update and lmdb_update or lmdb_delete

-- log the operation before executing it, so when failing we know what entity caused it
ngx_log(ngx_DEBUG,
"[kong.sync.v2] ", operation_name, " entity",
", version: ", delta_version,
", type: ", delta_type)

local ev, err = operation(db, t, delta, opts, wipe)
if err then
return nil, err
end

if ev then
crud_events_n = crud_events_n + 1
crud_events[crud_events_n] = ev
end
Expand Down

1 comment on commit 1d0f212

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bazel Build

Docker image available kong/kong-dev:1d0f21232fcacfcfbf6d5cef0a1c1e9129792c91
Artifacts available https://github.com/Kong/kong/actions/runs/12901265055

Please sign in to comment.