import time

import vlc
from path import Path

from core import logging_
from core.exceptions import NotSupportedFile
from core.file_utilities import is_media_file_ext, hash_file
from core.logging_ import Logger
from core.models import Track
from core.repositories import MusicFolderRepository, TrackRepository

logger = Logger.get()


class Indexation:
    """State and logic of a single indexation run.

    A run hashes media files, creates or updates the matching ``Track``
    records through ``TrackRepository`` and, when run over folders via
    ``index_all``, marks the known tracks that were not found again as
    unavailable.
    """

    def __init__(self):
        self.started = False
        self.music_folder_repo = MusicFolderRepository()
        self.track_repo = TrackRepository()
        self.music_folders = []
        # Known tracks keyed by file hash; populated by index_all().
        self.index = {}
        # Hashes of the files handled during the current index_all() run.
        self.processed = set()
        self.t0 = time.time()

    def index_file(self, music_folder, filename, force_update=False):
        """Create or update the track record for one media file.

        Args:
            music_folder: Folder object the file belongs to; only its ``id``
                is read here.
            filename: Path of the media file.
            force_update: When true, rewrite the metadata of a track that is
                already stored instead of returning it untouched.

        Returns:
            The ``Track`` for this file, or ``None`` when a file with the
            same hash was already handled during the current run.

        Raises:
            FileNotFoundError: ``filename`` does not exist.
            NotSupportedFile: the extension is not a supported media type.
        """
        filename = Path(filename)
        if not filename.exists():
            raise FileNotFoundError(f"File not found: {filename}")
        if not is_media_file_ext(filename.ext):
            raise NotSupportedFile(f"File's extension {filename.ext} is not supported")

        track_hash = hash_file(filename)
        if track_hash in self.processed:
            print(" ... file already indexed, ignore: ", filename)
            return None

        # Prefer the in-memory index built by index_all(); fall back to a
        # repository lookup when indexing a single file.
        if track_hash in self.index:
            track = self.index[track_hash]
        else:
            track = next(iter(self.track_repo.get_by('hash', track_hash)), None) or Track()

        if track.id and not force_update:
            return track

        # Parse the media only now: every early return above needs no
        # metadata, so files that are skipped are never opened by VLC.
        vlc_media = vlc.Media(filename)
        vlc_media.parse()

        track.profile_id = 0
        track.music_folder_id = music_folder.id
        track.title = vlc_media.get_meta(vlc.Meta.Title)
        track.format = filename.ext
        track.artist = vlc_media.get_meta(vlc.Meta.AlbumArtist) or vlc_media.get_meta(vlc.Meta.Artist)
        track.album = vlc_media.get_meta(vlc.Meta.Album)
        track.track_num = vlc_media.get_meta(vlc.Meta.TrackNumber)
        # TODO: year, duration and size are not extracted yet.
        track.note = ""
        track.status = Track.STATUS_FOUND
        track.path = filename
        track.hash = track_hash

        if track.id is not None:
            self.track_repo.update(track)
            logger.debug('Index - updated: %s' % filename)
        else:
            self.track_repo.create(track)
            logger.debug('Index - created: %s' % filename)
        self.track_repo.commit()
        return track

    def index_all(self, filter_music_folder_id=None, force_update=False):
        """Index every media file of the selected music folder(s).

        Files are added or updated through ``index_file``; afterwards the
        known tracks that were not found again are marked as unavailable.

        Args:
            filter_music_folder_id: When given, only this folder is scanned
                and only its tracks can be marked as unavailable. Otherwise
                all folders returned by the repository are scanned.
            force_update: Forwarded to ``index_file``.
        """
        self.index = {t.hash: t for t in self.track_repo.get_all()}
        self.processed = set()
        self.t0 = time.time()
        self.started = True

        if filter_music_folder_id:
            music_folders = [self.music_folder_repo.get_by_id(filter_music_folder_id)]
            # Tracks of folders that are not scanned must not be judged.
            scanned_folder_ids = {folder.id for folder in music_folders}
        else:
            music_folders = self.music_folder_repo.get_all()
            scanned_folder_ids = None

        # add and update tracks
        for music_folder in music_folders:
            music_folder_path = Path(music_folder.path)
            for filename in music_folder_path.walkfiles():
                if not is_media_file_ext(filename.ext):
                    continue
                track = self.index_file(music_folder, filename, force_update=force_update)
                # index_file() returns None for content already handled in
                # this run (same hash under another path); its hash is
                # already in self.processed.
                if track is not None:
                    self.processed.add(track.hash)

        # mark missings
        for hash_, track in self.index.items():
            if hash_ in self.processed:
                continue
            if scanned_folder_ids is not None and track.music_folder_id not in scanned_folder_ids:
                # Belongs to a folder outside this partial run: its file was
                # never looked for, so its status is left unchanged.
                continue
            track.status = Track.STATUS_UNAVAILABLE
            # NOTE(review): the second positional argument presumably asks
            # the repository to commit immediately - confirm against
            # TrackRepository.update.
            self.track_repo.update(track, True)
            logger.debug('Index - marked as unavailable: %s' % track.path)


class Indexer:
    """Convenience entry points that each run on a fresh ``Indexation``."""

    @staticmethod
    def index_file(music_folder, path, force_update=False):
        """Index the single file ``path`` as part of ``music_folder``."""
        indexation = Indexation()
        indexation.index_file(music_folder, path, force_update=force_update)

    @staticmethod
    def index_folder(music_folder, force_update=False):
        """Index only the files located under ``music_folder``."""
        logger.debug('** Start indexation on folder %s' % music_folder.id)
        indexation = Indexation()
        indexation.index_all(filter_music_folder_id=music_folder.id, force_update=force_update)

    @staticmethod
    def index_all(force_update=False):
        """Index every music folder known to the repository."""
        logger.debug('** Start complete indexation')
        indexation = Indexation()
        indexation.index_all(force_update=force_update)


if __name__ == '__main__':
    Indexer.index_all()