社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Python

供应链 | Schlably:深度强化学习车间调度实验的Python框架

运筹OR帷幄 • 6 天前 • 31 次点击  

最近发现了一个比较好玩的开源项目Schlably,其是一个基于Python和深度强化学习(DRL),用于进行调度问题实验的框架。它具有可扩展的gym环境和DRL-Agent,以及用于数据生成、训练和测试的相关功能。

1. 引言

生产调度(Production Scheduling, PS)是运筹学(Operations Research, OR)和优化中的一个重要且复杂的问题。它涉及在时间上分配资源以完成生产任务,目标是最小化时间、成本和资源使用等指标。PS问题在人工智能的运用中得到了广泛的关注,尤其是通过深度强化学习(Deep Reinforcement Learning, DRL)技术的应用。

在这个领域,尽管存在大量的实验和研究,但由于各实验的设置和解决方案常常仅有细微的差异,研究人员不得不不断重复相似的编程工作,这大大增加了研究的初始成本和难度。为了解决这个问题,德国伍伯塔尔大学开发了一个名为Schlably的Python框架,它提供了一整套工具来简化PS解决方案的开发和评估。

开源代码:https://github.com/tmdt-buw/schlably
使用手册:https://schlably.readthedocs.io/en/latest/index.html

2. 背景与相关工作

Schlably的开发初衷源于一个大学研究项目,该项目需要解决一个实际的生产调度问题,并与工业合作伙伴合作。因此,作者在开发早期就确定了一些设计目标,包括:提供开箱即用的DRL方法和启发式算法、覆盖不同的调度场景、支持详细的评估、并且易于代码交互,以便学生和研究人员快速上手。

作者还对现有的相关框架进行了评估和比较。Schlably的设计目标在于提供一种灵活、模块化、易于使用的实验框架,特别是在生成调度问题实例和集成第三方DRL库方面有显著优势。

3. 软件架构

Schlably的架构设计非常注重模块化和扩展性。其核心组件包括:

  • 环境(Environment)
  • 调度问题生成器(Scheduling Problem Generator)
  • DRL代理算法(DRL Agent Algorithm)
  • 日志记录和评估工具(Logging and Evaluation Tools)

这一设计不仅简化了实验设置,还使得不同组件之间的替换和扩展变得更加容易。

下图展示了Schlably的总体架构:

Schlably架构图

在Schlably的架构中,作者使用了强化学习中的Q学习算法。其更新公式如下:

其中:

  • 是在状态  下采取动作  的值。
  •  是学习率。
  •  是当前时间步的奖励。
  •  是折扣因子。
  •  是下一状态  下所有可能动作的最大值。

4. 算法

Schlably中集成了多个调度算法,包括启发式算法、强化学习代理和求解器。

4.1 启发式算法

其中启发式算法包括:

  • EDD: earliest due date
  • SPT: shortest processing time first
  • MTR: most tasks remaining
  • LTR: least tasks remaining
  • Random: random action

具体实现代码如下:

"""
This module provides the following scheduling heuristics as function:

- EDD: earliest due date
- SPT: shortest processing time first
- MTR: most tasks remaining
- LTR: least tasks remaining
- Random: random action

You can implement additional heuristics in this file by specifying a function that takes a list of tasks and an action
mask and returns the index of the job to be scheduled next.

If you want to call your heuristic via the HeuristicSelectionAgent or edit an existing shortcut,
adapt/extend the task_selection dict attribute of the HeuristicSelectionAgent class.

:Example:

Add a heuristic that returns zeros (this is not a practical example!)
1. Define the according function

.. code-block:: python

    def return_0_heuristic(tasks: List[Task], action_mask: np.array) -> int:
        return 0

2. Add the function to the task_selection dict within the HeuristicSelectionAgent class:

.. code-block:: python

    self.task_selections = {
        'rand': random_task,
        'EDD': edd,
        'SPT': spt,
        'MTR': mtr,
        'LTR': ltr,
        'ZERO': return_0_heuristic
    }

"
""
import numpy as np
from typing import List

from src.data_generator.task import Task


def get_active_task_dict(tasks: List[Task]) -> dict:
    """
    Helper function to determining the next unfinished task to be processed for each job

    :param tasks: List of task objects, so one instance

    :return: Dictionary containing the next tasks to be processed for each job

    Would be an empty dictionary if all tasks were completed

    "
""
    active_job_task_dict = {}
    for task_i, task in enumerate(tasks):
        if not task.done and task.job_index not in active_job_task_dict.keys():
            active_job_task_dict[task.job_index] = task_i
    return active_job_task_dict


def edd(tasks: List[Task], action_mask: np.array) -> int:
    """
    EDD: earliest due date. Determines the job with the smallest deadline

    :param tasks: List of task objects, so one instance
    :param action_mask: Action mask from the environment that is to receive the action selected by this heuristic

    :return: Index of the job selected according to the heuristic

    "
""
    if np.sum(action_mask) == 1:
        chosen_job = np.argmax(action_mask)
    else:
        num_jobs = action_mask.shape[0] - 1
        num_tasks_per_job = len(tasks) / num_jobs
        deadlines = np.full(num_jobs + 1, np.inf)

        for job_i in range(num_jobs):
            idx = int(num_tasks_per_job * job_i)
            deadlines[job_i] = tasks[idx].deadline

        deadlines = np.where(action_mask == 1, deadlines, np.full(deadlines.shape, np.inf))
        chosen_job = np.argmin(deadlines)
    return chosen_job


def spt(tasks: List[Task], action_mask: np.array) -> int:
    """
    SPT: shortest processing time first. Determines the job of which the next unfinished task has the lowest runtime

    :param tasks: List of task objects, so one instance
    :param action_mask: Action mask from the environment that is to receive the action selected by this heuristic

    :return: Index of the job selected according to the heuristic

    "
""
    if np.sum(action_mask) == 1:
        chosen_job = np.argmax(action_mask)
    else:
        num_jobs = action_mask.shape[0] - 1
        runtimes = np.full(num_jobs + 1, np.inf)
        active_task_dict = get_active_task_dict(tasks)

        for i in range(num_jobs):
            if i in active_task_dict.keys():
                task_idx = active_task_dict[i]
                runtimes[i] = tasks[task_idx].runtime
        runtimes = np.where(action_mask == 1, runtimes, np.full(runtimes.shape, np.inf))
        chosen_job = np.argmin(runtimes)
    return chosen_job


def mtr(tasks: List[Task], action_mask: np.array) -> int:
    """
    MTR: most tasks remaining. Determines the job with the least completed tasks

    :param tasks: List of task objects, so one instance
    :param action_mask: Action mask from the environment that is to receive the action selected by this heuristic

    :return: Index of the job selected according to the heuristic

    "
""
    if np.sum(action_mask) == 1:
        chosen_job = np.argmax(action_mask)
    else:
        tasks_done = np.zeros(len(tasks) + 1)
        possible_tasks = get_active_task_dict(tasks)
        for _, task in enumerate(tasks):
            if task.done and task.job_index in possible_tasks.keys():
                tasks_done[possible_tasks[task.job_index]] += 1

        task_mask = np.zeros(len(tasks) + 1)
        for job_id, task_id in possible_tasks.items():
            if action_mask[job_id] == 1:
                task_mask[task_id] += 1
        tasks_done = np.where(task_mask == 1, tasks_done, np.full(tasks_done.shape, np.inf))
        tasks_done[-1] = np.inf
        chosen_task = np.argmin(tasks_done)
        chosen_job = tasks[chosen_task].job_index
    return chosen_job


def ltr(tasks: List[Task], action_mask: np.array) -> int:
    """
    LTR: least tasks remaining. Determines the job with the most completed tasks

    :param tasks: List of task objects, so one instance
    :param action_mask: Action mask from the environment that is to receive the action selected by this heuristic

    :return: Index of the job selected according to the heuristic

    "
""
    if np.sum(action_mask) == 1:
        chosen_job = np.argmax(action_mask)
    else:
        tasks_done = np.zeros(len(tasks) + 1)
        possible_tasks = get_active_task_dict(tasks)
        for _, task in enumerate(tasks):
            if task.done and task.job_index in possible_tasks.keys():
                tasks_done[possible_tasks[task.job_index]] += 1
        task_mask = np.zeros(len(tasks) + 1)
        for job_id, task_id in possible_tasks.items():
            if action_mask[job_id] == 1:
                task_mask[task_id] += 1
        tasks_done = np.where(task_mask == 1, tasks_done, np.full(tasks_done.shape, -1))
        tasks_done[-1] = -1
        chosen_task = np.argmax(tasks_done)
        chosen_job = tasks[chosen_task].job_index
    return chosen_job


