覚え書きブログ

tensorflow2を用いたCNNの実装方法

最近ようやくtensorflow2に移行しはじめたが、kerasの流れがあるからなのか実装方法が沢山あって、なんだかややこしい。
以下のサイトに詳しくまとまっているように、Sequential API、Functional APIおよびSubclassing APIの3つの実装方法がある。
qiita.com

以下は、tensorflow2.1を用いている。

Sequential API

初心者向けで、ネットワークの定義から、学習方法の設定、学習および評価まで一貫してtf.keras.models.Sequentialのインスタンスを用いて行う。具体的には、以下のようにSequentialのインスタンスであるmodelにaddメソッドを用いてレイヤーのインスタンスをしていく。そして、compileメソッドを用いて学習・評価方法を設定したのち、fitメソッドを用いて学習を実行しevaluateメソッドを用いて評価を行う。

model = tf.keras.models.Sequential()

# conv1
model.add(tf.keras.layers.Conv2D(32, (3, 3), input_shape=input_shape))
model.add(tf.keras.layers.BatchNormalization())
model.add(tf.keras.layers.ReLU())
model.add(tf.keras.layers.MaxPooling2D((2, 2)))

~省略~
# 学習方法の設定
model.compile(optimizer='adam',loss='sparse_categorical_crossentropy',metrics=['accuracy'])
# 学習
model.fit(x_train, y_train, batch_size=200, epochs=2)

~省略~
# 学習データに対する評価
train_loss, train_accuracy = model.evaluate(x_train, y_train, verbose=0)

全体のコードは以下を参照。

gist590a5f7d7b656d8c642b2ed1e03df028

気持ちよいくすっきりした一貫性のあるコードが書ける。しかし、addによりレイヤーの逐次的な追加しかできないので、
途中で分岐したり、skipp connectionしたり、独自の損失関数を用いるなどの複雑なネットワークの定義はできなさそう。
add以外に分岐するためのsplitとかあるかもしれないけど、いずれにしろ限界がきそうだ。

Functional API

おそらくtensroflow 1系を使っていた人向けで、tensor変数を数珠上につなげていくことによりネットワークの定義ができる。
具体的には、以下のように、Input、Conv2D、BatchNormalization、Denseなどのレイヤーのインスタンス変数inputs、conv1、...outputsを繋げていきネットワークを定義する。その後に、なんと画期的なことに、tf.keras.Model(inputs, outputs)を用いてモデルのインスタンスに変換することができる。したがって、この後は、Sequential APIと同じ方法で、学習方法の設定、学習および評価まで一貫してtf.keras.models.Modelのインスタンスを用いて行うことができる。

def cnn(input_shape):
    inputs = tf.keras.layers.Input(shape=input_shape, name="inputs")

    # conv1
    conv1 = tf.keras.layers.Conv2D(32, (3, 3))(inputs)
    conv1 = tf.keras.layers.BatchNormalization()(conv1)
    conv1 = tf.keras.layers.ReLU()(conv1)
    conv1 = tf.keras.layers.MaxPool2D((2, 2))(conv1)

~省略~
    # fc1
    conv3_flat = tf.keras.layers.Flatten()(conv3)
    fc1 = tf.keras.layers.Dense(64,activation='relu')(conv3_flat)

    # fc2
    outputs = tf.keras.layers.Dense(10,activation='softmax')(fc1)

    # モデルの設定
    model = tf.keras.Model(inputs, outputs)

    # 学習方法の設定
    model.compile(optimizer='adam',loss='sparse_categorical_crossentropy',metrics=['accuracy'])

    return model

model = cnn((H,W,C))
model.fit(x_train, y_train, batch_size=200, epochs=2)

~省略~
# 学習データに対する評価
train_loss, train_accuracy = model.evaluate(x_train, y_train, verbose=0)

全体のコードは以下を参照。

gista97af4418257a1f8ee6edbc0cba9f1ad

これにより、skipp connectionは、下位のtensor変数を上位のtensor変数にconcatすることなどにより簡単に実装ができそう。
あと途中で分岐して戻ってくるようなネットワークも簡単に作れそう。ただ、最後まで分岐したまま出力し異なるlossを設定して同時に学習するというのは難しそうな気がする。
以下のように、outputs1, outputs2など異なる出力に対応する2つのモデルを作ったとしても、model1.fit()とmodel2.fit()を実行すると同時ではなく、model1の後にmodel2を学習することになる。
他に実装方法はあるんですかね?

    # fc2
    outputs1 = tf.keras.layers.Dense(10,activation='softmax')(fc1)
    outputs2 = tf.keras.layers.Dense(10,activation='softmax')(conv3_flat)

    # モデルの設定
    model1 = tf.keras.Model(inputs, outputs1)
    model2 = tf.keras.Model(inputs, outputs2)

    # 学習方法の設定
    model1.compile(optimizer='adam',loss='sparse_categorical_crossentropy',metrics=['accuracy'])
    model2.compile(optimizer='adam',loss='sparse_categorical_crossentropy',metrics=['accuracy'])

    return model1, model2

Subclassing API

