nginx分布式锁以及accept锁简单整理

简介: nginx分布式锁以及accept锁简单整理

仅供个人学习整理,很多理解来自网络。

1:什么是锁,为什么需要锁?

当多个进程/线程需要共同操作一块共有资源时,如果不对这块资源加以保护,就会出现问题。

我的理解是,对共有资源加以保护,控制多个使用者对这块资源的访问机制,叫做锁。

2:临界资源的访问控制手段。

1:如过临界资源使用简单,可以相关原子操作函数。

2:加锁的方式: 自旋锁,互斥锁(条件变量 控制流程)

3:其他:读写锁,分布式锁等

3:初步了解nginx锁

1:nginx中的自旋锁 ngx_spinlock.c

void ngx_spinlock(ngx_atomic_t *lock, ngx_atomic_int_t value, ngx_uint_t spin)
{
#if (NGX_HAVE_ATOMIC_OPS)
    ngx_uint_t  i, n;
    for ( ;; ) {
    //直接加锁成功
        if (*lock == 0 && ngx_atomic_cmp_set(lock, 0, value)) {
            return;
        }
        if (ngx_ncpu > 1) {
            for (n = 1; n < spin; n <<= 1) {
                //加个遍历  控制cpu探测锁释放的时间
                for (i = 0; i < n; i++) {
                    ngx_cpu_pause();
                }
        //已经成功获得该锁,给加锁
                if (*lock == 0 && ngx_atomic_cmp_set(lock, 0, value)) {
                    return;
                }
            }
        }
        ngx_sched_yield();
    }
#else
#if (NGX_THREADS)
#error ngx_spinlock() or ngx_atomic_cmp_set() are not defined !
#endif
#endif
}
void ngx_spinlock(ngx_atomic_t *lock, ngx_atomic_int_t value, ngx_uint_t spin);
#define ngx_trylock(lock)  (*(lock) == 0 && ngx_atomic_cmp_set(lock, 0, 1))
#define ngx_unlock(lock)    *(lock) = 0
//spinlock的适用   加锁,然后操作,最后解锁
        ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);
        *ngx_thread_pool_done.last = task;
        ngx_thread_pool_done.last = &task->next;
        ngx_memory_barrier();
        ngx_unlock(&ngx_thread_pool_done_lock);

2:nginx中通过原子变量,信号量,文件fd加锁的方式实现一套互斥锁机制

nginx中互斥锁的文件:ngx_shmtx.h ngx_shmtx.c

1:锁的结构定义:

typedef struct {
    ngx_atomic_t   lock;
#if (NGX_HAVE_POSIX_SEM)
    ngx_atomic_t   wait;
#endif
} ngx_shmtx_sh_t;
//相关锁的定义  
//信号量和原子变量实现互斥锁
typedef struct {
#if (NGX_HAVE_ATOMIC_OPS)
    ngx_atomic_t  *lock;
#if (NGX_HAVE_POSIX_SEM)
    ngx_atomic_t  *wait;
    ngx_uint_t     semaphore;
    sem_t          sem;
#endif
#else
    ngx_fd_t       fd;
    u_char        *name;
#endif
    ngx_uint_t     spin;
} ngx_shmtx_t;

其实经过拆分细化,可以看出,这里想要的定义是这样的:

//原子锁 +信号
{
  ngx_atomic_t  *lock;
  ngx_atomic_t  *wait;
  ngx_uint_t     semaphore;
  sem_t          sem;
  ngx_uint_t     spin;
}
//原子锁 
{
  ngx_atomic_t  *lock;
  ngx_uint_t     spin;  //标识信号量还是原子的方式而已
}
//文件锁
{
  ngx_fd_t       fd;
    u_char        *name;
    ngx_uint_t     spin;
}

2: 分析nginx互斥锁的实现逻辑

