diff --git a/cpp/csp/adapters/CMakeLists.txt b/cpp/csp/adapters/CMakeLists.txt index 41a929dcf..67dc0025a 100644 --- a/cpp/csp/adapters/CMakeLists.txt +++ b/cpp/csp/adapters/CMakeLists.txt @@ -11,4 +11,5 @@ if(CSP_BUILD_WS_CLIENT_ADAPTER) add_subdirectory(websocket) endif() +add_subdirectory(arrow) add_subdirectory(utils) diff --git a/cpp/csp/adapters/arrow/ArrowFieldReader.cpp b/cpp/csp/adapters/arrow/ArrowFieldReader.cpp new file mode 100644 index 000000000..f8c16380c --- /dev/null +++ b/cpp/csp/adapters/arrow/ArrowFieldReader.cpp @@ -0,0 +1,494 @@ +// Concrete FieldReader implementations for all scalar Arrow types. +// +// Each reader extends FieldReader directly and implements doReadNext(). +// NestedStructReader binds children to child arrays and uses readNext()/skipNext(). + +#include +#include +#include + +#include +#include +#include + +namespace csp::adapters::arrow +{ + +namespace +{ + +// Helper: compute nanosecond multiplier for a given arrow::TimeUnit +int64_t timeUnitMultiplier( ::arrow::TimeUnit::type unit ) +{ + switch( unit ) + { + case ::arrow::TimeUnit::SECOND: return csp::NANOS_PER_SECOND; + case ::arrow::TimeUnit::MILLI: return csp::NANOS_PER_MILLISECOND; + case ::arrow::TimeUnit::MICRO: return csp::NANOS_PER_MICROSECOND; + case ::arrow::TimeUnit::NANO: return 1LL; + } + CSP_THROW( TypeError, "Unexpected arrow TimeUnit: " << static_cast( unit ) ); +} + +// --- Primitive numeric readers (int/uint/float/bool) --- + +template +class PrimitiveReader final : public FieldReader +{ +public: + using FieldReader::FieldReader; + +protected: + void doReadNext( int64_t row, Struct * s ) override + { + auto & typed = static_cast( *m_column ); + if( typed.IsValid( row ) ) + m_field -> setValue( s, static_cast( typed.Value( row ) ) ); + } +}; + +// --- Half-float reader (special: Value() returns uint16 bits, needs Float16 conversion) --- + +class HalfFloatReader final : public FieldReader +{ +public: + using FieldReader::FieldReader; + +protected: + void doReadNext( int64_t row, Struct * s ) override + { + auto & typed = static_cast( *m_column ); + if( typed.IsValid( row ) ) + m_field -> setValue( s, ::arrow::util::Float16::FromBits( typed.Value( row ) ).ToDouble() ); + } +}; + +// --- String readers --- + +template +class StringReader final : public FieldReader +{ +public: + using FieldReader::FieldReader; + +protected: + void doReadNext( int64_t row, Struct * s ) override + { + auto & typed = static_cast( *m_column ); + if( typed.IsValid( row ) ) + { + auto view = typed.GetView( row ); + m_field -> setValue( s, std::string( view.data(), view.size() ) ); + } + } +}; + +// --- Enum from string column --- + +template +class EnumFromStringReader final : public FieldReader +{ +public: + EnumFromStringReader( const std::string & columnName, const StructFieldPtr & field ) + : FieldReader( columnName, field ), + m_enumMeta( std::static_pointer_cast( field -> type() ) -> meta() ) + { + } + +protected: + void doReadNext( int64_t row, Struct * s ) override + { + auto & typed = static_cast( *m_column ); + if( typed.IsValid( row ) ) + { + auto view = typed.GetView( row ); + // fromString requires null-terminated C string; view may not be null-terminated + m_tmpStr.assign( view.data(), view.size() ); + m_field -> setValue( s, m_enumMeta -> fromString( m_tmpStr.c_str() ) ); + } + } + +private: + std::shared_ptr m_enumMeta; + mutable std::string m_tmpStr; // reused buffer to avoid per-row allocation +}; + +// --- Binary / bytes readers --- + +template +class BytesReader final : public FieldReader +{ +public: + using FieldReader::FieldReader; + +protected: + void doReadNext( int64_t row, Struct * s ) override + { + auto & typed = static_cast( *m_column ); + if( typed.IsValid( row ) ) + { + auto view = typed.GetView( row ); + m_field -> setValue( s, std::string( view.data(), view.size() ) ); + } + } +}; + +// --- Timestamp -> DateTime --- + +class TimestampReader final : public FieldReader +{ +public: + TimestampReader( const std::string & columnName, const StructFieldPtr & field, int64_t multiplier ) + : FieldReader( columnName, field ), m_multiplier( multiplier ) + { + } + +protected: + void doReadNext( int64_t row, Struct * s ) override + { + auto & typed = static_cast( *m_column ); + if( typed.IsValid( row ) ) + m_field -> setValue( s, DateTime::fromNanoseconds( typed.Value( row ) * m_multiplier ) ); + } + +private: + int64_t m_multiplier; +}; + +// --- Duration -> TimeDelta --- + +class DurationReader final : public FieldReader +{ +public: + DurationReader( const std::string & columnName, const StructFieldPtr & field, int64_t multiplier ) + : FieldReader( columnName, field ), m_multiplier( multiplier ) + { + } + +protected: + void doReadNext( int64_t row, Struct * s ) override + { + auto & typed = static_cast( *m_column ); + if( typed.IsValid( row ) ) + m_field -> setValue( s, TimeDelta::fromNanoseconds( typed.Value( row ) * m_multiplier ) ); + } + +private: + int64_t m_multiplier; +}; + +// --- Date32 -> Date --- + +class Date32Reader final : public FieldReader +{ +public: + using FieldReader::FieldReader; + +protected: + void doReadNext( int64_t row, Struct * s ) override + { + auto & typed = static_cast( *m_column ); + if( typed.IsValid( row ) ) + { + int64_t nanos = static_cast( typed.Value( row ) ) * csp::NANOS_PER_DAY; + m_field -> setValue( s, DateTime::fromNanoseconds( nanos ).date() ); + } + } +}; + +// --- Date64 -> Date --- + +class Date64Reader final : public FieldReader +{ +public: + using FieldReader::FieldReader; + +protected: + void doReadNext( int64_t row, Struct * s ) override + { + auto & typed = static_cast( *m_column ); + if( typed.IsValid( row ) ) + { + int64_t nanos = typed.Value( row ) * csp::NANOS_PER_MILLISECOND; + m_field -> setValue( s, DateTime::fromNanoseconds( nanos ).date() ); + } + } +}; + +// --- Time32 -> Time --- + +class Time32Reader final : public FieldReader +{ +public: + Time32Reader( const std::string & columnName, const StructFieldPtr & field, int64_t multiplier ) + : FieldReader( columnName, field ), m_multiplier( multiplier ) + { + } + +protected: + void doReadNext( int64_t row, Struct * s ) override + { + auto & typed = static_cast( *m_column ); + if( typed.IsValid( row ) ) + m_field -> setValue