/ nmux.cpp
nmux.cpp
  1  /*
  2  This software is part of libcsdr, a set of simple DSP routines for
  3  Software Defined Radio.
  4  
  5  Copyright (c) 2014, Andras Retzler <randras@sdr.hu>
  6  All rights reserved.
  7  
  8  Redistribution and use in source and binary forms, with or without
  9  modification, are permitted provided that the following conditions are met:
 10      * Redistributions of source code must retain the above copyright
 11        notice, this list of conditions and the following disclaimer.
 12      * Redistributions in binary form must reproduce the above copyright
 13        notice, this list of conditions and the following disclaimer in the
 14        documentation and/or other materials provided with the distribution.
 15      * Neither the name of the copyright holder nor the
 16        names of its contributors may be used to endorse or promote products
 17        derived from this software without specific prior written permission.
 18  
 19  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 20  ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 21  WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 22  DISCLAIMED. IN NO EVENT SHALL ANDRAS RETZLER BE LIABLE FOR ANY
 23  DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 24  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 25  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 26  ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 27  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 28  SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 29  */
 30  
 31  #include "nmux.h"
 32  
 33  char help_text[]="nmux is a TCP stream multiplexer. It reads data from the standard input, and sends it to each client connected through TCP sockets. Available command line options are:\n"
 34  "\t--port (-p), --address (-a): TCP port and address to listen.\n"
 35  "\t--bufsize (-b), --bufcnt (-n): Internal buffer size and count.\n"
 36  "\t--help (-h): Show this message.\n";
 37  
 38  int host_port = 0;
 39  char host_address[100] = "127.0.0.1";
 40  int thread_cntr = 0;
 41  
 42  //CLI parameters
 43  int bufsize = 1024; 
 44  int bufcnt = 1024;
 45  
 46  char** global_argv;
 47  int global_argc;
 48  tsmpool* pool;
 49  
 50  pthread_cond_t wait_condition;
 51  pthread_mutex_t wait_mutex;
 52  
 53  void sig_handler(int signo)
 54  {
 55  	fprintf(stderr, MSG_START "signal %d caught, exiting...\n", signo);
 56  	fflush(stderr);
 57  	exit(0);
 58  }
 59  
 60  int main(int argc, char* argv[])
 61  {
 62  	global_argv = argv;
 63  	global_argc = argc;
 64  	int c;
 65  	int no_options = 1;
 66  	for(;;)
 67  	{
 68  		int option_index = 0;
 69  		static struct option long_options[] = {
 70  		   {"port",       required_argument, 0,  'p' },
 71  		   {"address",    required_argument, 0,  'a' },
 72  		   {"bufsize", 	  required_argument, 0,  'b' },
 73  		   {"bufcnt", 	  required_argument, 0,  'n' },
 74  		   {"help", 	  no_argument, 		 0,  'h' },
 75  		   {0,			  0,                 0,  0   }
 76  		};
 77  		c = getopt_long(argc, argv, "p:a:b:n:h", long_options, &option_index);
 78  		if(c==-1) break;
 79  		no_options = 0;
 80  		switch (c)
 81  		{
 82  		case 'a':
 83  			host_address[100-1]=0;
 84  			strncpy(host_address,optarg,100-1);
 85  			break;
 86  		case 'p':
 87  			host_port=atoi(optarg);
 88  			break;
 89  		case 'b':
 90  			bufsize=atoi(optarg);
 91  			break;
 92  		case 'n':
 93  			bufcnt=atoi(optarg);
 94  			break;
 95  		case 'h':
 96  			print_exit(help_text);
 97  			break;
 98  		case 0:
 99  		case '?':
100  		case ':':
101  		default:
102  			print_exit(MSG_START "error in getopt_long()\n");
103  		}
104  	}
105  
106  	if(no_options) print_exit(help_text);
107  	if(!host_port) print_exit(MSG_START "missing required command line argument, --port.\n");
108  	if(bufsize<=0) print_exit(MSG_START "invalid value for --bufsize (should be >0)\n");
109  	if(bufcnt<=0) print_exit(MSG_START "invalid value for --bufcnt (should be >0)\n");
110  
111  	//set signals
112  	struct sigaction sa;
113  	memset(&sa, 0, sizeof(sa));
114  	sa.sa_handler = sig_handler;
115  	sigaction(SIGTERM, &sa, NULL);
116  	sigaction(SIGKILL, &sa, NULL);
117  	sigaction(SIGQUIT, &sa, NULL);
118  	sigaction(SIGINT, &sa, NULL);
119  	sigaction(SIGHUP, &sa, NULL);
120  
121  	struct sockaddr_in addr_host;
122      int listen_socket;
123  	std::vector<client_t*> clients;
124  	clients.reserve(100);
125      listen_socket=socket(AF_INET,SOCK_STREAM,0);
126  
127  	int sockopt = 1;
128  	if( setsockopt(listen_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&sockopt, sizeof(sockopt)) == -1 )
129  		error_exit(MSG_START "cannot set SO_REUSEADDR");  //the best description on SO_REUSEADDR ever: http://stackoverflow.com/a/14388707/3182453
130  
131  	memset(&addr_host,'0',sizeof(addr_host));
132      addr_host.sin_family = AF_INET;
133      addr_host.sin_port = htons(host_port);
134  	addr_host.sin_addr.s_addr = INADDR_ANY;
135  
136      if( (addr_host.sin_addr.s_addr=inet_addr(host_address)) == INADDR_NONE )
137  		error_exit(MSG_START "invalid host address");
138  
139  	if( bind(listen_socket, (struct sockaddr*) &addr_host, sizeof(addr_host)) < 0 )
140  		error_exit(MSG_START "cannot bind() address to the socket");
141  
142  	if( listen(listen_socket, 10) == -1 )
143  		error_exit(MSG_START "cannot listen() on socket");
144  
145  	fprintf(stderr, MSG_START "listening on %s:%d\n", inet_ntoa(addr_host.sin_addr), host_port);
146  
147  	struct sockaddr_in addr_cli;
148  	socklen_t addr_cli_len = sizeof(addr_cli);
149  	int new_socket;
150  
151  	int highfd = 0;
152  	maxfd(&highfd, listen_socket);
153  	maxfd(&highfd, STDIN_FILENO);
154  	
155  	fd_set select_fds;
156  
157  	//Set stdin and listen_socket to non-blocking
158  	if(set_nonblocking(STDIN_FILENO) || set_nonblocking(listen_socket))
159  		error_exit(MSG_START "cannot set_nonblocking()");
160  
161  	//Create tsmpool
162  	pool = new tsmpool(bufsize, bufcnt);
163  	if(!pool->is_ok()) print_exit(MSG_START "tsmpool failed to initialize\n");
164  
165  	unsigned char* current_write_buffer = (unsigned char*)pool->get_write_buffer();
166  	int index_in_current_write_buffer = 0;
167  
168  	//Create wait condition: client threads waiting for input data from the main thread will be
169  	//	waiting on this condition. They will be woken up with pthread_cond_broadcast() if new
170  	//	data arrives.
171  	if(pthread_cond_init(&wait_condition, NULL)) 
172  		print_exit(MSG_START "pthread_cond_init failed"); //cond_attrs is ignored by Linux
173  	
174  	if(pthread_mutex_init(&wait_mutex, NULL))
175  		print_exit(MSG_START "pthread_mutex_t failed"); //cond_attrs is ignored by Linux
176  
177  	for(;;)
178  	{
179  		FD_ZERO(&select_fds);
180  		FD_SET(listen_socket, &select_fds);
181  		FD_SET(STDIN_FILENO, &select_fds);
182  
183  		if(NMUX_DEBUG) fprintf(stderr, "mainfor: selecting...");
184  		//Let's wait until there is any new data to read, or any new connection!
185  		int select_ret = select(highfd, &select_fds, NULL, NULL, NULL);
186  		if(NMUX_DEBUG) fprintf(stderr, "selected.\n");
187  		if(select_ret == -1) error_exit("mainfor select() error");
188  
189  		//Is there a new client connection?
190  		if( FD_ISSET(listen_socket, &select_fds) && ((new_socket = accept(listen_socket, (struct sockaddr*)&addr_cli, &addr_cli_len)) != -1) )
191  		{
192  			if(NMUX_DEBUG) 
193  			{
194  				fprintf(stderr, "\x1b[1m\x1b[33mmainfor: clients before closing: ");
195  				for(int i=0;i<clients.size();i++) fprintf(stderr, "%p ", clients[i]);
196  				fprintf(stderr, "\x1b[0m\n");
197  			}
198  			if(NMUX_DEBUG) fprintf(stderr, "mainfor: accepted (socket = %d).\n", new_socket);
199  			//Close all finished clients
200  			for(int i=0;i<clients.size();i++)
201  			{
202  				if(clients[i]->status == CS_THREAD_FINISHED)
203  				{
204                      if(pthread_detach(clients[i]->thread)!=0)
205                      {
206                          fprintf(stderr,"nmux pthread_detach failed for client %d\n", i);
207                          continue;
208                      }
209  
210  					if(NMUX_DEBUG) fprintf(stderr, "mainfor: client removed: %d\n", i);
211  					//client destructor
212  					pool->remove_thread(clients[i]->tsmthread);
213  					clients.erase(clients.begin()+i);
214  					i--;
215  				}
216  			}
217  			if(NMUX_DEBUG) 
218  			{
219  				fprintf(stderr, "\x1b[1m\x1b[33mmainfor: clients after closing: ");
220  				for(int i=0;i<clients.size();i++) fprintf(stderr, "%p ", clients[i]);
221  				fprintf(stderr, "\x1b[0m\n");
222  			}
223  
224  			//We're the parent, let's create a new client and initialize it
225  			client_t* new_client = new client_t;
226  			new_client->error = 0;
227  			memcpy(&new_client->addr, &addr_cli, sizeof(struct sockaddr_in));
228  			new_client->socket = new_socket;
229  			new_client->status = CS_CREATED;
230  			new_client->tsmthread = pool->register_thread();
231  			new_client->lpool = pool;
232  			new_client->sleeping = 0;
233  			if(pthread_create(&new_client->thread, NULL, client_thread, (void*)new_client)==0)
234  			{
235  				clients.push_back(new_client);
236  				fprintf(stderr, MSG_START "pthread_create() done, clients now: %d\n", (int)clients.size());
237  			}
238  			else
239  			{
240  				fprintf(stderr, MSG_START "pthread_create() failed.\n");
241  				pool->remove_thread(new_client->tsmthread);
242  				delete new_client;
243  			}
244  		}
245  
246  		if( FD_ISSET(STDIN_FILENO, &select_fds) )
247  		{
248  			if(index_in_current_write_buffer >= bufsize)
249  			{
250  				if(NMUX_DEBUG) fprintf(stderr, "mainfor: gwbing...");
251  				current_write_buffer = (unsigned char*)pool->get_write_buffer();
252  				if(NMUX_DEBUG) fprintf(stderr, "gwbed.\nmainfor: cond broadcasting...");
253  				pthread_mutex_lock(&wait_mutex);
254  				pthread_cond_broadcast(&wait_condition); 
255  				pthread_mutex_unlock(&wait_mutex);
256  				if(NMUX_DEBUG) fprintf(stderr, "cond broadcasted.\n");
257  					//Shouldn't we do it after we put data in?
258  					//	No, on get_write_buffer() actually the previous buffer is getting available 
259  					//	for read for threads that wait for new data (wait on global pthead mutex 
260  					//	wait_condition). 
261  				index_in_current_write_buffer = 0;
262  			}
263  
264  			if(NMUX_DEBUG) fprintf(stderr, "mainfor: reading...\n");
265  			int read_ret = read(STDIN_FILENO, current_write_buffer + index_in_current_write_buffer, bufsize - index_in_current_write_buffer);
266  			if(NMUX_DEBUG) fprintf(stderr, "read %d\n", read_ret);
267  			if(read_ret>0)
268  			{
269  				index_in_current_write_buffer += read_ret;
270  			}
271  			else if(read_ret==0)
272  			{
273  				//End of input stream, close clients and exit
274  				print_exit(MSG_START "(main thread/for) end input stream, exiting.\n");
275  			}
276  			else if(read_ret==-1)
277  			{
278  				if(errno == EAGAIN) { if(NMUX_DEBUG) fprintf(stderr, "mainfor: read EAGAIN\n"); /* seems like select would block forever, so we just read again */ }
279  				else error_exit(MSG_START "(main thread/for) error in read(), exiting.\n");
280  			}
281  		}
282  	}
283  }
284  
285  void* client_thread (void* param)
286  {
287  	fprintf(stderr, "client %p: started!\n", param);
288  	client_t* this_client = (client_t*)param;
289  	this_client->status = CS_THREAD_RUNNING;
290  	int retval;
291  	tsmpool* lpool = this_client->lpool;
292  	if(NMUX_DEBUG) fprintf(stderr, "client %p: socket = %d!\n", param, this_client->socket);
293  
294  	if(NMUX_DEBUG) fprintf(stderr, "client %p: poll init...", param);
295  	struct pollfd pollfds[1];
296  	pollfds[0].fd = this_client->socket;
297  	pollfds[0].events = POLLOUT;
298  	pollfds[0].revents = 0;
299  	if(NMUX_DEBUG) fprintf(stderr, "client poll inited.\n");
300  
301  	//Set this_client->socket to non-blocking
302  	if(set_nonblocking(this_client->socket))
303  		error_exit(MSG_START "cannot set_nonblocking() on this_client->socket");
304  
305  	int client_buffer_index = 0;
306  	int client_goto_source = 0;
307  	char* pool_read_buffer = NULL;
308  
309  	for(;;)
310  	{
311  		//Wait until there is any data to send.
312  		//  If I haven't sent all the data from my last buffer, don't wait.
313  		//	(Wait for the server process to wake me up.)
314  		while(!pool_read_buffer || client_buffer_index >= lpool->size)
315  		{
316  			if(NMUX_DEBUG) fprintf(stderr, "client %p: trying to grb\n", param);
317  			pool_read_buffer = (char*)lpool->get_read_buffer(this_client->tsmthread);
318  			if(pool_read_buffer) { client_buffer_index = 0; break; }
319  			if(NMUX_DEBUG) fprintf(stderr, "client %p: cond_waiting for more data\n", param);
320  			pthread_mutex_lock(&wait_mutex);
321  			this_client->sleeping = 1;
322  			pthread_cond_wait(&wait_condition, &wait_mutex);
323  			pthread_mutex_unlock(&wait_mutex);
324  		}
325  
326  		//Wait for the socket to be available for write.
327  		if(NMUX_DEBUG) fprintf(stderr, "client %p: polling for socket write...", param);
328  		int ret = poll(pollfds, 1, -1);
329  		if(NMUX_DEBUG) fprintf(stderr, "client polled for socket write.\n");
330  		if(ret == 0) continue;
331  		else if (ret == -1) { client_goto_source = 1; goto client_thread_exit; }
332  
333  		//Read data from global tsmpool and write it to client socket
334  		if(NMUX_DEBUG) fprintf(stderr, "client %p: sending...", param);
335  		ret = send(this_client->socket, pool_read_buffer + client_buffer_index, lpool->size - client_buffer_index, MSG_NOSIGNAL);
336  		if(NMUX_DEBUG) fprintf(stderr, "client sent.\n");
337  		if(ret == -1) 
338  		{
339  			switch(errno)
340  			{
341  				case EAGAIN: break;
342  				default: client_goto_source = 2; goto client_thread_exit;
343  			}
344  		}
345  		else client_buffer_index += ret;
346  	}
347  
348  client_thread_exit:
349  	fprintf(stderr, "client %p: CS_THREAD_FINISHED, client_goto_source = %d, errno = %d", param, client_goto_source, errno);
350  	this_client->status = CS_THREAD_FINISHED;
351  	pthread_exit(NULL);
352  	return NULL;
353  }
354  
355  
356  int set_nonblocking(int fd)
357  {
358  	int flagtmp;
359  	if((flagtmp = fcntl(fd, F_GETFL))!=-1)
360  		if((flagtmp = fcntl(fd, F_SETFL, flagtmp|O_NONBLOCK))!=-1)
361  			return 0;
362  	return 1;
363  }
364  
365  void error_exit(const char* why)
366  {
367  	perror(why); //do we need a \n at the end of (why)?
368  	exit(1);
369  }
370  
371  void print_exit(const char* why)
372  {
373  	fprintf(stderr, "%s", why);
374  	exit(1);
375  }
376  
377  void maxfd(int* maxfd, int fd)
378  {
379  	if(fd>=*maxfd) *maxfd=fd+1;
380  }