旅行好きなソフトエンジニアの備忘録

プログラミングや技術関連のメモを始めました

【Python】Siamese NetworkでMNISTの少量データ学習を試す

ディープラーニングは一般的に多くの学習データが必要とされますが、少量しかない場合にどれ位精度が落ちるのか気になり 実験してみようと思いました。そうは言っても普通にCNNに少量のデータを学習させても簡単に過学習しそうに思えるので、 少量のデータでも学習できそうなモデルとしてSiamese Networkを使ってみます。

Siamese Networkとは www.youtube.com

KerasでのSiamese Networkの実装例 github.com

MNISTデータは以下で取得できますが、(X_train, y_train)から各クラス10サンプルをピックアップして それを学習データとし、各クラス100サンプルをピックアップしてそれを検証データとします。 (X_test, y_test)はそのまま精度評価に利用します。

from keras.datasets import mnist
(X_train, y_train), (X_test, y_test) = mnist.load_data()

プロジェクトのフォルダ構成は以下のようになっています。
train(学習データを格納するフォルダ)
 |-0(ここに手書き0が10枚)
 |-1(ここに手書き1が10枚)
 |-xxx
 |
 |-9(ここに手書き9が10枚)
val(検証データを格納するフォルダ)
 |-0(ここに手書き0が100枚)
 |-1(ここに手書き1が100枚)
 |-xxx
 |
 |-9(ここに手書き9が100枚)
test(テストデータを格納するフォルダ)
 |-0
 |-1
 |-xxx
 |
 |-9
siamese_data_loader.py(データローダー)
siamese_network.py(モデル)
test_siamese.py(精度検証)
train_siamese.py(学習)

siamese_data_loader.pyは以下になります。get_train_dataを呼び出すとSiamese Networkが必要とする 画像のペア(positiveとnegative)を受け取れるようにしています。

import os, random, cv2
import numpy as np

