import findspark
findspark.init()
In [5]:
import pyspark
import random
In [3]:
sc = pyspark.SparkContext(appName="Pi")
num_samples = 100000000

def inside(p):
    # A random point in the unit square lands inside the quarter circle with
    # probability pi/4, so 4 * (fraction of points inside) estimates pi.
    x, y = random.random(), random.random()
    return x*x + y*y < 1

count = sc.parallelize(range(0, num_samples)).filter(inside).count()
pi = 4 * count / num_samples
print(pi)
sc.stop()
In [6]:
sc = pyspark.SparkContext(appName="Daily_Show_Test1")
print(sc)  # confirm the SparkContext is up and running
In [5]:
## Dhankar >> PySpark initiated with FindSpark
# SparkContext started
# Got CSV from
# Converted CSV to TSV
# Now importing data from TSV
# raw_d == the SPARK RDD Object
# Print out top 15 Rows
raw_d = sc.textFile("dsT1.tsv")
#
# In the above line of code the actual loading of the TSV into the RDD is NOT yet done -
# it is done LAZILY, "as and when ABSOLUTELY required", as below:
#
In [6]:
raw_d.take(15)
Out[6]:
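## A quick way to see the laziness (a sketch, assuming raw_d is the RDD loaded above):
## the map() call returns almost instantly because nothing is computed yet,
## while count() is an action and triggers the actual read of the file.
%time mapped = raw_d.map(lambda line: line.split('\t'))
%time mapped.count()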
In [7]:
# Use the 'map' function to operate on all elements within an RDD object
#
daily_show = raw_d.map(lambda line: line.split('\t'))
daily_show.take(5)
Out[7]:
In [8]:
daily_show = raw_d.map(lambda line: line.split('99'))  ## NOTE: splits on the literal substring '99', not on tabs
daily_show.take(10)
Out[8]:
In [9]:
print(type(daily_show)) ## <class 'pyspark.rdd.PipelinedRDD'>
In [10]:
# reduceByKey() -- count the occurrences of each key (the first field of every row)
tally = daily_show.map(lambda x: (x[0], 1)).reduceByKey(lambda x,y: x+y)
#
print(tally) #PythonRDD[9] at RDD at PythonRDD.scala:53
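## tally is still only a lineage of transformations at this point; an action such as take()
## actually runs the job (a sketch - x[0] above is the first tab-separated field of each row,
## presumably the year column in this dataset):
tally.take(5)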
In [29]:
## Explanation ##
#%time bigger_chunk = raw_d.take(5000)
# 1st RUN
# CPU times: user 8 ms, sys: 8 ms, total: 16 ms
# Wall time: 274 ms
#%time bigger_chunk = raw_d.take(50000)
# 2nd RUN
# CPU times: user 12 ms, sys: 8 ms, total: 20 ms
# Wall time: 299 ms
#%time tally.take(tally.count())
"""
CPU times: user 56 ms, sys: 0 ns, total: 56 ms
Wall time: 503 ms
"""
Out[29]:
In [7]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib notebook
dates = pd.date_range('20190101', periods=30)
print(type(dates))
print(" "*90)
print(dates)
In [8]:
df_dts = pd.DataFrame(np.random.randn(30,10), index=dates, columns=list('ABCDEFGHIJ'))
print(df_dts)
In [9]:
my_dict = {'A': 199.,
           #'B': pd.Timestamp('20190101'),
           'B': pd.date_range('20190101', periods=40),
           'C': pd.Series(1, index=list(range(40)), dtype='float32'),
           'D': np.array([100]*40, dtype='int32'),
           'D_a': np.array([1000]*40, dtype='float64'),
           #'E': pd.Categorical(["Ytest","Ytrain","Xtest","Xtrain"]), # 4 hardcoded strings
           'E': np.array(["my_STR"]*40, dtype='str'),
           'F': 'foo_bar'}
df2 = pd.DataFrame(my_dict)
print(df2.head(5))
print(df2.tail(5))
## my_dict_from_dict_comprehension = {x: x**2 for x in (2, 4, 6)}
In [36]:
my_dict = {'A': 199.,
           #'B': pd.Timestamp('20190101'),
           'B': pd.date_range('20190101', periods=40),
           'C': pd.Series(1, index=list(range(40)), dtype='float32'),
           'D': np.array([100]*40, dtype='int32'),                    ## SPARK Dtype == long (nullable = true)
           'D_a': np.array([1000]*40, dtype='float64'),               ## SPARK Dtype == double (nullable = true)
           'D_b': np.random.uniform(low=0, high=40, size=(40,)),      ## DEFAULT SPARK Dtype == double (nullable = true)
           'D_c': np.random.uniform(low=-100, high=100, size=(40,)),  ## DEFAULT SPARK Dtype == double (nullable = true)
           #'E': pd.Categorical(["Ytest","Ytrain","Xtest","Xtrain"]), # 4 hardcoded strings
           'E': np.array(["my_STR"]*40, dtype='str'),
           'F': 'foo_bar'}
df2 = pd.DataFrame(my_dict)
print(df2.head(5))
print(df2.tail(5))
In [11]:
from collections import OrderedDict
## https://docs.python.org/2/library/collections.html#ordereddict-objects
## https://docs.python.org/2/library/collections.html#ordereddict-examples-and-recipes
my_dict_from_dict_comprehension = OrderedDict({x: x**5 for x in (1, 5, 10)}).__class__
my_ORDERED_dict_from_dict_comprehension = OrderedDict({x: x**5 for x in (1, 5, 10)})
my_dict_from_dict_comprehension1 = {x: x*5 for x in (1, 5, 10 , 20 , 33)}
my_dict_from_dict_comprehension_class = {x: x*5 for x in (1, 5, 10 , 20 , 33)}.__class__
#
print(my_dict_from_dict_comprehension)  ## this is the class itself - <class 'collections.OrderedDict'>
print(" "*90)
print(my_dict_from_dict_comprehension1)  ## a plain dict comprehension builds a plain dict, not an OrderedDict
print(" "*90)
print(my_dict_from_dict_comprehension_class)  ## .__class__ chained above with DOT notation - <class 'dict'>
print(" "*90)
print(my_ORDERED_dict_from_dict_comprehension)  # OrderedDict([(1, 1), (10, 100000), (5, 3125)])
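## If insertion order matters, build the OrderedDict from an explicitly ordered sequence of
## (key, value) pairs instead of from an already-built plain dict (a sketch):
ordered_by_key = OrderedDict(sorted((x, x**5) for x in (1, 5, 10)))
print(ordered_by_key)  ## OrderedDict([(1, 1), (5, 3125), (10, 100000)])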
In [37]:
print("The DATA Types ---\n\n", df2.dtypes)
In [38]:
df2.columns
Out[38]:
In [39]:
print(df2.values[2])  ## NumPy array of "values" - print the record at index 2
#
print(" "*90)
print(df2.values[3])  ## print the record at index 3
#
print(" "*90)
print(df2.values[3:5])  ## print the slice of records at index 3 and 4
In [40]:
print(df2.describe())  ## describe() with parentheses prints the summary statistics, not the bound method
In [41]:
%time print(df2.T) # Transpose
In [42]:
%time print(df2.sort_values(by='B',ascending=False)) ## SORTS - in Descending order of DATES in COL - B
In [43]:
## Create a SPARK DF from Pandas DF
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
%time df2_spark = spark.createDataFrame(df2)
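## Optional speed-up (a sketch, assuming Spark 2.3+ and pyarrow are installed):
## enabling Apache Arrow makes createDataFrame() from a pandas DataFrame much faster.
# spark.conf.set("spark.sql.execution.arrow.enabled", "true")
# %time df2_spark = spark.createDataFrame(df2)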
In [44]:
print(type(df2_spark))  ## <class 'pyspark.sql.dataframe.DataFrame'>
#
print(" "*90)
#
df2_spark.show()  ## .show() prints the DataFrame itself and returns None, so it needs no outer print()
In [45]:
df2_spark.printSchema() ## DATA TYPES stored in SPARK
In [46]:
## SELECT Columns
spark_col_names = ['A','C','D_a','D_b']
df2_spark.select(spark_col_names).show(3)
In [25]:
## FILTER - same as the WHERE clause in SQL
df2_spark.filter(df2_spark.B == '2019-01-06 00:00:00').show()
## Shows only the ONE ROW where the TIMESTAMP in COLUMN B == 2019-01-06 00:00:00
In [26]:
df2_spark.filter(df2_spark.B == '2019-01-06').show()  ## NOT a partial match - '2019-01-06' is cast to the timestamp 2019-01-06 00:00:00, so it still matches exactly
In [28]:
df2_spark.filter(df2_spark.B == '2019-01').show()  ## Not OK - '2019-01' cannot be cast to a full timestamp, so no rows match
In [47]:
df2_spark.filter("B == '2019-01'").show() # Note the DOUBLE QUOTES
In [49]:
## Data Bricks --- SparkML --- Orange telecom CHURN Prediction data
## SOURCE -- https://github.com/databricks/spark-csv#python-api
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
CV_data = sqlContext.read.load('./churn-bigml-80.csv',
                               format='com.databricks.spark.csv',
                               header='true',
                               inferSchema='true')
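## On Spark 2.x the built-in CSV reader gives the same result without the databricks package
## (a sketch, reusing the SparkSession created earlier):
# CV_data = spark.read.csv('./churn-bigml-80.csv', header=True, inferSchema=True)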
In [50]:
print(type(CV_data))
In [52]:
CV_data.show(3)
In [53]:
CV_data.columns
Out[53]:
In [54]:
CV_data.printSchema()
In [56]:
## SELECT Columns --- after seeing them with --- .printSchema()
spark_col_names = ['International plan','State','Customer service calls','Churn']
CV_data.select(spark_col_names).show(10)
In [59]:
spark_col_names = ['International plan','State','Customer service calls','Churn']
CV_data.select(spark_col_names).filter("Churn == 'true'").show(30) #
# DOT Notation Chained --- SELECT and FILTER
#Note the DOUBLE QUOTES
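## The same chained style works with any terminal action, e.g. just counting the churned rows
## instead of showing them (a sketch):
CV_data.select(spark_col_names).filter("Churn == 'true'").count()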