大規模データセットへのスケーリング#

pandas はメモリ内分析のためのデータ構造を提供しますが、メモリよりも大きいデータセットを分析するために pandas を使用すると、やや扱いが難しくなります。メモリのかなり大きな部分を占めるデータセットでも、pandas の一部の操作では中間コピーを作成する必要があるため、扱いにくくなります。

このドキュメントでは、分析をより大規模なデータセットにスケーリングするためのいくつかの推奨事項を示します。これは、メモリに収まるデータセットの分析速度向上に焦点を当てたパフォーマンスの向上の補足です。

データの読み込み量を減らす#

ディスク上の生のデータセットに多くの列があるとします。

In [1]: import pandas as pd

In [2]: import numpy as np

In [3]: def make_timeseries(start="2000-01-01", end="2000-12-31", freq="1D", seed=None):
   ...:     index = pd.date_range(start=start, end=end, freq=freq, name="timestamp")
   ...:     n = len(index)
   ...:     state = np.random.RandomState(seed)
   ...:     columns = {
   ...:         "name": state.choice(["Alice", "Bob", "Charlie"], size=n),
   ...:         "id": state.poisson(1000, size=n),
   ...:         "x": state.rand(n) * 2 - 1,
   ...:         "y": state.rand(n) * 2 - 1,
   ...:     }
   ...:     df = pd.DataFrame(columns, index=index, columns=sorted(columns))
   ...:     if df.index[-1] == end:
   ...:         df = df.iloc[:-1]
   ...:     return df
   ...: 

In [4]: timeseries = [
   ...:     make_timeseries(freq="1min", seed=i).rename(columns=lambda x: f"{x}_{i}")
   ...:     for i in range(10)
   ...: ]
   ...: 

In [5]: ts_wide = pd.concat(timeseries, axis=1)

In [6]: ts_wide.head()
Out[6]: 
                     id_0 name_0       x_0  ...   name_9       x_9       y_9
timestamp                                   ...                             
2000-01-01 00:00:00   977  Alice -0.821225  ...  Charlie -0.957208 -0.757508
2000-01-01 00:01:00  1018    Bob -0.219182  ...    Alice -0.414445 -0.100298
2000-01-01 00:02:00   927  Alice  0.660908  ...  Charlie -0.325838  0.581859
2000-01-01 00:03:00   997    Bob -0.852458  ...      Bob  0.992033 -0.686692
2000-01-01 00:04:00   965    Bob  0.717283  ...  Charlie -0.924556 -0.184161

[5 rows x 40 columns]

In [7]: ts_wide.to_parquet("timeseries_wide.parquet")

必要な列を読み込むには、2 つの方法があります。方法 1 はすべてのデータを読み込んでから、必要なものだけをフィルタリングする方法です。

In [8]: columns = ["id_0", "name_0", "x_0", "y_0"]

In [9]: pd.read_parquet("timeseries_wide.parquet")[columns]
Out[9]: 
                     id_0 name_0       x_0       y_0
timestamp                                           
2000-01-01 00:00:00   977  Alice -0.821225  0.906222
2000-01-01 00:01:00  1018    Bob -0.219182  0.350855
2000-01-01 00:02:00   927  Alice  0.660908 -0.798511
2000-01-01 00:03:00   997    Bob -0.852458  0.735260
2000-01-01 00:04:00   965    Bob  0.717283  0.393391
...                   ...    ...       ...       ...
2000-12-30 23:56:00  1037    Bob -0.814321  0.612836
2000-12-30 23:57:00   980    Bob  0.232195 -0.618828
2000-12-30 23:58:00   965  Alice -0.231131  0.026310
2000-12-30 23:59:00   984  Alice  0.942819  0.853128
2000-12-31 00:00:00  1003  Alice  0.201125 -0.136655

[525601 rows x 4 columns]

方法 2 は、要求した列のみを読み込む方法です。

In [10]: pd.read_parquet("timeseries_wide.parquet", columns=columns)
Out[10]: 
                     id_0 name_0       x_0       y_0
