Статьи :Операционные системы :Железо :
OS FAQ :
Кодинг :
Сейчас на сайте :0 пользователей, 8 гостей : |
Статьи » Операционные системы » Linux » РЕАЛИЗАЦИЯ МНОГОПОТОКОВОГО "АСИНХРОННОГО СЕРВЕРА TCP" И RPC ДЛЯ ОС LINUX
В заметке приводится код многопотокового эхо-сервера , основанного на использовании неблокирующего ввода вывода и конечных автоматов. Каждый поток сервера использует вызов select( ) для того, чтобы определить по какому из соединений можно производить обме
Автор: Разместил: Amro Дата: 2006-06-05 01:12 Комментарии: ![]() ![]() РЕАЛИЗАЦИЯ МНОГОПОТОКОВОГО "АСИНХРОННОГО СЕРВЕРА TCP" И RPC ДЛЯ ОС LINUXВ заметке приводится код многопотокового эхо-сервера , основанного на использовании неблокирующего ввода вывода и конечных автоматов. Каждый поток сервера использует вызов select( ) для того, чтобы определить по какому из соединений можно производить обмен в данный момент времени. Код для процедуры serv_request , выполняемой ведомыми потоками может быть взят из различных источников ( см. [1],[2],[3]). Параметр , передаваемый в процедуру serv_request , явлется дескриптором пассивного сокета , создаваемый ведущим потоком с помощью вызова процедуры getServerSocket( ).Необходимо отметить, что приведенное решение основано на свойстве Linux эффективно распараллеливать вызов accept( ). В противном случае возникает необходимость в блокировке мьютекса перед вызовом accept( ), что влечет за собой последовательное выполнение потоками сервера критической части кода, т.е. вызова accept( ). В среде же Red Hat Linux 9 (например) эта предосторожность не нужна и только снижает производительность. Детальное описание Posix Threads API на русском языке может быть найдено в [2]. Приводится также модифицированный код заглушки sample_svc.c (не за- висящий от шаблона sample.x) , позволяюший скомпилировать многопото- ковый сервер RPC в среде LINUX. /* * ServerNBTHR.c */ #include <sys/socket.h> #include <sys/types.h> #include <sys/time.h> #include <netinet/in.h> #include <errno.h> #include <string.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <fcntl.h> #include <pthread.h> #define NUM_THREADS 512 pthread_mutex_t request_mutex = PTHREAD_MUTEX_INITIALIZER; void die(const char *func, int err) { fprintf(stderr,"%s: %s\n",func, strerror(err)); abort(); } void bark(const char *func, int err) { fprintf(stderr,"%s: %s\n",func, strerror(err)); } /* Описание поцедуры ведущего потока , которая возвращает дескрипторов пассивного сокета, привязанного к адресу сервера. */ int getServerSocket(unsigned short int port) { int listenSocket; struct sockaddr_in listenSockaddr; if((listenSocket=socket(PF_INET,SOCK_STREAM,0))<0) die("socket()",errno); memset(&listenSockaddr, 0, sizeof(listenSockaddr)); listenSockaddr.sin_family = PF_INET; listenSockaddr.sin_port = htons(port); listenSockaddr.sin_addr.s_addr = INADDR_ANY; if(bind(listenSocket,(struct sockaddr*)&listenSockaddr, sizeof(listenSockaddr)) < 0) die("bind()",errno); if(listen(listenSocket,5)<0) die("listen()",errno); return listenSocket; } /* Описание процедуры выполняемой всеми ведомыми потоками */ void * serv_request(void *data) { struct connection_cb { int dataSocket; char data[256]; int dataSent; int dataToSend; int isReading; struct connection_cb *next; }; struct connection_cb *connections = NULL; int listenSocket = (int)data; if(fcntl(listenSocket,F_SETFL,O_NONBLOCK)<0) die("fcntl()",errno); while(1) { fd_set readFdSet; fd_set writeFdSet; struct connection_cb *currentConn, **currentConnPtr, *tempConn; int maxFdNum; FD_ZERO(&readFdSet); FD_ZERO(&writeFdSet); /* Добавление дескриптора к множеству readFdSet */ FD_SET(listenSocket,&readFdSet); maxFdNum = listenSocket; for(currentConn = connections;currentConn!=NULL;currentConn = currentConn->next) { if(currentConn->isReading) FD_SET(currentConn->dataSocket,&readFdSet); else FD_SET(currentConn->dataSocket,&writeFdSet); maxFdNum = currentConn->dataSocket > maxFdNum ?currentConn- >dataSocket : maxFdNum; } /* Получение множества дескрипторов сокетов для обработки */ if(select(maxFdNum+1,&readFdSet,&writeFdSet,NULL,NULL) < 0) { if(errno == EINTR) continue; die("select()",errno); } currentConnPtr=&connections; while(*currentConnPtr!=NULL) { /* Проверка принадлежности дескриптора (*currentConnPtr)->dataSocket к множеству readFdSet */ if((*currentConnPtr)->isReading && FD_ISSET((*currentConnPtr)->dataSocket,&readFdSet)) { int result = recv((*currentConnPtr)->dataSocket, (*currentConnPtr)->data, sizeof((*currentConnPtr)->data),0); if(result < 0) { if(errno!=EINTR && errno!=EAGAIN && errno!=EWOULDBLOCK) { bark("recv()",errno); close((*currentConnPtr)->dataSocket); tempConn = *currentConnPtr; *currentConnPtr = (*currentConnPtr)->next; free(tempConn); continue; } } else if(result==0) { close((*currentConnPtr)->dataSocket); tempConn = *currentConnPtr; *currentConnPtr = (*currentConnPtr)->next; free(tempConn); continue; } else { (*currentConnPtr)->dataToSend = result; (*currentConnPtr)->dataSent = 0; (*currentConnPtr)->isReading = 0; printf("Recieving as Slave Thread id = '%d' \n",pthread_self()); } } else /* Проверка принадлежности дескриптора (*currentConnPtr)->dataSocket к множеству writedFdSet */ if(FD_ISSET((*currentConnPtr)->dataSocket,&writeFdSet)) { int result = send((*currentConnPtr)->dataSocket, (*currentConnPtr)->data+(*currentConnPtr)->dataSent, (*currentConnPtr) ->dataToSend-(*currentConnPtr)->dataSent, 0); if(result < 0) { if(errno!=EINTR && errno!=EAGAIN) { bark("write()",errno); close((*currentConnPtr)->dataSocket); tempConn = *currentConnPtr; *currentConnPtr = (*currentConnPtr)->next; free(tempConn); continue; } } else { (*currentConnPtr)->dataSent +=result; if((*currentConnPtr)->dataSent >= (*currentConnPtr)->dataToSend) (*currentConnPtr)->isReading = 1; } } currentConnPtr = &((*currentConnPtr)->next); printf("Sending as Slave Thread id = '%d' \n",pthread_self()); } /* Проверка принадлежности дескриптора listenSocket к множеству readFdSet,т.е. необходимости обработать вызов connect( ) от нового клиента. */ if(FD_ISSET(listenSocket,&readFdSet)) { while(1) { /* Вызовы pthread_mutex_lock, pthread_mutex_unlock Не нужны в среде Linux */ pthread_mutex_lock(&request_mutex); int result = accept(listenSocket,(struct sockaddr*)NULL,NULL); pthread_mutex_unlock(&request_mutex); if(result < 0) { if(errno==EAGAIN || errno == EWOULDBLOCK) break; die("accept()",errno); } else { *currentConnPtr = malloc(sizeof(struct connection_cb)); if(*currentConnPtr==NULL) die("malloc()",0); if(fcntl(result,F_SETFL,O_NONBLOCK)<0) die("fcntl()",errno); (*currentConnPtr)->dataSocket = result; (*currentConnPtr)->isReading = 1; (*currentConnPtr)->next = 0; currentConnPtr = &((*currentConnPtr)->next); printf("Accepting as Master Thread id = '%d' \n",pthread_self()); } } } } } int main(int argc,char *argv[]) { int k; int descSock; char *service="1500"; switch(argc) { case 1: break; case 2: service = argv[1]; break; default: printf ("Usage: ./ServerBNTH [port]\n"); exit(1); } size_t stacksize; pthread_t p_thread[NUM_THREADS]; /* Установка размера стека для ведомых потоков */ pthread_attr_t attr; pthread_attr_init(&attr); stacksize = 500000; pthread_attr_setstacksize (&attr, stacksize); pthread_attr_getstacksize (&attr, &stacksize); /* Получение значения дескриптора пассивного сокета */ descSock = getServerSocket(atoi(service)); /* Запуск ведомых потоков */ for(k=0; k<NUM_THREADS; k++) { pthread_create(&p_thread[k],&attr,serv_request,(void*)descSock); printf("Thread %d started \n",k); } pthread_attr_destroy(&attr); for(k=0;k<NUM_THREADS;k++) { pthread_join(p_thread[k], NULL); printf("Completed join with thread %d\n",k); } } Ниже приведен модифицированный код square_svc.c (для шаблона square.x), позволяющий скомпилировать многопотоковый сервер RPC в среде Red Hat Linux 9.0 Шаблон square.x: struct square_in { long arg1; }; struct square_out { long res1; }; program SQUARE_PROG { version SQUARE_VERS { square_out SQUAREPROC(square_in) = 1; } = 2 ; } = 0x31230000; Вызов rpcgen для генерации заглушек клиента,сервера и xdr файла : $ rpcgen -a -M square.x Код процедур сервера: /* ServerSideProc.c */ #include "square.h" #include <stdio.h> #include <stdlib.h> #include <rpc/pmap_clnt.h> #include <string.h> #include <memory.h> #include <sys/socket.h> #include <netinet/in.h> int request=0; bool_t squareproc_2_svc(square_in *inp,square_out *outp,struct svc_req *rqstp) { printf("Thread id = '%ld' started, arg = %d\n",pthread_self(),inp->arg1); /* Имитация работы процедуры , выполняемой потоками сервера */ sleep(5); outp->res1=inp->arg1*inp->arg1; printf("Thread id = '%ld' is done %d \n",pthread_self(),outp->res1); return(TRUE); } int square_prog_2_freeresult(SVCXPRT *transp,xdrproc_t xdr_result, caddr_t result) { xdr_free(xdr_result,result); return(1); } Модифицированный файл square_svc.c: /* square_svc.c * Please do not edit this file. * It was generated using rpcgen. */ #include "square.h" #include <stdio.h> #include <stdlib.h> #include <rpc/pmap_clnt.h> #include <string.h> #include <memory.h> #include <sys/socket.h> #include <netinet/in.h> #ifndef SIG_PF #define SIG_PF void(*)(int) #endif pthread_t p_thread; pthread_attr_t attr; /* Процедура выполняемая потоком */ void * serv_request(void *data) { struct thr_data { struct svc_req *rqstp; SVCXPRT *transp; } *ptr_data; { union { square_in squareproc_2_arg; } argument; union { square_out squareproc_2_res; } result; bool_t retval; xdrproc_t _xdr_argument, _xdr_result; bool_t (*local)(char *, void *, struct svc_req *); /* Распаковка данных , переданных в процедуру при запуске потока. */ ptr_data = (struct thr_data *)data; struct svc_req *rqstp = ptr_data->rqstp; register SVCXPRT *transp = ptr_data->transp; switch (rqstp->rq_proc) { case NULLPROC: (void) svc_sendreply (transp, (xdrproc_t) xdr_void, (char *)NULL); return; case SQUAREPROC: _xdr_argument = (xdrproc_t) xdr_square_in; _xdr_result = (xdrproc_t) xdr_square_out; local = (bool_t (*) (char *, void *, struct svc_req *))squareproc_2_svc; break; default: svcerr_noproc (transp); return; } memset ((char *)&argument, 0, sizeof (argument)); if (!svc_getargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) { svcerr_decode (transp); return; } /* Стандартный вызов функции сервера. Данные для вызова уже приведены к стандарту. */ retval = (bool_t) (*local)((char *)&argument, (void *)&result, rqstp); if (retval > 0 && !svc_sendreply(transp, (xdrproc_t) _xdr_result, (char *)&result)) { svcerr_systemerr (transp); } if (!svc_freeargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) { fprintf (stderr, "%s", "unable to free arguments"); exit (1); } if (!square_prog_2_freeresult (transp, _xdr_result, (caddr_t) &result)) fprintf (stderr, "%s", "unable to free results"); return; } } /* Принципиально измененный код square_prog_2 , стартующей теперь новый поток для каждого инициированного клиентом вызова процедуры на удаленном сервере */ static void square_prog_2(struct svc_req *rqstp, register SVCXPRT *transp) { struct data_str { struct svc_req *rqstp; SVCXPRT *transp; } *data_ptr =(struct data_str*)malloc(sizeof(struct data_str); { /* Упаковка данных в структуру для передачи ссылки на нее, как параметра запускаемому потоку */ data_ptr->rqstp = rqstp; data_ptr->transp = transp; pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED); pthread_create(&p_thread,&attr,serv_request,(void *)data_ptr); } } int main (int argc, char **argv) { register SVCXPRT *transp; pmap_unset (SQUARE_PROG, SQUARE_VERS); transp = svcudp_create(RPC_ANYSOCK); if (transp == NULL) { fprintf (stderr, "%s", "cannot create udp service."); exit(1); } if (!svc_register(transp, SQUARE_PROG, SQUARE_VERS, square_prog_2, IPPROTO_UDP)) { fprintf (stderr, "%s", "unable to register (SQUARE_PROG, SQUARE_VERS, udp)."); exit(1); } transp = svctcp_create(RPC_ANYSOCK, 0, 0); if (transp == NULL) { fprintf (stderr, "%s", "cannot create tcp service."); exit(1); } if (!svc_register(transp, SQUARE_PROG, SQUARE_VERS, square_prog_2, IPPROTO_TCP)) { fprintf (stderr, "%s", "unable to register (SQUARE_PROG, SQUARE_VERS, tcp)."); exit(1); } svc_run (); fprintf (stderr, "%s", "svc_run returned"); exit (1); /* NOTREACHED */ } Компиляция ServerSQUARE: $ gcc -o ServerSQUARE ServerSideProc.c square_svc.c square_xdr.c -lprthread -lnsl Код клиента: /* * ClientSideProc.c */ #include <memory.h> /* for memset */ #include "square.h" #include <stdio.h> #include <stdlib.h> #include <rpc/pmap_clnt.h> #include <string.h> #include <memory.h> #include <sys/socket.h> #include <netinet/in.h> int main (int argc,char **argv) { CLIENT *cl; square_in in; square_out out; if (argc != 3 ) { printf ("Usage : client <hostname> <integer_valus=e>\n"); exit(1); } cl = clnt_create(argv[1],SQUARE_PROG,SQUARE_VERS,"tcp"); if (cl == NULL) { clnt_perror (cl, "call failed"); exit (1); } in.arg1 = atol(argv[2]); if (squareproc_2(&in,&out,cl) != RPC_SUCCESS) { printf ("%s\n" , clnt_perror (cl,argv[1] )); exit(1); } printf("result: %ld\n",out.res1); exit(0); } Компиляция ClientSQUARE: $ gcc -o ClientSQUARE ClientSideProc.c square_clnt.c square_xdr.c -lprthread -lnsl Далее приведем результаты тестирования (сp. [3] ,Глава "SUN RPC"): [root@dell4500 SQWMT]# cat square.bsh ./ClientSQUARE dell4500.redhat 10 & ./ClientSQUARE dell4500.redhat 11 & \ ./ClientSQUARE dell4500.redhat 12 & ./ClientSQUARE dell4500.redhat 21 & \ ./ClientSQUARE dell4500.redhat 13 & ./ClientSQUARE dell4500.redhat 14 & \ ./ClientSQUARE dell4500.redhat 15 & ./ClientSQUARE dell4500.redhat 16 & \ ./ClientSQUARE dell4500.redhat 17 & ./ClientSQUARE dell4500.redhat 18 & \ ./ClientSQUARE dell4500.redhat 19 & ./ClientSQUARE dell4500.redhat 20 & Вывод на машине клиента: [root@dell4500 SQWMT]# ./square.bsh [root@dell4500 SQWMT]# result: 196 result: 225 result: 256 result: 289 result: 121 result: 144 result: 441 result: 169 result: 100 result: 324 result: 361 result: 400 Вывод на машине сервера: [root@dell4500 SQWMT]# ./ServerSQUARE Thread id = '1082453184' started, arg = 14 Thread id = '1090841664' started, arg = 15 Thread id = '1099230144' started, arg = 16 Thread id = '1116941120' started, arg = 17 Thread id = '1125329600' started, arg = 11 Thread id = '1133718080' started, arg = 12 Thread id = '1142106560' started, arg = 21 Thread id = '1150495040' started, arg = 13 Thread id = '1158883520' started, arg = 10 Thread id = '1167272000' started, arg = 18 Thread id = '1175660480' started, arg = 19 Thread id = '1184048960' started, arg = 20 Thread id = '1082453184' is done 196 Thread id = '1090841664' is done 225 Thread id = '1099230144' is done 256 Thread id = '1116941120' is done 289 Thread id = '1125329600' is done 121 Thread id = '1133718080' is done 144 Thread id = '1142106560' is done 441 Thread id = '1150495040' is done 169 Thread id = '1158883520' is done 100 Thread id = '1167272000' is done 324 Thread id = '1175660480' is done 361 Thread id = '1184048960' is done 400 Литература:
1.Дуглас Э. Крамер,Дэвид Л. Стивенс. Сети TCP/IP .Разработка
приложений Типа клиент/сервер для LINUX/POSIX . Том 3
Издательский дом "Вильямс",2002 Автор: Б.А. Державец |