"""
Created on 7 May 2020

@author: olinox
"""
import sys
import time
import heapq

# TODO: fall back to a simple pathfinding when the route computation fails
# TODO: plan a fallback for when no path is found

debug = False
t0 = time.time()

LOG_INPUT = (0 and __file__[-8:] != 's2020.py')
HIJACK_INPUT = (1 and __file__[-8:] == 's2020.py')

# Replay data consumed by l_input() when HIJACK_INPUT is set: a list of
# batches, each batch being a list of input lines.
_force_input = []
if HIJACK_INPUT:
    # NOTE(review): the recorded batches that were assigned here are truncated
    # in this file; restore them before relying on input replay.
    _force_input = []

# Lines read since the last reset; the main loop logs them when LOG_INPUT is set.
INPUTS = []


def l_input(*args):
    """Return the next input line and record it in ``INPUTS``.

    Reads from ``_force_input`` when ``HIJACK_INPUT`` is truthy (exiting the
    process once the replay data is exhausted), from stdin otherwise.
    """
    if HIJACK_INPUT:
        if not _force_input:
            print('no more hijack')
            sys.exit()
        inp = _force_input[0].pop(0)
        # drop a batch as soon as its last line has been served
        if len(_force_input[0]) == 0:
            del _force_input[0]
    else:
        inp = input()
    INPUTS.append(inp)
    return inp


def log(*msg):
    """Print ``msg`` on stderr, prefixed with the elapsed time, when ``debug`` is set."""
    if debug:
        print("{} - ".format(str(time.time() - t0)[1:5]), *msg, file=sys.stderr)
        sys.stderr.flush()


# OWNER
ME = 0
OPPONENT = 1


# Base classes
class Base():
    """Root class giving every object a ``repr`` that dumps its attributes."""

    def __repr__(self):
        return f"<{self.__class__.__name__}: {self.__dict__}>"


class Queue(Base):
    """Priority queue backed by ``heapq``; the lowest priority comes out first."""

    def __init__(self, inv=False):
        # ``inv`` is accepted for compatibility with existing callers; it is not used.
        self.items = []

    def __bool__(self):
        return bool(self.items)

    def __repr__(self):
        return str(self.items)

    def values(self):
        """Iterate over the queued items (in heap order, not sorted order)."""
        return (v for _, v in self.items)

    def put(self, item, priority):
        """Queue ``item``; on equal priorities the heap compares the items themselves."""
        heapq.heappush(self.items, (priority, item))

    def fput(self, item, priority):
        """Queue ``item`` at the first free priority >= ``priority``.

        Keeping priorities unique means the heap never has to compare two items.
        """
        taken = {p for p, _ in self.items}
        while priority in taken:
            priority += 1
        self.put(item, priority)

    def get(self):
        """Pop and return the item with the lowest priority."""
        return heapq.heappop(self.items)[1]

    def get_items(self):
        """Pop and return the ``(priority, item)`` pair with the lowest priority."""
        return heapq.heappop(self.items)


class Node(Base):
    """A position together with the path that leads to it."""

    def __init__(self, pos, path=None):
        self.pos = pos
        self.path = path or []


class PathNode(tuple):
    """An ``(x, y)`` tuple carrying a ``parent`` link and an accumulated ``cost``."""

    def __new__(cls, x, y, parent=None):
        n = tuple.__new__(cls, (x, y))
        n.parent = parent
        n.cost = 0
        return n

    def __repr__(self):
        return f"<{self[0]}, {self[1]}, c:{self.cost}>"


class RiskNode(tuple):
    """An ``(x, y)`` tuple carrying a ``parent`` link and a ``proba`` value."""

    def __new__(cls, x, y, parent=None):
        n = tuple.__new__(cls, (x, y))
        n.parent = parent
        n.proba = 0
        return n

    def __repr__(self):
        return f"<{self[0]}, {self[1]}, p:{self.proba}>"


