-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmpi_script.py
More file actions
164 lines (131 loc) · 5.95 KB
/
mpi_script.py
File metadata and controls
164 lines (131 loc) · 5.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import ast
from functools import partial
import networkx as nx
import numpy as np
from typing import Dict
import matplotlib.pyplot as plt
from dataclasses import dataclass
from mpi4py import MPI
from mpi4py.futures import MPIPoolExecutor
from repast4py import context as ctx
from repast4py import core, random, schedule, logging, parameters
from repast4py.network import write_network, read_network
from network import *
model = None
class RumorAgent(core.Agent):
def __init__(self, nid: int, agent_type: int, rank: int, received_rumor=False, shadow=None):
super().__init__(nid, agent_type, rank)
self.received_rumor = received_rumor
self.shadow = shadow or {}
def save(self):
"""Saves the state of this agent as tuple.
A non-ghost agent will save its state using this
method, and any ghost agents of this agent will
be updated with that data (self.received_rumor).
Returns:
The agent's state
"""
return (self.uid, self.received_rumor, self.shadow)
def update(self, data: bool):
"""Updates the state of this agent when it is a ghost
agent on some rank other than its local one.
Args:
data: the new agent state (received_rumor)
"""
received_rumor, shadow_data = data[0], data[1]
if not self.received_rumor and received_rumor:
# only update if the received rumor state
# has changed from false to true
model.rumor_spreaders.append(self)
self.received_rumor = received_rumor
self.shadow = shadow_data
def create_rumor_agent(nid, agent_type, rank, **kwargs):
shadow_data = {}
if 'data' in kwargs: # New compressed format
shadow_data = decompress_and_convert_shadow_data(kwargs['data'])
return RumorAgent(nid, agent_type, rank, received_rumor=None, shadow=shadow_data)
def restore_agent(agent_data):
uid = agent_data[0]
received_rumor = agent_data[1]
shadow = agent_data[2] if len(agent_data) > 2 else {} # Handle cases where shadow might not be saved
return RumorAgent(uid[0], uid[1], uid[2], received_rumor, shadow)
@dataclass
class RumorCounts:
total_rumor_spreaders: int
new_rumor_spreaders: int
class Model:
def __init__(self, comm, params):
self.runner = schedule.init_schedule_runner(comm)
self.runner.schedule_stop(params['stop.at'])
self.runner.schedule_end_event(self.at_end)
fpath = params['network_file']
self.context = ctx.SharedContext(comm)
read_network(fpath, self.context, create_rumor_agent, restore_agent)
self.net = self.context.get_projection('rumor_network')
self.rumor_spreaders = []
self.rank = comm.Get_rank()
self._seed_rumor(params['initial_rumor_count'], comm)
rumored_count = len(self.rumor_spreaders)
self.counts = RumorCounts(rumored_count, rumored_count)
loggers = logging.create_loggers(self.counts, op=MPI.SUM, rank=self.rank)
self.data_set = logging.ReducingDataSet(loggers, comm, params['counts_file'])
self.data_set.log(0)
self.rumor_prob = params['rumor_probability']
# Schedule layer-specific steps
layer_schedules = params['layer_schedules']
for layer_id, schedule_config in enumerate(layer_schedules):
start = schedule_config['start']
interval = schedule_config['interval']
self.runner.schedule_repeating_event(start, interval, partial(self.new_step, layer=layer_id))
def _seed_rumor(self, init_rumor_count: int, comm):
world_size = comm.Get_size()
rumor_counts = np.zeros(world_size, dtype=np.int32)
if self.rank == 0:
rng = np.random.default_rng()
for _ in range(init_rumor_count):
idx = rng.integers(0, high=world_size)
rumor_counts[idx] += 1
rumor_count = np.empty(1, dtype=np.int32)
comm.Scatter(rumor_counts, rumor_count, root=0)
for agent in self.context.agents(count=rumor_count[0], shuffle=True):
agent.received_rumor = True
self.rumor_spreaders.append(agent)
def at_end(self):
self.data_set.close()
def new_step(self, layer):
print(f"Rank {self.rank} is executing step {self.runner.schedule.tick} for layer {layer}")
new_rumor_spreaders = []
rng = np.random.default_rng()
for agent in self.rumor_spreaders:
# Agent does not have outgoing edges in this layer
if layer not in agent.shadow.keys():
continue
ngh_tuples = agent.shadow[layer].keys()
for ngh_tuple in ngh_tuples:
ngh_agent = self.context.agent(ngh_tuple)
if ngh_agent is None:
continue # Neighbor not found (shouldn't happen if network is correct)
# Only spread to local agents that haven't received the rumor
if ngh_agent.local_rank == self.rank and not ngh_agent.received_rumor:
if rng.uniform() <= self.rumor_prob:
ngh_agent.received_rumor = True
new_rumor_spreaders.append(ngh_agent)
# Update the list of rumor spreaders with new local agents
self.rumor_spreaders += new_rumor_spreaders
# Update counts
self.counts.new_rumor_spreaders = len(new_rumor_spreaders)
self.counts.total_rumor_spreaders += self.counts.new_rumor_spreaders
self.data_set.log(self.runner.schedule.tick)
# Synchronize agents across ranks
self.context.synchronize(restore_agent)
def start(self):
self.runner.execute()
def run(params: Dict):
global model
model = Model(MPI.COMM_WORLD, params)
model.start()
if __name__ == "__main__":
parser = parameters.create_args_parser()
args = parser.parse_args()
params = parameters.init_params(args.parameters_file, args.parameters)
run(params)