Search
Time Series Databáze (TSDB) jsou speciálně navrženy pro ukládání a analýzu časových řad, což jsou data, která jsou časově seřazena a obvykle obsahují časové razítko. Tento typ databáze je důležitý pro mnoho aplikací, které vyžadují sledování a analýzu časových trendů, jako jsou finanční trhy, meteorologické údaje, sledování provozu, a mnoho dalších.
Časové řady lze definovat jako sekvence datových bodů indexovaných časovými razítky. Každý datový bod v časové řadě je obvykle reprezentován jako dvojice hodnot: hodnota proměnné a časové razítko, které určuje, kdy byla hodnota zaznamená. Tento koncept je základem pro analýzu a modelování časových trendů a cyklů.
Časové řady mají několik vlastností, které je činí unikátními pro analýzu a ukládání dat:
TSDB nabízejí řadu výhod oproti tradičním relačním databázím (RDBMS) a NoSQL databázím, které jsou běžně používány pro ukládání a správu dat. TSDB jsou speciálně navrženy pro efektivní ukládání, správu a analýzu časových řad.
TSDB jsou preferovány pro aplikace, které vyžadují sledování a analýzu časových trendů a cyklů. Mezi tyto aplikace patří:
InfluxDB (https://www.influxdata.com/) je open-source TSDB vyvinutá společností InfluxData. Byla navržena speciálně pro efektivní ukládání, správu a analýzu časových řad. InfluxDB byla poprvé představena v roce 2013 a od té doby se stala jednou z nejpopulárnějších TSD na trhu.
InfluxDB je preferována pro práci s časovými řadami z několika důvodů:
Na trhu TSDB existuje několik hlavních konkurentů InfluxDB. Mezi nejznámější patří:
OpenTSDB (opentsdb.net) Open-source TSDB navržená pro ukládání, indexování a poskytování přístupu k velkým objemům časových dat. Je známá svou schopností efektivně ukládat a zpracovávat velké objemy dat, ale může být méně flexibilní v porovnání s InfluxDB.
TimescaleDB (timescale.com) Rozšíření PostgreSQL, které přidává podporu pro časové řady. Je známá svou silnou podporou SQL a schopností integrovat s existujícími PostgreSQL aplikacemi, ale může být méně optimalizována pro velmi vysoké objemy časových dat.
Prometheus (prometheus.io) Open-source systém pro monitorování a upozorňování, který je speciálně navržený pro sběr a ukládání časových dat z mnoha zdrojů. Je známý svou jednoduchostí a schopností efektivně monitorovat a upozorňovat na stav systémů a aplikací, ale může být méně flexibilní pro složitější analytické úlohy.
Graphite (graphiteapp.org) Starší a méně flexibilní TSD, která je stále používána pro sledování a ukládání časových dat. Má silnou podporu pro grafy a vizualizace, ale může být méně optimalizována pro velmi vysoké objemy dat.
Při výběru mezi InfluxDB a jeho alternativami lze zvážit následující faktory:
Základní funkce, jako je vytváření databází, tabulek a dotazování, se liší mezi InfluxDB a SQL databázemi kvůli jejich odlišným architekturám a účelům.
Databáze X Organizace, Buckety
V SQL databázích je databáze primární jednotkou pro organizaci dat.
V InfluxDB jsou databáze nahrazeny konceptem “organizací”, které slouží jako kontejnery pro “buckety”, což jsou fyzické úložiště dat.
Tabulky X Series
SQL databáze používají tabulky pro ukládání dat. Každá tabulka má pevně definované schéma, které určuje strukturu dat.
Schémata X Tagy, Fields
SQL databáze používají schémata k definování struktury tabulek. InfluxDB naopak používá “tags” a “fields” k popisu dat v series. “Tags” jsou klíčové dvojice použité pro indexování a filtrování dat, zatímco “fields” obsahují skutečná měřená data.
InfluxDB naopak používá “series”, které jsou dynamické a mohou obsahovat různé typy dat.
Dotazování - SQL X Flux
SQL databáze používají SQL (Structured Query Language) pro dotazování dat. SQL je silně typovaný a podporuje složité dotazy a transakce.
InfluxDB používá Flux, což je nový, funkční dotazovací jazyk navržený pro práci s časovými řadami.
Architektura InfluxDB a SQL databází se také liší, což ovlivňuje, jak ukládají a zpracovávají data.
InfluxDB je navržena tak, aby byla optimalizována pro časové řady. Její architektura je založena na shardingu dat, což umožňuje horizontální škálování a efektivní zpracování velkých objemů dat.
SQL databáze mají tradiční architekturu založenou na tabulkách a relacích. SQL databáze jsou optimalizovány pro transakční zpracování a podporují dotazy a transakce. Jejich architektura je založena na indexování a normalizaci dat.
Bucket
V InfluxDB je “bucket” fyzické úložiště dat, které může obsahovat data z různých “series”. Buckety jsou organizovány do “organizací”, což jsou kontejnery pro buckety.
Series
V InfluxDB je “series” sekvence datových bodů, které mají stejné measurements a tagy. Series jsou základní jednotkou pro ukládání a dotazování dat v InfluxDB.
Tags
V InfluxDB je “tag” klíčová dvojice použitá pro indexování a filtrování dat. Tags umožňují rychlé a efektivní dotazování dat na základě jejich atributů.
Fields
V InfluxDB je “field” skutečná měřená data uložená v series. Každý datový bod v series obsahuje jedno nebo více fields.
Meassurements
V InfluxDB je “measurement” název pro typ dat, které jsou uloženy v series. Measurements definují, jak jsou data organizována a jak mohou být dotazována.
airSensor,sensorId=A0100,station=Harbor humidityValue=35.0658,temperatureValue=21.667 1636729543000000000 waterQualitySensor,sensorId=W0101,station=Harbor pHValue=6.1,temperatureValue=16.103 1472515200000000000
Cloud
InfluxDB Cloud
Instalace InfluxDB v cloudu umožňuje uživatelům využít cloudové služby pro snadnou a rychlou instalaci a správu databáze. Pro instalaci InfluxDB v cloudu je možné využít služby poskytované společností InfluxData, jako je InfluxDB Cloud. Tato služba nabízí plánované a automatizované zálohy, škálování a aktualizace. Uživatelé mohou také využít InfluxDB 2.0, nejnovější verzi InfluxDB, která přináší řadu vylepšení a nových funkcí.
Cloudová instalace InfluxDB je nejvhodnější v situacích, kdy:
Vlastní instalace
Dokumentace
Instalace InfluxDB na vlastní server umožňuje uživatelům plnou kontrolu nad konfigurací a nastavením databáze.
Vlastní instalace InfluxDB je vhodná v situacích, kdy:
Docker
Docker Hub
Docker umožňuje snadnou a rychlou instalaci InfluxDB tím, že poskytuje kontejnerizované řešení, které zjednodušuje proces instalace a správy databáze.
Docker je vhodný pro instalaci InfluxDB v situacích, kdy:
API tokeny jsou používány pro ověření a autorizaci přístupu k databázi a její API. API tokeny umožňují definovat, kteří uživatelé mají přístup k jakým datům a jaké operace mohou provádět. Tímto způsobem je možné efektivně řídit přístup k datům a zajistit, že pouze autorizovaní uživatelé mohou provádět určité operace.
InfluxDB Docs - Manage API Tokens
1. Přihlášení do InfluxDB GUI
Otevřete webové rozhraní InfluxDB a přihlaste se pomocí svých přihlašovacích údajů.
2. Přejděte do sekce API Tokens
Postranní lišta → Load Data → API Tokens
3. Vytvoření nového API tokenu
+ GENERATE API TOKEN → All Access API Token → Zadejte popis tokenu API → SAVE.
TBD: Custom API Token
MicroPython nemá přímo integrovaný balíček pro vytvoření InfluxDB klienta. Nicméně, je možné využít modul urequests pro odesílání HTTP požadavků, což nám umožní komunikovat s InfluxDB API.
InfluxDB Docs - Write data with the InfluxDB API
import network from time import sleep, time import json import machine import random import urequests import gc PERIOD = 10 # Perioda odesilani dat last_temp = 23.5 # Pocatecni hodnota simulovanych dat SENSOR_ID = "rpipw-01" STUDENT_ID = "cvut-login" INFLUXDB_API_TOKEN = "api-token" INFLUXDB_SERVER = "influx-server" INFLUXDB_ORG = "influx-organization" INFLUXDB_BUCKET = "influx-bucket" INFLUXDB_MEASUREMENT = "influx-measurement" INTERNET_SSID = "wifi-ap-ssid" INTERNET_PASSWORD = "wifi-ap-password" def connect_to_wifi(): wlan = network.WLAN(network.STA_IF) wlan.active(True) wlan.connect(INTERNET_SSID, INTERNET_PASSWORD) while wlan.isconnected() == False: print('Waiting for WiFi connection...') sleep(1) print('Connected to WiFi:', wlan.ifconfig()) return wlan def send_data_to_influxdb(temperature): url = f"http://{INFLUXDB_SERVER}:8086/api/v2/write?org={INFLUXDB_ORG}&bucket={INFLUXDB_BUCKET}&precision=ns" headers = { 'Authorization': f'Token {INFLUXDB_API_TOKEN}', 'Content-Type': 'text/plain; charset=utf-8', 'Accept': 'application/json' } data = f"{INFLUXDB_MEASUREMENT},sensor_id={SENSOR_ID},student_id={STUDENT_ID} temperature={temperature}" try: response = urequests.post(url, headers=headers, data=data) if response.status_code == 204: print('Data successfully sent to InfluxDB! T = ', temperature, ' °C') gc.collect() return True else: print('Failed to send data to InfluxDB.') except Exception as e: print('Error sending data to InfluxDB:', e) return False try: wlan = connect_to_wifi() while True: new_temp = last_temp + random.uniform(-0.5, 0.5) if new_temp < 15: new_temp = 15 elif new_temp > 30: new_temp = 30 last_temp = new_temp send_data_to_influxdb(new_temp) sleep(PERIOD) except KeyboardInterrupt: machine.reset()
HTTP požadavek odeslaný z Raspberry Pi do InfluxDB má následující strukturu.
Metoda je POST', což znamená, že požadavek je určen k odeslání dat na server.
POST
URL adresa požadavku je {INFLUXDB_SERVER}:8086/api/v2/write?org={INFLUXDB_ORG}&bucket={INFLUXDB_BUCKET}&precision=ns. Tato URL obsahuje adresu (INFLUXDB_SERVER) a port (8086) serveru InfluxDB, cestu k API endpointu pro zápis dat (/api/v2/write), a parametry specifikující organizaci (org={INFLUXDB_ORG}), bucket (bucket={INFLUXDB_BUCKET}) a přesnost času pro data (precision=ns).
{INFLUXDB_SERVER}:8086/api/v2/write?org={INFLUXDB_ORG}&bucket={INFLUXDB_BUCKET}&precision=ns
INFLUXDB_SERVER
8086
/api/v2/write
org={INFLUXDB_ORG}
bucket={INFLUXDB_BUCKET}
precision=ns
Autorizační token je poskytnut v hlavičce Authorization, což umožňuje ověření požadavku. 'Authorization': f'Token {INFLUXDB_API_TOKEN}'
'Authorization': f'Token {INFLUXDB_API_TOKEN}
Typ obsahu požadavku je určen jako text/plain s kódováním UTF-8, a je specifikován v hlavičce Content-Type. 'Content-Type': 'text/plain; charset=utf-8'
'Content-Type': 'text/plain; charset=utf-8
Očekává se odpověď ve formátu JSON, což je specifikováno v hlavičce Accept. 'Accept': 'application/json'
'Accept': 'application/json
Tělo požadavku obsahuje data ve formátu Line Protocol, které jsou formátovány jako řetězec s názvem měření, identifikátory senzoru a studenta, a hodnotou teploty.
Pandas (pandas.pydata.org)
Pandas je široce používaná knihovna v Pythonu, která poskytuje flexibilní datové struktury a nástroje pro manipulaci s daty. Umožňuje snadno manipulovat s velkými datovými sadami, provádět statistické analýzy, a efektivně ukládat a načítat data z různých zdrojů.
Pandas poskytuje datový typ **DataFrame**, který je ideální pro práci s tabulkovými daty. DataFrame je dvourozměrná datová struktura, která může obsahovat data různých typů (čísla, řetězce, datumy atd.) a umožňuje snadno provádět operace jako filtrování, agregaci, a transformaci dat.
Pandas také poskytuje řadu funkcí pro čtení a zápis dat z různých formátů a zdrojů, včetně CSV souborů, Excel souborů, SQL databází, a z webových API.
InfluxDBClient (influxdb-client.readthedocs.io)
InfluxDBClient je knihovna, která poskytuje API pro práci s InfluxDB v Pythonu. Umožňuje snadno odesílat a načítat data z InfluxDB, provádět dotazy na data pomocí Flux nebo InfluxQL, a spravovat databáze a uživatele v InfluxDB.
InfluxDBClient poskytuje třídu InfluxDBClient, která je hlavní třídou pro interakci s InfluxDB. Tato třída umožňuje vytvoření instance klienta, která je pak použita pro provádění operací s databází. InfluxDBClient také poskytuje další třídy a nástroje pro práci s daty v InfluxDB, včetně QueryAPI pro provádění dotazů a WriteAPI pro zápis dat.
import pandas as pd from influxdb_client import InfluxDBClient INFLUXDB_URL = 'http://server:port' INFLUXDB_TOKEN = 'influx-api-token' INFLUXDB_ORG = 'influx-organization' INFLUXDB_BUCKET = 'influx-bucket' STUDENT_ID = 'cvut-login' MINUTES = 60 # Flux query flux_query = f""" from(bucket: "{INFLUXDB_BUCKET}") |> range(start: -{MINUTES}m) |> filter(fn: (r) => r._measurement == "102" and r.student_id == "{STUDENT_ID}") """ # Připojení k InfluxDB client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG) query_api = client.query_api() # Spuštění Flux query result = query_api.query(flux_query) # Zpracování výsledků data = [] for table in result: for record in table.records: data.append({ "time": record.get_time(), "measurement": record.get_measurement(), "student_id": record.values.get('student_id', ''), "sensor_id": record.values.get('sensor_id', ''), "temperature": record.get_value() }) # Převod na DataFrame df = pd.DataFrame(data) # Statistika print("Začátek: ", df['time'].min()) print("Konec: ", df['time'].max()) print("Minimální teplota:", df['temperature'].min()) print("Maximální teplota:", df['temperature'].max()) print("Průměrná teplota: ", df['temperature'].mean()) print("Počet záznamů: ", df['temperature'].count()) if len(df) > 1: df['time'] = pd.to_datetime(df['time']) time_diff = df['time'].diff().dt.total_seconds() print("Průměrná časová prodleva mezi záznamy (v sekundách):", time_diff.mean()) else: print("Nedostatek záznamů pro výpočet průměrné časové prodlevy.") # Uložení do CSV df.to_csv('output.csv', index=False) print("Data successfully saved to output.csv")
Po spuštění Flux query kód prochází všechny tabulky a záznamy vrácené z InfluxDB a extrahuje z nich relevantní informace. Pro každý záznam se vytvoří slovník s klíči “time”, “measurement”, “student_id”, “sensor_id” a “temperature”, které obsahují čas záznamu, název měření, identifikátor studenta, identifikátor senzoru a hodnotu teploty. Tyto slovníky jsou pak přidány do seznamu data, který je převeden na Pandas DataFrame.