| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- import time
- import vlc
- from path import Path
- from core import logging_
- from core.exceptions import NotSupportedFile
- from core.file_utilities import is_media_file_ext, hash_file
- from core.logging_ import Logger
- from core.models import Track
- from core.repositories import MusicFolderRepository, TrackRepository
- logger = Logger.get()
- class Indexation:
- def __init__(self):
- self.started = False
- self.music_folder_repo = MusicFolderRepository()
- self.track_repo = TrackRepository()
- self.music_folders = []
- self.index = {}
- self.processed = set()
- self.t0 = time.time()
- def index_file(self, music_folder, filename, force_update=False):
- filename = Path(filename)
- if not filename.exists():
- raise FileNotFoundError(f"File not found: {filename}")
- if not is_media_file_ext(filename.ext):
- raise NotSupportedFile(f"File's extension {filename.ext} is not supported")
- vlc_media = vlc.Media(filename)
- vlc_media.parse()
- track_infos = vlc_media.get_tracks_info()
- track_hash = hash_file(filename)
- if track_hash in self.processed:
- print(" ... file already indexed, ignore: ", filename)
- return
- if self.index and track_hash in self.index:
- track = self.index[track_hash]
- else:
- track = next(iter(self.track_repo.get_by('hash', track_hash)), None) or Track()
- if track.id and not force_update:
- return track
- track.profile_id = 0
- track.music_folder_id = music_folder.id
- track.title = vlc_media.get_meta(vlc.Meta.Title)
- track.format = filename.ext
- track.artist = vlc_media.get_meta(vlc.Meta.AlbumArtist) or vlc_media.get_meta(vlc.Meta.Artist)
- track.album = vlc_media.get_meta(vlc.Meta.Album)
- track.track_num = vlc_media.get_meta(vlc.Meta.TrackNumber)
- # track.year = vlc_media.get_meta(vlc.Meta.Date)
- # track.duration = vlc_media.get_meta(vlc.Meta.Date)
- # track.size = 0
- track.note = ""
- track.status = Track.STATUS_FOUND
- track.path = filename
- track.hash = track_hash
- if track.id is not None:
- self.track_repo.update(track)
- logger.debug('Index - updated: %s' % filename)
- else:
- self.track_repo.create(track)
- logger.debug('Index - created: %s' % filename)
- self.track_repo.commit()
- return track
- def index_all(self, filter_music_folder_id=None, force_update=False):
- self.index = {t.hash: t for t in self.track_repo.get_all()}
- self.processed = set()
- self.t0 = time.time()
- self.started = True
- if filter_music_folder_id:
- music_folders = [self.music_folder_repo.get_by_id(filter_music_folder_id)]
- else:
- music_folders = self.music_folder_repo.get_all()
- # add and update tracks
- for music_folder in music_folders:
- music_folder_path = Path(music_folder.path)
- for filename in music_folder_path.walkfiles():
- if not is_media_file_ext(filename.ext):
- continue
- track = self.index_file(music_folder, filename, force_update=force_update)
- self.processed.add(track.hash)
- # mark missings
- for hash_, track in self.index.items():
- if hash_ in self.processed:
- continue
- track.status = Track.STATUS_UNAVAILABLE
- self.track_repo.update(track, True)
- logger.debug('Index - marked as unavailable: %s' % track.path)
- class Indexer:
- @staticmethod
- def index_file(music_folder, path, force_update=False):
- indexation = Indexation()
- indexation.index_file(music_folder, path, force_update=force_update)
- @staticmethod
- def index_folder(music_folder, force_update=False):
- logger.debug('** Start indexation on folder %s' % music_folder.id)
- indexation = Indexation()
- indexation.index_all(filter_music_folder_id=music_folder.id, force_update=force_update)
- @staticmethod
- def index_all(force_update=False):
- logger.debug('** Start complete indexation')
- indexation = Indexation()
- indexation.index_all(force_update=force_update)
- if __name__ == '__main__':
- Indexer.index_all()
|