diff --git a/iexec-core-library/src/main/java/com/iexec/core/config/WorkerModel.java b/iexec-core-library/src/main/java/com/iexec/core/config/WorkerModel.java index 6948a4d8..79055f4c 100644 --- a/iexec-core-library/src/main/java/com/iexec/core/config/WorkerModel.java +++ b/iexec-core-library/src/main/java/com/iexec/core/config/WorkerModel.java @@ -31,8 +31,10 @@ public class WorkerModel { String cpu; int cpuNb; int memorySize; - boolean teeEnabled; boolean gpuEnabled; + // TODO remove or rename to sgxEnabled in the future + boolean teeEnabled; + boolean tdxEnabled; @JsonPOJOBuilder(withPrefix = "") public static class WorkerModelBuilder { diff --git a/iexec-core-library/src/test/java/com/iexec/core/config/WorkerModelTests.java b/iexec-core-library/src/test/java/com/iexec/core/config/WorkerModelTests.java index 4ff87713..928756b3 100644 --- a/iexec-core-library/src/test/java/com/iexec/core/config/WorkerModelTests.java +++ b/iexec-core-library/src/test/java/com/iexec/core/config/WorkerModelTests.java @@ -31,7 +31,7 @@ void shouldSerializeAndDeserialize() throws JsonProcessingException { WorkerModel model = WorkerModel.builder().build(); String jsonString = mapper.writeValueAsString(model); assertThat(jsonString).isEqualTo("{\"name\":null,\"walletAddress\":null,\"os\":null,\"cpu\":null," + - "\"cpuNb\":0,\"memorySize\":0,\"teeEnabled\":false,\"gpuEnabled\":false}"); + "\"cpuNb\":0,\"memorySize\":0,\"gpuEnabled\":false,\"teeEnabled\":false,\"tdxEnabled\":false}"); WorkerModel parsedModel = mapper.readValue(jsonString, WorkerModel.class); assertThat(parsedModel).isEqualTo(model); } diff --git a/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java b/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java index 035138c6..cfd8240f 100644 --- a/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java +++ b/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java @@ -98,52 +98,38 @@ Optional getAvailableReplicateTaskSummary(long workerLastB return Optional.empty(); } - final Optional optional = workerService.getWorker(walletAddress); - if (optional.isEmpty()) { + final Worker worker = workerService.getWorker(walletAddress).orElse(null); + // return empty if the worker is not found or if max computing task is reached + if (worker == null || worker.hasNoRemainingComputingSlot()) { return Optional.empty(); } - final Worker worker = optional.get(); - // return empty if max computing task is reached or if the worker is not found - if (!workerService.canAcceptMoreWorks(worker)) { - return Optional.empty(); - } - - return getReplicateTaskSummaryForAnyAvailableTask( - walletAddress, - worker.isTeeEnabled() - ); + return getReplicateTaskSummaryForAnyAvailableTask(worker); } /** * Loops through available tasks * and finds the first one that needs a new {@link Replicate}. * - * @param walletAddress Wallet address of the worker asking for work. - * @param isTeeEnabled Whether this worker supports TEE. + * @param worker scheduler model of the worker asking for work * @return An {@link Optional} containing a {@link ReplicateTaskSummary} * if any {@link Task} is available and can be handled by this worker, * {@link Optional#empty()} otherwise. */ - private Optional getReplicateTaskSummaryForAnyAvailableTask( - String walletAddress, - boolean isTeeEnabled) { + private Optional getReplicateTaskSummaryForAnyAvailableTask(final Worker worker) { final List alreadyScannedTasks = new ArrayList<>(); + final List excludedTags = worker.getExcludedTags(); Optional replicateTaskSummary = Optional.empty(); while (replicateTaskSummary.isEmpty()) { - final Optional oTask = taskService.getPrioritizedInitializedOrRunningTask( - !isTeeEnabled, - alreadyScannedTasks - ); - if (oTask.isEmpty()) { + final Task task = taskService.getPrioritizedInitializedOrRunningTask( + excludedTags, alreadyScannedTasks).orElse(null); + if (task == null) { // No more tasks waiting for a new replicate. return Optional.empty(); } - - final Task task = oTask.get(); alreadyScannedTasks.add(task.getChainTaskId()); - replicateTaskSummary = getReplicateTaskSummary(task, walletAddress); + replicateTaskSummary = getReplicateTaskSummary(task, worker.getWalletAddress()); } return replicateTaskSummary; } diff --git a/src/main/java/com/iexec/core/task/Task.java b/src/main/java/com/iexec/core/task/Task.java index bf1d3a43..ca24e526 100644 --- a/src/main/java/com/iexec/core/task/Task.java +++ b/src/main/java/com/iexec/core/task/Task.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 IEXEC BLOCKCHAIN TECH + * Copyright 2020-2025 IEXEC BLOCKCHAIN TECH * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -146,7 +146,7 @@ public boolean inCompletionPhase() { } public boolean isTeeTask() { - return TeeUtils.isTeeTag(getTag()); + return TeeUtils.getTeeFramework(tag) != null; } TaskModel generateModel() { diff --git a/src/main/java/com/iexec/core/task/TaskService.java b/src/main/java/com/iexec/core/task/TaskService.java index 865356b8..c739fdae 100644 --- a/src/main/java/com/iexec/core/task/TaskService.java +++ b/src/main/java/com/iexec/core/task/TaskService.java @@ -18,7 +18,6 @@ import com.iexec.commons.poco.chain.ChainTask; import com.iexec.commons.poco.chain.ChainTaskStatus; -import com.iexec.commons.poco.tee.TeeUtils; import com.iexec.core.chain.IexecHubService; import com.iexec.core.replicate.ReplicatesList; import com.iexec.core.task.event.TaskCreatedEvent; @@ -229,21 +228,17 @@ public List findByCurrentStatus(List statusList) { *

* Tasks can be excluded with {@code excludedChainTaskIds}. * - * @param shouldExcludeTeeTasks Whether TEE tasks should be retrieved - * as well as standard tasks. - * @param excludedChainTaskIds Tasks to exclude from retrieval. + * @param excludedTags Whether some tags should not be eligible, it is focused on TEE tags at the moment + * @param excludedChainTaskIds Tasks to exclude from retrieval. * @return The first task which is {@link TaskStatus#INITIALIZED} * or {@link TaskStatus#RUNNING}, * or {@link Optional#empty()} if no task meets the requirements. */ public Optional getPrioritizedInitializedOrRunningTask( - boolean shouldExcludeTeeTasks, - List excludedChainTaskIds) { - final List excludedTags = shouldExcludeTeeTasks - ? List.of(TeeUtils.TEE_SCONE_ONLY_TAG, TeeUtils.TEE_GRAMINE_ONLY_TAG) - : null; + final List excludedTags, + final List excludedChainTaskIds) { return findPrioritizedTask( - Arrays.asList(INITIALIZED, RUNNING), + List.of(INITIALIZED, RUNNING), excludedTags, excludedChainTaskIds, Sort.by(Sort.Order.desc(CURRENT_STATUS_FIELD_NAME), diff --git a/src/main/java/com/iexec/core/worker/Worker.java b/src/main/java/com/iexec/core/worker/Worker.java index 39ddc05f..e5067f55 100644 --- a/src/main/java/com/iexec/core/worker/Worker.java +++ b/src/main/java/com/iexec/core/worker/Worker.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 IEXEC BLOCKCHAIN TECH + * Copyright 2020-2025 IEXEC BLOCKCHAIN TECH * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,9 +16,11 @@ package com.iexec.core.worker; +import com.iexec.commons.poco.tee.TeeUtils; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; +import lombok.extern.slf4j.Slf4j; import org.springframework.data.annotation.Id; import org.springframework.data.mongodb.core.index.Indexed; import org.springframework.data.mongodb.core.mapping.Document; @@ -26,12 +28,12 @@ import java.util.Date; import java.util.List; +@Slf4j @Document @Data @Builder @AllArgsConstructor public class Worker { - @Id private String id; private String name; @@ -44,8 +46,10 @@ public class Worker { private int cpuNb; private int maxNbTasks; private int memorySize; - private boolean teeEnabled; private boolean gpuEnabled; + // TODO remove or rename to sgxEnabled in the future + private boolean teeEnabled; + private boolean tdxEnabled; @Builder.Default private List participatingChainTaskIds = List.of(); @Builder.Default @@ -60,12 +64,36 @@ void addChainTaskId(String chainTaskId) { computingChainTaskIds.add(chainTaskId); } - void removeChainTaskId(String chainTaskId) { - participatingChainTaskIds.remove(chainTaskId); - computingChainTaskIds.remove(chainTaskId); + /** + * Returns excluded tags depending on worker configuration + * + * @return The list of excluded tags + */ + public List getExcludedTags() { + if (!teeEnabled && !tdxEnabled) { + return List.of(TeeUtils.TEE_TDX_ONLY_TAG, TeeUtils.TEE_SCONE_ONLY_TAG, TeeUtils.TEE_GRAMINE_ONLY_TAG); + } else if (!teeEnabled) { + return List.of(TeeUtils.TEE_SCONE_ONLY_TAG, TeeUtils.TEE_GRAMINE_ONLY_TAG); + } else if (!tdxEnabled) { + return List.of(TeeUtils.TEE_TDX_ONLY_TAG); + } else { + // /!\ teeEnabled and tdxEnabled are both true in this branch + log.warn("Worker seems to support both SGX and TDX, this should not happen [wallet:{}]", walletAddress); + return List.of(); + } } - void removeComputedChainTaskId(String chainTaskId) { - computingChainTaskIds.remove(chainTaskId); + /** + * Returns whether the worker can accept more work or not. + * + * @return {@literal true} when the worker is at max capacity, {@literal false} otherwise + */ + public boolean hasNoRemainingComputingSlot() { + final boolean areAllComputingSlotsInUse = computingChainTaskIds.size() >= maxNbTasks; + if (areAllComputingSlotsInUse) { + log.debug("Worker is computing at max capacity [walletAddress:{}, runningReplicateNb:{}, workerMaxNbTasks:{}]", + walletAddress, computingChainTaskIds.size(), maxNbTasks); + } + return areAllComputingSlotsInUse; } } diff --git a/src/main/java/com/iexec/core/worker/WorkerController.java b/src/main/java/com/iexec/core/worker/WorkerController.java index 668ab378..58ef0cdd 100644 --- a/src/main/java/com/iexec/core/worker/WorkerController.java +++ b/src/main/java/com/iexec/core/worker/WorkerController.java @@ -102,18 +102,18 @@ public ResponseEntity getToken(@RequestParam(name = "walletAddress") Str } @PostMapping(path = "/workers/register") - public ResponseEntity registerWorker(@RequestHeader("Authorization") String bearerToken, - @RequestBody WorkerModel model) { - String workerWalletAddress = jwtTokenProvider.getWalletAddressFromBearerToken(bearerToken); + public ResponseEntity registerWorker(@RequestHeader("Authorization") final String bearerToken, + @RequestBody final WorkerModel model) { + final String workerWalletAddress = jwtTokenProvider.getWalletAddressFromBearerToken(bearerToken); if (workerWalletAddress.isEmpty()) { WorkerUtils.emitWarnOnUnAuthorizedAccess(""); return ResponseEntity.status(HttpStatus.UNAUTHORIZED).build(); } // if it is a GPU worker, it can process only 1 task at a time, otherwise it can process cpuNb - int maxNbTasks = model.isGpuEnabled() ? 1 : model.getCpuNb(); + final int maxNbTasks = model.isGpuEnabled() ? 1 : model.getCpuNb(); - Worker worker = Worker.builder() + final Worker worker = Worker.builder() .name(model.getName()) .walletAddress(workerWalletAddress) .os(model.getOs()) @@ -121,13 +121,14 @@ public ResponseEntity registerWorker(@RequestHeader("Authorization") Str .cpuNb(model.getCpuNb()) .maxNbTasks(maxNbTasks) .memorySize(model.getMemorySize()) - .teeEnabled(model.isTeeEnabled()) .gpuEnabled(model.isGpuEnabled()) + .teeEnabled(model.isTeeEnabled()) + .tdxEnabled(model.isTdxEnabled()) .participatingChainTaskIds(new ArrayList<>()) .computingChainTaskIds(new ArrayList<>()) .build(); - Worker savedWorker = workerService.addWorker(worker); + final Worker savedWorker = workerService.addWorker(worker); log.info("Worker ready [worker:{}]", savedWorker); return ok(savedWorker); } diff --git a/src/main/java/com/iexec/core/worker/WorkerService.java b/src/main/java/com/iexec/core/worker/WorkerService.java index 223f21be..5298568c 100644 --- a/src/main/java/com/iexec/core/worker/WorkerService.java +++ b/src/main/java/com/iexec/core/worker/WorkerService.java @@ -191,19 +191,6 @@ public List getAliveWorkers() { .toList(); return workerRepository.findByWalletAddressIn(aliveWorkers); } - - public boolean canAcceptMoreWorks(Worker worker) { - int workerMaxNbTasks = worker.getMaxNbTasks(); - int runningReplicateNb = worker.getComputingChainTaskIds().size(); - - if (runningReplicateNb >= workerMaxNbTasks) { - log.debug("Worker asking for too many replicates [walletAddress:{}, runningReplicateNb:{}, workerMaxNbTasks:{}]", - worker.getWalletAddress(), runningReplicateNb, workerMaxNbTasks); - return false; - } - - return true; - } // endregion // region Read-and-write methods @@ -257,21 +244,20 @@ public Optional addChainTaskIdToWorker(String chainTaskId, String wallet } private Optional addChainTaskIdToWorkerWithoutThreadSafety(String chainTaskId, String walletAddress) { - final Optional optional = workerRepository.findByWalletAddress(walletAddress); - if (optional.isPresent()) { - final Worker worker = optional.get(); - if (!canAcceptMoreWorks(worker)) { - log.warn("Can't add chainTaskId to worker when already full [chainTaskId:{}, workerAddress:{}]", - chainTaskId, walletAddress); - return Optional.empty(); - } - worker.addChainTaskId(chainTaskId); - log.info("Added chainTaskId to worker [chainTaskId:{}, workerAddress:{}]", chainTaskId, walletAddress); - return Optional.of(workerRepository.save(worker)); + final Worker worker = workerRepository.findByWalletAddress(walletAddress).orElse(null); + if (worker == null) { + log.warn("Can't add chainTaskId to worker when unknown worker [chainTaskId:{}, workerAddress:{}]", + chainTaskId, walletAddress); + return Optional.empty(); + } + if (worker.hasNoRemainingComputingSlot()) { + log.warn("Can't add chainTaskId to worker when already full [chainTaskId:{}, workerAddress:{}]", + chainTaskId, walletAddress); + return Optional.empty(); } - log.warn("Can't add chainTaskId to worker when unknown worker [chainTaskId:{}, workerAddress:{}]", - chainTaskId, walletAddress); - return Optional.empty(); + worker.addChainTaskId(chainTaskId); + log.info("Added chainTaskId to worker [chainTaskId:{}, workerAddress:{}]", chainTaskId, walletAddress); + return Optional.of(workerRepository.save(worker)); } public Optional removeChainTaskIdFromWorker(String chainTaskId, String walletAddress) { diff --git a/src/test/java/com/iexec/core/replicate/ReplicateSupplyServiceTests.java b/src/test/java/com/iexec/core/replicate/ReplicateSupplyServiceTests.java index 2b6e1091..1aa713bf 100644 --- a/src/test/java/com/iexec/core/replicate/ReplicateSupplyServiceTests.java +++ b/src/test/java/com/iexec/core/replicate/ReplicateSupplyServiceTests.java @@ -40,7 +40,6 @@ import org.mockito.ArgumentCaptor; import org.mockito.InjectMocks; import org.mockito.Mock; -import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.test.util.ReflectionTestUtils; @@ -63,6 +62,10 @@ @ExtendWith(MockitoExtension.class) class ReplicateSupplyServiceTests { + private static final List ALL_TEE_TAGS_EXCLUSION_LIST = List.of(TeeUtils.TEE_TDX_ONLY_TAG, TeeUtils.TEE_SCONE_ONLY_TAG, TeeUtils.TEE_GRAMINE_ONLY_TAG); + private static final List SGX_TEE_TAGS_EXCLUSION_LIST = List.of(TeeUtils.TEE_SCONE_ONLY_TAG, TeeUtils.TEE_GRAMINE_ONLY_TAG); + private static final List TDX_TEE_TAGS_EXCLUSION_LIST = List.of(TeeUtils.TEE_TDX_ONLY_TAG); + private static final String WALLET_WORKER_1 = "0x1a69b2eb604db8eba185df03ea4f5288dcbbd248"; private static final String WALLET_WORKER_2 = "0xdcfeffee1443fbf9277e6fa3b50cf3b38f7101af"; @@ -71,7 +74,6 @@ class ReplicateSupplyServiceTests { private static final String DAPP_NAME = "dappName"; private static final String COMMAND_LINE = "commandLine"; private static final String NO_TEE_TAG = BytesUtils.EMPTY_HEX_STRING_32; - private static final String TEE_TAG = TeeUtils.TEE_SCONE_ONLY_TAG; //any supported TEE tag private static final String ENCLAVE_CHALLENGE = "dummyEnclave"; private static final long MAX_EXECUTION_TIME = 60000; long workerLastBlock = 12; @@ -89,7 +91,6 @@ class ReplicateSupplyServiceTests { @Mock private Web3jService web3jService; - @Spy @InjectMocks private ReplicateSupplyService replicateSupplyService; @@ -127,7 +128,7 @@ void shouldNotGetReplicateSinceNoRunningTask() { .build(); mockWorkerCanAcceptMoreWork(worker); - when(taskService.getPrioritizedInitializedOrRunningTask(true, Collections.emptyList())).thenReturn(Optional.empty()); + when(taskService.getPrioritizedInitializedOrRunningTask(ALL_TEE_TAGS_EXCLUSION_LIST, List.of())).thenReturn(Optional.empty()); Optional replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_1); assertThat(replicateTaskSummary).isEmpty(); @@ -137,7 +138,7 @@ void shouldNotGetReplicateSinceNoRunningTask() { @Test void shouldNotGetReplicateSinceNoReplicatesList() { - Worker worker = Worker.builder() + final Worker worker = Worker.builder() .id("1") .walletAddress(WALLET_WORKER_2) .cpuNb(4) @@ -152,7 +153,7 @@ void shouldNotGetReplicateSinceNoReplicatesList() { runningTask.setEnclaveChallenge(BytesUtils.EMPTY_ADDRESS); mockWorkerCanAcceptMoreWork(worker); - when(taskService.getPrioritizedInitializedOrRunningTask(true, Collections.emptyList())) + when(taskService.getPrioritizedInitializedOrRunningTask(ALL_TEE_TAGS_EXCLUSION_LIST, List.of())) .thenReturn(Optional.of(runningTask)); when(replicatesService.getReplicatesList(CHAIN_TASK_ID)).thenReturn(Optional.empty()); @@ -165,19 +166,16 @@ void shouldNotGetReplicateSinceNoReplicatesList() { @Test void shouldNotGetReplicateSinceConsensusReachedOnChain() { - Worker worker = Worker.builder() + final Worker worker = Worker.builder() .id("1") .walletAddress(WALLET_WORKER_2) .cpuNb(4) .maxNbTasks(3) .teeEnabled(false) .build(); - final Replicate replicate = new Replicate(WALLET_WORKER_2, CHAIN_TASK_ID); + final Replicate replicate = new Replicate(WALLET_WORKER_1, CHAIN_TASK_ID); replicate.updateStatus(CONTRIBUTED, ReplicateStatusModifier.WORKER); - List replicates = List.of(replicate); - ReplicatesList replicatesList = spy( - new ReplicatesList(CHAIN_TASK_ID, replicates) - ); + final ReplicatesList replicatesList = new ReplicatesList(CHAIN_TASK_ID, List.of(replicate)); final Task runningTask = getStubTask(5); runningTask.setMaxExecutionTime(MAX_EXECUTION_TIME); @@ -186,11 +184,10 @@ void shouldNotGetReplicateSinceConsensusReachedOnChain() { runningTask.setEnclaveChallenge(BytesUtils.EMPTY_ADDRESS); mockWorkerCanAcceptMoreWork(worker); - when(taskService.getPrioritizedInitializedOrRunningTask(true, Collections.emptyList())) + when(taskService.getPrioritizedInitializedOrRunningTask(ALL_TEE_TAGS_EXCLUSION_LIST, List.of())) .thenReturn(Optional.of(runningTask)); when(replicatesService.getReplicatesList(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList)); when(taskService.isConsensusReached(replicatesList)).thenReturn(true); - when(replicatesList.hasWorkerAlreadyParticipated(WALLET_WORKER_2)).thenReturn(false); Optional replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_2); @@ -208,10 +205,10 @@ void shouldNotGetAnyReplicateSinceWorkerIsFull() { .walletAddress(WALLET_WORKER_1) .cpuNb(2) .maxNbTasks(1) + .computingChainTaskIds(List.of(CHAIN_TASK_ID)) .build(); when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true); when(workerService.getWorker(WALLET_WORKER_1)).thenReturn(Optional.of(worker)); - when(workerService.canAcceptMoreWorks(worker)).thenReturn(false); Optional replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_1); assertThat(replicateTaskSummary).isEmpty(); @@ -230,7 +227,7 @@ void shouldNotGetAnyReplicateSinceWorkerDoesNotHaveEnoughGas() { @Test void shouldNotGetAnyReplicateSinceWorkerAlreadyParticipated() { - Worker existingWorker = Worker.builder() + final Worker existingWorker = Worker.builder() .id("1") .walletAddress(WALLET_WORKER_1) .cpuNb(2) @@ -243,17 +240,14 @@ void shouldNotGetAnyReplicateSinceWorkerAlreadyParticipated() { runningTask.setContributionDeadline(Date.from(Instant.now().plus(60, ChronoUnit.MINUTES))); runningTask.setEnclaveChallenge(BytesUtils.EMPTY_ADDRESS); - final ReplicatesList replicatesList = spy(new ReplicatesList( - CHAIN_TASK_ID, - Collections.singletonList(new Replicate(WALLET_WORKER_1, CHAIN_TASK_ID)) - )); + final ReplicatesList replicatesList = new ReplicatesList( + CHAIN_TASK_ID, List.of(new Replicate(WALLET_WORKER_1, CHAIN_TASK_ID))); mockWorkerCanAcceptMoreWork(existingWorker); - when(taskService.getPrioritizedInitializedOrRunningTask(true, Collections.emptyList())) + when(taskService.getPrioritizedInitializedOrRunningTask(ALL_TEE_TAGS_EXCLUSION_LIST, List.of())) .thenReturn(Optional.of(runningTask)); when(replicatesService.getReplicatesList(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList)); - when(replicatesList.hasWorkerAlreadyParticipated(WALLET_WORKER_1)).thenReturn(true); Optional replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_1); @@ -264,7 +258,7 @@ void shouldNotGetAnyReplicateSinceWorkerAlreadyParticipated() { @Test void shouldNotGetReplicateSinceDoesNotNeedMoreContributionsForConsensus() { - Worker existingWorker = Worker.builder() + final Worker existingWorker = Worker.builder() .id("1") .walletAddress(WALLET_WORKER_2) .cpuNb(2) @@ -284,16 +278,13 @@ void shouldNotGetReplicateSinceDoesNotNeedMoreContributionsForConsensus() { replicate.setWorkerWeight(trust); replicate.setContributionHash("test"); - final ReplicatesList replicatesList = spy( - new ReplicatesList(CHAIN_TASK_ID, List.of(replicate)) - ); + final ReplicatesList replicatesList = new ReplicatesList(CHAIN_TASK_ID, List.of(replicate)); // Try to see if a replicate of the task can be scheduled on worker2 mockWorkerCanAcceptMoreWork(existingWorker); - when(taskService.getPrioritizedInitializedOrRunningTask(true, Collections.emptyList())) + when(taskService.getPrioritizedInitializedOrRunningTask(ALL_TEE_TAGS_EXCLUSION_LIST, List.of())) .thenReturn(Optional.of(runningTask)); when(replicatesService.getReplicatesList(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList)); - when(replicatesList.hasWorkerAlreadyParticipated(WALLET_WORKER_2)).thenReturn(false); Optional replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_2); assertThat(replicateTaskSummary).isEmpty(); @@ -304,7 +295,7 @@ void shouldNotGetReplicateSinceDoesNotNeedMoreContributionsForConsensus() { @Test void shouldNotGetReplicateSinceEnclaveChallengeNeededButNotGenerated() { - Worker existingWorker = Worker.builder() + final Worker existingWorker = Worker.builder() .id("1") .walletAddress(WALLET_WORKER_1) .cpuNb(2) @@ -314,12 +305,12 @@ void shouldNotGetReplicateSinceEnclaveChallengeNeededButNotGenerated() { final Task runningTask = getStubTask(5); runningTask.setMaxExecutionTime(MAX_EXECUTION_TIME); - runningTask.setTag(TEE_TAG); + runningTask.setTag(TeeUtils.TEE_SCONE_ONLY_TAG); runningTask.setContributionDeadline(Date.from(Instant.now().plus(60, ChronoUnit.MINUTES))); runningTask.setEnclaveChallenge(""); mockWorkerCanAcceptMoreWork(existingWorker); - when(taskService.getPrioritizedInitializedOrRunningTask(false, Collections.emptyList())) + when(taskService.getPrioritizedInitializedOrRunningTask(TDX_TEE_TAGS_EXCLUSION_LIST, List.of())) .thenReturn(Optional.of(runningTask)); Optional replicateTaskSummary = @@ -357,11 +348,11 @@ void shouldGetOnlyOneReplicateSinceOtherOneReachedConsensusDeadline() { taskDeadlineReached.setTag(NO_TEE_TAG); taskDeadlineReached.setEnclaveChallenge(BytesUtils.EMPTY_ADDRESS); - ReplicatesList replicatesList = new ReplicatesList(CHAIN_TASK_ID, Collections.emptyList()); + final ReplicatesList replicatesList = new ReplicatesList(CHAIN_TASK_ID); mockWorkerCanAcceptMoreWork(existingWorker); - when(taskService.getPrioritizedInitializedOrRunningTask(true, Collections.emptyList())) + when(taskService.getPrioritizedInitializedOrRunningTask(ALL_TEE_TAGS_EXCLUSION_LIST, List.of())) .thenReturn(Optional.of(task1)); when(replicatesService.getReplicatesList(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList)); when(signatureService.createAuthorization(WALLET_WORKER_1, CHAIN_TASK_ID, CHAIN_DEAL_ID, TASK_INDEX, BytesUtils.EMPTY_ADDRESS)) @@ -399,7 +390,7 @@ void shouldNotGetReplicateWhenTaskIsAlreadyBeingAccessed() { runningTask.setEnclaveChallenge(BytesUtils.EMPTY_ADDRESS); mockWorkerCanAcceptMoreWork(existingWorker); - when(taskService.getPrioritizedInitializedOrRunningTask(true, Collections.emptyList())) + when(taskService.getPrioritizedInitializedOrRunningTask(ALL_TEE_TAGS_EXCLUSION_LIST, List.of())) .thenReturn(Optional.of(runningTask)); final Lock lock = replicateSupplyService.taskAccessForNewReplicateLocks.computeIfAbsent(CHAIN_TASK_ID, k -> new ReentrantLock()); CompletableFuture.runAsync(lock::lock).join(); @@ -413,7 +404,7 @@ void shouldNotGetReplicateWhenTaskIsAlreadyBeingAccessed() { @Test void shouldGetReplicateWithNoTee() { - Worker existingWorker = Worker.builder() + final Worker existingWorker = Worker.builder() .id("1") .walletAddress(WALLET_WORKER_1) .cpuNb(2) @@ -427,17 +418,14 @@ void shouldGetReplicateWithNoTee() { runningTask.setContributionDeadline(Date.from(Instant.now().plus(60, ChronoUnit.MINUTES))); runningTask.setEnclaveChallenge(BytesUtils.EMPTY_ADDRESS); - final ReplicatesList replicatesList = spy( - new ReplicatesList(CHAIN_TASK_ID, Collections.emptyList()) - ); + final ReplicatesList replicatesList = new ReplicatesList(CHAIN_TASK_ID); mockWorkerCanAcceptMoreWork(existingWorker); - when(taskService.getPrioritizedInitializedOrRunningTask(true, Collections.emptyList())) + when(taskService.getPrioritizedInitializedOrRunningTask(ALL_TEE_TAGS_EXCLUSION_LIST, List.of())) .thenReturn(Optional.of(runningTask)); when(replicatesService.getReplicatesList(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList)); when(signatureService.createAuthorization(WALLET_WORKER_1, CHAIN_TASK_ID, CHAIN_DEAL_ID, TASK_INDEX, BytesUtils.EMPTY_ADDRESS)) .thenReturn(WorkerpoolAuthorization.builder().build()); - when(replicatesList.hasWorkerAlreadyParticipated(WALLET_WORKER_1)).thenReturn(false); when(workerService.addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1)) .thenReturn(Optional.of(existingWorker)); when(replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_1)) @@ -454,8 +442,8 @@ void shouldGetReplicateWithNoTee() { } @Test - void shouldGetReplicateWithTee() { - Worker existingWorker = Worker.builder() + void shouldGetReplicateWithSgx() { + final Worker existingWorker = Worker.builder() .id("1") .walletAddress(WALLET_WORKER_1) .cpuNb(2) @@ -465,16 +453,14 @@ void shouldGetReplicateWithTee() { final Task runningTask = getStubTask(5); runningTask.setMaxExecutionTime(MAX_EXECUTION_TIME); - runningTask.setTag(TEE_TAG); + runningTask.setTag(TeeUtils.TEE_SCONE_ONLY_TAG); runningTask.setContributionDeadline(Date.from(Instant.now().plus(60, ChronoUnit.MINUTES))); runningTask.setEnclaveChallenge(ENCLAVE_CHALLENGE); - final ReplicatesList replicatesList = spy( - new ReplicatesList(CHAIN_TASK_ID, Collections.emptyList()) - ); + final ReplicatesList replicatesList = new ReplicatesList(CHAIN_TASK_ID); mockWorkerCanAcceptMoreWork(existingWorker); - when(taskService.getPrioritizedInitializedOrRunningTask(false, Collections.emptyList())) + when(taskService.getPrioritizedInitializedOrRunningTask(TDX_TEE_TAGS_EXCLUSION_LIST, List.of())) .thenReturn(Optional.of(runningTask)); when(replicatesService.getReplicatesList(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList)); when(signatureService.createAuthorization(WALLET_WORKER_1, CHAIN_TASK_ID, CHAIN_DEAL_ID, TASK_INDEX, ENCLAVE_CHALLENGE)) @@ -484,7 +470,6 @@ void shouldGetReplicateWithTee() { when(replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_1)) .thenReturn(true); - when(replicatesList.hasWorkerAlreadyParticipated(WALLET_WORKER_1)).thenReturn(false); Optional replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_1); @@ -496,72 +481,69 @@ void shouldGetReplicateWithTee() { } @Test - void shouldTeeNeededTaskNotBeGivenToTeeDisabledWorker() { - Worker existingWorker = Worker.builder() + void shouldGetReplicateWithTdx() { + final Worker existingWorker = Worker.builder() .id("1") .walletAddress(WALLET_WORKER_1) .cpuNb(2) .maxNbTasks(1) - .teeEnabled(false) + .tdxEnabled(true) .build(); final Task runningTask = getStubTask(5); runningTask.setMaxExecutionTime(MAX_EXECUTION_TIME); - runningTask.setTag(TEE_TAG); + runningTask.setTag(TeeUtils.TEE_TDX_ONLY_TAG); runningTask.setContributionDeadline(Date.from(Instant.now().plus(60, ChronoUnit.MINUTES))); + runningTask.setEnclaveChallenge(ENCLAVE_CHALLENGE); + + final ReplicatesList replicatesList = new ReplicatesList(CHAIN_TASK_ID); mockWorkerCanAcceptMoreWork(existingWorker); - when(taskService.getPrioritizedInitializedOrRunningTask(true, Collections.emptyList())) - .thenReturn(Optional.empty()); + when(taskService.getPrioritizedInitializedOrRunningTask(SGX_TEE_TAGS_EXCLUSION_LIST, List.of())) + .thenReturn(Optional.of(runningTask)); + when(replicatesService.getReplicatesList(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList)); + when(signatureService.createAuthorization(WALLET_WORKER_1, CHAIN_TASK_ID, CHAIN_DEAL_ID, TASK_INDEX, ENCLAVE_CHALLENGE)) + .thenReturn(WorkerpoolAuthorization.builder().build()); + when(workerService.addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1)) + .thenReturn(Optional.of(existingWorker)); + when(replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_1)) + .thenReturn(true); Optional replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_1); - assertThat(replicateTaskSummary).isEmpty(); - verifyNoInteractions(signatureService); - assertTaskAccessForNewReplicateLockNeverUsed(CHAIN_TASK_ID); + assertThat(replicateTaskSummary).isPresent(); + + verify(replicatesService).addNewReplicate(replicatesList, WALLET_WORKER_1); + verify(workerService).addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1); + assertTaskAccessForNewReplicateNotDeadLocking(CHAIN_TASK_ID); } @Test - void shouldTeeNeededTaskBeGivenToTeeEnabledWorker() { - Worker existingWorker = Worker.builder() + void shouldTeeNeededTaskNotBeGivenToTeeDisabledWorker() { + final Worker existingWorker = Worker.builder() .id("1") .walletAddress(WALLET_WORKER_1) .cpuNb(2) .maxNbTasks(1) - .teeEnabled(true) + .teeEnabled(false) .build(); final Task runningTask = getStubTask(5); runningTask.setMaxExecutionTime(MAX_EXECUTION_TIME); - runningTask.setTag(TEE_TAG); + runningTask.setTag(TeeUtils.TEE_SCONE_ONLY_TAG); runningTask.setContributionDeadline(Date.from(Instant.now().plus(60, ChronoUnit.MINUTES))); - runningTask.setEnclaveChallenge(ENCLAVE_CHALLENGE); - - final ReplicatesList replicatesList = spy( - new ReplicatesList(CHAIN_TASK_ID, Collections.emptyList()) - ); mockWorkerCanAcceptMoreWork(existingWorker); - when(taskService.getPrioritizedInitializedOrRunningTask(false, Collections.emptyList())) - .thenReturn(Optional.of(runningTask)); - when(replicatesService.getReplicatesList(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList)); - when(signatureService.createAuthorization(WALLET_WORKER_1, CHAIN_TASK_ID, CHAIN_DEAL_ID, TASK_INDEX, ENCLAVE_CHALLENGE)) - .thenReturn(WorkerpoolAuthorization.builder().build()); - when(workerService.addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1)) - .thenReturn(Optional.of(existingWorker)); - when(replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_1)) - .thenReturn(true); + when(taskService.getPrioritizedInitializedOrRunningTask(ALL_TEE_TAGS_EXCLUSION_LIST, List.of())) + .thenReturn(Optional.empty()); - when(replicatesList.hasWorkerAlreadyParticipated(WALLET_WORKER_1)).thenReturn(false); Optional replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_1); - assertThat(replicateTaskSummary).isPresent(); - - verify(replicatesService).addNewReplicate(replicatesList, WALLET_WORKER_1); - verify(workerService).addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1); - assertTaskAccessForNewReplicateNotDeadLocking(CHAIN_TASK_ID); + assertThat(replicateTaskSummary).isEmpty(); + verifyNoInteractions(signatureService); + assertTaskAccessForNewReplicateLockNeverUsed(CHAIN_TASK_ID); } /** @@ -571,7 +553,6 @@ void shouldTeeNeededTaskBeGivenToTeeEnabledWorker() { */ private void assertTaskAccessForNewReplicateNotDeadLocking(String chainTaskId) { final Lock lock = replicateSupplyService.taskAccessForNewReplicateLocks.get(chainTaskId); - System.out.println("Task: " + chainTaskId + " ; lock : " + lock); final Boolean successfulLock = CompletableFuture.supplyAsync(() -> { final boolean locked = lock.tryLock(); if (!locked) { @@ -594,9 +575,7 @@ private void assertTaskAccessForNewReplicateLockNeverUsed(String chainTaskId) { // region getMissedTaskNotifications @Test void shouldReturnEmptyListSinceNotParticipatingToAnyTask() { - - when(taskService.getTasksByChainTaskIds(any())) - .thenReturn(Collections.emptyList()); + when(taskService.getTasksByChainTaskIds(any())).thenReturn(List.of()); List list = replicateSupplyService.getMissedTaskNotifications(1L, WALLET_WORKER_1); @@ -612,7 +591,7 @@ void shouldNotGetInterruptedReplicateSinceEnclaveChallengeNeededButNotGenerated( Task teeTask = new Task(DAPP_NAME, COMMAND_LINE, 5, CHAIN_TASK_ID); teeTask.setEnclaveChallenge(ENCLAVE_CHALLENGE); Optional noTeeReplicate = getStubReplicate(ReplicateStatus.COMPUTING); - teeTask.setTag(TEE_TAG); + teeTask.setTag(TeeUtils.TEE_SCONE_ONLY_TAG); when(workerService.getChainTaskIds(WALLET_WORKER_1)).thenReturn(ids); when(taskService.getTasksByChainTaskIds(ids)).thenReturn(List.of(teeTask)); @@ -1158,6 +1137,5 @@ private void mockTaskList(final TaskStatus taskStatus) { private void mockWorkerCanAcceptMoreWork(final Worker worker) { when(web3jService.hasEnoughGas(worker.getWalletAddress())).thenReturn(true); when(workerService.getWorker(worker.getWalletAddress())).thenReturn(Optional.of(worker)); - when(workerService.canAcceptMoreWorks(worker)).thenReturn(true); } } diff --git a/src/test/java/com/iexec/core/task/TaskServiceTests.java b/src/test/java/com/iexec/core/task/TaskServiceTests.java index ac34178b..b9d248f8 100644 --- a/src/test/java/com/iexec/core/task/TaskServiceTests.java +++ b/src/test/java/com/iexec/core/task/TaskServiceTests.java @@ -31,7 +31,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest; @@ -266,7 +265,7 @@ void shouldGetInitializedOrRunningTasks() { final Task task = getStubTask(); task.setCurrentStatus(INITIALIZED); taskRepository.save(task); - assertThat(taskService.getPrioritizedInitializedOrRunningTask(false, List.of())) + assertThat(taskService.getPrioritizedInitializedOrRunningTask(List.of(), List.of())) .usingRecursiveComparison() .isEqualTo(Optional.of(task)); } @@ -276,7 +275,7 @@ void shouldNotGetTaskPastContributionDeadline() { final Task task = getStubTask(); task.setContributionDeadline(Date.from(Instant.now().minus(1, ChronoUnit.MINUTES))); taskRepository.save(task); - assertThat(taskService.getPrioritizedInitializedOrRunningTask(false, List.of())) + assertThat(taskService.getPrioritizedInitializedOrRunningTask(List.of(), List.of())) .usingRecursiveComparison() .isEqualTo(Optional.empty()); } @@ -308,7 +307,7 @@ void shouldConsensusNotBeReachedAsUnknownTask() { assertThat(taskService.isConsensusReached(new ReplicatesList(CHAIN_TASK_ID))) .isFalse(); - Mockito.verify(iexecHubService).getChainTask(any()); + verify(iexecHubService).getChainTask(any()); } @Test @@ -325,13 +324,13 @@ void shouldConsensusNotBeReachedAsNotRevealing() { assertThat(taskService.isConsensusReached(new ReplicatesList(task.getChainTaskId()))) .isFalse(); - Mockito.verify(iexecHubService).getChainTask(any()); + verify(iexecHubService).getChainTask(any()); } @Test void shouldConsensusNotBeReachedAsOnChainWinnersHigherThanOffchainWinners() { final Task task = getStubTask(); - final ReplicatesList replicatesList = Mockito.spy(new ReplicatesList(task.getChainTaskId())); + final ReplicatesList replicatesList = spy(new ReplicatesList(task.getChainTaskId())); final ChainTask chainTask = ChainTask .builder() .chainTaskId(task.getChainTaskId()) @@ -345,13 +344,13 @@ void shouldConsensusNotBeReachedAsOnChainWinnersHigherThanOffchainWinners() { assertThat(taskService.isConsensusReached(replicatesList)) .isFalse(); - Mockito.verify(iexecHubService).getChainTask(any()); + verify(iexecHubService).getChainTask(any()); } @Test void shouldConsensusBeReached() { final Task task = getStubTask(); - final ReplicatesList replicatesList = Mockito.spy(new ReplicatesList(task.getChainTaskId())); + final ReplicatesList replicatesList = spy(new ReplicatesList(task.getChainTaskId())); final ChainTask chainTask = ChainTask .builder() .chainTaskId(task.getChainTaskId()) @@ -365,7 +364,7 @@ void shouldConsensusBeReached() { assertThat(taskService.isConsensusReached(replicatesList)) .isTrue(); - Mockito.verify(iexecHubService).getChainTask(any()); + verify(iexecHubService).getChainTask(any()); } // endregion diff --git a/src/test/java/com/iexec/core/worker/WorkerServiceTests.java b/src/test/java/com/iexec/core/worker/WorkerServiceTests.java index 870ef12d..21ce5905 100644 --- a/src/test/java/com/iexec/core/worker/WorkerServiceTests.java +++ b/src/test/java/com/iexec/core/worker/WorkerServiceTests.java @@ -499,26 +499,6 @@ void shouldNotFindAliveWorkers() { assertThat(workerService.getAliveWorkers()).isEmpty(); } - @Test - void shouldAcceptMoreWorks() { - Worker worker = getDummyWorker(WORKER1, - 3, - List.of("task1", "task2", "task3", "task4", "task5"), - List.of("task1", "task3")); - - assertThat(workerService.canAcceptMoreWorks(worker)).isTrue(); - } - - @Test - void shouldNotAcceptMoreWorksSinceSaturatedCpus() { - Worker worker = getDummyWorker(WORKER1, - 2, - List.of("task1", "task2", "task3", "task4"), - List.of("task1", "task3")); - - assertThat(workerService.canAcceptMoreWorks(worker)).isFalse(); - } - List getDummyWorkers() { workerService.getWorkerStatsMap().computeIfAbsent("address1", WorkerService.WorkerStats::new) .setLastAliveDate(Date.from(Instant.now())); diff --git a/src/test/java/com/iexec/core/worker/WorkerTests.java b/src/test/java/com/iexec/core/worker/WorkerTests.java new file mode 100644 index 00000000..bc817d22 --- /dev/null +++ b/src/test/java/com/iexec/core/worker/WorkerTests.java @@ -0,0 +1,88 @@ +/* + * Copyright 2025 IEXEC BLOCKCHAIN TECH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.iexec.core.worker; + +import com.iexec.commons.poco.tee.TeeUtils; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +class WorkerTests { + private static final String WORKER = "0x1a69b2eb604db8eba185df03ea4f5288dcbbd248"; + + private Worker getDummyWorker(final int cpuNb, final List participatingIds, final List computingIds) { + return Worker.builder() + .walletAddress(WORKER) + .cpuNb(cpuNb) + .maxNbTasks(cpuNb) + .gpuEnabled(false) + .participatingChainTaskIds(participatingIds) + .computingChainTaskIds(computingIds) + .build(); + } + + // region getExcludedTag + @Test + void shouldExcludeAllTagsForStandardWorker() { + final Worker worker = Worker.builder().build(); + assertThat(worker.getExcludedTags()) + .containsExactly(TeeUtils.TEE_TDX_ONLY_TAG, TeeUtils.TEE_SCONE_ONLY_TAG, TeeUtils.TEE_GRAMINE_ONLY_TAG); + } + + @Test + void shouldExcludeTdxTagForSgxWorker() { + final Worker worker = Worker.builder().teeEnabled(true).build(); + assertThat(worker.getExcludedTags()) + .containsExactly(TeeUtils.TEE_TDX_ONLY_TAG); + } + + @Test + void shouldExcludeSgxTagsForTdxWorker() { + final Worker worker = Worker.builder().tdxEnabled(true).build(); + assertThat(worker.getExcludedTags()) + .containsExactly(TeeUtils.TEE_SCONE_ONLY_TAG, TeeUtils.TEE_GRAMINE_ONLY_TAG); + } + + @Test + void shouldExcludeNoTag() { + final Worker worker = Worker.builder().teeEnabled(true).tdxEnabled(true).build(); + assertThat(worker.getExcludedTags()).isEmpty(); + } + // endregion + + // region hasRemainingComputingSlot + @Test + void shouldHaveRemainingComputingSlot() { + final Worker worker = getDummyWorker( + 3, + List.of("task1", "task2", "task3", "task4", "task5"), + List.of("task1", "task3")); + assertThat(worker.hasNoRemainingComputingSlot()).isFalse(); + } + + @Test + void shouldNotHaveRemainingComputingSlot() { + final Worker worker = getDummyWorker( + 2, + List.of("task1", "task2", "task3", "task4"), + List.of("task1", "task3")); + assertThat(worker.hasNoRemainingComputingSlot()).isTrue(); + } + // endregion +}