1+ using System . Data . Common ;
2+ using System . Diagnostics ;
3+ using Google . Cloud . Spanner . DataProvider . Benchmarks . tpcc . loader ;
4+
5+ namespace Google . Cloud . Spanner . DataProvider . Benchmarks . tpcc ;
6+
7+ public class BasicsRunner : AbstractRunner
8+ {
9+ private readonly Stats _stats ;
10+ private readonly DbConnection _connection ;
11+ private readonly Program . BenchmarkType _benchmarkType ;
12+ private readonly int _numWarehouses ;
13+ private readonly int _numDistrictsPerWarehouse ;
14+ private readonly int _numCustomersPerDistrict ;
15+ private readonly int _numItems ;
16+
17+ internal BasicsRunner (
18+ Stats stats ,
19+ DbConnection connection ,
20+ Program . BenchmarkType benchmarkType ,
21+ int numWarehouses ,
22+ int numDistrictsPerWarehouse = 10 ,
23+ int numCustomersPerDistrict = 3000 ,
24+ int numItems = 100_000 )
25+ {
26+ _stats = stats ;
27+ _connection = connection ;
28+ _benchmarkType = benchmarkType ;
29+ _numWarehouses = numWarehouses ;
30+ _numDistrictsPerWarehouse = numDistrictsPerWarehouse ;
31+ _numCustomersPerDistrict = numCustomersPerDistrict ;
32+ _numItems = numItems ;
33+ }
34+
35+ public override async Task RunAsync ( CancellationToken cancellationToken )
36+ {
37+ while ( ! cancellationToken . IsCancellationRequested )
38+ {
39+ await RunTransactionAsync ( cancellationToken ) ;
40+ var delay = Random . Shared . Next ( 10 , 100 ) ;
41+ await Task . Delay ( delay , cancellationToken ) ;
42+ }
43+ }
44+
45+ public override async Task RunTransactionAsync ( CancellationToken cancellationToken )
46+ {
47+ switch ( _benchmarkType )
48+ {
49+ case Program . BenchmarkType . PointQuery :
50+ await PointQueryAsync ( cancellationToken ) ;
51+ break ;
52+ case Program . BenchmarkType . Scalar :
53+ await ScalarAsync ( cancellationToken ) ;
54+ break ;
55+ case Program . BenchmarkType . LargeQuery :
56+ await LargeQueryAsync ( cancellationToken ) ;
57+ break ;
58+ case Program . BenchmarkType . PointDml :
59+ await PointDmlAsync ( cancellationToken ) ;
60+ break ;
61+ default :
62+ throw new NotSupportedException ( $ "Transaction type { _benchmarkType } is not supported.") ;
63+ }
64+ }
65+
66+ private async Task PointQueryAsync ( CancellationToken cancellationToken )
67+ {
68+ var watch = Stopwatch . StartNew ( ) ;
69+ await using var command = CreatePointReadCommand ( ) ;
70+ var numRows = 0 ;
71+ await using var reader = await command . ExecuteReaderAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
72+ while ( await reader . ReadAsync ( cancellationToken ) . ConfigureAwait ( false ) )
73+ {
74+ for ( var col = 0 ; col < reader . FieldCount ; col ++ )
75+ {
76+ _ = reader . GetValue ( col ) ;
77+ }
78+ numRows ++ ;
79+ }
80+ if ( numRows != 1 )
81+ {
82+ throw new InvalidOperationException ( "Unexpected number of rows returned: " + numRows ) ;
83+ }
84+ watch . Stop ( ) ;
85+ _stats . RegisterOperationLatency ( Program . BenchmarkType . PointQuery , watch . Elapsed . TotalMilliseconds ) ;
86+ }
87+
88+ private async Task ScalarAsync ( CancellationToken cancellationToken )
89+ {
90+ var watch = Stopwatch . StartNew ( ) ;
91+ await using var command = CreatePointReadCommand ( ) ;
92+ var item = await command . ExecuteScalarAsync ( cancellationToken ) ;
93+ if ( item == null )
94+ {
95+ throw new InvalidOperationException ( "No row returned" ) ;
96+ }
97+ watch . Stop ( ) ;
98+ _stats . RegisterOperationLatency ( Program . BenchmarkType . Scalar , watch . Elapsed . TotalMilliseconds ) ;
99+ }
100+
101+ private async Task PointDmlAsync ( CancellationToken cancellationToken )
102+ {
103+ var warehouseId = DataLoader . ReverseBitsUnsigned ( ( ulong ) Random . Shared . Next ( _numWarehouses ) ) ;
104+ var districtId = DataLoader . ReverseBitsUnsigned ( ( ulong ) Random . Shared . Next ( _numDistrictsPerWarehouse ) ) ;
105+ var customerId = DataLoader . ReverseBitsUnsigned ( ( ulong ) Random . Shared . Next ( _numCustomersPerDistrict ) ) ;
106+
107+ var watch = Stopwatch . StartNew ( ) ;
108+ await using var command = _connection . CreateCommand ( ) ;
109+ command . CommandText = "update customer set c_data=$1 where w_id=$2 and d_id=$3 and c_id=$4" ;
110+ AddParameter ( command , "p1" , DataLoader . RandomString ( 500 ) ) ;
111+ AddParameter ( command , "p2" , warehouseId ) ;
112+ AddParameter ( command , "p3" , districtId ) ;
113+ AddParameter ( command , "p4" , customerId ) ;
114+ var updated = await command . ExecuteNonQueryAsync ( cancellationToken ) ;
115+ if ( updated != 1 )
116+ {
117+ throw new InvalidOperationException ( "Unexpected affected rows: " + updated ) ;
118+ }
119+ watch . Stop ( ) ;
120+ _stats . RegisterOperationLatency ( Program . BenchmarkType . PointDml , watch . Elapsed . TotalMilliseconds ) ;
121+ }
122+
123+ private async Task LargeQueryAsync ( CancellationToken cancellationToken )
124+ {
125+ await using var command = _connection . CreateCommand ( ) ;
126+ command . CommandText = "select * from customer limit $1 offset $2" ;
127+ AddParameter ( command , "p1" , 100_000L ) ;
128+ AddParameter ( command , "p2" , Random . Shared . Next ( 1 , 1001 ) ) ;
129+ await using var reader = await command . ExecuteReaderAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
130+ // Fetch the first row outside the measurement to ensure a fair comparison between clients that delay the
131+ // query execution to the first read, and those that don't.
132+ await reader . ReadAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
133+
134+ var watch = Stopwatch . StartNew ( ) ;
135+ while ( await reader . ReadAsync ( cancellationToken ) . ConfigureAwait ( false ) )
136+ {
137+ for ( var col = 0 ; col < reader . FieldCount ; col ++ )
138+ {
139+ _ = reader . GetValue ( col ) ;
140+ }
141+ }
142+ watch . Stop ( ) ;
143+ _stats . RegisterOperationLatency ( Program . BenchmarkType . LargeQuery , watch . Elapsed . TotalMilliseconds ) ;
144+ }
145+
146+ private DbCommand CreatePointReadCommand ( )
147+ {
148+ var warehouseId = DataLoader . ReverseBitsUnsigned ( ( ulong ) Random . Shared . Next ( _numWarehouses ) ) ;
149+ var itemId = DataLoader . ReverseBitsUnsigned ( ( ulong ) Random . Shared . Next ( _numItems ) ) ;
150+
151+ var command = _connection . CreateCommand ( ) ;
152+ command . CommandText = "select * from stock where s_i_id=$1 and w_id=$2" ;
153+ AddParameter ( command , "p1" , itemId ) ;
154+ AddParameter ( command , "p2" , warehouseId ) ;
155+
156+ return command ;
157+ }
158+
159+ private void AddParameter ( DbCommand command , string name , object value )
160+ {
161+ var itemParameter = command . CreateParameter ( ) ;
162+ itemParameter . ParameterName = name ;
163+ itemParameter . Value = value ;
164+ command . Parameters . Add ( itemParameter ) ;
165+ }
166+
167+ }
0 commit comments