MariaDB · 源码分析 · thread pool

简介: 1. thread pool 简介MariaDB 共有三种线程调度方式 one-thread-per-connection 每个连接一个线程 no-threads 所有连接共用一个线程 pool-of-threads 线程池 no-threads 只适用于简单的系统,并发数稍高性能就会严重下降one-thread-per-connection 在多数情况下性能优良,是个合适的选择,生产系统也常用此配置。

1. thread pool 简介

MariaDB 共有三种线程调度方式

  • one-thread-per-connection 每个连接一个线程

  • no-threads 所有连接共用一个线程

  • pool-of-threads 线程池

no-threads 只适用于简单的系统,并发数稍高性能就会严重下降

one-thread-per-connection 在多数情况下性能优良,是个合适的选择,生产系统也常用此配置。但在高并发、短连接的业务场景下,使用 one-thread-per-connection 会频繁得创建和销毁线程,严重影响性能

pool-of-threads 适用于高并发短连接的业务场景,线程复用,避免频繁创建和销毁线程带来的性能损耗

MariaDB 的 thread pool 在 win 和 unix 系统的实现不同,本文分析 unix 系统下的实现

thread pool 由若干个 thread_group 组成,每个 thread_group 有若干个 worker 线程,和 0~1 个 listenr 线程

server 接收到连接请求时,将这个连接分配给一个 group 处理,listener 线程负责监听请求,worker 线程处理请求内容

2.代码概览

struct scheduler_functions 内部声明了连接建立相关的属性和方法,这里使用函数指针实现多态

每种线程调度方式,分别实例化一个struct scheduler_functions,给相关变量和函数指针赋值

thread pool 相关实现在 sql/threadpool_common.cc 和 sql/threadpool_unix.cc 中

sql/scheduler.h

struct scheduler_functions
{
  uint max_threads, *connection_count;
  ulong *max_connections;
  bool (*init)(void);
  bool (*init_new_connection_thread)(void);                                                                                                                                                                 
  void (*add_connection)(THD *thd);
  void (*thd_wait_begin)(THD *thd, int wait_type);
  void (*thd_wait_end)(THD *thd);
  void (*post_kill_notification)(THD *thd);
  bool (*end_thread)(THD *thd, bool cache_thread);
  void (*end)(void);
};

-------------------------------------------------

sql/threadpool_common.cc

static scheduler_functions tp_scheduler_functions=                                                                                                                                                          
{
  0,                                  // max_threads
  NULL,
  NULL,
  tp_init,                            // init
  NULL,                               // init_new_connection_thread
  tp_add_connection,                  // add_connection
  tp_wait_begin,                      // thd_wait_begin
  tp_wait_end,                        // thd_wait_end
  post_kill_notification,             // post_kill_notification
  NULL,                               // end_thread
  tp_end                              // end
};

  • tp_init

初始化 thread_group

main

  mysqld_main
    
    network_init
    
      /* 调用相应 thread_scheduler 下的 init 函数,这里是 tp_init */
      MYSQL_CALLBACK_ELSE(thread_scheduler, init, (), 0)
      tp_init
      
        /* thread_group_t 申请内存 */
        all_groups= (thread_group_t *)
          my_malloc(sizeof(thread_group_t) * threadpool_max_size, MYF(MY_WME|MY_ZEROFILL));
          
        /* 初始化 thread_group_t 数组 */
        for (uint i= 0; i < threadpool_max_size; i++) 
        {
          thread_group_init(&all_groups[i], get_connection_attrib());  
        }
		
		/* 设置最大执行时间,worker thread */
		pool_timer.tick_interval= threadpool_stall_limit;
        
        /* 开启 timer thread*/
        start_timer(&pool_timer);          
          

  • tp_add_connection

创建 connection,将 connection 分配到一个 thread_group,插入 group 队列

唤醒或创建一个 worker thread,如果 group 中没有活跃的 worker thread

main

  mysqld_main
  
    handle_connections_sockets
    
      create_new_thread
      
        /* 调用相应 thread_scheduler 下的 add_connection 函数,这里是 tp_add_connection */
        MYSQL_CALLBACK(thd->scheduler, add_connection, (thd));
        tp_add_connection
        
          /* 申请一个 connection */
          connection_t *connection= alloc_connection(thd);
          
          /* round-robin 映射到一个 group */
          thread_group_t *group= 
            &all_groups[thd->thread_id%group_count];
            
          queue_put(group, connection)
          
            /* connection 插入队列 */
            thread_group->queue.push_back(connection);
            
            /*
              如果没有活跃 woker 线程,唤醒一个处理新连接 
              此时连接尚未 login,认证等工作在 worker 线程中完成
            */
            if (thread_group->active_thread_count == 0)
              wake_or_create_thread(thread_group);                                        

wake_or_create_thread

  /* 取出 thread_group->waiting_threads 中的 waiting thread (如有) */
  if (wake_thread(thread_group) == 0)
    DBUG_RETURN(0);
  
  /* 
    没有活跃线程时,立即创建一个,在其他地方也有这个逻辑
    保证每个 thread_group 只是存在一个活跃线程
  */
  if (thread_group->active_thread_count == 0)
    DBUG_RETURN(create_worker(thread_group));
  
  /* 
    对创建新的 woker thread 限流,距离上个线程创建间隔一定时间才允许再次创建 worker thread
    这个 group 线程总数越多,要求的间隔越长
    4线程以下间隔为0,大于等于 4/8/16 线程时,要求间隔分别为 50*1000/100*1000/200*1000 微秒
  */  
  if (time_since_last_thread_created >
       microsecond_throttling_interval(thread_group))
  {
    DBUG_RETURN(create_worker(thread_group));
  }
  
  
  • worker_main

从 queue 中取出和处理 event

在 group 没有 listener 时会变成 listener

没取到 event 时,进入休眠状态,等待被唤醒

最后进入休眠状态的 worker thread 最先被唤醒

worker_main

  for(;;)
  {                                                                                                                                                              
    connection = get_event(&this_thread, thread_group, &ts);
    
    /* 没有 event 时,跳出循环,结束 worker thread */
    if (!connection)
      break;
    
    handle_event(connection);
  }


get_event

  for(;;)
  {
    /* 
      取出 tp_add_connection/listener 插入的 connection
    */
    connection = queue_get(thread_group);
    
    /* 取到 connection,跳出循环,进入下一步 handle_event */
    if(connection)
      break;
    
    /* 如果没有 listener thread,这个线程变为 listener */
    if(!thread_group->listener)
    {
      /* listener 变为 worker 处理一个 connection */
      connection = listener(current_thread, thread_group);
      
      /* listener 已变为 worker */
      thread_group->listener= NULL;

      /* 跳出循环,进入 handle_event 处理 connection */
      break;
    }
    
    /* 进入休眠状态前,尝试非阻塞方式获取一个 connection */
    if (io_poll_wait(thread_group->pollfd,&nev,1, 0) == 1)
    {
      thread_group->io_event_count++;
      connection = (connection_t *)native_event_get_userdata(&nev);
      break;
    }
    
    /*
      进入休眠状态
      最后休眠的 worker thread 最先被唤醒,更容易命中 cache
    */
    current_thread->woken = false;
    thread_group->waiting_threads.push_front(current_thread);
    
    /* 等待被唤醒 */
    mysql_cond_wait
  }
  
  /* 返回获取到的 connection */
  DBUG_RETURN(connection);


handle_event

  if (!connection->logged_in)
  {
    /* login connection */
    err= threadpool_add_connection(connection->thd);
      
      login_connection
    
    connection->logged_in= true;
  }
  else
  {
    /* 处理 connection 请求 */
    err= threadpool_process_request(connection->thd);
    
      do_command
  }
  
  /* 
    告诉客户端可以读
    可能会变动 connection 所属的 group 
  */
  err= start_io(connection);
  
    /* 
      group_count 允许动态改变,所以处理 connection 的 group 可能发生变动
      这里检查 group 是否需要变动
    */
    thread_group_t *group = 
      &all_groups[connection->thd->thread_id%group_count];
      
    if (group != connection->thread_group)
      change_group(connection, connection->thread_group, group)
    
    return io_poll_start_read(group->pollfd, fd, connection);
  
  • listener

