VM2D  1.12
Vortex methods for 2D flows simulation
VMlib::Queue Class Reference

Класс, опеделяющий список решаемых задач и очередь их прохождения More...

#include <Queue.h>

Collaboration diagram for VMlib::Queue:

Public Member Functions

 Queue (int &argc, char **&argv, void(*_CreateMpiTypes)())
 Конструктор More...
 
 ~Queue ()
 Деструктор More...
 
void TaskSplit ()
 Процедура постановка новых задач на отсчет и занятие процессоров More...
 
void TaskUpdate ()
 Процедура обновления состояния задач и процессоров More...
 
void RunConveyer ()
 Запуск вычислительного конвейера (в рамках кванта времени) More...
 
void LoadTasksList (const std::string &_tasksFile, const std::string &_mechanicsFile, const std::string &_defaultsFile, const std::string &_switchersFile)
 Загрузка списка задач More...
 

Public Attributes

std::vector< Tasktask
 Список описаний решаемых задач More...
 
std::unique_ptr< WorldGenworld
 Умный указатель на текущую решаемую задачу More...
 
int myProcState
 Состояние данного процессора More...
 
int myProcStateVar
 Состояние данного процессора More...
 
const double kvantTime = 10.0
 Продолжительность кванта времени в секундах More...
 
int currentKvant
 Номер текущего кванта времени More...
 
int nextKvant
 Признак необходимости выполнения следующего кванта и продолжения расчета More...
 
Parallel parallel
 Класс, опеделяющий параметры исполнения задачи в параллельном MPI-режиме More...
 
int prepared
 Число подготовленных к запуску задач More...
 
int solving
 Число решаемых в данный момент задач More...
 
int finished
 Число уже решенных задач More...
 

Private Member Functions

void ConstructProcStateVar ()
 Процедура, нумерующая задачи в возрастающем порядке More...
 
void AddTask (int _nProc, std::unique_ptr< PassportGen > _passport)
 Добавление задачи в список More...
 

Private Attributes

struct {
   int   prepared
 Число подготовленных к запуску задач More...
 
   int   solving
 Число решаемых в данный момент задач More...
 
