O(N^2). Часть 2: prefetching
Продолжаю рассказывать о том, как получать AC с простыми решениями, у которых асимптотика сильно хуже чем хотели авторы задачи.
В этот раз речь пойдет о задаче CF814F. Ее формулировка довольно простая. Есть массив a
из 10^5 чисел, каждое из которых не больше 2⋅10^4. Нужно ответить на 10^5 запросов (l, r)
. Для каждого запроса нужно посчитать количество взаимнопростых чисел от 1 до 10^5 с числом a[l] * a[l + 1] * ... * a[r]
.
Авторское решение — какая-то корневая со сложной асимптотикой. Мы же будем писать простое решение, которое работает за O(N^2).
Первая идея
Для каждого числа x
от 1 до 2⋅10^4 посчитаем битовую маску mask[x]
длиной 10^5, где i
-й бит проставлен, если gcd(x, i) != 1
. Тогда чтобы ответить на запрос (l, r)
нужно посчитать mask[a[l]] | mask[a[l + 1]] | ... | mask[a[r]]
, а потом посчитать количество проставленных бит в полученной маске.
Ровно в таком виде решение имеет асимптотику O(N^3) и естественно будет работать долго. Но можно добавить divide&conquer и немного оптимизаций для маленьких простых и получить AC. Но мы пойдем другим путем.
Вторая идея
Идея с масками прикольная, но асимптотика получается слишком большой. Вместо этого можно воспользоваться методом сканирующей прямой. Будем перебирать левую границу запросов l
от больших к меньшим и для каждого числа x
поддерживать массив first[x]
— наименьшая позиция >= l
такая, что gcd(x, a[first[x]]) != 1
. Тогда чтобы ответить на запрос (l, r)
нужно посчитать количество позиций x
таких, что first[x] <= r
. Это можно сделать простым циклом, который хорошо оптимизируется и работает быстро.
Осталось только научиться быстро пересчитывать массив first
, когда передвигаем указатель l
. Все что нужно сделать — посмотреть на mask[a[l]]
и для всех позиций x
, где проставлен бит, обновить first[x] = l
.
mask[x]
Давайте обсудим как хранить mask[x]
. Во-первых, он занимает довольно много памяти. Сам массив имеет длину 2⋅10^4, и для каждого элемента нужно сохранить 10^5 бит. Суммарно это уже порядка 250мб, а memory limit в задаче 256 мегабайт.
Во-вторых, если мы хотим быстро пересчитывать массив first
, то получение i
-го бита mask[x]
должно работать очень быстро (и использовать операции, которые можно ускорить с помощью simd). А в самом простом случае получение i
-го бита из битсета требует каких-то битовых сдвигов, которые вряд ли соптимизируются.
На самом деле существуют simd инструкции, которые могут проставить конкретные элементы в одном массиве из другого исходя из проставленных битов в маске. Но для этого компилятор должен догадаться, как трансформировать биты из битсета в формат, который нужен для этих инструкций. Если вы умеете помогать компилятору это делать без написания инструкций руками — расскажите мне :)
Если же хранить mask[x]
как просто массив bool/u8
, то код, который обновляет first
, будет хорошо векторизоваться. Но тогда он будет занимать в 8 раз больше памяти и не будет помещаться в 256 мегабайт.
Давайте не хранить все элементы mask
, а только первые сколько-то элементов (например, 2400, чтобы все еще помещаться в ML). Если мы хотим узнать какой-нибудь mask[a * b]
, то можно просто посчитать mask[a] | mask[b]
.
Как для конкретно x
найти разложение x = a * b
такое, чтобы a
и b
были не очень большими? Давайте посмотрим на наибольший простой делитель x
(назовем его p
). Если и для p
и для x/p
у нас есть подсчитанная маска, то все хорошо. Иначе p
очень большое, а mask[p]
состоит только из битов вида p*i
, так что такую маску можно создать заново за время пропорциональное N/p
.
Реализация
Как и в прошлый раз будем выделять горячие куски кода в отдельные функции и включать для них simd оптимизации. Например, функция обновление массива first
из какой-то маски будет выглядеть так:
#[target_feature(enable = "avx2")] unsafe fn update_first(first: &mut [u32], mask: &[bool], new_value: u32) { for i in 0..first.len() { if mask[i] { first[i] = new_value; } } }
#[target_feature(enable = "avx2")] unsafe fn calc_res(first: &[u32], r: u32) -> i32 { first.iter().map(|x| (*x <= r) as i32).sum() }
for l in (0..n).rev() { let cur = a[l]; unsafe { let l = l as u32; if cur < mask.len() { update_first(&mut first, &mask[cur], l) } else { let p = largest_prime[cur]; if p < mask.len() { update_first(&mut first, &mask[cur / p], l); update_first(&mut first, &mask[p], l); } else { update_first(&mut first, &mask[cur / p], l); for i in (p..first.len()).step_by(p) { first[i] = l; } } } } for query in queries[l].iter() { res[query.id] = max_v as i32 - unsafe { calc_res(&first, query.r as u32) }; } }
Массив first
имеет тип [u32]
, а не [usize]
, потому что usize
64 бита и работает медленнее. Поэтому приходится добавлять касты из одного типа в другой :(
К сожалению ровно в таком виде решение работает долго. На случайном тесте локально оно работает 1.7с, а на серверах CodeForces — 4.4c (TL в задаче 3с).
Профилируем
Почему такая большая разница между локальным временем работы и на КФ?
Если бы у нас была возможность запускать любые программы на серверах КФ, можно было бы попробовать использовать perf
или какую-нибудь другую утилиту, чтобы понять, что конкретно тормозит. Но ее нет.
Вместо этого просто попробуем минимизировать пример, на котором видно проблему. Померяем отдельно сколько работают функции update_first
и calc_res
. Будем измерять скорость с помощью другой небольшой функции:
#[inline(never)] fn measure<F>(f: &mut F) where F: FnMut(), { let start = Instant::now(); const MAX_MILLIS: u128 = 1000; let mut iters = 0; while start.elapsed().as_millis() < MAX_MILLIS { iters += 1; f(); } println!( "{} iters, av.time: {}mcs", iters, start.elapsed().as_secs_f64() / (iters as f64) * 1000.0 * 1000.0 ); }
Сгенерируем случайные массивы и посмотрим, сколько будут работать функции:
pub fn main() { let mut rnd = Random::new(787788); const N: usize = 100_000; let mut first = vec![0; N]; let mask = gen_vec(N, |_| rnd.gen_bool()); measure(&mut || unsafe { update_first(&mut first, &mask, 123) }); }
70355 iters, av.time: 14.213780456257549mcs
pub fn main() { let mut rnd = Random::new(787788); const N: usize = 100_000; let first = gen_vec(N, |_| rnd.gen_u32(100)); let mut hash = 0; measure(&mut || unsafe { hash += calc_res(&first, 50); }); }
В этом случае еще посчитаем какое-то число hash
, чтобы компилятор не мог просто не выполнять код внутри calc_res
.
115493 iters, av.time: 8.658555583455275mcs
В худшем случае наше решение 10^5 раз вызывает (2 раза update_first
и 1 раз calc_res
), что по идее должно работать 10^5 * (2 * 14.2 + 8.65) = 3.7c, а на случайных данных еще меньше. А решение в запуске на КФ почему-то работало 4.4с.
Секрет довольно прост — в случае с маленьким тестом мы запускаем update_fist
на одном и том же массиве mask
, а на реальных данных каждый раз берем случайный, который скорее всего не лежит в кеше.
Чтобы проверить эту гипотезу можно немного переписать тест:
pub fn main() { let mut rnd = Random::new(787788); const N: usize = 100_000; let mut mask = vec![vec![false; N]; 2400]; for it in 0..mask.len() { for i in 0..N { mask[it][i] = rnd.gen_bool(); } } let mut first = vec![0; N]; measure(&mut || unsafe { update_first(&mut first, &mask[rnd.gen_usize(mask.len())], 123) }); }
Который работает в полтора раза дольше:
40216 iters, av.time: 24.866071638153972mcs
Улучшаем
Теоретически, когда программа обращается к последовательным элементам массива, cpu hardware prefetcher должен заметить этот паттерн и подгружать следующие элементы массива в кеш до того как они потребуются. Но в данном случае почему-то этого не происходит.
Но мы можем загрузить нужные элементы в кеш руками. Сделаем небольшую функцию, которая будет нам помогать:
fn prefetch(ptr: *const i8) { unsafe { core::arch::x86_64::_mm_prefetch::<{ core::arch::x86_64::_MM_HINT_T0 }>(ptr); } }
Функция _mm_prefetch
принимает не только адрес, но и уровень кеша, в который нужно загружать данные. Мы будем подгружать во все уровни сразу.
Перепишем функцию update_first
с использованием prefetch
. Самая простая версия может просто подгружать данные, которые находятся с каким-то сдвигом от текущих обрабатываемых элементов:
#[target_feature(enable = "avx2")] unsafe fn update_first(first: &mut [u32], mask: &[bool], new_value: u32) { const SHIFT: usize = 1024; for i in 0..first.len() { if mask[i] { first[i] = new_value; } prefetch(mask.as_ptr().add(i + SHIFT) as *const i8); } }
Но такая версия работает гораздо хуже исходной. Как минимум потому, что обновление first
хорошо векторизовалось, а prefetch
делается для каждого отдельного байта. На самом деле этого делать не нужно, так как процессор подгружает сразу кеш линию в 64 байта. Так что можно вызывать его только каждую 64-ю итерацию.
Чтобы цикл все еще хорошо векторизовался можно разделить его на блоки. После обработки каждого блока, будем подгружать следующий блок в кеш.
#[target_feature(enable = "avx2")] unsafe fn update_first(first: &mut [u32], mask: &[bool], new_value: u32) { let n = first.len(); let mask = &mask[..n]; const SHIFT: usize = 1024; for start in (0..n).step_by(SHIFT) { for i in start..min(start + SHIFT, n) { if mask[i] { first[i] = new_value; } } for it in 0..SHIFT / 64 { prefetch(mask.as_ptr().add(start + SHIFT + it * 64) as *const i8); } } }
Такой код уже работает быстрее:
51547 iters, av.time: 19.399798669175702mcs
Но почему-то он все еще работает дольше, чем когда используется один и тот же массив mask
:(
Но даже этого (плюс еще каких-то небольших оптимизаций) хватает, чтобы получить АС на текущих тестах!