dask.dataframe.Series.map_overlap
- Series.map_overlap(func, before, after, *args, **kwargs)
Apply a function to each partition, sharing rows with adjacent partitions.
This can be useful for implementing windowing functions such as
df.rolling(...).mean()ordf.diff().- Parameters
- funcfunction
Function applied to each partition.
- beforeint
The number of rows to prepend to partition
ifrom the end of partitioni - 1.- afterint
The number of rows to append to partition
ifrom the beginning of partitioni + 1.- args, kwargs
Arguments and keywords to pass to the function. The partition will be the first argument, and these will be passed after.
- metapd.DataFrame, pd.Series, dict, iterable, tuple, optional
An empty
pd.DataFrameorpd.Seriesthat matches the dtypes and column names of the output. This metadata is necessary for many algorithms in dask dataframe to work. For ease of use, some alternative inputs are also available. Instead of aDataFrame, adictof{name: dtype}or iterable of(name, dtype)can be provided (note that the order of the names should match the order of the columns). Instead of a series, a tuple of(name, dtype)can be used. If not provided, dask will try to infer the metadata. This may lead to unexpected results, so providingmetais recommended. For more information, seedask.dataframe.utils.make_meta.
Notes
Given positive integers
beforeandafter, and a functionfunc,map_overlapdoes the following:Prepend
beforerows to each partitionifrom the end of partitioni - 1. The first partition has no rows prepended.Append
afterrows to each partitionifrom the beginning of partitioni + 1. The last partition has no rows appended.Apply
functo each partition, passing in any extraargsandkwargsif provided.Trim
beforerows from the beginning of all but the first partition.Trim
afterrows from the end of all but the last partition.
Note that the index and divisions are assumed to remain unchanged.
Examples
Given a DataFrame, Series, or Index, such as:
>>> import pandas as pd >>> import dask.dataframe as dd >>> df = pd.DataFrame({'x': [1, 2, 4, 7, 11], ... 'y': [1., 2., 3., 4., 5.]}) >>> ddf = dd.from_pandas(df, npartitions=2)
A rolling sum with a trailing moving window of size 2 can be computed by overlapping 2 rows before each partition, and then mapping calls to
df.rolling(2).sum():>>> ddf.compute() x y 0 1 1.0 1 2 2.0 2 4 3.0 3 7 4.0 4 11 5.0 >>> ddf.map_overlap(lambda df: df.rolling(2).sum(), 2, 0).compute() x y 0 NaN NaN 1 3.0 3.0 2 6.0 5.0 3 11.0 7.0 4 18.0 9.0
The pandas
diffmethod computes a discrete difference shifted by a number of periods (can be positive or negative). This can be implemented by mapping calls todf.diffto each partition after prepending/appending that many rows, depending on sign:>>> def diff(df, periods=1): ... before, after = (periods, 0) if periods > 0 else (0, -periods) ... return df.map_overlap(lambda df, periods=1: df.diff(periods), ... periods, 0, periods=periods) >>> diff(ddf, 1).compute() x y 0 NaN NaN 1 1.0 1.0 2 2.0 1.0 3 3.0 1.0 4 4.0 1.0
If you have a
DatetimeIndex, you can use apd.Timedeltafor time- based windows.>>> ts = pd.Series(range(10), index=pd.date_range('2017', periods=10)) >>> dts = dd.from_pandas(ts, npartitions=2) >>> dts.map_overlap(lambda df: df.rolling('2D').sum(), ... pd.Timedelta('2D'), 0).compute() 2017-01-01 0.0 2017-01-02 1.0 2017-01-03 3.0 2017-01-04 5.0 2017-01-05 7.0 2017-01-06 9.0 2017-01-07 11.0 2017-01-08 13.0 2017-01-09 15.0 2017-01-10 17.0 Freq: D, dtype: float64