
Pipeline decorator #2629

Merged
merged 20 commits into from
Feb 12, 2021
72 changes: 69 additions & 3 deletions dali/python/nvidia/dali/pipeline.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2017-2020, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2017-2021, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,20 +12,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.

-#pylint: disable=no-member
+# pylint: disable=no-member
from collections import deque
from nvidia.dali import backend as b
from nvidia.dali import tensors as Tensors
from nvidia.dali import types
from nvidia.dali.backend import CheckDLPackCapsule
from threading import local as tls
from . import data_node as _data_node
import functools
import inspect
import warnings
import ctypes

pipeline_tls = tls()

from .data_node import DataNode
-DataNode.__module__ = __name__ # move to pipeline
+
+DataNode.__module__ = __name__ # move to pipeline


def _show_deprecation_warning(deprecated, in_favor_of):
# show only this warning
@@ -34,6 +39,7 @@ def _show_deprecation_warning(deprecated, in_favor_of):
warnings.warn("{} is deprecated, please use {} instead".format(deprecated, in_favor_of),
Warning, stacklevel=2)


def _get_default_stream_for_array(array):
if types._is_torch_tensor(array):
import torch
@@ -990,3 +996,63 @@ def iter_setup(self):
For example, one can use this function to feed the input
data from NumPy arrays."""
pass


def _discriminate_args(**kwargs):
Contributor:

Suggested change:
-def _discriminate_args(**kwargs):
+def _separate_args(**kwargs):

or

Suggested change:
-def _discriminate_args(**kwargs):
+def _split_args(**kwargs):

    """Split kwargs into those applicable to the Pipeline constructor and the rest."""
    fca = inspect.getfullargspec(Pipeline.__init__)  # Full Ctor Args
    ctor_args = dict(filter(lambda x: x[0] in fca.args or x[0] in fca.kwonlyargs, kwargs.items()))
Contributor:

This should be the other way round: detect arguments for this function - if not found, try Pipeline - if not found either, raise an error.
Rationale: if we add new arguments to Pipeline's constructor, we may break users' code because a named argument would be redirected to the Pipeline constructor instead.

Member Author (@szalpal), Feb 3, 2021:

Done with one exception: there'd be a problem with the error part. I believe we'd like this to be legal:

@pipeline_def
def peculiar_pipeline(operator=fn.old_color_twist, **operator_kwargs):
   return operator(**operator_kwargs)

p = peculiar_pipeline(operator=fn.color_twist, brightness=5)

So we still need to assign the remaining args to the function's arguments.

    fn_args = dict(filter(lambda x: x[0] not in ctor_args.keys(), kwargs.items()))
    return ctor_args, fn_args


def pipeline(fn=None, **pipeline_kwargs):
Contributor:

I think this name can be misleading because we have
nvidia.dali.pipeline.Pipeline (the pipeline class) and nvidia.dali.pipeline.pipeline (the decorator).
How about renaming to pipeline_def to avoid confusion?

Contributor:

Yes, I agree: pipeline_def or def_pipeline, or def_pipe, or make_pipeline. Whatever it takes to not have three different nvidia.dali.pipeline.[Pp]ipeline things.

Member Author (@szalpal), Feb 1, 2021:

Personally, I'm fine with pipeline, because we already have the Pipeline in the same package and place. So we already do:

from nvidia.dali.pipeline import Pipeline

pipe = Pipeline(...)

Why not do this:

from nvidia.dali.pipeline import pipeline

@pipeline
def pipe():
   ...

If you make a mistake there, the interpreter will boldly tell you about it

Let's leave more sophisticated naming for the user:

import nvidia.dali.pipeline.pipeline as pipeline_decorator
import nvidia.dali.pipeline.Pipeline as pipeline_object

"""
Decorator for wrapping functions that define DALI pipelines.
Contributor:

Maybe add something like: "Decorated function becomes a pipeline factory"? Or: "Decorated function will return instances of Pipeline with processing defined by the function."

Contributor:

The latter, or something like:

Suggested change:
-Decorator for wrapping functions that define DALI pipelines.
+A decorator which creates a factory of DALI pipelines whose processing graph is defined by the decorated function.

Member Author:

Done


**Usage**
Contributor (@mzient), Feb 2, 2021:

I think that some description should go before usage/example:

    ``pipeline_def`` returns a function that creates a pipeline, where the processing graph is defined by the function ``fn`` passed as the first positional argument. It can be used as a decorator, in which case the decorated function becomes a pipeline factory.
The function returned is roughly equivalent to::

    def factory(arg1, arg2, ...):
        pipe = nvidia.dali.Pipeline()
        with pipe:
            pipe.set_outputs(*fn(arg1, arg2, ...))
        return pipe

Member Author:

I believe that all this is written when you google "how to write a decorator?". So I wouldn't add it at the beginning, but I added it at the end of the paragraph, because this note is in fact valuable.

Contributor (@mzient), Feb 3, 2021:

Sorry, but.... how can you possibly google that a decorator creates a DALI pipeline, calls your function within the with-scope of the newly created pipeline and calls set_outputs with the return values of the function you supplied???
Perhaps it should be written as text, not as code.

Contributor:

I think the doc is fine; we shouldn't use multiple compound statements, as they are hard to follow.

I was thinking whether to move the equivalent code to the top, but I'm not sure if it's better than usage examples.


First, implement a function that defines a DALI pipeline.
Such function shall return DALI's DataNodes. These return
Contributor:

Suggested change:
-Such function shall return DALI's DataNodes. These return
+Such function shall return one or more DALI DataNodes, representing the outputs of the pipeline.

Contributor:

It can return not only DataNodes.

I'm missing the information that this decorator will turn this function into something like a Pipeline factory, and that the return values of this function become the outputs of the Pipeline (for example in .run()), while the function itself will return Pipeline instances.

Member Author:

Done

values will denote output from the pipeline. You can
decorate this function with ``@pipeline``::

@nvidia.dali.pipeline.pipeline
def pipe(flip_vertical, flip_horizontal):
    data, _ = fn.file_reader(file_root=images_dir)
    img = fn.image_decoder(data, device="mixed")
    flipped = fn.flip(img, horizontal=flip_horizontal, vertical=flip_vertical)
    return flipped, img

When creating a Pipeline object using the decorated function,
you can pass any number of ``Pipeline.__init__`` arguments as
keyword-args to the call::

my_pipe = pipe(0, batch_size=32, num_threads=1, device_id=0, flip_horizontal=1)
my_pipe.build()
Contributor:

Can you maybe add flipped, img = my_pipe.run() and explain that those are the outputs of the pipeline defined by the decorated function?

Member Author:

Done


Additionally, decorator can accept Pipeline constructor
Contributor:

Suggested change:
-Additionally, decorator can accept Pipeline constructor
+Additionally, the decorator can accept Pipeline constructor

Member Author:

Done

parameters. Please note that in this case you must use
keyword args only. Any Pipeline constructor parameter
passed later at pipeline instantiation will overwrite
the decorator-defined params::

@nvidia.dali.pipeline.pipeline(batch_size=32, num_threads=3)
def pipe():
    data = fn.external_source(source=my_generator)
    return data

my_pipe = pipe(batch_size=128) # batch_size=128 overwrites batch_size=32
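The overwrite behavior described here follows from Python's dict-merge ordering, which the implementation relies on (``{**pipeline_kwargs, **ctor_args}``): when the same key appears in both dicts, the right-hand value wins. A quick standalone illustration:

```python
# Kwargs given to the decorator vs. kwargs given at pipeline creation;
# in a {**a, **b} merge, keys from b override keys from a.
decorator_kwargs = {"batch_size": 32, "num_threads": 3}
call_kwargs = {"batch_size": 128}

merged = {**decorator_kwargs, **call_kwargs}
print(merged)  # {'batch_size': 128, 'num_threads': 3}
```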
Contributor:

I'm also thinking whether the doc should mention that this is equivalent to using with pipe: and set_outputs.

Member Author:

Done

"""
def actual_decorator(func):
    @functools.wraps(func)
    def create_pipeline(*args, **kwargs):
        ctor_args, fn_kwargs = _discriminate_args(**kwargs)
        pipe = Pipeline(**{**pipeline_kwargs, **ctor_args})  # Merge and overwrite dict
        with pipe:
            pipe_outputs = func(*args, **fn_kwargs)
            _ = pipe_outputs if isinstance(pipe_outputs, tuple) else (pipe_outputs, )
            pipe.set_outputs(*_)
Contributor:

PLZ NO.
_ is for ignored values.

Member Author:

Done

        return pipe
    return create_pipeline
return actual_decorator(fn) if fn else actual_decorator
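The last line above is the standard trick for supporting both the bare ``@pipeline`` form and the parameterized ``@pipeline(...)`` form. Below is a runnable sketch of the whole pattern; ``FakePipeline`` is a hypothetical stand-in for ``nvidia.dali.Pipeline``, and the kwargs splitting is omitted for brevity (all call-time kwargs go to the constructor here).

```python
import functools

class FakePipeline:
    # Hypothetical stand-in for nvidia.dali.Pipeline, just enough
    # to make the decorator pattern runnable without DALI.
    def __init__(self, **kwargs):
        self.kwargs = kwargs
        self.outputs = None
    def __enter__(self):
        return self
    def __exit__(self, *exc):
        return False
    def set_outputs(self, *outputs):
        self.outputs = outputs

def pipeline(fn=None, **pipeline_kwargs):
    def actual_decorator(func):
        @functools.wraps(func)
        def create_pipeline(*args, **kwargs):
            # Call-time kwargs override decorator-time kwargs.
            pipe = FakePipeline(**{**pipeline_kwargs, **kwargs})
            with pipe:
                pipe_outputs = func(*args)
                # Normalize a single output to a 1-tuple before unpacking.
                if not isinstance(pipe_outputs, tuple):
                    pipe_outputs = (pipe_outputs,)
                pipe.set_outputs(*pipe_outputs)
            return pipe
        return create_pipeline
    # Bare @pipeline passes the function as fn; @pipeline(...) leaves fn None.
    return actual_decorator(fn) if fn else actual_decorator

@pipeline(batch_size=32)
def my_pipe():
    return "single_output"

p = my_pipe(batch_size=128)
print(p.kwargs)   # {'batch_size': 128}
print(p.outputs)  # ('single_output',)
```

The normalization step matters because ``set_outputs(*outputs)`` unpacks its argument; without wrapping, a single string output would be unpacked character by character.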