Skip to content

Commit

Permalink
Merge branch 'feature/0.5.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
alexanderdean committed Aug 11, 2015
2 parents 3dade2e + d7be1f5 commit efddb28
Show file tree
Hide file tree
Showing 14 changed files with 231 additions and 119 deletions.
18 changes: 18 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
Version 0.5.0 (2015-08-11)
--------------------------
Made tracker.flush synchronous by default (#80)
Added SelfDescribingJson class (#82)
Prevented the buffer from being flushed when it is empty (#62)
Bumped contexts schema to 1-0-1 (#53)
Updated Contracts dependency range to "~> 0.7", "<= 0.11" (#81)
Moved Contracts mixin inside classes (#73)
Made synchronous flush wait until buffer is empty (#79)
Made buffer size 1-indexed rather than 0-indexed (#67)
Started handling all network-related exceptions (#76)
Started treating all 2xx and 3xx status codes as successful (#75)
Made number of worker threads used by AsyncEmitter configurable (#77)
Made Emitter and AsyncEmitter thread-safe (#74)
Fixed callback logic to only fire one callback per buffer flush (#61)
Added set_fingerprint method, thanks @kazjote! (#65)
Updated Travis image to show status of master branch (#78)
Added bundler installation to up.guidance (#83)

Version 0.4.2 (2015-04-08)
--------------------------
Relaxed Contracts dependency (#68)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Ruby Analytics for Snowplow
[![Gem Version](https://badge.fury.io/rb/snowplow-tracker.svg)](http://badge.fury.io/rb/snowplow-tracker)
[![Build Status](https://travis-ci.org/snowplow/snowplow-ruby-tracker.png)](https://travis-ci.org/snowplow/snowplow-ruby-tracker)
[![Build Status](https://travis-ci.org/snowplow/snowplow-ruby-tracker.png?branch=master)](https://travis-ci.org/snowplow/snowplow-ruby-tracker)
[![Code Climate](https://codeclimate.com/github/snowplow/snowplow-ruby-tracker.png)](https://codeclimate.com/github/snowplow/snowplow-ruby-tracker)
[![Coverage Status](https://coveralls.io/repos/snowplow/snowplow-ruby-tracker/badge.png)](https://coveralls.io/r/snowplow/snowplow-ruby-tracker)
[![License][license-image]][license]
Expand Down
1 change: 1 addition & 0 deletions lib/snowplow-tracker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

require 'snowplow-tracker/contracts.rb'
require 'snowplow-tracker/version.rb'
require 'snowplow-tracker/self_describing_json.rb'
require 'snowplow-tracker/payload.rb'
require 'snowplow-tracker/subject.rb'
require 'snowplow-tracker/emitters.rb'
Expand Down
1 change: 0 additions & 1 deletion lib/snowplow-tracker/contracts.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# License:: Apache License Version 2.0

require 'contracts'
include Contracts

module SnowplowTracker

Expand Down
178 changes: 114 additions & 64 deletions lib/snowplow-tracker/emitters.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
require 'set'
require 'logger'
require 'contracts'
include Contracts

module SnowplowTracker

Expand All @@ -26,13 +25,16 @@ module SnowplowTracker

class Emitter

include Contracts

@@ConfigHash = ({
:protocol => Maybe[Or['http', 'https']],
:port => Maybe[Num],
:method => Maybe[Or['get', 'post']],
:buffer_size => Maybe[Num],
:on_success => Maybe[Func[Num => Any]],
:on_failure => Maybe[Func[Num, Hash => Any]]
:on_failure => Maybe[Func[Num, Hash => Any]],
:thread_count => Maybe[Num]
})

@@StrictConfigHash = And[@@ConfigHash, lambda { |x|
Expand All @@ -47,19 +49,19 @@ class Emitter
Contract String, @@StrictConfigHash => lambda { |x| x.is_a? Emitter }
def initialize(endpoint, config={})
config = @@DefaultConfig.merge(config)
@lock = Monitor.new
@collector_uri = as_collector_uri(endpoint, config[:protocol], config[:port], config[:method])
@buffer = []
if not config[:buffer_size].nil?
@buffer_size = config[:buffer_size]
elsif config[:method] == 'get'
@buffer_size = 0
@buffer_size = 1
else
@buffer_size = 10
end
@method = config[:method]
@on_success = config[:on_success]
@on_failure = config[:on_failure]
@threads = []
LOGGER.info("#{self.class} initialized with endpoint #{@collector_uri}")

self
Expand All @@ -80,9 +82,11 @@ def as_collector_uri(endpoint, protocol, port, method)
Contract Hash => nil
def input(payload)
payload.each { |k,v| payload[k] = v.to_s}
@buffer.push(payload)
if @buffer.size > @buffer_size
flush
@lock.synchronize do
@buffer.push(payload)
if @buffer.size >= @buffer_size
flush
end
end

nil
Expand All @@ -91,58 +95,70 @@ def input(payload)
# Flush the buffer
#
Contract Bool => nil
def flush(sync=false)
send_requests

def flush(async=true)
@lock.synchronize do
send_requests(@buffer)
@buffer = []
end
nil
end

# Send all events in the buffer to the collector
#
Contract None => nil
def send_requests
LOGGER.info("Attempting to send #{@buffer.size} request#{@buffer.size == 1 ? '' : 's'}")
temp_buffer = @buffer
@buffer = []
Contract ArrayOf[Hash] => nil
def send_requests(evts)
if evts.size < 1
LOGGER.info("Skipping sending events since buffer is empty")
return
end
LOGGER.info("Attempting to send #{evts.size} request#{evts.size == 1 ? '' : 's'}")

if @method == 'post'
post_succeeded = false
begin
request = http_post(SelfDescribingJson.new(
'iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-2',
evts
).to_json)
post_succeeded = is_good_status_code(request.code)
rescue StandardError => se
LOGGER.warn(se)
end
if post_succeeded
unless @on_success.nil?
@on_success.call(evts.size)
end
else
unless @on_failure.nil?
@on_failure.call(0, evts)
end
end

if @method == 'get'
elsif @method == 'get'
success_count = 0
unsent_requests = []
temp_buffer.each do |payload|
request = http_get(payload)
if request.code.to_i == 200
success_count += 1
else
unsent_requests.push(payload)
evts.each do |evt|
get_succeeded = false
begin
request = http_get(evt)
get_succeeded = is_good_status_code(request.code)
rescue StandardError => se
LOGGER.warn(se)
end
if unsent_requests.size == 0
unless @on_success.nil?
@on_success.call(success_count)
end
if get_succeeded
success_count += 1
else
unless @on_failure.nil?
@on_failure.call(success_count, unsent_requests)
end
unsent_requests << evt
end
end

elsif @method == 'post'
if temp_buffer.size > 0
request = http_post({
'schema' => 'iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-2',
'data' => temp_buffer
})

if request.code.to_i == 200
unless @on_success.nil?
@on_success.call(temp_buffer.size)
end
else
unless @on_failure.nil?
@on_failure.call(0, temp_buffer)
end
if unsent_requests.size == 0
unless @on_success.nil?
@on_success.call(success_count)
end
else
unless @on_failure.nil?
@on_failure.call(success_count, unsent_requests)
end

end
end

Expand All @@ -162,7 +178,7 @@ def http_get(payload)
http.use_ssl = true
end
response = http.request(request)
LOGGER.add(response.code == '200' ? Logger::INFO : Logger::WARN) {
LOGGER.add(is_good_status_code(response.code) ? Logger::INFO : Logger::WARN) {
"GET request to #{@collector_uri} finished with status code #{response.code}"
}

Expand All @@ -184,13 +200,20 @@ def http_post(payload)
request.body = payload.to_json
request.set_content_type('application/json; charset=utf-8')
response = http.request(request)
LOGGER.add(response.code == '200' ? Logger::INFO : Logger::WARN) {
LOGGER.add(is_good_status_code(response.code) ? Logger::INFO : Logger::WARN) {
"POST request to #{@collector_uri} finished with status code #{response.code}"
}

response
end

# Only 2xx and 3xx status codes are considered successes
#
Contract String => Bool
def is_good_status_code(status_code)
  # Convert the textual status once; anything from 200 through 399
  # (every 2xx and 3xx code) counts as a success.
  numeric_code = status_code.to_i
  numeric_code.between?(200, 399)
end

private :as_collector_uri,
:http_get,
:http_post
Expand All @@ -200,27 +223,54 @@ def http_post(payload)

class AsyncEmitter < Emitter

# Flush the buffer in a new thread
# If sync is true, block until all flushing threads have exited
#
def flush(sync=false)
t = Thread.new do
send_requests
end
t.abort_on_exception = true
@threads.select!{ |thread| thread.alive?}
@threads.push(t)

if sync
LOGGER.info('Starting synchronous flush')
@threads.each do |thread|
thread.join(10)
Contract String, @@StrictConfigHash => lambda { |x| x.is_a? Emitter }
def initialize(endpoint, config={})
  # Work queue consumed by the background threads started below.
  @queue = Queue.new
  # The queue is extended with MonitorMixin so that @all_processed_condition and
  # @results_unprocessed can emulate Python's Queue.task_done()
  @queue.extend(MonitorMixin)
  @results_unprocessed = 0
  @all_processed_condition = @queue.new_cond
  # One consumer thread unless config[:thread_count] asks for more.
  worker_total = config[:thread_count] || 1
  worker_total.times { Thread.new { consume } }
  super(endpoint, config)
end

nil
# Worker loop run by each thread started in initialize.
# Repeatedly takes one work unit off @queue, hands it to send_requests,
# and then records that the unit has been processed. Never returns.
def consume
loop do
# Queue#pop blocks while the queue is empty, so idle workers wait here.
work_unit = @queue.pop
send_requests(work_unit)
# Under the queue's monitor: mark one unit as done and wake any thread
# waiting on @all_processed_condition so it can re-check the counter.
@queue.synchronize do
@results_unprocessed -= 1
@all_processed_condition.broadcast
end
end
end

# Flush the buffer by handing its contents to the worker queue.
# If async is false, block until @results_unprocessed has dropped to zero,
# i.e. until every queued work unit has been passed through send_requests.
# The hand-off is repeated until @buffer is found empty after a pass.
#
def flush(async=true)
loop do
# Holding @lock, move the current buffer onto the queue and start a new one.
@lock.synchronize do
# Count the work unit as pending before it becomes visible to the workers.
@queue.synchronize do
@results_unprocessed += 1
end
@queue << @buffer
@buffer = []
end
if not async
LOGGER.info('Starting synchronous flush')
# wait_while releases the queue's monitor while waiting; consume broadcasts
# the condition after each work unit it finishes.
@queue.synchronize do
@all_processed_condition.wait_while { @results_unprocessed > 0 }
LOGGER.info('Finished synchronous flush')
end
end
# NOTE(review): @buffer is read here without holding @lock.
break if @buffer.size < 1
end
end
end

end
3 changes: 2 additions & 1 deletion lib/snowplow-tracker/payload.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
require 'json'
require 'net/http'
require 'contracts'
include Contracts

module SnowplowTracker

class Payload

include Contracts

attr_reader :context

Contract nil => Payload
Expand Down
34 changes: 34 additions & 0 deletions lib/snowplow-tracker/self_describing_json.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright (c) 2013-2014 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0,
# and you may not use this file except in compliance with the Apache License Version 2.0.
# You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the Apache License Version 2.0 is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.

# Author:: Alex Dean, Fred Blundun (mailto:support@snowplowanalytics.com)
# Copyright:: Copyright (c) 2013-2014 Snowplow Analytics Ltd
# License:: Apache License Version 2.0

module SnowplowTracker

# Pairs a schema identifier with the data that the schema describes.
class SelfDescribingJson

  # schema:: the schema identifier stored under :schema
  # data::   the payload stored under :data
  def initialize(schema, data)
    @schema, @data = schema, data
  end

  # Builds the self-describing structure.
  # Note that this returns a Hash with the keys :schema and :data (in that
  # order), not a serialized JSON string.
  def to_json
    { schema: @schema, data: @data }
  end

end

end
Loading

0 comments on commit efddb28

Please sign in to comment.