From 3a3b0be09a11a9db7d2c1322cfbb8bd9b670a079 Mon Sep 17 00:00:00 2001 From: Jingning Han Date: Wed, 1 Jul 2015 14:58:13 -0700 Subject: Move multi-threading module functions into vpx_thread folder This commit moves the primitive multi-threading files from vp9 folder to vpx_thread, which will be accessible by all vpx codec. Change-Id: Ib51e66e9c69801c10631fab56d35a0c0aaed5883 --- libs.mk | 3 + test/vp9_thread_test.cc | 2 +- vp9/common/vp9_onyxc_int.h | 2 +- vp9/common/vp9_thread.c | 184 ---------------------------------- vp9/common/vp9_thread.h | 223 ----------------------------------------- vp9/common/vp9_thread_common.h | 2 +- vp9/decoder/vp9_decodeframe.c | 2 +- vp9/decoder/vp9_decoder.c | 2 +- vp9/decoder/vp9_decoder.h | 3 +- vp9/decoder/vp9_dthread.h | 2 +- vp9/encoder/vp9_encoder.h | 2 +- vp9/vp9_common.mk | 2 - vp9/vp9_dx_iface.c | 2 +- vpx_thread/vpx_thread.c | 184 ++++++++++++++++++++++++++++++++++ vpx_thread/vpx_thread.h | 223 +++++++++++++++++++++++++++++++++++++++++ vpx_thread/vpx_thread.mk | 13 +++ 16 files changed, 433 insertions(+), 418 deletions(-) delete mode 100644 vp9/common/vp9_thread.c delete mode 100644 vp9/common/vp9_thread.h create mode 100644 vpx_thread/vpx_thread.c create mode 100644 vpx_thread/vpx_thread.h create mode 100644 vpx_thread/vpx_thread.mk diff --git a/libs.mk b/libs.mk index 6215990c9..165002895 100644 --- a/libs.mk +++ b/libs.mk @@ -50,6 +50,9 @@ CODEC_SRCS-yes += $(addprefix vpx_ports/,$(call enabled,PORTS_SRCS)) include $(SRC_PATH_BARE)/vpx_dsp/vpx_dsp.mk CODEC_SRCS-yes += $(addprefix vpx_dsp/,$(call enabled,DSP_SRCS)) +include $(SRC_PATH_BARE)/vpx_thread/vpx_thread.mk +CODEC_SRCS-yes += $(addprefix vpx_thread/,$(call enabled,THREAD_SRCS)) + ifneq ($(CONFIG_VP8_ENCODER)$(CONFIG_VP8_DECODER),) VP8_PREFIX=vp8/ include $(SRC_PATH_BARE)/$(VP8_PREFIX)vp8_common.mk diff --git a/test/vp9_thread_test.cc b/test/vp9_thread_test.cc index 902a6fc35..e08e44ec9 100644 --- a/test/vp9_thread_test.cc +++ b/test/vp9_thread_test.cc @@ -18,7 +18,7 @@ #if CONFIG_WEBM_IO #include "test/webm_video_source.h" #endif -#include "vp9/common/vp9_thread.h" +#include "vpx_thread/vpx_thread.h" namespace { diff --git a/vp9/common/vp9_onyxc_int.h b/vp9/common/vp9_onyxc_int.h index 1811d76df..c5e2b3ab4 100644 --- a/vp9/common/vp9_onyxc_int.h +++ b/vp9/common/vp9_onyxc_int.h @@ -13,6 +13,7 @@ #include "./vpx_config.h" #include "vpx/internal/vpx_codec_internal.h" +#include "vpx_thread/vpx_thread.h" #include "./vp9_rtcd.h" #include "vp9/common/vp9_alloccommon.h" #include "vp9/common/vp9_loopfilter.h" @@ -21,7 +22,6 @@ #include "vp9/common/vp9_entropymode.h" #include "vp9/common/vp9_frame_buffers.h" #include "vp9/common/vp9_quant_common.h" -#include "vp9/common/vp9_thread.h" #include "vp9/common/vp9_tile_common.h" #if CONFIG_VP9_POSTPROC diff --git a/vp9/common/vp9_thread.c b/vp9/common/vp9_thread.c deleted file mode 100644 index 1c6aec032..000000000 --- a/vp9/common/vp9_thread.c +++ /dev/null @@ -1,184 +0,0 @@ -// Copyright 2013 Google Inc. All Rights Reserved. -// -// Use of this source code is governed by a BSD-style license -// that can be found in the COPYING file in the root of the source -// tree. An additional intellectual property rights grant can be found -// in the file PATENTS. All contributing project authors may -// be found in the AUTHORS file in the root of the source tree. -// ----------------------------------------------------------------------------- -// -// Multi-threaded worker -// -// Original source: -// http://git.chromium.org/webm/libwebp.git -// 100644 blob 264210ba2807e4da47eb5d18c04cf869d89b9784 src/utils/thread.c - -#include -#include // for memset() -#include "./vp9_thread.h" -#include "vpx_mem/vpx_mem.h" - -#if CONFIG_MULTITHREAD - -struct VP9WorkerImpl { - pthread_mutex_t mutex_; - pthread_cond_t condition_; - pthread_t thread_; -}; - -//------------------------------------------------------------------------------ - -static void execute(VP9Worker *const worker); // Forward declaration. - -static THREADFN thread_loop(void *ptr) { - VP9Worker *const worker = (VP9Worker*)ptr; - int done = 0; - while (!done) { - pthread_mutex_lock(&worker->impl_->mutex_); - while (worker->status_ == OK) { // wait in idling mode - pthread_cond_wait(&worker->impl_->condition_, &worker->impl_->mutex_); - } - if (worker->status_ == WORK) { - execute(worker); - worker->status_ = OK; - } else if (worker->status_ == NOT_OK) { // finish the worker - done = 1; - } - // signal to the main thread that we're done (for sync()) - pthread_cond_signal(&worker->impl_->condition_); - pthread_mutex_unlock(&worker->impl_->mutex_); - } - return THREAD_RETURN(NULL); // Thread is finished -} - -// main thread state control -static void change_state(VP9Worker *const worker, - VP9WorkerStatus new_status) { - // No-op when attempting to change state on a thread that didn't come up. - // Checking status_ without acquiring the lock first would result in a data - // race. - if (worker->impl_ == NULL) return; - - pthread_mutex_lock(&worker->impl_->mutex_); - if (worker->status_ >= OK) { - // wait for the worker to finish - while (worker->status_ != OK) { - pthread_cond_wait(&worker->impl_->condition_, &worker->impl_->mutex_); - } - // assign new status and release the working thread if needed - if (new_status != OK) { - worker->status_ = new_status; - pthread_cond_signal(&worker->impl_->condition_); - } - } - pthread_mutex_unlock(&worker->impl_->mutex_); -} - -#endif // CONFIG_MULTITHREAD - -//------------------------------------------------------------------------------ - -static void init(VP9Worker *const worker) { - memset(worker, 0, sizeof(*worker)); - worker->status_ = NOT_OK; -} - -static int sync(VP9Worker *const worker) { -#if CONFIG_MULTITHREAD - change_state(worker, OK); -#endif - assert(worker->status_ <= OK); - return !worker->had_error; -} - -static int reset(VP9Worker *const worker) { - int ok = 1; - worker->had_error = 0; - if (worker->status_ < OK) { -#if CONFIG_MULTITHREAD - worker->impl_ = (VP9WorkerImpl*)vpx_calloc(1, sizeof(*worker->impl_)); - if (worker->impl_ == NULL) { - return 0; - } - if (pthread_mutex_init(&worker->impl_->mutex_, NULL)) { - goto Error; - } - if (pthread_cond_init(&worker->impl_->condition_, NULL)) { - pthread_mutex_destroy(&worker->impl_->mutex_); - goto Error; - } - pthread_mutex_lock(&worker->impl_->mutex_); - ok = !pthread_create(&worker->impl_->thread_, NULL, thread_loop, worker); - if (ok) worker->status_ = OK; - pthread_mutex_unlock(&worker->impl_->mutex_); - if (!ok) { - pthread_mutex_destroy(&worker->impl_->mutex_); - pthread_cond_destroy(&worker->impl_->condition_); - Error: - vpx_free(worker->impl_); - worker->impl_ = NULL; - return 0; - } -#else - worker->status_ = OK; -#endif - } else if (worker->status_ > OK) { - ok = sync(worker); - } - assert(!ok || (worker->status_ == OK)); - return ok; -} - -static void execute(VP9Worker *const worker) { - if (worker->hook != NULL) { - worker->had_error |= !worker->hook(worker->data1, worker->data2); - } -} - -static void launch(VP9Worker *const worker) { -#if CONFIG_MULTITHREAD - change_state(worker, WORK); -#else - execute(worker); -#endif -} - -static void end(VP9Worker *const worker) { -#if CONFIG_MULTITHREAD - if (worker->impl_ != NULL) { - change_state(worker, NOT_OK); - pthread_join(worker->impl_->thread_, NULL); - pthread_mutex_destroy(&worker->impl_->mutex_); - pthread_cond_destroy(&worker->impl_->condition_); - vpx_free(worker->impl_); - worker->impl_ = NULL; - } -#else - worker->status_ = NOT_OK; - assert(worker->impl_ == NULL); -#endif - assert(worker->status_ == NOT_OK); -} - -//------------------------------------------------------------------------------ - -static VP9WorkerInterface g_worker_interface = { - init, reset, sync, launch, execute, end -}; - -int vp9_set_worker_interface(const VP9WorkerInterface* const winterface) { - if (winterface == NULL || - winterface->init == NULL || winterface->reset == NULL || - winterface->sync == NULL || winterface->launch == NULL || - winterface->execute == NULL || winterface->end == NULL) { - return 0; - } - g_worker_interface = *winterface; - return 1; -} - -const VP9WorkerInterface *vp9_get_worker_interface(void) { - return &g_worker_interface; -} - -//------------------------------------------------------------------------------ diff --git a/vp9/common/vp9_thread.h b/vp9/common/vp9_thread.h deleted file mode 100644 index 12848fede..000000000 --- a/vp9/common/vp9_thread.h +++ /dev/null @@ -1,223 +0,0 @@ -// Copyright 2013 Google Inc. All Rights Reserved. -// -// Use of this source code is governed by a BSD-style license -// that can be found in the COPYING file in the root of the source -// tree. An additional intellectual property rights grant can be found -// in the file PATENTS. All contributing project authors may -// be found in the AUTHORS file in the root of the source tree. -// ----------------------------------------------------------------------------- -// -// Multi-threaded worker -// -// Original source: -// http://git.chromium.org/webm/libwebp.git -// 100644 blob 7bd451b124ae3b81596abfbcc823e3cb129d3a38 src/utils/thread.h - -#ifndef VP9_DECODER_VP9_THREAD_H_ -#define VP9_DECODER_VP9_THREAD_H_ - -#include "./vpx_config.h" - -#ifdef __cplusplus -extern "C" { -#endif - -// Set maximum decode threads to be 8 due to the limit of frame buffers -// and not enough semaphores in the emulation layer on windows. -#define MAX_DECODE_THREADS 8 - -#if CONFIG_MULTITHREAD - -#if defined(_WIN32) && !HAVE_PTHREAD_H -#include // NOLINT -#include // NOLINT -#include // NOLINT -typedef HANDLE pthread_t; -typedef CRITICAL_SECTION pthread_mutex_t; -typedef struct { - HANDLE waiting_sem_; - HANDLE received_sem_; - HANDLE signal_event_; -} pthread_cond_t; - -//------------------------------------------------------------------------------ -// simplistic pthread emulation layer - -// _beginthreadex requires __stdcall -#define THREADFN unsigned int __stdcall -#define THREAD_RETURN(val) (unsigned int)((DWORD_PTR)val) - -static INLINE int pthread_create(pthread_t* const thread, const void* attr, - unsigned int (__stdcall *start)(void*), - void* arg) { - (void)attr; - *thread = (pthread_t)_beginthreadex(NULL, /* void *security */ - 0, /* unsigned stack_size */ - start, - arg, - 0, /* unsigned initflag */ - NULL); /* unsigned *thrdaddr */ - if (*thread == NULL) return 1; - SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL); - return 0; -} - -static INLINE int pthread_join(pthread_t thread, void** value_ptr) { - (void)value_ptr; - return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 || - CloseHandle(thread) == 0); -} - -// Mutex -static INLINE int pthread_mutex_init(pthread_mutex_t *const mutex, - void* mutexattr) { - (void)mutexattr; - InitializeCriticalSection(mutex); - return 0; -} - -static INLINE int pthread_mutex_trylock(pthread_mutex_t *const mutex) { - return TryEnterCriticalSection(mutex) ? 0 : EBUSY; -} - -static INLINE int pthread_mutex_lock(pthread_mutex_t *const mutex) { - EnterCriticalSection(mutex); - return 0; -} - -static INLINE int pthread_mutex_unlock(pthread_mutex_t *const mutex) { - LeaveCriticalSection(mutex); - return 0; -} - -static INLINE int pthread_mutex_destroy(pthread_mutex_t *const mutex) { - DeleteCriticalSection(mutex); - return 0; -} - -// Condition -static INLINE int pthread_cond_destroy(pthread_cond_t *const condition) { - int ok = 1; - ok &= (CloseHandle(condition->waiting_sem_) != 0); - ok &= (CloseHandle(condition->received_sem_) != 0); - ok &= (CloseHandle(condition->signal_event_) != 0); - return !ok; -} - -static INLINE int pthread_cond_init(pthread_cond_t *const condition, - void* cond_attr) { - (void)cond_attr; - condition->waiting_sem_ = CreateSemaphore(NULL, 0, MAX_DECODE_THREADS, NULL); - condition->received_sem_ = CreateSemaphore(NULL, 0, MAX_DECODE_THREADS, NULL); - condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL); - if (condition->waiting_sem_ == NULL || - condition->received_sem_ == NULL || - condition->signal_event_ == NULL) { - pthread_cond_destroy(condition); - return 1; - } - return 0; -} - -static INLINE int pthread_cond_signal(pthread_cond_t *const condition) { - int ok = 1; - if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) { - // a thread is waiting in pthread_cond_wait: allow it to be notified - ok = SetEvent(condition->signal_event_); - // wait until the event is consumed so the signaler cannot consume - // the event via its own pthread_cond_wait. - ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) != - WAIT_OBJECT_0); - } - return !ok; -} - -static INLINE int pthread_cond_wait(pthread_cond_t *const condition, - pthread_mutex_t *const mutex) { - int ok; - // note that there is a consumer available so the signal isn't dropped in - // pthread_cond_signal - if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL)) - return 1; - // now unlock the mutex so pthread_cond_signal may be issued - pthread_mutex_unlock(mutex); - ok = (WaitForSingleObject(condition->signal_event_, INFINITE) == - WAIT_OBJECT_0); - ok &= ReleaseSemaphore(condition->received_sem_, 1, NULL); - pthread_mutex_lock(mutex); - return !ok; -} -#else // _WIN32 -#include // NOLINT -# define THREADFN void* -# define THREAD_RETURN(val) val -#endif - -#endif // CONFIG_MULTITHREAD - -// State of the worker thread object -typedef enum { - NOT_OK = 0, // object is unusable - OK, // ready to work - WORK // busy finishing the current task -} VP9WorkerStatus; - -// Function to be called by the worker thread. Takes two opaque pointers as -// arguments (data1 and data2), and should return false in case of error. -typedef int (*VP9WorkerHook)(void*, void*); - -// Platform-dependent implementation details for the worker. -typedef struct VP9WorkerImpl VP9WorkerImpl; - -// Synchronization object used to launch job in the worker thread -typedef struct { - VP9WorkerImpl *impl_; - VP9WorkerStatus status_; - VP9WorkerHook hook; // hook to call - void *data1; // first argument passed to 'hook' - void *data2; // second argument passed to 'hook' - int had_error; // return value of the last call to 'hook' -} VP9Worker; - -// The interface for all thread-worker related functions. All these functions -// must be implemented. -typedef struct { - // Must be called first, before any other method. - void (*init)(VP9Worker *const worker); - // Must be called to initialize the object and spawn the thread. Re-entrant. - // Will potentially launch the thread. Returns false in case of error. - int (*reset)(VP9Worker *const worker); - // Makes sure the previous work is finished. Returns true if worker->had_error - // was not set and no error condition was triggered by the working thread. - int (*sync)(VP9Worker *const worker); - // Triggers the thread to call hook() with data1 and data2 arguments. These - // hook/data1/data2 values can be changed at any time before calling this - // function, but not be changed afterward until the next call to Sync(). - void (*launch)(VP9Worker *const worker); - // This function is similar to launch() except that it calls the - // hook directly instead of using a thread. Convenient to bypass the thread - // mechanism while still using the VP9Worker structs. sync() must - // still be called afterward (for error reporting). - void (*execute)(VP9Worker *const worker); - // Kill the thread and terminate the object. To use the object again, one - // must call reset() again. - void (*end)(VP9Worker *const worker); -} VP9WorkerInterface; - -// Install a new set of threading functions, overriding the defaults. This -// should be done before any workers are started, i.e., before any encoding or -// decoding takes place. The contents of the interface struct are copied, it -// is safe to free the corresponding memory after this call. This function is -// not thread-safe. Return false in case of invalid pointer or methods. -int vp9_set_worker_interface(const VP9WorkerInterface *const winterface); - -// Retrieve the currently set thread worker interface. -const VP9WorkerInterface *vp9_get_worker_interface(void); - -//------------------------------------------------------------------------------ - -#ifdef __cplusplus -} // extern "C" -#endif - -#endif // VP9_DECODER_VP9_THREAD_H_ diff --git a/vp9/common/vp9_thread_common.h b/vp9/common/vp9_thread_common.h index 3b3a6996a..f8221a851 100644 --- a/vp9/common/vp9_thread_common.h +++ b/vp9/common/vp9_thread_common.h @@ -12,7 +12,7 @@ #define VP9_COMMON_VP9_LOOPFILTER_THREAD_H_ #include "./vpx_config.h" #include "vp9/common/vp9_loopfilter.h" -#include "vp9/common/vp9_thread.h" +#include "vpx_thread/vpx_thread.h" struct VP9Common; struct FRAME_COUNTS; diff --git a/vp9/decoder/vp9_decodeframe.c b/vp9/decoder/vp9_decodeframe.c index 659b84848..4302d825f 100644 --- a/vp9/decoder/vp9_decodeframe.c +++ b/vp9/decoder/vp9_decodeframe.c @@ -18,6 +18,7 @@ #include "vpx_ports/mem.h" #include "vpx_ports/mem_ops.h" #include "vpx_scale/vpx_scale.h" +#include "vpx_thread/vpx_thread.h" #include "vp9/common/vp9_alloccommon.h" #include "vp9/common/vp9_common.h" @@ -30,7 +31,6 @@ #include "vp9/common/vp9_reconintra.h" #include "vp9/common/vp9_reconinter.h" #include "vp9/common/vp9_seg_common.h" -#include "vp9/common/vp9_thread.h" #include "vp9/common/vp9_tile_common.h" #include "vp9/decoder/vp9_decodeframe.h" diff --git a/vp9/decoder/vp9_decoder.c b/vp9/decoder/vp9_decoder.c index 7991a39e6..5f5e201d8 100644 --- a/vp9/decoder/vp9_decoder.c +++ b/vp9/decoder/vp9_decoder.c @@ -20,6 +20,7 @@ #include "vpx_ports/vpx_once.h" #include "vpx_ports/vpx_timer.h" #include "vpx_scale/vpx_scale.h" +#include "vpx_thread/vpx_thread.h" #include "vp9/common/vp9_alloccommon.h" #include "vp9/common/vp9_loopfilter.h" @@ -30,7 +31,6 @@ #include "vp9/common/vp9_quant_common.h" #include "vp9/common/vp9_reconintra.h" #include "vp9/common/vp9_systemdependent.h" -#include "vp9/common/vp9_thread.h" #include "vp9/decoder/vp9_decodeframe.h" #include "vp9/decoder/vp9_decoder.h" diff --git a/vp9/decoder/vp9_decoder.h b/vp9/decoder/vp9_decoder.h index c19f0ac3b..8a6590899 100644 --- a/vp9/decoder/vp9_decoder.h +++ b/vp9/decoder/vp9_decoder.h @@ -15,10 +15,11 @@ #include "vpx/vpx_codec.h" #include "vpx_scale/yv12config.h" +#include "vpx_thread/vpx_thread.h" + #include "vp9/common/vp9_thread_common.h" #include "vp9/common/vp9_onyxc_int.h" #include "vp9/common/vp9_ppflags.h" -#include "vp9/common/vp9_thread.h" #include "vp9/decoder/vp9_dthread.h" #include "vp9/decoder/vp9_reader.h" diff --git a/vp9/decoder/vp9_dthread.h b/vp9/decoder/vp9_dthread.h index 979cb3d8b..80445816e 100644 --- a/vp9/decoder/vp9_dthread.h +++ b/vp9/decoder/vp9_dthread.h @@ -12,7 +12,7 @@ #define VP9_DECODER_VP9_DTHREAD_H_ #include "./vpx_config.h" -#include "vp9/common/vp9_thread.h" +#include "vpx_thread/vpx_thread.h" #include "vpx/internal/vpx_codec_internal.h" struct VP9Common; diff --git a/vp9/encoder/vp9_encoder.h b/vp9/encoder/vp9_encoder.h index 2b0da103f..34075bee7 100644 --- a/vp9/encoder/vp9_encoder.h +++ b/vp9/encoder/vp9_encoder.h @@ -16,13 +16,13 @@ #include "./vpx_config.h" #include "vpx/internal/vpx_codec_internal.h" #include "vpx/vp8cx.h" +#include "vpx_thread/vpx_thread.h" #include "vp9/common/vp9_alloccommon.h" #include "vp9/common/vp9_ppflags.h" #include "vp9/common/vp9_entropymode.h" #include "vp9/common/vp9_thread_common.h" #include "vp9/common/vp9_onyxc_int.h" -#include "vp9/common/vp9_thread.h" #include "vp9/encoder/vp9_aq_cyclicrefresh.h" #include "vp9/encoder/vp9_context_tree.h" diff --git a/vp9/vp9_common.mk b/vp9/vp9_common.mk index 6f091eefb..fd611bf9a 100644 --- a/vp9/vp9_common.mk +++ b/vp9/vp9_common.mk @@ -51,8 +51,6 @@ VP9_COMMON_SRCS-yes += common/vp9_seg_common.h VP9_COMMON_SRCS-yes += common/vp9_seg_common.c VP9_COMMON_SRCS-yes += common/vp9_systemdependent.h VP9_COMMON_SRCS-yes += common/vp9_textblit.h -VP9_COMMON_SRCS-yes += common/vp9_thread.h -VP9_COMMON_SRCS-yes += common/vp9_thread.c VP9_COMMON_SRCS-yes += common/vp9_tile_common.h VP9_COMMON_SRCS-yes += common/vp9_tile_common.c VP9_COMMON_SRCS-yes += common/vp9_loopfilter.c diff --git a/vp9/vp9_dx_iface.c b/vp9/vp9_dx_iface.c index 4080d64c1..0af944312 100644 --- a/vp9/vp9_dx_iface.c +++ b/vp9/vp9_dx_iface.c @@ -17,10 +17,10 @@ #include "vpx/internal/vpx_codec_internal.h" #include "vpx/vp8dx.h" #include "vpx/vpx_decoder.h" +#include "vpx_thread/vpx_thread.h" #include "vp9/common/vp9_alloccommon.h" #include "vp9/common/vp9_frame_buffers.h" -#include "vp9/common/vp9_thread.h" #include "vp9/decoder/vp9_decoder.h" #include "vp9/decoder/vp9_decodeframe.h" diff --git a/vpx_thread/vpx_thread.c b/vpx_thread/vpx_thread.c new file mode 100644 index 000000000..c8b4f601b --- /dev/null +++ b/vpx_thread/vpx_thread.c @@ -0,0 +1,184 @@ +// Copyright 2013 Google Inc. All Rights Reserved. +// +// Use of this source code is governed by a BSD-style license +// that can be found in the COPYING file in the root of the source +// tree. An additional intellectual property rights grant can be found +// in the file PATENTS. All contributing project authors may +// be found in the AUTHORS file in the root of the source tree. +// ----------------------------------------------------------------------------- +// +// Multi-threaded worker +// +// Original source: +// http://git.chromium.org/webm/libwebp.git +// 100644 blob 264210ba2807e4da47eb5d18c04cf869d89b9784 src/utils/thread.c + +#include +#include // for memset() +#include "./vpx_thread.h" +#include "vpx_mem/vpx_mem.h" + +#if CONFIG_MULTITHREAD + +struct VP9WorkerImpl { + pthread_mutex_t mutex_; + pthread_cond_t condition_; + pthread_t thread_; +}; + +//------------------------------------------------------------------------------ + +static void execute(VP9Worker *const worker); // Forward declaration. + +static THREADFN thread_loop(void *ptr) { + VP9Worker *const worker = (VP9Worker*)ptr; + int done = 0; + while (!done) { + pthread_mutex_lock(&worker->impl_->mutex_); + while (worker->status_ == OK) { // wait in idling mode + pthread_cond_wait(&worker->impl_->condition_, &worker->impl_->mutex_); + } + if (worker->status_ == WORK) { + execute(worker); + worker->status_ = OK; + } else if (worker->status_ == NOT_OK) { // finish the worker + done = 1; + } + // signal to the main thread that we're done (for sync()) + pthread_cond_signal(&worker->impl_->condition_); + pthread_mutex_unlock(&worker->impl_->mutex_); + } + return THREAD_RETURN(NULL); // Thread is finished +} + +// main thread state control +static void change_state(VP9Worker *const worker, + VP9WorkerStatus new_status) { + // No-op when attempting to change state on a thread that didn't come up. + // Checking status_ without acquiring the lock first would result in a data + // race. + if (worker->impl_ == NULL) return; + + pthread_mutex_lock(&worker->impl_->mutex_); + if (worker->status_ >= OK) { + // wait for the worker to finish + while (worker->status_ != OK) { + pthread_cond_wait(&worker->impl_->condition_, &worker->impl_->mutex_); + } + // assign new status and release the working thread if needed + if (new_status != OK) { + worker->status_ = new_status; + pthread_cond_signal(&worker->impl_->condition_); + } + } + pthread_mutex_unlock(&worker->impl_->mutex_); +} + +#endif // CONFIG_MULTITHREAD + +//------------------------------------------------------------------------------ + +static void init(VP9Worker *const worker) { + memset(worker, 0, sizeof(*worker)); + worker->status_ = NOT_OK; +} + +static int sync(VP9Worker *const worker) { +#if CONFIG_MULTITHREAD + change_state(worker, OK); +#endif + assert(worker->status_ <= OK); + return !worker->had_error; +} + +static int reset(VP9Worker *const worker) { + int ok = 1; + worker->had_error = 0; + if (worker->status_ < OK) { +#if CONFIG_MULTITHREAD + worker->impl_ = (VP9WorkerImpl*)vpx_calloc(1, sizeof(*worker->impl_)); + if (worker->impl_ == NULL) { + return 0; + } + if (pthread_mutex_init(&worker->impl_->mutex_, NULL)) { + goto Error; + } + if (pthread_cond_init(&worker->impl_->condition_, NULL)) { + pthread_mutex_destroy(&worker->impl_->mutex_); + goto Error; + } + pthread_mutex_lock(&worker->impl_->mutex_); + ok = !pthread_create(&worker->impl_->thread_, NULL, thread_loop, worker); + if (ok) worker->status_ = OK; + pthread_mutex_unlock(&worker->impl_->mutex_); + if (!ok) { + pthread_mutex_destroy(&worker->impl_->mutex_); + pthread_cond_destroy(&worker->impl_->condition_); + Error: + vpx_free(worker->impl_); + worker->impl_ = NULL; + return 0; + } +#else + worker->status_ = OK; +#endif + } else if (worker->status_ > OK) { + ok = sync(worker); + } + assert(!ok || (worker->status_ == OK)); + return ok; +} + +static void execute(VP9Worker *const worker) { + if (worker->hook != NULL) { + worker->had_error |= !worker->hook(worker->data1, worker->data2); + } +} + +static void launch(VP9Worker *const worker) { +#if CONFIG_MULTITHREAD + change_state(worker, WORK); +#else + execute(worker); +#endif +} + +static void end(VP9Worker *const worker) { +#if CONFIG_MULTITHREAD + if (worker->impl_ != NULL) { + change_state(worker, NOT_OK); + pthread_join(worker->impl_->thread_, NULL); + pthread_mutex_destroy(&worker->impl_->mutex_); + pthread_cond_destroy(&worker->impl_->condition_); + vpx_free(worker->impl_); + worker->impl_ = NULL; + } +#else + worker->status_ = NOT_OK; + assert(worker->impl_ == NULL); +#endif + assert(worker->status_ == NOT_OK); +} + +//------------------------------------------------------------------------------ + +static VP9WorkerInterface g_worker_interface = { + init, reset, sync, launch, execute, end +}; + +int vp9_set_worker_interface(const VP9WorkerInterface* const winterface) { + if (winterface == NULL || + winterface->init == NULL || winterface->reset == NULL || + winterface->sync == NULL || winterface->launch == NULL || + winterface->execute == NULL || winterface->end == NULL) { + return 0; + } + g_worker_interface = *winterface; + return 1; +} + +const VP9WorkerInterface *vp9_get_worker_interface(void) { + return &g_worker_interface; +} + +//------------------------------------------------------------------------------ diff --git a/vpx_thread/vpx_thread.h b/vpx_thread/vpx_thread.h new file mode 100644 index 000000000..12848fede --- /dev/null +++ b/vpx_thread/vpx_thread.h @@ -0,0 +1,223 @@ +// Copyright 2013 Google Inc. All Rights Reserved. +// +// Use of this source code is governed by a BSD-style license +// that can be found in the COPYING file in the root of the source +// tree. An additional intellectual property rights grant can be found +// in the file PATENTS. All contributing project authors may +// be found in the AUTHORS file in the root of the source tree. +// ----------------------------------------------------------------------------- +// +// Multi-threaded worker +// +// Original source: +// http://git.chromium.org/webm/libwebp.git +// 100644 blob 7bd451b124ae3b81596abfbcc823e3cb129d3a38 src/utils/thread.h + +#ifndef VP9_DECODER_VP9_THREAD_H_ +#define VP9_DECODER_VP9_THREAD_H_ + +#include "./vpx_config.h" + +#ifdef __cplusplus +extern "C" { +#endif + +// Set maximum decode threads to be 8 due to the limit of frame buffers +// and not enough semaphores in the emulation layer on windows. +#define MAX_DECODE_THREADS 8 + +#if CONFIG_MULTITHREAD + +#if defined(_WIN32) && !HAVE_PTHREAD_H +#include // NOLINT +#include // NOLINT +#include // NOLINT +typedef HANDLE pthread_t; +typedef CRITICAL_SECTION pthread_mutex_t; +typedef struct { + HANDLE waiting_sem_; + HANDLE received_sem_; + HANDLE signal_event_; +} pthread_cond_t; + +//------------------------------------------------------------------------------ +// simplistic pthread emulation layer + +// _beginthreadex requires __stdcall +#define THREADFN unsigned int __stdcall +#define THREAD_RETURN(val) (unsigned int)((DWORD_PTR)val) + +static INLINE int pthread_create(pthread_t* const thread, const void* attr, + unsigned int (__stdcall *start)(void*), + void* arg) { + (void)attr; + *thread = (pthread_t)_beginthreadex(NULL, /* void *security */ + 0, /* unsigned stack_size */ + start, + arg, + 0, /* unsigned initflag */ + NULL); /* unsigned *thrdaddr */ + if (*thread == NULL) return 1; + SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL); + return 0; +} + +static INLINE int pthread_join(pthread_t thread, void** value_ptr) { + (void)value_ptr; + return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 || + CloseHandle(thread) == 0); +} + +// Mutex +static INLINE int pthread_mutex_init(pthread_mutex_t *const mutex, + void* mutexattr) { + (void)mutexattr; + InitializeCriticalSection(mutex); + return 0; +} + +static INLINE int pthread_mutex_trylock(pthread_mutex_t *const mutex) { + return TryEnterCriticalSection(mutex) ? 0 : EBUSY; +} + +static INLINE int pthread_mutex_lock(pthread_mutex_t *const mutex) { + EnterCriticalSection(mutex); + return 0; +} + +static INLINE int pthread_mutex_unlock(pthread_mutex_t *const mutex) { + LeaveCriticalSection(mutex); + return 0; +} + +static INLINE int pthread_mutex_destroy(pthread_mutex_t *const mutex) { + DeleteCriticalSection(mutex); + return 0; +} + +// Condition +static INLINE int pthread_cond_destroy(pthread_cond_t *const condition) { + int ok = 1; + ok &= (CloseHandle(condition->waiting_sem_) != 0); + ok &= (CloseHandle(condition->received_sem_) != 0); + ok &= (CloseHandle(condition->signal_event_) != 0); + return !ok; +} + +static INLINE int pthread_cond_init(pthread_cond_t *const condition, + void* cond_attr) { + (void)cond_attr; + condition->waiting_sem_ = CreateSemaphore(NULL, 0, MAX_DECODE_THREADS, NULL); + condition->received_sem_ = CreateSemaphore(NULL, 0, MAX_DECODE_THREADS, NULL); + condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL); + if (condition->waiting_sem_ == NULL || + condition->received_sem_ == NULL || + condition->signal_event_ == NULL) { + pthread_cond_destroy(condition); + return 1; + } + return 0; +} + +static INLINE int pthread_cond_signal(pthread_cond_t *const condition) { + int ok = 1; + if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) { + // a thread is waiting in pthread_cond_wait: allow it to be notified + ok = SetEvent(condition->signal_event_); + // wait until the event is consumed so the signaler cannot consume + // the event via its own pthread_cond_wait. + ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) != + WAIT_OBJECT_0); + } + return !ok; +} + +static INLINE int pthread_cond_wait(pthread_cond_t *const condition, + pthread_mutex_t *const mutex) { + int ok; + // note that there is a consumer available so the signal isn't dropped in + // pthread_cond_signal + if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL)) + return 1; + // now unlock the mutex so pthread_cond_signal may be issued + pthread_mutex_unlock(mutex); + ok = (WaitForSingleObject(condition->signal_event_, INFINITE) == + WAIT_OBJECT_0); + ok &= ReleaseSemaphore(condition->received_sem_, 1, NULL); + pthread_mutex_lock(mutex); + return !ok; +} +#else // _WIN32 +#include // NOLINT +# define THREADFN void* +# define THREAD_RETURN(val) val +#endif + +#endif // CONFIG_MULTITHREAD + +// State of the worker thread object +typedef enum { + NOT_OK = 0, // object is unusable + OK, // ready to work + WORK // busy finishing the current task +} VP9WorkerStatus; + +// Function to be called by the worker thread. Takes two opaque pointers as +// arguments (data1 and data2), and should return false in case of error. +typedef int (*VP9WorkerHook)(void*, void*); + +// Platform-dependent implementation details for the worker. +typedef struct VP9WorkerImpl VP9WorkerImpl; + +// Synchronization object used to launch job in the worker thread +typedef struct { + VP9WorkerImpl *impl_; + VP9WorkerStatus status_; + VP9WorkerHook hook; // hook to call + void *data1; // first argument passed to 'hook' + void *data2; // second argument passed to 'hook' + int had_error; // return value of the last call to 'hook' +} VP9Worker; + +// The interface for all thread-worker related functions. All these functions +// must be implemented. +typedef struct { + // Must be called first, before any other method. + void (*init)(VP9Worker *const worker); + // Must be called to initialize the object and spawn the thread. Re-entrant. + // Will potentially launch the thread. Returns false in case of error. + int (*reset)(VP9Worker *const worker); + // Makes sure the previous work is finished. Returns true if worker->had_error + // was not set and no error condition was triggered by the working thread. + int (*sync)(VP9Worker *const worker); + // Triggers the thread to call hook() with data1 and data2 arguments. These + // hook/data1/data2 values can be changed at any time before calling this + // function, but not be changed afterward until the next call to Sync(). + void (*launch)(VP9Worker *const worker); + // This function is similar to launch() except that it calls the + // hook directly instead of using a thread. Convenient to bypass the thread + // mechanism while still using the VP9Worker structs. sync() must + // still be called afterward (for error reporting). + void (*execute)(VP9Worker *const worker); + // Kill the thread and terminate the object. To use the object again, one + // must call reset() again. + void (*end)(VP9Worker *const worker); +} VP9WorkerInterface; + +// Install a new set of threading functions, overriding the defaults. This +// should be done before any workers are started, i.e., before any encoding or +// decoding takes place. The contents of the interface struct are copied, it +// is safe to free the corresponding memory after this call. This function is +// not thread-safe. Return false in case of invalid pointer or methods. +int vp9_set_worker_interface(const VP9WorkerInterface *const winterface); + +// Retrieve the currently set thread worker interface. +const VP9WorkerInterface *vp9_get_worker_interface(void); + +//------------------------------------------------------------------------------ + +#ifdef __cplusplus +} // extern "C" +#endif + +#endif // VP9_DECODER_VP9_THREAD_H_ diff --git a/vpx_thread/vpx_thread.mk b/vpx_thread/vpx_thread.mk new file mode 100644 index 000000000..0a4a3648a --- /dev/null +++ b/vpx_thread/vpx_thread.mk @@ -0,0 +1,13 @@ +## +## Copyright (c) 2015 The WebM project authors. All Rights Reserved. +## +## Use of this source code is governed by a BSD-style license +## that can be found in the LICENSE file in the root of the source +## tree. An additional intellectual property rights grant can be found +## in the file PATENTS. All contributing project authors may +## be found in the AUTHORS file in the root of the source tree. +## + +THREAD_SRCS-yes += vpx_thread.mk +THREAD_SRCS-yes += vpx_thread.c +THREAD_SRCS-yes += vpx_thread.h -- cgit v1.2.3