Extracting IP Addresses and HTTP Codes in PySpark

Convert an RDD from a text file into a DataFrame - PySpark

In this tutorial, we'll learn how to convert an RDD from a text file into a DataFrame. Using log data, we'll extract IP addresses and HTTP status codes with PySpark, and then create a DataFrame to store this information for further analysis.

Step 1: Sample Log Data

Let's consider the following sample log data (two lines from a web server access log):

83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
83.149.9.216 - - [17/May/2015:10:05:43 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png HTTP/1.1" 200 171717 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
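Splitting such a line on '-' is worth trying out locally before involving Spark. The short sketch below (using a trimmed copy of the first sample line) shows that a plain split('-') also breaks on the hyphens inside the request URL, which is why the later steps limit the split to the first two occurrences:

# Local experiment in plain Python, no Spark needed.
sample = ('83.149.9.216 - - [17/May/2015:10:05:03 +0000] '
          '"GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" '
          '200 203023')

print(len(sample.split('-')))    # 6 pieces: the URL's hyphens split the line as well
ip, _, rest = sample.split('-', 2)
print(ip.strip())                # 83.149.9.216
print('HTTP/1.1" 200' in rest)   # True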
  

Step 2: Initialize a Spark Session

First, we need to initialize a Spark session in PySpark:

from pyspark.sql import SparkSession
spark_session = SparkSession.builder.master("local").appName("read log file using pyspark").getOrCreate()
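If you want to confirm the session came up correctly, a quick check like the following works (the printed values depend on your Spark installation):

print(spark_session.version)              # Spark version string, e.g. 3.x
print(spark_session.sparkContext.master)  # local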
  

Step 3: Create an RDD from the Log Data and Split Each Line on '-'

Next, we'll create an RDD (Resilient Distributed Dataset) from the log file and split every line on '-'. Hyphens also appear inside the request URLs, so we limit the split to the first two occurrences; that keeps the IP address in the first field and the rest of the line, including the HTTP status code, in the last one:

raw_rdd = spark_session.sparkContext.textFile("/Users/apple/PycharmProjects/pyspark/data/text/log.txt")
rdd = raw_rdd.map(lambda line: line.split('-', 2))
rdd.foreach(lambda data: print(data))   # prints to the console in local mode
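Note that foreach(print) only shows output on your console when Spark runs in local mode; on a cluster the prints end up in the executor logs. A driver-side alternative is to pull a few elements back first, for example:

for fields in rdd.take(2):   # bring the first two split lines to the driver
    print(fields)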

Step 4: Process Each Line

Each element of the RDD is now a Python list: the client IP is the first entry, and the remainder of the line, including the HTTP status code, is the third. The process function extracts both:

def process(fields):
    output_list = []
    output_list.append(fields[0].strip())   # client IP (strip the trailing space left by the split)
    if "HTTP/1.1\" 200" in fields[2]:
        output_list.append("200")
    elif "HTTP/1.1\" 404" in fields[2]:
        output_list.append("404")
    else:
        output_list.append("unknown")
    return output_list

transformed_rdd = rdd.map(process)
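Since process operates on an ordinary Python list, you can sanity-check it on a single split line before running the Spark job; a minimal local test using the second sample line might look like this:

sample_line = ('83.149.9.216 - - [17/May/2015:10:05:43 +0000] '
               '"GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png HTTP/1.1" '
               '200 171717')
print(process(sample_line.split('-', 2)))   # ['83.149.9.216', '200']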

Step 5: Convert to a DataFrame

Finally, define a schema for the two extracted columns and build the DataFrame from the transformed RDD:

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField("ip", StringType(), True), StructField("http_code", StringType(), True)])
final_rdd = transformed_rdd.map(lambda line: (line[0], line[1]))
df = spark_session.createDataFrame(final_rdd, schema)
df.show()
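With the extracted fields in a DataFrame, the further analysis mentioned at the start becomes simple DataFrame operations; for example, counting requests per status code or isolating the 404s:

df.groupBy("http_code").count().show()    # requests per status code
df.filter(df.http_code == "404").show()   # only the failed requests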

Complete Code:


from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

def process(fields):
    output_list = []
    output_list.append(fields[0].strip())   # client IP
    if "HTTP/1.1\" 200" in fields[2]:
        output_list.append("200")
    elif "HTTP/1.1\" 404" in fields[2]:
        output_list.append("404")
    else:
        output_list.append("unknown")
    return output_list

spark_session = SparkSession.builder.master("local").appName("test").getOrCreate()
raw_rdd = spark_session.sparkContext.textFile("/Users/apple/PycharmProjects/pyspark/data/text/log.txt")
rdd = raw_rdd.map(lambda line: line.split('-', 2))   # limit the split so URL hyphens stay intact

rdd.foreach(lambda data: print(data))   # local mode only
transformed_rdd = rdd.map(process)

schema = StructType([StructField("ip", StringType(), True), StructField("http_code", StringType(), True)])
final_rdd = transformed_rdd.map(lambda line: (line[0], line[1]))
df = spark_session.createDataFrame(final_rdd, schema)
df.show()
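For comparison, the same two columns can be extracted without building an RDD at all, by reading the file directly as a DataFrame and using regexp_extract. This is a sketch of an alternative approach (the file path is the same assumption as above), not part of the RDD-based flow:

from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract

spark_session = SparkSession.builder.master("local").appName("log file with regexp").getOrCreate()
lines_df = spark_session.read.text("/Users/apple/PycharmProjects/pyspark/data/text/log.txt")

df = lines_df.select(
    regexp_extract("value", r"^(\S+)", 1).alias("ip"),                     # first token: client IP
    regexp_extract("value", r'HTTP/1\.1" (\d{3})', 1).alias("http_code"),  # status code after the request
)
df.show()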