ABEJA Tech Blog

中の人の興味のある情報を発信していきます

実験を高速化する機械学習パイプライン開発の挑戦

はじめに

こんにちは、ティアキンで寄り道し過ぎて永遠にストーリークリア出来ない坂元です。データサイエンスチームに所属しています。LLMの一大ブームの中でLLMの記事を書かないのは若干憚られますが、高速に実験を回す用途で気軽に使える機械学習パイプラインライブラリって実はあまりない…?と思ったので、今回は機械学習パイプラインライブラリを個人で開発してみている話をします。なお、本記事では機械学習パイプラインを「データの加工・モデルの学習・推論を一連のワークフローとして実行出来るツール」とし、データ収集やデプロイ、分布シフトの監視などの工程については言及しないものとします。また、比較的小規模なプロジェクトの検証段階で利用することを前提とします。

開発したパイプラインのライブラリは以下のリポジトリでバージョン0.0.1として公開しましたので、実装の詳細はリポジトリをご参照ください。ドキュメントとかもまだ作れてない&超絶雑なコミットしかしてない段階ですがこれから整備しようと思っているので、面白いと思ってくださる方が居たらまずは使ってみて頂けると嬉しいです!(もっと便利なツールに出来そうと思った方はぜひコントリビュートもお願いします!)

github.com

本記事では既存パイプラインの課題感と開発したパイプラインの機能紹介をメインにやっていきます。

TL;DR

本パイプラインのコンセプト:scikit-learnライクにタスクを定義してPyTorchライクに繋げる

高速に実験を回すときの使いやすさの観点で他の代表的なパイプラインとざっくり比較

Kedro gokart sklearn Imker(開発) 
学習コストの低さ
タスクのモジュール性
パイプラインの可読性
実行結果キャッシュの柔軟性
推論ワークフロー構築の容易さ

△:課題がある 〇:良い ◎:より良い

※評価は主観です。

目次

既存の機械学習パイプラインライブラリの課題感

既存ツールのざっくり比較

既存の機械学習パイプラインライブラリとしてはScikit-learn Pipeline、Airflow、Prefect、Kedro、luigiとそのラッパーであるgokartなど様々なものがあり、それぞれ便利な場面やメリット・デメリットが異なります。この辺りの比較は以下の記事によくまとまっています。

qiita.com

上記で言及されているメリット・デメリットを踏まえて、以下に自分が感じたことを書いていきます。

課題感の前に

既存の機械学習パイプラインライブラリへの課題感を書く前に、用語の定義をしておきます。

用語 定義
タスク 処理の単位。例えば説明変数を標準化するStandardScalerなど。モデルの学習・予測も1つのタスクとみなす。
プロジェクト データ分析で取り組む課題。Kaggleなどのコンペもプロジェクトの1つとみなす。
パイプライン 複数のタスクを繋げることでデータに対して一連の処理を指定した順番で実行し結果を出力するもの。

既存の機械学習パイプラインライブラリに思うこと

学習コストが高いがち

既存のパイプラインライブラリを使うにあたって最初にハードルとなるのは、ライブラリのお作法を覚えることです。さすがに普段から慣れ親しんでいるscikit-learnのPipelineくらいならそこまで学習コストも高くないですが、それ以外のツールではどのようにタスクを定義するか、どこに何のコードを書くか、どのようにパラメータを管理するか、どのようにタスク間の依存関係を定義するか等々がライブラリごとに決まっていて覚えることがたくさんあります。そのため、使い始めようにも簡単に使い始められないがちです。

タスクがモジュラーにならないがち

実験の効率化やタスクのテスタビリティのために、タスク間の依存関係が変わったりプロジェクトを跨いだりしてもscikit-learnのクラスのようにタスク自体のソースコードはそのままで流用出来ることが望ましいですが、gokartではluigiよりはモジュラーになっていますが、タスク間の依存関係をタスククラス内のrequiresメソッド内で定義するようになっており、タスク間の依存関係を変更する度にタスクのソースコードを変更する必要が生じます。以下はgokartのtutorialから引用したサンプルコードです。

from logging import getLogger
import gokart
from gokart_example.utils.template import GokartTask
logger = getLogger(__name__)


class Sample(GokartTask):
    def run(self):
        self.dump('sample output')


class StringToSplit(GokartTask):
    """Like the function to divide received data by spaces."""
    task = gokart.TaskInstanceParameter()

    def run(self):
        sample = self.load('task')
        self.dump(sample.split(' '))


