Batch Processing In Spark
Before tackling batch processing in Spark, you need to know how to operate the Spark shell. If you are already comfortable with the Python or Scala shell, you can skip this step.
Begin by launching the Scala console from your terminal.
Next, declare a list of integers named “ourNumbers.” The following command can be used for doing this:
scala> val ourNumbers = List(3, 2, 5, 9, 1, 7)
ourNumbers: List[Int] = List(3, 2, 5, 9, 1, 7)
Now declare a function that computes the cube of an integer. This can be done as shown below:
scala> def cube(x: Int): Int = x * x * x
cube: (x: Int)Int
The function has been given the name “cube.”
Then use the map function to apply the function we have created to each number in the list. This can be done as shown below:
scala> ourNumbers.map(a => cube(a))
res: List[Int] = List(27, 8, 125, 729, 1, 343)
// Scala also offers shorthand forms for this, such as ourNumbers.map(cube).
You have now learned how to perform some basic operations in the shell. That is how simple it is to use.
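For readers who prefer the Python shell, the same exercise can be sketched as follows. This is a plain-Python equivalent of the Scala session above, not part of Spark; the names our_numbers and cubes are chosen here for illustration.

```python
# Same list of integers as in the Scala example.
our_numbers = [3, 2, 5, 9, 1, 7]


def cube(x: int) -> int:
    """Return the cube of an integer."""
    return x * x * x


# map() is lazy in Python 3, so wrap it in list() to see the values.
cubes = list(map(cube, our_numbers))
print(cubes)  # [27, 8, 125, 729, 1, 343]
```

A list comprehension, [cube(a) for a in our_numbers], gives the same result and is the more common idiom in Python.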
How to use the Spark shell
Once the Spark shell has started and connected to the Spark cluster, it is assigned an app ID. This app ID matches the application entry shown in the web UI under the running applications. Begin by downloading a dataset to experiment with. The examples here use the spam dataset, saved as “spam.data.”
It should then be loaded as a text file in Spark by running the following command in the Spark shell:
scala> val inFile = sc.textFile("spam.data")
The above command loads the file “spam.data” into Spark, and each line of the file becomes a separate entry of the RDD.
If you are connected to a Spark master, the file may be read on any of the machines in the cluster. This is why you should ensure that the file is available on all of them. The addFile function distributes the file to every machine, and it can be used as shown below. Note that in newer Spark releases the SparkFiles class lives in the org.apache.spark package.
scala> import spark.SparkFiles
scala> sc.addFile("spam.data")
scala> val inFile = sc.textFile(SparkFiles.get("spam.data"))
Note that the Spark shell keeps a command history: press the up arrow to recall previous commands. You can also press the Tab key to autocomplete a command, which helps when you are not sure how a command is written. To convert the data into a more useful format, parse each line into an array of numbers with the following command:
scala> val numbers = inFile.map(a => a.split(' ').map(_.toDouble))
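To see what this conversion does to a single line, here is a minimal plain-Python sketch of the same split-and-convert step. It does not use Spark, the helper name parse_line is hypothetical, and the sample rows are made up rather than taken from the real spam.data file.

```python
from typing import List


def parse_line(line: str) -> List[float]:
    """Split a space-delimited line and convert each field to a float."""
    return [float(field) for field in line.split(" ")]


# Made-up sample rows, only for illustration.
sample_lines = ["0 0.64 0.64 1", "0.21 0.28 0.5 0"]

numbers = [parse_line(line) for line in sample_lines]
print(numbers)  # [[0.0, 0.64, 0.64, 1.0], [0.21, 0.28, 0.5, 0.0]]
```

In the Spark version, the same function body runs once per RDD entry, so a line containing a non-numeric field would raise an error when an action is eventually executed.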
Exploring Data by use of Spark
You might need to compute some statistics on your data. Begin by starting the Spark shell.
After a few seconds, you will get the prompt. If you need to clear the log output, just press the “Enter” key.
The next step is to create an RDD named “pagecounts” from the input files. The SparkContext has already been created for you and is available as the variable sc. In Scala, this can be done as follows:
scala> sc
res: spark.SparkContext = spark.SparkContext@490d1h30
scala> val pagecounts = sc.textFile("/wiki/pagecounts")
In Python, execute the following command to do this:
>>> pagecounts = sc.textFile("/wiki/pagecounts")
To obtain a specific number of records, use the RDD operation “take.” Consider the example given below:
pagecounts.take(5)
The above command returns the first five records. It works in both Scala and Python. However, the result is an array, and in Scala its elements are printed separated by commas, so the output is not very readable. To make it more readable, traverse the array and print each element on its own line. The following command can be used for that purpose in Scala:
scala> pagecounts.take(5).foreach(println)
For Python users, use the following command for this purpose:
>>> for a in pagecounts.take(5):
...     print(a)
Sometimes, you might need to know the number of records contained in your data set. For Scala users, the command is as follows:
scala> pagecounts.count
The same operation is available in Python, written as pagecounts.count(). Note that the command will run for a while, so read ahead while it executes. The console log will report each of the tasks as it is carried out.
Reading from disk every time we perform an operation on the RDD is slow. To avoid this, we cache the RDD in memory. This is where Spark starts to shine. The process can be done as follows:
scala> val cPages = pagecounts.filter(_.split(" ")(1) == "en").cache
In Python, this can be done as follows:
>>> cPages = pagecounts.filter(lambda a: a.split(" ")[1] == "en").cache()
Note that when the above command is executed in the Spark shell, Spark only defines the RDD. No computation is performed yet, because evaluation is lazy. The next time an action is applied to cPages, the data will be cached in memory across the worker nodes in your cluster. Spark needs two to three minutes to scan the entire data set for that first action before it returns the results. Thanks to the caching we have just applied, later results should be returned faster. If you watch the console log closely, you will see lines indicating that data was added to the cache.
To find the number of records in cPages, just run the following command:
cPages.count()
The command can be executed in both Scala and Python. The first time it runs, it will take two to three minutes to execute. When you run it a second time, it will return faster, because the data is now cached in memory.
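The combination of lazy evaluation and caching can be illustrated with a small plain-Python analogy. This is only a toy model and not Spark's API or implementation: the class LazyFilter, its scans counter, and the sample records are all hypothetical, and the records follow the field order the text assumes (date, project code, page title, pageviews).

```python
class LazyFilter:
    """Toy stand-in for a lazily evaluated, cached filter (not Spark's API)."""

    def __init__(self, source, predicate):
        self.source = source        # callable that re-reads the raw records
        self.predicate = predicate  # test applied to each record
        self._cache = None          # filled by the first action only
        self.scans = 0              # how many times the source was read

    def count(self):
        """An 'action': scans the source once, then reuses the cache."""
        if self._cache is None:
            self.scans += 1
            self._cache = [r for r in self.source() if self.predicate(r)]
        return len(self._cache)


# Made-up records, only for illustration.
records = [
    "20090505-000000 en Main_Page 120",
    "20090505-000000 fr Accueil 30",
    "20090506-000000 en Spark 45",
]

c_pages = LazyFilter(lambda: records, lambda line: line.split(" ")[1] == "en")
print(c_pages.scans)    # 0: defining the filter read nothing
print(c_pages.count())  # 2: the first action scans the source
print(c_pages.count())  # 2: the second action is served from the cache
print(c_pages.scans)    # 1: the source was read only once
```

The point of the sketch is the order of events: nothing is read when the filter is defined, the first action pays the full scanning cost, and later actions reuse the stored result.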
Let us now do something a bit more complex: generate a histogram from the dataset over a range of dates. First, generate a key-value pair for each line, where the key is the date and the value is the number of pageviews for that date. For Scala users, begin by executing the following command:
scala> val cTuples = cPages.map(line => line.split(" "))
The second command should be as follows:
scala> val cKeyValuePairs = cTuples.map(line => (line(0).substring(0, 8), line(3).toInt))
For Python users, execute the following sequence of commands:
>>> cTuples = cPages.map(lambda a: a.split(" "))
>>> cKeyValuePairs = cTuples.map(lambda a: (a[0][:8], int(a[3])))
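To make the shape of these key-value pairs concrete, the two mapping steps can be tried on a few lines without Spark. The sketch below is plain Python; the helper to_key_value is hypothetical and the sample lines are made up, following the field order used above (field 0 starts with the date, field 3 is the number of pageviews).

```python
from typing import Tuple


def to_key_value(line: str) -> Tuple[str, int]:
    """Map one record to a (date, pageviews) pair."""
    fields = line.split(" ")
    # The first eight characters of field 0 are the date; field 3 is the count.
    return fields[0][:8], int(fields[3])


# Made-up records, only for illustration.
sample = [
    "20090505-000000 en Main_Page 120",
    "20090506-000000 en Spark 45",
]

pairs = [to_key_value(line) for line in sample]
print(pairs)  # [('20090505', 120), ('20090506', 45)]
```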
The next step is to shuffle the data so that all values with the same key are grouped together, and then to sum the values for each key. A method named “reduceByKey” handles this pattern conveniently. Its second argument, 1 in this case, sets the number of partitions used for the result. Use the command given below:
scala> cKeyValuePairs.reduceByKey(_+_, 1).collect
The above example is for Scala users. For Python users, the command is as follows:
>>> cKeyValuePairs.reduceByKey(lambda a, b: a + b, 1).collect()
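What reduceByKey computes can be sketched locally in plain Python. The function reduce_by_key below is a hypothetical single-machine stand-in written for illustration; it involves no shuffling or partitions, and the input pairs are made up.

```python
from typing import Callable, Dict, Iterable, Tuple


def reduce_by_key(
    pairs: Iterable[Tuple[str, int]],
    func: Callable[[int, int], int],
) -> Dict[str, int]:
    """Combine all values that share a key using func."""
    result: Dict[str, int] = {}
    for key, value in pairs:
        # The first value seen for a key is stored as-is; later ones are merged.
        result[key] = func(result[key], value) if key in result else value
    return result


# Made-up (date, pageviews) pairs, only for illustration.
pairs = [("20090505", 120), ("20090506", 45), ("20090505", 30)]

totals = reduce_by_key(pairs, lambda a, b: a + b)
print(sorted(totals.items()))  # [('20090505', 150), ('20090506', 45)]
```

In Spark the merging happens in parallel across partitions, which is why the function passed to reduceByKey should be associative and commutative, as addition is here.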
The collect method converts the resulting RDD into an array. If you do not assign the result of the command to a name, the shell creates a default name for it automatically. The last three commands can be combined into a single command. In Scala, this will be as follows:
scala> cPages.map(line => line.split(" ")).map(line => (line(0).substring(0, 8), line(3).toInt)).reduceByKey(_+_, 1).collect
In Python, this should be as follows:
>>> cPages.map(lambda a: a.split(" ")).map(lambda a: (a[0][:8], int(a[3]))).reduceByKey(lambda a, b: a + b, 1).collect()