Multi-Application Online Profiling 2.1
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
VMPI_stream.c
Go to the documentation of this file.
1 /* ############################ MALP License ############################## */
2 /* # Fri Jan 18 14:00:00 CET 2013 # */
3 /* # Copyright or (C) or Copr. Commissariat a l'Energie Atomique # */
4 /* # # */
5 /* # This software is governed by the CeCILL-C license under French law # */
6 /* # and abiding by the rules of distribution of free software. You can # */
7 /* # use, modify and/ or redistribute the software under the terms of # */
8 /* # the CeCILL-C license as circulated by CEA, CNRS and INRIA at the # */
9 /* # following URL http://www.cecill.info. # */
10 /* # # */
11 /* # The fact that you are presently reading this means that you have # */
12 /* # had knowledge of the CeCILL-C license and that you accept its # */
13 /* # terms. # */
14 /* # # */
15 /* # Authors: # */
16 /* # - BESNARD Jean-Baptiste jean-baptiste.besnard@cea.fr # */
17 /* # # */
18 /* ######################################################################## */
19 #include "VMPI_stream.h"
20 
21 #include <string.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 
25 
26 int VMPI_Stream_init( VMPI_Stream *st, size_t block_size, VMPI_Stream_load_balance lb)
27 {
28  int i;
29  memset( st, 0, sizeof( VMPI_Stream ));
30  st->lb = lb;
31  st->block_size = block_size;
32 
33  st->r_tmp_buffers = NULL;
34  st->r_req = NULL;
35  st->r_stat = NULL;
36  st->r_current_buff = NULL;
37 
38  for( i = 0 ; i < VMPI_STREAM_MAX_ASYNC ; i++ )
39  {
40  /* Fill in requests */
41  st->w_req[i] = MPI_REQUEST_NULL;
42 
43 
44  st->w_tmp_buffers[i] = malloc( block_size );
45 
46  if( !st->w_tmp_buffers[i] )
47  return 2;
48  }
49 
50  st->lock = 0;
51 
52  return 0;
53 }
54 
55 
56 
57 
58 int VMPI_Stream_push_stream( VMPI_Stream *st, int dest, int remote_tag, int local_tag, VMPI_Stream_mode mode )
59 {
60  struct _VMPI_Stream new_stream;
61  int i, j;
62 
63  new_stream.target = dest;
64  new_stream.remote_tag = remote_tag;
65  new_stream.local_tag = local_tag;
66  new_stream.state = 1;
67 
68  /* Make sure we dont push the same rank twice */
69  switch( mode )
70  {
71  case VMPI_STREAM_R:
72  case VMPI_STREAM_RW:
73  for( i = 0; i < st->in_count; i++ )
74  {
75  /* Check for this stream in incoming */
76  if( st->in[i].target == dest )
77  return 0;
78 
79  if( mode == VMPI_STREAM_RW )
80  {
81  case VMPI_STREAM_W:
82  for( j = 0; j < st->out_count; j++ )
83  {
84  if( st->out[i].target == dest )
85  return 0;
86  }
87 
88  if( mode == VMPI_STREAM_W )
89  break;
90  }
91  }
92  break;
93  }
94 
95  /* Push a given rank */
96  switch( mode )
97  {
98  case VMPI_STREAM_RW:
99  case VMPI_STREAM_R:
100  /* We want to make sure that each stream is constantly
101  * in the read request list to avoid deadlocks
102  * as a consequences readers have to book more memory
103  *
104  * To do so each stream has VMPI_STREAM_MAX_ASYNC requests
105  * these could have been the same on the writer side but
106  * as the reader relies on these data for computing we
107  * want it to get as much data as possible whereas
108  * the writer wants to get rid of his data as fast
109  * as possible with low memory footprint */
110 
111  st->in_count++;
112 
113  /* Allocate room for streams */
114  st->in = realloc( st->in, st->in_count * sizeof( struct _VMPI_Stream ) );
115 
116  if( !st->in )
117  return 2;
118 
119  memcpy(&st->in[ st->in_count - 1 ], &new_stream, sizeof( struct _VMPI_Stream ) );
120 
121  /* Allocate requests */
122  st->r_req = realloc( st->r_req, st->in_count * sizeof( MPI_Request ) * VMPI_STREAM_MAX_ASYNC );
123 
124  if( !st->r_req )
125  return 2;
126 
127  for( i = 0 ; i < VMPI_STREAM_MAX_ASYNC ; i++ )
128  {
129  st->r_req[ (st->in_count*VMPI_STREAM_MAX_ASYNC) - VMPI_STREAM_MAX_ASYNC + i ] = MPI_REQUEST_NULL;
130  }
131 
132  /* Allocate statuses */
133  st->r_stat = realloc( st->r_stat, st->in_count * sizeof( MPI_Status ) * VMPI_STREAM_MAX_ASYNC );
134 
135  if( !st->r_stat )
136  return 2;
137 
138  /* Allocate buffer counter */
139  st->r_current_buff = realloc( st->r_current_buff, st->in_count * sizeof( int ) * VMPI_STREAM_MAX_ASYNC );
140 
141  if( !st->r_current_buff )
142  return 2;
143 
144  for( i = 0 ; i < VMPI_STREAM_MAX_ASYNC ; i++ )
145  {
146  st->r_current_buff[ (st->in_count*VMPI_STREAM_MAX_ASYNC) - VMPI_STREAM_MAX_ASYNC + i ] = -1;
147  }
148 
149  /* Allocate buffers */
150  st->r_tmp_buffers = realloc( st->r_tmp_buffers, st->in_count * sizeof( char * ) * VMPI_STREAM_MAX_ASYNC );
151 
152  if( !st->r_tmp_buffers )
153  return 2;
154 
155  for( i = 0 ; i < VMPI_STREAM_MAX_ASYNC ; i++ )
156  {
157  st->r_tmp_buffers[ (st->in_count*VMPI_STREAM_MAX_ASYNC) - VMPI_STREAM_MAX_ASYNC + i ] = malloc( st->block_size );
158 
159  if( !st->r_tmp_buffers[ (st->in_count*VMPI_STREAM_MAX_ASYNC) - VMPI_STREAM_MAX_ASYNC + i ] )
160  return 2;
161  }
162 
163 
164  if( mode == VMPI_STREAM_R )
165  break;
166  /* otherwise no break */
167  case VMPI_STREAM_W:
168  st->out_count++;
169  st->out = realloc( st->out, st->out_count * sizeof( struct _VMPI_Stream ) );
170 
171  if( !st->out )
172  return 2;
173 
174  memcpy(&st->out[ st->out_count - 1 ], &new_stream, sizeof( struct _VMPI_Stream ) );
175  break;
176  default :
177  printf("No such mode \n");
178  abort();
179  }
180 
181  return 0;
182 }
183 
184 
185 
186 
188 {
189  int i;
190  for( i = 0 ; i < src->in_count ; i++ )
191  {
192  if (VMPI_Stream_push_stream( dest, src->in[i].target , src->in[i].remote_tag, src->in[i].local_tag, VMPI_STREAM_R) != 0)
193  return 2;
194  }
195 
196  for( i = 0 ; i < src->out_count ; i++ )
197  {
198  if(VMPI_Stream_push_stream( dest, src->out[i].target , src->out[i].remote_tag, src->out[i].local_tag, VMPI_STREAM_W) != 0)
199  return 2;
200  }
201  return 0;
202  /* Todo clean release of src */
203 }
204 
205 
214 {
215  int rank;
216  int tag;
218 };
219 
220 
/* Open a stream to dest rank; dest must perform the symmetrical call.
 * smode is an fopen-like string: "r", "w", "rw" or "wr".
 * Returns 0 on success, 1 if the descriptor exchange fails; aborts on an
 * invalid mode string or inconsistent remote descriptor. */
