旅行好きなソフトエンジニアの備忘録

プログラミングや技術関連のメモを始めました

【Python】Siamese NetworkでMNISTの少量データ学習を試す

ディープラーニングは一般的に多くの学習データが必要とされますが、少量しかない場合にどれ位精度が落ちるのか気になり 実験してみようと思いました。そうは言っても普通にCNNに少量のデータを学習させても簡単に過学習しそうに思えるので、 少量のデータでも学習できそうなモデルとしてSiamese Networkを使ってみます。

Siamese Networkとは www.youtube.com

KerasでのSiamese Networkの実装例 github.com

MNISTデータは以下で取得できますが、(X_train, y_train)から各クラス10サンプルをピックアップして それを学習データとし、各クラス100サンプルをピックアップしてそれを検証データとします。 (X_test, y_test)はそのまま精度評価に利用します。

from keras.datasets import mnist
(X_train, y_train), (X_test, y_test) = mnist.load_data()

プロジェクトのフォルダ構成は以下のようになっています。
train(学習データを格納するフォルダ)
 |-0(ここに手書き0が10枚)
 |-1(ここに手書き1が10枚)
 |-xxx
 |
 |-9(ここに手書き9が10枚)
val(検証データを格納するフォルダ)
 |-0(ここに手書き0が100枚)
 |-1(ここに手書き1が100枚)
 |-xxx
 |
 |-9(ここに手書き9が100枚)
test(テストデータを格納するフォルダ)
 |-0
 |-1
 |-xxx
 |
 |-9
siamese_data_loader.py(データローダー)
siamese_network.py(モデル)
test_siamese.py(精度検証)
train_siamese.py(学習)

siamese_data_loader.pyは以下になります。get_train_dataを呼び出すとSiamese Networkが必要とする 画像のペア(positiveとnegative)を受け取れるようにしています。

import os, random, cv2
import numpy as np

