Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 64 additions & 93 deletions src/main/java/com/iexec/core/chain/IexecHubService.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,22 @@
import com.iexec.common.lifecycle.purge.Purgeable;
import com.iexec.commons.poco.chain.*;
import com.iexec.commons.poco.contract.generated.IexecHubContract;
import com.iexec.commons.poco.utils.BytesUtils;
import com.iexec.commons.poco.encoding.LogTopic;
import io.reactivex.Flowable;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.web3j.abi.EventEncoder;
import org.web3j.abi.datatypes.Event;
import org.web3j.protocol.core.DefaultBlockParameter;
import org.web3j.protocol.core.methods.request.EthFilter;
import org.web3j.protocol.core.methods.response.EthLog;
import org.web3j.protocol.core.methods.response.Log;

import java.math.BigInteger;
import java.util.Arrays;
import java.util.Date;

import static com.iexec.commons.poco.chain.ChainContributionStatus.CONTRIBUTED;
import static com.iexec.commons.poco.chain.ChainContributionStatus.REVEALED;
import static com.iexec.commons.poco.contract.generated.IexecHubContract.*;

@Slf4j
@Service
Expand All @@ -43,9 +43,9 @@ public class IexecHubService extends IexecHubAbstractService implements Purgeabl
private final SignerService signerService;
private final Web3jService web3jService;

