openmpi/opal/mca/btl/openib/connect/btl_openib_connect_udcm.c


/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2007-2013 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2008-2009 Mellanox Technologies. All rights reserved.
* Copyright (c) 2009 IBM Corporation. All rights reserved.
* Copyright (c) 2011-2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2014-2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014 Bull SAS. All rights reserved.
* Copyright (c) 2016 Mellanox Technologies. All rights reserved.
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/*
 * The UD connection module creates and listens on an unconnected
 * datagram (UD) queue pair (QP) for connection requests.
*
* There are two ways an RC connection can be established by UD:
* 1. One side starts a connection and the request is received before
* the receiving side starts a connection. (One sided)
* 2. Both sides send a request before either receives a request.
 * (Simultaneous).
*
* The protocol for case 1 looks like:
* peer1 peer2
* | |
* CONNECT |------>|
* | | move QPs to RTS
* | | post rc receive
* |<------| CONNECT
* move QPs to RTS | |
* post rc send | |
* |<------| COMPLETE
* COMPLETE |------>|
*
* The protocol for case 2 looks like:
* peer1 peer2
* | |
* CONNECT |<----->| CONNECT
* move QPs to RTS | | move QPs to RTS
* post rc send | | post rc recv
* COMPLETE |<----->| COMPLETE
*
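 * In both cases every message is acknowledged (UDCM_MESSAGE_ACK). The
 * sender keeps each outstanding message on the flying_messages list and
 * retransmits it from a timeout handler (udcm_send_timeout) until either
 * the ack arrives or udcm_max_retry attempts have been made.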
*/
#include "opal_config.h"
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <fcntl.h>
#include <infiniband/verbs.h>
#include <signal.h>
#include <pthread.h>
#include "opal/util/show_help.h"
#include "opal/util/proc.h"
#include "opal/util/output.h"
#include "opal/util/error.h"
#include "opal/util/alfg.h"
#include "opal_stdint.h"
#include "opal/class/opal_fifo.h"
#include "btl_openib_endpoint.h"
#include "btl_openib_proc.h"
#include "btl_openib_async.h"
#include "connect/connect.h"
#include "opal/util/sys_limits.h"
#include "opal/align.h"
#if (ENABLE_DYNAMIC_SL)
#include "connect/btl_openib_connect_sl.h"
#endif
#if HAVE_XRC
#include "btl_openib_xrc.h"
#endif
/*--------------------------------------------------------------------*/
/*
 * Message that this CPC includes in the modex. Fields are laid out in
* order to avoid holes.
*/
typedef struct {
/** The qp_num we are listening on (this alone may be sufficient for
matching the endpoint) */
uint32_t mm_qp_num;
/** The LID that we're listening on; it also identifies the source
        endpoint when a UD CM request arrives */
uint16_t mm_lid;
/** The port number of this port, also used to locate the source
        endpoint when a UD CM request arrives */
uint8_t mm_port_num;
/** Global ID (needed when routers are in use) */
union ibv_gid mm_gid;
} modex_msg_t;
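/*
 * The modex message is published out-of-band during startup; a peer's
 * copy is reachable through UDCM_ENDPOINT_REM_MODEX(ep) and carries
 * everything needed to address that peer's UD listen QP.
 */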
/*
* The UD module (i.e., the base module plus more meta data required
* by this CPC)
*/
typedef struct udcm_module {
opal_btl_openib_connect_base_module_t cpc;
/* This mutex must be held by any thread modifying
the module directly */
opal_mutex_t cm_lock;
/* Signal callbacks and threads that this module
is exiting */
bool cm_exiting;
/* UD QP this module is listening on */
struct ibv_qp *listen_qp;
/* Work request completion queues */
struct ibv_cq *cm_send_cq, *cm_recv_cq;
/* Completion channel for receive completions */
struct ibv_comp_channel *cm_channel;
/* Memory register for cm_buffer */
struct ibv_mr *cm_mr;
/* All buffers (grh + receive, send) */
char *cm_buffer;
/* Pointer to send buffer (near end of cm_buffer) */
char *cm_send_buffer;
/* Length of largest message */
size_t msg_length;
/* timeout thread */
opal_mutex_t cm_timeout_lock;
/* Messages waiting for ack */
opal_list_t flying_messages;
/* This mutex must be held when calling ibv_post_send
or waiting on cm_send_cq */
opal_mutex_t cm_send_lock;
/* Receive queue */
opal_fifo_t cm_recv_msg_fifo;
/* The associated BTL */
struct mca_btl_openib_module_t *btl;
/* This module's modex message */
modex_msg_t modex;
/* channel monitoring */
/** channel event base */
opal_event_base_t *channel_evbase;
/** channel monitoring event */
opal_event_t channel_event;
/* message processing */
    /** message event is active */
int32_t cm_message_event_active;
/** message event */
opal_event_t cm_message_event;
} udcm_module_t;
/*
* Per-endpoint UD data
*/
typedef struct {
/* Lock for IPC between threads in the ud CPC */
opal_mutex_t udep_lock;
struct ibv_ah *ah;
bool sent_req, recv_req, recv_resp, recv_comp;
/* Has this endpoint's data been initialized */
bool udep_initialized, udep_created_qps;
} udcm_endpoint_t;
typedef struct udcm_qp_t {
uint32_t qp_num;
uint32_t psn;
} udcm_qp_t;
typedef enum udcm_message_type {
UDCM_MESSAGE_CONNECT = 100,
UDCM_MESSAGE_COMPLETE = 101,
UDCM_MESSAGE_REJECT = 102,
#if HAVE_XRC
UDCM_MESSAGE_XCONNECT = 103,
UDCM_MESSAGE_XRESPONSE = 104,
UDCM_MESSAGE_XCONNECT2 = 105,
UDCM_MESSAGE_XRESPONSE2 = 106,
#endif
UDCM_MESSAGE_ACK = 107
} udcm_message_type_t;
typedef enum {
UDCM_REJ_REMOTE_ERROR = -1,
UDCM_REJ_ALREADY_CONNECTED = -2,
#if HAVE_XRC
UDCM_REJ_NOT_READY = -3,
#endif
} udcm_reject_reason_t;
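/*
 * Connection message header. The rem_ep/lcl_ep fields carry raw endpoint
 * pointers that are only meaningful in the address space of the process
 * that owns them; each side echoes back the pointer it received so the
 * peer can locate its endpoint without a search (udcm_find_endpoint is
 * the fallback used when lcl_ep is NULL).
 */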
typedef struct udcm_msg_hdr {
uint8_t type;
/* ack context */
uintptr_t rem_ctx;
/* endpoint local to the sender */
mca_btl_base_endpoint_t *rem_ep;
/* endpoint local to the receiver */
mca_btl_base_endpoint_t *lcl_ep;
union {
/* UDCM_MESSAGE_CONNECT */
struct msg_connect {
opal_process_name_t rem_name;
int32_t rem_ep_index;
uint8_t rem_port_num;
} req;
/* UDCM_MESSAGE_REJECT */
struct msg_reject {
int32_t reason;
} rej;
#if HAVE_XRC
/* UDCM_MESSAGE_XCONNECT, UDCM_MESSAGE_XCONNECT2 */
struct msg_xrc_connect {
opal_process_name_t rem_name;
int32_t rem_ep_index;
uint8_t rem_port_num;
uint32_t rem_qp_num;
uint32_t rem_psn;
} xreq;
/* UDCM_MESSAGE_XRESPONSE */
struct msg_xrc_response {
int32_t rem_ep_index;
uint32_t rem_qp_num;
uint32_t rem_psn;
} xres;
#endif
} data;
} udcm_msg_hdr_t;
typedef struct udcm_msg_t {
udcm_msg_hdr_t hdr;
/* If the message type is UDCM_MESSAGE_CONNECT,
UDCM_MESSAGE_XRESPONSE, or UDCM_MESSAGE_XRESPONSE2
then queue pair/srq data will follow the header */
udcm_qp_t qps[];
} udcm_msg_t;
typedef struct udcm_message_recv {
opal_list_item_t super;
udcm_msg_hdr_t msg_hdr;
} udcm_message_recv_t;
static OBJ_CLASS_INSTANCE(udcm_message_recv_t, opal_list_item_t,
NULL, NULL);
typedef struct udcm_message_sent {
opal_list_item_t super;
udcm_msg_t *data;
size_t length;
mca_btl_base_endpoint_t *endpoint;
int tries;
opal_event_t event;
bool event_active;
} udcm_message_sent_t;
static void udcm_sent_message_constructor (udcm_message_sent_t *);
static void udcm_sent_message_destructor (udcm_message_sent_t *);
static OBJ_CLASS_INSTANCE(udcm_message_sent_t, opal_list_item_t,
udcm_sent_message_constructor,
udcm_sent_message_destructor);
#define UDCM_ENDPOINT_MODULE(ep) ((udcm_module_t *)(ep)->endpoint_local_cpc)
#define UDCM_ENDPOINT_DATA(ep) ((udcm_endpoint_t *)(ep)->endpoint_local_cpc_data)
#define UDCM_ENDPOINT_REM_MODEX(ep) \
(((modex_msg_t *)(ep)->endpoint_remote_cpc_data->cbm_modex_message))
/*--------------------------------------------------------------------*/
static void udcm_component_register(void);
static int udcm_component_query(mca_btl_openib_module_t *btl,
opal_btl_openib_connect_base_module_t **cpc);
static int udcm_component_finalize(void);
/* Module methods */
static int udcm_endpoint_init(struct mca_btl_base_endpoint_t *lcl_ep);
static int udcm_module_start_connect(opal_btl_openib_connect_base_module_t *cpc,
mca_btl_base_endpoint_t *lcl_ep);
static int udcm_endpoint_finalize(struct mca_btl_base_endpoint_t *lcl_ep);
static int udcm_endpoint_init_data (mca_btl_base_endpoint_t *lcl_ep);
static int udcm_rc_qp_create_all (mca_btl_base_endpoint_t *lcl_ep);
static int udcm_module_finalize(mca_btl_openib_module_t *btl,
opal_btl_openib_connect_base_module_t *cpc);
static void *udcm_cq_event_dispatch(int fd, int flags, void *context);
static void *udcm_message_callback (int fd, int flags, void *context);
static void udcm_set_message_timeout (udcm_message_sent_t *message);
static void udcm_free_message (udcm_message_sent_t *message);
static int udcm_module_init (udcm_module_t *m, mca_btl_openib_module_t *btl);
static int udcm_module_create_listen_qp (udcm_module_t *m);
static void udcm_module_destroy_listen_qp (udcm_module_t *m);
static int udcm_module_allocate_buffers (udcm_module_t *m);
static void udcm_module_destroy_buffers (udcm_module_t *m);
static int udcm_module_post_all_recvs (udcm_module_t *m);
static int udcm_send_request (mca_btl_base_endpoint_t *lcl_ep,
mca_btl_base_endpoint_t *rem_ep);
static void udcm_send_timeout (evutil_socket_t fd, short event, void *arg);
static int udcm_finish_connection (mca_btl_openib_endpoint_t *lcl_ep);
static int udcm_rc_qps_to_rts(mca_btl_openib_endpoint_t *lcl_ep);
/* XRC support */
#if HAVE_XRC
static int udcm_xrc_start_connect (opal_btl_openib_connect_base_module_t *cpc,
mca_btl_base_endpoint_t *lcl_ep);
static int udcm_xrc_restart_connect (mca_btl_base_endpoint_t *lcl_ep);
static int udcm_xrc_send_qp_connect (mca_btl_openib_endpoint_t *lcl_ep, uint32_t rem_qp_num, uint32_t rem_psn);
static int udcm_xrc_send_qp_create (mca_btl_base_endpoint_t *lcl_ep);
static int udcm_xrc_recv_qp_connect (mca_btl_openib_endpoint_t *lcl_ep, uint32_t qp_num);
static int udcm_xrc_recv_qp_create (mca_btl_openib_endpoint_t *lcl_ep, uint32_t rem_qp_num, uint32_t rem_psn);
static int udcm_xrc_send_request (mca_btl_base_endpoint_t *lcl_ep, mca_btl_base_endpoint_t *rem_ep,
uint8_t msg_type);
static int udcm_xrc_send_xresponse (mca_btl_base_endpoint_t *lcl_ep, mca_btl_base_endpoint_t *rem_ep,
uint8_t msg_type);
static int udcm_xrc_handle_xconnect (mca_btl_openib_endpoint_t *lcl_ep, udcm_msg_hdr_t *msg_hdr);
static int udcm_xrc_handle_xresponse (mca_btl_openib_endpoint_t *lcl_ep, udcm_msg_hdr_t *msg_hdr);
#endif
/*--------------------------------------------------------------------*/
#define UDCM_MIN_RECV_COUNT 512
#define UDCM_MIN_TIMEOUT 500000
#define UDCM_SEND_CQ_SIZE 512
#define UDCM_WR_RECV_ID 0x20000000ll
#define UDCM_WR_SEND_ID 0x10000000ll
#define UDCM_WR_ACK_ID 0x10000000ll
#define UDCM_WR_DIR_MASK 0x30000000ll
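/* A work request id encodes the direction in its high bits and the
   receive buffer index (or send message number) in its low bits:

       wr.wr_id = UDCM_WR_RECV_ID | (uint64_t) msg_num;
       dir      = wc.wr_id & UDCM_WR_DIR_MASK;
       msg_num  = (int)(wc.wr_id & (~UDCM_WR_DIR_MASK));

   Note that UDCM_WR_SEND_ID and UDCM_WR_ACK_ID share a value, so sends
   and acks are not distinguished by the direction bits. */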
/* 40 bytes of Global Routing Header (GRH) space that precedes received
   scatter/gather data. verbs always reserves this space at the start of
   a UD receive buffer, so it cannot be eliminated. */
#define UDCM_GRH_SIZE (sizeof (struct ibv_grh))
/* Priority of this connection module */
static int udcm_priority;
/* Number of receive work requests to post */
static int udcm_recv_count;
static int udcm_max_retry;
/* Message ACK timeout in usec */
static int udcm_timeout;
/* seed for the OPAL ALFG RNG */
static uint32_t udcm_random_seed = 0;
static opal_rng_buff_t udcm_rand_buff;
static struct timeval udcm_timeout_tv;
/*******************************************************************
* Component
*******************************************************************/
/* mark: udcm component */
opal_btl_openib_connect_base_component_t opal_btl_openib_connect_udcm = {
"udcm",
udcm_component_register,
NULL,
udcm_component_query,
udcm_component_finalize
};
static void udcm_component_register(void)
{
    /* set the default priority; it may be overridden by the MCA variable below */
udcm_priority = 63;
(void) mca_base_component_var_register(&mca_btl_openib_component.super.btl_version,
"connect_udcm_priority", "Priority of the udcm "
"connection method", MCA_BASE_VAR_TYPE_INT, NULL,
0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY,
&udcm_priority);
udcm_recv_count = UDCM_MIN_RECV_COUNT;
(void) mca_base_component_var_register(&mca_btl_openib_component.super.btl_version,
"connect_udcm_recv_count", "Number of registered "
"buffers to post", MCA_BASE_VAR_TYPE_INT, NULL,
0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY,
&udcm_recv_count);
udcm_timeout = UDCM_MIN_TIMEOUT;
(void) mca_base_component_var_register(&mca_btl_openib_component.super.btl_version,
"connect_udcm_timeout", "Ack timeout for udcm "
"connection messages", MCA_BASE_VAR_TYPE_INT, NULL,
0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY,
&udcm_timeout);
udcm_max_retry = 25;
(void) mca_base_component_var_register(&mca_btl_openib_component.super.btl_version,
"connect_udcm_max_retry", "Maximum number of times "
"to retry sending a udcm connection message",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY,
&udcm_max_retry);
}
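/* These variables can be set at run time; assuming the standard MCA
   naming scheme (framework_component_variable), the full names are, e.g.:

       mpirun --mca btl_openib_connect_udcm_priority 100 \
              --mca btl_openib_connect_udcm_max_retry 50 ...
 */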
static int udcm_component_query(mca_btl_openib_module_t *btl,
opal_btl_openib_connect_base_module_t **cpc)
{
udcm_module_t *m = NULL;
int rc = OPAL_ERR_NOT_SUPPORTED;
do {
        /* If we do not have struct ibv_device.transport_type, then
           we're in an old version of OFED that is IB only (i.e., no
           iWARP), so we can safely assume that we can use this CPC. */
#if defined(HAVE_STRUCT_IBV_DEVICE_TRANSPORT_TYPE) && HAVE_DECL_IBV_LINK_LAYER_ETHERNET
if (BTL_OPENIB_CONNECT_BASE_CHECK_IF_NOT_IB(btl)) {
BTL_VERBOSE(("UD CPC only supported on InfiniBand; skipped on %s:%d",
ibv_get_device_name(btl->device->ib_dev),
btl->port_num));
break;
}
#endif
/* Allocate the module struct. Use calloc so that it's safe to
finalize the module if something goes wrong. */
m = calloc(1, sizeof(*m));
if (NULL == m) {
BTL_ERROR(("malloc failed!"));
rc = OPAL_ERR_OUT_OF_RESOURCE;
break;
}
if (udcm_priority > 100) {
udcm_priority = 100;
} else if (udcm_priority < 0) {
udcm_priority = 0;
}
if (UDCM_MIN_RECV_COUNT > udcm_recv_count) {
udcm_recv_count = UDCM_MIN_RECV_COUNT;
}
if (UDCM_MIN_TIMEOUT > udcm_timeout) {
udcm_timeout = UDCM_MIN_TIMEOUT;
}
rc = udcm_module_init (m, btl);
if (OPAL_SUCCESS != rc) {
break;
}
/* seed the random number generator */
udcm_random_seed = time (NULL);
opal_srand(&udcm_rand_buff,udcm_random_seed);
/* All done */
*cpc = (opal_btl_openib_connect_base_module_t *) m;
BTL_VERBOSE(("available for use on %s:%d",
ibv_get_device_name(btl->device->ib_dev),
btl->port_num));
return OPAL_SUCCESS;
} while (0);
udcm_module_finalize(btl, (opal_btl_openib_connect_base_module_t *) m);
if (OPAL_ERR_NOT_SUPPORTED == rc) {
BTL_VERBOSE(("unavailable for use on %s:%d; skipped",
ibv_get_device_name(btl->device->ib_dev),
btl->port_num));
} else {
BTL_VERBOSE(("unavailable for use on %s:%d; fatal error %d (%s)",
ibv_get_device_name(btl->device->ib_dev),
btl->port_num, rc,
opal_strerror(rc)));
}
return rc;
}
static int udcm_component_finalize(void)
{
return OPAL_SUCCESS;
}
/*--------------------------------------------------------------------*/
/*******************************************************************
* Module
*******************************************************************/
/* mark: udcm module */
#if HAVE_XRC
static int udcm_endpoint_init_self_xrc (struct mca_btl_base_endpoint_t *lcl_ep)
{
udcm_endpoint_t *udep = UDCM_ENDPOINT_DATA(lcl_ep);
int32_t recv_qpn;
int rc;
opal_mutex_lock (&udep->udep_lock);
do {
if (OPAL_SUCCESS != (rc = udcm_endpoint_init_data (lcl_ep))) {
BTL_VERBOSE(("error initializing loopback endpoint cpc data"));
break;
}
rc = udcm_xrc_send_qp_create (lcl_ep);
if (OPAL_SUCCESS != rc) {
BTL_VERBOSE(("error creating send queue pair for loopback endpoint"));
break;
}
lcl_ep->rem_info.rem_index = lcl_ep->index;
rc = udcm_xrc_recv_qp_create (lcl_ep, lcl_ep->qps[0].qp->lcl_qp->qp_num,
lcl_ep->qps[0].qp->lcl_psn);
if (OPAL_SUCCESS != rc) {
BTL_VERBOSE(("error creating loopback XRC receive queue pair"));
break;
}
for (int i = 0 ; i < mca_btl_openib_component.num_xrc_qps ; ++i) {
uint32_t srq_num;
#if OPAL_HAVE_CONNECTX_XRC_DOMAINS
if (ibv_get_srq_num(lcl_ep->endpoint_btl->qps[i].u.srq_qp.srq, &srq_num)) {
BTL_ERROR(("BTL openib UDCM internal error: can't get srq num"));
}
#else
srq_num = lcl_ep->endpoint_btl->qps[i].u.srq_qp.srq->xrc_srq_num;
#endif
lcl_ep->rem_info.rem_srqs[i].rem_srq_num = srq_num;
}
#if OPAL_HAVE_CONNECTX_XRC_DOMAINS
recv_qpn = lcl_ep->xrc_recv_qp->qp_num;
#else
recv_qpn = lcl_ep->xrc_recv_qp_num;
#endif
lcl_ep->ib_addr->remote_xrc_rcv_qp_num = recv_qpn;
lcl_ep->rem_info.rem_qps[0].rem_psn = lcl_ep->xrc_recv_psn;
lcl_ep->rem_info.rem_qps[0].rem_qp_num = recv_qpn;
rc = udcm_xrc_send_qp_connect (lcl_ep, recv_qpn, lcl_ep->xrc_recv_psn);
if (OPAL_SUCCESS != rc) {
BTL_VERBOSE(("error connecting loopback XRC send queue pair"));
break;
}
BTL_VERBOSE(("successfully created loopback queue pair"));
/* need to hold the endpoint lock before calling udcm_finish_connection */
OPAL_THREAD_LOCK(&lcl_ep->endpoint_lock);
rc = udcm_finish_connection (lcl_ep);
} while (0);
opal_mutex_unlock (&udep->udep_lock);
return rc;
}
#endif
static int udcm_endpoint_init_self (struct mca_btl_base_endpoint_t *lcl_ep)
{
udcm_endpoint_t *udep = UDCM_ENDPOINT_DATA(lcl_ep);
int rc;
opal_mutex_lock (&udep->udep_lock);
do {
if (OPAL_SUCCESS != (rc = udcm_endpoint_init_data (lcl_ep))) {
BTL_VERBOSE(("error initializing loopback endpoint cpc data"));
break;
}
if (OPAL_SUCCESS != (rc = udcm_rc_qp_create_all (lcl_ep))) {
BTL_VERBOSE(("error initializing loopback endpoint qps"));
break;
}
/* save queue pair info */
lcl_ep->rem_info.rem_index = lcl_ep->index;
for (int i = 0 ; i < mca_btl_openib_component.num_qps ; ++i) {
lcl_ep->rem_info.rem_qps[i].rem_psn = lcl_ep->qps[i].qp->lcl_psn;
lcl_ep->rem_info.rem_qps[i].rem_qp_num = lcl_ep->qps[i].qp->lcl_qp->qp_num;
}
if (OPAL_SUCCESS != (rc = udcm_rc_qps_to_rts (lcl_ep))) {
BTL_VERBOSE(("error moving loopback endpoint qps to RTS"));
break;
}
/* need to hold the endpoint lock before calling udcm_finish_connection */
OPAL_THREAD_LOCK(&lcl_ep->endpoint_lock);
rc = udcm_finish_connection (lcl_ep);
} while (0);
opal_mutex_unlock (&udep->udep_lock);
return rc;
}
static int udcm_endpoint_init (struct mca_btl_base_endpoint_t *lcl_ep)
{
udcm_endpoint_t *udep = lcl_ep->endpoint_local_cpc_data =
calloc(1, sizeof(udcm_endpoint_t));
if (NULL == udep) {
BTL_ERROR(("malloc failed!"));
return OPAL_ERR_OUT_OF_RESOURCE;
}
OBJ_CONSTRUCT(&udep->udep_lock, opal_mutex_t);
if (lcl_ep->endpoint_proc->proc_opal == opal_proc_local_get ()) {
/* go ahead and try to create a loopback queue pair */
#if HAVE_XRC
if (mca_btl_openib_component.num_xrc_qps > 0) {
return udcm_endpoint_init_self_xrc (lcl_ep);
} else
#endif
return udcm_endpoint_init_self (lcl_ep);
}
return OPAL_SUCCESS;
}
static int udcm_endpoint_finalize(struct mca_btl_base_endpoint_t *lcl_ep)
{
udcm_endpoint_t *udep = UDCM_ENDPOINT_DATA(lcl_ep);
/* Free the stuff we allocated in udcm_endpoint_init */
if (NULL != udep) {
if (udep->ah) {
ibv_destroy_ah(udep->ah);
}
OBJ_DESTRUCT(&udep->udep_lock);
free(lcl_ep->endpoint_local_cpc_data);
lcl_ep->endpoint_local_cpc_data = NULL;
}
return OPAL_SUCCESS;
}
static int udcm_module_init (udcm_module_t *m, mca_btl_openib_module_t *btl)
{
int rc = OPAL_ERR_NOT_SUPPORTED;
BTL_VERBOSE(("created cpc module %p for btl %p",
(void*)m, (void*)btl));
OBJ_CONSTRUCT(&m->cm_lock, opal_mutex_t);
OBJ_CONSTRUCT(&m->cm_send_lock, opal_mutex_t);
OBJ_CONSTRUCT(&m->cm_recv_msg_fifo, opal_fifo_t);
OBJ_CONSTRUCT(&m->flying_messages, opal_list_t);
OBJ_CONSTRUCT(&m->cm_timeout_lock, opal_mutex_t);
m->btl = btl;
/* Create completion channel */
m->cm_channel = ibv_create_comp_channel (btl->device->ib_dev_context);
if (NULL == m->cm_channel) {
BTL_VERBOSE(("error creating ud completion channel"));
return OPAL_ERR_NOT_SUPPORTED;
}
/* Create completion queues */
m->cm_recv_cq = ibv_create_cq (btl->device->ib_dev_context,
udcm_recv_count, NULL,
m->cm_channel, 0);
if (NULL == m->cm_recv_cq) {
BTL_VERBOSE(("error creating ud recv completion queue"));
return OPAL_ERR_NOT_SUPPORTED;
}
m->cm_send_cq = ibv_create_cq (btl->device->ib_dev_context,
UDCM_SEND_CQ_SIZE, NULL, NULL, 0);
if (NULL == m->cm_send_cq) {
BTL_VERBOSE(("error creating ud send completion queue"));
return OPAL_ERR_NOT_SUPPORTED;
}
if (0 != (rc = udcm_module_allocate_buffers (m))) {
BTL_VERBOSE(("error allocating cm buffers"));
return rc;
}
if (0 != (rc = udcm_module_create_listen_qp (m))) {
BTL_VERBOSE(("error creating UD QP"));
return rc;
}
if (0 != (rc = udcm_module_post_all_recvs (m))) {
BTL_VERBOSE(("error posting receives"));
return rc;
}
/* UD CM initialized properly. So fill in the rest of the CPC
module. */
m->cpc.data.cbm_component = &opal_btl_openib_connect_udcm;
m->cpc.data.cbm_priority = udcm_priority;
m->cpc.data.cbm_modex_message = &m->modex;
/* Initialize module modex */
m->modex.mm_lid = btl->lid;
m->modex.mm_port_num = btl->port_num;
m->modex.mm_qp_num = m->listen_qp->qp_num;
rc = ibv_query_gid (btl->device->ib_dev_context, btl->port_num,
mca_btl_openib_component.gid_index, &m->modex.mm_gid);
if (0 != rc) {
BTL_VERBOSE(("error querying port GID"));
return OPAL_ERROR;
}
BTL_VERBOSE(("my modex = LID: %d, Port: %d, QPN: %d, GID: %08x %08x",
m->modex.mm_lid, m->modex.mm_port_num, m->modex.mm_qp_num,
(unsigned int)m->modex.mm_gid.global.interface_id,
(unsigned int)m->modex.mm_gid.global.subnet_prefix));
m->cpc.data.cbm_modex_message_len = sizeof(m->modex);
/* Initialize module */
m->cpc.cbm_endpoint_init = udcm_endpoint_init;
m->cpc.cbm_start_connect = udcm_module_start_connect;
m->cpc.cbm_endpoint_finalize = udcm_endpoint_finalize;
m->cpc.cbm_finalize = udcm_module_finalize;
m->cpc.cbm_uses_cts = false;
m->cm_exiting = false;
/* Monitor the fd associated with the completion channel */
m->channel_evbase = opal_progress_thread_init (NULL);
opal_event_set (m->channel_evbase, &m->channel_event,
m->cm_channel->fd, OPAL_EV_READ | OPAL_EV_PERSIST,
udcm_cq_event_dispatch, m);
opal_event_add (&m->channel_event, 0);
udcm_timeout_tv.tv_sec = udcm_timeout / 1000000;
udcm_timeout_tv.tv_usec = udcm_timeout - 1000000 *
udcm_timeout_tv.tv_sec;
m->cm_message_event_active = 0;
/* set up the message event */
opal_event_set (opal_sync_event_base, &m->cm_message_event, -1,
OPAL_EV_READ, udcm_message_callback, m);
/* Finally, request CQ notification */
if (0 != ibv_req_notify_cq (m->cm_recv_cq, 0)) {
BTL_VERBOSE(("error requesting recv completions"));
return OPAL_ERROR;
}
/* Ready to use */
return OPAL_SUCCESS;
}
static int
udcm_module_start_connect(opal_btl_openib_connect_base_module_t *cpc,
mca_btl_base_endpoint_t *lcl_ep)
{
udcm_endpoint_t *udep = UDCM_ENDPOINT_DATA(lcl_ep);
int rc = OPAL_SUCCESS;
BTL_VERBOSE(("endpoint %p (lid %d, ep index %d)",
(void*)lcl_ep, lcl_ep->endpoint_btl->port_info.lid,
lcl_ep->index));
#if HAVE_XRC
if (mca_btl_openib_component.num_xrc_qps > 0) {
return udcm_xrc_start_connect (cpc, lcl_ep);
}
#endif
opal_mutex_lock (&udep->udep_lock);
if (MCA_BTL_IB_CLOSED != lcl_ep->endpoint_state) {
opal_mutex_unlock (&udep->udep_lock);
BTL_VERBOSE(("already ongoing %p. state = %d",
(void *) lcl_ep, lcl_ep->endpoint_state));
return OPAL_SUCCESS;
}
do {
opal_atomic_wmb ();
lcl_ep->endpoint_state = MCA_BTL_IB_CONNECTING;
if (OPAL_SUCCESS != (rc = udcm_endpoint_init_data (lcl_ep))) {
BTL_VERBOSE(("error initializing endpoint cpc data"));
break;
}
if (OPAL_SUCCESS != (rc = udcm_rc_qp_create_all (lcl_ep))) {
BTL_VERBOSE(("error initializing endpoint qps"));
break;
}
rc = udcm_send_request (lcl_ep, NULL);
} while (0);
opal_mutex_unlock (&udep->udep_lock);
return rc;
}
static int udcm_module_finalize(mca_btl_openib_module_t *btl,
opal_btl_openib_connect_base_module_t *cpc)
{
udcm_module_t *m = (udcm_module_t *) cpc;
opal_list_item_t *item;
if (NULL == m) {
return OPAL_SUCCESS;
}
m->cm_exiting = true;
if (m->channel_evbase) {
opal_event_del (&m->channel_event);
opal_progress_thread_finalize (NULL);
}
opal_mutex_lock (&m->cm_lock);
/* clear message queue */
while (NULL != (item = opal_fifo_pop_atomic (&m->cm_recv_msg_fifo))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&m->cm_recv_msg_fifo);
opal_mutex_lock (&m->cm_timeout_lock);
while ((item = opal_list_remove_first(&m->flying_messages))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&m->flying_messages);
opal_mutex_unlock (&m->cm_timeout_lock);
BTL_VERBOSE(("destroying listing thread"));
/* destroy the listen queue pair. this will cause ibv_get_cq_event to
return. */
udcm_module_destroy_listen_qp (m);
udcm_module_destroy_buffers (m);
if (m->cm_send_cq) {
if (0 != ibv_destroy_cq (m->cm_send_cq)) {
BTL_VERBOSE(("failed to destroy send CQ. errno = %d",
errno));
}
}
if (m->cm_recv_cq) {
if (0 != ibv_destroy_cq (m->cm_recv_cq)) {
BTL_VERBOSE(("failed to destroy recv CQ. errno = %d",
errno));
}
}
if (m->cm_channel) {
if (0 != ibv_destroy_comp_channel (m->cm_channel)) {
BTL_VERBOSE(("failed to completion channel. errno = %d",
errno));
}
m->cm_channel = NULL;
}
opal_mutex_unlock (&m->cm_lock);
OBJ_DESTRUCT(&m->cm_send_lock);
OBJ_DESTRUCT(&m->cm_lock);
OBJ_DESTRUCT(&m->cm_timeout_lock);
return OPAL_SUCCESS;
}
/*--------------------------------------------------------------------*/
static int udcm_module_create_listen_qp (udcm_module_t *m)
{
struct ibv_qp_init_attr init_attr;
struct ibv_qp_attr attr;
struct ibv_qp *qp;
BTL_VERBOSE(("creating listen QP on port %d", m->btl->port_num));
    /* create the UD queue pair */
memset(&init_attr, 0, sizeof(init_attr));
init_attr.qp_type = IBV_QPT_UD;
init_attr.send_cq = m->cm_send_cq;
init_attr.recv_cq = m->cm_recv_cq;
init_attr.cap.max_send_sge = 1;
init_attr.cap.max_recv_sge = 1;
init_attr.cap.max_recv_wr = udcm_recv_count;
init_attr.cap.max_send_wr = 1;
qp = ibv_create_qp(m->btl->device->ib_pd, &init_attr);
if (NULL == qp) {
BTL_VERBOSE(("could not create UD listen queue pair"));
return OPAL_ERROR;
}
/* end: create the UD queue pair */
/* move the UD QP into the INIT state */
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_INIT;
attr.pkey_index = m->btl->pkey_index;
attr.port_num = m->btl->port_num;
attr.qkey = 0;
if (0 != ibv_modify_qp(qp, &attr, IBV_QP_STATE | IBV_QP_PKEY_INDEX
| IBV_QP_PORT | IBV_QP_QKEY)) {
BTL_ERROR(("error modifying qp to INIT errno says %s",
strerror(errno)));
return OPAL_ERROR;
}
/* Move listen QP to RTR */
attr.qp_state = IBV_QPS_RTR;
if (0 != ibv_modify_qp(qp, &attr, IBV_QP_STATE)) {
BTL_ERROR(("error modifing QP to RTR errno says %s",
strerror(errno)));
return OPAL_ERROR;
}
/* Move listen QP to RTS */
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_RTS;
attr.sq_psn = 0;
if (0 != ibv_modify_qp(qp, &attr, IBV_QP_STATE | IBV_QP_SQ_PSN)) {
BTL_ERROR(("error modifing QP to RTS errno says %s; errno=%d",
strerror(errno), errno));
return OPAL_ERROR;
}
m->listen_qp = qp;
BTL_VERBOSE(("listening for connections on lid %d, qpn %d",
m->btl->lid, qp->qp_num));
return OPAL_SUCCESS;
}
static void udcm_module_destroy_listen_qp (udcm_module_t *m)
{
struct ibv_qp_attr attr;
struct ibv_wc wc;
if (NULL == m->listen_qp) {
return;
}
mca_btl_openib_async_add_qp_ignore (m->listen_qp);
do {
/* Move listen QP into the ERR state to cancel all outstanding
work requests */
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_ERR;
attr.sq_psn = 0;
BTL_VERBOSE(("Setting qp to err state %p", (void *)m->listen_qp));
if (0 != ibv_modify_qp(m->listen_qp, &attr, IBV_QP_STATE)) {
BTL_VERBOSE(("error modifying qp to ERR. errno = %d",
errno));
break;
}
while (ibv_poll_cq (m->cm_recv_cq, 1, &wc) > 0);
/* move the QP into the RESET state */
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_RESET;
if (0 != ibv_modify_qp(m->listen_qp, &attr, IBV_QP_STATE)) {
BTL_VERBOSE(("error modifying qp to RESET. errno = %d",
errno));
break;
}
} while (0);
if (0 != ibv_destroy_qp (m->listen_qp)) {
BTL_VERBOSE(("error destroying listen qp. errno = %d",
errno));
}
m->listen_qp = NULL;
}
static int udcm_module_allocate_buffers (udcm_module_t *m)
{
size_t total_size, page_size;
m->msg_length = sizeof (udcm_msg_hdr_t) +
mca_btl_openib_component.num_qps * sizeof (udcm_qp_t);
total_size = (udcm_recv_count + 1) * (m->msg_length +
UDCM_GRH_SIZE);
page_size = opal_getpagesize();
total_size = OPAL_ALIGN(total_size, page_size, size_t);
    m->cm_buffer = NULL;
    /* posix_memalign returns non-zero on failure (it does not set errno)
       and may leave the pointer unmodified, so check its return code */
    if (0 != posix_memalign ((void **)&m->cm_buffer, (size_t)page_size,
                             total_size) || NULL == m->cm_buffer) {
        BTL_ERROR(("posix_memalign failed!"));
return OPAL_ERR_OUT_OF_RESOURCE;
}
/* mark buffer memory as initialized for valgrind's sake */
memset (m->cm_buffer, 0, total_size);
m->cm_mr = ibv_reg_mr (m->btl->device->ib_pd, m->cm_buffer,
total_size, IBV_ACCESS_LOCAL_WRITE |
IBV_ACCESS_REMOTE_WRITE);
if (NULL == m->cm_mr) {
BTL_ERROR(("failed to register memory. errno = %d", errno));
return OPAL_ERROR;
}
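    /* Buffer layout (udcm_recv_count receive slots followed by a single
       send slot):

           [GRH|msg] [GRH|msg] ... [GRH|msg] [msg]
            slot 0    slot 1        slot N-1  cm_send_buffer

       Each receive slot is UDCM_GRH_SIZE + msg_length bytes; the send
       slot needs no GRH space. */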
m->cm_send_buffer = m->cm_buffer + ((UDCM_GRH_SIZE +
m->msg_length) *
udcm_recv_count);
return 0;
}
static void udcm_module_destroy_buffers (udcm_module_t *m)
{
if (m->cm_mr) {
if (0 != ibv_dereg_mr (m->cm_mr)) {
BTL_VERBOSE(("failed to deregister memory. errno = %d",
errno));
}
m->cm_mr = NULL;
}
if (m->cm_buffer) {
free (m->cm_buffer);
}
}
static inline char *udcm_module_get_recv_buffer (udcm_module_t *m,
int msg_num, bool skip_grh)
{
return m->cm_buffer + msg_num * (m->msg_length + UDCM_GRH_SIZE) +
skip_grh * UDCM_GRH_SIZE;
}
static inline char *udcm_module_get_send_buffer (udcm_module_t *m)
{
return m->cm_send_buffer;
}
static int udcm_module_post_one_recv (udcm_module_t *m, int msg_num)
{
char *recv_buffer = udcm_module_get_recv_buffer (m, msg_num, 0);
struct ibv_recv_wr wr, *bad_wr;
struct ibv_sge sge;
int rc;
    /* GRH + request data */
sge.addr = (uintptr_t) recv_buffer;
sge.length = UDCM_GRH_SIZE + m->msg_length;
sge.lkey = m->cm_mr->lkey;
wr.next = NULL;
wr.wr_id = UDCM_WR_RECV_ID | (uint64_t)msg_num;
wr.sg_list = &sge;
wr.num_sge = 1;
rc = ibv_post_recv (m->listen_qp, &wr, &bad_wr);
if (0 != rc) {
BTL_VERBOSE(("error posting receive. errno = %d", errno));
}
return (0 == rc) ? OPAL_SUCCESS : OPAL_ERROR;
}
static int udcm_module_post_all_recvs (udcm_module_t *m)
{
int i, rc;
for (i = 0 ; i < udcm_recv_count ; ++i) {
if (0 != (rc = udcm_module_post_one_recv (m, i))) {
return rc;
}
}
return 0;
}
/*--------------------------------------------------------------------*/
/* mark: helper functions */
/* Returns max inline size for qp #N */
static uint32_t max_inline_size(int qp, mca_btl_openib_device_t *device)
{
if (mca_btl_openib_component.qp_infos[qp].size <= device->max_inline_data) {
/* If qp message size is smaller than max_inline_data,
* we should enable inline messages */
return mca_btl_openib_component.qp_infos[qp].size;
} else if (mca_btl_openib_component.rdma_qp == qp || 0 == qp) {
        /* If qp message size is bigger than max_inline_data, we
         * should enable inline messages only for the RDMA QP (for PUT/GET
         * fin messages) and for the first qp */
return device->max_inline_data;
}
    /* Otherwise there is no reason for inline */
return 0;
}
/* Using OPAL's Additive Lagged Fibonacci RNG */
static inline uint32_t udcm_random (void)
{
return opal_rand(&udcm_rand_buff);
}
/* mark: rc helper functions */
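/* RC queue pairs must be walked through the verbs state machine:
   RESET -> INIT (pkey/port/access flags) -> RTR (path plus remote QPN
   and PSN) -> RTS (timeouts, retry counts, local PSN). Each helper below
   performs one transition; udcm_rc_qps_to_rts drives the full
   INIT -> RTR -> RTS sequence once the remote QP info is known. */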
static inline int udcm_rc_qp_to_init (struct ibv_qp *qp,
mca_btl_openib_module_t *btl)
{
enum ibv_qp_attr_mask attr_mask;
struct ibv_qp_attr attr;
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_INIT;
attr.pkey_index = btl->pkey_index;
attr.port_num = btl->port_num;
attr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;
#if HAVE_DECL_IBV_ATOMIC_HCA
attr.qp_access_flags |= IBV_ACCESS_REMOTE_ATOMIC;
#endif
attr_mask = IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT |
IBV_QP_ACCESS_FLAGS;
if (0 != ibv_modify_qp(qp, &attr, attr_mask)) {
BTL_ERROR(("error modifying qp to INIT errno says %s",
strerror(errno)));
return OPAL_ERROR;
}
return OPAL_SUCCESS;
}
static inline int udcm_rc_qp_to_rtr (mca_btl_base_endpoint_t *lcl_ep,
int qp_index)
{
struct ibv_qp *qp = lcl_ep->qps[qp_index].qp->lcl_qp;
mca_btl_openib_module_t *btl = lcl_ep->endpoint_btl;
struct ibv_qp_attr attr;
enum ibv_mtu mtu;
int rc;
mtu = (btl->device->mtu < lcl_ep->rem_info.rem_mtu) ?
btl->device->mtu : lcl_ep->rem_info.rem_mtu;
/* Move the QP into the RTR state */
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_RTR;
/* Setup attributes */
attr.path_mtu = mtu;
attr.max_dest_rd_atomic = mca_btl_openib_component.ib_max_rdma_dst_ops;
attr.min_rnr_timer = mca_btl_openib_component.ib_min_rnr_timer;
attr.dest_qp_num = lcl_ep->rem_info.rem_qps[qp_index].rem_qp_num;
attr.rq_psn = lcl_ep->rem_info.rem_qps[qp_index].rem_psn;
attr.ah_attr.is_global = 0;
attr.ah_attr.dlid = lcl_ep->rem_info.rem_lid;
attr.ah_attr.src_path_bits = btl->src_path_bits;
attr.ah_attr.port_num = btl->port_num;
attr.ah_attr.sl = mca_btl_openib_component.ib_service_level;
attr.ah_attr.static_rate = 0;
#if (ENABLE_DYNAMIC_SL)
/* if user enabled dynamic SL, get it from PathRecord */
if (0 != mca_btl_openib_component.ib_path_record_service_level) {
int rc = btl_openib_connect_get_pathrecord_sl(qp->context,
attr.ah_attr.port_num,
btl->lid,
attr.ah_attr.dlid);
if (OPAL_ERROR == rc) {
return OPAL_ERROR;
}
attr.ah_attr.sl = rc;
}
#endif
rc = ibv_modify_qp(qp, &attr, IBV_QP_STATE | IBV_QP_PATH_MTU |
IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER |
IBV_QP_RQ_PSN | IBV_QP_AV | IBV_QP_DEST_QPN);
if (OPAL_UNLIKELY(0 != rc)) {
BTL_ERROR(("error modifing QP to RTR errno says %s", strerror(errno)));
return OPAL_ERROR;
}
return OPAL_SUCCESS;
}
static inline int udcm_rc_qp_to_rts (mca_btl_base_endpoint_t *lcl_ep,
int qp_index)
{
struct ibv_qp *qp = lcl_ep->qps[qp_index].qp->lcl_qp;
struct ibv_qp_attr attr;
int rc;
BTL_VERBOSE(("transitioning QP %p to RTS", (void *)qp));
/* Move the QP into the RTS state */
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_RTS;
attr.timeout = mca_btl_openib_component.ib_timeout;
attr.retry_cnt = mca_btl_openib_component.ib_retry_count;
/* On PP QPs we have SW flow control, no need for rnr retries. Setting
* it to zero helps to catch bugs */
attr.rnr_retry = BTL_OPENIB_QP_TYPE_PP(qp_index) ? 0 :
mca_btl_openib_component.ib_rnr_retry;
attr.sq_psn = lcl_ep->qps[qp_index].qp->lcl_psn;
attr.max_rd_atomic = mca_btl_openib_component.ib_max_rdma_dst_ops;
rc = ibv_modify_qp(qp, &attr, IBV_QP_STATE | IBV_QP_TIMEOUT |
IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN |
IBV_QP_MAX_QP_RD_ATOMIC);
if (OPAL_UNLIKELY(0 != rc)) {
BTL_ERROR(("error modifing QP %p to RTS errno says %s",
(void *) qp, strerror(errno)));
return OPAL_ERROR;
}
BTL_VERBOSE(("successfully set RTS"));
return OPAL_SUCCESS;
}
/*--------------------------------------------------------------------*/
/*
* We have received information about the remote peer's QP; move the
* local QP from INIT to RTS through RTR.
*/
static int udcm_rc_qps_to_rts(mca_btl_openib_endpoint_t *lcl_ep)
{
int rc;
for (int qp = 0 ; qp < mca_btl_openib_component.num_qps ; ++qp) {
if (lcl_ep->qps[qp].qp->lcl_qp->state == IBV_QPS_RTS) {
continue;
}
rc = udcm_rc_qp_to_rtr (lcl_ep, qp);
if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) {
BTL_VERBOSE(("failed moving QP to RTR"));
return rc;
}
rc = udcm_rc_qp_to_rts (lcl_ep, qp);
if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) {
BTL_VERBOSE(("failed moving QP to RTS"));
return rc;
}
}
/* Ensure that all the writes back to the endpoint and associated
* data structures have completed */
opal_atomic_wmb();
mca_btl_openib_endpoint_post_recvs(lcl_ep);
/* All done */
return OPAL_SUCCESS;
}
/*
* Create the local side of one qp. The remote side will be connected
* later.
*/
static int udcm_rc_qp_create_one(udcm_module_t *m, mca_btl_base_endpoint_t* lcl_ep,
int qp, struct ibv_srq *srq, uint32_t max_recv_wr,
uint32_t max_send_wr)
{
udcm_endpoint_t *udep = UDCM_ENDPOINT_DATA(lcl_ep);
#if HAVE_DECL_IBV_EXP_CREATE_QP
struct ibv_exp_qp_init_attr init_attr;
#else
struct ibv_qp_init_attr init_attr;
#endif
size_t req_inline;
int rc;
memset(&init_attr, 0, sizeof(init_attr));
init_attr.qp_type = IBV_QPT_RC;
init_attr.send_cq = m->btl->device->ib_cq[BTL_OPENIB_LP_CQ];
init_attr.recv_cq = m->btl->device->ib_cq[qp_cq_prio(qp)];
init_attr.srq = srq;
init_attr.cap.max_inline_data = req_inline =
max_inline_size(qp, m->btl->device);
init_attr.cap.max_send_sge = 1;
init_attr.cap.max_recv_sge = 1; /* we do not use SG list */
if(BTL_OPENIB_QP_TYPE_PP(qp)) {
init_attr.cap.max_recv_wr = max_recv_wr;
} else {
init_attr.cap.max_recv_wr = 0;
}
init_attr.cap.max_send_wr = max_send_wr;
#if HAVE_DECL_IBV_EXP_CREATE_QP
/* use expanded verbs qp create to enable use of mlx5 atomics */
init_attr.comp_mask = IBV_EXP_QP_INIT_ATTR_PD;
init_attr.pd = m->btl->device->ib_pd;
#if HAVE_DECL_IBV_EXP_QP_INIT_ATTR_ATOMICS_ARG
init_attr.comp_mask |= IBV_EXP_QP_INIT_ATTR_ATOMICS_ARG;
init_attr.max_atomic_arg = sizeof (int64_t);
#endif
#if HAVE_DECL_IBV_EXP_ATOMIC_HCA_REPLY_BE
if (IBV_EXP_ATOMIC_HCA_REPLY_BE == m->btl->device->ib_exp_dev_attr.exp_atomic_cap) {
init_attr.exp_create_flags = IBV_EXP_QP_CREATE_ATOMIC_BE_REPLY;
init_attr.comp_mask |= IBV_EXP_QP_INIT_ATTR_CREATE_FLAGS;
}
#endif
while (NULL == (lcl_ep->qps[qp].qp->lcl_qp = ibv_exp_create_qp (m->btl->device->ib_dev_context,
&init_attr))) {
/* NTH: this process may be out of registered memory. try evicting an item from
the lru of this btl's mpool */
if (false == m->btl->device->rcache->rcache_evict (m->btl->device->rcache)) {
break;
}
}
#else
while (NULL == (lcl_ep->qps[qp].qp->lcl_qp = ibv_create_qp(m->btl->device->ib_pd,
&init_attr))) {
/* NTH: this process may be out of registered memory. try evicting an item from
the lru of this btl's mpool */
if (false == m->btl->device->rcache->rcache_evict (m->btl->device->rcache)) {
break;
}
}
#endif
if (NULL == lcl_ep->qps[qp].qp->lcl_qp) {
opal_show_help("help-mpi-btl-openib-cpc-base.txt",
"ibv_create_qp failed", true, opal_process_info.nodename,
ibv_get_device_name(m->btl->device->ib_dev),
"Reliable connected (RC)");
return OPAL_ERROR;
}
if (init_attr.cap.max_inline_data < req_inline) {
lcl_ep->qps[qp].ib_inline_max = init_attr.cap.max_inline_data;
opal_show_help("help-mpi-btl-openib-cpc-base.txt",
"inline truncated", true, opal_process_info.nodename,
ibv_get_device_name(m->btl->device->ib_dev),
m->btl->port_num, req_inline,
init_attr.cap.max_inline_data);
} else {
lcl_ep->qps[qp].ib_inline_max = req_inline;
}
/* Setup meta data on the endpoint */
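    /* PSNs are 24 bits wide, hence the 0x00ffffff mask */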
lcl_ep->qps[qp].qp->lcl_psn = udcm_random () & 0x00ffffff;
lcl_ep->qps[qp].credit_frag = NULL;
rc = udcm_rc_qp_to_init (lcl_ep->qps[qp].qp->lcl_qp, m->btl);
if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) {
return rc;
}
/* If we have already received a request go ahead and move to
RTS. */
if (udep->recv_req) {
rc = udcm_rc_qp_to_rtr (lcl_ep, qp);
if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) {
return rc;
}
return udcm_rc_qp_to_rts (lcl_ep, qp);
}
return OPAL_SUCCESS;
}
/*
* Create the local side of all the qp's. The remote sides will be
* connected later.
* NTH: This code is common to (and repeated by) all non-XRC cpcs.
*/
static int udcm_rc_qp_create_all (mca_btl_base_endpoint_t *lcl_ep)
{
udcm_endpoint_t *udep = UDCM_ENDPOINT_DATA(lcl_ep);
udcm_module_t *m = UDCM_ENDPOINT_MODULE(lcl_ep);
int qp, rc, pp_qp_num = 0;
int32_t rd_rsv_total = 0;
if (udep->udep_created_qps)
return OPAL_SUCCESS;
for (qp = 0; qp < mca_btl_openib_component.num_qps; ++qp) {
if (BTL_OPENIB_QP_TYPE_PP(qp)) {
rd_rsv_total +=
mca_btl_openib_component.qp_infos[qp].u.pp_qp.rd_rsv;
pp_qp_num++;
}
}
    /* if there are no PP QPs we still need a reserved WQE for eager rdma
     * flow control */
if (0 == pp_qp_num && true == lcl_ep->use_eager_rdma) {
pp_qp_num = 1;
}
for (qp = 0; qp < mca_btl_openib_component.num_qps; ++qp) {
struct ibv_srq *srq = NULL;
uint32_t max_recv_wr, max_send_wr;
int32_t rd_rsv, rd_num_credits;
        /* QPs used for SW flow control need some additional resources */
if (qp == mca_btl_openib_component.credits_qp) {
rd_rsv = rd_rsv_total;
rd_num_credits = pp_qp_num;
} else {
rd_rsv = rd_num_credits = 0;
}
if (BTL_OPENIB_QP_TYPE_PP(qp)) {
max_recv_wr = mca_btl_openib_component.qp_infos[qp].rd_num +
rd_rsv;
max_send_wr = mca_btl_openib_component.qp_infos[qp].rd_num +
rd_num_credits;
} else {
srq = lcl_ep->endpoint_btl->qps[qp].u.srq_qp.srq;
max_recv_wr = mca_btl_openib_component.qp_infos[qp].rd_num
+ rd_rsv;
max_send_wr = mca_btl_openib_component.qp_infos[qp].u.srq_qp.sd_max
+ rd_num_credits;
}
/* Go create the actual qp */
rc = udcm_rc_qp_create_one (m, lcl_ep, qp, srq, max_recv_wr, max_send_wr);
if (OPAL_SUCCESS != rc) {
BTL_VERBOSE(("error creating qp %d for endpoint %p", qp, (void *) lcl_ep));
return rc;
}
}
/* All done! */
udep->udep_created_qps = true;
return OPAL_SUCCESS;
}
/* mark: endpoint helper functions */
/* JMS: optimization target -- can we send something in private
data to find the proc directly instead of having to search
through *all* procs? */
static mca_btl_openib_endpoint_t *udcm_find_endpoint (struct mca_btl_openib_module_t *btl,
uint32_t qp_num, uint16_t lid,
udcm_msg_hdr_t *msg_hdr)
{
mca_btl_base_endpoint_t *endpoint;
struct opal_proc_t *opal_proc;
opal_proc = opal_proc_for_name (msg_hdr->data.req.rem_name);
if (NULL == opal_proc) {
BTL_ERROR(("could not get proc associated with remote peer"));
return NULL;
}
endpoint = mca_btl_openib_get_ep (&btl->super, opal_proc);
if (NULL == endpoint) {
BTL_ERROR(("could not find endpoint with port: %d, lid: %d, msg_type: %d",
msg_hdr->data.req.rem_port_num, lid, msg_hdr->type));
}
return endpoint;
}
static int udcm_endpoint_init_data (mca_btl_base_endpoint_t *lcl_ep)
{
modex_msg_t *remote_msg = UDCM_ENDPOINT_REM_MODEX(lcl_ep);
udcm_endpoint_t *udep = UDCM_ENDPOINT_DATA(lcl_ep);
udcm_module_t *m = UDCM_ENDPOINT_MODULE(lcl_ep);
struct ibv_ah_attr ah_attr;
int rc = OPAL_SUCCESS;
do {
if (udep->udep_initialized)
break;
/* Cache an address handle for this endpoint */
memset(&ah_attr, 0, sizeof(ah_attr));
ah_attr.dlid = lcl_ep->rem_info.rem_lid;
ah_attr.port_num = remote_msg->mm_port_num;
ah_attr.sl = mca_btl_openib_component.ib_service_level;
ah_attr.src_path_bits = lcl_ep->endpoint_btl->src_path_bits;
if (0 != memcmp (&remote_msg->mm_gid, &m->modex.mm_gid, sizeof (m->modex.mm_gid))) {
ah_attr.is_global = 1;
ah_attr.grh.flow_label = 0;
ah_attr.grh.dgid = remote_msg->mm_gid;
ah_attr.grh.sgid_index = mca_btl_openib_component.gid_index;
/* NTH: probably won't need to go over more than a single router. changeme if this
             * assumption is wrong. this value should never be <= 1 as the
             * packet will not leave the subnet. */
ah_attr.grh.hop_limit = 2;
/* Seems reasonable to set this to 0 for connection messages. */
ah_attr.grh.traffic_class = 0;
}
udep->ah = ibv_create_ah (lcl_ep->endpoint_btl->device->ib_pd, &ah_attr);
if (!udep->ah) {
rc = OPAL_ERROR;
break;
}
} while (0);
if (OPAL_SUCCESS == rc) {
udep->udep_initialized = true;
}
return rc;
}
/* mark: ud send */
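/* CM sends are synchronous: udcm_post_send busy-polls the send CQ for
   the completion under cm_send_lock, so at most one connection message
   per module is on the wire at any time. */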
static inline int udcm_wait_for_send_completion (udcm_module_t *m)
{
struct ibv_wc wc;
int rc;
do {
rc = ibv_poll_cq (m->cm_send_cq, 1, &wc);
if (0 > rc) {
BTL_VERBOSE(("send failed"));
return OPAL_ERROR;
} else if (0 == rc) {
continue;
} else if (IBV_WC_SUCCESS != wc.status) {
BTL_ERROR(("send failed with verbs status %d", wc.status));
return OPAL_ERROR;
}
break;
} while (1);
return OPAL_SUCCESS;
}
static int udcm_post_send (mca_btl_base_endpoint_t *lcl_ep, void *data,
int length, int lkey)
{
udcm_endpoint_t *udep = UDCM_ENDPOINT_DATA(lcl_ep);
udcm_module_t *m = UDCM_ENDPOINT_MODULE(lcl_ep);
volatile static int msg_num = 0;
struct ibv_send_wr wr, *bad_wr;
struct ibv_sge sge;
int rc;
/* NTH: need to lock here or we run into problems (slowness) */
opal_mutex_lock(&m->cm_send_lock);
if (0 == lkey) {
/* copy the message into the registered send buffer */
sge.addr = (uintptr_t) udcm_module_get_send_buffer (m);
sge.length = length;
sge.lkey = m->cm_mr->lkey;
memcpy ((uintptr_t *)sge.addr, data, length);
} else {
sge.addr = (uintptr_t) data;
sge.length = length;
sge.lkey = lkey;
}
wr.wr_id = UDCM_WR_SEND_ID | msg_num++;
wr.next = NULL;
wr.sg_list = &sge;
wr.num_sge = 1;
wr.opcode = IBV_WR_SEND;
wr.send_flags = IBV_SEND_SOLICITED | IBV_SEND_SIGNALED;
wr.wr.ud.ah = udep->ah;
wr.wr.ud.remote_qpn = UDCM_ENDPOINT_REM_MODEX(lcl_ep)->mm_qp_num;
wr.wr.ud.remote_qkey = 0;
rc = ibv_post_send (m->listen_qp, &wr, &bad_wr);
if (0 != rc) {
BTL_VERBOSE(("error posting send. errno: %d", errno));
} else {
rc = udcm_wait_for_send_completion (m);
}
opal_mutex_unlock (&m->cm_send_lock);
return rc;
}
/* mark: message allocation */
static int udcm_new_message (mca_btl_base_endpoint_t *lcl_ep,
mca_btl_base_endpoint_t *rem_ep, uint8_t type,
size_t length, udcm_message_sent_t **msgp)
{
udcm_module_t *m = UDCM_ENDPOINT_MODULE(lcl_ep);
udcm_message_sent_t *message;
message = OBJ_NEW(udcm_message_sent_t);
if (NULL == message) {
BTL_ERROR(("malloc failed!"));
return OPAL_ERR_OUT_OF_RESOURCE;
}
message->data = calloc (m->msg_length, 1);
if (NULL == message->data) {
OBJ_RELEASE(message);
return OPAL_ERR_OUT_OF_RESOURCE;
}
message->length = length;
message->data->hdr.rem_ep = lcl_ep;
message->data->hdr.lcl_ep = rem_ep;
message->data->hdr.type = type;
message->data->hdr.rem_ctx = (uintptr_t) message;
message->endpoint = lcl_ep;
udcm_set_message_timeout (message);
opal_atomic_wmb ();
*msgp = message;
BTL_VERBOSE(("created message %p with type %d", (void *) message, type));
return OPAL_SUCCESS;
}
/* mark: rc message functions */
/*
* Allocate a CM request structure and initialize some common fields
* (that are independent of the specific QP, etc.)
*/
static int udcm_send_request (mca_btl_base_endpoint_t *lcl_ep,
mca_btl_base_endpoint_t *rem_ep)
{
udcm_endpoint_t *udep = UDCM_ENDPOINT_DATA(lcl_ep);
udcm_module_t *m = UDCM_ENDPOINT_MODULE(lcl_ep);
udcm_message_sent_t *msg;
int i, rc;
BTL_VERBOSE(("sending request for endpoint %p", (void *) lcl_ep));
udep->sent_req = true;
if (0 != (rc = udcm_new_message (lcl_ep, rem_ep, UDCM_MESSAGE_CONNECT,
m->msg_length, &msg))) {
return rc;
}
msg->data->hdr.data.req.rem_ep_index = htonl(lcl_ep->index);
msg->data->hdr.data.req.rem_port_num = m->modex.mm_port_num;
msg->data->hdr.data.req.rem_name = OPAL_PROC_MY_NAME;
for (i = 0 ; i < mca_btl_openib_component.num_qps ; ++i) {
msg->data->qps[i].psn = htonl(lcl_ep->qps[i].qp->lcl_psn);
msg->data->qps[i].qp_num = htonl(lcl_ep->qps[i].qp->lcl_qp->qp_num);
}
if (0 != (rc = udcm_post_send (lcl_ep, msg->data, m->msg_length, 0))) {
BTL_VERBOSE(("error posting REQ"));
udcm_free_message (msg);
return rc;
}
return 0;
}
static int udcm_send_complete (mca_btl_base_endpoint_t *lcl_ep,
mca_btl_base_endpoint_t *rem_ep)
{
udcm_message_sent_t *msg;
int rc;
if (0 != (rc = udcm_new_message (lcl_ep, rem_ep, UDCM_MESSAGE_COMPLETE,
sizeof (udcm_msg_hdr_t), &msg))) {
return rc;
}
rc = udcm_post_send (lcl_ep, msg->data, sizeof (udcm_msg_hdr_t), 0);
if (0 != rc) {
BTL_VERBOSE(("error posting complete"));
udcm_free_message (msg);
return rc;
}
return 0;
}
static int udcm_send_reject (mca_btl_base_endpoint_t *lcl_ep,
mca_btl_base_endpoint_t *rem_ep,
int rej_reason)
{
udcm_message_sent_t *msg;
int rc;
if (0 != (rc = udcm_new_message (lcl_ep, rem_ep, UDCM_MESSAGE_REJECT,
sizeof (udcm_msg_hdr_t), &msg))) {
return rc;
}
msg->data->hdr.data.rej.reason = htonl(rej_reason);
rc = udcm_post_send (lcl_ep, msg->data, sizeof (udcm_msg_hdr_t), 0);
if (0 != rc) {
BTL_VERBOSE(("error posting rejection"));
udcm_free_message (msg);
return rc;
}
return 0;
}
static int udcm_send_ack (mca_btl_base_endpoint_t *lcl_ep, uintptr_t rem_ctx)
{
udcm_msg_hdr_t hdr;
BTL_VERBOSE(("sending ack for message %p on ep %p", (void *) rem_ctx, (void *) lcl_ep));
hdr.type = UDCM_MESSAGE_ACK;
hdr.rem_ctx = rem_ctx;
return udcm_post_send (lcl_ep, &hdr, sizeof (hdr), 0);
}
static int udcm_handle_ack (udcm_module_t *m, const uintptr_t ctx, const uint16_t slid,
const uint32_t rem_qp)
{
udcm_message_sent_t *msg, *next;
bool found = false;
opal_mutex_lock (&m->cm_timeout_lock);
BTL_VERBOSE(("got ack for message %p from slid 0x%04x qp 0x%08x", (void *) ctx, slid,
rem_qp));
/* verify that the message is still active */
OPAL_LIST_FOREACH_SAFE(msg, next, &m->flying_messages, udcm_message_sent_t) {
if ((uintptr_t) msg != ctx) {
continue;
}
BTL_VERBOSE(("found matching message"));
found = true;
/* mark that this event is not active anymore */
msg->event_active = false;
/* there is a possibility this event is being handled by another thread right now. it
* should be safe to activate the event even in this case. the callback will handle
* releasing the message. this is done to avoid a race between the message handling
* thread and the thread progressing libevent. if the message handler is ever put
* in the event base then it will be safe to just release the message here but that
* is not the case atm. */
opal_event_active (&msg->event, 0, 0);
break;
}
if (!found) {
BTL_VERBOSE(("message %p not found in the list of flying messages", (void *) ctx));
}
opal_mutex_unlock (&m->cm_timeout_lock);
return OPAL_SUCCESS;
}
/* mark: rc message handling */
static int udcm_handle_connect(mca_btl_openib_endpoint_t *lcl_ep,
mca_btl_openib_endpoint_t *rem_ep)
{
udcm_reject_reason_t rej_reason = UDCM_REJ_REMOTE_ERROR;
udcm_endpoint_t *udep = UDCM_ENDPOINT_DATA(lcl_ep);
int rc = OPAL_ERROR;
if (NULL == udep) {
return OPAL_ERROR;
}
do {
opal_mutex_lock (&udep->udep_lock);
if (true == udep->recv_req) {
/* this endpoint is already connected */
BTL_VERBOSE(("already connected"));
rc = OPAL_SUCCESS;
rej_reason = UDCM_REJ_ALREADY_CONNECTED;
break;
}
udep->recv_req = true;
opal_atomic_wmb ();
if (MCA_BTL_IB_CLOSED == lcl_ep->endpoint_state) {
lcl_ep->endpoint_state = MCA_BTL_IB_CONNECTING;
}
if (OPAL_SUCCESS != (rc = udcm_rc_qp_create_all (lcl_ep))) {
BTL_VERBOSE(("error initializing endpoint qps"));
break;
}
rc = udcm_rc_qps_to_rts (lcl_ep);
if (OPAL_SUCCESS != rc) {
break;
}
if (false == udep->sent_req) {
rc = udcm_send_request (lcl_ep, rem_ep);
if (OPAL_SUCCESS != rc) {
break;
}
}
rc = udcm_send_complete (lcl_ep, rem_ep);
if (OPAL_SUCCESS != rc) {
break;
}
if (udep->recv_comp) {
udcm_finish_connection (lcl_ep);
}
opal_mutex_unlock (&udep->udep_lock);
return OPAL_SUCCESS;
} while (0);
opal_mutex_unlock (&udep->udep_lock);
/* Reject the request */
BTL_VERBOSE(("rejecting request for reason %d", rej_reason));
udcm_send_reject (lcl_ep, rem_ep, rej_reason);
if (OPAL_SUCCESS != rc) {
/* Communicate to the upper layer that the connection on this
endpoint has failed */
mca_btl_openib_endpoint_invoke_error (lcl_ep);
}
return rc;
}
static int udcm_handle_reject(mca_btl_openib_endpoint_t *lcl_ep,
udcm_msg_hdr_t *msg_hdr)
{
int32_t reason = ntohl(msg_hdr->data.rej.reason);
BTL_VERBOSE(("reject received: reason %d", reason));
if (UDCM_REJ_ALREADY_CONNECTED == reason) {
return OPAL_SUCCESS;
}
#if HAVE_XRC
else if (UDCM_REJ_NOT_READY == reason) {
return udcm_xrc_restart_connect (lcl_ep);
}
#endif
/* Communicate to the upper layer that the connection on this
endpoint has failed */
mca_btl_openib_endpoint_invoke_error (lcl_ep);
return OPAL_ERR_NOT_FOUND;
}
static int udcm_finish_connection (mca_btl_openib_endpoint_t *lcl_ep)
{
BTL_VERBOSE(("finishing connection for endpoint %p.", (void *) lcl_ep));
/* Ensure that all the writes back to the endpoint and associated
data structures have completed */
opal_atomic_wmb();
mca_btl_openib_endpoint_cpc_complete(lcl_ep);
return OPAL_SUCCESS;
}
static int udcm_handle_complete (mca_btl_openib_endpoint_t *lcl_ep)
{
udcm_endpoint_t *udep = UDCM_ENDPOINT_DATA(lcl_ep);
udep->recv_comp = true;
if (udep->recv_req) {
udcm_finish_connection (lcl_ep);
} else {
OPAL_THREAD_UNLOCK(&lcl_ep->endpoint_lock);
}
return OPAL_SUCCESS;
}
/* mark: message processing */
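/* Message processing happens in two stages: udcm_process_messages runs
   in the channel progress thread, validates each datagram, acks it, and
   queues the header on cm_recv_msg_fifo; udcm_message_callback then
   drains the fifo from the main event base, where it is safe to call
   into the rest of the BTL. */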
static int udcm_process_messages (struct ibv_cq *event_cq, udcm_module_t *m)
{
mca_btl_openib_endpoint_t *lcl_ep;
int msg_num, i, count;
udcm_msg_t *message = NULL;
udcm_message_recv_t *item;
struct ibv_wc wc[20];
#if OPAL_ENABLE_DEBUG
struct ibv_grh *grh;
#endif
udcm_endpoint_t *udep;
uint64_t dir;
memset(wc, 0, sizeof(wc));
count = ibv_poll_cq (event_cq, 20, wc);
if (count < 0)
return count;
for (i = 0 ; i < count ; i++) {
dir = wc[i].wr_id & UDCM_WR_DIR_MASK;
if (UDCM_WR_RECV_ID != dir) {
opal_output (0, "unknown packet");
continue;
}
msg_num = (int)(wc[i].wr_id & (~UDCM_WR_DIR_MASK));
#if OPAL_ENABLE_DEBUG
grh = (wc[i].wc_flags & IBV_WC_GRH) ? (struct ibv_grh *) udcm_module_get_recv_buffer (m, msg_num, false) : NULL;
#endif
BTL_VERBOSE(("WC: wr_id: 0x%016" PRIu64 ", status: %d, opcode: 0x%x, byte_len: %x, imm_data: 0x%08x, "
"qp_num: 0x%08x, src_qp: 0x%08x, wc_flags: 0x%x, slid: 0x%04x grh_present: %s",
wc[i].wr_id, wc[i].status, wc[i].opcode, wc[i].byte_len,
wc[i].imm_data, wc[i].qp_num, wc[i].src_qp, wc[i].wc_flags, wc[i].slid,
grh ? "yes" : "no"));
if (IBV_WC_SUCCESS != wc[i].status) {
BTL_ERROR(("recv work request for buffer %d failed, code = %d",
msg_num, wc[i].status));
count = -1;
break;
}
message = (udcm_msg_t *) udcm_module_get_recv_buffer (m, msg_num, true);
if (UDCM_MESSAGE_ACK == message->hdr.type) {
/* ack! */
udcm_handle_ack (m, message->hdr.rem_ctx, wc[i].slid, wc[i].src_qp);
udcm_module_post_one_recv (m, msg_num);
continue;
}
lcl_ep = message->hdr.lcl_ep;
if (NULL == lcl_ep) {
lcl_ep = udcm_find_endpoint (m->btl, wc[i].src_qp, wc[i].slid, &message->hdr);
}
        if (NULL == lcl_ep) {
            /* can't find the associated endpoint */
BTL_ERROR(("could not find associated endpoint."));
udcm_module_post_one_recv (m, msg_num);
continue;
}
message->hdr.lcl_ep = lcl_ep;
BTL_VERBOSE(("received message. type: %u, lcl_ep = %p, rem_ep = %p, "
"src qpn = %d, length = %d, local buffer # = %d",
message->hdr.type, (void *) message->hdr.lcl_ep, (void *) message->hdr.rem_ep,
wc[i].src_qp, wc[i].byte_len, msg_num));
udep = UDCM_ENDPOINT_DATA(lcl_ep);
if (NULL == udep) {
/* Endpoint was not initialized or was finalized */
udcm_module_post_one_recv (m, msg_num);
continue;
}
opal_mutex_lock (&udep->udep_lock);
/* Need to ensure endpoint data is initialized before sending the ack */
if (OPAL_SUCCESS != udcm_endpoint_init_data (lcl_ep)) {
BTL_ERROR(("could not initialize cpc data for endpoint"));
udcm_module_post_one_recv (m, msg_num);
opal_mutex_unlock (&udep->udep_lock);
continue;
}
/* save message data in the endpoint */
if (UDCM_MESSAGE_CONNECT == message->hdr.type) {
/* Save remote queue pair information */
int num_qps = mca_btl_openib_component.num_qps;
lcl_ep->rem_info.rem_index = ntohl(message->hdr.data.req.rem_ep_index);
for (int qp_index = 0 ; qp_index < num_qps ; ++qp_index) {
/* Save these numbers on the endpoint for reference. */
lcl_ep->rem_info.rem_qps[qp_index].rem_psn =
ntohl(message->qps[qp_index].psn);
lcl_ep->rem_info.rem_qps[qp_index].rem_qp_num =
ntohl(message->qps[qp_index].qp_num);
}
}
#if HAVE_XRC
else if (UDCM_MESSAGE_XRESPONSE == message->hdr.type ||
UDCM_MESSAGE_XRESPONSE2 == message->hdr.type) {
/* save remote srq information */
int num_srqs = mca_btl_openib_component.num_xrc_qps;
lcl_ep->rem_info.rem_index = ntohl(message->hdr.data.xres.rem_ep_index);
for (int i = 0 ; i < num_srqs ; ++i) {
lcl_ep->rem_info.rem_srqs[i].rem_srq_num = ntohl(message->qps[i].qp_num);
BTL_VERBOSE(("Received srq[%d] num = %d", i, lcl_ep->rem_info.rem_srqs[i].rem_srq_num));
}
if (UDCM_MESSAGE_XRESPONSE == message->hdr.type) {
/* swap response header data */
message->hdr.data.xres.rem_psn = ntohl(message->hdr.data.xres.rem_psn);
message->hdr.data.xres.rem_qp_num = ntohl(message->hdr.data.xres.rem_qp_num);
/* save remote qp information not included in the XRESPONSE2 message */
lcl_ep->rem_info.rem_qps[0].rem_psn = message->hdr.data.xres.rem_psn;
lcl_ep->rem_info.rem_qps[0].rem_qp_num = message->hdr.data.xres.rem_qp_num;
BTL_VERBOSE(("Received remote qp: %d, psn: %d", lcl_ep->rem_info.rem_qps[0].rem_qp_num,
                         lcl_ep->rem_info.rem_qps[0].rem_psn));
/* update ib_addr with remote qp number */
lcl_ep->ib_addr->remote_xrc_rcv_qp_num = lcl_ep->rem_info.rem_qps[0].rem_qp_num;
}
} else if (UDCM_MESSAGE_XCONNECT == message->hdr.type ||
UDCM_MESSAGE_XCONNECT2 == message->hdr.type) {
lcl_ep->rem_info.rem_index = ntohl(message->hdr.data.xreq.rem_ep_index);
/* swap request header data */
message->hdr.data.xreq.rem_qp_num = ntohl(message->hdr.data.xreq.rem_qp_num);
message->hdr.data.xreq.rem_psn = ntohl(message->hdr.data.xreq.rem_psn);
if (UDCM_MESSAGE_XCONNECT2 == message->hdr.type) {
/* save the qp number for unregister */
#if ! OPAL_HAVE_CONNECTX_XRC_DOMAINS
lcl_ep->xrc_recv_qp_num = message->hdr.data.xreq.rem_qp_num;
#endif
}
}
#endif
opal_mutex_unlock (&udep->udep_lock);
item = OBJ_NEW(udcm_message_recv_t);
/* Copy just the message header */
memcpy (&item->msg_hdr, &message->hdr, sizeof (message->hdr));
opal_fifo_push_atomic (&m->cm_recv_msg_fifo, &item->super);
udcm_send_ack (lcl_ep, message->hdr.rem_ctx);
/* Repost the receive */
udcm_module_post_one_recv (m, msg_num);
}
opal_atomic_wmb ();
if (0 == opal_atomic_swap_32 (&m->cm_message_event_active, 1)) {
opal_event_active (&m->cm_message_event, OPAL_EV_READ, 1);
}
return count;
}
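/* Completion channel dispatch. Follows the standard verbs pattern:
   ibv_get_cq_event -> ibv_ack_cq_events -> drain the CQ with
   ibv_poll_cq (in udcm_process_messages) -> ibv_req_notify_cq to
   re-arm notification before returning. */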
static void *udcm_cq_event_dispatch(int fd, int flags, void *context)
{
udcm_module_t *m = (udcm_module_t *) context;
struct ibv_cq *event_cq = m->cm_recv_cq;
void *event_context;
int rc;
opal_mutex_lock (&m->cm_lock);
do {
if (OPAL_UNLIKELY(NULL == m || NULL == m->cm_channel)) {
break;
}
rc = ibv_get_cq_event (m->cm_channel, &event_cq, &event_context);
if (0 != rc || NULL == event_cq) {
break;
}
        /* acknowledge the event */
ibv_ack_cq_events (event_cq, 1);
if (m->cm_exiting) {
break;
}
rc = udcm_process_messages (event_cq, m);
if (rc < 0) {
BTL_VERBOSE(("error processing incomming messages"));
break;
}
if (ibv_req_notify_cq(event_cq, 0)) {
BTL_VERBOSE(("error asking for cq notifications"));
}
} while (0);
opal_mutex_unlock (&m->cm_lock);
return NULL;
}
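/*
 * Message processing callback. Runs in the main event loop and drains the
 * fifo of message headers queued by udcm_process_messages (). Note the lock
 * discipline: the endpoint lock is taken before dispatch and released
 * case-by-case -- directly for CONNECT/REJECT/XCONNECT, and inside
 * mca_btl_openib_endpoint_cpc_complete (reached via udcm_finish_connection)
 * for COMPLETE and XRESPONSE/XRESPONSE2.
 */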
static void *udcm_message_callback (int fd, int flags, void *context)
{
udcm_module_t *m = (udcm_module_t *) context;
udcm_message_recv_t *item;
BTL_VERBOSE(("running message thread"));
/* Mark that the callback was started */
opal_atomic_swap_32 (&m->cm_message_event_active, 0);
opal_atomic_wmb ();
while ((item = (udcm_message_recv_t *) opal_fifo_pop_atomic (&m->cm_recv_msg_fifo))) {
mca_btl_openib_endpoint_t *lcl_ep = item->msg_hdr.lcl_ep;
OPAL_THREAD_LOCK(&lcl_ep->endpoint_lock);
switch (item->msg_hdr.type) {
case UDCM_MESSAGE_CONNECT:
udcm_handle_connect (lcl_ep, item->msg_hdr.rem_ep);
OPAL_THREAD_UNLOCK(&lcl_ep->endpoint_lock);
break;
case UDCM_MESSAGE_REJECT:
udcm_handle_reject (lcl_ep, &item->msg_hdr);
OPAL_THREAD_UNLOCK(&lcl_ep->endpoint_lock);
break;
case UDCM_MESSAGE_COMPLETE:
udcm_handle_complete (lcl_ep);
break;
#if HAVE_XRC
case UDCM_MESSAGE_XRESPONSE2:
udcm_finish_connection (lcl_ep);
break;
case UDCM_MESSAGE_XRESPONSE:
/* udcm_handle_xresponse will call mca_btl_openib_endpoint_cpc_complete
which will drop the thread lock */
udcm_xrc_handle_xresponse (lcl_ep, &item->msg_hdr);
break;
case UDCM_MESSAGE_XCONNECT:
case UDCM_MESSAGE_XCONNECT2:
udcm_xrc_handle_xconnect (lcl_ep, &item->msg_hdr);
OPAL_THREAD_UNLOCK(&lcl_ep->endpoint_lock);
break;
#endif
        default:
            BTL_VERBOSE(("unknown message type"));
            OPAL_THREAD_UNLOCK(&lcl_ep->endpoint_lock);
}
OBJ_RELEASE (item);
}
BTL_VERBOSE(("exiting message thread"));
return NULL;
}
/* mark: udcm_message_sent_t class */
static void udcm_sent_message_constructor (udcm_message_sent_t *message)
{
memset ((char *)message + sizeof (message->super), 0,
sizeof (*message) - sizeof (message->super));
opal_event_evtimer_set(opal_sync_event_base, &message->event, udcm_send_timeout, message);
}
static void udcm_sent_message_destructor (udcm_message_sent_t *message)
{
if (message->data) {
free (message->data);
}
opal_event_evtimer_del (&message->event);
message->event_active = false;
}
/* mark: message timeout code */
/* Message timeouts */
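/*
 * Messages that expect an ack are tracked on the module's flying_messages
 * list with a timer armed (udcm_set_message_timeout). An ack from the peer
 * frees the message and cancels the timer (udcm_free_message); otherwise
 * this callback fires, reposts the send, and gives up (marking the endpoint
 * in error from the main thread) after udcm_max_retry attempts.
 */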
static void udcm_send_timeout (evutil_socket_t fd, short event, void *arg)
{
udcm_message_sent_t *msg = (udcm_message_sent_t *) arg;
mca_btl_base_endpoint_t *lcl_ep = msg->endpoint;
udcm_module_t *m = UDCM_ENDPOINT_MODULE(lcl_ep);
opal_mutex_lock (&m->cm_timeout_lock);
opal_list_remove_item (&m->flying_messages, &msg->super);
opal_mutex_unlock (&m->cm_timeout_lock);
if (m->cm_exiting || !msg->event_active) {
/* we are exiting or the event is no longer valid */
OBJ_RELEASE(msg);
return;
}
msg->event_active = false;
do {
BTL_VERBOSE(("send for message to 0x%04x:0x%08x timed out",
UDCM_ENDPOINT_REM_MODEX(lcl_ep)->mm_lid,
UDCM_ENDPOINT_REM_MODEX(lcl_ep)->mm_qp_num));
/* This happens from time to time at the end of a run (probably due to a
lost ack on the completion message). */
if (NULL == lcl_ep->endpoint_local_cpc_data ||
MCA_BTL_IB_CONNECTED == lcl_ep->endpoint_state ||
m->cm_exiting) {
OBJ_RELEASE (msg);
break;
}
if (msg->tries == udcm_max_retry) {
opal_output (0, "too many retries sending message to 0x%04x:0x%08x, giving up",
UDCM_ENDPOINT_REM_MODEX(lcl_ep)->mm_lid,
UDCM_ENDPOINT_REM_MODEX(lcl_ep)->mm_qp_num);
/* We are running in the timeout thread. Invoke the error in the
* "main thread" because it may call up into the pml or another
* component that may not have threading support enabled. */
mca_btl_openib_run_in_main (mca_btl_openib_endpoint_invoke_error, lcl_ep);
break;
}
msg->tries++;
udcm_set_message_timeout (msg);
if (0 != udcm_post_send (lcl_ep, msg->data, msg->length, 0)) {
BTL_VERBOSE(("error reposting message"));
mca_btl_openib_run_in_main (mca_btl_openib_endpoint_invoke_error, lcl_ep);
break;
}
} while (0);
}
static void udcm_set_message_timeout (udcm_message_sent_t *message)
{
udcm_module_t *m = UDCM_ENDPOINT_MODULE(message->endpoint);
BTL_VERBOSE(("activating timeout for message %p", (void *) message));
opal_mutex_lock (&m->cm_timeout_lock);
opal_list_append (&m->flying_messages, &message->super);
/* start the event */
opal_event_evtimer_add (&message->event, &udcm_timeout_tv);
message->event_active = true;
opal_mutex_unlock (&m->cm_timeout_lock);
}
static void udcm_free_message (udcm_message_sent_t *message)
{
udcm_module_t *m = UDCM_ENDPOINT_MODULE(message->endpoint);
BTL_VERBOSE(("releasing message %p", (void *) message));
opal_mutex_lock (&m->cm_timeout_lock);
if (message->event_active) {
opal_list_remove_item (&m->flying_messages, &message->super);
message->event_active = false;
}
opal_mutex_unlock (&m->cm_timeout_lock);
OBJ_RELEASE(message);
}
/* mark: xrc connection support */
/* XRC support functions */
#if HAVE_XRC
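/*
 * XRC connections share one send qp per remote host (tracked by ib_addr).
 * The first endpoint to connect creates the send qp and exchanges full qp
 * information; endpoints that arrive while that exchange is in flight wait
 * on the ib_addr's pending list; once the address is connected only the
 * SRQ numbers need to be exchanged. Roughly:
 *
 *     peer1                                  peer2
 *       |                                      |
 *       | XCONNECT (send qp num, psn) -------->| create xrc recv qp
 *       |<--- XRESPONSE (recv qp num, psn, srq numbers)
 *  connect send qp, finish                     |
 *
 *  later endpoints to the same host:
 *       | XCONNECT2 (recv qp num) ------------>| re-open/register recv qp
 *       |<--- XRESPONSE2 (srq numbers)         |
 *  finish connection                           |
 */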
static int udcm_xrc_start_connect (opal_btl_openib_connect_base_module_t *cpc,
mca_btl_base_endpoint_t *lcl_ep)
{
udcm_endpoint_t *udep = UDCM_ENDPOINT_DATA(lcl_ep);
int rc = OPAL_SUCCESS;
opal_mutex_lock (&udep->udep_lock);
opal_mutex_lock (&lcl_ep->ib_addr->addr_lock);
if (OPAL_SUCCESS != (rc = udcm_endpoint_init_data (lcl_ep))) {
BTL_VERBOSE(("error initializing endpoint cpc data"));
opal_mutex_unlock (&udep->udep_lock);
opal_mutex_unlock (&lcl_ep->ib_addr->addr_lock);
return rc;
}
lcl_ep->endpoint_state = MCA_BTL_IB_CONNECTING;
BTL_VERBOSE(("The IB addr: sid %" PRIx64 " lid %d with status %d, "
"subscribing to this address", lcl_ep->ib_addr->subnet_id,
lcl_ep->ib_addr->status, lcl_ep->ib_addr->lid));
switch (lcl_ep->ib_addr->status) {
case MCA_BTL_IB_ADDR_CLOSED:
if (OPAL_SUCCESS != (rc = udcm_xrc_send_qp_create(lcl_ep))) {
break;
}
/* Send connection info over to remote endpoint */
lcl_ep->ib_addr->status = MCA_BTL_IB_ADDR_CONNECTING;
if (OPAL_SUCCESS != (rc = udcm_xrc_send_request (lcl_ep, NULL, UDCM_MESSAGE_XCONNECT))) {
BTL_ERROR(("Error sending connect request, error code %d", rc));
}
break;
case MCA_BTL_IB_ADDR_CONNECTING:
        /* somebody is already connecting to this machine; wait for them to finish */
opal_list_append(&lcl_ep->ib_addr->pending_ep, &lcl_ep->super);
break;
case MCA_BTL_IB_ADDR_CONNECTED:
        /* we already have the send qp; we just need the receive side.
         * Send a request for the SRQ numbers */
if (OPAL_SUCCESS != (rc = udcm_xrc_send_request (lcl_ep, NULL, UDCM_MESSAGE_XCONNECT2))) {
BTL_ERROR(("error sending xrc connect request, error code %d", rc));
}
break;
default:
BTL_ERROR(("Invalid endpoint status %d", lcl_ep->ib_addr->status));
}
opal_mutex_unlock (&lcl_ep->ib_addr->addr_lock);
opal_mutex_unlock (&udep->udep_lock);
return rc;
}
/* If the XRC recv qp was closed and the sender still doesn't know about it,
 * we need to close the qp, reset the ib_addr status to CLOSED, and start
 * everything from scratch.
 */
static int udcm_xrc_restart_connect (mca_btl_base_endpoint_t *lcl_ep)
{
opal_mutex_lock (&lcl_ep->ib_addr->addr_lock);
BTL_VERBOSE(("Restart connection for IB addr: sid %" PRIx64 " lid %d, with status "
"%d, resetting and starting from scratch", lcl_ep->ib_addr->subnet_id,
lcl_ep->ib_addr->lid, lcl_ep->ib_addr->status));
if (MCA_BTL_IB_ADDR_CONNECTED == lcl_ep->ib_addr->status) {
        /* switch back to closed and start from scratch */
        lcl_ep->ib_addr->status = MCA_BTL_IB_ADDR_CLOSED;
        /* destroy the send qp. the receive side was already closed so all
         * pending lists must be empty! */
        assert (opal_list_is_empty(&lcl_ep->qps->no_wqe_pending_frags[0]));
        assert (opal_list_is_empty(&lcl_ep->qps->no_wqe_pending_frags[1]));
        if (ibv_destroy_qp (lcl_ep->qps[0].qp->lcl_qp)) {
            BTL_ERROR(("Failed to destroy QP. errno %d", errno));
        }
}
opal_mutex_unlock (&lcl_ep->ib_addr->addr_lock);
    /* udcm_xrc_start_connect () should automatically handle all other cases */
return udcm_xrc_start_connect (NULL, lcl_ep);
}
/* mark: xrc send qp */
/* Send qp connect */
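/*
 * Transition the (already created) XRC send qp through RTR to RTS using the
 * remote qp number and psn received in the XRESPONSE message. The path MTU
 * is clamped to the smaller of the local and remote values.
 */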
static int udcm_xrc_send_qp_connect (mca_btl_openib_endpoint_t *lcl_ep, uint32_t rem_qp_num, uint32_t rem_psn)
{
mca_btl_openib_module_t *openib_btl = lcl_ep->endpoint_btl;
struct ibv_qp_attr attr;
struct ibv_qp *qp;
uint32_t psn;
int ret;
BTL_VERBOSE(("Connecting send qp: %p, remote qp: %d", (void *)lcl_ep->qps[0].qp->lcl_qp,
rem_qp_num));
assert(NULL != lcl_ep->qps);
qp = lcl_ep->qps[0].qp->lcl_qp;
psn = lcl_ep->qps[0].qp->lcl_psn;
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_RTR;
attr.path_mtu = (openib_btl->device->mtu < lcl_ep->rem_info.rem_mtu) ?
openib_btl->device->mtu : lcl_ep->rem_info.rem_mtu;
attr.dest_qp_num = rem_qp_num;
attr.rq_psn = rem_psn;
attr.max_dest_rd_atomic = mca_btl_openib_component.ib_max_rdma_dst_ops;
attr.min_rnr_timer = mca_btl_openib_component.ib_min_rnr_timer;
attr.ah_attr.is_global = 0;
attr.ah_attr.dlid = lcl_ep->rem_info.rem_lid;
attr.ah_attr.src_path_bits = openib_btl->src_path_bits;
attr.ah_attr.port_num = openib_btl->port_num;
attr.ah_attr.static_rate = 0;
attr.ah_attr.sl = mca_btl_openib_component.ib_service_level;
#if (ENABLE_DYNAMIC_SL)
/* if user enabled dynamic SL, get it from PathRecord */
if (0 != mca_btl_openib_component.ib_path_record_service_level) {
int rc = btl_openib_connect_get_pathrecord_sl(qp->context,
attr.ah_attr.port_num,
openib_btl->lid,
attr.ah_attr.dlid);
if (OPAL_ERROR == rc) {
return OPAL_ERROR;
}
attr.ah_attr.sl = rc;
}
#endif
if (mca_btl_openib_component.verbose) {
BTL_VERBOSE(("Set MTU to IBV value %d (%s bytes)", attr.path_mtu,
(attr.path_mtu == IBV_MTU_256) ? "256" :
(attr.path_mtu == IBV_MTU_512) ? "512" :
(attr.path_mtu == IBV_MTU_1024) ? "1024" :
(attr.path_mtu == IBV_MTU_2048) ? "2048" :
(attr.path_mtu == IBV_MTU_4096) ? "4096" :
"unknown (!)"));
}
ret = ibv_modify_qp(qp, &attr,
IBV_QP_STATE |
IBV_QP_AV |
IBV_QP_PATH_MTU |
IBV_QP_DEST_QPN |
IBV_QP_RQ_PSN |
IBV_QP_MAX_DEST_RD_ATOMIC |
IBV_QP_MIN_RNR_TIMER);
if (ret) {
BTL_ERROR(("Error modifying QP[%x] to IBV_QPS_RTR errno says: %s [%d]",
qp->qp_num, strerror(ret), ret));
return OPAL_ERROR;
}
attr.qp_state = IBV_QPS_RTS;
attr.timeout = mca_btl_openib_component.ib_timeout;
attr.retry_cnt = mca_btl_openib_component.ib_retry_count;
attr.rnr_retry = mca_btl_openib_component.ib_rnr_retry;
attr.sq_psn = psn;
attr.max_rd_atomic = mca_btl_openib_component.ib_max_rdma_dst_ops;
ret = ibv_modify_qp(qp, &attr,
IBV_QP_STATE |
IBV_QP_TIMEOUT |
IBV_QP_RETRY_CNT |
IBV_QP_RNR_RETRY |
IBV_QP_SQ_PSN |
IBV_QP_MAX_QP_RD_ATOMIC);
if (ret) {
BTL_ERROR(("Error modifying QP[%x] to IBV_QPS_RTS errno says: %s [%d]",
qp->qp_num, strerror(ret), ret));
return OPAL_ERROR;
}
return OPAL_SUCCESS;
}
/* Create XRC send qp */
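/*
 * Two verbs APIs are in play here: with connectx xrc domains the qp is
 * created with ibv_create_qp_ex () and type IBV_QPT_XRC_SEND; with the
 * older interface it is created with ibv_create_qp () on the device's xrc
 * domain. Either way the qp is moved to INIT and receive buffers are posted
 * before any connection message goes out.
 */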
static int udcm_xrc_send_qp_create (mca_btl_base_endpoint_t *lcl_ep)
{
int prio = BTL_OPENIB_LP_CQ; /* all send completions go to low prio CQ */
uint32_t send_wr;
struct ibv_qp **qp;
uint32_t *psn;
#if OPAL_HAVE_CONNECTX_XRC_DOMAINS
struct ibv_qp_init_attr_ex qp_init_attr;
#else
struct ibv_qp_init_attr qp_init_attr;
#endif
struct ibv_qp_attr attr;
int ret;
size_t req_inline;
mca_btl_openib_module_t *openib_btl =
(mca_btl_openib_module_t*)lcl_ep->endpoint_btl;
/* Prepare QP structs */
BTL_VERBOSE(("creating xrc send qp"));
qp = &lcl_ep->qps[0].qp->lcl_qp;
psn = &lcl_ep->qps[0].qp->lcl_psn;
/* reserve additional wr for eager rdma credit management */
send_wr = lcl_ep->ib_addr->max_wqe +
(mca_btl_openib_component.use_eager_rdma ?
mca_btl_openib_component.max_eager_rdma : 0);
#if OPAL_HAVE_CONNECTX_XRC_DOMAINS
memset(&qp_init_attr, 0, sizeof(struct ibv_qp_init_attr_ex));
#else
memset(&qp_init_attr, 0, sizeof(struct ibv_qp_init_attr));
#endif
memset(&attr, 0, sizeof(struct ibv_qp_attr));
qp_init_attr.send_cq = qp_init_attr.recv_cq = openib_btl->device->ib_cq[prio];
    /* if this code is updated the code in endpoint_init_qp_xrc may need to
     * be updated as well */
    /* no receive queue is needed; receives are posted to the srq */
qp_init_attr.cap.max_recv_wr = 0;
qp_init_attr.cap.max_send_wr = send_wr;
qp_init_attr.cap.max_inline_data = req_inline =
openib_btl->device->max_inline_data;
qp_init_attr.cap.max_send_sge = 1;
    /* max_recv_sge is ignored by the driver; we do not use an SG list */
    qp_init_attr.cap.max_recv_sge = 1;
#if OPAL_HAVE_CONNECTX_XRC_DOMAINS
qp_init_attr.qp_type = IBV_QPT_XRC_SEND;
qp_init_attr.comp_mask = IBV_QP_INIT_ATTR_PD;
qp_init_attr.pd = openib_btl->device->ib_pd;
*qp = ibv_create_qp_ex(openib_btl->device->ib_dev_context, &qp_init_attr);
#else
qp_init_attr.qp_type = IBV_QPT_XRC;
qp_init_attr.xrc_domain = openib_btl->device->xrc_domain;
*qp = ibv_create_qp(openib_btl->device->ib_pd, &qp_init_attr);
#endif
if (NULL == *qp) {
opal_show_help("help-mpi-btl-openib-cpc-base.txt",
"ibv_create_qp failed", true,
opal_process_info.nodename,
ibv_get_device_name(openib_btl->device->ib_dev),
"Reliable connected (XRC)");
return OPAL_ERROR;
}
if (qp_init_attr.cap.max_inline_data < req_inline) {
lcl_ep->qps[0].ib_inline_max = qp_init_attr.cap.max_inline_data;
opal_show_help("help-mpi-btl-openib-cpc-base.txt",
"inline truncated", opal_process_info.nodename,
ibv_get_device_name(openib_btl->device->ib_dev),
openib_btl->port_num,
req_inline, qp_init_attr.cap.max_inline_data);
} else {
lcl_ep->qps[0].ib_inline_max = req_inline;
}
attr.qp_state = IBV_QPS_INIT;
attr.pkey_index = openib_btl->pkey_index;
attr.port_num = openib_btl->port_num;
attr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;
#if HAVE_DECL_IBV_ATOMIC_HCA
attr.qp_access_flags |= IBV_ACCESS_REMOTE_ATOMIC;
#endif
ret = ibv_modify_qp(*qp, &attr,
IBV_QP_STATE |
IBV_QP_PKEY_INDEX |
IBV_QP_PORT |
IBV_QP_ACCESS_FLAGS );
if (ret) {
BTL_ERROR(("Error modifying QP[%x] to IBV_QPS_INIT errno says: %s [%d]",
(*qp)->qp_num, strerror(ret), ret));
return OPAL_ERROR;
}
/* Setup meta data on the endpoint */
*psn = udcm_random () & 0x00ffffff;
    /* Now that all the qps are created locally, post some receive
       buffers, set up credits, etc. */
return mca_btl_openib_endpoint_post_recvs(lcl_ep);
}
/* mark: xrc receive qp */
/* Recv qp connect */
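/*
 * Attach to an existing XRC receive qp (XCONNECT2 path). If the qp has
 * already been destroyed on this side, the open/register call fails and we
 * return an error so the caller can reply NOT READY; the requester may then
 * restart the connection from scratch.
 */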
static int udcm_xrc_recv_qp_connect (mca_btl_openib_endpoint_t *lcl_ep, uint32_t qp_num)
{
mca_btl_openib_module_t *openib_btl = lcl_ep->endpoint_btl;
#if OPAL_HAVE_CONNECTX_XRC_DOMAINS
struct ibv_qp_open_attr attr;
memset(&attr, 0, sizeof(struct ibv_qp_open_attr));
attr.comp_mask = IBV_QP_OPEN_ATTR_NUM | IBV_QP_OPEN_ATTR_XRCD | IBV_QP_OPEN_ATTR_TYPE;
attr.qp_num = qp_num;
attr.qp_type = IBV_QPT_XRC_RECV;
attr.xrcd = openib_btl->device->xrcd;
BTL_VERBOSE(("Connecting Recv QP\n"));
lcl_ep->xrc_recv_qp = ibv_open_qp(openib_btl->device->ib_dev_context, &attr);
    if (NULL == lcl_ep->xrc_recv_qp) { /* failed to open the qp; it has already died and we need to create a new one */
        /* return NOT READY !!! */
        BTL_VERBOSE(("Failed to open qp_num: %d, got error: %s (%d). Replying with RNR",
                     qp_num, strerror(errno), errno));
return OPAL_ERROR;
} else {
BTL_VERBOSE(("Connected to XRC Recv qp [%d]", lcl_ep->xrc_recv_qp->qp_num));
return OPAL_SUCCESS;
}
#else
int ret;
/* silence unused variable warning */
(void) qp_num;
BTL_VERBOSE(("Connecting receive qp: %d", lcl_ep->xrc_recv_qp_num));
ret = ibv_reg_xrc_rcv_qp(openib_btl->device->xrc_domain, lcl_ep->xrc_recv_qp_num);
    if (ret) { /* failed to register the qp; it has already died and we need to create a new one */
        /* return NOT READY !!! */
        BTL_VERBOSE(("Failed to register qp_num: %d, got error: %s (%d). Replying with RNR",
                     lcl_ep->xrc_recv_qp_num, strerror(ret), ret));
        lcl_ep->xrc_recv_qp_num = 0; /* reset *after* logging the number */
return OPAL_ERROR;
}
#endif
return OPAL_SUCCESS;
}
/* Recv qp create */
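/*
 * Create a new XRC receive qp (XCONNECT path) and move it INIT -> RTR using
 * the remote send qp number and psn from the request. A receive-only qp
 * does not need to reach RTS, so the transition stops at RTR.
 */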
static int udcm_xrc_recv_qp_create (mca_btl_openib_endpoint_t *lcl_ep, uint32_t rem_qp_num, uint32_t rem_psn)
{
mca_btl_openib_module_t* openib_btl = lcl_ep->endpoint_btl;
#if OPAL_HAVE_CONNECTX_XRC_DOMAINS
struct ibv_qp_init_attr_ex qp_init_attr;
#else
struct ibv_qp_init_attr qp_init_attr;
#endif
struct ibv_qp_attr attr;
int ret;
BTL_VERBOSE(("creating xrc receive qp"));
#if OPAL_HAVE_CONNECTX_XRC_DOMAINS
memset(&qp_init_attr, 0, sizeof(struct ibv_qp_init_attr_ex));
qp_init_attr.qp_type = IBV_QPT_XRC_RECV;
qp_init_attr.comp_mask = IBV_QP_INIT_ATTR_XRCD;
qp_init_attr.xrcd = openib_btl->device->xrcd;
lcl_ep->xrc_recv_qp = ibv_create_qp_ex(openib_btl->device->ib_dev_context,
&qp_init_attr);
if (NULL == lcl_ep->xrc_recv_qp) {
BTL_ERROR(("Error creating XRC recv QP, errno says: %s [%d]",
strerror(errno), errno));
return OPAL_ERROR;
}
#else
memset(&qp_init_attr, 0, sizeof(struct ibv_qp_init_attr));
    /* Only xrc_domain is required; all others are ignored */
qp_init_attr.xrc_domain = openib_btl->device->xrc_domain;
ret = ibv_create_xrc_rcv_qp(&qp_init_attr, &lcl_ep->xrc_recv_qp_num);
if (ret) {
BTL_ERROR(("Error creating XRC recv QP[%x], errno says: %s [%d]",
lcl_ep->xrc_recv_qp_num, strerror(ret), ret));
return OPAL_ERROR;
}
#endif
memset(&attr, 0, sizeof(struct ibv_qp_attr));
attr.qp_state = IBV_QPS_INIT;
attr.pkey_index = openib_btl->pkey_index;
attr.port_num = openib_btl->port_num;
attr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;
#if HAVE_DECL_IBV_ATOMIC_HCA
attr.qp_access_flags |= IBV_ACCESS_REMOTE_ATOMIC;
#endif
#if OPAL_HAVE_CONNECTX_XRC_DOMAINS
ret = ibv_modify_qp(lcl_ep->xrc_recv_qp,
&attr,
IBV_QP_STATE|
IBV_QP_PKEY_INDEX|
IBV_QP_PORT|
IBV_QP_ACCESS_FLAGS);
if (ret) {
BTL_ERROR(("Error modifying XRC recv QP to IBV_QPS_INIT, errno says: %s [%d]",
strerror(ret), ret));
return OPAL_ERROR;
}
#else
ret = ibv_modify_xrc_rcv_qp(openib_btl->device->xrc_domain,
lcl_ep->xrc_recv_qp_num, &attr,
IBV_QP_STATE | IBV_QP_PKEY_INDEX |
IBV_QP_PORT | IBV_QP_ACCESS_FLAGS);
if (ret) {
BTL_ERROR(("Error modifying XRC recv QP[%x] to IBV_QPS_INIT, errno says: %s [%d]",
lcl_ep->xrc_recv_qp_num, strerror(ret), ret));
return OPAL_ERROR;
}
#endif
memset(&attr, 0, sizeof(struct ibv_qp_attr));
attr.qp_state = IBV_QPS_RTR;
attr.path_mtu = (openib_btl->device->mtu < lcl_ep->rem_info.rem_mtu) ?
openib_btl->device->mtu : lcl_ep->rem_info.rem_mtu;
attr.dest_qp_num = rem_qp_num;
attr.rq_psn = rem_psn;
attr.max_dest_rd_atomic = mca_btl_openib_component.ib_max_rdma_dst_ops;
attr.min_rnr_timer = mca_btl_openib_component.ib_min_rnr_timer;
attr.ah_attr.is_global = 0;
attr.ah_attr.dlid = lcl_ep->rem_info.rem_lid;
attr.ah_attr.src_path_bits = openib_btl->src_path_bits;
attr.ah_attr.port_num = openib_btl->port_num;
attr.ah_attr.static_rate = 0;
attr.ah_attr.sl = mca_btl_openib_component.ib_service_level;
#if (ENABLE_DYNAMIC_SL)
/* if user enabled dynamic SL, get it from PathRecord */
if (0 != mca_btl_openib_component.ib_path_record_service_level) {
int rc = btl_openib_connect_get_pathrecord_sl(
#if OPAL_HAVE_CONNECTX_XRC_DOMAINS
openib_btl->device->xrcd->context,
#else
openib_btl->device->xrc_domain->context,
#endif
attr.ah_attr.port_num,
openib_btl->lid,
attr.ah_attr.dlid);
if (OPAL_ERROR == rc) {
return OPAL_ERROR;
}
attr.ah_attr.sl = rc;
}
#endif
#if OPAL_HAVE_CONNECTX_XRC_DOMAINS
ret = ibv_modify_qp(lcl_ep->xrc_recv_qp,
&attr,
IBV_QP_STATE|
IBV_QP_AV|
IBV_QP_PATH_MTU|
IBV_QP_DEST_QPN|
IBV_QP_RQ_PSN|
IBV_QP_MAX_DEST_RD_ATOMIC|
IBV_QP_MIN_RNR_TIMER);
if (ret) {
BTL_ERROR(("Error modifying XRC recv QP to IBV_QPS_RTR, errno says: %s [%d]",
strerror(ret), ret));
return OPAL_ERROR;
}
#else
ret = ibv_modify_xrc_rcv_qp(openib_btl->device->xrc_domain,
lcl_ep->xrc_recv_qp_num,
&attr,
IBV_QP_STATE|
IBV_QP_AV|
IBV_QP_PATH_MTU|
IBV_QP_DEST_QPN|
IBV_QP_RQ_PSN|
IBV_QP_MAX_DEST_RD_ATOMIC|
IBV_QP_MIN_RNR_TIMER);
if (ret) {
BTL_ERROR(("Error modifying XRC recv QP[%x] to IBV_QPS_RTR, errno says: %s [%d]",
lcl_ep->xrc_recv_qp_num, strerror(ret), ret));
return OPAL_ERROR;
}
#endif
if (APM_ENABLED) {
#if OPAL_HAVE_CONNECTX_XRC_DOMAINS
mca_btl_openib_load_apm(lcl_ep->xrc_recv_qp, lcl_ep);
#else
mca_btl_openib_load_apm_xrc_rcv(lcl_ep->xrc_recv_qp_num, lcl_ep);
#endif
}
return OPAL_SUCCESS;
}
/* mark: xrc message functions */
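/*
 * Send an XCONNECT or XCONNECT2 request. XCONNECT carries this side's send
 * qp number and psn; XCONNECT2 carries the remote xrc receive qp number we
 * want re-registered. All multi-byte fields are converted to network byte
 * order before posting.
 */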
static int udcm_xrc_send_request (mca_btl_base_endpoint_t *lcl_ep, mca_btl_base_endpoint_t *rem_ep,
uint8_t msg_type)
{
udcm_module_t *m = UDCM_ENDPOINT_MODULE(lcl_ep);
udcm_message_sent_t *msg;
int rc;
assert (UDCM_MESSAGE_XCONNECT == msg_type || UDCM_MESSAGE_XCONNECT2 == msg_type);
BTL_VERBOSE(("sending xrc request for endpoint %p", (void *) lcl_ep));
if (0 != (rc = udcm_new_message (lcl_ep, rem_ep, msg_type,
sizeof (udcm_msg_hdr_t), &msg))) {
return rc;
}
msg->data->hdr.data.xreq.rem_ep_index = htonl(lcl_ep->index);
msg->data->hdr.data.xreq.rem_port_num = m->modex.mm_port_num;
msg->data->hdr.data.xreq.rem_name = OPAL_PROC_MY_NAME;
if (UDCM_MESSAGE_XCONNECT == msg_type) {
BTL_VERBOSE(("Sending XConnect with qp: %d, psn: %d", lcl_ep->qps[0].qp->lcl_qp->qp_num,
lcl_ep->qps[0].qp->lcl_psn));
msg->data->hdr.data.xreq.rem_qp_num = htonl(lcl_ep->qps[0].qp->lcl_qp->qp_num);
msg->data->hdr.data.xreq.rem_psn = htonl(lcl_ep->qps[0].qp->lcl_psn);
} else {
BTL_VERBOSE(("Sending XConnect2 with qp: %d", lcl_ep->ib_addr->remote_xrc_rcv_qp_num));
msg->data->hdr.data.xreq.rem_qp_num = htonl(lcl_ep->ib_addr->remote_xrc_rcv_qp_num);
}
if (0 != (rc = udcm_post_send (lcl_ep, msg->data, sizeof (udcm_msg_hdr_t), 0))) {
BTL_VERBOSE(("error posting XREQ"));
udcm_free_message (msg);
return rc;
}
return 0;
}
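/*
 * Send an XRESPONSE or XRESPONSE2 reply. XRESPONSE carries the receive qp
 * number and psn plus the SRQ numbers for every XRC qp; XRESPONSE2 carries
 * only the SRQ numbers (the requester already knows the receive qp).
 */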
static int udcm_xrc_send_xresponse (mca_btl_base_endpoint_t *lcl_ep, mca_btl_base_endpoint_t *rem_ep,
uint8_t msg_type)
{
udcm_module_t *m = UDCM_ENDPOINT_MODULE(lcl_ep);
udcm_message_sent_t *msg;
int rc;
assert (UDCM_MESSAGE_XRESPONSE == msg_type || UDCM_MESSAGE_XRESPONSE2 == msg_type);
if (0 != (rc = udcm_new_message (lcl_ep, rem_ep, msg_type, m->msg_length, &msg))) {
return rc;
}
msg->data->hdr.data.xres.rem_ep_index = htonl(lcl_ep->index);
if (UDCM_MESSAGE_XRESPONSE == msg_type) {
#if OPAL_HAVE_CONNECTX_XRC_DOMAINS
BTL_VERBOSE(("Sending qp: %d, psn: %d", lcl_ep->xrc_recv_qp->qp_num, lcl_ep->xrc_recv_psn));
msg->data->hdr.data.xres.rem_qp_num = htonl(lcl_ep->xrc_recv_qp->qp_num);
msg->data->hdr.data.xres.rem_psn = htonl(lcl_ep->xrc_recv_psn);
#else
BTL_VERBOSE(("Sending qp: %d, psn: %d", lcl_ep->xrc_recv_qp_num, lcl_ep->xrc_recv_psn));
msg->data->hdr.data.xres.rem_qp_num = htonl(lcl_ep->xrc_recv_qp_num);
msg->data->hdr.data.xres.rem_psn = htonl(lcl_ep->xrc_recv_psn);
#endif
}
for (int i = 0; i < mca_btl_openib_component.num_xrc_qps; ++i) {
#if OPAL_HAVE_CONNECTX_XRC_DOMAINS
uint32_t srq_num;
        if (ibv_get_srq_num(lcl_ep->endpoint_btl->qps[i].u.srq_qp.srq, &srq_num)) {
            BTL_ERROR(("BTL openib UDCM internal error: can't get srq num"));
            /* do not send an uninitialized srq number */
            udcm_free_message (msg);
            return OPAL_ERROR;
        }
BTL_VERBOSE(("Sending srq[%d] num = %d", i, srq_num));
msg->data->qps[i].qp_num = htonl(srq_num);
#else
BTL_VERBOSE(("Sending srq[%d] num = %d", i, lcl_ep->endpoint_btl->qps[i].u.srq_qp.srq->xrc_srq_num));
msg->data->qps[i].qp_num = htonl(lcl_ep->endpoint_btl->qps[i].u.srq_qp.srq->xrc_srq_num);
#endif
}
rc = udcm_post_send (lcl_ep, msg->data, m->msg_length, 0);
if (0 != rc) {
BTL_VERBOSE(("error posting complete"));
udcm_free_message (msg);
return rc;
}
return 0;
}
/* mark: xrc message handling */
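/*
 * Handle an incoming XCONNECT/XCONNECT2 request: re-open the receive qp
 * (XCONNECT2) or create a fresh one (XCONNECT), prepost receives, and reply
 * with the matching XRESPONSE type. On failure the request is rejected; a
 * NOT_READY rejection tells the requester it may retry.
 */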
static int udcm_xrc_handle_xconnect (mca_btl_openib_endpoint_t *lcl_ep, udcm_msg_hdr_t *msg_hdr)
{
udcm_reject_reason_t rej_reason = UDCM_REJ_REMOTE_ERROR;
udcm_endpoint_t *udep = UDCM_ENDPOINT_DATA(lcl_ep);
    int response_type = UDCM_MESSAGE_XRESPONSE; /* quiet maybe-uninitialized warnings */
int rc = OPAL_ERROR;
/* sanity check on message type */
assert (UDCM_MESSAGE_XCONNECT == msg_hdr->type || UDCM_MESSAGE_XCONNECT2 == msg_hdr->type);
do {
if (NULL == udep) {
break;
}
        opal_mutex_lock (&udep->udep_lock);
        if (udep->recv_req) {
            /* duplicate request */
            opal_mutex_unlock (&udep->udep_lock);
            return OPAL_SUCCESS;
        }
        udep->recv_req = true;
if (UDCM_MESSAGE_XCONNECT2 == msg_hdr->type) {
response_type = UDCM_MESSAGE_XRESPONSE2;
rc = udcm_xrc_recv_qp_connect (lcl_ep, msg_hdr->data.xreq.rem_qp_num);
if (OPAL_SUCCESS != rc) {
/* return not ready. remote side will retry */
rej_reason = UDCM_REJ_NOT_READY;
break;
}
}
/* prepost receives */
rc = mca_btl_openib_endpoint_post_recvs (lcl_ep);
if (OPAL_SUCCESS != rc) {
break;
}
/* Create local QP's and post receive resources */
if (UDCM_MESSAGE_XCONNECT == msg_hdr->type) {
BTL_VERBOSE(("Initialized QPs, LID = %d", ((mca_btl_openib_module_t *) lcl_ep->endpoint_btl)->lid));
response_type = UDCM_MESSAGE_XRESPONSE;
rc = udcm_xrc_recv_qp_create (lcl_ep, msg_hdr->data.xreq.rem_qp_num, msg_hdr->data.xreq.rem_psn);
if (OPAL_SUCCESS != rc) {
break;
}
}
rc = udcm_xrc_send_xresponse (lcl_ep, msg_hdr->rem_ep, response_type);
if (OPAL_SUCCESS != rc) {
break;
}
opal_mutex_unlock (&udep->udep_lock);
return OPAL_SUCCESS;
} while (0);
if (udep) {
opal_mutex_unlock (&udep->udep_lock);
}
/* Reject the request */
BTL_VERBOSE(("rejecting request for reason %d", rej_reason));
udcm_send_reject (lcl_ep, msg_hdr->rem_ep, rej_reason);
if (OPAL_SUCCESS != rc) {
/* Communicate to the upper layer that the connection on this
endpoint has failed */
mca_btl_openib_endpoint_invoke_error (lcl_ep);
}
return rc;
}
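/*
 * Handle an XRESPONSE reply: bring the send qp up with the remote receive
 * qp number and psn, then finish the connection. udcm_finish_connection ()
 * ends in mca_btl_openib_endpoint_cpc_complete (), which drops the endpoint
 * lock taken by the message callback.
 */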
static int udcm_xrc_handle_xresponse (mca_btl_openib_endpoint_t *lcl_ep, udcm_msg_hdr_t *msg_hdr)
{
udcm_endpoint_t *udep = UDCM_ENDPOINT_DATA(lcl_ep);
int rc;
BTL_VERBOSE(("finishing xrc connection for endpoint %p.", (void *) lcl_ep));
/* duplicate message */
if (udep->recv_resp) {
return OPAL_SUCCESS;
}
udep->recv_resp = true;
rc = udcm_xrc_send_qp_connect (lcl_ep, msg_hdr->data.xres.rem_qp_num, msg_hdr->data.xres.rem_psn);
if (OPAL_SUCCESS != rc) {
mca_btl_openib_endpoint_invoke_error (lcl_ep);
}
return udcm_finish_connection (lcl_ep);
}
#endif