tcp_srv.cc
1 /******************************************************************** 2 * Description: tcp_srv.cc 3 * 4 * Derived from a work by Fred Proctor & Will Shackleford 5 * 6 * Author: 7 * License: LGPL Version 2 8 * System: Linux 9 * 10 * Copyright (c) 2004 All rights reserved. 11 * 12 * Last change: 13 ********************************************************************/ 14 /**************************************************************************** 15 * File: tcp_srv.cc 16 * Purpose: Provides the functions for the class CMS_SERVER_REMOTE_TCP_PORT 17 * which provides TCP specific overrides of the CMS_SERVER_REMOTE_PORT class. 18 ****************************************************************************/ 19 20 #if defined(__GNUC__) && ((__GNUC__ > 4) || ((__GNUC__ == 4) && (__GNUC_MINOR__ >= 4))) 21 #pragma GCC optimize "-fno-strict-aliasing" 22 #pragma GCC diagnostic ignored "-Wstrict-aliasing" 23 #endif 24 25 #ifdef __cplusplus 26 extern "C" { 27 #endif 28 29 #include <string.h> /* memset(), strerror() */ 30 #include <stdlib.h> // malloc(), free() 31 #include <unistd.h> 32 #include <sys/socket.h> 33 #include <sys/ioctl.h> 34 #include <errno.h> /* errno */ 35 #include <signal.h> // SIGPIPE, signal() 36 37 #ifdef __cplusplus 38 } 39 #endif 40 41 #include <sys/types.h> 42 #include <sys/wait.h> // waitpid 43 44 #include <arpa/inet.h> /* inet_ntoa */ 45 #include "cms.hh" /* class CMS */ 46 #include "nml.hh" // class NML 47 #include "tcp_srv.hh" /* class CMS_SERVER_REMOTE_TCP_PORT */ 48 #include "rcs_print.hh" /* rcs_print_error() */ 49 #include "linklist.hh" /* class LinkedList */ 50 #include "tcp_opts.hh" /* SET_TCP_NODELAY */ 51 #include "timer.hh" // esleep() 52 #include "_timer.h" 53 #include "cmsdiag.hh" // class CMS_DIAGNOSTICS_INFO 54 extern "C" { 55 #include "recvn.h" /* recvn() */ 56 #include "sendn.h" /* sendn() */ 57 } 58 #include "physmem.hh" // PHYSMEM_HANDLE 59 60 int tcpsvr_threads_created = 0; 61 int tcpsvr_threads_killed = 0; 62 int tcpsvr_threads_exited = 0; 63 int tcpsvr_threads_returned_early = 0; 64 65 TCPSVR_BLOCKING_READ_REQUEST::TCPSVR_BLOCKING_READ_REQUEST() 66 { 67 access_type = CMS_READ_ACCESS; /* read or just peek */ 68 last_id_read = 0; /* The server can compare with id from buffer 69 */ 70 /* to determine if the buffer is new */ 71 /* to this client */ 72 timeout_millis = -1; /* Milliseconds for blocking_timeout or -1 to 73 wait forever */ 74 _client_tcp_port = NULL; 75 remport = NULL; 76 server = NULL; 77 _nml = NULL; 78 _reply = NULL; 79 _data = NULL; 80 read_reply = NULL; 81 } 82 83 static inline double tcp_svr_reverse_double(double in) 84 { 85 double out; 86 char *c1, *c2; 87 88 c1 = ((char *) &in) + 7; 89 c2 = (char *) &out; 90 for (int i = 0; i < 8; i++) { 91 *c2 = *c1; 92 c1--; 93 c2++; 94 } 95 return out; 96 } 97 98 TCPSVR_BLOCKING_READ_REQUEST::~TCPSVR_BLOCKING_READ_REQUEST() 99 { 100 if (NULL != _nml) { 101 NML *nmlcopy = (NML *) _nml; 102 _nml = NULL; 103 delete nmlcopy; 104 } 105 if (NULL != _data) { 106 void *_datacopy = _data; 107 if (NULL != read_reply) { 108 if (_data == read_reply->data) { 109 read_reply->data = NULL; 110 } 111 } 112 _data = NULL; 113 free(_datacopy); 114 } 115 if (NULL != _reply) { 116 free(_reply); 117 _reply = NULL; 118 read_reply = NULL; 119 } 120 if (NULL != read_reply) { 121 if (NULL != read_reply->data) { 122 free(read_reply->data); 123 read_reply->data = NULL; 124 } 125 delete read_reply; 126 read_reply = NULL; 127 } 128 } 129 130 CMS_SERVER_REMOTE_TCP_PORT::CMS_SERVER_REMOTE_TCP_PORT(CMS_SERVER * _cms_server): 131 CMS_SERVER_REMOTE_PORT(_cms_server) 132 { 133 client_ports = (LinkedList *) NULL; 134 connection_socket = 0; 135 connection_port = 0; 136 maxfdpl = 0; 137 dtimeout = 20.0; 138 139 memset(&server_socket_address, 0, sizeof(server_socket_address)); 140 server_socket_address.sin_family = AF_INET; 141 server_socket_address.sin_addr.s_addr = htonl(INADDR_ANY); 142 server_socket_address.sin_port = 0; 143 144 client_ports = new LinkedList; 145 if (NULL == client_ports) { 146 rcs_print_error("Can not create linked list for client ports.\n"); 147 return; 148 } 149 polling_enabled = 0; 150 memset(&select_timeout, 0, sizeof(select_timeout)); 151 select_timeout.tv_sec = 30; 152 select_timeout.tv_usec = 30; 153 subscription_buffers = NULL; 154 current_poll_interval_millis = 30000; 155 memset(&read_fd_set, 0, sizeof(read_fd_set)); 156 memset(&write_fd_set, 0, sizeof(write_fd_set)); 157 } 158 159 CMS_SERVER_REMOTE_TCP_PORT::~CMS_SERVER_REMOTE_TCP_PORT() 160 { 161 if (client_ports == NULL) return; 162 unregister_port(); 163 if (NULL != client_ports) { 164 delete client_ports; 165 client_ports = (LinkedList *) NULL; 166 } 167 } 168 169 void blocking_thread_kill(long int id) 170 { 171 172 if (id <= 0) { 173 return; 174 } 175 #ifdef POSIX_THREADS 176 pthread_kill(id, SIGINT); 177 pthread_join(id, NULL); 178 #endif 179 #ifdef NO_THREADS 180 kill(id, SIGINT); 181 waitpid(id, NULL, 0); 182 #endif 183 tcpsvr_threads_killed++; 184 } 185 186 void CMS_SERVER_REMOTE_TCP_PORT::unregister_port() 187 { 188 CLIENT_TCP_PORT *client; 189 int number_of_connected_clients = 0; 190 191 client = (CLIENT_TCP_PORT *) client_ports->get_head(); 192 while (NULL != client) { 193 rcs_print("Exiting even though client on %s is still connected.\n", 194 inet_ntoa(client->address.sin_addr)); 195 client = (CLIENT_TCP_PORT *) client_ports->get_next(); 196 number_of_connected_clients++; 197 } 198 client = (CLIENT_TCP_PORT *) client_ports->get_head(); 199 while (NULL != client) { 200 delete client; 201 client_ports->delete_current_node(); 202 client = (CLIENT_TCP_PORT *) client_ports->get_next(); 203 } 204 if (NULL != subscription_buffers) { 205 TCP_BUFFER_SUBSCRIPTION_INFO *sub_info = 206 (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head(); 207 while (NULL != sub_info) { 208 delete sub_info; 209 sub_info = (TCP_BUFFER_SUBSCRIPTION_INFO *) 210 subscription_buffers->get_next(); 211 } 212 delete subscription_buffers; 213 subscription_buffers = NULL; 214 } 215 if (number_of_connected_clients > 0) { 216 esleep(2.0); 217 } 218 if (connection_socket > 0) { 219 close(connection_socket); 220 connection_socket = 0; 221 } 222 } 223 224 int CMS_SERVER_REMOTE_TCP_PORT::accept_local_port_cms(CMS * _cms) 225 { 226 if (NULL == _cms) { 227 return 0; 228 } 229 if (_cms->remote_port_type != CMS_TCP_REMOTE_PORT_TYPE) { 230 return 0; 231 } 232 if (NULL != _cms) { 233 if (min_compatible_version < 1e-6 || 234 (min_compatible_version > _cms->min_compatible_version && 235 _cms->min_compatible_version > 1e-6)) { 236 min_compatible_version = _cms->min_compatible_version; 237 } 238 if (_cms->confirm_write) { 239 confirm_write = _cms->confirm_write; 240 } 241 } 242 if (_cms->total_subdivisions > max_total_subdivisions) { 243 max_total_subdivisions = _cms->total_subdivisions; 244 } 245 if (server_socket_address.sin_port == 0) { 246 server_socket_address.sin_port = 247 htons(((u_short) _cms->tcp_port_number)); 248 port_num = _cms->tcp_port_number; 249 return 1; 250 } 251 if (server_socket_address.sin_port == 252 htons(((u_short) _cms->tcp_port_number))) { 253 port_num = _cms->tcp_port_number; 254 return 1; 255 } 256 return 0; 257 } 258 259 void CMS_SERVER_REMOTE_TCP_PORT::register_port() 260 { 261 port_registered = 0; 262 rcs_print_debug(PRINT_CMS_CONFIG_INFO, 263 "Registering server on TCP port %d.\n", 264 ntohs(server_socket_address.sin_port)); 265 if (server_socket_address.sin_port == 0) { 266 rcs_print_error("server can not register on port number 0.\n"); 267 return; 268 } 269 if ((connection_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) { 270 rcs_print_error("socket error: %d -- %s\n", errno, strerror(errno)); 271 rcs_print_error("Server can not open stream socket.\n"); 272 return; 273 } 274 275 if (set_tcp_socket_options(connection_socket) < 0) { 276 return; 277 } 278 if (bind(connection_socket, (struct sockaddr *) &server_socket_address, 279 sizeof(server_socket_address)) < 0) { 280 rcs_print_error("bind error: %d -- %s\n", errno, strerror(errno)); 281 rcs_print_error 282 ("Server can not bind the connection socket on port %d.\n", 283 ntohs(server_socket_address.sin_port)); 284 return; 285 } 286 if (listen(connection_socket, 5) < 0) { 287 rcs_print_error("listen error: %d -- %s\n", errno, strerror(errno)); 288 rcs_print_error("TCP Server: error on call to listen for port %d.\n", 289 ntohs(server_socket_address.sin_port)); 290 return; 291 } 292 port_registered = 1; 293 294 } 295 296 static int last_pipe_signum = 0; 297 298 static void handle_pipe_error(int signum) 299 { 300 last_pipe_signum = signum; 301 rcs_print_error("SIGPIPE intercepted.\n"); 302 } 303 304 void CMS_SERVER_REMOTE_TCP_PORT::run() 305 { 306 int bytes_ready; 307 int ready_descriptors; 308 if (NULL == client_ports) { 309 rcs_print_error("CMS_SERVER: List of client ports is NULL.\n"); 310 return; 311 } 312 CLIENT_TCP_PORT *new_client_port, *client_port_to_check; 313 FD_ZERO(&read_fd_set); 314 FD_ZERO(&write_fd_set); 315 FD_SET(connection_socket, &read_fd_set); 316 maxfdpl = connection_socket + 1; 317 signal(SIGPIPE, handle_pipe_error); 318 rcs_print_debug(PRINT_CMS_CONFIG_INFO, 319 "running server for TCP port %d (connection_socket = %d).\n", 320 ntohs(server_socket_address.sin_port), connection_socket); 321 322 cms_server_count++; 323 fd_set read_fd_set_copy, write_fd_set_copy; 324 FD_ZERO(&read_fd_set_copy); 325 FD_ZERO(&write_fd_set_copy); 326 FD_SET(connection_socket, &read_fd_set_copy); 327 328 while (1) { 329 if (polling_enabled) { 330 memcpy(&read_fd_set_copy, &read_fd_set, sizeof(fd_set)); 331 memcpy(&write_fd_set_copy, &write_fd_set, sizeof(fd_set)); 332 select_timeout.tv_sec = current_poll_interval_millis / 1000; 333 select_timeout.tv_usec = 334 (current_poll_interval_millis % 1000) * 1000; 335 ready_descriptors = 336 select(maxfdpl, &read_fd_set, &write_fd_set, 337 (fd_set *) NULL, (timeval *) & select_timeout); 338 if (ready_descriptors == 0) { 339 update_subscriptions(); 340 memcpy(&read_fd_set, &read_fd_set_copy, sizeof(fd_set)); 341 memcpy(&write_fd_set, &write_fd_set_copy, sizeof(fd_set)); 342 continue; 343 } 344 } else { 345 ready_descriptors = 346 select(maxfdpl, &read_fd_set, &write_fd_set, 347 (fd_set *) NULL, (timeval *) NULL); 348 349 } 350 if (ready_descriptors < 0) { 351 rcs_print_error("server: select error.(errno = %d | %s)\n", 352 errno, strerror(errno)); 353 } 354 if (NULL == client_ports) { 355 rcs_print_error("CMS_SERVER: List of client ports is NULL.\n"); 356 return; 357 } 358 client_port_to_check = (CLIENT_TCP_PORT *) client_ports->get_head(); 359 while (NULL != client_port_to_check) { 360 if (FD_ISSET(client_port_to_check->socket_fd, &read_fd_set)) { 361 ioctl(client_port_to_check->socket_fd, FIONREAD, 362 (caddr_t) & bytes_ready); 363 if (bytes_ready <= 0) { 364 rcs_print_debug(PRINT_SOCKET_CONNECT, 365 "Socket closed by host with IP address %s.\n", 366 inet_ntoa(client_port_to_check->address.sin_addr)); 367 if (NULL != client_port_to_check->subscriptions) { 368 TCP_CLIENT_SUBSCRIPTION_INFO *clnt_sub_info = 369 (TCP_CLIENT_SUBSCRIPTION_INFO *) 370 client_port_to_check->subscriptions->get_head(); 371 while (NULL != clnt_sub_info) { 372 if (NULL != clnt_sub_info->sub_buf_info && 373 clnt_sub_info->subscription_list_id >= 0) { 374 if (NULL != 375 clnt_sub_info->sub_buf_info-> 376 sub_clnt_info) { 377 clnt_sub_info->sub_buf_info-> 378 sub_clnt_info-> 379 delete_node(clnt_sub_info-> 380 subscription_list_id); 381 if (clnt_sub_info->sub_buf_info-> 382 sub_clnt_info->list_size < 1) { 383 delete clnt_sub_info->sub_buf_info-> 384 sub_clnt_info; 385 clnt_sub_info->sub_buf_info-> 386 sub_clnt_info = NULL; 387 if (NULL != subscription_buffers 388 && clnt_sub_info->sub_buf_info-> 389 list_id >= 0) { 390 subscription_buffers-> 391 delete_node(clnt_sub_info-> 392 sub_buf_info->list_id); 393 delete clnt_sub_info-> 394 sub_buf_info; 395 clnt_sub_info->sub_buf_info = 396 NULL; 397 } 398 } 399 clnt_sub_info->sub_buf_info = NULL; 400 } 401 delete clnt_sub_info; 402 clnt_sub_info = 403 (TCP_CLIENT_SUBSCRIPTION_INFO *) 404 client_port_to_check->subscriptions-> 405 get_next(); 406 } 407 delete client_port_to_check->subscriptions; 408 client_port_to_check->subscriptions = NULL; 409 recalculate_polling_interval(); 410 } 411 } 412 if (client_port_to_check->threadId > 0 413 && client_port_to_check->blocking) { 414 blocking_thread_kill(client_port_to_check->threadId); 415 } 416 close(client_port_to_check->socket_fd); 417 FD_CLR(client_port_to_check->socket_fd, &read_fd_set); 418 client_port_to_check->socket_fd = -1; 419 delete client_port_to_check; 420 client_ports->delete_current_node(); 421 } else { 422 if (client_port_to_check->blocking) { 423 if (client_port_to_check->threadId > 0) { 424 rcs_print_debug(PRINT_SERVER_THREAD_ACTIVITY, 425 "Data received from %s:%d when it should be blocking (bytes_ready=%d).\n", 426 inet_ntoa 427 (client_port_to_check->address. 428 sin_addr), 429 client_port_to_check->socket_fd, bytes_ready); 430 rcs_print_debug(PRINT_SERVER_THREAD_ACTIVITY, 431 "Killing handler %d.\n", 432 client_port_to_check->threadId); 433 434 blocking_thread_kill 435 (client_port_to_check->threadId); 436 #if 0 437 *((uint32_t *) temp_buffer) = 438 htonl(client_port_to_check->serial_number); 439 *((uint32_t *) temp_buffer + 1) = 440 htonl((unsigned long) 441 CMS_SERVER_SIDE_ERROR); 442 putbe32(temp_buffer + 8, 0); /* size 443 */ 444 putbe32(temp_buffer + 12, 0); /* write_id 445 */ 446 putbe32(temp_buffer + 16, 0); /* was_read 447 */ 448 sendn(client_port_to_check->socket_fd, 449 temp_buffer, 20, 0, dtimeout); 450 #endif 451 client_port_to_check->threadId = 0; 452 client_port_to_check->blocking = 0; 453 } 454 } 455 handle_request(client_port_to_check); 456 } 457 ready_descriptors--; 458 } else { 459 FD_SET(client_port_to_check->socket_fd, &read_fd_set); 460 } 461 client_port_to_check = 462 (CLIENT_TCP_PORT *) client_ports->get_next(); 463 } 464 if (FD_ISSET(connection_socket, &read_fd_set) 465 && ready_descriptors > 0) { 466 ready_descriptors--; 467 socklen_t client_address_length; 468 new_client_port = new CLIENT_TCP_PORT(); 469 client_address_length = sizeof(new_client_port->address); 470 new_client_port->socket_fd = accept(connection_socket, 471 (struct sockaddr *) 472 &new_client_port->address, &client_address_length); 473 current_clients++; 474 if (current_clients > max_clients) { 475 max_clients = current_clients; 476 } 477 if (new_client_port->socket_fd < 0) { 478 rcs_print_error("server: accept error -- %d %s \n", errno, 479 strerror(errno)); 480 continue; 481 } 482 rcs_print_debug(PRINT_SOCKET_CONNECT, 483 "Socket opened by host with IP address %s.\n", 484 inet_ntoa(new_client_port->address.sin_addr)); 485 new_client_port->serial_number = 0; 486 new_client_port->blocking = 0; 487 if (NULL != client_ports) { 488 client_ports->store_at_tail(new_client_port, 489 sizeof(new_client_port), 0); 490 } 491 if (maxfdpl < new_client_port->socket_fd + 1) { 492 maxfdpl = new_client_port->socket_fd + 1; 493 } 494 FD_SET(new_client_port->socket_fd, &read_fd_set); 495 } else { 496 FD_SET(connection_socket, &read_fd_set); 497 } 498 if (0 != ready_descriptors) { 499 rcs_print_error("%d descriptors ready but not serviced.\n", 500 ready_descriptors); 501 } 502 update_subscriptions(); 503 } 504 } 505 506 static int tcpsvr_handle_blocking_request_sigint_count = 0; 507 static int tcpsvr_last_sig = 0; 508 509 void tcpsvr_handle_blocking_request_sigint_handler(int sig) 510 { 511 tcpsvr_last_sig = sig; 512 tcpsvr_handle_blocking_request_sigint_count++; 513 } 514 515 static void putbe32(char *addr, uint32_t val) { 516 val = htonl(val); 517 memcpy(addr, &val, sizeof(val)); 518 } 519 520 static uint32_t getbe32(char *addr) { 521 uint32_t val; 522 memcpy(&val, addr, sizeof(val)); 523 return ntohl(val); 524 } 525 526 #if defined(POSIX_THREADS) || defined(NO_THREADS) 527 void *tcpsvr_handle_blocking_request(void *_req) 528 { 529 signal(SIGINT, tcpsvr_handle_blocking_request_sigint_handler); 530 TCPSVR_BLOCKING_READ_REQUEST *blocking_read_req = 531 (TCPSVR_BLOCKING_READ_REQUEST *) _req; 532 char temp_buffer[0x2000]; 533 if (_req == NULL) { 534 tcpsvr_threads_returned_early++; 535 return 0; 536 } 537 double dtimeout = 538 ((double) (blocking_read_req->timeout_millis + 10)) / 1000.0; 539 if (dtimeout < 0) { 540 dtimeout = 600.0; 541 } 542 if (dtimeout < 0.5) { 543 dtimeout = 0.5; 544 } 545 if (dtimeout > 600.0) { 546 dtimeout = 600.0; 547 } 548 CLIENT_TCP_PORT *_client_tcp_port = blocking_read_req->_client_tcp_port; 549 CMS_SERVER *server = blocking_read_req->server; 550 551 if (NULL == server || NULL == _client_tcp_port) { 552 tcpsvr_threads_returned_early++; 553 return 0; 554 } 555 memset(temp_buffer, 0, 0x2000); 556 REMOTE_BLOCKING_READ_REPLY *read_reply; 557 558 if (NULL != _client_tcp_port->diag_info) { 559 _client_tcp_port->diag_info->buffer_number = 560 blocking_read_req->buffer_number; 561 server->set_diag_info(_client_tcp_port->diag_info); 562 } else if (server->diag_enabled) { 563 server->reset_diag_info(blocking_read_req->buffer_number); 564 } 565 566 read_reply = (REMOTE_BLOCKING_READ_REPLY *) 567 server->process_request(blocking_read_req); 568 blocking_read_req->read_reply = read_reply; 569 if (NULL == read_reply) { 570 _client_tcp_port->blocking = 0; 571 rcs_print_error("Server could not process request.\n"); 572 putbe32(temp_buffer, _client_tcp_port->serial_number); 573 putbe32(temp_buffer + 4, CMS_SERVER_SIDE_ERROR); 574 putbe32(temp_buffer + 8, 0); /* size */ 575 putbe32(temp_buffer + 12, 0) /* write_id */; 576 putbe32(temp_buffer + 16, 0) /* was_read */; 577 sendn(_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout); 578 _client_tcp_port->errors++; 579 _client_tcp_port->blocking_read_req = NULL; 580 delete blocking_read_req; 581 _client_tcp_port->threadId = 0; 582 tcpsvr_threads_returned_early++; 583 return 0; 584 } 585 putbe32(temp_buffer, _client_tcp_port->serial_number); 586 putbe32(temp_buffer + 4, read_reply->status); 587 putbe32(temp_buffer + 8, read_reply->size); 588 putbe32(temp_buffer + 12, read_reply->write_id); 589 putbe32(temp_buffer + 16, read_reply->was_read); 590 if (read_reply->size < (0x2000 - 20) && read_reply->size > 0) { 591 memcpy(temp_buffer + 20, read_reply->data, read_reply->size); 592 _client_tcp_port->blocking = 0; 593 if (sendn 594 (_client_tcp_port->socket_fd, temp_buffer, 20 + read_reply->size, 595 0, dtimeout) < 0) { 596 _client_tcp_port->blocking = 0; 597 _client_tcp_port->errors++; 598 _client_tcp_port->blocking_read_req = NULL; 599 delete blocking_read_req; 600 _client_tcp_port->threadId = 0; 601 tcpsvr_threads_returned_early++; 602 return 0; 603 } 604 } else { 605 _client_tcp_port->blocking = 0; 606 if (sendn(_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout) < 607 0) { 608 _client_tcp_port->blocking = 0; 609 _client_tcp_port->errors++; 610 _client_tcp_port->blocking_read_req = NULL; 611 delete blocking_read_req; 612 _client_tcp_port->threadId = 0; 613 tcpsvr_threads_returned_early++; 614 return 0; 615 } 616 if (read_reply->size > 0) { 617 if (sendn 618 (_client_tcp_port->socket_fd, read_reply->data, 619 read_reply->size, 0, dtimeout) < 0) { 620 _client_tcp_port->blocking = 0; 621 _client_tcp_port->errors++; 622 _client_tcp_port->blocking_read_req = NULL; 623 delete blocking_read_req; 624 _client_tcp_port->threadId = 0; 625 tcpsvr_threads_returned_early++; 626 return 0; 627 } 628 } 629 } 630 _client_tcp_port->blocking_read_req = NULL; 631 delete blocking_read_req; 632 _client_tcp_port->threadId = 0; 633 tcpsvr_threads_exited++; 634 #ifdef POSIX_THREADS 635 pthread_exit(0); 636 #endif 637 #ifdef NO_THREADS 638 exit(0); 639 #endif 640 } 641 642 #endif 643 644 void CMS_SERVER_REMOTE_TCP_PORT::handle_request(CLIENT_TCP_PORT * 645 _client_tcp_port) 646 { 647 CLIENT_TCP_PORT *client_port_to_check = NULL; 648 pid_t pid = getpid(); 649 pid_t tid = 0; 650 CMS_SERVER *server; 651 server = find_server(pid, tid); 652 if (NULL == server) { 653 rcs_print_error 654 ("CMS_SERVER_REMOTE_TCP_PORT::handle_request() Cannot find server object for pid = %d.\n", 655 pid); 656 return; 657 } 658 659 if (server->using_passwd_file) { 660 current_user_info = get_connected_user(_client_tcp_port->socket_fd); 661 } 662 663 if (_client_tcp_port->errors >= _client_tcp_port->max_errors) { 664 rcs_print_error("Too many errors - closing connection(%d)\n", 665 _client_tcp_port->socket_fd); 666 client_port_to_check = (CLIENT_TCP_PORT *) client_ports->get_head(); 667 while (NULL != client_port_to_check) { 668 if (client_port_to_check->socket_fd == 669 _client_tcp_port->socket_fd) { 670 delete client_port_to_check; 671 client_ports->delete_current_node(); 672 } 673 client_port_to_check = 674 (CLIENT_TCP_PORT *) client_ports->get_next(); 675 } 676 close(_client_tcp_port->socket_fd); 677 current_clients--; 678 FD_CLR(_client_tcp_port->socket_fd, &read_fd_set); 679 _client_tcp_port->socket_fd = -1; 680 } 681 682 if (recvn(_client_tcp_port->socket_fd, temp_buffer, 20, 0, -1, NULL) < 0) { 683 rcs_print_error("Can not read from client port (%d) from %s\n", 684 _client_tcp_port->socket_fd, 685 inet_ntoa(_client_tcp_port->address.sin_addr)); 686 _client_tcp_port->errors++; 687 return; 688 } 689 long request_type, buffer_number, received_serial_number; 690 received_serial_number = getbe32(temp_buffer); 691 if (received_serial_number != _client_tcp_port->serial_number) { 692 rcs_print_error 693 ("received_serial_number (%ld) does not equal expected serial number.(%ld)\n", 694 received_serial_number, _client_tcp_port->serial_number); 695 _client_tcp_port->serial_number = received_serial_number; 696 _client_tcp_port->errors++; 697 } 698 _client_tcp_port->serial_number++; 699 request_type = ntohl(*((uint32_t *) temp_buffer + 1)); 700 buffer_number = ntohl(*((uint32_t *) temp_buffer + 2)); 701 702 rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, 703 "TCPSVR request received: fd = %d, serial_number=%ld, request_type=%ld, buffer_number=%ld\n", 704 _client_tcp_port->socket_fd, 705 _client_tcp_port->serial_number, request_type, buffer_number); 706 707 if (NULL != _client_tcp_port->diag_info) { 708 _client_tcp_port->diag_info->buffer_number = buffer_number; 709 server->set_diag_info(_client_tcp_port->diag_info); 710 } else if (server->diag_enabled) { 711 server->reset_diag_info(buffer_number); 712 } 713 714 switch_function(_client_tcp_port, 715 server, request_type, buffer_number, received_serial_number); 716 717 if (NULL != _client_tcp_port->diag_info && 718 NULL != server->last_local_port_used && server->diag_enabled) { 719 if (NULL != server->last_local_port_used->cms) { 720 if (NULL != 721 server->last_local_port_used->cms->handle_to_global_data) { 722 _client_tcp_port->diag_info->bytes_moved = 723 server->last_local_port_used->cms->handle_to_global_data-> 724 total_bytes_moved; 725 } 726 } 727 } 728 } 729 730 void CMS_SERVER_REMOTE_TCP_PORT::switch_function(CLIENT_TCP_PORT * 731 _client_tcp_port, 732 CMS_SERVER * server, 733 long request_type, long buffer_number, long received_serial_number) 734 { 735 int total_subdivisions = 1; 736 CLIENT_TCP_PORT *client_port_to_check = NULL; 737 switch (request_type) { 738 case REMOTE_CMS_SET_DIAG_INFO_REQUEST_TYPE: 739 { 740 if (NULL == _client_tcp_port->diag_info) { 741 _client_tcp_port->diag_info = 742 new REMOTE_SET_DIAG_INFO_REQUEST(); 743 } 744 if (recvn 745 (_client_tcp_port->socket_fd, server->set_diag_info_buf, 68, 746 0, -1, NULL) < 0) { 747 rcs_print_error 748 ("Can not read from client port (%d) from %s\n", 749 _client_tcp_port->socket_fd, 750 inet_ntoa(_client_tcp_port->address.sin_addr)); 751 _client_tcp_port->errors++; 752 return; 753 } 754 _client_tcp_port->diag_info->bytes_moved = 0.0; 755 _client_tcp_port->diag_info->buffer_number = buffer_number; 756 memcpy(_client_tcp_port->diag_info->process_name, 757 server->set_diag_info_buf, 16); 758 memcpy(_client_tcp_port->diag_info->host_sysinfo, 759 server->set_diag_info_buf + 16, 32); 760 _client_tcp_port->diag_info->pid = 761 htonl(*((uint32_t *) (server->set_diag_info_buf + 48))); 762 _client_tcp_port->diag_info->c_num = 763 htonl(*((uint32_t *) (server->set_diag_info_buf + 52))); 764 memcpy(&(_client_tcp_port->diag_info->rcslib_ver), 765 server->set_diag_info_buf + 56, 8); 766 _client_tcp_port->diag_info->reverse_flag = 767 *((int *) ((char *) server->set_diag_info_buf + 64)); 768 if (_client_tcp_port->diag_info->reverse_flag == 0x44332211) { 769 _client_tcp_port->diag_info->rcslib_ver = 770 (double) tcp_svr_reverse_double((double) 771 _client_tcp_port->diag_info->rcslib_ver); 772 } 773 } 774 break; 775 776 case REMOTE_CMS_GET_DIAG_INFO_REQUEST_TYPE: 777 { 778 REMOTE_GET_DIAG_INFO_REQUEST diagreq; 779 diagreq.buffer_number = buffer_number; 780 REMOTE_GET_DIAG_INFO_REPLY *diagreply = NULL; 781 diagreply = 782 (REMOTE_GET_DIAG_INFO_REPLY *) server-> 783 process_request(&diagreq); 784 if (NULL == diagreply) { 785 putbe32(temp_buffer, _client_tcp_port->serial_number); 786 putbe32(temp_buffer+4, CMS_SERVER_SIDE_ERROR); 787 if (sendn 788 (_client_tcp_port->socket_fd, temp_buffer, 24, 0, 789 dtimeout) < 0) { 790 _client_tcp_port->errors++; 791 } 792 return; 793 } 794 if (NULL == diagreply->cdi) { 795 putbe32(temp_buffer, _client_tcp_port->serial_number); 796 putbe32(temp_buffer + 4, CMS_SERVER_SIDE_ERROR); 797 if (sendn 798 (_client_tcp_port->socket_fd, temp_buffer, 24, 0, 799 dtimeout) < 0) { 800 _client_tcp_port->errors++; 801 } 802 return; 803 } 804 memset(temp_buffer, 0, 0x2000); 805 unsigned long dpi_offset = 32; 806 putbe32(temp_buffer, _client_tcp_port->serial_number); 807 putbe32(temp_buffer + 4, diagreply->status); 808 putbe32(temp_buffer + 8, diagreply->cdi->last_writer); 809 putbe32(temp_buffer + 12, diagreply->cdi->last_reader); 810 double curtime = etime(); 811 double reversed_temp = 0.0; 812 if (_client_tcp_port->diag_info->reverse_flag == 0x44332211) { 813 reversed_temp = 814 (double) tcp_svr_reverse_double((double) curtime); 815 memcpy(temp_buffer + 16, &reversed_temp, 8); 816 } else { 817 memcpy(temp_buffer + 16, &(curtime), 8); 818 } 819 int dpi_count = 0; 820 if (NULL != diagreply->cdi->dpis) { 821 CMS_DIAG_PROC_INFO *dpi = 822 (CMS_DIAG_PROC_INFO *) diagreply->cdi->dpis->get_head(); 823 while ((dpi_offset < 824 ((int) 0x2000 - sizeof(CMS_DIAG_PROC_INFO))) 825 && dpi != NULL) { 826 dpi_count++; 827 memcpy(temp_buffer + dpi_offset, dpi->name, 16); 828 dpi_offset += 16; 829 memcpy(temp_buffer + dpi_offset, dpi->host_sysinfo, 32); 830 dpi_offset += 32; 831 *((uint32_t *) ((char *) temp_buffer + dpi_offset)) = 832 htonl(dpi->pid); 833 dpi_offset += 4; 834 if (_client_tcp_port->diag_info->reverse_flag == 835 0x44332211) { 836 reversed_temp = 837 (double) tcp_svr_reverse_double((double) 838 dpi->rcslib_ver); 839 memcpy(temp_buffer + dpi_offset, &reversed_temp, 8); 840 } else { 841 memcpy(temp_buffer + dpi_offset, &(dpi->rcslib_ver), 842 8); 843 } 844 dpi_offset += 8; 845 *((uint32_t *) ((char *) temp_buffer + dpi_offset)) = 846 htonl(dpi->access_type); 847 dpi_offset += 4; 848 *((uint32_t *) ((char *) temp_buffer + dpi_offset)) = 849 htonl(dpi->msg_id); 850 dpi_offset += 4; 851 *((uint32_t *) ((char *) temp_buffer + dpi_offset)) = 852 htonl(dpi->msg_size); 853 dpi_offset += 4; 854 *((uint32_t *) ((char *) temp_buffer + dpi_offset)) = 855 htonl(dpi->msg_type); 856 dpi_offset += 4; 857 *((uint32_t *) ((char *) temp_buffer + dpi_offset)) = 858 htonl(dpi->number_of_accesses); 859 dpi_offset += 4; 860 *((uint32_t *) ((char *) temp_buffer + dpi_offset)) = 861 htonl(dpi->number_of_new_messages); 862 dpi_offset += 4; 863 if (_client_tcp_port->diag_info->reverse_flag == 864 0x44332211) { 865 reversed_temp = 866 (double) tcp_svr_reverse_double((double) 867 dpi->bytes_moved); 868 memcpy(temp_buffer + dpi_offset, &reversed_temp, 8); 869 } else { 870 memcpy(temp_buffer + dpi_offset, &(dpi->bytes_moved), 871 8); 872 } 873 dpi_offset += 8; 874 if (_client_tcp_port->diag_info->reverse_flag == 875 0x44332211) { 876 reversed_temp = 877 (double) tcp_svr_reverse_double((double) 878 dpi->bytes_moved_across_socket); 879 memcpy(temp_buffer + dpi_offset, &reversed_temp, 8); 880 } else { 881 memcpy(temp_buffer + dpi_offset, 882 &(dpi->bytes_moved_across_socket), 8); 883 } 884 dpi_offset += 8; 885 if (_client_tcp_port->diag_info->reverse_flag == 886 0x44332211) { 887 reversed_temp = 888 (double) tcp_svr_reverse_double((double) 889 dpi->last_access_time); 890 memcpy(temp_buffer + dpi_offset, &reversed_temp, 8); 891 } else { 892 memcpy(temp_buffer + dpi_offset, 893 &(dpi->last_access_time), 8); 894 } 895 dpi_offset += 8; 896 if (_client_tcp_port->diag_info->reverse_flag == 897 0x44332211) { 898 reversed_temp = 899 (double) tcp_svr_reverse_double((double) 900 dpi->first_access_time); 901 memcpy(temp_buffer + dpi_offset, &reversed_temp, 8); 902 } else { 903 memcpy(temp_buffer + dpi_offset, 904 &(dpi->first_access_time), 8); 905 } 906 dpi_offset += 8; 907 if (_client_tcp_port->diag_info->reverse_flag == 908 0x44332211) { 909 reversed_temp = 910 (double) tcp_svr_reverse_double((double) 911 dpi->min_difference); 912 memcpy(temp_buffer + dpi_offset, &reversed_temp, 8); 913 } else { 914 memcpy(temp_buffer + dpi_offset, 915 &(dpi->min_difference), 8); 916 } 917 dpi_offset += 8; 918 if (_client_tcp_port->diag_info->reverse_flag == 919 0x44332211) { 920 reversed_temp = 921 (double) tcp_svr_reverse_double((double) 922 dpi->max_difference); 923 memcpy(temp_buffer + dpi_offset, &reversed_temp, 8); 924 } else { 925 memcpy(temp_buffer + dpi_offset, 926 &(dpi->max_difference), 8); 927 } 928 dpi_offset += 8; 929 int is_last_writer = 930 (dpi == diagreply->cdi->last_writer_dpi); 931 *((uint32_t *) ((char *) temp_buffer + dpi_offset)) = 932 htonl(is_last_writer); 933 dpi_offset += 4; 934 int is_last_reader = 935 (dpi == diagreply->cdi->last_reader_dpi); 936 *((uint32_t *) ((char *) temp_buffer + dpi_offset)) = 937 htonl(is_last_reader); 938 dpi_offset += 4; 939 dpi = 940 (CMS_DIAG_PROC_INFO *) diagreply->cdi->dpis-> 941 get_next(); 942 } 943 } 944 *((uint32_t *) temp_buffer + 6) = htonl(dpi_count); 945 *((uint32_t *) temp_buffer + 7) = htonl(dpi_offset); 946 if (sendn 947 (_client_tcp_port->socket_fd, temp_buffer, dpi_offset, 0, 948 dtimeout) < 0) { 949 _client_tcp_port->errors++; 950 return; 951 } 952 } 953 break; 954 955 case REMOTE_CMS_GET_BUF_NAME_REQUEST_TYPE: 956 { 957 REMOTE_GET_BUF_NAME_REQUEST namereq; 958 namereq.buffer_number = buffer_number; 959 REMOTE_GET_BUF_NAME_REPLY *namereply = NULL; 960 namereply = 961 (REMOTE_GET_BUF_NAME_REPLY *) server-> 962 process_request(&namereq); 963 memset(temp_buffer, 0, 40); 964 if (NULL != namereply) { 965 putbe32(temp_buffer, _client_tcp_port->serial_number); 966 putbe32(temp_buffer + 4, namereply->status); 967 strncpy(temp_buffer + 8, namereply->name, 31); 968 if (sendn 969 (_client_tcp_port->socket_fd, temp_buffer, 40, 0, 970 dtimeout) < 0) { 971 _client_tcp_port->errors++; 972 return; 973 } 974 } else { 975 putbe32(temp_buffer, _client_tcp_port->serial_number); 976 putbe32(temp_buffer + 4, CMS_SERVER_SIDE_ERROR); 977 if (sendn 978 (_client_tcp_port->socket_fd, temp_buffer, 40, 0, 979 dtimeout) < 0) { 980 _client_tcp_port->errors++; 981 return; 982 } 983 } 984 } 985 break; 986 987 case REMOTE_CMS_BLOCKING_READ_REQUEST_TYPE: 988 { 989 TCPSVR_BLOCKING_READ_REQUEST *blocking_read_req; 990 991 #ifdef NO_THREADS 992 if (NULL == _client_tcp_port->blocking_read_req) { 993 _client_tcp_port->blocking_read_req = 994 new TCPSVR_BLOCKING_READ_REQUEST(); 995 } 996 blocking_read_req = _client_tcp_port->blocking_read_req; 997 #else 998 blocking_read_req; 999 = new TCPSVR_BLOCKING_READ_REQUEST(); 1000 #endif 1001 blocking_read_req->buffer_number = buffer_number; 1002 blocking_read_req->access_type = 1003 ntohl(*((uint32_t *) temp_buffer + 3)); 1004 blocking_read_req->last_id_read = 1005 ntohl(*((uint32_t *) temp_buffer + 4)); 1006 total_subdivisions = 1; 1007 if (max_total_subdivisions > 1) { 1008 total_subdivisions = 1009 server->get_total_subdivisions(buffer_number); 1010 } 1011 if (total_subdivisions > 1) { 1012 if (recvn 1013 (_client_tcp_port->socket_fd, 1014 (char *) (((uint32_t *) temp_buffer) + 5), 8, 0, -1, 1015 NULL) < 0) { 1016 rcs_print_error 1017 ("Can not read from client port (%d) from %s\n", 1018 _client_tcp_port->socket_fd, 1019 inet_ntoa(_client_tcp_port->address.sin_addr)); 1020 _client_tcp_port->errors++; 1021 return; 1022 } 1023 blocking_read_req->subdiv = 1024 ntohl(*((uint32_t *) temp_buffer + 6)); 1025 } else { 1026 if (recvn 1027 (_client_tcp_port->socket_fd, 1028 (char *) (((uint32_t *) temp_buffer) + 5), 4, 0, -1, 1029 NULL) < 0) { 1030 rcs_print_error 1031 ("Can not read from client port (%d) from %s\n", 1032 _client_tcp_port->socket_fd, 1033 inet_ntoa(_client_tcp_port->address.sin_addr)); 1034 _client_tcp_port->errors++; 1035 return; 1036 } 1037 } 1038 blocking_read_req->timeout_millis = 1039 ntohl(*((uint32_t *) temp_buffer + 5)); 1040 blocking_read_req->server = server; 1041 blocking_read_req->remport = this; 1042 _client_tcp_port->blocking = 1; 1043 blocking_read_req->_client_tcp_port = _client_tcp_port; 1044 #ifdef POSIX_THREADS 1045 int thr_retval = pthread_create(&(_client_tcp_port->threadId), /* ptr to new-thread-id */ 1046 NULL, // pthread_attr_t *, ptr to attributes 1047 tcpsvr_handle_blocking_request, // start_func 1048 blocking_read_req // arg for start_func 1049 ); 1050 if (thr_retval != 0) { 1051 _client_tcp_port->blocking = 0; 1052 rcs_print_error("pthread_create error: thr_retval = %d\n", 1053 thr_retval); 1054 rcs_print_error("pthread_create error: %d %s\n", errno, 1055 strerror(errno)); 1056 *((uint32_t *) temp_buffer) = 1057 htonl(_client_tcp_port->serial_number); 1058 *((uint32_t *) temp_buffer + 1) = 1059 htonl((unsigned long) CMS_SERVER_SIDE_ERROR); 1060 putbe32(temp_buffer + 8, 0); /* size */ 1061 putbe32(temp_buffer + 12, 0); /* write_id */ 1062 putbe32(temp_buffer + 16, 0); /* was_read */ 1063 sendn(_client_tcp_port->socket_fd, temp_buffer, 20, 0, 1064 dtimeout); 1065 return; 1066 } 1067 #else 1068 #ifdef NO_THREADS 1069 int fork_ret = fork(); 1070 switch (fork_ret) { 1071 case 0: // child 1072 _client_tcp_port->threadId = getpid(); 1073 tcpsvr_handle_blocking_request(blocking_read_req); 1074 exit(0); 1075 break; 1076 1077 case -1: // Error 1078 rcs_print_error("fork error: %d %s\n", errno, 1079 strerror(errno)); 1080 _client_tcp_port->blocking = 0; 1081 putbe32(temp_buffer, _client_tcp_port->serial_number); 1082 putbe32(temp_buffer + 4, CMS_SERVER_SIDE_ERROR); 1083 putbe32(temp_buffer + 8, 0); 1084 putbe32(temp_buffer + 12, 0); 1085 putbe32(temp_buffer + 16, 0); 1086 sendn(_client_tcp_port->socket_fd, temp_buffer, 20, 0, 1087 dtimeout); 1088 break; 1089 1090 default: // parent; 1091 _client_tcp_port->threadId = fork_ret; 1092 break; 1093 } 1094 #else 1095 rcs_print_error 1096 ("Blocking read not supported on this platform.\n"); 1097 *((uint32_t *) temp_buffer) = 1098 htonl(_client_tcp_port->serial_number); 1099 *((uint32_t *) temp_buffer + 1) = 1100 htonl((unsigned long) CMS_SERVER_SIDE_ERROR); 1101 putbe32(temp_buffer + 8, 0); /* size */ 1102 putbe32(temp_buffer + 12, 0); /* write_id */ 1103 putbe32(temp_buffer + 16, 0); /* was_read */ 1104 sendn(_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout); 1105 return; 1106 1107 #endif 1108 #endif 1109 tcpsvr_threads_created++; 1110 } 1111 break; 1112 1113 case REMOTE_CMS_READ_REQUEST_TYPE: 1114 server->read_req.buffer_number = buffer_number; 1115 server->read_req.access_type = ntohl(*((uint32_t *) temp_buffer + 3)); 1116 server->read_req.last_id_read = ntohl(*((uint32_t *) temp_buffer + 4)); 1117 server->read_reply = 1118 (REMOTE_READ_REPLY *) server->process_request(&server->read_req); 1119 if (max_total_subdivisions > 1) { 1120 total_subdivisions = 1121 server->get_total_subdivisions(buffer_number); 1122 } 1123 if (total_subdivisions > 1) { 1124 if (recvn 1125 (_client_tcp_port->socket_fd, 1126 (char *) (((uint32_t *) temp_buffer) + 5), 4, 0, -1, 1127 NULL) < 0) { 1128 rcs_print_error 1129 ("Can not read from client port (%d) from %s\n", 1130 _client_tcp_port->socket_fd, 1131 inet_ntoa(_client_tcp_port->address.sin_addr)); 1132 _client_tcp_port->errors++; 1133 return; 1134 } 1135 server->read_req.subdiv = ntohl(*((uint32_t *) temp_buffer + 5)); 1136 } else { 1137 server->read_req.subdiv = 0; 1138 } 1139 if (NULL == server->read_reply) { 1140 rcs_print_error("Server could not process request.\n"); 1141 putbe32(temp_buffer, _client_tcp_port->serial_number); 1142 putbe32(temp_buffer + 4, CMS_SERVER_SIDE_ERROR); 1143 putbe32(temp_buffer + 8, 0); 1144 putbe32(temp_buffer + 12, 0); 1145 putbe32(temp_buffer + 16, 0); 1146 sendn(_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout); 1147 return; 1148 } 1149 putbe32(temp_buffer, _client_tcp_port->serial_number); 1150 putbe32(temp_buffer + 4, server->read_reply->status); 1151 putbe32(temp_buffer + 8, server->read_reply->size); 1152 putbe32(temp_buffer + 12, server->read_reply->write_id); 1153 putbe32(temp_buffer + 16, server->read_reply->was_read); 1154 if (server->read_reply->size < (0x2000 - 20) 1155 && server->read_reply->size > 0) { 1156 memcpy(temp_buffer + 20, server->read_reply->data, 1157 server->read_reply->size); 1158 if (sendn 1159 (_client_tcp_port->socket_fd, temp_buffer, 1160 20 + server->read_reply->size, 0, dtimeout) < 0) { 1161 _client_tcp_port->errors++; 1162 return; 1163 } 1164 } else { 1165 if (sendn 1166 (_client_tcp_port->socket_fd, temp_buffer, 20, 0, 1167 dtimeout) < 0) { 1168 _client_tcp_port->errors++; 1169 return; 1170 } 1171 if (server->read_reply->size > 0) { 1172 if (sendn 1173 (_client_tcp_port->socket_fd, server->read_reply->data, 1174 server->read_reply->size, 0, dtimeout) < 0) { 1175 _client_tcp_port->errors++; 1176 return; 1177 } 1178 } 1179 } 1180 break; 1181 1182 case REMOTE_CMS_WRITE_REQUEST_TYPE: 1183 server->write_req.buffer_number = buffer_number; 1184 server->write_req.access_type = ntohl(*((uint32_t *) temp_buffer + 3)); 1185 server->write_req.size = ntohl(*((uint32_t *) temp_buffer + 4)); 1186 total_subdivisions = 1; 1187 if (max_total_subdivisions > 1) { 1188 total_subdivisions = 1189 server->get_total_subdivisions(buffer_number); 1190 } 1191 if (total_subdivisions > 1) { 1192 if (recvn 1193 (_client_tcp_port->socket_fd, 1194 (char *) (((uint32_t *) temp_buffer) + 5), 4, 0, -1, 1195 NULL) < 0) { 1196 rcs_print_error 1197 ("Can not read from client port (%d) from %s\n", 1198 _client_tcp_port->socket_fd, 1199 inet_ntoa(_client_tcp_port->address.sin_addr)); 1200 _client_tcp_port->errors++; 1201 return; 1202 } 1203 server->write_req.subdiv = ntohl(*((uint32_t *) temp_buffer + 5)); 1204 } else { 1205 server->write_req.subdiv = 0; 1206 } 1207 if (server->write_req.size > 0) { 1208 if (recvn 1209 (_client_tcp_port->socket_fd, server->write_req.data, 1210 server->write_req.size, 0, -1, NULL) < 0) { 1211 _client_tcp_port->errors++; 1212 return; 1213 } 1214 } 1215 REMOTE_WRITE_REPLY *reply; 1216 server->write_reply = reply = 1217 (REMOTE_WRITE_REPLY *) server->process_request(&server-> 1218 write_req); 1219 if (((min_compatible_version < 2.58) && (min_compatible_version > 1e-6)) || server->write_reply->confirm_write) { 1220 if (NULL == server->write_reply) { 1221 rcs_print_error("Server could not process request.\n"); 1222 putbe32(temp_buffer, reply->write_id); 1223 putbe32(temp_buffer + 4, CMS_SERVER_SIDE_ERROR); 1224 putbe32(temp_buffer + 8, 0); /* was_read */ 1225 sendn(_client_tcp_port->socket_fd, temp_buffer, 12, 0, 1226 dtimeout); 1227 return; 1228 } 1229 putbe32(temp_buffer, reply->write_id); 1230 putbe32(temp_buffer + 4, reply->status); 1231 putbe32(temp_buffer + 8, reply->was_read); 1232 if (sendn 1233 (_client_tcp_port->socket_fd, temp_buffer, 12, 0, 1234 dtimeout) < 0) { 1235 _client_tcp_port->errors++; 1236 } 1237 } else { 1238 if (NULL == server->write_reply) { 1239 rcs_print_error("Server could not process request.\n"); 1240 } 1241 } 1242 break; 1243 1244 case REMOTE_CMS_CHECK_IF_READ_REQUEST_TYPE: 1245 server->check_if_read_req.buffer_number = buffer_number; 1246 server->check_if_read_req.subdiv = 1247 ntohl(*((uint32_t *) temp_buffer + 3)); 1248 server->check_if_read_reply = 1249 (REMOTE_CHECK_IF_READ_REPLY *) server->process_request(&server-> 1250 check_if_read_req); 1251 if (NULL == server->check_if_read_reply) { 1252 rcs_print_error("Server could not process request.\n"); 1253 putbe32(temp_buffer, _client_tcp_port->serial_number); 1254 putbe32(temp_buffer + 4, CMS_SERVER_SIDE_ERROR); 1255 putbe32(temp_buffer + 8, 0); /* was_read */ 1256 sendn(_client_tcp_port->socket_fd, temp_buffer, 12, 0, dtimeout); 1257 return; 1258 } 1259 putbe32(temp_buffer, _client_tcp_port->serial_number); 1260 *((uint32_t *) temp_buffer + 1) = 1261 htonl(server->check_if_read_reply->status); 1262 *((uint32_t *) temp_buffer + 2) = 1263 htonl(server->check_if_read_reply->was_read); 1264 if (sendn(_client_tcp_port->socket_fd, temp_buffer, 12, 0, dtimeout) < 1265 0) { 1266 _client_tcp_port->errors++; 1267 } 1268 break; 1269 1270 case REMOTE_CMS_GET_MSG_COUNT_REQUEST_TYPE: 1271 server->get_msg_count_req.buffer_number = buffer_number; 1272 server->get_msg_count_req.subdiv = 1273 ntohl(*((uint32_t *) temp_buffer + 3)); 1274 server->get_msg_count_reply = 1275 (REMOTE_GET_MSG_COUNT_REPLY *) server->process_request(&server-> 1276 get_msg_count_req); 1277 if (NULL == server->get_msg_count_reply) { 1278 rcs_print_error("Server could not process request.\n"); 1279 putbe32(temp_buffer, _client_tcp_port->serial_number); 1280 putbe32(temp_buffer + 4, CMS_SERVER_SIDE_ERROR); 1281 putbe32(temp_buffer + 8, 0); /* was_read */ 1282 sendn(_client_tcp_port->socket_fd, temp_buffer, 12, 0, dtimeout); 1283 return; 1284 } 1285 putbe32(temp_buffer, _client_tcp_port->serial_number); 1286 *((uint32_t *) temp_buffer + 1) = 1287 htonl(server->get_msg_count_reply->status); 1288 *((uint32_t *) temp_buffer + 2) = 1289 htonl(server->get_msg_count_reply->count); 1290 if (sendn(_client_tcp_port->socket_fd, temp_buffer, 12, 0, dtimeout) < 1291 0) { 1292 _client_tcp_port->errors++; 1293 } 1294 break; 1295 1296 case REMOTE_CMS_GET_QUEUE_LENGTH_REQUEST_TYPE: 1297 server->get_queue_length_req.buffer_number = buffer_number; 1298 server->get_queue_length_req.subdiv = 1299 ntohl(*((uint32_t *) temp_buffer + 3)); 1300 server->get_queue_length_reply = 1301 (REMOTE_GET_QUEUE_LENGTH_REPLY *) server-> 1302 process_request(&server->get_queue_length_req); 1303 if (NULL == server->get_queue_length_reply) { 1304 rcs_print_error("Server could not process request.\n"); 1305 putbe32(temp_buffer, _client_tcp_port->serial_number); 1306 putbe32(temp_buffer + 4, CMS_SERVER_SIDE_ERROR); 1307 putbe32(temp_buffer + 8, 0); /* was_read */ 1308 sendn(_client_tcp_port->socket_fd, temp_buffer, 12, 0, dtimeout); 1309 return; 1310 } 1311 putbe32(temp_buffer, _client_tcp_port->serial_number); 1312 *((uint32_t *) temp_buffer + 1) = 1313 htonl(server->get_queue_length_reply->status); 1314 *((uint32_t *) temp_buffer + 2) = 1315 htonl(server->get_queue_length_reply->queue_length); 1316 if (sendn(_client_tcp_port->socket_fd, temp_buffer, 12, 0, dtimeout) < 1317 0) { 1318 _client_tcp_port->errors++; 1319 } 1320 break; 1321 1322 case REMOTE_CMS_GET_SPACE_AVAILABLE_REQUEST_TYPE: 1323 server->get_space_available_req.buffer_number = buffer_number; 1324 server->get_space_available_req.subdiv = 1325 ntohl(*((uint32_t *) temp_buffer + 3)); 1326 server->get_space_available_reply = 1327 (REMOTE_GET_SPACE_AVAILABLE_REPLY *) server-> 1328 process_request(&server->get_space_available_req); 1329 if (NULL == server->get_space_available_reply) { 1330 rcs_print_error("Server could not process request.\n"); 1331 putbe32(temp_buffer, _client_tcp_port->serial_number); 1332 putbe32(temp_buffer + 4, CMS_SERVER_SIDE_ERROR); 1333 putbe32(temp_buffer + 8, 0); /* was_read */ 1334 sendn(_client_tcp_port->socket_fd, temp_buffer, 12, 0, dtimeout); 1335 return; 1336 } 1337 putbe32(temp_buffer, _client_tcp_port->serial_number); 1338 *((uint32_t *) temp_buffer + 1) = 1339 htonl(server->get_space_available_reply->status); 1340 *((uint32_t *) temp_buffer + 2) = 1341 htonl(server->get_space_available_reply->space_available); 1342 if (sendn(_client_tcp_port->socket_fd, temp_buffer, 12, 0, dtimeout) < 1343 0) { 1344 _client_tcp_port->errors++; 1345 } 1346 break; 1347 1348 case REMOTE_CMS_CLEAR_REQUEST_TYPE: 1349 server->clear_req.buffer_number = buffer_number; 1350 server->clear_req.subdiv = ntohl(*((uint32_t *) temp_buffer + 3)); 1351 server->clear_reply = 1352 (REMOTE_CLEAR_REPLY *) server->process_request(&server-> 1353 clear_req); 1354 if (NULL == server->clear_reply) { 1355 rcs_print_error("Server could not process request.\n"); 1356 putbe32(temp_buffer, _client_tcp_port->serial_number); 1357 putbe32(temp_buffer + 4, CMS_SERVER_SIDE_ERROR); 1358 sendn(_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout); 1359 return; 1360 } 1361 putbe32(temp_buffer, _client_tcp_port->serial_number); 1362 putbe32(temp_buffer + 4, server->clear_reply->status); 1363 if (sendn(_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout) < 1364 0) { 1365 _client_tcp_port->errors++; 1366 } 1367 break; 1368 1369 case REMOTE_CMS_CLEAN_REQUEST_TYPE: 1370 server->spawner_pid = server->server_pid; 1371 server->kill_server(); 1372 break; 1373 1374 case REMOTE_CMS_CLOSE_CHANNEL_REQUEST_TYPE: 1375 client_port_to_check = (CLIENT_TCP_PORT *) client_ports->get_head(); 1376 while (NULL != client_port_to_check) { 1377 if (client_port_to_check->socket_fd == 1378 _client_tcp_port->socket_fd) { 1379 break; 1380 } 1381 client_port_to_check = 1382 (CLIENT_TCP_PORT *) client_ports->get_next(); 1383 } 1384 FD_CLR(_client_tcp_port->socket_fd, &read_fd_set); 1385 close(_client_tcp_port->socket_fd); 1386 current_clients--; 1387 if (NULL != _client_tcp_port->subscriptions) { 1388 remove_subscription_client(_client_tcp_port, buffer_number); 1389 } 1390 _client_tcp_port->socket_fd = -1; 1391 delete _client_tcp_port; 1392 client_ports->delete_current_node(); 1393 break; 1394 1395 case REMOTE_CMS_GET_KEYS_REQUEST_TYPE: 1396 server->get_keys_req.buffer_number = buffer_number; 1397 if (recvn(_client_tcp_port->socket_fd, 1398 server->get_keys_req.name, 16, 0, -1, NULL) < 0) { 1399 _client_tcp_port->errors++; 1400 return; 1401 } 1402 server->get_keys_reply = 1403 (REMOTE_GET_KEYS_REPLY *) server->process_request(&server-> 1404 get_keys_req); 1405 if (NULL == server->get_keys_reply) { 1406 rcs_print_error("Server could not process request.\n"); 1407 memset(temp_buffer, 0, 20); 1408 putbe32(temp_buffer, _client_tcp_port->serial_number); 1409 server->gen_random_key(((char *) temp_buffer) + 4, 2); 1410 server->gen_random_key(((char *) temp_buffer) + 12, 2); 1411 sendn(_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout); 1412 return; 1413 } else { 1414 putbe32(temp_buffer, _client_tcp_port->serial_number); 1415 memcpy(((char *) temp_buffer) + 4, server->get_keys_reply->key1, 1416 8); 1417 memcpy(((char *) temp_buffer) + 12, server->get_keys_reply->key2, 1418 8); 1419 /* successful ? */ 1420 sendn(_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout); 1421 return; 1422 } 1423 break; 1424 1425 case REMOTE_CMS_LOGIN_REQUEST_TYPE: 1426 server->login_req.buffer_number = buffer_number; 1427 if (recvn(_client_tcp_port->socket_fd, 1428 server->login_req.name, 16, 0, -1, NULL) < 0) { 1429 _client_tcp_port->errors++; 1430 return; 1431 } 1432 if (recvn(_client_tcp_port->socket_fd, 1433 server->login_req.passwd, 16, 0, -1, NULL) < 0) { 1434 _client_tcp_port->errors++; 1435 return; 1436 } 1437 server->login_reply = 1438 (REMOTE_LOGIN_REPLY *) server->process_request(&server-> 1439 login_req); 1440 if (NULL == server->login_reply) { 1441 rcs_print_error("Server could not process request.\n"); 1442 putbe32(temp_buffer, _client_tcp_port->serial_number); 1443 putbe32(temp_buffer + 4, 0); /* not successful */ 1444 sendn(_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout); 1445 return; 1446 } else { 1447 putbe32(temp_buffer, _client_tcp_port->serial_number); 1448 putbe32(temp_buffer + 4, server->login_reply->success); 1449 /* successful ? */ 1450 sendn(_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout); 1451 return; 1452 } 1453 break; 1454 1455 case REMOTE_CMS_SET_SUBSCRIPTION_REQUEST_TYPE: 1456 server->set_subscription_req.buffer_number = buffer_number; 1457 server->set_subscription_req.subscription_type = 1458 ntohl(*((uint32_t *) temp_buffer + 3)); 1459 server->set_subscription_req.poll_interval_millis = 1460 ntohl(*((uint32_t *) temp_buffer + 4)); 1461 server->set_subscription_reply = 1462 (REMOTE_SET_SUBSCRIPTION_REPLY *) server-> 1463 process_request(&server->set_subscription_req); 1464 if (NULL == server->set_subscription_reply) { 1465 rcs_print_error("Server could not process request.\n"); 1466 putbe32(temp_buffer, _client_tcp_port->serial_number); 1467 putbe32(temp_buffer + 4, 0); /* not successful */ 1468 sendn(_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout); 1469 return; 1470 } else { 1471 if (server->set_subscription_reply->success) { 1472 if (server->set_subscription_req.subscription_type == 1473 CMS_POLLED_SUBSCRIPTION 1474 || server->set_subscription_req.subscription_type == 1475 CMS_VARIABLE_SUBSCRIPTION) { 1476 add_subscription_client(buffer_number, 1477 server->set_subscription_req. 1478 subscription_type, 1479 server->set_subscription_req. 1480 poll_interval_millis, _client_tcp_port); 1481 } 1482 if (server->set_subscription_req.subscription_type == 1483 CMS_NO_SUBSCRIPTION) { 1484 remove_subscription_client(_client_tcp_port, 1485 buffer_number); 1486 } 1487 } 1488 putbe32(temp_buffer, _client_tcp_port->serial_number); 1489 *((uint32_t *) temp_buffer + 1) = 1490 htonl(server->set_subscription_reply->success); 1491 /* successful ? */ 1492 sendn(_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout); 1493 return; 1494 } 1495 break; 1496 1497 default: 1498 _client_tcp_port->errors++; 1499 rcs_print_error("Unrecognized request type received.(%ld)\n", 1500 request_type); 1501 break; 1502 } 1503 } 1504 1505 void CMS_SERVER_REMOTE_TCP_PORT::add_subscription_client(int buffer_number, 1506 int subscription_type, int poll_interval_millis, CLIENT_TCP_PORT * clnt) 1507 { 1508 if (NULL == subscription_buffers) { 1509 subscription_buffers = new LinkedList(); 1510 } 1511 if (NULL == subscription_buffers) { 1512 rcs_print_error("Can`t create subscription_buffers list.\n"); 1513 } 1514 1515 TCP_BUFFER_SUBSCRIPTION_INFO *buf_info = 1516 (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head(); 1517 while (NULL != buf_info) { 1518 if (buf_info->buffer_number == buffer_number) { 1519 break; 1520 } 1521 buf_info = 1522 (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_next(); 1523 } 1524 if (NULL == buf_info) { 1525 buf_info = new TCP_BUFFER_SUBSCRIPTION_INFO(); 1526 buf_info->buffer_number = buffer_number; 1527 buf_info->sub_clnt_info = new LinkedList(); 1528 buf_info->list_id = 1529 subscription_buffers->store_at_tail(buf_info, sizeof(*buf_info), 1530 0); 1531 } 1532 buf_info->min_last_id = 0; 1533 if (NULL == clnt->subscriptions) { 1534 clnt->subscriptions = new LinkedList(); 1535 } 1536 TCP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info = 1537 (TCP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_head(); 1538 while (temp_clnt_info != NULL) { 1539 if (temp_clnt_info->buffer_number == buffer_number) { 1540 break; 1541 } 1542 temp_clnt_info = 1543 (TCP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_next(); 1544 } 1545 if (NULL == temp_clnt_info) { 1546 temp_clnt_info = new TCP_CLIENT_SUBSCRIPTION_INFO(); 1547 temp_clnt_info->last_sub_sent_time = 0.0; 1548 temp_clnt_info->buffer_number = buffer_number; 1549 temp_clnt_info->subscription_paused = 0; 1550 temp_clnt_info->last_id_read = 0; 1551 temp_clnt_info->sub_buf_info = buf_info; 1552 temp_clnt_info->clnt_port = clnt; 1553 temp_clnt_info->last_sub_sent_time = etime(); 1554 temp_clnt_info->subscription_list_id = 1555 clnt->subscriptions->store_at_tail(temp_clnt_info, 1556 sizeof(*temp_clnt_info), 0); 1557 buf_info->sub_clnt_info->store_at_tail(temp_clnt_info, 1558 sizeof(*temp_clnt_info), 0); 1559 } 1560 temp_clnt_info->subscription_type = subscription_type; 1561 temp_clnt_info->poll_interval_millis = poll_interval_millis; 1562 recalculate_polling_interval(); 1563 } 1564 1565 void CMS_SERVER_REMOTE_TCP_PORT::remove_subscription_client(CLIENT_TCP_PORT * 1566 clnt, int buffer_number) 1567 { 1568 TCP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info = 1569 (TCP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_head(); 1570 while (temp_clnt_info != NULL) { 1571 if (temp_clnt_info->buffer_number == buffer_number) { 1572 if (NULL != temp_clnt_info->sub_buf_info) { 1573 if (NULL != temp_clnt_info->sub_buf_info->sub_clnt_info) { 1574 temp_clnt_info->sub_buf_info->sub_clnt_info-> 1575 delete_node(temp_clnt_info->subscription_list_id); 1576 if (temp_clnt_info->sub_buf_info->sub_clnt_info-> 1577 list_size == 0) { 1578 subscription_buffers->delete_node(temp_clnt_info-> 1579 sub_buf_info->list_id); 1580 delete temp_clnt_info->sub_buf_info->sub_clnt_info; 1581 temp_clnt_info->sub_buf_info->sub_clnt_info = NULL; 1582 delete temp_clnt_info->sub_buf_info; 1583 temp_clnt_info->sub_buf_info = NULL; 1584 } 1585 } 1586 } 1587 delete temp_clnt_info; 1588 temp_clnt_info = NULL; 1589 break; 1590 } 1591 temp_clnt_info = 1592 (TCP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_next(); 1593 } 1594 recalculate_polling_interval(); 1595 } 1596 1597 void CMS_SERVER_REMOTE_TCP_PORT::recalculate_polling_interval() 1598 { 1599 int min_poll_interval_millis = 30000; 1600 polling_enabled = 0; 1601 TCP_BUFFER_SUBSCRIPTION_INFO *buf_info = 1602 (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head(); 1603 while (NULL != buf_info) { 1604 TCP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info = 1605 (TCP_CLIENT_SUBSCRIPTION_INFO *) buf_info->sub_clnt_info-> 1606 get_head(); 1607 while (temp_clnt_info != NULL) { 1608 if (temp_clnt_info->poll_interval_millis < 1609 min_poll_interval_millis 1610 && temp_clnt_info->subscription_type == 1611 CMS_POLLED_SUBSCRIPTION) { 1612 min_poll_interval_millis = 1613 temp_clnt_info->poll_interval_millis; 1614 polling_enabled = 1; 1615 } 1616 temp_clnt_info = (TCP_CLIENT_SUBSCRIPTION_INFO *) 1617 buf_info->sub_clnt_info->get_next(); 1618 } 1619 buf_info = 1620 (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_next(); 1621 } 1622 if (min_poll_interval_millis >= ((int) (clk_tck() * 1000.0))) { 1623 current_poll_interval_millis = min_poll_interval_millis; 1624 } else { 1625 current_poll_interval_millis = ((int) (clk_tck() * 1000.0)); 1626 } 1627 select_timeout.tv_sec = current_poll_interval_millis / 1000; 1628 select_timeout.tv_usec = (current_poll_interval_millis % 1000) * 1000; 1629 dtimeout = (current_poll_interval_millis + 10) * 1000.0; 1630 if (dtimeout < 0.5) { 1631 dtimeout = 0.5; 1632 } 1633 } 1634 1635 void CMS_SERVER_REMOTE_TCP_PORT::update_subscriptions() 1636 { 1637 pid_t pid = getpid(); 1638 pid_t tid = 0; 1639 CMS_SERVER *server; 1640 server = find_server(pid, tid); 1641 if (NULL == server) { 1642 rcs_print_error 1643 ("CMS_SERVER_REMOTE_TCP_PORT::update_subscriptions Cannot find server object for pid = %d.\n", 1644 pid); 1645 return; 1646 } 1647 if (NULL == subscription_buffers) { 1648 return; 1649 } 1650 double cur_time = etime(); 1651 TCP_BUFFER_SUBSCRIPTION_INFO *buf_info = 1652 (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head(); 1653 while (NULL != buf_info) { 1654 server->read_req.buffer_number = buf_info->buffer_number; 1655 server->read_req.access_type = CMS_READ_ACCESS; 1656 server->read_req.last_id_read = buf_info->min_last_id; 1657 server->read_reply = 1658 (REMOTE_READ_REPLY *) server->process_request(&server->read_req); 1659 if (NULL == server->read_reply) { 1660 rcs_print_error("Server could not process request.\n"); 1661 buf_info = (TCP_BUFFER_SUBSCRIPTION_INFO *) 1662 subscription_buffers->get_next(); 1663 continue; 1664 } 1665 if (server->read_reply->write_id == buf_info->min_last_id || 1666 server->read_reply->size < 1) { 1667 buf_info = (TCP_BUFFER_SUBSCRIPTION_INFO *) 1668 subscription_buffers->get_next(); 1669 continue; 1670 } 1671 putbe32(temp_buffer, 0); 1672 putbe32(temp_buffer + 4, server->read_reply->status); 1673 putbe32(temp_buffer + 8, server->read_reply->size); 1674 putbe32(temp_buffer + 12, server->read_reply->write_id); 1675 putbe32(temp_buffer + 16, server->read_reply->was_read); 1676 TCP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info = 1677 (TCP_CLIENT_SUBSCRIPTION_INFO *) buf_info->sub_clnt_info-> 1678 get_head(); 1679 buf_info->min_last_id = server->read_reply->write_id; 1680 while (temp_clnt_info != NULL) { 1681 double time_diff = cur_time - temp_clnt_info->last_sub_sent_time; 1682 int time_diff_millis = (int) ((double) time_diff * 1000.0); 1683 rcs_print_debug(PRINT_SERVER_SUBSCRIPTION_ACTIVITY, 1684 "Subscription time_diff_millis=%d\n", time_diff_millis); 1685 if (((temp_clnt_info->subscription_type == CMS_POLLED_SUBSCRIPTION 1686 && time_diff_millis + 10 >= 1687 temp_clnt_info->poll_interval_millis) 1688 || temp_clnt_info->subscription_type == 1689 CMS_VARIABLE_SUBSCRIPTION) 1690 && temp_clnt_info->last_id_read != 1691 server->read_reply->write_id) { 1692 temp_clnt_info->last_id_read = server->read_reply->write_id; 1693 temp_clnt_info->last_sub_sent_time = cur_time; 1694 temp_clnt_info->clnt_port->serial_number++; 1695 putbe32(temp_buffer, temp_clnt_info->clnt_port->serial_number); 1696 if (server->read_reply->size < 0x2000 - 20 1697 && server->read_reply->size > 0) { 1698 memcpy(temp_buffer + 20, server->read_reply->data, 1699 server->read_reply->size); 1700 if (sendn 1701 (temp_clnt_info->clnt_port->socket_fd, temp_buffer, 1702 20 + server->read_reply->size, 0, dtimeout) < 0) { 1703 temp_clnt_info->clnt_port->errors++; 1704 return; 1705 } 1706 } else { 1707 if (sendn(temp_clnt_info->clnt_port->socket_fd, 1708 temp_buffer, 20, 0, dtimeout) < 0) { 1709 temp_clnt_info->clnt_port->errors++; 1710 return; 1711 } 1712 if (server->read_reply->size > 0) { 1713 if (sendn(temp_clnt_info->clnt_port->socket_fd, 1714 server->read_reply->data, 1715 server->read_reply->size, 0, dtimeout) < 0) { 1716 temp_clnt_info->clnt_port->errors++; 1717 return; 1718 } 1719 } 1720 } 1721 } 1722 if (temp_clnt_info->last_id_read < buf_info->min_last_id) { 1723 buf_info->min_last_id = temp_clnt_info->last_id_read; 1724 } 1725 temp_clnt_info = (TCP_CLIENT_SUBSCRIPTION_INFO *) 1726 buf_info->sub_clnt_info->get_next(); 1727 } 1728 buf_info = 1729 (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_next(); 1730 } 1731 } 1732 1733 TCP_BUFFER_SUBSCRIPTION_INFO::TCP_BUFFER_SUBSCRIPTION_INFO() 1734 { 1735 buffer_number = -1; 1736 min_last_id = 0; 1737 list_id = -1; 1738 sub_clnt_info = NULL; 1739 } 1740 1741 TCP_BUFFER_SUBSCRIPTION_INFO::~TCP_BUFFER_SUBSCRIPTION_INFO() 1742 { 1743 buffer_number = -1; 1744 min_last_id = 0; 1745 list_id = -1; 1746 if (NULL != sub_clnt_info) { 1747 delete sub_clnt_info; 1748 sub_clnt_info = NULL; 1749 } 1750 } 1751 1752 TCP_CLIENT_SUBSCRIPTION_INFO::TCP_CLIENT_SUBSCRIPTION_INFO() 1753 { 1754 subscription_type = CMS_NO_SUBSCRIPTION; 1755 poll_interval_millis = 30000; 1756 last_sub_sent_time = 0.0; 1757 subscription_list_id = -1; 1758 buffer_number = -1; 1759 subscription_paused = 0; 1760 last_id_read = 0; 1761 sub_buf_info = NULL; 1762 clnt_port = NULL; 1763 } 1764 1765 TCP_CLIENT_SUBSCRIPTION_INFO::~TCP_CLIENT_SUBSCRIPTION_INFO() 1766 { 1767 subscription_type = CMS_NO_SUBSCRIPTION; 1768 poll_interval_millis = 30000; 1769 last_sub_sent_time = 0.0; 1770 subscription_list_id = -1; 1771 buffer_number = -1; 1772 subscription_paused = 0; 1773 last_id_read = 0; 1774 sub_buf_info = NULL; 1775 clnt_port = NULL; 1776 } 1777 1778 CLIENT_TCP_PORT::CLIENT_TCP_PORT() 1779 { 1780 serial_number = 0; 1781 errors = 0; 1782 max_errors = 50; 1783 address.sin_port = 0; 1784 address.sin_family = AF_INET; 1785 address.sin_addr.s_addr = htonl(INADDR_ANY); 1786 socket_fd = -1; 1787 subscriptions = NULL; 1788 tid = -1; 1789 pid = -1; 1790 blocking_read_req = NULL; 1791 threadId = 0; 1792 diag_info = NULL; 1793 } 1794 1795 CLIENT_TCP_PORT::~CLIENT_TCP_PORT() 1796 { 1797 if (socket_fd > 0) { 1798 close(socket_fd); 1799 socket_fd = -1; 1800 } 1801 if (NULL != subscriptions) { 1802 TCP_CLIENT_SUBSCRIPTION_INFO *sub_info = 1803 (TCP_CLIENT_SUBSCRIPTION_INFO *) subscriptions->get_head(); 1804 while (NULL != sub_info) { 1805 delete sub_info; 1806 sub_info = 1807 (TCP_CLIENT_SUBSCRIPTION_INFO *) subscriptions->get_next(); 1808 } 1809 delete subscriptions; 1810 subscriptions = NULL; 1811 } 1812 #ifdef NO_THREADS 1813 if (NULL != blocking_read_req) { 1814 delete blocking_read_req; 1815 blocking_read_req = NULL; 1816 } 1817 #endif 1818 if (NULL != diag_info) { 1819 delete diag_info; 1820 diag_info = NULL; 1821 } 1822 }