forked from capotej/watercoolr
-
Notifications
You must be signed in to change notification settings - Fork 1
/
watercoolr.rb
225 lines (208 loc) · 6.58 KB
/
watercoolr.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
require 'rubygems'
require 'sinatra'
require 'sequel'
require 'zlib'
require 'json'
require 'crack'
require 'httpclient'
begin
require 'system_timer'
MyTimer = SystemTimer
rescue
require 'timeout'
MyTimer = Timeout
end
configure do
DB = Sequel.connect(ENV['DATABASE_URL'] || 'sqlite://watercoolr.db')
unless DB.table_exists? "channels"
DB.create_table :channels do
primary_key :id
varchar :name, :size => 32
# channel types: 'github', 'pingfm', 'superfeedr' etc.
varchar :type, :size => 32, :default => 'seq'
# pubsubhubbub support
varchar :topic, :size => 128
# protected channels
varchar :secret, :size => 32
time :created
time :updated
index [:updated]
index [:topic]
index [:name], :unique => true
end
# system channels
DB[:channels] << { :name => '__pingfm__', :created => Time.now,
:type => 'pingfm', :secret => 'change_me' }
DB[:channels] << { :name => '__github__', :created => Time.now,
:type => 'github', :secret => 'change_me' }
end
unless DB.table_exists? "subscribers"
DB.create_table :subscribers do
primary_key :id
foreign_key :channel_id
varchar :url, :size => 128
varchar :type, :size => 32, :default => 'github'
integer :state, :default => 0 # 0 - verified, 1 - need verification
text :data
index [:channel_id, :url], :unique => true
end
end
unless DB.table_exists? "users"
DB.create_table :users do
primary_key :id
varchar :name, :size => 32
varchar :password, :size => 32
varchar :service, :size => 32
index [:name, :service], :unique => true
end
# Need to have at least admin user
DB[:users] << { :name => 'admin', :password => 'change_me', :service => 'self' }
# All hooks library URLs will be /hook/:name/:secret/
# default secret
DB[:users] << { :name => 'all', :password => 'change_me', :service => 'hooks' }
# secret per hook
# DB[:users] << { :name => 'ff', :password => 'change_me_too', :service => 'hooks' }
end
end
helpers do
def protected!
response['WWW-Authenticate'] = %(Basic realm="HTTP Auth") and \
throw(:halt, [401, "Not authorized\n"]) and return unless authorized?
end
def authorized?
@auth ||= Rack::Auth::Basic::Request.new(request.env)
return false unless @auth.provided? && @auth.basic? && @auth.credentials
user,pass = @auth.credentials
return false unless DB[:users].filter(:name => user, :service => 'self', :password => pass).first
true
end
def gen_id
base = rand(100000000).to_s
salt = Time.now.to_s
Zlib.crc32(base + salt).to_s(36)
end
def marshal(string)
[Marshal.dump(string)].pack('m*').strip!
end
def unmarshal(str)
Marshal.load(str.unpack("m")[0])
end
def atom_time(date)
date.getgm.strftime("%Y-%m-%dT%H:%M:%SZ")
end
def atom_parse(text)
atom = Crack::XML.parse(text)
r = []
if atom["feed"]["entry"].kind_of?(Array)
atom["feed"]["entry"].each { |e|
r << {:id => e["id"], :title => e["title"], :published => e["published"] }
}
else
e = atom["feed"]["entry"]
r = {:id => e["id"], :title => e["title"], :published => e["published"] }
end
r
end
# post a message to a list of subscribers (urls)
def postman(channel, msg)
subs = DB[:subscribers].filter(:channel_id => channel).to_a
return { :status => 'FAIL' } unless (subs and msg)
subs.each do |sub|
begin
raise "No valid URL provided" unless sub[:url]
# remove sensitive data for the 'debug' subscribers
data = (sub[:type] == 'debug') ? {} : unmarshal(sub[:data])
MyTimer.timeout(5) do
# see: http://messagepub.com/documentation/api
if sub[:type] == 'messagepub'
MPubClient.post(sub[:url], msg, data)
else
HTTPClient.post(sub[:url],
:payload => {:message => msg}.to_json,
:data => data.to_json)
end
end
rescue Exception => e
case e
when Timeout::Error
puts "Timeout: #{sub[:url]}"
else
puts e.to_s
end
next
end
end
return {:status => 'OK'}
end
end
# support for specific publisher and subscribers
# comment following lines if not needed
load 'pubs.rb'
load 'subs.rb'
load 'hubbub.rb'
load 'hooks.rb'
get '/' do
erb :index
end
post '/channels' do
protected!
id = gen_id
begin
data = JSON.parse(params[:data])
# superfeedr api key
id = data['id'] if data['id'] && (data['type'] == 'superfeedr')
if data['topic'] && (data['type'] == 'pubsubhubbub')
topic = [data['topic']].pack("m*").strip
else
topic = nil
end
type = data['type'] || 'seq'
secret = data['secret']
rescue Exception => e
puts e.to_s
type = 'seq'
end
unless DB[:channels].filter(:name => id).first
DB[:channels] << { :name => id, :topic => topic, :created => Time.now,
:type => type, :secret => secret }
end
{ :id => id.to_s }.to_json
end
post '/subscribe' do
begin
data = JSON.parse(params[:data])
raise "missing URL in the 'data' parameter" unless url = data['url']
channel_name = data['channel'] || 'boo'
type = data['type'] || 'debug'
['url', 'channel', 'type'].each { |d| data.delete(d) }
rec = DB[:channels].filter(:name => channel_name).first
raise "channel #{channel_name} does not exists" unless rec[:id]
if rec[:secret]
raise "not authorized" unless rec[:secret] == data['secret']
data.delete('secret')
end
unless DB[:subscribers].filter(:channel_id => rec[:id], :url => url).first
raise "DB insert failed" unless DB[:subscribers] << {
:channel_id => rec[:id],
:url => url,
:type => type,
:data => marshal(data) }
end
{:status => 'OK'}.to_json
rescue Exception => e
{:status => e.to_s}.to_json
end
end
# general publisher - data contain both channel name and message
post '/publish' do
begin
data = JSON.parse(params[:payload])
channel_name = data['channel'] || 'boo'
message = data['message']
rec = DB[:channels].filter(:name => channel_name).first
raise "channel #{channel_name} does not exists" unless rec[:id]
postman(rec[:id], message).to_json
rescue Exception => e
{:status => e.to_s}.to_json
end
end