class Main(GokartTask):
    """Endpoint task."""
    def requires(self):
        return StringToSplit(task=Sample())

多くの実験を回す場面ではタスク間の依存関係が変わることもあるため、依存関係変更の度にタスクレベルでのソースコード修正をしなければならないのは手間です。

可読性下がりがち

パイプラインが大きくなりタスクの依存関係が複雑になってきても挙動が理解しやすいこと、容易に修正出来ることは実験を効率化する上で重要な要素です。scikit-learn Pipelineは複雑なタスク依存関係を定義しようとするとPipelineとタスクだけでなくColumnTransformerやFeatureUnionなども利用するため、処理の内容がぱっと見でイメージしにくくなります。scikit-learnのドキュメントからいくつか例を引用します。

estimators = [('linear_pca', PCA()), ('kernel_pca', KernelPCA())]
combined = FeatureUnion(estimators)
column_trans = ColumnTransformer(
    [('categories', OneHotEncoder(dtype='int'), ['city']),
     ('title_bow', CountVectorizer(), 'title')],
    remainder='drop', verbose_feature_names_out=False)
ct = ColumnTransformer([
    ('scale', StandardScaler(),
     make_column_selector(dtype_include=np.number)),
    ('onehot',
     OneHotEncoder(),
     make_column_selector(pattern='city', dtype_include=object))])

これくらいのコード量なら問題にはならないですが、タスクが増えてパイプラインが複雑化してくると処理の流れを追いにくくなりそうです。

gokartでもパイプラインが大きくなってくるとタスク間の依存関係を把握するのが難しくなってきますが、複数のタスクを一つのタスクにまとめるなど工夫することである程度は可読性を担保できます。

また、DAGでタスクの依存関係を定義するタイプの ライブラリでは、パイプラインが大きくなるほどDAG定義の記述量が増え、可読性が下がります。この問題については以下のブログでも言及されています。

techblog.nhn-techorus.com

Kedroでは、各ノードの入出力とpipeline定義における入出力の名前の対応付けを行いながらコードを追いかける必要があり、タスクが増えてきたりタスク間の依存関係が複雑になってくるとコードを追うのが結構つらいです。以下はKedroのtutorialから引用したコードです。(少し長いので折りたたんでいます)

キャッシュの取り方に悩みがち

同じデータに対して同じ処理をする場合、計算結果をキャッシュしておき2回目以降の実行でキャッシュを読み込むようにすることで実験を効率化出来ます。個々のタスクの出力をキャッシュとして保存する場合、キャッシュするデータをなるべく小さく抑えたくなりますが、scikit-learnのように直線的にデータが流れていくようなパイプラインではキャッシュを小さくするのがそもそも難しく、かといってDAGで依存関係を定義するタイプのパイプラインだと出力を小さく分割するほどタスク間の依存関係が複雑になるのでどのようにパイプラインを設計するか悩ましく、開発速度が低下して多くの実験を高速に回しにくくなります。gokartでもタスク間のデータの受け渡しはキャッシュ経由なので同様の悩みが発生します。

学習・検証時と推論時でタスク・ワークフローが分離されがち

scikit-learnの推定器はfit()transform()、予測器はfit()predict()があるので推論時は基本的にはtransform()またはpredict()だけを実行すればいいのですが、他のライブラリでは学習で1タスク、推論で1タスクと推論用のタスクを改めて定義しなければならないことがあります。例えばgokartでtitanicデータを扱う以下の例では、モデルの学習タスクと推論タスクを分けて定義しています。

www.m3tech.blog

kedroでは推論用のワークフローをどのように定義するのかについてのプラクティスが確立していないようです。この件については以下のブログでも言及されています。

qiita.com

学習・検証と推論で別々のワークフローを定義するか、またはMLFlowと連携することで推論用のワークフローを作れるという説があります。

github.com

推論するために別のワークフローを定義し直すのは手間ですし、推論する目的でMLFlowと連携するのはちょっと過剰ですね。

やりたいことをやる方法が分からないがち

これは学習コストに関係する部分でもあるのですが、やりたいことを実現するのに工夫が必要だと開発速度は下がります。例えばパイプライン内で交差検証でOOFサブセットごとに処理したい時や後処理を行いたい時に、たいていの場合どこにどういう処理を書くのか考える必要が出てきます。

