Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
102 commits
Select commit Hold shift + click to select a range
93c8317
Setup new module for a job queue service
DiegoTavares Jul 10, 2025
425a393
Initial version of the distributed job-scheduler
DiegoTavares Jul 18, 2025
1068970
[draft] dispatcher
DiegoTavares Jul 22, 2025
dc3bf95
Add frame range parsing and chunking for job dispatching
DiegoTavares Aug 7, 2025
71aa58b
Add job_resource cores limits to host_dao query
DiegoTavares Aug 8, 2025
378dde3
Merge branch 'master' into distributed_scheduler
DiegoTavares Aug 8, 2025
c0a6193
Implement scheduler using kafka
DiegoTavares Aug 19, 2025
afb000c
Compiles
DiegoTavares Aug 29, 2025
75ac963
Make all memory fields bytesize
DiegoTavares Aug 29, 2025
1596d02
Fix database memory values from bytes to kb
DiegoTavares Aug 29, 2025
dd3d41a
Implement cluster logic using facility+show+tag
DiegoTavares Sep 4, 2025
40d8a2b
Fix layer host candidate loop
DiegoTavares Sep 4, 2025
b147b7f
Remove dead files and old TODOs
DiegoTavares Sep 4, 2025
94d7a6c
Add integration tests
DiegoTavares Sep 5, 2025
d04eaa9
Rename and refactor integration_tests to smoke_tests
DiegoTavares Sep 12, 2025
804f2ae
WIP: Add scheduler stress tests
DiegoTavares Sep 18, 2025
78ecb0e
Minor fixes
DiegoTavares Sep 19, 2025
623fcc3
First working stress tests
DiegoTavares Sep 22, 2025
fd6305e
Update job fetcher to use fetch_all and stream processing
DiegoTavares Sep 23, 2025
d959d16
Refactor modules
DiegoTavares Sep 24, 2025
1b7a9ad
Batch layer and frame queries
DiegoTavares Sep 24, 2025
2b97f12
Fixed several dashmap related deadlocks
DiegoTavares Sep 25, 2025
3216258
Convert host_cache to scc
DiegoTavares Sep 26, 2025
2e9f651
Wrap HostCache in an Actor System using actix
DiegoTavares Sep 27, 2025
55f5e41
Remove unecessary debug statements
DiegoTavares Sep 27, 2025
733f366
Migrate Dispatcher interface into an Actor
DiegoTavares Sep 27, 2025
2dee076
Migrate Dispatcher interface into an Actor
DiegoTavares Sep 27, 2025
2af6c54
Clean up warnings
DiegoTavares Oct 2, 2025
91ab556
Prevent race condition trying to book the same host
DiegoTavares Oct 2, 2025
dd5f9e3
Document pub functions
DiegoTavares Oct 2, 2025
8edb761
Document host_cache
DiegoTavares Oct 2, 2025
7b68055
Add host locking mechanism to avoid race conditions
DiegoTavares Oct 9, 2025
915bebf
Add option to launch scheduler with a list of clusters
DiegoTavares Oct 9, 2025
b825e18
Add ProcDao to store resource allocation for frame dispatch
DiegoTavares Oct 9, 2025
f945316
Fix warnings
DiegoTavares Oct 9, 2025
b3eaadb
Add Allocation and Subscription Service to Scheduler
DiegoTavares Oct 20, 2025
1cd71c6
Replace experimental duration_constructors with explicit Duration calls
DiegoTavares Oct 28, 2025
f49c759
Refactor dispatcher to centralize virtual proc dispatch logic
DiegoTavares Oct 28, 2025
899e379
Add row-level locking to frame dispatch to prevent conflicts
DiegoTavares Oct 28, 2025
82970a4
Handle selfish services
DiegoTavares Oct 28, 2025
eba78c7
Fix unit tests
DiegoTavares Oct 28, 2025
48890ef
Add show:alloc exclusion list to cuebot
DiegoTavares Oct 28, 2025
faa4efa
Turn off spotless for HostDaoJdbc and format queries
DiegoTavares Oct 29, 2025
20bb2ca
Merge branch 'master' into distributed_scheduler_2
DiegoTavares Oct 29, 2025
e1caa64
Use rust latest stable edition 2021
DiegoTavares Oct 29, 2025
c9d9714
Remove cargo.lock
DiegoTavares Oct 29, 2025
9acc76d
Ignore Cargo.lock
DiegoTavares Oct 29, 2025
7913f3f
Remove stress test from the basic build
DiegoTavares Oct 29, 2025
e7a4591
Fix warnings
DiegoTavares Oct 29, 2025
b900590
Use Cargo resolver version 2 in workspace configuration
DiegoTavares Oct 29, 2025
fd7aa51
Add a dockerfile for the scheduler module
DiegoTavares Oct 30, 2025
c096ae1
Remove kafka references from scheduler
DiegoTavares Oct 30, 2025
3bf764c
Use channels between cluster and entrypoint logics
DiegoTavares Nov 5, 2025
7c1ed0b
Add LayerPermitService to prevent concurrent processing of same layer
DiegoTavares Nov 5, 2025
6da9ee0
Add memory_hungry.sh script to allocate specified memory for testing
DiegoTavares Nov 5, 2025
d6e48d6
Refine dry run logging to debug level
DiegoTavares Nov 6, 2025
511e889
Add host booking strategy and cluster round counters
DiegoTavares Nov 6, 2025
548ccd6
Remove lock_for_update from frame dispatch logic
DiegoTavares Nov 6, 2025
9acbd1a
Fix unit of memory on proc_dao.rs
DiegoTavares Nov 6, 2025
b4cc44f
Improve logging
DiegoTavares Nov 7, 2025
e76db1c
Add updated_at timestamp to frame and layer models
DiegoTavares Nov 7, 2025
35ee40a
Add Prometheus metrics to scheduler service
DiegoTavares Nov 7, 2025
f6f2008
Disable stress tests on default testset
DiegoTavares Nov 7, 2025
bc962d8
Merge branch 'master' into distributed_scheduler_2
DiegoTavares Nov 7, 2025
aec329a
Add optional host locking in dispatcher commands
DiegoTavares Nov 7, 2025
401cf64
Refactor DatabaseConfig to use explicit connection params
DiegoTavares Nov 7, 2025
550c7b8
Update Scheduler Configuration and Config Model
DiegoTavares Nov 8, 2025
886c194
Fix unit tests and warnings
DiegoTavares Nov 8, 2025
635b6ff
Add URL encoding for database credentials
DiegoTavares Nov 8, 2025
14e60b5
Improve RqdDispatcherService connection caching and timeout handling
DiegoTavares Nov 10, 2025
221e120
Add metrics for job query duration in scheduler
DiegoTavares Nov 10, 2025
491feb3
Add config option to turn off host booking
DiegoTavares Nov 10, 2025
3ac24dc
Change log level from info to debug in matcher
DiegoTavares Nov 10, 2025
1816b49
Add OS validation to host matching process
DiegoTavares Nov 10, 2025
bb8a53c
Ensure grpc connection cache is invalidated in any error condition
DiegoTavares Nov 10, 2025
de1de29
Fix reference to facility ID in cluster feed loading
DiegoTavares Nov 10, 2025
f092f62
[rebase] Update host cache and DAO to improve resource tracking and a…
DiegoTavares Nov 13, 2025
f75c8ba
[rebase] Refactor host_cache
DiegoTavares Nov 13, 2025
1ed312a
Refactor host_cache
DiegoTavares Nov 20, 2025
125930f
Refactor HostStore with atomic operations and improved concurrency
DiegoTavares Nov 20, 2025
f1c9016
Add debug logging for host cache and signal handling
DiegoTavares Nov 21, 2025
ac35982
Add debug log when host is considered stale in cache
DiegoTavares Nov 21, 2025
178ba52
Change Id types to Uuid
DiegoTavares Nov 24, 2025
11aab71
Replace host.ts_last_updated by host_stat.ts_ping
DiegoTavares Nov 25, 2025
0e9bc63
Set PostgreSQL connection to UTC timezone
DiegoTavares Nov 25, 2025
e5d60bb
Remove custom timestamp layers from tracing logs
DiegoTavares Nov 25, 2025
c3581e6
Fix case issue on facility pk on hos_dao
DiegoTavares Nov 25, 2025
8d35a31
Revert cuebot changes
DiegoTavares Nov 25, 2025
fafef7e
Remove unused opencue.properties entries
DiegoTavares Nov 25, 2025
00c318c
Add retry count tracking for dispatched frames
DiegoTavares Nov 27, 2025
9ffcb65
Add option to ignore a list of tags
DiegoTavares Nov 28, 2025
06ad353
Make host_dao facility query case-insensitive
DiegoTavares Dec 3, 2025
d11b51d
Add configurable frame memory soft and hard limits
DiegoTavares Dec 4, 2025
2faa2cf
Update Cluster type to include facility ID
DiegoTavares Dec 8, 2025
0b62d3b
Merge branch 'master' into distributed_scheduler_2
DiegoTavares Dec 8, 2025
5699d77
Remove local files
DiegoTavares Dec 8, 2025
1db651c
Add int_concurrent_frames_limit to host
DiegoTavares Dec 9, 2025
cd2de9e
[scheduler/cuebot] Introduce booking by slot to scheduler
DiegoTavares Dec 9, 2025
493b55f
Add Layer Slots Required Field
DiegoTavares Dec 11, 2025
2ff2142
Merge branch 'master' into scheduler_resource_naive_mode
DiegoTavares Dec 11, 2025
b4251f5
Add slots_required field to layer in OpenCue DTD and PyOutline layer
DiegoTavares Dec 11, 2025
18d0e9c
Version up to account for changes on .proto
DiegoTavares Dec 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@ cuebot/.project
/pycue/opencue/compiled_proto/
/rqd/rqd/compiled_proto/
docker-compose-local.yml
/sandbox/kafka*
/sandbox/zookeeper*
docs/_site/
docs/bin/
sandbox/kafka-data
sandbox/zookeeper-data
sandbox/zookeeper-logs
docs/_data/version.yml
target/*
2 changes: 1 addition & 1 deletion VERSION.in
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.14
1.15
4 changes: 2 additions & 2 deletions cuebot/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ dependencies {

compileJava {
dependsOn generateProto
options.compilerArgs << "-Xlint:all" << "-Werror"
options.compilerArgs << "-Xlint:all,-serial" << "-Werror"
}

compileTestJava {
dependsOn generateProto
options.compilerArgs << "-Xlint:all" << "-Werror"
options.compilerArgs << "-Xlint:all,-serial" << "-Werror"
}

protobuf {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class LayerDetail extends LayerEntity implements LayerInterface {
public int timeout_llu;
public int dispatchOrder;
public int totalFrameCount;
public int slotsRequired;

public Set<String> tags = new LinkedHashSet<String>();
public Set<String> services = new LinkedHashSet<String>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ public interface HostDao {
*/
void updateHostStats(HostInterface host, long totalMemory, long freeMemory, long totalSwap,
long freeSwap, long totalMcp, long freeMcp, long totalGpuMemory, long freeGpuMemory,
int load, Timestamp bootTime, String os);
int load, Timestamp bootTime, String os, int runningProcs);

