大規模データセットへのスケーリング#

pandasはインメモリ分析用のデータ構造を提供するため、メモリよりも大きいデータセットを分析する際に、pandasを使用するのは少々厄介です。メモリのかなりの部分を占めるデータセットでも、一部のpandas操作では中間コピーを作成する必要があるため、扱いにくくなります。

このドキュメントでは、大規模データセットへの分析をスケーリングするためのいくつかの推奨事項を提供します。これは、メモリに収まるデータセットの分析を高速化することに焦点を当てたパフォーマンスの向上を補完するものです。

読み込むデータを減らす#

ディスク上の生のデータセットに多くのカラムがあると仮定します。

In [1]: import pandas as pd

In [2]: import numpy as np

In [3]: def make_timeseries(start="2000-01-01", end="2000-12-31", freq="1D", seed=None):
   ...:     index = pd.date_range(start=start, end=end, freq=freq, name="timestamp")
   ...:     n = len(index)
   ...:     state = np.random.RandomState(seed)
   ...:     columns = {
   ...:         "name": state.choice(["Alice", "Bob", "Charlie"], size=n),
   ...:         "id": state.poisson(1000, size=n),
   ...:         "x": state.rand(n) * 2 - 1,
   ...:         "y": state.rand(n) * 2 - 1,
   ...:     }
   ...:     df = pd.DataFrame(columns, index=index, columns=sorted(columns))
   ...:     if df.index[-1] == end:
   ...:         df = df.iloc[:-1]
   ...:     return df
   ...: 

In [4]: timeseries = [
   ...:     make_timeseries(freq="1min", seed=i).rename(columns=lambda x: f"{x}_{i}")
   ...:     for i in range(10)
   ...: ]
   ...: 

In [5]: ts_wide = pd.concat(timeseries, axis=1)

In [6]: ts_wide.head()
Out[6]: 
                     id_0 name_0       x_0  ...   name_9       x_9       y_9
timestamp                                   ...                             
2000-01-01 00:00:00   977  Alice -0.821225  ...  Charlie -0.957208 -0.757508
2000-01-01 00:01:00  1018    Bob -0.219182  ...    Alice -0.414445 -0.100298
2000-01-01 00:02:00   927  Alice  0.660908  ...  Charlie -0.325838  0.581859
2000-01-01 00:03:00   997    Bob -0.852458  ...      Bob  0.992033 -0.686692
2000-01-01 00:04:00   965    Bob  0.717283  ...  Charlie -0.924556 -0.184161

[5 rows x 40 columns]

In [7]: ts_wide.to_parquet("timeseries_wide.parquet")

目的のカラムを読み込むには、2つの選択肢があります。オプション1は、すべてのデータを読み込んでから必要なものにフィルタリングします。

In [8]: columns = ["id_0", "name_0", "x_0", "y_0"]

In [9]: pd.read_parquet("timeseries_wide.parquet")[columns]
Out[9]: 
                     id_0 name_0       x_0       y_0
timestamp                                           
2000-01-01 00:00:00   977  Alice -0.821225  0.906222
2000-01-01 00:01:00  1018    Bob -0.219182  0.350855
2000-01-01 00:02:00   927  Alice  0.660908 -0.798511
2000-01-01 00:03:00   997    Bob -0.852458  0.735260
2000-01-01 00:04:00   965    Bob  0.717283  0.393391
...                   ...    ...       ...       ...
2000-12-30 23:56:00  1037    Bob -0.814321  0.612836
2000-12-30 23:57:00   980    Bob  0.232195 -0.618828
2000-12-30 23:58:00   965  Alice -0.231131  0.026310
2000-12-30 23:59:00   984  Alice  0.942819  0.853128
2000-12-31 00:00:00  1003  Alice  0.201125 -0.136655

[525601 rows x 4 columns]

オプション2は、要求されたカラムのみを読み込みます。

In [10]: pd.read_parquet("timeseries_wide.parquet", columns=columns)
Out[10]: 
                     id_0 name_0       x_0       y_0