gokartでは1タスクの中でデータ分割、分割後のOOFサブセットに対する前処理、モデル学習、後処理、スコアの計算を行うか、どのfoldのデータかが分かるようにしつつdumpするデータにOOFサブセットを全てまとめて後続のタスクに渡す(キャッシュサイズが大きくなる)等、コードの書き方も少し工夫が必要になってきます。(1タスクにまとめる方が楽ではありつつも、gokartの思想に反する書き方のような気もします。)

目的変数の変換をしたい場合、scikit-learn PipelineではTransformedTargetRegressorを使えるようですが、これは連続変数の変換を想定しているのでそれに当てはまらない場合はPipelineの外側で変換することになります。

既存の機械学習パイプラインライブラリは多くの実験を高速で回す用途には向いてなさそう

そんなこんなで、既存の機械学習パイプラインライブラリもとても便利なのですが、多くの実験を高速で回すという用途においてはどうも痒い所に手が届かない感がありました。最適なツールが無いなら作ってみようということで、自分で開発をしてみています。

新しい機械学習パイプラインライブラリをつくってみた

実装方針

ライブラリを作るにあたって最低限意識・実装したいことと実装の方針は以下です。

ざっくり要件 実装方針
・学習コストが低い
・タスクをモジュラーに保ちやすい
・学習時と推論時でワークフローを分離する必要が無い
scikit-learnライクにタスクを定義する
・複雑なパイプラインも可読性を維持しつつ構築できる
・キャッシュのサイズや実行を制御できる
PyTorchライクに各タスクを接続する
・再現性が確保できる 各タスク実行前にseedを固定する
・タスクのパラメータが自動で保存される 各タスクの実行時にパラメータをyaml形式で出力する

コンセプト

このライブラリの特徴を一言で言うと「タスクをscikit-learnライクに定義してPyTorchライクに繋げる」です。パイプラインは全データの前処理、データ分割、OOFサブセットの前処理、モデルの学習・推論、後処理というコンポーネントで構成されていて、これらのコンポーネントの中で処理がPyTorchライクに記述されます。

各タスクはscikit-learnライクに定義されるので、これらをPyTorchのレイヤーのように動作させるためのクラスとしてTaskクラスを定義しています。Taskクラスはタスクに応じて実行するメソッドを変えることで、前処理、データ分割、モデル学習・推論を共通のインターフェースで実行出来るようになっています(後述)。

以下では、titanicデータをもとに実際にライブラリを動かしていきます。

Titanicデータで自作のライブラリをつかってみる

ライブラリのインストール

ライブラリ自体はpipかpoetryでインストール出来ます。依存ライブラリはnumpy、pandas、pyyamlです。

# pip
pip install git+https://github.com/taikinman/imker.git

# poetry
poetry add git+https://github.com/taikinman/imker.git

データの読み込み

scikit-learnのdatasetモジュールを使ってOpenMLからTitanicデータセットをダウンロードし、学習データとテストデータに分けます。ついでに、後の説明のために目的変数をテキストに変換しておきます。

import pandas as pd
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split

X, y = fetch_openml(data_id=40945, as_frame=True, return_X_y=True, parser='auto')

X, X_test, y, y_test = train_test_split(X, y, shuffle=True, random_state=0, test_size=0.3)
X = X.reset_index(drop=True)
y = y.reset_index(drop=True)
X_test = X_test.reset_index(drop=True)
y_test = y_test.reset_index(drop=True)
y = y.map({"0":"not survived", "1":"survived"})

必要なモジュールのインポート

from imker import Pipeline, Task, TaskConfig, BaseTask, BaseProcessor, BaseSplitter, BaseModel

それぞれのモジュールの役割は以下の通りです。

モジュール 役割
Task 各タスクをPyTorchレイヤーのように動作させるためのモジュール
TaskConfig Taskの引数となるdataclass。タスクのパラメータやキャッシュ時の挙動などを指定する。
BaseTask 各タスクが継承するベースクラス
BaseProcessor 各タスクを繋げて一連の処理を定義するクラスが継承するベースクラス。全データに対する前処理、OOFサブセットに対する前処理、後処理で利用する。
BaseSplitter データ分割を担うクラスが継承するベースクラス
BaseModel 学習・推論をするモデルのクラスが継承するベースクラス
Pipeline Processor、Splitter、Modelを繋げてパイプライン化するクラス

パイプラインの初期化

