Skip to content

Commit

Permalink
Improve definition of dependency table entries (#420)
Browse files Browse the repository at this point in the history
* Introduce define.DEPENDENCY_TABLE

* Rename to define.DEPENDENCY_INDEX_DTYPE

* Rename definition of dependency filename

* Store define.DEPENDENCY_TYPE as dict

* Fix linter

* Fix tests
  • Loading branch information
hagenw committed Aug 14, 2024
1 parent 593776c commit 99eb738
Show file tree
Hide file tree
Showing 12 changed files with 156 additions and 225 deletions.
13 changes: 5 additions & 8 deletions audb/core/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,9 @@ def cached(
# Skip old audb cache (e.g. 1 as flavor)
files = audeer.list_file_names(version_path, basenames=True)
if (
define.DEPENDENCIES_FILE not in files
and define.LEGACY_DEPENDENCIES_FILE not in files
and define.CACHED_DEPENDENCIES_FILE not in files
define.DEPENDENCY_FILE not in files
and define.LEGACY_DEPENDENCY_FILE not in files
and define.CACHED_DEPENDENCY_FILE not in files
):
# Skip all cache entries
# that don't contain a dependency file
Expand Down Expand Up @@ -265,7 +265,7 @@ def dependencies(
version,
cache_root=cache_root,
)
cached_deps_file = os.path.join(db_root, define.CACHED_DEPENDENCIES_FILE)
cached_deps_file = os.path.join(db_root, define.CACHED_DEPENDENCY_FILE)

with FolderLock(db_root):
try:
Expand Down Expand Up @@ -492,10 +492,7 @@ def remove_media(
# if archive exists in this version,
# remove file from it and re-publish
remote_archive = backend_interface.join(
"/",
name,
define.DEPEND_TYPE_NAMES[define.DependType.MEDIA],
archive + ".zip",
"/", name, "media", archive + ".zip"
)
if backend_interface.exists(remote_archive, version):
files_in_archive = backend_interface.get_archive(
Expand Down
114 changes: 56 additions & 58 deletions audb/core/define.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,73 +10,71 @@
HEADER_FILE = f"{DB}.yaml"

# Dependencies
DEPENDENCIES_FILE = f"{DB}.parquet"
CACHED_DEPENDENCIES_FILE = f"{DB}.pkl"
LEGACY_DEPENDENCIES_FILE = f"{DB}.csv"
DEPENDENCY_FILE = f"{DB}.parquet"
r"""Filename and extension of dependency table file."""

CACHED_DEPENDENCY_FILE = f"{DB}.pkl"
r"""Filename and extension of cached dependency table file.
As loading from a pickle file is still faster
than loading from a parquet file,
we are storing the dependency table
as a pickle file in cache.
"""

LEGACY_DEPENDENCY_FILE = f"{DB}.csv"
r"""Filename and extension of legacy dependency table file.
In ``audb`` versions earlier than 1.7.0,
the dependency table was stored in a csv file.
"""

DEPENDENCY_TABLE = {
# Column name: column dtype
"archive": "string[pyarrow]",
"bit_depth": "int32[pyarrow]",
"channels": "int32[pyarrow]",
"checksum": "string[pyarrow]",
"duration": "float64[pyarrow]",
"format": "string[pyarrow]",
"removed": "int32[pyarrow]",
"sampling_rate": "int32[pyarrow]",
"type": "int32[pyarrow]",
"version": "string[pyarrow]",
}
r"""Column names and data types of dependency table.
# Cache lock
CACHED_VERSIONS_TIMEOUT = 10 # Timeout to acquire access to cached versions
LOCK_FILE = ".lock"
TIMEOUT_MSG = "Lock could not be acquired. Timeout exceeded."
The dependency table is stored in a dataframe
at ``audb.Dependencies._df``,
and contains the specified column names
and data types.
"""

class DependField:
r"""Fields stored in dependency table."""

ARCHIVE = 0
BIT_DEPTH = 1
CHANNELS = 2
CHECKSUM = 3
DURATION = 4
FORMAT = 5
REMOVED = 6
SAMPLING_RATE = 7
TYPE = 8
VERSION = 9


DEPEND_FIELD_NAMES = {
DependField.ARCHIVE: "archive",
DependField.BIT_DEPTH: "bit_depth",
DependField.CHANNELS: "channels",
DependField.CHECKSUM: "checksum",
DependField.DURATION: "duration",
DependField.FORMAT: "format",
DependField.REMOVED: "removed",
DependField.SAMPLING_RATE: "sampling_rate",
DependField.TYPE: "type",
DependField.VERSION: "version",
}
DEPENDENCY_INDEX_DTYPE = "object"
r"""Data type of the dependency table index."""

DEPEND_FIELD_DTYPES = {
DependField.ARCHIVE: "string[pyarrow]",
DependField.BIT_DEPTH: "int32[pyarrow]",
DependField.CHANNELS: "int32[pyarrow]",
DependField.CHECKSUM: "string[pyarrow]",
DependField.DURATION: "float64[pyarrow]",
DependField.FORMAT: "string[pyarrow]",
DependField.REMOVED: "int32[pyarrow]",
DependField.SAMPLING_RATE: "int32[pyarrow]",
DependField.TYPE: "int32[pyarrow]",
DependField.VERSION: "string[pyarrow]",
DEPENDENCY_TYPE = {
"meta": 0,
"media": 1,
"attachment": 2,
}
r"""Types of files stored in a database.
DEPEND_INDEX_DTYPE = "object"

Currently, a database can contain the following files:
class DependType:
r"""Dependency file types."""
* ``"meta"``: tables and misc tables
* ``"media"``: media files, e.g. audio, video, text
* ``"attachment"``: files included as attachments
META = 0
MEDIA = 1
ATTACHMENT = 2
"""


DEPEND_TYPE_NAMES = {
DependType.META: "meta",
DependType.MEDIA: "media",
DependType.ATTACHMENT: "attachment",
}
# Cache lock
CACHED_VERSIONS_TIMEOUT = 10 # Timeout to acquire access to cached versions
LOCK_FILE = ".lock"
TIMEOUT_MSG = "Lock could not be acquired. Timeout exceeded."


# Flavors
Expand Down
51 changes: 27 additions & 24 deletions audb/core/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class Dependencies:
""" # noqa: E501

def __init__(self):
self._df = pd.DataFrame(columns=define.DEPEND_FIELD_NAMES.values())
self._df = pd.DataFrame(columns=define.DEPENDENCY_TABLE.keys())
self._df = self._set_dtypes(self._df)
# pyarrow schema
# used for reading and writing files
Expand Down Expand Up @@ -147,7 +147,9 @@ def attachments(self) -> typing.List[str]:
list of attachments
"""
return self._df[self._df["type"] == define.DependType.ATTACHMENT].index.tolist()
return self._df[
self._df["type"] == define.DEPENDENCY_TYPE["attachment"]
].index.tolist()

@property
def attachment_ids(self) -> typing.List[str]:
Expand All @@ -158,7 +160,7 @@ def attachment_ids(self) -> typing.List[str]:
"""
return self._df[
self._df["type"] == define.DependType.ATTACHMENT
self._df["type"] == define.DEPENDENCY_TYPE["attachment"]
].archive.tolist()

@property
Expand All @@ -179,7 +181,9 @@ def media(self) -> typing.List[str]:
list of media
"""
return self._df[self._df["type"] == define.DependType.MEDIA].index.tolist()
return self._df[
self._df["type"] == define.DEPENDENCY_TYPE["media"]
].index.tolist()

@property
def removed_media(self) -> typing.List[str]:
Expand All @@ -190,7 +194,8 @@ def removed_media(self) -> typing.List[str]:
"""
return self._df[
(self._df["type"] == define.DependType.MEDIA) & (self._df["removed"] == 1)
(self._df["type"] == define.DEPENDENCY_TYPE["media"])
& (self._df["removed"] == 1)
].index.tolist()

@property
Expand All @@ -215,7 +220,9 @@ def tables(self) -> typing.List[str]:
list of tables
"""
return self._df[self._df["type"] == define.DependType.META].index.tolist()
return self._df[
self._df["type"] == define.DEPENDENCY_TYPE["meta"]
].index.tolist()

def archive(self, file: str) -> str:
r"""Name of archive the file belongs to.
Expand Down Expand Up @@ -306,7 +313,7 @@ def load(self, path: str):
FileNotFoundError: if ``path`` does not exist
"""
self._df = pd.DataFrame(columns=define.DEPEND_FIELD_NAMES.values())
self._df = pd.DataFrame(columns=define.DEPENDENCY_TABLE.keys())
path = audeer.path(path)
extension = audeer.file_extension(path)
if extension not in ["csv", "pkl", "parquet"]:
Expand Down Expand Up @@ -442,7 +449,7 @@ def _add_attachment(
format, # format
0, # removed
0, # sampling_rate
define.DependType.ATTACHMENT, # type
define.DEPENDENCY_TYPE["attachment"], # type
version, # version
]

Expand Down Expand Up @@ -473,7 +480,7 @@ def _add_media(
"""
df = pd.DataFrame.from_records(
values,
columns=["file"] + list(define.DEPEND_FIELD_NAMES.values()),
columns=["file"] + list(define.DEPENDENCY_TABLE.keys()),
).set_index("file")
df = self._set_dtypes(df)
self._df = pd.concat([self._df, df])
Expand Down Expand Up @@ -507,7 +514,7 @@ def _add_meta(
format, # format
0, # removed
0, # sampling_rate
define.DependType.META, # type
define.DEPENDENCY_TYPE["meta"], # type
version, # version
]

Expand Down Expand Up @@ -598,11 +605,8 @@ def _set_dtypes(df: pd.DataFrame) -> pd.DataFrame:
with correct dtypes
"""
df.index = df.index.astype(define.DEPEND_INDEX_DTYPE, copy=False)
columns = define.DEPEND_FIELD_NAMES.values()
dtypes = define.DEPEND_FIELD_DTYPES.values()
mapping = {column: dtype for column, dtype in zip(columns, dtypes)}
df = df.astype(mapping, copy=False)
df.index = df.index.astype(define.DEPENDENCY_INDEX_DTYPE, copy=False)
df = df.astype(define.DEPENDENCY_TABLE, copy=False)
return df

def _table_to_dataframe(self, table: pa.Table) -> pd.DataFrame:
Expand All @@ -629,7 +633,7 @@ def _table_to_dataframe(self, table: pa.Table) -> pd.DataFrame:
)
df.set_index("file", inplace=True)
df.index.name = None
df.index = df.index.astype(define.DEPEND_INDEX_DTYPE)
df.index = df.index.astype(define.DEPENDENCY_INDEX_DTYPE)
return df

def _update_media(
Expand Down Expand Up @@ -659,7 +663,7 @@ def _update_media(
"""
df = pd.DataFrame.from_records(
values,
columns=["file"] + list(define.DEPEND_FIELD_NAMES.values()),
columns=["file"] + list(define.DEPENDENCY_TABLE.keys()),
).set_index("file")
df = self._set_dtypes(df)
self._df.loc[df.index] = df
Expand All @@ -676,8 +680,7 @@ def _update_media_version(
version: version string
"""
field = define.DEPEND_FIELD_NAMES[define.DependField.VERSION]
self._df.loc[files, field] = version
self._df.loc[files, "version"] = version


def error_message_missing_object(
Expand Down Expand Up @@ -802,9 +805,9 @@ def download_dependencies(
# Load the `db.parquet` file from the backend,
# or, if it does not exist,
# fall back to the legacy `db.zip` archive
remote_deps_file = backend_interface.join("/", name, define.DEPENDENCIES_FILE)
remote_deps_file = backend_interface.join("/", name, define.DEPENDENCY_FILE)
if backend_interface.exists(remote_deps_file, version):
local_deps_file = os.path.join(tmp_root, define.DEPENDENCIES_FILE)
local_deps_file = os.path.join(tmp_root, define.DEPENDENCY_FILE)
backend_interface.get_file(
remote_deps_file,
local_deps_file,
Expand All @@ -815,7 +818,7 @@ def download_dependencies(
remote_deps_file = backend_interface.join("/", name, define.DB + ".zip")
local_deps_file = os.path.join(
tmp_root,
define.LEGACY_DEPENDENCIES_FILE,
define.LEGACY_DEPENDENCY_FILE,
)
backend_interface.get_archive(
remote_deps_file,
Expand Down Expand Up @@ -850,7 +853,7 @@ def upload_dependencies(
version: database version
"""
local_deps_file = os.path.join(db_root, define.DEPENDENCIES_FILE)
remote_deps_file = backend_interface.join("/", name, define.DEPENDENCIES_FILE)
local_deps_file = os.path.join(db_root, define.DEPENDENCY_FILE)
remote_deps_file = backend_interface.join("/", name, define.DEPENDENCY_FILE)
deps.save(local_deps_file)
backend_interface.put_file(local_deps_file, remote_deps_file, version)
10 changes: 5 additions & 5 deletions audb/core/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def bit_depths(
"""
df = filtered_dependencies(name, version, media, tables, cache_root)
return set(df[df.type == define.DependType.MEDIA].bit_depth)
return set(df[df.type == define.DEPENDENCY_TYPE["media"]].bit_depth)


def channels(
Expand Down Expand Up @@ -142,7 +142,7 @@ def channels(
"""
df = filtered_dependencies(name, version, media, tables, cache_root)
return set(df[df.type == define.DependType.MEDIA].channels)
return set(df[df.type == define.DEPENDENCY_TYPE["media"]].channels)


def description(
Expand Down Expand Up @@ -213,7 +213,7 @@ def duration(
"""
df = filtered_dependencies(name, version, media, tables, cache_root)
return pd.to_timedelta(
df[df.type == define.DependType.MEDIA].duration.sum(),
df[df.type == define.DEPENDENCY_TYPE["media"]].duration.sum(),
unit="s",
)

Expand Down Expand Up @@ -277,7 +277,7 @@ def formats(
"""
df = filtered_dependencies(name, version, media, tables, cache_root)
return set(df[df.type == define.DependType.MEDIA].format)
return set(df[df.type == define.DEPENDENCY_TYPE["media"]].format)


def header(
Expand Down Expand Up @@ -606,7 +606,7 @@ def sampling_rates(
"""
df = filtered_dependencies(name, version, media, tables, cache_root)
return set(df[df.type == define.DependType.MEDIA].sampling_rate)
return set(df[df.type == define.DEPENDENCY_TYPE["media"]].sampling_rate)


def schemes(
Expand Down
Loading

0 comments on commit 99eb738

Please sign in to comment.