===== T1x-dstar - Incremental path planning (D* Lite) ======
The main task is to implement an incremental grid-based path planning approach. You will learn how the pre-calculated planning data could be effectively reused to update the path plan in case there are unexpected changes in the map, hence, it is not required to start with the planning from scratch.
|**Due date** | January 11, 2025, 23:59 PST |
|**Deadline** | January 11, 2025, 23:59 PST |
|**Points** | 5 (Bonus points)|
|**Label in BRUTE** | t1x-dstar |
|**Files to submit** | archive with ''HexapodExplorer.py'' file |
|**Resources** | {{ :courses:uir:hw:b4m36uir_24_t1_resource_pack.zip | B4M36UIR_t1_resource_pack}} |
----
===Assignment===
In class ''HexapodExplorer.py'' implement the ''plan_path_incremental'' function. The purpose of the function is to find the collision free path between the start (robot position) and goal position considering 8-neigborhood using D* lite algorithm, according to the pseudocode described in {{courses:uir:lectures:b4m36uir-lec03-slides.pdf| Lecture 3. Path planning}}.
The input parameters of the function are:
- ''grid_map'' - [[courses:uir:hw:support:messages|OccupancyGrid message]] - pre-processed grid map representation (contains 1 and 0 only)
- ''start'' - [[courses:uir:hw:support:messages|Pose message]] - start robot pose
- ''goal'' - [[courses:uir:hw:support:messages|Pose message]] - goal robot pose
The function returns:
- the [[courses:uir:hw:support:messages|Path message]] with the found path or ''None'' when the path cannot be found. It is supposed that the path contains only 2D coordinates and not the heading, i.e., only ''position.x'' and ''position.y'' entries of individual poses in the path are filled-in. The path is returned in the world coordinates!
- ''rhs'' - ''float[]'' values of one-step lookahead objective function $rhs$ for individual map cells in row-major order
- ''g'' - ''float[]'' values of objective function $g$ for individual map cells in row-major order
The ''plan_path_incremental'' function in the ''HexapodExplorer.py'' class has a following prescription with the minimum working code:
def plan_path_incremental(self, grid_map, start, goal):
""" Method for incremental path planning from start to the goal pose on the grid
Args:
grid_map: OccupancyGrid - gridmap for obstacle growing
start: Pose - robot start pose
goal: Pose - robot goal pose
Returns:
path: Path - path between the start and goal Pose on the map
rhs: float[] - one-step lookahead objective function in row-major order
g: float[] - objective function value in row-major order
"""
if not hasattr(self, 'rhs'): #first run of the function
self.rhs = np.full((grid_map.height, grid_map.width), np.inf)
self.g = np.full((grid_map.height, grid_map.width), np.inf)
return self.plan_path(grid_map, start, goal), rhs.flatten(), g.flatten()
**The implementation requirements**
- For implementation of D* lite planner, the class ''HexapodExplorer'' has to keep its own representation of the environment in a form of map that is initialized to proper dimensions during the first call of the ''plan_path_incremental'' method based on the dimensions of the passed ''OccupancyGrid'' map. The dimensions of the map are fixed and won't change.
- The ''plan_path_incremental'' function returns as a debugging output the array of $rhs$ and $g$ values used for proper annotation of the visualized grid map
- The ''plan_path_incremental'' method is called whenever the path is obstructed, which forces an update of the occupancy grid map that is passed to the ''plan_path_incremental'' method as the ''grid_map'' parameter. The planner has to figure out what has been changed in the map and replan accordingly, i.e., it has to figure out the coordinates of the updated cells, update all the affected $rhs$ and $g$ values in its neighborhoods (8-neighborhood is used for the algorithm) and recompute the shortest path
----
=== Approach ===
Follow the pseudocode for the D* lite algorithm described in {{courses:uir:lectures:b4m36uir-lec03-slides.pdf| Lecture 3. Path planning}}, also to be read in the related paper [[https://www.cs.cmu.edu/~maxim/files/dlite_tro05.pdf|Koenig, S. and Likhachev, M. (2005): Fast Replanning for Navigation in Unknown Terrain. T-RO.]]
++++ Detailed description, guidance, and suggestions (Click to view) |
As it is shown above in the prescription of the function, the incremental planning can be achieved by simply replaning each time the map changes and the currently planned path is blocked. However, in this task we want to reuse what has been already calculated and only adjust the current plan to deal with the new observations.
The D* lite stores values of one-step lookahead objective function $rhs$ and objective function $g$ for individual map cells between individual runs of the ''plan_path_incremental'' method. Therefore it is suggested to create the storage variables for these values either as static variables of the method, or during the first run time of the function in the similar way shown above, or during the initialization of the class as:
...
class HexapodExplorer:
def __init__(self):
self.gridmap = None
self.rhs = None
self.g = None
...
Then you have a persistent storage for gridmap data, $rhs$, and $g$ values. Note that storage of gridmap is because of detecting both the addition of the new obstacle, which can be detected as well in case the input ''grid_map'' has ''1'' at the place of non-infinite $rhs$ value, but also subtraction of the obstacle which is much harder to detect using $rhs$ values only.
The next step cope with the pseudocode described in {{courses:uir:lectures:b4m36uir-lec03-slides.pdf| Lecture 3. Path planning}}.
As a suggestion, prepare the following helper functions and then fill them gradually with code. Note, that inside the ''HexapodExplorer'' class we are using simple coordinate tuples to index the gridmap, $rhs$ and $g$ arrays.
def calculate_key(self, coord, goal):
"""method to calculate the priority queue key
Args: coord: (int, int) - cell to calculate key for
goal: (int, int) - goal location
Returns: (float, float) - major and minor key
"""
#think about different heuristics and how they influence the steering of the algorithm
#heuristics = 0
#heuristics = L1 distance
#heuristics = L2 distance
return 0
def compute_shortest_path(self, start, goal):
"""Function to compute the shortest path
Args: start: (int, int) - start position
goal: (int, int) - goal position
"""
#while not finished
#fetch coordinates u from priority queue
#if g value at u > rhs value at u:
#node is over consistent
#else:
# node is under consistent
def update_vertex(self, u, start, goal):
""" Function for map vertex updating
Args: u: (int, int) - currently processed position
start: (int, int) - start position
goal: (int, int) - goal position
"""
pass
def reconstruct_path(self, start, goal):
"""Function to reconstruct the path
Args: start: (int, int) - start position
goal: (int, int) - goal position
Returns: Path - the path
"""
return None
def neighbors8(self, coord):
"""Returns coordinates of passable neighbors of the given cell in 8-neighborhood
Args: coord : (int, int) - map coordinate
Returns: list (int,int) - list of neighbor coordinates
"""
return (0,0)
You will also need a priority queue. A possible implementation of the priority queue follows.
#priority queue class
class PriorityQueue:
def __init__(self):
self.elements = []
def empty(self):
return len(self.elements) == 0
def put(self, item, priority):
heapq.heappush(self.elements, (priority, item))
def get(self):
return heapq.heappop(self.elements)[1]
def top(self):
u = self.elements[0]
return u[1]
def contains(self, element):
ret = False
for item in self.elements:
if element == item[1]:
ret = True
break
return ret
def print_elements(self):
print(self.elements)
return self.elements
def remove(self, element):
i = 0
for item in self.elements:
if element == item[1]:
self.elements[i] = self.elements[-1]
self.elements.pop()
heapq.heapify(self.elements)
break
i += 1
As the most important remark, build your code incrementally and check the results using the visualizations and debug prints. Remember that you are implementing an **optimal** (*) planner!! I.e., you should get an optimal path during each iteration of the algorithm!!
++++
----
=== Example behavior ===
The following videos show the example behavior of the D* lite planner with steering heuristics $h=0$ on the example scenarios.
| {{ :courses:uir:hw:t1x-dstar-v1.mp4 |}} | {{ :courses:uir:hw:t1x-dstar-v2.mp4 |}} | {{ :courses:uir:hw:t1x-dstar-v3.mp4 |}} |
----
=== Evaluation ===
The code can be evaluated using the following script (also attached as ''t1e-expl.py'').
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import matplotlib.pyplot as plt
import sys
import os
import math
import numpy as np
sys.path.append('hexapod_robot')
sys.path.append('hexapod_explorer')
#import hexapod robot and explorer
import HexapodRobot as hexapod
import HexapodExplorer as explorer
#import communication messages
from messages import *
def plot_path(ax, path, clr):
""" simple method to draw the path
"""
if path is not None:
poses = np.asarray([(pose.position.x,pose.position.y) for pose in path.poses])
ax.plot(poses[:,0], poses[:,1], '.',color=clr)
ax.plot(poses[:,0], poses[:,1], '-',color=clr)
def plot_dstar_map(ax, grid_map, rhs_=None, g_=None):
"""method to plot the gridmap with rhs and g values
"""
#plot the gridmap
gridmap.plot(ax)
plt.grid(True, which='major')
rhs = rhs_.reshape(grid_map.height, grid_map.width)
g = g_.reshape(grid_map.height, grid_map.width)
#annotate the gridmap graph with g and rhs values
if g is not None and rhs is not None:
for i in range(0, gridmap.width):
for j in range(0, gridmap.height):
ax.annotate("%.2f" % g[i,j], xy=(i+0.1, j+0.2), color="red")
ax.annotate("%.2f" % rhs[i,j], xy=(i+0.1, j+0.6), color="blue")
def check_collision(gridmap, pose, scenario):
""" method to check whether the robot is in collision with newly discovered obstacle
"""
if to_grid(gridmap, pose) in scenario:
return True
else:
return False
def to_grid(grid_map, pose):
"""method to transform world coordinates to grid coordinates
"""
cell = ((np.asarray((pose.position.x, pose.position.y)) - (grid_map.origin.position.x, grid_map.origin.position.y)) / grid_map.resolution)
return (int(cell[0]), int(cell[1]))
if __name__=="__main__":
#define planning problems:
scenarios = [([(1, 0), (1, 1), (1, 2), (1, 3), (2, 3), (3, 3), (4, 3), (4, 2), (4, 1), (5, 1), (6, 1), (6, 2), (6, 3), (6, 4), (6, 5), (6, 6), (5, 6), (4, 6), (3, 6), (2, 6), (1, 6), (7, 1), (8, 1), (8, 2), (8, 3), (8, 4), (8, 5), (8, 6), (8, 7), (8, 8), (7, 8), (6, 8), (5, 8), (4, 8), (3, 8), (2, 8), (1, 8)]),
([(1, 0), (1, 1), (1, 2), (1, 3), (1, 4), (1, 5), (1, 6), (1, 7), (1, 8), (3, 9), (3, 8), (3, 7), (3, 6), (3, 5), (3, 4), (3, 3), (3, 2), (3, 1), (5, 0), (5, 1), (5, 2), (5, 3), (5, 4), (5, 5), (5, 6), (5, 7), (5, 8), (7, 9), (7, 8), (7, 7), (7, 6), (7, 5), (7, 4), (7, 3), (7, 2), (7, 1), (8, 1)]),
([(1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (7, 1), (8, 1), (6, 1), (8, 2), (8, 3), (8, 4), (7, 4), (7, 5), (7, 6), (9, 6), (8, 6), (8, 7), (8, 8), (7, 8), (6, 8), (5, 8), (4, 8), (3, 8), (2, 8), (1, 8), (1, 7), (1, 6), (1, 4), (1, 3), (1, 2)])]
start = Pose(Vector3(4.5, 4.5, 0),Quaternion(0,0,0,1))
goal = Pose(Vector3(9.5, 5.5, 0),Quaternion(0,0,0,1))
#prepare plot
fig, ax = plt.subplots(figsize=(10,10))
plt.ion()
cnt = 0
#fetch individual scenarios
for scenario in scenarios:
robot_odometry = Odometry()
robot_odometry.pose = start
robot_path = Path()
robot_path.poses.append(start)
#instantiate the explorer robot
explor = explorer.HexapodExplorer()
#instantiate the map
gridmap = OccupancyGrid()
gridmap.resolution = 1
gridmap.width = 10
gridmap.height = 10
gridmap.origin = Pose(Vector3(0,0,0), Quaternion(0,0,0,1))
gridmap.data = np.zeros((gridmap.height*gridmap.width))
while not (robot_odometry.pose.position.x == goal.position.x and
robot_odometry.pose.position.y == goal.position.y):
#plan the route from start to goal
path, rhs, g = explor.plan_path_incremental(gridmap, robot_odometry.pose, goal)
if path == None:
print("Destination unreachable")
break
if len(path.poses) < 1:
print("There is nowhere to go")
break
#check for possibility of the move
if check_collision(gridmap, path.poses[1], scenario):
#add the obstacle into the gridmap
cell = to_grid(gridmap, path.poses[1])
data = gridmap.data.reshape(gridmap.height, gridmap.width)
data[cell[1],cell[0]] = 1
gridmap.data = data.flatten()
else:
#move the robot
robot_odometry.pose = path.poses[1]
robot_path.poses.append(robot_odometry.pose)
#plot it
plt.cla()
plot_dstar_map(ax, gridmap, rhs, g)
plot_path(ax, path, 'r')
plot_path(ax, robot_path, 'b')
robot_odometry.pose.plot(ax)
plt.xlabel('x[m]')
plt.ylabel('y[m]')
plt.axis('square')
#optional save of the image for debugging purposes
#fig.savefig("result/%04d.png" % cnt, bbox_inches='tight')
cnt += 1
plt.show()
plt.pause(0.1)