mirror of https://gitee.com/openkylin/openmpi.git
467 lines
15 KiB
C
467 lines
15 KiB
C
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation. All rights reserved.
 * Copyright (c) 2004-2017 The University of Tennessee and The University
 *                         of Tennessee Research Foundation. All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart. All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2013-2018 University of Houston. All rights reserved.
 * Copyright (c) 2018      Research Organization for Information Science
 *                         and Technology (RIST). All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */
|
|
|
|
|
|
#include "ompi_config.h"
|
|
#include "sharedfp_individual.h"
|
|
|
|
#include "mpi.h"
|
|
#include "ompi/constants.h"
|
|
#include "ompi/mca/sharedfp/sharedfp.h"
|
|
#include "ompi/mca/sharedfp/base/base.h"
|
|
#include "ompi/mca/common/ompio/common_ompio.h"
|
|
|
|
#include <stdlib.h>
|
|
#include <stdio.h>
|
|
|
|
/**
 * Merge the records accumulated by every process into the main file.
 *
 * Steps, as implemented below:
 *   1. gather the per-process record counts (allgather);
 *   2. gather all timestamps and record lengths (allgatherv);
 *   3. sort the gathered records by timestamp and turn the record lengths
 *      into absolute offsets, starting at sh->global_offset;
 *   4. copy every local record from the local data file to its assigned
 *      offset in the main file.
 *
 * @param sh        shared file pointer data. sh->selected_module_data must
 *                  hold the header record; sh->global_offset is updated to
 *                  the value returned by
 *                  mca_sharedfp_individual_assign_globaloffset().
 * @param ompio_fh  handle of the main file; its communicator (f_comm),
 *                  size (f_size) and rank (f_rank) drive the collectives.
 *
 * @return OMPI_SUCCESS on success (also when no process has any record),
 *         OMPI_ERROR if the header record is missing or a local record
 *         cannot be found in the sorted list,
 *         OMPI_ERR_OUT_OF_RESOURCE on allocation failure, or the error
 *         code returned by a failing collective / file operation.
 */
int mca_sharedfp_individual_collaborate_data(struct mca_sharedfp_base_data_t *sh, ompio_file_t *ompio_fh)
{
    int ret = OMPI_SUCCESS;
    mca_sharedfp_individual_header_record *headnode = NULL;
    char *buff = NULL;
    char *newbuff = NULL;
    int nodesoneachprocess = 0;
    int idx = 0, i = 0, j = 0, l = 0;
    int *ranks = NULL;
    double *timestampbuff = NULL;
    OMPI_MPI_OFFSET_TYPE *offsetbuff = NULL;
    int *countbuff = NULL;
    int *displ = NULL;
    double *ind_ts = NULL;
    long *ind_recordlength = NULL;
    OMPI_MPI_OFFSET_TYPE *local_off = NULL;
    int totalnodes = 0;
    ompi_status_public_t status;
    long recordlength = 0;   /* current capacity of buff, in bytes */

    headnode = (mca_sharedfp_individual_header_record*)sh->selected_module_data;
    if ( NULL == headnode) {
        opal_output(0, "sharedfp_individual_collaborate_data: headnode is NULL but file is open\n");
        return OMPI_ERROR;
    }

    /* Number of nodes on each process is the sum of records
     * on file and records in the linked list
     */
    nodesoneachprocess = headnode->numofrecordsonfile + headnode->numofrecords;

    if ( mca_sharedfp_individual_verbose ) {
        opal_output(ompi_sharedfp_base_framework.framework_output,
                    "Nodes of each process = %d\n",nodesoneachprocess);
    }

    countbuff = (int*)malloc(ompio_fh->f_size * sizeof(int));
    if ( NULL == countbuff ) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    displ = (int*)malloc(sizeof(int) * ompio_fh->f_size);
    if ( NULL == displ ) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    /* Collect timestamp, length and local position of every local record.
     * The helper always allocates at least one element per array, which is
     * what makes the "0 == nodesoneachprocess" assignments below valid. */
    ret = mca_sharedfp_individual_get_timestamps_and_reclengths ( &ind_ts, &ind_recordlength,
                                                                  &local_off, sh );
    if ( OMPI_SUCCESS != ret ) {
        goto exit;
    }

    /* Every process learns how many records each other process holds */
    ret = ompio_fh->f_comm->c_coll->coll_allgather ( &nodesoneachprocess,
                                                     1,
                                                     MPI_INT,
                                                     countbuff,
                                                     1,
                                                     MPI_INT,
                                                     ompio_fh->f_comm,
                                                     ompio_fh->f_comm->c_coll->coll_allgather_module );
    if ( OMPI_SUCCESS != ret ) {
        goto exit;
    }

    if ( mca_sharedfp_individual_verbose) {
        for (i = 0; i < ompio_fh->f_size ; i++) {
            opal_output(ompi_sharedfp_base_framework.framework_output,"sharedfp_individual_collaborate_data: "
                        "Countbuff[%d] = %d\n", i, countbuff[i]);
        }
    }

    if ( 0 == nodesoneachprocess ) {
        ind_ts[0] = 0;
        ind_recordlength[0] = 0;
        local_off[0] = 0;
    }

    /* Displacements for allgatherv: exclusive prefix sum of the counts */
    for(i = 0; i < ompio_fh->f_size; i++) {
        displ[i] = totalnodes;
        if ( mca_sharedfp_individual_verbose ) {
            opal_output(ompi_sharedfp_base_framework.framework_output,
                        "sharedfp_individual_collaborate_data: displ[%d] = %d\n",i,displ[i]);
        }
        totalnodes = totalnodes + countbuff[i];
    }

    if (totalnodes <= 0 ) {
        /* Nobody has anything to merge; ret is still OMPI_SUCCESS */
        goto exit;
    }

    /* ranks[k] = rank owning the k-th gathered record */
    ranks = (int *) malloc ( totalnodes * sizeof(int));
    if ( NULL == ranks ) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }
    for ( l=0, i=0; i< ompio_fh->f_size; i++ ) {
        for ( j=0; j< countbuff[i]; j++ ) {
            ranks[l++]=i;
        }
    }

    ret = mca_sharedfp_individual_create_buff ( &timestampbuff, &offsetbuff, totalnodes, ompio_fh->f_size);
    if ( OMPI_SUCCESS != ret ) {
        goto exit;
    }

    ret = ompio_fh->f_comm->c_coll->coll_allgatherv ( ind_ts,
                                                      countbuff[ompio_fh->f_rank],
                                                      MPI_DOUBLE,
                                                      timestampbuff,
                                                      countbuff,
                                                      displ,
                                                      MPI_DOUBLE,
                                                      ompio_fh->f_comm,
                                                      ompio_fh->f_comm->c_coll->coll_allgatherv_module );
    if ( OMPI_SUCCESS != ret ) {
        goto exit;
    }

    /* NOTE(review): ind_recordlength is an array of long but is sent as
     * OMPI_OFFSET_DATATYPE; this presumably relies on both having the same
     * size on the supported platforms -- confirm. */
    ret = ompio_fh->f_comm->c_coll->coll_allgatherv ( ind_recordlength,
                                                      countbuff[ompio_fh->f_rank],
                                                      OMPI_OFFSET_DATATYPE,
                                                      offsetbuff,
                                                      countbuff,
                                                      displ,
                                                      OMPI_OFFSET_DATATYPE,
                                                      ompio_fh->f_comm,
                                                      ompio_fh->f_comm->c_coll->coll_allgatherv_module );
    if ( OMPI_SUCCESS != ret ) {
        goto exit;
    }

    ret = mca_sharedfp_individual_sort_timestamps(&timestampbuff, &offsetbuff, &ranks, totalnodes);
    if ( OMPI_SUCCESS != ret ) {
        goto exit;
    }

    /* offsetbuff holds record lengths on entry and absolute offsets in the
     * main file afterwards */
    sh->global_offset = mca_sharedfp_individual_assign_globaloffset ( &offsetbuff, totalnodes, sh);

    /* Staging buffer with 20% head room. Never request 0 bytes: malloc(0)
     * may legitimately return NULL, which would be misreported as an
     * out-of-resource condition on ranks that own no (or empty) records. */
    recordlength = (long) (ind_recordlength[0] * 1.2);
    if ( recordlength < 1 ) {
        recordlength = 1;
    }
    buff = (char * ) malloc( (size_t) recordlength );
    if ( NULL == buff ) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    for (i = 0; i < nodesoneachprocess ; i++) {
        if ( ind_recordlength[i] > recordlength ) {
            recordlength = (long) (ind_recordlength[i] * 1.2);
            /* Keep the old pointer until realloc succeeded so that it is
             * still released at exit on failure. */
            newbuff = (char *) realloc ( buff, (size_t) recordlength );
            if ( NULL == newbuff ) {
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
            buff = newbuff;
        }

        /*Read from the local data file*/
        ret = mca_common_ompio_file_read_at ( headnode->datafilehandle,
                                              local_off[i], buff, ind_recordlength[i],
                                              MPI_BYTE, &status);
        if ( OMPI_SUCCESS != ret ) {
            goto exit;
        }

        /* Position of this record in the globally sorted list */
        idx = mca_sharedfp_individual_getoffset(ind_ts[i],timestampbuff, ranks, ompio_fh->f_rank, totalnodes);
        if ( idx < 0 ) {
            /* -1 means "not found"; indexing offsetbuff with it would read
             * out of bounds */
            opal_output(0, "sharedfp_individual_collaborate_data: record not found in sorted timestamp list\n");
            ret = OMPI_ERROR;
            goto exit;
        }

        if ( mca_sharedfp_individual_verbose ) {
            opal_output(ompi_sharedfp_base_framework.framework_output,
                        "sharedfp_individual_collaborate_data: Process %d writing %ld bytes to main file at position"
                        "%lld (%d)\n", ompio_fh->f_rank, ind_recordlength[i], (long long) offsetbuff[idx], idx);
        }

        /*Write into main data file*/
        ret = mca_common_ompio_file_write_at( ompio_fh, offsetbuff[idx], buff,
                                              ind_recordlength[i], MPI_BYTE, &status);
        if ( OMPI_SUCCESS != ret ) {
            goto exit;
        }
    }

exit:
    /* free(NULL) is a no-op, so no guards are needed */
    free ( countbuff );
    free ( displ );
    free ( timestampbuff );
    free ( offsetbuff );
    free ( ind_ts );
    free ( ind_recordlength );
    free ( local_off );
    free ( buff );
    free ( ranks );

    return ret;
}
|
|
|
|
/* Count the number of nodes and create and array of the timestamps*/
|
|
int mca_sharedfp_individual_get_timestamps_and_reclengths ( double **buff, long **rec_length,
|
|
MPI_Offset **offbuff,struct mca_sharedfp_base_data_t *sh)
|
|
{
|
|
int num = 0, i= 0, ctr = 0;
|
|
int ret=OMPI_SUCCESS;
|
|
mca_sharedfp_individual_metadata_node *currnode;
|
|
mca_sharedfp_individual_header_record *headnode;
|
|
OMPI_MPI_OFFSET_TYPE metaoffset = 0;
|
|
struct mca_sharedfp_individual_record2 rec;
|
|
MPI_Status status;
|
|
|
|
headnode = (mca_sharedfp_individual_header_record*)(sh->selected_module_data);
|
|
num = ( headnode->numofrecords + headnode->numofrecordsonfile);
|
|
currnode = headnode->next;
|
|
|
|
if ( mca_sharedfp_individual_verbose ) {
|
|
opal_output(ompi_sharedfp_base_framework.framework_output,"Num is %d\n",num);
|
|
}
|
|
|
|
if ( 0 == num ) {
|
|
*buff = (double*) malloc ( sizeof ( double ));
|
|
*rec_length = (long *) malloc ( sizeof ( long ));
|
|
*offbuff = (OMPI_MPI_OFFSET_TYPE *)malloc ( sizeof(OMPI_MPI_OFFSET_TYPE) );
|
|
if ( NULL == *buff || NULL == *rec_length || NULL == *offbuff ) {
|
|
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
|
goto exit;
|
|
}
|
|
}
|
|
else {
|
|
*buff = (double* ) malloc(sizeof ( double) * num);
|
|
*rec_length = (long *) malloc(sizeof ( long) * num);
|
|
*offbuff = (OMPI_MPI_OFFSET_TYPE *) malloc ( sizeof(OMPI_MPI_OFFSET_TYPE) * num);
|
|
if ( NULL == *buff || NULL == *rec_length || NULL == *offbuff ) {
|
|
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
|
goto exit;
|
|
}
|
|
}
|
|
|
|
if ( mca_sharedfp_individual_verbose ) {
|
|
opal_output(ompi_sharedfp_base_framework.framework_output,
|
|
"sharedfp_individual_get_timestamps_and_reclengths: Numofrecords on file %d\n",
|
|
headnode->numofrecordsonfile);
|
|
}
|
|
|
|
if (headnode->numofrecordsonfile > 0) {
|
|
metaoffset = headnode->metafile_start_offset;
|
|
ctr = 0;
|
|
for (i = 0; i < headnode->numofrecordsonfile ; i++) {
|
|
|
|
ret = mca_common_ompio_file_read_at(headnode->metadatafilehandle,metaoffset,
|
|
&rec, 32, MPI_BYTE,&status);
|
|
if ( OMPI_SUCCESS != ret ) {
|
|
goto exit;
|
|
}
|
|
|
|
*(*rec_length + ctr) = rec.recordlength;
|
|
*(*buff + ctr) = rec.timestamp;
|
|
*(*offbuff + ctr) = rec.localposition;
|
|
|
|
metaoffset = metaoffset + sizeof(struct mca_sharedfp_individual_record2);
|
|
|
|
if ( mca_sharedfp_individual_verbose ) {
|
|
opal_output(ompi_sharedfp_base_framework.framework_output,
|
|
"sharedfp_individual_get_timestamps_and_reclengths: Ctr = %d\n",ctr);
|
|
}
|
|
ctr++;
|
|
}
|
|
|
|
headnode->numofrecordsonfile = 0;
|
|
headnode->metafile_start_offset = metaoffset;
|
|
|
|
} /* End of if (headnode->numofrecordsonfile > 0) */
|
|
|
|
/* Add the records from the linked list */
|
|
currnode = headnode->next;
|
|
while (currnode) {
|
|
if ( mca_sharedfp_individual_verbose ) {
|
|
opal_output(ompi_sharedfp_base_framework.framework_output,"Ctr = %d\n",ctr);
|
|
}
|
|
/* Some error over here..need to check this code again */
|
|
/*while(headnode->next != NULL)*/
|
|
|
|
*(*rec_length + ctr) = currnode->recordlength;
|
|
*(*buff + ctr) = currnode->timestamp;
|
|
*(*offbuff + ctr) = currnode->localposition;
|
|
|
|
ctr = ctr + 1;
|
|
|
|
headnode->next = currnode->next;
|
|
if ( mca_sharedfp_individual_verbose ) {
|
|
opal_output(ompi_sharedfp_base_framework.framework_output,
|
|
"sharedfp_individual_get_timestamps_and_reclengths: node deleted from the metadatalinked list\n");
|
|
}
|
|
free(currnode);
|
|
currnode = headnode->next;
|
|
|
|
} /*End of while(currnode) loop*/
|
|
|
|
|
|
/*Reset the numofrecords*/
|
|
headnode->numofrecords = 0;
|
|
|
|
exit:
|
|
|
|
return ret;
|
|
}
|
|
|
|
/**
 * Allocate the buffers that receive the gathered timestamps and offsets.
 *
 * @param ts          out: array of totalnodes doubles.
 * @param off         out: array of totalnodes offsets.
 * @param totalnodes  number of elements to allocate; when 0 nothing is
 *                    allocated and *ts / *off are left untouched.
 * @param size        unused; kept for interface compatibility.
 *
 * On success the caller owns both arrays and must free() them. On failure
 * no allocation made by this function is left behind: *off is released and
 * reset to NULL if the timestamp array cannot be allocated.
 *
 * @return OMPI_SUCCESS or OMPI_ERR_OUT_OF_RESOURCE.
 */
int mca_sharedfp_individual_create_buff(double **ts,MPI_Offset **off,int totalnodes, int size)
{
    (void) size;

    if ( 0 == totalnodes ) {
        return OMPI_SUCCESS;
    }

    *off = (OMPI_MPI_OFFSET_TYPE *) malloc ( totalnodes * sizeof(OMPI_MPI_OFFSET_TYPE));
    if ( NULL == *off ) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    *ts = (double *) malloc ( totalnodes * sizeof(double) );
    if ( NULL == *ts ) {
        /* Do not report failure while leaving a half-built result behind */
        free ( *off );
        *off = NULL;
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    return OMPI_SUCCESS;
}
|
|
|
|
/*Sort the timestamp buffer*/
|
|
int mca_sharedfp_individual_sort_timestamps(double **ts, MPI_Offset **off, int **ranks, int totalnodes)
|
|
{
|
|
|
|
int i = 0;
|
|
int j = 0;
|
|
int flag = 1;
|
|
double tempts = 0.0;
|
|
OMPI_MPI_OFFSET_TYPE tempoffset = 0;
|
|
int temprank = 0;
|
|
|
|
for (i= 1; (i <= totalnodes)&&(flag) ; i++) {
|
|
flag = 0;
|
|
for (j = 0; j < (totalnodes - 1); j++) {
|
|
if ( *(*ts + j + 1) < *(*ts + j )) {
|
|
/*swap timestamp*/
|
|
tempts = *(*ts + j );
|
|
*(*ts + j) = *(*ts + j + 1);
|
|
*(*ts + j + 1) = tempts;
|
|
|
|
/*swap offset*/
|
|
tempoffset = *(*off + j);
|
|
*(*off + j) = *(*off + j + 1);
|
|
*(*off + j + 1) = tempoffset;
|
|
|
|
/*swap ranks*/
|
|
temprank = *(*ranks + j);
|
|
*(*ranks + j) = *(*ranks + j + 1);
|
|
*(*ranks + j + 1) = temprank;
|
|
|
|
flag = 1;
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
|
|
/**
 * Replace every entry of *offsetbuff by the running sum of the entries
 * preceding it, starting at sh->global_offset.
 *
 * The caller in this file passes the (timestamp-sorted) record lengths, so
 * on return entry i is the absolute position of record i in the main file.
 *
 * @param offsetbuff  in: lengths; out: start offsets (totalnodes entries).
 * @param totalnodes  number of entries. When <= 0 the array is not touched.
 * @param sh          supplies the starting offset (sh->global_offset); it
 *                    is only read here.
 *
 * @return the offset following the last entry, i.e. sh->global_offset plus
 *         the sum of all input entries (sh->global_offset itself for an
 *         empty list).
 */
MPI_Offset mca_sharedfp_individual_assign_globaloffset(MPI_Offset **offsetbuff,int totalnodes,
                                                       struct mca_sharedfp_base_data_t *sh)
{
    int i = 0;
    OMPI_MPI_OFFSET_TYPE length = 0;
    OMPI_MPI_OFFSET_TYPE next_offset = sh->global_offset;

    /* A running sum avoids looking at element [i - 1], which would be out
     * of bounds after the loop when totalnodes is 0. */
    for (i = 0; i < totalnodes; i++) {
        length = (*offsetbuff)[i];
        (*offsetbuff)[i] = next_offset;
        next_offset = next_offset + length;
    }

    return next_offset;
}
|
|
|
|
|
|
/**
 * Find the position of a record in the gathered record list.
 *
 * @param timestamp   timestamp of the record to look for.
 * @param ts          timestamps of all records (totalnodes entries).
 * @param ranks       owning rank of each record (totalnodes entries).
 * @param myrank      rank that owns the record to look for.
 * @param totalnodes  number of entries in ts and ranks.
 *
 * @return index of the first entry whose timestamp equals `timestamp`
 *         (exact comparison) and whose rank equals `myrank`, or -1 if
 *         there is no such entry or the list is empty.
 */
int mca_sharedfp_individual_getoffset(double timestamp, double *ts, int *ranks, int myrank, int totalnodes)
{
    int i = 0;

    /* Bounded scan: nothing is read when totalnodes <= 0 */
    for (i = 0; i < totalnodes; i++) {
        if (ts[i] == timestamp && ranks[i] == myrank) {
            return i;
        }
    }

    return -1;
}
|