====== 9. Databáze - InfluxDB ====== ===== Cíle cvičení ===== * Seznámit se s časovými řadami, InfluxDB a jeho data modelem * Vytvořit uPython aplikaci pro odesílání dat do InfluxDB * Seznámit se s balíčkem Pandas v kontextu zpracování dat z InfluxDB ===== Důležité odkazy ===== * [[https://docs.influxdata.com/influxdb/v2/|InfluxDB dokumentace]] * [[https://pandas.pydata.org/docs/|Pandas dokumentace]] * [[https://influxdb-client.readthedocs.io/en/latest/index.html| InfluxDB Client dokumentace]] * [[https://cw.fel.cvut.cz/wiki/courses/b0b37nsi/hw/99|Parametry InfluxDB Serveru]] ===== Databáze časových řad ===== Time Series Databáze (TSDB) jsou speciálně navrženy pro ukládání a analýzu časových řad, což jsou data, která jsou časově seřazena a obvykle obsahují časové razítko. Tento typ databáze je důležitý pro mnoho aplikací, které vyžadují sledování a analýzu časových trendů, jako jsou finanční trhy, meteorologické údaje, sledování provozu, a mnoho dalších. ==== Základní principy časových řad ==== Časové řady lze definovat jako sekvence datových bodů indexovaných časovými razítky. Každý datový bod v časové řadě je obvykle reprezentován jako dvojice hodnot: hodnota proměnné a časové razítko, které určuje, kdy byla hodnota zaznamená. Tento koncept je základem pro analýzu a modelování časových trendů a cyklů. Časové řady mají několik vlastností, které je činí unikátními pro analýzu a ukládání dat: * Data jsou časově seřazena a často zahrnují časové razítko, které určuje, kdy byla hodnota zaznamená. * Na rozdíl od diskrétních datových typů, jako jsou tabulky nebo grafy, časové řady pokrývají kontinuální časovou osu. To znamená, že data mohou být zaznamenána v jakémkoli časovém intervalu. * Časové řady často obsahují sezónní vzorce a trendy, které mohou být zjištěny a analyzovány pomocí statistických a matematických modelů. ==== Vlastnosti a výhody ==== TSDB nabízejí řadu výhod oproti tradičním relačním databázím (RDBMS) a NoSQL databázím, které jsou běžně používány pro ukládání a správu dat. TSDB jsou speciálně navrženy pro efektivní ukládání, správu a analýzu časových řad. * TSDB jsou optimalizovány pro rychlé čtení a zápis dat, což je důležité pro aplikace, které vyžadují analýzu velkých objemů časových dat v reálném čase. * Díky své schopnosti efektivně ukládat a zpracovávat velké objemy časových dat, TSDB umožňují efektivní správu dat, což snižuje náklady na úložiště a zvyšuje rychlost zpracování dat. * Na rozdíl od jiných typů databází, které mohou vyžadovat dodatečné nástroje nebo komplikované řešení pro práci s časovými řadami, TSDB jsou navrženy tak, aby přímo podporovaly časové řady, což zjednodušuje jejich implementaci a správu. * TSDB jsou navrženy tak, aby mohly efektivně škálovat s rostoucím objemem dat. TSDB jsou preferovány pro aplikace, které vyžadují sledování a analýzu časových trendů a cyklů. Mezi tyto aplikace patří: * Finanční trhy: Pro sledování cenových trendů, analýzu volatility a provádění algoritmického obchodování. * Meteorologické údaje: Pro sledování a předpověď počasí, teploty a dalších meteorologických parametrů. * Sledování provozu: Pro monitorování výkonu a stavu průmyslových zařízení, infrastruktury a sítí v reálném čase. * IoT aplikace: Pro sběr a analýzu dat z IoT zařízení, jako jsou senzory, chytré domy a průmyslové sítě. ==== InfluxDB ==== InfluxDB ([[https://www.influxdata.com/]]) je open-source TSDB vyvinutá společností InfluxData. Byla navržena speciálně pro efektivní ukládání, správu a analýzu časových řad. InfluxDB byla poprvé představena v roce 2013 a od té doby se stala jednou z nejpopulárnějších TSD na trhu. InfluxDB je preferována pro práci s časovými řadami z několika důvodů: * InfluxDB je speciálně navržena pro časové řady, což ji činí ideální volbou pro aplikace, které vyžadují sledování a analýzu časových trendů. * Díky své architektuře je InfluxDB schopna efektivně zpracovávat velké objemy časových dat s nízkou latencí a umožňuje horizontální škálování pro zvládnutí rostoucích objemů dat. * Poskytuje pokročilé funkce pro práci s časovými řadami, což umožňuje hlubší a složitější analýzy dat. * Umožňuje uživatelům definovat vlastní schémata a indexy pro efektivní správu dat a poskytuje silné bezpečnostní funkce pro ochranu dat a přístupů. ==== InfluxDB Alternativy ==== Na trhu TSDB existuje několik hlavních konkurentů InfluxDB. Mezi nejznámější patří: **OpenTSDB** ([[http://opentsdb.net/|opentsdb.net]])\\ Open-source TSDB navržená pro ukládání, indexování a poskytování přístupu k velkým objemům časových dat. Je známá svou schopností efektivně ukládat a zpracovávat velké objemy dat, ale může být méně flexibilní v porovnání s InfluxDB. **TimescaleDB** ([[https://www.timescale.com/|timescale.com]])\\ Rozšíření PostgreSQL, které přidává podporu pro časové řady. Je známá svou silnou podporou SQL a schopností integrovat s existujícími PostgreSQL aplikacemi, ale může být méně optimalizována pro velmi vysoké objemy časových dat. **Prometheus** ([[https://prometheus.io/|prometheus.io]])\\ Open-source systém pro monitorování a upozorňování, který je speciálně navržený pro sběr a ukládání časových dat z mnoha zdrojů. Je známý svou jednoduchostí a schopností efektivně monitorovat a upozorňovat na stav systémů a aplikací, ale může být méně flexibilní pro složitější analytické úlohy. **Graphite** ([[https://graphiteapp.org/|graphiteapp.org]])\\ Starší a méně flexibilní TSD, která je stále používána pro sledování a ukládání časových dat. Má silnou podporu pro grafy a vizualizace, ale může být méně optimalizována pro velmi vysoké objemy dat. Při výběru mezi InfluxDB a jeho alternativami lze zvážit následující faktory: * Výkon a škálovatelnost: Jak dobře databáze zvládá velké objemy časových dat a jak dobře může škálovat s rostoucím objemem dat. * Flexibilita a podpora pro časové řady: Jak dobře databáze podporuje časové řady a jaké jsou její pokročilé analytické funkce. * Bezpečnost a správa dat: Jak dobře databáze chrání data a jaké jsou její bezpečnostní funkce. * Podpora a komunita: Jak silná je podpora a komunita kolem databáze a jak dobře je databáze aktualizována a udržována. * Cena a licencování: Jaké jsou náklady na použití databáze a jaké jsou její licenční požadavky. ===== InfluxDB vs. SQL DB ===== Základní funkce, jako je vytváření databází, tabulek a dotazování, se liší mezi InfluxDB a SQL databázemi kvůli jejich odlišným architekturám a účelům. **Databáze X Organizace, Buckety**\\ V SQL databázích je databáze primární jednotkou pro organizaci dat. V InfluxDB jsou databáze nahrazeny konceptem "organizací", které slouží jako kontejnery pro "buckety", což jsou fyzické úložiště dat. **Tabulky X Series**\\ SQL databáze používají tabulky pro ukládání dat. Každá tabulka má pevně definované schéma, které určuje strukturu dat. **Schémata X Tagy, Fields**\\ SQL databáze používají schémata k definování struktury tabulek. InfluxDB naopak používá "tags" a "fields" k popisu dat v series. "Tags" jsou klíčové dvojice použité pro indexování a filtrování dat, zatímco "fields" obsahují skutečná měřená data. InfluxDB naopak používá "series", které jsou dynamické a mohou obsahovat různé typy dat. **Dotazování - SQL X Flux**\\ SQL databáze používají SQL (Structured Query Language) pro dotazování dat. SQL je silně typovaný a podporuje složité dotazy a transakce. InfluxDB používá Flux, což je nový, funkční dotazovací jazyk navržený pro práci s časovými řadami. ==== Rozdíly v architektuře ==== Architektura InfluxDB a SQL databází se také liší, což ovlivňuje, jak ukládají a zpracovávají data. InfluxDB je navržena tak, aby byla optimalizována pro časové řady. Její architektura je založena na shardingu dat, což umožňuje horizontální škálování a efektivní zpracování velkých objemů dat. SQL databáze mají tradiční architekturu založenou na tabulkách a relacích. SQL databáze jsou optimalizovány pro transakční zpracování a podporují dotazy a transakce. Jejich architektura je založena na indexování a normalizaci dat. ==== Terminologie InfluxDB ==== **Bucket**\\ V InfluxDB je "bucket" fyzické úložiště dat, které může obsahovat data z různých "series". Buckety jsou organizovány do "organizací", což jsou kontejnery pro buckety. **Series**\\ V InfluxDB je "series" sekvence datových bodů, které mají stejné measurements a tagy. Series jsou základní jednotkou pro ukládání a dotazování dat v InfluxDB. **Tags**\\ V InfluxDB je "tag" klíčová dvojice použitá pro indexování a filtrování dat. Tags umožňují rychlé a efektivní dotazování dat na základě jejich atributů. **Fields**\\ V InfluxDB je "field" skutečná měřená data uložená v series. Každý datový bod v series obsahuje jedno nebo více fields. **Meassurements**\\ V InfluxDB je "measurement" název pro typ dat, které jsou uloženy v series. Measurements definují, jak jsou data organizována a jak mohou být dotazována. ^ measurement ^ tag key ^ tag key ^ field key ^ field key ^ | airSensor | sensorID | stationID | humidityValue | temperatureValue | | waterQualitySensor | sensorID | stationID | pHValue | temperatureValue | airSensor,sensorId=A0100,station=Harbor humidityValue=35.0658,temperatureValue=21.667 1636729543000000000 waterQualitySensor,sensorId=W0101,station=Harbor pHValue=6.1,temperatureValue=16.103 1472515200000000000 ===== Instalace a konfigurace InfluxDB ===== **Cloud** [[https://www.influxdata.com/products/influxdb-cloud/|InfluxDB Cloud]] Instalace InfluxDB v cloudu umožňuje uživatelům využít cloudové služby pro snadnou a rychlou instalaci a správu databáze. Pro instalaci InfluxDB v cloudu je možné využít služby poskytované společností InfluxData, jako je InfluxDB Cloud. Tato služba nabízí plánované a automatizované zálohy, škálování a aktualizace. Uživatelé mohou také využít InfluxDB 2.0, nejnovější verzi InfluxDB, která přináší řadu vylepšení a nových funkcí. Cloudová instalace InfluxDB je nejvhodnější v situacích, kdy: * Není potřeba plná kontrola nad databází * Je potřeba rychlé nasazení * Je důležitá škálovatelnost a dostupnost **Vlastní instalace** [[https://docs.influxdata.com/influxdb/v2/install/?t=Linux#install-influxdb-as-a-service-with-systemd|Dokumentace]] Instalace InfluxDB na vlastní server umožňuje uživatelům plnou kontrolu nad konfigurací a nastavením databáze. Vlastní instalace InfluxDB je vhodná v situacích, kdy: * Je potřeba plná kontrola nad databází * Je důležitý výkon a optimalizace * Je potřeba integrace s existujícími systémy **Docker** [[https://hub.docker.com/_/influxdb|Docker Hub]] Docker umožňuje snadnou a rychlou instalaci InfluxDB tím, že poskytuje kontejnerizované řešení, které zjednodušuje proces instalace a správy databáze. Docker je vhodný pro instalaci InfluxDB v situacích, kdy: * Je potřeba izolace a reproducibilita * Je potřeba rychlé nasazení pro vývoj a testování * Je důležité snadné nasazení a správa ===== Bezpečnost - API Tokeny ===== API tokeny jsou používány pro ověření a autorizaci přístupu k databázi a její API. API tokeny umožňují definovat, kteří uživatelé mají přístup k jakým datům a jaké operace mohou provádět. Tímto způsobem je možné efektivně řídit přístup k datům a zajistit, že pouze autorizovaní uživatelé mohou provádět určité operace. [[https://docs.influxdata.com/influxdb/v2/admin/tokens/|InfluxDB Docs - Manage API Tokens]] ==== Vytvoření tokenu ==== **1. Přihlášení do InfluxDB GUI** Otevřete webové rozhraní InfluxDB a přihlaste se pomocí svých přihlašovacích údajů. **2. Přejděte do sekce API Tokens** Postranní lišta -> Load Data -> API Tokens **3. Vytvoření nového API tokenu** + GENERATE API TOKEN -> All Access API Token -> Zadejte popis tokenu API -> SAVE. TBD: Custom API Token ===== InfluxDB API - Odesílání dat z Raspberry Pi ===== MicroPython nemá přímo integrovaný balíček pro vytvoření InfluxDB klienta. Nicméně, je možné využít modul [[https://makeblock-micropython-api.readthedocs.io/en/latest/public_library/Third-party-libraries/urequests.html|urequests]] pro odesílání HTTP požadavků, což nám umožní komunikovat s InfluxDB API. [[https://docs.influxdata.com/influxdb/v2/write-data/developer-tools/api/|InfluxDB Docs - Write data with the InfluxDB API]] import network from time import sleep, time import json import machine import random import urequests import gc PERIOD = 10 # Perioda odesilani dat last_temp = 23.5 # Pocatecni hodnota simulovanych dat SENSOR_ID = "rpipw-01" STUDENT_ID = "cvut-login" INFLUXDB_API_TOKEN = "api-token" INFLUXDB_SERVER = "influx-server" INFLUXDB_ORG = "influx-organization" INFLUXDB_BUCKET = "influx-bucket" INFLUXDB_MEASUREMENT = "influx-measurement" INTERNET_SSID = "wifi-ap-ssid" INTERNET_PASSWORD = "wifi-ap-password" def connect_to_wifi(): wlan = network.WLAN(network.STA_IF) wlan.active(True) wlan.connect(INTERNET_SSID, INTERNET_PASSWORD) while wlan.isconnected() == False: print('Waiting for WiFi connection...') sleep(1) print('Connected to WiFi:', wlan.ifconfig()) return wlan def send_data_to_influxdb(temperature): url = f"http://{INFLUXDB_SERVER}:8086/api/v2/write?org={INFLUXDB_ORG}&bucket={INFLUXDB_BUCKET}&precision=ns" headers = { 'Authorization': f'Token {INFLUXDB_API_TOKEN}', 'Content-Type': 'text/plain; charset=utf-8', 'Accept': 'application/json' } data = f"{INFLUXDB_MEASUREMENT},sensor_id={SENSOR_ID},student_id={STUDENT_ID} temperature={temperature}" try: response = urequests.post(url, headers=headers, data=data) if response.status_code == 204: print('Data successfully sent to InfluxDB! T = ', temperature, ' °C') gc.collect() return True else: print('Failed to send data to InfluxDB.') except Exception as e: print('Error sending data to InfluxDB:', e) return False try: wlan = connect_to_wifi() while True: new_temp = last_temp + random.uniform(-0.5, 0.5) if new_temp < 15: new_temp = 15 elif new_temp > 30: new_temp = 30 last_temp = new_temp send_data_to_influxdb(new_temp) sleep(PERIOD) except KeyboardInterrupt: machine.reset() HTTP požadavek odeslaný z Raspberry Pi do InfluxDB má následující strukturu. Metoda je ''POST''', což znamená, že požadavek je určen k odeslání dat na server. URL adresa požadavku je ''{INFLUXDB_SERVER}:8086/api/v2/write?org={INFLUXDB_ORG}&bucket={INFLUXDB_BUCKET}&precision=ns''. Tato URL obsahuje adresu (''INFLUXDB_SERVER'') a port (''8086'') serveru InfluxDB, cestu k API endpointu pro zápis dat (''/api/v2/write''), a parametry specifikující organizaci (''org={INFLUXDB_ORG}''), bucket (''bucket={INFLUXDB_BUCKET}'') a přesnost času pro data (''precision=ns''). Autorizační token je poskytnut v hlavičce Authorization, což umožňuje ověření požadavku. '''Authorization': f'Token {INFLUXDB_API_TOKEN}''' Typ obsahu požadavku je určen jako text/plain s kódováním UTF-8, a je specifikován v hlavičce Content-Type. '''Content-Type': 'text/plain; charset=utf-8''' Očekává se odpověď ve formátu JSON, což je specifikováno v hlavičce Accept. '''Accept': 'application/json''' Tělo požadavku obsahuje data ve formátu Line Protocol, které jsou formátovány jako řetězec s názvem měření, identifikátory senzoru a studenta, a hodnotou teploty. ===== Zpracování dat z InfluxDB pomocí Pandas ===== **Pandas** ([[https://pandas.pydata.org/docs/|pandas.pydata.org]]) Pandas je široce používaná knihovna v Pythonu, která poskytuje flexibilní datové struktury a nástroje pro manipulaci s daty. Umožňuje snadno manipulovat s velkými datovými sadami, provádět statistické analýzy, a efektivně ukládat a načítat data z různých zdrojů. Pandas poskytuje datový typ [[https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html|**DataFrame**]], který je ideální pro práci s tabulkovými daty. DataFrame je dvourozměrná datová struktura, která může obsahovat data různých typů (čísla, řetězce, datumy atd.) a umožňuje snadno provádět operace jako filtrování, agregaci, a transformaci dat. Pandas také poskytuje řadu funkcí pro čtení a zápis dat z různých formátů a zdrojů, včetně CSV souborů, Excel souborů, SQL databází, a z webových API. **InfluxDBClient** ([[https://influxdb-client.readthedocs.io/en/latest/index.html|influxdb-client.readthedocs.io]]) InfluxDBClient je knihovna, která poskytuje API pro práci s InfluxDB v Pythonu. Umožňuje snadno odesílat a načítat data z InfluxDB, provádět dotazy na data pomocí Flux nebo InfluxQL, a spravovat databáze a uživatele v InfluxDB. InfluxDBClient poskytuje třídu InfluxDBClient, která je hlavní třídou pro interakci s InfluxDB. Tato třída umožňuje vytvoření instance klienta, která je pak použita pro provádění operací s databází. InfluxDBClient také poskytuje další třídy a nástroje pro práci s daty v InfluxDB, včetně QueryAPI pro provádění dotazů a WriteAPI pro zápis dat. import pandas as pd from influxdb_client import InfluxDBClient INFLUXDB_URL = 'http://server:port' INFLUXDB_TOKEN = 'influx-api-token' INFLUXDB_ORG = 'influx-organization' INFLUXDB_BUCKET = 'influx-bucket' STUDENT_ID = 'cvut-login' MINUTES = 60 # Flux query flux_query = f""" from(bucket: "{INFLUXDB_BUCKET}") |> range(start: -{MINUTES}m) |> filter(fn: (r) => r._measurement == "102" and r.student_id == "{STUDENT_ID}") """ # Připojení k InfluxDB client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG) query_api = client.query_api() # Spuštění Flux query result = query_api.query(flux_query) # Zpracování výsledků data = [] for table in result: for record in table.records: data.append({ "time": record.get_time(), "measurement": record.get_measurement(), "student_id": record.values.get('student_id', ''), "sensor_id": record.values.get('sensor_id', ''), "temperature": record.get_value() }) # Převod na DataFrame df = pd.DataFrame(data) # Statistika print("Začátek: ", df['time'].min()) print("Konec: ", df['time'].max()) print("Minimální teplota:", df['temperature'].min()) print("Maximální teplota:", df['temperature'].max()) print("Průměrná teplota: ", df['temperature'].mean()) print("Počet záznamů: ", df['temperature'].count()) if len(df) > 1: df['time'] = pd.to_datetime(df['time']) time_diff = df['time'].diff().dt.total_seconds() print("Průměrná časová prodleva mezi záznamy (v sekundách):", time_diff.mean()) else: print("Nedostatek záznamů pro výpočet průměrné časové prodlevy.") # Uložení do CSV df.to_csv('output.csv', index=False) print("Data successfully saved to output.csv") Po spuštění Flux query kód prochází všechny tabulky a záznamy vrácené z InfluxDB a extrahuje z nich relevantní informace. Pro každý záznam se vytvoří slovník s klíči "time", "measurement", "student_id", "sensor_id" a "temperature", které obsahují čas záznamu, název měření, identifikátor studenta, identifikátor senzoru a hodnotu teploty. Tyto slovníky jsou pak přidány do seznamu data, který je převeden na Pandas DataFrame.