Package kuimaze :: Module gym_wrapper
[hide private]
[frames] | [no frames]

Source Code for Module kuimaze.gym_wrapper

  1  # -*- coding: utf-8 -*- 
  2   
  3  ''' 
  4  File wrapping maze.py functionality into a few methods defined by OpenAI GYM 
  5  @author: Zdeněk Rozsypálek 
  6  @contact: rozsyzde(at)fel.cvut.cz 
  7  @copyright: (c) 2017 
  8  ''' 
  9   
 10  import collections 
 11  import os 
 12  import sys 
 13  import numpy as np 
 14  import gym 
 15  import copy 
 16  from gym import spaces 
 17  from gym.utils import seeding 
 18   
 19  import kuimaze 
 20  from .map_generator import maze as mapgen_maze 
 21   
 22  path_section = collections.namedtuple('Path', ['state_from', 'state_to', 'cost', 'action']) 
 23  state = collections.namedtuple('State', ['x', 'y']) 
 24   
class MazeEnv(gym.Env):
    '''
    OpenAI gym environment wrapping kuimaze.Maze.

    Behaviour is selected by the constructor flags: informed/uninformed
    observations, easy/hard action space and deterministic/probabilistic
    transitions.
    '''
    metadata = {'render.modes': ['human']}
    # Class-level defaults only; reset() rebinds fresh per-instance lists,
    # so instances do not share mutable state once reset() has run.
    _path = []
    _visited = []
    MAP = '../maps/easy/easy3.bmp'

    def __init__(self, informed, gym_compatible, deter, map_image_dir=None, grad=(0, 0)):
        '''
        Class wrapping Maze into a gym environment.
        @param informed: boolean - observations also contain goal positions
        @param gym_compatible: boolean - T = HardMaze, F = EasyMaze
        @param deter: boolean - T = deterministic maze, F = probabilistic maze
        @param map_image_dir: string - path to image of map
        @param grad: tuple - vector tuning the tilt of the maze
        '''
        if map_image_dir is None:
            # No map supplied - generate one with the following setup.
            x_size = 6
            y_size = 6          # not 100% accurate size, could have smaller dimensions
            complexity = 0.1    # in interval (0, 1]
            density = 0.25      # in interval [0, 1]
            self.MAP = mapgen_maze(x_size, y_size, complexity, density)
        else:
            self.MAP = map_image_dir
        self._grad = (0, 0) if grad is None else grad
        self._problem = kuimaze.Maze(self.MAP, self._grad)
        self._player = EnvAgent(self._problem)
        self._curr_state = self._problem.get_start_state()
        self._informed = informed
        self._gym_compatible = gym_compatible
        self._deter = deter
        self._gui_disabled = True
        self._set = False
        # set action and observation space
        self._xsize = self._problem.get_dimensions()[0]
        self._ysize = self._problem.get_dimensions()[1]
        self.action_space = self._get_action_space()
        self.observation_space = spaces.Tuple((spaces.Discrete(self._xsize), spaces.Discrete(self._ysize)))
        self.seed()
        self.reset()

    def step(self, action):
        '''
        Apply one action and advance the environment.
        @param action: integer in [0, 3]
        @return: (observation, reward, done, info) as defined by the gym API
        '''
        assert self._set, "reset() must be called first!"
        last_state = self._curr_state
        assert 0 <= action <= 3
        if not self._deter:
            # Probabilistic maze: the executed action may differ from the requested one.
            action = self._problem.non_det_result(action)
        self._curr_state = self._problem.result(self._curr_state, action)
        self._path.append(self._curr_state)
        if self._curr_state not in self._visited:
            self._visited.append(self._curr_state)
        reward, done = self._get_reward(self._curr_state, last_state)
        # Fix: gym defines the fourth return value (info) as a dict, not None.
        return self._get_observation(), reward, done, {}

    def reset(self):
        '''
        Reset the environment to the start state.
        @return: initial observation
        '''
        self._set = True
        self._gui_disabled = True
        self._path = []
        self._visited = []
        self._problem.clear_player_data()
        self._problem.set_player(self._player)
        if self._gym_compatible:
            # hard maze tracks the travelled path as State namedtuples
            self._path.append(self._problem.get_start_state())
        # the start state counts as visited for both maze variants
        self._visited.append(self._problem.get_start_state())
        self._curr_state = self._problem.get_start_state()
        return self._get_observation()

    def render(self, mode='human', close=False):
        '''
        Open/refresh the GUI and show the current state of the maze.
        '''
        assert self._set, "reset() must be called first!"
        self._gui_disabled = False
        self._problem.set_visited(self._visited)
        self._problem.set_explored([self._curr_state])
        self._problem.show_and_break()

    def close(self):
        '''
        Close the GUI window.
        '''
        self._gui_disabled = True
        self._problem.close_gui()

    def seed(self, seed=None):
        '''
        Seed the environment's random number generator.
        @return: list containing the seed actually used
        '''
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def save_path(self):
        '''
        Save the path of the agent into the file named 'saved_path.txt' in the
        directory the script was run from.
        @return: None
        '''
        assert len(self._path) > 0, "Path length must be greater than 0, for easy enviroment call set_path first"
        # at the moment it assumes the output directory exists
        pathfname = os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), "saved_path.txt")
        with open(pathfname, 'wt') as f:
            # easy maze: path entries are [x, y] coordinate pairs
            if isinstance(self._path[0], (tuple, list)) and not self._gym_compatible:
                for pos in self._path:
                    f.write("x:{}, y:{}, z:{}\n".format(pos[0], pos[1], self._get_depth(state(pos[0], pos[1]))))
            # hard maze: path entries are State namedtuples
            if self._gym_compatible:
                for pos in self._path:
                    f.write("x:{}, y:{}, z:{}\n".format(pos.x, pos.y, self._get_depth(pos)))

    def save_eps(self):
        '''
        Save the last rendered image into the directory the script was run from.
        @return: None
        '''
        assert not self._gui_disabled, "render() must be called before save_eps"
        self._problem.save_as_eps(self._gui_disabled)

    def visualise(self, dictionary=None):
        '''
        Visualise input. If visualise is called before the GUI was opened, render() is called first.
        @param dictionary: input to visualise, can be None -> visualise depth, or dictionary:
                           {'x': x_coord, 'y': y_coord, 'value': value_to_visualise} where value
                           can be scalar or 4 dimensional vector (tuple or list).
        @return: None
        '''
        assert self._set, "reset() must be called before any visualisation setting!"
        if self._gui_disabled:
            self.render()
        self._problem.visualise(dictionary)

    def _get_observation(self):
        '''
        Generate the observation - current state, plus finish states when informed.
        @return: tuple
        '''
        if self._informed:
            ret = [(self._curr_state.x, self._curr_state.y, self._get_depth(self._curr_state))]
            for n in self._problem.get_goal_nodes():
                ret.append((n.x, n.y, self._get_depth(n)))
        else:
            ret = [self._curr_state.x, self._curr_state.y, self._get_depth(self._curr_state)]
        return tuple(ret)

    def _get_action_space(self):
        '''
        Get the action space - all available actions in the environment.
        @return: spaces
        '''
        if self._gym_compatible:
            # hard maze: four discrete moves
            return spaces.Discrete(4)
        else:
            # easy maze: a coordinate pair
            # NOTE(review): the nested spaces.Tuple(spaces.Tuple(...)) looks redundant - confirm
            return spaces.Tuple(spaces.Tuple((spaces.Discrete(self._xsize), spaces.Discrete(self._ysize))))

    def _get_reward(self, curr, last):
        '''
        Return the reward and an indication of reaching a goal state.
        @param curr: new state
        @param last: last state
        @return: float, boolean
        '''
        reward = -2  # applies when the agent did not move (curr == last)
        done = False
        vector = [curr.x - last.x, curr.y - last.y]
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        if curr != last:
            reward = -(abs(vector[0]) + abs(vector[1]) + z_axis)
        if self._problem.is_goal_state(curr):
            reward = 100.0
            done = True
            if self._gym_compatible:
                self._player.set_path(self._path)
                self._player.find_path()
        return reward, done

    def _get_depth(self, state):
        '''
        Get depth (z coordinate) of a state based on the gradient.
        The start state of the map has depth 0.
        @param state: namedtuple state
        @return: float
        '''
        start = self._problem.get_start_state()
        vector = [state.x - start.x, state.y - start.y]
        ret = self._grad[0] * vector[0] + self._grad[1] * vector[1]
        # round to 3 decimal places via fixed-point formatting
        return float(format(ret, '.3f'))
