Saturday, August 06, 2016

Getting started with Apache Bahir MQTT Structured Streaming for Apache Spark

Structured Streaming (a.k.a. continuous applications) was introduced in Apache Spark 2.0 and is currently labeled an "Alpha Release".

In Apache Bahir, we have started updating the extensions that initially supported Apache Spark Discretized Streams (DStreams) so that they also support the new Structured Streaming.

Prashant Sharma has contributed SQL-Streaming-MQTT to Apache Bahir. This extension enables users of Apache Spark to process data from IoT sources that support the MQTT protocol using the SQL programming model of Structured Streaming.

Getting Started with MQTT Structured Streaming

MQTT Server

First, let's bring up a Mosquitto server, which implements the MQTT protocol, using a publicly available Docker image.

docker run -ti --name mosquitto -p 1883:1883 -p 9001:9001 toke/mosquitto
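
To quickly check that the broker is up and accepting connections, you can subscribe from another terminal to the topic we will use later. Here is a small sketch using the mosquitto_sub command-line client (an assumption: the Mosquitto clients are installed on your host, and "bahir" is the topic used later in this post):

mosquitto_sub -h localhost -p 1883 -t bahir -v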

MQTT Client

Now let's use Paho, an MQTT client, to connect to the MQTT server and push some test data to a topic that we will use in the Spark application.

First, go to the Paho website and download the client for the platform you are using.

Now start the client application and create a new connection (accepting the default values):
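
If you prefer to publish programmatically instead of through the GUI, the sketch below uses the Eclipse Paho Java client (org.eclipse.paho.client.mqttv3) to send a test message; the broker URL and the "bahir" topic match the values used later in this post, and should be adjusted to your setup:

 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.MqttMessage;

 public class PublishTestData {
     public static void main(String[] args) throws MqttException {
         // Connect to the local Mosquitto broker started above
         MqttClient client = new MqttClient("tcp://localhost:1883", MqttClient.generateClientId());
         client.connect();

         // Publish a test message to the topic the Spark application will read from
         client.publish("bahir", new MqttMessage("hello mqtt".getBytes()));

         client.disconnect();
     }
 }
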
MQTT Structured Streaming Spark Application

Let's look at the main parts of an MQTT Structured Streaming Spark application that counts the words published to a given MQTT topic.

First, we create a SparkSession and a Dataset representing the stream of data arriving from the MQTT topic.
 SparkConf sparkConf = new SparkConf().setAppName("JavaMQTTStreamWordCount");

 SparkSession spark = SparkSession.builder()
     .config(sparkConf)
     .getOrCreate();

 // Create a Dataset representing the stream of input lines from the MQTT topic
 Dataset<String> lines = spark
     .readStream()
     .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
     .option("topic", topic)
     .load(brokerUrl).select("value").as(Encoders.STRING());

Now we process the lines read from the MQTT topic and split them into words.
 // Split the lines into words
 Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
     @Override
     public Iterator<String> call(String x) {
          return Arrays.asList(x.split(" ")).iterator();
     }
 }, Encoders.STRING());

Finally, we calculate the word counts and print the results to the console.
 Dataset<Row> wordCounts = words.groupBy("value").count();

 // Start running the query that prints the running counts to the console
 StreamingQuery query = wordCounts.writeStream()
     .outputMode("complete")
     .format("console")
     .start();

 query.awaitTermination();

The complete application is available at the Bahir source repository.

Running the MQTT sample application from Apache Bahir

Assuming you have a standalone Spark server running locally on your system, use the spark-submit command below to run the application. The last two arguments are the MQTT broker URL and the topic name:
spark-submit --master spark://127.0.0.1:7077 \
             --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.1.0-SNAPSHOT \
             --class org.apache.bahir.examples.sql.streaming.mqtt.JavaMQTTStreamWordCount \
             sql-streaming-mqtt/target/spark-sql-streaming-mqtt_2.11-2.1.0-SNAPSHOT-tests.jar \
             tcp://localhost:1883 bahir
Then use the Paho MQTT client to start publishing data to the MQTT topic.
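
For example, publishing the message "some other words" (a sketch using the mosquitto_pub command-line client, assuming the Mosquitto clients are installed on the host) produces the counts shown in the sample output below:

mosquitto_pub -h localhost -p 1883 -t bahir -m "some other words"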

The application will then start printing the word counts on the console, like below:

+-----+-----+
|value|count|
+-----+-----+
| some|    1|
|words|    1|
|other|    1|
+-----+-----+

Getting involved with Apache Bahir

Apache Bahir is an open source project that provides extensions to distributed analytic platforms such as Apache Spark. It is run by volunteers and welcomes help from others in the community. Please see our community page for information on how to get involved.
