Search
V InfluxDB UI Postupně se seznámíme s:
InfluxDB ukládá data v následujícím formátu:
<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
Výše uvedená syntaxe slouží k vložení jednoho bodu do databáze. Protože InfluxDB je databáze časových řad, je zde bod analogický bodu na grafu, který reprezentuje data časové řady. Každý bod je pak popsán čtymi složkami
První složka, measurement, je podobná názvu tabulky v SQL databázi. Měření obsahuje všechny značky, pole a časové hodnoty. Měření má smysl pojmenovat tak, aby popisovalo data v něm obsažená.
Dvě složky tag set a field set jsou v řádkovém protokolu množiny, což znamená dvojici klíč-hodnota. Klíč tagu a hodnota tagu jsou v InfluxDB uloženy jako řetězce. Klíč pole je také řetězec, ale hodnota pole může být různých datových typů: řetězce, celá čísla, reálná čísla a logické hodnoty. Hodnota pole je skutečný údaj, který chceme uložit do databáze, a je vždy doprovázen časovou značkou ve formátu RFC3339 UTC. Přesnost časové značky může být v sekundách, milisekundách, mikrosekundách nebo nanosekundách.
S databází je možné komunikovat pomocí řádkového klienta nebo vlastním programem, vytvořeným pomocí knihovny v podporovaném programovacím jazyce. Pro naše účely využijeme jazyk Python, pro který je dostupná knihovna influxdb-client, kterou nejprve nainstalujeme:
influxdb-client
pip install influxdb-client --user
Pro připojení je třeba znát url databáze, název organizace a bucketu, ve kterém budou data uložena. Kromě toho je třeba mít k dispozici token pro autentifikaci, který je možné získat v InfluxDB UI. Spojení s databází se vytvoří takto:
import time from datetime import datetime, timedelta from influxdb_client import InfluxDBClient, Point, WritePrecision from influxdb_client.client.write_api import SYNCHRONOUS url = "<my-url>" token = "<my-token>" org = "<my-org>" bucket = "<my-bucket>" client = InfluxDBClient(url=url, token=token, org=org)
Pro zápis je třeba vytvořit instanci write_api a vytvořit bod časové řady, který bude zapsán:
write_api
write_api = client.write_api(write_options=SYNCHRONOUS)
Pro vytvoření jednoho bodu meření je možné využít vestavěnou třídu Point.
Point
p = Point("my_measurement").tag("location", "location1").field("temperature", 25).time(datetime.utcnow(), WritePrecision.NS)
Zápis bodu do databáze proběhne následovně:
write_api.write(bucket, org, p)
Alternativě lze připravit data i manuálně:
measurement = "my_measurement" tag_set = "location=location1" field_set = "temperature=25" date_in_nano = str(int(datetime.datetime.utcnow().timestamp() * 1000000000)) p = f"{measurement},{tag_set} {field_set} {date_in_nano}"
Pro získání dat je třeba nejprve vytvořit instanci query_api:
query_api
query_api = client.query_api()
Data jsou pak dostupná buď ve stuktuře tabulek nebo ve formě datového proudu:
# query using table structure tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)') for table in tables: print(table) for record in table.records: print(record.values) records = query_api.query_stream('from(bucket:"my-bucket") |> range(start: -10m)') for record in records: print(f'Temperature at {record["location"]} - {record["_value"]}')
V další části cvičení se pokusíme o import reálných dat z existujícího zdroje. Použijeme data NOAA, která obsahují informace o stavu vody za časový úsek přibližně jednoho měsíce.
noaa
import "experimental/csv" csv.from(url: "https://influx-testdata.s3.amazonaws.com/noaa.csv") |> to(bucket: "noaa", org: "your-org")
Pro analýzu využijeme knihovny pandas, scipy a statsmodels, které nejprve nainstalujeme:
pip install pandas scipy statsmodels --user
Import všech knihoven pak vypadá následovně:
from datetime import datetime import numpy as np import pandas as pd import matplotlib.pyplot as plt from statsmodels.graphics.tsaplots import plot_acf from influxdb_client import InfluxDBClient, Point, WritePrecision from influxdb_client.client.write_api import SYNCHRONOUS
Nejprve je třeba získat z databáze data, např. následujícím dotazem:
query = """ from(bucket: "noaa") |> range(start: 2019-08-15, stop: 2019-09-15) |> filter(fn: (r) => r["_measurement"] == "h2o_feet") |> filter(fn: (r) => r["_field"] == "water_level") |> filter(fn: (r) => r["location"] == "santa_monica") """
Pro naše účely stačí pouze hodnoty úrovně vody, které získáme třeba takto:
tables = client.query_api().query(query, org=org) data = [] for table in tables: for record in table.records: data.append(record['_value'])
Ze získaných dat připravíme strukturu pro knihovnu pandas:
d = {'h2O_level': data} h2O_df = pd.DataFrame(data=d) h2O_df['time_step'] = range(0,len(h2O_df)) h2O_df.plot(kind='line', x='time_step', y='h2O_level') plt.show()
Data vykazují značnou periodicitu, pojďmě vypočítat autokorelaci:
shift_1 = h2O_df['h2O_level'].autocorr(lag=1) print(shift_1)