1
0
Fork 0
mirror of https://github.com/juce-framework/JUCE.git synced 2026-01-10 23:44:24 +00:00

Cleaned up and simplified the ThreadPool class - addJob now takes a flag to indicate whether the pool should delete that job, and the class's constructor arguments have been simplified.

This commit is contained in:
jules 2012-02-28 15:08:26 +00:00
parent 8369d9b3ab
commit 5b22611306
5 changed files with 156 additions and 183 deletions

View file

@ -417,7 +417,7 @@ private:
void writeProjects (const OwnedArray<LibraryModule>& modules)
{
ThreadPool threadPool (4, false, 30000);
ThreadPool threadPool;
// keep a copy of the basic generated files group, as each exporter may modify it.
const ValueTree originalGeneratedGroup (generatedFilesGroup.state.createCopy());
@ -437,7 +437,7 @@ private:
sortGroupRecursively (generatedFilesGroup);
exporter->groups.add (generatedFilesGroup);
threadPool.addJob (new ExporterJob (*this, exporter.exporter.release(), modules));
threadPool.addJob (new ExporterJob (*this, exporter.exporter.release(), modules), true);
}
else
{
@ -471,7 +471,7 @@ private:
owner.addError (error.message);
}
return jobHasFinishedAndShouldBeDeleted;
return jobHasFinished;
}
private:

View file

