Nginx学习笔记之事件驱动框架处理流程
ngx_event_core_module模块的ngx_event_process_init方法对事件模块做了一些初始化。其中包括将“请求连接”这样一个读事件对应的处理方法(handler)设置为ngx_event_accept函数,并将此事件添加到epoll模块中。当有新连接事件发生时,ngx_event_accept就会被调用。大致流程是这样:
worker进程在ngx_worker_process_cycle方法中不断循环调用ngx_process_events_and_timers函数处理事件,这个函数是事件处理的总入口。
ngx_process_events_and_timers会调用ngx_process_events,这是一个宏,相当于ngx_event_actions.process_events,ngx_event_actions是个全局的结构体,存储了对应事件驱动模块(这里是epoll模块)的10个函数接口。所以这里就是调用了ngx_epoll_module_ctx.actions.process_events函数,也就是ngx_epoll_process_events函数来处理事件。
ngx_epoll_process_events调用Linux函数接口epoll_wait获得“有新连接”这个事件,然后调用这个事件的handler处理函数来对这个事件进行处理。
在上面已经说过handler已经被设置成了ngx_event_accept函数,所以就调用ngx_event_accept进行实际的处理。
下面分析ngx_event_accept方法,它的流程图如下所示:
经过精简的代码如下,注释中的序号对应上图的序号:
void ngx_event_accept(ngx_event_t*ev) { socklen_tsocklen; ngx_err_terr; ngx_log_t*log; ngx_uint_tlevel; ngx_socket_ts; ngx_event_t*rev,*wev; ngx_listening_t*ls; ngx_connection_t*c,*lc; ngx_event_conf_t*ecf; u_charsa[NGX_SOCKADDRLEN]; if(ev->timedout){ if(ngx_enable_accept_events((ngx_cycle_t*)ngx_cycle)!=NGX_OK){ return; } ev->timedout=0; } ecf=ngx_event_get_conf(ngx_cycle->conf_ctx,ngx_event_core_module); if(ngx_event_flags&NGX_USE_RTSIG_EVENT){ ev->available=1; }elseif(!(ngx_event_flags&NGX_USE_KQUEUE_EVENT)){ ev->available=ecf->multi_accept; } lc=ev->data; ls=lc->listening; ev->ready=0; do{ socklen=NGX_SOCKADDRLEN; /*1、accept方法试图建立连接,非阻塞调用*/ s=accept(lc->fd,(structsockaddr*)sa,&socklen); if(s==(ngx_socket_t)-1) { err=ngx_socket_errno; if(err==NGX_EAGAIN) { /*没有连接,直接返回*/ return; } level=NGX_LOG_ALERT; if(err==NGX_ECONNABORTED){ level=NGX_LOG_ERR; }elseif(err==NGX_EMFILE||err==NGX_ENFILE){ level=NGX_LOG_CRIT; } if(err==NGX_ECONNABORTED){ if(ngx_event_flags&NGX_USE_KQUEUE_EVENT){ ev->available--; } if(ev->available){ continue; } } if(err==NGX_EMFILE||err==NGX_ENFILE){ if(ngx_disable_accept_events((ngx_cycle_t*)ngx_cycle) !=NGX_OK) { return; } if(ngx_use_accept_mutex){ if(ngx_accept_mutex_held){ ngx_shmtx_unlock(&ngx_accept_mutex); ngx_accept_mutex_held=0; } ngx_accept_disabled=1; }else{ ngx_add_timer(ev,ecf->accept_mutex_delay); } } return; } /*2、设置负载均衡阈值*/ ngx_accept_disabled=ngx_cycle->connection_n/8 -ngx_cycle->free_connection_n; /*3、从连接池获得一个连接对象*/ c=ngx_get_connection(s,ev->log); /*4、为连接创建内存池*/ c->pool=ngx_create_pool(ls->pool_size,ev->log); c->sockaddr=ngx_palloc(c->pool,socklen); ngx_memcpy(c->sockaddr,sa,socklen); log=ngx_palloc(c->pool,sizeof(ngx_log_t)); /*setablockingmodeforaioandnon-blockingmodeforothers*/ /*5、设置套接字属性为阻塞或非阻塞*/ if(ngx_inherited_nonblocking){ if(ngx_event_flags&NGX_USE_AIO_EVENT){ if(ngx_blocking(s)==-1){ ngx_log_error(NGX_LOG_ALERT,ev->log,ngx_socket_errno, ngx_blocking_n"failed"); ngx_close_accepted_connection(c); return; } } }else{ if(!(ngx_event_flags&(NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT))){ if(ngx_nonblocking(s)==-1){ ngx_log_error(NGX_LOG_ALERT,ev->log,ngx_socket_errno, ngx_nonblocking_n"failed"); ngx_close_accepted_connection(c); return; } } } *log=ls->log; c->recv=ngx_recv; c->send=ngx_send; c->recv_chain=ngx_recv_chain; c->send_chain=ngx_send_chain; c->log=log; c->pool->log=log; c->socklen=socklen; c->listening=ls; c->local_sockaddr=ls->sockaddr; c->local_socklen=ls->socklen; c->unexpected_eof=1; rev=c->read; wev=c->write; wev->ready=1; if(ngx_event_flags&(NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT)){ /*rtsig,aio,iocp*/ rev->ready=1; } if(ev->deferred_accept){ rev->ready=1; } rev->log=log; wev->log=log; /* *TODO:MT:-ngx_atomic_fetch_add() *orprotectionbycriticalsectionorlightmutex * *TODO:MP:-allocatedinasharedmemory *-ngx_atomic_fetch_add() *orprotectionbycriticalsectionorlightmutex */ c->number=ngx_atomic_fetch_add(ngx_connection_counter,1); if(ls->addr_ntop){ c->addr_text.data=ngx_pnalloc(c->pool,ls->addr_text_max_len); if(c->addr_text.data==NULL){ ngx_close_accepted_connection(c); return; } c->addr_text.len=ngx_sock_ntop(c->sockaddr,c->socklen, c->addr_text.data, ls->addr_text_max_len,0); if(c->addr_text.len==0){ ngx_close_accepted_connection(c); return; } } /*6、将新连接对应的读写事件添加到epoll对象中*/ if(ngx_add_conn&&(ngx_event_flags&NGX_USE_EPOLL_EVENT)==0){ if(ngx_add_conn(c)==NGX_ERROR){ ngx_close_accepted_connection(c); return; } } log->data=NULL; log->handler=NULL; /*7、TCP建立成功调用的方法,这个方法在ngx_listening_t结构体中*/ ls->handler(c); }while(ev->available);/*available标志表示一次尽可能多的建立连接,由配置项multi_accept决定*/ }
Nginx中的“惊群”问题
Nginx一般会运行多个worker进程,这些进程会同时监听同一端口。当有新连接到来时,内核将这些进程全部唤醒,但只有一个进程能够和客户端连接成功,导致其它进程在唤醒时浪费了大量开销,这被称为“惊群”现象。Nginx解决“惊群”的方法是,让进程获得互斥锁ngx_accept_mutex,让进程互斥地进入某一段临界区。在该临界区中,进程将它所要监听的连接对应的读事件添加到epoll模块中,使得当有“新连接”事件发生时,该worker进程会作出反应。这段加锁并添加事件的过程是在函数ngx_trylock_accept_mutex中完成的。而当其它进程也进入该函数想要添加读事件时,发现互斥锁被另外一个进程持有,所以它只能返回,它所监听的事件也无法添加到epoll模块,从而无法响应“新连接”事件。但这会出现一个问题:持有互斥锁的那个进程在什么时候释放互斥锁呢?如果需要等待它处理完所有的事件才释放锁的话,那么会需要相当长的时间。而在这段时间内,其它worker进程无法建立新连接,这显然是不可取的。Nginx的解决办法是:通过ngx_trylock_accept_mutex获得了互斥锁的进程,在获得就绪读/写事件并从epoll_wait返回后,将这些事件归类放入队列中:
新连接事件放入ngx_posted_accept_events队列
已有连接事件放入ngx_posted_events队列
代码如下:
if(flags&NGX_POST_EVENTS) { /*延后处理这批事件*/ queue=(ngx_event_t**)(rev->accept?&ngx_posted_accept_events:&ngx_posted_events); /*将事件添加到延后执行队列中*/ ngx_locked_post_event(rev,queue); } else { rev->handler(rev);/*不需要延后,则立即处理事件*/ }
写事件做类似处理。进程接下来处理ngx_posted_accept_events队列中的事件,处理完后立即释放互斥锁,使该进程占用锁的时间降到了最低。
Nginx中的负载均衡问题
Nginx中每个进程使用了一个处理负载均衡的阈值ngx_accept_disabled,它在上图的第2步中被初始化:
ngx_accept_disabled=ngx_cycle->connection_n/8-ngx_cycle->free_connection_n;
它的初值为一个负数,该负数的绝对值等于总连接数的7/8.当阈值小于0时正常响应新连接事件,当阈值大于0时不再响应新连接事件,并将ngx_accept_disabled减1,代码如下:
if(ngx_accept_disabled>0) { ngx_accept_disabled--; } else { if(ngx_trylock_accept_mutex(cycle)==NGX_ERROR) { return; } .... }
这说明,当某个进程当前的连接数达到能够处理的总连接数的7/8时,负载均衡机制被触发,进程停止响应新连接。
参考:
《深入理解Nginx》P328-P334.
相关文章
- SEM学习笔记——竞价账户流程梳理
- [Delta][SQL] Delta开源付费功能,最全分析ZOrder的源码实现流程
- MsSQL2008R2安装基础流程笔记
- 21. 网络基础(2)——网络传输的基本流程
- 单细胞入门之Seurat标准流程
- recat源码中的setState流程
- 用Go学设计模式-提炼流程,减少重复开发就靠它了!
- js 基础笔记 流程控制
- 7步学会在Windows下上架iOS APP流程
- iOS开发笔记 – App上架流程(视频分享)详解手机开发
- 详解Oracle安装流程,成功率提升至42%(oracle安装42)
- 揭示Oracle工作流程的秘密(oracle工作流程)
- Oracle操作视图分步编写流程说明(oracle写视图步骤)
- 京东全流程智能无人仓亮相,物流行业迎来“无人”时代?
- PHP基础学习之流程控制的实现分析
- CI框架学习笔记(一)-环境安装、基本术语和框架流程