For an overview of how to use php-resque, see README.md.
The following is a step-by-step breakdown of how php-resque operates.
What happens when you call Resque::enqueue()?
Resque::enqueue()callsResque_Job::create()with the same arguments it received.Resque_Job::create()checks that your$args(the third argument) are eithernullor in an arrayResque_Job::create()generates a job ID (a "token" in most of the docs)Resque_Job::create()pushes the job to the requested queue (first argument)Resque_Job::create(), if status monitoring is enabled for the job (fourth argument), callsResque_Job_Status::create()with the job ID as its only argumentResque_Job_Status::create()creates a key in Redis with the job ID in its name, and the current status (as well as a couple of timestamps) as its value, then returns control toResque_Job::create()Resque_Job::create()returns control toResque::enqueue(), with the job ID as a return valueResque::enqueue()triggers theafterEnqueueevent, then returns control to your application, again with the job ID as its return value
How do the workers process the queues?
Resque_Worker::work(), the main loop of the worker process, callsResque_Worker->reserve()to check for a jobResque_Worker->reserve()checks whether to use blocking pops or not (fromBLOCKING), then acts accordingly:
- Blocking Pop
Resque_Worker->reserve()callsResque_Job::reserveBlocking()with the entire queue list and the timeout (fromINTERVAL) as argumentsResque_Job::reserveBlocking()callsResque::blpop()(which in turn calls Redis'blpop, after prepping the queue list for the call, then processes the response for consistency with other aspects of the library, before finally returning control [and the queue/content of the retrieved job, if any] toResque_Job::reserveBlocking())Resque_Job::reserveBlocking()checks whether the job content is an array (it should contain the job's type [class], payload [args], and ID), and aborts processing if notResque_Job::reserveBlocking()creates a newResque_Jobobject with the queue and content as constructor arguments to initialize the job itself, and returns it, along with control of the process, toResque_Worker->reserve()
- Queue Polling
Resque_Worker->reserve()iterates through the queue list, callingResque_Job::reserve()with the current queue's name as the sole argument on each passResque_Job::reserve()passes the queue name on toResque::pop(), which in turn calls Redis'lpopwith the same argument, then returns control (and the job content, if any) toResque_Job::reserve()Resque_Job::reserve()checks whether the job content is an array (as before, it should contain the job's type [class], payload [args], and ID), and aborts processing if notResque_Job::reserve()creates a newResque_Jobobject in the same manner as above, and also returns this object (along with control of the process) toResque_Worker->reserve()
- In either case,
Resque_Worker->reserve()returns the newResque_Jobobject, along with control, up toResque_Worker::work(); if no job is found, it simply returnsFALSE
- No Jobs
- If blocking mode is not enabled,
Resque_Worker::work()sleeps forINTERVALseconds; it callsusleep()for this, so fractional seconds are supported
- If blocking mode is not enabled,
- Job Reserved
Resque_Worker::work()triggers abeforeForkeventResque_Worker::work()callsResque_Worker->workingOn()with the newResque_Jobobject as its argumentResque_Worker->workingOn()does some reference assignments to help keep track of the worker/job relationship, then updates the job status fromWAITINGtoRUNNINGResque_Worker->workingOn()stores the newResque_Jobobject's payload in a Redis key associated to the worker itself (this is to prevent the job from being lost indefinitely, but does rely on that PID never being allocated on that host to a different worker process), then returns control toResque_Worker::work()Resque_Worker::work()forks a child process to run the actualperform()- The next steps differ between the worker and the child, now running in separate processes:
- Worker
- The worker waits for the job process to complete
- If the exit status is not 0, the worker calls
Resque_Job->fail()with aResque_Job_DirtyExitExceptionas its only argument. Resque_Job->fail()triggers anonFailureeventResque_Job->fail()updates the job status fromRUNNINGtoFAILEDResque_Job->fail()callsResque_Failure::create()with the job payload, theResque_Job_DirtyExitException, the internal ID of the worker, and the queue name as argumentsResque_Failure::create()creates a new object of whatever type has been set as theResque_Failure"backend" handler; by default, this is aResque_Failure_Redisobject, whose constructor simply collects the data passed intoResque_Failure::create()and pushes it into Redis in thefailedqueueResque_Job->fail()increments two failure counters in Redis: one for a total count, and one for the workerResque_Job->fail()returns control to the worker (still inResque_Worker::work()) without a value
- Job
Resque_Job_PIDis created, registering the PID of the actual process doing the job.- The job calls
Resque_Worker->perform()with theResque_Jobas its only argument. Resque_Worker->perform()sets up atry...catchblock so it can properly handle exceptions by marking jobs as failed (by callingResque_Job->fail(), as above)- Inside the
try...catch,Resque_Worker->perform()triggers anafterForkevent - Still inside the
try...catch,Resque_Worker->perform()callsResque_Job->perform()with no arguments Resque_Job->perform()callsResque_Job->getInstance()with no arguments- If
Resque_Job->getInstance()has already been called, it returns the existing instance; otherwise: Resque_Job->getInstance()checks that the job's class (type) exists and has aperform()method; if not, in either case, it throws an exception which will be caught byResque_Worker->perform()Resque_Job->getInstance()creates an instance of the job's class, and initializes it with a reference to theResque_Jobitself, the job's arguments (which it gets by callingResque_Job->getArguments(), which in turn simply returns the value ofargs[0], or an empty array if no arguments were passed), and the queue nameResque_Job->getInstance()returns control, along with the job class instance, toResque_Job->perform()Resque_Job->perform()sets up its owntry...catchblock to handleResque_Job_DontPerformexceptions; any other exceptions are passed up toResque_Worker->perform()Resque_Job->perform()triggers abeforePerformeventResque_Job->perform()callssetUp()on the instance, if it existsResque_Job->perform()callsperform()on the instanceResque_Job->perform()callstearDown()on the instance, if it existsResque_Job->perform()triggers anafterPerformevent- The
try...catchblock ends, suppressingResque_Job_DontPerformexceptions by returning control, and the valueFALSE, toResque_Worker->perform(); any other situation returns the valueTRUEalong with control, instead - The
try...catchblock inResque_Worker->perform()ends Resque_Worker->perform()updates the job status fromRUNNINGtoCOMPLETE, then returns control, with no value, to the worker (again still inResque_Worker::work())Resque_Job_PID()is removed, the forked process will terminate soon cleanlyResque_Worker::work()callsexit(0)to terminate the job process
- SPECIAL CASE: Non-forking OS (Windows)
- Same as the job above, except it doesn't call
exit(0)when done
- Same as the job above, except it doesn't call
Resque_Worker::work()callsResque_Worker->doneWorking()with no argumentsResque_Worker->doneWorking()increments two processed counters in Redis: one for a total count, and one for the workerResque_Worker->doneWorking()deletes the Redis key set inResque_Worker->workingOn(), then returns control, with no value, toResque_Worker::work()
Resque_Worker::work()returns control to the beginning of the main loop, where it will wait for the next job to become available, and start this process all over again