9. Databáze - InfluxDB

Cíle cvičení

  • Seznámit se s časovými řadami, InfluxDB a jeho data modelem
  • Vytvořit uPython aplikaci pro odesílání dat do InfluxDB
  • Seznámit se s balíčkem Pandas v kontextu zpracování dat z InfluxDB

Důležité odkazy

Databáze časových řad

Time Series Databáze (TSDB) jsou speciálně navrženy pro ukládání a analýzu časových řad, což jsou data, která jsou časově seřazena a obvykle obsahují časové razítko. Tento typ databáze je důležitý pro mnoho aplikací, které vyžadují sledování a analýzu časových trendů, jako jsou finanční trhy, meteorologické údaje, sledování provozu, a mnoho dalších.

Základní principy časových řad

Časové řady lze definovat jako sekvence datových bodů indexovaných časovými razítky. Každý datový bod v časové řadě je obvykle reprezentován jako dvojice hodnot: hodnota proměnné a časové razítko, které určuje, kdy byla hodnota zaznamená. Tento koncept je základem pro analýzu a modelování časových trendů a cyklů.

Časové řady mají několik vlastností, které je činí unikátními pro analýzu a ukládání dat:

  • Data jsou časově seřazena a často zahrnují časové razítko, které určuje, kdy byla hodnota zaznamená.
  • Na rozdíl od diskrétních datových typů, jako jsou tabulky nebo grafy, časové řady pokrývají kontinuální časovou osu. To znamená, že data mohou být zaznamenána v jakémkoli časovém intervalu.
  • Časové řady často obsahují sezónní vzorce a trendy, které mohou být zjištěny a analyzovány pomocí statistických a matematických modelů.

Vlastnosti a výhody

TSDB nabízejí řadu výhod oproti tradičním relačním databázím (RDBMS) a NoSQL databázím, které jsou běžně používány pro ukládání a správu dat. TSDB jsou speciálně navrženy pro efektivní ukládání, správu a analýzu časových řad.

  • TSDB jsou optimalizovány pro rychlé čtení a zápis dat, což je důležité pro aplikace, které vyžadují analýzu velkých objemů časových dat v reálném čase.
  • Díky své schopnosti efektivně ukládat a zpracovávat velké objemy časových dat, TSDB umožňují efektivní správu dat, což snižuje náklady na úložiště a zvyšuje rychlost zpracování dat.
  • Na rozdíl od jiných typů databází, které mohou vyžadovat dodatečné nástroje nebo komplikované řešení pro práci s časovými řadami, TSDB jsou navrženy tak, aby přímo podporovaly časové řady, což zjednodušuje jejich implementaci a správu.
  • TSDB jsou navrženy tak, aby mohly efektivně škálovat s rostoucím objemem dat.

TSDB jsou preferovány pro aplikace, které vyžadují sledování a analýzu časových trendů a cyklů. Mezi tyto aplikace patří:

  • Finanční trhy: Pro sledování cenových trendů, analýzu volatility a provádění algoritmického obchodování.
  • Meteorologické údaje: Pro sledování a předpověď počasí, teploty a dalších meteorologických parametrů.
  • Sledování provozu: Pro monitorování výkonu a stavu průmyslových zařízení, infrastruktury a sítí v reálném čase.
  • IoT aplikace: Pro sběr a analýzu dat z IoT zařízení, jako jsou senzory, chytré domy a průmyslové sítě.

InfluxDB

