Warning
This page is located in archive. Go to the latest version of this course pages. Go the latest version of this page.

6. InfluxDB

Cíle cvičení

  • Seznámení s InfluxDB, databází pro ukládání a správu časových řad.
  • Možnosti InfluxDB UI, interaktivní práce s daty
  • Přístup pomocí klientské aplikace v Pythonu
  • Analýza časových řad

InfluxDB

Přístup do InfluxDB cloudu

  1. použijte Google account (např. svázaný se školním e-mailem)
  2. vyberte umístění (Amazon, Google, Azure) - umístění může ovlivnit dobu odezvy (umístění serverů bývá různé, každý z providerů vždy nabízí umístění v USA a v EU, např. Microsoft má servery Azure ve Virginii a Amsterodamu), na funkcionalitu by němělo mít vliv
  3. vyplňte vhodný název společnosti, např. Czech Technical University in Prague
  4. zaškrtněte souhlas s podmínkami
  5. vyberte Free plán

Pojmy

V InfluxDB UI Postupně se seznámíme s:

  • organizace, role uživatele (owner, member), bucket
  • přístup do databáze pomocí tokenu

Line protocol

InfluxDB ukládá data v následujícím formátu:

<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]

Výše uvedená syntaxe slouží k vložení jednoho bodu do databáze. Protože InfluxDB je databáze časových řad, je zde bod analogický bodu na grafu, který reprezentuje data časové řady. Každý bod je pak popsán čtymi složkami

První složka, measurement, je podobná názvu tabulky v SQL databázi. Měření obsahuje všechny značky, pole a časové hodnoty. Měření má smysl pojmenovat tak, aby popisovalo data v něm obsažená.

Dvě složky tag set a field set jsou v řádkovém protokolu množiny, což znamená dvojici klíč-hodnota. Klíč tagu a hodnota tagu jsou v InfluxDB uloženy jako řetězce. Klíč pole je také řetězec, ale hodnota pole může být různých datových typů: řetězce, celá čísla, reálná čísla a logické hodnoty. Hodnota pole je skutečný údaj, který chceme uložit do databáze, a je vždy doprovázen časovou značkou ve formátu RFC3339 UTC. Přesnost časové značky může být v sekundách, milisekundách, mikrosekundách nebo nanosekundách.

Komunikace s databází

S databází je možné komunikovat pomocí řádkového klienta nebo vlastním programem, vytvořeným pomocí knihovny v podporovaném programovacím jazyce. Pro naše účely využijeme jazyk Python, pro který je dostupná knihovna influxdb-client, kterou nejprve nainstalujeme:

pip install influxdb-client --user

Pro připojení je třeba znát url databáze, název organizace a bucketu, ve kterém budou data uložena. Kromě toho je třeba mít k dispozici token pro autentifikaci, který je možné získat v InfluxDB UI. Spojení s databází se vytvoří takto:

import time
from datetime import datetime, timedelta
 
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
 
url = "<my-url>"
token = "<my-token>"
org = "<my-org>"
bucket = "<my-bucket>"
 
client = InfluxDBClient(url=url, token=token, org=org)

Zápis dat do databáze

Pro zápis je třeba vytvořit instanci write_api a vytvořit bod časové řady, který bude zapsán:

write_api = client.write_api(write_options=SYNCHRONOUS)

Pro vytvoření jednoho bodu meření je možné využít vestavěnou třídu Point.

p = Point("my_measurement").tag("location", "location1").field("temperature", 25).time(datetime.utcnow(), WritePrecision.NS)

Zápis bodu do databáze proběhne následovně:

write_api.write(bucket, org, p) 

Alternativě lze připravit data i manuálně:

measurement = "my_measurement"
tag_set = "location=location1"
field_set = "temperature=25"
date_in_nano = str(int(datetime.datetime.utcnow().timestamp() * 1000000000))
p = f"{measurement},{tag_set} {field_set} {date_in_nano}" 

Získání dat z databáze

Pro získání dat je třeba nejprve vytvořit instanci query_api:

query_api = client.query_api() 

Data jsou pak dostupná buď ve stuktuře tabulek nebo ve formě datového proudu:

# query using table structure
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')
for table in tables: 
    print(table) 
    for record in table.records: 
        print(record.values) 
 
records = query_api.query_stream('from(bucket:"my-bucket") |> range(start: -10m)')
for record in records: 
    print(f'Temperature at {record["location"]} - {record["_value"]}')

Analýza dat

V další části cvičení se pokusíme o import reálných dat z existujícího zdroje. Použijeme data NOAA, která obsahují informace o stavu vody za časový úsek přibližně jednoho měsíce.

  1. Vytvořte nový bucket noaa
  2. Do editoru skriptů vložte následující kód:

import "experimental/csv"
csv.from(url: "https://influx-testdata.s3.amazonaws.com/noaa.csv")
    |> to(bucket: "noaa", org: "your-org")

Pro analýzu využijeme knihovny pandas, scipy a statsmodels, které nejprve nainstalujeme:

pip install pandas scipy statsmodels --user

Import všech knihoven pak vypadá následovně:

from datetime import datetime
 
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.graphics.tsaplots import plot_acf 
 
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS

Nejprve je třeba získat z databáze data, např. následujícím dotazem:

query = """
from(bucket: "noaa") 
    |> range(start: 2019-08-15, stop: 2019-09-15)
    |> filter(fn: (r) => r["_measurement"] == "h2o_feet")
    |> filter(fn: (r) => r["_field"] == "water_level")
    |> filter(fn: (r) => r["location"] == "santa_monica")
"""

Pro naše účely stačí pouze hodnoty úrovně vody, které získáme třeba takto:

tables = client.query_api().query(query, org=org)
data = []
for table in tables:
    for record in table.records:
        data.append(record['_value'])

Ze získaných dat připravíme strukturu pro knihovnu pandas:

d = {'h2O_level': data}
h2O_df = pd.DataFrame(data=d)
h2O_df['time_step'] = range(0,len(h2O_df))  
h2O_df.plot(kind='line', x='time_step', y='h2O_level') 
plt.show() 

Data vykazují značnou periodicitu, pojďmě vypočítat autokorelaci:

shift_1 = h2O_df['h2O_level'].autocorr(lag=1)  
print(shift_1)

courses/b0b37nsi/tutorials/06.txt · Last modified: 2022/03/24 14:15 by viteks