forked from taskcluster/taskcluster
-
Notifications
You must be signed in to change notification settings - Fork 0
/
0003.yml
157 lines (152 loc) · 5.4 KB
/
0003.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
version: 3
description: define all taskcluster-lib-azqueue tables and functions
migrationScript: |-
  begin
    -- Backing store for every queue: one row per message, with the owning
    -- queue recorded in queue_name.
    create table azure_queue_messages (
      message_id uuid not null,
      queue_name text not null,
      message_text text not null,
      inserted timestamp not null,
      -- the message may only be handed out once this time has passed
      visible timestamp not null,
      -- the message is treated as gone once this time has passed
      expires timestamp not null,
      -- stays null for as long as the message has never been popped
      pop_receipt uuid,
      primary key (message_id)
    );

    -- Messages are fetched oldest-first within a single queue, so index on
    -- the queue and then the insertion time.
    create index azure_queue_messages_inserted on azure_queue_messages(queue_name, inserted);

    grant select, insert, update, delete on azure_queue_messages to $db_user_prefix$_queue;
  end
downgradeScript: |-
  begin
    -- Reverse of the migration: take the queue user's privileges away first,
    -- then remove the table (its index goes with it).
    revoke select, insert, update, delete on azure_queue_messages from $db_user_prefix$_queue;

    drop table azure_queue_messages;
  end
methods:
azure_queue_count:
  description: |
    Count the messages in the named queue that have not yet expired.
  mode: read
  serviceName: queue
  args: queue_name text
  returns: integer
  # Expired messages stay in the table until `azure_queue_delete_expired`
  # removes them, so they are filtered out here with the same `expires > now()`
  # test that `azure_queue_get` applies; otherwise the count would include
  # messages that can never be returned.  Messages that are not yet visible
  # are still counted.
  #
  # This is a plain `select`, which takes no row-level locks and therefore
  # does not wait on rows that a concurrent `azure_queue_get` has locked.
  body: |-
    begin
      return (
        select count(*)
        from azure_queue_messages msgs
        where msgs.queue_name = azure_queue_count.queue_name
          and msgs.expires > now()
      );
    end
azure_queue_put:
  description: |
    Put the given message into the given queue. The message will not be visible until
    after the visible timestamp, and will disappear after the expires timestamp.
  mode: write
  serviceName: queue
  args: queue_name text, message_text text, visible timestamp, expires timestamp
  returns: void
  # The message id is generated here and `inserted` is stamped with the
  # current time; `pop_receipt` is left null.  After the insert, a `notify` is
  # issued on a channel named after the queue.
  body: |-
    begin
      insert into azure_queue_messages (
        message_id,
        queue_name,
        message_text,
        inserted,
        visible,
        expires
      ) values (
        public.gen_random_uuid(),
        azure_queue_put.queue_name,
        azure_queue_put.message_text,
        now(),
        azure_queue_put.visible,
        azure_queue_put.expires
      );

      -- notify takes an identifier rather than a value, so the statement is
      -- built dynamically with the queue name quoted as an identifier
      execute 'notify ' || quote_ident(azure_queue_put.queue_name);
    end
azure_queue_get:
  description: |
    Get up to `count` messages from the given queue, setting the `visible`
    column of each to the given value. Returns a `message_id` and
    `pop_receipt` for each one, for use with `azure_queue_delete` and
    `azure_queue_update`.
  mode: write
  serviceName: queue
  args: queue_name text, visible timestamp, count integer
  returns: table (message_id uuid, message_text text, pop_receipt uuid)
  # This uses the queueing logic described many places around the web but
  # best in https://tnishimura.github.io/articles/queues-in-postgresql/
  #
  # The oldest messages that are currently visible and unexpired are selected
  # with
  #
  #   for update   -- locks the selected rows
  #   skip locked  -- passes over rows another transaction already has locked
  #
  # The first ensures that no two concurrent transactions return the same
  # message; the second lets concurrent transactions immediately pick
  # distinct messages instead of queueing up behind one another.
  #
  # The selection is written as a CTE rather than an `in (...)` sub-select so
  # that it is evaluated once per call.  A sub-select can be rescanned by some
  # plans; each rescan would skip the rows this statement has already updated
  # and lock further ones, so more than `count` messages could be claimed.
  body: |-
    begin
      return query
      with claimable as (
        select candidate.message_id
        from azure_queue_messages candidate
        where candidate.queue_name = azure_queue_get.queue_name
          and candidate.visible <= now()
          and candidate.expires > now()
        order by candidate.inserted
        limit count
        for update skip locked
      )
      update azure_queue_messages claimed
      set
        pop_receipt = public.gen_random_uuid(),
        visible = azure_queue_get.visible
      where
        claimed.message_id in (select claimable.message_id from claimable)
      returning claimed.message_id, claimed.message_text, claimed.pop_receipt;
    end
azure_queue_delete:
  description: |
    Delete the message identified by the given `queue_name`, `message_id` and
    `pop_receipt`.
  mode: write
  serviceName: queue
  args: queue_name text, message_id uuid, pop_receipt uuid
  returns: void
  # All three values must match the stored row.  If any of them does not
  # (for example the pop_receipt has since been replaced), no row matches
  # and the call returns without deleting anything.
  body: |-
    begin
      delete from azure_queue_messages doomed
      where doomed.message_id = azure_queue_delete.message_id
        and doomed.queue_name = azure_queue_delete.queue_name
        and doomed.pop_receipt = azure_queue_delete.pop_receipt;
    end
azure_queue_update:
  description: |
    Update the message identified by the given `queue_name`, `message_id` and
    `pop_receipt`, setting its `visible` and `message_text` properties as
    given.
  mode: write
  serviceName: queue
  args: queue_name text, message_text text, message_id uuid, pop_receipt uuid, visible timestamp
  returns: void
  # The row is changed only when queue_name, message_id and pop_receipt all
  # match; otherwise nothing is updated and the call still returns normally.
  # The pop_receipt itself is left as it was.
  body: |-
    begin
      update azure_queue_messages target
      set
        visible = azure_queue_update.visible,
        message_text = azure_queue_update.message_text
      where target.message_id = azure_queue_update.message_id
        and target.queue_name = azure_queue_update.queue_name
        and target.pop_receipt = azure_queue_update.pop_receipt;
    end
azure_queue_delete_expired:
  description: |
    Delete all expired messages. This is a maintenance task that should occur
    about once an hour.
  mode: write
  serviceName: queue
  args: ''
  returns: void
  # Works across every queue at once.  A message whose `expires` equals the
  # current time is already treated as expired, which is the complement of
  # the `expires > now()` test `azure_queue_get` uses when handing out
  # messages.
  body: |-
    begin
      delete from azure_queue_messages stale
      where now() >= stale.expires;
    end