class SiameseDataLoader(object):
    def __init__(self, root_train_folder_path, samples_per_class, grayscale=False):
        self._root_train_folder_path = root_train_folder_path
        self._samples_per_class = samples_per_class
        self._sample_file_names = self._get_samples()
        self._grayscale = grayscale
        if self._grayscale:
            image = cv2.imread(self._sample_file_names[0][0], cv2.IMREAD_GRAYSCALE)
            self.input_shape = (image.shape[0], image.shape[1], 1)
        else:
            image = cv2.imread(self._sample_file_names[0][0])
            self.input_shape = image.shape

    def get_train_data(self):
        # positiveとnegativeの画像ペアファイルパスを受け取る
        pairs, labels = self._create_pairs(self._sample_file_names, self._samples_per_class)
        tmp = cv2.imread(pairs[0][0])
        if self._grayscale:
            X1 = np.zeros((len(pairs), tmp.shape[0], tmp.shape[1], 1), np.float32)
            X2 = np.zeros((len(pairs), tmp.shape[0], tmp.shape[1], 1), np.float32)
        else:
            X1 = np.zeros((len(pairs), tmp.shape[0], tmp.shape[1], tmp.shape[2]), np.float32)
            X2 = np.zeros((len(pairs), tmp.shape[0], tmp.shape[1], tmp.shape[2]), np.float32)
        Y = np.zeros((len(pairs), 1), dtype=np.float32)
        i = 0
        if self._grayscale:
            for pair, label in zip(pairs, labels):
                x1 = cv2.imread(pair[0], cv2.IMREAD_GRAYSCALE)
                X1[i] = x1[:,:,np.newaxis]
                x2 = cv2.imread(pair[1], cv2.IMREAD_GRAYSCALE)
                X2[i] = x2[:,:,np.newaxis]
                Y[i] = labels[i]
                i += 1
        else:
            for pair, label in zip(pairs, labels):
                X1[i] = cv2.imread(pair[0])
                X2[i] = cv2.imread(pair[1])
                Y[i] = labels[i]
                i += 1
        return [self._normalize(X1), self._normalize(X2)], Y

    def _get_samples(self):
        sample_file_names = []
        folders = os.listdir(self._root_train_folder_path)
        for folder_name in folders:
            folder_path = self._root_train_folder_path + folder_name
            if os.path.isdir(folder_path):
                files = os.listdir(folder_path)
                sample_file_names_per_class = []
                for file in files:
                    sample_file_names_per_class.append(folder_path + os.sep + file)
                sample_file_names.append(sample_file_names_per_class)
        return sample_file_names

    def _create_pairs(self, sample_file_names, samples_per_class):
        positive_pairs, positive_labels = self._create_positive_pairs(sample_file_names, samples_per_class)
        negative_pairs, negative_labels = self._create_negative_pairs(sample_file_names, samples_per_class)
        positive_pairs.extend(negative_pairs)
        positive_labels.extend(negative_labels)

        return positive_pairs, positive_labels

    # 手書き数字の0と0等同じクラスのペアを作成するためのメソッド
    def _create_positive_pairs(self, sample_file_names, samples_per_class):
        positive_pairs = []
        for sample_file_names_per_class in sample_file_names:
            for k in range(samples_per_class):
                positive_pairs.append(random.sample(sample_file_names_per_class, 2))
        labels = [1]*len(positive_pairs)
        return positive_pairs, labels

    # 手書き数字の2と3等異なるクラスのペアを作成するためのメソッド
    def _create_negative_pairs(self, sample_file_names, samples_per_class):
        negative_pairs = []
        class_count = len(sample_file_names)
        for i, sample_file_names_per_class in enumerate(sample_file_names):
            class_ids = list(range(class_count))
            class_ids.remove(i)
            for k in range(samples_per_class):
                pair = []
                pair.append(random.choice(sample_file_names[i]))
                pair.append(random.choice(sample_file_names[random.choice(class_ids)]))
                negative_pairs.append(pair)
        labels = [0]*len(negative_pairs)
        return negative_pairs, labels

    def _normalize(self, X):
        return X/255

    def get_test_data(self, test_image_path, samples_per_class):
        pairs = []
        for sample_file_names_per_class in self._sample_file_names:
            selected_files = random.sample(sample_file_names_per_class, samples_per_class)
            for selected_file in selected_files:
                pair = []
                pair.append(test_image_path)
                pair.append(selected_file)
                pairs.append(pair)
        tmp = cv2.imread(pairs[0][0])
        if self._grayscale:
            X1 = np.zeros((len(pairs), tmp.shape[0], tmp.shape[1], 1), np.float32)
            X2 = np.zeros((len(pairs), tmp.shape[0], tmp.shape[1], 1), np.float32)
            for i, pair in enumerate(pairs):
                X1[i] = cv2.imread(pair[0], cv2.IMREAD_GRAYSCALE)[:,:,np.newaxis]
                X2[i] = cv2.imread(pair[1], cv2.IMREAD_GRAYSCALE)[:,:,np.newaxis]
        else:
            X1 = np.zeros((len(pairs), tmp.shape[0], tmp.shape[1], tmp.shape[2]), np.float32)
            X2 = np.zeros((len(pairs), tmp.shape[0], tmp.shape[1], tmp.shape[2]), np.float32)
            for i, pair in enumerate(pairs):
                X1[i] = cv2.imread(pair[0])
                X2[i] = cv2.imread(pair[1])
        return [self._normalize(X1), self._normalize(X2)]


siamese_network.pyは以下になります。トップの方に貼ったリンク先はDenseのみで構成されていますが、 ここではConvolutionalなSiamese Networkにしました。

from keras.models import Sequential, Model
from keras.layers import Dense, Input, Lambda, Conv2D, Activation, MaxPool2D, BatchNormalization, Dropout, Flatten
import keras.backend as K

class SiameseNet(object):
    def __init__(self, input_shape, feature_dim):
        seq = Sequential()
        seq.add(Conv2D(16, 3, padding='same', input_shape=input_shape))
        seq.add(BatchNormalization())
        seq.add(Activation('relu'))
        seq.add(MaxPool2D())

        seq.add(Conv2D(32, 3, padding='same'))
        seq.add(BatchNormalization())
        seq.add(Activation('relu'))
        seq.add(MaxPool2D())

        seq.add(Conv2D(64, 3, padding='same'))
        seq.add(BatchNormalization())
        seq.add(Activation('relu'))
        seq.add(MaxPool2D())

        seq.add(Flatten())
        seq.add(Dense(256, activation='sigmoid'))
        seq.add(Dropout(0.2))
        seq.add(Dense(feature_dim, activation='linear'))

        input_a = Input(shape=input_shape)
        input_b = Input(shape=input_shape)
        processed_a = seq(input_a)
        processed_b = seq(input_b)
        distance = Lambda(self._euclidean_distance, output_shape=self._eucl_dist_output_shape)([processed_a, processed_b])
        self._model = Model(inputs=[input_a, input_b], outputs=distance)

    def _euclidean_distance(self, vects):
        x, y = vects
        distance = K.sqrt(K.sum(K.square(x - y), axis=1, keepdims=True))
        return distance

    def _eucl_dist_output_shape(self, shapes):
        shape1, shape2 = shapes
        return (shape1[0], 1)

    def get_model(self):
        return self._model