listener 线程通过 epoll_wait 监听 group 关联的描述符,epoll使用一个文件描述符管理多个描述符

监听获得的 event 插入队列,如果插入前队列为空,listener 变成 worker 线程处理第一个event,其余插入队列,否则所有 event 插入队列

如果 group 中没有活跃线程,唤醒或者创建一个 worker 线程

这里考虑一种情况,如果监听只获得一个 event 且队列为空,那么这个 event 将会被 listener 处理,队列中不会插入新的 event,此时只需要 listener 变成 worker 线程处理 event,不需要再唤醒其他 worker 线程

listener

  for(;;)
  {
    /* 监听事件 */
    cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1);
    
    /* 如果队列为空,listener 处理第一个事件,一定概率(只有一个事件)可以不再唤醒 worker thread */
    bool listener_picks_event= thread_group->queue.is_empty();
    
    
    /* 第一个事件留给 listener 处理,其余放入队列,或者全部放入队列 */
    for(int i=(listener_picks_event)?1:0; i < cnt ; i++) 
    {
      connection_t *c= (connection_t *)native_event_get_userdata(&ev[i]);
      thread_group->queue.push_back(c);
    }
    
    if (listener_picks_event)
    {
      /* Handle the first event. */
      retval= (connection_t *)native_event_get_userdata(&ev[0]);
      mysql_mutex_unlock(&thread_group->mutex);
      break;
    }
    
    /* 
      队列中已经有一些 event,但是没有活跃线程
      唤醒一个 worker 线程处理队列中 event
      唤醒失败,并且仅有一个线程(仅有listener),创建一个 worker 线程
    */
    if(thread_group->active_thread_count==0)
    {    
      if(wake_thread(thread_group))
      {    
        if(thread_group->thread_count == 1)
          create_worker(thread_group);   
      }    
    }
    
  }
  
  DBUG_RETURN(retval);

  • timer_thread

检查 listener、worker thread 的运行情况和 connection 超时情况,每隔 thread_pool_stall_limit 检查一次

如果 listener 不存在,且检查周期内没有监听到新的 event,则创建一个 worker thread(自动变成 listener)

如果没有活跃线程(运行状态的 worker thread),且检查周期内没有处理 event,创建一个 worker thread

关闭长时间空闲的 connection

timer_thread

  for(;;)
  {
    /*
      等待一个 tick_interval,即 thread_pool_stall_limit
      正常情况下回等待至超时,只有 stop_timer 会发出 timer->cond 信号
    */
    set_timespec_nsec(ts,timer->tick_interval*1000000);
    err= mysql_cond_timedwait(&timer->cond, &timer->mutex, &ts);
    
    /* ETIMEDOUT 等待超时,代表 timer 没有被 shutdown */
    if (err == ETIMEDOUT)
    {
      timer->current_microtime= microsecond_interval_timer();
      
      /* 遍历 thread_group,查看是否 stall */
      for (i= 0; i < threadpool_max_size; i++)
      {
        if(all_groups[i].connection_count)
           check_stall(&all_groups[i]);
      }
      
      /* 关闭空闲连接 */
      if (timer->next_timeout_check <= timer->current_microtime)
        timeout_check(timer);
    }
  }

check_stall

  /*
    thread_group->io_event_count 代表插入队列的 event 数量
    thread_group 没有 listener,且没有 event 在队列中,这个检查周期内 group 没有监听连接信息
    唤醒或者创建一个 worker 线程,这个线程会自动成为 listener
  */
  if (!thread_group->listener && !thread_group->io_event_count)                                                                                                                                             
  {
    wake_or_create_thread(thread_group);
    return;
  }
  
  /* Reset io event count */
  thread_group->io_event_count= 0;
  
  /*
    thread_group->queue_event_count 代表已经出队的 event 数量
    队列非空,并且出队 event 数量为0,这个检查周期内 worker thread 没有处理 event
    唤醒或创建一个 worker thread, thread_group 标记为 stalled, 下个 event 出队时 reset stalled标记
  */
  if (!thread_group->queue.is_empty() && !thread_group->queue_event_count)
  {
    thread_group->stalled= true;
    wake_or_create_thread(thread_group);
  }
  
  /* Reset queue event count */
  thread_group->queue_event_count= 0;


  • tp_wait_begin/tp_wait_end