1: 原子变量的操作,实现自旋锁,
{
  ngx_atomic_t  *lock;
  ngx_uint_t     spin;  //自旋锁访问cpu控制
}
//如果满足原子操作不满足信号量的环境,这就是一个自旋锁
ngx_int_t ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name)
{
    mtx->lock = &addr->lock;
    //没明白这个mtx->spin 为-1的场景,返回成功貌似没啥意义,
    if (mtx->spin == (ngx_uint_t) -1) {
        return NGX_OK;
    }
    mtx->spin = 2048;
  ...
}
//加锁函数 
void ngx_shmtx_lock(ngx_shmtx_t *mtx)
{
    ngx_uint_t         i, n;
    ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx lock");
    for ( ;; ) {
    //其实就是给这个原子变量赋值 参考spinlock
        if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) {
            return;
        }
        //通过定义的spin大小对cpu进行轮询,探测是否能得到锁
        if (ngx_ncpu > 1) {
            for (n = 1; n < mtx->spin; n <<= 1) {
                for (i = 0; i < n; i++) {
                    ngx_cpu_pause();
                }
                if (*mtx->lock == 0
                    && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid))
                {
                    return;
                }
            }
        }
    //强制cpu让出
        ngx_sched_yield();
    }
}
//解锁函数:
void ngx_shmtx_unlock(ngx_shmtx_t *mtx)
{
    if (mtx->spin != (ngx_uint_t) -1) {
        ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx unlock");
    }
  //这里的解锁 只需要给原子变量赋值为0 就可以
    if (ngx_atomic_cmp_set(mtx->lock, ngx_pid, 0)) {
        ngx_shmtx_wakeup(mtx);
    }
}
2:信号量配合原子变量实现的互斥锁。
1: 相关信号量的接口:
#include<semaphore.h>
int sem_init(sem_t *sem, int shared, unsigned int value);
int sem_destroy(sem_t *sem);
int sem_wait(sem_t *sem);
int sem_post(sem_t *sem);
//sem_init用于初始化,sem参数指向应用程序分配的sem_t变量,shared如果为0那么初始化的信号量是在同一个进程的各个线程间共享的,否则是在进程共享的,value是分配给信号量的初始值。
//sem_destroy则用于销毁信号量。
//sem_wait用于给信号量减1操作,当信号量小于等于0的时候阻塞,直到信号量大于0。
//sem_post则是用于给信号量做加1操作。
2: 相关结构及操作:
数据结构和构造与析构
//原子锁 +信号
typedef struct {
    ngx_atomic_t   lock;
    ngx_atomic_t   wait;
} ngx_shmtx_sh_t;    //初始化时用
{
  ngx_atomic_t  *lock;    //原子变量标识锁的获取和释放
  ngx_atomic_t  *wait;    //表示有几个线程在公用这个锁
  ngx_uint_t     semaphore; //标记信号量启动成功
  sem_t          sem;     //信号量
  ngx_uint_t     spin;      
}
//初始化锁结构以及相关的信号量  
ngx_int_t ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name)
{
    mtx->lock = &addr->lock;
    if (mtx->spin == (ngx_uint_t) -1) {
        return NGX_OK;
    }
    //cpu轮询遍历设置
    mtx->spin = 2048;
//#if (NGX_HAVE_POSIX_SEM)
    mtx->wait = &addr->wait;   //标记已经共享的线程个数
    if (sem_init(&mtx->sem, 1, 0) == -1) { //初始化信号量,标识在进程间共享
        ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,"sem_init() failed");
    } else {
        mtx->semaphore = 1;    //信号量创建成功的标志
    }
    return NGX_OK;
}
void ngx_shmtx_destroy(ngx_shmtx_t *mtx)
{
  //资源的释放,信号量的销毁
    if (mtx->semaphore) {
        if (sem_destroy(&mtx->sem) == -1) {
            ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, "sem_destroy() failed");
        }
    }
}
加锁和解锁操作:
//互斥锁加锁操作  要考虑多线程,加锁成功的场景,不成功的时候,等待信号触发
void ngx_shmtx_lock(ngx_shmtx_t *mtx)
{
    ngx_uint_t         i, n;
    ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx lock");
    for ( ;; ) {
    //先检查是否已经加锁,如果为0,表示没有加锁,进行加锁,第一个进入就会直接返回,加锁成功  
        //如果不为0,说明这是一个已经加锁的锁,由下文控制等待释放,获取锁,等待加锁成功
        if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) {
            return;
        }
        if (ngx_ncpu > 1) {
            //这是一个自旋锁的逻辑 
            //先用自旋的逻辑判断,如果在一定时间内不满足,则用信号的方式,如果成功,直接返回
            for (n = 1; n < mtx->spin; n <<= 1) {
        //设置cpu等待
                for (i = 0; i < n; i++) {
                    ngx_cpu_pause();
                }
        //锁已经释放并且成功加锁,就返回了
                if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid))
                {
                    return;
                }
            }
        }
