"""
This module provides the following scheduling heuristics as functions:

- EDD: earliest due date
- SPT: shortest processing time first
- MTR: most tasks remaining
- LTR: least tasks remaining
- Random: random action

You can implement additional heuristics in this file by specifying a function that takes a list of tasks and an
action mask and returns the index of the job to be scheduled next.

If you want to call your heuristic via the HeuristicSelectionAgent or edit an existing shortcut, adapt/extend the
task_selections dict attribute of the HeuristicSelectionAgent class.

:Example:

Add a heuristic that returns zeros (this is not a practical example!)

1. Define the according function

.. code-block:: python

    def return_0_heuristic(tasks: List[Task], action_mask: np.ndarray) -> int:
        return 0

2. Add the function to the task_selections dict within the HeuristicSelectionAgent class:

.. code-block:: python

    self.task_selections = {
        'rand': random_task,
        'EDD': edd,
        'SPT': spt,
        'MTR': mtr,
        'LTR': ltr,
        'ZERO': return_0_heuristic
    }

"""
import numpy as np
from typing import List

from src.data_generator.task import Task


def get_active_task_dict(tasks: List[Task]) -> dict:
    """
    Helper function to determine the next unfinished task to be processed for each job

    :param tasks: List of task objects, so one instance
    :return: Dictionary mapping each job index to the list index of that job's first unfinished task
        (first in list order). Empty dictionary if all tasks are completed
    """
    active_job_task_dict = {}
    for task_i, task in enumerate(tasks):
        if not task.done and task.job_index not in active_job_task_dict:
            active_job_task_dict[task.job_index] = task_i
    return active_job_task_dict


def edd(tasks: List[Task], action_mask: np.ndarray) -> int:
    """
    EDD: earliest due date. Determines the job with the smallest deadline

    :param tasks: List of task objects, so one instance
    :param action_mask: Action mask from the environment that is to receive the action selected by this heuristic.
        Only the first len(action_mask) - 1 entries are treated as jobs
    :return: Index of the job selected according to the heuristic
    """
    if np.sum(action_mask) == 1:
        return np.argmax(action_mask)

    num_jobs = action_mask.shape[0] - 1

    # A job's deadline is read from its first task in list order. Looking the task up via job_index (instead of
    # computing its position as len(tasks) / num_jobs * job_i) stays correct when jobs have different task counts.
    first_task_of_job = {}
    for task_i, task in enumerate(tasks):
        first_task_of_job.setdefault(task.job_index, task_i)

    deadlines = np.full(num_jobs + 1, np.inf)
    for job_i in range(num_jobs):
        if job_i in first_task_of_job:
            deadlines[job_i] = tasks[first_task_of_job[job_i]].deadline

    # masked-out actions can never be the minimum
    deadlines = np.where(action_mask == 1, deadlines, np.inf)
    return np.argmin(deadlines)


def spt(tasks: List[Task], action_mask: np.ndarray) -> int:
    """
    SPT: shortest processing time first. Determines the job of which the next unfinished task has the lowest runtime

    :param tasks: List of task objects, so one instance
    :param action_mask: Action mask from the environment that is to receive the action selected by this heuristic.
        Only the first len(action_mask) - 1 entries are treated as jobs
    :return: Index of the job selected according to the heuristic
    """
    if np.sum(action_mask) == 1:
        return np.argmax(action_mask)

    num_jobs = action_mask.shape[0] - 1
    runtimes = np.full(num_jobs + 1, np.inf)
    for job_i, task_i in get_active_task_dict(tasks).items():
        if 0 <= job_i < num_jobs:
            runtimes[job_i] = tasks[task_i].runtime

    # masked-out actions can never be the minimum
    runtimes = np.where(action_mask == 1, runtimes, np.inf)
    return np.argmin(runtimes)


def _done_counts_by_active_task(tasks: List[Task], action_mask: np.ndarray) -> dict:
    """
    Shared helper for MTR/LTR: counts completed tasks per job for every job that still has an unfinished task
    and is allowed by action_mask.

    :return: Dictionary mapping the list index of each eligible job's next unfinished task to the number of
        completed tasks of that job
    """
    active_tasks = get_active_task_dict(tasks)
    done_per_job = {}
    for task in tasks:
        if task.done and task.job_index in active_tasks:
            done_per_job[task.job_index] = done_per_job.get(task.job_index, 0) + 1
    return {task_i: done_per_job.get(job_i, 0)
            for job_i, task_i in active_tasks.items() if action_mask[job_i] == 1}


def mtr(tasks: List[Task], action_mask: np.ndarray) -> int:
    """
    MTR: most tasks remaining. Determines the job with the least completed tasks

    :param tasks: List of task objects, so one instance
    :param action_mask: Action mask from the environment that is to receive the action selected by this heuristic
    :return: Index of the job selected according to the heuristic
    """
    if np.sum(action_mask) == 1:
        return np.argmax(action_mask)

    done_counts = _done_counts_by_active_task(tasks, action_mask)
    # Fewest completed tasks wins; ties go to the smallest task index (same as np.argmin over a task-indexed
    # array). With no eligible job, fall back to task 0, as the previous array-based implementation did.
    chosen_task = min(done_counts, key=lambda task_i: (done_counts[task_i], task_i), default=0)
    return tasks[chosen_task].job_index


def ltr(tasks: List[Task], action_mask: np.ndarray) -> int:
    """
    LTR: least tasks remaining. Determines the job with the most completed tasks

    :param tasks: List of task objects, so one instance
    :param action_mask: Action mask from the environment that is to receive the action selected by this heuristic
    :return: Index of the job selected according to the heuristic
    """
    if np.sum(action_mask) == 1:
        return np.argmax(action_mask)

    done_counts = _done_counts_by_active_task(tasks, action_mask)
    # Most completed tasks wins; ties go to the smallest task index (same as np.argmax over a task-indexed
    # array). With no eligible job, fall back to task 0, as the previous array-based implementation did.
    chosen_task = max(done_counts, key=lambda task_i: (done_counts[task_i], -task_i), default=0)
    return tasks[chosen_task].job_index


def random_task(tasks: List[Task], action_mask: np.ndarray) -> int:
    """
    Returns a random action among those allowed by the action mask

    :param tasks: Not needed
    :param action_mask: Action mask from the environment that is to receive the action selected by this heuristic
    :return: Index of the job selected according to the heuristic
    :raises ValueError: If the action mask does not allow any action
    """
    if np.sum(action_mask) == 1:
        return np.argmax(action_mask)

    valid_actions = np.where(action_mask > 0)[0]
    if len(valid_actions) == 0:
        # previously this printed a message and returned None, which only failed later in the env
        raise ValueError('random_task: the action mask does not allow any action')
    return np.random.choice(valid_actions, size=1)[0]


def _valid_machines(chosen_task, machine_mask) -> np.ndarray:
    """
    Returns the indices of all machines that machine_mask marks as available (> 0) for chosen_task.
    The mask is unpacked as (task, machine) index pairs, so it is assumed to be 2-D — TODO confirm with env.
    """
    task_ids, machine_ids = np.where(np.asarray(machine_mask) > 0)
    return machine_ids[task_ids == chosen_task]


def choose_random_machine(chosen_task, machine_mask) -> int:
    """
    Determines a random machine which is available according to the mask and chosen task. Useful for the FJSSP.

    :param chosen_task: ID of the task that is scheduled on the selected machine
    :param machine_mask: Machine mask from the environment that is to receive the machine action chosen by this
        function
    :return: Index of the chosen machine
    """
    return np.random.choice(_valid_machines(chosen_task, machine_mask), size=1)[0]


def choose_first_machine(chosen_task, machine_mask) -> int:
    """
    Determines the first (by index) machine which is available according to the mask and chosen task. Useful for
    the FJSSP

    :param chosen_task: ID of the task that is scheduled on the selected machine
    :param machine_mask: Machine mask from the environment that is to receive the machine action chosen by this
        function
    :return: Index of the chosen machine
    """
    return _valid_machines(chosen_task, machine_mask)[0]


class HeuristicSelectionAgent:
    """
    This class can be used to get the next task according to the heuristic passed as string abbreviation (e.g. EDD).
    If you want to edit a shortcut, or add one for your custom heuristic, adapt/extend the task_selections dict.

    :Example:

    .. code-block:: python

        def my_custom_heuristic():
            ...

    or

    .. code-block:: python

        self.task_selections = {
            'rand': random_task,
            'XYZ': my_custom_heuristic
        }

    """

    def __init__(self) -> None:
        super().__init__()
        # Map heuristic ids to corresponding function
        self.task_selections = {
            'rand': random_task,
            'EDD': edd,
            'SPT': spt,
            'MTR': mtr,
            'LTR': ltr
        }

    def __call__(self, tasks: List[Task], action_mask: np.ndarray, task_selection: str) -> int:
        """
        Selects the next heuristic function according to the heuristic passed as string abbreviation
        and the assignment in the task_selections dictionary

        :param tasks: List of task objects, so one instance
        :param action_mask: Action mask from the environment that is to receive the action selected by this heuristic
        :param task_selection: Heuristic string abbreviation (e.g. EDD)
        :return: Index of the job selected according to the heuristic
        :raises KeyError: If task_selection is not a key of task_selections
        """
        try:
            choose_task = self.task_selections[task_selection]
        except KeyError:
            raise KeyError(f"Unknown heuristic '{task_selection}'. "
                           f"Available: {sorted(self.task_selections)}") from None
        return choose_task(tasks, action_mask)
4.2 强化学习代理
本节的强化学习代理包括DQN、PPO和PPO_masked三种。
4.2.1 DQN
"""
DQN implementation with target net and epsilon greedy. Follows the Stable Baselines 3 implementation.
To reuse trained models, you can make use of the save and load function.
To adapt policy and value network structure, specify the layer and activation parameter in your train config or
change the constants in this file
"""
import numpy as np
import pickle
import random
from collections import deque
import torch as T
import torch.nn as nn
import torch.optim as optim
from torch.nn import functional as F
from typing import Tuple, List, Optional

from src.utils.logger import Logger

# constants
LAYER: List[int] = [64, 64]
ACTIVATION: str = 'ReLU'


class MemoryBuffer:
    """
    Handles episode data collection and sample generation (fixed-size ring buffer)

    :param buffer_size: Buffer size
    :param batch_size: Size for batches to be generated
    :param obs_dim: Size of the observation to be stored in the buffer
    :param obs_type: Type of the observation to be stored in the buffer
    :param action_type: Type of the action to be stored in the buffer
    """

    def __init__(self, buffer_size: int, batch_size: int, obs_dim: int, obs_type: type, action_type: type):
        self.buffer_size = buffer_size
        self.batch_size = batch_size
        self.pos = 0
        self.full = False

        # buffer data, preallocated with one row per stored step
        self.obs = np.zeros((buffer_size, obs_dim), dtype=obs_type)
        self.actions = np.zeros((buffer_size, 1), dtype=action_type)
        self.rewards = np.zeros((buffer_size, 1), dtype=np.float32)
        self.dones = np.zeros((buffer_size, 1), dtype=np.float32)
        self.new_obs = np.zeros((buffer_size, obs_dim), dtype=np.float32)

    def __len__(self):
        """Number of valid entries currently stored"""
        return self.buffer_size if self.full else self.pos

    def store_memory(self, obs, action, reward, done, new_obs) -> None:
        """
        Appends all data from the recent step

        :param obs: Observation at the beginning of the step
        :param action: Index of the selected action
        :param reward: Reward the env returned in this step
        :param done: True if the episode ended in this step
        :param new_obs: Observation after the step
        :return: None
        """
        self.obs[self.pos] = np.array(obs).copy()
        self.actions[self.pos] = np.array(action).copy()
        self.rewards[self.pos] = np.array(reward).copy()
        self.dones[self.pos] = np.array(done).copy()
        self.new_obs[self.pos] = np.array(new_obs)

        self.pos += 1
        # If pos is behind the last element the buffer is full: wrap pos to 0, so the next step overwrites
        # the oldest data in the buffer
        if self.pos == self.buffer_size:
            self.full = True
            self.pos = 0

    def get_samples(self) -> Tuple:
        """
        Generates random samples (with replacement) from the stored data

        :return: batch_size samples from the buffer: obs, actions, rewards, dones, new_obs
        """
        # generate batch_size random indices in range of current buffer len
        indices = np.random.randint(0, len(self), size=self.batch_size)
        return self.obs[indices], self.actions[indices], self.rewards[indices], \
            self.dones[indices], self.new_obs[indices]


class Policy(nn.Module):
    """
    Network structure used for both the Q network and the target network

    :param obs_dim: Observation size to determine input dimension
    :param action_dim: Number of actions to determine output size
    :param learning_rate: Learning rate for the network
    :param hidden_layers: List of hidden layer sizes (int)
    :param activation: String naming activation function for hidden layers (attribute name in torch.nn)
    """

    def __init__(self, obs_dim: int, action_dim: int, learning_rate: float, hidden_layers: List[int],
                 activation: str):
        super(Policy, self).__init__()

        net_structure = []
        # get activation class according to string
        activation = getattr(nn, activation)()

        # create first hidden layer in accordance with the input dim and the first hidden dim
        net_structure.extend([nn.Linear(obs_dim, hidden_layers[0]), activation])

        # create the other hidden layers
        for i, layer_dim in enumerate(hidden_layers):
            if not i + 1 == len(hidden_layers):
                net_structure.extend([nn.Linear(layer_dim, hidden_layers[i + 1]), activation])
            else:
                # create output layer: one Q-value per action
                net_structure.append(nn.Linear(layer_dim, action_dim))

        self.q_net = nn.Sequential(*net_structure)

        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
        self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')
        self.to(self.device)

    def forward(self, obs):
        """forward pass through the Q-network"""
        return self.q_net(obs)


class DQN:
    """DQN Implementation with target net and epsilon greedy. Follows the Stable Baselines 3 implementation."""

    def __init__(self, env, config: dict, logger: Logger = None):
        """
        | gamma: Discount factor for the TD target
        | learning_rate: Learning rate for the Q network
        | batch_size: Number of samples that are chosen and passed through the net per update
        | gradient_steps: Number of updates per training
        | train_freq: Environment steps between two trainings
        | buffer_size: Size of the memory buffer = max number of steps that can be stored before the oldest
          are overwritten
        | target_net_update: Number of steps between target_net_updates
        | training_starts: Steps after which training can start for the first time (learning_starts in SB3)
        | initial_eps: Initial epsilon value
        | final_eps: Final epsilon value
        | fraction_eps: If the fraction of total_timesteps already done exceeds fraction_eps, epsilon takes the
          final_eps value, e.g. 50/100 total_timesteps done -> progress = 0.5 > fraction_eps -> eps = final_eps
        | max_grad_norm: Value to clip the gradient norm of the q_net update
        | seed: Optional seed for random, numpy, torch and the env

        :param env: Pregenerated, gym-based environment. Required: its observation and action spaces
            determine the network and buffer sizes
        :param config: Dictionary with parameters to specify DQN attributes
        :param logger: Logger
        """

        self.env = env
        self.gamma = config.get('gamma', 0.99)
        self.learning_rate = config.get('learning_rate', 1e-4)
        self.batch_size = config.get('batch_size', 32)
        self.gradient_steps = config.get('gradient_steps', 1)
        self.train_freq = config.get('train_freq', 4)
        self.buffer_size = config.get('buffer_size', 1_000_000)
        self.target_net_update = config.get('target_net_update', 10_000)
        self.training_starts = config.get('training_starts', 50_000)
        self.initial_eps = config.get('initial_eps', 1.0)
        self.final_eps = config.get('final_eps', 0.05)
        self.fraction_eps = config.get('fraction_eps', 0.1)
        self.max_grad_norm = config.get('max_grad_norm', 10.0)
        self.epsilon = self.initial_eps  # epsilon is the exploration rate
        self.remaining_progress = 1  # fraction of total steps remaining -> value between 1 and 0
        self.num_timesteps = 0
        self.n_updates = 0
        self.logger = logger if logger else Logger(config=config)
        self.seed = config.get('seed', None)
        self.reward_info = deque(maxlen=100)  # rewards of the last 100 episodes

        # seed setting
        if self.seed is not None:
            random.seed(self.seed)
            np.random.seed(self.seed)
            T.manual_seed(self.seed)
            self.env.action_space.seed(self.seed)
            self.env.seed(self.seed)

        # create networks and buffer
        self.q_net = Policy(env.observation_space.shape[0], env.action_space.n, self.learning_rate,
                            config.get('layer', LAYER), config.get('activation', ACTIVATION))
        self.q_target_net = Policy(env.observation_space.shape[0], env.action_space.n, self.learning_rate,
                                   config.get('layer', LAYER), config.get('activation', ACTIVATION))
        # copy weights to target_net
        self.q_target_net.load_state_dict(self.q_net.state_dict())

        self.memory_buffer = MemoryBuffer(self.buffer_size, self.batch_size, env.observation_space.shape[0],
                                          env.observation_space.dtype, env.action_space.dtype)

    def save(self, file: str) -> None:
        """
        Save model as pickle file

        :param file: Path under which the file will be saved (without .pkl)
        :return: None
        """
        params_dict = self.__dict__.copy()
        del params_dict['logger']
        data = {
            "params": params_dict,
            "q_params": self.q_net.state_dict(),
            "target_params": self.q_target_net.state_dict()
        }

        with open(f"{file}.pkl", "wb") as handle:
            pickle.dump(data, handle)

    @classmethod
    def load(cls, file: str, config: dict, logger: Logger = None):
        """
        Creates a DQN object according to the parameters saved in file.pkl

        :param file: Path and filename (without .pkl) of your saved model pickle file
        :param config: Dictionary with parameters to specify DQN attributes
        :param logger: Logger
        :return: DQN object
        """
        # NOTE(security): pickle.load can execute arbitrary code - only load model files from trusted sources
        with open(f"{file}.pkl", "rb") as handle:
            data = pickle.load(handle)

        env = data["params"]["env"]

        # create DQN object. Commit necessary parameters. Update remaining parameters
        model = cls(env=env, config=config, logger=logger)
        model.__dict__.update(data["params"])

        # set weights
        model.q_net.load_state_dict(data["q_params"])
        model.q_target_net.load_state_dict(data["target_params"])

        return model

    def get_action(self, obs: np.ndarray) -> int:
        """
        Epsilon-greedy action selection used during training

        :param obs: Current observation
        :return: action index
        """
        if np.random.random() < self.epsilon:
            # random action from the action space
            return self.env.action_space.sample()

        obs = T.tensor(obs, dtype=T.float).to(self.q_net.device)
        # pure inference: no autograd graph needed
        with T.no_grad():
            q_values = self.q_net(obs)
        # choose action with highest Q value -> greedy policy
        return T.squeeze(T.argmax(q_values)).item()

    def predict(self, observation: np.ndarray, action_mask: Optional[np.ndarray] = None, deterministic: bool = True,
                state=None) -> Tuple:
        """
        Action prediction for testing

        :param observation: Current observation of the environment
        :param action_mask: Mask of actions, which can logically be taken. NOTE: currently not used!
        :param deterministic: Set True to always take the greedy action. If False, a random action from the
            action space is taken with probability epsilon (epsilon-greedy, as in Stable Baselines 3)
        :param state: The last states (used in rnn policies)
        :return: Predicted action and next state (used in rnn policies)
        """
        if not deterministic and np.random.random() < self.epsilon:
            return int(self.env.action_space.sample()), state

        observation = T.tensor(np.array([observation]), dtype=T.float).to(self.q_net.device)
        with T.no_grad():
            q_values = self.q_net(observation)
        action = T.squeeze(T.argmax(q_values)).item()

        return action, state

    def train(self) -> None:
        """
        Trains the Q-network on samples from the memory buffer (the target network is only synced in on_step)

        :return: None
        """
        # Switch to train mode (this affects batch norm / dropout)
        self.q_net.train()

        losses = []
        for _ in range(self.gradient_steps):
            # get samples from the buffer
            obs_arr, action_arr, reward_arr, done_array, new_obs_array = self.memory_buffer.get_samples()

            # convert to tensors
            device = self.q_target_net.device
            obs = T.tensor(obs_arr, dtype=T.float).to(device)
            actions = T.tensor(action_arr, dtype=T.long).to(device)
            rewards = T.tensor(reward_arr, dtype=T.float).to(device)
            dones = T.tensor(done_array, dtype=T.float).to(device)
            new_obs = T.tensor(new_obs_array, dtype=T.float).to(device)

            # no update on the target net -> use no_grad
            with T.no_grad():
                # Compute the next Q-values using the target network
                next_q_values = self.q_target_net(new_obs)
                # Follow greedy policy: use the one with the highest value
                next_q_values, _ = next_q_values.max(dim=1)
                # Avoid potential broadcast issue: (batch,) -> (batch, 1) to match rewards/dones
                next_q_values = next_q_values.reshape(-1, 1)
                # 1 - dones -> only the reward is used if the step is the last in its episode
                target_q_values = rewards + (1 - dones) * self.gamma * next_q_values

            # get all current Q-values for each obs and keep those of the taken actions
            current_q_values = self.q_net(obs)
            current_q_values = T.gather(current_q_values, dim=1, index=actions)

            # loss computation. MSE also possible
            loss = F.smooth_l1_loss(current_q_values, target_q_values)
            losses.append(loss.item())

            # update
            self.q_net.optimizer.zero_grad()
            loss.backward()
            # clip
            nn.utils.clip_grad_norm_(self.q_net.parameters(), self.max_grad_norm)
            self.q_net.optimizer.step()

        self.n_updates += self.gradient_steps

        # logs; reward_info stays empty until the first episode finished -> log nan without a numpy warning
        mean_reward = np.mean(self.reward_info) if self.reward_info else np.nan
        self.logger.record(
            {
                'agent_training/exploration rate': self.epsilon,
                'agent_training/n_updates': self.n_updates,
                'agent_training/loss': np.mean(losses),
                'agent_training/mean_rwd': mean_reward
            }
        )
        self.logger.dump()

    def on_step(self, total_timesteps):
        """
        Called after every environment step: updates the progress, syncs the target network every
        target_net_update steps and updates epsilon

        :param total_timesteps: Timestep limit of the current learn call
        """
        # update progress
        self.remaining_progress = 1 - float(self.num_timesteps) / float(total_timesteps)

        # update target_net with parameters from main q_net
        if self.num_timesteps % self.target_net_update == 0:
            self.q_target_net.load_state_dict(self.q_net.state_dict())

        # update epsilon
        progress = 1 - self.remaining_progress
        if progress > self.fraction_eps:
            # constant once fraction_eps is reached
            self.epsilon = self.final_eps
        else:
            # linear function from initial_eps to final_eps, reaching final_eps exactly at fraction_eps
            self.epsilon = self.initial_eps + progress * (self.final_eps - self.initial_eps) / self.fraction_eps

    def learn(self, total_instances: int, total_timesteps: int, intermediate_test=None) -> None:
        """
        Learn over n problem instances or n timesteps (environment steps).
        Breaks depending on which condition is met first.
        One learning iteration consists of collecting rollouts and training the networks on the rollout data

        :param total_instances: Instance limit
        :param total_timesteps: Timestep limit
        :param intermediate_test: (IntermediateTest) intermediate test object. Must be created before.
        """
        instances = 0

        # iterate over n episodes = the agent has n episodes to interact with the environment
        for _ in range(total_instances):
            obs = self.env.reset()
            done = False
            instances += 1
            episode_reward = 0

            # run agent on env until done
            while not done:
                # observe and fill buffer
                action = self.get_action(obs)
                new_obs, reward, done, info = self.env.step(action)
                self.num_timesteps += 1
                episode_reward += reward
                self.memory_buffer.store_memory(obs, action, reward, done, new_obs)

                # call intermediate_test on_step
                if intermediate_test:
                    intermediate_test.on_step(self.num_timesteps, instances, self)

                # call function intern on_step
                self.on_step(total_timesteps)

                # break learn if total_timesteps are reached
                if self.num_timesteps >= total_timesteps:
                    print(f'Total timesteps reached: {total_timesteps}')
                    self.logger.record(
                        {
                            'results_on_train_dataset/instances': instances,
                            'results_on_train_dataset/num_timesteps': self.num_timesteps
                        }
                    )
                    self.logger.dump()
                    return None

                # train once training_starts is reached and then every train_freq steps
                if self.num_timesteps >= self.training_starts and self.num_timesteps % self.train_freq == 0:
                    self.train()
                    # switch back to eval mode
                    self.q_net.train(False)

                obs = new_obs

            self.reward_info.append(episode_reward)

            if instances % len(self.env.data) == len(self.env.data) - 1:
                mean_training_reward = np.mean(self.env.episodes_rewards)
                mean_training_makespan = np.mean(self.env.episodes_makespans)
                mean_training_tardiness = np.mean(self.env.tardiness)
                self.logger.record(
                    {
                        'results_on_train_dataset/mean_reward': mean_training_reward,
                        'results_on_train_dataset/mean_makespan': mean_training_makespan,
                        'results_on_train_dataset/mean_tardiness': mean_training_tardiness
                    }
                )
                self.logger.dump()

        print("TRAINING DONE")
        self.logger.record(
            {
                'results_on_train_dataset/instances': instances,
                'results_on_train_dataset/num_timesteps': self.num_timesteps
            }
        )
        self.logger.dump()
4.2.2 PPO
""" PPO implementation inspired by the StableBaselines3 implementation. To reuse trained models, you can make use of the save and load function To adapt policy and value network structure, specify the policy and value layer and activation parameter in your train config or change the constants in this file """ import numpy as np import random import torch as T import torch.nn as nn import torch.optim as optim from torch.distributions.categorical import Categorical import pickle from typing import Tuple, Any, List from src.utils.logger import Logger # constants POLICY_LAYER: List[int] = [256, 256] POLICY_ACTIVATION: str = 'ReLU' VALUE_LAYER: List[int] = [256, 256] VALUE_ACTIVATION: str = 'ReLU' class RolloutBuffer: """ Handles episode data collection and batch generation :param buffer_size: Buffer size :param batch_size: Size for batches to be generated """ def __init__(self, buffer_size: int, batch_size: int): self.observations = [] self.probs = [] self.values = [] self.actions = [] self.rewards = [] self.dones = [] self.advantages = None self.returns = None if buffer_size % batch_size != 0: raise TypeError("rollout_steps has to be a multiple of batch_size") self.buffer_size = buffer_size self.batch_size = batch_size self.reset() def generate_batches(self) -> Tuple: """ Generates batches from the stored data :return: batches: Lists with all indices from the rollout_data, shuffled and sampled in lists with batch_size e.g. [[0,34,1,768,...(len: batch size)], [], ...(len: len(rollout_data) / batch size)] """ # create random index list and split into arrays with batch size indices = np.random.permutation(self.buffer_size) num_batches = int(self.buffer_size / self.batch_size) batches = indices.reshape((num_batches, self.batch_size)) return np.array(self.observations), np.array(self.actions), np.array(self.probs), batches def compute_advantages_and_returns(self, last_value, gamma, gae_lambda) -> None: """ Computes advantage values and returns for all stored episodes. 
:param last_value: Value from the next step to calculate the advantage for the last episode in the buffer :param gamma: Discount factor for the advantage calculation :param gae_lambda: Smoothing parameter for the advantage calculation :return: None """ # advantage: advantage from the actual returned rewards over the baseline value from step t onwards last_advantage = 0 for step in reversed(range(self.buffer_size)): # use the predicted reward for the advantage computation of the last step of the buffer if step == self.buffer_size - 1: # if a step is the last one of the episode (done = 1) -> not_done = 0 => the advantage # doesn't contain values outside the own episode not_done = 1.0 - self.dones[step] next_values = last_value else: not_done = 1.0 - self.dones[step] next_values = self.values[step + 1] delta = self.rewards[step] + gamma * next_values * not_done - self.values[step] last_advantage = delta + gamma * gae_lambda * not_done * last_advantage self.advantages[step] = last_advantage # compute returns = discounted rewards, advantages = discounted rewards - values # Necessary to update the value network self.returns = self.values + self.advantages def store_memory(self, observation: np.ndarray, action: int, prob: float, value: float, reward: Any, done: bool) -> None: """ Appends all data from the recent step :param observation: Observation at the beginning of the step :param action: Index of the selected action :param prob: Probability of the selected action (output from the policy_net) :param value: Baseline value that the value_net estimated from this step onwards according to the :param observation: Output from the value_net :param reward: Reward the env returned in this step :param done: True if the episode ended in this step :return: None """ self.observations.append(observation) self.actions.append(action) self.probs.append(prob) self.values.append(value) self.rewards.append(reward) self.dones.append(done) def reset(self) -> None: """ Resets all buffer 
lists :return: None """ self.observations = [] self.probs = [] self.actions = [] self.rewards = [] self.dones = [] self.values = [] self.advantages = np.zeros(self.buffer_size, dtype=np.float32) class PolicyNetwork(nn.Module): """ Policy Network for the agent :param input_dim: Observation size to determine input dimension :param n_actions: Number of action to determine output size :param learning_rate: Learning rate for the network :param hidden_layers: List of hidden layer sizes (int) :param activation: String naming activation function for hidden layers """ def __init__(self, input_dim: int, n_actions: int, learning_rate: float, hidden_layers: List[int], activation: str):
super(PolicyNetwork, self).__init__() net_structure = [] # get activation class according to string activation = getattr(nn, activation)() # create first hidden layer in accordance with the input dim and the first hidden dim net_structure.extend([nn.Linear(input_dim, hidden_layers[0]), activation]) # create the other hidden layers for i, layer_dim in enumerate(hidden_layers): if not i+1 == len(hidden_layers): net_structure.extend([nn.Linear(layer_dim, hidden_layers[i+1]), activation]) else: # create output layer net_structure.extend([nn.Linear(layer_dim, n_actions), nn.Softmax(dim=-1)]) self.policy_net = nn.Sequential(*net_structure) self.optimizer = optim.Adam(self.parameters(), lr=learning_rate) self.device = T.device('cuda:0'if T.cuda.is_available() else'cpu') self.to(self.device) def forward(self, observation): """forward function""" observation.to(self.device) logits = self.policy_net(observation) dist = Categorical(logits=logits) return dist class ValueNetwork(nn.Module): """ Value Network for the agent :param input_dim: Observation size to determine input dimension :param learning_rate: Learning rate for the network :param hidden_layers: List of hidden layer sizes (int) :param activation: String naming activation function for hidden layers """ def __init__(self, input_dim: int, learning_rate: float, hidden_layers: List[int], activation: str): super(ValueNetwork, self).__init__() net_structure = [] # get activation class according to string activation = getattr(nn, activation)() # create first hidden layer in accordance with the input dim and the first hidden dim net_structure.extend([nn.Linear(*input_dim, hidden_layers[0]), activation]) # create the other hidden layers for i, layer_dim in enumerate(hidden_layers): if not i + 1 == len(hidden_layers): net_structure.extend([nn.Linear(layer_dim, hidden_layers[i + 1]), activation]) else: # create output layer net_structure.append(nn.Linear(layer_dim, 1)) self.value_net = nn.Sequential(*net_structure) 
self.optimizer = optim.Adam(self.parameters(), lr=learning_rate) self.device = T.device('cuda:0'if T.cuda.is_available() else'cpu') self.to(self.device) def forward(self, observation): """forward function""" value = self.value_net(observation) return value class PPO: """PPO Agent class""" def __init__(self, env, config: dict, logger: Logger = None): """ | gamma: Discount factor for the advantage calculation | learning_rate: Learning rate for both, policy_net and value_net | gae_lambda: Smoothing parameter for the advantage calculation | clip_range: Limitation for the ratio between old and new policy
| batch_size: Size of batches which were sampled from the buffer and fed into the nets during training | n_epochs: Number of repetitions for each training iteration | rollout_steps: Step interval within the update is performed. Has to be a multiple of batch_size """ self.env = env self.gamma = config.get('gamma', 0.99) self.gae_lambda = config.get('gae_lambda', 0.95) self.clip_range = config.get('clip_range', 0.2) self.n_epochs = config.get('n_epochs', 0.5) self.rollout_steps = config.get('rollout_steps', 2048) self.ent_coef = config.get('ent_coef', 0.0) self.num_timesteps = 0 self.n_updates = 0 self.learning_rate = config.get('learning_rate', 0.002) self.batch_size = config.get('batch_size', 256) self.logger = logger if logger else Logger(config=config) self.seed = config.get('seed', None) # torch seed setting if self.seed is not None: random.seed(self.seed) np.random.seed(self.seed) T.manual_seed(self.seed) # self.env.action_space.seed(seed) self.env.seed(self.seed) # create networks and buffer self.policy_net = PolicyNetwork(env.observation_space.shape[0], env.action_space.n, self.learning_rate, config.get('policy_layer', POLICY_LAYER), config.get('policy_activation', POLICY_ACTIVATION)) self.value_net = ValueNetwork(env.observation_space.shape, self.learning_rate, config.get('value_layer', VALUE_LAYER), config.get('value_activation', VALUE_ACTIVATION)) self.rollout_buffer = RolloutBuffer(self.rollout_steps, self.batch_size) @classmethod def load(cls, file: str, config: dict, logger: Logger = None): """ Creates a PPO object according to the parameters saved in file.pkl :param file: Path and filname (without .pkl) of your saved model pickle file :param config: Dictionary with parameters to specify PPO attributes :param logger: Logger :return: MaskedPPO object """ with open(f"{file}.pkl", "rb") as handle: data = pickle.load(handle) env = data["params"]["env"] # create PPO object, commit necessary parameters. 
Update remaining parameters model = cls(env=env, config=config, logger=logger) model.__dict__.update(data["params"]) # set weights from policy and value model.policy_net.load_state_dict(data["policy_params"]) model.value_net.load_state_dict(data["value_params"]) return model def save(self, file: str) -> None: """ Save model as pickle file :param file: Path under which the file will be saved :return: None """ params_dict = self.__dict__.copy() del params_dict['logger'] data = { "params": params_dict, "policy_params": self.policy_net.state_dict(), "value_params": self.value_net.state_dict() } with open(f"{file}.pkl", "wb") as handle: pickle.dump(data, handle) def forward(self, observation: np.ndarray, **kwargs) -> Tuple: """ Predicts an action according to the current policy based on the observation and the value for the next state :param observation: Current observation of teh environment :param kwargs: Used to accept but ignore passing actions masks from the environment. :return: Predicted action, probability for this action, and predicted value for the next state """ observation = T.tensor(observation, dtype=T.float).to(self.policy_net.device) dist = self.policy_net(observation) value = self.value_net(observation) action = dist.sample() prob = T.squeeze(dist.log_prob(action)).item() action = T.squeeze(action).item() value = T.squeeze(value).item() return action, prob, value def predict(self, observation: np.ndarray, deterministic: bool = True, state=None, **kwargs) -> Tuple: """ Action prediction for testing :param observation: Current observation of teh environment :param deterministic: Set True, to force a deterministic prediction :param state: The last states (used in rnn policies) :param kwargs: Used to accept but ignore passing actions masks from the environment. 
:return: Predicted action and next state (used in rnn policies) """ observation = T.tensor(np.array([observation]), dtype=T.float).to(self.policy_net.device) with T.no_grad(): dist = self.policy_net(observation) if deterministic: action = T.argmax(dist.probs) else: # choose random action according to the predicted probs action = dist.sample() action = T.squeeze(action).item() return action, state def train(self) -> None: """ Trains policy and value :return: None """ # switch to train mode self.policy_net.train(True) self.value_net.train(True) policy_losses, value_losses, entropy_losses, total_losses = [], [], [], [] for _ in range(self.n_epochs): # get data from buffer and random batches(index lists) to iterate over # e.g. obs[batch] returns the observations for all indices in batch obs_arr, action_arr, old_prob_arr, batches = self.rollout_buffer.generate_batches() # get advantage and return values from buffer advantages = T.tensor(self.rollout_buffer.advantages).to(self.policy_net.device) returns = T.tensor(self.rollout_buffer.returns).to(self.value_net.device) # normalize advantages advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8) for batch in batches: observations = T.tensor(obs_arr[batch], dtype=T.float).to(self.policy_net.device) old_probs = T.tensor(old_prob_arr[batch]).to(self.policy_net.device) actions = T.tensor(action_arr[batch]).to(self.policy_net.device) dist = self.policy_net(observations) values = self.value_net(observations) values = T.squeeze(values) # ratio between old and new policy (probs of selected actions) # Should be one at the first batch of every train iteration new_probs = dist.log_prob(actions) prob_ratio = new_probs.exp() / old_probs.exp() # policy clip
policy_loss_1 = prob_ratio * advantages[batch] policy_loss_2 = T.clamp(prob_ratio, 1-self.clip_range, 1+self.clip_range) * advantages[batch] # we want to maximize the reward, but running gradient descent -> negate the loss here policy_loss = -T.min(policy_loss_1, policy_loss_2).mean() value_loss = (returns[batch]-values)**2 value_loss = value_loss.mean() # entropy loss entropy_loss = -T.mean(dist.entropy()) entropy_losses.append(entropy_loss.item()) total_loss = policy_loss + 0.5*value_loss + self.ent_coef*entropy_loss self.policy_net.optimizer.zero_grad() self.value_net.optimizer.zero_grad() total_loss.backward() self.policy_net.optimizer.step() self.value_net.optimizer.step() policy_losses.append(policy_loss.item()) value_losses.append(value_loss.item()) total_losses.append(total_loss.item()) self.n_updates += self.n_epochs # logs # compute explained variance explained_var = explained_variance(np.asarray(self.rollout_buffer.values), self.rollout_buffer.returns) self.logger.record( { 'agent_training/n_updates': self.n_updates, 'agent_training/loss': np.mean(total_losses), 'agent_training/policy_gradient_loss': np.mean(policy_losses), 'agent_training/value_loss': np.mean(value_losses), 'agent_training/entropy_loss': np.mean(entropy_losses), 'agent_training/explained_variance': explained_var } ) self.logger.dump() def learn(self, total_instances: int, total_timesteps: int, intermediate_test=None) -> None: """ Learn over n environment instances or n timesteps. Break depending on which condition is met first One learning iteration consists of collecting rollouts and training the networks :param total_instances: Instance limit :param total_timesteps: Timestep limit :param intermediate_test: (IntermediateTest) intermediate test object. Must be created before. 
""" instances = 0 # iterate over n episodes = the agents has n episodes to interact with the environment for _ in range(total_instances): obs = self.env.reset() done = False instances += 1 # run agent on env until done while not done: action, prob, val = self.forward(obs) new_obs, reward, done, info = self.env.step(action) self.num_timesteps += 1 self.rollout_buffer.store_memory(obs, action, prob, val, reward, done) # call intermediate_test on_step if intermediate_test: intermediate_test.on_step(self.num_timesteps, instances, self) # break learn if total_timesteps are reached if self.num_timesteps >= total_timesteps: print("total_timesteps reached") self.logger.record( { 'results_on_train_dataset/instances': instances, 'results_on_train_dataset/num_timesteps': self.num_timesteps } ) self.logger.dump() return None
# update every n rollout_steps if self.num_timesteps % self.rollout_steps == 0: # predict the next reward, needed for the advantage computation of the last collected step with T.no_grad(): _, _, val = self.forward(new_obs) self.rollout_buffer.compute_advantages_and_returns(val, self.gamma, self.gae_lambda) # train networks self.train() # switch back to normal mode self.policy_net.train(False) self.value_net.train(False) # reset buffer to continue collecting rollouts self.rollout_buffer.reset() obs = new_obs if instances % len(self.env.data) == len(self.env.data) - 1: mean_training_reward = np.mean(self.env.episodes_rewards) mean_training_makespan = np.mean(self.env.episodes_makespans) if len(self.env.episodes_tardinesses) == 0: mean_training_tardiness = 0 else: mean_training_tardiness = np.mean(self.env.episodes_tardinesses) self.logger.record( { 'results_on_train_dataset/mean_reward': mean_training_reward, 'results_on_train_dataset/mean_makespan': mean_training_makespan, 'results_on_train_dataset/mean_tardiness': mean_training_tardiness } ) self.logger.dump() print("TRAINING DONE") self.logger.record( { 'results_on_train_dataset/instances': instances, 'results_on_train_dataset/num_timesteps': self.num_timesteps } ) self.logger.dump() def explained_variance(y_pred: np.ndarray, y_true: np.ndarray) -> np.ndarray: """ From Stable-Baseline Computes fraction of variance that ypred explains about y. Returns 1 - Var[y-ypred] / Var[y] interpretation: ev=0 => might as well have predicted zero ev=1 => perfect prediction ev<0 => worse than just predicting zero :param y_pred: the prediction :param y_true: the expected value :return: explained variance of ypred and y """ assert y_true.ndim == 1 and y_pred.ndim == 1 var_y = np.var(y_true) return np.nan if var_y == 0 else 1 - np.var(y_true - y_pred) / var_y if __name__ == "__main__": policy_net = PolicyNetwork(4, 10, 0.003, [64, 64], 'ReLU') for name, para in policy_net.named_parameters(): print('{}: {}'.format(name, para.shape))
4.2.3 PPO_MASKED
""" PPO implementation with action mask according to the StableBaselines3 implementation. To reuse trained models, you can make use of the save and load function """ import numpy as np import random
import torch as T import torch.nn as nn import torch.optim as optim from torch.distributions.categorical import Categorical import pickle from typing import Tuple, Any, List from src.utils.logger import Logger # constants POLICY_LAYER: List[int] = [256, 256] POLICY_ACTIVATION: str = 'ReLU' VALUE_LAYER: List[int] = [256, 256] VALUE_ACTIVATION: str = 'ReLU' class RolloutBuffer: """ Handles episode data collection and batch generation :param buffer_size: Buffer size :param batch_size: Size for batches to be generated """ def __init__(self, buffer_size: int, batch_size: int): self.observations = [] self.probs = [] self.values = [] self.actions = [] self.rewards = [] self.dones = [] self.action_masks = [] self.advantages = None self.returns = None if buffer_size % batch_size != 0: raise TypeError("rollout_steps has to be a multiple of batch_size") self.buffer_size = buffer_size self.batch_size = batch_size self.reset() def generate_batches(self) -> Tuple: """ Generates batches from the stored data :return: batches: Lists with all indices from the rollout_data, shuffled and sampled in lists with batch_size e.g. [[0,34,1,768,...(len: batch size)], [], ...(len: len(rollout_data) / batch size)] """ # create random index list and split into arrays with batch size indices = np.random.permutation(self.buffer_size) num_batches = int(self.buffer_size / self.batch_size) batches = indices.reshape((num_batches, self.batch_size)) return np.array(self.observations), np.array(self.actions), np.array(self.probs), np.array(self.action_masks),\ batches def compute_advantages_and_returns(self, last_value, gamma, gae_lambda) -> None: """ Computes advantage values and returns for all stored episodes. 
Required to :param last_value: Value from the next step to calculate the advantage for the last episode in the buffer :param gamma: Discount factor for the advantage calculation :param gae_lambda: Smoothing parameter for the advantage calculation :return: None """ # advantage: advantage from the actual returned rewards over the baseline value from step t onwards last_advantage = 0 for step in reversed(range(self.buffer_size)): # use the predicted reward for the advantage computation of the last step of the buffer if step == self.buffer_size - 1: # if a step is the last one of the episode (done = 1) -> not_done = 0 => the advantage # doesn't contain values outside the own episode not_done = 1.0 - self.dones[step] next_values = last_value else: not_done = 1.0 - self.dones[step] next_values = self.values[step + 1] delta = self.rewards[step] + gamma * next_values * not_done - self.values[step] last_advantage = delta + gamma * gae_lambda * not_done * last_advantage self.advantages[step] = last_advantage # compute returns = discounted rewards, advantages = discounted rewards - values # Necessary to update the value network self.returns = self.values + self.advantages def store_memory(self, observation: np.ndarray, action: int, prob: float, value: float, reward: Any, done: bool, action_mask: np.ndarray) -> None: """ Appends all data from the recent step :param observation: Observation at the beginning of the step :param action: Index of the selected action :param prob: Probability of the selected action (output from the policy_net) :param value: Baseline value that the value_net estimated from this step onwards according to the :param observation: Output from the value_net :param reward: Reward the env returned in this step :param done: True if the episode ended in this step :param action_mask: One hot vector with ones for all possible actions :return: None """ self.observations.append(observation) self.actions.append(action) self.probs.append(prob) 
self.values.append(value) self.rewards.append(reward) self.dones.append(done) self.action_masks.append(action_mask) def reset(self) -> None: """ Resets all buffer lists :return: None """ self.observations = [] self.probs = [] self.actions = [] self.rewards = [] self.dones = [] self.values = [] self.action_masks = [] self.advantages = np.zeros(self.buffer_size, dtype=np.float32) class PolicyNetwork(nn.Module): """ Policy Network for the agent :param input_dims: Observation size to determine input dimension :param n_actions: Number of action to determine output size :param learning_rate: Learning rate for the network :param fc1_dims: Size hidden layer 1 :param fc2_dims: Size hidden layer 2 """ def __init__(self, input_dim: int, n_actions: int, learning_rate: float, hidden_layers: List[int], activation: str): super(PolicyNetwork, self).__init__() net_structure = [] # get activation class according to string activation = getattr(nn, activation)() # create first hidden layer in accordance with the input dim and the first hidden dim net_structure.extend([nn.Linear(input_dim, hidden_layers[0]), activation]) # create the other hidden layers for i, layer_dim in enumerate(hidden_layers): if not i + 1 == len(hidden_layers): net_structure.extend([nn.Linear(layer_dim, hidden_layers[i + 1]), activation]) else: # create output layer net_structure.extend([nn.Linear(layer_dim, n_actions), nn.Softmax(dim=-1)]) self.policy_net = nn.Sequential(*net_structure) self.optimizer = optim.Adam(self.parameters(), lr=learning_rate) self.device = T.device('cuda:0'if T.cuda.is_available() else'cpu') self.to(self.device) def forward(self, observation, action_mask): """forward through the actor network""" observation.to(self.device) logits = self.policy_net(observation) # mask probabilities if action_mask is not None (for env.reset) if action_mask is not None: action_mask.to(self.device) logits = T.where(action_mask, logits, T.tensor(-1e+8).to(self.device)) dist = Categorical(logits=logits) return 
dist class ValueNetwork(nn.Module): """ Value Network for the agent :param input_dims: Observation size to determine input dimension :param learning_rate: Learning rate for the network :param fc1_dims: Size hidden layer 1 :param fc2_dims: Size hidden layer 2 """ def __init__(self, input_dim: int, learning_rate: float, hidden_layers: List[int], activation: str): super(ValueNetwork, self).__init__() net_structure = [] # get activation class according to string activation = getattr(nn, activation)() # create first hidden layer in accordance with the input dim and the first hidden dim net_structure.extend([nn.Linear(*input_dim, hidden_layers[0]), activation]) # create the other hidden layers for i, layer_dim in enumerate(hidden_layers): if not i + 1 == len(hidden_layers): net_structure.extend([nn.Linear(layer_dim, hidden_layers[i + 1]), activation]) else: # create output layer net_structure.append(nn.Linear(layer_dim, 1)) self.value_net = nn.Sequential(*net_structure) self.optimizer = optim.Adam(self.parameters(), lr=learning_rate) self.device = T.device('cuda:0'if T.cuda.is_available() else'cpu') self.to(self.device) def forward(self, observation): """forward through the value network""" value = self.value_net(observation) return value class MaskedPPO: def __init__(self, env, config: dict, logger: Logger = None): """ | gamma: Discount factor for the advantage calculation | learning_rate: Learning rate for both, policy_net and value_net | gae_lambda: Smoothing parameter for the advantage calculation | clip_range: Limitation for the ratio between old and new policy | batch_size: Size of batches which were sampled from the buffer and fed into the nets during training | n_epochs: Number of repetitions for each training iteration | rollout_steps: Step interval within the update is performed. 
Has to be a multiple of batch_size """ self.env = env self.gamma = config.get('gamma', 0.99) self.gae_lambda = config.get('gae_lambda', 0.95) self.clip_range = config.get('clip_range', 0.2) self.n_epochs = config.get('n_epochs', 0.5) self.rollout_steps = config.get('rollout_steps', 2048) self.ent_coef = config.get('ent_coef', 0.0) self.num_timesteps = 0 self.n_updates = 0 self.learning_rate = config.get('learning_rate', 0.002) self.batch_size = config.get('batch_size', 256) self.logger = logger if logger else Logger(config=config) self.seed = config.get('seed', None) # torch seed setting if self.seed is not None: random.seed(self.seed) np.random.seed(self.seed) T.manual_seed(self.seed) # self.env.action_space.seed(seed) self.env.seed(self.seed) # create networks and buffer self.policy_net = PolicyNetwork(env.observation_space.shape[0], env.action_space.n, self.learning_rate, config.get('policy_layer', POLICY_LAYER), config.get('policy_activation', POLICY_ACTIVATION)) self.value_net = ValueNetwork(env.observation_space.shape, self.learning_rate, config.get('value_layer', VALUE_LAYER), config.get(
'value_activation', VALUE_ACTIVATION)) self.rollout_buffer = RolloutBuffer(self.rollout_steps, self.batch_size) @classmethod def load(cls, file: str, config: dict, logger: Logger = None): """ Creates a PPO object according to the parameters saved in file.pkl :param file: Path and filname (without .pkl) of your saved model pickle file :param config: Dictionary with parameters to specify PPO attributes :param logger: Logger :return: MaskedPPO object """ with open(f"{file}.pkl", "rb") as handle: data = pickle.load(handle) env = data["params"]["env"] # create PPO object, commit necessary parameters. Update remaining parameters model = cls(env=env, config=config, logger=logger) model.__dict__.update(data["params"]) # set weights from policy and value model.policy_net.load_state_dict(data["policy_params"]) model.value_net.load_state_dict(data["value_params"]) return model def save(self, file: str) -> None: """ Save model as pickle file :param file: Path under which the file will be saved :return: None """ params_dict = self.__dict__.copy() del params_dict['logger'] data = { "params": params_dict, "policy_params": self.policy_net.state_dict(), "value_params": self.value_net.state_dict() } with open(f"{file}.pkl", "wb") as handle: pickle.dump(data, handle) def forward(self, observation: np.ndarray, action_mask: np.ndarray) -> Tuple: """ Predicts an action according to the current policy and based on the action_mask and observation and the value for the next state :param observation: Current observation of teh environment :param action_mask: One hot vector with ones for all possible actions :return: Predicted action, probability for this action, and predicted value for the next state """ observation = T.tensor(observation, dtype=T.float).to(self.policy_net.device) if action_mask is not None: action_mask = T.tensor(action_mask, dtype=T.bool).to(self.policy_net.device) dist = self.policy_net(observation, action_mask) value = self.value_net(observation) action = dist.sample() 
prob = T.squeeze(dist.log_prob(action)).item() action = T.squeeze(action).item() value = T.squeeze(value).item() return action, prob, value def predict(self, observation: np.ndarray, action_mask: np.ndarray, deterministic: bool = True, state=None) -> Tuple: """ Action prediction for testing :param observation: Current observation of teh environment :param action_mask: One hot vector with ones for all possible actions :param deterministic: Set True, to force a deterministic prediction :param state: The last states (used in rnn policies) :return: Predicted action and next state (used in rnn policies) """ observation = T.tensor(np.array(observation), dtype=T.float).to(self.policy_net.device) action_mask = T.tensor(action_mask, dtype=T.bool).to(self.policy_net.device) with T.no_grad(): dist = self.policy_net(observation, action_mask) if deterministic:
action = T.argmax(dist.probs) else: # choose random action according to the predicted probs action = dist.sample() action = T.squeeze(action).item() return action, state def train(self) -> None: """ Trains policy and value :return: None """ # switch to train mode self.policy_net.train(True) self.value_net.train(True) policy_losses, value_losses, entropy_losses, total_losses = [], [], [], [] for _ in range(self.n_epochs): # get data from buffer and random batches(index lists) to iterate over # e.g. obs[batch] returns the observations for all indices in batch obs_arr, action_arr, old_prob_arr, action_mask_arr, batches = self.rollout_buffer.generate_batches() # get advantage and return values from buffer advantages = T.tensor(self.rollout_buffer.advantages).to(self.policy_net.device) returns = T.tensor(self.rollout_buffer.returns).to(self.value_net.device) # normalize advantages advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8) for batch in batches: observations = T.tensor(obs_arr[batch], dtype=T.float).to(self.policy_net.device) old_probs = T.tensor(old_prob_arr[batch]).to(self.policy_net.device) actions = T.tensor(action_arr[batch]).to(self.policy_net.device) action_masks = T.tensor(action_mask_arr[batch], dtype=T.bool).to(self.policy_net.device) dist = self.policy_net(observations, action_masks) values = self.value_net(observations) values = T.squeeze(values) # ratio between old and new policy (probs of selected actions) # Should be one at the first batch of every train iteration new_probs = dist.log_prob(actions) prob_ratio = new_probs.exp() / old_probs.exp() # policy clip policy_loss_1 = prob_ratio * advantages[batch] policy_loss_2 = T.clamp(prob_ratio, 1-self.clip_range, 1+self.clip_range) * advantages[batch] # we want to maximize the reward, but running gradient descent -> negate the loss here policy_loss = -T.min(policy_loss_1, policy_loss_2).mean() value_loss = (returns[batch]-values)**2 value_loss = value_loss.mean() # entropy loss 
entropy_loss = -T.mean(dist.entropy()) entropy_losses.append(entropy_loss.item()) total_loss = policy_loss + 0.5*value_loss + self.ent_coef*entropy_loss self.policy_net.optimizer.zero_grad() self.value_net.optimizer.zero_grad() total_loss.backward() self.policy_net.optimizer.step() self.value_net.optimizer.step() policy_losses.append(policy_loss.item()) value_losses.append(value_loss.item()) total_losses.append(total_loss.item()) self.n_updates += self.n_epochs # logs # compute explained variance explained_var = explained_variance(np.asarray(self.rollout_buffer.values), self.rollout_buffer.returns) self.logger.record( { 'agent_training/n_updates': self.n_updates, 'agent_training/loss': np.mean(total_losses), 'agent_training/policy_gradient_loss': np.mean(policy_losses), 'agent_training/value_loss': np.mean(value_losses), 'agent_training/entropy_loss': np.mean(entropy_losses), 'agent_training/explained_variance': explained_var }
) self.logger.dump() def learn(self, total_instances: int, total_timesteps: int, intermediate_test=None) -> None: """ Learn over n environment instances or n timesteps. Break depending on which condition is met first One learning iteration consists of collecting rollouts and training the networks :param total_instances: Instance limit :param total_timesteps: Timestep limit :param intermediate_test: (IntermediateTest) intermediate test object. Must be created before. """ instances = 0 # iterate over n episodes = the agents has n episodes to interact with the environment for _ in range(total_instances): obs = self.env.reset() info = {'mask': None} done = False instances += 1 # run agent on env until done while not done: action, prob, val = self.forward(obs, action_mask=info['mask']) new_obs, reward, done, info = self.env.step(action) self.num_timesteps += 1 self.rollout_buffer.store_memory(obs, action, prob, val, reward, done, info['mask']) # call intermediate_test on_step if intermediate_test: intermediate_test.on_step(self.num_timesteps, instances, self) # break learn if total_timesteps are reached if self.num_timesteps >= total_timesteps: print("total_timesteps reached") self.logger.record( { 'results_on_train_dataset/instances': instances, 'results_on_train_dataset/num_timesteps': self.num_timesteps } ) self.logger.dump() return None # update every n rollout_steps if self.num_timesteps % self.rollout_steps == 0: # predict the next reward, needed for the advantage computation of the last collected step with T.no_grad(): _, _, val = self.forward(new_obs, info['mask']) self.rollout_buffer.compute_advantages_and_returns(val, self.gamma, self.gae_lambda) # train networks self.train() # switch back to normal mode self.policy_net.train(False) self.value_net.train(False) # reset buffer to continue collecting rollouts self.rollout_buffer.reset() obs = new_obs if instances % len(self.env.data) == len(self.env.data) - 1: mean_training_reward = 
np.mean(self.env.episodes_rewards) mean_training_makespan = np.mean(self.env.episodes_makespans) if len(self.env.episodes_tardinesses) == 0: mean_training_tardiness = 0 else: mean_training_tardiness = np.mean(self.env.episodes_tardinesses) self.logger.record( { 'results_on_train_dataset/mean_reward': mean_training_reward, 'results_on_train_dataset/mean_makespan': mean_training_makespan,
'results_on_train_dataset/mean_tardiness': mean_training_tardiness } ) self.logger.dump() print("TRAINING DONE") self.logger.record( { 'results_on_train_dataset/instances': instances, 'results_on_train_dataset/num_timesteps': self.num_timesteps } ) self.logger.dump() def explained_variance(y_pred: np.ndarray, y_true: np.ndarray) -> np.ndarray: """ From Stable-Baseline Computes fraction of variance that ypred explains about y. Returns 1 - Var[y-ypred] / Var[y] interpretation: ev=0 => might as well have predicted zero ev=1 => perfect prediction ev<0 => worse than just predicting zero :param y_pred: the prediction :param y_true: the expected value :return: explained variance of ypred and y """ assert y_true.ndim == 1 and y_pred.ndim == 1 var_y = np.var(y_true) return np.nan if var_y == 0 else 1 - np.var(y_true - y_pred) / var_y if __name__ == "__main__": policy_net = PolicyNetwork(4, 10, 0.003) for name, para in policy_net.named_parameters(): print('{}: {}'.format(name, para.shape))
[1] C. W. de Puiseau, J. Peters, C. Dorpelkus, H. Tercan, and T. Meisen, "schlably: A Python framework for deep reinforcement learning based scheduling experiments," SoftwareX, vol. 22, May 2023, Art. no. 101383, doi: 10.1016/j.softx.2023.101383.