Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions lib/ood_core/job/adapters/pbspro.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "time"
require "json"
require "ood_core/refinements/hash_extensions"
require "ood_core/job/adapters/helper"

Expand Down Expand Up @@ -85,6 +86,50 @@ def initialize(host: nil, submit_host: "", strict_host_checking: true, pbs_exec:
@bin_overrides = bin_overrides
end

# Summarize node, processor and GPU usage from `pbsnodes -a -F json`
# @return [ClusterInfo] totals for the cluster together with the amounts
#   currently assigned to jobs
def get_cluster_info
  raw_json = call("pbsnodes", "-a", "-F", "json")
  node_map = JSON.parse(raw_json).fetch('nodes', {})

  # Missing counters read as zero, so an empty node list reports all zeros
  usage = Hash.new(0)

  node_map.each do |_name, details|
    usage[:total_nodes] += 1

    # Capacity advertised by the node
    capacity = details.fetch('resources_available', {})
    usage[:total_cpus] += get_node_resource(capacity, 'ncpus')
    usage[:total_gpus] += get_node_resource(capacity, 'ngpus')

    # Share of that capacity currently handed out to jobs
    in_use = details.fetch('resources_assigned', {})
    busy_cpus = get_node_resource(in_use, 'ncpus')
    usage[:busy_cpus] += busy_cpus
    usage[:busy_gpus] += get_node_resource(in_use, 'ngpus')

    # A node counts as active once any of its CPUs is assigned to a job
    usage[:busy_nodes] += 1 if busy_cpus.positive?
  end

  ClusterInfo.new(
    active_nodes: usage[:busy_nodes],
    total_nodes: usage[:total_nodes],
    active_processors: usage[:busy_cpus],
    total_processors: usage[:total_cpus],
    active_gpus: usage[:busy_gpus],
    total_gpus: usage[:total_gpus]
  )
end

# Get a list of hashes detailing each of the jobs on the batch server
# @example Status info for all jobs
# my_batch.get_jobs
Expand Down Expand Up @@ -174,6 +219,13 @@ def submit_string(str, args: [], chdir: nil)
end

private
# Read an integer resource count out of a node's resources hash
# @param resources [Hash] resource name => value pairs for a single node
# @param key [String] name of the resource to look up
# @return [Integer] the stored value converted with `to_i`, or 0 when the
#   key is absent
def get_node_resource(resources, key)
  resources.fetch(key) { 0 }.to_i
end

# Call a forked PBS Pro command for a given batch server
def call(cmd, *args, env: {}, stdin: "", chdir: nil)
cmd = cmd.to_s
Expand Down Expand Up @@ -299,6 +351,10 @@ def submit(script, after: [], afterok: [], afternotok: [], afterany: [])
raise JobAdapterError, e.message
end

# Retrieve node, processor and GPU usage for the cluster
# @raise [JobAdapterError] if the batch command fails or its output cannot
#   be parsed as JSON
# @return [ClusterInfo] object containing cluster details
def cluster_info
  @pbspro.get_cluster_info
rescue Batch::Error, JSON::ParserError => e
  # Surface backend failures as the adapter-level error, as the other
  # adapter methods do
  raise JobAdapterError, e.message
end

# Retrieve info for all jobs from the resource manager
# @raise [JobAdapterError] if something goes wrong getting job info
# @return [Array<Info>] information describing submitted jobs
Expand Down
Loading