class RouteNode():
    """A step of a route: a segment, the step it comes from, and running totals."""

    def __init__(self, segment, parent=None):
        self.segment = segment
        self.parent = parent
        self.value = 0
        self.cost = 0
        self.visited = []

    def __repr__(self):
        return f"<{self.segment}, c:{self.cost}, v:{self.value}>"


# Script classes
class Player():
    """A player: identifier, score and list of pacs."""

    def __init__(self, id_):
        self.id_ = id_
        self.score = 0
        self.pacs = []


UNKNOWN_CELL = "?"
WALL_CELL = "#"
FLOOR_CELL = " "


class Cell(Base):
    """A grid cell: position, type, remembered pellet value and occupation status."""

    def __init__(self, x, y, type_=UNKNOWN_CELL):
        self.x = x
        self.y = y
        self.type_ = type_
        # every cell is assumed to hold at least one pellet at the start
        self._value = 1
        self.status = UNOCCUPIED
        self.neighbors = []  # only movable
        self.seen = False

    @property
    def pos(self):
        return (self.x, self.y)

    @property
    def unknown(self):
        """True for an unknown-type cell, or a floor cell whose value was never set."""
        return self.type_ == UNKNOWN_CELL or (self.is_floor and not self.seen)

    @property
    def is_floor(self):
        return self.type_ == FLOOR_CELL

    @property
    def movable(self):
        return self.is_floor

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, value):
        # assigning a value also flags the cell as seen
        self._value = value
        self.seen = True

    def print_(self):
        """Return the one-character display of the cell."""
        if self.unknown:
            return "?"
        elif self.movable:
            return {0: " ", 1: "*", 10: "$"}[self.value]
        else:
            return WALL_CELL


class Segment(Base):
    """A run of cells handled as one unit by the route finding."""

    REPR = "?"
    uid = 0

    def __init__(self, cells):
        self.id_ = Segment.new_uid()
        self._cells = []
        self.cells = cells  # clockwise
        self.neighbors = []
        # overrides the status just computed by the ``cells`` setter
        self.status = UNOCCUPIED

    @property
    def cells(self):
        return self._cells

    @cells.setter
    def cells(self, cells):
        # keep the derived attributes in sync with the cell list
        self._cells = cells
        self.coords = [c.pos for c in cells]
        self.length = len(cells)
        self.update()

    def update_status(self):
        """Set the status to the highest status among the cells (UNOCCUPIED if empty)."""
        self.status = max((c.status for c in self.cells), default=UNOCCUPIED)

    def update(self):
        """Recompute the value (cells that are not TARGETTED) and the status."""
        self.value = sum(c.value for c in self.cells if not c.status == TARGETTED)
        self.update_status()

    def print_(self):
        return self.REPR

    @classmethod
    def new_uid(cls):
        """Return a new identifier from the class-level counter."""
        cls.uid += 1
        return cls.uid

    def __repr__(self):
        return f"<{self.__class__.__name__} {self.id_}: {'-'.join([str(c.pos) for c in self.cells])}>"

    def go_trough(self, from_):
        """Return the segment coordinates walked from the end located at ``from_``.

        Raises ``ValueError`` when ``from_`` is not one of the two ends.
        """
        ifrom = self.coords.index(from_)
        if ifrom == 0:
            return list(self.coords)
        elif ifrom == (len(self.coords) - 1):
            return self.coords[::-1]
        else:
            raise ValueError(f"{from_} not in segment")


class StartingPos(Segment):
    """Single-cell segment holding a pac."""

    REPR = "O"

    def __init__(self, cell, pac_id):
        super().__init__([cell])
        self.pac_id = pac_id

    def cell(self):
        return self.cells[0]


class Intersection(Segment):
    """Single-cell segment built from one cell (see ``Grid.segmentize`` for the criteria)."""

    REPR = "+"

    def __init__(self, cell):
        super().__init__([cell])

    def cell(self):
        return self.cells[0]


class Corridor(Segment):
    REPR = "-"


class LoopCorridor(Corridor):
    REPR = "~"


class Deadend(Segment):
    REPR = "."