SequentialとFunctionの利点であった一貫性の高い実装方法を完全に捨てたような実装方法で、とにかく見た目が複雑なので、玄人向けの実装方法である。
tf.keras.layers.Layerクラスとtf.keras.Modelクラスを継承して独自のクラスを作ることにより複雑なネットワークを定義することができ、
@tf.functionの学習とテストの処理の関数を独自に定義することにより、同時に複数の損失関数を最適化するような学習も可能である。
具体的には、以下のように、tf.keras.layers.Layerを継承して複数のレイヤーをまとめた独自のレイヤークラスを定義する。この例では、Conv2D、BatchNormalization、ReLUおよびMaxPool2Dの一連の処理をまとめた、myConvというクラスを定義している。基本的な手順としては、コンストラクタの__init__で、レイヤーのインスタンス化した変数を定義し、

# Layerクラスを継承して独自のconvolution用のレイヤークラスを作成
class myConv(tf.keras.layers.Layer):
    def __init__(self,chn=32, conv_kernel=(3,3), pool_kernel=(2,2), isPool=True):
        super(myConv, self).__init__()
        self.isPool = isPool

        self.conv = tf.keras.layers.Conv2D(chn, conv_kernel)
        self.batchnorm = tf.keras.layers.BatchNormalization()
        self.relu = tf.keras.layers.ReLU()
        self.pool = tf.keras.layers.MaxPool2D(pool_kernel)        

    def call(self, x):
        x = self.conv(x)
        x = self.batchnorm(x)
        x = self.relu(x)

        if self.isPool:
            x = self.pool(x)
        return x

そして、レイヤー間の繋がりを定義するためにtf.keras.Modelを継承した以下のような独自のモデルクラスを定義し、そのインスタンスをmodel変数に設定する。

# Modelクラスを継承し,独自のlayerクラス(myConvとmyFC)を用いてネットワークを定義する
# 独自のモデルクラスを作成
class myModel(tf.keras.Model):
    def __init__(self):
        super(myModel, self).__init__()
        self.conv1 = myConv(chn=32, conv_kernel=(3,3), pool_kernel=(2,2))
        self.conv2 = myConv(chn=64, conv_kernel=(3,3), pool_kernel=(2,2))
        self.conv3 = myConv(chn=64, conv_kernel=(3,3), isPool=False)
        self.fc = myFC(hidden_chn=64, out_chn=10)

    def call(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        return self.fc(x)

# モデルの設定
model = myModel()

ここまでは割とスッキリ、SequentialとFunction APIと大差なく書けるが、Subclassingではこの先の学習の部分も独自に書くので複雑になる。
以下のように、@tf.functionというtensorグラフに繋げた状態で、学習の各ステップの処理をするtrain_step関数を定義する。
学習の各ステップは、①modelを使って予測値predictionsを取得、②真値tの誤差lossをloss_object関数を使って計算、③tape.gradientsを用いてモデルパラメータの勾配を計算し、④optimizerを使ってモデルパラメータを更新している。

#----------------------------
# 学習方法の設定
# 学習用と評価用の関数train_stepとtest_stepを定義
# @tf.functionを用いることにより,予測と損失をtensorグラフに繋げることができる
#損失関数
loss_object = tf.keras.losses.SparseCategoricalCrossentropy()

#最適化関数
optimizer = tf.keras.optimizers.Adam()

# 評価指標
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
~省略~

@tf.function
def train_step(x, t):
    with tf.GradientTape() as tape:

        # 予測
        predictions = model(x, training=True)

        # 損失
        loss = loss_object(t, predictions)
    
    # 勾配を用いた学習
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # 評価
    train_loss(loss)
    train_accuracy(t, predictions)
#----------------------------

このtrain_step関数を、以下のようにミニバッチデータを使って実行する。

# ミニバッチの作成
train_ds = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(10000).batch(32)

for epoch in range(5):
    for images, labels in train_ds:
        train_step(images, labels) #学習

この辺はtensorflow 1系と同じように、学習にてミニバッチに関するfor文が必要になるとともに、train_stepの関数も独自に作るため、
SequentialとFunctional APIと比べるとコードが煩雑になったように感じるが、その代わり複雑なネットワークを構築し、学習方法にも手を入れることができるようになる。
全体のコードは以下を参照。

gist2dc475cb9ad9786e3febe530f6d29c00

例えば、fc2層の重み係数WのL2ノルムを最小化する損失も同時に最小化する(つまり、L2正則化)は以下のように簡単に書ける。
また、途中で分岐し異なるlossを設定するようなネットワークも同様に簡単に書ける(今回のMNISTの分類用のデータではよい問題が思いつかなったので、L2ノルム制約で実装)。
さすがに、Subclassingはかゆいところに手が届くので、慣れれば一番簡単かも。

# Layerクラスを継承して独自のFC用のレイヤークラスを作成
class myFC(tf.keras.layers.Layer):
    def __init__(self, hidden_chn=64, out_chn=10):
        super(myFC, self).__init__()
        self.flatten = tf.keras.layers.Flatten()
        self.fc1 = tf.keras.layers.Dense(hidden_chn, activation='relu')
        self.fc2 = tf.keras.layers.Dense(out_chn, activation='softmax')

    def call(self, x):
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.fc2(x)
        return x, self.fc2.weights
~省略~

def train_step(x, t):
    with tf.GradientTape() as tape:

        # 予測
        pred, weights = model(x, training=True)

        # 損失
        loss = loss_object(t, pred) + tf.norm(weights[0],axis=0)
    
    # 勾配を用いた学習
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # 評価
    train_loss(loss)
    train_accuracy(t, pred)

全体のコードは以下を参照。

gistc879599fc7855660db1b10fdd870507b