123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455 |
- """ Recommender model and utility methods to train, predict and evaluate """
- import functools
- import json
- import logging
- import os
- from tensorflow.python.keras import backend as K
- from tensorflow.python.keras.callbacks import EarlyStopping, ModelCheckpoint
- from tensorflow.python.keras.layers import concatenate, Dense, Embedding, Input, Flatten, Layer
- from tensorflow.python.keras.models import Model
- from tensorflow.python.keras.optimizers import Adam, SGD
- from tensorflow.python.keras.regularizers import l2
# Toy default hyper parameters, just to exercise the code quickly (not tuned for quality).
DEFAULT_PARAMS = {
    # model:
    "num_users": 5,                # number of distinct users (user embedding input_dim)
    "num_items": 10,               # number of distinct items (item embedding input_dim)
    "layers_sizes": [5, 4],        # units per MLP layer; layer 0 is split between user/item embeddings
    "layers_l2reg": [0.01, 0.01],  # one L2 regularization factor per layer (must match layers_sizes length)
    # training:
    "optimizer": "adam",           # must be one of OPTIMIZERS
    "lr": 0.001,
    "beta_1": 0.9,                 # Adam only; defaults from the Adam paper
    "beta_2": 0.999,               # Adam only; defaults from the Adam paper
    "batch_size": 6,               # must be divisible by (num_negs_per_pos + 1)
    "num_negs_per_pos": 2,         # negative samples per positive example (training)
    "batch_size_eval": 12,         # must be divisible by (num_negs_per_pos_eval + 1)
    "num_negs_per_pos_eval": 5,    # negative samples per positive example (evaluation)
    "k": 3                         # rank cutoff for top-k metrics (HR@k, DCG@k)
}

# Supported optimizer identifiers.
ADAM_NAME = "adam"
SGD_NAME = "sgd"
OPTIMIZERS = [ADAM_NAME, SGD_NAME]

# Short metric names used as Keras metric function names.
HIT_RATE = "hr"
DCG = "dcg"

# Names of the model's two output layers (prediction score and per-user rank).
OUTPUT_PRED = "output"
OUTPUT_RANK = "rank"

# Full Keras name of the validation DCG metric ("val_output_dcg"); monitored by callbacks.
METRIC_VAL_DCG = "val_{}_{}".format(OUTPUT_PRED, DCG)
class MovierecModel(object):
    """
    Movie Recommendation Model.

    Builds and compiles a Keras MLP recommender with two inputs (user id, item id),
    a sigmoid prediction output and an auxiliary per-user rank output used to
    compute top-k ranking metrics (hit rate and DCG). Provides save/load helpers
    and generator-based training with early stopping and checkpointing.
    """

    def __init__(self, params=None, model_name='movierec', output_dir="models/", verbose=1):
        """
        Create a movie recommendation model.

        Parameters
        ----------
        params : dict of param names (str) to values (any type), optional
            Dictionary of model hyper parameters. Default: `DEFAULT_PARAMS`.
            (`None` is used as the default sentinel to avoid sharing a mutable
            default dict across calls.)
        model_name : str
            Name of the model. Used in `Model` instance and to save model files.
        output_dir : str or `os.path`
            Output directory to save model files.
        verbose : int
            Verbosity mode.

        Raises
        ------
        ValueError
            If 'layers_sizes' and 'layers_l2reg' lengths differ, a batch size is
            not divisible by its (num_negs_per_pos + 1), num_negs_per_pos values
            are not positive, or 'k' exceeds a rank group size.
        NotImplementedError
            If the optimizer name is not in `OPTIMIZERS`.
        """
        if params is None:
            params = DEFAULT_PARAMS

        # Save params into internal attributes and perform some basic verifications.
        # Dict `params` is not saved to encourage detection of common input errors
        # (missing keys, wrong values) as early as possible.
        self._num_users = params["num_users"]
        self._num_items = params["num_items"]
        self._layers_sizes = params["layers_sizes"]
        self._layers_l2reg = params["layers_l2reg"]
        if len(self._layers_sizes) != len(self._layers_l2reg):
            raise ValueError("'layers_sizes' length = {}, 'layers_l2reg' length = {}, but must be equal."
                             .format(len(self._layers_sizes), len(self._layers_l2reg)))
        self._num_layers = len(self._layers_sizes)

        # params to compile
        self._optimizer = params["optimizer"]
        if self._optimizer not in OPTIMIZERS:
            raise NotImplementedError("Optimizer {} is not implemented.".format(params["optimizer"]))
        self._lr = params["lr"]
        self._beta_1 = params.get("beta_1", 0.9)    # optional, for Adam optimizer, default from paper
        self._beta_2 = params.get("beta_2", 0.999)  # optional, for Adam optimizer, default from paper

        # Training batches are grouped per user: each positive is followed by
        # num_negs_per_pos negatives, hence the divisibility requirements below.
        self._batch_size = params["batch_size"]
        self._num_negs_per_pos = params["num_negs_per_pos"]
        if self._num_negs_per_pos <= 0:
            raise ValueError("num_negs_per_pos must be > 0, found {}".format(self._num_negs_per_pos))
        if self._batch_size % (self._num_negs_per_pos + 1):
            raise ValueError("Batch size must be divisible by (num_negs_per_pos + 1). Found: batch_size={}, "
                             "num_negs_per_pos={}".format(self._batch_size, self._num_negs_per_pos))

        self._batch_size_eval = params["batch_size_eval"]
        self._num_negs_per_pos_eval = params["num_negs_per_pos_eval"]
        if self._num_negs_per_pos_eval <= 0:
            raise ValueError("num_negs_per_pos_eval must be > 0, found {}".format(self._num_negs_per_pos_eval))
        if self._batch_size_eval % (self._num_negs_per_pos_eval + 1):
            raise ValueError("Batch size (eval) must be divisible by (num_negs_per_pos_eval + 1). Found: "
                             "batch_size_eval={}, num_negs_per_pos_eval={}".format(self._batch_size_eval,
                                                                                   self._num_negs_per_pos_eval))

        self._k = params.get("k", self._num_negs_per_pos + 1)  # optional
        # BUGFIX: the error message always promised a check against the eval group
        # size too, but only the training group size was verified. k must fit in
        # both rank groups for the top-k metrics to be meaningful.
        if self._k > (self._num_negs_per_pos + 1) or self._k > (self._num_negs_per_pos_eval + 1):
            raise ValueError("'k' must be lower than (num_negs_per_pos + 1) and lower than (num_negs_per_pos_eval + 1)."
                             "Found: k={}, num_negs_per_pos={}, num_negs_per_pos_eval={}"
                             .format(self._k, self._num_negs_per_pos, self._num_negs_per_pos_eval))

        # Create output dir (if missing) and derive output file names.
        os.makedirs(output_dir, exist_ok=True)

        self.name = model_name
        self._model_weights_path = self.get_model_weights_path(output_dir, model_name)
        self._params_path = self.get_params_json_path(output_dir, model_name)
        # serialize params for later (should just be used to save to file)
        self._serialized_params = json.dumps(params)
        self._output_model_checkpoints = os.path.join(
            output_dir, "{}-checkpoint-{{epoch:02d}}-{{val_loss:.2f}}.h5".format(model_name))
        self.verbose = verbose

        # Build model and compile
        self.model = self.build_mlp_model()
        self.compile_model()

    def build_mlp_model(self):
        """
        Build a MLP (Multi Layer Perceptron) model with the following architecture:

        Input: 2 matrices of size (batch, num_users), (batch, num_items)
        First Layer: [ User Embedding ][ Item Embedding ]
        N times: [ Hidden Dense Layer ]
        Last Layer: [ Prediction Layer ]
        Output: vector of size (batch, 1)

        Returns
        -------
        model: `Model`
            Keras MLP model with outputs [prediction, rank].
        """
        # Inputs
        user_input = Input(shape=(1,), dtype="int32", name="user_input")
        item_input = Input(shape=(1,), dtype="int32", name="item_input")

        # First layer is the concatenation of embeddings for users and items
        # (size of each is about half of layers_sizes[0])
        user_layer_size = self._layers_sizes[0] // 2
        item_layer_size = self._layers_sizes[0] - user_layer_size  # in case layers_sizes[0] % 2 != 0
        user_embedding = Embedding(
            input_dim=self._num_users, output_dim=user_layer_size, input_length=1,
            embeddings_initializer="glorot_uniform", embeddings_regularizer=l2(self._layers_l2reg[0]),
            name="user_embedding"
        )
        item_embedding = Embedding(
            input_dim=self._num_items, output_dim=item_layer_size, input_length=1,
            embeddings_initializer="glorot_uniform", embeddings_regularizer=l2(self._layers_l2reg[0]),
            name="item_embedding"
        )
        mlp_layer = concatenate([Flatten()(user_embedding(user_input)),
                                 Flatten()(item_embedding(item_input))])

        # Hidden layers
        for layer_i in range(1, self._num_layers):
            hidden = Dense(
                units=self._layers_sizes[layer_i], activation="relu",
                kernel_initializer="glorot_uniform", kernel_regularizer=l2(self._layers_l2reg[layer_i]),
                name="hidden_{}".format(layer_i)
            )
            mlp_layer = hidden(mlp_layer)

        # Prediction layer
        pred_layer = Dense(
            units=1, activation="sigmoid",
            kernel_initializer="lecun_uniform", name=OUTPUT_PRED
        )
        output_pred = pred_layer(mlp_layer)

        # Auxiliary rank output (no loss attached; used only by the metrics).
        rank_layer = RankLayer(self._num_negs_per_pos, self._num_negs_per_pos_eval, name=OUTPUT_RANK)
        rank = rank_layer(output_pred)

        # Create Model
        model = Model([user_input, item_input], [output_pred, rank])
        return model

    def compile_model(self):
        """
        Compile the model: set up the optimizer, the binary cross-entropy loss on
        the prediction output, and the top-k ranking metrics (HR@k, DCG@k) that
        read the rank layer's output.
        """
        # Set optimizer
        if self._optimizer == ADAM_NAME:
            optimizer = Adam(self._lr, self._beta_1, self._beta_2)
        elif self._optimizer == SGD_NAME:
            optimizer = SGD(self._lr)
        else:
            raise NotImplementedError("Optimizer {} is not implemented.".format(self._optimizer))

        # Create metrics: bind k and the rank tensor; __name__ is required by
        # Keras to label the metric in logs/history.
        hit_rate_fn = functools.partial(hit_rate, k=self._k, pred_rank_idx=self.get_pred_rank())
        hit_rate_fn.__name__ = HIT_RATE
        dcg_fn = functools.partial(discounted_cumulative_gain, k=self._k, pred_rank_idx=self.get_pred_rank())
        dcg_fn.__name__ = DCG

        # Compile model (loss/metrics only on the prediction output; the rank
        # output is metric-only).
        self.model.compile(optimizer=optimizer,
                           loss={OUTPUT_PRED: "binary_crossentropy"},
                           metrics={OUTPUT_PRED: [hit_rate_fn, dcg_fn]})

    @staticmethod
    def get_model_weights_path(output_dir, model_name):
        """Return the path of the weights h5 file for `model_name` under `output_dir`."""
        return os.path.join(output_dir, "{}_weights.h5".format(model_name))

    @staticmethod
    def get_params_json_path(output_dir, model_name):
        """Return the path of the params json file for `model_name` under `output_dir`."""
        return os.path.join(output_dir, "{}_params.json".format(model_name))

    def get_pred_rank(self):
        """
        Get output of rank layer.

        Returns
        -------
        output : `tf.Tensor`
            Output of rank layer.
        """
        return self.model.get_layer(OUTPUT_RANK).output

    def log_summary(self):
        """Log the Keras model summary through the logging module."""
        self.model.summary(print_fn=logging.info)

    def save(self):
        """
        Save params and weights to files.
        """
        # Save model weights and serialized model class instance.
        self.model.save_weights(self._model_weights_path)
        logging.info('Model weights saved to: {}'.format(self._model_weights_path))

        with open(self._params_path, 'w') as f_out:
            f_out.write(self._serialized_params)
        logging.info('Model params saved to: {}'.format(self._params_path))

    @staticmethod
    def load_from_dir(model_dir, model_name, verbose=1):
        """
        Load `MovierecModel` from directory and model name. File names are built like when saving in a
        `MovierecModel` instance.

        Parameters
        ----------
        model_dir : str or `os.path`
            Directory where model files are. Also used as output directory to create the MovierecModel object,
            in case `save` is called.
        model_name : str or `os.path`
            Model name, used to build model file name. Also used to create a MovierecModel object, in case `save`
            is called.
        verbose : int
            Verbosity level.

        Returns
        -------
        MovierecModel object.
        """
        params_path = MovierecModel.get_params_json_path(model_dir, model_name)
        weights_path = MovierecModel.get_model_weights_path(model_dir, model_name)
        return MovierecModel.load_from_files(params_path, weights_path, model_dir, model_name, verbose)

    @staticmethod
    def load_from_files(params_path, weights_path, output_model_dir, output_model_name, verbose=1):
        """
        Load `MovierecModel` from param and weight files.

        Parameters
        ----------
        params_path : str or `os.path`
            Path of params json file.
        weights_path : str or `os.path`
            Path of weights h5 file.
        output_model_dir : str or `os.path`
            Output directory, needed to create a MovierecModel object, in case `save` is called.
        output_model_name : str or `os.path`
            Model name, needed to create a MovierecModel object, in case `save` is called.
        verbose : int
            Verbosity level.

        Returns
        -------
        MovierecModel object.
        """
        with open(params_path, 'r') as f_in:
            params = json.load(f_in)
        logging.debug('Loaded params: %s', params)  # BUGFIX: was a stray debug print()
        # BUGFIX: constructor arguments were swapped (dir was passed as model_name
        # and name as output_dir); __init__ signature is (params, model_name, output_dir, verbose).
        movierec = MovierecModel(params, output_model_name, output_model_dir, verbose)
        movierec.model.load_weights(weights_path)
        return movierec

    def fit_generator(self, train_data_generator, validation_data_generator, epochs):
        """
        Call keras 'fit_generator' on the model with early stopping and checkpoint callbacks.

        Parameters
        ----------
        train_data_generator : A generator or a `keras.utils.Sequence`
            Generator of training data.
        validation_data_generator : A generator or a `keras.utils.Sequence`
            Generator of validation data.
        epochs : int
            Number of epochs.

        Returns
        -------
        Output of model.fit_generator(...) (`History` object)
        """
        # Callbacks: both monitor validation DCG (higher is better, hence mode='max').
        callbacks = [
            EarlyStopping(monitor=METRIC_VAL_DCG, mode='max', restore_best_weights=True, patience=5,
                          verbose=self.verbose),
            ModelCheckpoint(self._output_model_checkpoints, monitor=METRIC_VAL_DCG, save_best_only=True,
                            save_weights_only=False, mode='max', verbose=self.verbose)
        ]
        return self.model.fit_generator(generator=train_data_generator,
                                        validation_data=validation_data_generator,
                                        epochs=epochs,
                                        callbacks=callbacks,
                                        verbose=self.verbose)