class Grid(Base):
    """The game board: cells, pacs, pellets, segments and the searches run on them.

    Several methods read the module-level ``me`` and ``opponent`` players.
    """

    MAX_ITS = 360  # route-finding iteration budget shared by all pacs during a turn

    def __init__(self, width, height):
        self.width = width
        self.height = height
        self.cells = {}
        self.pacs = {}
        self.pellets = {}
        self.risk = {}
        self.segments = []
        self.owned_pacs = []
        self.opponent_pacs = []
        self.segment_index = {}
        self.pacs_index = {}
        self.path_cache = {}
        self.consumed_its = 0

    @property
    def grid(self):
        """Rows of one-character cell displays, top row first."""
        return [[self.cells[(x, y)].print_() for x in range(self.width)] for y in range(self.height)]

    def print_(self):
        """Return the board as a multi-line string (with a leading newline)."""
        return "\n" + "\n".join(["".join([c for c in row]) for row in self.grid])

    def __getitem__(self, key):
        return self.cells[key]

    @classmethod
    def from_matrix(cls, matrix):
        """Build a grid from rows of cell-type characters and link movable neighbours."""
        width = len(matrix[0])
        height = len(matrix)
        new_grid = cls(width, height)
        new_grid.cells = {(x, y): Cell(x, y, v) for y, r in enumerate(matrix) for x, v in enumerate(r)}

        for pos, cell in new_grid.cells.items():
            cell.neighbors = []
            for n in new_grid.neighbors(*pos):
                try:
                    if new_grid.cells[n].movable:
                        cell.neighbors.append(n)
                except KeyError:
                    log(f'/!\\ {n} not in cells')
        return new_grid

    def segmentize(self):
        """Rebuild ``segments`` and ``segment_index`` from the cells and ``pacs_index``.

        Pivots (cells holding a pac, movable cells with 3+ movable neighbours)
        become single-cell segments; the runs of cells leaving each pivot
        become corridors, loop corridors or dead ends.
        """
        self.segments = []
        self.segment_index = {}
        segments = []

        # intersections and starting points
        pivots = []
        pivots_coords = set()
        for pos, cell in self.cells.items():
            if pos in self.pacs_index:
                segment = StartingPos(cell, self.pacs_index[pos].id)
            elif cell.movable and len(cell.neighbors) >= 3:
                segment = Intersection(cell)
            else:
                continue
            segments.append(segment)
            self.segment_index[pos] = segment
            pivots.append(segment)
            pivots_coords.add(pos)

        # corridors and deadends
        for pivot in pivots:
            pivot_cell = pivot.cell()
            for n in pivot_cell.neighbors:
                if n in self.segment_index:
                    continue
                nc = self.cells[n]
                buffer = []
                walked = set()  # positions of the cells already in ``buffer``
                while True:
                    buffer.append(nc)
                    walked.add(nc.pos)
                    if nc.pos in pivots_coords:
                        # reached another pivot: the corridor stops just before it
                        segment = Corridor(buffer[:-1])
                        break
                    elif len(nc.neighbors) == 1:
                        segment = Deadend(list(buffer))
                        break
                    elif len(nc.neighbors) == 2:
                        nextnc = next((self.cells[p] for p in nc.neighbors
                                       if p not in walked and p != pivot_cell.pos), None)
                        if nextnc is None:
                            # it is a loop: we are back at the starting pivot
                            segment = LoopCorridor(list(buffer))
                            break
                        nc = nextnc
                    else:
                        # not expected; close the segment here so the walk terminates
                        log('what happened??')
                        segment = Deadend(list(buffer))
                        break

                for c in segment.cells:
                    self.segment_index[c.pos] = segment
                segments.append(segment)

        # connect
        for pivot in pivots:
            pivot_cell = pivot.cell()
            for n in pivot_cell.neighbors:
                pivot.neighbors.append(n)
                neighbor_seg = self.segment_index[n]
                if type(neighbor_seg) not in (Intersection, StartingPos):
                    neighbor_seg.neighbors.append(pivot_cell.pos)

        self.segments = segments
        for s in self.segments:
            s.update()

    def segment_neighbors(self, seg):
        """Return the segments located at the neighbour positions of ``seg``."""
        return [self.segment_index[n] for n in seg.neighbors]

    def print_segments(self):
        """Return the board as text, one character per segment type ('#' elsewhere)."""
        rows = []
        for y in range(self.height):
            row = []
            for x in range(self.width):
                seg = self.segment_index.get((x, y))
                row.append(seg.print_() if seg is not None else "#")
            rows.append(row)
        return "\n".join(["".join([c for c in row]) for row in rows])

    def update(self, visible_pacs, visible_pellets):
        """Refresh pacs, threats, pellets and cell values from one turn of input.

        :param visible_pacs: tuples (pac_id, is_mine, x, y, type_id,
            speed_turns_left, ability_cooldown)
        :param visible_pellets: tuples (x, y, value)
        """
        self.consumed_its = 0
        for c in self.cells.values():
            c.status = UNOCCUPIED

        # update pacs
        self.pacs_index = {}
        me.pacs = []
        for a in visible_pacs:
            pac_id, is_mine, x, y, type_id, speed_turns_left, ability_cooldown = a

            # opponent pacs get negative ids so they never collide with ours
            if is_mine:
                id_ = pac_id
            else:
                id_ = -1 * (pac_id + 1)

            if id_ not in self.pacs:
                pac = Pac(id_, is_mine)
                self.pacs[id_] = pac
            else:
                pac = self.pacs[id_]
            pac.update(x, y, type_id, speed_turns_left, ability_cooldown)

            self.cells[(x, y)].status = ALLY_HERE if is_mine else ENNEMY_HERE
            self.pacs_index[(x, y)] = pac

            if is_mine:
                me.pacs.append(pac)
            elif pac not in opponent.pacs:
                opponent.pacs.append(pac)

        # update threats
        for pac in me.pacs:
            for ennemy in opponent.pacs:
                dist = 2 if not pac.is_accelerated else 3
                scope = self.scope(ennemy.pos, dist)
                if pac.pos in scope and pac.lose_against(ennemy):
                    pac.threaten_by = ennemy.id

        # special: if a 10-cell still exists, we will know
        # NOTE(review): the value setter also flags every such cell as seen.
        for c in self.cells.values():
            if c.value:
                c.value = 1

        # update pellets
        self.pellets = {}
        for a in visible_pellets:
            x, y, v = a
            self.pellets[(x, y)] = Pellet(x, y, v)
            if v == 10:  # always visible
                self.cells[(x, y)].value = v

        # update cells values: a cell in sight without a listed pellet is empty
        in_sight = set()
        for pac in me.pacs:
            in_sight |= set(self.line_of_sight(pac.pos))

        for pos in in_sight:
            val = self.pellets[pos].value if pos in self.pellets else 0
            self.cells[pos].value = val

    @staticmethod
    def manhattan(from_, to_):
        """Return the Manhattan distance between two positions (no wrap-around)."""
        xa, ya = from_
        xb, yb = to_
        return abs(xa - xb) + abs(ya - yb)

    def neighbors(self, x, y, diags=False):
        """Return the 4 orthogonal neighbours; x wraps around, rows outside the grid are skipped.

        ``diags`` is accepted but not used.
        """
        neighs = []
        for nx, ny in [(x, y - 1), (x - 1, y), (x + 1, y), (x, y + 1)]:
            if not 0 <= ny < self.height:
                continue
            if nx == -1:
                nx = self.width - 1
            if nx == self.width:
                nx = 0
            neighs.append((nx, ny))
        return neighs

    @staticmethod
    def closest(from_, in_):
        """Return the position of ``in_`` with the smallest Manhattan distance to ``from_``."""
        return min(in_, key=lambda x: Grid.manhattan(from_, x))

    def line_of_sight(self, from_):
        """Return the floor cells visible from ``from_`` along its row and column.

        The row scan wraps around horizontally; the column scan stops at the
        first or last row.
        """
        x0, y0 = from_
        sight = [(x0, y0)]

        x = x0 + 1
        while 1:
            if x == self.width:
                x = 0
            if x == x0:
                break  # full turn
            if not self.cells[(x, y0)].is_floor:
                break
            sight.append((x, y0))
            x += 1

        x = x0 - 1
        while 1:
            if x == -1:
                x = self.width - 1
            if x == x0:
                break  # full turn
            if not self.cells[(x, y0)].is_floor:
                break
            sight.append((x, y0))
            x -= 1

        y = y0 + 1
        while y < self.height and self.cells[(x0, y)].is_floor:
            sight.append((x0, y))
            y += 1

        y = y0 - 1
        while y >= 0 and self.cells[(x0, y)].is_floor:
            sight.append((x0, y))
            y -= 1

        return sight

    def scope(self, start, dist=1):
        """Movement zone of a pac: floor cells reachable from ``start`` in ``dist`` steps."""
        scope = {start}
        for _ in range(dist):
            new_scope = set()
            for p in scope:
                new_scope |= {n for n in self.neighbors(*p) if self.cells[n].is_floor}
            scope |= new_scope
        return list(scope)

    def path(self, start, target):
        """Best-first search from ``start`` to ``target`` over movable cells.

        Returns the list of nodes after ``start`` up to ``target``, or ``None``
        when no path is found within the iteration limit.
        """
        nodes = Queue()
        its, break_on = 0, 2000
        nodes.put(PathNode(*start), 0)

        while nodes:
            current = nodes.get()

            if current == target:
                path = []
                previous = current
                while previous is not None:
                    if previous != start:
                        path.append(previous)
                    previous = previous.parent
                path.reverse()
                return path

            for x, y in self.neighbors(*current):
                its += 1
                if its > break_on:
                    log(" pathfinding broken")
                    return None

                if (x, y) == current.parent:
                    continue

                cell = self.cells[(x, y)]
                if not cell.movable:
                    continue

                # every step costs 1 (cells expose no other cost information)
                cost = current.cost + 1
                priority = cost + 10 * Grid.manhattan((x, y), target)

                node = PathNode(x, y, current)
                node.cost = cost
                nodes.put(node, priority)

        return None

    @staticmethod
    def _risk_weight(cell, coming_from):
        """Weight of ``cell`` as a next move: 4 x its value + 1, or 1 when going back."""
        return (4 * cell.value if (cell.x, cell.y) != coming_from else 0) + 1

    def eval_risk(self):
        """Estimate, for rounds 0 to 7, how likely each cell is to hold an opponent pac.

        Fills ``self.risk`` as {round: {(x, y): proba}}, keeping the highest
        value computed for a cell within a round.
        """
        # eval the next opponent moves
        self.risk = {}

        for pac in opponent.pacs:
            current = RiskNode(pac.x, pac.y)
            current.proba = 100
            nodes = {0: [current]}

            for round_ in range(1, 8):
                nodes[round_] = []
                for parent in nodes[round_ - 1]:
                    neighbors = self.neighbors(*parent)
                    candidates_cells = [self.cells[n] for n in neighbors if self.cells[n].movable]
                    coming_from = tuple(parent.parent) if parent.parent else None

                    # the x4/+1 leaves some probability to cells without pellet:
                    # it gives 80% to a cell worth 1 and 20% to a cell worth 0
                    total_val = sum(self._risk_weight(cell, coming_from) for cell in candidates_cells)

                    for cell in candidates_cells:
                        node = RiskNode(cell.x, cell.y)
                        node.parent = parent
                        proba = (100 * self._risk_weight(cell, coming_from)) / total_val
                        node.proba = round((proba * parent.proba) / 100)
                        nodes[round_].append(node)

            for round_, round_nodes in nodes.items():
                round_risk = self.risk.setdefault(round_, {})
                for node in round_nodes:
                    x, y = node
                    if (x, y) not in round_risk or node.proba > round_risk[(x, y)]:
                        round_risk[(x, y)] = node.proba

    def print_risk(self, round_):
        """Return the risk map of ``round_`` as text, three characters per cell."""
        if round_ not in self.risk:
            return "no data to show"

        grid = []
        for y in range(self.height):
            row = []
            for x in range(self.width):
                if (x, y) in self.risk[round_]:
                    c = str(self.risk[round_][(x, y)])
                    if len(c) == 1:
                        c = f".{c}."
                    elif len(c) == 2:
                        c = f".{c}"
                elif not self.cells[(x, y)].movable:
                    c = "###"
                else:
                    c = "..."
                row.append(c)
            grid.append(row)

        res = "\n".join(["".join([c for c in row]) for row in grid])
        return f"ROUND {round_}\n" + res

    def route_finding(self, start):
        """Search a route (list of segments) from the segment containing ``start``.

        Routes are explored by increasing (cost - value); a route whose cost
        reaches the target cost is set aside in a reserve and the best
        reserved route wins. Returns ``None`` or ``[]`` when nothing usable
        was found.
        """
        if self.consumed_its > self.MAX_ITS:
            log('max its already consumed')
            return None

        nodes = Queue(inv=True)
        targeted_cost = 18
        reserve = Queue()
        its, break_on = 0, 80
        broken = False

        starting_seg = self.segment_index[start]

        origin = RouteNode(starting_seg)
        origin.visited = [starting_seg.id_]
        origin.value = 0
        origin.cost = 1
        nodes.put(origin, 0)

        current = None

        while not broken and nodes:
            priority, current = nodes.get_items()

            if current.cost >= targeted_cost:
                reserve.fput(current, priority)
                continue

            for seg in self.segment_neighbors(current.segment):
                its += 1
                if its >= break_on or (its + self.consumed_its) > self.MAX_ITS:
                    broken = True
                    break

                if seg.status > ALLY_HERE and seg is not starting_seg:
                    continue
                if seg.status == ALLY_HERE and not current.cost:
                    continue

                # a segment only pays off the first time the route crosses it
                value = seg.value if seg.id_ not in current.visited else 0
                cost = seg.length

                half_turn = (current.parent is not None
                             and current.parent.segment.id_ == seg.id_
                             and type(seg) is not LoopCorridor)
                if half_turn:
                    cost *= 2

                node = RouteNode(seg, current)
                node.value = current.value + value
                node.cost = current.cost + cost
                node.visited = list(current.visited) + [seg.id_]

                priority = 100 * (node.cost - node.value)
                nodes.fput(node, priority)

        self.consumed_its += its

        if broken:
            # loop was interrupted
            if reserve:
                # some routes were put in the reserve
                log('> routefinding interrupted')
                target = reserve.get()
            elif nodes:
                # no route has been fully explored
                log('> routefinding interrupted without reserve')
                target = nodes.get()
                while target.value == 0:
                    if not nodes:
                        log('not a single valued node to target... :(')
                        return None
                    target = nodes.get()
            else:
                # something went wrong
                log(' routefinding broken')
                return []
        else:
            if reserve:
                # all paths were explored
                target = reserve.get()
            else:
                # something went wrong
                log(' routefinding broken')
                return []

        route = []
        previous = target
        while previous is not None:
            if previous.segment.id_ != starting_seg.id_:
                route.append(previous)
            previous = previous.parent
        route.reverse()

        return [node.segment for node in route]

    def route_to_path(self, route, from_):
        """Convert a route (segments) into the list of positions to walk from ``from_``.

        Returns the path built so far when a segment cannot be attached.
        """
        path = []
        cur_pos = from_
        for seg in route:
            part = []

            # is it a half-turn?
            if cur_pos in seg.coords:
                # walk the last segment backwards before going on
                part = seg.go_trough(cur_pos)[1:]
            else:
                # enter the segment by whichever of its ends is next to us
                for n in self.neighbors(*cur_pos):
                    try:
                        part = seg.go_trough(n)
                        break
                    except ValueError:
                        pass

            if not part:
                log("error: error while rebuilding the path")
                return path

            path += part
            cur_pos = path[-1]

        return path

    def cache_path(self, pac_id, path):
        """Store ``path`` for ``pac_id`` together with its current total value."""
        self.path_cache[pac_id] = (path, sum(self.cells[c].value for c in path))

    def has_cached_path(self, pac_id):
        return pac_id in self.path_cache

    def get_cached_path(self, pac_id):
        """Return the cached path if its total value did not change.

        Returns ``None`` when nothing is cached, the path is empty, its value
        changed, or another of our pacs stands on it.
        """
        if pac_id not in self.path_cache:
            return None

        path, val = self.path_cache[pac_id]
        if not path:
            return None

        cur_val = sum(self.cells[c].value for c in path)
        if val != cur_val:
            return None

        if any(other.pos in path for other in me.pacs if other.id != pac_id):
            return None

        return path

    def record_move(self, pac, new_pos):
        """Move ``pac`` to ``new_pos`` in the indexes and update cell/segment statuses."""
        del self.pacs_index[pac.pos]
        self.pacs_index[new_pos] = pac
        self.cells[pac.pos].status = UNOCCUPIED
        self.cells[new_pos].status = ALLY_HERE
        self.segment_index[pac.pos].status = UNOCCUPIED
        self.segment_index[new_pos].status = ALLY_HERE
        pac.pos = new_pos


