Skip to content

Commit

Permalink
sessions: add support for ucx more
Browse files Browse the repository at this point in the history
Greatly simplify support for MPI_Comm_create_from_group and
MPI_Intercomm_create_from_group by removing the need to support
the 128-bit excid notion.

Instead, make use of a PMIx capability - PMIX_GROUP_LOCAL_CID and the notion of
PMIX_GROUP_INFO. This capability was introduced in Open PMIx 4.1.3.
This capability allows us to piggy-back a local cid selected
for the new communicator on the PMIx_Group_construct operation.
Using this approach, a lot of the complex active message style operations
implemented in the OB1 PML to support excids can be avoided.

This PR also includes simplifications to the OFI MTL to make use of the
PMIX_GROUP_LOCAL_CID feature.

Infrastructure for debugging communicator management routines was also
introduced, along with a new MCA parameter: mpi_comm_verbose.

Related to #12566

Signed-off-by: Howard Pritchard <[email protected]>
  • Loading branch information
hppritcha committed Aug 1, 2024
1 parent 20b900e commit 7f17065
Show file tree
Hide file tree
Showing 13 changed files with 329 additions and 382 deletions.
14 changes: 2 additions & 12 deletions ompi/communicator/comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -1509,9 +1509,6 @@ static int ompi_comm_idup_with_info_activate (ompi_comm_request_t *request)

/**
 * Final step of an idup-with-info communicator request.
 *
 * No work is performed at this stage; the function only reports success.
 *
 * @param request  the communicator request (not inspected here)
 *
 * @return MPI_SUCCESS always
 */
static int ompi_comm_idup_with_info_finish (ompi_comm_request_t *request)
{
    /* the request context is not needed here; the former unused local that
     * fetched it has been dropped to silence unused-variable warnings */
    (void) request;

    /* done */
    return MPI_SUCCESS;
}
Expand Down Expand Up @@ -1741,7 +1738,7 @@ int ompi_intercomm_create_from_groups (ompi_group_t *local_group, int local_lead
ompi_communicator_t **newintercomm)
{
ompi_communicator_t *newcomp = NULL, *local_comm, *leader_comm = MPI_COMM_NULL;
ompi_comm_extended_cid_block_t new_block;
ompi_comm_extended_cid_block_t new_block = {{0}};
bool i_am_leader = local_leader == local_group->grp_my_rank;
ompi_proc_t **rprocs;
uint64_t data[4];
Expand Down Expand Up @@ -1867,14 +1864,7 @@ int ompi_intercomm_create_from_groups (ompi_group_t *local_group, int local_lead
return rc;
}

/* will be using a communicator ID derived from the bridge communicator to save some time */
new_block.block_cid.cid_base = data[1];
new_block.block_cid.cid_sub.u64 = data[2];
new_block.block_nextsub = 0;
new_block.block_nexttag = 0;
new_block.block_level = (int8_t) data[3];

rc = ompi_comm_nextcid (newcomp, NULL, NULL, (void *) tag, &new_block, false, OMPI_COMM_CID_GROUP_NEW);
rc = ompi_comm_nextcid (newcomp, NULL, NULL, (void *) tag, NULL, false, OMPI_COMM_CID_GROUP_NEW);
if ( OMPI_SUCCESS != rc ) {
OBJ_RELEASE(newcomp);
return rc;
Expand Down
178 changes: 152 additions & 26 deletions ompi/communicator/comm_cid.c
Original file line number Diff line number Diff line change
Expand Up @@ -310,22 +310,41 @@ static int ompi_comm_ext_cid_new_block (ompi_communicator_t *newcomm, ompi_commu
const void *arg0, const void *arg1, bool send_first, int mode,
ompi_request_t **req)
{
pmix_info_t pinfo, *results = NULL;
pmix_info_t *pinfo, *results = NULL;
size_t nresults;
opal_process_name_t *name_array = NULL;
char *tag = NULL;
size_t proc_count;
size_t cid_base = 0;
opal_process_name_t *name_array, *rname_array, *tmp_name_array;
bool cid_base_set = false;
char *tag = NULL;
size_t proc_count, rproc_count, cid_base = 0UL, ninfo;
int rc, leader_rank;
int ret = OMPI_SUCCESS;
pmix_proc_t *procs = NULL;
pmix_proc_t *procs;
void *grpinfo = NULL, *list = NULL;
pmix_data_array_t darray;
char tmp[PMIX_MAX_KEYLEN];

rc = ompi_group_to_proc_name_array (newcomm->c_local_group, &name_array, &proc_count);
if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
return rc;
}

if ( OMPI_COMM_IS_INTER (newcomm) ){
    /* For an inter-communicator the process list handed to PMIx must contain
     * the local group followed by the remote group, so append the remote
     * names to name_array. */
    rc = ompi_group_to_proc_name_array (newcomm->c_remote_group, &rname_array, &rproc_count);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
        free (name_array);
        return rc;
    }
    tmp_name_array = (opal_process_name_t *)realloc(name_array, (proc_count + rproc_count) * sizeof(opal_process_name_t));
    /* Test the realloc result itself.  The previous check looked at the
     * unrelated `tmp` character buffer, which is an array and can never be
     * NULL, so an allocation failure went unnoticed and the memcpy below
     * wrote through a NULL pointer. */
    if (NULL == tmp_name_array) {
        /* realloc leaves the original buffer untouched on failure */
        free(name_array);
        free(rname_array);
        return OMPI_ERR_OUT_OF_RESOURCE ;
    }
    name_array = tmp_name_array;
    memcpy(&name_array[proc_count], rname_array, rproc_count * sizeof(opal_process_name_t));
    proc_count += rproc_count;
    free(rname_array);
}

switch (mode) {
case OMPI_COMM_CID_GROUP_NEW:
tag = (char *) arg0;
Expand All @@ -341,15 +360,58 @@ static int ompi_comm_ext_cid_new_block (ompi_communicator_t *newcomm, ompi_commu
break;
}

PMIX_INFO_LOAD(&pinfo, PMIX_GROUP_ASSIGN_CONTEXT_ID, NULL, PMIX_BOOL);
grpinfo = PMIx_Info_list_start();
if (NULL == grpinfo) {
return OMPI_ERR_OUT_OF_RESOURCE ;
}

rc = PMIx_Info_list_add(grpinfo, PMIX_GROUP_ASSIGN_CONTEXT_ID, NULL, PMIX_BOOL);
if (PMIX_SUCCESS != rc) {
OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "PMIx_Info_list_add failed %s %d", PMIx_Error_string(rc), __LINE__));
return OMPI_ERR_OUT_OF_RESOURCE ;
}

list = PMIx_Info_list_start();

size_t c_index = (size_t)newcomm->c_index;
rc = PMIx_Info_list_add(list, PMIX_GROUP_LOCAL_CID, &c_index, PMIX_SIZE);
if (PMIX_SUCCESS != rc) {
OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "PMIx_Info_list_add failed %s %d", PMIx_Error_string(rc), __LINE__));
return OMPI_ERR_OUT_OF_RESOURCE ;
}

rc = PMIx_Info_list_convert(list, &darray);
if (PMIX_SUCCESS != rc) {
OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "PMIx_Info_list_convert failed %s %d", PMIx_Error_string(rc), __LINE__));
return OMPI_ERR_OUT_OF_RESOURCE ;
}
rc = PMIx_Info_list_add(grpinfo, PMIX_GROUP_INFO, &darray, PMIX_DATA_ARRAY);
if (PMIX_SUCCESS != rc) {
OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "PMIx_Info_list_add failed %s %d", PMIx_Error_string(rc), __LINE__));
return OMPI_ERR_OUT_OF_RESOURCE ;
}
PMIx_Info_list_release(list);
PMIX_DATA_ARRAY_DESTRUCT(&darray);


rc = PMIx_Info_list_convert(grpinfo, &darray);
if (PMIX_SUCCESS != rc) {
OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "PMIx_Info_list_convert failed %s %d", PMIx_Error_string(rc), __LINE__));
return OMPI_ERR_OUT_OF_RESOURCE ;
}

pinfo = (pmix_info_t*)darray.array;
ninfo = darray.size;
PMIx_Info_list_release(grpinfo);

PMIX_PROC_CREATE(procs, proc_count);
for (size_t i = 0 ; i < proc_count; ++i) {
OPAL_PMIX_CONVERT_NAME(&procs[i],&name_array[i]);
}

rc = PMIx_Group_construct(tag, procs, proc_count, &pinfo, 1, &results, &nresults);
PMIX_INFO_DESTRUCT(&pinfo);
OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "calling PMIx_Group_construct - tag %s size %ld ninfo %ld cid_base %ld\n", tag, proc_count, ninfo, cid_base));
rc = PMIx_Group_construct(tag, procs, proc_count, pinfo, ninfo, &results, &nresults);
PMIX_DATA_ARRAY_DESTRUCT(&darray);
if(PMIX_SUCCESS != rc) {
char msg_string[1024];
switch (rc) {
Expand All @@ -361,7 +423,7 @@ static int ompi_comm_ext_cid_new_block (ompi_communicator_t *newcomm, ompi_commu
"MPI_Comm_create_from_group/MPI_Intercomm_create_from_groups",
msg_string);

ret = MPI_ERR_UNSUPPORTED_OPERATION;
rc = MPI_ERR_UNSUPPORTED_OPERATION;
break;
case PMIX_ERR_NOT_SUPPORTED:
sprintf(msg_string,"PMIx server does not support PMIx Group operations");
Expand All @@ -370,10 +432,10 @@ static int ompi_comm_ext_cid_new_block (ompi_communicator_t *newcomm, ompi_commu
true,
"MPI_Comm_create_from_group/MPI_Intercomm_create_from_groups",
msg_string);
ret = MPI_ERR_UNSUPPORTED_OPERATION;
rc = MPI_ERR_UNSUPPORTED_OPERATION;
break;
default:
ret = opal_pmix_convert_status(rc);
rc = opal_pmix_convert_status(rc);
break;
}
goto fn_exit;
Expand All @@ -383,23 +445,27 @@ static int ompi_comm_ext_cid_new_block (ompi_communicator_t *newcomm, ompi_commu
if (PMIX_CHECK_KEY(&results[i], PMIX_GROUP_CONTEXT_ID)) {
PMIX_VALUE_GET_NUMBER(rc, &results[i].value, cid_base, size_t);
if(PMIX_SUCCESS != rc) {
ret = opal_pmix_convert_status(rc);
rc = opal_pmix_convert_status(rc);
goto fn_exit;
}
cid_base_set = true;
break;
}
}

OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "PMIx_Group_construct - tag %s size %ld ninfo %ld cid_base %ld\n", tag, proc_count, ninfo, cid_base));

/* destruct the group */
rc = PMIx_Group_destruct (tag, NULL, 0);
if(PMIX_SUCCESS != rc) {
ret = opal_pmix_convert_status(rc);
OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "PMIx_Group_destruct failed %s", PMIx_Error_string(rc)));
rc = opal_pmix_convert_status(rc);
goto fn_exit;
}

if (!cid_base_set) {
opal_show_help("help-comm.txt", "cid-base-not-set", true);
ret = OMPI_ERROR;
rc = OMPI_ERROR;
goto fn_exit;
}

Expand All @@ -421,7 +487,7 @@ static int ompi_comm_ext_cid_new_block (ompi_communicator_t *newcomm, ompi_commu
name_array = NULL;
}

return ret;
return rc;
}

static int ompi_comm_nextcid_ext_nb (ompi_communicator_t *newcomm, ompi_communicator_t *comm,
Expand All @@ -446,6 +512,15 @@ static int ompi_comm_nextcid_ext_nb (ompi_communicator_t *newcomm, ompi_communic
block = &comm->c_contextidb;
}

for (unsigned int i = ompi_mpi_communicators.lowest_free ; i < mca_pml.pml_max_contextid ; ++i) {
bool flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators, i, newcomm);
if (true == flag) {
newcomm->c_index = i;
break;
}
}
assert(newcomm->c_index > 2);

