Skip to content

Commit

Permalink
feature: move core_demormalization into gem
Browse files Browse the repository at this point in the history
  • Loading branch information
vadshalamov committed Dec 7, 2016
1 parent 8e1e11c commit 5d625a2
Show file tree
Hide file tree
Showing 82 changed files with 5,592 additions and 87 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ mkmf.log
/.rvmrc
/.idea/
gemfiles/
internal/log
spec/internal/log
3 changes: 2 additions & 1 deletion .rspec
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
--tty
--format progress
--order random
--backtrace
--backtrace
--require ./spec/spec_helper
4 changes: 0 additions & 4 deletions Appraisals
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,3 @@ end if RUBY_VERSION < '2'
appraise 'activesupport3.2' do
gem 'activesupport', '~> 3.2.0'
end

appraise 'activesupport4.0' do
gem 'activesupport', '~> 4.0.13'
end
6 changes: 6 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ source 'https://rubygems.org'
if RUBY_VERSION < '2'
gem 'mime-types', '< 3.0'
gem 'json', '< 2'
gem 'pry-debugger'
gem 'pg', '<= 0.18.4'
gem 'shoulda-matchers', '< 3.0.0'
else
gem 'pry-byebug'
gem 'test-unit'
end

gemspec
6 changes: 6 additions & 0 deletions app/jobs/treasury/base_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
module Treasury
class BaseJob < Treasury::BgExecutor::Job::Indicated
acts_as_no_cancel
acts_as_critical notify_email: Treasury.configuration.job_error_notifications
end
end
30 changes: 30 additions & 0 deletions app/jobs/treasury/delayed_increment_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# coding: utf-8

module Treasury
class DelayedIncrementJob
include Resque::Integration

queue :base
retrys

# Public: Отложенный инкремент поля
#
# params - Hash:
# 'object' - Integer идентификатор
# 'field_name' - String название поля
# 'field_class' - String класс поля
# 'by' - Integer приращение
#
# Returns nothing
def self.perform(params)
object = params.fetch('object')
field_name = params.fetch('field_name')
increment = params.fetch('by')

field = Treasury::Fields::Base.create_by_class(params.fetch('field_class'))
new_value = field.raw_value(object, field_name).to_i + increment

field.write_data({object => {field_name => new_value}}, true)
end
end
end
14 changes: 14 additions & 0 deletions app/jobs/treasury/initialize_field_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module Treasury
class InitializeFieldJob < BaseJob
acts_as_singleton [:field_class]

def execute
field = Treasury::Fields::Base.create_by_class(params[:field_class])
field.initialize!
end

def title
"#{self.class.name.underscore.gsub('_job', '')}: #{params[:field_class]}"
end
end
end
9 changes: 9 additions & 0 deletions app/jobs/treasury/supervisor_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
module Treasury
class SupervisorJob < BaseJob
acts_as_singleton

def execute
Treasury::Supervisor.run
end
end
end
19 changes: 19 additions & 0 deletions app/jobs/treasury/worker_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
module Treasury
class WorkerJob < BaseJob
acts_as_singleton [:worker_id]

def execute
Worker.run(params[:worker_id])
end

def title
"#{self.class.name.underscore.gsub('_job', '')}:#{worker.name}"
end

protected

def worker
@worker ||= Treasury::Models::Worker.find(params[:worker_id])
end
end
end
12 changes: 12 additions & 0 deletions app/models/treasury/models/events_log.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# coding: utf-8
module Treasury
module Models
class EventsLog < ActiveRecord::Base
self.table_name = 'denormalization.events_log'

def self.clear(date)
delete_all(['processed_at::date = ?', date])
end
end
end
end
45 changes: 45 additions & 0 deletions app/models/treasury/models/field.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# coding: utf-8
module Treasury
module Models
class Field < ActiveRecord::Base
self.table_name = 'denormalization.fields'
self.primary_key = 'id'

has_many :processors,
class_name: 'Treasury::Models::Processor',
dependent: :destroy,
order: 'processors.oid',
inverse_of: :field

belongs_to :worker, class_name: 'Treasury::Models::Worker'

scope :active, -> { where(active: true) }
scope :ordered, -> { order(arel_table[:oid]) }
scope :initialized, -> { where(state: Fields::STATE_INITIALIZED) }
scope :in_initialize, -> { where(state: Fields::STATE_IN_INITIALIZE) }
scope :for_initialize_or_in_initialize, -> do
where(state: [Fields::STATE_NEED_INITIALIZE, Fields::STATE_IN_INITIALIZE])
end
scope :for_processing, -> { active.initialized.ordered }

serialize :params, Hash
serialize :storage, Array

def need_initialize!
update_attribute(:state, Fields::STATE_NEED_INITIALIZE)
end

def need_initialize?
state == Fields::STATE_NEED_INITIALIZE
end

def suspend
update_attribute(:active, false)
end

def resume
update_attribute(:active, true)
end
end
end
end
35 changes: 35 additions & 0 deletions app/models/treasury/models/processor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# coding: utf-8
module Treasury
module Models
class Processor < ActiveRecord::Base
self.table_name = 'denormalization.processors'
self.primary_key = 'id'

belongs_to :queue, class_name: 'Treasury::Models::Queue', inverse_of: :processors
belongs_to :field, class_name: 'Treasury::Models::Field', inverse_of: :processors

before_destroy :unregister_consumer

serialize :params, Hash

require 'treasury/pgq'

