August 22, 2022

O(N^2). Часть 2: prefetching

Продолжаю рассказывать о том, как получать AC с простыми решениями, у которых асимптотика сильно хуже чем хотели авторы задачи.

В этот раз речь пойдет о задаче CF814F. Ее формулировка довольно простая. Есть массив a из 10^5 чисел, каждое из которых не больше 2⋅10^4. Нужно ответить на 10^5 запросов (l, r). Для каждого запроса нужно посчитать количество взаимнопростых чисел от 1 до 10^5 с числом a[l] * a[l + 1] * ... * a[r].

Авторское решение — какая-то корневая со сложной асимптотикой. Мы же будем писать простое решение, которое работает за O(N^2).

Первая идея

Для каждого числа x от 1 до 2⋅10^4 посчитаем битовую маску mask[x] длиной 10^5, где i-й бит проставлен, если gcd(x, i) != 1. Тогда чтобы ответить на запрос (l, r) нужно посчитать mask[a[l]] | mask[a[l + 1]] | ... | mask[a[r]], а потом посчитать количество проставленных бит в полученной маске.

Ровно в таком виде решение имеет асимптотику O(N^3) и естественно будет работать долго. Но можно добавить divide&conquer и немного оптимизаций для маленьких простых и получить AC. Но мы пойдем другим путем.

Вторая идея

Идея с масками прикольная, но асимптотика получается слишком большой. Вместо этого можно воспользоваться методом сканирующей прямой. Будем перебирать левую границу запросов l от больших к меньшим и для каждого числа x поддерживать массив first[x] — наименьшая позиция >= l такая, что gcd(x, a[first[x]]) != 1. Тогда чтобы ответить на запрос (l, r) нужно посчитать количество позиций x таких, что first[x] <= r. Это можно сделать простым циклом, который хорошо оптимизируется и работает быстро.

Осталось только научиться быстро пересчитывать массив first, когда передвигаем указатель l. Все что нужно сделать — посмотреть на mask[a[l]] и для всех позиций x, где проставлен бит, обновить first[x] = l.

mask[x]

Давайте обсудим как хранить mask[x]. Во-первых, он занимает довольно много памяти. Сам массив имеет длину 2⋅10^4, и для каждого элемента нужно сохранить 10^5 бит. Суммарно это уже порядка 250мб, а memory limit в задаче 256 мегабайт.

Во-вторых, если мы хотим быстро пересчитывать массив first, то получение i-го бита mask[x] должно работать очень быстро (и использовать операции, которые можно ускорить с помощью simd). А в самом простом случае получение i-го бита из битсета требует каких-то битовых сдвигов, которые вряд ли соптимизируются.

На самом деле существуют simd инструкции, которые могут проставить конкретные элементы в одном массиве из другого исходя из проставленных битов в маске. Но для этого компилятор должен догадаться, как трансформировать биты из битсета в формат, который нужен для этих инструкций. Если вы умеете помогать компилятору это делать без написания инструкций руками — расскажите мне :)

Если же хранить mask[x] как просто массив bool/u8, то код, который обновляет first, будет хорошо векторизоваться. Но тогда он будет занимать в 8 раз больше памяти и не будет помещаться в 256 мегабайт.

Давайте не хранить все элементы mask, а только первые сколько-то элементов (например, 2400, чтобы все еще помещаться в ML). Если мы хотим узнать какой-нибудь mask[a * b], то можно просто посчитать mask[a] | mask[b].

Как для конкретно x найти разложение x = a * b такое, чтобы a и b были не очень большими? Давайте посмотрим на наибольший простой делитель x (назовем его p). Если и для p и для x/p у нас есть подсчитанная маска, то все хорошо. Иначе p очень большое, а mask[p] состоит только из битов вида p*i, так что такую маску можно создать заново за время пропорциональное N/p.

Реализация

Как и в прошлый раз будем выделять горячие куски кода в отдельные функции и включать для них simd оптимизации. Например, функция обновление массива first из какой-то маски будет выглядеть так:

#[target_feature(enable = "avx2")]
unsafe fn update_first(first: &mut [u32], mask: &[bool], new_value: u32) {
    for i in 0..first.len() {
        if mask[i] {
            first[i] = new_value;
        }
    }
}

Подсчет ответа так:

#[target_feature(enable = "avx2")]
unsafe fn calc_res(first: &[u32], r: u32) -> i32 {
    first.iter().map(|x| (*x <= r) as i32).sum()
}

А главная часть решения так:

for l in (0..n).rev() {
    let cur = a[l];

    unsafe {
        let l = l as u32;
        if cur < mask.len() {
            update_first(&mut first, &mask[cur], l)
        } else {
            let p = largest_prime[cur];

            if p < mask.len() {
                update_first(&mut first, &mask[cur / p], l);
                update_first(&mut first, &mask[p], l);
            } else {
                update_first(&mut first, &mask[cur / p], l);
                for i in (p..first.len()).step_by(p) {
                    first[i] = l;
                }
            }
        }
    }

    for query in queries[l].iter() {
        res[query.id] = max_v as i32 - unsafe { calc_res(&first, query.r as u32) };
    }
}

