I'd like to develop a multithreaded UDP server in C/Linux. The service is running on a single port x, thus there's only the possibility to bind a single UDP socket to it. In order to work under high loads, I have n threads (statically defined), say 1 thread per CPU. Work could be delivered to the thread using epoll_wait, so threads get woken up on demand with 'EPOLLET | EPOLLONESHOT'. I've attached a code example:
static int epfd;
static sig_atomic_t sigint = 0;
...
/* Thread routine with epoll_wait */
static void *process_clients(void *pevents)
{
int rc, i, sock, nfds;
struct epoll_event ep, *events = (struct epoll_event *) pevents;
while (!sigint) {
nfds = epoll_wait(epfd, events, MAX_EVENT_NUM, 500);
for (i = 0; i < nfds; ++i) {
if (events[i].data.fd < 0)
continue;
sock = events[i].data.fd;
if((events[i].events & EPOLLIN) == EPOLLIN) {
printf("Event dispatch!\n");
handle_request(sock); // do a recvfrom
} else
whine("Unknown poll event!\n");
memset(&ep, 0, sizeof(ep));
ep.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
ep.data.fd = sock;
rc = epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &ep);
if(rc < 0)
error_and_die(EXIT_FAILURE, "Cannot add socket to epoll!\n");
}
}
pthread_exit(NULL);
}
int main(int argc, char **argv)
{
int rc, i, cpu, sock, opts;
struct sockaddr_in sin;
struct epoll_event ep, *events;
char *local_addr = "192.168.1.108";
void *status;
pthread_t *threads = NULL;
cpu_set_t cpuset;
threads = xzmalloc(sizeof(*threads) * MAX_THRD_NUM);
events = xzmalloc(sizeof(*events) * MAX_EVENT_NUM);
sock = socket(PF_INET, SOCK_DGRAM, 0);
if (sock < 0)
error_and_die(EXIT_FAILURE, "Cannot create socket!\n");
/* Non-blocking */
opts = fcntl(sock, F_GETFL);
if(opts < 0)
error_and_die(EXIT_FAILURE, "Cannot fetch sock opts!\n");
opts |= O_NONBLOCK;
rc = fcntl(sock, F_SETFL, opts);
if(rc < 0)
error_and_die(EXIT_FAILURE, "Cannot set sock opts!\n");
/* Initial epoll setup */
epfd = epoll_create(MAX_EVENT_NUM);
if(epfd < 0)
error_and_die(EXIT_FAILURE, "Error fetching an epoll descriptor!\n");
memset(&ep, 0, sizeof(ep));
ep.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
ep.data.fd = sock;
rc = epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ep);
if(rc < 0)
error_and_die(EXIT_FAILURE, "Cannot add socket to epoll!\n");
/* Socket binding */
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = inet_addr(local_addr);
sin.sin_port = htons(port_xy);
rc = bind(sock, (struct sockaddr *) &sin, sizeof(sin));
if (rc < 0)
error_and_die(EXIT_FAILURE, "Problem binding to port! "
"Already in use?\n");
register_signal(SIGINT, &signal_handler);
/* Thread initialization */
for (i = 0, cpu = 0; i < MAX_THRD_NUM; ++i) {
rc = pthread_create(&threads[i], NULL, process_clients, events);
if (rc != 0)
error_and_die(EXIT_FAILURE, "Cannot create pthread!\n");
CPU_ZERO(&cpuset);
CPU_SET(cpu, &cpuset);
rc = pthread_setaffinity_np(threads[i], sizeof(cpuset), &cpuset);
if (rc != 0)
error_and_die(EXIT_FAILURE, "Cannot create pthread!\n");
cpu = (cpu + 1) % NR_CPUS_ON;
}
printf("up and running!\n");
/* Thread joining */
for (i = 0; i < MAX_THRD_NUM; ++i) {
rc = pthread_join(threads[i], &status);
if (rc != 0)
error_and_die(EXIT_FAILURE, "Error on thread exit!\n");
}
close(sock);
xfree(threads);
xfree(events);
printf("shut down!\n");
return 0;
}
Is this the proper way of handling this scenario with epoll? Should the function _handle_request_ return as fast as possible, because for this time the eventqueue for the socket is blocked?!
Thanks for replies!
As you are only using a single UDP socket, there is no point using epoll - just use a blocking recvfrom instead.
Now, depending on the protocol you need to handle - if you can process each UDP packet individually - you can actually call recvfrom concurrently from multiple threads (in a thread pool). The OS will take care that exactly one thread will receive the UDP packet. This thread can then do whatever it needs to do in handle_request.
However, if you need to process the UDP packets in a particular order, you'll probably not have that many opportunities to parallalise your program...