Conversation
|
the calculation within the streaming each row is taking about ~2sec to solve the simulation with mapping of uuids and converting into pyarrow table to have a 30hz streaming data. currently only getting about 30 points per minute. |
cbrochtrup
left a comment
There was a problem hiding this comment.
This is super cool! I can think of a lot of great uses for this tool already. Thanks for sharing!
| _streamset = StreamSet([Stream(conn, uuid=k) for k in | ||
| data_map.keys()]) | ||
| _streamset.arrow_insert(data_map) | ||
| if i % INSERT_BATCH_SIZE == 0: |
There was a problem hiding this comment.
What happens if end this loop when this condition is not met? will the data not flush? If so, we should add another condition here.
| if i % INSERT_BATCH_SIZE == 0: | |
| if i % INSERT_BATCH_SIZE == 0 or i == new_load.shape[1]-1: |
There was a problem hiding this comment.
right now for this, i was just testing for running for a few mins at a time, and wanted points inserted with a specific flush condition. i'm not sure if the INSERT_BATCH_SIZE is actually even a good number for flushing.
at the end if not flushed will automatically flushed after 8 hours according to the pqm.
| class TestOpendssIngestor: | ||
| def test_initialize_simulation(self): | ||
| # Arrange | ||
| mock_model_loc = os.path.join( | ||
| MODEL_REL_PATH, | ||
| "Models/13Bus/IEEE13Nodeckt.dss" | ||
| ) | ||
| load, load_names = ([1155., 160., 120., 120., 170., 230., 170., | ||
| 485., 68., 290., 170., 128., 17., 66., 117.], | ||
| ['671', '634a', '634b', '634c', '645', '646', '692', | ||
| '675a', '675b', '675c', '611', '652', '670a', '670b', | ||
| '670c']) | ||
| # Act | ||
| results = initialize_simulation(mock_model_loc) | ||
| assert results[0].tolist() == load | ||
| assert results[-1] == load_names |
There was a problem hiding this comment.
Nice tests. This setup is great and it'd be useful in a fixture to improve our testing suite. IIUC, we have to run test_initialize_simulation before we run test_simulate_network otherwise the dss model won't be loaded.
To that end, we should make the initialization a fixture both tests can explicitly depend on the setup.
| class TestOpendssIngestor: | |
| def test_initialize_simulation(self): | |
| # Arrange | |
| mock_model_loc = os.path.join( | |
| MODEL_REL_PATH, | |
| "Models/13Bus/IEEE13Nodeckt.dss" | |
| ) | |
| load, load_names = ([1155., 160., 120., 120., 170., 230., 170., | |
| 485., 68., 290., 170., 128., 17., 66., 117.], | |
| ['671', '634a', '634b', '634c', '645', '646', '692', | |
| '675a', '675b', '675c', '611', '652', '670a', '670b', | |
| '670c']) | |
| # Act | |
| results = initialize_simulation(mock_model_loc) | |
| assert results[0].tolist() == load | |
| assert results[-1] == load_names | |
| @pytest.fixture(scope="session") | |
| def _initialize_simulation(): | |
| class TestOpendssIngestor: | |
| @pytest.fixutre(scope="session") | |
| def simulation_setup(self, initialize_simulation): | |
| # Arrange | |
| mock_model_loc = os.path.join( | |
| MODEL_REL_PATH, | |
| "Models/13Bus/IEEE13Nodeckt.dss" | |
| ) | |
| # Act | |
| return initialize_simulation(mock_model_loc) | |
| def test_initialize_simulation(self, simulation_setup): | |
| load, load_names = ([1155., 160., 120., 120., 170., 230., 170., | |
| 485., 68., 290., 170., 128., 17., 66., 117.], | |
| ['671', '634a', '634b', '634c', '645', '646', '692', | |
| '675a', '675b', '675c', '611', '652', '670a', '670b', | |
| '670c']) | |
| assert simulation_setup[0].tolist() == load | |
| assert simulation_setup[-1] == load_names |
It may also make sense have run_simulation use an already initialized test. That way we can test these two functions more easily. So maybe the main function looks more like
def main(stuff):
results = initialize_simulation(path)
run_simulation(other_args_plus, results)There was a problem hiding this comment.
still needs more test added definitely. and the initialize_simulation should be called. I wanted this test to make sure that we always have the Model files. all 3 files are needed, or else we get an error in the subsequent calls to the Opendss modeling code.
| mock_model_loc = os.path.join( | ||
| MODEL_REL_PATH, | ||
| "Models/13Bus/IEEE13Nodeckt.dss" | ||
| ) |
There was a problem hiding this comment.
we also create this model path in simulation_utils. Can we create this path in one spot and use that reference both here and in simulation_utils.py
|
|
||
| def get_loads() -> Tuple[np.ndarray, List[str]]: | ||
| """Get all the loads in the network""" | ||
| load_names = dss.Loads.AllNames() |
There was a problem hiding this comment.
How does this function know where to load the files from?
| while True: | ||
| for row in data.iterrows(): | ||
| streamset.insert(row.to_dict()) |
There was a problem hiding this comment.
iterrows is terrifyingly slow sometimes, 10x slower than itertuples. Can we use that here instead. The updated code would look something like this:
| while True: | |
| for row in data.iterrows(): | |
| streamset.insert(row.to_dict()) | |
| while True: | |
| for row in data.itertuples(index=False): | |
| streamset.insert(row._asdict()) |
| collection = bus_name | ||
| if ismag: | ||
| lastltr = "M" | ||
| else: | ||
| lastltr = "A" | ||
|
|
||
| name = "V" + phase + lastltr | ||
| return collection, name |
There was a problem hiding this comment.
I think we can remove bus_name from this function. We return collection which is equal to bus_name.
Creating opendss_simulations into ingress tool to make available to any python programmers.