   int   finished
 Число уже решенных задач More...
 
numberOfTask
 Структура, содержащая информацию о количестве задач в данный момент времени More...
 
std::vector< int > procState
 Список состояний процессоров More...
 
std::vector< int > procStateVar
 Модифицированный список состояний процессоров More...
 
std::vector< int > flagFinish
 Список возвращаемых флагов останова задачи More...
 
int myidAll
 Глобальный номер процессора More...
 
int nProcAll
 Общее число процессоров More...
 
int groupAll
 
int groupStarting
 
int commStarting
 
int groupSolving
 
int commSolving
 
int sizeCommSolving
 Число процессоров в группе для головных процессоров в решаемых в данном кванте времени задачах More...
 
LogStream info
 Поток для вывода логов и сообщений об ошибках More...
 

Detailed Description

Класс, опеделяющий список решаемых задач и очередь их прохождения

Управляет распределением задач по процессорам, инициализацией их запуска и выгрузки

Author
Марчевский Илья Константинович 1.12
Date
14 января 2024 г.

Definition at line 72 of file Queue.h.

Constructor & Destructor Documentation

Queue::Queue ( int &  argc,
char **&  argv,
void(*)()  _CreateMpiTypes 
)

Конструктор

Производит инициализацию очереди перед запуском всего процесса

Parameters
[in]argcссылка на число параметров командной строки
[in]argvссылка на указатель на список параметров командной строки
[in]_CreateMpiTypesуказатель на функцию, инициализирующую все необходимые MPI-описания типов

Definition at line 75 of file Queue.cpp.

76 {
77 #ifdef USE_MPI
78  MPI_Init(&argc, &argv);
79  MPI_Comm_size(MPI_COMM_WORLD, &nProcAll);
80  MPI_Comm_rank(MPI_COMM_WORLD, &myidAll);
81  MPI_Comm_group(MPI_COMM_WORLD, &groupAll);
82  _CreateMpiTypes();
83 #else
84  nProcAll = 1;
85  myidAll = 0;
86  groupAll = -1;
87 #endif
88 
89  //готовим систему к запуску
90  //(выполняется на глобальном нулевом процессоре)
91  if (myidAll == 0)
92  {
94 
96 
97  //Устанавливаем флаг занятости в состояние "свободен" всем процессорам,
98 #ifdef USE_MPI
99  procState.resize(nProcAll, MPI_UNDEFINED);
100  procStateVar.resize(nProcAll, MPI_UNDEFINED);
101 #else
102  procState.resize(nProcAll, -32766);
103  procStateVar.resize(nProcAll, -32766);
104 #endif
105 
106  numberOfTask.solving = 0; //число решаемых в данный момент задач
107  numberOfTask.prepared = 0; //число подготовленных к запуску задач (в том числе)
108  numberOfTask.finished = 0; //число уже отсчитанных задач
109 
110  //текущий номер кванта времени
111  currentKvant = -1;
112  }//if myid==0
113 }//Queue()
LogStream info
Поток для вывода логов и сообщений об ошибках
Definition: Queue.h:177
static std::ostream * defaultWorld2DLogStream
Поток вывода логов и ошибок задачи
Definition: defs.h:213
void assignStream(std::ostream *pStr_, const std::string &label_)
Связывание потока логов с потоком вывода
Definition: LogStream.h:77
std::vector< int > procStateVar
Модифицированный список состояний процессоров
Definition: Queue.h:117
int myidAll
Глобальный номер процессора
Definition: Queue.h:128
std::vector< int > procState
Список состояний процессоров
Definition: Queue.h:105
int nProcAll
Общее число процессоров
Definition: Queue.h:131
int currentKvant
Номер текущего кванта времени
Definition: Queue.h:210
static std::ostream * defaultQueueLogStream
Поток вывода логов и ошибок очереди
Definition: defs.h:210
int groupAll
Definition: Queue.h:153
void PrintUniversalLogoToStream(std::ostream &str)
Передача в поток вывода универсальной шапки программы VM2D/VM3D.
Definition: defs.cpp:89
struct VMlib::Queue::@0 numberOfTask
Структура, содержащая информацию о количестве задач в данный момент времени

Here is the call graph for this function:

Queue::~Queue ( )

Деструктор

Definition at line 117 of file Queue.cpp.

118 {
119  info('i') << "Goodbye!" << std::endl;
120 #ifdef USE_MPI
121  MPI_Finalize();
122 #endif
123 }//~Queue()
LogStream info
Поток для вывода логов и сообщений об ошибках
Definition: Queue.h:177

Member Function Documentation

void Queue::AddTask ( int  _nProc,
std::unique_ptr< PassportGen _passport 
)
private

Добавление задачи в список

Parameters
[in]_nProcчисло запрашиваемых процессоров
[in]_passportумный указатель на паспорт задачи, направляемой в очередь

Definition at line 708 of file Queue.cpp.

709 {
710  task.resize(task.size() + 1);
711  (task.end() - 1)->nProc = _nProc;
712  (task.end() - 1)->passport = std::move(_passport);
713  (task.end() - 1)->proc.resize(_nProc);
714  (task.end() - 1)->state = TaskState::waiting;
715 }//AddTask(...)
задача ожидает запуска
std::vector< Task > task
Список описаний решаемых задач
Definition: Queue.h:183

Here is the caller graph for this function:

void Queue::ConstructProcStateVar ( )
private

Процедура, нумерующая задачи в возрастающем порядке

Необходима для корректного распределения задач по процессорам

Definition at line 622 of file Queue.cpp.

623 {
624  std::vector<bool> prFlag;
625  int number = -1;
626 
627  prFlag.resize(nProcAll, true); //их надо просматривать
628 
629  for (int i = 0; i<nProcAll; ++i)
630 
631 #ifdef USE_MPI
632  if (procState[i] == MPI_UNDEFINED)
633  {
634  prFlag[i] = false; //уже просмотрели
635  procStateVar[i] = MPI_UNDEFINED;
636  } //if (ProcState[i]==MPI_UNDEFINED)
637 #else
638  if (procState[i] == -32766)
639  {
640  prFlag[i] = false; //уже просмотрели
641  procStateVar[i] = -32766;
642  } //if (ProcState[i]==-32766)
643 #endif
644 
645  for (int i = 0; i<nProcAll; ++i)
646  if (prFlag[i])
647  {
648  prFlag[i] = false;
649  number++;
650 
651  for (int s = i; s<nProcAll; ++s)
652  if (procState[s] == procState[i])
653  {
654  procStateVar[s] = number;
655  prFlag[s] = false;
656  }//if (ProcState[s]==ProcState[i])
657  } //if (pr_flag[i])
658 }//ConstructProcStateVar()
std::vector< int > procStateVar
Модифицированный список состояний процессоров
Definition: Queue.h:117
std::vector< int > procState
Список состояний процессоров
Definition: Queue.h:105
int nProcAll
Общее число процессоров
Definition: Queue.h:131

Here is the caller graph for this function:

void Queue::LoadTasksList ( const std::string &  _tasksFile,
const std::string &  _mechanicsFile,
const std::string &  _defaultsFile,
const std::string &  _switchersFile 
)

Загрузка списка задач

Parameters
[in]_tasksFileконстантная ссылка на имя файла с описанием очереди задач
[in]_mechanicsFileконстантная ссылка на имя файла со словарем механических систем
[in]_defaultsFileконстантная ссылка на имя файла с описанием параметров по умолчанию
[in]_switchersFileконстантная ссылка на имя файла со значениями параметров-переключателей

Definition at line 719 of file Queue.cpp.

720 {
721  std::string extTasksFile = _tasksFile;
722  std::string extMechanicsFile = _mechanicsFile;
723  std::string extDefaultsFile = _defaultsFile;
724  std::string extSwitchersFile = _switchersFile;
725 
726  if (
727  fileExistTest(extTasksFile, info, {"txt", "TXT"}) &&
728  fileExistTest(extDefaultsFile, info, { "txt", "TXT" }) &&
729  fileExistTest(extSwitchersFile, info, { "txt", "TXT" })
730  )
731  {
732  std::stringstream tasksFile(Preprocessor(extTasksFile).resultString);
733  std::stringstream defaultsFile(Preprocessor(extDefaultsFile).resultString);
734  std::stringstream switchersFile(Preprocessor(extSwitchersFile).resultString);
735  std::vector<std::string> taskFolder;
736 
737  std::unique_ptr<StreamParser> parserTaskList;
738  parserTaskList.reset(new StreamParser(info, "parser", tasksFile, '(', ')'));
739 
740  std::vector<std::string> alltasks;
741  parserTaskList->get("problems", alltasks);
742 
743  std::ptrdiff_t nTasks = std::count_if(alltasks.begin(), alltasks.end(), [](const std::string& a) {return (a.length() > 0);});
744  info.endl();
745  info('i') << "Number of problems to be solved: " << nTasks << std::endl;
746 
747  for (size_t i = 0; i < alltasks.size(); ++i)
748  {
749  if (alltasks[i].length() > 0)
750  {
751  //делим имя задачи + выражение в скобках на 2 подстроки
752  std::pair<std::string, std::string> taskLine = StreamParser::SplitString(info, alltasks[i], false);
753 
754  std::string dir = taskLine.first;
755 
756  info.endl();
757  info('i') << "-------- Loading problem #" << i << " (" << dir << ") --------" << std::endl;
758 
759  //вторую подстроку разделяем на вектор из строк по запятым, стоящим вне фигурных скобок
760  std::vector<std::string> vecTaskLineSecond = StreamParser::StringToVector(taskLine.second, '{', '}');
761 
762  //создаем парсер и связываем его с параметрами профиля
763  std::stringstream tskStream(StreamParser::VectorStringToString(vecTaskLineSecond));
764  std::unique_ptr<StreamParser> parserTask;
765 
766  parserTask.reset(new StreamParser(info, "problem parameters", tskStream, defaultsFile, {"pspfile", "np", "copyPath"}));
767 
768  std::string pspFile;
769  int np;
770 
771  //считываем нужные параметры с учетом default-значений
772  parserTask->get("pspfile", pspFile, &defaults::defaultPspFile);
773  parserTask->get("np", np, &defaults::defaultNp);
774 
775  if (np > 1) // 31/12/2023
776  {
777  info.endl();
778  info('e') << "Internal MPI-parallelization is not supported longer!" << std::endl;
779  }
780 
781  std::string copyPath;
782  parserTask->get("copyPath", copyPath, &defaults::defaultCopyPath);
783  if (copyPath.length() > 0)
784  {
785  //Копировать паспорт поручаем только одному процессу
786  if (myidAll == 0)
787  {
788  std::string initDir(copyPath.begin(), copyPath.end());
789  std::string command;
790 
791 #if defined(_WIN32)
792  std::replace(dir.begin(), dir.end(), '/', '\\');
793  std::replace(initDir.begin(), initDir.end(), '/', '\\');
794 #endif
795 
796  VMlib::CreateDirectory(dir.c_str(), "");
797 
798 #if defined(_WIN32)
799  command = "copy \"" + initDir + "\\*.*\" "+ dir + "\\";
800 #else
801  command = "cp ./" + initDir + "/* " + dir + "/";
802 #endif
803 
804  std::cout << "Copying files from folder \"" << initDir << "\" to \"" << dir << "\"" << std::endl;
805 
806  int systemRet = system(command.c_str());
807  if(systemRet == -1)
808  {
809  // The system method failed
810  info('e') << "problem #" << i << " (" << dir << \
811  ") copying passport system method failed" << std::endl;
812  exit(-1);
813  }
814  std::cout << "Copying OK " << std::endl << std::endl;
815 
816  //copyFile(pspFile, "./" + dir + "/" + pspFile);
817  }
818  }
819 
820 #ifdef USE_MPI
821  MPI_Barrier(MPI_COMM_WORLD);
822 #endif
823 
824  std::unique_ptr<PassportGen> ptrPsp;
825 
826 #ifdef CODE2D
827  ptrPsp.reset(new VM2D::Passport(info, dir, i, pspFile, extMechanicsFile, extDefaultsFile, extSwitchersFile, vecTaskLineSecond));
828 #endif
829 
830 #ifdef CODE3D
831  ptrPsp.reset(new VM3D::Passport(info, dir, i, pspFile, extMechanicsFile, extDefaultsFile, extSwitchersFile, vecTaskLineSecond));
832 #endif
833 
834  AddTask(np, std::move(ptrPsp));
835 
836  info('i') << "-------- Problem #" << i << " (" << dir << ") is loaded --------" << std::endl << std::endl;
837  }
838  } //for i
839 
840  //tasksFile.close();
841  tasksFile.clear();
842 
843  //defaultsFile.close();
844  defaultsFile.clear();
845 
846  //switchersFile.close();
847  switchersFile.clear();
848 
849  }
850  else
851  {
852  exit(1);
853  }
854 
855 }
const std::string defaultPspFile
Имя файла с паспортом задачи
Definition: defs.h:201
LogStream info
Поток для вывода логов и сообщений об ошибках
Definition: Queue.h:177
const std::string defaultCopyPath
Путь к каталогу с задачей для копирования в новые каталоги
Definition: defs.h:207
void AddTask(int _nProc, std::unique_ptr< PassportGen > _passport)
Добавление задачи в список
Definition: Queue.cpp:708
Класс, позволяющий выполнять предварительную обработку файлов
Definition: Preprocessor.h:59
int myidAll
Глобальный номер процессора
Definition: Queue.h:128
Класс, опеделяющий паспорт двумерной задачи
Definition: Passport2D.h:258
static std::vector< std::string > StringToVector(std::string line, char openBracket= '(', char closeBracket= ')')
Pазбор строки, содержащей запятые, на отдельные строки
bool fileExistTest(std::string &fileName, LogStream &info, const std::list< std::string > &extList={})
Проверка существования файла
Definition: defs.h:324
static std::pair< std::string, std::string > SplitString(LogStream &info, std::string line, bool upcase=true)
Разбор строки на пару ключ-значение
static std::string VectorStringToString(const std::vector< std::string > &_vecString)
Объединение вектора (списка) из строк в одну строку
void endl()
Вывод в поток логов пустой строки
Definition: LogStream.h:100
void CreateDirectory(const std::string &dir, const std::string &name)
Создание каталога
Definition: defs.h:414
Класс, позволяющий выполнять разбор файлов и строк с настройками и параметрами
Definition: StreamParser.h:151
bool get(const std::string &name, std::vector< Point2D > &res, const std::vector< Point2D > *defValue=nullptr, bool echoDefault=true) const
Считывание вектора из двумерных точек из базы данных
Definition: StreamParser.h:314
const int defaultNp
Необходимое число процессоров для решения задачи
Definition: defs.h:204

Here is the call graph for this function:

Here is the caller graph for this function:

void Queue::RunConveyer ( )

Запуск вычислительного конвейера (в рамках кванта времени)

Definition at line 662 of file Queue.cpp.

663 {
664 #ifdef USE_MPI
665  if (parallel.commWork != MPI_COMM_NULL)
666 #else
667  if (parallel.commWork != 0x04000000)
668 #endif
669  {
670  double kvantStartWallTime; //Время на главном процессоре, когда начался очередной квант
671  double deltaWallTime;
672 
673  if (parallel.myidWork == 0)
674 #ifdef USE_MPI
675  kvantStartWallTime = MPI_Wtime();
676 #else
677  kvantStartWallTime = omp_get_wtime();
678 #endif
679 
681  //if (world->getCurrentStep() == 0)
682  // world->ZeroStep();
683 
684  do
685  {
686  if (!world->isFinished())
687  world->Step();
688 
689  if (parallel.myidWork == 0)
690 #ifdef USE_MPI
691  deltaWallTime = MPI_Wtime() - kvantStartWallTime;
692 #else
693  deltaWallTime = omp_get_wtime() - kvantStartWallTime;
694 #endif
695 
696 #ifdef USE_MPI
697  MPI_Bcast(&deltaWallTime, 1, MPI_DOUBLE, 0, parallel.commWork);
698 #endif
699  }
700  //Проверка окончания кванта по времени (или завершения задачи)
701  while (deltaWallTime < kvantTime);
702 
703  }//if (commWork != MPI_COMM_NULL)
704 }//RunConveyer()
const double kvantTime
Продолжительность кванта времени в секундах
Definition: Queue.h:207
int myidWork
Локальный номер процессора, решающего конкретную задачу
Definition: Parallel.h:94
int commWork
Коммуникатор для решения конкретной задачи
Definition: Parallel.h:90
std::unique_ptr< WorldGen > world
Умный указатель на текущую решаемую задачу
Definition: Queue.h:186
Parallel parallel
Класс, опеделяющий параметры исполнения задачи в параллельном MPI-режиме
Definition: Queue.h:225

Here is the caller graph for this function:

void Queue::TaskSplit ( )

Процедура постановка новых задач на отсчет и занятие процессоров

Выполняется в начале очередного кванта

Definition at line 127 of file Queue.cpp.

128 {
129  //Пересылаем информацию о состоянии процессоров на все компьютеры
130 
131 #ifdef USE_MPI
132  MPI_Scatter(procState.data(), 1, MPI_INT, &myProcState, 1, MPI_INT, 0, MPI_COMM_WORLD);
133 #else
134  myProcState = procState[0];
135 #endif
136 
137 #ifdef USE_MPI
138  if (myProcState == MPI_UNDEFINED)
139  world.reset(nullptr);
140 #else
141  if (myProcState == -32766)
142  world.reset(nullptr);
143 #endif
144 
145  if (myidAll == 0)
146  {
147  currentKvant++;
148 
149  int nfree = 0; //число свободных процессоров
150 
151  //Обнуляем число готовых к старту задач
152  numberOfTask.prepared = 0;
153 
154  //считаем число свободных процессоров
155  for (int i = 0; i < nProcAll; ++i)
156 #ifdef USE_MPI
157  if (procState[i] == MPI_UNDEFINED)
158 #else
159  if (procState[i] == -32766)
160 #endif
161  nfree++;
162 
163  //формируем номер следующей задачи, которая возможно будет поставлена на отсчет:
164  size_t taskFol = numberOfTask.finished + numberOfTask.solving;
165 
166  //проверяем, достаточно ли свободных процессоров для запуска еще одной задачи
167  //при условии, что не все задачи уже решены
168  while ((taskFol < task.size()) && (nfree >= task[taskFol].nProc))
169  {
170  int p = 0; //число найденных свободных процессоров под задачу
171  int j = 0; //текущий счетчик процессоров
172 
173  //подбираем свободные процессоры
174  do
175  {
176 #ifdef USE_MPI
177  if (procState[j] == MPI_UNDEFINED)
178 #else
179  if (procState[j] == -32766)
180 #endif
181  {
182  //состояние процессора устанавливаем в "занят текущей задачей"
183  procState[j] = static_cast<int>(taskFol);
184 
185  //номер процессора посылаем в перечень процессоров, решающих текущую задачу
186  task[taskFol].proc[p] = j;
187 
188  //увеличиваем число найденных процессоров на единицу
189  p++;
190  }//if (ProcState[j]==MPI_UNDEFINED)
191 
192  //переходим к следующему процессору
193  j++;
194  } while (p < task[taskFol].nProc);
195 
196  //состояние задачи устанавливаем в режим "стартует"
197  task[taskFol].state = TaskState::starting;
198 
199  //отмечаем номер кванта, когда задача начала считаться
200  task[taskFol].startEndKvant.first = currentKvant;
201 
202  //изменяем счетчики числа задач
203  numberOfTask.prepared++;
204  numberOfTask.solving++;
205 
206  //изменяем счетчик свободных процессоров
207  nfree -= task[taskFol].nProc;
208 
209  //переходим к следующей задаче
210  taskFol++;
211 
212  }//while ((nfree >= Task[task_fol].nproc)&&(task_fol<Task.size()))
213 
214  }//if (myidAll == 0)
215 
216  //Вывод информации о занятости процессоров задачами
217  if (myidAll == 0)
218  {
219  info.endl();
220  info('i') << "ProcStates: " << std::endl;
221  for (int i = 0; i < nProcAll; ++i)
222  info('-') << "proc[" << i << "] <=> problem[" << procState[i] << "]" << std::endl;
223  info.endl();
224  }
225 
226  //Пересылаем информацию о состоянии процессоров на все компьютеры
227 #ifdef USE_MPI
228  MPI_Scatter(procState.data(), 1, MPI_INT, &myProcState, 1, MPI_INT, 0, MPI_COMM_WORLD);
229 #else
230  myProcState = procState[0];
231 #endif
232 
233  //Синхронизируем информацию
234 #ifdef USE_MPI
235  MPI_Bcast(&numberOfTask.solving, 1, MPI_INT, 0, MPI_COMM_WORLD);
236  MPI_Bcast(&numberOfTask.prepared, 1, MPI_INT, 0, MPI_COMM_WORLD);
237  MPI_Bcast(&numberOfTask.finished, 1, MPI_INT, 0, MPI_COMM_WORLD);
238 #endif
239 
240  //список головных процессоров на тех задачах,
241  //которые только что поставлены на счет:
242  std::vector<int> prepList;
243 
244  //формируем списки prepList и pspList
245  //(выполняется на глобальном нулевом процессоре)
246  if (myidAll == 0)
247  {
248  for (size_t i = 0; i<task.size(); ++i)
249  //находим задачи в состоянии "стартует"
250  if (task[i].state == TaskState::starting)
251  {
252  //запоминаем номер того процессора, что является там головным
253  prepList.push_back(task[i].proc[0]);
254 
255  //состояние задачи вереводим в "считает"
256  task[i].state = TaskState::running;
257  }
258  }//if myidAll==0
259 
260 
261  //Рассылаем количество стартующих задач в данном кванте на все машины
262 #ifdef USE_MPI
263  MPI_Bcast(&numberOfTask.prepared, 1, MPI_INT, 0, MPI_COMM_WORLD);
264 #endif
265  if (myidAll > 0)
266  {
267  prepList.resize(numberOfTask.prepared);
268  }//if myid==0
269 
270  //следующий фрагмент выполняется только если в данном кванте стартуют новые задачи
271  if (numberOfTask.prepared > 0)
272  {
273  //пересылаем список головных машин стартующих задач на все процессоры
274 #ifdef USE_MPI
275  MPI_Bcast(prepList.data(), numberOfTask.prepared, MPI_INT, 0, MPI_COMM_WORLD);
276 #endif
277 
278  //формируем группу и коммуникатор стартующих головных процессоров
279 #ifdef USE_MPI
280  commStarting = MPI_COMM_NULL;
281  MPI_Group_incl(groupAll, numberOfTask.prepared, prepList.data(), &groupStarting);
282  MPI_Comm_create(MPI_COMM_WORLD, groupStarting, &commStarting);
283 #else
284  commStarting = 0x04000000;
285  if (numberOfTask.prepared > 0)
286  commStarting = 0;
287 #endif
288 
289  //следующий фрагмент кода только для стартующих процессов,
290  //которые входят в коммуникатор comm_starting
291 #ifdef USE_MPI
292  if (commStarting != MPI_COMM_NULL)
293 #else
294  if (commStarting != 0x04000000)
295 #endif
296  {
297  //получаем номер процесса в списке стартующих
298  int myidStarting;
299 
300 #ifdef USE_MPI
301  MPI_Comm_rank(commStarting, &myidStarting);
302 #else
303  myidStarting = 0;
304 #endif
305 
306  //подготовка стартующих задач
307  parallel.myidWork = 0;
308 
309 #ifdef CODE2D
310  world.reset(new VM2D::World2D(task[myProcState].getPassport()));
311 #endif
312 
313 #ifdef CODE3D
314  world.reset(new VM3D::World3D(task[myProcState].getPassport()));
315 #endif
316 
317  //Коммуникатор головных процессоров стартующих задач
318  //выполнил свое черное дело и будет удален
319 #ifdef USE_MPI
320  MPI_Comm_free(&commStarting);
321 #else
322  commStarting = 0x04000000;
323 #endif
324  } //if(comm_starting != MPI_COMM_NULL)
325 
326  //их группа тоже больше без надобности
327 #ifdef USE_MPI
328  MPI_Group_free(&groupStarting);
329 #endif
330  }//if (task_prepared>0)
331 
332  //формируем группу и коммуникатор считающих головных процессоров
333  //в том числе тех, которые стартуют, и тех, которые ранее входили в commStarting
334  std::vector<int> solvList; //список номеров головных процессов
335 
336  if (myidAll == 0)
337  {
338  //Если нулевой процессор свободен
339  //(не является головным в решении задачи в данном кванте) -
340  //- формально присоединяем его в группу головных
341  //необходимо для корректного обмена данными
342 #ifdef USE_MPI
343  if (procState[0] == MPI_UNDEFINED)
344 #else
345  if (procState[0] == -32766)
346 #endif
347  solvList.push_back(0);
348 
349  //далее ищем считающие задачи (в том числе только что стартующие)
350  //и собираем номера их головных процессоров
351  //(собираем в порядке просмотра номеров процессоров))
352  for (int s = 0; s<nProcAll; ++s)
353 #ifdef USE_MPI
354  if ((procState[s] != MPI_UNDEFINED) && (task[procState[s]].state == TaskState::running) && (task[procState[s]].proc[0] == s))
355 #else
356  if ((procState[s] != -32766) && (task[procState[s]].state == TaskState::running) && (task[procState[s]].proc[0] == s))
357 #endif
358  solvList.push_back(s);
359 
360  sizeCommSolving = static_cast<int>(solvList.size());
361 
362  flagFinish.clear();
363  flagFinish.resize(solvList.size());
364 
365  }//if myidAll==0
366 
367  //число sizeCommSolving на единицу больше, чем taskSolving, если нулевой процессор свободен
368  //и равно taskSolving, если он занят решением задачи
369  //(если он занят решением задачи, то непременно является головным)
370 
371  //пересылаем число головных процессоров и их список на все компьютеры
372  //(в том числе головной процессор - независимо от его состояния)
373 #ifdef USE_MPI
374  MPI_Bcast(&sizeCommSolving, 1, MPI_INT, 0, MPI_COMM_WORLD);
375 #endif
376 
377  if (myidAll > 0)
378  solvList.resize(sizeCommSolving);
379 
380 #ifdef USE_MPI
381  MPI_Bcast(solvList.data(), sizeCommSolving, MPI_INT, 0, MPI_COMM_WORLD);
382 #endif
383 
384 
385 #ifdef USE_MPI
386  commSolving = MPI_COMM_NULL;
387 
388  //формируем группу и коммуникатор головных процессоров
389  MPI_Group_incl(groupAll, sizeCommSolving, solvList.data(), &groupSolving);
390  MPI_Comm_create(MPI_COMM_WORLD, groupSolving, &commSolving);
391 #else
392  commSolving = 0x04000000;
393  if (sizeCommSolving > 0)
394  commSolving = 1;
395 #endif
396 
397  if (myidAll == 0)
398  {
399  //составление неубывающего массива номеров для верного
400  //распределения задач по процессоров, с тем чтобы задачи
401  //с прошлого кванта оказались в тех же группах, что и раньше
403  }
404 
405  //Пересылаем информацию о состоянии процессоров на все компьютеры
406 #ifdef USE_MPI
407  MPI_Scatter(procStateVar.data(), 1, MPI_INT, &myProcStateVar, 1, MPI_INT, 0, MPI_COMM_WORLD);
408 #else
409  myProcStateVar = procStateVar[0];
410 #endif
411 
412  //Расщепляем весь коммуникатор MPI_COMM_WORLD (т.е. вообще все процессоры)
413  //на коммуникаторы решаемых задач
414  //В результате все процессоры, решающие конкретную задачу,
415  //объединяются в коммуникатор comm_work
416  //Несмотря на то, что он называется везде одинаково, у каждой задачи он свой
417  //и объединяет ровно те процессоры, которые надо
418  //Процессоры, состояние которых установлено в "свободен", т.е.
419  //ProcState=MPI_UNDEFINED (=-32766) ни в один
420  //новый коммуникатор commWork не попадут
421 #ifdef USE_MPI
422  MPI_Comm_split(MPI_COMM_WORLD, myProcStateVar, 0, &parallel.commWork);
423 #else
424  if (myProcStateVar != -32766)
425  parallel.commWork = 0;
426  else
427  parallel.commWork = 0x04000000;
428 #endif
429 
430  //следующий код выполняется процессорами, участвующими в решении задач,
431  //следовательно, входящими в коммуникаторы comm_work
432 #ifdef USE_MPI
433  if (parallel.commWork != MPI_COMM_NULL)
434 #else
435  if (parallel.commWork != 0x04000000)
436 #endif
437  {
438  //опеределяем "локальный" номер данного процессора в коммуникаторе commWork и число процессов в нем
439 #ifdef USE_MPI
440  MPI_Comm_size(parallel.commWork, &parallel.nProcWork);
441  MPI_Comm_rank(parallel.commWork, &parallel.myidWork);
442 #else
443  parallel.nProcWork = 1;
444  parallel.myidWork = 0;
445 #endif
446  //World равен nullptr только в случае, когда выполняется первый шаг
447  //в этом случае он создан только на главном процессоре группы;
448  //на остальных происходит его создание
449 #ifdef CODE2D
450  if (world == nullptr)
451  world.reset(new VM2D::World2D(task[myProcState].getPassport()));
452 #endif
453 
454 #ifdef CODE3D
455  if (world == nullptr)
456  world.reset(new VM3D::World3D(task[myProcState].getPassport()));
457 #endif
458 
459 
460  //Синхронизация параметров
461 #ifdef USE_MPI
462  MPI_Bcast(&(world->getPassportGen().timeDiscretizationProperties.currTime), 1, MPI_DOUBLE, 0, parallel.commWork);
463 #endif
464 
465  if ((parallel.myidWork == 0) && (world->getCurrentStep() == 0))
466  {
467 #ifdef CODE2D
468  VM2D::World2D& world2D = dynamic_cast<VM2D::World2D&>(*world);
469  //Создание файлов для записи сил
470  for (size_t q = 0; q < world2D.getPassport().airfoilParams.size(); ++q)
471  world2D.GenerateMechanicsHeader(q);
472  //Создание файла для записи временной статистики
473  world2D.getTimestat().GenerateStatHeader();
474 #endif
475 
476 #ifdef CODE3D
477  VM3D::World3D& world3D = dynamic_cast<VM3D::World3D&>(*world);
478  //Создание файлов для записи сил
479  //for (size_t q = 0; q < world3D.getPassport().airfoilParams.size(); ++q)
480  // world3D.GenerateMechanicsHeader(q);
481 
482  //Создание файла для записи временной статистики
483  world3D.getTimestat().GenerateStatHeader();
484 #endif
485  }
486  }
487 
488 }//TaskSplit()
LogStream info
Поток для вывода логов и сообщений об ошибках
Definition: Queue.h:177
Times & getTimestat() const
Возврат ссылки на временную статистику выполнения шага расчета по времени
Definition: World2D.h:242
int myProcState
Состояние данного процессора
Definition: Queue.h:193
int myidWork
Локальный номер процессора, решающего конкретную задачу
Definition: Parallel.h:94
int groupSolving
Definition: Queue.h:156
int commStarting
Definition: Queue.h:155
std::vector< int > procStateVar
Модифицированный список состояний процессоров
Definition: Queue.h:117
int commSolving
Definition: Queue.h:157
int myidAll
Глобальный номер процессора
Definition: Queue.h:128
int commWork
Коммуникатор для решения конкретной задачи
Definition: Parallel.h:90
задача решается
int myProcStateVar
Состояние данного процессора
Definition: Queue.h:200
задача стартует
void GenerateMechanicsHeader(size_t mechanicsNumber)
Definition: World2D.cpp:725
std::vector< int > procState
Список состояний процессоров
Definition: Queue.h:105
void ConstructProcStateVar()
Процедура, нумерующая задачи в возрастающем порядке
Definition: Queue.cpp:622
std::vector< int > flagFinish
Список возвращаемых флагов останова задачи
Definition: Queue.h:125
int nProcAll
Общее число процессоров
Definition: Queue.h:131
int groupStarting
Definition: Queue.h:154
std::vector< Task > task
Список описаний решаемых задач
Definition: Queue.h:183
int currentKvant
Номер текущего кванта времени
Definition: Queue.h:210
int groupAll
Definition: Queue.h:153
const Passport & getPassport() const
Возврат константной ссылки на паспорт
Definition: World2D.h:222
virtual void GenerateStatHeader() const override
Генерация заголовка файла временной статистики
Definition: Times2D.cpp:55
std::unique_ptr< WorldGen > world
Умный указатель на текущую решаемую задачу
Definition: Queue.h:186
Класс, опеделяющий текущую решаемую задачу
Definition: World2D.h:68
std::vector< AirfoilParams > airfoilParams
Список структур с параметрами профилей
Definition: Passport2D.h:280
void endl()
Вывод в поток логов пустой строки
Definition: LogStream.h:100
Parallel parallel
Класс, опеделяющий параметры исполнения задачи в параллельном MPI-режиме
Definition: Queue.h:225
struct VMlib::Queue::@0 numberOfTask
Структура, содержащая информацию о количестве задач в данный момент времени
int nProcWork
Число процессоров, решающих конкретную задачу
Definition: Parallel.h:97
int sizeCommSolving
Число процессоров в группе для головных процессоров в решаемых в данном кванте времени задачах ...
Definition: Queue.h:163

Here is the call graph for this function:

Here is the caller graph for this function:

void Queue::TaskUpdate ( )

Процедура обновления состояния задач и процессоров

Выполняется в конце очередного кванта

Definition at line 492 of file Queue.cpp.

493 {
494  info('i') << "------ Kvant finished ------" << std::endl;
495 
496  //Код только для головных процессов, которые входят в коммуникатор commSolving
497  //т.е. выполняется только на головных процессорах, решающих задачи.
498  //Кажется, что можно было бы с тем же эфектом написать здесь if (myidWork==0),
499  //но это не так, поскольку нужно включить в себя еще нулевой процессор,
500  //который в любом случае присоединен к коммуникатору comm_solving,
501  //даже если он не решает задачу (а если решает - то он всегда головной)
502 #ifdef USE_MPI
503  if (commSolving != MPI_COMM_NULL)
504  {
505  //определяем номер процессора в данном коммуникаторе -
506  // - вдруг потребуется на будущее!
507  int myidSolving;
508  MPI_Comm_rank(commSolving, &myidSolving);
509 
510  //Алгоритм возвращения результатов расчета конкретной задачи
511  //Если срабатывает признак того, что решение задачи можно прекращать
512  //или не решается ни одна задача (а в комуникатор входит
513  //только 0-й процессор, присоединеннй туда насильно)
514  int stopSignal = ((parallel.commWork == MPI_COMM_NULL) || (world->isFinished())) ? 1 : 0;
515 
516  //пересылаем признаки прекращения решения задач на нулевой процессор
517  //коммуникатора comm_solving - т.е. на глобальный процессор с номером 0 ---
518  // --- вот для чего его насильно присоединяли к этому коммуникатору
519  //если сам по себе он туда не входил
520  MPI_Gather(&stopSignal, 1, MPI_INT, flagFinish.data(), 1, MPI_INT, 0, commSolving);
521 
522  //после пересылки информации коммуникатор comm_solving уничтожается
523  MPI_Comm_free(&commSolving);
524 
525  } //if(comm_solving != MPI_COMM_NULL)
526 
527  //и соответствующая ему группа тоже удаляется
528  MPI_Group_free(&groupSolving);
529 
530 
531  if (parallel.commWork != MPI_COMM_NULL)
532  {
533  //К этому месту все процессоры, решающие задачи, приходят уже выполнив все расчеты
534  //в рамках одного кванта времени
535  //Поэтому коммуникаторы решаемых задач нам уже ни к чему - уничтожаем их
536  MPI_Comm_free(&parallel.commWork);
537  }
538 #else
539  if (commSolving != 0x04000000)
540  {
541  int myidSolving = 0;
542  int stopSignal = ((parallel.commWork == 0x04000000) || (world->isFinished())) ? 1 : 0;
543  flagFinish[0] = stopSignal;
544  commSolving = 0x04000000;
545  } //if(comm_solving != 0x04000000)
546 
547  if (parallel.commWork != 0x04000000)
548  parallel.commWork = 0x04000000;
549 #endif
550 
551  //Обновление состояния задач и высвобождение процессоров из отсчитавших задач
552  //(выполняется на глобальном нулевом процессоре)
553  if (myidAll == 0)
554  {
555  //Список номеров задач, которые сейчас считаются
556  std::vector<int> taskList;
557 
558  //если состояние "считается" запоминаем эту задачу
559  //задачи запоминаются в порядке просмотра процессоров
560  for (int s = 0; s<nProcAll; ++s)
561 #ifdef USE_MPI
562  if ( (procState[s] != MPI_UNDEFINED) && (task[procState[s]].state == TaskState::running) && (task[procState[s]].proc[0] == s) )
563 #else
564  if ( (procState[s] != -32766) && (task[procState[s]].state == TaskState::running) && (task[procState[s]].proc[0] == s) )
565 #endif
566  taskList.push_back(procState[s]);
567 
568  //Тем задачам, которые во флаге прекращения счета вернули "1",
569  //ставим состояние завершения счета.
570  //Конструкция +sizeCommSolving-taskSolving введена для смещения в массиве на единицу
571  //если нулевой процессор свободен - тогда он присоединяется формально в нулевом
572  //элементе массива flagFinish, а содержательная часть массива смещается на 1
573  //(в этом случае sizeCommSolving как раз будет на 1 больше, чем taskSolving, см. выше)
574  //если же нулевой процесс занят решением задачи, то смещения нет,
575  //поскольку в этом случае sizeCommSolving и taskSolving равны
576  for (int i = 0; i < numberOfTask.solving; ++i)
577  if (flagFinish[i + sizeCommSolving - numberOfTask.solving] == 1)
578  task[taskList[i]].state = TaskState::finishing;
579  } //if myid==0
580 
581  if (myidAll == 0)
582  {
583  for (size_t i = 0; i < task.size(); ++i)
584  {
585  //освобождаем процессоры от сосчитавшихся задач
586  if (task[i].state == TaskState::finishing)
587  {
588  //устанавливаем состояние задачи в "отсчитано"
589  task[i].state = TaskState::done;
590 
591  //отмечаем последний квант, когда задача считалась
592  task[i].startEndKvant.second = currentKvant;
593 
594  //изменяем счетчики
595  numberOfTask.solving--;
596  numberOfTask.finished++;
597 
598  //состояние процессоров, решавших данную задачу, устанавливаем в "свободен"
599  for (int p = 0; p < task[i].nProc; ++p)
600  {
601 #ifdef USE_MPI
602  procState[task[i].proc[p]] = MPI_UNDEFINED;
603 #else
604  procState[task[i].proc[p]] = -32766;
605 #endif
606  }//for p
607  }//if (Task[i].state == 3)
608  }//for i
609 
610  //определяем, надо ли еще выделять квант времени или можно заканчивать работу
611  //путем сравнения числа отсчитанных задач с общим числом задач
612  nextKvant = (numberOfTask.finished < static_cast<int>(task.size())) ? 1 : 0;
613  }//if (myid == 0)
614 
615 #ifdef USE_MPI
616  MPI_Bcast(&nextKvant, 1, MPI_INT, 0, MPI_COMM_WORLD);
617 #endif
618 }//TaskUpdate()
LogStream info
Поток для вывода логов и сообщений об ошибках
Definition: Queue.h:177
int groupSolving
Definition: Queue.h:156
задача решена
int commSolving
Definition: Queue.h:157
int myidAll
Глобальный номер процессора
Definition: Queue.h:128
int commWork
Коммуникатор для решения конкретной задачи
Definition: Parallel.h:90
задача решается
std::vector< int > procState
Список состояний процессоров
Definition: Queue.h:105
std::vector< int > flagFinish
Список возвращаемых флагов останова задачи
Definition: Queue.h:125
int nProcAll
Общее число процессоров
Definition: Queue.h:131
std::vector< Task > task
Список описаний решаемых задач
Definition: Queue.h:183
int currentKvant
Номер текущего кванта времени
Definition: Queue.h:210
задача финиширует
std::unique_ptr< WorldGen > world
Умный указатель на текущую решаемую задачу
Definition: Queue.h:186
Parallel parallel
Класс, опеделяющий параметры исполнения задачи в параллельном MPI-режиме
Definition: Queue.h:225
struct VMlib::Queue::@0 numberOfTask
Структура, содержащая информацию о количестве задач в данный момент времени
int sizeCommSolving
Число процессоров в группе для головных процессоров в решаемых в данном кванте времени задачах ...
Definition: Queue.h:163
int nextKvant
Признак необходимости выполнения следующего кванта и продолжения расчета
Definition: Queue.h:217

Here is the caller graph for this function:

Member Data Documentation

int VMlib::Queue::commSolving
private

Definition at line 157 of file Queue.h.

int VMlib::Queue::commStarting
private

Definition at line 155 of file Queue.h.

int VMlib::Queue::currentKvant

Номер текущего кванта времени

Definition at line 210 of file Queue.h.

int VMlib::Queue::finished

Число уже решенных задач

Definition at line 95 of file Queue.h.

std::vector<int> VMlib::Queue::flagFinish
private

Список возвращаемых флагов останова задачи

Позиции 0, 1, 2 и т.д. заполняется головными процессами, решающими задачи

Warning
Нулевая позиция — всегда соответствует процессору с myid=0 (глобальному) даже если он не участвует в данном кванте в решении задачи; в этом случае головные процессы заполняют позиции 1, 2, 3 и т.д.

Definition at line 125 of file Queue.h.

int VMlib::Queue::groupAll
private

Definition at line 153 of file Queue.h.

int VMlib::Queue::groupSolving
private

Definition at line 156 of file Queue.h.

int VMlib::Queue::groupStarting
private

Definition at line 154 of file Queue.h.

LogStream VMlib::Queue::info
private

Поток для вывода логов и сообщений об ошибках

Definition at line 177 of file Queue.h.

const double VMlib::Queue::kvantTime = 10.0

Продолжительность кванта времени в секундах

В пределах кванта по времени не производится глобальной синхронизации всех процессоров, решающих все задачи. Проверка состояния очереди и ее обновление производятся между квантами. По исчерпании кванта времени выполнение шагов по времени всеми процессорами прекращается до обновления очереди

Definition at line 207 of file Queue.h.

int VMlib::Queue::myidAll
private

Глобальный номер процессора

Definition at line 128 of file Queue.h.

int VMlib::Queue::myProcState

Состояние данного процессора


Принимает значения :