线程进入阻塞状态前/阻塞状态结束后,分别调用者两个函数汇报状态,用于维护 active_thread_count 即活跃线程数量

tp_wait_begin 将活跃线程数 -1,活跃线程为 0 时,唤醒或创建一个线程

tp_wait_eng 将活跃线程数 +1

这里需要注意,不是所有的等待都需要调用 tp_wait_begin,预期内短时间结束的等待,比如 mutex,可以不调用 tp_wait_begin

行锁或者磁盘IO这种长时间的等待,需要调用 tp_wait_begin,汇报这个线程暂时无法处理请求,需要开启新的线程

线程池和连接池

线程池是在 server 端的优化,避免频繁的创建和销毁线程

连接池是在 client 端的优化,减少连接创建时间,节省资源

连接池和线程池是两个不同的概念,可以同时使用

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
目录
相关文章
|
Java 关系型数据库 MySQL
|
算法 Oracle 关系型数据库
|
3月前
|
关系型数据库 MySQL 数据库
阿里云数据库RDS费用价格:MySQL、SQL Server、PostgreSQL和MariaDB引擎收费标准
阿里云RDS数据库支持MySQL、SQL Server、PostgreSQL、MariaDB,多种引擎优惠上线!MySQL倚天版88元/年,SQL Server 2核4G仅299元/年,PostgreSQL 227元/年起。高可用、可弹性伸缩,安全稳定。详情见官网活动页。
803 152
|
NoSQL 关系型数据库 MySQL
阿里云RDS关系型数据库大全_MySQL版、PolarDB、PostgreSQL、SQL Server和MariaDB等
阿里云RDS关系型数据库如MySQL版、PolarDB、PostgreSQL、SQL Server和MariaDB等,NoSQL数据库如Redis、Tair、Lindorm和MongoDB
493 0
|
3月前
|
关系型数据库 分布式数据库 数据库
阿里云数据库收费价格:MySQL、PostgreSQL、SQL Server和MariaDB引擎费用整理
阿里云数据库提供多种类型,包括关系型与NoSQL,主流如PolarDB、RDS MySQL/PostgreSQL、Redis等。价格低至21元/月起,支持按需付费与优惠套餐,适用于各类应用场景。
|
3月前
|
关系型数据库 MySQL 数据库
阿里云数据库RDS支持MySQL、SQL Server、PostgreSQL和MariaDB引擎
阿里云数据库RDS支持MySQL、SQL Server、PostgreSQL和MariaDB引擎,提供高性价比、稳定安全的云数据库服务,适用于多种行业与业务场景。
|
关系型数据库 Java MySQL
Linux安装JDK1.8 & tomcat & MariaDB(MySQL删减版)
本教程提供了在Linux环境下安装JDK1.8、Tomcat和MariaDB的详细步骤。这三个组件的组合为Java Web开发和部署提供了一个强大的基础。通过遵循这些简单的指导步骤,您可以轻松建立起一个稳定、高效的开发和部署环境。希望这个指导对您的开发工作有所帮助。
470 8
|
缓存 关系型数据库 MySQL
error: Failed dependencies: mariadb-connector-c-config is obsoleted by mysql-community-server-8.0.36-1.el7.x86_64 问题解决
error: Failed dependencies: mariadb-connector-c-config is obsoleted by mysql-community-server-8.0.36-1.el7.x86_64 问题解决
976 19
|
SQL 关系型数据库 MySQL
如何在 MySQL 或 MariaDB 中导入和导出数据库
如何在 MySQL 或 MariaDB 中导入和导出数据库
1165 0
|
SQL Ubuntu 关系型数据库
如何在云服务器上创建和管理 MySQL 和 MariaDB 数据库
如何在云服务器上创建和管理 MySQL 和 MariaDB 数据库
244 0

推荐镜像

更多