Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

31855 #32593

Closed
wants to merge 15 commits into from
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/dataframe/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,8 @@ def expr_to_stages(expr):

if stage is None:
# No stage available, compute this expression as part of a new stage.
stage = Stage(expr.args(), expr.requires_partition_by())
stage = Stage([arg for arg in expr.args() if arg in inputs],
expr.requires_partition_by())
for arg in expr.args():
# For each argument, declare that it is also available in
# this new stage.
Expand All @@ -422,6 +423,7 @@ def expr_to_stage(expr):

@_memoize
def stage_to_result(stage):
# print({expr._id: expr_to_pcoll(expr) for expr in stage.inputs})
return {expr._id: expr_to_pcoll(expr)
for expr in stage.inputs} | ComputeStage(stage)

Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1461,7 +1461,7 @@ def partition_for(self, element, num_partitions, *args, **kwargs):

def _get_function_body_without_inners(func):
source_lines = inspect.getsourcelines(func)[0]
source_lines = dropwhile(lambda x: x.startswith("@"), source_lines)
source_lines = dropwhile(lambda x: x.strip().startswith("@"), source_lines)
first_def_line = next(source_lines).strip()
if first_def_line.startswith("def "):
last_def_line_without_comment = first_def_line.split("#")[0] \
Expand Down
Loading