πŸš€ Anomalib Pipelines #2005

Merged
10 changes: 10 additions & 0 deletions src/anomalib/cli/cli.py
@@ -16,6 +16,7 @@
from rich import traceback

from anomalib import TaskType, __version__
from anomalib.cli.pipelines import PIPELINE_REGISTRY, pipeline_subcommands, run_pipeline
from anomalib.cli.utils.help_formatter import CustomHelpFormatter, get_short_docstring
from anomalib.cli.utils.openvino import add_openvino_export_arguments
from anomalib.loggers import configure_logger
@@ -132,6 +133,13 @@ def add_subcommands(self, **kwargs) -> None:
# add arguments to subcommand
getattr(self, f"add_{subcommand}_arguments")(sub_parser)

# Add pipeline subcommands
if PIPELINE_REGISTRY is not None:
for subcommand, value in pipeline_subcommands().items():
sub_parser = PIPELINE_REGISTRY[subcommand].get_parser()
self.subcommand_parsers[subcommand] = sub_parser
parser_subcommands.add_subcommand(subcommand, sub_parser, help=value["description"])

def add_arguments_to_parser(self, parser: ArgumentParser) -> None:
"""Extend trainer's arguments to add engine arguments.

@@ -353,6 +361,8 @@ def _run_subcommand(self) -> None:
fn = getattr(self.engine, self.subcommand)
fn_kwargs = self._prepare_subcommand_kwargs(self.subcommand)
fn(**fn_kwargs)
elif PIPELINE_REGISTRY is not None and self.subcommand in pipeline_subcommands():
run_pipeline(self.config)
else:
self.config_init = self.parser.instantiate_classes(self.config)
getattr(self, f"{self.subcommand}")()
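
Note: with both hooks in place, every registered pipeline surfaces as an ordinary CLI subcommand, and run_pipeline takes over dispatch for it. A minimal usage sketch, assuming a prepared benchmark config (the file name is illustrative, not part of this PR):

    anomalib benchmark --config benchmark.yaml

run_pipeline then pulls the parsed sub-config out of the namespace and forwards it to the matching Pipeline class from the registry.
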
41 changes: 41 additions & 0 deletions src/anomalib/cli/pipelines.py
@@ -0,0 +1,41 @@
"""Subcommand for pipelines."""

# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0


import logging

from jsonargparse import Namespace

from anomalib.cli.utils.help_formatter import get_short_docstring
from anomalib.utils.exceptions import try_import

logger = logging.getLogger(__name__)

if try_import("anomalib.pipelines"):
from anomalib.pipelines import Benchmark
from anomalib.pipelines.components.base import Pipeline

PIPELINE_REGISTRY: dict[str, type[Pipeline]] | None = {"benchmark": Benchmark}
else:
PIPELINE_REGISTRY = None


def pipeline_subcommands() -> dict[str, dict[str, str]]:
"""Return subcommands for pipelines."""
if PIPELINE_REGISTRY is not None:
return {name: {"description": get_short_docstring(pipeline)} for name, pipeline in PIPELINE_REGISTRY.items()}
return {}


def run_pipeline(args: Namespace) -> None:
"""Run pipeline."""
logger.warning("This feature is experimental. It may change or be removed in the future.")
if PIPELINE_REGISTRY is not None:
subcommand = args.subcommand
config = args[subcommand]
PIPELINE_REGISTRY[subcommand]().run(config)
else:
msg = "Pipeline is not available"
raise ValueError(msg)
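
Note: PIPELINE_REGISTRY is a plain mapping from subcommand name to Pipeline class, so exposing another pipeline would only need one extra entry. A hypothetical sketch (Experiment is an invented class, not part of this PR):

# Hypothetical registry extension; `Experiment` is an assumed Pipeline
# subclass used only to illustrate how a new subcommand would be exposed.
from anomalib.pipelines import Benchmark
from anomalib.pipelines.components.base import Pipeline, Runner


class Experiment(Pipeline):
    """Toy pipeline used only for illustration."""

    def _setup_runners(self, args: dict) -> list[Runner]:
        return []


PIPELINE_REGISTRY: dict[str, type[Pipeline]] | None = {
    "benchmark": Benchmark,
    "experiment": Experiment,  # would appear as `anomalib experiment ...`
}
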
5 changes: 2 additions & 3 deletions src/anomalib/cli/utils/help_formatter.py
@@ -6,7 +6,6 @@
import argparse
import re
import sys
from typing import TypeVar

import docstring_parser
from jsonargparse import DefaultHelpFormatter
@@ -36,11 +35,11 @@
print("To use other subcommand using `anomalib install`")


def get_short_docstring(component: TypeVar) -> str:
def get_short_docstring(component: type) -> str:
"""Get the short description from the docstring.

Args:
component (TypeVar): The component to get the docstring from
component (type): The component to get the docstring from

Returns:
str: The short description
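
Note: since the parameter is now annotated as a plain type, the helper can be exercised directly on any class. A small illustrative check (the Dummy class is made up):

# Illustrative only: get_short_docstring extracts the summary line of a
# docstring, which the CLI reuses as subcommand help text.
from anomalib.cli.utils.help_formatter import get_short_docstring


class Dummy:
    """One-line summary.

    Extended description that should not appear in the short docstring.
    """


print(get_short_docstring(Dummy))  # expected: "One-line summary."
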
26 changes: 20 additions & 6 deletions src/anomalib/data/__init__.py
@@ -3,7 +3,6 @@
# Copyright (C) 2022-2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0


import importlib
import logging
from enum import Enum
@@ -29,20 +28,35 @@
)


def get_datamodule(config: DictConfig | ListConfig) -> AnomalibDataModule:
class UnknownDatamoduleError(ModuleNotFoundError):
...


def get_datamodule(config: DictConfig | ListConfig | dict) -> AnomalibDataModule:
"""Get Anomaly Datamodule.

Args:
config (DictConfig | ListConfig): Configuration of the anomaly model.
config (DictConfig | ListConfig | dict): Configuration of the anomaly model.

Returns:
PyTorch Lightning DataModule
"""
logger.info("Loading the datamodule")

module = importlib.import_module(".".join(config.data.class_path.split(".")[:-1]))
dataclass = getattr(module, config.data.class_path.split(".")[-1])
init_args = {**config.data.get("init_args", {})} # get dict
if isinstance(config, dict):
config = DictConfig(config)

try:
_config = config.data if "data" in config else config
if len(_config.class_path.split(".")) > 1:
module = importlib.import_module(".".join(_config.class_path.split(".")[:-1]))
else:
module = importlib.import_module("anomalib.data")
except ModuleNotFoundError as exception:
logger.exception(f"ModuleNotFoundError: {_config.class_path}")
raise UnknownDatamoduleError from exception
dataclass = getattr(module, _config.class_path.split(".")[-1])
init_args = {**_config.get("init_args", {})} # get dict
if "image_size" in init_args:
init_args["image_size"] = to_tuple(init_args["image_size"])

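
Note: a hedged sketch of what the widened signature allows, namely passing a plain dict and a bare class name that is resolved against anomalib.data (the values are placeholders and assume the dataset is available locally):

# Illustrative call; a class_path without a module prefix is looked up in
# anomalib.data, and a plain dict is promoted to a DictConfig first.
from anomalib.data import get_datamodule

datamodule = get_datamodule(
    {
        "class_path": "MVTec",
        "init_args": {"category": "bottle"},
    },
)
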
1 change: 0 additions & 1 deletion src/anomalib/metrics/f1_score.py
@@ -6,7 +6,6 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0


import logging
from typing import Any, Literal

8 changes: 8 additions & 0 deletions src/anomalib/pipelines/__init__.py
@@ -0,0 +1,8 @@
"""Pipelines for end-to-end usecases."""

# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

from .benchmark import Benchmark

__all__ = ["Benchmark"]
8 changes: 8 additions & 0 deletions src/anomalib/pipelines/benchmark/__init__.py
@@ -0,0 +1,8 @@
"""Benchmarking."""

# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

from .pipeline import Benchmark

__all__ = ["Benchmark"]
41 changes: 41 additions & 0 deletions src/anomalib/pipelines/benchmark/generator.py
@@ -0,0 +1,41 @@
"""Benchmark job generator."""

# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

from collections.abc import Generator

from anomalib.data import get_datamodule
from anomalib.models import get_model
from anomalib.pipelines.components import JobGenerator
from anomalib.pipelines.components.utils import get_iterator_from_grid_dict
from anomalib.utils.logging import hide_output

from .job import BenchmarkJob


class BenchmarkJobGenerator(JobGenerator):
"""Generate BenchmarkJob.

Args:
accelerator (str): The accelerator to use.
"""

def __init__(self, accelerator: str) -> None:
self.accelerator = accelerator

Contributor: I'm not sure about the terminology here. We use this variable mainly to distinguish between CPU and GPU, but I'm not sure whether a CPU is technically considered an accelerator. Maybe device would be a more suitable name?

Collaborator (author): Originally this was called device. I think we discussed changing it to accelerator to be in line with Lightning's terminology. I have no preference here, so I can rename it once we finalise the name.

@property
def job_class(self) -> type:
"""Return the job class."""
return BenchmarkJob

@hide_output
def generate_jobs(self, args: dict) -> Generator[BenchmarkJob, None, None]:
"""Return iterator based on the arguments."""
for _container in get_iterator_from_grid_dict(args):
yield BenchmarkJob(
accelerator=self.accelerator,
seed=_container["seed"],
model=get_model(_container["model"]),
datamodule=get_datamodule(_container["data"]),
)
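
Note: each flattened container yielded by get_iterator_from_grid_dict is expected to carry seed, model, and data entries. The sketch below shows how a single expanded container maps onto a job, mirroring the loop body above (the values are placeholders, and the grid-expansion syntax itself is outside this diff):

# Hypothetical container after grid expansion; model/data values are placeholders.
from anomalib.data import get_datamodule
from anomalib.models import get_model
from anomalib.pipelines.benchmark.job import BenchmarkJob

container = {
    "seed": 42,
    "model": {"class_path": "Padim"},
    "data": {"class_path": "MVTec", "init_args": {"category": "bottle"}},
}
job = BenchmarkJob(
    accelerator="cuda",
    seed=container["seed"],
    model=get_model(container["model"]),
    datamodule=get_datamodule(container["data"]),
)
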
108 changes: 108 additions & 0 deletions src/anomalib/pipelines/benchmark/job.py
@@ -0,0 +1,108 @@
"""Benchmarking job."""

# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import logging
from datetime import datetime
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any

import pandas as pd
from lightning import seed_everything
from rich.console import Console
from rich.table import Table

from anomalib.data import AnomalibDataModule
from anomalib.engine import Engine
from anomalib.models import AnomalyModule
from anomalib.pipelines.components import Job
from anomalib.utils.logging import hide_output

logger = logging.getLogger(__name__)


class BenchmarkJob(Job):
"""Benchmarking job.

Args:
accelerator (str): The accelerator to use.
model (AnomalyModule): The model to use.
datamodule (AnomalibDataModule): The data module to use.
seed (int): The seed to use.
"""

name = "benchmark"

def __init__(self, accelerator: str, model: AnomalyModule, datamodule: AnomalibDataModule, seed: int) -> None:
super().__init__()
self.accelerator = accelerator
self.model = model
self.datamodule = datamodule
self.seed = seed

@hide_output
def run(
self,
task_id: int | None = None,
) -> dict[str, Any]:
"""Run the benchmark."""
devices: str | list[int] = "auto"
if task_id is not None:
devices = [task_id]
logger.info(f"Running job {self.model.__class__.__name__} with device {task_id}")
with TemporaryDirectory() as temp_dir:
seed_everything(self.seed)
engine = Engine(
accelerator=self.accelerator,
devices=devices,
default_root_dir=temp_dir,
)
engine.fit(self.model, self.datamodule)
test_results = engine.test(self.model, self.datamodule)
# TODO(ashwinvaidya17): Restore throughput
# https://github.com/openvinotoolkit/anomalib/issues/2054
output = {
"seed": self.seed,
"accelerator": self.accelerator,
"model": self.model.__class__.__name__,
"data": self.datamodule.__class__.__name__,
"category": self.datamodule.category,
**test_results[0],
}
logger.info(f"Completed with result {output}")
return output

@staticmethod
def collect(results: list[dict[str, Any]]) -> pd.DataFrame:
"""Gather the results returned from run."""
output: dict[str, Any] = {}
for key in results[0]:
output[key] = []
for result in results:
for key, value in result.items():
output[key].append(value)
return pd.DataFrame(output)

@staticmethod
def save(result: pd.DataFrame) -> None:
"""Save the result to a csv file."""
BenchmarkJob._print_tabular_results(result)
file_path = Path("runs") / BenchmarkJob.name / datetime.now().strftime("%Y-%m-%d-%H:%M:%S") / "results.csv"
file_path.parent.mkdir(parents=True, exist_ok=True)
result.to_csv(file_path, index=False)
logger.info(f"Saved results to {file_path}")

@staticmethod
def _print_tabular_results(gathered_result: pd.DataFrame) -> None:
"""Print the tabular results."""
if gathered_result is not None:
console = Console()
table = Table(title=f"{BenchmarkJob.name} Results", show_header=True, header_style="bold magenta")
_results = gathered_result.to_dict("list")
for column in _results:
table.add_column(column)
for row in zip(*_results.values(), strict=False):
table.add_row(*[str(value) for value in row])
console.print(table)
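
Note: collect transposes the per-job result dictionaries into DataFrame columns, and save prints the rich table and writes the CSV. A minimal sketch with made-up numbers:

# Made-up results, only to show the shape collect() expects and produces.
from anomalib.pipelines.benchmark.job import BenchmarkJob

results = [
    {"seed": 42, "accelerator": "cuda", "model": "Padim", "data": "MVTec", "category": "bottle", "image_AUROC": 0.97},
    {"seed": 51, "accelerator": "cuda", "model": "Padim", "data": "MVTec", "category": "cable", "image_AUROC": 0.94},
]
df = BenchmarkJob.collect(results)  # one row per job, one column per result key
BenchmarkJob.save(df)  # prints the table and writes runs/benchmark/<timestamp>/results.csv
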
29 changes: 29 additions & 0 deletions src/anomalib/pipelines/benchmark/pipeline.py
@@ -0,0 +1,29 @@
"""Benchmarking."""

# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import torch

from anomalib.pipelines.components.base import Pipeline, Runner
from anomalib.pipelines.components.runners import ParallelRunner, SerialRunner

from .generator import BenchmarkJobGenerator


class Benchmark(Pipeline):
"""Benchmarking pipeline."""

def _setup_runners(self, args: dict) -> list[Runner]:
"""Setup the runners for the pipeline."""
accelerators = args["accelerator"] if isinstance(args["accelerator"], list) else [args["accelerator"]]
runners: list[Runner] = []
for accelerator in accelerators:
if accelerator == "cpu":
runners.append(SerialRunner(BenchmarkJobGenerator("cpu")))
elif accelerator == "cuda":
runners.append(ParallelRunner(BenchmarkJobGenerator("cuda"), n_jobs=torch.cuda.device_count()))
else:
msg = f"Unsupported accelerator: {accelerator}"
raise ValueError(msg)
return runners
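
Note: a list-valued accelerator entry produces one runner per backend, serial on CPU and parallel on CUDA with one worker per visible GPU. A sketch that pokes the protected hook directly (a real run goes through Pipeline.run via the CLI):

# Illustrative only: exercising the protected hook to show the runner layout.
from anomalib.pipelines import Benchmark

runners = Benchmark()._setup_runners({"accelerator": ["cpu", "cuda"]})
# -> [SerialRunner(BenchmarkJobGenerator("cpu")),
#     ParallelRunner(BenchmarkJobGenerator("cuda"), n_jobs=torch.cuda.device_count())]
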
13 changes: 13 additions & 0 deletions src/anomalib/pipelines/components/__init__.py
@@ -0,0 +1,13 @@
"""Utilities for the pipeline modules."""

# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

from .base import Job, JobGenerator, Pipeline, Runner

__all__ = [
"Job",
"JobGenerator",
"Pipeline",
"Runner",
]
10 changes: 10 additions & 0 deletions src/anomalib/pipelines/components/base/__init__.py
@@ -0,0 +1,10 @@
"""Base classes for pipelines."""

# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

from .job import Job, JobGenerator
from .pipeline import Pipeline
from .runner import Runner

__all__ = ["Job", "JobGenerator", "Runner", "Pipeline"]