
Source Code for Module kuimaze.gym_wrapper

# -*- coding: utf-8 -*-

'''
File wrapping maze.py functionality into a few methods defined by OpenAI Gym
@author: Zdeněk Rozsypálek, Tomas Svoboda
@contact: rozsyzde(at)fel.cvut.cz, svobodat@fel.cvut.cz
@copyright: (c) 2017, 2018
'''

import collections
import os
import sys
import numpy as np
import gym
import copy
from gym import spaces
from gym.utils import seeding

import kuimaze
from .map_generator import maze as mapgen_maze

path_section = collections.namedtuple('Path', ['state_from', 'state_to', 'cost', 'action'])
state = collections.namedtuple('State', ['x', 'y'])

class MazeEnv(gym.Env):
    metadata = {'render.modes': ['human']}
    _path = []
    _visited = []
    MAP = '../maps/easy/easy3.bmp'

    def __init__(self, informed, gym_compatible, deter, map_image_dir=None, grad=(0, 0), node_rewards=None):
        '''
        Class wrapping Maze into a gym environment.
        @param informed: boolean
        @param gym_compatible: boolean - T = HardMaze, F = EasyMaze
        @param deter: boolean - T = deterministic maze, F = probabilistic maze
        @param map_image_dir: string - path to image of map
        @param grad: tuple - vector tuning the tilt of the maze
        @param node_rewards: optional rewards passed to the underlying kuimaze.Maze
        '''
        if map_image_dir is None:
            # if no map is passed in, one is generated with the following setup
            x_size = 6
            y_size = 6          # not 100% accurate size, could have smaller dimensions
            complexity = 0.1    # in interval (0, 1]
            density = 0.25      # in interval [0, 1]
            self.MAP = mapgen_maze(x_size, y_size, complexity, density)
        else:
            self.MAP = map_image_dir
        if grad is None:
            self._grad = (0, 0)
        else:
            self._grad = grad
        self._problem = kuimaze.Maze(self.MAP, self._grad, node_rewards=node_rewards)
        self._player = EnvAgent(self._problem)
        self._curr_state = self._problem.get_start_state()
        self._informed = informed
        self._gym_compatible = gym_compatible
        self._deter = deter
        self._gui_disabled = True
        self._set = False
        # set action and observation space
        self._xsize = self._problem.get_dimensions()[0]
        self._ysize = self._problem.get_dimensions()[1]
        self.action_space = self._get_action_space()
        self.observation_space = spaces.Tuple((spaces.Discrete(self._xsize), spaces.Discrete(self._ysize)))
        self.seed()
        self.reset()

    def step(self, action):
        assert self._set, "reset() must be called first!"
        last_state = self._curr_state
        assert 0 <= action <= 3
        if not self._deter:
            action = self._problem.non_det_result(action)
        self._curr_state = self._problem.result(self._curr_state, action)
        self._path.append(self._curr_state)
        if self._curr_state not in self._visited:
            self._visited.append(self._curr_state)
        reward, done = self._get_reward(self._curr_state, last_state)
        # reward = self._problem.get_state_reward(self._curr_state)
        return self._get_observation(), reward, done, None

    def get_all_states(self):
        '''
        Auxiliary function for MDPs - where the states (the map) are supposed to be known.
        :return: list of states
        '''
        return self._problem.get_all_states()

    def reset(self):
        self._set = True
        self._gui_disabled = True
        self._path = []
        self._visited = []
        self._problem.clear_player_data()
        self._problem.set_player(self._player)
        if self._gym_compatible:
            self._path.append(self._problem.get_start_state())
            self._visited.append(self._problem.get_start_state())
        self._curr_state = self._problem.get_start_state()
        return self._get_observation()

    def render(self, mode='human', close=False, visited=None, explored=None):
        assert self._set, "reset() must be called first!"
        self._gui_disabled = False
        if visited is None:
            self._problem.set_visited(self._visited)
        else:
            self._problem.set_visited(visited)
        if explored is None:
            self._problem.set_explored([self._curr_state])
        else:
            self._problem.set_explored(explored)

        self._problem.show_and_break()
        self._gui_on = True

    def close(self):
        self._gui_disabled = True
        self._problem.close_gui()

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def save_path(self):
        '''
        Method for saving the path of the agent into a file named 'saved_path.txt' in the directory the script
        was run from.
        @return: None
        '''
        assert len(self._path) > 0, "Path length must be greater than 0; for the easy environment call set_path first"
        # at the moment it assumes the output directory exists
        pathfname = os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), "saved_path.txt")
        with open(pathfname, 'wt') as f:
            # then go backwards through the path restored by backtracking
            if (type(self._path[0]) == tuple or type(self._path[0]) == list) and not self._gym_compatible:
                for pos in self._path:
                    f.write("x:{}, y:{}, z:{}\n".format(pos[0], pos[1], self._get_depth(state(pos[0], pos[1]))))
            if self._gym_compatible:
                for pos in self._path:
                    f.write("x:{}, y:{}, z:{}\n".format(pos.x, pos.y, self._get_depth(pos)))

    def save_eps(self):
        '''
        Save the last rendered image into the directory the script was run from.
        @return: None
        '''
        assert not self._gui_disabled, "render() must be called before save_eps"
        self._problem.save_as_eps(self._gui_disabled)

    def visualise(self, dictionary=None):
        '''
        Visualise input. If visualise is called before the GUI is opened, render() is called first.
        @param dictionary: input to visualise; can be None -> visualise depth, or a dictionary:
                           {'x': x_coord, 'y': y_coord, 'value': value_to_visualise} where value can be a scalar
                           or a 4-dimensional vector (tuple or list).
        @return: None
        '''
        assert self._set, "reset() must be called before any visualisation setting!"
        if self._gui_disabled:
            self.render()
        self._problem.visualise(dictionary)
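
    # An illustrative sketch (not part of the original module): each visualised entry follows the
    # {'x': ..., 'y': ..., 'value': ...} format from the docstring above. Whether a single entry or a
    # list of such entries is expected is defined by the underlying Maze.visualise; a per-state list
    # built from get_all_states() is a common pattern (names `env` and `values` are illustrative):
    #
    #     env.render()
    #     env.visualise([{'x': s.x, 'y': s.y, 'value': values.get(s, 0.0)}
    #                    for s in env.get_all_states()])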

    def _get_observation(self):
        '''
        Method to generate the observation - current state, finish states.
        @return: tuple
        '''
        if self._informed:
            ret = [(self._curr_state.x, self._curr_state.y, self._get_depth(self._curr_state))]
            for n in self._problem.get_goal_nodes():
                ret.append((n.x, n.y, self._get_depth(n)))
        else:
            ret = [self._curr_state.x, self._curr_state.y, self._get_depth(self._curr_state)]
        return tuple(ret)

    def _get_action_space(self):
        '''
        Method to get the action space - all available actions in the environment.
        @return: spaces
        '''
        if self._gym_compatible:
            return spaces.Discrete(4)
        else:
            return spaces.Tuple(spaces.Tuple((spaces.Discrete(self._xsize), spaces.Discrete(self._ysize))))

    def __get_reward_curr_state(self):
        return self._problem.get_state_reward(self._curr_state)

    def _get_reward(self, curr, last):
        '''
        Returns reward and an indication of a goal state.
        @param curr: new state
        @param last: last state
        @return: float, boolean
        '''
        reward = -2
        done = False
        vector = [curr.x - last.x, curr.y - last.y]
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        if curr != last:
            reward = -(abs(vector[0]) + abs(vector[1]) + z_axis)
        if self._problem.is_goal_state(curr):
            reward = 100.0
            done = True
            if self._gym_compatible:
                self._player.set_path(self._path)
                self._player.find_path()
        return reward, done

    def _get_depth(self, state):
        '''
        Get the depth (z coordinate) of a state based on the gradient. The start state of the map has depth 0.
        @param state: namedtuple state
        @return: float
        '''
        start = self._problem.get_start_state()
        vector = [state.x - start.x, state.y - start.y]
        ret = self._grad[0] * vector[0] + self._grad[1] * vector[1]
        return float(format(ret, '.3f'))
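
    # A small worked example (not part of the original module): with grad = (0.1, 0.0), a start
    # state at (0, 0) and a state at (3, 1), the depth is 0.1 * 3 + 0.0 * 1 = 0.3.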


class EnvAgent(kuimaze.BaseAgent):
    '''
    Class necessary for wrapping the maze.
    '''
    __path = []

    def set_path(self, path):
        self.__path = path

    def find_path(self):
        '''
        Visualise the path of the agent; the path must be set before visualising!
        @return: the path
        '''
        ret = []
        for i in range(len(self.__path) - 1):
            ret.append(path_section(self.__path[i], self.__path[i + 1], 1, None))
        try:
            self.problem.show_path(ret)
        except Exception:
            pass    # do nothing if no graphics is prepared
        return self.__path


class EasyMazeEnv(MazeEnv):
    '''
    EasyMazeEnv is a version of the maze closer to graph search. It is possible to move the agent from any state to
    an already visited state or to a neighbour of the current state. EasyMaze has all methods of HardMaze.
    Unlike the HardMaze, EasyMaze has the additional method set_path - which can set a different path than the agent movement.
    '''

    def __init__(self, informed, map_image_dir=None, grad=(0, 0)):
        super(EasyMazeEnv, self).__init__(informed, False, True, map_image_dir, grad)
        self._gui_on = False

    def step(self, action):
        last_state = self._curr_state
        assert (type(action) == list or type(action) == tuple) and len(action) == 2
        self._curr_state = self._easy_result(action)
        if self._curr_state not in self._visited:
            self._visited.append(self._curr_state)
        reward, done = self._get_reward(self._curr_state, last_state)
        return self._get_observation(), reward, done, None

    # def render(self, mode='human', close=False):
    #     super(EasyMazeEnv, self).render(mode, close)
    #     self._gui_on = True
    def set_path(self, path):
        '''
        This method sets the environment to visualise your found path. The render method must be called afterwards.
        @param path: list of lists in the format: [[x1, y1], [x2, y2], ... ]
        @return: None
        '''
        ret = []
        self._path = path
        if self._gui_on:
            assert (type(path[0]) == list or type(path[0]) == tuple) and (len(path[0]) == 2 or len(path[0]) == 3)
            previous_state = None
            for state_list in path:
                if previous_state is not None:
                    if abs(state_list[0] - previous_state[0]) + abs(state_list[1] - previous_state[1]) != 1:
                        raise AssertionError('The path is not continuous - the distance between neighbouring path segments should be 1')
                ret.append(state(state_list[0], state_list[1]))
                previous_state = copy.copy(state_list)

            self._player.set_path(ret)
            self._player.find_path()
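
    # A minimal usage sketch (not part of the original module), assuming an EasyMaze instance
    # `env` (hypothetical name) and a found path; neighbouring entries must differ by exactly 1:
    #
    #     env.set_path([[0, 0], [1, 0], [1, 1], [2, 1]])
    #     env.render()        # per the docstring, render after setting the path
    #     env.save_path()     # writes saved_path.txt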

    def _is_available(self, new_state):
        '''
        Returns True if the new state is available.
        @param new_state:
        @return: boolean
        '''
        tmp = []
        tmp.extend(self._visited)
        tmp.extend([self._problem.result(self._curr_state, 0), self._problem.result(self._curr_state, 1),
                    self._problem.result(self._curr_state, 2), self._problem.result(self._curr_state, 3)])
        return new_state in tmp

    def _easy_result(self, state_list):
        '''
        Gives the result of the desired action in the parameter.
        @param state_list: list or tuple of coordinates [x, y]
        @return: state - new position of the agent
        '''
        new_state = state(state_list[0], state_list[1])
        if self._is_available(new_state):
            return new_state
        else:
            # print('UNAVAILABLE ' + str(new_state) + ' from ' + str(self._curr_state))
            return self._curr_state

    def _get_cost(self, curr, last):
        '''
        Returns the cost of movement from last to curr.
        @param curr: new state
        @param last: last state
        @return: float
        '''
        reward = 0
        vector = [curr.x - last.x, curr.y - last.y]
        # TODO rewrite cost function
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        addition_cost = 0
        if curr in self._problem.hard_places:
            addition_cost = 5

        if curr != last:
            reward = abs(vector[0]) + abs(vector[1]) + z_axis + addition_cost
        return reward

    def expand(self, position):
        '''
        Returns the positions that can be visited from "position", with the associated costs.
        @param position: position in the maze defined by coordinates (x, y)

        @return: list of coordinates [x, y] with the "cost" for movement to these positions: [[[x1, y1], cost1], [[x2, y2], cost2], ... ]
        '''
        expanded_nodes = []
        maze_pose = state(position[0], position[1])
        tmp = [self._problem.result(maze_pose, 0), self._problem.result(maze_pose, 1),
               self._problem.result(maze_pose, 2), self._problem.result(maze_pose, 3)]
        for new_state in tmp:
            if new_state.x == maze_pose.x and new_state.y == maze_pose.y:
                continue
            if new_state not in self._visited:
                self._visited.append(new_state)
            reward = self._get_cost(maze_pose, new_state)
            expanded_nodes.append([(new_state.x, new_state.y), reward])
        return expanded_nodes
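
# A minimal search sketch (not part of the original module): expand() is the entry point for
# uninformed search over an EasyMaze; goal detection and path reconstruction are left to the
# caller. Names `env`, `frontier` and `came_from` are illustrative only:
#
#     import collections
#     env = EasyMaze()
#     sx, sy, _ = env.reset()                  # uninformed observation: (x, y, depth)
#     frontier = collections.deque([(sx, sy)])
#     came_from = {(sx, sy): None}
#     while frontier:                          # plain BFS over expand()
#         current = frontier.popleft()
#         for (nx, ny), cost in env.expand(current):
#             if (nx, ny) not in came_from:
#                 came_from[(nx, ny)] = current
#                 frontier.append((nx, ny))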


'''
Final set of classes to use. As defined by OpenAI Gym, all can be constructed without any required parameters.
The main method of the wrapper is step, which returns an observation, a reward, a done flag (and an unused info value):

Observations:
    For informed search the observation has the format: ((current position coords), (finish_1 coords), (finish_2 coords), ...)
    For uninformed search it is only (current position coords).

Rewards:
    When the agent moves to a different place it gets reward -1 - depth.
    When the agent reaches the finish it gets reward +100.
    If an unavailable action is called, the agent stays in the same position and the reward is 0.

Done:
    True when the agent reaches the finish.

The input (parameter) of the step method is defined by the action space:
    Easy maze action space is a list [x_coordinate, y_coordinate].
    Hard maze action space is an integer from 0 to 3.
'''
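
# An illustrative sketch of the observation formats described above (not part of the original
# module); variable names are hypothetical:
#
#     env = InfEasyMaze()                      # informed: current position followed by goal positions
#     obs = env.reset()
#     (cx, cy, cz), goals = obs[0], obs[1:]
#
#     env = EasyMaze()                         # uninformed: current position only
#     cx, cy, cz = env.reset()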


class InfEasyMaze(EasyMazeEnv):
    '''
    Informed easy maze, suitable for an A* implementation.
    step([x, y])
    '''
    def __init__(self, map_image=None, grad=(0, 0)):
        super(InfEasyMaze, self).__init__(True, map_image, grad)


class EasyMaze(EasyMazeEnv):
    '''
    Uninformed easy maze, suitable for BFS, DFS, ...
    step([x, y])
    '''
    def __init__(self, map_image=None, grad=(0, 0)):
        super(EasyMaze, self).__init__(False, map_image, grad)


class MDPMaze(MazeEnv):
    '''
    Maze for solving MDP problems.
    '''
    def __init__(self, map_image=None, grad=(0, 0), probs=None, node_rewards=None):
        if probs is not None:
            super().__init__(False, True, False, map_image, grad, node_rewards=node_rewards)
            self._problem.set_probs_table(probs[0], probs[1], probs[2], probs[3])  # set probabilities here
        else:
            super().__init__(False, True, True, map_image, grad)

    def _get_reward(self, curr, last):
        '''
        Returns reward and an indication of a goal state.
        @param curr: new state
        @param last: last state
        @return: float, boolean
        '''
        reward = self._problem.get_state_reward(curr)
        done = False
        vector = [curr.x - last.x, curr.y - last.y]
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        if curr != last:
            reward = z_axis + self._problem.get_state_reward(last)
        if self._problem.is_goal_state(curr):
            reward = self._problem.get_state_reward(curr)
            done = True
            if self._gym_compatible:
                self._player.set_path(self._path)
                self._player.find_path()
        return reward, done

    def get_actions(self, state):
        return self._problem.get_actions(state)

    def is_goal_state(self, state):
        return self._problem.is_goal_state(state)

    def is_terminal_state(self, state):
        return self._problem.is_goal_state(state) or self._problem.is_danger_state(state)

    def get_next_states_and_probs(self, state, action):
        return self._problem.get_next_states_and_probs(state, action)

    def get_state_reward(self, curr):
        return self._problem.get_state_reward(curr)


class HardMaze(MazeEnv):
    '''
    Uninformed hard maze, suitable for reinforcement learning.
    step(param) where param is an integer; 0 <= param <= 3
    '''
    def __init__(self, map_image=None, grad=(0, 0), probs=None, node_rewards=None):
        if probs is not None:
            super(HardMaze, self).__init__(False, True, False, map_image, grad, node_rewards=node_rewards)
            self._problem.set_probs(probs[0], probs[1], probs[2], probs[3])  # set probabilities here
        else:
            super(HardMaze, self).__init__(False, True, True, map_image, grad)

    def _get_reward(self, curr, last):
        '''
        Returns reward and an indication of a goal state.
        @param curr: new state
        @param last: last state
        @return: float, boolean
        '''
        reward = self._problem.get_state_reward(last)   # energy consumption
        done = False
        vector = [curr.x - last.x, curr.y - last.y]
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        if curr != last:
            reward = reward - z_axis    # more or less depending on the gradient; going up costs more
        if self._problem.is_goal_state(curr):
            reward = reward + self._problem.get_state_reward(curr)
            done = True
            if self._gym_compatible:
                self._player.set_path(self._path)
                self._player.find_path()
        return reward, done


class InfHardMaze(MazeEnv):
    '''
    Informed hard maze, suitable for reinforcement learning.
    step(param) where param is an integer; 0 <= param <= 3
    '''
    def __init__(self, map_image=None, grad=(0, 0), probs=None):
        if probs is not None:
            super(InfHardMaze, self).__init__(True, True, False, map_image, grad)
            self._problem.set_probs(probs[0], probs[1], probs[2], probs[3])  # set probabilities here
        else:
            super(InfHardMaze, self).__init__(True, True, True, map_image, grad)
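

# A minimal demonstration sketch, not part of the original module: a short random walk in an
# uninformed HardMaze, using the generated-maze defaults (no map image needed). The guard keeps
# imports of this module unaffected; the name `demo_env` is illustrative only.
if __name__ == '__main__':
    demo_env = HardMaze()
    observation = demo_env.reset()
    for _ in range(20):
        action = demo_env.action_space.sample()         # integer action, 0 <= action <= 3
        observation, reward, done, _ = demo_env.step(action)
        print(observation, reward, done)
        if done:                                        # goal reached
            break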