VM2D 1.14
Vortex methods for 2D flows simulation
Loading...
Searching...
No Matches
VMlib::Queue Class Reference

Класс, опеделяющий список решаемых задач и очередь их прохождения More...

#include <Queue.h>

Collaboration diagram for VMlib::Queue:

Public Member Functions

 Queue (int &argc, char **&argv)
 Конструктор
 
 ~Queue ()
 Деструктор
 
void TaskSplit ()
 Процедура постановка новых задач на отсчет и занятие процессоров
 
void TaskUpdate ()
 Процедура обновления состояния задач и процессоров
 
void RunConveyer ()
 Запуск вычислительного конвейера (в рамках кванта времени)
 
void LoadTasksList (const std::string &_tasksFile, const std::string &_mechanicsFile, const std::string &_defaultsFile, const std::string &_switchersFile)
 Загрузка списка задач
 

Public Attributes

std::vector< Tasktask
 Список описаний решаемых задач
 
std::unique_ptr< WorldGenworld
 Умный указатель на текущую решаемую задачу
 
int myProcState
 Состояние данного процессора
 
int myProcStateVar
 Состояние данного процессора
 
const double kvantTime = 1.0
 Продолжительность кванта времени в секундах
 
int currentKvant
 Номер текущего кванта времени
 
int nextKvant
 Признак необходимости выполнения следующего кванта и продолжения расчета
 
Parallel parallel
 Класс, опеделяющий параметры исполнения задачи в параллельном MPI-режиме
 

Private Member Functions

void ConstructProcStateVar ()
 Процедура, нумерующая задачи в возрастающем порядке
 
void AddTask (int _nProc, std::unique_ptr< PassportGen > _passport)
 Добавление задачи в список
 

Private Attributes

struct { 
 
   int   prepared 
 Число подготовленных к запуску задач More...
 
   int   solving 
 Число решаемых в данный момент задач More...
 