timestamp                                           
2000-01-01 00:00:00   977  Alice -0.821225  0.906222
2000-01-01 00:01:00  1018    Bob -0.219182  0.350855
2000-01-01 00:02:00   927  Alice  0.660908 -0.798511
2000-01-01 00:03:00   997    Bob -0.852458  0.735260
2000-01-01 00:04:00   965    Bob  0.717283  0.393391
...                   ...    ...       ...       ...
2000-12-30 23:56:00  1037    Bob -0.814321  0.612836
2000-12-30 23:57:00   980    Bob  0.232195 -0.618828
2000-12-30 23:58:00   965  Alice -0.231131  0.026310
2000-12-30 23:59:00   984  Alice  0.942819  0.853128
2000-12-31 00:00:00  1003  Alice  0.201125 -0.136655

[525601 rows x 4 columns]

2つの呼び出しのメモリ使用量を測定すると、このケースではcolumnsを指定すると約10分の1のメモリを使用することがわかります。

pandas.read_csv()では、usecolsを指定して、メモリに読み込むカラムを制限できます。pandasで読み取れるすべてのファイル形式が、カラムのサブセットを読み取るオプションを提供しているわけではありません。

効率的なデータ型を使用する#

デフォルトのpandasデータ型は、最もメモリ効率が良いわけではありません。これは、ユニークな値が比較的少ないテキストデータ列(一般に「低カーディナリティ」データと呼ばれる)に特に当てはまります。より効率的なデータ型を使用することで、より大きなデータセットをメモリに格納できます。

In [11]: ts = make_timeseries(freq="30s", seed=0)

In [12]: ts.to_parquet("timeseries.parquet")

In [13]: ts = pd.read_parquet("timeseries.parquet")

In [14]: ts
Out[14]: 
                       id     name         x         y
timestamp                                             
2000-01-01 00:00:00  1041    Alice  0.889987  0.281011
2000-01-01 00:00:30   988      Bob -0.455299  0.488153
2000-01-01 00:01:00  1018    Alice  0.096061  0.580473
2000-01-01 00:01:30   992      Bob  0.142482  0.041665
2000-01-01 00:02:00   960      Bob -0.036235  0.802159
...                   ...      ...       ...       ...
2000-12-30 23:58:00  1022    Alice  0.266191  0.875579
2000-12-30 23:58:30   974    Alice -0.009826  0.413686
2000-12-30 23:59:00  1028  Charlie  0.307108 -0.656789
2000-12-30 23:59:30  1002    Alice  0.202602  0.541335
2000-12-31 00:00:00   987    Alice  0.200832  0.615972

[1051201 rows x 4 columns]

次に、データ型とメモリ使用量を調べて、どこに注意を集中すべきかを確認しましょう。

In [15]: ts.dtypes
Out[15]: 
id        int64
name     object
x       float64
y       float64
dtype: object
In [16]: ts.memory_usage(deep=True)  # memory usage in bytes
Out[16]: 
Index     8409608
id        8409608
name     65176434
x         8409608
y         8409608
dtype: int64

nameカラムが他のどのカラムよりも多くのメモリを消費しています。ユニークな値が少ないため、pandas.Categoricalに変換するのに適した候補です。pandas.Categoricalを使用すると、各ユニークな名前を一度だけ格納し、省スペースの整数を使用して各行でどの特定の名前が使用されているかを知ることができます。

In [17]: ts2 = ts.copy()

In [18]: ts2["name"] = ts2["name"].astype("category")

In [19]: ts2.memory_usage(deep=True)
Out[19]: 
Index    8409608
id       8409608
name     1051495
x        8409608
y        8409608
dtype: int64

さらに、pandas.to_numeric()を使用して数値カラムを最小の型にダウンキャストすることもできます。

In [20]: ts2["id"] = pd.to_numeric(ts2["id"], downcast="unsigned")

In [21]: ts2[["x", "y"]] = ts2[["x", "y"]].apply(pd.to_numeric, downcast="float")

