'''
File wrapping maze.py functionality into a few methods defined by OpenAI Gym.
@author: Zdeněk Rozsypálek, Tomas Svoboda
@contact: rozsyzde(at)fel.cvut.cz, svobodat@fel.cvut.cz
@copyright: (c) 2017, 2018
'''

import collections
import os
import sys
import numpy as np
import gym
import copy
from gym import spaces
from gym.utils import seeding

import kuimaze
from .map_generator import maze as mapgen_maze

path_section = collections.namedtuple('Path', ['state_from', 'state_to', 'cost', 'action'])
state = collections.namedtuple('State', ['x', 'y'])
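
# Illustration (a sketch): a single drawn segment of a path is represented as, e.g.,
#   path_section(state_from=state(0, 0), state_to=state(1, 0), cost=1, action=None)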


class MazeEnv(gym.Env):
    metadata = {'render.modes': ['human']}
    _path = []
    _visited = []
    MAP = '../maps/easy/easy3.bmp'

    def __init__(self, informed, gym_compatible, deter, map_image_dir=None, grad=(0, 0), node_rewards=None):
        '''
        Class wrapping Maze into a gym environment.
        @param informed: boolean
        @param gym_compatible: boolean - T = HardMaze, F = EasyMaze
        @param deter: boolean - T = deterministic maze, F = probabilistic maze
        @param map_image_dir: string - path to image of map
        @param grad: tuple - vector tuning the tilt of the maze
        @param node_rewards: optional node rewards passed to kuimaze.Maze
        '''
        if map_image_dir is None:
            # If no map is given as a parameter, one is generated with the following setup.
            x_size = 6
            y_size = 6
            complexity = 0.1
            density = 0.25
            self.MAP = mapgen_maze(x_size, y_size, complexity, density)
        else:
            self.MAP = map_image_dir
        if grad is None:
            self._grad = (0, 0)
        else:
            self._grad = grad
        self._problem = kuimaze.Maze(self.MAP, self._grad, node_rewards=node_rewards)
        self._player = EnvAgent(self._problem)
        self._curr_state = self._problem.get_start_state()
        self._informed = informed
        self._gym_compatible = gym_compatible
        self._deter = deter
        self._gui_disabled = True
        self._set = False

        self._xsize = self._problem.get_dimensions()[0]
        self._ysize = self._problem.get_dimensions()[1]
        self.action_space = self._get_action_space()
        self.observation_space = spaces.Tuple((spaces.Discrete(self._xsize), spaces.Discrete(self._ysize)))
        self.seed()
        self.reset()

    def step(self, action):
        assert self._set, "reset() must be called first!"
        last_state = self._curr_state
        assert 0 <= action <= 3
        if not self._deter:
            action = self._problem.non_det_result(action)
        self._curr_state = self._problem.result(self._curr_state, action)
        self._path.append(self._curr_state)
        if self._curr_state not in self._visited:
            self._visited.append(self._curr_state)
        reward, done = self._get_reward(self._curr_state, last_state)

        return self._get_observation(), reward, done, None

    def get_all_states(self):
        '''
        Auxiliary function for MDPs - where the states (the map) are supposed to be known.
        :return: list of states
        '''
        return self._problem.get_all_states()

    def render(self, mode='human', close=False, visited=None, explored=None):
        assert self._set, "reset() must be called first!"
        self._gui_disabled = False
        if visited is None:
            self._problem.set_visited(self._visited)
        else:
            self._problem.set_visited(visited)
        if explored is None:
            self._problem.set_explored([self._curr_state])
        else:
            self._problem.set_explored(explored)

        self._problem.show_and_break()
        self._gui_on = True
119
121 self._gui_disabled = True
122 self._problem.close_gui()
123
    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def save_path(self):
        '''
        Method for saving the path of the agent into a file named 'saved_path.txt' in the directory the script
        was run from.
        @return: None
        '''
        assert len(self._path) > 0, "Path length must be greater than 0; for the easy environment call set_path first"

        pathfname = os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), "saved_path.txt")
        with open(pathfname, 'wt') as f:
            if (type(self._path[0]) == tuple or type(self._path[0]) == list) and not self._gym_compatible:
                for pos in self._path:
                    f.write("x:{}, y:{}, z:{}\n".format(pos[0], pos[1], self._get_depth(state(pos[0], pos[1]))))
            if self._gym_compatible:
                for pos in self._path:
                    f.write("x:{}, y:{}, z:{}\n".format(pos.x, pos.y, self._get_depth(pos)))

    def save_eps(self):
        '''
        Save the last rendered image into the directory the script was run from.
        @return: None
        '''
        assert not self._gui_disabled, "render() must be called before save_eps"
        self._problem.save_as_eps(self._gui_disabled)

    def visualise(self, dictionary):
        '''
        Visualise input. If visualise is called before the GUI is opened, render() is called first.
        @param dictionary: input to visualise; can be None -> visualise depth, or a dictionary:
                           {'x': x_coord, 'y': y_coord, 'value': value_to_visualise} where value can be a scalar
                           or a 4-dimensional vector (tuple or list).
        @return: None
        '''
        assert self._set, "reset() must be called before any visualisation setting!"
        if self._gui_disabled:
            self.render()
        self._problem.visualise(dictionary)
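
    # Example (a sketch, assuming an environment instance called env): visualise a single value
    # in the square at x=2, y=3:
    #     env.visualise({'x': 2, 'y': 3, 'value': 0.5})
    # or call env.visualise(None) to visualise the depth of each square.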
166
168 '''
169 method to generate observation - current state, finish states
170 @return: tuple
171 '''
172 if self._informed:
173 ret = [(self._curr_state.x, self._curr_state.y, self._get_depth(self._curr_state))]
174 for n in self._problem.get_goal_nodes():
175 ret.append((n.x, n.y, self._get_depth(n)))
176 else:
177 ret = [self._curr_state.x, self._curr_state.y, self._get_depth(self._curr_state)]
178 return tuple(ret)
179
    def _get_action_space(self):
        '''
        Method to get the action space - all available actions in the environment.
        @return: spaces
        '''
        if self._gym_compatible:
            return spaces.Discrete(4)
        else:
            return spaces.Tuple(spaces.Tuple((spaces.Discrete(self._xsize), spaces.Discrete(self._ysize))))

        return self._problem.__node_rewards[self._curr_state.x, self._curr_state.y]

    def _get_reward(self, curr, last):
        '''
        Returns reward and an indication of reaching a goal state.
        @param curr: new state
        @param last: last state
        @return: float, boolean
        '''
        reward = -2
        done = False
        vector = [curr.x - last.x, curr.y - last.y]
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        if curr != last:
            reward = -(abs(vector[0]) + abs(vector[1]) + z_axis)
        if self._problem.is_goal_state(curr):
            reward = 100.0
            done = True
            if self._gym_compatible:
                self._player.set_path(self._path)
                self._player.find_path()
        return reward, done

    def _get_depth(self, state):
        '''
        Get the depth (z coordinate) of a state based on the gradient. The start state of the map has depth 0.
        @param state: namedtuple state
        @return: float
        '''
        start = self._problem.get_start_state()
        vector = [state.x - start.x, state.y - start.y]
        ret = self._grad[0] * vector[0] + self._grad[1] * vector[1]
        return float(format(ret, '.3f'))
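
    # Worked example (illustration): with self._grad == (0.1, 0.2) and start state (0, 0),
    # the state (2, 1) has depth 0.1 * 2 + 0.2 * 1 = 0.4.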


class EnvAgent:
    '''
    Class necessary for wrapping the maze.
    '''
    __path = []

    def __init__(self, problem):
        self.problem = problem

    def set_path(self, path):
        self.__path = path

    def find_path(self):
        '''
        Visualise the path of the agent; the path must be set before visualising!
        @return: list of states forming the path
        '''
        ret = []
        for i in range(len(self.__path) - 1):
            ret.append(path_section(self.__path[i], self.__path[i + 1], 1, None))
        try:
            self.problem.show_path(ret)
        except Exception:
            pass
        return self.__path


class EasyMazeEnv(MazeEnv):
    '''
    EasyMazeEnv is a version of the maze closer to graph search. It is possible to move the agent from any state to
    a different, already visited state or to a neighbour of the current state. EasyMaze has all methods of HardMaze.
    Unlike the HardMaze, EasyMaze has an additional method set_path - which can set a different path than the agent's movement.
    '''

    def __init__(self, informed, map_image_dir=None, grad=(0, 0)):
        super(EasyMazeEnv, self).__init__(informed, False, True, map_image_dir, grad)
        self._gui_on = False

    def step(self, action):
        last_state = self._curr_state
        assert (type(action) == list or type(action) == tuple) and len(action) == 2
        self._curr_state = self._easy_result(action)
        if self._curr_state not in self._visited:
            self._visited.append(self._curr_state)
        reward, done = self._get_reward(self._curr_state, last_state)
        return self._get_observation(), reward, done, None

    def set_path(self, path):
        '''
        This method sets the environment to visualise your found path. The render method must be called afterwards.
        @param path: list of lists in the format: [[x1, y1], [x2, y2], ... ]
        @return: None
        '''
        ret = []
        self._path = path
        if self._gui_on:
            assert (type(path[0]) == list or type(path[0]) == tuple) and (len(path[0]) == 2 or len(path[0]) == 3)
            previous_state = None
            for state_list in path:
                if previous_state is not None:
                    if abs(state_list[0] - previous_state[0]) > 1 or abs(state_list[1] - previous_state[1]) > 1:
                        raise AssertionError('The path is not continuous - the distance between neighbouring path segments should be at most 2 ** (1/2)')
                ret.append(state(state_list[0], state_list[1]))
                previous_state = copy.copy(state_list)

            self._player.set_path(ret)
            self._player.find_path()
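
    # Example (a sketch, assuming an easy-maze environment instance called env):
    #     env.set_path([[0, 0], [1, 0], [1, 1]])   # path found by your search
    #     env.render()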
294
296 '''
297 returns true if new state is available
298 @param new_state:
299 @return: boolean
300 '''
301 tmp = []
302 tmp.extend(self._visited)
303 tmp.extend([self._problem.result(self._curr_state, 0), self._problem.result(self._curr_state, 1),
304 self._problem.result(self._curr_state, 2), self._problem.result(self._curr_state, 3),
305 self._problem.result(self._curr_state, 4), self._problem.result(self._curr_state, 5),
306 self._problem.result(self._curr_state, 6), self._problem.result(self._curr_state, 6)])
307 return new_state in tmp
308
310 '''
311 Gives result of desired action in parameter
312 @param state_list: list or tuple of coordinates [x, y]
313 @return: state - new position of agent
314 '''
315 new_state = state(state_list[0], state_list[1])
316 if self._is_available(new_state):
317 return new_state
318 else:
319
320 return self._curr_state
321
    def _get_cost(self, curr, last):
        '''
        Returns the cost of movement from last to curr.
        @param curr: new state
        @param last: last state
        @return: float
        '''
        reward = 0
        vector = [curr.x - last.x, curr.y - last.y]

        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        addition_cost = 0
        if curr in self._problem.hard_places:
            addition_cost = 5

        if curr != last:
            reward = pow(pow(vector[0], 2) + pow(vector[1], 2), 1 / 2) + z_axis + addition_cost
        return reward

    def expand(self, position):
        '''
        Returns positions with associated costs that can be visited from "position".
        @param position: position in the maze defined by coordinates (x, y)

        @return: list of coordinates [x, y] with the "cost" of movement to these positions: [[[x1, y1], cost1], [[x2, y2], cost2], ... ]
        '''
        expanded_nodes = []
        maze_pose = state(position[0], position[1])
        tmp = [self._problem.result(maze_pose, 0), self._problem.result(maze_pose, 1),
               self._problem.result(maze_pose, 2), self._problem.result(maze_pose, 3),
               self._problem.result(maze_pose, 4), self._problem.result(maze_pose, 5),
               self._problem.result(maze_pose, 6), self._problem.result(maze_pose, 7)]
        for new_state in tmp:
            if new_state.x == maze_pose.x and new_state.y == maze_pose.y:
                continue
            if new_state not in self._visited:
                self._visited.append(new_state)
            reward = self._get_cost(maze_pose, new_state)
            expanded_nodes.append([(new_state.x, new_state.y), reward])
        return expanded_nodes
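
    # Illustrative example (a sketch, assuming a zero gradient and no hard places),
    # env.expand((1, 1)) might return something like:
    #     [[(1, 2), 1.0], [(2, 1), 1.0], [(2, 2), 1.4142135623730951], ...]
    # i.e. reachable neighbouring cells paired with the cost of moving there.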


'''
Final set of classes to use. As defined by OpenAI Gym, none of them requires any params in the constructor.
The main method of the wrapper is the step function, which returns the following values (the fourth returned value, info, is always None):

Observations:
    For informed search the observation is in the format: ((current position coords), (finish_1 coords), (finish_2 coords), ...)
    For uninformed search, only (current position coords).

Rewards:
    When the agent moves to a different place, it gets a reward of -1 - depth.
    When the agent reaches the finish, it gets a reward of +100.
    If an unavailable action is called, the agent stays in the same position and the reward is 0.

Done:
    True when the agent reaches the finish.

The input (parameter) of the step method is defined by the action space:
    Easy maze action space is a list [x_coordinate, y_coordinate].
    Hard maze action space is an integer from 0 to 3.
'''


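# A minimal usage sketch (an illustration, not part of the wrapper). It assumes a valid map
# image path and that HardMaze (defined below) is importable from the kuimaze package, and
# shows the reset/step loop for the hard (gym-compatible) maze, where an action is an integer 0-3:
#
#     import kuimaze
#     env = kuimaze.HardMaze(map_image='../maps/easy/easy3.bmp')
#     obs = env.reset()
#     done = False
#     while not done:
#         action = env.action_space.sample()          # random action from Discrete(4)
#         obs, reward, done, _ = env.step(action)
#         env.render()

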
class InfEasyMaze(EasyMazeEnv):
    '''
    Informed easy maze, suitable for A* implementation.
    step([x, y])
    '''
    def __init__(self, map_image=None, grad=(0, 0)):
        super(InfEasyMaze, self).__init__(True, map_image, grad)


class EasyMaze(EasyMazeEnv):
    '''
    Uninformed easy maze, suitable for BFS, DFS ...
    step([x, y])
    '''
    def __init__(self, map_image=None, grad=(0, 0)):
        super(EasyMaze, self).__init__(False, map_image, grad)


class MDPMaze(MazeEnv):
    '''
    Maze for solving MDP problems.
    '''
    def __init__(self, map_image=None, grad=(0, 0), probs=None, node_rewards=None):
        if probs is not None:
            super().__init__(False, True, False, map_image, grad, node_rewards=node_rewards)
            self._problem.set_probs_table(probs[0], probs[1], probs[2], probs[3])
        else:
            super().__init__(False, True, True, map_image, grad)
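
    # Note (an assumption based on the call above): probs is expected to be a 4-element
    # sequence, e.g. probs=(0.8, 0.1, 0.05, 0.05), which is passed straight to
    # self._problem.set_probs_table(); the meaning of the four values is defined in maze.py.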

    def _get_reward(self, curr, last):
        '''
        Returns reward and an indication of reaching a goal state.
        @param curr: new state
        @param last: last state
        @return: float, boolean
        '''
        reward = self._problem.get_state_reward(curr)
        done = False
        vector = [curr.x - last.x, curr.y - last.y]
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        if curr != last:
            reward = z_axis + self._problem.get_state_reward(last)
        if self._problem.is_goal_state(curr):
            reward = self._problem.get_state_reward(curr)
            done = True
            if self._gym_compatible:
                self._player.set_path(self._path)
                self._player.find_path()
        return reward, done


class HardMaze(MazeEnv):
    '''
    Uninformed hard maze, suitable for reinforcement learning.
    step(param) where param is an integer; 0 <= param <= 3
    '''
    def __init__(self, map_image=None, grad=(0, 0), probs=None, node_rewards=None):
        if probs is not None:
            super(HardMaze, self).__init__(False, True, False, map_image, grad, node_rewards=node_rewards)
            self._problem.set_probs(probs[0], probs[1], probs[2], probs[3])
        else:
            super(HardMaze, self).__init__(False, True, True, map_image, grad)

    def _get_reward(self, curr, last):
        '''
        Returns reward and an indication of reaching a goal state.
        @param curr: new state
        @param last: last state
        @return: float, boolean
        '''
        reward = self._problem.get_state_reward(last)
        done = False
        vector = [curr.x - last.x, curr.y - last.y]
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        if curr != last:
            reward = reward - z_axis
        if self._problem.is_goal_state(curr):
            reward = reward + self._problem.get_state_reward(curr)
            done = True
            if self._gym_compatible:
                self._player.set_path(self._path)
                self._player.find_path()
        return reward, done


class InfHardMaze(MazeEnv):
    '''
    Informed hard maze, suitable for reinforcement learning.
    step(param) where param is an integer; 0 <= param <= 3
    '''
    def __init__(self, map_image=None, grad=(0, 0), probs=None):
        if probs is not None:
            super(InfHardMaze, self).__init__(True, True, False, map_image, grad)
            self._problem.set_probs(probs[0], probs[1], probs[2], probs[3])
        else:
            super(InfHardMaze, self).__init__(True, True, True, map_image, grad)