diff --git a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingAccountService.java b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingAccountService.java index a21438bf3a..4530c9efba 100644 --- a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingAccountService.java +++ b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingAccountService.java @@ -23,13 +23,19 @@ public class BinanceStreamingAccountService implements StreamingAccountService { accountInfoLast.toSerialized(); private volatile Disposable accountInfo; - private volatile BinanceUserDataStreamingService binanceUserDataStreamingService; + private volatile BinanceUserDataFutureStreamingService binanceUserDataFutureStreamingService; + private volatile BinanceUserDataSpotStreamingService binanceUserDataSpotStreamingService; + private boolean isFuture = false; private final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper(); public BinanceStreamingAccountService( - BinanceUserDataStreamingService binanceUserDataStreamingService) { - this.binanceUserDataStreamingService = binanceUserDataStreamingService; + BinanceUserDataFutureStreamingService binanceUserDataFutureStreamingService, BinanceUserDataSpotStreamingService binanceUserDataSpotStreamingService) { + this.binanceUserDataFutureStreamingService = binanceUserDataFutureStreamingService; + this.binanceUserDataSpotStreamingService = binanceUserDataSpotStreamingService; + if (binanceUserDataFutureStreamingService != null) { + isFuture = true; + } } public Observable getRawAccountInfo() { @@ -45,8 +51,13 @@ public Observable getBalanceChanges() { } private void checkConnected() { - if (binanceUserDataStreamingService == null || !binanceUserDataStreamingService.isSocketOpen()) + if (isFuture) { + if (binanceUserDataFutureStreamingService == null || !binanceUserDataFutureStreamingService.isSocketOpen()) { + throw new ExchangeSecurityException("Not authenticated"); + } + } else if (binanceUserDataSpotStreamingService == null || !binanceUserDataSpotStreamingService.isSocketOpen()) { throw new ExchangeSecurityException("Not authenticated"); + } } @Override @@ -61,9 +72,23 @@ public Observable getBalanceChanges(Currency currency, Object... args) * handle these before the first messages arrive. */ public void openSubscriptions() { - if (binanceUserDataStreamingService != null) { + if (isFuture) { + if (binanceUserDataFutureStreamingService == null) { + return; + } + accountInfo = + binanceUserDataFutureStreamingService + .subscribeChannel( + BaseBinanceWebSocketTransaction.BinanceWebSocketTypes.OUTBOUND_ACCOUNT_POSITION) + .map(this::accountInfo) + .filter( + m -> + accountInfoLast.getValue() == null + || accountInfoLast.getValue().getEventTime().before(m.getEventTime())) + .subscribe(accountInfoPublisher::onNext); + } else if (binanceUserDataSpotStreamingService != null) { accountInfo = - binanceUserDataStreamingService + binanceUserDataSpotStreamingService .subscribeChannel( BaseBinanceWebSocketTransaction.BinanceWebSocketTypes.OUTBOUND_ACCOUNT_POSITION) .map(this::accountInfo) @@ -76,14 +101,14 @@ public void openSubscriptions() { } /** - * User data subscriptions may have to persist across multiple socket connections to different - * URLs and therefore must act in a publisher fashion so that subscribers get an uninterrupted - * stream. + * User data subscriptions may have to persist across multiple socket connections to different URLs and therefore must act in a publisher fashion so that subscribers get an uninterrupted stream. */ - void setUserDataStreamingService( - BinanceUserDataStreamingService binanceUserDataStreamingService) { - if (accountInfo != null && !accountInfo.isDisposed()) accountInfo.dispose(); - this.binanceUserDataStreamingService = binanceUserDataStreamingService; + void setUserDataFutureStreamingService( + BinanceUserDataFutureStreamingService binanceUserDataFutureStreamingService) { + if (accountInfo != null && !accountInfo.isDisposed()) { + accountInfo.dispose(); + } + this.binanceUserDataFutureStreamingService = binanceUserDataFutureStreamingService; openSubscriptions(); } diff --git a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingExchange.java b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingExchange.java index 906a8ae4a5..5e51cd72f8 100644 --- a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingExchange.java +++ b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingExchange.java @@ -40,7 +40,8 @@ public class BinanceStreamingExchange extends BinanceExchange implements Streami public static final String FETCH_ORDER_BOOK_LIMIT = "Binance_Fetch_Order_Book_Limit"; private BinanceStreamingService streamingService; - private BinanceUserDataStreamingService userDataStreamingService; + private BinanceUserDataFutureStreamingService userDataFutureStreamingService; + private BinanceUserDataSpotStreamingService userDataSpotStreamingService; private BinanceUserTradeStreamingService userTradeStreamingService; private BinanceStreamingMarketDataService streamingMarketDataService; @@ -85,12 +86,9 @@ public Completable connect(KlineSubscription klineSubscription, ProductSubscript } /** - * Binance streaming API expects connections to multiple channels to be defined at connection - * time. To define the channels for this connection pass a `ProductSubscription` in at connection - * time. + * Binance streaming API expects connections to multiple channels to be defined at connection time. To define the channels for this connection pass a `ProductSubscription` in at connection time. * - * @param args A single `ProductSubscription` to define the subscriptions required to be available - * during this connection. + * @param args A single `ProductSubscription` to define the subscriptions required to be available during this connection. * @return A completable which fulfils once connection is complete. */ @Override @@ -127,17 +125,24 @@ private Completable internalConnect( ExchangeRestProxyBuilder.forInterface( BinanceAuthenticated.class, getExchangeSpecification()) .build(); - userDataChannel = - new BinanceUserDataChannel( - binance, exchangeSpecification.getApiKey(), onApiCall, isFuturesEnabled()); - try { - completables.add(createAndConnectUserDataService(userDataChannel.getListenKey())); + if (isFuturesEnabled()) { + userDataChannel = + new BinanceUserDataChannel( + binance, exchangeSpecification.getApiKey(), onApiCall, isFuturesEnabled()); + try { + completables.add(createAndConnectUserDataFutureService(userDataChannel.getListenKey())); + } catch (NoActiveChannelException e) { + throw new IllegalStateException("Failed to establish user data channel", e); + } + } else { if (exchangeSpecification.getExchangeSpecificParametersItem("ed25519") != null && exchangeSpecification.getExchangeSpecificParametersItem("ed25519").equals(true)) { - completables.add(createAndConnectUserTradeService()); + completables.add(createAndConnectUserDataSpotService()); } - } catch (NoActiveChannelException e) { - throw new IllegalStateException("Failed to establish user data channel", e); + } + if (exchangeSpecification.getExchangeSpecificParametersItem("ed25519") != null + && exchangeSpecification.getExchangeSpecificParametersItem("ed25519").equals(true)) { + completables.add(createAndConnectUserTradeService()); } } @@ -149,10 +154,10 @@ BinanceAuthenticated.class, getExchangeSpecification()) orderBookUpdateFrequencyParameter, realtimeOrderBookTicker, oderBookFetchLimitParameter); - streamingAccountService = new BinanceStreamingAccountService(userDataStreamingService); + streamingAccountService = new BinanceStreamingAccountService(userDataFutureStreamingService, userDataSpotStreamingService); streamingTradeService = new BinanceStreamingTradeService( - this, userDataStreamingService, userTradeStreamingService, getResilienceRegistries()); + this, userDataFutureStreamingService, userDataSpotStreamingService, userTradeStreamingService, getResilienceRegistries()); return Completable.concat(completables) .doOnComplete( @@ -161,29 +166,29 @@ BinanceAuthenticated.class, getExchangeSpecification()) .doOnComplete(() -> streamingTradeService.openSubscriptions()); } - private Completable createAndConnectUserDataService(String listenKey) { - userDataStreamingService = - BinanceUserDataStreamingService.create( + private Completable createAndConnectUserDataFutureService(String listenKey) { + userDataFutureStreamingService = + BinanceUserDataFutureStreamingService.create( getStreamingBaseUri(), listenKey, exchangeSpecification); - applyStreamingSpecification(getExchangeSpecification(), userDataStreamingService); - return userDataStreamingService + applyStreamingSpecification(getExchangeSpecification(), userDataFutureStreamingService); + return userDataFutureStreamingService .connect() .doOnComplete( () -> { LOG.info("Connected to authenticated web socket"); userDataChannel.onChangeListenKey( newListenKey -> - userDataStreamingService + userDataFutureStreamingService .disconnect() .doOnComplete( () -> - createAndConnectUserDataService(newListenKey) + createAndConnectUserDataFutureService(newListenKey) .doOnComplete( () -> { - streamingAccountService.setUserDataStreamingService( - userDataStreamingService); - streamingTradeService.setUserDataStreamingService( - userDataStreamingService); + streamingAccountService.setUserDataFutureStreamingService( + userDataFutureStreamingService); + streamingTradeService.setUserDataFutureStreamingService( + userDataFutureStreamingService); }))); }); } @@ -199,6 +204,17 @@ private Completable createAndConnectUserTradeService() { return userTradeStreamingService.connect(); } + private Completable createAndConnectUserDataSpotService() { + userDataSpotStreamingService = + new BinanceUserDataSpotStreamingService( + getTradeStreamingBaseUri(), + exchangeSpecification.getApiKey(), + exchangeSpecification.getSecretKey(), + getExchangeSpecification()); + applyStreamingSpecification(getExchangeSpecification(), userDataSpotStreamingService); + return userDataSpotStreamingService.connect(); + } + @Override public Completable disconnect() { List completables = new ArrayList<>(); @@ -206,9 +222,9 @@ public Completable disconnect() { completables.add(streamingService.disconnect()); streamingService = null; } - if (userDataStreamingService != null) { - completables.add(userDataStreamingService.disconnect()); - userDataStreamingService = null; + if (userDataFutureStreamingService != null) { + completables.add(userDataFutureStreamingService.disconnect()); + userDataFutureStreamingService = null; } if (userDataChannel != null) { userDataChannel.close(); @@ -221,15 +237,18 @@ public Completable disconnect() { @Override public boolean isAlive() { if (exchangeSpecification.getApiKey() != null) { - if (streamingService != null) + if (isFuturesEnabled()) { return streamingService.isSocketOpen() - && userDataStreamingService.isSocketOpen() + && userDataFutureStreamingService.isSocketOpen() && userTradeStreamingService.isSocketOpen() && userTradeStreamingService.isAuthorized(); - else - return userDataStreamingService.isSocketOpen() + } else { + return streamingService.isSocketOpen() + && userDataSpotStreamingService.isSocketOpen() + && userDataSpotStreamingService.isAuthorized() && userTradeStreamingService.isSocketOpen() && userTradeStreamingService.isAuthorized(); + } } else { return streamingService != null && streamingService.isSocketOpen(); } @@ -251,7 +270,11 @@ public Observable connectionStateObservable() { } public Observable connectionStateObservableUserData() { - return userDataStreamingService.subscribeConnectionState(); + if (isFuturesEnabled()) { + return userDataFutureStreamingService.subscribeConnectionState(); + } else { + return userDataSpotStreamingService.subscribeConnectionState(); + } } public Observable connectionStateObservableUserTrade() { @@ -280,7 +303,7 @@ protected BinanceStreamingService createStreamingService( getStreamingBaseUri() + "stream?streams=" + URLEncoder.encode( - buildSubscriptionStreams(subscription, klineSubscription), StandardCharsets.UTF_8); + buildSubscriptionStreams(subscription, klineSubscription), StandardCharsets.UTF_8); BinanceStreamingService streamingService = new BinanceStreamingService( @@ -376,7 +399,9 @@ public void enableLiveSubscription() { } public void disableLiveSubscription() { - if (this.streamingService != null) this.streamingService.disableLiveSubscription(); + if (this.streamingService != null) { + this.streamingService.disableLiveSubscription(); + } } /** diff --git a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingTradeService.java b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingTradeService.java index e5a600d069..9af3df1397 100644 --- a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingTradeService.java +++ b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingTradeService.java @@ -1,15 +1,24 @@ package info.bitrich.xchangestream.binance; -import static info.bitrich.xchangestream.binance.dto.BaseBinanceWebSocketTransaction.BinanceWebSocketTypes.*; -import static org.knowm.xchange.binance.BinanceResilience.*; +import static info.bitrich.xchangestream.binance.dto.BaseBinanceWebSocketTransaction.BinanceWebSocketTypes.EXECUTION_REPORT; +import static info.bitrich.xchangestream.binance.dto.BaseBinanceWebSocketTransaction.BinanceWebSocketTypes.ORDER_TRADE_UPDATE; +import static info.bitrich.xchangestream.binance.dto.BaseBinanceWebSocketTransaction.BinanceWebSocketTypes.TRADE_LITE; +import static org.knowm.xchange.binance.BinanceResilience.ORDERS_PER_10_SECONDS_RATE_LIMITER; +import static org.knowm.xchange.binance.BinanceResilience.ORDERS_PER_DAY_RATE_LIMITER; +import static org.knowm.xchange.binance.BinanceResilience.ORDERS_PER_MINUTE_RATE_LIMITER; +import static org.knowm.xchange.binance.BinanceResilience.REQUEST_WEIGHT_RATE_LIMITER; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import info.bitrich.xchangestream.binance.dto.BaseBinanceWebSocketTransaction.BinanceWebSocketTypes; import info.bitrich.xchangestream.binance.dto.account.AccountUpdateBinanceWebSocketTransaction; -import info.bitrich.xchangestream.binance.dto.trade.*; +import info.bitrich.xchangestream.binance.dto.trade.BinanceWebsocketOrderCancelAndReplaceResponse; +import info.bitrich.xchangestream.binance.dto.trade.BinanceWebsocketOrderResponse; +import info.bitrich.xchangestream.binance.dto.trade.ExecutionReportBinanceUserTransaction; import info.bitrich.xchangestream.binance.dto.trade.ExecutionReportBinanceUserTransaction.ExecutionType; +import info.bitrich.xchangestream.binance.dto.trade.OrderTradeUpdateBinanceWebSocketTransaction; +import info.bitrich.xchangestream.binance.dto.trade.TradeLiteBinanceWebsocketTransaction; import info.bitrich.xchangestream.core.StreamingTradeService; import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper; import io.github.resilience4j.rxjava3.ratelimiter.operator.RateLimiterOperator; @@ -60,41 +69,50 @@ public class BinanceStreamingTradeService implements StreamingTradeService { private volatile Disposable positionChanges; private final BinanceExchange exchange; private final ResilienceRegistries resilienceRegistries; - private volatile BinanceUserDataStreamingService binanceUserDataStreamingService; - @Setter private volatile BinanceUserTradeStreamingService binanceUserTradeStreamingService; + private volatile BinanceUserDataFutureStreamingService binanceUserDataFutureStreamingService; + private volatile BinanceUserDataSpotStreamingService binanceUserDataSpotStreamingService; + @Setter + private volatile BinanceUserTradeStreamingService binanceUserTradeStreamingService; private final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper(); public BinanceStreamingTradeService( BinanceExchange exchange, - BinanceUserDataStreamingService binanceUserDataStreamingService, + BinanceUserDataFutureStreamingService binanceUserDataFutureStreamingService, + BinanceUserDataSpotStreamingService binanceUserDataSpotStreamingService, BinanceUserTradeStreamingService binanceUserTradeStreamingService, ResilienceRegistries resilienceRegistries) { this.resilienceRegistries = resilienceRegistries; this.exchange = exchange; - this.binanceUserDataStreamingService = binanceUserDataStreamingService; + this.binanceUserDataFutureStreamingService = binanceUserDataFutureStreamingService; + this.binanceUserDataSpotStreamingService = binanceUserDataSpotStreamingService; this.binanceUserTradeStreamingService = binanceUserTradeStreamingService; } public Observable getRawExecutionReports() { - if (binanceUserDataStreamingService == null - || !binanceUserDataStreamingService.isSocketOpen()) { + if (exchange.isFuturesEnabled()) { // portfolio margin mode + if (binanceUserDataFutureStreamingService == null + || !binanceUserDataFutureStreamingService.isSocketOpen()) { + throw new ExchangeSecurityException("Not authenticated"); + } + } else if (binanceUserDataSpotStreamingService == null + || !binanceUserDataSpotStreamingService.isSocketOpen()) { throw new ExchangeSecurityException("Not authenticated"); } return executionReportsPublisher; } public Observable getRawOrderTradeUpdate() { - if (binanceUserDataStreamingService == null - || !binanceUserDataStreamingService.isSocketOpen()) { + if (binanceUserDataFutureStreamingService == null + || !binanceUserDataFutureStreamingService.isSocketOpen()) { throw new ExchangeSecurityException("Not authenticated"); } return orderTradeUpdatePublisher; } public Observable getRawTradeLite() { - if (binanceUserDataStreamingService == null - || !binanceUserDataStreamingService.isSocketOpen()) { + if (binanceUserDataFutureStreamingService == null + || !binanceUserDataFutureStreamingService.isSocketOpen()) { throw new ExchangeSecurityException("Not authenticated"); } return tradeLitePublisher; @@ -102,8 +120,8 @@ public Observable getRawTradeLite() { public Observable getRawPositionChanges( boolean isFuture) { - if (binanceUserDataStreamingService == null - || !binanceUserDataStreamingService.isSocketOpen()) { + if (binanceUserDataFutureStreamingService == null + || !binanceUserDataFutureStreamingService.isSocketOpen()) { throw new ExchangeSecurityException("Not authenticated"); } return positionChangesPublisher; @@ -215,7 +233,9 @@ public Single placeOrder(Order order) { resilienceRegistries .rateLimiters() .rateLimiter(REQUEST_WEIGHT_RATE_LIMITER))); - } else throw new UnsupportedOperationException("Only spot and futures supported"); + } else { + throw new UnsupportedOperationException("Only spot and futures supported"); + } } } else { throw new UnsupportedOperationException("binanceUserTradeStreamingService not authorized"); @@ -228,7 +248,8 @@ private Observable placeOrderInternal(Order order) { .flatMap( node -> { TypeReference> typeReference = - new TypeReference<>() {}; + new TypeReference<>() { + }; BinanceWebsocketOrderResponse response = mapper.treeToValue(node, typeReference); if (response.getStatus() == 200) { @@ -248,7 +269,8 @@ public Single changeOrder(LimitOrder limitOrder, CancelOrderParams... o .flatMap( node -> { TypeReference> typeReference = - new TypeReference<>() {}; + new TypeReference<>() { + }; BinanceWebsocketOrderResponse response = mapper.treeToValue(node, typeReference); if (response.getStatus() == 200) { @@ -280,9 +302,10 @@ public Single changeOrder(LimitOrder limitOrder, CancelOrderParams... o .flatMap( node -> { TypeReference< - BinanceWebsocketOrderResponse< - BinanceWebsocketOrderCancelAndReplaceResponse>> - typeReference = new TypeReference<>() {}; + BinanceWebsocketOrderResponse< + BinanceWebsocketOrderCancelAndReplaceResponse>> + typeReference = new TypeReference<>() { + }; BinanceWebsocketOrderResponse response = mapper.treeToValue(node, typeReference); if (response.getStatus() == 200) { @@ -304,7 +327,9 @@ public Single changeOrder(LimitOrder limitOrder, CancelOrderParams... o .compose( RateLimiterOperator.of( resilienceRegistries.rateLimiters().rateLimiter(REQUEST_WEIGHT_RATE_LIMITER))); - } else throw new UnsupportedOperationException("Only spot and futures supported"); + } else { + throw new UnsupportedOperationException("Only spot and futures supported"); + } } else { throw new UnsupportedOperationException("binanceUserTradeStreamingService not authorized"); @@ -320,7 +345,8 @@ public Single cancelOrder(CancelOrderParams orderParams) { .flatMap( node -> { TypeReference> typeReference = - new TypeReference<>() {}; + new TypeReference<>() { + }; BinanceWebsocketOrderResponse response = mapper.treeToValue(node, typeReference); if (response.getStatus() == 200) { @@ -343,41 +369,46 @@ public Single cancelOrder(CancelOrderParams orderParams) { } } - /** Registers subsriptions with the streaming service for the given products. */ + /** + * Registers subsriptions with the streaming service for the given products. + */ public void openSubscriptions() { - if (binanceUserDataStreamingService != null) { + if (binanceUserDataFutureStreamingService != null) { executionReports = - binanceUserDataStreamingService + binanceUserDataFutureStreamingService .subscribeChannel(EXECUTION_REPORT) .map(this::executionReport) .subscribe(executionReportsPublisher::onNext); orderTradeUpdate = - binanceUserDataStreamingService + binanceUserDataFutureStreamingService .subscribeChannel(ORDER_TRADE_UPDATE) .map(this::orderTradeUpdate) .subscribe(orderTradeUpdatePublisher::onNext); tradeLite = - binanceUserDataStreamingService + binanceUserDataFutureStreamingService .subscribeChannel(TRADE_LITE) .map(this::tradeLite) .subscribe(tradeLitePublisher::onNext); positionChanges = - binanceUserDataStreamingService + binanceUserDataFutureStreamingService .subscribeChannel(BinanceWebSocketTypes.ACCOUNT_UPDATE) .map(this::positionChanges) .subscribe(positionChangesPublisher::onNext); - - binanceUserDataStreamingService.setEnableLoggingHandler(true); + } + if (binanceUserDataSpotStreamingService != null) { + executionReports = + binanceUserDataSpotStreamingService + .subscribeChannel(EXECUTION_REPORT) + .map(this::executionReport) + .subscribe(executionReportsPublisher::onNext); } } /** - * User data subscriptions may have to persist across multiple socket connections to different - * URLs and therefore must act in a publisher fashion so that subscribers get an uninterrupted - * stream. + * User data subscriptions may have to persist across multiple socket connections to different URLs and therefore must act in a publisher fashion so that subscribers get an uninterrupted stream. */ - void setUserDataStreamingService( - BinanceUserDataStreamingService binanceUserDataStreamingService) { + void setUserDataFutureStreamingService( + BinanceUserDataFutureStreamingService binanceUserDataFutureStreamingService) { if (executionReports != null && !executionReports.isDisposed()) { executionReports.dispose(); } @@ -390,7 +421,7 @@ void setUserDataStreamingService( if (positionChanges != null && !positionChanges.isDisposed()) { positionChanges.dispose(); } - this.binanceUserDataStreamingService = binanceUserDataStreamingService; + this.binanceUserDataFutureStreamingService = binanceUserDataFutureStreamingService; openSubscriptions(); } diff --git a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceUsStreamingExchange.java b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceUsStreamingExchange.java index 3ba8af061a..0cbc568e02 100644 --- a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceUsStreamingExchange.java +++ b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceUsStreamingExchange.java @@ -29,13 +29,16 @@ public class BinanceUsStreamingExchange extends BinanceUsExchange implements StreamingExchange { private static final Logger LOG = LoggerFactory.getLogger(BinanceUsStreamingExchange.class); private static final String WS_API_BASE_URI = "wss://stream.binance.us:9443/"; - + private static final String WS_TRADE_API_BASE_URI = "wss://ws-api.binance.com:443/ws-api/v3"; + private static final String WS_SANDBOX_TRADE_API_BASE_URI = + "wss://ws-api.testnet.binance.vision/ws-api/v3"; private static final String WS_SANDBOX_API_BASE_URI = "wss://testnet.binance.vision/"; public static final String USE_HIGHER_UPDATE_FREQUENCY = "Binance_Orderbook_Use_Higher_Frequency"; public static final String USE_REALTIME_BOOK_TICKER = "Binance_Ticker_Use_Realtime"; public static final String FETCH_ORDER_BOOK_LIMIT = "Binance_Fetch_Order_Book_Limit"; private BinanceStreamingService streamingService; - private BinanceUserDataStreamingService userDataStreamingService; + private BinanceUserDataFutureStreamingService userDataFutureStreamingService; + private BinanceUserDataSpotStreamingService userDataSpotStreamingService; private BinanceUserTradeStreamingService userTradeStreamingService; private BinanceStreamingMarketDataService streamingMarketDataService; @@ -138,13 +141,22 @@ private Completable internalConnect( ExchangeRestProxyBuilder.forInterface( BinanceAuthenticated.class, getExchangeSpecification()) .build(); - userDataChannel = - new BinanceUserDataChannel( - binance, exchangeSpecification.getApiKey(), onApiCall, isFuturesEnabled()); - try { - completables.add(createAndConnectUserDataService(userDataChannel.getListenKey())); - } catch (NoActiveChannelException e) { - throw new IllegalStateException("Failed to establish user data channel", e); + if (isFuturesEnabled()) { + userDataChannel = + new BinanceUserDataChannel( + binance, exchangeSpecification.getApiKey(), onApiCall, isFuturesEnabled()); + try { + completables.add(createAndConnectUserDataFutureService(userDataChannel.getListenKey())); + } catch (NoActiveChannelException e) { + throw new IllegalStateException("Failed to establish user data channel", e); + } + } + if (exchangeSpecification.getExchangeSpecificParametersItem("ed25519") != null + && exchangeSpecification.getExchangeSpecificParametersItem("ed25519").equals(true)) { + completables.add(createAndConnectUserTradeService()); + if (!isFuturesEnabled()) { + completables.add(createAndConnectUserDataSpotService()); + } } } @@ -156,10 +168,10 @@ BinanceAuthenticated.class, getExchangeSpecification()) orderBookUpdateFrequencyParameter, realtimeOrderBookTicker, oderBookFetchLimitParameter); - streamingAccountService = new BinanceStreamingAccountService(userDataStreamingService); + streamingAccountService = new BinanceStreamingAccountService(userDataFutureStreamingService, userDataSpotStreamingService); streamingTradeService = new BinanceStreamingTradeService( - this, userDataStreamingService, userTradeStreamingService, getResilienceRegistries()); + this, userDataFutureStreamingService, userDataSpotStreamingService, userTradeStreamingService, getResilienceRegistries()); return Completable.concat(completables) .doOnComplete( @@ -168,33 +180,54 @@ BinanceAuthenticated.class, getExchangeSpecification()) .doOnComplete(() -> streamingTradeService.openSubscriptions()); } - private Completable createAndConnectUserDataService(String listenKey) { - userDataStreamingService = - BinanceUserDataStreamingService.create( + private Completable createAndConnectUserDataFutureService(String listenKey) { + userDataFutureStreamingService = + BinanceUserDataFutureStreamingService.create( getStreamingBaseUri(), listenKey, getExchangeSpecification()); - applyStreamingSpecification(getExchangeSpecification(), userDataStreamingService); - return userDataStreamingService + applyStreamingSpecification(getExchangeSpecification(), userDataFutureStreamingService); + return userDataFutureStreamingService .connect() .doOnComplete( () -> { LOG.info("Connected to authenticated web socket"); userDataChannel.onChangeListenKey( newListenKey -> - userDataStreamingService + userDataFutureStreamingService .disconnect() .doOnComplete( () -> - createAndConnectUserDataService(newListenKey) + createAndConnectUserDataFutureService(newListenKey) .doOnComplete( () -> { - streamingAccountService.setUserDataStreamingService( - userDataStreamingService); - streamingTradeService.setUserDataStreamingService( - userDataStreamingService); + streamingAccountService.setUserDataFutureStreamingService( + userDataFutureStreamingService); + streamingTradeService.setUserDataFutureStreamingService( + userDataFutureStreamingService); }))); }); } + private Completable createAndConnectUserTradeService() { + userTradeStreamingService = + new BinanceUserTradeStreamingService( + getTradeStreamingBaseUri(), + exchangeSpecification.getApiKey(), + exchangeSpecification.getSecretKey(), + getExchangeSpecification()); + applyStreamingSpecification(getExchangeSpecification(), userTradeStreamingService); + return userTradeStreamingService.connect(); + } + + private Completable createAndConnectUserDataSpotService() { + userDataSpotStreamingService = + new BinanceUserDataSpotStreamingService( + getTradeStreamingBaseUri(), + exchangeSpecification.getApiKey(), + exchangeSpecification.getSecretKey(), + getExchangeSpecification()); + applyStreamingSpecification(getExchangeSpecification(), userDataSpotStreamingService); + return userDataSpotStreamingService.connect(); + } @Override public Completable disconnect() { List completables = new ArrayList<>(); @@ -202,9 +235,9 @@ public Completable disconnect() { completables.add(streamingService.disconnect()); streamingService = null; } - if (userDataStreamingService != null) { - completables.add(userDataStreamingService.disconnect()); - userDataStreamingService = null; + if (userDataFutureStreamingService != null) { + completables.add(userDataFutureStreamingService.disconnect()); + userDataFutureStreamingService = null; } if (userDataChannel != null) { userDataChannel.close(); @@ -286,6 +319,12 @@ protected String getStreamingBaseUri() { : WS_API_BASE_URI; } + protected String getTradeStreamingBaseUri() { + return Boolean.TRUE.equals(exchangeSpecification.getExchangeSpecificParametersItem(USE_SANDBOX)) + ? WS_SANDBOX_TRADE_API_BASE_URI + : WS_TRADE_API_BASE_URI; + } + public String buildSubscriptionStreams(ProductSubscription subscription) { return Stream.of( buildSubscriptionStrings( diff --git a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceUserDataStreamingService.java b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceUserDataFutureStreamingService.java similarity index 82% rename from xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceUserDataStreamingService.java rename to xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceUserDataFutureStreamingService.java index 81a00a2c08..04cb03a7ae 100644 --- a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceUserDataStreamingService.java +++ b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceUserDataFutureStreamingService.java @@ -13,17 +13,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class BinanceUserDataStreamingService extends JsonNettyStreamingService { +public class BinanceUserDataFutureStreamingService extends JsonNettyStreamingService { - private static final Logger LOG = LoggerFactory.getLogger(BinanceUserDataStreamingService.class); + private static final Logger LOG = LoggerFactory.getLogger(BinanceUserDataFutureStreamingService.class); - public static BinanceUserDataStreamingService create( + public static BinanceUserDataFutureStreamingService create( String baseUri, String listenKey, ExchangeSpecification exchangeSpecification) { - return new BinanceUserDataStreamingService(baseUri + "ws/" + listenKey, exchangeSpecification); + return new BinanceUserDataFutureStreamingService(baseUri + "ws/" + listenKey, exchangeSpecification); } - private BinanceUserDataStreamingService(String url, ExchangeSpecification exchangeSpecification) { + private BinanceUserDataFutureStreamingService(String url, ExchangeSpecification exchangeSpecification) { super( url, 65536, @@ -47,7 +47,7 @@ protected void handleMessage(JsonNode message) { try { super.handleMessage(message); } catch (Exception e) { - LOG.error("Error handling message: " + message, e); + LOG.error("Error handling message: {}", message, e); } } diff --git a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceUserDataSpotStreamingService.java b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceUserDataSpotStreamingService.java new file mode 100644 index 0000000000..bca1cc5c2e --- /dev/null +++ b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceUserDataSpotStreamingService.java @@ -0,0 +1,223 @@ +package info.bitrich.xchangestream.binance; + +import static info.bitrich.xchangestream.core.StreamingExchange.WS_CONNECTION_TIMEOUT; +import static info.bitrich.xchangestream.core.StreamingExchange.WS_IDLE_TIMEOUT; +import static info.bitrich.xchangestream.core.StreamingExchange.WS_RETRY_DURATION; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import info.bitrich.xchangestream.binance.dto.BaseBinanceWebSocketTransaction.BinanceWebSocketTypes; +import info.bitrich.xchangestream.binance.dto.trade.BinanceWebsocketLoginPayloadWithSignature; +import info.bitrich.xchangestream.binance.dto.trade.BinanceWebsocketLoginResponse; +import info.bitrich.xchangestream.binance.dto.trade.BinanceWebsocketOrderResponse; +import info.bitrich.xchangestream.binance.dto.trade.BinanceWebsocketPayload; +import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService; +import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.CompletableSource; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.disposables.CompositeDisposable; +import io.reactivex.rxjava3.disposables.Disposable; +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.security.Security; +import java.security.spec.PKCS8EncodedKeySpec; +import java.time.Duration; +import java.util.Base64; +import lombok.Getter; +import org.bouncycastle.asn1.pkcs.PrivateKeyInfo; +import org.bouncycastle.crypto.Signer; +import org.bouncycastle.crypto.params.AsymmetricKeyParameter; +import org.bouncycastle.crypto.signers.Ed25519Signer; +import org.bouncycastle.crypto.util.PrivateKeyFactory; +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.knowm.xchange.ExchangeSpecification; +import org.knowm.xchange.binance.dto.BinanceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class BinanceUserDataSpotStreamingService extends JsonNettyStreamingService { + + private static final Logger LOG = LoggerFactory.getLogger(BinanceUserDataSpotStreamingService.class); + private final String apiKey; + private final String privateKey; + CompositeDisposable compositeDisposable = new CompositeDisposable(); + Charset charSet = StandardCharsets.UTF_8; + @Getter + private boolean authorized = false; + private String signature = ""; + private Disposable loginDisposable; + + public BinanceUserDataSpotStreamingService(String apiUrl, String apiKey, String privateKey, + ExchangeSpecification exchangeSpecification) { + super( + apiUrl, + 65536, + (Duration) exchangeSpecification.getExchangeSpecificParametersItem(WS_CONNECTION_TIMEOUT), + (Duration) exchangeSpecification.getExchangeSpecificParametersItem(WS_RETRY_DURATION), + (Integer) exchangeSpecification.getExchangeSpecificParametersItem(WS_IDLE_TIMEOUT)); + this.apiKey = apiKey; + this.privateKey = privateKey; + } + + @Override + public Completable connect() { + Completable conn = super.connect(); + return conn.andThen( + (CompletableSource) + (completable) -> { + login(); + Disposable disposable = + subscribeDisconnect() + .subscribe( + obj -> { + authorized = false; + signature = ""; + }); + compositeDisposable.add(disposable); + completable.onComplete(); + }); + } + + public void login() { + ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper(); + Observable observable = + this.subscribeChannel(String.valueOf(System.currentTimeMillis()), "session.logon") + .flatMap( + node -> { + TypeReference> + typeReference = new TypeReference<>() { + }; + BinanceWebsocketOrderResponse response = + mapper.treeToValue(node, typeReference); + if (response.getStatus() == 200) { + return Observable.just(true); + } else { + return Observable.error( + new BinanceException( + response.getError().getCode(), response.getError().getMsg())); + } + }); + loginDisposable = + observable + .firstElement() + .doOnError(error -> LOG.error("Login error", error)) + .subscribe( + loginResult -> { + LOG.info("Successfully authorized to BinanceUserDataSpotStreamingService"); + authorized = true; + subscribeToUserDataChannel(); + }); + } + + private void subscribeToUserDataChannel() { + subscribeChannel(String.valueOf(System.currentTimeMillis()), "userDataStream.subscribe").subscribe(node -> { + LOG.info("Received user data stream subscription response: {}", node); + }); + } + + @Override + public String getSubscriptionUniqueId(String channelName, Object... args) { + return channelName; + } + + @Override + public String getSubscribeMessage(String channelName, Object... args) throws IOException { + if (args != null && args.length > 0) { + String method = args[0].toString(); + switch (method) { + case "session.logon": {// login + long timestamp = System.currentTimeMillis(); + try { + String loginPayload = "apiKey=" + apiKey + "×tamp=" + timestamp; + signature = signPayload(loginPayload); + BinanceWebsocketLoginPayloadWithSignature loginPayloadWithSignature = + new BinanceWebsocketLoginPayloadWithSignature(apiKey, signature, timestamp); + BinanceWebsocketPayload payload = + new BinanceWebsocketPayload<>( + channelName, "session.logon", loginPayloadWithSignature); + return objectMapper.writeValueAsString(payload); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + case "userDataStream.subscribe": { + try { + BinanceWebsocketPayload payload = + new BinanceWebsocketPayload<>( + channelName, "userDataStream.subscribe", null); + return objectMapper.writeValueAsString(payload); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + } + return null; + } + + public String signPayload(String payload) throws Exception { + Security.addProvider(new BouncyCastleProvider()); + byte[] decodePrivateKey = Base64.getDecoder().decode(privateKey.getBytes(charSet)); + PKCS8EncodedKeySpec pkcs8EncodedKeySpec = new PKCS8EncodedKeySpec(decodePrivateKey); + PrivateKeyInfo instancePrivate = PrivateKeyInfo.getInstance(pkcs8EncodedKeySpec.getEncoded()); + AsymmetricKeyParameter keyPrivate = PrivateKeyFactory.createKey(instancePrivate); + Signer signer = new Ed25519Signer(); + signer.init(true, keyPrivate); + var payloadBytes = payload.getBytes(charSet); + signer.update(payloadBytes, 0, payloadBytes.length); + byte[] signature = signer.generateSignature(); + return new String(Base64.getEncoder().encode(signature)); + } + + @Override + public Completable disconnect() { + compositeDisposable.dispose(); + return super.disconnect(); + } + + @Override + public void messageHandler(String message) { + LOG.debug("Received message: {}", message); + super.messageHandler(message); + } + + @Override + protected void handleMessage(JsonNode message) { + try { + if (message.get("event") != null) { + super.handleMessage(message.get("event")); + } else { + super.handleMessage(message); + } + } catch (Exception e) { + LOG.error("Error handling message: " + message, e); + } + } + + public Observable subscribeChannel(BinanceWebSocketTypes eventType) { + + return super.subscribeChannel(eventType.getSerializedValue()); + } + + + @Override + protected String getChannelNameFromMessage(JsonNode message) { + if (message.get("e") != null) { + return message.get("e").asText(); + } else if (message.get("id") != null) { + return message.get("id").asText(); + } + return null; + } + + @Override + public String getUnsubscribeMessage(String channelName, Object... args) { + // No op. Disconnecting from the web socket will cancel subscriptions. + return null; + } + +} diff --git a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceUserTradeStreamingService.java b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceUserTradeStreamingService.java index 0de906b823..3f611d22d5 100644 --- a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceUserTradeStreamingService.java +++ b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceUserTradeStreamingService.java @@ -52,7 +52,6 @@ public class BinanceUserTradeStreamingService extends JsonNettyStreamingService { private static final Logger LOG = LoggerFactory.getLogger(BinanceUserTradeStreamingService.class); - private static final Pattern p = Pattern.compile("[a-z.]+|\\d+"); private final String apiKey; private final String privateKey; CompositeDisposable compositeDisposable = new CompositeDisposable(); diff --git a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/dto/trade/ExecutionReportBinanceUserTransaction.java b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/dto/trade/ExecutionReportBinanceUserTransaction.java index bbb0dcd9ea..633ca40e8a 100644 --- a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/dto/trade/ExecutionReportBinanceUserTransaction.java +++ b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/dto/trade/ExecutionReportBinanceUserTransaction.java @@ -121,7 +121,7 @@ public Order toOrder(boolean isFuture) { getSymbol(), orderId, clientOrderId, - orderPrice, + lastExecutedPrice, orderQuantity, lastExecutedQuantity, // not sure 100%, but according SPOT documentation api diff --git a/xchange-stream-binance/src/test/java/info/bitrich/xchangestream/binance/examples/BinanceFutureStreamPrivateTest.java b/xchange-stream-binance/src/test/java/info/bitrich/xchangestream/binance/examples/BinanceFutureStreamPrivateTest.java index 2ae9a4cb6c..dbc96f84d1 100644 --- a/xchange-stream-binance/src/test/java/info/bitrich/xchangestream/binance/examples/BinanceFutureStreamPrivateTest.java +++ b/xchange-stream-binance/src/test/java/info/bitrich/xchangestream/binance/examples/BinanceFutureStreamPrivateTest.java @@ -42,8 +42,9 @@ public class BinanceFutureStreamPrivateTest { public void setUp() { ExchangeSpecification spec = new ExchangeSpecification(BinanceFutureStreamingExchange.class); // The most convenient way. Can store all keys in .ssh folder - AuthUtils.setApiAndSecretKey(spec, "binance-demo-futures"); - spec.setExchangeSpecificParametersItem(USE_SANDBOX, true); + AuthUtils.setApiAndSecretKey(spec, "binance-main-ed25519"); // apikey and ed2519 private key + spec.setExchangeSpecificParametersItem("ed25519", true); +// spec.setExchangeSpecificParametersItem(USE_SANDBOX, true); spec.setExchangeSpecificParametersItem(EXCHANGE_TYPE, FUTURES); exchange = StreamingExchangeFactory.INSTANCE.createExchange(spec); binanceFutureStreamingExchange = (BinanceFutureStreamingExchange) exchange; @@ -109,7 +110,7 @@ public void getOrderAndPositionChanges() throws IOException { }); Thread.sleep(3000); Ticker ticker = exchange.getMarketDataService().getTicker(instrument); - BigDecimal amount = new BigDecimal("0.01"); + BigDecimal amount = new BigDecimal("0.012"); // place limit order String orderId = exchange diff --git a/xchange-stream-binance/src/test/java/info/bitrich/xchangestream/binance/examples/BinanceFutureStreamPublicTest.java b/xchange-stream-binance/src/test/java/info/bitrich/xchangestream/binance/examples/BinanceFutureStreamPublicTest.java index bbd6d25580..9f36fffb59 100644 --- a/xchange-stream-binance/src/test/java/info/bitrich/xchangestream/binance/examples/BinanceFutureStreamPublicTest.java +++ b/xchange-stream-binance/src/test/java/info/bitrich/xchangestream/binance/examples/BinanceFutureStreamPublicTest.java @@ -48,9 +48,9 @@ public void setUp() { ExchangeSpecification spec = new ExchangeSpecification(BinanceFutureStreamingExchange.class); // The most convenient way. Can store all keys in .ssh folder AuthUtils.setApiAndSecretKey(spec, "binance-demo-futures"); - spec.setExchangeSpecificParametersItem(USE_SANDBOX, true); +// spec.setExchangeSpecificParametersItem(USE_SANDBOX, true); spec.setExchangeSpecificParametersItem(EXCHANGE_TYPE, FUTURES); - // optional - more frequent ticker updates + // optional - more frequent OrderBook ticker updates // spec.setExchangeSpecificParametersItem(USE_REALTIME_BOOK_TICKER, true); // optional more frequent order book updates // spec.setExchangeSpecificParametersItem(USE_HIGHER_UPDATE_FREQUENCY, true); diff --git a/xchange-stream-binance/src/test/java/info/bitrich/xchangestream/binance/examples/BinanceSpotStreamPrivateTest.java b/xchange-stream-binance/src/test/java/info/bitrich/xchangestream/binance/examples/BinanceSpotStreamPrivateTest.java index 41eb3519bd..bfac7b0f06 100644 --- a/xchange-stream-binance/src/test/java/info/bitrich/xchangestream/binance/examples/BinanceSpotStreamPrivateTest.java +++ b/xchange-stream-binance/src/test/java/info/bitrich/xchangestream/binance/examples/BinanceSpotStreamPrivateTest.java @@ -1,11 +1,11 @@ package info.bitrich.xchangestream.binance.examples; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; -import static org.knowm.xchange.Exchange.USE_SANDBOX; import static org.knowm.xchange.binance.BinanceExchange.EXCHANGE_TYPE; import static org.knowm.xchange.binance.dto.ExchangeType.SPOT; import info.bitrich.xchangestream.binance.BinanceStreamingExchange; +import info.bitrich.xchangestream.binance.BinanceStreamingTradeService; import info.bitrich.xchangestream.core.ProductSubscription; import info.bitrich.xchangestream.core.StreamingExchange; import info.bitrich.xchangestream.core.StreamingExchangeFactory; @@ -30,18 +30,19 @@ public class BinanceSpotStreamPrivateTest { private static final Logger LOG = LoggerFactory.getLogger(BinanceSpotStreamPrivateTest.class); - private static StreamingExchange exchange; - BinanceStreamingExchange binanceStreamingExchange; private static final Instrument instrument = new CurrencyPair("ETH/USDT"); private static final Instrument instrument2 = new CurrencyPair("SOL/USDT"); private static final boolean logOutput = false; + private static StreamingExchange exchange; + BinanceStreamingExchange binanceStreamingExchange; @Before public void setUp() { ExchangeSpecification spec = new ExchangeSpecification(BinanceStreamingExchange.class); // The most convenient way. Can store all keys in .ssh folder - AuthUtils.setApiAndSecretKey(spec, "binance-demo"); - spec.setExchangeSpecificParametersItem(USE_SANDBOX, true); + AuthUtils.setApiAndSecretKey(spec, "binance-main-ed25519"); // apikey and ed2519 private key + spec.setExchangeSpecificParametersItem("ed25519", true); +// spec.setExchangeSpecificParametersItem(USE_SANDBOX, true); spec.setExchangeSpecificParametersItem(EXCHANGE_TYPE, SPOT); exchange = StreamingExchangeFactory.INSTANCE.createExchange(spec); binanceStreamingExchange = (BinanceStreamingExchange) exchange; @@ -69,6 +70,8 @@ public void getOrderAndPositionChanges() throws IOException, InterruptedExceptio assertThat(orderChanges.getInstrument().equals(instrument)).isTrue(); assertThat(orderChanges.getType().equals(OrderType.BID)).isTrue(); }); + Thread.sleep(3000); + Disposable userTradeLiteDisposable = exchange .getStreamingTradeService() @@ -76,7 +79,7 @@ public void getOrderAndPositionChanges() throws IOException, InterruptedExceptio .subscribe( trade -> { if (logOutput) { - LOG.info("trade lite subscribe: {}", trade); + LOG.info("trade subscribe: {}", trade); } assertThat(trade.getInstrument().equals(instrument)).isTrue(); assertThat(trade.getType().equals(OrderType.BID)).isTrue(); @@ -93,7 +96,7 @@ public void getOrderAndPositionChanges() throws IOException, InterruptedExceptio }); Thread.sleep(3000); Ticker ticker = exchange.getMarketDataService().getTicker(instrument); - BigDecimal amount = new BigDecimal("0.01"); + BigDecimal amount = new BigDecimal("0.005"); // place limit order String orderId = exchange