O(N^2). Часть 2: prefetching
Продолжаю рассказывать о том, как получать AC с простыми решениями, у которых асимптотика сильно хуже чем хотели авторы задачи.
В этот раз речь пойдет о задаче CF814F. Ее формулировка довольно простая. Есть массив a из 10^5 чисел, каждое из которых не больше 2⋅10^4. Нужно ответить на 10^5 запросов (l, r). Для каждого запроса нужно посчитать количество взаимнопростых чисел от 1 до 10^5 с числом a[l] * a[l + 1] * ... * a[r].
Авторское решение — какая-то корневая со сложной асимптотикой. Мы же будем писать простое решение, которое работает за O(N^2).
Первая идея
Для каждого числа x от 1 до 2⋅10^4 посчитаем битовую маску mask[x] длиной 10^5, где i-й бит проставлен, если gcd(x, i) != 1. Тогда чтобы ответить на запрос (l, r) нужно посчитать mask[a[l]] | mask[a[l + 1]] | ... | mask[a[r]], а потом посчитать количество проставленных бит в полученной маске.
Ровно в таком виде решение имеет асимптотику O(N^3) и естественно будет работать долго. Но можно добавить divide&conquer и немного оптимизаций для маленьких простых и получить AC. Но мы пойдем другим путем.
Вторая идея
Идея с масками прикольная, но асимптотика получается слишком большой. Вместо этого можно воспользоваться методом сканирующей прямой. Будем перебирать левую границу запросов l от больших к меньшим и для каждого числа x поддерживать массив first[x] — наименьшая позиция >= l такая, что gcd(x, a[first[x]]) != 1. Тогда чтобы ответить на запрос (l, r) нужно посчитать количество позиций x таких, что first[x] <= r. Это можно сделать простым циклом, который хорошо оптимизируется и работает быстро.
Осталось только научиться быстро пересчитывать массив first, когда передвигаем указатель l. Все что нужно сделать — посмотреть на mask[a[l]] и для всех позиций x, где проставлен бит, обновить first[x] = l.
mask[x]
Давайте обсудим как хранить mask[x]. Во-первых, он занимает довольно много памяти. Сам массив имеет длину 2⋅10^4, и для каждого элемента нужно сохранить 10^5 бит. Суммарно это уже порядка 250мб, а memory limit в задаче 256 мегабайт.
Во-вторых, если мы хотим быстро пересчитывать массив first, то получение i-го бита mask[x] должно работать очень быстро (и использовать операции, которые можно ускорить с помощью simd). А в самом простом случае получение i-го бита из битсета требует каких-то битовых сдвигов, которые вряд ли соптимизируются.
На самом деле существуют simd инструкции, которые могут проставить конкретные элементы в одном массиве из другого исходя из проставленных битов в маске. Но для этого компилятор должен догадаться, как трансформировать биты из битсета в формат, который нужен для этих инструкций. Если вы умеете помогать компилятору это делать без написания инструкций руками — расскажите мне :)
Если же хранить mask[x] как просто массив bool/u8, то код, который обновляет first, будет хорошо векторизоваться. Но тогда он будет занимать в 8 раз больше памяти и не будет помещаться в 256 мегабайт.
Давайте не хранить все элементы mask, а только первые сколько-то элементов (например, 2400, чтобы все еще помещаться в ML). Если мы хотим узнать какой-нибудь mask[a * b], то можно просто посчитать mask[a] | mask[b].
Как для конкретно x найти разложение x = a * b такое, чтобы a и b были не очень большими? Давайте посмотрим на наибольший простой делитель x (назовем его p). Если и для p и для x/p у нас есть подсчитанная маска, то все хорошо. Иначе p очень большое, а mask[p] состоит только из битов вида p*i, так что такую маску можно создать заново за время пропорциональное N/p.
Реализация
Как и в прошлый раз будем выделять горячие куски кода в отдельные функции и включать для них simd оптимизации. Например, функция обновление массива first из какой-то маски будет выглядеть так:
#[target_feature(enable = "avx2")]
unsafe fn update_first(first: &mut [u32], mask: &[bool], new_value: u32) {
for i in 0..first.len() {
if mask[i] {
first[i] = new_value;
}
}
}#[target_feature(enable = "avx2")]
unsafe fn calc_res(first: &[u32], r: u32) -> i32 {
first.iter().map(|x| (*x <= r) as i32).sum()
}for l in (0..n).rev() {
let cur = a[l];
unsafe {
let l = l as u32;
if cur < mask.len() {
update_first(&mut first, &mask[cur], l)
} else {
let p = largest_prime[cur];
if p < mask.len() {
update_first(&mut first, &mask[cur / p], l);
update_first(&mut first, &mask[p], l);
} else {
update_first(&mut first, &mask[cur / p], l);
for i in (p..first.len()).step_by(p) {
first[i] = l;
}
}
}
}
for query in queries[l].iter() {
res[query.id] = max_v as i32 - unsafe { calc_res(&first, query.r as u32) };
}
}Массив first имеет тип [u32], а не [usize], потому что usize 64 бита и работает медленнее. Поэтому приходится добавлять касты из одного типа в другой :(
К сожалению ровно в таком виде решение работает долго. На случайном тесте локально оно работает 1.7с, а на серверах CodeForces — 4.4c (TL в задаче 3с).
Профилируем
Почему такая большая разница между локальным временем работы и на КФ?
Если бы у нас была возможность запускать любые программы на серверах КФ, можно было бы попробовать использовать perf или какую-нибудь другую утилиту, чтобы понять, что конкретно тормозит. Но ее нет.
Вместо этого просто попробуем минимизировать пример, на котором видно проблему. Померяем отдельно сколько работают функции update_first и calc_res. Будем измерять скорость с помощью другой небольшой функции:
#[inline(never)]
fn measure<F>(f: &mut F)
where
F: FnMut(),
{
let start = Instant::now();
const MAX_MILLIS: u128 = 1000;
let mut iters = 0;
while start.elapsed().as_millis() < MAX_MILLIS {
iters += 1;
f();
}
println!(
"{} iters, av.time: {}mcs",
iters,
start.elapsed().as_secs_f64() / (iters as f64) * 1000.0 * 1000.0
);
}Сгенерируем случайные массивы и посмотрим, сколько будут работать функции:
pub fn main() {
let mut rnd = Random::new(787788);
const N: usize = 100_000;
let mut first = vec![0; N];
let mask = gen_vec(N, |_| rnd.gen_bool());
measure(&mut || unsafe { update_first(&mut first, &mask, 123) });
}70355 iters, av.time: 14.213780456257549mcs
pub fn main() {
let mut rnd = Random::new(787788);
const N: usize = 100_000;
let first = gen_vec(N, |_| rnd.gen_u32(100));
let mut hash = 0;
measure(&mut || unsafe {
hash += calc_res(&first, 50);
});
}В этом случае еще посчитаем какое-то число hash, чтобы компилятор не мог просто не выполнять код внутри calc_res.
115493 iters, av.time: 8.658555583455275mcs
В худшем случае наше решение 10^5 раз вызывает (2 раза update_first и 1 раз calc_res), что по идее должно работать 10^5 * (2 * 14.2 + 8.65) = 3.7c, а на случайных данных еще меньше. А решение в запуске на КФ почему-то работало 4.4с.
Секрет довольно прост — в случае с маленьким тестом мы запускаем update_fist на одном и том же массиве mask, а на реальных данных каждый раз берем случайный, который скорее всего не лежит в кеше.
Чтобы проверить эту гипотезу можно немного переписать тест:
pub fn main() {
let mut rnd = Random::new(787788);
const N: usize = 100_000;
let mut mask = vec![vec![false; N]; 2400];
for it in 0..mask.len() {
for i in 0..N {
mask[it][i] = rnd.gen_bool();
}
}
let mut first = vec![0; N];
measure(&mut || unsafe { update_first(&mut first, &mask[rnd.gen_usize(mask.len())], 123) });
}Который работает в полтора раза дольше:
40216 iters, av.time: 24.866071638153972mcs
Улучшаем
Теоретически, когда программа обращается к последовательным элементам массива, cpu hardware prefetcher должен заметить этот паттерн и подгружать следующие элементы массива в кеш до того как они потребуются. Но в данном случае почему-то этого не происходит.
Но мы можем загрузить нужные элементы в кеш руками. Сделаем небольшую функцию, которая будет нам помогать:
fn prefetch(ptr: *const i8) {
unsafe {
core::arch::x86_64::_mm_prefetch::<{ core::arch::x86_64::_MM_HINT_T0 }>(ptr);
}
}Функция _mm_prefetch принимает не только адрес, но и уровень кеша, в который нужно загружать данные. Мы будем подгружать во все уровни сразу.
Перепишем функцию update_first с использованием prefetch. Самая простая версия может просто подгружать данные, которые находятся с каким-то сдвигом от текущих обрабатываемых элементов:
#[target_feature(enable = "avx2")]
unsafe fn update_first(first: &mut [u32], mask: &[bool], new_value: u32) {
const SHIFT: usize = 1024;
for i in 0..first.len() {
if mask[i] {
first[i] = new_value;
}
prefetch(mask.as_ptr().add(i + SHIFT) as *const i8);
}
}Но такая версия работает гораздо хуже исходной. Как минимум потому, что обновление first хорошо векторизовалось, а prefetch делается для каждого отдельного байта. На самом деле этого делать не нужно, так как процессор подгружает сразу кеш линию в 64 байта. Так что можно вызывать его только каждую 64-ю итерацию.
Чтобы цикл все еще хорошо векторизовался можно разделить его на блоки. После обработки каждого блока, будем подгружать следующий блок в кеш.
#[target_feature(enable = "avx2")]
unsafe fn update_first(first: &mut [u32], mask: &[bool], new_value: u32) {
let n = first.len();
let mask = &mask[..n];
const SHIFT: usize = 1024;
for start in (0..n).step_by(SHIFT) {
for i in start..min(start + SHIFT, n) {
if mask[i] {
first[i] = new_value;
}
}
for it in 0..SHIFT / 64 {
prefetch(mask.as_ptr().add(start + SHIFT + it * 64) as *const i8);
}
}
} Такой код уже работает быстрее:
51547 iters, av.time: 19.399798669175702mcs
Но почему-то он все еще работает дольше, чем когда используется один и тот же массив mask:(
Но даже этого (плюс еще каких-то небольших оптимизаций) хватает, чтобы получить АС на текущих тестах!