Source code for banditpylib.learners.linear_bandit_learner.linucb

from typing import Optional, List

import numpy as np

from banditpylib.data_pb2 import Context, Actions, Feedback
from .utils import LinearBanditLearner


class LinUCB(LinearBanditLearner):
  r"""Linear Upper Confidence Bound policy

  At time :math:`t`, the learner pulls the arm with the highest optimistic
  estimate

  .. math::
    x_i^{\top} \hat{\theta}_t + \left( \sqrt{\lambda} + \sqrt{2 \ln
    \frac{1}{\delta} + d \ln \left( 1 + \frac{t - 1}{\lambda d} \right)}
    \right) \sqrt{x_i^{\top} V_t^{-1} x_i}

  where :math:`x_i` is the feature of arm :math:`i`,
  :math:`V_t = \lambda I + \sum_{s < t} A_s A_s^{\top}` with :math:`A_s` the
  feature of the arm pulled at time :math:`s`, and
  :math:`\hat{\theta}_t = V_t^{-1} \sum_{s < t} A_s X_s` is the
  ridge-regression estimate of the unknown parameter :math:`\theta`.

  :param List[np.ndarray] features: feature vector of each arm in a list
  :param float delta: confidence parameter within (0, 1)
  :param float lambda_reg: regularization parameter (greater than 0)
  :param Optional[str] name: alias name
  """
  def __init__(self,
               features: List[np.ndarray],
               delta: float,
               lambda_reg: float,
               name: Optional[str] = None):
    super().__init__(arm_num=len(features), name=name)
    if delta <= 0 or delta >= 1:
      raise ValueError('Delta is expected within (0, 1). Got %.2f.' % delta)
    if lambda_reg <= 0:
      raise ValueError('lambda_reg is expected greater than 0. Got %.2f.' %
                       lambda_reg)
    self.__delta = delta
    self.__lambda_reg = lambda_reg
    self.__d = len(features[0])  # d: length of each feature
    self.__k = len(features)  # k: number of arms
    # feature_matrix: d x k matrix of the arm features stacked as columns
    self.__feature_matrix = np.zeros((self.__d, self.__k))
    for i, feature in enumerate(features):
      self.__feature_matrix[:, i] = feature.reshape(-1)

  def _name(self) -> str:
    return 'linucb'

  def reset(self):
    # summation_AtXt: accumulated sum of At * Xt, d x 1
    self.__summation_AtXt = np.zeros((self.__d, 1))
    # Vt: V matrix at time t, d x d
    self.__Vt = self.__lambda_reg * np.eye(self.__d)
    # theta_hat_t: the learner's estimate of theta, d x 1
    self.__theta_hat_t = np.random.normal(0, size=(self.__d, 1))
    # Current time step
    self.__time = 1

  def __LinUCB(self) -> np.ndarray:
    """Optimistic estimates of arms' real means

    Returns:
      optimistic estimate of each arm's real mean
    """
    # root_beta_t: confidence radius sqrt(beta_t)
    root_beta_t = np.sqrt(self.__lambda_reg) + np.sqrt(
        2 * np.log(1 / self.__delta) + self.__d *
        np.log(1 + (self.__time - 1) / (self.__lambda_reg * self.__d)))
    # UCB of arm i: x_i^T theta_hat_t + sqrt(beta_t) * ||x_i||_{V_t^{-1}}
    ucb = self.__feature_matrix.T @ self.__theta_hat_t + root_beta_t * \
        np.sqrt((self.__feature_matrix.T @ np.linalg.pinv(self.__Vt)
                 @ self.__feature_matrix).diagonal()).reshape(-1, 1)
    return ucb
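
  # Note (a sketch, not part of the original source): `root_beta_t` above
  # matches the self-normalized confidence radius
  # sqrt(lambda) * S + sqrt(2 ln(1/delta) + d ln(1 + (t - 1) / (lambda d)))
  # of Abbasi-Yadkori et al. (2011), under the assumed unit bounds S = 1 on
  # the parameter norm, R = 1 on the noise scale, and L = 1 on feature norms.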

  def actions(self, context: Context) -> Actions:
    del context  # unused
    actions = Actions()
    arm_pull = actions.arm_pulls.add()
    ucb = self.__LinUCB()
    # Pull the arm with the maximum optimistic estimate
    arm_pull.arm.id = int(np.argmax(ucb))
    arm_pull.times = 1
    return actions

  def update(self, feedback: Feedback):
    arm_feedback = feedback.arm_feedbacks[0]
    pulled_arm_index = arm_feedback.arm.id
    # Xt: reward observed at time t
    Xt = np.array(arm_feedback.rewards)
    # At: feature of the arm played at time t
    At = self.__feature_matrix[:, pulled_arm_index].reshape(-1, 1)
    # Rank-one update of Vt and the running sum of At * Xt
    self.__Vt += (At @ At.T)
    self.__summation_AtXt += At * Xt
    # Ridge-regression estimate: theta_hat_t = Vt^{-1} * sum(At * Xt)
    self.__theta_hat_t = np.linalg.pinv(self.__Vt) @ self.__summation_AtXt
    self.__time += 1
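
# Example usage: a minimal sketch of running LinUCB on a synthetic linear
# bandit. The loop below is hypothetical and not part of banditpylib: it
# builds `Feedback` protos by hand instead of using the library's bandit and
# trial machinery, and the hidden parameter `theta` and the Gaussian reward
# noise are illustrative assumptions.
if __name__ == '__main__':
  rng = np.random.default_rng(0)
  d, k = 5, 10
  features = [rng.normal(size=d) for _ in range(k)]
  theta = rng.normal(size=d)  # hidden parameter (assumption)

  learner = LinUCB(features=features, delta=0.05, lambda_reg=1.0)
  learner.reset()

  for _ in range(1000):
    # Ask the learner which arm to pull
    actions = learner.actions(Context())
    arm_id = actions.arm_pulls[0].arm.id
    # Observe a noisy linear reward for the pulled arm
    reward = float(features[arm_id] @ theta) + rng.normal()
    # Hand the reward back as a Feedback proto
    feedback = Feedback()
    arm_feedback = feedback.arm_feedbacks.add()
    arm_feedback.arm.id = arm_id
    arm_feedback.rewards.append(reward)
    learner.update(feedback)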