Spark and Standalone Clusters
Sometimes you might need to install Spark in standalone mode. In this case, a compiled version of Spark has to be placed on each node of the cluster.
How to Start a Cluster Manually
If you need to start a standalone master server manually, execute the following command from the Spark directory:
./sbin/start-master.sh
Once the master has started, it prints a spark://HOST:PORT URL for itself. You can use this URL to connect workers to the master, or pass it as the master argument to the SparkContext. The same URL is shown on the master's web UI, which is served at http://localhost:8080 by default. In the same way, you can start one or more workers and connect them to the master.
Once a worker has been started, open the master's web UI. The new node should appear in the list of workers, along with its number of CPUs and amount of memory. The memory shown is the machine's total minus 1 gigabyte, which is left for the OS.
You can also pass the following options to the master and the worker:
- -i IP, --ip IP: the IP address or DNS name to listen on.
- --webui-port PORT: the port for the web UI.
- -p PORT, --port PORT: the port on which the service listens.
- -d DIR, --work-dir DIR: the directory to use for scratch space and job output logs. This applies only to the worker.
- -m MEM, --memory MEM: the total amount of memory that Spark applications are allowed to use on the machine, in a format such as 1000M or 2G. This applies only to the worker.
- -c CORES, --cores CORES: the total number of CPU cores that Spark applications are allowed to use on the machine. This applies only to the worker. By default, all available cores are used.
Those are the parameters which should be passed.
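As a rough illustration of how these options combine, a worker could be started along the following lines. The master host name, resource sizes, and work directory are assumed values, not taken from this article, and the script is named start-worker.sh in recent Spark releases (older releases ship it as start-slave.sh). The sketch only prints the command so that it can be reviewed before being run from the Spark directory.

```shell
# Assumed values for illustration only; adjust them to your cluster.
MASTER_URL="spark://master-host:7077"   # the spark://HOST:PORT URL reported by the master
WORK_DIR="/tmp/spark-work"              # scratch space and job output logs (worker only)

# Compose the worker command: 4 cores and 8 GB of memory for Spark applications.
WORKER_CMD="./sbin/start-worker.sh $MASTER_URL --cores 4 --memory 8G --work-dir $WORK_DIR"

# Print the command for review instead of executing it.
echo "$WORKER_CMD"
```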
Once the cluster has been set up, you can connect your application to it by passing the master's spark://IP:PORT URL to the SparkContext constructor. To run an interactive Spark shell against the cluster, execute the following command:
./bin/spark-shell --master spark://IP:PORT
You can also pass the --cores option to this command to control the number of cores that the Spark shell uses on the cluster. In later Spark releases this option is named --total-executor-cores.
How to Launch the Compiled Spark Applications
The spark-submit script provides a straightforward way to submit a compiled Spark application to a cluster. For standalone clusters, the Spark release described here only supports running the driver inside the client process that submits the application, that is, the client deploy mode.
When an application is launched through spark-submit, the application jar is automatically distributed to all of the worker nodes. If your application depends on additional jars, specify them with the --jars flag, using a comma as the delimiter. You can also specify the configuration or the execution environment of the application.
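A submission with extra jars might be put together as sketched below. The jar paths, main class, and master host are hypothetical placeholders; only the flags themselves (--class, --master, --deploy-mode, --jars) are spark-submit options. The command is printed rather than executed so that it can be checked first.

```shell
# Hypothetical application details; none of these names come from the article.
MASTER_URL="spark://master-host:7077"
APP_JAR="target/my-app.jar"
MAIN_CLASS="com.example.MyApp"
EXTRA_JARS="libs/dep-one.jar,libs/dep-two.jar"   # comma-separated, no spaces

# Compose the spark-submit command in client deploy mode.
SUBMIT_CMD="./bin/spark-submit --class $MAIN_CLASS --master $MASTER_URL --deploy-mode client --jars $EXTRA_JARS $APP_JAR"

# Print the command for review instead of executing it.
echo "$SUBMIT_CMD"
```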
Scheduling of Resources
The standalone cluster mode currently supports only a simple FIFO scheduler across applications. To allow multiple concurrent users, you can control the maximum amount of resources that each application uses. By default, an application acquires all of the cores in the cluster, which only makes sense if you run a single application at a time. You can cap the number of cores by setting spark.cores.max in the application's SparkConf. An example of this is given below:
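import org.apache.spark.{SparkConf, SparkContext}
// "10" is an example cap; the master URL and app name are expected to come from spark-submit
val conf = new SparkConf().set("spark.cores.max", "10")
val sc = new SparkContext(conf)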
The parameter spark.deploy.defaultCores can also be configured on the cluster master process. It changes the default for applications that do not set spark.cores.max, which is otherwise infinite (all available cores). To do this, open the file “conf/spark-env.sh” and add the following line to it, replacing <value> with the number of cores:
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"
This is useful on shared clusters where users may not have configured a maximum number of cores individually.
Logging and Monitoring
In standalone mode, Spark provides a web-based user interface for monitoring the cluster. The master and each worker have their own web UI, which shows cluster and job statistics. By default, the web UI for the master can be accessed on port 8080. The port can be changed either via the command-line options or via the configuration file.
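For the configuration-file route, one possible sketch is to set the web UI ports in conf/spark-env.sh. SPARK_MASTER_WEBUI_PORT and SPARK_WORKER_WEBUI_PORT are the environment variables Spark reads for this; the port numbers below are arbitrary example values.

```shell
# Lines for conf/spark-env.sh; 8090 and 8091 are arbitrary example ports.
export SPARK_MASTER_WEBUI_PORT=8090   # web UI of the master (default 8080)
export SPARK_WORKER_WEBUI_PORT=8091   # web UI of each worker (default 8081)
```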
Detailed log output for each job is written to the work directory of each worker node, which is “SPARK_HOME/work” by default. Each job has two files, stdout and stderr, which contain everything the job wrote to its console.
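To locate these files on a worker node, something like the following could be used. It assumes the default work directory and falls back to /opt/spark as a hypothetical install location when SPARK_HOME is not set.

```shell
# Assumption: the default work directory is in use; /opt/spark is a hypothetical fallback.
SPARK_HOME="${SPARK_HOME:-/opt/spark}"
WORK_DIR="$SPARK_HOME/work"

if [ -d "$WORK_DIR" ]; then
  # List every stdout and stderr file written by jobs on this worker.
  find "$WORK_DIR" -type f \( -name stdout -o -name stderr \)
else
  echo "no work directory at $WORK_DIR yet"
fi
```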
Running Spark Alongside Hadoop
You can run Spark alongside an existing Hadoop cluster by launching it as a separate service on the same machines. To access Hadoop data from Spark, just use an hdfs:// URL; the right URL can be found on the web UI of your Hadoop Namenode. Alternatively, you can set up a separate cluster for Spark and still have it access HDFS over the network. This is slower than accessing the local disk, but it may not be a concern if both clusters are on the same local network.
Spark makes heavy use of the network, and some environments have strict requirements for tight firewall settings. In such environments, you have to configure the ports that Spark uses.
By default, standalone scheduling clusters are resilient to worker failures. However, the scheduler relies on a master to make scheduling decisions, and this creates a single point of failure: if the master crashes, no new applications can be created. There are two schemes for ensuring high availability in this situation.
Local File Recovery for Single-Node Recovery
ZooKeeper is the best choice for high availability in a production environment. If you only need to be able to restart the master when it goes down, FILESYSTEM mode is enough. When workers and applications register, enough state is written to the configured directory for them to be recovered once the master process has been restarted.
To enable this recovery mode, set SPARK_DAEMON_JAVA_OPTS in spark-env using the configuration given below:
- spark.deploy.recoveryMode: set this property to FILESYSTEM to enable single-node recovery mode. The default is NONE.
- spark.deploy.recoveryDirectory: the directory in which Spark stores its recovery state. It must be accessible from the master.
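Put together, the two properties above could be set with a single line in conf/spark-env.sh on the master, as sketched here. The recovery directory is an assumed example path.

```shell
# Line for conf/spark-env.sh on the master; /var/spark/recovery is an assumed example path.
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/var/spark/recovery"
```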
Standby Masters with ZooKeeper
This scheme uses ZooKeeper for leader election and state storage. You can launch multiple masters in the cluster, all connected to the same ZooKeeper instance. One of them is elected leader, and the rest remain in standby mode. If the current leader dies, another master is elected leader and recovers the old master's state.
Scheduling then resumes. The whole recovery process, measured from the time the first leader goes down, should take between one and two minutes. The delay only affects the scheduling of new applications; applications that were already running when the master failed are not affected.
To enable this recovery mode, set SPARK_DAEMON_JAVA_OPTS in spark-env using the following configuration:
- spark.deploy.recoveryMode: set this property to ZOOKEEPER to enable standby master recovery mode. The default is NONE.
- spark.deploy.zookeeper.url: the ZooKeeper cluster URL.
- spark.deploy.zookeeper.dir: the directory in ZooKeeper used to store the recovery state.
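The three properties above could be combined into one line in conf/spark-env.sh, identical on every master node, as in this sketch. The ZooKeeper host names are hypothetical, and /spark is used as the state directory.

```shell
# Line for conf/spark-env.sh on every master; zk1, zk2 and zk3 are hypothetical hosts.
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 -Dspark.deploy.zookeeper.dir=/spark"
```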
Note that if you have multiple masters in the cluster but fail to configure them correctly to use ZooKeeper, the masters will not discover each other, and each will think that it is the leader. The cluster will then be in an unhealthy state, because all of the masters will schedule independently.
Once the ZooKeeper cluster has been set up, enabling high availability is straightforward. Start multiple master processes on different nodes, all with the same ZooKeeper configuration (ZooKeeper URL and directory). Masters can be added and removed at any time.
To schedule new applications or add workers to the cluster, they need to know the IP address of the current leader. You can accomplish this by passing in a list of masters where you used to pass a single one, for example spark://host1:port1,host2:port2. The SparkContext will then try to register with both masters. If host1 dies, the configuration is still correct, because the new leader, host2, will be found.
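For instance, an interactive shell pointed at two masters could be launched as sketched below. The host names and port 7077 are assumed values, and the command is printed rather than executed.

```shell
# Hypothetical master hosts; list every master, separated by commas.
MASTERS="spark://host1:7077,host2:7077"

# Print the spark-shell command that registers with whichever master is the leader.
SHELL_CMD="./bin/spark-shell --master $MASTERS"
echo "$SHELL_CMD"
```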
There is an important distinction between registering with a master and normal operation. When an application or worker starts up, it needs to find the current leader and register with it. Once it has registered, it is taken care of: if the leader fails, the new leader contacts all previously registered applications and workers to notify them of the change in leadership. This means that they do not even need to have known about the new master at startup.