class EnvAgent(kuimaze.BaseAgent):
    '''
    Helper agent required for wrapping the maze.
    '''
    __path = []

    def set_path(self, path):
        self.__path = path

    def find_path(self):
        '''
        Visualise the path of the agent; the path must be set before visualising!
        @return: the stored path
        '''
        segments = [
            path_section(src, dst, 1, None)
            for src, dst in zip(self.__path, self.__path[1:])
        ]
        self.problem.show_path(segments)
        return self.__path
class EasyMazeEnv(MazeEnv):
    '''
    EasyMazeEnv is a version of the maze closer to graph search. It is possible to move the
    agent from any state to an already visited state or to a neighbour of the current state.
    EasyMaze has all methods of HardMaze. Unlike the HardMaze, EasyMaze has the additional
    method set_path - which can set a different path than the agent movement.
    '''

    def __init__(self, informed, map_image_dir=None, grad=(0, 0)):
        super(EasyMazeEnv, self).__init__(informed, False, True, map_image_dir, grad)
        self._gui_on = False

    def step(self, action):
        '''
        Move the agent to the requested position, if that position is available.
        @param action: list or tuple of coordinates [x, y]
        @return: (observation, reward, done, info) as defined by the gym API
        '''
        last_state = self._curr_state
        assert isinstance(action, (list, tuple)) and len(action) == 2
        self._curr_state = self._easy_result(action)
        if self._curr_state not in self._visited:
            self._visited.append(self._curr_state)
        reward, done = self._get_reward(self._curr_state, last_state)
        # Fix: gym defines the fourth return value (info) as a dict, not None.
        return self._get_observation(), reward, done, {}

    def render(self, mode='human', close=False):
        super(EasyMazeEnv, self).render(mode, close)
        self._gui_on = True

    def set_path(self, path):
        '''
        Set the environment to visualise your found path. The method render must be called afterwards.
        @param path: list of lists in format: [[x1, y1], [x2, y2], ... ]
        @return: None
        '''
        ret = []
        self._path = path
        if self._gui_on:
            assert isinstance(path[0], (list, tuple)) and len(path[0]) in (2, 3)
            previous_state = None
            for state_list in path:
                if previous_state is not None:
                    # neighbouring path segments must be exactly one step apart
                    if abs(state_list[0] - previous_state[0]) + abs(state_list[1] - previous_state[1]) != 1:
                        raise AssertionError('The path is not continuous - distance between neighbouring path segments should be 1')
                ret.append(state(state_list[0], state_list[1]))
                previous_state = copy.copy(state_list)

            self._player.set_path(ret)
            self._player.find_path()

    def _is_available(self, new_state):
        '''
        Return True if the new state is available: already visited, or a
        neighbour of the current state.
        @param new_state: state namedtuple
        @return: boolean
        '''
        tmp = []
        tmp.extend(self._visited)
        tmp.extend([self._problem.result(self._curr_state, 0), self._problem.result(self._curr_state, 1),
                    self._problem.result(self._curr_state, 2), self._problem.result(self._curr_state, 3)])
        return new_state in tmp

    def _easy_result(self, state_list):
        '''
        Give the result of the desired action in the parameter.
        @param state_list: list or tuple of coordinates [x, y]
        @return: state - new position of agent
        '''
        new_state = state(state_list[0], state_list[1])
        if self._is_available(new_state):
            return new_state
        else:
            # unavailable target: the agent stays where it is
            return self._curr_state

    def _get_cost(self, curr, last):
        '''
        Return the cost of movement from last to curr.
        @param curr: new state
        @param last: last state
        @return: float
        '''
        reward = 0
        vector = [curr.x - last.x, curr.y - last.y]
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        if curr != last:
            reward = abs(vector[0]) + abs(vector[1]) + z_axis
        return reward

    def expand(self, position):
        '''
        Return positions with associated costs that can be visited from "position".
        @param position: position in the maze defined by coordinates (x, y)
        @return: list of [(x, y), cost] pairs for movement to the neighbouring positions
        '''
        expanded_nodes = []
        maze_pose = state(position[0], position[1])
        tmp = [self._problem.result(maze_pose, 0), self._problem.result(maze_pose, 1),
               self._problem.result(maze_pose, 2), self._problem.result(maze_pose, 3)]
        for new_state in tmp:
            # result() returns the same position when a move hits a wall - skip those
            if new_state.x == maze_pose.x and new_state.y == maze_pose.y:
                continue
            if new_state not in self._visited:
                self._visited.append(new_state)
            # NOTE(review): arguments look swapped w.r.t. _get_cost(curr, last) - this
            # computes the cost of new_state -> maze_pose; confirm the intended direction
            reward = self._get_cost(maze_pose, new_state)
            expanded_nodes.append([(new_state.x, new_state.y), reward])
        return expanded_nodes


