Building time limited workers with SIGALRM
Recently, I’ve been working on a system that needs to run tasks scheduled by either web requests or other tasks. These tasks are trusted but sometimes problematic code. While developers generally write good code, we all make mistakes, and the occasional slow job that can run for multiple hours gets written. These slow tasks create problems as their volume goes up, because they can consume all available resources. To prevent workers from getting gummed up with slow tasks, we needed an upper bound on how long any one task could run.
What I was looking for in more detail
What I needed was a way to run a unit of work (a function/task) and, if it doesn’t complete within X seconds, abort that function/task. However, in a single-threaded application you can’t just ‘stop’ a function from running. For example:
    def do_work(args):
        while True:
            pass
will run forever, and nothing within the program as written would cause this to terminate. What I wanted to do was ensure that if the above code did happen that we could recover and continue to execute other work.
After doing some reading on what my options could be, I tried a few different solutions.
ProcessPool
I started off using a ProcessPool and submitting tasks. Minion processes would do work and return results. The parent process could wait on results for a set amount of time and then time out. Unfortunately, the timeout only releases the parent process; the minion continues to spin indefinitely, and ProcessPool doesn’t give you a way to kill the stuck minion unless you terminate the entire pool, which limits how big your pool can be to one.
This was the approach I went with for my initial solution. We wouldn’t get a concurrent worker, but we’d get a worker, which is more than we had before.
    import multiprocessing

    def build_pool():
        return multiprocessing.Pool(processes=1)

    pool = build_pool()
    success = False
    try:
        # Send a task to the multiprocessing pool.
        result = pool.apply_async(func=do_work, args=task_args)
        # Will raise multiprocessing.TimeoutError if the task execution runs long
        result.get(timeout=processing_timeout)
        success = True
    except multiprocessing.TimeoutError:
        # When a task times out we kill the entire pool. This is necessary because
        # stdlib multiprocessing.Pool doesn't expose ways to terminate individual tasks.
        pool.terminate()
        # Make a new process pool for the next operation
        pool = build_pool()
Managing more children
The ProcessPool solution was a dead end because I couldn’t kill only the problematic process. Instead, I would need to manage the pool of minions myself. I knew from recently refreshing my understanding of threading that Python offers classic thread synchronization abstractions in the multiprocessing module. I could use a Queue to push tasks to a group of worker processes. Those workers would then need to apply a timeout to their own task, or manage another child ProcessPool of their own, which would get really expensive memory-wise.
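As a rough sketch of that shape (the full, timeout-aware version appears later in the post), pushing tasks through a multiprocessing.Queue to a few worker processes looks something like this; the worker function and the sentinel handling here are illustrative assumptions, not part of the final solution:

    import multiprocessing

    def worker(todo: multiprocessing.Queue) -> None:
        # Each minion pulls tasks off the shared queue until it sees a sentinel.
        while True:
            task = todo.get()
            if task is None:
                break
            print(f"{multiprocessing.current_process().name} handling {task}")

    if __name__ == "__main__":
        todo = multiprocessing.Queue()
        workers = [
            multiprocessing.Process(target=worker, args=(todo,))
            for _ in range(3)
        ]
        for p in workers:
            p.start()
        for i in range(9):
            todo.put(f"task # {i}")
        # One sentinel per worker so they all shut down.
        for _ in workers:
            todo.put(None)
        for p in workers:
            p.join()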
Using the old ways
In reading up more on how to solve this kind of hard timeout scenario, I came across signals, and one I had never used before – SIGALRM.
> This signal typically indicates expiration of a timer that measures real or clock time. It is used by the alarm function, for example. gnu.org
The idea that timers registered with signal.alarm() would send an interrupt to the application was intriguing. When you kill a process with kill or ctrl-c, you’re sending a signal to the process. If the SIGINT signal could break out of our while True loop, perhaps SIGALRM could as well. Python provides signal.alarm() for setting a timer, and signal.signal() to register a signal handler:
    import signal

    def handle_alarm(signum, frame) -> None:
        print('alarm triggered!')
        raise ValueError('killed for timeout')

    # register the signal handler
    signal.signal(signal.SIGALRM, handle_alarm)
Then, in our worker code, we can set an alarm timeout and call our potentially long-running function. Should that function run too long, we’ll get a signal that interrupts the process and jumps to the registered signal handler:
    # set a timeout for 2 seconds
    signal.alarm(2)

    # Call our task which contains an infinite loop
    do_work(task_args)

    # Clear the alarm timer
    signal.alarm(0)
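Putting those two snippets together gives a tiny, self-contained demo. This is a minimal sketch, assuming a Unix platform where SIGALRM is available; the caller catches the exception here only to show that the interpreter regains control, whereas the worker later in this post lets it propagate:

    import signal

    def handle_alarm(signum, frame) -> None:
        print('alarm triggered!')
        raise ValueError('killed for timeout')

    def do_work(args):
        # A task that never finishes on its own.
        while True:
            pass

    signal.signal(signal.SIGALRM, handle_alarm)

    # Ask the kernel to deliver SIGALRM in 2 seconds.
    signal.alarm(2)
    try:
        do_work(None)
    except ValueError:
        # The alarm fired inside the infinite loop and we recovered here.
        print('recovered, moving on to other work')
    finally:
        # Clear the timer so it can't fire again later.
        signal.alarm(0)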
Within the alarm signal handler, I raise an exception that results in the process terminating. If the signal handler doesn’t raise an error, the application will resume in the middle of that while True loop and continue to run forever. I ended up with a parent process that supervises a collection of minions that apply timeouts:
    import multiprocessing
    import queue
    import signal
    import time

    NUM_PROCESS = 3

    def start_workers(processes, todo, done):
        # Check existing workers for aliveness, and spawn more minions as required.
        processes = [p for p in processes if p.is_alive()]
        if len(processes) == NUM_PROCESS:
            return processes
        while len(processes) < NUM_PROCESS:
            p = multiprocessing.Process(target=run_task, args=(todo, done))
            processes.append(p)
            p.start()
        return processes

    def main():
        queue_size = 10
        todo = multiprocessing.Queue(maxsize=queue_size)
        done = multiprocessing.Queue(maxsize=queue_size)
        processes = start_workers([], todo, done)

        def poll_done():
            """
            Pull from done queue and forward to storage
            """
            try:
                res = done.get_nowait()
            except queue.Empty:
                return False
            # Do IO with result
            print(f"completed: {res}")
            return True

        drain_threshold = queue_size / 2

        # Main work loop.
        i = 0
        while True:
            time.sleep(0.5)
            i += 1
            processes = start_workers(processes, todo, done)

            # If there are results collected, drain some results
            if done.qsize() > drain_threshold:
                poll_done()
                continue

            # Do IO to read from our worksource.
            # This is a counter in the prototype but could be an RPC call or kafka read
            try:
                todo.put(f"task # {i}", block=False)
            except queue.Full:
                pass

    if __name__ == "__main__":
        main()
And the minion process runs the following:
    # Our 'task' to be executed.
    def do_work(task):
        # Tasks with a 5 in their number are the bad ones that never finish
        if "5" in task:
            while True:
                pass

    def run_task(
        todo: multiprocessing.Queue,
        done: multiprocessing.Queue,
    ) -> None:
        def handle_alarm(signum, frame) -> None:
            print('alarm triggered!')
            done.put('killed')
            raise ValueError('killed for timeout')

        signal.signal(signal.SIGALRM, handle_alarm)

        while True:
            try:
                task = todo.get_nowait()
            except queue.Empty:
                continue

            # Timeout in two seconds
            signal.alarm(2)

            # Run our task function, which may loop forever.
            do_work(task)
            print('run task: ' + task)
            done.put(task + ' complete by ' + multiprocessing.current_process().name)

            # reset the alarm
            signal.alarm(0)
This example doesn’t shut down cleanly, as I’ve omitted the child process joining for brevity (a rough sketch of what that could look like follows below), but it shows the alarm signal handler working well. This solution feels pretty good: it meets the requirements I had, and it is simple. Having to recreate workers when tasks time out isn’t ideal, but that should be a rare event and the overhead won’t matter much.
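For completeness, here is a minimal sketch of what that shutdown could look like; the shutdown helper and the idea of calling it from an except KeyboardInterrupt around the main work loop are my own assumptions rather than part of the prototype. Because the minions loop forever (and a stuck one will never return on its own), they have to be terminated rather than simply joined:

    def shutdown(processes, done):
        # Drain any finished results before tearing things down.
        while True:
            try:
                res = done.get_nowait()
            except queue.Empty:
                break
            print(f"completed: {res}")
        # The minions never exit on their own, so terminate them first,
        # then join to reap the child processes.
        for p in processes:
            p.terminate()
        for p in processes:
            p.join()

Calling this when ctrl-c interrupts the main loop would tear the minions down instead of leaving them running in the background.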