Olivier Massot 3 лет назад
Родитель
Сommit
51383e54de
3 измененных файлов с 88 добавлено и 55 удалено
  1. 6 13
      src/Message/Handler/ExportHandler.php
  2. 82 0
      src/Service/MercureHub.php
  3. 0 42
      src/Service/MercurePublisher.php

+ 6 - 13
src/Message/Handler/ExportHandler.php

@@ -4,32 +4,25 @@ declare(strict_types=1);
 namespace App\Message\Handler;
 
 use App\Message\Command\Export;
-use App\Service\MercurePublisher;
+use App\Service\MercureHub;
 use App\Service\ServiceIterator\ExporterIterator;
 use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
 use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
+use Symfony\Component\Serializer\Exception\ExceptionInterface;
 
 class ExportHandler implements MessageHandlerInterface
 {
     public function __construct(
         private ExporterIterator $handler,
-        private MercurePublisher $mercurePublisher
+        private MercureHub $mercureHub
     ) {}
 
     public function __invoke(Export $export)
     {
         $exportRequest = $export->getExportRequest();
-        try {
-            $exportService = $this->handler->getExporterFor($exportRequest);
-            $file = $exportService->export($exportRequest);
+        $exportService = $this->handler->getExporterFor($exportRequest);
+        $file = $exportService->export($exportRequest);
 
-            $this->mercurePublisher->publish(
-                ['url' => 'https://my.download.url/' . $file->getId()],
-                $exportRequest->getRequesterId()
-            );
-        } catch (\Exception $e) {
-            // To prevent Messenger from retrying
-            throw new UnrecoverableMessageHandlingException($e->getMessage(), $e->getCode(), $e);
-        }
+        $this->mercureHub->publishUpdate($exportRequest->getRequesterId(), $file);
     }
 }

+ 82 - 0
src/Service/MercureHub.php

@@ -0,0 +1,82 @@
+<?php
+
+namespace App\Service;
+
+use ApiPlatform\Core\Api\IriConverterInterface;
+use Symfony\Component\Mercure\HubInterface;
+use Symfony\Component\Mercure\Update;
+use Symfony\Component\Serializer\Encoder\EncoderInterface;
+use Symfony\Component\Serializer\SerializerInterface;
+
+/**
+ * Sends private and encrypted mercure updates to the target users.
+ *
+ * Updates inform of modifications on entities : updates, creations, deletions.
+ *
+ * The update topic is the id of the recipient user.
+ * The content is a json containing the iri of the entity, the operation type, and the current data of this entity
+ */
+class MercureHub
+{
+    public const UPDATE = 'update';
+    public const CREATE = 'create';
+    public const DELETE = 'delete';
+
+    public function __construct(
+        private HubInterface $mercureHub,
+        private SerializerInterface $serializer,
+        private EncoderInterface $encoder,
+        private IriConverterInterface $iriConverter
+    ) {}
+
+    /**
+     * Send an update to the
+     *
+     * @param $entity
+     * @param int $accessId
+     * @param string $operationType
+     */
+    public function publish(int $accessId, $entity, string $operationType = self::UPDATE): void
+    {
+        if (!in_array($operationType, [self::UPDATE, self::CREATE, self::DELETE], true)) {
+            throw new \InvalidArgumentException('Invalid operation type');
+        }
+
+        $data = $this->encoder->encode([
+            'iri' => $this->iriConverter->getIriFromItem($entity),
+            'operation' => $operationType,
+            'data' => $this->serializer->serialize($entity, 'jsonld')
+        ], 'jsonld');
+
+        $update = new Update(
+            "access/{$accessId}",
+            $data,
+            true
+        );
+        $this->mercureHub->publish($update);
+    }
+
+    /**
+     * @param $entity
+     * @param int $accessId
+     */
+    public function publishUpdate(int $accessId, $entity): void {
+        $this->publish($accessId, $entity, self::UPDATE);
+    }
+
+    /**
+     * @param $entity
+     * @param int $accessId
+     */
+    public function publishCreate(int $accessId, $entity): void {
+        $this->publish($accessId, $entity, self::CREATE);
+    }
+
+    /**
+     * @param $entity
+     * @param int $accessId
+     */
+    public function publishDelete(int $accessId, $entity): void {
+        $this->publish($accessId, $entity, self::DELETE);
+    }
+}

+ 0 - 42
src/Service/MercurePublisher.php

@@ -1,42 +0,0 @@
-<?php
-
-namespace App\Service;
-
-use App\Entity\Access\Access;
-use Symfony\Component\Mercure\HubInterface;
-use Symfony\Component\Mercure\Update;
-use Symfony\Component\Security\Core\Security;
-
-class MercurePublisher
-{
-    public function __construct(
-        private Security $security,
-        private HubInterface $mercureHub
-    ) {}
-
-    /**
-     * @throws \JsonException
-     */
-    public function publish(
-        array $data,
-        ?int $accessId = null
-    ) {
-        if ($accessId === null) {
-            /**
-             * @var Access $access
-             */
-            $access = $this->security->getUser();
-            if ($access === null) {
-                throw new \RuntimeException('No accessId provided, impossible to send the mercure update');
-            }
-            $accessId = $access->getId();
-        }
-
-        $update = new Update(
-            "access/{$accessId}",
-            json_encode($data, JSON_THROW_ON_ERROR),
-            true
-        );
-        $this->mercureHub->publish($update);
-    }
-}