def random_task(tasks: List[Task], action_mask: np.array) -> int:
    """
    Returns a random task

    :param tasks: Not needed
    :param action_mask: Action mask from the environment that is to receive the action selected by this heuristic

    :return: Index of the job selected according to the heuristic

    "
""

    chosen_job = None
    if np.sum(action_mask) == 1:
        chosen_job = np.argmax(action_mask)
    else:
        valid_values_0 = np.where(action_mask > 0)[0]

        if len(valid_values_0) > 2:
            chosen_job = np.random.choice(valid_values_0, size=1)[0]
        elif len(valid_values_0) == 0:
            print('this is not possible')
        else:
            chosen_job = np.random.choice(valid_values_0, size=1)[0]
    return chosen_job


def choose_random_machine(chosen_task, machine_mask) -> int:
    """
    Determines a random machine which is available according to the mask and chosen task. Useful for the FJSSP.

    :param chosen_task: ID of the task that is scheduled on the selected machine
    :param machine_mask: Machine mask from the environment that is to receive the machine action chosen by this function

    :return: Index of the chosen machine

    "
""
    machine_mask = np.array(np.where(machine_mask > 0))
    idx_valid_machine = np.where(machine_mask[0] == chosen_task)
    valid_machines = machine_mask[1][idx_valid_machine]
    chosen_machine = np.random.choice(valid_machines, size=1)[0]
    return chosen_machine


def choose_first_machine(chosen_task, machine_mask) -> int:
    """
    Determines the first (by index) machine which is available according to the mask and chosen task. Useful for the
    FJSSP

    :param chosen_task: ID of the task that is scheduled on the selected machine
    :param machine_mask: Machine mask from the environment that is to receive the machine action chosen by this function

    :return: Index of the chosen machine

    "
""
    machine_mask = np.array(np.where(machine_mask > 0))
    idx_valid_machine = np.where(machine_mask[0] == chosen_task)
    valid_machines = machine_mask[1][idx_valid_machine]
    return valid_machines[0]


class HeuristicSelectionAgent:
    """
    This class can be used to get the next task according to the heuristic passed as string abbreviation (e.g. EDD).
    If you want to edit a shortcut, or add one for your custom heuristic, adapt/extend the task_selection dict.

    :Example:

    .. code-block:: python

        def my_custom_heuristic():
            ......

    or

    .. code-block:: python

        self.task_selections = {
            'rand': random_task,
            'XYZ': my_custom_heuristic
            }

    "
""

    def __init__(self) -> None:

        super().__init__()
        # Map heuristic ids to corresponding function
        self.task_selections = {
            'rand': random_task,
            'EDD': edd,
            'SPT': spt,
            'MTR': mtr,
            'LTR': ltr
        }

    def __call__(self, tasks: List, action_mask: np.array, task_selection: str) -> int:
        """
        Selects the next heuristic function according to the heuristic passed as string abbreviation
        and the assignment in the task_selections dictionary

        :param tasks: List of task objects, so one instance
        :param action_mask: Action mask from the environment that is to receive the action selected by this heuristic
        :param task_selection: Heuristic string abbreviation (e.g. EDD)

        :return: Index of the job selected according to the heuristic

        "
""
        choose_task = self.task_selections[task_selection]

        chosen_task = choose_task(tasks, action_mask)

        return chosen_task

4.2 强化学习代理

其中强化学习包括DQN、PPO、PPO_masked三种代理。

4.2.1 DQN

"""
DQN Implementation with target net and epsilon greedy. Follows the Stable Baselines 3 implementation.
To reuse trained models, you can make use of the save and load function.
To adapt policy and value network structure, specify the layer and activation parameter in your train config or
change the constants in this file
"
""
import numpy as np
import pickle
import random
from collections import deque
import torch as T
import torch.nn as nn
import torch.optim as optim
from torch.nn import functional as F
from typing import Tuple, List

from src.utils.logger import Logger

# constants
LAYER: List[int] = [64, 64]
ACTIVATION: str = 'ReLU'


class MemoryBuffer:
    """
    Handles episode data collection and sample generation

    :param buffer_size: Buffer size
    :param batch_size: Size for batches to be generated
    :param obs_dim: Size of the observation to be stored in the buffer
    :param obs_type: Type of the observation to be stored in the buffer
    :param action_type: Type of the action to be stored in the buffer

    "
""
    def __init__(self, buffer_size: int, batch_size: int, obs_dim: int, obs_type: type, action_type: type):

        self.buffer_size = buffer_size
        self.batch_size = batch_size
        self.pos = 0
        self.full = False

        # buffer data
        self.obs = np.zeros((buffer_size, obs_dim), dtype=obs_type)
        self.actions = np.zeros((buffer_size, 1), dtype=action_type)
        self.rewards = np.zeros((buffer_size, 1), dtype=np.float32)
        self.dones = np.zeros((buffer_size, 1), dtype=np.float32)   # try with   dtype=np.bool
        self.new_obs = np.zeros((buffer_size, obs_dim), dtype=np.float32)

    def __len__(self):
        if self.full:
            return self.buffer_size
        else:
            return self.pos

    def store_memory(self, obs, action, reward, done, new_obs) -> None:
        """
        Appends all data from the recent step

        :param obs: Observation at the beginning of the step
        :param action: Index of the selected action
        :param reward: Reward the env returned in this step
        :param done: True if the episode ended in this step
        :param new_obs:  Observation after the step

        :return:

        "
""
        self.obs[self.pos] = np.array(obs).copy()
        self.actions[self.pos] = np.array(action).copy()
        self.rewards[self.pos] = np.array(reward).copy()
        self.dones[self.pos] = np.array(done).copy()
        self.new_obs[self.pos] = np.array(new_obs)

        self.pos += 1
        # if pos behind last element -> buffer full.
        # Return pos to 0. Next step, the oldest data in the buffer is then replaced by the newest one
        if self.pos == self.buffer_size:
            self.full = True
            self.pos = 0

    def get_samples(self) -> Tuple:
        """
        Generates random samples from the stored data

        :return: batch_size samples from the buffer. e.g. obs, actions, ..., new_obs from step 21

        "
""
        # generate batch_size random indices in range of current buffer len
        indices = np.random.randint(0, len(self), size=self.batch_size)

        return self.obs[indices], self.actions[indices], self.rewards[indices], \
            self.dones[indices], self.new_obs[indices]


class Policy(nn.Module):
    """
    Network structure used for both the Q network and the target network

    :param obs_dim: Observation size to determine input dimension
    :param action_dim: Number of action to determine output size
    :param learning_rate: Learning rate for the network
    :param hidden_layers: List of hidden layer sizes (int)
    :param activation: String naming activation function for hidden layers

    "
""
    def __init__(self, obs_dim: int, action_dim: int, learning_rate: float, hidden_layers: List[int], activation: str):
        super(Policy, self).__init__()

        net_structure = []
        # get activation class according to string
        activation = getattr(nn, activation)()

        # create first hidden layer in accordance with the input dim and the first hidden dim
        net_structure.extend([nn.Linear(obs_dim, hidden_layers[0]), activation])

        # create the other hidden layers
        for i, layer_dim in enumerate(hidden_layers):
            if not i + 1 == len(hidden_layers):
                net_structure.extend([nn.Linear(layer_dim, hidden_layers[i + 1]), activation])
            else:
                # create output layer
                net_structure.append(nn.Linear(layer_dim, action_dim))

        self.q_net = nn.Sequential(*net_structure)

        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
        self.device = T.device('cuda:0'if T.cuda.is_available() else'cpu')
        self.to(self.device)

    def forward(self, obs):
        """ forward pass through the Q-network """
        q_values = self.q_net(obs)
        return q_values