#if (NGX_HAVE_POSIX_SEM)
    //真正互斥锁信号处理逻辑
        if (mtx->semaphore) {
            (void) ngx_atomic_fetch_add(mtx->wait, 1); //等待锁的个数+1 与下文的ngx_shmtx_wakeup配合使用
            //检查是否可以获得锁,可以获得,则加锁后返回
            if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) {
                (void) ngx_atomic_fetch_add(mtx->wait, -1);
                return;
            }
            ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx wait %uA", *mtx->wait);
      //等待信号的唤醒,,如果有唤醒,继续执行,唤醒多个的话,由上文的原子操作保证流程
            //相当于P操作 信号量等于0,则阻塞
            while (sem_wait(&mtx->sem) == -1) {
                ngx_err_t  err;
                err = ngx_errno;
                if (err != NGX_EINTR) {
                    ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, err, "sem_wait() failed while waiting on shmtx");
                    break;
                }
            }
            ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx awoke");
            continue;
        }
#endif
        ngx_sched_yield(); //强制让出cpu
    }
}
//释放锁的逻辑: 给锁的标识原子变量lock赋值0
//信号的操作
void ngx_shmtx_unlock(ngx_shmtx_t *mtx)
{
    if (mtx->spin != (ngx_uint_t) -1) {
        ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx unlock");
    }
    if (ngx_atomic_cmp_set(mtx->lock, ngx_pid, 0)) {
        ngx_shmtx_wakeup(mtx);
    }
}
//判断是否还有等待锁的进程,如果有,还需要发送信号
static void ngx_shmtx_wakeup(ngx_shmtx_t *mtx)
{
#if (NGX_HAVE_POSIX_SEM)
    ngx_atomic_uint_t  wait;
    if (!mtx->semaphore) {
        return;
    }
    for ( ;; ) {
        wait = *mtx->wait;
        //如果没有等待的线程,说明不用唤醒了,直接运行结束
        if ((ngx_atomic_int_t) wait <= 0) {
            return;
        }
        //等待信号的线程数减少一个,
        if (ngx_atomic_cmp_set(mtx->wait, wait, wait - 1)) {
            break;
        }
    }
    ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx wake %uA", wait);
    if (sem_post(&mtx->sem) == -1) { //释放信号量 给信号量加1,相当于V操作
        ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,"sem_post() failed while wake shmtx");
    }
