はじめに
こんにちは。Ultimatrustのmryです。
入社して早5か月経ち、画像解析について研鑽を積む日々ですが、たまには別のこともしたいな、ということで今回は時系列予測について書いていこうと思います。
AIの扱う問題は大きく2つあります。一つは弊社おなじみの画像解析。もう一つが系列データの問題です。系列データとは、ある順番で並んでいるデータについて、その順番に意味があるようなデータです。
一日の中の気温を例にとれば、朝から昼にかけて上がって夜になるにつれ下がる、つまり気温という数字の並び順には意味がある、すなわち気温は系列データだといえるわけです。
順番に意味があるデータというと、気温や株価を思い浮かべがちですが、それだけではありません。代表的な分野に自然言語処理があります。
自然言語処理とは、日本語や英語のような人間が話す言語について分析を行うものです。例えば、次のような日本語を見てみましょう。
「Aさんはワイシャツにコーヒーをこぼしてしまった。だから彼は今Tシャツ一枚で仕事をしている。」
ここで、2文目の「彼」が誰を指しているかといえば、1文目の「Aさん」ですね。単語の順番に意味があることから、こういった自然言語も系列データだといえます。
自然言語処理については、またの機会に書きたいと思います。
系列データの内、各データが時間と関連付けられているものを時系列データといいます。
今回はAI学習の基本的な手順を紹介しつつ、一般的な時系列データ(東京都の平均気温)を使って回帰分析を行っていきます。特に、データの存在しない「未来の予測」に焦点を当てていきます。
0-1.回帰分析
AIにできることは2つあります。分類と回帰です。
分類というのは、例えば画像解析であれば、一枚の画像を見せてそれに写っているものがA,B,Cの内どれなのかを当てる、という感じです。何種類もの候補をあらかじめ学んでおいて、予測対象を候補に分類していく問題です。
回帰というのは、画像解析でいえば、一枚の画像を見せて対象の位置座標を当てる、という感じです。
数学っぽく言えば、説明変数と目的変数の間の関係を調べることです。具体的には、y=ax+bというx(説明変数)とy(目的変数)の関係があったとき、aとbを求めることです。
どちらが難しいというのは一概に言えませんが、両者の違いは実はコスト関数や出力層の関数の違いだけだったりします。
今回行うのは回帰です。
回帰分析には単回帰分析と重回帰分析があります。単回帰は説明変数が一つだけ、重回帰は説明変数が複数あります。例えば、人間の体重が知りたいという場合、単回帰では体重と身長の関係だけ調べますが、重回帰では体重と身長/ウエスト/体脂肪率の関係を調べる、という違いです。
ビジネスでは、既存店舗の売り上げと立地/客層/季節の関係を明らかにし、まだ存在しない店舗の売り上げを立地/客層/季節から予測する、というような使われ方をします。
このような単回帰分析や重回帰分析を行うには、scikit-learnを使った機械学習でも十分可能なのですが、予測の規模を大きくすると(先の未来を予測しようとするほど)問題が出てきます。
体重と身長/ウエスト/体脂肪率の関係が明らかになっていたとして、未来の体重が知りたい場合、未来の身長/ウエスト/体脂肪率を知っておく必要があります。当然そんなデータはありません。
そこで、3日後の体重と現在の身長/ウエスト/体脂肪率の関係を調べる、ということならできます。しかし、機械学習で予測できる期間は限られるでしょう。より先の未来を予測しようとするなら、モデルの規模を大きくしたり、データにも長期的な相関関係が必要です。
0-2.時系列予測
そこでディープラーニングの出番です。ディープラーニングであれば、大量のデータを使って、より未来の予測ができる(はず)でしょう。
そこで、今回は気温データ3871日分について、1か月以上先の未来予測を行ってみます。
30日後までを予測したいとすると、
まず学習は、
1~100日目のデータ と 130日目のデータの関係を学習→
2~101日目のデータ と 131日目のデータの関係を学習→
...............
3742~3841日目のデータ と 3871日目のデータの関係を学習
という感じで過去100日分のデータとその30日後のデータの関係を学習します。今回は100日単位にしていますが、これは場合によります。
そして肝心の予測は、
3743~3842日目のデータから3872日目のデータを予測→
3744~3843日目のデータから3873日目のデータを予測→
...............
3772~3871日目のデータから3901日目のデータを予測
という風に予測を行います。こうすれば、今持っている3871日分のデータから30日未来の3901日目までのデータが予測できます。(分かりにくいので後で図示しています)
今回行うのは気温だけを用いた単回帰分析です。
気温の順番関係だけで、未来の気温を予測しようという試みです。
これから行っていく予測をまとめると、
東京都の気温データ3871日分の関係から1か月以上先の気温を予測する
そのために、AIに過去100日のデータとその30日後のデータの関係を学習させる
ということになります。
1か月"以上"先を予測したいのに30日後のデータまでしか学習させないのは疑問だと思いますが、これは秘密兵器を試すために敢えてこのようにしています。詳しくは第9節にて。
それではコードを見ていきましょう。
環境は以下の通りです。
Python 3.9.6
matplotlib 3.4.3
numpy 1.19.5
pandas 1.3.3
scikit-learn 0.24.2
tensorflow 2.6.0
1.インポート
import os
import math
import datetime
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tensorflow.keras.layers import Input, Dense, Dropout, LSTM
from tensorflow.keras.regularizers import l2
from tensorflow.keras import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, CSVLogger
from sklearn.preprocessing import MinMaxScaler
最初はkeras(tensorflowを使いやすくするためのライブラリ)やnumpy(行列演算を高速に行うためのライブラリ)、pandas(データの加工に使う)、matplotlib(結果を図示するのに使う)等をインポートします。
2.データの読み込み
def preprocess():
current_path = os.path.dirname(__file__)
df = pd.read_csv(current_path + '/average_temperature.csv', header=0)
df['date'] = pd.to_datetime(df['date'])
df = df.set_index('date')
original_data = df.values
return original_data
次に下準備として関数定義を行っていきます。今回は2つの関数を用意します。
一つ目は今回使用する「東京都の平均気温」データをcsvファイルから読み込む関数です。pandasを使って読み込むことができます。
(データは気象庁のwebサイトからダウンロードできます。)
pandasではデータフレームという形式でデータを扱います。データフレームは一般的な配列とは違い、行や列の名前を使ってデータを操作できます。データに欠損などがある場合は、値を補間したりもできます。
また、tensorflowやscikit-learnでは基本的にnumpy形式でデータを扱いますが、データフレームからは".values"で簡単にnumpy形式に変換できます。
データ元:「気象庁」
https://www.data.jma.go.jp/gmd/risk/obsdl/
3.モデルの定義
def create_model(hidden_size, batch_size=None, time_steps=None, stateful=False):
inputs = Input(batch_shape=(batch_size, None, 1))
x = inputs
x = LSTM (hidden_size, stateful=stateful, return_sequences=True)(x)
x = Dropout(0.5)(x)
x = LSTM (hidden_size, stateful=stateful, return_sequences=False)(x)
x = Dropout(0.5)(x)
x = Dense(1, kernel_regularizer=l2(0.0001), activation='relu')(x)
outputs = x
model = Model(inputs=inputs, outputs=outputs)
return model
2つ目はモデルを定義する関数です。今回はLSTM(Long Short Term Memory)という再帰型ニューラルネットワーク(recurrent neural network, RNN)の一種を使います。RNNは系列データを扱うためのニューラルネットワークです。普通のニューラルネットワークでは入力データが一つ一つ独立していますが、RNNの入力は系列データのため、複数のデータを同時に入力します。この中身については、参考書などを読んだほうが分かりやすいので省略します。
ここではLSTMを2層重ねたモデルを作っています。一般に、同じ性能を求める場合、層を重ねるほど必要なパラメータ数が少なくなります。その代わり、訓練するのが難しくなり、学習に工夫が必要になります。今回の場合、工夫というには簡単ですが、dropout層とkernel_regularizerというのを使っています。
もう一つ、予測に直接関係する要素について言及しておきます。コード中の"stateful"という記述です。
RNNには状態を保持するモードと保持しないモードがあり、それぞれ "stateful"、"stateless"といいます。
statelessの場合、例えば以下のように予測します。
ex) 5の倍数を予測する場合
[ 5,10,15,20,25] →(予測)→ [30],
[10,15,20,25,30] →(予測)→ [35],
5の倍数を5つ入力して、その流れから次の1つを予測しています。状態を保持しないため、一つ一つの予測は独立します。
対してstatefulの場合、5~20をあらかじめ入力しておき、それを踏まえた予測を行います。
ex) 5~20は既に入力済み
[25] →(予測)→ [30],
[30] →(予測)→ [35],
以前の予測を覚えている(状態を保持している)ため、それまでのデータをいちいち入力する必要はありません。また、予測をする度に状態は更新され、次の予測に影響します。
予測の速度は入力が少ない分statefulの方が上です。しかし、statefulにすると格段に考慮する要素が増え、学習が大変になります。一般的な回帰分析ならわざわざstatefulで行う必要はないのですが、今回はせっかくなので一工夫します。
statelessで学習し、statefulで予測を行います。
4.パラメータの設定と前処理
# hyper parameters
training = True # 訓練モード or 予測モード
train_rate = 0.8 # 訓練データの割合
time_steps = 100 # 一度の予測に使うデータ数
predict_day = 30 # 予測する日数
hidden_size = 64 # 隠れ層の大きさ
batch_size = 64 # バッチサイズ
epochs = 500 # エポック数
下準備はここまでとして、ここからメインのプロセスを開始します。
まずはハイパーパラメータの設定とデータの前処理を行います。ハイパーパラメータは学習を繰り返して自分で調整していきます。コード中の値は既に調整済みのものです。
# データの読み込み
current_path = os.path.dirname(__file__)
original_data = preprocess() # (3871, 1)
# 訓練データとテストデータの分割と正規化
border = math.floor(len(original_data) * train_rate)
train = original_data[:border] # (3096, 1)
test = original_data[border:] # ( 775, 1)
train_scaler = MinMaxScaler(feature_range=(0, 1))
train_scaler.fit(train)
scaled_train = train_scaler.transform(train)
scaled_test = train_scaler.transform(test) # テストデータを正規化の基準に入れてはならない
前処理ではまず、訓練データとテストデータにデータを分割しています。機械学習やディープラーニングでは、データを訓練/検証/テストに3分割するのが基本です。
訓練データは文字通り、学習(訓練)に使うデータです。
検証データは学習中に客観的に性能を評価し、ハイパーパラメータを調整するために使われます。これはkerasで自動的に行うことができるので、この後の学習部分で紹介します。
テストデータは学習がすべて終わった後、モデルの性能を評価するために使われます。
また、回帰問題ではデータを0~1の間に正規化しておくことが精度向上のための定石です。
# 学習モード
if training:
# 入力データとラベルデータの作成
data_size = len(scaled_train) - time_steps - predict_day # 2966
x_train = np.zeros((data_size, time_steps)) # (2966, 100)
y_train = np.zeros((data_size, 1)) # (2966, 1)
for i in range(data_size):
x_train[i] = scaled_train[i:i + time_steps].T # (2966, 100)
y_train[i] = scaled_train[i + time_steps + predict_day] # (2966, 1)
data_size = len(scaled_test) - time_steps - predict_day # 645
x_test = np.zeros((data_size, time_steps)) # (645, 100)
y_test = np.zeros((data_size, 1)) # (645, 1)
for i in range(data_size):
x_test[i] = scaled_test[i:i + time_steps].T # (645, 100)
y_test[i] = scaled_test[i + time_steps + predict_day] # (645, 1)
# モデルの入力に形状を合わせる
x_train = x_train.reshape(len(x_train), time_steps, 1) # (2966, 100, 1)
x_test = x_test.reshape(len(x_test), time_steps, 1) # ( 645, 100, 1)
y_train = y_train.reshape(len(y_train), 1, 1) # (2966, 1, 1)
y_test = y_test.reshape(len(y_test), 1, 1) # ( 645, 1, 1)
今回は"training"という値を使って、学習と予測を分離した実装を行っています。
学習において最も気を配らなければならないのが、データの形状です。そのため、コード中にはコメントでその時々のデータの形状を表記しています。これがずれていたりすると、エラーが起きて学習できなかったりするので重要です。一番手間がかかる部分といってもいいかもしれません。
5.モデルの学習
model = create_model(hidden_size=hidden_size,
time_steps=time_steps,
stateful=False)
model.compile(optimizer=Adam(learning_rate=1e-4), loss="mean_squared_error", metrics=["mean_absolute_percentage_error"])
checkpointer = ModelCheckpoint(filepath=current_path + '/best_model.hdf5', verbose=1, save_best_only=True, save_weights_only=True)
csv_logger = CSVLogger(current_path + '/history.log')
ようやくモデルの学習に移ります。まずモデルの定義です。今回はstatelessで学習、statefulで予測を行うので、学習段階ではstatelessでモデルを定義します。
回帰分析ですので、コスト関数にはmean squared errorを使っています。分類問題の場合はcategorical_crossentropy等を使ったりします。両者のコスト関数が違う理由は、分類が「予測の正解数」を学習の目安にするのに対して、回帰が「予測と正解がどの程度ずれているか」を目安にするからです。
# 学習
history = model.fit(x=x_train,
y=y_train,
batch_size=batch_size,
epochs=epochs,
verbose=1,
validation_split=0.3,
shuffle=False,
callbacks=[csv_logger, checkpointer])
学習は"model.fit"で行います。
先ほど話に出した検証データは、コード中の"validation_split"の部分で訓練データから分割されています。こうしておくことで、kerasでは自動的に学習と同時に検証データによる評価を行います。一般的な手順としては、学習終了後もしくは途中で検証データの精度を見て、ハイパーパラメータを調整します。パラメータを調整し終えた後は、検証データは必要ありませんので、最終的な学習として訓練データに検証データを含めて学習させることも可能です。
kerasには色々便利な機能があり、例えばコード中の"checkpointer"は検証データを使って学習中の一番良い状態のモデルを保存してくれるものです。学習が進めば進むほど精度が良くなるわけではなく、過学習等で検証データの精度が落ちることもあるからです。
モデル自体と学習した重みを一度に保存することもできますが、ここでは重みだけを保存しています。これはあとでモデルをstatefulで再定義するためです。
# 学習の評価
eval = model.evaluate(x=x_test,
y=y_test,
batch_size=batch_size,
verbose=1)
print('MAPE:{} %'.format(eval[1]))
モデルの精度評価は"model.evaluate"で行うことができます。モデルの評価には、分割しておいたテストデータを使います。
6.時系列予測の準備
# 予測モード
else:
model_sf = create_model(hidden_size=hidden_size,
batch_size=1,
stateful=True)
model_sf.load_weights(current_path + '/best_model.hdf5')
ここからは予測に入ります。まずstatefulでモデルを再定義します。ここで定義されるモデルは、学習時と全く同じ形状なので、先ほど学習して保存した重みをそのままロードすることができます。
# 状態を保存するため、最初100個のデータで予測(ここでの予測結果は使わない)
inputs_pre = scaled_test[:time_steps]
inputs_pre = inputs_pre.reshape(1, time_steps, 1)
model_sf.predict(inputs_pre)
# 100+30個目までは予測できないため、真値を入れておく
predicted = scaled_test[:time_steps+predict_day] # 130
予測には訓練に使っていないテストデータを使うことにします。
予測の前準備として、statefulモデルに最初の状態を覚えさせます。今回は100個のデータから予測を行うので、テストデータの最初100個を使って一度予測をしておきます(model_sf.predictの部分)。これで状態を覚えさせることができます。ちなみに、ここでの予測結果は使いません。
また、最初の100個とその先の30個目までは予測できません(一回目の予測結果が131個目になるため)。
今回は結果を評価するために、101~(末尾-30)個を使って予測を行い真値と比較します。
更に、末尾までのデータを使って、目的であるテストデータの未来30個を予測します。(わかりづらい...)
図にすると以下のようになります。
7.30日後までの予測
# 予測開始
i = 0
while len(predicted) < len(scaled_test) + predict_day: # 130 ~ 805
input_data = scaled_test[time_steps+i]
input_data = input_data.reshape(1, 1, 1)
score = model_sf.predict(input_data)
predicted = np.append(predicted, score[0,0])
i += 1
predicted = train_scaler.inverse_transform(predicted.reshape(-1, 1)) # 805
いよいよ予測を行います。実装ではデータ数+30個まで予測を繰り返します。
statefulなので、入力しているのは各予測で1個だけです。
予測後、値は0~1に正規化された状態なので、逆変換で元に戻します。
8.予測評価と結果の図示
# 予測の評価
y_pred = predicted[time_steps+predict_day:-predict_day]
y_true = test[time_steps+predict_day:]
mape = np.mean(np.abs((y_pred - y_true) / y_true)) * 100
print('MAPE: {} %'.format(mape))
予測の評価を行います。"model.evaluate"だとあまり細かい指定ができないので、ここでは普通にMAPE(簡単に言うと誤差率)を計算します。
# 結果出力のためのデータフレームを作成
day = datetime.datetime.strptime('2019/6/23', '%Y/%m/%d')
days = []
for _ in range(len(predicted)):
day += datetime.timedelta(days=1)
day_str = day.strftime("%Y/%m/%d")
days.append(day_str)
df_date = pd.DataFrame(days)
df_date.columns = ['Date']
df_pred = pd.concat([pd.DataFrame(predicted), df_date], axis=1)
df_pred = df_pred.set_index('Date')
予測結果を格納しているnumpy配列はただの数字の羅列なので、ここで時間との関連付けを行います。
# プロットの作成
plt.figure(figsize = (15, 7))
plt.plot(test, label = "True value")
plt.plot(df_pred, label = "predicted")
plt.axvspan(0, len(test[:time_steps+predict_day]), facecolor='Cyan', alpha=0.075)
plt.axvspan(len(test[:time_steps+predict_day]), len(test), facecolor='magenta', alpha=0.075)
plt.axvspan(len(test), len(predicted), facecolor='yellow', alpha=0.1)
bbox_dict = dict(boxstyle='round', facecolor='None', edgecolor='red')
plt.text(time_steps-50, 5, 'pre_input', fontsize=16, bbox=bbox_dict)
plt.text(len(test)-90, 5, 'eval', fontsize=16, bbox=bbox_dict)
plt.text(len(predicted)-30, 5, 'future', fontsize=16, bbox=bbox_dict)
plt.xlabel("Days")
plt.ylabel("average temperature", fontsize=16)
plt.xticks(np.arange(0, len(df_pred), 90), rotation=45)
plt.title("Comparison true vs. predicted", fontsize=16)
plt.legend(loc='upper left', fontsize=16)
plt.show()
matplotlibを使って予測結果をグラフにします。
というわけで、結果は次のようになりました。
水色: 状態を覚えさせるために事前に入力したデータ
ピンク: 予測評価のために真値と比較しているデータ
黄色: 元データが存在しない未来のデータ
どうでしょうか。
MAPEの値は誤差率20.68% で、まずまずではないでしょうか。
グラフを見た感じ、特徴も捉えられているように思います。
9.30日以上未来の予測
さて、ここまでは30日先までの予測を行いました。最後に、今回学習したモデルを使って、30日以上先の予測を行います。その方法は自己回帰です。
自己回帰とは、予測により生成した系列データを次の予測の入力として利用することです。
予測部分を以下のように変更し、予測結果を入力として利用するようにします。
# 予測開始
i = 0
while len(predicted) < len(scaled_test) + predict_day: # 130 ~ 805
input_data = scaled_test[time_steps+i]
input_data = input_data.reshape(1, 1, 1)
score = model_sf.predict(input_data)
predicted = np.append(predicted, score[0,0])
i += 1
# 自己回帰による追加予測
j = 0
while len(predicted) < len(scaled_test) + predict_day + 500:
input_data = predicted[-predict_day]
input_data = input_data.reshape(1, 1, 1)
score = model_sf.predict(input_data)
predicted = np.append(predicted, score[0,0])
j += 1
predicted = train_scaler.inverse_transform(predicted.reshape(-1, 1)) # 805
ここでは追加で500日分を予測してみました。
気温は周期的に変化するので、予測の難易度は低いと思いますが、一応うまくいっている感じですね。
おわりに
いかがだったでしょうか。
今回は時系列データを回帰分析して未来のデータを予測してみました。
本当はコロナ陽性者数のような役に立つ予測がしたかったのですが、変動が急激すぎて無理でした...
未来を予測するのは中々ロマンのあることですが、やっぱりそう簡単にはいきませんね。
ここでは気温のみを用いた単回帰分析を行いましたが、より精度を向上させるなら、訓練データを多次元にして重回帰分析を行えばいいでしょう。データとして考えられるのは、気温と相関のありそうなもの、例えば湿度/季節/雲量などですね。
ここまで読んでいただき、ありがとうございました。