Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use threading to run jobmanager loop #614

Closed
wants to merge 15 commits into from
Closed

use threading to run jobmanager loop #614

wants to merge 15 commits into from

Conversation

jdries
Copy link
Collaborator

@jdries jdries commented Sep 5, 2024

I propose to allow running the jobmanager cycle in a separate thread, allowing for a more clean option to interrupt it.

Copy link
Member

@soxofaan soxofaan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall I'm not sure we should do this with asyncio, as everything (HTTP requests, file IO) are just classic blocking calls currently, so there is not much to gain.

I think in this case working with Python threading will be a bit simpler

@@ -252,6 +255,43 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:

return df

def start_job_thread(self,start_job: Callable[[], BatchJob],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using "thread" in naming and docs might be confusing and setting wrong expectations as asyncio is not about threading but coroutines

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's now converted to use an actual 'Thread' object, so the confusion is gone?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@soxofaan if this is fine now, we can merge and continue with the other PR's

openeo/extra/job_management.py Outdated Show resolved Hide resolved
openeo/extra/job_management.py Outdated Show resolved Hide resolved
@jdries jdries changed the title use asyncio to run jobmanager loop use threading to run jobmanager loop Sep 15, 2024
@jdries jdries marked this pull request as ready for review September 15, 2024 12:02
openeo/extra/job_management.py Show resolved Hide resolved
openeo/extra/job_management.py Outdated Show resolved Hide resolved
year = int(row["year"])
return BatchJob(job_id=f"job-{year}", connection=connection)

df = manager._normalize_df(df)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this normalize_df be handled automatically in the manager?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we lack a good mechanism to initialize a job db correctly, we'll have to come up with something

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@soxofaan I introduced initialize_job_db to address this issue.

@soxofaan
Copy link
Member

FYI: I pushed some tweaks (some automatic code style cleanups), but more importantly some tweaks to the sleep/wait logic in 95a4ec7. Feel free to revert

return BatchJob(job_id=f"job-{year}", connection=connection)

job_db = CsvJobDatabase(output_file)
manager.initialize_job_db(job_db, df)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#614 (comment):

@soxofaan I introduced initialize_job_db to address this issue.

I see, but I'd think that this initialization is not something the user should be bothered with,
can't this just be done automatically like in run_jobs:

if job_db.exists():
# Resume from existing db
_log.info(f"Resuming `run_jobs` from existing {job_db}")
elif df is not None:
df = self._normalize_df(df)
job_db.persist(df)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To initialize a job db, you need a dataframe (once), I try to avoid that, due to the confusion that Victor explained here:
#607 (comment)

soxofaan added a commit that referenced this pull request Sep 27, 2024
db initialization API needs some more thought, which is for another PR
@soxofaan
Copy link
Member

remove the initialize_job_db concern from the scope of this PR and merged in ffa7be2

@soxofaan soxofaan closed this Sep 27, 2024
@soxofaan soxofaan deleted the async_jobmanager branch September 27, 2024 10:57
soxofaan added a commit that referenced this pull request Sep 27, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants