Source code for torch_timeseries.dataloader.maskts

from typing import Sequence, Tuple, Type

import numpy as np
import torch

from torch_timeseries.utils.timefeatures import time_features
from ..scaler import Scaler
from torch_timeseries.core import (
    TimeSeriesDataset,
    TimeseriesSubset,
)
from torch.utils.data import Dataset, DataLoader, RandomSampler, Subset

from .wrapper import MultiStepTimeFeatureSet




[docs]class MaskTimeFeatureSet(Dataset): def __init__(self, dataset: TimeseriesSubset, scaler: Scaler, mask_rate=0.4, time_enc=0, window: int = 168, freq=None, scaler_fit=True): self.dataset = dataset self.window = window self.time_enc = time_enc self.scaler = scaler self.mask_rate = mask_rate self.num_features = self.dataset.num_features self.length = self.dataset.length if freq is None: self.freq = self.dataset.freq else: self.freq = freq if scaler_fit: self.scaler.fit(self.dataset.data) self.scaled_data = self.scaler.transform(self.dataset.data) self.date_enc_data = time_features( self.dataset.dates, self.time_enc, self.freq) assert len(self.dataset) - self.window > 0, "Dataset is not long enough!!!" def transform(self, values): return self.scaler.transform(values) def inverse_transform(self, values): return self.scaler.inverse_transform(values) def __getitem__(self, index): if isinstance(index, int): scaled_x = self.scaled_data[index:index+self.window] x = self.dataset.data[index:index+self.window] x_date_enc = self.date_enc_data[index:index+self.window] mask = np.random.rand(self.window, self.dataset.num_features) mask[mask <= self.mask_rate] = 0 mask[mask > self.mask_rate] = 1 masked_scaled_x = scaled_x*mask return masked_scaled_x, scaled_x, x , mask, x_date_enc else: raise TypeError('Not surpported index type!!!') def __len__(self): return len(self.dataset) - self.window
[docs]class MaskTS: """ Data loader for imputation time series datasets. Attributes: dataset (TimeSeriesDataset): Time series dataset to be used. scaler (Scaler): Scaler to normalize the data. time_enc (int): Time encoding flag. window (int): Window size for the time series data. mask_rate (float): Rate at which data is masked. scale_in_train (bool): Whether to scale data during training. shuffle_train (bool): Whether to shuffle the training data. freq (str or None): Frequency of the time series data. batch_size (int): Number of samples per batch. train_ratio (float): Ratio of the dataset to be used for training. val_ratio (float): Ratio of the dataset to be used for validation. num_worker (int): Number of worker threads for data loading. uniform_eval (bool): Whether to use uniform evaluation. train_loader (DataLoader): DataLoader for the training data. val_loader (DataLoader): DataLoader for the validation data. test_loader (DataLoader): DataLoader for the test data. """ def __init__( self, dataset: TimeSeriesDataset, scaler: Scaler, time_enc=0, window: int = 168, mask_rate=0.4, scale_in_train=True, shuffle_train=True, freq=None, batch_size: int = 32, train_ratio: float = 0.7, val_ratio: float = 0.2, num_worker: int = 3, uniform_eval=True, ) -> None: self.train_ratio = train_ratio self.val_ratio = val_ratio self.test_ratio = 1 - self.train_ratio - self.val_ratio self.uniform_eval = uniform_eval self.mask_rate = mask_rate assert ( self.train_ratio + self.val_ratio + self.test_ratio == 1.0 ), "Split ratio must sum up to 1.0" self.batch_size = batch_size self.num_worker = num_worker self.dataset = dataset self.scaler = scaler self.window = window self.freq = freq self.time_enc = time_enc self.shuffle_train = shuffle_train self.scale_in_train = scale_in_train self._load() def _load(self): self._load_dataset() self._load_dataloader() def _load_dataset(self): """ Return the splitted training, testing and validation dataloders :return: a tuple of train_dataloader, test_dataloader and val_dataloader """ # fixed suquence dataset indices = range(0, len(self.dataset)) train_size = int(self.train_ratio * len(self.dataset)) val_size = int(self.val_ratio * len(self.dataset)) test_size = len(self.dataset) - val_size - train_size train_subset = TimeseriesSubset(self.dataset, indices[0:train_size]) if self.uniform_eval: val_subset = TimeseriesSubset( self.dataset, indices[train_size - self.window: (val_size + train_size)] ) else: val_subset = TimeseriesSubset( self.dataset,indices[train_size: (val_size + train_size)] ) if self.uniform_eval: test_subset = TimeseriesSubset(self.dataset, indices[-test_size - self.window:]) else: test_subset = TimeseriesSubset(self.dataset, indices[-test_size:]) if self.scale_in_train: self.scaler.fit(train_subset.data) else: self.scaler.fit(self.dataset.data) self.train_dataset = MaskTimeFeatureSet( train_subset, scaler=self.scaler, time_enc=self.time_enc, window=self.window, freq=self.freq, scaler_fit=False, mask_rate=self.mask_rate ) self.val_dataset = MaskTimeFeatureSet( val_subset, scaler=self.scaler, time_enc=self.time_enc, window=self.window, freq=self.freq, scaler_fit=False, mask_rate=self.mask_rate ) self.test_dataset = MaskTimeFeatureSet( test_subset, scaler=self.scaler, time_enc=self.time_enc, window=self.window, freq=self.freq, scaler_fit=False, mask_rate=self.mask_rate ) def _load_dataloader(self): self.train_size = len(self.train_dataset) self.val_size = len(self.val_dataset) self.test_size = len(self.test_dataset) self.train_loader = DataLoader( self.train_dataset, batch_size=self.batch_size, shuffle=self.shuffle_train, num_workers=self.num_worker, ) self.val_loader = DataLoader( self.val_dataset, batch_size=self.batch_size, shuffle=False, num_workers=self.num_worker, ) self.test_loader = DataLoader( self.test_dataset, batch_size=self.batch_size, shuffle=False, num_workers=self.num_worker, )