spark
January 4, 2023

Чтение табличных файлов со Spark

Рассмотрим, какие в pyspark имеются основные опции для чтения csv файлов:

  • sep - разделитель колонок (по умолчанию ",")
  • header - находятся ли в первой строке заголовки
  • inferSchema - выводить ли схему логически из данных
  • nullValue - определяет строку с null (по умолчанию пустая)
  • emptyValue - определяет строку с незаполненным значением (по умолчанию пустая)
  • nanValue - определяет строку с нечисловым значением (по умолчанию NaN)
  • positiveInf - определяет строку, представляющую положительную бесконечность (по умолчанию Inf)
  • negativeInf - определяет строку, представляющую отрицательную бесконечность (по умолчанию -Inf)
  • quote - разделитель строки (по умолчанию двойные кавычки)
  • schema - схема данных (рассказывал ранее)
  • samplingRatio - доля строк для вывода схемы (по умолчанию 1)
  • lineSep - определяет разделитель строк (по умолчанию символы \r, \n, \r\n)
  • encoding - кодировка (utf-8)
  • locale - определяет локаль, которая используется при парсинге даты и времени (en-US)
  • mode - режим обработки поврежденных записей:
  1. PERMISSIVE - поврежденную запись переносит в колонку, конфигурируемую полем columnNameOfCorruptRecord (см. параметр ниже). Колонка из columnNameOfCorruptRecord должна присутствовать в схеме с типом строки, иначе она добавлена не будет.
  2. DROPMALFORMED - игнорирует все поврежденные строки
  3. FAILFAST — выбрасывает исключение при столкновении с поврежденной записью.
  • columnNameOfCorruptRecord - задает имя колонки для хранения поврежденной записи (по умолчанию _corrupt_record, задается spark.sql.columnNameOfCorruptRecord)
  • multiLine - возможность размещения записи на нескольких строках

sep, quote, header, multiline

Возможности данных опций можно продемонстрировать на примере чтения следующего файла:

!printf 'col1;col2;col3\r\nid1;1;$text\n2$\nid3;4;"text3"\nid4;3;$text4#39; > 'read.txt'
!cat 'read.txt'

Как видим, намеренно в файле присутствует новый разделитель строки ($), имеется запись с переносом на следующую строку (text\n2), в качестве разделителя полей указана не запятая (";"):

sdf = spark.read.csv('read.txt', sep=';', multiLine=True, quote='#39;, header=True)
sdf.collect()

inferschema

Если не указывать схему (подробнее читай здесь), то типы задаются как строки:

sdf.dtypes

Зададим inferSchema=True для автоматического определения типов:

sdf = spark.read.csv('read.txt', sep=';', multiLine=True, quote='#39;, inferSchema=True, header=True)
sdf.dtypes

corrupt record

В нашем наборе имеется ошибочная запись, для визуализации которой явно укажем схему:

from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType

schema = StructType([
    StructField("col1", StringType(), True),
    StructField("col2", IntegerType(), True),
    StructField("col3", StringType(), True),
    StructField("mistakes", StringType(), True)
])

spark.read.csv('read.txt', sep=';', header=True, multiLine=True, quote='#39;, 
                     mode='PERMISSIVE', schema=schema, columnNameOfCorruptRecord='mistakes').collect()

nan

Зададим строку, обозначающюю нечисловое значение:

!printf 'col1;col2;col3\r\nid1;1;$text\n2$\nid3;na;$text3$\nid4;3;$text4#39; > 'read.txt'
!cat 'read.txt'
schema = StructType([
    StructField("col1", StringType(), True),
    StructField("col2", DoubleType(), True),
    StructField("col3", StringType(), True),
    
])
sdf = spark.read.csv('read.txt', sep=';', header=True, multiLine=True, quote='#39;, 
                      schema=schema, nanValue='na')

sdf.collect()

Можем убедиться, что Spark определяет nan значение:

import pyspark.sql.functions as F

sdf.select([F.sum(F.when(F.isnan(c),1).otherwise(0)).alias(c) for c in sdf.columns])

null

То же проделаем для незаполненного значения:

!printf 'col1;col2;col3\r\nid1;1;$text\n2$\nid3;4;$text3$\nid4;3;$text4#39; > 'read.txt'
!cat 'read.txt'
sdf = spark.read.csv('read.txt', sep=';', header=True, multiLine=True, quote='#39;, 
                     schema=schema, nullValue='text3')
sdf.collect()
sdf.select([F.sum(F.when(F.isnull(c),1).otherwise(0)).alias(c) for c in sdf.columns])

infinite

Вот пример задания строки с бесконечностью:

!printf 'col1;col2;col3\r\nid1;$Inf$;$text\n2$\nid3;$negInf$;$no$\nid4;3;$text4#39; > 'read.txt'
!cat 'read.txt'
sdf = spark.read.csv('read.txt', sep=';', header=True, multiLine=True, quote='#39;, 
                     schema=schema, negativeInf='negInf', nullValue='no')
sdf.collect()

Определились обе строки с бесконечностями, при этом положительная задана значением по умолчанию, а отрицательную мы переопределили.