5#ifndef CPPPROJCT_GRAPHFUNCTIONS_H
6#define CPPPROJCT_GRAPHFUNCTIONS_H
10#include <boost/mpi/environment.hpp>
12#include "GeneralGraph.h"
14#include "../macros/macros.h"
16#include "../Communication/CommunicationFunctions.h"
18#include "../Solvers/GeneralSolver.h"
20#include "../Utils/adequate_synchronization.h"
21#include "../Utils/memory_management.h"
22#include "../Utils/msleep.h"
23#include "../Utils/error.h"
24#include "../Utils/HelperClasses.h"
25#include "../Utils/display_vectors.h"
31template<
int DT,
int TIMETOL,
int BATCH,
typename RequestClass>
32void generic_answer_requests(ReferenceContainer &REF,
int MYTHR, RequestClass ReqObj);
38template <
int DT,
int TIMETOL,
int BATCH>
39void perform_requests(
int NNodes,
40 ReferenceContainer REF,
43void register_to_value(Graph &g);
44void destroyRequestWithoutCounter(MPI_Request &R);
45void freeRequestWithoutCounter(MPI_Request &R);
46template<
int DT,
int TIMETOL,
int BATCH>
47void perform_field_requests(ReferenceContainer &REF,
int MYPROC,
int fieldOrder,std::queue<long> * queue);
48void update_neighbor_values(ReferenceContainer &REF);
63template<
typename DIFFEQ,
typename SOLVER>
65 PRINTF_DBG(
"Starting the contribution to int\n");std::cout<< std::flush;
70 auto vs = vertices(*REF.p_g);
71 auto start = vs.first;
75 bool keepGoing =
true;
80 unsigned long uix, currentuix;
82 std::vector<InfoVecElem> temp;
85 remaining = *(REF.p_PENDING_INT);
92 if (!REF.p_READY_FOR_INTEGRATION->second.empty()){
93 ix = REF.p_READY_FOR_INTEGRATION->first.front();
94 uix = REF.p_READY_FOR_INTEGRATION->second.front();
95 REF.p_READY_FOR_INTEGRATION->first.pop();
96 REF.p_READY_FOR_INTEGRATION->second.pop();
104 PRINTF_DBG(
"(NOTHING TO INT)\n");std::cout<< std::flush;
107 PRINTF_DBG(
"starting to locate this uix: %lu\n",uix);
108 v = vertices(*REF.p_g).first;
111 if (get(get(boost::vertex_owner, *(REF.p_g)), *v) == MYPROC) {
112 currentuix = (
unsigned long) get(get(boost::vertex_index, *(REF.p_g)), *v);
113 if (currentuix == uix) {
120 std::cout <<
"[CRITICAL] Failed to found ix!!!" << std::endl;
124 temp = std::vector<InfoVecElem>();
125 temp.resize((*REF.p_IntHelper)[ix].ResultsPendProcess.size());
127 auto it = (*REF.p_IntHelper)[ix].ResultsPendProcess.begin();
128 auto end = (*REF.p_IntHelper)[ix].ResultsPendProcess.end();
129 int oldsize = (*REF.p_IntHelper)[ix].ResultsPendProcess.size();
130 std::set<unsigned long> seen;
132 unsigned long elemix = std::get<2>(*it);
133 const bool is_in = seen.find(elemix) != seen.end();
134 PRINTF_DBG(
"Std output uix: %lu ix: %ld proc: %d. vals are %f , %f , %lu\n",
136 std::get<0>(*it), std::get<1>(*it), std::get<2>(*it));
137 std::cout<<std::flush;
144 (*REF.p_IntHelper)[ix].ResultsPendProcess.erase(it++);
147 std::sort(temp.begin(),
149 [](InfoVecElem &t1, InfoVecElem &t2) {
150 return std::get<2>(t1) > std::get<2>(t2);
153 PRINTF_DBG(
"Finished sorting! :-)\n");
154 (*REF.p_IntHelper)[ix].neighborValues.resize(temp.size());
155 (*REF.p_IntHelper)[ix].edgeValues.resize(temp.size());
156 for (
int j=0; j<temp.size(); ++j){
157 (*REF.p_IntHelper)[ix].neighborValues[j] = std::get<0>(temp[j]);
158 (*REF.p_IntHelper)[ix].edgeValues[j] = std::get<1>(temp[j]);
159 std::get<0>((*REF.p_IntHelper)[ix].ixMap[j]) = (
unsigned long) std::get<2>(temp[j]);
160 std::get<1>((*REF.p_IntHelper)[ix].ixMap[j]) = (
unsigned long) std::get<3>(temp[j]);
161 std::get<2>((*REF.p_IntHelper)[ix].ixMap[j]) = (
unsigned long) std::get<4>(temp[j]);
162 PRINTF_DBG(
"temp[j] would be integerisized to %d %d %d\n",
163 (
int)((
unsigned long) std::get<2>(temp[j])),
164 (
int)((
unsigned long) std::get<3>(temp[j])),
165 (
int)((
unsigned long) std::get<4>(temp[j]))
169 (*REF.p_g)[*v].value,
170 (*REF.p_g)[*v].params,
171 (*REF.p_IntHelper)[ix].neighborValues,
172 (*REF.p_IntHelper)[ix].edgeValues,
173 REF.p_LayHelper->data[ix].RK1
175 REF.p_LayHelper->data[ix].RK1_status =
true;
178 (*REF.p_g)[*v].temporal_register = 0;
183 remaining = REF.p_READY_FOR_INTEGRATION->first.size();
186 PRINTF_DBG(
"There are %d remaining vals to integrate...\n", remaining);std::cout<<std::flush;
189 keepGoing = (remaining != 0);
191 PRINTF_DBG(
"In total this thread of PROC %d performed %d laps!\n",MYPROC, totlaps);std::cout<<std::flush;
195template<
typename DIFFEQ,
typename SOLVER,
int BATCH>
196void contribute_to_higher_integration(ReferenceContainer &REF,
199 int N = REF.p_LayHelper->data.size();
200 auto vs = vertices(*REF.p_g);
201#pragma omp parallel firstprivate(N, REF, vs, solver)
203 int Nthreads = (int) omp_get_num_threads();
204 int myThread = (int) omp_get_thread_num();
205 int begin = N/Nthreads * myThread;
206 int end = N/Nthreads * (myThread + 1);
207 if (myThread + 1 == Nthreads) end += N%Nthreads;
208 for (
auto v = vs.first + begin; v != vs.first + end; ++v){
209 long ix = get(get(boost::vertex_index, *(REF.p_g)), *v);
224 solver.Term2((*REF.p_IntHelper)[ix].centralValue,
225 (*REF.p_IntHelper)[ix].centralParams,
226 (*REF.p_IntHelper)[ix].neighborValues,
227 (*REF.p_IntHelper)[ix].edgeValues,
228 REF.p_LayHelper->data[ix].RK1,
229 REF.p_LayHelper->data[ix].RK2);
230 REF.p_LayHelper->data[ix].RK2_status =
true;
231 PRINTF_DBG(
"RK2 with ix %ld correctly computed! it yielded %f\n", ix, REF.p_LayHelper->data[ix].RK2[0]);
232 }
else if (fieldNum == 3) {
233 solver.Term3((*REF.p_IntHelper)[ix].centralValue,
234 (*REF.p_IntHelper)[ix].centralParams,
235 (*REF.p_IntHelper)[ix].neighborValues,
236 (*REF.p_IntHelper)[ix].edgeValues,
237 REF.p_LayHelper->data[ix].RK1,
238 REF.p_LayHelper->data[ix].RK2,
239 REF.p_LayHelper->data[ix].RK3);
240 REF.p_LayHelper->data[ix].RK3_status =
true;
241 PRINTF_DBG(
"RK3 with ix %ld correctly computed! it yielded %f\n", ix, REF.p_LayHelper->data[ix].RK3[0]);
242 }
else if (fieldNum == 4){
243 solver.Term4((*REF.p_IntHelper)[ix].centralValue,
244 (*REF.p_IntHelper)[ix].centralParams,
245 (*REF.p_IntHelper)[ix].neighborValues,
246 (*REF.p_IntHelper)[ix].edgeValues,
247 REF.p_LayHelper->data[ix].RK1,
248 REF.p_LayHelper->data[ix].RK2,
249 REF.p_LayHelper->data[ix].RK3,
250 REF.p_LayHelper->data[ix].RK4);
251 REF.p_LayHelper->data[ix].RK4_status =
true;
252 PRINTF_DBG(
"RK4 with ix %ld correctly computed! it yielded %f\n", ix, REF.p_LayHelper->data[ix].RK4[0]);
253 }
else if (fieldNum == 1){
254 solver.Term1((*REF.p_IntHelper)[ix].centralValue,
255 (*REF.p_IntHelper)[ix].centralParams,
256 (*REF.p_IntHelper)[ix].neighborValues,
257 (*REF.p_IntHelper)[ix].edgeValues,
258 REF.p_LayHelper->data[ix].RK1);
259 REF.p_LayHelper->data[ix].RK1_status =
true;
260 PRINTF_DBG(
"RK1 with ix %ld correctly computed! it yielded %f and central value was %f\n", ix, REF.p_LayHelper->data[ix].RK1[0], (*REF.p_IntHelper)[ix].centralValue);
262 PRINTF_DBG(
"centralParams are : \n");display((*REF.p_IntHelper)[ix].centralParams);
263 PRINTF_DBG(
"Neighbor values are : \n");display((*REF.p_IntHelper)[ix].neighborValues);
264 PRINTF_DBG(
"edgeValues values are : \n");display((*REF.p_IntHelper)[ix].edgeValues);
267 printf(
"[FATAL] field order requested does not exist. Requested was: %d\n", fieldNum);
268 std::cout<<std::flush;
276template<
typename DIFFEQ,
typename SOLVER,
int BATCH>
279 int N = REF.p_LayHelper->data.size();
281 auto vs = vertices(*REF.p_g);
282#pragma omp parallel firstprivate(N, REF, vs, solver)
284 int Nthreads = (int) omp_get_num_threads();
285 int myThread = (int) omp_get_thread_num();
286 int begin = N/Nthreads * myThread;
287 int end = N/Nthreads * (myThread + 1);
288 if (myThread + 1 == Nthreads) end += N % Nthreads;
290 PRINTF_DBG(
"Currently at 'finalize_integration': N=%d, Nthreads=%d, myThread=%d, begin=%d and end=%d\n",
291 N, Nthreads, myThread, begin, end); std::cout << std::flush;
293 for (
auto v = vs.first + begin; v != vs.first + end; ++v){
294 long ix = get(get(boost::vertex_index, *(REF.p_g)), *v);
297 (*REF.p_IntHelper)[ix].centralValue,
298 (*REF.p_IntHelper)[ix].centralParams,
299 (*REF.p_IntHelper)[ix].neighborValues,
300 (*REF.p_IntHelper)[ix].edgeValues,
301 REF.p_LayHelper->data[ix].RK1,
302 REF.p_LayHelper->data[ix].RK2,
303 REF.p_LayHelper->data[ix].RK3,
304 REF.p_LayHelper->data[ix].RK4,
308 (*REF.p_g)[*v].value = answer;
309 REF.p_LayHelper->data[ix].RK1_status =
false;
310 REF.p_LayHelper->data[ix].RK2_status =
false;
311 REF.p_LayHelper->data[ix].RK3_status =
false;
312 REF.p_LayHelper->data[ix].RK4_status =
false;
320template<
typename DIFFEQ,
typename SOLVER,
int BATCH>
321void single_evolution2(Graph &g,
323 ReferenceContainer REF,
324 unsigned long N_total_nodes){
327 unsigned long NVtot = REF.NVtot;
329 int PENDING_INT = NVtot;
331 auto vs = vertices(g);
332 double temporalResult;
333 bool is_unclaimed =
true, keep_responding =
true;
334 int active_responders=0, finalized_responders=0;
335 NT = REF.p_ComHelper->NUM_THREADS;
337 std::pair<std::queue<long>, std::queue<unsigned long>> CHECKED;
338 std::pair<std::queue<long>, std::queue<unsigned long>> READY_FOR_INTEGRATION;
340 REF.p_CHECKED = &CHECKED;
341 REF.p_READY_FOR_INTEGRATION = &READY_FOR_INTEGRATION;
343 REF.p_PENDING_INT = &PENDING_INT;
345 const int MAX_SUBTHR = 1;
346 const int TIMETOL = 30;
349 bool isLayerBuilt = REF.p_LayHelper->built;
350 int request_performers=0;
351 int request_performers_ended=0;
352 auto MapHelper = *REF.p_MapHelper;
356 update_neighbor_values(REF);
358 for (
int i=1; i < solver.deg+1 ; ++i){
359 PRINTF_DBG(
"entering the for, i is %d\n",i);
360 if (solver.requires_communication){
361 bool keep_responding =
true;
363 int NFinalizedCapturers = 0;
365 int NFinalizedResponders = 0;
366 long pending = (long) NVtot;
367 std::queue<long> CAPTURED;
368 for (
long k=0; k<pending; ++k) CAPTURED.push(k);
369#pragma omp parallel firstprivate(solver)
371 if (omp_get_thread_num() == 0) {
372 int v_Ncapturers = 0;
373 int v_NFinalizedCapturers = 0;
374 int v_Nresponders = 0;
375 int v_NFinalizedResponders = 0;
377#pragma omp atomic read
378 v_Ncapturers = Ncapturers;
379 while (v_Ncapturers == 0){
380#pragma omp atomic read
381 v_Ncapturers = Ncapturers;
383#pragma omp atomic read
384 v_NFinalizedCapturers = NFinalizedCapturers;
385 while (v_Ncapturers > v_NFinalizedCapturers){
386#pragma omp atomic read
387 v_Ncapturers = Ncapturers;
388#pragma omp atomic read
389 v_NFinalizedCapturers = NFinalizedCapturers;
393 MPI_Barrier(MPI_COMM_WORLD);
394#pragma omp atomic write
395 keep_responding =
false;
396#pragma omp atomic read
397 v_Nresponders = Nresponders;
398 while (v_Nresponders == 0){
399#pragma omp atomic read
400 v_Nresponders = Nresponders;
402#pragma omp atomic read
403 v_NFinalizedResponders = NFinalizedResponders;
404 while (v_Nresponders > v_NFinalizedResponders){
405#pragma omp atomic read
406 v_Nresponders = Nresponders;
407#pragma omp atomic read
408 v_NFinalizedResponders = NFinalizedResponders;
412 MPI_Barrier(MPI_COMM_WORLD);
413 }
else if (omp_get_thread_num() % 2 == 1){
415#pragma omp atomic update
417 std::queue<long> * locallyQueue;
420 locallyQueue = &CAPTURED;
422 perform_field_requests<DT, TIMETOL, BATCH>(
424 (
int) REF.p_ComHelper->WORLD_RANK[omp_get_thread_num()],
427#pragma omp atomic update
428 NFinalizedCapturers++;
430 }
else if (omp_get_thread_num() % 2 == 0) {
431#pragma omp atomic update
433 bool l_keep_responding =
true;
438 while (l_keep_responding){
440 generic_answer_requests<DT, TIMETOL, BATCH, NodesRequester>(REF,
441 (
int) omp_get_thread_num(),
444 generic_answer_requests<DT, TIMETOL, BATCH, Field0Requester>(REF,
445 (
int) omp_get_thread_num(),
448 generic_answer_requests<DT, TIMETOL, BATCH, Field1Requester>(REF,
449 (
int) omp_get_thread_num(),
452 generic_answer_requests<DT, TIMETOL, BATCH, Field2Requester>(REF,
453 (
int) omp_get_thread_num(),
456#pragma omp atomic read
457 l_keep_responding = keep_responding;
460#pragma omp atomic update
461 NFinalizedResponders++;
465 PRINTF_DBG(
"About to call 'contribute_to_higher_integration' with i=%d\n",i);
466 contribute_to_higher_integration<DIFFEQ, SOLVER, BATCH>(REF,
471 PRINTF_DBG(
"Reached the final integration :-)\n");
472 finalize_integration<DIFFEQ, SOLVER, BATCH>(REF, solver);
473 PRINTF_DBG(
"Ended the final integration :-)\n");
476 PRINTF_DBG(
"starting to swap register\n");std::cout<<std::flush;
480 PRINTF_DBG(
"About to synchronize");std::cout<<std::flush;
481 MPI_Barrier(MPI_COMM_WORLD);
484 PRINTF_DBG(
"About to increase time by h\n");
489 printf(
"-^-^-H-E-A-R-T-B-E-A-T-^-^-");
490 PRINTF_DBG(
"\n\n\n\\n\n\n\n\n\n");std::cout<<std::flush;
491 PRINTF_DBG(
"exited");
496template<
typename DIFFEQ,
typename SOLVER,
int BATCH>
497void single_evolution(Graph &g,
499 ReferenceContainer REF,
500 unsigned long N_total_nodes){
506 unsigned long NVtot = REF.NVtot;
508 int PENDING_INT = NVtot;
510 auto vs = vertices(g);
511 double temporalResult;
512 bool is_unclaimed =
true, keep_responding =
true;
513 int active_responders=0, finalized_responders=0;
514 NT = REF.p_ComHelper->NUM_THREADS;
516 std::pair<std::queue<long>, std::queue<unsigned long>> CHECKED;
517 std::pair<std::queue<long>, std::queue<unsigned long>> READY_FOR_INTEGRATION;
519 REF.p_CHECKED = &CHECKED;
520 REF.p_READY_FOR_INTEGRATION = &READY_FOR_INTEGRATION;
522 REF.p_PENDING_INT = &PENDING_INT;
524 const int MAX_SUBTHR = 1;
525 const int TIMETOL = 20;
528 bool isLayerBuilt = REF.p_LayHelper->built;
529 int request_performers=0;
530 int request_performers_ended=0;
531 auto MapHelper = *REF.p_MapHelper;
533#pragma omp parallel firstprivate(NVtot, vs, NT, N_total_nodes, REF, MAX_SUBTHR, TIMETOL, DT, solver, isLayerBuilt, MapHelper)
536 int SplitCoef = omp_get_num_threads()/2;
537 if (SplitCoef < 2){SplitCoef = 2;}
538 OpenMPHelper OmpHelper(NVtot, SplitCoef);
539 if (OmpHelper.MY_THREAD_n < SplitCoef){
540 if (OmpHelper.MY_THREAD_n % (MAX_SUBTHR + 1) == 0) {
542#pragma omp atomic read
543 atomic_bool = keep_responding;
544#pragma omp atomic update
548 while (atomic_bool) {
549 generic_answer_requests<DT, TIMETOL, BATCH, NodesRequester>(REF,
550 OmpHelper.MY_THREAD_n,
552 generic_answer_requests<DT, TIMETOL, BATCH, EdgesRequester>(REF,
553 OmpHelper.MY_THREAD_n,
556#pragma omp atomic read
557 atomic_bool = keep_responding;
560#pragma omp atomic update
561 ++finalized_responders;
563 if (!atomic_bool) PRINTF_DBG(
"OVER! :O\n");
564 PRINTF_DBG(
" I am thread %d (MESSAGE ANSWERER) and I have finished doing my job ;-)\n",omp_get_thread_num());
567#pragma omp atomic update
568 request_performers++;
569 perform_requests<DT, TIMETOL, BATCH>(NVtot, REF, N_total_nodes, OmpHelper);
571#pragma omp atomic update
572 request_performers_ended++;
575 unsigned long NLocals, NInedges, M, rank, NOwned;
578 bool ready4int =
false;
579 i += OmpHelper.MY_OFFSET_n;
580 for (
auto v = vs.first + OmpHelper.MY_OFFSET_n;
581 v != vs.first + OmpHelper.MY_OFFSET_n + OmpHelper.MY_LENGTH_n; ++v) {
584 PRINTF_DBG(
"Accesing values 1\n");std::cout<<std::flush;
585 (*REF.p_IntHelper)[i].centralValue = g[*v].value;
586 PRINTF_DBG(
"Accesing values 2\n");std::cout<<std::flush;
587 (*REF.p_IntHelper)[i].centralParams = g[*v].params;
588 (*REF.p_IntHelper)[i].build(g, *v, MapHelper, NOwned, rank, NLocals, M);
595 if (get(MapHelper.NodeOwner,*v) != REF.p_ComHelper->WORLD_RANK[OmpHelper.MY_THREAD_n]){
597 PRINTF_DBG(
"\n\n\n[ERROR] Node not owned by processor.\n\n\n\n");std::cout<<std::flush;
600 ui = ((
unsigned long) get(get(boost::vertex_index, g), *v));
605 REF.p_LayHelper->buildForRank((
long) ui, (long) rank);
608 auto neighbors = boost::adjacent_vertices(*v, g);
609 auto in_edges = boost::in_edges(*v, g);
610 unsigned long MYPROCN = REF.p_ComHelper->WORLD_RANK[OmpHelper.MY_THREAD_n];
611#pragma omp parallel firstprivate(i, v, M, neighbors, NLocals, NInedges, NOwned, N_total_nodes, MYPROCN)
613 OpenMPHelper OmpHelperN(NLocals, 0);
614 for (
auto n = neighbors.first + OmpHelperN.MY_OFFSET_n;
615 n != neighbors.first + OmpHelperN.MY_OFFSET_n + OmpHelperN.MY_LENGTH_n; ++n) {
616 auto local_e = edge(*v, *n, g).first;
618 if (REF.p_ComHelper->WORLD_RANK[OmpHelper.MY_THREAD_n] == get(get(boost::vertex_owner, g), local_v)) {
619 if (edge(*v, *n, g).second == 1) {
620 PRINTF_DBG(
"Accesing values 3: are we the owner? Nowner: %d, we: %d, Vowner %d, Eowner: %d (<-temp placeholder) MapHelper.Local applied to neighbor yields: %d\n",
621 get(get(boost::vertex_owner, g), local_v),
622 process_id(g.process_group()),
623 get(get(boost::vertex_owner, g),*v),
625 get(MapHelper.Local, *n));std::cout<<std::flush;
631 (*REF.p_IntHelper)[i].ResultsPendProcess.emplace_back(g[*n].value,
634 (
unsigned long) ((
unsigned long) get(get(boost::vertex_index, g), *n) + N_total_nodes * ((
unsigned long) MYPROCN) ) ,
635 (
unsigned long) get(get(boost::vertex_index, g), local_v),
636 (
unsigned long) get(get(boost::vertex_owner, g), local_v));
638 }
else { error_report(
"Push back mechanism for local nodes has failed"); };
641 PRINTF_DBG(
"Accesing values 4\n");std::cout<<std::flush;
645 REF.p_ParHelper->data[i].MissingA[OmpHelperN.MY_THREAD_n].emplace_back(g[local_e].value,
646 (
int) get(MapHelper.NodeOwner,
648 (
int) get(get(boost::vertex_index,
657 OpenMPHelper OmpHelperE(M, 0, OmpHelperN.N_THREADS_n, OmpHelperN.MY_THREAD_n);
662 central_ix = get(get(boost::vertex_index,g), *v);
664 for (
auto e = in_edges.first; e != in_edges.second; ++e) {
665 if ((j >= OmpHelperE.MY_OFFSET_n) && (j < OmpHelperE.MY_OFFSET_n + OmpHelperE.MY_LENGTH_n)) {
667 auto local_v = boost::source(*e, g);
670 REF.p_ParHelper->data[i].MissingB[OmpHelperE.MY_THREAD_n].emplace_back(
672 (
int) get(MapHelper.NodeOwner,
674 (
int) get(get(boost::vertex_index,
684 READY_FOR_INTEGRATION.first.push(i);
685 READY_FOR_INTEGRATION.second.push(ui);
687#pragma omp atomic update
689 PRINTF_DBG(
"Increased by one the number of total vertex\n");std::cout<<std::flush;
693 CHECKED.first.push(i);
694 CHECKED.second.push(ui);
696 PRINTF_DBG(
"Added ix %d to CHECKED\n", i);std::cout<<std::flush;
699 PRINTF_DBG(
" I am thread %d (FOR WORKER) and I have finished doing my job ;-)\n",omp_get_thread_num());
700 PRINTF_DBG(
"This thread is a for worker that has ended :-)\n");
703 PRINTF_DBG(
"about to enter 'omp critical unclaimed'\n"); std::cout << std::flush;
706 am_i_first = is_unclaimed;
708 is_unclaimed =
false;
709 PRINTF_DBG(
"Claimed by thread %ld of processor %d\n", OmpHelper.MY_THREAD_n, REF.p_ComHelper->WORLD_RANK[OmpHelper.MY_THREAD_n]);
716 bool are_we_over =
false;
719#pragma omp atomic read
721 PRINTF_DBG(
"Already checking if we are over, proc %d\n", REF.p_ComHelper->WORLD_RANK[OmpHelper.MY_THREAD_n]);
722 are_we_over = (atomical_int >= NVtot);
726 while (!are_we_over){
728#pragma omp atomic read
730 are_we_over = (atomical_int >= NVtot);
733 PRINTF_DBG(
"not ready yet!\n");
738#pragma omp atomic read
739 aux1 = request_performers;
740#pragma omp atomic read
741 aux2 = request_performers_ended;
742 are_we_over = (aux1 == aux2);
743 while (!are_we_over){
744#pragma omp atomic read
745 aux2 = request_performers_ended;
746 are_we_over = (aux1 == aux2);
750 PRINTF_DBG(
"\n\n\n\n\n\n\WE WERE OVER! NVtot and TOT are: %d & %d\n\n\n\n", NVtot, TOT);
751 PRINTF_DBG(
"Before being ready, we waited for %d laps!\n", notreadyyet);
752 std::cout << std::flush;
755 int worldsize = REF.p_ComHelper->WORLD_SIZE[OmpHelper.MY_THREAD_n];
756 int worldrank = REF.p_ComHelper->WORLD_RANK[OmpHelper.MY_THREAD_n];
758 PRINTF_DBG(
"\nabout to 1st barrier says P:%d\n", worldrank);std::cout<<std::flush;
759 MPI_Barrier(MPI_COMM_WORLD);
761 PRINTF_DBG(
"The (FIRST) cherry of the cake ;-)\n");std::cout<<std::flush;
762#pragma omp atomic write
763 keep_responding =
false;
766#pragma omp atomic read
767 readme = keep_responding;
768 PRINTF_DBG(
"keep_responding was now set as %d\n",readme);std::cout<<std::flush;
771#pragma omp atomic read
772 auxy1 = active_responders;
773#pragma omp atomic read
774 auxy2 = finalized_responders;
775 while (auxy1 > auxy2){
777#pragma omp atomic read
778 auxy2 = finalized_responders;
779#pragma omp atomic read
780 auxy1 = active_responders;
781 PRINTF_DBG(
"active (total) are %d and finalized are %d\n", auxy1, auxy2);
783 PRINTF_DBG(
"\nabout to 2nd barrier says P:%d\n", worldrank);std::cout<<std::flush;
784 MPI_Barrier(MPI_COMM_WORLD);
785 PRINTF_DBG(
"\nEnded barrier says P:%d\n", worldrank);std::cout<<std::flush;
787 PRINTF_DBG(
"The (SECOND) cherry of the cake (-;\n");std::cout<<std::flush;
789 }
else if (OmpHelper.MY_THREAD_n % 2 == 0) {
791#pragma omp atomic read
792 atomic_bool = keep_responding;
793 bool later_mark_finalized =
false;
795#pragma omp atomic update
796 active_responders ++;
797 later_mark_finalized =
true;
802 generic_answer_requests<DT, TIMETOL, BATCH, NodesRequester>(REF,
803 OmpHelper.MY_THREAD_n,
805 generic_answer_requests<DT, TIMETOL, BATCH, EdgesRequester>(REF,
806 OmpHelper.MY_THREAD_n,
808#pragma omp atomic read
809 atomic_bool = keep_responding;
810 PRINTF_DBG(
"so far I keep responding U.u cuz keep responding was %d\n", atomic_bool);
812 if (later_mark_finalized) {
813#pragma omp atomic update
814 finalized_responders++;
819 PRINTF_DBG(
"starting to contribute\n");std::cout<<std::flush;
821#pragma omp atomic read
822 auxv1 = request_performers;
823#pragma omp atomic read
824 auxv2 = request_performers_ended;
825 bool keep_integrating = (auxv1 > auxv2);
826 while (keep_integrating) {
827 contribute_to_integration<DIFFEQ, SOLVER>(REF, solver,
828 REF.p_ComHelper->WORLD_RANK[OmpHelper.MY_THREAD_n]);
829#pragma omp atomic read
830 auxv1 = request_performers;
831#pragma omp atomic read
832 auxv2 = request_performers_ended;
833 keep_integrating = (auxv1 > auxv2);
836 contribute_to_integration<DIFFEQ, SOLVER>(REF, solver,
837 REF.p_ComHelper->WORLD_RANK[OmpHelper.MY_THREAD_n]);
838 PRINTF_DBG(
"ending to contribute\n");std::cout<<std::flush;
842 PRINTF_DBG(
"Exited the parallel construct! solver deg is %d\n", solver.deg);
848 for (
int i=2; i < solver.deg+1 ; ++i){
849 PRINTF_DBG(
"entering the for, i is %d\n",i);
850 if (solver.requires_communication){
851 bool keep_responding =
true;
853 int NFinalizedCapturers = 0;
855 int NFinalizedResponders = 0;
856 long pending = (long) NVtot;
857 std::queue<long> CAPTURED;
858 for (
long k=0; k<pending; ++k) CAPTURED.push(k);
861 if (omp_get_thread_num() == 0) {
862 int v_Ncapturers = 0;
863 int v_NFinalizedCapturers = 0;
864 int v_Nresponders = 0;
865 int v_NFinalizedResponders = 0;
867#pragma omp atomic read
868 v_Ncapturers = Ncapturers;
869 while (v_Ncapturers == 0){
870#pragma omp atomic read
871 v_Ncapturers = Ncapturers;
873#pragma omp atomic read
874 v_NFinalizedCapturers = NFinalizedCapturers;
875 while (v_Ncapturers > v_NFinalizedCapturers){
876#pragma omp atomic read
877 v_Ncapturers = Ncapturers;
878#pragma omp atomic read
879 v_NFinalizedCapturers = NFinalizedCapturers;
883 MPI_Barrier(MPI_COMM_WORLD);
884#pragma omp atomic write
885 keep_responding =
false;
886#pragma omp atomic read
887 v_Nresponders = Nresponders;
888 while (v_Nresponders == 0){
889#pragma omp atomic read
890 v_Nresponders = Nresponders;
892#pragma omp atomic read
893 v_NFinalizedResponders = NFinalizedResponders;
894 while (v_Nresponders > v_NFinalizedResponders){
895#pragma omp atomic read
896 v_Nresponders = Nresponders;
897#pragma omp atomic read
898 v_NFinalizedResponders = NFinalizedResponders;
902 MPI_Barrier(MPI_COMM_WORLD);
903 }
else if (omp_get_thread_num() % 2 == 1){
905#pragma omp atomic update
907 std::queue<long> * locallyQueue;
910 locallyQueue = &CAPTURED;
912 perform_field_requests<DT, TIMETOL, BATCH>(
914 (
int) REF.p_ComHelper->WORLD_RANK[omp_get_thread_num()],
917#pragma omp atomic update
918 NFinalizedCapturers++;
920 }
else if (omp_get_thread_num() % 2 == 0) {
921#pragma omp atomic update
923 bool l_keep_responding =
true;
928 while (l_keep_responding){
930 generic_answer_requests<DT, TIMETOL, BATCH, NodesRequester>(REF,
931 (
int) omp_get_thread_num(),
934 generic_answer_requests<DT, TIMETOL, BATCH, Field0Requester>(REF,
935 (
int) omp_get_thread_num(),
938 generic_answer_requests<DT, TIMETOL, BATCH, Field1Requester>(REF,
939 (
int) omp_get_thread_num(),
942 generic_answer_requests<DT, TIMETOL, BATCH, Field2Requester>(REF,
943 (
int) omp_get_thread_num(),
946#pragma omp atomic read
947 l_keep_responding = keep_responding;
950#pragma omp atomic update
951 NFinalizedResponders++;
955 contribute_to_higher_integration<DIFFEQ, SOLVER, BATCH>(REF,
960 PRINTF_DBG(
"Reached the final integration :-)\n");
961 finalize_integration<DIFFEQ, SOLVER, BATCH>(REF, solver);
964 PRINTF_DBG(
"starting to swap register\n");std::cout<<std::flush;
968 PRINTF_DBG(
"About to synchronize");std::cout<<std::flush;
969 MPI_Barrier(MPI_COMM_WORLD);
972 PRINTF_DBG(
"About to increase time by h\n");
977 printf(
"-^-^-H-E-A-R-T-B-E-A-T-^-^-");
978 PRINTF_DBG(
"\n\n\n\\n\n\n\n\n\n");std::cout<<std::flush;
979 PRINTF_DBG(
"exited");
Definition: CommunicationFunctions.h:38
Definition: GeneralSolver.h:38
Definition: CommunicationFunctions.h:30