OOM: direct memory при работе с сетью TCP/IP через NIO в Java
Вступление
Привет, меня зовут Денис Агапитов, я руководитель группы Platform Core компании Bercut. Работаю в компании без малого 20 лет, из них 18 пишу на Java.
Сегодня я расскажу об опыте увеличения производительности сетевого стэка и проблемах, с которыми можно столкнуться при использовании NIO в Java.
Эта статья основана на реальной практике борьбы с "OutOfMemory: direct memory" в шине данных гибридной интеграционной платформы.
Группа Platform Core, которой я руковожу, занимается разработкой и развитием гибридной интеграционной платформы, поддержкой систем и сервисов, написанных на платформе.
- Шину данных ESB.
- Приложения API Gateway, SLES (сервер исполнения бизнес-процессов), SA Container (сервер с сервисами на Java), Notification Broker.
- Платформенные сервисы: Scheduler, Service Profile Management и прочие.
- Поддержку интеграции со Spring.
Итак, начнём с предпосылок, которые подвигли заняться анализом данной проблемы.
Особенности работы шины данных в Bercut
Наша гибридная интеграционная платформа имеет свою транспортную шину (RTSIB). Это ESB (enterprise service bus) в рамках архитектуры SOA (service-oriented architecture) со своими стеками HTTP и проприетарного асинхронного протокола RTSIB.
По своей сути это mesh-сеть между разными узлами и приложениями платформы.
Каждое RTSIB соединение обслуживается двумя потоками - читающим и пишущим, при этом пишущий поток вступает в игру только в том случае, если появились данные для отправки, а сокет занят отправкой другого пакета. В этом случае текущий добавляется в очередь отправки. Если же сокет свободен и доступен для записи, то запись в сокет происходит прямо с потока бизнес-логики.
HTTP соединения (на текущий момент мы поддерживаем версию 1.x) в виду синхронной архитектуры не требуют большого количества потоков обслуживания, потоки для них достаются из пула - примерно один поток на 25 соединений.
Такой подход имеет как плюсы и минусы, сегодня обсуждать мы их не будем, а просто возьмём за исходные данные, что на каждом приложении нашей платформы существует довольно большое количество потоков, которые работают с сокетами.
Особенности работы с сетью TCP/IP в неблокирующем режиме в Java
Кто уже интересовался тем, как работает запись в сокет на Java или просто любит смотреть исходные коды JDK, вероятно знает основные особенности работы с сокетом, но для понимания проблемы предлагаю ещё раз их проговорить.
Из исходных кодов Open JDK 13 (основная версия, используемой у нас Java) видно, что если записываемый ByteBuffer является DirectByteBuffer, то запись происходит сразу (writeFromNativeBuffer), а если он расположен в Heap, то сначала достаётся временный DirectByteBuffer, производится копирование и запись из временного DirectByteBuffer.
Код записи в сокет из Open JDK 13 ( IOUtil.java ):
static int write(FileDescriptor fd, ByteBuffer src, long position, boolean directIO, int alignment, NativeDispatcher nd) throws IOException { if (src instanceof DirectBuffer) { return writeFromNativeBuffer(fd, src, position, directIO, alignment, nd); } // Substitute a native buffer int pos = src.position(); int lim = src.limit(); assert (pos <= lim); int rem = (pos <= lim ? lim - pos : 0); ByteBuffer bb; if (directIO) { Util.checkRemainingBufferSizeAligned(rem, alignment); bb = Util.getTemporaryAlignedDirectBuffer(rem, alignment); } else { bb = Util.getTemporaryDirectBuffer(rem); } try { bb.put(src); bb.flip(); // Do not update src until we see how many bytes were written src.position(pos); int n = writeFromNativeBuffer(fd, bb, position, directIO, alignment, nd); if (n > 0) { // now update src src.position(pos + n); } return n; } finally { Util.offerFirstTemporaryDirectBuffer(bb); } }
Дополнительно осложняет ситуацию то, что внутри реализации JDK имеется КЭШ DirectByteBuffer с привязкой к потоку (ThreadLocal):
private static ThreadLocal<BufferCache> bufferCache = new TerminatingThreadLocal<>() { @Override protected BufferCache initialValue() { return new BufferCache(); } @Override protected void threadTerminated(BufferCache cache) { // will never be null while (!cache.isEmpty()) { ByteBuffer bb = cache.removeFirst(); free(bb); } } }; public static ByteBuffer getTemporaryDirectBuffer(int size) { // If a buffer of this size is too large for the cache, there // should not be a buffer in the cache that is at least as // large. So we'll just create a new one. Also, we don't have // to remove the buffer from the cache (as this method does // below) given that we won't put the new buffer in the cache. if (isBufferTooLarge(size)) { return ByteBuffer.allocateDirect(size); } BufferCache cache = bufferCache.get(); ByteBuffer buf = cache.get(size); if (buf != null) { return buf; } else { // No suitable buffer in the cache so we need to allocate a new // one. To avoid the cache growing then we remove the first // buffer from the cache and free it. if (!cache.isEmpty()) { buf = cache.removeFirst(); free(buf); } return ByteBuffer.allocateDirect(size); } }
И после каждого использования временного DirectByteBuffer, он помещается в КЭШ. При этом, если в КЭШе нет DirectByteBuffer необходимого размера, он аллоцируется и после использования также помещается в КЭШ ( Util.offerFirstTemporaryDirectBuffer(bb) ).
Суть проблемы
В первых версиях платформы мы просто использовали HeapByteBuffer через простую и понятную static-функцию ByteBuffer.wrap(byte[] data) и бед вроде как не знали.
Всё работало, скорость была достаточная для текущих telecom-сервисов, работающих на платформе, но в один прекрасный день размер данных DWH (Data Warehouse), проходящих через нашу шину достиг критического объёма в мегабайтах и мы получили OOM Direct Memory.
Почему же так произошло? А вот почему: как обозначил выше, мы имеем mesh-сеть с множеством обслуживающих потоков и имеем данные большого размера, проходящие через эти потоки, которые складывают off-heap память в ThreadLocal КЭШи этих потоков. Достигнув предела насыщения использования off-heap памяти мы получаем OOM. Конечно, первым действием было увеличение параметра запуска JVM "-XX:MaxDirectMemorySize". Размер используемой direct памяти пришлось увеличить, потом увеличить ещё. Это стало тем самым звоночком, что с проблемой надо разбираться и как можно скорее.
Анализ возможных путей решения
После осознания проблемы, мы провели анализ возможных путей её решения и нашли следующие варианты:
- Писать в цикле блоками, сдвигая вручную position и limit в записываемом ByteBuffer. Это должно помочь, так как в IOUtils временный DirectByteBuffer выделяется размером size = limit - position.
- Перейти на использование ByteBuffer.allocateDirect().
- Написать промежуточную абстракцию, содержащую нарезку из ByteBuffer одного размера, где ByteBuffer одного размера берутся из общего пула и после использования возвращаются обратно.
Погружаясь всё глубже в исследование проблемы стало понятно, что необходимо провести сравнительное тестирование разных размеров ByteBuffer и разных вариантов их использования для чтения и записи из сокета.
За несколько часов я написал тестовое приложение, которое эмулирует 4 вида работы с сокетом:
- Аллоцирование HeapByteBuffer при каждой записи/чтении.
- Аллоцирование DirectByteBuffer при каждой записи/чтении.
- Переиспользование HeapByteBuffer при каждой записи/чтении.
- Переиспользование DirectByteBuffer при каждой записи/чтении.
В тестовом приложении отсутствует маршаллинг (заполнение реальными данными), а присутствует только работа по записи и чтению из сокета с разными вариантами использования ByteBuffer.
Здесь приводить исходные коды не буду, но кто желает может ознакомится с ними на Github.
На выходе мы получили такую картину:
Из результатов видно, что до 1Mb самым медленным вариантом является аллокация DirectByteBuffer. Аллокация HeapByteBuffer через wrap и кэшированный HeapByteBuffer примерно равны с небольшим лидерством кэшированного. Из общей картины выбивается кэшированный DirectByteBuffer, что логично, так как пишется он напрямую, а время на аллокацию отсутствует.
Выбор и реализация
Для реализации выбрали 3 вариант решения проблемы: написать промежуточную абстракцию, содержащую нарезку из DirectByteBuffer одного размера, которые берутся и возвращаются в общий пул. За основной размер части пакета (размер DirectByteBuffer) было выбрано значение в 32Kb как минимальный по размеру пик при тестировании пропускной способности. Безусловно, так как у нас реализован и стэк HTTPs, фабрика может отдавать и пул с отличными от 32Kb размерами DirectByteBuffer, опираясь на PacketBufferSize и ApplicationBufferSize из настроек текущей сессии SSLEngine.
При написании слоя абстракции, названной CompositeBuffer, конечно же я реализовал и Input/Output streams, работающие напрямую с CompositeBuffer. Это было необходимо для нормальной работы слоя marshalling/unmarshalling.
В качестве хранилища уже аллоцированных DirectByteBuffer сделал простой стэк на CAS механизме:
public class CasStack<L extends LinkedObject<L>> { public interface LinkedObject<L extends LinkedObject> { public L getNext(); public void setNext(L next); } private final AtomicReference<L> head = new AtomicReference<>(); public void add(L lo) { for (;;) { lo.setNext(head.get()); if (head.compareAndSet(lo.getNext(), lo)) { return; } } } public L poll() { L lo; for (;;) { lo = head.get(); if (lo == null) { return null; } if (head.compareAndSet(lo, lo.getNext())) { lo.setNext(null); return lo; } } } }
А примерно вот так выглядит часть основного класса CompositeBuffer в разрезе работы с чтением из сокета и записью в сокет (код был адаптирован для статьи):
DirectByteBuffer[] buffers; int pos; @Override public int getBufIndex(int position) { return position / pool.getPartCapacity(); } @Override public int read(ReadableByteChannel channel) throws IOException { ensureCapacity(); int cur = getBufIndex(pos), readed = 0, read; for (;;) { try { if (hasRemaining()) { if (buffers[cur].hasRemaining()) { read = channel.read(containers[cur]); if (read < 0) { return readed > 0 ? readed : read; } pos += read; readed += read; if (containers[cur].hasRemaining()) { return readed; } } cur++; if (cur == containers.length) { expandCapacity(); } } else { return readed; } } catch (IOException) { if (readed > 0) { return readed; } throw ex; } } } @Override public int write(WritableByteChannel channel) throws IOException { int cur = getBufIndex(pos), writed = 0, write; for (;;) { try { if (hasRemaining()) { if (buffers[cur].hasRemaining()) { write = channel.write(buffers[cur]); if (write < 0) { return writed > 0 ? writed : write; } pos += write; writed += write; if (buffers[cur].hasRemaining()) { return writed; } } cur++; if (cur == containers.length) { if (writed == 0 && hasRemaining()) { release(); throw new CompositeBufferLifecycleError(); } return writed; } } else { return writed; } } catch (IOException ex) { if (writed > 0) { return writed; } throw ex; } } }
Конечно же пришлось написать далеко не один класс, а ещё несколько уровней абстракции, таких, как DirectContainer и механизмы addRef/releaseRef, проверку на ошибки жизненного цикла всей библиотеки и многое другое.
Заключение
По завершению оптимизации и переходу на переиспользуемый DirectByteBuffer пропускная способность шины увеличилась примерно вдвое.
До данной доработки размер off-heap памяти мог достигать 1-3Gb и складывался из максимальных размеров сообщений, прошедших через каждое соединение.
Сейчас же потребление off-heap памяти пулами довольно скромное - на среднем сервисе оно составляет всего 10-20 Mb.
На более сложном компоненте с парой сотен входящих вызовов в секунду, которые порождают до 15 тысяч внутренних вызовов на каждый входящий вызов - размер off-heap пула занимает менее 100Mb.
Важно ещё и то, что теперь off-heap память, используемая приложениями для работы с сетью более контролируема и растёт не от корреляции с количеством соединений (потоков), а только от корреляции с количеством сообщений проходящих через шину в единицу времени и средним размером сообщения.
При этом надо понимать, что размер off-heap памяти не может быть меньше максимального размера сообщения, когда-либо проходившего через узел.
Конечно, полностью исключить OOM direct memory таким решением всё равно не получится, но теперь off-heap память можно прогнозировать.