# This file is part of beets.
# Copyright 2016, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""The central Model and Database constructs for DBCore.
"""
import time
import os
import re
from collections import defaultdict
import threading
import sqlite3
import contextlib
from unidecode import unidecode
import beets
from beets.util import functemplate
from beets.util import py3_path
from beets.dbcore import types
from .query import MatchQuery, NullSort, TrueQuery, AndQuery
from collections.abc import Mapping
class DBAccessError(Exception):
"""The SQLite database became inaccessible.
This can happen when trying to read or write the database when, for
example, the database file is deleted or otherwise disappears. There
is probably no way to recover from this error.
"""
class FormattedMapping(Mapping):
"""A `dict`-like formatted view of a model.
The accessor `mapping[key]` returns the formatted version of
`model[key]` as a unicode string.
The `included_keys` parameter allows filtering the fields that are
returned. By default all fields are returned. Limiting to specific keys can
avoid expensive per-item database queries.
If `for_path` is true, all path separators in the formatted values
are replaced.
"""
ALL_KEYS = '*'
def __init__(self, model, included_keys=ALL_KEYS, for_path=False):
self.for_path = for_path
self.model = model
if included_keys == self.ALL_KEYS:
# Performance note: this triggers a database query.
self.model_keys = self.model.keys(True)
else:
self.model_keys = included_keys
def __getitem__(self, key):
if key in self.model_keys:
return self._get_formatted(self.model, key)
else:
raise KeyError(key)
def __iter__(self):
return iter(self.model_keys)
def __len__(self):
return len(self.model_keys)
def get(self, key, default=None):
if default is None:
default = self.model._type(key).format(None)
return super().get(key, default)
def _get_formatted(self, model, key):
value = model._type(key).format(model.get(key))
if isinstance(value, bytes):
value = value.decode('utf-8', 'ignore')
if self.for_path:
sep_repl = beets.config['path_sep_replace'].as_str()
sep_drive = beets.config['drive_sep_replace'].as_str()
if re.match(r'^\w:', value):
value = re.sub(r'(?<=^\w):', sep_drive, value)
for sep in (os.path.sep, os.path.altsep):
if sep:
value = value.replace(sep, sep_repl)
return value
class LazyConvertDict:
"""Lazily convert types for attributes fetched from the database
"""
def __init__(self, model_cls):
"""Initialize the object empty
"""
self.data = {}
self.model_cls = model_cls
self._converted = {}
def init(self, data):
"""Set the base data that should be lazily converted
"""
self.data = data
def _convert(self, key, value):
"""Convert the attribute type according the the SQL type
"""
return self.model_cls._type(key).from_sql(value)
def __setitem__(self, key, value):
"""Set an attribute value, assume it's already converted
"""
self._converted[key] = value
def __getitem__(self, key):
"""Get an attribute value, converting the type on demand
if needed
"""
if key in self._converted:
return self._converted[key