NEDISS: Network Diffusion and Synchronization Simulator
All Classes Pages
GraphFunctions.h
1//
2// Created by m4zz31 on 3/11/21.
3//
4
5#ifndef CPPPROJCT_GRAPHFUNCTIONS_H
6#define CPPPROJCT_GRAPHFUNCTIONS_H
7
8#include <mpi.h>
9#include <set>
10#include <boost/mpi/environment.hpp>
11
12#include "GeneralGraph.h"
13
14#include "../macros/macros.h"
15
16#include "../Communication/CommunicationFunctions.h"
17
18#include "../Solvers/GeneralSolver.h"
19
20#include "../Utils/adequate_synchronization.h"
21#include "../Utils/memory_management.h"
22#include "../Utils/msleep.h"
23#include "../Utils/error.h"
24#include "../Utils/HelperClasses.h"
25#include "../Utils/display_vectors.h"
26
27
28//*****************************DECLARATIONS GO HERE******************************
29template <typename SpecificRequestObject> class RequestObject;
30template <int field> class FieldRequestObject;
31template<int DT, int TIMETOL, int BATCH, typename RequestClass>
32void generic_answer_requests(ReferenceContainer &REF,int MYTHR, RequestClass ReqObj);
38template <int DT, int TIMETOL, int BATCH>
39void perform_requests(int NNodes,
40 ReferenceContainer REF,
41 unsigned long N,
42 OpenMPHelper &O);
43void register_to_value(Graph &g);
44void destroyRequestWithoutCounter(MPI_Request &R);
45void freeRequestWithoutCounter(MPI_Request &R);
46template<int DT, int TIMETOL, int BATCH>
47void perform_field_requests(ReferenceContainer &REF,int MYPROC, int fieldOrder,std::queue<long> * queue);
48void update_neighbor_values(ReferenceContainer &REF);
49
50
51
52
53/******************************FUNCTIONS GO HERE******************************
54 1) contribute_to_integration< differential equation type, solver type>
55 2) contribute_to_higher_integration< differential equation type, solver type, batch size>
56 3) finalize_integration< differential equation type, solver type, batch size>
57 4) single_evolution< differential equation type, solver type, batch size>
58 5) single_evolution2< differential equation type, solver type, batch size>
59 TODO: explain them, change names, make batch size an actual hyperparameter
60 TODO: make things work with length>1 node values and length>1 edges (interactions)
61*/
62
63template<typename DIFFEQ, typename SOLVER>
64void contribute_to_integration(ReferenceContainer &REF, GeneralSolver<DIFFEQ,SOLVER> &solver, int MYPROC){
65 PRINTF_DBG("Starting the contribution to int\n");std::cout<< std::flush;
66 // Define timeout utilities
67 const int DT= 0;
68 int totlaps=0;
69 // Build a graph vertex iterator
70 auto vs = vertices(*REF.p_g);
71 auto start = vs.first;
72 auto end = vs.second;
73 auto v = start;
74 // Define relevant variables
75 bool keepGoing = true;
76 bool is_in = false;
77 long * i;
78 int remaining;
79 long ix;
80 unsigned long uix, currentuix;
81 bool wait = false;
82 std::vector<InfoVecElem> temp;
83#pragma omp critical
84 {
85 remaining = *(REF.p_PENDING_INT);
86 }
87 // Main loop
88 while (keepGoing) {
89 // Fetch the data
90#pragma omp critical
91 {
92 if (!REF.p_READY_FOR_INTEGRATION->second.empty()){
93 ix = REF.p_READY_FOR_INTEGRATION->first.front();
94 uix = REF.p_READY_FOR_INTEGRATION->second.front();
95 REF.p_READY_FOR_INTEGRATION->first.pop();
96 REF.p_READY_FOR_INTEGRATION->second.pop();
97 } else {
98 wait = true;
99 }
100 }
101
102 if (wait) {
103 mssleep(DT);
104 PRINTF_DBG("(NOTHING TO INT)\n");std::cout<< std::flush;
105 } else {
106 ++totlaps;
107 PRINTF_DBG("starting to locate this uix: %lu\n",uix);
108 v = vertices(*REF.p_g).first;
109 is_in = false;
110 while (!is_in) {
111 if (get(get(boost::vertex_owner, *(REF.p_g)), *v) == MYPROC) {
112 currentuix = (unsigned long) get(get(boost::vertex_index, *(REF.p_g)), *v);
113 if (currentuix == uix) {
114 is_in = true;
115 } else {
116 v++;
117 }
118 } else ++v;
119 if (v == end) {
120 std::cout << "[CRITICAL] Failed to found ix!!!" << std::endl;
121 };
122 }
123 // Join the data as required by the integrator
124 temp = std::vector<InfoVecElem>();
125 temp.resize((*REF.p_IntHelper)[ix].ResultsPendProcess.size());
126 int _it = 0;
127 auto it = (*REF.p_IntHelper)[ix].ResultsPendProcess.begin();
128 auto end = (*REF.p_IntHelper)[ix].ResultsPendProcess.end();
129 int oldsize = (*REF.p_IntHelper)[ix].ResultsPendProcess.size();
130 std::set<unsigned long> seen;
131 while (it != end){
132 unsigned long elemix = std::get<2>(*it);
133 const bool is_in = seen.find(elemix) != seen.end();
134 PRINTF_DBG("Std output uix: %lu ix: %ld proc: %d. vals are %f , %f , %lu\n",
135 uix, ix, MYPROC,
136 std::get<0>(*it), std::get<1>(*it), std::get<2>(*it));
137 std::cout<<std::flush;
138 if (!is_in) {
139 temp[_it] = *it;
140 _it++;
141 seen.insert(elemix);
142 } else {
143 }
144 (*REF.p_IntHelper)[ix].ResultsPendProcess.erase(it++);
145 }
146 temp.resize(_it);
147 std::sort(temp.begin(),
148 temp.end(),
149 [](InfoVecElem &t1, InfoVecElem &t2) {
150 return std::get<2>(t1) > std::get<2>(t2);
151 }
152 );
153 PRINTF_DBG("Finished sorting! :-)\n");
154 (*REF.p_IntHelper)[ix].neighborValues.resize(temp.size());
155 (*REF.p_IntHelper)[ix].edgeValues.resize(temp.size());
156 for (int j=0; j<temp.size(); ++j){
157 (*REF.p_IntHelper)[ix].neighborValues[j] = std::get<0>(temp[j]);
158 (*REF.p_IntHelper)[ix].edgeValues[j] = std::get<1>(temp[j]);
159 std::get<0>((*REF.p_IntHelper)[ix].ixMap[j]) = (unsigned long) std::get<2>(temp[j]);
160 std::get<1>((*REF.p_IntHelper)[ix].ixMap[j]) = (unsigned long) std::get<3>(temp[j]);
161 std::get<2>((*REF.p_IntHelper)[ix].ixMap[j]) = (unsigned long) std::get<4>(temp[j]);
162 PRINTF_DBG("temp[j] would be integerisized to %d %d %d\n",
163 (int)((unsigned long) std::get<2>(temp[j])),
164 (int)((unsigned long) std::get<3>(temp[j])),
165 (int)((unsigned long) std::get<4>(temp[j]))
166 );
167 }
168 solver.Term1(
169 (*REF.p_g)[*v].value,
170 (*REF.p_g)[*v].params,
171 (*REF.p_IntHelper)[ix].neighborValues,
172 (*REF.p_IntHelper)[ix].edgeValues,
173 REF.p_LayHelper->data[ix].RK1
174 );
175 REF.p_LayHelper->data[ix].RK1_status = true;
176
177 // for ccompatibility during development:
178 (*REF.p_g)[*v].temporal_register = 0;
179 }
180 wait = false;
181#pragma omp critical
182{
183 remaining = REF.p_READY_FOR_INTEGRATION->first.size();
184}
185
186 PRINTF_DBG("There are %d remaining vals to integrate...\n", remaining);std::cout<<std::flush;
187
188 // Recompute the status of 'KeepGoing'
189 keepGoing = (remaining != 0);
190 }
191 PRINTF_DBG("In total this thread of PROC %d performed %d laps!\n",MYPROC, totlaps);std::cout<<std::flush;
192}
193
194
195template<typename DIFFEQ, typename SOLVER, int BATCH>
196void contribute_to_higher_integration(ReferenceContainer &REF,
198 int fieldNum){
199 int N = REF.p_LayHelper->data.size();
200 auto vs = vertices(*REF.p_g);
201#pragma omp parallel firstprivate(N, REF, vs, solver)
202{
203 int Nthreads = (int) omp_get_num_threads();
204 int myThread = (int) omp_get_thread_num();
205 int begin = N/Nthreads * myThread;
206 int end = N/Nthreads * (myThread + 1);
207 if (myThread + 1 == Nthreads) end += N%Nthreads;
208 for (auto v = vs.first + begin; v != vs.first + end; ++v){
209 long ix = get(get(boost::vertex_index, *(REF.p_g)), *v);
210
211 // FLAGGED FOR KILL AFTER NEXT DEBUG 01 jan 2022
212// if (ix == 0){
213// PRINTF_DBG("rk0, central val: %f and neighbors ",(*REF.p_IntHelper)[ix].centralValue);
214// display((*REF.p_IntHelper)[ix].neighborValues);
215// PRINTF_DBG("rk1 "); display(REF.p_LayHelper->data[ix].RK1);
216// if (fieldNum >= 2) {printf("rk2 "); display(REF.p_LayHelper->data[ix].RK2);}
217// if (fieldNum >= 3) {printf("rk3 "); display(REF.p_LayHelper->data[ix].RK3);}
218// if (fieldNum >= 4) {printf("rk4 "); display(REF.p_LayHelper->data[ix].RK4);}
219// }
220
221
222
223 if (fieldNum == 2){
224 solver.Term2((*REF.p_IntHelper)[ix].centralValue,
225 (*REF.p_IntHelper)[ix].centralParams,
226 (*REF.p_IntHelper)[ix].neighborValues,
227 (*REF.p_IntHelper)[ix].edgeValues,
228 REF.p_LayHelper->data[ix].RK1,
229 REF.p_LayHelper->data[ix].RK2);
230 REF.p_LayHelper->data[ix].RK2_status = true;
231 PRINTF_DBG("RK2 with ix %ld correctly computed! it yielded %f\n", ix, REF.p_LayHelper->data[ix].RK2[0]);
232 } else if (fieldNum == 3) {
233 solver.Term3((*REF.p_IntHelper)[ix].centralValue,
234 (*REF.p_IntHelper)[ix].centralParams,
235 (*REF.p_IntHelper)[ix].neighborValues,
236 (*REF.p_IntHelper)[ix].edgeValues,
237 REF.p_LayHelper->data[ix].RK1,
238 REF.p_LayHelper->data[ix].RK2,
239 REF.p_LayHelper->data[ix].RK3);
240 REF.p_LayHelper->data[ix].RK3_status = true;
241 PRINTF_DBG("RK3 with ix %ld correctly computed! it yielded %f\n", ix, REF.p_LayHelper->data[ix].RK3[0]);
242 } else if (fieldNum == 4){
243 solver.Term4((*REF.p_IntHelper)[ix].centralValue,
244 (*REF.p_IntHelper)[ix].centralParams,
245 (*REF.p_IntHelper)[ix].neighborValues,
246 (*REF.p_IntHelper)[ix].edgeValues,
247 REF.p_LayHelper->data[ix].RK1,
248 REF.p_LayHelper->data[ix].RK2,
249 REF.p_LayHelper->data[ix].RK3,
250 REF.p_LayHelper->data[ix].RK4);
251 REF.p_LayHelper->data[ix].RK4_status = true;
252 PRINTF_DBG("RK4 with ix %ld correctly computed! it yielded %f\n", ix, REF.p_LayHelper->data[ix].RK4[0]);
253 } else if (fieldNum == 1){
254 solver.Term1((*REF.p_IntHelper)[ix].centralValue,
255 (*REF.p_IntHelper)[ix].centralParams,
256 (*REF.p_IntHelper)[ix].neighborValues,
257 (*REF.p_IntHelper)[ix].edgeValues,
258 REF.p_LayHelper->data[ix].RK1);
259 REF.p_LayHelper->data[ix].RK1_status = true;
260 PRINTF_DBG("RK1 with ix %ld correctly computed! it yielded %f and central value was %f\n", ix, REF.p_LayHelper->data[ix].RK1[0], (*REF.p_IntHelper)[ix].centralValue);
261 if (VERBOSE) {
262 PRINTF_DBG("centralParams are : \n");display((*REF.p_IntHelper)[ix].centralParams);
263 PRINTF_DBG("Neighbor values are : \n");display((*REF.p_IntHelper)[ix].neighborValues);
264 PRINTF_DBG("edgeValues values are : \n");display((*REF.p_IntHelper)[ix].edgeValues);
265 }
266 } else {
267 printf("[FATAL] field order requested does not exist. Requested was: %d\n", fieldNum);
268 std::cout<<std::flush;
269 exit(1);
270 }
271 }
272}
273};
274
275
276template<typename DIFFEQ, typename SOLVER, int BATCH>
277void finalize_integration(ReferenceContainer &REF, GeneralSolver<DIFFEQ,SOLVER> &solver){
278
279 int N = REF.p_LayHelper->data.size();
280
281 auto vs = vertices(*REF.p_g);
282#pragma omp parallel firstprivate(N, REF, vs, solver)
283{
284 int Nthreads = (int) omp_get_num_threads();
285 int myThread = (int) omp_get_thread_num();
286 int begin = N/Nthreads * myThread;
287 int end = N/Nthreads * (myThread + 1);
288 if (myThread + 1 == Nthreads) end += N % Nthreads;
289
290 PRINTF_DBG("Currently at 'finalize_integration': N=%d, Nthreads=%d, myThread=%d, begin=%d and end=%d\n",
291 N, Nthreads, myThread, begin, end); std::cout << std::flush;
292
293 for (auto v = vs.first + begin; v != vs.first + end; ++v){
294 long ix = get(get(boost::vertex_index, *(REF.p_g)), *v);
295 double answer = 0;
296 solver.evolve(
297 (*REF.p_IntHelper)[ix].centralValue,
298 (*REF.p_IntHelper)[ix].centralParams,
299 (*REF.p_IntHelper)[ix].neighborValues,
300 (*REF.p_IntHelper)[ix].edgeValues,
301 REF.p_LayHelper->data[ix].RK1,
302 REF.p_LayHelper->data[ix].RK2,
303 REF.p_LayHelper->data[ix].RK3,
304 REF.p_LayHelper->data[ix].RK4,
305 answer
306 );
307 //(*REF.p_g)[*v].temporal_register = answer;
308 (*REF.p_g)[*v].value = answer;
309 REF.p_LayHelper->data[ix].RK1_status = false;
310 REF.p_LayHelper->data[ix].RK2_status = false;
311 REF.p_LayHelper->data[ix].RK3_status = false;
312 REF.p_LayHelper->data[ix].RK4_status = false;
313 }
314}
315};
316
317
318
319
320template<typename DIFFEQ, typename SOLVER, int BATCH>
321void single_evolution2(Graph &g,
323 ReferenceContainer REF,
324 unsigned long N_total_nodes){
325
326
327 unsigned long NVtot = REF.NVtot;
328 unsigned long NT;
329 int PENDING_INT = NVtot;
330 int TOT = 1;
331 auto vs = vertices(g);
332 double temporalResult;
333 bool is_unclaimed = true, keep_responding = true;
334 int active_responders=0, finalized_responders=0;
335 NT = REF.p_ComHelper->NUM_THREADS;
336
337 std::pair<std::queue<long>, std::queue<unsigned long>> CHECKED;
338 std::pair<std::queue<long>, std::queue<unsigned long>> READY_FOR_INTEGRATION;
339
340 REF.p_CHECKED = &CHECKED;
341 REF.p_READY_FOR_INTEGRATION = &READY_FOR_INTEGRATION;
342 REF.p_TOT = &TOT;
343 REF.p_PENDING_INT = &PENDING_INT;
344
345 const int MAX_SUBTHR = 1;
346 const int TIMETOL = 30;
347 const int DT = 0;
348
349 bool isLayerBuilt = REF.p_LayHelper->built;
350 int request_performers=0;
351 int request_performers_ended=0;
352 auto MapHelper = *REF.p_MapHelper;
353
354 // Function that transvereses the graph looking for
355 // ixMap indexes that are owned by the current prcessor and updating em.
356 update_neighbor_values(REF);
357
358 for (int i=1; i < solver.deg+1 ; ++i){
359 PRINTF_DBG("entering the for, i is %d\n",i);
360 if (solver.requires_communication){
361 bool keep_responding = true;
362 int Ncapturers = 0;
363 int NFinalizedCapturers = 0;
364 int Nresponders = 0;
365 int NFinalizedResponders = 0;
366 long pending = (long) NVtot;
367 std::queue<long> CAPTURED;
368 for (long k=0; k<pending; ++k) CAPTURED.push(k);
369#pragma omp parallel firstprivate(solver)
370 {
371 if (omp_get_thread_num() == 0) {
372 int v_Ncapturers = 0;
373 int v_NFinalizedCapturers = 0;
374 int v_Nresponders = 0;
375 int v_NFinalizedResponders = 0;
376 long locallyPending;
377#pragma omp atomic read
378 v_Ncapturers = Ncapturers;
379 while (v_Ncapturers == 0){
380#pragma omp atomic read
381 v_Ncapturers = Ncapturers;
382 }
383#pragma omp atomic read
384 v_NFinalizedCapturers = NFinalizedCapturers;
385 while (v_Ncapturers > v_NFinalizedCapturers){
386#pragma omp atomic read
387 v_Ncapturers = Ncapturers;
388#pragma omp atomic read
389 v_NFinalizedCapturers = NFinalizedCapturers;
390 mssleep(DT);
391 }
392 //std::cout << "about to implement the first integration barrier "<< v_Ncapturers << std::endl;
393 MPI_Barrier(MPI_COMM_WORLD);
394#pragma omp atomic write
395 keep_responding = false;
396#pragma omp atomic read
397 v_Nresponders = Nresponders;
398 while (v_Nresponders == 0){
399#pragma omp atomic read
400 v_Nresponders = Nresponders;
401 }
402#pragma omp atomic read
403 v_NFinalizedResponders = NFinalizedResponders;
404 while (v_Nresponders > v_NFinalizedResponders){
405#pragma omp atomic read
406 v_Nresponders = Nresponders;
407#pragma omp atomic read
408 v_NFinalizedResponders = NFinalizedResponders;
409 mssleep(DT);
410 }
411 //std::cout << "about to implement the second integration barrier "<< v_Ncapturers << std::endl;
412 MPI_Barrier(MPI_COMM_WORLD);
413 } else if (omp_get_thread_num() % 2 == 1){
414
415#pragma omp atomic update
416 Ncapturers++;
417 std::queue<long> * locallyQueue;
418#pragma omp critical
419 {
420 locallyQueue = &CAPTURED;
421 }
422 perform_field_requests<DT, TIMETOL, BATCH>(
423 REF,
424 (int) REF.p_ComHelper->WORLD_RANK[omp_get_thread_num()],
425 i,
426 locallyQueue);
427#pragma omp atomic update
428 NFinalizedCapturers++;
429
430 } else if (omp_get_thread_num() % 2 == 0) {
431#pragma omp atomic update
432 Nresponders++;
433 bool l_keep_responding = true;
434 NodesRequester RO_ground;
435 Field0Requester RO_field0;
436 Field1Requester RO_field1;
437 Field2Requester RO_field2;
438 while (l_keep_responding){
439 if (i == 1){
440 generic_answer_requests<DT, TIMETOL, BATCH, NodesRequester>(REF,
441 (int) omp_get_thread_num(),
442 RO_ground);
443 } else if (i == 2){
444 generic_answer_requests<DT, TIMETOL, BATCH, Field0Requester>(REF,
445 (int) omp_get_thread_num(),
446 RO_field0);
447 } else if (i == 3){
448 generic_answer_requests<DT, TIMETOL, BATCH, Field1Requester>(REF,
449 (int) omp_get_thread_num(),
450 RO_field1);
451 } else if (i == 4){
452 generic_answer_requests<DT, TIMETOL, BATCH, Field2Requester>(REF,
453 (int) omp_get_thread_num(),
454 RO_field2);
455 }
456#pragma omp atomic read
457 l_keep_responding = keep_responding;
458 mssleep(DT);
459 }
460#pragma omp atomic update
461 NFinalizedResponders++;
462 }
463 }
464 }
465 PRINTF_DBG("About to call 'contribute_to_higher_integration' with i=%d\n",i);
466 contribute_to_higher_integration<DIFFEQ, SOLVER, BATCH>(REF,
467 solver,
468 i);
469 }
470 // Join all the integration terms
471 PRINTF_DBG("Reached the final integration :-)\n");
472 finalize_integration<DIFFEQ, SOLVER, BATCH>(REF, solver);
473 PRINTF_DBG("Ended the final integration :-)\n");
474
475 // Swap temporal and main registers
476 PRINTF_DBG("starting to swap register\n");std::cout<<std::flush;
477 //register_to_value(g);
478
479 // Synchronize
480 PRINTF_DBG("About to synchronize");std::cout<<std::flush;
481 MPI_Barrier(MPI_COMM_WORLD);
482
483 // Evolve time
484 PRINTF_DBG("About to increase time by h\n");
485 solver.EvolveTime();
486
487 // Finalize
488 PRINTF_DBG("Done");
489 printf("-^-^-H-E-A-R-T-B-E-A-T-^-^-");
490 PRINTF_DBG("\n\n\n\\n\n\n\n\n\n");std::cout<<std::flush;
491 PRINTF_DBG("exited");
492
493}
494
495
496template<typename DIFFEQ, typename SOLVER, int BATCH>
497void single_evolution(Graph &g,
499 ReferenceContainer REF,
500 unsigned long N_total_nodes){
501 /* Roughly we are:
502 (1) gathering info from neighbors in parallel using MPI calls to other procs.
503 (2) solving the respective differential equation
504 (3) calling the proc synchronization */
505
506 unsigned long NVtot = REF.NVtot;
507 unsigned long NT;
508 int PENDING_INT = NVtot;
509 int TOT = 1;
510 auto vs = vertices(g);
511 double temporalResult;
512 bool is_unclaimed = true, keep_responding = true;
513 int active_responders=0, finalized_responders=0;
514 NT = REF.p_ComHelper->NUM_THREADS;
515
516 std::pair<std::queue<long>, std::queue<unsigned long>> CHECKED;
517 std::pair<std::queue<long>, std::queue<unsigned long>> READY_FOR_INTEGRATION;
518
519 REF.p_CHECKED = &CHECKED;
520 REF.p_READY_FOR_INTEGRATION = &READY_FOR_INTEGRATION;
521 REF.p_TOT = &TOT;
522 REF.p_PENDING_INT = &PENDING_INT;
523
524 const int MAX_SUBTHR = 1;
525 const int TIMETOL = 20;
526 const int DT = 1;
527
528 bool isLayerBuilt = REF.p_LayHelper->built;
529 int request_performers=0;
530 int request_performers_ended=0;
531 auto MapHelper = *REF.p_MapHelper;
532
533#pragma omp parallel firstprivate(NVtot, vs, NT, N_total_nodes, REF, MAX_SUBTHR, TIMETOL, DT, solver, isLayerBuilt, MapHelper)
534 {
535 bool am_i_first;
536 int SplitCoef = omp_get_num_threads()/2; // 3/2
537 if (SplitCoef < 2){SplitCoef = 2;}
538 OpenMPHelper OmpHelper(NVtot, SplitCoef);
539 if (OmpHelper.MY_THREAD_n < SplitCoef){
540 if (OmpHelper.MY_THREAD_n % (MAX_SUBTHR + 1) == 0) {
541 bool atomic_bool;
542#pragma omp atomic read
543 atomic_bool = keep_responding;
544#pragma omp atomic update
545 ++active_responders;
547 EdgesRequester RO_edges;
548 while (atomic_bool) {
549 generic_answer_requests<DT, TIMETOL, BATCH, NodesRequester>(REF,
550 OmpHelper.MY_THREAD_n,
551 RO);
552 generic_answer_requests<DT, TIMETOL, BATCH, EdgesRequester>(REF,
553 OmpHelper.MY_THREAD_n,
554 RO_edges);
555
556#pragma omp atomic read
557 atomic_bool = keep_responding;
558 mssleep(DT);
559 }
560#pragma omp atomic update
561 ++finalized_responders;
562
563 if (!atomic_bool) PRINTF_DBG("OVER! :O\n");
564 PRINTF_DBG(" I am thread %d (MESSAGE ANSWERER) and I have finished doing my job ;-)\n",omp_get_thread_num());
565
566 } else {
567#pragma omp atomic update
568 request_performers++;
569 perform_requests<DT, TIMETOL, BATCH>(NVtot, REF, N_total_nodes, OmpHelper);
570
571#pragma omp atomic update
572 request_performers_ended++;
573 }
574 } else {
575 unsigned long NLocals, NInedges, M, rank, NOwned;
576 long i=-1;
577 unsigned long ui =0;
578 bool ready4int = false;
579 i += OmpHelper.MY_OFFSET_n;
580 for (auto v = vs.first + OmpHelper.MY_OFFSET_n;
581 v != vs.first + OmpHelper.MY_OFFSET_n + OmpHelper.MY_LENGTH_n; ++v) {
582 ++i;
583 // Capture the central node's values
584 PRINTF_DBG("Accesing values 1\n");std::cout<<std::flush;
585 (*REF.p_IntHelper)[i].centralValue = g[*v].value;
586 PRINTF_DBG("Accesing values 2\n");std::cout<<std::flush;
587 (*REF.p_IntHelper)[i].centralParams = g[*v].params;
588 (*REF.p_IntHelper)[i].build(g, *v, MapHelper, NOwned, rank, NLocals, M);
589
590 if (NOwned == rank){
591 ready4int = true;
592 } else {
593 ready4int = false;
594 };
595 if (get(MapHelper.NodeOwner,*v) != REF.p_ComHelper->WORLD_RANK[OmpHelper.MY_THREAD_n]){
596 // we are treating a node which we dont own.
597 PRINTF_DBG("\n\n\n[ERROR] Node not owned by processor.\n\n\n\n");std::cout<<std::flush;
598 exit(1);
599 } else {
600 ui = ((unsigned long) get(get(boost::vertex_index, g), *v));
601 }
602 if (!isLayerBuilt) {
603#pragma omp critical
604{
605 REF.p_LayHelper->buildForRank((long) ui, (long) rank);
606}
607 }
608 auto neighbors = boost::adjacent_vertices(*v, g);
609 auto in_edges = boost::in_edges(*v, g);
610 unsigned long MYPROCN = REF.p_ComHelper->WORLD_RANK[OmpHelper.MY_THREAD_n];
611#pragma omp parallel firstprivate(i, v, M, neighbors, NLocals, NInedges, NOwned, N_total_nodes, MYPROCN)
612 {
613 OpenMPHelper OmpHelperN(NLocals, 0);
614 for (auto n = neighbors.first + OmpHelperN.MY_OFFSET_n;
615 n != neighbors.first + OmpHelperN.MY_OFFSET_n + OmpHelperN.MY_LENGTH_n; ++n) {
616 auto local_e = edge(*v, *n, g).first;
617 auto local_v = *n;
618 if (REF.p_ComHelper->WORLD_RANK[OmpHelper.MY_THREAD_n] == get(get(boost::vertex_owner, g), local_v)) {
619 if (edge(*v, *n, g).second == 1) {
620 PRINTF_DBG("Accesing values 3: are we the owner? Nowner: %d, we: %d, Vowner %d, Eowner: %d (<-temp placeholder) MapHelper.Local applied to neighbor yields: %d\n",
621 get(get(boost::vertex_owner, g), local_v),
622 process_id(g.process_group()),
623 get(get(boost::vertex_owner, g),*v),
624 0,
625 get(MapHelper.Local, *n));std::cout<<std::flush;
626
627#pragma omp critical // Just store the results directly in the results.
628 // we can probably afford being critical as there is
629 // MPI communication overhead in other threads.
630{
631 (*REF.p_IntHelper)[i].ResultsPendProcess.emplace_back(g[*n].value,
632 g[edge(*v, *n,
633 g).first].value, // index to UID :-)
634 (unsigned long) ((unsigned long) get(get(boost::vertex_index, g), *n) + N_total_nodes * ((unsigned long) MYPROCN) ) ,
635 (unsigned long) get(get(boost::vertex_index, g), local_v),
636 (unsigned long) get(get(boost::vertex_owner, g), local_v));
637}
638 } else { error_report("Push back mechanism for local nodes has failed"); };
639 } else { // Case (2.A): We 'see' this neighbor because it is connected
640 // via an edge that we own. Get the edge and record who is the other node's owner
641 PRINTF_DBG("Accesing values 4\n");std::cout<<std::flush;
642
643#pragma omp critical
644 {
645 REF.p_ParHelper->data[i].MissingA[OmpHelperN.MY_THREAD_n].emplace_back(g[local_e].value,
646 (int) get(MapHelper.NodeOwner,
647 *n),
648 (int) get(get(boost::vertex_index,
649 g), *n));
650 }
651 }
652 }
653 // Finally please contemplate the 3rd case:
654 // we have in-edges that are not available
655 // locally so we have neighbors that cant
656 // be indexed by 'neighbors'
657 OpenMPHelper OmpHelperE(M, 0, OmpHelperN.N_THREADS_n, OmpHelperN.MY_THREAD_n);
658 int j = 0;
659 long central_ix;
660#pragma omp critical
661 {
662 central_ix = get(get(boost::vertex_index,g), *v);
663 }
664 for (auto e = in_edges.first; e != in_edges.second; ++e) {
665 if ((j >= OmpHelperE.MY_OFFSET_n) && (j < OmpHelperE.MY_OFFSET_n + OmpHelperE.MY_LENGTH_n)) {
666 auto local_e = *e;
667 auto local_v = boost::source(*e, g);
668#pragma omp critical
669 {
670 REF.p_ParHelper->data[i].MissingB[OmpHelperE.MY_THREAD_n].emplace_back(
671 (double) central_ix,
672 (int) get(MapHelper.NodeOwner,
673 local_v),
674 (int) get(get(boost::vertex_index,
675 g), local_v));
676 }
677 }
678 ++j;
679 }
680 } // end of the nested parallelism! :-)
681 if (ready4int){
682#pragma omp critical
683 {
684 READY_FOR_INTEGRATION.first.push(i);
685 READY_FOR_INTEGRATION.second.push(ui);
686 }
687#pragma omp atomic update
688 ++TOT;
689 PRINTF_DBG("Increased by one the number of total vertex\n");std::cout<<std::flush;
690 } else {
691#pragma omp critical
692 {
693 CHECKED.first.push(i); // Adding the index to the list of checked indexes ;-)
694 CHECKED.second.push(ui); // Adding the index to the list of checked indexes ;-)
695 }
696 PRINTF_DBG("Added ix %d to CHECKED\n", i);std::cout<<std::flush;
697 }
698 }
699 PRINTF_DBG(" I am thread %d (FOR WORKER) and I have finished doing my job ;-)\n",omp_get_thread_num());
700 PRINTF_DBG("This thread is a for worker that has ended :-)\n");
701 }
702
703 PRINTF_DBG("about to enter 'omp critical unclaimed'\n"); std::cout << std::flush;
704#pragma omp critical
705 {
706 am_i_first = is_unclaimed;
707 if (is_unclaimed){
708 is_unclaimed = false;
709 PRINTF_DBG("Claimed by thread %ld of processor %d\n", OmpHelper.MY_THREAD_n, REF.p_ComHelper->WORLD_RANK[OmpHelper.MY_THREAD_n]);
710 }
711 }
712
713 if (am_i_first) {
714 // Prepare vars
715 int atomical_int;
716 bool are_we_over = false;
717
718 // While our Process is not over, I am the official responder :-)
719#pragma omp atomic read
720 atomical_int = TOT;
721 PRINTF_DBG("Already checking if we are over, proc %d\n", REF.p_ComHelper->WORLD_RANK[OmpHelper.MY_THREAD_n]);
722 are_we_over = (atomical_int >= NVtot); // TODO: bug. it should be exactly equal (looks at perform_requests)
723 int notreadyyet=0;
724
725 // FIRST are we over: checking that all nodes have been processed
726 while (!are_we_over){
727 // Re-check if we are over
728#pragma omp atomic read
729 atomical_int = TOT;
730 are_we_over = (atomical_int >= NVtot); // TODO: same here ;-).
731 ++notreadyyet;
732 mssleep(DT);
733 PRINTF_DBG("not ready yet!\n");
734 }
735
736 // SECOND are we over: checking that all perform requesters have exited
737 int aux1,aux2;
738#pragma omp atomic read
739 aux1 = request_performers;
740#pragma omp atomic read
741 aux2 = request_performers_ended;
742 are_we_over = (aux1 == aux2);
743 while (!are_we_over){
744#pragma omp atomic read
745 aux2 = request_performers_ended;
746 are_we_over = (aux1 == aux2);
747 mssleep(DT);
748 }
749
750 PRINTF_DBG("\n\n\n\n\n\n\WE WERE OVER! NVtot and TOT are: %d & %d\n\n\n\n", NVtot, TOT);
751 PRINTF_DBG("Before being ready, we waited for %d laps!\n", notreadyyet);
752 std::cout << std::flush;
753
754
755 int worldsize = REF.p_ComHelper->WORLD_SIZE[OmpHelper.MY_THREAD_n];
756 int worldrank = REF.p_ComHelper->WORLD_RANK[OmpHelper.MY_THREAD_n];
757
758 PRINTF_DBG("\nabout to 1st barrier says P:%d\n", worldrank);std::cout<<std::flush;
759 MPI_Barrier(MPI_COMM_WORLD);
760
761 PRINTF_DBG("The (FIRST) cherry of the cake ;-)\n");std::cout<<std::flush;
762#pragma omp atomic write
763 keep_responding = false;
764
765 bool readme;
766#pragma omp atomic read
767 readme = keep_responding;
768 PRINTF_DBG("keep_responding was now set as %d\n",readme);std::cout<<std::flush;
769
770 int auxy1, auxy2;
771#pragma omp atomic read
772 auxy1 = active_responders;
773#pragma omp atomic read
774 auxy2 = finalized_responders;
775 while (auxy1 > auxy2){
776 mssleep(DT);
777#pragma omp atomic read
778 auxy2 = finalized_responders;
779#pragma omp atomic read
780 auxy1 = active_responders;
781 PRINTF_DBG("active (total) are %d and finalized are %d\n", auxy1, auxy2);
782 }
783 PRINTF_DBG("\nabout to 2nd barrier says P:%d\n", worldrank);std::cout<<std::flush;
784 MPI_Barrier(MPI_COMM_WORLD);
785 PRINTF_DBG("\nEnded barrier says P:%d\n", worldrank);std::cout<<std::flush;
786
787 PRINTF_DBG("The (SECOND) cherry of the cake (-;\n");std::cout<<std::flush;
788
789 } else if (OmpHelper.MY_THREAD_n % 2 == 0) {
790 bool atomic_bool;
791#pragma omp atomic read
792 atomic_bool = keep_responding;
793 bool later_mark_finalized = false;
794 if (atomic_bool){
795#pragma omp atomic update
796 active_responders ++;
797 later_mark_finalized = true;
798 }
800 EdgesRequester RO_edges;
801 while (atomic_bool){
802 generic_answer_requests<DT, TIMETOL, BATCH, NodesRequester>(REF,
803 OmpHelper.MY_THREAD_n,
804 RO);
805 generic_answer_requests<DT, TIMETOL, BATCH, EdgesRequester>(REF,
806 OmpHelper.MY_THREAD_n,
807 RO_edges);
808#pragma omp atomic read
809 atomic_bool = keep_responding;
810 PRINTF_DBG("so far I keep responding U.u cuz keep responding was %d\n", atomic_bool);
811 }
812 if (later_mark_finalized) {
813#pragma omp atomic update
814 finalized_responders++;
815 }
816 }
817
818 // Integration section: this is only exited when there are no more pending integration remaining :-)
819 PRINTF_DBG("starting to contribute\n");std::cout<<std::flush;
820 int auxv1,auxv2;
821#pragma omp atomic read
822 auxv1 = request_performers;
823#pragma omp atomic read
824 auxv2 = request_performers_ended;
825 bool keep_integrating = (auxv1 > auxv2);
826 while (keep_integrating) { // V2
827 contribute_to_integration<DIFFEQ, SOLVER>(REF, solver,
828 REF.p_ComHelper->WORLD_RANK[OmpHelper.MY_THREAD_n]);
829#pragma omp atomic read
830 auxv1 = request_performers;
831#pragma omp atomic read
832 auxv2 = request_performers_ended;
833 keep_integrating = (auxv1 > auxv2);
834 mssleep(DT);
835 } // V2
836 contribute_to_integration<DIFFEQ, SOLVER>(REF, solver,
837 REF.p_ComHelper->WORLD_RANK[OmpHelper.MY_THREAD_n]);
838 PRINTF_DBG("ending to contribute\n");std::cout<<std::flush;
839
840} // end of the parallel construct
841
842 PRINTF_DBG("Exited the parallel construct! solver deg is %d\n", solver.deg);
843
844 // IF FIRST LAP THEN START FROM i=2 after running the previous part,
845 // else skip the previous part and start from i == 1;
846 // #WARNING This enhacement is only possible if the edge values are not changing over time!
847
848 for (int i=2; i < solver.deg+1 ; ++i){
849 PRINTF_DBG("entering the for, i is %d\n",i);
850 if (solver.requires_communication){
851 bool keep_responding = true;
852 int Ncapturers = 0;
853 int NFinalizedCapturers = 0;
854 int Nresponders = 0;
855 int NFinalizedResponders = 0;
856 long pending = (long) NVtot;
857 std::queue<long> CAPTURED;
858 for (long k=0; k<pending; ++k) CAPTURED.push(k);
859#pragma omp parallel
860{
861 if (omp_get_thread_num() == 0) {
862 int v_Ncapturers = 0;
863 int v_NFinalizedCapturers = 0;
864 int v_Nresponders = 0;
865 int v_NFinalizedResponders = 0;
866 long locallyPending;
867#pragma omp atomic read
868 v_Ncapturers = Ncapturers;
869 while (v_Ncapturers == 0){
870#pragma omp atomic read
871 v_Ncapturers = Ncapturers;
872 }
873#pragma omp atomic read
874 v_NFinalizedCapturers = NFinalizedCapturers;
875 while (v_Ncapturers > v_NFinalizedCapturers){
876#pragma omp atomic read
877 v_Ncapturers = Ncapturers;
878#pragma omp atomic read
879 v_NFinalizedCapturers = NFinalizedCapturers;
880 mssleep(DT);
881 }
882 //std::cout << "about to implement the first integration barrier "<< v_Ncapturers << std::endl;
883 MPI_Barrier(MPI_COMM_WORLD);
884#pragma omp atomic write
885 keep_responding = false;
886#pragma omp atomic read
887 v_Nresponders = Nresponders;
888 while (v_Nresponders == 0){
889#pragma omp atomic read
890 v_Nresponders = Nresponders;
891 }
892#pragma omp atomic read
893 v_NFinalizedResponders = NFinalizedResponders;
894 while (v_Nresponders > v_NFinalizedResponders){
895#pragma omp atomic read
896 v_Nresponders = Nresponders;
897#pragma omp atomic read
898 v_NFinalizedResponders = NFinalizedResponders;
899 mssleep(DT);
900 }
901 //std::cout << "about to implement the second integration barrier "<< v_Ncapturers << std::endl;
902 MPI_Barrier(MPI_COMM_WORLD);
903 } else if (omp_get_thread_num() % 2 == 1){
904
905#pragma omp atomic update
906 Ncapturers++;
907 std::queue<long> * locallyQueue;
908#pragma omp critical
909{
910 locallyQueue = &CAPTURED;
911}
912 perform_field_requests<DT, TIMETOL, BATCH>(
913 REF,
914 (int) REF.p_ComHelper->WORLD_RANK[omp_get_thread_num()],
915 i,
916 locallyQueue);
917#pragma omp atomic update
918 NFinalizedCapturers++;
919
920 } else if (omp_get_thread_num() % 2 == 0) {
921#pragma omp atomic update
922 Nresponders++;
923 bool l_keep_responding = true;
924 NodesRequester RO_ground;
925 Field0Requester RO_field0;
926 Field1Requester RO_field1;
927 Field2Requester RO_field2;
928 while (l_keep_responding){
929 if (i == 1){
930 generic_answer_requests<DT, TIMETOL, BATCH, NodesRequester>(REF,
931 (int) omp_get_thread_num(),
932 RO_ground);
933 } else if (i == 2){
934 generic_answer_requests<DT, TIMETOL, BATCH, Field0Requester>(REF,
935 (int) omp_get_thread_num(),
936 RO_field0);
937 } else if (i == 3){
938 generic_answer_requests<DT, TIMETOL, BATCH, Field1Requester>(REF,
939 (int) omp_get_thread_num(),
940 RO_field1);
941 } else if (i == 4){
942 generic_answer_requests<DT, TIMETOL, BATCH, Field2Requester>(REF,
943 (int) omp_get_thread_num(),
944 RO_field2);
945 }
946#pragma omp atomic read
947 l_keep_responding = keep_responding;
948 mssleep(DT);
949 }
950#pragma omp atomic update
951 NFinalizedResponders++;
952 }
953}
954 }
955 contribute_to_higher_integration<DIFFEQ, SOLVER, BATCH>(REF,
956 solver,
957 i);
958 }
959 // Join all the integration terms
960 PRINTF_DBG("Reached the final integration :-)\n");
961 finalize_integration<DIFFEQ, SOLVER, BATCH>(REF, solver);
962
963 // Swap temporal and main registers
964 PRINTF_DBG("starting to swap register\n");std::cout<<std::flush;
965 //register_to_value(g);
966
967 // Synchronize
968 PRINTF_DBG("About to synchronize");std::cout<<std::flush;
969 MPI_Barrier(MPI_COMM_WORLD);
970
971 // Evolve time
972 PRINTF_DBG("About to increase time by h\n");
973 solver.EvolveTime();
974
975 // Finalize
976 PRINTF_DBG("Done");
977 printf("-^-^-H-E-A-R-T-B-E-A-T-^-^-");
978 PRINTF_DBG("\n\n\n\\n\n\n\n\n\n");std::cout<<std::flush;
979 PRINTF_DBG("exited");
980}
981
982
983
984
985
986#endif //CPPPROJCT_GRAPHFUNCTIONS_H
Definition: CommunicationFunctions.h:38
Definition: GeneralSolver.h:38
Definition: CommunicationFunctions.h:30