   int   finished 
 Число уже решенных задач More...
 
numberOfTask 
 Структура, содержащая информацию о количестве задач в данный момент времени
 
std::vector< int > procState
 Список состояний процессоров
 
std::vector< int > procStateVar
 Модифицированный список состояний процессоров
 
std::vector< int > flagFinish
 Список возвращаемых флагов останова задачи
 
int myidAll
 Глобальный номер процессора
 
int nProcAll
 Общее число процессоров
 
int groupAll
 
int groupStarting
 
int commStarting
 
int groupSolving
 
int commSolving
 
int sizeCommSolving
 Число процессоров в группе для головных процессоров в решаемых в данном кванте времени задачах
 
LogStream info
 Поток для вывода логов и сообщений об ошибках
 

Detailed Description

Класс, опеделяющий список решаемых задач и очередь их прохождения

Управляет распределением задач по процессорам, инициализацией их запуска и выгрузки

Author
Марчевский Илья Константинович \Version 1.14
Date
6 марта 2026 г.

Definition at line 75 of file Queue.h.

Constructor & Destructor Documentation

◆ Queue()

Queue::Queue ( int &  argc,
char **&  argv 
)

Конструктор

Производит инициализацию очереди перед запуском всего процесса

Parameters
[in]argcссылка на число параметров командной строки
[in]argvссылка на указатель на список параметров командной строки
[in]_CreateMpiTypesуказатель на функцию, инициализирующую все необходимые MPI-описания типов

Definition at line 77 of file Queue.cpp.

78{
79#ifdef USE_MPI
80 MPI_Init(&argc, &argv);
81 MPI_Comm_size(MPI_COMM_WORLD, &nProcAll);
82 MPI_Comm_rank(MPI_COMM_WORLD, &myidAll);
83 MPI_Comm_group(MPI_COMM_WORLD, &groupAll);
84#else
85 nProcAll = 1;
86 myidAll = 0;
87 groupAll = -1;
88#endif
89
90 //готовим систему к запуску
91 //(выполняется на глобальном нулевом процессоре)
92 if (myidAll == 0)
93 {
95
97
98 //Устанавливаем флаг занятости в состояние "свободен" всем процессорам,
99#ifdef USE_MPI
100 procState.resize(nProcAll, MPI_UNDEFINED);
101 procStateVar.resize(nProcAll, MPI_UNDEFINED);
102#else
103 procState.resize(nProcAll, -32766);
104 procStateVar.resize(nProcAll, -32766);
105#endif
106
107 numberOfTask.solving = 0; //число решаемых в данный момент задач
108 numberOfTask.prepared = 0; //число подготовленных к запуску задач (в том числе)
109 numberOfTask.finished = 0; //число уже отсчитанных задач
110
111 //текущий номер кванта времени
112 currentKvant = -1;
113 }//if myid==0
114}//Queue()
void assignStream(std::ostream *pStr_, const std::string &label_)
Связывание потока логов с потоком вывода
Definition LogStream.h:80
int nProcAll
Общее число процессоров
Definition Queue.h:134
std::vector< int > procStateVar
Модифицированный список состояний процессоров
Definition Queue.h:120
struct VMlib::Queue::@0 numberOfTask
Структура, содержащая информацию о количестве задач в данный момент времени
int currentKvant
Номер текущего кванта времени
Definition Queue.h:213
int myidAll
Глобальный номер процессора
Definition Queue.h:131
int groupAll
Definition Queue.h:156
LogStream info
Поток для вывода логов и сообщений об ошибках
Definition Queue.h:180
std::vector< int > procState
Список состояний процессоров
Definition Queue.h:108
void PrintUniversalLogoToStream(std::ostream &str)
Передача в поток вывода универсальной шапки программы VM2D/VM3D.
Definition defs.cpp:92
static std::ostream * defaultQueueLogStream
Поток вывода логов и ошибок очереди
Definition defs.h:221
static std::ostream * defaultWorld2DLogStream
Поток вывода логов и ошибок задачи
Definition defs.h:224
Here is the call graph for this function:

◆ ~Queue()

Queue::~Queue ( )

Деструктор

Definition at line 118 of file Queue.cpp.

119{
120 info('i') << "Goodbye!" << std::endl;
121#ifdef USE_MPI
122 MPI_Finalize();
123#endif
124}//~Queue()

Member Function Documentation

◆ AddTask()

void Queue::AddTask ( int  _nProc,
std::unique_ptr< PassportGen _passport 
)
private

Добавление задачи в список

Parameters
[in]_nProcчисло запрашиваемых процессоров
[in]_passportумный указатель на паспорт задачи, направляемой в очередь

Definition at line 701 of file Queue.cpp.

702{
703 task.resize(task.size() + 1);
704 (task.end() - 1)->nProc = _nProc;
705 (task.end() - 1)->passport = std::move(_passport);
706 (task.end() - 1)->proc.resize(_nProc);
707 (task.end() - 1)->state = TaskState::waiting;
708}//AddTask(...)
std::vector< Task > task
Список описаний решаемых задач
Definition Queue.h:186
@ waiting
задача ожидает запуска
Here is the caller graph for this function:

◆ ConstructProcStateVar()

void Queue::ConstructProcStateVar ( )
private

Процедура, нумерующая задачи в возрастающем порядке

Необходима для корректного распределения задач по процессорам

Definition at line 615 of file Queue.cpp.

616{
617 std::vector<bool> prFlag;
618 int number = -1;
619
620 prFlag.resize(nProcAll, true); //их надо просматривать
621
622 for (int i = 0; i<nProcAll; ++i)
623
624#ifdef USE_MPI
625 if (procState[i] == MPI_UNDEFINED)
626 {
627 prFlag[i] = false; //уже просмотрели
628 procStateVar[i] = MPI_UNDEFINED;
629 } //if (ProcState[i]==MPI_UNDEFINED)
630#else
631 if (procState[i] == -32766)
632 {
633 prFlag[i] = false; //уже просмотрели
634 procStateVar[i] = -32766;
635 } //if (ProcState[i]==-32766)
636#endif
637
638 for (int i = 0; i<nProcAll; ++i)
639 if (prFlag[i])
640 {
641 prFlag[i] = false;
642 number++;
643
644 for (int s = i; s<nProcAll; ++s)
645 if (procState[s] == procState[i])
646 {
647 procStateVar[s] = number;
648 prFlag[s] = false;
649 }//if (ProcState[s]==ProcState[i])
650 } //if (pr_flag[i])
651}//ConstructProcStateVar()
Here is the caller graph for this function:

◆ LoadTasksList()

void Queue::LoadTasksList ( const std::string &  _tasksFile,
const std::string &  _mechanicsFile,
const std::string &  _defaultsFile,
const std::string &  _switchersFile 
)

Загрузка списка задач

Parameters
[in]_tasksFileконстантная ссылка на имя файла с описанием очереди задач
[in]_mechanicsFileконстантная ссылка на имя файла со словарем механических систем
[in]_defaultsFileконстантная ссылка на имя файла с описанием параметров по умолчанию
[in]_switchersFileконстантная ссылка на имя файла со значениями параметров-переключателей

Definition at line 712 of file Queue.cpp.

713{
714 std::string extTasksFile = _tasksFile;
715 std::string extMechanicsFile = _mechanicsFile;
716 std::string extDefaultsFile = _defaultsFile;
717 std::string extSwitchersFile = _switchersFile;
718
719 if (
720 fileExistTest(extTasksFile, info, true, {"txt", "TXT"}) &&
721 fileExistTest(extDefaultsFile, info, true, { "txt", "TXT" }) &&
722 fileExistTest(extSwitchersFile, info, true, { "txt", "TXT" })
723 )
724 {
725 std::stringstream tasksFile(Preprocessor(extTasksFile).resultString);
726 std::stringstream defaultsFile(Preprocessor(extDefaultsFile).resultString);
727 std::stringstream switchersFile(Preprocessor(extSwitchersFile).resultString);
728 std::vector<std::string> taskFolder;
729
730 std::unique_ptr<StreamParser> parserTaskList;
731 parserTaskList.reset(new StreamParser(info, "parser", tasksFile, '(', ')'));
732
733 std::vector<std::string> alltasks;
734 parserTaskList->get("problems", alltasks);
735
736 std::ptrdiff_t nTasks = std::count_if(alltasks.begin(), alltasks.end(), [](const std::string& a) {return (a.length() > 0);});
737 info.endl();
738 info('i') << "Number of problems to be solved: " << nTasks << std::endl;
739
740 for (size_t i = 0; i < alltasks.size(); ++i)
741 {
742 if (alltasks[i].length() > 0)
743 {
744 //делим имя задачи + выражение в скобках на 2 подстроки
745 std::pair<std::string, std::string> taskLine = StreamParser::SplitString(info, alltasks[i], false);
746
747 std::string dir = taskLine.first;
748
749 info.endl();
750 info('i') << "-------- Loading problem #" << i << " (" << dir << ") --------" << std::endl;
751
752 //вторую подстроку разделяем на вектор из строк по запятым, стоящим вне фигурных скобок
753 std::vector<std::string> vecTaskLineSecond = StreamParser::StringToVector(taskLine.second, '{', '}');
754
755 //создаем парсер и связываем его с параметрами профиля
756 std::stringstream tskStream(StreamParser::VectorStringToString(vecTaskLineSecond));
757 std::unique_ptr<StreamParser> parserTask;
758
759 parserTask.reset(new StreamParser(info, "problem parameters", tskStream, defaultsFile, {"pspfile", "np", "copyPath"}));
760
761 std::string pspFile;
762 int np;
763
764 //считываем нужные параметры с учетом default-значений
765 parserTask->get("pspfile", pspFile, &defaults::defaultPspFile);
766 parserTask->get("np", np, &defaults::defaultNp);
767
768 if (np > 1) // 31/12/2023
769 {
770 info.endl();
771 info('e') << "Internal MPI-parallelization is not supported longer!" << std::endl;
772 exit(-1);
773 }
774
775 std::string copyPath;
776 parserTask->get("copyPath", copyPath, &defaults::defaultCopyPath);
777 if (copyPath.length() > 0)
778 {
779 //Копировать паспорт поручаем только одному процессу
780 if (myidAll == 0)
781 {
782 std::string initDir(copyPath.begin(), copyPath.end());
783 std::string command;
784
785#if defined(_WIN32)
786 std::replace(dir.begin(), dir.end(), '/', '\\');
787 std::replace(initDir.begin(), initDir.end(), '/', '\\');
788#endif
789
790 VMlib::CreateDirectory(dir.c_str(), "");
791
792#if defined(_WIN32)
793 command = "copy \"" + initDir + "\\*.*\" "+ dir + "\\";
794#else
795 command = "cp ./" + initDir + "/* " + dir + "/";
796#endif
797
798 std::cout << "Copying files from folder \"" << initDir << "\" to \"" << dir << "\"" << std::endl;
799
800 int systemRet = system(command.c_str());
801 if(systemRet == -1)
802 {
803 // The system method failed
804 info('e') << "problem #" << i << " (" << dir << \
805 ") copying passport system method failed" << std::endl;
806 exit(-1);
807 }
808 std::cout << "Copying OK " << std::endl << std::endl;
809
810 //copyFile(pspFile, "./" + dir + "/" + pspFile);
811 }
812 }
813
814#ifdef USE_MPI
815 MPI_Barrier(MPI_COMM_WORLD);
816#endif
817
818 std::unique_ptr<PassportGen> ptrPsp;
819
820#ifdef CODE2D
821 ptrPsp.reset(new VM2D::Passport(info, dir, i, pspFile, extMechanicsFile, extDefaultsFile, extSwitchersFile, vecTaskLineSecond));
822#endif
823
824#ifdef CODE3D
825 ptrPsp.reset(new VM3D::Passport(info, dir, i, pspFile, extMechanicsFile, extDefaultsFile, extSwitchersFile, vecTaskLineSecond));
826#endif
827
828 AddTask(np, std::move(ptrPsp));
829
830 info('i') << "-------- Problem #" << i << " (" << dir << ") is loaded --------" << std::endl << std::endl;
831 }
832 } //for i
833
834 //tasksFile.close();
835 tasksFile.clear();
836
837 //defaultsFile.close();
838 defaultsFile.clear();
839
840 //switchersFile.close();
841 switchersFile.clear();
842
843 }
844 else
845 {
846 exit(1);
847 }
848
849}
void endl()
Вывод в поток логов пустой строки
Definition LogStream.h:103
Класс, позволяющий выполнять предварительную обработку файлов
void AddTask(int _nProc, std::unique_ptr< PassportGen > _passport)
Добавление задачи в список
Definition Queue.cpp:701
Класс, позволяющий выполнять разбор файлов и строк с настройками и параметрами
static std::vector< std::string > StringToVector(std::string line, char openBracket='(', char closeBracket=')')
Pазбор строки, содержащей запятые, на отдельные строки
static std::pair< std::string, std::string > SplitString(LogStream &info, std::string line, bool upcase=true)
Разбор строки на пару ключ-значение
static std::string VectorStringToString(const std::vector< std::string > &_vecString)
Объединение вектора (списка) из строк в одну строку
bool fileExistTest(std::string &fileName, LogStream &info, bool exitKey=false, const std::list< std::string > &extList={})
Проверка существования файла
Definition defs.h:342
void CreateDirectory(const std::string &dir, const std::string &name)
Создание каталога
Definition defs.h:441
const int defaultNp
Необходимое число процессоров для решения задачи
Definition defs.h:215
const std::string defaultCopyPath
Путь к каталогу с задачей для копирования в новые каталоги
Definition defs.h:218
const std::string defaultPspFile
Имя файла с паспортом задачи
Definition defs.h:212
Here is the call graph for this function:
Here is the caller graph for this function:

◆ RunConveyer()

void Queue::RunConveyer ( )

Запуск вычислительного конвейера (в рамках кванта времени)

Definition at line 655 of file Queue.cpp.

656{
657#ifdef USE_MPI
658 if (parallel.commWork != MPI_COMM_NULL)
659#else
660 if (parallel.commWork != 0x04000000)
661#endif
662 {
663 double kvantStartWallTime; //Время на главном процессоре, когда начался очередной квант
664 double deltaWallTime;
665
666 if (parallel.myidWork == 0)
667#ifdef USE_MPI
668 kvantStartWallTime = MPI_Wtime();
669#else
670 kvantStartWallTime = omp_get_wtime();
671#endif
672
674 //if (world->getCurrentStep() == 0)
675 // world->ZeroStep();
676
677 do
678 {
679 if (!world->isFinished())
680 world->Step();
681
682 if (parallel.myidWork == 0)
683#ifdef USE_MPI
684 deltaWallTime = MPI_Wtime() - kvantStartWallTime;
685#else
686 deltaWallTime = omp_get_wtime() - kvantStartWallTime;
687#endif
688
689#ifdef USE_MPI
690 MPI_Bcast(&deltaWallTime, 1, MPI_DOUBLE, 0, parallel.commWork);
691#endif
692 }
693 //Проверка окончания кванта по времени (или завершения задачи)
694 while (deltaWallTime < kvantTime);
695
696 }//if (commWork != MPI_COMM_NULL)
697}//RunConveyer()
int myidWork
Локальный номер процессора, решающего конкретную задачу
Definition Parallel.h:97
int commWork
Коммуникатор для решения конкретной задачи
Definition Parallel.h:93
const double kvantTime
Продолжительность кванта времени в секундах
Definition Queue.h:210
std::unique_ptr< WorldGen > world
Умный указатель на текущую решаемую задачу
Definition Queue.h:189
Parallel parallel
Класс, опеделяющий параметры исполнения задачи в параллельном MPI-режиме
Definition Queue.h:228
Here is the caller graph for this function:

◆ TaskSplit()

void Queue::TaskSplit ( )

Процедура постановка новых задач на отсчет и занятие процессоров

Выполняется в начале очередного кванта

Definition at line 128 of file Queue.cpp.

129{
130 //Пересылаем информацию о состоянии процессоров на все компьютеры
131
132#ifdef USE_MPI
133 MPI_Scatter(procState.data(), 1, MPI_INT, &myProcState, 1, MPI_INT, 0, MPI_COMM_WORLD);
134#else
136#endif
137
138#ifdef USE_MPI
139 if (myProcState == MPI_UNDEFINED)
140 world.reset(nullptr);
141#else
142 if (myProcState == -32766)
143 world.reset(nullptr);
144#endif
145
146 if (myidAll == 0)
147 {
148 currentKvant++;
149
150 int nfree = 0; //число свободных процессоров
151
152 //Обнуляем число готовых к старту задач
153 numberOfTask.prepared = 0;
154
155 //считаем число свободных процессоров
156 for (int i = 0; i < nProcAll; ++i)
157#ifdef USE_MPI
158 if (procState[i] == MPI_UNDEFINED)
159#else
160 if (procState[i] == -32766)
161#endif
162 nfree++;
163
164 //формируем номер следующей задачи, которая возможно будет поставлена на отсчет:
165 size_t taskFol = numberOfTask.finished + numberOfTask.solving;
166
167 //проверяем, достаточно ли свободных процессоров для запуска еще одной задачи
168 //при условии, что не все задачи уже решены
169 while ((taskFol < task.size()) && (nfree >= task[taskFol].nProc))
170 {
171 int p = 0; //число найденных свободных процессоров под задачу
172 int j = 0; //текущий счетчик процессоров
173
174 //подбираем свободные процессоры
175 do
176 {
177#ifdef USE_MPI
178 if (procState[j] == MPI_UNDEFINED)
179#else
180 if (procState[j] == -32766)
181#endif
182 {
183 //состояние процессора устанавливаем в "занят текущей задачей"
184 procState[j] = static_cast<int>(taskFol);
185
186 //номер процессора посылаем в перечень процессоров, решающих текущую задачу
187 task[taskFol].proc[p] = j;
188
189 //увеличиваем число найденных процессоров на единицу
190 p++;
191 }//if (ProcState[j]==MPI_UNDEFINED)
192
193 //переходим к следующему процессору
194 j++;
195 } while (p < task[taskFol].nProc);
196
197 //состояние задачи устанавливаем в режим "стартует"
198 task[taskFol].state = TaskState::starting;
199
200 //отмечаем номер кванта, когда задача начала считаться
201 task[taskFol].startEndKvant.first = currentKvant;
202
203 //изменяем счетчики числа задач
204 numberOfTask.prepared++;
205 numberOfTask.solving++;
206
207 //изменяем счетчик свободных процессоров
208 nfree -= task[taskFol].nProc;
209
210 //переходим к следующей задаче
211 taskFol++;
212
213 }//while ((nfree >= Task[task_fol].nproc)&&(task_fol<Task.size()))
214
215 }//if (myidAll == 0)
216
217 //Вывод информации о занятости процессоров задачами
218 if (myidAll == 0)
219 {
220 info.endl();
221 info('i') << "ProcStates: " << std::endl;
222 for (int i = 0; i < nProcAll; ++i)
223 info('-') << "proc[" << i << "] <=> " << ((procState[i] >= 0) ? std::string("problem[" + std::to_string(procState[i]) + "]") : std::string("free")) << std::endl;
224 info.endl();
225 }
226
227 //Пересылаем информацию о состоянии процессоров на все компьютеры
228#ifdef USE_MPI
229 MPI_Scatter(procState.data(), 1, MPI_INT, &myProcState, 1, MPI_INT, 0, MPI_COMM_WORLD);
230#else
232#endif
233
234 //Синхронизируем информацию
235#ifdef USE_MPI
236 MPI_Bcast(&numberOfTask.solving, 1, MPI_INT, 0, MPI_COMM_WORLD);
237 MPI_Bcast(&numberOfTask.prepared, 1, MPI_INT, 0, MPI_COMM_WORLD);
238 MPI_Bcast(&numberOfTask.finished, 1, MPI_INT, 0, MPI_COMM_WORLD);
239#endif
240
241 //список головных процессоров на тех задачах,
242 //которые только что поставлены на счет:
243 std::vector<int> prepList;
244
245 //формируем списки prepList и pspList
246 //(выполняется на глобальном нулевом процессоре)
247 if (myidAll == 0)
248 {
249 for (size_t i = 0; i<task.size(); ++i)
250 //находим задачи в состоянии "стартует"
251 if (task[i].state == TaskState::starting)
252 {
253 //запоминаем номер того процессора, что является там головным
254 prepList.push_back(task[i].proc[0]);
255
256 //состояние задачи вереводим в "считает"
257 task[i].state = TaskState::running;
258 }
259 }//if myidAll==0
260
261
262 //Рассылаем количество стартующих задач в данном кванте на все машины
263#ifdef USE_MPI
264 MPI_Bcast(&numberOfTask.prepared, 1, MPI_INT, 0, MPI_COMM_WORLD);
265#endif
266 if (myidAll > 0)
267 {
268 prepList.resize(numberOfTask.prepared);
269 }//if myid==0
270
271 //следующий фрагмент выполняется только если в данном кванте стартуют новые задачи
272 if (numberOfTask.prepared > 0)
273 {
274 //пересылаем список головных машин стартующих задач на все процессоры
275#ifdef USE_MPI
276 MPI_Bcast(prepList.data(), numberOfTask.prepared, MPI_INT, 0, MPI_COMM_WORLD);
277#endif
278
279 //формируем группу и коммуникатор стартующих головных процессоров
280#ifdef USE_MPI
281 commStarting = MPI_COMM_NULL;
282 MPI_Group_incl(groupAll, numberOfTask.prepared, prepList.data(), &groupStarting);
283 MPI_Comm_create(MPI_COMM_WORLD, groupStarting, &commStarting);
284#else
285 commStarting = 0x04000000;
286 if (numberOfTask.prepared > 0)
287 commStarting = 0;
288#endif
289
290 //следующий фрагмент кода только для стартующих процессов,
291 //которые входят в коммуникатор comm_starting
292#ifdef USE_MPI
293 if (commStarting != MPI_COMM_NULL)
294#else
295 if (commStarting != 0x04000000)
296#endif
297 {
298 //получаем номер процесса в списке стартующих
299 int myidStarting;
300
301#ifdef USE_MPI
302 MPI_Comm_rank(commStarting, &myidStarting);
303#else
304 myidStarting = 0;
305#endif
306
307 //подготовка стартующих задач
308 parallel.myidWork = 0;
309
310#ifdef CODE2D
311 world.reset(new VM2D::World2D(task[myProcState].getPassport()));
312#endif
313
314#ifdef CODE3D
315 world.reset(new VM3D::World3D(task[myProcState].getPassport()));
316#endif
317
318 //Коммуникатор головных процессоров стартующих задач
319 //выполнил свое черное дело и будет удален
320#ifdef USE_MPI
321 MPI_Comm_free(&commStarting);
322#else
323 commStarting = 0x04000000;
324#endif
325 } //if(comm_starting != MPI_COMM_NULL)
326
327 //их группа тоже больше без надобности
328#ifdef USE_MPI
329 MPI_Group_free(&groupStarting);
330#endif
331 }//if (task_prepared>0)
332
333 //формируем группу и коммуникатор считающих головных процессоров
334 //в том числе тех, которые стартуют, и тех, которые ранее входили в commStarting
335 std::vector<int> solvList; //список номеров головных процессов
336
337 if (myidAll == 0)
338 {
339 //Если нулевой процессор свободен
340 //(не является головным в решении задачи в данном кванте) -
341 //- формально присоединяем его в группу головных
342 //необходимо для корректного обмена данными
343#ifdef USE_MPI
344 if (procState[0] == MPI_UNDEFINED)
345#else
346 if (procState[0] == -32766)
347#endif
348 solvList.push_back(0);
349
350 //далее ищем считающие задачи (в том числе только что стартующие)
351 //и собираем номера их головных процессоров
352 //(собираем в порядке просмотра номеров процессоров))
353 for (int s = 0; s<nProcAll; ++s)
354#ifdef USE_MPI
355 if ((procState[s] != MPI_UNDEFINED) && (task[procState[s]].state == TaskState::running) && (task[procState[s]].proc[0] == s))
356#else
357 if ((procState[s] != -32766) && (task[procState[s]].state == TaskState::running) && (task[procState[s]].proc[0] == s))
358#endif
359 solvList.push_back(s);
360
361 sizeCommSolving = static_cast<int>(solvList.size());
362
363 flagFinish.clear();
364 flagFinish.resize(solvList.size());
365
366 }//if myidAll==0
367
368 //число sizeCommSolving на единицу больше, чем taskSolving, если нулевой процессор свободен
369 //и равно taskSolving, если он занят решением задачи
370 //(если он занят решением задачи, то непременно является головным)
371
372 //пересылаем число головных процессоров и их список на все компьютеры
373 //(в том числе головной процессор - независимо от его состояния)
374#ifdef USE_MPI
375 MPI_Bcast(&sizeCommSolving, 1, MPI_INT, 0, MPI_COMM_WORLD);
376#endif
377
378 if (myidAll > 0)
379 solvList.resize(sizeCommSolving);
380
381#ifdef USE_MPI
382 MPI_Bcast(solvList.data(), sizeCommSolving, MPI_INT, 0, MPI_COMM_WORLD);
383#endif
384
385
386#ifdef USE_MPI
387 commSolving = MPI_COMM_NULL;
388
389 //формируем группу и коммуникатор головных процессоров
390 MPI_Group_incl(groupAll, sizeCommSolving, solvList.data(), &groupSolving);
391 MPI_Comm_create(MPI_COMM_WORLD, groupSolving, &commSolving);
392#else
393 commSolving = 0x04000000;
394 if (sizeCommSolving > 0)
395 commSolving = 1;
396#endif
397
398 if (myidAll == 0)
399 {
400 //составление неубывающего массива номеров для верного
401 //распределения задач по процессоров, с тем чтобы задачи
402 //с прошлого кванта оказались в тех же группах, что и раньше
404 }
405
406 //Пересылаем информацию о состоянии процессоров на все компьютеры
407#ifdef USE_MPI
408 MPI_Scatter(procStateVar.data(), 1, MPI_INT, &myProcStateVar, 1, MPI_INT, 0, MPI_COMM_WORLD);
409#else
411#endif
412
413 //Расщепляем весь коммуникатор MPI_COMM_WORLD (т.е. вообще все процессоры)
414 //на коммуникаторы решаемых задач
415 //В результате все процессоры, решающие конкретную задачу,
416 //объединяются в коммуникатор comm_work
417 //Несмотря на то, что он называется везде одинаково, у каждой задачи он свой
418 //и объединяет ровно те процессоры, которые надо
419 //Процессоры, состояние которых установлено в "свободен", т.е.
420 //ProcState=MPI_UNDEFINED (=-32766) ни в один
421 //новый коммуникатор commWork не попадут
422#ifdef USE_MPI
423 MPI_Comm_split(MPI_COMM_WORLD, myProcStateVar, 0, &parallel.commWork);
424#else
425 if (myProcStateVar != -32766)
426 parallel.commWork = 0;
427 else
428 parallel.commWork = 0x04000000;
429#endif
430
431 //следующий код выполняется процессорами, участвующими в решении задач,
432 //следовательно, входящими в коммуникаторы comm_work
433#ifdef USE_MPI
434 if (parallel.commWork != MPI_COMM_NULL)
435#else
436 if (parallel.commWork != 0x04000000)
437#endif
438 {
439 //опеределяем "локальный" номер данного процессора в коммуникаторе commWork и число процессов в нем
440#ifdef USE_MPI
441 MPI_Comm_size(parallel.commWork, &parallel.nProcWork);
442 MPI_Comm_rank(parallel.commWork, &parallel.myidWork);
443#else
445 parallel.myidWork = 0;
446#endif
447 //World равен nullptr только в случае, когда выполняется первый шаг
448 //в этом случае он создан только на главном процессоре группы;
449 //на остальных происходит его создание
450#ifdef CODE2D
451 if (world == nullptr)
452 world.reset(new VM2D::World2D(task[myProcState].getPassport()));
453#endif
454
455#ifdef CODE3D
456 if (world == nullptr)
457 world.reset(new VM3D::World3D(task[myProcState].getPassport()));
458#endif
459
460
461 if ((parallel.myidWork == 0) && (world->getCurrentStep() == 0))
462 {
463#ifdef CODE2D
464 VM2D::World2D& world2D = dynamic_cast<VM2D::World2D&>(*world);
465 //Создание файлов для записи сил
466 for (size_t q = 0; q < world2D.getPassport().airfoilParams.size(); ++q)
467 world2D.GenerateMechanicsHeader(q);
468 //Создание файла для записи временной статистики
469 world2D.getTimers().GenerateStatHeader();
470#endif
471
472#ifdef CODE3D
473 VM3D::World3D& world3D = dynamic_cast<VM3D::World3D&>(*world);
474 //Создание файлов для записи сил
475 //for (size_t q = 0; q < world3D.getPassport().airfoilParams.size(); ++q)
476 // world3D.GenerateMechanicsHeader(q);
477#endif
478 }
479 }
480
481}//TaskSplit()
Класс, опеделяющий текущую решаемую задачу
Definition World2D.h:74
VMlib::TimersGen & getTimers() const
Возврат ссылки на временную статистику выполнения шага расчета по времени
Definition World2D.h:276
const Passport & getPassport() const
Возврат константной ссылки на паспорт
Definition World2D.h:251
void GenerateMechanicsHeader(size_t mechanicsNumber)
Definition World2D.cpp:1457
int nProcWork
Число процессоров, решающих конкретную задачу
Definition Parallel.h:100
void ConstructProcStateVar()
Процедура, нумерующая задачи в возрастающем порядке
Definition Queue.cpp:615
int sizeCommSolving
Число процессоров в группе для головных процессоров в решаемых в данном кванте времени задачах
Definition Queue.h:166
int myProcState
Состояние данного процессора
Definition Queue.h:196
int myProcStateVar
Состояние данного процессора
Definition Queue.h:203
int groupSolving
Definition Queue.h:159
int commSolving
Definition Queue.h:160
std::vector< int > flagFinish
Список возвращаемых флагов останова задачи
Definition Queue.h:128
int groupStarting
Definition Queue.h:157
int commStarting
Definition Queue.h:158
void GenerateStatHeader()
Формирование заголовка файла временной статистики
Definition TimesGen.cpp:87
@ starting
задача стартует
@ running
задача решается
Here is the call graph for this function:
Here is the caller graph for this function:

◆ TaskUpdate()

void Queue::TaskUpdate ( )

Процедура обновления состояния задач и процессоров

Выполняется в конце очередного кванта

Definition at line 485 of file Queue.cpp.

486{
487 info('i') << "------ Kvant finished ------" << std::endl;
488
489 //Код только для головных процессов, которые входят в коммуникатор commSolving
490 //т.е. выполняется только на головных процессорах, решающих задачи.
491 //Кажется, что можно было бы с тем же эфектом написать здесь if (myidWork==0),
492 //но это не так, поскольку нужно включить в себя еще нулевой процессор,
493 //который в любом случае присоединен к коммуникатору comm_solving,
494 //даже если он не решает задачу (а если решает - то он всегда головной)
495#ifdef USE_MPI
496 if (commSolving != MPI_COMM_NULL)
497 {
498 //определяем номер процессора в данном коммуникаторе -
499 // - вдруг потребуется на будущее!
500 int myidSolving;
501 MPI_Comm_rank(commSolving, &myidSolving);
502
503 //Алгоритм возвращения результатов расчета конкретной задачи
504 //Если срабатывает признак того, что решение задачи можно прекращать
505 //или не решается ни одна задача (а в комуникатор входит
506 //только 0-й процессор, присоединеннй туда насильно)
507 int stopSignal = ((parallel.commWork == MPI_COMM_NULL) || (world->isFinished())) ? 1 : 0;
508
509 //пересылаем признаки прекращения решения задач на нулевой процессор
510 //коммуникатора comm_solving - т.е. на глобальный процессор с номером 0 ---
511 // --- вот для чего его насильно присоединяли к этому коммуникатору
512 //если сам по себе он туда не входил
513 MPI_Gather(&stopSignal, 1, MPI_INT, flagFinish.data(), 1, MPI_INT, 0, commSolving);
514
515 //после пересылки информации коммуникатор comm_solving уничтожается
516 MPI_Comm_free(&commSolving);
517
518 } //if(comm_solving != MPI_COMM_NULL)
519
520 //и соответствующая ему группа тоже удаляется
521 MPI_Group_free(&groupSolving);
522
523
524 if (parallel.commWork != MPI_COMM_NULL)
525 {
526 //К этому месту все процессоры, решающие задачи, приходят уже выполнив все расчеты
527 //в рамках одного кванта времени
528 //Поэтому коммуникаторы решаемых задач нам уже ни к чему - уничтожаем их
529 MPI_Comm_free(&parallel.commWork);
530 }
531#else
532 if (commSolving != 0x04000000)
533 {
534 int myidSolving = 0;
535 int stopSignal = ((parallel.commWork == 0x04000000) || (world->isFinished())) ? 1 : 0;
536 flagFinish[0] = stopSignal;
537 commSolving = 0x04000000;
538 } //if(comm_solving != 0x04000000)
539
540 if (parallel.commWork != 0x04000000)
541 parallel.commWork = 0x04000000;
542#endif
543
544 //Обновление состояния задач и высвобождение процессоров из отсчитавших задач
545 //(выполняется на глобальном нулевом процессоре)
546 if (myidAll == 0)
547 {
548 //Список номеров задач, которые сейчас считаются
549 std::vector<int> taskList;
550
551 //если состояние "считается" запоминаем эту задачу
552 //задачи запоминаются в порядке просмотра процессоров
553 for (int s = 0; s<nProcAll; ++s)
554#ifdef USE_MPI
555 if ( (procState[s] != MPI_UNDEFINED) && (task[procState[s]].state == TaskState::running) && (task[procState[s]].proc[0] == s) )
556#else
557 if ( (procState[s] != -32766) && (task[procState[s]].state == TaskState::running) && (task[procState[s]].proc[0] == s) )
558#endif
559 taskList.push_back(procState[s]);
560
561 //Тем задачам, которые во флаге прекращения счета вернули "1",
562 //ставим состояние завершения счета.
563 //Конструкция +sizeCommSolving-taskSolving введена для смещения в массиве на единицу
564 //если нулевой процессор свободен - тогда он присоединяется формально в нулевом
565 //элементе массива flagFinish, а содержательная часть массива смещается на 1
566 //(в этом случае sizeCommSolving как раз будет на 1 больше, чем taskSolving, см. выше)
567 //если же нулевой процесс занят решением задачи, то смещения нет,
568 //поскольку в этом случае sizeCommSolving и taskSolving равны
569 for (int i = 0; i < numberOfTask.solving; ++i)
570 if (flagFinish[i + sizeCommSolving - numberOfTask.solving] == 1)
571 task[taskList[i]].state = TaskState::finishing;
572 } //if myid==0
573
574 if (myidAll == 0)
575 {
576 for (size_t i = 0; i < task.size(); ++i)
577 {
578 //освобождаем процессоры от сосчитавшихся задач
579 if (task[i].state == TaskState::finishing)
580 {
581 //устанавливаем состояние задачи в "отсчитано"
582 task[i].state = TaskState::done;
583
584 //отмечаем последний квант, когда задача считалась
585 task[i].startEndKvant.second = currentKvant;
586
587 //изменяем счетчики
588 numberOfTask.solving--;
589 numberOfTask.finished++;
590
591 //состояние процессоров, решавших данную задачу, устанавливаем в "свободен"
592 for (int p = 0; p < task[i].nProc; ++p)
593 {
594#ifdef USE_MPI
595 procState[task[i].proc[p]] = MPI_UNDEFINED;
596#else
597 procState[task[i].proc[p]] = -32766;
598#endif
599 }//for p
600 }//if (Task[i].state == 3)
601 }//for i
602
603 //определяем, надо ли еще выделять квант времени или можно заканчивать работу
604 //путем сравнения числа отсчитанных задач с общим числом задач
605 nextKvant = (numberOfTask.finished < static_cast<int>(task.size())) ? 1 : 0;
606 }//if (myid == 0)
607
608#ifdef USE_MPI
609 MPI_Bcast(&nextKvant, 1, MPI_INT, 0, MPI_COMM_WORLD);
610#endif
611}//TaskUpdate()
int nextKvant
Признак необходимости выполнения следующего кванта и продолжения расчета
Definition Queue.h:220
@ finishing
задача финиширует
@ done
задача решена
Here is the caller graph for this function:

Member Data Documentation

◆ commSolving

int VMlib::Queue::commSolving
private

Definition at line 160 of file Queue.h.

◆ commStarting

int VMlib::Queue::commStarting
private

Definition at line 158 of file Queue.h.

◆ currentKvant

int VMlib::Queue::currentKvant

Номер текущего кванта времени

Definition at line 213 of file Queue.h.

◆ finished

int VMlib::Queue::finished

Число уже решенных задач

Definition at line 98 of file Queue.h.

◆ flagFinish

std::vector<int> VMlib::Queue::flagFinish
private

Список возвращаемых флагов останова задачи

Позиции 0, 1, 2 и т.д. заполняется головными процессами, решающими задачи

Warning
Нулевая позиция — всегда соответствует процессору с myid=0 (глобальному) даже если он не участвует в данном кванте в решении задачи; в этом случае головные процессы заполняют позиции 1, 2, 3 и т.д.

Definition at line 128 of file Queue.h.

◆ groupAll

int VMlib::Queue::groupAll
private

Definition at line 156 of file Queue.h.

◆ groupSolving

int VMlib::Queue::groupSolving
private

Definition at line 159 of file Queue.h.

◆ groupStarting

int VMlib::Queue::groupStarting
private

Definition at line 157 of file Queue.h.

◆ info

LogStream VMlib::Queue::info
private

Поток для вывода логов и сообщений об ошибках

Definition at line 180 of file Queue.h.

◆ kvantTime

const double VMlib::Queue::kvantTime = 1.0

Продолжительность кванта времени в секундах

В пределах кванта по времени не производится глобальной синхронизации всех процессоров, решающих все задачи. Проверка состояния очереди и ее обновление производятся между квантами. По исчерпании кванта времени выполнение шагов по времени всеми процессорами прекращается до обновления очереди

Definition at line 210 of file Queue.h.

◆ myidAll

int VMlib::Queue::myidAll
private

Глобальный номер процессора

Definition at line 131 of file Queue.h.

◆ myProcState

int VMlib::Queue::myProcState

Состояние данного процессора


Принимает значения :