/**
* Return true if the HardwareState is Up, false if it is anything else.
Expand Down
8 changes: 8 additions & 0 deletions cuebot/src/main/java/com/imageworks/spcue/dao/LayerDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,14 @@ public interface LayerDao {
*/
void updateTimeoutLLU(LayerInterface layer, int timeout_llu);

/**
* Updates the slots required for a layer.
*
* @param layer the layer to update
* @param slots the number of slots required (<=0 means not slot-based)
*/
void updateLayerSlotsRequired(LayerInterface layer, int slots);

/**
* Lowers the minimum memory on a layer if the layer is using less memory and the currnet min
* memory is the dispatcher default.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,22 +395,23 @@ public CallableStatement createCallableStatement(Connection con) throws SQLExcep
+ " int_load = ?, "
+ " ts_booted = ?, "
+ " ts_ping = current_timestamp, "
+ " str_os = ? "
+ " str_os = ?, "
+ " int_running_procs = ? "
+ "WHERE "
+ " pk_host = ?";

@Override
public void updateHostStats(HostInterface host, long totalMemory, long freeMemory,
long totalSwap, long freeSwap, long totalMcp, long freeMcp, long totalGpuMemory,
long freeGpuMemory, int load, Timestamp bootTime, String os) {
long freeGpuMemory, int load, Timestamp bootTime, String os, int runningProcs) {

if (os == null) {
os = Dispatcher.OS_DEFAULT;
}

getJdbcTemplate().update(UPDATE_RENDER_HOST, totalMemory, freeMemory, totalSwap, freeSwap,
totalMcp, freeMcp, totalGpuMemory, freeGpuMemory, load, bootTime, os,
host.getHostId());
runningProcs, host.getHostId());
}

@Override
Expand Down Expand Up @@ -631,7 +632,7 @@ public boolean isNimbyHost(HostInterface h) {
/**
* Checks if the passed in name looks like a fully qualified domain name. If so, returns the
* hostname without the domain. Otherwise returns the passed in name unchanged.
*
*
* @param fqdn - String
* @return String - hostname
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public LayerDetail mapRow(ResultSet rs, int rowNum) throws SQLException {
layer.services.addAll(Lists.newArrayList(rs.getString("str_services").split(",")));
layer.timeout = rs.getInt("int_timeout");
layer.timeout_llu = rs.getInt("int_timeout_llu");
layer.slotsRequired = rs.getInt("int_slots_required");
return layer;
}
};
Expand Down Expand Up @@ -241,7 +242,8 @@ public LayerInterface getLayer(String id) {
+ "int_dispatch_order, " + "str_tags, " + "str_type," + "int_cores_min, "
+ "int_cores_max, " + "b_threadable, " + "int_mem_min, " + "int_gpus_min, "
+ "int_gpus_max, " + "int_gpu_mem_min, " + "str_services, " + "int_timeout,"
+ "int_timeout_llu " + ") " + "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
+ "int_timeout_llu, " + "int_slots_required " + ") "
+ "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";

@Override
public void insertLayerDetail(LayerDetail l) {
Expand All @@ -250,7 +252,7 @@ public void insertLayerDetail(LayerDetail l) {
l.chunkSize, l.dispatchOrder, StringUtils.join(l.tags, " | "), l.type.toString(),
l.minimumCores, l.maximumCores, l.isThreadable, l.minimumMemory, l.minimumGpus,
l.maximumGpus, l.minimumGpuMemory, StringUtils.join(l.services, ","), l.timeout,
l.timeout_llu);
l.timeout_llu, l.slotsRequired);
}

@Override
Expand Down Expand Up @@ -553,6 +555,12 @@ public void updateTimeoutLLU(LayerInterface layer, int timeout_llu) {
layer.getLayerId());
}

@Override
public void updateLayerSlotsRequired(LayerInterface layer, int slots) {
getJdbcTemplate().update("UPDATE layer SET int_slots_required=? WHERE pk_layer=?", slots,
layer.getLayerId());
}

@Override
public void enableMemoryOptimizer(LayerInterface layer, boolean value) {
getJdbcTemplate().update("UPDATE layer SET b_optimize=? WHERE pk_layer=?", value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1183,7 +1183,8 @@ public Layer mapRow(ResultSet rs, int rowNum) throws SQLException {
Arrays.asList(SqlUtil.getString(rs, "str_limit_names").split(",")))
.setMemoryOptimizerEnabled(rs.getBoolean("b_optimize"))
.setTimeout(rs.getInt("int_timeout"))
.setTimeoutLlu(rs.getInt("int_timeout_llu"));
.setTimeoutLlu(rs.getInt("int_timeout_llu"))
.setSlotsRequired(rs.getInt("int_slots_required"));

LayerStats.Builder statsBuilder = LayerStats.newBuilder()
.setReservedCores(Convert.coreUnitsToCores(rs.getInt("int_cores")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public void handleHostReport(HostReport report, boolean isBoot) {
rhost.getTotalSwap(), rhost.getFreeSwap(), rhost.getTotalMcp(),
rhost.getFreeMcp(), rhost.getTotalGpuMem(), rhost.getFreeGpuMem(),
rhost.getLoad(), new Timestamp(rhost.getBootTime() * 1000l),
rhost.getAttributesMap().get("SP_OS"));
rhost.getAttributesMap().get("SP_OS"), report.getFramesCount());

// Both logics are conflicting, only change hardware state if
// there was no need for a tempDirStorage state change
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@
import com.imageworks.spcue.grpc.job.LayerSetTimeoutResponse;
import com.imageworks.spcue.grpc.job.LayerSetTimeoutLLURequest;
import com.imageworks.spcue.grpc.job.LayerSetTimeoutLLUResponse;
import com.imageworks.spcue.grpc.job.LayerSetSlotsRequiredRequest;
import com.imageworks.spcue.grpc.job.LayerSetSlotsRequiredResponse;
import com.imageworks.spcue.grpc.job.LayerStaggerFramesRequest;
import com.imageworks.spcue.grpc.job.LayerStaggerFramesResponse;
import com.imageworks.spcue.grpc.limit.Limit;
Expand Down Expand Up @@ -432,6 +434,15 @@ public void setTimeoutLLU(LayerSetTimeoutLLURequest request,
}
}

@Override
public void setSlotsRequired(LayerSetSlotsRequiredRequest request,
StreamObserver<LayerSetSlotsRequiredResponse> responseObserver) {
updateLayer(request.getLayer());
jobManager.setLayerSlotsRequired(layer, request.getSlots());
responseObserver.onNext(LayerSetSlotsRequiredResponse.newBuilder().build());
responseObserver.onCompleted();
}

@Override
public void addLimit(LayerAddLimitRequest request,
StreamObserver<LayerAddLimitResponse> responseObserver) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public interface HostManager {
*/
void setHostStatistics(HostInterface host, long totalMemory, long freeMemory, long totalSwap,
long freeSwap, long totalMcp, long freeMcp, long totalGpuMemory, long freeGpuMemory,
int load, Timestamp bootTime, String os);
int load, Timestamp bootTime, String os, int runningProcs);

void deleteHost(HostInterface host);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,10 @@ public void rebootNow(HostInterface host) {
@Override
public void setHostStatistics(HostInterface host, long totalMemory, long freeMemory,
long totalSwap, long freeSwap, long totalMcp, long freeMcp, long totalGpuMemory,
long freeGpuMemory, int load, Timestamp bootTime, String os) {
long freeGpuMemory, int load, Timestamp bootTime, String os, int runningProcs) {

hostDao.updateHostStats(host, totalMemory, freeMemory, totalSwap, freeSwap, totalMcp,
freeMcp, totalGpuMemory, freeGpuMemory, load, bootTime, os);
freeMcp, totalGpuMemory, freeGpuMemory, load, bootTime, os, runningProcs);
}

@Transactional(propagation = Propagation.SUPPORTS, readOnly = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,14 @@ public interface JobManager {
*/
void setLayerMinGpus(LayerInterface layer, int gpuUnits);

/**
* Sets the slots required for a layer.
*
* @param layer the layer to update
* @param slots the number of slots required
*/
void setLayerSlotsRequired(LayerInterface layer, int slots);

/**
* Add a limit to the given layer.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,11 @@ public void setLayerMinGpus(LayerInterface layer, int gpu) {
layerDao.updateLayerMinGpus(layer, gpu);
}

@Override
public void setLayerSlotsRequired(LayerInterface layer, int slots) {
layerDao.updateLayerSlotsRequired(layer, slots);
}

@Override
public void setLayerMaxGpus(LayerInterface layer, int gpu) {
layerDao.updateLayerMaxGpus(layer, gpu);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,10 @@ private void handleLayerTags(BuildableJob buildableJob, Element jobTag) {
layer.timeout_llu = Integer.parseInt(layerTag.getChildTextTrim("timeout_llu"));
}

if (layerTag.getChildTextTrim("slots_required") != null) {
layer.slotsRequired = Integer.parseInt(layerTag.getChildTextTrim("slots_required"));
}

/*
* Handle the layer environment
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Add a field to limit the max amount of concurrent procs a host can run
-- -1 means no limit
alter table host
add int_concurrent_procs_limit INT NOT NULL DEFAULT -1;

alter table host_stat
add int_running_procs INT NOT NULL DEFAULT 0;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Add a field to mark a layer as requiring at least a specific number of slots
-- <=0 means slots are not required
alter table layer
add int_slots_required INT NOT NULL DEFAULT 0;
106 changes: 106 additions & 0 deletions cuebot/src/main/resources/public/dtd/cjsl-1.16.dtd
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
<!-- ================================================================= -->
<!-- SpiCue Job Spec DTD ver 1.16 -->
<!-- middle-tier@imageworks.com -->
<!-- ================================================================= -->

<!ELEMENT spec (facility?,dept?,show,shot,user,email?,uid?,job*,depends*)>
<!ELEMENT facility (#PCDATA)*>
<!ELEMENT dept (#PCDATA)*>
<!ELEMENT show (#PCDATA)*>
<!ELEMENT shot (#PCDATA)*>
<!ELEMENT user (#PCDATA)*>
<!ELEMENT email (#PCDATA)*>
<!ELEMENT uid (#PCDATA)*>

<!-- ================================================================= -->
<!-- Jobs -->
<!-- ================================================================= -->
<!ELEMENT job (paused?,priority?,maxretries?,maxcores?,maxgpus?,autoeat?,localbook?,os?,env*,layers?)>
<!ATTLIST job
name NMTOKEN #REQUIRED
>
<!ELEMENT paused (#PCDATA)*>
<!ELEMENT priority (#PCDATA)*>
<!ELEMENT maxretries (#PCDATA)*>
<!ELEMENT maxcores (#PCDATA)*>
<!ELEMENT maxgpus (#PCDATA)*>
<!ELEMENT autoeat (#PCDATA)*>
<!ELEMENT os (#PCDATA)*>
<!ELEMENT localbook (#PCDATA)*>
<!ATTLIST localbook
host NMTOKEN #REQUIRED
cores NMTOKEN #REQUIRED
memory NMTOKEN #REQUIRED
threads NMTOKEN #REQUIRED
gpu NMTOKEN #REQUIRED
>

<!-- ================================================================= -->
<!-- Layers -->
<!-- ================================================================= -->

<!ELEMENT layers (layer+)>
<!ELEMENT layer (cmd,range,chunk,slots_required?,cores?,threadable?,memory?,gpus?,gpu?,gpu_memory?,timeout?,timeout_llu?,tags?,limits?,env*,services?,outputs*)>
<!ATTLIST layer
name NMTOKEN #REQUIRED
type (Render | Util | Post) #REQUIRED
>
<!ELEMENT cmd (#PCDATA)*>
<!ELEMENT range (#PCDATA)*>
<!ELEMENT chunk (#PCDATA)*>
<!ELEMENT slots_required (#PCDATA)*>
<!ELEMENT cores (#PCDATA)*>
<!ELEMENT threadable (#PCDATA)*>
<!ELEMENT memory (#PCDATA)*>
<!ELEMENT gpus (#PCDATA)*>
<!ELEMENT gpu (#PCDATA)*> <!-- deprecated: use gpu_memory -->
<!ELEMENT gpu_memory (#PCDATA)*>
<!ELEMENT timeout (#PCDATA)*>
<!ELEMENT timeout_llu (#PCDATA)*>
<!ELEMENT tags (#PCDATA)*>
<!ELEMENT limits (limit+)>
<!ELEMENT services (service+)>
<!ELEMENT outputs (output*)>
<!ELEMENT env (key*)>
<!-- ================================================================= -->
<!-- Layer Services -->
<!-- ================================================================= -->
<!ELEMENT service (#PCDATA)*>

<!-- ================================================================= -->
<!-- Layer Ouuputs -->
<!-- ================================================================= -->
<!ELEMENT output (#PCDATA)*>
<!ATTLIST output
name NMTOKEN #REQUIRED
>
<!-- ================================================================= -->
<!-- Layer Limits -->
<!-- ================================================================= -->
<!ELEMENT limit (#PCDATA)*>

<!-- ================================================================= -->
<!-- Environment Variables -->
<!-- ================================================================= -->

<!ELEMENT key (#PCDATA)*>
<!ATTLIST key
name NMTOKEN #REQUIRED
>

<!-- ================================================================= -->
<!-- Dependencies -->
<!-- ================================================================= -->

<!ELEMENT depends (depend*)>
<!ELEMENT depend (depjob,deplayer?,depframe?,onjob,onlayer?,onframe?)>
<!ATTLIST depend
anyframe NMTOKEN #IMPLIED
type (LAYER_ON_SIM_FRAME|PREVIOUS_FRAME|JOB_ON_JOB|JOB_ON_LAYER|JOB_ON_FRAME|LAYER_ON_JOB|LAYER_ON_FRAME|LAYER_ON_LAYER|FRAME_ON_JOB|FRAME_ON_LAYER|FRAME_ON_FRAME|FRAME_BY_FRAME) #REQUIRED
>
<!ELEMENT depjob (#PCDATA)*>
<!ELEMENT onjob (#PCDATA)*>
<!ELEMENT deplayer (#PCDATA)*>
<!ELEMENT onlayer (#PCDATA)*>
<!ELEMENT depframe (#PCDATA)*>
<!ELEMENT onframe (#PCDATA)*>
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,8 @@ public void updateHostStats() {

DispatchHost dispatchHost = hostDao.findDispatchHost(TEST_HOST);
hostDao.updateHostStats(dispatchHost, CueUtil.GB8, CueUtil.GB8, CueUtil.GB8, CueUtil.GB8,
CueUtil.GB8, CueUtil.GB8, 1, 1, 100, new Timestamp(1247526000 * 1000l), "spinux1");
CueUtil.GB8, CueUtil.GB8, 1, 1, 100, new Timestamp(1247526000 * 1000l), "spinux1",
2);

Map<String, Object> result = jdbcTemplate
.queryForMap("SELECT * FROM host_stat WHERE pk_host=?", dispatchHost.getHostId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,26 @@ public void testParseMaxCoresAndMaxGpus() {
assertEquals(job.maxGpusOverride, Integer.valueOf(42));
}

@Test
public void testParseSlotsRequired() {
String xml = readJobSpec("jobspec_1_16.xml");
JobSpec spec = jobLauncher.parse(xml);
assertEquals(spec.getDoc().getDocType().getPublicID(), "SPI Cue Specification Language");
assertEquals(spec.getDoc().getDocType().getSystemID(),
"http://localhost:8080/spcue/dtd/cjsl-1.16.dtd");
assertEquals(spec.getJobs().size(), 1);
BuildableJob job = spec.getJobs().get(0);
assertEquals(job.getBuildableLayers().size(), 2);

// First layer uses slot-based booking
LayerDetail slotBasedLayer = job.getBuildableLayers().get(0).layerDetail;
assertEquals(slotBasedLayer.name, "slot_based_layer");
assertEquals(slotBasedLayer.slotsRequired, 4);

// Second layer uses regular resource booking (default slots_required = 0)
LayerDetail regularLayer = job.getBuildableLayers().get(1).layerDetail;
assertEquals(regularLayer.name, "regular_layer");
assertEquals(regularLayer.slotsRequired, 0);
}

}
Loading
Loading