-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feature: move core_demormalization into gem
- Loading branch information
1 parent
8e1e11c
commit 5d625a2
Showing
82 changed files
with
5,592 additions
and
87 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,4 +15,4 @@ mkmf.log | |
/.rvmrc | ||
/.idea/ | ||
gemfiles/ | ||
internal/log | ||
spec/internal/log |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,4 +2,5 @@ | |
--tty | ||
--format progress | ||
--order random | ||
--backtrace | ||
--backtrace | ||
--require ./spec/spec_helper |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
module Treasury | ||
class BaseJob < Treasury::BgExecutor::Job::Indicated | ||
acts_as_no_cancel | ||
acts_as_critical notify_email: Treasury.configuration.job_error_notifications | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
# coding: utf-8 | ||
|
||
module Treasury | ||
class DelayedIncrementJob | ||
include Resque::Integration | ||
|
||
queue :base | ||
retrys | ||
|
||
# Public: Отложенный инкремент поля | ||
# | ||
# params - Hash: | ||
# 'object' - Integer идентификатор | ||
# 'field_name' - String название поля | ||
# 'field_class' - String класс поля | ||
# 'by' - Integer приращение | ||
# | ||
# Returns nothing | ||
def self.perform(params) | ||
object = params.fetch('object') | ||
field_name = params.fetch('field_name') | ||
increment = params.fetch('by') | ||
|
||
field = Treasury::Fields::Base.create_by_class(params.fetch('field_class')) | ||
new_value = field.raw_value(object, field_name).to_i + increment | ||
|
||
field.write_data({object => {field_name => new_value}}, true) | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
module Treasury | ||
class InitializeFieldJob < BaseJob | ||
acts_as_singleton [:field_class] | ||
|
||
def execute | ||
field = Treasury::Fields::Base.create_by_class(params[:field_class]) | ||
field.initialize! | ||
end | ||
|
||
def title | ||
"#{self.class.name.underscore.gsub('_job', '')}: #{params[:field_class]}" | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
module Treasury | ||
class SupervisorJob < BaseJob | ||
acts_as_singleton | ||
|
||
def execute | ||
Treasury::Supervisor.run | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
module Treasury | ||
class WorkerJob < BaseJob | ||
acts_as_singleton [:worker_id] | ||
|
||
def execute | ||
Worker.run(params[:worker_id]) | ||
end | ||
|
||
def title | ||
"#{self.class.name.underscore.gsub('_job', '')}:#{worker.name}" | ||
end | ||
|
||
protected | ||
|
||
def worker | ||
@worker ||= Treasury::Models::Worker.find(params[:worker_id]) | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
# coding: utf-8 | ||
module Treasury | ||
module Models | ||
class EventsLog < ActiveRecord::Base | ||
self.table_name = 'denormalization.events_log' | ||
|
||
def self.clear(date) | ||
delete_all(['processed_at::date = ?', date]) | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
# coding: utf-8 | ||
module Treasury | ||
module Models | ||
class Field < ActiveRecord::Base | ||
self.table_name = 'denormalization.fields' | ||
self.primary_key = 'id' | ||
|
||
has_many :processors, | ||
class_name: 'Treasury::Models::Processor', | ||
dependent: :destroy, | ||
order: 'processors.oid', | ||
inverse_of: :field | ||
|
||
belongs_to :worker, class_name: 'Treasury::Models::Worker' | ||
|
||
scope :active, -> { where(active: true) } | ||
scope :ordered, -> { order(arel_table[:oid]) } | ||
scope :initialized, -> { where(state: Fields::STATE_INITIALIZED) } | ||
scope :in_initialize, -> { where(state: Fields::STATE_IN_INITIALIZE) } | ||
scope :for_initialize_or_in_initialize, -> do | ||
where(state: [Fields::STATE_NEED_INITIALIZE, Fields::STATE_IN_INITIALIZE]) | ||
end | ||
scope :for_processing, -> { active.initialized.ordered } | ||
|
||
serialize :params, Hash | ||
serialize :storage, Array | ||
|
||
def need_initialize! | ||
update_attribute(:state, Fields::STATE_NEED_INITIALIZE) | ||
end | ||
|
||
def need_initialize? | ||
state == Fields::STATE_NEED_INITIALIZE | ||
end | ||
|
||
def suspend | ||
update_attribute(:active, false) | ||
end | ||
|
||
def resume | ||
update_attribute(:active, true) | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
# coding: utf-8 | ||
module Treasury | ||
module Models | ||
class Processor < ActiveRecord::Base | ||
self.table_name = 'denormalization.processors' | ||
self.primary_key = 'id' | ||
|
||
belongs_to :queue, class_name: 'Treasury::Models::Queue', inverse_of: :processors | ||
belongs_to :field, class_name: 'Treasury::Models::Field', inverse_of: :processors | ||
|
||
before_destroy :unregister_consumer | ||
|
||
serialize :params, Hash | ||
|
||
require 'treasury/pgq' | ||
|
||
def subscribe! | ||
unregister_consumer | ||
create_queue_if_needet | ||
# TODO: check and create or enable trigger if needet! | ||
ActiveRecord::Base.pgq_register_consumer(queue.pgq_queue_name, consumer_name, queue.work_connection) | ||
end | ||
|
||
def unregister_consumer | ||
ActiveRecord::Base.pgq_unregister_consumer(queue.pgq_queue_name, consumer_name, queue.work_connection) | ||
end | ||
|
||
def create_queue_if_needet | ||
return if queue.pgq_queue_exists? | ||
|
||
queue.create_pgq_queue | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,175 @@ | ||
# coding: utf-8 | ||
module Treasury | ||
module Models | ||
class Queue < ActiveRecord::Base | ||
self.table_name = 'denormalization.queues' | ||
self.primary_key = 'id' | ||
|
||
TRIGGER_PREFIX = 'tr_denorm'.freeze | ||
|
||
has_many :processors, :class_name => 'Treasury::Models::Processor', :dependent => :destroy | ||
|
||
before_destroy :destroy_pgq_queue | ||
after_create :create_pgq_queue | ||
# + пересоздавать очередь и триггер при изменениях | ||
|
||
def self.generate_trigger(options = {}) | ||
options = {:backup => true}.merge(options) | ||
|
||
raise ArgumentError if options[:ignore] && options[:include] | ||
raise ArgumentError, ':table_name is required' if options[:include] && !options[:table_name] | ||
|
||
events = options[:events] && ([*options[:events]] & [:insert, :update, :delete]) | ||
events = [:insert, :update, :delete] if events.nil? | ||
raise ArgumentError, ':events should include :insert, :update or :delete' if events && events.empty? | ||
|
||
if options[:include].present? | ||
connection = options.fetch(:connection, ActiveRecord::Base.connection) | ||
|
||
all_table_columns = connection.columns(options[:table_name]).map(&:name) | ||
included_table_columns = options[:include].split(',').map(&:strip).uniq.compact | ||
ignore_list = (all_table_columns - included_table_columns).join(',') | ||
elsif options[:ignore].present? | ||
ignore_list = options[:ignore] | ||
end | ||
|
||
conditions = nil | ||
conditions = "WHEN (#{options[:conditions]})" if options.key?(:conditions) | ||
|
||
params = '' | ||
params << ", 'backup'" if options[:backup] | ||
params << ", #{quote("ignore=#{ignore_list}")}" if ignore_list.present? | ||
params << ", #{quote("pkey=#{Array.wrap(options[:pkey]).join(',')}")}" if options[:pkey].present? | ||
|
||
of_columns = "OF #{options[:of_columns].join(',')}" if options[:of_columns] | ||
|
||
<<-SQL | ||
CREATE TRIGGER %{trigger_name} | ||
AFTER #{events.join(' OR ')} | ||
#{of_columns} | ||
ON %{table_name} | ||
FOR EACH ROW | ||
#{conditions} | ||
EXECUTE PROCEDURE pgq.logutriga(%{queue_name}#{params}); | ||
SQL | ||
end | ||
|
||
def generate_trigger(options = {}) | ||
options = { | ||
table_name: table_name, | ||
connection: work_connection | ||
}.merge(options) | ||
|
||
self.class.generate_trigger(options) | ||
end | ||
|
||
def create_pgq_queue | ||
work_connection.transaction do | ||
result = self.class.pgq_create_queue(pgq_queue_name, work_connection) | ||
raise "Queue already exists! #{pgq_queue_name}" if result == 0 | ||
recreate_trigger(false) | ||
end | ||
end | ||
|
||
def recreate_trigger(lock_table = true) | ||
return unless table_name.present? | ||
|
||
work_connection.transaction do | ||
self.lock_table! if lock_table | ||
drop_pgq_trigger | ||
create_pgq_trigger | ||
end | ||
end | ||
|
||
def pgq_queue_exists? | ||
self.class.pgq_get_queue_info(pgq_queue_name, work_connection).present? | ||
end | ||
|
||
# Public: Рабочее соединение с БД для данной очереди. | ||
# | ||
# В рамках этого соединения производятся все действия с объектами привязанными к данной очереди, | ||
# а именно инициализация и обработка событий. | ||
# | ||
# db_link_class - Имя класса - модели ActiveRecord, обеспечивающая связь с БД. | ||
# | ||
# Returns ActiveRecord::ConnectionAdapters::AbstractAdapter. | ||
# | ||
def work_connection | ||
return main_connection if db_link_class.nil? | ||
|
||
db_link_class.constantize.connection | ||
end | ||
|
||
# Public: Основное соединение с БД. | ||
# | ||
# В рамках этого соединения производятся общие действия (изменения метаданных). | ||
# | ||
# Returns ActiveRecord::ConnectionAdapters::AbstractAdapter. | ||
# | ||
def main_connection | ||
ActiveRecord::Base.connection | ||
end | ||
|
||
def pgq_queue_name | ||
"q_#{name}" | ||
end | ||
|
||
protected | ||
|
||
def lock_table! | ||
work_connection.execute <<-SQL | ||
LOCK TABLE #{table_name} IN SHARE MODE | ||
SQL | ||
end | ||
|
||
def create_pgq_trigger(options = {}) | ||
default_options = { | ||
trigger_code: trigger_code || generate_trigger(options), | ||
trigger_name: trigger_name, | ||
table_name: table_name, | ||
queue_name: pgq_queue_name | ||
} | ||
|
||
options.reverse_merge!(default_options) | ||
options[:queue_name] = quote(options[:queue_name]) | ||
|
||
work_connection.execute options[:trigger_code] % options | ||
end | ||
|
||
def drop_pgq_trigger | ||
return unless table_name.present? | ||
|
||
work_connection.execute <<-SQL | ||
DROP TRIGGER IF EXISTS #{trigger_name} ON #{table_name} | ||
SQL | ||
end | ||
|
||
def pgq_drop_queue | ||
return unless self.class.pgq_queue_exists?(pgq_queue_name, work_connection) | ||
self.class.pgq_drop_queue(pgq_queue_name, work_connection) | ||
end | ||
|
||
def trigger_name | ||
# cut scheme name | ||
clear_name = name.split('.').last | ||
"#{TRIGGER_PREFIX}_#{clear_name}" | ||
end | ||
|
||
def quote(text) | ||
self.class.quote(text) | ||
end | ||
|
||
def self.quote(text) | ||
ActiveRecord::Base.connection.quote(text) | ||
end | ||
|
||
def destroy_pgq_queue | ||
work_connection.transaction do | ||
processors.each(&:unregister_consumer) | ||
pgq_drop_queue | ||
drop_pgq_trigger | ||
end | ||
end | ||
end | ||
end | ||
end |
Oops, something went wrong.