forked from stuartcarnie/qless-core
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathresource.lua
More file actions
241 lines (199 loc) · 6.38 KB
/
resource.lua
File metadata and controls
241 lines (199 loc) · 6.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
-------------------------------------------------------------------------------
-- Resource Class
--
-- Returns an object that represents a resource with the provided RID
-------------------------------------------------------------------------------
----
-- This gets all the data associated with the resource with the provided id. If the
-- job is not found, it returns nil. If found, it returns an object with the
-- appropriate properties
function QlessResource:data(...)
local res = redis.call(
'hmget', QlessResource.ns .. self.rid, 'rid', 'max')
-- Return nil if we haven't found it
if not res[1] then
return nil
end
local data = {
rid = res[1],
max = tonumber(res[2] or 0),
pending = self:pending(),
locks = self:locks(),
}
return data
end
function QlessResource:get(...)
local res = redis.call(
'hmget', QlessResource.ns .. self.rid, 'rid', 'max')
-- Return nil if we haven't found it
if not res[1] then
return nil
end
return tonumber(res[2] or 0)
end
function QlessResource:set(now, max)
local max = assert(tonumber(max), 'Set(): Arg "max" not a number: ' .. tostring(max))
local current_max = self:get()
if current_max == nil then
current_max = max
end
local keyLocks = self:prefix('locks')
local current_locks = redis.pcall('scard', keyLocks)
-- get the max of the current limit and the current locks
-- this is just in case the limit was decreased immediately before and the locks have not come down to the limit yet.
local confirm_limit = math.max(current_max,current_locks)
local max_change = max - confirm_limit
local keyPending = self:prefix('pending')
redis.call('hmset', QlessResource.ns .. self.rid, 'rid', self.rid, 'max', max);
if max_change > 0 then
local jids = redis.call('zrevrange', keyPending, 0, max_change - 1, 'withscores')
local jid_count = #jids
if jid_count == 0 then
return self.rid
end
for i = 1, jid_count, 2 do
local newJid = jids[i]
local score = jids[i + 1]
-- we know there is capacity to get this released resource, need to check all resources in case waiting on multiple
if Qless.job(newJid):acquire_resources(now) then
local data = Qless.job(newJid):data()
local queue = Qless.queue(data['queue'])
queue.work.add(score, 0, newJid)
end
end
end
return self.rid
end
function QlessResource:unset()
return redis.call('del', QlessResource.ns .. self.rid);
end
function QlessResource:prefix(group)
if group then
return QlessResource.ns..self.rid..'-'..group
end
return QlessResource.ns..self.rid
end
function QlessResource:acquire(now, priority, jid)
local keyLocks = self:prefix('locks')
local max = self:get()
if max == nil then
error({code=1, msg='Acquire(): resource ' .. self.rid .. ' does not exist'})
end
if type(jid) ~= 'string' then
error({code=2, msg='Acquire(): invalid jid; expected string, got \'' .. type(jid) .. '\''})
end
-- check if already has a lock, then just return. This is used for when multiple resources are needed.
if redis.call('sismember', self:prefix('locks'), jid) == 1 then
return true
end
local remaining = max - redis.pcall('scard', keyLocks)
if remaining > 0 then
-- acquire a lock and release it from the pending queue
redis.call('sadd', keyLocks, jid)
redis.call('zrem', self:prefix('pending'), jid)
return true
end
-- check if already pending, then don't update its priority.
if redis.call('zscore', self:prefix('pending'), jid) == false then
redis.call('zadd', self:prefix('pending'), priority - (now / 10000000000), jid)
end
return false
end
--- Releases the resource for the specified job identifier and assigns it to the next waiting job
-- @param now
-- @param jid
--
function QlessResource:release(now, jid)
local keyLocks = self:prefix('locks')
local keyPending = self:prefix('pending')
redis.call('srem', keyLocks, jid)
redis.call('zrem', keyPending, jid)
local jids = redis.call('zrevrange', keyPending, 0, 0, 'withscores')
if #jids == 0 then
return false
end
local newJid = jids[1]
local score = jids[2]
-- we know there is capacity to get this released resource but need to check all resources in case multiple.
if Qless.job(newJid):acquire_resources(now) then
local data = Qless.job(newJid):data()
local queue = Qless.queue(data['queue'])
queue.work.add(score, 0, newJid)
end
return newJid
end
--- Return the list of job IDs with locks for this resource
--
function QlessResource:locks()
return redis.call('smembers', self:prefix('locks'))
end
--- Return the number of active locks for this resource
--
function QlessResource:lock_count()
return redis.call('scard', self:prefix('locks'))
end
--- Return the list of job identifiers waiting for this resource
--
function QlessResource:pending()
return redis.call('zrevrange', self:prefix('pending'), 0, -1)
end
--- Return the number of jobs waiting for this resource
--
function QlessResource:pending_count()
return redis.call('zcard', self:prefix('pending'))
end
function QlessResource:exists()
return redis.call('exists', self:prefix()) == 1
end
---- Return true if all resources exist
--
function QlessResource.all_exist(resources)
for _, res in ipairs(resources) do
if redis.call('exists', QlessResource.ns .. res) == 0 then
return false
end
end
return true
end
-- Return resources pending
-- [
-- {
-- 'name': 'res',
-- 'count': 5
-- }, {
-- 'name': 'res',
-- 'count': 2
-- }
-- ]
function QlessResource.pending_counts(now)
local search = QlessResource.ns..'*-pending'
local reply = redis.call('keys', search)
local response = {}
for index, rname in ipairs(reply) do
local count = redis.call('zcard', rname)
local resStat = {name = rname, count = count}
table.insert(response,resStat)
end
return response
end
-- Return resources locks
-- [
-- {
-- 'name': 'res',
-- 'count': 5
-- }, {
-- 'name': 'res',
-- 'count': 2
-- }
-- ]
function QlessResource.locks_counts(now)
local search = QlessResource.ns..'*-locks'
local reply = redis.call('keys', search)
local response = {}
for index, rname in ipairs(reply) do
local count = redis.call('scard', rname)
local resStat = {name = rname, count = count}
table.insert(response,resStat)
end
return response
end