class SiameseDataLoader(object):
    def __init__(self, root_train_folder_path, samples_per_class, grayscale=False):
        self._root_train_folder_path = root_train_folder_path
        self._samples_per_class = samples_per_class
        self._sample_file_names = self._get_samples()
        self._grayscale = grayscale
        if self._grayscale:
            image = cv2.imread(self._sample_file_names[0][0], cv2.IMREAD_GRAYSCALE)
            self.input_shape = (image.shape[0], image.shape[1], 1)
        else:
            image = cv2.imread(self._sample_file_names[0][0])
            self.input_shape = image.shape

    def get_train_data(self):
        # positiveとnegativeの画像ペアファイルパスを受け取る
        pairs, labels = self._create_pairs(self._sample_file_names, self._samples_per_class)
        tmp = cv2.imread(pairs[0][0])
        if self._grayscale:
            X1 = np.zeros((len(pairs), tmp.shape[0], tmp.shape[1], 1), np.float32)
            X2 = np.zeros((len(pairs), tmp.shape[0], tmp.shape[1], 1), np.float32)
        else:
            X1 = np.zeros((len(pairs), tmp.shape[0], tmp.shape[1], tmp.shape[2]), np.float32)
            X2 = np.zeros((len(pairs), tmp.shape[0], tmp.shape[1], tmp.shape[2]), np.float32)
        Y = np.zeros((len(pairs), 1), dtype=np.float32)
        i = 0
        if self._grayscale:
            for pair, label in zip(pairs, labels):
                x1 = cv2.imread(pair[0], cv2.IMREAD_GRAYSCALE)
                X1[i] = x1[:,:,np.newaxis]
                x2 = cv2.imread(pair[1], cv2.IMREAD_GRAYSCALE)
                X2[i] = x2[:,:,np.newaxis]
                Y[i] = labels[i]
                i += 1
        else:
            for pair, label in zip(pairs, labels):
                X1[i] = cv2.imread(pair[0])
                X2[i] = cv2.imread(pair[1])
                Y[i] = labels[i]
                i += 1
        return [self._normalize(X1), self._normalize(X2)], Y

    def _get_samples(self):
        sample_file_names = []
        folders = os.listdir(self._root_train_folder_path)
        for folder_name in folders:
            folder_path = self._root_train_folder_path + folder_name
            if os.path.isdir(folder_path):
                files = os.listdir(folder_path)
                sample_file_names_per_class = []
                for file in files:
                    sample_file_names_per_class.append(folder_path + os.sep + file)
                sample_file_names.append(sample_file_names_per_class)
        return sample_file_names

    def _create_pairs(self, sample_file_names, samples_per_class):
        positive_pairs, positive_labels = self._create_positive_pairs(sample_file_names, samples_per_class)
        negative_pairs, negative_labels = self._create_negative_pairs(sample_file_names, samples_per_class)
        positive_pairs.extend(negative_pairs)
        positive_labels.extend(negative_labels)

        return positive_pairs, positive_labels

    # 手書き数字の0と0等同じクラスのペアを作成するためのメソッド
    def _create_positive_pairs(self, sample_file_names, samples_per_class):
        positive_pairs = []
        for sample_file_names_per_class in sample_file_names:
            for k in range(samples_per_class):
                positive_pairs.append(random.sample(sample_file_names_per_class, 2))
        labels = [1]*len(positive_pairs)
        return positive_pairs, labels

    # 手書き数字の2と3等異なるクラスのペアを作成するためのメソッド
    def _create_negative_pairs(self, sample_file_names, samples_per_class):
        negative_pairs = []
        class_count = len(sample_file_names)
        for i, sample_file_names_per_class in enumerate(sample_file_names):
            class_ids = list(range(class_count))
            class_ids.remove(i)
            for k in range(samples_per_class):
                pair = []
                pair.append(random.choice(sample_file_names[i]))
                pair.append(random.choice(sample_file_names[random.choice(class_ids)]))
                negative_pairs.append(pair)
        labels = [0]*len(negative_pairs)
        return negative_pairs, labels

    def _normalize(self, X):
        return X/255

    def get_test_data(self, test_image_path, samples_per_class):
        pairs = []
        for sample_file_names_per_class in self._sample_file_names:
            selected_files = random.sample(sample_file_names_per_class, samples_per_class)
            for selected_file in selected_files:
                pair = []
                pair.append(test_image_path)
                pair.append(selected_file)
                pairs.append(pair)
        tmp = cv2.imread(pairs[0][0])
        if self._grayscale:
            X1 = np.zeros((len(pairs), tmp.shape[0], tmp.shape[1], 1), np.float32)
            X2 = np.zeros((len(pairs), tmp.shape[0], tmp.shape[1], 1), np.float32)
            for i, pair in enumerate(pairs):
                X1[i] = cv2.imread(pair[0], cv2.IMREAD_GRAYSCALE)[:,:,np.newaxis]
                X2[i] = cv2.imread(pair[1], cv2.IMREAD_GRAYSCALE)[:,:,np.newaxis]
        else:
            X1 = np.zeros((len(pairs), tmp.shape[0], tmp.shape[1], tmp.shape[2]), np.float32)
            X2 = np.zeros((len(pairs), tmp.shape[0], tmp.shape[1], tmp.shape[2]), np.float32)
            for i, pair in enumerate(pairs):
                X1[i] = cv2.imread(pair[0])
                X2[i] = cv2.imread(pair[1])
        return [self._normalize(X1), self._normalize(X2)]


siamese_network.pyは以下になります。トップの方に貼ったリンク先はDenseのみで構成されていますが、 ここではConvolutionalなSiamese Networkにしました。

from keras.models import Sequential, Model
from keras.layers import Dense, Input, Lambda, Conv2D, Activation, MaxPool2D, BatchNormalization, Dropout, Flatten
import keras.backend as K

