-
Notifications
You must be signed in to change notification settings - Fork 0
/
socket.rb
143 lines (117 loc) · 3.72 KB
/
socket.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
require "eventmachine"
require "em-websocket"
require "colorize"
require "jsonify"
require "em-jack"
require "./lib/cache.rb"
class String
def from_json
JSON.parse(self)
rescue JSON::ParseError
return nil
end
end
#
# @message String Message to be printed to console
#
def debug(message)
puts "%s %s" % ["==>".black, message.to_s.green]
end
port = ARGV[0] || 3333
EM.run do
jack = EMJack::Connection.new(tube: "linjekoll.socket-server")
channel = EM::Channel.new
redis_cache = Cache.new
EventMachine::WebSocket.start(host: "0.0.0.0", port: port) do |ws|
# @event String Event that should be triggered on the client side.
# @data Object Data that should be pushed to the given client
# @data could be anything that has implemented #to_json
def ws.trigger(event, data)
self.send({
data: data,
event: event
}.to_json.force_encoding("BINARY"))
end
ws.onopen do
debug "WebSocket connection open."
sid = nil
list = []
ws.onmessage do |ingoing|
ingoing = ingoing.from_json
# This must be an array, otherwise we abort.
unless ingoing.is_a?(Hash)
ws.trigger("error", {
message: "Invalid data, should be an array.",
ingoing: ingoing
})
debug("Invalid 1: #{ingoing}"); next
end
# If this isn't the correct event, abort!
unless ingoing["event"] == "subscribe.trip.update"
ws.trigger("error", {
message: "Invalid event.",
ingoing: ingoing
})
debug("Invalid 2: #{ingoing.inspect}"); next
end
# Client could send invalid data, if so; abort!
if not notification = ingoing["data"] or not notification.is_a?(Array)
ws.trigger("error", {
message: "Received data was invalid.",
ingoing: ingoing
})
debug("Invalid 3: #{ingoing.inspect}"); next
end
unless notification.first.is_a?(Hash)
ws.trigger("error", {
message: "Invalid data",
ingoing: ingoing
})
debug("Invalid 4: #{ingoing.inspect}"); next
end
# Let's print the given data
debug("Data push from client: #{notification.inspect}")
# @listen Should now only contain new lines
listen = notification.uniq - list
# Nothing to listen for?
next if listen.empty?
list.push(*listen)
# Do we have any cached data to respond with?
listen.each do |what|
cache = redis_cache.read(what)
if cache
ws.trigger("update.trip", cache)
end
end
sid = channel.subscribe do |data|
listen.each do |message|
# Do we have any data to push to user?
#if ["provider_id", "line_id"].all?{|w| message[w].to_s == data[w].to_s}
ws.trigger("update.trip", data)
debug("Pushing :" + data.inspect)
#end
end
end
end
ws.onclose do
channel.unsubscribe(sid)
end
end
end
jack.each_job do |job|
debug "Ingoing job with id #{job.jobid} and size #{job.body.size}."
begin
parsed = JSON.parse(job.body)
rescue JSON::ParserError
debug $!.message
ensure
jack.delete(job)
end
next if parsed.nil?
# Everything should be saved to cache
redis_cache.save!(parsed)
# Push data to client
channel.push(parsed)
end
debug "Server started on port #{port}."
end