/ src / libnml / buffer / tcpmem.cc
tcpmem.cc
   1  /********************************************************************
   2  * Description: tcpmem.cc
   3  *
   4  *   Derived from a work by Fred Proctor & Will Shackleford
   5  *
   6  * Author:
   7  * License: GPL Version 2
   8  * System: Linux
   9  *    
  10  * Copyright (c) 2004 All rights reserved.
  11  *
  12  * Last change: 
  13  ********************************************************************/
  14  
  15  #if defined(__GNUC__) && ((__GNUC__ > 4) || ((__GNUC__ == 4) && (__GNUC_MINOR__ >= 4)))
  16  #pragma GCC optimize "-fno-strict-aliasing"
  17  #pragma GCC diagnostic ignored "-Wstrict-aliasing"
  18  #endif
  19  
  20  #ifdef __cplusplus
  21  extern "C" {
  22  #endif
  23  #include <stdlib.h>		// strtol()
  24  #include <unistd.h>
  25  #include <string.h>		// strstr()
  26  #include <errno.h>		// errno, strerror()
  27  #include <signal.h>		// signal, SIG_ERR, SIGPIPE
  28  #include <ctype.h>		// isdigit()
  29  #include <arpa/inet.h>		/* inet_ntoa */
  30  #include <sys/socket.h>
  31  #include <sys/time.h>           /* struct timeval */
  32  #include <netinet/in.h>         /* sockaddr_in */
  33  #include <netdb.h>
  34  #include <math.h>		/* fmod() */
  35  
  36  #ifdef __cplusplus
  37  }
  38  #endif
  39  #include "rem_msg.hh"		/* REMOTE_CMS_READ_REQUEST_TYPE, etc. */
  40  #include "rcs_print.hh"		/* rcs_print_error() */
  41  #include "cmsdiag.hh"
  42  #define DEFAULT_MAX_CONSECUTIVE_TIMEOUTS (-1)
  43  #include "timer.hh"		/* esleep() */
  44  #include "tcpmem.hh"
  45  #include "recvn.h"		/* recvn() */
  46  #include "sendn.h"		/* sendn() */
  47  #include "tcp_opts.hh"		/* SET_TCP_NODELAY */
  48  #include "linklist.hh"          /* LinkedList */
  49  
  50  int tcpmem_sigpipe_count = 0;
  51  int last_sig = 0;
  52  
  53  void tcpmem_sigpipe_handler(int sig)
  54  {
  55      last_sig = sig;
  56      tcpmem_sigpipe_count++;
  57  }
  58  
  59  TCPMEM::TCPMEM(const char *_bufline, const char *_procline):CMS(_bufline, _procline)
  60  {
  61      max_consecutive_timeouts = DEFAULT_MAX_CONSECUTIVE_TIMEOUTS;
  62      char *max_consecutive_timeouts_string;
  63      max_consecutive_timeouts_string = strstr(ProcessLine, "max_timeouts=");
  64      polling = (NULL != strstr(proclineupper, "POLL"));
  65      socket_fd = 0;
  66      reconnect_needed = 0;
  67      autoreconnect = 1;
  68      old_handler = (void (*)(int)) SIG_ERR;
  69      sigpipe_count = 0;
  70      subscription_count = 0;
  71      read_serial_number = 0;
  72      write_serial_number = 0;
  73      read_socket_fd = 0;
  74      write_socket_fd = 0;
  75      if (NULL != max_consecutive_timeouts_string) {
  76  	max_consecutive_timeouts_string += strlen("max_timeouts=");
  77  	if (!strncmp(max_consecutive_timeouts_string, "INF", 3)) {
  78  	    max_consecutive_timeouts = -1;
  79  	} else {
  80  	    max_consecutive_timeouts =
  81  		strtol(max_consecutive_timeouts_string, (char **) NULL, 0);
  82  	}
  83      }
  84  
  85      char *sub_info_string = NULL;
  86      poll_interval_millis = 30000;
  87      subscription_type = CMS_NO_SUBSCRIPTION;
  88      sub_info_string = strstr(ProcessLine, "sub=");
  89      if (NULL != sub_info_string) {
  90  	if (!strncmp(sub_info_string + 4, "none", 4)) {
  91  	    subscription_type = CMS_NO_SUBSCRIPTION;
  92  	} else if (!strncmp(sub_info_string + 4, "var", 3)) {
  93  	    subscription_type = CMS_VARIABLE_SUBSCRIPTION;
  94  	} else {
  95  	    poll_interval_millis =
  96  		((int) (atof(sub_info_string + 4) * 1000.0));
  97  	    subscription_type = CMS_POLLED_SUBSCRIPTION;
  98  	}
  99      }
 100      if (NULL != strstr(ProcessLine, "noreconnect")) {
 101  	autoreconnect = 0;
 102      }
 103      server_host_entry = NULL;
 104  
 105      /* Set up the socket address stucture. */
 106      memset(&server_socket_address, 0, sizeof(server_socket_address));
 107      server_socket_address.sin_family = AF_INET;
 108      server_socket_address.sin_port = htons(((u_short) tcp_port_number));
 109      int hostname_was_address = 0;
 110      char bufferhost_first_char = BufferHost[0];
 111      if (bufferhost_first_char >= '0' && bufferhost_first_char <= '9') {
 112  	server_socket_address.sin_addr.s_addr = inet_addr(BufferHost);
 113  	if (server_socket_address.sin_addr.s_addr != INADDR_NONE) {
 114  	    hostname_was_address = 1;
 115  	}
 116      }
 117  
 118      if (!hostname_was_address) {
 119  	/* Get the IP address of the server using it's BufferHost. */
 120  	server_host_entry = gethostbyname(BufferHost);
 121  	if (NULL == server_host_entry) {
 122  	    status = CMS_CONFIG_ERROR;
 123  	    autoreconnect = 0;
 124  	    rcs_print_error("TCPMEM: Couldn't get host address for (%s).\n",
 125  		BufferHost);
 126  	    return;
 127  	}
 128  	server_socket_address.sin_addr.s_addr =
 129  	    *((int *) server_host_entry->h_addr_list[0]);
 130  	server_socket_address.sin_family = server_host_entry->h_addrtype;
 131      }
 132      rcs_print_debug(PRINT_CMS_CONFIG_INFO,
 133  	"Using server on %s with IP address %s and port %d.\n",
 134  	BufferHost,
 135  	inet_ntoa(server_socket_address.sin_addr), tcp_port_number);
 136  
 137      reconnect();
 138  
 139      if (status >= 0 &&
 140  	(min_compatible_version > 2.58 || min_compatible_version < 1e-6)) {
 141  	verify_bufname();
 142  	if (status < 0) {
 143  	    rcs_print_error("TCPMEM: verify_bufname() failed.\n");
 144  	}
 145      }
 146  
 147      if (status >= 0 && enable_diagnostics &&
 148  	(min_compatible_version > 3.71 || min_compatible_version < 1e-6)) {
 149  	send_diag_info();
 150      }
 151  }
 152  
 153  static void put32(char *addr, uint32_t val) {
 154      memcpy(addr, &val, sizeof(val));
 155  }
 156  
 157  static void putbe32(char *addr, uint32_t val) {
 158      val = htonl(val);
 159      memcpy(addr, &val, sizeof(val));
 160  }
 161  
 162  static uint32_t getbe32(char *addr) {
 163      uint32_t val;
 164      memcpy(&val, addr, sizeof(val));
 165      return ntohl(val);
 166  }
 167  
 168  void
 169    TCPMEM::send_diag_info()
 170  {
 171      if (polling) {
 172  	return;
 173      }
 174      if (NULL == dpi) {
 175  	return;
 176      }
 177      disable_sigpipe();
 178  
 179      set_socket_fds(read_socket_fd);
 180      memset(diag_info_buf, 0, 88);
 181      putbe32(diag_info_buf, (uint32_t)serial_number);
 182      putbe32(diag_info_buf + 4, REMOTE_CMS_SET_DIAG_INFO_REQUEST_TYPE);
 183      putbe32(diag_info_buf + 8, buffer_number);
 184      strncpy(diag_info_buf + 20, dpi->name, 16);
 185      strncpy(diag_info_buf + 36, dpi->host_sysinfo, 32);
 186      putbe32(diag_info_buf + 68, (uint32_t) dpi->pid);
 187      putbe32(diag_info_buf + 72, (uint32_t) connection_number);
 188      memcpy(diag_info_buf + 76, &(dpi->rcslib_ver), 8);
 189      put32(diag_info_buf + 84, (uint32_t) 0x11223344);
 190      if (sendn(socket_fd, diag_info_buf, 88, 0, timeout) < 0) {
 191  	reconnect_needed = 1;
 192  	fatal_error_occurred = 1;
 193  	reenable_sigpipe();
 194  	status = CMS_MISC_ERROR;
 195  	return;
 196      }
 197      serial_number++;
 198      rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,
 199  	"TCPMEM sending request: fd = %d, serial_number=%ld, request_type=%d, buffer_number=%ld\n",
 200  	socket_fd, serial_number,
 201  	ntohl(*((uint32_t *) diag_info_buf + 1)), buffer_number);
 202      reenable_sigpipe();
 203  
 204  }
 205  
 206  void TCPMEM::verify_bufname()
 207  {
 208      if (polling) {
 209  	return;
 210      }
 211      disable_sigpipe();
 212  
 213      set_socket_fds(read_socket_fd);
 214  
 215      putbe32(temp_buffer, (uint32_t) serial_number);
 216      putbe32(temp_buffer + 4, REMOTE_CMS_GET_BUF_NAME_REQUEST_TYPE);
 217      putbe32(temp_buffer + 8, buffer_number);
 218      if (sendn(socket_fd, temp_buffer, 20, 0, timeout) < 0) {
 219  	reconnect_needed = 1;
 220  	fatal_error_occurred = 1;
 221  	reenable_sigpipe();
 222  	status = CMS_MISC_ERROR;
 223  	return;
 224      }
 225      serial_number++;
 226      rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,
 227  	"TCPMEM sending request: fd = %d, serial_number=%ld, request_type=%d, buffer_number=%ld\n",
 228  	socket_fd, serial_number,
 229  	ntohl(*((uint32_t *) temp_buffer + 1)), buffer_number);
 230      if (recvn(socket_fd, temp_buffer, 40, 0, timeout, &recvd_bytes) < 0) {
 231  	if (recvn_timedout) {
 232  	    bytes_to_throw_away = 40;
 233  	    return;
 234  	}
 235      }
 236      returned_serial_number = getbe32(temp_buffer);
 237      rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,
 238  	"TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n",
 239  	socket_fd, returned_serial_number, buffer_number);
 240      if (returned_serial_number != serial_number) {
 241  	rcs_print_error
 242  	    ("TCPMEM: Returned serial number(%ld) does not match expected serial number(%ld).\n",
 243  	    returned_serial_number, serial_number);
 244  	reconnect_needed = 1;
 245  	fatal_error_occurred = 1;
 246  	reenable_sigpipe();
 247  	status = CMS_MISC_ERROR;
 248  	return;
 249      }
 250      status = (CMS_STATUS) ntohl(*((uint32_t *) temp_buffer + 1));
 251      if (status < 0) {
 252  	return;
 253      }
 254      if (strncmp(temp_buffer + 8, BufferName, 31)) {
 255  	rcs_print_error
 256  	    ("TCPMEM: The buffer (%s) is registered on TCP port %d of host %s with buffer number %ld.\n",
 257  	    ((char *) temp_buffer + 8), tcp_port_number, BufferHost,
 258  	    buffer_number);
 259  	rcs_print_error
 260  	    ("TCPMEM: However, this process (%s) is attempting to connect to the buffer %s at the same location.\n",
 261  	    ProcessName, BufferName);
 262  	status = CMS_RESOURCE_CONFLICT_ERROR;
 263  	return;
 264      }
 265      reenable_sigpipe();
 266  }
 267  
 268  CMS_DIAGNOSTICS_INFO *TCPMEM::get_diagnostics_info()
 269  {
 270      if (polling) {
 271  	return (NULL);
 272      }
 273      disable_sigpipe();
 274  
 275      if (((int) handle_old_replies()) < 0) {
 276  	reenable_sigpipe();
 277  	return (NULL);
 278      }
 279  
 280      set_socket_fds(read_socket_fd);
 281  
 282      putbe32(temp_buffer, serial_number);
 283      putbe32(temp_buffer + 4, REMOTE_CMS_GET_DIAG_INFO_REQUEST_TYPE);
 284      putbe32(temp_buffer + 8, (uint32_t) buffer_number);
 285      if (sendn(socket_fd, temp_buffer, 20, 0, timeout) < 0) {
 286  	reconnect_needed = 1;
 287  	fatal_error_occurred = 1;
 288  	reenable_sigpipe();
 289  	status = CMS_MISC_ERROR;
 290  	return (NULL);
 291      }
 292      memset(temp_buffer, 0, 0x2000);
 293      serial_number++;
 294      rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,
 295  	"TCPMEM sending request: fd = %d, serial_number=%ld, request_type=%d, buffer_number=%ld\n",
 296  	socket_fd, serial_number,
 297  	ntohl(*((uint32_t *) temp_buffer + 1)), buffer_number);
 298      if (recvn(socket_fd, temp_buffer, 32, 0, -1.0, &recvd_bytes) < 0) {
 299  	if (recvn_timedout) {
 300  	    bytes_to_throw_away = 32;
 301  	}
 302  	return (NULL);
 303      }
 304      recvd_bytes = 0;
 305      returned_serial_number = getbe32(temp_buffer);
 306      rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,
 307  	"TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n",
 308  	socket_fd, returned_serial_number, buffer_number);
 309      if (returned_serial_number != serial_number) {
 310  	rcs_print_error
 311  	    ("TCPMEM: Returned serial number(%ld) does not match expected serial number(%ld).\n",
 312  	    returned_serial_number, serial_number);
 313  	reconnect_needed = 1;
 314  	fatal_error_occurred = 1;
 315  	reenable_sigpipe();
 316  	status = CMS_MISC_ERROR;
 317  	return (NULL);
 318      }
 319      status = (CMS_STATUS) ntohl(*((uint32_t *) temp_buffer + 1));
 320      if (status < 0) {
 321  	return (NULL);
 322      }
 323      if (NULL == di) {
 324  	di = new CMS_DIAGNOSTICS_INFO();
 325  	di->dpis = new LinkedList();
 326      } else {
 327  	di->dpis->delete_members();
 328      }
 329      di->last_writer_dpi = NULL;
 330      di->last_reader_dpi = NULL;
 331      di->last_writer = ntohl(*((uint32_t *) temp_buffer + 2));
 332      di->last_reader = ntohl(*((uint32_t *) temp_buffer + 3));
 333      double server_time;
 334      memcpy(&server_time, temp_buffer + 16, 8);
 335      double local_time = etime();
 336      double diff_time = local_time - server_time;
 337      int dpi_count = ntohl(*((uint32_t *) temp_buffer + 6));
 338      int dpi_max_size = ntohl(*((uint32_t *) temp_buffer + 7));
 339      if (dpi_max_size > 32 && dpi_max_size < 0x2000) {
 340  	if (recvn
 341  	    (socket_fd, temp_buffer + 32, dpi_max_size - 32, 0, -1.0,
 342  		&recvd_bytes) < 0) {
 343  	    if (recvn_timedout) {
 344  		bytes_to_throw_away = dpi_max_size - 32;
 345  		return (NULL);
 346  	    }
 347  	}
 348  	recvd_bytes = 0;
 349  	int dpi_offset = 32;
 350  	CMS_DIAG_PROC_INFO cms_dpi;
 351  	for (int i = 0; i < dpi_count && dpi_offset < dpi_max_size; i++) {
 352  	    memset(&cms_dpi, 0, sizeof(CMS_DIAG_PROC_INFO));
 353  	    memcpy(cms_dpi.name, temp_buffer + dpi_offset, 16);
 354  	    dpi_offset += 16;
 355  	    memcpy(cms_dpi.host_sysinfo, temp_buffer + dpi_offset, 32);
 356  	    dpi_offset += 32;
 357  	    cms_dpi.pid =
 358  		ntohl(*((uint32_t *) ((char *) temp_buffer + dpi_offset)));
 359  	    dpi_offset += 4;
 360  	    memcpy(&(cms_dpi.rcslib_ver), temp_buffer + dpi_offset, 8);
 361  	    dpi_offset += 8;
 362  	    cms_dpi.access_type = (CMS_INTERNAL_ACCESS_TYPE)
 363  		ntohl(*((uint32_t *) ((char *) temp_buffer + dpi_offset)));
 364  	    dpi_offset += 4;
 365  	    cms_dpi.msg_id =
 366  		ntohl(*((uint32_t *) ((char *) temp_buffer + dpi_offset)));
 367  	    dpi_offset += 4;
 368  	    cms_dpi.msg_size =
 369  		ntohl(*((uint32_t *) ((char *) temp_buffer + dpi_offset)));
 370  	    dpi_offset += 4;
 371  	    cms_dpi.msg_type =
 372  		ntohl(*((uint32_t *) ((char *) temp_buffer + dpi_offset)));
 373  	    dpi_offset += 4;
 374  	    cms_dpi.number_of_accesses =
 375  		ntohl(*((uint32_t *) ((char *) temp_buffer + dpi_offset)));
 376  	    dpi_offset += 4;
 377  	    cms_dpi.number_of_new_messages =
 378  		ntohl(*((uint32_t *) ((char *) temp_buffer + dpi_offset)));
 379  	    dpi_offset += 4;
 380  	    memcpy(&(cms_dpi.bytes_moved), temp_buffer + dpi_offset, 8);
 381  	    dpi_offset += 8;
 382  	    memcpy(&(cms_dpi.bytes_moved_across_socket),
 383  		temp_buffer + dpi_offset, 8);
 384  	    dpi_offset += 8;
 385  	    memcpy(&(cms_dpi.last_access_time), temp_buffer + dpi_offset, 8);
 386  	    if (cmsdiag_timebias_set) {
 387  		cms_dpi.last_access_time += diff_time - cmsdiag_timebias;
 388  	    }
 389  	    dpi_offset += 8;
 390  	    memcpy(&(cms_dpi.first_access_time), temp_buffer + dpi_offset, 8);
 391  	    if (cmsdiag_timebias_set) {
 392  		cms_dpi.first_access_time += diff_time - cmsdiag_timebias;
 393  	    }
 394  	    dpi_offset += 8;
 395  	    memcpy(&(cms_dpi.min_difference), temp_buffer + dpi_offset, 8);
 396  	    dpi_offset += 8;
 397  	    memcpy(&(cms_dpi.max_difference), temp_buffer + dpi_offset, 8);
 398  	    dpi_offset += 8;
 399  	    di->dpis->store_at_tail(&cms_dpi, sizeof(CMS_DIAG_PROC_INFO), 1);
 400  	    int is_last_writer =
 401  		ntohl(*((uint32_t *) ((char *) temp_buffer + dpi_offset)));
 402  	    dpi_offset += 4;
 403  	    if (is_last_writer) {
 404  		di->last_writer_dpi =
 405  		    (CMS_DIAG_PROC_INFO *) di->dpis->get_tail();
 406  	    }
 407  	    int is_last_reader =
 408  		ntohl(*((uint32_t *) ((char *) temp_buffer + dpi_offset)));
 409  	    dpi_offset += 4;
 410  	    if (is_last_reader) {
 411  		di->last_reader_dpi =
 412  		    (CMS_DIAG_PROC_INFO *) di->dpis->get_tail();
 413  	    }
 414  	}
 415      }
 416      reenable_sigpipe();
 417      return di;
 418  }
 419  
 420  void TCPMEM::reconnect()
 421  {
 422      if (socket_fd > 0) {
 423  	disconnect();
 424      }
 425      subscription_count = 0;
 426      timedout_request = NO_REMOTE_CMS_REQUEST;
 427      bytes_to_throw_away = 0;
 428      recvd_bytes = 0;
 429      socket_fd = 0;
 430      waiting_for_message = 0;
 431      waiting_message_size = 0;
 432      waiting_message_id = 0;
 433      serial_number = 0;
 434  
 435      rcs_print_debug(PRINT_CMS_CONFIG_INFO, "Creating socket . . .\n");
 436  
 437      socket_fd = socket(AF_INET, SOCK_STREAM, 0);
 438      if (socket_fd < 0) {
 439  	rcs_print_error("TCPMEM: Error from socket() (errno = %d:%s)\n",
 440  	    errno, strerror(errno));
 441  
 442  	status = CMS_CREATE_ERROR;
 443  	return;
 444      }
 445      rcs_print_debug(PRINT_CMS_CONFIG_INFO, "Setting socket options . . . \n");
 446      if (set_tcp_socket_options(socket_fd) < 0) {
 447  	return;
 448      }
 449      struct timeval tm;
 450      int socket_ret;
 451      double start_time, current_time;
 452      fd_set fds;
 453      sockaddr_in cli_addr;
 454      cli_addr.sin_family = AF_INET;
 455      cli_addr.sin_addr.s_addr = htonl(INADDR_ANY);
 456      cli_addr.sin_port = htons(0);
 457      rcs_print_debug(PRINT_CMS_CONFIG_INFO, "Binding . . . \n");
 458      if (bind(socket_fd, (struct sockaddr *) &cli_addr, sizeof(cli_addr)) < 0) {
 459  	rcs_print_error("TCPMEM: bind error %d = %s\n", errno,
 460  	    strerror(errno));
 461  	status = CMS_CREATE_ERROR;
 462      }
 463      rcs_print_debug(PRINT_CMS_CONFIG_INFO, "Connecting . . .\n");
 464      if (connect(socket_fd, (struct sockaddr *) &server_socket_address,
 465  	    sizeof(server_socket_address)) < 0) {
 466  	if (EINPROGRESS == errno) {
 467  
 468  	    tm.tv_sec = (long) timeout;
 469  	    tm.tv_sec = (long) (fmod(timeout, 1.0) * 1e6);
 470  	    FD_ZERO(&fds);
 471  	    FD_SET(socket_fd, &fds);
 472  	    start_time = etime();
 473  	    while (!(socket_ret = select(socket_fd + 1,
 474  			(fd_set *) NULL, &fds, (fd_set *) NULL, &tm))) {
 475  		FD_SET(socket_fd, &fds);
 476  		esleep(0.001);
 477  		current_time = etime();
 478  		double timeleft = start_time + timeout - current_time;
 479  		if (timeleft <= 0.0 && timeout >= 0.0) {
 480  		    if (!reconnect_needed) {
 481  			rcs_print_error
 482  			    ("TCPMEM: Timed out waiting for connection.\n");
 483  		    }
 484  		    status = CMS_NO_SERVER_ERROR;
 485  		    return;
 486  		}
 487  		tm.tv_sec = (long) timeleft;
 488  		tm.tv_sec = (long) (fmod(timeleft, 1.0) * 1e6);
 489  	    }
 490  
 491  	    if (-1 == socket_ret) {
 492  		rcs_print_error("select error: %d -- %s\n", errno,
 493  		    strerror(errno));
 494  		rcs_print_error("TCPMEM: Couldn't connect.\n");
 495  		status = CMS_NO_SERVER_ERROR;
 496  		return;
 497  	    }
 498  	} else {
 499  	    rcs_print_error("connect error: %d -- %s\n", errno,
 500  		strerror(errno));
 501  	    rcs_print_error
 502  		("TCPMEM: Error trying to connect to TCP port %d of host %s(%s). sin_family=%d\n",
 503  		ntohs(server_socket_address.sin_port), BufferHost,
 504  		inet_ntoa(server_socket_address.sin_addr),
 505  		server_socket_address.sin_family);
 506  	    status = CMS_NO_SERVER_ERROR;
 507  	    return;
 508  	}
 509      }
 510      read_socket_fd = socket_fd;
 511  
 512      memset(temp_buffer, 0, 32);
 513      if (total_subdivisions > 1) {
 514  	subscription_type = CMS_NO_SUBSCRIPTION;
 515      }
 516  
 517      if (subscription_type != CMS_NO_SUBSCRIPTION) {
 518  	verify_bufname();
 519  	if (status < 0) {
 520  	    rcs_print_error("TCPMEM: verify_bufname() failed\n");
 521  	    return;
 522  	}
 523  	putbe32(temp_buffer, (uint32_t) serial_number);
 524  	putbe32(temp_buffer + 4, REMOTE_CMS_SET_SUBSCRIPTION_REQUEST_TYPE);
 525  	putbe32(temp_buffer + 8, (uint32_t) buffer_number);
 526  	putbe32(temp_buffer + 12, (uint32_t) subscription_type);
 527  	putbe32(temp_buffer + 16, (uint32_t) poll_interval_millis);
 528  	if (sendn(socket_fd, temp_buffer, 20, 0, 30) < 0) {
 529  	    rcs_print_error("Can`t setup subscription.\n");
 530  	    subscription_type = CMS_NO_SUBSCRIPTION;
 531  	} else {
 532  	    serial_number++;
 533  	    rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,
 534  		"TCPMEM sending request: fd = %d, serial_number=%ld, request_type=%d, buffer_number=%ld\n",
 535  		socket_fd, serial_number,
 536  		ntohl(*((uint32_t *) temp_buffer + 1)), buffer_number);
 537  	    memset(temp_buffer, 0, 20);
 538  	    recvd_bytes = 0;
 539  	    if (recvn(socket_fd, temp_buffer, 8, 0, 30, &recvd_bytes) < 0) {
 540  		rcs_print_error("Can`t setup subscription.\n");
 541  		subscription_type = CMS_NO_SUBSCRIPTION;
 542  	    }
 543  	    if (!getbe32(temp_buffer+4)) {
 544  		rcs_print_error("Can`t setup subscription.\n");
 545  		subscription_type = CMS_NO_SUBSCRIPTION;
 546  	    }
 547  
 548  	    bytes_to_throw_away = 8 - recvd_bytes;
 549  	    if (bytes_to_throw_away < 0 || bytes_to_throw_away > 8) {
 550  		bytes_to_throw_away = 0;
 551  	    }
 552  	    recvd_bytes = 0;
 553  	}
 554  	memset(temp_buffer, 0, 20);
 555      }
 556      if (subscription_type != CMS_NO_SUBSCRIPTION) {
 557  	polling = 1;
 558      }
 559  
 560      if (polling) {
 561  	make_tcp_socket_nonblocking(socket_fd);
 562  	write_socket_fd = socket(AF_INET, SOCK_STREAM, 0);
 563  	if (write_socket_fd < 0) {
 564  	    rcs_print_error("TCPMEM: Error from socket() (errno = %d:%s)\n",
 565  		errno, strerror(errno));
 566  	    status = CMS_CREATE_ERROR;
 567  	    return;
 568  	}
 569  	rcs_print_debug(PRINT_CMS_CONFIG_INFO,
 570  	    "Setting socket options . . . \n");
 571  	if (set_tcp_socket_options(write_socket_fd) < 0) {
 572  	    return;
 573  	}
 574  	rcs_print_debug(PRINT_CMS_CONFIG_INFO, "Binding . . . \n");
 575  	if (bind
 576  	    (write_socket_fd, (struct sockaddr *) &cli_addr,
 577  		sizeof(cli_addr)) < 0) {
 578  	    rcs_print_error("TCPMEM: bind error %d = %s\n", errno,
 579  		strerror(errno));
 580  	    status = CMS_CREATE_ERROR;
 581  	}
 582  	rcs_print_debug(PRINT_CMS_CONFIG_INFO, "Connecting . . .\n");
 583  	if (connect
 584  	    (write_socket_fd, (struct sockaddr *) &server_socket_address,
 585  		sizeof(server_socket_address)) < 0) {
 586  	    if (EINPROGRESS == errno) {
 587  		FD_ZERO(&fds);
 588  		FD_SET(write_socket_fd, &fds);
 589  		start_time = etime();
 590  		tm.tv_sec = (long) timeout;
 591  		tm.tv_sec = (long) (fmod(timeout, 1.0) * 1e6);
 592  		while (!(socket_ret = select(write_socket_fd + 1,
 593  			    (fd_set *) NULL, &fds, (fd_set *) NULL, &tm))) {
 594  		    FD_SET(write_socket_fd, &fds);
 595  		    esleep(0.001);
 596  		    current_time = etime();
 597  		    double timeleft = start_time + timeout - current_time;
 598  		    if (timeleft <= 0.0 && timeout >= 0.0) {
 599  			rcs_print_error
 600  			    ("TCPMEM: Timed out waiting for connection.\n");
 601  			status = CMS_NO_SERVER_ERROR;
 602  			return;
 603  		    }
 604  		    tm.tv_sec = (long) timeleft;
 605  		    tm.tv_sec = (long) (fmod(timeleft, 1.0) * 1e6);
 606  		}
 607  		if (-1 == socket_ret) {
 608  		    rcs_print_error("select error: %d -- %s\n", errno,
 609  			strerror(errno));
 610  		    rcs_print_error("TCPMEM: Couldn't connect.\n");
 611  		    status = CMS_NO_SERVER_ERROR;
 612  		    return;
 613  		}
 614  	    } else {
 615  		rcs_print_error("connect error: %d -- %s\n", errno,
 616  		    strerror(errno));
 617  		rcs_print_error
 618  		    ("TCPMEM: Error trying to connect to TCP port %d of host %s.\n",
 619  		    ntohs(server_socket_address.sin_port), BufferHost);
 620  	    }
 621  	}
 622  	timeout = 0;
 623      } else {
 624  	write_socket_fd = read_socket_fd;
 625      }
 626      reconnect_needed = 0;
 627      fatal_error_occurred = 0;
 628  
 629  }
 630  
 631  TCPMEM::~TCPMEM()
 632  {
 633      disconnect();
 634  }
 635  
 636  void TCPMEM::disconnect()
 637  {
 638      if (write_socket_fd > 0 && write_socket_fd != socket_fd) {
 639  	if (status != CMS_CONFIG_ERROR && status != CMS_CREATE_ERROR) {
 640  	    if (delete_totally) {
 641  		putbe32(temp_buffer, (uint32_t) serial_number);
 642  		putbe32(temp_buffer + 4, REMOTE_CMS_CLEAN_REQUEST_TYPE);
 643  		putbe32(temp_buffer + 8, (uint32_t) buffer_number);
 644  		sendn(write_socket_fd, temp_buffer, 20, 0, -1);
 645  	    }
 646  	}
 647  	close(write_socket_fd);
 648  	write_socket_fd = 0;
 649      }
 650  
 651      if (socket_fd > 0) {
 652  	if (status != CMS_CONFIG_ERROR && status != CMS_CREATE_ERROR) {
 653  	    if (delete_totally) {
 654  		putbe32(temp_buffer, (uint32_t) serial_number);
 655  		putbe32(temp_buffer + 4, REMOTE_CMS_CLEAN_REQUEST_TYPE);
 656  		putbe32(temp_buffer + 8, (uint32_t) buffer_number);
 657  		sendn(socket_fd, temp_buffer, 20, 0, -1);
 658  	    }
 659  	}
 660  	close(socket_fd);
 661  	socket_fd = 0;
 662      }
 663  }
 664  
 665  CMS_STATUS TCPMEM::handle_old_replies()
 666  {
 667      long message_size;
 668  
 669      timedout_request_writeid = 0;
 670      status = CMS_STATUS_NOT_SET;
 671      switch (timedout_request) {
 672      case REMOTE_CMS_READ_REQUEST_TYPE:
 673  	if (!waiting_for_message) {
 674  	    if (recvn(socket_fd, temp_buffer, 20, 0, timeout, &recvd_bytes) <
 675  		0) {
 676  		if (recvn_timedout) {
 677  		    if (polling) {
 678  			return status;
 679  		    } else {
 680  			consecutive_timeouts++;
 681  			if (consecutive_timeouts > max_consecutive_timeouts &&
 682  			    max_consecutive_timeouts > 0) {
 683  			    rcs_print_error
 684  				("CMS: %d consecutive timeouts have occurred. -- Stop trying.\n",
 685  				consecutive_timeouts);
 686  			    fatal_error_occurred = 1;
 687  			    reconnect_needed = 1;
 688  			}
 689  			return (status = CMS_TIMED_OUT);
 690  		    }
 691  		} else {
 692  		    recvd_bytes = 0;
 693  		    fatal_error_occurred = 1;
 694  		    return (status = CMS_MISC_ERROR);
 695  		}
 696  	    }
 697  	    recvd_bytes = 0;
 698  	    returned_serial_number =
 699  		(CMS_STATUS) getbe32(temp_buffer);
 700  	    rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,
 701  		"TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n",
 702  		socket_fd, returned_serial_number, buffer_number);
 703  	    if (returned_serial_number != serial_number) {
 704  		rcs_print_error
 705  		    ("TCPMEM: Returned serial number(%ld) does not match expected serial number(%ld).\n",
 706  		    returned_serial_number, serial_number);
 707  		if (subscription_type == CMS_NO_SUBSCRIPTION) {
 708  		    fatal_error_occurred = 1;
 709  		    reconnect_needed = 1;
 710  		    return (status = CMS_MISC_ERROR);
 711  		} else {
 712  		    serial_number = returned_serial_number;
 713  		}
 714  	    }
 715  	    message_size = ntohl(*((uint32_t *) temp_buffer + 2));
 716  	    timedout_request_status =
 717  		(CMS_STATUS) ntohl(*((uint32_t *) temp_buffer + 1));
 718  	    timedout_request_writeid = ntohl(*((uint32_t *) temp_buffer + 3));
 719  	    header.was_read = ntohl(*((uint32_t *) temp_buffer + 4));
 720  	    if (message_size > max_encoded_message_size) {
 721  		rcs_print_error("Received message is too big. (%ld > %ld)\n",
 722  		    message_size, max_encoded_message_size);
 723  		fatal_error_occurred = 1;
 724  		reconnect_needed = 1;
 725  		return (status = CMS_INSUFFICIENT_SPACE_ERROR);
 726  	    }
 727  	} else {
 728  	    message_size = waiting_message_size;
 729  	}
 730  	if (message_size > 0) {
 731  	    if (recvn
 732  		(socket_fd, encoded_data, message_size, 0, timeout,
 733  		    &recvd_bytes) < 0) {
 734  		if (recvn_timedout) {
 735  		    if (!waiting_for_message) {
 736  			waiting_message_id = timedout_request_writeid;
 737  			waiting_message_size = message_size;
 738  		    }
 739  		    waiting_for_message = 1;
 740  		    timedout_request_writeid = 0;
 741  		    if (polling) {
 742  			return status;
 743  		    } else {
 744  			consecutive_timeouts++;
 745  			if (consecutive_timeouts > max_consecutive_timeouts &&
 746  			    max_consecutive_timeouts > 0) {
 747  			    rcs_print_error
 748  				("CMS: %d consecutive timeouts have occurred. -- Stop trying.\n",
 749  				consecutive_timeouts);
 750  			    fatal_error_occurred = 1;
 751  			    reconnect_needed = 1;
 752  			}
 753  			return (status = CMS_TIMED_OUT);
 754  		    }
 755  		} else {
 756  		    recvd_bytes = 0;
 757  		    fatal_error_occurred = 1;
 758  		    reconnect_needed = 1;
 759  		    return (status = CMS_MISC_ERROR);
 760  		}
 761  	    }
 762  	    recvd_bytes = 0;
 763  	    if (waiting_for_message) {
 764  		timedout_request_writeid = waiting_message_id;
 765  	    }
 766  	}
 767  	break;
 768  
 769      case REMOTE_CMS_WRITE_REQUEST_TYPE:
 770      case REMOTE_CMS_CHECK_IF_READ_REQUEST_TYPE:
 771      case REMOTE_CMS_GET_MSG_COUNT_REQUEST_TYPE:
 772      case REMOTE_CMS_GET_QUEUE_LENGTH_REQUEST_TYPE:
 773      case REMOTE_CMS_GET_SPACE_AVAILABLE_REQUEST_TYPE:
 774  	if (timedout_request == REMOTE_CMS_WRITE_REQUEST_TYPE &&
 775  	    (min_compatible_version > 2.58 || min_compatible_version < 1e-6 ||
 776  		confirm_write)) {
 777  	    break;
 778  	}
 779  	if (recvn(socket_fd, temp_buffer, 12, 0, timeout, &recvd_bytes) < 0) {
 780  	    if (recvn_timedout) {
 781  		consecutive_timeouts++;
 782  		if (consecutive_timeouts > max_consecutive_timeouts &&
 783  		    max_consecutive_timeouts > 0) {
 784  		    rcs_print_error
 785  			("CMS: %d consecutive timeouts have occurred. -- Stop trying.\n",
 786  			consecutive_timeouts);
 787  		    reconnect_needed = 1;
 788  		    fatal_error_occurred = 1;
 789  		}
 790  		reconnect_needed = 1;
 791  		return (status = CMS_TIMED_OUT);
 792  	    } else {
 793  		fatal_error_occurred = 1;
 794  		reconnect_needed = 1;
 795  		return (status = CMS_MISC_ERROR);
 796  	    }
 797  	}
 798  	recvd_bytes = 0;
 799  	returned_serial_number =
 800  	    (CMS_STATUS) getbe32(temp_buffer);
 801  	rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,
 802  	    "TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n",
 803  	    socket_fd, returned_serial_number, buffer_number);
 804  	if (returned_serial_number != serial_number) {
 805  	    rcs_print_error
 806  		("TCPMEM: Returned serial number(%ld) does not match expected serial number(%ld).\n",
 807  		returned_serial_number, serial_number);
 808  	    reconnect_needed = 1;
 809  	    if (subscription_type == CMS_NO_SUBSCRIPTION) {
 810  		return (status = CMS_MISC_ERROR);
 811  	    }
 812  	}
 813  	break;
 814  
 815      case REMOTE_CMS_CLEAR_REQUEST_TYPE:
 816  	if (recvn(socket_fd, temp_buffer, 4, 0, timeout, &recvd_bytes) < 0) {
 817  	    if (recvn_timedout) {
 818  		consecutive_timeouts++;
 819  		reconnect_needed = 1;
 820  		if (consecutive_timeouts > max_consecutive_timeouts &&
 821  		    max_consecutive_timeouts > 0) {
 822  		    rcs_print_error
 823  			("CMS: %d consecutive timeouts have occurred. -- Stop trying.\n",
 824  			consecutive_timeouts);
 825  		    fatal_error_occurred = 1;
 826  		}
 827  		return (status = CMS_TIMED_OUT);
 828  	    } else {
 829  		reconnect_needed = 1;
 830  		fatal_error_occurred = 1;
 831  		return (status = CMS_MISC_ERROR);
 832  	    }
 833  	}
 834  	recvd_bytes = 0;
 835  	returned_serial_number =
 836  	    (CMS_STATUS) getbe32(temp_buffer);
 837  	rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,
 838  	    "TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n",
 839  	    socket_fd, returned_serial_number, buffer_number);
 840  	if (returned_serial_number != serial_number) {
 841  	    rcs_print_error
 842  		("TCPMEM: Returned serial number(%ld) does not match expected serial number(%ld).\n",
 843  		returned_serial_number, serial_number);
 844  	    reconnect_needed = 1;
 845  	    if (subscription_type == CMS_NO_SUBSCRIPTION) {
 846  		return (status = CMS_MISC_ERROR);
 847  	    }
 848  	}
 849  	break;
 850  
 851      case NO_REMOTE_CMS_REQUEST:
 852      default:
 853  	break;
 854      }
 855      if (bytes_to_throw_away > 0) {
 856  	if (recvn
 857  	    (socket_fd, encoded_data, bytes_to_throw_away, 0, timeout,
 858  		&recvd_bytes) < 0) {
 859  	    if (recvn_timedout) {
 860  		consecutive_timeouts++;
 861  		if (consecutive_timeouts > max_consecutive_timeouts &&
 862  		    max_consecutive_timeouts > 0) {
 863  		    rcs_print_error
 864  			("CMS: %d consecutive timeouts have occurred. -- Stop trying.\n",
 865  			consecutive_timeouts);
 866  		    fatal_error_occurred = 1;
 867  		    reconnect_needed = 1;
 868  		}
 869  		return (status = CMS_TIMED_OUT);
 870  	    } else {
 871  		recvd_bytes = 0;
 872  		fatal_error_occurred = 1;
 873  		reconnect_needed = 1;
 874  		return (status = CMS_MISC_ERROR);
 875  	    }
 876  	}
 877  	recvd_bytes = 0;
 878      }
 879      bytes_to_throw_away = 0;
 880      timedout_request = NO_REMOTE_CMS_REQUEST;
 881      consecutive_timeouts = 0;
 882      waiting_for_message = 0;
 883      waiting_message_size = 0;
 884      waiting_message_id = 0;
 885      recvd_bytes = 0;
 886      return status;
 887  }
 888  
 889  CMS_STATUS TCPMEM::read()
 890  {
 891      long message_size, id;
 892      REMOTE_CMS_REQUEST_TYPE last_timedout_request;
 893  
 894      /* Produce error message if process does not have permission to read. */
 895      if (!read_permission_flag) {
 896  	rcs_print_error("CMS: %s was not configured to read %s\n",
 897  	    ProcessName, BufferName);
 898  	return (status = CMS_PERMISSIONS_ERROR);
 899      }
 900  
 901      if (reconnect_needed && autoreconnect) {
 902  	reconnect();
 903      }
 904  
 905      if (reconnect_needed) {
 906  	return (status = CMS_MISC_ERROR);
 907      }
 908      disable_sigpipe();
 909  
 910      if (subscription_type != CMS_NO_SUBSCRIPTION) {
 911  	set_socket_fds(read_socket_fd);
 912  	timedout_request = REMOTE_CMS_READ_REQUEST_TYPE;
 913  	if (subscription_count < 1) {
 914  	    serial_number++;
 915  	}
 916  	handle_old_replies();
 917  	check_id(timedout_request_writeid);
 918  	if (status == CMS_READ_OK) {
 919  	    serial_number++;
 920  	}
 921  	subscription_count++;
 922  	reenable_sigpipe();
 923  	return status;
 924      }
 925  
 926      if (timedout_request == NO_REMOTE_CMS_REQUEST) {
 927  	set_socket_fds(read_socket_fd);
 928      }
 929      if (fatal_error_occurred) {
 930  	if (status >= 0) {
 931  	    status = CMS_MISC_ERROR;
 932  	}
 933  	reenable_sigpipe();
 934  	return (status);
 935      }
 936      if (socket_fd <= 0) {
 937  	rcs_print_error("TCPMEM::read: Invalid socket descriptor. (%d)\n",
 938  	    socket_fd);
 939  	fatal_error_occurred = 1;
 940  	reconnect_needed = 1;
 941  	reenable_sigpipe();
 942  	return (status = CMS_MISC_ERROR);
 943      }
 944      last_timedout_request = timedout_request;
 945      if (((int) handle_old_replies()) < 0) {
 946  	reenable_sigpipe();
 947  	return status;
 948      }
 949      if (polling && last_timedout_request == REMOTE_CMS_READ_REQUEST_TYPE) {
 950  	check_id(timedout_request_writeid);
 951  	reenable_sigpipe();
 952  	return status;
 953      }
 954      set_socket_fds(read_socket_fd);
 955  
 956      putbe32(temp_buffer, (uint32_t) serial_number);
 957      putbe32(temp_buffer + 4, REMOTE_CMS_READ_REQUEST_TYPE);
 958      putbe32(temp_buffer + 8, (uint32_t) buffer_number);
 959      putbe32(temp_buffer + 12, CMS_READ_ACCESS);
 960      putbe32(temp_buffer + 16, in_buffer_id);
 961  
 962      int send_header_size = 20;
 963      if (total_subdivisions > 1) {
 964  	*((uint32_t *) temp_buffer + 5) = htonl((uint32_t) current_subdivision);
 965  	send_header_size = 24;
 966      }
 967      if (sendn(socket_fd, temp_buffer, send_header_size, 0, timeout) < 0) {
 968  	rcs_print_error("TCPMEM: Can't send READ request to server.\n");
 969  	reconnect_needed = 1;
 970  	fatal_error_occurred = 1;
 971  	reenable_sigpipe();
 972  	return (status = CMS_MISC_ERROR);
 973      }
 974      serial_number++;
 975      rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,
 976  	"TCPMEM sending request: fd = %d, serial_number=%ld, request_type=%d, buffer_number=%ld\n",
 977  	socket_fd, serial_number,
 978  	ntohl(*((uint32_t *) temp_buffer + 1)), buffer_number);
 979  
 980      if (recvn(socket_fd, temp_buffer, 20, 0, timeout, &recvd_bytes) < 20) {
 981  	if (recvn_timedout) {
 982  	    timedout_request = REMOTE_CMS_READ_REQUEST_TYPE;
 983  	    if (polling) {
 984  		return (status = CMS_READ_OLD);
 985  	    } else {
 986  		consecutive_timeouts = 1;
 987  		reenable_sigpipe();
 988  		return (status = CMS_TIMED_OUT);
 989  	    }
 990  	} else {
 991  	    recvd_bytes = 0;
 992  	    reconnect_needed = 1;
 993  	    fatal_error_occurred = 1;
 994  	    reenable_sigpipe();
 995  	    return (status = CMS_MISC_ERROR);
 996  	}
 997      }
 998      recvd_bytes = 0;
 999      returned_serial_number = (CMS_STATUS) getbe32(temp_buffer);