def subscribe!
unregister_consumer
create_queue_if_needet
# TODO: check and create or enable trigger if needet!
ActiveRecord::Base.pgq_register_consumer(queue.pgq_queue_name, consumer_name, queue.work_connection)
end

def unregister_consumer
ActiveRecord::Base.pgq_unregister_consumer(queue.pgq_queue_name, consumer_name, queue.work_connection)
end

def create_queue_if_needet
return if queue.pgq_queue_exists?

queue.create_pgq_queue
end
end
end
end
175 changes: 175 additions & 0 deletions app/models/treasury/models/queue.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
# coding: utf-8
module Treasury
module Models
class Queue < ActiveRecord::Base
self.table_name = 'denormalization.queues'
self.primary_key = 'id'

TRIGGER_PREFIX = 'tr_denorm'.freeze

has_many :processors, :class_name => 'Treasury::Models::Processor', :dependent => :destroy

before_destroy :destroy_pgq_queue
after_create :create_pgq_queue
# + пересоздавать очередь и триггер при изменениях

def self.generate_trigger(options = {})
options = {:backup => true}.merge(options)

raise ArgumentError if options[:ignore] && options[:include]
raise ArgumentError, ':table_name is required' if options[:include] && !options[:table_name]

events = options[:events] && ([*options[:events]] & [:insert, :update, :delete])
events = [:insert, :update, :delete] if events.nil?
raise ArgumentError, ':events should include :insert, :update or :delete' if events && events.empty?

if options[:include].present?
connection = options.fetch(:connection, ActiveRecord::Base.connection)

all_table_columns = connection.columns(options[:table_name]).map(&:name)
included_table_columns = options[:include].split(',').map(&:strip).uniq.compact
ignore_list = (all_table_columns - included_table_columns).join(',')
elsif options[:ignore].present?
ignore_list = options[:ignore]
end

conditions = nil
conditions = "WHEN (#{options[:conditions]})" if options.key?(:conditions)

params = ''
params << ", 'backup'" if options[:backup]
params << ", #{quote("ignore=#{ignore_list}")}" if ignore_list.present?
params << ", #{quote("pkey=#{Array.wrap(options[:pkey]).join(',')}")}" if options[:pkey].present?

of_columns = "OF #{options[:of_columns].join(',')}" if options[:of_columns]

<<-SQL
CREATE TRIGGER %{trigger_name}
AFTER #{events.join(' OR ')}
#{of_columns}
ON %{table_name}
FOR EACH ROW
#{conditions}
EXECUTE PROCEDURE pgq.logutriga(%{queue_name}#{params});
SQL
end

def generate_trigger(options = {})
options = {
table_name: table_name,
connection: work_connection
}.merge(options)

self.class.generate_trigger(options)
end

def create_pgq_queue
work_connection.transaction do
result = self.class.pgq_create_queue(pgq_queue_name, work_connection)
raise "Queue already exists! #{pgq_queue_name}" if result == 0
recreate_trigger(false)
end
end

def recreate_trigger(lock_table = true)
return unless table_name.present?

work_connection.transaction do
self.lock_table! if lock_table
drop_pgq_trigger
create_pgq_trigger
end
end

def pgq_queue_exists?
self.class.pgq_get_queue_info(pgq_queue_name, work_connection).present?
end

# Public: Рабочее соединение с БД для данной очереди.
#
# В рамках этого соединения производятся все действия с объектами привязанными к данной очереди,
# а именно инициализация и обработка событий.
#
# db_link_class - Имя класса - модели ActiveRecord, обеспечивающая связь с БД.
#
# Returns ActiveRecord::ConnectionAdapters::AbstractAdapter.
#
def work_connection
return main_connection if db_link_class.nil?

db_link_class.constantize.connection
end

# Public: Основное соединение с БД.
#
# В рамках этого соединения производятся общие действия (изменения метаданных).
#
# Returns ActiveRecord::ConnectionAdapters::AbstractAdapter.
#
def main_connection
ActiveRecord::Base.connection
end

def pgq_queue_name
"q_#{name}"
end

protected

def lock_table!
work_connection.execute <<-SQL
LOCK TABLE #{table_name} IN SHARE MODE
SQL
end

def create_pgq_trigger(options = {})
default_options = {
trigger_code: trigger_code || generate_trigger(options),
trigger_name: trigger_name,
table_name: table_name,
queue_name: pgq_queue_name
}

options.reverse_merge!(default_options)
options[:queue_name] = quote(options[:queue_name])

work_connection.execute options[:trigger_code] % options
end

def drop_pgq_trigger
return unless table_name.present?

work_connection.execute <<-SQL
DROP TRIGGER IF EXISTS #{trigger_name} ON #{table_name}
SQL
end

def pgq_drop_queue
return unless self.class.pgq_queue_exists?(pgq_queue_name, work_connection)
self.class.pgq_drop_queue(pgq_queue_name, work_connection)
end

def trigger_name
# cut scheme name
clear_name = name.split('.').last
"#{TRIGGER_PREFIX}_#{clear_name}"
end

def quote(text)
self.class.quote(text)
end

def self.quote(text)
ActiveRecord::Base.connection.quote(text)
end

def destroy_pgq_queue
work_connection.transaction do
processors.each(&:unregister_consumer)
pgq_drop_queue
drop_pgq_trigger
end
end
end
end
end
Loading

0 comments on commit 5d625a2

Please sign in to comment.