В заметке приводится код многопотокового эхо-сервера , основанного на использовании неблокирующего ввода вывода и конечных автоматов. Каждый поток сервера использует вызов select( ) для того, чтобы определить по какому из соединений можно производить обме
Автор:
Разместил: Amro   Дата: 2006-06-05 01:12
Комментарии: (0)   Рейтинг:

РЕАЛИЗАЦИЯ МНОГОПОТОКОВОГО "АСИНХРОННОГО СЕРВЕРА TCP" И RPC ДЛЯ ОС LINUX

В заметке приводится код многопотокового эхо-сервера , основанного на использовании неблокирующего ввода вывода и конечных автоматов. Каждый поток сервера использует вызов select( ) для того, чтобы определить по какому из соединений можно производить обмен в данный момент времени. Код для процедуры serv_request , выполняемой ведомыми потоками может быть взят из различных источников ( см. [1],[2],[3]). Параметр , передаваемый в процедуру serv_request , явлется дескриптором пассивного сокета , создаваемый ведущим потоком с помощью вызова процедуры getServerSocket( ).
Необходимо отметить, что приведенное решение основано на свойстве Linux эффективно распараллеливать вызов accept( ). В противном случае возникает необходимость в блокировке мьютекса перед вызовом accept( ),
что влечет за собой последовательное выполнение потоками сервера критической части кода, т.е. вызова accept( ). В среде же Red Hat Linux 9 (например) эта предосторожность не нужна и только снижает производительность. Детальное описание Posix Threads API на русском языке может быть найдено в [2].
Приводится также модифицированный код заглушки sample_svc.c (не за- висящий от шаблона sample.x) , позволяюший скомпилировать многопото- ковый сервер RPC в среде LINUX.
/*
*  ServerNBTHR.c
*/
#include <sys/socket.h>

#include <sys/types.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <errno.h>
#include <string.h>
#include <stdio.h>

#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <pthread.h>
#define NUM_THREADS 512

pthread_mutex_t request_mutex = PTHREAD_MUTEX_INITIALIZER;

void
die(const char *func, int err)
{
        fprintf(stderr,"%s: %s\n",func, strerror(err));
        abort();
}

                                                                                
void
bark(const char *func, int err)
{
        fprintf(stderr,"%s: %s\n",func, strerror(err));
}
/* 
    Описание поцедуры ведущего потока , которая возвращает
дескрипторов пассивного сокета, привязанного к адресу
сервера.
*/
int
getServerSocket(unsigned short int port)
{
        int listenSocket;
        struct sockaddr_in listenSockaddr;
                                                                                
        if((listenSocket=socket(PF_INET,SOCK_STREAM,0))<0)
                die("socket()",errno);
        memset(&listenSockaddr, 0, sizeof(listenSockaddr));
        listenSockaddr.sin_family = PF_INET;
        listenSockaddr.sin_port = htons(port);
        listenSockaddr.sin_addr.s_addr = INADDR_ANY;
                                                                                
         if(bind(listenSocket,(struct sockaddr*)&listenSockaddr,
                                            sizeof(listenSockaddr)) < 0)
                die("bind()",errno);
                                                                                
        if(listen(listenSocket,5)<0)
                die("listen()",errno);
                                                                                
        return listenSocket;
}

/*
    Описание процедуры выполняемой всеми ведомыми потоками
*/


void *
serv_request(void *data)
{
struct connection_cb
{
        int dataSocket;
        char data[256];
        int dataSent;
        int dataToSend;
        int isReading;
        struct connection_cb *next;
};
                                                                                                   
struct connection_cb *connections = NULL;

int listenSocket = (int)data;
       if(fcntl(listenSocket,F_SETFL,O_NONBLOCK)<0)
               die("fcntl()",errno);
       
       while(1)
       {
               fd_set readFdSet;
               fd_set writeFdSet;
               struct connection_cb *currentConn, **currentConnPtr, *tempConn;
               int maxFdNum;

               FD_ZERO(&readFdSet);
               FD_ZERO(&writeFdSet);
             
              /*
                    Добавление дескриптора к множеству readFdSet
              */
               FD_SET(listenSocket,&readFdSet);
               maxFdNum = listenSocket;
               
           for(currentConn = connections;currentConn!=NULL;currentConn =   
                                                           currentConn->next)
       {

               if(currentConn->isReading)
                       FD_SET(currentConn->dataSocket,&readFdSet);
               else
                       FD_SET(currentConn->dataSocket,&writeFdSet);
       maxFdNum = currentConn->dataSocket > maxFdNum ?currentConn-
                                       >dataSocket : maxFdNum; 
       }
          /*
              Получение множества дескрипторов сокетов для обработки
          */           
       if(select(maxFdNum+1,&readFdSet,&writeFdSet,NULL,NULL) < 0)
       {

               if(errno == EINTR)
                       continue;
               die("select()",errno);
       }
               
       currentConnPtr=&connections;
       
       while(*currentConnPtr!=NULL)
       {

          /*
             Проверка принадлежности дескриптора 
             (*currentConnPtr)->dataSocket к множеству readFdSet
           */

       if((*currentConnPtr)->isReading && 
                  FD_ISSET((*currentConnPtr)->dataSocket,&readFdSet))
       {

         int result = recv((*currentConnPtr)->dataSocket, (*currentConnPtr)->data,
                                                         sizeof((*currentConnPtr)->data),0);

       if(result < 0)
       {
       if(errno!=EINTR && errno!=EAGAIN && errno!=EWOULDBLOCK)
       {
               bark("recv()",errno);
               close((*currentConnPtr)->dataSocket);
               tempConn = *currentConnPtr;
               *currentConnPtr = (*currentConnPtr)->next;
               free(tempConn);
               continue;
       }

               }
       else 
         if(result==0)
               {
               close((*currentConnPtr)->dataSocket);
               tempConn = *currentConnPtr;
               *currentConnPtr = (*currentConnPtr)->next;
               free(tempConn);
               continue;
               }
       else
       {
               (*currentConnPtr)->dataToSend = result;
               (*currentConnPtr)->dataSent = 0;
               (*currentConnPtr)->isReading = 0;
         printf("Recieving  as Slave Thread id = '%d' \n",pthread_self());
       }

               }
       else 
         /*
             Проверка принадлежности дескриптора 
             (*currentConnPtr)->dataSocket к множеству writedFdSet
         */
         if(FD_ISSET((*currentConnPtr)->dataSocket,&writeFdSet))
       {
       int result = send((*currentConnPtr)->dataSocket, 
               (*currentConnPtr)->data+(*currentConnPtr)->dataSent,
                (*currentConnPtr) ->dataToSend-(*currentConnPtr)->dataSent, 0);
                       
       if(result < 0)
       {

         if(errno!=EINTR && errno!=EAGAIN)
               {
                       bark("write()",errno);
                       close((*currentConnPtr)->dataSocket);
                       tempConn = *currentConnPtr;
                       *currentConnPtr = (*currentConnPtr)->next;
                       free(tempConn);
                       continue;
               }
                       }
               else
               {
                       (*currentConnPtr)->dataSent +=result;

               if((*currentConnPtr)->dataSent >= (*currentConnPtr)->dataToSend)
                       (*currentConnPtr)->isReading = 1;
                               }

                       }

                       currentConnPtr = &((*currentConnPtr)->next);
                printf("Sending as Slave Thread id = '%d' \n",pthread_self());
               }
           /*
                Проверка принадлежности дескриптора listenSocket
                к множеству readFdSet,т.е. необходимости обработать
                вызов connect( )  от  нового клиента.
           */
       if(FD_ISSET(listenSocket,&readFdSet))
               {
                       
  while(1)
       {

        /*
            Вызовы pthread_mutex_lock, pthread_mutex_unlock
            Не нужны в среде Linux
         */
        pthread_mutex_lock(&request_mutex);
           int result = accept(listenSocket,(struct sockaddr*)NULL,NULL);
        pthread_mutex_unlock(&request_mutex);              
       if(result < 0)
       {

               if(errno==EAGAIN || errno == EWOULDBLOCK)
                       break;
                       die("accept()",errno);
                       }
       else
       {
               *currentConnPtr = malloc(sizeof(struct connection_cb));
               if(*currentConnPtr==NULL)
               die("malloc()",0);
       
               if(fcntl(result,F_SETFL,O_NONBLOCK)<0)
                       die("fcntl()",errno);

               (*currentConnPtr)->dataSocket = result;
                    (*currentConnPtr)->isReading = 1;
               (*currentConnPtr)->next = 0;
               currentConnPtr = &((*currentConnPtr)->next);
       printf("Accepting  as Master Thread id = '%d' \n",pthread_self());
                               }
                       }       
               }

       }
}
int
main(int argc,char *argv[])
{
int k;
int descSock;
char *service="1500";
switch(argc) {
case 1:
 break;
case 2:
 service = argv[1];
 break;
default:
 printf ("Usage: ./ServerBNTH [port]\n");
 exit(1);
}
                                                                                
size_t stacksize;
pthread_t p_thread[NUM_THREADS];

/*
 Установка размера стека для ведомых потоков
*/

pthread_attr_t attr;
   pthread_attr_init(&attr);
   stacksize = 500000;
   pthread_attr_setstacksize (&attr, stacksize);
   pthread_attr_getstacksize (&attr, &stacksize);

/*
  Получение значения дескриптора пассивного сокета
*/ 

   descSock  = getServerSocket(atoi(service));

/* 
 Запуск ведомых потоков
*/

for(k=0; k<NUM_THREADS; k++) {

  pthread_create(&p_thread[k],&attr,serv_request,(void*)descSock);
  printf("Thread %d started \n",k);
}
                                                                               
pthread_attr_destroy(&attr);

for(k=0;k<NUM_THREADS;k++) {
  pthread_join(p_thread[k], NULL);
  printf("Completed join with thread %d\n",k);
 }

}

Ниже приведен модифицированный код square_svc.c (для шаблона square.x), позволяющий скомпилировать многопотоковый сервер RPC в среде Red Hat Linux 9.0 Шаблон square.x: struct square_in { long arg1; }; struct square_out { long res1; }; program SQUARE_PROG { version SQUARE_VERS { square_out SQUAREPROC(square_in) = 1; } = 2 ; } = 0x31230000; Вызов rpcgen для генерации заглушек клиента,сервера и xdr файла : $ rpcgen -a -M square.x
Код процедур сервера: /* ServerSideProc.c */ #include "square.h" #include <stdio.h> #include <stdlib.h> #include <rpc/pmap_clnt.h> #include <string.h> #include <memory.h> #include <sys/socket.h> #include <netinet/in.h> int request=0; bool_t squareproc_2_svc(square_in *inp,square_out *outp,struct svc_req *rqstp) { printf("Thread id = '%ld' started, arg = %d\n",pthread_self(),inp->arg1); /* Имитация работы процедуры , выполняемой потоками сервера */ sleep(5); outp->res1=inp->arg1*inp->arg1; printf("Thread id = '%ld' is done %d \n",pthread_self(),outp->res1); return(TRUE); } int square_prog_2_freeresult(SVCXPRT *transp,xdrproc_t xdr_result, caddr_t result) { xdr_free(xdr_result,result); return(1); }
Модифицированный файл square_svc.c: /* square_svc.c * Please do not edit this file. * It was generated using rpcgen. */ #include "square.h" #include <stdio.h> #include <stdlib.h> #include <rpc/pmap_clnt.h> #include <string.h> #include <memory.h> #include <sys/socket.h> #include <netinet/in.h> #ifndef SIG_PF #define SIG_PF void(*)(int) #endif pthread_t p_thread; pthread_attr_t attr; /* Процедура выполняемая потоком */ void * serv_request(void *data) { struct thr_data { struct svc_req *rqstp; SVCXPRT *transp; } *ptr_data; { union { square_in squareproc_2_arg; } argument; union { square_out squareproc_2_res; } result; bool_t retval; xdrproc_t _xdr_argument, _xdr_result; bool_t (*local)(char *, void *, struct svc_req *); /* Распаковка данных , переданных в процедуру при запуске потока. */ ptr_data = (struct thr_data *)data; struct svc_req *rqstp = ptr_data->rqstp; register SVCXPRT *transp = ptr_data->transp; switch (rqstp->rq_proc) { case NULLPROC: (void) svc_sendreply (transp, (xdrproc_t) xdr_void, (char *)NULL); return; case SQUAREPROC: _xdr_argument = (xdrproc_t) xdr_square_in; _xdr_result = (xdrproc_t) xdr_square_out; local = (bool_t (*) (char *, void *, struct svc_req *))squareproc_2_svc; break; default: svcerr_noproc (transp); return; } memset ((char *)&argument, 0, sizeof (argument)); if (!svc_getargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) { svcerr_decode (transp); return; } /* Стандартный вызов функции сервера. Данные для вызова уже приведены к стандарту. */ retval = (bool_t) (*local)((char *)&argument, (void *)&result, rqstp); if (retval > 0 && !svc_sendreply(transp, (xdrproc_t) _xdr_result, (char *)&result)) { svcerr_systemerr (transp); } if (!svc_freeargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) { fprintf (stderr, "%s", "unable to free arguments"); exit (1); } if (!square_prog_2_freeresult (transp, _xdr_result, (caddr_t) &result)) fprintf (stderr, "%s", "unable to free results"); return; } } /* Принципиально измененный код square_prog_2 , стартующей теперь новый поток для каждого инициированного клиентом вызова процедуры на удаленном сервере */ static void square_prog_2(struct svc_req *rqstp, register SVCXPRT *transp) { struct data_str { struct svc_req *rqstp; SVCXPRT *transp; } *data_ptr =(struct data_str*)malloc(sizeof(struct data_str); { /* Упаковка данных в структуру для передачи ссылки на нее, как параметра запускаемому потоку */ data_ptr->rqstp = rqstp; data_ptr->transp = transp; pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED); pthread_create(&p_thread,&attr,serv_request,(void *)data_ptr); } } int main (int argc, char **argv) { register SVCXPRT *transp; pmap_unset (SQUARE_PROG, SQUARE_VERS); transp = svcudp_create(RPC_ANYSOCK); if (transp == NULL) { fprintf (stderr, "%s", "cannot create udp service."); exit(1); } if (!svc_register(transp, SQUARE_PROG, SQUARE_VERS, square_prog_2, IPPROTO_UDP)) { fprintf (stderr, "%s", "unable to register (SQUARE_PROG, SQUARE_VERS, udp)."); exit(1); } transp = svctcp_create(RPC_ANYSOCK, 0, 0); if (transp == NULL) { fprintf (stderr, "%s", "cannot create tcp service."); exit(1); } if (!svc_register(transp, SQUARE_PROG, SQUARE_VERS, square_prog_2, IPPROTO_TCP)) { fprintf (stderr, "%s", "unable to register (SQUARE_PROG, SQUARE_VERS, tcp)."); exit(1); } svc_run (); fprintf (stderr, "%s", "svc_run returned"); exit (1); /* NOTREACHED */ } Компиляция ServerSQUARE: $ gcc -o ServerSQUARE ServerSideProc.c square_svc.c square_xdr.c -lprthread -lnsl
Код клиента: /* * ClientSideProc.c */ #include <memory.h> /* for memset */ #include "square.h" #include <stdio.h> #include <stdlib.h> #include <rpc/pmap_clnt.h> #include <string.h> #include <memory.h> #include <sys/socket.h> #include <netinet/in.h> int main (int argc,char **argv) { CLIENT *cl; square_in in; square_out out; if (argc != 3 ) { printf ("Usage : client <hostname> <integer_valus=e>\n"); exit(1); } cl = clnt_create(argv[1],SQUARE_PROG,SQUARE_VERS,"tcp"); if (cl == NULL) { clnt_perror (cl, "call failed"); exit (1); } in.arg1 = atol(argv[2]); if (squareproc_2(&in,&out,cl) != RPC_SUCCESS) { printf ("%s\n" , clnt_perror (cl,argv[1] )); exit(1); } printf("result: %ld\n",out.res1); exit(0); } Компиляция ClientSQUARE: $ gcc -o ClientSQUARE ClientSideProc.c square_clnt.c square_xdr.c -lprthread -lnsl
Далее приведем результаты тестирования (сp. [3] ,Глава "SUN RPC"): [root@dell4500 SQWMT]# cat square.bsh ./ClientSQUARE dell4500.redhat 10 & ./ClientSQUARE dell4500.redhat 11 & \ ./ClientSQUARE dell4500.redhat 12 & ./ClientSQUARE dell4500.redhat 21 & \ ./ClientSQUARE dell4500.redhat 13 & ./ClientSQUARE dell4500.redhat 14 & \ ./ClientSQUARE dell4500.redhat 15 & ./ClientSQUARE dell4500.redhat 16 & \ ./ClientSQUARE dell4500.redhat 17 & ./ClientSQUARE dell4500.redhat 18 & \ ./ClientSQUARE dell4500.redhat 19 & ./ClientSQUARE dell4500.redhat 20 & Вывод на машине клиента: [root@dell4500 SQWMT]# ./square.bsh [root@dell4500 SQWMT]# result: 196 result: 225 result: 256 result: 289 result: 121 result: 144 result: 441 result: 169 result: 100 result: 324 result: 361 result: 400 Вывод на машине сервера: [root@dell4500 SQWMT]# ./ServerSQUARE Thread id = '1082453184' started, arg = 14 Thread id = '1090841664' started, arg = 15 Thread id = '1099230144' started, arg = 16 Thread id = '1116941120' started, arg = 17 Thread id = '1125329600' started, arg = 11 Thread id = '1133718080' started, arg = 12 Thread id = '1142106560' started, arg = 21 Thread id = '1150495040' started, arg = 13 Thread id = '1158883520' started, arg = 10 Thread id = '1167272000' started, arg = 18 Thread id = '1175660480' started, arg = 19 Thread id = '1184048960' started, arg = 20 Thread id = '1082453184' is done 196 Thread id = '1090841664' is done 225 Thread id = '1099230144' is done 256 Thread id = '1116941120' is done 289 Thread id = '1125329600' is done 121 Thread id = '1133718080' is done 144 Thread id = '1142106560' is done 441 Thread id = '1150495040' is done 169 Thread id = '1158883520' is done 100 Thread id = '1167272000' is done 324 Thread id = '1175660480' is done 361 Thread id = '1184048960' is done 400 Литература:

1.Дуглас Э. Крамер,Дэвид Л. Стивенс. Сети TCP/IP .Разработка приложений Типа клиент/сервер для LINUX/POSIX . Том 3 Издательский дом "Вильямс",2002
2. http://dlenev.nm.ru/
3.Стивенс У. UNIX: Взаимодействие процессов.Том 1,2 Из-во "Питер",2002

Автор: Б.А. Державец