/ src / libnml / cms / tcp_srv.cc
tcp_srv.cc
   1  /********************************************************************
   2  * Description: tcp_srv.cc
   3  *
   4  *   Derived from a work by Fred Proctor & Will Shackleford
   5  *
   6  * Author:
   7  * License: LGPL Version 2
   8  * System: Linux
   9  *    
  10  * Copyright (c) 2004 All rights reserved.
  11  *
  12  * Last change: 
  13  ********************************************************************/
  14  /****************************************************************************
  15  * File: tcp_srv.cc
  16  * Purpose: Provides the functions for the class CMS_SERVER_REMOTE_TCP_PORT
  17  *  which provides TCP specific overrides of the CMS_SERVER_REMOTE_PORT class.
  18  ****************************************************************************/
  19  
  20  #if defined(__GNUC__) && ((__GNUC__ > 4) || ((__GNUC__ == 4) && (__GNUC_MINOR__ >= 4)))
  21  #pragma GCC optimize "-fno-strict-aliasing"
  22  #pragma GCC diagnostic ignored "-Wstrict-aliasing"
  23  #endif
  24  
  25  #ifdef __cplusplus
  26  extern "C" {
  27  #endif
  28  
  29  #include <string.h>		/* memset(), strerror() */
  30  #include <stdlib.h>		// malloc(), free()
  31  #include <unistd.h>
  32  #include <sys/socket.h>
  33  #include <sys/ioctl.h>
  34  #include <errno.h>		/* errno */
  35  #include <signal.h>		// SIGPIPE, signal()
  36  
  37  #ifdef __cplusplus
  38  }
  39  #endif
  40  
  41  #include <sys/types.h>
  42  #include <sys/wait.h>		// waitpid
  43  
  44  #include <arpa/inet.h>		/* inet_ntoa */
  45  #include "cms.hh"		/* class CMS */
  46  #include "nml.hh"		// class NML
  47  #include "tcp_srv.hh"		/* class CMS_SERVER_REMOTE_TCP_PORT */
  48  #include "rcs_print.hh"		/* rcs_print_error() */
  49  #include "linklist.hh"		/* class LinkedList */
  50  #include "tcp_opts.hh"		/* SET_TCP_NODELAY */
  51  #include "timer.hh"		// esleep()
  52  #include "_timer.h"
  53  #include "cmsdiag.hh"		// class CMS_DIAGNOSTICS_INFO
  54  extern "C" {
  55  #include "recvn.h"		/* recvn() */
  56  #include "sendn.h"		/* sendn() */
  57  }
  58  #include "physmem.hh"           // PHYSMEM_HANDLE
  59  
  60  int tcpsvr_threads_created = 0;	/* not updated in the part of the file reviewed here; presumably counts handlers started -- TODO confirm */
  61  int tcpsvr_threads_killed = 0;	/* incremented by blocking_thread_kill() for every id > 0 it is given */
  62  int tcpsvr_threads_exited = 0;	/* incremented when tcpsvr_handle_blocking_request() reaches its normal end */
  63  int tcpsvr_threads_returned_early = 0;	/* incremented on every early return from tcpsvr_handle_blocking_request() */
  64  
  65  TCPSVR_BLOCKING_READ_REQUEST::TCPSVR_BLOCKING_READ_REQUEST()
  66  {
  67      access_type = CMS_READ_ACCESS;	/* read or just peek */
  68      last_id_read = 0;		/* The server can compare with id from buffer 
  69  				 */
  70      /* to determine if the buffer is new */
  71      /* to this client */
  72      timeout_millis = -1;	/* Milliseconds for blocking_timeout or -1 to 
  73  				   wait forever */
  74      _client_tcp_port = NULL;
  75      remport = NULL;
  76      server = NULL;
  77      _nml = NULL;
  78      _reply = NULL;
  79      _data = NULL;
  80      read_reply = NULL;
  81  }
  82  
  83  static inline double tcp_svr_reverse_double(double in)
  84  {
  85      double out;
  86      char *c1, *c2;
  87  
  88      c1 = ((char *) &in) + 7;
  89      c2 = (char *) &out;
  90      for (int i = 0; i < 8; i++) {
  91  	*c2 = *c1;
  92  	c1--;
  93  	c2++;
  94      }
  95      return out;
  96  }
  97  
  98  TCPSVR_BLOCKING_READ_REQUEST::~TCPSVR_BLOCKING_READ_REQUEST()
  99  {
 100      if (NULL != _nml) {
 101  	NML *nmlcopy = (NML *) _nml;
 102  	_nml = NULL;
 103  	delete nmlcopy;
 104      }
 105      if (NULL != _data) {
 106  	void *_datacopy = _data;
 107  	if (NULL != read_reply) {
 108  	    if (_data == read_reply->data) {
 109  		read_reply->data = NULL;
 110  	    }
 111  	}
 112  	_data = NULL;
 113  	free(_datacopy);
 114      }
 115      if (NULL != _reply) {
 116  	free(_reply);
 117  	_reply = NULL;
 118  	read_reply = NULL;
 119      }
 120      if (NULL != read_reply) {
 121  	if (NULL != read_reply->data) {
 122  	    free(read_reply->data);
 123  	    read_reply->data = NULL;
 124  	}
 125  	delete read_reply;
 126  	read_reply = NULL;
 127      }
 128  }
 129  
 130  CMS_SERVER_REMOTE_TCP_PORT::CMS_SERVER_REMOTE_TCP_PORT(CMS_SERVER * _cms_server):
 131  CMS_SERVER_REMOTE_PORT(_cms_server)
 132  {
 133      client_ports = (LinkedList *) NULL;
 134      connection_socket = 0;
 135      connection_port = 0;
 136      maxfdpl = 0;
 137      dtimeout = 20.0;
 138  
 139      memset(&server_socket_address, 0, sizeof(server_socket_address));
 140      server_socket_address.sin_family = AF_INET;
 141      server_socket_address.sin_addr.s_addr = htonl(INADDR_ANY);
 142      server_socket_address.sin_port = 0;
 143  
 144      client_ports = new LinkedList;
 145      if (NULL == client_ports) {
 146  	rcs_print_error("Can not create linked list for client ports.\n");
 147  	return;
 148      }
 149      polling_enabled = 0;
 150      memset(&select_timeout, 0, sizeof(select_timeout));
 151      select_timeout.tv_sec = 30;
 152      select_timeout.tv_usec = 30;
 153      subscription_buffers = NULL;
 154      current_poll_interval_millis = 30000;
 155      memset(&read_fd_set, 0, sizeof(read_fd_set));
 156      memset(&write_fd_set, 0, sizeof(write_fd_set));
 157  }
 158  
 159  CMS_SERVER_REMOTE_TCP_PORT::~CMS_SERVER_REMOTE_TCP_PORT()
 160  {
 161      if (client_ports == NULL) return;
 162      unregister_port();
 163      if (NULL != client_ports) {
 164  	delete client_ports;
 165  	client_ports = (LinkedList *) NULL;
 166      }
 167  }
 168  
 169  void blocking_thread_kill(long int id)
 170  {
 171  
 172      if (id <= 0) {
 173  	return;
 174      }
 175  #ifdef POSIX_THREADS
 176      pthread_kill(id, SIGINT);
 177      pthread_join(id, NULL);
 178  #endif
 179  #ifdef NO_THREADS
 180      kill(id, SIGINT);
 181      waitpid(id, NULL, 0);
 182  #endif
 183      tcpsvr_threads_killed++;
 184  }
 185  
 186  void CMS_SERVER_REMOTE_TCP_PORT::unregister_port()
 187  {
 188      CLIENT_TCP_PORT *client;
 189      int number_of_connected_clients = 0;
 190  
 191      client = (CLIENT_TCP_PORT *) client_ports->get_head();
 192      while (NULL != client) {
 193  	rcs_print("Exiting even though client on %s is still connected.\n",
 194  	    inet_ntoa(client->address.sin_addr));
 195  	client = (CLIENT_TCP_PORT *) client_ports->get_next();
 196  	number_of_connected_clients++;
 197      }
 198      client = (CLIENT_TCP_PORT *) client_ports->get_head();
 199      while (NULL != client) {
 200  	delete client;
 201  	client_ports->delete_current_node();
 202  	client = (CLIENT_TCP_PORT *) client_ports->get_next();
 203      }
 204      if (NULL != subscription_buffers) {
 205  	TCP_BUFFER_SUBSCRIPTION_INFO *sub_info =
 206  	    (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head();
 207  	while (NULL != sub_info) {
 208  	    delete sub_info;
 209  	    sub_info = (TCP_BUFFER_SUBSCRIPTION_INFO *)
 210  		subscription_buffers->get_next();
 211  	}
 212  	delete subscription_buffers;
 213  	subscription_buffers = NULL;
 214      }
 215      if (number_of_connected_clients > 0) {
 216  	esleep(2.0);
 217      }
 218      if (connection_socket > 0) {
 219  	close(connection_socket);
 220  	connection_socket = 0;
 221      }
 222  }
 223  
 224  int CMS_SERVER_REMOTE_TCP_PORT::accept_local_port_cms(CMS * _cms)
 225  {
 226      if (NULL == _cms) {
 227  	return 0;
 228      }
 229      if (_cms->remote_port_type != CMS_TCP_REMOTE_PORT_TYPE) {
 230  	return 0;
 231      }
 232      if (NULL != _cms) {
 233  	if (min_compatible_version < 1e-6 ||
 234  	    (min_compatible_version > _cms->min_compatible_version &&
 235  		_cms->min_compatible_version > 1e-6)) {
 236  	    min_compatible_version = _cms->min_compatible_version;
 237  	}
 238  	if (_cms->confirm_write) {
 239  	    confirm_write = _cms->confirm_write;
 240  	}
 241      }
 242      if (_cms->total_subdivisions > max_total_subdivisions) {
 243  	max_total_subdivisions = _cms->total_subdivisions;
 244      }
 245      if (server_socket_address.sin_port == 0) {
 246  	server_socket_address.sin_port =
 247  	    htons(((u_short) _cms->tcp_port_number));
 248  	port_num = _cms->tcp_port_number;
 249  	return 1;
 250      }
 251      if (server_socket_address.sin_port ==
 252  	htons(((u_short) _cms->tcp_port_number))) {
 253  	port_num = _cms->tcp_port_number;
 254  	return 1;
 255      }
 256      return 0;
 257  }
 258  
 259  void CMS_SERVER_REMOTE_TCP_PORT::register_port()
 260  {
 261      port_registered = 0;
 262      rcs_print_debug(PRINT_CMS_CONFIG_INFO,
 263  	"Registering server on TCP port %d.\n",
 264  	ntohs(server_socket_address.sin_port));
 265      if (server_socket_address.sin_port == 0) {
 266  	rcs_print_error("server can not register on port number 0.\n");
 267  	return;
 268      }
 269      if ((connection_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
 270  	rcs_print_error("socket error: %d -- %s\n", errno, strerror(errno));
 271  	rcs_print_error("Server can not open stream socket.\n");
 272  	return;
 273      }
 274  
 275      if (set_tcp_socket_options(connection_socket) < 0) {
 276  	return;
 277      }
 278      if (bind(connection_socket, (struct sockaddr *) &server_socket_address,
 279  	    sizeof(server_socket_address)) < 0) {
 280  	rcs_print_error("bind error: %d -- %s\n", errno, strerror(errno));
 281  	rcs_print_error
 282  	    ("Server can not bind the connection socket on port %d.\n",
 283  	    ntohs(server_socket_address.sin_port));
 284  	return;
 285      }
 286      if (listen(connection_socket, 5) < 0) {
 287  	rcs_print_error("listen error: %d -- %s\n", errno, strerror(errno));
 288  	rcs_print_error("TCP Server: error on call to listen for port %d.\n",
 289  	    ntohs(server_socket_address.sin_port));
 290  	return;
 291      }
 292      port_registered = 1;
 293  
 294  }
 295  
 296  static int last_pipe_signum = 0;
 297  
 298  static void handle_pipe_error(int signum)
 299  {
 300      last_pipe_signum = signum;
 301      rcs_print_error("SIGPIPE intercepted.\n");
 302  }
 303  
 304  void CMS_SERVER_REMOTE_TCP_PORT::run()
 305  {
 306      int bytes_ready;
 307      int ready_descriptors;
 308      if (NULL == client_ports) {
 309  	rcs_print_error("CMS_SERVER: List of client ports is NULL.\n");
 310  	return;
 311      }
 312      CLIENT_TCP_PORT *new_client_port, *client_port_to_check;
 313      FD_ZERO(&read_fd_set);
 314      FD_ZERO(&write_fd_set);
 315      FD_SET(connection_socket, &read_fd_set);
 316      maxfdpl = connection_socket + 1;
 317      signal(SIGPIPE, handle_pipe_error);
 318      rcs_print_debug(PRINT_CMS_CONFIG_INFO,
 319  	"running server for TCP port %d (connection_socket = %d).\n",
 320  	ntohs(server_socket_address.sin_port), connection_socket);
 321  
 322      cms_server_count++;
 323      fd_set read_fd_set_copy, write_fd_set_copy;
 324      FD_ZERO(&read_fd_set_copy);
 325      FD_ZERO(&write_fd_set_copy);
 326      FD_SET(connection_socket, &read_fd_set_copy);
 327  
 328      while (1) {
 329  	if (polling_enabled) {
 330  	    memcpy(&read_fd_set_copy, &read_fd_set, sizeof(fd_set));
 331  	    memcpy(&write_fd_set_copy, &write_fd_set, sizeof(fd_set));
 332  	    select_timeout.tv_sec = current_poll_interval_millis / 1000;
 333  	    select_timeout.tv_usec =
 334  		(current_poll_interval_millis % 1000) * 1000;
 335  	    ready_descriptors =
 336  		select(maxfdpl, &read_fd_set, &write_fd_set,
 337  		(fd_set *) NULL, (timeval *) & select_timeout);
 338  	    if (ready_descriptors == 0) {
 339  		update_subscriptions();
 340  		memcpy(&read_fd_set, &read_fd_set_copy, sizeof(fd_set));
 341  		memcpy(&write_fd_set, &write_fd_set_copy, sizeof(fd_set));
 342  		continue;
 343  	    }
 344  	} else {
 345  	    ready_descriptors =
 346  		select(maxfdpl, &read_fd_set, &write_fd_set,
 347  		(fd_set *) NULL, (timeval *) NULL);
 348  
 349  	}
 350  	if (ready_descriptors < 0) {
 351  	    rcs_print_error("server: select error.(errno = %d | %s)\n",
 352  		errno, strerror(errno));
 353  	}
 354  	if (NULL == client_ports) {
 355  	    rcs_print_error("CMS_SERVER: List of client ports is NULL.\n");
 356  	    return;
 357  	}
 358  	client_port_to_check = (CLIENT_TCP_PORT *) client_ports->get_head();
 359  	while (NULL != client_port_to_check) {
 360  	    if (FD_ISSET(client_port_to_check->socket_fd, &read_fd_set)) {
 361  		ioctl(client_port_to_check->socket_fd, FIONREAD,
 362  		    (caddr_t) & bytes_ready);
 363  		if (bytes_ready <= 0) {
 364  		    rcs_print_debug(PRINT_SOCKET_CONNECT,
 365  			"Socket closed by host with IP address %s.\n",
 366  			inet_ntoa(client_port_to_check->address.sin_addr));
 367  		    if (NULL != client_port_to_check->subscriptions) {
 368  			TCP_CLIENT_SUBSCRIPTION_INFO *clnt_sub_info =
 369  			    (TCP_CLIENT_SUBSCRIPTION_INFO *)
 370  			    client_port_to_check->subscriptions->get_head();
 371  			while (NULL != clnt_sub_info) {
 372  			    if (NULL != clnt_sub_info->sub_buf_info &&
 373  				clnt_sub_info->subscription_list_id >= 0) {
 374  				if (NULL !=
 375  				    clnt_sub_info->sub_buf_info->
 376  				    sub_clnt_info) {
 377  				    clnt_sub_info->sub_buf_info->
 378  					sub_clnt_info->
 379  					delete_node(clnt_sub_info->
 380  					subscription_list_id);
 381  				    if (clnt_sub_info->sub_buf_info->
 382  					sub_clnt_info->list_size < 1) {
 383  					delete clnt_sub_info->sub_buf_info->
 384  					    sub_clnt_info;
 385  					clnt_sub_info->sub_buf_info->
 386  					    sub_clnt_info = NULL;
 387  					if (NULL != subscription_buffers
 388  					    && clnt_sub_info->sub_buf_info->
 389  					    list_id >= 0) {
 390  					    subscription_buffers->
 391  						delete_node(clnt_sub_info->
 392  						sub_buf_info->list_id);
 393  					    delete clnt_sub_info->
 394  						sub_buf_info;
 395  					    clnt_sub_info->sub_buf_info =
 396  						NULL;
 397  					}
 398  				    }
 399  				    clnt_sub_info->sub_buf_info = NULL;
 400  				}
 401  				delete clnt_sub_info;
 402  				clnt_sub_info =
 403  				    (TCP_CLIENT_SUBSCRIPTION_INFO *)
 404  				    client_port_to_check->subscriptions->
 405  				    get_next();
 406  			    }
 407  			    delete client_port_to_check->subscriptions;
 408  			    client_port_to_check->subscriptions = NULL;
 409  			    recalculate_polling_interval();
 410  			}
 411  		    }
 412  		    if (client_port_to_check->threadId > 0
 413  			&& client_port_to_check->blocking) {
 414  			blocking_thread_kill(client_port_to_check->threadId);
 415  		    }
 416  		    close(client_port_to_check->socket_fd);
 417  		    FD_CLR(client_port_to_check->socket_fd, &read_fd_set);
 418  		    client_port_to_check->socket_fd = -1;
 419  		    delete client_port_to_check;
 420  		    client_ports->delete_current_node();
 421  		} else {
 422  		    if (client_port_to_check->blocking) {
 423  			if (client_port_to_check->threadId > 0) {
 424  			    rcs_print_debug(PRINT_SERVER_THREAD_ACTIVITY,
 425  				"Data received from %s:%d when it should be blocking (bytes_ready=%d).\n",
 426  				inet_ntoa
 427  				(client_port_to_check->address.
 428  				    sin_addr),
 429  				client_port_to_check->socket_fd, bytes_ready);
 430  			    rcs_print_debug(PRINT_SERVER_THREAD_ACTIVITY,
 431  				"Killing handler %d.\n",
 432  				client_port_to_check->threadId);
 433  
 434  			    blocking_thread_kill
 435  				(client_port_to_check->threadId);
 436  #if 0
 437  			    *((uint32_t *) temp_buffer) =
 438  				htonl(client_port_to_check->serial_number);
 439  			    *((uint32_t *) temp_buffer + 1) =
 440  				htonl((unsigned long)
 441  				CMS_SERVER_SIDE_ERROR);
 442  			    putbe32(temp_buffer + 8, 0);	/* size
 443  									 */
 444  			    putbe32(temp_buffer + 12, 0);	/* write_id
 445  									 */
 446  			    putbe32(temp_buffer + 16, 0);	/* was_read
 447  									 */
 448  			    sendn(client_port_to_check->socket_fd,
 449  				temp_buffer, 20, 0, dtimeout);
 450  #endif
 451  			    client_port_to_check->threadId = 0;
 452  			    client_port_to_check->blocking = 0;
 453  			}
 454  		    }
 455  		    handle_request(client_port_to_check);
 456  		}
 457  		ready_descriptors--;
 458  	    } else {
 459  		FD_SET(client_port_to_check->socket_fd, &read_fd_set);
 460  	    }
 461  	    client_port_to_check =
 462  		(CLIENT_TCP_PORT *) client_ports->get_next();
 463  	}
 464  	if (FD_ISSET(connection_socket, &read_fd_set)
 465  	    && ready_descriptors > 0) {
 466  	    ready_descriptors--;
 467  	    socklen_t client_address_length;
 468  	    new_client_port = new CLIENT_TCP_PORT();
 469  	    client_address_length = sizeof(new_client_port->address);
 470  	    new_client_port->socket_fd = accept(connection_socket,
 471  		(struct sockaddr *)
 472  		&new_client_port->address, &client_address_length);
 473  	    current_clients++;
 474  	    if (current_clients > max_clients) {
 475  		max_clients = current_clients;
 476  	    }
 477  	    if (new_client_port->socket_fd < 0) {
 478  		rcs_print_error("server: accept error -- %d %s \n", errno,
 479  		    strerror(errno));
 480  		continue;
 481  	    }
 482  	    rcs_print_debug(PRINT_SOCKET_CONNECT,
 483  		"Socket opened by host with IP address %s.\n",
 484  		inet_ntoa(new_client_port->address.sin_addr));
 485  	    new_client_port->serial_number = 0;
 486  	    new_client_port->blocking = 0;
 487  	    if (NULL != client_ports) {
 488  		client_ports->store_at_tail(new_client_port,
 489  		    sizeof(new_client_port), 0);
 490  	    }
 491  	    if (maxfdpl < new_client_port->socket_fd + 1) {
 492  		maxfdpl = new_client_port->socket_fd + 1;
 493  	    }
 494  	    FD_SET(new_client_port->socket_fd, &read_fd_set);
 495  	} else {
 496  	    FD_SET(connection_socket, &read_fd_set);
 497  	}
 498  	if (0 != ready_descriptors) {
 499  	    rcs_print_error("%d descriptors ready but not serviced.\n",
 500  		ready_descriptors);
 501  	}
 502  	update_subscriptions();
 503      }
 504  }
 505  
 506  static int tcpsvr_handle_blocking_request_sigint_count = 0;
 507  static int tcpsvr_last_sig = 0;
 508  
 509  void tcpsvr_handle_blocking_request_sigint_handler(int sig)
 510  {
 511      tcpsvr_last_sig = sig;
 512      tcpsvr_handle_blocking_request_sigint_count++;
 513  }
 514  
 515  static void putbe32(char *addr, uint32_t val) {
 516      val = htonl(val);
 517      memcpy(addr, &val, sizeof(val));
 518  }
 519  
 520  static uint32_t getbe32(char *addr) {
 521      uint32_t val;
 522      memcpy(&val, addr, sizeof(val));
 523      return ntohl(val);
 524  }
 525  
 526  #if defined(POSIX_THREADS) || defined(NO_THREADS)
 527  void *tcpsvr_handle_blocking_request(void *_req)
 528  {
 529      signal(SIGINT, tcpsvr_handle_blocking_request_sigint_handler);
 530      TCPSVR_BLOCKING_READ_REQUEST *blocking_read_req =
 531  	(TCPSVR_BLOCKING_READ_REQUEST *) _req;
 532      char temp_buffer[0x2000];
 533      if (_req == NULL) {
 534  	tcpsvr_threads_returned_early++;
 535  	return 0;
 536      }
 537      double dtimeout =
 538  	((double) (blocking_read_req->timeout_millis + 10)) / 1000.0;
 539      if (dtimeout < 0) {
 540  	dtimeout = 600.0;
 541      }
 542      if (dtimeout < 0.5) {
 543  	dtimeout = 0.5;
 544      }
 545      if (dtimeout > 600.0) {
 546  	dtimeout = 600.0;
 547      }
 548      CLIENT_TCP_PORT *_client_tcp_port = blocking_read_req->_client_tcp_port;
 549      CMS_SERVER *server = blocking_read_req->server;
 550  
 551      if (NULL == server || NULL == _client_tcp_port) {
 552  	tcpsvr_threads_returned_early++;
 553  	return 0;
 554      }
 555      memset(temp_buffer, 0, 0x2000);
 556      REMOTE_BLOCKING_READ_REPLY *read_reply;
 557  
 558      if (NULL != _client_tcp_port->diag_info) {
 559  	_client_tcp_port->diag_info->buffer_number =
 560  	    blocking_read_req->buffer_number;
 561  	server->set_diag_info(_client_tcp_port->diag_info);
 562      } else if (server->diag_enabled) {
 563  	server->reset_diag_info(blocking_read_req->buffer_number);
 564      }
 565  
 566      read_reply = (REMOTE_BLOCKING_READ_REPLY *)
 567  	server->process_request(blocking_read_req);
 568      blocking_read_req->read_reply = read_reply;
 569      if (NULL == read_reply) {
 570  	_client_tcp_port->blocking = 0;
 571  	rcs_print_error("Server could not process request.\n");
 572  	putbe32(temp_buffer, _client_tcp_port->serial_number);
 573  	putbe32(temp_buffer + 4, CMS_SERVER_SIDE_ERROR);
 574  	putbe32(temp_buffer + 8, 0);	/* size */
 575  	putbe32(temp_buffer + 12, 0)	/* write_id */;
 576  	putbe32(temp_buffer + 16, 0)	/* was_read */;
 577  	sendn(_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);
 578  	_client_tcp_port->errors++;
 579  	_client_tcp_port->blocking_read_req = NULL;
 580  	delete blocking_read_req;
 581  	_client_tcp_port->threadId = 0;
 582  	tcpsvr_threads_returned_early++;
 583  	return 0;
 584      }
 585      putbe32(temp_buffer, _client_tcp_port->serial_number);
 586      putbe32(temp_buffer + 4, read_reply->status);
 587      putbe32(temp_buffer + 8, read_reply->size);
 588      putbe32(temp_buffer + 12, read_reply->write_id);
 589      putbe32(temp_buffer + 16, read_reply->was_read);
 590      if (read_reply->size < (0x2000 - 20) && read_reply->size > 0) {
 591  	memcpy(temp_buffer + 20, read_reply->data, read_reply->size);
 592  	_client_tcp_port->blocking = 0;
 593  	if (sendn
 594  	    (_client_tcp_port->socket_fd, temp_buffer, 20 + read_reply->size,
 595  		0, dtimeout) < 0) {
 596  	    _client_tcp_port->blocking = 0;
 597  	    _client_tcp_port->errors++;
 598  	    _client_tcp_port->blocking_read_req = NULL;
 599  	    delete blocking_read_req;
 600  	    _client_tcp_port->threadId = 0;
 601  	    tcpsvr_threads_returned_early++;
 602  	    return 0;
 603  	}
 604      } else {
 605  	_client_tcp_port->blocking = 0;
 606  	if (sendn(_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout) <
 607  	    0) {
 608  	    _client_tcp_port->blocking = 0;
 609  	    _client_tcp_port->errors++;
 610  	    _client_tcp_port->blocking_read_req = NULL;
 611  	    delete blocking_read_req;
 612  	    _client_tcp_port->threadId = 0;
 613  	    tcpsvr_threads_returned_early++;
 614  	    return 0;
 615  	}
 616  	if (read_reply->size > 0) {
 617  	    if (sendn
 618  		(_client_tcp_port->socket_fd, read_reply->data,
 619  		    read_reply->size, 0, dtimeout) < 0) {
 620  		_client_tcp_port->blocking = 0;
 621  		_client_tcp_port->errors++;
 622  		_client_tcp_port->blocking_read_req = NULL;
 623  		delete blocking_read_req;
 624  		_client_tcp_port->threadId = 0;
 625  		tcpsvr_threads_returned_early++;
 626  		return 0;
 627  	    }
 628  	}
 629      }
 630      _client_tcp_port->blocking_read_req = NULL;
 631      delete blocking_read_req;
 632      _client_tcp_port->threadId = 0;
 633      tcpsvr_threads_exited++;
 634  #ifdef POSIX_THREADS
 635      pthread_exit(0);
 636  #endif
 637  #ifdef NO_THREADS
 638      exit(0);
 639  #endif
 640  }
 641  
 642  #endif
 643  
 644  void CMS_SERVER_REMOTE_TCP_PORT::handle_request(CLIENT_TCP_PORT *
 645      _client_tcp_port)
 646  {
 647      CLIENT_TCP_PORT *client_port_to_check = NULL;
 648      pid_t pid = getpid();
 649      pid_t tid = 0;
 650      CMS_SERVER *server;
 651      server = find_server(pid, tid);
 652      if (NULL == server) {
 653  	rcs_print_error
 654  	    ("CMS_SERVER_REMOTE_TCP_PORT::handle_request() Cannot find server object for pid = %d.\n",
 655  	    pid);
 656  	return;
 657      }
 658  
 659      if (server->using_passwd_file) {
 660  	current_user_info = get_connected_user(_client_tcp_port->socket_fd);
 661      }
 662  
 663      if (_client_tcp_port->errors >= _client_tcp_port->max_errors) {
 664  	rcs_print_error("Too many errors - closing connection(%d)\n",
 665  	    _client_tcp_port->socket_fd);
 666  	client_port_to_check = (CLIENT_TCP_PORT *) client_ports->get_head();
 667  	while (NULL != client_port_to_check) {
 668  	    if (client_port_to_check->socket_fd ==
 669  		_client_tcp_port->socket_fd) {
 670  		delete client_port_to_check;
 671  		client_ports->delete_current_node();
 672  	    }
 673  	    client_port_to_check =
 674  		(CLIENT_TCP_PORT *) client_ports->get_next();
 675  	}
 676  	close(_client_tcp_port->socket_fd);
 677  	current_clients--;
 678  	FD_CLR(_client_tcp_port->socket_fd, &read_fd_set);
 679  	_client_tcp_port->socket_fd = -1;
 680      }
 681  
 682      if (recvn(_client_tcp_port->socket_fd, temp_buffer, 20, 0, -1, NULL) < 0) {
 683  	rcs_print_error("Can not read from client port (%d) from %s\n",
 684  	    _client_tcp_port->socket_fd,
 685  	    inet_ntoa(_client_tcp_port->address.sin_addr));
 686  	_client_tcp_port->errors++;
 687  	return;
 688      }
 689      long request_type, buffer_number, received_serial_number;
 690      received_serial_number = getbe32(temp_buffer);
 691      if (received_serial_number != _client_tcp_port->serial_number) {
 692  	rcs_print_error
 693  	    ("received_serial_number (%ld) does not equal expected serial number.(%ld)\n",
 694  	    received_serial_number, _client_tcp_port->serial_number);
 695  	_client_tcp_port->serial_number = received_serial_number;
 696  	_client_tcp_port->errors++;
 697      }
 698      _client_tcp_port->serial_number++;
 699      request_type = ntohl(*((uint32_t *) temp_buffer + 1));
 700      buffer_number = ntohl(*((uint32_t *) temp_buffer + 2));
 701  
 702      rcs_print_debug(PRINT_ALL_SOCKET_REQUESTS,
 703  	"TCPSVR request received: fd = %d, serial_number=%ld, request_type=%ld, buffer_number=%ld\n",
 704  	_client_tcp_port->socket_fd,
 705  	_client_tcp_port->serial_number, request_type, buffer_number);
 706  
 707      if (NULL != _client_tcp_port->diag_info) {
 708  	_client_tcp_port->diag_info->buffer_number = buffer_number;
 709  	server->set_diag_info(_client_tcp_port->diag_info);
 710      } else if (server->diag_enabled) {
 711  	server->reset_diag_info(buffer_number);
 712      }
 713  
 714      switch_function(_client_tcp_port,
 715  	server, request_type, buffer_number, received_serial_number);
 716  
 717      if (NULL != _client_tcp_port->diag_info &&
 718  	NULL != server->last_local_port_used && server->diag_enabled) {
 719  	if (NULL != server->last_local_port_used->cms) {
 720  	    if (NULL !=
 721  		server->last_local_port_used->cms->handle_to_global_data) {
 722  		_client_tcp_port->diag_info->bytes_moved =
 723  		    server->last_local_port_used->cms->handle_to_global_data->
 724  		    total_bytes_moved;
 725  	    }
 726  	}
 727      }
 728  }
 729  
 730  void CMS_SERVER_REMOTE_TCP_PORT::switch_function(CLIENT_TCP_PORT *
 731      _client_tcp_port,
 732      CMS_SERVER * server,
 733      long request_type, long buffer_number, long received_serial_number)
 734  {
 735      int total_subdivisions = 1;
 736      CLIENT_TCP_PORT *client_port_to_check = NULL;
 737      switch (request_type) {
 738      case REMOTE_CMS_SET_DIAG_INFO_REQUEST_TYPE:
 739  	{
 740  	    if (NULL == _client_tcp_port->diag_info) {
 741  		_client_tcp_port->diag_info =
 742  		    new REMOTE_SET_DIAG_INFO_REQUEST();
 743  	    }
 744  	    if (recvn
 745  		(_client_tcp_port->socket_fd, server->set_diag_info_buf, 68,
 746  		    0, -1, NULL) < 0) {
 747  		rcs_print_error
 748  		    ("Can not read from client port (%d) from %s\n",
 749  		    _client_tcp_port->socket_fd,
 750  		    inet_ntoa(_client_tcp_port->address.sin_addr));
 751  		_client_tcp_port->errors++;
 752  		return;
 753  	    }
 754  	    _client_tcp_port->diag_info->bytes_moved = 0.0;
 755  	    _client_tcp_port->diag_info->buffer_number = buffer_number;
 756  	    memcpy(_client_tcp_port->diag_info->process_name,
 757  		server->set_diag_info_buf, 16);
 758  	    memcpy(_client_tcp_port->diag_info->host_sysinfo,
 759  		server->set_diag_info_buf + 16, 32);
 760  	    _client_tcp_port->diag_info->pid =
 761  		htonl(*((uint32_t *) (server->set_diag_info_buf + 48)));
 762  	    _client_tcp_port->diag_info->c_num =
 763  		htonl(*((uint32_t *) (server->set_diag_info_buf + 52)));
 764  	    memcpy(&(_client_tcp_port->diag_info->rcslib_ver),
 765  		server->set_diag_info_buf + 56, 8);
 766  	    _client_tcp_port->diag_info->reverse_flag =
 767  		*((int *) ((char *) server->set_diag_info_buf + 64));
 768  	    if (_client_tcp_port->diag_info->reverse_flag == 0x44332211) {
 769  		_client_tcp_port->diag_info->rcslib_ver =
 770  		    (double) tcp_svr_reverse_double((double)
 771  		    _client_tcp_port->diag_info->rcslib_ver);
 772  	    }
 773  	}
 774  	break;
 775  
 776      case REMOTE_CMS_GET_DIAG_INFO_REQUEST_TYPE:
 777  	{
 778  	    REMOTE_GET_DIAG_INFO_REQUEST diagreq;
 779  	    diagreq.buffer_number = buffer_number;
 780  	    REMOTE_GET_DIAG_INFO_REPLY *diagreply = NULL;
 781  	    diagreply =
 782  		(REMOTE_GET_DIAG_INFO_REPLY *) server->
 783  		process_request(&diagreq);
 784  	    if (NULL == diagreply) {
 785  		putbe32(temp_buffer, _client_tcp_port->serial_number);
 786  		putbe32(temp_buffer+4, CMS_SERVER_SIDE_ERROR);
 787  		if (sendn
 788  		    (_client_tcp_port->socket_fd, temp_buffer, 24, 0,
 789  			dtimeout) < 0) {
 790  		    _client_tcp_port->errors++;
 791  		}
 792  		return;
 793  	    }
 794  	    if (NULL == diagreply->cdi) {
 795  		putbe32(temp_buffer, _client_tcp_port->serial_number);
 796  		putbe32(temp_buffer + 4, CMS_SERVER_SIDE_ERROR);
 797  		if (sendn
 798  		    (_client_tcp_port->socket_fd, temp_buffer, 24, 0,
 799  			dtimeout) < 0) {
 800  		    _client_tcp_port->errors++;
 801  		}
 802  		return;
 803  	    }
 804  	    memset(temp_buffer, 0, 0x2000);
 805  	    unsigned long dpi_offset = 32;
 806  	    putbe32(temp_buffer, _client_tcp_port->serial_number);
 807  	    putbe32(temp_buffer + 4, diagreply->status);
 808  	    putbe32(temp_buffer + 8, diagreply->cdi->last_writer);
 809  	    putbe32(temp_buffer + 12, diagreply->cdi->last_reader);
 810  	    double curtime = etime();
 811  	    double reversed_temp = 0.0;
 812  	    if (_client_tcp_port->diag_info->reverse_flag == 0x44332211) {
 813  		reversed_temp =
 814  		    (double) tcp_svr_reverse_double((double) curtime);
 815  		memcpy(temp_buffer + 16, &reversed_temp, 8);
 816  	    } else {
 817  		memcpy(temp_buffer + 16, &(curtime), 8);
 818  	    }
 819  	    int dpi_count = 0;
 820  	    if (NULL != diagreply->cdi->dpis) {
 821  		CMS_DIAG_PROC_INFO *dpi =
 822  		    (CMS_DIAG_PROC_INFO *) diagreply->cdi->dpis->get_head();
 823  		while ((dpi_offset <
 824  			((int) 0x2000 - sizeof(CMS_DIAG_PROC_INFO)))
 825  		    && dpi != NULL) {
 826  		    dpi_count++;
 827  		    memcpy(temp_buffer + dpi_offset, dpi->name, 16);
 828  		    dpi_offset += 16;
 829  		    memcpy(temp_buffer + dpi_offset, dpi->host_sysinfo, 32);
 830  		    dpi_offset += 32;
 831  		    *((uint32_t *) ((char *) temp_buffer + dpi_offset)) =
 832  			htonl(dpi->pid);
 833  		    dpi_offset += 4;
 834  		    if (_client_tcp_port->diag_info->reverse_flag ==
 835  			0x44332211) {
 836  			reversed_temp =
 837  			    (double) tcp_svr_reverse_double((double)
 838  			    dpi->rcslib_ver);
 839  			memcpy(temp_buffer + dpi_offset, &reversed_temp, 8);
 840  		    } else {
 841  			memcpy(temp_buffer + dpi_offset, &(dpi->rcslib_ver),
 842  			    8);
 843  		    }
 844  		    dpi_offset += 8;
 845  		    *((uint32_t *) ((char *) temp_buffer + dpi_offset)) =
 846  			htonl(dpi->access_type);
 847  		    dpi_offset += 4;
 848  		    *((uint32_t *) ((char *) temp_buffer + dpi_offset)) =
 849  			htonl(dpi->msg_id);
 850  		    dpi_offset += 4;
 851  		    *((uint32_t *) ((char *) temp_buffer + dpi_offset)) =
 852  			htonl(dpi->msg_size);
 853  		    dpi_offset += 4;
 854  		    *((uint32_t *) ((char *) temp_buffer + dpi_offset)) =
 855  			htonl(dpi->msg_type);
 856  		    dpi_offset += 4;
 857  		    *((uint32_t *) ((char *) temp_buffer + dpi_offset)) =
 858  			htonl(dpi->number_of_accesses);
 859  		    dpi_offset += 4;
 860  		    *((uint32_t *) ((char *) temp_buffer + dpi_offset)) =
 861  			htonl(dpi->number_of_new_messages);
 862  		    dpi_offset += 4;
 863  		    if (_client_tcp_port->diag_info->reverse_flag ==
 864  			0x44332211) {
 865  			reversed_temp =
 866  			    (double) tcp_svr_reverse_double((double)
 867  			    dpi->bytes_moved);
 868  			memcpy(temp_buffer + dpi_offset, &reversed_temp, 8);
 869  		    } else {
 870  			memcpy(temp_buffer + dpi_offset, &(dpi->bytes_moved),
 871  			    8);
 872  		    }
 873  		    dpi_offset += 8;
 874  		    if (_client_tcp_port->diag_info->reverse_flag ==
 875  			0x44332211) {
 876  			reversed_temp =
 877  			    (double) tcp_svr_reverse_double((double)
 878  			    dpi->bytes_moved_across_socket);
 879  			memcpy(temp_buffer + dpi_offset, &reversed_temp, 8);
 880  		    } else {
 881  			memcpy(temp_buffer + dpi_offset,
 882  			    &(dpi->bytes_moved_across_socket), 8);
 883  		    }
 884  		    dpi_offset += 8;
 885  		    if (_client_tcp_port->diag_info->reverse_flag ==
 886  			0x44332211) {
 887  			reversed_temp =
 888  			    (double) tcp_svr_reverse_double((double)
 889  			    dpi->last_access_time);
 890  			memcpy(temp_buffer + dpi_offset, &reversed_temp, 8);
 891  		    } else {
 892  			memcpy(temp_buffer + dpi_offset,
 893  			    &(dpi->last_access_time), 8);
 894  		    }
 895  		    dpi_offset += 8;
 896  		    if (_client_tcp_port->diag_info->reverse_flag ==
 897  			0x44332211) {
 898  			reversed_temp =
 899  			    (double) tcp_svr_reverse_double((double)
 900  			    dpi->first_access_time);
 901  			memcpy(temp_buffer + dpi_offset, &reversed_temp, 8);
 902  		    } else {
 903  			memcpy(temp_buffer + dpi_offset,
 904  			    &(dpi->first_access_time), 8);
 905  		    }
 906  		    dpi_offset += 8;
 907  		    if (_client_tcp_port->diag_info->reverse_flag ==
 908  			0x44332211) {
 909  			reversed_temp =
 910  			    (double) tcp_svr_reverse_double((double)
 911  			    dpi->min_difference);
 912  			memcpy(temp_buffer + dpi_offset, &reversed_temp, 8);
 913  		    } else {
 914  			memcpy(temp_buffer + dpi_offset,
 915  			    &(dpi->min_difference), 8);
 916  		    }
 917  		    dpi_offset += 8;
 918  		    if (_client_tcp_port->diag_info->reverse_flag ==
 919  			0x44332211) {
 920  			reversed_temp =
 921  			    (double) tcp_svr_reverse_double((double)
 922  			    dpi->max_difference);
 923  			memcpy(temp_buffer + dpi_offset, &reversed_temp, 8);
 924  		    } else {
 925  			memcpy(temp_buffer + dpi_offset,
 926  			    &(dpi->max_difference), 8);
 927  		    }
 928  		    dpi_offset += 8;
 929  		    int is_last_writer =
 930  			(dpi == diagreply->cdi->last_writer_dpi);
 931  		    *((uint32_t *) ((char *) temp_buffer + dpi_offset)) =
 932  			htonl(is_last_writer);
 933  		    dpi_offset += 4;
 934  		    int is_last_reader =
 935  			(dpi == diagreply->cdi->last_reader_dpi);
 936  		    *((uint32_t *) ((char *) temp_buffer + dpi_offset)) =
 937  			htonl(is_last_reader);
 938  		    dpi_offset += 4;
 939  		    dpi =
 940  			(CMS_DIAG_PROC_INFO *) diagreply->cdi->dpis->
 941  			get_next();
 942  		}
 943  	    }
 944  	    *((uint32_t *) temp_buffer + 6) = htonl(dpi_count);
 945  	    *((uint32_t *) temp_buffer + 7) = htonl(dpi_offset);
 946  	    if (sendn
 947  		(_client_tcp_port->socket_fd, temp_buffer, dpi_offset, 0,
 948  		    dtimeout) < 0) {
 949  		_client_tcp_port->errors++;
 950  		return;
 951  	    }
 952  	}
 953  	break;
 954  
 955      case REMOTE_CMS_GET_BUF_NAME_REQUEST_TYPE:
 956  	{
 957  	    REMOTE_GET_BUF_NAME_REQUEST namereq;
 958  	    namereq.buffer_number = buffer_number;
 959  	    REMOTE_GET_BUF_NAME_REPLY *namereply = NULL;
 960  	    namereply =
 961  		(REMOTE_GET_BUF_NAME_REPLY *) server->
 962  		process_request(&namereq);
 963  	    memset(temp_buffer, 0, 40);
 964  	    if (NULL != namereply) {
 965  		putbe32(temp_buffer, _client_tcp_port->serial_number);
 966  		putbe32(temp_buffer + 4, namereply->status);
 967  		strncpy(temp_buffer + 8, namereply->name, 31);
 968  		if (sendn
 969  		    (_client_tcp_port->socket_fd, temp_buffer, 40, 0,
 970  			dtimeout) < 0) {
 971  		    _client_tcp_port->errors++;
 972  		    return;
 973  		}
 974  	    } else {
 975  		putbe32(temp_buffer, _client_tcp_port->serial_number);
 976  		putbe32(temp_buffer + 4, CMS_SERVER_SIDE_ERROR);
 977  		if (sendn
 978  		    (_client_tcp_port->socket_fd, temp_buffer, 40, 0,
 979  			dtimeout) < 0) {
 980  		    _client_tcp_port->errors++;
 981  		    return;
 982  		}
 983  	    }
 984  	}
 985  	break;
 986  
 987      case REMOTE_CMS_BLOCKING_READ_REQUEST_TYPE:
 988  	{
 989  	    TCPSVR_BLOCKING_READ_REQUEST *blocking_read_req;
 990  
 991  #ifdef NO_THREADS
 992  	    if (NULL == _client_tcp_port->blocking_read_req) {
 993  		_client_tcp_port->blocking_read_req =
 994  		    new TCPSVR_BLOCKING_READ_REQUEST();
 995  	    }
 996  	    blocking_read_req = _client_tcp_port->blocking_read_req;
 997  #else
 998  	    blocking_read_req;
 999  	    = new TCPSVR_BLOCKING_READ_REQUEST();
1000  #endif
1001  	    blocking_read_req->buffer_number = buffer_number;
1002  	    blocking_read_req->access_type =
1003  		ntohl(*((uint32_t *) temp_buffer + 3));
1004  	    blocking_read_req->last_id_read =
1005  		ntohl(*((uint32_t *) temp_buffer + 4));
1006  	    total_subdivisions = 1;
1007  	    if (max_total_subdivisions > 1) {
1008  		total_subdivisions =
1009  		    server->get_total_subdivisions(buffer_number);
1010  	    }
1011  	    if (total_subdivisions > 1) {
1012  		if (recvn
1013  		    (_client_tcp_port->socket_fd,
1014  			(char *) (((uint32_t *) temp_buffer) + 5), 8, 0, -1,
1015  			NULL) < 0) {
1016  		    rcs_print_error
1017  			("Can not read from client port (%d) from %s\n",
1018  			_client_tcp_port->socket_fd,
1019  			inet_ntoa(_client_tcp_port->address.sin_addr));
1020  		    _client_tcp_port->errors++;
1021  		    return;
1022  		}
1023  		blocking_read_req->subdiv =
1024  		    ntohl(*((uint32_t *) temp_buffer + 6));
1025  	    } else {
1026  		if (recvn
1027  		    (_client_tcp_port->socket_fd,
1028  			(char *) (((uint32_t *) temp_buffer) + 5), 4, 0, -1,
1029  			NULL) < 0) {
1030  		    rcs_print_error
1031  			("Can not read from client port (%d) from %s\n",
1032  			_client_tcp_port->socket_fd,
1033  			inet_ntoa(_client_tcp_port->address.sin_addr));
1034  		    _client_tcp_port->errors++;
1035  		    return;
1036  		}
1037  	    }
1038  	    blocking_read_req->timeout_millis =
1039  		ntohl(*((uint32_t *) temp_buffer + 5));
1040  	    blocking_read_req->server = server;
1041  	    blocking_read_req->remport = this;
1042  	    _client_tcp_port->blocking = 1;
1043  	    blocking_read_req->_client_tcp_port = _client_tcp_port;
1044  #ifdef POSIX_THREADS
1045  	    int thr_retval = pthread_create(&(_client_tcp_port->threadId),	/* ptr to new-thread-id */
1046  		NULL,		// pthread_attr_t *, ptr to attributes
1047  		tcpsvr_handle_blocking_request,	// start_func
1048  		blocking_read_req	// arg for start_func
1049  		);
1050  	    if (thr_retval != 0) {
1051  		_client_tcp_port->blocking = 0;
1052  		rcs_print_error("pthread_create error: thr_retval = %d\n",
1053  		    thr_retval);
1054  		rcs_print_error("pthread_create error: %d %s\n", errno,
1055  		    strerror(errno));
1056  		*((uint32_t *) temp_buffer) =
1057  		    htonl(_client_tcp_port->serial_number);
1058  		*((uint32_t *) temp_buffer + 1) =
1059  		    htonl((unsigned long) CMS_SERVER_SIDE_ERROR);
1060  		putbe32(temp_buffer + 8, 0);	/* size */
1061  		putbe32(temp_buffer + 12, 0);	/* write_id */
1062  		putbe32(temp_buffer + 16, 0);	/* was_read */
1063  		sendn(_client_tcp_port->socket_fd, temp_buffer, 20, 0,
1064  		    dtimeout);
1065  		return;
1066  	    }
1067  #else
1068  #ifdef NO_THREADS
1069  	    int fork_ret = fork();
1070  	    switch (fork_ret) {
1071  	    case 0:		// child
1072  		_client_tcp_port->threadId = getpid();
1073  		tcpsvr_handle_blocking_request(blocking_read_req);
1074  		exit(0);
1075  		break;
1076  
1077  	    case -1:		// Error
1078  		rcs_print_error("fork error: %d %s\n", errno,
1079  		    strerror(errno));
1080  		_client_tcp_port->blocking = 0;
1081  		putbe32(temp_buffer, _client_tcp_port->serial_number);
1082  		putbe32(temp_buffer + 4, CMS_SERVER_SIDE_ERROR);
1083  		putbe32(temp_buffer + 8, 0);
1084  		putbe32(temp_buffer + 12, 0);
1085  		putbe32(temp_buffer + 16, 0);
1086  		sendn(_client_tcp_port->socket_fd, temp_buffer, 20, 0,
1087  		    dtimeout);
1088  		break;
1089  
1090  	    default:		// parent;
1091  		_client_tcp_port->threadId = fork_ret;
1092  		break;
1093  	    }
1094  #else
1095  	    rcs_print_error
1096  		("Blocking read not supported on this platform.\n");
1097  	    *((uint32_t *) temp_buffer) =
1098  		htonl(_client_tcp_port->serial_number);
1099  	    *((uint32_t *) temp_buffer + 1) =
1100  		htonl((unsigned long) CMS_SERVER_SIDE_ERROR);
1101  	    putbe32(temp_buffer + 8, 0);	/* size */
1102  	    putbe32(temp_buffer + 12, 0);	/* write_id */
1103  	    putbe32(temp_buffer + 16, 0);	/* was_read */
1104  	    sendn(_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);
1105  	    return;
1106  
1107  #endif
1108  #endif
1109  	    tcpsvr_threads_created++;
1110  	}
1111  	break;
1112  
1113      case REMOTE_CMS_READ_REQUEST_TYPE:
1114  	server->read_req.buffer_number = buffer_number;
1115  	server->read_req.access_type = ntohl(*((uint32_t *) temp_buffer + 3));
1116  	server->read_req.last_id_read = ntohl(*((uint32_t *) temp_buffer + 4));
1117  	server->read_reply =
1118  	    (REMOTE_READ_REPLY *) server->process_request(&server->read_req);
1119  	if (max_total_subdivisions > 1) {
1120  	    total_subdivisions =
1121  		server->get_total_subdivisions(buffer_number);
1122  	}
1123  	if (total_subdivisions > 1) {
1124  	    if (recvn
1125  		(_client_tcp_port->socket_fd,
1126  		    (char *) (((uint32_t *) temp_buffer) + 5), 4, 0, -1,
1127  		    NULL) < 0) {
1128  		rcs_print_error
1129  		    ("Can not read from client port (%d) from %s\n",
1130  		    _client_tcp_port->socket_fd,
1131  		    inet_ntoa(_client_tcp_port->address.sin_addr));
1132  		_client_tcp_port->errors++;
1133  		return;
1134  	    }
1135  	    server->read_req.subdiv = ntohl(*((uint32_t *) temp_buffer + 5));
1136  	} else {
1137  	    server->read_req.subdiv = 0;
1138  	}
1139  	if (NULL == server->read_reply) {
1140  	    rcs_print_error("Server could not process request.\n");
1141  	    putbe32(temp_buffer, _client_tcp_port->serial_number);
1142  	    putbe32(temp_buffer + 4, CMS_SERVER_SIDE_ERROR);
1143  	    putbe32(temp_buffer + 8, 0);
1144  	    putbe32(temp_buffer + 12, 0);
1145  	    putbe32(temp_buffer + 16, 0);
1146  	    sendn(_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);
1147  	    return;
1148  	}
1149  	putbe32(temp_buffer, _client_tcp_port->serial_number);
1150  	putbe32(temp_buffer + 4, server->read_reply->status);
1151  	putbe32(temp_buffer + 8, server->read_reply->size);
1152  	putbe32(temp_buffer + 12, server->read_reply->write_id);
1153  	putbe32(temp_buffer + 16, server->read_reply->was_read);
1154  	if (server->read_reply->size < (0x2000 - 20)
1155  	    && server->read_reply->size > 0) {
1156  	    memcpy(temp_buffer + 20, server->read_reply->data,
1157  		server->read_reply->size);
1158  	    if (sendn
1159  		(_client_tcp_port->socket_fd, temp_buffer,
1160  		    20 + server->read_reply->size, 0, dtimeout) < 0) {
1161  		_client_tcp_port->errors++;
1162  		return;
1163  	    }
1164  	} else {
1165  	    if (sendn
1166  		(_client_tcp_port->socket_fd, temp_buffer, 20, 0,
1167  		    dtimeout) < 0) {
1168  		_client_tcp_port->errors++;
1169  		return;
1170  	    }
1171  	    if (server->read_reply->size > 0) {
1172  		if (sendn
1173  		    (_client_tcp_port->socket_fd, server->read_reply->data,
1174  			server->read_reply->size, 0, dtimeout) < 0) {
1175  		    _client_tcp_port->errors++;
1176  		    return;
1177  		}
1178  	    }
1179  	}
1180  	break;
1181  
1182      case REMOTE_CMS_WRITE_REQUEST_TYPE:
1183  	server->write_req.buffer_number = buffer_number;
1184  	server->write_req.access_type = ntohl(*((uint32_t *) temp_buffer + 3));
1185  	server->write_req.size = ntohl(*((uint32_t *) temp_buffer + 4));
1186  	total_subdivisions = 1;
1187  	if (max_total_subdivisions > 1) {
1188  	    total_subdivisions =
1189  		server->get_total_subdivisions(buffer_number);
1190  	}
1191  	if (total_subdivisions > 1) {
1192  	    if (recvn
1193  		(_client_tcp_port->socket_fd,
1194  		    (char *) (((uint32_t *) temp_buffer) + 5), 4, 0, -1,
1195  		    NULL) < 0) {
1196  		rcs_print_error
1197  		    ("Can not read from client port (%d) from %s\n",
1198  		    _client_tcp_port->socket_fd,
1199  		    inet_ntoa(_client_tcp_port->address.sin_addr));
1200  		_client_tcp_port->errors++;
1201  		return;
1202  	    }
1203  	    server->write_req.subdiv = ntohl(*((uint32_t *) temp_buffer + 5));
1204  	} else {
1205  	    server->write_req.subdiv = 0;
1206  	}
1207  	if (server->write_req.size > 0) {
1208  	    if (recvn
1209  		(_client_tcp_port->socket_fd, server->write_req.data,
1210  		    server->write_req.size, 0, -1, NULL) < 0) {
1211  		_client_tcp_port->errors++;
1212  		return;
1213  	    }
1214  	}
1215  	REMOTE_WRITE_REPLY *reply;
1216  	server->write_reply = reply =
1217  	    (REMOTE_WRITE_REPLY *) server->process_request(&server->
1218  	    write_req);
1219  	if (((min_compatible_version < 2.58) && (min_compatible_version > 1e-6)) || server->write_reply->confirm_write) {
1220  	    if (NULL == server->write_reply) {
1221  		rcs_print_error("Server could not process request.\n");
1222  	        putbe32(temp_buffer, reply->write_id);
1223  		putbe32(temp_buffer + 4, CMS_SERVER_SIDE_ERROR);
1224  		putbe32(temp_buffer + 8, 0);	/* was_read */
1225  		sendn(_client_tcp_port->socket_fd, temp_buffer, 12, 0,
1226  		    dtimeout);
1227  		return;
1228  	    }
1229  	    putbe32(temp_buffer, reply->write_id);
1230  	    putbe32(temp_buffer + 4, reply->status);
1231  	    putbe32(temp_buffer + 8, reply->was_read);
1232  	    if (sendn
1233  		(_client_tcp_port->socket_fd, temp_buffer, 12, 0,
1234  		    dtimeout) < 0) {
1235  		_client_tcp_port->errors++;
1236  	    }
1237  	} else {
1238  	    if (NULL == server->write_reply) {
1239  		rcs_print_error("Server could not process request.\n");
1240  	    }
1241  	}
1242  	break;
1243  
1244      case REMOTE_CMS_CHECK_IF_READ_REQUEST_TYPE:
1245  	server->check_if_read_req.buffer_number = buffer_number;
1246  	server->check_if_read_req.subdiv =
1247  	    ntohl(*((uint32_t *) temp_buffer + 3));
1248  	server->check_if_read_reply =
1249  	    (REMOTE_CHECK_IF_READ_REPLY *) server->process_request(&server->
1250  	    check_if_read_req);
1251  	if (NULL == server->check_if_read_reply) {
1252  	    rcs_print_error("Server could not process request.\n");
1253  	    putbe32(temp_buffer, _client_tcp_port->serial_number);
1254  	    putbe32(temp_buffer + 4, CMS_SERVER_SIDE_ERROR);
1255  	    putbe32(temp_buffer + 8, 0);	/* was_read */
1256  	    sendn(_client_tcp_port->socket_fd, temp_buffer, 12, 0, dtimeout);
1257  	    return;
1258  	}
1259  	putbe32(temp_buffer, _client_tcp_port->serial_number);
1260  	*((uint32_t *) temp_buffer + 1) =
1261  	    htonl(server->check_if_read_reply->status);
1262  	*((uint32_t *) temp_buffer + 2) =
1263  	    htonl(server->check_if_read_reply->was_read);
1264  	if (sendn(_client_tcp_port->socket_fd, temp_buffer, 12, 0, dtimeout) <
1265  	    0) {
1266  	    _client_tcp_port->errors++;
1267  	}
1268  	break;
1269  
1270      case REMOTE_CMS_GET_MSG_COUNT_REQUEST_TYPE:
1271  	server->get_msg_count_req.buffer_number = buffer_number;
1272  	server->get_msg_count_req.subdiv =
1273  	    ntohl(*((uint32_t *) temp_buffer + 3));
1274  	server->get_msg_count_reply =
1275  	    (REMOTE_GET_MSG_COUNT_REPLY *) server->process_request(&server->
1276  	    get_msg_count_req);
1277  	if (NULL == server->get_msg_count_reply) {
1278  	    rcs_print_error("Server could not process request.\n");
1279  	    putbe32(temp_buffer, _client_tcp_port->serial_number);
1280  	    putbe32(temp_buffer + 4, CMS_SERVER_SIDE_ERROR);
1281  	    putbe32(temp_buffer + 8, 0);	/* was_read */
1282  	    sendn(_client_tcp_port->socket_fd, temp_buffer, 12, 0, dtimeout);
1283  	    return;
1284  	}
1285  	putbe32(temp_buffer, _client_tcp_port->serial_number);
1286  	*((uint32_t *) temp_buffer + 1) =
1287  	    htonl(server->get_msg_count_reply->status);
1288  	*((uint32_t *) temp_buffer + 2) =
1289  	    htonl(server->get_msg_count_reply->count);
1290  	if (sendn(_client_tcp_port->socket_fd, temp_buffer, 12, 0, dtimeout) <
1291  	    0) {
1292  	    _client_tcp_port->errors++;
1293  	}
1294  	break;
1295  
1296      case REMOTE_CMS_GET_QUEUE_LENGTH_REQUEST_TYPE:
1297  	server->get_queue_length_req.buffer_number = buffer_number;
1298  	server->get_queue_length_req.subdiv =
1299  	    ntohl(*((uint32_t *) temp_buffer + 3));
1300  	server->get_queue_length_reply =
1301  	    (REMOTE_GET_QUEUE_LENGTH_REPLY *) server->
1302  	    process_request(&server->get_queue_length_req);
1303  	if (NULL == server->get_queue_length_reply) {
1304  	    rcs_print_error("Server could not process request.\n");
1305  	    putbe32(temp_buffer, _client_tcp_port->serial_number);
1306  	    putbe32(temp_buffer + 4, CMS_SERVER_SIDE_ERROR);
1307  	    putbe32(temp_buffer + 8, 0);	/* was_read */
1308  	    sendn(_client_tcp_port->socket_fd, temp_buffer, 12, 0, dtimeout);
1309  	    return;
1310  	}
1311  	putbe32(temp_buffer, _client_tcp_port->serial_number);
1312  	*((uint32_t *) temp_buffer + 1) =
1313  	    htonl(server->get_queue_length_reply->status);
1314  	*((uint32_t *) temp_buffer + 2) =
1315  	    htonl(server->get_queue_length_reply->queue_length);
1316  	if (sendn(_client_tcp_port->socket_fd, temp_buffer, 12, 0, dtimeout) <
1317  	    0) {
1318  	    _client_tcp_port->errors++;
1319  	}
1320  	break;
1321  
1322      case REMOTE_CMS_GET_SPACE_AVAILABLE_REQUEST_TYPE:
1323  	server->get_space_available_req.buffer_number = buffer_number;
1324  	server->get_space_available_req.subdiv =
1325  	    ntohl(*((uint32_t *) temp_buffer + 3));
1326  	server->get_space_available_reply =
1327  	    (REMOTE_GET_SPACE_AVAILABLE_REPLY *) server->
1328  	    process_request(&server->get_space_available_req);
1329  	if (NULL == server->get_space_available_reply) {
1330  	    rcs_print_error("Server could not process request.\n");
1331  	    putbe32(temp_buffer, _client_tcp_port->serial_number);
1332  	    putbe32(temp_buffer + 4, CMS_SERVER_SIDE_ERROR);
1333  	    putbe32(temp_buffer + 8, 0);	/* was_read */
1334  	    sendn(_client_tcp_port->socket_fd, temp_buffer, 12, 0, dtimeout);
1335  	    return;
1336  	}
1337  	putbe32(temp_buffer, _client_tcp_port->serial_number);
1338  	*((uint32_t *) temp_buffer + 1) =
1339  	    htonl(server->get_space_available_reply->status);
1340  	*((uint32_t *) temp_buffer + 2) =
1341  	    htonl(server->get_space_available_reply->space_available);
1342  	if (sendn(_client_tcp_port->socket_fd, temp_buffer, 12, 0, dtimeout) <
1343  	    0) {
1344  	    _client_tcp_port->errors++;
1345  	}
1346  	break;
1347  
1348      case REMOTE_CMS_CLEAR_REQUEST_TYPE:
1349  	server->clear_req.buffer_number = buffer_number;
1350  	server->clear_req.subdiv = ntohl(*((uint32_t *) temp_buffer + 3));
1351  	server->clear_reply =
1352  	    (REMOTE_CLEAR_REPLY *) server->process_request(&server->
1353  	    clear_req);
1354  	if (NULL == server->clear_reply) {
1355  	    rcs_print_error("Server could not process request.\n");
1356  	    putbe32(temp_buffer, _client_tcp_port->serial_number);
1357  	    putbe32(temp_buffer + 4, CMS_SERVER_SIDE_ERROR);
1358  	    sendn(_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout);
1359  	    return;
1360  	}
1361  	putbe32(temp_buffer, _client_tcp_port->serial_number);
1362  	putbe32(temp_buffer + 4, server->clear_reply->status);
1363  	if (sendn(_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout) <
1364  	    0) {
1365  	    _client_tcp_port->errors++;
1366  	}
1367  	break;
1368  
1369      case REMOTE_CMS_CLEAN_REQUEST_TYPE:
1370  	server->spawner_pid = server->server_pid;
1371  	server->kill_server();
1372  	break;
1373  
1374      case REMOTE_CMS_CLOSE_CHANNEL_REQUEST_TYPE:
1375  	client_port_to_check = (CLIENT_TCP_PORT *) client_ports->get_head();
1376  	while (NULL != client_port_to_check) {
1377  	    if (client_port_to_check->socket_fd ==
1378  		_client_tcp_port->socket_fd) {
1379  		break;
1380  	    }
1381  	    client_port_to_check =
1382  		(CLIENT_TCP_PORT *) client_ports->get_next();
1383  	}
1384  	FD_CLR(_client_tcp_port->socket_fd, &read_fd_set);
1385  	close(_client_tcp_port->socket_fd);
1386  	current_clients--;
1387  	if (NULL != _client_tcp_port->subscriptions) {
1388  	    remove_subscription_client(_client_tcp_port, buffer_number);
1389  	}
1390  	_client_tcp_port->socket_fd = -1;
1391  	delete _client_tcp_port;
1392  	client_ports->delete_current_node();
1393  	break;
1394  
1395      case REMOTE_CMS_GET_KEYS_REQUEST_TYPE:
1396  	server->get_keys_req.buffer_number = buffer_number;
1397  	if (recvn(_client_tcp_port->socket_fd,
1398  		server->get_keys_req.name, 16, 0, -1, NULL) < 0) {
1399  	    _client_tcp_port->errors++;
1400  	    return;
1401  	}
1402  	server->get_keys_reply =
1403  	    (REMOTE_GET_KEYS_REPLY *) server->process_request(&server->
1404  	    get_keys_req);
1405  	if (NULL == server->get_keys_reply) {
1406  	    rcs_print_error("Server could not process request.\n");
1407  	    memset(temp_buffer, 0, 20);
1408  	    putbe32(temp_buffer, _client_tcp_port->serial_number);
1409  	    server->gen_random_key(((char *) temp_buffer) + 4, 2);
1410  	    server->gen_random_key(((char *) temp_buffer) + 12, 2);
1411  	    sendn(_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);
1412  	    return;
1413  	} else {
1414  	    putbe32(temp_buffer, _client_tcp_port->serial_number);
1415  	    memcpy(((char *) temp_buffer) + 4, server->get_keys_reply->key1,
1416  		8);
1417  	    memcpy(((char *) temp_buffer) + 12, server->get_keys_reply->key2,
1418  		8);
1419  	    /* successful ? */
1420  	    sendn(_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);
1421  	    return;
1422  	}
1423  	break;
1424  
1425      case REMOTE_CMS_LOGIN_REQUEST_TYPE:
1426  	server->login_req.buffer_number = buffer_number;
1427  	if (recvn(_client_tcp_port->socket_fd,
1428  		server->login_req.name, 16, 0, -1, NULL) < 0) {
1429  	    _client_tcp_port->errors++;
1430  	    return;
1431  	}
1432  	if (recvn(_client_tcp_port->socket_fd,
1433  		server->login_req.passwd, 16, 0, -1, NULL) < 0) {
1434  	    _client_tcp_port->errors++;
1435  	    return;
1436  	}
1437  	server->login_reply =
1438  	    (REMOTE_LOGIN_REPLY *) server->process_request(&server->
1439  	    login_req);
1440  	if (NULL == server->login_reply) {
1441  	    rcs_print_error("Server could not process request.\n");
1442  	    putbe32(temp_buffer, _client_tcp_port->serial_number);
1443  	    putbe32(temp_buffer + 4, 0);	/* not successful */
1444  	    sendn(_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout);
1445  	    return;
1446  	} else {
1447  	    putbe32(temp_buffer, _client_tcp_port->serial_number);
1448  	    putbe32(temp_buffer + 4, server->login_reply->success);
1449  	    /* successful ? */
1450  	    sendn(_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout);
1451  	    return;
1452  	}
1453  	break;
1454  
1455      case REMOTE_CMS_SET_SUBSCRIPTION_REQUEST_TYPE:
1456  	server->set_subscription_req.buffer_number = buffer_number;
1457  	server->set_subscription_req.subscription_type =
1458  	    ntohl(*((uint32_t *) temp_buffer + 3));
1459  	server->set_subscription_req.poll_interval_millis =
1460  	    ntohl(*((uint32_t *) temp_buffer + 4));
1461  	server->set_subscription_reply =
1462  	    (REMOTE_SET_SUBSCRIPTION_REPLY *) server->
1463  	    process_request(&server->set_subscription_req);
1464  	if (NULL == server->set_subscription_reply) {
1465  	    rcs_print_error("Server could not process request.\n");
1466  	    putbe32(temp_buffer, _client_tcp_port->serial_number);
1467  	    putbe32(temp_buffer + 4, 0);	/* not successful */
1468  	    sendn(_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout);
1469  	    return;
1470  	} else {
1471  	    if (server->set_subscription_reply->success) {
1472  		if (server->set_subscription_req.subscription_type ==
1473  		    CMS_POLLED_SUBSCRIPTION
1474  		    || server->set_subscription_req.subscription_type ==
1475  		    CMS_VARIABLE_SUBSCRIPTION) {
1476  		    add_subscription_client(buffer_number,
1477  			server->set_subscription_req.
1478  			subscription_type,
1479  			server->set_subscription_req.
1480  			poll_interval_millis, _client_tcp_port);
1481  		}
1482  		if (server->set_subscription_req.subscription_type ==
1483  		    CMS_NO_SUBSCRIPTION) {
1484  		    remove_subscription_client(_client_tcp_port,
1485  			buffer_number);
1486  		}
1487  	    }
1488  	    putbe32(temp_buffer, _client_tcp_port->serial_number);
1489  	    *((uint32_t *) temp_buffer + 1) =
1490  		htonl(server->set_subscription_reply->success);
1491  	    /* successful ? */
1492  	    sendn(_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout);
1493  	    return;
1494  	}
1495  	break;
1496  
1497      default:
1498  	_client_tcp_port->errors++;
1499  	rcs_print_error("Unrecognized request type received.(%ld)\n",
1500  	    request_type);
1501  	break;
1502      }
1503  }
1504  
1505  void CMS_SERVER_REMOTE_TCP_PORT::add_subscription_client(int buffer_number,
1506      int subscription_type, int poll_interval_millis, CLIENT_TCP_PORT * clnt)
1507  {
1508      if (NULL == subscription_buffers) {
1509  	subscription_buffers = new LinkedList();
1510      }
1511      if (NULL == subscription_buffers) {
1512  	rcs_print_error("Can`t create subscription_buffers list.\n");
1513      }
1514  
1515      TCP_BUFFER_SUBSCRIPTION_INFO *buf_info =
1516  	(TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head();
1517      while (NULL != buf_info) {
1518  	if (buf_info->buffer_number == buffer_number) {
1519  	    break;
1520  	}
1521  	buf_info =
1522  	    (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_next();
1523      }
1524      if (NULL == buf_info) {
1525  	buf_info = new TCP_BUFFER_SUBSCRIPTION_INFO();
1526  	buf_info->buffer_number = buffer_number;
1527  	buf_info->sub_clnt_info = new LinkedList();
1528  	buf_info->list_id =
1529  	    subscription_buffers->store_at_tail(buf_info, sizeof(*buf_info),
1530  	    0);
1531      }
1532      buf_info->min_last_id = 0;
1533      if (NULL == clnt->subscriptions) {
1534  	clnt->subscriptions = new LinkedList();
1535      }
1536      TCP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info =
1537  	(TCP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_head();
1538      while (temp_clnt_info != NULL) {
1539  	if (temp_clnt_info->buffer_number == buffer_number) {
1540  	    break;
1541  	}
1542  	temp_clnt_info =
1543  	    (TCP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_next();
1544      }
1545      if (NULL == temp_clnt_info) {
1546  	temp_clnt_info = new TCP_CLIENT_SUBSCRIPTION_INFO();
1547  	temp_clnt_info->last_sub_sent_time = 0.0;
1548  	temp_clnt_info->buffer_number = buffer_number;
1549  	temp_clnt_info->subscription_paused = 0;
1550  	temp_clnt_info->last_id_read = 0;
1551  	temp_clnt_info->sub_buf_info = buf_info;
1552  	temp_clnt_info->clnt_port = clnt;
1553  	temp_clnt_info->last_sub_sent_time = etime();
1554  	temp_clnt_info->subscription_list_id =
1555  	    clnt->subscriptions->store_at_tail(temp_clnt_info,
1556  	    sizeof(*temp_clnt_info), 0);
1557  	buf_info->sub_clnt_info->store_at_tail(temp_clnt_info,
1558  	    sizeof(*temp_clnt_info), 0);
1559      }
1560      temp_clnt_info->subscription_type = subscription_type;
1561      temp_clnt_info->poll_interval_millis = poll_interval_millis;
1562      recalculate_polling_interval();
1563  }
1564  
1565  void CMS_SERVER_REMOTE_TCP_PORT::remove_subscription_client(CLIENT_TCP_PORT *
1566      clnt, int buffer_number)
1567  {
1568      TCP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info =
1569  	(TCP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_head();
1570      while (temp_clnt_info != NULL) {
1571  	if (temp_clnt_info->buffer_number == buffer_number) {
1572  	    if (NULL != temp_clnt_info->sub_buf_info) {
1573  		if (NULL != temp_clnt_info->sub_buf_info->sub_clnt_info) {
1574  		    temp_clnt_info->sub_buf_info->sub_clnt_info->
1575  			delete_node(temp_clnt_info->subscription_list_id);
1576  		    if (temp_clnt_info->sub_buf_info->sub_clnt_info->
1577  			list_size == 0) {
1578  			subscription_buffers->delete_node(temp_clnt_info->
1579  			    sub_buf_info->list_id);
1580  			delete temp_clnt_info->sub_buf_info->sub_clnt_info;
1581  			temp_clnt_info->sub_buf_info->sub_clnt_info = NULL;
1582  			delete temp_clnt_info->sub_buf_info;
1583  			temp_clnt_info->sub_buf_info = NULL;
1584  		    }
1585  		}
1586  	    }
1587  	    delete temp_clnt_info;
1588  	    temp_clnt_info = NULL;
1589  	    break;
1590  	}
1591  	temp_clnt_info =
1592  	    (TCP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_next();
1593      }
1594      recalculate_polling_interval();
1595  }
1596  
1597  void CMS_SERVER_REMOTE_TCP_PORT::recalculate_polling_interval()
1598  {
1599      int min_poll_interval_millis = 30000;
1600      polling_enabled = 0;
1601      TCP_BUFFER_SUBSCRIPTION_INFO *buf_info =
1602  	(TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head();
1603      while (NULL != buf_info) {
1604  	TCP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info =
1605  	    (TCP_CLIENT_SUBSCRIPTION_INFO *) buf_info->sub_clnt_info->
1606  	    get_head();
1607  	while (temp_clnt_info != NULL) {
1608  	    if (temp_clnt_info->poll_interval_millis <
1609  		min_poll_interval_millis
1610  		&& temp_clnt_info->subscription_type ==
1611  		CMS_POLLED_SUBSCRIPTION) {
1612  		min_poll_interval_millis =
1613  		    temp_clnt_info->poll_interval_millis;
1614  		polling_enabled = 1;
1615  	    }
1616  	    temp_clnt_info = (TCP_CLIENT_SUBSCRIPTION_INFO *)
1617  		buf_info->sub_clnt_info->get_next();
1618  	}
1619  	buf_info =
1620  	    (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_next();
1621      }
1622      if (min_poll_interval_millis >= ((int) (clk_tck() * 1000.0))) {
1623  	current_poll_interval_millis = min_poll_interval_millis;
1624      } else {
1625  	current_poll_interval_millis = ((int) (clk_tck() * 1000.0));
1626      }
1627      select_timeout.tv_sec = current_poll_interval_millis / 1000;
1628      select_timeout.tv_usec = (current_poll_interval_millis % 1000) * 1000;
1629      dtimeout = (current_poll_interval_millis + 10) * 1000.0;
1630      if (dtimeout < 0.5) {
1631  	dtimeout = 0.5;
1632      }
1633  }
1634  
1635  void CMS_SERVER_REMOTE_TCP_PORT::update_subscriptions()
1636  {
1637      pid_t pid = getpid();
1638      pid_t tid = 0;
1639      CMS_SERVER *server;
1640      server = find_server(pid, tid);
1641      if (NULL == server) {
1642  	rcs_print_error
1643  	    ("CMS_SERVER_REMOTE_TCP_PORT::update_subscriptions Cannot find server object for pid = %d.\n",
1644  	    pid);
1645  	return;
1646      }
1647      if (NULL == subscription_buffers) {
1648  	return;
1649      }
1650      double cur_time = etime();
1651      TCP_BUFFER_SUBSCRIPTION_INFO *buf_info =
1652  	(TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head();
1653      while (NULL != buf_info) {
1654  	server->read_req.buffer_number = buf_info->buffer_number;
1655  	server->read_req.access_type = CMS_READ_ACCESS;
1656  	server->read_req.last_id_read = buf_info->min_last_id;
1657  	server->read_reply =
1658  	    (REMOTE_READ_REPLY *) server->process_request(&server->read_req);
1659  	if (NULL == server->read_reply) {
1660  	    rcs_print_error("Server could not process request.\n");
1661  	    buf_info = (TCP_BUFFER_SUBSCRIPTION_INFO *)
1662  		subscription_buffers->get_next();
1663  	    continue;
1664  	}
1665  	if (server->read_reply->write_id == buf_info->min_last_id ||
1666  	    server->read_reply->size < 1) {
1667  	    buf_info = (TCP_BUFFER_SUBSCRIPTION_INFO *)
1668  		subscription_buffers->get_next();
1669  	    continue;
1670  	}
1671  	putbe32(temp_buffer, 0);
1672  	putbe32(temp_buffer + 4, server->read_reply->status);
1673  	putbe32(temp_buffer + 8, server->read_reply->size);
1674  	putbe32(temp_buffer + 12, server->read_reply->write_id);
1675  	putbe32(temp_buffer + 16, server->read_reply->was_read);
1676  	TCP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info =
1677  	    (TCP_CLIENT_SUBSCRIPTION_INFO *) buf_info->sub_clnt_info->
1678  	    get_head();
1679  	buf_info->min_last_id = server->read_reply->write_id;
1680  	while (temp_clnt_info != NULL) {
1681  	    double time_diff = cur_time - temp_clnt_info->last_sub_sent_time;
1682  	    int time_diff_millis = (int) ((double) time_diff * 1000.0);
1683  	    rcs_print_debug(PRINT_SERVER_SUBSCRIPTION_ACTIVITY,
1684  		"Subscription time_diff_millis=%d\n", time_diff_millis);
1685  	    if (((temp_clnt_info->subscription_type == CMS_POLLED_SUBSCRIPTION
1686  			&& time_diff_millis + 10 >=
1687  			temp_clnt_info->poll_interval_millis)
1688  		    || temp_clnt_info->subscription_type ==
1689  		    CMS_VARIABLE_SUBSCRIPTION)
1690  		&& temp_clnt_info->last_id_read !=
1691  		server->read_reply->write_id) {
1692  		temp_clnt_info->last_id_read = server->read_reply->write_id;
1693  		temp_clnt_info->last_sub_sent_time = cur_time;
1694  		temp_clnt_info->clnt_port->serial_number++;
1695  		putbe32(temp_buffer, temp_clnt_info->clnt_port->serial_number);
1696  		if (server->read_reply->size < 0x2000 - 20
1697  		    && server->read_reply->size > 0) {
1698  		    memcpy(temp_buffer + 20, server->read_reply->data,
1699  			server->read_reply->size);
1700  		    if (sendn
1701  			(temp_clnt_info->clnt_port->socket_fd, temp_buffer,
1702  			    20 + server->read_reply->size, 0, dtimeout) < 0) {
1703  			temp_clnt_info->clnt_port->errors++;
1704  			return;
1705  		    }
1706  		} else {
1707  		    if (sendn(temp_clnt_info->clnt_port->socket_fd,
1708  			    temp_buffer, 20, 0, dtimeout) < 0) {
1709  			temp_clnt_info->clnt_port->errors++;
1710  			return;
1711  		    }
1712  		    if (server->read_reply->size > 0) {
1713  			if (sendn(temp_clnt_info->clnt_port->socket_fd,
1714  				server->read_reply->data,
1715  				server->read_reply->size, 0, dtimeout) < 0) {
1716  			    temp_clnt_info->clnt_port->errors++;
1717  			    return;
1718  			}
1719  		    }
1720  		}
1721  	    }
1722  	    if (temp_clnt_info->last_id_read < buf_info->min_last_id) {
1723  		buf_info->min_last_id = temp_clnt_info->last_id_read;
1724  	    }
1725  	    temp_clnt_info = (TCP_CLIENT_SUBSCRIPTION_INFO *)
1726  		buf_info->sub_clnt_info->get_next();
1727  	}
1728  	buf_info =
1729  	    (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_next();
1730      }
1731  }
1732  
1733  TCP_BUFFER_SUBSCRIPTION_INFO::TCP_BUFFER_SUBSCRIPTION_INFO()
1734  {
1735      buffer_number = -1;
1736      min_last_id = 0;
1737      list_id = -1;
1738      sub_clnt_info = NULL;
1739  }
1740  
1741  TCP_BUFFER_SUBSCRIPTION_INFO::~TCP_BUFFER_SUBSCRIPTION_INFO()
1742  {
1743      buffer_number = -1;
1744      min_last_id = 0;
1745      list_id = -1;
1746      if (NULL != sub_clnt_info) {
1747  	delete sub_clnt_info;
1748  	sub_clnt_info = NULL;
1749      }
1750  }
1751  
1752  TCP_CLIENT_SUBSCRIPTION_INFO::TCP_CLIENT_SUBSCRIPTION_INFO()
1753  {
1754      subscription_type = CMS_NO_SUBSCRIPTION;
1755      poll_interval_millis = 30000;
1756      last_sub_sent_time = 0.0;
1757      subscription_list_id = -1;
1758      buffer_number = -1;
1759      subscription_paused = 0;
1760      last_id_read = 0;
1761      sub_buf_info = NULL;
1762      clnt_port = NULL;
1763  }
1764  
1765  TCP_CLIENT_SUBSCRIPTION_INFO::~TCP_CLIENT_SUBSCRIPTION_INFO()
1766  {
1767      subscription_type = CMS_NO_SUBSCRIPTION;
1768      poll_interval_millis = 30000;
1769      last_sub_sent_time = 0.0;
1770      subscription_list_id = -1;
1771      buffer_number = -1;
1772      subscription_paused = 0;
1773      last_id_read = 0;
1774      sub_buf_info = NULL;
1775      clnt_port = NULL;
1776  }
1777  
1778  CLIENT_TCP_PORT::CLIENT_TCP_PORT()
1779  {
1780      serial_number = 0;
1781      errors = 0;
1782      max_errors = 50;
1783      address.sin_port = 0;
1784      address.sin_family = AF_INET;
1785      address.sin_addr.s_addr = htonl(INADDR_ANY);
1786      socket_fd = -1;
1787      subscriptions = NULL;
1788      tid = -1;
1789      pid = -1;
1790      blocking_read_req = NULL;
1791      threadId = 0;
1792      diag_info = NULL;
1793  }
1794  
1795  CLIENT_TCP_PORT::~CLIENT_TCP_PORT()
1796  {
1797      if (socket_fd > 0) {
1798  	close(socket_fd);
1799  	socket_fd = -1;
1800      }
1801      if (NULL != subscriptions) {
1802  	TCP_CLIENT_SUBSCRIPTION_INFO *sub_info =
1803  	    (TCP_CLIENT_SUBSCRIPTION_INFO *) subscriptions->get_head();
1804  	while (NULL != sub_info) {
1805  	    delete sub_info;
1806  	    sub_info =
1807  		(TCP_CLIENT_SUBSCRIPTION_INFO *) subscriptions->get_next();
1808  	}
1809  	delete subscriptions;
1810  	subscriptions = NULL;
1811      }
1812  #ifdef NO_THREADS
1813      if (NULL != blocking_read_req) {
1814  	delete blocking_read_req;
1815  	blocking_read_req = NULL;
1816      }
1817  #endif
1818      if (NULL != diag_info) {
1819  	delete diag_info;
1820  	diag_info = NULL;
1821      }
1822  }