|
@@ -2,8 +2,9 @@
|
|
|
Implements the A* algorithm
|
|
Implements the A* algorithm
|
|
|
|
|
|
|
|
usage:
|
|
usage:
|
|
|
|
|
+
|
|
|
grid = SquareGrid(30, 30)
|
|
grid = SquareGrid(30, 30)
|
|
|
- p = path(grid, (1, 6), (3, 9))
|
|
|
|
|
|
|
+ p = Pathfinder.a_star(grid, (1, 6), (3, 9))
|
|
|
>> [(2, 7), (3, 8), (3, 9), (3, 9)]
|
|
>> [(2, 7), (3, 8), (3, 9), (3, 9)]
|
|
|
|
|
|
|
|
* 'grid': Grid object
|
|
* 'grid': Grid object
|
|
@@ -14,6 +15,9 @@
|
|
|
'''
|
|
'''
|
|
|
import heapq
|
|
import heapq
|
|
|
|
|
|
|
|
|
|
+from pypog.geometry_objects import BaseGeometry
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
class NoPathFound(Exception):
|
|
class NoPathFound(Exception):
|
|
|
pass
|
|
pass
|
|
|
|
|
|
|
@@ -24,70 +28,81 @@ class Node(tuple):
|
|
|
n.cost = 0
|
|
n.cost = 0
|
|
|
return n
|
|
return n
|
|
|
|
|
|
|
|
-def path(grid, origin, target, include_origin):
|
|
|
|
|
|
|
+class NodesHeap():
|
|
|
|
|
+ def __init__(self):
|
|
|
|
|
+ self._lst = []
|
|
|
|
|
+
|
|
|
|
|
+ def push(self, node, priority):
|
|
|
|
|
+ heapq.heappush(self._lst, (priority, node))
|
|
|
|
|
+
|
|
|
|
|
+ def pop(self):
|
|
|
|
|
+ return heapq.heappop(self._lst)[1]
|
|
|
|
|
|
|
|
- # list of checked nodes
|
|
|
|
|
- nodes = []
|
|
|
|
|
|
|
|
|
|
- # starting node, cost is 0 and parent is None
|
|
|
|
|
- origin = Node(*origin)
|
|
|
|
|
|
|
+class Pathfinder():
|
|
|
|
|
|
|
|
- # append 'origin' to nodes, with priority 0
|
|
|
|
|
- heapq.heappush(nodes, (0, origin))
|
|
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def a_star(grid, origin, target):
|
|
|
|
|
|
|
|
- # while there are unchecked nodes , process
|
|
|
|
|
- while nodes:
|
|
|
|
|
|
|
+ BaseGeometry.assertCoordinates(origin, target)
|
|
|
|
|
|
|
|
- # pop the node with the lowest priority (cost) from the list,
|
|
|
|
|
- current = heapq.heappop(nodes)[1]
|
|
|
|
|
|
|
+ # list of checked nodes
|
|
|
|
|
+ nodes = NodesHeap()
|
|
|
|
|
|
|
|
- # early exit
|
|
|
|
|
- if current == target:
|
|
|
|
|
- break
|
|
|
|
|
|
|
+ # starting node, cost is 0 and parent is None
|
|
|
|
|
+ origin = Node(*origin)
|
|
|
|
|
|
|
|
- for x, y in grid.neighbors(*current):
|
|
|
|
|
|
|
+ # append 'origin' to nodes, with priority 0
|
|
|
|
|
+ nodes.push(origin, 0)
|
|
|
|
|
|
|
|
- node = Node(x, y, current)
|
|
|
|
|
|
|
+ # while there remains unchecked nodes , process
|
|
|
|
|
+ while nodes:
|
|
|
|
|
|
|
|
- # get the moving cost to this node
|
|
|
|
|
- movingcost = grid.movingcost(*current, *node)
|
|
|
|
|
- if movingcost < 0:
|
|
|
|
|
- continue
|
|
|
|
|
|
|
+ # pop the node with the lowest priority (cost) from the list,
|
|
|
|
|
+ current = nodes.pop()
|
|
|
|
|
|
|
|
- # cost of the node is the accumulated cost from origin
|
|
|
|
|
- node.cost = current.cost + movingcost
|
|
|
|
|
|
|
+ # early exit
|
|
|
|
|
+ if current == target:
|
|
|
|
|
+ break
|
|
|
|
|
|
|
|
- # check if there is already a node with a lower cost
|
|
|
|
|
- try:
|
|
|
|
|
- index = nodes.index(node)
|
|
|
|
|
- if nodes[index].cost > node.cost:
|
|
|
|
|
- del nodes[index]
|
|
|
|
|
- else:
|
|
|
|
|
|
|
+ for x, y in grid.neighbors(*current):
|
|
|
|
|
+
|
|
|
|
|
+ node = Node(x, y, current)
|
|
|
|
|
+
|
|
|
|
|
+ # get the moving cost to this node
|
|
|
|
|
+ movingcost = grid.movingcost(*current, *node)
|
|
|
|
|
+ if movingcost < 0:
|
|
|
continue
|
|
continue
|
|
|
- except ValueError:
|
|
|
|
|
- pass
|
|
|
|
|
|
|
|
|
|
- # compute the cost of the node
|
|
|
|
|
- priority = node.cost + grid.geometry.manhattan(*node, *target)
|
|
|
|
|
|
|
+ # cost of the node is the accumulated cost from origin
|
|
|
|
|
+ node.cost = current.cost + movingcost
|
|
|
|
|
+
|
|
|
|
|
+ # priority of the node is the sum of its cost and distance to target
|
|
|
|
|
+ # (the lower the better)
|
|
|
|
|
+ priority = node.cost + grid.geometry.manhattan(*node, *target)
|
|
|
|
|
+
|
|
|
|
|
+ # (?) would it be necessary to check if there is already a
|
|
|
|
|
+ # node with same coordinates and a lower priority?
|
|
|
|
|
|
|
|
- # append to the checked nodes list
|
|
|
|
|
- heapq.heappush(nodes, (priority, node))
|
|
|
|
|
- else:
|
|
|
|
|
- # all the reachable nodes hve been checked, no way found to the target
|
|
|
|
|
- raise NoPathFound("no path were found to the targetted location {}".format(target))
|
|
|
|
|
|
|
+ # append to the checked nodes list
|
|
|
|
|
+ nodes.push(node, priority)
|
|
|
|
|
+ else:
|
|
|
|
|
+ # all the reachable nodes hve been checked, no way found to the target
|
|
|
|
|
+ raise NoPathFound("no path were found to the targetted location {}".format(target))
|
|
|
|
|
|
|
|
- # build the result
|
|
|
|
|
- result = [target]
|
|
|
|
|
- while current != origin:
|
|
|
|
|
- result.append(tuple(current))
|
|
|
|
|
- current = current.parent
|
|
|
|
|
- result.reverse()
|
|
|
|
|
|
|
+ # build the result by going back up from target to origin
|
|
|
|
|
+ result = [target]
|
|
|
|
|
+ while current != origin:
|
|
|
|
|
+ result.append(tuple(current))
|
|
|
|
|
+ current = current.parent
|
|
|
|
|
+ result.append(origin)
|
|
|
|
|
+ result.reverse()
|
|
|
|
|
|
|
|
- return result
|
|
|
|
|
|
|
+ return result
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
if __name__ == '__main__':
|
|
|
from pypog.grid_objects import SquareGrid
|
|
from pypog.grid_objects import SquareGrid
|
|
|
grid = SquareGrid(30, 30)
|
|
grid = SquareGrid(30, 30)
|
|
|
- p = path(grid, (1, 6), (3, 9))
|
|
|
|
|
|
|
+ p = Pathfinder.a_star(grid, (1, 6), (3, 9))
|
|
|
print(p)
|
|
print(p)
|
|
|
|
|
|