ABEJA Tech Blog

株式会社ABEJAのエンジニアたちがAI, IoT, Big Dataなど様々なテーマについて発信します

pandas DataFrameを省メモリにpickleする

ABEJAでデータエンジニアをしています、千葉です。

少し前に、pandasのDataFrameをファイルに読み書きする際にメモリを消費しすぎる問題を発見したので、解決策を含めて紹介します。

通常手法の紹介

通常、DataFrameをファイルに保存する際には、pandasの提供するIOモジュールを使用します。

今回は、細かい変換規則を書く必要のないPython Pickleをベースとしたto_pickle機能について取り上げます。

# Dumping pandas.DataFrame
import pandas
df = pandas.DataFrame(..., columns=...)

df.to_pickle(output_file_path)
# Restoring pandas.DataFrame
import pickle

with open(input_file_path, 'rb'):
    df = pickle.load()

上記のようにして、非常に簡単にDataFrameを完全に入出力できます。

通常手法の課題

ここで、注意すべき点として、pickleのメモリ効率の悪さが挙げられます。 実際に実メモリサイズがGBクラスのDataFrameを作成し、パフォーマンスを測定したログが下記になります。

テストコード

GB = 1024 * 1024 * 1024
df = None

def prepare_data():
    global df
    row_count = 40000000
    print('generating data.')
    print('row count', row_count)
    series_1 = numpy.random.randn(row_count)
    series_2 = numpy.random.randn(row_count)
    series_3 = numpy.random.randn(row_count)
    df = pandas.DataFrame({'a': series_1,
                           'b': series_2,
                           'c': series_3})
    return df


def run_to_pickle():
    result_path = 'run_to_pickle.bin'
    df.to_pickle(result_path)


def run_load_pickle():
    result_path = 'run_to_pickle.bin'
    df = pickle.load(open(result_path, 'rb'))

結果

-------prepare_data-------
generating data.
row count 40000000
sizeof df: 2305.9765625MB

-------run_to_pickle-------
Maximum memory usage: 4137.2109375MB
Elapsed time: 3.702019843040034

-------run_load_pickle-------
Maximum memory usage: 2322.0859375MB
Elapsed time: 5.678721209987998

※各関数の実行後には、ガベージコレクションを実行しています。

※メモリ使用量の測定には、memory_profiler.memory_usageを使用しています。

※経過時間の測定には、timeitを使用しています。

残念ながら、約2GBのDataFrameに対して、出力時に最大約4GBほどのメモリを使用しています。およそ2倍です。

調査した結果、

  • オブジェクトのコピーが発生すること
  • 逐次入出力をできないこと

が原因のようです。

解決手法

今回の解決手法では、下記のような性質を持つ入出力用のラッパークラスを設計します。

  • DataFrameをチャンク化して逐次入出力をさせることで、最大メモリ使用量を削減する。
  • pandas系オブジェクトをnumpy系オブジェクトに変換して扱うことで、高速化・小型化する。

イメージ図 - エンコーディング

f:id:archibay:20170202152706p:plain

イメージ図 - デコーディング

f:id:archibay:20170202152710p:plain

コード例

# 部品番号と部品データ格納用のクラス
# indexは、複数のDataFrameを一括で入出力するための索引
class SerializationParts(object):
    def __init__(self, name, index, data):
        self.name = name
        self.index = index
        self.data = data

    def blueprint(self, binary_size):
        return SerializationParts(self.name, self.index, binary_size)

SerializationPartsに部品番号と、部品データを格納します。 設計図生成時には、バイト長を記録するようにします。

# propertiesに登録された変数を SerializationParts に格納し、
# yieldする機能のベースクラス
class WrapperBase():
    properties = []

    def __custom_encode__(self, property_name, index):
        # Please override this function
        yield SerializationParts(name=property_name, index=index, data=getattr(self, property_name))

    def encode(self, index):
        # 最初にクラス情報をパーツとして作成
        yield SerializationParts('base', index, self.__class__)
        for property_name in self.properties:
            # プロパティをそれぞれパーツとして作成
            for a_parts in self.__custom_encode__(property_name, index):
                yield a_parts

    def __custom_decode__(self, parts):
        # Please override this function
        setattr(self, parts.name, parts.data)

    def decode(self, parts):
        self.__custom_decode__(parts)

