当前版本对应的tag标签为:v0.4.0,包含两个分支:master主分支和develop开发分支。作者会在开发分支上进行日常的开发工作,在某个版本开发结束时,会合并到主分支上。从v0.4.0版本开始,zenglServer会以守护进程的模式来运行,并采用epoll方式来处理http请求...
zengl@zengl-ubuntu:~/zenglServer$ ./zenglServer zengl@zengl-ubuntu:~/zenglServer$ ps -aux | grep zenglServer zengl 4940 0.0 0.0 26352 2436 ? Ss 14:41 0:00 zenglServer: master zengl 4941 0.0 0.0 42744 532 ? Sl 14:41 0:00 zenglServer: child(0) zengl@zengl-ubuntu:~/zenglServer$ cat logfile create master process for daemon [pid:4940] use default config: config.zl *** config is in debug mode *** run config.zl complete, config: port: 8083 process_num: 1 webroot: my_webroot bind done accept sem initialized. process_max_open_fd_num: 1024 Master: Spawning child(0) [pid 4941] epoll max fd count : 896 zengl@zengl-ubuntu:~/zenglServer$ |
zengl@zengl-ubuntu:~/zenglServer$ tail -f logfile ----------------------------------- Sun Nov 5 14:50:25 2017 recv [client_socket_fd:9] [lst_idx:0] [pid:4941] [tid:4943]: request header: Host: 192.168.0.105:8083 | Connection: keep-alive | Cache-Control: max-age=0 | Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8 | User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.137 Safari/537.36 | Accept-Encoding: gzip,deflate,sdch | Accept-Language: zh-CN,zh;q=0.8 | url: / url_path: / free socket_list[0]/list_cnt:0 epoll_fd_add_count:1 pid:4941 tid:4943 ----------------------------------- Sun Nov 5 14:50:25 2017 recv [client_socket_fd:11] [lst_idx:0] [pid:4941] [tid:4943]: request header: Host: 192.168.0.105:8083 | Connection: keep-alive | Accept: */* | User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.137 Safari/537.36 | Accept-Encoding: gzip,deflate,sdch | Accept-Language: zh-CN,zh;q=0.8 | url: /favicon.ico url_path: /favicon.ico free socket_list[0]/list_cnt:0 epoll_fd_add_count:0 pid:4941 tid:4943 |
zengl@zengl-ubuntu:~/zenglServer$ ps aux | grep zenglServer zengl 4940 0.0 0.0 26352 2436 ? Ss 14:41 0:00 zenglServer: master zengl 4941 0.0 0.0 108280 532 ? Sl 14:41 0:00 zenglServer: child(0) zengl@zengl-ubuntu:~/zenglServer$ kill 4940 zengl@zengl-ubuntu:~/zenglServer$ ps aux | grep zenglServer zengl@zengl-ubuntu:~/zenglServer$ tail -f logfile free socket_list[0]/list_cnt:0 epoll_fd_add_count:1 pid:4941 tid:4943 **** warning: 0 data length occured 0[0] free socket_list[0]/list_cnt:0 epoll_fd_add_count:0 pid:4941 tid:4943 Termination signal received! Killing children. All children reaped, shutting down. closed accept_sem shutdowned server socket closed server socket =================================== |
/*
 * client_socket_list.c
 *
 * Created on: 2017-10-24
 * Author: zengl
 */

#include "client_socket_list.h"
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>

................................................................

/**
 * Handle an EPOLLIN event.
 *
 * First looks up client_socket_fd in the socket list; if it is not there yet it
 * is added. The pending request data is then drained with recv() and fed to
 * http_parser_execute(), which stores the different parts of the request into
 * the dynamic strings of the list member for later use. When recv() fails with
 * EAGAIN/EWOULDBLOCK, everything currently readable has been consumed but the
 * client may still have more to send (network delay etc.), so we simply return;
 * the next EPOLLIN event for this connection brings us back here, until the
 * whole request has been read and parsed.
 *
 * Returns the list index on success, CLIENT_EPOLL_EAGAIN_ERRNO when more data
 * is still pending, or -1 on error (the socket is closed and removed then).
 */
