Don't read thread_ctx nb_available_sockets and nb_cur_sockets as they may be changed. Use temporary variables. Use microseconds timeout instead of seconds

This commit is contained in:
Grégory Soutadé 2016-02-04 20:39:50 +01:00
parent a10ce76381
commit d110f892b4

View File

@ -180,7 +180,7 @@ static inline void close_socket(socket_ctx_t* socket)
static void* thread_loop(void* param) static void* thread_loop(void* param)
{ {
thread_ctx_t* ctx = (thread_ctx_t*)param; thread_ctx_t* ctx = (thread_ctx_t*)param;
int i, ret, nfds; int i, ret, nfds, nb_cur_sockets, nb_available_sockets;
fd_set read_set, exc_set; fd_set read_set, exc_set;
struct timeval timeout; struct timeval timeout;
@ -190,21 +190,29 @@ static void* thread_loop(void* param)
FD_ZERO(&exc_set); FD_ZERO(&exc_set);
nfds = 0; nfds = 0;
for(i=0; i<ctx->nb_cur_sockets; i++) pthread_mutex_lock(&ctx->mutex);
nb_cur_sockets = ctx->nb_cur_sockets;
nb_available_sockets = ctx->nb_available_sockets;
pthread_mutex_unlock(&ctx->mutex);
for(i=0; i<nb_cur_sockets; i++)
{ {
if (ctx->sockets[i].timeout > 0) if (ctx->sockets[i].timeout > 0)
{ {
FD_SET(ctx->sockets[i].socket, &read_set); FD_SET(ctx->sockets[i].socket, &read_set);
FD_SET(ctx->sockets[i].socket, &exc_set); FD_SET(ctx->sockets[i].socket, &exc_set);
if (ctx->sockets[i].socket+1 > nfds) if (ctx->sockets[i].socket > nfds)
nfds = ctx->sockets[i].socket+1; nfds = ctx->sockets[i].socket;
} }
} }
if (!nfds) if (!nfds)
{ {
// No more active socket for this thread /*
if (!ctx->nb_available_sockets) No more active socket for this thread
nor available slots
*/
if (!nb_available_sockets)
break; break;
usleep(100); usleep(100);
@ -214,15 +222,14 @@ static void* thread_loop(void* param)
timeout.tv_sec = ctx->max_timeout; timeout.tv_sec = ctx->max_timeout;
timeout.tv_usec = 0; timeout.tv_usec = 0;
ret = select(nfds, &read_set, NULL, &exc_set, &timeout); ret = select(nfds+1, &read_set, NULL, &exc_set, &timeout);
pthread_mutex_lock(&ctx->mutex);
// Timeout, remove all current sockets // Timeout, remove all current sockets
if (ret == 0) if (ret == 0)
{ {
if (ctx->quiet < 0) if (ctx->quiet < 0)
syslog(LOG_DEBUG, "Timeout"); syslog(LOG_DEBUG, "Timeout");
for(i=0; i<ctx->nb_cur_sockets; i++) for(i=0; i<nb_cur_sockets; i++)
{ {
if (ctx->sockets[i].timeout > 0) if (ctx->sockets[i].timeout > 0)
close_socket(&ctx->sockets[i]); close_socket(&ctx->sockets[i]);
@ -235,16 +242,15 @@ static void* thread_loop(void* param)
} }
else else
{ {
for(i=0; i<ctx->nb_cur_sockets; i++) for(i=0; i<nb_cur_sockets; i++)
{ {
if (ctx->sockets[i].timeout < 0) continue; if (ctx->sockets[i].timeout < 0) continue;
if (FD_ISSET(ctx->sockets[i].socket, &exc_set)) if (FD_ISSET(ctx->sockets[i].socket, &exc_set))
{ {
close_socket(&ctx->sockets[i]); close_socket(&ctx->sockets[i]);
continue;
} }
// Someone is speaking // Someone is speaking
if (FD_ISSET(ctx->sockets[i].socket, &read_set)) else if (FD_ISSET(ctx->sockets[i].socket, &read_set))
{ {
ctx->sockets[i].timeout = ctx->max_timeout; ctx->sockets[i].timeout = ctx->max_timeout;
ret = handle_request(ctx, ctx->sockets[i].socket); ret = handle_request(ctx, ctx->sockets[i].socket);
@ -256,7 +262,7 @@ static void* thread_loop(void* param)
close_socket(&ctx->sockets[i]); close_socket(&ctx->sockets[i]);
} }
// No more requests accepted // No more requests accepted
if (!ctx->sockets[i].nb_remaining_requests--) else if (!ctx->sockets[i].nb_remaining_requests--)
{ {
if (ctx->quiet < 0) if (ctx->quiet < 0)
syslog(LOG_DEBUG, "Max requests reached for socket %d", syslog(LOG_DEBUG, "Max requests reached for socket %d",
@ -267,11 +273,10 @@ static void* thread_loop(void* param)
} }
else else
{ {
ctx->sockets[i].timeout -= timeout.tv_sec; ctx->sockets[i].timeout -= timeout.tv_sec*1000000 + timeout.tv_usec;
} }
} }
} }
pthread_mutex_unlock(&ctx->mutex);
}; };
delete_thread(ctx); delete_thread(ctx);
@ -291,7 +296,7 @@ static inline thread_ctx_t* create_thread_ctx(struct gengetopt_args_info* params
thread_ctx->sockets = malloc(sizeof(*thread_ctx->sockets)*params->sockets_per_thread_arg); thread_ctx->sockets = malloc(sizeof(*thread_ctx->sockets)*params->sockets_per_thread_arg);
thread_ctx->nb_cur_sockets = 0; thread_ctx->nb_cur_sockets = 0;
thread_ctx->nb_available_sockets = params->sockets_per_thread_arg; thread_ctx->nb_available_sockets = params->sockets_per_thread_arg;
thread_ctx->max_timeout = params->sockets_timeout_arg; thread_ctx->max_timeout = params->sockets_timeout_arg*1000000;
thread_ctx->stop = 0; thread_ctx->stop = 0;
thread_ctx->quiet = params->quiet_flag; thread_ctx->quiet = params->quiet_flag;
if (params->verbose_flag) if (params->verbose_flag)