Browse Source

implement multithreaded indexer

Olivier Massot 4 years ago
parent
commit
4ae163e151
2 changed files with 102 additions and 38 deletions
  1. core/indexer.py (101 additions, 37 deletions)
  2. core/repositories.py (1 addition, 1 deletion)

+ 101 - 37
core/indexer.py

@@ -13,69 +13,123 @@ from core.repositories import MusicFolderRepository, TrackRepository
 
 logger = Logger.get()
 
+class CyclicThread(Thread):
+    DELAY = 0
 
-class Discoverer(Thread):
-    def __init__(self, indexer):
+    def __init__(self):
         Thread.__init__(self)
-        self.indexer = indexer
+        self.interrupted = False
+        self.last_exec = 0
+
+    def act(self):
+        raise NotImplementedError()
 
     def run(self):
-        music_folder_repo = MusicFolderRepository()
+        t = None
         while 1:
-            for music_folder in music_folder_repo.get_all():
-                music_folder_path = Path(music_folder.path)
+            if self.DELAY:
+                t = time.time()
+
+            if not self.DELAY or not self.last_exec or (t - self.last_exec) > self.DELAY:
+                self.act()
+                self.last_exec = t
+
+            if self.interrupted:
+                break
 
-                for filename in music_folder_path.walkfiles():
-                    if is_media_file_ext(filename.ext):
-                        self.indexer.put(filename)
-            time.sleep(1)
+    def trigger(self):
+        self.last_exec = 0
 
+    def stop(self):
+        self.interrupted = True
+
+
+class Discoverer(CyclicThread):
+    DELAY = 5
 
-class Cleaner(Thread):
     def __init__(self, indexer):
-        Thread.__init__(self)
+        CyclicThread.__init__(self)
         self.indexer = indexer
 
-    def run(self):
+    def act(self):
+        music_folder_repo = MusicFolderRepository()
         track_repo = TrackRepository()
-        while 1:
-            pass
 
-class Indexer(Thread):
+        index = {t.path: t for t in track_repo.get_all()}
+
+        for music_folder in music_folder_repo.get_all():
+            music_folder_path = Path(music_folder.path)
+
+            for filename in music_folder_path.walkfiles():
+                if filename not in index and is_media_file_ext(filename.ext):
+                    self.indexer.put(filename)
+                elif filename in index:
+                    track = index[filename]
+                    if track.status == Track.STATUS_UNAVAILABLE:
+                        self.indexer.put(track.id)
+                    del index[filename]
+
+        for filename, track in index.items():
+            filename = Path(filename)
+            if not filename.exists():
+                self.indexer.put(track.id)
+
+class Indexer(CyclicThread):
 
     def __init__(self):
-        Thread.__init__(self)
+        CyclicThread.__init__(self)
         self.queue = Queue()
-        discoverer = Discoverer(self)
-
-    def run(self):
-        while 1:
-            filename = self.queue.get()
-            self.index(filename)
+        self.interrupted = False
+        self.discoverer = Discoverer(self)
+
+    def start(self):
+        logger.info('** indexation thread started **')
+        self.discoverer.start()
+        super().start()
+
+    def act(self):
+        if not self.queue.empty():
+            filename = self.queue.get(False)
+            try:
+                self.index(filename)
+            except (FileNotFoundError, NotSupportedFile) as e:
+                logger.warning("Error during indexation: %s" % e)
 
     def put(self, filename):
         self.queue.put(filename)
 
     @staticmethod
-    def index(filename):
-        filename = Path(filename)
-        if not filename.exists():
-            raise FileNotFoundError(f"File not found: {filename}")
-        if not is_media_file_ext(filename.ext):
-            raise NotSupportedFile(f"File's extension {filename.ext} is not supported")
-
+    def index(filename_or_track_id):
+        """ index a media file from the filesystem or a track id """
         track_repo = TrackRepository()
 
+        if type(filename_or_track_id) is int:
+            track = track_repo.get_by_id(filename_or_track_id)
+            filename = Path(track.path)
+            track_hash = track.hash
+
+            if not filename.exists():
+                logger.debug('Index - missing: %s' % filename)
+                track.status = Track.STATUS_UNAVAILABLE
+                track_repo.commit()
+                return
+        else:
+            filename = Path(filename_or_track_id)
+            if not filename.exists():
+                raise FileNotFoundError(f"File not found: {filename}")
+            if not is_media_file_ext(filename.ext):
+                raise NotSupportedFile(f"File's extension {filename.ext} is not supported")
+
+            track_hash = hash_file(filename)
+
+            track = track_repo.get_by_hash(track_hash)
+            if not track:
+                track = Track()
+
         vlc_media = vlc.Media(filename)
         vlc_media.parse()
         track_infos = vlc_media.get_tracks_info()
 
-        track_hash = hash_file(filename)
-
-        track = track_repo.get_by_hash(track_hash)
-        if not track:
-            track = Track()
-
         track.title = vlc_media.get_meta(vlc.Meta.Title) or filename.stripext().name
         track.format = filename.ext
         track.artist = vlc_media.get_meta(vlc.Meta.AlbumArtist) or vlc_media.get_meta(vlc.Meta.Artist)
@@ -95,6 +149,16 @@ class Indexer(Thread):
         logger.debug('Index - updated: %s' % filename)
         track_repo.commit()
 
+    def stop(self):
+        self.discoverer.stop()
+        super().stop()
+        logger.info('** indexation thread stopped **')
+
 
 if __name__ == '__main__':
-    pass
+    indexer = Indexer()
+    indexer.start()
+    try:
+        indexer.join()
+    except KeyboardInterrupt:
+        indexer.stop()

+ 1 - 1
core/repositories.py

@@ -20,7 +20,7 @@ class Repository:
         self.session.rollback()
 
     def get_by_id(self, id_):
-        return self.query().filter(id == id_).first()
+        return self.query().filter(self.MODEL_CLS.id == id_).first()
 
     def get_all(self):
         return self.query().all()