|
|
@@ -1,7 +1,6 @@
|
|
|
import time
|
|
|
from collections import deque
|
|
|
-from queue import Queue
|
|
|
-from threading import Thread, Lock
|
|
|
+from threading import Thread, Timer, Event
|
|
|
|
|
|
import vlc
|
|
|
from PyQt5.QtCore import pyqtSignal, QObject
|
|
|
@@ -17,42 +16,8 @@ from core.repositories import MusicFolderRepository, TrackRepository
|
|
|
logger = Logger.get()
|
|
|
|
|
|
|
|
|
-class CyclicThread(Thread):
|
|
|
- DELAY = 0
|
|
|
-
|
|
|
- def __init__(self):
|
|
|
- Thread.__init__(self)
|
|
|
- self.interrupted = False
|
|
|
- self.last_exec = 0
|
|
|
- self.running = False
|
|
|
-
|
|
|
- def act(self):
|
|
|
- raise NotImplementedError()
|
|
|
-
|
|
|
- def run(self):
|
|
|
- t = None
|
|
|
- self.running = True
|
|
|
- try:
|
|
|
- while 1:
|
|
|
- if self.DELAY:
|
|
|
- t = time.time()
|
|
|
-
|
|
|
- if not self.DELAY or not self.last_exec or (t - self.last_exec) > self.DELAY:
|
|
|
- self.act()
|
|
|
- self.last_exec = t
|
|
|
-
|
|
|
- if self.interrupted:
|
|
|
- break
|
|
|
-
|
|
|
- time.sleep(0.1)
|
|
|
- finally:
|
|
|
- self.running = False
|
|
|
-
|
|
|
- def trigger(self):
|
|
|
- self.last_exec = 0
|
|
|
-
|
|
|
- def stop(self):
|
|
|
- self.interrupted = True
|
|
|
+class AlreadyIndexed(Exception):
|
|
|
+ pass
|
|
|
|
|
|
|
|
|
class Emitter(QObject):
|
|
|
@@ -60,23 +25,37 @@ class Emitter(QObject):
|
|
|
musicFolderStatusChanged = pyqtSignal(int)
|
|
|
|
|
|
|
|
|
-class Discoverer(CyclicThread):
|
|
|
- DELAY = 5
|
|
|
+class Indexer(Thread):
|
|
|
+ DELAY = 2
|
|
|
|
|
|
- def __init__(self, indexer):
|
|
|
- CyclicThread.__init__(self)
|
|
|
- self.indexer = indexer
|
|
|
+ def __init__(self):
|
|
|
+ Thread.__init__(self)
|
|
|
+ self.stopped = Event()
|
|
|
+ self.emitter = Emitter()
|
|
|
+ self.timer = Timer(self.DELAY, self.act)
|
|
|
+
|
|
|
+ def run(self):
|
|
|
+ logger.info('** indexation thread started **')
|
|
|
+ while not self.stopped.wait(self.DELAY):
|
|
|
+ self.act()
|
|
|
|
|
|
def act(self):
|
|
|
+
|
|
|
+ # Initialize
|
|
|
session = db.Session()
|
|
|
music_folder_repo = MusicFolderRepository(session)
|
|
|
- music_folders = music_folder_repo.get_all()
|
|
|
-
|
|
|
track_repo = TrackRepository(session)
|
|
|
+
|
|
|
+ # Get current data
|
|
|
+ music_folders = music_folder_repo.get_all()
|
|
|
tracks = track_repo.get_all()
|
|
|
|
|
|
+ # Index existing
|
|
|
index = {t.path: t for t in tracks}
|
|
|
+ buffer = deque()
|
|
|
|
|
|
+ # -- Walk through music folders
|
|
|
+ # Index new files
|
|
|
for music_folder in music_folders:
|
|
|
music_folder_path = Path(music_folder.path)
|
|
|
|
|
|
@@ -84,79 +63,57 @@ class Discoverer(CyclicThread):
|
|
|
if music_folder.status == music_folder.STATUS_FOUND:
|
|
|
music_folder.status = music_folder.STATUS_UNAVAILABLE
|
|
|
music_folder_repo.commit()
|
|
|
- self.indexer.emitter.musicFolderStatusChanged.emit(music_folder.id)
|
|
|
+ self.emitter.musicFolderStatusChanged.emit(music_folder.id)
|
|
|
continue
|
|
|
|
|
|
if music_folder.status != music_folder.STATUS_FOUND:
|
|
|
music_folder.status = music_folder.STATUS_FOUND
|
|
|
music_folder_repo.commit()
|
|
|
- self.indexer.emitter.musicFolderStatusChanged.emit(music_folder.id)
|
|
|
+ self.emitter.musicFolderStatusChanged.emit(music_folder.id)
|
|
|
|
|
|
for filename in music_folder_path.walkfiles():
|
|
|
- if self.indexer.in_deque(filename):
|
|
|
+ if filename in buffer:
|
|
|
continue
|
|
|
if filename not in index and is_media_file_ext(filename.ext):
|
|
|
- self.indexer.put(filename)
|
|
|
+ buffer.append(filename)
|
|
|
elif filename in index:
|
|
|
track = index[filename]
|
|
|
if track.status == Track.STATUS_UNAVAILABLE:
|
|
|
- self.indexer.put(track.id)
|
|
|
+ buffer.append(track.id)
|
|
|
del index[filename]
|
|
|
|
|
|
+ # Index missing files
|
|
|
for filename, track in index.items():
|
|
|
- if self.indexer.in_deque(track.id):
|
|
|
+ if track.id in buffer:
|
|
|
continue
|
|
|
|
|
|
filename = Path(filename)
|
|
|
if not filename.exists() and track.status != Track.STATUS_UNAVAILABLE:
|
|
|
- self.indexer.put(track.id)
|
|
|
-
|
|
|
-class Indexer(CyclicThread):
|
|
|
- DELAY = 2
|
|
|
- BUFFER_SIZE = 100
|
|
|
+ buffer.append(track.id)
|
|
|
|
|
|
- def __init__(self):
|
|
|
- CyclicThread.__init__(self)
|
|
|
- self.deque = deque()
|
|
|
- self.interrupted = False
|
|
|
- self.discoverer = Discoverer(self)
|
|
|
- self.last_commit = None
|
|
|
- self.tracks = []
|
|
|
- self.emitter = Emitter()
|
|
|
-
|
|
|
- def start(self):
|
|
|
- logger.info('** indexation thread started **')
|
|
|
- self.discoverer.start()
|
|
|
- super().start()
|
|
|
-
|
|
|
- def act(self):
|
|
|
- buffer = []
|
|
|
- session = db.Session()
|
|
|
- track_repo = TrackRepository(session)
|
|
|
-
|
|
|
- for _ in range(self.BUFFER_SIZE):
|
|
|
+ # Index buffered tracks
|
|
|
+ tracks = []
|
|
|
+ while buffer:
|
|
|
+ filename_or_id = buffer.pop()
|
|
|
try:
|
|
|
- track = self.index(track_repo, self.deque.pop())
|
|
|
- buffer.append(track)
|
|
|
+ track = self.index(track_repo, filename_or_id)
|
|
|
+ tracks.append(track)
|
|
|
+ except AlreadyIndexed:
|
|
|
+ pass
|
|
|
except (FileNotFoundError, NotSupportedFile) as e:
|
|
|
logger.warning("Error during indexation: %s" % e)
|
|
|
continue
|
|
|
except IndexError:
|
|
|
break
|
|
|
|
|
|
- if buffer:
|
|
|
- for track in buffer:
|
|
|
+ # Finalize
|
|
|
+ if tracks:
|
|
|
+ for track in tracks:
|
|
|
if track.id is None:
|
|
|
track_repo.create(track)
|
|
|
track_repo.commit()
|
|
|
- self.emitter.filesIndexed.emit(buffer)
|
|
|
- logger.info(f"{len(buffer)} tracks indexed")
|
|
|
-
|
|
|
- def put(self, filename_or_track_id):
|
|
|
- self.deque.appendleft(filename_or_track_id)
|
|
|
-
|
|
|
- def in_deque(self, filename_or_track_id):
|
|
|
- return filename_or_track_id in self.deque
|
|
|
+ self.emitter.filesIndexed.emit(tracks)
|
|
|
+ logger.info(f"{len(tracks)} tracks indexed")
|
|
|
|
|
|
@staticmethod
|
|
|
def index(track_repo, filename_or_track_id):
|
|
|
@@ -183,6 +140,8 @@ class Indexer(CyclicThread):
|
|
|
track = track_repo.get_by_hash(track_hash)
|
|
|
if not track:
|
|
|
track = Track()
|
|
|
+ elif track.status == Track.STATUS_FOUND:
|
|
|
+ raise AlreadyIndexed(f"File already indexed")
|
|
|
|
|
|
vlc_media = vlc.Media(filename)
|
|
|
vlc_media.parse()
|
|
|
@@ -207,8 +166,9 @@ class Indexer(CyclicThread):
|
|
|
return track
|
|
|
|
|
|
def stop(self):
|
|
|
- self.discoverer.stop()
|
|
|
- super().stop()
|
|
|
+ self.stopped.set()
|
|
|
+ while self.is_alive():
|
|
|
+ time.sleep(0.1)
|
|
|
logger.info('** indexation thread stopped **')
|
|
|
|
|
|
|