### spusteni Sparku
# pyspark --num-executors 2 --executor-memory 1500M --master yarn

# potlaceni vypisu INFO logu: sc.setLogLevel("WARN")
# defaultne uz je tak nastavene

##### vypocet statu s nejvyssi prumernou teplotou v lete pomoci Spark RDD ----------
###  pomocne funkce
# transformace dat z typu list na strukturu (stat, (teplota, 1))
def uprav_radek(rlist):
    stat = rlist[9]
    tepl = (int(rlist[4])/10.0 - 32)*5/9
    return (stat, (tepl, 1))

# agregace paru (teplota, 1) na (soucet, pocet)
def soucty(a, b):
    soucetA = a[0]
    soucetB = b[0]
    pocetA = a[1]
    pocetB = b[1]
    return (soucetA + soucetB, pocetA + pocetB)

# cteni teplot ze souboru
teploty_raw = sc.textFile('/user/pascepet/data/teplota')

# radek se prevede na typ list
teploty1 = teploty_raw.map(lambda r: r.split(',')) 

# ponechaji se jen prvky RDD (radky) s mesicem 6-8 a neprazdnym udajem o teplote
teploty2 = teploty1.filter(lambda rlist: (rlist[1] in set('678')) & (rlist[4] != ''))

# z prvku RDD typu list se vytahnout jen potrebna data
teploty3 = teploty2.map(uprav_radek)

# agregace po statech
teploty_staty = teploty3.reduceByKey(soucty)

# vypocet prumeru pro stat, tj. transformace (stat, (soucet, pocet)) -> (stat, prumer)
teploty_staty2 = teploty_staty.map(lambda x: (x[0], x[1][0]/x[1][1]))

# serazeni
teploty_staty3 = teploty_staty2.sortBy(lambda y: y[1], ascending=False)

# vypise poradi statu
teploty_staty3.take(1)
teploty_staty3.collect()
####################################
    

##### tri zpusoby nacteni dat jako DataFrame -------------------------------
### 1. nacteni primo z CSV
# u starsiho Sparku (1.6) je treba spustit Spark s externi knihovnou na cteni CSV
# pyspark --num-executors 2 --executor-memory 1500M --packages com.databricks:spark-csv_2.10:1.5.0 --master yarn
teploty_DF1 = sqlContext.read \
    .format("com.databricks.spark.csv") \
    .option("header", "true") \
    .option("delimiter", ",") \
    .option("inferSchema", "true") \
    .load("/user/pascepet/data/teplota")


### 2. nacteni z Hive (existuje-li tabulka)
teploty_DF2 = sqlContext.sql('select * from fel_bigdata.teplota')

### 3. prevod z existujiciho RDD
from pyspark.sql import Row
import re

def uprav_radek_df_row(r):
    rlist = r.split(',')
    return Row(stanice=rlist[0], mesic=int(rlist[1]), den=int(rlist[2]), hodina=int(rlist[3]), \
        teplota=None if rlist[4]=='' else float(rlist[4]), flag=rlist[5], latitude=float(rlist[6]), \
        longitude=float(rlist[7]), vyska=float(rlist[8]), stat=rlist[9], nazev=rlist[10])

# nacte RDD ze souboru
teploty_raw = sc.textFile('/user/pascepet/data/teplota')
teploty_prep = teploty_raw.filter(lambda line: not(re.match(r'stanice', line))) \
    .map(uprav_radek_df_row)
teploty_DF3 = sqlContext.createDataFrame(teploty_prep)


##### prace s DataFrame: vypocet statu s nejvyssi teplotou pomoci Spark SQL -----
### 1. registrace DataFrame jako docasne tabulky
teploty_DF1.registerTempTable("tpDF")

# agregace pomoci SQL dotazu
teploty_DF1_prum = sqlContext.sql("""select stat, avg((teplota/10.0-32)*5/9) as tepl_prum from tpDF
where mesic in (6,7,8)
group by stat order by tepl_prum desc""")
teploty_DF1_prum.show(10)


### 2. pomoci pseudo-SQL operaci
# ponechani jen neprazdnych dat za letni mesice
teploty_DF_upr = teploty_DF1.filter((teploty_DF1['mesic']>=6) & (teploty_DF1['mesic']<=8)) \
    .select('stat','teplota').dropna()
# pracujeme s originalnimi daty -> prepocet teploty na stupne Celsia
teploty_DF_upr = teploty_DF_upr.withColumn('teplota', (teploty_DF_upr['teplota']/10.0 - 32) * 5/9)
# agregace - prumery podle statu
teploty_statyDF = teploty_DF_upr.groupBy('stat').avg('teplota') \
    .toDF('stat', 'tepl_prum')
# serazeni podle prumerne teploty
teploty_statyDF = teploty_statyDF.orderBy(teploty_statyDF['tepl_prum'].desc())
# vypise poradi prvnich deseti statu
teploty_statyDF.show(10)

#############################