InfluxDB (https://www.influxdata.com/) je open-source TSDB vyvinutá společností InfluxData. Byla navržena speciálně pro efektivní ukládání, správu a analýzu časových řad. InfluxDB byla poprvé představena v roce 2013 a od té doby se stala jednou z nejpopulárnějších TSD na trhu.

InfluxDB je preferována pro práci s časovými řadami z několika důvodů:

  • InfluxDB je speciálně navržena pro časové řady, což ji činí ideální volbou pro aplikace, které vyžadují sledování a analýzu časových trendů.
  • Díky své architektuře je InfluxDB schopna efektivně zpracovávat velké objemy časových dat s nízkou latencí a umožňuje horizontální škálování pro zvládnutí rostoucích objemů dat.
  • Poskytuje pokročilé funkce pro práci s časovými řadami, což umožňuje hlubší a složitější analýzy dat.
  • Umožňuje uživatelům definovat vlastní schémata a indexy pro efektivní správu dat a poskytuje silné bezpečnostní funkce pro ochranu dat a přístupů.

InfluxDB Alternativy

Na trhu TSDB existuje několik hlavních konkurentů InfluxDB. Mezi nejznámější patří:

OpenTSDB (opentsdb.net)
Open-source TSDB navržená pro ukládání, indexování a poskytování přístupu k velkým objemům časových dat. Je známá svou schopností efektivně ukládat a zpracovávat velké objemy dat, ale může být méně flexibilní v porovnání s InfluxDB.

TimescaleDB (timescale.com)
Rozšíření PostgreSQL, které přidává podporu pro časové řady. Je známá svou silnou podporou SQL a schopností integrovat s existujícími PostgreSQL aplikacemi, ale může být méně optimalizována pro velmi vysoké objemy časových dat.

Prometheus (prometheus.io)
Open-source systém pro monitorování a upozorňování, který je speciálně navržený pro sběr a ukládání časových dat z mnoha zdrojů. Je známý svou jednoduchostí a schopností efektivně monitorovat a upozorňovat na stav systémů a aplikací, ale může být méně flexibilní pro složitější analytické úlohy.

Graphite (graphiteapp.org)
Starší a méně flexibilní TSD, která je stále používána pro sledování a ukládání časových dat. Má silnou podporu pro grafy a vizualizace, ale může být méně optimalizována pro velmi vysoké objemy dat.

Při výběru mezi InfluxDB a jeho alternativami lze zvážit následující faktory:

  • Výkon a škálovatelnost: Jak dobře databáze zvládá velké objemy časových dat a jak dobře může škálovat s rostoucím objemem dat.
  • Flexibilita a podpora pro časové řady: Jak dobře databáze podporuje časové řady a jaké jsou její pokročilé analytické funkce.
  • Bezpečnost a správa dat: Jak dobře databáze chrání data a jaké jsou její bezpečnostní funkce.
  • Podpora a komunita: Jak silná je podpora a komunita kolem databáze a jak dobře je databáze aktualizována a udržována.
  • Cena a licencování: Jaké jsou náklady na použití databáze a jaké jsou její licenční požadavky.

InfluxDB vs. SQL DB

Základní funkce, jako je vytváření databází, tabulek a dotazování, se liší mezi InfluxDB a SQL databázemi kvůli jejich odlišným architekturám a účelům.

Databáze X Organizace, Buckety

V SQL databázích je databáze primární jednotkou pro organizaci dat.

V InfluxDB jsou databáze nahrazeny konceptem “organizací”, které slouží jako kontejnery pro “buckety”, což jsou fyzické úložiště dat.

Tabulky X Series

SQL databáze používají tabulky pro ukládání dat. Každá tabulka má pevně definované schéma, které určuje strukturu dat.

Schémata X Tagy, Fields

SQL databáze používají schémata k definování struktury tabulek. InfluxDB naopak používá “tags” a “fields” k popisu dat v series. “Tags” jsou klíčové dvojice použité pro indexování a filtrování dat, zatímco “fields” obsahují skutečná měřená data.

InfluxDB naopak používá “series”, které jsou dynamické a mohou obsahovat různé typy dat.

Dotazování - SQL X Flux

SQL databáze používají SQL (Structured Query Language) pro dotazování dat. SQL je silně typovaný a podporuje složité dotazy a transakce.

InfluxDB používá Flux, což je nový, funkční dotazovací jazyk navržený pro práci s časovými řadami.

Rozdíly v architektuře

Architektura InfluxDB a SQL databází se také liší, což ovlivňuje, jak ukládají a zpracovávají data.

InfluxDB je navržena tak, aby byla optimalizována pro časové řady. Její architektura je založena na shardingu dat, což umožňuje horizontální škálování a efektivní zpracování velkých objemů dat.

SQL databáze mají tradiční architekturu založenou na tabulkách a relacích. SQL databáze jsou optimalizovány pro transakční zpracování a podporují dotazy a transakce. Jejich architektura je založena na indexování a normalizaci dat.

Terminologie InfluxDB

Bucket

V InfluxDB je “bucket” fyzické úložiště dat, které může obsahovat data z různých “series”. Buckety jsou organizovány do “organizací”, což jsou kontejnery pro buckety.

Series

V InfluxDB je “series” sekvence datových bodů, které mají stejné measurements a tagy. Series jsou základní jednotkou pro ukládání a dotazování dat v InfluxDB.

Tags

V InfluxDB je “tag” klíčová dvojice použitá pro indexování a filtrování dat. Tags umožňují rychlé a efektivní dotazování dat na základě jejich atributů.

Fields

V InfluxDB je “field” skutečná měřená data uložená v series. Každý datový bod v series obsahuje jedno nebo více fields.

Meassurements

V InfluxDB je “measurement” název pro typ dat, které jsou uloženy v series. Measurements definují, jak jsou data organizována a jak mohou být dotazována.

measurement tag key tag key field key field key
airSensor sensorID stationID humidityValue temperatureValue
waterQualitySensor sensorID stationID pHValue temperatureValue

airSensor,sensorId=A0100,station=Harbor humidityValue=35.0658,temperatureValue=21.667 1636729543000000000
waterQualitySensor,sensorId=W0101,station=Harbor pHValue=6.1,temperatureValue=16.103 1472515200000000000

Instalace a konfigurace InfluxDB

Cloud

InfluxDB Cloud

Instalace InfluxDB v cloudu umožňuje uživatelům využít cloudové služby pro snadnou a rychlou instalaci a správu databáze. Pro instalaci InfluxDB v cloudu je možné využít služby poskytované společností InfluxData, jako je InfluxDB Cloud. Tato služba nabízí plánované a automatizované zálohy, škálování a aktualizace. Uživatelé mohou také využít InfluxDB 2.0, nejnovější verzi InfluxDB, která přináší řadu vylepšení a nových funkcí.

Cloudová instalace InfluxDB je nejvhodnější v situacích, kdy:

  • Není potřeba plná kontrola nad databází
  • Je potřeba rychlé nasazení
  • Je důležitá škálovatelnost a dostupnost

Vlastní instalace

Dokumentace

Instalace InfluxDB na vlastní server umožňuje uživatelům plnou kontrolu nad konfigurací a nastavením databáze.

Vlastní instalace InfluxDB je vhodná v situacích, kdy:

  • Je potřeba plná kontrola nad databází
  • Je důležitý výkon a optimalizace
  • Je potřeba integrace s existujícími systémy

Docker

Docker Hub

Docker umožňuje snadnou a rychlou instalaci InfluxDB tím, že poskytuje kontejnerizované řešení, které zjednodušuje proces instalace a správy databáze.

Docker je vhodný pro instalaci InfluxDB v situacích, kdy:

  • Je potřeba izolace a reproducibilita
  • Je potřeba rychlé nasazení pro vývoj a testování
  • Je důležité snadné nasazení a správa

Bezpečnost - API Tokeny

API tokeny jsou používány pro ověření a autorizaci přístupu k databázi a její API. API tokeny umožňují definovat, kteří uživatelé mají přístup k jakým datům a jaké operace mohou provádět. Tímto způsobem je možné efektivně řídit přístup k datům a zajistit, že pouze autorizovaní uživatelé mohou provádět určité operace.

InfluxDB Docs - Manage API Tokens

Vytvoření tokenu

1. Přihlášení do InfluxDB GUI

Otevřete webové rozhraní InfluxDB a přihlaste se pomocí svých přihlašovacích údajů.

2. Přejděte do sekce API Tokens

Postranní lišta → Load Data → API Tokens

3. Vytvoření nového API tokenu

+ GENERATE API TOKEN → All Access API Token → Zadejte popis tokenu API → SAVE.

TBD: Custom API Token

InfluxDB API - Odesílání dat z Raspberry Pi

MicroPython nemá přímo integrovaný balíček pro vytvoření InfluxDB klienta. Nicméně, je možné využít modul urequests pro odesílání HTTP požadavků, což nám umožní komunikovat s InfluxDB API.

InfluxDB Docs - Write data with the InfluxDB API

import network
from time import sleep, time
import json
import machine
import random
import urequests
import gc
 
PERIOD               = 10     # Perioda odesilani dat 
last_temp            = 23.5   # Pocatecni hodnota simulovanych dat
 
SENSOR_ID            = "rpipw-01"
STUDENT_ID           = "cvut-login"    
 
INFLUXDB_API_TOKEN   = "api-token"
INFLUXDB_SERVER      = "influx-server"
INFLUXDB_ORG         = "influx-organization"
INFLUXDB_BUCKET      = "influx-bucket"
INFLUXDB_MEASUREMENT = "influx-measurement" 
 
INTERNET_SSID        = "wifi-ap-ssid"
INTERNET_PASSWORD    = "wifi-ap-password"
 
 
def connect_to_wifi():
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    wlan.connect(INTERNET_SSID, INTERNET_PASSWORD)
    while wlan.isconnected() == False:
        print('Waiting for WiFi connection...')
        sleep(1)
    print('Connected to WiFi:', wlan.ifconfig())
    return wlan
 
def send_data_to_influxdb(temperature):
    url = f"http://{INFLUXDB_SERVER}:8086/api/v2/write?org={INFLUXDB_ORG}&bucket={INFLUXDB_BUCKET}&precision=ns"
    headers = {
        'Authorization': f'Token {INFLUXDB_API_TOKEN}',
        'Content-Type': 'text/plain; charset=utf-8',
        'Accept': 'application/json'
    }
    data = f"{INFLUXDB_MEASUREMENT},sensor_id={SENSOR_ID},student_id={STUDENT_ID} temperature={temperature}"
    try:
        response = urequests.post(url, headers=headers, data=data)
        if response.status_code == 204:
            print('Data successfully sent to InfluxDB! T = ', temperature, ' °C')
            gc.collect()
            return True
        else:
            print('Failed to send data to InfluxDB.')
    except Exception as e:
        print('Error sending data to InfluxDB:', e)
    return False
 
try:
    wlan = connect_to_wifi()
    while True:
        new_temp = last_temp + random.uniform(-0.5, 0.5)
        if new_temp < 15:
            new_temp = 15
        elif new_temp > 30:
            new_temp = 30
        last_temp = new_temp
        send_data_to_influxdb(new_temp)
        sleep(PERIOD)
except KeyboardInterrupt:
    machine.reset()

HTTP požadavek odeslaný z Raspberry Pi do InfluxDB má následující strukturu.

Metoda je POST', což znamená, že požadavek je určen k odeslání dat na server.

URL adresa požadavku je {INFLUXDB_SERVER}:8086/api/v2/write?org={INFLUXDB_ORG}&bucket={INFLUXDB_BUCKET}&precision=ns. Tato URL obsahuje adresu (INFLUXDB_SERVER) a port (8086) serveru InfluxDB, cestu k API endpointu pro zápis dat (/api/v2/write), a parametry specifikující organizaci (org={INFLUXDB_ORG}), bucket (bucket={INFLUXDB_BUCKET}) a přesnost času pro data (precision=ns).

Autorizační token je poskytnut v hlavičce Authorization, což umožňuje ověření požadavku. 'Authorization': f'Token {INFLUXDB_API_TOKEN}'

Typ obsahu požadavku je určen jako text/plain s kódováním UTF-8, a je specifikován v hlavičce Content-Type. 'Content-Type': 'text/plain; charset=utf-8'

Očekává se odpověď ve formátu JSON, což je specifikováno v hlavičce Accept. 'Accept': 'application/json'

Tělo požadavku obsahuje data ve formátu Line Protocol, které jsou formátovány jako řetězec s názvem měření, identifikátory senzoru a studenta, a hodnotou teploty.

Zpracování dat z InfluxDB pomocí Pandas

Pandas (pandas.pydata.org)

Pandas je široce používaná knihovna v Pythonu, která poskytuje flexibilní datové struktury a nástroje pro manipulaci s daty. Umožňuje snadno manipulovat s velkými datovými sadami, provádět statistické analýzy, a efektivně ukládat a načítat data z různých zdrojů.

Pandas poskytuje datový typ **DataFrame**, který je ideální pro práci s tabulkovými daty. DataFrame je dvourozměrná datová struktura, která může obsahovat data různých typů (čísla, řetězce, datumy atd.) a umožňuje snadno provádět operace jako filtrování, agregaci, a transformaci dat.

Pandas také poskytuje řadu funkcí pro čtení a zápis dat z různých formátů a zdrojů, včetně CSV souborů, Excel souborů, SQL databází, a z webových API.

InfluxDBClient (influxdb-client.readthedocs.io)

InfluxDBClient je knihovna, která poskytuje API pro práci s InfluxDB v Pythonu. Umožňuje snadno odesílat a načítat data z InfluxDB, provádět dotazy na data pomocí Flux nebo InfluxQL, a spravovat databáze a uživatele v InfluxDB.

InfluxDBClient poskytuje třídu InfluxDBClient, která je hlavní třídou pro interakci s InfluxDB. Tato třída umožňuje vytvoření instance klienta, která je pak použita pro provádění operací s databází. InfluxDBClient také poskytuje další třídy a nástroje pro práci s daty v InfluxDB, včetně QueryAPI pro provádění dotazů a WriteAPI pro zápis dat.

import pandas as pd
from influxdb_client import InfluxDBClient
 
INFLUXDB_URL    = 'http://server:port'
INFLUXDB_TOKEN  = 'influx-api-token'
INFLUXDB_ORG    = 'influx-organization'
INFLUXDB_BUCKET = 'influx-bucket'
 
STUDENT_ID      = 'cvut-login'
MINUTES         = 60
 
# Flux query
flux_query = f"""
from(bucket: "{INFLUXDB_BUCKET}")
  |> range(start: -{MINUTES}m)
  |> filter(fn: (r) => r._measurement == "102" and r.student_id == "{STUDENT_ID}")
"""
 
# Připojení k InfluxDB
client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG)
query_api = client.query_api()
 
