'''
File wrapping maze.py functionality into the environment interface defined by OpenAI Gym.
@author: Zdeněk Rozsypálek, Tomas Svoboda
@contact: rozsyzde(at)fel.cvut.cz, svobodat@fel.cvut.cz
@copyright: (c) 2017, 2018
'''

import collections
import os
import sys
import numpy as np
import gym
import copy
from gym import spaces
from gym.utils import seeding

import kuimaze
from .map_generator import maze as mapgen_maze

path_section = collections.namedtuple('Path', ['state_from', 'state_to', 'cost', 'action'])
state = collections.namedtuple('State', ['x', 'y'])


class MazeEnv(gym.Env):
    # NOTE: the original class header is missing from this copy; the name MazeEnv is assumed.
    metadata = {'render.modes': ['human']}
    _path = []
    _visited = []
    MAP = '../maps/easy/easy3.bmp'

    def __init__(self, informed, gym_compatible, deter, map_image_dir=None, grad=(0, 0), node_rewards=None):
        '''
        Class wrapping Maze into a gym environment.
        @param informed: boolean - informed search gets the goal coordinates in the observation
        @param gym_compatible: boolean - True = HardMaze, False = EasyMaze
        @param deter: boolean - True = deterministic maze, False = probabilistic maze
        @param map_image_dir: string - path to the map image
        @param grad: tuple - vector tuning the tilt of the maze
        '''
        if map_image_dir is None:
            # If no map is given, generate one with the following setup.
            x_size = 6
            y_size = 6
            complexity = 0.1
            density = 0.25
            self.MAP = mapgen_maze(x_size, y_size, complexity, density)
        else:
            self.MAP = map_image_dir
        if grad is None:
            self._grad = (0, 0)
        else:
            self._grad = grad
        self._problem = kuimaze.Maze(self.MAP, self._grad, node_rewards=node_rewards)
        self._player = EnvAgent(self._problem)
        self._curr_state = self._problem.get_start_state()
        self._informed = informed
        self._gym_compatible = gym_compatible
        self._deter = deter
        self._gui_disabled = True
        self._set = False

        self._xsize = self._problem.get_dimensions()[0]
        self._ysize = self._problem.get_dimensions()[1]
        self.action_space = self._get_action_space()
        self.observation_space = spaces.Tuple((spaces.Discrete(self._xsize), spaces.Discrete(self._ysize)))
        self.seed()
        self.reset()

    def step(self, action):
        assert self._set, "reset() must be called first!"
        last_state = self._curr_state
        assert 0 <= action <= 3
        if not self._deter:
            action = self._problem.non_det_result(action)
        self._curr_state = self._problem.result(self._curr_state, action)
        self._path.append(self._curr_state)
        if self._curr_state not in self._visited:
            self._visited.append(self._curr_state)
        reward, done = self._get_reward(self._curr_state, last_state)

        return self._get_observation(), reward, done, None

    def get_all_states(self):
        '''
        Auxiliary function for MDPs - where the states (the map) are supposed to be known.
        @return: list of states
        '''
        return self._problem.get_all_states()

    def render(self, mode='human', close=False, visited=None, explored=None):
        assert self._set, "reset() must be called first!"
        self._gui_disabled = False
        if visited is None:
            self._problem.set_visited(self._visited)
        else:
            self._problem.set_visited(visited)
        if explored is None:
            self._problem.set_explored([self._curr_state])
        else:
            self._problem.set_explored(explored)

        self._problem.show_and_break()
        self._gui_on = True

    def close(self):
        self._gui_disabled = True
        self._problem.close_gui()

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def save_path(self):
        '''
        Save the path of the agent into a file named 'saved_path.txt' in the directory the script
        was run from.
        @return: None
        '''
        assert len(self._path) > 0, "Path length must be greater than 0; for the easy environment call set_path first"

        pathfname = os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), "saved_path.txt")
        with open(pathfname, 'wt') as f:
            if (type(self._path[0]) == tuple or type(self._path[0]) == list) and not self._gym_compatible:
                for pos in self._path:
                    f.write("x:{}, y:{}, z:{}\n".format(pos[0], pos[1], self._get_depth(state(pos[0], pos[1]))))
            if self._gym_compatible:
                for pos in self._path:
                    f.write("x:{}, y:{}, z:{}\n".format(pos.x, pos.y, self._get_depth(pos)))

    def save_eps(self):
        '''
        Save the last rendered image into the directory the script was run from.
        @return: None
        '''
        assert not self._gui_disabled, "render() must be called before save_eps()"
        self._problem.save_as_eps(self._gui_disabled)

    def visualise(self, dictionary=None):
        '''
        Visualise the input. If visualise is called before the GUI is opened, render() is called first.
        @param dictionary: input to visualise; can be None -> visualise depth, or a dictionary:
                           {'x': x_coord, 'y': y_coord, 'value': value_to_visualise} where value can be a scalar
                           or a 4-dimensional vector (tuple or list).
        @return: None
        '''
        assert self._set, "reset() must be called before any visualisation setting!"
        if self._gui_disabled:
            self.render()
        self._problem.visualise(dictionary)

    def _get_observation(self):
        '''
        Method generating the observation - current state and finish states.
        @return: tuple
        '''
        if self._informed:
            ret = [(self._curr_state.x, self._curr_state.y, self._get_depth(self._curr_state))]
            for n in self._problem.get_goal_nodes():
                ret.append((n.x, n.y, self._get_depth(n)))
        else:
            ret = [self._curr_state.x, self._curr_state.y, self._get_depth(self._curr_state)]
        return tuple(ret)

    def _get_action_space(self):
        '''
        Method returning the action space - all actions available in the environment.
        @return: spaces
        '''
        if self._gym_compatible:
            return spaces.Discrete(4)
        else:
            return spaces.Tuple(spaces.Tuple((spaces.Discrete(self._xsize), spaces.Discrete(self._ysize))))

    def _get_current_node_reward(self):
        # NOTE: the original method header is missing here; the name used above is assumed.
        return self._problem.__node_rewards[self._curr_state.x, self._curr_state.y]

    def _get_reward(self, curr, last):
        '''
        Returns the reward and an indication of reaching the goal state.
        @param curr: new state
        @param last: last state
        @return: float, boolean
        '''
        reward = -2
        done = False
        vector = [curr.x - last.x, curr.y - last.y]
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        if curr != last:
            reward = -(abs(vector[0]) + abs(vector[1]) + z_axis)
        if self._problem.is_goal_state(curr):
            reward = 100.0
            done = True
            if self._gym_compatible:
                self._player.set_path(self._path)
                self._player.find_path()
        return reward, done

    def _get_depth(self, state):
        '''
        Get the depth (z coordinate) of a state based on the gradient. The start state of the map has depth 0.
        @param state: namedtuple state
        @return: float
        '''
        start = self._problem.get_start_state()
        vector = [state.x - start.x, state.y - start.y]
        ret = self._grad[0] * vector[0] + self._grad[1] * vector[1]
        return float(format(ret, '.3f'))


class EnvAgent(kuimaze.BaseAgent):
    # NOTE: the original class header is missing here; kuimaze.BaseAgent is assumed as the base class.
    '''
    Class necessary for wrapping maze.
    '''
    __path = []

    def set_path(self, path):
        self.__path = path

    def find_path(self):
        '''
        Visualise the path of the agent; the path must be set before visualising!
        @return: the stored path
        '''
        ret = []
        for i in range(len(self.__path) - 1):
            ret.append(path_section(self.__path[i], self.__path[i + 1], 1, None))
        try:
            self.problem.show_path(ret)
        except:
            pass
        return self.__path


class EasyMazeEnv(MazeEnv):
    '''
    EasyMazeEnv is a version of the maze closer to graph search. It is possible to move the agent from any state
    to an already visited state or to a neighbour of the current state. EasyMaze has all methods of HardMaze.
    Unlike the HardMaze, EasyMaze has an additional method set_path - which can set a different path than the agent movement.
    '''

    def __init__(self, informed, map_image_dir=None, grad=(0, 0)):
        super(EasyMazeEnv, self).__init__(informed, False, True, map_image_dir, grad)
        self._gui_on = False

    def step(self, action):
        last_state = self._curr_state
        assert (type(action) == list or type(action) == tuple) and len(action) == 2
        self._curr_state = self._easy_result(action)
        if self._curr_state not in self._visited:
            self._visited.append(self._curr_state)
        reward, done = self._get_reward(self._curr_state, last_state)
        return self._get_observation(), reward, done, None

    def set_path(self, path):
        '''
        This method sets the environment to visualise your found path. The render method must be called afterwards.
        @param path: list of lists in format: [[x1, y1], [x2, y2], ... ]
        @return: None
        '''
        ret = []
        self._path = path
        if self._gui_on:
            assert (type(path[0]) == list or type(path[0]) == tuple) and (len(path[0]) == 2 or len(path[0]) == 3)
            previous_state = None
            for state_list in path:
                if previous_state is not None:
                    if abs(state_list[0] - previous_state[0]) + abs(state_list[1] - previous_state[1]) != 1:
                        raise AssertionError('The path is not continuous - the distance between neighbouring path segments should be 1')
                ret.append(state(state_list[0], state_list[1]))
                previous_state = copy.copy(state_list)

            self._player.set_path(ret)
            self._player.find_path()
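
    # A small illustration of the intended workflow (coordinates and class names below are
    # illustrative only; the classes are defined at the end of this module):
    #
    #   env = EasyMaze()
    #   ...run your search, e.g. over env.expand(...)...
    #   env.set_path([[0, 0], [1, 0], [2, 0]])   # consecutive cells must differ by exactly one step
    #   env.render()
    #   env.save_path()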

    def _is_available(self, new_state):
        '''
        Returns True if the new state is available from the current state.
        @param new_state: state to test
        @return: boolean
        '''
        tmp = []
        tmp.extend(self._visited)
        tmp.extend([self._problem.result(self._curr_state, 0), self._problem.result(self._curr_state, 1),
                    self._problem.result(self._curr_state, 2), self._problem.result(self._curr_state, 3)])
        return new_state in tmp

    def _easy_result(self, state_list):
        '''
        Gives the result of the desired action passed as parameter.
        @param state_list: list or tuple of coordinates [x, y]
        @return: state - new position of the agent
        '''
        new_state = state(state_list[0], state_list[1])
        if self._is_available(new_state):
            return new_state
        else:
            return self._curr_state

    def _get_cost(self, last, curr):
        '''
        Returns the cost of the movement from last to curr.
        @param last: state the agent moves from
        @param curr: state the agent moves to
        @return: float
        '''
        reward = 0
        vector = [curr.x - last.x, curr.y - last.y]

        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        addition_cost = 0
        if curr in self._problem.hard_places:
            addition_cost = 5

        if curr != last:
            reward = abs(vector[0]) + abs(vector[1]) + z_axis + addition_cost
        return reward

    def expand(self, position):
        '''
        Returns the positions that can be visited from "position", together with the associated costs.
        @param position: position in the maze defined by coordinates (x, y)
        @return: list of coordinates [x, y] with the "cost" of moving to them: [[[x1, y1], cost1], [[x2, y2], cost2], ... ]
        '''
        expanded_nodes = []
        maze_pose = state(position[0], position[1])
        tmp = [self._problem.result(maze_pose, 0), self._problem.result(maze_pose, 1),
               self._problem.result(maze_pose, 2), self._problem.result(maze_pose, 3)]
        for new_state in tmp:
            if new_state.x == maze_pose.x and new_state.y == maze_pose.y:
                continue
            if new_state not in self._visited:
                self._visited.append(new_state)
            reward = self._get_cost(maze_pose, new_state)
            expanded_nodes.append([(new_state.x, new_state.y), reward])
        return expanded_nodes
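
    # For illustration: expanding position (1, 1) in an open area with grad == (0, 0)
    # and no hard places could return
    #   [[(0, 1), 1], [(2, 1), 1], [(1, 0), 1], [(1, 2), 1]]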


'''
Final set of classes to use. As defined in OpenAI gym, none of them needs any parameters in the constructor.
The main method of the wrapper is the function step, which returns a tuple (observation, reward, done, info);
info is unused and always None here.

Observations:
    For informed search the observation is in the format: ((current position coords), (finish_1 coords), (finish_2 coords), ...)
    For uninformed search only (current position coords).

Rewards:
    When the agent moves to a different place, it gets reward -1 minus the change in depth.
    When the agent reaches a finish, it gets reward +100.
    If an unavailable action is called, the agent stays in the same position and the reward is 0.

Done:
    True when the agent reaches the finish.

The input (parameter) of the step method is defined by the action space:
    Easy maze action space is a list [x_coordinate, y_coordinate].
    Hard maze action space is an integer from 0 to 3.
'''
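
# A small sketch of the conventions summarised above (coordinates are illustrative; the class
# names are the ones defined below):
#
#   easy = InfEasyMaze()                    # informed easy maze on a randomly generated map
#   obs, cost, done, _ = easy.step([1, 0])  # action = coordinates [x, y] of the state to move to
#   # obs == ((x, y, z), (goal_x, goal_y, goal_z), ...)
#
#   hard = HardMaze()                       # uninformed hard maze
#   obs, reward, done, _ = hard.step(2)     # action = an integer 0..3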


class InfEasyMaze(EasyMazeEnv):
    '''
    Informed easy maze, suitable for an A* implementation.
    step([x, y])
    '''
    def __init__(self, map_image=None, grad=(0, 0)):
        super(InfEasyMaze, self).__init__(True, map_image, grad)


class EasyMaze(EasyMazeEnv):
    '''
    Uninformed easy maze, suitable for BFS, DFS, ...
    step([x, y])
    '''
    def __init__(self, map_image=None, grad=(0, 0)):
        super(EasyMaze, self).__init__(False, map_image, grad)


class MDPMaze(MazeEnv):
    '''
    Maze for solving MDP problems.
    '''
    def __init__(self, map_image=None, grad=(0, 0), probs=None, node_rewards=None):
        if probs is not None:
            super().__init__(False, True, False, map_image, grad, node_rewards=node_rewards)
            self._problem.set_probs_table(probs[0], probs[1], probs[2], probs[3])
        else:
            super().__init__(False, True, True, map_image, grad)

    def _get_reward(self, curr, last):
        '''
        Returns the reward and an indication of reaching the goal state.
        @param curr: new state
        @param last: last state
        @return: float, boolean
        '''
        reward = self._problem.get_state_reward(curr)
        done = False
        vector = [curr.x - last.x, curr.y - last.y]
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        if curr != last:
            reward = z_axis + self._problem.get_state_reward(last)
        if self._problem.is_goal_state(curr):
            reward = self._problem.get_state_reward(curr)
            done = True
            if self._gym_compatible:
                self._player.set_path(self._path)
                self._player.find_path()
        return reward, done


class HardMaze(MazeEnv):
    '''
    Uninformed hard maze, suitable for reinforcement learning.
    step(param) where param is an integer; 0 <= param <= 3
    '''
    def __init__(self, map_image=None, grad=(0, 0), probs=None, node_rewards=None):
        if probs is not None:
            super(HardMaze, self).__init__(False, True, False, map_image, grad, node_rewards=node_rewards)
            self._problem.set_probs(probs[0], probs[1], probs[2], probs[3])
        else:
            super(HardMaze, self).__init__(False, True, True, map_image, grad)

    def _get_reward(self, curr, last):
        '''
        Returns the reward and an indication of reaching the goal state.
        @param curr: new state
        @param last: last state
        @return: float, boolean
        '''
        reward = self._problem.get_state_reward(last)
        done = False
        vector = [curr.x - last.x, curr.y - last.y]
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        if curr != last:
            reward = reward - z_axis
        if self._problem.is_goal_state(curr):
            reward = reward + self._problem.get_state_reward(curr)
            done = True
            if self._gym_compatible:
                self._player.set_path(self._path)
                self._player.find_path()
        return reward, done


class InfHardMaze(MazeEnv):
    '''
    Informed hard maze, suitable for reinforcement learning.
    step(param) where param is an integer; 0 <= param <= 3
    '''
    def __init__(self, map_image=None, grad=(0, 0), probs=None):
        if probs is not None:
            super(InfHardMaze, self).__init__(True, True, False, map_image, grad)
            self._problem.set_probs(probs[0], probs[1], probs[2], probs[3])
        else:
            super(InfHardMaze, self).__init__(True, True, True, map_image, grad)
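

# A minimal usage sketch (not part of the original module): it runs a short random walk
# in a randomly generated HardMaze and renders it. The __main__ guard keeps importing
# this module side-effect free.
if __name__ == '__main__':
    import random

    env = HardMaze()                  # map_image=None -> a small random map is generated
    env.reset()
    total_reward = 0
    for _ in range(200):              # cap the episode length for the demo
        action = random.randint(0, 3)            # HardMaze actions are integers 0..3
        obs, reward, done, _ = env.step(action)
        total_reward += reward
        env.render()                  # opens / refreshes the GUI window
        if done:
            break
    print('episode finished, total reward:', total_reward)
    env.close()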