Building time-limited workers with SIGALRM

Recently, I’ve been working on a system that needs to run tasks scheduled by either web requests or other tasks. These tasks are trusted but sometimes problematic code. While developers generally write good code, we all make mistakes, and the occasional slow job that runs for multiple hours gets written. These slow tasks create problems as their volume grows, because they can consume all available resources. To prevent workers from getting gummed up with slow tasks, we needed an upper bound on how long a task could run.

What I was looking for in more detail

What I needed was a way to run a unit of work (a function/task) and, if it doesn’t complete within X seconds, abort it. However, in a single-threaded application you can’t just ‘stop’ a function from running. For example:

def do_work(args):
    while True:
        pass

will run forever, and nothing within the program as written will cause it to terminate. What I wanted was to ensure that if code like this did run, we could recover and continue to execute other work.

After doing some reading on what my options could be, I tried a few different solutions.

ProcessPool

I started off using a ProcessPool and submitting tasks to it. Minion processes would do work and return results, and the parent process could wait on a result for a set amount of time and then time out. Unfortunately, the timeout only releases the parent process: the minion continues to spin indefinitely, and ProcessPool doesn’t give you a way to kill a stuck minion short of terminating the entire pool, which effectively limits your pool size to one.

This was the approach I went with for my initial solution. We wouldn’t get a concurrent worker, but we would get a worker, which is more than we had before.

import multiprocessing

def build_pool():
    return multiprocessing.Pool(processes=1)

pool = build_pool()

success = False
try:
    # Send a task to the multiprocessing pool.
    result = pool.apply_async(func=do_work, args=task_args)

    # Will trigger a TimeoutError if the task execution runs long
    result.get(timeout=processing_timeout)

    success = True
except multiprocessing.TimeoutError:
    # When a task times out we kill the entire pool. This is necessary because
    # stdlib multiprocessing.Pool doesn't expose ways to terminate individual tasks.
    pool.terminate()

    # Make a new process pool for the next operation
    pool = build_pool()

Managing more children

The ProcessPool solution was a dead end because I couldn’t kill only the problematic process. Instead, I would need to manage the pool of minions myself. From recently refreshing my understanding of threading, I knew that Python offers the classic thread synchronization abstractions in the multiprocessing module. I could use a Queue to push tasks to a group of worker processes, as in the sketch below. Those workers would then need to apply a timeout to their own task, or each manage another child ProcessPool, which would get really expensive memory-wise.
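The rough shape of that idea, before any timeout handling, might look something like this. The worker function, the fixed pool of three processes, and the task names are placeholders for illustration; the real version appears later in the post.

import multiprocessing

def worker(todo: multiprocessing.Queue) -> None:
    # Each minion pulls tasks off the shared queue and runs them.
    # Applying a per-task timeout is the part covered in the next section.
    while True:
        task = todo.get()
        print(f"{multiprocessing.current_process().name} running {task}")

if __name__ == "__main__":
    todo = multiprocessing.Queue()
    workers = [multiprocessing.Process(target=worker, args=(todo,)) for _ in range(3)]
    for p in workers:
        p.start()

    # Push some work onto the shared queue.
    for i in range(10):
        todo.put(f"task #{i}")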

Using the old ways

In reading up more on how to solve this kind of hard timeout scenario, I came across signals, and one I had never used before – SIGALRM.

> This signal typically indicates expiration of a timer that measures real or clock time. It is used by the alarm function, for example. gnu.org

The idea that timers registered with signal.alarm() would send an interrupt to the application was intriguing. When you kill a process with kill or ctrl-c, you’re sending a signal to the process. If the SIGINT signal could break out of our while True code, perhaps SIGALRM could as well. Python provides signal.alarm() for setting a timer, and signal.signal() for registering a signal handler:

import signal

def handle_alarm(signum, frame) -> None:
    print('alarm triggered!')
    raise ValueError('killed for timeout')

# register the signal handler
signal.signal(signal.SIGALRM, handle_alarm)

Then, in our worker code, we can set an alarm timeout and call our potentially long-running function. Should that function run too long, we’ll get a signal that interrupts the process and jumps to the registered signal handler:

# set a timeout for 2 seconds
signal.alarm(2)

# Call our task which contains an infinite loop
do_work(task_args)

# Clear the alarm timer
signal.alarm(0)
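Put together, a minimal self-contained sketch of the pattern looks roughly like the following. Here I catch the exception in the caller just to show control returning after the timeout; in the real worker, described next, I let the exception terminate the process. Note that signal.alarm() is Unix-only.

import signal

def do_work(args):
    while True:
        pass

def handle_alarm(signum, frame) -> None:
    raise ValueError('killed for timeout')

signal.signal(signal.SIGALRM, handle_alarm)

try:
    signal.alarm(2)   # deliver SIGALRM after 2 seconds
    do_work(None)     # stuck task; the handler's exception breaks the loop
    signal.alarm(0)   # cancel the alarm if the task finishes in time
except ValueError:
    print('task timed out, recovering')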

Within the alarm signal handler, I raise an exception that results in the process terminating. If the signal handler doesn’t raise an error, the application resumes in the middle of that while True loop and continues to run forever. I ended up with a parent process that supervises a collection of minions, each of which applies timeouts to its own tasks:

import multiprocessing
import queue
import signal
import time

NUM_PROCESS = 3

def start_workers(processes, todo, done):
    # Check existing workers for aliveness, and spawn more minions as required.
    processes = [p for p in processes if p.is_alive()]
    if len(processes) == NUM_PROCESS:
        return processes

    while len(processes) < NUM_PROCESS:
        p = multiprocessing.Process(target=run_task, args=(todo, done))
        processes.append(p)
        p.start()

    return processes


def main():
    queue_size = 10

    todo = multiprocessing.Queue(maxsize=queue_size)
    done = multiprocessing.Queue(maxsize=queue_size)

    processes = start_workers([], todo, done)

    def poll_done():
        """
        Pull from the done queue and forward to storage.
        """
        try:
            res = done.get_nowait()
        except queue.Empty:
            return False

        # Do IO with result
        print(f"completed: {res}")
        return True

    drain_threshold = queue_size / 2

    # Main work loop.
    i = 0
    while True:
        time.sleep(0.5)
        i += 1

        processes = start_workers(processes, todo, done)

        # If there are results collected, drain some results
        if done.qsize() > drain_threshold:
            poll_done()
            continue

        # Do IO to read from our work source.
        # This is a counter in the prototype but could be an RPC call or kafka read
        try:
            todo.put(f"task # {i}", block=False)
        except queue.Full:
            pass

if __name__ == "__main__":
    main()

And the minion process runs the following:

# Our 'task' to be executed.
def do_work(task):
    # Any task whose number contains a 5 is a 'bad' one that never returns.
    if "5" in task:
        while True:
            pass

def run_task(
    todo: multiprocessing.Queue,
    done: multiprocessing.Queue,
) -> None:
    def handle_alarm(signum, frame) -> None:
        print('alarm triggered!')
        done.put('killed')
        raise ValueError('killed for timeout')

    signal.signal(signal.SIGALRM, handle_alarm)

    while True:
        try:
            task = todo.get_nowait()
        except queue.Empty:
            continue

        # Timeout in two seconds
        signal.alarm(2)

        # Run our task function, which may never return.
        do_work(task)

        print('run task: ' + task)
        done.put(task + ' complete by ' + multiprocessing.current_process().name)

        # reset the alarm
        signal.alarm(0)

This example doesn’t shut down cleanly, as I’ve omitted some of the child-process joining for brevity, but it shows the alarm signal handler working well. This solution feels pretty good: it meets the requirements I had, and it is simple. Having to recreate workers when tasks time out isn’t ideal, but that should be a rare event and the overhead won’t matter much.
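For completeness, the joining I left out could look roughly like the sketch below, where each minion is asked to exit via a sentinel on the todo queue and any stragglers are terminated. The STOP value and the five-second join timeout are assumptions for illustration, and run_task would need a matching check that returns when it sees the sentinel.

STOP = "__stop__"

def shutdown(processes, todo):
    # Ask each minion to exit by queueing one sentinel per process.
    for _ in processes:
        todo.put(STOP)

    # Join the minions, terminating any that are stuck inside a task.
    for p in processes:
        p.join(timeout=5)
        if p.is_alive():
            p.terminate()
            p.join()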