class SiameseNet(object):
    def __init__(self, input_shape, feature_dim):
        seq = Sequential()
        seq.add(Conv2D(16, 3, padding='same', input_shape=input_shape))
        seq.add(BatchNormalization())
        seq.add(Activation('relu'))
        seq.add(MaxPool2D())

        seq.add(Conv2D(32, 3, padding='same'))
        seq.add(BatchNormalization())
        seq.add(Activation('relu'))
        seq.add(MaxPool2D())

        seq.add(Conv2D(64, 3, padding='same'))
        seq.add(BatchNormalization())
        seq.add(Activation('relu'))
        seq.add(MaxPool2D())

        seq.add(Flatten())
        seq.add(Dense(256, activation='sigmoid'))
        seq.add(Dropout(0.2))
        seq.add(Dense(feature_dim, activation='linear'))

        input_a = Input(shape=input_shape)
        input_b = Input(shape=input_shape)
        processed_a = seq(input_a)
        processed_b = seq(input_b)
        distance = Lambda(self._euclidean_distance, output_shape=self._eucl_dist_output_shape)([processed_a, processed_b])
        self._model = Model(inputs=[input_a, input_b], outputs=distance)

    def _euclidean_distance(self, vects):
        x, y = vects
        distance = K.sqrt(K.sum(K.square(x - y), axis=1, keepdims=True))
        return distance

    def _eucl_dist_output_shape(self, shapes):
        shape1, shape2 = shapes
        return (shape1[0], 1)

    def get_model(self):
        return self._model

def contrastive_loss(y_true, y_pred):
    '''Contrastive loss from Hadsell-et-al.'06
    http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf
    '''
    margin = 1
    return K.mean(y_true*K.square(y_pred) + (1 - y_true)*K.square(K.maximum(margin - y_pred, 0)))


続いてtrain_siamese.pyは以下になります。オプティマイザに何故RMSPropを使っているかですが、 単純にAdamだと収束しなかったためです(いくつか初期学習係数を変更したのですが、 RMSPropを使った方が良かったです)。

from siamese_net import SiameseNet
from keras.optimizers import RMSprop
from siamese_data_loader import SiameseDataLoader
import os

if __name__ == '__main__':
    iterations = 10000
    samples_per_class = 5
    feature_dim = 10
    grayscale = True
    # Adam not works well for Siamese net
    optim = RMSprop(decay=1e-4)
    #optim = Adam(lr=0.0001, decay=1e-4, amsgrad=True)

    loader_train = SiameseDataLoader('train' + os.sep, samples_per_class, grayscale)
    loader_val = SiameseDataLoader('val' + os.sep, samples_per_class, grayscale)

    siamese = SiameseNet(loader_train.input_shape, feature_dim).get_model()
    siamese.compile(optimizer=optim, loss=contrastive_loss)
    min_loss = 9999
    min_iter = -1
    for iteration in range(iterations):
        X, y = loader_train.get_train_data()
        loss_train = siamese.train_on_batch(X, y)
        if (iteration+1)%100 == 0:
            X, y = loader_val.get_train_data()
            loss_val = siamese.evaluate(X, y, verbose=0)
            if loss_val < min_loss:
                min_iter = iteration
                min_loss = loss_val
                siamese.save_weights('weights.h5', True)
            print('loss@' + str(iteration) + ' = ' + str(loss_train) + ',' + str(loss_val) + ' (' + str(min_loss) + '@' + str(min_iter) + ')')


最後にtest_siamese.pyです。この結果の正答率が86.2%となりました。 一方で試しにSiamese Networkと似たような単純な構成のCNNでも同様のデータセットで学習させたところ 正答率が87.61%となり、Siamese Networkよりも良い成績となりました。 ちなみにデータ拡張(拡大/回転/シフト)を学習に加えたところ、Siamese Networkの正答率が95.5%、 CNNの正答率が94.05%となりました。 少量のデータでもMNISTであればそこそこ精度が出ているので、データセットによってはデータが少量でも ディープラーニングを適用できるかもしれません(少量しかデータが無いのにディープラーニングを適用 することが良いかは別問題として)。

from siamese_net import SiameseNet, contrastive_loss
import numpy as np
from keras.optimizers import RMSprop
from siamese_data_loader import SiameseDataLoader
import os

# Siamese Networkはペア画像との距離を返してくるので、
# 一番近い距離の画像が所属しているクラスを予測クラスとする
def distance_to_class(y, classes, samples_per_class):
    i = 0
    class_distances = []
    for c in range(classes):
        distances = []
        for s in range(samples_per_class):
            distances.append(y[i])
            i += 1
        median = np.median(np.array(distances))
        class_distances.append(median)
    return np.argmin(np.array(class_distances))

