Source code for banditpylib.bandits.mnl_bandit_utils

from abc import abstractmethod

from typing import List, Tuple, Set, Optional

from absl import logging

import numpy as np


def search(
    assortments: List[Set[int]],
    product_num: int,
    next_product_id: int,
    assortment: Set[int],
    card_limit: int = np.inf,  # type: ignore
    restricted_products: Optional[Set[int]] = None) -> None:
  """Find all assortments satisfying cardinality limit

  Every non-empty assortment obtained by extending `assortment` with products
  from `next_product_id` up to `product_num` is appended to `assortments`.
  Results are produced in depth-first order where the branch that includes a
  product is explored before the branch that skips it. The empty assortment is
  never reported.

  Args:
    assortments: all eligible assortments found so far (extended in place)
    product_num: total number of products
    next_product_id: next product to consider
    assortment: current assortment
    card_limit: cardinality limit
    restricted_products: products can only be selected from this restricted
      set; `None` means that every product can be selected
  """
  # An explicit stack replaces recursion so that the number of products is not
  # bounded by the interpreter recursion limit.
  pending = [(next_product_id, assortment)]
  while pending:
    product_id, chosen = pending.pop()
    # `>` rather than `==` so that a start id already past the last product
    # terminates instead of running forever.
    if product_id > product_num:
      # ignore empty assortment
      if chosen:
        assortments.append(chosen)
      continue
    # The "skip" branch is pushed first so that the "include" branch is popped
    # (and therefore fully explored) first.
    pending.append((product_id + 1, chosen))
    selectable = (restricted_products is None or
                  product_id in restricted_products)
    if len(chosen) < card_limit and selectable:
      pending.append((product_id + 1, chosen.union({product_id})))


class Reward:
  """Base class of rewards defined over assortments

  Holds the preference parameters and the revenues of the products and leaves
  :attr:`name` and :meth:`calc` to subclasses. The class does not derive from
  `abc.ABC`, so the `abstractmethod` markers are informational only.
  """

  def __init__(self):
    # Both stay `None` until the corresponding setter is called.
    self.__revenues: Optional[np.ndarray] = None
    self.__preference_params: Optional[np.ndarray] = None

  @property
  @abstractmethod
  def name(self) -> str:
    """Reward name"""

  @abstractmethod
  def calc(self, assortment: Set[int]) -> float:
    """Compute the reward of an assortment

    Args:
      assortment: assortment to calculate

    Returns:
      reward of the assortment
    """

  @property
  def preference_params(self) -> np.ndarray:
    """preference parameters (product 0 is included)

    Raises:
      Exception: the preference parameters have not been set
    """
    params = self.__preference_params
    if params is not None:
      return params
    raise Exception('Preference parameters are not set yet.')

  @property
  def revenues(self) -> np.ndarray:
    """revenues (product 0 is included)

    Raises:
      Exception: the revenues have not been set
    """
    values = self.__revenues
    if values is not None:
      return values
    raise Exception('Revenues of products are not set yet.')

  def set_preference_params(self, preference_params: np.ndarray):
    """Store the preference parameters

    Args:
      preference_params: preference parameters of products
    """
    self.__preference_params = preference_params

  def set_revenues(self, revenues: np.ndarray):
    """Store the revenues

    Args:
      revenues: revenues of products
    """
    self.__revenues = revenues
class MeanReward(Reward):
  """Mean reward

  The reward of an assortment is the sum over its products of
  `preference / (preference of product 0 + preferences in the assortment)`
  times the product's revenue.
  """

  def __init__(self):
    super().__init__()

  @property
  def name(self) -> str:
    return 'mean_reward'

  def calc(self, assortment: Set[int]) -> float:
    """Compute the mean reward of an assortment

    Args:
      assortment: assortment to calculate

    Returns:
      mean reward of the assortment
    """
    weights = self.preference_params
    # Normalizer: products offered plus product 0.
    normalizer = sum(weights[product] for product in assortment) + weights[0]
    # Revenues are looked up lazily so that an empty assortment never touches
    # them.
    return sum(weights[product] / normalizer * self.revenues[product]
               for product in assortment)
class CvarReward(Reward):
  """CVaR reward

  :param float alpha: percentile of cvar
  """

  def __init__(self, alpha: float):
    super().__init__()
    if alpha <= 0:
      raise ValueError('Alpha is expected greater than 0. Got %.2f.' % alpha)
    # alpha is at most 1.0
    if alpha > 1.0:
      logging.error('Alpha %.2f is expected at most 1. I am setting it to 1.' %
                    alpha)
    if alpha <= 1.0:
      self.__alpha = alpha
    else:
      self.__alpha = 1.0

  @property
  def name(self) -> str:
    return 'cvar_reward'

  @property
  def alpha(self) -> float:
    """Percentile of cvar"""
    return self.__alpha

  def calc(self, assortment: Set[int]) -> float:
    """Compute the CVaR reward of an assortment

    Args:
      assortment: assortment to calculate

    Returns:
      CVaR reward of the assortment

    Raises:
      Exception: the smallest revenue among product 0 and the assortment is
        not 0
    """
    weights = self.preference_params
    normalizer = sum(weights[product] for product in assortment) + weights[0]
    payoffs = self.revenues

    # (revenue, probability) pairs, product 0 first, ordered by revenue
    outcomes = [(payoffs[0], weights[0] / normalizer)]
    outcomes.extend(
        (payoffs[product], weights[product] / normalizer)
        for product in assortment)
    outcomes.sort(key=lambda pair: pair[0])

    # The minimum revenue should be 0, which is the revenue of non-purchase
    if outcomes[0][0] != 0:
      raise Exception('CVaR calculation error!')

    # Extend the prefix of outcomes until its probability mass reaches alpha
    # and the next outcome has a different revenue than the last one taken.
    mass = outcomes[0][1]
    cutoff = 1
    while cutoff < len(outcomes):
      same_revenue = outcomes[cutoff][0] == outcomes[cutoff - 1][0]
      if not (mass < self.__alpha or same_revenue):
        break
      mass += outcomes[cutoff][1]
      cutoff += 1

    # Expected revenue of the prefix, minus the excess mass beyond alpha
    # (valued at the last revenue taken), rescaled by alpha.
    tail_value = sum(revenue * prob for revenue, prob in outcomes[:cutoff])
    tail_value -= outcomes[cutoff - 1][0] * (mass - self.__alpha)
    tail_value /= self.__alpha
    return tail_value
def search_best_assortment(
    reward: Reward,
    card_limit: int = np.inf  # type: ignore
) -> Tuple[float, Set[int]]:
  """Search assortment with the maximum reward

  For :class:`MeanReward` the unconstrained optimum is built greedily from the
  products with the highest revenues and returned directly when it satisfies
  the cardinality constraint. In every other case the eligible assortments are
  enumerated with :func:`search` and one with the maximum reward is picked
  uniformly at random.

  Args:
    reward: reward definition
    card_limit: cardinality constraint

  Returns:
    the maximum reward together with an assortment attaining it
  """
  product_num = len(reward.revenues) - 1

  restricted_products = None
  if isinstance(reward, MeanReward):
    # a fast method to find the best assortment when the reward is MeanReward
    revenues = reward.revenues
    sorted_revenues = sorted(zip(revenues, range(product_num + 1)),
                             key=lambda x: x[0])
    best_assortment = {sorted_revenues[-1][1]}
    next_ind = product_num - 1
    # Keep adding the next most valuable product while its revenue exceeds the
    # reward of the assortment built so far. Index 0 of the sorted list is
    # never added.
    while next_ind > 0 and reward.calc(best_assortment) < revenues[
        sorted_revenues[next_ind][1]]:
      best_assortment.add(sorted_revenues[next_ind][1])
      next_ind -= 1

    if len(best_assortment) <= card_limit:
      return (reward.calc(best_assortment), best_assortment)

    # The unconstrained optimum is too large. The constrained optimum is NOT
    # necessarily a subset of it: a product left out above can still be the
    # best choice once fewer products may be offered. Restricting the search
    # to `best_assortment` can therefore miss the optimum.
    #
    # Safe pruning instead (assuming positive preference parameters): dropping
    # a product whose revenue is below the reward of its assortment increases
    # that reward, so an optimal assortment only holds products whose revenue
    # is at least the optimal reward, which is itself at least the reward of
    # any feasible assortment. The best single product gives such a bound.
    lower_bound = max(
        (reward.calc({product}) for product in range(1, product_num + 1)),
        default=0.0)
    restricted_products = {
        product for product in range(1, product_num + 1)
        if revenues[product] >= lower_bound
    }

  assortments: List[Set[int]] = []
  search(
      assortments=assortments,
      product_num=product_num,
      next_product_id=1,
      assortment=set(),
      card_limit=card_limit,  # type: ignore
      restricted_products=restricted_products)
  # Sort assortments according to reward value
  sorted_assort = sorted([(reward.calc(assortment), assortment)
                          for assortment in assortments],
                         key=lambda x: x[0])
  # Randomly select one assortment with the maximum reward
  ind = len(sorted_assort) - 1
  while (ind > 0 and sorted_assort[ind - 1][0] == sorted_assort[ind][0]):
    ind -= 1
  return sorted_assort[np.random.randint(ind, len(sorted_assort))]
def local_search_best_assortment(
    reward: Reward,
    random_neighbors: int,
    card_limit: int,
    init_assortment: Optional[Set[int]] = None) -> Tuple[float, Set[int]]:
  """Local search assortment with the maximum reward

  Starting from `init_assortment` (or from a random assortment), repeatedly
  samples `random_neighbors` neighbors obtained by replacing, removing or
  adding one product, and moves to the best sampled neighbor as long as it
  strictly improves the reward.

  .. warning::
    This method does not guarantee to output the best assortment.

  .. todo::
    Implement this function with `cppyy`.

  Args:
    reward: reward definition
    random_neighbors: number of random neighbors to look up
    card_limit: cardinality constraint
    init_assortment: initial assortment to start

  Returns:
    local best assortment with its reward

  Raises:
    Exception: `random_neighbors` is not positive
  """
  if random_neighbors <= 0:
    raise Exception('Number of neighbors to look up %d is no greater than 0!' \
                    % random_neighbors)
  product_num = len(reward.revenues) - 1
  # All available products
  all_products = set(range(1, product_num + 1))
  if init_assortment is None:
    # Randomly generate an assortment initially. Sampling without replacement
    # cannot draw more products than exist, so the size is capped.
    initial_size = min(card_limit, product_num)
    best_assortment = set(
        np.random.choice(list(all_products), initial_size, replace=False))
  else:
    best_assortment = set(init_assortment)
  best_reward = reward.calc(best_assortment)
  remaining_products = all_products - best_assortment

  while True:
    # Both collections are fixed while neighbors of the current assortment
    # are sampled, so they are materialized once per round.
    offered = list(best_assortment)
    not_offered = list(remaining_products)

    available_operations = []
    # A replacement needs something to take out and something to put in.
    if offered and not_offered:
      available_operations.append('replace')
    # Never shrink to the empty assortment.
    if len(offered) > 1:
      available_operations.append('remove')
    if not_offered and len(offered) < card_limit:
      available_operations.append('add')
    if not available_operations:
      # No neighbor exists (e.g. a single product that is already offered).
      break

    # Start below every possible reward so that the best sampled neighbor is
    # always a real assortment, even when all rewards are negative.
    local_best_assortment: Set[int] = set()
    local_best_reward = -np.inf

    for _ in range(random_neighbors):
      # pylint: disable=no-member
      operation = np.random.choice(available_operations)
      new_assortment = set(best_assortment)
      if operation == 'replace':
        # Replace one product
        new_assortment.remove(np.random.choice(offered))
        new_assortment.add(np.random.choice(not_offered))
      elif operation == 'remove':
        # Remove one product
        new_assortment.remove(np.random.choice(offered))
      else:  # operation = 'add'
        # Add one product
        new_assortment.add(np.random.choice(not_offered))
      new_reward = reward.calc(new_assortment)

      if new_reward > local_best_reward:
        local_best_assortment = new_assortment
        local_best_reward = new_reward

    if local_best_reward > best_reward:
      best_assortment = local_best_assortment
      best_reward = local_best_reward
      remaining_products = all_products - best_assortment
    else:
      break
  return (best_reward, best_assortment)