C语言-线程池代码

简介: github 地址:常用的C工具代码,这里的工具包含了C语言实现的线程池,hashtable,list,md5,字符串操作,消息队列等很多常用的工具,我这里就不一一说明了,感兴趣的朋友可以自行下载研究,工作中肯定用的上。

说明

github 地址:常用的C工具代码,这里的工具包含了C语言实现的线程池,hashtable,list,md5,字符串操作,消息队列等很多常用的工具,我这里就不一一说明了,感兴趣的朋友可以自行下载研究,工作中肯定用的上。


这里的代码是直接拷贝的,直接复制到自己的项目就能使用。


如果对线程池还不是很熟悉,或者只需要一个仅支持 Linux 的线程池,可以参考我另一篇博客中的简单版 Linux 线程池。


代码

thread-pool.h头文件

#ifndef _threadpool_h_
#define _threadpool_h_
#ifdef __cplusplus
extern "C" {
#endif
/* Opaque thread pool handle returned by thread_pool_create(). */
typedef void* thread_pool_t;
/**
 * @brief thread_pool_create Create a thread pool.
 * @param num number of threads started when the pool is created
 * @param min minimum number of threads
 * @param max maximum number of threads
 * @return 0-error, other-thread pool id
 */
thread_pool_t thread_pool_create(int num, int min, int max);
/**
 * @brief thread_pool_destroy Destroy a thread pool.
 * @param pool handle returned by thread_pool_create()
 */
void thread_pool_destroy(thread_pool_t pool);
/**
 * @brief Get the number of threads currently in the pool.
 * @param pool handle returned by thread_pool_create()
 * @return <0-error code, >=0-thread count
 */
int thread_pool_threads_count(thread_pool_t pool);
/// Task callback: invoked on a pool thread with the user supplied parameter.
typedef void (*thread_pool_proc)(void *param);
/**
 * @brief thread_pool_push Submit a task to the thread pool.
 * @param pool thread pool handle
 * @param proc function pointer of the task
 * @param param user defined argument passed to proc
 * @return =0-ok, <0-error code
 */
int thread_pool_push(thread_pool_t pool, thread_pool_proc proc, void *param);
#ifdef __cplusplus
}
#endif
#endif /* !_threadpool_h_ */

依赖头文件

locker.h

#ifndef _platform_locker_h_
#define _platform_locker_h_
#include <errno.h>
#if defined(WIN32)
#include <Windows.h>
typedef CRITICAL_SECTION  locker_t;
#else
#include <pthread.h>
typedef pthread_mutex_t  locker_t;
#endif
//-------------------------------------------------------------------------------------
// int locker_create(locker_t* locker);
// int locker_destroy(locker_t* locker);
// int locker_lock(locker_t* locker);
// int locker_unlock(locker_t* locker);
// int locker_trylock(locker_t* locker);
//-------------------------------------------------------------------------------------
/// Initialize a recursive lock.
/// @param locker storage for the lock to initialize
/// @return 0-ok, other-pthread error code (Windows always returns 0)
static inline int locker_create(locker_t* locker)
{
#if defined(WIN32)
  InitializeCriticalSection(locker);
  return 0;
#else
  // create a recursive locker
  int r;
  pthread_mutexattr_t attr;
  r = pthread_mutexattr_init(&attr);
  if(0 != r)
    return r;
  // http://linux.die.net/man/3/pthread_mutexattr_settype
  // Application Usage:
  // It is advised that an application should not use a PTHREAD_MUTEX_RECURSIVE mutex 
  // with condition variables because the implicit unlock performed for a pthread_cond_timedwait() 
  // or pthread_cond_wait() may not actually release the mutex (if it had been locked multiple times). 
  // If this happens, no other thread can satisfy the condition of the predicate. 
#if defined(OS_LINUX) && defined(__GLIBC__)
  r = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE_NP);
#else
  r = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
#endif
  // Only create the mutex when the recursive type was really applied;
  // otherwise callers would silently get a non-recursive lock.
  if(0 == r)
    r = pthread_mutex_init(locker, &attr);
  pthread_mutexattr_destroy(&attr);
  return r;
#endif
}
/// Release a lock created by locker_create().
/// @return 0 on Windows; on other platforms the result of pthread_mutex_destroy()
static inline int locker_destroy(locker_t* locker)
{
#if defined(WIN32)
  DeleteCriticalSection(locker);
  return 0;
#else
  return pthread_mutex_destroy(locker);
#endif
}
/// Acquire the lock, blocking until it is available.
/// @return 0 on Windows; on other platforms the result of pthread_mutex_lock()
static inline int locker_lock(locker_t* locker)
{
#if defined(WIN32)
  EnterCriticalSection(locker);
  return 0;
#else
    // These functions shall not return an error code of [EINTR].
  return pthread_mutex_lock(locker);
#endif
}
/// Release the lock.
/// linux: the unlocking thread must be the thread that locked it
/// @return 0 on Windows; on other platforms the result of pthread_mutex_unlock()
static inline int locker_unlock(locker_t* locker)
{
#if defined(WIN32)
  LeaveCriticalSection(locker);
  return 0;
#else
  return pthread_mutex_unlock(locker);
#endif
}
/// Try to acquire the lock without blocking.
/// @return 0 when the lock was taken. On failure Windows returns -1, while
///         other platforms return the pthread_mutex_trylock() error code.
static inline int locker_trylock(locker_t* locker)
{
#if defined(WIN32)
  return TryEnterCriticalSection(locker)?0:-1;
#else
  return pthread_mutex_trylock(locker);
#endif
}
#endif /* !_platform_locker_h_ */

system.h

#ifndef _platform_system_h_
#define _platform_system_h_
#include <stdint.h>
#if defined(WIN32)
#include <Windows.h>
typedef HMODULE module_t;
typedef uint32_t useconds_t;
typedef FARPROC funcptr_t;
#else
#include <sys/types.h>
#include <sys/time.h> // gettimeofday, struct timeval
#include <sys/utsname.h>
#include <unistd.h>
#include <pthread.h>
#include <dlfcn.h>
#include <errno.h>
#include <time.h>
typedef void* module_t;
typedef void (*funcptr_t)(void);
#endif
#if defined(OS_MAC)
#include <sys/param.h>
#include <sys/sysctl.h>
#include <mach/mach_time.h>  
#endif
#include <stdint.h>
#include <stdio.h>
//-----------------------------------------------------------------------
// void system_sleep(useconds_t millisecond);
// uint64_t system_time(void);
// uint64_t system_clock(void);
// int64_t system_getcyclecount(void);
// size_t system_getcpucount(void);
//
// int system_version(int* major, int* minor);
// module_t system_load(const char* module);
// int system_unload(module_t module);
// funcptr_t system_getproc(module_t module, const char* producer);
//-----------------------------------------------------------------------
//
///
/// implement
///
//
/// Suspend the calling thread.
/// @param milliseconds sleep duration in milliseconds
static inline void system_sleep(useconds_t milliseconds)
{
#if defined(WIN32)
  Sleep(milliseconds);
#else
  // usleep() may reject arguments >= 1,000,000 (EINVAL) and
  // milliseconds*1000 can overflow; nanosleep() has neither limit.
  struct timespec ts;
  ts.tv_sec = (time_t)(milliseconds / 1000);
  ts.tv_nsec = (long)(milliseconds % 1000) * 1000000L;
  // Like usleep(), return early if interrupted by a signal.
  nanosleep(&ts, NULL);
#endif
}
/// Number of processors available to the system.
/// @return processor count; at least 1 on the sysctl and default (sysconf) paths
static inline size_t system_getcpucount(void)
{
#if defined(WIN32)
  SYSTEM_INFO sysinfo;
  GetSystemInfo(&sysinfo);
  return sysinfo.dwNumberOfProcessors;
#elif defined(OS_MAC) || defined(_FREEBSD_) || defined(_NETBSD_) || defined(_OPENBSD_)
  // FreeBSD, MacOS X, NetBSD, OpenBSD, etc.:
  // hw.ncpu / hw.availcpu are int valued, so read them into an int.
  int mib[2];
  int num = 0;
  size_t len = sizeof(num);
  mib[0] = CTL_HW;
#if defined(HW_AVAILCPU)
  mib[1] = HW_AVAILCPU;
  if(0 != sysctl(mib, 2, &num, &len, NULL, 0))
    num = 0;
#endif
  if(num < 1)
  {
    mib[1] = HW_NCPU;
    num = 0;
    len = sizeof(num); // sysctl() may have modified len
    if(0 != sysctl(mib, 2, &num, &len, NULL, 0) || num < 1)
      num = 1;
  }
  return (size_t)num;
#elif defined(_HPUX_)
  // HPUX:
  return mpctl(MPC_GETNUMSPUS, NULL, NULL);
#elif defined(_IRIX_)
  // IRIX:
  return sysconf(_SC_NPROC_ONLN);
#else
  // linux, Solaris, & AIX
  // sysconf() returns -1 on error; never turn that into a huge size_t.
  long num = sysconf(_SC_NPROCESSORS_ONLN);
  return num < 1 ? 1 : (size_t)num;
  //"cat /proc/cpuinfo | grep processor | wc -l"
#endif
}
/// Placeholder for a CPU cycle counter.
/// NOTE(review): every platform returns 0. The Windows branch queries the
/// performance counter and frequency but discards both values, and the
/// non-Windows branch is empty. Confirm the intended semantics before use.
/// @return always 0
static inline int64_t system_getcyclecount(void)
{
#if defined(WIN32)
  LARGE_INTEGER freq;
  LARGE_INTEGER count;
  QueryPerformanceCounter(&count);
  QueryPerformanceFrequency(&freq);
#else
#endif
  return 0;
}
/// Wall-clock time.
/// @return milliseconds since the Epoch(1970-01-01 00:00:00 +0000 (UTC))
static inline uint64_t system_time(void)
{
#if defined(WIN32)
  uint64_t t;
  FILETIME ft;
  GetSystemTimeAsFileTime(&ft);
  t = (uint64_t)ft.dwHighDateTime << 32 | ft.dwLowDateTime;
  // FILETIME counts 100ns units since Jan 1, 1601
  return t / 10000 - 11644473600000ULL; /* Jan 1, 1601 */
#elif defined(CLOCK_REALTIME)
  // Also used on macOS: mach_absolute_time() is a monotonic tick counter,
  // not time since the Epoch, so it must not be used for wall-clock time.
  struct timespec tp;
  clock_gettime(CLOCK_REALTIME, &tp);
  return (uint64_t)tp.tv_sec * 1000 + (uint64_t)tp.tv_nsec / 1000000;
#else
  // POSIX.1-2008 marks gettimeofday() as obsolete, recommending the use of clock_gettime(2) instead.
  struct timeval tv;
  gettimeofday(&tv, NULL);
  return (uint64_t)tv.tv_sec * 1000 + (uint64_t)tv.tv_usec / 1000;
#endif
}
/// Relative clock for measuring intervals.
///@return milliseconds(relative time)
static inline uint64_t system_clock(void)
{
#if defined(WIN32)
  LARGE_INTEGER ticks_per_second;
  LARGE_INTEGER ticks;
  QueryPerformanceFrequency(&ticks_per_second);
  QueryPerformanceCounter(&ticks);
  return (uint64_t)ticks.QuadPart * 1000 / ticks_per_second.QuadPart;
#elif defined(OS_MAC)
  mach_timebase_info_data_t info;
  uint64_t ticks;
  ticks = mach_absolute_time();
  mach_timebase_info(&info);
  // ticks -> nanoseconds -> milliseconds
  return ticks * info.numer / info.denom / 1000000;
#elif defined(CLOCK_MONOTONIC)
  struct timespec now;
  clock_gettime(CLOCK_MONOTONIC, &now);
  return (uint64_t)now.tv_sec * 1000 + now.tv_nsec / 1000000;
#else
  // POSIX.1-2008 marks gettimeofday() as obsolete, recommending the use of clock_gettime(2) instead.
  struct timeval now;
  gettimeofday(&now, NULL);
  return (uint64_t)now.tv_sec * 1000 + now.tv_usec / 1000;
#endif
}
#if defined(_MSC_VER)
#pragma warning(push)
#pragma warning(disable: 4996) // GetVersionEx
#pragma warning(disable: 28159)
#endif
/// Query the operating system version.
/// @param major receives the major version number
/// @param minor receives the minor version number
/// @return 0-ok; on non-Windows: errno if uname() fails, -1 if the release
///         string does not start with "<major>.<minor>"
static inline int system_version(int* major, int* minor)
{
#if defined(WIN32)
  /*
  Operating system        Version number
  Windows 8               6.2
  Windows Server 2012     6.2
  Windows 7               6.1
  Windows Server 2008 R2  6.1
  Windows Server 2008     6.0
  Windows Vista           6.0
  Windows Server 2003 R2  5.2
  Windows Server 2003     5.2
  Windows XP              5.1
  Windows 2000            5.0
  Windows Me              4.90
  Windows 98              4.10
  Windows NT 4.0          4.0
  Windows 95              4.0
  */
  OSVERSIONINFO info;
  memset(&info, 0, sizeof(info));
  info.dwOSVersionInfoSize = sizeof(info);
  GetVersionEx(&info);
  *major = (int)(info.dwMajorVersion);
  *minor = (int)(info.dwMinorVersion);
  return 0;
#else
  struct utsname uts;
  if(uname(&uts) != 0)
  {
    return errno;
  }
  // release looks like "5.15.0-xx": take the two leading numbers
  return (2 == sscanf(uts.release, "%8d.%8d", major, minor)) ? 0 : -1;
#endif
}
#if defined(_MSC_VER)
#pragma warning(pop)
#endif
//
///
/// dynamic module load/unload
///
//
/// Load a dynamic module (DLL / shared object).
/// @param module path or name passed to LoadLibraryExA() / dlopen()
/// @return the handle returned by the platform loader
static inline module_t system_load(const char* module)
{
#if defined(WIN32)
  return LoadLibraryExA(module, NULL, LOAD_WITH_ALTERED_SEARCH_PATH);
#else
  return dlopen(module, RTLD_LAZY|RTLD_LOCAL);
#endif
}
/// Unload a module obtained from system_load().
/// @return the raw result of FreeLibrary() on Windows or dlclose() elsewhere.
/// NOTE(review): the two APIs use opposite success conventions, so callers
/// cannot test the result portably; confirm the intended contract.
static inline int system_unload(module_t module)
{
#if defined(WIN32)
  return FreeLibrary(module);
#else
  return dlclose(module);
#endif
}
/// Look up an exported symbol in a loaded module.
/// @param module handle returned by system_load()
/// @param producer name of the symbol to resolve
/// @return the address returned by GetProcAddress() / dlsym()
static inline funcptr_t system_getproc(module_t module, const char* producer)
{
#if defined(WIN32)
  return GetProcAddress(module, producer);
#else
  // https://linux.die.net/man/3/dlsym
  // cosine = (double (*)(double)) dlsym(handle, "cos")
  // ===> *(void **) (&cosine) = dlsym(handle, "cos");
  return (funcptr_t)dlsym(module, producer);
#endif
}
#endif /* !_platform_system_h_ */

thread.h

#ifndef _platform_thread_h_
#define _platform_thread_h_
#if defined(WIN32)
#include <Windows.h>
#include <process.h>
#ifndef STDCALL
#define STDCALL __stdcall
#endif
// Windows stand-in for pthread_t so the thread_* helpers share one signature.
typedef struct
{
  DWORD id;      // thread id (filled by _beginthreadex / GetCurrentThreadId)
  HANDLE handle; // thread handle used for waiting, priority and CloseHandle
} pthread_t;
// Thread id type returned by thread_getid() on Windows.
typedef DWORD tid_t;
#else
#include <pthread.h>
#include <sched.h>
typedef pthread_t tid_t;
#ifndef STDCALL
#define STDCALL
#endif
// Priority levels for non-Windows builds. The values fall inside the
// 1 (low) .. 99 (high) sched_priority range used by SCHED_FIFO / SCHED_RR.
enum thread_priority
{
  THREAD_PRIORITY_IDLE    = 1,
  THREAD_PRIORITY_LOWEST    = 25,
  THREAD_PRIORITY_BELOW_NORMAL  = 40,
  THREAD_PRIORITY_NORMAL    = 50,
  THREAD_PRIORITY_ABOVE_NORMAL  = 60,
  THREAD_PRIORITY_HIGHEST   = 75,
  THREAD_PRIORITY_TIME_CRITICAL = 99,
};
#endif
//-------------------------------------------------------------------------------------
// int thread_create(pthread_t* thread, thread_proc func, void* param);
// int thread_destroy(pthread_t thread);
// int thread_detach(pthread_t thread);
// int thread_getpriority(pthread_t thread, int* priority);
// int thread_setpriority(pthread_t thread, int priority);
// int thread_isself(pthread_t thread);
// int thread_valid(pthread_t thread);
// int thread_yield(void);
// tid_t thread_getid(pthread_t thread);
// pthread_t thread_self(void);
//-------------------------------------------------------------------------------------
/// Thread entry point: receives the user parameter given at creation.
typedef int (STDCALL *thread_proc)(void* param);
/// Create a thread with an explicit stack size.
/// @param thread receives the new thread
/// @param stacksize stack size in bytes, 0 for the platform default
/// @param func thread entry point
/// @param param argument passed to func
/// @return 0-ok, other-error (-1 on Windows, pthread error code elsewhere)
static inline int thread_create2(pthread_t* thread, unsigned int stacksize, thread_proc func, void* param)
{
#if defined(WIN32)
  // https://msdn.microsoft.com/en-us/library/windows/desktop/ms682453.aspx
  // CreateThread function: By default, every thread has one megabyte of stack space. 
  // http://msdn.microsoft.com/en-us/library/windows/desktop/ms682453%28v=vs.85%29.aspx
  // A thread in an executable that calls the C run-time library (CRT) 
  // should use the _beginthreadex and _endthreadex functions for thread management 
  // rather than CreateThread and ExitThread;
  typedef unsigned int(__stdcall *thread_routine)(void *);
  thread->handle = (HANDLE)_beginthreadex(NULL, stacksize, (thread_routine)func, param, 0, (unsigned int*)&thread->id);
  return NULL == thread->handle ? -1 : 0;
#else
  // https://linux.die.net/man/3/pthread_create
  // On Linux/x86-32, the default stack size for a new thread is 2 megabytes(10M 64bits)
  // http://udrepper.livejournal.com/20948.html
  // mallopt(M_ARENA_MAX, cpu); // limit multithread virtual memory
  typedef void* (*linux_thread_routine)(void*);
  int r;
  pthread_attr_t attr;
  // Using an attribute object whose initialization failed is undefined.
  r = pthread_attr_init(&attr);
  if(0 != r)
    return r;
  // 0 means "default": pthread_attr_setstacksize(0) would only fail with EINVAL.
  // A too small non-zero size is still best effort (the default is kept).
  if(stacksize > 0)
    pthread_attr_setstacksize(&attr, stacksize);
  r = pthread_create(thread, &attr, (linux_thread_routine)func, param);
  pthread_attr_destroy(&attr);
  return r;
#endif
}
/// Create a thread; forwards to thread_create2() with a stack size of 0.
/// @return the result of thread_create2()
static inline int thread_create(pthread_t* thread, thread_proc func, void* param)
{
  return thread_create2(thread, 0, func, param);
}
/// Wait for a thread to finish and release it.
/// When called from the thread itself no wait is possible: the thread is
/// detached (POSIX) or only its handle is closed (Windows).
/// @return 0 on Windows; pthread_join()/pthread_detach() result elsewhere
static inline int thread_destroy(pthread_t thread)
{
#if defined(WIN32)
  const int is_other_thread = (thread.id != GetCurrentThreadId());
  if(is_other_thread)
  {
    WaitForSingleObjectEx(thread.handle, INFINITE, TRUE);
  }
  CloseHandle(thread.handle);
  return 0;
#else
  void* exit_value = NULL;
  if(!pthread_equal(pthread_self(), thread))
  {
    return pthread_join(thread, &exit_value);
  }
  return pthread_detach(thread);
#endif
}
/// Detach a thread so that it does not need to be waited for.
/// @return 0 on Windows (the handle is closed); pthread_detach() result elsewhere
static inline int thread_detach(pthread_t thread)
{
#if defined(WIN32)
  CloseHandle(thread.handle);
  return 0;
#else
  return pthread_detach(thread);
#endif
}
/// Read the scheduling priority of a thread.
/// Windows: value of GetThreadPriority(), priority: [-15, 15]
///          (0: normal / -15: idle / 15: critical).
/// Other platforms: sched_priority reported by pthread_getschedparam().
/// @param priority receives the priority; untouched on failure
/// @return 0-ok, other-error code
static inline int thread_getpriority(pthread_t thread, int* priority)
{
#if defined(WIN32)
  int value = GetThreadPriority(thread.handle);
  if(THREAD_PRIORITY_ERROR_RETURN != value)
  {
    *priority = value;
    return 0;
  }
  return (int)GetLastError();
#else
  struct sched_param param;
  int policy;
  int r = pthread_getschedparam(thread, &policy, &param);
  if(0 != r)
  {
    return r;
  }
  *priority = param.sched_priority;
  return 0;
#endif
}
/// Change the scheduling priority of a thread.
/// @param priority Windows: value for SetThreadPriority(); other platforms:
///        sched_priority, applied only under SCHED_FIFO / SCHED_RR
/// @return non-Windows: 0-ok, other-pthread error code.
///         Windows: 1 on success, 0 on failure.
/// NOTE(review): the Windows convention is the opposite of the other
/// thread_* helpers; it is kept unchanged for existing callers.
static inline int thread_setpriority(pthread_t thread, int priority)
{
#if defined(WIN32)
  BOOL r = SetThreadPriority(thread.handle, priority);
  return TRUE==r?1:0;
#else
  int r;
  int policy = SCHED_RR;
  struct sched_param sched;
  // Without a successful query `sched` is uninitialized and `policy` is a
  // guess, so report the failure instead of applying them.
  r = pthread_getschedparam(thread, &policy, &sched);
  if(0 != r)
    return r;
  // For processes scheduled under one of the normal scheduling policies 
  // (SCHED_OTHER, SCHED_IDLE, SCHED_BATCH), 
  // sched_priority is not used in scheduling decisions (it must be specified as 0).
  // Processes scheduled under one of the real-time policies(SCHED_FIFO, SCHED_RR) 
  // have a sched_priority value in the range 1 (low)to 99 (high)
  sched.sched_priority = (SCHED_FIFO==policy || SCHED_RR==policy) ? priority : 0;
  return pthread_setschedparam(thread, policy, &sched);
#endif
}
/// Get the calling thread.
/// On Windows the handle comes from GetCurrentThread() and the id from
/// GetCurrentThreadId().
static inline pthread_t thread_self(void)
{
#if defined(WIN32)
  pthread_t t;
  t.handle = GetCurrentThread();
  t.id = GetCurrentThreadId();
  return t;
#else
  return pthread_self();
#endif
}
/// Get the id of a thread: the stored id on Windows, the pthread_t itself elsewhere.
static inline tid_t thread_getid(pthread_t thread)
{
#if defined(WIN32)
  //return GetThreadId(thread.handle); // >= vista
  return thread.id;
#else
  return thread;
#endif
}
/// Check whether `thread` is the calling thread.
/// @return non-zero if it is the calling thread, 0 otherwise
///         (Windows returns exactly 1; elsewhere the pthread_equal() result)
static inline int thread_isself(pthread_t thread)
{
#if defined(WIN32)
  return thread.id==GetCurrentThreadId() ? 1 : 0;
#else
  return pthread_equal(pthread_self(), thread);
#endif
}
/// Check whether a thread value is non-zero (i.e. has been assigned).
/// NOTE(review): the non-Windows branch compares pthread_t with 0, which
/// assumes pthread_t is a scalar type on the target platform.
/// @return 1 if non-zero, 0 otherwise
static inline int thread_valid(pthread_t thread)
{
#if defined(WIN32)
  return 0 != thread.id ? 1 : 0;
#else
  return 0 != thread ? 1 : 0;
#endif
}
/// Yield the processor to another ready thread.
/// @return Windows: 0 if SwitchToThread() switched to another thread, -1
///         otherwise; elsewhere the result of sched_yield()
static inline int thread_yield(void)
{
#if defined(WIN32)
  // Causes the calling thread to yield execution to another thread that is ready to run 
  // on the current processor. The operating system selects the next thread to be executed.
  return SwitchToThread() ? 0 : -1;
#else
  return sched_yield();
#endif
}
#if defined(WIN32_XP)
// Local declarations of the structures and the function pointer type used
// by thread_getid_xp() to call ntdll's NtQueryInformationThread.
typedef DWORD KPRIORITY;
typedef struct _CLIENT_ID
{
  PVOID UniqueProcess;
  PVOID UniqueThread; // read by thread_getid_xp() as the thread id
} CLIENT_ID, *PCLIENT_ID;
typedef struct _THREAD_BASIC_INFORMATION
{
  NTSTATUS                ExitStatus;
  PVOID                   TebBaseAddress;
  CLIENT_ID               ClientId;
  KAFFINITY               AffinityMask;
  KPRIORITY               Priority;
  KPRIORITY               BasePriority;
} THREAD_BASIC_INFORMATION, *PTHREAD_BASIC_INFORMATION;
// Signature of NtQueryInformationThread, resolved at run time with GetProcAddress.
typedef NTSTATUS(__stdcall *NtQueryInformationThread)(HANDLE ThreadHandle, int ThreadInformationClass, PVOID ThreadInformation, ULONG ThreadInformationLength, PULONG ReturnLength);
/// Get the id of a thread from its handle via NtQueryInformationThread
/// (GetThreadId() requires Vista or later).
/// @param handle thread handle
/// @return thread id, or 0 if ntdll / the entry point is unavailable or the query fails
static inline tid_t thread_getid_xp(HANDLE handle)
{
  // NT_TIB* tib = (NT_TIB*)__readfsdword(0x18);
  HMODULE module;
  NtQueryInformationThread fp;
  THREAD_BASIC_INFORMATION tbi;
  memset(&tbi, 0, sizeof(tbi)); // a failed query leaves the id at 0
  module = GetModuleHandleA("ntdll.dll");
  if(NULL == module)
    return 0;
  fp = (NtQueryInformationThread)GetProcAddress(module, "NtQueryInformationThread");
  if(NULL == fp)
    return 0; // never call through a NULL function pointer
  fp(handle, 0/*ThreadBasicInformation*/, &tbi, sizeof(tbi), NULL);
  // go through ULONG_PTR so the pointer is narrowed explicitly
  return (tid_t)(ULONG_PTR)tbi.ClientId.UniqueThread;
}
#endif
#endif /* !_platform_thread_h_ */

