diff --git a/components/masking_functions/include/masking_functions/query_cache.hpp b/components/masking_functions/include/masking_functions/query_cache.hpp index 592d812f6de1..809a6db8578b 100644 --- a/components/masking_functions/include/masking_functions/query_cache.hpp +++ b/components/masking_functions/include/masking_functions/query_cache.hpp @@ -16,9 +16,16 @@ #ifndef MASKING_FUNCTIONS_QUERY_CACHE_HPP #define MASKING_FUNCTIONS_QUERY_CACHE_HPP +#include +#include +#include +#include #include #include +#include +#include + #include "masking_functions/dictionary_container.hpp" namespace masking_functions { @@ -26,6 +33,11 @@ namespace masking_functions { class query_cache { public: query_cache(); + query_cache(query_cache &other) = delete; + query_cache(query_cache &&other) = delete; + query_cache &operator=(query_cache &other) = delete; + query_cache &operator=(query_cache &&other) = delete; + ~query_cache(); bool contains(const std::string &dictionary_name, const std::string &term) const; @@ -35,9 +47,23 @@ class query_cache { bool insert(const std::string &dictionary_name, const std::string &term); bool load_cache(); + void init_thd() noexcept; + void release_thd() noexcept; + void dict_flusher() noexcept; + private: mutable std::shared_mutex m_dict_mut; dictionary_container m_dict_cache; + + ulonglong m_flusher_interval_seconds; + std::atomic m_is_flusher_stopped; + std::mutex m_flusher_mutex; + std::condition_variable m_flusher_condition_var; + + PSI_thread_key m_psi_flusher_thread_key; + my_thread_handle m_flusher_thread; + my_thread_attr_t m_flusher_thread_attr; + std::unique_ptr m_flusher_thd; }; } // namespace masking_functions diff --git a/components/masking_functions/include/masking_functions/sys_vars.hpp b/components/masking_functions/include/masking_functions/sys_vars.hpp index 45a80992ef2f..599773df3816 100644 --- a/components/masking_functions/include/masking_functions/sys_vars.hpp +++ b/components/masking_functions/include/masking_functions/sys_vars.hpp @@ -16,11 +16,14 @@ #ifndef MASKING_FUNCTIONS_SYS_VARS_HPP #define MASKING_FUNCTIONS_SYS_VARS_HPP +#include "my_inttypes.h" + #include namespace masking_functions::sys_vars { std::string_view get_dict_database_name() noexcept; +ulonglong get_flush_interval_seconds() noexcept; bool register_sys_vars(); bool unregister_sys_vars(); diff --git a/components/masking_functions/src/component.cpp b/components/masking_functions/src/component.cpp index d37e24084934..d83b85de5202 100644 --- a/components/masking_functions/src/component.cpp +++ b/components/masking_functions/src/component.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -59,6 +60,8 @@ REQUIRES_SERVICE_PLACEHOLDER(mysql_command_query_result); REQUIRES_SERVICE_PLACEHOLDER(mysql_command_options); REQUIRES_SERVICE_PLACEHOLDER(mysql_command_factory); +REQUIRES_PSI_THREAD_SERVICE_PLACEHOLDER; + REQUIRES_SERVICE_PLACEHOLDER(udf_registration); REQUIRES_SERVICE_PLACEHOLDER(dynamic_privilege_register); @@ -192,6 +195,8 @@ BEGIN_COMPONENT_REQUIRES(CURRENT_COMPONENT_NAME) REQUIRES_SERVICE(mysql_string_substr), REQUIRES_SERVICE(mysql_string_compare), + REQUIRES_PSI_THREAD_SERVICE, + REQUIRES_SERVICE(mysql_command_query), REQUIRES_SERVICE(mysql_command_query_result), REQUIRES_SERVICE(mysql_command_options), diff --git a/components/masking_functions/src/masking_functions/query_cache.cpp b/components/masking_functions/src/masking_functions/query_cache.cpp index 7107c728b265..a19219e9201e 100644 --- a/components/masking_functions/src/masking_functions/query_cache.cpp +++ b/components/masking_functions/src/masking_functions/query_cache.cpp @@ -19,18 +19,121 @@ #include "masking_functions/primitive_singleton.hpp" #include "masking_functions/query_builder.hpp" #include "masking_functions/sql_context.hpp" +#include "masking_functions/sys_vars.hpp" + +#include +#include +#include +#include +#include + +#include +#include + +extern REQUIRES_SERVICE_PLACEHOLDER(log_builtins); namespace masking_functions { namespace { +constexpr std::string_view psi_category_name{"masking_functions"}; +constexpr std::string_view flusher_thd_psi_name{ + "masking_functions_dict_flusher"}; +constexpr std::string_view flusher_thd_psi_os_name{"mf_flusher"}; + using global_command_services = masking_functions::primitive_singleton< masking_functions::command_service_tuple>; using global_query_builder = masking_functions::primitive_singleton; +void *run_dict_flusher(void *arg) { + auto *self = reinterpret_cast(arg); + self->init_thd(); + self->dict_flusher(); + self->release_thd(); + return nullptr; +} + } // namespace -query_cache::query_cache() { load_cache(); } +query_cache::query_cache() + : m_flusher_interval_seconds{sys_vars::get_flush_interval_seconds()}, + m_is_flusher_stopped{true} { + load_cache(); + + if (m_flusher_interval_seconds > 0) { + PSI_thread_info thread_info{&m_psi_flusher_thread_key, + flusher_thd_psi_name.data(), + flusher_thd_psi_os_name.data(), + PSI_FLAG_SINGLETON, + 0, + PSI_DOCUMENT_ME}; + mysql_thread_register(psi_category_name.data(), &thread_info, 1); + + const auto res = + mysql_thread_create(m_psi_flusher_thread_key, &m_flusher_thread, + &m_flusher_thread_attr, run_dict_flusher, this); + + if (res != 0) { + LogComponentErr(INFORMATION_LEVEL, ER_LOG_PRINTF_MSG, + "Cannot initialize dictionary flusher"); + } else { + m_is_flusher_stopped = false; + } + } +} + +query_cache::~query_cache() { + if (!m_is_flusher_stopped) { + m_is_flusher_stopped = true; + m_flusher_condition_var.notify_one(); + } +} + +void query_cache::init_thd() noexcept { + auto *thd = new THD; + my_thread_init(); + thd->set_new_thread_id(); + thd->thread_stack = reinterpret_cast(&thd); + thd->store_globals(); + m_flusher_thd.reset(thd); +} + +void query_cache::release_thd() noexcept { my_thread_end(); } + +void query_cache::dict_flusher() noexcept { +#ifdef HAVE_PSI_THREAD_INTERFACE + { + struct PSI_thread *psi = m_flusher_thd->get_psi(); + PSI_THREAD_CALL(set_thread_id)(psi, m_flusher_thd->thread_id()); + PSI_THREAD_CALL(set_thread_THD)(psi, m_flusher_thd.get()); + PSI_THREAD_CALL(set_thread_command)(m_flusher_thd->get_command()); + PSI_THREAD_CALL(set_thread_info) + (STRING_WITH_LEN("Masking functions component cache flusher")); + } +#endif + + while (!m_is_flusher_stopped) { + std::unique_lock lock{m_flusher_mutex}; + const auto wait_started_at = std::chrono::system_clock::now(); + m_flusher_condition_var.wait_for( + lock, std::chrono::seconds{m_flusher_interval_seconds}, + [this, wait_started_at] { + return std::chrono::duration_cast( + std::chrono::system_clock::now() - wait_started_at) >= + std::chrono::seconds{m_flusher_interval_seconds} || + m_is_flusher_stopped.load(); + }); + + if (!m_is_flusher_stopped) { + load_cache(); + + DBUG_EXECUTE_IF("masking_functions_signal_on_cache_reload", { + const char act[] = "now SIGNAL masking_functions_cache_reload_done"; + assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act))); + };); + } + } +} bool query_cache::load_cache() { auto query = global_query_builder::instance().select_all_from_dictionary(); diff --git a/components/masking_functions/src/masking_functions/sys_vars.cpp b/components/masking_functions/src/masking_functions/sys_vars.cpp index e8a44a7dbb58..3b4a02d3f313 100644 --- a/components/masking_functions/src/masking_functions/sys_vars.cpp +++ b/components/masking_functions/src/masking_functions/sys_vars.cpp @@ -21,6 +21,7 @@ #include +#include #include #include @@ -32,20 +33,30 @@ namespace masking_functions::sys_vars { namespace { using str_arg_check_type = STR_CHECK_ARG(str); +using ulonglong_arg_check_type = INTEGRAL_CHECK_ARG(ulonglong); constexpr std::string_view component_name{"masking_functions"}; constexpr std::string_view masking_database_var_name{"masking_database"}; +constexpr std::string_view flush_interval_var_name{ + "dictionaries_flush_interval_seconds"}; std::string default_database_name{"mysql"}; +const ulonglong default_flush_interval_seconds = 0; bool is_database_name_initialised = false; +bool is_flush_interval_initialised = false; char *database_name; +ulonglong flush_interval_seconds = 0; } // namespace std::string_view get_dict_database_name() noexcept { return database_name; } +ulonglong get_flush_interval_seconds() noexcept { + return flush_interval_seconds; +} + bool register_sys_vars() { str_arg_check_type check_db_name{default_database_name.data()}; @@ -61,6 +72,23 @@ bool register_sys_vars() { } is_database_name_initialised = true; + ulonglong_arg_check_type check_flush_interval{default_flush_interval_seconds, + 0, ULLONG_MAX, 1}; + + if (mysql_service_component_sys_variable_register->register_variable( + component_name.data(), flush_interval_var_name.data(), + PLUGIN_VAR_LONGLONG | PLUGIN_VAR_UNSIGNED | PLUGIN_VAR_RQCMDARG | + PLUGIN_VAR_READONLY, + "Sets the interval, in seconds, to wait before attempting to " + "schedule another flush of the data masking dictionaries table to " + "the memory data masking dictionaries cache following a restart or " + "previous execution.", + nullptr, nullptr, static_cast(&check_flush_interval), + static_cast(&flush_interval_seconds)) != 0) { + return false; + } + is_flush_interval_initialised = true; + return true; } @@ -73,6 +101,12 @@ bool unregister_sys_vars() { is_success = false; } + if (is_flush_interval_initialised && + mysql_service_component_sys_variable_unregister->unregister_variable( + component_name.data(), flush_interval_var_name.data()) != 0) { + is_success = false; + } + return is_success; } diff --git a/mysql-test/suite/component_masking_functions/r/rpl_dictionaries_flush_interval.result b/mysql-test/suite/component_masking_functions/r/rpl_dictionaries_flush_interval.result new file mode 100644 index 000000000000..1961728b739f --- /dev/null +++ b/mysql-test/suite/component_masking_functions/r/rpl_dictionaries_flush_interval.result @@ -0,0 +1,70 @@ +include/master-slave.inc +Warnings: +Note #### Sending passwords in plain text without SSL/TLS is extremely insecure. +Note #### Storing MySQL user name or password information in the connection metadata repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START REPLICA; see the 'START REPLICA Syntax' in the MySQL Manual for more information. +[connection master] +[connection master] +SET GLOBAL DEBUG='+d, masking_functions_signal_on_cache_reload'; +INSTALL COMPONENT 'file://component_masking_functions'; +[connection slave] +SET GLOBAL DEBUG='+d, masking_functions_signal_on_cache_reload'; +INSTALL COMPONENT 'file://component_masking_functions'; +[connection master] +CREATE TABLE mysql.masking_dictionaries( +Dictionary VARCHAR(256) NOT NULL, +Term VARCHAR(256) NOT NULL, +UNIQUE INDEX dictionary_term_idx (Dictionary, Term) +) ENGINE = InnoDB DEFAULT CHARSET=utf8mb4; +CREATE USER udftest_priv@localhost; +GRANT MASKING_DICTIONARIES_ADMIN ON *.* TO udftest_priv@localhost; +SELECT masking_dictionary_term_add('single_dict_1', 'entry_1'); +masking_dictionary_term_add('single_dict_1', 'entry_1') +1 +SELECT masking_dictionary_term_add('single_dict_2', 'entry_2'); +masking_dictionary_term_add('single_dict_2', 'entry_2') +1 +SELECT gen_dictionary('single_dict_1'); +gen_dictionary('single_dict_1') +entry_1 +SELECT gen_dictionary('single_dict_2'); +gen_dictionary('single_dict_2') +entry_2 +include/rpl_sync.inc +[connection slave] +SELECT * FROM mysql.masking_dictionaries; +Dictionary Term +single_dict_1 entry_1 +single_dict_2 entry_2 +SELECT gen_dictionary('single_dict_1'); +gen_dictionary('single_dict_1') +entry_1 +SELECT gen_dictionary('single_dict_2'); +gen_dictionary('single_dict_2') +entry_2 +[connection master] +INSERT INTO mysql.masking_dictionaries VALUES ('single_dict_3', 'entry_3'); +SET DEBUG_SYNC='now WAIT_FOR masking_functions_cache_reload_done'; +SELECT gen_dictionary('single_dict_3'); +gen_dictionary('single_dict_3') +entry_3 +include/rpl_sync.inc +[connection slave] +SELECT * FROM mysql.masking_dictionaries; +Dictionary Term +single_dict_1 entry_1 +single_dict_2 entry_2 +single_dict_3 entry_3 +SET DEBUG_SYNC='now WAIT_FOR masking_functions_cache_reload_done'; +SELECT gen_dictionary('single_dict_3'); +gen_dictionary('single_dict_3') +entry_3 +[connection slave] +SET GLOBAL DEBUG='-d, masking_functions_signal_on_cache_reload'; +UNINSTALL COMPONENT 'file://component_masking_functions'; +[connection master] +SET GLOBAL DEBUG='-d, masking_functions_signal_on_cache_reload'; +UNINSTALL COMPONENT 'file://component_masking_functions'; +DROP USER udftest_priv@localhost; +DROP TABLE mysql.masking_dictionaries; +include/rpl_sync.inc +include/rpl_end.inc diff --git a/mysql-test/suite/component_masking_functions/r/sys_var_dictionaries_flush_interval_seconds_basic.result b/mysql-test/suite/component_masking_functions/r/sys_var_dictionaries_flush_interval_seconds_basic.result new file mode 100644 index 000000000000..415f35602080 --- /dev/null +++ b/mysql-test/suite/component_masking_functions/r/sys_var_dictionaries_flush_interval_seconds_basic.result @@ -0,0 +1,31 @@ +INSTALL COMPONENT 'file://component_masking_functions'; +SELECT @@global.masking_functions.dictionaries_flush_interval_seconds; +@@global.masking_functions.dictionaries_flush_interval_seconds +0 +SELECT NAME FROM performance_schema.threads WHERE NAME LIKE "%masking_functions%"; +NAME +SET GLOBAL masking_functions.dictionaries_flush_interval_seconds=100; +ERROR HY000: Variable 'masking_functions.dictionaries_flush_interval_seconds' is a read only variable +SET SESSION masking_functions.dictionaries_flush_interval_seconds=100; +ERROR HY000: Variable 'masking_functions.dictionaries_flush_interval_seconds' is a read only variable +# restart: --masking-functions.dictionaries-flush-interval-seconds=100 +SELECT @@global.masking_functions.dictionaries_flush_interval_seconds; +@@global.masking_functions.dictionaries_flush_interval_seconds +100 +CREATE TABLE mysql.masking_dictionaries( +Dictionary VARCHAR(256) NOT NULL, +Term VARCHAR(256) NOT NULL, +UNIQUE INDEX dictionary_term_idx (Dictionary, Term) +) ENGINE = InnoDB DEFAULT CHARSET=utf8mb4; +CREATE USER udftest_priv@localhost; +GRANT MASKING_DICTIONARIES_ADMIN ON *.* TO udftest_priv@localhost; +SELECT masking_dictionary_term_add('single_dict_1', 'entry_1'); +masking_dictionary_term_add('single_dict_1', 'entry_1') +1 +SELECT NAME FROM performance_schema.threads WHERE NAME LIKE "%masking_functions%"; +NAME +thread/masking_functions/masking_functions_dict_flusher +UNINSTALL COMPONENT 'file://component_masking_functions'; +DROP USER udftest_priv@localhost; +DROP TABLE mysql.masking_dictionaries; +# restart: diff --git a/mysql-test/suite/component_masking_functions/t/rpl_dictionaries_flush_interval-master.opt b/mysql-test/suite/component_masking_functions/t/rpl_dictionaries_flush_interval-master.opt new file mode 100644 index 000000000000..414e47ef180a --- /dev/null +++ b/mysql-test/suite/component_masking_functions/t/rpl_dictionaries_flush_interval-master.opt @@ -0,0 +1,2 @@ +$MASKING_FUNCTIONS_COMPONENT_OPT +--loose-masking_functions.dictionaries_flush_interval_seconds=1 diff --git a/mysql-test/suite/component_masking_functions/t/rpl_dictionaries_flush_interval-slave.opt b/mysql-test/suite/component_masking_functions/t/rpl_dictionaries_flush_interval-slave.opt new file mode 100644 index 000000000000..414e47ef180a --- /dev/null +++ b/mysql-test/suite/component_masking_functions/t/rpl_dictionaries_flush_interval-slave.opt @@ -0,0 +1,2 @@ +$MASKING_FUNCTIONS_COMPONENT_OPT +--loose-masking_functions.dictionaries_flush_interval_seconds=1 diff --git a/mysql-test/suite/component_masking_functions/t/rpl_dictionaries_flush_interval.test b/mysql-test/suite/component_masking_functions/t/rpl_dictionaries_flush_interval.test new file mode 100644 index 000000000000..4449423814d4 --- /dev/null +++ b/mysql-test/suite/component_masking_functions/t/rpl_dictionaries_flush_interval.test @@ -0,0 +1,67 @@ +--source include/have_debug.inc +--source include/have_debug_sync.inc +--source include/have_masking_functions_component.inc +--source include/master-slave.inc + +--source include/rpl_connection_master.inc +SET GLOBAL DEBUG='+d, masking_functions_signal_on_cache_reload'; +INSTALL COMPONENT 'file://component_masking_functions'; +--source include/rpl_connection_slave.inc +SET GLOBAL DEBUG='+d, masking_functions_signal_on_cache_reload'; +INSTALL COMPONENT 'file://component_masking_functions'; + +--source include/rpl_connection_master.inc +CREATE TABLE mysql.masking_dictionaries( + Dictionary VARCHAR(256) NOT NULL, + Term VARCHAR(256) NOT NULL, + UNIQUE INDEX dictionary_term_idx (Dictionary, Term) +) ENGINE = InnoDB DEFAULT CHARSET=utf8mb4; + +CREATE USER udftest_priv@localhost; +GRANT MASKING_DICTIONARIES_ADMIN ON *.* TO udftest_priv@localhost; +--connect(con_priv,localhost,udftest_priv,,) + +SELECT masking_dictionary_term_add('single_dict_1', 'entry_1'); +SELECT masking_dictionary_term_add('single_dict_2', 'entry_2'); +SELECT gen_dictionary('single_dict_1'); +SELECT gen_dictionary('single_dict_2'); + +--source include/rpl_sync.inc +--source include/rpl_connection_slave.inc + +SELECT * FROM mysql.masking_dictionaries; +SELECT gen_dictionary('single_dict_1'); +SELECT gen_dictionary('single_dict_2'); + +--source include/rpl_connection_master.inc +INSERT INTO mysql.masking_dictionaries VALUES ('single_dict_3', 'entry_3'); +SET DEBUG_SYNC='now WAIT_FOR masking_functions_cache_reload_done'; + +# Will fail to get data from single_dict_3 at this point if no dictionary flusher thread is running +SELECT gen_dictionary('single_dict_3'); + +--source include/rpl_sync.inc +--source include/rpl_connection_slave.inc + +SELECT * FROM mysql.masking_dictionaries; +SET DEBUG_SYNC='now WAIT_FOR masking_functions_cache_reload_done'; + +# Will fail to get data from single_dict_3 at this point if no dictionary flusher thread is running +SELECT gen_dictionary('single_dict_3'); + +# +# Cleanup +--disconnect con_priv + +--source include/rpl_connection_slave.inc +SET GLOBAL DEBUG='-d, masking_functions_signal_on_cache_reload'; +UNINSTALL COMPONENT 'file://component_masking_functions'; +--source include/rpl_connection_master.inc +SET GLOBAL DEBUG='-d, masking_functions_signal_on_cache_reload'; +UNINSTALL COMPONENT 'file://component_masking_functions'; + +DROP USER udftest_priv@localhost; +DROP TABLE mysql.masking_dictionaries; + +--source include/rpl_sync.inc +--source include/rpl_end.inc diff --git a/mysql-test/suite/component_masking_functions/t/sys_var_dictionaries_flush_interval_seconds_basic.test b/mysql-test/suite/component_masking_functions/t/sys_var_dictionaries_flush_interval_seconds_basic.test new file mode 100644 index 000000000000..cfb8a75a0359 --- /dev/null +++ b/mysql-test/suite/component_masking_functions/t/sys_var_dictionaries_flush_interval_seconds_basic.test @@ -0,0 +1,47 @@ +--source include/have_masking_functions_component.inc + +INSTALL COMPONENT 'file://component_masking_functions'; + +# No running flusher thread with default settings +SELECT @@global.masking_functions.dictionaries_flush_interval_seconds; +SELECT NAME FROM performance_schema.threads WHERE NAME LIKE "%masking_functions%"; + +--error ER_INCORRECT_GLOBAL_LOCAL_VAR +SET GLOBAL masking_functions.dictionaries_flush_interval_seconds=100; + +--error ER_INCORRECT_GLOBAL_LOCAL_VAR +SET SESSION masking_functions.dictionaries_flush_interval_seconds=100; + +--let $restart_parameters="restart: --masking-functions.dictionaries-flush-interval-seconds=100" +--source include/restart_mysqld.inc + +SELECT @@global.masking_functions.dictionaries_flush_interval_seconds; + +# Make sure dict flusher process is running +CREATE TABLE mysql.masking_dictionaries( + Dictionary VARCHAR(256) NOT NULL, + Term VARCHAR(256) NOT NULL, + UNIQUE INDEX dictionary_term_idx (Dictionary, Term) +) ENGINE = InnoDB DEFAULT CHARSET=utf8mb4; + +CREATE USER udftest_priv@localhost; +GRANT MASKING_DICTIONARIES_ADMIN ON *.* TO udftest_priv@localhost; +--connect(con_priv,localhost,udftest_priv,,) + +SELECT masking_dictionary_term_add('single_dict_1', 'entry_1'); + +# Flusher thread is active +--connection default +SELECT NAME FROM performance_schema.threads WHERE NAME LIKE "%masking_functions%"; + +# +# Cleanup +--disconnect con_priv + +UNINSTALL COMPONENT 'file://component_masking_functions'; + +DROP USER udftest_priv@localhost; +DROP TABLE mysql.masking_dictionaries; + +--let $restart_parameters="restart:" +--source include/restart_mysqld.inc