  • MPI_UNDEFINED (=-32766, но в зависимости от реализации MPI может быть и другой константой) — свободен;
  • xxx — занят задачей номер xxx (номер из списка задач).

Definition at line 193 of file Queue.h.

int VMlib::Queue::myProcStateVar

Состояние данного процессора


Принимает значения :

  • MPI_UNDEFINED (=-32766, но в зависимости от реализации MPI может быть и другой константой) — свободен;
  • xxx — занят задачей номер xxx (номер из списка задач).

Definition at line 200 of file Queue.h.

int VMlib::Queue::nextKvant

Признак необходимости выполнения следующего кванта и продолжения расчета

Принимает значения:

  • 1 — делать еще один квант (еще есть что считать),
  • 0 — расчет окончен, выход (все расчеты выполнены).

Definition at line 217 of file Queue.h.

int VMlib::Queue::nProcAll
private

Общее число процессоров

Definition at line 131 of file Queue.h.

struct { ... } VMlib::Queue::numberOfTask

Структура, содержащая информацию о количестве задач в данный момент времени

Содержит следующие поля:

  • prepared — число подготовленных к запуску задач;
  • solving — число решаемых в данный момент задач;
  • finished — число уже решенных задач.
Parallel VMlib::Queue::parallel

Класс, опеделяющий параметры исполнения задачи в параллельном MPI-режиме

Внутри него содержатся такие параметры, как:

