
TimeSeriesDataSet

deepts_forecasting.utils.data.dataset.TimeSeriesDataSet (Dataset)

Source code in deepts_forecasting\utils\data\dataset.py
class TimeSeriesDataSet(Dataset):
    def __init__(
        self,
        data: pd.DataFrame,
        time_idx: Union[str, List[str]],
        target: Union[str, List[str]],
        group_ids: Union[str, List[str]],
        max_encoder_length: int = 30,
        min_encoder_length: int = None,
        min_prediction_idx: int = None,
        max_prediction_length: int = 1,
        min_prediction_length: int = None,
        static_categoricals: Union[str, List[str]] = [],
        static_reals: Union[str, List[str]] = [],
        time_varying_known_categoricals: Union[str, List[str]] = [],
        time_varying_known_reals: Union[str, List[str]] = [],
        time_varying_unknown_categoricals: Union[str, List[str]] = [],
        time_varying_unknown_reals: Union[str, List[str]] = [],
        variable_groups: Dict[str, List[int]] = {},
        lags: Dict[str, List[int]] = {},
        categorical_encoders: Dict[str, BaseEstimator] = {},
        scalers: Dict[str, BaseEstimator] = {},
        target_normalizer: Union[
            NORMALIZER, str, List[NORMALIZER], Tuple[NORMALIZER]
        ] = "auto",
        randomize_length: Union[None, Tuple[float, float], bool] = False,
        predict_mode: bool = False,
    ) -> None:
        """

        Args:
            data (pd.DataFrame): dataframe with sequence data - each row can be identified with
                ``time_idx`` and the ``group_ids``
            time_idx (str): integer column denoting the time index. This column is used to determine
                the sequence of samples.
            target (Union[str, List[str]]): column denoting the target.
            group_ids (Union[str, List[str]]): list of column names identifying a time series. This means that the ``group_ids``
                identify a sample together with the ``time_idx``. If you have only one time series, set this to the
                name of a column that is constant.
            max_encoder_length (int): maximum length to encode.
                This is the maximum history length used by the time series dataset.
            min_encoder_length (int): minimum allowed length to encode. Defaults to max_encoder_length.
            min_prediction_idx (int): minimum ``time_idx`` from where to start predictions. This parameter
                can be useful to create a validation or test set.
            max_prediction_length (int): maximum prediction/decoder length.
            min_prediction_length (int): minimum prediction/decoder length. Defaults to max_prediction_length.
            static_categoricals (Union[str, List[str]]): list of categorical variables that do not change over time.
            static_reals (Union[str, List[str]]): list of continuous variables that do not change over time.
            time_varying_known_categoricals (Union[str, List[str]]): list of categorical variables that change over
                time and are known in the future; entries can also be lists, which are then encoded together
                (e.g. useful for special days or promotion categories).
            time_varying_known_reals (Union[str, List[str]]): list of continuous variables that change over
                time and are known in the future (e.g. price of a product, but not demand of a product).
            time_varying_unknown_categoricals (Union[str, List[str]]): list of categorical variables that change over
                time and are not known in the future; entries can also be lists, which are then encoded together
                (e.g. useful for weather categories). You might want to include your target here.
            time_varying_unknown_reals (Union[str, List[str]]): list of continuous variables that change over
                time and are not known in the future.  You might want to include your target here.
            categorical_encoders (Dict[str, NaNLabelEncoder]): dictionary of scikit-learn label transformers.
            scalers (Dict[str, Union[StandardScaler, RobustScaler, TorchNormalizer, EncoderNormalizer]]): dictionary of
                scikit-learn scalers.
            variable_groups (Dict[str, List[str]]): dictionary mapping a name to a list of categorical columns in the
                data that are encoded together as one variable (e.g. useful for one-hot encoded special days).
            target_normalizer (Union[NORMALIZER, str, List[NORMALIZER], Tuple[NORMALIZER]]): transformer to normalize or
                encode the target. The default ``"auto"`` picks a normalizer based on the target's dtype and distribution.
            predict_mode (bool): if True, the dataset returns only one sample per time series, namely the last
                possible sequence of each series.
        """
        super().__init__()
        self.time_idx = time_idx
        self.target = to_list(target)
        self.group_ids = to_list(group_ids)
        self.max_encoder_length = max_encoder_length
        if min_encoder_length is None:
            min_encoder_length = max_encoder_length
        self.min_encoder_length = min_encoder_length
        if min_prediction_idx is None:
            min_prediction_idx = data[self.time_idx].min()
        self.min_prediction_idx = min_prediction_idx
        self.max_prediction_length = max_prediction_length
        if min_prediction_length is None:
            min_prediction_length = max_prediction_length
        self.min_prediction_length = min_prediction_length
        assert (
            self.min_prediction_length <= self.max_prediction_length
        ), "max prediction length has to be larger equals min prediction length"
        self.static_categoricals = to_list(static_categoricals)
        self.static_reals = to_list(static_reals)
        self.time_varying_known_categoricals = to_list(time_varying_known_categoricals)
        for tar in self.target:
            assert (
                tar not in self.time_varying_known_categoricals
            ), f"target {target} should be an unknown categorical variable in the future"
        self.time_varying_known_reals = to_list(time_varying_known_reals)
        for tar in self.target:
            assert (
                tar not in self.time_varying_known_reals
            ), f"target {target} should be an unknown continuous variable in the future"
        self.time_varying_unknown_categoricals = to_list(
            time_varying_unknown_categoricals
        )
        self.time_varying_unknown_reals = to_list(time_varying_unknown_reals)
        self.variable_groups = {} if len(variable_groups) == 0 else variable_groups
        self.lags = {} if len(lags) == 0 else lags

        # set automatic defaults
        # if isinstance(randomize_length, bool):
        #     if not randomize_length:
        #         randomize_length = None
        #     else:
        #         randomize_length = (0.2, 0.05)
        self.randomize_length = randomize_length
        self.predict_mode = predict_mode
        # initialize encoders for categoricals and scalers for reals
        self.categorical_encoders = (
            {} if len(categorical_encoders) == 0 else categorical_encoders
        )
        self.scalers = {} if len(scalers) == 0 else scalers
        self.target_normalizer = target_normalizer
        # check that the data meets requirements and raise errors otherwise
        self._warning(data)

        # filter data
        if min_prediction_idx is not None:
            data = data[
                lambda x: x[self.time_idx]
                >= self.min_prediction_idx - self.max_encoder_length
            ]
        data = self._sort_data(data, self.group_ids + [self.time_idx])

        # target normalizer
        self._set_target_normalizer(data)
        # preprocessing
        data, scales = self._preprocess_data(data)

        # create index
        self.index = self._construct_index(data, predict_mode=predict_mode)
        # convert to torch tensors for high-performance data loading later
        self.data = self._data_to_tensors(data)

    @property
    def _group_ids_mapping(self):
        """
        Mapping of group id names to the group ids used to identify series in the dataset -
        group ids can also be used for the target normalizer.
        The former can change from training to validation and test datasets while the latter must not.
        """
        return {name: f"__group_id__{name}" for name in self.group_ids}

    @property
    def _group_ids(self):
        """
        Group ids used to identify series in dataset.
        """
        return list(self._group_ids_mapping.values())

    def save(self, fname: str) -> None:
        """
        Save dataset to disk

        Args:
            fname (str): filename to save to
        """
        torch.save(self, fname)

    @classmethod
    def load(cls, fname: str):
        """
        Load dataset from disk

        Args:
            fname (str): filename to load from

        Returns:
            TimeSeriesDataSet
        """
        obj = torch.load(fname)
        assert isinstance(obj, cls), f"Loaded file is not of class {cls}"
        return obj

    @property
    def categoricals(self) -> List[str]:
        """
        Categorical variables as used for modelling. Excluding categorical target if classification dataset.

        Returns:
            List[str]: list of variables
        """
        return (
            self.static_categoricals
            + self.time_varying_known_categoricals
            + self.time_varying_unknown_categoricals
        )

    @property
    def flat_categoricals(self) -> List[str]:
        """
        Categorical variables as defined in input data.

        Returns:
            List[str]: list of variables
        """
        categories = []
        for name in self.categoricals:
            if name in self.variable_groups:
                categories.extend(self.variable_groups[name])
            else:
                categories.append(name)
        return categories

    @property
    def variable_to_group_mapping(self) -> Dict[str, str]:
        """
        Mapping from categorical variables to variables in input data.

        Returns:
            Dict[str, str]: dictionary mapping from :py:meth:`~categoricals` to :py:meth:`~flat_categoricals`.
        """
        groups = {}
        for group_name, sublist in self.variable_groups.items():
            groups.update({name: group_name for name in sublist})
        return groups

    @property
    def reals(self) -> List[str]:
        """
        Continuous variables as used for modelling. Excluding continuous target if regression dataset.

        Returns:
            List[str]: list of variables
        """
        return (
            self.static_reals
            + self.time_varying_known_reals
            + self.time_varying_unknown_reals
        )

    @property
    def time_index(self) -> List[str]:
        return self.time_idx

    @property
    def target_name(self) -> List[str]:
        return self.target

    def _set_target_normalizer(self, data: pd.DataFrame):
        """
        Determine target normalizer.

        Args:
            data (pd.DataFrame): input data
        """
        if isinstance(self.target_normalizer, str) and self.target_normalizer == "auto":
            normalizers = []
            for target in self.target:
                if data[target].dtype.kind != "f":  # category
                    normalizers.append(NaNLabelEncoder())
                    # if self.add_target_scales:
                    #     warnings.warn("Target scales will be only added for continous targets", UserWarning)
                else:
                    data_positive = (data[target] > 0).all()
                    if data_positive:
                        if data[target].skew() > 2.5:
                            transformer = "log"
                        else:
                            transformer = "relu"
                    else:
                        transformer = None
                    if self.max_encoder_length > 20 and self.min_encoder_length > 1:
                        normalizers.append(
                            EncoderNormalizer(transformation=transformer)
                        )
                    else:
                        normalizers.append(GroupNormalizer(transformation=transformer))
            if self.multi_target:
                self.target_normalizer = MultiNormalizer(normalizers)
            else:
                self.target_normalizer = normalizers[0]
        elif isinstance(self.target_normalizer, (tuple, list)):
            self.target_normalizer = MultiNormalizer(self.target_normalizer)
        elif self.target_normalizer is None:
            self.target_normalizer = TorchNormalizer(method="identity")
        assert self.min_encoder_length > 1 or not isinstance(
            self.target_normalizer, EncoderNormalizer
        ), "EncoderNormalizer is only allowed if min_encoder_length > 1"
        assert isinstance(
            self.target_normalizer, (TorchNormalizer, NaNLabelEncoder)
        ), f"target_normalizer has to be either None or of class TorchNormalizer but found {self.target_normalizer}"
        assert not self.multi_target or isinstance(
            self.target_normalizer, MultiNormalizer
        ), (
            "multiple targets / list of targets requires MultiNormalizer as target_normalizer "
            f"but found {self.target_normalizer}"
        )

    @property
    def multi_target(self) -> bool:
        """
        If dataset encodes one or multiple targets.

        Returns:
            bool: true if multiple targets
        """
        return len(self.target) > 1

    def get_parameters(self) -> Dict[str, Any]:
        """
        Get parameters that can be used with :py:meth:`~from_parameters` to create a new dataset with the same scalers.

        Returns:
            Dict[str, Any]: dictionary of parameters
        """
        kwargs = {
            name: getattr(self, name)
            for name in inspect.signature(self.__class__.__init__).parameters.keys()
            if name not in ["data", "self"]
        }
        kwargs["categorical_encoders"] = self.categorical_encoders
        kwargs["scalers"] = self.scalers
        return kwargs

    @classmethod
    def from_dataset(
        cls,
        dataset,
        data: pd.DataFrame,
        stop_randomization: bool = False,
        predict: bool = False,
        **update_kwargs,
    ):
        """
        Generate dataset with different underlying data but same variable encoders and scalers, etc.

        Calls :py:meth:`~from_parameters` under the hood.

        Args:
            dataset (TimeSeriesDataSet): dataset from which to copy parameters
            data (pd.DataFrame): data from which new dataset will be generated
            stop_randomization (bool, optional): If to stop randomizing encoder and decoder lengths,
                e.g. useful for validation set. Defaults to False.
            predict (bool, optional): If to predict the decoder length on the last entries in the
                time index (i.e. one prediction per group only). Defaults to False.
            **update_kwargs: keyword arguments overriding parameters in the original dataset

        Returns:
            TimeSeriesDataSet: new dataset
        """
        return cls.from_parameters(
            dataset.get_parameters(),
            data,
            stop_randomization=stop_randomization,
            predict=predict,
            **update_kwargs,
        )

    @classmethod
    def from_parameters(
        cls,
        parameters: Dict[str, Any],
        data: pd.DataFrame,
        stop_randomization: bool = None,
        predict: bool = False,
        **update_kwargs,
    ):
        """
        Generate dataset with different underlying data but same variable encoders and scalers, etc.

        Args:
            parameters (Dict[str, Any]): dataset parameters which to use for the new dataset
            data (pd.DataFrame): data from which new dataset will be generated
            stop_randomization (bool, optional): If to stop randomizing encoder and decoder lengths,
                e.g. useful for validation set. Defaults to None, which is resolved to True if ``predict=True``
                and to False otherwise.
            predict (bool, optional): If to predict the decoder length on the last entries in the
                time index (i.e. one prediction per group only). Defaults to False.
            **update_kwargs: keyword arguments overriding parameters

        Returns:
            TimeSeriesDataSet: new dataset
        """
        parameters = deepcopy(parameters)
        if predict:
            if stop_randomization is None:
                stop_randomization = True
            elif not stop_randomization:
                warnings.warn(
                    "If predicting, no randomization should be possible - setting stop_randomization=True",
                    UserWarning,
                )
                stop_randomization = True
            parameters["min_prediction_length"] = parameters["max_prediction_length"]
            parameters["predict_mode"] = True
        elif stop_randomization is None:
            stop_randomization = False

        if stop_randomization:
            parameters["randomize_length"] = None
        parameters.update(update_kwargs)

        new = cls(data, **parameters)
        return new

    def _warning(self, data: pd.DataFrame) -> None:
        """
        Check if the dataset meets the requirements of the TimeSeriesDataSet class. If not, raise
        an error.

        e.g. Multiple rows with the same group id and time index exist.

        """
        # check data type

        assert (
            data[self.time_idx].dtypes.kind == "i"
        ), "Timeseries index should be of type integer"

        # if multiple rows with the same group ids and time index exist
        if sum(data.groupby(self.group_ids + [self.time_idx]).size() > 1) > 0:
            raise ValueError(
                "Data Error: Multiple rows with the same group id and time index exist"
            )

        # check for numeric categoricals which can cause hiccups when logging to tensorboard
        category_columns = data.head(1).select_dtypes("category").columns
        object_columns = data.head(1).select_dtypes(object).columns
        for name in self.flat_categoricals:
            if name not in data.columns:
                raise KeyError(f"variable {name} specified but not found in data")
            if not (
                name in object_columns
                or (
                    name in category_columns
                    and data[name].cat.categories.dtype.kind not in "bifc"
                )
            ):
                raise ValueError(
                    f"Data type of category {name} was found to be numeric - use a string type / categorified string"
                )
        # check for "." in column names
        columns_with_dot = data.columns[data.columns.str.contains(r"\.")]
        if len(columns_with_dot) > 0:
            raise ValueError(
                f"column names must not contain '.' characters. Names {columns_with_dot.tolist()} are invalid"
            )

    def _preprocess_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Scale continuous variables, encode categories and set aside target and weight.

        Args:
            data (pd.DataFrame): original data

        Returns:
            pd.DataFrame: pre-processed dataframe

        """
        # filter out groups that are shorter than min_encoder_length + min_prediction_length
        data = self._filter(data)

        # ensure data order

        data = self._sort_data(data=data, columns=self.group_ids + [self.time_idx])

        # fill missing values with zero. TODO: support more methods to fill missing values
        data = fill_missing_values(
            data=data, reals=self.reals, categoricals=self.categoricals, method="zero"
        )

        # step 1
        # encode group ids
        for name, group_name in self._group_ids_mapping.items():
            # use existing encoder - but a copy of it so as not to lose current encodings
            encoder = deepcopy(
                self.categorical_encoders.get(group_name, NaNLabelEncoder())
            )
            self.categorical_encoders[group_name] = encoder.fit(
                data[name].to_numpy().reshape(-1), overwrite=False
            )
            data[group_name] = self.transform_values(
                name, data[name], inverse=False, group_id=True
            )

        # step 2
        # encode categoricals first to ensure that the group normalizer relies on encoded categories
        if isinstance(
            self.target_normalizer, (GroupNormalizer, MultiNormalizer)
        ):  # if we use a group normalizer, group_ids must be encoded as well
            group_ids_to_encode = self.group_ids
        else:
            group_ids_to_encode = []

        for name in dict.fromkeys(group_ids_to_encode + self.categoricals):
            if name in self.variable_groups:  # fit groups
                columns = self.variable_groups[name]
                if name not in self.categorical_encoders:
                    self.categorical_encoders[name] = NaNLabelEncoder().fit(
                        data[columns].to_numpy().reshape(-1)
                    )
                elif self.categorical_encoders[name] is not None:
                    try:
                        check_is_fitted(self.categorical_encoders[name])
                    except NotFittedError:
                        self.categorical_encoders[name] = self.categorical_encoders[
                            name
                        ].fit(data[columns].to_numpy().reshape(-1))
            else:
                if name not in self.categorical_encoders:
                    self.categorical_encoders[name] = LabelEncoder().fit(data[name])
                elif (
                    self.categorical_encoders[name] is not None
                    and name not in self.target
                ):
                    try:
                        check_is_fitted(self.categorical_encoders[name])
                    except NotFittedError:
                        self.categorical_encoders[name] = self.categorical_encoders[
                            name
                        ].fit(data[name])

        # transform them
        for name in dict.fromkeys(group_ids_to_encode + self.flat_categoricals):
            # targets and their lagged versions are handled separately
            if name not in self.target:
                data[name] = self.transform_values(
                    name,
                    data[name],
                    inverse=False,
                )

        # save special variables
        assert (
            "__time_idx__" not in data.columns
        ), "__time_idx__ is a protected column and must not be present in data"
        data["__time_idx__"] = data[self.time_idx]  # save unscaled
        for target in self.target:
            assert (
                f"__target__{target}" not in data.columns
            ), f"__target__{target} is a protected column and must not be present in data"
            data[f"__target__{target}"] = data[target]

        # step 3
        # encode and normalize target
        if self.target_normalizer is not None:

            # fit target normalizer
            try:
                check_is_fitted(self.target_normalizer)
            except NotFittedError:
                if isinstance(self.target_normalizer, EncoderNormalizer):
                    self.target_normalizer.fit(data[self.target])
                elif isinstance(
                    self.target_normalizer, (GroupNormalizer, MultiNormalizer)
                ):
                    self.target_normalizer.fit(data[self.target], data)
                else:
                    self.target_normalizer.fit(data[self.target])

            # transform target
            if isinstance(self.target_normalizer, EncoderNormalizer):
                # we approximate the scales and target transformation by assuming one
                # transformation over the entire time range but by each group
                common_init_args = [
                    name
                    for name in inspect.signature(
                        GroupNormalizer.__init__
                    ).parameters.keys()
                    if name
                    in inspect.signature(EncoderNormalizer.__init__).parameters.keys()
                    and name not in ["data", "self"]
                ]
                copy_kwargs = {
                    name: getattr(self.target_normalizer, name)
                    for name in common_init_args
                }
                normalizer = GroupNormalizer(groups=self.group_ids, **copy_kwargs)
                data[self.target], scales = normalizer.fit_transform(
                    data[self.target], data, return_norm=True
                )

            elif isinstance(self.target_normalizer, GroupNormalizer):
                data[self.target], scales = self.target_normalizer.transform(
                    data[self.target], data, return_norm=True
                )

            elif isinstance(self.target_normalizer, MultiNormalizer):
                transformed, scales = self.target_normalizer.transform(
                    data[self.target], data, return_norm=True
                )

                for idx, target in enumerate(self.target):
                    data[target] = transformed[idx]

                    if isinstance(self.target_normalizer[idx], NaNLabelEncoder):
                        # overwrite the saved target column because categorical targets are stored encoded
                        # (continuous targets are kept unscaled)
                        data[f"__target__{target}"] = data[target]

            elif isinstance(self.target_normalizer, NaNLabelEncoder):
                data[self.target] = self.target_normalizer.transform(data[self.target])
                # overwrite the saved target columns because categorical targets are stored encoded
                # (continuous targets are kept unscaled)
                for target in self.target:
                    data[f"__target__{target}"] = data[target]
                scales = None

            else:
                data[self.target], scales = self.target_normalizer.transform(
                    data[self.target], return_norm=True
                )

        # step 4
        # rescale continuous variables apart from target
        for name in self.reals:
            if name in self.target:
                # targets are scaled by the target normalizer, not by the per-variable scalers
                continue
            elif name not in self.scalers:
                self.scalers[name] = StandardScaler().fit(data[[name]])
            elif self.scalers[name] is not None:
                try:
                    check_is_fitted(self.scalers[name])
                except NotFittedError:
                    if isinstance(self.scalers[name], GroupNormalizer):
                        self.scalers[name] = self.scalers[name].fit(data[[name]], data)
                    else:
                        self.scalers[name] = self.scalers[name].fit(data[[name]])

        # transform them after fitting
        for name in self.reals:
            # targets are handled separately
            transformer = self.get_transformer(name)
            if (
                name not in self.target
                and transformer is not None
                and not isinstance(transformer, EncoderNormalizer)
            ):
                data[name] = self.transform_values(
                    name, data[name], data=data, inverse=False
                )

        return data, scales

    def get_transformer(self, name: str, group_id: bool = False):
        """
        Get transformer for variable.

        Args:
            name (str): variable name
            group_id (bool, optional): If the passed name refers to a group id (different encoders are used for these).
                Defaults to False.

        Returns:
            transformer
        """
        if group_id:
            name = self._group_ids_mapping[name]

        if name in set(self.flat_categoricals + self.group_ids + self._group_ids):
            name = self.variable_to_group_mapping.get(name, name)  # map name to encoder
            transformer = self.categorical_encoders.get(name, None)
            return transformer

        elif name in self.reals:
            # take target normalizer if required
            transformer = self.scalers.get(name, None)
            return transformer
        else:
            return None

    def transform_values(
        self,
        name: str,
        values: Union[pd.Series, torch.Tensor, np.ndarray],
        data: pd.DataFrame = None,
        inverse=False,
        group_id: bool = False,
        **kwargs,
    ) -> np.ndarray:
        """
        Scale and encode values.

        Args:
            name (str): name of variable
            values (Union[pd.Series, torch.Tensor, np.ndarray]): values to encode/scale
            data (pd.DataFrame, optional): extra data used for scaling (e.g. dataframe with groups columns).
                Defaults to None.
            inverse (bool, optional): if to conduct inverse transformation. Defaults to False.
            group_id (bool, optional): If the passed name refers to a group id (different encoders are used for these).
                Defaults to False.
            **kwargs: additional arguments for transform/inverse_transform method

        Returns:
            np.ndarray: (de/en)coded/(de)scaled values
        """
        transformer = self.get_transformer(name, group_id=group_id)
        if transformer is None:
            return values
        if inverse:
            transform = transformer.inverse_transform
        else:
            transform = transformer.transform

        if group_id:
            name = self._group_ids_mapping[name]
        # remaining categories
        if name in self.flat_categoricals + self.group_ids + self._group_ids:
            return transform(values, **kwargs)

        # reals
        elif name in self.reals:
            if isinstance(transformer, GroupNormalizer):
                return transform(values, data, **kwargs)
            elif isinstance(transformer, EncoderNormalizer):
                return transform(values, **kwargs)
            else:
                if isinstance(values, pd.Series):
                    values = values.to_frame()
                    return np.asarray(transform(values, **kwargs)).reshape(-1)
                else:
                    values = values.reshape(-1, 1)
                    return transform(values, **kwargs).reshape(-1)
        else:
            return values

    def __len__(self) -> int:
        """
        Length of dataset.

        Returns:
            int: length
        """
        return len(self.index)

    def summary(self):
        """
        Summarize basic statistics of given dataset.

        Missing values, number of categorical features, number of numeric features,
        size of dataset, time interval, number of groups, etc.


        """
        pass

    def extract_features(self):
        """
        Based on given timeseries dataset, generate new features to extract time
        series information.

        """
        pass

    def _sort_data(self, data: pd.DataFrame, columns: Union[str, List[str]]):
        """
        sort data and reset index.
        """
        return data.sort_values(by=columns).reset_index(drop=True)

    def _filter(self, data: pd.DataFrame):
        """
        Filter out groups whose size is smaller than ``min_encoder_length + min_prediction_length``.
        """
        group_size = data.groupby(self.group_ids).size()
        drop_list = group_size[
            group_size < (self.min_encoder_length + self.min_prediction_length)
        ].index
        # keep only rows whose group is not in the drop list
        keep_mask = ~data.set_index(self.group_ids).index.isin(drop_list)
        data = data[keep_mask].reset_index(drop=True)
        return data

    def _construct_index(self, data: pd.DataFrame, predict_mode: bool) -> pd.DataFrame:
        """
        Create index of samples.

        Args:
            data (pd.DataFrame): preprocessed data
            predict_mode (bool): if True, keep only the last possible sequence of each time series
        Returns:
            pd.DataFrame: index dataframe
        """
        g = data.groupby(self.group_ids, observed=True)

        df_index_first = g["__time_idx__"].transform("nth", 0).to_frame("time_first")
        df_index_last = g["__time_idx__"].transform("nth", -1).to_frame("time_last")
        df_index_diff_to_next = (
            -g["__time_idx__"]
            .diff(-1)
            .fillna(-1)
            .astype(int)
            .to_frame("time_diff_to_next")
        )
        df_index = pd.concat(
            [df_index_first, df_index_last, df_index_diff_to_next], axis=1
        )
        df_index["index_start"] = np.arange(len(df_index))
        df_index["time"] = data["__time_idx__"]
        df_index["count"] = (df_index["time_last"] - df_index["time_first"]).astype(
            int
        ) + 1
        group_ids = g.ngroup()
        df_index["group_id"] = group_ids

        min_sequence_length = self.min_prediction_length + self.min_encoder_length
        max_sequence_length = self.max_prediction_length + self.max_encoder_length

        # calculate maximum index to include from current index_start
        max_time = (df_index["time"] + max_sequence_length - 1).clip(
            upper=df_index["count"] + df_index.time_first - 1
        )

        df_index["index_end"], missing_sequences = _find_end_indices(
            diffs=df_index.time_diff_to_next.to_numpy(),
            max_lengths=(max_time - df_index.time).to_numpy() + 1,
            min_length=min_sequence_length,
        )
        # filter out where encode and decode length are not satisfied
        df_index["sequence_length"] = (
            df_index["index_start"].iloc[df_index["index_end"]].to_numpy()
            - df_index["index_start"]
            + 1
        )

        # filter too short sequences
        df_index = df_index[
            # sequence must be at least of minimal prediction length
            lambda x: (x.sequence_length >= min_sequence_length)
        ]

        if (
            predict_mode
        ):  # keep longest element per series (i.e. the first element that spans to the end of the series)
            # filter all elements that are longer than the allowed maximum sequence length
            df_index = df_index[
                lambda x: (x["time_last"] - x["time"] + 1 <= max_sequence_length)
                & (x["sequence_length"] >= min_sequence_length)
            ]
            # choose longest sequence
            df_index = df_index.loc[
                df_index.groupby("group_id").sequence_length.idxmax()
            ]

        assert (
            len(df_index) > 0
        ), "filters should not remove entries all entries - check encoder/decoder lengths "

        df_index.reset_index(inplace=True)
        return df_index

    def _data_to_tensors(self, data: pd.DataFrame) -> Dict[str, torch.Tensor]:
        """
        Convert data to tensors for faster access with :py:meth:`~__getitem__`.

        Args:
            data (pd.DataFrame): preprocessed data

        Returns:
            Dict[str, torch.Tensor]: dictionary of tensors for continuous, categorical data, groups, target and
                time index
        """
        index = check_for_nonfinite(
            torch.tensor(data[self.group_ids].to_numpy(np.int64), dtype=torch.int64),
            self.group_ids,
        )
        time = check_for_nonfinite(
            torch.tensor(data[self.time_idx].to_numpy(np.int64), dtype=torch.int64),
            self.time_idx,
        )
        categorical = check_for_nonfinite(
            torch.tensor(data[self.categoricals].to_numpy(np.int64), dtype=torch.int64),
            self.categoricals,
        )
        target = check_for_nonfinite(
            torch.tensor(
                data[self.target].to_numpy(dtype=np.float64), dtype=torch.float
            ),
            self.target,
        )
        continuous = check_for_nonfinite(
            torch.tensor(
                data[self.reals].to_numpy(dtype=np.float64), dtype=torch.float
            ),
            self.reals,
        )
        # target_scale = torch.tensor(target_scale, dtype=torch.float)
        tensors = dict(
            reals=continuous,
            categoricals=categorical,
            groups=index,
            target=target,
            time=time,
            # target_scale=target_scale
        )

        return tensors

    def _collate_fn(
        self, batches: List[Tuple[Dict[str, torch.Tensor], torch.Tensor]]
    ) -> Tuple[Dict[str, torch.Tensor], torch.Tensor]:
        """
        Collate function to combine items into mini-batch for dataloader.

        Args:
            batches (List[Tuple[Dict[str, torch.Tensor], torch.Tensor]]): List of samples generated with
                :py:meth:`~__getitem__`.

        Returns:
            Tuple[Dict[str, torch.Tensor], torch.Tensor]: minibatch
        """
        # collate function for dataloader
        # lengths
        encoder_lengths = torch.tensor(
            [batch[0]["encoder_length"] for batch in batches], dtype=torch.long
        )
        decoder_lengths = torch.tensor(
            [batch[0]["decoder_length"] for batch in batches], dtype=torch.long
        )

        # ids
        decoder_time_idx_start = (
            torch.tensor(
                [batch[0]["encoder_time_idx_start"] for batch in batches],
                dtype=torch.long,
            )
            + encoder_lengths
        )
        decoder_time_idx = decoder_time_idx_start.unsqueeze(1) + torch.arange(
            decoder_lengths.max()
        ).unsqueeze(0)
        groups = torch.stack([batch[0]["groups"] for batch in batches])

        # features (samples from __getitem__ are already split into encoder and decoder parts)
        encoder_cont = rnn.pad_sequence(
            [batch[0]["encoder_cont"] for batch in batches], batch_first=True
        )
        encoder_cat = rnn.pad_sequence(
            [batch[0]["encoder_cat"] for batch in batches], batch_first=True
        )

        decoder_cont = rnn.pad_sequence(
            [batch[0]["decoder_cont"] for batch in batches], batch_first=True
        )
        decoder_cat = rnn.pad_sequence(
            [batch[0]["decoder_cat"] for batch in batches], batch_first=True
        )

        # target
        target = rnn.pad_sequence([batch[1] for batch in batches], batch_first=True)
        encoder_target = rnn.pad_sequence(
            [batch[0]["encoder_target"] for batch in batches], batch_first=True
        )

        return (
            dict(
                encoder_cat=encoder_cat,
                encoder_cont=encoder_cont,
                encoder_target=encoder_target,
                encoder_lengths=encoder_lengths,
                decoder_cat=decoder_cat,
                decoder_cont=decoder_cont,
                decoder_target=target,
                decoder_lengths=decoder_lengths,
                decoder_time_idx=decoder_time_idx,
                groups=groups,
            ),
            target,
        )

    def __getitem__(self, idx: int) -> Tuple[Dict[str, torch.Tensor], torch.Tensor]:
        """
        Get sample for model

        Args:
            idx (int): index of prediction (between ``0`` and ``len(dataset) - 1``)

        Returns:
            Tuple[Dict[str, torch.Tensor], torch.Tensor]: x and y for model
        """
        index = self.index.iloc[idx]
        # get index data
        data_cont = self.data["reals"][index.index_start : index.index_end + 1].clone()
        data_cat = self.data["categoricals"][
            index.index_start : index.index_end + 1
        ].clone()
        time = self.data["time"][index.index_start : index.index_end + 1].clone()
        target = self.data["target"][index.index_start : index.index_end + 1].clone()
        groups = self.data["groups"][index.index_start].clone()
        target_scale = self.target_normalizer.get_parameters(groups, self.group_ids)
        # determine encoder length and decoder length
        # why we should determine these values: if min_encoder_length != max_encoder_length
        # and min_prediction_length != max_prediction_length and sequence length <
        # max_encoder_length + max_prediction_length, we must determine what encoder length is
        # and decoder length is.

        decoder_length = min(
            self.max_prediction_length,
            index.sequence_length - self.min_encoder_length,
        )

        encoder_length = index.sequence_length - decoder_length

        assert (
            decoder_length >= self.min_prediction_length
        ), "Decoder length should be at least minimum prediction length"

        assert (
            encoder_length >= self.min_encoder_length
        ), "Encoder length should be at least minimum encoder length"

        assert decoder_length > 0, "Decoder length should be greater than 0"

        assert encoder_length > 0, "Encoder length should be greater than 0"

        encoder_cat = data_cat[:encoder_length]
        encoder_cont = data_cont[:encoder_length]
        encoder_target = target[:encoder_length]
        decoder_cat = data_cat[encoder_length:]
        decoder_cont = data_cont[encoder_length:]
        decoder_time_idx = time[encoder_length:]
        target = target[encoder_length:]

        return (
            dict(
                encoder_cat=encoder_cat,
                encoder_cont=encoder_cont,
                encoder_target=encoder_target,
                encoder_length=encoder_length,
                decoder_cat=decoder_cat,
                decoder_cont=decoder_cont,
                decoder_length=decoder_length,
                encoder_time_idx_start=time[0],
                decoder_time_idx=decoder_time_idx,
                groups=groups,
                target_scale=target_scale,
            ),
            target,
        )

    def x_to_index(self, x: Dict[str, torch.Tensor]) -> pd.DataFrame:
        """
        Decode dataframe index from x.

        Returns:
            dataframe with time index column for first prediction and group ids
        """
        index_data = {self.time_idx: x["decoder_time_idx"][:, 0].cpu()}
        for id in self.group_ids:
            index_data[id] = x["groups"][:, self.group_ids.index(id)].cpu()
            # decode if possible
            index_data[id] = self.transform_values(
                id, index_data[id], inverse=True, group_id=True
            )
        index = pd.DataFrame(index_data)
        return index

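The snippet below is a minimal usage sketch. The dataframe and the column names ("time_idx", "series", "value") are made up for illustration; only constructor arguments documented above are used, and the dataset's own _collate_fn is passed to a standard PyTorch DataLoader.

import pandas as pd
from torch.utils.data import DataLoader

from deepts_forecasting.utils.data.dataset import TimeSeriesDataSet

# toy data: two series ("A" and "B"), an integer time index and one target column
data = pd.DataFrame(
    {
        "time_idx": list(range(50)) * 2,
        "series": ["A"] * 50 + ["B"] * 50,
        "value": [float(i % 10) for i in range(100)],
    }
)

dataset = TimeSeriesDataSet(
    data,
    time_idx="time_idx",
    target="value",
    group_ids=["series"],
    max_encoder_length=20,
    max_prediction_length=5,
    static_categoricals=["series"],
    time_varying_unknown_reals=["value"],
)

# a single sample is a dictionary of encoder/decoder tensors plus the decoder target
x, y = dataset[0]
print(x["encoder_cont"].shape, x["decoder_cat"].shape)

# mini-batching with the dataset's own collate function
loader = DataLoader(dataset, batch_size=4, collate_fn=dataset._collate_fn)
batch_x, batch_y = next(iter(loader))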
categoricals: List[str] property readonly

Categorical variables as used for modelling. Excluding categorical target if classification dataset.

Returns:

    List[str]: list of variables

flat_categoricals: List[str] property readonly

Categorical variables as defined in input data.

Returns:

    List[str]: list of variables

multi_target: bool property readonly

If dataset encodes one or multiple targets.

Returns:

    bool: true if multiple targets

reals: List[str] property readonly

Continuous variables as used for modelling. Excluding continuous target if regression dataset.

Returns:

    List[str]: list of variables
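As the property definitions above show, categoricals and reals are simply the concatenation of the static and time-varying variable lists. A quick illustration, reusing the hypothetical dataset from the sketch further up:

# order: static variables first, then time-varying known, then time-varying unknown
assert dataset.categoricals == (
    dataset.static_categoricals
    + dataset.time_varying_known_categoricals
    + dataset.time_varying_unknown_categoricals
)
assert dataset.reals == (
    dataset.static_reals
    + dataset.time_varying_known_reals
    + dataset.time_varying_unknown_reals
)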

variable_to_group_mapping: Dict[str, str] property readonly

Mapping from categorical variables to variables in input data.

Returns:

    Dict[str, str]: dictionary mapping from categoricals to flat_categoricals
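The mapping simply inverts variable_groups. A self-contained sketch of that inversion, using a hypothetical grouped categorical named "holidays":

# hypothetical grouped categorical: two indicator columns encoded together as "holidays"
variable_groups = {"holidays": ["xmas", "easter"]}

# mirror of the property above: invert group -> columns into column -> group
mapping = {}
for group_name, sublist in variable_groups.items():
    mapping.update({name: group_name for name in sublist})

print(mapping)  # {'xmas': 'holidays', 'easter': 'holidays'}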

extract_features(self)

Based on given timeseries dataset, generate new features to extract time series information.

Source code in deepts_forecasting\utils\data\dataset.py
def extract_features(self):
    """
    Based on given timeseries dataset, generate new features to extract time
    series information.

    """
    pass

from_dataset(dataset, data, stop_randomization=False, predict=False, **update_kwargs) classmethod

Generate dataset with different underlying data but same variable encoders and scalers, etc.

Calls from_parameters under the hood.

Parameters:

    dataset (TimeSeriesDataSet): dataset from which to copy parameters. Required.
    data (pd.DataFrame): data from which new dataset will be generated. Required.
    stop_randomization (bool, optional): If to stop randomizing encoder and decoder lengths, e.g. useful for validation set. Defaults to False.
    predict (bool, optional): If to predict the decoder length on the last entries in the time index (i.e. one prediction per group only). Defaults to False.
    **update_kwargs: keyword arguments overriding parameters in the original dataset. Defaults to {}.

Returns:

    TimeSeriesDataSet: new dataset

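A common use of this method is building a validation set that reuses the training dataset's encoders and scalers. A sketch, assuming a previously built training dataset and a data dataframe with a "time_idx" column (both names are assumptions for illustration):

# predict the last max_prediction_length steps of each series after the training cutoff
training_cutoff = data["time_idx"].max() - 5

validation = TimeSeriesDataSet.from_dataset(
    training,
    data,
    predict=True,                            # one sample per series, at the end of the series
    min_prediction_idx=training_cutoff + 1,  # overrides the parameter copied from `training`
)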
Source code in deepts_forecasting\utils\data\dataset.py
@classmethod
def from_dataset(
    cls,
    dataset,
    data: pd.DataFrame,
    stop_randomization: bool = False,
    predict: bool = False,
    **update_kwargs,
):
    """
    Generate dataset with different underlying data but same variable encoders and scalers, etc.

    Calls :py:meth:`~from_parameters` under the hood.

    Args:
        dataset (TimeSeriesDataSet): dataset from which to copy parameters
        data (pd.DataFrame): data from which new dataset will be generated
        stop_randomization (bool, optional): If to stop randomizing encoder and decoder lengths,
            e.g. useful for validation set. Defaults to False.
        predict (bool, optional): If to predict the decoder length on the last entries in the
            time index (i.e. one prediction per group only). Defaults to False.
        **update_kwargs: keyword arguments overriding parameters in the original dataset

    Returns:
        TimeSeriesDataSet: new dataset
    """
    return cls.from_parameters(
        dataset.get_parameters(),
        data,
        stop_randomization=stop_randomization,
        predict=predict,
        **update_kwargs,
    )

from_parameters(parameters, data, stop_randomization=None, predict=False, **update_kwargs) classmethod

Generate dataset with different underlying data but same variable encoders and scalers, etc.

Parameters:

    parameters (Dict[str, Any]): dataset parameters to use for the new dataset. Required.
    data (pd.DataFrame): data from which new dataset will be generated. Required.
    stop_randomization (bool, optional): If to stop randomizing encoder and decoder lengths, e.g. useful for validation set. Defaults to None, which is resolved to True if predict=True and to False otherwise.
    predict (bool, optional): If to predict the decoder length on the last entries in the time index (i.e. one prediction per group only). Defaults to False.
    **update_kwargs: keyword arguments overriding parameters. Defaults to {}.

Returns:

    TimeSeriesDataSet: new dataset

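from_dataset is a thin wrapper around this method. The explicit form, again assuming a previously built training dataset and a data dataframe, looks roughly like this:

params = training.get_parameters()
validation = TimeSeriesDataSet.from_parameters(params, data, predict=True)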
Source code in deepts_forecasting\utils\data\dataset.py
@classmethod
def from_parameters(
    cls,
    parameters: Dict[str, Any],
    data: pd.DataFrame,
    stop_randomization: bool = None,
    predict: bool = False,
    **update_kwargs,
):
    """
    Generate dataset with different underlying data but same variable encoders and scalers, etc.

    Args:
        parameters (Dict[str, Any]): dataset parameters which to use for the new dataset
        data (pd.DataFrame): data from which new dataset will be generated
        stop_randomization (bool, optional): If to stop randomizing encoder and decoder lengths,
            e.g. useful for validation set. Defaults to None, which is resolved to True if ``predict=True``
            and to False otherwise.
        predict (bool, optional): If to predict the decoder length on the last entries in the
            time index (i.e. one prediction per group only). Defaults to False.
        **update_kwargs: keyword arguments overriding parameters

    Returns:
        TimeSeriesDataSet: new dataset
    """
    parameters = deepcopy(parameters)
    if predict:
        if stop_randomization is None:
            stop_randomization = True
        elif not stop_randomization:
            warnings.warn(
                "If predicting, no randomization should be possible - setting stop_randomization=True",
                UserWarning,
            )
            stop_randomization = True
        parameters["min_prediction_length"] = parameters["max_prediction_length"]
        parameters["predict_mode"] = True
    elif stop_randomization is None:
        stop_randomization = False

    if stop_randomization:
        parameters["randomize_length"] = None
    parameters.update(update_kwargs)

    new = cls(data, **parameters)
    return new

get_parameters(self)

Get parameters that can be used with from_parameters to create a new dataset with the same scalers.

Returns:

    Dict[str, Any]: dictionary of parameters

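The returned dictionary mirrors the constructor signature (minus data), so it can be stored and later combined with new data. A sketch, where new_data is a hypothetical dataframe with the same columns:

params = dataset.get_parameters()
print(sorted(params)[:4])  # e.g. ['categorical_encoders', 'group_ids', 'lags', 'max_encoder_length']

# rebuild a dataset on new data with identical encoders and scalers
new_dataset = TimeSeriesDataSet.from_parameters(params, new_data)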
Source code in deepts_forecasting\utils\data\dataset.py
def get_parameters(self) -> Dict[str, Any]:
    """
    Get parameters that can be used with :py:meth:`~from_parameters` to create a new dataset with the same scalers.

    Returns:
        Dict[str, Any]: dictionary of parameters
    """
    kwargs = {
        name: getattr(self, name)
        for name in inspect.signature(self.__class__.__init__).parameters.keys()
        if name not in ["data", "self"]
    }
    kwargs["categorical_encoders"] = self.categorical_encoders
    kwargs["scalers"] = self.scalers
    return kwargs

get_transformer(self, name, group_id=False)

Get transformer for variable.

Parameters:

    name (str): variable name. Required.
    group_id (bool, optional): If the passed name refers to a group id (different encoders are used for these). Defaults to False.

Returns:

    transformer

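A small sketch using the hypothetical "series" and "value" columns from the constructor example further up:

cat_encoder = dataset.get_transformer("series")                    # label encoder fitted on the categorical column
real_scaler = dataset.get_transformer("value")                     # None here: "value" is the target and is handled by the target normalizer
group_encoder = dataset.get_transformer("series", group_id=True)   # separate encoder kept for the group id copy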
Source code in deepts_forecasting\utils\data\dataset.py
def get_transformer(self, name: str, group_id: bool = False):
    """
    Get transformer for variable.

    Args:
        name (str): variable name
        group_id (bool, optional): If the passed name refers to a group id (different encoders are used for these).
            Defaults to False.

    Returns:
        transformer
    """
    if group_id:
        name = self._group_ids_mapping[name]

    if name in set(self.flat_categoricals + self.group_ids + self._group_ids):
        name = self.variable_to_group_mapping.get(name, name)  # map name to encoder
        transformer = self.categorical_encoders.get(name, None)
        return transformer

    elif name in self.reals:
        # take target normalizer if required
        transformer = self.scalers.get(name, None)
        return transformer
    else:
        return None

load(fname) classmethod

Load dataset from disk

Parameters:

    fname (str): filename to load from. Required.

Returns:

    TimeSeriesDataSet

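save and load are thin wrappers around torch.save/torch.load, so a fully preprocessed dataset can be persisted and restored as-is. A sketch with a hypothetical file name:

dataset.save("dataset.pt")
restored = TimeSeriesDataSet.load("dataset.pt")
assert len(restored) == len(dataset)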
Source code in deepts_forecasting\utils\data\dataset.py
@classmethod
def load(cls, fname: str):
    """
    Load dataset from disk

    Args:
        fname (str): filename to load from

    Returns:
        TimeSeriesDataSet
    """
    obj = torch.load(fname)
    assert isinstance(obj, cls), f"Loaded file is not of class {cls}"
    return obj

save(self, fname)

Save dataset to disk

Parameters:

    fname (str): filename to save to. Required.
Source code in deepts_forecasting\utils\data\dataset.py
def save(self, fname: str) -> None:
    """
    Save dataset to disk

    Args:
        fname (str): filename to save to
    """
    torch.save(self, fname)

summary(self)

Summarize basic statistics of given dataset.

Missing values, number of categorical features, number of numeric features, size of dataset, time interval, number of groups, etc.

Source code in deepts_forecasting\utils\data\dataset.py
def summary(self):
    """
    Summarize basic statistics of given dataset.

    Missing values, number of categorical features, number of numeric features,
    size of dataset, time interval, number of groups, etc.


    """
    pass

transform_values(self, name, values, data=None, inverse=False, group_id=False, **kwargs)

Scale and encode values.

Parameters:

    name (str): name of variable. Required.
    values (Union[pd.Series, torch.Tensor, np.ndarray]): values to encode/scale. Required.
    data (pd.DataFrame, optional): extra data used for scaling (e.g. dataframe with groups columns). Defaults to None.
    inverse (bool, optional): if to conduct inverse transformation. Defaults to False.
    group_id (bool, optional): If the passed name refers to a group id (different encoders are used for these). Defaults to False.
    **kwargs: additional arguments for transform/inverse_transform method. Defaults to {}.

Returns:

    np.ndarray: (de/en)coded/(de)scaled values

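A round-trip sketch with the hypothetical categorical column "series" from the constructor example; continuous variables go through their fitted scaler in the same way:

import pandas as pd

# encode raw category labels with the fitted encoder, then decode them back
encoded = dataset.transform_values("series", pd.Series(["A", "B"]))
decoded = dataset.transform_values("series", encoded, inverse=True)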
Source code in deepts_forecasting\utils\data\dataset.py
def transform_values(
    self,
    name: str,
    values: Union[pd.Series, torch.Tensor, np.ndarray],
    data: pd.DataFrame = None,
    inverse=False,
    group_id: bool = False,
    **kwargs,
) -> np.ndarray:
    """
    Scale and encode values.

    Args:
        name (str): name of variable
        values (Union[pd.Series, torch.Tensor, np.ndarray]): values to encode/scale
        data (pd.DataFrame, optional): extra data used for scaling (e.g. dataframe with groups columns).
            Defaults to None.
        inverse (bool, optional): if to conduct inverse transformation. Defaults to False.
        group_id (bool, optional): If the passed name refers to a group id (different encoders are used for these).
            Defaults to False.
        **kwargs: additional arguments for transform/inverse_transform method

    Returns:
        np.ndarray: (de/en)coded/(de)scaled values
    """
    transformer = self.get_transformer(name, group_id=group_id)
    if transformer is None:
        return values
    if inverse:
        transform = transformer.inverse_transform
    else:
        transform = transformer.transform

    if group_id:
        name = self._group_ids_mapping[name]
    # remaining categories
    if name in self.flat_categoricals + self.group_ids + self._group_ids:
        return transform(values, **kwargs)

    # reals
    elif name in self.reals:
        if isinstance(transformer, GroupNormalizer):
            return transform(values, data, **kwargs)
        elif isinstance(transformer, EncoderNormalizer):
            return transform(values, **kwargs)
        else:
            if isinstance(values, pd.Series):
                values = values.to_frame()
                return np.asarray(transform(values, **kwargs)).reshape(-1)
            else:
                values = values.reshape(-1, 1)
                return transform(values, **kwargs).reshape(-1)
    else:
        return values

x_to_index(self, x)

Decode dataframe index from x.

Returns:

    pd.DataFrame: dataframe with time index column for first prediction and group ids

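A sketch, assuming batch_x is the input dictionary of a batch produced by the DataLoader example further up:

# map a collated batch back to group ids and the time index of the first prediction step
index_df = dataset.x_to_index(batch_x)
print(index_df.head())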
Source code in deepts_forecasting\utils\data\dataset.py
def x_to_index(self, x: Dict[str, torch.Tensor]) -> pd.DataFrame:
    """
    Decode dataframe index from x.

    Returns:
        dataframe with time index column for first prediction and group ids
    """
    index_data = {self.time_idx: x["decoder_time_idx"][:, 0].cpu()}
    for id in self.group_ids:
        index_data[id] = x["groups"][:, self.group_ids.index(id)].cpu()
        # decode if possible
        index_data[id] = self.transform_values(
            id, index_data[id], inverse=True, group_id=True
        )
    index = pd.DataFrame(index_data)
    return index