event.h

#ifndef _platform_event_h_
#define _platform_event_h_
#if defined(WIN32)
#include <Windows.h>
typedef HANDLE  event_t;
#else
#include <sys/time.h> // gettimeofday
#include <pthread.h>
#include <unistd.h>
#include <errno.h>
#include <time.h> // clock_gettime
// Non-Windows event: a condition variable plus a "signaled" flag.
typedef struct
{
  int count; // fixed pthread_cond_signal/pthread_cond_wait call order (1 = signaled, reset to 0 by waiters)
  pthread_cond_t event;  // condition variable waiters block on
  pthread_mutex_t mutex; // protects count and is used with the condition variable
} event_t;
// Give the Windows timeout code a value on other platforms.
#ifndef WAIT_TIMEOUT
#define WAIT_TIMEOUT  ETIMEDOUT
#endif
#endif
/// event: Windows Event/Linux condition variable
/// multi-processor: no
//-------------------------------------------------------------------------------------
// int event_create(event_t* event);
// int event_destroy(event_t* event);
// int event_wait(event_t* event);
// int event_timewait(event_t* event, int timeout);
// int event_signal(event_t* event);
// int event_reset(event_t* event);
//-------------------------------------------------------------------------------------
/// Create an auto-reset event that is initially not signaled.
/// @param event storage for the event to initialize
/// @return 0-ok, other-error code; nothing is left allocated on failure
static inline int event_create(event_t* event)
{
#if defined(WIN32)
  HANDLE h = CreateEvent(NULL, FALSE, FALSE, NULL); // auto-reset, initially non-signaled
  if(NULL==h)
    return (int)GetLastError();
  *event = h;
  return 0;
#else
  int r;
#if defined(OS_LINUX) && defined(CLOCK_MONOTONIC) && defined(__USE_XOPEN2K)
  pthread_condattr_t attr;
#endif
  pthread_mutexattr_t mutex;
  r = pthread_mutexattr_init(&mutex);
  if(0 != r)
    return r;
#if defined(OS_LINUX)
  pthread_mutexattr_settype(&mutex, PTHREAD_MUTEX_RECURSIVE_NP);
#else
  pthread_mutexattr_settype(&mutex, PTHREAD_MUTEX_RECURSIVE);
#endif
  r = pthread_mutex_init(&event->mutex, &mutex);
  pthread_mutexattr_destroy(&mutex); // the attribute object is no longer needed
  if(0 != r)
    return r;
#if defined(OS_LINUX) && defined(CLOCK_MONOTONIC) && defined(__USE_XOPEN2K)
  // event_timewait() computes its deadline with CLOCK_MONOTONIC under the
  // same conditions, so the condition variable must use that clock too.
  r = pthread_condattr_init(&attr);
  if(0 == r)
  {
    r = pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
    if(0 == r)
      r = pthread_cond_init(&event->event, &attr);
    pthread_condattr_destroy(&attr);
  }
#else
  r = pthread_cond_init(&event->event, NULL);
#endif
  if(0 != r)
  {
    pthread_mutex_destroy(&event->mutex); // do not leak the mutex
    return r;
  }
  event->count = 0;
  return 0;
#endif
}
/// Destroy an event created by event_create().
/// On non-Windows platforms this retries every millisecond while the
/// condition variable is reported busy.
/// @return 0-ok, other-error code
static inline int event_destroy(event_t* event)
{
#if defined(WIN32)
  if(CloseHandle(*event))
    return 0;
  return (int)GetLastError();
#else
  int r;
  for(;;)
  {
    r = pthread_cond_destroy(&event->event);
    if(EBUSY != r)
      break;
    usleep(1000);
  }
  pthread_mutex_destroy(&event->mutex);
  return r;
#endif
}
// 0-success, other-error
/// Block until the event is signaled, then reset it (auto-reset).
/// @return 0-success, other-error
static inline int event_wait(event_t* event)
{
#if defined(WIN32)
  DWORD r = WaitForSingleObjectEx(*event, INFINITE, TRUE);
  return WAIT_FAILED==r ? GetLastError() : r;
#else
  int r = 0;
  pthread_mutex_lock(&event->mutex);
  // pthread_cond_wait() may wake up spuriously: re-check the flag in a loop
  // so that only a real event_signal() ends the wait.
  while(0 == event->count && 0 == r)
    r = pthread_cond_wait(&event->event, &event->mutex); // These functions shall not return an error code of [EINTR].
  event->count = 0;
  pthread_mutex_unlock(&event->mutex);
  return r;
#endif
}
// 0-success, WAIT_TIMEOUT-timeout, other-error
/// Wait until the event is signaled or the timeout expires; a signaled
/// event is reset before returning (auto-reset).
/// @param timeout timeout in milliseconds
/// @return 0-success, WAIT_TIMEOUT-timeout, other-error
static inline int event_timewait(event_t* event, int timeout)
{
#if defined(WIN32)
  DWORD r = WaitForSingleObjectEx(*event, timeout, TRUE);
  return WAIT_FAILED==r ? GetLastError() : r;
#else
  int r = 0;
  struct timespec ts;
#if defined(OS_LINUX) && defined(CLOCK_REALTIME)
#if defined(CLOCK_MONOTONIC) && defined(__USE_XOPEN2K)
  clock_gettime(CLOCK_MONOTONIC, &ts);
#else
  clock_gettime(CLOCK_REALTIME, &ts);
#endif
  ts.tv_sec += timeout/1000;
  ts.tv_nsec += (timeout%1000)*1000000;
#else
  struct timeval tv;
  gettimeofday(&tv, NULL);
  ts.tv_sec = tv.tv_sec + timeout/1000;
  ts.tv_nsec = tv.tv_usec * 1000 + (timeout%1000)*1000000;
#endif
  // tv_nsec >= 1000000000 ==> EINVAL
  ts.tv_sec += ts.tv_nsec / 1000000000;
  ts.tv_nsec %= 1000000000;
  pthread_mutex_lock(&event->mutex);
  // Loop on the flag: a spurious wakeup must not be reported as a signal.
  // The absolute deadline keeps the total wait bounded.
  while(0 == event->count && 0 == r)
    r = pthread_cond_timedwait(&event->event, &event->mutex, &ts); // These functions shall not return an error code of [EINTR].
  // A signal that arrived together with the timeout still counts:
  // consume it and report success instead of dropping it.
  if(0 != event->count)
  {
    event->count = 0;
    r = 0;
  }
  pthread_mutex_unlock(&event->mutex);
  return r;
#endif
}
/// Signal the event.
/// Non-Windows: sets the flag under the mutex, then signals the condition
/// variable, so a waiter arriving later still sees the event as signaled.
/// @return 0-ok, other-error code
static inline int event_signal(event_t* event)
{
#if defined(WIN32)
  return SetEvent(*event)?0:(int)GetLastError();
#else
  int r;
  pthread_mutex_lock(&event->mutex);
  event->count = 1;
  r = pthread_cond_signal(&event->event);
  pthread_mutex_unlock(&event->mutex);
  return r;
#endif
}
/// Put the event back into the non-signaled state.
/// @return 0-ok; on Windows the GetLastError() code if ResetEvent() fails
static inline int event_reset(event_t* event)
{
#if defined(WIN32)
  return ResetEvent(*event)?0:(int)GetLastError();
#else
  pthread_mutex_lock(&event->mutex);
  event->count = 0;
  pthread_mutex_unlock(&event->mutex);
  return 0;
#endif
}
#endif /* !_platform_event_h_ */