  • commWork — коммуникатор для решения конкретной задачи;
  • myidWork — локальный номер данного процессора в коммуникаторе процессоров, решающих конкретную задачу;
  • nProcWork — число процессоров, решающих конкретную задачу.

Definition at line 225 of file Queue.h.

int VMlib::Queue::prepared

Число подготовленных к запуску задач

Включено в число решаемых в данный момент задач

Definition at line 87 of file Queue.h.

std::vector<int> VMlib::Queue::procState
private

Список состояний процессоров

Длина списка равна числу процессоров
Заполняется только для на главном узле, у которого myid=0
Принимает значения:

  • MPI_UNDEFINED (=-32766, но в зависимости от реализации MPI может быть и другой константой) — свободен;
  • xxx — занят задачей номер xxx (номер из списка задач).

Definition at line 105 of file Queue.h.

std::vector<int> VMlib::Queue::procStateVar
private

Модифицированный список состояний процессоров

Длина списка равна числу процессоров
Заполняется только на главном узле, у которого myid=0
Принимает значения:

  • MPI_UNDEFINED (=-32766, но в зависимости от реализации MPI может быть и другой константой) — свободен;
  • xxx — занят задачей номер xxx (номер из списка задач).
    Аналогичен procState, но отличается от него следующим:
  • процессы, занятые одной задачей, получают один и тот же номер;
  • позиции, в которых впервые появляются номера в этом списке, образуют возрастающую последовательность.

Definition at line 117 of file Queue.h.

int VMlib::Queue::sizeCommSolving
private

Число процессоров в группе для головных процессоров в решаемых в данном кванте времени задачах

В него всегда входит 0-й процессор, даже если он не раешает задачу в данном кванте времени

Definition at line 163 of file Queue.h.

int VMlib::Queue::solving

Число решаемых в данный момент задач

Включает число подготовленных к запуску задач

Definition at line 92 of file Queue.h.

std::vector<Task> VMlib::Queue::task

Список описаний решаемых задач

В описаниях содержится информация о прохождении задач и их текущем состоянии

Definition at line 183 of file Queue.h.

std::unique_ptr<WorldGen> VMlib::Queue::world

Умный указатель на текущую решаемую задачу

Definition at line 186 of file Queue.h.


The documentation for this class was generated from the following files: