model.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455
  1. """ Recommender model and utility methods to train, predict and evaluate """
  2. import functools
  3. import json
  4. import logging
  5. import os
  6. from tensorflow.python.keras import backend as K
  7. from tensorflow.python.keras.callbacks import EarlyStopping, ModelCheckpoint
  8. from tensorflow.python.keras.layers import concatenate, Dense, Embedding, Input, Flatten, Layer
  9. from tensorflow.python.keras.models import Model
  10. from tensorflow.python.keras.optimizers import Adam, SGD
  11. from tensorflow.python.keras.regularizers import l2
DEFAULT_PARAMS = {  # Just some toy params to test the code
    # model:
    "num_users": 5,                 # size of the user embedding vocabulary
    "num_items": 10,                # size of the item embedding vocabulary
    "layers_sizes": [5, 4],         # width of each MLP layer; first entry is the total (user+item) embedding size
    "layers_l2reg": [0.01, 0.01],   # L2 regularization factor per layer; must match layers_sizes in length
    # training:
    "optimizer": "adam",            # one of OPTIMIZERS below
    "lr": 0.001,
    "beta_1": 0.9,                  # Adam only (optional)
    "beta_2": 0.999,                # Adam only (optional)
    "batch_size": 6,                # must be divisible by (num_negs_per_pos + 1)
    "num_negs_per_pos": 2,          # negative samples per positive example (training)
    "batch_size_eval": 12,          # must be divisible by (num_negs_per_pos_eval + 1)
    "num_negs_per_pos_eval": 5,     # negative samples per positive example (evaluation)
    "k": 3                          # cut-off for top-k ranking metrics (HR@k, DCG@k)
}
# Supported optimizer identifiers for the "optimizer" param.
ADAM_NAME = "adam"
SGD_NAME = "sgd"
OPTIMIZERS = [ADAM_NAME, SGD_NAME]
# Metric short names (used as Keras metric function names).
HIT_RATE = "hr"
DCG = "dcg"
# Names of the model's output layers.
OUTPUT_PRED = "output"
OUTPUT_RANK = "rank"
# Name Keras gives the validation DCG metric of the prediction output;
# used by the EarlyStopping/ModelCheckpoint callbacks as the monitored metric.
METRIC_VAL_DCG = "val_{}_{}".format(OUTPUT_PRED, DCG)
  37. class MovierecModel(object):
  38. """
  39. Movie Recommendation Model
  40. """
  41. def __init__(self, params=DEFAULT_PARAMS, model_name='movierec', output_dir="models/", verbose=1):
  42. """
  43. Create a movie recommendation model.
  44. Parameters
  45. ----------
  46. params : dict of param names (str) to values (any type)
  47. Dictionary of model hyper parameters. Default: `DEFAULT_PARAMS`.
  48. model_name : str
  49. Name of the model. Used in `Model` instance and to save model files.
  50. output_dir : str or `os.path`
  51. Output directory to save model files.
  52. verbose : int
  53. Verbosity mode.
  54. """
  55. # Save params into internal attributes and perform some basic verifications.
  56. # Dict params` is not saved to encourage detection of common input errors (missing keys, wrong values)
  57. # as early as possible.
  58. self._num_users = params["num_users"]
  59. self._num_items = params["num_items"]
  60. self._layers_sizes = params["layers_sizes"]
  61. self._layers_l2reg = params["layers_l2reg"]
  62. if len(self._layers_sizes) != len(self._layers_l2reg):
  63. raise ValueError("'layers_sizes' length = {}, 'layers_l2reg' length = {}, but must be equal."
  64. .format(len(self._layers_sizes), len(self._layers_l2reg)))
  65. self._num_layers = len(self._layers_sizes)
  66. # params to compile
  67. self._optimizer = params["optimizer"]
  68. if self._optimizer not in OPTIMIZERS:
  69. raise NotImplementedError("Optimizer {} is not implemented.".format(params["optimizer"]))
  70. self._lr = params["lr"]
  71. self._beta_1 = params.get("beta_1", 0.9) # optional, for Adam optimizer, default from paper
  72. self._beta_2 = params.get("beta_2", 0.999) # optional, for Adam optimizer, default from paper
  73. self._batch_size = params["batch_size"]
  74. self._num_negs_per_pos = params["num_negs_per_pos"]
  75. if self._num_negs_per_pos <= 0:
  76. raise ValueError("num_negs_per_pos must be > 0, found {}".format(self._num_negs_per_pos))
  77. if self._batch_size % (self._num_negs_per_pos + 1):
  78. raise ValueError("Batch size must be divisible by (num_negs_per_pos + 1). Found: batch_size={}, "
  79. "num_negs_per_pos={}".format(self._batch_size, self._num_negs_per_pos))
  80. self._batch_size_eval = params["batch_size_eval"]
  81. self._num_negs_per_pos_eval = params["num_negs_per_pos_eval"]
  82. if self._num_negs_per_pos_eval <= 0:
  83. raise ValueError("num_negs_per_pos_eval must be > 0, found {}".format(self._num_negs_per_pos_eval))
  84. if self._batch_size_eval % (self._num_negs_per_pos_eval + 1):
  85. raise ValueError("Batch size (eval) must be divisible by (num_negs_per_pos_eval + 1). Found: "
  86. "batch_size_eval={}, num_negs_per_pos_eval={}".format(self._batch_size_eval,
  87. self._num_negs_per_pos_eval))
  88. self._k = params.get("k", self._num_negs_per_pos + 1) # optional
  89. if self._k > (self._num_negs_per_pos + 1):
  90. raise ValueError("'k' must be lower than (num_negs_per_pos + 1) and lower than (num_negs_per_pos_eval + 1)."
  91. "Found: k={}, num_negs_per_pos={}, num_negs_per_pos_eval={}"
  92. .format(self._k, self._num_negs_per_pos, self._num_negs_per_pos_eval))
  93. # Create output dir and get file names
  94. try:
  95. os.makedirs(output_dir)
  96. except FileExistsError:
  97. # directory already exists
  98. pass
  99. self.name = model_name
  100. self._model_weights_path = self.get_model_weights_path(output_dir, model_name)
  101. self._params_path = self.get_params_json_path(output_dir, model_name)
  102. # serialize params for later (should just be used to save to file)
  103. self._serialized_params = json.dumps(params)
  104. self._output_model_checkpoints = os.path.join(
  105. output_dir, "{}-checkpoint-{{epoch:02d}}-{{val_loss:.2f}}.h5".format(model_name))
  106. self.verbose = verbose
  107. # Build model and compile
  108. self.model = self.build_mlp_model()
  109. self.compile_model()
  110. def build_mlp_model(self):
  111. """
  112. Build a MLP (Multi Layer Perceptron) model with the following architecture:
  113. Input: 2 matrices of size (batch, num_users), (batch, num_items)
  114. First Layer: [ User Embedding ][ Item Embedding ]
  115. N times: [ Hidden Dense Layer ]
  116. Last Layer: [ Prediction Layer ]
  117. Output: vector of size (batch, 1)
  118. Returns
  119. -------
  120. model: `Model`
  121. Keras MLP model.
  122. """
  123. # Inputs
  124. user_input = Input(shape=(1,), dtype="int32", name="user_input")
  125. item_input = Input(shape=(1,), dtype="int32", name="item_input")
  126. # First layer is the concatenation of embeddings for users and items
  127. # (size of each is about half of layers_sizes[0])
  128. user_layer_size = self._layers_sizes[0] // 2
  129. item_layer_size = self._layers_sizes[0] - user_layer_size # in case layers_sizes[0] % 2 != 0
  130. user_embedding = Embedding(
  131. input_dim=self._num_users, output_dim=user_layer_size, input_length=1,
  132. embeddings_initializer="glorot_uniform", embeddings_regularizer=l2(self._layers_l2reg[0]),
  133. name="user_embedding"
  134. )
  135. item_embedding = Embedding(
  136. input_dim=self._num_items, output_dim=item_layer_size, input_length=1,
  137. embeddings_initializer="glorot_uniform", embeddings_regularizer=l2(self._layers_l2reg[0]),
  138. name="item_embedding"
  139. )
  140. mlp_layer = concatenate([Flatten()(user_embedding(user_input)),
  141. Flatten()(item_embedding(item_input))])
  142. # Hidden layers
  143. for layer_i in range(1, self._num_layers):
  144. hidden = Dense(
  145. units=self._layers_sizes[layer_i], activation="relu",
  146. kernel_initializer="glorot_uniform", kernel_regularizer=l2(self._layers_l2reg[layer_i]),
  147. name="hidden_{}".format(layer_i)
  148. )
  149. mlp_layer = hidden(mlp_layer)
  150. # Prediction layer
  151. pred_layer = Dense(
  152. units=1, activation="sigmoid",
  153. kernel_initializer="lecun_uniform", name=OUTPUT_PRED
  154. )
  155. output_pred = pred_layer(mlp_layer)
  156. rank_layer = RankLayer(self._num_negs_per_pos, self._num_negs_per_pos_eval, name=OUTPUT_RANK)
  157. rank = rank_layer(output_pred)
  158. # Create Model
  159. model = Model([user_input, item_input], [output_pred, rank])
  160. return model
  161. def compile_model(self):
  162. # Set optimizer
  163. if self._optimizer == ADAM_NAME:
  164. optimizer = Adam(self._lr, self._beta_1, self._beta_2)
  165. elif self._optimizer == SGD_NAME:
  166. optimizer = SGD(self._lr)
  167. else:
  168. raise NotImplementedError("Optimizer {} is not implemented.".format(self._optimizer))
  169. # Create metrics
  170. hit_rate_fn = functools.partial(hit_rate, k=self._k, pred_rank_idx=self.get_pred_rank())
  171. hit_rate_fn.__name__ = HIT_RATE
  172. dcg_fn = functools.partial(discounted_cumulative_gain, k=self._k, pred_rank_idx=self.get_pred_rank())
  173. dcg_fn.__name__ = DCG
  174. # Compile model
  175. self.model.compile(optimizer=optimizer,
  176. loss={OUTPUT_PRED: "binary_crossentropy"},
  177. metrics={OUTPUT_PRED: [hit_rate_fn, dcg_fn]})
  178. @staticmethod
  179. def get_model_weights_path(output_dir, model_name):
  180. return os.path.join(output_dir, "{}_weights.h5".format(model_name))
  181. @staticmethod
  182. def get_params_json_path(output_dir, model_name):
  183. return os.path.join(output_dir, "{}_params.json".format(model_name))
  184. def get_pred_rank(self):
  185. """
  186. Get output of rank layer.
  187. Returns
  188. -------
  189. output : `tf.Tensor`
  190. Output of rank layer.
  191. """
  192. return self.model.get_layer(OUTPUT_RANK).output
  193. def log_summary(self):
  194. self.model.summary(print_fn=logging.info)
  195. def save(self):
  196. """
  197. Save params and weights to files.
  198. """
  199. # Save model weights and serialized model class instance.
  200. self.model.save_weights(self._model_weights_path)
  201. logging.info('Model weights saved to: {}'.format(self._model_weights_path))
  202. with open(self._params_path, 'w') as f_out:
  203. f_out.write(self._serialized_params)
  204. logging.info('Model params saved to: {}'.format(self._params_path))
  205. @staticmethod
  206. def load_from_dir(model_dir, model_name, verbose=1):
  207. """
  208. Load `MovierecModel` from directory and model name. File names are built like when saving in a
  209. `MovierecModel` instance.
  210. Parameters
  211. ----------
  212. model_dir : str or `os.path`
  213. Directory where model files are. Also used as output directory to create the MovierecModel object,
  214. in case `save` is called.
  215. model_name : str or `os.path`
  216. Model name, used to build model file name. Also used to create a MovierecModel object, in case `save`
  217. is called.
  218. verbose : int
  219. Verbosity level.
  220. Returns
  221. -------
  222. MovierecModel object.
  223. """
  224. params_path = MovierecModel.get_params_json_path(model_dir, model_name)
  225. weights_path = MovierecModel.get_model_weights_path(model_dir, model_name)
  226. return MovierecModel.load_from_files(params_path, weights_path, model_dir, model_name, verbose)
  227. @staticmethod
  228. def load_from_files(params_path, weights_path, output_model_dir, output_model_name, verbose=1):
  229. """
  230. Load `MovierecModel` from param and weight files.
  231. Parameters
  232. ----------
  233. params_path : str or `os.path`
  234. Path of params json file.
  235. weights_path : str or `os.path`
  236. Path of weights h5 file.
  237. output_model_dir : str or `os.path`
  238. Output directory, needed to create a MovierecModel object, in case `save` is called.
  239. output_model_name : str or `os.path`
  240. Model name, needed to create a MovierecModel object, in case `save` is called.
  241. verbose : int
  242. Verbosity level.
  243. Returns
  244. -------
  245. MovierecModel object.
  246. """
  247. with open(params_path, 'r') as f_in:
  248. params = json.load(f_in)
  249. print(params)
  250. movierec = MovierecModel(params, output_model_dir, output_model_name, verbose)
  251. movierec.model.load_weights(weights_path)
  252. return movierec
  253. def fit_generator(self, train_data_generator, validation_data_generator, epochs):
  254. """
  255. Call keras 'fit_generator' on the model with early stopping and checkpoint callbacks.
  256. Parameters
  257. ----------
  258. train_data_generator : A generator or a `keras.utils.Sequence`
  259. Generator of training data.
  260. validation_data_generator : A generator or a `keras.utils.Sequence`
  261. Generator of validation data.
  262. epochs : int
  263. Number of epochs.
  264. Returns
  265. -------
  266. Output of model.fit_generator(...) (`History` object)
  267. """
  268. # Callbacks
  269. callbacks = [
  270. EarlyStopping(monitor=METRIC_VAL_DCG, mode='max', restore_best_weights=True, patience=5,
  271. verbose=self.verbose),
  272. ModelCheckpoint(self._output_model_checkpoints, monitor=METRIC_VAL_DCG, save_best_only=True,
  273. save_weights_only=False, mode='max', verbose=self.verbose)
  274. ]
  275. return self.model.fit_generator(generator=train_data_generator,
  276. validation_data=validation_data_generator,
  277. epochs=epochs,
  278. callbacks=callbacks,
  279. verbose=self.verbose)
