Spark & Jupyter Notebook
Posted on 2019-05-29 18:42:00
In [3]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
# Build a SparkConf with an application name, then create (or reuse) a SparkSession from it
sparkconf = SparkConf().setAppName("myspark")
# Alternative: point the session at a standalone master directly
#spark = SparkSession.builder.master("spark://localhost:7077").appName("test").getOrCreate()
spark = SparkSession.builder.config(conf=sparkconf).getOrCreate()
In [4]:
print(spark)
<pyspark.sql.session.SparkSession object at 0x7f82ba80c978>
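Before touching any data, a couple of quick sanity checks on the session are handy. A minimal sketch using standard PySpark attributes (the printed values will differ per cluster):

print(spark.version)                     # Spark version the session is bound to
print(spark.sparkContext.master)         # which master this session is attached to
print(spark.sparkContext.applicationId)  # application id assigned by the cluster manager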
In [12]:
!ls ./data
sample1.csv sample.csv
In [7]:
!whoami
root
In [8]:
!klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: weirenjie@FASTRETAILING-PRO.CN
Valid starting Expires Service principal
05/29/2019 10:54:45 05/30/2019 10:54:45 krbtgt/FASTRETAILING-PRO.CN@FASTRETAILING-PRO.CN
renew until 06/05/2019 10:54:45
In [9]:
!hdfs dfs -ls /
Found 22 items
drwxrwxrwx - hive supergroup 0 2018-11-29 17:54 /app
drwxr-x--x - nifi nifi 0 2018-12-18 16:17 /archive
drwx--x--x - hbase supergroup 0 2019-02-25 12:04 /data
drwxr-x--x - nifi nifi 0 2018-12-18 16:18 /etl
drwxrwxrwx - kettle hive 0 2019-01-21 11:05 /event_tracking
drwxr-x--x - kettle hive 0 2018-12-13 10:57 /event_tracking_hk
drwxr-x--x - kettle hive 0 2018-10-24 14:19 /ftp
drwxr-x--x - kettle hive 0 2018-11-22 13:38 /ftp_hk
drwx------ - hbase hbase 0 2019-03-14 19:28 /hbase
drwxrwxrwx - insight supergroup 0 2018-12-17 14:21 /home
drwxr-x--x - kettle hive 0 2019-01-21 11:06 /kafkadata
drwxr-x--x - kettle hive 0 2018-12-13 10:57 /kafkadata_hk
drwxrwxrwx - nifi nifi 0 2018-12-18 16:18 /model.db
drwxrwxrwx - hdfs supergroup 0 2019-05-15 22:55 /system
drwxrwxrwt - hdfs supergroup 0 2019-04-01 14:01 /tmp
drwxr-x--x - uat-ecbi hive 0 2019-01-10 13:51 /uat_event_tracking
drwxr-x--x - uat-ecbi hive 0 2019-01-18 14:12 /uat_event_tracking_hk
drwxr-x--x - uat-ecbi hive 0 2019-01-08 17:19 /uat_ftp
drwxr-x--x - uat-ecbi hive 0 2019-01-18 14:13 /uat_ftp_hk
drwxr-x--x - uat-ecbi hive 0 2019-01-10 13:41 /uat_kafkadata
drwxr-x--x - uat-ecbi hive 0 2019-01-18 14:13 /uat_kafkadata_hk
drwxr-xr-x - hdfs supergroup 0 2019-05-27 10:56 /user
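With a valid Kerberos ticket and HDFS reachable, the same spark.read pattern used below for local files also works against HDFS paths. A minimal sketch, with a purely illustrative path (adjust to a real file on your cluster):

hdfsDF = (
    spark
    .read
    .format('csv')
    .option('header', 'false')
    .load('hdfs:///tmp/sample.csv')  # hypothetical HDFS path, not taken from the listing above
)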
In [28]:
from pyspark.sql.types import *
inputPath = "./data/"
# Since we know the data format already, let's define the schema to speed up processing (no need for Spark to infer schema)
csvSchema = StructType([
    StructField("plu", StringType(), True),
    StructField("sum_cd", IntegerType(), True),
    StructField("price", IntegerType(), True),
    StructField("gdept", StringType(), True)
])
# Static DataFrame representing data in the CSV files
staticInputDF = (
spark
.read
.format('csv')
.schema(csvSchema)
.option('header','false')
.load(inputPath)
)
staticInputDF.show()
+-------------+-------+-----+-----+
| plu| sum_cd|price|gdept|
+-------------+-------+-----+-----+
|2000011896645|5070756| 99| 24|
|2000011896645|3815352| 199| 25|
|2000011896645|4542009| 99| 24|
|2000011896645|5454821| 39| 37|
|2000011896645|5507537| 259| 23|
|2000011896645|5507538| 259| 23|
|2000011896645|5507540| 259| 23|
|2000011896645|5454813| 39| 37|
|2000011896645|3815606| 299| 25|
|2000011896645|4245109| 199| 24|
|2000011896645|5070770| 99| 24|
|2000011896645|4245107| 199| 24|
|2000011896645|4031104| 199| 24|
|2000011896645|5126211| 39| 34|
|2000011896645|4031094| 199| 24|
|2000011896645|4245106| 199| 24|
|2000011896645|5070765| 99| 24|
|2000011896645|4245104| 199| 24|
|2000011896645|5070754| 99| 24|
|2000011896645|4456272| 99| 34|
+-------------+-------+-----+-----+
only showing top 20 rows
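For comparison, the explicit schema above can be dropped and Spark asked to infer column types from the data. That triggers an extra pass over the files, which is exactly the cost the fixed schema avoids. A minimal sketch of the inference variant (same inputPath and CSV layout assumed):

inferredDF = (
    spark
    .read
    .format('csv')
    .option('header', 'false')
    .option('inferSchema', 'true')  # extra pass over the data to guess column types
    .load(inputPath)
)
inferredDF.printSchema()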
In [30]:
staticInputDF.createOrReplaceTempView("static_counts")
In [33]:
#%sql select * from static_counts
dataFrame = spark.sql("SELECT count(*) FROM static_counts")
dataFrame.show()
+--------+
|count(1)|
+--------+
| 649998|
+--------+
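The temp view supports richer aggregations too, and the same result can be expressed through the DataFrame API. A small sketch assuming only the columns defined above (the gdept grouping is just an illustration):

spark.sql("""
    SELECT gdept, count(*) AS cnt, avg(price) AS avg_price
    FROM static_counts
    GROUP BY gdept
    ORDER BY cnt DESC
""").show()

# Equivalent DataFrame API form
staticInputDF.groupBy('gdept').count().orderBy('count', ascending=False).show()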