if __name__ == '__main__':
    samples_per_class = 5
    feature_dim = 10
    grayscale = True
    optim = RMSprop(decay=1e-4)
    test_root_path = 'test' + os.sep
    classes = 10

    loader = SiameseDataLoader('train' + os.sep, samples_per_class, grayscale)

    siamese = SiameseNet(loader.input_shape, feature_dim).get_model()
    siamese.compile(optimizer=optim, loss=contrastive_loss)
    siamese.load_weights('weights.h5')

    correct = 0
    count = 0
    for c in range(classes):
        test_class_folder_path = test_root_path + str(c) + os.sep
        test_file_names = os.listdir(test_class_folder_path)
        distances = []
        for test_file_name in test_file_names:
            test_file_path = test_class_folder_path + test_file_name
            X = loader.get_test_data(test_file_path, samples_per_class)
            y = siamese.predict_on_batch(X)
            predicted_class = distance_to_class(y, classes, samples_per_class)
            if predicted_class == c:
                correct += 1
            count += 1
    accuracy = correct/count*100
    print('accuracy=' + str(accuracy))

【C#】DispatcherTimerを即起動させる

10秒毎に○○したいという時DispatcherTimerを使うわけですが、Startメソッドを呼んで 10秒経ってからイベントが開始されます。そうではなくて、Startメソッドを呼んだ時に イベント開始となってほしかったのですが、以下のやり方でOKです。

// Intervalを0に指定で即起動する
timer.Interval = new TimeSpan(0, 0, 0);
timer.Start();

void timer_Tick(object sender, EventArgs e)
{
    // ここで本来の間隔に指定しなおす
    ((Timer)sender).Interval = new TimeSpan(0, 0, 10);
}

stackoverflow.com

【Python】pillow-simdによる画像読み込みの高速化

ディープラーニングのモデルをネットから拾ってくると、画像読み込みにopencvが使われているケースやpillowが使われているケースがあります。自分は使い慣れている/速いという理由でopencvを使ってもらえると助かるのですが、pillowが使われているケースもあり、pillowはopencvと比較して画像読み込み時間が長く、学習に時間がかかって困る時があります。

pillow部分をopencvに書き直す手もありますが、それが面倒という場合もあり、pillow-simdというライブラリを使えばプログラムを書き換えることなく画像読み込みを高速化できると聞いたので試してみます。しかし、pillow-simdはWindowsをサポートしていないのか、インストールでエラーが出ましたが、issueに回避策を見つけた人がいるのでそれに倣いひとまずインストールして試します。

pip uninstall pillow
pip install --upgrade pillow-simd --global-option="build_ext" --global-option="--disable-jpeg" --global-option="--disable-zlib"

4000×3000の画像を読み込んでみたところ、むしろopencvのimreadよりpillow-simdの方が速そうです。

import cv2
from PIL import Image
import time

start = time.time()
image = Image.open('pic.bmp') # Case pillow-simd: time = 78msec
#image = cv2.imread('pic.bmp') # Case opencv: time = 93msec
end = time.time()
print(str(end - start))

【Python】画像データ拡張ライブラリAlbumentationsを使ってみる

PyTorch版のYOLO v3を作っている人がいたので試してみようと思っています。 github.com

ただ、Trainにデータ拡張が入っていないのでデータ拡張ロジックを追加したいと思ったところ、 Albumentationsというライブラリを見つけました。 github.com

物体検出やセグメンテーションにも利用可能そうなので早速試してみました。 使い方は以下を実施すれば良さげです。

  1. Composeを作って、Composeの中に実施したいデータ拡張を記述
  2. Composeに画像、ラベル、クラスIDを含むディクショナリを投入

以下はComposeを作るコードになります。

from albumentations import Compose
from albumentations.augmentations.transforms import Resize, HorizontalFlip, RandomSizedCrop, HueSaturationValue

def get_compose(crop_min_max, image_height, image_width, hue_shift, saturation_shift, value_shift):
    # Resize image to (image_height, image_width) with 100% probability
    # Flip LR with 50% probability
    # Crop image and resize image to (image_height, image_width) with 100% probability
    # Change HSV from -hue_shift to +hue_shift and so on with 100% probability
    # Format 'pascal_voc' means label is given like [x_min, y_min, x_max, y_max]
    return Compose([Resize(image_height, image_width, p=1.0),
                     HorizontalFlip(p=0.5),
                     RandomSizedCrop(crop_min_max, image_height, image_width, p=1.0),
                     HueSaturationValue(hue_shift, saturation_shift, value_shift, p=1.0)],
                    bbox_params={'format':'pascal_voc', 'label_fields':['category_id']})

このComposeは以下のように使います。

# Image size for YOLO
image_size = 416
# Crop 80 - 100% of image
crop_min = image_size*80//100
crop_max = image_size
crop_min_max = (crop_min, crop_max)
# HSV shift limits
hue_shift = 10
saturation_shift = 10
value_shift = 10
# Get compose
compose = get_compose(crop_min_max, image_size, image_size, hue_shift, saturation_shift, value_shift)
# image: numpy array like return value of cv2.imread
# labels: bounding box lists like [[366.7, 80.84, 132.8, 181.84], [5.66, 138.95, 147.09, 164.88]]
# classes: class of each bounding box like [0, 1]
annotation = {'image': image, 'bboxes': labels, 'category_id': classes}
# Do augmentation
augmented = compose(**annotation)
augmented_image = augmented['image']
augmented_labels = augmented['bboxes']

f:id:ni4muraano:20181120235803j:plain:w300 f:id:ni4muraano:20181121000404j:plain:w200

他にも様々な拡張が用意されているっぽい。API Referenceはここ

【C#】ToList()の挙動についての勘違い

ToList()が思ってた動作と違ったのでメモしておきます。

↓のようなクラスを作ります。

class MyClass
{
    public MyClass()
    {
        var random = new Random();
        Value = random.Next(100);
    }

    public int Value { get; set; }
}

上記クラスを使って以下のようなコードを書いてしまっていました。 ToList()は新しいリストを作成するので、中身もコピーされると勝手に勘違いしてました。

var myClasses = new List<MyClass>();
for (int i = 0; i < 1000; ++i)
{
    myClasses.Add(new MyClass());
}
// リストからある条件を満たすものを取り出し、コピーしたつもりになっていた
var selectedMyClasses = myClasses.Where(i => i.Value >= 50).ToList();
// 値を書き換えると
selectedMyClasses[0].Value = 10000;
// 元の中身が書き換わってるので、countは1と表示される
int count = myClasses.Count(i => i.Value == 10000);
Console.WriteLine(count);

中身もコピーしたいのであれば、MyClassにコピー用メソッドを加えて

class MyClass : ICloneable
{
    public MyClass()
    {
        var random = new Random();
        Value = random.Next(100);
    }

    public int Value { get; set; }

    public object Clone()
    {
        return MemberwiseClone();
    }
}

↓こんな感じに書くのでしょうか。。。

var myClasses = new List<MyClass>();
for (int i = 0; i < 1000; ++i)
{
    myClasses.Add(new MyClass());
}
var selectedMyClasses = myClasses.Where(i => i.Value >= 50).Select(i => (MyClass)i.Clone()).ToList();
selectedMyClasses[0].Value = 10000;
int count = myClasses.Count(i => i.Value == 10000);
Console.WriteLine(count);

当たり前という突っ込みを受けそうですが、他に同じ質問している人いたので若干ほっとしました。

stackoverflow.com

【C#】System.IO.Compression.ZipFileクラスのCreateFromDirectoryメソッドはデフォルト設定だと日本語フォルダ名に非対応

アプリケーションでエラーが起きたら必要なログを全てフォルダに集めて、そのフォルダをzip化するということを やろうとしたら、フォルダ名に日本語が入っていると文字化けすることが分かりました。 解決策は以下に書かれていて、encodingにEncoding.GetEncoding("sjis")を指定すれば大丈夫でした。 System.IO.Compression.ZipFileクラスにて、日本語のディレクトリ名やファイル名を文字化けさせずに圧縮処理 · GitHub

【Visual Studio】NuGetを実行したら「'xxxxx' にはすでに 'NETStandard.Library' に対して定義された依存関係があります。」というエラーが出るときの対処法

下記記事にあるようにNuGetをアップデートするのが正解。 というか何故こんな意味不明なエラーメッセージ。。。 qiita.com