Commit 370fc240 authored by Stefan Westerfeld's avatar Stefan Westerfeld

Use multiple threads when using --detect-speed.

Signed-off-by: Stefan Westerfeld's avatarStefan Westerfeld <stefan@space.twc.de>
parent b53c234c
...@@ -32,13 +32,12 @@ main() ...@@ -32,13 +32,12 @@ main()
{ {
ThreadPool tp; ThreadPool tp;
std::vector<int> ids;
int result1 = 0; int result1 = 0;
int result2 = 0; int result2 = 0;
ids.push_back (tp.add_job ([&result1](){printf ("A\n"); sleep (2); printf ("A done\n"); result1 = 123;})); tp.add_job ([&result1](){printf ("A\n"); sleep (2); printf ("A done\n"); result1 = 123;});
ids.push_back (tp.add_job ([&result2](){printf ("B\n"); sleep (3); printf ("B done\n"); result2 = 456;})); tp.add_job ([&result2](){printf ("B\n"); sleep (3); printf ("B done\n"); result2 = 456;});
tp.wait_jobs (ids); tp.wait_all();
printf ("===\n"); printf ("===\n");
printf ("results: %d, %d\n", result1, result2); printf ("results: %d, %d\n", result1, result2);
} }
...@@ -45,7 +45,7 @@ ThreadPool::worker_run() ...@@ -45,7 +45,7 @@ ThreadPool::worker_run()
job.fun(); job.fun();
std::lock_guard<std::mutex> lg (mutex); std::lock_guard<std::mutex> lg (mutex);
jobs_done.insert (job.id); jobs_done++;
main_cond.notify_one(); main_cond.notify_one();
} }
...@@ -60,39 +60,27 @@ ThreadPool::ThreadPool() ...@@ -60,39 +60,27 @@ ThreadPool::ThreadPool()
} }
} }
int void
ThreadPool::add_job (std::function<void()> fun) ThreadPool::add_job (std::function<void()> fun)
{ {
std::lock_guard<std::mutex> lg (mutex); std::lock_guard<std::mutex> lg (mutex);
Job job; Job job;
job.fun = fun; job.fun = fun;
job.id = next_job_id++;
jobs.push_back (job); jobs.push_back (job);
jobs_added++;
cond.notify_one(); cond.notify_one();
return job.id;
} }
void void
ThreadPool::wait_jobs (std::vector<int>& ids) ThreadPool::wait_all()
{ {
for (;;) for (;;)
{ {
std::unique_lock<std::mutex> lck (mutex); std::unique_lock<std::mutex> lck (mutex);
/* check if at least one of the jobs is still running */ if (jobs_added == jobs_done)
bool done = true; return;
for (auto id : ids)
if (jobs_done.count (id) == 0)
done = false;
if (done)
{
for (auto id : ids)
jobs_done.erase (id);
return;
}
main_cond.wait (lck); main_cond.wait (lck);
} }
...@@ -109,9 +97,9 @@ ThreadPool::~ThreadPool() ...@@ -109,9 +97,9 @@ ThreadPool::~ThreadPool()
for (auto& t : threads) for (auto& t : threads)
t.join(); t.join();
if (jobs_done.size()) if (jobs_added != jobs_done)
{ {
// user must wait for each job before deleting the ThreadPool // user must wait before deleting the ThreadPool
error ("audiowmark: %zd open jobs in ThreadPool::~ThreadPool() - this should not happen\n", jobs_done.size()); error ("audiowmark: open jobs in ThreadPool::~ThreadPool() [added=%zd, done=%zd] - this should not happen\n", jobs_added, jobs_done);
} }
} }
...@@ -32,16 +32,15 @@ class ThreadPool ...@@ -32,16 +32,15 @@ class ThreadPool
struct Job struct Job
{ {
std::function<void()> fun; std::function<void()> fun;
int id;
}; };
std::mutex mutex; std::mutex mutex;
std::condition_variable cond; std::condition_variable cond;
std::condition_variable main_cond; std::condition_variable main_cond;
std::vector<Job> jobs; std::vector<Job> jobs;
std::set<int> jobs_done; size_t jobs_added = 0;
size_t jobs_done = 0;
bool stop_workers = false; bool stop_workers = false;
int next_job_id = 1;
bool worker_next_job (Job& job); bool worker_next_job (Job& job);
void worker_run(); void worker_run();
...@@ -50,8 +49,8 @@ public: ...@@ -50,8 +49,8 @@ public:
ThreadPool(); ThreadPool();
~ThreadPool(); ~ThreadPool();
int add_job (std::function<void()> fun); void add_job (std::function<void()> fun);
void wait_jobs (std::vector<int>& ids); void wait_all();
}; };
#endif /* AUDIOWMARK_THREAD_POOL_HH */ #endif /* AUDIOWMARK_THREAD_POOL_HH */
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "wmspeed.hh" #include "wmspeed.hh"
#include "wmcommon.hh" #include "wmcommon.hh"
#include "syncfinder.hh" #include "syncfinder.hh"
#include "threadpool.hh"
#include "fft.hh" #include "fft.hh"
#include <algorithm> #include <algorithm>
...@@ -143,7 +144,7 @@ class SpeedSync ...@@ -143,7 +144,7 @@ class SpeedSync
public: public:
struct Score struct Score
{ {
double speed = 0;; double speed = 0;
double quality = 0; double quality = 0;
}; };
private: private:
...@@ -153,22 +154,52 @@ private: ...@@ -153,22 +154,52 @@ private:
}; };
vector<vector<SyncFinder::FrameBit>> sync_bits; vector<vector<SyncFinder::FrameBit>> sync_bits;
vector<vector<Mags>> fft_sync_bits; vector<vector<Mags>> fft_sync_bits;
void prepare_mags (const WavData& in_data, double center, double seconds); void prepare_mags();
Score compare (double relative_speed, double center); Score compare (double relative_speed);
std::mutex mutex;
vector<Score> result_scores;
const WavData& in_data;
const double center;
const double step;
const int n_steps;
const double seconds;
public: public:
vector<Score> SpeedSync (const WavData& in_data, double center, double step, int n_steps, double seconds) :
search (const WavData& in_data, double center, double step, int n_steps, double seconds) in_data (in_data),
center (center),
step (step),
n_steps (n_steps),
seconds (seconds)
{
}
void
prepare_job (ThreadPool& thread_pool)
{ {
prepare_mags (in_data, center, seconds); thread_pool.add_job ([this]() { prepare_mags(); });
}
vector<Score> scores; void
search (ThreadPool& thread_pool)
{
for (int p = -n_steps; p <= n_steps; p++) for (int p = -n_steps; p <= n_steps; p++)
{ {
const double relative_speed = pow (step, p); const double relative_speed = pow (step, p);
scores.push_back (compare (relative_speed, center)); thread_pool.add_job ([relative_speed, this]()
{
Score score = compare (relative_speed);
std::lock_guard<std::mutex> lg (mutex);
result_scores.push_back (score);
});
} }
return scores; }
vector<Score>
get_scores()
{
return result_scores;
} }
}; };
...@@ -181,16 +212,43 @@ speed_scan (const WavData& in_data) ...@@ -181,16 +212,43 @@ speed_scan (const WavData& in_data)
const int n_center_steps = 28; const int n_center_steps = 28;
const int n_steps = 5; const int n_steps = 5;
const double step = 1.0007; const double step = 1.0007;
vector<std::unique_ptr<SpeedSync>> speed_sync;
auto t = get_time();
ThreadPool thread_pool;
for (int c = -n_center_steps; c <= n_center_steps; c++) for (int c = -n_center_steps; c <= n_center_steps; c++)
{ {
double c_speed = pow (step, c * (n_steps * 2 + 1)); double c_speed = pow (step, c * (n_steps * 2 + 1));
SpeedSync speed_sync; speed_sync.push_back (std::make_unique<SpeedSync> (in_data, c_speed, step, n_steps, /* seconds */ 21));
vector<SpeedSync::Score> step_scores = speed_sync.search (in_data, c_speed, step, n_steps, /* seconds */ 21);
scores.insert (scores.end(), step_scores.begin(), step_scores.end());
} }
sort (scores.begin(), scores.end(), [] (SpeedSync::Score s_a, SpeedSync::Score s_b) { return s_a.quality > s_b.quality; }); for (auto& s : speed_sync)
s->prepare_job (thread_pool);
thread_pool.wait_all();
printf ("## wait prepare jobs: %f\n", get_time() - t);
t=get_time();
for (auto& s : speed_sync)
s->search (thread_pool);
thread_pool.wait_all();
printf ("## wait search jobs: %f\n", get_time() - t);
t=get_time();
for (auto& s : speed_sync)
{
vector<SpeedSync::Score> step_scores = s->get_scores();
scores.insert (scores.end(), step_scores.begin(), step_scores.end());
}
sort (scores.begin(), scores.end(), [] (SpeedSync::Score s_a, SpeedSync::Score s_b)
{
if (s_a.quality == s_b.quality)
return s_a.speed > s_b.speed;
return s_a.quality > s_b.quality;
});
// we could search the N best matches, but using the best result works well in practice // we could search the N best matches, but using the best result works well in practice
SpeedSync::Score best_s = scores[0]; SpeedSync::Score best_s = scores[0];
...@@ -210,7 +268,7 @@ window_hamming (double x) /* sharp (rectangle) cutoffs at boundaries */ ...@@ -210,7 +268,7 @@ window_hamming (double x) /* sharp (rectangle) cutoffs at boundaries */
} }
void void
SpeedSync::prepare_mags (const WavData& in_data, double center, double seconds) SpeedSync::prepare_mags()
{ {
WavData in_data_trc (truncate (in_data, seconds / center)); WavData in_data_trc (truncate (in_data, seconds / center));
...@@ -293,7 +351,7 @@ SpeedSync::prepare_mags (const WavData& in_data, double center, double seconds) ...@@ -293,7 +351,7 @@ SpeedSync::prepare_mags (const WavData& in_data, double center, double seconds)
} }
SpeedSync::Score SpeedSync::Score
SpeedSync::compare (double relative_speed, double center) SpeedSync::compare (double relative_speed)
{ {
const int frames_per_block = mark_sync_frame_count() + mark_data_frame_count(); const int frames_per_block = mark_sync_frame_count() + mark_data_frame_count();
const int pad_start = frames_per_block * /* HACK */ 4; const int pad_start = frames_per_block * /* HACK */ 4;
...@@ -429,11 +487,24 @@ detect_speed (const WavData& in_data) ...@@ -429,11 +487,24 @@ detect_speed (const WavData& in_data)
speed = speed_scan (in_clip_short); speed = speed_scan (in_clip_short);
/* second pass: fast refine (not always perfect) */ /* second pass: fast refine (not always perfect) */
SpeedSync speed_sync; double t = get_time();
auto scores = speed_sync.search (in_clip_long, speed, 1.00005, 20, /* seconds */ 50); SpeedSync speed_sync (in_clip_long, speed, 1.00005, 20, /* seconds */ 50);
sort (scores.begin(), scores.end(), [] (SpeedSync::Score s_a, SpeedSync::Score s_b) { return s_a.quality > s_b.quality; }); ThreadPool tp;
speed_sync.prepare_job (tp);
tp.wait_all();
speed_sync.search (tp);
tp.wait_all();
auto scores = speed_sync.get_scores();
sort (scores.begin(), scores.end(), [] (SpeedSync::Score s_a, SpeedSync::Score s_b)
{
if (s_a.quality == s_b.quality)
return s_a.speed > s_b.speed;
return s_a.quality > s_b.quality;
});
if (!scores.empty()) if (!scores.empty())
speed = scores[0].speed; speed = scores[0].speed;
printf ("## time for 2nd pass: %f\n", get_time() - t);
} }
return speed; return speed;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment