Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,19 @@ public class BinanceStreamingAccountService implements StreamingAccountService {
accountInfoLast.toSerialized();

private volatile Disposable accountInfo;
private volatile BinanceUserDataStreamingService binanceUserDataStreamingService;
private volatile BinanceUserDataFutureStreamingService binanceUserDataFutureStreamingService;
private volatile BinanceUserDataSpotStreamingService binanceUserDataSpotStreamingService;
private boolean isFuture = false;

private final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();

public BinanceStreamingAccountService(
BinanceUserDataStreamingService binanceUserDataStreamingService) {
this.binanceUserDataStreamingService = binanceUserDataStreamingService;
BinanceUserDataFutureStreamingService binanceUserDataFutureStreamingService, BinanceUserDataSpotStreamingService binanceUserDataSpotStreamingService) {
this.binanceUserDataFutureStreamingService = binanceUserDataFutureStreamingService;
this.binanceUserDataSpotStreamingService = binanceUserDataSpotStreamingService;
if (binanceUserDataFutureStreamingService != null) {
isFuture = true;
}
}

public Observable<OutboundAccountPositionBinanceWebsocketTransaction> getRawAccountInfo() {
Expand All @@ -45,8 +51,13 @@ public Observable<Balance> getBalanceChanges() {
}

private void checkConnected() {
if (binanceUserDataStreamingService == null || !binanceUserDataStreamingService.isSocketOpen())
if (isFuture) {
if (binanceUserDataFutureStreamingService == null || !binanceUserDataFutureStreamingService.isSocketOpen()) {
throw new ExchangeSecurityException("Not authenticated");
}
} else if (binanceUserDataSpotStreamingService == null || !binanceUserDataSpotStreamingService.isSocketOpen()) {
throw new ExchangeSecurityException("Not authenticated");
}
}

@Override
Expand All @@ -61,9 +72,23 @@ public Observable<Balance> getBalanceChanges(Currency currency, Object... args)
* handle these before the first messages arrive.
*/
public void openSubscriptions() {
if (binanceUserDataStreamingService != null) {
if (isFuture) {
if (binanceUserDataFutureStreamingService == null) {
return;
}
accountInfo =
binanceUserDataFutureStreamingService
.subscribeChannel(
BaseBinanceWebSocketTransaction.BinanceWebSocketTypes.OUTBOUND_ACCOUNT_POSITION)
.map(this::accountInfo)
.filter(
m ->
accountInfoLast.getValue() == null
|| accountInfoLast.getValue().getEventTime().before(m.getEventTime()))
.subscribe(accountInfoPublisher::onNext);
} else if (binanceUserDataSpotStreamingService != null) {
accountInfo =
binanceUserDataStreamingService
binanceUserDataSpotStreamingService
.subscribeChannel(
BaseBinanceWebSocketTransaction.BinanceWebSocketTypes.OUTBOUND_ACCOUNT_POSITION)
.map(this::accountInfo)
Expand All @@ -76,14 +101,14 @@ public void openSubscriptions() {
}

/**
* User data subscriptions may have to persist across multiple socket connections to different
* URLs and therefore must act in a publisher fashion so that subscribers get an uninterrupted
* stream.
* User data subscriptions may have to persist across multiple socket connections to different URLs and therefore must act in a publisher fashion so that subscribers get an uninterrupted stream.
*/
void setUserDataStreamingService(
BinanceUserDataStreamingService binanceUserDataStreamingService) {
if (accountInfo != null && !accountInfo.isDisposed()) accountInfo.dispose();
this.binanceUserDataStreamingService = binanceUserDataStreamingService;
void setUserDataFutureStreamingService(
BinanceUserDataFutureStreamingService binanceUserDataFutureStreamingService) {
if (accountInfo != null && !accountInfo.isDisposed()) {
accountInfo.dispose();
}
this.binanceUserDataFutureStreamingService = binanceUserDataFutureStreamingService;
openSubscriptions();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public class BinanceStreamingExchange extends BinanceExchange implements Streami
public static final String FETCH_ORDER_BOOK_LIMIT = "Binance_Fetch_Order_Book_Limit";

private BinanceStreamingService streamingService;
private BinanceUserDataStreamingService userDataStreamingService;
private BinanceUserDataFutureStreamingService userDataFutureStreamingService;
private BinanceUserDataSpotStreamingService userDataSpotStreamingService;
private BinanceUserTradeStreamingService userTradeStreamingService;

private BinanceStreamingMarketDataService streamingMarketDataService;
Expand Down Expand Up @@ -85,12 +86,9 @@ public Completable connect(KlineSubscription klineSubscription, ProductSubscript
}

/**
* Binance streaming API expects connections to multiple channels to be defined at connection
* time. To define the channels for this connection pass a `ProductSubscription` in at connection
* time.
* Binance streaming API expects connections to multiple channels to be defined at connection time. To define the channels for this connection pass a `ProductSubscription` in at connection time.
*
* @param args A single `ProductSubscription` to define the subscriptions required to be available
* during this connection.
* @param args A single `ProductSubscription` to define the subscriptions required to be available during this connection.
* @return A completable which fulfils once connection is complete.
*/
@Override
Expand Down Expand Up @@ -127,17 +125,24 @@ private Completable internalConnect(
ExchangeRestProxyBuilder.forInterface(
BinanceAuthenticated.class, getExchangeSpecification())
.build();
userDataChannel =
new BinanceUserDataChannel(
binance, exchangeSpecification.getApiKey(), onApiCall, isFuturesEnabled());
try {
completables.add(createAndConnectUserDataService(userDataChannel.getListenKey()));
if (isFuturesEnabled()) {
userDataChannel =
new BinanceUserDataChannel(
binance, exchangeSpecification.getApiKey(), onApiCall, isFuturesEnabled());
try {
completables.add(createAndConnectUserDataFutureService(userDataChannel.getListenKey()));
} catch (NoActiveChannelException e) {
throw new IllegalStateException("Failed to establish user data channel", e);
}
} else {
if (exchangeSpecification.getExchangeSpecificParametersItem("ed25519") != null
&& exchangeSpecification.getExchangeSpecificParametersItem("ed25519").equals(true)) {
completables.add(createAndConnectUserTradeService());
completables.add(createAndConnectUserDataSpotService());
}
} catch (NoActiveChannelException e) {
throw new IllegalStateException("Failed to establish user data channel", e);
}
if (exchangeSpecification.getExchangeSpecificParametersItem("ed25519") != null
&& exchangeSpecification.getExchangeSpecificParametersItem("ed25519").equals(true)) {
completables.add(createAndConnectUserTradeService());
}
}

Expand All @@ -149,10 +154,10 @@ BinanceAuthenticated.class, getExchangeSpecification())
orderBookUpdateFrequencyParameter,
realtimeOrderBookTicker,
oderBookFetchLimitParameter);
streamingAccountService = new BinanceStreamingAccountService(userDataStreamingService);
streamingAccountService = new BinanceStreamingAccountService(userDataFutureStreamingService, userDataSpotStreamingService);
streamingTradeService =
new BinanceStreamingTradeService(
this, userDataStreamingService, userTradeStreamingService, getResilienceRegistries());
this, userDataFutureStreamingService, userDataSpotStreamingService, userTradeStreamingService, getResilienceRegistries());

return Completable.concat(completables)
.doOnComplete(
Expand All @@ -161,29 +166,29 @@ BinanceAuthenticated.class, getExchangeSpecification())
.doOnComplete(() -> streamingTradeService.openSubscriptions());
}

private Completable createAndConnectUserDataService(String listenKey) {
userDataStreamingService =
BinanceUserDataStreamingService.create(
private Completable createAndConnectUserDataFutureService(String listenKey) {
userDataFutureStreamingService =
BinanceUserDataFutureStreamingService.create(
getStreamingBaseUri(), listenKey, exchangeSpecification);
applyStreamingSpecification(getExchangeSpecification(), userDataStreamingService);
return userDataStreamingService
applyStreamingSpecification(getExchangeSpecification(), userDataFutureStreamingService);
return userDataFutureStreamingService
.connect()
.doOnComplete(
() -> {
LOG.info("Connected to authenticated web socket");
userDataChannel.onChangeListenKey(
newListenKey ->
userDataStreamingService
userDataFutureStreamingService
.disconnect()
.doOnComplete(
() ->
createAndConnectUserDataService(newListenKey)
createAndConnectUserDataFutureService(newListenKey)
.doOnComplete(
() -> {
streamingAccountService.setUserDataStreamingService(
userDataStreamingService);
streamingTradeService.setUserDataStreamingService(
userDataStreamingService);
streamingAccountService.setUserDataFutureStreamingService(
userDataFutureStreamingService);
streamingTradeService.setUserDataFutureStreamingService(
userDataFutureStreamingService);
})));
});
}
Expand All @@ -199,16 +204,27 @@ private Completable createAndConnectUserTradeService() {
return userTradeStreamingService.connect();
}

private Completable createAndConnectUserDataSpotService() {
userDataSpotStreamingService =
new BinanceUserDataSpotStreamingService(
getTradeStreamingBaseUri(),
exchangeSpecification.getApiKey(),
exchangeSpecification.getSecretKey(),
getExchangeSpecification());
applyStreamingSpecification(getExchangeSpecification(), userDataSpotStreamingService);
return userDataSpotStreamingService.connect();
}

@Override
public Completable disconnect() {
List<Completable> completables = new ArrayList<>();
if (streamingService != null) {
completables.add(streamingService.disconnect());
streamingService = null;
}
if (userDataStreamingService != null) {
completables.add(userDataStreamingService.disconnect());
userDataStreamingService = null;
if (userDataFutureStreamingService != null) {
completables.add(userDataFutureStreamingService.disconnect());
userDataFutureStreamingService = null;
}
if (userDataChannel != null) {
userDataChannel.close();
Expand All @@ -221,15 +237,18 @@ public Completable disconnect() {
@Override
public boolean isAlive() {
if (exchangeSpecification.getApiKey() != null) {
if (streamingService != null)
if (isFuturesEnabled()) {
return streamingService.isSocketOpen()
&& userDataStreamingService.isSocketOpen()
&& userDataFutureStreamingService.isSocketOpen()
&& userTradeStreamingService.isSocketOpen()
&& userTradeStreamingService.isAuthorized();
else
return userDataStreamingService.isSocketOpen()
} else {
return streamingService.isSocketOpen()
&& userDataSpotStreamingService.isSocketOpen()
&& userDataSpotStreamingService.isAuthorized()
&& userTradeStreamingService.isSocketOpen()
&& userTradeStreamingService.isAuthorized();
}
} else {
return streamingService != null && streamingService.isSocketOpen();
}
Expand All @@ -251,7 +270,11 @@ public Observable<State> connectionStateObservable() {
}

public Observable<State> connectionStateObservableUserData() {
return userDataStreamingService.subscribeConnectionState();
if (isFuturesEnabled()) {
return userDataFutureStreamingService.subscribeConnectionState();
} else {
return userDataSpotStreamingService.subscribeConnectionState();
}
}

public Observable<State> connectionStateObservableUserTrade() {
Expand Down Expand Up @@ -280,7 +303,7 @@ protected BinanceStreamingService createStreamingService(
getStreamingBaseUri()
+ "stream?streams="
+ URLEncoder.encode(
buildSubscriptionStreams(subscription, klineSubscription), StandardCharsets.UTF_8);
buildSubscriptionStreams(subscription, klineSubscription), StandardCharsets.UTF_8);

BinanceStreamingService streamingService =
new BinanceStreamingService(
Expand Down Expand Up @@ -376,7 +399,9 @@ public void enableLiveSubscription() {
}

public void disableLiveSubscription() {
if (this.streamingService != null) this.streamingService.disableLiveSubscription();
if (this.streamingService != null) {
this.streamingService.disableLiveSubscription();
}
}

/**
Expand Down
Loading