thread-pool.c源文件

#include "thread-pool.h"
#include "locker.h"
#include "system.h"
#include "thread.h"
#include "event.h"
#include <stdlib.h>
#include <string.h>
#include <assert.h>
struct _thread_pool_context_t;
//线程列表
// Node of the singly linked list of worker threads owned by a pool.
typedef struct _thread_list_t
{
    struct _thread_list_t *next;// next thread in the list
    struct _thread_pool_context_t *pool;// pool this thread belongs to
    pthread_t thread; // thread id / handle
} thread_list_t;
//任务队列
// Node of the singly linked task list.
typedef struct _thread_task_list_t
{
    struct _thread_task_list_t *next;// next task in the list
    thread_pool_proc proc; // task function to execute
    void *param;// argument passed to proc
} thread_task_list_t;
// State of one thread pool; the counters and lists are guarded by `locker`.
typedef struct _thread_pool_context_t
{
    int run;// non-zero while the pool is running; cleared by thread_pool_destroy
    int idle_max;// a worker exits when the idle count exceeds this (set to `num` at creation)
    int threshold;// set to num / 2 at creation; not read by the code in this file
    int thread_count;// number of threads in the pool
    int thread_count_min;// minimum number of threads
    int thread_count_max;// maximum number of threads
    int thread_count_idle;// number of idle threads
    int task_count;// number of queued tasks
    thread_task_list_t *tasks;// pending task list
    thread_task_list_t *recycle_tasks;// finished task nodes kept for reuse (avoids repeated malloc)
    thread_list_t *task_threads;// list of all threads in the pool
    locker_t locker;// lock
    event_t event;// event object / condition variable used to wake workers
} thread_pool_context_t;
static void thread_pool_destroy_thread(thread_pool_context_t *context);
// Worker thread entry point (identical for every pool thread).
// param is the thread_list_t node created for this thread. The worker holds
// the pool lock except while running a task or waiting on the event.
// It leaves the loop when the pool stops running or when more threads are
// idle than idle_max, then removes its own node from the pool.
static int STDCALL thread_pool_worker(void *param)
{
    thread_list_t* threads;
    thread_task_list_t *task;
    thread_pool_context_t *context;
    threads = (thread_list_t*)param;
    context = threads->pool;
    locker_lock(&context->locker);
    while(context->run)// loop for as long as the pool is running
    {
        // pop task
        task = context->tasks;
        while(task && context->run)// drain the task list while the pool is running
        {
            // remove task from task list
            context->tasks = task->next;
            --context->task_count;
            // do task procedure
            --context->thread_count_idle;
            locker_unlock(&context->locker);
            task->proc(task->param);// run the task without holding the lock
            locker_lock(&context->locker);
            ++context->thread_count_idle;
            // recycle task: push task to recycle list so the node can be reused
            task->next = context->recycle_tasks;
            context->recycle_tasks = task;
            // do next
            task = context->tasks;
        }
        // delete idle thread
        if(context->thread_count_idle > context->idle_max
                || !context->run)
            break;
        // wait for task
        locker_unlock(&context->locker);
        event_timewait(&context->event, 60*1000);// wait up to 1 min for a task signal; on signal or timeout re-check the task list (the event auto-resets)
        locker_lock(&context->locker);
    }
    // this thread is leaving the pool: update the counters and drop its node
    --context->thread_count;
    --context->thread_count_idle;
    thread_pool_destroy_thread(context);
    locker_unlock(&context->locker);
    return 0;
}
/**
 * @brief thread_pool_create_thread 创建线程
 * @param context
 * @return
 */
