Package kuimaze :: Module gym_wrapper

Source Code for Module kuimaze.gym_wrapper

# -*- coding: utf-8 -*-

'''
File wrapping maze.py functionality into a few methods defined by OpenAI Gym.
@author: Zdeněk Rozsypálek, Tomas Svoboda
@contact: rozsyzde(at)fel.cvut.cz, svobodat@fel.cvut.cz
@copyright: (c) 2017, 2018
'''

import collections
import os
import sys
import numpy as np
import gym
import copy
from gym import spaces
from gym.utils import seeding

import kuimaze
from .map_generator import maze as mapgen_maze

path_section = collections.namedtuple('Path', ['state_from', 'state_to', 'cost', 'action'])
state = collections.namedtuple('State', ['x', 'y'])

class MazeEnv(gym.Env):
    metadata = {'render.modes': ['human']}
    _path = []
    _visited = []
    MAP = '../maps/easy/easy3.bmp'

    def __init__(self, informed, gym_compatible, deter, map_image_dir=None, grad=(0, 0), node_rewards=None):
        '''
        Class wrapping Maze into a gym environment.
        @param informed: boolean
        @param gym_compatible: boolean - T = HardMaze, F = EasyMaze
        @param deter: boolean - T = deterministic maze, F = probabilistic maze
        @param map_image_dir: string - path to an image of the map
        @param grad: tuple - vector tuning the tilt of the maze
        @param node_rewards: optional node rewards passed to the underlying Maze
        '''
        if map_image_dir is None:
            '''
            If no map is given as a parameter, one is generated with the following setup
            '''
            x_size = 6
            y_size = 6  # not a 100% accurate size, the maze could have smaller dimensions
            complexity = 0.1  # in interval (0, 1]
            density = 0.25  # in interval [0, 1]
            self.MAP = mapgen_maze(x_size, y_size, complexity, density)
        else:
            self.MAP = map_image_dir
        if grad is None:
            self._grad = (0, 0)
        else:
            self._grad = grad
        self._problem = kuimaze.Maze(self.MAP, self._grad, node_rewards=node_rewards)
        self._player = EnvAgent(self._problem)
        self._curr_state = self._problem.get_start_state()
        self._informed = informed
        self._gym_compatible = gym_compatible
        self._deter = deter
        self._gui_disabled = True
        self._set = False
        # set action and observation space
        self._xsize = self._problem.get_dimensions()[0]
        self._ysize = self._problem.get_dimensions()[1]
        self.action_space = self._get_action_space()
        self.observation_space = spaces.Tuple((spaces.Discrete(self._xsize), spaces.Discrete(self._ysize)))
        self.seed()
        self.reset()

    def step(self, action):
        assert self._set, "reset() must be called first!"
        last_state = self._curr_state
        assert 0 <= action <= 3
        if not self._deter:
            action = self._problem.non_det_result(action)
        self._curr_state = self._problem.result(self._curr_state, action)
        self._path.append(self._curr_state)
        if self._curr_state not in self._visited:
            self._visited.append(self._curr_state)
        reward, done = self._get_reward(self._curr_state, last_state)
        # reward = self._problem.get_state_reward(self._curr_state)
        return self._get_observation(), reward, done, None

    def get_all_states(self):
        '''
        Auxiliary function for MDPs - where the states (the map) are supposed to be known.
        :return: list of states
        '''
        return self._problem.get_all_states()

    def reset(self):
        self._set = True
        self._gui_disabled = True
        self._path = []
        self._visited = []
        self._problem.clear_player_data()
        self._problem.set_player(self._player)
        if self._gym_compatible:
            self._path.append(self._problem.get_start_state())
            self._visited.append(self._problem.get_start_state())
        self._curr_state = self._problem.get_start_state()
        return self._get_observation()

    def render(self, mode='human', close=False, visited=None, explored=None):
        assert self._set, "reset() must be called first!"
        self._gui_disabled = False
        if visited is None:
            self._problem.set_visited(self._visited)
        else:
            self._problem.set_visited(visited)
        if explored is None:
            self._problem.set_explored([self._curr_state])
        else:
            self._problem.set_explored(explored)

        self._problem.show_and_break()
        self._gui_on = True

    def close(self):
        self._gui_disabled = True
        self._problem.close_gui()

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def save_path(self):
        '''
        Method for saving the path of the agent into a file named 'saved_path.txt' in the directory the script
        was run from.
        @return: None
        '''
        assert len(self._path) > 0, "Path length must be greater than 0; for the easy environment call set_path first"
        # at the moment it assumes the output directory exists
        pathfname = os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), "saved_path.txt")
        with open(pathfname, 'wt') as f:
            # go through the path restored by backtracking
            if (type(self._path[0]) == tuple or type(self._path[0]) == list) and not self._gym_compatible:
                for pos in self._path:
                    f.write("x:{}, y:{}, z:{}\n".format(pos[0], pos[1], self._get_depth(state(pos[0], pos[1]))))
            if self._gym_compatible:
                for pos in self._path:
                    f.write("x:{}, y:{}, z:{}\n".format(pos.x, pos.y, self._get_depth(pos)))

    def save_eps(self):
        '''
        Save the last rendered image into the directory the script was run from.
        @return: None
        '''
        assert not self._gui_disabled, "render() must be called before save_eps"
        self._problem.save_as_eps(self._gui_disabled)

    def visualise(self, dictionary=None):
        '''
        Visualise the input. If visualise is called before the GUI is open, render() is called first.
        @param dictionary: input to visualise; can be None -> visualise depth, or a dictionary:
                           {'x': x_coord, 'y': y_coord, 'value': value_to_visualise} where value can be a scalar
                           or a 4-dimensional vector (tuple or list).
        @return: None
        '''
        assert self._set, "reset() must be called before any visualisation setting!"
        if self._gui_disabled:
            self.render()
        self._problem.visualise(dictionary)

    def _get_observation(self):
        '''
        Method to generate the observation - current state and finish states.
        @return: tuple
        '''
        if self._informed:
            ret = [(self._curr_state.x, self._curr_state.y, self._get_depth(self._curr_state))]
            for n in self._problem.get_goal_nodes():
                ret.append((n.x, n.y, self._get_depth(n)))
        else:
            ret = [self._curr_state.x, self._curr_state.y, self._get_depth(self._curr_state)]
        return tuple(ret)

    def _get_action_space(self):
        '''
        Method to get the action space - all available actions in the environment.
        @return: spaces
        '''
        if self._gym_compatible:
            return spaces.Discrete(4)
        else:
            return spaces.Tuple(spaces.Tuple((spaces.Discrete(self._xsize), spaces.Discrete(self._ysize))))

    def __get_reward_curr_state(self):
        return self._problem.__node_rewards[self._curr_state.x, self._curr_state.y]

    def _get_reward(self, curr, last):
        '''
        Returns the reward and an indication of a goal state.
        @param curr: new state
        @param last: last state
        @return: float, boolean
        '''
        reward = -2
        done = False
        vector = [curr.x - last.x, curr.y - last.y]
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        if curr != last:
            reward = -(abs(vector[0]) + abs(vector[1]) + z_axis)
        if self._problem.is_goal_state(curr):
            reward = 100.0
            done = True
            if self._gym_compatible:
                self._player.set_path(self._path)
                self._player.find_path()
        return reward, done

    def _get_depth(self, state):
        '''
        Get the depth (z coordinate) of a state based on the gradient. The start state of the map has depth 0.
        @param state: namedtuple state
        @return: float
        '''
        start = self._problem.get_start_state()
        vector = [state.x - start.x, state.y - start.y]
        ret = self._grad[0] * vector[0] + self._grad[1] * vector[1]
        return float(format(ret, '.3f'))


class EnvAgent(kuimaze.BaseAgent):
    '''
    Class necessary for wrapping the maze.
    '''
    __path = []

    def set_path(self, path):
        self.__path = path

    def find_path(self):
        '''
        Visualise the path of the agent; the path must be set before visualising!
        @return: the path that was set
        '''
        ret = []
        for i in range(len(self.__path) - 1):
            ret.append(path_section(self.__path[i], self.__path[i + 1], 1, None))
        try:
            self.problem.show_path(ret)
        except:
            pass  # do nothing if no graphics is prepared
        return self.__path


class EasyMazeEnv(MazeEnv):
    '''
    EasyMazeEnv is a version of the maze closer to graph search. It is possible to move the agent from any state
    to an already visited state or to a neighbour of the current state. EasyMaze has all the methods of HardMaze.
    Unlike HardMaze, EasyMaze has an additional method, set_path, which can set a different path than the one
    given by the agent's movement.
    '''

    def __init__(self, informed, map_image_dir=None, grad=(0, 0)):
        super(EasyMazeEnv, self).__init__(informed, False, True, map_image_dir, grad)
        self._gui_on = False

    def step(self, action):
        last_state = self._curr_state
        assert (type(action) == list or type(action) == tuple) and len(action) == 2
        self._curr_state = self._easy_result(action)
        if self._curr_state not in self._visited:
            self._visited.append(self._curr_state)
        reward, done = self._get_reward(self._curr_state, last_state)
        return self._get_observation(), reward, done, None

    # def render(self, mode='human', close=False):
    #     super(EasyMazeEnv, self).render(mode, close)
    #     self._gui_on = True

    def set_path(self, path):
        '''
        This method sets the environment to visualise your found path. The render method must be called afterwards.
        @param path: list of lists in the format: [[x1, y1], [x2, y2], ... ]
        @return: None
        '''
        ret = []
        self._path = path
        if self._gui_on:
            assert (type(path[0]) == list or type(path[0]) == tuple) and (len(path[0]) == 2 or len(path[0]) == 3)
            previus_state = None
            for state_list in path:
                if previus_state is not None:
                    if abs(state_list[0] - previus_state[0]) > 1 and abs(state_list[1] - previus_state[1]) > 1:
                        raise AssertionError('The path is not continuous - the distance between neighbouring path segments should be at most 2 ** (1/2)')
                ret.append(state(state_list[0], state_list[1]))
                previus_state = copy.copy(state_list)

            self._player.set_path(ret)
            self._player.find_path()

    def _is_available(self, new_state):
        '''
        Returns True if the new state is available.
        @param new_state:
        @return: boolean
        '''
        tmp = []
        tmp.extend(self._visited)
        # states reachable by all eight actions (0-7) from the current state
        tmp.extend([self._problem.result(self._curr_state, 0), self._problem.result(self._curr_state, 1),
                    self._problem.result(self._curr_state, 2), self._problem.result(self._curr_state, 3),
                    self._problem.result(self._curr_state, 4), self._problem.result(self._curr_state, 5),
                    self._problem.result(self._curr_state, 6), self._problem.result(self._curr_state, 7)])
        return new_state in tmp

    def _easy_result(self, state_list):
        '''
        Gives the result of the desired action in the parameter.
        @param state_list: list or tuple of coordinates [x, y]
        @return: state - new position of the agent
        '''
        new_state = state(state_list[0], state_list[1])
        if self._is_available(new_state):
            return new_state
        else:
            # print('UNAVAILABLE ' + str(new_state) + ' from ' + str(self._curr_state))
            return self._curr_state

    def _get_cost(self, curr, last):
        '''
        Returns the cost of movement from last to curr.
        @param curr: new state
        @param last: last state
        @return: float
        '''
        reward = 0
        vector = [curr.x - last.x, curr.y - last.y]
        # TODO rewrite cost function
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        addition_cost = 0
        if curr in self._problem.hard_places:
            addition_cost = 5

        if curr != last:
            reward = pow(pow(vector[0], 2) + pow(vector[1], 2), 1 / 2) + z_axis + addition_cost
        return reward

    def expand(self, position):
        '''
        Returns a list of positions, with associated costs, that can be visited from "position".
        @param position: position in the maze defined by coordinates (x, y)

        @return: list of coordinates [x, y] with the "cost" of movement to these positions: [[[x1, y1], cost1], [[x2, y2], cost2], ... ]
        '''
        expanded_nodes = []
        maze_pose = state(position[0], position[1])
        tmp = [self._problem.result(maze_pose, 0), self._problem.result(maze_pose, 1),
               self._problem.result(maze_pose, 2), self._problem.result(maze_pose, 3),
               self._problem.result(maze_pose, 4), self._problem.result(maze_pose, 5),
               self._problem.result(maze_pose, 6), self._problem.result(maze_pose, 7)]
        for new_state in tmp:
            if new_state.x == maze_pose.x and new_state.y == maze_pose.y:
                continue
            if new_state not in self._visited:
                self._visited.append(new_state)
            reward = self._get_cost(maze_pose, new_state)
            expanded_nodes.append([(new_state.x, new_state.y), reward])
        return expanded_nodes


'''
Final set of classes to use. As defined in OpenAI gym, none of them needs any parameters in the constructor.
The main method of the wrapper is the step function, which returns the following values:

Observations:
For informed search the observation is in the format: ((current position coords), (finish_1 coords), (finish_2 coords), ...)
For uninformed search it is only (current position coords).

Rewards:
When the agent moves to a different place it gets reward -1 - depth.
When the agent reaches the finish it gets reward +100.
If an unavailable action is called, the agent stays in the same position and the reward is 0.

Done:
True when the agent reaches the finish.

The input (parameter) of the step method is defined by the action space:
Easy maze action space is a list [x_coordinate, y_coordinate].
Hard maze action space is an integer from 0 to 3.
'''

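# A minimal usage sketch (illustrative only, not part of the original module): one expansion
# step of an uninformed EasyMaze, assuming the kuimaze package is installed so a default map
# can be generated. The variable names are hypothetical.
#
#     from kuimaze.gym_wrapper import EasyMaze
#
#     env = EasyMaze()                          # a random map is generated when no image is given
#     obs = env.reset()                         # uninformed observation: (x, y, depth)
#     start = (obs[0], obs[1])
#     neighbours = env.expand(start)            # [[(x, y), cost], ...]
#     next_xy, cost = neighbours[0]
#     obs, reward, done, _ = env.step(list(next_xy))
#     env.render()
#     env.set_path([list(start), list(next_xy)])
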

class InfEasyMaze(EasyMazeEnv):
    '''
    Informed easy maze, suitable for an A* implementation.
    step([x, y])
    '''
    def __init__(self, map_image=None, grad=(0, 0)):
        super(InfEasyMaze, self).__init__(True, map_image, grad)


class EasyMaze(EasyMazeEnv):
    '''
    Uninformed easy maze, suitable for BFS, DFS, ...
    step([x, y])
    '''
    def __init__(self, map_image=None, grad=(0, 0)):
        super(EasyMaze, self).__init__(False, map_image, grad)


class MDPMaze(MazeEnv):
    '''
    Maze for solving MDP problems.
    '''
    def __init__(self, map_image=None, grad=(0, 0), probs=None, node_rewards=None):
        if probs is not None:
            super().__init__(False, True, False, map_image, grad, node_rewards=node_rewards)
            self._problem.set_probs_table(probs[0], probs[1], probs[2], probs[3])  # set probabilities here
        else:
            super().__init__(False, True, True, map_image, grad)

    def _get_reward(self, curr, last):
        '''
        Returns the reward and an indication of a goal state.
        @param curr: new state
        @param last: last state
        @return: float, boolean
        '''
        reward = self._problem.get_state_reward(curr)
        done = False
        vector = [curr.x - last.x, curr.y - last.y]
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        if curr != last:
            reward = z_axis + self._problem.get_state_reward(last)
        if self._problem.is_goal_state(curr):
            reward = self._problem.get_state_reward(curr)
            done = True
            if self._gym_compatible:
                self._player.set_path(self._path)
                self._player.find_path()
        return reward, done

    def get_actions(self, state):
        return self._problem.get_actions(state)

    def is_goal_state(self, state):
        return self._problem.is_goal_state(state)

    def is_terminal_state(self, state):
        return self._problem.is_goal_state(state) or self._problem.is_danger_state(state)

    def get_next_states_and_probs(self, state, action):
        return self._problem.get_next_states_and_probs(state, action)

    def get_state_reward(self, curr):
        return self._problem.get_state_reward(curr)
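
    # Usage sketch (illustrative only; the probs values and the assumption that
    # get_next_states_and_probs() yields (next_state, probability) pairs are hypothetical):
    #
    #     from kuimaze.gym_wrapper import MDPMaze
    #
    #     env = MDPMaze(probs=(0.8, 0.1, 0.1, 0.0))
    #     gamma = 0.9
    #     V = {s: 0.0 for s in env.get_all_states()}
    #     for s in env.get_all_states():        # one sweep of value iteration
    #         if env.is_terminal_state(s):
    #             continue
    #         V[s] = max(sum(p * (env.get_state_reward(s) + gamma * V[sn])
    #                        for sn, p in env.get_next_states_and_probs(s, a))
    #                    for a in env.get_actions(s))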


class HardMaze(MazeEnv):
    '''
    Uninformed hard maze, suitable for reinforcement learning.
    step(param) where param is an integer; 0 <= param <= 3
    '''
    def __init__(self, map_image=None, grad=(0, 0), probs=None, node_rewards=None):
        if probs is not None:
            super(HardMaze, self).__init__(False, True, False, map_image, grad, node_rewards=node_rewards)
            self._problem.set_probs(probs[0], probs[1], probs[2], probs[3])  # set probabilities here
        else:
            super(HardMaze, self).__init__(False, True, True, map_image, grad)

    def _get_reward(self, curr, last):
        '''
        Returns the reward and an indication of a goal state.
        @param curr: new state
        @param last: last state
        @return: float, boolean
        '''
        reward = self._problem.get_state_reward(last)  # energy consumption
        done = False
        vector = [curr.x - last.x, curr.y - last.y]
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        if curr != last:
            reward = reward - z_axis  # more or less depending on the gradient; going up costs more
        if self._problem.is_goal_state(curr):
            reward = reward + self._problem.get_state_reward(curr)
            done = True
            if self._gym_compatible:
                self._player.set_path(self._path)
                self._player.find_path()
        return reward, done


class InfHardMaze(MazeEnv):
    '''
    Informed hard maze, suitable for reinforcement learning.
    step(param) where param is an integer; 0 <= param <= 3
    '''
    def __init__(self, map_image=None, grad=(0, 0), probs=None):
        if probs is not None:
            super(InfHardMaze, self).__init__(True, True, False, map_image, grad)
            self._problem.set_probs(probs[0], probs[1], probs[2], probs[3])  # set probabilities here
        else:
            super(InfHardMaze, self).__init__(True, True, True, map_image, grad)
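
# Minimal demo sketch (not part of the original module): a random-action HardMaze episode,
# assuming the kuimaze package is installed so a map can be generated. Guarded by
# __main__ so importing this module is unaffected.
if __name__ == '__main__':
    demo_env = HardMaze()
    observation = demo_env.reset()
    total_reward = 0.0
    for _ in range(50):                       # a short episode with random actions 0-3
        action = demo_env.action_space.sample()
        observation, reward, done, _ = demo_env.step(action)
        total_reward += reward
        demo_env.render()
        if done:
            break
    print('episode finished, total reward:', total_reward)
    demo_env.close()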