Multi-ApplicationOnlineProfiling  2.1
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
Pbb_Workers.c
Go to the documentation of this file.
1 /* ############################ MALP License ############################## */
2 /* # Fri Jan 18 14:00:00 CET 2013 # */
3 /* # Copyright or (C) or Copr. Commissariat a l'Energie Atomique # */
4 /* # # */
5 /* # This software is governed by the CeCILL-C license under French law # */
6 /* # and abiding by the rules of distribution of free software. You can # */
7 /* # use, modify and/ or redistribute the software under the terms of # */
8 /* # the CeCILL-C license as circulated by CEA, CNRS and INRIA at the # */
9 /* # following URL http://www.cecill.info. # */
10 /* # # */
11 /* # The fact that you are presently reading this means that you have # */
12 /* # had knowledge of the CeCILL-C license and that you accept its # */
13 /* # terms. # */
14 /* # # */
15 /* # Authors: # */
16 /* # - BESNARD Jean-Baptiste jean-baptiste.besnard@cea.fr # */
17 /* # # */
18 /* ######################################################################## */
19 #include "Pbb_Workers.h"
20 #include <unistd.h>
21 #include "Data_Entry.h"
22 #include "MALP_Blackboard.h"
23 
24 
25 #define MAX_BACK_OFF_ID 11
26 static const unsigned int Pbb_workers_backoff_times[MAX_BACK_OFF_ID] =
27  {0,3,4,7,12,20,33,55,90,148,245};
28 
29 
30 static inline void Pbb_workers_process( struct MALP_blackboard *bb, struct Knowledge_system_job *job )
31 {
32  //printf("Calling [process : %p default : %p entries : %d]\n", job->process, bb->default_handler, job->entries_count);
33 
34 
35  /*int i;
36  for( i = 0 ; i < job->entries_count ; i++ )
37  {
38  printf("[%d/%d]@[%p] KEY : %llX SIZE : %d PL : %p\n", i + 1, job->entries_count, job->entries[i], job->entries[i]->key, job->entries[i]->size, job->entries[i]->payload);
39  }
40 
41 
42  printf("GOT process to %p KS : %lld Entries : %d pent %p arg : %p\n", job->process, job->ks_type, job->entries_count, job->entries, job->arg);
43  */
44 
45  if( !job->process )
46  {
47  if( bb->default_handler )
48  {
49  (bb->default_handler)( bb, job->ks_type, job->entries, job->entries_count, job->arg );
50  }
51  //else
52  //{
53  //printf("ERROR : There was a KS without associated process and no default KS handler\n");
54  //abort();
55  //}
56  }
57  else
58  {
59  (job->process)( bb, job->entries, job->entries_count, job->arg );
60  }
61 }
62 
63 void * Pbb_workers_mainloop( void *pdesc )
64 {
65  struct Pbb_workers_desc *desc = (struct Pbb_workers_desc *)pdesc;
66  struct Pbb_workers *wk = (struct Pbb_workers *)desc->wk;
67  int id = desc->id;
68 
69  int had_work = 0;
70  int backoff = 0;
71 
72  struct Knowledge_system_job job;
73 
74  uint8_t *to_free = malloc( 64 * sizeof( uint8_t ) );
75 
76  if( !to_free )
77  {
78  perror("malloc");
79  abort();
80  }
81 
82  while( wk->running )
83  {
84  memset( to_free, 0 , 64 * sizeof( uint8_t ));
85  had_work = 0;
86 
87  /* Acquire a job */
88 
89  int i = 0;
90 
91  void *got_job = NULL;
92 
93  got_job = Buffered_FIFO_pop( &wk->fifos[id], (void *)&job);
94 
95  /* We have a job */
96  if( got_job )
97  {
98  had_work = 1;
99 
100  /* Process JOB */
101  if( had_work )
102  {
103  if( 64 < job.entries_count )
104  {
105  printf("Error maximum Data entries count reached => update value\n");
106  abort();
107  }
108 
109 
110  for( i = 0 ; i < job.entries_count ; i++ )
111  {
112  uint64_t ret = Data_entry_dec_ref_count( job.entries[i] );
113  if( ret == 0 )
114  {
115  to_free[i] = 1;
116  }
117  }
118 
119  Pbb_workers_process( wk->bb, &job );
120 
121  /* Update the refcount of each data entry and free when zero */
122  for( i = 0 ; i < job.entries_count ; i++ )
123  {
124  if( job.entries[i]->been_resubmitted == 0 )
125  {
126  if( to_free[i] )
127  {
128  //printf("Removing data @ %llX\n", job.entries[i]->key );
129  /* One data to be freed update pending */
131 
132  Data_entry_release( job.entries[i] );
133  job.entries[i] = NULL;
134  }
135  }
136  else
137  {
138  job.entries[i]->been_resubmitted = 0;
139  }
140  }
141 
143  }
144  }
145 
146  /* Back off handling */
147 
148  if( had_work )
149  {
150  backoff=0;
151  }
152  else
153  {
154  if( backoff < (MAX_BACK_OFF_ID - 1) )
155  backoff++;
156  }
157 
158  if( 5 < backoff )
159  pthread_yield();
160 
161  //printf("Th %d BO : %d T : %d FIFO : %lld\n", id, backoff, Pbb_workers_backoff_times[ backoff ], (long long int)Buffered_FIFO_count(&wk->fifos[id]));
162  usleep( Pbb_workers_backoff_times[ backoff ] );
163 
164  }
165 
166  return NULL;
167 }
168 
169 void Pbb_workers_init( struct Pbb_workers *wk, struct MALP_blackboard *bb, uint32_t wk_count)
170 {
171  if( !wk_count )
172  {
173  printf("%s has been given erroneous arguments\n", __FUNCTION__ );
174  abort();
175  }
176 
177  wk->bb = bb;
178  wk->worker_count = wk_count;
179 
180  int i = 0;
181 
182  wk->running = 1;
183 
184  wk->fifos = malloc( wk_count * sizeof( struct Buffered_FIFO ) );
185  for( i = 0; i < wk_count ; i++ )
186  Buffered_FIFO_init(&wk->fifos[i], 200, sizeof( struct Knowledge_system_job ));
187 
188 
189 
190  wk->threads = malloc( wk_count * sizeof( pthread_t ) );
191  wk->descs = malloc( wk_count * sizeof( struct Pbb_workers_desc ) );
192 
193  for( i = 0; i < wk_count ; i++ )
194  {
195  wk->descs[i].wk = wk;
196  wk->descs[i].id = i;
197  pthread_create(&wk->threads[i], NULL, Pbb_workers_mainloop, (void *)&wk->descs[i] );
198  }
199 
200 }
201 
202 
204 {
205  int i = 0;
206 
207  wk->running = 0;
208 
209  for( i = 0; i < wk->worker_count ; i++ )
210  pthread_join( wk->threads[i], NULL );
211 
212 
213  free( wk->threads );
214  wk->threads = NULL;
215 
216  for( i = 0; i < wk->worker_count ; i++ )
217  Buffered_FIFO_release( &wk->fifos[i], NULL );
218 
219  wk->worker_count = 0;
220 
221  free( wk->fifos );
222  wk->fifos = NULL;
223 }
224 
225 
226 void Pbb_workers_submit( struct Pbb_workers *wk, struct Knowledge_system_job *ksj )
227 {
228  int entry_point = 0;
229 
230  if( ksj->is_serial )
231  {
232  entry_point = ksj->ks_type % wk->worker_count;
233  }
234  else
235  {
236  entry_point = rand() % wk->worker_count;
237  }
238  Buffered_FIFO_push(&wk->fifos[entry_point], (void *)ksj);
239 }
240 
241 void Pbb_workers_wait( struct Pbb_workers *wk )
242 {
243  int i = 0;
244  int all_zero = 0;
245 
246  /* Check if no jobs */
247  while( !all_zero )
248  {
249 
250  all_zero = 1;
251  for( i = 0; i < wk->worker_count ; i++ )
252  {
253  if( Buffered_FIFO_count( &wk->fifos[i] ) )
254  {
255  all_zero = 0;
256  break;
257  }
258 
259  }
260 
261  /* If there is no job left
262  * Check if there is also no pending data in BB */
263  if( all_zero )
264  {
265  uint64_t pending = MALP_Blackboard_pending( wk->bb );
266 
267  if( pending )
268  all_zero = 0;
269 
270  }
271 
272  /* Be very nice with the other threads if we are pinned
273  * on the same core to avoid live-locking
274  */
275  pthread_yield();
276 
277  }
278 
279 }
struct Pbb_workers_desc * descs
the workers descriptors
Definition: Pbb_Workers.h:56
static void MALP_Blackboard_remove_pending(struct MALP_blackboard *bb)
Decrements the number of pending data block counter.
#define MAX_BACK_OFF_ID
Definition: Pbb_Workers.c:25
struct Data_entry ** entries
void Buffered_FIFO_release(struct Buffered_FIFO *fifo, void(*free_func)(void *))
releases a FIFO
This is a struct defining a FIFO It is composed of several Buffered_FIFO_chunk.
int id
the id of the workers
Definition: Pbb_Workers.h:43
void * Buffered_FIFO_push(struct Buffered_FIFO *fifo, void *elem)
Pushes an element into a FIFO.
void * Buffered_FIFO_pop(struct Buffered_FIFO *fifo, void *dest)
Pops an element from a FIFO.
struct representing a job that does the actual data processing (created by Knowledge_system) ...
void(* default_handler)(struct MALP_blackboard *, uint64_t KS_id, struct Data_entry **, uint32_t, void *arg)
The MALP_blackboard structure.
struct MALP_blackboard * bb
the blackboard where the workers run
Definition: Pbb_Workers.h:60
static uint64_t MALP_Blackboard_pending(struct MALP_blackboard *bb)
Returns the number of pebnding data block counter.
uint32_t running
indicates if the workers are running or not
Definition: Pbb_Workers.h:58
static const unsigned int Pbb_workers_backoff_times[MAX_BACK_OFF_ID]
Definition: Pbb_Workers.c:26
Structure defining a parallel blackboard worker set.
Definition: Pbb_Workers.h:52
void Data_entry_release(struct Data_entry *de)
Clears a Data_entry.
Definition: Data_Entry.c:64
uint64_t Buffered_FIFO_count(struct Buffered_FIFO *fifo)
Thread-safely gets the number of elements stored in the FIFO.
Definition: Buffered_FIFO.c:23
void * Pbb_workers_mainloop(void *pdesc)
Definition: Pbb_Workers.c:63
uint32_t worker_count
the number of workers
Definition: Pbb_Workers.h:54
void Pbb_workers_wait(struct Pbb_workers *wk)
Waits until all workers are done.
Definition: Pbb_Workers.c:241
void(* process)(struct MALP_blackboard *bb, struct Data_entry **de, uint32_t entry_count, void *arg)
pthread_t * threads
the threads of the workers
Definition: Pbb_Workers.h:57
uint64_t ks_type
the type of Knowledge_system
static uint64_t Data_entry_dec_ref_count(struct Data_entry *de)
Decrements the ref_count of a Data_entry.
Definition: Data_Entry.h:127
void Buffered_FIFO_init(struct Buffered_FIFO *fifo, uint64_t chunk_size, size_t elem_size)
Initializes a Buffered_FIFO.
struct Buffered_FIFO * fifos
a set of FIFOs where Knowledge_system_job are stored
Definition: Pbb_Workers.h:61
void Pbb_workers_submit(struct Pbb_workers *wk, struct Knowledge_system_job *ksj)
sumbits a job on workers
Definition: Pbb_Workers.c:226
struct Pbb_workers * wk
the workers
Definition: Pbb_Workers.h:44
void Knowledge_system_job_release(struct Knowledge_system_job *ksj)
Knowledge_system_job_release.
void Pbb_workers_init(struct Pbb_workers *wk, struct MALP_blackboard *bb, uint32_t wk_count)
Initializes workers.
Definition: Pbb_Workers.c:169
static void Pbb_workers_process(struct MALP_blackboard *bb, struct Knowledge_system_job *job)
Definition: Pbb_Workers.c:30
void Pbb_workers_release(struct Pbb_workers *wk)
releases workers
Definition: Pbb_Workers.c:203
structure describing a workers descriptor
Definition: Pbb_Workers.h:41