class DQN:
    """DQN Implementation with target net and epsilon greedy. Follows the Stable Baselines 3 implementation."""
    def __init__(self, env, config: dict, logger: Logger = None):

        """
        | batch_size: Number of samples that are chosen and passed through the net per update
        | gradient_steps: Number of updates per training
        | train_freq: Environment steps between two trainings
        | buffer_size: Size of the memory buffer = max number of rollouts that can be stored before the oldest are deleted
        | target_net_update: Number of steps between target_net_updates
        | training_starts = Learning_starts: steps after which training can start for the first time
        | initial_eps: Initial epsilon value
        | final_eps: Final epsilon value
        | fraction_eps: If the percentage progress of learn exceeds fraction eps, epsilon takes the final_eps value
        | e.g. 5/100 total_timesteps done -> progress = 0.5 > fraction eps -> eps=final_eps
        | max_grad_norm: Value to clip the policy update of the q_net

        :param env: Pregenerated, gymbased environment. If no env is passed, env = None -> PPO can only be used
            for evaluation (action prediction)
        :param config: Dictionary with parameters to specify DQN attributes
        :param logger: Logger

        "
""

        self.env = env
        self.gamma = config.get('gamma', 0.99)
        self.learning_rate = config.get('learning_rate', 1e-4)
        self.batch_size = config.get('batch_size', 32)
        self.gradient_steps = config.get('gradient_steps', 1)
        self.train_freq = config.get('train_freq', 4)
        self.buffer_size = config.get('buffer_size', 1_000_000)

        self.target_net_update = config.get('target_net_update', 10_000)
        self.training_starts = config.get('training_starts', 50_000)
        self.initial_eps = config.get('initial_eps', 1.0)
        self.final_eps = config.get('final_eps', 0.05)
        self.fraction_eps = config.get('fraction_eps', 0.1)
        self.max_grad_norm = config.get('max_grad_norm', 10.0)
        self.epsilon = self.initial_eps  # epsilon is the exploration rate
        self.remaining_progress = 1  # tracks how much % of total steps remain -> value between 1 and 0
        self.num_timesteps = 0
        self.n_updates = 0

        self.logger = logger if logger else Logger(config=config)
        self.seed = config.get('seed', None)
        self.reward_info = deque(maxlen=100)

        # torch seed setting
        if self.seed is not None:
            random.seed(self.seed)
            np.random.seed(self.seed)
            T.manual_seed(self.seed)
            self.env.action_space.seed(self.seed)
            self.env.seed(self.seed)

        # create networks and buffer
        self.q_net = Policy(env.observation_space.shape[0], env.action_space.n, self.learning_rate,
                            config.get('layer', LAYER),
                            config.get('activation', ACTIVATION))
        self.q_target_net = Policy(env.observation_space.shape[0], env.action_space.n, self.learning_rate,
                                   config.get('layer', LAYER),
                                   config.get('activation', ACTIVATION))
        # copy weights to target_net
        self.q_target_net.load_state_dict(self.q_net.state_dict())
        self.memory_buffer = MemoryBuffer(self.buffer_size, self.batch_size, env.observation_space.shape[0],
                                          env.observation_space.dtype, env.action_space.dtype)

    def save(self, file: str) -> None:
        """
        Save model as pickle file

        :param file: Path under which the file will be saved

        :return: None

        "
""
        params_dict = self.__dict__.copy()
        del params_dict['logger']
        data = {
            "params": params_dict,
            "q_params": self.q_net.state_dict(),
            "target_params": self.q_target_net.state_dict()
        }

        with open(f"{file}.pkl""wb") as handle:
            pickle.dump(data, handle)

    @classmethod
    def load(cls, file: str, config: dict, logger: Logger = None):
        """
        Creates a DQN object according to the parameters saved in file.pkl

        :param file: Path and filname (without .pkl) of your saved model pickle file
        :param config: Dictionary with parameters to specify PPO attributes
        :param logger: Logger

        :return: DQN object

        "
""
        with open(f"{file}.pkl""rb") as handle:
            data = pickle.load(handle)

        env = data["params"]["env"]

        # create DQN object. Commit necessary parameters. Update remaining parameters
        model = cls(env=env, config=config, logger=logger)
        model.__dict__.update(data["params"])

        # set weights
        model.q_net.load_state_dict(data["q_params"])
        model.q_target_net.load_state_dict(data["target_params"])

        return model

    def get_action(self, obs: np.ndarray) -> int:
        """
        Random action or action according to the policy and epsilon

        :return: action index

        "
""
        if np.random.random() < self.epsilon:
            # random action from the action space
            action = self.env.action_space.sample()
        else:
            obs = T.tensor(obs, dtype=T.float).to(self.q_net.device)
            q_values = self.q_net(obs)
            # choose action with highest Q value -> greedy policy
            action = T.argmax(q_values)
            action = T.squeeze(action).item()

        return action

    def predict(self, observation: np.ndarray, action_mask: np.ndarray = np.ones(1),
                deterministic: bool = True, state=None) -> Tuple:
        """
        Action prediction for testing

        :param observation: Current observation of teh environment
        :param action_mask: Mask of actions, which can logically be taken. NOTE: currently not implemented!
        :param deterministic: Set True, to force a deterministic prediction
        :param state: The last states (used in rnn policies)

        :return: Predicted action and next state (used in rnn policies)

        "
""
        observation = T.tensor(np.array([observation]), dtype=T.float).to(self.q_net.device)

        with T.no_grad():
            q_values = self.q_net(observation)
            if deterministic:
                action = T.argmax(q_values)
            else:
                # choose random action according to the predicted probs
                action = q_values.sample()
            action = T.squeeze(action).item()

        return action, state

    def train(self) -> None:
        """
        Trains Q-network and Target-Network

        :return: None

        "
""
        # Switch to train mode (this affects batch norm / dropout)
        self.q_net.train()

        losses = []

        for _ in range(self.gradient_steps):

            # get samples from the buffer
            obs_arr, action_arr, reward_arr, done_array, new_obs_array = self.memory_buffer.get_samples()

            # convert to tensors
            obs = T.tensor(obs_arr, dtype=T.float).to(self.q_target_net.device)
            actions = T.tensor(action_arr, dtype=T.float).to(self.q_target_net.device)
            rewards = T.tensor(reward_arr, dtype=T.float).to(self.q_target_net.device)
            dones = T.tensor(done_array, dtype=T.float).to(self.q_target_net.device)
            new_obs = T.tensor(new_obs_array, dtype=T.float).to(self.q_target_net.device)

            # no update on the target net -> use no_grad
            with T.no_grad():
                # Compute the next Q-values using the target network
                next_q_values = self.q_target_net(new_obs)
                # Follow greedy policy: use the one with the highest value
                next_q_values, _ = next_q_values.max(dim=1)
                # Avoid potential broadcast issue
                next_q_values = next_q_values.reshape(-1, 1)
                # 1-dones -> reward + 0 if step is last in episode
                target_q_values = rewards + (1 - dones) * self.gamma * next_q_values

            # get all current Q-values for each obs
            current_q_values = self.q_net(obs)

            # choose Q-Values according to actions
            current_q_values = T.gather(current_q_values, dim=1, index=actions.long())

            # loss computation. MSE also possible
            loss = F.smooth_l1_loss(current_q_values, target_q_values)
            losses.append(loss.item())

            # update
            self.q_net.optimizer.zero_grad()
            loss.backward()
            # clip
            nn.utils.clip_grad_norm_(self.q_net.parameters(), self.max_grad_norm)
            self.q_net.optimizer.step()

        self.n_updates += self.gradient_steps

        # logs
        self.logger.record(
            {
                'agent_training/exploration rate': self.epsilon,
                'agent_training/n_updates': self.n_updates,
                'agent_training/loss': np.mean(losses),
                'agent_training/mean_rwd': np.mean(self.reward_info)
            }
        )
        self.logger.dump()

        # if self.num_timesteps % 10_000 == 0:
        #     print(f'Update at {self.num_timesteps} Mean reward {np.mean(self.reward_info)}')

    def on_step(self, total_timesteps):
        """
        Method track and check plenty conditions to e.g. check if q_target_net or epsilon update are necessary
        "
""
        # update progress
        self.remaining_progress = 1 - float(self.num_timesteps) / float(total_timesteps)

        # update target_net with parameters from main q_net
        if self.num_timesteps % self.target_net_update == 0:
            self.q_target_net.load_state_dict(self.q_net.state_dict())

        # update epsilon
        if (1-self.remaining_progress) > self.fraction_eps:
            # constant if fraction reached
            self.epsilon = self.final_eps
        else:
            # linear function. Goes from initial eps to final eps. Reaches final values in the step,
            # where the function turns constant
            self.epsilon = self.initial_eps + \
                           (1-self.remaining_progress) * (self.final_eps-self.initial_eps) / self.fraction_eps

    def learn(self, total_instances: int, total_timesteps: int, intermediate_test=None) -> None:
        """
        Learn over n problem instances or n timesteps (environment steps).
        Breaks depending on which condition is met first.
        One learning iteration consists of collecting rollouts and training the networks on the rollout data

        :param total_instances: Instance limit
        :param total_timesteps: Timestep limit
        :param intermediate_test: (IntermediateTest) intermediate test object. Must be created before.

        "
