Package branch #19

Merged
merged 18 commits into from
Jun 21, 2023
Changes from all commits
Empty file added FPC/__init__.py
Empty file.
6 changes: 5 additions & 1 deletion receiver/orion_pyspark_connector.py → FPC/connector.py
@@ -14,7 +14,7 @@
from threading import Thread
from datetime import datetime

from connectorconf import *
from FPC.connectorconf import *



@@ -439,6 +439,10 @@ def UnstructuredReplyToBroker(body: str, api_url: str = REPL_SINGLETON.api_url,
'''
return sendRequest(body, api_url, api_method)


def Test():
    print(RECV_SINGLETON.http_address)




File renamed without changes.
4 changes: 2 additions & 2 deletions receiver/subscribing_tool.py → FPC/subscribing_tool.py
@@ -2,8 +2,8 @@
import requests
import json

import connectorconf
from connectorconf import NGSIAttribute, NGSIEntityv2, NGSIEntityLD
import FPC.connectorconf
from FPC.connectorconf import NGSIAttribute, NGSIEntityv2, NGSIEntityLD
import sys


9 changes: 4 additions & 5 deletions Test/env_setup.sh
@@ -43,10 +43,9 @@ wget https://github.com/Engineering-Research-and-Development/fiware-orion-pyspar
unzip main.zip
echo "repository downloaded and unzipped"
mkdir ci/PySpark/
mv fiware-orion-pyspark-connector-main/receiver/orion_pyspark_connector.py ./ci/PySpark/
mv fiware-orion-pyspark-connector-main/receiver/connectorconf.py ./ci/PySpark/
# mv fiware-orion-pyspark-connector-main/receiver/subscribing_tool.py ./ci/PySpark/
mv fiware-orion-pyspark-connector-main/FPC/connector.py ./ci/PySpark/
mv fiware-orion-pyspark-connector-main/FPC/connectorconf.py ./ci/PySpark/
# mv fiware-orion-pyspark-connector-main/FPC/subscribing_tool.py ./ci/PySpark/
rm -r fiware-orion-pyspark-connector-main
rm main.zip
echo "connector succesfully imported"

echo "connector succesfully installed"
2 changes: 1 addition & 1 deletion Test/ld_start.py
@@ -1,4 +1,4 @@
import orion_pyspark_connector as connector
import connector
from pyspark import SparkContext, SparkConf, StorageLevel


2 changes: 1 addition & 1 deletion Test/raw_start.py
@@ -1,4 +1,4 @@
import orion_pyspark_connector as connector
import connector
from pyspark import SparkContext, SparkConf, StorageLevel

import os
2 changes: 1 addition & 1 deletion Test/start.py
@@ -1,4 +1,4 @@
import orion_pyspark_connector as connector
import connector
from pyspark import SparkContext, SparkConf, StorageLevel

import os
2 changes: 1 addition & 1 deletion Test/v2_replier_start.py
@@ -1,4 +1,4 @@
import orion_pyspark_connector as connector
import connector
#import subscribing_tool as sub


6 changes: 3 additions & 3 deletions ci/env_setup.sh
@@ -43,9 +43,9 @@ wget https://github.com/Engineering-Research-and-Development/fiware-orion-pyspar
unzip main.zip
echo "repository downloaded and unzipped"
mkdir ci/PySpark/
mv fiware-orion-pyspark-connector-main/receiver/orion_pyspark_connector.py ./ci/PySpark/
mv fiware-orion-pyspark-connector-main/receiver/connectorconf.py ./ci/PySpark/
mv fiware-orion-pyspark-connector-main/receiver/subscribing_tool.py ./ci/PySpark/
mv fiware-orion-pyspark-connector-main/FPC/connector.py ./ci/PySpark/
mv fiware-orion-pyspark-connector-main/FPC/connectorconf.py ./ci/PySpark/
mv fiware-orion-pyspark-connector-main/FPC/subscribing_tool.py ./ci/PySpark/
rm -r fiware-orion-pyspark-connector-main
rm main.zip
echo "connector succesfully imported"
2 changes: 1 addition & 1 deletion ci/raw_test/start.py
@@ -1,4 +1,4 @@
import orion_pyspark_connector as connector
import connector
from pyspark import SparkContext, SparkConf, StorageLevel

import os
2 changes: 1 addition & 1 deletion ci/v2_test/start.py
@@ -1,4 +1,4 @@
import orion_pyspark_connector as connector
import connector
from pyspark import SparkContext, SparkConf, StorageLevel


13 changes: 1 addition & 12 deletions docker/Dockerfile
@@ -26,16 +26,5 @@ RUN export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
RUN pip3 install 'py4j==0.10.9.3'
RUN pip3 install 'pyspark==3.2.1'
RUN pip3 install psutil
RUN pip3 install fiware-pyspark-connector

RUN wget https://github.com/Engineering-Research-and-Development/fiware-orion-pyspark-connector/archive/refs/heads/main.zip
RUN unzip main.zip


RUN mkdir ../PySpark/

RUN mv fiware-orion-pyspark-connector-main/receiver/orion_pyspark_connector.py ../PySpark/
RUN mv fiware-orion-pyspark-connector-main/receiver/connectorconf.py ../PySpark/
RUN mv fiware-orion-pyspark-connector-main/receiver/subscribing_tool.py ../PySpark/

RUN rm -r fiware-orion-pyspark-connector-main
RUN rm main.zip
65 changes: 40 additions & 25 deletions docs/quick_start.md
@@ -3,21 +3,28 @@
### Receiver

Once the requirements are installed, it is possible to use the connector by following these steps:
- Load files on the same machine running the spark job
- Modify the `connectorconf.py` file in the repository and search for the **ReceiverConfiguration** class to set up the IP address and port for both the HTTP Server and the multi-threading socket.
- If your PySpark job is running in a docker container, make sure that both the server and multi-thread socket *IP addresses* are the same as that container's
- **Don't use the same (address, port) couple for the HTTP Server and the Sockets**
- The "REQUEST_COMPLETENESS" field in this file allows the user to choose whether to obtain a raw body (JSON format) or the whole request (with HTTP headers) to work with an NGSIEvent object
- The SOCKET_BUFFER field allows the user to increase the socket buffer to match their needs.
- The MAX_CONCURRENT_CONNECTIONS field allows the user to set the maximum concurrent connections of the main socket. It is suggested to keep this number sufficiently high. **Please remember that EFFECTIVE_CONCURRENT_CONNECTIONS = (MAX_CONCURRENT_CONNECTIONS - 1), since one connection is reserved by the PySpark socket.**
- Make a subscription in the Context Broker, inserting the same HTTP server address and port you chose for the configuration file.
- Import all PySpark functions needed for starting the Spark Streaming:
- Import the connector library and all PySpark functions needed for starting the Spark Streaming:
```python
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark import StorageLevel
import orion_pyspark_connector as connector
from FPC import connector
```
- The connector is configured with default values. If needed, manually configure the connector using the **RECV_SINGLETON** instance of the receiver configuration class:
```python
connector.RECV_SINGLETON.http_address = # The machine IP address / Docker container IP if running in docker
connector.RECV_SINGLETON.http_port = # Desired Port
connector.RECV_SINGLETON.socket_address = # localhost is fair enough / Docker container IP if running in docker
connector.RECV_SINGLETON.socket_port = # Desired port
connector.RECV_SINGLETON.request_completeness = # True if full HTTP packet is needed / False if only body strings are needed
connector.RECV_SINGLETON.socket_buffer = # 2048 by default, use more if needed
connector.RECV_SINGLETON.max_concurrent_connections = # 20 suggested by default, use more if needed.
```
**WARNING: Don't use the same (address, port) couple for the HTTP Server and the Sockets**
**WARNING: Remember that EFFECTIVE_CONCURRENT_CONNECTIONS = (MAX_CONCURRENT_CONNECTIONS - 1), since one connection is reserved by the PySpark socket.**
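
For reference, a minimal concrete setup might look like the sketch below; every address and port value is an illustrative assumption, not a default:
```python
from FPC import connector

# Illustrative values only: adapt addresses and ports to your deployment.
connector.RECV_SINGLETON.http_address = "0.0.0.0"         # HTTP server reachable by the Context Broker
connector.RECV_SINGLETON.http_port = 8061
connector.RECV_SINGLETON.socket_address = "localhost"     # internal socket used by the PySpark receiver
connector.RECV_SINGLETON.socket_port = 9998               # must differ from the HTTP (address, port) couple
connector.RECV_SINGLETON.request_completeness = True      # keep HTTP headers together with the body
connector.RECV_SINGLETON.socket_buffer = 2048
connector.RECV_SINGLETON.max_concurrent_connections = 20  # one connection is reserved by the PySpark socket
```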

- Make a subscription in the Context Broker, inserting the same HTTP server address and port you chose in the configuration above.

- Obtain a SparkContext by configuring a SparkSession
```python
from pyspark.sql import SparkSession
@@ -30,15 +37,13 @@ conf = SparkConf().setAppName("YOURAPPNAME").set(YOURRESOURCEMANAGER, YOURMASTER
sc = spark.SparkContext(conf = conf)
```
with *n_nodes > 1*
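
Since the snippet above is collapsed in this view, a self-contained sketch of the same step is shown below; the application name and master URL (`local[2]`, i.e. more than one node/thread) are placeholders:
```python
from pyspark.sql import SparkSession

# Placeholder app name and master URL: replace with your own cluster settings.
spark = SparkSession.builder \
    .appName("YOURAPPNAME") \
    .master("local[2]") \
    .getOrCreate()
sc = spark.sparkContext  # SparkContext later passed to connector.Prime(...)
```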
- Run the connector, passing the SparkContext, the number of seconds of your sliding window and the desired storage level. You can also change configuration from your code, before starting the connector:

- Run the connector, passing the SparkContext, the number of seconds of your sliding window and the desired storage level:
```python
# Change configuration, i.e. HTTP endpoint address and port:
connector.RECV_SINGLETON.http_address = "localhost"
connector.RECV_SINGLETON.http_port = 10025
#Start the connector
record, streamingcontext = connector.Prime(sc, YOUR-DESIRED-NUMBER-OF-SECONDS, StorageLevel.MEMORY_AND_DISK_2)
```
The connector will receive data from the broker; its behaviour depends both on the configuration (whether it accepts only the body or the whole request) and on the type of request arriving at the HTTP server, automatically deciding whether the request contains NGSIv2 or NGSI-LD data. The function above returns both the stream data to be processed (via PySpark mapping) and the streaming context itself. Please refer to the NGSIv2 or NGSI-LD base classes in the `connectorconf.py` file to understand their structure.

- Run the streaming context, like the example below:
```python

@@ -64,14 +69,18 @@ ssc.awaitTermination()
```
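
The full example is collapsed in this diff; a minimal, hedged sketch of that step (the printout is illustrative) could be:
```python
# Hedged sketch: print each received event and keep the stream alive.
# 'record' and 'streamingcontext' are the values returned by connector.Prime(...).
record.pprint()
streamingcontext.start()
streamingcontext.awaitTermination()
```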
### Replier


- Modify the `connectorconf.py` file to change the Blueprint file path, the API URL and the HTTP method, choosing from "POST", "PUT" and "PATCH". You also need to specify some header fields like the content-type (default application/json) and both the fiware service and service_path. In addition, in this configuration file it is possible to write a custom *placeholder string* to use in the request body blueprint

- In you PySpark job import the connector library and set up your configuration by accessing the replier configuration, i.e:
- In your PySpark job, import the connector library and set up your configuration by accessing the singleton instance of the replier configuration class, e.g.:
```python
import orion_pyspark_connector as connector

connector.REPL_SINGLETON.api_url = "http://localhost:1026/v2/entities/MyProduct1/attrs/price/"
connector.REPL_SINGLETON.api_methid = "PUT"
from FPC import connector

connector.REPL_SINGLETON.api_url = # Insert a valid CB API URL
connector.REPL_SINGLETON.api_method = # Choose among "POST" "PUT" "PATCH"
connector.REPL_SINGLETON.fiware_service = # Fiware-Service Header for HTTP Requests
connector.REPL_SINGLETON.fiware_servicepath = # Fiware-ServicePath Header for HTTP Requests
connector.REPL_SINGLETON.content_type = # Default set to "application/json; charset=utf-8"
# Here there are complex requests configuration, see below for further details
connector.REPL_SINGLETON.blueprint_file = # Relative path to a blueprint file for complex requests
connector.REPL_SINGLETON.placeholder_string = # Placeholder string for complex requests
```
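
For instance, a concrete replier setup might look like the sketch below; the URL comes from the earlier example, while the header values are illustrative assumptions:
```python
from FPC import connector

# Illustrative configuration: point the replier at your own Context Broker.
connector.REPL_SINGLETON.api_url = "http://localhost:1026/v2/entities/MyProduct1/attrs/price/"
connector.REPL_SINGLETON.api_method = "PUT"
connector.REPL_SINGLETON.fiware_service = "my_service"           # assumed header value
connector.REPL_SINGLETON.fiware_servicepath = "/my_servicepath"  # assumed header value
connector.REPL_SINGLETON.content_type = "application/json; charset=utf-8"
```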
- **The replier can be used in three different modes: structured, unstructured and semi-structured.**

@@ -95,7 +104,6 @@ response = record.map(lambda x: connector.SemistructuredReplyToBroker(x, '{"exam
response.pprint()
```


- *Unstructured mode*:
- Use the UnstructuredReplyToBroker function, passing only a complete request body (without placeholder)
- In case of JSON bodies, remember that properties and string fields must be enclosed in double quotes, so the whole body should be enclosed in single quotes.
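  - A possible usage sketch, assuming the upstream mapping has already reduced each record `x` to a plain numeric value (the attribute name and body layout are illustrative):
```python
# Hedged sketch: build the whole request body from the processed value x.
# Properties and string fields use double quotes, so the template is single-quoted.
response = record.map(
    lambda x: connector.UnstructuredReplyToBroker('{"price": {"value": ' + str(x) + ', "type": "Number"}}')
)
response.pprint()
```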
@@ -117,9 +125,16 @@ The subscribing tool is an optional tool capable of making easy subscription to

- To use the subscribing tool, import it with the following line of code:
```python
import subscribing_tool as sub
from FPC import subscribing_tool as sub
```
**Remember: the subscription tool will use the connectorconf.py file! Remember to add it in the same folder**
**Remember: the subscription tool will use the same configuration as the connector. If needed, configure both the receiver and replier sides**
```python
sub.RECV_SINGLETON.http_address = # The machine IP address / Docker container IP if running in docker
sub.RECV_SINGLETON.http_port = # Desired Port
sub.REPL_SINGLETON.fiware_service = # Fiware-Service Header for HTTP Requests
sub.REPL_SINGLETON.fiware_servicepath = # Fiware-ServicePath Header for HTTP Requests
```

- Use the following function, keeping in mind that:
- base_url: the base URL of the Context Broker. Usually it is "http://ipaddress:port/v2/" or "http://ipaddress:port/ngsi-ld/v1/"
```python
5 changes: 5 additions & 0 deletions docs/requirements.md
@@ -16,6 +16,11 @@ Once downloaded this file, open the terminal in the repository folder and run th
pip3 install -r conn_requirements.txt
```

Then it is possible to install the connector through the pip package manager:
```console
pip3 install fiware-pyspark-connector
```

Now every library required to run the connector is ready. <br />
**N.B:** The requirements file contains the requirements for the connector and the primer. Spark itself is omitted since it is assumed to be already installed. If Spark is needed, use the following guide:

3 changes: 2 additions & 1 deletion docs/roadmap.md
@@ -18,9 +18,10 @@

### Medium Term

- [ ] Make the connector available with a docker image **Working**
- [x] Make a working environment available with a docker image **Completed on 21/06/2023**
- [x] Subscribing tool **Completed on 20/09/2022**
- [x] Collapsed receiver and replier into one library and one configuration file **Completed on 20/09/2022**
- [x] Make the connector available as python package with pip **Completed on 21/06/2023**
- [ ] Efficiency improvements
- [x] Better Socket management (automatic ports) **Completed on 20/01/2023**
- [ ] Find a **more elegant** way to keep only Spark sockets in memory without blocking the server