data_pipeline.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. """ Download dataset, preprocess data and provide utility methods to manage the dataset for training and evaluation """
  2. import logging
  3. import numpy as np
  4. import os
  5. from tensorflow.python.keras.utils import Sequence
  6. from util.movielens_utils import load_ratings_data
  7. import util.movielens_utils as ml
# Column names shared by the ratings DataFrames handled in this module.
COL_USER_ID = 'userId'  # column holding the user identifier
COL_ITEM_ID = 'itemId'  # column holding the item identifier
COL_RATING = 'rating'  # column holding the rating given by the user to the item
COL_LABEL = 'label'  # label column name; not referenced in the code visible here
  12. class MovieLensDataGenerator(Sequence):
  13. def __init__(self, dataset_name, data_df, batch_size, negatives_per_positive, extra_data_df=None, shuffle=True):
  14. # TODO yield last incomplete batch (optional)
  15. """
  16. Create a generator that provides batches of Movielens data for training or testing purposes.
  17. Every batch contains positive and negative examples. The positive examples are taken from `data_df`.
  18. The negative examples are generated for every batch by picking a random list of items per user that are not
  19. present in either `data_df` or `extra_data_df`.
  20. Parameters
  21. ----------
  22. dataset_name : str
  23. Movielens dataset name. Must be one of MOVIELENS_DATASET_NAMES.
  24. data_df : `Dataframe`
  25. Data to build the generator from. It is a Movielens dataset containing users, items and ratings.
  26. batch_size : int
  27. Batch size to yield data. Must be divisible by (negatives_per_positive + 1)
  28. negatives_per_positive : int
  29. Number of negatives examples to generate for every positive example in a batch. Batch size must be divisible
  30. by (negatives_per_positive + 1).
  31. extra_data_df : `DataFrame`
  32. Optional dataframe to be used when computing negatives. Negative items for a user are those that do not
  33. exist for that user in 'data_df' or 'extra_data_df'. The data of this is not directly provided by the
  34. generator.
  35. shuffle : bool
  36. Whether to shuffle the data_df between epochs. Note that the negative examples are randomly generated for
  37. every batch, so, even when `shuffle` is False, the batches will be different every time (positives will be
  38. equal, but negatives will be different).
  39. """
  40. if dataset_name not in ml.MOVIELENS_DATASET_NAMES:
  41. raise ValueError('Invalid dataset name {}. Must be one of {}'
  42. .format(dataset_name, ', '.join(ml.MOVIELENS_DATASET_NAMES)))
  43. if negatives_per_positive <= 0:
  44. raise ValueError("negatives_per_positive must be > 0, found {}".format(negatives_per_positive))
  45. if batch_size % (negatives_per_positive + 1):
  46. raise ValueError("Batch size must be divisible by (negatives_per_positive + 1). Found: batch_size={}, "
  47. "negatives_per_positive={}".format(batch_size, negatives_per_positive))
  48. self._dataset_name = dataset_name
  49. self._num_users = ml.NUM_USERS[dataset_name]
  50. self._num_items = ml.NUM_ITEMS[dataset_name]
  51. self.data = data_df
  52. self.extra_data = extra_data_df
  53. self.batch_size = batch_size
  54. self.negatives_per_positive = negatives_per_positive
  55. self.num_positives_per_batch = self.batch_size // (negatives_per_positive + 1)
  56. self.num_negatives_per_batch = self.batch_size - self.num_positives_per_batch
  57. self.shuffle = shuffle
  58. self.indexes = np.arange(len(self.data))
  59. self.on_epoch_end()
  60. logging.info('Created generator for {}. Num users={}, num items={}, num_batches={}, batch size={}, '
  61. 'positives per batch={}, negatives per batch={}'
  62. .format(dataset_name, self._num_users, self._num_items, len(self), batch_size,
  63. self.num_positives_per_batch, self.num_negatives_per_batch))
  64. @property
  65. def num_users(self):
  66. return self._num_users
  67. @property
  68. def num_items(self):
  69. return self._num_items
  70. @property
  71. def dataset_name(self):
  72. return self._dataset_name
  73. def __len__(self):
  74. """
  75. Number of batches in the Sequence.
  76. Returns
  77. -------
  78. The number of batches.
  79. """
  80. return int(np.floor(len(self.indexes) / self.batch_size))
  81. def _get_random_negatives_and_positive(self, row):
  82. # Given a DataFrame row, generate an array of random negative items and the positive one at the end.
  83. user = row[COL_USER_ID]
  84. positives_user = self.data[self.data[COL_USER_ID] == user][COL_ITEM_ID]
  85. if self.extra_data is not None:
  86. positives_user = positives_user.append(self.extra_data[self.extra_data[COL_USER_ID] == user][COL_ITEM_ID])
  87. # obtain possible negatives
  88. possible_negs = np.setdiff1d(np.arange(self.num_items), positives_user, assume_unique=True)
  89. # select randomly, without replacement, if possible
  90. replace = len(possible_negs) < self.negatives_per_positive
  91. negative_items = np.random.choice(possible_negs, self.negatives_per_positive, replace=replace)
  92. return np.append(negative_items, int(row[COL_ITEM_ID]))
  93. def __getitem__(self, idx):
  94. """
  95. Generate and return a batch of data. A batch contains a ratio of 1:self.negatives_per_positive of positive
  96. to negative examples per user. The positive examples are taken from self.data. The negative examples are
  97. generated by this method by picking a random list of items per user that are not present in either `self.data`
  98. or `self.extra_data`. The random selection is performed without replacement, when possible (if num items to be
  99. selected < num possible items to select).
  100. Parameters
  101. ----------
  102. idx : int
  103. Batch index. Must be between 0 and len(self)-1.
  104. Returns
  105. -------
  106. X, Y: Tuple of type (List of 2 np.arrays, np.arrays)
  107. A batch of data. `X` are the inputs: an array of users and an array of items. `Y` is an array of labels.
  108. All 3 arrays have size `self.batch_size`.
  109. """
  110. # Get indexes of the positive examples for this batch:
  111. idxs_pos = self.indexes[idx * self.num_positives_per_batch:(idx + 1) * self.num_positives_per_batch]
  112. # Get the positives
  113. positives = self.data.iloc[idxs_pos]
  114. # users are repeated to include negatives
  115. x_user = np.repeat(positives[COL_USER_ID].values, 1 + self.negatives_per_positive)
  116. # items: for every positive, create array of random negatives and the positive at the end
  117. items_with_negatives = positives.apply(self._get_random_negatives_and_positive, axis=1)
  118. x_item = np.concatenate(items_with_negatives.values)
  119. # labels: first negative labels, then one positive (N times)
  120. y = np.tile([0] * self.negatives_per_positive + [1], self.num_positives_per_batch)
  121. return [x_user, x_item], y
  122. def on_epoch_end(self):
  123. if self.shuffle:
  124. np.random.shuffle(self.indexes)
  125. def load_ratings_train_test_sets(dataset_name, data_dir, download=True):
  126. """
  127. Load Movielens ratings dataset from a file. Optionally download it first if it does not exist.
  128. Split the dataset in train, validation and test sets.
  129. Parameters
  130. ----------
  131. dataset_name : str
  132. Movielens dataset name. Must be one of MOVIELENS_DATASET_NAMES.
  133. data_dir : str or os.path
  134. Dataset directory to read from. The file to read from the directory will be:
  135. data_dir/dataset_name/RATINGS_FILE_NAME[dataset_name].
  136. download : boolean
  137. Download and extract Movielens dataset if it does not exist in the 'data_dir'. Default=True.
  138. Returns
  139. -------
  140. (train, validation, test) : (DataFrame)
  141. Dataset split into 3 DataFrames with columns COL_USER_ID, COL_ITEM_ID, COL_RATING.
  142. """
  143. if dataset_name not in ml.MOVIELENS_DATASET_NAMES:
  144. raise ValueError('Invalid dataset name {}. Must be one of {}'
  145. .format(dataset_name, ', '.join(ml.MOVIELENS_DATASET_NAMES)))
  146. # Load dataset in a dataframe (maybe download dataset first)
  147. ratings_df = load_ratings_data(data_dir, dataset_name, COL_USER_ID, COL_ITEM_ID, COL_RATING, download)
  148. # TODO: log a small summary, check for duplicates?
  149. # TODO: implement other splitting options. Add more params for splits
  150. # split in train, validation and test by taking the out the first rating of every user as test, the rest is train
  151. # (every user in movielens has at least 20 items rated)
  152. grouped_by_user = ratings_df.groupby(COL_USER_ID, group_keys=False)
  153. test_ratings_df = grouped_by_user.apply(lambda x: x.iloc[[-1]])
  154. validation_df = grouped_by_user.apply(lambda x: x.iloc[[-2]])
  155. train_ratings_df = grouped_by_user.apply(lambda x: x.iloc[:-2])
  156. # reset the indexes as they are kept from the original dataframe
  157. test_ratings_df = test_ratings_df.reset_index(drop=True)
  158. validation_df = validation_df.reset_index(drop=True)
  159. train_ratings_df = train_ratings_df.reset_index(drop=True)
  160. return train_ratings_df, validation_df, test_ratings_df