#endif
}
3:当不满足原子变量的环境场景下,使用文件锁实现互斥锁。
//文件锁
{
  ngx_fd_t       fd;    //文件fd的标识
    u_char        *name;  //文件名的标识
    ngx_uint_t     spin;
}
//文件锁的创建 
ngx_int_t ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name)
{
    //对入参作相关的校验,并重新初始化
    if (mtx->name) {
        if (ngx_strcmp(name, mtx->name) == 0) {
            mtx->name = name;
            return NGX_OK;
        }
        ngx_shmtx_destroy(mtx);
    }
  //调用底层open 可读可写可创建的方式打开该文件
    mtx->fd = ngx_open_file(name, NGX_FILE_RDWR, NGX_FILE_CREATE_OR_OPEN, NGX_FILE_DEFAULT_ACCESS);
  //如果文件打开失败
    if (mtx->fd == NGX_INVALID_FILE) {
        ngx_log_error(NGX_LOG_EMERG, ngx_cycle->log, ngx_errno, ngx_open_file_n " \"%s\" failed", name);
        return NGX_ERROR;
    }
    //unlink文件,还没有调用close,所以这个文件还可以用 ==》
    //unlink函数删除文件,并且减少一个链接数。如果链接数达到0并且没有任何进程打开该文件,该文件内容才被真正删除
    //如果在unlilnk之前没有close,那么依旧可以访问文件内容。所以只是unlink了文件,文件的链接数为0,但是进程与文件还有访问关系,所以文件并没有被删除
    //在调用close时,内核会检查打开该文件的进程数,如果此数为0,进一步检查文件的链接数,如果这个数也为0,那么就删除文件内容。
    if (ngx_delete_file(name) == NGX_FILE_ERROR) {
        ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, ngx_delete_file_n " \"%s\" failed", name);
    }
    mtx->name = name;
    return NGX_OK;
}
//这里才会调用文件的close,配合上面的unlink,检测真正删除文件
void ngx_shmtx_destroy(ngx_shmtx_t *mtx)
{
    if (ngx_close_file(mtx->fd) == NGX_FILE_ERROR) {
        ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, ngx_close_file_n " \"%s\" failed", mtx->name);
    }
}
文件锁的加锁,以及解锁操作:struct flock 结构和 fcntl()

这里涉及到struct flock 结构和 fcntl()函数

struct flock {
    short l_type;/*F_RDLCK, F_WRLCK, or F_UNLCK */
    off_t l_start;/*offset in bytes, relative to l_whence */
    short l_whence;/*SEEK_SET, SEEK_CUR, or SEEK_END */
    off_t l_len;/*length, in bytes; 0 means lock to EOF */
    pid_t l_pid;/*returned with F_GETLK */
};
//第一个成员是加锁的类型:只读锁,读写锁,或是解锁
//l_start和l_whence用来指明加锁部分的开始位置,
//l_len是加锁的长度,
//l_pid是加锁进程的进程id
//使用 fcntl 操作文件描述词的一些特性
#include <unistd.h>    
#include <fcntl.h>
int fcntl(int fd, int cmd);
int fcntl(int fd, int cmd, long arg);
int fcntl(int fd, int cmd, struct flock * lock);
//cmd有如下:
//F_DUPFD 用来查找大于或等于参数arg 的最小且仍未使用的文件描述词, 并且复制参数fd 的文件描述词. 执行成功则返回新复制的文件描述词. 请参考dup2(). //F_GETFD 取得close-on-exec 旗标. 若此旗标的FD_CLOEXEC 位为0, 代表在调用exec()相关函数时文件将不会关闭.
//F_SETFD 设置close-on-exec 旗标. 该旗标以参数arg 的FD_CLOEXEC 位决定.
//F_GETFL 取得文件描述词状态旗标, 此旗标为open()的参数flags.
//F_SETFL 设置文件描述词状态旗标, 参数arg 为新旗标, 但只允许O_APPEND、O_NONBLOCK 和O_ASYNC 位的改变, 其他位的改变将不受影响.
//F_GETLK 取得文件锁定的状态.
//F_SETLK 设置文件锁定的状态. 此时flcok 结构的l_type 值必须是F_RDLCK、F_WRLCK 或F_UNLCK. 如果无法建立锁定, 则返回-1, 错误代码为EACCES 或EAGAIN.
//F_SETLKW 同F_SETLK 作用相同, 但是无法建立锁定时, 此调用会一直等到锁定动作成功为止. 若在等待锁定的过程中被信号中断时, 会立即返回-1, 错误代码为EINTR.
//通过fcntl实现文件锁的相关封装,立即返回
ngx_err_t ngx_trylock_fd(ngx_fd_t fd)
{
    struct flock  fl;
    ngx_memzero(&fl, sizeof(struct flock));
    fl.l_type = F_WRLCK;
    fl.l_whence = SEEK_SET; //SEEK_SET 文件的开头 SEEK_CUR: 当前位置 SEEK_END: 文件结尾
  //默认文件描述符是阻塞的,不成功就会一直等待,
    //F_SETLK 设置文件锁定的状态. 如果无法建立锁定, 则返回-1, 错误代码为EACCES 或EAGAIN.
    //此时flcok 结构的l_type 值必须是F_RDLCK、F_WRLCK 或F_UNLCK. 
    if (fcntl(fd, F_SETLK, &fl) == -1) {
        return ngx_errno;
    }
    return 0;
}
//给文件fd加锁,直到成功
ngx_err_t ngx_lock_fd(ngx_fd_t fd)
{
    struct flock  fl;
    ngx_memzero(&fl, sizeof(struct flock));
    fl.l_type = F_WRLCK;
    fl.l_whence = SEEK_SET;
  //F_SETLKW 同F_SETLK 作用相同, 但是无法建立锁定时, 此调用会一直等到锁定动作成功为止. 若在等待锁定的过程中被信号中断时, 会立即返回-1, 错误代码为EINTR.
    if (fcntl(fd, F_SETLKW, &fl) == -1) {
        return ngx_errno;
    }
    return 0;
}
ngx_err_t ngx_unlock_fd(ngx_fd_t fd)
{
    struct flock  fl;
    ngx_memzero(&fl, sizeof(struct flock));
    fl.l_type = F_UNLCK;
    fl.l_whence = SEEK_SET;
    //设置解锁
    if (fcntl(fd, F_SETLK, &fl) == -1) {
        return  ngx_errno;
    }
    return 0;
}

