Package kuimaze :: Module gym_wrapper

Source Code for Module kuimaze.gym_wrapper

# -*- coding: utf-8 -*-

'''
File wrapping maze.py functionality into a few methods defined by OpenAI Gym.
@author: Zdeněk Rozsypálek, Tomas Svoboda
@contact: rozsyzde(at)fel.cvut.cz, svobodat@fel.cvut.cz
@copyright: (c) 2017, 2018
'''

import collections
import os
import sys
import numpy as np
import gym
import copy
from gym import spaces
from gym.utils import seeding

import kuimaze
from .map_generator import maze as mapgen_maze

path_section = collections.namedtuple('Path', ['state_from', 'state_to', 'cost', 'action'])
state = collections.namedtuple('State', ['x', 'y'])

class MazeEnv(gym.Env):
    metadata = {'render.modes': ['human']}
    _path = []
    _visited = []
    MAP = '../maps/easy/easy3.bmp'

    def __init__(self, informed, gym_compatible, deter, map_image_dir=None, grad=(0, 0), node_rewards=None):
        '''
        Class wrapping Maze into a gym environment.
        @param informed: boolean
        @param gym_compatible: boolean - True = HardMaze, False = EasyMaze
        @param deter: boolean - True = deterministic maze, False = probabilistic maze
        @param map_image_dir: string - path to the map image
        @param grad: tuple - vector tuning the tilt of the maze
        '''
        if map_image_dir is None:
            # if no map is given as a parameter, one is generated with the following setup
            x_size = 6
            y_size = 6  # not 100% accurate size, could have smaller dimensions
            complexity = 0.1  # in interval (0, 1]
            density = 0.25  # in interval [0, 1]
            self.MAP = mapgen_maze(x_size, y_size, complexity, density)
        else:
            self.MAP = map_image_dir
        if grad is None:
            self._grad = (0, 0)
        else:
            self._grad = grad
        self._problem = kuimaze.Maze(self.MAP, self._grad, node_rewards=node_rewards)
        self._player = EnvAgent(self._problem)
        self._curr_state = self._problem.get_start_state()
        self._informed = informed
        self._gym_compatible = gym_compatible
        self._deter = deter
        self._gui_disabled = True
        self._set = False
        # set action and observation space
        self._xsize = self._problem.get_dimensions()[0]
        self._ysize = self._problem.get_dimensions()[1]
        self.action_space = self._get_action_space()
        self.observation_space = spaces.Tuple((spaces.Discrete(self._xsize), spaces.Discrete(self._ysize)))
        self.seed()
        self.reset()

    def step(self, action):
        assert self._set, "reset() must be called first!"
        last_state = self._curr_state
        assert 0 <= action <= 3
        if not self._deter:
            action = self._problem.non_det_result(action)
        self._curr_state = self._problem.result(self._curr_state, action)
        self._path.append(self._curr_state)
        if self._curr_state not in self._visited:
            self._visited.append(self._curr_state)
        reward, done = self._get_reward(self._curr_state, last_state)
        # reward = self._problem.get_state_reward(self._curr_state)
        return self._get_observation(), reward, done, None

    def get_all_states(self):
        '''
        Auxiliary function for MDPs - where the states (the map) are supposed to be known.
        :return: list of states
        '''
        return self._problem.get_all_states()

    def reset(self):
        self._set = True
        self._gui_disabled = True
        self._path = []
        self._visited = []
        self._problem.clear_player_data()
        self._problem.set_player(self._player)
        if self._gym_compatible:
            self._path.append(self._problem.get_start_state())
            self._visited.append(self._problem.get_start_state())
        self._curr_state = self._problem.get_start_state()
        return self._get_observation()

    def render(self, mode='human', close=False, visited=None, explored=None):
        assert self._set, "reset() must be called first!"
        self._gui_disabled = False
        if visited is None:
            self._problem.set_visited(self._visited)
        else:
            self._problem.set_visited(visited)
        if explored is None:
            self._problem.set_explored([self._curr_state])
        else:
            self._problem.set_explored(explored)

        self._problem.show_and_break()
        self._gui_on = True

    def close(self):
        self._gui_disabled = True
        self._problem.close_gui()

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def save_path(self):
        '''
        Save the path of the agent into a file named 'saved_path.txt' in the directory the script was
        run from.
        @return: None
        '''
        assert len(self._path) > 0, "Path length must be greater than 0; for the easy environment call set_path first"
        # at the moment it assumes the output directory exists
        pathfname = os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), "saved_path.txt")
        with open(pathfname, 'wt') as f:
            # go through the path restored by backtracking
            if (type(self._path[0]) == tuple or type(self._path[0]) == list) and not self._gym_compatible:
                for pos in self._path:
                    f.write("x:{}, y:{}, z:{}\n".format(pos[0], pos[1], self._get_depth(state(pos[0], pos[1]))))
            if self._gym_compatible:
                for pos in self._path:
                    f.write("x:{}, y:{}, z:{}\n".format(pos.x, pos.y, self._get_depth(pos)))

    def save_eps(self):
        '''
        Save the last rendered image into the directory the script was run from.
        @return: None
        '''
        assert not self._gui_disabled, "render() must be called before save_eps"
        self._problem.save_as_eps(self._gui_disabled)

    def visualise(self, dictionary=None):
        '''
        Visualise the input. If visualise is called before the GUI is open, render() is called first.
        @param dictionary: input to visualise; can be None -> visualise depth, or a dictionary:
                           {'x': x_coord, 'y': y_coord, 'value': value_to_visualise} where value can be a scalar
                           or a 4-dimensional vector (tuple or list).
        @return: None
        '''
        assert self._set, "reset() must be called before any visualisation setting!"
        if self._gui_disabled:
            self.render()
        self._problem.visualise(dictionary)

    def _get_observation(self):
        '''
        Generate the observation - current state and finish states.
        @return: tuple
        '''
        if self._informed:
            ret = [(self._curr_state.x, self._curr_state.y, self._get_depth(self._curr_state))]
            for n in self._problem.get_goal_nodes():
                ret.append((n.x, n.y, self._get_depth(n)))
        else:
            ret = [self._curr_state.x, self._curr_state.y, self._get_depth(self._curr_state)]
        return tuple(ret)

    def _get_action_space(self):
        '''
        Get the action space - all available actions in the environment.
        @return: spaces
        '''
        if self._gym_compatible:
            return spaces.Discrete(4)
        else:
            # easy maze: an action is a pair of coordinates
            return spaces.Tuple((spaces.Discrete(self._xsize), spaces.Discrete(self._ysize)))

    def __get_reward_curr_state(self):
        # Maze.__node_rewards is private (name-mangled), so use the public accessor
        return self._problem.get_state_reward(self._curr_state)

    def _get_reward(self, curr, last):
        '''
        Return the reward and an indication of reaching a goal state.
        @param curr: new state
        @param last: last state
        @return: float, boolean
        '''
        reward = -2
        done = False
        vector = [curr.x - last.x, curr.y - last.y]
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        if curr != last:
            reward = -(abs(vector[0]) + abs(vector[1]) + z_axis)
        if self._problem.is_goal_state(curr):
            reward = 100.0
            done = True
            if self._gym_compatible:
                self._player.set_path(self._path)
                self._player.find_path()
        return reward, done

    def _get_depth(self, state):
        '''
        Get the depth (z coordinate) of a state based on the gradient. The start state of the map has depth 0.
        @param state: namedtuple state
        @return: float
        '''
        start = self._problem.get_start_state()
        vector = [state.x - start.x, state.y - start.y]
        ret = self._grad[0] * vector[0] + self._grad[1] * vector[1]
        return float(format(ret, '.3f'))


class EnvAgent(kuimaze.BaseAgent):
    '''
    Class necessary for wrapping the maze.
    '''
    __path = []

    def set_path(self, path):
        self.__path = path

    def find_path(self):
        '''
        Visualise the path of the agent; the path must be set before visualising!
        @return: the stored path
        '''
        ret = []
        for i in range(len(self.__path) - 1):
            ret.append(path_section(self.__path[i], self.__path[i + 1], 1, None))
        self.problem.show_path(ret)
        return self.__path


class EasyMazeEnv(MazeEnv):
    '''
    EasyMazeEnv is a version of the maze closer to graph search. The agent can be moved from the current
    state to any already visited state or to a neighbour of the current state. EasyMaze has all the methods
    of HardMaze; unlike HardMaze, it has the additional method set_path - which can set a path different
    from the agent's movement.
    '''

    def __init__(self, informed, map_image_dir=None, grad=(0, 0)):
        super(EasyMazeEnv, self).__init__(informed, False, True, map_image_dir, grad)
        self._gui_on = False

    def step(self, action):
        last_state = self._curr_state
        assert (type(action) == list or type(action) == tuple) and len(action) == 2
        self._curr_state = self._easy_result(action)
        if self._curr_state not in self._visited:
            self._visited.append(self._curr_state)
        reward, done = self._get_reward(self._curr_state, last_state)
        return self._get_observation(), reward, done, None

    # def render(self, mode='human', close=False):
    #     super(EasyMazeEnv, self).render(mode, close)
    #     self._gui_on = True

    def set_path(self, path):
        '''
        Set the environment to visualise the path you found. The render method must be called afterwards.
        @param path: list of lists in the format: [[x1, y1], [x2, y2], ... ]
        @return: None
        '''
        ret = []
        self._path = path
        if self._gui_on:
            assert (type(path[0]) == list or type(path[0]) == tuple) and (len(path[0]) == 2 or len(path[0]) == 3)
            previous_state = None
            for state_list in path:
                if previous_state is not None:
                    if abs(state_list[0] - previous_state[0]) + abs(state_list[1] - previous_state[1]) != 1:
                        raise AssertionError('The path is not continuous - the distance between neighbouring path segments should be 1')
                ret.append(state(state_list[0], state_list[1]))
                previous_state = copy.copy(state_list)

            self._player.set_path(ret)
            self._player.find_path()

    def _is_available(self, new_state):
        '''
        Return True if the new state is available.
        @param new_state: state to check
        @return: boolean
        '''
        tmp = []
        tmp.extend(self._visited)
        tmp.extend([self._problem.result(self._curr_state, 0), self._problem.result(self._curr_state, 1),
                    self._problem.result(self._curr_state, 2), self._problem.result(self._curr_state, 3)])
        return new_state in tmp

    def _easy_result(self, state_list):
        '''
        Give the result of the desired move.
        @param state_list: list or tuple of coordinates [x, y]
        @return: state - new position of the agent
        '''
        new_state = state(state_list[0], state_list[1])
        if self._is_available(new_state):
            return new_state
        else:
            # print('UNAVAILABLE ' + str(new_state) + ' from ' + str(self._curr_state))
            return self._curr_state

    def _get_cost(self, curr, last):
        '''
        Return the cost of movement from last to curr.
        @param curr: new state
        @param last: last state
        @return: float
        '''
        reward = 0
        vector = [curr.x - last.x, curr.y - last.y]
        # TODO rewrite cost function
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        addition_cost = 0
        if curr in self._problem.hard_places:
            addition_cost = 5

        if curr != last:
            reward = abs(vector[0]) + abs(vector[1]) + z_axis + addition_cost
        return reward

    def expand(self, position):
        '''
        Return the positions that can be visited from "position", with the cost of moving to them.
        @param position: position in the maze defined by coordinates (x, y)

        @return: list of coordinates [x, y] with the "cost" of moving to these positions: [[[x1, y1], cost1], [[x2, y2], cost2], ... ]
        '''
        expanded_nodes = []
        maze_pose = state(position[0], position[1])
        tmp = [self._problem.result(maze_pose, 0), self._problem.result(maze_pose, 1),
               self._problem.result(maze_pose, 2), self._problem.result(maze_pose, 3)]
        for new_state in tmp:
            if new_state.x == maze_pose.x and new_state.y == maze_pose.y:
                continue
            if new_state not in self._visited:
                self._visited.append(new_state)
            reward = self._get_cost(new_state, maze_pose)  # cost of moving from maze_pose to new_state
            expanded_nodes.append([(new_state.x, new_state.y), reward])
        return expanded_nodes


'''
Final set of classes to use. As defined in OpenAI Gym, none of them needs any parameters in the
constructor. The main method of the wrapper is step, which returns the observation, the reward and the
done flag (plus a placeholder info value):

Observations:
For informed search the observation has the format: ((current position coords), (finish_1 coords), (finish_2 coords), ...)
For uninformed search it is only (current position coords).

Rewards:
When the agent moves to a different place, it gets reward -1 - depth difference.
When the agent reaches a finish, it gets reward +100.
If an unavailable action is called, the agent stays in the same position and the reward is -2.

Done:
True when the agent reaches the finish.

The input (parameter) of the step method is defined by the action space:
The easy maze action space is a list [x_coordinate, y_coordinate].
The hard maze action space is an integer from 0 to 3.

Illustrative usage sketches follow some of the class definitions below.
'''


class InfEasyMaze(EasyMazeEnv):
    '''
    Informed easy maze, suitable for an A* implementation.
    step([x, y])
    '''

    def __init__(self, map_image=None, grad=(0, 0)):
        super(InfEasyMaze, self).__init__(True, map_image, grad)


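For orientation, here is a minimal breadth-first-search sketch built on reset(), expand() and
set_path(). It is an illustration, not part of the module; it assumes the kuimaze package re-exports
InfEasyMaze (otherwise import it from kuimaze.gym_wrapper) and it runs on a generated map.

import collections

import kuimaze

env = kuimaze.InfEasyMaze(map_image=None)  # None -> a small random maze is generated
obs = env.reset()
start = obs[0][:2]                         # (x, y) of the start state
goals = set(o[:2] for o in obs[1:])        # goal coordinates from the informed observation

frontier = collections.deque([start])
parent = {start: None}
goal = None
while frontier:
    pos = frontier.popleft()
    if pos in goals:
        goal = pos
        break
    for nxt, cost in env.expand(pos):      # neighbours with their step costs
        if nxt not in parent:
            parent[nxt] = pos
            frontier.append(nxt)

path = []                                  # backtrack from the goal to the start
while goal is not None:
    path.append(list(goal))
    goal = parent[goal]
path.reverse()
env.set_path(path)                         # render() afterwards draws the path
env.render()
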
class EasyMaze(EasyMazeEnv):
    '''
    Uninformed easy maze, suitable for BFS, DFS, ...
    step([x, y])
    '''

    def __init__(self, map_image=None, grad=(0, 0)):
        super(EasyMaze, self).__init__(False, map_image, grad)


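A minimal interaction sketch for the uninformed easy maze (illustrative; again assuming the package
re-exports EasyMaze):

import kuimaze

env = kuimaze.EasyMaze(map_image=None)       # None -> a small random maze is generated
x, y, depth = env.reset()                    # uninformed observation: (x, y, depth)
obs, reward, done, _ = env.step([x + 1, y])  # try to move one cell to the right
print(obs, reward, done)                     # the agent stays put if the move is unavailable
env.close()
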
class MDPMaze(MazeEnv):
    '''
    Maze for solving MDP problems.
    '''

    def __init__(self, map_image=None, grad=(0, 0), probs=None, node_rewards=None):
        if probs is not None:
            super().__init__(False, True, False, map_image, grad, node_rewards=node_rewards)
            self._problem.set_probs_table(probs[0], probs[1], probs[2], probs[3])  # set probabilities here
        else:
            super().__init__(False, True, True, map_image, grad)

    def _get_reward(self, curr, last):
        '''
        Return the reward and an indication of reaching a goal state.
        @param curr: new state
        @param last: last state
        @return: float, boolean
        '''
        reward = self._problem.get_state_reward(curr)
        done = False
        vector = [curr.x - last.x, curr.y - last.y]
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        if curr != last:
            reward = z_axis + self._problem.get_state_reward(curr)
        if self._problem.is_goal_state(curr):
            reward = self._problem.get_state_reward(curr)
            done = True
            if self._gym_compatible:
                self._player.set_path(self._path)
                self._player.find_path()
        return reward, done

    def get_actions(self, state):
        return self._problem.get_actions(state)

    def is_goal_state(self, state):
        return self._problem.is_goal_state(state)

    def is_terminal_state(self, state):
        return self._problem.is_goal_state(state) or self._problem.is_danger_state(state)

    def get_next_states_and_probs(self, state, action):
        return self._problem.get_next_states_and_probs(state, action)

    def get_state_reward(self, curr):
        return self._problem.get_state_reward(curr)


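To show how the MDPMaze accessors fit together, here is a value-iteration sketch. It is illustrative
only: it assumes the package re-exports MDPMaze, that get_next_states_and_probs returns
(state, probability) pairs, and it uses one common state-reward Bellman backup; the probability table
is made up.

import kuimaze

probs = [0.8, 0.1, 0.1, 0.0]  # illustrative transition-noise distribution
env = kuimaze.MDPMaze(map_image=None, probs=probs)
env.reset()

gamma = 0.9
V = {(s.x, s.y): 0.0 for s in env.get_all_states()}
for _ in range(100):  # a fixed number of sweeps keeps the sketch simple
    for s in env.get_all_states():
        if env.is_terminal_state(s):
            V[(s.x, s.y)] = env.get_state_reward(s)
            continue
        V[(s.x, s.y)] = max(
            sum(p * (env.get_state_reward(s) + gamma * V[(sn.x, sn.y)])
                for sn, p in env.get_next_states_and_probs(s, a))
            for a in env.get_actions(s)
        )
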
class HardMaze(MazeEnv):
    '''
    Uninformed hard maze, suitable for reinforcement learning.
    step(param) where param is an integer; 0 <= param <= 3
    '''

    def __init__(self, map_image=None, grad=(0, 0), probs=None, node_rewards=None):
        if probs is not None:
            super(HardMaze, self).__init__(False, True, False, map_image, grad, node_rewards=node_rewards)
            self._problem.set_probs(probs[0], probs[1], probs[2], probs[3])  # set probabilities here
        else:
            super(HardMaze, self).__init__(False, True, True, map_image, grad)


class InfHardMaze(MazeEnv):
    '''
    Informed hard maze, suitable for reinforcement learning.
    step(param) where param is an integer; 0 <= param <= 3
    '''

    def __init__(self, map_image=None, grad=(0, 0), probs=None):
        if probs is not None:
            super(InfHardMaze, self).__init__(True, True, False, map_image, grad)
            self._problem.set_probs(probs[0], probs[1], probs[2], probs[3])  # set probabilities here
        else:
            super(InfHardMaze, self).__init__(True, True, True, map_image, grad)
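
Finally, a bounded random-walk sketch for the hard maze (illustrative; assumes the package re-exports
HardMaze, with a made-up action-noise distribution):

import random

import kuimaze

probs = [0.8, 0.1, 0.1, 0.0]  # illustrative action-noise distribution
env = kuimaze.HardMaze(map_image=None, probs=probs)
obs = env.reset()
total_reward = 0.0
for _ in range(1000):  # cap the episode length
    obs, reward, done, _ = env.step(random.randint(0, 3))
    total_reward += reward
    if done:
        break
env.render()
print('episode return:', total_reward)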