PyFlink UDFs #397

Merged · 6 commits · Jul 8, 2023
30 changes: 28 additions & 2 deletions FlinkSqlGateway/Dockerfile
@@ -1,4 +1,8 @@
FROM node:16
FROM python:3.8 as plint
ADD submitjob/ submitjob
RUN pip3 install flake8 && flake8 submitjob/job.py

FROM node:16 as nodelint
ADD gateway.js gateway.js
ADD package.json package.json
ADD lib/ lib
@@ -19,11 +23,33 @@ ENV NODE_ENV=production
ENV NODEJSPATH=/.nvm/versions/node/v*/bin
ENV NODE_PATH=/.nvm/versions/node/v*/lib/node_modules
WORKDIR /opt/gateway
ADD gateway.js gateway.js
#ADD gateway.js gateway.js
COPY --from=nodelint gateway.js gateway.js
ADD package.json package.json
ADD lib/ lib
COPY --from=plint submitjob submitjob
RUN mkdir jars && cd jars && wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.16.1/flink-sql-connector-kafka-1.16.1.jar
RUN . $HOME/.nvm/nvm.sh && npm install --production
ADD sql-client-defaults.yaml /opt/flink/conf/


RUN apt-get update -y && \
apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev liblzma-dev zip && \
wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
tar -xvf Python-3.7.9.tgz && \
cd Python-3.7.9 && \
./configure --without-tests --enable-shared && \
make -j6 && \
make install && \
ldconfig /usr/local/lib && \
cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
ln -s /usr/local/bin/python3 /usr/local/bin/python && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*

# install PyFlink
RUN pip3 install apache-flink==1.16.0


USER 9999
ENTRYPOINT [ "node ./gateway.js" ]
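
The plint stage exists only to fail the build early when flake8 rejects the job script, and nodelint stages the Node sources; the final image builds CPython 3.7.9 from source before installing apache-flink 1.16.0 against it. A quick sanity check that could be run inside the built container to confirm the PyFlink wiring — a minimal sketch, mirroring the imports job.py uses:

# Run inside the container, e.g. via docker exec; creating a
# TableEnvironment confirms both the interpreter and the PyFlink install.
from pyflink.table import EnvironmentSettings, TableEnvironment

env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
print('PyFlink TableEnvironment ready')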
163 changes: 131 additions & 32 deletions FlinkSqlGateway/gateway.js
@@ -18,64 +18,163 @@ const express = require('express');
const { exec } = require('child_process');
const uuid = require('uuid');
const fs = require('fs');
const path = require('path');
const app = express();
const bodyParser = require('body-parser');

const logger = require('./lib/logger.js');
const port = process.env.SIMPLE_FLINK_SQL_GATEWAY_PORT || 9000;
const flinkVersion = '1.14.3';
const flinkRoot = process.env.SIMPLE_FLINK_SQL_GATEWAY_ROOT || `./flink-${flinkVersion}`;
const flinkSqlClient = '/bin/sql-client.sh -l ';
const sqlJars = process.env.SIMPLE_FLINK_SQL_GATEWAY_JARS || './jars';
const flinksubmit = '/opt/flink/bin/flink run';
const runningAsMain = require.main === module;

const udfdir = '/tmp/udf';
const submitdir = 'submitjob';
const localudf = 'udf';
const localdata = 'data';
const sqlStructures = 'SQL-structures.json';
const submitjobscript = 'job.py';
const cwd = process.cwd();

function appget (_, response) {
response.status(200).send('OK');
logger.debug('Health Endpoint was requested.');
};

function submitJob (command, response) {
return new Promise((resolve, reject) =>
exec(command, (error, stdout, stderr) => {
if (error) {
logger.error('Error while submitting sql job: ' + error);
logger.error('Additional stdout messages from application: ' + stdout);
logger.error('Additional stderr messages from application: ' + stderr);
response.status(500);
response.send('Error while submitting sql job: ' + error);
reject(error);
return;
}
// find JobID in stdout, e.g.
// JobID=[e1ebb6b314c82b27cf81cbc812300d97]
const regexp = /JobID=\[([0-9a-f]*)\]/i;
const found = stdout.match(regexp);
logger.debug('Server output: ' + stdout);
if (found !== null && found !== undefined) {
const jobId = found[1];
logger.debug('jobId found:' + jobId);
response.status(200).send('{ "jobid": "' + jobId + '" }');
} else { // no JOB ID found, unsuccessful
response.status(500);
response.send('Not successfully submitted. No JOB ID found in server reply.');
}
resolve();
})
);
}

const createCommand = function (dirname) {
const command = flinksubmit + ' --python ' + dirname + '/' + submitjobscript;
logger.debug('Now executing ' + command);
process.chdir(dirname);
return command;
};

function apppost (request, response) {
// for now ignore session_id
const body = request.body;
if (body === undefined || body === null || body.statement === undefined) {
if (body === undefined || body === null || body.sqlstatementset === undefined) {
response.status(500);
response.send('Wrong format! No statement field in body');
response.send('Wrong format! No sqlstatementset field in body');
return;
}
const id = uuid.v4();
const filename = '/tmp/script_' + id + '.sql';
fs.writeFileSync(filename, body.statement.toString());
const command = flinkRoot + flinkSqlClient + sqlJars + ' -f ' + filename;
logger.debug('Now executing ' + command);
exec(command, (error, stdout, stderr) => {
fs.unlinkSync(filename);
if (error) {
logger.error('Error while executing sql-client: ' + error);
response.status(500);
response.send('Error while executing sql-client: ' + error);
return;
}
// find Job ID in stdout, e.g.
// Job ID: e1ebb6b314c82b27cf81cbc812300d97
const regexp = /Job ID: ([0-9a-f]*)/i;
const found = stdout.match(regexp);
logger.debug('Server output: ' + stdout);
if (found !== null && found !== undefined) {
const jobId = found[1];
logger.debug('jobId found:' + jobId);
response.status(200).send('{ "jobid": "' + jobId + '" }');
} else { // no JOB ID found, unsuccessful
response.status(500);
response.send('Not successfully submitted. No JOB ID found in server reply.');
}
});
const dirname = '/tmp/gateway_' + id;
const datatargetdir = dirname + '/' + localdata;
const udftargetdir = dirname + '/' + localudf;
const submitjobscripttargetdir = dirname + '/' + submitjobscript;
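// Stage a per-job working directory: udf/ receives the Python UDFs and
// data/SQL-structures.json carries the request body for job.py to read.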
try {
process.chdir(cwd);
fs.mkdirSync(dirname, '0744');
fs.mkdirSync(datatargetdir, '0744');
fs.cpSync(submitdir + '/' + localudf, udftargetdir, { recursive: true });
fs.cpSync(submitdir + '/' + submitjobscript, submitjobscripttargetdir);
fs.writeFileSync(datatargetdir + '/' + sqlStructures, JSON.stringify(body));
const udfFiles = getLocalPythonUdfs();
udfFiles.forEach(file => fs.copyFileSync(file, udftargetdir + '/' + path.basename(file)));

const command = createCommand(dirname);
submitJob(command, response).then(
() => { fs.rmSync(dirname, { recursive: true, force: true }); }
).catch(
(e) => {
logger.error(e.stack || e);
fs.rmSync(dirname, { recursive: true, force: true });
}
);
} catch (e) {
logger.error('Could not submit job: ' + (e.stack || e));
fs.rmSync(dirname, { recursive: true, force: true });
}
}

function udfget (req, res) {
const filename = req.params.filename;
logger.debug('python_udf get was requested for: ' + filename);
const fullname = `${udfdir}/${filename}.py`;
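// The read is only an existence check; the file content is not returned.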
try {
fs.readFileSync(fullname);
} catch (err) {
res.status(404).send('File not Found');
logger.info('File not found: ' + fullname);
return;
}
res.status(200).send('OK');
};

function udfpost (req, res) {
const filename = req.params.filename;
const body = req.body;
if (body === undefined || body === null) {
res.status(500);
res.send('No body received!');
return;
}
logger.debug(`python_udf with name ${filename}`);
const fullname = `${udfdir}/${filename}.py`;
try {
fs.writeFileSync(fullname, body);
} catch (err) {
res.status(500).send('Could not write file: ' + err);
logger.error('WriteSync failed:' + err);
return;
}
res.status(201).send('CREATED');
}

function getLocalPythonUdfs () {
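// Deduplicate versioned UDF files (<name>_v<N>.py): after the sort,
// the lexicographically last version of each base name wins.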
const verfiles = {};
const files = fs.readdirSync(udfdir)
.filter(fn => fn.endsWith('.py'))
.sort()
.map(x => x.substring(0, x.lastIndexOf('.')))
.map(x => x.split('_v'));

files.forEach(x => { verfiles[x[0]] = x[1]; });
const result = Object.keys(verfiles).map(x => `${x}_v${verfiles[x]}.py`).map(x => `${udfdir}/${x}`);
return result;
}

app.use(express.json({ limit: '10mb' }));

app.get('/health', appget);
app.get('/v1/python_udf/:filename', udfget);

app.post('/v1/sessions/:session_id/statements', apppost);
app.post('/v1/python_udf/:filename', bodyParser.text(), udfpost);

if (runningAsMain) {
if (!fs.existsSync(udfdir)) {
fs.mkdirSync(udfdir);
}

app.listen(port, function () {
console.log('Listening on port ' + port);
});
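
Together these routes form a small REST surface: UDF sources are uploaded as plain text to /v1/python_udf/:filename, and a statement set is submitted as JSON to /v1/sessions/:session_id/statements (the session id is currently ignored). A minimal client sketch using only the Python standard library — the paths and default port 9000 come from the code above; the UDF source, table names, and statements are placeholders:

import json
import urllib.request

BASE = 'http://localhost:9000'  # SIMPLE_FLINK_SQL_GATEWAY_PORT defaults to 9000

# Upload a versioned UDF; udfpost stores the raw text body as <name>.py.
udf_src = "def register(table_env):\n    pass\n"
req = urllib.request.Request(BASE + '/v1/python_udf/myudf_v1',
                             data=udf_src.encode(),
                             headers={'Content-Type': 'text/plain'},
                             method='POST')
print(urllib.request.urlopen(req).status)  # expect 201 CREATED

# Submit a statement set; apppost requires the sqlstatementset field,
# and job.py also consumes sqlsets, tables and views if present.
body = {
    'sqlsets': ["SET pipeline.name = 'demo';"],
    'tables': [],  # CREATE TABLE DDL strings would go here
    'views': [],
    'sqlstatementset': ['INSERT INTO sink SELECT * FROM source;'],
}
req = urllib.request.Request(BASE + '/v1/sessions/any/statements',
                             data=json.dumps(body).encode(),
                             headers={'Content-Type': 'application/json'},
                             method='POST')
print(urllib.request.urlopen(req).read())  # {"jobid": "..."} on success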
1 change: 1 addition & 0 deletions FlinkSqlGateway/package.json
@@ -9,6 +9,7 @@
"dependencies": {
"child-process": "^1.0.2",
"express": "^4.18.2",
"jszip": "^3.10.1",
"uuid": "^8.3.2",
"winston": "^3.8.1"
},
61 changes: 61 additions & 0 deletions FlinkSqlGateway/submitjob/job.py
@@ -0,0 +1,61 @@
from pyflink.table import TableEnvironment, EnvironmentSettings
import json
import pathlib
import os


JARDIR = '/opt/gateway/jars'

with open('data/SQL-structures.json') as f:
d = json.load(f)

env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# Get all jars from JARDIR (/opt/gateway/jars)
jars = ';'.join(list(map(lambda x: "file://"+str(x),
pathlib.Path(JARDIR).glob('*.jar'))))

table_env.get_config().set("pipeline.classpaths", jars)

# Register udf models
for file in os.scandir('udf'):
if file.name.endswith('.py') and file.name != '__init__.py':
try:
print(f"Executing {file.name}")
f = open('udf/' + file.name).read()
exec(f)
register(table_env) # noqa: F821
except Exception as error:
print(error)

# Create SETs
if 'sqlsets' in d:
sets = d['sqlsets']
for set in sets:
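# Parse statements like "SET pipeline.name = 'demo';" into key and value.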
v = set.replace('=', ' ').split(' ')
key = v[1]
value = v[-1].strip(';').strip('\'')
print(f'SET: {key}={value}')
table_env.get_config().set(key, value)

# Create Tables
if 'tables' in d:
tables = d['tables']
for table in tables:
table_env.execute_sql(table)

# Create Views
if 'views' in d:
views = d['views']
for view in views:
table_env.execute_sql(view)

# CREATE SQL Statement SET

statement_set = table_env.create_statement_set()
for statement in d["sqlstatementset"]:
statement_set.add_insert_sql(statement)

jobresult = statement_set.execute()
print(f'JobID=[{jobresult.get_job_client().get_job_id()}]')
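
The registration loop above exec()s each uploaded module and then calls its register(table_env) entry point, so every UDF file must define one. A sketch of such a module — the function name and logic are made up; only the register(table_env) contract comes from job.py:

from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(result_type=DataTypes.STRING())
def to_upper(s):
    # Placeholder scalar function; any PyFlink UDF works here.
    return s.upper() if s is not None else None


def register(table_env):
    # Entry point job.py calls after exec(); exposes TO_UPPER to SQL.
    table_env.create_temporary_function('TO_UPPER', to_upper)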
Empty file.