ThreadMan

A synchronised thread manager. It provides cross-platform thread management (e.g. "joinAll", automatic cleanup), template matching of threadable functions, and a "synchronised" mode that runs threads of the same function one after another.

For example, you might have a function responsible for writing objects out to disk, another for sending files over a TCP connection, and a "main" thread that generates the objects. To "fire and forget" the output functions and start generating the next object straight away, a sample use case might be:

bool write_to_disk(Object* o);
bool write_to_server(Object* o);
/*...*/
ThreadMan &tm = ThreadMan::tman();
Object* obj;
while ((obj = generate())) {
    tm.run(ThreadMan::runner(write_to_disk), obj, true);
    tm.run(ThreadMan::runner(write_to_server), obj, true);
}
tm.joinSync();

The calls to tm.run return immediately, allowing the next object to be generated. However, successive calls to write_to_disk run serially (as do those to write_to_server), so only one object is ever being written to disk at a time, and only one is being written to the server -- the other objects are queued automatically. Also note the type safety: template matching avoids the ugly cast-from-void* that is common for thread function pointers, so any pointer argument is type-checked. Function objects and member functions are matched too, so they can be used to pass arbitrary types and numbers of arguments.
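The queueing behaviour described above can be illustrated with nothing but the C++11 standard library. The following is a minimal sketch, not ThreadMan's implementation: the class name SerialQueue and its run/join methods are hypothetical, and it assumes a single worker thread per queue is an acceptable way to keep jobs of one kind in series.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical illustration only; not ThreadMan's actual implementation.
// One worker thread drains a FIFO of jobs, so jobs run strictly one at a time.
class SerialQueue {
public:
    SerialQueue() : done_(false), worker_(&SerialQueue::loop, this) {}
    ~SerialQueue() { join(); }

    // Returns immediately; the job runs later on the worker thread.
    void run(std::function<void()> job) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            assert(!done_);  // jobs must not be queued after join()
            jobs_.push_back(std::move(job));
        }
        cv_.notify_one();
    }

    // Blocks until every queued job has finished, then stops the worker.
    void join() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            done_ = true;
        }
        cv_.notify_one();
        if (worker_.joinable()) worker_.join();
    }

private:
    void loop() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return done_ || !jobs_.empty(); });
                if (jobs_.empty()) return;  // done_ is set and nothing is left
                job = std::move(jobs_.front());
                jobs_.pop_front();
            }
            job();  // run outside the lock so run() never waits on a job
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<std::function<void()>> jobs_;
    bool done_;
    std::thread worker_;  // declared last so it starts after the other members
};
```

With one such queue for disk writes and another for server writes, the two kinds of job overlap with each other while jobs of the same kind stay in order, which mirrors the behaviour of the example.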

But note that the write_to_disk/write_to_server loop above is not really a workable solution in a threaded program -- it will probably leak resources. A better way would be to copy the object (at least once) and have each write_to_* function delete its own copy when done, thus removing leaks and avoiding possible race conditions caused by both threads accessing the same object. For more examples, see threadman/trunk/tests/test_cases.cpp.
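The copy-per-writer idea can be sketched as follows. This is only an illustration under stated assumptions: Object is a hypothetical stand-in with an instance counter (there purely to show that nothing leaks), dispatch is a made-up helper, and plain std::thread is used in place of ThreadMan so that the sketch is self-contained.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical stand-in for the Object type in the example.
// The live counter exists only so the sketch can show that nothing leaks.
struct Object {
    static std::atomic<int> live;
    int payload;
    explicit Object(int p) : payload(p) { ++live; }
    Object(const Object& other) : payload(other.payload) { ++live; }
    ~Object() { --live; }
};
std::atomic<int> Object::live(0);

std::atomic<int> disk_total(0);    // stands in for "bytes written to disk"
std::atomic<int> server_total(0);  // stands in for "bytes sent to the server"

// Each writer owns the pointer it receives and deletes it when finished.
bool write_to_disk(Object* o) {
    disk_total += o->payload;
    delete o;
    return true;
}

bool write_to_server(Object* o) {
    server_total += o->payload;
    delete o;
    return true;
}

// Hands the original to one writer and a private copy to the other,
// so the two threads never touch the same Object.
void dispatch(Object* obj) {
    assert(obj != nullptr);
    Object* copy = new Object(*obj);  // copy before either thread starts
    std::thread disk(write_to_disk, obj);
    std::thread server(write_to_server, copy);
    disk.join();
    server.join();
}
```

The copy is made before either thread starts, so neither writer can delete the object while it is still being copied.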

browse trunk

threadman.h

semwaiter.h

The use case above covers one direction of concurrent data transfer -- writing out. But what about generate()? What if we want to generate the objects in a background thread? That is, we start a batch task in the background that is constantly fed data from an external source, and whenever it has a complete object (or frame) it signals the main thread that an object is ready. We also want to be able to tell the background thread to shut down when the main thread no longer wants to receive objects.

For this, I built a special "waiter" semaphore -- an abstraction that wraps a semaphore so that a thread can block until it is signalled or told to stop. A cross-platform, inline-able implementation is in semwaiter.h. There is also a DestroyWaiter -- a special kind of SemWaiter for signalling destruction. Here is a use case:

bool fill_frame(Frame** f, bool stop_last);

struct Provider {
  SemWaiter arg_ready;
  DestroyWaiter destroy;
  Frame* next_frame;
  SDL_Thread* thread_handle;

  bool generate_thread() {
      Frame* f;
      while (fill_frame(&f, !arg_ready.running())) {  /* processor-intensive batch task */
          if (arg_ready) {  /* blocks until main thread is ready for a new frame */
              next_frame = f;
              /* signal event queue in main thread, return immediately */
              pushMemberEvent(&Provider::update, this);
          } 
          /* else: waiter returned false after blocking (running() == false) */
          /* signal stop==true to fill_frame, then fill_frame will return false */
          /* fill_frame can return false at other times, too, when it has nothing else to generate */
      }
      destroy.ready_to_destroy(); /* fill_frame has cleaned up, ready to destroy */
      return true; /* function is declared bool, so it must return a value; true is an assumption */
  }

  bool update() {
      if (arg_ready.running()) {
          process(next_frame);
      }
      delete next_frame; /* could be moved to process..*/
      next_frame = 0;
      arg_ready.post(); /* we are ready for another */
      return true; /* function is declared bool, so it must return a value; true is an assumption */
  }

  ~Provider() { /* could also be a "cancel" function */
      if (!destroy.wants_to_destroy(arg_ready, &thread_handle, TIMEOUT_MS))
          /* TIMED OUT */;
  }

  Provider() : next_frame(0), thread_handle(0) {
      if (initialisation_successful()) {
          /* we don't want the destructor to block if we don't start the thread! */
          destroy.alternate_started();
          ThreadMan::srun(ThreadMan::runner(&Provider::generate_thread),
                          this, true, ThreadMan::IDLE, &thread_handle);
          arg_ready.post();  /* ready for first arg */
      }
  }

};
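To make the waiter idea more concrete, here is a minimal sketch of a semaphore that can also be told to stop, written with the C++11 standard library. It is not the code in semwaiter.h: the class name Waiter and the methods wait and stop are hypothetical, and the real SemWaiter (which the use case tests directly in an if) may behave differently.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical sketch; the real SemWaiter lives in semwaiter.h and may differ.
class Waiter {
public:
    Waiter() : count_(0), running_(true) {}

    // Blocks until post() or stop() has been called.
    // Returns true if a post was consumed, false if the waiter was stopped.
    bool wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return count_ > 0 || !running_; });
        if (!running_) return false;
        assert(count_ > 0);  // guaranteed by the wait predicate
        --count_;
        return true;
    }

    // Signals that one more item may be produced.
    void post() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++count_;
        }
        cv_.notify_one();
    }

    // Wakes every blocked thread and makes all later waits return false.
    void stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            running_ = false;
        }
        cv_.notify_all();
    }

    bool running() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return running_;
    }

private:
    mutable std::mutex mutex_;
    std::condition_variable cv_;
    int count_;
    bool running_;
};
```

In this sketch a stop takes priority over any pending post, so a producer blocked in wait() is released with false as soon as the consumer loses interest. That matches the "waiter returned false after blocking" branch described in the comments of the use case.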