'''
File wrapping maze.py functionality into a few methods defined by the OpenAI Gym interface.
@author: Zdeněk Rozsypálek, Tomas Svoboda
@contact: rozsyzde(at)fel.cvut.cz, svobodat@fel.cvut.cz
@copyright: (c) 2017, 2018
'''

import collections
import os
import sys
import numpy as np
import gym
import copy
from gym import spaces
from gym.utils import seeding

import kuimaze
from .map_generator import maze as mapgen_maze

path_section = collections.namedtuple('Path', ['state_from', 'state_to', 'cost', 'action'])
state = collections.namedtuple('State', ['x', 'y'])


class MazeEnv(gym.Env):
    metadata = {'render.modes': ['human']}
    _path = []
    _visited = []
    MAP = '../maps/easy/easy3.bmp'

    def __init__(self, informed, gym_compatible, deter, map_image_dir=None, grad=(0, 0), node_rewards=None):
        '''
        Class wrapping Maze into a gym environment.
        @param informed: boolean
        @param gym_compatible: boolean - True = HardMaze, False = EasyMaze
        @param deter: boolean - True = deterministic maze, False = probabilistic maze
        @param map_image_dir: string - path to an image of the map
        @param grad: tuple - vector tuning the tilt of the maze
        '''
        if map_image_dir is None:
            # If no map is given as a parameter, one is generated with the following setup.
            x_size = 6
            y_size = 6
            complexity = 0.1
            density = 0.25
            self.MAP = mapgen_maze(x_size, y_size, complexity, density)
        else:
            self.MAP = map_image_dir
        if grad is None:
            self._grad = (0, 0)
        else:
            self._grad = grad
        self._problem = kuimaze.Maze(self.MAP, self._grad, node_rewards=node_rewards)
        self._player = EnvAgent(self._problem)
        self._curr_state = self._problem.get_start_state()
        self._informed = informed
        self._gym_compatible = gym_compatible
        self._deter = deter
        self._gui_disabled = True
        self._set = False

        self._xsize = self._problem.get_dimensions()[0]
        self._ysize = self._problem.get_dimensions()[1]
        self.action_space = self._get_action_space()
        self.observation_space = spaces.Tuple((spaces.Discrete(self._xsize), spaces.Discrete(self._ysize)))
        self.seed()
        self.reset()

    def step(self, action):
        assert self._set, "reset() must be called first!"
        last_state = self._curr_state
        assert 0 <= action <= 3
        if not self._deter:
            action = self._problem.non_det_result(action)
        self._curr_state = self._problem.result(self._curr_state, action)
        self._path.append(self._curr_state)
        if self._curr_state not in self._visited:
            self._visited.append(self._curr_state)
        reward, done = self._get_reward(self._curr_state, last_state)

        return self._get_observation(), reward, done, None

    def get_all_states(self):
        '''
        Auxiliary function for MDPs - where the states (the map) are supposed to be known.
        :return: list of states
        '''
        return self._problem.get_all_states()

    def render(self, mode='human', close=False, visited=None, explored=None):
        assert self._set, "reset() must be called first!"
        self._gui_disabled = False
        if visited is None:
            self._problem.set_visited(self._visited)
        else:
            self._problem.set_visited(visited)
        if explored is None:
            self._problem.set_explored([self._curr_state])
        else:
            self._problem.set_explored(explored)

        self._problem.show_and_break()
        self._gui_on = True

    def close(self):
        self._gui_disabled = True
        self._problem.close_gui()

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def save_path(self):
        '''
        Save the path of the agent into a file named 'saved_path.txt' in the directory the script was run from.
        @return: None
        '''
        assert len(self._path) > 0, "Path length must be greater than 0, for the easy environment call set_path first"

        pathfname = os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), "saved_path.txt")
        with open(pathfname, 'wt') as f:

            if (type(self._path[0]) == tuple or type(self._path[0]) == list) and not self._gym_compatible:
                for pos in self._path:
                    f.write("x:{}, y:{}, z:{}\n".format(pos[0], pos[1], self._get_depth(state(pos[0], pos[1]))))
            if self._gym_compatible:
                for pos in self._path:
                    f.write("x:{}, y:{}, z:{}\n".format(pos.x, pos.y, self._get_depth(pos)))

    def save_eps(self):
        '''
        Save the last rendered image into the directory the script was run from.
        @return: None
        '''
        assert not self._gui_disabled, "render() must be called before save_eps"
        self._problem.save_as_eps(self._gui_disabled)

    def visualise(self, dictionary=None):
        '''
        Visualise the input. If visualise is called before the GUI is open, render() is called first.
        @param dictionary: input to visualise; can be None -> visualise depth, or a dictionary:
        {'x': x_coord, 'y': y_coord, 'value': value_to_visualise} where value can be a scalar
        or a 4 dimensional vector (tuple or list).
        @return: None
        '''
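        # Illustrative example (not part of the original docstring): one entry of the
        # input might look like {'x': 0, 'y': 0, 'value': 0.5} for a scalar value, or
        # {'x': 0, 'y': 0, 'value': (0.1, 0.2, 0.3, 0.4)} for one value per action.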
        assert self._set, "reset() must be called before any visualisation setting!"
        if self._gui_disabled:
            self.render()
        self._problem.visualise(dictionary)

    def _get_observation(self):
        '''
        Method to generate the observation - current state plus finish states.
        @return: tuple
        '''
        if self._informed:
            ret = [(self._curr_state.x, self._curr_state.y, self._get_depth(self._curr_state))]
            for n in self._problem.get_goal_nodes():
                ret.append((n.x, n.y, self._get_depth(n)))
        else:
            ret = [self._curr_state.x, self._curr_state.y, self._get_depth(self._curr_state)]
        return tuple(ret)

    def _get_action_space(self):
        '''
        Method to get the action space - all actions available in the environment.
        @return: spaces
        '''
        if self._gym_compatible:
            return spaces.Discrete(4)
        else:
            return spaces.Tuple(spaces.Tuple((spaces.Discrete(self._xsize), spaces.Discrete(self._ysize))))

    def _get_node_reward(self):
        return self._problem.__node_rewards[self._curr_state.x, self._curr_state.y]

    def _get_reward(self, curr, last):
        '''
        Returns the reward and an indication of reaching a goal state.
        @param curr: new state
        @param last: last state
        @return: float, boolean
        '''
        reward = -2
        done = False
        vector = [curr.x - last.x, curr.y - last.y]
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        if curr != last:
            reward = -(abs(vector[0]) + abs(vector[1]) + z_axis)
        if self._problem.is_goal_state(curr):
            reward = 100.0
            done = True
            if self._gym_compatible:
                self._player.set_path(self._path)
                self._player.find_path()
        return reward, done

    def _get_depth(self, state):
        '''
        Get the depth (z coordinate) of a state based on the gradient. The start state of the map has depth 0.
        @param state: namedtuple state
        @return: float
        '''
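        # Worked example (illustrative): with grad=(0.1, 0.0), a state three columns to
        # the right of the start has depth 0.1 * 3 + 0.0 * 0 = 0.3.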
        start = self._problem.get_start_state()
        vector = [state.x - start.x, state.y - start.y]
        ret = self._grad[0] * vector[0] + self._grad[1] * vector[1]
        return float(format(ret, '.3f'))


class EnvAgent(kuimaze.BaseAgent):
    '''
    Class necessary for wrapping the maze.
    '''
    __path = []

    def set_path(self, path):
        self.__path = path

    def find_path(self):
        '''
        Visualise the path of the agent; the path must be set before visualising!
        @return:
        '''
        ret = []
        for i in range(len(self.__path) - 1):
            ret.append(path_section(self.__path[i], self.__path[i + 1], 1, None))
        self.problem.show_path(ret)
        return self.__path


class EasyMazeEnv(MazeEnv):
    '''
    EasyMazeEnv is a version of the maze closer to graph search. It is possible to move the agent from any state
    to an already visited state or to a neighbour of the current state. EasyMaze has all the methods of HardMaze.
    Unlike the HardMaze, EasyMaze has an additional method set_path - which can set a different path than the agent movement.
    '''

    def __init__(self, informed, map_image_dir=None, grad=(0, 0)):
        super(EasyMazeEnv, self).__init__(informed, False, True, map_image_dir, grad)
        self._gui_on = False

    def step(self, action):
        last_state = self._curr_state
        assert (type(action) == list or type(action) == tuple) and len(action) == 2
        self._curr_state = self._easy_result(action)
        if self._curr_state not in self._visited:
            self._visited.append(self._curr_state)
        reward, done = self._get_reward(self._curr_state, last_state)
        return self._get_observation(), reward, done, None

    def set_path(self, path):
        '''
        This method sets the environment to visualise the found path. The render method must be called afterwards.
        @param path: list of lists in the format: [[x1, y1], [x2, y2], ... ]
        @return: None
        '''
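        # Illustrative usage (not part of the original docstring):
        #     env.set_path([[0, 0], [1, 0], [2, 0]])
        #     env.render()
        # Consecutive entries are expected to differ by exactly one step.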
        ret = []
        self._path = path
        if self._gui_on:
            assert (type(path[0]) == list or type(path[0]) == tuple) and (len(path[0]) == 2 or len(path[0]) == 3)
            previous_state = None
            for state_list in path:
                if previous_state is not None:
                    if abs(state_list[0] - previous_state[0]) + abs(state_list[1] - previous_state[1]) != 1:
                        raise AssertionError('The path is not continuous - the distance between neighbouring path segments should be 1')
                ret.append(state(state_list[0], state_list[1]))
                previous_state = copy.copy(state_list)

            self._player.set_path(ret)
            self._player.find_path()

    def _is_available(self, new_state):
        '''
        Returns True if the new state is available.
        @param new_state:
        @return: boolean
        '''
        tmp = []
        tmp.extend(self._visited)
        tmp.extend([self._problem.result(self._curr_state, 0), self._problem.result(self._curr_state, 1),
                    self._problem.result(self._curr_state, 2), self._problem.result(self._curr_state, 3)])
        return new_state in tmp

    def _easy_result(self, state_list):
        '''
        Gives the result of the desired action passed as a parameter.
        @param state_list: list or tuple of coordinates [x, y]
        @return: state - new position of the agent
        '''
        new_state = state(state_list[0], state_list[1])
        if self._is_available(new_state):
            return new_state
        else:
            return self._curr_state

    def _get_cost(self, curr, last):
        '''
        Returns the cost of movement from last to curr.
        @param curr: new state
        @param last: last state
        @return: float
        '''
        reward = 0
        vector = [curr.x - last.x, curr.y - last.y]

        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        addition_cost = 0
        if curr in self._problem.hard_places:
            addition_cost = 5

        if curr != last:
            reward = abs(vector[0]) + abs(vector[1]) + z_axis + addition_cost
        return reward

    def expand(self, position):
        '''
        Returns the positions, with associated costs, that can be visited from "position".
        @param position: position in the maze defined by coordinates (x, y)
        @return: list of coordinates [x, y] with the "cost" of movement to these positions: [[[x1, y1], cost1], [[x2, y2], cost2], ... ]
        '''
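        # Illustrative example: for a cell at (1, 1) with free neighbours at (0, 1) and
        # (2, 1) and a zero gradient, the returned list could look like
        #     [[(0, 1), 1], [(2, 1), 1]]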
        expanded_nodes = []
        maze_pose = state(position[0], position[1])
        tmp = [self._problem.result(maze_pose, 0), self._problem.result(maze_pose, 1),
               self._problem.result(maze_pose, 2), self._problem.result(maze_pose, 3)]
        for new_state in tmp:
            if new_state.x == maze_pose.x and new_state.y == maze_pose.y:
                continue
            if new_state not in self._visited:
                self._visited.append(new_state)
            reward = self._get_cost(maze_pose, new_state)
            expanded_nodes.append([(new_state.x, new_state.y), reward])
        return expanded_nodes


'''
Final set of classes to use. As defined by OpenAI gym, none of them needs any parameters in the constructor.
The main method of the wrapper is the function step, which returns the observation, the reward and the done flag:

Observations:
For informed search the observation is in the format: ((current position coords), (finish_1 coords), (finish_2 coords), ...)
For uninformed search it is only (current position coords).

Rewards:
When the agent moves to a different place it gets the reward -1 - depth.
When the agent reaches a finish it gets the reward +100.
If an unavailable action is called, the agent stays in the same position and the reward is 0.

Done:
True when the agent reaches the finish.

The input (parameter) of the step method is defined by the action space:
Easy maze action space is a list [x_coordinate, y_coordinate].
Hard maze action space is an integer from 0 to 3.

A runnable usage sketch is included at the end of this file.
'''


class InfEasyMaze(EasyMazeEnv):
    '''
    Informed easy maze, suitable for an A* implementation.
    step([x, y])
    '''
    def __init__(self, map_image=None, grad=(0, 0)):
        super(InfEasyMaze, self).__init__(True, map_image, grad)


class EasyMaze(EasyMazeEnv):
    '''
    Uninformed easy maze, suitable for BFS, DFS, ...
    step([x, y])
    '''
    def __init__(self, map_image=None, grad=(0, 0)):
        super(EasyMaze, self).__init__(False, map_image, grad)


class MDPMaze(MazeEnv):
    '''
    Maze for solving MDP problems.
    '''
    def __init__(self, map_image=None, grad=(0, 0), probs=None, node_rewards=None):
        if probs is not None:
            super().__init__(False, True, False, map_image, grad, node_rewards=node_rewards)
            self._problem.set_probs_table(probs[0], probs[1], probs[2], probs[3])
        else:
            super().__init__(False, True, True, map_image, grad)

    def _get_reward(self, curr, last):
        '''
        Returns the reward and an indication of reaching a goal state.
        @param curr: new state
        @param last: last state
        @return: float, boolean
        '''
        reward = self._problem.get_state_reward(curr)
        done = False
        vector = [curr.x - last.x, curr.y - last.y]
        z_axis = vector[0] * self._grad[0] + vector[1] * self._grad[1]
        if curr != last:
            reward = z_axis + self._problem.get_state_reward(curr)
        if self._problem.is_goal_state(curr):
            reward = self._problem.get_state_reward(curr)
            done = True
            if self._gym_compatible:
                self._player.set_path(self._path)
                self._player.find_path()
        return reward, done


class HardMaze(MazeEnv):
    '''
    Uninformed hard maze, suitable for reinforcement learning.
    step(param) where param is an integer; 0 <= param <= 3
    '''
    def __init__(self, map_image=None, grad=(0, 0), probs=None, node_rewards=None):
        if probs is not None:
            super(HardMaze, self).__init__(False, True, False, map_image, grad, node_rewards=node_rewards)
            self._problem.set_probs(probs[0], probs[1], probs[2], probs[3])
        else:
            super(HardMaze, self).__init__(False, True, True, map_image, grad)


class InfHardMaze(MazeEnv):
    '''
    Informed hard maze, suitable for reinforcement learning.
    step(param) where param is an integer; 0 <= param <= 3
    '''
    def __init__(self, map_image=None, grad=(0, 0), probs=None):
        if probs is not None:
            super(InfHardMaze, self).__init__(True, True, False, map_image, grad)
            self._problem.set_probs(probs[0], probs[1], probs[2], probs[3])
        else:
            super(InfHardMaze, self).__init__(True, True, True, map_image, grad)
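

if __name__ == '__main__':
    # Minimal usage sketch (illustrative only, not part of the wrapper itself): a random
    # walk in an uninformed hard maze on a generated map, rendered step by step.
    import random

    env = HardMaze()                        # map_image=None -> a small maze is generated
    env.reset()
    done = False
    while not done:
        action = random.randint(0, 3)       # hard maze actions are integers 0..3
        obs, reward, done, _ = env.step(action)
        env.render()
    env.save_path()
    env.close()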