int client_socket_list_process_epollin(CLIENT_SOCKET_LIST * list, int client_socket_fd)
{
	int idx = client_socket_list_find(list, client_socket_fd);
	if(idx < 0) {
		idx = client_socket_list_add(list, client_socket_fd);
		if(idx < 0) {
			// the connection could not be registered in the list, drop it
			routine_close_single_socket(client_socket_fd);
			return idx;
		}
	}
	int data_length;
	int total_length = 0;
	size_t parsed;
	char buffer[1025];
	do {
		data_length = recv(client_socket_fd, buffer, (sizeof(buffer) - 1), 0);
		if(data_length > 0) {
			total_length += data_length;
		}
		if(data_length == -1) {
			if(errno == EAGAIN || errno == EWOULDBLOCK) {
				// all readable data consumed for now, wait for the next EPOLLIN
				return CLIENT_EPOLL_EAGAIN_ERRNO;
			}
			else {
				write_to_server_log_pipe(WRITE_TO_PIPE, " **** error:[%d] %s\n", errno, strerror(errno));
				routine_close_client_socket(list, idx);
				return -1;
			}
		}
		else if(data_length == 0) {
			// the peer closed the connection
			write_to_server_log_pipe(WRITE_TO_PIPE, " **** warning: 0 data length occured");
			write_to_server_log_pipe(WRITE_TO_PIPE, " %d[%d]\n", data_length, total_length);
			routine_close_client_socket(list, idx);
			return -1;
		}
		parsed = http_parser_execute(&(list->member[idx].parser), &settings, buffer, data_length);
		// FIX: compare size_t against size_t to avoid the signed/unsigned
		// mismatch (data_length is known to be > 0 here, so the cast is safe)
		if(parsed != (size_t)data_length) {
			write_to_server_log_pipe(WRITE_TO_PIPE, " **** parser error: parsed[%d] != data_length[%d]", (int)parsed, data_length);
			routine_close_client_socket(list, idx);
			return -1;
		}
		if(list->member[idx].parser_data.header_complete) {
			if(list->member[idx].parser.flags & (F_CHUNKED | F_CONTENTLENGTH)) {
				// a body was announced: wait until the parser saw the full message
				if(list->member[idx].parser_data.message_complete)
					break;
			}
			else
				break; // no body expected, the request is complete
		}
	} while(1);
	return idx;
}

/**
 * Handle an EPOLLOUT event.
 *
 * The send_data dynamic string of the list member holds the response that must
 * go out to the client; it is pushed with send() in a loop. For large responses
 * send() may fail with EAGAIN/EWOULDBLOCK, meaning the socket send buffer is
 * full; we return and resume from send_data_cur on the next EPOLLOUT event
 * (which signals that the send buffer has drained), until everything has been
 * transmitted.
 *
 * Returns CLIENT_EPOLLOUT_FINISH when the whole response was sent,
 * CLIENT_EPOLL_EAGAIN_ERRNO when the send buffer filled up, or -1 on error.
 */
int client_socket_list_process_epollout(CLIENT_SOCKET_LIST * list, int idx)
{
	if(idx < 0 || idx >= list->size) {
		return -1;
	}
	int client_socket_fd = list->member[idx].client_socket_fd;
	char * buf = list->member[idx].send_data.str + list->member[idx].send_data_cur;
	size_t buf_len = list->member[idx].send_data.count - list->member[idx].send_data_cur;
	int data_length;
	int total_length = 0;
	do {
		data_length = send(client_socket_fd, buf, buf_len, 0);
		if(data_length > 0) {
			total_length += data_length;
		}
		else if(data_length == -1) {
			if(errno == EAGAIN || errno == EWOULDBLOCK) {
				// socket send buffer is full; resume on the next EPOLLOUT
				return CLIENT_EPOLL_EAGAIN_ERRNO;
			}
			else {
				write_to_server_log_pipe(WRITE_TO_PIPE, " **** error:[%d] %s\n", errno, strerror(errno));
				routine_close_client_socket(list, idx);
				return -1;
			}
		}
		else {
			// FIX: the format string below contains no conversion specifiers,
			// so the errno/strerror(errno) arguments previously passed here
			// were never printed -- they have been removed
			write_to_server_log_pipe(WRITE_TO_PIPE, " **** warning: 0 data length occured when send");
			write_to_server_log_pipe(WRITE_TO_PIPE, " %d[%d]\n", data_length, total_length);
			routine_close_client_socket(list, idx);
			return -1;
		}
		list->member[idx].send_data_cur += data_length;
		buf_len = list->member[idx].send_data.count - list->member[idx].send_data_cur;
		if(buf_len == 0) {
			// everything was sent; close the connection and recycle the slot
			routine_close_client_socket(list, idx);
			return CLIENT_EPOLLOUT_FINISH;
		}
		else {
			buf = list->member[idx].send_data.str + list->member[idx].send_data_cur;
		}
	} while(1);
}