# Spuštění Flux query
result = query_api.query(flux_query)
 
# Zpracování výsledků
data = []
for table in result:
    for record in table.records:
        data.append({
            "time": record.get_time(),
            "measurement": record.get_measurement(),
            "student_id": record.values.get('student_id', ''),
            "sensor_id": record.values.get('sensor_id', ''),
            "temperature": record.get_value()
        })
 
# Převod na DataFrame
df = pd.DataFrame(data)
 
# Statistika
print("Začátek:          ", df['time'].min())
print("Konec:            ", df['time'].max())
print("Minimální teplota:", df['temperature'].min())
print("Maximální teplota:", df['temperature'].max())
print("Průměrná teplota: ", df['temperature'].mean())
print("Počet záznamů:    ", df['temperature'].count())
 
if len(df) > 1:
    df['time'] = pd.to_datetime(df['time'])
    time_diff = df['time'].diff().dt.total_seconds()
    print("Průměrná časová prodleva mezi záznamy (v sekundách):", time_diff.mean())
else:
    print("Nedostatek záznamů pro výpočet průměrné časové prodlevy.")
 
# Uložení do CSV
df.to_csv('output.csv', index=False)
print("Data successfully saved to output.csv")

Po spuštění Flux query kód prochází všechny tabulky a záznamy vrácené z InfluxDB a extrahuje z nich relevantní informace. Pro každý záznam se vytvoří slovník s klíči “time”, “measurement”, “student_id”, “sensor_id” a “temperature”, které obsahují čas záznamu, název měření, identifikátor studenta, identifikátor senzoru a hodnotu teploty. Tyto slovníky jsou pak přidány do seznamu data, který je převeden na Pandas DataFrame.

courses/b0b37nsi/tutorials/09.txt · Last modified: 2024/04/18 10:41 by spicajak