timestamp                                           
2000-01-01 00:00:00   977  Alice -0.821225  0.906222
2000-01-01 00:01:00  1018    Bob -0.219182  0.350855
2000-01-01 00:02:00   927  Alice  0.660908 -0.798511
2000-01-01 00:03:00   997    Bob -0.852458  0.735260
2000-01-01 00:04:00   965    Bob  0.717283  0.393391
...                   ...    ...       ...       ...
2000-12-30 23:56:00  1037    Bob -0.814321  0.612836
2000-12-30 23:57:00   980    Bob  0.232195 -0.618828
2000-12-30 23:58:00   965  Alice -0.231131  0.026310
2000-12-30 23:59:00   984  Alice  0.942819  0.853128
2000-12-31 00:00:00  1003  Alice  0.201125 -0.136655

[525601 rows x 4 columns]

2 つの呼び出しのメモリ使用量を測定すると、columns を指定した場合、このケースでは約 1/10 のメモリを使用することがわかります。

pandas.read_csv() を使用すると、usecols を指定して、メモリに読み込む列を制限できます。pandas で読み取ることができるすべてのファイル形式が、列のサブセットを読み取るオプションを提供するわけではありません。

効率的なデータ型を使用する#

デフォルトの pandas データ型は、最もメモリ効率の良いものではありません。これは、比較的少ない一意の値を持つテキストデータ列(一般的に「低カーディナリティ」データと呼ばれます)の場合に特に当てはまります。より効率的なデータ型を使用することで、メモリに格納できるデータセットのサイズを大きくすることができます。

In [11]: ts = make_timeseries(freq="30s", seed=0)

In [12]: ts.to_parquet("timeseries.parquet")

In [13]: ts = pd.read_parquet("timeseries.parquet")

In [14]: ts
Out[14]: 
                       id     name         x         y
timestamp                                             
2000-01-01 00:00:00  1041    Alice  0.889987  0.281011
2000-01-01 00:00:30   988      Bob -0.455299  0.488153
2000-01-01 00:01:00  1018    Alice  0.096061  0.580473
2000-01-01 00:01:30   992      Bob  0.142482  0.041665
2000-01-01 00:02:00   960      Bob -0.036235  0.802159
...                   ...      ...       ...       ...
2000-12-30 23:58:00  1022    Alice  0.266191  0.875579
2000-12-30 23:58:30   974    Alice -0.009826  0.413686
2000-12-30 23:59:00  1028  Charlie  0.307108 -0.656789
2000-12-30 23:59:30  1002    Alice  0.202602  0.541335
2000-12-31 00:00:00   987    Alice  0.200832  0.615972

[1051201 rows x 4 columns]

それでは、データ型とメモリ使用量を調べて、注意を払うべき場所を確認しましょう。

In [15]: ts.dtypes
Out[15]: 
id        int64
name     object
x       float64
y       float64
dtype: object
In [16]: ts.memory_usage(deep=True)  # memory usage in bytes
Out[16]: 
Index     8409608
id        8409608
name     65176434
x         8409608
y         8409608
dtype: int64

name 列は、他のどの列よりも多くのメモリを消費しています。一意の値がわずかしかないため、pandas.Categorical に変換するのに適しています。pandas.Categorical を使用すると、それぞれの一意の名前を一度だけ格納し、メモリ効率の良い整数を用いて、各行で使用されている特定の名前を識別します。

In [17]: ts2 = ts.copy()

In [18]: ts2["name"] = ts2["name"].astype("category")

In [19]: ts2.memory_usage(deep=True)
Out[19]: 
Index    8409608
id       8409608
name     1051495
x        8409608
y        8409608
dtype: int64

pandas.to_numeric() を使用して、数値列を最小の型にダウンキャストすることもできます。

In [20]: ts2["id"] = pd.to_numeric(ts2["id"], downcast="unsigned")

In [21]: ts2[["x", "y"]] = ts2[["x", "y"]].apply(pd.to_numeric, downcast="float")