................................................................
................................................................ /** * 将logstr写入server_log_fd文件描述符对应的日志文件中 */ int write_to_server_log(char * logstr) { return write(server_log_fd, logstr, strlen(logstr)); } /** * 子进程会将日志信息写入server_log_pipefd管道的一端 * 主进程则会循环读取管道的另一端,并将读取到的日志信息,统一写入到日志文件中, * 通过这种方式,日志信息就可以交由主进程统一管理,由主进程来决定写入到哪个日志文件中 * (虽然目前的版本还是写入到一个日志文件里,但是以后可能会根据日期将日志写入不同的日志文件中) */ int read_from_server_log_pipe() { while(1) { char logstr[200]; int chars_read; chars_read = read(server_log_pipefd[0], logstr, 195); logstr[chars_read] = STR_NULL; write_to_server_log(logstr); } } /** * 主进程和子进程都会通过这个函数来写入日志信息, * 当write_to_pipe参数为WRITE_TO_LOG(就是整数0)时,就直接将信息写入日志文件(一般是主进程使用WRITE_TO_LOG方式) * 当write_to_pipe参数为WRITE_TO_PIPE(整数1)时,就将日志写入管道(一般是子进程使用WRITE_TO_PIPE方式) * 写入日志时,可以提供format格式,下面会通过vsnprintf来根据format和arglist参数列表,来构建需要写入的字符串 */ int write_to_server_log_pipe(ZL_EXP_BOOL write_to_pipe, const char * format, ...) { if(server_log_pipe_string.str == NULL) { server_log_pipe_string.size = SERVER_LOG_PIPE_STR_SIZE; server_log_pipe_string.str = (char *)malloc(server_log_pipe_string.size * sizeof(char)); } int retcount = 0; va_list arglist; va_start(arglist, format); while(1) { retcount = vsnprintf(server_log_pipe_string.str, server_log_pipe_string.size, format, arglist); if(retcount >=0 && retcount < server_log_pipe_string.size) { server_log_pipe_string.str[retcount] = STR_NULL; if(write_to_pipe) { write(server_log_pipefd[1], server_log_pipe_string.str, retcount); } else { write_to_server_log(server_log_pipe_string.str); } break; } server_log_pipe_string.size += SERVER_LOG_PIPE_STR_SIZE; server_log_pipe_string.str = (char *)realloc(server_log_pipe_string.str, server_log_pipe_string.size * sizeof(char)); } va_end(arglist); return retcount; } /** * zenglServer启动时会执行的入口函数 */ int main(int argc, char * argv[]) { ....................................................................... 
// 后面会切换到守护进程,所有信息都会写入到logfile日志文件中 if ((server_log_fd = open("logfile", O_WRONLY|O_APPEND|O_CREAT, 0644)) < 0) { printf("open for server_log_fd failed [%d] %s \n", errno, strerror(errno)); exit(errno); } // 将argv[0]赋值给current_process_name,通过current_process_name就可以修改当前进程的名称 current_process_name = argv[0]; //通过fork创建master主进程,该进程将在后台以守护进程的形式一直运行,并通过该进程来创建执行具体任务的child子进程 pid_t master_pid = fork(); if(master_pid < 0) { write_to_server_log_pipe(WRITE_TO_LOG, "failed to create master process [%d] %s \n", errno, strerror(errno)); // 创建完master进程后,退出当前进程 exit(-1); } else if(master_pid > 0) { // 记录master主进程的进程ID write_to_server_log_pipe(WRITE_TO_LOG, "create master process for daemon [pid:%d] \n", master_pid); return 0; } // 将umask设为0,让子进程给文件设置的读写执行权限不会被屏蔽掉 umask(0); int logStdout; if ((logStdout = open("/dev/null", O_WRONLY|O_APPEND|O_CREAT, 0644)) < 0) { write_to_server_log_pipe(WRITE_TO_LOG, "open /dev/null failed [%d] %s \n", errno, strerror(errno)); exit(errno); } // 将标准输入和输出重定向到/dev/null dup2(logStdout, STDIN_FILENO); dup2(logStdout, STDOUT_FILENO); dup2(logStdout, STDERR_FILENO); close(logStdout); // 设置新的会话,这样主进程和子进程就不会受到控制台信号的影响了 if (setsid() < 0) { write_to_server_log_pipe(WRITE_TO_LOG, "setsid() failed [%d] %s \n", errno, strerror(errno)); exit(errno); } // 创建日志用的管道,子进程中的日志信息会先写入管道,再由主进程统一从管道中读取出来,并写入日志文件中 if (pipe(server_log_pipefd) == -1) { write_to_server_log_pipe(WRITE_TO_LOG, "pipe() failed [%d] %s \n", errno, strerror(errno)); exit(errno); } // 当没有使用-c命令行参数指定配置文件名时,就使用默认的配置文件名 if(config_file == NULL) { write_to_server_log_pipe(WRITE_TO_LOG, "use default config: " DEFAULT_CONFIG_FILE "\n"); config_file = DEFAULT_CONFIG_FILE; } else { write_to_server_log_pipe(WRITE_TO_LOG, "use config: %s\n", config_file); } ....................................................................... 
// 获取当前进程可以打开的文件描述符数量限制,用于控制epoll监听的文件描述符数 struct rlimit limit; if (getrlimit(RLIMIT_NOFILE, &limit) != 0) { write_to_server_log_pipe(WRITE_TO_LOG, "getrlimit() failed with errno=%d %s\n", errno, strerror(errno)); exit(1); } process_max_open_fd_num = limit.rlim_cur; write_to_server_log_pipe(WRITE_TO_LOG, "process_max_open_fd_num: %d \n", process_max_open_fd_num); // 根据process_num的值,创建多个子进程,如果是调试模式,一般就设置一个子进程,方便gdb调试 for(int i=0;i < server_process_num;i++) { fork_child_process(i); } // 注册信号,主要是进程终止信号,子进程结束信号等 register_signals(); // trap_signals会通过sigaction系统调用,将register_signals中注册的信号应用到相关的处理函数上,当进程接收到信号时,就会调用相关的C函数去处理 if (!trap_signals(ZL_EXP_TRUE)) { write_to_server_log_pipe(WRITE_TO_LOG, "trap_signals() failed!\n"); exit(-1); } // 主进程循环读取管道,将子进程通过管道发送的日志信息统一写入到日志文件中 read_from_server_log_pipe(); return 0; } /** * 通过fork系统调用创建执行具体工作的子进程 */ void fork_child_process(int idx) { // 通过fork创建子进程 pid_t childpid = fork(); // 如果childpid等于0,说明当前进程是子进程,就创建工作线程 if(childpid == 0) { pthread_t tid[THREAD_NUM_MAX]; // 设置child子进程的进程名 snprintf(current_process_name, 0xff, "zenglServer: child(%d)", idx); // 将子进程从父进程继承过来的信号处理函数取消掉 if (!trap_signals(ZL_EXP_FALSE)) { fprintf(stderr, "Child %d: trap_signals() failed!\n", idx); exit(1); } // 将process_max_open_fd_num的7/8的值,定为max_size,即epoll可以添加的用于监听事件的文件描述符数 int max_size = (process_max_open_fd_num / 8) * 7; if(max_size > 0) { // 从Linux 2.6.8开始, epoll_create的第一个size参数已经被忽略掉, 但是该参数还是必须大于0 process_epoll_fd = epoll_create(max_size); } else { process_epoll_fd = epoll_create(100); } // epoll_create返回的是epoll实例对应的文件描述符,后面会通过该文件描述符,对epoll进行操作,例如往epoll中添加需要监听的套接字等 if(process_epoll_fd == -1) { write_to_server_log_pipe(WRITE_TO_PIPE, "epoll_create failed : [%d] %s \n", errno, strerror(errno)); exit(-1); } // 该全局变量用于统计添加到epoll中的文件描述符的数量 epoll_fd_add_count = 0; // 每次epoll_wait操作时,一次最多可以提取出MAX_EPOLL_EVENTS个事件进行处理,每个事件都对应一个epoll_event结构体 process_epoll_events = calloc(MAX_EPOLL_EVENTS, sizeof(struct epoll_event)); // 初始化线程互斥锁 
if(pthread_mutex_init(&(my_thread_lock.lock), NULL) != 0) { write_to_server_log_pipe(WRITE_TO_PIPE, "thread lock init failed : [%d] %s \n", errno, strerror(errno)); exit(-1); } // 依次创建两个线程,第一个线程的处理函数为routine_epoll_append_fd,该处理函数主要用于从服务端套接字中获取客户端套接字的文件描述符,并将其加入到epoll中 // 第二个线程的处理函数为routine,该处理函数会通过epoll_wait从epoll实例中获取每个客户端套接字文件描述符的相关事件(例如某个客户端连接的可读或可写等事件),并对这些事件进行处理 pthread_create(&tid[0], NULL, routine_epoll_append_fd, (void *)&max_size); pthread_create(&tid[1], NULL, routine, NULL); // 通过join线程,来等待所有的线程结束 //for (int i = 0; i < thread_num_per_process; i++) for (int i = 0; i < 2; i++) { pthread_join(tid[i], NULL); } // 如果所有的线程都结束的话,就销毁线程锁,释放相关资源,并退出当前子进程,正常情况下,线程不会退出(因为routine中是一个无限循环),除非是发生严重的异常 pthread_mutex_destroy(&(my_thread_lock.lock)); free(process_epoll_events); close(process_epoll_fd); exit(0); } else if(childpid > 0) { // childpid大于0,表示当前是主进程,就向日志中输出创建的子进程的信息 write_to_server_log_pipe(WRITE_TO_LOG, "Master: Spawning child(%d) [pid %d] \n", idx, childpid); server_child_process[idx] = childpid; } } /** * 子进程退出时,主进程会收到SIGCHLD信号,并触发下面这个sig_child_callback函数去处理该信号 */ void sig_child_callback() { int i, status[0xff]; /* 数组中存储了每个子进程的退出码,暂时最多只处理255个子进程 */ pid_t pid; for (i = 0; i < server_process_num; ++i) { pid = waitpid(server_child_process[i], &status[i], WNOHANG); /* waitpid时采用WNOHANG非阻塞模式 */ if(pid < 0) { write_to_server_log_pipe(WRITE_TO_LOG, "waitpid error [%d] %s", errno, strerror(errno)); } else if(!pid) { /* waitpid返回0,表示该子进程正在运行中 */ continue; } else { // pid大于0,说明对应的子进程已经退出,则根据status退出码,将子进程退出的原因写入到日志中 if (WIFEXITED(status[i])) write_to_server_log_pipe(WRITE_TO_LOG, "child PID %d exited normally. 
Exit number: %d\n", pid, WEXITSTATUS(status[i])); else { if (WIFSTOPPED(status[i])) write_to_server_log_pipe(WRITE_TO_LOG, "child PID %d was stopped by %d\n", pid, WSTOPSIG(status[i])); else { if (WIFSIGNALED(status[i])) write_to_server_log_pipe(WRITE_TO_LOG, "child PID %d exited due to signal %d\n.", pid, WTERMSIG(status[i])); else write_to_server_log_pipe(WRITE_TO_LOG, "child PID %d exited, status: %d", pid, status[i]); } } // 通过fork_child_process函数重新创建一个新的子进程,继续工作 fork_child_process(i); } } } /** * 当主进程接收到SIGINT或者SIGTERM终止信号时,会触发的信号处理函数 */ void sig_terminate_master_callback() { int i, status; pid_t pid; write_to_server_log_pipe(WRITE_TO_LOG, "Termination signal received! Killing children"); /* * 在kill杀死子进程之前,需要先重置所有的信号处理函数,否则,当子进程被kill结束时,会给主进程发送SIGCHLD信号,并自动触发上面的sig_child_callback, * sig_child_callback又会通过fork_child_process重启子进程,就没办法结束掉子进程。因此需要先重置信号处理函数, * 通过将trap_signals的参数设置为ZL_EXP_FALSE(也就是整数0),就可以进行重置 */ trap_signals(ZL_EXP_FALSE); // 循环向子进程发送SIGTERM(终止信号),从而结束掉子进程 for (i = 0; i < server_process_num; ++i) kill(server_child_process[i], SIGTERM); /* 循环等待所有子进程结束 */ while ((pid = wait(&status)) != -1) write_to_server_log_pipe(WRITE_TO_LOG, "."); write_to_server_log_pipe(WRITE_TO_LOG, "\nAll children reaped, shutting down.\n"); // 如果所有子进程都退出了,就释放相关资源,并退出主进程,子进程和主进程都退出后,整个程序也就退出了 sem_unlink("accept_sem"); sem_close(my_thread_lock.accept_sem); write_to_server_log_pipe(WRITE_TO_LOG, "closed accept_sem\n"); shutdown(server_socket_fd, SHUT_RDWR); write_to_server_log_pipe(WRITE_TO_LOG, "shutdowned server socket\n"); close(server_socket_fd); write_to_server_log_pipe(WRITE_TO_LOG, "closed server socket\n===================================\n\n"); free(server_log_pipe_string.str); exit(0); } /** * 注册信号,将要处理的信号和对应的处理函数写入到server_sig_pairs数组中 * 后面的trap_signals函数,就会根据该数组进行实际的信号处理函数的绑定操作 */ void register_signals() { int i = 0; server_sig_pairs[i].signal = SIGCHLD; server_sig_pairs[i].action.sa_handler = &sig_child_callback; /* Don't send SIGCHLD when a process has been frozen 
(e.g. Ctrl-Z) */ server_sig_pairs[i].action.sa_flags = SA_NOCLDSTOP; server_sig_pairs[++i].signal = SIGINT; server_sig_pairs[i].action.sa_handler = &sig_terminate_master_callback; server_sig_pairs[++i].signal = SIGTERM; server_sig_pairs[i].action.sa_handler = &sig_terminate_master_callback; /* setting sigcount now is easier than doing it dynamically */ server_sig_count = ++i; } /** * 如果on参数是非0值,就将server_sig_pairs中注册的信号绑定到相应的自定义处理函数上 * 这样当主进程接收到注册的信号时,就会自动调用自定义的处理函数去处理这些信号 * 当该函数的on参数是0时,则将server_sig_pairs中注册的信号的处理handler恢复到默认的SIG_DFL * 相当于重置所有的信号处理函数 */ int trap_signals(ZL_EXP_BOOL on) { int i; struct sigaction dfl; /* the handler object */ dfl.sa_handler = SIG_DFL; /* for resetting to default behavior */ /* Loop through all registered signals and either set to the new handler * or reset them back to the default */ for (i = 0; i < server_sig_count; ++i) { /* notice that the second parameter takes the address of the handler */ if (sigaction(server_sig_pairs[i].signal, on ? &server_sig_pairs[i].action : &dfl, NULL) < 0) return ZL_EXP_FALSE; } return ZL_EXP_TRUE; } .......................................................................... 
/**
 * Put the socket sfd into non-blocking mode, as required by epoll's
 * event-driven workflow. Returns 0 on success, -1 on fcntl failure.
 */
