41 st->
w_req[i] = MPI_REQUEST_NULL;
133 st->
r_stat = realloc( st->
r_stat, st->
in_count *
sizeof( MPI_Status ) * VMPI_STREAM_MAX_ASYNC );
177 printf(
"No such mode \n");
190 for( i = 0 ; i < src->
in_count ; i++ )
231 if( !strcmp( smode,
"r") )
234 }
else if( !strcmp( smode,
"w") )
238 else if( !strcmp( smode,
"rw") || !strcmp( smode,
"wr") )
244 printf(
"No such mode '%s' in %s\n", smode, __FUNCTION__ );
257 tmp_desc.
tag = local_tag;
275 if( remote_desc.
rank != dest )
277 printf(
"Error exchanging MPI_Stream informations\n");
284 printf(
"Error opening a stream with two read endpoints is not possible\n");
290 printf(
"Error opening a stream with two write endpoints is not possible\n");
307 for( i = 0 ; i < map->
count ; i++ )
351 printf(
"Warning : Unprocessed datas in partition %d (%s)\n", desc?desc->
id:-1, desc?desc->
name:
"UNKNOWN");
352 printf(
"Make sure calls to close are done in both partitions\n");
438 if( index != MPI_UNDEFINED )
440 req = &st->
w_req[index];
459 dest = &st->
out[a_value];
496 int read_size, i, dat_size, has_data;
499 int src_id, target_index, flag;
522 for( i = 0 ; i < st->
in_count; i++ )
547 if( st->
in[src_id].
state == 0 )
563 has_data = st->
in[src_id].
state;
566 src = &st->
in[ src_id ];
574 if( PMPI_Irecv((
void *)st->
r_tmp_buffers[ src_id * VMPI_STREAM_MAX_ASYNC + i ] ,
593 PMPI_Test( &st->
r_req[ target_index ], &flag, &st->
r_stat[ target_index ]);
604 PMPI_Wait( &st->
r_req[ target_index ], &st->
r_stat[ target_index ]);
608 PMPI_Get_count( &st->
r_stat[ target_index ], MPI_CHAR, &dat_size );
621 target_index = src_id * VMPI_STREAM_MAX_ASYNC + i;
622 PMPI_Cancel( &st->
r_req[ target_index ] );
677 if( st->
r_req[target_index] != MPI_REQUEST_NULL )
678 PMPI_Test(&st->
r_req[target_index], &ret, MPI_STATUS_IGNORE);
702 if( st->
w_req[dest] != MPI_REQUEST_NULL )
703 PMPI_Test(&st->
w_req[dest], &ret, MPI_STATUS_IGNORE);
All Calls are blocking ones.
#define VMPI_STREAM_MAX_ASYNC
Maximum number of asynchronous cells.
char ** r_tmp_buffers
Temporary buffer for input.
struct _VMPI_Stream * in
Input streams.
int VMPI_get_new_tag()
Book a tag on VMPI's communicator for this process.
char name[200]
Name of the partition (could be NONE)
int VMPI_Stream_test_write(VMPI_Stream *st)
Is this stream ready for writing ?
static MPI_Comm VMPI_Get_vmpi_comm()
Get VMPI's copy of MPI_COMM_WORLD.
struct _VMPI_Stream * out
Output streams.
int out_count
Number of outputs.
MPI_Request w_req[VMPI_STREAM_MAX_ASYNC]
Output requests.
int VMPI_Stream_open_map(VMPI_Stream *st, VMPI_Map *map, char *mode)
Open a stream according to a mapping.
int VMPI_Stream_test_read(VMPI_Stream *st)
Is this stream ready for read ?
MPI_Status w_stat[VMPI_STREAM_MAX_ASYNC]
Output statuses.
Send or receive from a random one.
int current_out
Current output stream.
int MALP_Spinlock_unlock(MALP_Spinlock *mutex)
Unlocks the given MALP_Spinlock.
int w_req_count
Number of output requests.
int * r_current_buff
Current buffer.
MALP_Spinlock lock
Spinlock.
Structure holding partitions descriptions.
Temporary stream description struct.
int rank
The rank of the stream.
VMPI_Stream_mode mode
the mode
Internal declaration of a stream endpoint.
VMPI_Stream_load_balance
Defines the load ballancing modes available to MPI_Streams.
int in_count
Number of inputs.
int MALP_Spinlock_lock(MALP_Spinlock *mutex)
Locks the given MALP_Spinlock.
VMPI_Stream_mode
Defines MPI_Stream mode.
int local_tag
Local unique tag.
int id
Unique identifier of the partition.
VMPI_Stream_load_balance lb
Load balancing policy.
int VMPI_Stream_init(VMPI_Stream *st, size_t block_size, VMPI_Stream_load_balance lb)
Initializes a stream should be called before any *open call.
If no data are available or endpoint not ready call might return VMPI_EAGAIN.
VMPI_Stream_blocking
Defines call mode.
MPI_Request * r_req
Input requests.
The call cannot pursue retry.
static VMPI_Partition_desc * VMPI_Get_desc()
Returns current partition's descriptions.
int VMPI_Stream_close(VMPI_Stream *st)
Closes a stream and sends EOF to remote endpoints.
Send or receive from each turn by turn.
int VMPI_Stream_join(VMPI_Stream *dest, VMPI_Stream *src)
Join two streams in the same set.
int state
Been closed or not.
int VMPI_Stream_open(VMPI_Stream *st, int dest, char *smode)
Open a stream to dest rank, dest should do the symmetrical call.
int VMPI_Stream_read(VMPI_Stream *st, char *buff, unsigned int block_count, VMPI_Stream_blocking block)
Read data from a VMPI_Stream.
int VMPI_Stream_push_stream(VMPI_Stream *st, int dest, int remote_tag, int local_tag, VMPI_Stream_mode mode)
pushes a stream
char * w_tmp_buffers[VMPI_STREAM_MAX_ASYNC]
Output buffers.
int current_in
Current input stream.
MPI_Status * r_stat
Input statuses.
int VMPI_Stream_write(VMPI_Stream *st, char *buff, unsigned int block_count)
Write data to a VMPI_Stream.
int count
Number of ranks.
int remote_tag
Remote unique tag.
Always send or receive from the same.
size_t block_size
Size of a data block.
#define VMPI_NG_ARBITRARY_VALUE
Defines of magic values.