Commit 326df6a0 authored by Stefan Westerfeld's avatar Stefan Westerfeld

Use multiple threads when using --detect-speed.

Signed-off-by: Stefan Westerfeld's avatarStefan Westerfeld <stefan@space.twc.de>
parent 2977670d
......@@ -32,13 +32,12 @@ main()
{
ThreadPool tp;
std::vector<int> ids;
int result1 = 0;
int result2 = 0;
ids.push_back (tp.add_job ([&result1](){printf ("A\n"); sleep (2); printf ("A done\n"); result1 = 123;}));
ids.push_back (tp.add_job ([&result2](){printf ("B\n"); sleep (3); printf ("B done\n"); result2 = 456;}));
tp.add_job ([&result1](){printf ("A\n"); sleep (2); printf ("A done\n"); result1 = 123;});
tp.add_job ([&result2](){printf ("B\n"); sleep (3); printf ("B done\n"); result2 = 456;});
tp.wait_jobs (ids);
tp.wait_all();
printf ("===\n");
printf ("results: %d, %d\n", result1, result2);
}
......@@ -45,7 +45,7 @@ ThreadPool::worker_run()
job.fun();
std::lock_guard<std::mutex> lg (mutex);
jobs_done.insert (job.id);
jobs_done++;
main_cond.notify_one();
}
......@@ -60,39 +60,27 @@ ThreadPool::ThreadPool()
}
}
int
void
ThreadPool::add_job (std::function<void()> fun)
{
std::lock_guard<std::mutex> lg (mutex);
Job job;
job.fun = fun;
job.id = next_job_id++;
jobs.push_back (job);
jobs_added++;
cond.notify_one();
return job.id;
}
void
ThreadPool::wait_jobs (std::vector<int>& ids)
ThreadPool::wait_all()
{
for (;;)
{
std::unique_lock<std::mutex> lck (mutex);
/* check if at least one of the jobs is still running */
bool done = true;
for (auto id : ids)
if (jobs_done.count (id) == 0)
done = false;
if (done)
{
for (auto id : ids)
jobs_done.erase (id);
return;
}
if (jobs_added == jobs_done)
return;
main_cond.wait (lck);
}
......@@ -109,9 +97,9 @@ ThreadPool::~ThreadPool()
for (auto& t : threads)
t.join();
if (jobs_done.size())
if (jobs_added != jobs_done)
{
// user must wait for each job before deleting the ThreadPool
error ("audiowmark: %zd open jobs in ThreadPool::~ThreadPool() - this should not happen\n", jobs_done.size());
// user must wait before deleting the ThreadPool
error ("audiowmark: open jobs in ThreadPool::~ThreadPool() [added=%zd, done=%zd] - this should not happen\n", jobs_added, jobs_done);
}
}
......@@ -32,16 +32,15 @@ class ThreadPool
struct Job
{
std::function<void()> fun;
int id;
};
std::mutex mutex;
std::condition_variable cond;
std::condition_variable main_cond;
std::vector<Job> jobs;
std::set<int> jobs_done;
size_t jobs_added = 0;
size_t jobs_done = 0;
bool stop_workers = false;
int next_job_id = 1;
bool worker_next_job (Job& job);
void worker_run();
......@@ -50,8 +49,8 @@ public:
ThreadPool();
~ThreadPool();
int add_job (std::function<void()> fun);
void wait_jobs (std::vector<int>& ids);
void add_job (std::function<void()> fun);
void wait_all();
};
#endif /* AUDIOWMARK_THREAD_POOL_HH */
......@@ -18,6 +18,7 @@
#include "wmspeed.hh"
#include "wmcommon.hh"
#include "syncfinder.hh"
#include "threadpool.hh"
#include "fft.hh"
#include <algorithm>
......@@ -143,7 +144,7 @@ class SpeedSync
public:
struct Score
{
double speed = 0;;
double speed = 0;
double quality = 0;
};
private:
......@@ -153,22 +154,52 @@ private:
};
vector<vector<SyncFinder::FrameBit>> sync_bits;
vector<vector<Mags>> fft_sync_bits;
void prepare_mags (const WavData& in_data, double center, double seconds);
Score compare (double relative_speed, double center);
void prepare_mags();
Score compare (double relative_speed);
std::mutex mutex;
vector<Score> result_scores;
const WavData& in_data;
const double center;
const double step;
const int n_steps;
const double seconds;
public:
vector<Score>
search (const WavData& in_data, double center, double step, int n_steps, double seconds)
SpeedSync (const WavData& in_data, double center, double step, int n_steps, double seconds) :
in_data (in_data),
center (center),
step (step),
n_steps (n_steps),
seconds (seconds)
{
}
void
prepare_job (ThreadPool& thread_pool)
{
prepare_mags (in_data, center, seconds);
thread_pool.add_job ([this]() { prepare_mags(); });
}
vector<Score> scores;
void
search (ThreadPool& thread_pool)
{
for (int p = -n_steps; p <= n_steps; p++)
{
const double relative_speed = pow (step, p);
scores.push_back (compare (relative_speed, center));
thread_pool.add_job ([relative_speed, this]()
{
Score score = compare (relative_speed);
std::lock_guard<std::mutex> lg (mutex);
result_scores.push_back (score);
});
}
return scores;
}
vector<Score>
get_scores()
{
return result_scores;
}
};
......@@ -181,16 +212,43 @@ speed_scan (const WavData& in_data)
const int n_center_steps = 28;
const int n_steps = 5;
const double step = 1.0007;
vector<std::unique_ptr<SpeedSync>> speed_sync;
auto t = get_time();
ThreadPool thread_pool;
for (int c = -n_center_steps; c <= n_center_steps; c++)
{
double c_speed = pow (step, c * (n_steps * 2 + 1));
SpeedSync speed_sync;
vector<SpeedSync::Score> step_scores = speed_sync.search (in_data, c_speed, step, n_steps, /* seconds */ 21);
scores.insert (scores.end(), step_scores.begin(), step_scores.end());
speed_sync.push_back (std::make_unique<SpeedSync> (in_data, c_speed, step, n_steps, /* seconds */ 21));
}
sort (scores.begin(), scores.end(), [] (SpeedSync::Score s_a, SpeedSync::Score s_b) { return s_a.quality > s_b.quality; });
for (auto& s : speed_sync)
s->prepare_job (thread_pool);
thread_pool.wait_all();
printf ("## wait prepare jobs: %f\n", get_time() - t);
t=get_time();
for (auto& s : speed_sync)
s->search (thread_pool);
thread_pool.wait_all();
printf ("## wait search jobs: %f\n", get_time() - t);
t=get_time();
for (auto& s : speed_sync)
{
vector<SpeedSync::Score> step_scores = s->get_scores();
scores.insert (scores.end(), step_scores.begin(), step_scores.end());
}
sort (scores.begin(), scores.end(), [] (SpeedSync::Score s_a, SpeedSync::Score s_b)
{
if (s_a.quality == s_b.quality)
return s_a.speed > s_b.speed;
return s_a.quality > s_b.quality;
});
// we could search the N best matches, but using the best result works well in practice
SpeedSync::Score best_s = scores[0];
......@@ -210,7 +268,7 @@ window_hamming (double x) /* sharp (rectangle) cutoffs at boundaries */
}
void
SpeedSync::prepare_mags (const WavData& in_data, double center, double seconds)
SpeedSync::prepare_mags()
{
WavData in_data_trc (truncate (in_data, seconds / center));
......@@ -293,7 +351,7 @@ SpeedSync::prepare_mags (const WavData& in_data, double center, double seconds)
}
SpeedSync::Score
SpeedSync::compare (double relative_speed, double center)
SpeedSync::compare (double relative_speed)
{
const int frames_per_block = mark_sync_frame_count() + mark_data_frame_count();
const int pad_start = frames_per_block * /* HACK */ 4;
......@@ -429,11 +487,24 @@ detect_speed (const WavData& in_data)
speed = speed_scan (in_clip_short);
/* second pass: fast refine (not always perfect) */
SpeedSync speed_sync;
auto scores = speed_sync.search (in_clip_long, speed, 1.00005, 20, /* seconds */ 50);
sort (scores.begin(), scores.end(), [] (SpeedSync::Score s_a, SpeedSync::Score s_b) { return s_a.quality > s_b.quality; });
double t = get_time();
SpeedSync speed_sync (in_clip_long, speed, 1.00005, 20, /* seconds */ 50);
ThreadPool tp;
speed_sync.prepare_job (tp);
tp.wait_all();
speed_sync.search (tp);
tp.wait_all();
auto scores = speed_sync.get_scores();
sort (scores.begin(), scores.end(), [] (SpeedSync::Score s_a, SpeedSync::Score s_b)
{
if (s_a.quality == s_b.quality)
return s_a.speed > s_b.speed;
return s_a.quality > s_b.quality;
});
if (!scores.empty())
speed = scores[0].speed;
printf ("## time for 2nd pass: %f\n", get_time() - t);
}
return speed;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment