読者です 読者をやめる 読者になる 読者になる

旅行好きなソフトエンジニアの備忘録

プログラミングや技術関連のメモを始めました

【Python】 特異スペクトル解析法の実装

Python 信号処理

特異スペクトル解析法をPythonで実装したのでメモします。書籍「信号解析 馬杉著」からの引用ですが、

特異スペクトル解析法(singular spectrum analysis)は、観測信号からの主要な変動成分の分離・抽出、観測信号の変化点や不規則点の検出、観測信号からの雑音除去などを目的として、非線形信号解析分野において発展した解析手法の一つである。フーリエ級数展開やウェーブレット級数展開のように、特定の基底関数を適用したり、あるいは、ARモデルのような特定のモデルを仮定せずに、信号の構造変化そのものを解析するため、非定常信号の分離に向いている。

これをSingularSpectrumAnalysisクラスとしてpythonで実装します。

import numpy as np

class SingularSpectrumAnalysis(object):
    """特異スペクトル解析法を行うクラス"""
    
    def __init__(self, signal, window_size):
        """
        コンストラクタ
        @param signal 解析対象信号
        @param window_size 部分時系列区間数
        """
        self.__signal_length = len(signal)
        self.__window_size = window_size
        X = self.__create_trajectory_matrix(signal, window_size)
        self.__U, self.__W, self.__V = np.linalg.svd(X, False)

    def __create_trajectory_matrix(self, signal, window_size):
        """
        軌道行列を生成するメソッド
        @param signal 解析対象信号
        @param window_size 部分時系列区間数
        @return 軌道行列
        """
        row = len(signal) - window_size + 1
        col = window_size
        trajectory_matrix = np.empty((row, col))
        for i in range(row):
            trajectory_matrix[i, :] = signal[i:i + window_size]
        return trajectory_matrix

    def restore_signal(self, num_component):
        """
        信号を復元するメソッド
        @param num_component 軌道行列を近似する項数
        @return 復元された信号
        """
        # 軌道行列を復元する
        X = np.zeros((self.__U.shape[0], self.__window_size))
        for i in range(num_component):
            lambda_ = self.__W[i]
            U = (self.__U[:,i])[:, np.newaxis]
            V = (self.__V[i,:])[np.newaxis, :]
            X += lambda_*U*V
        
        # 軌道行列から信号を復元する
        signal = []
        for i in range(self.__signal_length):
            value = 0.0
            count = 0
            for j in range(X.shape[1]):
                row = i - j
                if (row < 0):
                    break
                elif (row >= X.shape[0]):
                    continue
                col = j
                value += X[row, col]
                count += 1
            value /= count
            signal.append(value)
        return np.array(signal)

軌道行列から信号を復元する部分に関して、numpyでfor文を使うのはコードの見た目やパフォーマンス上良くないようなのですが、綺麗に実装する方法が分かりませんでした。次にSingularSpectrumAnalysisクラスを利用する例を見てみます。解析する信号を下図に示します。
f:id:ni4muraano:20170131155032j:plain
この信号は下図に示す三種類の信号を混ぜた信号です。基本トレンドは直線で、直線にsin波とノイズを合わせました。f:id:ni4muraano:20170131155136j:plain
以下はこの信号をSingularSpectrumAnalysisクラスで解析する例です。

import numpy as np
np.random.seed(1) # 常に同じノイズを生成する
import matplotlib.pyplot as plt
from SingularSpectrumAnalysis import SingularSpectrumAnalysis

# 信号の生成
x = np.arange(100)
signal1 = x/10
signal2 = np.random.rand(len(x))/2
signal3 = np.sin(x/2)
signal = signal1 + signal2 + signal3
# 特異スペクトル解析の実施
window_size = 20
ssa = SingularSpectrumAnalysis(signal, window_size)
num_component = 1
restored_signal = ssa.restore_signal(num_component)

plt.hold(True)
plt.title("num_component=" + str(num_component))
plt.plot(signal)
plt.plot(restored_signal)
plt.show()

上記のnum_componentを1から4まで変えて結果をプロットします。
f:id:ni4muraano:20170131155650j:plain:w280f:id:ni4muraano:20170131155655j:plain:w280f:id:ni4muraano:20170131155700j:plain:w280f:id:ni4muraano:20170131155706j:plain:w280 num_component=1の時は基本トレンドの直線らしき信号が復元されています。num_componentの数字を上げるにしたがって元の信号に近くなっており、num_component=4で直線とsin波を合わせたような信号が復元されています。

信号解析-信号処理とデータ分析の基礎

信号解析-信号処理とデータ分析の基礎