From 2ec22becd946287da18bae8d2df4410b955a103e Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Thu, 20 Feb 2025 14:30:44 -0500 Subject: [PATCH] pedantic scylla rust 2024 --- scylla-server/Cargo.toml | 2 +- .../src/controllers/car_command_controller.rs | 10 ++-- .../src/controllers/data_controller.rs | 8 +-- .../src/controllers/data_type_controller.rs | 8 +-- .../controllers/file_insertion_controller.rs | 21 +++++--- .../src/controllers/run_controller.rs | 22 +++++--- scylla-server/src/db_handler.rs | 33 +++++++----- scylla-server/src/error.rs | 4 +- scylla-server/src/lib.rs | 3 ++ scylla-server/src/main.rs | 25 +++++---- scylla-server/src/metadata_structs.rs | 7 +-- scylla-server/src/mqtt_processor.rs | 54 ++++++++++--------- scylla-server/src/services/data_service.rs | 17 ++++-- .../src/services/data_type_service.rs | 10 ++-- scylla-server/src/services/mod.rs | 2 + scylla-server/src/services/run_service.rs | 30 ++++++++--- scylla-server/src/socket_handler.rs | 35 +++++++----- scylla-server/tests/data_service_test.rs | 2 +- scylla-server/tests/data_type_service_test.rs | 10 ++-- scylla-server/tests/test_utils.rs | 2 +- 20 files changed, 190 insertions(+), 115 deletions(-) diff --git a/scylla-server/Cargo.toml b/scylla-server/Cargo.toml index f2ac549a..4b228378 100644 --- a/scylla-server/Cargo.toml +++ b/scylla-server/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "scylla-server" version = "0.0.1" -edition = "2021" +edition = "2024" default-run = "scylla-server" [dependencies] diff --git a/scylla-server/src/controllers/car_command_controller.rs b/scylla-server/src/controllers/car_command_controller.rs index dfb83039..24f5dd19 100644 --- a/scylla-server/src/controllers/car_command_controller.rs +++ b/scylla-server/src/controllers/car_command_controller.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use axum::{extract::Path, Extension}; +use axum::{Extension, extract::Path}; use axum_extra::extract::Query; use protobuf::Message; use rumqttc::v5::AsyncClient; @@ -19,10 +19,12 @@ pub struct ConfigRequest { /// Sends a configuration to the car over MQTT /// * `key` - The key of the configuration, as defined in the cangen YAML -/// * `data_query` - The data of the configuration, a URI query list of data=. If empty or too short, filled with cangen YAMl defaults +/// * `data_query` - The data of the configuration, a URI query list of data=. If empty or too short, filled with cangen YAML defaults /// * `client` - The MQTT client to be used to send the data /// -/// More info: This follows the specification of sending a command_data object over siren to topic CALYPSO_BIDIR_CMD_PREFIX/ +/// More info: This follows the specification of sending a `command_data` object over siren to topic `CALYPSO_BIDIR_CMD_PREFIX`/ +/// # Errors +/// Returns a scyllaError if the DB fails pub async fn send_config_command( Path(key): Path, Query(data_query): Query, @@ -48,7 +50,7 @@ pub async fn send_config_command( // publish the message to the topic that calypso's encoder is susbcribed to if let Err(err) = client .publish( - format!("{}{}", CALYPSO_BIDIR_CMD_PREFIX, key), + format!("{CALYPSO_BIDIR_CMD_PREFIX}{key}"), rumqttc::v5::mqttbytes::QoS::ExactlyOnce, false, bytes, diff --git a/scylla-server/src/controllers/data_controller.rs b/scylla-server/src/controllers/data_controller.rs index 837fcf1a..49073c31 100644 --- a/scylla-server/src/controllers/data_controller.rs +++ b/scylla-server/src/controllers/data_controller.rs @@ -1,14 +1,16 @@ use axum::{ - extract::{Path, State}, Json, + extract::{Path, State}, }; use crate::{ - error::ScyllaError, services::data_service, transformers::data_transformer::PublicData, - PoolHandle, + PoolHandle, error::ScyllaError, services::data_service, + transformers::data_transformer::PublicData, }; /// Get all of the data points of a certain data type name and run ID +/// # Errors +/// Returns a scyllaError if the DB fails pub async fn get_data( State(pool): State, Path((data_type_name, run_id)): Path<(String, i32)>, diff --git a/scylla-server/src/controllers/data_type_controller.rs b/scylla-server/src/controllers/data_type_controller.rs index ce958131..60294abd 100644 --- a/scylla-server/src/controllers/data_type_controller.rs +++ b/scylla-server/src/controllers/data_type_controller.rs @@ -1,11 +1,13 @@ -use axum::{extract::State, Json}; +use axum::{Json, extract::State}; use crate::{ - error::ScyllaError, services::data_type_service, - transformers::data_type_transformer::PublicDataType, PoolHandle, + PoolHandle, error::ScyllaError, services::data_type_service, + transformers::data_type_transformer::PublicDataType, }; /// Get a list of data types +/// # Errors +/// Returns a scyllaError if the DB fails pub async fn get_all_data_types( State(pool): State, ) -> Result>, ScyllaError> { diff --git a/scylla-server/src/controllers/file_insertion_controller.rs b/scylla-server/src/controllers/file_insertion_controller.rs index c7f78b23..6cb05f03 100644 --- a/scylla-server/src/controllers/file_insertion_controller.rs +++ b/scylla-server/src/controllers/file_insertion_controller.rs @@ -1,6 +1,6 @@ use axum::{ - extract::{Multipart, State}, Extension, + extract::{Multipart, State}, }; use axum_macros::debug_handler; use chrono::DateTime; @@ -10,12 +10,18 @@ use tokio::sync::mpsc; use tracing::{debug, info, trace, warn}; use crate::{ - error::ScyllaError, proto::playback_data, services::run_service, ClientData, PoolHandle, + ClientData, PoolHandle, error::ScyllaError, proto::playback_data, services::run_service, }; /// Inserts a file using http multipart /// This file is parsed and clientdata values are extracted, the run ID of each variable is inferred, and then data is batch uploaded +/// # Errors +/// Returns a scyllaError if the DB fails +/// # Panics +/// Panics if impossible time generated // super cool: adding this tag tells you what variable is misbehaving in cases of axum Send+Sync Handler fails +#[allow(clippy::cast_possible_wrap)] +#[allow(clippy::cast_sign_loss)] #[debug_handler] pub async fn insert_file( State(pool): State, @@ -62,19 +68,18 @@ pub async fn insert_file( match stream.read_message::() { Ok(f) => { trace!("Decoded file msg: {}", f); - let f = match run_rng.get(&f.time_us) { - Some(a) => ClientData { + let f = if let Some(a) = run_rng.get(&f.time_us) { + ClientData { run_id: *a, name: f.topic.clone(), unit: f.unit, values: f.values, timestamp: DateTime::from_timestamp_micros(f.time_us as i64) .unwrap(), - }, - None => { - count_bad_run += 1; - continue; } + } else { + count_bad_run += 1; + continue; }; insertable_data.push(f); } diff --git a/scylla-server/src/controllers/run_controller.rs b/scylla-server/src/controllers/run_controller.rs index d9837e70..046ab752 100644 --- a/scylla-server/src/controllers/run_controller.rs +++ b/scylla-server/src/controllers/run_controller.rs @@ -1,15 +1,17 @@ use std::sync::atomic::Ordering; use axum::{ - extract::{Path, State}, Json, + extract::{Path, State}, }; use crate::{ - error::ScyllaError, services::run_service, transformers::run_transformer::PublicRun, PoolHandle, + PoolHandle, error::ScyllaError, services::run_service, transformers::run_transformer::PublicRun, }; /// get a list of runs +/// # Errors +/// Returns a scyllaError if the DB fails pub async fn get_all_runs( State(pool): State, ) -> Result>, ScyllaError> { @@ -22,6 +24,8 @@ pub async fn get_all_runs( } /// get the latest run +/// # Errors +/// Returns a scyllaError if the DB fails pub async fn get_latest_run( State(pool): State, ) -> Result, ScyllaError> { @@ -34,6 +38,8 @@ pub async fn get_latest_run( } /// get a run given its ID +/// # Errors +/// Returns a scyllaError if the DB fails pub async fn get_run_by_id( State(pool): State, Path(run_id): Path, @@ -41,11 +47,9 @@ pub async fn get_run_by_id( let mut db = pool.get().await?; let run_data = run_service::get_run_by_id(&mut db, run_id).await?; - if run_data.is_none() { + let Some(run_data_safe) = run_data else { return Err(ScyllaError::EmptyResult); - } - - let run_data_safe = run_data.unwrap(); + }; let transformed_run_data = PublicRun::from(run_data_safe); @@ -54,6 +58,8 @@ pub async fn get_run_by_id( /// create a new run with an auto-incremented ID /// note the new run must be updated so the channel passed in notifies the data processor to use the new run +/// # Errors +/// Returns a scyllaError if the DB fails pub async fn new_run(State(pool): State) -> Result, ScyllaError> { let mut db = pool.get().await?; let run_data = run_service::create_run(&mut db, chrono::offset::Utc::now()).await?; @@ -68,6 +74,8 @@ pub async fn new_run(State(pool): State) -> Result, } /// creates a new run with all associated data (driver, location, notes) +/// # Errors +/// Returns a scyllaError if the DB fails pub async fn new_run_with_data( State(pool): State, Path((driver, location, run_notes)): Path<(String, String, String)>, @@ -92,6 +100,8 @@ pub async fn new_run_with_data( } /// updates a run's notes with a given run id +/// # Errors +/// Returns a scyllaError if the DB fails pub async fn update_run_with_data( State(pool): State, Path((run_id, driver, location, run_notes)): Path<(i32, String, String, String)>, diff --git a/scylla-server/src/db_handler.rs b/scylla-server/src/db_handler.rs index 91f4a9f7..df6dddbb 100644 --- a/scylla-server/src/db_handler.rs +++ b/scylla-server/src/db_handler.rs @@ -4,7 +4,7 @@ use tokio::sync::{broadcast, mpsc}; use tokio::time::Duration; use tokio_util::sync::CancellationToken; -use tracing::{debug, info, instrument, trace, warn, Level}; +use tracing::{Level, debug, info, instrument, trace, warn}; use crate::services::{data_service, data_type_service}; use crate::{ClientData, PoolHandle}; @@ -25,11 +25,12 @@ pub struct DbHandler { } /// Chunks a vec into roughly equal vectors all under size `max_chunk_size` -/// This precomputes vec capacity but does however call to_vec(), reallocating the slices -fn chunk_vec(input: Vec, max_chunk_size: usize) -> Vec> { - if max_chunk_size == 0 { - panic!("Maximum chunk size must be greater than zero"); - } +/// This precomputes vec capacity but does however call `to_vec()`, reallocating the slices +fn chunk_vec(input: &[T], max_chunk_size: usize) -> Vec> { + assert!( + max_chunk_size > 0, + "Maximum chunk size must be greater than zero" + ); let len = input.len(); if len == 0 { @@ -56,6 +57,7 @@ fn chunk_vec(input: Vec, max_chunk_size: usize) -> Vec> { impl DbHandler { /// Make a new db handler /// * `recv` - the broadcast reciver of which clientdata will be sent + #[must_use] pub fn new( receiver: broadcast::Receiver, pool: PoolHandle, @@ -80,7 +82,7 @@ impl DbHandler { ) { loop { tokio::select! { - _ = cancel_token.cancelled() => { + () = cancel_token.cancelled() => { let Ok(mut database) = pool.get().await else { warn!("Could not get connection for cleanup"); break; @@ -94,7 +96,7 @@ impl DbHandler { continue; } let chunk_size = final_msgs.len() / ((final_msgs.len() / 8190) + 1); - let chunks = chunk_vec(final_msgs, chunk_size); + let chunks = chunk_vec(&final_msgs, chunk_size); debug!("Batch uploading {} chunks in sequence", chunks.len()); for chunk in chunks { info!( @@ -116,7 +118,7 @@ impl DbHandler { } let msg_len = msgs.len(); let chunk_size = msg_len / ((msg_len / 8190) + 1); - let chunks = chunk_vec(msgs, chunk_size); + let chunks = chunk_vec(&msgs, chunk_size); info!("Batch uploading {} chunks in parrallel, {} messages.", chunks.len(), msg_len); for chunk in chunks { tokio::spawn(DbHandler::batch_upload(chunk, pool.clone())); @@ -138,7 +140,7 @@ impl DbHandler { ) { loop { tokio::select! { - _ = cancel_token.cancelled() => { + () = cancel_token.cancelled() => { warn!("Cancelling fake upload with {} batches left in queue!", batch_queue.len()); break; }, @@ -163,9 +165,16 @@ impl DbHandler { /// A loop which uses self and a sender channel to process data /// If the data is special, i.e. coordinates, driver, etc. it will store it in its special location of the db immediately - /// For all data points it will add the to the data_channel for batch uploading logic when a certain time has elapsed + /// For all data points it will add the to the `data_channel` for batch uploading logic when a certain time has elapsed /// Before this time the data is stored in an internal queue. /// On cancellation, the messages currently in the queue will be sent as a final flush of any remaining messages received before cancellation + /// # Panics + /// Panics if the last data sent to the thread fails to be sent + #[allow( + clippy::cast_sign_loss, + clippy::cast_precision_loss, + clippy::cast_possible_truncation + )] pub async fn handling_loop( mut self, data_channel: mpsc::Sender>, @@ -176,7 +185,7 @@ impl DbHandler { let mut max_batch_size = 2usize; loop { tokio::select! { - _ = cancel_token.cancelled() => { + () = cancel_token.cancelled() => { debug!("Pushing final messages to queue"); data_channel.send(self.data_queue).await.expect("Could not comm data to db thread, shutdown"); break; diff --git a/scylla-server/src/error.rs b/scylla-server/src/error.rs index e7bf009c..60d85f77 100644 --- a/scylla-server/src/error.rs +++ b/scylla-server/src/error.rs @@ -35,11 +35,11 @@ impl IntoResponse for ScyllaError { let (status, reason) = match self { ScyllaError::ConnError(error) => ( StatusCode::INTERNAL_SERVER_ERROR, - format!("Could not connect to db: {}", error), + format!("Could not connect to db: {error}"), ), ScyllaError::DbError(error) => ( StatusCode::BAD_REQUEST, - format!("Misc query error: {}", error), + format!("Misc query error: {error}"), ), ScyllaError::InvalidEncoding(reason) => (StatusCode::UNPROCESSABLE_ENTITY, reason), ScyllaError::CommFailure(reason) => (StatusCode::BAD_GATEWAY, reason), diff --git a/scylla-server/src/lib.rs b/scylla-server/src/lib.rs index c0f6ae49..ea6ce01f 100644 --- a/scylla-server/src/lib.rs +++ b/scylla-server/src/lib.rs @@ -1,3 +1,5 @@ +#![warn(clippy::pedantic)] + use chrono::serde::ts_milliseconds; pub mod controllers; @@ -15,6 +17,7 @@ pub mod models; #[allow(non_snake_case)] pub mod schema; +#[allow(clippy::pedantic)] pub mod proto; pub mod transformers; diff --git a/scylla-server/src/main.rs b/scylla-server/src/main.rs index 6b45160e..03f4c307 100755 --- a/scylla-server/src/main.rs +++ b/scylla-server/src/main.rs @@ -1,24 +1,29 @@ use std::{ - sync::{atomic::Ordering, Arc}, + sync::{Arc, atomic::Ordering}, time::Duration, }; use axum::{ + Extension, Router, extract::DefaultBodyLimit, http::Method, routing::{get, post}, - Extension, Router, }; use clap::Parser; use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; use diesel_async::{ - pooled_connection::{bb8::Pool, AsyncDieselConnectionManager}, AsyncConnection, AsyncPgConnection, + pooled_connection::{AsyncDieselConnectionManager, bb8::Pool}, }; -use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; +use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations}; use dotenvy::dotenv; use rumqttc::v5::AsyncClient; use scylla_server::{ + ClientData, RUN_ID, db_handler, + mqtt_processor::{MqttProcessor, MqttProcessorOptions}, +}; +use scylla_server::{ + RateLimitMode, controllers::{ self, car_command_controller::{self}, @@ -26,14 +31,8 @@ use scylla_server::{ }, services::run_service::{self}, socket_handler::{socket_handler, socket_handler_with_metadata}, - RateLimitMode, -}; -use scylla_server::{ - db_handler, - mqtt_processor::{MqttProcessor, MqttProcessorOptions}, - ClientData, RUN_ID, }; -use socketioxide::{extract::SocketRef, SocketIo}; +use socketioxide::{SocketIo, extract::SocketRef}; use tokio::{ signal, sync::{broadcast, mpsc}, @@ -45,7 +44,7 @@ use tower_http::{ trace::TraceLayer, }; use tracing::{debug, info, level_filters::LevelFilter}; -use tracing_subscriber::{fmt::format::FmtSpan, EnvFilter}; +use tracing_subscriber::{EnvFilter, fmt::format::FmtSpan}; #[cfg(not(target_env = "msvc"))] use tikv_jemallocator::Jemalloc; @@ -252,7 +251,7 @@ async fn main() -> Result<(), Box> { let (recv, opts) = MqttProcessor::new( mqtt_send, token.clone(), - MqttProcessorOptions { + &MqttProcessorOptions { mqtt_path: cli.siren_host_url, initial_run: curr_run.runId, static_rate_limit_time: cli.static_rate_limit_value, diff --git a/scylla-server/src/metadata_structs.rs b/scylla-server/src/metadata_structs.rs index 05c69e12..2e8a2449 100644 --- a/scylla-server/src/metadata_structs.rs +++ b/scylla-server/src/metadata_structs.rs @@ -1,5 +1,5 @@ use ::serde::Serialize; -use chrono::{serde::ts_milliseconds, DateTime, TimeDelta, Utc}; +use chrono::{DateTime, TimeDelta, Utc, serde::ts_milliseconds}; pub const DATA_SOCKET_KEY: &str = "data"; @@ -47,11 +47,11 @@ pub const FAULT_SOCKET_KEY: &str = "faults"; pub const FAULT_MIN_REG_GAP: TimeDelta = TimeDelta::seconds(8); pub const FAULT_BINS: &[&str] = &["DTI/Fault/FaultCode"]; + +#[must_use] pub const fn map_dti_flt(index: usize) -> Option<&'static str> { match index { - 0 => None, 1 => Some("Overvoltage"), - 2 => None, 3 => Some("DRV"), 4 => Some("ABS_Overcurrent"), 5 => Some("CTLR_Overtemp"), @@ -60,6 +60,7 @@ pub const fn map_dti_flt(index: usize) -> Option<&'static str> { 8 => Some("Sensor_general"), 9 => Some("CAN_command"), 0x0A => Some("Analog_input"), + // 0 and 2 are not an actual alerting fault _ => None, } } diff --git a/scylla-server/src/mqtt_processor.rs b/scylla-server/src/mqtt_processor.rs index d8158a05..da18bdfa 100644 --- a/scylla-server/src/mqtt_processor.rs +++ b/scylla-server/src/mqtt_processor.rs @@ -1,5 +1,5 @@ use std::{ - sync::{atomic::Ordering, Arc}, + sync::{Arc, atomic::Ordering}, time::{Duration, SystemTime}, }; @@ -7,16 +7,16 @@ use chrono::TimeDelta; use protobuf::Message; use ringbuffer::RingBuffer; use rumqttc::v5::{ - mqttbytes::v5::{Packet, Publish}, AsyncClient, Event, EventLoop, MqttOptions, + mqttbytes::v5::{Packet, Publish}, }; use rustc_hash::FxHashMap; use tokio::{sync::broadcast, time::Instant}; use tokio_util::sync::CancellationToken; -use tracing::{debug, instrument, trace, warn, Level}; +use tracing::{Level, debug, instrument, trace, warn}; use crate::{ - controllers::car_command_controller::CALYPSO_BIDIR_CMD_PREFIX, proto::serverdata, RateLimitMode, + RateLimitMode, controllers::car_command_controller::CALYPSO_BIDIR_CMD_PREFIX, proto::serverdata, }; use super::ClientData; @@ -56,11 +56,14 @@ impl MqttProcessor { /// * `channel` - The mpsc channel to send the database data to /// * `cancel_token` - The token which indicates cancellation of the task /// * `opts` - The mqtt processor options to use - /// Returns the instance and options to create a client, which is then used in the process_mqtt loop + /// Returns the instance and options to create a client, which is then used in the `process_mqtt` loop + /// # Panics + /// Panics if unexpected time is fetched, or the siren url format is incorrect + #[must_use] pub fn new( channel: broadcast::Sender, cancel_token: CancellationToken, - opts: MqttProcessorOptions, + opts: &MqttProcessorOptions, ) -> (MqttProcessor, MqttOptions) { // create the mqtt client and configure it let mut mqtt_opts = MqttOptions::new( @@ -99,8 +102,10 @@ impl MqttProcessor { } /// This handles the reception of mqtt messages, will not return - /// * `eventloop` - The eventloop returned by ::new to connect to. The loop isnt sync so this is the best that can be done + /// * `eventloop` - The eventloop returned by `::new` to connect to. The loop isnt sync so this is the best that can be done /// * `client` - The async mqttt v5 client to use for subscriptions + /// # Panics + /// Panics if it cannot subscribe to Siren messages pub async fn process_mqtt(mut self, client: Arc, mut eventloop: EventLoop) { // let mut latency_interval = tokio::time::interval(Duration::from_millis(250)); let mut latency_ringbuffer = ringbuffer::AllocRingBuffer::::new(20); @@ -114,7 +119,7 @@ impl MqttProcessor { loop { #[rustfmt::skip] // rust cannot format this macro for some reason tokio::select! { - _ = self.cancel_token.cancelled() => { + () = self.cancel_token.cancelled() => { debug!("Shutting down MQTT processor!"); break; }, @@ -122,12 +127,11 @@ impl MqttProcessor { Ok(Event::Incoming(Packet::Publish(msg))) => { trace!("Received mqtt message: {:?}", msg); // parse the message into the data and the node name it falls under - let msg = match self.parse_msg(msg) { - Some(msg) => msg, - None => continue + let Some(msg) = self.parse_msg(msg) else { + continue }; latency_ringbuffer.push(chrono::offset::Utc::now() - msg.timestamp); - self.send_db_msg(msg.clone()).await; + self.send_db_msg(msg.clone()); }, Err(msg) => trace!("Received mqtt error: {:?}", msg), _ => trace!("Received misc mqtt: {:?}", msg), @@ -158,6 +162,7 @@ impl MqttProcessor { /// * `msg` - The mqtt message to parse /// returns the ClientData, or the Err of something that can be debug printed #[instrument(skip(self), level = Level::TRACE)] + #[allow(clippy::cast_possible_wrap)] fn parse_msg(&mut self, msg: Publish) -> Option { let Ok(topic) = std::str::from_utf8(&msg.topic) else { warn!("Could not parse topic, topic: {:?}", msg.topic); @@ -178,10 +183,10 @@ impl MqttProcessor { if old.elapsed() < self.rate_limit_time { trace!("Static rate limit skipping message with topic {}", topic); return None; - } else { - // if the message is past the rate limit, continue with the parsing of it and mark the new time last received - self.rate_limiter.insert(topic.to_string(), Instant::now()); } + + // if the message is past the rate limit, continue with the parsing of it and mark the new time last received + self.rate_limiter.insert(topic.to_string(), Instant::now()); } else { // here is the first insertion of the topic (the first time we receive the topic in scylla's lifetime) self.rate_limiter.insert(topic.to_string(), Instant::now()); @@ -213,7 +218,7 @@ impl MqttProcessor { unix_time } else { // B - match match msg + if let Some(e) = match msg .properties .unwrap_or_default() .user_properties @@ -229,22 +234,21 @@ impl MqttProcessor { } None => None, } { - Some(e) => e, - None => { - // C - debug!("Could not extract time, using system time!"); - chrono::offset::Utc::now() - } + e + } else { + // C + debug!("Could not extract time, using system time!"); + chrono::offset::Utc::now() } }; // ts check for bad sources of time which may return 1970 // if both system time and packet timestamp are before year 2000, the message cannot be recorded let unix_clean = - if unix_time < chrono::DateTime::from_timestamp_millis(963014966000).unwrap() { + if unix_time < chrono::DateTime::from_timestamp_millis(963_014_966_000).unwrap() { debug!("Timestamp before year 2000: {}", unix_time.to_string()); let sys_time = chrono::offset::Utc::now(); - if sys_time < chrono::DateTime::from_timestamp_millis(963014966000).unwrap() { + if sys_time < chrono::DateTime::from_timestamp_millis(963_014_966_000).unwrap() { warn!("System has no good time, discarding message!"); return None; } @@ -264,7 +268,7 @@ impl MqttProcessor { /// Send a message to the channel, printing and IGNORING any error that may occur /// * `client_data` - The client data to send over the broadcast - async fn send_db_msg(&self, client_data: ClientData) { + fn send_db_msg(&self, client_data: ClientData) { if let Err(err) = self.channel.send(client_data) { warn!("Error sending through channel: {:?}", err); } diff --git a/scylla-server/src/services/data_service.rs b/scylla-server/src/services/data_service.rs index 35428e17..ee0bd485 100644 --- a/scylla-server/src/services/data_service.rs +++ b/scylla-server/src/services/data_service.rs @@ -1,7 +1,7 @@ use crate::{ + ClientData, Database, models::{Data, DataInsert}, schema::data::dsl::*, - ClientData, Database, }; use diesel::prelude::*; use diesel_async::RunQueryDsl; @@ -10,7 +10,9 @@ use diesel_async::RunQueryDsl; /// * `db` - The database connection to use /// * `data_type_name` - The data type name to filter the data by /// * `run_id` - The run id to filter the data -/// returns: A result containing the data or the error propogated by the db +/// returns: A result containing the data +/// # Errors +/// Gives a db error back pub async fn get_data( db: &mut Database<'_>, data_type_name: String, @@ -23,11 +25,13 @@ pub async fn get_data( /// Adds a datapoint /// * `db` - The database connection to use -/// * `serverdata` - The protobuf message to parse, note the unit is ignored! +/// * `client_data` - The client data to put into the database /// * `unix_time` - The time im miliseconds since unix epoch of the message /// * `data_type_name` - The name of the data type, note this data type must already exist! /// * `rin_id` - The run id to assign the data point to, note this run must already exist! -/// returns: A result containing the data or the QueryError propogated by the db +/// returns: A result containing the data +/// # Errors +/// Gives a db error back pub async fn add_data( db: &mut Database<'_>, client_data: ClientData, @@ -38,6 +42,11 @@ pub async fn add_data( .await } +/// Adds many datapoints +/// * `db` - The database connection to use +/// * `client_data` - The list of client datapoints to put into the database +/// # Errors +/// Gives a db error back pub async fn add_many( db: &mut Database<'_>, client_data: Vec, diff --git a/scylla-server/src/services/data_type_service.rs b/scylla-server/src/services/data_type_service.rs index 5df2850d..882d33d3 100644 --- a/scylla-server/src/services/data_type_service.rs +++ b/scylla-server/src/services/data_type_service.rs @@ -1,10 +1,12 @@ -use crate::{models::DataType, schema::data_type::dsl::*, Database}; +use crate::{Database, models::DataType, schema::data_type::dsl::*}; use diesel::prelude::*; use diesel_async::RunQueryDsl; /// Gets all datatypes /// * `d ` - The connection to the database -/// returns: A result containing the data or the QueryError propogated by the db +/// returns: A result containing the data +/// # Errors +/// Gives a db error back pub async fn get_all_data_types( db: &mut Database<'_>, ) -> Result, diesel::result::Error> { @@ -16,7 +18,9 @@ pub async fn get_all_data_types( /// * `data_type_name` - The data type name to upsert /// * `unit` - The unit of the data /// * `node_name` - The name of the node linked to the data type, must already exist! -/// returns: A result containing the data or the QueryError propogated by the db +/// returns: A result containing the data +/// # Errors +/// Gives a db error back pub async fn upsert_data_type( db: &mut Database<'_>, data_type_name: String, diff --git a/scylla-server/src/services/mod.rs b/scylla-server/src/services/mod.rs index 2de58ca6..3ab3a46b 100644 --- a/scylla-server/src/services/mod.rs +++ b/scylla-server/src/services/mod.rs @@ -1,3 +1,5 @@ +#![allow(clippy::wildcard_imports)] + pub mod data_service; pub mod data_type_service; pub mod run_service; diff --git a/scylla-server/src/services/run_service.rs b/scylla-server/src/services/run_service.rs index ca6f9eb0..1a29f355 100644 --- a/scylla-server/src/services/run_service.rs +++ b/scylla-server/src/services/run_service.rs @@ -1,18 +1,22 @@ -use crate::{models::Run, schema::run::dsl::*, Database}; +use crate::{Database, models::Run, schema::run::dsl::*}; use chrono::{DateTime, Utc}; use diesel::prelude::*; use diesel_async::RunQueryDsl; /// Gets all runs /// * `db` - The prisma client to make the call to -/// returns: A result containing the data or the QueryError propogated by the db +/// returns: A result containing the data +/// # Errors +/// Gives a db error back pub async fn get_all_runs(db: &mut Database<'_>) -> Result, diesel::result::Error> { run.order(runId.asc()).get_results(db).await } /// Gets the latest run (highest run id) /// * `db` - The prisma client to make the call to -/// returns: The latest run or the QueryError propogated by the db +/// returns: The latest run +/// # Errors +/// Gives a db error back pub async fn get_latest_run(db: &mut Database<'_>) -> Result { run.order(runId.desc()).first::(db).await } @@ -20,7 +24,9 @@ pub async fn get_latest_run(db: &mut Database<'_>) -> Result, run_id: i32, @@ -31,7 +37,9 @@ pub async fn get_run_by_id( /// Creates a run /// * `db` - The prisma client to make the call to /// * `timestamp` - time when the run starts -/// returns: A result containing the data or the QueryError propogated by the db +/// returns: A result containing the data +/// # Errors +/// Gives a db error back pub async fn create_run( db: &mut Database<'_>, timestamp: DateTime, @@ -46,7 +54,9 @@ pub async fn create_run( /// * `db` - The prisma client to make the call to /// * `timestamp` - time when the run starts /// * `run_id` - The id of the run to create, must not already be in use! -/// returns: A result containing the data or the QueryError propogated by the db +/// returns: A result containing the data +/// # Errors +/// Gives a db error back pub async fn create_run_with_id( db: &mut Database<'_>, timestamp: DateTime, @@ -64,7 +74,9 @@ pub async fn create_run_with_id( /// * `driver` - The driver's name /// * `location` - The location of the runs /// * `run_notes` - The notes written for the run -/// returns: A result containing the data or the QueryError propogated by the db +/// returns: A result containing the data +/// # Errors +/// Gives a db error back pub async fn create_run_with_data( db: &mut Database<'_>, timestamp: DateTime, @@ -89,7 +101,9 @@ pub async fn create_run_with_data( /// * `driver` - The driver's name /// * `location` - The location of the runs /// * `run_notes` - The updated run notes -/// returns: A result containing the data or the QueryError propogated by the db +/// returns: A result containing the data +/// # Errors +/// Gives a db error back pub async fn update_run_data_with_run_id( db: &mut Database<'_>, run_id: i32, diff --git a/scylla-server/src/socket_handler.rs b/scylla-server/src/socket_handler.rs index 89a686e6..416f87a7 100644 --- a/scylla-server/src/socket_handler.rs +++ b/scylla-server/src/socket_handler.rs @@ -1,3 +1,7 @@ +#![allow(clippy::cast_sign_loss)] +#![allow(clippy::cast_precision_loss)] +#![allow(clippy::cast_possible_truncation)] + use chrono::{DateTime, Utc}; use regex::Regex; use ringbuffer::{AllocRingBuffer, RingBuffer}; @@ -8,11 +12,11 @@ use tokio::sync::broadcast; use tokio_util::sync::CancellationToken; use tracing::{debug, trace, warn}; +use crate::ClientData; use crate::metadata_structs::{ - map_dti_flt, FaultData, Node, TimerData, DATA_SOCKET_KEY, FAULT_BINS, FAULT_MIN_REG_GAP, - FAULT_SOCKET_KEY, TIMERS_TOPICS, TIMER_SOCKET_KEY, + DATA_SOCKET_KEY, FAULT_BINS, FAULT_MIN_REG_GAP, FAULT_SOCKET_KEY, FaultData, Node, + TIMER_SOCKET_KEY, TIMERS_TOPICS, TimerData, map_dti_flt, }; -use crate::ClientData; pub async fn socket_handler( cancel_token: CancellationToken, @@ -23,7 +27,7 @@ pub async fn socket_handler( let mut upload_counter = 0u8; loop { tokio::select! { - _ = cancel_token.cancelled() => { + () = cancel_token.cancelled() => { debug!("Shutting down socket handler!"); break; }, @@ -34,6 +38,9 @@ pub async fn socket_handler( } } +/// Setup a socket handler with metadata +/// # Panics +/// Panics when regex is invalid on a given platform (unlikely) pub async fn socket_handler_with_metadata( cancel_token: CancellationToken, mut data_channel: broadcast::Receiver, @@ -51,7 +58,7 @@ pub async fn socket_handler_with_metadata( let mut timer_map: HashMap = HashMap::new(); for item in TIMERS_TOPICS { timer_map.insert( - item.to_string(), + (*item).to_string(), TimerData { topic: item, last_change: DateTime::UNIX_EPOCH, @@ -71,7 +78,7 @@ pub async fn socket_handler_with_metadata( loop { tokio::select! { - _ = cancel_token.cancelled() => { + () = cancel_token.cancelled() => { debug!("Shutting down socket handler!"); break; }, @@ -83,7 +90,7 @@ pub async fn socket_handler_with_metadata( &io, DATA_SOCKET_KEY, ).await; - handle_socket_msg(data, &fault_regex_mpu, &fault_regex_bms, &fault_regex_charger, &mut timer_map, &mut fault_ringbuffer); + handle_socket_msg(&data, &fault_regex_mpu, &fault_regex_bms, &fault_regex_charger, &mut timer_map, &mut fault_ringbuffer); } _ = recent_faults_interval.tick() => { send_socket_msg( @@ -92,7 +99,7 @@ pub async fn socket_handler_with_metadata( upload_ratio, &io, FAULT_SOCKET_KEY, - ).await + ).await; }, _ = timers_interval.tick() => { trace!("Sending Timers Intervals!"); @@ -106,7 +113,7 @@ pub async fn socket_handler_with_metadata( let sockets_cnt = io.sockets().len() as f32; let item = ClientData { name: "Argos/Viewers".to_string(), - unit: "".to_string(), + unit: String::new(), run_id: crate::RUN_ID.load(Ordering::Relaxed), timestamp: chrono::offset::Utc::now(), values: vec![sockets_cnt] @@ -125,7 +132,7 @@ pub async fn socket_handler_with_metadata( /// Handles parsing and creating metadata for a newly received socket message. fn handle_socket_msg( - data: ClientData, + data: &ClientData, fault_regex_mpu: &Regex, fault_regex_bms: &Regex, fault_regex_charger: &Regex, @@ -137,7 +144,7 @@ fn handle_socket_msg( if let Some(time) = timer_map.get_mut(&data.name) { trace!("Triggering timer: {}", data.name); let new_val = *data.values.first().unwrap_or(&-1f32); - if time.last_value != new_val { + if (time.last_value - new_val).abs() > 0.0001 { time.last_value = new_val; time.last_change = Utc::now(); } @@ -216,16 +223,16 @@ async fn send_socket_msg( ) .await { - Ok(_) => (), + Ok(()) => (), Err(err) => match err { socketioxide::BroadcastError::Socket(e) => { trace!("Socket: Transmit error: {:?}", e); } socketioxide::BroadcastError::Serialize(_) => { - warn!("Socket: Serialize error: {}", err) + warn!("Socket: Serialize error: {}", err); } socketioxide::BroadcastError::Adapter(_) => { - warn!("Socket: Adapter error: {}", err) + warn!("Socket: Adapter error: {}", err); } }, } diff --git a/scylla-server/tests/data_service_test.rs b/scylla-server/tests/data_service_test.rs index 075c6e74..71978899 100644 --- a/scylla-server/tests/data_service_test.rs +++ b/scylla-server/tests/data_service_test.rs @@ -2,10 +2,10 @@ mod test_utils; use scylla_server::{ + ClientData, models::Data, services::{data_service, data_type_service, run_service}, transformers::data_transformer::PublicData, - ClientData, }; use test_utils::cleanup_and_prepare; diff --git a/scylla-server/tests/data_type_service_test.rs b/scylla-server/tests/data_type_service_test.rs index 9ce49c4a..1f4d9955 100644 --- a/scylla-server/tests/data_type_service_test.rs +++ b/scylla-server/tests/data_type_service_test.rs @@ -2,8 +2,8 @@ mod test_utils; use diesel::{ - query_dsl::methods::{FilterDsl, SelectDsl}, ExpressionMethods, SelectableHelper, + query_dsl::methods::{FilterDsl, SelectDsl}, }; use diesel_async::RunQueryDsl; use scylla_server::{ @@ -18,9 +18,11 @@ async fn test_get_all_datatypes() -> Result<(), diesel::result::Error> { let mut db = pool.get().await.unwrap(); // ensure datatypes is empty - assert!(data_type_service::get_all_data_types(&mut db) - .await? - .is_empty()); + assert!( + data_type_service::get_all_data_types(&mut db) + .await? + .is_empty() + ); Ok(()) } diff --git a/scylla-server/tests/test_utils.rs b/scylla-server/tests/test_utils.rs index 767003a1..54621d94 100644 --- a/scylla-server/tests/test_utils.rs +++ b/scylla-server/tests/test_utils.rs @@ -1,8 +1,8 @@ use std::time::Duration; use diesel_async::{ - pooled_connection::{bb8::Pool, AsyncDieselConnectionManager}, AsyncPgConnection, RunQueryDsl, + pooled_connection::{AsyncDieselConnectionManager, bb8::Pool}, }; use dotenvy::dotenv; use scylla_server::schema::{data, data_type, run};