41                 st->
w_req[i] = MPI_REQUEST_NULL;
 
  133                         st->
r_stat = realloc( st->
r_stat, st->
in_count * 
sizeof( MPI_Status ) * VMPI_STREAM_MAX_ASYNC );
 
  177                         printf(
"No such mode \n");
 
  190         for( i = 0 ; i < src->
in_count ; i++ )
 
  231                 if( !strcmp( smode, 
"r") )
 
  234                 }
else if( !strcmp( smode, 
"w") )
 
  238                 else if( !strcmp( smode, 
"rw") || !strcmp( smode, 
"wr") )
 
  244                         printf(
"No such mode '%s' in %s\n", smode, __FUNCTION__ );
 
  257         tmp_desc.
tag = local_tag;
 
  275     if( remote_desc.
rank != dest )
 
  277                 printf(
"Error exchanging MPI_Stream informations\n");
 
  284                 printf(
"Error opening a stream with two read endpoints is not possible\n");
 
  290                 printf(
"Error opening a stream with two write endpoints is not possible\n");
 
  307         for( i = 0 ; i < map->
count ; i++ )
 
  351                                 printf(
"Warning : Unprocessed datas in partition %d (%s)\n", desc?desc->
id:-1, desc?desc->
name:
"UNKNOWN");
 
  352                                 printf(
"Make sure calls to close are done in both partitions\n");
 
  438                         if( index != MPI_UNDEFINED )
 
  440                                 req = &st->
w_req[index];
 
  459                                 dest = &st->
out[a_value];
 
  496         int read_size, i, dat_size, has_data;
 
  499         int src_id, target_index, flag;
 
  522                 for( i = 0 ; i < st->
in_count; i++ )
 
  547                                         if( st->
in[src_id].
state == 0 )
 
  563                         has_data = st->
in[src_id].
state;
 
  566                 src = &st->
in[ src_id ];
 
  574                                 if( PMPI_Irecv((
void *)st->
r_tmp_buffers[ src_id * VMPI_STREAM_MAX_ASYNC + i ] , 
 
  593                         PMPI_Test(  &st->
r_req[ target_index ], &flag, &st->
r_stat[ target_index ]);
 
  604                         PMPI_Wait( &st->
r_req[ target_index ], &st->
r_stat[ target_index ]);
 
  608                 PMPI_Get_count( &st->
r_stat[ target_index ], MPI_CHAR, &dat_size );
 
  621                                         target_index = src_id * VMPI_STREAM_MAX_ASYNC + i;
 
  622                                         PMPI_Cancel( &st->
r_req[ target_index ] );
 
  677                 if( st->
r_req[target_index] != MPI_REQUEST_NULL )
 
  678                         PMPI_Test(&st->
r_req[target_index], &ret, MPI_STATUS_IGNORE);
 
  702                 if( st->
w_req[dest] != MPI_REQUEST_NULL )
 
  703                         PMPI_Test(&st->
w_req[dest], &ret, MPI_STATUS_IGNORE);
 
All Calls are blocking ones. 
 
#define VMPI_STREAM_MAX_ASYNC
Maximum number of asynchronous cells. 
 
char ** r_tmp_buffers
Temporary buffer for input. 
 
struct _VMPI_Stream * in
Input streams. 
 
int VMPI_get_new_tag()
Book a tag on VMPI's communicator for this process. 
 
char name[200]
Name of the partition (could be NONE) 
 
int VMPI_Stream_test_write(VMPI_Stream *st)
Is this stream ready for writing ? 
 
static MPI_Comm VMPI_Get_vmpi_comm()
Get VMPI's copy of MPI_COMM_WORLD. 
 
struct _VMPI_Stream * out
Output streams. 
 
int out_count
Number of outputs. 
 
MPI_Request w_req[VMPI_STREAM_MAX_ASYNC]
Output requests. 
 
int VMPI_Stream_open_map(VMPI_Stream *st, VMPI_Map *map, char *mode)
Open a stream according to a mapping. 
 
int VMPI_Stream_test_read(VMPI_Stream *st)
Is this stream ready for reading?
 
MPI_Status w_stat[VMPI_STREAM_MAX_ASYNC]
Output statuses. 
 
Send or receive from a random one. 
 
int current_out
Current output stream. 
 
int MALP_Spinlock_unlock(MALP_Spinlock *mutex)
Unlocks the given MALP_Spinlock. 
 
int w_req_count
Number of output requests. 
 
int * r_current_buff
Current buffer. 
 
MALP_Spinlock lock
Spinlock. 
 
Structure holding partitions descriptions. 
 
Temporary stream description struct. 
 
int rank
The rank of the stream. 
 
VMPI_Stream_mode mode
The mode.
 
Internal declaration of a stream endpoint. 
 
VMPI_Stream_load_balance
Defines the load balancing modes available to MPI_Streams.
 
int in_count
Number of inputs. 
 
int MALP_Spinlock_lock(MALP_Spinlock *mutex)
Locks the given MALP_Spinlock. 
 
VMPI_Stream_mode
Defines MPI_Stream mode. 
 
int local_tag
Local unique tag. 
 
int id
Unique identifier of the partition. 
 
VMPI_Stream_load_balance lb
Load balancing policy. 
 
int VMPI_Stream_init(VMPI_Stream *st, size_t block_size, VMPI_Stream_load_balance lb)
Initializes a stream; should be called before any *open call.
 
If no data are available or the endpoint is not ready, the call might return VMPI_EAGAIN.
 
VMPI_Stream_blocking
Defines call mode. 
 
MPI_Request * r_req
Input requests. 
 
The call cannot proceed; retry.
 
static VMPI_Partition_desc * VMPI_Get_desc()
Returns current partition's descriptions. 
 
int VMPI_Stream_close(VMPI_Stream *st)
Closes a stream and sends EOF to remote endpoints. 
 
Send or receive from each one, turn by turn.
 
int VMPI_Stream_join(VMPI_Stream *dest, VMPI_Stream *src)
Join two streams in the same set. 
 
int state
Whether it has been closed or not.
 
int VMPI_Stream_open(VMPI_Stream *st, int dest, char *smode)
Open a stream to dest rank, dest should do the symmetrical call. 
 
int VMPI_Stream_read(VMPI_Stream *st, char *buff, unsigned int block_count, VMPI_Stream_blocking block)
Read data from a VMPI_Stream. 
 
int VMPI_Stream_push_stream(VMPI_Stream *st, int dest, int remote_tag, int local_tag, VMPI_Stream_mode mode)
Pushes a stream.
 
char * w_tmp_buffers[VMPI_STREAM_MAX_ASYNC]
Output buffers. 
 
int current_in
Current input stream. 
 
MPI_Status * r_stat
Input statuses. 
 
int VMPI_Stream_write(VMPI_Stream *st, char *buff, unsigned int block_count)
Write data to a VMPI_Stream. 
 
int count
Number of ranks. 
 
int remote_tag
Remote unique tag. 
 
Always send or receive from the same one.
 
size_t block_size
Size of a data block. 
 
#define VMPI_NG_ARBITRARY_VALUE
Defines of magic values.