diff --git a/app/mailers/treasury/bg_executor_mailer.rb b/app/mailers/treasury/bg_executor_mailer.rb new file mode 100644 index 0000000..6b4d86c --- /dev/null +++ b/app/mailers/treasury/bg_executor_mailer.rb @@ -0,0 +1,13 @@ +# coding: utf-8 +module Treasury + class BgExecutorMailer < ActionMailer::Base + default from: DO_NOT_REPLY, + return_path: DO_NOT_REPLY_RETURN_PATH + + def notify(recipient, subject, message, exception = nil) + @message = message.to_s + @exception = exception + mail(to: recipient, subject: subject) + end + end +end diff --git a/app/views/bg_executor_mailer/notify.html.haml b/app/views/bg_executor_mailer/notify.html.haml new file mode 100644 index 0000000..8c4f8ef --- /dev/null +++ b/app/views/bg_executor_mailer/notify.html.haml @@ -0,0 +1,6 @@ += @message + +- if @exception.present? + %p Exception: #{@exception.message} + %div + = @exception.backtrace.join("
").html_safe diff --git a/lib/tasks/bge_tasks.rake b/lib/tasks/bge_tasks.rake new file mode 100644 index 0000000..7c7e877 --- /dev/null +++ b/lib/tasks/bge_tasks.rake @@ -0,0 +1,13 @@ +namespace :bge do + task start: :environment do + Treasury::BgExecutor.daemonize("start") + end + + task stop: :environment do + Treasury::BgExecutor.daemonize("stop") + end + + task restart: :environment do + Treasury::BgExecutor.daemonize("restart") + end +end diff --git a/lib/treasury.rb b/lib/treasury.rb index 8b96633..26a8b39 100644 --- a/lib/treasury.rb +++ b/lib/treasury.rb @@ -4,7 +4,16 @@ require 'treasury/version' require 'treasury/engine' +require 'treasury/bg_executor' module Treasury LIST_DELIMITER = ','.freeze + + def self.configuration + @configuration ||= Configuration.new + end + + def self.configure + yield configuration + end end diff --git a/lib/treasury/bg_executor.rb b/lib/treasury/bg_executor.rb new file mode 100644 index 0000000..fc3973a --- /dev/null +++ b/lib/treasury/bg_executor.rb @@ -0,0 +1,16 @@ +require "daemons" +require "treasury/bg_executor/errors" + +module Treasury + module BgExecutor + class << self + def daemonize(*args) + options = Treasury.configuration.bge_daemon_options + options[:ARGV] = args + + file_path = File.expand_path(File.join(File.dirname(__FILE__), "bg_executor", "bg_executor_daemon.rb")) + Daemons.run(file_path, options) + end + end + end +end diff --git a/lib/treasury/bg_executor/bg_executor_daemon.rb b/lib/treasury/bg_executor/bg_executor_daemon.rb new file mode 100644 index 0000000..e881fdf --- /dev/null +++ b/lib/treasury/bg_executor/bg_executor_daemon.rb @@ -0,0 +1,25 @@ +Daemons::Application.class_eval do + def exception_log + # stub + end +end + +daemon = Treasury::BgExecutor::Daemon.new + +$running = true + +def terminate + puts "Start terminating..." 
+ $running = false +end + +Signal.trap("TERM") { terminate } +Signal.trap("KILL") { terminate } if Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.0.0') +Signal.trap("INT") { terminate } + +while($running) + daemon.execute_job + sleep 0.1 +end + +puts 'Exit' diff --git a/lib/treasury/bg_executor/client.rb b/lib/treasury/bg_executor/client.rb new file mode 100644 index 0000000..b633568 --- /dev/null +++ b/lib/treasury/bg_executor/client.rb @@ -0,0 +1,254 @@ +# coding: utf-8 +require 'digest/sha2' + +module Treasury + module BgExecutor + # Классс для клиентов BgExecutor + # В инстансе мы можем: + # поставить задачу в очередь, + # спросить статус задачи, + # узнать информацию о задаче, + # спросить конечный результат + class Client + include Singleton + + CACHE_MUTEX = "bg_executor:cache:mutex".freeze + SEQUENCE_KEY = "bg_executor:jobs_sequence".freeze + QUEUE_KEY = "bg_executor:jobs_queue".freeze + SINGLETON_JOBS_HASH_KEY = "bg_executor:singleton_jobs_hash".freeze + JOBS_KEY_PREFIX = "bg_executor:job:".freeze + + # constructor + def initialize + redis.delete QUEUE_KEY unless redis.list? QUEUE_KEY + end + + def redis + @cache ||= BgExecutor::Redis.new + end + + def reconnect! + redis.redis.client.reconnect + end + + # поставить задачу в очередь + # возвращает два значения: ID задачи и ключ доступа к задаче + def queue_job!(job_name, args = {}) + klass = job_class(job_name) + singleton_hexdigest = nil + + if klass.acts_as_singleton? + singleton_hexdigest = klass.singleton_hexdigest(args) + is_running = singleton_job_running?(job_name, args, singleton_hexdigest) + return is_running if is_running + end + + args[:_critical] = true if klass.acts_as_critical? + args.merge!(klass.default_args) if klass.default_args + + id = next_id + raise QueueError if id.nil? + secure = generate_secure_key + + if klass.acts_as_singleton? 
+ add_to_singletons(singleton_hexdigest, id: id, secure_key: secure, queued_at: Time.now.to_f) + end + + # это для того, чтобы в логе было видно когда поставлена задача + args[:created_at] = Time.now.to_i + + redis[job_key(id)] = { + id: id, + secure_key: secure, + job_name: job_name, + args: args, + singleton_hexdigest: singleton_hexdigest, + status: :new, + info: {}, + error: nil, + result: nil, + queued_at: Time.now.to_f, + started_at: nil, + finished_at: nil, + failed_at: nil + } + + redis.push QUEUE_KEY, id: id, + job_name: job_name, + args: args + if Rails.logger + Rails.logger.info "BgExecutor queued job :name => #{job_name}, :id => #{id} :args => #{args.inspect}" + end + + [id, secure] + end + alias_method :push_job!, :queue_job! + + # получить из очереди задание + def pop + redis.synchronize(CACHE_MUTEX) { redis.pop(QUEUE_KEY) } + rescue => e + puts "Error in BgExecutor::Client#pop" + puts e.message + puts e.backtrace.join("\n") + end + + def singleton_job_running?(job_name, args, hexdigest = nil) + klass = job_class(job_name) + return unless klass.acts_as_singleton? + hexdigest ||= klass.singleton_hexdigest(args) + if res = redis.hget(SINGLETON_JOBS_HASH_KEY, hexdigest) + if !klass.acts_as_no_cancel? && res[:queued_at] && (Time.now - Time.at(res[:queued_at])) > 12.hours # сборщик мусора =) + fail_job!(res[:id], 'Job выполняется более 12 часов. Убиваем его в редисе.') + remove_from_singletons(hexdigest) # на всякий случай сотрём инфу из хеша синглтонов, вдруг самого джоба уже не было в редисе + return + end + return [res[:id], res[:secure_key]] + end + rescue => e + puts "Error in BgExecutor::Client#singleton_job_running?" + puts e.message + puts e.backtrace.join("\n") + return nil + end + + def job_class(job_name) + @_class_cache ||= {} + @_class_cache[job_name] ||= "#{job_name}_job".classify.constantize + end + + def job_exists?(job_id, secure_key = nil) + exists = redis.exists?(job_key(job_id)) + + raise JobAccessError if exists && secure_key.present? 
&& !secure_key_matches?(job_id, secure_key) + + exists + end + + # получить статус задачи + def ask_status(job_id, secure_key = nil) + return nil unless job_exists? job_id, secure_key + + raise JobAccessError unless secure_key_matches?(job_id, secure_key) + + (find_job(job_id) || {})[:status] + end + + # получить информацию из задачи + def ask_info(job_id, secure_key = nil) + return nil unless job_exists? job_id, secure_key + + raise JobAccessError unless secure_key_matches?(job_id, secure_key) + + (find_job(job_id) || {})[:info] + end + + # получить результат выполнения задачи + def ask_result(job_id, secure_key = nil) + return nil unless job_exists? job_id, secure_key + + raise JobAccessError unless secure_key_matches?(job_id, secure_key) + + j = find_job(job_id) || {} + raise JobExecutionError, j[:error] unless j[:error].blank? + + j[:result] + end + + # проверить ключ к задаче на зуб + def secure_key_matches?(job_id, secure_key) + return true if secure_key.nil? + find_job(job_id)[:secure_key] == secure_key + end + + # обновить информацию о задании + def update_job!(job_id, params) + redis[job_key job_id] = redis[job_key(job_id)].merge(params) + rescue => e + puts "Error in BgExecutor::Client#update_job!" + puts e.message + end + + def start_job!(job_id) + update_job!(job_id, status: :running, started_at: Time.now.to_f) + rescue => e + puts "Error in BgExecutor::Client#start_job!" + puts e.message + end + + # считать задание завершенным + def finish_job!(job_id) + if (job = find_job(job_id)) + remove_from_singletons(job[:singleton_hexdigest]) if job[:singleton_hexdigest] + + job_updates = {:status => :finished, :finished_at => Time.now.to_f} + if job[:started_at].present? + job_updates[:info] = job[:info].merge(:execution_time => "%.2f" % [Time.now.to_f - job[:started_at]]) + end + update_job!(job_id, job_updates) + end + redis.expire(job_key(job_id), 600) + rescue => e + puts "Error in BgExecutor::Client#finish_job!" 
+ puts e.message + end + + # считать задание проваленным + def fail_job!(job_id, exception) + if exception.is_a?(::Exception) + error = [exception.message, exception.backtrace.present? ? exception.backtrace.join("\n") : ''].join("\n") + else + error = exception.to_s + end + + if (job = find_job(job_id)) + remove_from_singletons(job[:singleton_hexdigest]) if job[:singleton_hexdigest] + + job_updates = {:status => :failed, :error => error, :failed_at => Time.now.to_f} + if job[:started_at].present? + job_updates[:info] = job[:info].merge(:execution_time => "%.2f" % [Time.now.to_f - job[:started_at]]) + end + update_job!(job_id, job_updates) + end + redis.expire(job_key(job_id), 600) + rescue => e + puts "Error in BgExecutor::Client#fail_job!" + puts e.message + end + + def reset! + redis.synchronize(CACHE_MUTEX) do + redis.zero(SEQUENCE_KEY) + redis.delete(QUEUE_KEY) + redis.delete(SINGLETON_JOBS_HASH_KEY) + end + end + + def find_job(id) + redis[job_key(id)] + end + + protected + + def next_id + redis.increment(SEQUENCE_KEY) + end + + def job_key(id) + "#{JOBS_KEY_PREFIX}#{id}" + end + + def add_to_singletons(hexdigest, args) + redis.hset SINGLETON_JOBS_HASH_KEY, hexdigest, args + end + + def remove_from_singletons(hexdigest) + redis.hdel SINGLETON_JOBS_HASH_KEY, hexdigest + end + + def generate_secure_key + Digest::SHA2.hexdigest(rand.to_s) + end + end + end +end diff --git a/lib/treasury/bg_executor/daemon.rb b/lib/treasury/bg_executor/daemon.rb new file mode 100644 index 0000000..8fa1e55 --- /dev/null +++ b/lib/treasury/bg_executor/daemon.rb @@ -0,0 +1,268 @@ +# coding: utf-8 + +module Treasury + module BgExecutor + class Daemon + def initialize + ActiveRecord::Base.clear_all_connections! + + reconnect! + + enable_gc_optimizations + end + + def execute_job + job = client.pop + return unless job + + wait_till_fork_allowed! 
do + log ">>> Executing job :id => #{job[:id]}, :name => #{job[:job_name]}, :args => #{job[:args].inspect}" + Daemons.run_proc('bg_executor_job.rb', self.daemon_options) do + begin + self.reconnect! + BgExecutor::Executor.new.execute_job(job) + rescue Exception => e + begin + error_log "*** Failed job #{job.inspect}", e + client.fail_job!(job[:id].to_i, e) if job + rescue Exception => e2 + client.fail_job!(job[:id].to_i, e) if job + error_log "*** Failed to mark job as fail", e2 + end + + process_critical_job(job, e) + ensure + shoutdown_job! + end + end + end + + rescue Timeout::Error => e + client.fail_job! job[:id].to_i, BgExecutor::QueueError.new('BgExecutor queue is full. Timeout error.') + log "Timeout::Error cannot push job(#{job[:id]}) into queue" + + process_critical_job(job, e) + rescue => e + begin + error_log "*** Failed job #{job.inspect}", e + client.fail_job!(job[:id].to_i, e) if job + rescue Exception => e2 + client.fail_job!(job[:id].to_i, e) if job + error_log "*** Failed to mark job as fail", e2 + end + + process_critical_job(job, e) + end + + protected + + # если это важный job, и он упал, то поставим его опять в очередь + def process_critical_job(job, e) + return false unless job.present? + + if job[:args] && job[:args][:_critical] + job[:args][:_tries] ||= 0 + + if job[:args][:_tries] >= Treasury.configuration.bge_max_tries_on_fail + log "*** Job #{job.inspect} max tries exceeded" + + recipient = job[:args][:notify_email].presence || Conf.general['support_email'] + subject = "BgExecutor: critical job #{job[:job_name]} failed" + message = "Job #{job.inspect} max tries exceeded" + Treasury::BgExecutorMailer.notify(recipient, subject, message, e).deliver + else + job[:args][:_tries] += 1 + log "> Queue job :name => #{job[:job_name]}, :args => #{job[:args].inspect}" + client.queue_job! job[:job_name], job[:args] + end + end + rescue Exception => e2 + error_log "*** Failed to restart job", e2 + end + + def reconnect! 
+ reopen_logs + + logger = Logger.new(rails_logger_filename) + [Rails, ActiveRecord::Base, ActionController::Base, ActionMailer::Base].each do |logged_class| + logged_class.logger = logger + end + + ActiveRecord::Base.connection_handler.connection_pools.each_value do |pool| + pool.connections.each(&:reconnect!) + end + + ActiveRecord::Base.verify_active_connections! + rescue Exception => e + log "Could not reconnect!" + log e.message + log e.backtrace.join("\n") + end + + def shoutdown_job! + $running = false + ensure + exit() + end + + def get_concurrency + Treasury.configuration.bge_concurrency + end + + def get_queue_timeout + Treasury.configuration.bge_queue_timeout + end + + def daemon_options(command = :start) + {:multiple => true, + :ontop => false, + :backtrace => true, + :dir_mode => :normal, + :dir => pid_files_dir, + :log_dir => pid_files_dir, + :log_output => true, + :monitor => false, + :keep_pid_files => true, + :ARGV => [command.to_s] + } + end + + def client + @client ||= BgExecutor::Client.instance + end + + def wait_till_fork_allowed! + if allowed_to_fork? + yield + return + else + log "Queue is full. Waiting..." + clean_pids! + end + + Timeout.timeout(get_queue_timeout) do + loop do + if allowed_to_fork? + yield + break + end + sleep 5 + end + end + end + + def allowed_to_fork? + executors_count < get_concurrency + end + + def executors_count + Daemons::PidFile.find_files(pid_files_dir, 'bg_executor_job', false).size + rescue Exception => e + log "Error in executors_count" + log e.message + 0 + end + + def pid_files_dir + @pid_files_dir ||= "#{Rails.root}/log" + end + + def enable_gc_optimizations + GC.copy_on_write_friendly = true if GC.respond_to?(:copy_on_write_friendly=) + end + + # Log message to stdout + def log(message) + puts "%s: %s" % [Time.now.to_s, message] + end + + def error_log(message, exception) + log "#{message} \n\nError: #{exception.message}\n\nBacktrace: #{exception.backtrace.join("\n")}" + end + + def clean_pids! 
+ Daemons::PidFile.find_files("#{Rails.root}/log", 'bg_executor_job', true) + end + + private + + # Internal: Рельсовые логи перенаправим в отдельный лог + # + # Returns String + def rails_logger_filename + @rails_logger_filename ||= Rails.env.development? ? Rails.root.join('log', "bg_executor_#{Rails.env}.log").to_s : '/dev/null' + end + + # Переоткрытие всех логов + # + # @see Unicorn::Utils + def reopen_logs + to_reopen = [] + nr = 0 + ObjectSpace.each_object(File) { |fp| is_log?(fp) and to_reopen << fp } + + to_reopen.each do |fp| + orig_st = begin + fp.stat + rescue IOError, Errno::EBADF # race + next + end + + begin + b = File.stat(fp.path) + next if orig_st.ino == b.ino && orig_st.dev == b.dev + rescue Errno::ENOENT + end + + begin + # stdin, stdout, stderr are special. The following dance should + # guarantee there is no window where `fp' is unwritable in MRI + # (or any correct Ruby implementation). + # + # Fwiw, GVL has zero bearing here. This is tricky because of + # the unavoidable existence of stdio FILE * pointers for + # std{in,out,err} in all programs which may use the standard C library + if fp.fileno <= 2 + # We do not want to hit fclose(3)->dup(2) window for std{in,out,err} + # MRI will use freopen(3) here internally on std{in,out,err} + fp.reopen(fp.path, "a") + else + # We should not need this workaround, Ruby can be fixed: + # http://bugs.ruby-lang.org/issues/9036 + # MRI will not call call fclose(3) or freopen(3) here + # since there's no associated std{in,out,err} FILE * pointer + # This should atomically use dup3(2) (or dup2(2)) syscall + File.open(fp.path, "a") { |tmpfp| fp.reopen(tmpfp) } + end + + fp.sync = true + fp.flush # IO#sync=true may not implicitly flush + new_st = fp.stat + + # this should only happen in the master: + if orig_st.uid != new_st.uid || orig_st.gid != new_st.gid + fp.chown(orig_st.uid, orig_st.gid) + end + + nr += 1 + rescue IOError, Errno::EBADF + # not much we can do... 
+ end + end + nr + end + + # @see Unicorn::Utils + def is_log?(fp) + append_flags = File::WRONLY | File::APPEND + + !fp.closed? && + fp.stat.file? && + fp.sync && + (fp.fcntl(Fcntl::F_GETFL) & append_flags) == append_flags + rescue IOError, Errno::EBADF + false + end + end + end +end diff --git a/lib/treasury/bg_executor/errors.rb b/lib/treasury/bg_executor/errors.rb new file mode 100644 index 0000000..2b2042e --- /dev/null +++ b/lib/treasury/bg_executor/errors.rb @@ -0,0 +1,13 @@ +# coding: utf-8 + +module Treasury + module BgExecutor + class Error < StandardError; end + + class ConnectionError < Error; end + class QueueError < Error; end + class JobExecutionError < Error; end + class JobNotFound < Error; end + class JobAccessError < Error; end + end +end diff --git a/lib/treasury/bg_executor/executor.rb b/lib/treasury/bg_executor/executor.rb new file mode 100644 index 0000000..b46e481 --- /dev/null +++ b/lib/treasury/bg_executor/executor.rb @@ -0,0 +1,34 @@ +# coding: utf-8 + +module Treasury + module BgExecutor + class Executor + def client + @client ||= BgExecutor::Client.instance + end + + def execute_job(job_hash) + id, name, args = job_hash[:id].to_i, job_hash[:job_name], job_hash[:args] + job = Treasury::BgExecutor::Job.create(id, name, args) + + $0 = "Job ##{job_hash[:id]}: #{job.title || name}" + + log ">>> Executing job :id => #{id}, :name => #{name}, :args => #{args.inspect}" + + client.start_job! id + + job.execute + + client.finish_job! 
id + + log "*** Finished job ##{id}" + $0 = "Job ##{job_hash[:id]}: #{job.title || name}*" + end + + # Log message to stdout + def log(message) + puts "%s: %s" % [Time.now.to_s, message] + end + end # end class + end # end module +end diff --git a/lib/treasury/bg_executor/job.rb b/lib/treasury/bg_executor/job.rb new file mode 100644 index 0000000..ef24cbb --- /dev/null +++ b/lib/treasury/bg_executor/job.rb @@ -0,0 +1,166 @@ +# coding: utf-8 +require 'digest/sha2' + +module Treasury + module BgExecutor + class Job + class_attribute :singleton_job, :critical_job, :no_trace_job, :no_cancel_job, :default_args + + attr_reader :id, :result, :info, :params + + class << self + def create(id, job_name, params) + "#{job_name}_job".classify.constantize.new(id, params) + end + + # указать, что только один джоб этого класса может выполняться в одну единицу времени + # можно также задать указать параметры джоба, и тогда только один джоб с такой комбинацией параметров может выполняться в одну единицу времени + def acts_as_singleton(scope = []) + self.singleton_job = Array(scope) + end + + def acts_as_singleton? + !singleton_job.nil? + end + + def singleton_scope + singleton_job + end + + def singleton_hexdigest(args) + result = nil + if acts_as_singleton? + singleton_args = args.select { |k, _| singleton_scope.include?(k) } + result = Digest::SHA2.hexdigest(name + singleton_args.sort.to_hash.to_s) + end + + result + end + + def add_default_args(args) + self.default_args ||= {} + self.default_args.merge!(args) + end + + # указать, что джоб важный, и в случае его падения, ставить его опять в очередь. + # Количество попыток ограничено в конфиге опцией max_tries_on_fail, по умолчанию Daemon:DEFAULT_MAX_TRIES_ON_FAIL + def acts_as_critical(job_args = {}) + self.critical_job = true + add_default_args(job_args) if job_args.present? + end + + def acts_as_critical? 
+          !!critical_job
+        end
+
+        # Disable NewRelic transaction tracing for this job class.
+        def acts_as_no_trace
+          self.no_trace_job = true
+        end
+
+        def acts_as_no_trace?
+          !!no_trace_job
+        end
+
+        # Mark the job as non-cancellable: the singleton garbage collector in
+        # Client#singleton_job_running? must NOT fail/remove this job even when
+        # it has been running longer than the 12-hour limit.
+        # (The previous comment was a copy-paste of acts_as_no_trace.)
+        def acts_as_no_cancel
+          self.no_cancel_job = true
+        end
+
+        def acts_as_no_cancel?
+          !!no_cancel_job
+        end
+      end
+
+      # +id+     - job id assigned by the queue
+      # +params+ - job arguments; :allow_new skips the existence check
+      # Raises when the job id is unknown to the queue backend.
+      def initialize(id, params)
+        @id = id
+        raise "No such job in queue" unless params[:allow_new] || client.job_exists?(@id)
+
+        @info = {}
+        @result = nil
+        @error = nil
+
+        @params = params
+      end
+
+      # store the result and persist it to the queue backend
+      def result=(value)
+        @result = value
+        client.update_job!(id, result: @result)
+      end
+
+      # store progress info and persist it to the queue backend
+      def info=(value)
+        @info = value
+        client.update_job!(id, info: @info)
+      end
+
+      def execute
+        # override in descendants
+      end
+
+      def title
+        @params[:_job_title] || @params[:job_name]
+      end
+
+      private
+
+      def client
+        @client ||= BgExecutor::Client.instance
+      end
+    end
+
+    # Job subclass whose progress can be projected onto a progress bar.
+    class Job::Indicated < Job
+      attr_reader :completed
+
+      def initialize(id, params)
+        super id, params
+        self.info = {:completed => 0.0}
+        @total = 1
+        @completed = 0
+      end
+
+      # set the total number of items the job will process
+      def total=(total_items)
+        raise ArgumentError unless total_items.is_a?(Integer)
+        @total = total_items
+      end
+
+      # set the number of items already processed
+      def completed=(completed_items)
+        raise ArgumentError unless completed_items.is_a?(Integer)
+        @completed = completed_items
+        update_percentage!
+      end
+
+      def message=(value)
+        self.info = info.merge(message: value)
+      end
+
+      def redirect_url=(value)
+        self.info = info.merge(redirect_url: value)
+      end
+
+      def increment_completed!(count = nil)
+        self.completed = @completed + (count || 1)
+      end
+
+      def update_percentage!
+ self.info = info.merge(completed: ((@completed.to_f / [@total, 1].max.to_f) * 100).round) + end + end + + class CallMethodJob < Job + acts_as_singleton [:object, :method] + + def execute + object = Marshal.load(Base64.decode64(params[:object])) + + if params[:method_args].present? + object.send params[:method], *params[:method_args] + else + object.send params[:method] + end + end + end + end +end diff --git a/lib/treasury/bg_executor/redis.rb b/lib/treasury/bg_executor/redis.rb new file mode 100644 index 0000000..8730f32 --- /dev/null +++ b/lib/treasury/bg_executor/redis.rb @@ -0,0 +1,192 @@ +# coding: utf-8 + +module Treasury + module BgExecutor + # Redis backend + class Redis + def initialize + redis + rescue + raise BgExecutor::ConnectionError, $ERROR_INFO.message + end + + # increments the value for +key+ by 1 and returns the new value + def increment(key) + redis.incr _key(key) + end + + # decrements the value for +key+ by 1 and returns the new value + def decrement(key) + redis.decr _key(key) + end + + def zero(key) + redis.set _key(key), 0 + end + + # tests whether +key+ exists or not + def exists?(key) + redis.exists _key(key) + end + + # retrieve data from redis by +key+ + def get(key) + u redis.get(_key(key)) + end + alias [] get + + # set +value+ for +key+ and optionally expiration time + def set(key, value, expire = 0) + if expire > 0 + redis.setex(_key(key), expire, s(value)) + else + redis.set _key(key), s(value) + end + value + end + + # set +value+ for +key+ only when it's missing in cache + def set_if_not_exists(key, value, expire) + set key, value, expire unless exists? key + value + end + + # shortcut for set without expiration + def []=(key, value) + return delete(key) if value.nil? 
+
+        set(key, value)
+      end
+
+      # remove +key+ from cache
+      def delete(key)
+        redis.del _key(key)
+      end
+      alias unset delete
+
+      # set expiration time for +key+
+      def expire(key, expire)
+        redis.expire _key(key), expire
+      end
+
+      # return first element of list and remove it
+      def shift(key)
+        u(redis.lpop(_key(key)))
+      end
+
+      # push new element at the beginning of the list
+      def unshift(key, value)
+        redis.lpush _key(key), s(value)
+      end
+
+      # push new element at the end of the list
+      def push(key, value)
+        redis.rpush _key(key), s(value)
+      end
+
+      # return last element of list and remove it
+      def pop(key)
+        u(redis.rpop(_key(key)))
+      end
+
+      # return LIST as ARRAY (nil when +key+ is not a list)
+      def list(key)
+        return nil unless list?(key)
+
+        result = []
+        (redis.llen _key(key)).times do |idx|
+          result << list_item(key, idx)
+        end
+
+        result
+      end
+
+      # return the element at position +idx+ of the list stored at +key+
+      def list_item(key, idx)
+        # FIX: +idx+ belongs to LINDEX; it was previously passed as a second
+        # argument to the deserializer (u), so every lookup fetched index 0.
+        u(redis.lindex(_key(key), idx))
+      end
+
+      # return length of the list stored at +key+
+      def list_length(key)
+        redis.llen(_key(key))
+      end
+
+      # is the value stored at +key+ a list?
+      def list?(key)
+        redis.type(_key(key)) == "list"
+      end
+
+      # is the value stored at +key+ a string?
+      def string?(key)
+        redis.type(_key(key)) == "string"
+      end
+
+      def sadd(key, value)
+        redis.sadd _key(key), s(value)
+      end
+
+      def sismember(key, value)
+        redis.sismember _key(key), s(value)
+      end
+
+      def srem(key, value)
+        redis.srem _key(key), s(value)
+      end
+
+      def hset(key, field, value)
+        redis.hset _key(key), field, s(value)
+      end
+
+      def hdel(key, field)
+        redis.hdel _key(key), field
+      end
+
+      def hexists(key, field)
+        redis.hexists _key(key), field
+      end
+
+      def hget(key, field)
+        u(redis.hget(_key(key), field))
+      end
+
+      # execute +block+ under a pessimistic lock named by +mutex_id+.
+      # The lock key carries a 6-second TTL as a crash safety net.
+      def synchronize(mutex_id, &block)
+        mutex_key = "mutex:#{mutex_id}"
+
+        # Kernel#timeout is deprecated; call Timeout.timeout explicitly.
+        Timeout.timeout(60) do
+          loop do
+            break unless exists?(mutex_key)
+            sleep 0.05
+          end
+        end if exists?(mutex_key)
+
+        set(mutex_key, 1, 6)
+        begin
+          yield
+        ensure
+          # release the mutex even when the block raises; previously an
+          # exception left the key behind and blocked other workers until
+          # the 6-second TTL expired
+          delete(mutex_key)
+        end
+      end
+
+      def redis
+        Treasury.configuration.redis
+      end
+
private + + def serialize(data) + Marshal.dump data + end + alias s serialize + + def unserialize(data) + Marshal.load data + rescue + nil + end + alias u unserialize + + def _key(key) + "#{Treasury.configuration.bge_namespace}:#{key}" + end + end + end +end diff --git a/lib/treasury/configuration.rb b/lib/treasury/configuration.rb new file mode 100644 index 0000000..d671a73 --- /dev/null +++ b/lib/treasury/configuration.rb @@ -0,0 +1,16 @@ +module Treasury + class Configuration + attr_accessor :redis, + :bge_concurrency, + :bge_max_tries_on_fail, + :bge_namespace, + :bge_queue_timeout, + :bge_daemon_options + + def initialize + self.bge_concurrency = 4 + self.bge_queue_timeout = 300 + self.bge_max_tries_on_fail = 5 + end + end +end diff --git a/spec/internal/config/environments/test.rb b/spec/internal/config/environments/test.rb new file mode 100644 index 0000000..53ecf05 --- /dev/null +++ b/spec/internal/config/environments/test.rb @@ -0,0 +1,2 @@ +DO_NOT_REPLY = 'support@example.org'.freeze +DO_NOT_REPLY_RETURN_PATH = 'support@example.org'.freeze diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index b88902b..3805ba4 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -6,6 +6,6 @@ end require 'combustion' -Combustion.initialize! +Combustion.initialize! :action_mailer Treasury::SpecHelpers.stub_core_denormalization diff --git a/treasury.gemspec b/treasury.gemspec index 0baba20..3eff242 100644 --- a/treasury.gemspec +++ b/treasury.gemspec @@ -18,6 +18,7 @@ Gem::Specification.new do |spec| spec.require_paths = %w(lib) spec.add_runtime_dependency 'rails', '>= 3.1.12', '< 4.1' + spec.add_runtime_dependency 'daemons', '>= 1.1.9' spec.add_development_dependency 'rspec', '>= 3.1' spec.add_development_dependency 'simplecov'