'''
File wrapping maze.py functionality into a few methods defined by OpenAI Gym.
@author: Zdeněk Rozsypálek
@contact: rozsyzde(at)fel.cvut.cz
@copyright: (c) 2017
'''

import collections
import copy
import os
import sys

import gym
import numpy as np
from gym import spaces
from gym.utils import seeding

import kuimaze
from .map_generator import maze as mapgen_maze

path_section = collections.namedtuple('Path', ['state_from', 'state_to', 'cost', 'action'])
state = collections.namedtuple('State', ['x', 'y'])

class MazeEnv(gym.Env):
    metadata = {'render.modes': ['human']}
    _path = []
    _visited = []
    MAP = '../maps/easy/easy3.bmp'

    def __init__(self, informed, gym_compatible, deter, map_image_dir=None, grad=(0, 0)):
        '''
        Class wrapping Maze into a gym environment.
        @param informed: boolean
        @param gym_compatible: boolean - True = HardMaze, False = EasyMaze
        @param deter: boolean - True = deterministic maze, False = probabilistic maze
        @param map_image_dir: string - path to the image of the map
        @param grad: tuple - vector tuning the tilt of the maze
        '''
        if map_image_dir is None:
            # If no map is passed in, generate one with the following setup.
            x_size = 6
            y_size = 6
            complexity = 0.1
            density = 0.25
            self.MAP = mapgen_maze(x_size, y_size, complexity, density)
        else:
            self.MAP = map_image_dir
        if grad is None:
            self._grad = (0, 0)
        else:
            self._grad = grad
        self._problem = kuimaze.Maze(self.MAP, self._grad)
        self._player = EnvAgent(self._problem)
        self._curr_state = self._problem.get_start_state()
        self._informed = informed
        self._gym_compatible = gym_compatible
        self._deter = deter
        self._gui_disabled = True
        self._set = False

        self._xsize = self._problem.get_dimensions()[0]
        self._ysize = self._problem.get_dimensions()[1]
        self.action_space = self._get_action_space()
        self.observation_space = spaces.Tuple((spaces.Discrete(self._xsize), spaces.Discrete(self._ysize)))
        self.seed()
        self.reset()

    def step(self, action):
        assert self._set, "reset() must be called first!"
        last_state = self._curr_state
        assert 0 <= action <= 3
        if not self._deter:
            action = self._problem.non_det_result(action)
        self._curr_state = self._problem.result(self._curr_state, action)
        self._path.append(self._curr_state)
        if self._curr_state not in self._visited:
            self._visited.append(self._curr_state)
        reward, done = self._get_reward(self._curr_state, last_state)
        return self._get_observation(), reward, done, None

    def reset(self):
        '''
        Reset the environment to the start state; clears the recorded path and visited states.
        @return: observation of the start state
        '''
        self._set = True
        self._curr_state = self._problem.get_start_state()
        self._path = [self._curr_state]
        self._visited = [self._curr_state]
        return self._get_observation()

    def render(self, mode='human', close=False):
        '''
        Render the current state of the maze in the GUI window.
        '''
        self._gui_disabled = False
        self._problem.show_and_break()

    def close(self):
        self._gui_disabled = True
        self._problem.close_gui()

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def save_path(self):
        '''
        Save the path of the agent into a file named 'saved_path.txt' in the directory
        the script was run from.
        @return: None
        '''
        assert len(self._path) > 0, "Path length must be greater than 0; for the easy environment call set_path first"

        pathfname = os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), "saved_path.txt")
        with open(pathfname, 'wt') as f:
            if isinstance(self._path[0], (tuple, list)) and not self._gym_compatible:
                for pos in self._path:
                    f.write("x:{}, y:{}, z:{}\n".format(pos[0], pos[1], self._get_depth(state(pos[0], pos[1]))))
            if self._gym_compatible:
                for pos in self._path:
                    f.write("x:{}, y:{}, z:{}\n".format(pos.x, pos.y, self._get_depth(pos)))

    def save_eps(self):
        '''
        Save the last rendered image into the directory the script was run from.
        @return: None
        '''
        assert not self._gui_disabled, "render() must be called before save_eps"
        self._problem.save_as_eps(self._gui_disabled)

    def visualise(self, dictionary=None):
        '''
        Visualise the input. If visualise is called before the GUI is opened, render() is called first.
        @param dictionary: input to visualise; can be None -> visualise depth, or a dictionary
                           {'x': x_coord, 'y': y_coord, 'value': value_to_visualise} where value can be a scalar
                           or a 4-dimensional vector (tuple or list).
        @return: None
        '''
        assert self._set, "reset() must be called before any visualisation setting!"
        if self._gui_disabled:
            self.render()
        self._problem.visualise(dictionary)

    def _get_observation(self):
        '''
        Generate an observation - the current state and, for an informed agent, the finish states.
        @return: tuple
        '''
        if self._informed:
            ret = [(self._curr_state.x, self._curr_state.y, self._get_depth(self._curr_state))]
            for n in self._problem.get_goal_nodes():
                ret.append((n.x, n.y, self._get_depth(n)))
        else:
            ret = [self._curr_state.x, self._curr_state.y, self._get_depth(self._curr_state)]
        return tuple(ret)

    def _get_action_space(self):
        '''
        Get the action space - all actions available in the environment.
        @return: spaces
        '''
        if self._gym_compatible:
            return spaces.Discrete(4)
        else:
            return spaces.Tuple((spaces.Discrete(self._xsize), spaces.Discrete(self._ysize)))

    def _get_reward(self, curr, last):
        '''
        Return the reward and an indication of reaching the goal state.
        @param curr: new state
        @param last: last state
        @return: float, boolean
        '''
        reward = -2
        done = False
        vector = [curr.x - last.x, curr.y - last.y]
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        if curr != last:
            reward = -(abs(vector[0]) + abs(vector[1]) + z_axis)
        if self._problem.is_goal_state(curr):
            reward = 100.0
            done = True
            if self._gym_compatible:
                self._player.set_path(self._path)
                self._player.find_path()
        return reward, done
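    # Worked example (illustrative): with grad=(0.1, 0.0), a move from (2, 2) to
    # (3, 2) gives vector=[1, 0] and z_axis=0.1, so reward = -(1 + 0 + 0.1) = -1.1.
    # A blocked move leaves curr == last, so the default reward -2 is returned.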

    def _get_depth(self, state):
        '''
        Get the depth (z coordinate) of a state based on the gradient. The start state of the map has depth 0.
        @param state: namedtuple state
        @return: float
        '''
        start = self._problem.get_start_state()
        vector = [state.x - start.x, state.y - start.y]
        ret = self._grad[0] * vector[0] + self._grad[1] * vector[1]
        return float(format(ret, '.3f'))
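    # Example: with grad=(0.2, 0.0) and the start at (0, 0), the state (3, 1) has
    # depth 0.2 * 3 + 0.0 * 1 = 0.6.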


class EnvAgent(kuimaze.BaseAgent):
    '''
    Class necessary for wrapping the maze.
    '''
    __path = []

    def set_path(self, path):
        self.__path = path

    def find_path(self):
        '''
        Visualise the path of the agent; the path must be set before visualising!
        @return: the stored path
        '''
        ret = []
        for i in range(len(self.__path) - 1):
            ret.append(path_section(self.__path[i], self.__path[i + 1], 1, None))
        self.problem.show_path(ret)
        return self.__path


class EasyMazeEnv(MazeEnv):
    '''
    EasyMazeEnv is a version of the maze closer to graph search. It is possible to move the agent from any state
    to an already visited state or to a neighbour of the current state. EasyMaze has all the methods of HardMaze.
    Unlike the HardMaze, EasyMaze has the additional method set_path - which can set a different path than the agent's movement.
    '''

    def __init__(self, informed, map_image_dir=None, grad=(0, 0)):
        super(EasyMazeEnv, self).__init__(informed, False, True, map_image_dir, grad)
        self._gui_on = False

    def step(self, action):
        last_state = self._curr_state
        assert isinstance(action, (list, tuple)) and len(action) == 2
        self._curr_state = self._easy_result(action)
        if self._curr_state not in self._visited:
            self._visited.append(self._curr_state)
        reward, done = self._get_reward(self._curr_state, last_state)
        return self._get_observation(), reward, done, None

    def render(self, mode='human', close=False):
        super(EasyMazeEnv, self).render(mode, close)
        self._gui_on = True

    def set_path(self, path):
        '''
        Set the environment to visualise your found path. The render method must be called afterwards.
        @param path: list of lists in the format: [[x1, y1], [x2, y2], ... ]
        @return: None
        '''
        ret = []
        self._path = path
        if self._gui_on:
            assert isinstance(path[0], (list, tuple)) and (len(path[0]) == 2 or len(path[0]) == 3)
            previous_state = None
            for state_list in path:
                if previous_state is not None:
                    if abs(state_list[0] - previous_state[0]) + abs(state_list[1] - previous_state[1]) != 1:
                        raise AssertionError('The path is not continuous - the distance between neighbouring path segments should be 1')
                ret.append(state(state_list[0], state_list[1]))
                previous_state = copy.copy(state_list)

            self._player.set_path(ret)
            self._player.find_path()
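    # Example (illustrative): env.set_path([[0, 0], [1, 0], [1, 1]]) stores a
    # three-cell path for drawing; a discontinuous path such as [[0, 0], [2, 0]]
    # raises an AssertionError once the GUI is on.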

    def _is_available(self, new_state):
        '''
        Return True if the new state is reachable - i.e. already visited or a neighbour of the current state.
        @param new_state: state to be tested
        @return: boolean
        '''
        tmp = []
        tmp.extend(self._visited)
        tmp.extend([self._problem.result(self._curr_state, 0), self._problem.result(self._curr_state, 1),
                    self._problem.result(self._curr_state, 2), self._problem.result(self._curr_state, 3)])
        return new_state in tmp

    def _easy_result(self, state_list):
        '''
        Give the result of the desired action.
        @param state_list: list or tuple of coordinates [x, y]
        @return: state - new position of the agent
        '''
        new_state = state(state_list[0], state_list[1])
        if self._is_available(new_state):
            return new_state
        else:
            # unavailable state requested - the agent stays in place
            return self._curr_state

    def _get_cost(self, curr, last):
        '''
        Return the cost of movement from last to curr.
        @param curr: new state
        @param last: last state
        @return: float
        '''
        reward = 0
        vector = [curr.x - last.x, curr.y - last.y]
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        if curr != last:
            reward = abs(vector[0]) + abs(vector[1]) + z_axis
        return reward

    def expand(self, position):
        '''
        Return the positions that can be visited from "position", together with the cost of moving there.
        @param position: position in the maze defined by coordinates (x, y)
        @return: list of coordinates [x, y] with the "cost" of movement to these positions: [[(x1, y1), cost1], [(x2, y2), cost2], ... ]
        '''
        expanded_nodes = []
        maze_pose = state(position[0], position[1])
        tmp = [self._problem.result(maze_pose, 0), self._problem.result(maze_pose, 1),
               self._problem.result(maze_pose, 2), self._problem.result(maze_pose, 3)]
        for new_state in tmp:
            if new_state.x == maze_pose.x and new_state.y == maze_pose.y:
                continue
            if new_state not in self._visited:
                self._visited.append(new_state)
            reward = self._get_cost(maze_pose, new_state)
            expanded_nodes.append([(new_state.x, new_state.y), reward])
        return expanded_nodes
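    # Example (illustrative): in an open area with grad=(0, 0), expand((1, 1)) can
    # return [[(2, 1), 1], [(0, 1), 1], [(1, 2), 1], [(1, 0), 1]]; moves blocked by
    # a wall are skipped because result() leaves the position unchanged there.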



'''
Final set of classes to use. As defined in OpenAI Gym, none of them needs any params in the constructor.
The main method of the wrapper is the step function, which returns four values (observation, reward, done, info);
info is unused here and always None.

Observations:
    For informed search the observation has the format: ((current position coords), (finish_1 coords), (finish_2 coords), ...)
    For uninformed search it is only (current position coords).

Rewards:
    When the agent moves to a different place, it gets reward -1 - depth.
    When the agent reaches a finish, it gets reward +100.
    If an unavailable action is called, the agent stays in the same position and gets the default reward -2.

Done:
    True when the agent reaches the finish.

The input (parameter) of the step method is defined by the action space:
    The easy maze action space is a list [x_coordinate, y_coordinate].
    The hard maze action space is an integer from 0 to 3.
'''
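
# Example of the step() interface (illustrative values): in an informed maze with
# a single finish at (4, 3), grad=(0, 0) and the agent arriving at (1, 1), step()
# could return:
#   observation = ((1, 1, 0.0), (4, 3, 0.0))
#   reward = -1.0, done = False, info = None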


class InfEasyMaze(EasyMazeEnv):
    '''
    Informed easy maze, suitable for an A* implementation.
    step([x, y])
    '''
    def __init__(self, map_image=None, grad=(0, 0)):
        super(InfEasyMaze, self).__init__(True, map_image, grad)


class EasyMaze(EasyMazeEnv):
    '''
    Uninformed easy maze, suitable for BFS, DFS, ...
    step([x, y])
    '''
    def __init__(self, map_image=None, grad=(0, 0)):
        super(EasyMaze, self).__init__(False, map_image, grad)


class HardMaze(MazeEnv):
    '''
    Uninformed hard maze, suitable for reinforcement learning.
    step(param) where param is an integer; 0 <= param <= 3
    '''
    def __init__(self, map_image=None, grad=(0, 0), probs=None):
        if probs is not None:
            super(HardMaze, self).__init__(False, True, False, map_image, grad)
            self._problem.set_probs(probs[0], probs[1], probs[2], probs[3])
        else:
            super(HardMaze, self).__init__(False, True, True, map_image, grad)


class InfHardMaze(MazeEnv):
    '''
    Informed hard maze, suitable for reinforcement learning.
    step(param) where param is an integer; 0 <= param <= 3
    '''
    def __init__(self, map_image=None, grad=(0, 0), probs=None):
        if probs is not None:
            super(InfHardMaze, self).__init__(True, True, False, map_image, grad)
            self._problem.set_probs(probs[0], probs[1], probs[2], probs[3])
        else:
            super(InfHardMaze, self).__init__(True, True, True, map_image, grad)
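

if __name__ == '__main__':
    # Minimal usage sketch (illustration only, not part of the original wrapper):
    # drive an uninformed, deterministic HardMaze with random actions until a goal
    # is reached; the map is generated automatically because map_image is None.
    env = HardMaze()
    observation = env.reset()
    done = False
    while not done:
        observation, reward, done, _ = env.step(env.action_space.sample())
    env.save_path()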