""
        instances = 0

        # iterate over n episodes = the agents has n episodes to interact with the environment
        for _ in range(total_instances):
            obs = self.env.reset()
            done = False
            instances += 1
            episode_reward = 0

            # run agent on env until done
            while not done:
                # observe and fill buffer
                action = self.get_action(obs)
                new_obs, reward, done, info = self.env.step(action)
                self.num_timesteps += 1
                episode_reward += reward
                self.memory_buffer.store_memory(obs, action, reward, done, new_obs)

                # call intermediate_test on_step
                if intermediate_test:
                    intermediate_test.on_step(self.num_timesteps, instances, self)

                # call function intern on_step
                self.on_step(total_timesteps)

                # break learn if total_timesteps are reached
                if self.num_timesteps >= total_timesteps:
                    print(f'Total timesteps reached: {total_timesteps}')
                    self.logger.record(
                        {
                            'results_on_train_dataset/instances': instances,
                            'results_on_train_dataset/num_timesteps': self.num_timesteps
                        }
                    )
                    self.logger.dump()

                    return None

                # train if training_starts is reached and then every n rollout_steps
                if self.num_timesteps >= self.training_starts and self.num_timesteps % self.train_freq == 0:

                    self.train()
                    # switch back to eval mode
                    self.q_net.train(False)

                obs = new_obs

            self.reward_info.append(episode_reward)

            if instances % len(self.env.data) == len(self.env.data) - 1:
                mean_training_reward = np.mean(self.env.episodes_rewards)
                mean_training_makespan = np.mean(self.env.episodes_makespans)
                mean_training_tardiness = np.mean(self.env.tardiness)
                self.logger.record(
                    {
                        'results_on_train_dataset/mean_reward': mean_training_reward,
                        'results_on_train_dataset/mean_makespan': mean_training_makespan,
                        'results_on_train_dataset/mean_tardiness': mean_training_tardiness
                    }
                )
                self.logger.dump()

        print("TRAINING DONE")
        self.logger.record(
            {
                'results_on_train_dataset/instances': instances,
                'results_on_train_dataset/num_timesteps': self.num_timesteps
            }
        )
        self.logger.dump()

4.2.2 PPO

"""
PPO implementation inspired by the StableBaselines3 implementation.
To reuse trained models, you can make use of the save and load function
To adapt policy and value network structure, specify the policy and value layer and activation parameter
in your train config or change the constants in this file
"
""
import numpy as np
import random
import torch as T
import torch.nn as nn
import torch.optim as optim
from torch.distributions.categorical import Categorical
import pickle
from typing import Tuple, Any, List

from src.utils.logger import Logger

# constants
POLICY_LAYER: List[int] = [256, 256]
POLICY_ACTIVATION: str = 'ReLU'
VALUE_LAYER: List[int] = [256, 256]
VALUE_ACTIVATION: str = 'ReLU'


class RolloutBuffer:
    """
    Handles episode data collection and batch generation

    :param buffer_size: Buffer size
    :param batch_size: Size for batches to be generated

    "
""
    def __init__(self, buffer_size: int, batch_size: int):

        self.observations = []
        self.probs = []
        self.values = []
        self.actions = []
        self.rewards = []
        self.dones = []
        self.advantages = None
        self.returns = None

        if buffer_size % batch_size != 0:
            raise TypeError("rollout_steps has to be a multiple of batch_size")
        self.buffer_size = buffer_size
        self.batch_size = batch_size

        self.reset()

    def generate_batches(self) -> Tuple:
        """
        Generates batches from the stored data

        :return: batches: Lists with all indices from the rollout_data, shuffled and sampled in lists with batch_size
            e.g. [[0,34,1,768,...(len: batch size)], [], ...(len: len(rollout_data) / batch size)]

        "
""
         # create random index list and split into arrays with batch size
        indices = np.random.permutation(self.buffer_size)
        num_batches = int(self.buffer_size / self.batch_size)
        batches = indices.reshape((num_batches, self.batch_size))

        return np.array(self.observations), np.array(self.actions), np.array(self.probs), batches

    def compute_advantages_and_returns(self, last_value, gamma, gae_lambda) -> None:
        """
        Computes advantage values and returns for all stored episodes.

        :param last_value: Value from the next step to calculate the advantage for the last episode in the buffer
        :param gamma: Discount factor for the advantage calculation
        :param gae_lambda: Smoothing parameter for the advantage calculation

        :return: None

        "
""
        # advantage: advantage from the actual returned rewards over the baseline value from step t onwards
        last_advantage = 0
        for step in reversed(range(self.buffer_size)):
            # use the predicted reward for the advantage computation of the last step of the buffer
            if step == self.buffer_size - 1:
                # if a step is the last one of the episode (done = 1) -> not_done = 0 => the advantage
                # doesn't contain values outside the own episode
                not_done = 1.0 - self.dones[step]
                next_values = last_value
            else:
                not_done = 1.0 - self.dones[step]
                next_values = self.values[step + 1]
            delta = self.rewards[step] + gamma * next_values * not_done - self.values[step]
            last_advantage = delta + gamma * gae_lambda * not_done * last_advantage
            self.advantages[step] = last_advantage

        # compute returns = discounted rewards, advantages = discounted rewards - values
        # Necessary to update the value network
        self.returns = self.values + self.advantages

    def store_memory(self, observation: np.ndarray, action: int, prob: float, value: float,
                     reward: Any, done: bool) -> None:
        """
        Appends all data from the recent step

        :param observation: Observation at the beginning of the step
        :param action: Index of the selected action
        :param prob: Probability of the selected action (output from the policy_net)
        :param value: Baseline value that the value_net estimated from this step onwards according to the
        :param observation: Output from the value_net
        :param reward: Reward the env returned in this step
        :param done: True if the episode ended in this step

        :return: None

        "
""
        self.observations.append(observation)
        self.actions.append(action)
        self.probs.append(prob)
        self.values.append(value)
        self.rewards.append(reward)
        self.dones.append(done)

    def reset(self) -> None:
        """
        Resets all buffer lists

        :return: None

        "
""
        self.observations = []
        self.probs = []
        self.actions = []
        self.rewards = []
        self.dones = []
        self.values = []
        self.advantages = np.zeros(self.buffer_size, dtype=np.float32)


class PolicyNetwork(nn.Module):
    """
    Policy Network for the agent

    :param input_dim: Observation size to determine input dimension
    :param n_actions: Number of action to determine output size
    :param learning_rate: Learning rate for the network
    :param hidden_layers: List of hidden layer sizes (int)
    :param activation: String naming activation function for hidden layers

    "
""
    def __init__(self, input_dim: int, n_actions: int, learning_rate: float, hidden_layers: List[int], activation: str):

        super(PolicyNetwork, self).__init__()

        net_structure = []
        # get activation class according to string
        activation = getattr(nn, activation)()

        # create first hidden layer in accordance with the input dim and the first hidden dim
        net_structure.extend([nn.Linear(input_dim, hidden_layers[0]), activation])

        # create the other hidden layers
        for i, layer_dim in enumerate(hidden_layers):
            if not i+1 == len(hidden_layers):
                net_structure.extend([nn.Linear(layer_dim, hidden_layers[i+1]), activation])
            else:
                # create output layer
                net_structure.extend([nn.Linear(layer_dim, n_actions), nn.Softmax(dim=-1)])

        self.policy_net = nn.Sequential(*net_structure)

        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
        self.device = T.device('cuda:0'if T.cuda.is_available() else'cpu')
        self.to(self.device)

    def forward(self, observation):
        """forward function"""
        observation.to(self.device)
        logits = self.policy_net(observation)

        dist = Categorical(logits=logits)
        
        return dist


class ValueNetwork(nn.Module):
    """
    Value Network for the agent

    :param input_dim: Observation size to determine input dimension
    :param learning_rate: Learning rate for the network
    :param hidden_layers: List of hidden layer sizes (int)
    :param activation: String naming activation function for hidden layers

    "
""
    def __init__(self, input_dim: int, learning_rate: float, hidden_layers: List[int], activation: str):
        super(ValueNetwork, self).__init__()

        net_structure = []
        # get activation class according to string
        activation = getattr(nn, activation)()

        # create first hidden layer in accordance with the input dim and the first hidden dim
        net_structure.extend([nn.Linear(*input_dim, hidden_layers[0]), activation])

        # create the other hidden layers
        for i, layer_dim in enumerate(hidden_layers):
            if not i + 1 == len(hidden_layers):
                net_structure.extend([nn.Linear(layer_dim, hidden_layers[i + 1]), activation])
            else:
                # create output layer
                net_structure.append(nn.Linear(layer_dim, 1))

        self.value_net = nn.Sequential(*net_structure)

        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
        self.device = T.device('cuda:0'if T.cuda.is_available() else'cpu')
        self.to(self.device)

    def forward(self, observation):
        """forward function"""
        value = self.value_net(observation)

        return value


