Quellcode durchsuchen

test new chained pathfinder

olinox14 vor 3 Jahren
Ursprung
Commit
d8c0b36037
2 geänderte Dateien mit 250 neuen und 33 gelöschten Zeilen
  1. 131 33
      cultist_war/main.py
  2. 119 0
      cultist_war/pathfinder.py

+ 131 - 33
cultist_war/main.py

@@ -39,6 +39,18 @@ class PathNode(tuple):
         return f"<{self[0]}, {self[1]}, c:{self.cost}>"
 
 
+class DiscoveryNode(tuple):
+    def __new__(cls, x, y, ancestors=None, matches=None):
+        n = tuple.__new__(cls, (x, y))
+        n.ancestors = ancestors if ancestors is not None else []
+        n.cost = 0
+        n.matches = matches if matches is not None else []
+        return n
+
+    def __repr__(self):
+        return f"<{self[0]}, {self[1]}, c:{self.cost}>"
+
+
 class Queue(BaseClass):
     def __init__(self):
         self.items = []
@@ -201,7 +213,16 @@ class Grid(BaseClass):
     def update_threat_map(self):
         self.threat = {(x, y): 0 for x in range(self.width) for y in range(self.height)}
 
-        for u in self.opponent_cultists():
+        sources = self.opponent_cultists()
+
+        # On ajoute les neutres voisins du leader ennemi aux sources de menace possible
+        opponent_cult_leader = self.opponent_cult_leader()
+        if opponent_cult_leader:
+            for pos in self.neighbors(*opponent_cult_leader.pos):
+                if pos in self.index and self.index[pos].neutral:
+                    sources.append(self.index[pos])
+
+        for u in sources:
             shooting_zone = self.zone(u.pos, Unit.SHOOTING_RANGE)
             for x, y in shooting_zone:
                 dist = shooting_zone[(x, y)]
@@ -210,6 +231,9 @@ class Grid(BaseClass):
                     continue
 
                 threat = Unit.SHOOTING_RANGE + 1 - dist
+                if u.neutral:
+                    threat //= 2
+
                 if threat > self.threat[(x, y)]:
                     self.threat[(x, y)] = threat
 
@@ -259,11 +283,14 @@ class Grid(BaseClass):
     def allied_cultists(self):
         return [u for u in self.units.values() if type(u) is not CultLeader and u.owned]
 
+    def owned_units(self):
+        return [u for u in self.units.values() if u.owned]
+
     def opponent_units(self):
         return [u for u in self.units.values() if u.opponent]
 
     def opponent_cult_leader(self):
-        return [u for u in self.units.values() if type(u) is CultLeader and not u.owned]
+        return next((u for u in self.units.values() if type(u) is CultLeader and not u.owned), None)
 
     def opponent_cultists(self):
         return [u for u in self.units.values() if type(u) is not CultLeader and u.opponent]
@@ -276,25 +303,36 @@ class Grid(BaseClass):
 
         k_convert_neutrals = 10
         k_convert_danger = 30
-        k_shoot_opponent_cultist = 20
-        k_shoot_opponent_cult_leader = 10
-        k0_protect_cult_leader = 10
+        k_shoot_opponent_cultist = 10
+        k_shoot_opponent_cult_leader = 5
+        k0_protect_cult_leader = 30
+        k_protect_threat_level = -5
+        k_cover_threat = 10
+        k_cover_interest = -3
         k0_position = 50
         k_position_distance = 10
-        k_position_heat = -5
+        k_position_heat = -2
         k_position_danger = 5
+        k_position_advantage = 10
 
         cult_leader = self.cult_leader()
+        opponent_cult_leader = self.opponent_cult_leader()
 
