tcpmem.cc
1 /******************************************************************** 2 * Description: tcpmem.cc 3 * 4 * Derived from a work by Fred Proctor & Will Shackleford 5 * 6 * Author: 7 * License: GPL Version 2 8 * System: Linux 9 * 10 * Copyright (c) 2004 All rights reserved. 11 * 12 * Last change: 13 ********************************************************************/ 14 15 #if defined(__GNUC__) && ((__GNUC__ > 4) || ((__GNUC__ == 4) && (__GNUC_MINOR__ >= 4))) 16 #pragma GCC optimize "-fno-strict-aliasing" 17 #pragma GCC diagnostic ignored "-Wstrict-aliasing" 18 #endif 19 20 #ifdef __cplusplus 21 extern "C" { 22 #endif 23 #include <stdlib.h> // strtol() 24 #include <unistd.h> 25 #include <string.h> // strstr() 26 #include <errno.h> // errno, strerror() 27 #include <signal.h> // signal, SIG_ERR, SIGPIPE 28 #include <ctype.h> // isdigit() 29 #include <arpa/inet.h> /* inet_ntoa */ 30 #include <sys/socket.h> 31 #include <sys/time.h> /* struct timeval */ 32 #include <netinet/in.h> /* sockaddr_in */ 33 #include <netdb.h> 34 #include <math.h> /* fmod() */ 35 36 #ifdef __cplusplus 37 } 38 #endif 39 #include "rem_msg.hh" /* REMOTE_CMS_READ_REQUEST_TYPE, etc. */ 40 #include "rcs_print.hh" /* rcs_print_error() */ 41 #include "cmsdiag.hh" 42 #define DEFAULT_MAX_CONSECUTIVE_TIMEOUTS (-1) 43 #include "timer.hh" /* esleep() */ 44 #include "tcpmem.hh" 45 #include "recvn.h" /* recvn() */ 46 #include "sendn.h" /* sendn() */ 47 #include "tcp_opts.hh" /* SET_TCP_NODELAY */ 48 #include "linklist.hh" /* LinkedList */ 49 50 int tcpmem_sigpipe_count = 0; 51 int last_sig = 0; 52 53 void tcpmem_sigpipe_handler(int sig) 54 { 55 last_sig = sig; 56 tcpmem_sigpipe_count++; 57 } 58 59 TCPMEM::TCPMEM(const char *_bufline, const char *_procline):CMS(_bufline, _procline) 60 { 61 max_consecutive_timeouts = DEFAULT_MAX_CONSECUTIVE_TIMEOUTS; 62 char *max_consecutive_timeouts_string; 63 max_consecutive_timeouts_string = strstr(ProcessLine, "max_timeouts="); 64 polling = (NULL != strstr(proclineupper, "POLL")); 65 socket_fd = 0; 66 reconnect_needed = 0; 67 autoreconnect = 1; 68 old_handler = (void (*)(int)) SIG_ERR; 69 sigpipe_count = 0; 70 subscription_count = 0; 71 read_serial_number = 0; 72 write_serial_number = 0; 73 read_socket_fd = 0; 74 write_socket_fd = 0; 75 if (NULL != max_consecutive_timeouts_string) { 76 max_consecutive_timeouts_string += strlen("max_timeouts="); 77 if (!strncmp(max_consecutive_timeouts_string, "INF", 3)) { 78 max_consecutive_timeouts = -1; 79 } else { 80 max_consecutive_timeouts = 81 strtol(max_consecutive_timeouts_string, (char **) NULL, 0); 82 } 83 } 84 85 char *sub_info_string = NULL; 86 poll_interval_millis = 30000; 87 subscription_type = CMS_NO_SUBSCRIPTION; 88 sub_info_string = strstr(ProcessLine, "sub="); 89 if (NULL != sub_info_string) { 90 if (!strncmp(sub_info_string + 4, "none", 4)) { 91 subscription_type = CMS_NO_SUBSCRIPTION; 92 } else if (!strncmp(sub_info_string + 4, "var", 3)) { 93 subscription_type = CMS_VARIABLE_SUBSCRIPTION; 94 } else { 95 poll_interval_millis = 96 ((int) (atof(sub_info_string + 4) * 1000.0)); 97 subscription_type = CMS_POLLED_SUBSCRIPTION; 98 } 99 } 100 if (NULL != strstr(ProcessLine, "noreconnect")) { 101 autoreconnect = 0; 102 } 103 server_host_entry = NULL; 104 105 /* Set up the socket address stucture. */ 106 memset(&server_socket_address, 0, sizeof(server_socket_address)); 107 server_socket_address.sin_family = AF_INET; 108 server_socket_address.sin_port = htons(((u_short) tcp_port_number)); 109 int hostname_was_address = 0; 110 char bufferhost_first_char = BufferHost[0]; 111 if (bufferhost_first_char >= '0' && bufferhost_first_char <= '9') { 112 server_socket_address.sin_addr.s_addr = inet_addr(BufferHost); 113 if (server_socket_address.sin_addr.s_addr != INADDR_NONE) { 114 hostname_was_address = 1; 115 } 116 } 117 118 if (!hostname_was_address) { 119 /* Get the IP address of the server using it's BufferHost. */ 120 server_host_entry = gethostbyname(BufferHost); 121 if (NULL == server_host_entry) { 122 status = CMS_CONFIG_ERROR; 123 autoreconnect = 0; 124 rcs_print_error("TCPMEM: Couldn't get host address for (%s).\n", 125 BufferHost); 126 return; 127 } 128 server_socket_address.sin_addr.s_addr = 129 *((int *) server_host_entry->h_addr_list[0]); 130 server_socket_address.sin_family = server_host_entry->h_addrtype; 131 } 132 rcs_print_debug(PRINT_CMS_CONFIG_INFO, 133 "Using server on %s with IP address %s and port %d.\n", 134 BufferHost, 135 inet_ntoa(server_socket_address.sin_addr), tcp_port_number); 136 137 reconnect(); 138 139 if (status >= 0 && 140 (min_compatible_version > 2.58 || min_compatible_version < 1e-6)) { 141 verify_bufname(); 142 if (status < 0) { 143 rcs_print_error("TCPMEM: verify_bufname() failed.\n"); 144 } 145 } 146 147 if (status >= 0 && enable_diagnostics && 148 (min_compatible_version > 3.71 || min_compatible_version < 1e-6)) { 149 send_diag_info(); 150 } 151 } 152 153 static void put32(char *addr, uint32_t val) { 154 memcpy(addr, &val, sizeof(val)); 155 } 156 157 static void putbe32(char *addr, uint32_t val) { 158 val = htonl(val); 159 memcpy(addr, &val, sizeof(val)); 160 } 161 162 static uint32_t getbe32(char *addr) { 163 uint32_t val; 164 memcpy(&val, addr, sizeof(val)); 165 return ntohl(val); 166 } 167 168 void 169 TCPMEM::send_diag_info() 170 { 171 if (polling) { 172 return; 173 } 174 if (NULL == dpi) { 175 return; 176 } 177 disable_sigpipe(); 178 179 set_socket_fds(read_socket_fd); 180 memset(diag_info_buf, 0, 88); 181 putbe32(diag_info_buf, (uint32_t)serial_number); 182 putbe32(diag_info_buf + 4, REMOTE_CMS_SET_DIAG_INFO_REQUEST_TYPE); 183 putbe32(diag_info_buf + 8, buffer_number); 184 strncpy(diag_info_buf + 20, dpi->name, 16); 185 strncpy(diag_info_buf + 36, dpi->host_sysinfo, 32); 186 putbe32(diag_info_buf + 68, (uint32_t) dpi->pid); 187 putbe32(diag_info_buf + 72, (uint32_t) connection_number); 188 memcpy(diag_info_buf + 76, &(dpi->rcslib_ver), 8); 189 put32(diag_info_buf + 84, (uint32_t) 0x11223344); 190 if (sendn(socket_fd, diag_info_buf, 88, 0, timeout) < 0) { 191 reconnect_needed = 1; 192 fatal_error_occurred = 1; 193 reenable_sigpipe(); 194 status = CMS_MISC_ERROR; 195 return; 196 } 197 serial_number++; 198 rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, 199 "TCPMEM sending request: fd = %d, serial_number=%ld, request_type=%d, buffer_number=%ld\n", 200 socket_fd, serial_number, 201 ntohl(*((uint32_t *) diag_info_buf + 1)), buffer_number); 202 reenable_sigpipe(); 203 204 } 205 206 void TCPMEM::verify_bufname() 207 { 208 if (polling) { 209 return; 210 } 211 disable_sigpipe(); 212 213 set_socket_fds(read_socket_fd); 214 215 putbe32(temp_buffer, (uint32_t) serial_number); 216 putbe32(temp_buffer + 4, REMOTE_CMS_GET_BUF_NAME_REQUEST_TYPE); 217 putbe32(temp_buffer + 8, buffer_number); 218 if (sendn(socket_fd, temp_buffer, 20, 0, timeout) < 0) { 219 reconnect_needed = 1; 220 fatal_error_occurred = 1; 221 reenable_sigpipe(); 222 status = CMS_MISC_ERROR; 223 return; 224 } 225 serial_number++; 226 rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, 227 "TCPMEM sending request: fd = %d, serial_number=%ld, request_type=%d, buffer_number=%ld\n", 228 socket_fd, serial_number, 229 ntohl(*((uint32_t *) temp_buffer + 1)), buffer_number); 230 if (recvn(socket_fd, temp_buffer, 40, 0, timeout, &recvd_bytes) < 0) { 231 if (recvn_timedout) { 232 bytes_to_throw_away = 40; 233 return; 234 } 235 } 236 returned_serial_number = getbe32(temp_buffer); 237 rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, 238 "TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n", 239 socket_fd, returned_serial_number, buffer_number); 240 if (returned_serial_number != serial_number) { 241 rcs_print_error 242 ("TCPMEM: Returned serial number(%ld) does not match expected serial number(%ld).\n", 243 returned_serial_number, serial_number); 244 reconnect_needed = 1; 245 fatal_error_occurred = 1; 246 reenable_sigpipe(); 247 status = CMS_MISC_ERROR; 248 return; 249 } 250 status = (CMS_STATUS) ntohl(*((uint32_t *) temp_buffer + 1)); 251 if (status < 0) { 252 return; 253 } 254 if (strncmp(temp_buffer + 8, BufferName, 31)) { 255 rcs_print_error 256 ("TCPMEM: The buffer (%s) is registered on TCP port %d of host %s with buffer number %ld.\n", 257 ((char *) temp_buffer + 8), tcp_port_number, BufferHost, 258 buffer_number); 259 rcs_print_error 260 ("TCPMEM: However, this process (%s) is attempting to connect to the buffer %s at the same location.\n", 261 ProcessName, BufferName); 262 status = CMS_RESOURCE_CONFLICT_ERROR; 263 return; 264 } 265 reenable_sigpipe(); 266 } 267 268 CMS_DIAGNOSTICS_INFO *TCPMEM::get_diagnostics_info() 269 { 270 if (polling) { 271 return (NULL); 272 } 273 disable_sigpipe(); 274 275 if (((int) handle_old_replies()) < 0) { 276 reenable_sigpipe(); 277 return (NULL); 278 } 279 280 set_socket_fds(read_socket_fd); 281 282 putbe32(temp_buffer, serial_number); 283 putbe32(temp_buffer + 4, REMOTE_CMS_GET_DIAG_INFO_REQUEST_TYPE); 284 putbe32(temp_buffer + 8, (uint32_t) buffer_number); 285 if (sendn(socket_fd, temp_buffer, 20, 0, timeout) < 0) { 286 reconnect_needed = 1; 287 fatal_error_occurred = 1; 288 reenable_sigpipe(); 289 status = CMS_MISC_ERROR; 290 return (NULL); 291 } 292 memset(temp_buffer, 0, 0x2000); 293 serial_number++; 294 rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, 295 "TCPMEM sending request: fd = %d, serial_number=%ld, request_type=%d, buffer_number=%ld\n", 296 socket_fd, serial_number, 297 ntohl(*((uint32_t *) temp_buffer + 1)), buffer_number); 298 if (recvn(socket_fd, temp_buffer, 32, 0, -1.0, &recvd_bytes) < 0) { 299 if (recvn_timedout) { 300 bytes_to_throw_away = 32; 301 } 302 return (NULL); 303 } 304 recvd_bytes = 0; 305 returned_serial_number = getbe32(temp_buffer); 306 rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, 307 "TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n", 308 socket_fd, returned_serial_number, buffer_number); 309 if (returned_serial_number != serial_number) { 310 rcs_print_error 311 ("TCPMEM: Returned serial number(%ld) does not match expected serial number(%ld).\n", 312 returned_serial_number, serial_number); 313 reconnect_needed = 1; 314 fatal_error_occurred = 1; 315 reenable_sigpipe(); 316 status = CMS_MISC_ERROR; 317 return (NULL); 318 } 319 status = (CMS_STATUS) ntohl(*((uint32_t *) temp_buffer + 1)); 320 if (status < 0) { 321 return (NULL); 322 } 323 if (NULL == di) { 324 di = new CMS_DIAGNOSTICS_INFO(); 325 di->dpis = new LinkedList(); 326 } else { 327 di->dpis->delete_members(); 328 } 329 di->last_writer_dpi = NULL; 330 di->last_reader_dpi = NULL; 331 di->last_writer = ntohl(*((uint32_t *) temp_buffer + 2)); 332 di->last_reader = ntohl(*((uint32_t *) temp_buffer + 3)); 333 double server_time; 334 memcpy(&server_time, temp_buffer + 16, 8); 335 double local_time = etime(); 336 double diff_time = local_time - server_time; 337 int dpi_count = ntohl(*((uint32_t *) temp_buffer + 6)); 338 int dpi_max_size = ntohl(*((uint32_t *) temp_buffer + 7)); 339 if (dpi_max_size > 32 && dpi_max_size < 0x2000) { 340 if (recvn 341 (socket_fd, temp_buffer + 32, dpi_max_size - 32, 0, -1.0, 342 &recvd_bytes) < 0) { 343 if (recvn_timedout) { 344 bytes_to_throw_away = dpi_max_size - 32; 345 return (NULL); 346 } 347 } 348 recvd_bytes = 0; 349 int dpi_offset = 32; 350 CMS_DIAG_PROC_INFO cms_dpi; 351 for (int i = 0; i < dpi_count && dpi_offset < dpi_max_size; i++) { 352 memset(&cms_dpi, 0, sizeof(CMS_DIAG_PROC_INFO)); 353 memcpy(cms_dpi.name, temp_buffer + dpi_offset, 16); 354 dpi_offset += 16; 355 memcpy(cms_dpi.host_sysinfo, temp_buffer + dpi_offset, 32); 356 dpi_offset += 32; 357 cms_dpi.pid = 358 ntohl(*((uint32_t *) ((char *) temp_buffer + dpi_offset))); 359 dpi_offset += 4; 360 memcpy(&(cms_dpi.rcslib_ver), temp_buffer + dpi_offset, 8); 361 dpi_offset += 8; 362 cms_dpi.access_type = (CMS_INTERNAL_ACCESS_TYPE) 363 ntohl(*((uint32_t *) ((char *) temp_buffer + dpi_offset))); 364 dpi_offset += 4; 365 cms_dpi.msg_id = 366 ntohl(*((uint32_t *) ((char *) temp_buffer + dpi_offset))); 367 dpi_offset += 4; 368 cms_dpi.msg_size = 369 ntohl(*((uint32_t *) ((char *) temp_buffer + dpi_offset))); 370 dpi_offset += 4; 371 cms_dpi.msg_type = 372 ntohl(*((uint32_t *) ((char *) temp_buffer + dpi_offset))); 373 dpi_offset += 4; 374 cms_dpi.number_of_accesses = 375 ntohl(*((uint32_t *) ((char *) temp_buffer + dpi_offset))); 376 dpi_offset += 4; 377 cms_dpi.number_of_new_messages = 378 ntohl(*((uint32_t *) ((char *) temp_buffer + dpi_offset))); 379 dpi_offset += 4; 380 memcpy(&(cms_dpi.bytes_moved), temp_buffer + dpi_offset, 8); 381 dpi_offset += 8; 382 memcpy(&(cms_dpi.bytes_moved_across_socket), 383 temp_buffer + dpi_offset, 8); 384 dpi_offset += 8; 385 memcpy(&(cms_dpi.last_access_time), temp_buffer + dpi_offset, 8); 386 if (cmsdiag_timebias_set) { 387 cms_dpi.last_access_time += diff_time - cmsdiag_timebias; 388 } 389 dpi_offset += 8; 390 memcpy(&(cms_dpi.first_access_time), temp_buffer + dpi_offset, 8); 391 if (cmsdiag_timebias_set) { 392 cms_dpi.first_access_time += diff_time - cmsdiag_timebias; 393 } 394 dpi_offset += 8; 395 memcpy(&(cms_dpi.min_difference), temp_buffer + dpi_offset, 8); 396 dpi_offset += 8; 397 memcpy(&(cms_dpi.max_difference), temp_buffer + dpi_offset, 8); 398 dpi_offset += 8; 399 di->dpis->store_at_tail(&cms_dpi, sizeof(CMS_DIAG_PROC_INFO), 1); 400 int is_last_writer = 401 ntohl(*((uint32_t *) ((char *) temp_buffer + dpi_offset))); 402 dpi_offset += 4; 403 if (is_last_writer) { 404 di->last_writer_dpi = 405 (CMS_DIAG_PROC_INFO *) di->dpis->get_tail(); 406 } 407 int is_last_reader = 408 ntohl(*((uint32_t *) ((char *) temp_buffer + dpi_offset))); 409 dpi_offset += 4; 410 if (is_last_reader) { 411 di->last_reader_dpi = 412 (CMS_DIAG_PROC_INFO *) di->dpis->get_tail(); 413 } 414 } 415 } 416 reenable_sigpipe(); 417 return di; 418 } 419 420 void TCPMEM::reconnect() 421 { 422 if (socket_fd > 0) { 423 disconnect(); 424 } 425 subscription_count = 0; 426 timedout_request = NO_REMOTE_CMS_REQUEST; 427 bytes_to_throw_away = 0; 428 recvd_bytes = 0; 429 socket_fd = 0; 430 waiting_for_message = 0; 431 waiting_message_size = 0; 432 waiting_message_id = 0; 433 serial_number = 0; 434 435 rcs_print_debug(PRINT_CMS_CONFIG_INFO, "Creating socket . . .\n"); 436 437 socket_fd = socket(AF_INET, SOCK_STREAM, 0); 438 if (socket_fd < 0) { 439 rcs_print_error("TCPMEM: Error from socket() (errno = %d:%s)\n", 440 errno, strerror(errno)); 441 442 status = CMS_CREATE_ERROR; 443 return; 444 } 445 rcs_print_debug(PRINT_CMS_CONFIG_INFO, "Setting socket options . . . \n"); 446 if (set_tcp_socket_options(socket_fd) < 0) { 447 return; 448 } 449 struct timeval tm; 450 int socket_ret; 451 double start_time, current_time; 452 fd_set fds; 453 sockaddr_in cli_addr; 454 cli_addr.sin_family = AF_INET; 455 cli_addr.sin_addr.s_addr = htonl(INADDR_ANY); 456 cli_addr.sin_port = htons(0); 457 rcs_print_debug(PRINT_CMS_CONFIG_INFO, "Binding . . . \n"); 458 if (bind(socket_fd, (struct sockaddr *) &cli_addr, sizeof(cli_addr)) < 0) { 459 rcs_print_error("TCPMEM: bind error %d = %s\n", errno, 460 strerror(errno)); 461 status = CMS_CREATE_ERROR; 462 } 463 rcs_print_debug(PRINT_CMS_CONFIG_INFO, "Connecting . . .\n"); 464 if (connect(socket_fd, (struct sockaddr *) &server_socket_address, 465 sizeof(server_socket_address)) < 0) { 466 if (EINPROGRESS == errno) { 467 468 tm.tv_sec = (long) timeout; 469 tm.tv_sec = (long) (fmod(timeout, 1.0) * 1e6); 470 FD_ZERO(&fds); 471 FD_SET(socket_fd, &fds); 472 start_time = etime(); 473 while (!(socket_ret = select(socket_fd + 1, 474 (fd_set *) NULL, &fds, (fd_set *) NULL, &tm))) { 475 FD_SET(socket_fd, &fds); 476 esleep(0.001); 477 current_time = etime(); 478 double timeleft = start_time + timeout - current_time; 479 if (timeleft <= 0.0 && timeout >= 0.0) { 480 if (!reconnect_needed) { 481 rcs_print_error 482 ("TCPMEM: Timed out waiting for connection.\n"); 483 } 484 status = CMS_NO_SERVER_ERROR; 485 return; 486 } 487 tm.tv_sec = (long) timeleft; 488 tm.tv_sec = (long) (fmod(timeleft, 1.0) * 1e6); 489 } 490 491 if (-1 == socket_ret) { 492 rcs_print_error("select error: %d -- %s\n", errno, 493 strerror(errno)); 494 rcs_print_error("TCPMEM: Couldn't connect.\n"); 495 status = CMS_NO_SERVER_ERROR; 496 return; 497 } 498 } else { 499 rcs_print_error("connect error: %d -- %s\n", errno, 500 strerror(errno)); 501 rcs_print_error 502 ("TCPMEM: Error trying to connect to TCP port %d of host %s(%s). sin_family=%d\n", 503 ntohs(server_socket_address.sin_port), BufferHost, 504 inet_ntoa(server_socket_address.sin_addr), 505 server_socket_address.sin_family); 506 status = CMS_NO_SERVER_ERROR; 507 return; 508 } 509 } 510 read_socket_fd = socket_fd; 511 512 memset(temp_buffer, 0, 32); 513 if (total_subdivisions > 1) { 514 subscription_type = CMS_NO_SUBSCRIPTION; 515 } 516 517 if (subscription_type != CMS_NO_SUBSCRIPTION) { 518 verify_bufname(); 519 if (status < 0) { 520 rcs_print_error("TCPMEM: verify_bufname() failed\n"); 521 return; 522 } 523 putbe32(temp_buffer, (uint32_t) serial_number); 524 putbe32(temp_buffer + 4, REMOTE_CMS_SET_SUBSCRIPTION_REQUEST_TYPE); 525 putbe32(temp_buffer + 8, (uint32_t) buffer_number); 526 putbe32(temp_buffer + 12, (uint32_t) subscription_type); 527 putbe32(temp_buffer + 16, (uint32_t) poll_interval_millis); 528 if (sendn(socket_fd, temp_buffer, 20, 0, 30) < 0) { 529 rcs_print_error("Can`t setup subscription.\n"); 530 subscription_type = CMS_NO_SUBSCRIPTION; 531 } else { 532 serial_number++; 533 rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, 534 "TCPMEM sending request: fd = %d, serial_number=%ld, request_type=%d, buffer_number=%ld\n", 535 socket_fd, serial_number, 536 ntohl(*((uint32_t *) temp_buffer + 1)), buffer_number); 537 memset(temp_buffer, 0, 20); 538 recvd_bytes = 0; 539 if (recvn(socket_fd, temp_buffer, 8, 0, 30, &recvd_bytes) < 0) { 540 rcs_print_error("Can`t setup subscription.\n"); 541 subscription_type = CMS_NO_SUBSCRIPTION; 542 } 543 if (!getbe32(temp_buffer+4)) { 544 rcs_print_error("Can`t setup subscription.\n"); 545 subscription_type = CMS_NO_SUBSCRIPTION; 546 } 547 548 bytes_to_throw_away = 8 - recvd_bytes; 549 if (bytes_to_throw_away < 0 || bytes_to_throw_away > 8) { 550 bytes_to_throw_away = 0; 551 } 552 recvd_bytes = 0; 553 } 554 memset(temp_buffer, 0, 20); 555 } 556 if (subscription_type != CMS_NO_SUBSCRIPTION) { 557 polling = 1; 558 } 559 560 if (polling) { 561 make_tcp_socket_nonblocking(socket_fd); 562 write_socket_fd = socket(AF_INET, SOCK_STREAM, 0); 563 if (write_socket_fd < 0) { 564 rcs_print_error("TCPMEM: Error from socket() (errno = %d:%s)\n", 565 errno, strerror(errno)); 566 status = CMS_CREATE_ERROR; 567 return; 568 } 569 rcs_print_debug(PRINT_CMS_CONFIG_INFO, 570 "Setting socket options . . . \n"); 571 if (set_tcp_socket_options(write_socket_fd) < 0) { 572 return; 573 } 574 rcs_print_debug(PRINT_CMS_CONFIG_INFO, "Binding . . . \n"); 575 if (bind 576 (write_socket_fd, (struct sockaddr *) &cli_addr, 577 sizeof(cli_addr)) < 0) { 578 rcs_print_error("TCPMEM: bind error %d = %s\n", errno, 579 strerror(errno)); 580 status = CMS_CREATE_ERROR; 581 } 582 rcs_print_debug(PRINT_CMS_CONFIG_INFO, "Connecting . . .\n"); 583 if (connect 584 (write_socket_fd, (struct sockaddr *) &server_socket_address, 585 sizeof(server_socket_address)) < 0) { 586 if (EINPROGRESS == errno) { 587 FD_ZERO(&fds); 588 FD_SET(write_socket_fd, &fds); 589 start_time = etime(); 590 tm.tv_sec = (long) timeout; 591 tm.tv_sec = (long) (fmod(timeout, 1.0) * 1e6); 592 while (!(socket_ret = select(write_socket_fd + 1, 593 (fd_set *) NULL, &fds, (fd_set *) NULL, &tm))) { 594 FD_SET(write_socket_fd, &fds); 595 esleep(0.001); 596 current_time = etime(); 597 double timeleft = start_time + timeout - current_time; 598 if (timeleft <= 0.0 && timeout >= 0.0) { 599 rcs_print_error 600 ("TCPMEM: Timed out waiting for connection.\n"); 601 status = CMS_NO_SERVER_ERROR; 602 return; 603 } 604 tm.tv_sec = (long) timeleft; 605 tm.tv_sec = (long) (fmod(timeleft, 1.0) * 1e6); 606 } 607 if (-1 == socket_ret) { 608 rcs_print_error("select error: %d -- %s\n", errno, 609 strerror(errno)); 610 rcs_print_error("TCPMEM: Couldn't connect.\n"); 611 status = CMS_NO_SERVER_ERROR; 612 return; 613 } 614 } else { 615 rcs_print_error("connect error: %d -- %s\n", errno, 616 strerror(errno)); 617 rcs_print_error 618 ("TCPMEM: Error trying to connect to TCP port %d of host %s.\n", 619 ntohs(server_socket_address.sin_port), BufferHost); 620 } 621 } 622 timeout = 0; 623 } else { 624 write_socket_fd = read_socket_fd; 625 } 626 reconnect_needed = 0; 627 fatal_error_occurred = 0; 628 629 } 630 631 TCPMEM::~TCPMEM() 632 { 633 disconnect(); 634 } 635 636 void TCPMEM::disconnect() 637 { 638 if (write_socket_fd > 0 && write_socket_fd != socket_fd) { 639 if (status != CMS_CONFIG_ERROR && status != CMS_CREATE_ERROR) { 640 if (delete_totally) { 641 putbe32(temp_buffer, (uint32_t) serial_number); 642 putbe32(temp_buffer + 4, REMOTE_CMS_CLEAN_REQUEST_TYPE); 643 putbe32(temp_buffer + 8, (uint32_t) buffer_number); 644 sendn(write_socket_fd, temp_buffer, 20, 0, -1); 645 } 646 } 647 close(write_socket_fd); 648 write_socket_fd = 0; 649 } 650 651 if (socket_fd > 0) { 652 if (status != CMS_CONFIG_ERROR && status != CMS_CREATE_ERROR) { 653 if (delete_totally) { 654 putbe32(temp_buffer, (uint32_t) serial_number); 655 putbe32(temp_buffer + 4, REMOTE_CMS_CLEAN_REQUEST_TYPE); 656 putbe32(temp_buffer + 8, (uint32_t) buffer_number); 657 sendn(socket_fd, temp_buffer, 20, 0, -1); 658 } 659 } 660 close(socket_fd); 661 socket_fd = 0; 662 } 663 } 664 665 CMS_STATUS TCPMEM::handle_old_replies() 666 { 667 long message_size; 668 669 timedout_request_writeid = 0; 670 status = CMS_STATUS_NOT_SET; 671 switch (timedout_request) { 672 case REMOTE_CMS_READ_REQUEST_TYPE: 673 if (!waiting_for_message) { 674 if (recvn(socket_fd, temp_buffer, 20, 0, timeout, &recvd_bytes) < 675 0) { 676 if (recvn_timedout) { 677 if (polling) { 678 return status; 679 } else { 680 consecutive_timeouts++; 681 if (consecutive_timeouts > max_consecutive_timeouts && 682 max_consecutive_timeouts > 0) { 683 rcs_print_error 684 ("CMS: %d consecutive timeouts have occurred. -- Stop trying.\n", 685 consecutive_timeouts); 686 fatal_error_occurred = 1; 687 reconnect_needed = 1; 688 } 689 return (status = CMS_TIMED_OUT); 690 } 691 } else { 692 recvd_bytes = 0; 693 fatal_error_occurred = 1; 694 return (status = CMS_MISC_ERROR); 695 } 696 } 697 recvd_bytes = 0; 698 returned_serial_number = 699 (CMS_STATUS) getbe32(temp_buffer); 700 rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, 701 "TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n", 702 socket_fd, returned_serial_number, buffer_number); 703 if (returned_serial_number != serial_number) { 704 rcs_print_error 705 ("TCPMEM: Returned serial number(%ld) does not match expected serial number(%ld).\n", 706 returned_serial_number, serial_number); 707 if (subscription_type == CMS_NO_SUBSCRIPTION) { 708 fatal_error_occurred = 1; 709 reconnect_needed = 1; 710 return (status = CMS_MISC_ERROR); 711 } else { 712 serial_number = returned_serial_number; 713 } 714 } 715 message_size = ntohl(*((uint32_t *) temp_buffer + 2)); 716 timedout_request_status = 717 (CMS_STATUS) ntohl(*((uint32_t *) temp_buffer + 1)); 718 timedout_request_writeid = ntohl(*((uint32_t *) temp_buffer + 3)); 719 header.was_read = ntohl(*((uint32_t *) temp_buffer + 4)); 720 if (message_size > max_encoded_message_size) { 721 rcs_print_error("Received message is too big. (%ld > %ld)\n", 722 message_size, max_encoded_message_size); 723 fatal_error_occurred = 1; 724 reconnect_needed = 1; 725 return (status = CMS_INSUFFICIENT_SPACE_ERROR); 726 } 727 } else { 728 message_size = waiting_message_size; 729 } 730 if (message_size > 0) { 731 if (recvn 732 (socket_fd, encoded_data, message_size, 0, timeout, 733 &recvd_bytes) < 0) { 734 if (recvn_timedout) { 735 if (!waiting_for_message) { 736 waiting_message_id = timedout_request_writeid; 737 waiting_message_size = message_size; 738 } 739 waiting_for_message = 1; 740 timedout_request_writeid = 0; 741 if (polling) { 742 return status; 743 } else { 744 consecutive_timeouts++; 745 if (consecutive_timeouts > max_consecutive_timeouts && 746 max_consecutive_timeouts > 0) { 747 rcs_print_error 748 ("CMS: %d consecutive timeouts have occurred. -- Stop trying.\n", 749 consecutive_timeouts); 750 fatal_error_occurred = 1; 751 reconnect_needed = 1; 752 } 753 return (status = CMS_TIMED_OUT); 754 } 755 } else { 756 recvd_bytes = 0; 757 fatal_error_occurred = 1; 758 reconnect_needed = 1; 759 return (status = CMS_MISC_ERROR); 760 } 761 } 762 recvd_bytes = 0; 763 if (waiting_for_message) { 764 timedout_request_writeid = waiting_message_id; 765 } 766 } 767 break; 768 769 case REMOTE_CMS_WRITE_REQUEST_TYPE: 770 case REMOTE_CMS_CHECK_IF_READ_REQUEST_TYPE: 771 case REMOTE_CMS_GET_MSG_COUNT_REQUEST_TYPE: 772 case REMOTE_CMS_GET_QUEUE_LENGTH_REQUEST_TYPE: 773 case REMOTE_CMS_GET_SPACE_AVAILABLE_REQUEST_TYPE: 774 if (timedout_request == REMOTE_CMS_WRITE_REQUEST_TYPE && 775 (min_compatible_version > 2.58 || min_compatible_version < 1e-6 || 776 confirm_write)) { 777 break; 778 } 779 if (recvn(socket_fd, temp_buffer, 12, 0, timeout, &recvd_bytes) < 0) { 780 if (recvn_timedout) { 781 consecutive_timeouts++; 782 if (consecutive_timeouts > max_consecutive_timeouts && 783 max_consecutive_timeouts > 0) { 784 rcs_print_error 785 ("CMS: %d consecutive timeouts have occurred. -- Stop trying.\n", 786 consecutive_timeouts); 787 reconnect_needed = 1; 788 fatal_error_occurred = 1; 789 } 790 reconnect_needed = 1; 791 return (status = CMS_TIMED_OUT); 792 } else { 793 fatal_error_occurred = 1; 794 reconnect_needed = 1; 795 return (status = CMS_MISC_ERROR); 796 } 797 } 798 recvd_bytes = 0; 799 returned_serial_number = 800 (CMS_STATUS) getbe32(temp_buffer); 801 rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, 802 "TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n", 803 socket_fd, returned_serial_number, buffer_number); 804 if (returned_serial_number != serial_number) { 805 rcs_print_error 806 ("TCPMEM: Returned serial number(%ld) does not match expected serial number(%ld).\n", 807 returned_serial_number, serial_number); 808 reconnect_needed = 1; 809 if (subscription_type == CMS_NO_SUBSCRIPTION) { 810 return (status = CMS_MISC_ERROR); 811 } 812 } 813 break; 814 815 case REMOTE_CMS_CLEAR_REQUEST_TYPE: 816 if (recvn(socket_fd, temp_buffer, 4, 0, timeout, &recvd_bytes) < 0) { 817 if (recvn_timedout) { 818 consecutive_timeouts++; 819 reconnect_needed = 1; 820 if (consecutive_timeouts > max_consecutive_timeouts && 821 max_consecutive_timeouts > 0) { 822 rcs_print_error 823 ("CMS: %d consecutive timeouts have occurred. -- Stop trying.\n", 824 consecutive_timeouts); 825 fatal_error_occurred = 1; 826 } 827 return (status = CMS_TIMED_OUT); 828 } else { 829 reconnect_needed = 1; 830 fatal_error_occurred = 1; 831 return (status = CMS_MISC_ERROR); 832 } 833 } 834 recvd_bytes = 0; 835 returned_serial_number = 836 (CMS_STATUS) getbe32(temp_buffer); 837 rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, 838 "TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n", 839 socket_fd, returned_serial_number, buffer_number); 840 if (returned_serial_number != serial_number) { 841 rcs_print_error 842 ("TCPMEM: Returned serial number(%ld) does not match expected serial number(%ld).\n", 843 returned_serial_number, serial_number); 844 reconnect_needed = 1; 845 if (subscription_type == CMS_NO_SUBSCRIPTION) { 846 return (status = CMS_MISC_ERROR); 847 } 848 } 849 break; 850 851 case NO_REMOTE_CMS_REQUEST: 852 default: 853 break; 854 } 855 if (bytes_to_throw_away > 0) { 856 if (recvn 857 (socket_fd, encoded_data, bytes_to_throw_away, 0, timeout, 858 &recvd_bytes) < 0) { 859 if (recvn_timedout) { 860 consecutive_timeouts++; 861 if (consecutive_timeouts > max_consecutive_timeouts && 862 max_consecutive_timeouts > 0) { 863 rcs_print_error 864 ("CMS: %d consecutive timeouts have occurred. -- Stop trying.\n", 865 consecutive_timeouts); 866 fatal_error_occurred = 1; 867 reconnect_needed = 1; 868 } 869 return (status = CMS_TIMED_OUT); 870 } else { 871 recvd_bytes = 0; 872 fatal_error_occurred = 1; 873 reconnect_needed = 1; 874 return (status = CMS_MISC_ERROR); 875 } 876 } 877 recvd_bytes = 0; 878 } 879 bytes_to_throw_away = 0; 880 timedout_request = NO_REMOTE_CMS_REQUEST; 881 consecutive_timeouts = 0; 882 waiting_for_message = 0; 883 waiting_message_size = 0; 884 waiting_message_id = 0; 885 recvd_bytes = 0; 886 return status; 887 } 888 889 CMS_STATUS TCPMEM::read() 890 { 891 long message_size, id; 892 REMOTE_CMS_REQUEST_TYPE last_timedout_request; 893 894 /* Produce error message if process does not have permission to read. */ 895 if (!read_permission_flag) { 896 rcs_print_error("CMS: %s was not configured to read %s\n", 897 ProcessName, BufferName); 898 return (status = CMS_PERMISSIONS_ERROR); 899 } 900 901 if (reconnect_needed && autoreconnect) { 902 reconnect(); 903 } 904 905 if (reconnect_needed) { 906 return (status = CMS_MISC_ERROR); 907 } 908 disable_sigpipe(); 909 910 if (subscription_type != CMS_NO_SUBSCRIPTION) { 911 set_socket_fds(read_socket_fd); 912 timedout_request = REMOTE_CMS_READ_REQUEST_TYPE; 913 if (subscription_count < 1) { 914 serial_number++; 915 } 916 handle_old_replies(); 917 check_id(timedout_request_writeid); 918 if (status == CMS_READ_OK) { 919 serial_number++; 920 } 921 subscription_count++; 922 reenable_sigpipe(); 923 return status; 924 } 925 926 if (timedout_request == NO_REMOTE_CMS_REQUEST) { 927 set_socket_fds(read_socket_fd); 928 } 929 if (fatal_error_occurred) { 930 if (status >= 0) { 931 status = CMS_MISC_ERROR; 932 } 933 reenable_sigpipe(); 934 return (status); 935 } 936 if (socket_fd <= 0) { 937 rcs_print_error("TCPMEM::read: Invalid socket descriptor. (%d)\n", 938 socket_fd); 939 fatal_error_occurred = 1; 940 reconnect_needed = 1; 941 reenable_sigpipe(); 942 return (status = CMS_MISC_ERROR); 943 } 944 last_timedout_request = timedout_request; 945 if (((int) handle_old_replies()) < 0) { 946 reenable_sigpipe(); 947 return status; 948 } 949 if (polling && last_timedout_request == REMOTE_CMS_READ_REQUEST_TYPE) { 950 check_id(timedout_request_writeid); 951 reenable_sigpipe(); 952 return status; 953 } 954 set_socket_fds(read_socket_fd); 955 956 putbe32(temp_buffer, (uint32_t) serial_number); 957 putbe32(temp_buffer + 4, REMOTE_CMS_READ_REQUEST_TYPE); 958 putbe32(temp_buffer + 8, (uint32_t) buffer_number); 959 putbe32(temp_buffer + 12, CMS_READ_ACCESS); 960 putbe32(temp_buffer + 16, in_buffer_id); 961 962 int send_header_size = 20; 963 if (total_subdivisions > 1) { 964 *((uint32_t *) temp_buffer + 5) = htonl((uint32_t) current_subdivision); 965 send_header_size = 24; 966 } 967 if (sendn(socket_fd, temp_buffer, send_header_size, 0, timeout) < 0) { 968 rcs_print_error("TCPMEM: Can't send READ request to server.\n"); 969 reconnect_needed = 1; 970 fatal_error_occurred = 1; 971 reenable_sigpipe(); 972 return (status = CMS_MISC_ERROR); 973 } 974 serial_number++; 975 rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, 976 "TCPMEM sending request: fd = %d, serial_number=%ld, request_type=%d, buffer_number=%ld\n", 977 socket_fd, serial_number, 978 ntohl(*((uint32_t *) temp_buffer + 1)), buffer_number); 979 980 if (recvn(socket_fd, temp_buffer, 20, 0, timeout, &recvd_bytes) < 20) { 981 if (recvn_timedout) { 982 timedout_request = REMOTE_CMS_READ_REQUEST_TYPE; 983 if (polling) { 984 return (status = CMS_READ_OLD); 985 } else { 986 consecutive_timeouts = 1; 987 reenable_sigpipe(); 988 return (status = CMS_TIMED_OUT); 989 } 990 } else { 991 recvd_bytes = 0; 992 reconnect_needed = 1; 993 fatal_error_occurred = 1; 994 reenable_sigpipe(); 995 return (status = CMS_MISC_ERROR); 996 } 997 } 998 recvd_bytes = 0; 999 returned_serial_number = (CMS_STATUS) getbe32(temp_buffer); 1000 rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, 1001 "TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n", 1002 socket_fd, returned_serial_number, buffer_number); 1003 1004 if (returned_serial_number != serial_number) { 1005 rcs_print_error 1006 ("TCPMEM: Returned serial number(%ld) does not match expected serial number(%ld).\n", 1007 returned_serial_number, serial_number); 1008 reconnect_needed = 1; 1009 if (subscription_type == CMS_NO_SUBSCRIPTION) { 1010 fatal_error_occurred = 1; 1011 reenable_sigpipe(); 1012 return (status = CMS_MISC_ERROR); 1013 } 1014 } 1015 status = (CMS_STATUS) ntohl(*((uint32_t *) temp_buffer + 1)); 1016 message_size = ntohl(*((uint32_t *) temp_buffer + 2)); 1017 id = ntohl(*((uint32_t *) temp_buffer + 3)); 1018 header.was_read = ntohl(*((uint32_t *) temp_buffer + 4)); 1019 if (message_size > max_encoded_message_size) { 1020 rcs_print_error("Received message is too big. (%ld > %ld)\n", 1021 message_size, max_encoded_message_size); 1022 fatal_error_occurred = 1; 1023 reconnect_needed = 1; 1024 reenable_sigpipe(); 1025 return (status = CMS_MISC_ERROR); 1026 } 1027 if (message_size > 0) { 1028 if (recvn 1029 (socket_fd, encoded_data, message_size, 0, timeout, 1030 &recvd_bytes) < 0) { 1031 if (recvn_timedout) { 1032 if (!waiting_for_message) { 1033 waiting_message_id = id; 1034 waiting_message_size = message_size; 1035 } 1036 waiting_for_message = 1; 1037 timedout_request = REMOTE_CMS_READ_REQUEST_TYPE; 1038 if (polling) { 1039 reenable_sigpipe(); 1040 return (status = CMS_READ_OLD); 1041 } else { 1042 reenable_sigpipe(); 1043 return (status = CMS_TIMED_OUT); 1044 } 1045 } else { 1046 recvd_bytes = 0; 1047 fatal_error_occurred = 1; 1048 reconnect_needed = 1; 1049 reenable_sigpipe(); 1050 return (status = CMS_MISC_ERROR); 1051 } 1052 } 1053 } 1054 recvd_bytes = 0; 1055 check_id(id); 1056 reenable_sigpipe(); 1057 return (status); 1058 } 1059 1060 CMS_STATUS TCPMEM::blocking_read(double _blocking_timeout) 1061 { 1062 blocking_timeout = _blocking_timeout; 1063 long message_size, id; 1064 REMOTE_CMS_REQUEST_TYPE last_timedout_request; 1065 long timeout_millis; 1066 int orig_print_recvn_timeout_errors = print_recvn_timeout_errors; 1067 print_recvn_timeout_errors = 0; 1068 1069 /* Produce error message if process does not have permission to read. */ 1070 if (!read_permission_flag) { 1071 rcs_print_error("CMS: %s was not configured to read %s\n", 1072 ProcessName, BufferName); 1073 return (status = CMS_PERMISSIONS_ERROR); 1074 } 1075 1076 if (blocking_timeout < 0) { 1077 timeout_millis = -1; 1078 } else { 1079 timeout_millis = (uint32_t) (blocking_timeout * 1000.0); 1080 } 1081 1082 if (reconnect_needed && autoreconnect) { 1083 reconnect(); 1084 } 1085 1086 if (reconnect_needed) { 1087 print_recvn_timeout_errors = orig_print_recvn_timeout_errors; 1088 return (status = CMS_MISC_ERROR); 1089 } 1090 disable_sigpipe(); 1091 double orig_timeout = timeout; 1092 1093 if (subscription_type != CMS_NO_SUBSCRIPTION) { 1094 if (blocking_timeout < -1e-6 || blocking_timeout > 1e-6) { 1095 make_tcp_socket_blocking(read_socket_fd); 1096 timeout = blocking_timeout; 1097 } 1098 set_socket_fds(read_socket_fd); 1099 if (subscription_count < 1) { 1100 serial_number++; 1101 } 1102 timedout_request = REMOTE_CMS_READ_REQUEST_TYPE; 1103 handle_old_replies(); 1104 check_id(timedout_request_writeid); 1105 if (status == CMS_READ_OK) { 1106 serial_number++; 1107 } 1108 subscription_count++; 1109 reenable_sigpipe(); 1110 if (blocking_timeout < -1e-6 || blocking_timeout > 1e-6) { 1111 make_tcp_socket_nonblocking(read_socket_fd); 1112 timeout = orig_timeout; 1113 } 1114 print_recvn_timeout_errors = orig_print_recvn_timeout_errors; 1115 return status; 1116 } 1117 1118 if (timedout_request == NO_REMOTE_CMS_REQUEST) { 1119 set_socket_fds(read_socket_fd); 1120 } 1121 if (fatal_error_occurred) { 1122 if (status >= 0) { 1123 status = CMS_MISC_ERROR; 1124 } 1125 reenable_sigpipe(); 1126 print_recvn_timeout_errors = orig_print_recvn_timeout_errors; 1127 return (status); 1128 } 1129 if (socket_fd <= 0) { 1130 rcs_print_error("TCPMEM::read: Invalid socket descriptor. (%d)\n", 1131 socket_fd); 1132 fatal_error_occurred = 1; 1133 reconnect_needed = 1; 1134 reenable_sigpipe(); 1135 print_recvn_timeout_errors = orig_print_recvn_timeout_errors; 1136 return (status = CMS_MISC_ERROR); 1137 } 1138 last_timedout_request = timedout_request; 1139 if (((int) handle_old_replies()) < 0) { 1140 reenable_sigpipe(); 1141 print_recvn_timeout_errors = orig_print_recvn_timeout_errors; 1142 return status; 1143 } 1144 if (polling && last_timedout_request == REMOTE_CMS_READ_REQUEST_TYPE) { 1145 check_id(timedout_request_writeid); 1146 reenable_sigpipe(); 1147 print_recvn_timeout_errors = orig_print_recvn_timeout_errors; 1148 return status; 1149 } 1150 set_socket_fds(read_socket_fd); 1151 1152 putbe32(temp_buffer, (uint32_t) serial_number); 1153 putbe32(temp_buffer + 4, REMOTE_CMS_BLOCKING_READ_REQUEST_TYPE); 1154 putbe32(temp_buffer + 8, (uint32_t) buffer_number); 1155 putbe32(temp_buffer + 12, CMS_READ_ACCESS); 1156 putbe32(temp_buffer + 16, (uint32_t) in_buffer_id); 1157 putbe32(temp_buffer + 20, (uint32_t) timeout_millis); 1158 1159 int send_header_size = 24; 1160 if (total_subdivisions > 1) { 1161 putbe32(temp_buffer + 24, (uint32_t) current_subdivision); 1162 send_header_size = 28; 1163 } 1164 if (sendn(socket_fd, temp_buffer, send_header_size, 0, blocking_timeout) < 1165 0) { 1166 rcs_print_error 1167 ("TCPMEM: Can't send BLOCKING_READ request to server.\n"); 1168 reconnect_needed = 1; 1169 fatal_error_occurred = 1; 1170 reenable_sigpipe(); 1171 print_recvn_timeout_errors = orig_print_recvn_timeout_errors; 1172 return (status = CMS_MISC_ERROR); 1173 } 1174 serial_number++; 1175 rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, 1176 "TCPMEM sending request: fd = %d, serial_number=%ld, " 1177 "request_type=%d, buffer_number=%ld\n", 1178 socket_fd, serial_number, 1179 ntohl(*((uint32_t *) temp_buffer + 1)), buffer_number); 1180 if (recvn(socket_fd, temp_buffer, 20, 0, blocking_timeout, &recvd_bytes) < 1181 0) { 1182 print_recvn_timeout_errors = orig_print_recvn_timeout_errors; 1183 if (recvn_timedout) { 1184 timedout_request = REMOTE_CMS_READ_REQUEST_TYPE; 1185 if (polling) { 1186 return (status = CMS_READ_OLD); 1187 } else { 1188 consecutive_timeouts = 1; 1189 reenable_sigpipe(); 1190 return (status = CMS_TIMED_OUT); 1191 } 1192 } else { 1193 recvd_bytes = 0; 1194 reconnect_needed = 1; 1195 fatal_error_occurred = 1; 1196 reenable_sigpipe(); 1197 return (status = CMS_MISC_ERROR); 1198 } 1199 } 1200 print_recvn_timeout_errors = orig_print_recvn_timeout_errors; 1201 recvd_bytes = 0; 1202 returned_serial_number = (CMS_STATUS) getbe32(temp_buffer); 1203 rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, 1204 "TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n", 1205 socket_fd, returned_serial_number, buffer_number); 1206 1207 if (returned_serial_number != serial_number) { 1208 rcs_print_error 1209 ("TCPMEM: Returned serial number(%ld) does not match expected serial number(%ld).\n", 1210 returned_serial_number, serial_number); 1211 reconnect_needed = 1; 1212 if (subscription_type == CMS_NO_SUBSCRIPTION) { 1213 fatal_error_occurred = 1; 1214 reenable_sigpipe(); 1215 return (status = CMS_MISC_ERROR); 1216 } 1217 } 1218 status = (CMS_STATUS) ntohl(*((uint32_t *) temp_buffer + 1)); 1219 message_size = ntohl(*((uint32_t *) temp_buffer + 2)); 1220 id = ntohl(*((uint32_t *) temp_buffer + 3)); 1221 header.was_read = ntohl(*((uint32_t *) temp_buffer + 4)); 1222 if (message_size > max_encoded_message_size) { 1223 rcs_print_error("Received message is too big. (%ld > %ld)\n", 1224 message_size, max_encoded_message_size); 1225 fatal_error_occurred = 1; 1226 reconnect_needed = 1; 1227 reenable_sigpipe(); 1228 return (status = CMS_MISC_ERROR); 1229 } 1230 if (message_size > 0) { 1231 if (recvn 1232 (socket_fd, encoded_data, message_size, 0, blocking_timeout, 1233 &recvd_bytes) < 0) { 1234 if (recvn_timedout) { 1235 if (!waiting_for_message) { 1236 waiting_message_id = id; 1237 waiting_message_size = message_size; 1238 } 1239 waiting_for_message = 1; 1240 timedout_request = REMOTE_CMS_READ_REQUEST_TYPE; 1241 if (polling) { 1242 reenable_sigpipe(); 1243 return (status = CMS_READ_OLD); 1244 } else { 1245 reenable_sigpipe(); 1246 return (status = CMS_TIMED_OUT); 1247 } 1248 } else { 1249 recvd_bytes = 0; 1250 fatal_error_occurred = 1; 1251 reconnect_needed = 1; 1252 reenable_sigpipe(); 1253 return (status = CMS_MISC_ERROR); 1254 } 1255 } 1256 } 1257 recvd_bytes = 0; 1258 check_id(id); 1259 reenable_sigpipe(); 1260 return (status); 1261 } 1262 1263 void TCPMEM::reenable_sigpipe() 1264 { 1265 if (old_handler != ((void (*)(int)) SIG_ERR)) { 1266 signal(SIGPIPE, old_handler); 1267 } 1268 old_handler = (void (*)(int)) SIG_ERR; 1269 if (tcpmem_sigpipe_count > sigpipe_count) { 1270 sigpipe_count = tcpmem_sigpipe_count; 1271 reconnect_needed = 1; 1272 } 1273 } 1274 1275 void TCPMEM::disable_sigpipe() 1276 { 1277 if (!autoreconnect) { 1278 return; 1279 } 1280 old_handler = signal(SIGPIPE, tcpmem_sigpipe_handler); 1281 if (tcpmem_sigpipe_count > sigpipe_count) { 1282 sigpipe_count = tcpmem_sigpipe_count; 1283 } 1284 } 1285 1286 CMS_STATUS TCPMEM::peek() 1287 { 1288 /* Produce error message if process does not have permission to read. */ 1289 if (!read_permission_flag) { 1290 rcs_print_error("CMS: %s was not configured to read %s\n", 1291 ProcessName, BufferName); 1292 return (status = CMS_PERMISSIONS_ERROR); 1293 } 1294 1295 if (reconnect_needed && autoreconnect) { 1296 reconnect(); 1297 } 1298 1299 if (reconnect_needed) { 1300 return (status = CMS_MISC_ERROR); 1301 } 1302 disable_sigpipe(); 1303 1304 long message_size, id; 1305 REMOTE_CMS_REQUEST_TYPE last_timedout_request; 1306 if (subscription_type != CMS_NO_SUBSCRIPTION) { 1307 set_socket_fds(read_socket_fd); 1308 timedout_request = REMOTE_CMS_READ_REQUEST_TYPE; 1309 if (subscription_count < 1) { 1310 serial_number++; 1311 } 1312 handle_old_replies(); 1313 check_id(timedout_request_writeid); 1314 if (status == CMS_READ_OK) { 1315 serial_number++; 1316 } 1317 reenable_sigpipe(); 1318 subscription_count++; 1319 return status; 1320 } 1321 1322 if (timedout_request == NO_REMOTE_CMS_REQUEST) { 1323 set_socket_fds(read_socket_fd); 1324 } 1325 1326 if (fatal_error_occurred) { 1327 if (status >= 0) { 1328 status = CMS_MISC_ERROR; 1329 } 1330 reenable_sigpipe(); 1331 return (status); 1332 } 1333 if (socket_fd <= 0) { 1334 reconnect_needed = 1; 1335 rcs_print_error("TCPMEM::read: Invalid socket descriptor. (%d)\n", 1336 socket_fd); 1337 reenable_sigpipe(); 1338 return (status = CMS_MISC_ERROR); 1339 } 1340 last_timedout_request = timedout_request; 1341 if (((int) handle_old_replies()) < 0) { 1342 reenable_sigpipe(); 1343 return status; 1344 } 1345 if (polling && last_timedout_request == REMOTE_CMS_READ_REQUEST_TYPE) { 1346 check_id(timedout_request_writeid); 1347 reenable_sigpipe(); 1348 return status; 1349 } 1350 set_socket_fds(read_socket_fd); 1351 1352 putbe32(temp_buffer, (uint32_t) serial_number); 1353 putbe32(temp_buffer + 4, REMOTE_CMS_READ_REQUEST_TYPE); 1354 putbe32(temp_buffer + 8, (uint32_t) buffer_number); 1355 putbe32(temp_buffer + 12, CMS_PEEK_ACCESS); 1356 putbe32(temp_buffer + 16, (uint32_t) in_buffer_id); 1357 int send_header_size = 20; 1358 if (total_subdivisions > 1) { 1359 *((uint32_t *) temp_buffer + 20) = htonl((uint32_t) current_subdivision); 1360 send_header_size = 24; 1361 } 1362 if (sendn(socket_fd, temp_buffer, send_header_size, 0, timeout) < 0) { 1363 rcs_print_error("TCPMEM: Can't send PEEK request to server.\n"); 1364 reconnect_needed = 1; 1365 reenable_sigpipe(); 1366 return (status = CMS_MISC_ERROR); 1367 } 1368 serial_number++; 1369 if (recvn(socket_fd, temp_buffer, 20, 0, timeout, &recvd_bytes) < 0) { 1370 if (recvn_timedout) { 1371 timedout_request = REMOTE_CMS_READ_REQUEST_TYPE; 1372 if (polling) { 1373 reenable_sigpipe(); 1374 return (status = CMS_READ_OLD); 1375 } else { 1376 consecutive_timeouts = 1; 1377 reenable_sigpipe(); 1378 return (status = CMS_TIMED_OUT); 1379 } 1380 } else { 1381 recvd_bytes = 0; 1382 fatal_error_occurred = 1; 1383 reconnect_needed = 1; 1384 reenable_sigpipe(); 1385 return (status = CMS_MISC_ERROR); 1386 } 1387 } 1388 recvd_bytes = 0; 1389 returned_serial_number = (CMS_STATUS) getbe32(temp_buffer); 1390 rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, 1391 "TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n", 1392 socket_fd, returned_serial_number, buffer_number); 1393 1394 if (returned_serial_number != serial_number) { 1395 rcs_print_error 1396 ("TCPMEM: Returned serial number(%ld) does not match expected serial number(%ld).\n", 1397 returned_serial_number, serial_number); 1398 reconnect_needed = 1; 1399 if (subscription_type == CMS_NO_SUBSCRIPTION) { 1400 reenable_sigpipe(); 1401 return (status = CMS_MISC_ERROR); 1402 } 1403 } 1404 status = (CMS_STATUS) ntohl(*((uint32_t *) temp_buffer + 1)); 1405 message_size = ntohl(*((uint32_t *) temp_buffer + 2)); 1406 id = ntohl(*((uint32_t *) temp_buffer + 3)); 1407 header.was_read = ntohl(*((uint32_t *) temp_buffer + 4)); 1408 if (message_size > max_encoded_message_size) { 1409 reconnect_needed = 1; 1410 rcs_print_error("Received message is too big. (%ld > %ld)\n", 1411 message_size, max_encoded_message_size); 1412 reenable_sigpipe(); 1413 return (status = CMS_MISC_ERROR); 1414 } 1415 if (message_size > 0) { 1416 if (recvn 1417 (socket_fd, encoded_data, message_size, 0, timeout, 1418 &recvd_bytes) < 0) { 1419 if (recvn_timedout) { 1420 if (!waiting_for_message) { 1421 waiting_message_id = id; 1422 waiting_message_size = message_size; 1423 } 1424 waiting_for_message = 1; 1425 timedout_request = REMOTE_CMS_READ_REQUEST_TYPE; 1426 if (polling) { 1427 reenable_sigpipe(); 1428 return (status = CMS_READ_OLD); 1429 } else { 1430 reenable_sigpipe(); 1431 return (status = CMS_TIMED_OUT); 1432 } 1433 } else { 1434 reconnect_needed = 1; 1435 recvd_bytes = 0; 1436 fatal_error_occurred = 1; 1437 reenable_sigpipe(); 1438 return (status = CMS_MISC_ERROR); 1439 } 1440 } 1441 } 1442 recvd_bytes = 0; 1443 check_id(id); 1444 reenable_sigpipe(); 1445 return (status); 1446 } 1447 1448 CMS_STATUS TCPMEM::write(void *user_data, int *serial_number_out) 1449 { 1450 1451 if (!write_permission_flag) { 1452 rcs_print_error("CMS: %s was not configured to write to %s\n", 1453 ProcessName, BufferName); 1454 return (status = CMS_PERMISSIONS_ERROR); 1455 } 1456 1457 if (reconnect_needed && autoreconnect) { 1458 reconnect(); 1459 } 1460 1461 if (!force_raw) { 1462 user_data = encoded_data; 1463 } 1464 1465 if (reconnect_needed) { 1466 return (status = CMS_MISC_ERROR); 1467 } 1468 1469 if (fatal_error_occurred) { 1470 if (status >= 0) { 1471 status = CMS_MISC_ERROR; 1472 } 1473 return (status); 1474 } 1475 1476 disable_sigpipe(); 1477 1478 if (socket_fd <= 0) { 1479 rcs_print_error("TCPMEM::write: Invalid socket descriptor. (%d)\n", 1480 socket_fd); 1481 reenable_sigpipe(); 1482 return (status = CMS_MISC_ERROR); 1483 } 1484 if (((int) handle_old_replies()) < 0) { 1485 reenable_sigpipe(); 1486 return status; 1487 } 1488 set_socket_fds(write_socket_fd); 1489 1490 putbe32(temp_buffer, serial_number); 1491 putbe32(temp_buffer + 4, REMOTE_CMS_WRITE_REQUEST_TYPE); 1492 putbe32(temp_buffer + 8, (uint32_t) buffer_number); 1493 putbe32(temp_buffer + 12, CMS_WRITE_ACCESS); 1494 putbe32(temp_buffer + 16, (uint32_t) header.in_buffer_size); 1495 int send_header_size = 20; 1496 if (total_subdivisions > 1) { 1497 putbe32(temp_buffer + 20,(uint32_t) current_subdivision); 1498 send_header_size = 24; 1499 } 1500 if (header.in_buffer_size < 0x2000 - 20 && header.in_buffer_size > 0) { 1501 memcpy(temp_buffer + send_header_size, user_data, 1502 header.in_buffer_size); 1503 if (sendn 1504 (socket_fd, temp_buffer, header.in_buffer_size + send_header_size, 1505 0, timeout) < 0) { 1506 rcs_print_error 1507 ("TCPMEM: Failed to send message of size %ld + header of size %d to the server.\n", 1508 header.in_buffer_size, send_header_size); 1509 reconnect_needed = 1; 1510 reenable_sigpipe(); 1511 return (status = CMS_MISC_ERROR); 1512 } 1513 } else { 1514 if (sendn(socket_fd, temp_buffer, send_header_size, 0, timeout) < 0) { 1515 rcs_print_error("TCPMEM: Failed to send header to server.\n"); 1516 reconnect_needed = 1; 1517 reenable_sigpipe(); 1518 return (status = CMS_MISC_ERROR); 1519 } 1520 if (header.in_buffer_size > 0) { 1521 if (sendn(socket_fd, user_data, header.in_buffer_size, 0, timeout) 1522 < 0) { 1523 reconnect_needed = 1; 1524 reenable_sigpipe(); 1525 return (status = CMS_MISC_ERROR); 1526 } 1527 } 1528 } 1529 serial_number++; 1530 if ((min_compatible_version < 2.58 && min_compatible_version > 1e-6) 1531 || confirm_write) { 1532 if (recvn(socket_fd, temp_buffer, 12, 0, timeout, &recvd_bytes) < 0) { 1533 if (recvn_timedout) { 1534 timedout_request = REMOTE_CMS_WRITE_REQUEST_TYPE; 1535 consecutive_timeouts = 1; 1536 reenable_sigpipe(); 1537 return (status = CMS_TIMED_OUT); 1538 } else { 1539 recvd_bytes = 0; 1540 reconnect_needed = 1; 1541 fatal_error_occurred = 1; 1542 reenable_sigpipe(); 1543 return (status = CMS_MISC_ERROR); 1544 } 1545 } 1546 recvd_bytes = 0; 1547 returned_serial_number = 1548 (CMS_STATUS) getbe32(temp_buffer); 1549 if(serial_number_out) *serial_number_out = returned_serial_number; 1550 rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, 1551 "TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n", 1552 socket_fd, returned_serial_number, buffer_number); 1553 1554 status = (CMS_STATUS) ntohl(*((uint32_t *) temp_buffer + 1)); 1555 header.was_read = ntohl(*((uint32_t *) temp_buffer + 2)); 1556 header.write_id = returned_serial_number; 1557 } else { 1558 header.was_read = 0; 1559 status = CMS_WRITE_OK; 1560 returned_serial_number = serial_number; 1561 } 1562 reenable_sigpipe(); 1563 return (status); 1564 } 1565 1566 CMS_STATUS TCPMEM::write_if_read(void *user_data, int *serial_number_out) 1567 { 1568 1569 if (!write_permission_flag) { 1570 rcs_print_error("CMS: %s was not configured to write to %s\n", 1571 ProcessName, BufferName); 1572 return (status = CMS_PERMISSIONS_ERROR); 1573 } 1574 1575 if (reconnect_needed && autoreconnect) { 1576 reconnect(); 1577 } 1578 if (!force_raw) { 1579 user_data = encoded_data; 1580 } 1581 1582 if (reconnect_needed) { 1583 return (status = CMS_MISC_ERROR); 1584 } 1585 1586 if (fatal_error_occurred) { 1587 if (status >= 0) { 1588 status = CMS_MISC_ERROR; 1589 } 1590 return (status); 1591 } 1592 disable_sigpipe(); 1593 1594 if (socket_fd <= 0) { 1595 rcs_print_error("TCPMEM::write: Invalid socket descriptor. (%d)\n", 1596 socket_fd); 1597 reenable_sigpipe(); 1598 return (status = CMS_MISC_ERROR); 1599 } 1600 if (((int) handle_old_replies()) < 0) { 1601 reenable_sigpipe(); 1602 return status; 1603 } 1604 1605 set_socket_fds(write_socket_fd); 1606 1607 putbe32( temp_buffer, (uint32_t) serial_number); 1608 putbe32( temp_buffer + 4, REMOTE_CMS_WRITE_REQUEST_TYPE); 1609 putbe32( temp_buffer + 8, (uint32_t) buffer_number); 1610 putbe32( temp_buffer + 12, CMS_WRITE_IF_READ_ACCESS); 1611 putbe32( temp_buffer + 16, (uint32_t) header.in_buffer_size); 1612 int send_header_size = 20; 1613 if (total_subdivisions > 1) { 1614 putbe32( temp_buffer + 20, (uint32_t) current_subdivision); 1615 send_header_size = 24; 1616 } 1617 if (header.in_buffer_size < 0x2000 - 20 && header.in_buffer_size > 0) { 1618 memcpy(temp_buffer + 20, user_data, header.in_buffer_size); 1619 if (sendn 1620 (socket_fd, temp_buffer, header.in_buffer_size + send_header_size, 1621 0, timeout) < 0) { 1622 reconnect_needed = 1; 1623 reenable_sigpipe(); 1624 return (status = CMS_MISC_ERROR); 1625 } 1626 } else { 1627 if (sendn(socket_fd, temp_buffer, send_header_size, 0, timeout) < 0) { 1628 reconnect_needed = 1; 1629 reenable_sigpipe(); 1630 return (status = CMS_MISC_ERROR); 1631 } 1632 if (header.in_buffer_size > 0) { 1633 if (sendn(socket_fd, user_data, header.in_buffer_size, 0, timeout) 1634 < 0) { 1635 reconnect_needed = 1; 1636 reenable_sigpipe(); 1637 return (status = CMS_MISC_ERROR); 1638 } 1639 } 1640 } 1641 serial_number++; 1642 if ((min_compatible_version < 2.58 && min_compatible_version > 1e-6) || 1643 confirm_write) { 1644 if (recvn(socket_fd, temp_buffer, 12, 0, timeout, &recvd_bytes) < 0) { 1645 if (recvn_timedout) { 1646 timedout_request = REMOTE_CMS_WRITE_REQUEST_TYPE; 1647 consecutive_timeouts = 1; 1648 reenable_sigpipe(); 1649 return (status = CMS_TIMED_OUT); 1650 } else { 1651 recvd_bytes = 0; 1652 fatal_error_occurred = 1; 1653 reconnect_needed = 1; 1654 reenable_sigpipe(); 1655 return (status = CMS_MISC_ERROR); 1656 } 1657 } 1658 recvd_bytes = 0; 1659 returned_serial_number = 1660 (CMS_STATUS) getbe32(temp_buffer); 1661 if(serial_number_out) *serial_number_out = returned_serial_number; 1662 rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, 1663 "TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n", 1664 socket_fd, returned_serial_number, buffer_number); 1665 status = (CMS_STATUS) ntohl(*((uint32_t *) temp_buffer + 1)); 1666 header.was_read = ntohl(*((uint32_t *) temp_buffer + 2)); 1667 } else { 1668 header.was_read = 0; 1669 status = CMS_WRITE_OK; 1670 returned_serial_number = 0; 1671 } 1672 reenable_sigpipe(); 1673 return (status); 1674 } 1675 1676 int TCPMEM::check_if_read() 1677 { 1678 if (reconnect_needed && autoreconnect) { 1679 reconnect(); 1680 } 1681 1682 if (reconnect_needed) { 1683 return (status = CMS_MISC_ERROR); 1684 } 1685 1686 if (fatal_error_occurred) { 1687 if (status >= 0) { 1688 status = CMS_MISC_ERROR; 1689 } 1690 return (status); 1691 } 1692 1693 disable_sigpipe(); 1694 1695 if (socket_fd <= 0) { 1696 rcs_print_error 1697 ("TCPMEM::check_if_read: Invalid socket descriptor. (%d)\n", 1698 socket_fd); 1699 reenable_sigpipe(); 1700 return (status = CMS_MISC_ERROR); 1701 } 1702 if (((int) handle_old_replies()) < 0) { 1703 reenable_sigpipe(); 1704 return 0; 1705 } 1706 1707 set_socket_fds(write_socket_fd); 1708 1709 putbe32(temp_buffer, (uint32_t) serial_number); 1710 putbe32(temp_buffer + 4, REMOTE_CMS_CHECK_IF_READ_REQUEST_TYPE); 1711 putbe32(temp_buffer + 8, (uint32_t) buffer_number); 1712 int send_header_size = 20; 1713 if (total_subdivisions > 1) { 1714 putbe32(temp_buffer + 12, (uint32_t) current_subdivision); 1715 } 1716 if (sendn(socket_fd, temp_buffer, send_header_size, 0, timeout) < 0) { 1717 status = CMS_MISC_ERROR; 1718 reconnect_needed = 1; 1719 reenable_sigpipe(); 1720 return (0); 1721 } 1722 serial_number++; 1723 if (recvn(socket_fd, temp_buffer, 12, 0, timeout, &recvd_bytes) < 0) { 1724 if (recvn_timedout) { 1725 timedout_request = REMOTE_CMS_CHECK_IF_READ_REQUEST_TYPE; 1726 consecutive_timeouts = 1; 1727 status = CMS_TIMED_OUT; 1728 reenable_sigpipe(); 1729 return 0; 1730 } else { 1731 recvd_bytes = 0; 1732 fatal_error_occurred = 1; 1733 status = CMS_MISC_ERROR; 1734 reenable_sigpipe(); 1735 return 0; 1736 } 1737 } 1738 recvd_bytes = 0; 1739 returned_serial_number = (CMS_STATUS) getbe32(temp_buffer); 1740 rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, 1741 "TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n", 1742 socket_fd, returned_serial_number, buffer_number); 1743 if (returned_serial_number != serial_number) { 1744 rcs_print_error 1745 ("TCPMEM: Returned serial number(%ld) does not match expected serial number(%ld).\n", 1746 returned_serial_number, serial_number); 1747 reenable_sigpipe(); 1748 return (status = CMS_MISC_ERROR); 1749 } 1750 status = (CMS_STATUS) ntohl(*((uint32_t *) temp_buffer + 1)); 1751 header.was_read = ntohl(*((uint32_t *) temp_buffer + 2)); 1752 reenable_sigpipe(); 1753 return (header.was_read); 1754 } 1755 1756 int TCPMEM::get_queue_length() 1757 { 1758 if (reconnect_needed && autoreconnect) { 1759 reconnect(); 1760 } 1761 1762 if (reconnect_needed) { 1763 return (status = CMS_MISC_ERROR); 1764 } 1765 1766 if (fatal_error_occurred) { 1767 if (status >= 0) { 1768 status = CMS_MISC_ERROR; 1769 } 1770 return (status); 1771 } 1772 1773 disable_sigpipe(); 1774 1775 if (socket_fd <= 0) { 1776 rcs_print_error 1777 ("TCPMEM::check_if_read: Invalid socket descriptor. (%d)\n", 1778 socket_fd); 1779 reenable_sigpipe(); 1780 return (status = CMS_MISC_ERROR); 1781 } 1782 if (((int) handle_old_replies()) < 0) { 1783 reenable_sigpipe(); 1784 return 0; 1785 } 1786 1787 set_socket_fds(write_socket_fd); 1788 1789 putbe32(temp_buffer, (uint32_t) serial_number); 1790 putbe32(temp_buffer + 4, REMOTE_CMS_GET_QUEUE_LENGTH_REQUEST_TYPE); 1791 putbe32(temp_buffer + 8, (uint32_t) buffer_number); 1792 int send_header_size = 20; 1793 if (total_subdivisions > 1) { 1794 putbe32(temp_buffer + 16, (uint32_t) current_subdivision); 1795 } 1796 if (sendn(socket_fd, temp_buffer, send_header_size, 0, timeout) < 0) { 1797 status = CMS_MISC_ERROR; 1798 reconnect_needed = 1; 1799 reenable_sigpipe(); 1800 return (0); 1801 } 1802 serial_number++; 1803 if (recvn(socket_fd, temp_buffer, 12, 0, timeout, &recvd_bytes) < 0) { 1804 if (recvn_timedout) { 1805 timedout_request = REMOTE_CMS_GET_QUEUE_LENGTH_REQUEST_TYPE; 1806 consecutive_timeouts = 1; 1807 status = CMS_TIMED_OUT; 1808 reenable_sigpipe(); 1809 return 0; 1810 } else { 1811 recvd_bytes = 0; 1812 fatal_error_occurred = 1; 1813 status = CMS_MISC_ERROR; 1814 reenable_sigpipe(); 1815 return 0; 1816 } 1817 } 1818 recvd_bytes = 0; 1819 returned_serial_number = (CMS_STATUS) getbe32(temp_buffer); 1820 rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, 1821 "TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n", 1822 socket_fd, returned_serial_number, buffer_number); 1823 if (returned_serial_number != serial_number) { 1824 rcs_print_error 1825 ("TCPMEM: Returned serial number(%ld) does not match expected serial number(%ld).\n", 1826 returned_serial_number, serial_number); 1827 reenable_sigpipe(); 1828 return (status = CMS_MISC_ERROR); 1829 } 1830 status = (CMS_STATUS) ntohl(*((uint32_t *) temp_buffer + 1)); 1831 queuing_header.queue_length = ntohl(*((uint32_t *) temp_buffer + 2)); 1832 reenable_sigpipe(); 1833 return (queuing_header.queue_length); 1834 } 1835 1836 int TCPMEM::get_msg_count() 1837 { 1838 if (reconnect_needed && autoreconnect) { 1839 reconnect(); 1840 } 1841 1842 if (reconnect_needed) { 1843 return (status = CMS_MISC_ERROR); 1844 } 1845 1846 if (fatal_error_occurred) { 1847 if (status >= 0) { 1848 status = CMS_MISC_ERROR; 1849 } 1850 return (status); 1851 } 1852 1853 disable_sigpipe(); 1854 1855 if (socket_fd <= 0) { 1856 rcs_print_error 1857 ("TCPMEM::check_if_read: Invalid socket descriptor. (%d)\n", 1858 socket_fd); 1859 reenable_sigpipe(); 1860 return (status = CMS_MISC_ERROR); 1861 } 1862 if (((int) handle_old_replies()) < 0) { 1863 reenable_sigpipe(); 1864 return 0; 1865 } 1866 1867 set_socket_fds(write_socket_fd); 1868 1869 putbe32(temp_buffer, (uint32_t) serial_number); 1870 putbe32(temp_buffer + 4, REMOTE_CMS_GET_MSG_COUNT_REQUEST_TYPE); 1871 putbe32(temp_buffer + 8, (uint32_t) buffer_number); 1872 int send_header_size = 20; 1873 if (total_subdivisions > 1) { 1874 putbe32(temp_buffer + 12, (uint32_t) current_subdivision); 1875 } 1876 if (sendn(socket_fd, temp_buffer, send_header_size, 0, timeout) < 0) { 1877 status = CMS_MISC_ERROR; 1878 reconnect_needed = 1; 1879 reenable_sigpipe(); 1880 return (0); 1881 } 1882 serial_number++; 1883 if (recvn(socket_fd, temp_buffer, 12, 0, timeout, &recvd_bytes) < 0) { 1884 if (recvn_timedout) { 1885 timedout_request = REMOTE_CMS_GET_MSG_COUNT_REQUEST_TYPE; 1886 consecutive_timeouts = 1; 1887 status = CMS_TIMED_OUT; 1888 reenable_sigpipe(); 1889 return 0; 1890 } else { 1891 recvd_bytes = 0; 1892 fatal_error_occurred = 1; 1893 status = CMS_MISC_ERROR; 1894 reenable_sigpipe(); 1895 return 0; 1896 } 1897 } 1898 recvd_bytes = 0; 1899 returned_serial_number = (CMS_STATUS) getbe32(temp_buffer); 1900 rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, 1901 "TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n", 1902 socket_fd, returned_serial_number, buffer_number); 1903 if (returned_serial_number != serial_number) { 1904 rcs_print_error 1905 ("TCPMEM: Returned serial number(%ld) does not match expected serial number(%ld).\n", 1906 returned_serial_number, serial_number); 1907 reenable_sigpipe(); 1908 return (status = CMS_MISC_ERROR); 1909 } 1910 status = (CMS_STATUS) ntohl(*((uint32_t *) temp_buffer + 1)); 1911 header.write_id = ntohl(*((uint32_t *) temp_buffer + 2)); 1912 reenable_sigpipe(); 1913 return (header.write_id); 1914 } 1915 1916 int TCPMEM::get_space_available() 1917 { 1918 if (reconnect_needed && autoreconnect) { 1919 reconnect(); 1920 } 1921 1922 if (reconnect_needed) { 1923 return (status = CMS_MISC_ERROR); 1924 } 1925 1926 if (fatal_error_occurred) { 1927 if (status >= 0) { 1928 status = CMS_MISC_ERROR; 1929 } 1930 return (status); 1931 } 1932 1933 disable_sigpipe(); 1934 1935 if (socket_fd <= 0) { 1936 rcs_print_error 1937 ("TCPMEM::check_if_read: Invalid socket descriptor. (%d)\n", 1938 socket_fd); 1939 reenable_sigpipe(); 1940 return (status = CMS_MISC_ERROR); 1941 } 1942 if (((int) handle_old_replies()) < 0) { 1943 reenable_sigpipe(); 1944 return 0; 1945 } 1946 1947 set_socket_fds(write_socket_fd); 1948 1949 putbe32(temp_buffer, (uint32_t) serial_number); 1950 putbe32(temp_buffer + 4, REMOTE_CMS_GET_SPACE_AVAILABLE_REQUEST_TYPE); 1951 putbe32(temp_buffer + 8,buffer_number); 1952 int send_header_size = 20; 1953 if (total_subdivisions > 1) { 1954 putbe32(temp_buffer + 12, current_subdivision); 1955 } 1956 if (sendn(socket_fd, temp_buffer, send_header_size, 0, timeout) < 0) { 1957 status = CMS_MISC_ERROR; 1958 reconnect_needed = 1; 1959 reenable_sigpipe(); 1960 return (0); 1961 } 1962 serial_number++; 1963 if (recvn(socket_fd, temp_buffer, 12, 0, timeout, &recvd_bytes) < 0) { 1964 if (recvn_timedout) { 1965 timedout_request = REMOTE_CMS_GET_SPACE_AVAILABLE_REQUEST_TYPE; 1966 consecutive_timeouts = 1; 1967 status = CMS_TIMED_OUT; 1968 reenable_sigpipe(); 1969 return 0; 1970 } else { 1971 recvd_bytes = 0; 1972 fatal_error_occurred = 1; 1973 status = CMS_MISC_ERROR; 1974 reenable_sigpipe(); 1975 return 0; 1976 } 1977 } 1978 recvd_bytes = 0; 1979 returned_serial_number = (CMS_STATUS) getbe32(temp_buffer); 1980 rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, 1981 "TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n", 1982 socket_fd, returned_serial_number, buffer_number); 1983 if (returned_serial_number != serial_number) { 1984 rcs_print_error 1985 ("TCPMEM: Returned serial number(%ld) does not match expected serial number(%ld).\n", 1986 returned_serial_number, serial_number); 1987 reenable_sigpipe(); 1988 return (status = CMS_MISC_ERROR); 1989 } 1990 status = (CMS_STATUS) ntohl(*((uint32_t *) temp_buffer + 1)); 1991 free_space = ntohl(*((uint32_t *) temp_buffer + 2)); 1992 reenable_sigpipe(); 1993 return (free_space); 1994 } 1995 1996 CMS_STATUS TCPMEM::clear() 1997 { 1998 if (reconnect_needed && autoreconnect) { 1999 reconnect(); 2000 } 2001 2002 if (reconnect_needed) { 2003 return (status = CMS_MISC_ERROR); 2004 } 2005 2006 if (fatal_error_occurred) { 2007 if (status >= 0) { 2008 status = CMS_MISC_ERROR; 2009 } 2010 return (status); 2011 } 2012 if (socket_fd <= 0) { 2013 rcs_print_error("TCPMEM::clear: Invalid socket descriptor. (%d)\n", 2014 socket_fd); 2015 reconnect_needed = 1; 2016 return (status = CMS_MISC_ERROR); 2017 } 2018 if (((int) handle_old_replies()) < 0) { 2019 return status; 2020 } 2021 2022 set_socket_fds(write_socket_fd); 2023 2024 putbe32(temp_buffer, (uint32_t) serial_number); 2025 putbe32(temp_buffer + 4, REMOTE_CMS_CLEAR_REQUEST_TYPE); 2026 putbe32(temp_buffer + 8, buffer_number); 2027 putbe32(temp_buffer + 12, current_subdivision); 2028 2029 if (sendn(socket_fd, temp_buffer, 20, 0, timeout) < 0) { 2030 reconnect_needed = 1; 2031 return (status = CMS_MISC_ERROR); 2032 } 2033 serial_number++; 2034 if (recvn(socket_fd, temp_buffer, 8, 0, timeout, &recvd_bytes) < 0) { 2035 if (recvn_timedout) { 2036 timedout_request = REMOTE_CMS_CLEAR_REQUEST_TYPE; 2037 consecutive_timeouts = 1; 2038 return (status = CMS_TIMED_OUT); 2039 } else { 2040 fatal_error_occurred = 1; 2041 reconnect_needed = 1; 2042 return (status = CMS_MISC_ERROR); 2043 } 2044 } 2045 returned_serial_number = (CMS_STATUS) getbe32(temp_buffer); 2046 rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, 2047 "TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n", 2048 socket_fd, returned_serial_number, buffer_number); 2049 2050 if (returned_serial_number != serial_number) { 2051 rcs_print_error 2052 ("TCPMEM: Returned serial number(%ld) does not match expected serial number(%ld).\n", 2053 returned_serial_number, serial_number); 2054 reconnect_needed = 1; 2055 return (status = CMS_MISC_ERROR); 2056 } 2057 status = (CMS_STATUS) ntohl(*((uint32_t *) temp_buffer + 1)); 2058 header.was_read = ntohl(*((uint32_t *) temp_buffer + 2)); 2059 return (status); 2060 } 2061 /*! \todo Another #if 0 */ 2062 #if 0 2063 int TCPMEM::login(const char *name, const char *passwd) 2064 { 2065 if (fatal_error_occurred) { 2066 if (status >= 0) { 2067 status = CMS_MISC_ERROR; 2068 } 2069 return (status); 2070 } 2071 if (socket_fd <= 0) { 2072 rcs_print_error("TCPMEM::write: Invalid socket descriptor. (%d)\n", 2073 socket_fd); 2074 return (status = CMS_MISC_ERROR); 2075 } 2076 int handle_old_reply_ret = 0; 2077 2078 while (timedout_request != NO_REMOTE_CMS_REQUEST && !handle_old_reply_ret) { 2079 handle_old_reply_ret = handle_old_replies(); 2080 } 2081 if (handle_old_reply_ret < 0) { 2082 return 0; 2083 } 2084 set_socket_fds(write_socket_fd); 2085 *((uint32_t *) temp_buffer) = htonl((uint32_t) serial_number); 2086 *((uint32_t *) temp_buffer + 1) = 2087 htonl((uint32_t) REMOTE_CMS_GET_KEYS_REQUEST_TYPE); 2088 *((uint32_t *) temp_buffer + 2) = htonl((uint32_t) buffer_number); 2089 if (sendn(socket_fd, temp_buffer, 20, 0, 30.0) < 0) { 2090 return 0; 2091 } 2092 memset(temp_buffer, 0, 20); 2093 strncpy(((char *) temp_buffer), name, 16); 2094 if (sendn(socket_fd, temp_buffer, 16, 0, 30.0) < 0) { 2095 return (status = CMS_MISC_ERROR); 2096 } 2097 serial_number++; 2098 if (recvn(socket_fd, temp_buffer, 20, 0, 30.0, &recvd_bytes) < 0) { 2099 return 0; 2100 } 2101 recvd_bytes = 0; 2102 returned_serial_number = (CMS_STATUS) getbe32(temp_buffer); 2103 rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, 2104 "TCPMEM received_reply: fd = %d, serial_number=%d, buffer_number=%d\n", 2105 socket_fd, returned_serial_number, buffer_number); 2106 if (returned_serial_number != serial_number) { 2107 rcs_print_error 2108 ("TCPMEM: Returned serial number(%d) does not match expected serial number(%d).\n", 2109 returned_serial_number, serial_number); 2110 return (0); 2111 } 2112 char *crypt1_ret = crypt(passwd, ((char *) temp_buffer) + 4); 2113 if (NULL == crypt1_ret) { 2114 rcs_print_error("TCPMEM::login() crypt function failed.\n"); 2115 return 0; 2116 } 2117 char passwd_pass1[16]; 2118 strncpy(passwd_pass1, crypt1_ret, 16); 2119 char *crypt2_ret = crypt(passwd_pass1, ((char *) temp_buffer) + 12); 2120 if (NULL == crypt2_ret) { 2121 rcs_print_error("TCPMEM::login() crypt function failed.\n"); 2122 return (0); 2123 } 2124 char passwd_pass2[16]; 2125 strncpy(passwd_pass2, crypt2_ret, 16); 2126 2127 *((uint32_t *) temp_buffer) = htonl((uint32_t) serial_number); 2128 *((uint32_t *) temp_buffer + 1) = 2129 htonl((uint32_t) REMOTE_CMS_LOGIN_REQUEST_TYPE); 2130 *((uint32_t *) temp_buffer + 2) = htonl((uint32_t) buffer_number); 2131 if (sendn(socket_fd, temp_buffer, 20, 0, 30.0) < 0) { 2132 return 0; 2133 } 2134 memset(temp_buffer, 0, 20); 2135 strncpy(((char *) temp_buffer), name, 16); 2136 if (sendn(socket_fd, temp_buffer, 16, 0, 30.0) < 0) { 2137 return (status = CMS_MISC_ERROR); 2138 } 2139 if (sendn(socket_fd, passwd_pass2, 16, 0, 30.0) < 0) { 2140 return (status = CMS_MISC_ERROR); 2141 } 2142 serial_number++; 2143 if (recvn(socket_fd, temp_buffer, 8, 0, 30.0, &recvd_bytes) < 0) { 2144 return 0; 2145 } 2146 recvd_bytes = 0; 2147 returned_serial_number = (CMS_STATUS) getbe32(temp_buffer); 2148 rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS, 2149 "TCPMEM received_reply: fd = %d, serial_number=%d, buffer_number=%d\n", 2150 socket_fd, returned_serial_number, buffer_number); 2151 if (returned_serial_number != serial_number) { 2152 rcs_print_error 2153 ("TCPMEM: Returned serial number(%d) does not match expected serial number(%d).\n", 2154 returned_serial_number, serial_number); 2155 return (status = CMS_MISC_ERROR); 2156 } 2157 int success = ntohl(*((uint32_t *) temp_buffer + 1)); 2158 return (success); 2159 } 2160 #endif 2161 2162 void TCPMEM::set_socket_fds(int new_fd) 2163 { 2164 if (socket_fd == read_socket_fd) { 2165 read_serial_number = serial_number; 2166 } 2167 if (socket_fd == write_socket_fd) { 2168 write_serial_number = serial_number; 2169 } 2170 socket_fd = new_fd; 2171 if (socket_fd == read_socket_fd) { 2172 serial_number = read_serial_number; 2173 } 2174 if (socket_fd == write_socket_fd) { 2175 serial_number = write_serial_number; 2176 } 2177 }