def contrastive_loss(y_true, y_pred):
    '''Contrastive loss from Hadsell-et-al.'06
    http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf
    '''
    margin = 1
    return K.mean(y_true*K.square(y_pred) + (1 - y_true)*K.square(K.maximum(margin - y_pred, 0)))


続いてtrain_siamese.pyは以下になります。オプティマイザに何故RMSPropを使っているかですが、 単純にAdamだと収束しなかったためです(いくつか初期学習係数を変更したのですが、 RMSPropを使った方が良かったです)。

from siamese_net import SiameseNet
from keras.optimizers import RMSprop
from siamese_data_loader import SiameseDataLoader
import os

if __name__ == '__main__':
    iterations = 10000
    samples_per_class = 5
    feature_dim = 10
    grayscale = True
    # Adam not works well for Siamese net
    optim = RMSprop(decay=1e-4)
    #optim = Adam(lr=0.0001, decay=1e-4, amsgrad=True)

    loader_train = SiameseDataLoader('train' + os.sep, samples_per_class, grayscale)
    loader_val = SiameseDataLoader('val' + os.sep, samples_per_class, grayscale)

    siamese = SiameseNet(loader_train.input_shape, feature_dim).get_model()
    siamese.compile(optimizer=optim, loss=contrastive_loss)
    min_loss = 9999
    min_iter = -1
    for iteration in range(iterations):
        X, y = loader_train.get_train_data()
        loss_train = siamese.train_on_batch(X, y)
        if (iteration+1)%100 == 0:
            X, y = loader_val.get_train_data()
            loss_val = siamese.evaluate(X, y, verbose=0)
            if loss_val < min_loss:
                min_iter = iteration
                min_loss = loss_val
                siamese.save_weights('weights.h5', True)
            print('loss@' + str(iteration) + ' = ' + str(loss_train) + ',' + str(loss_val) + ' (' + str(min_loss) + '@' + str(min_iter) + ')')


最後にtest_siamese.pyです。この結果の正答率が86.2%となりました。 一方で試しにSiamese Networkと似たような単純な構成のCNNでも同様のデータセットで学習させたところ 正答率が87.61%となり、Siamese Networkよりも良い成績となりました。 ちなみにデータ拡張(拡大/回転/シフト)を学習に加えたところ、Siamese Networkの正答率が95.5%、 CNNの正答率が94.05%となりました。 少量のデータでもMNISTであればそこそこ精度が出ているので、データセットによってはデータが少量でも ディープラーニングを適用できるかもしれません(少量しかデータが無いのにディープラーニングを適用 することが良いかは別問題として)。

from siamese_net import SiameseNet, contrastive_loss
import numpy as np
from keras.optimizers import RMSprop
from siamese_data_loader import SiameseDataLoader
import os

# Siamese Networkはペア画像との距離を返してくるので、
# 一番近い距離の画像が所属しているクラスを予測クラスとする
def distance_to_class(y, classes, samples_per_class):
    i = 0
    class_distances = []
    for c in range(classes):
        distances = []
        for s in range(samples_per_class):
            distances.append(y[i])
            i += 1
        median = np.median(np.array(distances))
        class_distances.append(median)
    return np.argmin(np.array(class_distances))

if __name__ == '__main__':
    samples_per_class = 5
    feature_dim = 10
    grayscale = True
    optim = RMSprop(decay=1e-4)
    test_root_path = 'test' + os.sep
    classes = 10

    loader = SiameseDataLoader('train' + os.sep, samples_per_class, grayscale)

    siamese = SiameseNet(loader.input_shape, feature_dim).get_model()
    siamese.compile(optimizer=optim, loss=contrastive_loss)
    siamese.load_weights('weights.h5')

    correct = 0
    count = 0
    for c in range(classes):
        test_class_folder_path = test_root_path + str(c) + os.sep
        test_file_names = os.listdir(test_class_folder_path)
        distances = []
        for test_file_name in test_file_names:
            test_file_path = test_class_folder_path + test_file_name
            X = loader.get_test_data(test_file_path, samples_per_class)
            y = siamese.predict_on_batch(X)
            predicted_class = distance_to_class(y, classes, samples_per_class)
            if predicted_class == c:
                correct += 1
            count += 1
    accuracy = correct/count*100
    print('accuracy=' + str(accuracy))