4:nginx accept锁

nginx是一个多进程服务器,当多个进程同时监听一个端口时,如果有一个外部连接进来,就会触发多个进程共同唤醒,但是实际处理只能有一个进程正常处理accept事件,这就是所谓的惊群。

其实在Linux2.6版本以后,内核内核已经解决了accept()函数的“惊群”问题,大概的处理方式就是,当内核接收到一个客户连接后,只会唤醒等待队列上的第一个进程或线程。所以,如果服务器采用accept阻塞调用方式,在最新的Linux系统上,已经没有“惊群”的问题了。

nginx处理惊群问题:

我们先大概梳理一下 Nginx 的网络架构,几个关键步骤为:

  1. Nginx 主进程解析配置文件,根据 listen 指令,将监听套接字初始化到全局变量 ngx_cycle 的 listening 数组之中。此时,监听套接字的创建、绑定工作早已完成。
  2. Nginx 主进程 fork 出多个子进程。
  3. 每个子进程在 ngx_worker_process_init 方法里依次调用各个 Nginx 模块的 init_process 钩子,其中当然也包括 NGX_EVENT_MODULE 类型的 ngx_event_core_module 模块,其 init_process 钩子为 ngx_event_process_init。
  4. ngx_event_process_init 函数会初始化 Nginx 内部的连接池,并把 ngx_cycle 里的监听套接字数组通过连接池来获得相应的表示连接的 ngx_connection_t 数据结构,这里关于 Nginx 的连接池先略过。我们主要看 ngx_event_process_init 函数所做的另一个工作:如果在配置文件里没有开启accept_mutex锁,就通过 ngx_add_event 将所有的监听套接字添加到 epoll 中。
  5. 每一个 Nginx 子进程在执行完 ngx_worker_process_init 后,会在一个死循环中执行 ngx_process_events_and_timers,这就进入到时间处理的核心逻辑了。
  6. 在 ngx_process_events_and_timers 中,如果在配置文件里开启了 accept_mutex 锁,子进程就会去获取 accept_mutex 锁。如果获取成功,则通过 ngx_enable_accept_events 将监听套接字添加到 epoll 中,否则,不会将监听套接字添加到 epoll 中,甚至有可能会调用 ngx_disable_accept_events 将监听套接字从 epoll 中删除(如果在之前的连接中,本worker子进程已经获得过accept_mutex锁)。
  7. ngx_process_events_and_timers 继续调用 ngx_process_events,在这个函数里面阻塞调用 epoll_wait。
    如果配置文件中没有开启 accept_mutex,则所有的监听套接字不管三七二十一,都加入到 epoll中,这样当一个新的连接来到时,所有的 worker 子进程都会惊醒。
    如果配置文件中开启了 accept_mutex,则只有一个子进程会将监听套接字添加到 epoll 中,这样当一个新的连接来到时,当然就只有一个 worker 子进程会被唤醒了。

源码分析:

网络事件入口,只有在accept的时候加入,开始监听这一个。

//每个worker进程都会死循环执行的时间处理循环函数 
void ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
  ....
  //如果定义了accept锁
    if (ngx_use_accept_mutex) {
        //ngx_accept_disabled = nginx单进程的所有连接总数 / 8 -空闲连接数量,当ngx_accept_disabled大于0时,不会去尝试获取accept_mutex锁,ngx_accept_disable越大,于是让出的机会就越多,这样其它进程获取锁的机会也就越大。不
        //ngx_accept_disabled  为正数时,触发负载均衡,不再获取accept锁,但是会-1,说明运行时间负载降低
        if (ngx_accept_disabled > 0) {
            ngx_accept_disabled--;
        } else {
            //尝试获取accept锁 这个是重点
            //没有获取到 直接返回,负责加入一个NGX_READ_EVENT事件
            if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
                return;
            }
      //获取成功 设置flag
            if (ngx_accept_mutex_held) {
               // 如果进程获得了锁,将添加一个 NGX_POST_EVENTS 标志。 
               // 这个标志的作用是将所有产生的事件放入一个队列中,等释放后,在慢慢来处理事件。 
               // 因为,处理时间可能会很耗时,如果不先施放锁再处理的话,该进程就长时间霸占了锁, 
               // 导致其他进程无法获取锁,这样accept的效率就低了。 
                flags |= NGX_POST_EVENTS;
            } else {//设置失败的场景 
                if (timer == NGX_TIMER_INFINITE || timer > ngx_accept_mutex_delay)
                {
                    //没有获得所得进程,当然不需要NGX_POST_EVENTS标志。 
                  //但需要设置延时多长时间,再去争抢锁。
                    timer = ngx_accept_mutex_delay;
                }
            }
        }
    }
    delta = ngx_current_msec;
  //所有事件处理的入口 :epoll要开始wait事件, 
    //ngx_process_events的具体实现是对应到epoll模块中的ngx_epoll_process_events函数
    //只是post到队列中
    (void) ngx_process_events(cycle, timer, flags);
  //统计本次wait事件的耗时 
    delta = ngx_current_msec - delta;
    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,"timer delta: %M", delta);
  // ngx_posted_accept_events是一个事件队列,暂存epoll从监听套接口wait到的accept事件。 
    //前文提到的NGX_POST_EVENTS标志被使用后,会将所有的accept事件暂存到这个队列
    ngx_event_process_posted(cycle, &ngx_posted_accept_events); //accept 延迟事件队列 处理对应的handler 其实就是ngx_event_accept函数
  //所有accept事件处理完之后,如果持有锁的话,就释放掉
    if (ngx_accept_mutex_held) {
        ngx_shmtx_unlock(&ngx_accept_mutex);
    }
    if (delta) {
        ngx_event_expire_timers();
    }
  //处理普通事件(连接上获得的读写事件), 因为每个事件都有自己的handler方法
    ngx_event_process_posted(cycle, &ngx_posted_events);//普通延迟事件队列 处理对应的handler
}
//尝试获取accept锁
ngx_int_t ngx_trylock_accept_mutex(ngx_cycle_t *cycle)
{
    //尝试加锁
    if (ngx_shmtx_trylock(&ngx_accept_mutex)) {
        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,"accept mutex locked");
        if (ngx_accept_mutex_held && ngx_accept_events == 0) {
            return NGX_OK;
        }
    //加入事件中
        if (ngx_enable_accept_events(cycle) == NGX_ERROR) {
            ngx_shmtx_unlock(&ngx_accept_mutex);
            return NGX_ERROR;
        }
        ngx_accept_events = 0;
        ngx_accept_mutex_held = 1;
        return NGX_OK;
    }
    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "accept mutex lock failed: %ui", ngx_accept_mutex_held);
  //获取锁失败  将当前子进程中已经处理结束的相关事件移除 
    if (ngx_accept_mutex_held) {
        if (ngx_disable_accept_events(cycle, 0) == NGX_ERROR) {
            return NGX_ERROR;
        }
        ngx_accept_mutex_held = 0;
    }
    return NGX_OK;
}

