diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..22768c7 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,78 @@ +[target.'cfg(all())'] +rustflags = [ + "-Funsafe_code", + "-Dclippy::all", + "-Wclippy::await_holding_lock", + "-Wclippy::char_lit_as_u8", + "-Wclippy::checked_conversions", + "-Wclippy::dbg_macro", + "-Wclippy::debug_assert_with_mut_call", + "-Wclippy::disallowed_methods", + "-Wclippy::disallowed_types", + "-Wclippy::empty_enum", + "-Wclippy::enum_glob_use", + "-Wclippy::exit", + "-Wclippy::expl_impl_clone_on_copy", + "-Wclippy::explicit_deref_methods", + "-Wclippy::explicit_into_iter_loop", + "-Wclippy::fallible_impl_from", + "-Wclippy::filter_map_next", + "-Wclippy::flat_map_option", + "-Wclippy::float_cmp_const", + "-Wclippy::fn_params_excessive_bools", + "-Wclippy::from_iter_instead_of_collect", + "-Wclippy::if_let_mutex", + "-Wclippy::implicit_clone", + "-Wclippy::imprecise_flops", + "-Dclippy::inefficient_to_string", + "-Wclippy::invalid_upcast_comparisons", + "-Wclippy::large_digit_groups", + "-Wclippy::large_stack_arrays", + "-Wclippy::large_types_passed_by_value", + "-Wclippy::let_unit_value", + "-Wclippy::linkedlist", + "-Wclippy::lossy_float_literal", + "-Wclippy::macro_use_imports", + "-Wclippy::manual_ok_or", + "-Wclippy::map_err_ignore", + "-Wclippy::map_flatten", + "-Wclippy::map_unwrap_or", + "-Wclippy::match_on_vec_items", + "-Wclippy::match_same_arms", + "-Wclippy::match_wild_err_arm", + "-Dclippy::match_wildcard_for_single_variants", + "-Wclippy::mem_forget", + "-Wclippy::mismatched_target_os", + "-Wclippy::missing_enforced_import_renames", + "-Wclippy::mut_mut", + "-Wclippy::mutex_integer", + "-Wclippy::needless_borrow", + "-Wclippy::needless_continue", + "-Wclippy::needless_for_each", + "-Wclippy::option_option", + "-Wclippy::path_buf_push_overwrite", + "-Wclippy::ptr_as_ptr", + "-Wclippy::rc_mutex", + "-Wclippy::ref_option_ref", + "-Wclippy::rest_pat_in_fully_bound_structs", + "-Wclippy::same_functions_in_if_condition", + "-Wclippy::semicolon_if_nothing_returned", + "-Wclippy::single_match_else", + "-Wclippy::string_add_assign", + "-Wclippy::string_add", + "-Wclippy::string_lit_as_bytes", + "-Wclippy::string_to_string", + "-Wclippy::todo", + "-Wclippy::trait_duplication_in_bounds", + "-Wclippy::unimplemented", + "-Wclippy::unnested_or_patterns", + "-Wclippy::unused_self", + "-Wclippy::useless_transmute", + "-Wclippy::verbose_file_reads", + "-Wclippy::zero_sized_map_values", + "-Wfuture_incompatible", + # TODO: reenable + # "-Wmissing_docs", + "-Wnonstandard_style", + "-Wrust_2018_idioms", +] diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml new file mode 100644 index 0000000..67f579c --- /dev/null +++ b/.github/workflows/deploy.yml @@ -0,0 +1,82 @@ +name: Deploy + +on: + push: + branches: + - main + workflow_dispatch: + +env: + slug: ghcr.io/fyko/ambient-weather-collector + dockerfile: ./Dockerfile + +jobs: + publish: + name: Publish Image + runs-on: ubuntu-latest + permissions: + packages: write + contents: read + steps: + - name: Checkout Repository + uses: actions/checkout@v4 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Cache Docker layers + uses: actions/cache@v3 + with: + path: /tmp/.buildx-cache + key: ${{ runner.os }}-buildx-${{ github.sha }} + restore-keys: | + ${{ runner.os }}-buildx- + + - name: Login to Google Container Registry + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Docker meta + id: meta + uses: docker/metadata-action@v5 + with: + images: ${{ env.slug }} + tags: | + type=raw,value=latest,enable=${{ github.ref == format('refs/heads/{0}', 'main') }} + type=sha + type=sha,format=long + #type=ref,enable=true,priority=600,prefix=,suffix=,event=pr + #type=ref,enable=true,priority=600,prefix=,suffix=,event=push + + - name: Build + uses: docker/build-push-action@v5 + with: + context: . + file: ${{ env.dockerfile }} + push: true + tags: ${{ steps.meta.outputs.tags }} + cache-from: type=local,src=/tmp/.buildx-cache + cache-to: type=local,dest=/tmp/.buildx-cache-new,mode=max + + # Temp fix + # https://github.com/docker/build-push-action/issues/252 + # https://github.com/moby/buildkit/issues/1896 + - name: Move cache + run: | + rm -rf /tmp/.buildx-cache + mv /tmp/.buildx-cache-new /tmp/.buildx-cache + + - name: Trigger Portainer Webhook + if: github.repository == 'Fyko/ambient-weather-collector' + shell: bash + run: | + webhook_url=$PORTAINER_DEPLOY_WEBHOOK + if [ -z "$webhook_url" ]; then + echo "PORTAINER_DEPLOY_WEBHOOK secret is not set" + exit 0 + fi + + curl -X POST ${{ secrets.PORTAINER_DEPLOY_WEBHOOK }} diff --git a/.gitignore b/.gitignore index 26efd8d..6df0159 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,4 @@ Cargo.lock *.pdb .envrc +.env diff --git a/Cargo.toml b/Cargo.toml index b358d0c..eb4b1fb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,13 @@ rust_socketio = { version = "0.6.0", features = ["async"] } secrecy = { version = "0.8.0", features = ["serde"] } serde = { version = "1.0.199", features = ["derive"] } serde_json = "1.0.116" +sqlx = { version = "0.7", features = [ + "runtime-tokio-rustls", + "postgres", + "uuid", + "time", + "json", +] } tokio = { version = "1.37.0", features = [ "rt", "rt-multi-thread", diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..e74bba7 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,21 @@ +FROM clux/muslrust:stable AS chef +USER root +RUN cargo install cargo-chef +WORKDIR /app + +FROM chef AS planner +COPY . . +RUN cargo chef prepare --recipe-path recipe.json + +FROM chef AS builder +COPY --from=planner /app/recipe.json recipe.json +RUN cargo chef cook --release --target x86_64-unknown-linux-musl --recipe-path recipe.json +COPY . . +RUN cargo build --release --target x86_64-unknown-linux-musl --bin ambient_weather_collector --no-default-features + +FROM alpine AS runtime +WORKDIR /app +COPY --from=builder /app/target/x86_64-unknown-linux-musl/release/ambient_weather_collector . + +RUN ls -la +CMD ["/app/ambient_weather_collector"] diff --git a/Makefile.toml b/Makefile.toml new file mode 100644 index 0000000..926e83a --- /dev/null +++ b/Makefile.toml @@ -0,0 +1,70 @@ +[tasks.setup] +script = ''' + echo installing git hooks + pre-commit --version || pip install pre-commit + pre-commit install || echo "failed to install git hooks!" 1>&2 + + echo installing stuff for tests + cargo binstall -y cargo-nextest + rustup component add clippy-preview +''' + +[tasks.dev] +command = "cargo" +env_files = ["./.env"] +args = ["run"] + +[tasks.lint] +install_crate = "clippy" +command = "cargo" +args = [ + "+nightly", + "clippy", + "--tests", + "--examples", + "--all-targets", + "--all-features", +] + +[tasks.fmt] +install_crate = "rustfmt" +command = "cargo" +args = ["fmt", "--all"] + +[tasks.lint-ci] +install_crate = "clippy" +command = "cargo" +args = [ + "clippy", + "--tests", + "--examples", + "--all-targets", + "--all-features", + "--workspace", + "--", + "-D", + "warnings", +] + +[tasks.fmt-ci] +install_crate = "rustfmt" +command = "cargo" +args = ["fmt", "--all", "--", "--check"] + +# runs tests +[tasks.test] +env = { "RUN_MODE" = "test", "RUST_LOG" = "info", "RUST_BACKTRACE" = 0 } +command = "cargo" +args = [ + "nextest", + "run", + "--examples", + "--all-targets", + "--all-features", + "${@}", +] + +[tasks.test-ci] +env = { "RUN_MODE" = "ci", "RUST_LOG" = "info" } +command = "cargo" +args = ["nextest", "run", "--examples", "--all-targets", "--all-features"] diff --git a/datagrip/.idea/.gitignore b/datagrip/.idea/.gitignore new file mode 100644 index 0000000..8bf4d45 --- /dev/null +++ b/datagrip/.idea/.gitignore @@ -0,0 +1,6 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/datagrip/.idea/.name b/datagrip/.idea/.name new file mode 100644 index 0000000..a1199e4 --- /dev/null +++ b/datagrip/.idea/.name @@ -0,0 +1 @@ +ambient-weather-collector \ No newline at end of file diff --git a/datagrip/.idea/ambient-weather-collector.iml b/datagrip/.idea/ambient-weather-collector.iml new file mode 100644 index 0000000..0399c4b --- /dev/null +++ b/datagrip/.idea/ambient-weather-collector.iml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/datagrip/.idea/dataSources.xml b/datagrip/.idea/dataSources.xml new file mode 100644 index 0000000..5fc64d4 --- /dev/null +++ b/datagrip/.idea/dataSources.xml @@ -0,0 +1,12 @@ + + + + + postgresql + true + org.postgresql.Driver + jdbc:postgresql://localhost:5432/ambient_weather + $ProjectFileDir$ + + + \ No newline at end of file diff --git a/datagrip/.idea/modules.xml b/datagrip/.idea/modules.xml new file mode 100644 index 0000000..0b84b7c --- /dev/null +++ b/datagrip/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/datagrip/.idea/vcs.xml b/datagrip/.idea/vcs.xml new file mode 100644 index 0000000..6c0b863 --- /dev/null +++ b/datagrip/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/docker-compose.portainer.yml b/docker-compose.portainer.yml new file mode 100644 index 0000000..4072019 --- /dev/null +++ b/docker-compose.portainer.yml @@ -0,0 +1,38 @@ +services: + collector: + image: ghcr.io/fyko/ambient-weather-collector:latest + restart: unless-stopped + environment: + - AMBIENT_WEATHER_API_KEY=${AMBIENT_WEATHER_API_KEY:?ambient weather api key required} + - AMBIENT_WEATHER_APPLICATION_KEY=${AMBIENT_WEATHER_APPLICATION_KEY:?ambient weather application key required} + - DATABASE_URL=${DATABASE_URL:?database name required} + + timescale: + image: timescale/timescaledb-ha:pg16 + restart: unless-stopped + environment: + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:?database password required} + volumes: + - timescale-storage:/home/postgres/pgdata/data + - ./docker/postgres/migrations:/docker-entrypoint-initdb.d + - ./docker/postgres/postgresql.conf/:/etc/postgresql/postgresql.conf + healthcheck: + test: + [ + "CMD-SHELL", + "pg_isready -d $${POSTGRES_DATABASE} -U $${POSTGRES_USER}", + ] + start_period: 20s + interval: 30s + retries: 5 + timeout: 5s + deploy: + resources: + limits: + cpus: "4" + memory: 2G + ports: + - 8432:8432 + +volumes: + timescale-storage: diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..06de606 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,30 @@ +services: + timescale: + image: timescale/timescaledb-ha:pg16 + restart: unless-stopped + environment: + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:?database password required} + volumes: + - timescale-storage:/home/postgres/pgdata/data + - ./docker/postgres/migrations:/docker-entrypoint-initdb.d + - ./docker/postgres/postgresql.conf/:/etc/postgresql/postgresql.conf + healthcheck: + test: + [ + "CMD-SHELL", + "pg_isready -d $${POSTGRES_DATABASE} -U $${POSTGRES_USER}", + ] + start_period: 20s + interval: 30s + retries: 5 + timeout: 5s + deploy: + resources: + limits: + cpus: "4" + memory: 2G + ports: + - 5432:5432 + +volumes: + timescale-storage: diff --git a/docker/postgres/migrations/init.sql b/docker/postgres/migrations/init.sql new file mode 100644 index 0000000..a817cde --- /dev/null +++ b/docker/postgres/migrations/init.sql @@ -0,0 +1 @@ +create database ambient_weather; diff --git a/docker/postgres/postgresql.conf b/docker/postgres/postgresql.conf new file mode 100644 index 0000000..bc87f8c --- /dev/null +++ b/docker/postgres/postgresql.conf @@ -0,0 +1,25 @@ +# DB Version: 16 +# OS Type: linux +# DB Type: dw +# Total Memory (RAM): 2 GB +# CPUs num: 4 +# Connections num: 500 +# Data Storage: hdd + +max_connections = 500 +shared_buffers = 512MB +effective_cache_size = 1536MB +maintenance_work_mem = 256MB +checkpoint_completion_target = 0.9 +wal_buffers = 16MB +default_statistics_target = 500 +random_page_cost = 4 +effective_io_concurrency = 2 +work_mem = 262kB +huge_pages = off +min_wal_size = 4GB +max_wal_size = 16GB +max_worker_processes = 4 +max_parallel_workers_per_gather = 2 +max_parallel_workers = 4 +max_parallel_maintenance_workers = 2 diff --git a/migrations/20240429155325_init.down.sql b/migrations/20240429155325_init.down.sql new file mode 100644 index 0000000..d2f607c --- /dev/null +++ b/migrations/20240429155325_init.down.sql @@ -0,0 +1 @@ +-- Add down migration script here diff --git a/migrations/20240429155325_init.up.sql b/migrations/20240429155325_init.up.sql new file mode 100644 index 0000000..16a1647 --- /dev/null +++ b/migrations/20240429155325_init.up.sql @@ -0,0 +1,32 @@ +create extension if not exists timescaledb; +create table if not exists sensor_data ( + time timestamptz not null default now(), + mac_address text not null, + baromabsin double precision not null, + baromrelin double precision not null, + battout double precision not null, + dailyrainin double precision not null, + dew_point double precision not null, + dew_pointin double precision not null, + eventrainin double precision not null, + feels_like double precision not null, + feels_likein double precision not null, + hourlyrainin double precision not null, + humidity double precision not null, + humidityin double precision not null, + maxdailygust double precision not null, + monthlyrainin double precision not null, + solarradiation double precision not null, + tempf double precision not null, + tempinf double precision not null, + totalrainin double precision not null, + uv double precision not null, + weeklyrainin double precision not null, + winddir double precision not null, + windgustmph double precision not null, + windspeedmph double precision not null, + yearlyrainin double precision not null +); +create unique index if not exists idx_sensor_data_time_mac_address_idx on sensor_data (mac_address, time); +select create_hypertable('sensor_data', by_range('time')); +select add_dimension('sensor_data', by_hash('mac_address', 4)); diff --git a/src/ambient_api/device.rs b/src/ambient_api/device.rs index 672bba9..6ef5c74 100644 --- a/src/ambient_api/device.rs +++ b/src/ambient_api/device.rs @@ -47,25 +47,25 @@ pub struct Geo { #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct LastData { - pub dateutc: i64, + pub dateutc: f64, pub tempf: f64, - pub humidity: i64, - pub windspeedmph: i64, - pub windgustmph: i64, + pub humidity: f64, + pub windspeedmph: f64, + pub windgustmph: f64, pub maxdailygust: f64, - pub winddir: i64, - pub uv: i64, + pub winddir: f64, + pub uv: f64, pub solarradiation: f64, - pub hourlyrainin: i64, - pub eventrainin: i64, - pub dailyrainin: i64, - pub weeklyrainin: i64, - pub monthlyrainin: i64, - pub yearlyrainin: i64, - pub totalrainin: i64, - pub battout: i64, + pub hourlyrainin: f64, + pub eventrainin: f64, + pub dailyrainin: f64, + pub weeklyrainin: f64, + pub monthlyrainin: f64, + pub yearlyrainin: f64, + pub totalrainin: f64, + pub battout: f64, pub tempinf: f64, - pub humidityin: i64, + pub humidityin: f64, pub baromrelin: f64, pub baromabsin: f64, pub feels_like: f64, diff --git a/src/ambient_api/device_data.rs b/src/ambient_api/device_data.rs index c55ea0b..39c102c 100644 --- a/src/ambient_api/device_data.rs +++ b/src/ambient_api/device_data.rs @@ -26,7 +26,7 @@ pub struct ApiDeviceData { pub uv: i64, pub weeklyrainin: i64, pub winddir: i64, - pub windgustmph: i64, + pub windgustmph: f64, pub windspeedmph: i64, pub yearlyrainin: i64, } diff --git a/src/ambient_api/ws.rs b/src/ambient_api/ws.rs index 09513bb..326e9ac 100644 --- a/src/ambient_api/ws.rs +++ b/src/ambient_api/ws.rs @@ -29,37 +29,37 @@ pub struct WsDeviceData { pub baromrelin: f64, /// Outdoor battery status: 1 = OK, 0 = Low. For Meteobridge users: 1 = Low, /// 0 = OK. - pub battout: i64, + pub battout: f64, /// Daily rainfall in inches. - pub dailyrainin: i64, + pub dailyrainin: f64, /// Human-readable date (converted on server from `dateutc`). pub date: String, /// Date and time in milliseconds from 01-01-1970, rounded down to the /// nearest minute. - pub dateutc: i64, + pub dateutc: f64, /// Dew point in degrees Fahrenheit (calculated on server). pub dew_point: f64, /// Indoor dew point in degrees Fahrenheit. pub dew_pointin: f64, /// Rainfall for the current event in inches. - pub eventrainin: i64, + pub eventrainin: f64, /// Feels like temperature in degrees Fahrenheit (calculated on server, /// applies formula based on temperature). pub feels_like: f64, /// Indoor feels like temperature in degrees Fahrenheit. pub feels_likein: f64, /// Hourly rainfall in inches. - pub hourlyrainin: i64, + pub hourlyrainin: f64, /// Outdoor humidity percentage. - pub humidity: i64, + pub humidity: f64, /// Indoor humidity percentage. - pub humidityin: i64, + pub humidityin: f64, /// MAC address of the device. pub mac_address: String, /// Maximum wind gust speed in the last day in miles per hour. pub maxdailygust: f64, /// Monthly rainfall in inches. - pub monthlyrainin: i64, + pub monthlyrainin: f64, /// Solar radiation in watts per square meter. pub solarradiation: f64, /// Outdoor temperature in degrees Fahrenheit. @@ -67,17 +67,17 @@ pub struct WsDeviceData { /// Indoor temperature in degrees Fahrenheit. pub tempinf: f64, /// Total rainfall in inches since the last factory reset. - pub totalrainin: i64, + pub totalrainin: f64, /// Ultra-Violet radiation index (integer, except on model WS-8478). - pub uv: i64, + pub uv: f64, /// Weekly rainfall in inches. - pub weeklyrainin: i64, + pub weeklyrainin: f64, /// Instantaneous wind direction in degrees (0-360). - pub winddir: i64, + pub winddir: f64, /// Maximum wind speed in the last 10 minutes in miles per hour. - pub windgustmph: i64, + pub windgustmph: f64, /// Instantaneous wind speed in miles per hour. - pub windspeedmph: i64, + pub windspeedmph: f64, /// Yearly rainfall in inches. - pub yearlyrainin: i64, + pub yearlyrainin: f64, } diff --git a/src/main.rs b/src/main.rs index 8f05da0..3386ea5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,9 @@ use std::env; -use std::sync::Arc; use ambient_api::ws::{WsDeviceData, WsSubscribedPayload}; use error::BoxDynError; use futures_util::FutureExt; -use rust_socketio::asynchronous::ClientBuilder; +use rust_socketio::asynchronous::{Client as SocketIoClient, ClientBuilder}; use rust_socketio::{Payload, TransportType}; use serde_json::json; use tracing_subscriber::prelude::*; @@ -12,6 +11,27 @@ use tracing_subscriber::{fmt, EnvFilter, Registry}; pub mod ambient_api; pub mod error; +pub mod timescale; + +async fn handle_subscribed(data: Payload, _client: SocketIoClient) { + let payload = match data { + Payload::Text(value) => { + let first = &value[0]; + serde_json::from_value::(first.clone()) // ouch + } + Payload::Binary(bytes) => serde_json::from_slice::(&bytes), + _ => panic!("unexpected payload"), + } + .expect("failed to parse payload"); + + let macs = payload + .devices + .iter() + .map(|d| d.mac_address.as_ref()) + .collect::>(); + + tracing::info!("successfully subscribed to {}", macs.join(", ")); +} #[tokio::main] async fn main() -> Result<(), BoxDynError> { @@ -20,48 +40,22 @@ async fn main() -> Result<(), BoxDynError> { .with(fmt::layer()) .init(); - let api_key = Arc::new(env::var("AMBIENT_WEATHER_API_KEY").expect("AMBIENT_WEATHER_API_KEY must be set")); let application_key = env::var("AMBIENT_WEATHER_APPLICATION_KEY").expect("AMBIENT_WEATHER_APPLICATION_KEY must be set"); + let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set"); - // let client = AmbientApiClient::new(api_key.clone(), application_key.clone()); - // let devices = client.get_devices().await?; - // tracing::info!("devices: {devices:#?}"); - // tokio::time::sleep(Duration::from_millis(1_500)).await; - - // let mac_address = devices[0].mac_address.clone(); - // let data = client.get_device_data(&mac_address).await?; - // tracing::info!("data: {data:#?}"); + let db = timescale::create_db(&database_url).await; let socket = ClientBuilder::new(format!( "https://rt2.ambientweather.net/?api=1&applicationKey={application_key}" )) .transport_type(TransportType::Websocket) - .on("subscribed", |data, _| { - async move { - let payload = match data { - Payload::Text(value) => { - let first = &value[0]; - serde_json::from_value::(first.clone()) // ouch - } - Payload::Binary(bytes) => serde_json::from_slice::(&bytes), - _ => panic!("unexpected payload"), - } - .expect("failed to parse payload"); - - let macs = payload - .devices - .iter() - .map(|d| d.mac_address.as_ref()) - .collect::>(); - - tracing::info!("successfully subscribed to {}", macs.join(", ")); - } - .boxed() + .on("subscribed", |data, socket| { + async move { handle_subscribed(data, socket).await }.boxed() }) - .on("data", |data, _| { + .on("data", move |data, _| { + let db_clone = db.clone(); async move { - tracing::debug!("received data: {data:#?}"); let payload = match data { Payload::Text(value) => { let first = &value[0]; @@ -72,7 +66,10 @@ async fn main() -> Result<(), BoxDynError> { } .expect("failed to parse payload"); - tracing::info!("received data from {}", payload.mac_address); + timescale::insert_ws_data(&db_clone, &payload) + .await + .expect("failed to insert data"); + tracing::info!("received and inserted data from {}", payload.mac_address); } .boxed() }) @@ -84,9 +81,9 @@ async fn main() -> Result<(), BoxDynError> { }) .on("open", { |_, client| { - async move { - let api_key = env::var("AMBIENT_WEATHER_API_KEY").expect("AMBIENT_WEATHER_API_KEY must be set"); + let api_key = env::var("AMBIENT_WEATHER_API_KEY").expect("AMBIENT_WEATHER_API_KEY must be set"); + async move { let subscribe_message = json!({ "apiKeys": [api_key], }); diff --git a/src/timescale.rs b/src/timescale.rs new file mode 100644 index 0000000..891e3a4 --- /dev/null +++ b/src/timescale.rs @@ -0,0 +1,86 @@ +use std::sync::Arc; +use std::time::Duration; + +use sqlx::postgres::{PgConnectOptions, PgPoolOptions}; +use sqlx::{ConnectOptions, PgPool}; + +use crate::ambient_api::ws::WsDeviceData; +use crate::error::BoxDynError; + +pub async fn create_db(database_url: &String) -> Arc { + let pool = PgPoolOptions::new() + .max_connections(100) + .min_connections(5) + .acquire_timeout(Duration::from_secs(8)) + .idle_timeout(Duration::from_secs(8)) + .max_lifetime(Duration::from_secs(60)); + + tracing::info!("connecting to database {}", database_url); + + let mut opts: PgConnectOptions = database_url.parse().expect("failed to parse database url"); + opts = opts.log_statements(tracing::log::LevelFilter::Trace); + + let db = pool.connect_with(opts).await.expect("Failed to connect to database"); + + Arc::new(db) +} + +pub async fn insert_ws_data(db: &PgPool, payload: &WsDeviceData) -> Result<(), BoxDynError> { + sqlx::query!(r#" + insert into sensor_data( + mac_address, + baromabsin, + baromrelin, + battout, + dailyrainin, + dew_point, + dew_pointin, + eventrainin, + feels_like, + feels_likein, + hourlyrainin, + humidity, + humidityin, + maxdailygust, + monthlyrainin, + solarradiation, + tempf, + tempinf, + totalrainin, + uv, + weeklyrainin, + winddir, + windgustmph, + windspeedmph, + yearlyrainin + ) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25) + returning *;"#, + payload.mac_address, + payload.baromabsin, + payload.baromrelin, + payload.battout, + payload.dailyrainin, + payload.dew_point, + payload.dew_pointin, + payload.eventrainin, + payload.feels_like, + payload.feels_likein, + payload.hourlyrainin, + payload.humidity, + payload.humidityin, + payload.maxdailygust, + payload.monthlyrainin, + payload.solarradiation, + payload.tempf, + payload.tempinf, + payload.totalrainin, + payload.uv, + payload.weeklyrainin, + payload.winddir, + payload.windgustmph, + payload.windspeedmph, + payload.yearlyrainin + ).fetch_one(db).await.expect("failed to insert data"); + + Ok(()) +}