/ src / libnml / buffer / shmem.cc
shmem.cc
  1  /********************************************************************
  2  * Description: shmem.cc
  3  *   C++ file for the Communication Management System (CMS).
  4  *   Includes member Functions for class SHMEM.
  5  *   Notes: The class SHMEM should be used by procedures accessing a
  6  *   shared memory buffer on the same processor.
  7  *
  8  *   Derived from a work by Fred Proctor & Will Shackleford
  9  *
 10  * Author:
 11  * License: GPL Version 2
 12  * System: Linux
 13  *    
 14  * Copyright (c) 2004 All rights reserved.
 15  *
 16  * Last change: 
 17  ********************************************************************/
 18  
 19  #ifdef __cplusplus
 20  extern "C" {
 21  #endif
 22  
 23  #include <stdio.h>		/* sscanf() */
 24  #include <stddef.h>		/* size_t */
 25  #include <sys/stat.h>		/* S_IRUSR, etc. */
 26  #include <sys/types.h>		/* key_t */
 27  #include <errno.h>		// errno
 28  #include <string.h>		/* strchr(), memcpy(), memset() */
 29  #include <stdlib.h>		/* strtod */
 30  #include <physmem.hh>           /* PHYSMEM_HANDLE */
 31  
 32  #ifdef __cplusplus
 33  }
 34  #endif
 35  #include "rcs_print.hh"		/* rcs_print_error() */
 36  #include "cms.hh"		/* class CMS */
 37  #include "shmem.hh"		/* class SHMEM */
 38  #include "shm.hh"		/* class RCS_SHAREDMEM */
 39  //#include "sem.hh"             /* class RCS_SEMAPHORE */
 40  #include "memsem.hh"		/* mem_get_access(), mem_release_access() */
 41  #include "timer.hh"		/* etime(), esleep() */
 42  /* Common Definitions. */
 43  //#include "autokey.h"
 44  /* rw-rw-r-- permissions */
 45  #define MODE (0700)
 46  static double last_non_zero_x;
 47  static double last_x;
 48  
 49  static int not_zero(volatile double x)
 50  {
 51      last_x = x;
 52      if (x < -1E-6 && last_x < -1E-6) {
 53  	last_non_zero_x = x;
 54  	return 1;
 55      }
 56      if (x > 1E-6 && last_x > 1E-6) {
 57  	last_non_zero_x = x;
 58  	return 1;
 59      }
 60      return 0;
 61  }
 62  
 63  /* SHMEM Member Functions. */
 64  
 65  /* Constructor for hard coded tests. */
 66  SHMEM::SHMEM(const char *n, long s, int nt, key_t k, int m):CMS(s)
 67  {
 68      /* Set pointers to null so only properly opened pointers are closed. */
 69      shm = NULL;
 70  //  sem = NULL;
 71  
 72      /* save constructor args */
 73      master = m;
 74      key = k;
 75  
 76      /* open the shared mem buffer and create mutual exclusion semaphore */
 77      open();
 78  }
 79  
 80  /* Constructor for use with cms_config. */
 81  SHMEM::SHMEM(const char *bufline, const char *procline, int set_to_server,
 82      int set_to_master):CMS(bufline, procline, set_to_server)
 83  {
 84      /* Set pointers to null so only properly opened pointers are closed. */
 85      shm = NULL;
 86      sem = NULL;
 87      sem_delay = 0.00001;
 88      char *semdelay_equation;
 89      use_os_sem = 1;
 90      use_os_sem_only = 1;
 91      mutex_type = OS_SEM_MUTEX;
 92      bsem_key = -1;
 93      second_read = 0;
 94  
 95      if (status < 0) {
 96  	rcs_print_error("SHMEM: status = %d\n", status);
 97  	return;
 98      }
 99  
100      /* Save parameters from configuration file. */
101      if (sscanf(bufline, "%*s %*s %*s %*s %*s %*s %*s %*s %*s %d", &key) != 1) {
102  	rcs_print_error("SHMEM: Invalid configuration file format.\n");
103  	return;
104      }
105  
106      master = is_local_master;
107      if (1 == set_to_master) {
108  	master = 1;
109      } else if (-1 == set_to_master) {
110  	master = 0;
111      }
112      if (NULL != (semdelay_equation = strstr(proclineupper, "SEMDELAY="))) {
113  	sem_delay = strtod(semdelay_equation + 9, (char **) NULL);
114      } else if (NULL != (semdelay_equation =
115  	    strstr(buflineupper, "SEMDELAY="))) {
116  	sem_delay = strtod(semdelay_equation + 9, (char **) NULL);
117      }
118  
119      if (NULL != (semdelay_equation = strstr(buflineupper, "BSEM="))) {
120  	bsem_key = strtol(semdelay_equation + 5, (char **) NULL, 0);
121      }
122  
123      if (NULL != strstr(buflineupper, "MUTEX=NONE")) {
124  	mutex_type = NO_MUTEX;
125  	use_os_sem = 0;
126  	use_os_sem_only = 0;
127      }
128  
129      if (NULL != strstr(buflineupper, "MUTEX=OS_SEM")) {
130  	mutex_type = OS_SEM_MUTEX;
131  	use_os_sem = 1;
132  	use_os_sem_only = 1;
133      }
134  
135      if (NULL != strstr(buflineupper, "MUTEX=NO_INTERRUPTS")) {
136  	mutex_type = NO_INTERRUPTS_MUTEX;
137  	use_os_sem = 0;
138  	use_os_sem_only = 0;
139      }
140  
141      if (NULL != strstr(buflineupper, "MUTEX=NO_SWITCHING")) {
142  	mutex_type = NO_SWITCHING_MUTEX;
143  	use_os_sem = 0;
144  	use_os_sem_only = 0;
145      }
146  
147      if (NULL != strstr(buflineupper, "MUTEX=MAO")) {
148  	mutex_type = MAO_MUTEX;
149  	use_os_sem = 0;
150  	use_os_sem_only = 0;
151      }
152  
153      if (NULL != strstr(buflineupper, "MAO_W_OS_SEM")) {
154  	mutex_type = MAO_MUTEX_W_OS_SEM;
155  	use_os_sem = 1;
156  	use_os_sem_only = 0;
157      }
158  
159      /* Open the shared memory buffer and create mutual exclusion semaphore. */
160      open();
161  }
162  
163  SHMEM::~SHMEM()
164  {
165      /* detach from shared memory and semaphores */
166      close();
167  }
168  
169  /*
170    Open the SHMEM buffer
171    */
172  int SHMEM::open()
173  {
174      /* Set pointers to NULL incase error occurs. */
175      sem = NULL;
176      shm = NULL;
177      bsem = NULL;
178      shm_addr_offset = NULL;
179      second_read = 0;
180      autokey_table_size = 0;
181  /*! \todo Another #if 0 */
182  #if 0				// PC Do we need to use autokey ?
183      if (use_autokey_for_connection_number) {
184  	autokey_table_size = sizeof(AUTOKEY_TABLE_ENTRY) * total_connections;
185      }
186  #endif
187      /* set up the shared memory address and semaphore, in given state */
188      if (master) {
189  	shm = new RCS_SHAREDMEM(key, size, RCS_SHAREDMEM_CREATE, (int) MODE);
190  	if (shm->addr == NULL) {
191  	    switch (shm->create_errno) {
192  	    case EACCES:
193  		status = CMS_PERMISSIONS_ERROR;
194  		break;
195  
196  	    case EEXIST:
197  		status = CMS_RESOURCE_CONFLICT_ERROR;
198  		break;
199  
200  	    case ENOMEM:
201  	    case ENOSPC:
202  		status = CMS_CREATE_ERROR;
203  		break;
204  
205  	    default:
206  		status = CMS_MISC_ERROR;
207  	    }
208  	    delete shm;
209  	    shm = NULL;
210  	    return -1;
211  	}
212  	if (use_os_sem) {
213  	    sem =
214  		new RCS_SEMAPHORE(key, RCS_SEMAPHORE_CREATE, timeout,
215  		(int) MODE, (use_os_sem_only != 0));
216  	    if (NULL == sem) {
217  		rcs_print_error("CMS: couldn't create RCS_SEMAPHORE.\n");
218  		rcs_print_error(" Possibly out of memory?\n");
219  		status = CMS_CREATE_ERROR;
220  		return -1;
221  	    }
222  	    if (!sem->valid()) {
223  		rcs_print_error("CMS: RCS_SEMAPHORE is invalid.\n");
224  		status = CMS_MISC_ERROR;
225  		return -1;
226  	    }
227  	}
228  	if (bsem_key > 0) {
229  	    bsem = new RCS_SEMAPHORE(bsem_key, RCS_SEMAPHORE_CREATE,
230  		timeout, (int) MODE, 0);
231  	    if (NULL == bsem) {
232  		rcs_print_error("CMS: couldn't create RCS_SEMAPHORE.\n");
233  		rcs_print_error(" Possibly out of memory?\n");
234  		status = CMS_CREATE_ERROR;
235  		return -1;
236  	    }
237  	    if (!bsem->valid()) {
238  		rcs_print_error("CMS: RCS_SEMAPHORE is invalid.\n");
239  		status = CMS_MISC_ERROR;
240  		return -1;
241  	    }
242  	}
243  	in_buffer_id = 0;
244      } else {
245  	shm = new RCS_SHAREDMEM(key, size, RCS_SHAREDMEM_NOCREATE);
246  	if (NULL == shm) {
247  	    rcs_print_error
248  		("CMS: couldn't create RCS_SHAREDMEM(%d(0x%X), %ld(0x%lX), RCS_SHAREDMEM_NOCREATE).\n",
249  		key, key, size, size);
250  	    status = CMS_CREATE_ERROR;
251  	    return -1;
252  	}
253  	if (shm->addr == NULL) {
254  	    switch (shm->create_errno) {
255  	    case EACCES:
256  		status = CMS_PERMISSIONS_ERROR;
257  		break;
258  
259  	    case EEXIST:
260  		status = CMS_RESOURCE_CONFLICT_ERROR;
261  		break;
262  
263  	    case ENOENT:
264  		status = CMS_NO_MASTER_ERROR;
265  		break;
266  
267  	    case ENOMEM:
268  	    case ENOSPC:
269  		status = CMS_CREATE_ERROR;
270  		break;
271  
272  	    default:
273  		status = CMS_MISC_ERROR;
274  	    }
275  	    delete shm;
276  	    shm = NULL;
277  	    return -1;
278  	}
279  	if (use_os_sem) {
280  	    sem = new RCS_SEMAPHORE(key, RCS_SEMAPHORE_NOCREATE, timeout);
281  	    if (NULL == sem) {
282  		rcs_print_error("CMS: couldn't create RCS_SEMAPHORE.\n");
283  		rcs_print_error(" Possibly out of memory?\n");
284  		status = CMS_CREATE_ERROR;
285  		return -1;
286  	    }
287  	    if (!sem->valid()) {
288  		rcs_print_error("CMS: RCS_SEMAPHORE is invalid.\n");
289  		status = CMS_MISC_ERROR;
290  		return -1;
291  	    }
292  	}
293  	if (bsem_key > 0) {
294  	    bsem =
295  		new RCS_SEMAPHORE(bsem_key, RCS_SEMAPHORE_NOCREATE, timeout);
296  	    if (NULL == bsem) {
297  		rcs_print_error("CMS: couldn't create RCS_SEMAPHORE.\n");
298  		rcs_print_error(" Possibly out of memory?\n");
299  		status = CMS_CREATE_ERROR;
300  		return -1;
301  	    }
302  	    if (!bsem->valid()) {
303  		rcs_print_error("CMS: RCS_SEMAPHORE is invalid.\n");
304  		status = CMS_MISC_ERROR;
305  		return -1;
306  	    }
307  	}
308      }
309  
310      if (min_compatible_version < 3.44 && min_compatible_version > 0) {
311  	total_subdivisions = 1;
312      }
313  
314      if (min_compatible_version > 2.57 || min_compatible_version <= 0) {
315  	if (!shm->created) {
316  	    char *cptr = (char *) shm->addr;
317  	    cptr[31] = 0;
318  	    if (strncmp(cptr, BufferName, 31)) {
319  		rcs_print_error
320  		    ("Shared memory buffers %s and %s may conflict. (key=%d(0x%X))\n",
321  		    BufferName, cptr, key, key);
322  		strncpy(cptr, BufferName, 32);
323  	    }
324  	}
325  	if (master) {
326  /*! \todo Another #if 0 */
327  #if 0				// PC Do we need to use autokey ?
328  	    if (use_autokey_for_connection_number) {
329  		void *autokey_table_end =
330  		    (void *) (((char *) shm->addr) + 32 + autokey_table_size);
331  		memset(autokey_table_end, 0, size - 32 - autokey_table_size);
332  	    }
333  #endif
334  	    strncpy((char *) shm->addr, BufferName, 32);
335  	}
336  /*! \todo Another #if 0 */
337  #if 0				// PC Do we need to use autokey ?
338  	if (use_autokey_for_connection_number) {
339  	    void *autokey_table = (void *) (((char *) shm->addr) + 32);
340  	    connection_number =
341  		autokey_getkey(autokey_table, total_connections, ProcessName);
342  	    shm_addr_offset =
343  		(void *) ((char *) (shm->addr) + 32 + autokey_table_size);
344  	    max_message_size -= (32 + autokey_table_size);	/* size of
345  								   cms buffer 
346  								   available
347  								   for user */
348  	} else {
349  #endif
350  	    shm_addr_offset = (void *) ((char *) (shm->addr) + 32);
351  	    max_message_size -= 32;	/* size of cms buffer available for
352  					   user */
353  /*! \todo Another #if 0 */
354  #if 0				// PC Do we need to use autokey ?
355  	}
356  #endif
357  	/* messages = size - CMS Header space */
358  	if (enc_max_size <= 0 || enc_max_size > size) {
359  	    if (neutral) {
360  		max_encoded_message_size -= 32;
361  	    } else {
362  		max_encoded_message_size -=
363  		    (cms_encoded_data_explosion_factor * 32);
364  	    }
365  	}
366  	/* Maximum size of message after being encoded. */
367  	guaranteed_message_space -= 32;	/* Largest size message before being
368  					   encoded that can be guaranteed to
369  					   fit after xdr. */
370  	size -= 32;
371  	size_without_diagnostics -= 32;
372  	subdiv_size =
373  	    (size_without_diagnostics -
374  	    total_connections) / total_subdivisions;
375  	subdiv_size -= (subdiv_size % 4);
376      } else {
377  	if (master) {
378  	    memset(shm->addr, 0, size);
379  	}
380  	shm_addr_offset = shm->addr;
381      }
382      skip_area = 32 + total_connections + autokey_table_size;
383      mao.data = shm_addr_offset;
384      mao.timeout = timeout;
385      mao.total_connections = total_connections;
386      mao.sem_delay = sem_delay;
387      mao.connection_number = connection_number;
388      mao.split_buffer = split_buffer;
389      mao.read_only = 0;
390      mao.sem = sem;
391  
392      fast_mode = !queuing_enabled && !split_buffer && !neutral &&
393  	(mutex_type == NO_SWITCHING_MUTEX);
394      handle_to_global_data = dummy_handle = new PHYSMEM_HANDLE;
395      handle_to_global_data->set_to_ptr(shm_addr_offset, size);
396      if ((connection_number < 0 || connection_number >= total_connections)
397  	&& (mutex_type == MAO_MUTEX || mutex_type == MAO_MUTEX_W_OS_SEM)) {
398  	rcs_print_error("Bad connection number %ld\n", connection_number);
399  	status = CMS_MISC_ERROR;
400  	return -1;
401      }
402      return 0;
403  }
404  
405  /* Closes the  shared memory and mutual exclusion semaphore  descriptors. */
406  int SHMEM::close()
407  {
408      int nattch = 0;
409      second_read = 0;
410  
411  /*! \todo Another #if 0 */
412  #if 0				// PC Do we need to use autokey ?
413      if (use_autokey_for_connection_number) {
414  	void *autokey_table = (void *) (((char *) shm->addr) + 32);
415  	autokey_releasekey(autokey_table, total_connections, ProcessName,
416  	    connection_number);
417      }
418  #endif
419      if (NULL != shm) {
420  	/* see if we're the last one */
421  	nattch = shm->nattch();
422  	shm->delete_totally = delete_totally;
423  	delete shm;
424  	shm = NULL;
425      }
426      if (NULL != sem) {
427  	/* if we're the last one, then make us the master so that the
428  	   semaphore will go away */
429  	if ((nattch <= 1 && nattch > -1) || delete_totally) {
430  	    sem->setflag(RCS_SEMAPHORE_CREATE);
431  	} else {
432  	    sem->setflag(RCS_SEMAPHORE_NOCREATE);
433  	}
434  	delete sem;
435      }
436      if (NULL != bsem) {
437  	/* if we're the last one, then make us the master so that the
438  	   semaphore will go away */
439  	if ((nattch <= 1 && nattch > -1) || delete_totally) {
440  	    bsem->setflag(RCS_SEMAPHORE_CREATE);
441  	} else {
442  	    bsem->setflag(RCS_SEMAPHORE_NOCREATE);
443  	}
444  	delete bsem;
445      }
446  #ifdef DEBUG
447      printf("SHMEM(%s): nattch = %d\n", BufferName, nattch);
448  #endif
449  
450      return 0;
451  }
452  
453  /* Access the shared memory buffer. */
454  CMS_STATUS SHMEM::main_access(void *_local, int *serial_number)
455  {
456  
457      /* Check pointers. */
458      if (shm == NULL) {
459  	second_read = 0;
460  	return (status = CMS_MISC_ERROR);
461      }
462  
463      if (bsem == NULL && not_zero(blocking_timeout)) {
464  	rcs_print_error
465  	    ("No blocking semaphore available. Can not call blocking_read(%f).\n",
466  	    blocking_timeout);
467  	second_read = 0;
468  	return (status = CMS_NO_BLOCKING_SEM_ERROR);
469      }
470  
471      mao.read_only = ((internal_access_type == CMS_CHECK_IF_READ_ACCESS) ||
472  	(internal_access_type == CMS_PEEK_ACCESS) ||
473  	(internal_access_type == CMS_READ_ACCESS));
474  
475      switch (mutex_type) {
476      case NO_MUTEX:
477  	break;
478  
479      case MAO_MUTEX:
480      case MAO_MUTEX_W_OS_SEM:
481  	switch (mem_get_access(&mao)) {
482  	case -1:
483  	    rcs_print_error("SHMEM: Can't take semaphore\n");
484  	    second_read = 0;
485  	    return (status = CMS_MISC_ERROR);
486  	case -2:
487  	    if (timeout > 0) {
488  		rcs_print_error("SHMEM: Timed out waiting for semaphore.\n");
489  		rcs_print_error("buffer = %s, timeout = %lf sec.\n",
490  		    BufferName, timeout);
491  	    }
492  	    second_read = 0;
493  	    return (status = CMS_TIMED_OUT);
494  	default:
495  	    break;
496  	}
497  	toggle_bit = mao.toggle_bit;
498  	break;
499  
500      case OS_SEM_MUTEX:
501  	if (sem == NULL) {
502  	    second_read = 0;
503  	    return (status = CMS_MISC_ERROR);
504  	}
505  	switch (sem->wait()) {
506  	case -1:
507  	    rcs_print_error("SHMEM: Can't take semaphore\n");
508  	    second_read = 0;
509  	    return (status = CMS_MISC_ERROR);
510  	case -2:
511  	    if (timeout > 0) {
512  		rcs_print_error("SHMEM: Timed out waiting for semaphore.\n");
513  		rcs_print_error("buffer = %s, timeout = %lf sec.\n",
514  		    BufferName, timeout);
515  	    }
516  	    second_read = 0;
517  	    return (status = CMS_TIMED_OUT);
518  	default:
519  	    break;
520  	}
521  	break;
522  
523      case NO_INTERRUPTS_MUTEX:
524  	rcs_print_error("Interrupts can not be disabled.\n");
525  	second_read = 0;
526  	return (status = CMS_MISC_ERROR);
527  	break;
528  
529      case NO_SWITCHING_MUTEX:
530  	rcs_print_error("Interrupts can not be disabled.\n");
531  	return (status = CMS_MISC_ERROR);
532  	break;
533  
534      default:
535  	rcs_print_error("SHMEM: Invalid mutex type.(%d)\n", mutex_type);
536  	second_read = 0;
537  	return (status = CMS_MISC_ERROR);
538  	break;
539      }
540  
541      if (second_read > 0 && enable_diagnostics) {
542  	disable_diag_store = 1;
543      }
544  
545      /* Perform access function. */
546      internal_access(shm->addr, size, _local, serial_number);
547  
548      disable_diag_store = 0;
549  
550      if (NULL != bsem &&
551  	(internal_access_type == CMS_WRITE_ACCESS
552  	    || internal_access_type == CMS_WRITE_IF_READ_ACCESS)) {
553  	bsem->flush();
554      }
555      switch (mutex_type) {
556      case NO_MUTEX:
557  	break;
558  
559      case MAO_MUTEX:
560      case MAO_MUTEX_W_OS_SEM:
561  	mem_release_access(&mao);
562  	break;
563  
564      case OS_SEM_MUTEX:
565  	sem->post();
566  	break;
567      case NO_INTERRUPTS_MUTEX:
568  	rcs_print_error("Can not restore interrupts.\n");
569  	break;
570  
571      case NO_SWITCHING_MUTEX:
572  	rcs_print_error("Can not restore interrupts.\n");
573  	break;
574      }
575  
576      switch (internal_access_type) {
577  
578      case CMS_READ_ACCESS:
579  	if (NULL != bsem && status == CMS_READ_OLD &&
580  	    (blocking_timeout > 1e-6 || blocking_timeout < -1E-6)) {
581  	    if (second_read > 10 && total_subdivisions <= 1) {
582  		status = CMS_MISC_ERROR;
583  		rcs_print_error
584  		    ("CMS: Blocking semaphore error. The semaphore wait has returned %d times but there is still no new data.\n",
585  		    second_read);
586  		second_read = 0;
587  		return (status);
588  	    }
589  	    second_read++;
590  	    bsem->timeout = blocking_timeout;
591  	    int bsem_ret = bsem->wait();
592  	    if (bsem_ret == -2) {
593  		status = CMS_TIMED_OUT;
594  		second_read = 0;
595  		return (status);
596  	    }
597  	    if (bsem_ret == -1) {
598  		rcs_print_error("CMS: Blocking semaphore error.\n");
599  		status = CMS_MISC_ERROR;
600  		second_read = 0;
601  		return (status);
602  	    }
603  	    main_access(_local, serial_number);
604  	}
605  	break;
606  
607      case CMS_WRITE_ACCESS:
608      case CMS_WRITE_IF_READ_ACCESS:
609  	break;
610  
611      default:
612  	break;
613  
614      }
615  
616      second_read = 0;
617      return (status);
618  }