@ -177,10 +177,6 @@ public:
{
}
~DemoThreadPoolJob()
{
}
JobStatus runJob()
{
// this is the code that runs this job. It'll be repeatedly called until we return
@ -252,7 +248,7 @@ public:
while (balls.size() < 5)
addABall();
startTimer (2000);
startTimer (300);
}
}
@ -276,7 +272,7 @@ public:
addAndMakeVisible (newBall);
newBall->parentSizeChanged();
pool.addJob (newBall);
pool.addJob (newBall, false);
}
else
{

View file

@ -112,6 +112,9 @@ public:
//==============================================================================
// CPU and memory information..
/** Returns the number of CPUs. */
static int getNumCpus() noexcept { return getCPUFlags().numCpus; }
/** Returns the approximate CPU speed.
@returns the speed in megahertz, e.g. 1500, 2500, 32000 (depending on
@ -137,9 +140,6 @@ public:
/** Checks whether AMD 3DNOW instructions are available. */
static bool has3DNow() noexcept { return getCPUFlags().has3DNow; }
/** Returns the number of CPUs. */
static int getNumCpus() noexcept { return getCPUFlags().numCpus; }
//==============================================================================
/** Finds out how much RAM is in the machine.

View file

@ -55,15 +55,13 @@ void ThreadPoolJob::signalJobShouldExit()
shouldStop = true;
}
//==============================================================================
class ThreadPool::ThreadPoolThread : public Thread
{
public:
ThreadPoolThread (ThreadPool& pool_)
: Thread ("Pool"),
pool (pool_),
busy (false)
pool (pool_)
{
}
@ -78,41 +76,48 @@ public:
private:
ThreadPool& pool;
bool volatile busy;
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ThreadPoolThread);
};
//==============================================================================
ThreadPool::ThreadPool (const int numThreads,
const bool startThreadsOnlyWhenNeeded,
const int stopThreadsWhenNotUsedTimeoutMs)
: threadStopTimeout (stopThreadsWhenNotUsedTimeoutMs),
priority (5)
ThreadPool::ThreadPool (const int numThreads)
{
jassert (numThreads > 0); // not much point having one of these with no threads in it.
jassert (numThreads > 0); // not much point having a pool without any threads!
for (int i = jmax (1, numThreads); --i >= 0;)
threads.add (new ThreadPoolThread (*this));
createThreads (numThreads);
}
if (! startThreadsOnlyWhenNeeded)
for (int i = threads.size(); --i >= 0;)
threads.getUnchecked(i)->startThread (priority);
ThreadPool::ThreadPool()
{
createThreads (SystemStats::getNumCpus());
}
ThreadPool::~ThreadPool()
{
removeAllJobs (true, 4000);
removeAllJobs (true, 5000);
stopThreads();
}
int i;
for (i = threads.size(); --i >= 0;)
void ThreadPool::createThreads (int numThreads)
{
for (int i = jmax (1, numThreads); --i >= 0;)
threads.add (new ThreadPoolThread (*this));
for (int i = threads.size(); --i >= 0;)
threads.getUnchecked(i)->startThread();
}
void ThreadPool::stopThreads()
{
for (int i = threads.size(); --i >= 0;)
threads.getUnchecked(i)->signalThreadShouldExit();
for (i = threads.size(); --i >= 0;)
for (int i = threads.size(); --i >= 0;)
threads.getUnchecked(i)->stopThread (500);
}
void ThreadPool::addJob (ThreadPoolJob* const job)
void ThreadPool::addJob (ThreadPoolJob* const job, const bool deleteJobWhenFinished)
{
jassert (job != nullptr);
jassert (job->pool == nullptr);
@ -122,38 +127,11 @@ void ThreadPool::addJob (ThreadPoolJob* const job)
job->pool = this;
job->shouldStop = false;
job->isActive = false;
job->shouldBeDeleted = deleteJobWhenFinished;
{
const ScopedLock sl (lock);
jobs.add (job);
int numRunning = 0;
for (int i = threads.size(); --i >= 0;)
if (threads.getUnchecked(i)->isThreadRunning() && ! threads.getUnchecked(i)->threadShouldExit())
++numRunning;
if (numRunning < threads.size())
{
bool startedOne = false;
int n = 1000;
while (--n >= 0 && ! startedOne)
{
for (int i = threads.size(); --i >= 0;)
{
if (! threads.getUnchecked(i)->isThreadRunning())
{
threads.getUnchecked(i)->startThread (priority);
startedOne = true;
break;
}
}
if (! startedOne)
Thread::sleep (2);
}
}
}
for (int i = threads.size(); --i >= 0;)
@ -208,6 +186,7 @@ bool ThreadPool::removeJob (ThreadPoolJob* const job,
const int timeOutMs)
{
bool dontWait = true;
OwnedArray<ThreadPoolJob> deletionList;
if (job != nullptr)
{
@ -225,7 +204,7 @@ bool ThreadPool::removeJob (ThreadPoolJob* const job,
else
{
jobs.removeValue (job);
job->pool = nullptr;
addToDeleteList (deletionList, job);
}
}
}
@ -233,37 +212,35 @@ bool ThreadPool::removeJob (ThreadPoolJob* const job,
return dontWait || waitForJobToFinish (job, timeOutMs);
}
bool ThreadPool::removeAllJobs (const bool interruptRunningJobs,
const int timeOutMs,
const bool deleteInactiveJobs,
bool ThreadPool::removeAllJobs (const bool interruptRunningJobs, const int timeOutMs,
ThreadPool::JobSelector* selectedJobsToRemove)
{
Array <ThreadPoolJob*> jobsToWaitFor;
{
const ScopedLock sl (lock);
OwnedArray<ThreadPoolJob> deletionList;
for (int i = jobs.size(); --i >= 0;)
{
ThreadPoolJob* const job = jobs.getUnchecked(i);
const ScopedLock sl (lock);
if (selectedJobsToRemove == nullptr || selectedJobsToRemove->isJobSuitable (job))
for (int i = jobs.size(); --i >= 0;)
{
if (job->isActive)
{
jobsToWaitFor.add (job);
ThreadPoolJob* const job = jobs.getUnchecked(i);
if (interruptRunningJobs)
job->signalJobShouldExit();
}
else
if (selectedJobsToRemove == nullptr || selectedJobsToRemove->isJobSuitable (job))
{
jobs.remove (i);
if (job->isActive)
{
jobsToWaitFor.add (job);
if (deleteInactiveJobs)
delete job;
if (interruptRunningJobs)
job->signalJobShouldExit();
}
else
job->pool = nullptr;
{
jobs.remove (i);
addToDeleteList (deletionList, job);
}
}
}
}
@ -274,8 +251,12 @@ bool ThreadPool::removeAllJobs (const bool interruptRunningJobs,
for (;;)
{
for (int i = jobsToWaitFor.size(); --i >= 0;)
if (! isJobRunning (jobsToWaitFor.getUnchecked (i)))
{
ThreadPoolJob* const job = jobsToWaitFor.getUnchecked (i);
if (! isJobRunning (job))
jobsToWaitFor.remove (i);
}
if (jobsToWaitFor.size() == 0)
break;
@ -308,96 +289,90 @@ bool ThreadPool::setThreadPriorities (const int newPriority)
{
bool ok = true;
if (priority != newPriority)
{
priority = newPriority;
for (int i = threads.size(); --i >= 0;)
if (! threads.getUnchecked(i)->setPriority (newPriority))
ok = false;
}
for (int i = threads.size(); --i >= 0;)
if (! threads.getUnchecked(i)->setPriority (newPriority))
ok = false;
return ok;
}
bool ThreadPool::runNextJob()
ThreadPoolJob* ThreadPool::pickNextJobToRun()
{
ThreadPoolJob* job = nullptr;
OwnedArray<ThreadPoolJob> deletionList;
{
const ScopedLock sl (lock);
for (int i = 0; i < jobs.size(); ++i)
{
job = jobs[i];
ThreadPoolJob* job = jobs[i];
if (job != nullptr && ! (job->isActive || job->shouldStop))
break;
job = nullptr;
}
if (job != nullptr)
job->isActive = true;
}
if (job != nullptr)
{
JUCE_TRY
{
ThreadPoolJob::JobStatus result = job->runJob();
lastJobEndTime = Time::getApproximateMillisecondCounter();
const ScopedLock sl (lock);
if (jobs.contains (job))
if (job != nullptr && ! job->isActive)
{
job->isActive = false;
if (result != ThreadPoolJob::jobNeedsRunningAgain || job->shouldStop)
if (job->shouldStop)
{
job->pool = nullptr;
job->shouldStop = true;
jobs.removeValue (job);
if (result == ThreadPoolJob::jobHasFinishedAndShouldBeDeleted)
delete job;
jobFinishedSignal.signal();
}
else
{
// move the job to the end of the queue if it wants another go
jobs.move (jobs.indexOf (job), -1);
jobs.remove (i);
addToDeleteList (deletionList, job);
--i;
continue;
}
job->isActive = true;
return job;
}
}
#if JUCE_CATCH_UNHANDLED_EXCEPTIONS
catch (...)
{
const ScopedLock sl (lock);
jobs.removeValue (job);
}
#endif
}
else
{
if (threadStopTimeout > 0
&& Time::getApproximateMillisecondCounter() > lastJobEndTime + threadStopTimeout)
{
const ScopedLock sl (lock);
if (jobs.size() == 0)
for (int i = threads.size(); --i >= 0;)
threads.getUnchecked(i)->signalThreadShouldExit();
}
else
return nullptr;
}
bool ThreadPool::runNextJob()
{
ThreadPoolJob* const job = pickNextJobToRun();
if (job == nullptr)
return false;
ThreadPoolJob::JobStatus result = ThreadPoolJob::jobHasFinished;
JUCE_TRY
{
result = job->runJob();
}
JUCE_CATCH_ALL_ASSERT
OwnedArray<ThreadPoolJob> deletionList;
{
const ScopedLock sl (lock);
if (jobs.contains (job))
{
return false;
job->isActive = false;
if (result != ThreadPoolJob::jobNeedsRunningAgain || job->shouldStop)
{
jobs.removeValue (job);
addToDeleteList (deletionList, job);
jobFinishedSignal.signal();
}
else
{
// move the job to the end of the queue if it wants another go
jobs.move (jobs.indexOf (job), -1);
}
}
}
return true;
}
void ThreadPool::addToDeleteList (OwnedArray<ThreadPoolJob>& deletionList, ThreadPoolJob* const job) const
{
job->shouldStop = true;
job->pool = nullptr;
if (job->shouldBeDeleted)
deletionList.add (job);
}

View file

@ -53,7 +53,6 @@ class JUCE_API ThreadPoolJob
public:
//==============================================================================
/** Creates a thread pool job object.
After creating your job, add it to a thread pool with ThreadPool::addJob().
*/
explicit ThreadPoolJob (const String& name);
@ -80,9 +79,6 @@ public:
jobHasFinished = 0, /**< indicates that the job has finished and can be
removed from the pool. */
jobHasFinishedAndShouldBeDeleted, /**< indicates that the job has finished and that it
should be automatically deleted by the pool. */
jobNeedsRunningAgain /**< indicates that the job would like to be called
again when a thread is free. */
};
@ -106,7 +102,7 @@ public:
//==============================================================================
/** Returns true if this job is currently running its runJob() method. */
bool isRunning() const { return isActive; }
bool isRunning() const noexcept { return isActive; }
/** Returns true if something is trying to interrupt this job and make it stop.
@ -115,7 +111,7 @@ public:
@see signalJobShouldExit()
*/
bool shouldExit() const { return shouldStop; }
bool shouldExit() const noexcept { return shouldStop; }
/** Calling this will cause the shouldExit() method to return true, and the job
should (if it's been implemented correctly) stop as soon as possible.
@ -150,22 +146,19 @@ class JUCE_API ThreadPool
public:
//==============================================================================
/** Creates a thread pool.
Once you've created a pool, you can give it some things to do with the addJob()
method.
@param numberOfThreads the maximum number of actual threads to run.
@param startThreadsOnlyWhenNeeded if this is true, then no threads will be started
until there are some jobs to run. If false, then
all the threads will be fired-up immediately so that
they're ready for action
@param stopThreadsWhenNotUsedTimeoutMs if this timeout is > 0, then if any threads have been
inactive for this length of time, they will automatically
be stopped until more jobs come along and they're needed
Once you've created a pool, you can give it some jobs by calling addJob().
@param numberOfThreads the number of threads to run. These will be started
immediately, and will run until the pool is deleted.
*/
ThreadPool (int numberOfThreads,
bool startThreadsOnlyWhenNeeded = true,
int stopThreadsWhenNotUsedTimeoutMs = 5000);
ThreadPool (int numberOfThreads);
/** Creates a thread pool with one thread per CPU core.
Once you've created a pool, you can give it some jobs by calling addJob().
If you want to specify the number of threads, use the other constructor; this
one creates a pool which has one thread for each CPU core.
@see SystemStats::getNumCpus()
*/
ThreadPool();
/** Destructor.
@ -200,8 +193,17 @@ public:
the job's ThreadPoolJob::runJob() method. Depending on the return value of the
runJob() method, the pool will either remove the job from the pool or add it to
the back of the queue to be run again.
If deleteJobWhenFinished is true, then the job object will be owned and deleted by
the pool when not needed - if you do this, make sure that your object's destructor
is thread-safe.
If deleteJobWhenFinished is false, the pointer will be used but not deleted, and
the caller is responsible for making sure the object is not deleted before it has
been removed from the pool.
*/
void addJob (ThreadPoolJob* job);
void addJob (ThreadPoolJob* job,
bool deleteJobWhenFinished);
/** Tries to remove a job from the pool.
@ -230,10 +232,6 @@ public:
methods called to try to interrupt them
@param timeOutMilliseconds the length of time this method should wait for all the jobs to finish
before giving up and returning false
@param deleteInactiveJobs if true, any jobs that aren't currently running will be deleted. If false,
they will simply be removed from the pool. Jobs that are already running when
this method is called can choose whether they should be deleted by
returning jobHasFinishedAndShouldBeDeleted from their runJob() method.
@param selectedJobsToRemove if this is non-zero, the JobSelector object is asked to decide which
jobs should be removed. If it is zero, all jobs are removed
@returns true if all jobs are successfully stopped and removed; false if the timeout period
@ -241,7 +239,6 @@ public:
*/
bool removeAllJobs (bool interruptRunningJobs,
int timeOutMilliseconds,
bool deleteInactiveJobs = false,
JobSelector* selectedJobsToRemove = nullptr);
/** Returns the number of jobs currently running or queued.
@ -277,7 +274,6 @@ public:
int timeOutMilliseconds) const;
/** Returns a list of the names of all the jobs currently running or queued.
If onlyReturnActiveJobs is true, only the ones currently running are returned.
*/
StringArray getNamesOfAllJobs (bool onlyReturnActiveJobs) const;
@ -292,19 +288,25 @@ public:
private:
//==============================================================================
const int threadStopTimeout;
int priority;
class ThreadPoolThread;
friend class OwnedArray <ThreadPoolThread>;
OwnedArray <ThreadPoolThread> threads;
Array <ThreadPoolJob*> jobs;
class ThreadPoolThread;
friend class ThreadPoolThread;
friend class OwnedArray <ThreadPoolThread>;
OwnedArray <ThreadPoolThread> threads;
CriticalSection lock;
uint32 lastJobEndTime;
WaitableEvent jobFinishedSignal;
friend class ThreadPoolThread;
bool runNextJob();
ThreadPoolJob* pickNextJobToRun();
void addToDeleteList (OwnedArray<ThreadPoolJob>&, ThreadPoolJob*) const;
void createThreads (int numThreads);
void stopThreads();
// Note that this method has changed, and no longer has a parameter to indicate
// whether the jobs should be deleted - see the new method for details.
void removeAllJobs (bool, int, bool);
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ThreadPool);
};