forked from Special-K-s-Flightsim-Bots/DCSServerBot
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbase.py
More file actions
189 lines (154 loc) · 6.9 KB
/
base.py
File metadata and controls
189 lines (154 loc) · 6.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
from __future__ import annotations
import asyncio
import inspect
import logging
import os
from abc import ABC
from core import utils, Port
from enum import Enum
from functools import wraps
from pathlib import Path
from typing import Callable, Any, TYPE_CHECKING
from core.const import DEFAULT_TAG
from core.data.dataobject import DataObject
if TYPE_CHECKING:
from core import Server, NodeImpl
# ruamel YAML support
from pykwalify.errors import PyKwalifyException
from ruamel.yaml import YAML
from ruamel.yaml.error import MarkedYAMLError
# Module-wide ruamel YAML instance; used below to load and dump the service configuration files.
yaml = YAML()
# Public names of this module (what "from <module> import *" exports).
__all__ = [
"proxy",
"Service",
"ServiceInstallationError"
]
# Module-level logger named after this module. Service instances create their own logger (self.log).
logger = logging.getLogger(__name__)
def proxy(_func: Callable[..., Any] | None = None, *, timeout: float = 60):
    """
    Can be used as a decorator to any service method, that should act as a remote call, if the server provided
    is not on the same node.

    @proxy
    async def my_fancy_method(self, server: Server, *args, **kwargs) -> Any:
        ...

    This will call my_fancy_method on the remote node if the server is remote, and on the local node, if it is not.

    The target node is derived from the first truthy argument out of ``server`` (its ``.node``),
    ``instance`` (its ``.node``) or ``node`` (used as it is). For a remote node, an "rpc" message with the
    service class name, the method name and the parameters is sent via ``self.bus.send_to_node_sync``.
    In that message, DataObject parameters are replaced by their ``.name``, Enum parameters by their
    ``.value`` and parameters that are None are left out.

    :param _func: the decorated coroutine function if used as ``@proxy``, None if used as ``@proxy(timeout=nn)``
    :param timeout: timeout that is handed over to ``send_to_node_sync`` for the remote call
    :return: the wrapped coroutine function (``@proxy``) or the decorator itself (``@proxy(timeout=nn)``)
    :raises ValueError: raised by the wrapper at call time, if no node could be derived from the arguments
    """
    def decorator(original_function: Callable[..., Any]):
        # The signature of the decorated function never changes, so inspect it once at decoration time
        # instead of on every single call.
        signature = inspect.signature(original_function)

        @wraps(original_function)
        async def wrapper(self, *args, **kwargs):
            bound_args = signature.bind(self, *args, **kwargs)
            bound_args.apply_defaults()
            arg_dict = {k: v for k, v in bound_args.arguments.items() if k != "self"}

            # Try to pick the node from the functions arguments
            node = None
            if arg_dict.get("server"):
                node = arg_dict["server"].node
            elif arg_dict.get("instance"):
                node = arg_dict["instance"].node
            elif arg_dict.get("node"):
                node = arg_dict["node"]

            # Without a node we can not decide where to run the call, so refuse it
            if node is None:
                raise ValueError(
                    f"Cannot proxy function {original_function.__name__}: no valid reference object found in arguments. "
                    f"Expected 'server', 'instance', or 'node' parameter with valid node reference.")

            # Local node: call the original function directly, no RPC message is needed
            if not node.is_remote:
                return await original_function(self, *args, **kwargs)

            # Remote node: dereference DataObject and Enum values in parameters
            params = {
                k: v.name if isinstance(v, DataObject)
                else v.value if isinstance(v, Enum)
                else v
                for k, v in arg_dict.items()
                if v is not None  # Ignore None values
            }
            call = {
                "command": "rpc",
                "service": self.__class__.__name__,
                "method": original_function.__name__,
                "params": params
            }
            # Send the call synchronously and hand the answer back to the caller
            return await self.bus.send_to_node_sync(call, node=node.name, timeout=timeout)

        return wrapper

    # If used as @proxy(timeout=nn)
    if _func is None:
        return decorator

    # If used as @proxy without parentheses
    return decorator(_func)