'''
Final set of classes to use. As defined in OpenAI gym, all without any params needed in constructor.
The main method of the wrapper is the function step, which returns three values:

Observations:
For informed search the observation is in format: ((current position coords), (finish_1 coords), (finish_2 coords), ...)
For uninformed only (current position coords)

Rewards:
When the agent moves to a different place it gets reward -1 - depth.
When the agent reaches a finish it gets reward +100.
If an unavailable action is called, the agent stays in the same position and the reward is 0.

Done:
True when the agent reaches the finish.

Input (parameter) of the step method is defined by the action space:
Easy maze action space is a list [x_coordinate, y_coordinate].
Hard maze action space is an integer from 0 to 3.
'''
class InfEasyMaze(EasyMazeEnv):
    '''
    Informed easy maze, suitable for A* implementation.
    step([x, y])
    '''

    def __init__(self, map_image=None, grad=(0, 0)):
        # informed variant of the easy maze
        super().__init__(True, map_image, grad)
364 365
class EasyMaze(EasyMazeEnv):
    '''
    Uninformed easy maze, suitable for BFS, DFS ...
    step([x, y])
    '''

    def __init__(self, map_image=None, grad=(0, 0)):
        # uninformed variant of the easy maze
        super().__init__(False, map_image, grad)
373 374
class HardMaze(MazeEnv):
    '''
    Uninformed hard maze, suitable for reinforcement learning.
    step(param) where param is an integer; 0 <= param <= 3
    '''

    def __init__(self, map_image=None, grad=(0, 0), probs=None):
        # deterministic unless explicit transition probabilities are supplied
        deterministic = probs is None
        super().__init__(False, True, deterministic, map_image, grad)
        if probs is not None:
            self._problem.set_probs(probs[0], probs[1], probs[2], probs[3])  # set probabilities here
386 387
class InfHardMaze(MazeEnv):
    '''
    Informed hard maze, suitable for reinforcement learning.
    step(param) where param is an integer; 0 <= param <= 3
    '''

    def __init__(self, map_image=None, grad=(0, 0), probs=None):
        # deterministic unless explicit transition probabilities are supplied
        deterministic = probs is None
        super().__init__(True, True, deterministic, map_image, grad)
        if probs is not None:
            self._problem.set_probs(probs[0], probs[1], probs[2], probs[3])  # set probabilities here
399