
PyFlink UDFs #397

Merged · merged 6 commits into IndustryFusion:main from pyflink+operator-update-PR on Jul 8, 2023

Conversation

wagmarcel
Member

No description provided.

@wagmarcel wagmarcel force-pushed the pyflink+operator-update-PR branch 4 times, most recently from fd2a0e3 to e7d59d0 on July 6, 2023 20:28
@wagmarcel wagmarcel changed the title Pyflink+operator update pr PyFlink UDFs Jul 6, 2023
@wagmarcel wagmarcel marked this pull request as ready for review July 6, 2023 21:07
@wagmarcel wagmarcel force-pushed the pyflink+operator-update-PR branch 2 times, most recently from 946e544 to 830aab8 on July 8, 2023 17:33
This commit adds Python UDFs to the SQL Operator. To manage UDFs, a new CRD
has been defined: flinkpythonudf.
A flinkpythonudf contains Python classes in its spec which are executed by Flink as UDFs.
To implement this, the submission mechanism had to be changed.
Up to now, jobs were submitted to the Flink SQL-Client by the kopf operator. The SQL-Gateway created
all the needed SQL instructions from the beamtables, beamviews and beamsqlservices CRDs.
Unfortunately, the SQL-Client cannot handle Python UDFs generically enough. Therefore, a new
submission mechanism has been developed in PyFlink. It is very similar to the Flink-SQL mechanism.
The main difference is that the kopf operator now creates a SQL-JSON which contains the SQL, tables, views and sets in separate
fields. In parallel, the new flinkpythonudfoperator stores the UDF files under /tmp/udf. At deployment time
the Gateway creates a joint directory containing the SQL-JSON, the UDFs and the deployment script, and executes
it. With this mechanism, self-contained UDFs can be submitted (i.e. UDFs
which do not need non-standard imports). In the future, we will add more features to attach UDF data and
requirements.txt files to handle Python import dependencies.
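For illustration, a self-contained scalar UDF in PyFlink can be as small as the following sketch; the function name and types are hypothetical examples, not taken from this PR:

```python
from pyflink.table import DataTypes
from pyflink.table.udf import udf

# Minimal self-contained scalar UDF: it needs no non-standard imports,
# so it could be shipped as-is in a flinkpythonudf spec
# (hypothetical example, not the code from this PR).
@udf(result_type=DataTypes.STRING())
def to_upper(s: str) -> str:
    return s.upper() if s is not None else None
```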
Another addition is a new way to submit SQL scripts with the CRDs. In the past, all information had to be
included in the CRD itself, which limits the submission of scripts to <1MB. Especially for RDF data this is
too small. To handle larger deployments, another field can be submitted with beamsqlstatementsets:
'statementmaps'. Statementmaps are references to configmaps containing the SQL scripts. This way, arbitrarily
large SQL scripts can be split across several configmaps and jointly submitted.
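To make the mechanism concrete, here is a rough sketch of how an operator could resolve such configmap references into one SQL script; the reference format and key layout are assumptions for illustration, not the operator's actual schema:

```python
from kubernetes import client, config

def resolve_statementmaps(refs, namespace="default"):
    """Concatenate SQL scripts referenced as 'namespace/configmap-name' entries.

    Illustrative sketch only: the real operator's reference format and
    ordering guarantees may differ.
    """
    config.load_incluster_config()  # or config.load_kube_config() outside a cluster
    v1 = client.CoreV1Api()
    sql_parts = []
    for ref in refs:
        ns, _, name = ref.rpartition("/")
        cm = v1.read_namespaced_config_map(name, ns or namespace)
        # A real implementation would sort keys to get a deterministic order.
        sql_parts.extend(cm.data.values())
    return "\n".join(sql_parts)
```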
Changes:
* Updated dependencies in the Dockerfile to force execution of earlier stages for linting and testing
* Make gateway.js create a submit directory and submit the SQL CRDs with a pyflink submit script
* Make the beamsqlservicesoperator submit a SQL-JSON instead of a SQL script
* Added flinkpythonudfoperator to manage the new flinkpythonudf CRDs
* Allow SQL scripts distributed over several configmaps to be submitted by using the 'statementmaps' field
* Added example for the flinkpythonudf CRD: one normal UDF function and one UDF aggregator (see the aggregator sketch below)
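For the aggregator side, a minimal PyFlink UDAF might look like the following; this is a generic sketch using the pandas-based API, not the aggregator example shipped with this PR:

```python
from pyflink.table import DataTypes
from pyflink.table.udf import udaf

# Minimal pandas-based aggregate UDF (UDAF); name and semantics are
# illustrative only, not the example from this PR.
@udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
def mean_udaf(v):
    return v.mean()
```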
Flink stores state in memory, but this will not be sufficient for the expected state.
To manage arbitrarily large state, the RocksDB backend is used. The precondition
is to have sufficient ephemeral storage, which is added by this commit (see the sketch after this list).
* Added values to configure the Flink ephemeral storage size
* Always use the custom-built Flink image rather than the vanilla one
* Add emptyDir storage to Flink deployments
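For reference, enabling the RocksDB backend from PyFlink looks roughly like this; the checkpoint interval is a placeholder, and this PR configures the backend at the deployment level rather than in job code:

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.state_backend import EmbeddedRocksDBStateBackend

env = StreamExecutionEnvironment.get_execution_environment()
# RocksDB spills state to local (ephemeral) disk instead of keeping it
# on the heap, which is why the deployments need enough emptyDir storage.
env.set_state_backend(EmbeddedRocksDBStateBackend())
env.enable_checkpointing(60_000)  # checkpoint every 60 s; interval is a placeholder
```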
…ctory structure

The semantic folder manages all data in a KMS subfolder. The bridges need access to the knowledge
data in order to manage inheritance of classes. E.g., updating data for a type 'plasmacutter' could also
mean updating data for type 'cutter' when type 'plasmacutter' is derived from type 'cutter'.
* Updated folder reference of knowledge.ttl
* Increased Kafka CPU usage from 300m to 500m
This commit provides the dynamic Python UDF feature to the beamsqlstatementset operator.
The challenge was that UDFs need to be defined both for SQLite and for Flink. The solution was to
wrap SQLite Python UDFs around Flink Python UDFs, so when SQLite is using Flink UDFs it is
actually using the PyFlink UDFs. This makes testing easier and ensures consistency between
the SQLite and Flink SQL tests (a sketch follows after this list).
* Add two examples for Python UDFs, one basic function and one aggregator
* Extended sparql_to_sql parsing to allow UDFs with the respective RDF namespaces:
  IFA = Namespace("https://industry-fusion.com/aggregators/v0.9/")
  IFN = Namespace("https://industry-fusion.com/functions/v0.9/")
* Add kms-tests for the two UDF examples
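One plausible reading of the wrapping approach is to share a single plain Python implementation between both engines; the following is a minimal sketch of that idea with hypothetical names, not the operator's actual wrapping code:

```python
import sqlite3

from pyflink.table import DataTypes
from pyflink.table.udf import udf

# Plain Python implementation shared by both engines (hypothetical example).
def _to_upper(s):
    return s.upper() if s is not None else None

# Flink side: wrap the shared function as a PyFlink scalar UDF.
to_upper = udf(_to_upper, result_type=DataTypes.STRING())

# SQLite side: register the very same Python callable, so tests against
# SQLite exercise the identical logic that Flink will run.
conn = sqlite3.connect(":memory:")
conn.create_function("to_upper", 1, _to_upper)
print(conn.execute("SELECT to_upper('cutter')").fetchone())  # ('CUTTER',)
```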
…ink 18 is out.

The security issue is within testing and not exposed.
The push, which was meant only for Scorpio, also affects non-Scorpio images and re-pushes old ibn images.
This sometimes leads to the usage of old images in the e2e test.
@wagmarcel wagmarcel merged commit f4be10f into IndustryFusion:main Jul 8, 2023
2 checks passed