if (NULL == arg1) {
if (OMPI_COMM_CID_GROUP == mode || OMPI_COMM_CID_GROUP_NEW == mode ||
!ompi_comm_extended_cid_block_available (&comm->c_contextidb)) {
Expand All @@ -464,18 +539,11 @@ static int ompi_comm_nextcid_ext_nb (ompi_communicator_t *newcomm, ompi_communic
is_new_block = true;
}


if (block != &newcomm->c_contextidb) {
(void) ompi_comm_extended_cid_block_new (block, &newcomm->c_contextidb, is_new_block);
}

for (unsigned int i = ompi_mpi_communicators.lowest_free ; i < mca_pml.pml_max_contextid ; ++i) {
bool flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators, i, newcomm);
if (true == flag) {
newcomm->c_index = i;
break;
}
}

newcomm->c_contextid = newcomm->c_contextidb.block_cid;

opal_hash_table_set_value_ptr (&ompi_comm_hash, &newcomm->c_contextid,
Expand All @@ -498,7 +566,7 @@ int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *com

/* old CID algorithm */

/* if we got here and comm is NULL then that means the app is invoking MPI-4 Sessions or later
/* if we got here and comm is NULL then that means the app is invoking MPI-4 Sessions or later
functions but the pml does not support these functions so return not supported */
if (NULL == comm) {
char msg_string[1024];
Expand Down Expand Up @@ -963,6 +1031,64 @@ int ompi_comm_activate (ompi_communicator_t **newcomm, ompi_communicator_t *comm
return rc;
}

/**
 * Obtain the communicator index a peer uses for this communicator.
 *
 * If the communicator uses a global index (OMPI_COMM_IS_GLOBAL_INDEX), the
 * local c_index is returned directly.  Otherwise the peer's value is fetched
 * with PMIx_Get using the PMIX_GROUP_LOCAL_CID key, qualified by the
 * communicator's extended-CID base (PMIX_GROUP_CONTEXT_ID) and bounded by
 * ompi_pmix_connect_timeout.
 *
 * @param[in]  comm        communicator being queried
 * @param[in]  dest        rank of the peer, as understood by ompi_comm_peer_lookup()
 * @param[out] remote_cid  receives the peer's index; must not be NULL.  It is
 *                         only written when OMPI_SUCCESS is returned.
 *
 * @return OMPI_SUCCESS on success,
 *         OMPI_ERR_NOT_FOUND if PMIx returned no value,
 *         OMPI_ERR_TYPE_MISMATCH if the value is not of type PMIX_SIZE,
 *         or the converted PMIx status for any other PMIx failure.
 */
int ompi_comm_get_remote_cid (ompi_communicator_t *comm, int dest, uint32_t *remote_cid)
{
    ompi_proc_t *ompi_proc;
    pmix_proc_t pmix_proc;
    pmix_info_t tinfo[2];
    pmix_value_t *val = NULL;
    ompi_comm_extended_cid_t excid;
    int rc = OMPI_SUCCESS;
    size_t remote_cid64 = 0;

    assert(NULL != remote_cid);

    /* with a global index every process uses the same value: no lookup needed */
    if (OMPI_COMM_IS_GLOBAL_INDEX(comm)) {
        *remote_cid = comm->c_index;
        return OMPI_SUCCESS;
    }

    ompi_proc = ompi_comm_peer_lookup(comm, dest);
    OPAL_PMIX_CONVERT_NAME(&pmix_proc, &ompi_proc->super.proc_name);

    /* directive 0: do not wait longer than the configured timeout */
    PMIX_INFO_CONSTRUCT(&tinfo[0]);
    PMIX_INFO_LOAD(&tinfo[0], PMIX_TIMEOUT, &ompi_pmix_connect_timeout, PMIX_UINT32);

    excid = ompi_comm_get_extended_cid (comm);

    /* directive 1: qualifier selecting the value stored for this context id */
    PMIX_INFO_CONSTRUCT(&tinfo[1]);
    PMIX_INFO_LOAD(&tinfo[1], PMIX_GROUP_CONTEXT_ID, &excid.cid_base, PMIX_SIZE);
    PMIX_INFO_SET_QUALIFIER(&tinfo[1]);

    rc = PMIx_Get(&pmix_proc, PMIX_GROUP_LOCAL_CID, tinfo, 2, &val);
    if (PMIX_SUCCESS != rc) {
        OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "PMIx_Get failed for PMIX_GROUP_LOCAL_CID cid_base %ld %s", (long) excid.cid_base, PMIx_Error_string(rc)));
    }

    if (NULL == val) {
        OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "PMIx_Get failed for PMIX_GROUP_LOCAL_CID val returned NULL"));
        rc = OMPI_ERR_NOT_FOUND;
        goto done;
    }

    if (PMIX_SUCCESS != rc) {
        /* never hand a raw PMIx status back to OMPI callers */
        rc = opal_pmix_convert_status(rc);
        goto done;
    }

    if (val->type != PMIX_SIZE) {
        OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "PMIx_Get failed for PMIX_GROUP_LOCAL_CID type mismatch"));
        rc = OMPI_ERR_TYPE_MISMATCH;
        goto done;
    }

    PMIX_VALUE_GET_NUMBER(rc, val, remote_cid64, size_t);
    if (PMIX_SUCCESS != rc) {
        /* extraction failed: do not publish an unset value */
        rc = opal_pmix_convert_status(rc);
        goto done;
    }

    rc = OMPI_SUCCESS;
    *remote_cid = (uint32_t)remote_cid64;
    OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "PMIx_Get PMIX_GROUP_LOCAL_CID %d for cid_base %ld", (int) *remote_cid, (long) excid.cid_base));

done:
    if (NULL != val) {
        PMIX_VALUE_RELEASE(val);
    }

    return rc;
}

static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request)
{
ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;
Expand Down
16 changes: 15 additions & 1 deletion ompi/communicator/comm_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* and Technology (RIST). All rights reserved.
* Copyright (c) 2015-2019 Intel, Inc. All rights reserved.
* Copyright (c) 2016-2017 IBM Corporation. All rights reserved.
* Copyright (c) 2018-2022 Triad National Security, LLC. All rights
* Copyright (c) 2018-2024 Triad National Security, LLC. All rights
* reserved.
* Copyright (c) 2023 Advanced Micro Devices, Inc. All rights reserved.
* Copyright (c) 2023 NVIDIA Corporation. All rights reserved.
Expand Down Expand Up @@ -69,6 +69,8 @@ ompi_predefined_communicator_t ompi_mpi_comm_self = {{{{0}}}};
ompi_predefined_communicator_t ompi_mpi_comm_null = {{{{0}}}};
ompi_communicator_t *ompi_mpi_comm_parent = NULL;

int ompi_comm_output = -1;

static bool ompi_comm_intrinsic_init;

ompi_predefined_communicator_t *ompi_mpi_comm_world_addr =
Expand Down Expand Up @@ -97,6 +99,14 @@ static int ompi_comm_finalize (void);
*/
int ompi_comm_init(void)
{

/* create output stream */

if (ompi_comm_output == -1) {
ompi_comm_output = opal_output_open(NULL);
opal_output_set_verbosity(ompi_comm_output, ompi_comm_verbose_level);
}

/* Setup communicator array */
OBJ_CONSTRUCT(&ompi_mpi_communicators, opal_pointer_array_t);
if( OPAL_SUCCESS != opal_pointer_array_init(&ompi_mpi_communicators, 16,
Expand Down Expand Up @@ -392,6 +402,10 @@ static int ompi_comm_finalize (void)
/* finalize communicator requests */
ompi_comm_request_fini ();

/* close output stream */

opal_output_close(ompi_comm_output);

/* release a reference to the attributes subsys */
return ompi_attr_put_ref();
}
Expand Down
9 changes: 9 additions & 0 deletions ompi/communicator/communicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ OMPI_DECLSPEC extern opal_hash_table_t ompi_comm_hash;
OMPI_DECLSPEC extern opal_pointer_array_t ompi_mpi_communicators;
OMPI_DECLSPEC extern opal_pointer_array_t ompi_comm_f_to_c_table;

OMPI_DECLSPEC extern int ompi_comm_output;

struct ompi_comm_extended_cid_t {
uint64_t cid_base;
union {
Expand Down Expand Up @@ -614,6 +616,13 @@ static inline struct ompi_proc_t* ompi_comm_peer_lookup (const ompi_communicator
return ompi_group_peer_lookup(comm->c_remote_group,peer_id);
}

/**
 * Tell whether two communicators refer to the same instance.
 *
 * @param comm1  first communicator
 * @param comm2  second communicator
 *
 * @return true when both communicators' `instance` members compare equal,
 *         false otherwise
 */
static inline bool ompi_comm_instances_same(const ompi_communicator_t *comm1, const ompi_communicator_t *comm2)
{
    if (comm1->instance == comm2->instance) {
        return true;
    }

    return false;
}

/**
 * Obtain the communicator index a peer uses for a communicator.
 *
 * For communicators with a global index the local c_index is returned;
 * otherwise the value is fetched from PMIx (key PMIX_GROUP_LOCAL_CID) for
 * the process at rank @p dest.
 *
 * @param[in]  comm        communicator being queried
 * @param[in]  dest        rank of the peer process
 * @param[out] remote_cid  receives the peer's index; must not be NULL
 *
 * @return OMPI_SUCCESS on success; an error status otherwise (for example
 *         OMPI_ERR_NOT_FOUND when PMIx returns no value, or
 *         OMPI_ERR_TYPE_MISMATCH when the value has an unexpected type)
 */
int ompi_comm_get_remote_cid (ompi_communicator_t *comm, int dest, uint32_t *remote_cid);

#if OPAL_ENABLE_FT_MPI
/*
* Support for MPI_ANY_SOURCE point-to-point operations
Expand Down
Loading

0 comments on commit 7f17065

Please sign in to comment.