Skip to content

Commit

Permalink
PS-9148: Implement dictionary flusher for masking_functions plugin
Browse files Browse the repository at this point in the history
https://perconadev.atlassian.net/browse/PS-9148

- Added component_masking.dictionaries_flush_interval_seconds system
  variable.
- Added actual flusher thread. It periodically rereads content of
  dictionary table and updates in-memory cache.
  • Loading branch information
oleksandr-kachan committed Apr 10, 2024
1 parent 15cfaae commit 58517cf
Show file tree
Hide file tree
Showing 11 changed files with 391 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,28 @@
#ifndef MASKING_FUNCTIONS_QUERY_CACHE_HPP
#define MASKING_FUNCTIONS_QUERY_CACHE_HPP

#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <optional>
#include <string>

#include <my_inttypes.h>
#include <mysql/components/services/psi_thread.h>

#include "masking_functions/dictionary_container.hpp"

namespace masking_functions {

class query_cache {
public:
query_cache();
query_cache(query_cache &other) = delete;
query_cache(query_cache &&other) = delete;
query_cache &operator=(query_cache &other) = delete;
query_cache &operator=(query_cache &&other) = delete;
~query_cache();

bool contains(const std::string &dictionary_name,
const std::string &term) const;
Expand All @@ -35,9 +47,23 @@ class query_cache {
bool insert(const std::string &dictionary_name, const std::string &term);
bool load_cache();

void init_thd() noexcept;
void release_thd() noexcept;
void dict_flusher() noexcept;

private:
mutable std::shared_mutex m_dict_mut;
dictionary_container m_dict_cache;

ulonglong m_flusher_interval_seconds;
std::atomic<bool> m_is_flusher_stopped;
std::mutex m_flusher_mutex;
std::condition_variable m_flusher_condition_var;

PSI_thread_key m_psi_flusher_thread_key;
my_thread_handle m_flusher_thread;
my_thread_attr_t m_flusher_thread_attr;
std::unique_ptr<THD> m_flusher_thd;
};

} // namespace masking_functions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
#ifndef MASKING_FUNCTIONS_SYS_VARS_HPP
#define MASKING_FUNCTIONS_SYS_VARS_HPP

#include "my_inttypes.h"

#include <string_view>

namespace masking_functions::sys_vars {

std::string_view get_dict_database_name() noexcept;
ulonglong get_flush_interval_seconds() noexcept;

bool register_sys_vars();
bool unregister_sys_vars();
Expand Down
5 changes: 5 additions & 0 deletions components/masking_functions/src/component.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <mysql/components/services/mysql_current_thread_reader.h>
#include <mysql/components/services/mysql_runtime_error.h>
#include <mysql/components/services/mysql_string.h>
#include <mysql/components/services/psi_thread.h>
#include <mysql/components/services/security_context.h>
#include <mysql/components/services/udf_metadata.h>
#include <mysql/components/services/udf_registration.h>
Expand Down Expand Up @@ -59,6 +60,8 @@ REQUIRES_SERVICE_PLACEHOLDER(mysql_command_query_result);
REQUIRES_SERVICE_PLACEHOLDER(mysql_command_options);
REQUIRES_SERVICE_PLACEHOLDER(mysql_command_factory);

REQUIRES_PSI_THREAD_SERVICE_PLACEHOLDER;

REQUIRES_SERVICE_PLACEHOLDER(udf_registration);
REQUIRES_SERVICE_PLACEHOLDER(dynamic_privilege_register);

Expand Down Expand Up @@ -192,6 +195,8 @@ BEGIN_COMPONENT_REQUIRES(CURRENT_COMPONENT_NAME)
REQUIRES_SERVICE(mysql_string_substr),
REQUIRES_SERVICE(mysql_string_compare),

REQUIRES_PSI_THREAD_SERVICE,

REQUIRES_SERVICE(mysql_command_query),
REQUIRES_SERVICE(mysql_command_query_result),
REQUIRES_SERVICE(mysql_command_options),
Expand Down
105 changes: 104 additions & 1 deletion components/masking_functions/src/masking_functions/query_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,121 @@
#include "masking_functions/primitive_singleton.hpp"
#include "masking_functions/query_builder.hpp"
#include "masking_functions/sql_context.hpp"
#include "masking_functions/sys_vars.hpp"

#include <mysql/components/services/log_builtins.h>
#include <mysql/psi/mysql_thread.h>
#include <mysqld_error.h>
#include <sql/debug_sync.h>
#include <sql/sql_class.h>

#include <chrono>
#include <string_view>

extern REQUIRES_SERVICE_PLACEHOLDER(log_builtins);

namespace masking_functions {
namespace {

constexpr std::string_view psi_category_name{"masking_functions"};
constexpr std::string_view flusher_thd_psi_name{
"masking_functions_dict_flusher"};
constexpr std::string_view flusher_thd_psi_os_name{"mf_flusher"};

using global_command_services = masking_functions::primitive_singleton<
masking_functions::command_service_tuple>;
using global_query_builder =
masking_functions::primitive_singleton<masking_functions::query_builder>;

void *run_dict_flusher(void *arg) {
auto *self = reinterpret_cast<masking_functions::query_cache *>(arg);
self->init_thd();
self->dict_flusher();
self->release_thd();
return nullptr;
}

} // namespace

query_cache::query_cache() { load_cache(); }
query_cache::query_cache()
: m_flusher_interval_seconds{sys_vars::get_flush_interval_seconds()},
m_is_flusher_stopped{true} {
load_cache();

if (m_flusher_interval_seconds > 0) {
PSI_thread_info thread_info{&m_psi_flusher_thread_key,
flusher_thd_psi_name.data(),
flusher_thd_psi_os_name.data(),
PSI_FLAG_SINGLETON,
0,
PSI_DOCUMENT_ME};
mysql_thread_register(psi_category_name.data(), &thread_info, 1);

const auto res =
mysql_thread_create(m_psi_flusher_thread_key, &m_flusher_thread,
&m_flusher_thread_attr, run_dict_flusher, this);

if (res != 0) {
LogComponentErr(INFORMATION_LEVEL, ER_LOG_PRINTF_MSG,
"Cannot initialize dictionary flusher");
} else {
m_is_flusher_stopped = false;
}
}
}

query_cache::~query_cache() {
if (!m_is_flusher_stopped) {
m_is_flusher_stopped = true;
m_flusher_condition_var.notify_one();
}
}

void query_cache::init_thd() noexcept {
auto *thd = new THD;
my_thread_init();
thd->set_new_thread_id();
thd->thread_stack = reinterpret_cast<char *>(&thd);
thd->store_globals();
m_flusher_thd.reset(thd);
}

void query_cache::release_thd() noexcept { my_thread_end(); }

void query_cache::dict_flusher() noexcept {
#ifdef HAVE_PSI_THREAD_INTERFACE
{
struct PSI_thread *psi = m_flusher_thd->get_psi();
PSI_THREAD_CALL(set_thread_id)(psi, m_flusher_thd->thread_id());
PSI_THREAD_CALL(set_thread_THD)(psi, m_flusher_thd.get());
PSI_THREAD_CALL(set_thread_command)(m_flusher_thd->get_command());
PSI_THREAD_CALL(set_thread_info)
(STRING_WITH_LEN("Masking functions component cache flusher"));
}
#endif

while (!m_is_flusher_stopped) {
std::unique_lock lock{m_flusher_mutex};
const auto wait_started_at = std::chrono::system_clock::now();
m_flusher_condition_var.wait_for(
lock, std::chrono::seconds{m_flusher_interval_seconds},
[this, wait_started_at] {
return std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now() - wait_started_at) >=
std::chrono::seconds{m_flusher_interval_seconds} ||
m_is_flusher_stopped.load();
});

if (!m_is_flusher_stopped) {
load_cache();

DBUG_EXECUTE_IF("masking_functions_signal_on_cache_reload", {
const char act[] = "now SIGNAL masking_functions_cache_reload_done";
assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
};);
}
}
}

bool query_cache::load_cache() {
auto query = global_query_builder::instance().select_all_from_dictionary();
Expand Down
34 changes: 34 additions & 0 deletions components/masking_functions/src/masking_functions/sys_vars.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <mysqld_error.h>

#include <climits>
#include <cstring>
#include <string>

Expand All @@ -32,20 +33,30 @@ namespace masking_functions::sys_vars {
namespace {

using str_arg_check_type = STR_CHECK_ARG(str);
using ulonglong_arg_check_type = INTEGRAL_CHECK_ARG(ulonglong);

constexpr std::string_view component_name{"masking_functions"};
constexpr std::string_view masking_database_var_name{"masking_database"};
constexpr std::string_view flush_interval_var_name{
"dictionaries_flush_interval_seconds"};

std::string default_database_name{"mysql"};
const ulonglong default_flush_interval_seconds = 0;

bool is_database_name_initialised = false;
bool is_flush_interval_initialised = false;

char *database_name;
ulonglong flush_interval_seconds = 0;

} // namespace

std::string_view get_dict_database_name() noexcept { return database_name; }

ulonglong get_flush_interval_seconds() noexcept {
return flush_interval_seconds;
}

bool register_sys_vars() {
str_arg_check_type check_db_name{default_database_name.data()};

Expand All @@ -61,6 +72,23 @@ bool register_sys_vars() {
}
is_database_name_initialised = true;

ulonglong_arg_check_type check_flush_interval{default_flush_interval_seconds,
0, ULLONG_MAX, 1};

if (mysql_service_component_sys_variable_register->register_variable(
component_name.data(), flush_interval_var_name.data(),
PLUGIN_VAR_LONGLONG | PLUGIN_VAR_UNSIGNED | PLUGIN_VAR_RQCMDARG |
PLUGIN_VAR_READONLY,
"Sets the interval, in seconds, to wait before attempting to "
"schedule another flush of the data masking dictionaries table to "
"the memory data masking dictionaries cache following a restart or "
"previous execution.",
nullptr, nullptr, static_cast<void *>(&check_flush_interval),
static_cast<void *>(&flush_interval_seconds)) != 0) {
return false;
}
is_flush_interval_initialised = true;

return true;
}

Expand All @@ -73,6 +101,12 @@ bool unregister_sys_vars() {
is_success = false;
}

if (is_flush_interval_initialised &&
mysql_service_component_sys_variable_unregister->unregister_variable(
component_name.data(), flush_interval_var_name.data()) != 0) {
is_success = false;
}

return is_success;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
include/master-slave.inc
Warnings:
Note #### Sending passwords in plain text without SSL/TLS is extremely insecure.
Note #### Storing MySQL user name or password information in the connection metadata repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START REPLICA; see the 'START REPLICA Syntax' in the MySQL Manual for more information.
[connection master]
[connection master]
SET GLOBAL DEBUG='+d, masking_functions_signal_on_cache_reload';
INSTALL COMPONENT 'file://component_masking_functions';
[connection slave]
SET GLOBAL DEBUG='+d, masking_functions_signal_on_cache_reload';
INSTALL COMPONENT 'file://component_masking_functions';
[connection master]
CREATE TABLE mysql.masking_dictionaries(
Dictionary VARCHAR(256) NOT NULL,
Term VARCHAR(256) NOT NULL,
UNIQUE INDEX dictionary_term_idx (Dictionary, Term)
) ENGINE = InnoDB DEFAULT CHARSET=utf8mb4;
CREATE USER udftest_priv@localhost;
GRANT MASKING_DICTIONARIES_ADMIN ON *.* TO udftest_priv@localhost;
SELECT masking_dictionary_term_add('single_dict_1', 'entry_1');
masking_dictionary_term_add('single_dict_1', 'entry_1')
1
SELECT masking_dictionary_term_add('single_dict_2', 'entry_2');
masking_dictionary_term_add('single_dict_2', 'entry_2')
1
SELECT gen_dictionary('single_dict_1');
gen_dictionary('single_dict_1')
entry_1
SELECT gen_dictionary('single_dict_2');
gen_dictionary('single_dict_2')
entry_2
include/rpl_sync.inc
[connection slave]
SELECT * FROM mysql.masking_dictionaries;
Dictionary Term
single_dict_1 entry_1
single_dict_2 entry_2
SELECT gen_dictionary('single_dict_1');
gen_dictionary('single_dict_1')
entry_1
SELECT gen_dictionary('single_dict_2');
gen_dictionary('single_dict_2')
entry_2
[connection master]
INSERT INTO mysql.masking_dictionaries VALUES ('single_dict_3', 'entry_3');
SET DEBUG_SYNC='now WAIT_FOR masking_functions_cache_reload_done';
SELECT gen_dictionary('single_dict_3');
gen_dictionary('single_dict_3')
entry_3
include/rpl_sync.inc
[connection slave]
SELECT * FROM mysql.masking_dictionaries;
Dictionary Term
single_dict_1 entry_1
single_dict_2 entry_2
single_dict_3 entry_3
SET DEBUG_SYNC='now WAIT_FOR masking_functions_cache_reload_done';
SELECT gen_dictionary('single_dict_3');
gen_dictionary('single_dict_3')
entry_3
[connection slave]
SET GLOBAL DEBUG='-d, masking_functions_signal_on_cache_reload';
UNINSTALL COMPONENT 'file://component_masking_functions';
[connection master]
SET GLOBAL DEBUG='-d, masking_functions_signal_on_cache_reload';
UNINSTALL COMPONENT 'file://component_masking_functions';
DROP USER udftest_priv@localhost;
DROP TABLE mysql.masking_dictionaries;
include/rpl_sync.inc
include/rpl_end.inc
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
INSTALL COMPONENT 'file://component_masking_functions';
SELECT @@global.masking_functions.dictionaries_flush_interval_seconds;
@@global.masking_functions.dictionaries_flush_interval_seconds
0
SELECT NAME FROM performance_schema.threads WHERE NAME LIKE "%masking_functions%";
NAME
SET GLOBAL masking_functions.dictionaries_flush_interval_seconds=100;
ERROR HY000: Variable 'masking_functions.dictionaries_flush_interval_seconds' is a read only variable
SET SESSION masking_functions.dictionaries_flush_interval_seconds=100;
ERROR HY000: Variable 'masking_functions.dictionaries_flush_interval_seconds' is a read only variable
# restart: --masking-functions.dictionaries-flush-interval-seconds=100
SELECT @@global.masking_functions.dictionaries_flush_interval_seconds;
@@global.masking_functions.dictionaries_flush_interval_seconds
100
CREATE TABLE mysql.masking_dictionaries(
Dictionary VARCHAR(256) NOT NULL,
Term VARCHAR(256) NOT NULL,
UNIQUE INDEX dictionary_term_idx (Dictionary, Term)
) ENGINE = InnoDB DEFAULT CHARSET=utf8mb4;
CREATE USER udftest_priv@localhost;
GRANT MASKING_DICTIONARIES_ADMIN ON *.* TO udftest_priv@localhost;
SELECT masking_dictionary_term_add('single_dict_1', 'entry_1');
masking_dictionary_term_add('single_dict_1', 'entry_1')
1
SELECT NAME FROM performance_schema.threads WHERE NAME LIKE "%masking_functions%";
NAME
thread/masking_functions/masking_functions_dict_flusher
UNINSTALL COMPONENT 'file://component_masking_functions';
DROP USER udftest_priv@localhost;
DROP TABLE mysql.masking_dictionaries;
# restart:
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
$MASKING_FUNCTIONS_COMPONENT_OPT
--loose-masking_functions.dictionaries_flush_interval_seconds=1
Loading

0 comments on commit 58517cf

Please sign in to comment.