Code_TYMPAN  4.4.0
Industrial site acoustic simulation
threading.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) <2012-2024> <EDF-DTG> <FRANCE>
3  * This file is part of Code_TYMPAN (R).
4  * Code_TYMPAN (R) is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation, either version 3 of the License, or
7  * (at your option) any later version.
8  * Code_TYMPAN (R) is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
11  * See the GNU General Public License for more details.
12  * You should have received a copy of the GNU General Public License along
13  * with Code_TYMPAN (R). If not, see <https://www.gnu.org/licenses/>.
14  */
15 
16 #include "threading.h"
17 
19 { // Body of the OSlaveThread constructor.
20  _bToEnd = false; // No stop has been requested yet; run() loops while this flag stays false.
21 }
22 
24 { // Body of the OSlaveThread destructor.
25  this->terminate(); // NOTE(review): only terminate() is called here; if the destructor is expected to wait for the thread to finish, confirm that terminate() provides that.
26 }
27 
29 { // Body of run(): repeatedly takes a task from the parent pool's queue and executes it.
30  while (isRunning() && !_bToEnd) // Leave the loop once the thread is no longer running or _bToEnd has been set.
31  {
32  // Wait for available task.
34  if (isRunning() && !_bToEnd && !_pool->_tasks.empty())
35  {
36  // Dequeue next task (front of the pool's queue).
37  LPOTask task = _pool->_tasks.front();
38  _pool->_tasks.pop();
39 
41 
42  // Flag the task as running.
44  task->_running = true;
45 
46  // Run task.
47  task->main();
48 
49  // Flag the task as completed (no longer running), then call wakeAll() on it.
50  task->_running = false;
51  task->_completed = true;
52  task->wakeAll();
54 
55  // Increment the pool counter of ended tasks.
57  _pool->_counter++;
59  }
60  else
62  }
63 }
64 
65 OTask::OTask() : _running(false), _completed(false), _canceled(false) {} // Default constructor: a new task is neither running, completed nor canceled.
66 
68 { // Body of the OTask destructor.
69  while (!isCompleted() && !isCanceled()) // Keep waiting until the task is flagged completed or canceled.
70  {
71  wait(this);
72  }
73 }
74 
75 bool OTask::isRunning() const // Returns the current value of the _running flag.
76 {
78  return _running;
79 }
80 
81 bool OTask::isCompleted() const // Returns the current value of the _completed flag.
82 {
84  return _completed;
85 }
86 
87 bool OTask::isCanceled() const // Returns the current value of the _canceled flag.
88 {
90  return _canceled;
91 }
92 
94 { // Body of reset().
96  _running = _completed = _canceled = false; // Clears all three status flags, including _canceled.
97 }
98 
99 OThreadPool::OThreadPool(unsigned int slaves) : _totalCount(0), _counter(0) // Both counters start at zero.
100 {
101  // Allocate `slaves` slave threads, each constructed with a pointer to this pool, and store them in the pool itself.
102  for (unsigned int i = 0; i < slaves; ++i)
103  {
104  OSlaveThread* thread = new OSlaveThread(this); // Raw allocation; the pool destructor deletes each stored thread.
105  push_back(thread);
106  }
107 }
108 
110 { // Body of the OThreadPool destructor.
112 
113  // Loop while tasks remain queued. NOTE(review): nothing visible in this loop removes entries from _tasks, so the condition only changes if something else empties the queue - confirm this cannot spin forever.
114  while (!_tasks.empty())
115  {
116  // Flag the last task in the queue as both completed and canceled.
117  LPOTask task = _tasks.back();
118  task->_completed = true;
119  task->_canceled = true;
120  }
121 
122  // Now request every thread to stop (_bToEnd) and terminate it.
123  unsigned int i = 0;
124  for (i = 0; i < size(); ++i)
125  {
126  (*this)[i]->_bToEnd = true;
127  (*this)[i]->terminate();
128  }
129 
130  // Signal them to wake up.
132 
133  // Then delete the thread objects allocated in the constructor (each thread destructor calls terminate()).
134  for (i = 0; i < size(); ++i)
135  {
136  delete (*this)[i];
137  }
138 }
139 
141 { // Body of push(): enqueue a task for the slave threads.
142  // Reset task flags (running, completed, canceled) so a reused task starts clean.
143  task->reset();
144 
145  // Push task onto queue and signal availability.
147  _tasks.push(task);
149 }
150 
151 unsigned int OThreadPool::getTotalCount() const // Returns _totalCount, the value last set by begin().
152 {
154  return _totalCount;
155 }
156 
157 unsigned int OThreadPool::getCount() const // Returns _counter, which run() and stop() increment once per ended or canceled task.
158 {
160  return _counter;
161 }
162 
163 void OThreadPool::begin(unsigned int count) // Prepares a new batch: `count` is the number of tasks end() will wait for.
164 {
165  TY_LOCK_SHARED_MUTEX(this);
166  _totalCount = count; // Expected number of tasks for this batch.
167  _counter = 0; // Restart the ended-task counter.
169 }
170 
172 { // Body of startPool(): start every slave thread stored in the pool.
173 
174  for (unsigned int i = 0; i < size(); ++i)
175  {
176  (*this)[i]->_bToEnd = false; // Clear any earlier stop request before starting the thread.
177  (*this)[i]->start();
178  }
179 }
180 
182 { // Body of end(): block until the ended-task counter reaches the expected total, then stop the pool.
183  unsigned int totalCount = getTotalCount();
184 
185  unsigned int last = 0;
186  while (last < totalCount) // Poll the counter until it reaches the total captured above.
187  {
188  unsigned int current = getCount();
189  last = current;
190  OSleeper::msleep(5); // Sleep 5 ms between polls.
191  }
192  stop();
193  return true; // Always returns true.
194 }
195 
197 { // Body of stop(): cancel pending tasks and ask every slave thread to leave its run loop.
198  TY_LOCK_SHARED_MUTEX(this);
199 
200  // Drain the queue, canceling each pending task. NOTE(review): _tasks.size() is re-read after every pop() while i keeps growing, so this loop appears to cancel only about half of the queued tasks - confirm; draining with while (!_tasks.empty()) would visit all of them.
201  for (size_t i = 0; i < _tasks.size(); ++i)
202  {
203  // Dequeue next task
204  LPOTask task = _tasks.front();
205  _tasks.pop();
206 
207  // Cancel task
209  task->_canceled = true;
211 
212  // Count the canceled task as ended so the wait below can reach the total.
213  _counter++;
214  }
215 
216  for (unsigned int i = 0; i < size(); ++i)
217  {
218  (*this)[i]->_bToEnd = true; // Request each slave thread to exit its run loop.
219  }
220 
222 
223  // Wait until the ended-task counter reaches the expected total, polling every 1 ms.
224  while (getCount() < getTotalCount())
225  {
226  OSleeper::msleep(1);
227  }
228 }
This class defines a thread for running tasks in a threads collection. Slave thread for the threads c...
Definition: threading.h:132
OThreadPool * _pool
Pointer on the parent threads collection.
Definition: threading.h:144
~OSlaveThread()
Destroy the slave thread; wait for the end of the thread.
Definition: threading.cpp:23
bool _bToEnd
Definition: threading.h:140
OSlaveThread(OThreadPool *pool)
Build a slave thread for a threads collection.
Definition: threading.cpp:18
void run()
Run a waiting task.
Definition: threading.cpp:28
static void msleep(unsigned long msecs)
Definition: threading.h:47
Task of a threads collection.
Definition: threading.h:168
bool isCanceled() const
Return true if the task has been cancelled, false otherwise.
Definition: threading.cpp:87
void reset()
Reset the task status (_running=false, _completed=false and _canceled=false)
Definition: threading.cpp:93
bool _running
Running flag.
Definition: threading.h:209
bool isCompleted() const
Return true if the task is completed, false otherwise.
Definition: threading.cpp:81
bool isRunning() const
Return true if the task is running, false otherwise.
Definition: threading.cpp:75
bool _completed
Completed flag.
Definition: threading.h:212
virtual ~OTask()
Destructor : waits for the end of the task to destroy it.
Definition: threading.cpp:67
OTask()
Default constructor.
Definition: threading.cpp:65
bool _canceled
Cancel flag.
Definition: threading.h:215
Slave threads collection.
Definition: threading.h:259
unsigned int _totalCount
Total number of tasks to run.
Definition: threading.h:307
std::queue< LPOTask > _tasks
Tasks queue.
Definition: threading.h:304
unsigned int getTotalCount() const
Return the total number of tasks.
Definition: threading.cpp:151
void begin(unsigned int count)
Begin solver.
Definition: threading.cpp:163
OThreadPool(unsigned int slaves)
Build a threads collection and allocate "slaves" thread.
Definition: threading.cpp:99
void startPool()
Definition: threading.cpp:171
virtual ~OThreadPool()
Destructor.
Definition: threading.cpp:109
bool end()
End solver.
Definition: threading.cpp:181
unsigned int _counter
Total number of ended tasks.
Definition: threading.h:310
unsigned int getCount() const
Return the counter.
Definition: threading.cpp:157
void stop()
Cancel the pending tasks.
Definition: threading.cpp:196
virtual void push(OTask *task)
Add a task to the queue.
Definition: threading.cpp:140
friend class OSlaveThread
Definition: threading.h:312
#define TY_LOCK_SHARED_MUTEX(name)
Definition: threading.h:81
#define TY_UNLOCK_SHARED_MUTEX(name)
Definition: threading.h:82
#define TY_OMUTEXLOCKER_SHARED_MUTEX(name)
Definition: threading.h:83