WrapperBaseに、逐次パーツ作成機能を持たせておきます。 custom_encode、 custom_decodeをオーバーライドすることで、サブクラスは 逐次処理を制御できます。

# DataFrameのラッパークラス

class DataFrameWrapper(WrapperBase):
    properties = ['df', 'max_rows']

    def __init__(self, df, max_rows=1000000):
        super(DataFrameWrapper, self).__init__()
        self.df = df
        self.max_rows = max_rows

    def __custom_encode__(self, property_name, index):
        max_rows = self.max_rows
        # dfプロパティは複数パーツに分割
        if property_name == 'df':
            # 列名、型、Indexを構造情報としてパーツ化
            structure = {'index': numpy.array(self.df.index),
                         'columns': list(self.df.columns),
                         'dtypes': list(self.df.dtypes)}
            yield SerializationParts(['df', 'structure'], index, structure)
            row_counts = len(self.df.index)
            # 数値データは、列毎・max_rows毎にパーツ化
            for series in list(self.df.columns):
                series_name = series
                row_loop_count = 1 + int(row_counts / max_rows)
                for i in range(row_loop_count):
                    # numpy.arrayを使うことで、飛躍的に省メモリ化・高速化を実現できる。
                    yield SerializationParts(
                        ['df', series_name, i], index,
                        (max_rows,
                         numpy.array(self.df[series_name][max_rows * i: min(max_rows * (i + 1), row_counts)])))
        else:
            # dfプロパティ以外は、WrapperBaseの機能を使用
            return super(DataFrameWrapper, self).__custom_encode__(property_name, index)

    def __custom_decode__(self, parts):
        if isinstance(parts.name, list) and parts.name[0] == 'df':
            if parts.name[1] == 'structure':
                l = len(parts.data['index'])
                dtype_dict = OrderedDict()
                for k, v in zip(parts.data['columns'], parts.data['dtypes']):
                    # numpy.arrayを使うことで、飛躍的に高速化を実現できる。
                    dtype_dict[k] = numpy.ndarray(shape=l, dtype=v)
                # 現状、最も高速な行数固定の空DataFrameの生成処理。
                self.df = pandas.DataFrame(dtype_dict, index=parts.data['index'])
                del dtype_dict
            else:
                row_counts = len(self.df.index)
                max_rows, series_data_parts = parts.data
                series_name = parts.name[1]
                series_parts_no = parts.name[2]
                series_data_parts = pandas.Series(series_data_parts)
                # 数値データの設定処理
                self.df[series_name][max_rows * series_parts_no: min(max_rows * (series_parts_no + 1), row_counts)] = \
                    series_data_parts
        else:
            # dfプロパティ以外は、WrapperBaseの機能を使用
            super(DataFrameWrapper, self).__custom_decode__(parts)

WrapperBase クラスを継承し、pandas.DataFrame 用のラッパーを作成します。 プロパティ df に pandas.DataFrame をセットできるようにします。 構造情報の別パーツ化、数値データのChunk処理を行い、逐次処理可能にします。 エンコード・デコードの過程に、pandas.Seriesやpandas.DataFrameの使用を極力避け、 numpy.arrayを使用することで、パフォーマンスを飛躍的に向上させることができます。

# エンコーダークラス
class Encoder(object):
    def __init__(self, data_file, blueprint_file=None):
        self.data_file = data_file
        if blueprint_file is not None:
            self.blueprint_file = blueprint_file
        else:
            self.blueprint_file = '{data_file}.bp'.format(data_file=self.data_file)

    def encode(self, data: [WrapperBase]):
        data_out = open(self.data_file, 'wb')
        blueprint_out = open(self.blueprint_file, 'wb')
        print('Start encoding', self.data_file)
        print('Dumping blueprint')
        blueprint = {'blueprint': [], 'data_length': len(data)}
        for i, d in enumerate(data):
            # パーツを逐次エンコード
            for parts in d.encode(i):
                # 現在のカーソルを記録
                current_cursor = data_out.tell()
                print('Encoding %s.%s' % (i, parts.name))
                # パーツのデータ部分を書き出し
                pickle.dump(parts.data, data_out, protocol=-1)
                # 書き込み後のカーソルを記録
                new_cursor = data_out.tell()
                # ブループリントにカーソル移動量(パーツデータのバイト長)を記録
                blueprint['blueprint'].append(parts.blueprint(new_cursor - current_cursor))
        pickle.dump(blueprint, blueprint_out, protocol=-1)
        logging.info('Finish encoding')

