PySpark DataFrame

This PySpark DataFrame blog is helpful for those who want to start a career in Big Data and Data Science. In this blog, you will learn what a DataFrame is, why DataFrames are needed, and the different methods for creating DataFrames.

A PySpark DataFrame is a built-in data structure exposed through the Spark DataFrame API. PySpark itself is the Python library for running Python applications with Apache Spark capabilities; through PySpark, we can run applications in parallel on multiple nodes. PySpark is widely used in the Machine Learning and Data Science community because many widely used data science libraries, including TensorFlow and NumPy, are available in Python.

It is also used because it processes massive datasets, or big data, efficiently. Leading organizations like Trivago, Walmart, and Sanofi use PySpark for big data processing, so big data professionals and data scientists should learn PySpark. In this PySpark DataFrame blog, we will introduce you to the fundamental concepts of PySpark DataFrames.

What is a DataFrame?

A DataFrame is a two-dimensional labeled data structure with columns of potentially different types. We can think of a DataFrame as a spreadsheet, a dictionary of series objects, or a SQL table. Apache Spark DataFrames offer a set of functions (join, aggregate, select columns, filter) that enable us to solve data analysis problems effectively.

Apache Spark DataFrames are an abstraction built on top of Resilient Distributed Datasets (RDDs). Spark SQL and Spark DataFrames use a unified optimization and planning engine, which gives you nearly identical performance across all the supported languages on Databricks (SQL, R, Scala, and Python).

Why are DataFrames Beneficial?

  • PySpark DataFrames are helpful for machine learning activities because they can bring together large amounts of data.
  • They are basic data structures that are easy to access and control.
  • A DataFrame in PySpark can manage petabytes of data.
  • They have API support for programming languages like Java, Scala, R, and Python.
  • They are regularly used as data sources for data visualization and can store tabular data.
  • Compared to RDDs, customized memory management reduces overhead and improves performance.

PySpark DataFrame Features

Following are the features of a PySpark DataFrame:

1) Distributed

In PySpark, DataFrames are distributed collections of data organized into rows and columns. Every column in a DataFrame has a name and a type. DataFrames are analogous to traditional database tables in that they are structured and concise.

2) Lazy Evaluation

Spark is developed in Scala, and its default execution mode is lazy. This means that no operations on a DataFrame, Dataset, or RDD are computed until an action is invoked.

3) Immutable

DataFrames, Datasets, and Resilient Distributed Datasets (RDDs) are all immutable. The term “immutability” means the “inability to change” when applied to an object. In contrast to ordinary Python data structures, which can be modified in place, these DataFrames cannot be changed once created: every operation on columns or rows returns a new DataFrame, which is what makes them resilient.

Set Up Apache Spark

To perform the DataFrame operations, we have to set up Apache Spark on the machine. Follow this step-by-step procedure for installing Apache Spark:

Operating System: Ubuntu 64-bit.

Software: Java 7+, R 3.1+, Python 2.6+

Steps for installing Apache Spark

Step 1: Open a terminal

Step 2: Install Java

$ sudo apt-add-repository ppa:webupd8team/java
$ sudo apt-get update
$ sudo apt-get install oracle-java7-installer

Accept the Java license terms and press “yes” to proceed. After installing Java, we can check its version by executing the command below:

$ java -version

Step 3: After installing Java, install Scala

$ cd ~/Downloads
$ wget http://www.scala-lang.org/files/archive/scala-2.11.7.deb
$ sudo dpkg -i scala-2.11.7.deb
$ scala -version

Step 4: Install Py4J

Py4J is used on the driver for local communication between the Python and Java SparkContext objects; large data transfers are carried out through a different mechanism.

$ sudo pip install py4j

Step 5: Install Spark

Now that the dependencies are installed, we can install Apache Spark. Download and extract the Spark source tarball; using wget, we can fetch Apache Spark 1.6.0:

$ cd ~/Downloads
$ wget http://d3kbcqa4mib13.cloudfront.net/spark-1.6.0.tgz
$ tar xvf spark-1.6.0.tgz

Step 6: Compile the extracted source

sbt is an open-source build tool for Scala and Java projects, similar to Java’s Maven.

$ cd ~/Downloads/spark-1.6.0
$ sbt/sbt assembly

After installing Spark, we can check whether it is running correctly by executing the following command:

$ ./bin/run-example SparkPi 10

This will give you output similar to the following:

Pi is approximately 3.147

How to create a DataFrame?

In Apache Spark, a DataFrame can be created in multiple ways:

1. Create a DataFrame from RDD

A common way to create a PySpark DataFrame is from an existing RDD. First, let us create a Spark RDD from a collection (a Python list) by invoking the parallelize() function of the SparkContext. We will need this rdd object for the following examples. Here, “data” is a list of two-field tuples, and “columns” (used later) is a list of the two column names:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
rdd = spark.sparkContext.parallelize(data)
  • Using the toDF() function

The “toDF()” method creates a DataFrame from an existing RDD. Because an RDD has no column names, calling toDF() without arguments creates the DataFrame with the default column names “_1” and “_2”, since our data has two columns. Passing the list of column names sets them explicitly:

dfFromRDD1 = rdd.toDF(columns)
dfFromRDD1.printSchema()

The above code prints the DataFrame schema with the column names. To display the rows themselves, we use the “show()” method on the PySpark DataFrame.

root
 |-- language: string (nullable = true)
 |-- users: string (nullable = true)

By default, the data type of each column is inferred from the data. We can change this behavior by providing a schema, in which we define the column name, data type, and nullability of every field.

  • Creating a DataFrame by using createDataFrame() from the SparkSession

We can also use createDataFrame() from the SparkSession to create the DataFrame. It takes the RDD object as its argument, and we chain it with toDF() to specify the column names.

dfFromRDD2 = spark.createDataFrame(rdd).toDF(*columns)

2. Create DataFrame from the List Collection

We can also create the DataFrame from a list. This works the same way as the methods above, except that we pass the list “data” object instead of the “rdd” object.

  • Creating a DataFrame by invoking createDataFrame() from the SparkSession

Invoking createDataFrame() from the SparkSession is another way to create a PySpark DataFrame; it takes the list object as its argument, and we chain it with toDF() to specify the column names.

dfFromData2 = spark.createDataFrame(data).toDF(*columns)

  • Using createDataFrame() with the Row type

createDataFrame() has an alternative signature in PySpark that takes a collection of Row objects and a schema of column names as arguments. To use it, we first have to convert our “data” object from a list of tuples to a list of Row objects:

from pyspark.sql import Row

rowData = [Row(*x) for x in data]
dfFromData3 = spark.createDataFrame(rowData, columns)

  • Create the DataFrame using the Schema

If we want to define the column names together with their data types, we must create a StructType schema and pass it when we create the DataFrame. In the data below, a missing name or id is represented by an empty string.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Each row: firstname, middlename, lastname, id, gender, salary
data2 = [("James", "", "Smith", "36636", "M", 3000),
    ("Michael", "Rose", "", "40288", "M", 4000),
    ("Robert", "", "Williams", "42114", "M", 4000),
    ("Maria", "Anne", "Jones", "39192", "F", 4000),
    ("Jen", "Mary", "Brown", "", "F", -1)
  ]

# StructField(name, data type, nullable)
schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True)
  ])

df = spark.createDataFrame(data=data2, schema=schema)
df.printSchema()
df.show(truncate=False)

 

3. Create DataFrame from Data Sources

In the real world, we usually create DataFrames from data sources such as text, CSV, XML, and JSON files. PySpark supports many data formats out of the box, without importing additional libraries; to create a DataFrame, we use the appropriate method of the “DataFrameReader” class.

  • Creating the DataFrame from a CSV file

We use the csv() method of the “DataFrameReader” object to create a DataFrame from a CSV file. We can also pass options such as the delimiter to use, whether the data is quoted, whether to infer the schema, date formats, and so on.

df2 = spark.read.csv("/src/resources/file.csv")

  • Creating the DataFrame from a text (TXT) file

Likewise, we can create a DataFrame by reading a text file with the “text()” method of the DataFrameReader:

df2 = spark.read.text("/src/resources/file.txt")

  • Creating the DataFrame from a JSON file

PySpark can also process semi-structured data files such as JSON. We use the json() method of the DataFrameReader to read a JSON file into a DataFrame. Following is an example:

df2 = spark.read.json("/src/resources/file1.json")

4. Create a DataFrame with Python

Most Spark queries return a DataFrame. This includes reading from a table, loading data from files, and operations that transform data.

We can also create a Spark DataFrame from a list or a Pandas DataFrame, as in the example below:

import pandas as pd

data = [[1, "Praveen"], [2, "Vivek"], [3, "Pramod"]]

pdf = pd.DataFrame(data, columns=["id", "name"])

df1 = spark.createDataFrame(pdf)
df2 = spark.createDataFrame(data, schema="id LONG, name STRING")

5. Other Sources (Avro, Parquet, Kafka, and ORC)

We can also create a DataFrame by reading Parquet, ORC, Avro, and binary files, by accessing HBase and Hive tables, and by reading data from Kafka.

DataFrame Manipulation

How to see the data types of the columns?

To see the types of the columns in a DataFrame, we can use printSchema() and dtypes. Let us apply printSchema() to the “bus” DataFrame to get the schema in tree format.

bus.printSchema()

Output:

root
 |-- U_ID: integer (nullable = true)
 |-- P_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Profession: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- Current_City_Years: string (nullable = true)
 |-- P_Category_1: integer (nullable = true)
 |-- P_Category_2: integer (nullable = true)
 |-- P_Category_3: integer (nullable = true)
 |-- Price: integer (nullable = true)

From the above output, we can observe that the schema, that is, the data type of every column, was captured while reading from the CSV file.

1. How to count the rows in DataFrame?

We use the “count” operation to count the number of rows in a DataFrame. Let us apply “count” to the “bus” and “test1” DataFrames:

bus.count(), test1.count()

Output:

(345679, 145783)

We have 345679 rows in bus and 145783 rows in test1.

2. What if I want to remove some categories of the P_ID column in test1 that are not present in the P_ID column in bus?

We can use a user-defined function (UDF) to remove the categories of the column that are present in test1 but not in bus. First, we find those categories by subtracting the P_ID values of bus from those of test1:

diff_cat_in_bus_test1 = test1.select('P_ID').subtract(bus.select('P_ID'))
diff_cat_in_bus_test1.distinct().count()

Output:

36

There are 36 such categories in test1. To remove these categories from the test1 “P_ID” column, we will apply the following steps:

  • Create a unique list of categories called “not_found_cat” from “diff_cat_in_bus_test1” through the map operation.
  • Register a user-defined function (UDF).
  • The user-defined function takes every element of the test1 column and searches for it in the “not_found_cat” list; it returns -1 if the element is in the list, and the element itself otherwise.

Now let us see how to implement it.

First, create “not_found_cat”:

not_found_cat = diff_cat_in_bus_test1.distinct().rdd.map(lambda x: x[0]).collect()
len(not_found_cat)

Output:

36

To register the user-defined function, we have to import “StringType” from pyspark.sql.types and “udf” from pyspark.sql.functions. The udf() function takes two arguments:

  • Function
  • Return type

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

F1 = udf(lambda x: '-1' if x in not_found_cat else x, StringType())

We can also query a DataFrame with SQL. After registering “bus” as a table named “bus_table” (with the “registerAsTable” operation used here, or with createOrReplaceTempView() in Spark 2.0 and later), we apply a SQL query on “bus_table” to select “P_ID”. The result of a SQL query is a DataFrame, so we have to apply an action to see the result.

sqlContext.sql('select P_ID from bus_table').show(5)

Output:

P_ID
------------
P00001276
P00006754
P00008765
P00007543
P00008324

Pandas vs. PySpark DataFrame

A DataFrame represents a table of data with rows and columns. The concept of a DataFrame is the same across programming languages, but Pandas DataFrames and Spark DataFrames differ from each other.

Pandas DataFrame

Pandas is an open-source Python library built on top of the NumPy library. It is a Python package that lets us manipulate numerical data and time series through a variety of data structures and operations. It is mainly used to simplify data import and analysis. A Pandas DataFrame is a potentially heterogeneous, two-dimensional tabular data structure with labeled axes (rows and columns). The rows, columns, and data are the three primary elements of a Pandas DataFrame.

Advantages

1) It can manipulate data by indexing, sorting, merging, and renaming DataFrames.

2) Pandas simplifies updating, deleting, and adding columns.

3) It supports a variety of file formats.

4) Its built-in functions keep processing time low for datasets that fit in memory.

Disadvantages

  1. Manipulation becomes difficult when the dataset is very large.
  2. Processing time can be slow when manipulating large datasets.

PySpark DataFrame

Spark is a system for cluster computing, and it is fast in comparison to other cluster computing systems. It provides Python, Java, and Scala APIs. Spark is one of the most popular Apache projects for processing massive datasets. PySpark is the Spark library developed in Python for running Python applications with Apache Spark capabilities. Using PySpark, we can run applications in parallel on a distributed cluster or on a single node.

Advantages

1) Spark provides APIs for working with large datasets.

2) It supports MapReduce, machine learning, streaming data, graph algorithms, SQL queries, etc.

3) Spark uses in-memory computation.

4) It provides more than 80 high-level operators for developing parallel applications.

Disadvantages

1) Small files problem.

2) A limited number of algorithms.

3) No automatic optimization process.

Pandas DataFrame | PySpark DataFrame
1) It does not support parallelization. | 1) It supports parallelization.
2) It runs on a single node. | 2) It runs on multiple nodes.
3) It adopts eager execution, which means that a task is executed immediately. | 3) It adopts lazy evaluation, which means that a task is not executed until an action is invoked.
4) It is mutable. | 4) It is immutable.
5) Compared to a PySpark DataFrame, complex operations are easier to perform. | 5) Compared to a Pandas DataFrame, complex operations are more difficult to perform.
6) It is not distributed, so processing a Pandas DataFrame is slower for huge amounts of data. | 6) It is distributed, so processing a Spark DataFrame is fast for huge amounts of data.
7) The pandasDataFrame.count() method returns the number of non-NA (non-null) observations for every column. | 7) The sparkDataFrame.count() method returns the number of rows.
8) It cannot be used to develop scalable applications. | 8) It is suitable for developing scalable applications.
9) It does not ensure fault tolerance; we have to implement our own mechanism to ensure it. | 9) It ensures fault tolerance.

Frequently Asked Questions

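1. What is a PySpark DataFrame?

A DataFrame is a distributed collection of data organized into rows and columns. Every column in a DataFrame has a name and a type.

2. Is PySpark faster than Pandas?

For large datasets, yes: because of parallel, distributed execution, PySpark is faster than Pandas. For small datasets that fit in the memory of a single machine, Pandas is often fast enough and avoids the overhead of distributed execution.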

3. Is PySpark the same as Pandas?

No, PySpark and Pandas are not the same. PySpark is a library for working with massive datasets in a distributed computing environment, whereas Pandas is suited to working with smaller, tabular datasets on a single machine.

4. What is PySpark, and Why is it used?

PySpark is the Python API for Apache Spark, an open-source distributed computing framework and set of libraries for large-scale, real-time data processing. PySpark is suitable for building more scalable pipelines and analyses. It lets you work with RDDs (Resilient Distributed Datasets) from Python by using the Py4J library.

5. Which IDE is best for PySpark?

The most commonly used IDE is IntelliJ IDEA.

6. Is PySpark faster than SQL?

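Within Spark, PySpark DataFrame code and Spark SQL queries share the same optimization and planning engine, so they perform similarly. Compared with a traditional single-machine SQL database, PySpark can be faster on very large datasets because it distributes the work across a cluster.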

7. Is PySpark an ETL?

Yes, PySpark is widely used as an ETL tool for building ETL pipelines over large datasets.

8. Is PySpark Difficult to Learn?

No, PySpark is not difficult to learn. It is user-friendly because it offers APIs in popular programming languages like Python and Java, which makes it easy for developers to pick up.

9. Should I Learn Pandas or PySpark?

Pandas executes operations on a single machine, whereas PySpark works across multiple machines. If you are working on machine learning applications that handle large datasets, PySpark is the suitable choice.

10. Is PySpark needed for data science?

PySpark is a valuable tool for data scientists because it can simplify the process of converting prototype models into production-grade model workflows.

11. Is PySpark enough for Big Data?

PySpark is suitable for Big Data because it runs almost every computation in memory and consequently offers better performance for applications like interactive data mining.

12. Will PySpark replace Pandas?

Pandas and Spark are complementary, and each has its pros and cons. Whether to use PySpark or Pandas depends on your application.

Conclusion

PySpark DataFrames are data arranged in tables that have rows and columns. We can think of a DataFrame as a spreadsheet, a SQL table, or a dictionary of series objects. It offers a wide variety of functions, such as joins and aggregations, that enable you to solve data analysis problems. I hope this PySpark DataFrame blog helps you get a detailed understanding of PySpark DataFrames. If you have any queries, let us know by commenting below.

Last updated: 07 Oct 2024
About Author

Viswanath is a passionate content writer at Mindmajix. He has expertise in trending domains like Data Science, Artificial Intelligence, Machine Learning, Blockchain, etc. His articles help learners gain insights into these domains. You can reach him on LinkedIn.
