前言
上周出现了几次连接超时、连接池满还有dbc连接事务模板失败的问题。所以有必要深入了解下MySQL的连接过程。
正好,上周研究了怎么用Clion调试MySQL源码,接下来通过调试来研究一下吧。
服务端
启动
sql/main.cc
extern int mysqld_main(int argc, char **argv);
int main(int argc, char **argv) { return mysqld_main(argc, argv); }
- main:入口文件,仅调用了mysqld_main函数
sql/mysqld.cc
int mysqld_main(int argc, char **argv)
#endif
{
if (my_init()) // init my_sys library & pthreads
{
...
}
...
if (load_defaults(MYSQL_CONFIG_NAME, load_default_groups, &argc, &argv,
&argv_alloc)) {
...
}
mysqld_socket_acceptor->connection_event_loop();
mysqld_exit(signal_hand_thr_exit_code);
}
- mysql_main:MySQL服务端启动逻辑的主要处理函数
- my_init:系统库和线程初始化
- load_defaults:加载my.cnf各参数
- connection_event_loop:循环监听套接字。
sql/conn_handler/connection_acceptor.h
/**
Connection acceptor loop to accept connections from clients.
*/
void connection_event_loop() {
Connection_handler_manager *mgr =
Connection_handler_manager::get_instance();
while (!connection_events_loop_aborted()) {
Channel_info *channel_info = m_listener->listen_for_connection_event();
if (channel_info != NULL) mgr->process_new_connection(channel_info);
}
}
- connection_event_loop:通过socket_connection.cc::listen_for_connection_event循环监听,直到有新的连接,开始connection_handler_manager.cc::process_new_connection新连接的处理过程。
新连接
服务端一直处于监听状态,当有新连接请求时,调用process_new_connection处理新连接。
sql/conn_handler/connection_handler_manager.cc
void Connection_handler_manager::process_new_connection(
Channel_info *channel_info) {
if (connection_events_loop_aborted() ||
!check_and_incr_conn_count(channel_info->is_admin_connection())) {
channel_info->send_error_and_close_channel(ER_CON_COUNT_ERROR, 0, true);
delete channel_info;
return;
}
if (m_connection_handler->add_connection(channel_info)) {
inc_aborted_connects();
delete channel_info;
}
}
- connection_events_loop_aborted:先判断是否已取消监听
- check_and_incr_conn_count:再判断(会加锁)是否现有连接数是否大于连接最大值(连接池满),未满,则将线程数加一,满了则拒绝连接。(注意,这里的判断逻辑使MySQL的实际最大连接数是max_connections + 1)
- add_connection:调用add_connection添加连接
sql/conn_handler/connection_handler_pre_thread.cc
bool Per_thread_connection_handler::add_connection(Channel_info *channel_info) {
int error = 0;
my_thread_handle id;
DBUG_ENTER("Per_thread_connection_handler::add_connection");
// Simulate thread creation for test case before we check thread cache
DBUG_EXECUTE_IF("fail_thread_create", error = 1; goto handle_error;);
if (!check_idle_thread_and_enqueue_connection(channel_info))
DBUG_RETURN(false);
/*
There are no idle threads avaliable to take up the new
connection. Create a new thread to handle the connection
*/
channel_info->set_prior_thr_create_utime();
error =
mysql_thread_create(key_thread_one_connection, &id, &connection_attrib,
handle_connection, (void *)channel_info);
#ifndef DBUG_OFF
handle_error:
#endif // !DBUG_OFF
if (error) {
...
//错误处理,略
}
Global_THD_manager::get_instance()->inc_thread_created();
DBUG_PRINT("info", ("Thread created"));
DBUG_RETURN(false);
}
- 调用check_idle_thread_and_enqueue_connection查看是否有空闲的线程,有则将本次连接信息加入等待队列,并给空闲线程发送唤醒信号;否则新建线程处理本次连接
- 在新线程中,调用handle_connection函数开始进行逻辑处理。
static void *handle_connection(void *arg) {
Global_THD_manager *thd_manager = Global_THD_manager::get_instance();
Connection_handler_manager *handler_manager =
Connection_handler_manager::get_instance();
Channel_info *channel_info = static_cast<Channel_info *>(arg);
bool pthread_reused MY_ATTRIBUTE((unused)) = false;
if (my_thread_init()) {
...
//错误处理,略
}
for (;;) {
THD *thd = init_new_thd(channel_info);
if (thd == NULL) {
...
//错误处理,略
}
#ifdef HAVE_PSI_THREAD_INTERFACE
if (pthread_reused) {
...
//错误处理,略
}
#endif
#ifdef HAVE_PSI_THREAD_INTERFACE
/* Find the instrumented thread */
PSI_thread *psi = PSI_THREAD_CALL(get_thread)();
/* Save it within THD, so it can be inspected */
thd->set_psi(psi);
#endif /* HAVE_PSI_THREAD_INTERFACE */
mysql_thread_set_psi_id(thd->thread_id());
mysql_thread_set_psi_THD(thd);
mysql_socket_set_thread_owner(
thd->get_protocol_classic()->get_vio()->mysql_socket);
thd_manager->add_thd(thd);
if (thd_prepare_connection(thd))
handler_manager->inc_aborted_connects();
else {
while (thd_connection_alive(thd)) {
if (do_command(thd)) break;
}
end_connection(thd);
}
close_connection(thd, 0, false, false);
thd->get_stmt_da()->reset_diagnostics_area();
thd->release_resources();
// Clean up errors now, before possibly waiting for a new connection.
#ifndef HAVE_WOLFSSL
#if OPENSSL_VERSION_NUMBER < 0x10100000L
ERR_remove_thread_state(0);
#endif /* OPENSSL_VERSION_NUMBER < 0x10100000L */
#endif
thd_manager->remove_thd(thd);
Connection_handler_manager::dec_connection_count();
#ifdef HAVE_PSI_THREAD_INTERFACE
/*
Delete the instrumentation for the job that just completed.
*/
thd->set_psi(NULL);
PSI_THREAD_CALL(delete_current_thread)();
#endif /* HAVE_PSI_THREAD_INTERFACE */
delete thd;
// Server is shutting down so end the pthread.
if (connection_events_loop_aborted()) break;
channel_info = Per_thread_connection_handler::block_until_new_connection();
if (channel_info == NULL) break;
pthread_reused = true;
if (connection_events_loop_aborted()) {
...
//错误处理,略
}
}
my_thread_end();
my_thread_exit(0);
return NULL;
}
- 会对连接进行thd_prepare_connection预处理操作,没问题后继续下面的逻辑。
- 当连接未被关闭,就会一直do_command处理请求。
- 当连接关闭,则走下面关闭逻辑
执行
sql/sql_parse.cc
bool do_command(THD *thd) {
bool return_value;
int rc;
NET *net = NULL;
enum enum_server_command command;
COM_DATA com_data;
DBUG_ENTER("do_command");
DBUG_ASSERT(thd->is_classic_protocol());
/*
indicator of uninitialized lex => normal flow of errors handling
(see my_message_sql)
*/
thd->lex->set_current_select(0);
/*
XXX: this code is here only to clear possible errors of init_connect.
Consider moving to prepare_new_connection_state() instead.
That requires making sure the DA is cleared before non-parsing statements
such as COM_QUIT.
*/
thd->clear_error(); // Clear error message
thd->get_stmt_da()->reset_diagnostics_area();
/*
This thread will do a blocking read from the client which
will be interrupted when the next command is received from
the client, the connection is closed or "net_wait_timeout"
number of seconds has passed.
*/
net = thd->get_protocol_classic()->get_net();
my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
net_new_transaction(net);
/*
Synchronization point for testing of KILL_CONNECTION.
This sync point can wait here, to simulate slow code execution
between the last test of thd->killed and blocking in read().
The goal of this test is to verify that a connection does not
hang, if it is killed at this point of execution.
(Bug#37780 - main.kill fails randomly)
Note that the sync point wait itself will be terminated by a
kill. In this case it consumes a condition broadcast, but does
not change anything else. The consumed broadcast should not
matter here, because the read/recv() below doesn't use it.
*/
DEBUG_SYNC(thd, "before_do_command_net_read");
/*
Because of networking layer callbacks in place,
this call will maintain the following instrumentation:
- IDLE events
- SOCKET events
- STATEMENT events
- STAGE events
when reading a new network packet.
In particular, a new instrumented statement is started.
See init_net_server_extension()
*/
thd->m_server_idle = true;
rc = thd->get_protocol()->get_command(&com_data, &command);
thd->m_server_idle = false;
if (rc) {
...
//错误处理,略
}
char desc[VIO_DESCRIPTION_SIZE];
vio_description(net->vio, desc);
DBUG_PRINT("info", ("Command on %s = %d (%s)", desc, command,
command_name[command].str));
DBUG_PRINT("info", ("packet: '%*.s'; command: %d",
thd->get_protocol_classic()->get_packet_length(),
thd->get_protocol_classic()->get_raw_packet(), command));
if (thd->get_protocol_classic()->bad_packet)
DBUG_ASSERT(0); // Should be caught earlier
// Reclaim some memory
thd->get_protocol_classic()->get_output_packet()->shrink(
thd->variables.net_buffer_length);
/* Restore read timeout value */
my_net_set_read_timeout(net, thd->variables.net_read_timeout);
return_value = dispatch_command(thd, &com_data, command);
thd->get_protocol_classic()->get_output_packet()->shrink(
thd->variables.net_buffer_length);
out:
/* The statement instrumentation must be closed in all cases. */
DBUG_ASSERT(thd->m_digest == NULL);
DBUG_ASSERT(thd->m_statement_psi == NULL);
DBUG_RETURN(return_value);
}
- 主要的处理逻辑为dispatch_command,根据不同的command类型进行分发。
/**
Perform one connection-level (COM_XXXX) command.
@param thd connection handle
@param command type of command to perform
@param com_data com_data union to store the generated command
@todo
set thd->lex->sql_command to SQLCOM_END here.
@todo
The following has to be changed to an 8 byte integer
@retval
0 ok
@retval
1 request of thread shutdown, i. e. if command is
COM_QUIT
*/
bool dispatch_command(THD *thd, const COM_DATA *com_data,
enum enum_server_command command) {
... //太长不看
switch (command) {
case ... //太长不看
case COM_QUERY: {
... //太长不看
mysql_parse(thd, &parser_state);
... //太长不看
DBUG_PRINT("info", ("query ready"));
break;
}
case ... //太长不看
default:
my_error(ER_UNKNOWN_COM_ERROR, MYF(0));
break;
}
}
- 主要看COM_QUERY这个逻辑,我们要用到的DDL、DML都会走这个流程,这个流程中主要是调用mysql_parse方法
/**
Parse a query.
@param thd Current session.
@param parser_state Parser state.
*/
void mysql_parse(THD *thd, Parser_state *parser_state) {
... //太长不看
mysql_reset_thd_for_next_command(thd);
if (!err) {
err = parse_sql(thd, parser_state, NULL);
... //太长不看
}
if (!err) {
mysql_rewrite_query(thd);
... //太长不看
}
if (!err) {
...
error = mysql_execute_command(thd, true);
...
}
}
- 主要是SQL语法解析和执行
- mysql_reset_thd_for_next_command是对下一次执行做准备,重置线程各变量
- mysql_rewrite_query看着像是SQL优化?待定 还没追进去,记个TODO
- 词法解析前不应该有缓存吗?没有找到缓存的逻辑,记个TODO
关闭连接
sql/conn_handler/connection_handler_pre_thread.cc
Channel_info *Per_thread_connection_handler::block_until_new_connection() {
Channel_info *new_conn = NULL;
mysql_mutex_lock(&LOCK_thread_cache);
if (blocked_pthread_count < max_blocked_pthreads && !shrink_cache) {
/* Don't kill the pthread, just block it for reuse */
DBUG_PRINT("info", ("Blocking pthread for reuse"));
/*
mysys_var is bound to the physical thread,
so make sure mysys_var->dbug is reset to a clean state
before picking another session in the thread cache.
*/
DBUG_POP();
DBUG_ASSERT(!_db_is_pushed_());
// Block pthread
blocked_pthread_count++;
while (!connection_events_loop_aborted() && !wake_pthread && !shrink_cache)
mysql_cond_wait(&COND_thread_cache, &LOCK_thread_cache);
blocked_pthread_count--;
if (shrink_cache && blocked_pthread_count <= max_blocked_pthreads) {
mysql_cond_signal(&COND_flush_thread_cache);
}
if (wake_pthread) {
wake_pthread--;
if (!waiting_channel_info_list->empty()) {
new_conn = waiting_channel_info_list->front();
waiting_channel_info_list->pop_front();
DBUG_PRINT("info", ("waiting_channel_info_list->pop %p", new_conn));
} else {
DBUG_ASSERT(0); // We should not get here.
}
}
}
mysql_mutex_unlock(&LOCK_thread_cache);
return new_conn;
}
- 如果阻塞的线程数小于最大阻塞线程数,则此线程不回收,而是进入阻塞状态(等待),等待新连接来的时候重复使用。
- 否则关闭线程。