From 2d486e6190593c8ea7f9282519d5a4789a2753d9 Mon Sep 17 00:00:00 2001 From: Nabeel Date: Tue, 11 Dec 2018 17:13:07 +1100 Subject: [PATCH] Decoding, labels, and handling larger files New functionality to decode text data and to provide labels instead of variable names if desired. Also improvements in reading larger files. --- Qlik-SAS-Init.bat | 1 + README.md | 35 ++++++++++-- core/__main__.py | 48 ++++++++++++---- core/_sas_reader.py | 136 ++++++++++++++++++++++++++++++++++++++------ core/functions.json | 10 ++++ 5 files changed, 196 insertions(+), 34 deletions(-) diff --git a/Qlik-SAS-Init.bat b/Qlik-SAS-Init.bat index 46456f8..79ca31e 100644 --- a/Qlik-SAS-Init.bat +++ b/Qlik-SAS-Init.bat @@ -17,6 +17,7 @@ pip install grpcio pip install grpcio-tools pip install numpy pip install pandas +pip install sas7bdat echo. echo Creating a new firewall rule for TCP port 50056... & echo. netsh advfirewall firewall add rule name="Qlik SAS Reader" dir=in action=allow protocol=TCP localport=50056 diff --git a/README.md b/README.md index 7f093d8..c18b6b0 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # Load SAS Datasets into Qlik This Python Server Side Extension (SSE) for Qlik helps load SAS datasets stored in SAS7BDAT or XPORT files. -The files are read using the [Pandas library](https://pandas.pydata.org/pandas-docs/stable/io.html?highlight=sas7bdatreader#sas-formats). +The files are read using the [Pandas library](https://pandas.pydata.org/pandas-docs/stable/io.html?highlight=sas7bdatreader#sas-formats) and the [sas7bdat module](https://bitbucket.org/jaredhobbs/sas7bdat/overview). For more information on Qlik Server Side Extensions see [qlik-oss](https://github.com/qlik-oss/server-side-extension). @@ -23,7 +23,7 @@ For more information on Qlik Server Side Extensions see [qlik-oss](https://githu 3. Right click `Qlik-SAS-Init.bat` and choose 'Run as Administrator'. You can open this file in a text editor to review the commands that will be executed. If everything goes smoothly you will see a Python virtual environment being set up, project files being copied, some packages being installed and TCP Port 50056 being opened for inbound communication. - Note that the script always ends with a "All done" message and does not check for errors. - If you need to change the port you can do so in the file `core\__main__.py` by opening the file with a text editor, changing the value of the `_DEFAULT_PORT` variable, and then saving the file. You will also need to update `Qlik-Py-Init.bat` to use the same port in the `netsh` command. This command will only work if you run the batch file through an elevated command prompt (i.e. with administrator privileges). - - Once the execution completes, do a quick scan of the log to see everything installed correctly. The libraries imported are: `grpcio`, `grpcio-tools`, `numpy`, `pandas`. Also, check that the `core` and `generated` directories have been copied successfully to the newly created `qlik-sas-env` directory. + - Once the execution completes, do a quick scan of the log to see everything installed correctly. The libraries imported are: `grpcio`, `grpcio-tools`, `numpy`, `pandas`, `sas7bdat` and their dependencies. Also, check that the `core` and `generated` directories have been copied successfully to the newly created `qlik-sas-env` directory. 4. Now whenever you want to start this Python service you can run `Qlik-SAS-Start.bat`. You may need to run this batch file as an administrator. @@ -42,7 +42,7 @@ First you need to specify the path for the file and any additional arguments. We TempInputs: LOAD * INLINE [ 'Path', 'Args' - '..\..\data\sample.sas7bdat', 'debug=true, chunksize=1000' + '..\..\data\sample.sas7bdat', 'debug=true, labels=true' ]; ``` @@ -58,7 +58,7 @@ EXTENSION SAS.Read_SAS(TempInputs{Path, Args}); In the example above the analytic connection has been named as `SAS`. This will depend on how you named the connection in step 5 of the installation. -If you want a preview of the field names, you can use the `debug=true` argument. This will enable the logging features of the SSE with information printed to the terminal and a log file. The log files can be found in the `qlik-sas-reader\qlik-sas-env\core\logs\` directory. +If you want a preview of the data, you can use the `debug=true` argument. This will enable the logging features of the SSE with information printed to the terminal and a log file. The log files can be found in the `qlik-sas-reader\qlik-sas-env\core\logs\` directory. For large files you should consider passing the `chunksize` parameter. This allows the file to be read iteratively `chunksize` lines at a time. This parameter defaults to `1000` for this SSE, but may need to be adjusted based on the number of columns in the file. @@ -67,6 +67,29 @@ The optional parameters below can be included in the additional arguments passed | Keyword | Description | Sample Values | Remarks | | --- | --- | --- | --- | | debug | Flag to output additional information to the terminal and logs | `true`, `false` | Information will be printed to the terminal and a log file: `..\qlik-sas-env\core\logs\SAS Reader Log .txt`.