1000      rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,
1001  	"TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n",
1002  	socket_fd, returned_serial_number, buffer_number);
1003  
1004      if (returned_serial_number != serial_number) {
1005  	rcs_print_error
1006  	    ("TCPMEM: Returned serial number(%ld) does not match expected serial number(%ld).\n",
1007  	    returned_serial_number, serial_number);
1008  	reconnect_needed = 1;
1009  	if (subscription_type == CMS_NO_SUBSCRIPTION) {
1010  	    fatal_error_occurred = 1;
1011  	    reenable_sigpipe();
1012  	    return (status = CMS_MISC_ERROR);
1013  	}
1014      }
1015      status = (CMS_STATUS) ntohl(*((uint32_t *) temp_buffer + 1));
1016      message_size = ntohl(*((uint32_t *) temp_buffer + 2));
1017      id = ntohl(*((uint32_t *) temp_buffer + 3));
1018      header.was_read = ntohl(*((uint32_t *) temp_buffer + 4));
1019      if (message_size > max_encoded_message_size) {
1020  	rcs_print_error("Received message is too big. (%ld > %ld)\n",
1021  	    message_size, max_encoded_message_size);
1022  	fatal_error_occurred = 1;
1023  	reconnect_needed = 1;
1024  	reenable_sigpipe();
1025  	return (status = CMS_MISC_ERROR);
1026      }
1027      if (message_size > 0) {
1028  	if (recvn
1029  	    (socket_fd, encoded_data, message_size, 0, timeout,
1030  		&recvd_bytes) < 0) {
1031  	    if (recvn_timedout) {
1032  		if (!waiting_for_message) {
1033  		    waiting_message_id = id;
1034  		    waiting_message_size = message_size;
1035  		}
1036  		waiting_for_message = 1;
1037  		timedout_request = REMOTE_CMS_READ_REQUEST_TYPE;
1038  		if (polling) {
1039  		    reenable_sigpipe();
1040  		    return (status = CMS_READ_OLD);
1041  		} else {
1042  		    reenable_sigpipe();
1043  		    return (status = CMS_TIMED_OUT);
1044  		}
1045  	    } else {
1046  		recvd_bytes = 0;
1047  		fatal_error_occurred = 1;
1048  		reconnect_needed = 1;
1049  		reenable_sigpipe();
1050  		return (status = CMS_MISC_ERROR);
1051  	    }
1052  	}
1053      }
1054      recvd_bytes = 0;
1055      check_id(id);
1056      reenable_sigpipe();
1057      return (status);
1058  }
1059  
1060  CMS_STATUS TCPMEM::blocking_read(double _blocking_timeout)
1061  {
1062      blocking_timeout = _blocking_timeout;
1063      long message_size, id;
1064      REMOTE_CMS_REQUEST_TYPE last_timedout_request;
1065      long timeout_millis;
1066      int orig_print_recvn_timeout_errors = print_recvn_timeout_errors;
1067      print_recvn_timeout_errors = 0;
1068  
1069  /* Produce error message if process does not have permission to read. */
1070      if (!read_permission_flag) {
1071  	rcs_print_error("CMS: %s was not configured to read %s\n",
1072  	    ProcessName, BufferName);
1073  	return (status = CMS_PERMISSIONS_ERROR);
1074      }
1075  
1076      if (blocking_timeout < 0) {
1077  	timeout_millis = -1;
1078      } else {
1079  	timeout_millis = (uint32_t) (blocking_timeout * 1000.0);
1080      }
1081  
1082      if (reconnect_needed && autoreconnect) {
1083  	reconnect();
1084      }
1085  
1086      if (reconnect_needed) {
1087  	print_recvn_timeout_errors = orig_print_recvn_timeout_errors;
1088  	return (status = CMS_MISC_ERROR);
1089      }
1090      disable_sigpipe();
1091      double orig_timeout = timeout;
1092  
1093      if (subscription_type != CMS_NO_SUBSCRIPTION) {
1094  	if (blocking_timeout < -1e-6 || blocking_timeout > 1e-6) {
1095  	    make_tcp_socket_blocking(read_socket_fd);
1096  	    timeout = blocking_timeout;
1097  	}
1098  	set_socket_fds(read_socket_fd);
1099  	if (subscription_count < 1) {
1100  	    serial_number++;
1101  	}
1102  	timedout_request = REMOTE_CMS_READ_REQUEST_TYPE;
1103  	handle_old_replies();
1104  	check_id(timedout_request_writeid);
1105  	if (status == CMS_READ_OK) {
1106  	    serial_number++;
1107  	}
1108  	subscription_count++;
1109  	reenable_sigpipe();
1110  	if (blocking_timeout < -1e-6 || blocking_timeout > 1e-6) {
1111  	    make_tcp_socket_nonblocking(read_socket_fd);
1112  	    timeout = orig_timeout;
1113  	}
1114  	print_recvn_timeout_errors = orig_print_recvn_timeout_errors;
1115  	return status;
1116      }
1117  
1118      if (timedout_request == NO_REMOTE_CMS_REQUEST) {
1119  	set_socket_fds(read_socket_fd);
1120      }
1121      if (fatal_error_occurred) {
1122  	if (status >= 0) {
1123  	    status = CMS_MISC_ERROR;
1124  	}
1125  	reenable_sigpipe();
1126  	print_recvn_timeout_errors = orig_print_recvn_timeout_errors;
1127  	return (status);
1128      }
1129      if (socket_fd <= 0) {
1130  	rcs_print_error("TCPMEM::read: Invalid socket descriptor. (%d)\n",
1131  	    socket_fd);
1132  	fatal_error_occurred = 1;
1133  	reconnect_needed = 1;
1134  	reenable_sigpipe();
1135  	print_recvn_timeout_errors = orig_print_recvn_timeout_errors;
1136  	return (status = CMS_MISC_ERROR);
1137      }
1138      last_timedout_request = timedout_request;
1139      if (((int) handle_old_replies()) < 0) {
1140  	reenable_sigpipe();
1141  	print_recvn_timeout_errors = orig_print_recvn_timeout_errors;
1142  	return status;
1143      }
1144      if (polling && last_timedout_request == REMOTE_CMS_READ_REQUEST_TYPE) {
1145  	check_id(timedout_request_writeid);
1146  	reenable_sigpipe();
1147  	print_recvn_timeout_errors = orig_print_recvn_timeout_errors;
1148  	return status;
1149      }
1150      set_socket_fds(read_socket_fd);
1151  
1152      putbe32(temp_buffer, (uint32_t) serial_number);
1153      putbe32(temp_buffer + 4, REMOTE_CMS_BLOCKING_READ_REQUEST_TYPE);
1154      putbe32(temp_buffer + 8, (uint32_t) buffer_number);
1155      putbe32(temp_buffer + 12, CMS_READ_ACCESS);
1156      putbe32(temp_buffer + 16, (uint32_t) in_buffer_id);
1157      putbe32(temp_buffer + 20, (uint32_t) timeout_millis);
1158  
1159      int send_header_size = 24;
1160      if (total_subdivisions > 1) {
1161          putbe32(temp_buffer + 24, (uint32_t) current_subdivision);
1162  	send_header_size = 28;
1163      }
1164      if (sendn(socket_fd, temp_buffer, send_header_size, 0, blocking_timeout) <
1165  	0) {
1166  	rcs_print_error
1167  	    ("TCPMEM: Can't send BLOCKING_READ request to server.\n");
1168  	reconnect_needed = 1;
1169  	fatal_error_occurred = 1;
1170  	reenable_sigpipe();
1171  	print_recvn_timeout_errors = orig_print_recvn_timeout_errors;
1172  	return (status = CMS_MISC_ERROR);
1173      }
1174      serial_number++;
1175      rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,
1176  	"TCPMEM sending request: fd = %d, serial_number=%ld, "
1177  	"request_type=%d, buffer_number=%ld\n",
1178  	socket_fd, serial_number,
1179  	ntohl(*((uint32_t *) temp_buffer + 1)), buffer_number);
1180      if (recvn(socket_fd, temp_buffer, 20, 0, blocking_timeout, &recvd_bytes) <
1181  	0) {
1182  	print_recvn_timeout_errors = orig_print_recvn_timeout_errors;
1183  	if (recvn_timedout) {
1184  	    timedout_request = REMOTE_CMS_READ_REQUEST_TYPE;
1185  	    if (polling) {
1186  		return (status = CMS_READ_OLD);
1187  	    } else {
1188  		consecutive_timeouts = 1;
1189  		reenable_sigpipe();
1190  		return (status = CMS_TIMED_OUT);
1191  	    }
1192  	} else {
1193  	    recvd_bytes = 0;
1194  	    reconnect_needed = 1;
1195  	    fatal_error_occurred = 1;
1196  	    reenable_sigpipe();
1197  	    return (status = CMS_MISC_ERROR);
1198  	}
1199      }
1200      print_recvn_timeout_errors = orig_print_recvn_timeout_errors;
1201      recvd_bytes = 0;
1202      returned_serial_number = (CMS_STATUS) getbe32(temp_buffer);
1203      rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,
1204  	"TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n",
1205  	socket_fd, returned_serial_number, buffer_number);
1206  
1207      if (returned_serial_number != serial_number) {
1208  	rcs_print_error
1209  	    ("TCPMEM: Returned serial number(%ld) does not match expected serial number(%ld).\n",
1210  	    returned_serial_number, serial_number);
1211  	reconnect_needed = 1;
1212  	if (subscription_type == CMS_NO_SUBSCRIPTION) {
1213  	    fatal_error_occurred = 1;
1214  	    reenable_sigpipe();
1215  	    return (status = CMS_MISC_ERROR);
1216  	}
1217      }
1218      status = (CMS_STATUS) ntohl(*((uint32_t *) temp_buffer + 1));
1219      message_size = ntohl(*((uint32_t *) temp_buffer + 2));
1220      id = ntohl(*((uint32_t *) temp_buffer + 3));
1221      header.was_read = ntohl(*((uint32_t *) temp_buffer + 4));
1222      if (message_size > max_encoded_message_size) {
1223  	rcs_print_error("Received message is too big. (%ld > %ld)\n",
1224  	    message_size, max_encoded_message_size);
1225  	fatal_error_occurred = 1;
1226  	reconnect_needed = 1;
1227  	reenable_sigpipe();
1228  	return (status = CMS_MISC_ERROR);
1229      }
1230      if (message_size > 0) {
1231  	if (recvn
1232  	    (socket_fd, encoded_data, message_size, 0, blocking_timeout,
1233  		&recvd_bytes) < 0) {
1234  	    if (recvn_timedout) {
1235  		if (!waiting_for_message) {
1236  		    waiting_message_id = id;
1237  		    waiting_message_size = message_size;
1238  		}
1239  		waiting_for_message = 1;
1240  		timedout_request = REMOTE_CMS_READ_REQUEST_TYPE;
1241  		if (polling) {
1242  		    reenable_sigpipe();
1243  		    return (status = CMS_READ_OLD);
1244  		} else {
1245  		    reenable_sigpipe();
1246  		    return (status = CMS_TIMED_OUT);
1247  		}
1248  	    } else {
1249  		recvd_bytes = 0;
1250  		fatal_error_occurred = 1;
1251  		reconnect_needed = 1;
1252  		reenable_sigpipe();
1253  		return (status = CMS_MISC_ERROR);
1254  	    }
1255  	}
1256      }
1257      recvd_bytes = 0;
1258      check_id(id);
1259      reenable_sigpipe();
1260      return (status);
1261  }
1262  
1263  void TCPMEM::reenable_sigpipe()
1264  {
1265      if (old_handler != ((void (*)(int)) SIG_ERR)) {
1266  	signal(SIGPIPE, old_handler);
1267      }
1268      old_handler = (void (*)(int)) SIG_ERR;
1269      if (tcpmem_sigpipe_count > sigpipe_count) {
1270  	sigpipe_count = tcpmem_sigpipe_count;
1271  	reconnect_needed = 1;
1272      }
1273  }
1274  
1275  void TCPMEM::disable_sigpipe()
1276  {
1277      if (!autoreconnect) {
1278  	return;
1279      }
1280      old_handler = signal(SIGPIPE, tcpmem_sigpipe_handler);
1281      if (tcpmem_sigpipe_count > sigpipe_count) {
1282  	sigpipe_count = tcpmem_sigpipe_count;
1283      }
1284  }
1285  
1286  CMS_STATUS TCPMEM::peek()
1287  {
1288      /* Produce error message if process does not have permission to read. */
1289      if (!read_permission_flag) {
1290  	rcs_print_error("CMS: %s was not configured to read %s\n",
1291  	    ProcessName, BufferName);
1292  	return (status = CMS_PERMISSIONS_ERROR);
1293      }
1294  
1295      if (reconnect_needed && autoreconnect) {
1296  	reconnect();
1297      }
1298  
1299      if (reconnect_needed) {
1300  	return (status = CMS_MISC_ERROR);
1301      }
1302      disable_sigpipe();
1303  
1304      long message_size, id;
1305      REMOTE_CMS_REQUEST_TYPE last_timedout_request;
1306      if (subscription_type != CMS_NO_SUBSCRIPTION) {
1307  	set_socket_fds(read_socket_fd);
1308  	timedout_request = REMOTE_CMS_READ_REQUEST_TYPE;
1309  	if (subscription_count < 1) {
1310  	    serial_number++;
1311  	}
1312  	handle_old_replies();
1313  	check_id(timedout_request_writeid);
1314  	if (status == CMS_READ_OK) {
1315  	    serial_number++;
1316  	}
1317  	reenable_sigpipe();
1318  	subscription_count++;
1319  	return status;
1320      }
1321  
1322      if (timedout_request == NO_REMOTE_CMS_REQUEST) {
1323  	set_socket_fds(read_socket_fd);
1324      }
1325  
1326      if (fatal_error_occurred) {
1327  	if (status >= 0) {
1328  	    status = CMS_MISC_ERROR;
1329  	}
1330  	reenable_sigpipe();
1331  	return (status);
1332      }
1333      if (socket_fd <= 0) {
1334  	reconnect_needed = 1;
1335  	rcs_print_error("TCPMEM::read: Invalid socket descriptor. (%d)\n",
1336  	    socket_fd);
1337  	reenable_sigpipe();
1338  	return (status = CMS_MISC_ERROR);
1339      }
1340      last_timedout_request = timedout_request;
1341      if (((int) handle_old_replies()) < 0) {
1342  	reenable_sigpipe();
1343  	return status;
1344      }
1345      if (polling && last_timedout_request == REMOTE_CMS_READ_REQUEST_TYPE) {
1346  	check_id(timedout_request_writeid);
1347  	reenable_sigpipe();
1348  	return status;
1349      }
1350      set_socket_fds(read_socket_fd);
1351  
1352      putbe32(temp_buffer, (uint32_t) serial_number);
1353      putbe32(temp_buffer + 4, REMOTE_CMS_READ_REQUEST_TYPE);
1354      putbe32(temp_buffer + 8, (uint32_t) buffer_number);
1355      putbe32(temp_buffer + 12, CMS_PEEK_ACCESS);
1356      putbe32(temp_buffer + 16, (uint32_t) in_buffer_id);
1357      int send_header_size = 20;
1358      if (total_subdivisions > 1) {
1359  	*((uint32_t *) temp_buffer + 20) = htonl((uint32_t) current_subdivision);
1360  	send_header_size = 24;
1361      }
1362      if (sendn(socket_fd, temp_buffer, send_header_size, 0, timeout) < 0) {
1363  	rcs_print_error("TCPMEM: Can't send PEEK request to server.\n");
1364  	reconnect_needed = 1;
1365  	reenable_sigpipe();
1366  	return (status = CMS_MISC_ERROR);
1367      }
1368      serial_number++;
1369      if (recvn(socket_fd, temp_buffer, 20, 0, timeout, &recvd_bytes) < 0) {
1370  	if (recvn_timedout) {
1371  	    timedout_request = REMOTE_CMS_READ_REQUEST_TYPE;
1372  	    if (polling) {
1373  		reenable_sigpipe();
1374  		return (status = CMS_READ_OLD);
1375  	    } else {
1376  		consecutive_timeouts = 1;
1377  		reenable_sigpipe();
1378  		return (status = CMS_TIMED_OUT);
1379  	    }
1380  	} else {
1381  	    recvd_bytes = 0;
1382  	    fatal_error_occurred = 1;
1383  	    reconnect_needed = 1;
1384  	    reenable_sigpipe();
1385  	    return (status = CMS_MISC_ERROR);
1386  	}
1387      }
1388      recvd_bytes = 0;
1389      returned_serial_number = (CMS_STATUS) getbe32(temp_buffer);
1390      rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,
1391  	"TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n",
1392  	socket_fd, returned_serial_number, buffer_number);
1393  
1394      if (returned_serial_number != serial_number) {
1395  	rcs_print_error
1396  	    ("TCPMEM: Returned serial number(%ld) does not match expected serial number(%ld).\n",
1397  	    returned_serial_number, serial_number);
1398  	reconnect_needed = 1;
1399  	if (subscription_type == CMS_NO_SUBSCRIPTION) {
1400  	    reenable_sigpipe();
1401  	    return (status = CMS_MISC_ERROR);
1402  	}
1403      }
1404      status = (CMS_STATUS) ntohl(*((uint32_t *) temp_buffer + 1));
1405      message_size = ntohl(*((uint32_t *) temp_buffer + 2));
1406      id = ntohl(*((uint32_t *) temp_buffer + 3));
1407      header.was_read = ntohl(*((uint32_t *) temp_buffer + 4));
1408      if (message_size > max_encoded_message_size) {
1409  	reconnect_needed = 1;
1410  	rcs_print_error("Received message is too big. (%ld > %ld)\n",
1411  	    message_size, max_encoded_message_size);
1412  	reenable_sigpipe();
1413  	return (status = CMS_MISC_ERROR);
1414      }
1415      if (message_size > 0) {
1416  	if (recvn
1417  	    (socket_fd, encoded_data, message_size, 0, timeout,
1418  		&recvd_bytes) < 0) {
1419  	    if (recvn_timedout) {
1420  		if (!waiting_for_message) {
1421  		    waiting_message_id = id;
1422  		    waiting_message_size = message_size;
1423  		}
1424  		waiting_for_message = 1;
1425  		timedout_request = REMOTE_CMS_READ_REQUEST_TYPE;
1426  		if (polling) {
1427  		    reenable_sigpipe();
1428  		    return (status = CMS_READ_OLD);
1429  		} else {
1430  		    reenable_sigpipe();
1431  		    return (status = CMS_TIMED_OUT);
1432  		}
1433  	    } else {
1434  		reconnect_needed = 1;
1435  		recvd_bytes = 0;
1436  		fatal_error_occurred = 1;
1437  		reenable_sigpipe();
1438  		return (status = CMS_MISC_ERROR);
1439  	    }
1440  	}
1441      }
1442      recvd_bytes = 0;
1443      check_id(id);
1444      reenable_sigpipe();
1445      return (status);
1446  }
1447  
1448  CMS_STATUS TCPMEM::write(void *user_data, int *serial_number_out)
1449  {
1450  
1451      if (!write_permission_flag) {
1452  	rcs_print_error("CMS: %s was not configured to write to %s\n",
1453  	    ProcessName, BufferName);
1454  	return (status = CMS_PERMISSIONS_ERROR);
1455      }
1456  
1457      if (reconnect_needed && autoreconnect) {
1458  	reconnect();
1459      }
1460  
1461      if (!force_raw) {
1462  	user_data = encoded_data;
1463      }
1464  
1465      if (reconnect_needed) {
1466  	return (status = CMS_MISC_ERROR);
1467      }
1468  
1469      if (fatal_error_occurred) {
1470  	if (status >= 0) {
1471  	    status = CMS_MISC_ERROR;
1472  	}
1473  	return (status);
1474      }
1475  
1476      disable_sigpipe();
1477  
1478      if (socket_fd <= 0) {
1479  	rcs_print_error("TCPMEM::write: Invalid socket descriptor. (%d)\n",
1480  	    socket_fd);
1481  	reenable_sigpipe();
1482  	return (status = CMS_MISC_ERROR);
1483      }
1484      if (((int) handle_old_replies()) < 0) {
1485  	reenable_sigpipe();
1486  	return status;
1487      }
1488      set_socket_fds(write_socket_fd);
1489  
1490      putbe32(temp_buffer, serial_number);
1491      putbe32(temp_buffer + 4, REMOTE_CMS_WRITE_REQUEST_TYPE);
1492      putbe32(temp_buffer + 8, (uint32_t) buffer_number);
1493      putbe32(temp_buffer + 12, CMS_WRITE_ACCESS);
1494      putbe32(temp_buffer + 16, (uint32_t) header.in_buffer_size);
1495      int send_header_size = 20;
1496      if (total_subdivisions > 1) {
1497          putbe32(temp_buffer + 20,(uint32_t) current_subdivision);
1498  	send_header_size = 24;
1499      }
1500      if (header.in_buffer_size < 0x2000 - 20 && header.in_buffer_size > 0) {
1501  	memcpy(temp_buffer + send_header_size, user_data,
1502  	    header.in_buffer_size);
1503  	if (sendn
1504  	    (socket_fd, temp_buffer, header.in_buffer_size + send_header_size,
1505  		0, timeout) < 0) {
1506  	    rcs_print_error
1507  		("TCPMEM: Failed to send message of size %ld + header of size %d  to the server.\n",
1508  		header.in_buffer_size, send_header_size);
1509  	    reconnect_needed = 1;
1510  	    reenable_sigpipe();
1511  	    return (status = CMS_MISC_ERROR);
1512  	}
1513      } else {
1514  	if (sendn(socket_fd, temp_buffer, send_header_size, 0, timeout) < 0) {
1515  	    rcs_print_error("TCPMEM: Failed to send header to server.\n");
1516  	    reconnect_needed = 1;
1517  	    reenable_sigpipe();
1518  	    return (status = CMS_MISC_ERROR);
1519  	}
1520  	if (header.in_buffer_size > 0) {
1521  	    if (sendn(socket_fd, user_data, header.in_buffer_size, 0, timeout)
1522  		< 0) {
1523  		reconnect_needed = 1;
1524  		reenable_sigpipe();
1525  		return (status = CMS_MISC_ERROR);
1526  	    }
1527  	}
1528      }
1529      serial_number++;
1530      if ((min_compatible_version < 2.58 && min_compatible_version > 1e-6)
1531  	|| confirm_write) {
1532  	if (recvn(socket_fd, temp_buffer, 12, 0, timeout, &recvd_bytes) < 0) {
1533  	    if (recvn_timedout) {
1534  		timedout_request = REMOTE_CMS_WRITE_REQUEST_TYPE;
1535  		consecutive_timeouts = 1;
1536  		reenable_sigpipe();
1537  		return (status = CMS_TIMED_OUT);
1538  	    } else {
1539  		recvd_bytes = 0;
1540  		reconnect_needed = 1;
1541  		fatal_error_occurred = 1;
1542  		reenable_sigpipe();
1543  		return (status = CMS_MISC_ERROR);
1544  	    }
1545  	}
1546  	recvd_bytes = 0;
1547  	returned_serial_number =
1548  	    (CMS_STATUS) getbe32(temp_buffer);
1549  	if(serial_number_out) *serial_number_out = returned_serial_number;
1550  	rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,
1551  	    "TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n",
1552  	    socket_fd, returned_serial_number, buffer_number);
1553  
1554  	status = (CMS_STATUS) ntohl(*((uint32_t *) temp_buffer + 1));
1555  	header.was_read = ntohl(*((uint32_t *) temp_buffer + 2));
1556  	header.write_id = returned_serial_number;
1557      } else {
1558  	header.was_read = 0;
1559  	status = CMS_WRITE_OK;
1560  	returned_serial_number = serial_number;
1561      }
1562      reenable_sigpipe();
1563      return (status);
1564  }
1565  
1566  CMS_STATUS TCPMEM::write_if_read(void *user_data, int *serial_number_out)
1567  {
1568  
1569      if (!write_permission_flag) {
1570  	rcs_print_error("CMS: %s was not configured to write to %s\n",
1571  	    ProcessName, BufferName);
1572  	return (status = CMS_PERMISSIONS_ERROR);
1573      }
1574  
1575      if (reconnect_needed && autoreconnect) {
1576  	reconnect();
1577      }
1578      if (!force_raw) {
1579  	user_data = encoded_data;
1580      }
1581  
1582      if (reconnect_needed) {
1583  	return (status = CMS_MISC_ERROR);
1584      }
1585  
1586      if (fatal_error_occurred) {
1587  	if (status >= 0) {
1588  	    status = CMS_MISC_ERROR;
1589  	}
1590  	return (status);
1591      }
1592      disable_sigpipe();
1593  
1594      if (socket_fd <= 0) {
1595  	rcs_print_error("TCPMEM::write: Invalid socket descriptor. (%d)\n",
1596  	    socket_fd);
1597  	reenable_sigpipe();
1598  	return (status = CMS_MISC_ERROR);
1599      }
1600      if (((int) handle_old_replies()) < 0) {
1601  	reenable_sigpipe();
1602  	return status;
1603      }
1604  
1605      set_socket_fds(write_socket_fd);
1606  
1607      putbe32( temp_buffer, (uint32_t) serial_number);
1608      putbe32( temp_buffer + 4, REMOTE_CMS_WRITE_REQUEST_TYPE);
1609      putbe32( temp_buffer + 8, (uint32_t) buffer_number);
1610      putbe32( temp_buffer + 12, CMS_WRITE_IF_READ_ACCESS);
1611      putbe32( temp_buffer + 16, (uint32_t) header.in_buffer_size);
1612      int send_header_size = 20;
1613      if (total_subdivisions > 1) {
1614          putbe32( temp_buffer + 20, (uint32_t) current_subdivision);
1615  	send_header_size = 24;
1616      }
1617      if (header.in_buffer_size < 0x2000 - 20 && header.in_buffer_size > 0) {
1618  	memcpy(temp_buffer + 20, user_data, header.in_buffer_size);
1619  	if (sendn
1620  	    (socket_fd, temp_buffer, header.in_buffer_size + send_header_size,
1621  		0, timeout) < 0) {
1622  	    reconnect_needed = 1;
1623  	    reenable_sigpipe();
1624  	    return (status = CMS_MISC_ERROR);
1625  	}
1626      } else {
1627  	if (sendn(socket_fd, temp_buffer, send_header_size, 0, timeout) < 0) {
1628  	    reconnect_needed = 1;
1629  	    reenable_sigpipe();
1630  	    return (status = CMS_MISC_ERROR);
1631  	}
1632  	if (header.in_buffer_size > 0) {
1633  	    if (sendn(socket_fd, user_data, header.in_buffer_size, 0, timeout)
1634  		< 0) {
1635  		reconnect_needed = 1;
1636  		reenable_sigpipe();
1637  		return (status = CMS_MISC_ERROR);
1638  	    }
1639  	}
1640      }
1641      serial_number++;
1642      if ((min_compatible_version < 2.58 && min_compatible_version > 1e-6) ||
1643  	confirm_write) {
1644  	if (recvn(socket_fd, temp_buffer, 12, 0, timeout, &recvd_bytes) < 0) {
1645  	    if (recvn_timedout) {
1646  		timedout_request = REMOTE_CMS_WRITE_REQUEST_TYPE;
1647  		consecutive_timeouts = 1;
1648  		reenable_sigpipe();
1649  		return (status = CMS_TIMED_OUT);
1650  	    } else {
1651  		recvd_bytes = 0;
1652  		fatal_error_occurred = 1;
1653  		reconnect_needed = 1;
1654  		reenable_sigpipe();
1655  		return (status = CMS_MISC_ERROR);
1656  	    }
1657  	}
1658  	recvd_bytes = 0;
1659  	returned_serial_number =
1660  	    (CMS_STATUS) getbe32(temp_buffer);
1661  	if(serial_number_out) *serial_number_out = returned_serial_number;
1662  	rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,
1663  	    "TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n",
1664  	    socket_fd, returned_serial_number, buffer_number);
1665  	status = (CMS_STATUS) ntohl(*((uint32_t *) temp_buffer + 1));
1666  	header.was_read = ntohl(*((uint32_t *) temp_buffer + 2));
1667      } else {
1668  	header.was_read = 0;
1669  	status = CMS_WRITE_OK;
1670  	returned_serial_number = 0;
1671      }
1672      reenable_sigpipe();
1673      return (status);
1674  }
1675  
1676  int TCPMEM::check_if_read()
1677  {
1678      if (reconnect_needed && autoreconnect) {
1679  	reconnect();
1680      }
1681  
1682      if (reconnect_needed) {
1683  	return (status = CMS_MISC_ERROR);
1684      }
1685  
1686      if (fatal_error_occurred) {
1687  	if (status >= 0) {
1688  	    status = CMS_MISC_ERROR;
1689  	}
1690  	return (status);
1691      }
1692  
1693      disable_sigpipe();
1694  
1695      if (socket_fd <= 0) {
1696  	rcs_print_error
1697  	    ("TCPMEM::check_if_read: Invalid socket descriptor. (%d)\n",
1698  	    socket_fd);
1699  	reenable_sigpipe();
1700  	return (status = CMS_MISC_ERROR);
1701      }
1702      if (((int) handle_old_replies()) < 0) {
1703  	reenable_sigpipe();
1704  	return 0;
1705      }
1706  
1707      set_socket_fds(write_socket_fd);
1708  
1709      putbe32(temp_buffer, (uint32_t) serial_number);
1710      putbe32(temp_buffer + 4, REMOTE_CMS_CHECK_IF_READ_REQUEST_TYPE);
1711      putbe32(temp_buffer + 8, (uint32_t) buffer_number);
1712      int send_header_size = 20;
1713      if (total_subdivisions > 1) {
1714          putbe32(temp_buffer + 12, (uint32_t) current_subdivision);
1715      }
1716      if (sendn(socket_fd, temp_buffer, send_header_size, 0, timeout) < 0) {
1717  	status = CMS_MISC_ERROR;
1718  	reconnect_needed = 1;
1719  	reenable_sigpipe();
1720  	return (0);
1721      }
1722      serial_number++;
1723      if (recvn(socket_fd, temp_buffer, 12, 0, timeout, &recvd_bytes) < 0) {
1724  	if (recvn_timedout) {
1725  	    timedout_request = REMOTE_CMS_CHECK_IF_READ_REQUEST_TYPE;
1726  	    consecutive_timeouts = 1;
1727  	    status = CMS_TIMED_OUT;
1728  	    reenable_sigpipe();
1729  	    return 0;
1730  	} else {
1731  	    recvd_bytes = 0;
1732  	    fatal_error_occurred = 1;
1733  	    status = CMS_MISC_ERROR;
1734  	    reenable_sigpipe();
1735  	    return 0;
1736  	}
1737      }
1738      recvd_bytes = 0;
1739      returned_serial_number = (CMS_STATUS) getbe32(temp_buffer);
1740      rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,
1741  	"TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n",
1742  	socket_fd, returned_serial_number, buffer_number);
1743      if (returned_serial_number != serial_number) {
1744  	rcs_print_error
1745  	    ("TCPMEM: Returned serial number(%ld) does not match expected serial number(%ld).\n",
1746  	    returned_serial_number, serial_number);
1747  	reenable_sigpipe();
1748  	return (status = CMS_MISC_ERROR);
1749      }
1750      status = (CMS_STATUS) ntohl(*((uint32_t *) temp_buffer + 1));
1751      header.was_read = ntohl(*((uint32_t *) temp_buffer + 2));
1752      reenable_sigpipe();
1753      return (header.was_read);
1754  }
1755  
1756  int TCPMEM::get_queue_length()
1757  {
1758      if (reconnect_needed && autoreconnect) {
1759  	reconnect();
1760      }
1761  
1762      if (reconnect_needed) {
1763  	return (status = CMS_MISC_ERROR);
1764      }
1765  
1766      if (fatal_error_occurred) {
1767  	if (status >= 0) {
1768  	    status = CMS_MISC_ERROR;
1769  	}
1770  	return (status);
1771      }
1772  
1773      disable_sigpipe();
1774  
1775      if (socket_fd <= 0) {
1776  	rcs_print_error
1777  	    ("TCPMEM::check_if_read: Invalid socket descriptor. (%d)\n",
1778  	    socket_fd);
1779  	reenable_sigpipe();
1780  	return (status = CMS_MISC_ERROR);
1781      }
1782      if (((int) handle_old_replies()) < 0) {
1783  	reenable_sigpipe();
1784  	return 0;
1785      }
1786  
1787      set_socket_fds(write_socket_fd);
1788  
1789      putbe32(temp_buffer, (uint32_t) serial_number);
1790      putbe32(temp_buffer + 4, REMOTE_CMS_GET_QUEUE_LENGTH_REQUEST_TYPE);
1791      putbe32(temp_buffer + 8, (uint32_t) buffer_number);
1792      int send_header_size = 20;
1793      if (total_subdivisions > 1) {
1794          putbe32(temp_buffer + 16, (uint32_t) current_subdivision);
1795      }
1796      if (sendn(socket_fd, temp_buffer, send_header_size, 0, timeout) < 0) {
1797  	status = CMS_MISC_ERROR;
1798  	reconnect_needed = 1;
1799  	reenable_sigpipe();
1800  	return (0);
1801      }
1802      serial_number++;
1803      if (recvn(socket_fd, temp_buffer, 12, 0, timeout, &recvd_bytes) < 0) {
1804  	if (recvn_timedout) {
1805  	    timedout_request = REMOTE_CMS_GET_QUEUE_LENGTH_REQUEST_TYPE;
1806  	    consecutive_timeouts = 1;
1807  	    status = CMS_TIMED_OUT;
1808  	    reenable_sigpipe();
1809  	    return 0;
1810  	} else {
1811  	    recvd_bytes = 0;
1812  	    fatal_error_occurred = 1;
1813  	    status = CMS_MISC_ERROR;
1814  	    reenable_sigpipe();
1815  	    return 0;
1816  	}
1817      }
1818      recvd_bytes = 0;
1819      returned_serial_number = (CMS_STATUS) getbe32(temp_buffer);
1820      rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,
1821  	"TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n",
1822  	socket_fd, returned_serial_number, buffer_number);
1823      if (returned_serial_number != serial_number) {
1824  	rcs_print_error
1825  	    ("TCPMEM: Returned serial number(%ld) does not match expected serial number(%ld).\n",
1826  	    returned_serial_number, serial_number);
1827  	reenable_sigpipe();
1828  	return (status = CMS_MISC_ERROR);
1829      }
1830      status = (CMS_STATUS) ntohl(*((uint32_t *) temp_buffer + 1));
1831      queuing_header.queue_length = ntohl(*((uint32_t *) temp_buffer + 2));
1832      reenable_sigpipe();
1833      return (queuing_header.queue_length);
1834  }
1835  
1836  int TCPMEM::get_msg_count()
1837  {
1838      if (reconnect_needed && autoreconnect) {
1839  	reconnect();
1840      }
1841  
1842      if (reconnect_needed) {
1843  	return (status = CMS_MISC_ERROR);
1844      }
1845  
1846      if (fatal_error_occurred) {
1847  	if (status >= 0) {
1848  	    status = CMS_MISC_ERROR;
1849  	}
1850  	return (status);
1851      }
1852  
1853      disable_sigpipe();
1854  
1855      if (socket_fd <= 0) {
1856  	rcs_print_error
1857  	    ("TCPMEM::check_if_read: Invalid socket descriptor. (%d)\n",
1858  	    socket_fd);
1859  	reenable_sigpipe();
1860  	return (status = CMS_MISC_ERROR);
1861      }
1862      if (((int) handle_old_replies()) < 0) {
1863  	reenable_sigpipe();
1864  	return 0;
1865      }
1866  
1867      set_socket_fds(write_socket_fd);
1868  
1869      putbe32(temp_buffer, (uint32_t) serial_number);
1870      putbe32(temp_buffer + 4, REMOTE_CMS_GET_MSG_COUNT_REQUEST_TYPE);
1871      putbe32(temp_buffer + 8, (uint32_t) buffer_number);
1872      int send_header_size = 20;
1873      if (total_subdivisions > 1) {
1874          putbe32(temp_buffer + 12, (uint32_t) current_subdivision);
1875      }
1876      if (sendn(socket_fd, temp_buffer, send_header_size, 0, timeout) < 0) {
1877  	status = CMS_MISC_ERROR;
1878  	reconnect_needed = 1;
1879  	reenable_sigpipe();
1880  	return (0);
1881      }
1882      serial_number++;
1883      if (recvn(socket_fd, temp_buffer, 12, 0, timeout, &recvd_bytes) < 0) {
1884  	if (recvn_timedout) {
1885  	    timedout_request = REMOTE_CMS_GET_MSG_COUNT_REQUEST_TYPE;
1886  	    consecutive_timeouts = 1;
1887  	    status = CMS_TIMED_OUT;
1888  	    reenable_sigpipe();
1889  	    return 0;
1890  	} else {
1891  	    recvd_bytes = 0;
1892  	    fatal_error_occurred = 1;
1893  	    status = CMS_MISC_ERROR;
1894  	    reenable_sigpipe();
1895  	    return 0;
1896  	}
1897      }
1898      recvd_bytes = 0;
1899      returned_serial_number = (CMS_STATUS) getbe32(temp_buffer);
1900      rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,
1901  	"TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n",
1902  	socket_fd, returned_serial_number, buffer_number);
1903      if (returned_serial_number != serial_number) {
1904  	rcs_print_error
1905  	    ("TCPMEM: Returned serial number(%ld) does not match expected serial number(%ld).\n",
1906  	    returned_serial_number, serial_number);
1907  	reenable_sigpipe();
1908  	return (status = CMS_MISC_ERROR);
1909      }
1910      status = (CMS_STATUS) ntohl(*((uint32_t *) temp_buffer + 1));
1911      header.write_id = ntohl(*((uint32_t *) temp_buffer + 2));
1912      reenable_sigpipe();
1913      return (header.write_id);
1914  }
1915  
1916  int TCPMEM::get_space_available()
1917  {
1918      if (reconnect_needed && autoreconnect) {
1919  	reconnect();
1920      }
1921  
1922      if (reconnect_needed) {
1923  	return (status = CMS_MISC_ERROR);
1924      }
1925  
1926      if (fatal_error_occurred) {
1927  	if (status >= 0) {
1928  	    status = CMS_MISC_ERROR;
1929  	}
1930  	return (status);
1931      }
1932  
1933      disable_sigpipe();
1934  
1935      if (socket_fd <= 0) {
1936  	rcs_print_error
1937  	    ("TCPMEM::check_if_read: Invalid socket descriptor. (%d)\n",
1938  	    socket_fd);
1939  	reenable_sigpipe();
1940  	return (status = CMS_MISC_ERROR);
1941      }
1942      if (((int) handle_old_replies()) < 0) {
1943  	reenable_sigpipe();
1944  	return 0;
1945      }
1946  
1947      set_socket_fds(write_socket_fd);
1948  
1949      putbe32(temp_buffer, (uint32_t) serial_number);
1950      putbe32(temp_buffer + 4, REMOTE_CMS_GET_SPACE_AVAILABLE_REQUEST_TYPE);
1951      putbe32(temp_buffer + 8,buffer_number);
1952      int send_header_size = 20;
1953      if (total_subdivisions > 1) {
1954          putbe32(temp_buffer + 12, current_subdivision);
1955      }
1956      if (sendn(socket_fd, temp_buffer, send_header_size, 0, timeout) < 0) {
1957  	status = CMS_MISC_ERROR;
1958  	reconnect_needed = 1;
1959  	reenable_sigpipe();
1960  	return (0);
1961      }
1962      serial_number++;
1963      if (recvn(socket_fd, temp_buffer, 12, 0, timeout, &recvd_bytes) < 0) {
1964  	if (recvn_timedout) {
1965  	    timedout_request = REMOTE_CMS_GET_SPACE_AVAILABLE_REQUEST_TYPE;
1966  	    consecutive_timeouts = 1;
1967  	    status = CMS_TIMED_OUT;
1968  	    reenable_sigpipe();
1969  	    return 0;
1970  	} else {
1971  	    recvd_bytes = 0;
1972  	    fatal_error_occurred = 1;
1973  	    status = CMS_MISC_ERROR;
1974  	    reenable_sigpipe();
1975  	    return 0;
1976  	}
1977      }
1978      recvd_bytes = 0;
1979      returned_serial_number = (CMS_STATUS) getbe32(temp_buffer);
1980      rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,
1981  	"TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n",
1982  	socket_fd, returned_serial_number, buffer_number);
1983      if (returned_serial_number != serial_number) {
1984  	rcs_print_error
1985  	    ("TCPMEM: Returned serial number(%ld) does not match expected serial number(%ld).\n",
1986  	    returned_serial_number, serial_number);
1987  	reenable_sigpipe();
1988  	return (status = CMS_MISC_ERROR);
1989      }
1990      status = (CMS_STATUS) ntohl(*((uint32_t *) temp_buffer + 1));
1991      free_space = ntohl(*((uint32_t *) temp_buffer + 2));
1992      reenable_sigpipe();
1993      return (free_space);
1994  }
1995  
1996  CMS_STATUS TCPMEM::clear()
1997  {
1998      if (reconnect_needed && autoreconnect) {
1999  	reconnect();
2000      }
2001  
2002      if (reconnect_needed) {
2003  	return (status = CMS_MISC_ERROR);
2004      }
2005  
2006      if (fatal_error_occurred) {
2007  	if (status >= 0) {
2008  	    status = CMS_MISC_ERROR;
2009  	}
2010  	return (status);
2011      }
2012      if (socket_fd <= 0) {
2013  	rcs_print_error("TCPMEM::clear: Invalid socket descriptor. (%d)\n",
2014  	    socket_fd);
2015  	reconnect_needed = 1;
2016  	return (status = CMS_MISC_ERROR);
2017      }
2018      if (((int) handle_old_replies()) < 0) {
2019  	return status;
2020      }
2021  
2022      set_socket_fds(write_socket_fd);
2023  
2024      putbe32(temp_buffer, (uint32_t) serial_number);
2025      putbe32(temp_buffer + 4, REMOTE_CMS_CLEAR_REQUEST_TYPE);
2026      putbe32(temp_buffer + 8, buffer_number);
2027      putbe32(temp_buffer + 12, current_subdivision);
2028  
2029      if (sendn(socket_fd, temp_buffer, 20, 0, timeout) < 0) {
2030  	reconnect_needed = 1;
2031  	return (status = CMS_MISC_ERROR);
2032      }
2033      serial_number++;
2034      if (recvn(socket_fd, temp_buffer, 8, 0, timeout, &recvd_bytes) < 0) {
2035  	if (recvn_timedout) {
2036  	    timedout_request = REMOTE_CMS_CLEAR_REQUEST_TYPE;
2037  	    consecutive_timeouts = 1;
2038  	    return (status = CMS_TIMED_OUT);
2039  	} else {
2040  	    fatal_error_occurred = 1;
2041  	    reconnect_needed = 1;
2042  	    return (status = CMS_MISC_ERROR);
2043  	}
2044      }
2045      returned_serial_number = (CMS_STATUS) getbe32(temp_buffer);
2046      rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,
2047  	"TCPMEM received_reply: fd = %d, serial_number=%ld, buffer_number=%ld\n",
2048  	socket_fd, returned_serial_number, buffer_number);
2049  
2050      if (returned_serial_number != serial_number) {
2051  	rcs_print_error
2052  	    ("TCPMEM: Returned serial number(%ld) does not match expected serial number(%ld).\n",
2053  	    returned_serial_number, serial_number);
2054  	reconnect_needed = 1;
2055  	return (status = CMS_MISC_ERROR);
2056      }
2057      status = (CMS_STATUS) ntohl(*((uint32_t *) temp_buffer + 1));
2058      header.was_read = ntohl(*((uint32_t *) temp_buffer + 2));
2059      return (status);
2060  }
2061  /*! \todo Another #if 0 */
2062  #if 0
2063  int TCPMEM::login(const char *name, const char *passwd)
2064  {
2065      if (fatal_error_occurred) {
2066  	if (status >= 0) {
2067  	    status = CMS_MISC_ERROR;
2068  	}
2069  	return (status);
2070      }
2071      if (socket_fd <= 0) {
2072  	rcs_print_error("TCPMEM::write: Invalid socket descriptor. (%d)\n",
2073  	    socket_fd);
2074  	return (status = CMS_MISC_ERROR);
2075      }
2076      int handle_old_reply_ret = 0;
2077  
2078      while (timedout_request != NO_REMOTE_CMS_REQUEST && !handle_old_reply_ret) {
2079  	handle_old_reply_ret = handle_old_replies();
2080      }
2081      if (handle_old_reply_ret < 0) {
2082  	return 0;
2083      }
2084      set_socket_fds(write_socket_fd);
2085      *((uint32_t *) temp_buffer) = htonl((uint32_t) serial_number);
2086      *((uint32_t *) temp_buffer + 1) =
2087  	htonl((uint32_t) REMOTE_CMS_GET_KEYS_REQUEST_TYPE);
2088      *((uint32_t *) temp_buffer + 2) = htonl((uint32_t) buffer_number);
2089      if (sendn(socket_fd, temp_buffer, 20, 0, 30.0) < 0) {
2090  	return 0;
2091      }
2092      memset(temp_buffer, 0, 20);
2093      strncpy(((char *) temp_buffer), name, 16);
2094      if (sendn(socket_fd, temp_buffer, 16, 0, 30.0) < 0) {
2095  	return (status = CMS_MISC_ERROR);
2096      }
2097      serial_number++;
2098      if (recvn(socket_fd, temp_buffer, 20, 0, 30.0, &recvd_bytes) < 0) {
2099  	return 0;
2100      }
2101      recvd_bytes = 0;
2102      returned_serial_number = (CMS_STATUS) getbe32(temp_buffer);
2103      rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,
2104  	"TCPMEM received_reply: fd = %d, serial_number=%d, buffer_number=%d\n",
2105  	socket_fd, returned_serial_number, buffer_number);
2106      if (returned_serial_number != serial_number) {
2107  	rcs_print_error
2108  	    ("TCPMEM: Returned serial number(%d) does not match expected serial number(%d).\n",
2109  	    returned_serial_number, serial_number);
2110  	return (0);
2111      }
2112      char *crypt1_ret = crypt(passwd, ((char *) temp_buffer) + 4);
2113      if (NULL == crypt1_ret) {
2114  	rcs_print_error("TCPMEM::login() crypt function failed.\n");
2115  	return 0;
2116      }
2117      char passwd_pass1[16];
2118      strncpy(passwd_pass1, crypt1_ret, 16);
2119      char *crypt2_ret = crypt(passwd_pass1, ((char *) temp_buffer) + 12);
2120      if (NULL == crypt2_ret) {
2121  	rcs_print_error("TCPMEM::login() crypt function failed.\n");
2122  	return (0);
2123      }
2124      char passwd_pass2[16];
2125      strncpy(passwd_pass2, crypt2_ret, 16);
2126  
2127      *((uint32_t *) temp_buffer) = htonl((uint32_t) serial_number);
2128      *((uint32_t *) temp_buffer + 1) =
2129  	htonl((uint32_t) REMOTE_CMS_LOGIN_REQUEST_TYPE);
2130      *((uint32_t *) temp_buffer + 2) = htonl((uint32_t) buffer_number);
2131      if (sendn(socket_fd, temp_buffer, 20, 0, 30.0) < 0) {
2132  	return 0;
2133      }
2134      memset(temp_buffer, 0, 20);
2135      strncpy(((char *) temp_buffer), name, 16);
2136      if (sendn(socket_fd, temp_buffer, 16, 0, 30.0) < 0) {
2137  	return (status = CMS_MISC_ERROR);
2138      }
2139      if (sendn(socket_fd, passwd_pass2, 16, 0, 30.0) < 0) {
2140  	return (status = CMS_MISC_ERROR);
2141      }
2142      serial_number++;
2143      if (recvn(socket_fd, temp_buffer, 8, 0, 30.0, &recvd_bytes) < 0) {
2144  	return 0;
2145      }
2146      recvd_bytes = 0;
2147      returned_serial_number = (CMS_STATUS) getbe32(temp_buffer);
2148      rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,
2149  	"TCPMEM received_reply: fd = %d, serial_number=%d, buffer_number=%d\n",
2150  	socket_fd, returned_serial_number, buffer_number);
2151      if (returned_serial_number != serial_number) {
2152  	rcs_print_error
2153  	    ("TCPMEM: Returned serial number(%d) does not match expected serial number(%d).\n",
2154  	    returned_serial_number, serial_number);
2155  	return (status = CMS_MISC_ERROR);
2156      }
2157      int success = ntohl(*((uint32_t *) temp_buffer + 1));
2158      return (success);
2159  }
2160  #endif
2161  
2162  void TCPMEM::set_socket_fds(int new_fd)
2163  {
2164      if (socket_fd == read_socket_fd) {
2165  	read_serial_number = serial_number;
2166      }
2167      if (socket_fd == write_socket_fd) {
2168  	write_serial_number = serial_number;
2169      }
2170      socket_fd = new_fd;
2171      if (socket_fd == read_socket_fd) {
2172  	serial_number = read_serial_number;
2173      }
2174      if (socket_fd == write_socket_fd) {
2175  	serial_number = write_serial_number;
2176      }
2177  }