SPEED_DURATION = 5
ABILITY_COOLDOWN = 10

ROCK = "ROCK"
PAPER = "PAPER"
SCISSORS = "SCISSORS"

_COMBOS = ((ROCK, SCISSORS), (SCISSORS, PAPER), (PAPER, ROCK))
WIN_AGAINST = {t1: t2 for t1, t2 in _COMBOS}
LOSE_AGAINST = {t2: t1 for t1, t2 in _COMBOS}

# Cell occupation statuses
UNOCCUPIED = 0
TARGETTED = 5
ALLY_HERE = 10
ENNEMY_HERE = 20
PREDATOR_HERE = 21
HARMLESS_HERE = 22
PREY_HERE = 23


class Pac(Base):
    """A pac: identity, owner, position, type, ability state and pending command."""

    def __init__(self, pac_id, mine):
        self.id = pac_id
        self.owner = ME if mine else OPPONENT
        self.type = None
        self.speed_cooldown = 0
        self.speed_turns_left = 0
        self.ability_cooldown = 0
        self.threaten_by = None
        self.action_cmd = ""

    def update(self, x, y, type_id, speed_turns_left, ability_cooldown):
        """Load this turn's state and reset the pending command and threat."""
        self.x = x
        self.y = y
        self.type = type_id
        self.speed_turns_left = speed_turns_left
        self.ability_cooldown = ability_cooldown
        self.action_cmd = ""
        self.threaten_by = None

    @property
    def pos(self):
        return (self.x, self.y)

    @pos.setter
    def pos(self, pos):
        self.x, self.y = pos

    @property
    def owned(self):
        return self.owner == ME

    def can_accelerate(self):
        return self.ability_cooldown == 0

    def can_switch(self):
        return self.ability_cooldown == 0

    @property
    def is_accelerated(self):
        return self.speed_turns_left > 0

    def win_against(self, other_pac):
        """True when this pac's type beats ``other_pac``'s; raises for an owned pac."""
        if other_pac.owned:
            raise Exception("Why would you fight against your friend?")
        # .get: a type outside rock/paper/scissors beats nothing
        return WIN_AGAINST.get(self.type) == other_pac.type

    def lose_against(self, other_pac):
        """True when ``other_pac``'s type beats this pac's; raises for an owned pac."""
        if other_pac.owned:
            raise Exception("Why would tou fight against your friend?")
        return LOSE_AGAINST.get(self.type) == other_pac.type

    def move(self, x, y):
        self.action_cmd = f"MOVE {self.id} {x} {y}"

    def speed_up(self):
        """Issue a SPEED command; raises while the ability cooldown is running."""
        if self.ability_cooldown > 0:
            raise Exception("Speed cooldown still run")
        self.ability_cooldown = ABILITY_COOLDOWN
        self.speed_turns_left = SPEED_DURATION
        self.action_cmd = f"SPEED {self.id}"

    def switch(self, new_type):
        """Issue a SWITCH command to ``new_type``; raises if it is already the type."""
        if new_type == self.type:
            raise Exception(f'has already the type {new_type}')
        self.type = new_type
        self.ability_cooldown = ABILITY_COOLDOWN
        self.action_cmd = f"SWITCH {self.id} {new_type}"

    def switch_to_beat_up(self, other_pac):
        """Switch to the type that wins against ``other_pac``."""
        new_type = LOSE_AGAINST[other_pac.type]
        self.switch(new_type)

    def wait(self):
        """Stay in place (a MOVE to the current position)."""
        self.action_cmd = f"MOVE {self.id} {self.x} {self.y}"