public IexecHubService(SignerService signerService,
Web3jService web3jService,
ChainConfig chainConfig) {
public IexecHubService(final SignerService signerService,
final Web3jService web3jService,
final ChainConfig chainConfig) {
super(
signerService.getCredentials(),
web3jService,
Expand Down Expand Up @@ -156,124 +156,95 @@ public boolean isRevealed(String... args) {
// endregion

// region get event blocks
public ChainReceipt getContributionBlock(String chainTaskId,
String workerWallet,
long fromBlock) {
long latestBlock = web3jService.getLatestBlockNumber();
public ChainReceipt getInitializeBlock(final String chainTaskId,
final long fromBlock) {
log.debug("getInitializeBlock [chainTaskId:{}]", chainTaskId);
final long latestBlock = web3jService.getLatestBlockNumber();
if (fromBlock > latestBlock) {
return ChainReceipt.builder().build();
}

EthFilter ethFilter = createContributeEthFilter(fromBlock, latestBlock);

// filter only taskContribute events for the chainTaskId and the worker's wallet
// and retrieve the block number of the event
return iexecHubContract.taskContributeEventFlowable(ethFilter)
.filter(eventResponse ->
chainTaskId.equals(BytesUtils.bytesToString(eventResponse.taskid)) &&
workerWallet.equals(eventResponse.worker)
)
.map(eventResponse -> ChainReceipt.builder()
.blockNumber(eventResponse.log.getBlockNumber().longValue())
.txHash(eventResponse.log.getTransactionHash())
.build())
final EthFilter ethFilter = createEthFilter(
fromBlock, latestBlock, LogTopic.TASK_INITIALIZE_EVENT, chainTaskId);
return web3jService.getWeb3j().ethGetLogs(ethFilter).flowable()
.map(this::createChainReceipt)
.blockingFirst();
}

public ChainReceipt getConsensusBlock(String chainTaskId, long fromBlock) {
public ChainReceipt getContributionBlock(final String chainTaskId,
final String workerWallet,
final long fromBlock) {
log.debug("getContributionBlock [chainTaskId:{}]", chainTaskId);
long latestBlock = web3jService.getLatestBlockNumber();
if (fromBlock > latestBlock) {
return ChainReceipt.builder().build();
}

EthFilter ethFilter = createConsensusEthFilter(fromBlock, latestBlock);

// filter only taskConsensus events for the chainTaskId (there should be only one)
// and retrieve the block number of the event
return iexecHubContract.taskConsensusEventFlowable(ethFilter)
.filter(eventResponse -> chainTaskId.equals(BytesUtils.bytesToString(eventResponse.taskid)))
.map(eventResponse -> ChainReceipt.builder()
.blockNumber(eventResponse.log.getBlockNumber().longValue())
.txHash(eventResponse.log.getTransactionHash())
.build())
final EthFilter ethFilter = createEthFilter(
fromBlock, latestBlock, LogTopic.TASK_CONTRIBUTE_EVENT, chainTaskId, workerWallet);
return web3jService.getWeb3j().ethGetLogs(ethFilter).flowable()
.map(this::createChainReceipt)
.blockingFirst();
}

public ChainReceipt getRevealBlock(String chainTaskId,
String workerWallet,
long fromBlock) {
public ChainReceipt getConsensusBlock(final String chainTaskId, final long fromBlock) {
log.debug("getConsensusBlock [chainTaskId:{}]", chainTaskId);
long latestBlock = web3jService.getLatestBlockNumber();
if (fromBlock > latestBlock) {
return ChainReceipt.builder().build();
}

EthFilter ethFilter = createRevealEthFilter(fromBlock, latestBlock);

// filter only taskReveal events for the chainTaskId and the worker's wallet
// and retrieve the block number of the event
return iexecHubContract.taskRevealEventFlowable(ethFilter)
.filter(eventResponse ->
chainTaskId.equals(BytesUtils.bytesToString(eventResponse.taskid)) &&
workerWallet.equals(eventResponse.worker)
)
.map(eventResponse -> ChainReceipt.builder()
.blockNumber(eventResponse.log.getBlockNumber().longValue())
.txHash(eventResponse.log.getTransactionHash())
.build())
final EthFilter ethFilter = createEthFilter(
fromBlock, latestBlock, LogTopic.TASK_CONSENSUS_EVENT, chainTaskId);
return web3jService.getWeb3j().ethGetLogs(ethFilter).flowable()
.map(this::createChainReceipt)
.blockingFirst();
}

public ChainReceipt getFinalizeBlock(String chainTaskId, long fromBlock) {
public ChainReceipt getRevealBlock(final String chainTaskId,
final String workerWallet,
final long fromBlock) {
log.debug("getRevealBlock [chainTaskId:{}]", chainTaskId);
long latestBlock = web3jService.getLatestBlockNumber();
if (fromBlock > latestBlock) {
return ChainReceipt.builder().build();
}

EthFilter ethFilter = createFinalizeEthFilter(fromBlock, latestBlock);

// filter only taskFinalize events for the chainTaskId (there should be only one)
// and retrieve the block number of the event
return iexecHubContract.taskFinalizeEventFlowable(ethFilter)
.filter(eventResponse ->
chainTaskId.equals(BytesUtils.bytesToString(eventResponse.taskid))
)
.map(eventResponse -> ChainReceipt.builder()
.blockNumber(eventResponse.log.getBlockNumber().longValue())
.txHash(eventResponse.log.getTransactionHash())
.build())
final EthFilter ethFilter = createEthFilter(
fromBlock, latestBlock, LogTopic.TASK_REVEAL_EVENT, chainTaskId, workerWallet);
return web3jService.getWeb3j().ethGetLogs(ethFilter).flowable()
.map(this::createChainReceipt)
.blockingFirst();
}

private EthFilter createContributeEthFilter(long fromBlock, long toBlock) {
return createEthFilter(fromBlock, toBlock, TASKCONTRIBUTE_EVENT);
}

private EthFilter createConsensusEthFilter(long fromBlock, long toBlock) {
return createEthFilter(fromBlock, toBlock, TASKCONSENSUS_EVENT);
}

private EthFilter createRevealEthFilter(long fromBlock, long toBlock) {
return createEthFilter(fromBlock, toBlock, TASKREVEAL_EVENT);
public ChainReceipt getFinalizeBlock(final String chainTaskId,
final long fromBlock) {
log.debug("getFinalizeBlock [chainTaskId:{}]", chainTaskId);
long latestBlock = web3jService.getLatestBlockNumber();
if (fromBlock > latestBlock) {
return ChainReceipt.builder().build();
}
final EthFilter ethFilter = createEthFilter(
fromBlock, latestBlock, LogTopic.TASK_FINALIZE_EVENT, chainTaskId);
return web3jService.getWeb3j().ethGetLogs(ethFilter).flowable()
.map(this::createChainReceipt)
.blockingFirst();
}

private EthFilter createFinalizeEthFilter(long fromBlock, long toBlock) {
return createEthFilter(fromBlock, toBlock, TASKFINALIZE_EVENT);
private ChainReceipt createChainReceipt(final EthLog ethLog) {
final Log logEvent = (Log) ethLog.getLogs().get(0);
return ChainReceipt.builder()
.blockNumber(logEvent.getBlockNumber().longValue())
.txHash(logEvent.getTransactionHash())
.build();
}

private EthFilter createEthFilter(long fromBlock, long toBlock, Event event) {
IexecHubContract iexecHub = getHubContract();
DefaultBlockParameter startBlock =
DefaultBlockParameter.valueOf(BigInteger.valueOf(fromBlock));
DefaultBlockParameter endBlock =
DefaultBlockParameter.valueOf(BigInteger.valueOf(toBlock));

// define the filter
EthFilter ethFilter = new EthFilter(
startBlock,
endBlock,
iexecHub.getContractAddress()
private EthFilter createEthFilter(final long fromBlock,
final long toBlock,
final String... topics) {
log.debug("createEthFilter [from:{}, to:{}]", fromBlock, toBlock);
final EthFilter ethFilter = new EthFilter(
DefaultBlockParameter.valueOf(BigInteger.valueOf(fromBlock)),
DefaultBlockParameter.valueOf(BigInteger.valueOf(toBlock)),
iexecHubAddress
);
ethFilter.addSingleTopic(EventEncoder.encode(event));
Arrays.stream(topics).forEach(ethFilter::addSingleTopic);

return ethFilter;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 IEXEC BLOCKCHAIN TECH
* Copyright 2020-2025 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,8 +28,6 @@
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.Optional;

@Slf4j
@Service
public class InitializedTaskDetector implements Detector {
Expand All @@ -38,9 +36,9 @@ public class InitializedTaskDetector implements Detector {
private final TaskUpdateRequestManager taskUpdateRequestManager;
private final IexecHubService iexecHubService;

public InitializedTaskDetector(TaskService taskService,
TaskUpdateRequestManager taskUpdateRequestManager,
IexecHubService iexecHubService) {
public InitializedTaskDetector(final TaskService taskService,
final TaskUpdateRequestManager taskUpdateRequestManager,
final IexecHubService iexecHubService) {
this.taskService = taskService;
this.taskUpdateRequestManager = taskUpdateRequestManager;
this.iexecHubService = iexecHubService;
Expand All @@ -54,8 +52,8 @@ public InitializedTaskDetector(TaskService taskService,
public void detect() {
log.debug("Trying to detect initializable tasks");
for (Task task : taskService.getInitializableTasks()) {
Optional<ChainTask> chainTask = iexecHubService.getChainTask(task.getChainTaskId());
if (chainTask.isEmpty() || chainTask.get().getStatus().equals(ChainTaskStatus.UNSET)) {
final ChainTask chainTask = iexecHubService.getChainTask(task.getChainTaskId()).orElse(null);
if (chainTask == null || chainTask.getStatus() == ChainTaskStatus.UNSET) {
continue;
}
log.info("Detected confirmed missing update (task) [is:{}, should:{}, taskId:{}]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,9 @@ void initializing2Initialized(final Task task) {
log.info("Initialized on blockchain (tx mined) [chainTaskId:{}]", task.getChainTaskId());
final Update update = new Update();
// Without receipt, using deal block for initialization block
task.setInitializationBlockNumber(task.getDealBlockNumber());
update.set("initializationBlockNumber", task.getDealBlockNumber());
final ChainReceipt chainReceipt = iexecHubService.getInitializeBlock(task.getChainTaskId(), task.getDealBlockNumber());
task.setInitializationBlockNumber(chainReceipt.getBlockNumber());
update.set("initializationBlockNumber", chainReceipt.getBlockNumber());

// Create enclave challenge after task has been initialized on-chain
final Optional<String> enclaveChallenge = smsService.getEnclaveChallenge(
Expand Down
2 changes: 2 additions & 0 deletions src/test/java/com/iexec/core/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class TestUtils {
public static final String CHAIN_DEAL_ID = "0xd82223e5feff6720792ffed1665e980da95e5d32b177332013eaba8edc07f31c";
public static final String CHAIN_TASK_ID = "0x65bc5e94ed1486b940bd6cc0013c418efad58a0a52a3d08cee89faaa21970426";
public static final int TASK_INDEX = 0;
public static final long DEAL_BLOCK = 1_000L;

public static final String WORKER_ADDRESS = "0x87ae2b87b5db23830572988fb1f51242fbc471ce";
public static final String WALLET_WORKER_1 = "0x1a69b2eb604db8eba185df03ea4f5288dcbbd248";
Expand All @@ -45,6 +46,7 @@ public class TestUtils {

public static Task getStubTask() {
final Task task = new Task(CHAIN_DEAL_ID, 0, DAPP_NAME, COMMAND_LINE, 1, 60000, NO_TEE_TAG);
task.setDealBlockNumber(DEAL_BLOCK);
task.setContributionDeadline(Date.from(Instant.now().plus(1, ChronoUnit.MINUTES)));
task.setFinalDeadline(Date.from(Instant.now().plus(1, ChronoUnit.MINUTES)));
return task;
Expand Down
Loading