/ tsmpool.cpp
tsmpool.cpp
#include "tsmpool.h"

#include <new>
 2  
 3  tsmpool::tsmpool(size_t size, int num) :
 4  	size(size), 
 5  	num(num) //number of buffers of (size) to alloc
 6  {
 7  	this->threads_cntr = 0;
 8  	this->ok = 1;
 9  	this->lowest_read_index = -1;
10  	this->write_index = 0;
11  	this->my_read_index = index_before(0);
12      if (pthread_mutex_init(&this->mutex, NULL) != 0) { this->ok = 0; return; }
13  	for(int i=0; i<num; i++) 
14  	{
15  		void* newptr = (void*)new char[size];
16  		if(!newptr) { this->ok = 0; return; }
17  		buffers.push_back(newptr);
18  	}
19  }
20  
21  int tsmpool::is_ok() { return this->ok; }
22  
23  void* tsmpool::get_write_buffer()
24  {
25  	//if(write_index==index_before(lowest_read_index)) return NULL;
26  	pthread_mutex_lock(&this->mutex);
27  	void* to_return = buffers[write_index];
28  	write_index = index_next(write_index);
29  	pthread_mutex_unlock(&this->mutex);
30  	if(TSM_DEBUG) fprintf(stderr, "gwb: write_index = %d\n", write_index);
31  	return to_return;
32  }
33  
34  tsmthread_t* tsmpool::register_thread()
35  {
36  	if(!ok) return NULL;
37  	pthread_mutex_lock(&this->mutex);
38  	tsmthread_t* thread = new tsmthread_t();
39  	thread->read_index = index_before(write_index);
40  	threads.push_back(thread);
41  	pthread_mutex_unlock(&this->mutex);
42  	return thread;
43  }
44  
45  void tsmpool::remove_thread(tsmthread_t* thread)
46  {
47  	pthread_mutex_lock(&this->mutex);
48  	for(int i=0;i<threads.size();i++)
49  		if(threads[i] == thread)
50  		{
51  			delete threads[i];
52  			threads.erase(threads.begin()+i);
53  			break;
54  		}
55  	pthread_mutex_unlock(&this->mutex);
56  }
57  
58  void* tsmpool::get_read_buffer(tsmthread_t* thread)
59  {
60  	pthread_mutex_lock(&this->mutex);
61  	int* actual_read_index = (thread==NULL) ? &my_read_index : &thread->read_index;
62  	if(*actual_read_index==index_before(write_index)) 
63  	{
64  		if(TSM_DEBUG) fprintf(stderr, "grb: fail,"
65  			"read_index %d is just before write_index\n", *actual_read_index);
66  		pthread_mutex_unlock(&this->mutex);
67  		return NULL;
68  	}
69  	void* to_return = buffers[*actual_read_index];
70  	*actual_read_index=index_next(*actual_read_index);
71  	pthread_mutex_unlock(&this->mutex);
72  	if(TSM_DEBUG) fprintf(stderr, "grb: read_index = %d\n", *actual_read_index);
73  	return to_return;
74  }