diff --git a/extras/Introjucer/Source/Project Saving/jucer_ProjectSaver.h b/extras/Introjucer/Source/Project Saving/jucer_ProjectSaver.h index 3bd1ccea30..b3e0dbf608 100644 --- a/extras/Introjucer/Source/Project Saving/jucer_ProjectSaver.h +++ b/extras/Introjucer/Source/Project Saving/jucer_ProjectSaver.h @@ -417,7 +417,7 @@ private: void writeProjects (const OwnedArray& modules) { - ThreadPool threadPool (4, false, 30000); + ThreadPool threadPool; // keep a copy of the basic generated files group, as each exporter may modify it. const ValueTree originalGeneratedGroup (generatedFilesGroup.state.createCopy()); @@ -437,7 +437,7 @@ private: sortGroupRecursively (generatedFilesGroup); exporter->groups.add (generatedFilesGroup); - threadPool.addJob (new ExporterJob (*this, exporter.exporter.release(), modules)); + threadPool.addJob (new ExporterJob (*this, exporter.exporter.release(), modules), true); } else { @@ -471,7 +471,7 @@ private: owner.addError (error.message); } - return jobHasFinishedAndShouldBeDeleted; + return jobHasFinished; } private: diff --git a/extras/JuceDemo/Source/demos/ThreadingDemo.cpp b/extras/JuceDemo/Source/demos/ThreadingDemo.cpp index d86a17c5cf..270006dd2a 100644 --- a/extras/JuceDemo/Source/demos/ThreadingDemo.cpp +++ b/extras/JuceDemo/Source/demos/ThreadingDemo.cpp @@ -177,10 +177,6 @@ public: { } - ~DemoThreadPoolJob() - { - } - JobStatus runJob() { // this is the code that runs this job. It'll be repeatedly called until we return @@ -252,7 +248,7 @@ public: while (balls.size() < 5) addABall(); - startTimer (2000); + startTimer (300); } } @@ -276,7 +272,7 @@ public: addAndMakeVisible (newBall); newBall->parentSizeChanged(); - pool.addJob (newBall); + pool.addJob (newBall, false); } else { diff --git a/modules/juce_core/system/juce_SystemStats.h b/modules/juce_core/system/juce_SystemStats.h index 5ca17771a7..7d67acef06 100644 --- a/modules/juce_core/system/juce_SystemStats.h +++ b/modules/juce_core/system/juce_SystemStats.h @@ -112,6 +112,9 @@ public: //============================================================================== // CPU and memory information.. + /** Returns the number of CPUs. */ + static int getNumCpus() noexcept { return getCPUFlags().numCpus; } + /** Returns the approximate CPU speed. @returns the speed in megahertz, e.g. 1500, 2500, 32000 (depending on @@ -137,9 +140,6 @@ public: /** Checks whether AMD 3DNOW instructions are available. */ static bool has3DNow() noexcept { return getCPUFlags().has3DNow; } - /** Returns the number of CPUs. */ - static int getNumCpus() noexcept { return getCPUFlags().numCpus; } - //============================================================================== /** Finds out how much RAM is in the machine. diff --git a/modules/juce_core/threads/juce_ThreadPool.cpp b/modules/juce_core/threads/juce_ThreadPool.cpp index 5aa2c5bf5d..f57e988d58 100644 --- a/modules/juce_core/threads/juce_ThreadPool.cpp +++ b/modules/juce_core/threads/juce_ThreadPool.cpp @@ -55,15 +55,13 @@ void ThreadPoolJob::signalJobShouldExit() shouldStop = true; } - //============================================================================== class ThreadPool::ThreadPoolThread : public Thread { public: ThreadPoolThread (ThreadPool& pool_) : Thread ("Pool"), - pool (pool_), - busy (false) + pool (pool_) { } @@ -78,41 +76,48 @@ public: private: ThreadPool& pool; - bool volatile busy; JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ThreadPoolThread); }; //============================================================================== -ThreadPool::ThreadPool (const int numThreads, - const bool startThreadsOnlyWhenNeeded, - const int stopThreadsWhenNotUsedTimeoutMs) - : threadStopTimeout (stopThreadsWhenNotUsedTimeoutMs), - priority (5) +ThreadPool::ThreadPool (const int numThreads) { - jassert (numThreads > 0); // not much point having one of these with no threads in it. + jassert (numThreads > 0); // not much point having a pool without any threads! - for (int i = jmax (1, numThreads); --i >= 0;) - threads.add (new ThreadPoolThread (*this)); + createThreads (numThreads); +} - if (! startThreadsOnlyWhenNeeded) - for (int i = threads.size(); --i >= 0;) - threads.getUnchecked(i)->startThread (priority); +ThreadPool::ThreadPool() +{ + createThreads (SystemStats::getNumCpus()); } ThreadPool::~ThreadPool() { - removeAllJobs (true, 4000); + removeAllJobs (true, 5000); + stopThreads(); +} - int i; - for (i = threads.size(); --i >= 0;) +void ThreadPool::createThreads (int numThreads) +{ + for (int i = jmax (1, numThreads); --i >= 0;) + threads.add (new ThreadPoolThread (*this)); + + for (int i = threads.size(); --i >= 0;) + threads.getUnchecked(i)->startThread(); +} + +void ThreadPool::stopThreads() +{ + for (int i = threads.size(); --i >= 0;) threads.getUnchecked(i)->signalThreadShouldExit(); - for (i = threads.size(); --i >= 0;) + for (int i = threads.size(); --i >= 0;) threads.getUnchecked(i)->stopThread (500); } -void ThreadPool::addJob (ThreadPoolJob* const job) +void ThreadPool::addJob (ThreadPoolJob* const job, const bool deleteJobWhenFinished) { jassert (job != nullptr); jassert (job->pool == nullptr); @@ -122,38 +127,11 @@ void ThreadPool::addJob (ThreadPoolJob* const job) job->pool = this; job->shouldStop = false; job->isActive = false; + job->shouldBeDeleted = deleteJobWhenFinished; { const ScopedLock sl (lock); jobs.add (job); - - int numRunning = 0; - - for (int i = threads.size(); --i >= 0;) - if (threads.getUnchecked(i)->isThreadRunning() && ! threads.getUnchecked(i)->threadShouldExit()) - ++numRunning; - - if (numRunning < threads.size()) - { - bool startedOne = false; - int n = 1000; - - while (--n >= 0 && ! startedOne) - { - for (int i = threads.size(); --i >= 0;) - { - if (! threads.getUnchecked(i)->isThreadRunning()) - { - threads.getUnchecked(i)->startThread (priority); - startedOne = true; - break; - } - } - - if (! startedOne) - Thread::sleep (2); - } - } } for (int i = threads.size(); --i >= 0;) @@ -208,6 +186,7 @@ bool ThreadPool::removeJob (ThreadPoolJob* const job, const int timeOutMs) { bool dontWait = true; + OwnedArray deletionList; if (job != nullptr) { @@ -225,7 +204,7 @@ bool ThreadPool::removeJob (ThreadPoolJob* const job, else { jobs.removeValue (job); - job->pool = nullptr; + addToDeleteList (deletionList, job); } } } @@ -233,37 +212,35 @@ bool ThreadPool::removeJob (ThreadPoolJob* const job, return dontWait || waitForJobToFinish (job, timeOutMs); } -bool ThreadPool::removeAllJobs (const bool interruptRunningJobs, - const int timeOutMs, - const bool deleteInactiveJobs, +bool ThreadPool::removeAllJobs (const bool interruptRunningJobs, const int timeOutMs, ThreadPool::JobSelector* selectedJobsToRemove) { Array jobsToWaitFor; { - const ScopedLock sl (lock); + OwnedArray deletionList; - for (int i = jobs.size(); --i >= 0;) { - ThreadPoolJob* const job = jobs.getUnchecked(i); + const ScopedLock sl (lock); - if (selectedJobsToRemove == nullptr || selectedJobsToRemove->isJobSuitable (job)) + for (int i = jobs.size(); --i >= 0;) { - if (job->isActive) - { - jobsToWaitFor.add (job); + ThreadPoolJob* const job = jobs.getUnchecked(i); - if (interruptRunningJobs) - job->signalJobShouldExit(); - } - else + if (selectedJobsToRemove == nullptr || selectedJobsToRemove->isJobSuitable (job)) { - jobs.remove (i); + if (job->isActive) + { + jobsToWaitFor.add (job); - if (deleteInactiveJobs) - delete job; + if (interruptRunningJobs) + job->signalJobShouldExit(); + } else - job->pool = nullptr; + { + jobs.remove (i); + addToDeleteList (deletionList, job); + } } } } @@ -274,8 +251,12 @@ bool ThreadPool::removeAllJobs (const bool interruptRunningJobs, for (;;) { for (int i = jobsToWaitFor.size(); --i >= 0;) - if (! isJobRunning (jobsToWaitFor.getUnchecked (i))) + { + ThreadPoolJob* const job = jobsToWaitFor.getUnchecked (i); + + if (! isJobRunning (job)) jobsToWaitFor.remove (i); + } if (jobsToWaitFor.size() == 0) break; @@ -308,96 +289,90 @@ bool ThreadPool::setThreadPriorities (const int newPriority) { bool ok = true; - if (priority != newPriority) - { - priority = newPriority; - - for (int i = threads.size(); --i >= 0;) - if (! threads.getUnchecked(i)->setPriority (newPriority)) - ok = false; - } + for (int i = threads.size(); --i >= 0;) + if (! threads.getUnchecked(i)->setPriority (newPriority)) + ok = false; return ok; } -bool ThreadPool::runNextJob() +ThreadPoolJob* ThreadPool::pickNextJobToRun() { - ThreadPoolJob* job = nullptr; + OwnedArray deletionList; { const ScopedLock sl (lock); for (int i = 0; i < jobs.size(); ++i) { - job = jobs[i]; + ThreadPoolJob* job = jobs[i]; - if (job != nullptr && ! (job->isActive || job->shouldStop)) - break; - - job = nullptr; - } - - if (job != nullptr) - job->isActive = true; - - } - - if (job != nullptr) - { - JUCE_TRY - { - ThreadPoolJob::JobStatus result = job->runJob(); - - lastJobEndTime = Time::getApproximateMillisecondCounter(); - - const ScopedLock sl (lock); - - if (jobs.contains (job)) + if (job != nullptr && ! job->isActive) { - job->isActive = false; - - if (result != ThreadPoolJob::jobNeedsRunningAgain || job->shouldStop) + if (job->shouldStop) { - job->pool = nullptr; - job->shouldStop = true; - jobs.removeValue (job); - - if (result == ThreadPoolJob::jobHasFinishedAndShouldBeDeleted) - delete job; - - jobFinishedSignal.signal(); - } - else - { - // move the job to the end of the queue if it wants another go - jobs.move (jobs.indexOf (job), -1); + jobs.remove (i); + addToDeleteList (deletionList, job); + --i; + continue; } + + job->isActive = true; + return job; } } -#if JUCE_CATCH_UNHANDLED_EXCEPTIONS - catch (...) - { - const ScopedLock sl (lock); - jobs.removeValue (job); - } -#endif } - else - { - if (threadStopTimeout > 0 - && Time::getApproximateMillisecondCounter() > lastJobEndTime + threadStopTimeout) - { - const ScopedLock sl (lock); - if (jobs.size() == 0) - for (int i = threads.size(); --i >= 0;) - threads.getUnchecked(i)->signalThreadShouldExit(); - } - else + return nullptr; +} + +bool ThreadPool::runNextJob() +{ + ThreadPoolJob* const job = pickNextJobToRun(); + + if (job == nullptr) + return false; + + ThreadPoolJob::JobStatus result = ThreadPoolJob::jobHasFinished; + + JUCE_TRY + { + result = job->runJob(); + } + JUCE_CATCH_ALL_ASSERT + + OwnedArray deletionList; + + { + const ScopedLock sl (lock); + + if (jobs.contains (job)) { - return false; + job->isActive = false; + + if (result != ThreadPoolJob::jobNeedsRunningAgain || job->shouldStop) + { + jobs.removeValue (job); + addToDeleteList (deletionList, job); + + jobFinishedSignal.signal(); + } + else + { + // move the job to the end of the queue if it wants another go + jobs.move (jobs.indexOf (job), -1); + } } } return true; } + +void ThreadPool::addToDeleteList (OwnedArray& deletionList, ThreadPoolJob* const job) const +{ + job->shouldStop = true; + job->pool = nullptr; + + if (job->shouldBeDeleted) + deletionList.add (job); +} diff --git a/modules/juce_core/threads/juce_ThreadPool.h b/modules/juce_core/threads/juce_ThreadPool.h index 4895a8b032..1e6744f6d6 100644 --- a/modules/juce_core/threads/juce_ThreadPool.h +++ b/modules/juce_core/threads/juce_ThreadPool.h @@ -53,7 +53,6 @@ class JUCE_API ThreadPoolJob public: //============================================================================== /** Creates a thread pool job object. - After creating your job, add it to a thread pool with ThreadPool::addJob(). */ explicit ThreadPoolJob (const String& name); @@ -80,9 +79,6 @@ public: jobHasFinished = 0, /**< indicates that the job has finished and can be removed from the pool. */ - jobHasFinishedAndShouldBeDeleted, /**< indicates that the job has finished and that it - should be automatically deleted by the pool. */ - jobNeedsRunningAgain /**< indicates that the job would like to be called again when a thread is free. */ }; @@ -106,7 +102,7 @@ public: //============================================================================== /** Returns true if this job is currently running its runJob() method. */ - bool isRunning() const { return isActive; } + bool isRunning() const noexcept { return isActive; } /** Returns true if something is trying to interrupt this job and make it stop. @@ -115,7 +111,7 @@ public: @see signalJobShouldExit() */ - bool shouldExit() const { return shouldStop; } + bool shouldExit() const noexcept { return shouldStop; } /** Calling this will cause the shouldExit() method to return true, and the job should (if it's been implemented correctly) stop as soon as possible. @@ -150,22 +146,19 @@ class JUCE_API ThreadPool public: //============================================================================== /** Creates a thread pool. - - Once you've created a pool, you can give it some things to do with the addJob() - method. - - @param numberOfThreads the maximum number of actual threads to run. - @param startThreadsOnlyWhenNeeded if this is true, then no threads will be started - until there are some jobs to run. If false, then - all the threads will be fired-up immediately so that - they're ready for action - @param stopThreadsWhenNotUsedTimeoutMs if this timeout is > 0, then if any threads have been - inactive for this length of time, they will automatically - be stopped until more jobs come along and they're needed + Once you've created a pool, you can give it some jobs by calling addJob(). + @param numberOfThreads the number of threads to run. These will be started + immediately, and will run until the pool is deleted. */ - ThreadPool (int numberOfThreads, - bool startThreadsOnlyWhenNeeded = true, - int stopThreadsWhenNotUsedTimeoutMs = 5000); + ThreadPool (int numberOfThreads); + + /** Creates a thread pool with one thread per CPU core. + Once you've created a pool, you can give it some jobs by calling addJob(). + If you want to specify the number of threads, use the other constructor; this + one creates a pool which has one thread for each CPU core. + @see SystemStats::getNumCpus() + */ + ThreadPool(); /** Destructor. @@ -200,8 +193,17 @@ public: the job's ThreadPoolJob::runJob() method. Depending on the return value of the runJob() method, the pool will either remove the job from the pool or add it to the back of the queue to be run again. + + If deleteJobWhenFinished is true, then the job object will be owned and deleted by + the pool when not needed - if you do this, make sure that your object's destructor + is thread-safe. + + If deleteJobWhenFinished is false, the pointer will be used but not deleted, and + the caller is responsible for making sure the object is not deleted before it has + been removed from the pool. */ - void addJob (ThreadPoolJob* job); + void addJob (ThreadPoolJob* job, + bool deleteJobWhenFinished); /** Tries to remove a job from the pool. @@ -230,10 +232,6 @@ public: methods called to try to interrupt them @param timeOutMilliseconds the length of time this method should wait for all the jobs to finish before giving up and returning false - @param deleteInactiveJobs if true, any jobs that aren't currently running will be deleted. If false, - they will simply be removed from the pool. Jobs that are already running when - this method is called can choose whether they should be deleted by - returning jobHasFinishedAndShouldBeDeleted from their runJob() method. @param selectedJobsToRemove if this is non-zero, the JobSelector object is asked to decide which jobs should be removed. If it is zero, all jobs are removed @returns true if all jobs are successfully stopped and removed; false if the timeout period @@ -241,7 +239,6 @@ public: */ bool removeAllJobs (bool interruptRunningJobs, int timeOutMilliseconds, - bool deleteInactiveJobs = false, JobSelector* selectedJobsToRemove = nullptr); /** Returns the number of jobs currently running or queued. @@ -277,7 +274,6 @@ public: int timeOutMilliseconds) const; /** Returns a list of the names of all the jobs currently running or queued. - If onlyReturnActiveJobs is true, only the ones currently running are returned. */ StringArray getNamesOfAllJobs (bool onlyReturnActiveJobs) const; @@ -292,19 +288,25 @@ public: private: //============================================================================== - const int threadStopTimeout; - int priority; - class ThreadPoolThread; - friend class OwnedArray ; - OwnedArray threads; Array jobs; + class ThreadPoolThread; + friend class ThreadPoolThread; + friend class OwnedArray ; + OwnedArray threads; + CriticalSection lock; - uint32 lastJobEndTime; WaitableEvent jobFinishedSignal; - friend class ThreadPoolThread; bool runNextJob(); + ThreadPoolJob* pickNextJobToRun(); + void addToDeleteList (OwnedArray&, ThreadPoolJob*) const; + void createThreads (int numThreads); + void stopThreads(); + + // Note that this method has changed, and no longer has a parameter to indicate + // whether the jobs should be deleted - see the new method for details. + void removeAllJobs (bool, int, bool); JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ThreadPool); };