forked from openkylin/openmpi
570 lines
20 KiB
C
570 lines
20 KiB
C
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
|
|
/*
|
|
* Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
|
|
* University Research and Technology
|
|
* Corporation. All rights reserved.
|
|
* Copyright (c) 2004-2005 The University of Tennessee and The University
|
|
* of Tennessee Research Foundation. All rights
|
|
* reserved.
|
|
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
|
* University of Stuttgart. All rights reserved.
|
|
* Copyright (c) 2004-2005 The Regents of the University of California.
|
|
* All rights reserved.
|
|
* Copyright (c) 2011-2016 Los Alamos National Security, LLC. All rights
|
|
* reserved.
|
|
* Copyright (c) 2016-2018 Intel, Inc. All rights reserved.
|
|
* Copyright (c) 2017 Research Organization for Information Science
|
|
* and Technology (RIST). All rights reserved.
|
|
* $COPYRIGHT$
|
|
*
|
|
* Additional copyrights may follow
|
|
*
|
|
* $HEADER$
|
|
*/
|
|
/** @file:
|
|
*
|
|
*/
|
|
|
|
/*
|
|
* includes
|
|
*/
|
|
#include "orte_config.h"
|
|
|
|
|
|
#include "opal/dss/dss.h"
|
|
|
|
#include "orte/util/compress.h"
|
|
#include "orte/util/proc_info.h"
|
|
#include "orte/util/error_strings.h"
|
|
#include "orte/mca/errmgr/errmgr.h"
|
|
#include "orte/mca/odls/base/base.h"
|
|
#include "orte/mca/rmaps/rmaps_types.h"
|
|
#include "orte/mca/rml/rml.h"
|
|
#include "orte/mca/routed/routed.h"
|
|
#include "orte/mca/state/state.h"
|
|
#include "orte/util/name_fns.h"
|
|
#include "orte/util/threads.h"
|
|
#include "orte/runtime/orte_globals.h"
|
|
|
|
#include "orte/mca/grpcomm/grpcomm.h"
|
|
#include "orte/mca/grpcomm/base/base.h"
|
|
|
|
static int pack_xcast(orte_grpcomm_signature_t *sig,
|
|
opal_buffer_t *buffer,
|
|
opal_buffer_t *message,
|
|
orte_rml_tag_t tag);
|
|
|
|
static int create_dmns(orte_grpcomm_signature_t *sig,
|
|
orte_vpid_t **dmns, size_t *ndmns);
|
|
|
|
typedef struct {
|
|
opal_object_t super;
|
|
opal_event_t ev;
|
|
orte_grpcomm_signature_t *sig;
|
|
opal_buffer_t *buf;
|
|
orte_grpcomm_cbfunc_t cbfunc;
|
|
void *cbdata;
|
|
} orte_grpcomm_caddy_t;
|
|
static void gccon(orte_grpcomm_caddy_t *p)
|
|
{
|
|
p->sig = NULL;
|
|
p->buf = NULL;
|
|
p->cbfunc = NULL;
|
|
p->cbdata = NULL;
|
|
}
|
|
static void gcdes(orte_grpcomm_caddy_t *p)
|
|
{
|
|
if (NULL != p->buf) {
|
|
OBJ_RELEASE(p->buf);
|
|
}
|
|
}
|
|
static OBJ_CLASS_INSTANCE(orte_grpcomm_caddy_t,
|
|
opal_object_t,
|
|
gccon, gcdes);
|
|
|
|
int orte_grpcomm_API_xcast(orte_grpcomm_signature_t *sig,
|
|
orte_rml_tag_t tag,
|
|
opal_buffer_t *msg)
|
|
{
|
|
int rc = ORTE_ERROR;
|
|
opal_buffer_t *buf;
|
|
orte_grpcomm_base_active_t *active;
|
|
orte_vpid_t *dmns;
|
|
size_t ndmns;
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base:xcast sending %u bytes to tag %ld",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
(NULL == msg) ? 0 : (unsigned int)msg->bytes_used, (long)tag));
|
|
|
|
/* this function does not access any framework-global data, and
|
|
* so it does not require us to push it into the event library */
|
|
|
|
/* prep the output buffer */
|
|
buf = OBJ_NEW(opal_buffer_t);
|
|
|
|
/* create the array of participating daemons */
|
|
if (ORTE_SUCCESS != (rc = create_dmns(sig, &dmns, &ndmns))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
OBJ_RELEASE(buf);
|
|
return rc;
|
|
}
|
|
|
|
/* setup the payload */
|
|
if (ORTE_SUCCESS != (rc = pack_xcast(sig, buf, msg, tag))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
OBJ_RELEASE(buf);
|
|
if (NULL != dmns) {
|
|
free(dmns);
|
|
}
|
|
return rc;
|
|
}
|
|
|
|
/* cycle thru the actives and see who can send it */
|
|
OPAL_LIST_FOREACH(active, &orte_grpcomm_base.actives, orte_grpcomm_base_active_t) {
|
|
if (NULL != active->module->xcast) {
|
|
if (ORTE_SUCCESS == (rc = active->module->xcast(dmns, ndmns, buf))) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
OBJ_RELEASE(buf); // if the module needs to keep the buf, it should OBJ_RETAIN it
|
|
if (NULL != dmns) {
|
|
free(dmns);
|
|
}
|
|
return rc;
|
|
}
|
|
|
|
static void allgather_stub(int fd, short args, void *cbdata)
|
|
{
|
|
orte_grpcomm_caddy_t *cd = (orte_grpcomm_caddy_t*)cbdata;
|
|
int ret = OPAL_SUCCESS;
|
|
int rc;
|
|
orte_grpcomm_base_active_t *active;
|
|
orte_grpcomm_coll_t *coll;
|
|
uint32_t *seq_number;
|
|
|
|
ORTE_ACQUIRE_OBJECT(cd);
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base:allgather stub",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
/* retrieve an existing tracker, create it if not
|
|
* already found. The allgather module is responsible
|
|
* for releasing it upon completion of the collective */
|
|
ret = opal_hash_table_get_value_ptr(&orte_grpcomm_base.sig_table, (void *)cd->sig->signature, cd->sig->sz * sizeof(orte_process_name_t), (void **)&seq_number);
|
|
if (OPAL_ERR_NOT_FOUND == ret) {
|
|
seq_number = (uint32_t *)malloc(sizeof(uint32_t));
|
|
*seq_number = 0;
|
|
} else if (OPAL_SUCCESS == ret) {
|
|
*seq_number = *seq_number + 1;
|
|
} else {
|
|
OPAL_OUTPUT((orte_grpcomm_base_framework.framework_output,
|
|
"%s rpcomm:base:allgather cannot get signature from hash table",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
ORTE_ERROR_LOG(ret);
|
|
OBJ_RELEASE(cd);
|
|
return;
|
|
}
|
|
ret = opal_hash_table_set_value_ptr(&orte_grpcomm_base.sig_table, (void *)cd->sig->signature, cd->sig->sz * sizeof(orte_process_name_t), (void *)seq_number);
|
|
if (OPAL_SUCCESS != ret) {
|
|
OPAL_OUTPUT((orte_grpcomm_base_framework.framework_output,
|
|
"%s rpcomm:base:allgather cannot add new signature to hash table",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
ORTE_ERROR_LOG(ret);
|
|
OBJ_RELEASE(cd);
|
|
return;
|
|
}
|
|
coll = orte_grpcomm_base_get_tracker(cd->sig, true);
|
|
if (NULL == coll) {
|
|
OBJ_RELEASE(cd->sig);
|
|
OBJ_RELEASE(cd);
|
|
return;
|
|
}
|
|
OBJ_RELEASE(cd->sig);
|
|
coll->cbfunc = cd->cbfunc;
|
|
coll->cbdata = cd->cbdata;
|
|
|
|
/* cycle thru the actives and see who can process it */
|
|
OPAL_LIST_FOREACH(active, &orte_grpcomm_base.actives, orte_grpcomm_base_active_t) {
|
|
if (NULL != active->module->allgather) {
|
|
if (ORTE_SUCCESS == (rc = active->module->allgather(coll, cd->buf))) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
OBJ_RELEASE(cd);
|
|
}
|
|
|
|
int orte_grpcomm_API_allgather(orte_grpcomm_signature_t *sig,
|
|
opal_buffer_t *buf,
|
|
orte_grpcomm_cbfunc_t cbfunc,
|
|
void *cbdata)
|
|
{
|
|
orte_grpcomm_caddy_t *cd;
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base:allgather",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
/* must push this into the event library to ensure we can
|
|
* access framework-global data safely */
|
|
cd = OBJ_NEW(orte_grpcomm_caddy_t);
|
|
/* ensure the data doesn't go away */
|
|
OBJ_RETAIN(buf);
|
|
opal_dss.copy((void **)&cd->sig, (void *)sig, ORTE_SIGNATURE);
|
|
cd->buf = buf;
|
|
cd->cbfunc = cbfunc;
|
|
cd->cbdata = cbdata;
|
|
opal_event_set(orte_event_base, &cd->ev, -1, OPAL_EV_WRITE, allgather_stub, cd);
|
|
opal_event_set_priority(&cd->ev, ORTE_MSG_PRI);
|
|
ORTE_POST_OBJECT(cd);
|
|
opal_event_active(&cd->ev, OPAL_EV_WRITE, 1);
|
|
return ORTE_SUCCESS;
|
|
}
|
|
|
|
orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig, bool create)
|
|
{
|
|
orte_grpcomm_coll_t *coll;
|
|
int rc;
|
|
orte_namelist_t *nm;
|
|
opal_list_t children;
|
|
size_t n;
|
|
char *routed;
|
|
|
|
/* search the existing tracker list to see if this already exists */
|
|
OPAL_LIST_FOREACH(coll, &orte_grpcomm_base.ongoing, orte_grpcomm_coll_t) {
|
|
if (NULL == sig->signature) {
|
|
if (NULL == coll->sig->signature) {
|
|
/* only one collective can operate at a time
|
|
* across every process in the system */
|
|
return coll;
|
|
}
|
|
/* if only one is NULL, then we can't possibly match */
|
|
break;
|
|
}
|
|
if (OPAL_EQUAL == (rc = opal_dss.compare(sig, coll->sig, ORTE_SIGNATURE))) {
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base:returning existing collective",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
return coll;
|
|
}
|
|
}
|
|
/* if we get here, then this is a new collective - so create
|
|
* the tracker for it */
|
|
if (!create) {
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base: not creating new coll",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
return NULL;
|
|
}
|
|
coll = OBJ_NEW(orte_grpcomm_coll_t);
|
|
opal_dss.copy((void **)&coll->sig, (void *)sig, ORTE_SIGNATURE);
|
|
|
|
if (1 < opal_output_get_verbosity(orte_grpcomm_base_framework.framework_output)) {
|
|
char *tmp=NULL;
|
|
(void)opal_dss.print(&tmp, NULL, coll->sig, ORTE_SIGNATURE);
|
|
opal_output(0, "%s grpcomm:base: creating new coll for%s",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tmp);
|
|
free(tmp);
|
|
}
|
|
|
|
opal_list_append(&orte_grpcomm_base.ongoing, &coll->super);
|
|
|
|
/* now get the daemons involved */
|
|
if (ORTE_SUCCESS != (rc = create_dmns(sig, &coll->dmns, &coll->ndmns))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
return NULL;
|
|
}
|
|
|
|
/* get the routed module for our conduit */
|
|
routed = orte_rml.get_routed(orte_coll_conduit);
|
|
if (NULL == routed) {
|
|
/* this conduit is not routed, so we expect all daemons
|
|
* to directly participate */
|
|
coll->nexpected = coll->ndmns;
|
|
} else {
|
|
/* cycle thru the array of daemons and compare them to our
|
|
* children in the routing tree, counting the ones that match
|
|
* so we know how many daemons we should receive contributions from */
|
|
OBJ_CONSTRUCT(&children, opal_list_t);
|
|
orte_routed.get_routing_list(routed, &children);
|
|
while (NULL != (nm = (orte_namelist_t*)opal_list_remove_first(&children))) {
|
|
for (n=0; n < coll->ndmns; n++) {
|
|
if (nm->name.vpid == coll->dmns[n]) {
|
|
coll->nexpected++;
|
|
break;
|
|
}
|
|
}
|
|
OBJ_RELEASE(nm);
|
|
}
|
|
OPAL_LIST_DESTRUCT(&children);
|
|
|
|
/* see if I am in the array of participants - note that I may
|
|
* be in the rollup tree even though I'm not participating
|
|
* in the collective itself */
|
|
for (n=0; n < coll->ndmns; n++) {
|
|
if (coll->dmns[n] == ORTE_PROC_MY_NAME->vpid) {
|
|
coll->nexpected++;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
return coll;
|
|
}
|
|
|
|
static int create_dmns(orte_grpcomm_signature_t *sig,
|
|
orte_vpid_t **dmns, size_t *ndmns)
|
|
{
|
|
size_t n;
|
|
orte_job_t *jdata;
|
|
orte_proc_t *proc;
|
|
orte_node_t *node;
|
|
int i;
|
|
opal_list_t ds;
|
|
orte_namelist_t *nm;
|
|
orte_vpid_t vpid;
|
|
bool found;
|
|
size_t nds;
|
|
orte_vpid_t *dns;
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base:create_dmns called with %s signature",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
(NULL == sig->signature) ? "NULL" : "NON-NULL"));
|
|
|
|
/* if NULL == procs, or the target jobid is our own,
|
|
* then all daemons are participating */
|
|
if (NULL == sig->signature || ORTE_PROC_MY_NAME->jobid == sig->signature[0].jobid) {
|
|
*ndmns = orte_process_info.num_procs;
|
|
*dmns = NULL;
|
|
return ORTE_SUCCESS;
|
|
}
|
|
|
|
if (ORTE_VPID_WILDCARD == sig->signature[0].vpid) {
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base:create_dmns called for all procs in job %s",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_JOBID_PRINT(sig->signature[0].jobid)));
|
|
/* all daemons hosting this jobid are participating */
|
|
if (NULL == (jdata = orte_get_job_data_object(sig->signature[0].jobid))) {
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
|
|
*ndmns = 0;
|
|
*dmns = NULL;
|
|
return ORTE_ERR_NOT_FOUND;
|
|
}
|
|
if (NULL == jdata->map || 0 == jdata->map->num_nodes) {
|
|
/* we haven't generated a job map yet - if we are the HNP,
|
|
* then we should only involve ourselves. Otherwise, we have
|
|
* no choice but to abort to avoid hangs */
|
|
if (ORTE_PROC_IS_HNP) {
|
|
dns = (orte_vpid_t*)malloc(sizeof(vpid));
|
|
dns[0] = ORTE_PROC_MY_NAME->vpid;
|
|
*ndmns = 1;
|
|
*dmns = dns;
|
|
return ORTE_SUCCESS;
|
|
}
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
|
|
*ndmns = 0;
|
|
*dmns = NULL;
|
|
return ORTE_ERR_NOT_FOUND;
|
|
}
|
|
dns = (orte_vpid_t*)malloc(jdata->map->num_nodes * sizeof(vpid));
|
|
nds = 0;
|
|
for (i=0; i < jdata->map->nodes->size && (int)nds < jdata->map->num_nodes; i++) {
|
|
if (NULL == (node = opal_pointer_array_get_item(jdata->map->nodes, i))) {
|
|
continue;
|
|
}
|
|
if (NULL == node->daemon) {
|
|
/* should never happen */
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
free(dns);
|
|
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
|
|
*ndmns = 0;
|
|
*dmns = NULL;
|
|
return ORTE_ERR_NOT_FOUND;
|
|
}
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base:create_dmns adding daemon %s to array",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_NAME_PRINT(&node->daemon->name)));
|
|
dns[nds++] = node->daemon->name.vpid;
|
|
}
|
|
} else {
|
|
/* lookup the daemon for each proc and add it to the list, checking to
|
|
* ensure any daemon only gets added once. Yes, this isn't a scalable
|
|
* algo - someone can come up with something better! */
|
|
OBJ_CONSTRUCT(&ds, opal_list_t);
|
|
for (n=0; n < sig->sz; n++) {
|
|
if (NULL == (jdata = orte_get_job_data_object(sig->signature[n].jobid))) {
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
OPAL_LIST_DESTRUCT(&ds);
|
|
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
|
|
*ndmns = 0;
|
|
*dmns = NULL;
|
|
return ORTE_ERR_NOT_FOUND;
|
|
}
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
|
|
"%s sign: GETTING PROC OBJECT FOR %s",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_NAME_PRINT(&sig->signature[n])));
|
|
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, sig->signature[n].vpid))) {
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
OPAL_LIST_DESTRUCT(&ds);
|
|
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
|
|
*ndmns = 0;
|
|
*dmns = NULL;
|
|
return ORTE_ERR_NOT_FOUND;
|
|
}
|
|
if (NULL == proc->node || NULL == proc->node->daemon) {
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
OPAL_LIST_DESTRUCT(&ds);
|
|
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
|
|
*ndmns = 0;
|
|
*dmns = NULL;
|
|
return ORTE_ERR_NOT_FOUND;
|
|
}
|
|
vpid = proc->node->daemon->name.vpid;
|
|
found = false;
|
|
OPAL_LIST_FOREACH(nm, &ds, orte_namelist_t) {
|
|
if (nm->name.vpid == vpid) {
|
|
found = true;
|
|
break;
|
|
}
|
|
}
|
|
if (!found) {
|
|
nm = OBJ_NEW(orte_namelist_t);
|
|
nm->name.vpid = vpid;
|
|
opal_list_append(&ds, &nm->super);
|
|
}
|
|
}
|
|
if (0 == opal_list_get_size(&ds)) {
|
|
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
|
|
OPAL_LIST_DESTRUCT(&ds);
|
|
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
|
|
*ndmns = 0;
|
|
*dmns = NULL;
|
|
return ORTE_ERR_NOT_FOUND;
|
|
}
|
|
dns = (orte_vpid_t*)malloc(opal_list_get_size(&ds) * sizeof(orte_vpid_t));
|
|
nds = 0;
|
|
while (NULL != (nm = (orte_namelist_t*)opal_list_remove_first(&ds))) {
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base:create_dmns adding daemon %s to array",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_NAME_PRINT(&nm->name)));
|
|
dns[nds++] = nm->name.vpid;
|
|
OBJ_RELEASE(nm);
|
|
}
|
|
OPAL_LIST_DESTRUCT(&ds);
|
|
}
|
|
*dmns = dns;
|
|
*ndmns = nds;
|
|
return ORTE_SUCCESS;
|
|
}
|
|
|
|
static int pack_xcast(orte_grpcomm_signature_t *sig,
|
|
opal_buffer_t *buffer,
|
|
opal_buffer_t *message,
|
|
orte_rml_tag_t tag)
|
|
{
|
|
int rc;
|
|
opal_buffer_t data;
|
|
int8_t flag;
|
|
uint8_t *cmpdata;
|
|
size_t cmplen;
|
|
|
|
/* setup an intermediate buffer */
|
|
OBJ_CONSTRUCT(&data, opal_buffer_t);
|
|
|
|
/* pass along the signature */
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&data, &sig, 1, ORTE_SIGNATURE))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
OBJ_DESTRUCT(&data);
|
|
return rc;
|
|
}
|
|
/* pass the final tag */
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&data, &tag, 1, ORTE_RML_TAG))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
OBJ_DESTRUCT(&data);
|
|
return rc;
|
|
}
|
|
|
|
/* copy the payload into the new buffer - this is non-destructive, so our
|
|
* caller is still responsible for releasing any memory in the buffer they
|
|
* gave to us
|
|
*/
|
|
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&data, message))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
OBJ_DESTRUCT(&data);
|
|
return rc;
|
|
}
|
|
|
|
/* see if we want to compress this message */
|
|
if (orte_util_compress_block((uint8_t*)data.base_ptr, data.bytes_used,
|
|
&cmpdata, &cmplen)) {
|
|
/* the data was compressed - mark that we compressed it */
|
|
flag = 1;
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &flag, 1, OPAL_INT8))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
free(cmpdata);
|
|
OBJ_DESTRUCT(&data);
|
|
return rc;
|
|
}
|
|
/* pack the compressed length */
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &cmplen, 1, OPAL_SIZE))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
free(cmpdata);
|
|
OBJ_DESTRUCT(&data);
|
|
return rc;
|
|
}
|
|
/* pack the uncompressed length */
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &data.bytes_used, 1, OPAL_SIZE))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
free(cmpdata);
|
|
OBJ_DESTRUCT(&data);
|
|
return rc;
|
|
}
|
|
/* pack the compressed info */
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, cmpdata, cmplen, OPAL_UINT8))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
free(cmpdata);
|
|
OBJ_DESTRUCT(&data);
|
|
return rc;
|
|
}
|
|
OBJ_DESTRUCT(&data);
|
|
free(cmpdata);
|
|
} else {
|
|
/* mark that it was not compressed */
|
|
flag = 0;
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &flag, 1, OPAL_INT8))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
OBJ_DESTRUCT(&data);
|
|
free(cmpdata);
|
|
return rc;
|
|
}
|
|
/* transfer the payload across */
|
|
opal_dss.copy_payload(buffer, &data);
|
|
OBJ_DESTRUCT(&data);
|
|
}
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"MSG SIZE: %lu", buffer->bytes_used));
|
|
return ORTE_SUCCESS;
|
|
}
|
|
|
|
void orte_grpcomm_base_mark_distance_recv (orte_grpcomm_coll_t *coll,
|
|
uint32_t distance) {
|
|
opal_bitmap_set_bit (&coll->distance_mask_recv, distance);
|
|
}
|
|
|
|
unsigned int orte_grpcomm_base_check_distance_recv (orte_grpcomm_coll_t *coll,
|
|
uint32_t distance) {
|
|
return opal_bitmap_is_set_bit (&coll->distance_mask_recv, distance);
|
|
}
|