了解到一些nginx锁相关的知识,简单做一下汇总,遗留的部分不清晰的源码问题,后续更新吧

目录
相关文章
|
8月前
|
NoSQL 算法 安全
Redlock 算法-主从redis分布式锁主节点宕机锁丢失的问题
Redlock 算法-主从redis分布式锁主节点宕机锁丢失的问题
318 0
|
22天前
|
NoSQL Java Redis
秒杀抢购场景下实战JVM级别锁与分布式锁
在电商系统中,秒杀抢购活动是一种常见的营销手段。它通过设定极低的价格和有限的商品数量,吸引大量用户在特定时间点抢购,从而迅速增加销量、提升品牌曝光度和用户活跃度。然而,这种活动也对系统的性能和稳定性提出了极高的要求。特别是在秒杀开始的瞬间,系统需要处理海量的并发请求,同时确保数据的准确性和一致性。 为了解决这些问题,系统开发者们引入了锁机制。锁机制是一种用于控制对共享资源的并发访问的技术,它能够确保在同一时间只有一个进程或线程能够操作某个资源,从而避免数据不一致或冲突。在秒杀抢购场景下,锁机制显得尤为重要,它能够保证商品库存的扣减操作是原子性的,避免出现超卖或数据不一致的情况。
51 10
|
2月前
|
存储 运维 NoSQL
分布式读写锁的奥义:上古世代 ZooKeeper 的进击
本文作者将介绍女娲对社区 ZooKeeper 在分布式读写锁实践细节上的思考,希望帮助大家理解分布式读写锁背后的原理。
|
7月前
|
消息中间件 NoSQL Java
Redis系列学习文章分享---第六篇(Redis实战篇--Redis分布式锁+实现思路+误删问题+原子性+lua脚本+Redisson功能介绍+可重入锁+WatchDog机制+multiLock)
Redis系列学习文章分享---第六篇(Redis实战篇--Redis分布式锁+实现思路+误删问题+原子性+lua脚本+Redisson功能介绍+可重入锁+WatchDog机制+multiLock)
258 0
|
5月前
|
缓存 NoSQL Java
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解、如何添加锁解决缓存击穿问题?分布式情况下如何添加分布式锁
这篇文章介绍了如何在SpringBoot项目中整合Redis,并探讨了缓存穿透、缓存雪崩和缓存击穿的问题以及解决方法。文章还提供了解决缓存击穿问题的加锁示例代码,包括存在问题和问题解决后的版本,并指出了本地锁在分布式情况下的局限性,引出了分布式锁的概念。
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解、如何添加锁解决缓存击穿问题?分布式情况下如何添加分布式锁
|
8月前
|
缓存 NoSQL Java
分布式项目中锁的应用(本地锁-_redis【setnx】-_redisson-_springcache)-fen-bu-shi-xiang-mu-zhong-suo-de-ying-yong--ben-de-suo--redissetnx-springcache-redisson(一)
分布式项目中锁的应用(本地锁-_redis【setnx】-_redisson-_springcache)-fen-bu-shi-xiang-mu-zhong-suo-de-ying-yong--ben-de-suo--redissetnx-springcache-redisson
99 0
|
6月前
|
负载均衡 NoSQL Java
|
5月前
|
算法
分布式锁设计问题之重建节点锁信息时要分为多个阶段如何解决
分布式锁设计问题之重建节点锁信息时要分为多个阶段如何解决
|
5月前
分布式锁设计问题之节点A向节点C发起对R1的加锁请求如何解决
分布式锁设计问题之节点A向节点C发起对R1的加锁请求如何解决
|
5月前
|
NoSQL Go API
[go 面试] 为并发加锁:保障数据一致性(分布式锁)
[go 面试] 为并发加锁:保障数据一致性(分布式锁)