Вейвлет – анализ. Часть 2
https://habr.com/ru/articles/452474/
17KPython*Математика*Разработка под Windows*Научно-популярноеФизикаТуториал
Введение
В данной публикации рассматривается вейвлет – анализ временных рядов. Основная идея вейвлет-преобразования отвечает специфике многих временных рядов, демонстрирующих эволюцию во времени своих основных характеристик – среднего значения, дисперсии, периодов, амплитуд и фаз гармонических компонент. Подавляющее большинство процессов, изучаемых в различных областях знаний, имеют вышеперечисленные особенности.
Целью настоящей публикации является описание методики непрерывного вейвлет- преобразования временных рядов средствами библиотеки PyWavelets..
Инженер-геофизик Д. Морле в конце 70-х годов XX в. столкнулся с проблемой анализа сигналов от сейсмодатчиков, которые содержали высокочастотную компоненту (сейсмическая активность) в течение короткого промежутка времени и низкочастотные составляющие (спокойное состояние земной коры) – в течение длительного периода. Оконное преобразование Фурье позволяет анализировать либо высокочастотную составляющую, либо низкочастотную составляющую, но не обе составляющие сразу.
Поэтому, был предложен метод анализа, в котором ширина оконной функции для низких частот увеличивалась, а для высоких частот – уменьшалась. Новое оконное преобразование получалось в результате растяжения (сжатия) и смещения по времени одной порождающей (так называемой скейлинг-функции – scaling function, scalet) функции. Эта порождающая функция была названа вейвлетом Д. Морле.
from pylab import* import scaleogram as scg axes = scg.plot_wav('cmor1-1.5', figsize=(14,3)) show()
Непрерывное вейвлет-преобразование (CWT)
Одноуровневое вейвлет – преобразование:
coefs, frequencies =pywt.cwt(data, scales, wavelet)
data : (array_like) -Входной сигнал;
scales (array_like):- Масштаб вейвлета для использования. Можно использовать соотношение f =scale2frequency(scale, wavelet)/sampling_period и определить какая физическая частота f. Здесь, f в hertz когда sampling_period в секундах;
wavelet (Wavelet object or name): — Вейвлет для использования;
sampling_period (float):- Период дискретизации для выходных частот (необязательный параметр). Значения, вычисленные для coefs, не зависят от выбора sampling_period (т. е. scales не масштабируется выборкой периода.);
coefs (array_like):- Непрерывное вейвлет — преобразование входного сигнала для заданных масштабов и вейвлет;
frequencies (array_like ):- Если единица периода выборки секунды и задана, то частоты выводятся в hertz. В противном случае предполагается период выборки, равный 1.
Примечание: Размер массивов коэффициентов зависит от длины входного массива и длин заданных масштабов.
Получим список доступных имен вейвлетов, совместимых с cwt:
>>> import pywt >>> wavlist = pywt.wavelist(kind='continuous') >>> wavlist
['cgau1', 'cgau2', 'cgau3', 'cgau4', 'cgau5', 'cgau6', 'cgau7', 'cgau8', 'cmor', 'fbsp', 'gaus1', 'gaus2', 'gaus3', 'gaus4', 'gaus5', 'gaus6', 'gaus7', 'gaus8', 'mexh', 'morl', 'shan']
Для вейвлет — анализа выбор материнского вейвлета является одной из ключевых задач. Поэтому, перед каждым выбором вейвлета просто необходимо пользоваться следующей программой, позволяющей определить динамические свойства вейвлета:
import numpy as np import pywt import matplotlib.pyplot as plt vav='cmor1.5-1.0' wav = pywt.ContinuousWavelet(vav) # вывести диапазон, в котором будет оцениваться вейвлет print("Непрерывный вейвлет будет оцениваться во всем диапазоне [{}, {}]".format( wav.lower_bound, wav.upper_bound)) width = wav.upper_bound - wav.lower_bound scales = [1, 2, 3, 4, 10, 15] max_len = int(np.max(scales)*width + 1) t = np.arange(max_len) fig, axes = plt.subplots(len(scales), 2, figsize=(12, 6)) for n, scale in enumerate(scales): # Следующий код адаптирован из внутренних частей cwt int_psi, x = pywt.integrate_wavelet(wav, precision=10) step = x[1] - x[0] j = np.floor( np.arange(scale * width + 1) / (scale * step)) if np.max(j) >= np.size(int_psi): j = np.delete(j, np.where((j >= np.size(int_psi)))[0]) j = j.astype(np.int) # normalize int_psi для более простого построения int_psi /= np.abs(int_psi).max() # дискретные выборки интегрированного вейвлета filt = int_psi[j][::-1] # CWT состоит из свертки фильтра с сигналом в этой шкале # Здесь мы строим это дискретное ядро свертки в каждом масштабе. nt = len(filt) t = np.linspace(-nt//2, nt//2, nt) axes[n, 0].plot(t, filt.real, t, filt.imag) axes[n, 0].set_xlim([-max_len//2, max_len//2]) axes[n, 0].set_ylim([-1, 1]) axes[n, 0].text(50, 0.35, 'scale = {}'.format(scale)) f = np.linspace(-np.pi, np.pi, max_len) filt_fft = np.fft.fftshift(np.fft.fft(filt, n=max_len)) filt_fft /= np.abs(filt_fft).max() axes[n, 1].plot(f, np.abs(filt_fft)**2) axes[n, 1].set_xlim([-np.pi, np.pi]) axes[n, 1].set_ylim([0, 1]) axes[n, 1].set_xticks([-np.pi, 0, np.pi]) axes[n, 1].set_xticklabels([r'$-\pi#39;, '0', r'$\pi#39;]) axes[n, 1].grid(True, axis='x') axes[n, 1].text(np.pi/2, 0.5, 'scale = {}'.format(scale)) axes[n, 0].set_xlabel('time (samples)') axes[n, 1].set_xlabel('frequency (radians)') axes[0, 0].legend(['real', 'imaginary'], loc='upper left') axes[0, 1].legend(['Power'], loc='upper left') axes[0, 0].set_title('filter: wavelet - %s'%vav) axes[0, 1].set_title(r'|FFT(filter)|$^2#39;) plt.show()
Далее будем рассматривать вейвлет-функции и их свойства с использованием приведенной программы:
Мексиканская шляпа вейвлет «mexh»:
Комплексный вейвлет Морле «cmorB-C»
где B — пропускная способность; C — центральная частота.
Здесь и далее (без специального указания) величины B,C задаются с плавающей точкой.
где: P — целое число от 1 до 8 вставляется в имя вейвлета, например «gaus8»; C- встроенная константа нормализации.
Комплексные гауссовы вейвлеты «cgauP»
где: P — целое число от 1 до 8 вставляется в имя вейвлета, например «сgaus8» и соответствуют порядку производной от вейвлет функции; C- встроенная константа нормализации.
где: B — ширина полосы; C — центральная частота.
CWT в PyWavelets применяется к дискретным данным — свертки с образцами интеграла вейвлета. Если scale слишком мало, то мы получаем дискретный фильтр с неадекватной выборкой, приводящий к сглаживанию, как показано на графике для вейвлета 'cmor1.5-1.0'.
В левой колонке на графиках показаны дискретные фильтры, используемые в свертке при различных шкалах. Правый столбец — соответствующие спектры мощности Фурье каждого фильтра. При шкалах 1 и 2 для графика 'cmor1.5-1.0' видно, что сглаживание происходит из-за нарушения ограничение Найквиста. Указанное явление может возникнуть и у других вейвлетов при выборе С и B, поэтому при работе с вейвлетами необходимо пользоваться программой – листинг 1.
Чтобы связать заданный масштаб с определенной частотой сигнала, период дискретизации сигнала должен быть известен. При помощи функции pywt.scale2frequency() можно преобразовать список масштабов в соответствующие частоты. Правильный выбор масштабов зависит от выбранного вейвлета, поэтому pywt.scale2frequency() следует использовать для получения представления о соответствующем частотном диапазоне сигнала.
>>> import pywt >>> dt = 0.01 # 100 Hz sampling >>> frequencies = pywt.scale2frequency('cmor1.5-1.0', [1, 2, 3, 4]) / dt >>> frequencies
array([100., 50., 33.33333333, 25. ])
Вейвлет — анализ временных рядов средствами CWT PyWavelets
Набор данных el-Nino представляет собой набор данных временных рядов, используемый для отслеживания El Nino и содержит ежеквартальные измерения температуры морской поверхности с 1871 по 1997 год. Чтобы понять силу масштабограммы, давайте визуализируем ее для набора данных el-Nino вместе с исходными данными временных рядов и его преобразованием Фурье.
Для вейвлет-анализа временных рядов необходимо выполнить следующие действия по пунктам:
1. Выбрать материнский вейвлет: Выбираем комплексный вейвлет Морле «cmorB-C»:
Пропускную способность – B и центральную частоту – С которого будем определять на следующем этапе.
2. Определить центральную частоту, приняв dt=0.25 для нашего временного ряда:
import pywt dt = 0.25 # 4 Hz sampling scale=range(1,4) frequencies = pywt.scale2frequency('cmor1.0-0.5', scale) / dt print(frequencies)
3. Находим преобразование Фурье материнского вейвлета cmor1.0-0.5 (используем листинг 1):
Непрерывный вейвлет будет оцениваться во всем диапазоне [-8.0, 8.0]
4. Выбираем данные для временного ряда:
pd.read_csv("http://paos.colorado.edu/research/wavelets/wave_idl/sst_nino3.dat", sep = "|")
Данные — это ежеквартальные измерения температуры морской поверхности с 1871 по 1997 год. Для этих данных: t0=1871 dt=0.25
5. Строим временной ряд с сигналом и его скользящее среднее на одном графике:
from numpy import * from scipy import * import pandas as pd from pylab import * import pywt def get_ave_values(xvalues, yvalues, n = 5): signal_length = len(xvalues) if signal_length % n == 0: padding_length = 0 else: padding_length = n - signal_length//n % n xarr = array(xvalues) yarr = array(yvalues) xarr.resize(signal_length//n, n) yarr.resize(signal_length//n, n) xarr_reshaped = xarr.reshape((-1,n)) yarr_reshaped = yarr.reshape((-1,n)) x_ave = xarr_reshaped[:,0] y_ave = nanmean(yarr_reshaped, axis=1) return x_ave, y_ave def plot_signal_plus_average(time, signal, average_over = 5): fig, ax = subplots(figsize=(15, 3)) time_ave, signal_ave = get_ave_values(time, signal, average_over) ax.plot(time, signal, label='Сигнал') ax.plot(time_ave, signal_ave, label = 'Скользящее среднее сигнала (n={})'.format(5)) ax.set_xlim([time[0], time[-1]]) ax.set_ylabel('Амплитуда сигнала', fontsize=18) ax.set_title('Сигнал + Скользящее среднее сигнала', fontsize=18) ax.set_xlabel('Время', fontsize=18) ax.legend() show() df_nino = pd.read_csv("http://paos.colorado.edu/research/wavelets/wave_idl/sst_nino3.dat", sep = "|") N = df_nino.shape[0] t0=1871 dt=0.25 time = arange(0, N) * dt + t0 signal = df_nino.values.squeeze() scales = arange(1, 128) plot_signal_plus_average(time, signal)
6. Проводим преобразование Фурье и модности спектра от временного ряда:
from numpy import * from scipy import * import pandas as pd from pylab import * import pywt def get_ave_values(xvalues, yvalues, n = 5): signal_length = len(xvalues) if signal_length % n == 0: padding_length = 0 else: padding_length = n - signal_length//n % n xarr = array(xvalues) yarr = array(yvalues) xarr.resize(signal_length//n, n) yarr.resize(signal_length//n, n) xarr_reshaped = xarr.reshape((-1,n)) yarr_reshaped = yarr.reshape((-1,n)) x_ave = xarr_reshaped[:,0] y_ave = nanmean(yarr_reshaped, axis=1) return x_ave, y_ave def get_fft_values(y_values, T, N, f_s): f_values = linspace(0.0, 1.0/(2.0*T), N//2) fft_values_ = fft(y_values) fft_values = 2.0/N * abs(fft_values_[0:N//2]) return f_values, fft_values def plot_fft_plus_power(time, signal): dt = time[1] - time[0] N = len(signal) fs = 1/dt fig, ax = subplots(figsize=(15, 3)) variance = std(signal)**2 f_values, fft_values = get_fft_values(signal, dt, N, fs) fft_power = variance * abs(fft_values) ** 2 # FFT power spectrum ax.plot(f_values, fft_values, 'r-', label='FFT преобразование') ax.plot(f_values, fft_power, 'k--', linewidth=1, label='Спектр мощности') ax.set_xlabel('Частота[Герц / год]', fontsize=18) ax.set_ylabel('Амплитуда', fontsize=18) ax.legend() show() df_nino = pd.read_csv("http://paos.colorado.edu/research/wavelets/wave_idl/sst_nino3.dat", sep = "|") N = df_nino.shape[0] t0=1871 dt=0.25 time = arange(0, N) * dt + t0 signal = df_nino.values.squeeze() scales = arange(1, 128) plot_fft_plus_power(time, signal)
7. Определяем масштабы: scales = arange(1, 128); levels = [2**-4, 2**-3, 2**-2, 2**-1, 2**0, 2**1, 2**2, 2**3].
from numpy import * import pandas as pd from pylab import * import pywt def plot_wavelet(time, signal, scales, waveletname = 'cmor1.0-0.4', cmap = plt.cm.seismic, title = 'Вейвлет-преобразование(Спектр мощности) сигнала', ylabel = 'Период (год)', xlabel = 'Время'): dt = time[1] - time[0] [coefficients, frequencies] = pywt.cwt(signal, scales, waveletname, dt) power = (abs(coefficients)) ** 2 period = 1. / frequencies levels = [2**-4 , 2**-3 , 2**-2 , 2**-1 , 2**0 , 2**1 , 2**2 , 2**3] contourlevels = log2(levels) fig, ax = subplots(figsize=(15, 10)) im = ax.contourf(time, log2(period), log2(power), contourlevels, extend='both',cmap=cmap) ax.set_title(title, fontsize=20) ax.set_ylabel(ylabel, fontsize=18) ax.set_xlabel(xlabel, fontsize=18) yticks = 2**arange(np.ceil(log2(period.min())), ceil(log2(period.max()))) ax.set_yticks(log2(yticks)) ax.set_yticklabels(yticks) ax.invert_yaxis() ylim = ax.get_ylim() ax.set_ylim(ylim[0], -1) cbar_ax = fig.add_axes([0.95, 0.5, 0.03, 0.25]) fig.colorbar(im, cax=cbar_ax, orientation="vertical") show() df_nino = pd.read_csv("http://paos.colorado.edu/research/wavelets/wave_idl/sst_nino3.dat", sep = "|") N = df_nino.shape[0] t0=1871 dt=0.25 time = arange(0, N) * dt + t0 signal = df_nino.values.squeeze() scales = arange(1, 128) plot_wavelet(time, signal, scales)
На масштабограмме видно, что большая часть мощности спектра сконцентрирована за период 2-8 лет, это соответствует частоте 0,125 – 0,5 Гц. В спектре Фурье модность так же концентрируется вокруг этих значений частоты. Однако вейвлет — преобразование также дает нам временную информацию, а преобразование Фурье — нет.
Например, на масштабной диаграмме мы видим, что до 1920 года было много колебаний, в то время как между 1960 и 1990 годами их было не так много. Мы также можем видеть, что с течением времени происходит сдвиг от более коротких периодов к более длинным.
Использование модуля scaleogram
Scaleogram-удобный инструмент для анализа 1D данных с непрерывным вейвлет — преобразованием построен на базе библиотеки PyWavelets. Модуль призван обеспечить надежный инструмент для быстрого анализа данных.
Scaleogram имеет следующие особенности:
- простая подпись вызова для начинающих
- читаемые оси и чистая интеграция matplotlib
- много вариантов для изменения масштаба, фильтра спектра, внедрения colorbar, etc...
- поддержка единиц периодичности и частоты в соответствии с маркировкой
- скорость, использует алгоритм N * log(N) для преобразований
- портативность: испытано с python2.7 и python3.7
- подробные сообщения об ошибках и документация с примерами
Однако, в примерах использования не правильно записано обращение к данным:
nino3 = "http://paos.colorado.edu/research/wavelets/wave_idl/sst_nino3.dat"
При этом появляется следующее предупреждение:
nino3 = pd.read_table(nino3)
что приводит к предупреждению:
Warning (from warnings module):
File «C:/Users/User/Desktop/wavelet/pr.py», line 7
nino3 = pd.read_table(nino3)
FutureWarning: read_table is deprecated, use read_csv instead, passing sep='\t'
Обращение к данным должно быть записано вот так:
url="http://paos.colorado.edu/research/wavelets/wave_idl/sst_nino3.dat" nino3 = pd.read_csv(url, sep = "|")
Чтобы показать использование скалограммы и сравнить результаты с результатами предыдущего примера, воспользуемся теми же данными — по ежеквартальным измерениям температуры морской поверхности с 1871 по 1997 год. Для этих данных: t0=1871 dt=0.25
import pandas as pd import pywt from numpy import* import scaleogram as scg from pylab import* url="sst_nino3.dat" nino3 = pd.read_csv(url, sep = "|") data = nino3.values.squeeze() N = data.size t0 = 1871; dt = 0.25 time = t0 + arange(len(data))*dt wavelet = 'cmor1-0.5' ax = scg.cws(time, data, scales=arange(1, 128), wavelet=wavelet, figsize=(14, 3), cmap="jet", cbar=None, ylabel='Период (год)', xlabel="Время [Год]", yscale="log", title='Вейвлет-преобразование временного ряда\n(спектр мощности)') ticks = ax.set_yticks([2,4,8, 16,32]) ticks = ax.set_yticklabels([2,4,8, 16,32]) show()
Если сравнивать масштабограмму с полученной сканограммой, то, за исключением цветовой палитры, они идентичны, а следовательно показывают одинаковую динамику временного ряда.
Листинг 5 проще листинга 3 и имеет большее быстродействие.
Когда частотный спектр и спектр мощности по Фурье не позволяют получить дополнительную информацию о динамике временного ряда, тогда использование скалограммы является предпочтительным.
Выводы
Приведен пример вейвлет-анализа временного ряда средствами библиотеки PyWavelets.Теги: