'''
File wrapping maze.py functionality into a few methods defined by the OpenAI Gym interface.
@author: Zdeněk Rozsypálek, Tomas Svoboda
@contact: rozsyzde(at)fel.cvut.cz, svobodat@fel.cvut.cz
@copyright: (c) 2017, 2018
'''

import collections
import os
import sys
import numpy as np
import gym
import copy
from gym import spaces
from gym.utils import seeding

import kuimaze
from .map_generator import maze as mapgen_maze

EIGHT_DIRS = False

path_section = collections.namedtuple('Path', ['state_from', 'state_to', 'cost', 'action'])
state = collections.namedtuple('State', ['x', 'y'])

class MazeEnv(gym.Env):
    metadata = {'render.modes': ['human']}
    _path = []
    _visited = []
    MAP = '../maps/easy/easy3.bmp'

    def __init__(self, informed, gym_compatible, deter, map_image_dir=None, grad=(0, 0), node_rewards=None):
        '''
        Class wrapping Maze into a gym environment.
        @param informed: boolean
        @param gym_compatible: boolean - True = HardMaze, False = EasyMaze
        @param deter: boolean - True = deterministic maze, False = probabilistic maze
        @param map_image_dir: string - path to an image of the map
        @param grad: tuple - vector tuning the tilt of the maze
        @param node_rewards: optional per-node rewards passed to kuimaze.Maze
        '''
        if map_image_dir is None:
            # If no map is passed in, generate one with the following setup.
            x_size = 6
            y_size = 6
            complexity = 0.1
            density = 0.25
            self.MAP = mapgen_maze(x_size, y_size, complexity, density)
        else:
            self.MAP = map_image_dir
        if grad is None:
            self._grad = (0, 0)
        else:
            self._grad = grad
        self._problem = kuimaze.Maze(self.MAP, self._grad, node_rewards=node_rewards)
        self._player = EnvAgent(self._problem)
        self._curr_state = self._problem.get_start_state()
        self._informed = informed
        self._gym_compatible = gym_compatible
        self._deter = deter
        self._gui_disabled = True
        self._set = False

        self._xsize = self._problem.get_dimensions()[0]
        self._ysize = self._problem.get_dimensions()[1]
        self.action_space = self._get_action_space()
        self.observation_space = spaces.Tuple((spaces.Discrete(self._xsize), spaces.Discrete(self._ysize)))
        self.seed()
        self.reset()

    def step(self, action):
        assert self._set, "reset() must be called first!"
        last_state = self._curr_state
        assert 0 <= action <= 3
        if not self._deter:
            action = self._problem.non_det_result(action)
        self._curr_state = self._problem.result(self._curr_state, action)
        self._path.append(self._curr_state)
        if self._curr_state not in self._visited:
            self._visited.append(self._curr_state)
        reward, done = self._get_reward(self._curr_state, last_state)

        return self._get_observation(), reward, done, None

    def get_all_states(self):
        '''
        Auxiliary function for MDPs, where the states (the map) are supposed to be known.
        :return: list of states
        '''
        return self._problem.get_all_states()

    def render(self, mode='human', close=False, visited=None, explored=None):
        assert self._set, "reset() must be called first!"
        self._gui_disabled = False
        if visited is None:
            self._problem.set_visited(self._visited)
        else:
            self._problem.set_visited(visited)
        if explored is None:
            self._problem.set_explored([self._curr_state])
        else:
            self._problem.set_explored(explored)

        self._problem.show_and_break()
        self._gui_on = True

    def close(self):
        self._gui_disabled = True
        self._problem.close_gui()

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def save_path(self):
        '''
        Save the path of the agent into a file named 'saved_path.txt' in the directory the script
        was run from.
        @return: None
        '''
        assert len(self._path) > 0, "Path length must be greater than 0; for the easy environment call set_path first"

        pathfname = os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), "saved_path.txt")
        with open(pathfname, 'wt') as f:

            if (type(self._path[0]) == tuple or type(self._path[0]) == list) and not self._gym_compatible:
                for pos in self._path:
                    f.write("x:{}, y:{}, z:{}\n".format(pos[0], pos[1], self._get_depth(state(pos[0], pos[1]))))
            if self._gym_compatible:
                for pos in self._path:
                    f.write("x:{}, y:{}, z:{}\n".format(pos.x, pos.y, self._get_depth(pos)))

    def save_eps(self):
        '''
        Save the last rendered image into the directory the script was run from.
        @return: None
        '''
        assert not self._gui_disabled, "render() must be called before save_eps"
        self._problem.save_as_eps(self._gui_disabled)

    def visualise(self, dictionary=None):
        '''
        Visualise the input. If visualise is called before the GUI is opened, render() is called first.
        @param dictionary: input to visualise; can be None -> visualise depth, or a dictionary:
        {'x': x_coord, 'y': y_coord, 'value': value_to_visualise} where value can be a scalar
        or a 4-dimensional vector (tuple or list).
        @return: None
        '''
        assert self._set, "reset() must be called before any visualisation setting!"
        if self._gui_disabled:
            self.render()
        self._problem.visualise(dictionary)
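    # A usage sketch of the structure described above (env stands for any of the maze
    # classes defined below); whether a single dict or a list of such dicts is accepted
    # is up to kuimaze.Maze.visualise:
    #   env.visualise({'x': 1, 'y': 2, 'value': 0.5})                    # scalar per cell
    #   env.visualise({'x': 1, 'y': 2, 'value': (0.1, 0.2, 0.3, 0.4)})   # 4-vector per cell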

    def _get_observation(self):
        '''
        Method to generate the observation - current state and finish states.
        @return: tuple
        '''
        if self._informed:
            ret = [(self._curr_state.x, self._curr_state.y, self._get_depth(self._curr_state))]
            for n in self._problem.get_goal_nodes():
                ret.append((n.x, n.y, self._get_depth(n)))
        else:
            ret = [self._curr_state.x, self._curr_state.y, self._get_depth(self._curr_state)]
        return tuple(ret)

    def _get_action_space(self):
        '''
        Method to get the action space - all actions available in the environment.
        @return: spaces
        '''
        if self._gym_compatible:
            return spaces.Discrete(4)
        else:
            return spaces.Tuple(spaces.Tuple((spaces.Discrete(self._xsize), spaces.Discrete(self._ysize))))

    def _get_current_node_reward(self):
        return self._problem.__node_rewards[self._curr_state.x, self._curr_state.y]

    def _get_reward(self, curr, last):
        '''
        Returns the reward and an indication of reaching a goal state.
        @param curr: new state
        @param last: last state
        @return: float, boolean
        '''
        reward = -2
        done = False
        vector = [curr.x - last.x, curr.y - last.y]
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        if curr != last:
            reward = -(abs(vector[0]) + abs(vector[1]) + z_axis)
        if self._problem.is_goal_state(curr):
            reward = 100.0
            done = True
            if self._gym_compatible:
                self._player.set_path(self._path)
                self._player.find_path()
        return reward, done
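    # Worked example of the reward above: a move by one cell in +x with grad=(0.1, 0) gives
    # z_axis = 1 * 0.1 + 0 * 0 = 0.1, so reward = -(1 + 0 + 0.1) = -1.1; entering a goal
    # state returns +100.0 and done = True.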

    def _get_depth(self, state):
        '''
        Get depth (z coordinate) of a state based on the gradient. The start state of the map has depth 0.
        @param state: namedtuple state
        @return: float
        '''
        start = self._problem.get_start_state()
        vector = [state.x - start.x, state.y - start.y]
        ret = self._grad[0] * vector[0] + self._grad[1] * vector[1]
        return float(format(ret, '.3f'))
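    # Worked example: with grad=(0.1, 0.05) and the start state at (0, 0), the state (3, 2)
    # has depth 0.1 * 3 + 0.05 * 2 = 0.4.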


class EnvAgent(kuimaze.BaseAgent):
    '''
    Helper agent class needed by the maze wrapper; keeps the found path and can show it in the GUI.
    '''
    __path = []

    def set_path(self, path):
        self.__path = path

    def find_path(self):
        '''
        Visualise the path of the agent; the path must be set before visualising!
        @return: the stored path
        '''
        ret = []
        for i in range(len(self.__path) - 1):
            ret.append(path_section(self.__path[i], self.__path[i + 1], 1, None))
        try:
            self.problem.show_path(ret)
        except:
            pass
        return self.__path


class EasyMazeEnv(MazeEnv):
    '''
    EasyMazeEnv is a version of the maze closer to graph search. It is possible to move the agent from any state to
    a state it has already visited or to a neighbour of the current state. EasyMaze has all the methods of HardMaze.
    Unlike HardMaze, EasyMaze has the additional method set_path, which can set a different path than the agent's movement.
    '''

    def __init__(self, informed, map_image_dir=None, grad=(0, 0)):
        super(EasyMazeEnv, self).__init__(informed, False, True, map_image_dir, grad)
        self._gui_on = False

    def step(self, action):
        last_state = self._curr_state
        assert (type(action) == list or type(action) == tuple) and len(action) == 2
        self._curr_state = self._easy_result(action)
        if self._curr_state not in self._visited:
            self._visited.append(self._curr_state)
        reward, done = self._get_reward(self._curr_state, last_state)
        return self._get_observation(), reward, done, None

    def set_path(self, path):
        '''
        This method sets the environment to visualise your found path. The render method must be called afterwards.
        @param path: list of lists in the format: [[x1, y1], [x2, y2], ... ]
        @return: None
        '''
        ret = []
        self._path = path
        if self._gui_on:
            assert (type(path[0]) == list or type(path[0]) == tuple) and (len(path[0]) == 2 or len(path[0]) == 3)
            previous_state = None
            for state_list in path:
                if previous_state is not None:
                    if abs(state_list[0] - previous_state[0]) > 1 and abs(state_list[1] - previous_state[1]) > 1:
                        raise AssertionError('The path is not continuous - the distance between neighbouring path segments should be at most 2 ** (1/2)')
                ret.append(state(state_list[0], state_list[1]))
                previous_state = copy.copy(state_list)

            self._player.set_path(ret)
            self._player.find_path()
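    # Usage sketch (env being an EasyMaze/InfEasyMaze instance, path values are illustrative):
    #   env.set_path([[0, 0], [1, 0], [1, 1], [2, 1]])   # list of [x, y] pairs
    #   env.render()                                     # draw the path in the GUI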

    def _is_available(self, new_state):
        '''
        Returns True if the new state is reachable from the current state or has already been visited.
        @param new_state: state namedtuple
        @return: boolean
        '''
        tmp = []
        tmp.extend(self._visited)
        tmp.extend([self._problem.result(self._curr_state, 0), self._problem.result(self._curr_state, 1),
                    self._problem.result(self._curr_state, 2), self._problem.result(self._curr_state, 3),
                    self._problem.result(self._curr_state, 4), self._problem.result(self._curr_state, 5),
                    self._problem.result(self._curr_state, 6), self._problem.result(self._curr_state, 7)])
        return new_state in tmp

    def _easy_result(self, state_list):
        '''
        Gives the result of the desired action given as the parameter.
        @param state_list: list or tuple of coordinates [x, y]
        @return: state - new position of the agent
        '''
        new_state = state(state_list[0], state_list[1])
        if self._is_available(new_state):
            return new_state
        else:
            return self._curr_state

    def _get_cost(self, curr, last):
        '''
        Returns the cost of the movement from last to curr.
        @param curr: new state
        @param last: last state
        @return: float
        '''
        reward = 0
        vector = [curr.x - last.x, curr.y - last.y]

        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        addition_cost = 0
        if curr in self._problem.hard_places:
            addition_cost = 5

        if curr != last:
            reward = pow(pow(vector[0], 2) + pow(vector[1], 2), 1 / 2) + z_axis + addition_cost
        return reward

    def expand(self, position):
        '''
        Returns positions with associated costs that can be visited from "position".
        @param position: position in the maze defined by coordinates (x, y)

        @return: list of coordinates [x, y] with the "cost" of moving to them: [[[x1, y1], cost1], [[x2, y2], cost2], ... ]
        '''
        expanded_nodes = []
        maze_pose = state(position[0], position[1])
        if EIGHT_DIRS:
            tmp = [self._problem.result(maze_pose, 0), self._problem.result(maze_pose, 1),
                   self._problem.result(maze_pose, 2), self._problem.result(maze_pose, 3),
                   self._problem.result(maze_pose, 4), self._problem.result(maze_pose, 5),
                   self._problem.result(maze_pose, 6), self._problem.result(maze_pose, 7)]
        else:
            tmp = [self._problem.result(maze_pose, 0), self._problem.result(maze_pose, 1),
                   self._problem.result(maze_pose, 2), self._problem.result(maze_pose, 3)]
        for new_state in tmp:
            if new_state.x == maze_pose.x and new_state.y == maze_pose.y:
                continue
            if new_state not in self._visited:
                self._visited.append(new_state)
            reward = self._get_cost(maze_pose, new_state)
            expanded_nodes.append([(new_state.x, new_state.y), reward])
        return expanded_nodes
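    # Sketch of the return format documented above: from position (2, 3) with free cells on
    # all four sides, zero gradient and no hard places, each move costs 1.0, e.g.
    #   [[(1, 3), 1.0], [(3, 3), 1.0], [(2, 2), 1.0], [(2, 4), 1.0]]
    # (the ordering follows the action numbering of kuimaze.Maze.result).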


'''
Final set of classes to use. As defined in OpenAI Gym, none of them needs any parameters in the constructor.
The main method of the wrapper is the step function, which returns three values (plus an info value that is always None):

Observations:
    For informed search the observation has the format: ((current position coords), (finish_1 coords), (finish_2 coords), ...)
    For uninformed search it is only (current position coords).

Rewards:
    When the agent moves to a different place, it gets the reward -1 - depth.
    When the agent reaches a finish, it gets the reward +100.
    If an unavailable action is requested, the agent stays in the same position and the reward is 0.

Done:
    True when the agent reaches the finish.

The input (parameter) of the step method is defined by the action space:
    Easy maze action space is a list [x_coordinate, y_coordinate].
    Hard maze action space is an integer from 0 to 3.

A minimal usage sketch is at the bottom of this file.
'''


class InfEasyMaze(EasyMazeEnv):
    '''
    Informed easy maze, suitable for an A* implementation.
    step([x, y])
    '''

    def __init__(self, map_image=None, grad=(0, 0)):
        super(InfEasyMaze, self).__init__(True, map_image, grad)


class EasyMaze(EasyMazeEnv):
    '''
    Uninformed easy maze, suitable for BFS, DFS, ...
    step([x, y])
    '''

    def __init__(self, map_image=None, grad=(0, 0)):
        super(EasyMaze, self).__init__(False, map_image, grad)


class MDPMaze(MazeEnv):
    '''
    Maze for solving MDP problems.
    '''

    def __init__(self, map_image=None, grad=(0, 0), probs=None, node_rewards=None):
        if probs is not None:
            super().__init__(False, True, False, map_image, grad, node_rewards=node_rewards)
            self._problem.set_probs_table(probs[0], probs[1], probs[2], probs[3])
        else:
            super().__init__(False, True, True, map_image, grad)

    def _get_reward(self, curr, last):
        '''
        Returns the reward and an indication of reaching a goal state.
        @param curr: new state
        @param last: last state
        @return: float, boolean
        '''
        reward = self._problem.get_state_reward(curr)
        done = False
        vector = [curr.x - last.x, curr.y - last.y]
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        if curr != last:
            reward = z_axis + self._problem.get_state_reward(last)
        if self._problem.is_goal_state(curr):
            reward = self._problem.get_state_reward(curr)
            done = True
            if self._gym_compatible:
                self._player.set_path(self._path)
                self._player.find_path()
        return reward, done


class HardMaze(MazeEnv):
    '''
    Uninformed hard maze, suitable for reinforcement learning.
    step(param) where param is an integer; 0 <= param <= 3
    '''

    def __init__(self, map_image=None, grad=(0, 0), probs=None, node_rewards=None):
        if probs is not None:
            super(HardMaze, self).__init__(False, True, False, map_image, grad, node_rewards=node_rewards)
            self._problem.set_probs(probs[0], probs[1], probs[2], probs[3])
        else:
            super(HardMaze, self).__init__(False, True, True, map_image, grad)

    def _get_reward(self, curr, last):
        '''
        Returns the reward and an indication of reaching a goal state.
        @param curr: new state
        @param last: last state
        @return: float, boolean
        '''
        reward = self._problem.get_state_reward(last)
        done = False
        vector = [curr.x - last.x, curr.y - last.y]
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        if curr != last:
            reward = reward - z_axis
        if self._problem.is_goal_state(curr):
            reward = reward + self._problem.get_state_reward(curr)
            done = True
            if self._gym_compatible:
                self._player.set_path(self._path)
                self._player.find_path()
        return reward, done

class InfHardMaze(MazeEnv):
    '''
    Informed hard maze, suitable for reinforcement learning.
    step(param) where param is an integer; 0 <= param <= 3
    '''

    def __init__(self, map_image=None, grad=(0, 0), probs=None):
        if probs is not None:
            super(InfHardMaze, self).__init__(True, True, False, map_image, grad)
            self._problem.set_probs(probs[0], probs[1], probs[2], probs[3])
        else:
            super(InfHardMaze, self).__init__(True, True, True, map_image, grad)
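

if __name__ == '__main__':
    # Minimal usage sketch, not part of the wrapper itself: drive HardMaze through the
    # gym-style step() loop described in the module docstring above. With no map_image
    # given, __init__ generates a small random maze.
    env = HardMaze()
    env.reset()
    done = False
    steps = 0
    while not done and steps < 1000:        # cap the random walk so the demo terminates
        action = env.action_space.sample()  # random action, an integer from 0 to 3
        _, reward, done, _ = env.step(action)
        steps += 1
    env.render()                            # show the maze and the cells visited so far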