/**
 * @brief thread_pool_create_thread Allocate a thread node and start its worker.
 * @param context pool the new thread belongs to
 * @return the new node (not yet linked into the pool), NULL on failure
 */
static thread_list_t* thread_pool_create_thread(thread_pool_context_t *context)
{
    thread_list_t *node = (thread_list_t*)calloc(1, sizeof(thread_list_t));
    if(NULL == node)
        return NULL;
    node->pool = context;
    if(0 == thread_create(&node->thread, thread_pool_worker, node))
        return node;
    // the worker could not be started: give the node back
    free(node);
    return NULL;
}
/**
 * @brief thread_pool_destroy_thread 释放线程
 * @param context 线程池参数
 */
/**
 * @brief thread_pool_destroy_thread Remove the calling thread from the pool.
 *        Called by a worker, with the pool lock held, just before it exits.
 * @param context thread pool context
 */
static void thread_pool_destroy_thread(thread_pool_context_t *context)
{
    thread_list_t **link;
    thread_list_t *node;
    // find the list node that belongs to the calling thread
    for(link = &context->task_threads; *link; link = &(*link)->next)
    {
        if(!thread_isself((*link)->thread))
            continue;
        node = *link;
        *link = node->next;
        // Nobody joins pool threads, so detach before dropping the node;
        // otherwise the thread resources (or the Windows handle) leak.
        thread_detach(node->thread);
        free(node);
        break;
    }
}
/**
 * @brief thread_pool_create_threads 创建所有的线程
 * @param context 线程池对象
 * @param num 创建线程的数量
 */