In [22]: ts2.dtypes
Out[22]: 
id        uint16
name    category
x        float32
y        float32
dtype: object
In [23]: ts2.memory_usage(deep=True)
Out[23]: 
Index    8409608
id       2102402
name     1051495
x        4204804
y        4204804
dtype: int64
In [24]: reduction = ts2.memory_usage(deep=True).sum() / ts.memory_usage(deep=True).sum()

In [25]: print(f"{reduction:0.2f}")
0.20

全体として、このデータセットのメモリ内フットプリントを元のサイズの 1/5 に削減しました。

カテゴリカルデータpandas.Categoricalの詳細、およびdtypesでpandasのすべてのdtypesの概要を参照してください。

チャンクを使用する#

いくつかのワークロードは、大きな問題を小さな問題の集まりに分割することで、チャンク化によって達成できます。たとえば、個々の CSV ファイルを Parquet ファイルに変換し、ディレクトリの各ファイルに対してそれを繰り返します。各チャンクがメモリに収まる限り、メモリよりもはるかに大きいデータセットを処理できます。

注記

チャンク化は、実行する操作にチャンク間の調整がゼロまたは最小限で済む場合に効果を発揮します。より複雑なワークフローの場合、別のライブラリを使用する方が良いでしょう。

ディスク上のさらに大きな「論理データセット」が、Parquet ファイルのディレクトリであるとします。ディレクトリの各ファイルは、データセット全体の異なる年を表します。

In [26]: import pathlib

In [27]: N = 12

In [28]: starts = [f"20{i:>02d}-01-01" for i in range(N)]

In [29]: ends = [f"20{i:>02d}-12-13" for i in range(N)]

In [30]: pathlib.Path("data/timeseries").mkdir(exist_ok=True)

In [31]: for i, (start, end) in enumerate(zip(starts, ends)):
   ....:     ts = make_timeseries(start=start, end=end, freq="1min", seed=i)
   ....:     ts.to_parquet(f"data/timeseries/ts-{i:0>2d}.parquet")
   ....: 
data
└── timeseries
    ├── ts-00.parquet
    ├── ts-01.parquet
    ├── ts-02.parquet
    ├── ts-03.parquet
    ├── ts-04.parquet
    ├── ts-05.parquet
    ├── ts-06.parquet
    ├── ts-07.parquet
    ├── ts-08.parquet
    ├── ts-09.parquet
    ├── ts-10.parquet
    └── ts-11.parquet

ここで、アウトオブコアpandas.Series.value_counts()を実装します。このワークフローのピークメモリ使用量は、単一の最大のチャンクと、この時点までの固有値の数を格納する小さなシリーズです。個々のファイルがメモリに収まる限り、これは任意のサイズのデータセットで機能します。

In [32]: %%time
   ....: files = pathlib.Path("data/timeseries/").glob("ts*.parquet")
   ....: counts = pd.Series(dtype=int)
   ....: for path in files:
   ....:     df = pd.read_parquet(path)
   ....:     counts = counts.add(df["name"].value_counts(), fill_value=0)
   ....: counts.astype(int)
   ....: 
CPU times: user 744 ms, sys: 28.7 ms, total: 773 ms
Wall time: 551 ms
Out[32]: 
name
Alice      1994645
Bob        1993692
Charlie    1994875
dtype: int64

pandas.read_csv()などのいくつかのリーダーは、単一のファイルを読み取るときのchunksizeを制御するためのパラメータを提供します。

手動によるチャンク化は、あまり洗練された操作を必要としないワークフローには適切なオプションです。pandas.DataFrame.groupby()などの一部の操作は、チャンク単位で行うことがはるかに困難です。このような場合、これらのアウトオブコアアルゴリズムを自分で実装する別のライブラリに切り替える方が良いかもしれません。

Dask を使用する#

pandas は DataFrame API を提供するライブラリの1つに過ぎません。その人気のために、pandas の API は他のライブラリが実装する標準のようなものになっています。pandas のドキュメントでは、エコシステムページで DataFrame API を実装するライブラリのリストを維持しています。

