Commit 2977670d authored by Stefan Westerfeld's avatar Stefan Westerfeld

Implement ThreadPool class.

Signed-off-by: Stefan Westerfeld's avatarStefan Westerfeld <stefan@space.twc.de>
parent 98acfe1d
...@@ -6,13 +6,13 @@ COMMON_SRC = utils.hh utils.cc convcode.hh convcode.cc random.hh random.cc wavda ...@@ -6,13 +6,13 @@ COMMON_SRC = utils.hh utils.cc convcode.hh convcode.cc random.hh random.cc wavda
sfoutputstream.cc sfoutputstream.hh rawinputstream.cc rawinputstream.hh rawoutputstream.cc rawoutputstream.hh \ sfoutputstream.cc sfoutputstream.hh rawinputstream.cc rawinputstream.hh rawoutputstream.cc rawoutputstream.hh \
rawconverter.cc rawconverter.hh mp3inputstream.cc mp3inputstream.hh wmcommon.cc wmcommon.hh fft.cc fft.hh \ rawconverter.cc rawconverter.hh mp3inputstream.cc mp3inputstream.hh wmcommon.cc wmcommon.hh fft.cc fft.hh \
limiter.cc limiter.hh shortcode.cc shortcode.hh mpegts.cc mpegts.hh hls.cc hls.hh audiobuffer.hh \ limiter.cc limiter.hh shortcode.cc shortcode.hh mpegts.cc mpegts.hh hls.cc hls.hh audiobuffer.hh \
wmget.cc wmadd.cc syncfinder.cc syncfinder.hh wmspeed.cc wmspeed.hh wmget.cc wmadd.cc syncfinder.cc syncfinder.hh wmspeed.cc wmspeed.hh threadpool.cc threadpool.hh
COMMON_LIBS = $(SNDFILE_LIBS) $(FFTW_LIBS) $(LIBGCRYPT_LIBS) $(LIBMPG123_LIBS) $(FFMPEG_LIBS) COMMON_LIBS = $(SNDFILE_LIBS) $(FFTW_LIBS) $(LIBGCRYPT_LIBS) $(LIBMPG123_LIBS) $(FFMPEG_LIBS)
audiowmark_SOURCES = audiowmark.cc $(COMMON_SRC) audiowmark_SOURCES = audiowmark.cc $(COMMON_SRC)
audiowmark_LDFLAGS = $(COMMON_LIBS) audiowmark_LDFLAGS = $(COMMON_LIBS)
noinst_PROGRAMS = testconvcode testrandom testmp3 teststream testlimiter testshortcode testmpegts noinst_PROGRAMS = testconvcode testrandom testmp3 teststream testlimiter testshortcode testmpegts testthreadpool
testconvcode_SOURCES = testconvcode.cc $(COMMON_SRC) testconvcode_SOURCES = testconvcode.cc $(COMMON_SRC)
testconvcode_LDFLAGS = $(COMMON_LIBS) testconvcode_LDFLAGS = $(COMMON_LIBS)
...@@ -35,6 +35,9 @@ testshortcode_LDFLAGS = $(COMMON_LIBS) ...@@ -35,6 +35,9 @@ testshortcode_LDFLAGS = $(COMMON_LIBS)
testmpegts_SOURCES = testmpegts.cc $(COMMON_SRC) testmpegts_SOURCES = testmpegts.cc $(COMMON_SRC)
testmpegts_LDFLAGS = $(COMMON_LIBS) testmpegts_LDFLAGS = $(COMMON_LIBS)
testthreadpool_SOURCES = testthreadpool.cc $(COMMON_SRC)
testthreadpool_LDFLAGS = $(COMMON_LIBS)
if COND_WITH_FFMPEG if COND_WITH_FFMPEG
COMMON_SRC += hlsoutputstream.cc hlsoutputstream.hh COMMON_SRC += hlsoutputstream.cc hlsoutputstream.hh
......
/*
* Copyright (C) 2020 Stefan Westerfeld
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <unistd.h>
#include <thread>
#include <vector>
#include <mutex>
#include <set>
#include <functional>
#include <condition_variable>
#include "threadpool.hh"
int
main()
{
ThreadPool tp;
std::vector<int> ids;
int result1 = 0;
int result2 = 0;
ids.push_back (tp.add_job ([&result1](){printf ("A\n"); sleep (2); printf ("A done\n"); result1 = 123;}));
ids.push_back (tp.add_job ([&result2](){printf ("B\n"); sleep (3); printf ("B done\n"); result2 = 456;}));
tp.wait_jobs (ids);
printf ("===\n");
printf ("results: %d, %d\n", result1, result2);
}
/*
* Copyright (C) 2020 Stefan Westerfeld
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "threadpool.hh"
#include "utils.hh"
bool
ThreadPool::worker_next_job (Job& job)
{
std::unique_lock<std::mutex> lck (mutex);
if (jobs.empty())
cond.wait (lck);
if (jobs.empty())
return false;
job = jobs.front();
jobs.erase (jobs.begin());
return true;
}
void
ThreadPool::worker_run()
{
while (!stop_workers)
{
Job job;
if (worker_next_job (job))
{
job.fun();
std::lock_guard<std::mutex> lg (mutex);
jobs_done.insert (job.id);
main_cond.notify_one();
}
}
}
ThreadPool::ThreadPool()
{
for (unsigned int i = 0; i < std::thread::hardware_concurrency(); i++)
{
threads.push_back (std::thread (&ThreadPool::worker_run, this));
}
}
int
ThreadPool::add_job (std::function<void()> fun)
{
std::lock_guard<std::mutex> lg (mutex);
Job job;
job.fun = fun;
job.id = next_job_id++;
jobs.push_back (job);
cond.notify_one();
return job.id;
}
void
ThreadPool::wait_jobs (std::vector<int>& ids)
{
for (;;)
{
std::unique_lock<std::mutex> lck (mutex);
/* check if at least one of the jobs is still running */
bool done = true;
for (auto id : ids)
if (jobs_done.count (id) == 0)
done = false;
if (done)
{
for (auto id : ids)
jobs_done.erase (id);
return;
}
main_cond.wait (lck);
}
}
ThreadPool::~ThreadPool()
{
{
std::lock_guard<std::mutex> lg (mutex);
stop_workers = true;
cond.notify_all();
}
for (auto& t : threads)
t.join();
if (jobs_done.size())
{
// user must wait for each job before deleting the ThreadPool
error ("audiowmark: %zd open jobs in ThreadPool::~ThreadPool() - this should not happen\n", jobs_done.size());
}
}
/*
* Copyright (C) 2020 Stefan Westerfeld
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef AUDIOWMARK_THREAD_POOL_HH
#define AUDIOWMARK_THREAD_POOL_HH
#include <vector>
#include <thread>
#include <functional>
#include <set>
#include <mutex>
#include <condition_variable>
class ThreadPool
{
std::vector<std::thread> threads;
struct Job
{
std::function<void()> fun;
int id;
};
std::mutex mutex;
std::condition_variable cond;
std::condition_variable main_cond;
std::vector<Job> jobs;
std::set<int> jobs_done;
bool stop_workers = false;
int next_job_id = 1;
bool worker_next_job (Job& job);
void worker_run();
public:
ThreadPool();
~ThreadPool();
int add_job (std::function<void()> fun);
void wait_jobs (std::vector<int>& ids);
};
#endif /* AUDIOWMARK_THREAD_POOL_HH */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment