/ nmux.cpp
nmux.cpp
1 /* 2 This software is part of libcsdr, a set of simple DSP routines for 3 Software Defined Radio. 4 5 Copyright (c) 2014, Andras Retzler <randras@sdr.hu> 6 All rights reserved. 7 8 Redistribution and use in source and binary forms, with or without 9 modification, are permitted provided that the following conditions are met: 10 * Redistributions of source code must retain the above copyright 11 notice, this list of conditions and the following disclaimer. 12 * Redistributions in binary form must reproduce the above copyright 13 notice, this list of conditions and the following disclaimer in the 14 documentation and/or other materials provided with the distribution. 15 * Neither the name of the copyright holder nor the 16 names of its contributors may be used to endorse or promote products 17 derived from this software without specific prior written permission. 18 19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 20 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 22 DISCLAIMED. IN NO EVENT SHALL ANDRAS RETZLER BE LIABLE FOR ANY 23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 29 */ 30 31 #include "nmux.h" 32 33 char help_text[]="nmux is a TCP stream multiplexer. It reads data from the standard input, and sends it to each client connected through TCP sockets. Available command line options are:\n" 34 "\t--port (-p), --address (-a): TCP port and address to listen.\n" 35 "\t--bufsize (-b), --bufcnt (-n): Internal buffer size and count.\n" 36 "\t--help (-h): Show this message.\n"; 37 38 int host_port = 0; 39 char host_address[100] = "127.0.0.1"; 40 int thread_cntr = 0; 41 42 //CLI parameters 43 int bufsize = 1024; 44 int bufcnt = 1024; 45 46 char** global_argv; 47 int global_argc; 48 tsmpool* pool; 49 50 pthread_cond_t wait_condition; 51 pthread_mutex_t wait_mutex; 52 53 void sig_handler(int signo) 54 { 55 fprintf(stderr, MSG_START "signal %d caught, exiting...\n", signo); 56 fflush(stderr); 57 exit(0); 58 } 59 60 int main(int argc, char* argv[]) 61 { 62 global_argv = argv; 63 global_argc = argc; 64 int c; 65 int no_options = 1; 66 for(;;) 67 { 68 int option_index = 0; 69 static struct option long_options[] = { 70 {"port", required_argument, 0, 'p' }, 71 {"address", required_argument, 0, 'a' }, 72 {"bufsize", required_argument, 0, 'b' }, 73 {"bufcnt", required_argument, 0, 'n' }, 74 {"help", no_argument, 0, 'h' }, 75 {0, 0, 0, 0 } 76 }; 77 c = getopt_long(argc, argv, "p:a:b:n:h", long_options, &option_index); 78 if(c==-1) break; 79 no_options = 0; 80 switch (c) 81 { 82 case 'a': 83 host_address[100-1]=0; 84 strncpy(host_address,optarg,100-1); 85 break; 86 case 'p': 87 host_port=atoi(optarg); 88 break; 89 case 'b': 90 bufsize=atoi(optarg); 91 break; 92 case 'n': 93 bufcnt=atoi(optarg); 94 break; 95 case 'h': 96 print_exit(help_text); 97 break; 98 case 0: 99 case '?': 100 case ':': 101 default: 102 print_exit(MSG_START "error in getopt_long()\n"); 103 } 104 } 105 106 if(no_options) print_exit(help_text); 107 if(!host_port) print_exit(MSG_START "missing required command line argument, --port.\n"); 108 if(bufsize<=0) print_exit(MSG_START "invalid value for --bufsize (should be >0)\n"); 109 if(bufcnt<=0) print_exit(MSG_START "invalid value for --bufcnt (should be >0)\n"); 110 111 //set signals 112 struct sigaction sa; 113 memset(&sa, 0, sizeof(sa)); 114 sa.sa_handler = sig_handler; 115 sigaction(SIGTERM, &sa, NULL); 116 sigaction(SIGKILL, &sa, NULL); 117 sigaction(SIGQUIT, &sa, NULL); 118 sigaction(SIGINT, &sa, NULL); 119 sigaction(SIGHUP, &sa, NULL); 120 121 struct sockaddr_in addr_host; 122 int listen_socket; 123 std::vector<client_t*> clients; 124 clients.reserve(100); 125 listen_socket=socket(AF_INET,SOCK_STREAM,0); 126 127 int sockopt = 1; 128 if( setsockopt(listen_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&sockopt, sizeof(sockopt)) == -1 ) 129 error_exit(MSG_START "cannot set SO_REUSEADDR"); //the best description on SO_REUSEADDR ever: http://stackoverflow.com/a/14388707/3182453 130 131 memset(&addr_host,'0',sizeof(addr_host)); 132 addr_host.sin_family = AF_INET; 133 addr_host.sin_port = htons(host_port); 134 addr_host.sin_addr.s_addr = INADDR_ANY; 135 136 if( (addr_host.sin_addr.s_addr=inet_addr(host_address)) == INADDR_NONE ) 137 error_exit(MSG_START "invalid host address"); 138 139 if( bind(listen_socket, (struct sockaddr*) &addr_host, sizeof(addr_host)) < 0 ) 140 error_exit(MSG_START "cannot bind() address to the socket"); 141 142 if( listen(listen_socket, 10) == -1 ) 143 error_exit(MSG_START "cannot listen() on socket"); 144 145 fprintf(stderr, MSG_START "listening on %s:%d\n", inet_ntoa(addr_host.sin_addr), host_port); 146 147 struct sockaddr_in addr_cli; 148 socklen_t addr_cli_len = sizeof(addr_cli); 149 int new_socket; 150 151 int highfd = 0; 152 maxfd(&highfd, listen_socket); 153 maxfd(&highfd, STDIN_FILENO); 154 155 fd_set select_fds; 156 157 //Set stdin and listen_socket to non-blocking 158 if(set_nonblocking(STDIN_FILENO) || set_nonblocking(listen_socket)) 159 error_exit(MSG_START "cannot set_nonblocking()"); 160 161 //Create tsmpool 162 pool = new tsmpool(bufsize, bufcnt); 163 if(!pool->is_ok()) print_exit(MSG_START "tsmpool failed to initialize\n"); 164 165 unsigned char* current_write_buffer = (unsigned char*)pool->get_write_buffer(); 166 int index_in_current_write_buffer = 0; 167 168 //Create wait condition: client threads waiting for input data from the main thread will be 169 // waiting on this condition. They will be woken up with pthread_cond_broadcast() if new 170 // data arrives. 171 if(pthread_cond_init(&wait_condition, NULL)) 172 print_exit(MSG_START "pthread_cond_init failed"); //cond_attrs is ignored by Linux 173 174 if(pthread_mutex_init(&wait_mutex, NULL)) 175 print_exit(MSG_START "pthread_mutex_t failed"); //cond_attrs is ignored by Linux 176 177 for(;;) 178 { 179 FD_ZERO(&select_fds); 180 FD_SET(listen_socket, &select_fds); 181 FD_SET(STDIN_FILENO, &select_fds); 182 183 if(NMUX_DEBUG) fprintf(stderr, "mainfor: selecting..."); 184 //Let's wait until there is any new data to read, or any new connection! 185 int select_ret = select(highfd, &select_fds, NULL, NULL, NULL); 186 if(NMUX_DEBUG) fprintf(stderr, "selected.\n"); 187 if(select_ret == -1) error_exit("mainfor select() error"); 188 189 //Is there a new client connection? 190 if( FD_ISSET(listen_socket, &select_fds) && ((new_socket = accept(listen_socket, (struct sockaddr*)&addr_cli, &addr_cli_len)) != -1) ) 191 { 192 if(NMUX_DEBUG) 193 { 194 fprintf(stderr, "\x1b[1m\x1b[33mmainfor: clients before closing: "); 195 for(int i=0;i<clients.size();i++) fprintf(stderr, "%p ", clients[i]); 196 fprintf(stderr, "\x1b[0m\n"); 197 } 198 if(NMUX_DEBUG) fprintf(stderr, "mainfor: accepted (socket = %d).\n", new_socket); 199 //Close all finished clients 200 for(int i=0;i<clients.size();i++) 201 { 202 if(clients[i]->status == CS_THREAD_FINISHED) 203 { 204 if(pthread_detach(clients[i]->thread)!=0) 205 { 206 fprintf(stderr,"nmux pthread_detach failed for client %d\n", i); 207 continue; 208 } 209 210 if(NMUX_DEBUG) fprintf(stderr, "mainfor: client removed: %d\n", i); 211 //client destructor 212 pool->remove_thread(clients[i]->tsmthread); 213 clients.erase(clients.begin()+i); 214 i--; 215 } 216 } 217 if(NMUX_DEBUG) 218 { 219 fprintf(stderr, "\x1b[1m\x1b[33mmainfor: clients after closing: "); 220 for(int i=0;i<clients.size();i++) fprintf(stderr, "%p ", clients[i]); 221 fprintf(stderr, "\x1b[0m\n"); 222 } 223 224 //We're the parent, let's create a new client and initialize it 225 client_t* new_client = new client_t; 226 new_client->error = 0; 227 memcpy(&new_client->addr, &addr_cli, sizeof(struct sockaddr_in)); 228 new_client->socket = new_socket; 229 new_client->status = CS_CREATED; 230 new_client->tsmthread = pool->register_thread(); 231 new_client->lpool = pool; 232 new_client->sleeping = 0; 233 if(pthread_create(&new_client->thread, NULL, client_thread, (void*)new_client)==0) 234 { 235 clients.push_back(new_client); 236 fprintf(stderr, MSG_START "pthread_create() done, clients now: %d\n", (int)clients.size()); 237 } 238 else 239 { 240 fprintf(stderr, MSG_START "pthread_create() failed.\n"); 241 pool->remove_thread(new_client->tsmthread); 242 delete new_client; 243 } 244 } 245 246 if( FD_ISSET(STDIN_FILENO, &select_fds) ) 247 { 248 if(index_in_current_write_buffer >= bufsize) 249 { 250 if(NMUX_DEBUG) fprintf(stderr, "mainfor: gwbing..."); 251 current_write_buffer = (unsigned char*)pool->get_write_buffer(); 252 if(NMUX_DEBUG) fprintf(stderr, "gwbed.\nmainfor: cond broadcasting..."); 253 pthread_mutex_lock(&wait_mutex); 254 pthread_cond_broadcast(&wait_condition); 255 pthread_mutex_unlock(&wait_mutex); 256 if(NMUX_DEBUG) fprintf(stderr, "cond broadcasted.\n"); 257 //Shouldn't we do it after we put data in? 258 // No, on get_write_buffer() actually the previous buffer is getting available 259 // for read for threads that wait for new data (wait on global pthead mutex 260 // wait_condition). 261 index_in_current_write_buffer = 0; 262 } 263 264 if(NMUX_DEBUG) fprintf(stderr, "mainfor: reading...\n"); 265 int read_ret = read(STDIN_FILENO, current_write_buffer + index_in_current_write_buffer, bufsize - index_in_current_write_buffer); 266 if(NMUX_DEBUG) fprintf(stderr, "read %d\n", read_ret); 267 if(read_ret>0) 268 { 269 index_in_current_write_buffer += read_ret; 270 } 271 else if(read_ret==0) 272 { 273 //End of input stream, close clients and exit 274 print_exit(MSG_START "(main thread/for) end input stream, exiting.\n"); 275 } 276 else if(read_ret==-1) 277 { 278 if(errno == EAGAIN) { if(NMUX_DEBUG) fprintf(stderr, "mainfor: read EAGAIN\n"); /* seems like select would block forever, so we just read again */ } 279 else error_exit(MSG_START "(main thread/for) error in read(), exiting.\n"); 280 } 281 } 282 } 283 } 284 285 void* client_thread (void* param) 286 { 287 fprintf(stderr, "client %p: started!\n", param); 288 client_t* this_client = (client_t*)param; 289 this_client->status = CS_THREAD_RUNNING; 290 int retval; 291 tsmpool* lpool = this_client->lpool; 292 if(NMUX_DEBUG) fprintf(stderr, "client %p: socket = %d!\n", param, this_client->socket); 293 294 if(NMUX_DEBUG) fprintf(stderr, "client %p: poll init...", param); 295 struct pollfd pollfds[1]; 296 pollfds[0].fd = this_client->socket; 297 pollfds[0].events = POLLOUT; 298 pollfds[0].revents = 0; 299 if(NMUX_DEBUG) fprintf(stderr, "client poll inited.\n"); 300 301 //Set this_client->socket to non-blocking 302 if(set_nonblocking(this_client->socket)) 303 error_exit(MSG_START "cannot set_nonblocking() on this_client->socket"); 304 305 int client_buffer_index = 0; 306 int client_goto_source = 0; 307 char* pool_read_buffer = NULL; 308 309 for(;;) 310 { 311 //Wait until there is any data to send. 312 // If I haven't sent all the data from my last buffer, don't wait. 313 // (Wait for the server process to wake me up.) 314 while(!pool_read_buffer || client_buffer_index >= lpool->size) 315 { 316 if(NMUX_DEBUG) fprintf(stderr, "client %p: trying to grb\n", param); 317 pool_read_buffer = (char*)lpool->get_read_buffer(this_client->tsmthread); 318 if(pool_read_buffer) { client_buffer_index = 0; break; } 319 if(NMUX_DEBUG) fprintf(stderr, "client %p: cond_waiting for more data\n", param); 320 pthread_mutex_lock(&wait_mutex); 321 this_client->sleeping = 1; 322 pthread_cond_wait(&wait_condition, &wait_mutex); 323 pthread_mutex_unlock(&wait_mutex); 324 } 325 326 //Wait for the socket to be available for write. 327 if(NMUX_DEBUG) fprintf(stderr, "client %p: polling for socket write...", param); 328 int ret = poll(pollfds, 1, -1); 329 if(NMUX_DEBUG) fprintf(stderr, "client polled for socket write.\n"); 330 if(ret == 0) continue; 331 else if (ret == -1) { client_goto_source = 1; goto client_thread_exit; } 332 333 //Read data from global tsmpool and write it to client socket 334 if(NMUX_DEBUG) fprintf(stderr, "client %p: sending...", param); 335 ret = send(this_client->socket, pool_read_buffer + client_buffer_index, lpool->size - client_buffer_index, MSG_NOSIGNAL); 336 if(NMUX_DEBUG) fprintf(stderr, "client sent.\n"); 337 if(ret == -1) 338 { 339 switch(errno) 340 { 341 case EAGAIN: break; 342 default: client_goto_source = 2; goto client_thread_exit; 343 } 344 } 345 else client_buffer_index += ret; 346 } 347 348 client_thread_exit: 349 fprintf(stderr, "client %p: CS_THREAD_FINISHED, client_goto_source = %d, errno = %d", param, client_goto_source, errno); 350 this_client->status = CS_THREAD_FINISHED; 351 pthread_exit(NULL); 352 return NULL; 353 } 354 355 356 int set_nonblocking(int fd) 357 { 358 int flagtmp; 359 if((flagtmp = fcntl(fd, F_GETFL))!=-1) 360 if((flagtmp = fcntl(fd, F_SETFL, flagtmp|O_NONBLOCK))!=-1) 361 return 0; 362 return 1; 363 } 364 365 void error_exit(const char* why) 366 { 367 perror(why); //do we need a \n at the end of (why)? 368 exit(1); 369 } 370 371 void print_exit(const char* why) 372 { 373 fprintf(stderr, "%s", why); 374 exit(1); 375 } 376 377 void maxfd(int* maxfd, int fd) 378 { 379 if(fd>=*maxfd) *maxfd=fd+1; 380 }