たとえば、並列コンピューティングライブラリであるDaskには、メモリよりも大きいデータセットを並列処理するための pandas に似た API であるdask.dataframeがあります。Dask は、単一のマシン上の複数のスレッドまたはプロセス、またはマシンのクラスタを使用して、データを並列処理できます。

dask.dataframeをインポートすると、API は pandas と似ていることがわかります。read_parquet関数を使用できますが、読み込むファイルのglob文字列を提供します。

In [33]: import dask.dataframe as dd

In [34]: ddf = dd.read_parquet("data/timeseries/ts*.parquet", engine="pyarrow")

In [35]: ddf
Out[35]: 
Dask DataFrame Structure:
                   id    name        x        y
npartitions=12                                 
                int64  string  float64  float64
                  ...     ...      ...      ...
...               ...     ...      ...      ...
                  ...     ...      ...      ...
                  ...     ...      ...      ...
Dask Name: read-parquet, 1 graph layer

ddfオブジェクトを調べると、いくつかの点に気付きます。

  • .columns.dtypesのような、よく知られた属性があります。

  • .groupby.sumなど、よく知られたメソッドがあります。

  • .npartitions.divisionsのような、新しい属性があります。

パーティションとディビジョンは、Dask が計算を並列化する仕組みです。**Dask** DataFrame は、多くの pandas pandas.DataFrameで構成されています。Dask DataFrame で単一のメソッド呼び出しを行うと、多くの pandas メソッド呼び出しが行われることになり、Dask はすべてを調整して結果を取得する方法を知っています。

In [36]: ddf.columns
Out[36]: Index(['id', 'name', 'x', 'y'], dtype='object')

In [37]: ddf.dtypes
Out[37]: 
id                int64
name    string[pyarrow]
x               float64
y               float64
dtype: object

In [38]: ddf.npartitions
Out[38]: 12

主な違いの1つは、dask.dataframe API は遅延評価であることです。上記の repr を見ると、値が実際には出力されていないことに気付くでしょう。列名と dtypes だけです。これは、Dask がまだデータを読み込んでいないためです。すぐに実行するのではなく、操作を行うことで**タスクグラフ**が構築されます。

In [39]: ddf
Out[39]: 
Dask DataFrame Structure:
                   id    name        x        y
npartitions=12                                 
                int64  string  float64  float64
                  ...     ...      ...      ...
...               ...     ...      ...      ...
                  ...     ...      ...      ...
                  ...     ...      ...      ...
Dask Name: read-parquet, 1 graph layer

In [40]: ddf["name"]
Out[40]: 
Dask Series Structure:
npartitions=12
    string
       ...
     ...  
       ...
       ...
Name: name, dtype: string
Dask Name: getitem, 2 graph layers

In [41]: ddf["name"].value_counts()
Out[41]: 
Dask Series Structure:
npartitions=1
    int64[pyarrow]
               ...
Name: count, dtype: int64[pyarrow]
Dask Name: value-counts-agg, 4 graph layers

これらの呼び出しはそれぞれ瞬時に行われるため、結果はまだ計算されていません。誰かが結果を必要とするまで、行う計算のリストを構築しているだけです。Dask は、pandas.Series.value_countsの戻り値の型が、特定の dtype と特定の名前を持つ pandas pandas.Seriesであることを知っています。そのため、Dask のバージョンは同じ dtype と同じ名前を持つ Dask Series を返します。

実際の結果を取得するには、.compute()を呼び出すことができます。

In [42]: %time ddf["name"].value_counts().compute()
CPU times: user 538 ms, sys: 45.8 ms, total: 584 ms
Wall time: 165 ms
Out[42]: 
name
Charlie    1994875
Alice      1994645
Bob        1993692
Name: count, dtype: int64[pyarrow]

その時点で、pandas を使用した場合と同じものが返されます。この場合、各 name のカウントを含む具体的な pandas pandas.Seriesです。

