CS模型epoll和线程池实现服务器高并发

  1. 使用信号量对主线程和线程处理函数进行线程同步,当线程处理函数读取完套接字的数据后通知信号量解除主线程的阻塞,开始处理下一个事件。
  2. 定义结构体dealinfo用来传送文件描述符和字符串地址。
  3. 注意一点,不能在线程处理函数中关闭不需要的文件描述符。不然会发生惊群现象
  4. "threadpool.h"头文件是用来实现线程池的
  5. 服务器实现的功能是把客户端发来的字符串转大写
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<string.h>
#include<arpa/inet.h>
#include<sys/epoll.h>
#include<errno.h>
#include<ctype.h>
#include<semaphore.h>
#include"threadpool.h"
#define SERV_PORT 9527
#define OPEN_MAX 1024

sem_t sem;//信号量,线程同步解决方案
//pthread_mutex_t mutex;

typedef struct dealinfo{
    int deal_sockfd;//对应文件描述符
    char *deal_buf;//字符串地址
}dealinfo;
void *process(void *arg)
{
    int prosockfd=(*(dealinfo *)arg).deal_sockfd;

    char buf[BUFSIZ];
    strcpy(buf,(*(dealinfo *)arg).deal_buf);
    sem_post(&sem);//已读取完数据,通知主线程解除阻塞
    int len=strlen(buf);

    for(int i=0;i<len;++i)
        buf[i]=toupper(buf[i]);        
    buf[len]=;
    write(prosockfd,buf,len+1);
    return NULL;

}
int main()
{
    //线程池参数
    threadpool_t *thp=threadpool_create(3,100,100);


    int i,listenfd,connfd,sockfd;
    int n,num=0;
    ssize_t epfd,nready,res;
    char tbuf[BUFSIZ],clie_IP[INET_ADDRSTRLEN];
    socklen_t clilen;

    struct sockaddr_in cliaddr,servaddr;
    struct epoll_event tep,ep[OPEN_MAX];
    
    sem_init(&sem,0,0);//初始信号量拥有0个资源
    //pthread_mutex_init(&mutex,NULL);

    listenfd=socket(AF_INET,SOCK_STREAM,0);
    
    //设置端口复用
    int opt=1;
    setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));

    bzero(&servaddr,sizeof(servaddr));
    servaddr.sin_family=AF_INET;
    servaddr.sin_port=htons(SERV_PORT);
    servaddr.sin_addr.s_addr=htonl(INADDR_ANY);

    bind(listenfd,(struct sockaddr *)&servaddr,sizeof(servaddr));

    listen(listenfd,128);

    epfd=epoll_create(OPEN_MAX);
    tep.events=EPOLLIN;
    tep.data.fd=listenfd;
    epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&tep);

    for(;;)
    {
        nready=epoll_wait(epfd,ep,OPEN_MAX,-1);
        for(i=0;i<nready;++i)
        {
            if(!(ep[i].events&EPOLLIN))
                continue;
            if(ep[i].data.fd==listenfd)
            {
                clilen=sizeof(cliaddr);
                connfd=accept(listenfd,(struct sockaddr *)&cliaddr,&clilen);
                printf("Client IP:%s,Port:%d
",inet_ntop(AF_INET,&cliaddr.sin_addr.s_addr,clie_IP,sizeof(clie_IP)),ntohs(cliaddr.sin_port));
                tep.data.fd=connfd;
                tep.events=EPOLLIN;
                epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&tep);
            }
            else{
                sockfd=ep[i].data.fd;
                n=read(sockfd,tbuf,sizeof(tbuf));

                if(n==0)
                {
                    epoll_ctl(epfd,EPOLL_CTL_DEL,sockfd,NULL);
                    close(sockfd);
                    printf("Client [%d] close connection.
",sockfd);
                }
                else if(n<0)
                {
                    perror("read");
                    epoll_ctl(epfd,EPOLL_CTL_DEL,sockfd,NULL);
                    close(sockfd);
                }
                else    
                {
                    dealinfo ptharg={sockfd,tbuf};
                    threadpool_add(thp,process,(void*)&ptharg);//添加到线程池中
                    sem_wait(&sem);//当有数据处理时在此处阻塞,当子线程读取完数据后被通知解除阻塞
                    /*
                    for(int i=0;i<n;++i)
                        buf[i]=toupper(buf[i]);        
                    write(sockfd,buf,n);*/
                }
            }
        }
    }
    close(listenfd);
    close(epfd);
    //pthread_mutex_destroy(&mutex);
    sem_destroy(&sem);
    threadpool_destroy(thp);

    return 0;
}
经验分享 程序员 微信小程序 职场和发展