class Service(ABC):
    """
    Base class for the services of a node.

    A service reads its configuration from ``<config_dir>/services/<lower-case name>.yaml``, can name other
    service classes it depends on (``dependencies``) and carries a running flag that is maintained by
    start() and stop().
    """
    # Service classes that have to be running before this service is started (None = no dependencies)
    dependencies: list[type[Service]] | None = None

    def __init__(self, node: NodeImpl, name: str | None = None):
        """
        :param node: the node this service belongs to; its pool, apool, config and config_dir are used
        :param name: name of the service, defaults to the name of the class
        :raises ServiceInstallationError: if the service configuration can not be read (see read_locals())
        """
        self.name = name or self.__class__.__name__
        self.running: bool = False
        self.node = node
        self.log = logging.getLogger(f"{self.__class__.__module__}.{self.__class__.__name__}")
        self.pool = node.pool
        self.apool = node.apool
        self.config = node.config
        self.locals = self.read_locals()
        # Cache of the merged configurations: node name -> instance name -> configuration
        self._config: dict[str, dict] = {}

    async def start(self, *args, **kwargs):
        """
        Wait for all dependent services and mark this service as running.

        Each dependency is polled up to 30 times with a sleep of 0.1 seconds in between.

        :raises TimeoutError: if a dependent service is not running after the last poll
        """
        # Imported locally, presumably to avoid a circular import with the registry - TODO confirm
        from .registry import ServiceRegistry

        self.log.info(f' => Starting Service {self.name} ...')
        for dependency in self.dependencies or []:
            for _ in range(30):
                if ServiceRegistry.get(dependency).is_running():
                    break
                self.log.debug(f"Waiting for service {dependency} ...")
                await asyncio.sleep(.1)
            else:
                # The loop ran out without a break => the dependency never came up
                raise TimeoutError(f"Timeout during start of Service {self.__class__.__name__}, "
                                   f"dependent service {dependency.__name__} is not running.")
            self.log.debug(f"Dependent service {dependency.__name__} is running.")
        self.running = True

    async def stop(self, *args, **kwargs):
        """Mark this service as stopped and log it."""
        self.running = False
        self.log.info(f' => Service {self.name} stopped.')

    async def switch(self):
        """Hook without any behaviour in the base class; subclasses can override it."""
        ...

    def is_running(self) -> bool:
        """Return the running flag set by start() and stop()."""
        return self.running

    def read_locals(self) -> dict:
        """
        Read the configuration of this service from ``<config_dir>/services/<lower-case name>.yaml``.

        If the directory ``services/<lower-case name>/schemas`` exists and the node setting "validation"
        (default: "lazy") is "strict" or "lazy", the file is validated against all ``*.yaml`` schemas
        found in there. Validation errors are only raised in "strict" mode.

        :return: the loaded configuration, or an empty dict if the file does not exist or is empty
        :raises ServiceInstallationError: if the file is no valid YAML or the validation raised an error
        """
        filename = os.path.join(self.node.config_dir, 'services', f'{self.name.lower()}.yaml')
        if not os.path.exists(filename):
            return {}
        self.log.debug(f' - Reading service configuration from {filename} ...')
        try:
            path = os.path.join('services', self.name.lower(), 'schemas')
            validation = self.node.config.get('validation', 'lazy')
            if os.path.exists(path) and validation in ['strict', 'lazy']:
                schema_files = [str(x) for x in Path(path).glob('*.yaml')]
                if schema_files:
                    utils.validate(filename, schema_files, raise_exception=(validation == 'strict'))
                else:
                    self.log.warning(f'No schema file for service "{self.name}" found.')
            data = yaml.load(Path(filename).read_text(encoding='utf-8'))
            # An empty YAML document is loaded as None, but all callers expect a dict
            return data if data is not None else {}
        except (MarkedYAMLError, PyKwalifyException) as ex:
            # Keep the original exception as the cause for easier debugging
            raise ServiceInstallationError(self.name, str(ex)) from ex

    def save_config(self):
        """Write the current configuration (self.locals) back to the file that read_locals() reads."""
        # Use the lower-case name like read_locals() does, otherwise the saved file would never be
        # read again on a case-sensitive file system.
        filename = os.path.join(self.node.config_dir, 'services', f'{self.name.lower()}.yaml')
        with open(filename, mode='w', encoding='utf-8') as outfile:
            yaml.dump(self.locals, outfile)

    def get_config(self, server: Server | None = None, **kwargs) -> dict:
        """
        Return the configuration of this service, either the default one or the one for a specific server.

        :param server: if not provided, the DEFAULT_TAG section of the configuration is returned. Otherwise,
            the DEFAULT_TAG section is merged (utils.deep_merge) with the section of the server's instance.
            The instance section is looked up below the node name, if the configuration has such a key,
            and on the top level of the configuration otherwise. The result is cached per node and instance.
        :param kwargs: not used by this base implementation
        :return: the configuration
        """
        if not server:
            return self.locals.get(DEFAULT_TAG, {})
        node_cache = self._config.setdefault(server.node.name, {})
        instance_name = server.instance.name
        if instance_name not in node_cache:
            node_cache[instance_name] = utils.deep_merge(
                self.locals.get(DEFAULT_TAG, {}),
                self.locals.get(server.node.name, self.locals).get(instance_name, {})
            )
        return node_cache[instance_name]

    def reload(self):
        """Re-read the configuration file and drop all cached per-server configurations."""
        self.locals = self.read_locals()
        # The cached merges were built from the old configuration, so they must not survive a reload
        self._config.clear()

    def get_ports(self) -> dict[str, Port]:
        """Return the ports of this service; the base implementation returns an empty dict."""
        return {}
class ServiceInstallationError(Exception):
    """Raised if a service could not be installed, for instance due to an invalid configuration file."""

    def __init__(self, service: str, reason: str):
        """
        :param service: name of the service; it is title-cased (str.title()) in the message
        :param reason: description of what went wrong
        """
        title = service.title()
        message = f'Service "{title}" could not be installed: {reason}'
        super().__init__(message)