-        # Conversion des neutres
-        if cult_leader:
-            results = self.discover(
+        log(self.obstacles)
+        log([n.pos for n in self.neutrals()])
+
+        # compute conversion paths
+        conversion_paths = []
+        if cult_leader and self.neutrals():
+            conversion_paths = self.discover(
                 cult_leader.pos,
                 key=(lambda pos: pos in self.index and self.index[pos].neutral),
-                limit=5
+                limit=min(5, len(self.neutrals()))
             )
 
-            for path, target_pos in results:
+        # Conversion des neutres
+        if cult_leader:
+            for path, target_pos in conversion_paths:
                 target = self.index[target_pos]
 
                 priority = 0
@@ -308,8 +346,14 @@ class Grid(BaseClass):
                 actions.put(priority, action)
 
         # Attaque d'unités ennemies
+        targets = self.opponent_cultists()
+        if opponent_cult_leader:
+            targets.append(opponent_cult_leader)
+
+        advantage = sum([t.hp for t in targets]) < sum([u.hp for u in self.owned_units()])
+
         for a in self.allied_cultists():
-            for u in self.opponent_cultists():
+            for u in targets:
                 shooting_distance = self.shooting_distance(a.pos, u.pos)
                 if shooting_distance and shooting_distance < u.SHOOTING_RANGE:
                     action = ActionShoot(a, u)
@@ -337,7 +381,7 @@ class Grid(BaseClass):
                 priority += k_position_distance * len(path)
                 priority += k_position_danger * sum(self.threat[p] for p in path)
 
-                action = ActionMove(a, path[0], f'pos on {path[0]}')
+                action = ActionMove(a, path[0], f'pos on {target_pos}')
 
                 actions.put(priority, action)
 
@@ -345,15 +389,20 @@ class Grid(BaseClass):
         if cult_leader:
             current_threat = self.threat[cult_leader.pos]
             if current_threat:
-                target = min(
-                    [n for n in self.neighbors(*cult_leader.pos) if self.can_move_on(n)],
-                    key=lambda x: self.threat[x]
-                )
+                covers = [n for n in self.neighbors(*cult_leader.pos) if self.can_move_on(n)]
 
-                action = ActionMove(cult_leader, target)
-                priority = k0_protect_cult_leader
+                for pos in covers:
+                    action = ActionMove(cult_leader, pos, 'take cover')
 
-                actions.put(priority, action)
+                    # interest is the number of conversion paths that start with this pos
+                    interest = len([p for p in conversion_paths if p[0] and p[0][0] == pos])
+
+                    priority = k0_protect_cult_leader
+                    priority += k_protect_threat_level * current_threat
+                    priority += k_cover_threat * self.threat[pos]
+                    priority += k_cover_interest * interest
+
+                    actions.put(priority, action)
 
         return actions
 
@@ -491,7 +540,7 @@ class Grid(BaseClass):
 
         nodes = []
         its, break_on = 0, 2000
-        origin = PathNode(*start)
+        origin = DiscoveryNode(*start)
 
         nodes.append(origin)
 
@@ -503,32 +552,71 @@ class Grid(BaseClass):
             for pos in neighbors:
                 its += 1
                 if its > break_on:
-                    log("<!> discovering ended earlier than expected")
+                    log(f"<!> discovery broke, {len(paths)} results")
                     return paths
 
                 if key(pos):
-                    path = []
-                    previous = current
-                    while previous:
-                        if previous != start:
-                            path.insert(0, previous)
-                        previous = previous.parent
+                    path = current.ancestors[:-1:-1] + [current]  # reverse ancestors after having removed the start
                     paths.append((path, pos))
 
                     if len(paths) >= limit:
                         return paths
                     continue
 
-                if pos == current.parent:
+                if pos in current.ancestors:
                     continue
 
                 if not self.can_move_on(pos):
                     continue
 
-                node = PathNode(*pos, current)
+                node = DiscoveryNode(*pos, current.ancestors + [current])
                 nodes.append(node)
         return paths
 
+    def discover_multiple(self, start, key, limit=5):
+        nodes = Queue()
+        its, break_on = 0, 2000
+        origin = DiscoveryNode(*start)
+
+        nodes.put(0, origin)
+
+        while nodes:
+            current = nodes.get()
+
+            neighbors = self.neighbors(*current)
+
+            for pos in neighbors:
+                its += 1
+                if its > break_on:
+                    log(f"<!> discovery broke")
+                    break
+
+                if pos not in current.matches and key(pos):
+                    current.matches.append(pos)
+                    if len(current.matches) >= limit:
+                        break
+
+                if not self.can_move_on(pos):
+                    continue
+
+                node = DiscoveryNode(
+                    *pos,
+                    current.ancestors + [current],
+                    current.ancestors[-1].matches if current.ancestors else None
+                )
+
+                # on priorise le plus haut ratio matches / longueur de chemin (donc le plus bas inverse, pour la Queue).
+                priority = (10 * (len(node.ancestors) + 1)) // (1 + len(node.matches))
+
+                nodes.put(priority, node)
+
+            if len(current.matches) >= limit or its > break_on:
+                break
+
+        best_node = nodes.get()
+        path = best_node.ancestors[:-1:-1] + [best_node]
+        return path, best_node.matches
+
     def _repr_cell(self, pos):
         # return f"{self.control[pos]}/{self.threat[pos]}"
         # return self.heat_map[pos]
@@ -557,7 +645,15 @@ GRID.obstacles = [(x, y) for y, row in enumerate(obstacles_input) for x, val in
 GRID.pre_compute()
 
 while 1:
-    # TODO: prendre en compte le terrain dans la ligne de visée et les déplacements
+    # TODO : en cas de choix entre plusieurs neutres à convertir, privilégier un neutre se trouvant entre un ennemi et le leader
+    # TODO: Laisser l'algo de découverte des cibles à convertir passer outre les alliés, ajouter une action "dégager le passage" pour ces unités
+    #       (prévoir le cas où cet allié ne peut pas se dégager). Le fait de passer outre un allié doit augmenter le coût de mouvement de 1
+    # TODO : Etre plus prudent dans le positionnement des unités; ne pas s'attaquer à plus fort que soi, et décourager plus fortement l'entrée dans une
+    #        zone de menace
+    # TODO: donner un coup à l'action "ne rien faire", car c'est parfois la meilleure chose à faire...
+    # TODO: ajouter une action "s'interposer" où une unité s'interpose entre le leader et un ennemi ; à mettre en balance avec le 'take cover'
+    # TODO: remplacer le discover par un algo qui cherche le plus court chemin pour relier tous les neutres les uns après les autres
+
     log(f"start round {GRID.round}")
 
     GRID.reinit_round()
@@ -567,8 +663,8 @@ while 1:
 
     actions = GRID.list_actions()
 
-    # for action in actions.items:
-    #     log(f"* {action}")
+    for action in actions.items:
+        log(f"* {action}")
 
     # print("\n" + GRID.graph(), file=sys.stderr)
 
@@ -582,3 +678,5 @@ while 1:
 
     action.exec()
     GRID.round += 1
+
+

+ 119 - 0
cultist_war/pathfinder.py

@@ -0,0 +1,119 @@
+import heapq
+import time
+
+t0 = time.time()
+
+
+class DiscoveryNode(tuple):
+    def __new__(cls, x, y, cost=0, ancestors=None, matches=None):
+        n = tuple.__new__(cls, (x, y))
+        n.cost = cost
+        n.ancestors = ancestors if ancestors is not None else []
+        n.matches = matches if matches is not None else []
+        return n
+
+    def __repr__(self):
+        return f"<{self[0]}, {self[1]}>"
+
+
+class Queue:
+    def __init__(self):
+        self.items = []
+
+    def __bool__(self):
+        return bool(self.items)
+
+    def __repr__(self):
+        return str(self.items)
+
+    def values(self):
+        return (v for _, v in self.items)
+
+    def put(self, priority, item):
+        while priority in [p for p, _ in self.items]:
+            priority += 1
+        heapq.heappush(self.items, (priority, item))
+
+    def get(self):
+        return heapq.heappop(self.items)[1]
+
+    def get_items(self):
+        return heapq.heappop(self.items)
+
+
+def log(*msg):
+    print("{} - ".format(str(time.time() - t0)[:5]), *msg)
+
+
+start = (0, 3)
+obstacles = [(0, 0), (1, 0), (3, 0), (9, 0), (11, 0), (12, 0), (0, 1), (1, 1), (5, 1), (7, 1), (11, 1), (12, 1), (3, 3), (9, 3)]
+neutrals = [(1, 4), (2, 3), (1, 5), (3, 5), (2, 6), (4, 1), (11, 4), (10, 3), (11, 5), (9, 5), (10, 6), (8, 1)]
+
+width = 13
+height = 7
+
+
+def get_neighbors(x, y):
+    return [(xn, yn) for xn, yn in [(x, y - 1), (x - 1, y), (x + 1, y), (x, y + 1)] if
+                                       0 <= xn < width and 0 <= yn < height]
+
+
+def can_move_on(pos):
+    return pos not in obstacles and pos not in neutrals
+
+
+def discover_multiple(start, key, limit=5):
+    nodes = Queue()
+    its, break_on = 0, 20000
+    origin = DiscoveryNode(*start)
+
+    nodes.put(0, origin)
+
+    while nodes:
+        current = nodes.get()
+
+        neighbors = get_neighbors(*current)
+
+        for pos in neighbors:
+            its += 1
+            if its > break_on:
+                log(f"<!> discovery broke")
+                break
+
+            if not can_move_on(pos):
+                continue
+
+            moving_cost = 1
+
+            matches = []
+            for n in get_neighbors(*pos):
+                if n not in current.matches and key(n):
+                    matches.append(n)
+
+            cost = current.cost + moving_cost
+            priority = 100 * cost - (200 * (len(current.matches) + len(matches)))
+            priority += 10 * len([a for a in current.ancestors if a == pos])  # décourage de revenir à une case visitée
+
+            node = DiscoveryNode(
+                pos[0],
+                pos[1],
+                cost,
+                current.ancestors + [current],
+                current.matches + matches
+            )
+
+            nodes.put(priority, node)
+
+        if len(current.matches) >= limit or its > break_on:
+            break
+
+    best_node = nodes.get()
+    path = best_node.ancestors[1:] + [best_node]
+    return path, best_node.matches
+
+
+log("start")
+
+res = discover_multiple(start, key=lambda x: x in neutrals, limit=6)
+
+log(res)