続いてパイプラインを初期化します。

pipe = Pipeline(repo_dir="../../../cache", exp_name="example", pipeline_name="titanic")

repo_dirで指定したパスにキャッシュやfitしたタスクが保存されていきます。今はローカル環境への保存だけ対応しています。

タスクの定義

前処理で利用するタスクを定義していきます。各タスクはBaseTaskクラスを継承します。BaseTaskクラスには空のfitメソッドがあるので、fitする必要のないタスクはtransformメソッドだけ実装すればOKです。

from typing import Any, Union
import time

class DropCols(BaseTask):
    def __init__(self, cols:list) -> None:
        self.cols = cols
    
    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        X = X.drop(self.cols, axis=1)
        return X

class TimeSleep(BaseTask):
    def transform(self, X: Any) -> Any:
        time.sleep(1)
        return X

class DTypeConverter(BaseTask):
    def __init__(self, dtype: str) -> None:
        self.dtype = dtype

    def transform(self, X: Union[pd.DataFrame, pd.Series]) -> Union[pd.DataFrame, pd.Series]:
        X = X.astype(self.dtype)
        return X

このように、各タスクはscikit-learnライクに定義しておきます。TimeSleepタスクは1秒間停止するだけのクラスですが、後でキャッシュの効果を見るために定義しています。

前処理クラスの定義

続いて、上記で定義した各タスクをPyTorchライクに繋げて前処理を行うクラスを定義します。ここでは自分で定義したタスク以外に、scikit-learnのOrdinalEncoderLabelEncoderをタスクとして利用しています。

from sklearn.preprocessing import OrdinalEncoder, LabelEncoder

class PreProcessor(BaseProcessor):
    def __init__(self):
        self.drop = Task(TaskConfig(task=DropCols, 
                                    init_params={"cols":["name", "cabin", "ticket", "body", "boat", "home.dest"]}, 
                                    ))
        self.cat_encoder = Task(TaskConfig(task=OrdinalEncoder, # you can use scikit learn class as it is
                                           init_params={"handle_unknown":"use_encoded_value", 
                                                        "unknown_value":-1, 
                                                        "encoded_missing_value":-999}, 
                                            ))
        self.target_label_enc = Task(TaskConfig(task=LabelEncoder))
        self.dtype_converter = Task(TaskConfig(task=DTypeConverter, 
                                               init_params={"dtype":"int8"}))
        self.sleep = Task(TaskConfig(task=TimeSleep, 
                                     cache=True))# If you pass True, output of the task will be cached.
        
    def forward(self, X, y=None):
        X = self.drop(X)
        X[["sex", "embarked"]] = self.cat_encoder(X[["sex", "embarked"]])
        X[["sex", "embarked"]] = self.dtype_converter(X[["sex", "embarked"]])
        if y is not None:
            y = self.target_label_enc(y) # target variable can be transformed as well as features
        X = self.sleep(X)
        return X, y

TaskConfigの引数でcache=Trueとしておくとtransform()predict()などの処理の結果がキャッシュされます。デフォルトでは、キャッシュ時はタスクのソースコードとタスクに入力されたデータ・パラメータ、タスクの初期状態の4つを使ってハッシュを生成しています。また、データのサイズや処理内容によってはキャッシュを読み書きするよりもメモリ上でデータを流した方が処理時間が短いということもあります。そのため、タスクごとに処理結果をキャッシュするかしないかを選べるようにしています。ここではTimeSleepクラスだけキャッシュしておきます。forward()はX, yを両方受け取って、処理後のX, yを返すようにします。推論時はyは無いので、初期値としてNoneを入れておきます。

Taskクラスのインスタンスは__call__メソッド経由で利用します。各タスクは呼び出し時に必要に応じてfit()が実行され、その後タスクが持っているメソッドに応じて処理を実行するようになっています。例えば、タスクがtransform()を持っていればtransform()を、split()を持っていればsplit()を、predict()を持っていればpredict()を実行します。実行の優先順位はpredict() or predict_proba()transform()split()となっているので、transform()predict()を両方持つタスクについてはpredict()メソッドが優先されます(バグの原因になりそうだけどいったんこれで。。。)。fit()の返り値=インスタンス自身は自動でキャッシュされるようになっています。

forward()内でタスクの依存関係や各タスクの入出力が明確になっているので、パイプラインが大きくなってきてもある程度可読性を維持出来そうですし、実行結果のキャッシュもタスクごとに柔軟に出来ます。この前処理クラスをパイプラインにセットします。なお、各タスクの実行前にはseedが固定されます。

