Code_TYMPAN  4.4.0
Industrial site acoustic simulation
threading.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) <2012-2024> <EDF-DTG> <FRANCE>
3  * This file is part of Code_TYMPAN (R).
4  * Code_TYMPAN (R) is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation, either version 3 of the License, or
7  * (at your option) any later version.
8  * Code_TYMPAN (R) is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
11  * See the GNU General Public License for more details.
12  * You should have received a copy of the GNU General Public License along
13  * with Code_TYMPAN (R). If not, see <https://www.gnu.org/licenses/>.
14  */
15 
16 #include "threading.h"
17 
19 {
20  _bToEnd = false;
21 }
22 
24 {
25  this->terminate();
26 }
27 
29 {
30  bool remainingTasksToComplete = true;
31 
32  while (isRunning() && !_bToEnd && remainingTasksToComplete)
33  {
34  // Wait for available task.
36  if (!_pool->_tasks.empty())
37  {
38  // Dequeue next task.
39  LPOTask task = _pool->_tasks.front();
40  _pool->_tasks.pop();
41 
43 
44  // Signal all that task is running.
46  task->_running = true;
47 
48  // Run task.
49  task->main();
50 
51  // Signal all that task is completed.
52  task->_running = false;
53  task->_completed = true;
54  task->wakeAll();
56 
57  // Increment pool counter
59  _pool->_counter++;
61  }
62  else
63  {
65  remainingTasksToComplete = false;
66  }
67  }
68 }
69 
70 OTask::OTask() : _running(false), _completed(false), _canceled(false) {}
71 
73 {
74  while (!isCompleted() && !isCanceled())
75  {
76  wait(this);
77  }
78 }
79 
80 bool OTask::isRunning() const
81 {
83  return _running;
84 }
85 
86 bool OTask::isCompleted() const
87 {
89  return _completed;
90 }
91 
92 bool OTask::isCanceled() const
93 {
95  return _canceled;
96 }
97 
99 {
101  _running = _completed = _canceled = false;
102 }
103 
104 OThreadPool::OThreadPool(unsigned int slaves) : _totalCount(0), _counter(0)
105 {
106  // Allocate slave threads.
107  for (unsigned int i = 0; i < slaves; ++i)
108  {
109  OSlaveThread* thread = new OSlaveThread(this);
110  push_back(thread);
111  }
112 }
113 
115 {
117 
118  // Wait for queue to become empty.
119  while (!_tasks.empty())
120  {
121  // Wait for last task in queue to at least start running.
122  LPOTask task = _tasks.back();
123  task->_completed = true;
124  task->_canceled = true;
125  }
126 
127  // Now terminate all threads.
128  unsigned int i = 0;
129  for (i = 0; i < size(); ++i)
130  {
131  (*this)[i]->_bToEnd = true;
132  (*this)[i]->terminate();
133  }
134 
135  // Signal them to wake up.
137 
138  // Then delete them (the thread destructor will wait for thread completion).
139  for (i = 0; i < size(); ++i)
140  {
141  delete (*this)[i];
142  }
143 }
144 
146 {
147  // Reset task flags.
148  task->reset();
149 
150  // Push task onto queue and signal availability.
152  _tasks.push(task);
154 }
155 
156 unsigned int OThreadPool::getTotalCount() const
157 {
159  return _totalCount;
160 }
161 
162 unsigned int OThreadPool::getCount() const
163 {
165  return _counter;
166 }
167 
168 void OThreadPool::begin(unsigned int count)
169 {
170  TY_LOCK_SHARED_MUTEX(this);
171  _totalCount = count;
172  _counter = 0;
174 }
175 
177 {
178 
179  for (unsigned int i = 0; i < size(); ++i)
180  {
181  (*this)[i]->_bToEnd = false;
182  (*this)[i]->start();
183  }
184 }
185 
187 {
188  std::vector<OSlaveThread*>& threads = static_cast<std::vector<OSlaveThread*>&>(*this);
189  for (OSlaveThread* slaveThread : threads)
190  {
191  slaveThread->wait();
192  }
193  stop();
194  return true;
195 }
196 
198 {
199  TY_LOCK_SHARED_MUTEX(this);
200 
201  // For each task
202  for (size_t i = 0; i < _tasks.size(); ++i)
203  {
204  // Dequeue next task
205  LPOTask task = _tasks.front();
206  _tasks.pop();
207 
208  // Cancel task
210  task->_canceled = true;
212 
213  // Increment counter
214  _counter++;
215  }
216 
217  for (unsigned int i = 0; i < size(); ++i)
218  {
219  (*this)[i]->_bToEnd = true;
220  }
221 
223 
224  // Waiting for thread termination
225  while (getCount() < getTotalCount())
226  {
227  OSleeper::msleep(1);
228  }
229 }
This class defines a thread for running tasks in a threads collection. Slave thread for the threads c...
Definition: threading.h:132
OThreadPool * _pool
Pointer on the parent threads collection.
Definition: threading.h:144
~OSlaveThread()
Destroy the slave thread; wait for the end of the thread.
Definition: threading.cpp:23
bool _bToEnd
Definition: threading.h:140
OSlaveThread(OThreadPool *pool)
Build a slave thread for a threads collection.
Definition: threading.cpp:18
void run()
Run a waiting task.
Definition: threading.cpp:28
static void msleep(unsigned long msecs)
Definition: threading.h:47
Task of a threads collection.
Definition: threading.h:168
bool isCanceled() const
Return true if the task has been cancelled, false otherwise.
Definition: threading.cpp:92
void reset()
Reset the task status (_running=false and _completed=false)
Definition: threading.cpp:98
bool _running
Running flag.
Definition: threading.h:209
bool isCompleted() const
Return true if the task is completed, false otherwise.
Definition: threading.cpp:86
bool isRunning() const
Return true if the task is running, false otherwise.
Definition: threading.cpp:80
bool _completed
Completed flag.
Definition: threading.h:212
virtual ~OTask()
Destructor : waits for the end of the task to destroy it.
Definition: threading.cpp:72
OTask()
Default constructor.
Definition: threading.cpp:70
bool _canceled
Cancel flag.
Definition: threading.h:215
Slave threads collection.
Definition: threading.h:259
unsigned int _totalCount
Total number of tasks to run.
Definition: threading.h:307
std::queue< LPOTask > _tasks
Tasks queue.
Definition: threading.h:304
unsigned int getTotalCount() const
Return the total number of tasks.
Definition: threading.cpp:156
void begin(unsigned int count)
Begin solver.
Definition: threading.cpp:168
OThreadPool(unsigned int slaves)
Build a threads collection and allocate "slaves" thread.
Definition: threading.cpp:104
void startPool()
Definition: threading.cpp:176
virtual ~OThreadPool()
Destructor.
Definition: threading.cpp:114
bool end()
End solver.
Definition: threading.cpp:186
unsigned int _counter
Total number of ended tasks.
Definition: threading.h:310
unsigned int getCount() const
Return the counter.
Definition: threading.cpp:162
void stop()
Cancel the pending tasks.
Definition: threading.cpp:197
virtual void push(OTask *task)
Add a task to the queue.
Definition: threading.cpp:145
friend class OSlaveThread
Definition: threading.h:312
#define TY_LOCK_SHARED_MUTEX(name)
Definition: threading.h:81
#define TY_UNLOCK_SHARED_MUTEX(name)
Definition: threading.h:82
#define TY_OMUTEXLOCKER_SHARED_MUTEX(name)
Definition: threading.h:83