static int make_socket_non_blocking (int sfd)
{
	int flags, s;
	flags = fcntl (sfd, F_GETFL, 0);
	if (flags == -1) {
		write_to_server_log_pipe(WRITE_TO_PIPE, "fcntl failed [%d] %s \n", errno, strerror(errno));
		return -1;
	}
	flags |= O_NONBLOCK;
	s = fcntl (sfd, F_SETFL, flags);
	if (s == -1) {
		write_to_server_log_pipe(WRITE_TO_PIPE, "fcntl failed [%d] %s \n", errno, strerror(errno));
		return -1;
	}
	return 0;
}

/**
 * Handler of the child process's first worker thread.
 *
 * Accepts client connections from server_socket_fd, switches each client fd
 * to non-blocking mode and registers it with the epoll instance; an EPOLLIN
 * event then fires whenever a connection has input data, and the other worker
 * thread processes it. arg points to max_size, the maximum number of fds this
 * thread may keep registered with epoll at once.
 */
void * routine_epoll_append_fd(void * arg)
{
	struct sockaddr_in client_addr;
	int c_len = sizeof(client_addr);
	struct epoll_event event;
	int max_count = *((int *)arg);
	write_to_server_log_pipe(WRITE_TO_PIPE, "epoll max fd count : %d \n", max_count);
	do {
		//sem_wait(my_thread_lock.accept_sem);
		int client_socket_fd = accept(server_socket_fd, (struct sockaddr *)&client_addr, (socklen_t *)&c_len);
		if(client_socket_fd < 0) {
			// NOTE(review): this format string has no conversion specifier, so
			// the trailing max_count argument is never printed -- confirm and
			// remove the extra argument
			write_to_server_log_pipe(WRITE_TO_PIPE, "accept client_socket_fd less than 0, maybe your linux is too old, and have thundering herd problem \n", max_count);
			continue;
		}
		//sem_post(my_thread_lock.accept_sem);
		if(make_socket_non_blocking(client_socket_fd) == 0) {
			event.data.fd = client_socket_fd;
			event.events = EPOLLIN | EPOLLET | EPOLLERR | EPOLLHUP;
			// every fd added to epoll increments epoll_fd_add_count; the mutex
			// keeps the add + count update from racing with the other worker
			// thread
			pthread_mutex_lock(&(my_thread_lock.lock));
			epoll_ctl (process_epoll_fd, EPOLL_CTL_ADD, client_socket_fd, &event);
			epoll_fd_add_count++;
			pthread_mutex_unlock(&(my_thread_lock.lock));
			// once max_count fds are registered, stop accepting more and keep
			// yielding to the other worker thread; when it has released enough
			// connections and epoll_fd_add_count drops below max_count, this
			// loop exits and accepting resumes
			// NOTE(review): this busy-waits on a non-atomic counter read
			// outside the mutex -- verify the intended memory-visibility
			// guarantees on the target platform
			while(epoll_fd_add_count >= max_count) {
				pthread_yield();
			}
		}
	} while(1);
	return NULL;
}