int VMPI_Stream_open( VMPI_Stream *st, int dest, char *smode )
{
	struct TMP_VMPI_Stream_desc tmp_desc;
	struct TMP_VMPI_Stream_desc remote_desc;
	int rank, local_tag;
	MPI_Status status;

	/* Check mode: translate the string into a VMPI_Stream_mode.
	 * NOTE(review): the declaration of 'mode' is on a line not visible
	 * in this extract — confirm it is declared as VMPI_Stream_mode. */
	{
		if( !strcmp( smode, "r") )
		{
			mode = VMPI_STREAM_R;
		}else if( !strcmp( smode, "w") )
		{
			mode = VMPI_STREAM_W;
		}
		else if( !strcmp( smode, "rw") || !strcmp( smode, "wr") )
		{
			mode = VMPI_STREAM_RW;
		}
		else
		{
			printf("No such mode '%s' in %s\n", smode, __FUNCTION__ );
			abort();
		}
	}

	/* Fill in mode desc */

	PMPI_Comm_rank(VMPI_Get_vmpi_comm(), &rank);

	/* We allocate a tag localy to
	 * identify this stream */
	local_tag = VMPI_get_new_tag();

	tmp_desc.tag = local_tag;
	tmp_desc.rank = rank;
	tmp_desc.mode = mode;


	/* Exchange mode descs using the VMPI_NG_ARBITRARY_VALUE + 1 tag
	 * NOTE(review): the code actually uses VMPI_NG_ARBITRARY_VALUE + 2
	 * on both sides — the comment above is stale. */
	if( PMPI_Sendrecv((void *)&tmp_desc, sizeof(struct TMP_VMPI_Stream_desc), MPI_CHAR, dest, VMPI_NG_ARBITRARY_VALUE + 2,
			(void *)&remote_desc, sizeof(struct TMP_VMPI_Stream_desc), MPI_CHAR, dest, VMPI_NG_ARBITRARY_VALUE + 2,
			VMPI_Get_vmpi_comm(), &status) != MPI_SUCCESS )
	{
		return 1;
	}

	MALP_Spinlock_lock(&st->lock);

	/* Check validity of remote desc */

	/* Check rank just in case */
	if( remote_desc.rank != dest )
	{
		printf("Error exchanging MPI_Stream informations\n");
		abort();
	}

	/* Check modes: the two endpoints must be complementary */
	if( (mode == VMPI_STREAM_R) && (remote_desc.mode == VMPI_STREAM_R) )
	{
		printf("Error opening a stream with two read endpoints is not possible\n");
		abort();
	}

	if( (mode == VMPI_STREAM_W) && (remote_desc.mode == VMPI_STREAM_W) )
	{
		printf("Error opening a stream with two write endpoints is not possible\n");
		abort();
	}

	/* If check passed store the new stream info.
	 * NOTE(review): the return value of push_stream (2 on allocation
	 * failure) is ignored here; the matching spinlock unlock is not
	 * visible in this extract — verify it exists before the return. */
	VMPI_Stream_push_stream( st, dest, remote_desc.tag, local_tag, mode );

	return 0;
}
301 
302 
{
	int i = 0;

	/* Open one point-to-point stream per rank listed in the map; every
	 * target rank is expected to perform the symmetrical open call.
	 * NOTE(review): VMPI_Stream_open's return value is ignored, so a
	 * failed descriptor exchange is silently dropped — confirm intended. */
	for( i = 0 ; i < map->count ; i++ )
	{
		VMPI_Stream_open( st, map->ranks[i], mode );
	}

	return 0;
}
314 
315 
317 {
318  int i, ret;
319  int dummy;
320  char *buff = malloc( st->block_size );
321  VMPI_Partition_desc *desc;
322 
323  desc = VMPI_Get_desc();
324 
325  if( !buff )
326  {
327  return 2;
328  }
329 
330  /* Ensure everything is written */
331  if( st->w_req_count )
332  PMPI_Waitall(st->w_req_count, st->w_req, st->w_stat);
333 
334  /* Broadcast close to outgoing targets so that read return EOF */
335  for( i = 0 ; i < st->out_count ; i++ )
336  {
337  if( PMPI_Send((void *)&dummy, 0, MPI_INT, st->out[i].target, st->out[i].remote_tag, VMPI_Get_vmpi_comm()) != MPI_SUCCESS )
338  return 1;
339  }
340 
341 
342  /* Wait for incoming close data on inbounds streams */
343  if( st->in_count )
344  {
345  do
346  {
347  ret = VMPI_Stream_read( st, buff, 1, VMPI_STREAM_BLOCK );
348 
349  if( ret )
350  {
351  printf("Warning : Unprocessed datas in partition %d (%s)\n", desc?desc->id:-1, desc?desc->name:"UNKNOWN");
352  printf("Make sure calls to close are done in both partitions\n");
353  abort();
354  }
355 
356  }while( ret );
357  }
358 
359  /* Now that we are done free everything */
360  free( st->in );
361  st->in = NULL;
362  st->in_count = 0;
363  free( st->out );
364  st->out_count = 0;
365  free( st->r_stat );
366  st->r_stat = NULL;
367  free( st->r_req );
368  st->r_req = NULL;
369 
370  /* Release tmp buffs */
371  for( i = 0 ; i < st->in_count * VMPI_STREAM_MAX_ASYNC ; i++ )
372  {
373  free( st->r_tmp_buffers[i] );
374  st->r_tmp_buffers[i] = NULL;
375  }
376 
377  free( st->r_tmp_buffers );
378  st->r_tmp_buffers = NULL;
379 
380  free( st->r_current_buff );
381  st->r_current_buff = NULL;
382 
383  for( i = 0 ; i < VMPI_STREAM_MAX_ASYNC; i++ )
384  {
385  free( st->w_tmp_buffers[i] );
386  st->w_tmp_buffers[i] = NULL;
387  }
388 
389  memset( st, 0, sizeof( VMPI_Stream ) );
390 
391 
392  return 0;
393 }
394 
/* Write block_count blocks from buff to the stream set.
 * Data is copied to internal buffers and sent with non-blocking sends so
 * the caller's buffer is reusable on return. Returns the number of bytes
 * handed to MPI, or -1 if a send could not be posted. */
