====== 4. InfluxDB a časové řady ====== ===== Cíle cvičení ===== * Seznámení s InfluxDB, databází pro ukládání a správu časových řad. * Možnosti InfluxDB UI, interaktivní práce s daty * Přístup pomocí klientské aplikace v Pythonu * Analýza časových řad ===== InfluxDB ===== ==== Přístup do InfluxDB cloudu ==== - [[https://westeurope-1.azure.cloud2.influxdata.com]] - [[https://cloud2.influxdata.com/signup]] - použijte Google account (např. svázaný se školním e-mailem) - vyberte umístění (Amazon, Google, Azure) - umístění může ovlivnit dobu odezvy (umístění serverů bývá různé, každý z providerů vždy nabízí umístění v USA a v EU, např. Microsoft má servery Azure ve Virginii a Amsterodamu), na funkcionalitu by němělo mít vliv - vyplňte vhodný název společnosti, např. Czech Technical University in Prague - zaškrtněte souhlas s podmínkami - vyberte Free plán ==== Pojmy ==== V InfluxDB UI Postupně se seznámíme s: * organizace, role uživatele (owner, member), bucket * přístup do databáze pomocí tokenu ==== Line protocol ==== InfluxDB ukládá data v následujícím formátu: [,=[,=]] =[,=] [] Výše uvedená syntaxe slouží k vložení jednoho bodu do databáze. Protože InfluxDB je databáze časových řad, je zde bod analogický bodu na grafu, který reprezentuje data časové řady. Každý bod je pak popsán čtymi složkami První složka, **measurement**, je podobná názvu tabulky v SQL databázi. Měření obsahuje všechny značky, pole a časové hodnoty. Měření má smysl pojmenovat tak, aby popisovalo data v něm obsažená. Dvě složky **tag set** a **field set** jsou v řádkovém protokolu množiny, což znamená dvojici klíč-hodnota. Klíč tagu a hodnota tagu jsou v InfluxDB uloženy jako řetězce. Klíč pole je také řetězec, ale hodnota pole může být různých datových typů: řetězce, celá čísla, reálná čísla a logické hodnoty. Hodnota pole je skutečný údaj, který chceme uložit do databáze, a je vždy doprovázen časovou značkou ve formátu [[https://docs.influxdata.com/flux/v0.x/spec/types/#timestamp-format|RFC3339 UTC]]. Přesnost časové značky může být v sekundách, milisekundách, mikrosekundách nebo nanosekundách. ==== Komunikace s databází ==== S databází je možné komunikovat pomocí řádkového klienta nebo vlastním programem, vytvořeným pomocí knihovny v podporovaném programovacím jazyce. Pro naše účely využijeme jazyk Python, pro který je dostupná knihovna ''influxdb-client'', kterou nejprve nainstalujeme: pip install influxdb-client --user Pro připojení je třeba znát url databáze, název organizace a bucketu, ve kterém budou data uložena. Kromě toho je třeba mít k dispozici token pro autentifikaci, který je možné získat v InfluxDB UI. Spojení s databází se vytvoří takto: import time from datetime import datetime, timedelta from influxdb_client import InfluxDBClient, Point, WritePrecision from influxdb_client.client.write_api import SYNCHRONOUS url = "" token = "" org = "" bucket = "" client = InfluxDBClient(url=url, token=token, org=org) Pokud je při spojení s databází problém s certifikátem, přidejte do svého kódu následující: import certifi client = InfluxDBClient(url=url, token=token, org=org,ssl_ca_cert=certifi.where()) === Zápis dat do databáze === Pro zápis je třeba vytvořit instanci ''write_api'' a vytvořit bod časové řady, který bude zapsán: write_api = client.write_api(write_options=SYNCHRONOUS) Pro vytvoření jednoho bodu meření je možné využít vestavěnou třídu ''Point''. p = Point("my_measurement").tag("location", "location1").field("temperature", 25).time(datetime.utcnow(), WritePrecision.NS) Zápis bodu do databáze proběhne následovně: write_api.write(bucket, org, p) Alternativě lze připravit data i manuálně: measurement = "my_measurement" tag_set = "location=location1" field_set = "temperature=25" date_in_nano = str(int(datetime.datetime.utcnow().timestamp() * 1000000000)) p = f"{measurement},{tag_set} {field_set} {date_in_nano}" === Získání dat z databáze === Pro získání dat je třeba nejprve vytvořit instanci ''query_api'': query_api = client.query_api() Data jsou pak dostupná buď ve stuktuře tabulek nebo ve formě datového proudu: # query using table structure tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)') for table in tables: print(table) for record in table.records: print(record.values) records = query_api.query_stream('from(bucket:"my-bucket") |> range(start: -10m)') for record in records: print(f'Temperature at {record["location"]} - {record["_value"]}') ==== Analýza dat ==== V další části cvičení se pokusíme o import reálných dat z existujícího zdroje. Použijeme data NOAA, která obsahují informace o stavu vody za časový úsek přibližně jednoho měsíce. - Vytvořte nový bucket ''noaa'' - Do editoru skriptů vložte následující kód: import "experimental/csv" csv.from(url: "https://influx-testdata.s3.amazonaws.com/noaa.csv") |> to(bucket: "noaa", org: "your-org") Pro analýzu využijeme knihovny pandas, scipy a statsmodels, které nejprve nainstalujeme: pip install pandas scipy statsmodels --user Import všech knihoven pak vypadá následovně: from datetime import datetime import numpy as np import pandas as pd import matplotlib.pyplot as plt from statsmodels.graphics.tsaplots import plot_acf from influxdb_client import InfluxDBClient, Point, WritePrecision from influxdb_client.client.write_api import SYNCHRONOUS Nejprve je třeba získat z databáze data, např. následujícím dotazem: query = """ from(bucket: "noaa") |> range(start: 2019-08-15, stop: 2019-09-15) |> filter(fn: (r) => r["_measurement"] == "h2o_feet") |> filter(fn: (r) => r["_field"] == "water_level") |> filter(fn: (r) => r["location"] == "santa_monica") """ Pro naše účely stačí pouze hodnoty úrovně vody, které získáme třeba takto: tables = client.query_api().query(query, org=org) data = [] for table in tables: for record in table.records: data.append(record['_value']) Ze získaných dat připravíme strukturu pro knihovnu pandas: d = {'h2O_level': data} h2O_df = pd.DataFrame(data=d) h2O_df['time_step'] = range(0,len(h2O_df)) h2O_df.plot(kind='line', x='time_step', y='h2O_level') plt.show() Data vykazují značnou periodicitu, pojďmě vypočítat [[https://pandas.pydata.org/docs/reference/api/pandas.Series.autocorr.html|autokorelaci]]: shift_1 = h2O_df['h2O_level'].autocorr(lag=1) print(shift_1) ===== Data z chytré domácnosti ===== Díky aktivitě Alexandadra Gregora máme k dispozici reálná data z prostředí Home Assistant, propojeného s IndluxDB databází. Data a pěkné povídání okolo toho jsou k dipozici v [[https://gitlab.fel.cvut.cz/gregoal1/lexuv-nsi-projektik|repozitáři]]. Příkad: dotaz na atm. tlak: query = """ from(bucket: "home_assistant") |> range(start: 2023-02-12, stop: 2023-03-12) |> filter(fn: (r) => r._measurement == "hPa") |> filter(fn: (r) => r._field == "value") """