Чтение табличных файлов со Spark
Рассмотрим, какие в pyspark имеются основные опции для чтения csv файлов:
- sep - разделитель колонок (по умолчанию ",")
- header - находятся ли в первой строке заголовки
- inferSchema - выводить ли схему логически из данных
- nullValue - определяет строку с null (по умолчанию пустая)
- emptyValue - определяет строку с незаполненным значением (по умолчанию пустая)
- nanValue - определяет строку с нечисловым значением (по умолчанию NaN)
- positiveInf - определяет строку, представляющую положительную бесконечность (по умолчанию Inf)
- negativeInf - определяет строку, представляющую отрицательную бесконечность (по умолчанию -Inf)
- quote - разделитель строки (по умолчанию двойные кавычки)
- schema - схема данных (рассказывал ранее)
- samplingRatio - доля строк для вывода схемы (по умолчанию 1)
- lineSep - определяет разделитель строк (по умолчанию символы \r, \n, \r\n)
- encoding - кодировка (utf-8)
- locale - определяет локаль, которая используется при парсинге даты и времени (en-US)
- mode - режим обработки поврежденных записей:
- PERMISSIVE - поврежденную запись переносит в колонку, конфигурируемую полем columnNameOfCorruptRecord (см. параметр ниже). Колонка из columnNameOfCorruptRecord должна присутствовать в схеме с типом строки, иначе она добавлена не будет.
- DROPMALFORMED - игнорирует все поврежденные строки
- FAILFAST — выбрасывает исключение при столкновении с поврежденной записью.
- columnNameOfCorruptRecord - задает имя колонки для хранения поврежденной записи (по умолчанию _corrupt_record, задается spark.sql.columnNameOfCorruptRecord)
- multiLine - возможность размещения записи на нескольких строках
sep, quote, header, multiline
Возможности данных опций можно продемонстрировать на примере чтения следующего файла:
!printf 'col1;col2;col3\r\nid1;1;$text\n2$\nid3;4;"text3"\nid4;3;$text4#39; > 'read.txt' !cat 'read.txt'
Как видим, намеренно в файле присутствует новый разделитель строки ($), имеется запись с переносом на следующую строку (text\n2), в качестве разделителя полей указана не запятая (";"):
sdf = spark.read.csv('read.txt', sep=';', multiLine=True, quote='#39;, header=True) sdf.collect()
inferschema
Если не указывать схему (подробнее читай здесь), то типы задаются как строки:
sdf.dtypes
Зададим inferSchema=True для автоматического определения типов:
sdf = spark.read.csv('read.txt', sep=';', multiLine=True, quote='#39;, inferSchema=True, header=True) sdf.dtypes
corrupt record
В нашем наборе имеется ошибочная запись, для визуализации которой явно укажем схему:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType schema = StructType([ StructField("col1", StringType(), True), StructField("col2", IntegerType(), True), StructField("col3", StringType(), True), StructField("mistakes", StringType(), True) ]) spark.read.csv('read.txt', sep=';', header=True, multiLine=True, quote='#39;, mode='PERMISSIVE', schema=schema, columnNameOfCorruptRecord='mistakes').collect()
nan
Зададим строку, обозначающюю нечисловое значение:
!printf 'col1;col2;col3\r\nid1;1;$text\n2$\nid3;na;$text3$\nid4;3;$text4#39; > 'read.txt' !cat 'read.txt'
schema = StructType([ StructField("col1", StringType(), True), StructField("col2", DoubleType(), True), StructField("col3", StringType(), True), ]) sdf = spark.read.csv('read.txt', sep=';', header=True, multiLine=True, quote='#39;, schema=schema, nanValue='na') sdf.collect()
Можем убедиться, что Spark определяет nan значение:
import pyspark.sql.functions as F sdf.select([F.sum(F.when(F.isnan(c),1).otherwise(0)).alias(c) for c in sdf.columns])
null
То же проделаем для незаполненного значения:
!printf 'col1;col2;col3\r\nid1;1;$text\n2$\nid3;4;$text3$\nid4;3;$text4#39; > 'read.txt' !cat 'read.txt'
sdf = spark.read.csv('read.txt', sep=';', header=True, multiLine=True, quote='#39;, schema=schema, nullValue='text3') sdf.collect()
sdf.select([F.sum(F.when(F.isnull(c),1).otherwise(0)).alias(c) for c in sdf.columns])
infinite
Вот пример задания строки с бесконечностью:
!printf 'col1;col2;col3\r\nid1;$Inf$;$text\n2$\nid3;$negInf$;$no$\nid4;3;$text4#39; > 'read.txt' !cat 'read.txt'
sdf = spark.read.csv('read.txt', sep=';', header=True, multiLine=True, quote='#39;, schema=schema, negativeInf='negInf', nullValue='no') sdf.collect()
Определились обе строки с бесконечностями, при этом положительная задана значением по умолчанию, а отрицательную мы переопределили.