forked from Special-K-s-Flightsim-Bots/DCSServerBot
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmission.py
More file actions
87 lines (76 loc) · 3.3 KB
/
mission.py
File metadata and controls
87 lines (76 loc) · 3.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
from __future__ import annotations
import asyncio
import os
from core.data.dataobject import DataObject, DataObjectFactory
from dataclasses import dataclass, field
from datetime import datetime
from typing import TYPE_CHECKING
from .. import Status, utils
if TYPE_CHECKING:
from .server import Server
__all__ = ["Mission"]
@dataclass
@DataObjectFactory.register()
class Mission(DataObject):
server: Server = field(compare=False)
map: str
start_time: int = field(compare=False, default=0)
mission_time: int = field(compare=False, default=0)
real_time: int = field(compare=False, default=0)
filename: str = None
date: str | datetime = None
num_slots_blue = 0
num_slots_red = 0
weather: dict = field(repr=False, default_factory=dict)
clouds: dict = field(repr=False, default_factory=dict)
airbases: list = field(repr=False, default_factory=list)
@property
def display_name(self) -> str:
return utils.escape_string(self.name)
async def pause(self):
if self.server.status == Status.RUNNING:
await self.server.send_to_dcs({"command": "pauseMission"})
await self.server.wait_for_status_change([Status.PAUSED])
async def unpause(self):
if self.server.status == Status.PAUSED:
await self.server.send_to_dcs({"command": "unpauseMission"})
await self.server.wait_for_status_change([Status.RUNNING])
async def restart(self):
await self.server.send_to_dcs({"command": "restartMission"})
# wait for a status change (STOPPED or LOADING)
timeout = 180 if self.node.locals.get('slow_system', False) else 120
await self.server.wait_for_status_change([Status.STOPPED, Status.LOADING], timeout)
# wait until we are running again
try:
await self.server.wait_for_status_change([Status.RUNNING, Status.PAUSED], timeout)
except (TimeoutError, asyncio.TimeoutError):
self.log.debug(f'Trying to force start server "{self.server.name}" due to DCS bug.')
await self.server.start()
def update(self, data: dict):
if 'current_map' in data:
self.map = data['current_map']
if 'current_mission' in data:
self.name = data['current_mission'].replace('.sav', '')
if 'start_time' in data:
self.start_time = data['start_time']
if 'mission_time' in data:
self.mission_time = data['mission_time']
if 'real_time' in data:
self.real_time = data['real_time']
if 'filename' in data:
self.filename = os.path.normpath(data['filename'])
if 'num_slots_blue' in data:
self.num_slots_blue = data['num_slots_blue']
if 'num_slots_red' in data:
self.num_slots_red = data['num_slots_red']
if 'date' in data:
if data['date']['Year'] >= 1970:
self.date = datetime(data['date']['Year'], data['date']['Month'], data['date']['Day'], 0, 0)
else:
self.date = '{}-{:02d}-{:02d}'.format(data['date']['Year'], data['date']['Month'], data['date']['Day'])
if 'weather' in data:
self.weather = data['weather']
if 'clouds' in data:
self.clouds = data['clouds']
if 'airbases' in data:
self.airbases = data['airbases']