
Source Code for Module kuimaze.gym_wrapper

# -*- coding: utf-8 -*-

'''
File wrapping maze.py functionality into a few methods defined by OpenAI GYM
@author: Zdeněk Rozsypálek, Tomas Svoboda
@contact: rozsyzde(at)fel.cvut.cz, svobodat@fel.cvut.cz
@copyright: (c) 2017, 2018
'''

import collections
import os
import sys
import numpy as np
import gym
import copy
from gym import spaces
from gym.utils import seeding

import kuimaze
from .map_generator import maze as mapgen_maze

EIGHT_DIRS = False  # can the robot/agent go in 8 directions? If not, a 4-neighbourhood is considered

path_section = collections.namedtuple('Path', ['state_from', 'state_to', 'cost', 'action'])
state = collections.namedtuple('State', ['x', 'y'])

class MazeEnv(gym.Env):
    metadata = {'render.modes': ['human']}
    _path = []
    _visited = []
    MAP = '../maps/easy/easy3.bmp'

    def __init__(self, informed, gym_compatible, deter, map_image_dir=None, grad=(0, 0), node_rewards=None):
        '''
        Class wrapping Maze into a gym environment.
        @param informed: boolean
        @param gym_compatible: boolean - T = HardMaze, F = EasyMaze
        @param deter: boolean - T = deterministic maze, F = probabilistic maze
        @param map_image_dir: string - path to image of map
        @param grad: tuple - vector tuning the tilt of the maze
        @param node_rewards: optional node rewards passed to the underlying kuimaze.Maze
        '''
        if map_image_dir is None:
            '''
            If there is no map in the parameter, it will be generated with the following setup
            '''
            x_size = 6
            y_size = 6          # not 100% accurate size, could have smaller dimensions
            complexity = 0.1    # in interval (0, 1]
            density = 0.25      # in interval [0, 1]
            self.MAP = mapgen_maze(x_size, y_size, complexity, density)
        else:
            self.MAP = map_image_dir
        if grad is None:
            self._grad = (0, 0)
        else:
            self._grad = grad
        self._problem = kuimaze.Maze(self.MAP, self._grad, node_rewards=node_rewards)
        self._player = EnvAgent(self._problem)
        self._curr_state = self._problem.get_start_state()
        self._informed = informed
        self._gym_compatible = gym_compatible
        self._deter = deter
        self._gui_disabled = True
        self._set = False
        # set action and observation space
        self._xsize = self._problem.get_dimensions()[0]
        self._ysize = self._problem.get_dimensions()[1]
        self.action_space = self._get_action_space()
        self.observation_space = spaces.Tuple((spaces.Discrete(self._xsize), spaces.Discrete(self._ysize)))
        self.seed()
        self.reset()

    def step(self, action):
        assert self._set, "reset() must be called first!"
        last_state = self._curr_state
        assert 0 <= action <= 3
        if not self._deter:
            action = self._problem.non_det_result(action)
        self._curr_state = self._problem.result(self._curr_state, action)
        self._path.append(self._curr_state)
        if self._curr_state not in self._visited:
            self._visited.append(self._curr_state)
        reward, done = self._get_reward(self._curr_state, last_state)
        # reward = self._problem.get_state_reward(self._curr_state)
        return self._get_observation(), reward, done, None

    def get_all_states(self):
        '''
        auxiliary function for MDPs - where states (map) are supposed to be known
        :return: list of states
        '''
        return self._problem.get_all_states()

    def reset(self):
        self._set = True
        self._gui_disabled = True
        self._path = []
        self._visited = []
        self._problem.clear_player_data()
        self._problem.set_player(self._player)
        if self._gym_compatible:
            self._path.append(self._problem.get_start_state())
        self._visited.append(self._problem.get_start_state())
        self._curr_state = self._problem.get_start_state()
        return self._get_observation()

    def render(self, mode='human', close=False, visited=None, explored=None):
        assert self._set, "reset() must be called first!"
        self._gui_disabled = False
        if visited is None:
            self._problem.set_visited(self._visited)
        else:
            self._problem.set_visited(visited)
        if explored is None:
            self._problem.set_explored([self._curr_state])
        else:
            self._problem.set_explored(explored)

        self._problem.show_and_break()
        self._gui_on = True

    def close(self):
        self._gui_disabled = True
        self._problem.close_gui()

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def save_path(self):
        '''
        Method for saving the path of the agent into a file named 'saved_path.txt' in the directory
        the script was run from.
        @return: None
        '''
        assert len(self._path) > 0, "Path length must be greater than 0; for the easy environment call set_path first"
        # at the moment it assumes the output directory exists
        pathfname = os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), "saved_path.txt")
        with open(pathfname, 'wt') as f:
            # then go backwards through the path restored by backtracking
            if (type(self._path[0]) == tuple or type(self._path[0]) == list) and not self._gym_compatible:
                for pos in self._path:
                    f.write("x:{}, y:{}, z:{}\n".format(pos[0], pos[1], self._get_depth(state(pos[0], pos[1]))))
            if self._gym_compatible:
                for pos in self._path:
                    f.write("x:{}, y:{}, z:{}\n".format(pos.x, pos.y, self._get_depth(pos)))

    def save_eps(self):
        '''
        Save the last rendered image into the directory the script was run from.
        @return: None
        '''
        assert not self._gui_disabled, "render() must be called before save_eps"
        self._problem.save_as_eps(self._gui_disabled)

    def visualise(self, dictionary=None):
        '''
        Visualise the input. If visualise is called before the GUI is open, render() is called first.
        @param dictionary: input to visualise; can be None -> visualise depth, or a dictionary:
                           {'x': x_coord, 'y': y_coord, 'value': value_to_visualise} where value can be a scalar
                           or a 4-dimensional vector (tuple or list).
        @return: None
        '''
        assert self._set, "reset() must be called before any visualisation setting!"
        if self._gui_disabled:
            self.render()
        self._problem.visualise(dictionary)

    def _get_observation(self):
        '''
        method to generate observation - current state, finish states
        @return: tuple
        '''
        if self._informed:
            ret = [(self._curr_state.x, self._curr_state.y, self._get_depth(self._curr_state))]
            for n in self._problem.get_goal_nodes():
                ret.append((n.x, n.y, self._get_depth(n)))
        else:
            ret = [self._curr_state.x, self._curr_state.y, self._get_depth(self._curr_state)]
        return tuple(ret)

    def _get_action_space(self):
        '''
        method to get the action space - all available actions in the environment
        @return: spaces
        '''
        if self._gym_compatible:
            return spaces.Discrete(4)
        else:
            return spaces.Tuple(spaces.Tuple((spaces.Discrete(self._xsize), spaces.Discrete(self._ysize))))

    def __get_reward_curr_state(self):
        return self._problem.__node_rewards[self._curr_state.x, self._curr_state.y]

    def _get_reward(self, curr, last):
        '''
        returns reward and indication of goal state
        @param curr: new state
        @param last: last state
        @return: float, boolean
        '''
        reward = -2
        done = False
        vector = [curr.x - last.x, curr.y - last.y]
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        if curr != last:
            reward = -(abs(vector[0]) + abs(vector[1]) + z_axis)
        if self._problem.is_goal_state(curr):
            reward = 100.0
            done = True
            if self._gym_compatible:
                self._player.set_path(self._path)
                self._player.find_path()
        return reward, done

    def _get_depth(self, state):
        '''
        Get depth (z coordinate) of a state based on the gradient. The start state of the map has depth 0.
        @param state: namedtuple state
        @return: float
        '''
        start = self._problem.get_start_state()
        vector = [state.x - start.x, state.y - start.y]
        ret = self._grad[0] * vector[0] + self._grad[1] * vector[1]
        return float(format(ret, '.3f'))


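# Worked example (illustrative, not part of the original module): with grad = (0.1, 0.0)
# and the start state at (0, 0), a state at (3, 0) has depth
#     0.1 * (3 - 0) + 0.0 * (0 - 0) = 0.3
# so moving along the positive x axis counts as going "uphill" and is penalised by the
# reward and cost functions above.
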
class EnvAgent(kuimaze.BaseAgent):
    '''
    Class necessary for wrapping the maze
    '''
    __path = []

    def set_path(self, path):
        self.__path = path

    def find_path(self):
        '''
        visualise the path of the agent; the path must be set before visualising!
        @return: the path that was set
        '''
        ret = []
        for i in range(len(self.__path) - 1):
            ret.append(path_section(self.__path[i], self.__path[i + 1], 1, None))
        try:
            self.problem.show_path(ret)
        except Exception:
            pass  # do nothing if no graphics is prepared
        return self.__path


class EasyMazeEnv(MazeEnv):
    '''
    EasyMazeEnv is a version of the maze closer to graph search. The agent can be moved from any state to
    an already visited state or to a neighbour of the current state. EasyMaze has all methods of HardMaze.
    Unlike HardMaze, EasyMaze has an additional method set_path - which can set a different path than the agent's movement.
    '''

    def __init__(self, informed, map_image_dir=None, grad=(0, 0)):
        super(EasyMazeEnv, self).__init__(informed, False, True, map_image_dir, grad)
        self._gui_on = False

    def step(self, action):
        last_state = self._curr_state
        assert (type(action) == list or type(action) == tuple) and len(action) == 2
        self._curr_state = self._easy_result(action)
        if self._curr_state not in self._visited:
            self._visited.append(self._curr_state)
        reward, done = self._get_reward(self._curr_state, last_state)
        return self._get_observation(), reward, done, None

    # def render(self, mode='human', close=False):
    #     super(EasyMazeEnv, self).render(mode, close)
    #     self._gui_on = True

    def set_path(self, path):
        '''
        This method sets the environment to visualise your found path. The render method must be called afterwards.
        @param path: list of lists in format: [[x1, y1], [x2, y2], ... ]
        @return: None
        '''
        ret = []
        self._path = path
        if self._gui_on:
            assert (type(path[0]) == list or type(path[0]) == tuple) and (len(path[0]) == 2 or len(path[0]) == 3)
            previous_state = None
            for state_list in path:
                if previous_state is not None:
                    if abs(state_list[0] - previous_state[0]) > 1 or abs(state_list[1] - previous_state[1]) > 1:
                        raise AssertionError('The path is not continuous - the distance between neighbouring path segments should be at most 2 ** (1/2)')
                ret.append(state(state_list[0], state_list[1]))
                previous_state = copy.copy(state_list)

            self._player.set_path(ret)
            self._player.find_path()

    def _is_available(self, new_state):
        '''
        returns True if the new state is available
        @param new_state:
        @return: boolean
        '''
        tmp = []
        tmp.extend(self._visited)
        tmp.extend([self._problem.result(self._curr_state, 0), self._problem.result(self._curr_state, 1),
                    self._problem.result(self._curr_state, 2), self._problem.result(self._curr_state, 3),
                    self._problem.result(self._curr_state, 4), self._problem.result(self._curr_state, 5),
                    self._problem.result(self._curr_state, 6), self._problem.result(self._curr_state, 7)])
        return new_state in tmp

    def _easy_result(self, state_list):
        '''
        Gives the result of the desired action in the parameter
        @param state_list: list or tuple of coordinates [x, y]
        @return: state - new position of the agent
        '''
        new_state = state(state_list[0], state_list[1])
        if self._is_available(new_state):
            return new_state
        else:
            # print('UNAVAILABLE ' + str(new_state) + ' from ' + str(self._curr_state))
            return self._curr_state

    def _get_cost(self, curr, last):
        '''
        returns the cost of movement from last to curr
        @param curr: new state
        @param last: last state
        @return: float
        '''
        reward = 0
        vector = [curr.x - last.x, curr.y - last.y]
        # TODO rewrite cost function
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        addition_cost = 0
        if curr in self._problem.hard_places:
            addition_cost = 5

        if curr != last:
            reward = pow(pow(vector[0], 2) + pow(vector[1], 2), 1 / 2) + z_axis + addition_cost
        return reward

    def expand(self, position):
        '''
        returns a tuple of positions with associated costs that can be visited from "position"
        @param position: position in the maze defined by coordinates (x, y)

        @return: tuple of coordinates [x, y] with the "cost" of movement to these positions: [[[x1, y1], cost1], [[x2, y2], cost2], ... ]
        '''
        expanded_nodes = []
        maze_pose = state(position[0], position[1])
        if EIGHT_DIRS:
            tmp = [self._problem.result(maze_pose, 0), self._problem.result(maze_pose, 1),
                   self._problem.result(maze_pose, 2), self._problem.result(maze_pose, 3),
                   self._problem.result(maze_pose, 4), self._problem.result(maze_pose, 5),
                   self._problem.result(maze_pose, 6), self._problem.result(maze_pose, 7)]
        else:
            tmp = [self._problem.result(maze_pose, 0), self._problem.result(maze_pose, 1),
                   self._problem.result(maze_pose, 2), self._problem.result(maze_pose, 3)]
        for new_state in tmp:
            if new_state.x == maze_pose.x and new_state.y == maze_pose.y:
                continue
            if new_state not in self._visited:
                self._visited.append(new_state)
            reward = self._get_cost(maze_pose, new_state)
            expanded_nodes.append([(new_state.x, new_state.y), reward])
        return expanded_nodes
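

# Example (illustrative sketch, not part of the original module): breadth-first search
# over the easy maze using expand(). It uses InfEasyMaze (defined below) so the goal
# coordinates can be read from the informed observation, and assumes a goal is reachable.
#
#   env = InfEasyMaze()                         # randomly generated map
#   obs = env.reset()
#   start = obs[0][0:2]                         # (x, y) of the start state
#   goals = [g[0:2] for g in obs[1:]]           # (x, y) of every goal state
#
#   frontier = collections.deque([start])
#   parent = {start: None}
#   node = start
#   while frontier:
#       node = frontier.popleft()
#       if node in goals:
#           break
#       for nxt, cost in env.expand(node):      # [(x, y), cost] pairs
#           if nxt not in parent:
#               parent[nxt] = node
#               frontier.append(nxt)
#
#   path, n = [], node                          # backtrack from the goal to the start
#   while n is not None:
#       path.append(list(n))
#       n = parent[n]
#   path.reverse()
#   env.set_path(path)
#   env.render()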


'''
Final set of classes to use. As defined in OpenAI gym, none of them needs any params in the constructor.
The main method of the wrapper is step, which returns the observation, the reward and the done flag
(plus an info placeholder that is always None):

Observations:
    For informed search the observation is in the format: ((current position coords), (finish_1 coords), (finish_2 coords), ...)
    For uninformed search it is only (current position coords)

Rewards:
    When the agent moves to a different place it gets reward -1 - depth change.
    When the agent reaches a finish it gets reward +100.
    If an unavailable action is called, the agent stays in the same position and the reward is 0.

Done:
    True when the agent reaches the finish.

The input (parameter) of the step method is defined by the action space:
    Easy maze action space is a list [x_coordinate, y_coordinate].
    Hard maze action space is an integer from 0 to 3.
'''

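# Example (illustrative sketch, not part of the original module): one episode with a
# random agent in the uninformed HardMaze defined below; the observation is (x, y, depth)
# and the action is an integer 0..3, as described above.
#
#   import random
#
#   env = HardMaze()                            # randomly generated map
#   obs = env.reset()
#   done = False
#   total_reward = 0.0
#   while not done:
#       action = random.randint(0, 3)           # purely random policy, for illustration only
#       obs, reward, done, _ = env.step(action)
#       total_reward += reward
#       env.render()
#   env.save_path()
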
class InfEasyMaze(EasyMazeEnv):
    '''
    informed easy maze, suitable for A* implementation
    step([x, y])
    '''
    def __init__(self, map_image=None, grad=(0, 0)):
        super(InfEasyMaze, self).__init__(True, map_image, grad)


class EasyMaze(EasyMazeEnv):
    '''
    uninformed easy maze, suitable for BFS, DFS ...
    step([x, y])
    '''
    def __init__(self, map_image=None, grad=(0, 0)):
        super(EasyMaze, self).__init__(False, map_image, grad)


class MDPMaze(MazeEnv):
    '''
    maze for solving MDP problems
    '''
    def __init__(self, map_image=None, grad=(0, 0), probs=None, node_rewards=None):
        if probs is not None:
            super().__init__(False, True, False, map_image, grad, node_rewards=node_rewards)
            self._problem.set_probs_table(probs[0], probs[1], probs[2], probs[3])  # set probabilities here
        else:
            super().__init__(False, True, True, map_image, grad)

    def _get_reward(self, curr, last):
        '''
        returns reward and indication of goal state
        @param curr: new state
        @param last: last state
        @return: float, boolean
        '''
        reward = self._problem.get_state_reward(curr)
        done = False
        vector = [curr.x - last.x, curr.y - last.y]
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        if curr != last:
            reward = z_axis + self._problem.get_state_reward(last)
        if self._problem.is_goal_state(curr):
            reward = self._problem.get_state_reward(curr)
            done = True
            if self._gym_compatible:
                self._player.set_path(self._path)
                self._player.find_path()
        return reward, done

    def get_actions(self, state):
        return self._problem.get_actions(state)

    def is_goal_state(self, state):
        return self._problem.is_goal_state(state)

    def is_terminal_state(self, state):
        return self._problem.is_goal_state(state) or self._problem.is_danger_state(state)

    def get_next_states_and_probs(self, state, action):
        return self._problem.get_next_states_and_probs(state, action)

    def get_state_reward(self, curr):
        return self._problem.get_state_reward(curr)


class HardMaze(MazeEnv):
    '''
    Uninformed hard maze, suitable for reinforcement learning
    step(param) where param is an integer; 0 <= param <= 3
    '''
    def __init__(self, map_image=None, grad=(0, 0), probs=None, node_rewards=None):
        if probs is not None:
            super(HardMaze, self).__init__(False, True, False, map_image, grad, node_rewards=node_rewards)
            self._problem.set_probs(probs[0], probs[1], probs[2], probs[3])  # set probabilities here
        else:
            super(HardMaze, self).__init__(False, True, True, map_image, grad)

    def _get_reward(self, curr, last):
        '''
        returns reward and indication of goal state
        @param curr: new state
        @param last: last state
        @return: float, boolean
        '''
        reward = self._problem.get_state_reward(last)  # energy consumption
        done = False
        vector = [curr.x - last.x, curr.y - last.y]
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        if curr != last:
            reward = reward - z_axis  # more or less depending on the gradient; going up costs more
        if self._problem.is_goal_state(curr):
            reward = reward + self._problem.get_state_reward(curr)
            done = True
            if self._gym_compatible:
                self._player.set_path(self._path)
                self._player.find_path()
        return reward, done

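# Example (illustrative sketch, not part of the original module): tabular Q-learning on
# HardMaze; the (x, y) part of the observation is used as the state key and the learning
# constants are arbitrary.
#
#   import random
#
#   env = HardMaze()
#   q = {}                                       # Q-table: (state, action) -> value
#   alpha, gamma, epsilon = 0.1, 0.95, 0.2
#   for episode in range(200):
#       obs = env.reset()
#       s = obs[0:2]
#       done = False
#       while not done:
#           if random.random() < epsilon:
#               a = random.randint(0, 3)         # explore
#           else:
#               a = max(range(4), key=lambda a_: q.get((s, a_), 0.0))  # exploit
#           obs, reward, done, _ = env.step(a)
#           s_next = obs[0:2]
#           best_next = max(q.get((s_next, a_), 0.0) for a_ in range(4))
#           q[(s, a)] = q.get((s, a), 0.0) + alpha * (reward + gamma * best_next - q.get((s, a), 0.0))
#           s = s_next
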
class InfHardMaze(MazeEnv):
    '''
    Informed hard maze, suitable for reinforcement learning
    step(param) where param is an integer; 0 <= param <= 3
    '''
    def __init__(self, map_image=None, grad=(0, 0), probs=None):
        if probs is not None:
            super(InfHardMaze, self).__init__(True, True, False, map_image, grad)
            self._problem.set_probs(probs[0], probs[1], probs[2], probs[3])  # set probabilities here
        else:
            super(InfHardMaze, self).__init__(True, True, True, map_image, grad)