-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathsubscribe.lua
48 lines (40 loc) · 1.36 KB
/
subscribe.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
--- Stream the `counter_sum` view through a SUBSCRIBE cursor and print the
-- accumulated state every time a progress marker closes a batch of updates.
--
-- Flow:
--   1. connect through LuaSQL and declare cursor `c` over the SUBSCRIBE query;
--   2. poll the cursor forever with `FETCH ALL c`;
--   3. buffer data rows; when a progress row arrives after at least one data
--      row, apply the buffer to the State object and print the result.
--
-- The loop has no normal exit. It only stops when something raises, in which
-- case the connection and environment are released before the error is
-- re-raised.
local utils = require "utils"
local driver = require "luasql.postgres"
local env = assert(driver.postgres())
local State = require("state")
local con = assert(env:connect("postgresql://MATERIALIZE_USERNAME:MATERIALIZE_PASSWORD@MATERIALIZE_HOST:6875/materialize?sslmode=require"))

-- Autocommit is turned off before the cursor is declared so that the DECLARE
-- and every later FETCH run inside the same open transaction.
con:setautocommit(false)
assert(con:execute("DECLARE c CURSOR FOR SUBSCRIBE (SELECT sum FROM counter_sum) WITH (PROGRESS);"))

--- Poll the cursor forever, folding updates into the state and printing it.
-- Never returns normally; it only leaves by raising an error.
local function consume()
  -- These three live for the whole subscription, NOT for a single FETCH:
  -- a batch returned by FETCH may end before its progress row arrives, and
  -- diffs are relative to what was applied earlier.
  -- NOTE(review): assumes State accumulates across update() calls - confirm
  -- against the `state` module.
  local state = State:new(false)
  local buffer = {}
  local updated = false

  while true do
    for mz_timestamp, mz_progressed, mz_diff, sum in utils.rows(con, "FETCH ALL c") do
      -- Column values are compared as strings: 'f' marks a data row,
      -- anything else is treated as a progress row.
      if mz_progressed ~= 'f' then
        -- Progress row: flush only if data arrived since the last flush.
        if updated then
          updated = false
          state:update(buffer, mz_timestamp)
          buffer = {}
          print("Sum: ", table.concat(state:getState(), ','))
        end
      else
        -- Data row: remember the value together with its numeric diff.
        updated = true
        buffer[#buffer + 1] = {
          value = sum,
          diff = tonumber(mz_diff)
        }
      end
    end
  end
end

-- Run the loop protected so the cleanup below is actually reachable.
local ok, err = pcall(consume)

con:commit()
con:close()
env:close()

if not ok then
  -- Level 0 keeps the original message (and its position info) untouched.
  error(err, 0)
end