int VMPI_Stream_write( VMPI_Stream * st, char *buff, unsigned int block_count )
{
	size_t len;
	char *dat_ptr;
	char *tmp_buff;
	MPI_Request *req;
	int index, a_value;
	MPI_Status status;
	int written_size;
	struct _VMPI_Stream *dest;

	len = block_count * st->block_size;


	if( len == 0 )
		return 0;

	written_size = 0;
	dat_ptr = buff;

	MALP_Spinlock_lock(&st->lock);

	while( 1 )
	{
		/* Did we write everything ? */
		if( len == 0 )
		{
			break;
		}

		/* Find a free request.
		 * NOTE(review): the condition guarding this first branch is on a
		 * line not visible in this extract — presumably
		 * st->w_req_count < VMPI_STREAM_MAX_ASYNC; confirm in the
		 * original file. */
		{
			/* We can allocate a new request */
			req = &st->w_req[st->w_req_count];
			tmp_buff = st->w_tmp_buffers[st->w_req_count];
			st->w_req_count ++;
		}
		else
		{
			/* We can use an old one: wait for any in-flight send */
			PMPI_Waitany(st->w_req_count, st->w_req, &index, &status);

			if( index != MPI_UNDEFINED )
			{
				req = &st->w_req[index];
				tmp_buff = st->w_tmp_buffers[index];
			}
			else
			{
				/* No active request completed; retry */
				continue;
			}
		}

		/* We now have a free request */

		/* Pick a dest according to policy.
		 * NOTE(review): the three case labels (fixed / random /
		 * round-robin load balancing) are on lines not visible in this
		 * extract. */
		switch( st->lb )
		{
			/* fixed: always the first output */
			dest = &st->out[0];
			break;
			/* random output */
			a_value = rand()%st->out_count;
			dest = &st->out[a_value];
			break;
			/* round-robin over outputs */
			st->current_out = (st->current_out + 1)%st->out_count;
			dest = &st->out[st->current_out];
			break;
		}


		/* Store in tmp Buff to relax user buffer */
		memcpy( tmp_buff, dat_ptr, st->block_size );


		/* Now lets send datas */
		/* Do we have enough datas to fill a block_size ? */

		//printf("WRITE send to <%d;%d>\n", dest->target, dest->remote_tag );
		if( PMPI_Isend((void *)tmp_buff, st->block_size, MPI_CHAR, dest->target, dest->remote_tag, VMPI_Get_vmpi_comm(), req) != MPI_SUCCESS )
		{
			/* NOTE(review): spinlock unlock before this return is on a
			 * line not visible in this extract — verify */
			return -1;
		}

		dat_ptr += st->block_size;
		len -= st->block_size;
		written_size += st->block_size;
	}


	return written_size;
}
491 
492 
/* Read block_count blocks into buff from the stream set.
 * Keeps VMPI_STREAM_MAX_ASYNC receives posted per input stream; a
 * zero-length message marks EOF and closes that stream. Returns the
 * number of bytes read, 0 when all streams are closed, VMPI_EAGAIN in
 * non-blocking mode if no data is ready, VMPI_ERROR on a failed post. */