/**
 * Return the current thread's kernel thread ID (used in log lines).
 */
pid_t routine_get_tid()
{
	// obtain the thread ID via the gettid system call
#ifdef SYS_gettid
	pid_t tid = syscall(SYS_gettid);
#else
#error "SYS_gettid unavailable on this system"
#endif
	return tid;
}

/**
 * Close a client socket that is tracked in socket_list: free its list slot
 * and decrement the epoll_fd_add_count statistic, all under the thread lock.
 */
void routine_close_client_socket(CLIENT_SOCKET_LIST * socket_list, int lst_idx)
{
	if(lst_idx >= 0 && lst_idx < socket_list->size) {
		pthread_mutex_lock(&(my_thread_lock.lock));
		client_socket_list_free_by_idx(socket_list, lst_idx);
		epoll_fd_add_count--;
		write_to_server_log_pipe(WRITE_TO_PIPE, "free socket_list[%d]/list_cnt:%d epoll_fd_add_count:%d pid:%d tid:%d\n",
				lst_idx, socket_list->count, epoll_fd_add_count, getpid(), routine_get_tid());
		pthread_mutex_unlock(&(my_thread_lock.lock));
	}
}

/**
 * Close a socket that never made it into the socket list (e.g. because adding
 * it failed): just close() it directly and decrement the epoll statistic.
 */
