🚀 Anomalib Pipelines #2060

Merged · 8 commits · May 24, 2024
10 changes: 10 additions & 0 deletions src/anomalib/cli/cli.py
@@ -15,6 +15,7 @@
 from rich import traceback

 from anomalib import TaskType, __version__
+from anomalib.cli.pipelines import PIPELINE_REGISTRY, pipeline_subcommands, run_pipeline
 from anomalib.cli.utils.help_formatter import CustomHelpFormatter, get_short_docstring
 from anomalib.cli.utils.openvino import add_openvino_export_arguments
 from anomalib.loggers import configure_logger
@@ -131,6 +132,13 @@ def add_subcommands(self, **kwargs) -> None:
             # add arguments to subcommand
             getattr(self, f"add_{subcommand}_arguments")(sub_parser)

+        # Add pipeline subcommands
+        if PIPELINE_REGISTRY is not None:
+            for subcommand, value in pipeline_subcommands().items():
+                sub_parser = PIPELINE_REGISTRY[subcommand].get_parser()
+                self.subcommand_parsers[subcommand] = sub_parser
+                parser_subcommands.add_subcommand(subcommand, sub_parser, help=value["description"])
+
     def add_arguments_to_parser(self, parser: ArgumentParser) -> None:
         """Extend trainer's arguments to add engine arguments.

@@ -355,6 +363,8 @@ def _run_subcommand(self) -> None:
             fn = getattr(self.engine, self.subcommand)
             fn_kwargs = self._prepare_subcommand_kwargs(self.subcommand)
             fn(**fn_kwargs)
+        elif PIPELINE_REGISTRY is not None and self.subcommand in pipeline_subcommands():
+            run_pipeline(self.config)
         else:
             self.config_init = self.parser.instantiate_classes(self.config)
             getattr(self, f"{self.subcommand}")()
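Taken together, the two hunks above register each pipeline as a first-class CLI subcommand and route its execution to `run_pipeline`. A minimal sketch of how to inspect the registered subcommands, using only names imported in this diff:

```python
# Sketch: list the pipeline subcommands the CLI will expose.
from anomalib.cli.pipelines import PIPELINE_REGISTRY, pipeline_subcommands

if PIPELINE_REGISTRY is not None:  # registry is None when anomalib.pipelines is unavailable
    for name, meta in pipeline_subcommands().items():
        print(f"{name}: {meta['description']}")  # e.g. "benchmark: <short docstring>"
```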
41 changes: 41 additions & 0 deletions src/anomalib/cli/pipelines.py
@@ -0,0 +1,41 @@
"""Subcommand for pipelines."""

# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0


import logging

from jsonargparse import Namespace

from anomalib.cli.utils.help_formatter import get_short_docstring
from anomalib.utils.exceptions import try_import

logger = logging.getLogger(__name__)

if try_import("anomalib.pipelines"):
    from anomalib.pipelines import Benchmark
    from anomalib.pipelines.components.base import Pipeline

    PIPELINE_REGISTRY: dict[str, type[Pipeline]] | None = {"benchmark": Benchmark}
else:
    PIPELINE_REGISTRY = None


def pipeline_subcommands() -> dict[str, dict[str, str]]:
    """Return subcommands for pipelines."""
    if PIPELINE_REGISTRY is not None:
        return {name: {"description": get_short_docstring(pipeline)} for name, pipeline in PIPELINE_REGISTRY.items()}
    return {}


def run_pipeline(args: Namespace) -> None:
    """Run pipeline."""
    logger.warning("This feature is experimental. It may change or be removed in the future.")
    if PIPELINE_REGISTRY is not None:
        subcommand = args.subcommand
        config = args[subcommand]
        PIPELINE_REGISTRY[subcommand]().run(config)
    else:
        msg = "Pipeline is not available"
        raise ValueError(msg)
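For reference, a hedged sketch of the namespace shape `run_pipeline` expects: the `subcommand` entry picks the pipeline class, and the sub-namespace stored under that name becomes the config passed to `run`. The per-pipeline config keys come from `Pipeline.get_parser()`, which is not shown in this diff, so the body below is a placeholder.

```python
# Sketch only: the keys inside the "benchmark" namespace depend on
# Benchmark.get_parser() and are not part of this diff.
from jsonargparse import Namespace

from anomalib.cli.pipelines import run_pipeline

args = Namespace(subcommand="benchmark", benchmark=Namespace())  # placeholder config
run_pipeline(args)  # dispatches to PIPELINE_REGISTRY["benchmark"]().run(config)
```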
5 changes: 2 additions & 3 deletions src/anomalib/cli/utils/help_formatter.py
@@ -6,7 +6,6 @@
 import argparse
 import re
 import sys
-from typing import TypeVar

 import docstring_parser
 from jsonargparse import DefaultHelpFormatter
@@ -38,11 +37,11 @@
print("To use other subcommand using `anomalib install`")


def get_short_docstring(component: TypeVar) -> str:
def get_short_docstring(component: type) -> str:
"""Get the short description from the docstring.

Args:
component (TypeVar): The component to get the docstring from
component (type): The component to get the docstring from

Returns:
str: The short description
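The signature change is a typing fix: `TypeVar` was never a meaningful annotation for a class argument, and the pipelines registry now calls this helper with pipeline classes. A quick usage sketch:

```python
# Sketch: get_short_docstring returns the short description of a class docstring.
from anomalib.cli.utils.help_formatter import get_short_docstring
from anomalib.pipelines import Benchmark

print(get_short_docstring(Benchmark))  # first line of the Benchmark docstring
```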
26 changes: 20 additions & 6 deletions src/anomalib/data/__init__.py
@@ -3,7 +3,6 @@
 # Copyright (C) 2022-2024 Intel Corporation
 # SPDX-License-Identifier: Apache-2.0

-
 import importlib
 import logging
 from enum import Enum
@@ -29,20 +28,35 @@
 )


-def get_datamodule(config: DictConfig | ListConfig) -> AnomalibDataModule:
+class UnknownDatamoduleError(ModuleNotFoundError):
+    ...
+
+
+def get_datamodule(config: DictConfig | ListConfig | dict) -> AnomalibDataModule:
     """Get Anomaly Datamodule.

     Args:
-        config (DictConfig | ListConfig): Configuration of the anomaly model.
+        config (DictConfig | ListConfig | dict): Configuration of the anomaly model.

     Returns:
         PyTorch Lightning DataModule
     """
     logger.info("Loading the datamodule")

-    module = importlib.import_module(".".join(config.data.class_path.split(".")[:-1]))
-    dataclass = getattr(module, config.data.class_path.split(".")[-1])
-    init_args = {**config.data.get("init_args", {})}  # get dict
+    if isinstance(config, dict):
+        config = DictConfig(config)
+
+    try:
+        _config = config.data if "data" in config else config
+        if len(_config.class_path.split(".")) > 1:
+            module = importlib.import_module(".".join(_config.class_path.split(".")[:-1]))
+        else:
+            module = importlib.import_module("anomalib.data")
+    except ModuleNotFoundError as exception:
+        logger.exception(f"ModuleNotFoundError: {_config.class_path}")
+        raise UnknownDatamoduleError from exception
+    dataclass = getattr(module, _config.class_path.split(".")[-1])
+    init_args = {**_config.get("init_args", {})}  # get dict
     if "image_size" in init_args:
         init_args["image_size"] = to_tuple(init_args["image_size"])
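The rewritten `get_datamodule` accepts three inputs the old version rejected: a plain `dict`, a config without the `data` wrapper key, and a bare class name that is resolved against `anomalib.data`; an unresolvable `class_path` now raises `UnknownDatamoduleError` instead of a raw `ModuleNotFoundError`. A hedged sketch (the datamodule name and init args are illustrative, not part of this diff):

```python
# Sketch: both spellings should resolve to the same datamodule class.
from anomalib.data import get_datamodule

full = {"data": {"class_path": "anomalib.data.MVTec", "init_args": {"category": "bottle"}}}
bare = {"class_path": "MVTec", "init_args": {"category": "bottle"}}

dm_full = get_datamodule(full)  # dotted path: import anomalib.data, fetch MVTec
dm_bare = get_datamodule(bare)  # bare name: falls back to the anomalib.data module
```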
4 changes: 2 additions & 2 deletions src/anomalib/engine/engine.py
@@ -9,7 +9,7 @@
 from typing import Any

 import torch
-from lightning.pytorch.callbacks import Callback
+from lightning.pytorch.callbacks import Callback, RichModelSummary, RichProgressBar
 from lightning.pytorch.loggers import Logger
 from lightning.pytorch.trainer import Trainer
 from lightning.pytorch.utilities.types import _EVALUATE_OUTPUT, _PREDICT_OUTPUT, EVAL_DATALOADERS, TRAIN_DATALOADERS
@@ -406,7 +406,7 @@ def _setup_transform(

     def _setup_anomalib_callbacks(self) -> None:
         """Set up callbacks for the trainer."""
-        _callbacks: list[Callback] = []
+        _callbacks: list[Callback] = [RichProgressBar(), RichModelSummary()]

         # Add ModelCheckpoint if it is not in the callbacks list.
         has_checkpoint_callback = any(isinstance(c, ModelCheckpoint) for c in self._cache.args["callbacks"])
4 changes: 0 additions & 4 deletions src/anomalib/loggers/mlflow.py
@@ -7,12 +7,8 @@
 from lightning.pytorch.utilities import rank_zero_only
 from matplotlib.figure import Figure

-from anomalib.utils.exceptions.imports import try_import
-
 from .base import ImageLoggerBase

-try_import("mlflow")
-

 class AnomalibMLFlowLogger(ImageLoggerBase, MLFlowLogger):
     """Logger for MLFlow.
1 change: 0 additions & 1 deletion src/anomalib/metrics/f1_score.py
@@ -6,7 +6,6 @@
 # Copyright (C) 2024 Intel Corporation
 # SPDX-License-Identifier: Apache-2.0

-
 import logging
 from typing import Any, Literal

4 changes: 2 additions & 2 deletions src/anomalib/models/components/sampling/k_center_greedy.py
@@ -8,10 +8,10 @@
 # SPDX-License-Identifier: Apache-2.0

 import torch
-from rich.progress import track
 from torch.nn import functional as F  # noqa: N812

 from anomalib.models.components.dimensionality_reduction import SparseRandomProjection
+from anomalib.utils.rich import safe_track


 class KCenterGreedy:
@@ -98,7 +98,7 @@ def select_coreset_idxs(self, selected_idxs: list[int] | None = None) -> list[int]:

         selected_coreset_idxs: list[int] = []
         idx = int(torch.randint(high=self.n_observations, size=(1,)).item())
-        for _ in track(range(self.coreset_size), description="Selecting Coreset Indices."):
+        for _ in safe_track(sequence=range(self.coreset_size), description="Selecting Coreset Indices."):
             self.update_distances(cluster_centers=[idx])
             idx = self.get_new_idx()
             if idx in selected_idxs:
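`safe_track` lives in `anomalib.utils.rich` and its implementation is not part of this diff. A plausible reading, sketched under that assumption, is a drop-in replacement for `rich.progress.track` that degrades to a plain iterator when no live progress bar can be rendered (for example under `hide_output` during benchmarking, or in a non-TTY worker):

```python
# Hypothetical sketch of safe_track; the real anomalib.utils.rich
# implementation is not shown in this diff.
import sys
from collections.abc import Iterable, Iterator
from typing import TypeVar

from rich.progress import track

T = TypeVar("T")


def safe_track(sequence: Iterable[T], description: str = "") -> Iterator[T]:
    """Show a progress bar on a real terminal; otherwise iterate silently."""
    if sys.stdout.isatty():
        yield from track(sequence, description=description)
    else:
        yield from sequence
```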
8 changes: 8 additions & 0 deletions src/anomalib/pipelines/__init__.py
@@ -0,0 +1,8 @@
"""Pipelines for end-to-end usecases."""

# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

from .benchmark import Benchmark

__all__ = ["Benchmark"]
8 changes: 8 additions & 0 deletions src/anomalib/pipelines/benchmark/__init__.py
@@ -0,0 +1,8 @@
"""Benchmarking."""

# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

from .pipeline import Benchmark

__all__ = ["Benchmark"]
47 changes: 47 additions & 0 deletions src/anomalib/pipelines/benchmark/generator.py
@@ -0,0 +1,47 @@
"""Benchmark job generator."""

# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

from collections.abc import Generator

from anomalib.data import get_datamodule
from anomalib.models import get_model
from anomalib.pipelines.components import JobGenerator
from anomalib.pipelines.components.utils import get_iterator_from_grid_dict
from anomalib.pipelines.types import PREV_STAGE_RESULT
from anomalib.utils.logging import hide_output

from .job import BenchmarkJob


class BenchmarkJobGenerator(JobGenerator):
    """Generate BenchmarkJob.

    Args:
        accelerator (str): The accelerator to use.
    """

    def __init__(self, accelerator: str) -> None:
        self.accelerator = accelerator

    @property
    def job_class(self) -> type:
        """Return the job class."""
        return BenchmarkJob

    @hide_output
    def generate_jobs(
        self,
        args: dict,
        previous_stage_result: PREV_STAGE_RESULT,
    ) -> Generator[BenchmarkJob, None, None]:
        """Return iterator based on the arguments."""
        del previous_stage_result  # Not needed for this job
        for _container in get_iterator_from_grid_dict(args):
            yield BenchmarkJob(
                accelerator=self.accelerator,
                seed=_container["seed"],
                model=get_model(_container["model"]),
                datamodule=get_datamodule(_container["data"]),
            )
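A hedged usage sketch: `get_iterator_from_grid_dict` is defined elsewhere in `anomalib.pipelines.components.utils`, but the keys read from each `_container` imply that `args` carries `seed`, `model`, and `data` entries. The values below are illustrative, not part of this diff.

```python
# Sketch: illustrative grid; the exact expansion rules of
# get_iterator_from_grid_dict are not shown here.
args = {
    "seed": 42,
    "model": {"class_path": "Padim"},
    "data": {"class_path": "MVTec", "init_args": {"category": "bottle"}},
}

generator = BenchmarkJobGenerator(accelerator="cpu")
jobs = list(generator.generate_jobs(args, previous_stage_result=None))
```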
108 changes: 108 additions & 0 deletions src/anomalib/pipelines/benchmark/job.py
@@ -0,0 +1,108 @@
"""Benchmarking job."""

# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import logging
from datetime import datetime
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any

import pandas as pd
from lightning import seed_everything
from rich.console import Console
from rich.table import Table

from anomalib.data import AnomalibDataModule
from anomalib.engine import Engine
from anomalib.models import AnomalyModule
from anomalib.pipelines.components import Job
from anomalib.utils.logging import hide_output

logger = logging.getLogger(__name__)


class BenchmarkJob(Job):
    """Benchmarking job.

    Args:
        accelerator (str): The accelerator to use.
        model (AnomalyModule): The model to use.
        datamodule (AnomalibDataModule): The data module to use.
        seed (int): The seed to use.
    """

    name = "benchmark"

    def __init__(self, accelerator: str, model: AnomalyModule, datamodule: AnomalibDataModule, seed: int) -> None:
        super().__init__()
        self.accelerator = accelerator
        self.model = model
        self.datamodule = datamodule
        self.seed = seed

    @hide_output
    def run(
        self,
        task_id: int | None = None,
    ) -> dict[str, Any]:
        """Run the benchmark."""
        devices: str | list[int] = "auto"
        if task_id is not None:
            devices = [task_id]
            logger.info(f"Running job {self.model.__class__.__name__} with device {task_id}")
        with TemporaryDirectory() as temp_dir:
            seed_everything(self.seed)
            engine = Engine(
                accelerator=self.accelerator,
                devices=devices,
                default_root_dir=temp_dir,
            )
            engine.fit(self.model, self.datamodule)
            test_results = engine.test(self.model, self.datamodule)
        # TODO(ashwinvaidya17): Restore throughput
        # https://github.com/openvinotoolkit/anomalib/issues/2054
        output = {
            "seed": self.seed,
            "accelerator": self.accelerator,
            "model": self.model.__class__.__name__,
            "data": self.datamodule.__class__.__name__,
            "category": self.datamodule.category,
            **test_results[0],
        }
        logger.info(f"Completed with result {output}")
        return output

    @staticmethod
    def collect(results: list[dict[str, Any]]) -> pd.DataFrame:
        """Gather the results returned from run."""
        output: dict[str, Any] = {}
        for key in results[0]:
            output[key] = []
        for result in results:
            for key, value in result.items():
                output[key].append(value)
        return pd.DataFrame(output)

    @staticmethod
    def save(result: pd.DataFrame) -> None:
        """Save the result to a csv file."""
        BenchmarkJob._print_tabular_results(result)
        file_path = Path("runs") / BenchmarkJob.name / datetime.now().strftime("%Y-%m-%d-%H_%M_%S") / "results.csv"
        file_path.parent.mkdir(parents=True, exist_ok=True)
        result.to_csv(file_path, index=False)
        logger.info(f"Saved results to {file_path}")

    @staticmethod
    def _print_tabular_results(gathered_result: pd.DataFrame) -> None:
        """Print the tabular results."""
        if gathered_result is not None:
            console = Console()
            table = Table(title=f"{BenchmarkJob.name} Results", show_header=True, header_style="bold magenta")
            _results = gathered_result.to_dict("list")
            for column in _results:
                table.add_column(column)
            for row in zip(*_results.values(), strict=False):
                table.add_row(*[str(value) for value in row])
            console.print(table)
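To close the loop, a sketch of the collect/save path with hypothetical run outputs; the metric keys and values are illustrative, since the real ones come from `engine.test`:

```python
# Sketch: two hypothetical run() outputs funneled through collect() and save().
from anomalib.pipelines.benchmark.job import BenchmarkJob

results = [
    {"seed": 42, "accelerator": "cuda", "model": "Padim", "data": "MVTec",
     "category": "bottle", "image_AUROC": 0.98},
    {"seed": 51, "accelerator": "cuda", "model": "Padim", "data": "MVTec",
     "category": "bottle", "image_AUROC": 0.97},
]

df = BenchmarkJob.collect(results)  # one DataFrame row per run
BenchmarkJob.save(df)  # prints a rich table, writes runs/benchmark/<timestamp>/results.csv
```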