int VMPI_Stream_read( VMPI_Stream * st, char *buff, unsigned int block_count, VMPI_Stream_blocking block )
{
	size_t len;
	int read_size, i, dat_size, has_data;
	struct _VMPI_Stream *src;
	char *dat_ptr;
	int src_id, target_index, flag;

	len = block_count * st->block_size;
	dat_ptr = buff;
	read_size = 0;

	if( len == 0 )
		return VMPI_SUCCESS;

	MALP_Spinlock_lock(&st->lock);

	/* Enter Read LOOP.
	 * NOTE(review): the unlock calls preceding the returns inside this
	 * loop are on lines not visible in this extract — verify. */
	read_size = 0;
	while( 1 )
	{
		if( len == 0 )
		{
			/* Requested amount fully read */
			return read_size;
		}

		/* Check if all streams are closed */
		has_data = 0;
		for( i = 0 ; i < st->in_count; i++ )
		{
			has_data |= st->in[i].state;
		}

		/* All streams are closed */
		if( !has_data )
		{
			return 0;
		}

		src_id = 0;
		has_data = 0;

		/* Pick an open stream */
		while( !has_data )
		{
			/* Pick a source to process according to policy.
			 * NOTE(review): the three case labels (fixed / random /
			 * round-robin) are on lines not visible in this extract. */
			switch( st->lb )
			{
				/* fixed: always the first input */
				src_id = 0;

				/* Is the only stream from which we can read closed */
				if( st->in[src_id].state == 0 )
				{
					return 0;
				}

				break;
				/* random input */
				src_id = rand()%st->in_count;
				break;
				/* round-robin over inputs */
				st->current_in = (st->current_in + 1)%st->in_count;
				src_id = st->current_in;
				break;
			}

			has_data = st->in[src_id].state;
		}

		src = &st->in[ src_id ];

		if( st->r_current_buff[ src_id ] == -1 )
		{
			/* There is no receive in this buffer pool: post the full
			 * window of asynchronous receives for this stream */
			for( i = 0 ; i < VMPI_STREAM_MAX_ASYNC; i++ )
			{
				//printf("READ INIT Receive from <%d;%d>\n", src->target, src->local_tag );
				if( PMPI_Irecv((void *)st->r_tmp_buffers[ src_id * VMPI_STREAM_MAX_ASYNC + i ] ,
						st->block_size, MPI_CHAR, src->target, src->local_tag, VMPI_Get_vmpi_comm(), &st->r_req[ src_id * VMPI_STREAM_MAX_ASYNC + i ]) != MPI_SUCCESS )
				{
					return VMPI_ERROR;
				}
			}

			/* Current buff is now 0 */
			st->r_current_buff[src_id] = 0;
		}

		/* Read */

		target_index = src_id * VMPI_STREAM_MAX_ASYNC + st->r_current_buff[ src_id ];

		if( block == VMPI_STREAM_NONBLOCK )
		{
			flag = 0;
			PMPI_Test( &st->r_req[ target_index ], &flag, &st->r_stat[ target_index ]);

			if( !flag )
			{
				/* Operation not completed */
				return VMPI_EAGAIN;
			}
		}
		else
		{
			PMPI_Wait( &st->r_req[ target_index ], &st->r_stat[ target_index ]);
		}

		/* Get incoming size */
		PMPI_Get_count( &st->r_stat[ target_index ], MPI_CHAR, &dat_size );

		/* We received EOF (zero-length message, see VMPI_Stream_close) */
		if( dat_size == 0 )
		{
			/* Flag stream closed */
			st->in[src_id].state = 0;

			/* Cancel all other receives */
			for( i = 0 ; i < VMPI_STREAM_MAX_ASYNC ; i++ )
			{
				if( st->r_current_buff[ src_id ] != i )
				{
					target_index = src_id * VMPI_STREAM_MAX_ASYNC + i;
					PMPI_Cancel( &st->r_req[ target_index ] );
				}

			}

			continue;
		}

		/* We have data */
		memcpy( dat_ptr, (void *)st->r_tmp_buffers[ target_index ], st->block_size );

		/* Repost receive so the window stays full */
		//printf("READ Receive from <%d;%d>\n", src->target, src->local_tag );
		if( PMPI_Irecv((void *)st->r_tmp_buffers[target_index] ,
				st->block_size, MPI_CHAR, src->target, src->local_tag, VMPI_Get_vmpi_comm(), &st->r_req[ target_index ]) != MPI_SUCCESS )
		{
			return VMPI_ERROR;
		}

		/* Switch to next buff */
		st->r_current_buff[ src_id ] = (st->r_current_buff[ src_id ] + 1)%VMPI_STREAM_MAX_ASYNC;

		len -= st->block_size;
		read_size += st->block_size;
		dat_ptr += st->block_size;
	}


	return 0;
}
654 
{
	int src;
	int ret = 0;
	int target_index;

	/* Is this stream ready for read? Probes the current receive request
	 * of the input the load-balancing policy would pick next.
	 * NOTE(review): the case labels of this switch are on lines not
	 * visible in this extract. */
	switch( st->lb )
	{
		/* fixed: always the first input */
		src = 0;
		break;
		/* As we cannot know the target stream we return ready for read */
		return 1;
		/* round-robin: the input that would be picked next */
		src = (st->current_in + 1)%st->in_count;
		break;
	}

	target_index = src * VMPI_STREAM_MAX_ASYNC + st->r_current_buff[ src ];

	if( st->r_req )
		if( st->r_req[target_index] != MPI_REQUEST_NULL )
			PMPI_Test(&st->r_req[target_index], &ret, MPI_STATUS_IGNORE);

	return ret;
}
682 
{
	int dest;

	int ret = 0;
	/* Is this stream ready for writing? Probes the write request slot
	 * matching the output the load-balancing policy would pick next.
	 * NOTE(review): the case labels of this switch are on lines not
	 * visible in this extract; also note w_req is indexed by request
	 * slot elsewhere, while here it is indexed by output — verify. */
	switch( st->lb )
	{
		/* fixed: always the first output */
		dest = 0;
		break;
		/* As we cannot know the target stream we return ready for write */
		return 1;
		/* round-robin: the output that would be picked next */
		dest = (st->current_out + 1)%st->out_count;
		break;
	}

	if( st->w_req )
		if( st->w_req[dest] != MPI_REQUEST_NULL )
			PMPI_Test(&st->w_req[dest], &ret, MPI_STATUS_IGNORE);

	return ret;
}
All Calls are blocking ones.
Definition: VMPI_stream.h:64
All OK.
Definition: VMPI.h:59
int target
Target task.
Definition: VMPI_stream.h:75
#define VMPI_STREAM_MAX_ASYNC
Maximum number of asynchronous cells.
Definition: VMPI_stream.h:85
char ** r_tmp_buffers
Temporary buffer for input.
Definition: VMPI_stream.h:108
Defines a map.
Definition: VMPI.h:287
struct _VMPI_Stream * in
Input streams.
Definition: VMPI_stream.h:98
int * ranks
Target ranks.
Definition: VMPI.h:290
int tag
The local tag.
Definition: VMPI_stream.c:216
int VMPI_get_new_tag()
Book a tag on VMPI's communicator for this process.
Definition: VMPI.c:32
char name[200]
Name of the partition (could be NONE)
Definition: VMPI.h:105
int VMPI_Stream_test_write(VMPI_Stream *st)
Is this stream ready for writing ?
Definition: VMPI_stream.c:683
static MPI_Comm VMPI_Get_vmpi_comm()
Get VMPI's copy of MPI_COMM_WORLD.
Definition: VMPI.h:323
struct _VMPI_Stream * out
Output streams.
Definition: VMPI_stream.h:102
int out_count
Number of outputs.
Definition: VMPI_stream.h:104
MPI_Request w_req[VMPI_STREAM_MAX_ASYNC]
Output requests.
Definition: VMPI_stream.h:116
int VMPI_Stream_open_map(VMPI_Stream *st, VMPI_Map *map, char *mode)
Open a stream according to a mapping.
Definition: VMPI_stream.c:303
Something went wrong.
Definition: VMPI.h:57
int VMPI_Stream_test_read(VMPI_Stream *st)
Is this stream ready for read ?
Definition: VMPI_stream.c:655
MPI_Status w_stat[VMPI_STREAM_MAX_ASYNC]
Output statuses.
Definition: VMPI_stream.h:117
Send or receive from a random one.
Definition: VMPI_stream.h:42
int current_out
Current output stream.
Definition: VMPI_stream.h:103
int MALP_Spinlock_unlock(MALP_Spinlock *mutex)
Unlocks the given MALP_Spinlock.
Definition: Spinlock.c:41
int w_req_count
Number of output requests.
Definition: VMPI_stream.h:118
READ and WRITE.
Definition: VMPI_stream.h:54
int * r_current_buff
Current buffer.
Definition: VMPI_stream.h:109
MALP_Spinlock lock
Spinlock.
Definition: VMPI_stream.h:122
Structure holding partitions descriptions.
Definition: VMPI.h:100
Temporary stream description struct.
Definition: VMPI_stream.c:213
int rank
The rank of the stream.
Definition: VMPI_stream.c:215
VMPI_Stream_mode mode
the mode
Definition: VMPI_stream.c:217
Internal declaration of a stream endpoint.
Definition: VMPI_stream.h:73
VMPI_Stream_load_balance
Defines the load ballancing modes available to MPI_Streams.
Definition: VMPI_stream.h:39
int in_count
Number of inputs.
Definition: VMPI_stream.h:100
int MALP_Spinlock_lock(MALP_Spinlock *mutex)
Locks the given MALP_Spinlock.
Definition: Spinlock.c:29
VMPI_Stream_mode
Defines MPI_Stream mode.
Definition: VMPI_stream.h:50
int local_tag
Local unique tag.
Definition: VMPI_stream.h:77
int id
Unique identifier of the partition.
Definition: VMPI.h:102
VMPI_Stream_load_balance lb
Load balancing policy.
Definition: VMPI_stream.h:120
int VMPI_Stream_init(VMPI_Stream *st, size_t block_size, VMPI_Stream_load_balance lb)
Initializes a stream should be called before any *open call.
Definition: VMPI_stream.c:26
If no data are available or endpoint not ready call might return VMPI_EAGAIN.
Definition: VMPI_stream.h:65
VMPI_Stream_blocking
Defines call mode.
Definition: VMPI_stream.h:62
MPI_Request * r_req
Input requests.
Definition: VMPI_stream.h:110
The call cannot pursue retry.
Definition: VMPI.h:56
static VMPI_Partition_desc * VMPI_Get_desc()
Returns current partition's descriptions.
Definition: VMPI.h:153
int VMPI_Stream_close(VMPI_Stream *st)
Closes a stream and sends EOF to remote endpoints.
Definition: VMPI_stream.c:316
Send or receive from each turn by turn.
Definition: VMPI_stream.h:43
int VMPI_Stream_join(VMPI_Stream *dest, VMPI_Stream *src)
Join two streams in the same set.
Definition: VMPI_stream.c:187
int state
Been closed or not.
Definition: VMPI_stream.h:78
int VMPI_Stream_open(VMPI_Stream *st, int dest, char *smode)
Open a stream to dest rank, dest should do the symmetrical call.
Definition: VMPI_stream.c:221
int VMPI_Stream_read(VMPI_Stream *st, char *buff, unsigned int block_count, VMPI_Stream_blocking block)
Read data from a VMPI_Stream.
Definition: VMPI_stream.c:493
int VMPI_Stream_push_stream(VMPI_Stream *st, int dest, int remote_tag, int local_tag, VMPI_Stream_mode mode)
pushes a stream
Definition: VMPI_stream.c:58
char * w_tmp_buffers[VMPI_STREAM_MAX_ASYNC]
Output buffers.
Definition: VMPI_stream.h:115
int current_in
Current input stream.
Definition: VMPI_stream.h:99
MPI_Status * r_stat
Input statuses.
Definition: VMPI_stream.h:111
int VMPI_Stream_write(VMPI_Stream *st, char *buff, unsigned int block_count)
Write data to a VMPI_Stream.
Definition: VMPI_stream.c:395
int count
Number of ranks.
Definition: VMPI.h:289
int remote_tag
Remote unique tag.
Definition: VMPI_stream.h:76
Always send or receive from the same.
Definition: VMPI_stream.h:41
size_t block_size
Size of a data block.
Definition: VMPI_stream.h:106
#define VMPI_NG_ARBITRARY_VALUE
Defines of magic values.
Definition: VMPI.h:38
Defines a MPI stream.
Definition: VMPI_stream.h:96