旅行好きなソフトエンジニアの備忘録

プログラミングや技術関連のメモを始めました

【Keras】EfficientNetのファインチューニング例

EfficientNetはAutoMLで作成された、パラメータ数の少なさに対して精度が非常に高いモデルです。 OfficialのTensorflowの実装だけでなく、PyTorchやKerasの実装も早速公開されており、使い方を知っておきたく試してみました。

実施内容

EfficientNetをファインチューニングして犬・猫分類を実施してみる

EfficientNet利用手順

① 以下のKeras版実装を利用しました。準備は"pip install -U efficientnet"を実行するだけです。
注意点としては、Kerasのバージョンが2.2.0以上であることが指定されています。
始め2.1.5で試したところ、"DepthwiseConvolutionが無い"といったエラーが出ました。
github.com
② あとは実装ですが、まずプロジェクトフォルダが以下の構成になっていることが前提です。

  • train
    • cats(このフォルダに学習用猫画像が入っている)
    • dogs(このフォルダに学習用犬画像が入っている)
  • val
    • cats(このフォルダに検証用猫画像が入っている)
    • dogs(このフォルダに検証用犬画像が入っている)
  • adam_mult_lr.py(ファインチューニング用にカスタマイズされたAdamオプティマイザ)
  • data_loader.py(fit_generatorに使うgenerator生成クラス)
  • train.py(学習実行ファイル)


まずadam_mult_lr.pyですが、ここでは学習済みEfficientNetを使ってファインチューニングしようとしています。
EfficientNetの特徴抽出部のレイヤーの学習係数を小さくしたいのですが、Kerasだと簡単には出来なさそうです。 ただ幸いこのAdamを使えばレイヤー毎に学習係数を変更できるよ、というブログを見つけたのでそれをコピーしてます。

erikbrorson.github.io

次にdata_loader.pyですが、以前の記事に書いた雛形ほぼそのものになります。
注意点としては、Keras版EfficientNetは画像がRGBであることを期待しているっぽく、 opencvを使った場合はBGRをRGBに変換するロジック追加が必要です。

ni4muraano.hatenablog.com

最後にtrain.pyは以下になります。見るとわかるのですが、ソースはかなり単純です。

from efficientnet import EfficientNetB0
from keras.layers import GlobalAveragePooling2D, Dense, Dropout
from keras.models import Model
from data_loader import DataLoader
from adam_lr_mult import Adam_lr_mult

def prepare_new_model(input_shape, class_count):
    # 学習済みモデルの取り出し
    feature_extractor = EfficientNetB0(input_shape=input_shape, weights='imagenet', include_top=False)
    # 犬猫分類器を引っ付ける
    x = feature_extractor.output
    x = GlobalAveragePooling2D()(x)
    x = Dense(512, activation='relu')(x)
    x = Dropout(rate=0.25)(x)
    x = Dense(class_count, activation='sigmoid')(x)
    # 新たなモデルの定義
    model = Model(inputs=feature_extractor.input, outputs=x)
    print(model.summary())
    return model

def get_adam_for_fine_tuning(lr, decay, multiplier, model):
    lr_multiplier = {}
    # 自分が引っ付けたレイヤーの学習係数は1、学習済みの部分は小さな値を設定する
    for layer in model.layers:
        if 'dense' in layer.name:
            lr_multiplier[layer.name] = 1.0
        else:
            lr_multiplier[layer.name] = multiplier
    return Adam_lr_mult(lr=lr, decay=decay, multipliers=lr_multiplier)

