Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SDK - Controlling which modules are captured with Lightweight components #1435

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 25 additions & 14 deletions sdk/python/kfp/components/_python_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from ._structures import *

from pathlib import Path
from typing import TypeVar, Generic
from typing import TypeVar, Generic, List

T = TypeVar('T')

Expand All @@ -45,16 +45,23 @@ def _python_function_name_to_component_name(name):
return re.sub(' +', ' ', name.replace('_', ' ')).strip(' ').capitalize()


def _capture_function_code_using_cloudpickle(func) -> str:
def _capture_function_code_using_cloudpickle(func, modules_to_capture: List[str] = None) -> str:
import sys
import cloudpickle
import pickle

if modules_to_capture is None:
modules_to_capture = [func.__module__]

# Hack to force cloudpickle to capture the whole function instead of just referencing the code file. See https://github.com/cloudpipe/cloudpickle/blob/74d69d759185edaeeac7bdcb7015cfc0c652f204/cloudpickle/cloudpickle.py#L490
old_modules = {}
try: # Try is needed to restore the state if something goes wrong
old_module = sys.modules.pop(func.__module__)
for module_name in modules_to_capture:
if module_name in sys.modules:
old_modules[module_name] = sys.modules.pop(module_name)
func_pickle = cloudpickle.dumps(func, pickle.DEFAULT_PROTOCOL)
finally:
sys.modules[func.__module__] = old_module
sys.modules.update(old_modules)
func_code = '{func_name} = pickle.loads({func_pickle})'.format(func_name=func.__name__, func_pickle=repr(func_pickle))

code_lines = [
Expand All @@ -73,14 +80,15 @@ def _capture_function_code_using_cloudpickle(func) -> str:
return '\n'.join(code_lines)


def _func_to_component_spec(func, extra_code='', base_image=_default_base_image) -> ComponentSpec:
def _func_to_component_spec(func, extra_code='', base_image=_default_base_image, modules_to_capture: List[str] = None) -> ComponentSpec:
'''Takes a self-contained python function and converts it to component

Args:
func: Required. The function to be converted
base_image: Optional. Docker image to be used as a base image for the python component. Must have python 3.5+ installed. Default is tensorflow/tensorflow:1.11.0-py3
Note: The image can also be specified by decorating the function with the @python_component decorator. If different base images are explicitly specified in both places, an error is raised.
extra_code: Optional. Python source code that gets placed before the function code. Can be used as workaround to define types used in function signature.
modules_to_capture: Optional. List of module names that will be captured (instead of just being referenced) during the dependency scan. By default the func.__module__ is captured.
'''
decorator_base_image = getattr(func, '_component_base_image', None)
if decorator_base_image is not None:
Expand Down Expand Up @@ -155,7 +163,7 @@ def annotation_to_type_struct(annotation):

func_name=func.__name__

func_code = _capture_function_code_using_cloudpickle(func)
func_code = _capture_function_code_using_cloudpickle(func, modules_to_capture)

extra_output_external_names = [name + '_file' for name in extra_output_names]

Expand Down Expand Up @@ -232,11 +240,11 @@ def annotation_to_type_struct(annotation):
return component_spec


def _func_to_component_dict(func, extra_code='', base_image=_default_base_image):
return _func_to_component_spec(func, extra_code, base_image).to_dict()
def _func_to_component_dict(func, extra_code='', base_image=_default_base_image, modules_to_capture: List[str] = None):
return _func_to_component_spec(func, extra_code, base_image, modules_to_capture).to_dict()


def func_to_component_text(func, extra_code='', base_image=_default_base_image):
def func_to_component_text(func, extra_code='', base_image=_default_base_image, modules_to_capture: List[str] = None):
'''
Converts a Python function to a component definition and returns its textual representation

Expand All @@ -254,15 +262,16 @@ def add_multiply_two_numbers(a: float, b: float) -> NamedTuple('DummyName', [('s
base_image: Optional. Specify a custom Docker container image to use in the component. For lightweight components, the image needs to have python 3.5+. Default is tensorflow/tensorflow:1.11.0-py3
Note: The image can also be specified by decorating the function with the @python_component decorator. If different base images are explicitly specified in both places, an error is raised.
extra_code: Optional. Extra code to add before the function code. Can be used as workaround to define types used in function signature.
modules_to_capture: Optional. List of module names that will be captured (instead of just being referenced) during the dependency scan. By default the func.__module__ is captured. The actual algorithm: Starting with the initial function, start traversing dependencies. If the dependency.__module__ is in the modules_to_capture list then it's captured and its dependencies are traversed. Otherwise the dependency is only referenced instead of being captured and its dependencies are not traversed.

Returns:
Textual representation of a component definition
'''
component_dict = _func_to_component_dict(func, extra_code, base_image)
component_dict = _func_to_component_dict(func, extra_code, base_image, modules_to_capture)
return dump_yaml(component_dict)


def func_to_component_file(func, output_component_file, base_image=_default_base_image, extra_code='') -> None:
def func_to_component_file(func, output_component_file, base_image=_default_base_image, extra_code='', modules_to_capture: List[str] = None) -> None:
'''
Converts a Python function to a component definition and writes it to a file

Expand All @@ -281,14 +290,15 @@ def add_multiply_two_numbers(a: float, b: float) -> NamedTuple('DummyName', [('s
base_image: Optional. Specify a custom Docker container image to use in the component. For lightweight components, the image needs to have python 3.5+. Default is tensorflow/tensorflow:1.11.0-py3
Note: The image can also be specified by decorating the function with the @python_component decorator. If different base images are explicitly specified in both places, an error is raised.
extra_code: Optional. Extra code to add before the function code. Can be used as workaround to define types used in function signature.
modules_to_capture: Optional. List of module names that will be captured (instead of just being referenced) during the dependency scan. By default the func.__module__ is captured. The actual algorithm: Starting with the initial function, start traversing dependencies. If the dependency.__module__ is in the modules_to_capture list then it's captured and its dependencies are traversed. Otherwise the dependency is only referenced instead of being captured and its dependencies are not traversed.
'''

component_yaml = func_to_component_text(func, extra_code, base_image)
component_yaml = func_to_component_text(func, extra_code, base_image, modules_to_capture)

Path(output_component_file).write_text(component_yaml)


def func_to_container_op(func, output_component_file=None, base_image=_default_base_image, extra_code=''):
def func_to_container_op(func, output_component_file=None, base_image=_default_base_image, extra_code='', modules_to_capture: List[str] = None):
'''
Converts a Python function to a component and returns a task (ContainerOp) factory

Expand All @@ -307,13 +317,14 @@ def add_multiply_two_numbers(a: float, b: float) -> NamedTuple('DummyName', [('s
Note: The image can also be specified by decorating the function with the @python_component decorator. If different base images are explicitly specified in both places, an error is raised.
output_component_file: Optional. Write a component definition to a local file. Can be used for sharing.
extra_code: Optional. Extra code to add before the function code. Can be used as workaround to define types used in function signature.
modules_to_capture: Optional. List of module names that will be captured (instead of just being referenced) during the dependency scan. By default the func.__module__ is captured. The actual algorithm: Starting with the initial function, start traversing dependencies. If the dependency.__module__ is in the modules_to_capture list then it's captured and its dependencies are traversed. Otherwise the dependency is only referenced instead of being captured and its dependencies are not traversed.

Returns:
A factory function with a strongly-typed signature taken from the python function.
Once called with the required arguments, the factory constructs a pipeline task instance (ContainerOp) that can run the original function in a container.
'''

component_spec = _func_to_component_spec(func, extra_code, base_image)
component_spec = _func_to_component_spec(func, extra_code, base_image, modules_to_capture)

output_component_file = output_component_file or getattr(func, '_component_target_component_file', None)
if output_component_file:
Expand Down
14 changes: 14 additions & 0 deletions sdk/python/tests/components/test_data/module1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Module-level value that ModuleLevelClass.class_method reads at call time.
module_level_variable = 10


class ModuleLevelClass:
    """Test fixture class whose method depends on a module-level global."""

    def class_method(self, x):
        """Return ``x`` multiplied by ``module_level_variable``."""
        # The global is looked up on every call rather than bound at
        # definition time.
        return x * module_level_variable


def module_func(a: float) -> float:
    """Return ``a`` multiplied by 5."""
    return a * 5


def module_func_with_deps(a: float, b: float) -> float:
    """Combine the results of two other definitions from this module.

    Args:
        a: Value passed to ``ModuleLevelClass().class_method``.
        b: Value passed to ``module_func``.

    Returns:
        The sum of ``ModuleLevelClass().class_method(a)`` and
        ``module_func(b)``.
    """
    return ModuleLevelClass().class_method(a) + module_func(b)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .module1 import module_func_with_deps

def module2_func_with_deps(a: float, b: float) -> float:
    """Return ``module_func_with_deps(a, b) + 10``.

    ``module_func_with_deps`` is imported from the sibling ``module1``, so
    this function depends on code that lives in a different module.
    """
    return module_func_with_deps(a, b) + 10
37 changes: 37 additions & 0 deletions sdk/python/tests/components/test_python_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,49 @@ def main_func(a: float, b: float) -> float:

self.helper_test_2_in_1_out_component_using_local_call(func, op)

def test_func_to_container_op_check_nothing_extra_captured(self):
    """Check that a function which is not a real dependency is not captured.

    ``main_func`` calls ``f1`` directly but mentions ``f2`` only inside a
    string passed to ``eval``. The expected result is ``a + b``, which is
    only returned when looking up ``f2`` fails.
    """
    def f1():
        pass

    def f2():
        pass

    def main_func(a: float, b: float) -> float:
        f1()
        try:
            eval('f2()')
        except NameError:
            # An uncaptured f2 is an undefined name inside eval. Catching only
            # NameError keeps unrelated failures visible instead of turning
            # them into a passing result.
            return a + b
        raise AssertionError("f2 should not be captured, because it's not a dependency.")

    expected_func = lambda a, b: a + b
    op = comp.func_to_container_op(main_func)

    self.helper_test_2_in_1_out_component_using_local_call(expected_func, op)

def test_func_to_container_op_call_other_func_global(self):
    """Build an op from ``module_func_with_deps`` and compare it with a direct call.

    The component definition is also written to ``comp.yaml`` through the
    ``output_component_file`` argument.
    """
    task_factory = comp.func_to_container_op(
        module_func_with_deps,
        output_component_file='comp.yaml',
    )

    self.helper_test_2_in_1_out_component_using_local_call(
        module_func_with_deps, task_factory)

def test_func_to_container_op_with_imported_func(self):
    """Build an op from a function imported from ``test_data.module1``.

    No ``modules_to_capture`` argument is passed, so the default applies.
    """
    from .test_data.module1 import module_func_with_deps as module1_func_with_deps

    task_factory = comp.func_to_container_op(module1_func_with_deps)

    self.helper_test_2_in_1_out_component_using_local_call(
        module1_func_with_deps, task_factory)

def test_func_to_container_op_with_imported_func2(self):
    """Build an op from a function whose dependency lives in another module.

    Both the function's own module and the module it imports from are
    listed explicitly in ``modules_to_capture``.
    """
    from .test_data.module2_which_depends_on_module1 import module2_func_with_deps

    captured_modules = [
        'tests.components.test_data.module1',
        'tests.components.test_data.module2_which_depends_on_module1',
    ]
    task_factory = comp.func_to_container_op(
        module2_func_with_deps, modules_to_capture=captured_modules)

    self.helper_test_2_in_1_out_component_using_local_call(
        module2_func_with_deps, task_factory)

def test_func_to_container_op_multiple_named_typed_outputs(self):
from typing import NamedTuple
def add_multiply_two_numbers(a: float, b: float) -> NamedTuple('DummyName', [('sum', float), ('product', float)]):
Expand Down