class PPO:
    """PPO Agent class"""
    def __init__(self, env, config: dict, logger: Logger = None):
        """
        | gamma: Discount factor for the advantage calculation
        | learning_rate: Learning rate for both, policy_net and value_net
        | gae_lambda: Smoothing parameter for the advantage calculation
        | clip_range: Limitation for the ratio between old and new policy
        | batch_size: Size of batches which were sampled from the buffer and fed into the nets during training
        | n_epochs: Number of repetitions for each training iteration
        | rollout_steps: Step interval within the update is performed. Has to be a multiple of batch_size
        "
""

        self.env = env
        self.gamma = config.get('gamma', 0.99)
        self.gae_lambda = config.get('gae_lambda', 0.95)
        self.clip_range = config.get('clip_range', 0.2)
        self.n_epochs = config.get('n_epochs', 0.5)
        self.rollout_steps = config.get('rollout_steps', 2048)
        self.ent_coef = config.get('ent_coef', 0.0)
        self.num_timesteps = 0
        self.n_updates = 0
        self.learning_rate = config.get('learning_rate', 0.002)
        self.batch_size = config.get('batch_size', 256)

        self.logger = logger if logger else Logger(config=config)
        self.seed = config.get('seed', None)

        # torch seed setting
        if self.seed is not None:
            random.seed(self.seed)
            np.random.seed(self.seed)
            T.manual_seed(self.seed)
            # self.env.action_space.seed(seed)
            self.env.seed(self.seed)

        # create networks and buffer
        self.policy_net = PolicyNetwork(env.observation_space.shape[0], env.action_space.n, self.learning_rate,
                                        config.get('policy_layer', POLICY_LAYER),
                                        config.get('policy_activation', POLICY_ACTIVATION))
        self.value_net = ValueNetwork(env.observation_space.shape, self.learning_rate,
                                      config.get('value_layer', VALUE_LAYER),
                                      config.get('value_activation', VALUE_ACTIVATION))
        self.rollout_buffer = RolloutBuffer(self.rollout_steps, self.batch_size)

    @classmethod
    def load(cls, file: str, config: dict, logger: Logger = None):
        """
        Creates a PPO object according to the parameters saved in file.pkl

        :param file: Path and filname (without .pkl) of your saved model pickle file
        :param config: Dictionary with parameters to specify PPO attributes
        :param logger: Logger

        :return: MaskedPPO object

        "
""
        with open(f"{file}.pkl""rb") as handle:
            data = pickle.load(handle)

        env = data["params"]["env"]

        # create PPO object, commit necessary parameters. Update remaining parameters
        model = cls(env=env, config=config, logger=logger)
        model.__dict__.update(data["params"])

        # set weights from policy and value
        model.policy_net.load_state_dict(data["policy_params"])
        model.value_net.load_state_dict(data["value_params"])

        return model

    def save(self, file: str) -> None:
        """
        Save model as pickle file

        :param file: Path under which the file will be saved

        :return: None

        "
""
        params_dict = self.__dict__.copy()
        del params_dict['logger']
        data = {
            "params": params_dict,
            "policy_params": self.policy_net.state_dict(),
            "value_params" : self.value_net.state_dict()
        }

        with open(f"{file}.pkl""wb") as handle:
            pickle.dump(data, handle)

    def forward(self, observation: np.ndarray, **kwargs) -> Tuple:
        """
        Predicts an action according to the current policy based on the observation
        and the value for the next state

        :param observation: Current observation of teh environment
        :param kwargs: Used to accept but ignore passing actions masks from the environment.

        :return: Predicted action, probability for this action, and predicted value for the next state

        "
""

        observation = T.tensor(observation, dtype=T.float).to(self.policy_net.device)

        dist = self.policy_net(observation)
        value = self.value_net(observation)
        action = dist.sample()

        prob = T.squeeze(dist.log_prob(action)).item()
        action = T.squeeze(action).item()
        value = T.squeeze(value).item()

        return action, prob, value

    def predict(self, observation: np.ndarray, deterministic: bool = True, state=None, **kwargs) -> Tuple:
        """
         Action prediction for testing

        :param observation: Current observation of teh environment
        :param deterministic: Set True, to force a deterministic prediction
        :param state: The last states (used in rnn policies)
        :param kwargs: Used to accept but ignore passing actions masks from the environment.

        :return: Predicted action and next state (used in rnn policies)

        "
""
        observation = T.tensor(np.array([observation]), dtype=T.float).to(self.policy_net.device)

        with T.no_grad():
            dist = self.policy_net(observation)
            if deterministic:
                action = T.argmax(dist.probs)
            else:
                # choose random action according to the predicted probs
                action = dist.sample()
            action = T.squeeze(action).item()

        return action, state

    def train(self) -> None:
        """
        Trains policy and value

        :return: None

        "
""
        # switch to train mode
        self.policy_net.train(True)
        self.value_net.train(True)

        policy_losses, value_losses, entropy_losses, total_losses = [], [], [], []

        for _ in range(self.n_epochs):

            # get data from buffer and random batches(index lists) to iterate over
            # e.g. obs[batch] returns the observations for all indices in batch
            obs_arr, action_arr, old_prob_arr, batches = self.rollout_buffer.generate_batches()

            # get advantage and return values from buffer
            advantages = T.tensor(self.rollout_buffer.advantages).to(self.policy_net.device)
            returns = T.tensor(self.rollout_buffer.returns).to(self.value_net.device)

            # normalize advantages
            advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

            for batch in batches:
                observations = T.tensor(obs_arr[batch], dtype=T.float).to(self.policy_net.device)
                old_probs = T.tensor(old_prob_arr[batch]).to(self.policy_net.device)
                actions = T.tensor(action_arr[batch]).to(self.policy_net.device)

                dist = self.policy_net(observations)
                values = self.value_net(observations)
                values = T.squeeze(values)

                # ratio between old and new policy (probs of selected actions)
                # Should be one at the first batch of every train iteration
                new_probs = dist.log_prob(actions)
                prob_ratio = new_probs.exp() / old_probs.exp()

                # policy clip
                policy_loss_1 = prob_ratio * advantages[batch]
                policy_loss_2 = T.clamp(prob_ratio, 1-self.clip_range, 1+self.clip_range) * advantages[batch]
                # we want to maximize the reward, but running gradient descent -> negate the loss here
                policy_loss = -T.min(policy_loss_1, policy_loss_2).mean()

                value_loss = (returns[batch]-values)**2
                value_loss = value_loss.mean()

                # entropy loss
                entropy_loss = -T.mean(dist.entropy())
                entropy_losses.append(entropy_loss.item())

                total_loss = policy_loss + 0.5*value_loss + self.ent_coef*entropy_loss
                self.policy_net.optimizer.zero_grad()
                self.value_net.optimizer.zero_grad()
                total_loss.backward()
                self.policy_net.optimizer.step()
                self.value_net.optimizer.step()

                policy_losses.append(policy_loss.item())
                value_losses.append(value_loss.item())
                total_losses.append(total_loss.item())

        self.n_updates += self.n_epochs

        # logs
        # compute explained variance
        explained_var = explained_variance(np.asarray(self.rollout_buffer.values), self.rollout_buffer.returns)

        self.logger.record(
            {
                'agent_training/n_updates': self.n_updates,
                'agent_training/loss': np.mean(total_losses),
                'agent_training/policy_gradient_loss': np.mean(policy_losses),
                'agent_training/value_loss': np.mean(value_losses),
                'agent_training/entropy_loss': np.mean(entropy_losses),
                'agent_training/explained_variance': explained_var
            }
            )
        self.logger.dump()

    def learn(self, total_instances: int, total_timesteps: int, intermediate_test=None) -> None:
        """
        Learn over n environment instances or n timesteps. Break depending on which condition is met first
        One learning iteration consists of collecting rollouts and training the networks

        :param total_instances: Instance limit
        :param total_timesteps: Timestep limit
        :param intermediate_test: (IntermediateTest) intermediate test object. Must be created before.

        "
""
        instances = 0

        # iterate over n episodes = the agents has n episodes to interact with the environment
        for _ in range(total_instances):
            obs = self.env.reset()
            done = False
            instances += 1

            # run agent on env until done
            while not done:
                action, prob, val = self.forward(obs)
                new_obs, reward, done, info = self.env.step(action)
                self.num_timesteps += 1
                self.rollout_buffer.store_memory(obs, action, prob, val, reward, done)

                # call intermediate_test on_step
                if intermediate_test:
                    intermediate_test.on_step(self.num_timesteps, instances, self)

                # break learn if total_timesteps are reached
                if self.num_timesteps >= total_timesteps:
                    print("total_timesteps reached")
                    self.logger.record(
                        {
                            'results_on_train_dataset/instances': instances,
                            'results_on_train_dataset/num_timesteps': self.num_timesteps
                        }
                    )
                    self.logger.dump()

                    return None

                 # update every n rollout_steps
                if self.num_timesteps % self.rollout_steps == 0:
                    # predict the next reward, needed for the advantage computation of the last collected step
                    with T.no_grad():
                        _, _, val = self.forward(new_obs)
                    self.rollout_buffer.compute_advantages_and_returns(val, self.gamma, self.gae_lambda)

                    # train networks
                    self.train()
                    # switch back to normal mode
                    self.policy_net.train(False)
                    self.value_net.train(False)

                    # reset buffer to continue collecting rollouts
                    self.rollout_buffer.reset()

                obs = new_obs

            if instances % len(self.env.data) == len(self.env.data) - 1:
                mean_training_reward = np.mean(self.env.episodes_rewards)
                mean_training_makespan = np.mean(self.env.episodes_makespans)
                if len(self.env.episodes_tardinesses) == 0:
                    mean_training_tardiness = 0
                else:
                    mean_training_tardiness = np.mean(self.env.episodes_tardinesses)
                self.logger.record(
                    {
                        'results_on_train_dataset/mean_reward': mean_training_reward,
                        'results_on_train_dataset/mean_makespan': mean_training_makespan,
                        'results_on_train_dataset/mean_tardiness': mean_training_tardiness
                    }
                )
                self.logger.dump()

        print("TRAINING DONE")
        self.logger.record(
            {
                'results_on_train_dataset/instances': instances,
                'results_on_train_dataset/num_timesteps': self.num_timesteps
            }
        )
        self.logger.dump()


def explained_variance(y_pred: np.ndarray, y_true: np.ndarray) -> np.ndarray:
    """
    From Stable-Baseline
    Computes fraction of variance that ypred explains about y.
    Returns 1 - Var[y-ypred] / Var[y]

    interpretation:
        ev=0  =>  might as well have predicted zero
        ev=1  =>  perfect prediction
        ev<0  =>  worse than just predicting zero

    :param y_pred: the prediction
    :param y_true: the expected value

    :return: explained variance of ypred and y

    "
""
    assert y_true.ndim == 1 and y_pred.ndim == 1
    var_y = np.var(y_true)
    return np.nan if var_y == 0 else 1 - np.var(y_true - y_pred) / var_y


if __name__ == "__main__":

    policy_net = PolicyNetwork(4, 10, 0.003, [64, 64], 'ReLU')

    for name, para in policy_net.named_parameters():
        print('{}: {}'.format(name, para.shape))

4.2.3 PPO_MASKED

"""
PPO implementation with action mask according to the StableBaselines3 implementation.
To reuse trained models, you can make use of the save and load function
"
""
import numpy as np
import random
import torch as T
import torch.nn as nn
import torch.optim as optim
from torch.distributions.categorical import Categorical
import pickle
from typing import Tuple, Any, List

from src.utils.logger import Logger

# constants
POLICY_LAYER: List[int] = [256, 256]
POLICY_ACTIVATION: str = 'ReLU'
VALUE_LAYER: List[int] = [256, 256]
VALUE_ACTIVATION: str = 'ReLU'


class RolloutBuffer:
    """
    Handles episode data collection and batch generation

    :param buffer_size: Buffer size
    :param batch_size: Size for batches to be generated

    "
""
    def __init__(self, buffer_size: int, batch_size: int):

        self.observations = []
        self.probs = []
        self.values = []
        self.actions = []
        self.rewards = []
        self.dones = []
        self.action_masks = []
        self.advantages = None
        self.returns = None

        if buffer_size % batch_size != 0:
            raise TypeError("rollout_steps has to be a multiple of batch_size")
        self.buffer_size = buffer_size
        self.batch_size = batch_size

        self.reset()

    def generate_batches(self) -> Tuple:
        """
        Generates batches from the stored data

        :return:  batches: Lists with all indices from the rollout_data, shuffled and sampled in lists with batch_size
            e.g. [[0,34,1,768,...(len: batch size)], [], ...(len: len(rollout_data) / batch size)]

        "
""
        # create random index list and split into arrays with batch size
        indices = np.random.permutation(self.buffer_size)
        num_batches = int(self.buffer_size / self.batch_size)
        batches = indices.reshape((num_batches, self.batch_size))

        return np.array(self.observations), np.array(self.actions), np.array(self.probs), np.array(self.action_masks),\
               batches

    def compute_advantages_and_returns(self, last_value, gamma, gae_lambda) -> None:
        """
        Computes advantage values and returns for all stored episodes. Required to

        :param last_value: Value from the next step to calculate the advantage for the last episode in the buffer
        :param gamma: Discount factor for the advantage calculation
        :param gae_lambda: Smoothing parameter for the advantage calculation

        :return: None

        "
""
        # advantage: advantage from the actual returned rewards over the baseline value from step t onwards
        last_advantage = 0
        for step in reversed(range(self.buffer_size)):
            # use the predicted reward for the advantage computation of the last step of the buffer
            if step == self.buffer_size - 1:
                # if a step is the last one of the episode (done = 1) -> not_done = 0 => the advantage
                # doesn't contain values outside the own episode
                not_done = 1.0 - self.dones[step]
                next_values = last_value
            else:
                not_done = 1.0 - self.dones[step]
                next_values = self.values[step + 1]
            delta = self.rewards[step] + gamma * next_values * not_done - self.values[step]
            last_advantage = delta + gamma * gae_lambda * not_done * last_advantage
            self.advantages[step] = last_advantage

        # compute returns = discounted rewards, advantages = discounted rewards - values
        # Necessary to update the value network
        self.returns = self.values + self.advantages

    def store_memory(self, observation: np.ndarray, action: int, prob: float, value: float,
                     reward: Any, done: bool, action_mask: np.ndarray) -> None:
        "" "
        Appends all data from the recent step

        :param observation: Observation at the beginning of the step
        :param action: Index of the selected action
        :param prob: Probability of the selected action (output from the policy_net)
        :param value: Baseline value that the value_net estimated from this step onwards according to the
        :param observation: Output from the value_net
        :param reward: Reward the env returned in this step
        :param done: True if the episode ended in this step
        :param action_mask: One hot vector with ones for all possible actions

        :return: None

        "
""
        self.observations.append(observation)
        self.actions.append(action)
        self.probs.append(prob)
        self.values.append(value)
        self.rewards.append(reward)
        self.dones.append(done)
        self.action_masks.append(action_mask)

    def reset(self) -> None:
        """
        Resets all buffer lists
        :return: None
        "
""
        self.observations = []
        self.probs = []
        self.actions = []
        self.rewards = []
        self.dones = []
        self.values = []
        self.action_masks = []
        self.advantages = np.zeros(self.buffer_size, dtype=np.float32)


class PolicyNetwork(nn.Module):
    """
    Policy Network for the agent

    :param input_dims: Observation size to determine input dimension
    :param n_actions: Number of action to determine output size
    :param learning_rate: Learning rate for the network
    :param fc1_dims: Size hidden layer 1
    :param fc2_dims: Size hidden layer 2

    "
""
    def __init__(self, input_dim: int, n_actions: int, learning_rate: float, hidden_layers: List[int], activation: str):

        super(PolicyNetwork, self).__init__()

        net_structure = []
        # get activation class according to string
        activation = getattr(nn, activation)()

        # create first hidden layer in accordance with the input dim and the first hidden dim
        net_structure.extend([nn.Linear(input_dim, hidden_layers[0]), activation])

        # create the other hidden layers
        for i, layer_dim in enumerate(hidden_layers):
            if not i + 1 == len(hidden_layers):
                net_structure.extend([nn.Linear(layer_dim, hidden_layers[i + 1]), activation])
            else:
                # create output layer
                net_structure.extend([nn.Linear(layer_dim, n_actions), nn.Softmax(dim=-1)])

        self.policy_net = nn.Sequential(*net_structure)

        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
        self.device = T.device('cuda:0'if T.cuda.is_available() else'cpu')
        self.to(self.device)

    def forward(self, observation, action_mask):
        """forward through the actor network"""
        observation.to(self.device)
        logits = self.policy_net(observation)

        # mask probabilities if action_mask is not None (for env.reset)
        if action_mask is not None:
            action_mask.to(self.device)
            logits = T.where(action_mask, logits, T.tensor(-1e+8).to(self.device))

        dist = Categorical(logits=logits)
        
        return dist


class ValueNetwork(nn.Module):
    """
    Value Network for the agent

    :param input_dims: Observation size to determine input dimension
    :param learning_rate: Learning rate for the network
    :param fc1_dims: Size hidden layer 1
    :param fc2_dims: Size hidden layer 2

    "
""
    def __init__(self, input_dim: int, learning_rate: float, hidden_layers: List[int], activation: str):
        super(ValueNetwork, self).__init__()

        net_structure = []
        # get activation class according to string
        activation = getattr(nn, activation)()

        # create first hidden layer in accordance with the input dim and the first hidden dim
        net_structure.extend([nn.Linear(*input_dim, hidden_layers[0]), activation])

        # create the other hidden layers
        for i, layer_dim in enumerate(hidden_layers):
            if not i + 1 == len(hidden_layers):
                net_structure.extend([nn.Linear(layer_dim, hidden_layers[i + 1]), activation])
            else:
                # create output layer
                net_structure.append(nn.Linear(layer_dim, 1))

        self.value_net = nn.Sequential(*net_structure)

        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
        self.device = T.device('cuda:0'if T.cuda.is_available() else'cpu')
        self.to(self.device)

    def forward(self, observation):
        """forward through the value network"""
        value = self.value_net(observation)

        return value


class MaskedPPO:
    def __init__(self, env, config: dict, logger: Logger = None):
        """
        | gamma: Discount factor for the advantage calculation
        | learning_rate: Learning rate for both, policy_net and value_net
        | gae_lambda: Smoothing parameter for the advantage calculation
        | clip_range: Limitation for the ratio between old and new policy
        | batch_size: Size of batches which were sampled from the buffer and fed into the nets during training
        | n_epochs: Number of repetitions for each training iteration
        | rollout_steps: Step interval within the update is performed. Has to be a multiple of batch_size
        "
""

        self.env = env
        self.gamma = config.get('gamma', 0.99)
        self.gae_lambda = config.get('gae_lambda', 0.95)
        self.clip_range = config.get('clip_range', 0.2)
        self.n_epochs = config.get('n_epochs', 0.5)
        self.rollout_steps = config.get('rollout_steps', 2048)
        self.ent_coef = config.get('ent_coef', 0.0)
        self.num_timesteps = 0
        self.n_updates = 0
        self.learning_rate = config.get('learning_rate', 0.002)
        self.batch_size = config.get('batch_size', 256)

        self.logger = logger if logger else Logger(config=config)
        self.seed = config.get('seed', None)

        # torch seed setting
        if self.seed is not None:
            random.seed(self.seed)
            np.random.seed(self.seed)
            T.manual_seed(self.seed)
            # self.env.action_space.seed(seed)
            self.env.seed(self.seed)

        # create networks and buffer
        self.policy_net = PolicyNetwork(env.observation_space.shape[0], env.action_space.n, self.learning_rate,
                                        config.get('policy_layer', POLICY_LAYER),
                                        config.get('policy_activation', POLICY_ACTIVATION))
        self.value_net = ValueNetwork(env.observation_space.shape, self.learning_rate,
                                      config.get('value_layer', VALUE_LAYER),
                                      config.get( 'value_activation', VALUE_ACTIVATION))
        self.rollout_buffer = RolloutBuffer(self.rollout_steps, self.batch_size)

    @classmethod
    def load(cls, file: str, config: dict, logger: Logger = None):
        """
        Creates a PPO object according to the parameters saved in file.pkl

        :param file: Path and filname (without .pkl) of your saved model pickle file
        :param config: Dictionary with parameters to specify PPO attributes
        :param logger: Logger

        :return: MaskedPPO object

        "
""
        with open(f"{file}.pkl""rb") as handle:
            data = pickle.load(handle)

        env = data["params"]["env"]

        # create PPO object, commit necessary parameters. Update remaining parameters
        model = cls(env=env, config=config, logger=logger)
        model.__dict__.update(data["params"])

        # set weights from policy and value
        model.policy_net.load_state_dict(data["policy_params"])
        model.value_net.load_state_dict(data["value_params"])

        return model

    def save(self, file: str) -> None:
        """
        Save model as pickle file

        :param file: Path under which the file will be saved

        :return: None

        "
""
        params_dict = self.__dict__.copy()
        del params_dict['logger']
        data = {
            "params": params_dict,
            "policy_params": self.policy_net.state_dict(),
            "value_params": self.value_net.state_dict()
        }

        with open(f"{file}.pkl""wb") as handle:
            pickle.dump(data, handle)

    def forward(self, observation: np.ndarray, action_mask: np.ndarray) -> Tuple:
        """
        Predicts an action according to the current policy and based on the action_mask and observation
        and the value for the next state

        :param observation: Current observation of teh environment
        :param action_mask: One hot vector with ones for all possible actions

        :return: Predicted action, probability for this action, and predicted value for the next state

        "
""

        observation = T.tensor(observation, dtype=T.float).to(self.policy_net.device)
        if action_mask is not None:
            action_mask = T.tensor(action_mask, dtype=T.bool).to(self.policy_net.device)

        dist = self.policy_net(observation, action_mask)
        value = self.value_net(observation)
        action = dist.sample()

        prob = T.squeeze(dist.log_prob(action)).item()
        action = T.squeeze(action).item()
        value = T.squeeze(value).item()

        return action, prob, value

    def predict(self, observation: np.ndarray, action_mask: np.ndarray,
                deterministic: bool = True, state=None) -> Tuple:
        """
        Action prediction for testing

        :param observation: Current observation of teh environment
        :param action_mask: One hot vector with ones for all possible actions
        :param deterministic: Set True, to force a deterministic prediction
        :param state: The last states (used in rnn policies)

        :return: Predicted action and next state (used in rnn policies)

        "
""
        observation = T.tensor(np.array(observation), dtype=T.float).to(self.policy_net.device)
        action_mask = T.tensor(action_mask, dtype=T.bool).to(self.policy_net.device)

        with T.no_grad():
            dist = self.policy_net(observation, action_mask)
            if deterministic:
                action = T.argmax(dist.probs)
            else:
                # choose random action according to the predicted probs
                action = dist.sample()
            action = T.squeeze(action).item()

        return action, state

    def train(self) -> None:
        """
        Trains policy and value

        :return: None

        "
""
        # switch to train mode
        self.policy_net.train(True)
        self.value_net.train(True)

        policy_losses, value_losses, entropy_losses, total_losses = [], [], [], []

        for _ in range(self.n_epochs):

            # get data from buffer and random batches(index lists) to iterate over
            # e.g. obs[batch] returns the observations for all indices in batch
            obs_arr, action_arr, old_prob_arr, action_mask_arr, batches = self.rollout_buffer.generate_batches()

            # get advantage and return values from buffer
            advantages = T.tensor(self.rollout_buffer.advantages).to(self.policy_net.device)
            returns = T.tensor(self.rollout_buffer.returns).to(self.value_net.device)

            # normalize advantages
            advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

            for batch in batches:
                observations = T.tensor(obs_arr[batch], dtype=T.float).to(self.policy_net.device)
                old_probs = T.tensor(old_prob_arr[batch]).to(self.policy_net.device)
                actions = T.tensor(action_arr[batch]).to(self.policy_net.device)
                action_masks = T.tensor(action_mask_arr[batch], dtype=T.bool).to(self.policy_net.device)

                dist = self.policy_net(observations, action_masks)
                values = self.value_net(observations)
                values = T.squeeze(values)

                # ratio between old and new policy (probs of selected actions)
                # Should be one at the first batch of every train iteration
                new_probs = dist.log_prob(actions)
                prob_ratio = new_probs.exp() / old_probs.exp()

                # policy clip
                policy_loss_1 = prob_ratio * advantages[batch]
                policy_loss_2 = T.clamp(prob_ratio, 1-self.clip_range, 1+self.clip_range) * advantages[batch]
                # we want to maximize the reward, but running gradient descent -> negate the loss here
                policy_loss = -T.min(policy_loss_1, policy_loss_2).mean()

                value_loss = (returns[batch]-values)**2
                value_loss = value_loss.mean()

                # entropy loss
                entropy_loss = -T.mean(dist.entropy())
                entropy_losses.append(entropy_loss.item())

                total_loss = policy_loss + 0.5*value_loss + self.ent_coef*entropy_loss
                self.policy_net.optimizer.zero_grad()
                self.value_net.optimizer.zero_grad()
                total_loss.backward()
                self.policy_net.optimizer.step()
                self.value_net.optimizer.step()

                policy_losses.append(policy_loss.item())
                value_losses.append(value_loss.item())
                total_losses.append(total_loss.item())

        self.n_updates += self.n_epochs

        # logs
        # compute explained variance
        explained_var = explained_variance(np.asarray(self.rollout_buffer.values), self.rollout_buffer.returns)

        self.logger.record(
            {
                'agent_training/n_updates': self.n_updates,
                'agent_training/loss': np.mean(total_losses),
                'agent_training/policy_gradient_loss': np.mean(policy_losses),
                'agent_training/value_loss': np.mean(value_losses),
                'agent_training/entropy_loss': np.mean(entropy_losses),
                'agent_training/explained_variance': explained_var
            }
        )
        self.logger.dump()

    def learn(self, total_instances: int, total_timesteps: int, intermediate_test=None) -> None:
        """
        Learn over n environment instances or n timesteps. Break depending on which condition is met first
        One learning iteration consists of collecting rollouts and training the networks

        :param total_instances: Instance limit
        :param total_timesteps: Timestep limit
        :param intermediate_test: (IntermediateTest) intermediate test object. Must be created before.

        "
""
        instances = 0

        # iterate over n episodes = the agents has n episodes to interact with the environment
        for _ in range(total_instances):
            obs = self.env.reset()
            info = {'mask': None}
            done = False
            instances += 1

            # run agent on env until done
            while not done:
                action, prob, val = self.forward(obs, action_mask=info['mask'])
                new_obs, reward, done, info = self.env.step(action)
                self.num_timesteps += 1
                self.rollout_buffer.store_memory(obs, action, prob, val, reward, done, info['mask'])

                # call intermediate_test on_step
                if intermediate_test:
                    intermediate_test.on_step(self.num_timesteps, instances, self)

                # break learn if total_timesteps are reached
                if self.num_timesteps >= total_timesteps:
                    print("total_timesteps reached")
                    self.logger.record(
                        {
                            'results_on_train_dataset/instances': instances,
                            'results_on_train_dataset/num_timesteps': self.num_timesteps
                        }
                    )
                    self.logger.dump()

                    return None

                # update every n rollout_steps
                if self.num_timesteps % self.rollout_steps == 0:
                    # predict the next reward, needed for the advantage computation of the last collected step
                    with T.no_grad():
                        _, _, val = self.forward(new_obs, info['mask'])
                    self.rollout_buffer.compute_advantages_and_returns(val, self.gamma, self.gae_lambda)

                    # train networks
                    self.train()
                    # switch back to normal mode
                    self.policy_net.train(False)
                    self.value_net.train(False)

                    # reset buffer to continue collecting rollouts
                    self.rollout_buffer.reset()

                obs = new_obs

            if instances % len(self.env.data) == len(self.env.data) - 1:
                mean_training_reward = np.mean(self.env.episodes_rewards)
                mean_training_makespan = np.mean(self.env.episodes_makespans)
                if len(self.env.episodes_tardinesses) == 0:
                    mean_training_tardiness = 0
                else:
                    mean_training_tardiness = np.mean(self.env.episodes_tardinesses)
                self.logger.record(
                    {
                        'results_on_train_dataset/mean_reward': mean_training_reward,
                        'results_on_train_dataset/mean_makespan': mean_training_makespan,
                         'results_on_train_dataset/mean_tardiness': mean_training_tardiness
                    }
                )
                self.logger.dump()

        print("TRAINING DONE")
        self.logger.record(
            {
                'results_on_train_dataset/instances': instances,
                'results_on_train_dataset/num_timesteps': self.num_timesteps
            }
        )
        self.logger.dump()


def explained_variance(y_pred: np.ndarray, y_true: np.ndarray) -> np.ndarray:
    """
    From Stable-Baseline
    Computes fraction of variance that ypred explains about y.
    Returns 1 - Var[y-ypred] / Var[y]

    interpretation:
        ev=0  =>  might as well have predicted zero
        ev=1  =>  perfect prediction
        ev<0  =>  worse than just predicting zero

    :param y_pred: the prediction
    :param y_true: the expected value

    :return: explained variance of ypred and y

    "
""
    assert y_true.ndim == 1 and y_pred.ndim == 1
    var_y = np.var(y_true)
    return np.nan if var_y == 0 else 1 - np.var(y_true - y_pred) / var_y


if __name__ == "__main__":

    policy_net = PolicyNetwork(4, 10, 0.003)

    for name, para in policy_net.named_parameters():
        print('{}: {}'.format(name, para.shape))

5. 可视化

Schlably还提供了代理性能学习的训练曲线、调度测试案例的甘特图和不同算法间的结果对比等的可视化。

"""
Gantt chart tests.
"
""
import unittest

import copy
from pathlib import Path
from typing import List
import PIL.Image

from src.agents.heuristic.heuristic_agent import HeuristicSelectionAgent
from src.data_generator.task import Task
from src.environments.env_tetris_scheduling import Env
from src.utils.file_handler.config_handler import ConfigHandler
from src.utils.file_handler.data_handler import DataHandler
from src.visuals_generator.gantt_chart import GanttChartPlotter


class TestGanttChart(unittest.TestCase):
    """
    Class with gantt chart tests.
    "
""
    _test_tasks: List[Task]

    @classmethod
    def setUpClass(cls) -> None:
        """

        :return: None
        "
""

        env_config = ConfigHandler.get_config(config_file_path='training/dqn/config_job3_task4_tools0.yaml')
        data = DataHandler.load_instances_data_file(config=env_config)
        cls._test_tasks = copy.deepcopy(data[0])

        done = False
        env = Env(env_config, [data[0]])
        heuristic_agent = HeuristicSelectionAgent()
        while not done:
             # obs = env.state_obs
            mask = env.get_action_mask()

            cls._test_tasks = env.tasks
            task_mask = mask
            action = heuristic_agent(cls._test_tasks, task_mask, 'rand')

            res = env.step(action)
            done = res[2]

    @classmethod
    def tearDownClass(cls) -> None:
        """
        Tear down class
        :return: None
        "
""
        del cls._test_tasks
        cls._trap = None

    def test_get_gantt_chart_image(self) -> None:
        """
        Test gantt chart image
        :return: None
        "
""
        test_image = GanttChartPlotter.get_gantt_chart_image(self._test_tasks)
        self.assertIsInstance(test_image, PIL.Image.Image)

    def test_get_gantt_chart_image_and_save(self) -> None:
        """
        Test gantt chart image and save
        :return: None
        "
""
        test_image_path: Path = \
            GanttChartPlotter.get_gantt_chart_image_and_save(self._test_tasks,
                                                             filename="automated_test_random_name_dc55c0e399428u7e",
                                                             file_type="png")
        test_image_path.unlink()

    def test_get_gantt_chart_gif_and_save(self) -> None:
        """
        Test gantt chart gif and save
        :return: None
        "
""
        # TODO - prevent print output - redirect_stdout does not work
        test_gif_path: Path = \
            GanttChartPlotter.get_gantt_chart_gif_and_save(self._test_tasks,
                                                           filename="automated_test_random_name_dc55c0e399428e3e",
                                                           save_intermediate_images=False,
                                                           quality_dpi=55)
        test_gif_path.unlink()


if __name__ == '__main__':
    unittest.main()

6. 结论

综上所述,Schlably是一个功能强大且易于使用的框架,它不仅解决了许多DRL调度实验中的常见问题,还提供了丰富的工具和资源,帮助研究人员专注于创新性工作。我们期待Schlably能在未来的研究中发挥更大的作用,推动PS领域的发展。

A. 参考文献

  • [1] C. W. de Puiseau, J. Peters, C. Dorpelkus, H. Tercan, and T. Meisen, "schlably: A Python framework for deep reinforcement learning based scheduling experiments," Softwarex, vol. 22, May 2023, Art no. 101383, doi: 10.1016/j.softx.2023.101383.




微信公众号后台回复

加群:加入全球华人OR|AI|DS社区硕博微信学术群

资料:免费获得大量运筹学相关学习资料

人才库:加入运筹精英人才库,获得独家职位推荐

电子书:免费获取平台小编独家创作的优化理论、运筹实践和数据科学电子书,持续更新中ing...

加入我们:加入「运筹OR帷幄」,参与内容创作平台运营

知识星球:加入「运筹OR帷幄」数据算法社区,免费参与每周「领读计划」、「行业inTalk」、「OR会客厅」等直播活动,与数百位签约大V进行在线交流



                    


        



Image

文章须知

推文作者:韩宝安

责任编辑:张琪

微信编辑:疑疑

文章转载自『智能制造与智能调度 』公众号,原文链接:Schlably:深度强化学习车间调度实验的Python框架




关注我们 

       FOLLOW US







































Image

Image

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/185885
 
31 次点击