class RankLayer(Layer):
    """
    Keras layer that ranks prediction scores within each per-user group.

    The incoming flat score vector is reshaped to one row per user of
    (num_negs_per_pos + 1) scores, and each row is sorted descending; the
    output is the matrix of item indices in ranked order. The group size
    switches between the train and eval values via the Keras learning phase.
    """

    def __init__(self, num_negs_per_pos_train, num_negs_per_pos_eval, name, **kwargs):
        super(RankLayer, self).__init__(name=name, **kwargs)
        # Group sizes differ between training and evaluation batches.
        self.num_negs_per_pos_train = num_negs_per_pos_train
        self.num_negs_per_pos_eval = num_negs_per_pos_eval
        # NOTE(review): presumably set so Keras feeds the learning-phase flag used by
        # K.in_train_phase in call() — confirm against this Keras version's Layer API.
        self._uses_learning_phase = True

    def call(self, inputs, **kwargs):
        # K.ops / K.nn reach into TF modules re-exported by the keras backend
        # module (not public API) — TODO confirm they exist in the pinned TF version.
        inputs = K.ops.convert_to_tensor(inputs)
        # Pick the group size for the current phase (train vs eval).
        num_negs_per_pos = K.in_train_phase(self.num_negs_per_pos_train, self.num_negs_per_pos_eval)
        # Reshape and get ranked indices per user
        y_pred_per_user = K.reshape(inputs, (-1, num_negs_per_pos + 1))
        # Full sort: take top-k with k = row width, sorted descending; keep only the indices.
        _, indices = K.nn.top_k(y_pred_per_user, K.shape(y_pred_per_user)[1], sorted=True)
        return indices

    def get_config(self):
        # Serialize the two group sizes so the layer can be re-created from config.
        config = {'num_negs_per_pos_train': self.num_negs_per_pos_train,
                  'num_negs_per_pos_eval': self.num_negs_per_pos_eval}
        base_config = super(RankLayer, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))
def hit_rate(y_true, _, k, pred_rank_idx):
    """
    Compute HR (Hit Rate) in batch considering only the top 'k' items in the rank.

    Parameters
    ----------
    y_true : `tf.Tensor` (or `np.array`)
        True labels. For every (`num_negs_per_pos` + 1) items, there should be only one positive class (+1)
        and the rest are negative (0).
    _
        Placeholder for y_pred. Ignored argument that will be passed by keras metrics, but this method will only
        use pred_rank_idx.
    k : int
        Number of top elements to consider for the metric computation.
    pred_rank_idx : `tf.Tensor`(or `np.array`) of integers. Shape: (users per batch, num_negs_per_pos + 1)
        Tensor representing a ranking. Each row represents a single user and contains (num_negs_per_pos + 1) elements
        with the ranked indexes of the items in the row.

    Returns
    -------
    hit rate: `tf.Tensor`
        A single value tensor with the hit rate for the batch.
    """
    # A user "hits" when its positive item lands in the top k of the ranking;
    # the batch hit rate is simply the fraction of hitting users.
    per_user_hits = _get_hits_per_user(y_true, pred_rank_idx, k)[0]
    return K.mean(per_user_hits, axis=-1)
def discounted_cumulative_gain(y_true, _, k, pred_rank_idx):
    """
    Compute DCG (Discounted Cumulative Gain) considering only the top 'k' items in the rank.

    Parameters
    ----------
    y_true : `tf.Tensor` (or `np.array`)
        True labels. For every (`num_negs_per_pos` + 1) items, there should be only one positive class (+1)
        and the rest are negative (0).
    _
        Placeholder for y_pred. Ignored argument that will be passed by keras metrics, but this method will only
        use pred_rank_idx.
    k : int
        Number of top elements to consider for the metric computation.
    pred_rank_idx : `tf.Tensor`(or `np.array`) of integers. Shape: (users per batch, num_negs_per_pos + 1)
        Tensor representing a ranking. Each row represents a single user and contains (num_negs_per_pos + 1) elements
        with the ranked indexes of the items in the row.

    Returns
    -------
    discounted cumulative gain: `tf.Tensor`
        A single value tensor with the average Discounted Cumulative Gain on the top k for the batch.
    """
    top_k_hits, label_positions = _get_hits_per_user(y_true, pred_rank_idx, k)
    # Per-user gain is 1 / log2(position + 2) = log(2) / log(position + 2),
    # where position is the 0-based rank of the positive item.
    positions = K.cast(label_positions, "float32")
    gain_per_user = K.math_ops.log(2.) / K.math_ops.log(positions + 2)
    # Zero out users whose positive item fell outside the top k.
    gain_per_user = gain_per_user * top_k_hits
    return K.mean(gain_per_user, axis=-1)
def _get_hits_per_user(y_true, pred_rank_idx, k):
    """
    Compute the position of the label in the predicted ranking and whether is a hit on top k or not.

    Parameters
    ----------
    y_true : `tf.Tensor` (or `np.array`)
        True labels. For every (`num_negs_per_pos` + 1) items, there should be only one positive class (+1)
        and the rest are negative (0).
    pred_rank_idx : `tf.Tensor`(or `np.array`) of integers. Shape: (users per batch, num_negs_per_pos + 1)
        Tensor representing a ranking. Each row represents a single user and contains (num_negs_per_pos + 1) elements
        with the ranked indexes of the items in the row.
    k : int
        Number of top elements to consider for the metric computation.

    Returns
    -------
    Tuple: (hits_per_user, idx_label_in_pred_rank)
        hits_per_user : `tf.Tensor` with shape (users per batch, )
            Tensor of floats where elements are 1.0 if there is a hit (label is in top k) for that user, or
            0.0 otherwise.
        idx_label_in_pred_rank : `tf.Tensor` with shape (users per batch, )
            Tensor of integers with the index of the label in the predicted rank.
    """
    # Get the index of the positive label per user.
    # Assume that every user has num_neg_per_pos negatives (zeros) and one positive (1).
    # NOTE(review): K.math_ops / K.array_ops are TF modules re-exported through the
    # keras backend module (not public API) — confirm they exist in the pinned TF version.
    idx_label_per_user = K.reshape(y_true, K.shape(pred_rank_idx))
    idx_label_per_user = K.math_ops.argmax(idx_label_per_user, axis=-1, output_type="int32")

    # get the position of the expected label in the ranked list and compute whether is a hit in top k
    # (where() on the equality mask yields (row, col) pairs; [:, -1] keeps the column,
    # i.e. the label's 0-based position in the ranking — assumes exactly one match per row).
    idx_label_in_pred_rank = K.array_ops.where(K.equal(pred_rank_idx, K.reshape(idx_label_per_user, (-1, 1))))[:, -1]

    # determine whether the label is in top k of ranking or not
    hits_per_user = K.cast(K.less(idx_label_in_pred_rank, k), "float32")

    return hits_per_user, idx_label_in_pred_rank
|