  • MPI_UNDEFINED (=-32766, но в зависимости от реализации MPI может быть и другой константой) — свободен;
  • xxx — занят задачей номер xxx (номер из списка задач).

Definition at line 196 of file Queue.h.

◆ myProcStateVar

int VMlib::Queue::myProcStateVar

Состояние данного процессора


Принимает значения :

  • MPI_UNDEFINED (=-32766, но в зависимости от реализации MPI может быть и другой константой) — свободен;
  • xxx — занят задачей номер xxx (номер из списка задач).

Definition at line 203 of file Queue.h.

◆ nextKvant

int VMlib::Queue::nextKvant

Признак необходимости выполнения следующего кванта и продолжения расчета

Принимает значения:

  • 1 — делать еще один квант (еще есть что считать),
  • 0 — расчет окончен, выход (все расчеты выполнены).

Definition at line 220 of file Queue.h.

◆ nProcAll

int VMlib::Queue::nProcAll
private

Общее число процессоров

Definition at line 134 of file Queue.h.

◆ [struct]

struct { ... } VMlib::Queue::numberOfTask

Структура, содержащая информацию о количестве задач в данный момент времени

Содержит следующие поля:

  • prepared — число подготовленных к запуску задач;
  • solving — число решаемых в данный момент задач;
  • finished — число уже решенных задач.

◆ parallel

Parallel VMlib::Queue::parallel

Класс, опеделяющий параметры исполнения задачи в параллельном MPI-режиме

Внутри него содержатся такие параметры, как:

