Skip to content

Commit

Permalink
Decoding, labels, and handling larger files
Browse files Browse the repository at this point in the history
New functionality to decode text data and to provide labels instead of variable names if desired. Also improvements in reading larger files.
  • Loading branch information
Nabeel committed Dec 11, 2018
1 parent 343809c commit 2d486e6
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 34 deletions.
1 change: 1 addition & 0 deletions Qlik-SAS-Init.bat
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pip install grpcio
pip install grpcio-tools
pip install numpy
pip install pandas
pip install sas7bdat
echo.
echo Creating a new firewall rule for TCP port 50056... & echo.
netsh advfirewall firewall add rule name="Qlik SAS Reader" dir=in action=allow protocol=TCP localport=50056
Expand Down
35 changes: 29 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Load SAS Datasets into Qlik
This Python Server Side Extension (SSE) for Qlik helps load SAS datasets stored in SAS7BDAT or XPORT files.

The files are read using the [Pandas library](https://pandas.pydata.org/pandas-docs/stable/io.html?highlight=sas7bdatreader#sas-formats).
The files are read using the [Pandas library](https://pandas.pydata.org/pandas-docs/stable/io.html?highlight=sas7bdatreader#sas-formats) and the [sas7bdat module](https://bitbucket.org/jaredhobbs/sas7bdat/overview).

For more information on Qlik Server Side Extensions see [qlik-oss](https://github.com/qlik-oss/server-side-extension).

Expand All @@ -23,7 +23,7 @@ For more information on Qlik Server Side Extensions see [qlik-oss](https://githu
3. Right click `Qlik-SAS-Init.bat` and choose 'Run as Administrator'. You can open this file in a text editor to review the commands that will be executed. If everything goes smoothly you will see a Python virtual environment being set up, project files being copied, some packages being installed and TCP Port 50056 being opened for inbound communication.
- Note that the script always ends with a "All done" message and does not check for errors.
- If you need to change the port you can do so in the file `core\__main__.py` by opening the file with a text editor, changing the value of the `_DEFAULT_PORT` variable, and then saving the file. You will also need to update `Qlik-Py-Init.bat` to use the same port in the `netsh` command. This command will only work if you run the batch file through an elevated command prompt (i.e. with administrator privileges).
- Once the execution completes, do a quick scan of the log to see everything installed correctly. The libraries imported are: `grpcio`, `grpcio-tools`, `numpy`, `pandas`. Also, check that the `core` and `generated` directories have been copied successfully to the newly created `qlik-sas-env` directory.
- Once the execution completes, do a quick scan of the log to see everything installed correctly. The libraries imported are: `grpcio`, `grpcio-tools`, `numpy`, `pandas`, `sas7bdat` and their dependencies. Also, check that the `core` and `generated` directories have been copied successfully to the newly created `qlik-sas-env` directory.

4. Now whenever you want to start this Python service you can run `Qlik-SAS-Start.bat`. You may need to run this batch file as an administrator.

Expand All @@ -42,7 +42,7 @@ First you need to specify the path for the file and any additional arguments. We
TempInputs:
LOAD * INLINE [
'Path', 'Args'
'..\..\data\sample.sas7bdat', 'debug=true, chunksize=1000'
'..\..\data\sample.sas7bdat', 'debug=true, labels=true'
];
```

Expand All @@ -58,7 +58,7 @@ EXTENSION SAS.Read_SAS(TempInputs{Path, Args});

In the example above the analytic connection has been named as `SAS`. This will depend on how you named the connection in step 5 of the installation.

If you want a preview of the field names, you can use the `debug=true` argument. This will enable the logging features of the SSE with information printed to the terminal and a log file. The log files can be found in the `qlik-sas-reader\qlik-sas-env\core\logs\` directory.
If you want a preview of the data, you can use the `debug=true` argument. This will enable the logging features of the SSE with information printed to the terminal and a log file. The log files can be found in the `qlik-sas-reader\qlik-sas-env\core\logs\` directory.

For large files you should consider passing the `chunksize` parameter. This allows the file to be read iteratively `chunksize` lines at a time. This parameter defaults to `1000` for this SSE, but may need to be adjusted based on the number of columns in the file.

Expand All @@ -67,6 +67,29 @@ The optional parameters below can be included in the additional arguments passed
| Keyword | Description | Sample Values | Remarks |
| --- | --- | --- | --- |
| debug | Flag to output additional information to the terminal and logs | `true`, `false` | Information will be printed to the terminal and a log file: `..\qlik-sas-env\core\logs\SAS Reader Log <n>.txt`. <br/><br/>Particularly useful is looking at the sample output to see how the file is structured. |
| labels | Flag to return labels instead of variable names from the SAS file | `true`, `false` | This parameter defaults to `false`. <br/><br/>For very wide tables, the labels may exceed metadata limits. In this case you can use the `Get_Labels` function described below. |
| format | The format of the file | `xport`, `sas7bdat` | If the format is not specified, it will be inferred. |
| encoding | Encoding for text data | `utf-8` | If the encoding is not specified, Pandas returns the text as raw bytes. This could be cleaned up in Qlik if desired. |
| chunksize | Read file chunksize lines at a time | `1000` | The file is read iteratively, `chunksize` lines at a time. This parameter defaults to `1000` but may need to be adjusted based on the number of columns in the file. |
| encoding | Encoding for text data | `utf_8` | If the encoding is not specified, Pandas returns the text as raw bytes. This SSE will attempt to decode with `utf_8`, `ascii` and `latin_1`, but in case of issues will return the text as bytes.<br><br>If the encoding is unknown and default decoding fails, the data can be cleaned up in Qlik using [String functions](https://help.qlik.com/en-US/sense/November2018/Subsystems/Hub/Content/Sense_Hub/Scripting/StringFunctions/string-functions.htm). |
| chunksize | Read file chunksize lines at a time | `1000` | The file is read iteratively, `chunksize` lines at a time. This parameter defaults to `1000` but may need to be adjusted based on the number of columns in the file. |

To get labels for the variables in a SAS7BDAT file you can call the `Get_Labels` function. If you load the result from this function as a mapping table in Qlik, you can easily rename the field names using the [Rename Fields](https://help.qlik.com/en-US/sense/November2018/Subsystems/Hub/Content/Sense_Hub/Scripting/ScriptRegularStatements/rename-field.htm) script function.

```
TempInputs:
LOAD * INLINE [
'Path', 'Args'
'..\..\data\sample.sas7bdat', ''
];
[SAS Dataset]:
LOAD *
EXTENSION SAS.Read_SAS(TempInputs{Path, Args});
FieldMap:
MAPPING LOAD *
EXTENSION SAS.Get_Labels(TempInputs{Path, Args});
Rename Fields using FieldMap;
Drop table TempInputs;
```
48 changes: 38 additions & 10 deletions core/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
# Set the default port for this SSE Extension
_DEFAULT_PORT = '50056'

# Set the maximum message length for gRPC in bytes
_MAX_MESSAGE_LENGTH = 4 * 1024 * 1024

_ONE_DAY_IN_SECONDS = 60 * 60 * 24
_MINFLOAT = float('-inf')

Expand Down Expand Up @@ -55,7 +58,8 @@ def functions(self):
:return: Mapping of function id and implementation
"""
return {
0: '_read_sas'
0: '_read_sas',
1: '_read_sas'
}

"""
Expand All @@ -73,15 +77,25 @@ def _read_sas(request, context):
:Qlik expression examples:
:<AAI Connection Name>.Read_SAS('data/airline.sas7bdat', 'format=sas7bdat')
"""
# Get the function id from the header to determine the variant being called
function = ExtensionService._get_function_id(context)

# Get a list from the generator object so that it can be iterated over multiple times
request_list = [request_rows for request_rows in request]

# Create an instance of the SASReader class
# This will take the SAS file information from Qlik and prepare the data to be read
reader = SASReader(request_list, context)

# Read the SAS data file. This returns a Pandas Data Frame or an interator if the file is to be read in chunks
response = reader.read()
if function == 1:
# Get labels for the variables in the SAS file
response = reader.get_labels()
else:
# Read the SAS data file. This returns a Pandas Data Frame or an interator if the file is to be read in chunks
response = reader.read()

# The function will only send a maximum number of cells per bundle
_MAX_CELLS = 10000

if isinstance(response, pd.DataFrame):
# Convert the response to a list of rows
Expand All @@ -96,25 +110,36 @@ def _read_sas(request, context):
# Values are then structured as SSE.Rows
response_rows = [SSE.Row(duals=duals) for duals in response_rows]

# Yield Row data as Bundled rows
yield SSE.BundledRows(rows=response_rows)
# Calculate number of rows per bundle, adjusting for overheads of data structures
rows_per_bundle = _MAX_CELLS//response.shape[1]

# Stream response as BundledRows
for i in range(0, len(response_rows), rows_per_bundle):
# Yield Row data as Bundled rows
yield SSE.BundledRows(rows=response_rows[i : i + rows_per_bundle])

else:
for chunk in response:
for chunk in response:
# Convert the chunk to a list of rows
response_list = chunk.values.tolist()

# We convert values to type SSE.Dual, and group columns into a iterable
response_rows = []

# Append rows to the response
for row in response_list:
response_rows.append(ExtensionService._get_duals(row))

# Values are then structured as SSE.Rows
response_rows = [SSE.Row(duals=duals) for duals in response_rows]

# Yield Row data as Bundled rows
yield SSE.BundledRows(rows=response_rows)
# Calculate number of rows per bundle, adjusting for overheads of data structures
rows_per_bundle = _MAX_CELLS//chunk.shape[1]

# Stream response as BundledRows
for i in range(0, len(response_rows), rows_per_bundle):
# Yield Row data as Bundled rows
yield SSE.BundledRows(rows=response_rows[i : i + rows_per_bundle])

@staticmethod
def _get_duals(row):
Expand All @@ -129,7 +154,7 @@ def _get_duals(row):

# if the value is null:
if pd.isnull(col):
duals.append(SSE.Dual(numData=np.NaN, strData='/x00'))
duals.append(SSE.Dual(numData=np.NaN, strData=''))

# if the value is numeric:
elif isinstance(col, (int, float)):
Expand Down Expand Up @@ -219,7 +244,10 @@ def Serve(self, port, pem_dir):
:param pem_dir: Directory including certificates
:return: None
"""
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),\
options=[('grpc.max_message_length', _MAX_MESSAGE_LENGTH),('grpc.max_send_message_length', _MAX_MESSAGE_LENGTH),\
('grpc.max_receive_message_length', _MAX_MESSAGE_LENGTH),('grpc.max_metadata_size', _MAX_MESSAGE_LENGTH)])

SSE.add_ConnectorServicer_to_server(self, server)

if pem_dir:
Expand Down
136 changes: 118 additions & 18 deletions core/_sas_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import pandas as pd
import ServerSideExtension_pb2 as SSE

from sas7bdat import SAS7BDAT

# Add Generated folder to module path
PARENT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(os.path.join(PARENT_DIR, 'generated'))
Expand Down Expand Up @@ -51,12 +53,65 @@ def read(self):
"""
Read the SAS dataset and return as a Pandas Data Frame or an iterator to read the file in chunks.
"""

reader = None

# If encoding is not specified, we try some common codecs
if self.encoding is None:
# Try encoding with each of the default codecs
for cp in self.default_encoding:
try:
reader = pd.read_sas(self.filepath, encoding=cp, **self.read_sas_kwargs)
self.encoding = cp
break
except UnicodeDecodeError:
continue

# Instantiate the reader if we haven't already done so
if reader is None:
reader = pd.read_sas(self.filepath, **self.read_sas_kwargs)

# Send metadata on the result to Qlik
self._send_table_description()

# Read the SAS dataset
return pd.read_sas(self.filepath, **self.read_sas_kwargs)
return reader

def get_labels(self):
"""
Return labels for the variable names in a sas7bdat file
"""

# Use the sas7bdat library to read the file
reader = SAS7BDAT(self.filepath, skip_header=False)

columns = None

# If encoding is not specified, we try some common codecs
if self.encoding is None:
# Try encoding with each of the default codecs
for cp in self.default_encoding:
try:
# Get labels for the variables
columns = [(col.name.decode(cp), col.label.decode(cp)) for col in reader.columns]
self.encoding = cp
break
except UnicodeDecodeError:
continue

if columns is None:
# Get labels for the variables
columns = [(col.name, col.label) for col in reader.columns]

self.columns = pd.DataFrame(columns)
reader.close()

if self.debug:
self._print_log(3)

# Send metadata on the result to Qlik
self._send_table_description(func="get_labels")

return self.columns

def _set_params(self, kwargs):
"""
Expand All @@ -74,6 +129,8 @@ def _set_params(self, kwargs):

# SSE parameters:
self.debug = False
self.labels = False
self.default_encoding = ["utf_8", "ascii", "latin_1"]
# pandas.read_sas parameters:
self.format = None
self.encoding = None
Expand All @@ -97,6 +154,11 @@ def _set_params(self, kwargs):
if 'debug' in self.kwargs:
self.debug = 'true' == self.kwargs['debug'].lower()

# Choose whether labels or variable names are to be returned as field names
# Valid values are: true, false
if 'labels' in self.kwargs:
self.labels = 'true' == self.kwargs['labels'].lower()

# Set the format of the file, if none is specified it is inferred.
# Options are: xport, sas7bdat
if 'format' in self.kwargs:
Expand Down Expand Up @@ -131,30 +193,52 @@ def _populate_dict(self, params):

return output_dict

def _send_table_description(self):
def _send_table_description(self, func=None):
"""
Send the table description to Qlik as meta data.
Only used when the SSE is called from the Qlik load script.
"""

# Set up the table description to send as metadata to Qlik
self.table = SSE.TableDescription()
self.table.name = "SAS_Dataset"

# Read the SAS dataset to get column headers
sample_response = pd.read_sas(self.filepath, format=self.format, encoding=self.encoding, chunksize=5)

# Get the first chunk of data as a Pandas DataFrame
self.sample_data = sample_response.__next__()

# Set field names based on the DataFrame
for col in self.sample_data.columns.tolist():
# Set up fields for the table
self.table.fields.add(name=col)
if func is None:
self.table.name = "SAS_Dataset"

# Read the SAS file to get sample data
sample_response = pd.read_sas(self.filepath, format=self.format, encoding=self.encoding, chunksize=5)

# Get the first chunk of data as a Pandas DataFrame
self.sample_data = sample_response.__next__()

# Fetch field labels from SAS variable attributes if required
# This may fail for wide tables due to meta data limits. For such cases use the get_labels function.
if self.labels:
# Use the sas7bdat library to read the file
reader = SAS7BDAT(self.filepath, skip_header=False)

# Get labels for the variables
labels = [col.label.decode(self.encoding) for col in reader.columns]
else:
# Get the variable names from the sample data
labels = self.sample_data.columns

# Set field names
for col in labels:
# Set up fields for the table
self.table.fields.add(name=col)

if self.debug:
self._print_log(2)

if self.debug:
self._print_log(2)
elif func == "get_labels":
self.table.name = "SAS_Labels"
self.table.fields.add(name="variable")
self.table.fields.add(name="label")

if self.debug:
self._print_log(4)

# Send table description
table_header = (('qlik-tabledescription-bin', self.table.SerializeToString()),)
self.context.send_initial_metadata(table_header)
Expand Down Expand Up @@ -191,7 +275,7 @@ def _print_log(self, step):
sys.stdout.write("\nSAMPLE DATA: {0} rows x cols\n\n".format(self.sample_data.shape))
sys.stdout.write("{0} \n\n".format(self.sample_data.to_string()))

# Print the table description if the call was made from the load script
# Print the table description
sys.stdout.write("\nTABLE DESCRIPTION SENT TO QLIK:\n\n{0} \n\n".format(self.table))

with open(self.logfile,'a') as f:
Expand All @@ -201,5 +285,21 @@ def _print_log(self, step):

# Write the table description to the log file
f.write("\nTABLE DESCRIPTION SENT TO QLIK:\n\n{0} \n\n".format(self.table))

elif step == 3:
# Print labels to the terminal
sys.stdout.write("\nRESPONSE FROM GET_LABELS:\n\n{0}\n\n".format(self.columns.to_string()))

with open(self.logfile,'a') as f:
# Write the sample data to the log file
f.write("\nRESPONSE FROM GET_LABELS:\n\n{0}\n\n".format(self.columns.to_string()))

elif step == 4:
# Print the table description
sys.stdout.write("\nTABLE DESCRIPTION SENT TO QLIK:\n\n{0} \n\n".format(self.table))

with open(self.logfile,'a') as f:
# Write the table description to the log file
f.write("\nTABLE DESCRIPTION SENT TO QLIK:\n\n{0} \n\n".format(self.table))


Loading

0 comments on commit 2d486e6

Please sign in to comment.