# start konzoly:
# export PYSPARK_PYTHON=python3
# pyspark --master yarn --num-executors 2 --executor-memory 4G --conf spark.ui.port=10811

from pyspark.sql import functions as F

# vychozi data
Tr = spark.sql("select * from pascepet.transakce2 and country='cze'") \ 
    .cache()
# podle zadani mela byt podminka cze=True, ale kvuli bugu Hive se udela omezeni primo na vychozi sloupec country 
# tabulka transakce2 byla vytvorena v Hive - predpoklada se, ze ma omezeni na datum podle varianty B

# 1. Kolik unikátních čerpacích stanic je v datech?
pos_cnt = Tr.select('posid').distinct().count()
print(pos_cnt)
# Výsledek: 2 866

# 2. Na jakém podílu stanic (v %) proběhla transakce někdy v prosinci pozdě večer (s číslem hodiny od 21 do 23)?
pos_pros_vecer_cnt = Tr.filter(F.month(Tr['day'])==12) \
    .filter((Tr['hour']>=21) & (Tr['hour']<=23)) \
    .select('posid').distinct().count()
print(pos_pros_vecer_cnt / pos_cnt * 100)
# Výsledek: 44.59 %

# 3. Jaký podíl stanic (v %) není oblíbenou stanicí pro žádného klienta?
pos_fav_cnt = Tr.filter(Tr['fav']==1).select('posid').distinct().count() # stanice oblibene aspon pro jednoho klienta
pos_unfav_cnt = pos_cnt - pos_fav_cnt # zbyle stanice nejsou oblibene pro zadneho klienta
print(pos_unfav_cnt / pos_cnt * 100)
# Výsledek: 7.01 % 

# 4. Pro čerpací stanici s id 7210 spočítejte průměr GPS souřadnic bydliště klientů, kteří na této stanici mají aspoň jednu platbu (každého klienta počítejte jen jednou).
# + Zjistěte, o kolik GPS jednotek (stupňů) se spočtený průměr liší od GPS souřadnic stanice.
# kazdou kombinaci klient-stanice vezmu jen jednou a pak spoctu prumery souradnic z toho, co zustane
Tr2 = Tr.filter(Tr['posid']==7210) \
    .select('clid', 'pos_gps_lat', 'pos_gps_lon', 'cl_gps_lat', 'cl_gps_lon').distinct() \
    .groupBy().agg({'pos_gps_lat': 'avg', 'pos_gps_lon': 'avg', 'cl_gps_lat': 'avg', 'cl_gps_lon': 'avg'}) \
    .toDF('pos_gps_lat', 'pos_gps_lon', 'cl_gps_lat', 'cl_gps_lon')
# pridam sloupce s rozdilem
Tr2.withColumn('diff_lat', Tr2['pos_gps_lat']-Tr2['cl_gps_lat']) \
    .withColumn('diff_lon', Tr2['pos_gps_lon']-Tr2['cl_gps_lon']) \
    .show()
# Výsledek:
# +------------------+------------------+------------------+------------------+-------------------+------------------+
# |       pos_gps_lat|       pos_gps_lon|        cl_gps_lat|        cl_gps_lon|           diff_lat|          diff_lon|
# +------------------+------------------+------------------+------------------+-------------------+------------------+
# |49.791071490340656|13.647517772181079|49.736629486083984|13.330845832824707|0.05444200425667134|0.3166719393563717|
# +------------------+------------------+------------------+------------------+-------------------+------------------+

# 5. Rozdělíme transakce podle vzdálenosti stanice od bydliště do pásem po 5 km. Vyloučíme záznamy s vyšší vzdáleností než 150 km nebo s neuvedenou vzdáleností.
# + Jaký počet transakcí byl proveden v jednotlivých pásmech a ve kterém pásmu to bylo nejvíc?
# + Liší se to pro stanice v jižní části ČR (pos_gps_lat <= 49.5) a pro stanice v severní části ČR (pos_gps_lat >= 50.5)?
TrS = Tr.filter(Tr['pos_gps_lat'] >= 50.5) # pomocne DataFrames jen pro jizni a severni cast CR
TrJ = Tr.filter(Tr['pos_gps_lat'] <= 49.5)
# pasma po 5 km vzdalenosti, ponechany jen zaznamy se znamou vzdalenosti do 150 km
Tr2 = Tr.withColumn('dist5', F.floor(Tr['dist'] / 5.0)).select('dist', 'dist5').dropna().filter('dist <= 150.0')
TrS2 = TrS.withColumn('dist5', F.floor(TrS['dist'] / 5.0)).select('dist', 'dist5').dropna().filter('dist <= 150.0')
TrJ2 = TrJ.withColumn('dist5', F.floor(TrJ['dist'] / 5.0)).select('dist', 'dist5').dropna().filter('dist <= 150.0')
# cetnosti v pasmech
Tr2.groupBy('dist5').count() \
    .toDF('dist5', 'pocet')  \
    .orderBy('pocet', ascending=False) \
    .show()
# Výsledek:
# +-----+------+
# |dist5| pocet|
# +-----+------+
# |    0|292405|
# |    1|144429|
# |    2| 76643|
# |    3| 43120|
# |    4| 28552|
# |    5| 19724|
# ...
# jen severni cast CR
TrS2.groupBy('dist5').count() \
    .toDF('dist5', 'pocet')  \
    .orderBy('pocet', ascending=False) \
    .show()