In [22]: ts2.dtypes
Out[22]: 
id        uint16
name    category
x        float32
y        float32
dtype: object
In [23]: ts2.memory_usage(deep=True)
Out[23]: 
Index    8409608
id       2102402
name     1051495
x        4204804
y        4204804
dtype: int64
In [24]: reduction = ts2.memory_usage(deep=True).sum() / ts.memory_usage(deep=True).sum()

In [25]: print(f"{reduction:0.2f}")
0.20

全体として、このデータセットのインメモリフットプリントを元のサイズの5分の1に削減しました。

pandas.Categoricalの詳細についてはカテゴリカルデータを、pandasのすべてのdtypesの概要についてはdtypesを参照してください。

チャンキングを使用する#

大規模な問題を多数の小さな問題に分割するチャンキングによって、一部のワークロードを達成できます。例えば、個々のCSVファイルをParquetファイルに変換し、ディレクトリ内の各ファイルに対してそれを繰り返すといった具合です。各チャンクがメモリに収まる限り、メモリよりもはるかに大きなデータセットを扱うことができます。

チャンキングは、実行する操作がチャンク間でゼロまたは最小限の調整しか必要としない場合にうまく機能します。より複雑なワークフローの場合、他のライブラリを使用する方が良いでしょう。

ディスク上に、Parquetファイルのディレクトリであるさらに大きな「論理データセット」があると仮定します。ディレクトリ内の各ファイルは、データセット全体の異なる年を表します。

In [26]: import pathlib

In [27]: N = 12

In [28]: starts = [f"20{i:>02d}-01-01" for i in range(N)]

In [29]: ends = [f"20{i:>02d}-12-13" for i in range(N)]

In [30]: pathlib.Path("data/timeseries").mkdir(exist_ok=True)

In [31]: for i, (start, end) in enumerate(zip(starts, ends)):
   ....:     ts = make_timeseries(start=start, end=end, freq="1min", seed=i)
   ....:     ts.to_parquet(f"data/timeseries/ts-{i:0>2d}.parquet")
   ....: 
data
└── timeseries
    ├── ts-00.parquet
    ├── ts-01.parquet
    ├── ts-02.parquet
    ├── ts-03.parquet
    ├── ts-04.parquet
    ├── ts-05.parquet
    ├── ts-06.parquet
    ├── ts-07.parquet
    ├── ts-08.parquet
    ├── ts-09.parquet
    ├── ts-10.parquet
    └── ts-11.parquet

次に、アウトオブコアのpandas.Series.value_counts()を実装します。このワークフローのピークメモリ使用量は、単一の最大のチャンクと、それまでのユニークな値の数を格納する小さなシリーズです。個々のファイルがそれぞれメモリに収まる限り、これは任意のサイズのデータセットで機能します。

In [32]: %%time
   ....: files = pathlib.Path("data/timeseries/").glob("ts*.parquet")
   ....: counts = pd.Series(dtype=int)
   ....: for path in files:
   ....:     df = pd.read_parquet(path)
   ....:     counts = counts.add(df["name"].value_counts(), fill_value=0)
   ....: counts.astype(int)
   ....: 
CPU times: user 1.01 s, sys: 37.3 ms, total: 1.04 s
Wall time: 1.03 s
Out[32]: 
name
Alice      1994645
Bob        1993692
Charlie    1994875
dtype: int64

pandas.read_csv()のような一部のリーダーは、単一ファイルを読み込む際にchunksizeを制御するパラメータを提供します。

手動チャンキングは、あまり洗練された操作を必要としないワークフローには良い選択肢です。pandas.DataFrame.groupby()のような一部の操作は、チャンクごとに実行するのがはるかに困難です。このような場合、これらのアウトオブコアアルゴリズムを実装している別のライブラリに切り替える方が良いかもしれません。

他のライブラリを使用する#

pandasと似たAPIを提供し、pandas DataFrameと連携して動作する他のライブラリもあり、並列実行、分散メモリ、クラスタリングなどによって大規模データセットの処理と分析をスケーリングする機能を提供できます。詳細については、エコシステムページを参照してください。