void routine_close_single_socket(int client_socket_fd)
{
	if(client_socket_fd > 0) {
		pthread_mutex_lock(&(my_thread_lock.lock));
		close(client_socket_fd);
		epoll_fd_add_count--;
		write_to_server_log_pipe(WRITE_TO_PIPE, "close single socket:%d pid:%d tid:%d\n",
				client_socket_fd, getpid(), routine_get_tid());
		pthread_mutex_unlock(&(my_thread_lock.lock));
	}
}

/**
 * Handler of the child process's second worker thread.
 *
 * Loops on epoll_wait pulling read/write events for the client sockets. On
 * EPOLLIN the available data is read into the per-connection buffers held in
 * socket_list (large requests may arrive across several EPOLLIN events); once
 * a request is complete, the response is built and sent. On EPOLLOUT any
 * response data that did not fit into the socket send buffer the previous
 * time is sent, continuing until the whole response reached the client.
 * socket_list maps each client fd to its buffers and parser state.
 */
void * routine(void *arg)
{
	int n, i;
	int client_socket_fd;
	struct epoll_event event;
	CLIENT_SOCKET_LIST socket_list = {0};
	int lst_idx, epollout_ret;
	// process client requests in an endless loop, unless a fatal error occurs
	// or the main program exits
	do {
		n = epoll_wait (process_epoll_fd, process_epoll_events, MAX_EPOLL_EVENTS, -1);
		for (i = 0; i < n; i++) {
			if ((process_epoll_events[i].events & EPOLLERR) ||
				(process_epoll_events[i].events & EPOLLHUP) ||
				(!(process_epoll_events[i].events & EPOLLIN) &&
				 !(process_epoll_events[i].events & EPOLLOUT))) {
				/* An error has occured on this fd, or the socket is not
				   ready for reading (why were we notified then?) */
				write_to_server_log_pipe(WRITE_TO_PIPE, "epoll error: 0x%x\n", process_epoll_events[i].events);
				client_socket_fd = process_epoll_events[i].data.fd;
				lst_idx = client_socket_list_find(&socket_list, client_socket_fd);
				if(lst_idx < 0)
					routine_close_single_socket(client_socket_fd);
				else
					routine_close_client_socket(&socket_list, lst_idx);
				continue;
			}
			else {
				if((process_epoll_events[i].events & EPOLLIN)) {
					client_socket_fd = process_epoll_events[i].data.fd;
					// read and parse whatever this connection has available
					lst_idx = client_socket_list_process_epollin(&socket_list, client_socket_fd);
					if(lst_idx < 0) {
						continue;
					}
					// the request is complete: build the response into the
					// connection's send buffer
					lst_idx = routine_process_client_socket(&socket_list, lst_idx);
					if(lst_idx < 0) {
						continue;
					}
					// switch the fd to EPOLLOUT so the response can be written
					event.data.fd = client_socket_fd;
					event.events = EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
					epoll_ctl (process_epoll_fd, EPOLL_CTL_MOD, client_socket_fd, &event);
					client_socket_list_process_epollout(&socket_list, lst_idx);
				}
				else if(process_epoll_events[i].events & EPOLLOUT) {
					client_socket_fd = process_epoll_events[i].data.fd;
					lst_idx = client_socket_list_find(&socket_list, client_socket_fd);
					if(lst_idx < 0) {
						write_to_server_log_pipe(WRITE_TO_PIPE, "client_socket_list_find return less than 0: %d\n", lst_idx);
						routine_close_single_socket(client_socket_fd);
						continue;
					}
					// resume sending the buffered response where we left off
					client_socket_list_process_epollout(&socket_list, lst_idx);
				}
			}
		}
	} while(1);
	return NULL;
}

