Once the requirements are installed, you can use the connector by following these steps:

  • Load files on the same machine running the Spark job.
  • Modify the file in the repository and search for the ReceiverConfiguration class to set up the IP address and port for both the HTTP server and the multi-threading socket.
      • If your PySpark job runs in a Docker container, make sure that both the server and the multi-threading socket use the IP address of that container.
      • Don't use the same (address, port) pair for the HTTP server and the sockets.
      • The REQUEST_COMPLETENESS field in this file lets you choose whether to receive only the raw body (JSON format) or the whole request (with HTTP headers) to work with as an NGSIEvent object.
      • The SOCKET_BUFFER field lets you increase the socket buffer size to match your needs.
      • The MAX_CONCURRENT_CONNECTIONS field sets the maximum number of concurrent connections of the main socket. Keep this number sufficiently high, and remember that EFFECTIVE_CONCURRENT_CONNECTIONS = MAX_CONCURRENT_CONNECTIONS - 1, since one connection is reserved by the PySpark socket.
  • Make a subscription in the Context Broker, using the same HTTP server address and port you chose in the configuration file.
  • Import all the PySpark functions needed to start Spark Streaming:

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark import StorageLevel
import orion_pyspark_connector as connector
  • Obtain a SparkContext by configuring a SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("YOURAPPNAME").getOrCreate()
sc = spark.sparkContext

Alternatively, you can create a new SparkContext with your custom configuration

sc = SparkContext(conf=conf)  # conf: a SparkConf object you have built beforehand

with n_nodes > 1

  • Run the connector, passing the SparkContext, the length of your sliding window in seconds and the desired storage level. You can also change the configuration from your code before starting the connector:

# Change configuration, e.g. the HTTP endpoint address and port:
connector.RECV_SINGLETON.http_address = "localhost"
connector.RECV_SINGLETON.http_port = 10025
# Start the connector
record, streamingcontext = connector.Prime(sc, YOUR_DESIRED_NUMBER_OF_SECONDS, StorageLevel.MEMORY_AND_DISK_2)

The connector receives data from the broker. Its behaviour depends on both the configuration file (whether it accepts only the body or the whole request) and the type of request that arrives at the HTTP server: it automatically detects whether the request contains NGSIv2 or NGSI-LD data. The function above returns both the stream of data to be processed (via PySpark mapping) and the streaming context itself. Please refer to the NGSIv2 or NGSI-LD base classes in the file to understand their structure.

  • Run the streaming context, as in the example below:

def MyProcessFunction(value):
    # Placeholder: replace with your own processing logic
    result = value
    return result

# Processing steps:
#    - Flatten the entity list from each event
#    - For each entity, take 'someAttribute' and process its value with the previously defined function
processed_record = record.flatMap(lambda x: x.entities).map(lambda x: MyProcessFunction(x.attrs['someAttribute'].value))

# Sink the result to trigger mapping
processed_record.pprint()

# Start the above workflow until termination
streamingcontext.start()
streamingcontext.awaitTermination()
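
The configuration constraints from the set-up steps (distinct (address, port) pairs, and one connection reserved for the PySpark socket) can be checked before starting a job. The following is a minimal sketch, not the connector's own code: `ReceiverSettings`, its field names and its default ports are hypothetical stand-ins that only mirror the fields described earlier.

```python
from dataclasses import dataclass


@dataclass
class ReceiverSettings:
    """Hypothetical stand-in for the receiver configuration (not the connector's class)."""
    http_address: str = "localhost"
    http_port: int = 10025          # arbitrary example port
    socket_address: str = "localhost"
    socket_port: int = 9998         # arbitrary example port
    max_concurrent_connections: int = 20

    def validate(self) -> None:
        # The HTTP server and the socket must not share the same (address, port) pair.
        if (self.http_address, self.http_port) == (self.socket_address, self.socket_port):
            raise ValueError("HTTP server and socket need different (address, port) pairs")
        # One connection is reserved by the PySpark socket, so at least 2 are needed.
        if self.max_concurrent_connections < 2:
            raise ValueError("max_concurrent_connections must be at least 2")

    @property
    def effective_concurrent_connections(self) -> int:
        # EFFECTIVE_CONCURRENT_CONNECTIONS = MAX_CONCURRENT_CONNECTIONS - 1
        return self.max_concurrent_connections - 1


settings = ReceiverSettings(max_concurrent_connections=20)
settings.validate()
print(settings.effective_concurrent_connections)
```

Running a check like this before calling the connector catches a clashing port or a too-small connection limit early, instead of at the first incoming notification.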

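To illustrate the kind of decision the connector makes between NGSIv2 and NGSI-LD data, the sketch below guesses the NGSI flavour of a notification body. It is not the connector's detection logic: `guess_ngsi_version` is a hypothetical helper, the sample payloads are invented for illustration, and the heuristic (NGSI-LD notifications carry `"type": "Notification"`, NGSIv2 notifications only carry `subscriptionId` and `data`) is an assumption based on the usual shape of the two payloads.

```python
import json


def guess_ngsi_version(raw_body: str) -> str:
    """Hypothetical helper: guess 'NGSI-LD' or 'NGSIv2' from a notification body."""
    payload = json.loads(raw_body)
    # Assumption: NGSI-LD notifications are objects of type "Notification".
    if payload.get("type") == "Notification":
        return "NGSI-LD"
    # Assumption: NGSIv2 notifications carry subscriptionId and data, but no type.
    if "subscriptionId" in payload and "data" in payload:
        return "NGSIv2"
    raise ValueError("unrecognised notification body")


# Illustrative payloads only; real notifications contain more fields.
v2_body = json.dumps({
    "subscriptionId": "sub1",
    "data": [{"id": "MyProduct1", "type": "Product",
              "price": {"type": "Number", "value": 12.5, "metadata": {}}}],
})
ld_body = json.dumps({
    "id": "urn:ngsi-ld:Notification:1",
    "type": "Notification",
    "subscriptionId": "urn:ngsi-ld:Subscription:1",
    "data": [{"id": "urn:ngsi-ld:Product:1", "type": "Product",
              "price": {"type": "Property", "value": 12.5}}],
})

print(guess_ngsi_version(v2_body))
print(guess_ngsi_version(ld_body))
```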

  • Modify the file to change the Blueprint file path, the API URL and the HTTP method, choosing among "POST", "PUT" and "PATCH". You also need to specify some header fields, such as the content-type (default: application/json) and both the fiware service and service_path. In this configuration file you can also define a custom placeholder string to use in the request body blueprint.

  • In your PySpark job, import the connector library and set up your configuration by accessing the replier configuration, e.g.:

import orion_pyspark_connector as connector

connector.REPL_SINGLETON.api_url = "http://localhost:1026/v2/entities/MyProduct1/attrs/price/"
connector.REPL_SINGLETON.api_method = "PUT"
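
The replier settings described here map onto an ordinary HTTP request. As a rough illustration using only the Python standard library (this is not how the connector itself sends replies; `build_reply_request` and the example service values are hypothetical), a request with the chosen method and FIWARE headers could be assembled like this, without being sent:

```python
import json
import urllib.request

ALLOWED_METHODS = {"POST", "PUT", "PATCH"}


def build_reply_request(api_url, method, body, service, service_path):
    """Hypothetical helper: build (but do not send) a request to the Context Broker."""
    if method not in ALLOWED_METHODS:
        raise ValueError(f"unsupported method: {method}")
    headers = {
        "Content-Type": "application/json",
        "Fiware-Service": service,
        "Fiware-ServicePath": service_path,
    }
    return urllib.request.Request(
        api_url, data=body.encode("utf-8"), headers=headers, method=method
    )


request = build_reply_request(
    "http://localhost:1026/v2/entities/MyProduct1/attrs/price/",
    "PUT",
    json.dumps({"value": 12.5, "type": "Number"}),
    service="myservice",      # example value, not taken from the connector
    service_path="/",         # example value, not taken from the connector
)
print(request.get_method(), request.full_url)
```

Rejecting any method other than "POST", "PUT" or "PATCH" up front mirrors the choice offered by the configuration file.
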
  • The replier can be used in three different modes: structured, unstructured and semi-structured.

  • Structured mode:

  • Create a .txt file with the desired request body, using the placeholder string defined in the configuration file wherever a field has to be filled in with the output of the PySpark algorithm
  • If the algorithm produces more than one value, make sure that the incoming values are in the same order as the fields to fill in
  • Take into account that this method is slower than the others (since files are read from disk); it is best suited to completing large bodies
  • Use the ReplyToBroker function, passing the values from the algorithm
response = record.map(lambda x: connector.ReplyToBroker(x))
  • Semi-structured mode:
  • Use the SemistructuredReplyToBroker function, passing both the values and a request body containing the placeholder string defined in the configuration file
  • If the algorithm produces more than one value, make sure that the incoming values are in the same order as the body fields
  • This method is faster than the structured one, but it is suited to small request bodies
  • For JSON bodies, remember that property names and string fields must be enclosed in double quotes, so the whole body should be enclosed in single quotes, as in the following example (where the configured placeholder string is %%PLACEHOLDER%%):
response = record.map(lambda x: connector.SemistructuredReplyToBroker(x, '{"example" : %%PLACEHOLDER%% }'))
  • Unstructured mode:
  • Use the UnstructuredReplyToBroker function, passing only a complete request body (without placeholders)
  • For JSON bodies, remember that property names and string fields must be enclosed in double quotes, so the whole body should be enclosed in single quotes.
  • Take particular care when constructing the request, making sure that no value is escaped
  • Make sure that every value x from the algorithm is cast to a string with the str() function
  • This method works well when the algorithm returns very complex structures (e.g. an entire NGSI entity) to insert into very small requests
  • This method is the fastest one, but it is suited to small request bodies and is more error-prone than the others
response = record.map(lambda x: connector.UnstructuredReplyToBroker('{"price" :' + str(x.attrs["price"].value) + ' }'))
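
The placeholder mechanism used by the structured and semi-structured modes can be pictured as an ordered substitution. The sketch below is only an illustration under that assumption, not the connector's implementation: `fill_placeholders` is a hypothetical helper, and `%%PLACEHOLDER%%` is the example placeholder string used in the semi-structured example. The last lines show, as a separate suggestion, how `json.dumps` can produce a correctly quoted body for the unstructured mode.

```python
import json

PLACEHOLDER = "%%PLACEHOLDER%%"


def fill_placeholders(template: str, values) -> str:
    """Hypothetical helper: replace each placeholder, left to right, with the next value."""
    values = list(values)
    if template.count(PLACEHOLDER) != len(values):
        raise ValueError("number of values does not match number of placeholders")
    for value in values:
        # Replace only the first remaining occurrence, so the order is preserved.
        template = template.replace(PLACEHOLDER, str(value), 1)
    return template


body = fill_placeholders(
    '{"price": %%PLACEHOLDER%%, "stock": %%PLACEHOLDER%%}', [12.5, 40]
)
print(body)

# For an unstructured body, json.dumps writes the double-quoted JSON for you.
unstructured_body = json.dumps({"price": 12.5})
print(unstructured_body)
```

The count check makes the ordering requirement explicit: a value list that is too short or too long fails immediately instead of producing a half-filled body.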

Subscription Tool

The subscribing tool is an optional tool that makes it easy to create subscriptions in the Context Broker. It provides an API to browse the entities in the Context Broker, select one of them and then select the attributes to return in the subscription.

Currently, conditions are not implemented. Subscriptions with conditions will be implemented later.

  • To use the subscribing tool, import it with the following line of code:
import subscribing_tool as sub

Remember: the subscription tool will use the file! Remember to add it in the same folder.

  • Use the following function, keeping in mind that:
  • base_url: is the base URL of the Context Broker. Usually, it would be "http://ipaddress:port/v2/" or "http://ipaddress:port/ngsi-ld/v1/"

  • The algorithm will browse the base url, showing a list of the currently existing entities.
  • Select an entity among the existing ones by typing the name (case insensitive)
  • The algorithm will show a numbered list of attributes
  • Type the attribute number to add it into a list of selected attributes for the subscription
  • Type >QUIT to exit the selection list
  • Type >ALL to copy the whole list and exit the selection
  • Now type the attribute number to add it to a list of condition attributes for the subscription (v2 only)
  • Type >QUIT to exit the selection list if at least one attribute is selected
  • Type >ALL (or >QUIT if no attribute is selected) to copy the whole list and exit the selection
  • Type the description you want to put
  • If the algorithm succeeds, it will show the success message and the subscription is posted on the context broker
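
As an illustration of what these steps produce, the sketch below turns a sequence of typed choices into a list of attributes and wraps it in an NGSIv2 subscription body. It is not the tool's code: `select_attributes` and `build_v2_subscription` are hypothetical helpers, the numbered list is assumed to be 1-based, the payload covers only the NGSIv2 case without conditions, and the entity type and notification URL are example values (the URL should match your HTTP server address and port).

```python
import json


def select_attributes(available, typed_inputs):
    """Hypothetical helper: apply typed choices (numbers, >ALL, >QUIT) to a numbered list."""
    selected = []
    for raw in typed_inputs:
        choice = raw.strip().upper()
        if choice == ">ALL":
            return list(available)
        if choice == ">QUIT":
            break
        index = int(choice) - 1          # assumption: the list shown is 1-based
        if 0 <= index < len(available) and available[index] not in selected:
            selected.append(available[index])
    return selected


def build_v2_subscription(entity_id, entity_type, attrs, notify_url, description):
    """Hypothetical helper: build an NGSIv2 subscription body as a dict."""
    return {
        "description": description,
        "subject": {"entities": [{"id": entity_id, "type": entity_type}]},
        "notification": {"http": {"url": notify_url}, "attrs": attrs},
    }


attrs = select_attributes(["price", "stock", "name"], ["1", "3", ">QUIT"])
subscription = build_v2_subscription(
    "MyProduct1", "Product", attrs,
    "http://localhost:10025",            # example: the connector's HTTP server
    "Notify price and name changes",
)
print(json.dumps(subscription, indent=2))
```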