/ ddcd_old.cpp
ddcd_old.cpp
1 /* 2 This software is part of libcsdr, a set of simple DSP routines for 3 Software Defined Radio. 4 5 Copyright (c) 2014, Andras Retzler <randras@sdr.hu> 6 All rights reserved. 7 8 Redistribution and use in source and binary forms, with or without 9 modification, are permitted provided that the following conditions are met: 10 * Redistributions of source code must retain the above copyright 11 notice, this list of conditions and the following disclaimer. 12 * Redistributions in binary form must reproduce the above copyright 13 notice, this list of conditions and the following disclaimer in the 14 documentation and/or other materials provided with the distribution. 15 * Neither the name of the copyright holder nor the 16 names of its contributors may be used to endorse or promote products 17 derived from this software without specific prior written permission. 18 19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 20 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 22 DISCLAIMED. IN NO EVENT SHALL ANDRAS RETZLER BE LIABLE FOR ANY 23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 29 */ 30 31 #include "ddcd.h" 32 33 34 #define SOFTWARE_NAME "ddcd" 35 #define MSG_START SOFTWARE_NAME ": " 36 37 int host_port = 0; 38 char host_address[100] = "127.0.0.1"; 39 int decimation = 0; 40 float transition_bw = 0.05; 41 int bufsize = 1024; 42 int bufsizeall; 43 int pipe_max_size; 44 int in_client = 0; 45 char ddc_method_str[100] = "td"; 46 ddc_method_t ddc_method; 47 pid_t main_dsp_proc; 48 49 int input_fd = STDIN_FILENO; //can be stdin, or the stdout of main_subprocess 50 pid_t main_subprocess_pid = 0; 51 pid_t main_subprocess_pgrp = 0; 52 pid_t client_subprocess_pid = 0; 53 pid_t client_subprocess_pgrp = 0; 54 55 char* buf; 56 57 int set_nonblocking(int fd) 58 { 59 int flagtmp; 60 if((flagtmp = fcntl(fd, F_GETFL))!=-1) 61 if((flagtmp = fcntl(fd, F_SETFL, flagtmp|O_NONBLOCK))!=-1) 62 return 0; 63 return 1; 64 } 65 66 int proc_exists(pid_t pid) 67 { 68 if(pid==0 || pid==1) return 1; 69 return kill(pid, 0) != -1; 70 } 71 72 void sig_handler(int signo) 73 { 74 int tmpstat; 75 if(signo==SIGPIPE) 76 { 77 fprintf(stderr,MSG_START "SIGPIPE received.\n"); 78 return; 79 } 80 if(signo==SIGCHLD) 81 if( main_subprocess_pid && signo==SIGCHLD && (waitpid(main_subprocess_pid, &tmpstat, WNOHANG), 1) && !proc_exists(main_subprocess_pid) ) 82 { 83 fprintf(stderr,MSG_START "main_subprocess_pid exited! Exiting...\n"); 84 } 85 else return; 86 //if(pgrp!=1 && pgrp!=0) //I just want to make sure that we cannot kill init or sched 87 // killpg(pgrp, signo); 88 if( !in_client && main_subprocess_pid ) killpg2(main_subprocess_pgrp); 89 if( in_client && client_subprocess_pid ) killpg2(client_subprocess_pgrp); 90 fprintf(stderr, MSG_START "signal %d caught in %s, exiting ddcd...\n", signo, (in_client)?"client":"main"); 91 fflush(stderr); 92 exit(0); 93 } 94 95 client_t* this_client; 96 97 int main(int argc, char* argv[]) 98 { 99 int c; 100 fd_set select_fds; 101 102 for(;;) 103 { 104 int option_index = 0; 105 static struct option long_options[] = { 106 {"port", required_argument, 0, 'p' }, 107 {"address", required_argument, 0, 'a' }, 108 {"decimation", required_argument, 0, 'd' }, 109 {"bufsize", required_argument, 0, 'b' }, 110 {"method", required_argument, 0, 'm' }, 111 {"transition", required_argument, 0, 't' } 112 }; 113 c = getopt_long(argc, argv, "p:a:d:b:m:t:", long_options, &option_index); 114 if(c==-1) break; 115 switch (c) 116 { 117 case 'a': 118 host_address[100-1]=0; 119 strncpy(host_address,optarg,100-1); 120 break; 121 case 'p': 122 host_port=atoi(optarg); 123 break; 124 case 'd': 125 decimation=atoi(optarg); 126 break; 127 case 'b': 128 bufsize=atoi(optarg); 129 break; 130 case 'm': 131 ddc_method_str[100-1]=0; 132 strncpy(ddc_method_str,optarg,100-1); 133 break; 134 case 't': 135 sscanf(optarg,"%g",&transition_bw); 136 break; 137 case 0: 138 case '?': 139 case ':': 140 default:; 141 print_exit(MSG_START "error in getopt_long()\n"); 142 } 143 } 144 145 if(!decimation) print_exit(MSG_START "missing required command line argument, --decimation.\n"); 146 if(!host_port) print_exit(MSG_START "missing required command line argument, --port.\n"); 147 if(decimation<0) print_exit(MSG_START "invalid value for --decimation (should be >0).\n"); 148 if(decimation==1) fprintf(stderr, MSG_START "decimation = 1, just copying raw samples.\n"); 149 if(transition_bw<0||transition_bw>0.5) print_exit(MSG_START "invalid value for --transition (should be between 0 and 0.5).\n"); 150 151 if(decimation==1); //don't do anything then 152 else if(!strcmp(ddc_method_str,"td")) 153 { 154 ddc_method = M_TD; 155 fprintf(stderr, MSG_START "method is M_TD (default).\n"); 156 } 157 else if (!strcmp(ddc_method_str,"fastddc")) 158 { 159 ddc_method = M_FASTDDC; 160 fprintf(stderr, MSG_START "method is M_FASTDDC.\n"); 161 } 162 else print_exit(MSG_START "invalid parameter given to --method.\n"); 163 164 //set signals 165 struct sigaction sa; 166 memset(&sa, 0, sizeof(sa)); 167 sa.sa_handler = sig_handler; 168 sigaction(SIGTERM, &sa, NULL); 169 sigaction(SIGKILL, &sa, NULL); 170 sigaction(SIGQUIT, &sa, NULL); 171 sigaction(SIGINT, &sa, NULL); 172 sigaction(SIGHUP, &sa, NULL); 173 sigaction(SIGCHLD, &sa, NULL); 174 sigaction(SIGPIPE, &sa, NULL); 175 prctl(PR_SET_PDEATHSIG, SIGHUP); //get a signal when parent exits 176 177 struct sockaddr_in addr_host; 178 int listen_socket; 179 std::vector<client_t*> clients; 180 clients.reserve(100); 181 listen_socket=socket(AF_INET,SOCK_STREAM,0); 182 183 int sockopt = 1; 184 if( setsockopt(listen_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&sockopt, sizeof(sockopt)) == -1 ) 185 error_exit(MSG_START "cannot set SO_REUSEADDR"); //the best description on SO_REUSEADDR ever: http://stackoverflow.com/a/14388707/3182453 186 187 memset(&addr_host,'0',sizeof(addr_host)); 188 addr_host.sin_family=AF_INET; 189 addr_host.sin_port=htons(host_port); 190 addr_host.sin_addr.s_addr = INADDR_ANY; 191 192 if( (addr_host.sin_addr.s_addr=inet_addr(host_address)) == INADDR_NONE ) 193 error_exit(MSG_START "invalid host address"); 194 195 if( bind(listen_socket, (struct sockaddr*) &addr_host, sizeof(addr_host)) < 0 ) 196 error_exit(MSG_START "cannot bind() address to the socket"); 197 198 if( listen(listen_socket, 10) == -1 ) 199 error_exit(MSG_START "cannot listen() on socket"); 200 201 fprintf(stderr,MSG_START "listening on %s:%d\n", inet_ntoa(addr_host.sin_addr), host_port); 202 203 struct sockaddr_in addr_cli; 204 socklen_t addr_cli_len = sizeof(addr_cli); 205 int new_socket; 206 207 bufsizeall = bufsize*sizeof(char); 208 buf = (char*)malloc(bufsizeall); 209 210 FILE* tempfile = fopen("/proc/sys/fs/pipe-max-size","r"); 211 if(!tempfile) 212 { 213 perror(MSG_START "cannot read /proc/sys/fs/pipe-max-size"); 214 } 215 else 216 { 217 char pipe_max_size_str[100]; 218 int tfread = fread(pipe_max_size_str, 1, 100, tempfile); 219 pipe_max_size_str[tfread]='\0'; 220 pipe_max_size = atoi(pipe_max_size_str); 221 //fprintf(stderr, MSG_START "note: pipe_max_size = %d\n", pipe_max_size); 222 //if(pipe_max_size>4096 && fcntl(STDIN_FILENO, F_SETPIPE_SZ, pipe_max_size)==-1) 223 // perror("failed to fcntl(STDIN_FILENO, F_SETPIPE_SZ, ...)"); 224 } 225 226 //We'll see if it is a good idea: 227 //setpgrp(); 228 //pgrp = getpgrp(); 229 //It is not, because we can't catch Ctrl+C (SIGINT), as it is sent to a process group... 230 231 //Start DSP subprocess from the main process if required 232 char main_subprocess_cmd_buf[500]; 233 234 235 int pipe_m2s_ctl[2]; //main to subprocess :: control channel 236 int pipe_s2m[2]; //subprocess to main 237 238 if(pipe(pipe_m2s_ctl)) error_exit(MSG_START "couldn't create pipe_m2s_ctl"); 239 if(pipe(pipe_s2m)) error_exit(MSG_START "couldn't create pipe_s2m"); 240 241 if(decimation!=1) 242 { 243 switch(ddc_method) 244 { 245 case M_TD: 246 break; 247 case M_FASTDDC: 248 sprintf(main_subprocess_cmd_buf, subprocess_args_fastddc_1, decimation, transition_bw); 249 fprintf(stderr, MSG_START "starting main_subprocess_cmd: %s\n", main_subprocess_cmd_buf); 250 if(!(main_subprocess_pid = run_subprocess( main_subprocess_cmd_buf, 0, pipe_s2m, &main_subprocess_pgrp ))) 251 print_exit(MSG_START "couldn't start main_subprocess_cmd!\n"); 252 close(STDIN_FILENO); // redirect stdin to the stdin of the subprocess 253 break; 254 } 255 } 256 257 int highfd = 0; 258 FD_ZERO(&select_fds); 259 FD_SET(listen_socket, &select_fds); 260 maxfd(&highfd, listen_socket); 261 if(main_subprocess_pid) input_fd = pipe_s2m[0]; //else STDIN_FILENO 262 FD_SET(input_fd, &select_fds); 263 maxfd(&highfd, input_fd); 264 265 //Set stdin and listen_socket to non-blocking 266 if(set_nonblocking(input_fd) || set_nonblocking(listen_socket)) //don't do it before subprocess fork! 267 error_exit(MSG_START "cannot set_nonblocking()"); 268 269 for(;;) 270 { 271 //Let's wait until there is any new data to read, or any new connection! 272 select(highfd, &select_fds, NULL, NULL, NULL); 273 274 //Is there a new client connection? 275 if( (new_socket = accept(listen_socket, (struct sockaddr*)&addr_cli, &addr_cli_len)) != -1) 276 { 277 this_client = new client_t; 278 this_client->error = 0; 279 memcpy(&this_client->addr, &addr_cli, sizeof(this_client->addr)); 280 this_client->socket = new_socket; 281 if(pipe(this_client->pipefd) == -1) 282 { 283 perror(MSG_START "cannot open new pipe() for the client"); 284 continue; 285 } 286 if(fcntl(this_client->pipefd[1], F_SETPIPE_SZ, pipe_max_size) == -1) 287 perror("failed to F_SETPIPE_SZ for the client pipe"); 288 if(this_client->pid = fork()) 289 { 290 //We're the parent 291 set_nonblocking(this_client->pipefd[1]); 292 clients.push_back(this_client); 293 fprintf(stderr, MSG_START "client pid: %d\n", this_client->pid); 294 } 295 else 296 { 297 //We're the client 298 client(); 299 return 1; 300 } 301 } 302 303 int retval = read(input_fd, buf, bufsizeall); 304 if(retval==0) 305 { 306 //end of input stream, close clients and exit 307 } 308 else if(retval != -1) 309 { 310 for (int i=0; i<clients.size(); i++) 311 { 312 if(write(clients[i]->pipefd[1], buf, retval)==-1) 313 { 314 315 if(!clients[i]->error) 316 { 317 print_client(clients[i], "lost buffer, failed to write pipe."); 318 clients[i]->error=1; 319 } 320 //fprintf(stderr, MSG_START "errno is %d\n", errno); //usually 11 321 //int wpstatus; 322 //int wpresult = waitpid(clients[i]->pid, &wpstatus, WNOHANG); 323 //fprintf(stderr, MSG_START "pid is %d\n",clients[i]->pid); 324 //perror("somethings wrong"); 325 //if(wpresult == -1) print_client(clients[i], "error while waitpid()!"); 326 //else if(wpresult == 0) 327 waitpid(clients[i]->pid, NULL, WNOHANG); 328 if(!proc_exists(clients[i]->pid)) 329 { 330 //Client exited! 331 print_client(clients[i], "closing client from main process."); 332 close(clients[i]->pipefd[1]); 333 close(clients[i]->socket); 334 delete clients[i]; 335 clients.erase(clients.begin()+i); 336 fprintf(stderr, MSG_START "done closing client from main process.\n"); 337 } 338 } 339 else { if(clients[i]->error) print_client(clients[i], "pipe okay again."); clients[i]->error=0; } 340 } 341 } 342 //TODO: at the end, server closes pipefd[1] for client 343 } 344 345 return 0; 346 } 347 348 pid_t run_subprocess(char* cmd, int* pipe_in, int* pipe_out, pid_t* pgrp) 349 { 350 /*char sem_name[101]; 351 snprintf(sem_name,100,"ddcd_sem_%d",getpid()); 352 sem_t mysem; 353 if(sem_init(&mysem, 1, 1)==-1) error_exit("failed to sem_init() in run_subprocess()"); 354 fprintf(stderr, "sem_waiting\n"); 355 if(sem_wait(&mysem)==-1) error_exit("the first sem_wait() failed in run_subprocess()"); 356 fprintf(stderr, "sem_waited\n"); 357 */ 358 int syncpipe[2]; 359 if(pipe(syncpipe)==-1) error_exit("failed to create pipe()"); 360 pid_t pid = fork(); 361 362 if(pid < 0) return 0; //fork failed 363 if(pid == 0) 364 { 365 setpgrp(); 366 write(syncpipe[1], " ", 1); 367 //if(sem_post(&mysem)==-1) error_exit("failed to sem_post() in run_subprocess()"); 368 //We're the subprocess 369 //fprintf(stderr, "run_subprocess :: execl\n"); 370 //if(fcntl(pipe_in[1], F_SETPIPE_SZ, pipe_max_size) == -1) perror("Failed to F_SETPIPE_SZ in run_subprocess()"); 371 if(pipe_in) 372 { 373 close(pipe_in[1]); 374 dup2(pipe_in[0], STDIN_FILENO); 375 } 376 if(pipe_out) 377 { 378 close(pipe_out[0]); 379 dup2(pipe_out[1], STDOUT_FILENO); 380 } 381 execl("/bin/bash","bash","-c",cmd, (char*)0); 382 error_exit(MSG_START "run_subprocess failed to execute command"); 383 } 384 else 385 { 386 //if(sem_wait(&mysem)==-1) error_exit("the second sem_wait() failed in run_subprocess()"); 387 int synctemp; 388 read(syncpipe[0], &synctemp, 1); 389 *pgrp = getpgid(pid); 390 fprintf(stderr, MSG_START "run_subprocess pgid returned = %d\n", *pgrp); 391 return pid; 392 } 393 } 394 395 void print_client(client_t* client, const char* what) 396 { 397 fprintf(stderr,MSG_START "(client %s:%d) %s\n", inet_ntoa(client->addr.sin_addr), client->addr.sin_port, what); 398 } 399 400 #define CTL_BUFSIZE 1024 401 402 int read_socket_ctl(int fd, char* output, int max_size) 403 { 404 //fprintf(stderr, "doing read_socket_ctl %d\n", fd); 405 //if(!fd) return 0; 406 static char buffer[CTL_BUFSIZE]; 407 static int buffer_index=0; 408 if(buffer_index==CTL_BUFSIZE) buffer_index=0; 409 int bytes_read=recv(fd,buffer+buffer_index,(CTL_BUFSIZE-buffer_index)*sizeof(char), MSG_DONTWAIT); 410 if(bytes_read<=0) return 0; 411 //fprintf(stderr, "recv %d\n", bytes_read); 412 413 int prev_newline_at=0; 414 int last_newline_at=0; 415 for(int i=0;i<buffer_index+bytes_read;i++) 416 { 417 if(buffer[i]=='\n') 418 { 419 prev_newline_at=last_newline_at; 420 last_newline_at=i+1; 421 } 422 } 423 if(last_newline_at) 424 { 425 int oi=0; 426 for(int i=prev_newline_at;buffer[i]!='\n'&&oi<max_size;i++) output[oi++]=buffer[i]; //copy to output buffer 427 output[oi++]='\0'; 428 memmove(buffer,buffer+last_newline_at,buffer_index+bytes_read-last_newline_at); 429 buffer_index=bytes_read-last_newline_at; 430 return 1; 431 } 432 else 433 { 434 buffer_index+=bytes_read; 435 return 0; 436 } 437 } 438 439 int ctl_get_arg(char* input, const char* cmd, const char* format, ...) 440 { 441 int retval=0; 442 int cmdlen=strlen(cmd); 443 if(input[cmdlen]=='=') 444 { 445 //fprintf(stderr, "cga found=\n"); 446 if(input[cmdlen]=0, !strcmp(input,cmd)) 447 { 448 //fprintf(stderr, "cga foundokay\n"); 449 va_list vl; 450 va_start(vl,format); 451 retval=vsscanf(input+cmdlen+1,format,vl); 452 va_end(vl); 453 } 454 input[cmdlen]='='; 455 } 456 //fprintf(stderr, "cga retval %d\n", retval); 457 return retval; 458 } 459 460 void client() 461 { 462 in_client=1; 463 print_client(this_client, "client process forked."); 464 465 char client_subprocess_cmd_buf[500]; 466 int input_fd = this_client->pipefd[0]; 467 int pipe_ctl[2], pipe_stdout[2]; 468 469 prctl(PR_SET_PDEATHSIG, SIGHUP); //get a signal when parent exits 470 471 if(decimation!=1) 472 { 473 474 if(pipe(pipe_ctl)==-1) error_exit(MSG_START "cannot open new pipe() for the client subprocess"); 475 if(pipe(pipe_stdout)==-1) error_exit(MSG_START "cannot open new pipe() for the client subprocess"); 476 switch(ddc_method) 477 { 478 case M_TD: 479 sprintf(client_subprocess_cmd_buf, subprocess_cmd_td, pipe_ctl[0], decimation, transition_bw); 480 break; 481 case M_FASTDDC: 482 sprintf(client_subprocess_cmd_buf, subprocess_args_fastddc_2, pipe_ctl[0], decimation, transition_bw); 483 break; 484 } 485 486 if(!(client_subprocess_pid = run_subprocess( client_subprocess_cmd_buf, this_client->pipefd, pipe_stdout, &client_subprocess_pgrp))) 487 print_exit(MSG_START "couldn't start client_subprocess_cmd!\n"); 488 fprintf(stderr, MSG_START "starting client_subprocess_cmd: %s\n", client_subprocess_cmd_buf); 489 input_fd = pipe_stdout[0]; //we don't have to set it nonblocking 490 fprintf(stderr, MSG_START "pipe_stdout[0] = %d\n", pipe_stdout[0]); 491 write(pipe_ctl[1], "0.0\n", 4); 492 } 493 char recv_cmd[CTL_BUFSIZE]; 494 char temps[CTL_BUFSIZE*2]; 495 int tempi; 496 float tempf; 497 498 for(;;) 499 { 500 while(read_socket_ctl(this_client->socket, recv_cmd, CTL_BUFSIZE)) 501 { 502 sprintf(temps, "read_socket_ctl: %s", recv_cmd); 503 print_client(this_client, temps); 504 if(ctl_get_arg(recv_cmd, "bypass", "%d", &tempi)) 505 { 506 if(tempi==1 && client_subprocess_pid) 507 { 508 //print_client(this_client, "suspending client_subprocess_pgrp...\n"); 509 //fprintf(stderr, "client_subprocess_pgrp = %d\n", client_subprocess_pgrp); 510 //killpg(client_subprocess_pgrp, SIGTSTP); 511 //while(proc_exists(client_subprocess_pid)) usleep(10000); 512 //print_client(this_client, "done killing client_subprocess_pid.\n"); 513 input_fd=this_client->pipefd[0]; //by doing this, we don't read from pipe_stdout[0] anymore, so that csdr stops doing anything, and also doesn't read anymore from the input: we get the whole I/Q stream! 514 } 515 if(tempi==0 && client_subprocess_pid) 516 { 517 input_fd=pipe_stdout[0]; 518 } 519 520 } 521 if(ctl_get_arg(recv_cmd, "shift", "%g", &tempf)) 522 { 523 tempi=sprintf(temps, "%g\n", tempf); 524 write(pipe_ctl[1], temps, tempi); 525 fsync(pipe_ctl[1]); 526 } 527 } 528 int nread = read(input_fd,buf,bufsizeall); 529 if(nread<=0) continue; 530 if(send(this_client->socket,buf,nread,0)==-1) 531 { 532 print_client(this_client, "client process is exiting.\n"); 533 if(client_subprocess_pid) killpg2(client_subprocess_pgrp); 534 exit(0); 535 } 536 } 537 } 538 539 void killpg2(pid_t pgrp) 540 { 541 //fprintf(stderr, MSG_START "killpg2: %d\n", pgrp); 542 if(pgrp!=1 && pgrp!=0) killpg(pgrp, SIGTERM); 543 } 544 545 void error_exit(const char* why) 546 { 547 perror(why); 548 exit(1); 549 } 550 551 void print_exit(const char* why) 552 { 553 fprintf(stderr, "%s", why); 554 exit(1); 555 } 556 557 void maxfd(int* maxfd, int fd) 558 { 559 if(fd>=*maxfd) *maxfd=fd+1; 560 }