class RankLayer(Layer):
    """
    Keras layer that turns a flat vector of predicted scores into per-user
    ranked item indices.

    The input is assumed to be laid out as consecutive groups of
    (num_negs_per_pos + 1) scores per user — TODO confirm against the data
    generators. The group size differs between training and evaluation, so the
    layer picks the right one from the Keras learning phase.
    """

    def __init__(self, num_negs_per_pos_train, num_negs_per_pos_eval, name, **kwargs):
        # num_negs_per_pos_train / num_negs_per_pos_eval: negatives sampled per
        # positive example in training / evaluation batches respectively.
        super(RankLayer, self).__init__(name=name, **kwargs)
        self.num_negs_per_pos_train = num_negs_per_pos_train
        self.num_negs_per_pos_eval = num_negs_per_pos_eval
        # Output depends on train vs. eval phase (different group sizes).
        self._uses_learning_phase = True

    def call(self, inputs, **kwargs):
        # NOTE(review): relies on TF-internal re-exports on the backend module
        # (K.ops, K.nn) — tied to this tensorflow.python.keras version.
        inputs = K.ops.convert_to_tensor(inputs)
        # Pick the per-user group size for the current phase (train vs. eval).
        num_negs_per_pos = K.in_train_phase(self.num_negs_per_pos_train, self.num_negs_per_pos_eval)
        # Reshape and get ranked indices per user
        y_pred_per_user = K.reshape(inputs, (-1, num_negs_per_pos + 1))
        # Full sort (k = row width): indices of items ordered by descending score.
        _, indices = K.nn.top_k(y_pred_per_user, K.shape(y_pred_per_user)[1], sorted=True)
        return indices

    def get_config(self):
        # Serialize the two group-size params so the layer can be reloaded.
        config = {'num_negs_per_pos_train': self.num_negs_per_pos_train,
                  'num_negs_per_pos_eval': self.num_negs_per_pos_eval}
        base_config = super(RankLayer, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))
  298. def hit_rate(y_true, _, k, pred_rank_idx):
  299. """
  300. Compute HR (Hit Rate) in batch considering only the top 'k' items in the rank.
  301. Parameters
  302. ----------
  303. y_true : `tf.Tensor` (or `np.array`)
  304. True labels. For every (`num_negs_per_pos` + 1) items, there should be only one positive class (+1)
  305. and the rest are negative (0).
  306. _
  307. Placeholder for y_pred. Ignored argument that will be passed by keras metrics, but this method will only
  308. use pred_rank_idx.
  309. k : int
  310. Number of top elements to consider for the metric computation.
  311. pred_rank_idx : `tf.Tensor`(or `np.array`) of integers. Shape: (users per batch, num_negs_per_pos + 1)
  312. Tensor representing a ranking. Each row represents a single user and contains (num_negs_per_pos + 1) elements
  313. with the ranked indexes of the items in the row.
  314. Returns
  315. -------
  316. hit rate: `tf.Tensor`
  317. A single value tensor with the hit rate for the batch.
  318. """
  319. hits_per_user, _ = _get_hits_per_user(y_true, pred_rank_idx, k)
  320. return K.mean(hits_per_user, axis=-1)
  321. def discounted_cumulative_gain(y_true, _, k, pred_rank_idx):
  322. """
  323. Compute DCG (Discounted Cumulative Gain) considering only the top 'k' items in the rank.
  324. Parameters
  325. ----------
  326. y_true : `tf.Tensor` (or `np.array`)
  327. True labels. For every (`num_negs_per_pos` + 1) items, there should be only one positive class (+1)
  328. and the rest are negative (0).
  329. _
  330. Placeholder for y_pred. Ignored argument that will be passed by keras metrics, but this method will only
  331. use pred_rank_idx.
  332. k : int
  333. Number of top elements to consider for the metric computation.
  334. pred_rank_idx : `tf.Tensor`(or `np.array`) of integers. Shape: (users per batch, num_negs_per_pos + 1)
  335. Tensor representing a ranking. Each row represents a single user and contains (num_negs_per_pos + 1) elements
  336. with the ranked indexes of the items in the row.
  337. Returns
  338. -------
  339. discounted cumulative gain: `tf.Tensor`
  340. A single value tensor with the average Discounted Cumulative Gain on the top k for the batch.
  341. """
  342. hits_per_user, idx_label_in_pred_rank = _get_hits_per_user(y_true, pred_rank_idx, k)
  343. # compute dcg for each item, but make 0.0 the entries where position is > k (only consider top k)
  344. dcg_per_user = K.math_ops.log(2.) / K.math_ops.log(K.cast(idx_label_in_pred_rank, "float32") + 2)
  345. dcg_per_user *= hits_per_user
  346. return K.mean(dcg_per_user, axis=-1)
