Scaling to large datasets
pandas provides data structures for in-memory analytics, which makes using pandas to analyze datasets that are larger than memory somewhat tricky. Even datasets that are a sizable fraction of memory become unwieldy, as some pandas operations need to make intermediate copies.
This document provides a few recommendations for scaling your analysis to larger datasets. It’s a complement to Enhancing performance, which focuses on speeding up analysis for datasets that fit in memory.
Load less data
Suppose our raw dataset on disk has many columns.
In [1]: import pandas as pd
In [2]: import numpy as np
In [3]: def make_timeseries(start="2000-01-01", end="2000-12-31", freq="1D", seed=None):
...:     index = pd.date_range(start=start, end=end, freq=freq, name="timestamp")
...:     n = len(index)
...:     state = np.random.RandomState(seed)
...:     columns = {
...:         "name": state.choice(["Alice", "Bob", "Charlie"], size=n),
...:         "id": state.poisson(1000, size=n),
...:         "x": state.rand(n) * 2 - 1,
...:         "y": state.rand(n) * 2 - 1,
...:     }
...:     df = pd.DataFrame(columns, index=index, columns=sorted(columns))
...:     if df.index[-1] == end:
...:         df = df.iloc[:-1]
...:     return df
...:
In [4]: timeseries = [
...:     make_timeseries(freq="1min", seed=i).rename(columns=lambda x: f"{x}_{i}")
...:     for i in range(10)
...: ]
...:
In [5]: ts_wide = pd.concat(timeseries, axis=1)
In [6]: ts_wide.head()
Out[6]:
id_0 name_0 x_0 ... name_9 x_9 y_9
timestamp ...
2000-01-01 00:00:00 977 Alice -0.821225 ... Charlie -0.957208 -0.757508
2000-01-01 00:01:00 1018 Bob -0.219182 ... Alice -0.414445 -0.100298
2000-01-01 00:02:00 927 Alice 0.660908 ... Charlie -0.325838 0.581859
2000-01-01 00:03:00 997 Bob -0.852458 ... Bob 0.992033 -0.686692
2000-01-01 00:04:00 965 Bob 0.717283 ... Charlie -0.924556 -0.184161
[5 rows x 40 columns]
In [7]: ts_wide.to_parquet("timeseries_wide.parquet")
To load the columns we want, we have two options. Option 1 loads in all the data and then filters to what we need.
In [8]: columns = ["id_0", "name_0", "x_0", "y_0"]
In [9]: pd.read_parquet("timeseries_wide.parquet")[columns]
Option 2 only loads the columns we request.
In [10]: pd.read_parquet("timeseries_wide.parquet", columns=columns)
If we were to measure the memory usage of the two calls, we'd see that specifying columns uses about 1/10th the memory in this case.
With pandas.read_csv(), you can specify usecols to limit the columns read into memory. Not all file formats that can be read by pandas provide an option to read a subset of columns.
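As a small, self-contained illustration of the two options, the sketch below uses pandas.read_csv() with usecols instead of Parquet, so it runs without a Parquet engine. The CSV is generated in memory with io.StringIO, and the data and column layout (id_i, name_i, x_i, y_i) are made up to mirror the example above. Exact byte counts will vary with your pandas version; only the relative sizes matter.

```python
import io

import numpy as np
import pandas as pd

# Hypothetical wide table: 10 groups of the columns id_i, name_i, x_i, y_i.
rng = np.random.default_rng(0)
n = 1_000
data = {}
for i in range(10):
    data[f"id_{i}"] = rng.poisson(1000, size=n)
    data[f"name_{i}"] = rng.choice(["Alice", "Bob", "Charlie"], size=n)
    data[f"x_{i}"] = rng.random(n) * 2 - 1
    data[f"y_{i}"] = rng.random(n) * 2 - 1
csv_text = pd.DataFrame(data).to_csv(index=False)

columns = ["id_0", "name_0", "x_0", "y_0"]

# Option 1: parse every column, then keep the four we need.
full = pd.read_csv(io.StringIO(csv_text))
filtered = full[columns]

# Option 2: ask the parser to materialize only the requested columns.
subset = pd.read_csv(io.StringIO(csv_text), usecols=columns)

full_bytes = full.memory_usage(deep=True).sum()
subset_bytes = subset.memory_usage(deep=True).sum()
print(f"full read: {full_bytes:,} bytes, usecols read: {subset_bytes:,} bytes")
```

Both options end with the same four columns; the difference is that Option 1 briefly holds all 40 parsed columns in memory.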
Use efficient datatypes
The default pandas data types are not the most memory efficient. This is especially true for text data columns with relatively few unique values (commonly referred to as “low-cardinality” data). By using more efficient data types, you can store larger datasets in memory.
In [11]: ts = make_timeseries(freq="30s", seed=0)
In [12]: ts.to_parquet("timeseries.parquet")
In [13]: ts = pd.read_parquet("timeseries.parquet")
In [14]: ts
Out[14]:
id name x y
timestamp
2000-01-01 00:00:00 1041 Alice 0.889987 0.281011
2000-01-01 00:00:30 988 Bob -0.455299 0.488153
2000-01-01 00:01:00 1018 Alice 0.096061 0.580473
2000-01-01 00:01:30 992 Bob 0.142482 0.041665
2000-01-01 00:02:00 960 Bob -0.036235 0.802159
... ... ... ... ...
2000-12-30 23:58:00 1022 Alice 0.266191 0.875579
2000-12-30 23:58:30 974 Alice -0.009826 0.413686
2000-12-30 23:59:00 1028 Charlie 0.307108 -0.656789
2000-12-30 23:59:30 1002 Alice 0.202602 0.541335
2000-12-31 00:00:00 987 Alice 0.200832 0.615972
[1051201 rows x 4 columns]
Now, let’s inspect the data types and memory usage to see where we should focus our attention.
In [15]: ts.dtypes
Out[15]:
id int64
name object
x float64
y float64
dtype: object
In [16]: ts.memory_usage(deep=True) # memory usage in bytes
Out[16]:
Index 8409608
id 8409608
name 65176434
x 8409608
y 8409608
dtype: int64
The name column is taking up much more memory than any other. It has just a few unique values, so it's a good candidate for converting to a pandas.Categorical. With a pandas.Categorical, we store each unique name once and use space-efficient integers to know which specific name is used in each row.
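To make the "store each name once, plus integer codes" idea concrete, here is a small sketch on a made-up six-row column. It inspects the categories and codes through the .cat accessor and decodes the codes back to the original labels.

```python
import pandas as pd

# A small low-cardinality column, like the "name" column above (hypothetical values).
names = pd.Series(["Alice", "Bob", "Alice", "Charlie", "Bob", "Alice"])
cat = names.astype("category")

# Each unique label is stored once...
print(list(cat.cat.categories))  # ['Alice', 'Bob', 'Charlie']
# ...and every row holds only a small integer code pointing into that list.
print(cat.cat.codes.tolist())  # [0, 1, 0, 2, 1, 0]
print(cat.cat.codes.dtype)  # int8

# Looking the codes up in the categories recovers the original values.
decoded = cat.cat.categories.take(cat.cat.codes.to_numpy())
print(list(decoded))
```

With only three categories, pandas can use one-byte int8 codes, which is why the categorical name column above shrinks to roughly one byte per row.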
In [17]: ts2 = ts.copy()
In [18]: ts2["name"] = ts2["name"].astype("category")
In [19]: ts2.memory_usage(deep=True)
Out[19]:
Index 8409608
id 8409608
name 1051495
x 8409608
y 8409608
dtype: int64
We can go a bit further and downcast the numeric columns to their smallest types using pandas.to_numeric().
In [20]: ts2["id"] = pd.to_numeric(ts2["id"], downcast="unsigned")
In [21]: ts2[["x", "y"]] = ts2[["x", "y"]].apply(pd.to_numeric, downcast="float")
In [22]: ts2.dtypes
Out[22]:
id uint16
name category
x float32
y float32
dtype: object
In [23]: ts2.memory_usage(deep=True)
Out[23]:
Index 8409608
id 2102402
name 1051495
x 4204804
y 4204804
dtype: int64
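The downcast parameter picks the smallest dtype that can hold every value, which has two consequences worth checking on your own data: integer columns only shrink as far as their range allows, and float32 keeps roughly 7 significant digits instead of float64's roughly 16. A short sketch with made-up values:

```python
import numpy as np
import pandas as pd

# downcast="unsigned" picks the smallest unsigned integer dtype that holds every value.
small = pd.to_numeric(pd.Series([1, 1000, 65535]), downcast="unsigned")
larger = pd.to_numeric(pd.Series([1, 1000, 65536]), downcast="unsigned")
print(small.dtype, larger.dtype)  # uint16 uint32

# Negative values cannot be stored unsigned, so the column keeps its original dtype.
signed = pd.to_numeric(pd.Series([-1, 1000]), downcast="unsigned")
print(signed.dtype)  # int64

# downcast="float" goes to float32, which cannot represent 0.1 as closely as float64.
x32 = pd.to_numeric(pd.Series([0.1, -0.5]), downcast="float")
print(x32.dtype)  # float32
print(float(x32.iloc[0]))  # 0.10000000149011612
```

For values in [-1, 1] like x and y above, this loss of precision is usually acceptable, but it is a trade-off rather than a free saving.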
In [24]: reduction = ts2.memory_usage(deep=True).sum() / ts.memory_usage(deep=True).sum()
In [25]: print(f"{reduction:0.2f}")
0.20
In all, we’ve reduced the in-memory footprint of this dataset to 1/5 of its original size.
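Using the byte counts from Out[16] and Out[23] (in the order Index, id, name, x, y), the reduction works out as follows. The second ratio shows that the unchanged DatetimeIndex now accounts for roughly 42% of the remaining footprint.

```latex
\frac{8409608 + 2102402 + 1051495 + 4204804 + 4204804}
     {8409608 + 8409608 + 65176434 + 8409608 + 8409608}
  = \frac{19973113}{98814866} \approx 0.20,
\qquad
\frac{8409608}{19973113} \approx 0.42
```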
See Categorical data for more on pandas.Categorical and dtypes for an overview of all of pandas' dtypes.
Use chunking
Some workloads can be handled with chunking: splitting a large problem into many small problems. For example, you might convert each CSV file in a directory into a Parquet file, one file at a time. As long as each chunk fits in memory, you can work with datasets that are much larger than memory.
Note
Chunking works well when the operation you’re performing requires zero or minimal coordination between chunks. For more complicated workflows, you’re better off using other libraries.
Suppose we have an even larger “logical dataset” on disk that's a directory of Parquet files. Each file in the directory represents a different year of the entire dataset.
In [26]: import pathlib
In [27]: N = 12
In [28]: starts = [f"20{i:>02d}-01-01" for i in range(N)]
In [29]: ends = [f"20{i:>02d}-12-31" for i in range(N)]
In [30]: pathlib.Path("data/timeseries").mkdir(parents=True, exist_ok=True)
In [31]: for i, (start, end) in enumerate(zip(starts, ends)):
....:     ts = make_timeseries(start=start, end=end, freq="1min", seed=i)
....:     ts.to_parquet(f"data/timeseries/ts-{i:0>2d}.parquet")
....:
data
└── timeseries
├── ts-00.parquet
├── ts-01.parquet
├── ts-02.parquet
├── ts-03.parquet
├── ts-04.parquet
├── ts-05.parquet
├── ts-06.parquet
├── ts-07.parquet
├── ts-08.parquet
├── ts-09.parquet
├── ts-10.parquet
└── ts-11.parquet
Now we'll implement an out-of-core pandas.Series.value_counts(). The peak memory usage of this workflow is the single largest chunk, plus a small series storing the unique value counts up to this point. As long as each individual file fits in memory, this will work for arbitrary-sized datasets.
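The reason this works is that counts are additive: summing per-chunk value counts gives the same totals as counting everything at once. The sketch below checks that with three hypothetical in-memory chunks standing in for the Parquet files, so it needs no files on disk.

```python
import numpy as np
import pandas as pd

# Hypothetical stand-ins for the per-file DataFrames: three in-memory chunks.
rng = np.random.default_rng(42)
chunks = [
    pd.DataFrame({"name": rng.choice(["Alice", "Bob", "Charlie"], size=size)})
    for size in (500, 300, 200)
]

# Running total: only the current chunk plus this small Series are held at once.
counts = pd.Series(dtype="int64")
for chunk in chunks:
    counts = counts.add(chunk["name"].value_counts(), fill_value=0)
counts = counts.astype("int64")

# Counting the concatenated data in one pass gives the same totals.
expected = pd.concat(chunks)["name"].value_counts()
print(counts.sort_index())
```

The fill_value=0 argument matters: without it, a name missing from one chunk would turn its running total into NaN.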
In [32]: %%time
....: files = pathlib.Path("data/timeseries/").glob("ts*.parquet")
....: counts = pd.Series(dtype=int)
....: for path in files:
....:     df = pd.read_parquet(path)
....:     counts = counts.add(df["name"].value_counts(), fill_value=0)
....: counts.astype(int)
....:
Some readers, like pandas.read_csv(), offer parameters to control the chunksize when reading a single file.
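For example, passing chunksize to pandas.read_csv() returns an iterator of DataFrames instead of a single DataFrame. The sketch below applies the same running value_counts pattern to one (made-up, in-memory) CSV file read five rows at a time.

```python
import io

import pandas as pd

# Hypothetical CSV text standing in for a single large file on disk (12 data rows).
csv_text = "name,x\n" + "\n".join(
    f"{name},{i}" for i, name in enumerate(["Alice", "Bob", "Charlie"] * 4)
)

counts = pd.Series(dtype="int64")
n_chunks = 0
# With chunksize, read_csv yields DataFrames of at most 5 rows each.
with pd.read_csv(io.StringIO(csv_text), chunksize=5) as reader:
    for chunk in reader:
        n_chunks += 1
        counts = counts.add(chunk["name"].value_counts(), fill_value=0)
counts = counts.astype("int64")
print(n_chunks)  # 3 chunks: 5 + 5 + 2 rows
print(counts.sort_index())
```

Using the reader as a context manager ensures the underlying file handle is closed even if the loop stops early.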
Manual chunking is a reasonable option for workflows that don't require sophisticated operations. Some operations, like pandas.DataFrame.groupby(), are much harder to do chunkwise. In these cases, you may be better off switching to a different library that implements these out-of-core algorithms for you.
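Some groupby aggregations can still be done chunkwise if you carry the right partial results. A mean, for instance, decomposes into per-group sums and counts that combine by addition; a median does not decompose this way, which is where out-of-core libraries earn their keep. A minimal sketch with hypothetical in-memory chunks:

```python
import numpy as np
import pandas as pd

# Hypothetical chunks with the same schema as the per-year files.
rng = np.random.default_rng(7)
chunks = [
    pd.DataFrame(
        {
            "name": rng.choice(["Alice", "Bob", "Charlie"], size=size),
            "x": rng.random(size),
        }
    )
    for size in (400, 350, 250)
]

# Accumulate per-group sums and counts; both combine across chunks by addition.
totals = None
for chunk in chunks:
    part = chunk.groupby("name")["x"].agg(["sum", "count"])
    totals = part if totals is None else totals.add(part, fill_value=0)
chunked_mean = totals["sum"] / totals["count"]

# Reference result computed on all the data at once.
full_mean = pd.concat(chunks).groupby("name")["x"].mean()
print(chunked_mean)
```

This only works because sum and count are decomposable; each new aggregation needs its own combine rule, which is the coordination burden the note above warns about.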
Use other libraries
Other libraries provide APIs similar to pandas, work well with pandas DataFrames, and can scale large-dataset processing and analytics through parallel runtimes, distributed memory, clustering, and so on. You can find more information on the ecosystem page.