pipe.set_preprocessor(PreProcessor)

前処理の結果の確認

test_preprocessing()で前処理の結果を確認出来ます。

%%time
pipe.test_preprocessing(X, y)

TimeSleepタスクがあるので1秒程度処理に時間がかかっています。ただしTimeSleepタスクの出力をキャッシュしているので2回目以降の実行では1秒待つ処理がスキップされます。

%%time
pipe.test_preprocessing(X, y)

いいですね。こんな感じで、前処理追加→結果確認→前処理追加という営みが自然に出来そうです。

なお、imkerには簡易的なリポジトリのビューアがあり、以下のように簡単にキャッシュの結果を見ることが出来ます。

from imker import RepositoryViewer
viewer = RepositoryViewer(repo_dir="../../../cache")
viewer.search_repo()

task_idload_cache()に渡してTimeSleepタスクのキャッシュの結果を見てみます(といっても入力をそのまま出力しているだけですが)

viewer.load_cache(5)

各タスクのconfig、例えばOrdinalEncoderタスクのconfigは以下で見れます。各タスクのfit実行時のパラメータはyamlファイルで保存されています。

viewer.load_config(3)

TaskConfigfit_params以外にもtransform_paramspredict_paramsも引数として受け取れます。transformやpredictの実行時に指定したいパラメータはこれらの引数で指定します。

データ分割クラスの定義

学習データと検証データに分ける処理を定義します。ここでは単にStratifiedKFoldをするだけにします。Splitterも前処理のPreProcessorクラスと同様にデータ分割をTaskとして扱います。

from sklearn.model_selection import StratifiedKFold

class Splitter(BaseSplitter):
    def __init__(self):
        self.splitter = Task(TaskConfig(task=StratifiedKFold, 
                                    init_params={"n_splits":5, "shuffle":True}))

    def get_n_splits(self):
        return self.splitter.get_n_splits()
    
    def split(self, X, y=None):
        return self.splitter(X, y)

このSplitterクラスをパイプラインにセットします。

pipe.set_splitter(Splitter)

OOFサブセットに対する前処理クラスの定義

続いて、学習データと検証データに分割した後の各サブセットに対して適用する前処理を定義します。ここではscikit-learnのStandardScalerと欠損値補間のタスクを使います。

# 欠損値補間のタスク
class FillNa(BaseTask):
    def __init__(self, values:Union[dict, float, str]) -> None:
        self.values = values

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        return X.fillna(self.values)
from sklearn.preprocessing import StandardScaler

class OOFPreProcessor(BaseProcessor):
    def __init__(self):
        self.std_scaler = Task(TaskConfig(task=StandardScaler))
        self.fillna = Task(TaskConfig(task=FillNa, 
                                      init_params={"values":-999}))

    def forward(self, X, y=None):
        X[["age", "fare"]] = self.std_scaler(X[["age", "fare"]])
        X = self.fillna(X)
        return X, y

この前処理クラスをパイプラインにセットします。

pipe.set_oof_preprocessor(OOFPreProcessor)

OOFサブセットに対する前処理結果の確認

OOFに対する前処理の結果はジェネレータになっているので、それぞれのfoldの結果を確認する際はnext()を使います。

g = pipe.test_oof_preprocessing(X, y)
oof = next(g)
oof.keys()

next()の返り値はOrderedDictになっており、上記のように学習データと検証データを表すキーが入っています。また、見た目はただのOrderedDictですが、ドット(.)でキーにアクセス出来るようにしているので以下のようにキーの値にアクセス出来ます。

oof.X_train.head()

モデルクラスの定義

本記事ではLogisticRegression、KNN、LightGBMを使います。LightGBMを使うには不要な前処理も入っていますが、説明のために入れておきます。モデルの定義は以下のように行います。パラメータはほぼデフォルトで行きましょう。

from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from lightgbm.callback import early_stopping
from lightgbm import LGBMClassifier

