VM2D  1.12
Vortex methods for 2D flows simulation
Queue.cpp
Go to the documentation of this file.
1 /*--------------------------------*- VMlib -*----------------*---------------*\
2 | ## ## ## ## ## ## ## | | Version 1.12 |
3 | ## ## ### ### ## ## | VMlib: VM2D/VM3D Library | 2024/01/14 |
4 | ## ## ## # ## ## ## #### | Open Source Code *----------------*
5 | #### ## ## ## ## ## ## | https://www.github.com/vortexmethods/VM2D |
6 | ## ## ## #### ### #### | https://www.github.com/vortexmethods/VM3D |
7 | |
8 | Copyright (C) 2017-2024 Ilia Marchevsky |
9 *-----------------------------------------------------------------------------*
10 | File name: Queue.cpp |
11 | Info: Source code of VMlib |
12 | |
13 | This file is part of VMlib. |
14 | VMLib is free software: you can redistribute it and/or modify it |
15 | under the terms of the GNU General Public License as published by |
16 | the Free Software Foundation, either version 3 of the License, or |
17 | (at your option) any later version. |
18 | |
19 | VMlib is distributed in the hope that it will be useful, but WITHOUT |
20 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
21 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
22 | for more details. |
23 | |
24 | You should have received a copy of the GNU General Public License |
25 | along with VMlib. If not, see <http://www.gnu.org/licenses/>. |
26 \*---------------------------------------------------------------------------*/
27 
28 
37 #if defined(_WIN32)
38  #include <direct.h>
39 #endif
40 
41 #include <fstream>
42 #include <sstream>
43 #include <sys/stat.h>
44 #include <sys/types.h>
45 
46 #include "Queue.h"
47 
48 #include "Preprocessor.h"
49 #include "StreamParser.h"
50 #include "WorldGen.h"
51 
52 #ifdef CODE2D
53  #include "Airfoil2D.h"
54  #include "Boundary2D.h"
55  #include "MeasureVP2D.h"
56  #include "Mechanics2D.h"
57  #include "Passport2D.h"
58  #include "Velocity2D.h"
59  #include "Wake2D.h"
60  #include "WakeDataBase2D.h"
61  #include "World2D.h"
62 #endif
63 
64 #ifdef CODE3D
65  #include "Body3D.h"
66  #include "Passport3D.h"
67  #include "Velocity3D.h"
68  #include "Wake3D.h"
69  #include "World3D.h"
70 #endif
71 
72 using namespace VMlib;
73 
74 //Конструктор
75 Queue::Queue(int& argc, char**& argv, void (*_CreateMpiTypes)())
76 {
77 #ifdef USE_MPI
78  MPI_Init(&argc, &argv);
79  MPI_Comm_size(MPI_COMM_WORLD, &nProcAll);
80  MPI_Comm_rank(MPI_COMM_WORLD, &myidAll);
81  MPI_Comm_group(MPI_COMM_WORLD, &groupAll);
82  _CreateMpiTypes();
83 #else
84  nProcAll = 1;
85  myidAll = 0;
86  groupAll = -1;
87 #endif
88 
89  //готовим систему к запуску
90  //(выполняется на глобальном нулевом процессоре)
91  if (myidAll == 0)
92  {
94 
96 
97  //Устанавливаем флаг занятости в состояние "свободен" всем процессорам,
98 #ifdef USE_MPI
99  procState.resize(nProcAll, MPI_UNDEFINED);
100  procStateVar.resize(nProcAll, MPI_UNDEFINED);
101 #else
102  procState.resize(nProcAll, -32766);
103  procStateVar.resize(nProcAll, -32766);
104 #endif
105 
106  numberOfTask.solving = 0; //число решаемых в данный момент задач
107  numberOfTask.prepared = 0; //число подготовленных к запуску задач (в том числе)
108  numberOfTask.finished = 0; //число уже отсчитанных задач
109 
110  //текущий номер кванта времени
111  currentKvant = -1;
112  }//if myid==0
113 }//Queue()
114 
115 
116 //Деструктор
118 {
119  info('i') << "Goodbye!" << std::endl;
120 #ifdef USE_MPI
121  MPI_Finalize();
122 #endif
123 }//~Queue()
124 
125 
126 //Процедура постановки новых задач на отсчет и занятие процессоров
128 {
129  //Пересылаем информацию о состоянии процессоров на все компьютеры
130 
131 #ifdef USE_MPI
132  MPI_Scatter(procState.data(), 1, MPI_INT, &myProcState, 1, MPI_INT, 0, MPI_COMM_WORLD);
133 #else
134  myProcState = procState[0];
135 #endif
136 
137 #ifdef USE_MPI
138  if (myProcState == MPI_UNDEFINED)
139  world.reset(nullptr);
140 #else
141  if (myProcState == -32766)
142  world.reset(nullptr);
143 #endif
144 
145  if (myidAll == 0)
146  {
147  currentKvant++;
148 
149  int nfree = 0; //число свободных процессоров
150 
151  //Обнуляем число готовых к старту задач
152  numberOfTask.prepared = 0;
153 
154  //считаем число свободных процессоров
155  for (int i = 0; i < nProcAll; ++i)
156 #ifdef USE_MPI
157  if (procState[i] == MPI_UNDEFINED)
158 #else
159  if (procState[i] == -32766)
160 #endif
161  nfree++;
162 
163  //формируем номер следующей задачи, которая возможно будет поставлена на отсчет:
164  size_t taskFol = numberOfTask.finished + numberOfTask.solving;
165 
166  //проверяем, достаточно ли свободных процессоров для запуска еще одной задачи
167  //при условии, что не все задачи уже решены
168  while ((taskFol < task.size()) && (nfree >= task[taskFol].nProc))
169  {
170  int p = 0; //число найденных свободных процессоров под задачу
171  int j = 0; //текущий счетчик процессоров
172 
173  //подбираем свободные процессоры
174  do
175  {
176 #ifdef USE_MPI
177  if (procState[j] == MPI_UNDEFINED)
178 #else
179  if (procState[j] == -32766)
180 #endif
181  {
182  //состояние процессора устанавливаем в "занят текущей задачей"
183  procState[j] = static_cast<int>(taskFol);
184 
185  //номер процессора посылаем в перечень процессоров, решающих текущую задачу
186  task[taskFol].proc[p] = j;
187 
188  //увеличиваем число найденных процессоров на единицу
189  p++;
190  }//if (ProcState[j]==MPI_UNDEFINED)
191 
192  //переходим к следующему процессору
193  j++;
194  } while (p < task[taskFol].nProc);
195 
196  //состояние задачи устанавливаем в режим "стартует"
197  task[taskFol].state = TaskState::starting;
198 
199  //отмечаем номер кванта, когда задача начала считаться
200  task[taskFol].startEndKvant.first = currentKvant;
201 
202  //изменяем счетчики числа задач
203  numberOfTask.prepared++;
204  numberOfTask.solving++;
205 
206  //изменяем счетчик свободных процессоров
207  nfree -= task[taskFol].nProc;
208 
209  //переходим к следующей задаче
210  taskFol++;
211 
212  }//while ((nfree >= Task[task_fol].nproc)&&(task_fol<Task.size()))
213 
214  }//if (myidAll == 0)
215 
216  //Вывод информации о занятости процессоров задачами
217  if (myidAll == 0)
218  {
219  info.endl();
220  info('i') << "ProcStates: " << std::endl;
221  for (int i = 0; i < nProcAll; ++i)
222  info('-') << "proc[" << i << "] <=> problem[" << procState[i] << "]" << std::endl;
223  info.endl();
224  }
225 
226  //Пересылаем информацию о состоянии процессоров на все компьютеры
227 #ifdef USE_MPI
228  MPI_Scatter(procState.data(), 1, MPI_INT, &myProcState, 1, MPI_INT, 0, MPI_COMM_WORLD);
229 #else
230  myProcState = procState[0];
231 #endif
232 
233  //Синхронизируем информацию
234 #ifdef USE_MPI
235  MPI_Bcast(&numberOfTask.solving, 1, MPI_INT, 0, MPI_COMM_WORLD);
236  MPI_Bcast(&numberOfTask.prepared, 1, MPI_INT, 0, MPI_COMM_WORLD);
237  MPI_Bcast(&numberOfTask.finished, 1, MPI_INT, 0, MPI_COMM_WORLD);
238 #endif
239 
240  //список головных процессоров на тех задачах,
241  //которые только что поставлены на счет:
242  std::vector<int> prepList;
243 
244  //формируем списки prepList и pspList
245  //(выполняется на глобальном нулевом процессоре)
246  if (myidAll == 0)
247  {
248  for (size_t i = 0; i<task.size(); ++i)
249  //находим задачи в состоянии "стартует"
250  if (task[i].state == TaskState::starting)
251  {
252  //запоминаем номер того процессора, что является там головным
253  prepList.push_back(task[i].proc[0]);
254 
255  //состояние задачи вереводим в "считает"
256  task[i].state = TaskState::running;
257  }
258  }//if myidAll==0
259 
260 
261  //Рассылаем количество стартующих задач в данном кванте на все машины
262 #ifdef USE_MPI
263  MPI_Bcast(&numberOfTask.prepared, 1, MPI_INT, 0, MPI_COMM_WORLD);
264 #endif
265  if (myidAll > 0)
266  {
267  prepList.resize(numberOfTask.prepared);
268  }//if myid==0
269 
270  //следующий фрагмент выполняется только если в данном кванте стартуют новые задачи
271  if (numberOfTask.prepared > 0)
272  {
273  //пересылаем список головных машин стартующих задач на все процессоры
274 #ifdef USE_MPI
275  MPI_Bcast(prepList.data(), numberOfTask.prepared, MPI_INT, 0, MPI_COMM_WORLD);
276 #endif
277 
278  //формируем группу и коммуникатор стартующих головных процессоров
279 #ifdef USE_MPI
280  commStarting = MPI_COMM_NULL;
281  MPI_Group_incl(groupAll, numberOfTask.prepared, prepList.data(), &groupStarting);
282  MPI_Comm_create(MPI_COMM_WORLD, groupStarting, &commStarting);
283 #else
284  commStarting = 0x04000000;
285  if (numberOfTask.prepared > 0)
286  commStarting = 0;
287 #endif
288 
289  //следующий фрагмент кода только для стартующих процессов,
290  //которые входят в коммуникатор comm_starting
291 #ifdef USE_MPI
292  if (commStarting != MPI_COMM_NULL)
293 #else
294  if (commStarting != 0x04000000)
295 #endif
296  {
297  //получаем номер процесса в списке стартующих
298  int myidStarting;
299 
300 #ifdef USE_MPI
301  MPI_Comm_rank(commStarting, &myidStarting);
302 #else
303  myidStarting = 0;
304 #endif
305 
306  //подготовка стартующих задач
307  parallel.myidWork = 0;
308 
309 #ifdef CODE2D
310  world.reset(new VM2D::World2D(task[myProcState].getPassport()));
311 #endif
312 
313 #ifdef CODE3D
314  world.reset(new VM3D::World3D(task[myProcState].getPassport()));
315 #endif
316 
317  //Коммуникатор головных процессоров стартующих задач
318  //выполнил свое черное дело и будет удален
319 #ifdef USE_MPI
320  MPI_Comm_free(&commStarting);
321 #else
322  commStarting = 0x04000000;
323 #endif
324  } //if(comm_starting != MPI_COMM_NULL)
325 
326  //их группа тоже больше без надобности
327 #ifdef USE_MPI
328  MPI_Group_free(&groupStarting);
329 #endif
330  }//if (task_prepared>0)
331 
332  //формируем группу и коммуникатор считающих головных процессоров
333  //в том числе тех, которые стартуют, и тех, которые ранее входили в commStarting
334  std::vector<int> solvList; //список номеров головных процессов
335 
336  if (myidAll == 0)
337  {
338  //Если нулевой процессор свободен
339  //(не является головным в решении задачи в данном кванте) -
340  //- формально присоединяем его в группу головных
341  //необходимо для корректного обмена данными
342 #ifdef USE_MPI
343  if (procState[0] == MPI_UNDEFINED)
344 #else
345  if (procState[0] == -32766)
346 #endif
347  solvList.push_back(0);
348 
349  //далее ищем считающие задачи (в том числе только что стартующие)
350  //и собираем номера их головных процессоров
351  //(собираем в порядке просмотра номеров процессоров))
352  for (int s = 0; s<nProcAll; ++s)
353 #ifdef USE_MPI
354  if ((procState[s] != MPI_UNDEFINED) && (task[procState[s]].state == TaskState::running) && (task[procState[s]].proc[0] == s))
355 #else
356  if ((procState[s] != -32766) && (task[procState[s]].state == TaskState::running) && (task[procState[s]].proc[0] == s))
357 #endif
358  solvList.push_back(s);
359 
360  sizeCommSolving = static_cast<int>(solvList.size());
361 
362  flagFinish.clear();
363  flagFinish.resize(solvList.size());
364 
365  }//if myidAll==0
366 
367  //число sizeCommSolving на единицу больше, чем taskSolving, если нулевой процессор свободен
368  //и равно taskSolving, если он занят решением задачи
369  //(если он занят решением задачи, то непременно является головным)
370 
371  //пересылаем число головных процессоров и их список на все компьютеры
372  //(в том числе головной процессор - независимо от его состояния)
373 #ifdef USE_MPI
374  MPI_Bcast(&sizeCommSolving, 1, MPI_INT, 0, MPI_COMM_WORLD);
375 #endif
376 
377  if (myidAll > 0)
378  solvList.resize(sizeCommSolving);
379 
380 #ifdef USE_MPI
381  MPI_Bcast(solvList.data(), sizeCommSolving, MPI_INT, 0, MPI_COMM_WORLD);
382 #endif
383 
384 
385 #ifdef USE_MPI
386  commSolving = MPI_COMM_NULL;
387 
388  //формируем группу и коммуникатор головных процессоров
389  MPI_Group_incl(groupAll, sizeCommSolving, solvList.data(), &groupSolving);
390  MPI_Comm_create(MPI_COMM_WORLD, groupSolving, &commSolving);
391 #else
392  commSolving = 0x04000000;
393  if (sizeCommSolving > 0)
394  commSolving = 1;
395 #endif
396 
397  if (myidAll == 0)
398  {
399  //составление неубывающего массива номеров для верного
400  //распределения задач по процессоров, с тем чтобы задачи
401  //с прошлого кванта оказались в тех же группах, что и раньше
403  }
404 
405  //Пересылаем информацию о состоянии процессоров на все компьютеры
406 #ifdef USE_MPI
407  MPI_Scatter(procStateVar.data(), 1, MPI_INT, &myProcStateVar, 1, MPI_INT, 0, MPI_COMM_WORLD);
408 #else
409  myProcStateVar = procStateVar[0];
410 #endif
411 
412  //Расщепляем весь коммуникатор MPI_COMM_WORLD (т.е. вообще все процессоры)
413  //на коммуникаторы решаемых задач
414  //В результате все процессоры, решающие конкретную задачу,
415  //объединяются в коммуникатор comm_work
416  //Несмотря на то, что он называется везде одинаково, у каждой задачи он свой
417  //и объединяет ровно те процессоры, которые надо
418  //Процессоры, состояние которых установлено в "свободен", т.е.
419  //ProcState=MPI_UNDEFINED (=-32766) ни в один
420  //новый коммуникатор commWork не попадут
421 #ifdef USE_MPI
422  MPI_Comm_split(MPI_COMM_WORLD, myProcStateVar, 0, &parallel.commWork);
423 #else
424  if (myProcStateVar != -32766)
425  parallel.commWork = 0;
426  else
427  parallel.commWork = 0x04000000;
428 #endif
429 
430  //следующий код выполняется процессорами, участвующими в решении задач,
431  //следовательно, входящими в коммуникаторы comm_work
432 #ifdef USE_MPI
433  if (parallel.commWork != MPI_COMM_NULL)
434 #else
435  if (parallel.commWork != 0x04000000)
436 #endif
437  {
438  //опеределяем "локальный" номер данного процессора в коммуникаторе commWork и число процессов в нем
439 #ifdef USE_MPI
440  MPI_Comm_size(parallel.commWork, &parallel.nProcWork);
441  MPI_Comm_rank(parallel.commWork, &parallel.myidWork);
442 #else
443  parallel.nProcWork = 1;
444  parallel.myidWork = 0;
445 #endif
446  //World равен nullptr только в случае, когда выполняется первый шаг
447  //в этом случае он создан только на главном процессоре группы;
448  //на остальных происходит его создание
449 #ifdef CODE2D
450  if (world == nullptr)
451  world.reset(new VM2D::World2D(task[myProcState].getPassport()));
452 #endif
453 
454 #ifdef CODE3D
455  if (world == nullptr)
456  world.reset(new VM3D::World3D(task[myProcState].getPassport()));
457 #endif
458 
459 
460  //Синхронизация параметров
461 #ifdef USE_MPI
462  MPI_Bcast(&(world->getPassportGen().timeDiscretizationProperties.currTime), 1, MPI_DOUBLE, 0, parallel.commWork);
463 #endif
464 
465  if ((parallel.myidWork == 0) && (world->getCurrentStep() == 0))
466  {
467 #ifdef CODE2D
468  VM2D::World2D& world2D = dynamic_cast<VM2D::World2D&>(*world);
469  //Создание файлов для записи сил
470  for (size_t q = 0; q < world2D.getPassport().airfoilParams.size(); ++q)
471  world2D.GenerateMechanicsHeader(q);
472  //Создание файла для записи временной статистики
473  world2D.getTimestat().GenerateStatHeader();
474 #endif
475 
476 #ifdef CODE3D
477  VM3D::World3D& world3D = dynamic_cast<VM3D::World3D&>(*world);
478  //Создание файлов для записи сил
479  //for (size_t q = 0; q < world3D.getPassport().airfoilParams.size(); ++q)
480  // world3D.GenerateMechanicsHeader(q);
481 
482  //Создание файла для записи временной статистики
483  world3D.getTimestat().GenerateStatHeader();
484 #endif
485  }
486  }
487 
488 }//TaskSplit()
489 
490 
491 //Процедура обновления состояния задач и процессоров
492 void Queue::TaskUpdate()//Обновление состояния задач и процессоров
493 {
494  info('i') << "------ Kvant finished ------" << std::endl;
495 
496  //Код только для головных процессов, которые входят в коммуникатор commSolving
497  //т.е. выполняется только на головных процессорах, решающих задачи.
498  //Кажется, что можно было бы с тем же эфектом написать здесь if (myidWork==0),
499  //но это не так, поскольку нужно включить в себя еще нулевой процессор,
500  //который в любом случае присоединен к коммуникатору comm_solving,
501  //даже если он не решает задачу (а если решает - то он всегда головной)
502 #ifdef USE_MPI
503  if (commSolving != MPI_COMM_NULL)
504  {
505  //определяем номер процессора в данном коммуникаторе -
506  // - вдруг потребуется на будущее!
507  int myidSolving;
508  MPI_Comm_rank(commSolving, &myidSolving);
509 
510  //Алгоритм возвращения результатов расчета конкретной задачи
511  //Если срабатывает признак того, что решение задачи можно прекращать
512  //или не решается ни одна задача (а в комуникатор входит
513  //только 0-й процессор, присоединеннй туда насильно)
514  int stopSignal = ((parallel.commWork == MPI_COMM_NULL) || (world->isFinished())) ? 1 : 0;
515 
516  //пересылаем признаки прекращения решения задач на нулевой процессор
517  //коммуникатора comm_solving - т.е. на глобальный процессор с номером 0 ---
518  // --- вот для чего его насильно присоединяли к этому коммуникатору
519  //если сам по себе он туда не входил
520  MPI_Gather(&stopSignal, 1, MPI_INT, flagFinish.data(), 1, MPI_INT, 0, commSolving);
521 
522  //после пересылки информации коммуникатор comm_solving уничтожается
523  MPI_Comm_free(&commSolving);
524 
525  } //if(comm_solving != MPI_COMM_NULL)
526 
527  //и соответствующая ему группа тоже удаляется
528  MPI_Group_free(&groupSolving);
529 
530 
531  if (parallel.commWork != MPI_COMM_NULL)
532  {
533  //К этому месту все процессоры, решающие задачи, приходят уже выполнив все расчеты
534  //в рамках одного кванта времени
535  //Поэтому коммуникаторы решаемых задач нам уже ни к чему - уничтожаем их
536  MPI_Comm_free(&parallel.commWork);
537  }
538 #else
539  if (commSolving != 0x04000000)
540  {
541  int myidSolving = 0;
542  int stopSignal = ((parallel.commWork == 0x04000000) || (world->isFinished())) ? 1 : 0;
543  flagFinish[0] = stopSignal;
544  commSolving = 0x04000000;
545  } //if(comm_solving != 0x04000000)
546 
547  if (parallel.commWork != 0x04000000)
548  parallel.commWork = 0x04000000;
549 #endif
550 
551  //Обновление состояния задач и высвобождение процессоров из отсчитавших задач
552  //(выполняется на глобальном нулевом процессоре)
553  if (myidAll == 0)
554  {
555  //Список номеров задач, которые сейчас считаются
556  std::vector<int> taskList;
557 
558  //если состояние "считается" запоминаем эту задачу
559  //задачи запоминаются в порядке просмотра процессоров
560  for (int s = 0; s<nProcAll; ++s)
561 #ifdef USE_MPI
562  if ( (procState[s] != MPI_UNDEFINED) && (task[procState[s]].state == TaskState::running) && (task[procState[s]].proc[0] == s) )
563 #else
564  if ( (procState[s] != -32766) && (task[procState[s]].state == TaskState::running) && (task[procState[s]].proc[0] == s) )
565 #endif
566  taskList.push_back(procState[s]);
567 
568  //Тем задачам, которые во флаге прекращения счета вернули "1",
569  //ставим состояние завершения счета.
570  //Конструкция +sizeCommSolving-taskSolving введена для смещения в массиве на единицу
571  //если нулевой процессор свободен - тогда он присоединяется формально в нулевом
572  //элементе массива flagFinish, а содержательная часть массива смещается на 1
573  //(в этом случае sizeCommSolving как раз будет на 1 больше, чем taskSolving, см. выше)
574  //если же нулевой процесс занят решением задачи, то смещения нет,
575  //поскольку в этом случае sizeCommSolving и taskSolving равны
576  for (int i = 0; i < numberOfTask.solving; ++i)
577  if (flagFinish[i + sizeCommSolving - numberOfTask.solving] == 1)
578  task[taskList[i]].state = TaskState::finishing;
579  } //if myid==0
580 
581  if (myidAll == 0)
582  {
583  for (size_t i = 0; i < task.size(); ++i)
584  {
585  //освобождаем процессоры от сосчитавшихся задач
586  if (task[i].state == TaskState::finishing)
587  {
588  //устанавливаем состояние задачи в "отсчитано"
589  task[i].state = TaskState::done;
590 
591  //отмечаем последний квант, когда задача считалась
592  task[i].startEndKvant.second = currentKvant;
593 
594  //изменяем счетчики
595  numberOfTask.solving--;
596  numberOfTask.finished++;
597 
598  //состояние процессоров, решавших данную задачу, устанавливаем в "свободен"
599  for (int p = 0; p < task[i].nProc; ++p)
600  {
601 #ifdef USE_MPI
602  procState[task[i].proc[p]] = MPI_UNDEFINED;
603 #else
604  procState[task[i].proc[p]] = -32766;
605 #endif
606  }//for p
607  }//if (Task[i].state == 3)
608  }//for i
609 
610  //определяем, надо ли еще выделять квант времени или можно заканчивать работу
611  //путем сравнения числа отсчитанных задач с общим числом задач
612  nextKvant = (numberOfTask.finished < static_cast<int>(task.size())) ? 1 : 0;
613  }//if (myid == 0)
614 
615 #ifdef USE_MPI
616  MPI_Bcast(&nextKvant, 1, MPI_INT, 0, MPI_COMM_WORLD);
617 #endif
618 }//TaskUpdate()
619 
620 
621 //Процедура, нумерующая задачи в возрастающем порядке
623 {
624  std::vector<bool> prFlag;
625  int number = -1;
626 
627  prFlag.resize(nProcAll, true); //их надо просматривать
628 
629  for (int i = 0; i<nProcAll; ++i)
630 
631 #ifdef USE_MPI
632  if (procState[i] == MPI_UNDEFINED)
633  {
634  prFlag[i] = false; //уже просмотрели
635  procStateVar[i] = MPI_UNDEFINED;
636  } //if (ProcState[i]==MPI_UNDEFINED)
637 #else
638  if (procState[i] == -32766)
639  {
640  prFlag[i] = false; //уже просмотрели
641  procStateVar[i] = -32766;
642  } //if (ProcState[i]==-32766)
643 #endif
644 
645  for (int i = 0; i<nProcAll; ++i)
646  if (prFlag[i])
647  {
648  prFlag[i] = false;
649  number++;
650 
651  for (int s = i; s<nProcAll; ++s)
652  if (procState[s] == procState[i])
653  {
654  procStateVar[s] = number;
655  prFlag[s] = false;
656  }//if (ProcState[s]==ProcState[i])
657  } //if (pr_flag[i])
658 }//ConstructProcStateVar()
659 
660 
661 // Запуск вычислительного конвейера (в рамках кванта времени)
663 {
664 #ifdef USE_MPI
665  if (parallel.commWork != MPI_COMM_NULL)
666 #else
667  if (parallel.commWork != 0x04000000)
668 #endif
669  {
670  double kvantStartWallTime; //Время на главном процессоре, когда начался очередной квант
671  double deltaWallTime;
672 
673  if (parallel.myidWork == 0)
674 #ifdef USE_MPI
675  kvantStartWallTime = MPI_Wtime();
676 #else
677  kvantStartWallTime = omp_get_wtime();
678 #endif
679 
681  //if (world->getCurrentStep() == 0)
682  // world->ZeroStep();
683 
684  do
685  {
686  if (!world->isFinished())
687  world->Step();
688 
689  if (parallel.myidWork == 0)
690 #ifdef USE_MPI
691  deltaWallTime = MPI_Wtime() - kvantStartWallTime;
692 #else
693  deltaWallTime = omp_get_wtime() - kvantStartWallTime;
694 #endif
695 
696 #ifdef USE_MPI
697  MPI_Bcast(&deltaWallTime, 1, MPI_DOUBLE, 0, parallel.commWork);
698 #endif
699  }
700  //Проверка окончания кванта по времени (или завершения задачи)
701  while (deltaWallTime < kvantTime);
702 
703  }//if (commWork != MPI_COMM_NULL)
704 }//RunConveyer()
705 
706 
707 //Добавление задачи в список
708 void Queue::AddTask(int _nProc, std::unique_ptr<PassportGen> _passport)
709 {
710  task.resize(task.size() + 1);
711  (task.end() - 1)->nProc = _nProc;
712  (task.end() - 1)->passport = std::move(_passport);
713  (task.end() - 1)->proc.resize(_nProc);
714  (task.end() - 1)->state = TaskState::waiting;
715 }//AddTask(...)
716 
717 
718 //Загрузка списка задач
719 void Queue::LoadTasksList(const std::string& _tasksFile, const std::string& _mechanicsFile, const std::string& _defaultsFile, const std::string& _switchersFile)
720 {
721  std::string extTasksFile = _tasksFile;
722  std::string extMechanicsFile = _mechanicsFile;
723  std::string extDefaultsFile = _defaultsFile;
724  std::string extSwitchersFile = _switchersFile;
725 
726  if (
727  fileExistTest(extTasksFile, info, {"txt", "TXT"}) &&
728  fileExistTest(extDefaultsFile, info, { "txt", "TXT" }) &&
729  fileExistTest(extSwitchersFile, info, { "txt", "TXT" })
730  )
731  {
732  std::stringstream tasksFile(Preprocessor(extTasksFile).resultString);
733  std::stringstream defaultsFile(Preprocessor(extDefaultsFile).resultString);
734  std::stringstream switchersFile(Preprocessor(extSwitchersFile).resultString);
735  std::vector<std::string> taskFolder;
736 
737  std::unique_ptr<StreamParser> parserTaskList;
738  parserTaskList.reset(new StreamParser(info, "parser", tasksFile, '(', ')'));
739 
740  std::vector<std::string> alltasks;
741  parserTaskList->get("problems", alltasks);
742 
743  std::ptrdiff_t nTasks = std::count_if(alltasks.begin(), alltasks.end(), [](const std::string& a) {return (a.length() > 0);});
744  info.endl();
745  info('i') << "Number of problems to be solved: " << nTasks << std::endl;
746 
747  for (size_t i = 0; i < alltasks.size(); ++i)
748  {
749  if (alltasks[i].length() > 0)
750  {
751  //делим имя задачи + выражение в скобках на 2 подстроки
752  std::pair<std::string, std::string> taskLine = StreamParser::SplitString(info, alltasks[i], false);
753 
754  std::string dir = taskLine.first;
755 
756  info.endl();
757  info('i') << "-------- Loading problem #" << i << " (" << dir << ") --------" << std::endl;
758 
759  //вторую подстроку разделяем на вектор из строк по запятым, стоящим вне фигурных скобок
760  std::vector<std::string> vecTaskLineSecond = StreamParser::StringToVector(taskLine.second, '{', '}');
761 
762  //создаем парсер и связываем его с параметрами профиля
763  std::stringstream tskStream(StreamParser::VectorStringToString(vecTaskLineSecond));
764  std::unique_ptr<StreamParser> parserTask;
765 
766  parserTask.reset(new StreamParser(info, "problem parameters", tskStream, defaultsFile, {"pspfile", "np", "copyPath"}));
767 
768  std::string pspFile;
769  int np;
770 
771  //считываем нужные параметры с учетом default-значений
772  parserTask->get("pspfile", pspFile, &defaults::defaultPspFile);
773  parserTask->get("np", np, &defaults::defaultNp);
774 
775  if (np > 1) // 31/12/2023
776  {
777  info.endl();
778  info('e') << "Internal MPI-parallelization is not supported longer!" << std::endl;
779  }
780 
781  std::string copyPath;
782  parserTask->get("copyPath", copyPath, &defaults::defaultCopyPath);
783  if (copyPath.length() > 0)
784  {
785  //Копировать паспорт поручаем только одному процессу
786  if (myidAll == 0)
787  {
788  std::string initDir(copyPath.begin(), copyPath.end());
789  std::string command;
790 
791 #if defined(_WIN32)
792  std::replace(dir.begin(), dir.end(), '/', '\\');
793  std::replace(initDir.begin(), initDir.end(), '/', '\\');
794 #endif
795 
796  VMlib::CreateDirectory(dir.c_str(), "");
797 
798 #if defined(_WIN32)
799  command = "copy \"" + initDir + "\\*.*\" "+ dir + "\\";
800 #else
801  command = "cp ./" + initDir + "/* " + dir + "/";
802 #endif
803 
804  std::cout << "Copying files from folder \"" << initDir << "\" to \"" << dir << "\"" << std::endl;
805 
806  int systemRet = system(command.c_str());
807  if(systemRet == -1)
808  {
809  // The system method failed
810  info('e') << "problem #" << i << " (" << dir << \
811  ") copying passport system method failed" << std::endl;
812  exit(-1);
813  }
814  std::cout << "Copying OK " << std::endl << std::endl;
815 
816  //copyFile(pspFile, "./" + dir + "/" + pspFile);
817  }
818  }
819 
820 #ifdef USE_MPI
821  MPI_Barrier(MPI_COMM_WORLD);
822 #endif
823 
824  std::unique_ptr<PassportGen> ptrPsp;
825 
826 #ifdef CODE2D
827  ptrPsp.reset(new VM2D::Passport(info, dir, i, pspFile, extMechanicsFile, extDefaultsFile, extSwitchersFile, vecTaskLineSecond));
828 #endif
829 
830 #ifdef CODE3D
831  ptrPsp.reset(new VM3D::Passport(info, dir, i, pspFile, extMechanicsFile, extDefaultsFile, extSwitchersFile, vecTaskLineSecond));
832 #endif
833 
834  AddTask(np, std::move(ptrPsp));
835 
836  info('i') << "-------- Problem #" << i << " (" << dir << ") is loaded --------" << std::endl << std::endl;
837  }
838  } //for i
839 
840  //tasksFile.close();
841  tasksFile.clear();
842 
843  //defaultsFile.close();
844  defaultsFile.clear();
845 
846  //switchersFile.close();
847  switchersFile.clear();
848 
849  }
850  else
851  {
852  exit(1);
853  }
854 
855 }//Queue::LoadTasks()
const std::string defaultPspFile
Имя файла с паспортом задачи
Definition: defs.h:201
Заголовочный файл с описанием класса Passport (двумерный) и cоответствующими структурами ...
LogStream info
Поток для вывода логов и сообщений об ошибках
Definition: Queue.h:177
Times & getTimestat() const
Возврат ссылки на временную статистику выполнения шага расчета по времени
Definition: World2D.h:242
static std::ostream * defaultWorld2DLogStream
Поток вывода логов и ошибок задачи
Definition: defs.h:213
int myProcState
Состояние данного процессора
Definition: Queue.h:193
const double kvantTime
Продолжительность кванта времени в секундах
Definition: Queue.h:207
~Queue()
Деструктор
Definition: Queue.cpp:117
const std::string defaultCopyPath
Путь к каталогу с задачей для копирования в новые каталоги
Definition: defs.h:207
int myidWork
Локальный номер процессора, решающего конкретную задачу
Definition: Parallel.h:94
void TaskSplit()
Процедура постановка новых задач на отсчет и занятие процессоров
Definition: Queue.cpp:127
Заголовочный файл с описанием класса Wake.
int groupSolving
Definition: Queue.h:156
Заголовочный файл с описанием класса World2D.
задача ожидает запуска
задача решена
Заголовочный файл с описанием класса Airfoil.
void assignStream(std::ostream *pStr_, const std::string &label_)
Связывание потока логов с потоком вывода
Definition: LogStream.h:77
Заголовочный файл с описанием класса WorldGen.
int commStarting
Definition: Queue.h:155
std::vector< int > procStateVar
Модифицированный список состояний процессоров
Definition: Queue.h:117
Заголовочный файл с описанием класса WakeDataBase.
int commSolving
Definition: Queue.h:157
void AddTask(int _nProc, std::unique_ptr< PassportGen > _passport)
Добавление задачи в список
Definition: Queue.cpp:708
Заголовочный файл с описанием класса Preprocessor.
Класс, позволяющий выполнять предварительную обработку файлов
Definition: Preprocessor.h:59
Заголовочный файл с описанием класса Mechanics.
int myidAll
Глобальный номер процессора
Definition: Queue.h:128
int commWork
Коммуникатор для решения конкретной задачи
Definition: Parallel.h:90
void RunConveyer()
Запуск вычислительного конвейера (в рамках кванта времени)
Definition: Queue.cpp:662
void LoadTasksList(const std::string &_tasksFile, const std::string &_mechanicsFile, const std::string &_defaultsFile, const std::string &_switchersFile)
Загрузка списка задач
Definition: Queue.cpp:719
Класс, опеделяющий паспорт двумерной задачи
Definition: Passport2D.h:258
задача решается
int myProcStateVar
Состояние данного процессора
Definition: Queue.h:200
задача стартует
void GenerateMechanicsHeader(size_t mechanicsNumber)
Definition: World2D.cpp:725
std::vector< int > procState
Список состояний процессоров
Definition: Queue.h:105
Queue(int &argc, char **&argv, void(*_CreateMpiTypes)())
Конструктор
Definition: Queue.cpp:75
static std::vector< std::string > StringToVector(std::string line, char openBracket= '(', char closeBracket= ')')
Pазбор строки, содержащей запятые, на отдельные строки
void ConstructProcStateVar()
Процедура, нумерующая задачи в возрастающем порядке
Definition: Queue.cpp:622
std::vector< int > flagFinish
Список возвращаемых флагов останова задачи
Definition: Queue.h:125
int nProcAll
Общее число процессоров
Definition: Queue.h:131
int groupStarting
Definition: Queue.h:154
Заголовочный файл с описанием класса StreamParser.
Заголовочный файл с описанием класса Queue.
std::vector< Task > task
Список описаний решаемых задач
Definition: Queue.h:183
int currentKvant
Номер текущего кванта времени
Definition: Queue.h:210
static std::ostream * defaultQueueLogStream
Поток вывода логов и ошибок очереди
Definition: defs.h:210
bool fileExistTest(std::string &fileName, LogStream &info, const std::list< std::string > &extList={})
Проверка существования файла
Definition: defs.h:324
static std::pair< std::string, std::string > SplitString(LogStream &info, std::string line, bool upcase=true)
Разбор строки на пару ключ-значение
задача финиширует
Заголовочный файл с описанием класса MeasureVP.
int groupAll
Definition: Queue.h:153
const Passport & getPassport() const
Возврат константной ссылки на паспорт
Definition: World2D.h:222
static std::string VectorStringToString(const std::vector< std::string > &_vecString)
Объединение вектора (списка) из строк в одну строку
void TaskUpdate()
Процедура обновления состояния задач и процессоров
Definition: Queue.cpp:492
virtual void GenerateStatHeader() const override
Генерация заголовка файла временной статистики
Definition: Times2D.cpp:55
std::unique_ptr< WorldGen > world
Умный указатель на текущую решаемую задачу
Definition: Queue.h:186
Класс, опеделяющий текущую решаемую задачу
Definition: World2D.h:68
std::vector< AirfoilParams > airfoilParams
Список структур с параметрами профилей
Definition: Passport2D.h:280
Заголовочный файл с описанием класса Velocity.
void endl()
Вывод в поток логов пустой строки
Definition: LogStream.h:100
void PrintUniversalLogoToStream(std::ostream &str)
Передача в поток вывода универсальной шапки программы VM2D/VM3D.
Definition: defs.cpp:89
void CreateDirectory(const std::string &dir, const std::string &name)
Создание каталога
Definition: defs.h:414
Класс, позволяющий выполнять разбор файлов и строк с настройками и параметрами
Definition: StreamParser.h:151
Parallel parallel
Класс, опеделяющий параметры исполнения задачи в параллельном MPI-режиме
Definition: Queue.h:225
bool get(const std::string &name, std::vector< Point2D > &res, const std::vector< Point2D > *defValue=nullptr, bool echoDefault=true) const
Считывание вектора из двумерных точек из базы данных
Definition: StreamParser.h:314
const int defaultNp
Необходимое число процессоров для решения задачи
Definition: defs.h:204
struct VMlib::Queue::@0 numberOfTask
Структура, содержащая информацию о количестве задач в данный момент времени
Заголовочный файл с описанием класса Boundary.
int nProcWork
Число процессоров, решающих конкретную задачу
Definition: Parallel.h:97
int sizeCommSolving
Число процессоров в группе для головных процессоров в решаемых в данном кванте времени задачах ...
Definition: Queue.h:163
int nextKvant
Признак необходимости выполнения следующего кванта и продолжения расчета
Definition: Queue.h:217