#ifdef INFINI_USE_CNCL
#include "bang/bang_runtime.h"
#include "bang/cncl_communicator.h"
#include "core/graph.h"
#include "core/runtime.h"
#include "operators/all_gather.h"
#include "test.h"
#include <cerrno>
#include <cstdio>
#include <exception>
#include <iostream>
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Number of ranks taking part in the collective; the test forks one process
// per rank and rank i uses MLU device i.
static constexpr int WORLD_SIZE = 2;

namespace infini {

/// Runs one rank of an AllGather collective on MLU device `deviceID` and
/// checks every gathered output against the expected data.
///
/// @param taskName Communicator identifier shared by all ranks (forwarded to
///                 `initComm`).
/// @param deviceID Rank of the calling process; also used as the device index.
/// @param data     This rank's local input tensor contents.
/// @param ans      `ans[i]` is the expected content of output `i`; must hold
///                 WORLD_SIZE entries.
///
/// Mismatches are reported through gtest EXPECT/ASSERT macros in the calling
/// process.
void allGather(const string &taskName, int deviceID, const vector<float> &data,
               const vector<vector<float>> &ans) {
    ASSERT_EQ(ans.size(), static_cast<size_t>(WORLD_SIZE));

    // Create runtimes and initiate communication.
    Runtime cpuRuntime = NativeCpuRuntimeObj::getInstance();
    Runtime bangRuntime = make_ref<BangRuntimeObj>(deviceID);
    bangRuntime->initComm(taskName, WORLD_SIZE, deviceID);

    // Create the graph and insert the AllGather operation.
    Graph g = make_ref<GraphObj>(bangRuntime);
    auto input =
        g->addTensor(Shape{static_cast<int>(data.size())}, DataType::Float32);
    auto op = g->addOp<AllGatherObj>(input, std::nullopt, WORLD_SIZE);

    // Allocate device memory and copy the input from CPU to MLU.
    g->dataMalloc();
    input->copyin(data);

    // Run the operation.
    bangRuntime->run(g);

    // Copy each output from MLU back to CPU and compare it with the
    // expectation.
    const auto &outputs = op->getOutputs();
    ASSERT_EQ(outputs.size(), ans.size());
    for (size_t i = 0; i < outputs.size(); ++i) {
        auto result = outputs[i]->clone(cpuRuntime);
        EXPECT_TRUE(result->equalData(ans[i]))
            << "rank " << deviceID << ": output " << i << " mismatch";
    }
}

/// Forks one child process per rank. Each child runs `allGather` on its own
/// device and reports pass/fail through its exit status; the parent reaps
/// every child and fails the test if any rank failed.
TEST(BANG_AllGather, run) {
    const vector<vector<float>> data = {{2., 3.}, {5., 6.}};
    // Output i of every rank is expected to equal rank i's input.
    const vector<vector<float>> ans = {{2., 3.}, {5., 6.}};
    ASSERT_EQ(data.size(), static_cast<size_t>(WORLD_SIZE));

    // Flush before forking so buffered output is not duplicated in children.
    std::cout.flush();
    std::fflush(nullptr);

    vector<pid_t> children;
    bool allSpawned = true;
    for (int rank = 0; rank < WORLD_SIZE; ++rank) {
        const pid_t pid = fork();
        if (pid == 0) {
            // Child process: translate the gtest outcome into an exit code
            // and never return into the test framework.
            int exitCode = 1;
            try {
                allGather("test_all_gather", rank, data[rank], ans);
                exitCode = ::testing::Test::HasFailure() ? 1 : 0;
            } catch (const std::exception &e) {
                std::cerr << "rank " << rank << " threw: " << e.what() << '\n';
            } catch (...) {
                std::cerr << "rank " << rank << " threw an unknown exception\n";
            }
            std::cout.flush();
            std::fflush(nullptr);
            // _exit skips the parent's atexit handlers and stdio re-flush.
            _exit(exitCode);
        }
        if (pid < 0) {
            ADD_FAILURE() << "fork() failed for rank " << rank;
            allSpawned = false;
            // Already-started ranks would likely block waiting for the
            // missing peer, so terminate them.
            for (const pid_t child : children) {
                kill(child, SIGKILL);
            }
            break;
        }
        children.push_back(pid);
    }

    // Reap every child and check that each rank exited cleanly.
    for (const pid_t child : children) {
        int status = 0;
        pid_t reaped;
        do {
            reaped = waitpid(child, &status, 0);
        } while (reaped < 0 && errno == EINTR);
        if (reaped < 0) {
            ADD_FAILURE() << "waitpid() failed for child " << child;
            continue;
        }
        if (allSpawned) {
            EXPECT_TRUE(WIFEXITED(status) && WEXITSTATUS(status) == 0)
                << "child " << child << " failed (raw wait status " << status
                << ")";
        }
    }
}

} // namespace infini
#endif