Массив first имеет тип [u32], а не [usize], потому что usize 64 бита и работает медленнее. Поэтому приходится добавлять касты из одного типа в другой :(

К сожалению ровно в таком виде решение работает долго. На случайном тесте локально оно работает 1.7с, а на серверах CodeForces — 4.4c (TL в задаче 3с).

Профилируем

Почему такая большая разница между локальным временем работы и на КФ?

Если бы у нас была возможность запускать любые программы на серверах КФ, можно было бы попробовать использовать perf или какую-нибудь другую утилиту, чтобы понять, что конкретно тормозит. Но ее нет.

Вместо этого просто попробуем минимизировать пример, на котором видно проблему. Померяем отдельно сколько работают функции update_first и calc_res. Будем измерять скорость с помощью другой небольшой функции:

#[inline(never)]
fn measure<F>(f: &mut F)
where
    F: FnMut(),
{
    let start = Instant::now();
    const MAX_MILLIS: u128 = 1000;
    let mut iters = 0;
    while start.elapsed().as_millis() < MAX_MILLIS {
        iters += 1;
        f();
    }
    println!(
        "{} iters, av.time: {}mcs",
        iters,
        start.elapsed().as_secs_f64() / (iters as f64) * 1000.0 * 1000.0
    );
}

Сгенерируем случайные массивы и посмотрим, сколько будут работать функции:

pub fn main() {
    let mut rnd = Random::new(787788);
    const N: usize = 100_000;
    let mut first = vec![0; N];
    let mask = gen_vec(N, |_| rnd.gen_bool());
    measure(&mut || unsafe { update_first(&mut first, &mask, 123) });
}

Результат:

70355 iters, av.time: 14.213780456257549mcs

Аналогично для calc_res:

pub fn main() {
    let mut rnd = Random::new(787788);
    const N: usize = 100_000;
    let first = gen_vec(N, |_| rnd.gen_u32(100));
    let mut hash = 0;
    measure(&mut || unsafe {
        hash += calc_res(&first, 50);
    });
}

В этом случае еще посчитаем какое-то число hash, чтобы компилятор не мог просто не выполнять код внутри calc_res.

Результат получается такой:

115493 iters, av.time: 8.658555583455275mcs

В худшем случае наше решение 10^5 раз вызывает (2 раза update_first и 1 раз calc_res), что по идее должно работать 10^5 * (2 * 14.2 + 8.65) = 3.7c, а на случайных данных еще меньше. А решение в запуске на КФ почему-то работало 4.4с.

Секрет довольно прост — в случае с маленьким тестом мы запускаем update_fist на одном и том же массиве mask, а на реальных данных каждый раз берем случайный, который скорее всего не лежит в кеше.

Чтобы проверить эту гипотезу можно немного переписать тест:

pub fn main() {
    let mut rnd = Random::new(787788);
    const N: usize = 100_000;
    let mut mask = vec![vec![false; N]; 2400];
    for it in 0..mask.len() {
        for i in 0..N {
            mask[it][i] = rnd.gen_bool();
        }
    }
    let mut first = vec![0; N];
    measure(&mut || unsafe { update_first(&mut first, &mask[rnd.gen_usize(mask.len())], 123) });
}

Который работает в полтора раза дольше:

40216 iters, av.time: 24.866071638153972mcs

Улучшаем

Теоретически, когда программа обращается к последовательным элементам массива, cpu hardware prefetcher должен заметить этот паттерн и подгружать следующие элементы массива в кеш до того как они потребуются. Но в данном случае почему-то этого не происходит.

Но мы можем загрузить нужные элементы в кеш руками. Сделаем небольшую функцию, которая будет нам помогать:

fn prefetch(ptr: *const i8) {
    unsafe {
        core::arch::x86_64::_mm_prefetch::<{ core::arch::x86_64::_MM_HINT_T0 }>(ptr);
    }
}

Функция _mm_prefetch принимает не только адрес, но и уровень кеша, в который нужно загружать данные. Мы будем подгружать во все уровни сразу.

Перепишем функцию update_first с использованием prefetch. Самая простая версия может просто подгружать данные, которые находятся с каким-то сдвигом от текущих обрабатываемых элементов:

#[target_feature(enable = "avx2")]
unsafe fn update_first(first: &mut [u32], mask: &[bool], new_value: u32) {
    const SHIFT: usize = 1024;
    for i in 0..first.len() {
        if mask[i] {
            first[i] = new_value;
        }
        prefetch(mask.as_ptr().add(i + SHIFT) as *const i8);
    }
}

Но такая версия работает гораздо хуже исходной. Как минимум потому, что обновление first хорошо векторизовалось, а prefetch делается для каждого отдельного байта. На самом деле этого делать не нужно, так как процессор подгружает сразу кеш линию в 64 байта. Так что можно вызывать его только каждую 64-ю итерацию.

Чтобы цикл все еще хорошо векторизовался можно разделить его на блоки. После обработки каждого блока, будем подгружать следующий блок в кеш.

#[target_feature(enable = "avx2")]
unsafe fn update_first(first: &mut [u32], mask: &[bool], new_value: u32) {
    let n = first.len();
    let mask = &mask[..n];
    const SHIFT: usize = 1024;
    for start in (0..n).step_by(SHIFT) {
        for i in start..min(start + SHIFT, n) {
            if mask[i] {
                first[i] = new_value;
            }
        }
        for it in 0..SHIFT / 64 {
            prefetch(mask.as_ptr().add(start + SHIFT + it * 64) as *const i8);
        }
    }
} 

Такой код уже работает быстрее:

51547 iters, av.time: 19.399798669175702mcs

Но почему-то он все еще работает дольше, чем когда используется один и тот же массив mask:(

Но даже этого (плюс еще каких-то небольших оптимизаций) хватает, чтобы получить АС на текущих тестах!