Particularly useful is looking at the sample output to see how the file is structured. | +| labels | Flag to return labels instead of variable names from the SAS file | `true`, `false` | This parameter defaults to `false`.

For very wide tables, the labels may exceed metadata limits. In this case you can use the `Get_Labels` function described below. | | format | The format of the file | `xport`, `sas7bdat` | If the format is not specified, it will be inferred. | -| encoding | Encoding for text data | `utf-8` | If the encoding is not specified, Pandas returns the text as raw bytes. This could be cleaned up in Qlik if desired. | -| chunksize | Read file chunksize lines at a time | `1000` | The file is read iteratively, `chunksize` lines at a time. This parameter defaults to `1000` but may need to be adjusted based on the number of columns in the file. | \ No newline at end of file +| encoding | Encoding for text data | `utf_8` | If the encoding is not specified, Pandas returns the text as raw bytes. This SSE will attempt to decode with `utf_8`, `ascii` and `latin_1`, but in case of issues will return the text as bytes.

If the encoding is unknown and default decoding fails, the data can be cleaned up in Qlik using [String functions](https://help.qlik.com/en-US/sense/November2018/Subsystems/Hub/Content/Sense_Hub/Scripting/StringFunctions/string-functions.htm). | +| chunksize | Read file chunksize lines at a time | `1000` | The file is read iteratively, `chunksize` lines at a time. This parameter defaults to `1000` but may need to be adjusted based on the number of columns in the file. | + +To get labels for the variables in a SAS7BDAT file you can call the `Get_Labels` function. If you load the result from this function as a mapping table in Qlik, you can easily rename the field names using the [Rename Fields](https://help.qlik.com/en-US/sense/November2018/Subsystems/Hub/Content/Sense_Hub/Scripting/ScriptRegularStatements/rename-field.htm) script function. + +``` +TempInputs: +LOAD * INLINE [ + 'Path', 'Args' + '..\..\data\sample.sas7bdat', '' +]; + +[SAS Dataset]: +LOAD * +EXTENSION SAS.Read_SAS(TempInputs{Path, Args}); + +FieldMap: +MAPPING LOAD * +EXTENSION SAS.Get_Labels(TempInputs{Path, Args}); + +Rename Fields using FieldMap; + +Drop table TempInputs; +``` \ No newline at end of file diff --git a/core/__main__.py b/core/__main__.py index e142667..cae1032 100644 --- a/core/__main__.py +++ b/core/__main__.py @@ -22,6 +22,9 @@ # Set the default port for this SSE Extension _DEFAULT_PORT = '50056' +# Set the maximum message length for gRPC in bytes +_MAX_MESSAGE_LENGTH = 4 * 1024 * 1024 + _ONE_DAY_IN_SECONDS = 60 * 60 * 24 _MINFLOAT = float('-inf') @@ -55,7 +58,8 @@ def functions(self): :return: Mapping of function id and implementation """ return { - 0: '_read_sas' + 0: '_read_sas', + 1: '_read_sas' } """ @@ -73,6 +77,9 @@ def _read_sas(request, context): :Qlik expression examples: :.Read_SAS('data/airline.sas7bdat', 'format=sas7bdat') """ + # Get the function id from the header to determine the variant being called + function = ExtensionService._get_function_id(context) + # Get a list from the generator object so that it can be iterated over multiple times request_list = [request_rows for request_rows in request] @@ -80,8 +87,15 @@ def _read_sas(request, context): # This will take the SAS file information from Qlik and prepare the data to be read reader = SASReader(request_list, context) - # Read the SAS data file. This returns a Pandas Data Frame or an interator if the file is to be read in chunks - response = reader.read() + if function == 1: + # Get labels for the variables in the SAS file + response = reader.get_labels() + else: + # Read the SAS data file. This returns a Pandas Data Frame or an interator if the file is to be read in chunks + response = reader.read() + + # The function will only send a maximum number of cells per bundle + _MAX_CELLS = 10000 if isinstance(response, pd.DataFrame): # Convert the response to a list of rows @@ -96,25 +110,36 @@ def _read_sas(request, context): # Values are then structured as SSE.Rows response_rows = [SSE.Row(duals=duals) for duals in response_rows] - # Yield Row data as Bundled rows - yield SSE.BundledRows(rows=response_rows) + # Calculate number of rows per bundle, adjusting for overheads of data structures + rows_per_bundle = _MAX_CELLS//response.shape[1] + + # Stream response as BundledRows + for i in range(0, len(response_rows), rows_per_bundle): + # Yield Row data as Bundled rows + yield SSE.BundledRows(rows=response_rows[i : i + rows_per_bundle]) else: - for chunk in response: + for chunk in response: # Convert the chunk to a list of rows response_list = chunk.values.tolist() # We convert values to type SSE.Dual, and group columns into a iterable response_rows = [] + # Append rows to the response for row in response_list: response_rows.append(ExtensionService._get_duals(row)) # Values are then structured as SSE.Rows response_rows = [SSE.Row(duals=duals) for duals in response_rows] - # Yield Row data as Bundled rows - yield SSE.BundledRows(rows=response_rows) + # Calculate number of rows per bundle, adjusting for overheads of data structures + rows_per_bundle = _MAX_CELLS//chunk.shape[1] + + # Stream response as BundledRows + for i in range(0, len(response_rows), rows_per_bundle): + # Yield Row data as Bundled rows + yield SSE.BundledRows(rows=response_rows[i : i + rows_per_bundle]) @staticmethod def _get_duals(row): @@ -129,7 +154,7 @@ def _get_duals(row): # if the value is null: if pd.isnull(col): - duals.append(SSE.Dual(numData=np.NaN, strData='/x00')) + duals.append(SSE.Dual(numData=np.NaN, strData='')) # if the value is numeric: elif isinstance(col, (int, float)): @@ -219,7 +244,10 @@ def Serve(self, port, pem_dir): :param pem_dir: Directory including certificates :return: None """ - server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),\ + options=[('grpc.max_message_length', _MAX_MESSAGE_LENGTH),('grpc.max_send_message_length', _MAX_MESSAGE_LENGTH),\ + ('grpc.max_receive_message_length', _MAX_MESSAGE_LENGTH),('grpc.max_metadata_size', _MAX_MESSAGE_LENGTH)]) + SSE.add_ConnectorServicer_to_server(self, server) if pem_dir: diff --git a/core/_sas_reader.py b/core/_sas_reader.py index ce8f1ca..c1d7827 100644 --- a/core/_sas_reader.py +++ b/core/_sas_reader.py @@ -6,6 +6,8 @@ import pandas as pd import ServerSideExtension_pb2 as SSE +from sas7bdat import SAS7BDAT + # Add Generated folder to module path PARENT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(os.path.join(PARENT_DIR, 'generated')) @@ -51,12 +53,65 @@ def read(self): """ Read the SAS dataset and return as a Pandas Data Frame or an iterator to read the file in chunks. """ - + reader = None + + # If encoding is not specified, we try some common codecs + if self.encoding is None: + # Try encoding with each of the default codecs + for cp in self.default_encoding: + try: + reader = pd.read_sas(self.filepath, encoding=cp, **self.read_sas_kwargs) + self.encoding = cp + break + except UnicodeDecodeError: + continue + + # Instantiate the reader if we haven't already done so + if reader is None: + reader = pd.read_sas(self.filepath, **self.read_sas_kwargs) + # Send metadata on the result to Qlik self._send_table_description() - + # Read the SAS dataset - return pd.read_sas(self.filepath, **self.read_sas_kwargs) + return reader + + def get_labels(self): + """ + Return labels for the variable names in a sas7bdat file + """ + + # Use the sas7bdat library to read the file + reader = SAS7BDAT(self.filepath, skip_header=False) + + columns = None + + # If encoding is not specified, we try some common codecs + if self.encoding is None: + # Try encoding with each of the default codecs + for cp in self.default_encoding: + try: + # Get labels for the variables + columns = [(col.name.decode(cp), col.label.decode(cp)) for col in reader.columns] + self.encoding = cp + break + except UnicodeDecodeError: + continue + + if columns is None: + # Get labels for the variables + columns = [(col.name, col.label) for col in reader.columns] + + self.columns = pd.DataFrame(columns) + reader.close() + + if self.debug: + self._print_log(3) + + # Send metadata on the result to Qlik + self._send_table_description(func="get_labels") + + return self.columns def _set_params(self, kwargs): """ @@ -74,6 +129,8 @@ def _set_params(self, kwargs): # SSE parameters: self.debug = False + self.labels = False + self.default_encoding = ["utf_8", "ascii", "latin_1"] # pandas.read_sas parameters: self.format = None self.encoding = None @@ -97,6 +154,11 @@ def _set_params(self, kwargs): if 'debug' in self.kwargs: self.debug = 'true' == self.kwargs['debug'].lower() + # Choose whether labels or variable names are to be returned as field names + # Valid values are: true, false + if 'labels' in self.kwargs: + self.labels = 'true' == self.kwargs['labels'].lower() + # Set the format of the file, if none is specified it is inferred. # Options are: xport, sas7bdat if 'format' in self.kwargs: @@ -131,7 +193,7 @@ def _populate_dict(self, params): return output_dict - def _send_table_description(self): + def _send_table_description(self, func=None): """ Send the table description to Qlik as meta data. Only used when the SSE is called from the Qlik load script. @@ -139,22 +201,44 @@ def _send_table_description(self): # Set up the table description to send as metadata to Qlik self.table = SSE.TableDescription() - self.table.name = "SAS_Dataset" - - # Read the SAS dataset to get column headers - sample_response = pd.read_sas(self.filepath, format=self.format, encoding=self.encoding, chunksize=5) - - # Get the first chunk of data as a Pandas DataFrame - self.sample_data = sample_response.__next__() - # Set field names based on the DataFrame - for col in self.sample_data.columns.tolist(): - # Set up fields for the table - self.table.fields.add(name=col) + if func is None: + self.table.name = "SAS_Dataset" + + # Read the SAS file to get sample data + sample_response = pd.read_sas(self.filepath, format=self.format, encoding=self.encoding, chunksize=5) + + # Get the first chunk of data as a Pandas DataFrame + self.sample_data = sample_response.__next__() + + # Fetch field labels from SAS variable attributes if required + # This may fail for wide tables due to meta data limits. For such cases use the get_labels function. + if self.labels: + # Use the sas7bdat library to read the file + reader = SAS7BDAT(self.filepath, skip_header=False) + + # Get labels for the variables + labels = [col.label.decode(self.encoding) for col in reader.columns] + else: + # Get the variable names from the sample data + labels = self.sample_data.columns + + # Set field names + for col in labels: + # Set up fields for the table + self.table.fields.add(name=col) + + if self.debug: + self._print_log(2) - if self.debug: - self._print_log(2) + elif func == "get_labels": + self.table.name = "SAS_Labels" + self.table.fields.add(name="variable") + self.table.fields.add(name="label") + if self.debug: + self._print_log(4) + # Send table description table_header = (('qlik-tabledescription-bin', self.table.SerializeToString()),) self.context.send_initial_metadata(table_header) @@ -191,7 +275,7 @@ def _print_log(self, step): sys.stdout.write("\nSAMPLE DATA: {0} rows x cols\n\n".format(self.sample_data.shape)) sys.stdout.write("{0} \n\n".format(self.sample_data.to_string())) - # Print the table description if the call was made from the load script + # Print the table description sys.stdout.write("\nTABLE DESCRIPTION SENT TO QLIK:\n\n{0} \n\n".format(self.table)) with open(self.logfile,'a') as f: @@ -201,5 +285,21 @@ def _print_log(self, step): # Write the table description to the log file f.write("\nTABLE DESCRIPTION SENT TO QLIK:\n\n{0} \n\n".format(self.table)) + + elif step == 3: + # Print labels to the terminal + sys.stdout.write("\nRESPONSE FROM GET_LABELS:\n\n{0}\n\n".format(self.columns.to_string())) + + with open(self.logfile,'a') as f: + # Write the sample data to the log file + f.write("\nRESPONSE FROM GET_LABELS:\n\n{0}\n\n".format(self.columns.to_string())) + + elif step == 4: + # Print the table description + sys.stdout.write("\nTABLE DESCRIPTION SENT TO QLIK:\n\n{0} \n\n".format(self.table)) + + with open(self.logfile,'a') as f: + # Write the table description to the log file + f.write("\nTABLE DESCRIPTION SENT TO QLIK:\n\n{0} \n\n".format(self.table)) diff --git a/core/functions.json b/core/functions.json index 2d022dc..ca9954e 100644 --- a/core/functions.json +++ b/core/functions.json @@ -9,6 +9,16 @@ "a_path": 0, "b_other_args": 0 } + }, + { + "Id": 1, + "Name": "Get_Labels", + "Type": 0, + "ReturnType": 1, + "Params": { + "a_path": 0, + "b_other_args": 0 + } } ] }