# Výsledek:
# +-----+-----+
# |dist5|pocet|
# +-----+-----+
# |    0|42129|
# |    1|11807|
# |    2| 6595|
# |    3| 4012|
# |    4| 2444|
# |    5| 1433|
# ...
# jen jizni cast CR
TrJ2.groupBy('dist5').count() \
    .toDF('dist5', 'pocet')  \
    .orderBy('pocet', ascending=False) \
    .show()
# Výsledek:
# +-----+-----+
# |dist5|pocet|
# +-----+-----+
# |    0|69376|
# |    1|37401|
# |    2|19515|
# |    3|11909|
# |    4| 7891|
# |    5| 5315|
# ...

# 6. Kterých osm stanic má nejvyšší počty plateb a kterých osm nejvyšší počty unikátních klientů?
Tr.groupBy('posid').count() \
    .toDF('posid', 'pocet') \
    .orderBy('pocet', ascending=False) \
    .show(8)
# Výsledek:
# +-----+-----+
# |posid|pocet|
# +-----+-----+
# | 4585| 3267|
# | 7293| 2695|
# | 6971| 2687|
# | 5217| 2398|
# | 7329| 2368|
# | 7379| 2282|
# | 5900| 2127|
# | 6140| 2039|
# +-----+-----+
Tr.select('clid', 'posid').distinct() \
    .groupBy('posid').count() \
    .toDF('posid', 'pocet') \
    .orderBy('pocet', ascending=False) \
    .show(8)
# Výsledek:
# +-----+-----+
# |posid|pocet|
# +-----+-----+
# | 4585| 1185|
# | 7293| 1126|
# | 6971| 1076|
# | 7379| 1076|
# | 5553|  963|
# | 7329|  894|
# | 7442|  869|
# | 5217|  865|
# +-----+-----+

# 7. Zjistěte průměrnou zaplacenou částku na stanicích v obcích s názvem obsahujícím slovo „hradec“ (ve sloupci city) a porovnejte to s průměrnou částkou v obcích s názvem začínajícím na „jablo“.
# v datech se udelaji sloupce s priznakem pro Hradec (kdekoli) a pro Jablo (na zacatku)
Tr2 = Tr.withColumn('flag_hradec', Tr['city'].contains('hradec')) \
    .withColumn('flag_jablo', Tr['city'].startswith('jablo'))
# lze pouzit i obecnejsi detekci souladu s regularnim vyrazem, ale v prvnim pripade je zbytecne slozite:
# Tr2 = Tr.withColumn('flag_hradec', F.when(F.regexp_extract('city', r'(hradec)', 1)=="", False).otherwise(True)) \
#     .withColumn('flag_jablo', F.when(F.regexp_extract('city', r'^(jablo)', 1)=="", False).otherwise(True))
# vypocet prumerne ceny
Tr2.filter(Tr2['flag_hradec']) \
    .groupBy().avg('amt') \
    .show()
Tr2.filter(Tr2['flag_jablo']) \
    .groupBy().avg('amt') \
    .show()
# Výsledek:
# +-----------------+
# |         avg(amt)|
# +-----------------+
# |578.4503151147287|
# +-----------------+
# +-----------------+
# |         avg(amt)|
# +-----------------+
# |509.8340535551941|
# +-----------------+
# Závěr: průměrná částka ve stanicích obcí "Hradec" je výrazně vyšší než ve stanicích obcí začínajících "Jablo".

# 8. Jaká jsou nejčastější slova v názvech stanic (každou stanici počítejte jen jednou)? Liší se to pro stanice v jižní a v severní části ČR (viz úlohu 5)? 
# DataFrame se upravi na nazvy unikatnich stanic a konvertuje do RDD
# Vyuzijeme pomocne DataFrames z ulohy 5
Nazvy = Tr.select('posid', 'name').distinct() \
    .select('name').rdd 
NazvyS = TrS.select('posid', 'name').distinct() \
    .select('name').rdd
NazvyJ = TrJ.select('posid', 'name').distinct() \
    .select('name').rdd
# klasicky WordCount
Nazvy.flatMap(lambda x: x[0].split(' ')) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(lambda a, b: a+b) \
    .sortBy(lambda r: -r[1]) \
    .take(5)
# Výsledek:
# [('cs', 1030), ('benzina', 391), ('cerpaci', 292), ('oil', 280), ('stanice', 273)]
NazvyS.flatMap(lambda x: x[0].split(' ')) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(lambda a, b: a+b) \
    .sortBy(lambda r: -r[1]) \
    .take(5)
# Výsledek:
# [('cs', 125), ('benzina', 53), ('oil', 31), ('s', 25), ('cerpaci', 25)]
NazvyJ.flatMap(lambda x: x[0].split(' ')) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(lambda a, b: a+b) \
    .sortBy(lambda r: -r[1]) \
    .take(5)
# Výsledek:
# [('cs', 330), ('benzina', 115), ('cerpaci', 86), ('stanice', 80), ('oil', 75)]

# 9. Spočítejte Overlap index (počet prvků průniku děleno menším z počtů prvků obou množin) mezi množinami čerpacích stanic s aspoň jednou platbou od klientů s id 18233415 a 18154208.
# vsechny stanice klienta jako list
TrA = Tr.filter('clid=18233415') \
    .select('posid').distinct() \
    .rdd.map(lambda x: x[0]) \
    .collect()
TrB = Tr.filter('clid=18154208') \
    .select('posid').distinct() \
    .rdd.map(lambda x: x[0]) \
    .collect()
TrA = set(TrA) # vyhozeni duplicitnich prvku
TrB = set(TrB)
# Overlap koeficient
print(len(TrA.intersection(TrB)) / min(len(TrA), len(TrB)))
# Výsledek: 0.08