class Classifier(BaseModel):
    def __init__(self):
        self.lr = Task(TaskConfig(LogisticRegression))
        self.knn = Task(TaskConfig(KNeighborsClassifier))
        self.lgb = Task(TaskConfig(task=LGBMClassifier, 
                                   init_params={"n_estimators":200, 
                                                "n_jobs":1}, 
                                   fit_params={"callbacks":[
                                       early_stopping(stopping_rounds=30)
                                   ]}))

    def forward(self, X, y=None, eval_set=None, proba=False):
        return {"lr": self.lr(X, y, proba=proba), 
                "knn":self.knn(X, y, proba=proba), 
                "lgb":self.lgb(X, y, eval_set=eval_set, proba=proba)}

出力はモデル名と予測結果の辞書にします。引数のprobaをTrueにすればタスクのpredict_proba()が実行されます。タスクのfit()の引数にeval_setがある場合はModelクラスのforward()の引数にeval_setを追加してタスク呼び出し時に渡せばOKです。

このモデルをパイプラインにセットします。

pipe.set_model(Classifier)

学習

それではモデルを学習していきます。といっても以下を実行するだけです。

pipe.train(X, y)

LightGBMのEarly Stoppingもちゃんと働いてそうです。いいですね。

Cross Validation

評価指標をパイプラインにセットして精度を見ていきます。今回はaccuracy_scoreとf1_scoreをセットします。

from sklearn.metrics import accuracy_score, f1_score
pipe.set_metrics([accuracy_score, f1_score])

以下を実行して、OOFサブセットの予測結果をまとめたものを取得します。

val_preds = pipe.validate(X, y)

簡単ですね。精度はget_scores()で確認します。

pipe.get_scores()

特に何も工夫などしていないのでこんなもんでしょうか。OOFの予測結果は以下で確認します。

pd.DataFrame(val_preds)

validate()probacalc_metricsを引数に持ちます。probaをTrueにすれば予測確率が取得出来ます。calc_metricsをFalseにすると評価指標の計算は行いません。

val_preds = pipe.validate(X, y, proba=True, calc_metrics=False)
val_preds.lr

出力が確率になりました。いいですね。

推論

評価データを推論します。推論も基本的にはinference()を実行するだけで、確率が欲しい時にproba引数をTrueにする点も同様です。

test_preds = pipe.inference(X_test)
test_preds.lgb[:30]

推論結果はfoldごとのモデルで平均されています。

パイプラインのロード

最後に、作ったパイプラインをロードする方法を紹介します。パイプラインのtrain()実行時に内部でどのタスクを使用したかを記録したyamlファイルをdumpしています。dumpされたyamlファイルは以下のようになっています。

パイプラインをロードする際は、repo_direxp_namepipeline_nameを指定して上記のyamlファイルを読み込み、これまでに定義した各クラスのタスクにマッピングしていきます。

pipe2 = Pipeline.load(repo_dir="../../../cache", exp_name="example", pipeline_name="titanic", 
                      preprocessor=PreProcessor, 
                      splitter=Splitter,
                      oof_preprocessor=OOFPreProcessor, 
                      model=Classifier
                      )

パイプラインが再現されたことを確認しましょう。

再現も簡単ですね。

簡単な例ではありますが、こんな感じのライブラリになっています。

その他

キャッシュはデフォルトではpickle化したものをbz2で圧縮するようになっていますが、以下のような独自のプロセッサーをTaskConfigcache_processorに渡せば好きなフォーマットで実行結果をキャッシュすることも出来ます。タスクごとにキャッシュのフォーマットを変えることも出来ます(この機能が必要になるかは不明ですが)。

from imker import BaseCacher

class CSVCacher(BaseCacher):
    @staticmethod
    def save(filepath: str, obj: pd.DataFrame):
        obj.to_csv(filepath, index=False)

    @staticmethod
    def load(filepath: str):
        return pd.read_csv(filepath)

    @staticmethod
    def format():
        return "csv"

また、今回は使いませんでしたが、後処理もPreProcessorと同様に定義し、パイプラインにセットすることが出来ます。

キャッシュが保管されているリポジトリの中は以下のような構成になっています。

cache
|--pipeline
|  |--example
|  |  |--titanic.yml
|--task
|  |--fit
|  |  |--DropCols
|  |  |  |--074bd8c8041151257e49f742bae1e7a1cfa5e9f4ffda58077af2294c36664f1a
|  |  |  |  |--9129df9b95d615ae2b5c8cfceec3a77a3cbfea5a5f90e20d68ce9356ef2e0a66
|  |  |  |  |  |--task_config.yml
|  |  |  |  |  |--task.pbz2
|  |  |--DTypeConverter
|  |  |  |--ae1dc0ef9063ccccb23624e6c33c078eb71e03364b61e0152afdb3c069c40da3
|  |  |  |  |--5cd33d1bc0d0cb776a1ada4efce987d2000edc94d5b2f6b380ccf091725fc72a
|  |  |  |  |  |--task_config.yml
|  |  |  |  |  |--task.pbz2
(中略)
|  |  |--KNeighborsClassifier
|  |  |  |--001e3a90e3f1f700f06563c8e2894e6c64feb242e656bdb04db65e236bf23841
|  |  |  |  |--a01c9a385f6dd0ce7274f0b875460737a7f0a76e2795eb1118c125879f4df863
|  |  |  |  |  |--task_config.yml
|  |  |  |  |  |--task.pbz2
|  |  |  |--4918a8f939cb1febd90fff0fc2df6dea7187a22f635bf624b25dcbf2022ad68a
|  |  |  |  |--b0710eb1f93fb96490c106928c9d8907baee32425a5aee358c27229451217e52
|  |  |  |  |  |--task_config.yml
|  |  |  |  |  |--task.pbz2
(中略)
|  |--split
|  |  |--StratifiedKFold
|  |  |  |--41ee55d9056014b9f3682607f34bb7d1a781603eddb89a084617fe8a4cc3946b
|  |  |  |  |--45cb87deb4ef3f7814e651abd88a866b9a399f23b94878b8f3e6270a75556ba0
|  |  |  |  |  |--task_config.yml
|  |  |  |  |  |--task.pbz2
|  |--transform
|  |  |--TimeSleep
|  |  |  |--44cb8d5ce6672a6be52e8bf09d091245a46d7a8e1ce34a0044711fc466c64cb1
|  |  |  |  |--d8623aa0940216e260d9d3fc4e74db27dcbdddb488e94b348fda21ed14e96cbd
|  |  |  |  |  |--task_config.yml
|  |  |  |  |  |--task.pbz2
|  |  |  |--51397ca57c1dd22dfbb49cd4792e985b31c274f0d6f83e0031de164a7b9a26e1
|  |  |  |  |--d8623aa0940216e260d9d3fc4e74db27dcbdddb488e94b348fda21ed14e96cbd
|  |  |  |  |  |--task_config.yml
|  |  |  |  |  |--task.pbz2

このライブラリが出来ないこと

  • 独立したタスクを並列実行する今のところ機能はありません。
  • パイプラインどうしを接続する機能は今のところありません。
  • GUIでパイプラインを可視化する機能はありません。
  • 各種クラウドとの連携機能は今のところありません。

作ってみての感想と振り返り

まだまだ原形ですが、既存のツールに感じていた課題感はある程度払拭出来るライブラリになりうるんじゃないかなと思っています。一方でタスク自体の挙動は__call__()内に隠ぺいされているため、挙動の理解のしやすさという観点ではやや課題が残る形となりました。汎用性の高さと制約がトレードオフになっている部分もあり、改めてこういうツールを作る難しさを実感しました。とりあえず作ってみてから考えようの精神だったのでここに来るまでに数回作ってみて最初からやり直してを繰り返したと思います。当たり前と言えば当たり前ですが、最初の課題感の深掘りとか設計が肝心ですね。。。こういう経験が無かったのでライブラリとして開発するための諸々(リポジトリの構成だったりパッケージ管理周りの話だったり開発環境のことだったり)から調べる必要もありました。正直いまだによく分かっていない部分もあります。いい加減なライブラリならいくらでも作れてしまいますが、ちゃんと開発していくならdocstringをちゃんと書いたりドキュメントを書いたりCIでテストを実行したり等も考えていかないとですし、きちんとしたライブラリを作る労力ってすごいんですね。。。大きなライブラリを開発・メンテしている方々は本当に偉大です。。。どんなライブラリにもイマイチな部分があったりバグがあったりドキュメントが開発に追いついてなかったりもしますが、そういう所により寛容になりました。

おわりに

まだ最低限の機能ではありますが、実験を高速化する用途で使いやすい新しい機械学習パイプラインライブラリを開発して公開しました。面白そうだなとか便利そうだなと思った方はぜひ使ってみてください!(フィードバックやコントリビューションも大歓迎です!!)

ABEJAでは一緒に働く仲間を募集しています!俺が考えた最強のツールを開発したい方もそうじゃない方もぜひABEJAへ!

careers.abejainc.com