  • commWork — коммуникатор для решения конкретной задачи;
  • myidWork — локальный номер данного процессора в коммуникаторе процессоров, решающих конкретную задачу;
  • nProcWork — число процессоров, решающих конкретную задачу.

Definition at line 228 of file Queue.h.

◆ prepared

int VMlib::Queue::prepared

Число подготовленных к запуску задач

Включено в число решаемых в данный момент задач

Definition at line 90 of file Queue.h.

◆ procState

std::vector<int> VMlib::Queue::procState
private

Список состояний процессоров

Длина списка равна числу процессоров
Заполняется только для на главном узле, у которого myid=0
Принимает значения:

  • MPI_UNDEFINED (=-32766, но в зависимости от реализации MPI может быть и другой константой) — свободен;
  • xxx — занят задачей номер xxx (номер из списка задач).

Definition at line 108 of file Queue.h.

◆ procStateVar

std::vector<int> VMlib::Queue::procStateVar
private

Модифицированный список состояний процессоров

Длина списка равна числу процессоров
Заполняется только на главном узле, у которого myid=0
Принимает значения:

  • MPI_UNDEFINED (=-32766, но в зависимости от реализации MPI может быть и другой константой) — свободен;
  • xxx — занят задачей номер xxx (номер из списка задач).
    Аналогичен procState, но отличается от него следующим:
  • процессы, занятые одной задачей, получают один и тот же номер;
  • позиции, в которых впервые появляются номера в этом списке, образуют возрастающую последовательность.

Definition at line 120 of file Queue.h.

◆ sizeCommSolving

int VMlib::Queue::sizeCommSolving
private

Число процессоров в группе для головных процессоров в решаемых в данном кванте времени задачах

В него всегда входит 0-й процессор, даже если он не раешает задачу в данном кванте времени

Definition at line 166 of file Queue.h.

◆ solving

int VMlib::Queue::solving

Число решаемых в данный момент задач

Включает число подготовленных к запуску задач

Definition at line 95 of file Queue.h.

◆ task

std::vector<Task> VMlib::Queue::task

Список описаний решаемых задач

В описаниях содержится информация о прохождении задач и их текущем состоянии

Definition at line 186 of file Queue.h.

◆ world

std::unique_ptr<WorldGen> VMlib::Queue::world

Умный указатель на текущую решаемую задачу

Definition at line 189 of file Queue.h.


The documentation for this class was generated from the following files: