"""Train and evaluate a DQN agent on a simplified FlexSim assembly-line
environment with health- and precedence-constrained task assignments."""

import os
os.environ['OMP_NUM_THREADS'] = '8'

import json

import gymnasium as gym
import numpy as np
from stable_baselines3 import DQN, PPO
from stable_baselines3.common.env_checker import check_env

from flexsim_env import FlexSimEnv


class SimplifiedFlexSimEnv(FlexSimEnv):
    def __init__(self, flexsimPath, modelPath, address='localhost', port=5005,
                 verbose=False, visible=False):
        super().__init__(flexsimPath, modelPath, address, port, verbose, visible)

        # Simplified action space: 24 binary assignment decisions encoded as a
        # single discrete action index (one bit per decision).
        self.num_actions = 24
        self.action_space = gym.spaces.Discrete(2**self.num_actions)

        # Precedence matrices: entry [i][j] == 1 means task i must finish
        # before task j can start.
        # Product A (7 tasks)
        self.precedence_matrix_A = [
            [0, 1, 0, 0, 0, 0, 0],
            [0, 0, 1, 1, 0, 0, 0],
            [0, 0, 0, 0, 1, 0, 0],
            [0, 0, 0, 0, 0, 1, 0],
            [0, 0, 0, 0, 0, 0, 1],
            [0, 0, 0, 0, 0, 0, 1],
            [0, 0, 0, 0, 0, 0, 0],
        ]
        # Product B (4 tasks)
        self.precedence_matrix_B = [
            [0, 1, 0, 0],
            [0, 0, 1, 0],
            [0, 0, 0, 1],
            [0, 0, 0, 0],
        ]

    def _enforce_constraints(self, action):
        # Decode the discrete action index into a flat bit vector unless it is
        # already a 1-D integer array.
        if not (isinstance(action, np.ndarray) and action.dtype == np.int_ and action.ndim == 1):
            action = list(map(int, format(int(action), f'0{self.num_actions}b')))

        last_observation_dict = json.loads(self.lastObservation)
        health_status = last_observation_dict["state"][:2]

        num_tasks_A = 7
        num_tasks_B = 4
        task_offset = 2  # first two bits are reserved for non-assignment actions

        def check_precedence(product, task, ws):
            # A task may only go to workstation `ws` if every predecessor is
            # assigned to a strictly earlier workstation.
            if product == 'A':
                for prev_task in range(num_tasks_A):
                    if self.precedence_matrix_A[prev_task][task] == 1:
                        prev_assigned_ws = np.argmax(
                            action[task_offset + prev_task * 2:
                                   task_offset + (prev_task + 1) * 2])
                        if prev_assigned_ws >= ws:
                            return False
            elif product == 'B':
                for prev_task in range(num_tasks_B):
                    if self.precedence_matrix_B[prev_task][task] == 1:
                        prev_assigned_ws = np.argmax(
                            action[task_offset + num_tasks_A * 2 + prev_task * 2:
                                   task_offset + num_tasks_A * 2 + (prev_task + 1) * 2])
                        if prev_assigned_ws >= ws:
                            return False
            return True

        # Enforce task assignments based on workstation health and precedence.
        for task in range(num_tasks_A):
            assigned = False
            for ws in range(2):
                if action[task_offset + task * 2 + ws] == 1:
                    if health_status[ws] >= 3 or not check_precedence('A', task, ws):
                        # Invalidate the assignment if constraints are violated.
                        action[task_offset + task * 2 + ws] = 0
                    elif assigned:
                        # Keep only the first valid assignment per task.
                        action[task_offset + task * 2 + ws] = 0
                    else:
                        assigned = True

        for task in range(num_tasks_B):
            assigned = False
            for ws in range(2):
                if action[task_offset + num_tasks_A * 2 + task * 2 + ws] == 1:
                    if health_status[ws] >= 3 or not check_precedence('B', task, ws):
                        # Invalidate the assignment if constraints are violated.
                        action[task_offset + num_tasks_A * 2 + task * 2 + ws] = 0
                    elif assigned:
                        # Keep only the first valid assignment per task.
                        action[task_offset + num_tasks_A * 2 + task * 2 + ws] = 0
                    else:
                        assigned = True

        # Post-processing: ensure every task ends up assigned to some workstation.
        for task in range(num_tasks_A):
            if not any(action[task_offset + task * 2: task_offset + (task + 1) * 2]):
                for ws in range(2):
                    if health_status[ws] < 3 and check_precedence('A', task, ws):
                        action[task_offset + task * 2 + ws] = 1
                        break  # Assign to the first valid workstation.

        for task in range(num_tasks_B):
            if not any(action[task_offset + num_tasks_A * 2 + task * 2:
                              task_offset + num_tasks_A * 2 + (task + 1) * 2]):
                for ws in range(2):
                    if health_status[ws] < 3 and check_precedence('B', task, ws):
                        action[task_offset + num_tasks_A * 2 + task * 2 + ws] = 1
                        break  # Assign to the first valid workstation.

        return action

    def step(self, action):
        # Apply constraints to the action before taking a step.
        constrained_action = self._enforce_constraints(action)
        state, reward, terminated, truncated, info = super().step(constrained_action)
        print(f"Step Info - Reward: {reward:.2f}, Terminated: {terminated}, Truncated: {truncated}")
        return state, reward, terminated, truncated, info


def main():
    print("Initializing FlexSim environment...")
    env = SimplifiedFlexSimEnv(
        flexsimPath="C:/Program Files/FlexSim 2024 Update 1/program/flexsim.exe",
        modelPath="C:/MMALBPAUG19_Maintenance.fsm",
        verbose=False,
        visible=False
    )
    check_env(env)

    policy_kwargs = dict(
        net_arch=[32, 32]
    )
    model = DQN(
        "MlpPolicy",
        env,
        verbose=1,
        batch_size=32,
        buffer_size=500,
        learning_starts=500,
        target_update_interval=500,
        train_freq=4,
        gradient_steps=2,
        learning_rate=1e-4,
        policy_kwargs=policy_kwargs,
    )

    print("Training model...")
    model.learn(total_timesteps=10000)
    print("After model update")

    # Save the trained model.
    print("Saving model...")
    model.save("SingleAgentTrainedModel")

    # Testing the model after training.
    for episode in range(2):
        env.seed(episode)
        observation, _ = env.reset()
        env.render()
        done = False
        rewards = []
        timestep = 0
        while not done:
            action, _ = model.predict(observation)
            observation, reward, terminated, truncated, info = env.step(action)
            timestep += 1
            print(f"Episode {episode+1}, Timestep {timestep}, Reward: {reward:.2f}")
            env.render()
            rewards.append(reward)
            done = terminated or truncated
            if done:
                cumulative_reward = sum(rewards)
                print(f"Episode {episode+1} completed | Cumulative Reward: {cumulative_reward:.2f}")

    env._release_flexsim()
    input("Waiting for input to close FlexSim...")
    env.close()


if __name__ == "__main__":
    main()