#ifndef CPPPROJCT_COMMUNICATIONFUNCTIONS_H
#define CPPPROJCT_COMMUNICATIONFUNCTIONS_H
#include "../macros/macros.h"
#include "../GraphClasses/GeneralGraph.h"
#include "../GraphClasses/GraphFunctions.h"
#include "../Utils/HelperClasses.h"
#include "../Utils/error.h"
#include "../Utils/global_standard_messages.h"
#include "../Utils/msleep.h"
// Reply builders used by the answer-service loop (generic_answer_requests):
// assemble the outgoing payload for a node-value request (single double) or
// an edge-value request (double array). `owner` receives the requester's
// rank — call sites below pass S.MPI_SOURCE. `ix` is the requested index,
// transported as double(s) because it arrives in a double-typed MPI buffer.
// NOTE(review): extraction residue — original file line numbers ("25", "26")
// are fused onto the declaration lines; code kept byte-identical.
25void build_answer_nodes(
double &answer, ReferenceContainer &REF,
double ix,
int owner,
int &MyNProc);
26void build_answer_edges(
double * answer, ReferenceContainer &REF,
double * ix,
int owner,
int &MyNProc);
// NOTE(review): incomplete extraction of a request-descriptor class template.
// The class head and the `field` non-type parameter it references (original
// lines 30-44) are missing from this view, as are several members (gaps in
// the fused original numbering: 45 -> 47 -> 48 -> 51/52 -> 54/55 -> 58/59 ->
// 61/62). Code is kept byte-identical; comments only.
29template <
typename SpecificRequestObject>
// Whether the reply payload is sent as double(s); the service loop below
// branches on this to pick the MPI_Ssend datatype.
45 bool sendDouble =
true;
// Element type of the incoming request buffer: int for field >= 0,
// double for the negative (node/edge value request) fields.
47 typedef typename std::conditional< (field >= 0),
int,
double>::type buffer_type;
// Element type of the outgoing answer. NOTE(review): both branches are
// double, so the conditional is vestigial — presumably kept for symmetry.
48 typedef typename std::conditional< (field >= -2),
double,
double>::type answer_type;
// Build the MPI tag used when replying to a request (int / double buffer).
51 void buildSendTag(
int * data);
52 void buildSendTag(
double * data);
// Receive-tag builders: intentionally empty for these overloads (the recv
// tag for the remaining cases is set elsewhere — not visible here).
54 void buildRecvTag(
int * data){};
55 void buildRecvTag(
double * data){};
// Produce the reply payload for a probed request (int-buffer variant);
// S supplies the requester's rank via S.MPI_SOURCE.
58 void computeAnswer(ReferenceContainer &REF,
int * buffer,
double * answer, MPI_Status &S,
int MYPROC);
// Report whether the locally-held data needed to answer is available yet.
59 void computeReady(ReferenceContainer &REF,
int * buffer,
bool &isReady);
61 void computeReady(ReferenceContainer &REF,
double * buffer,
bool &isReady);
// Produce the reply payload for a probed request (double-buffer variant).
62 void computeAnswer(ReferenceContainer &REF,
double * buffer,
double * answer, MPI_Status &S,
int MYPROC);
// NOTE(review): fragments of the out-of-class member definitions
// (buildSendTag / buildRecvTag / computeReady / computeAnswer). The fused
// original numbering jumps repeatedly (73 -> 85, 95 -> 109, 126 -> 142,
// 149 -> 161, 163 -> 174, 192 -> 200), so the opening of each function and
// several `field` branches are missing. Kept byte-identical; comments only.
// buildSendTag: for node requests the tag is the requested index itself;
// for edge requests (field == -2) it is the second index plus OFFSET, so
// the requester can match the reply (see the MPI_Recv tag in
// perform_requests, which computes OFFSET + their_vix2[...][1]).
73 sendTag = (int) (*data);
74 }
else if (field == -2) {
75 sendTag = (int) ((
int) (*(data + 1)) + (int) OFFSET);
// buildRecvTag branches: map `field` to the well-known request flags.
85 }
else if (field == 1) {
88 }
else if (field == 2) {
91 }
else if (field == -1) {
92 recvTag = VERTEXVAL_REQUEST_FLAG;
94 }
else if (field == -2){
95 recvTag = EDGEVAL_REQUEST_FLAG;
109 }
else if (field == 1) {
114 }
else if (field == 2) {
119 }
else if (field == -1) {
126 }
else if (field == -2) {
// computeAnswer: field 0/1/2 read the first Runge-Kutta slot (RK1/RK2/RK3)
// for the requested node under an atomic read (another thread writes it).
142#pragma omp atomic read
143 (*answer) = REF.p_LayHelper->data[*(buffer)].RK1[0];
144 }
else if (field == 1) {
145#pragma omp atomic read
146 (*answer) = REF.p_LayHelper->data[*(buffer)].RK2[0];
147 }
else if (field == 2) {
148#pragma omp atomic read
149 (*answer) = REF.p_LayHelper->data[*(buffer)].RK3[0];
// Node / edge value requests delegate to the free helpers declared above.
161 build_answer_nodes(*answer, REF, *buffer, S.MPI_SOURCE, MYPROC);
162 }
else if (field == -2) {
163 build_answer_edges(answer, REF, buffer, S.MPI_SOURCE, MYPROC);
174 }
else if (field == -2) {
// computeReady: field 0/1/2 poll the matching RK*_status flag atomically.
183#pragma omp atomic read
184 isReady = REF.p_LayHelper->data[*(buffer)].RK1_status;
186 }
else if (field == 1) {
187#pragma omp atomic read
188 isReady = REF.p_LayHelper->data[*(buffer)].RK2_status;
190 }
else if (field == 2) {
191#pragma omp atomic read
192 isReady = REF.p_LayHelper->data[*(buffer)].RK3_status;
// Answer-service loop run by a communication thread: probe for incoming
// requests (MPI_Improbe, bounded by TIMETOL retries), receive the request,
// spin until the local data is ready (ReqObj.computeReady), build the reply
// (ReqObj.computeAnswer + buildSendTag) and ship it back with a synchronous
// send, using a double or int payload depending on ReqObj.sendDouble.
// NOTE(review): incomplete extraction — the fused numbering skips many lines
// (202-203, 205-208, 210-211, 214, 216-218, 220-223, 225-230, 235-242,
// 244-245, 247-257, 260-261, 263-266, 268, 270-271, 278-308). `flag`, `t`,
// `TRIES` and `S` are declared in the missing lines, as are the remaining
// MPI_Improbe/MPI_Ssend arguments and the loop's termination logic.
// Kept byte-identical; comments only.
200template<
int DT,
int TIMETOL,
int BATCH,
typename RequestClass>
201void generic_answer_requests(ReferenceContainer &REF,
int MYTHR, RequestClass ReqObj){
204 int MYPROC = REF.p_ComHelper->WORLD_RANK[MYTHR];
209 bool firstlap =
true;
// VLA-style buffers sized by the request object (GNU extension —
// recvLength/sendLength are runtime values).
212 typename decltype(ReqObj)::buffer_type buffer[ReqObj.recvLength];
213 typename decltype(ReqObj)::answer_type answer[ReqObj.sendLength];
215 ReqObj.buildRecvTag();
219 if ((flag == 1) || firstlap) {
224 MPI_Improbe(MPI_ANY_SOURCE,
231 if (firstlap) firstlap =
false;
232 PRINTF_DBG(
"About to probe! recvTag is %d\n", ReqObj.recvTag);
// Re-probe until a message arrives or the time budget runs out.
233 while ((flag != 1) && (t < TIMETOL)) {
234 MPI_Improbe(MPI_ANY_SOURCE,
243 if (t >= TIMETOL) ++TRIES;
246 PRINTF_DBG(
"[GAR] About to receive! recvInt is %d, and recvLength is %d\n", ReqObj.recvInt, ReqObj.recvLength);
258 PRINTF_DBG(
"[GAR] Received successfully!\n");
259 bool isReady =
false;
// Poll readiness twice (the second call presumably sits inside a wait
// loop in the missing lines) before computing the answer.
262 ReqObj.computeReady(REF, &buffer[0], isReady);
267 ReqObj.computeReady(REF, &buffer[0], isReady);
269 PRINTF_DBG(
"[GAR] not ready yet!\n");
272 ReqObj.computeAnswer(REF, &buffer[0], &answer[0], S, MYPROC);
273 ReqObj.buildSendTag(&buffer[0]);
274 PRINTF_DBG(
"[GAR] About to send! sendDouble is %d, ReqObj.sendLength is %d, and ReqObj.sendTag is %d\n", ReqObj.sendDouble, ReqObj.sendLength, ReqObj.sendTag);
275 PRINTF_DBG(
"[GAR] The sent message('s first element) will be %f\n", answer[0]);
// Reply with the datatype matching the request object's configuration.
276 if (ReqObj.sendDouble){
277 MPI_Ssend(&answer[0],
284 MPI_Ssend(&answer[0],
// Gather the field values a node's integration step needs: for each entry in
// ixMap, either spin-read the value locally (when the owning rank is MYPROC,
// using omp-atomic polls of the RK*_status flags) or request it over MPI
// (Ssend of the index with ASKING_TAGS[fieldOrder-1], then Recv of the
// value). fieldOrder selects the field: 1 = node central value,
// 2/3/4 = RK1/RK2/RK3. Results land in RK1/RK2/RK3[i+1] or neighborValues.
// NOTE(review): incomplete extraction — fused numbering skips lines
// (314-317, 319-321, 323-324, 326, 335, 337-339, 341, 349, 352, 359, 362,
// 369, 372, 378-381, 386-387, 390, 392, 396-397, 400, 402, 405-406,
// 409-410, 412-413, 422-425, 427-429, 431-436). `ix`, `L` and `recvBuffer`
// are declared in the missing lines. Kept byte-identical; comments only.
309template<
int DT,
int TIMETOL,
int BATCH>
310void perform_field_requests(ReferenceContainer &REF,
int MYPROC,
int fieldOrder,std::queue<long> * queue){
312 int ASKING_TAGS[4] = {VERTEXVAL_REQUEST_FLAG, K1_REQUEST, K2_REQUEST, K3_REQUEST};
313 bool keep_working =
true;
318 if (!queue->empty()){
322 keep_working =
false;
325 while (keep_working){
// Determine how many values this node needs (L) from the RK buffers.
327 L = REF.p_LayHelper->data[ix].RK1.size();
328 }
else if (fieldOrder==3) {
329 L = REF.p_LayHelper->data[ix].RK2.size();
330 }
else if (fieldOrder==4) {
// NOTE(review): fieldOrder==4 sizes L from RK2 although its receive path
// below writes RK3[i+1] — likely should be RK3.size(); confirm against the
// full source (RK buffers may always share a size, making this benign).
331 L = REF.p_LayHelper->data[ix].RK2.size();
332 }
else if (fieldOrder==1) {
333 assert((*REF.p_IntHelper)[ix].ixMap.size() == (REF.p_LayHelper->data[ix].RK2.size()-1));
334 L = (*REF.p_IntHelper)[ix].ixMap.size() + 1;
336 printf(
"[FATAL] field order requested to perform_field_requests does not exist!\n");std::cout<<std::flush;
// One iteration per required neighbor value.
340 for (
int i=0; i<L-1 ; ++i){
// Local fast path: the owning rank is this process, so read the value
// directly once its status flag says it has been produced.
342 if (((
int) std::get<2>((*REF.p_IntHelper)[ix].ixMap[i])) == MYPROC) {
343 unsigned long owner = std::get<1>((*REF.p_IntHelper)[ix].ixMap[i]);
344 bool isReadyYet =
false;
345 if (fieldOrder == 2) {
346#pragma omp atomic read
347 isReadyYet = REF.p_LayHelper->data[owner].RK1_status;
348 while (!isReadyYet) {
350#pragma omp atomic read
351 isReadyYet = REF.p_LayHelper->data[owner].RK1_status;
353#pragma omp atomic read
354 recvBuffer = REF.p_LayHelper->data[owner].RK1[0];
355 }
else if (fieldOrder == 3) {
356#pragma omp atomic read
357 isReadyYet = REF.p_LayHelper->data[owner].RK2_status;
358 while (!isReadyYet) {
360#pragma omp atomic read
361 isReadyYet = REF.p_LayHelper->data[owner].RK2_status;
363#pragma omp atomic read
364 recvBuffer = REF.p_LayHelper->data[owner].RK2[0];
365 }
else if (fieldOrder == 4) {
366#pragma omp atomic read
367 isReadyYet = REF.p_LayHelper->data[owner].RK3_status;
368 while (!isReadyYet) {
370#pragma omp atomic read
371 isReadyYet = REF.p_LayHelper->data[owner].RK3_status;
373#pragma omp atomic read
374 recvBuffer = REF.p_LayHelper->data[owner].RK3[0];
375 }
else if (fieldOrder == 1) {
// Central node value needs no readiness flag — read it directly.
376#pragma omp atomic read
377 recvBuffer = (*REF.p_IntHelper)[owner].centralValue;
// Remote path: send the requested index to the owning rank. RK requests
// (fieldOrder != 1) send the index as int; node-value requests send it
// as double (matching the double buffer_type on the answering side).
382 if (fieldOrder != 1){
383 int sendBuffer = (int) std::get<1>((*REF.p_IntHelper)[ix].ixMap[i]);
384 PRINTF_DBG(
"[pfr]!=1 About to send! sendbuffer says %d... asking tag is %d\n", sendBuffer, ASKING_TAGS[fieldOrder-1]);
385 MPI_Ssend(&sendBuffer,
388 (
int) std::get<2>((*REF.p_IntHelper)[ix].ixMap[i]),
389 ASKING_TAGS[fieldOrder-1],
391 PRINTF_DBG(
"Sent successfully!\n");
393 double sendBuffer = (double) std::get<1>((*REF.p_IntHelper)[ix].ixMap[i]);
394 PRINTF_DBG(
"[pfr]==1 About to send! sendbuffer says %f... asking tag is %d\n", sendBuffer, ASKING_TAGS[fieldOrder-1]);
395 MPI_Ssend(&sendBuffer,
398 (
int) std::get<2>((*REF.p_IntHelper)[ix].ixMap[i]),
399 ASKING_TAGS[fieldOrder-1],
401 PRINTF_DBG(
"Sent successfully!\n");
// Receive the reply; the tag is the requested index, mirroring the
// sendTag built on the answering side.
403 PRINTF_DBG(
"[pfr] About to receive! tag is %d\n", (
int) std::get<1>((*REF.p_IntHelper)[ix].ixMap[i]));
404 MPI_Recv(&recvBuffer,
407 (
int) std::get<2>((*REF.p_IntHelper)[ix].ixMap[i]),
408 (
int) std::get<1>((*REF.p_IntHelper)[ix].ixMap[i]),
411 PRINTF_DBG(
"[pfr] recieved %f!\n",recvBuffer);
// Store the obtained value into the slot matching the requested field.
414 if (fieldOrder == 2) {
415 REF.p_LayHelper->data[ix].RK1[i+1] = recvBuffer;
416 }
else if (fieldOrder == 3) {
417 REF.p_LayHelper->data[ix].RK2[i+1] = recvBuffer;
418 }
else if (fieldOrder == 4) {
419 REF.p_LayHelper->data[ix].RK3[i+1] = recvBuffer;
420 }
else if (fieldOrder == 1) {
421 (*REF.p_IntHelper)[ix].neighborValues[i] = recvBuffer;
// Pull the next node index off the work queue, or stop when drained.
426 if (!queue->empty()){
430 keep_working =
false;
// Main request-driving loop: for each node index `ix` taken from the CHECKED
// queues, walk its per-thread MissingA (node values) and MissingB (node+edge
// values) lists, ask the owning ranks synchronously (Ssend index / Recv
// value) in batches of BATCH in-flight slots (QAvailable/QPend), collect the
// replies into `results`, append them to ResultsPendProcess, and push the
// completed index to READY_FOR_INTEGRATION. Terminates when the atomically-
// accumulated total (*REF.p_TOT) reaches NNodes; sleeps DT ms when idle.
// NOTE(review): incomplete extraction — the fused numbering has large gaps
// (440-443, 445-446, 449-450, 452, 454-455, 459, 473-474, 477-479, 484, 488,
// 490-491, 494-496, 498, 500, 502-505, 507, 510-511, 521-522, 526-527,
// 529-530, 533-534, 536-537, 539-540, 542, 544-545, 547, 552, 554-562,
// 565-569, 571, 573-578, 580, 583-585, 589, 591, 595-596, 600-601, 605,
// 608-609, 612-613, 615, 617-618, 620, 622-623, 625-626, 632, 634-646,
// 649-660, 666-669, 673-676, 680, 684). `N`, `owner`, `ix`, `uix`,
// `tot_locals`, `target_size`, `atomic_helper` and `thread` are declared in
// the missing lines. Kept byte-identical; comments only.
437template <
int DT,
int TIMETOL,
int BATCH>
438void perform_requests(
int NNodes,
439 ReferenceContainer REF,
444 int total_processed=0;
447 bool globalstatus =
true;
448 bool ix_update =
false;
451 bool all_sent_locally =
false;
453 int sent_locally = 0;
456 std::list<long> all_indexes_seen;
// Guard against a non-positive batch size before sizing the slot arrays.
457 if (BATCH<=0)error_report(min_batch_msg);
458 bool waiting =
false;
460 bool was_last_appearance =
false;
461 int special_index = -1;
462 int rstatus = 0, sstatus = 0;
// Per-slot bookkeeping for up to BATCH concurrent exchanges.
463 MPI_Request requests_send[BATCH] = {MPI_REQUEST_NULL};
464 int fsend[BATCH] = {0};
465 int frecv[BATCH] = {0};
466 int areRecv[BATCH] = {0};
467 int myProbes[BATCH] = {0};
468 MPI_Request requests_recv[BATCH] = {MPI_REQUEST_NULL};
469 InfoVecElem results[BATCH];
470 std::queue<int> QAvailable;
471 std::list<int> QPend;
472 for (
int i = 0; i < BATCH; ++i){
475 double vval[BATCH], their_vix[BATCH];
476 double vval2[BATCH][2], their_vix2[BATCH][2];
480 int status_rstatus=1;
481 int counter=0,MAX_TRIES=3;
482 bool NONBLOCKING =
true;
483 bool HANDSHAKE =
false;
// Snapshot the shared progress counter atomically (other threads update it).
485#pragma omp atomic read
486 atomic_helper = *(REF.p_TOT);
487 globalstatus = (atomic_helper < NNodes);
489 while (globalstatus) {
// --- Phase 1: missing node values (MissingA) ---
492 auto itBeg = REF.p_ParHelper->data[ix].MissingA.begin();
493 auto itEnd = REF.p_ParHelper->data[ix].MissingA.end();
497 int localcounter = 0;
499 for (
auto _it= itBeg; _it != itEnd; ++_it) {
501 for (
auto it = thread.begin(); it != thread.end(); it++) {
506 for (
auto _it= itBeg; _it != itEndE; ++_it) {
508 auto it = thread.begin();
509 while (it != thread.end()) {
// Claim a free slot, record who owns the value and which index we want.
512 owner[QAvailable.front()] = std::get<1>(*it);
513 their_vix[QAvailable.front()] = (double) std::get<2>(*it);
514 results[QAvailable.front()] = std::make_tuple((
double) 0,
515 (
double) std::get<0>(*it),
516 (
unsigned long) (((
unsigned long) owner[QAvailable.front()]) * N + (
unsigned long) their_vix[QAvailable.front()] ),
517 (
unsigned long) std::get<2>(*it),
518 (
unsigned long) std::get<1>(*it));
519 PRINTF_DBG(
"[PR] About to ask for one node!\n");std::cout<<std::flush;
// Synchronous request: index out, value back, tagged by the index.
520 MPI_Ssend(&their_vix[QAvailable.front()],
523 owner[QAvailable.front()],
524 VERTEXVAL_REQUEST_FLAG, MPI_COMM_WORLD);
525 PRINTF_DBG(
"[PR] Asked!\n");std::cout<<std::flush;
527 PRINTF_DBG(
"[PR] About to recv!\n");std::cout<<std::flush;
528 MPI_Recv(&vval[QAvailable.front()],
531 owner[QAvailable.front()],
532 (
int) their_vix[QAvailable.front()],
535 PRINTF_DBG(
"[PR] Correctly recieved!\n");std::cout<<std::flush;
538 QPend.push_back(QAvailable.front());
// Drain pending slots when the batch is full or all locals were sent.
541 if ((QAvailable.empty()) || (localcounter == tot_locals)) {
543 if (localcounter == tot_locals){
546 target_size = BATCH-1;
548 while (QPend.size() != target_size) {
549 auto i = QPend.begin();
550 while (i != QPend.end()) {
551 std::get<0>(results[*i]) = vval[*i];
553 (*REF.p_IntHelper)[special_index].ResultsPendProcess.push_back(results[*i]);
// --- Phase 2: missing node+edge value pairs (MissingB) ---
563 auto itBegE = REF.p_ParHelper->data[ix].MissingB.begin();
564 auto itEndE = REF.p_ParHelper->data[ix].MissingB.end();
570 for (
auto _it= itBegE; _it != itEndE; ++_it) {
572 for (
auto it = thread.begin(); it != thread.end(); it++) {
579 for (
auto _it= itBegE; _it != itEndE; ++_it) {
581 auto it = thread.begin();
582 while (it != thread.end()) {
586 owner[QAvailable.front()] = (int) std::get<1>(*it);
587 their_vix2[QAvailable.front()][0] = (double) std::get<0>(*it);
588 their_vix2[QAvailable.front()][1] = (double) std::get<2>(*it);
590 results[QAvailable.front()] = std::make_tuple(0.0,
592 (
unsigned long) (((
unsigned long) owner[QAvailable.front()]) * N + (
unsigned long) their_vix2[QAvailable.front()][1] ),
593 (
unsigned long) std::get<2>(*it),
594 (
unsigned long) std::get<1>(*it));
597 PRINTF_DBG(
"[PR] About to ask for one node and edge!\n");std::cout<<std::flush;
598 PRINTF_DBG(
"About to send ixs %f and %f\n",their_vix2[QAvailable.front()][0], their_vix2[QAvailable.front()][1]);std::cout<<std::flush;
// Two indices go out; a two-double reply comes back, tagged OFFSET+index
// (mirrors buildSendTag's edge branch on the answering side).
599 MPI_Ssend(&their_vix2[QAvailable.front()][0],
602 owner[QAvailable.front()],
603 EDGEVAL_REQUEST_FLAG, MPI_COMM_WORLD);
604 PRINTF_DBG(
"[PR] Asked!\n");std::cout<<std::flush;
606 PRINTF_DBG(
"[PR] About to recv!\n");std::cout<<std::flush;
607 MPI_Recv(&vval2[QAvailable.front()][0],
610 owner[QAvailable.front()],
611 (
int) (
int) (OFFSET + (
int) their_vix2[QAvailable.front()][1]),
614 PRINTF_DBG(
"[PR] Correctly recieved!\n");std::cout<<std::flush;
616 QPend.push_back(QAvailable.front());
619 if ((QAvailable.empty()) || (localcounter == tot_locals)) {
621 if (localcounter == tot_locals){
624 target_size = BATCH-1;
627 while (QPend.size() != target_size) {
628 auto i = QPend.begin();
629 while (i != QPend.end()) {
630 std::get<0>(results[*i]) = vval2[*i][0];
631 std::get<1>(results[*i]) = vval2[*i][1];
633 (*REF.p_IntHelper)[special_index].ResultsPendProcess.push_back(results[*i]);
// Node `ix`/`uix` now has everything it needs — hand it to the integrator.
647 (*REF.p_READY_FOR_INTEGRATION).first.push(ix);
648 (*REF.p_READY_FOR_INTEGRATION).second.push(uix);
// Fetch the next checked node index pair, if any.
661 if (!(*REF.p_CHECKED).first.empty()) {
662 ix = (*REF.p_CHECKED).first.front();
663 uix = (*REF.p_CHECKED).second.front();
664 (*REF.p_CHECKED).first.pop();
665 (*REF.p_CHECKED).second.pop();
// Publish progress and re-evaluate the global termination condition.
670 if (total_processed != 0) {
671#pragma omp atomic update
672 *(REF.p_TOT) += total_processed;
677#pragma omp atomic read
678 atomic_helper = *(REF.p_TOT);
679 globalstatus = (atomic_helper < NNodes);
681 PRINTF_DBG(
"TOT=%d, NNodes=%d, ix_update=%d, total_processed=%d, ix=%d\n",
682 atomic_helper, NNodes, ix_update, total_processed, ix);std::cout<<std::flush;
// Back off when nothing advanced this lap, to avoid busy-waiting.
683 if (globalstatus && (!ix_update)) mssleep(DT);
685 PRINTF_DBG(
"Final termination of perform_requests :-)\n");std::cout<<std::flush;
// (extraction residue, preserved as a comment) Definition: CommunicationFunctions.h:38
// (extraction residue, preserved as a comment) Definition: CommunicationFunctions.h:30