データファイルと設計図ファイルに分けて格納します。 pickleを使用して部品データをエンコードし、バイト長を設計図ファイルに書き出します。

# デコーダークラス
class Decoder(object):
    def __init__(self, data_file, blueprint_file=None):
        self.data_file = data_file
        if blueprint_file is not None:
            self.blueprint_file = blueprint_file
        else:
            self.blueprint_file = '{data_file}.bp'.format(data_file=self.data_file)

    def decode(self) -> [WrapperBase]:
        data_in = open(self.data_file, 'rb')
        blueprint_in = open(self.blueprint_file, 'rb')
        print('Start decoding', self.data_file)
        print('Loading blueprint')
        # ブループリントデータの読み込み
        blueprint = pickle.load(blueprint_in)
        # initialize output_data
        data = [None for _ in range(blueprint['data_length'])]
        for bp in blueprint['blueprint']:
            # ブループリントのバイト長分だけ読み込み
            parts_data = data_in.read(bp.data)
            index = bp.index
            name = bp.name
            # バイトデータを読み込み
            bp.data = pickle.loads(parts_data)
            if name == 'base':
                data_class = bp.data
                # 空インスタンスの作成
                instance = data_class.__new__(data_class)
                data[index] = instance
            else:
                data[index].decode(bp)
        logging.info('Finish decoding')
        return data

データファイルと設計図ファイルから読み込みます。 設計図に記録された部品番号とバイト長を元に、pickleを使用してデコードします。

# テストコード
def run_my_encode():
    result_path = 'run_my_encode.bin'
    encoder = Encoder(result_path)
    encoder.encode([DataFrameWrapper(df=df)])


def run_my_decode():
    result_path = 'run_my_encode.bin'
    decoder = Decoder(result_path)
    df_wappers = decoder.decode()
    df = df_wappers[0].df

結果

-------prepare_data-------
generating data.
row count 40000000
sizeof df: 2305.9765625MB

-------run_to_pickle-------
Maximum memory usage: 4137.2109375MB
Elapsed time: 3.702019843040034

-------run_load_pickle-------
Maximum memory usage: 2322.0859375MB
Elapsed time: 5.678721209987998

-------run_my_encode-------
Maximum memory usage: 2465.16015625MB
Elapsed time: 3.972677645040676

-------run_my_decode-------
Maximum memory usage: 2184.8671875MB
Elapsed time: 4.480759038007818

※run_my_decode時のメモリ使用量が、元のdfよりも少なくなっていますが、デコード結果には正しい数値が格納されていました。

下記のようにパフォーマンスが改善しました。

  • 出力時の実行時間がほぼ変わらず、メモリ使用量が約40%改善。
  • 入力時の実行時間が約20%改善し、メモリ使用量がほぼ変わらない。

ただし、あらゆる構造のDataFrameに対応しているかというと、Noです。 例えば、列が入れ子になっている場合などは、正しくエンコードできません。

まとめ

pandas.DataFrameの入出力パフォーマンスを改善するラッパーを作成し、

  • 出力時のメモリパフォーマンス
  • 入力時の実行時間

を大幅に改善するすることに成功しました。

デメリットとしては、標準の方法に比べて対応可能なDataFrameが限定的な点です。

みなさまも、大きなデータの入出力を行う際には、 ライブラリを信頼しすぎずに、パフォーマンスチェックをしてみてはいかがでしょうか。

宣伝

ABEJAでは、技術課題を解決し、ブレイクスルーを生み出すエンジニアを募集しています。 今回の記事をきっかけに、少しでも興味が湧いた方は、ぜひ一度話を聞きに来てみてください。

ABEJAが発信する最新テクノロジーに興味がある方は、是非ともブログの読者に!

ABEJAという会社に興味が湧いてきた方はWantedlyで会社、事業、人の情報を発信しているので、是非ともフォローを!! www.wantedly.com

ABEJAの中の人と話ししたい!オフィス見学してみたいも随時受け付けておりますので、気軽にポチッとどうぞ↓↓