def _get_hits_per_user(y_true, pred_rank_idx, k):
    """
    Compute the position of the label in the predicted ranking and whether is a hit on top k or not.

    Parameters
    ----------
    y_true : `tf.Tensor` (or `np.array`)
        True labels. For every (`num_negs_per_pos` + 1) items, there should be only one positive class (+1)
        and the rest are negative (0).
    pred_rank_idx : `tf.Tensor`(or `np.array`) of integers. Shape: (users per batch, num_negs_per_pos + 1)
        Tensor representing a ranking. Each row represents a single user and contains (num_negs_per_pos + 1) elements
        with the ranked indexes of the items in the row.
    k : int
        Number of top elements to consider for the metric computation.

    Returns
    -------
    Tuple: (hits_per_user, idx_label_in_pred_rank)
    hits_per_user : `tf.Tensor` with shape (users per batch, )
        Tensor of floats where elements are 1.0 if there is a hit (label is in top k) for that user, or
        0.0 otherwise.
    idx_label_in_pred_rank : `tf.Tensor` with shape (users per batch, )
        Tensor of integers with the index of the label in the predicted rank.
    """
    # Get the index of the positive label per user.
    # Assume that every user has num_neg_per_pos negatives (zeros) and one positive (1).
    # Reshape flat labels to (users, num_negs_per_pos + 1) to match the rank tensor.
    idx_label_per_user = K.reshape(y_true, K.shape(pred_rank_idx))
    # argmax finds the single positive (1) within each user's row.
    idx_label_per_user = K.math_ops.argmax(idx_label_per_user, axis=-1, output_type="int32")
    # get the position of the expected label in the ranked list and compute whether is a hit in top k
    # NOTE(review): `where` on the equality mask yields (row, col) coordinates; the
    # last column is the label's rank position. Assumes exactly one match per row.
    idx_label_in_pred_rank = K.array_ops.where(K.equal(pred_rank_idx, K.reshape(idx_label_per_user, (-1, 1))))[:, -1]
    # determine whether the label is in top k of ranking or not
    hits_per_user = K.cast(K.less(idx_label_in_pred_rank, k), "float32")
    return hits_per_user, idx_label_in_pred_rank