/**
 * @brief thread_pool_create_threads Start up to `num` worker threads and
 *        link them into the pool; stops at the first creation failure.
 * @param context thread pool context
 * @param num number of threads to create
 */
static void thread_pool_create_threads(thread_pool_context_t *context, 
                                       int num)
{
    int created = 0;
    while(created < num)
    {
        thread_list_t *node = thread_pool_create_thread(context);
        if(NULL == node)
            break;
        // push to the head of the thread list
        node->next = context->task_threads;
        context->task_threads = node;
        ++created;
    }
    // newly started threads count as idle until they pick up a task
    context->thread_count += created;
    context->thread_count_idle += created;
}
/**
 * @brief thread_pool_destroy_threads 删除所有线程,没有使用
 * @param threads
 */
/**
 * @brief thread_pool_destroy_threads Destroy every thread of a list and free
 *        its nodes (currently not called anywhere in this file).
 * @param threads head of the thread list
 */
static void thread_pool_destroy_threads(thread_list_t *threads)
{
    while(threads)
    {
        thread_list_t *current = threads;
        threads = current->next; // advance before the node is freed
        thread_destroy(current->thread);
        free(current);
    }
}
/**
 * @brief thread_pool_create_task 往线程池中插入任务
 * @param context
 * @param proc
 * @param param
 * @return
 */
/**
 * @brief thread_pool_create_task Build a task node, reusing a recycled node
 *        when one is available.
 * @param context thread pool context
 * @param proc task function
 * @param param argument passed to proc
 * @return the task node (not yet queued), NULL if allocation fails
 */
static thread_task_list_t* thread_pool_create_task(thread_pool_context_t *context,
                                                   thread_pool_proc proc,
                                                   void* param)
{
    thread_task_list_t *task = context->recycle_tasks;
    if(task)
        context->recycle_tasks = task->next; // take the node off the recycle list
    else
        task = (thread_task_list_t*)malloc(sizeof(thread_task_list_t));
    if(NULL == task)
        return NULL;
    memset(task, 0, sizeof(thread_task_list_t)); // clear stale contents
    task->proc = proc;
    task->param = param;
    return task;
}
/**
 * @brief thread_pool_destroy_tasks 销毁任务队列
 * @param tasks 任务队列
 */
/**
 * @brief thread_pool_destroy_tasks Free every node of a task list.
 * @param tasks head of the task list
 */
static void thread_pool_destroy_tasks(thread_task_list_t *tasks)
{
    thread_task_list_t *following;
    for(; tasks; tasks = following)
    {
        following = tasks->next; // remember the successor before freeing
        free(tasks);
    }
}
/**
 * @brief thread_pool_create Create a thread pool and start its initial threads.
 * @param num number of threads started at creation time
 * @param min minimum number of threads
 * @param max maximum number of threads
 * @return NULL on error, otherwise the pool handle
 */
thread_pool_t thread_pool_create(int num, int min, int max)
{
    thread_pool_context_t *ctx;
    ctx = (thread_pool_context_t*)malloc(sizeof(thread_pool_context_t));
    if(!ctx)
        return NULL;
    memset(ctx, 0, sizeof(thread_pool_context_t));
    ctx->thread_count_min = min;
    ctx->thread_count_max = max;
    ctx->threshold = num / 2;
    ctx->idle_max = num;
    ctx->run = 1; // the pool is running
    if(0 != locker_create(&ctx->locker))
    {
        free(ctx);
        return NULL;
    }
    if(0 != event_create(&ctx->event))
    {
        locker_destroy(&ctx->locker);
        free(ctx);
        return NULL;
    }
    // Workers start running as soon as they are created and read the pool
    // state under the lock, so the thread list and counters must be
    // updated under the same lock.
    locker_lock(&ctx->locker);
    thread_pool_create_threads(ctx, num); // start the initial threads
    locker_unlock(&ctx->locker);
    return ctx;
}
/**
 * @brief thread_pool_destroy Stop all workers and free the pool.
 *        Blocks until every worker has left the pool; queued tasks that
 *        have not started are discarded.
 * @param pool handle returned by thread_pool_create(); NULL is ignored
 */