class Pellet(Base):
    """A pellet: position and value."""

    def __init__(self, x, y, value=1):
        self.x = x
        self.y = y
        self.value = value


# Main loop
ROUND = 0

me = Player(ME)
opponent = Player(OPPONENT)

if __name__ == "__main__":
    # the first line holds "width height"; the width is derived from the rows
    _, height = [int(i) for i in l_input().split()]
    grid = Grid.from_matrix([list(l_input()) for _ in range(height)])

    while 1:
        log(f"--- ROUND {ROUND} ---")

        me.score, opponent.score = [int(i) for i in l_input().split()]

        visible_pacs = []
        visible_pac_count = int(l_input())  # all your pacs and enemy pacs in sight
        for _ in range(visible_pac_count):
            pac_id, is_mine, x, y, type_id, speed_turns_left, ability_cooldown = l_input().split()
            visible_pacs.append(
                (int(pac_id), is_mine != "0", int(x), int(y), type_id,
                 int(speed_turns_left), int(ability_cooldown)))

        visible_pellets = []
        visible_pellet_count = int(l_input())  # all pellets in sight
        for _ in range(visible_pellet_count):
            x, y, v = [int(j) for j in l_input().split()]
            visible_pellets.append((x, y, v))

        log('> input fetched')
        grid.update(visible_pacs, visible_pellets)
        log('> grid updated')
        log(grid.print_())

        if LOG_INPUT:
            log(INPUTS)

        for pac in me.pacs:
            # segments are rebuilt for each pac because earlier moves changed the board
            grid.segmentize()
            if debug:
                log(grid.print_segments())

            log(f'# Pac {pac.id}')

            cached_path = grid.get_cached_path(pac.id)

            if pac.threaten_by:
                log(' Threaten by: ', pac.threaten_by)

            if cached_path and len(cached_path) > 3 and pac.can_accelerate() and not pac.threaten_by:
                pac.speed_up()
                continue

            if pac.threaten_by and pac.can_switch():
                switch_to = next(
                    type_ for type_, against in WIN_AGAINST.items()
                    if against == grid.pacs[pac.threaten_by].type)
                log(f'Switch to resist! >> {switch_to}')
                pac.switch(switch_to)
                continue

            if cached_path is None:
                log('routefinding')
                route = grid.route_finding(pac.pos)
                if route:
                    path = grid.route_to_path(route, pac.pos)
                    grid.cache_path(pac.id, path)
                    log('> done')
                else:
                    path = []
            else:
                path = cached_path
                log('use cached path')

            for pos in path[:2]:
                grid.cells[pos].status = TARGETTED

            # log(f"path {path[:10]}...")
            if path:
                if pac.is_accelerated and len(path) > 1:
                    # two cells are crossed this turn: consume both from the cached path
                    next_ = path[1]
                    del path[:2]
                else:
                    next_ = path.pop(0)
            else:
                log('error: no path given')
                next_ = next((n for n in grid.neighbors(pac.x, pac.y) if grid.cells[n].movable), None)

            if next_:
                pac.move(*next_)
                grid.record_move(pac, next_)
                continue

            log("no action given, wait")
            pac.wait()

        print("|".join([pac.action_cmd for pac in me.pacs]))
        INPUTS = []
        ROUND += 1