In Apache Bahir, we have started updating the extensions that initially supported Apache Spark Discretized Streams (DStreams) so that they also support the new Structured Streaming API.
Prashant Sharma has contributed SQL-Streaming-MQTT to Apache Bahir. This extension enables users of Apache Spark to process data from IoT sources that support the MQTT protocol using the SQL programming model of Structured Streaming.
Getting Started with MQTT Structured Streaming
MQTT Server
First, let's bring up a Mosquitto server, which implements the MQTT protocol, using a publicly available Docker image.
docker run -ti --name mosquitto -p 1883:1883 -p 9001:9001 toke/mosquitto
MQTT Client
Now let's use Paho, an MQTT client, to connect to the MQTT server and push some test data to a topic that we will use in the Spark application.
First, go to the Paho website and download the client for the platform you are using.
Now start the client application and create a new connection, accepting the default values:
MQTT Structured Streaming Spark Application
Let's look at the main parts of an MQTT Structured Streaming Spark application that counts the words published to a given MQTT topic.
First, we create a SparkSession and a dataset representing the stream of data from the MQTT topic.
SparkConf sparkConf = new SparkConf().setAppName("JavaMQTTStreamWordCount");
SparkSession spark = SparkSession.builder()
.config(sparkConf)
.getOrCreate();
Dataset<String> lines = spark
.readStream()
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("topic", topic)
.load(brokerUrl).select("value").as(Encoders.STRING());
Now we process the lines read from the MQTT topic and split them into words.
// Split the lines into words
Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String x) {
        return Arrays.asList(x.split(" ")).iterator();
    }
}, Encoders.STRING());
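As a side note (not part of the original example), with Java 8 lambdas the same split can be written more compactly; the FlatMapFunction cast just resolves the overload for the compiler, and the snippet reuses the lines dataset and imports from above:
// Lambda form of the word-splitting step above.
Dataset<String> words = lines.flatMap(
    (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
    Encoders.STRING());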
Finally, we calculate the word counts and print the results to the console.
Dataset<Row> wordCounts = words.groupBy("value").count();

StreamingQuery query = wordCounts.writeStream()
    .outputMode("complete")
    .format("console")
    .start();

query.awaitTermination();

The complete application is available in the Bahir source repository.
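A note on output modes, not from the original example: "complete" mode re-prints the entire result table on every trigger, which is what makes the running totals visible. If you only want the rows whose counts changed in each micro-batch, the same wordCounts dataset can be written in "update" mode instead (available as of Spark 2.1.1); a minimal sketch:
// Print only the rows updated in each micro-batch.
StreamingQuery updateQuery = wordCounts.writeStream()
    .outputMode("update")
    .format("console")
    .start();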
Running the MQTT sample application from Apache Bahir
Assuming you have a local standalone Spark server running on your system, use the spark-submit command below to run the application:
spark-submit --master spark://127.0.0.1:7077 \
--packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.1.0-SNAPSHOT \
--class org.apache.bahir.examples.sql.streaming.mqtt.JavaMQTTStreamWordCount \
sql-streaming-mqtt/target/spark-sql-streaming-mqtt_2.11-2.1.0-SNAPSHOT-tests.jar \
tcp://localhost:1883 bahir
Now use the Paho MQTT client to start publishing data to the MQTT topic. The application will then start printing the word counts on the console, as shown below (a programmatic alternative for publishing the test data is sketched after the sample output):
+-----+-----+
|value|count|
+-----+-----+
| some|    1|
|words|    1|
|other|    1|
+-----+-----+
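If you prefer to publish the test data programmatically instead of through the Paho GUI, a minimal sketch using the Eclipse Paho Java client could look like the following (the class name PublishTestWords is just for illustration; the broker URL and topic match the tcp://localhost:1883 and bahir arguments passed to spark-submit above):
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class PublishTestWords {
    public static void main(String[] args) throws MqttException {
        // Connect to the local Mosquitto broker started earlier.
        MqttClient client = new MqttClient("tcp://localhost:1883", MqttClient.generateClientId());
        client.connect();

        // Each message published to the "bahir" topic becomes one line
        // in the streaming word count.
        client.publish("bahir", new MqttMessage("some other words".getBytes()));

        client.disconnect();
        client.close();
    }
}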
