VM2D 1.14
Vortex methods for 2D flows simulation
Loading...
Searching...
No Matches
Queue.cpp
Go to the documentation of this file.
1/*--------------------------------*- VM2D -*-----------------*---------------*\
2| ## ## ## ## #### ##### | | Version 1.14 |
3| ## ## ### ### ## ## ## ## | VM2D: Vortex Method | 2026/03/06 |
4| ## ## ## # ## ## ## ## | for 2D Flow Simulation *----------------*
5| #### ## ## ## ## ## | Open Source Code |
6| ## ## ## ###### ##### | https://www.github.com/vortexmethods/VM2D |
7| |
8| Copyright (C) 2017-2026 I. Marchevsky, K. Sokol, E. Ryatina, A. Kolganova |
9*-----------------------------------------------------------------------------*
10| File name: Queue.cpp |
11| Info: Source code of VM2D |
12| |
13| This file is part of VM2D. |
14| VM2D is free software: you can redistribute it and/or modify it |
15| under the terms of the GNU General Public License as published by |
16| the Free Software Foundation, either version 3 of the License, or |
17| (at your option) any later version. |
18| |
19| VM2D is distributed in the hope that it will be useful, but WITHOUT |
20| ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
21| FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
22| for more details. |
23| |
24| You should have received a copy of the GNU General Public License |
25| along with VM2D. If not, see <http://www.gnu.org/licenses/>. |
26\*---------------------------------------------------------------------------*/
27
28
40#if defined(_WIN32)
41 #include <direct.h>
42#endif
43
44#include <fstream>
45#include <sstream>
46#include <sys/stat.h>
47#include <sys/types.h>
48
49#include "Queue.h"
50
51#include "Preprocessor.h"
52#include "StreamParser.h"
53#include "WorldGen.h"
54
55#ifdef CODE2D
56 #include "Airfoil2D.h"
57 #include "Boundary2D.h"
58 #include "MeasureVP2D.h"
59 #include "Mechanics2D.h"
60 #include "Velocity2D.h"
61 #include "Wake2D.h"
62 #include "WakeDataBase2D.h"
63 #include "World2D.h"
64#endif
65
66#ifdef CODE3D
67 #include "Body3D.h"
68 #include "Passport3D.h"
69 #include "Velocity3D.h"
70 #include "Wake3D.h"
71 #include "World3D.h"
72#endif
73
74using namespace VMlib;
75
76//Конструктор
77Queue::Queue(int& argc, char**& argv)
78{
79#ifdef USE_MPI
80 MPI_Init(&argc, &argv);
81 MPI_Comm_size(MPI_COMM_WORLD, &nProcAll);
82 MPI_Comm_rank(MPI_COMM_WORLD, &myidAll);
83 MPI_Comm_group(MPI_COMM_WORLD, &groupAll);
84#else
85 nProcAll = 1;
86 myidAll = 0;
87 groupAll = -1;
88#endif
89
90 //готовим систему к запуску
91 //(выполняется на глобальном нулевом процессоре)
92 if (myidAll == 0)
93 {
95
97
98 //Устанавливаем флаг занятости в состояние "свободен" всем процессорам,
99#ifdef USE_MPI
100 procState.resize(nProcAll, MPI_UNDEFINED);
101 procStateVar.resize(nProcAll, MPI_UNDEFINED);
102#else
103 procState.resize(nProcAll, -32766);
104 procStateVar.resize(nProcAll, -32766);
105#endif
106
107 numberOfTask.solving = 0; //число решаемых в данный момент задач
108 numberOfTask.prepared = 0; //число подготовленных к запуску задач (в том числе)
109 numberOfTask.finished = 0; //число уже отсчитанных задач
110
111 //текущий номер кванта времени
112 currentKvant = -1;
113 }//if myid==0
114}//Queue()
115
116
117//Деструктор
119{
120 info('i') << "Goodbye!" << std::endl;
121#ifdef USE_MPI
122 MPI_Finalize();
123#endif
124}//~Queue()
125
126
127//Процедура постановки новых задач на отсчет и занятие процессоров
129{
130 //Пересылаем информацию о состоянии процессоров на все компьютеры
131
132#ifdef USE_MPI
133 MPI_Scatter(procState.data(), 1, MPI_INT, &myProcState, 1, MPI_INT, 0, MPI_COMM_WORLD);
134#else
136#endif
137
138#ifdef USE_MPI
139 if (myProcState == MPI_UNDEFINED)
140 world.reset(nullptr);
141#else
142 if (myProcState == -32766)
143 world.reset(nullptr);
144#endif
145
146 if (myidAll == 0)
147 {
148 currentKvant++;
149
150 int nfree = 0; //число свободных процессоров
151
152 //Обнуляем число готовых к старту задач
153 numberOfTask.prepared = 0;
154
155 //считаем число свободных процессоров
156 for (int i = 0; i < nProcAll; ++i)
157#ifdef USE_MPI
158 if (procState[i] == MPI_UNDEFINED)
159#else
160 if (procState[i] == -32766)
161#endif
162 nfree++;
163
164 //формируем номер следующей задачи, которая возможно будет поставлена на отсчет:
165 size_t taskFol = numberOfTask.finished + numberOfTask.solving;
166
167 //проверяем, достаточно ли свободных процессоров для запуска еще одной задачи
168 //при условии, что не все задачи уже решены
169 while ((taskFol < task.size()) && (nfree >= task[taskFol].nProc))
170 {
171 int p = 0; //число найденных свободных процессоров под задачу
172 int j = 0; //текущий счетчик процессоров
173
174 //подбираем свободные процессоры
175 do
176 {
177#ifdef USE_MPI
178 if (procState[j] == MPI_UNDEFINED)
179#else
180 if (procState[j] == -32766)
181#endif
182 {
183 //состояние процессора устанавливаем в "занят текущей задачей"
184 procState[j] = static_cast<int>(taskFol);
185
186 //номер процессора посылаем в перечень процессоров, решающих текущую задачу
187 task[taskFol].proc[p] = j;
188
189 //увеличиваем число найденных процессоров на единицу
190 p++;
191 }//if (ProcState[j]==MPI_UNDEFINED)
192
193 //переходим к следующему процессору
194 j++;
195 } while (p < task[taskFol].nProc);
196
197 //состояние задачи устанавливаем в режим "стартует"
198 task[taskFol].state = TaskState::starting;
199
200 //отмечаем номер кванта, когда задача начала считаться
201 task[taskFol].startEndKvant.first = currentKvant;
202
203 //изменяем счетчики числа задач
204 numberOfTask.prepared++;
205 numberOfTask.solving++;
206
207 //изменяем счетчик свободных процессоров
208 nfree -= task[taskFol].nProc;
209
210 //переходим к следующей задаче
211 taskFol++;
212
213 }//while ((nfree >= Task[task_fol].nproc)&&(task_fol<Task.size()))
214
215 }//if (myidAll == 0)
216
217 //Вывод информации о занятости процессоров задачами
218 if (myidAll == 0)
219 {
220 info.endl();
221 info('i') << "ProcStates: " << std::endl;
222 for (int i = 0; i < nProcAll; ++i)
223 info('-') << "proc[" << i << "] <=> " << ((procState[i] >= 0) ? std::string("problem[" + std::to_string(procState[i]) + "]") : std::string("free")) << std::endl;
224 info.endl();
225 }
226
227 //Пересылаем информацию о состоянии процессоров на все компьютеры
228#ifdef USE_MPI
229 MPI_Scatter(procState.data(), 1, MPI_INT, &myProcState, 1, MPI_INT, 0, MPI_COMM_WORLD);
230#else
232#endif
233
234 //Синхронизируем информацию
235#ifdef USE_MPI
236 MPI_Bcast(&numberOfTask.solving, 1, MPI_INT, 0, MPI_COMM_WORLD);
237 MPI_Bcast(&numberOfTask.prepared, 1, MPI_INT, 0, MPI_COMM_WORLD);
238 MPI_Bcast(&numberOfTask.finished, 1, MPI_INT, 0, MPI_COMM_WORLD);
239#endif
240
241 //список головных процессоров на тех задачах,
242 //которые только что поставлены на счет:
243 std::vector<int> prepList;
244
245 //формируем списки prepList и pspList
246 //(выполняется на глобальном нулевом процессоре)
247 if (myidAll == 0)
248 {
249 for (size_t i = 0; i<task.size(); ++i)
250 //находим задачи в состоянии "стартует"
251 if (task[i].state == TaskState::starting)
252 {
253 //запоминаем номер того процессора, что является там головным
254 prepList.push_back(task[i].proc[0]);
255
256 //состояние задачи вереводим в "считает"
257 task[i].state = TaskState::running;
258 }
259 }//if myidAll==0
260
261
262 //Рассылаем количество стартующих задач в данном кванте на все машины
263#ifdef USE_MPI
264 MPI_Bcast(&numberOfTask.prepared, 1, MPI_INT, 0, MPI_COMM_WORLD);
265#endif
266 if (myidAll > 0)
267 {
268 prepList.resize(numberOfTask.prepared);
269 }//if myid==0
270
271 //следующий фрагмент выполняется только если в данном кванте стартуют новые задачи
272 if (numberOfTask.prepared > 0)
273 {
274 //пересылаем список головных машин стартующих задач на все процессоры
275#ifdef USE_MPI
276 MPI_Bcast(prepList.data(), numberOfTask.prepared, MPI_INT, 0, MPI_COMM_WORLD);
277#endif
278
279 //формируем группу и коммуникатор стартующих головных процессоров
280#ifdef USE_MPI
281 commStarting = MPI_COMM_NULL;
282 MPI_Group_incl(groupAll, numberOfTask.prepared, prepList.data(), &groupStarting);
283 MPI_Comm_create(MPI_COMM_WORLD, groupStarting, &commStarting);
284#else
285 commStarting = 0x04000000;
286 if (numberOfTask.prepared > 0)
287 commStarting = 0;
288#endif
289
290 //следующий фрагмент кода только для стартующих процессов,
291 //которые входят в коммуникатор comm_starting
292#ifdef USE_MPI
293 if (commStarting != MPI_COMM_NULL)
294#else
295 if (commStarting != 0x04000000)
296#endif
297 {
298 //получаем номер процесса в списке стартующих
299 int myidStarting;
300
301#ifdef USE_MPI
302 MPI_Comm_rank(commStarting, &myidStarting);
303#else
304 myidStarting = 0;
305#endif
306
307 //подготовка стартующих задач
308 parallel.myidWork = 0;
309
310#ifdef CODE2D
311 world.reset(new VM2D::World2D(task[myProcState].getPassport()));
312#endif
313
314#ifdef CODE3D
315 world.reset(new VM3D::World3D(task[myProcState].getPassport()));
316#endif
317
318 //Коммуникатор головных процессоров стартующих задач
319 //выполнил свое черное дело и будет удален
320#ifdef USE_MPI
321 MPI_Comm_free(&commStarting);
322#else
323 commStarting = 0x04000000;
324#endif
325 } //if(comm_starting != MPI_COMM_NULL)
326
327 //их группа тоже больше без надобности
328#ifdef USE_MPI
329 MPI_Group_free(&groupStarting);
330#endif
331 }//if (task_prepared>0)
332
333 //формируем группу и коммуникатор считающих головных процессоров
334 //в том числе тех, которые стартуют, и тех, которые ранее входили в commStarting
335 std::vector<int> solvList; //список номеров головных процессов
336
337 if (myidAll == 0)
338 {
339 //Если нулевой процессор свободен
340 //(не является головным в решении задачи в данном кванте) -
341 //- формально присоединяем его в группу головных
342 //необходимо для корректного обмена данными
343#ifdef USE_MPI
344 if (procState[0] == MPI_UNDEFINED)
345#else
346 if (procState[0] == -32766)
347#endif
348 solvList.push_back(0);
349
350 //далее ищем считающие задачи (в том числе только что стартующие)
351 //и собираем номера их головных процессоров
352 //(собираем в порядке просмотра номеров процессоров))
353 for (int s = 0; s<nProcAll; ++s)
354#ifdef USE_MPI
355 if ((procState[s] != MPI_UNDEFINED) && (task[procState[s]].state == TaskState::running) && (task[procState[s]].proc[0] == s))
356#else
357 if ((procState[s] != -32766) && (task[procState[s]].state == TaskState::running) && (task[procState[s]].proc[0] == s))
358#endif
359 solvList.push_back(s);
360
361 sizeCommSolving = static_cast<int>(solvList.size());
362
363 flagFinish.clear();
364 flagFinish.resize(solvList.size());
365
366 }//if myidAll==0
367
368 //число sizeCommSolving на единицу больше, чем taskSolving, если нулевой процессор свободен
369 //и равно taskSolving, если он занят решением задачи
370 //(если он занят решением задачи, то непременно является головным)
371
372 //пересылаем число головных процессоров и их список на все компьютеры
373 //(в том числе головной процессор - независимо от его состояния)
374#ifdef USE_MPI
375 MPI_Bcast(&sizeCommSolving, 1, MPI_INT, 0, MPI_COMM_WORLD);
376#endif
377
378 if (myidAll > 0)
379 solvList.resize(sizeCommSolving);
380
381#ifdef USE_MPI
382 MPI_Bcast(solvList.data(), sizeCommSolving, MPI_INT, 0, MPI_COMM_WORLD);
383#endif
384
385
386#ifdef USE_MPI
387 commSolving = MPI_COMM_NULL;
388
389 //формируем группу и коммуникатор головных процессоров
390 MPI_Group_incl(groupAll, sizeCommSolving, solvList.data(), &groupSolving);
391 MPI_Comm_create(MPI_COMM_WORLD, groupSolving, &commSolving);
392#else
393 commSolving = 0x04000000;
394 if (sizeCommSolving > 0)
395 commSolving = 1;
396#endif
397
398 if (myidAll == 0)
399 {
400 //составление неубывающего массива номеров для верного
401 //распределения задач по процессоров, с тем чтобы задачи
402 //с прошлого кванта оказались в тех же группах, что и раньше
404 }
405
406 //Пересылаем информацию о состоянии процессоров на все компьютеры
407#ifdef USE_MPI
408 MPI_Scatter(procStateVar.data(), 1, MPI_INT, &myProcStateVar, 1, MPI_INT, 0, MPI_COMM_WORLD);
409#else
411#endif
412
413 //Расщепляем весь коммуникатор MPI_COMM_WORLD (т.е. вообще все процессоры)
414 //на коммуникаторы решаемых задач
415 //В результате все процессоры, решающие конкретную задачу,
416 //объединяются в коммуникатор comm_work
417 //Несмотря на то, что он называется везде одинаково, у каждой задачи он свой
418 //и объединяет ровно те процессоры, которые надо
419 //Процессоры, состояние которых установлено в "свободен", т.е.
420 //ProcState=MPI_UNDEFINED (=-32766) ни в один
421 //новый коммуникатор commWork не попадут
422#ifdef USE_MPI
423 MPI_Comm_split(MPI_COMM_WORLD, myProcStateVar, 0, &parallel.commWork);
424#else
425 if (myProcStateVar != -32766)
426 parallel.commWork = 0;
427 else
428 parallel.commWork = 0x04000000;
429#endif
430
431 //следующий код выполняется процессорами, участвующими в решении задач,
432 //следовательно, входящими в коммуникаторы comm_work
433#ifdef USE_MPI
434 if (parallel.commWork != MPI_COMM_NULL)
435#else
436 if (parallel.commWork != 0x04000000)
437#endif
438 {
439 //опеределяем "локальный" номер данного процессора в коммуникаторе commWork и число процессов в нем
440#ifdef USE_MPI
441 MPI_Comm_size(parallel.commWork, &parallel.nProcWork);
442 MPI_Comm_rank(parallel.commWork, &parallel.myidWork);
443#else
445 parallel.myidWork = 0;
446#endif
447 //World равен nullptr только в случае, когда выполняется первый шаг
448 //в этом случае он создан только на главном процессоре группы;
449 //на остальных происходит его создание
450#ifdef CODE2D
451 if (world == nullptr)
452 world.reset(new VM2D::World2D(task[myProcState].getPassport()));
453#endif
454
455#ifdef CODE3D
456 if (world == nullptr)
457 world.reset(new VM3D::World3D(task[myProcState].getPassport()));
458#endif
459
460
461 if ((parallel.myidWork == 0) && (world->getCurrentStep() == 0))
462 {
463#ifdef CODE2D
464 VM2D::World2D& world2D = dynamic_cast<VM2D::World2D&>(*world);
465 //Создание файлов для записи сил
466 for (size_t q = 0; q < world2D.getPassport().airfoilParams.size(); ++q)
467 world2D.GenerateMechanicsHeader(q);
468 //Создание файла для записи временной статистики
469 world2D.getTimers().GenerateStatHeader();
470#endif
471
472#ifdef CODE3D
473 VM3D::World3D& world3D = dynamic_cast<VM3D::World3D&>(*world);
474 //Создание файлов для записи сил
475 //for (size_t q = 0; q < world3D.getPassport().airfoilParams.size(); ++q)
476 // world3D.GenerateMechanicsHeader(q);
477#endif
478 }
479 }
480
481}//TaskSplit()
482
483
484//Процедура обновления состояния задач и процессоров
485void Queue::TaskUpdate()//Обновление состояния задач и процессоров
486{
487 info('i') << "------ Kvant finished ------" << std::endl;
488
489 //Код только для головных процессов, которые входят в коммуникатор commSolving
490 //т.е. выполняется только на головных процессорах, решающих задачи.
491 //Кажется, что можно было бы с тем же эфектом написать здесь if (myidWork==0),
492 //но это не так, поскольку нужно включить в себя еще нулевой процессор,
493 //который в любом случае присоединен к коммуникатору comm_solving,
494 //даже если он не решает задачу (а если решает - то он всегда головной)
495#ifdef USE_MPI
496 if (commSolving != MPI_COMM_NULL)
497 {
498 //определяем номер процессора в данном коммуникаторе -
499 // - вдруг потребуется на будущее!
500 int myidSolving;
501 MPI_Comm_rank(commSolving, &myidSolving);
502
503 //Алгоритм возвращения результатов расчета конкретной задачи
504 //Если срабатывает признак того, что решение задачи можно прекращать
505 //или не решается ни одна задача (а в комуникатор входит
506 //только 0-й процессор, присоединеннй туда насильно)
507 int stopSignal = ((parallel.commWork == MPI_COMM_NULL) || (world->isFinished())) ? 1 : 0;
508
509 //пересылаем признаки прекращения решения задач на нулевой процессор
510 //коммуникатора comm_solving - т.е. на глобальный процессор с номером 0 ---
511 // --- вот для чего его насильно присоединяли к этому коммуникатору
512 //если сам по себе он туда не входил
513 MPI_Gather(&stopSignal, 1, MPI_INT, flagFinish.data(), 1, MPI_INT, 0, commSolving);
514
515 //после пересылки информации коммуникатор comm_solving уничтожается
516 MPI_Comm_free(&commSolving);
517
518 } //if(comm_solving != MPI_COMM_NULL)
519
520 //и соответствующая ему группа тоже удаляется
521 MPI_Group_free(&groupSolving);
522
523
524 if (parallel.commWork != MPI_COMM_NULL)
525 {
526 //К этому месту все процессоры, решающие задачи, приходят уже выполнив все расчеты
527 //в рамках одного кванта времени
528 //Поэтому коммуникаторы решаемых задач нам уже ни к чему - уничтожаем их
529 MPI_Comm_free(&parallel.commWork);
530 }
531#else
532 if (commSolving != 0x04000000)
533 {
534 int myidSolving = 0;
535 int stopSignal = ((parallel.commWork == 0x04000000) || (world->isFinished())) ? 1 : 0;
536 flagFinish[0] = stopSignal;
537 commSolving = 0x04000000;
538 } //if(comm_solving != 0x04000000)
539
540 if (parallel.commWork != 0x04000000)
541 parallel.commWork = 0x04000000;
542#endif
543
544 //Обновление состояния задач и высвобождение процессоров из отсчитавших задач
545 //(выполняется на глобальном нулевом процессоре)
546 if (myidAll == 0)
547 {
548 //Список номеров задач, которые сейчас считаются
549 std::vector<int> taskList;
550
551 //если состояние "считается" запоминаем эту задачу
552 //задачи запоминаются в порядке просмотра процессоров
553 for (int s = 0; s<nProcAll; ++s)
554#ifdef USE_MPI
555 if ( (procState[s] != MPI_UNDEFINED) && (task[procState[s]].state == TaskState::running) && (task[procState[s]].proc[0] == s) )
556#else
557 if ( (procState[s] != -32766) && (task[procState[s]].state == TaskState::running) && (task[procState[s]].proc[0] == s) )
558#endif
559 taskList.push_back(procState[s]);
560
561 //Тем задачам, которые во флаге прекращения счета вернули "1",
562 //ставим состояние завершения счета.
563 //Конструкция +sizeCommSolving-taskSolving введена для смещения в массиве на единицу
564 //если нулевой процессор свободен - тогда он присоединяется формально в нулевом
565 //элементе массива flagFinish, а содержательная часть массива смещается на 1
566 //(в этом случае sizeCommSolving как раз будет на 1 больше, чем taskSolving, см. выше)
567 //если же нулевой процесс занят решением задачи, то смещения нет,
568 //поскольку в этом случае sizeCommSolving и taskSolving равны
569 for (int i = 0; i < numberOfTask.solving; ++i)
570 if (flagFinish[i + sizeCommSolving - numberOfTask.solving] == 1)
571 task[taskList[i]].state = TaskState::finishing;
572 } //if myid==0
573
574 if (myidAll == 0)
575 {
576 for (size_t i = 0; i < task.size(); ++i)
577 {
578 //освобождаем процессоры от сосчитавшихся задач
579 if (task[i].state == TaskState::finishing)
580 {
581 //устанавливаем состояние задачи в "отсчитано"
582 task[i].state = TaskState::done;
583
584 //отмечаем последний квант, когда задача считалась
585 task[i].startEndKvant.second = currentKvant;
586
587 //изменяем счетчики
588 numberOfTask.solving--;
589 numberOfTask.finished++;
590
591 //состояние процессоров, решавших данную задачу, устанавливаем в "свободен"
592 for (int p = 0; p < task[i].nProc; ++p)
593 {
594#ifdef USE_MPI
595 procState[task[i].proc[p]] = MPI_UNDEFINED;
596#else
597 procState[task[i].proc[p]] = -32766;
598#endif
599 }//for p
600 }//if (Task[i].state == 3)
601 }//for i
602
603 //определяем, надо ли еще выделять квант времени или можно заканчивать работу
604 //путем сравнения числа отсчитанных задач с общим числом задач
605 nextKvant = (numberOfTask.finished < static_cast<int>(task.size())) ? 1 : 0;
606 }//if (myid == 0)
607
608#ifdef USE_MPI
609 MPI_Bcast(&nextKvant, 1, MPI_INT, 0, MPI_COMM_WORLD);
610#endif
611}//TaskUpdate()
612
613
614//Процедура, нумерующая задачи в возрастающем порядке
616{
617 std::vector<bool> prFlag;
618 int number = -1;
619
620 prFlag.resize(nProcAll, true); //их надо просматривать
621
622 for (int i = 0; i<nProcAll; ++i)
623
624#ifdef USE_MPI
625 if (procState[i] == MPI_UNDEFINED)
626 {
627 prFlag[i] = false; //уже просмотрели
628 procStateVar[i] = MPI_UNDEFINED;
629 } //if (ProcState[i]==MPI_UNDEFINED)
630#else
631 if (procState[i] == -32766)
632 {
633 prFlag[i] = false; //уже просмотрели
634 procStateVar[i] = -32766;
635 } //if (ProcState[i]==-32766)
636#endif
637
638 for (int i = 0; i<nProcAll; ++i)
639 if (prFlag[i])
640 {
641 prFlag[i] = false;
642 number++;
643
644 for (int s = i; s<nProcAll; ++s)
645 if (procState[s] == procState[i])
646 {
647 procStateVar[s] = number;
648 prFlag[s] = false;
649 }//if (ProcState[s]==ProcState[i])
650 } //if (pr_flag[i])
651}//ConstructProcStateVar()
652
653
654// Запуск вычислительного конвейера (в рамках кванта времени)
656{
657#ifdef USE_MPI
658 if (parallel.commWork != MPI_COMM_NULL)
659#else
660 if (parallel.commWork != 0x04000000)
661#endif
662 {
663 double kvantStartWallTime; //Время на главном процессоре, когда начался очередной квант
664 double deltaWallTime;
665
666 if (parallel.myidWork == 0)
667#ifdef USE_MPI
668 kvantStartWallTime = MPI_Wtime();
669#else
670 kvantStartWallTime = omp_get_wtime();
671#endif
672
674 //if (world->getCurrentStep() == 0)
675 // world->ZeroStep();
676
677 do
678 {
679 if (!world->isFinished())
680 world->Step();
681
682 if (parallel.myidWork == 0)
683#ifdef USE_MPI
684 deltaWallTime = MPI_Wtime() - kvantStartWallTime;
685#else
686 deltaWallTime = omp_get_wtime() - kvantStartWallTime;
687#endif
688
689#ifdef USE_MPI
690 MPI_Bcast(&deltaWallTime, 1, MPI_DOUBLE, 0, parallel.commWork);
691#endif
692 }
693 //Проверка окончания кванта по времени (или завершения задачи)
694 while (deltaWallTime < kvantTime);
695
696 }//if (commWork != MPI_COMM_NULL)
697}//RunConveyer()
698
699
700//Добавление задачи в список
701void Queue::AddTask(int _nProc, std::unique_ptr<PassportGen> _passport)
702{
703 task.resize(task.size() + 1);
704 (task.end() - 1)->nProc = _nProc;
705 (task.end() - 1)->passport = std::move(_passport);
706 (task.end() - 1)->proc.resize(_nProc);
707 (task.end() - 1)->state = TaskState::waiting;
708}//AddTask(...)
709
710
711//Загрузка списка задач
712void Queue::LoadTasksList(const std::string& _tasksFile, const std::string& _mechanicsFile, const std::string& _defaultsFile, const std::string& _switchersFile)
713{
714 std::string extTasksFile = _tasksFile;
715 std::string extMechanicsFile = _mechanicsFile;
716 std::string extDefaultsFile = _defaultsFile;
717 std::string extSwitchersFile = _switchersFile;
718
719 if (
720 fileExistTest(extTasksFile, info, true, {"txt", "TXT"}) &&
721 fileExistTest(extDefaultsFile, info, true, { "txt", "TXT" }) &&
722 fileExistTest(extSwitchersFile, info, true, { "txt", "TXT" })
723 )
724 {
725 std::stringstream tasksFile(Preprocessor(extTasksFile).resultString);
726 std::stringstream defaultsFile(Preprocessor(extDefaultsFile).resultString);
727 std::stringstream switchersFile(Preprocessor(extSwitchersFile).resultString);
728 std::vector<std::string> taskFolder;
729
730 std::unique_ptr<StreamParser> parserTaskList;
731 parserTaskList.reset(new StreamParser(info, "parser", tasksFile, '(', ')'));
732
733 std::vector<std::string> alltasks;
734 parserTaskList->get("problems", alltasks);
735
736 std::ptrdiff_t nTasks = std::count_if(alltasks.begin(), alltasks.end(), [](const std::string& a) {return (a.length() > 0);});
737 info.endl();
738 info('i') << "Number of problems to be solved: " << nTasks << std::endl;
739
740 for (size_t i = 0; i < alltasks.size(); ++i)
741 {
742 if (alltasks[i].length() > 0)
743 {
744 //делим имя задачи + выражение в скобках на 2 подстроки
745 std::pair<std::string, std::string> taskLine = StreamParser::SplitString(info, alltasks[i], false);
746
747 std::string dir = taskLine.first;
748
749 info.endl();
750 info('i') << "-------- Loading problem #" << i << " (" << dir << ") --------" << std::endl;
751
752 //вторую подстроку разделяем на вектор из строк по запятым, стоящим вне фигурных скобок
753 std::vector<std::string> vecTaskLineSecond = StreamParser::StringToVector(taskLine.second, '{', '}');
754
755 //создаем парсер и связываем его с параметрами профиля
756 std::stringstream tskStream(StreamParser::VectorStringToString(vecTaskLineSecond));
757 std::unique_ptr<StreamParser> parserTask;
758
759 parserTask.reset(new StreamParser(info, "problem parameters", tskStream, defaultsFile, {"pspfile", "np", "copyPath"}));
760
761 std::string pspFile;
762 int np;
763
764 //считываем нужные параметры с учетом default-значений
765 parserTask->get("pspfile", pspFile, &defaults::defaultPspFile);
766 parserTask->get("np", np, &defaults::defaultNp);
767
768 if (np > 1) // 31/12/2023
769 {
770 info.endl();
771 info('e') << "Internal MPI-parallelization is not supported longer!" << std::endl;
772 exit(-1);
773 }
774
775 std::string copyPath;
776 parserTask->get("copyPath", copyPath, &defaults::defaultCopyPath);
777 if (copyPath.length() > 0)
778 {
779 //Копировать паспорт поручаем только одному процессу
780 if (myidAll == 0)
781 {
782 std::string initDir(copyPath.begin(), copyPath.end());
783 std::string command;
784
785#if defined(_WIN32)
786 std::replace(dir.begin(), dir.end(), '/', '\\');
787 std::replace(initDir.begin(), initDir.end(), '/', '\\');
788#endif
789
790 VMlib::CreateDirectory(dir.c_str(), "");
791
792#if defined(_WIN32)
793 command = "copy \"" + initDir + "\\*.*\" "+ dir + "\\";
794#else
795 command = "cp ./" + initDir + "/* " + dir + "/";
796#endif
797
798 std::cout << "Copying files from folder \"" << initDir << "\" to \"" << dir << "\"" << std::endl;
799
800 int systemRet = system(command.c_str());
801 if(systemRet == -1)
802 {
803 // The system method failed
804 info('e') << "problem #" << i << " (" << dir << \
805 ") copying passport system method failed" << std::endl;
806 exit(-1);
807 }
808 std::cout << "Copying OK " << std::endl << std::endl;
809
810 //copyFile(pspFile, "./" + dir + "/" + pspFile);
811 }
812 }
813
814#ifdef USE_MPI
815 MPI_Barrier(MPI_COMM_WORLD);
816#endif
817
818 std::unique_ptr<PassportGen> ptrPsp;
819
820#ifdef CODE2D
821 ptrPsp.reset(new VM2D::Passport(info, dir, i, pspFile, extMechanicsFile, extDefaultsFile, extSwitchersFile, vecTaskLineSecond));
822#endif
823
824#ifdef CODE3D
825 ptrPsp.reset(new VM3D::Passport(info, dir, i, pspFile, extMechanicsFile, extDefaultsFile, extSwitchersFile, vecTaskLineSecond));
826#endif
827
828 AddTask(np, std::move(ptrPsp));
829
830 info('i') << "-------- Problem #" << i << " (" << dir << ") is loaded --------" << std::endl << std::endl;
831 }
832 } //for i
833
834 //tasksFile.close();
835 tasksFile.clear();
836
837 //defaultsFile.close();
838 defaultsFile.clear();
839
840 //switchersFile.close();
841 switchersFile.clear();
842
843 }
844 else
845 {
846 exit(1);
847 }
848
849}//Queue::LoadTasks()
Заголовочный файл с описанием класса Airfoil.
Заголовочный файл с описанием класса Boundary.
Заголовочный файл с описанием класса MeasureVP.
Заголовочный файл с описанием класса Mechanics.
Заголовочный файл с описанием класса Preprocessor.
Заголовочный файл с описанием класса Queue.
Заголовочный файл с описанием класса StreamParser.
Заголовочный файл с описанием класса Velocity.
Заголовочный файл с описанием класса Wake.
Заголовочный файл с описанием класса WakeDataBase.
Заголовочный файл с описанием класса World2D.
Заголовочный файл с описанием класса WorldGen.
Класс, опеделяющий текущую решаемую задачу
Definition World2D.h:74
VMlib::TimersGen & getTimers() const
Возврат ссылки на временную статистику выполнения шага расчета по времени
Definition World2D.h:276
const Passport & getPassport() const
Возврат константной ссылки на паспорт
Definition World2D.h:251
void GenerateMechanicsHeader(size_t mechanicsNumber)
Definition World2D.cpp:1457
void endl()
Вывод в поток логов пустой строки
Definition LogStream.h:103
void assignStream(std::ostream *pStr_, const std::string &label_)
Связывание потока логов с потоком вывода
Definition LogStream.h:80
int nProcWork
Число процессоров, решающих конкретную задачу
Definition Parallel.h:100
int myidWork
Локальный номер процессора, решающего конкретную задачу
Definition Parallel.h:97
int commWork
Коммуникатор для решения конкретной задачи
Definition Parallel.h:93
Класс, позволяющий выполнять предварительную обработку файлов
~Queue()
Деструктор
Definition Queue.cpp:118
void ConstructProcStateVar()
Процедура, нумерующая задачи в возрастающем порядке
Definition Queue.cpp:615
const double kvantTime
Продолжительность кванта времени в секундах
Definition Queue.h:210
Queue(int &argc, char **&argv)
Конструктор
Definition Queue.cpp:77
int sizeCommSolving
Число процессоров в группе для головных процессоров в решаемых в данном кванте времени задачах
Definition Queue.h:166
int myProcState
Состояние данного процессора
Definition Queue.h:196
void LoadTasksList(const std::string &_tasksFile, const std::string &_mechanicsFile, const std::string &_defaultsFile, const std::string &_switchersFile)
Загрузка списка задач
Definition Queue.cpp:712
int myProcStateVar
Состояние данного процессора
Definition Queue.h:203
std::unique_ptr< WorldGen > world
Умный указатель на текущую решаемую задачу
Definition Queue.h:189
int groupSolving
Definition Queue.h:159
int nProcAll
Общее число процессоров
Definition Queue.h:134
std::vector< int > procStateVar
Модифицированный список состояний процессоров
Definition Queue.h:120
struct VMlib::Queue::@0 numberOfTask
Структура, содержащая информацию о количестве задач в данный момент времени
int commSolving
Definition Queue.h:160
std::vector< int > flagFinish
Список возвращаемых флагов останова задачи
Definition Queue.h:128
std::vector< Task > task
Список описаний решаемых задач
Definition Queue.h:186
void RunConveyer()
Запуск вычислительного конвейера (в рамках кванта времени)
Definition Queue.cpp:655
Parallel parallel
Класс, опеделяющий параметры исполнения задачи в параллельном MPI-режиме
Definition Queue.h:228
int currentKvant
Номер текущего кванта времени
Definition Queue.h:213
void AddTask(int _nProc, std::unique_ptr< PassportGen > _passport)
Добавление задачи в список
Definition Queue.cpp:701
void TaskUpdate()
Процедура обновления состояния задач и процессоров
Definition Queue.cpp:485
int myidAll
Глобальный номер процессора
Definition Queue.h:131
int groupAll
Definition Queue.h:156
int groupStarting
Definition Queue.h:157
LogStream info
Поток для вывода логов и сообщений об ошибках
Definition Queue.h:180
int commStarting
Definition Queue.h:158
void TaskSplit()
Процедура постановка новых задач на отсчет и занятие процессоров
Definition Queue.cpp:128
std::vector< int > procState
Список состояний процессоров
Definition Queue.h:108
int nextKvant
Признак необходимости выполнения следующего кванта и продолжения расчета
Definition Queue.h:220
Класс, позволяющий выполнять разбор файлов и строк с настройками и параметрами
static std::vector< std::string > StringToVector(std::string line, char openBracket='(', char closeBracket=')')
Pазбор строки, содержащей запятые, на отдельные строки
static std::pair< std::string, std::string > SplitString(LogStream &info, std::string line, bool upcase=true)
Разбор строки на пару ключ-значение
static std::string VectorStringToString(const std::vector< std::string > &_vecString)
Объединение вектора (списка) из строк в одну строку
void GenerateStatHeader()
Формирование заголовка файла временной статистики
Definition TimesGen.cpp:87
void PrintUniversalLogoToStream(std::ostream &str)
Передача в поток вывода универсальной шапки программы VM2D/VM3D.
Definition defs.cpp:92
bool fileExistTest(std::string &fileName, LogStream &info, bool exitKey=false, const std::list< std::string > &extList={})
Проверка существования файла
Definition defs.h:342
void CreateDirectory(const std::string &dir, const std::string &name)
Создание каталога
Definition defs.h:441
@ finishing
задача финиширует
@ starting
задача стартует
@ done
задача решена
@ running
задача решается
@ waiting
задача ожидает запуска
static std::ostream * defaultQueueLogStream
Поток вывода логов и ошибок очереди
Definition defs.h:221
const int defaultNp
Необходимое число процессоров для решения задачи
Definition defs.h:215
const std::string defaultCopyPath
Путь к каталогу с задачей для копирования в новые каталоги
Definition defs.h:218
static std::ostream * defaultWorld2DLogStream
Поток вывода логов и ошибок задачи
Definition defs.h:224
const std::string defaultPspFile
Имя файла с паспортом задачи
Definition defs.h:212