void thread_pool_destroy(thread_pool_t pool)
{
    thread_pool_context_t *ctx;
    ctx = (thread_pool_context_t*)pool;
    if(!ctx)
        return;
    locker_lock(&ctx->locker);
    // Workers read `run` while holding the lock, so clear it under the lock.
    ctx->run = 0;
    while(ctx->thread_count)
    {
        // the event is auto-reset, so keep signaling until every worker has exited
        event_signal(&ctx->event);
        locker_unlock(&ctx->locker);
        system_sleep(100);
        locker_lock(&ctx->locker);
    }
    locker_unlock(&ctx->locker);
    //thread_pool_destroy_threads(ctx->task_threads);
    thread_pool_destroy_tasks(ctx->recycle_tasks);
    thread_pool_destroy_tasks(ctx->tasks);
    event_destroy(&ctx->event);
    locker_destroy(&ctx->locker);
    free(ctx);
}
/**
 * @brief thread_pool_threads_count Get the number of threads in the pool.
 * @param pool handle returned by thread_pool_create()
 * @return <0-error code (NULL pool), >=0-thread count
 */
int thread_pool_threads_count(thread_pool_t pool)
{
    int count;
    thread_pool_context_t *ctx;
    ctx = (thread_pool_context_t*)pool;
    if(!ctx)
        return -1;
    // thread_count is modified by workers under the lock; read it the same way
    locker_lock(&ctx->locker);
    count = ctx->thread_count;
    locker_unlock(&ctx->locker);
    return count;
}
/**
 * @brief thread_pool_push Queue a task and wake a worker to run it.
 *        The task is inserted at the head of the task list.
 * @param pool handle returned by thread_pool_create()
 * @param proc task function, must not be NULL
 * @param param argument passed to proc
 * @return =0-ok, <0-error code (invalid argument or out of memory)
 */
int thread_pool_push(thread_pool_t pool, thread_pool_proc proc, void *param)
{
    thread_task_list_t *task;
    thread_pool_context_t *context;
    context = (thread_pool_context_t*)pool;
    // A NULL proc would only crash later, inside a worker thread.
    if(!context || !proc)
        return -1;
    locker_lock(&context->locker);
    task = thread_pool_create_task(context, proc, param); // build the task node
    if(!task)
    {
        locker_unlock(&context->locker);
        return -1;
    }
    // add to the task list
    task->next = context->tasks;
    context->tasks = task;
    ++context->task_count;
    // start one more thread when nobody is idle and the maximum is not reached
    if(context->thread_count_idle<1
            && context->thread_count<context->thread_count_max)
        thread_pool_create_threads(context, 1);
    event_signal(&context->event); // tell a worker that a task is available
    locker_unlock(&context->locker);
    return 0;
}
相关文章
|
2月前
|
存储 安全 数据管理
C语言之考勤模拟系统平台(千行代码)
C语言之考勤模拟系统平台(千行代码)
65 4
|
27天前
|
安全 Java 编译器
深入理解Java中synchronized三种使用方式:助您写出线程安全的代码
`synchronized` 是 Java 中的关键字,用于实现线程同步,确保多个线程互斥访问共享资源。它通过内置的监视器锁机制,防止多个线程同时执行被 `synchronized` 修饰的方法或代码块。`synchronized` 可以修饰非静态方法、静态方法和代码块,分别锁定实例对象、类对象或指定的对象。其底层原理基于 JVM 的指令和对象的监视器,JDK 1.6 后引入了偏向锁、轻量级锁等优化措施,提高了性能。
54 3
|
1月前
|
消息中间件 Unix Linux
【C语言】进程和线程详解
在现代操作系统中,进程和线程是实现并发执行的两种主要方式。理解它们的区别和各自的应用场景对于编写高效的并发程序至关重要。
68 6
|
1月前
|
存储 算法 程序员
C 语言递归算法:以简洁代码驾驭复杂逻辑
C语言递归算法简介:通过简洁的代码实现复杂的逻辑处理,递归函数自我调用解决分层问题,高效而优雅。适用于树形结构遍历、数学计算等领域。
|
1月前
|
消息中间件 存储 负载均衡
C 语言多线程编程:并行处理的利剑
C语言多线程编程是实现并行处理的强大工具,通过创建和管理多个线程,可以显著提升程序执行效率,尤其在处理大量数据或复杂计算时效果显著。
|
2月前
|
供应链 安全 NoSQL
PHP 互斥锁:如何确保代码的线程安全?
在多线程和高并发环境中,确保代码段互斥执行至关重要。本文介绍了 PHP 互斥锁库 `wise-locksmith`,它提供多种锁机制(如文件锁、分布式锁等),有效解决线程安全问题,特别适用于电商平台库存管理等场景。通过 Composer 安装后,开发者可以利用该库确保在高并发下数据的一致性和安全性。
47 6
|
2月前
|
存储 安全 物联网
C语言物联网开发之设备安全与代码可靠性隐患
物联网设备的C语言代码安全与可靠性至关重要。一是防范代码安全漏洞,包括缓冲区溢出和代码注入风险,通过使用安全函数和严格输入验证来预防。二是提高代码跨平台兼容性,利用`stdint.h`定义统一的数据类型,并通过硬件接口抽象与适配减少平台间的差异,确保程序稳定运行。
|
2月前
|
并行计算 算法 测试技术
C语言因高效灵活被广泛应用于软件开发。本文探讨了优化C语言程序性能的策略,涵盖算法优化、代码结构优化、内存管理优化、编译器优化、数据结构优化、并行计算优化及性能测试与分析七个方面
C语言因高效灵活被广泛应用于软件开发。本文探讨了优化C语言程序性能的策略,涵盖算法优化、代码结构优化、内存管理优化、编译器优化、数据结构优化、并行计算优化及性能测试与分析七个方面,旨在通过综合策略提升程序性能,满足实际需求。
80 1
|
3月前
|
存储 搜索推荐 C语言
深入C语言指针,使代码更加灵活(二)
深入C语言指针,使代码更加灵活(二)
|
3月前
|
存储 程序员 编译器
深入C语言指针,使代码更加灵活(一)
深入C语言指针,使代码更加灵活(一)

热门文章

最新文章