Extracting IP Addresses and HTTP Codes in PySpark
In this tutorial, we'll learn how to convert an RDD from a text file into a DataFrame. Using log data, we'll extract IP addresses and HTTP status codes with PySpark, and then create a DataFrame to store this information for further analysis.
Let's consider the following sample log data, which uses the Apache Combined Log Format (client IP, identity, user, timestamp, request line, status code, response size, referrer, and user agent):
83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
83.149.9.216 - - [17/May/2015:10:05:43 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png HTTP/1.1" 200 171717 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
First, we need to initialize a Spark session in PySpark:
from pyspark.sql import SparkSession

spark_session = SparkSession.builder \
    .master("local") \
    .appName("read log file using pyspark") \
    .getOrCreate()
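To confirm the session is up, you can print its version; the output should match your installed PySpark build:

print(spark_session.version)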
Next, we'll create an RDD (Resilient Distributed Dataset) from the log data:
raw_rdd = spark_session.sparkContext.textFile("/Users/apple/PycharmProjects/pyspark/data/text/log.txt")

# Split each line on its first two '-' characters only: URLs such as
# /logstash-monitorama-2013/ also contain dashes, so an unbounded split
# would cut the line into too many pieces.
rdd = raw_rdd.map(lambda line: line.split('-', 2))

rdd.foreach(lambda data: print(data))  # prints to the console in local mode only
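To see why indexes 0 and 2 are the interesting ones, here is what the split produces for the first sample entry (a quick standalone check, no Spark required):

line = '83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023'
parts = line.split('-', 2)
print(parts[0])  # '83.149.9.216 ' -> the client IP, with a trailing space
print(parts[1])  # ' '             -> the empty identity field
print(parts[2])  # the remainder, which contains 'HTTP/1.1" 200'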
Now we'll write a function that extracts the IP address and the HTTP status code from each split line:

def process(fields):
    output_list = []
    # fields[0] is the client IP; strip the trailing space left by the split
    output_list.append(fields[0].strip())
    # fields[2] holds the request line and the status code
    if 'HTTP/1.1" 200' in fields[2]:
        output_list.append("200")
    elif 'HTTP/1.1" 404' in fields[2]:
        output_list.append("404")
    else:
        output_list.append("unknown")
    return output_list

transformed_rdd = rdd.map(process)
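Applied to the sample entry we split above, the function returns the IP and status code pair:

print(process(parts))  # ['83.149.9.216', '200']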
Finally, we define a schema and convert the RDD of (ip, http_code) pairs into a DataFrame:

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("ip", StringType(), True),
    StructField("http_code", StringType(), True),
])

# Convert each two-element list into a tuple so createDataFrame can map it onto the schema
final_rdd = transformed_rdd.map(lambda line: (line[0], line[1]))

df = spark_session.createDataFrame(final_rdd, schema)
df.show()
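With the two sample entries above (both from 83.149.9.216, both returning 200), the output should look roughly like this:

+------------+---------+
|          ip|http_code|
+------------+---------+
|83.149.9.216|      200|
|83.149.9.216|      200|
+------------+---------+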
Putting it all together, here is the complete script:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType


def process(fields):
    output_list = []
    # fields[0] is the client IP; strip the trailing space left by the split
    output_list.append(fields[0].strip())
    # fields[2] holds the request line and the status code
    if 'HTTP/1.1" 200' in fields[2]:
        output_list.append("200")
    elif 'HTTP/1.1" 404' in fields[2]:
        output_list.append("404")
    else:
        output_list.append("unknown")
    return output_list


spark_session = SparkSession.builder \
    .master("local") \
    .appName("test") \
    .getOrCreate()

raw_rdd = spark_session.sparkContext.textFile("/Users/apple/PycharmProjects/pyspark/data/text/log.txt")

# Split on the first two '-' characters only; URLs in the request line may also contain dashes
rdd = raw_rdd.map(lambda line: line.split('-', 2))
rdd.foreach(lambda data: print(data))  # local mode only

transformed_rdd = rdd.map(process)

schema = StructType([
    StructField("ip", StringType(), True),
    StructField("http_code", StringType(), True),
])

final_rdd = transformed_rdd.map(lambda line: (line[0], line[1]))
df = spark_session.createDataFrame(final_rdd, schema)
df.show()
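Splitting on dashes works for this sample, but it is brittle against log lines that deviate from it. A more robust variant matches the Combined Log Format with a regular expression; the sketch below is illustrative, and the LOG_PATTERN and extract_fields names are hypothetical rather than part of the approach above:

import re

# Match the client IP (first token) and the three-digit status code that
# follows the quoted request line in a Combined Log Format entry.
LOG_PATTERN = re.compile(r'^(\S+) \S+ \S+ \[[^\]]+\] "[^"]*" (\d{3})')

def extract_fields(line):
    match = LOG_PATTERN.match(line)
    return (match.group(1), match.group(2)) if match else None

# Drop lines that don't match instead of labelling them "unknown"
pairs_rdd = raw_rdd.map(extract_fields).filter(lambda pair: pair is not None)
df = spark_session.createDataFrame(pairs_rdd, schema)
df.show()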