/**
 * Run once a complete client request has been read: process the request and
 * write the result into the connection's output buffer. After this function
 * returns, the worker thread pushes the buffered output to the client --
 * possibly over several EPOLLOUT rounds when the response is large (each
 * EPOLLOUT event means the connection can accept more data).
 */
static int routine_process_client_socket(CLIENT_SOCKET_LIST * socket_list, int lst_idx)
{
	time_t rawtime;
	struct tm * timeinfo;
	time ( &rawtime );
	timeinfo = localtime ( &rawtime );
	char * current_time = asctime (timeinfo);
	// log the current time together with the client socket fd being served
	write_to_server_log_pipe(WRITE_TO_PIPE,
			"-----------------------------------\n%srecv [client_socket_fd:%d] [lst_idx:%d] [pid:%d] [tid:%d]:",
			current_time,
			socket_list->member[lst_idx].client_socket_fd, lst_idx, getpid(), routine_get_tid());
	write_to_server_log_pipe(WRITE_TO_PIPE, "\n\n");
	MY_PARSER_DATA * parser_data = &(socket_list->member[lst_idx].parser_data);
	write_to_server_log_pipe(WRITE_TO_PIPE, "request header: ");
	{
		// request_header stores the parsed headers as consecutive
		// NUL-terminated field/value string pairs; walk and log them
		char * tmp = parser_data->request_header.str;
		char * end = parser_data->request_header.str + parser_data->request_header.count;
		do{
			ZL_EXP_CHAR * field = tmp;
			ZL_EXP_CHAR * value = field + strlen(field) + 1;
			if(field >= end || value >= end) {
				break;
			}
			write_to_server_log_pipe(WRITE_TO_PIPE, "%s: %s | ", field, value);
			tmp = value + strlen(value) + 1;
		}while(1);
	}
	write_to_server_log_pipe(WRITE_TO_PIPE, "\n\n");
	write_to_server_log_pipe(WRITE_TO_PIPE, "url: %s\n", parser_data->request_url.str);
	// parse the url (path plus query string) via http_parser_parse_url, which
	// stores the decomposed result into url_parser for the code that follows
.........................................................................
}