forked from taskcluster/taskcluster
-
Notifications
You must be signed in to change notification settings - Fork 0
/
0072.yml
143 lines (142 loc) · 6.18 KB
/
0072.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
version: 72
description: add last_date_active to queue_workers
migrationScript: |
begin
alter table queue_workers add column last_date_active timestamp with time zone;
end
downgradeScript: |
begin
alter table queue_workers drop column last_date_active;
end
methods:
quarantine_queue_worker:
deprecated: true
get_queue_worker_tqid:
deprecated: true
get_queue_workers_tqid:
deprecated: true
queue_worker_seen:
deprecated: true
queue_worker_seen_with_last_date_active:
description: |-
Recognize that a worker has been seen by the queue, creating it if necessary. This is called
when workers claim or re-claim work. The expiration time is not allowed to move backward.
Will also always bump its last date active time.
This function always writes to the DB, so calls should be suitably rate-limited at the
client side.
mode: write
serviceName: queue
args: task_queue_id_in text, worker_group_in text, worker_id_in text, expires_in timestamptz
returns: void
body: |-
begin
insert
into queue_workers (task_queue_id, worker_group, worker_id, quarantine_until, expires, first_claim, recent_tasks, last_date_active)
values (
task_queue_id_in,
worker_group_in,
worker_id_in,
now() - interval '10 years',
expires_in,
now(),
jsonb_build_array(),
now()
)
on conflict (task_queue_id, worker_group, worker_id) do update
set
expires = greatest(coalesce(expires_in, queue_workers.expires), queue_workers.expires),
last_date_active = now()
where
queue_workers.task_queue_id = task_queue_id_in and
queue_workers.worker_group = worker_group_in and
queue_workers.worker_id = worker_id_in;
end
quarantine_queue_worker_with_last_date_active:
description: |-
Update the quarantine_until date for a worker. The Queue service interprets a date in the past
as "not quarantined". This function also "bumps" the expiration of the worker so that un-quarantined
workers do not immediately expire. Returns the worker row just as get_queue_worker would, or no rows if
no such worker exists.
mode: write
serviceName: queue
args: task_queue_id_in text, worker_group_in text, worker_id_in text, quarantine_until_in timestamptz
returns: table(task_queue_id text, worker_group text, worker_id text, quarantine_until timestamptz, expires timestamptz, first_claim timestamptz, recent_tasks jsonb, last_date_active timestamptz)
body: |-
begin
return query update queue_workers
set
quarantine_until = quarantine_until_in,
expires = greatest(queue_workers.expires, now() + interval '1 day')
where
queue_workers.task_queue_id = task_queue_id_in and
queue_workers.worker_group = worker_group_in and
queue_workers.worker_id = worker_id_in
returning
queue_workers.task_queue_id,
queue_workers.worker_group,
queue_workers.worker_id,
queue_workers.quarantine_until,
queue_workers.expires,
queue_workers.first_claim,
queue_workers.recent_tasks,
queue_workers.last_date_active;
end
get_queue_worker_tqid_with_last_date_active:
description: |-
Get a non-expired queue worker by task_queue_id, worker_group, and worker_id.
Workers are not considered expired until after their quarantine date expires.
mode: read
serviceName: queue
args: task_queue_id_in text, worker_group_in text, worker_id_in text, expires_in timestamptz
returns: table(task_queue_id text, worker_group text, worker_id text, quarantine_until timestamptz, expires timestamptz, first_claim timestamptz, recent_tasks jsonb, last_date_active timestamptz, etag uuid)
body: |-
begin
return query
select
queue_workers.task_queue_id,
queue_workers.worker_group,
queue_workers.worker_id,
queue_workers.quarantine_until,
queue_workers.expires,
queue_workers.first_claim,
queue_workers.recent_tasks,
queue_workers.last_date_active,
public.gen_random_uuid()
from queue_workers
where
queue_workers.task_queue_id = task_queue_id_in and
queue_workers.worker_group = worker_group_in and
queue_workers.worker_id = worker_id_in and
(queue_workers.expires > expires_in or queue_workers.quarantine_until > expires_in);
end
get_queue_workers_tqid_with_last_date_active:
description: |-
Get non-expired queue workers ordered by task_queue_id, worker_group, and worker_id.
Workers are not considered expired until after their quarantine date expires.
If the pagination arguments are both NULL, all rows are returned.
Otherwise, page_size rows are returned at offset page_offset.
mode: read
serviceName: queue
args: task_queue_id_in text, expires_in timestamptz, page_size_in integer, page_offset_in integer
returns: table(task_queue_id text, worker_group text, worker_id text, quarantine_until timestamptz, expires timestamptz, first_claim timestamptz, recent_tasks jsonb, last_date_active timestamptz, etag uuid)
body: |-
begin
return query
select
queue_workers.task_queue_id,
queue_workers.worker_group,
queue_workers.worker_id,
queue_workers.quarantine_until,
queue_workers.expires,
queue_workers.first_claim,
queue_workers.recent_tasks,
queue_workers.last_date_active,
public.gen_random_uuid()
from queue_workers
where
(queue_workers.task_queue_id = task_queue_id_in or get_queue_workers_tqid_with_last_date_active.task_queue_id_in is null) and
((queue_workers.expires > expires_in and queue_workers.quarantine_until < expires_in) or get_queue_workers_tqid_with_last_date_active.expires_in is null)
order by task_queue_id, worker_group, worker_id
limit get_page_limit(page_size_in)
offset get_page_offset(page_offset_in);
end