def train(epochs, batch_size, input_shape, class_count):
    # 学習用画像データローダー
    train_data_loader = DataLoader('train', batch_size, input_shape, do_augmentation=True)
    train_generator = train_data_loader.get_data_loader()
    # 検証用画像データローダー
    val_data_loader = DataLoader('val', batch_size, input_shape, do_augmentation=False)
    val_generator = val_data_loader.get_data_loader()
    # モデルの生成
    model = prepare_new_model(input_shape, class_count)
    # ファインチューニング用Adamオプティマイザ
    optimizer = get_adam_for_fine_tuning(lr=1e-3, decay=1e-5, multiplier=0.01, model=model)
    # コンパイルして
    model.compile(optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
    # fit_generatorするだけ
    h = model.fit_generator(train_generator, train_data_loader.iterations, epochs,
                                  validation_data=val_generator, validation_steps=val_data_loader.iterations)
    # 学習データ、検証データのロスとAccをファイルに出力
    with open('loss.csv', 'a') as f:
        for loss_t, acc_t, loss_v, acc_v in zip(h.history['loss'], h.history['acc'], h.history['val_loss'], h.history['val_acc']):
            f.write(str(loss_t) + ',' + str(acc_t) + ',' + str(loss_v) + ',' + str(acc_v) + '\n')


if __name__ == '__main__':
    epochs = 5
    batch_size = 16
    input_shape = (224, 224, 3)
    class_count = 2
    train(epochs, batch_size, input_shape, class_count)


以下は5エポック学習した時のAccの推移ですが、こんな適当に作った物でもかなり良い結果が得られています。 f:id:ni4muraano:20190616083143p:plain

試しにResnet50でも同様の実験を行いました。論文上ではEfficientNetの方が僅かに精度が高いのですが、5エポック試した範囲ではResnet50の方が僅かに精度が高い結果となりました。 f:id:ni4muraano:20190616083256p:plain

【Keras】fit_generatorに使うgeneratorの雛形メモ

クラス分類用のfit_generatorに使うgeneratorの雛形をメモします。
画像が格納されているフォルダが以下のような構造であることを前提とします。

  • トップフォルダ
    • class1フォルダ
      • class1に属する画像ファイル
    • class2フォルダ
      • class2に属する画像ファイル
    • 以下同様


また、opencvやデータ拡張ライブラリのalbumentationsに依存しています。
ni4muraano.hatenablog.com

data_loader.py

import os, random, cv2
import numpy as np
from albumentations import Compose
from albumentations.augmentations.transforms import HorizontalFlip, Normalize

class DataLoader(object):
    def __init__(self, data_folder, batch_size, input_shape, do_augmentation, gray_scale=False):
        self._file_paths = []
        self._annotations = []
        folders = os.listdir(data_folder)
        folders.sort()
        # 画像のパスとクラスIDを取得する
        for class_id, class_name in enumerate(folders):
            folder_path = data_folder + os.sep + class_name
            file_paths = [folder_path + os.sep + fn for fn in os.listdir(folder_path + os.sep)]
            self._file_paths += file_paths
            self._annotations += [class_id]*len(file_paths)
        # クラス数
        self._class_count = class_id + 1
        self._batch_size = batch_size
        self._input_shape = input_shape
        self._gray_scale = gray_scale
        if len(self._file_paths)%self._batch_size == 0:
            self.iterations = len(self._file_paths) // self._batch_size
            self._has_extra_data = False
        else:
            self.iterations = len(self._file_paths) // self._batch_size + 1
            self._has_extra_data = True
        self._compose = self._define_augment(input_shape, do_augmentation)

    def _define_augment(self, input_shape, do_augmentation):
        # mean, std, max_pixel_valueは適宜変更してください
        mean = (0.485*255, 0.456*255, 0.406*255)
        std = (0.229*255, 0.224*255, 0.225*255)
        normalize = Normalize(mean=mean, std=std, max_pixel_value=1)
        # データ拡張内容は適宜変更してください
        if do_augmentation:
            return Compose([normalize, HorizontalFlip(p=0.5)])
        else:
            return Compose([normalize])

    def get_data_loader(self):
        while True:
            file_paths, annotations = self._shuffle(self._file_paths, self._annotations)
            for iteration in range(self.iterations):
                if iteration == self.iterations - 1 and self._has_extra_data:
                    shape = (len(file_paths)%self._batch_size, self._input_shape[0],
                             self._input_shape[1], self._input_shape[2])
                else:
                    shape = (self._batch_size, self._input_shape[0], self._input_shape[1], self._input_shape[2])
                # バッチサイズ分のデータを取得する
                X = np.zeros(shape, dtype=np.float32)
                y = np.zeros((shape[0], self._class_count), dtype=np.float32)
                for i in range(X.shape[0]):
                    index = self._batch_size*iteration + i
                    if self._gray_scale:
                        image = cv2.imread(file_paths[index], cv2.IMREAD_GRAYSCALE)
                        image = image[:,:,np.newaxis]
                    else:
                        image = cv2.imread(file_paths[index])
                    image = cv2.resize(image, (self._input_shape[1], self._input_shape[0]))
                    image = image.astype(np.float32)
                    # データ拡張を実行する
                    X[i] = self._augment(image)
                    y[i, annotations[index]] = 1
                yield X, y

    def _shuffle(self, x, y):
        p = list(zip(x, y))
        random.shuffle(p)
        return zip(*p)

    def _augment(self, image):
        dict = {'image': image}
        augmented = self._compose(**dict)
        return augmented['image']


このdata_loader.pyは以下のように使います。

from data_loader import DataLoader

train_data_loader = DataLoader('train', batch_size, input_shape, do_augmentation=True)
train_generator = train_data_loader.get_data_loader()
h = model.fit_generator(train_generator, train_data_loader.iterations, epochs)

データ分析・AIのビジネス導入を読んでのメモ

書籍「データ分析・AIのビジネス導入」を読んだので、気をつけたいことを自分用にメモします。

失敗しない データ分析・AIのビジネス導入: プロジェクト進行から組織づくりまで

失敗しない データ分析・AIのビジネス導入: プロジェクト進行から組織づくりまで

  • 作者: 株式会社ブレインパッド,太田満久,井上佳,今津義充,中山英樹,上総虎智,山?裕市,薗頭隆太,草野隆史
  • 出版社/メーカー: 森北出版
  • 発売日: 2018/07/13
  • メディア: 単行本(ソフトカバー)
  • この商品を含むブログを見る

データ分析プロジェクトの7つのリスク

  1. 時間と成果が比例しない
  2. データの量や質が不十分
  3. データのフォーマット、入手元へ依存する
  4. データのトレンドが変化してしまう
  5. 分析結果が当たり前の内容、もしくは悪い意味で想定外になる
  6. 分析結果の解釈が難しく現場で使われない
  7. PoCが終わりシステム化で失敗する

機械学習システムの短所を補う工夫

  1. 分析遂行前に業務ユースケースや必要な入出力情報、必要精度や評価指標を明確にする
  2. PoCを実施し、ビジネス適用可能性、性能にまつわる潜在リスク、ビジネスインパクトを把握する
  3. 結果に対して人手による調整を加える余地を残す
  4. 必要なデータを収集、分析する環境を確保する

よくある問題点と対応

課題 問題点 必要な対応
ビジネスの理解 分析官と現場担当者のコミュニケーション不足により分析のゴールがビジネスの目的と乖離する ビジネス上の課題を理解した上で、その課題達成に向けた適切な分析設計を行う
データの理解・準備 データを整理・集計した結果、分析に使用できるデータが少ない/存在しないことに気づく データの質と量を把握して、ビジネス課題の達成に向けて必要なデータを収集・加工して準備する
評価指標の選択 分析官と現場の合意がないままビジネス現場の目的と乖離した評価指標が選択されてしまう ビジネス課題の達成度を測る評価指標と、分析精度を測る指標が一致するよう、適切な評価指標を選択する


データ分析の仕事の流れ

  1. プロジェクト立ち上げ
    |
    |---ゴール設定
    |---アセスメント
    |
  2. PoC
    |
    |---分析設計(分析に求められる要件の確認、分析アプローチの考案)
    |---実施・評価
    |
  3. ビジネス適用
    |
    |---実地試験
    |---開発
    |---運用・保守


プロジェクト立ち上げ詳細

ゴール設定
|
|---プロジェクト目的の設定
    |---ビジネスの理解
    |---分析結果の活用方法検討
|---データ利活用で解決可能な目標の設定
|
アセスメント
|
|---活用されるデータの収集と概要把握
|---スコープの設定
|---プロジェクトメンバーの選定と役割の設定

ビジネスの理解は「何が解決すべき課題か」だけでなく、「なぜそうなっているか」「どのようにすれば改善できるか」といった仮説の検討が重要。さらにかけれるコスト、課題が解決された後のあるべき姿を考えることでプロジェクトの意義が明確になる。


良い目標設定と悪い目標設定の例

良い例 ビジネス目的、具体的な目標、手元にあるデータ、分析アプローチに妥当性がある

目的 目標 良いポイント
ECサイト内での売上向上 データから顧客の理解を深め、レコメンドアルゴリズムを洗練させることでCTRを1%向上させる ビジネス上の目的と目標が明確であり、かつ関係が明らか
製造コスト削減 製造物の需要を予測し、必要な資材の量を計算、さらに最適化を実施することで無駄な資材購入を削減する ビジネス目的を需要予測と資材計画の最適化という2つの目標を組み合わせて達成しようとしている

悪い例 ビジネス目的、具体的な目標、手元にあるデータ、手法のどれかに乖離がある

目的 目標 悪いポイント
データから設計図を書き起こす人件費の削減 深層学習を活用した図面設計AIをつくり出す ①目標に具体性が欠ける。②何かを作り出すタイプの課題は、記録されているデータ以外の、その業務における常識や暗黙的な知識をもった人間がこなしている業務が対象とされており、データ上の内容をいくら機械学習モデルに読み込ませても目的を達成できない場合がある
良品・不良品を見分ける業務を自動化・効率化する 機械学習による異常検知を精度99%で実現する アルゴリズムの精度そのものが目標になっており、ビジネス目的と直接的な整合性が取れていない。②どのような種類の精度かが不明瞭


データ収集で気にする必要がある点 ~ 既存データを活用する場合

データ管理者の特定(プロジェクトメンバー?他部署?社外?)
収集されているデータの粒度(日毎の予測を行いたいのに週毎のデータしかない等)


データ取得で気にする必要がある点 ~ これからデータを取得する場合

データ取得の難易度(データ取得者とデータ分析者が異なることにより、解析に使えないデータが集まる)
著作権等法律面の検討


データ取得で気にする必要がある点 ~ 既存データ/新規データ両方

  • データ取得のコスト
  • データの量や取得期間
  • 取得期間内でデータ取得の方法やフォーマットが変更されていないか
  • データの変遷
  • 取得時のエラーの可能性
  • 取得時点から活用可能になるまでのタイムラグ


PoCフェーズ詳細

  1. 分析要件の確認
  2. アプローチ概要の決定
  3. データ理解
  4. 分析設計
  5. 分析実施
  6. 結果考察・改善方針検討


分析要件の確認詳細

  1. 分析結果はどのようにビジネスに活用されるのか
    1.1. 結果を人が見て意思決定の参考とする
    1.2. 意思決定は人が行うが、意思決定者に具体的行動を提案する
    1.3. 意思決定を含めた自動化を行う
  2. 何が対象か(全ての商品?主力商品のみ?)
  3. どんな出力値が必要化(分類?予測?可視化?)
  4. モデルにはどの程度解釈性が必要か
  5. 分析結果をどのように評価するか
  6. 利用するデータに制約はあるか
  7. 処理時間に制約はあるか(学習時間・予測時間)
  8. 環境面の制約はあるか(オンプレミス/クラウド、PCやプログラミング言語等)


アプローチ概要の決定詳細

  1. 教師あり問題として扱うのか、教師なし問題としてあつかうのか
    ⇒ 教師なしを選択するなら、何故教師ありでないのかを説明できること
  2. どんなデータを用いるか
  3. どんな手法を用いるか
  4. 分析結果をどのように評価するか
  5. どんな環境を用いるか
  6. データ分析特有のリスクについての対応方針

【Python】cv2.rectangleでのエラー"TypeError: an integer is required (got type tuple)"の対処方法

物体検出を行っていて、各クラスに色を割り当ててバウンディングボックスを描画したい、という状況でした。何故か修正前コードでは表題のエラーが発生し、修正後コードのような書き換えをしなければなりませんでした。本質でない部分に大分時間を使ったので、同じエラーが起きる場合に備えてメモしておきます。

修正前コード

# 各クラスに色を割り当てる
colors = np.random.uniform(0, 255, (num_classes, 3)).astype(np.int32)

# クラスに応じて異なる色でバウンディングボックスを描画
for i in range(boxes.size(0)): 
    class_id = boxes[i, 0]
    box = boxes[i, 1:] 
    pt1 = (box[0], box[1]) 
    pt2 = (box[2], box[3]) 
    color = tuple(colors[class_id]) 
    # 何故かここで"TypeError: an integer is required"エラーが発生
    cv2.rectangle(orig_image, pt1, pt2, color, 4) 

修正後コード

# 各クラスに色を割り当てる
colors = np.random.uniform(0, 255, (num_classes, 3))

# クラスに応じて異なる色でバウンディングボックスを描画
for i in range(boxes.size(0)): 
    class_id = boxes[i, 0]
    box = boxes[i, 1:] 
    pt1 = (box[0], box[1]) 
    pt2 = (box[2], box[3]) 
    # ここを変更した
    color = tuple(map(int, colors[class_id])) 
    cv2.rectangle(orig_image, pt1, pt2, color, 4) 

【Python】KPSS検定で単位根の有無を調べる

以下の書籍を読んでて、単位根の有無を調べるのにKPSS検定を行うと書かれている箇所があります(pp.67)。 このPythonコードが欲しかったのでメモしておきます。 ちなみにKPSS検定の帰無仮説は単位根なし、対立仮説は単位根ありとなります。

時系列分析と状態空間モデルの基礎: RとStanで学ぶ理論と実装

時系列分析と状態空間モデルの基礎: RとStanで学ぶ理論と実装

# ホワイトノイズに対してKPSS検定を行います。帰無仮説は棄却されません

import numpy as np
import statsmodels.api as sm

def create_white_noise(n):
    return np.random.rand(n) - 0.5

if __name__ == '__main__':
    data = create_white_noise(1000)
    stats, p_value, lags, crit = sm.tsa.kpss(data)
    # 0.1が出力される
    print(p_value)
# ランダムウォークに対してKPSS検定を行います。帰無仮説は棄却されます

import numpy as np
import statsmodels.api as sm

def create_random_walk(n):
    w = np.random.rand(n)
    w = np.where(w > 0.5, 1, -1)
    w = w.cumsum()
    return w

if __name__ == '__main__':
    data = create_random_walk(1000)
    stats, p_value, lags, crit = sm.tsa.kpss(data)
    # 大体0.01が出力されます
    print(p_value)
# ランダムウォークの1階差分に対してKPSS検定を行います。帰無仮説は棄却されません

import numpy as np
import statsmodels.api as sm
import pandas as pd

def create_random_walk(n):
    w = np.random.rand(n)
    w = np.where(w > 0.5, 1, -1)
    w = w.cumsum()
    return w

if __name__ == '__main__':
    data = create_random_walk(1000)
    data = pd.Series(data)
    data = data.diff().dropna()
    stats, p_value, lags, crit = sm.tsa.kpss(data)
    # 0.1が出力されます
    print(p_value)

【C#】【Python】Pythonのprint文出力をC#側に表示したい

Processで起動したPythonプログラムの中で使っているprint文の内容をC#側に表示したいと考えました。 方法は以下の記事に書かれているようにProcessクラスのBeginOutputReadLineで可能です。

zawapro.com

ただ問題はprint文の結果がC#側にリアルタイムで表示されず、Python側のプログラムが終了後に 一気にそれまでのprint文の結果がC#側に表示されました。Python側では処理途中経過をprint文に 表示しているのでこれでは全く意味がありません。

幸いこの問題の解決方法は以下の記事で見つかり、Python3.3以降ではprint文にflushという 引数がおり、これをTrueにすることで即座にprint文の結果がC#側に表示されるようになりました。 qiita.com

【Python】Siamese NetworkでMNISTの少量データ学習を試す

ディープラーニングは一般的に多くの学習データが必要とされますが、少量しかない場合にどれ位精度が落ちるのか気になり 実験してみようと思いました。そうは言っても普通にCNNに少量のデータを学習させても簡単に過学習しそうに思えるので、 少量のデータでも学習できそうなモデルとしてSiamese Networkを使ってみます。

Siamese Networkとは www.youtube.com

KerasでのSiamese Networkの実装例 github.com

MNISTデータは以下で取得できますが、(X_train, y_train)から各クラス10サンプルをピックアップして それを学習データとし、各クラス100サンプルをピックアップしてそれを検証データとします。 (X_test, y_test)はそのまま精度評価に利用します。

from keras.datasets import mnist
(X_train, y_train), (X_test, y_test) = mnist.load_data()

プロジェクトのフォルダ構成は以下のようになっています。
train(学習データを格納するフォルダ)
 |-0(ここに手書き0が10枚)
 |-1(ここに手書き1が10枚)
 |-xxx
 |
 |-9(ここに手書き9が10枚)
val(検証データを格納するフォルダ)
 |-0(ここに手書き0が100枚)
 |-1(ここに手書き1が100枚)
 |-xxx
 |
 |-9(ここに手書き9が100枚)
test(テストデータを格納するフォルダ)
 |-0
 |-1
 |-xxx
 |
 |-9
siamese_data_loader.py(データローダー)
siamese_network.py(モデル)
test_siamese.py(精度検証)
train_siamese.py(学習)

siamese_data_loader.pyは以下になります。get_train_dataを呼び出すとSiamese Networkが必要とする 画像のペア(positiveとnegative)を受け取れるようにしています。

import os, random, cv2
import numpy as np

class SiameseDataLoader(object):
    def __init__(self, root_train_folder_path, samples_per_class, grayscale=False):
        self._root_train_folder_path = root_train_folder_path
        self._samples_per_class = samples_per_class
        self._sample_file_names = self._get_samples()
        self._grayscale = grayscale
        if self._grayscale:
            image = cv2.imread(self._sample_file_names[0][0], cv2.IMREAD_GRAYSCALE)
            self.input_shape = (image.shape[0], image.shape[1], 1)
        else:
            image = cv2.imread(self._sample_file_names[0][0])
            self.input_shape = image.shape

    def get_train_data(self):
        # positiveとnegativeの画像ペアファイルパスを受け取る
        pairs, labels = self._create_pairs(self._sample_file_names, self._samples_per_class)
        tmp = cv2.imread(pairs[0][0])
        if self._grayscale:
            X1 = np.zeros((len(pairs), tmp.shape[0], tmp.shape[1], 1), np.float32)
            X2 = np.zeros((len(pairs), tmp.shape[0], tmp.shape[1], 1), np.float32)
        else:
            X1 = np.zeros((len(pairs), tmp.shape[0], tmp.shape[1], tmp.shape[2]), np.float32)
            X2 = np.zeros((len(pairs), tmp.shape[0], tmp.shape[1], tmp.shape[2]), np.float32)
        Y = np.zeros((len(pairs), 1), dtype=np.float32)
        i = 0
        if self._grayscale:
            for pair, label in zip(pairs, labels):
                x1 = cv2.imread(pair[0], cv2.IMREAD_GRAYSCALE)
                X1[i] = x1[:,:,np.newaxis]
                x2 = cv2.imread(pair[1], cv2.IMREAD_GRAYSCALE)
                X2[i] = x2[:,:,np.newaxis]
                Y[i] = labels[i]
                i += 1
        else:
            for pair, label in zip(pairs, labels):
                X1[i] = cv2.imread(pair[0])
                X2[i] = cv2.imread(pair[1])
                Y[i] = labels[i]
                i += 1
        return [self._normalize(X1), self._normalize(X2)], Y

    def _get_samples(self):
        sample_file_names = []
        folders = os.listdir(self._root_train_folder_path)
        for folder_name in folders:
            folder_path = self._root_train_folder_path + folder_name
            if os.path.isdir(folder_path):
                files = os.listdir(folder_path)
                sample_file_names_per_class = []
                for file in files:
                    sample_file_names_per_class.append(folder_path + os.sep + file)
                sample_file_names.append(sample_file_names_per_class)
        return sample_file_names

    def _create_pairs(self, sample_file_names, samples_per_class):
        positive_pairs, positive_labels = self._create_positive_pairs(sample_file_names, samples_per_class)
        negative_pairs, negative_labels = self._create_negative_pairs(sample_file_names, samples_per_class)
        positive_pairs.extend(negative_pairs)
        positive_labels.extend(negative_labels)

        return positive_pairs, positive_labels

    # 手書き数字の0と0等同じクラスのペアを作成するためのメソッド
    def _create_positive_pairs(self, sample_file_names, samples_per_class):
        positive_pairs = []
        for sample_file_names_per_class in sample_file_names:
            for k in range(samples_per_class):
                positive_pairs.append(random.sample(sample_file_names_per_class, 2))
        labels = [1]*len(positive_pairs)
        return positive_pairs, labels

    # 手書き数字の2と3等異なるクラスのペアを作成するためのメソッド
    def _create_negative_pairs(self, sample_file_names, samples_per_class):
        negative_pairs = []
        class_count = len(sample_file_names)
        for i, sample_file_names_per_class in enumerate(sample_file_names):
            class_ids = list(range(class_count))
            class_ids.remove(i)
            for k in range(samples_per_class):
                pair = []
                pair.append(random.choice(sample_file_names[i]))
                pair.append(random.choice(sample_file_names[random.choice(class_ids)]))
                negative_pairs.append(pair)
        labels = [0]*len(negative_pairs)
        return negative_pairs, labels

    def _normalize(self, X):
        return X/255

    def get_test_data(self, test_image_path, samples_per_class):
        pairs = []
        for sample_file_names_per_class in self._sample_file_names:
            selected_files = random.sample(sample_file_names_per_class, samples_per_class)
            for selected_file in selected_files:
                pair = []
                pair.append(test_image_path)
                pair.append(selected_file)
                pairs.append(pair)
        tmp = cv2.imread(pairs[0][0])
        if self._grayscale:
            X1 = np.zeros((len(pairs), tmp.shape[0], tmp.shape[1], 1), np.float32)
            X2 = np.zeros((len(pairs), tmp.shape[0], tmp.shape[1], 1), np.float32)
            for i, pair in enumerate(pairs):
                X1[i] = cv2.imread(pair[0], cv2.IMREAD_GRAYSCALE)[:,:,np.newaxis]
                X2[i] = cv2.imread(pair[1], cv2.IMREAD_GRAYSCALE)[:,:,np.newaxis]
        else:
            X1 = np.zeros((len(pairs), tmp.shape[0], tmp.shape[1], tmp.shape[2]), np.float32)
            X2 = np.zeros((len(pairs), tmp.shape[0], tmp.shape[1], tmp.shape[2]), np.float32)
            for i, pair in enumerate(pairs):
                X1[i] = cv2.imread(pair[0])
                X2[i] = cv2.imread(pair[1])
        return [self._normalize(X1), self._normalize(X2)]


siamese_network.pyは以下になります。トップの方に貼ったリンク先はDenseのみで構成されていますが、 ここではConvolutionalなSiamese Networkにしました。

from keras.models import Sequential, Model
from keras.layers import Dense, Input, Lambda, Conv2D, Activation, MaxPool2D, BatchNormalization, Dropout, Flatten
import keras.backend as K

class SiameseNet(object):
    def __init__(self, input_shape, feature_dim):
        seq = Sequential()
        seq.add(Conv2D(16, 3, padding='same', input_shape=input_shape))
        seq.add(BatchNormalization())
        seq.add(Activation('relu'))
        seq.add(MaxPool2D())

        seq.add(Conv2D(32, 3, padding='same'))
        seq.add(BatchNormalization())
        seq.add(Activation('relu'))
        seq.add(MaxPool2D())

        seq.add(Conv2D(64, 3, padding='same'))
        seq.add(BatchNormalization())
        seq.add(Activation('relu'))
        seq.add(MaxPool2D())

        seq.add(Flatten())
        seq.add(Dense(256, activation='sigmoid'))
        seq.add(Dropout(0.2))
        seq.add(Dense(feature_dim, activation='linear'))

        input_a = Input(shape=input_shape)
        input_b = Input(shape=input_shape)
        processed_a = seq(input_a)
        processed_b = seq(input_b)
        distance = Lambda(self._euclidean_distance, output_shape=self._eucl_dist_output_shape)([processed_a, processed_b])
        self._model = Model(inputs=[input_a, input_b], outputs=distance)

    def _euclidean_distance(self, vects):
        x, y = vects
        distance = K.sqrt(K.sum(K.square(x - y), axis=1, keepdims=True))
        return distance

    def _eucl_dist_output_shape(self, shapes):
        shape1, shape2 = shapes
        return (shape1[0], 1)

    def get_model(self):
        return self._model

def contrastive_loss(y_true, y_pred):
    '''Contrastive loss from Hadsell-et-al.'06
    http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf
    '''
    margin = 1
    return K.mean(y_true*K.square(y_pred) + (1 - y_true)*K.square(K.maximum(margin - y_pred, 0)))


続いてtrain_siamese.pyは以下になります。オプティマイザに何故RMSPropを使っているかですが、 単純にAdamだと収束しなかったためです(いくつか初期学習係数を変更したのですが、 RMSPropを使った方が良かったです)。

from siamese_net import SiameseNet
from keras.optimizers import RMSprop
from siamese_data_loader import SiameseDataLoader
import os

if __name__ == '__main__':
    iterations = 10000
    samples_per_class = 5
    feature_dim = 10
    grayscale = True
    # Adam not works well for Siamese net
    optim = RMSprop(decay=1e-4)
    #optim = Adam(lr=0.0001, decay=1e-4, amsgrad=True)

    loader_train = SiameseDataLoader('train' + os.sep, samples_per_class, grayscale)
    loader_val = SiameseDataLoader('val' + os.sep, samples_per_class, grayscale)

    siamese = SiameseNet(loader_train.input_shape, feature_dim).get_model()
    siamese.compile(optimizer=optim, loss=contrastive_loss)
    min_loss = 9999
    min_iter = -1
    for iteration in range(iterations):
        X, y = loader_train.get_train_data()
        loss_train = siamese.train_on_batch(X, y)
        if (iteration+1)%100 == 0:
            X, y = loader_val.get_train_data()
            loss_val = siamese.evaluate(X, y, verbose=0)
            if loss_val < min_loss:
                min_iter = iteration
                min_loss = loss_val
                siamese.save_weights('weights.h5', True)
            print('loss@' + str(iteration) + ' = ' + str(loss_train) + ',' + str(loss_val) + ' (' + str(min_loss) + '@' + str(min_iter) + ')')


最後にtest_siamese.pyです。この結果の正答率が86.2%となりました。 一方で試しにSiamese Networkと似たような単純な構成のCNNでも同様のデータセットで学習させたところ 正答率が87.61%となり、Siamese Networkよりも良い成績となりました。 ちなみにデータ拡張(拡大/回転/シフト)を学習に加えたところ、Siamese Networkの正答率が95.5%、 CNNの正答率が94.05%となりました。 少量のデータでもMNISTであればそこそこ精度が出ているので、データセットによってはデータが少量でも ディープラーニングを適用できるかもしれません(少量しかデータが無いのにディープラーニングを適用 することが良いかは別問題として)。

from siamese_net import SiameseNet, contrastive_loss
import numpy as np
from keras.optimizers import RMSprop
from siamese_data_loader import SiameseDataLoader
import os

# Siamese Networkはペア画像との距離を返してくるので、
# 一番近い距離の画像が所属しているクラスを予測クラスとする
def distance_to_class(y, classes, samples_per_class):
    i = 0
    class_distances = []
    for c in range(classes):
        distances = []
        for s in range(samples_per_class):
            distances.append(y[i])
            i += 1
        median = np.median(np.array(distances))
        class_distances.append(median)
    return np.argmin(np.array(class_distances))

if __name__ == '__main__':
    samples_per_class = 5
    feature_dim = 10
    grayscale = True
    optim = RMSprop(decay=1e-4)
    test_root_path = 'test' + os.sep
    classes = 10

    loader = SiameseDataLoader('train' + os.sep, samples_per_class, grayscale)

    siamese = SiameseNet(loader.input_shape, feature_dim).get_model()
    siamese.compile(optimizer=optim, loss=contrastive_loss)
    siamese.load_weights('weights.h5')

    correct = 0
    count = 0
    for c in range(classes):
        test_class_folder_path = test_root_path + str(c) + os.sep
        test_file_names = os.listdir(test_class_folder_path)
        distances = []
        for test_file_name in test_file_names:
            test_file_path = test_class_folder_path + test_file_name
            X = loader.get_test_data(test_file_path, samples_per_class)
            y = siamese.predict_on_batch(X)
            predicted_class = distance_to_class(y, classes, samples_per_class)
            if predicted_class == c:
                correct += 1
            count += 1
    accuracy = correct/count*100
    print('accuracy=' + str(accuracy))