.computeを呼び出すと、完全なタスクグラフが実行されます。これには、データの読み取り、列の選択、value_countsの実行が含まれます。実行は可能な限り並列で行われ、Dask は全体的なメモリフットプリントを小さくしようとします。各パーティション(通常の pandas pandas.DataFrame)がメモリに収まる限り、メモリよりもはるかに大きいデータセットを処理できます。

デフォルトでは、dask.dataframe操作はスレッドプールを使用して操作を並列処理します。多くのマシンで作業を分散するために、クラスタに接続することもできます。この場合、この単一のマシン上のいくつかのプロセスで構成されるローカル「クラスタ」に接続します。

>>> from dask.distributed import Client, LocalCluster

>>> cluster = LocalCluster()
>>> client = Client(cluster)
>>> client
<Client: 'tcp://127.0.0.1:53349' processes=4 threads=8, memory=17.18 GB>

このclientが作成されると、Daskの計算はすべてクラスタ上で行われます(この場合は単なるプロセスです)。

Daskはpandas APIで最も使用される部分を実装しています。例えば、おなじみのgroupby集計を行うことができます。

In [43]: %time ddf.groupby("name")[["x", "y"]].mean().compute().head()
CPU times: user 1.04 s, sys: 66.7 ms, total: 1.1 s
Wall time: 319 ms
Out[43]: 
                x         y
name                       
Alice   -0.000224 -0.000194
Bob     -0.000746  0.000349
Charlie  0.000604  0.000250

グループ化と集計は、アウトオブコアで並列に行われます。

Daskはデータセットのdivisionsを認識すると、特定の最適化が可能です。daskによって書き込まれたparquetデータセットを読み込む場合、divisionsは自動的に認識されます。この場合、parquetファイルを手動で作成したため、divisionsを手動で指定する必要があります。

In [44]: N = 12

In [45]: starts = [f"20{i:>02d}-01-01" for i in range(N)]

In [46]: ends = [f"20{i:>02d}-12-13" for i in range(N)]

In [47]: divisions = tuple(pd.to_datetime(starts)) + (pd.Timestamp(ends[-1]),)

In [48]: ddf.divisions = divisions

In [49]: ddf
Out[49]: 
Dask DataFrame Structure:
                   id    name        x        y
npartitions=12                                 
2000-01-01      int64  string  float64  float64
2001-01-01        ...     ...      ...      ...
...               ...     ...      ...      ...
2011-01-01        ...     ...      ...      ...
2011-12-13        ...     ...      ...      ...
Dask Name: read-parquet, 1 graph layer

これで、.locを使った高速なランダムアクセスなどが可能になります。

In [50]: ddf.loc["2002-01-01 12:01":"2002-01-01 12:05"].compute()
Out[50]: 
                       id     name         x         y
timestamp                                             
2002-01-01 12:01:00   971      Bob -0.659481  0.556184
2002-01-01 12:02:00  1015  Charlie  0.120131 -0.609522
2002-01-01 12:03:00   991      Bob -0.357816  0.811362
2002-01-01 12:04:00   984    Alice -0.608760  0.034187
2002-01-01 12:05:00   998  Charlie  0.551662 -0.461972

Daskは、2002年の値を選択するために3番目のパーティションだけを見れば良いことを認識しています。他のデータを見る必要はありません。

多くのワークフローでは大量のデータが関与し、メモリに収まるサイズに縮小するような方法で処理されます。この場合、日次頻度でリサンプリングし、平均を取ります。平均を取ると、結果がメモリに収まることがわかっているので、メモリ不足になることなくcomputeを安全に呼び出すことができます。その時点では、通常のpandasオブジェクトになります。

In [51]: ddf[["x", "y"]].resample("1D").mean().cumsum().compute().plot()
Out[51]: <Axes: xlabel='timestamp'>
../_images/dask_resample.png

これらのDaskの例はすべて、単一マシン上の複数のプロセスを使用して行われています。Daskはクラスタ上に展開して、さらに大規模なデータセットにも対応できます。

さらに多くのdaskの例はhttps://examples.dask.orgをご覧ください。