Skip to content

pyfsd

Root package of PyFSD.

Attributes:

Name Type Description
__version__ str

Version of PyFSD.

__main__

PyFSD entrypoint.

db_tables

PyFSD database tables.

Attributes:

Name Type Description
metadata MetaData

SQLAlchemy metadata.

users_table Table

Table used to store user info.

Note

These databases were initialized in pyfsd.main.main

define

PyFSD -- pyfsd.define package.

Most definitions and utilities.

broadcast

The core of PyFSD broadcast system -- broadcast checker.

Attributes:

Name Type Description
BroadcastChecker

Type of a broadcast checker, used in pyfsd.factory.client.ClientFactory.broadcast.

Example

ClientFactory.broadcast(..., check_func=at_checker)

all_ATC_checker

all_ATC_checker(_: Optional[Client], to_client: Client) -> bool

A broadcast checker which only broadcasts to ATC.

Parameters:

Name Type Description Default
to_client Client

The dest client.

required

Returns:

Type Description
bool

The check result (send message to to_client or not).

Source code in src/pyfsd/define/broadcast.py
124
125
126
127
128
129
130
131
132
133
def all_ATC_checker(_: Optional[Client], to_client: Client) -> bool:  # noqa: N802
    """Broadcast checker that lets a message through only to controllers (ATC).

    The sending client (first argument) is ignored.

    Parameters:
        to_client: The dest client.

    Returns:
        The check result (send message to to_client or not).
    """
    dest_is_atc = to_client.is_controller
    return dest_is_atc

all_pilot_checker

all_pilot_checker(_: Optional[Client], to_client: Client) -> bool

A broadcast checker which only broadcasts to pilots.

Parameters:

Name Type Description Default
to_client Client

The dest client.

required

Returns:

Type Description
bool

The check result (send message to to_client or not).

Source code in src/pyfsd/define/broadcast.py
136
137
138
139
140
141
142
143
144
145
def all_pilot_checker(_: Optional[Client], to_client: Client) -> bool:
    """Broadcast checker that lets a message through only to non-controllers.

    The sending client (first argument) is ignored.

    Parameters:
        to_client: The dest client.

    Returns:
        The check result (send message to to_client or not).
    """
    if to_client.is_controller:
        return False
    return True

at_checker

at_checker(from_client: Optional[Client], to_client: Client) -> bool

A broadcast checker which checks visual range when dest startswith @.

Parameters:

Name Type Description Default
from_client Optional[Client]

The from client.

required
to_client Client

The dest client.

required

Returns:

Type Description
bool

The check result (send message to to_client or not).

Source code in src/pyfsd/define/broadcast.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def at_checker(from_client: Optional[Client], to_client: Client) -> bool:
    """Broadcast checker for ``@`` destinations, limited by the sender's range.

    Parameters:
        from_client: The from client.
        to_client: The dest client.

    Returns:
        The check result (send message to to_client or not). False whenever
        either client reports ``position_ok`` as false.

    Raises:
        RuntimeError: When from_client is None.
    """
    if from_client is None:
        raise RuntimeError("at_checker needs from_client")
    both_positioned = from_client.position_ok and to_client.position_ok
    if not both_positioned:
        return False
    separation = calc_distance(from_client.position, to_client.position)
    # Only the sender's own range matters here; the receiver's is not consulted.
    return separation < from_client.get_range()

broadcast_checkers

broadcast_checkers(*checkers: BroadcastChecker) -> BroadcastChecker

Combine several broadcast checkers into one checker that passes only when all of them pass.

Parameters:

Name Type Description Default
checkers BroadcastChecker

The broadcast checkers.

()

Returns:

Type Description
BroadcastChecker

The broadcast checker.

Source code in src/pyfsd/define/broadcast.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def broadcast_checkers(*checkers: BroadcastChecker) -> BroadcastChecker:
    """Combine several broadcast checkers into a single one.

    The combined checker passes only when every given checker passes; with no
    checkers at all it always passes.

    Parameters:
        checkers: The broadcast checkers.

    Returns:
        The broadcast checker.
    """

    def checker(from_client: Optional[Client], to_client: Client) -> bool:
        # Stop at the first checker that rejects the pair.
        for check in checkers:
            if not check(from_client, to_client):
                return False
        return True

    return checker

broadcast_message_checker

broadcast_message_checker(from_client: Optional[Client], to_client: Client) -> bool

A broadcast checker which checks visual range while broadcasting message.

Parameters:

Name Type Description Default
from_client Optional[Client]

The from client.

required
to_client Client

The dest client.

required

Returns:

Type Description
bool

The check result (send message to to_client or not).

Source code in src/pyfsd/define/broadcast.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def broadcast_message_checker(from_client: Optional[Client], to_client: Client) -> bool:
    """Broadcast checker applying the visual-range rule used for messages.

    Between two non-controllers the allowed distance is the sum of both
    ranges; as soon as a controller is involved it is the larger of the two.

    Parameters:
        from_client: The from client.
        to_client: The dest client.

    Returns:
        The check result (send message to to_client or not). False whenever
        either client reports ``position_ok`` as false.

    Raises:
        RuntimeError: When from_client is None.
    """
    if from_client is None:
        raise RuntimeError("broadcast_message_checker needs from_client")
    if not (from_client.position_ok and to_client.position_ok):
        return False
    dest_range: int = to_client.get_range()
    source_range: int = from_client.get_range()
    pilot_to_pilot = not (from_client.is_controller or to_client.is_controller)
    limit: int = (
        dest_range + source_range
        if pilot_to_pilot
        else max(source_range, dest_range)
    )
    separation = calc_distance(from_client.position, to_client.position)
    return separation < limit

broadcast_position_checker

broadcast_position_checker(from_client: Optional[Client], to_client: Client) -> bool

A broadcast checker which checks visual range while broadcasting position.

Parameters:

Name Type Description Default
from_client Optional[Client]

The from client.

required
to_client Client

The dest client.

required

Returns:

Type Description
bool

The check result (send message to to_client or not).

Source code in src/pyfsd/define/broadcast.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def broadcast_position_checker(
    from_client: Optional[Client],
    to_client: Client,
) -> bool:
    """Broadcast checker applying the visual-range rule used for position updates.

    The allowed distance is chosen as follows:

    * destination is a controller: the destination's ``visual_range``;
    * otherwise, sender is not a controller: the sum of both ranges;
    * otherwise: the larger of the two ranges.

    Parameters:
        from_client: The from client.
        to_client: The dest client.

    Returns:
        The check result (send message to to_client or not). False whenever
        either client reports ``position_ok`` as false.

    Raises:
        RuntimeError: When from_client is None.
    """
    if from_client is None:
        raise RuntimeError("broadcast_position_checker needs from_client")
    if not (from_client.position_ok and to_client.position_ok):
        return False
    dest_range: int = to_client.get_range()
    source_range: int = from_client.get_range()
    limit: int
    if to_client.is_controller:
        limit = to_client.visual_range
    elif from_client.is_controller:
        limit = max(dest_range, source_range)
    else:
        limit = dest_range + source_range
    separation = calc_distance(from_client.position, to_client.position)
    return separation < limit

create_broadcast_range_checker

create_broadcast_range_checker(visual_range: int) -> BroadcastChecker

Create a broadcast checker which checks visual range.

Parameters:

Name Type Description Default
visual_range int

Visual range.

required

Returns:

Type Description
BroadcastChecker

The broadcast checker.

Source code in src/pyfsd/define/broadcast.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def create_broadcast_range_checker(visual_range: int) -> BroadcastChecker:
    """Build a broadcast checker bound to a fixed visual range.

    Parameters:
        visual_range: Visual range.

    Returns:
        The broadcast checker. It raises RuntimeError when called without a
        from_client, and rejects the pair when either client reports
        ``position_ok`` as false or the distance is not below ``visual_range``.
    """

    def checker(from_client: Optional[Client], to_client: Client) -> bool:
        if from_client is None:
            raise RuntimeError("broadcast_range_checker needs from_client")
        if not (from_client.position_ok and to_client.position_ok):
            return False
        separation = calc_distance(from_client.position, to_client.position)
        return separation < visual_range

    return checker

is_multicast

is_multicast(callsign: str) -> bool

Determine if dest callsign is multicast sign.

Parameters:

Name Type Description Default
callsign str

The dest callsign.

required

Returns:

Type Description
bool

Is multicast or not.

Source code in src/pyfsd/define/broadcast.py
166
167
168
169
170
171
172
173
174
175
def is_multicast(callsign: str) -> bool:
    """Tell whether a destination callsign addresses more than one client.

    The multicast signs are ``*``, ``*A``, ``*P`` and anything starting
    with ``@``.

    Parameters:
        callsign: The dest callsign.

    Returns:
        Is multicast or not.
    """
    if callsign.startswith("@"):
        return True
    return callsign in {"*", "*A", "*P"}

check_dict

Tools to perform runtime TypedDict type check.

It can be used to perform config check. Only TypedDict, Literal, NotRequired, Union, List and Dict are supported.

Attributes:

Name Type Description
DictStructure

Type of an object that describes the structure of a dict; can be a TypedDict or a dict.

TypeHint

Type of a type hint.

Examples:

>>> list(check_simple_type(1, Union[int, str]))
[]
>>> list(check_simple_type(b"imbytes", Union[int, str]))
[VerifyTypeError('object', typing.Union[int, str], b'imbytes')]
>>> list(check_dict({ "a": 1 }, TypedDict("A", { "a": int })))
[]
>>> list(check_dict({ "a": "imstr" }, TypedDict("A", { "a": int })))
[VerifyTypeError("dict['a']", <class 'int'>, 'imstr')]

VerifyKeyError

VerifyKeyError(dict_name: str, key: Hashable, type_: Literal['missing', 'extra'])

Bases: KeyError

An exception that describes a missing or extra key in a dict.

Attributes:

Name Type Description
dict_name str

The dict name.

key Hashable

The key name.

type Literal['missing', 'extra']

Type of error, a missing or extra key found.

Parameters:

Name Type Description Default
dict_name str

The dict name.

required
key Hashable

The key name.

required
type_ Literal['missing', 'extra']

Type of error, a missing or extra key found.

required
Source code in src/pyfsd/define/check_dict.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
def __init__(
    self, dict_name: str, key: Hashable, type_: Literal["missing", "extra"]
) -> None:
    """Initialize the error with the offending dict, key and kind of problem.

    Args:
        dict_name: The dict name.
        key: The key name.
        type_: Type of error, a missing or extra key found.
    """
    # Keep the details as attributes so callers can inspect them directly.
    self.type = type_
    self.key = key
    self.dict_name = dict_name
    # Also hand them to the base exception so they appear in ``args``.
    super().__init__(dict_name, key, type_)
__eq__
__eq__(other: object) -> bool

Return self==other.

Source code in src/pyfsd/define/check_dict.py
191
192
193
194
195
196
197
def __eq__(self, other: object) -> bool:
    """Return self==other.

    Two VerifyKeyError instances are equal when they describe the same dict
    name, the same key and the same kind of problem. Objects of any other
    type yield NotImplemented so Python can try the reflected comparison.
    """
    if self is other:
        return True
    if isinstance(other, VerifyKeyError):
        return (
            self.dict_name == other.dict_name
            # The key must take part in the comparison: errors about
            # different keys of the same dict are different errors.
            and self.key == other.key
            and self.type == other.type
        )
    return NotImplemented
__str__
__str__() -> str

Format a VerifyKeyError to string.

Returns:

Type Description
str

The formatted string, includes name, error type

Source code in src/pyfsd/define/check_dict.py
183
184
185
186
187
188
189
def __str__(self) -> str:
    """Render this VerifyKeyError as a human-readable message.

    Returns:
        A string of the form ``<dict name>[<repr of key>] is <error type>``.
    """
    dict_name, key, kind = self.dict_name, self.key, self.type
    return f"{dict_name}[{key!r}] is {kind}"

VerifyTypeError

VerifyTypeError(name: str, excepted: TypeHint, actually: object)

Bases: TypeError

An exception that describes a value which does not match the specified type.

Attributes:

Name Type Description
name str

The name of this value.

excepted TypeHint

The expected type.

actually object

The actual value.

Parameters:

Name Type Description Default
name str

The name of the value.

required
excepted TypeHint

The expected type.

required
actually object

The actual value.

required
Source code in src/pyfsd/define/check_dict.py
114
115
116
117
118
119
120
121
122
123
124
125
def __init__(self, name: str, excepted: TypeHint, actually: object) -> None:
    """Initialize the error with the value's name, expected type and actual value.

    Args:
        name: The name of the value.
        excepted: The expected type.
        actually: The actual value.
    """
    # Keep the details as attributes so callers can inspect them directly.
    self.actually = actually
    self.excepted = excepted
    self.name = name
    # Also hand them to the base exception so they appear in ``args``.
    super().__init__(name, excepted, actually)
__eq__
__eq__(other: object) -> bool

Check if another object equals this VerifyTypeError.

Returns:

Type Description
bool

Equals or not.

Source code in src/pyfsd/define/check_dict.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
def __eq__(self, other: object) -> bool:
    """Check whether another object equals this VerifyTypeError.

    Instances are equal when name, expected type and actual value all match.

    Returns:
        Equals or not; NotImplemented when other is not a VerifyTypeError.
    """
    if self is other:
        return True
    if not isinstance(other, VerifyTypeError):
        return NotImplemented
    return (
        self.name == other.name
        and self.excepted == other.excepted
        and self.actually == other.actually
    )
__str__
__str__() -> str

Format a VerifyTypeError to string.

Returns:

Type Description
str

The formatted string, includes name, expected type and actually value

Source code in src/pyfsd/define/check_dict.py
127
128
129
130
131
132
133
134
135
136
def __str__(self) -> str:
    """Render this VerifyTypeError as a human-readable message.

    Returns:
        A string naming the value, the explained expected type and the type
        name of the actual value.
    """
    expected_text = explain_type(self.excepted)
    actual_kind = type(self.actually).__name__
    return f"'{self.name}' must be {expected_text}, not {actual_kind}"

assert_dict

assert_dict(dict_obj: dict, structure: DictStructure, *, name: str = 'dict', allow_extra_keys: bool = False) -> None

Wrapper of check_dict, which raises once an error is generated.

Check the type of a dict according to a TypedDict.

Parameters:

Name Type Description Default
dict_obj dict

The dict to be checked.

required
structure DictStructure

Expected type.

required
name str

Name of the dict.

'dict'
allow_extra_keys bool

Allow extra keys in dict_obj or not.

False

Raises:

Type Description
VerifyTypeError

When found type error.

VerifyKeyError

When found a type error about key.

TypeError

When an unsupported/invalid type passed.

Source code in src/pyfsd/define/check_dict.py
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
def assert_dict(
    dict_obj: dict,
    structure: DictStructure,
    *,
    name: str = "dict",
    allow_extra_keys: bool = False,
) -> None:
    """Run check_dict and raise the first error it reports.

    Checks a dict against a TypedDict-like structure. Returns normally when
    check_dict reports nothing.

    Args:
        dict_obj: The dict to be checked.
        structure: Expected type.
        name: Name of the dict.
        allow_extra_keys: Allow extra keys in dict_obj or not.

    Raises:
        VerifyTypeError: When found type error.
        VerifyKeyError: When found a type error about key.
        TypeError: When an unsupported/invalid type passed.
    """
    problems = check_dict(
        dict_obj,
        structure,
        name=name,
        allow_extra_keys=allow_extra_keys,
    )
    for problem in problems:
        # Only the first reported problem is raised; later ones are never
        # computed because the generator is abandoned here.
        raise problem

assert_simple_type

assert_simple_type(obj: object, typ: TypeHint, name: str = 'object') -> None

Wrapper of check_simple_type, but raise first error.

Simple runtime type checker, supports Union, Literal, List, Dict.

Parameters:

Name Type Description Default
obj object

The object to be verified.

required
typ TypeHint

The expected type. Union, Literal, List, Dict or runtime checkable type

required
name str

Name of the object.

'object'

Raises:

Type Description
VerifyTypeError

When a type error detected.

TypeError

When an unsupported type is specified.

Source code in src/pyfsd/define/check_dict.py
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
def assert_simple_type(
    obj: object,
    typ: TypeHint,
    name: str = "object",
) -> None:
    """Run check_simple_type and raise the first error it reports.

    Simple runtime type checker, supports Union, Literal, List, Dict. Returns
    normally when check_simple_type reports nothing.

    Args:
        obj: The object to be verified.
        typ: The expected type. Union, Literal, List, Dict or runtime checkable type
        name: Name of the object.

    Raises:
        VerifyTypeError: When a type error detected.
        TypeError: When an unsupported type is specified.
    """
    for problem in check_simple_type(obj, typ, name):
        # Only the first reported problem is raised; later ones are never
        # computed because the generator is abandoned here.
        raise problem

check_dict

check_dict(dict_obj: dict, structure: DictStructure, *, name: str = 'dict', allow_extra_keys: bool = False) -> Iterable[Union[VerifyTypeError, VerifyKeyError]]

Check the type of a dict according to a TypedDict.

Parameters:

Name Type Description Default
dict_obj dict

The dict to be checked.

required
structure DictStructure

Expected type.

required
name str

Name of the dict.

'dict'
allow_extra_keys bool

Allow extra keys in dict_obj or not.

False

Yields:

Type Description
Iterable[Union[VerifyTypeError, VerifyKeyError]]

Detected type error, in VerifyTypeError / VerifyKeyError

Raises:

Type Description
TypeError

When an unsupported/invalid type passed.

Examples:

>>> class AType(TypedDict):
...     a: int
...
>>> list(check_dict({ "a": 114514 }, AType, allow_extra_keys=False))
[]
>>> list(check_dict({ "a": "" }, AType, allow_extra_keys=False, name="mything"))
[VerifyTypeError("mything['a']", <class 'int'>, '')]
>>> list(check_dict({}, AType))
[VerifyKeyError('dict', 'a', 'missing')]
>>> list(check_dict(
...     { "a": 114514, "b": 1919810 },
...     AType,
...     allow_extra_keys=True
... ))
[]
>>> list(check_dict(
...     { "a": 114514, "b": 1919810 },
...     AType,
...     allow_extra_keys=False
... ))
[VerifyKeyError('dict', 'b', 'extra')]
Source code in src/pyfsd/define/check_dict.py
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
def check_dict(
    dict_obj: dict,
    structure: DictStructure,
    *,
    name: str = "dict",
    allow_extra_keys: bool = False,
) -> Iterable[Union[VerifyTypeError, VerifyKeyError]]:
    """Check a dict against a TypedDict (or a plain dict of type hints).

    Problems are yielded lazily instead of raised: missing required keys,
    values of the wrong type (nested dict structures are checked
    recursively) and, unless allow_extra_keys is set, keys the structure
    does not mention.

    Args:
        dict_obj: The dict to be checked.
        structure: Expected type.
        name: Name of the dict.
        allow_extra_keys: Allow extra keys in dict_obj or not.

    Yields:
        Detected type error, in VerifyTypeError / VerifyKeyError

    Raises:
        TypeError: When an unsupported/invalid type passed.

    Examples:
        >>> class AType(TypedDict):
        ...     a: int
        ...
        >>> list(check_dict({ "a": 114514 }, AType, allow_extra_keys=False))
        []
        >>> list(check_dict({ "a": "" }, AType, allow_extra_keys=False, name="mything"))
        [VerifyTypeError("mything['a']", <class 'int'>, '')]
        >>> list(check_dict({}, AType))
        [VerifyKeyError('dict', 'a', 'missing')]
        >>> list(check_dict(
        ...     { "a": 114514, "b": 1919810 },
        ...     AType,
        ...     allow_extra_keys=True
        ... ))
        []
        >>> list(check_dict(
        ...     { "a": 114514, "b": 1919810 },
        ...     AType,
        ...     allow_extra_keys=False
        ... ))
        [VerifyKeyError('dict', 'b', 'extra')]
    """

    def strip_not_required(
        raw_structure: Mapping,
    ) -> Iterable[tuple[Hashable, Union[TypeHint, DictStructure]]]:
        # Unwrap NotRequired[T] to T; every other hint passes through as-is.
        for field, hint in raw_structure.items():
            wrapped = get_origin(hint) in (NotRequired, NotRequired_ext)
            yield field, (get_args(hint)[0] if wrapped else hint)

    # Keys of dict_obj that the structure has not accounted for yet.
    unseen_keys = list(dict_obj.keys())
    required = tuple(lookup_required(structure))
    # New get_type_hints will change NotRequired[...] into ..., so only plain
    # dict structures need the manual unwrapping above.
    if is_typeddict(structure):
        fields = new_get_type_hints(structure).items()
    else:
        fields = strip_not_required(structure)  # type: ignore[arg-type]

    for key, expected in fields:
        try:
            value = dict_obj[key]
        except KeyError:
            # An absent key is only a problem when the structure requires it.
            if key in required:
                yield VerifyKeyError(name, key, "missing")
            continue
        if not allow_extra_keys:
            unseen_keys.remove(key)

        item_name = f"{name}[{key!r}]"
        expects_mapping = is_typeddict(expected) or isinstance(expected, dict)
        if not expects_mapping:
            yield from check_simple_type(value, expected, name=item_name)
        elif isinstance(value, dict):
            # Nested structure: recurse with the same extra-key policy.
            yield from check_dict(
                value,
                expected,  # type: ignore[arg-type]
                name=item_name,
                allow_extra_keys=allow_extra_keys,
            )
        else:
            yield VerifyTypeError(item_name, expected, value)

    if not allow_extra_keys:
        # Whatever was never matched against the structure is an extra key.
        for extra_key in unseen_keys:
            yield VerifyKeyError(name, extra_key, "extra")

check_simple_type

check_simple_type(obj: object, typ: TypeHint, name: str = 'object') -> Iterable[VerifyTypeError]

Simple runtime type checker, supports Union, Literal, List, Dict.

Parameters:

Name Type Description Default
obj object

The object to be verified.

required
typ TypeHint

The expected type. Union, Literal, List, Dict or runtime checkable type

required
name str

Name of the object.

'object'

Yields:

Type Description
Iterable[VerifyTypeError]

When a type error was detected.

Raises:

Type Description
TypeError

When an unsupported type is specified.

Source code in src/pyfsd/define/check_dict.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
def check_simple_type(
    obj: object,
    typ: TypeHint,
    name: str = "object",
) -> Iterable[VerifyTypeError]:
    """Simple runtime type checker, supports Union, Literal, List, Dict.

    Unparameterized ``List`` / ``Dict`` hints are accepted as well: only the
    container type is verified, because there is no element type to check.

    Args:
        obj: The object to be verified.
        typ: The expected type. Union, Literal, List, Dict or runtime checkable type
        name: Name of the object.

    Yields:
        When a type error was detected.

    Raises:
        TypeError: When an unsupported type is specified.
    """
    if type_origin := get_origin(typ):  # elif (t_o is not None)
        if type_origin is Union:
            # The object is fine as soon as one member of the Union accepts it.
            for sub_type in get_args(typ):
                if is_empty_iterable(check_simple_type(obj, sub_type, name=name)):
                    return
            yield VerifyTypeError(name, typ, obj)
        elif type_origin is Literal:
            if obj not in get_args(typ):
                yield VerifyTypeError(name, typ, obj)
        elif type_origin is list:
            if not isinstance(obj, list):
                yield VerifyTypeError(name, typ, obj)
                return
            item_types = get_args(typ)
            if not item_types:
                # Bare ``List``: get_args() is empty, so there is no element
                # type to verify (indexing it would raise IndexError).
                return
            item_type = item_types[0]
            for i, val in enumerate(obj):
                yield from check_simple_type(
                    val,
                    item_type,
                    name=f"{name}[{i}]",
                )
        elif type_origin is dict:
            if not isinstance(obj, dict):
                yield VerifyTypeError(name, typ, obj)
                return
            type_args = get_args(typ)
            if not type_args:
                # Bare ``Dict``: no key/value types to verify (unpacking the
                # empty tuple would raise ValueError).
                return
            key_type, value_type = type_args
            for key, value in obj.items():
                # TODO: Better description of key
                yield from check_simple_type(
                    key,
                    key_type,
                    name=f"{name}[{key!r}]",
                )
                yield from check_simple_type(
                    value,
                    value_type,
                    name=f"{name}[{key!r}]",
                )
        else:
            raise TypeError(f"Unsupported type: {type_origin!r}")
    elif isinstance(typ, type):
        if not isinstance(obj, typ):
            yield VerifyTypeError(name, typ, obj)
    else:
        raise TypeError(f"Invalid type: {typ!r}")

explain_type

explain_type(typ: TypeHint) -> str

Explain a type.

Parameters:

Name Type Description Default
typ TypeHint

The type to be explained.

required

Returns:

Type Description
str

Description of the type.

Raises:

Type Description
TypeError

When an unsupported/invalid type passed.

Source code in src/pyfsd/define/check_dict.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def explain_type(typ: TypeHint) -> str:
    """Explain a type.

    Args:
        typ: The type to be explained.

    Returns:
        Description of the type.

    Raises:
        TypeError: When an unsupported/invalid type passed.
    """
    if isinstance(typ, dict) or is_typeddict(typ):
        return "dict"
    if type_origin := get_origin(typ):  # elif (t_o is not None)
        if type_origin is Union:
            return " or ".join(explain_type(sub_type) for sub_type in get_args(typ))
        if type_origin is Literal:
            return " or ".join(repr(sub_value) for sub_value in get_args(typ))
        if type_origin in (list, dict):
            return str(typ).removeprefix(typ.__module__ + ".")
        raise TypeError(f"Unsupported type: {type_origin!r}")
    if isinstance(typ, type):
        return typ.__name__
    raise TypeError(f"Invalid type: {typ!r}")

lookup_required

lookup_required(structure: DictStructure) -> Iterable[Hashable]

Yield all required keys of a TypedDict.

Parameters:

Name Type Description Default
structure DictStructure

The type structure, TypedDict or dict.

required

Yields:

Type Description
Iterable[Hashable]

Keys that are required, str normally.

Source code in src/pyfsd/define/check_dict.py
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
def lookup_required(structure: DictStructure) -> Iterable[Hashable]:
    """Yields all required key in a TypedDict.

    Args:
        structure: The type structure, TypedDict or dict.

    Yields:
        Keys that are required, str normally.
    """
    if is_typeddict(structure):
        # Python < 3.9 not supported
        # ---------
        # Mypy bug, ignore it
        if not structure.__total__:  # type: ignore[union-attr]
            # Nothing is required
            return
        if NotRequired.__module__ == "typing":  # Python 3.11+, not need to Workaround
            yield from structure.__required_keys__  # type: ignore[union-attr]
            return
        # Python 3.9, 3.10
        type_hints = get_type_hints(structure)
        for may_required_keys in structure.__required_keys__:  # type: ignore[union-attr]
            if get_origin(type_hints[may_required_keys]) not in (
                NotRequired,
                NotRequired_ext,
            ):
                yield may_required_keys
    else:
        for may_required_keys, type_ in structure.items():  # type: ignore[union-attr]
            if get_origin(type_) not in (NotRequired, NotRequired_ext):
                yield may_required_keys

errors

FSD client protocol errors.

FSDClientError

Bases: IntEnum

Errno constants.

__str__
__str__() -> str

Return the error string.

Source code in src/pyfsd/define/errors.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def __str__(self) -> str:
    """Return the error string.

    The message is looked up by the integer value of this member.
    """
    messages = (
        "No error",
        "Callsign in use",
        "Invalid callsign",
        "Already registerd",  # codespell:ignore registerd
        "Syntax error",
        "Invalid source callsign",
        "Invalid CID/password",
        "No such callsign",
        "No flightplan",
        "No such weather profile",
        "Invalid protocol revision",
        "Requested level too high",
        "Too many clients connected",
        "CID/PID was suspended",
    )
    errno = int(self)
    return messages[errno]

packet

Utilities to deal with FSD packet.

Attributes:

Name Type Description
CLIENT_USED_COMMAND list[FSDClientCommand]

All commands that can possibly be issued by a user in protocol 9.

SPLIT_SIGN CompatibleString

FSD client packet's split sign.

CompatibleString

CompatibleString(value: str)

Helper to deal with bytes and str.

Too hard to describe, please see examples section.

Examples:

>>> str1 = CompatibleString("1234")
>>> assert str1 + "test" == "1234test"
>>> assert str1 + b"test" == b"1234test"
>>> assert str1 + CompatibleString("test") == CompatibleString("1234test")
>>> assert "1" in str1
>>> assert b"2" in str1
>>> assert CompatibleString("3") in str1

Attributes:

Name Type Description
string str

The original ascii-only str.

Parameters:

Name Type Description Default
value str

The original ascii-only str.

required

Raises:

Type Description
ValueError

When the value contains non-ascii characters.

Source code in src/pyfsd/define/packet.py
53
54
55
56
57
58
59
60
61
62
63
64
def __init__(self, value: str) -> None:
    """Initialize the instance from an ASCII-only str.

    Args:
        value: The original ascii-only str.

    Raises:
        ValueError: When the value contains non-ascii characters.
    """
    if value.isascii():
        self.string = value
        return
    raise ValueError("String can only contain ASCII characters")
__add__
__add__(other: _T_str) -> _T_str

Return self+other.

Parameters:

Name Type Description Default
other _T_str

str, bytes or CompatibleString.

required
Source code in src/pyfsd/define/packet.py
195
196
197
198
199
200
201
202
203
204
205
206
207
def __add__(self, other: _T_str) -> _T_str:
    """Return self+other, in the same kind of string as ``other``.

    Args:
        other: str, bytes or CompatibleString.

    Returns:
        A CompatibleString, str or bytes matching the type of ``other``;
        NotImplemented for any other operand type.
    """
    text = self.string
    if isinstance(other, CompatibleString):
        return CompatibleString(text + other.string)
    if isinstance(other, str):
        return text + other
    if isinstance(other, bytes):
        # Encode our side so the result stays bytes.
        return text.encode() + other
    return NotImplemented
__bytes__
__bytes__() -> bytes

Convert this CompatibleString into bytes.

Source code in src/pyfsd/define/packet.py
86
87
88
def __bytes__(self) -> bytes:
    """Return the wrapped string encoded as bytes."""
    encoded = self.string.encode()
    return encoded
__complex__
__complex__() -> complex

Return complex(self.string).

Source code in src/pyfsd/define/packet.py
82
83
84
def __complex__(self) -> complex:
    """Parse the wrapped string as a complex number."""
    parsed = complex(self.string)
    return parsed
__contains__
__contains__(part: object) -> bool

Return part in self.

Parameters:

Name Type Description Default
part object

str, bytes or CompatibleString.

required
Source code in src/pyfsd/define/packet.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def __contains__(self, part: object) -> bool:
    """Return part in self.

    Args:
        part: str, bytes or CompatibleString.

    Raises:
        TypeError: When part is none of the supported types.
    """
    text = self.string
    if isinstance(part, CompatibleString):
        return part.string in text
    if isinstance(part, str):
        return part in text
    if isinstance(part, bytes):
        # Compare bytes against the encoded form of our string.
        return part in text.encode()
    raise TypeError(
        "'in <CompatibleString>' requires string or bytes or "
        f"CompatibleString as left operand, not {type(part).__name__}"
    )
__eq__
__eq__(value: object) -> bool

Return self == value.

Parameters:

Name Type Description Default
value object

str, bytes or CompatibleString.

required
Source code in src/pyfsd/define/packet.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def __eq__(self, value: object) -> bool:
    """Return self == value.

    Args:
        value: str, bytes or CompatibleString.

    Returns:
        The comparison result; NotImplemented for any other operand type.
    """
    if self is value:
        return True
    text = self.string
    if isinstance(value, CompatibleString):
        return text == value.string
    if isinstance(value, str):
        return text == value
    if isinstance(value, bytes):
        # Compare bytes against the encoded form of our string.
        return text.encode() == value
    return NotImplemented
__float__
__float__() -> float

Return float(self.string).

Source code in src/pyfsd/define/packet.py
78
79
80
def __float__(self) -> float:
    """Parse the wrapped text as a float and return it."""
    text = self.string
    return float(text)
__ge__
__ge__(value: object) -> bool

Return self >= value.

Parameters:

Name Type Description Default
value object

str, bytes or CompatibleString.

required
Source code in src/pyfsd/define/packet.py
156
157
158
159
160
161
162
163
164
165
166
167
168
def __ge__(self, value: object) -> bool:
    """Implement ``self >= value``.

    Args:
        value: str, bytes or CompatibleString.

    Returns:
        The comparison result, or NotImplemented for unsupported types.
    """
    if isinstance(value, CompatibleString):
        other = value.string
    elif isinstance(value, str):
        other = value
    elif isinstance(value, bytes):
        # Compare in the bytes domain when the other side is bytes.
        return self.string.encode() >= value
    else:
        return NotImplemented
    return self.string >= other
__getitem__
__getitem__(index: Union[int, slice]) -> CompatibleString

Return self[index].

Source code in src/pyfsd/define/packet.py
191
192
193
def __getitem__(self, index: Union[int, slice]) -> "CompatibleString":
    """Index or slice the wrapped text and wrap the result in the same class."""
    factory = self.__class__
    return factory(self.string[index])
__getnewargs__
__getnewargs__() -> tuple[str]

Return self.string in tuple, for pickle.

Source code in src/pyfsd/define/packet.py
94
95
96
def __getnewargs__(self) -> tuple[str]:
    """Return a one-element tuple holding the wrapped text, for pickle."""
    text = self.string[:]
    return (text,)
__gt__
__gt__(value: object) -> bool

Return self > value.

Parameters:

Name Type Description Default
value object

str, bytes or CompatibleString.

required
Source code in src/pyfsd/define/packet.py
142
143
144
145
146
147
148
149
150
151
152
153
154
def __gt__(self, value: object) -> bool:
    """Implement ``self > value``.

    Args:
        value: str, bytes or CompatibleString.

    Returns:
        The comparison result, or NotImplemented for unsupported types.
    """
    if isinstance(value, CompatibleString):
        other = value.string
    elif isinstance(value, str):
        other = value
    elif isinstance(value, bytes):
        # Compare in the bytes domain when the other side is bytes.
        return self.string.encode() > value
    else:
        return NotImplemented
    return self.string > other
__hash__
__hash__() -> int

Return hash(self.string).

Source code in src/pyfsd/define/packet.py
90
91
92
def __hash__(self) -> int:
    """Hash exactly like the wrapped str does."""
    text = self.string
    return hash(text)
__int__
__int__() -> int

Return int(self.string).

Source code in src/pyfsd/define/packet.py
74
75
76
def __int__(self) -> int:
    """Parse the wrapped text as an int and return it."""
    text = self.string
    return int(text)
__le__
__le__(value: object) -> bool

Return self <= value.

Parameters:

Name Type Description Default
value object

str, bytes or CompatibleString.

required
Source code in src/pyfsd/define/packet.py
128
129
130
131
132
133
134
135
136
137
138
139
140
def __le__(self, value: object) -> bool:
    """Implement ``self <= value``.

    Args:
        value: str, bytes or CompatibleString.

    Returns:
        The comparison result, or NotImplemented for unsupported types.
    """
    if isinstance(value, CompatibleString):
        other = value.string
    elif isinstance(value, str):
        other = value
    elif isinstance(value, bytes):
        # Compare in the bytes domain when the other side is bytes.
        return self.string.encode() <= value
    else:
        return NotImplemented
    return self.string <= other
__len__
__len__() -> int

Return len(self).

Source code in src/pyfsd/define/packet.py
187
188
189
def __len__(self) -> int:
    """Return the number of characters in the wrapped text."""
    text = self.string
    return len(text)
__lt__
__lt__(value: object) -> bool

Return self < value.

Parameters:

Name Type Description Default
value object

str, bytes or CompatibleString.

required
Source code in src/pyfsd/define/packet.py
114
115
116
117
118
119
120
121
122
123
124
125
126
def __lt__(self, value: object) -> bool:
    """Implement ``self < value``.

    Args:
        value: str, bytes or CompatibleString.

    Returns:
        The comparison result, or NotImplemented for unsupported types.
    """
    if isinstance(value, CompatibleString):
        other = value.string
    elif isinstance(value, str):
        other = value
    elif isinstance(value, bytes):
        # Compare in the bytes domain when the other side is bytes.
        return self.string.encode() < value
    else:
        return NotImplemented
    return self.string < other
__mod__
__mod__(args: Union[tuple, object]) -> CompatibleString

Return self % args.

Source code in src/pyfsd/define/packet.py
231
232
233
def __mod__(self, args: Union[tuple, object]) -> "CompatibleString":
    """Apply %-formatting to the wrapped text and wrap the result in the same class."""
    factory = self.__class__
    return factory(self.string % args)
__mul__
__mul__(n: int) -> CompatibleString

Return self * n.

Source code in src/pyfsd/define/packet.py
223
224
225
def __mul__(self, n: int) -> "CompatibleString":
    """Repeat the wrapped text n times and wrap the result in the same class."""
    factory = self.__class__
    return factory(self.string * n)
__radd__
__radd__(other: _T_str) -> _T_str

Return other+self.

Parameters:

Name Type Description Default
other _T_str

str, bytes or CompatibleString.

required
Source code in src/pyfsd/define/packet.py
209
210
211
212
213
214
215
216
217
218
219
220
221
def __radd__(self, other: _T_str) -> _T_str:
    """Implement ``other + self``.

    Args:
        other: str, bytes or CompatibleString.

    Returns:
        A value of the same kind as other, or NotImplemented for
        unsupported types.
    """
    if isinstance(other, CompatibleString):
        return CompatibleString(other.string + self.string)
    # For plain str/bytes, convert our text to the left operand's type.
    if isinstance(other, str):
        suffix = self.string
    elif isinstance(other, bytes):
        suffix = self.string.encode()
    else:
        return NotImplemented
    return other + suffix
__repr__
__repr__() -> str

Return the canonical string representation.

Source code in src/pyfsd/define/packet.py
70
71
72
def __repr__(self) -> str:
    """Return the canonical string representation.

    The wrapped text is rendered with ``repr()`` so that quotes, backslashes
    and control characters (for example a "\\r\\n" separator) are escaped
    instead of being pasted verbatim between hard-coded quotes.
    """
    return f"CompatibleString({self.string!r})"
__rmod__
__rmod__(template: _T_str) -> _T_str

Return template % self.

Source code in src/pyfsd/define/packet.py
235
236
237
238
239
240
241
242
243
244
245
def __rmod__(self, template: _T_str) -> _T_str:
    """Implement ``template % self``.

    Returns NotImplemented when template is not str, bytes or
    CompatibleString.
    """
    if isinstance(template, CompatibleString):
        # Delegates to the template's own % handling with self as argument.
        return template % self
    # For plain str/bytes, convert our text to the template's type.
    if isinstance(template, str):
        argument = self.string
    elif isinstance(template, bytes):
        argument = self.string.encode()
    else:
        return NotImplemented
    return template % argument
__rmul__
__rmul__(n: int) -> CompatibleString

Return n * self.

Source code in src/pyfsd/define/packet.py
227
228
229
def __rmul__(self, n: int) -> "CompatibleString":
    """Implement ``n * self``: repeat the text and wrap it in the same class."""
    factory = self.__class__
    return factory(self.string * n)
__str__
__str__() -> str

Return str(self).

Source code in src/pyfsd/define/packet.py
66
67
68
def __str__(self) -> str:
    """Return the wrapped text as a plain str."""
    text = self.string
    return str(text)
as_type
as_type(type_: type[AnyStr]) -> AnyStr

Convert this CompatibleString into specified type.

Parameters:

Name Type Description Default
type_ type[AnyStr]

literally str or bytes.

required
Source code in src/pyfsd/define/packet.py
247
248
249
250
251
252
253
254
255
256
257
def as_type(self, type_: type[AnyStr]) -> AnyStr:
    """Return the wrapped text as the requested string type.

    Args:
        type_: Either the ``str`` class or the ``bytes`` class.

    Raises:
        TypeError: When type_ is neither str nor bytes.
    """
    if type_ is bytes:
        return self.string.encode()  # type: ignore[return-value]
    if type_ is not str:
        raise TypeError(f"Invalid string type: {type_}")
    return self.string  # type: ignore[return-value]

FSDClientCommand

FSDClientCommand(value: str)

Bases: CompatibleString, Enum

FSD client command.

Source code in src/pyfsd/define/packet.py
53
54
55
56
57
58
59
60
61
62
63
64
def __init__(self, value: str) -> None:
    """Wrap an ASCII-only str.

    Args:
        value: The text to wrap; stored on ``self.string``.

    Raises:
        ValueError: When the value contains non-ascii characters.
    """
    ascii_only = value.isascii()
    if not ascii_only:
        raise ValueError("String can only contain ASCII characters")
    self.string = value

break_packet

break_packet(packet: AnyStr, possibly_commands: Iterable[AnyStr]) -> tuple[Optional[AnyStr], tuple[AnyStr, ...]]
break_packet(packet: AnyStr, possibly_commands: Iterable[FSDClientCommand]) -> tuple[Optional[FSDClientCommand], tuple[AnyStr, ...]]
break_packet(packet: AnyStr, possibly_commands: Iterable[Union[AnyStr, FSDClientCommand]]) -> tuple[Optional[Union[AnyStr, FSDClientCommand]], tuple[AnyStr, ...]]
break_packet(packet: AnyStr, possibly_commands: Iterable[Union[AnyStr, FSDClientCommand]]) -> tuple[Optional[Union[AnyStr, FSDClientCommand]], tuple[AnyStr, ...]]

Break a packet into command and parts.

#APzzzzzzzzzzzz1:zzzzzzz3:zzzzzzz4
[^][^^^^^^^^^^^] [^^^^^^] [^^^^^^]
command parts[0] parts[1] parts[2]

Parameters:

Name Type Description Default
packet AnyStr

The original packet.

required
possibly_commands Iterable[Union[AnyStr, FSDClientCommand]]

All possible commands. This function checks whether the packet starts with one of them and, if so, splits that command off.

required

Returns:

Type Description
tuple[Optional[Union[AnyStr, FSDClientCommand]], tuple[AnyStr, ...]]

tuple[command or None, tuple[every_part, ...]]

Source code in src/pyfsd/define/packet.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
def break_packet(
    packet: AnyStr,
    possibly_commands: Iterable[Union[AnyStr, FSDClientCommand]],
) -> tuple[Optional[Union[AnyStr, FSDClientCommand]], tuple[AnyStr, ...]]:
    """Split a packet into its command and its parts.

        #APzzzzzzzzzzzz1:zzzzzzz3:zzzzzzz4
        [^][^^^^^^^^^^^] [^^^^^^] [^^^^^^]
        command parts[0] parts[1] parts[2]

    Args:
        packet: The original packet.
        possibly_commands: Candidate commands. The first candidate the packet
            starts with is taken as the command and stripped from parts[0].

    Returns:
        tuple[command or None, tuple[every_part, ...]]
    """
    kind = type(packet)
    matched: Optional[Union[AnyStr, FSDClientCommand]] = None
    for candidate in possibly_commands:
        # Enum commands must be converted to the packet's own string type
        # before they can be used as a prefix.
        prefix: AnyStr = (
            candidate.as_type(kind)
            if isinstance(candidate, FSDClientCommand)
            else candidate
        )
        if packet.startswith(prefix):
            matched = candidate
            break
    fields: list[AnyStr] = packet.split(  # pyright: ignore[reportAssignmentType]
        SPLIT_SIGN.as_type(kind)
    )
    if matched is not None:
        # Drop the command prefix from the first field.
        fields[0] = fields[0][len(matched) :]
    return (matched, tuple(fields))

join_lines

join_lines(*lines: AnyStr, newline: bool = True) -> AnyStr

Join lines together.

Parameters:

Name Type Description Default
lines AnyStr

The lines.

()
newline bool

Append '\r\n' to every line or not.

True

Returns:

Type Description
AnyStr

The result.

Source code in src/pyfsd/define/packet.py
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
def join_lines(*lines: AnyStr, newline: bool = True) -> AnyStr:
    r"""Concatenate lines into one value.

    Args:
        lines: The lines.
        newline: Append '\r\n' to every line or not.

    Returns:
        The result.
    """
    terminator = CompatibleString("\r\n")
    joined = CompatibleString("")
    for line in lines:
        # Type mismatches are deliberately not guarded; let them raise.
        if newline:
            joined += line + terminator  # type: ignore[assignment]
        else:
            joined += line  # type: ignore[assignment]
    return cast("AnyStr", joined)

make_packet

make_packet(*parts: Union[AnyStr, FSDClientCommand]) -> AnyStr

Join parts together and add split sign between every two parts.

Source code in src/pyfsd/define/packet.py
297
298
299
300
301
302
303
304
def make_packet(*parts: Union[AnyStr, FSDClientCommand]) -> AnyStr:
    """Join parts into one packet with the split sign between adjacent parts.

    Raises:
        TypeError: When the joined result is still an FSDClientCommand.
    """
    joined = CompatibleString("")
    for piece in parts:
        # Every piece gets a trailing split sign; the last one is trimmed below.
        joined += piece + SPLIT_SIGN  # type: ignore[assignment]
    if isinstance(joined, FSDClientCommand):
        raise TypeError("Must have str or bytes item")
    trimmed = joined[:-1]
    return cast("AnyStr", trimmed)

simulation

This module simulates some C++ behaviors, such as integer overflow.

Int32MRand

Int32MRand()

Bases: MRand

MRand that simulates int32 overflow.

Source code in src/pyfsd/define/simulation.py
14
15
16
def __init__(self) -> None:
    """Create an instance with its backing seed field set to zero."""
    initial_seed = 0
    self._really_randseed = initial_seed
mrandseed property writable
mrandseed: int

Get mrandseed.

utils

Collection of tools that are used frequently.

Attributes:

Name Type Description
task_keeper TaskKeeper

Helper to keep your asyncio.Task's strong reference and cancel it when PyFSD is shutting down.

mustdone_task_keeper TaskKeeper

Similar to task_keeper, but PyFSD will await them before stop.

MRand

Python implementation of FSD MRand.

Note

This class does not simulate int32 overflow. See also: pyfsd.define.simulation.Int32MRand

Attributes:

Name Type Description
mrandseed int

Random seed.

__call__
__call__() -> int

Generate a random number.

Source code in src/pyfsd/define/utils.py
239
240
241
242
243
244
245
def __call__(self) -> int:
    """Generate a random number.

    Advances ``self.mrandseed`` in place and returns its new value.

    Returns:
        The updated seed.
    """
    # Scramble the current seed with a fixed XOR constant.
    self.mrandseed ^= 0x22591D8C
    # Seed shifted left by one bit, truncated to the low 32 bits.
    part1 = (self.mrandseed << 1) & 0xFFFFFFFF
    # Seed shifted right by 31 bits.
    part2 = self.mrandseed >> 31
    # NOTE(review): when the seed fits in 32 unsigned bits, part1 | part2 is
    # the seed rotated left by one bit -- confirm the expected seed range.
    self.mrandseed ^= part1 | part2
    return self.mrandseed
srand
srand(seed: int) -> None

Set random seed.

Source code in src/pyfsd/define/utils.py
247
248
249
def srand(self, seed: int) -> None:
    """Replace the current random seed with the given one.

    Args:
        seed: The new value for ``self.mrandseed``.
    """
    new_seed = seed
    self.mrandseed = new_seed

TaskKeeper

TaskKeeper()

Helper to keep strong reference of running asyncio.Tasks.

Note

You're advised not to create new instance and use pyfsd.define.utils.task_keeper instead.

Source code in src/pyfsd/define/utils.py
262
263
264
def __init__(self) -> None:
    """Create a TaskKeeper that starts out holding no tasks."""
    empty = set()
    self.tasks = empty
add
add(task: Task) -> None

Add a task to be kept.

Source code in src/pyfsd/define/utils.py
271
272
273
274
def add(self, task: "Task") -> None:
    """Keep a reference to a task until it is done.

    Args:
        task: The task to hold; it is discarded from ``self.tasks`` by a
            done-callback registered here.
    """
    kept = self.tasks
    kept.add(task)
    task.add_done_callback(kept.discard)
cancel_all
cancel_all() -> None

Cancel all tasks.

Source code in src/pyfsd/define/utils.py
266
267
268
269
def cancel_all(self) -> None:
    """Call ``cancel()`` on every task currently held in ``self.tasks``."""
    for pending in self.tasks:
        pending.cancel()

assert_no_duplicate

assert_no_duplicate(iterator: Iterable[Hashable]) -> None

Assert that nothing is duplicated in an iterable object.

Parameters:

Name Type Description Default
iterator Iterable[Hashable]

The iterable object.

required

Raises:

Type Description
AssertionError

When a duplicated value is detected.

Source code in src/pyfsd/define/utils.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def assert_no_duplicate(
    iterator: Iterable[Hashable],
) -> None:
    """Assert that an iterable contains no repeated value.

    Args:
        iterator: The iterable object.

    Raises:
        AssertionError: When a duplicated value is detected. The message
            lists every repeated occurrence, in input order.
    """
    seen = set()
    repeated = []
    for value in iterator:
        # The first sighting is recorded; any later sighting is a duplicate.
        if value in seen:
            repeated.append(value)
        else:
            seen.add(value)

    if repeated:
        raise AssertionError(f"Duplicated value: {repeated}")

asyncify

asyncify(func: Callable[P, T]) -> Callable[P, Awaitable[T]]

Decorator that makes a sync function async by executing it in a thread.

Examples:

>>> @asyncify
... def blocking_func():
...     sleep(100) # Blocking call
...
>>> async def another_func():
...     await blocking_func()  # Not blocking anymore
Source code in src/pyfsd/define/utils.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
def asyncify(func: Callable[P, T]) -> Callable[P, Awaitable[T]]:
    """Decorator that turns a sync function into a coroutine function.

    Each call of the wrapper runs ``func`` through the event loop's
    ``run_in_executor`` with the default executor and awaits the result.

    Examples:
        >>> @asyncify
        ... def blocking_func():
        ...     sleep(100) # Blocking call
        ...
        >>> async def another_func():
        ...     await blocking_func()  # Not blocking anymore
    """

    @wraps(func)
    async def _call(*args: P.args, **kwargs: P.kwargs) -> T:
        def _invoke() -> T:
            # run_in_executor passes no keyword arguments, so bind them here.
            return func(*args, **kwargs)

        return await get_event_loop().run_in_executor(None, _invoke)

    return _call

calc_distance

calc_distance(from_position: Position, to_position: Position, unit: Unit = NAUTICAL_MILES) -> float

Calculate the distance from one point to another point.

A wrapper of haversine since it's not typed well

Parameters:

Name Type Description Default
from_position Position

The first point.

required
to_position Position

The second point.

required
unit Unit

Unit of the distance, nm by default.

NAUTICAL_MILES

Returns:

Type Description
float

The distance.

Source code in src/pyfsd/define/utils.py
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def calc_distance(
    from_position: "Position",
    to_position: "Position",
    unit: Unit = Unit.NAUTICAL_MILES,
) -> float:
    """Return the distance between two points.

    A typed wrapper around haversine, which is not typed well.

    Args:
        from_position: The first point.
        to_position: The second point.
        unit: Unit of the distance, nm by default.

    Returns:
        The distance.
    """
    distance = haversine(from_position, to_position, unit=unit)
    return cast("float", distance)

is_callsign_valid

is_callsign_valid(callsign: Union[str, bytes]) -> bool

Check if a callsign is valid or not.

Source code in src/pyfsd/define/utils.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def is_callsign_valid(callsign: Union[str, bytes]) -> bool:
    """Check if a callsign is valid or not.

    A callsign is valid when its length lies strictly between
    CALLSIGN_MIN_LEN and CALLSIGN_MAX_LEN and it contains none of
    ``! @ # $ % * : &``, space or tab.
    """
    if not CALLSIGN_MIN_LEN < len(callsign) < CALLSIGN_MAX_LEN:
        return False
    # Use markers of the same string type as the callsign.
    if isinstance(callsign, str):
        banned = ("!", "@", "#", "$", "%", "*", ":", "&", " ", "\t")
    else:
        banned = (b"!", b"@", b"#", b"$", b"%", b"*", b":", b"&", b" ", b"\t")
    return not any(mark in callsign for mark in banned)

is_empty_iterable

is_empty_iterable(iter_obj: Iterable) -> bool

Check if an iterable object is empty.

Source code in src/pyfsd/define/utils.py
154
155
156
157
158
159
160
161
def is_empty_iterable(iter_obj: Iterable) -> bool:
    """Check if an iterable object is empty.

    This requests one item from a fresh iterator over iter_obj, so a one-shot
    iterator passed in loses its first element.
    """
    try:
        next(iter(iter_obj))
    except StopIteration:
        empty = True
    else:
        empty = False
    return empty

iter_callable

iter_callable(obj: object, *, ignore_private: bool = True) -> Iterable[Callable]

Yields all callable attributes of an object.

Parameters:

Name Type Description Default
obj object

The object.

required
ignore_private bool

Don't yield attributes which name starts with '_'.

True

Yields:

Type Description
Iterable[Callable]

Callable attributes.

Source code in src/pyfsd/define/utils.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
def iter_callable(obj: object, *, ignore_private: bool = True) -> Iterable[Callable]:
    """Yield every callable attribute of an object.

    Attribute names come from ``dir(obj)``.

    Args:
        obj: The object.
        ignore_private: Don't yield attributes which name starts with '_'.

    Yields:
        Callable attributes.
    """
    for name in dir(obj):
        if not (ignore_private and name.startswith("_")):
            member = getattr(obj, name)
            if callable(member):
                yield member

iterables

iterables(*iterators: Iterable[T]) -> Iterable[T]
iterables(*iterators: Iterable) -> Iterable
iterables(*iterators: Iterable) -> Iterable

Iterate multiple iterable objects at once.

Parameters:

Name Type Description Default
iterators Iterable

Iterable objects.

()

Yields:

Type Description
Iterable

Iterate result.

Source code in src/pyfsd/define/utils.py
172
173
174
175
176
177
178
179
180
181
182
def iterables(*iterators: Iterable) -> Iterable:
    """Chain several iterable objects into one stream.

    Args:
        iterators: Iterable objects, consumed one after another in order.

    Yields:
        Every item of the first iterable, then of the second, and so on.
    """
    for source in iterators:
        yield from source

str_to_float

str_to_float(string: Union[str, bytes], default_value: float = 0.0) -> float

Convert a str or bytes into float.

Parameters:

Name Type Description Default
string Union[str, bytes]

The string to be converted.

required
default_value float

Default value when convert failed.

0.0

Returns:

Type Description
float

The float number.

Source code in src/pyfsd/define/utils.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def str_to_float(string: Union[str, bytes], default_value: float = 0.0) -> float:
    """Convert a str or bytes into float, falling back to a default.

    Args:
        string: The string to be converted.
        default_value: Value returned when ``float()`` raises ValueError.

    Returns:
        The float number.
    """
    try:
        number = float(string)
    except ValueError:
        number = default_value
    return number

str_to_int

str_to_int(string: Union[str, bytes], default_value: int = 0) -> int

Convert a str or bytes into int.

Parameters:

Name Type Description Default
string Union[str, bytes]

The string to be converted.

required
default_value int

Default value when convert failed.

0

Returns:

Type Description
int

The int.

Source code in src/pyfsd/define/utils.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def str_to_int(string: Union[str, bytes], default_value: int = 0) -> int:
    """Convert a str or bytes into int, falling back to a default.

    Args:
        string: The string to be converted.
        default_value: Value returned when ``int()`` raises ValueError.

    Returns:
        The int.
    """
    try:
        number = int(string)
    except ValueError:
        number = default_value
    return number

dependencies

PyFSD dependencies container.

Container

Bases: DeclarativeContainer

PyFSD dependencies container.

Attributes:

Name Type Description
config RootPyFSDConfigProvider

The root config.

db_engine Singleton[AsyncEngine]

sqlalchemy database engine.

plugin_manager Singleton[PluginManager]

Plugin manager.

metar_manager Singleton[MetarManager]

Metar manager.

client_factory Singleton[ClientFactory]

Client protocol factory, which stores clients and do something else.

RootPyFSDConfigProvider

Bases: Configuration

Customized providers.Configuration with correct type annotation.

from_dict

from_dict(options: RootPyFSDConfig, *, required: bool = False) -> None

Load configuration from dict.

Source code in src/pyfsd/dependencies.py
19
20
21
22
23
24
25
26
def from_dict(  # pyright: ignore[reportIncompatibleMethodOverride]
    self,
    options: "RootPyFSDConfig",  # type: ignore[override]
    *,
    required: bool = False,
) -> None:
    """Load configuration from dict.

    Forwards both arguments unchanged to the parent class's ``from_dict``;
    the override only narrows the annotation of ``options``.

    Args:
        options: The root PyFSD config.
        required: Passed through to the parent implementation.
    """
    return super().from_dict(options, required=required)  # type: ignore[arg-type]

factory

PyFSD protocol factories.

client

Protocol factory -- client.

ClientFactory

ClientFactory(motd: bytes, blacklist: list[str], metar_manager: MetarManager, plugin_manager: PluginManager, db_engine: AsyncEngine)

Factory of ClientProtocol.

Attributes:

Name Type Description
clients dict[bytes, Client]

All clients, Dict[callsign(bytes), Client]

heartbeat_task Task[NoReturn] | None

Task to send heartbeat to clients.

motd list[bytes]

The Message Of The Day.

blacklist list[str]

IP blacklist.

metar_manager MetarManager

The metar manager.

plugin_manager PluginManager

The plugin manager.

db_engine AsyncEngine

Async sqlalchemy engine.

password_hasher PasswordHasher

Argon2 password hasher.

Source code in src/pyfsd/factory/client.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def __init__(
    self,
    motd: bytes,
    blacklist: list[str],
    metar_manager: "MetarManager",
    plugin_manager: "PluginManager",
    db_engine: "AsyncEngine",
) -> None:
    """Create a ClientFactory instance.

    Args:
        motd: The Message Of The Day; stored split into lines.
        blacklist: IP blacklist.
        metar_manager: The metar manager.
        plugin_manager: The plugin manager.
        db_engine: Async sqlalchemy engine.
    """
    # Collaborators handed in by the caller.
    self.db_engine = db_engine
    self.plugin_manager = plugin_manager
    self.metar_manager = metar_manager
    self.blacklist = blacklist
    self.motd = motd.splitlines()
    # State owned by the factory itself.
    self.password_hasher = PasswordHasher()
    self.heartbeat_task = None
    self.clients = {}
__call__
__call__() -> ClientProtocol

Create a ClientProtocol instance.

Source code in src/pyfsd/factory/client.py
112
113
114
def __call__(self) -> ClientProtocol:
    """Build a new ClientProtocol bound to this factory."""
    protocol = ClientProtocol(self)
    return protocol
broadcast
broadcast(*lines: bytes, check_func: BroadcastChecker = lambda _, __: True, auto_newline: bool = True, from_client: Optional[Client] = None) -> bool

Broadcast a message.

Parameters:

Name Type Description Default
lines bytes

Lines to be broadcasted.

()
check_func BroadcastChecker

Function to check if message should be sent to a client.

lambda _, __: True
auto_newline bool

Auto put newline marker between lines or not.

True
from_client Optional[Client]

Where the message from.

None
Return

Lines sent to at least one client or not.

Source code in src/pyfsd/factory/client.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def broadcast(
    self,
    *lines: bytes,
    check_func: "BroadcastChecker" = lambda _, __: True,
    auto_newline: bool = True,
    from_client: Optional["Client"] = None,
) -> bool:
    """Broadcast a message.

    Args:
        lines: Lines to be broadcasted.
        check_func: Function to check if message should be sent to a client.
        auto_newline: Auto put newline marker between lines or not.
        from_client: Where the message from; this client never receives it.

    Returns:
        Whether at least one client passed the checks. A matching client
        whose transport is closing still counts, although nothing is
        written to it.
    """
    payload = join_lines(*lines, newline=auto_newline)
    matched_any = False
    for client in self.clients.values():
        # Skip the sender and every client the checker rejects.
        if client == from_client or not check_func(from_client, client):
            continue
        matched_any = True
        if client.transport.is_closing():
            continue
        client.transport.write(payload)
    return matched_any
check_auth async
check_auth(username: str, password: str) -> Optional[int]

Check if the username and password are correct.

Source code in src/pyfsd/factory/client.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
async def check_auth(self, username: str, password: str) -> Optional[int]:
    """Check if password and username is correct.

    The username is matched against the ``callsign`` column of users_table.
    Stored hashes whose length equals SHA256_HEX_LENGTH are checked as sha256
    hex digests and, on success, replaced by a hash from
    ``self.password_hasher``. Any other stored hash is verified with
    ``self.password_hasher`` and re-hashed when the hasher asks for it.

    Args:
        username: Callsign to look up.
        password: Plain-text password to verify.

    Returns:
        The user's rating when the password matches; None when the user does
        not exist, the password is wrong or the stored hash is invalid.

    Raises:
        RuntimeError: When more than one row matches the username.
    """

    # Helper: store a freshly computed hash for this user.
    async def update_hashed(new_hashed: str) -> None:
        async with self.db_engine.begin() as conn:
            await conn.execute(
                update(users_table)
                .where(users_table.c.callsign == username)
                .values(password=new_hashed)
            )

    # Fetch current hashed password and rating
    async with self.db_engine.begin() as conn:
        infos = tuple(
            await conn.execute(
                select(users_table.c.password, users_table.c.rating).where(
                    users_table.c.callsign == username
                )
            )
        )
    if len(infos) == 0:  # User not found
        return None
    if len(infos) != 1:  # User duplicated
        raise RuntimeError(f"Duplicated callsign in users database: {username}")
    hashed, rating = cast("tuple[str, int]", infos[0])

    # =============== Check if hash is sha256
    if len(hashed) == SHA256_HEX_LENGTH:
        # NOTE(review): plain == is not a constant-time comparison; consider
        # hmac.compare_digest if timing attacks are a concern.
        if sha256(password.encode()).hexdigest() == hashed:  # correct
            # Now we have the plain password, save it as argon2
            new_hashed = self.password_hasher.hash(password)
            await update_hashed(new_hashed)
            return rating
        return None  # incorrect
    # =============== Check argon2
    try:
        self.password_hasher.verify(hashed, password)
    except exceptions.VerifyMismatchError:  # Incorrect
        return None
    except exceptions.InvalidHashError:
        # NOTE(review): this writes the stored hash into the log -- confirm
        # that is acceptable.
        await logger.aerror(f"Invalid hash found in users table: {hashed}")
        return None
    # Check if need rehash
    if self.password_hasher.check_needs_rehash(hashed):
        await update_hashed(self.password_hasher.hash(password))
    return rating
get_heartbeat_task
get_heartbeat_task() -> Task[NoReturn]

Get heartbeat task.

Source code in src/pyfsd/factory/client.py
87
88
89
90
91
92
93
94
95
96
97
98
def get_heartbeat_task(self) -> "Task[NoReturn]":
    """Return the heartbeat task, creating it on first use.

    The task loops forever: it sleeps 70 seconds, then calls
    ``self.heartbeat()``. Later calls return the same cached task.
    """
    if self.heartbeat_task is None:

        async def _beat_forever() -> NoReturn:
            while True:
                await asleep(70)
                self.heartbeat()

        self.heartbeat_task = create_task(_beat_forever())
    return self.heartbeat_task
heartbeat
heartbeat() -> None

Send heartbeat to clients.

Source code in src/pyfsd/factory/client.py
100
101
102
103
104
105
106
107
108
109
110
def heartbeat(self) -> None:
    """Send heartbeat to clients.

    Builds one WIND_DELTA packet addressed from "SERVER" to "*" that carries
    two small pseudo-random integers, and hands it to ``self.broadcast``.
    """
    # Draw from the full signed 32-bit range. The lower bound was previously
    # written as -214743648, which is missing a digit.
    random_int: int = randint(-2147483648, 2147483647)  # noqa: S311
    self.broadcast(
        make_packet(
            FSDClientCommand.WIND_DELTA + "SERVER",
            "*",
            # Python's % is non-negative for a positive modulus, so these
            # fall in -5..5 and -10..10 respectively.
            f"{random_int % 11 - 5}",
            f"{random_int % 21 - 10}",
        ).encode("ascii"),
    )
remove_all_clients
remove_all_clients() -> None

Remove all clients.

Source code in src/pyfsd/factory/client.py
215
216
217
218
def remove_all_clients(self) -> None:
    """Close the transport of every registered client.

    Works on a snapshot of the clients, so the registry may be modified
    while transports are being closed.
    """
    snapshot = self.clients.copy()
    for client in snapshot.values():
        client.transport.close()
send_to
send_to(callsign: bytes, *lines: bytes, auto_newline: bool = True) -> bool

Send lines to a specified client.

Parameters:

Name Type Description Default
callsign bytes

The client's callsign.

required
lines bytes

Lines to be broadcasted.

()
auto_newline bool

Auto put newline marker between lines or not.

True

Returns:

Type Description
bool

Is there a client called {callsign} (and is message sent or not).

Source code in src/pyfsd/factory/client.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def send_to(
    self, callsign: bytes, *lines: bytes, auto_newline: bool = True
) -> bool:
    """Send lines to a specified client.

    Args:
        callsign: The client's callsign.
        lines: Lines to be sent.
        auto_newline: Auto put newline marker between lines or not.

    Returns:
        Is there a client called {callsign} (and is message sent or not).
    """
    data = join_lines(*lines, newline=auto_newline)
    # Only the lookup is guarded: a KeyError raised while writing must not be
    # mistaken for "no such client".
    try:
        client = self.clients[callsign]
    except KeyError:
        return False
    client.transport.write(data)
    return True

main

Run PyFSD.

Attributes:

Name Type Description
DEFAULT_CONFIG str

Default config of PyFSD.

PyFSDConfig

Bases: TypedDict

PyFSD config.

PyFSDDatabaseConfig

Bases: TypedDict

PyFSD database config.

Attributes:

Name Type Description
url str

The database URL; see the SQLAlchemy docs.

RootPyFSDConfig

Bases: TypedDict

PyFSD root config.

launch async

launch(config: RootPyFSDConfig, *, wait_all_tasks_done: bool = True) -> None

Launch PyFSD.

Source code in src/pyfsd/main.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
async def launch(config: RootPyFSDConfig, *, wait_all_tasks_done: bool = True) -> None:
    """Launch PyFSD.

    Builds the dependency container from ``config``, loads plugins, creates
    the database tables, starts the client server and then awaits the
    long-running tasks. When this coroutine is cancelled it runs the shutdown
    sequence and re-raises the ``CancelledError``.

    Args:
        config: Root PyFSD config.
        wait_all_tasks_done: When true, shutdown waits for every task returned
            by ``all_tasks()`` except the current one; otherwise it waits only
            for the tasks held by ``mustdone_task_keeper``.

    Raises:
        CancelledError: Re-raised once the shutdown sequence has finished.
    """
    # =============== Initialize dependencies
    container = Container()
    container.config.from_dict(config)
    register_loader_containers(container)  # Register
    # Then load plugins to wire them
    pm = container.plugin_manager()
    await pm.pick_plugins(config.get("plugin", {}))
    container.metar_manager().check_fetchers()
    # Initialize database
    async with container.db_engine().begin() as conn:
        await conn.run_sync(metadata.create_all)
    # =============== Startup
    loop = get_event_loop()
    client_server = await loop.create_server(
        container.client_factory(), port=config["pyfsd"]["client"]["port"]
    )
    await container.plugin_manager().trigger_event_auditers("before_start", (), {})
    await logger.ainfo(f"PyFSD {version}")

    plugins_count = pm.plugins_count()
    # The ": " separator is only emitted when at least one plugin is loaded
    await logger.ainfo(
        f"{pm.plugins_count()} plugins{': ' if plugins_count else ''}{pm!s}"
    )
    # Tasks awaited together below; all of them are cancelled during shutdown
    tasks_pyfsd = (
        container.metar_manager().get_cron_task(),
        container.client_factory().get_heartbeat_task(),
        create_task(client_server.serve_forever()),
    )
    try:
        async with client_server:
            await gather(
                *tasks_pyfsd,
            )
    except CancelledError:
        # =========== Stop
        container.client_factory().remove_all_clients()
        await container.plugin_manager().trigger_event_auditers("before_stop", (), {})
        await logger.ainfo("Stopping")
        client_server.close()
        await client_server.wait_closed()
        for task in tasks_pyfsd:
            task.cancel()
        for task in task_keeper.tasks:
            task.cancel()

        # Pick the set of tasks shutdown has to wait for
        if wait_all_tasks_done:
            tasks = all_tasks()
            # Never wait on ourselves
            tasks.discard(cast("Task", current_task()))
        else:
            tasks = mustdone_task_keeper.tasks
        if tasks:
            total_wait_seconds = 0
            # Wait in 5 second slices, logging after each slice that still
            # leaves something pending
            while True:
                total_wait_seconds += 5
                _, pending = await wait(tasks, timeout=5)
                if not pending:
                    break
                # Note: the log lists every awaited task, not only the pending ones
                await logger.adebug(
                    f"Waited {total_wait_seconds} second, "
                    "but these tasks are still running",
                    stack="\n".join(f"  {task!s}" for task in tasks),
                )
        await container.db_engine().dispose()
        raise

main

main() -> None

Main function of PyFSD.

Source code in src/pyfsd/main.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
def main() -> None:
    """Main function of PyFSD.

    Reads the config file named on the command line (writing and using
    ``DEFAULT_CONFIG`` when the file does not exist), validates it, rewrites
    the database URL to an async driver when the user gave none, sets up
    logging and runs `launch` on the event loop until it finishes or one of
    SIGINT/SIGTERM/SIGHUP cancels it.

    Raises:
        ValueError: If the database url does not contain ``://``.
    """
    # =============== Config
    arg_parser = ArgumentParser()
    arg_parser.add_argument(
        "-c",
        "--config-path",
        help="Path to the config file.",
        default="pyfsd.toml",
        type=str,
    )
    config_path = arg_parser.parse_args().config_path
    try:
        with open(config_path) as existing_file:
            config = loads(existing_file.read())
    except FileNotFoundError:
        # No config yet: write the default one to disk and use it
        with open(config_path, "w") as new_file:
            new_file.write(DEFAULT_CONFIG)
        config = loads(DEFAULT_CONFIG)

    assert_dict(config, RootPyFSDConfig, name="config")

    # Replace database scheme with async dialect
    db_url: str = config["pyfsd"]["database"]["url"]
    if "://" not in db_url:
        raise ValueError("Invalid database url")
    scheme, remainder = db_url.split("://", 1)
    if "+" not in scheme:  # the user did not specify a driver
        async_schemes = {
            "postgresql": "postgresql+asyncpg",
            "mysql": "mysql+asyncmy",
            "mariadb": "mysql+asyncmy",
            "sqlite": "sqlite+aiosqlite",
            "oracle": "oracle+oracledb_async",
            "mssql": "mssql+aioodbc",
        }
        if scheme in async_schemes:
            db_url = f"{async_schemes[scheme]}://{remainder}"
        # Any other scheme is written back untouched
        config["pyfsd"]["database"]["url"] = db_url

    # =============== Logger
    suppress_metar_parser_warning()
    setup_logger(config["pyfsd"]["logger"])

    # =============== Startup
    # uvloop is optional; fall back to the default policy when it is missing
    with suppress(ImportError):
        from uvloop import EventLoopPolicy

        set_event_loop_policy(EventLoopPolicy())

    loop = get_event_loop()

    async def runner() -> None:
        try:
            await launch(cast("RootPyFSDConfig", config))
        except CancelledError:
            pass
        except BaseException:
            logger.exception("Error happened when launching PyFSD")

        await loop.shutdown_asyncgens()
        await loop.shutdown_default_executor()
        loop.stop()

    launch_task = loop.create_task(runner())

    # Any of these signals cancels the runner, which triggers launch's shutdown
    for signum in (SIGINT, SIGTERM, SIGHUP):
        loop.add_signal_handler(signum, launch_task.cancel)
    loop.run_forever()  # returns after loop.stop()

    # =============== Stop
    loop.close()
    # Ensure we have working loggers when cpython is shutting down
    setup_logger(config["pyfsd"]["logger"], finalize=True)

metar

PyFSD -- Metar fetch & parse module.

fetch

Metar fetcher defines.

Attributes:

Name Type Description
MetarInfoDict

Type of a dict that describes all airports' metar.

CronFetcher

Type of cron mode metar fetcher.

OnceFetcher

Type of once mode metar fetcher.

noaa_fetch_all async

noaa_fetch_all(_: object) -> Optional[MetarInfoDict]

Fetch all airports' metar from NOAA.

Source code in src/pyfsd/metar/fetch.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
async def noaa_fetch_all(_: object) -> Optional[MetarInfoDict]:
    """Fetch all airports' metar from NOAA.

    Downloads the cycle file for the current UTC hour and parses it in the
    default executor.

    Args:
        _: Unused.

    Returns:
        A dict mapping station name to its profile, or None when the response
        status is not ``HTTP_OK``.
    """
    cycle_hour = datetime.now(timezone.utc).hour
    cycle_url = (
        "https://tgftp.nws.noaa.gov/data/observations/metar/cycles/"
        f"{cycle_hour:02d}Z.TXT"
    )

    async with ClientSession() as session, session.get(cycle_url) as resp:
        if resp.status != HTTP_OK:
            return None
        body = await resp.text(errors="ignore")
        raw_blocks = body.split("\n\n")
        collected: MetarInfoDict = {}

        def parse_blocks() -> None:
            # Runs in the executor; fills `collected` in place
            for raw_block in raw_blocks:
                block_lines = raw_block.splitlines()
                if len(block_lines) >= NOAA_METAR_BLOCK_LINES:
                    # The report itself is the second line of a block
                    profile = WeatherProfile(block_lines[1])
                    if profile.name is not None:
                        collected[profile.name] = profile

        await get_event_loop().run_in_executor(None, parse_blocks)
        return collected

noaa_fetch_once async

noaa_fetch_once(_: object, icao: str) -> Optional[WeatherProfile]

Fetch single airport's metar from NOAA.

Source code in src/pyfsd/metar/fetch.py
41
42
43
44
45
46
47
48
49
50
51
async def noaa_fetch_once(_: object, icao: str) -> Optional[WeatherProfile]:
    """Fetch single airport's metar from NOAA.

    Args:
        _: Unused.
        icao: Station identifier, inserted into the request URL as given.

    Returns:
        The profile parsed from the second line of the response body, or None
        when the response status is not ``HTTP_OK`` or the body has fewer
        than two lines.
    """
    async with (
        ClientSession() as session,
        session.get(
            f"https://tgftp.nws.noaa.gov/data/observations/metar/stations/{icao}.TXT"
        ) as resp,
    ):
        # Only a successful response carries a report (same check as
        # noaa_fetch_all); anything else means "nothing fetched".
        if resp.status != HTTP_OK:
            return None
        body_lines = (await resp.text(errors="ignore")).splitlines()
        # The report is read from the second line; bail out on short bodies
        # instead of raising IndexError.
        if len(body_lines) < 2:
            return None
        return WeatherProfile(body_lines[1])

manager

PyFSD metar manager.

MetarFetchers

Bases: TypedDict

A dict that stores METAR fetchers.

MetarManager

MetarManager(config: Union[dict, PyFSDMetarConfig])

The PyFSD metar manager.

Attributes:

Name Type Description
fetchers MetarFetchers

All registered fetchers.

used_fetchers MetarFetchers

Fetchers that we're going to use.

metar_cache MetarInfoDict

Metars fetched in cron mode.

config Union[dict, PyFSDMetarConfig]

pyfsd.metar section of config.

cron_time Optional[float]

Interval between two consecutive cron fetches. None if not in cron mode.

cron_task Optional[Task[NoReturn]]

Task to perform cron metar cache.

Parameters:

Name Type Description Default
config Union[dict, PyFSDMetarConfig]

pyfsd.metar section of config.

required
Source code in src/pyfsd/metar/manager.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def __init__(self, config: Union[dict, PyFSDMetarConfig]) -> None:
    """Create a MetarManager instance.

    Registers the NOAA fetchers for both modes and starts with no fetcher
    selected, an empty cache and no cron task.

    Args:
        config: pyfsd.metar section of config.
    """
    # Every known fetcher, keyed by mode and then by name
    self.fetchers = {
        "cron": {"noaa": noaa_fetch_all},
        "once": {"noaa": noaa_fetch_once},
    }
    # Filled in later by check_fetchers
    self.used_fetchers = {"cron": {}, "once": {}}
    if config["mode"] == "cron":
        self.cron_time = config.get("cron_time")
    else:
        self.cron_time = None
    self.cron_task = None
    self.config = config
    self.metar_cache = {}
cache_metar async
cache_metar() -> None

Perform a cron fetch.

Raises:

Type Description
RuntimeError

if not in cron mode.

Source code in src/pyfsd/metar/manager.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
async def cache_metar(self) -> None:
    """Perform a cron fetch.

    Tries the selected cron fetchers in order. The first one that returns a
    result replaces ``self.metar_cache`` and ends the fetch; a fetcher that
    fails or returns None is skipped. An error is logged when no fetcher
    produced a result.
    """
    await logger.ainfo("Fetching METAR")

    for name, fetcher in self.used_fetchers["cron"].items():
        try:
            metars = await fetcher(self.config)
        # ruff: noqa: PERF203
        except (VerifyKeyError, VerifyTypeError) as err:
            await logger.aerror(
                f"Metar fetcher {name} doesn't work because {err!s}"
            )
            continue
        except (KeyboardInterrupt, CancelledError):
            # Never swallow interruption or cancellation
            raise
        # ruff: noqa: BLE001
        except BaseException:
            await logger.aexception("Exception raised when caching metar")
            continue
        if metars is None:
            # This fetcher had nothing; try the next one
            continue
        await logger.ainfo(f"Fetched {len(metars)} metars.")
        self.metar_cache = metars
        return
    await logger.aerror("No metar was fetched. All metar fetcher failed.")
check_fetchers
check_fetchers() -> None

Check whether every METAR fetcher specified in the config is registered.

Source code in src/pyfsd/metar/manager.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def check_fetchers(self) -> None:
    """Select the fetchers named in the config from the registered ones.

    Once fetchers are selected in once mode, or in cron mode when
    ``fallback_once`` is set; cron fetchers are selected only in cron mode.
    A configured name that matches no usable fetcher is logged as an error.
    The result replaces ``self.used_fetchers``.
    """
    once_only = self.cron_time is None
    # Once fetchers are wanted in once mode, or as a fallback in cron mode
    wants_once = once_only or self.config.get("fallback_once", False)
    selected: MetarFetchers = {"cron": {}, "once": {}}
    for wanted in self.config["fetchers"]:
        matched = False
        if wants_once and wanted in self.fetchers["once"]:
            selected["once"][wanted] = self.fetchers["once"][wanted]
            matched = True
        if not once_only and wanted in self.fetchers["cron"]:
            selected["cron"][wanted] = self.fetchers["cron"][wanted]
            matched = True
        if not matched:
            logger.error("No such METAR fetcher: %s", wanted)
    self.used_fetchers = selected
fetch async
fetch(icao: str, *, ignore_case: bool = True) -> WeatherProfile | None

Try to fetch metar.

If in cron mode, we'll try to get metar from cron cache (generated by a cron fetcher). If specified airport not found in cache and config['fallback_once'] is True, we'll try to fetch from once fetchers.

Parameters:

Name Type Description Default
icao str

ICAO of the airport.

required
ignore_case bool

Ignore ICAO case.

True
Source code in src/pyfsd/metar/manager.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
async def fetch(
    self, icao: str, *, ignore_case: bool = True
) -> "WeatherProfile | None":
    """Try to fetch metar.

    In cron mode the cron cache is consulted first. If the airport is not in
    the cache, the once fetchers are tried only when
    `config['fallback_once']` is truthy; otherwise None is returned.
    Outside cron mode the once fetchers are always used.

    Args:
        icao: ICAO of the airport.
        ignore_case: Uppercase `icao` before looking it up.
    """
    lookup = icao.upper() if ignore_case else icao
    allow_fallback = self.config.get("fallback_once", None)
    in_cron_mode = self.cron_time is not None

    if in_cron_mode and lookup in self.metar_cache:
        return self.metar_cache[lookup]
    if in_cron_mode and not allow_fallback:
        return None
    # Case handling already happened above, so do not repeat it
    return await self.fetch_once(lookup, ignore_case=False)
fetch_once async
fetch_once(icao: str, *, ignore_case: bool = True) -> WeatherProfile | None

Try to fetch metar from once fetchers.

Parameters:

Name Type Description Default
icao str

ICAO of the airport.

required
ignore_case bool

Ignore ICAO case.

True

Returns:

Type Description
WeatherProfile | None

The parsed Metar or None if nothing fetched.

Source code in src/pyfsd/metar/manager.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
async def fetch_once(
    self,
    icao: str,
    *,
    ignore_case: bool = True,
) -> "WeatherProfile | None":
    """Try to fetch metar from once fetchers.

    The selected once fetchers are tried in order; the first result that is
    not None is returned. A fetcher that raises is logged and skipped, except
    that ``KeyboardInterrupt`` and ``CancelledError`` are re-raised.

    Args:
        icao: ICAO of the airport.
        ignore_case: Uppercase `icao` before passing it to the fetchers.

    Returns:
        The parsed Metar or None if nothing fetched.
    """
    station = icao.upper() if ignore_case else icao

    for name, fetcher in self.used_fetchers["once"].items():
        try:
            result = await fetcher(self.config, station)
        except (KeyboardInterrupt, CancelledError):
            raise
        except (VerifyKeyError, VerifyTypeError) as err:
            await logger.aerror(
                f"Metar fetcher {name} doesn't work because {err!s}"
            )
        except BaseException:
            await logger.aexception("Exception raised when fetching metar")
        else:
            if result is not None:
                return result
    return None
get_cron_task
get_cron_task() -> Task[NoReturn]

Get cron fetching task.

Raises:

Type Description
RuntimeError

if not in cron mode.

Source code in src/pyfsd/metar/manager.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
def get_cron_task(self) -> "Task[NoReturn]":
    """Get cron fetching task.

    The task is created on the first call and stored in ``self.cron_task``;
    later calls return the stored task. The task caches METAR and then sleeps
    for ``self.cron_time`` seconds, forever.

    Raises:
        RuntimeError: if not in cron mode.
    """
    if self.cron_time is None:
        raise RuntimeError("Not in cron mode")

    if self.cron_task is None:

        async def cron_loop() -> NoReturn:
            # cron_time is re-read on every pass, so guard once more here
            if self.cron_time is None:
                raise RuntimeError("***BUG: cron_time is None")
            while True:
                await self.cache_metar()
                await asleep(self.cron_time)

        self.cron_task = create_task(cron_loop(), name="cron_metar_fetcher")

    return self.cron_task
register_cron_fetcher
register_cron_fetcher(name: str, fetcher: CF) -> CF

Register a cron mode fetcher.

Source code in src/pyfsd/metar/manager.py
106
107
108
109
110
111
def register_cron_fetcher(self, name: str, fetcher: CF) -> CF:
    """Register a cron mode fetcher.

    Args:
        name: Name to register the fetcher under.
        fetcher: The cron mode fetcher.

    Returns:
        `fetcher`, unchanged.

    Raises:
        RuntimeError: If a cron fetcher called `name` is already registered.
    """
    cron_registry = self.fetchers["cron"]
    if name in cron_registry:
        raise RuntimeError(f"cron fetcher '{name}' already exists")
    cron_registry[name] = fetcher
    return fetcher
register_once_fetcher
register_once_fetcher(name: str, fetcher: OF) -> OF

Register a once mode fetcher.

Source code in src/pyfsd/metar/manager.py
113
114
115
116
117
118
def register_once_fetcher(self, name: str, fetcher: OF) -> OF:
    """Register a once mode fetcher.

    Args:
        name: Name to register the fetcher under.
        fetcher: The once mode fetcher.

    Returns:
        `fetcher`, unchanged.

    Raises:
        RuntimeError: If a once fetcher called `name` is already registered.
    """
    once_registry = self.fetchers["once"]
    if name in once_registry:
        raise RuntimeError(f"once fetcher '{name}' already exists")
    once_registry[name] = fetcher
    return fetcher

PyFSDMetarConfig

Bases: TypedDict

PyFSD metar config.

Attributes:

Name Type Description
mode Literal['cron', 'once']

Mode to fetch metar. once means fetch at once when client request metar, cron means cache all airports' metar every specified interval.

fallback_once NotRequired[bool]

If specified airport not found in cron metar, fetch by once or not. Will be ignored if not in cron mode.

fetchers list

Enabled metar fetchers.

cron_time NotRequired[Union[float, int]]

The cron mode's specified interval. (see mode)

suppress_metar_parser_warning

suppress_metar_parser_warning() -> None

Suppress metar parser's warnings.

Source code in src/pyfsd/metar/manager.py
59
60
61
def suppress_metar_parser_warning() -> None:
    """Install a filter that ignores RuntimeWarning from ``metar.Metar``."""
    filterwarnings(
        action="ignore",
        category=RuntimeWarning,
        module="metar.Metar",
    )

profile

fsd/wprofile implemented in Python.

CloudLayer dataclass

CloudLayer(ceiling: int, floor: int, coverage: int = 0, icing: int = 0, turbulence: int = 0)

This dataclass describes a cloud layer.

TempLayer dataclass

TempLayer(ceiling: int, temp: int = 0)

This dataclass describes a temperature layer.

Attributes:

Name Type Description
temp int

The temperature.

WeatherProfile dataclass

WeatherProfile(metar: str, creation: int = lambda: int(time())(), name: Optional[str] = None, season: int = 0, active: bool = False, dew_point: int = 0, visibility: float = 15.0, barometer: int = 2950, winds: tuple[WindLayer, WindLayer, WindLayer, WindLayer] = lambda: (WindLayer(-1, -1), WindLayer(10400, 2500), WindLayer(22600, 10400), WindLayer(90000, 20700))(), temps: tuple[TempLayer, TempLayer, TempLayer, TempLayer] = lambda: (TempLayer(100), TempLayer(10000), TempLayer(18000), TempLayer(35000))(), clouds: tuple[CloudLayer, CloudLayer] = lambda: (CloudLayer(-1, -1), CloudLayer(-1, -1))(), tstorm: CloudLayer = lambda: CloudLayer(-1, -1)(), skip_parse: InitVar[bool] = False)

Profile of weather.

Attributes:

Name Type Description
metar str

Original METAR.

creation int

Create time of the profile.

name Optional[str]

Metar station.

season int

Season of the metar's time.

active bool

Whether the profile is active.

dew_point int

Dew point.

visibility float

Visibility, in MI (maybe)

barometer int

Barometer.

__post_init__
__post_init__(skip_parse: bool) -> None

Call feed_metar conditionally.

Source code in src/pyfsd/metar/profile.py
169
170
171
172
def __post_init__(self, skip_parse: bool) -> None:
    """Parse the METAR via feed_metar unless `skip_parse` is set."""
    if skip_parse:
        return
    self.feed_metar()
clone
clone() -> WeatherProfile

Clone myself.

Source code in src/pyfsd/metar/profile.py
174
175
176
def clone(self) -> "WeatherProfile":
    """Return a deep copy of this profile."""
    duplicate = deepcopy(self)
    return duplicate
feed_metar
feed_metar() -> None

Parse metar.

Note

I don't know what ceiling or floor stand for; this code is heavily based on FSD.

Source code in src/pyfsd/metar/profile.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
def feed_metar(self) -> None:
    """Parse ``self.metar`` and fill this profile from it.

    Updates the station name, the lowest wind layer, visibility, the two
    cloud layers, the lowest temperature layer, the dew point and the
    barometer. Values missing from the report keep their current value,
    except the barometer, which falls back to 2992.

    Note:
        I don't know what ceiling or floor stand for;
        this code is heavily based on FSD.
    """
    metar = Metar(self.metar, strict=False)
    if metar.station_id is not None:
        self.name = metar.station_id

    # Wind: only the first wind layer comes from the report, and only when
    # both speed and direction are present
    if metar.wind_speed is not None and metar.wind_dir is not None:
        if metar.wind_gust is not None:
            self.winds[0].gusting = 1
        self.winds[0].speed = int(metar.wind_speed.value())
        self.winds[0].ceiling = 2500
        self.winds[0].floor = 0
        self.winds[0].direction = int(metar.wind_dir.value())
    # Visibility
    if metar.vis is not None:
        vis = metar.vis.value("M")
        if vis == 10000:
            self.visibility = 15
            # A value of 10000 that was not written as "9999" in the report
            # also sets up the second cloud layer
            if "9999" not in metar.code:
                self.clouds[1].ceiling = 26000
                self.clouds[1].floor = 24000
                self.clouds[1].icing = 0
                self.clouds[1].turbulence = 0
                self.clouds[1].coverage = 1
        elif "M1/4SM" in metar.code:
            self.visibility = 0.15
        else:
            self.visibility = metar.vis.value("MI")
    # Runway visual range: nothing
    # Weather parsing: nothing
    # Sky: map the coverage code of each sky group to a number
    sky_coverage = {
        "SKC": 0,
        "CLR": 0,
        "VV": 8,
        "FEW": 1,
        "SCT": 3,
        "BKN": 5,
        "OVC": 8,
    }
    # Only the first two sky groups are used, one per cloud layer
    for i, sky in enumerate(metar.sky[:2]):
        sky_status, distance, _ = sky
        # An unknown coverage code leaves the layer's coverage unchanged
        with contextlib.suppress(KeyError):
            self.clouds[i].coverage = sky_coverage[sky_status]
        if distance is not None:
            self.clouds[i].floor = int(distance.value())
    if len(metar.sky) >= 2:
        # The lower layer's ceiling is placed halfway up to the other layer's
        # floor; the higher layer gets a ceiling 3000 above its own floor
        if self.clouds[1].floor > self.clouds[0].floor:
            self.clouds[0].ceiling = (
                self.clouds[0].floor
                + (self.clouds[1].floor - self.clouds[0].floor) // 2
            )
            self.clouds[1].ceiling = self.clouds[1].floor + 3000
        else:
            self.clouds[1].ceiling = (
                self.clouds[1].floor
                + (self.clouds[0].floor - self.clouds[1].floor) // 2
            )
            self.clouds[0].ceiling = self.clouds[0].floor + 3000
        # Turbulence is derived from the thickness of each layer
        self.clouds[0].turbulence = (
            self.clouds[0].ceiling - self.clouds[0].floor
        ) // 175
        self.clouds[1].turbulence = (
            self.clouds[1].ceiling - self.clouds[1].floor
        ) // 175
    elif len(metar.sky) == 1:
        self.clouds[0].ceiling = self.clouds[0].floor + 3000
        self.clouds[0].turbulence = 17
    # Temp: requires both temperature and dew point in the report
    if metar.temp is not None and metar.dewpt is not None:
        temp: int = int(metar.temp.value())
        self.temps[0].temp = temp
        self.dew_point = int(metar.dewpt.value())
        # Between -10 and 10 (exclusive), flag icing on cloud layers whose
        # ceiling is below 12000
        if -10 < temp < 10:
            if self.clouds[0].ceiling < 12000:
                self.clouds[0].icing = 1
            if self.clouds[1].ceiling < 12000:
                self.clouds[1].icing = 1
    # Barometer: stored as the "IN" pressure value times 100, rounded
    if metar.press is not None:
        self.barometer = round(metar.press.value("IN") * 100)
    else:
        self.barometer = 2992
fix
fix(position: Position) -> None

Fix this profile at a point.

Source code in src/pyfsd/metar/profile.py
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
def fix(self, position: "Position") -> None:
    """Fix this profile at a point.

    Derives direction and speed for the three upper wind layers
    (``winds[1]`` to ``winds[3]``) and the temperature of the three upper
    temperature layers (``temps[1]`` to ``temps[3]``) from the position and
    the current UTC time.

    Args:
        position: The point to fix at. ``position[0]`` supplies the latitude
            term (a negative value also swaps the season), ``position[1]``
            the longitude term.
    """
    a1 = position[0]
    a2 = fabs(position[1] / 18)
    now = datetime.now(timezone.utc)
    season = get_season(now.month, swap=a1 < 0)

    # The sign choice must be parenthesised. Written as
    # `6 if a1 > 0 else -6 * a1 + ...` the whole sum becomes the else-branch,
    # so every direction collapsed to the constant 6 whenever a1 > 0.
    lat_term = (6 if a1 > 0 else -6) * a1

    lat_var = get_variation(now, VAR_UPDIRECTION, -25, 25)
    self.winds[3].direction = round(lat_term + lat_var + a2)
    # Normalise into [0, 360)
    self.winds[3].direction = (self.winds[3].direction + 360) % 360

    # Top-layer speed scales with |sin(latitude)| up to a per-season maximum
    max_velocity = {0: 120, 1: 80, 2: 50}.get(season, 0)
    self.winds[3].speed = round(fabs(sin(a1 * pi / 180.0)) * max_velocity)
    # ------
    lat_var = get_variation(now, VAR_MIDDIRECTION, 10, 45)
    coriolis_var = get_variation(now, VAR_MIDCOR, 10, 30)
    self.winds[2].direction = round(lat_term + lat_var + a2 - coriolis_var)
    self.winds[2].direction = (self.winds[2].direction + 360) % 360

    # Middle-layer speed is 50%-80% of the top-layer speed
    self.winds[2].speed = int(
        self.winds[3].speed * (get_variation(now, VAR_MIDSPEED, 500, 800) / 1000.0),
    )
    # ------
    coriolis_var_low = coriolis_var + get_variation(now, VAR_LOWCOR, 10, 30)
    lat_var = get_variation(now, VAR_LOWDIRECTION, 10, 45)
    self.winds[1].direction = round(lat_term + lat_var + a2 - coriolis_var_low)
    self.winds[1].direction = (self.winds[1].direction + 360) % 360

    self.winds[1].speed = (self.winds[0].speed + self.winds[1].speed) // 2
    # ------
    self.temps[3].temp = -57 + get_variation(now, VAR_UPTEMP, -4, 4)
    self.temps[2].temp = -21 + get_variation(now, VAR_MIDTEMP, -7, 7)
    self.temps[1].temp = -5 + get_variation(now, VAR_LOWTEMP, -12, 12)

WindLayer dataclass

WindLayer(ceiling: int, floor: int, direction: int = 0, speed: int = 0, gusting: int = 0, turbulence: int = 0)

This dataclass describes a wind layer.

Attributes:

Name Type Description
gusting int

The wind is gusting or not.

speed int

Windspeed.

direction int

Direction of the wind.

get_now_variation cached

get_now_variation(seed: int) -> tuple[int, int, int, int, int, int, int, int, int, int]

Get current variation.

Source code in src/pyfsd/metar/profile.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@lru_cache(maxsize=1)
def get_now_variation(
    seed: int,
) -> tuple[int, int, int, int, int, int, int, int, int, int]:
    """Get current variation.

    Seeds ``mrand`` with `seed` and draws ten values from it. Only the result
    for the most recent seed is cached.
    """
    mrand.srand(seed)
    variations = (
        mrand(),
        mrand(),
        mrand(),
        mrand(),
        mrand(),
        mrand(),
        mrand(),
        mrand(),
        mrand(),
        mrand(),
    )
    return variations

get_season

get_season(month: int, *, swap: bool) -> int

Get season by month.

Parameters:

Name Type Description Default
month int

The month.

required
swap bool

Whether to swap the values returned for December–February and June–August.

required

Returns:

Name Type Description
season int

The season. Note it starts from 0.

Source code in src/pyfsd/metar/profile.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def get_season(month: int, *, swap: bool) -> int:
    """Get season by month.

    Args:
        month: The month, 1 to 12.
        swap: Exchange the values returned for December-February and
            June-August.

    Returns:
        season: The season. Note it starts from 0.

    Raises:
        ValueError: If `month` is not a month number from 1 to 12.
    """
    # March-May and September-November share a value that swap never changes
    if month in (3, 4, 5, 9, 10, 11):
        return 1
    if month in (12, 1, 2):
        return 2 if swap else 0
    if month in (6, 7, 8):
        return 0 if swap else 2
    raise ValueError(f"Invalid month {month}")

get_variation

get_variation(now: datetime, num: int, min_: int, max_: int) -> int

Get variation.

Source code in src/pyfsd/metar/profile.py
54
55
56
57
58
59
def get_variation(now: datetime, num: int, min_: int, max_: int) -> int:
    """Get variation.

    Args:
        now: Time the seed is derived from (hour, year and month are used).
        num: Index into the tuple returned by ``get_now_variation``.
        min_: Lower bound of the result, inclusive.
        max_: Upper bound of the result, inclusive.

    Returns:
        The selected variation folded into the range ``min_`` to ``max_``.
    """
    seed = now.hour * (now.year - 1900) * now.month
    raw_value = get_now_variation(seed)[num]
    span = max_ - min_ + 1
    return abs(raw_value) % span + min_

object

PyFSD objects.

client

Client object's dataclasses.

Client dataclass

Client(is_controller: bool, callsign: bytes, rating: int, cid: str, protocol: int, realname: bytes, sim_type: int, transport: Transport, position: Position = (0, 0), transponder: int = 0, altitude: int = 0, ground_speed: int = 0, frequency: int = 0, facility_type: int = 0, visual_range: int = 40, flags: int = 0, pbh: int = 0, flight_plan: Optional[FlightPlan] = None, sector: Optional[bytes] = None, ident_flag: Optional[bytes] = None, start_time: int = lambda: int(time())(), last_updated: int = lambda: int(time())())

This dataclass stores a client.

frequency_ok property
frequency_ok: bool

The frequency is valid or not.

position_ok property
position_ok: bool

The position is valid or not.

get_range
get_range() -> int

Get visual range.

Source code in src/pyfsd/object/client.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
def get_range(self) -> int:
    """Get visual range.

    For a controller the range is looked up from the facility type, with 40
    for an unknown type. For any other client it grows with the square root
    of the altitude, where a missing or negative altitude counts as 0.
    """
    if self.is_controller:
        range_by_facility = {
            1: 1500,  # FSS
            2: 5,  # CLR_DEL
            3: 5,  # GROUND
            4: 30,  # TOWER
            5: 100,  # APP/DEP
            6: 400,  # CENTER
            7: 1500,  # MONITOR
        }
        return range_by_facility.get(self.facility_type, 40)

    altitude = self.altitude
    if altitude is None or altitude < 0:
        altitude = 0
    return int(10 + 1.414 * sqrt(altitude))
update_ATC_position
update_ATC_position(frequency: int, facility_type: int, visual_range: int, lat: float, lon: float, altitude: int) -> None

Update ATC position.

Source code in src/pyfsd/object/client.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def update_ATC_position(  # noqa: N802
    self,
    frequency: int,
    facility_type: int,
    visual_range: int,
    lat: float,
    lon: float,
    altitude: int,
) -> None:
    """Store a new ATC position report on this client.

    `lat` and `lon` are stored together as ``self.position``; the other
    arguments are stored under their own names. ``last_updated`` is set to
    the current time in whole seconds.
    """
    self.frequency, self.facility_type, self.visual_range = (
        frequency,
        facility_type,
        visual_range,
    )
    self.position = (lat, lon)
    self.altitude = altitude
    self.last_updated = int(time())
update_pilot_position
update_pilot_position(mode: bytes, transponder: int, lat: float, lon: float, altitdue: int, groundspeed: int, pbh: int, flags: int) -> None

Update pilot position.

Source code in src/pyfsd/object/client.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def update_pilot_position(
    self,
    mode: bytes,
    transponder: int,
    lat: float,
    lon: float,
    altitdue: int,
    groundspeed: int,
    pbh: int,
    flags: int,
) -> None:
    """Store a new pilot position report on this client.

    `mode` is stored as ``ident_flag``, `lat` and `lon` together as
    ``position``, `altitdue` as ``altitude`` and `groundspeed` as
    ``ground_speed``. ``last_updated`` is set to the current time in whole
    seconds.
    """
    self.ident_flag, self.transponder = mode, transponder
    self.position = (lat, lon)
    self.altitude, self.ground_speed = altitdue, groundspeed
    self.pbh, self.flags = pbh, flags
    self.last_updated = int(time())
update_plan
update_plan(plan_type: bytes, aircraft: bytes, tascruise: int, dep_airport: bytes, dep_time: int, act_dep_time: int, alt: bytes, dest_airport: bytes, hrs_enroute: int, min_enroute: int, hrs_fuel: int, min_fuel: int, alt_airport: bytes, remarks: bytes, route: bytes) -> int

Update flight plan.

Source code in src/pyfsd/object/client.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def update_plan(
    self,
    plan_type: bytes,
    aircraft: bytes,
    tascruise: int,
    dep_airport: bytes,
    dep_time: int,
    act_dep_time: int,
    alt: bytes,
    dest_airport: bytes,
    hrs_enroute: int,
    min_enroute: int,
    hrs_fuel: int,
    min_fuel: int,
    alt_airport: bytes,
    remarks: bytes,
    route: bytes,
) -> int:
    """Replace this client's flight plan with a new one.

    Returns:
        The revision of the new plan: 0 for the first plan, otherwise the
        previous plan's revision plus one.
    """
    if self.flight_plan is None:
        revision = 0
    else:
        revision = self.flight_plan.revision + 1

    # Positional order follows the FlightPlan fields
    plan_fields = (
        revision,
        plan_type,
        aircraft,
        tascruise,
        dep_airport,
        dep_time,
        act_dep_time,
        alt,
        dest_airport,
        hrs_enroute,
        min_enroute,
        hrs_fuel,
        min_fuel,
        alt_airport,
        remarks,
        route,
    )
    self.flight_plan = FlightPlan(*plan_fields)
    return revision

FlightPlan dataclass

FlightPlan(revision: int, type: bytes, aircraft: bytes, tascruise: int, dep_airport: bytes, dep_time: int, act_dep_time: int, alt: bytes, dest_airport: bytes, hrs_enroute: int, min_enroute: int, hrs_fuel: int, min_fuel: int, alt_airport: bytes, remarks: bytes, route: bytes)

This dataclass describes a flight plan.

Attributes:

Name Type Description
type bytes

b"I" => IFR, b"V" => VFR

plugin

PyFSD plugin architecture.

Attributes:

Name Type Description
API_LEVEL tuple[int, int]

Current PyFSD plugin api level, (major, minor). If changes will break something, major is increased, otherwise minor.

EventResult

event handle result for handleable events.

EventListenersDict

Bases: TypedDict

Dict that stores event listeners (handlers & auditers).

Plugin

Base class of a PyFSD plugin.

Attributes:

Name Type Description
name str

Name of this plugin.

api tuple[int, int]

API level of this plugin. See pyfsd.plugin.API_LEVEL

version tuple[int, str]

int and human readable version of this plugin.

expected_config Union[type[TypedDict], dict, None]

Configuration structure description, in dict or TypedDict, which is structure parameter of pyfsd.define.check_dict function. use None to disable config check.

__eq__

__eq__(value: object) -> bool

Check if this plugin equals to another one.

Source code in src/pyfsd/plugin/__init__.py
85
86
87
88
89
90
91
92
93
94
95
def __eq__(self, value: object, /) -> bool:
    """Compare this plugin with another object.

    Two plugins are equal when their name, api level and version all match.
    Anything that is not a ``Plugin`` yields ``NotImplemented`` so Python can
    try the reflected comparison.
    """
    # Identity short-circuit: no attribute access needed.
    if value is self:
        return True
    if not isinstance(value, Plugin):
        return NotImplemented
    same_name = self.name == value.name
    return same_name and self.api == value.api and self.version == value.version

__hash__

__hash__() -> int

Return hash of this plugin.

Source code in src/pyfsd/plugin/__init__.py
80
81
82
83
def __hash__(self) -> int:
    """Hash this plugin by its name only."""
    # PluginManager ensures self.name is unique, so it suffices as hash key.
    plugin_name = self.name
    return hash(plugin_name)

__repr__

__repr__() -> str

Return the canonical string representation of this plugin.

Source code in src/pyfsd/plugin/__init__.py
97
98
99
def __repr__(self) -> str:
    """Describe this plugin as ``<PyFSDPlugin NAME vLABEL (NUMBER)>``.

    LABEL is ``self.version[1]`` and NUMBER is ``self.version[0]``.
    """
    version = self.version
    label, number = version[1], version[0]
    return f"<PyFSDPlugin {self.name} v{label} ({number})>"

setup async

Setup this plugin.

Returns:

Type Description
Optional[EventListenersDict]

A dict that stores event listeners. See pyfsd.plugin.EventListenersDict None if this plugin does not register event listeners.

Source code in src/pyfsd/plugin/__init__.py
101
102
103
104
105
106
107
async def setup(self) -> Optional[EventListenersDict]:
    """Prepare this plugin for use.

    This base implementation registers nothing.

    Returns:
        A dict that stores event listeners (see
            `pyfsd.plugin.EventListenersDict`), or None if this plugin does
            not register event listeners.
    """
    return None

PluginHandledEventResult

Bases: TypedDict

A result handled by plugin.

This means a plugin raised pyfsd.plugin.PreventEvent.

Attributes:

Name Type Description
handled_by_plugin Literal[True]

Event handled by plugin or not.

plugin_name Literal[True]

Name of the plugin.

PreventEvent

PreventEvent(result: Optional[dict] = None)

Bases: BaseException

Prevent a PyFSD plugin event.

Attributes:

Name Type Description
result dict

The event result reported by plugin.

Source code in src/pyfsd/plugin/__init__.py
49
50
51
52
53
def __init__(self, result: Optional[dict] = None) -> None:
    """Create a PreventEvent instance.

    Args:
        result: The event result reported by the plugin. A fresh empty dict
            is stored when it is omitted or None.
    """
    self.result = {} if result is None else result

PyFSDHandledEventResult

Bases: TypedDict

A result handled by PyFSD.

Attributes:

Name Type Description
handled_by_plugin Literal[False]

Event handled by plugin or not.

success bool

The event successfully handled or not.

SimplePlugin dataclass

SimplePlugin(name: str, api: tuple[int, int], version: tuple[int, str], expected_config: Union[type[TypedDict], dict, None], listeners: EventListenersDict = lambda: {'auditers': {}, 'handlers': {}}())

Bases: Plugin

Create a simple plugin by decorators.

Attributes:

Name Type Description
listeners EventListenersDict

Event listeners.

audit

audit(event: str) -> Callable[[C], C]

Add a event auditer for specified event.

Source code in src/pyfsd/plugin/__init__.py
156
157
158
159
160
161
162
163
164
165
def audit(self, event: str) -> Callable[[C], C]:
    """Return a decorator that registers an auditer for ``event``.

    The auditer list for ``event`` is created right away; the decorated
    callable is appended to it and returned unchanged.
    """
    self.listeners["auditers"].setdefault(event, [])

    def decorator(auditer: C) -> C:
        registered = self.listeners["auditers"][event]
        registered.append(auditer)
        return auditer

    return decorator

handle

handle(event: str) -> Callable[[C], C]

Add a event handler for specified event.

Source code in src/pyfsd/plugin/__init__.py
145
146
147
148
149
150
151
152
153
154
def handle(self, event: str) -> Callable[[C], C]:
    """Return a decorator that registers a handler for ``event``.

    The handler list for ``event`` is created right away; the decorated
    callable is appended to it and returned unchanged.
    """
    self.listeners["handlers"].setdefault(event, [])

    def decorator(handler: C) -> C:
        registered = self.listeners["handlers"][event]
        registered.append(handler)
        return handler

    return decorator

setup async

setup() -> EventListenersDict

Return listeners registered by self.handle() and self.audit() before.

Source code in src/pyfsd/plugin/__init__.py
139
140
141
142
143
async def setup(self) -> EventListenersDict:
    """Run the optional pre-setup hook, then return the registered listeners.

    The hook is whatever callable is stored under the attribute named
    ``"__pre_setup"`` (see ``setuper``); it is awaited before returning.

    Returns:
        The listeners registered through self.handle() and self.audit().
    """
    pre_setup = getattr(self, "__pre_setup", None)
    if callable(pre_setup):
        await pre_setup()
    return self.listeners

setuper

setuper(setuper: C) -> C

Set setuper.

Source code in src/pyfsd/plugin/__init__.py
167
168
169
170
171
172
def setuper(self, setuper: C) -> C:
    """Register ``setuper`` as the hook awaited at the start of ``setup``.

    Raises:
        TypeError: A callable hook has already been registered.
    """
    existing = getattr(self, "__pre_setup", None)
    if callable(existing):
        raise TypeError("setuper already exist")
    # object.__setattr__ bypasses any __setattr__ defined on the class.
    object.__setattr__(self, "__pre_setup", setuper)
    return setuper

StubPlugin dataclass

StubPlugin(name: str, api: tuple[int, int], version: tuple[int, str], expected_config: Union[type[TypedDict], dict, None])

Bases: Plugin

Stub plugin that does nothing.

collect

Tools to collect PyFSD plugins.

iter_submodules

iter_submodules(path: Iterable[str], name: str, error_handler: Optional[Callable[[str], None]] = None) -> Iterable[ModuleType]

Yields {name}'s submodules on path.

Parameters:

Name Type Description Default
path Iterable[str]

search path, like package.__path__

required
name str

package name, like package.__name__

required
error_handler Optional[Callable[[str], None]]

Handler that will be called because of uncaught exception

None

Returns:

Type Description
Iterable[ModuleType]

Submodules.

Source code in src/pyfsd/plugin/collect.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def iter_submodules(
    path: Iterable[str],
    name: str,
    error_handler: Optional[Callable[[str], None]] = None,
) -> Iterable["ModuleType"]:
    """Yield the submodules of package ``name`` found on ``path``.

    Each submodule is imported lazily, right before it is yielded.

    Args:
        path: search path, like package.__path__
        name: package name, like package.__name__
        error_handler: Called with the submodule's full name when importing it
            raises. It runs inside the ``except`` block, so ``sys.exc_info()``
            still describes the import failure. When omitted, the exception
            propagates to the caller.

    Yields:
        Every submodule that was imported successfully.
    """
    for module_info in iter_modules(path, name + "."):
        try:
            module = import_module(module_info.name)
        # ruff: noqa: PERF203
        except BaseException:
            if error_handler:
                error_handler(module_info.name)
            else:
                raise
        else:
            # Yield outside the guarded region: exceptions thrown into the
            # generator here (GeneratorExit on close(), or errors the consumer
            # throws in) are not import failures and must not reach
            # error_handler or restart the loop.
            yield module

manager

PyFSD plugin manager.

how the plugin architecture works:

Assume your plugin registered a handler and an auditer:

async def setup(self):
    return {
        "handlers": {"some_event": (self.handle_sth,)},
        "auditers": {"some_event": (self.audit_sth,)},
    }

If this event is handleable, then somewhere of PyFSD will call

PluginManager.trigger_event_handlers("some_event", ...)

So handler in your plugin got called:

def handle_sth(...) -> None: pass

If the handler prevented the event by raise PreventEvent, then this event won't be passed to other plugins and PyFSD won't handle the event too.

Later after this event processed (handled by PyFSD or prevented by one plugin), PyFSD'll call PluginManager.trigger_event_auditers("some_event", ...), so auditer in your plugin got called:

def audit_sth(..) -> None: pass

PluginManager

PyFSD Plugin manager.

Attributes:

Name Type Description
all_plugins Optional[tuple[Plugin, ...]]

All collected plugin files.

sorted_plugins Optional[SortedPlugins]

Plugins sorted with event name which it audits or handles.

awaitable_services Optional[SortedPlugins]

Registered awaitable services.

__repr__
__repr__() -> str

Return the canonical string representation.

Source code in src/pyfsd/plugin/manager.py
354
355
356
357
358
359
360
def __repr__(self) -> str:
    """Return the canonical string representation.

    Reports "not initialized" only while ``self.all_plugins`` is None; once
    plugins were picked, the number of plugins is shown, even when it is 0.
    """
    # NOTE(review): assumes all_plugins is None until pick_plugins() has run,
    # as its documented Optional[tuple[Plugin, ...]] type suggests.
    if self.all_plugins is None:
        return "<pyfsd.plugin.manager.PluginManager (not initialized)>"
    return (
        f"<pyfsd.plugin.manager.PluginManager ({len(self.all_plugins)} plugin(s))>"
    )
__str__
__str__() -> str

Return all plugins' name.

Source code in src/pyfsd/plugin/manager.py
362
363
364
365
366
def __str__(self) -> str:
    """Return the plugin names joined by ", ".

    Yields an empty string when ``self.all_plugins`` is None or empty.
    """
    names = [plugin.name for plugin in (self.all_plugins or ())]
    return ", ".join(names)
pick_plugins async
pick_plugins(plugin_config_root: dict) -> None

Pick all plugins into self.all_plugins & self.sorted_plugins.

Source code in src/pyfsd/plugin/manager.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
async def pick_plugins(self, plugin_config_root: dict) -> None:
    """Pick all plugins into self.all_plugins & self.sorted_plugins.

    Every submodule of the ``plugins`` package is imported and its
    ``pyfsd_plugin`` attribute is inspected. A plugin is skipped (with a log
    message) when it is not a ``Plugin``, is malformed, needs an incompatible
    API level, lacks or fails its config check, or equals an already picked
    plugin. Accepted plugins get ``setup()`` awaited; a non-empty result is
    kept as that plugin's event listeners.

    Args:
        plugin_config_root: Mapping of plugin name to that plugin's config.
    """

    def setattr1(obj: object, name: str, val: object) -> None:
        """Set attribute forcefully."""
        object.__setattr__(obj, name, val)

    # Only used for membership tests, so a set is enough.
    used_name: set[str] = set()
    all_plugins: list[Plugin] = []
    plugins_handlers: dict[Plugin, EventListenersDict] = {}

    for module in iter_submodules(
        plugins.__path__, plugins.__name__, deal_exception
    ):
        plugin = getattr(module, "pyfsd_plugin", None)
        path = brief_path(getfile(module))
        if not isinstance(plugin, Plugin):
            await logger.awarning(f"Invalid plugin: {path}")
            continue

        # Last component of the dotted module name. Unlike slicing the file
        # path, this does not depend on the path separator or the file
        # suffix, and a package plugin is named after its directory.
        default_name = module.__name__.rpartition(".")[2]
        if not hasattr(plugin, "name"):
            setattr1(plugin, "name", default_name)
        if not hasattr(plugin, "api"):
            setattr1(plugin, "api", (-1, 0))
        if not hasattr(plugin, "version"):
            setattr1(plugin, "version", (0, "unknown"))
        if not hasattr(plugin, "expected_config"):
            setattr1(plugin, "expected_config", None)

        # expected_config may be None, a dict, or a TypedDict class. A
        # TypedDict class is a dict *subclass* at runtime, never a dict
        # instance, so it needs its own check.
        expected_config = plugin.expected_config
        expected_config_ok = (
            expected_config is None
            or isinstance(expected_config, dict)
            or (
                isinstance(expected_config, type)
                and issubclass(expected_config, dict)
            )
        )

        # ruff: noqa: PLR2004
        if (
            (not (plugin_name_ok := isinstance(plugin.name, str)))
            or (not isinstance(plugin.api, tuple))
            or (not isinstance(plugin.version, tuple))
            or (len(plugin.api) != 2)
            or (len(plugin.version) != 2)
            or (not isinstance(plugin.api[0], int))
            or (not isinstance(plugin.api[1], int))
            or (not isinstance(plugin.version[0], int))
            or (not isinstance(plugin.version[1], str))
            or (not expected_config_ok)
            or (not callable(getattr(plugin, "setup", None)))
        ):
            await logger.aerror(
                "Cannot load plugin "
                + (plugin.name if plugin_name_ok else default_name)
                + ": malformed plugin",
            )
            continue

        # Check API
        # We expect <major> is the same as API_LEVEL's major
        # and <minor> <= API_LEVEL's minor
        if plugin.api[0] != API_LEVEL[0] or plugin.api[1] > API_LEVEL[1]:
            await logger.aerror(
                f"Cannot load plugin {plugin.name}: needs API {plugin.api}"
            )
            continue

        # Check config
        if plugin.expected_config is not None:
            plugin_config = plugin_config_root.get(plugin.name)
            if plugin_config is None:
                await logger.aerror(
                    f"Cannot load plugin {plugin.name}: config required"
                )
                continue
            config_errors = tuple(
                check_dict(
                    plugin_config,
                    plugin.expected_config,
                    name=f"plugin[{plugin.name!r}]",
                    allow_extra_keys=True,
                )
            )
            if config_errors:
                await logger.aerror(
                    f"Cannot load plugin {plugin.name}: invalid config",
                    stack="\n".join(f"  {err!s}" for err in config_errors),
                )
                continue

        # Check duplicated
        if plugin in all_plugins:
            await logger.awarning(f"Duplicated plugin {plugin.name}, skip loading")
            continue

        # Same name but otherwise different plugin: keep it under a new name
        # with a random suffix.
        if plugin.name in used_name:
            new_name = plugin.name
            while new_name in used_name:
                # ruff: noqa: S311
                new_name = f"{plugin.name}_{''.join(choices(ascii_letters, k=5))}"
            await logger.awarning(
                f"Replacing duplicated plugin name {plugin.name} with {new_name}"
            )
            setattr1(plugin, "name", new_name)

        # Nothing wrong, load it
        used_name.add(plugin.name)
        await logger.adebug(f"Loading plugin {plugin!r}")
        try:
            if handlers := await plugin.setup():
                plugins_handlers[plugin] = handlers
        except (KeyboardInterrupt, CancelledError):
            raise
        except BaseException:
            await logger.aexception(
                f"Error happened when loading plugin {plugin.name}",
            )
        # The plugin is recorded even when its setup() failed; in that case
        # it simply has no listeners registered.
        all_plugins.append(plugin)

    self.all_plugins = tuple(all_plugins)
    self.sort_pyfsd_plugins(plugins_handlers)
plugins_count
plugins_count() -> int

Get count of plugins.

Source code in src/pyfsd/plugin/manager.py
368
369
370
def plugins_count(self) -> int:
    """Return how many plugins were picked (0 when none or not picked yet)."""
    picked = self.all_plugins or ()
    return len(picked)
sort_pyfsd_plugins
sort_pyfsd_plugins(plugins_handlers: dict[Plugin, EventListenersDict]) -> None

Sort PyFSD plugins into self.sorted_plugins.

Parameters:

Name Type Description Default
plugins_handlers dict[Plugin, EventListenersDict]

{"plugin_name": <EventListenersDict>, ...}

required
Source code in src/pyfsd/plugin/manager.py
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
def sort_pyfsd_plugins(
    self, plugins_handlers: dict[Plugin, EventListenersDict]
) -> None:
    """Sort PyFSD plugins into self.sorted_plugins.

    Regroups the per-plugin listeners by event name: for each event, the
    result holds a tuple of ``(plugin, listener)`` pairs, in the iteration
    order of ``plugins_handlers``. A listeners dict that lacks the
    "auditers" or "handlers" key is treated as registering none of that kind.

    Args:
        plugins_handlers: {<Plugin>: <EventListenersDict>, ...}
    """
    all_auditers: dict[str, list[tuple[Plugin, Callable[..., Awaitable]]]] = {}
    all_handlers: dict[str, list[tuple[Plugin, Callable[..., Awaitable]]]] = {}

    for plugin, listeners in plugins_handlers.items():
        for event_name, plugin_auditers in listeners.get("auditers", {}).items():
            all_auditers.setdefault(event_name, []).extend(
                (plugin, auditer) for auditer in plugin_auditers
            )
        for event_name, plugin_handlers in listeners.get("handlers", {}).items():
            all_handlers.setdefault(event_name, []).extend(
                (plugin, handler) for handler in plugin_handlers
            )

    # Freeze the per-event lists into tuples.
    self.sorted_plugins = {
        "auditers": {
            name: tuple(auditer_infos)
            for name, auditer_infos in all_auditers.items()
        },
        "handlers": {
            name: tuple(handler_infos)
            for name, handler_infos in all_handlers.items()
        },
    }
trigger_event_auditers async
trigger_event_auditers(event_name: str, args: Iterable, kwargs: Mapping) -> None

Trigger a audit event and call auditers from plugins.

Source code in src/pyfsd/plugin/manager.py
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
async def trigger_event_auditers(
    self,
    event_name: str,
    args: Iterable,
    kwargs: Mapping,
) -> None:
    """Trigger a audit event and call auditers from plugins.

    All auditers registered for ``event_name`` are awaited concurrently. An
    exception raised by an auditer is logged and swallowed, except
    KeyboardInterrupt and CancelledError, which propagate.

    Args:
        event_name: Name of the event being audited.
        args: Positional arguments passed to every auditer.
        kwargs: Keyword arguments passed to every auditer.

    Raises:
        RuntimeError: ``self.sorted_plugins`` is None (plugins not sorted).
    """
    if self.sorted_plugins is None:
        raise RuntimeError("plugins not sorted")

    # `args` may be a one-shot iterator; materialize it once so that every
    # auditer receives the same arguments rather than only the first one.
    args = tuple(args)

    async def auditer_runner(auditer: Callable[..., Awaitable], name: str) -> None:
        try:
            await auditer(*args, **kwargs)
        except (KeyboardInterrupt, CancelledError):
            raise
        except BaseException:
            await logger.aexception(f"Error happened when calling plugin {name}")

    # run auditers together since we don't expect response from them
    await gather(
        *(
            auditer_runner(auditer, plugin.name)
            for plugin, auditer in self.sorted_plugins["auditers"].get(
                event_name, ()
            )
        )
    )
trigger_event_auditers_nonblock
trigger_event_auditers_nonblock(event_name: str, args: Iterable, kwargs: Mapping) -> None

Trigger a audit event and call auditers from plugins, not blocking.

Source code in src/pyfsd/plugin/manager.py
343
344
345
346
347
348
349
350
351
352
def trigger_event_auditers_nonblock(
    self,
    event_name: str,
    args: Iterable,
    kwargs: Mapping,
) -> None:
    """Schedule ``trigger_event_auditers`` as a task without awaiting it.

    The created task is added to ``mustdone_task_keeper``.
    """
    audit_task = create_task(self.trigger_event_auditers(event_name, args, kwargs))
    mustdone_task_keeper.add(audit_task)
trigger_event_handlers async
trigger_event_handlers(event_name: str, args: Iterable, kwargs: Mapping) -> PluginHandledEventResult | None

Trigger a handle event and call handlers from plugins.

Source code in src/pyfsd/plugin/manager.py
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
async def trigger_event_handlers(
    self,
    event_name: str,
    args: Iterable,
    kwargs: Mapping,
) -> "PluginHandledEventResult | None":
    """Trigger a handle event and call handlers from plugins.

    Handlers registered for ``event_name`` are awaited one after another. The
    first handler that raises ``PreventEvent`` stops the chain. Any other
    exception is logged and the next handler is tried, except
    KeyboardInterrupt and CancelledError, which propagate.

    Args:
        event_name: Name of the event being handled.
        args: Positional arguments passed to every handler.
        kwargs: Keyword arguments passed to every handler.

    Returns:
        A result built from the ``PreventEvent`` payload plus
        ``handled_by_plugin=True`` and the preventing plugin, or None when no
        handler prevented the event.

    Raises:
        RuntimeError: ``self.sorted_plugins`` is None (plugins not sorted).
    """
    if self.sorted_plugins is None:
        raise RuntimeError("plugins not sorted")
    # `args` may be a one-shot iterator; materialize it once so that every
    # handler receives the same arguments rather than only the first one.
    args = tuple(args)
    for plugin, handler in self.sorted_plugins["handlers"].get(event_name, ()):
        try:
            await handler(*args, **kwargs)
        # ruff: noqa: PERF203
        except PreventEvent as prevent_result:
            return PluginHandledEventResult(
                **prevent_result.result,
                handled_by_plugin=True,
                plugin=plugin,
            )
        except (KeyboardInterrupt, CancelledError):
            raise
        except BaseException:
            await logger.aexception(
                f"Error happened when calling plugin {plugin.name}",
            )
    return None

SortedPlugins

Bases: TypedDict

A dict that stores PyFSD plugins that can handle specified events.

Attributes:

Name Type Description
auditers dict[str, tuple[tuple[Plugin, Callable[..., Awaitable]], ...]]

{ "event name": ((<Plugin>, <auditer>), ...), ... }

handlers dict[str, tuple[tuple[Plugin, Callable[..., Awaitable]], ...]]

{ "event name": ((<Plugin>, <handler>), ...), ... }

brief_path

brief_path(path: str) -> str

Shorten filepath to relative one if it's under current working directory.

Source code in src/pyfsd/plugin/manager.py
102
103
104
105
106
def brief_path(path: str) -> str:
    """Shorten filepath to relative one if it's under current working directory.

    The prefix is matched on a path-component boundary, so a sibling whose
    name merely starts with the working directory's name (``/app`` versus
    ``/application``) is returned unchanged.

    Args:
        path: The file path to shorten.

    Returns:
        ``path`` relative to ``_cwd`` when it lies below it, else ``path``.
    """
    from os import sep

    # NOTE(review): _cwd is presumably the working directory captured at
    # import time -- confirm against its definition.
    prefix = _cwd if _cwd.endswith(sep) else _cwd + sep
    if path.startswith(prefix):
        return path[len(prefix) :]
    return path

deal_exception

deal_exception(name: str) -> None

Handle exceptions when importing plugins.

Source code in src/pyfsd/plugin/manager.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def deal_exception(name: str) -> None:
    """Log the exception currently being handled while importing a plugin.

    KeyboardInterrupt and CancelledError are re-raised instead of logged.

    Args:
        name: Name of the plugin module whose import failed.
    """
    exc_type, exc_value, exc_tb = exc_info()

    if exc_value is not None and exc_type in (KeyboardInterrupt, CancelledError):
        raise exc_value.with_traceback(exc_tb)

    # Cut traceback to plugin file: walk down until the first frame executing
    # module-level code, or the last frame if there is none.
    trimmed = exc_tb
    while trimmed is not None:
        at_module_level = trimmed.tb_frame.f_code.co_name == "<module>"
        if trimmed.tb_next is None or at_module_level:
            break
        trimmed = trimmed.tb_next

    logger.exception(
        "Error happened during load plugin %s",
        name,
        exc_info=(exc_type, exc_value, trimmed),
    )

plugins

Package of PyFSD Plugins.

protocol

PyFSD protocols.

LineProtocol

Bases: LineReceiver

Protocol to deal with lines.

Attributes:

Name Type Description
buffer bytes

Buffer used to store a line's data.

delimiter bytes

Line delimiter.

buffer_size_exceed

buffer_size_exceed(length: int) -> None

Kill when line length exceed max length.

Source code in src/pyfsd/protocol/__init__.py
69
70
71
def buffer_size_exceed(self, length: int) -> None:
    """Close the connection when the buffer grows past its limit.

    Args:
        length: The offending buffer length (unused here).
    """
    transport = self.transport
    transport.close()

connection_made

connection_made(transport: Transport) -> None

Save transport after the connection was made.

Source code in src/pyfsd/protocol/__init__.py
64
65
66
def connection_made(self, transport: "Transport") -> None:  # type: ignore[override]
    """Save transport after the connection was made.

    Args:
        transport: The transport of the new connection; stored as
            ``self.transport``.
    """
    self.transport = transport

send_line

send_line(line: bytes) -> None

Send line to client.

Source code in src/pyfsd/protocol/__init__.py
73
74
75
def send_line(self, line: bytes) -> None:
    """Write ``line`` followed by the line delimiter to the transport."""
    payload = line + self.delimiter
    self.transport.write(payload)

send_lines

send_lines(*lines: bytes, auto_newline: bool = True, together: bool = True) -> None

Send lines to client.

Parameters:

Name Type Description Default
lines bytes

Lines to be sent.

()
auto_newline bool

Insert newline between every two line or not.

True
together bool

Send lines together or not.

True
Source code in src/pyfsd/protocol/__init__.py
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def send_lines(
    self,
    *lines: bytes,
    auto_newline: bool = True,
    together: bool = True,
) -> None:
    """Send several lines to the client.

    Args:
        lines: Lines to be sent.
        auto_newline: When sending one by one, append the delimiter to each
            line; when sending together, forwarded to ``join_lines`` as
            ``newline``.
        together: Write all lines with a single transport write (joined by
            ``join_lines``) instead of one write per line.
    """
    if not together:
        for line in lines:
            chunk = (line + self.delimiter) if auto_newline else line
            self.transport.write(chunk)
        return

    joined = join_lines(*lines, newline=auto_newline)
    self.transport.write(joined)

LineReceiver

Bases: Protocol

Line receiver.

Attributes:

Name Type Description
buffer bytes

Buffer used to store a line's data.

delimiter bytes

Line delimiter.

max_length bytes

Max acceptable line length. Set it to -1 to allow lines of unlimited length.

buffer_size_exceed abstractmethod

buffer_size_exceed(length: int) -> None

Called when buffer exceed max size.

Source code in src/pyfsd/protocol/__init__.py
33
34
35
36
@abstractmethod
def buffer_size_exceed(self, length: int) -> None:
    """Called when buffer exceed max size.

    Args:
        length: The buffer length that exceeded the limit.

    Raises:
        NotImplementedError: Always; subclasses must override this method.
    """
    raise NotImplementedError

data_received

data_received(data: bytes) -> None

Handle data and call line_received as soon as we received a line.

Source code in src/pyfsd/protocol/__init__.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def data_received(self, data: bytes) -> None:
    """Buffer incoming data and dispatch every complete line.

    When ``self.buffer_size`` is not -1 and the buffered bytes plus ``data``
    exceed it, ``buffer_size_exceed`` is called with the combined length.
    The data is then appended to the buffer regardless; each
    delimiter-terminated line is passed to ``line_received`` and the
    trailing partial line stays in the buffer.
    """
    limit = self.buffer_size
    if limit != -1:
        total = len(self.buffer) + len(data)
        if total > limit:
            # NOTE(review): processing continues after this call -- confirm
            # that is intended.
            self.buffer_size_exceed(total)

    self.buffer += data
    if self.delimiter not in self.buffer:
        return

    *complete, remainder = self.buffer.split(self.delimiter)
    for line in complete:
        self.line_received(line)
    self.buffer = remainder

line_received abstractmethod

line_received(line: bytes) -> None

Called when a line was received.

Source code in src/pyfsd/protocol/__init__.py
28
29
30
31
@abstractmethod
def line_received(self, line: bytes) -> None:
    """Called when a line was received.

    Args:
        line: The received line, without the delimiter.

    Raises:
        NotImplementedError: Always; subclasses must override this method.
    """
    raise NotImplementedError

client

PyFSD client protocol.

ClientProtocol

ClientProtocol(factory: ClientFactory)

Bases: LineProtocol

PyFSD client protocol.

Attributes:

Name Type Description
factory ClientFactory

The client protocol factory.

timeout_killer ClientFactory

Helper to disconnect when timeout.

transport Transport

Asyncio transport.

client Optional[Client]

The client info. None before #AA or #AP to create new client.

tasks Optional[Client]

Processing handle_line tasks.

Source code in src/pyfsd/protocol/client.py
156
157
158
159
160
161
162
163
def __init__(self, factory: "ClientFactory") -> None:
    """Create a ClientProtocol instance.

    Args:
        factory: The client protocol factory this connection belongs to.
    """
    # Background tasks do not exist until they are started elsewhere.
    self.worker_task = None
    self.timeout_killer_task = None
    self.worker_queue = Queue()
    # No client is attached to a fresh protocol.
    self.client = None
    self.factory = factory
    super().__init__()
buffer_size_exceed
buffer_size_exceed(length: int) -> None

Called when client exceed max buffer size.

Source code in src/pyfsd/protocol/client.py
260
261
262
263
264
265
266
267
def buffer_size_exceed(self, length: int) -> None:
    """Log the kick, then delegate to the parent implementation.

    Args:
        length: The buffer length that exceeded the limit.
    """
    who = self.get_description()
    logger.info("Kicking %s: buffer size exceeded (%d)", who, length)
    return super().buffer_size_exceed(length)
connection_lost
connection_lost(exc: Optional[BaseException] = None) -> None

Handle connection lost.

Source code in src/pyfsd/protocol/client.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
def connection_lost(self, exc: Optional[BaseException] = None) -> None:
    """Clean up after the connection is gone.

    Cancels the timeout killer and worker tasks. If a client was attached,
    broadcasts its removal packet and drops it from the factory's clients.
    Then logs the disconnect and fires the "client_disconnected" audit event
    with ``(self, client)``, where ``client`` is the detached client or None.

    Args:
        exc: The reason the connection was lost, if any.
    """
    killer = self.timeout_killer_task
    if killer:
        killer.cancel()
        self.timeout_killer_task = None
    worker = self.worker_task
    if worker:
        worker.cancel()
        self.worker_task = None

    departed = self.client
    if departed is not None:
        if departed.is_controller:
            removal = FSDClientCommand.REMOVE_ATC
        else:
            removal = FSDClientCommand.REMOVE_PILOT
        self.factory.broadcast(
            make_packet(removal + departed.callsign, departed.cid.encode()),
            from_client=departed,
        )
        del self.factory.clients[departed.callsign]

    # Describe the peer before detaching the client so the callsign is shown.
    description = self.get_description()
    reason = f" due to {exc}" if exc else ""
    logger.info("%s disconnected%s.", description, reason)
    self.client = None

    self.factory.plugin_manager.trigger_event_auditers_nonblock(
        "client_disconnected",
        (self, departed),
        {},
    )
connection_made
connection_made(transport: Transport) -> None

Initialize something after the connection is made.

Source code in src/pyfsd/protocol/client.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def connection_made(self, transport: "Transport") -> None:  # type: ignore[override]
    """Set up a freshly accepted connection.

    A peer whose IP is in the factory's blacklist is closed immediately.
    Otherwise the line worker task is started, the timeout killer is reset
    and the "new_connection_established" audit event is fired.
    """
    super().connection_made(transport)
    peer_ip = self.transport.get_extra_info("peername")[0]
    if peer_ip in self.factory.blacklist:
        logger.info("Kicking %s: blacklist", peer_ip)
        self.transport.close()
    else:
        self.worker_task = create_task(self.handle_line_worker_func())
        self.reset_timeout_killer()
        logger.info("New connection from %s.", peer_ip)
        self.factory.plugin_manager.trigger_event_auditers_nonblock(
            "new_connection_established", (self,), {}
        )
get_description
get_description() -> str

Get text description of this client.

Source code in src/pyfsd/protocol/client.py
278
279
280
281
282
283
284
285
286
def get_description(self) -> str:
    """Describe this connection as ``IP`` or ``IP (CALLSIGN)``.

    The callsign part is added only when a client is attached; undecodable
    callsign bytes are replaced rather than raising.
    """
    address = cast("str", self.transport.get_extra_info("peername")[0])
    if self.client is None:
        return address
    callsign = self.client.callsign.decode(errors="replace")
    return address + f" ({callsign})"
handle_ATC_position_update
handle_ATC_position_update(packet: tuple[bytes, ...]) -> HandleResult

Handle ATC position update request.

Source code in src/pyfsd/protocol/client.py
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
@check_packet(8)
def handle_ATC_position_update(  # noqa: N802
    self,
    packet: tuple[bytes, ...],
) -> HandleResult:
    """Handle ATC position update request.

    Packet items 1-7 are read as frequency, facility type, visual range,
    an ignored item, latitude, longitude and altitude. Values that fail to
    parse fall back to 0. An out-of-range position is only logged at debug
    level; the update is still stored on the client and broadcast.

    Returns:
        Always ``(True, True)``.
    """
    assert self.client is not None
    frequency, facility_type, visual_range, _, lat, lon, altitude = packet[1:8]

    lat_float = str_to_float(lat, default_value=0.0)
    lon_float = str_to_float(lon, default_value=0.0)
    frequency_int = str_to_int(frequency, default_value=0)
    facility_type_int = str_to_int(facility_type, default_value=0)
    visual_range_int = str_to_int(visual_range, default_value=0)
    altitude_int = str_to_int(altitude, default_value=0)

    lat_invalid = lat_float > 90.0 or lat_float < -90.0
    if lat_invalid or lon_float > 180.0 or lon_float < -180.0:
        logger.debug(
            "Got invalid position (%f, %f) from %s",
            lat_float,
            lon_float,
            self.get_description(),
        )

    self.client.update_ATC_position(
        frequency_int,
        facility_type_int,
        visual_range_int,
        lat_float,
        lon_float,
        altitude_int,
    )

    # Forward the raw text items, except the position, which is re-formatted
    # from the parsed floats, and the rating, which comes from the client.
    position_packet = make_packet(
        FSDClientCommand.ATC_POSITION + self.client.callsign,
        frequency,
        facility_type,
        visual_range,
        b"%d" % self.client.rating,
        b"%.5f" % lat_float,
        b"%.5f" % lon_float,
        altitude,
    )
    self.factory.broadcast(
        position_packet,
        check_func=broadcast_position_checker,
        from_client=self.client,
    )
    return True, True
handle_acars async
handle_acars(packet: tuple[bytes, ...]) -> HandleResult

Handle acars request.

Source code in src/pyfsd/protocol/client.py
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
@check_packet(3)
async def handle_acars(
    self,
    packet: tuple[bytes, ...],
) -> HandleResult:
    """Handle acars request.

    Only a METAR request that carries a fourth packet item is answered: the
    METAR is fetched for that item and sent back to the client, or a
    NOWEATHER error is sent when none is available. Any other request is
    accepted without a reply.

    Returns:
        ``(True, False)`` when no METAR was available, else ``(True, True)``.
    """
    assert self.client is not None

    is_metar_request = packet[2].upper() == b"METAR" and len(packet) > 3
    if not is_metar_request:
        return True, True  # yep

    station = packet[3]
    metar = await self.factory.metar_manager.fetch(station.decode(errors="ignore"))
    if metar is None:
        self.send_error(FSDClientError.NOWEATHER, env=station)
        return True, False

    reply = make_packet(
        FSDClientCommand.REPLY_ACARS + b"server",
        self.client.callsign,
        b"METAR",
        metar.metar.encode("ascii"),
    )
    self.send_line(reply)
    return True, True
handle_add_client async
handle_add_client(packet: tuple[bytes, ...], is_AA: bool) -> HandleResult

Handle add client request.

Parameters:

Name Type Description Default
packet tuple[bytes, ...]

The packet.

required
is_AA bool

True if this packet is #AA (add atc), else #AP

required
Source code in src/pyfsd/protocol/client.py
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
@check_packet(7, need_login=False)
# ruff: noqa: PLR0911, PLR0912, C901
async def handle_add_client(
    self,
    packet: tuple[bytes, ...],
    # ruff: noqa: FBT001, N803
    is_AA: bool,
) -> HandleResult:
    """Handle add client request.

    Validates the request, authenticates the user, registers a new `Client`
    in `self.factory.clients`, announces it to the other clients and sends
    the motd.

    Args:
        packet: The packet.
        is_AA: True if this packet is #AA (add atc), else #AP

    Returns:
        `(True, True)` once the client is registered. `(False, False)` when
        the request itself is rejected (already registered, too few parts,
        invalid callsign, wrong protocol revision, undecodable cid/password)
        and `(True, False)` when the callsign is in use or authentication /
        rating checks fail.
    """
    if self.client is not None:
        self.send_error(FSDClientError.REGISTERED)
        return False, False
    if is_AA:
        # controller
        (
            callsign,
            _,
            realname,
            cid,
            password,
            req_rating,
            protocol,
        ) = packet[:7]
        sim_type_int = -1
    else:
        # pilot
        if len(packet) < 8:
            self.send_error(FSDClientError.SYNTAX)
            return False, False
        (
            callsign,
            _,
            cid,
            password,
            req_rating,
            protocol,
            sim_type,
            realname,
        ) = packet[:8]
        sim_type_int = str_to_int(sim_type, default_value=0)
    if len(req_rating) == 0:
        req_rating_int = 1
    else:
        req_rating_int = str_to_int(req_rating, default_value=0)
    protocol_int = str_to_int(protocol, default_value=-1)
    if not is_callsign_valid(callsign):
        self.send_error(FSDClientError.CSINVALID, fatal=True)
        return False, False
    if protocol_int != 9:
        self.send_error(FSDClientError.REVISION, fatal=True)
        return False, False
    try:
        cid_str = cid.decode("utf-8")
        pwd_str = password.decode("utf-8")
    except UnicodeDecodeError:
        self.send_error(FSDClientError.CIDINVALID, env=cid, fatal=True)
        return False, False

    if callsign in self.factory.clients:
        self.send_error(FSDClientError.CSINUSE, fatal=True)
        return True, False

    rating = await self.factory.check_auth(cid_str, pwd_str)
    if rating is None:
        self.send_error(FSDClientError.CIDINVALID, env=cid, fatal=True)
        return True, False
    if rating == 0:
        self.send_error(FSDClientError.CSSUSPEND, fatal=True)
        return True, False
    if rating < req_rating_int:
        self.send_error(
            FSDClientError.LEVEL,
            env=req_rating,
            fatal=True,
        )
        return True, False
    # The await above yields control, so another connection may have
    # registered this callsign in the meantime. Check again right before
    # registering (no await in between), otherwise the assignment below would
    # silently replace that client's entry.
    if callsign in self.factory.clients:
        self.send_error(FSDClientError.CSINUSE, fatal=True)
        return True, False
    client = Client(
        is_AA,
        callsign,
        req_rating_int,
        cid_str,
        protocol_int,
        realname,
        sim_type_int,
        self.transport,
    )
    self.factory.clients[callsign] = client
    self.client = client
    if is_AA:
        self.factory.broadcast(
            make_packet(
                FSDClientCommand.ADD_ATC + callsign,
                b"SERVER",
                realname,
                cid,
                b"",
                req_rating,
            ),
            from_client=client,
        )
    else:
        self.factory.broadcast(
            # two times of req_rating... FSD does :(
            make_packet(
                FSDClientCommand.ADD_PILOT + callsign,
                b"SERVER",
                cid,
                b"",
                req_rating,
                req_rating,
                b"%d" % sim_type_int,
            ),
            from_client=client,
        )
    self.send_motd()
    await logger.ainfo(f"New client {self.get_description()}.")
    self.factory.plugin_manager.trigger_event_auditers_nonblock(
        "new_client_created", (self,), {}
    )
    return True, True
handle_cast
handle_cast(packet: tuple[bytes, ...], command: FSDClientCommand, *, require_parts: int = 2, multicast_able: bool = True, custom_at_checker: Optional[BroadcastChecker] = None) -> HandleResult

Handle a (multi/uni)cast request.

Parameters:

Name Type Description Default
packet tuple[bytes, ...]

format: (command)(self_callsign):(to_callsign):(multicast content) Note that to_callsign could be multicast sign (*A, *P, etc.) if multicast_able.

required
command FSDClientCommand

The packet's command.

required
require_parts int

How many parts required.

#AA1012:gamecss:happy lunar new year
[0    ] [1    ] [2                 ] => 3 parts
2
multicast_able bool

to_callsign can be multicast sign or not. if not multicast_able and to_callsign is multicast sign, this function will send nothing and exit with False, False.

True
custom_at_checker Optional[BroadcastChecker]

Custom checker used when to_callsign is @.

None
Source code in src/pyfsd/protocol/client.py
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
def handle_cast(
    self,
    packet: tuple[bytes, ...],
    command: FSDClientCommand,
    *,
    require_parts: int = 2,
    multicast_able: bool = True,
    custom_at_checker: Optional[BroadcastChecker] = None,
) -> HandleResult:
    """Relay a packet to one client (unicast) or to a group (multicast).

    Args:
        packet: format: `(command)(self_callsign):(to_callsign):(multicast content)`
            Note that to_callsign could be multicast sign (`*A`, `*P`, etc.)
            if multicast_able.
        command: The packet's command.
        require_parts: Minimum number of parts the packet must have.

                #AA1012:gamecss:happy lunar new year
                [0    ] [1    ] [2                 ] => 3 parts

        multicast_able: Whether to_callsign may be a multicast sign.
            If it is a multicast sign while multicast_able is False, nothing
            is sent and the result is False, False.
        custom_at_checker: Custom checker used when to_callsign is `@`.

    Returns:
        `(False, False)` when the packet is rejected, otherwise `True` paired
        with the result of the unicast/multicast send.
    """
    # Reject malformed or unauthenticated requests before building anything.
    if len(packet) < require_parts:
        self.send_error(FSDClientError.SYNTAX)
        return False, False
    sender = self.client
    if sender is None:
        return False, False
    claimed_callsign = packet[0]
    if sender.callsign != claimed_callsign:
        self.send_error(FSDClientError.SRCINVALID, env=claimed_callsign)
        return False, False

    destination = packet[1]
    # Only used to recognise a multicast sign, so a lossy decode is fine.
    destination_str = destination.decode("ascii", "replace")
    # ruff: noqa: PLR2004
    content = packet[2:] if len(packet) > 2 else [b""]
    outgoing = make_packet(command + sender.callsign, destination, *content)

    if not is_multicast(destination_str):
        return True, self.factory.send_to(destination, outgoing)
    if not multicast_able:
        # Not allowed to multicast, so packet_ok is False
        return False, False
    delivered = self.multicast(
        destination_str,
        outgoing,
        custom_at_checker=custom_at_checker,
    )
    return True, delivered
handle_client_query
handle_client_query(packet: tuple[bytes, ...]) -> HandleResult

Handle $CQ request.

Source code in src/pyfsd/protocol/client.py
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
@check_packet(3)
def handle_client_query(self, packet: tuple[bytes, ...]) -> HandleResult:
    """Handle $CQ request.

    Queries not addressed to `SERVER` are relayed through `handle_cast`.
    Queries addressed to the server are answered here: `FP` sends the flight
    plan of the client named in `packet[3]` (to controllers only) and `RN`
    sends a real-name response. Any other server query is accepted without a
    reply.
    """
    # Behavior may differ from FSD.
    assert self.client is not None
    destination = packet[1]
    if destination.upper() != b"SERVER":
        # Multicast a message.
        return self.handle_cast(
            packet,
            FSDClientCommand.CLIENT_QUERY,
            require_parts=3,
            multicast_able=True,
        )

    # The query type is matched case-insensitively.
    query = packet[2].upper()
    if query == b"FP":
        # Get flight plan.
        if len(packet) < 4:
            self.send_error(FSDClientError.SYNTAX)
            return True, False
        callsign = packet[3]
        owner = self.factory.clients.get(callsign)
        if owner is None:
            self.send_error(FSDClientError.NOSUCHCS, env=callsign)
            return True, False
        plan = owner.flight_plan
        if plan is None:
            self.send_error(FSDClientError.NOFP)
            return True, False
        if not self.client.is_controller:
            return False, False
        plan_packet = make_packet(
            FSDClientCommand.PLAN + callsign,
            self.client.callsign,
            plan.type,
            plan.aircraft,
            b"%d" % plan.tascruise,
            plan.dep_airport,
            b"%d" % plan.dep_time,
            b"%d" % plan.act_dep_time,
            plan.alt,
            plan.dest_airport,
            b"%d" % plan.hrs_enroute,
            b"%d" % plan.min_enroute,
            b"%d" % plan.hrs_fuel,
            b"%d" % plan.min_fuel,
            plan.alt_airport,
            plan.remarks,
            plan.route,
        )
        self.send_line(plan_packet)
        return True, True

    if query == b"RN":
        # TODO: Implementation maybe incorrect
        # Get realname?
        subject = self.factory.clients.get(destination)
        if subject is None:
            return True, False
        realname_packet = make_packet(
            FSDClientCommand.CLIENT_RESPONSE + destination,
            self.client.callsign,
            b"RN",
            subject.realname,
            b"USER",
            b"%d" % subject.rating,
        )
        self.send_line(realname_packet)
        return True, True

    return True, True
handle_kill
handle_kill(packet: tuple[bytes, ...]) -> HandleResult

Handle kill request.

Source code in src/pyfsd/protocol/client.py
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
@check_packet(3, check_callsign=False)
def handle_kill(self, packet: tuple[bytes, ...]) -> HandleResult:
    """Handle kill request.

    The requester needs a rating of at least 11. On success the target is
    sent a kill packet carrying the reason and its connection is closed one
    second later.

    Args:
        packet: The packet; `packet[1]` is the callsign to kill and
            `packet[2]` the reason.

    Returns:
        `(True, False)` if the target does not exist or the requester is not
        allowed to kill, else `(True, True)`.
    """
    assert self.client is not None
    _, callsign_kill, reason = packet[:3]
    # Look the target up once and keep the reference for the rest of the call.
    client_to_kill = self.factory.clients.get(callsign_kill)
    if client_to_kill is None:
        self.send_error(FSDClientError.NOSUCHCS, env=callsign_kill)
        return True, False
    if self.client.rating < 11:
        self.send_line(
            make_packet(
                FSDClientCommand.MESSAGE + b"server",
                self.client.callsign,
                b"You are not allowed to kill users!",
            ),
        )
        return True, False
    self.send_line(
        make_packet(
            FSDClientCommand.MESSAGE + b"server",
            self.client.callsign,
            b"Attempting to kill %s" % callsign_kill,
        ),
    )
    self.factory.send_to(
        callsign_kill,
        make_packet(FSDClientCommand.KILL + b"SERVER", callsign_kill, reason),
    )
    transport_to_kill = client_to_kill.transport

    if isinstance(
        protocol_to_kill := transport_to_kill.get_protocol(), ClientProtocol
    ):
        description = protocol_to_kill.get_description()
        kill_it = protocol_to_kill.kill_after_1sec
    else:
        # get_extra_info() returns None when the peer address is unavailable,
        # so do not index the result blindly.
        peername = transport_to_kill.get_extra_info("peername")
        ip_kill = peername[0] if peername else "unknown"
        description = f"{ip_kill}({callsign_kill.decode(errors='replace')})"

        def kill_it() -> None:
            async def killer() -> None:
                await asleep(1)
                transport_to_kill.close()

            mustdone_task_keeper.add(create_task(killer()))

    logger.info(
        "Kicking %s: killed by %s",
        description,
        self.client.callsign.decode(errors="replace"),
    )
    kill_it()
    return True, True
handle_line async
handle_line(byte_line: bytes) -> HandleResult

Handle a line.

Source code in src/pyfsd/protocol/client.py
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
async def handle_line(
    self,
    byte_line: bytes,
) -> HandleResult:
    """Dispatch one received line to the handler of its command.

    Empty lines are accepted as-is. A line whose command cannot be determined,
    or whose command has no handler here, is answered with a syntax error.

    Args:
        byte_line: The line to handle.

    Returns:
        The handler's result, `(True, True)` for an empty line, or
        `(False, False)` after a syntax error.
    """
    if not byte_line:
        return True, True
    command, packet = break_packet(byte_line, CLIENT_USED_COMMAND)
    if command is None:
        self.send_error(FSDClientError.SYNTAX)
        return False, False

    def relay(parts: int, *, multicast: bool) -> HandleResult:
        # Plain relay of the packet through handle_cast.
        return self.handle_cast(
            packet,
            command,
            require_parts=parts,
            multicast_able=multicast,
        )

    if command is FSDClientCommand.ADD_ATC or command is FSDClientCommand.ADD_PILOT:
        is_atc = command is FSDClientCommand.ADD_ATC
        return await self.handle_add_client(packet, is_atc)
    if command is FSDClientCommand.PLAN:
        return await self.handle_plan(packet)
    if (
        command is FSDClientCommand.REMOVE_ATC
        or command is FSDClientCommand.REMOVE_PILOT
    ):
        return await self.handle_remove_client(packet)
    if command is FSDClientCommand.PILOT_POSITION:
        return await self.handle_pilot_position_update(packet)
    if command is FSDClientCommand.ATC_POSITION:
        return await self.handle_ATC_position_update(packet)
    if command is FSDClientCommand.PONG:
        return relay(2, multicast=True)
    if command is FSDClientCommand.PING:
        # A ping addressed to the server is answered by the server itself.
        to_server = len(packet) > 1 and packet[1].lower() == b"server"
        if to_server:
            return await self.handle_server_ping(packet)
        return relay(2, multicast=True)

    if command is FSDClientCommand.MESSAGE:
        # Messages use their own checker for the `@` multicast sign.
        return self.handle_cast(
            packet,
            command=command,
            require_parts=3,
            multicast_able=True,
            custom_at_checker=broadcast_message_checker,
        )
    if (
        command is FSDClientCommand.REQUEST_HANDOFF
        or command is FSDClientCommand.ACCEPT_HANDOFF
    ):
        return relay(3, multicast=False)
    if (
        command is FSDClientCommand.SQUAWK_BOX
        or command is FSDClientCommand.PRO_CONTROLLER
    ):
        return relay(2, multicast=False)
    if command is FSDClientCommand.WEATHER:
        return await self.handle_weather(packet)
    if command is FSDClientCommand.REQUEST_COMM:
        return relay(2, multicast=False)
    if command is FSDClientCommand.REPLY_COMM:
        return relay(3, multicast=False)
    if command is FSDClientCommand.REQUEST_ACARS:
        return await self.handle_acars(packet)
    if command is FSDClientCommand.CLIENT_RESPONSE:
        return relay(4, multicast=False)
    if command is FSDClientCommand.CLIENT_QUERY:
        return await self.handle_client_query(packet)
    if command is FSDClientCommand.KILL:
        return await self.handle_kill(packet)

    # No handler for this command.
    self.send_error(FSDClientError.SYNTAX)
    return False, False
handle_line_worker_func async
handle_line_worker_func() -> None

Worker processes line.

Source code in src/pyfsd/protocol/client.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
async def handle_line_worker_func(self) -> None:
    """Process queued lines forever, one at a time.

    Each line taken from `self.worker_queue` is first offered to the plugin
    event handlers; only if none of them handled it is it passed to
    `handle_line`. The outcome is then reported to the plugin auditers.
    """
    result: "PyFSDHandledEventResult | PluginHandledEventResult"  # noqa: UP037

    while True:
        line = await self.worker_queue.get()

        # Plugins get the first chance to process the line.
        handled_by_plugin = await self.factory.plugin_manager.trigger_event_handlers(
            "line_received_from_client",
            (self, line),
            {},
        )
        if handled_by_plugin is not None:
            result = handled_by_plugin
        else:
            # Not handled by plugin
            packet_ok, has_result = await self.handle_line(line)
            own_result = {
                "handled_by_plugin": False,
                "success": packet_ok and has_result,
                "packet": line,
                "packet_ok": packet_ok,
                "has_result": has_result,
            }
            result = cast("PyFSDHandledEventResult", own_result)

        self.factory.plugin_manager.trigger_event_auditers_nonblock(
            "line_received_from_client",
            (self, line, result),
            {},
        )
handle_pilot_position_update
handle_pilot_position_update(packet: tuple[bytes, ...]) -> HandleResult

Handle pilot position update request.

Source code in src/pyfsd/protocol/client.py
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
@check_packet(10, callsign_position=1)
def handle_pilot_position_update(
    self,
    packet: tuple[bytes, ...],
) -> HandleResult:
    """Handle pilot position update request.

    The numeric fields are converted with `str_to_int` / `str_to_float`
    (default value 0), stored on the client, and the update is broadcast
    with `broadcast_position_checker`.
    """
    assert self.client is not None
    mode, _, transponder, _, lat, lon, altitude, groundspeed, pbh, flags = packet[:10]

    transponder_int = str_to_int(transponder, default_value=0)
    latitude = str_to_float(lat, default_value=0.0)
    longitude = str_to_float(lon, default_value=0.0)
    altitude_int = str_to_int(altitude, default_value=0)
    # Simulate unsigned
    pbh_int = str_to_int(pbh, default_value=0) & 0xFFFFFFFF
    groundspeed_int = str_to_int(groundspeed, default_value=0)
    flags_int = str_to_int(flags, default_value=0)

    # An out-of-range position is only logged; it is still stored and relayed.
    if abs(latitude) > 90.0 or abs(longitude) > 180.0:
        logger.debug(
            "Got invalid position (%f, %f) from %s",
            latitude,
            longitude,
            self.get_description(),
        )

    self.client.update_pilot_position(
        mode,
        transponder_int,
        latitude,
        longitude,
        altitude_int,
        groundspeed_int,
        pbh_int,
        flags_int,
    )

    # Latitude/longitude are re-formatted; the other fields are relayed as received.
    update_packet = make_packet(
        FSDClientCommand.PILOT_POSITION + mode,
        self.client.callsign,
        transponder,
        b"%d" % self.client.rating,
        b"%.5f" % latitude,
        b"%.5f" % longitude,
        altitude,
        groundspeed,
        pbh,
        flags,
    )
    self.factory.broadcast(
        update_packet,
        check_func=broadcast_position_checker,
        from_client=self.client,
    )
    return True, True
handle_plan
handle_plan(packet: tuple[bytes, ...]) -> HandleResult

Handle plan update request.

Source code in src/pyfsd/protocol/client.py
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
@check_packet(17)
def handle_plan(self, packet: tuple[bytes, ...]) -> HandleResult:
    """Handle plan update request.

    Stores the flight plan found in `packet[2:17]` on the client and
    broadcasts it with `all_ATC_checker`.
    """
    assert self.client is not None
    (
        raw_plan_type,
        aircraft,
        tascruise,
        dep_airport,
        dep_time,
        act_dep_time,
        alt,
        dest_airport,
        hrs_enroute,
        min_enroute,
        hrs_fuel,
        min_fuel,
        alt_airport,
        remarks,
        route,
    ) = packet[2:17]
    # Only the first byte of the plan type is kept.
    plan_type = raw_plan_type[:1]

    self.client.update_plan(
        plan_type,
        aircraft,
        str_to_int(tascruise, default_value=0),
        dep_airport,
        str_to_int(dep_time, default_value=0),
        str_to_int(act_dep_time, default_value=0),
        alt,
        dest_airport,
        str_to_int(hrs_enroute, default_value=0),
        str_to_int(min_enroute, default_value=0),
        str_to_int(hrs_fuel, default_value=0),
        str_to_int(min_fuel, default_value=0),
        alt_airport,
        remarks,
        route,
    )

    if plan_type:
        announcement = make_packet(
            FSDClientCommand.PLAN + self.client.callsign,
            b"*A",
            plan_type,
            aircraft,
            tascruise,
            dep_airport,
            dep_time,
            act_dep_time,
            alt,
            dest_airport,
            hrs_enroute,
            min_enroute,
            hrs_fuel,
            min_fuel,
            alt_airport,
            remarks,
            route,
        )
    else:
        # Another FSD quirk: truncated if plan_type is empty
        announcement = make_packet(
            FSDClientCommand.PLAN + self.client.callsign,
            b"*A",
            b"",
        )
    self.factory.broadcast(
        announcement,
        check_func=all_ATC_checker,
        from_client=self.client,
    )
    return True, True
handle_remove_client
handle_remove_client(_: tuple[bytes, ...]) -> HandleResult

Handle remove client request.

Source code in src/pyfsd/protocol/client.py
579
580
581
582
583
584
585
@check_packet(1)
def handle_remove_client(self, _: tuple[bytes, ...]) -> HandleResult:
    """Handle remove client request.

    The packet content is not inspected; the removal is logged and the
    connection is scheduled for closing via `kill_after_1sec`.
    """
    assert self.client is not None
    who = self.get_description()
    logger.info("Kicking %s: client asked to remove", who)
    self.kill_after_1sec()
    return True, True
handle_server_ping
handle_server_ping(packet: tuple[bytes, ...]) -> HandleResult

Handle server ping request.

Source code in src/pyfsd/protocol/client.py
789
790
791
792
793
794
795
796
797
798
799
800
@check_packet(2)
def handle_server_ping(self, packet: tuple[bytes, ...]) -> HandleResult:
    """Handle server ping request.

    Replies with a pong from `server` that echoes every part after the
    destination, or a single empty part when there is none.
    """
    assert self.client is not None
    # An empty slice means the ping carried no payload.
    payload = packet[2:] or (b"",)
    pong = make_packet(
        FSDClientCommand.PONG + b"server",
        self.client.callsign,
        *payload,
    )
    self.send_line(pong)
    return True, True
handle_weather async
handle_weather(packet: tuple[bytes, ...]) -> HandleResult

Handle weather request.

Source code in src/pyfsd/protocol/client.py
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
@check_packet(3)
async def handle_weather(
    self,
    packet: tuple[bytes, ...],
) -> HandleResult:
    """Handle weather request.

    Fetches the metar for the station named in `packet[2]`, fixes a clone of
    it against the client's position and sends temperature, wind and cloud
    data lines back to the client.

    Returns:
        `(True, False)` if no metar is available, else `(True, True)`.
    """
    assert self.client is not None
    station = packet[2]
    metar = await self.factory.metar_manager.fetch(station.decode("ascii", "ignore"))
    if not metar:
        self.send_error(FSDClientError.NOWEATHER, env=station)
        return True, False

    # Work on a copy so the fetched metar itself is left untouched.
    profile = metar.clone()
    profile.fix(self.client.position)

    temp_fields = [
        b"%d:%d" % (layer.ceiling, layer.temp) for layer in profile.temps
    ]
    temp_line = make_packet(
        FSDClientCommand.TEMP_DATA + b"server",
        self.client.callsign,
        *temp_fields,
        b"%d" % profile.barometer,
    )

    wind_fields = [
        b"%d:%d:%d:%d:%d:%d"
        % (
            layer.ceiling,
            layer.floor,
            layer.direction,
            layer.speed,
            layer.gusting,
            layer.turbulence,
        )
        for layer in profile.winds
    ]
    wind_line = make_packet(
        FSDClientCommand.WIND_DATA + b"server",
        self.client.callsign,
        *wind_fields,
    )

    # The thunderstorm layer is reported after the regular cloud layers.
    cloud_fields = [
        b"%d:%d:%d:%d:%d"
        % (
            layer.ceiling,
            layer.floor,
            layer.coverage,
            layer.icing,
            layer.turbulence,
        )
        for layer in (*profile.clouds, profile.tstorm)
    ]
    cloud_line = make_packet(
        FSDClientCommand.CLOUD_DATA + b"server",
        self.client.callsign,
        *cloud_fields,
        b"%.2f" % profile.visibility,
    )

    self.send_lines(temp_line, wind_line, cloud_line)

    return True, True
kill_after_1sec
kill_after_1sec() -> None

Kill this client after 1 second by kill_func.

Source code in src/pyfsd/protocol/client.py
269
270
271
272
273
274
275
276
def kill_after_1sec(self) -> None:
    """Schedule this client's transport to be closed one second from now."""

    async def close_later() -> None:
        await asleep(1)
        self.transport.close()

    closing_task = create_task(close_later())
    # The task is handed to the keeper so a reference to it is retained.
    mustdone_task_keeper.add(closing_task)
line_received
line_received(line: bytes) -> None

Handle a line.

Source code in src/pyfsd/protocol/client.py
217
218
219
220
def line_received(self, line: bytes) -> None:
    """Restart the timeout killer and queue the line for the worker."""
    # Any received line restarts the idle timeout.
    self.reset_timeout_killer()
    queue = self.worker_queue
    queue.put_nowait(line)
multicast
multicast(to_limiter: str, *lines: bytes, custom_at_checker: Optional[BroadcastChecker] = None) -> bool

Multicast lines.

Parameters:

Name Type Description Default
to_limiter str

Destination limiter. * means every client, *A means every ATC, *P means every pilot, @ means clients in range (see pyfsd.define.broadcast.at_checker)

required
lines bytes

lines to be sent.

()
custom_at_checker Optional[BroadcastChecker]

Custom checker used when to_limiter is @.

None

Returns:

Type Description
bool

Whether the lines were sent to at least one client.

Raises:

Type Description
NotImplementedError

When an unsupported to_limiter specified.

Source code in src/pyfsd/protocol/client.py
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
def multicast(
    self,
    to_limiter: str,
    *lines: bytes,
    custom_at_checker: Optional[BroadcastChecker] = None,
) -> bool:
    """Broadcast lines to the group of clients selected by a limiter.

    Args:
        to_limiter: Destination limiter. `*` means every client, `*A` means
            every ATC, `*P` means every pilot, and a limiter starting with `@`
            means clients in range (see [pyfsd.define.broadcast.at_checker][]).
        lines: Lines to be sent.
        custom_at_checker: Custom checker used when to_limiter starts with `@`.

    Returns:
        The result of the factory's broadcast.

    Raises:
        RuntimeError: When no client is registered on this protocol.
        NotImplementedError: When an unsupported to_limiter is specified.
    """
    if self.client is None:
        raise RuntimeError("No client registered.")

    if to_limiter == "*":
        # No checker given: the broadcast's default one is used.
        return self.factory.broadcast(*lines, from_client=self.client)

    if to_limiter == "*A":
        checker = all_ATC_checker
    elif to_limiter == "*P":
        checker = all_pilot_checker
    elif to_limiter.startswith("@"):
        checker = at_checker if custom_at_checker is None else custom_at_checker
    else:
        raise NotImplementedError

    return self.factory.broadcast(
        *lines,
        check_func=checker,
        from_client=self.client,
    )
reset_timeout_killer
reset_timeout_killer() -> None

Reset timeout killer.

Source code in src/pyfsd/protocol/client.py
288
289
290
291
292
293
294
295
296
297
298
299
def reset_timeout_killer(self) -> None:
    """Cancel the pending timeout killer, if any, and start a new one.

    The new task waits 500 seconds, then sends a timeout line, logs the kick
    and closes the connection via `kill_after_1sec`.
    """

    async def timeout_killer() -> None:
        await asleep(500)
        self.send_line(b"# Timeout")
        await logger.ainfo(f"Kicking {self.get_description()}: timeout")
        self.kill_after_1sec()

    pending = self.timeout_killer_task
    if pending:
        # Drop the previous countdown before starting over.
        pending.cancel()
    self.timeout_killer_task = create_task(timeout_killer())
send_error
send_error(errno: FSDClientError, *, env: bytes = b'', fatal: bool = False) -> None

Send an error to client.

$ERserver:(callsign):(errno):(env):error_text

Parameters:

Name Type Description Default
errno FSDClientError

The error to be sent.

required
env bytes

The error env.

b''
fatal bool

Disconnect after the error is sent or not.

False
Source code in src/pyfsd/protocol/client.py
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
def send_error(
    self, errno: FSDClientError, *, env: bytes = b"", fatal: bool = False
) -> None:
    """Report an error to the client, optionally disconnecting it.

    $ERserver:(callsign):(errno):(env):error_text

    Args:
        errno: The error to be sent.
        env: The error env.
        fatal: Whether to log the kick and disconnect after the error is sent.
    """
    error_text = str(errno)
    # Before login there is no callsign to address the error to.
    if self.client is not None:
        recipient = self.client.callsign
    else:
        recipient = b"unknown"
    error_packet = make_packet(
        FSDClientCommand.ERROR + b"server",
        recipient,
        f"{int(errno):03d}".encode(),
        env,
        error_text.encode("ascii"),
    )
    self.send_lines(error_packet)
    if not fatal:
        return
    logger.info("Kicking %s: %s", self.get_description(), error_text)
    self.kill_after_1sec()
send_motd
send_motd() -> None

Send motd to client.

Source code in src/pyfsd/protocol/client.py
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
def send_motd(self) -> None:
    """Send the version banner followed by the motd lines to the client.

    Raises:
        RuntimeError: When no client is registered on this protocol.
    """
    if not self.client:
        raise RuntimeError("No client registered.")
    callsign = self.client.callsign
    banner = b"#TMserver:%s:PyFSD %s" % (callsign, version)
    # Each motd line becomes its own message packet from "server".
    motd_packets = [
        make_packet(
            FSDClientCommand.MESSAGE + b"server",
            callsign,
            text,
        )
        for text in self.factory.motd
    ]
    self.send_lines(banner, *motd_packets)

check_packet

check_packet(require_parts: int, callsign_position: int = 0, *, check_callsign: bool = True, need_login: bool = True) -> Callable[[Callable[Concatenate[_T_ClientProtocol, tuple[bytes, ...], P], Union[Awaitable[HandleResult], HandleResult]]], Callable[Concatenate[_T_ClientProtocol, tuple[bytes, ...], P], Awaitable[HandleResult]]]

Create a decorator to auto check packet format and ensure awaitable.

Designed for ClientProtocol's handlers.

Parameters:

Name Type Description Default
require_parts int

How many parts required.

#AA1012:gamecss:mentally broken
[0    ] [1    ] [2            ] => 3 parts
required
callsign_position int

Which part contains callsign, used when (need_login and check_callsign). For example:

#AA1012:gamecss:mentally broken
[0    ] [1    ] [2            ]

Here parts[0] is the callsign, so callsign_position is 0.

0
need_login bool

Whether self.client must not be None (i.e. the client has logged in).

True
check_callsign bool

Whether to check that packet[callsign_position] == self.client.callsign.

True
Source code in src/pyfsd/protocol/client.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
def check_packet(
    require_parts: int,
    callsign_position: int = 0,
    *,
    check_callsign: bool = True,
    need_login: bool = True,
) -> Callable[
    [
        Callable[
            Concatenate[_T_ClientProtocol, tuple[bytes, ...], P],
            Union[Awaitable[HandleResult], HandleResult],
        ]
    ],
    Callable[
        Concatenate[_T_ClientProtocol, tuple[bytes, ...], P], Awaitable[HandleResult]
    ],
]:
    """Create a decorator to auto check packet format and ensure awaitable.

    Designed for ClientProtocol's handlers. The decorated handler may be
    either synchronous or asynchronous; the returned wrapper is always a
    coroutine function and keeps the handler's name and docstring.

    Args:
        require_parts: How many parts required.

                #AA1012:gamecss:mentally broken
                [0    ] [1    ] [2            ] => 3 parts
        callsign_position: Which part contains callsign, used when (need_login and
            check_callsign). For example:

                #AA1012:gamecss:mentally broken
                [0    ] [1    ] [2            ]

            Here parts[0] is the callsign, so `callsign_position` is 0.
        need_login: Whether self.client must not be None (i.e. logged in).
        check_callsign: Whether packet[callsign_position] must equal
            self.client.callsign.

    Returns:
        A decorator. The wrapped handler returns ``(False, False)`` without
        calling the original handler when any check fails.
    """
    # Imported locally so this block does not rely on the module's import list.
    from functools import wraps

    def decorator(
        func: Callable[
            Concatenate[_T_ClientProtocol, tuple[bytes, ...], P],
            Union[Awaitable[HandleResult], HandleResult],
        ],
    ) -> Callable[
        Concatenate[_T_ClientProtocol, tuple[bytes, ...], P],
        Awaitable[HandleResult],
    ]:
        # wraps() keeps __name__/__doc__/__wrapped__ of the real handler, so
        # logs, tracebacks and generated docs do not all show "realfunc".
        @wraps(func)
        async def realfunc(
            self: _T_ClientProtocol,
            packet: tuple[bytes, ...],
            /,
            *args: P.args,
            **kwargs: P.kwargs,
        ) -> HandleResult:
            # Too few parts: report a syntax error to the client.
            if len(packet) < require_parts:
                self.send_error(FSDClientError.SYNTAX)
                return (False, False)
            if need_login:
                # Not logged in yet: reject silently.
                if self.client is None:
                    return (False, False)
                if check_callsign and self.client.callsign != packet[callsign_position]:
                    self.send_error(FSDClientError.SRCINVALID, env=packet[0])
                    return (False, False)
            result = func(self, packet, *args, **kwargs)
            # Handlers may be sync or async; only await when needed.
            if isawaitable(result):
                return await cast("Awaitable[HandleResult]", result)
            return cast("HandleResult", result)

        return realfunc

    return decorator


setup_logger

Logger configurer.

LoggerConfig

Bases: TypedDict

Type of value of logging.config.dictConfig.loggers.

PyFSDLoggerConfig

Bases: TypedDict

PyFSD logger config.

Attributes:

Name Type Description
handlers dict[str, Union[dict, HandlerConfig]]

See dictConfig.

loggers dict[str, Union[dict, HandlerConfig]]

See dictConfig.

include_extra NotRequired[bool]

Print log's extra or not.

extract_record NotRequired[bool]

Extract thread and process names and add them to the event dict.

TimeFormatConfig

Bases: TypedDict

Config of time formatter.

The attribute documentation is taken from structlog.processors.TimeStamper.

Attributes:

Name Type Description
fmt Optional[str]

strftime format string, or "iso" for ISO 8601, or "timestamp" for a UNIX timestamp.

utc bool

Whether timestamp should be in UTC or local time.

key str

Target key in event_dict for added timestamps.

make_filtering_stdlib_bound_logger

make_filtering_stdlib_bound_logger(min_level: int) -> type[BoundLogger]

Create a new BoundLogger that only logs min_level or higher.

Source code in src/pyfsd/setup_logger.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def make_filtering_stdlib_bound_logger(min_level: int) -> type[stdlib.BoundLogger]:
    """Create a new BoundLogger that only logs min_level or higher.

    Args:
        min_level: Numeric logging level. Every level-named method below it
            (and its async twin) is replaced by a no-op, and ``log``/``alog``
            drop calls whose numeric level is lower.

    Returns:
        ``stdlib.BoundLogger`` itself when ``min_level`` is ``NOTSET``,
        otherwise a filtering subclass of it.
    """
    if min_level == NOTSET:
        return stdlib.BoundLogger

    def do_nothing(*_: object, **__: object) -> None:
        return None

    async def async_do_nothing(*_: object, **__: object) -> None:
        return None

    class BoundLogger(stdlib.BoundLogger):
        def log(
            self,
            level: int,
            event: Union[str, None] = None,
            *args: object,
            **kw: object,
        ) -> object:
            if level < min_level:
                return None
            return super().log(level, event, *args, **kw)

        async def alog(  # codespell:ignore alog
            self, level: object, event: str, *args: object, **kw: object
        ) -> None:
            # Only numeric levels can be compared against the threshold.
            if isinstance(level, int) and level < min_level:
                return None
            return await super().alog(  # codespell:ignore alog
                level, event, *args, **kw
            )

        # Independent checks (not an elif chain): a threshold above one level
        # is also above every lower level, so all of them must be silenced.
        if min_level > CRITICAL:
            critical = do_nothing
            fatal = do_nothing
            acritical = async_do_nothing
            afatal = async_do_nothing
        if min_level > ERROR:
            error = do_nothing
            exception = do_nothing
            aerror = async_do_nothing
            aexception = async_do_nothing
        if min_level > WARNING:
            warning = do_nothing
            warn = do_nothing
            awarning = async_do_nothing
        if min_level > INFO:
            info = do_nothing
            ainfo = async_do_nothing
        if min_level > DEBUG:
            debug = do_nothing
            adebug = async_do_nothing

    return BoundLogger


setup_logger

setup_logger(config: PyFSDLoggerConfig, *, finalize: bool = False) -> None

Setup logger with config.

Parameters:

Name Type Description Default
config PyFSDLoggerConfig

logger config.

required
finalize bool

Make loggers suitable for the finalizing phase of CPython.

False
Source code in src/pyfsd/setup_logger.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
def setup_logger(config: PyFSDLoggerConfig, *, finalize: bool = False) -> None:
    """Setup logger with config.

    Configures the standard library logging (through ``dictConfig``) with
    three formatters named ``plain``, ``json`` and ``colored``, then
    configures structlog to hand its events to those formatters.

    Args:
        config: Logger config. ``handlers`` and ``logger`` are required;
            ``include_extra``, ``extract_record`` and ``time`` are optional.
            The config is not modified.
        finalize: Make loggers suitable for the finalizing phase of CPython
            (ISO timestamps and plain tracebacks are used).

    Raises:
        KeyError: If ``config["logger"]["level"]`` is not one of the
            upper-case standard level names.
    """

    def append_funcname_lineno(_: object, __: str, event: EventDict) -> EventDict:
        """Merge the callsite function name and line number into one key."""
        event["logger_name"] = f"{event.pop('func_name')}:{event.pop('lineno')}"
        return event

    reset_defaults()
    include_extra = config.get("include_extra", False)
    extract_record = config.get("extract_record", False)
    # Work on a copy: the adjustments below must not leak into the caller's
    # config dict.
    time_config = config.get(
        "time", {"fmt": "%Y-%m-%d %H:%M:%S", "utc": False, "key": "timestamp"}
    ).copy()
    if finalize:
        # For some reason strftime() won't work when cpython is finalizing
        time_config["fmt"] = "ISO"
    elif time_config["fmt"] == "timestamp":
        # TimeStamper expects fmt=None for a UNIX timestamp.
        time_config["fmt"] = None
    timestamper = processors.TimeStamper(**time_config)
    pre_chain = [
        # Add the log level and a timestamp to the event_dict if the log entry
        # is not from structlog.
        stdlib.add_log_level,
        stdlib.add_logger_name,
        timestamper,
    ]

    if include_extra:
        pre_chain.append(stdlib.ExtraAdder())

    def suppress_extra(_: object, __: str, event_dict: dict) -> dict:
        """Remove log's extra."""
        event_dict.pop("extra", None)
        return event_dict

    def extract_from_record(_: object, __: str, event_dict: dict) -> dict:
        """Extract thread and process names and add them to the event dict."""
        record = event_dict["_record"]
        event_dict["thread_name"] = record.threadName
        event_dict["process_name"] = record.processName
        return event_dict

    extra_dealers = []
    if not include_extra:
        extra_dealers.append(suppress_extra)
    # NOTE(review): because of this elif, extract_record only takes effect
    # when include_extra is enabled -- confirm this coupling is intended.
    elif extract_record:
        extra_dealers.append(extract_from_record)

    # Both console formatters share the same traceback style.
    exception_formatter = dev.plain_traceback if finalize else dev.better_traceback

    dictConfig(
        {
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {
                "plain": {
                    "()": stdlib.ProcessorFormatter,
                    "processors": [
                        *extra_dealers,
                        stdlib.ProcessorFormatter.remove_processors_meta,
                        dev.ConsoleRenderer(
                            colors=False,
                            exception_formatter=exception_formatter,
                        ),
                    ],
                    "foreign_pre_chain": pre_chain,
                },
                "json": {
                    "()": stdlib.ProcessorFormatter,
                    "processors": [
                        *extra_dealers,
                        stdlib.ProcessorFormatter.remove_processors_meta,
                        processors.JSONRenderer(),
                    ],
                    "foreign_pre_chain": pre_chain,
                },
                "colored": {
                    "()": stdlib.ProcessorFormatter,
                    "processors": [
                        *extra_dealers,
                        stdlib.ProcessorFormatter.remove_processors_meta,
                        dev.ConsoleRenderer(
                            colors=True,
                            exception_formatter=exception_formatter,
                        ),
                    ],
                    "foreign_pre_chain": pre_chain,
                },
            },
            "handlers": config["handlers"],  # type: ignore[typeddict-item]
            "loggers": {"": config["logger"]},  # type: ignore[dict-item]
        }
    )
    configure(
        processors=[
            processors.CallsiteParameterAdder(
                [
                    processors.CallsiteParameter.FUNC_NAME,
                    processors.CallsiteParameter.LINENO,
                ]
            ),
            append_funcname_lineno,
            stdlib.add_log_level,
            stdlib.add_logger_name,
            stdlib.PositionalArgumentsFormatter(),
            timestamper,
            processors.StackInfoRenderer(),
            stdlib.ProcessorFormatter.wrap_for_formatter,
        ],
        logger_factory=stdlib.LoggerFactory(),
        # The bound logger filters with the same threshold as the root logger.
        wrapper_class=make_filtering_stdlib_bound_logger(
            {
                "NOTSET": 0,
                "DEBUG": 10,
                "INFO": 20,
                "WARNING": 30,
                "ERROR": 40,
                "CRITICAL": 50,
            }[config["logger"]["level"]]
        ),
        cache_logger_on_first_use=True,
    )


utils

PyFSD CLI utilities.

import_users

A tool used to convert a user database in another format into PyFSD's format.

main

main() -> None

Main function of the tool.

Source code in src/pyfsd/utils/import_users/__init__.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def main() -> None:
    """Run the import tool from the command line.

    Parses the arguments, loads the PyFSD config to find the database URL,
    reads every user with the selected format and inserts them into the
    users table. Passwords are hashed first unless the format declares them
    already argon2-hashed. Callsigns that already exist are reported and
    skipped.
    """
    cli = ArgumentParser(
        description="convert users database in other format into PyFSD's format"
    )
    cli.add_argument("filename", help="filename of the original file")
    cli.add_argument(
        "format",
        help="format of the original file",
        choices=["cfcsim", "pyfsd", "fsd"],
    )
    cli.add_argument(
        "-v", "--verbose", help="increase output verbosity", action="store_true"
    )
    cli.add_argument(
        "-c", "--config-path", help="path to the config of PyFSD", default="pyfsd.toml"
    )
    options = cli.parse_args()

    with open(options.config_path, "rb") as config_fp:
        config = tomllib.load(config_fp)

    # Only the database URL is needed; other config keys are tolerated.
    expected_shape = {"pyfsd": {"database": {"url": str}}}
    assert_dict(config, expected_shape, allow_extra_keys=True)

    source_format = formats[options.format]
    users = source_format.read_all(options.filename)
    engine = create_engine(config["pyfsd"]["database"]["url"])
    password_hasher = PasswordHasher()
    imported = 0

    with engine.connect() as conn:
        for user in users:
            if options.verbose:
                # ruff: noqa: T201
                print("Converting", *user)
            try:
                if source_format.argon2_hashed:
                    row = user
                else:
                    row = (user[0], password_hasher.hash(user[1]), user[2])
                conn.execute(users_table.insert().values(row))
            except IntegrityError:
                # ruff: noqa: T201
                print("Callsign already exist:", user[0])
                continue
            imported += 1
        conn.commit()

    # ruff: noqa: T201
    print(f"Done. ({imported})")


__main__

Entrypoint of pyfsd.utils.import_users.

formats

User database formats.

Attributes:

Name Type Description
User

description of a user, (callsign, password, rating)

formats dict[str, Format]

All registered formats.

CFCSIMFSDFormat

User database format of cfcsim modified fsd.

read_all staticmethod
read_all(filename: str) -> tuple[User, ...]

Read all users.

Source code in src/pyfsd/utils/import_users/formats.py
54
55
56
57
58
59
60
61
62
@staticmethod
def read_all(filename: str) -> tuple[User, ...]:
    """Read all users.

    Args:
        filename: The filename of the database, passed to ``connect``.

    Returns:
        One ``(callsign, password, level)`` row per entry of the ``cert``
        table.
    """
    db = connect(filename)
    try:
        cur = db.cursor()
        try:
            # Materialise the rows while the cursor is still open.
            return tuple(
                cur.execute("SELECT callsign, password, level FROM cert;")
            )
        finally:
            cur.close()
    finally:
        # Close the connection even when the query raises.
        db.close()
FSDTextFormat

User database format of original fsd.

read_all staticmethod
read_all(filename: str) -> tuple[User, ...]

Read all users.

Source code in src/pyfsd/utils/import_users/formats.py
70
71
72
73
74
75
76
77
78
79
@staticmethod
def read_all(filename: str) -> tuple[User, ...]:
    """Read all users.

    The file is parsed as space-delimited rows of callsign, password and
    rating. Blank lines and rows whose first field starts with ``;`` are
    skipped.

    Args:
        filename: The filename of the text database.

    Returns:
        One ``(callsign, password, rating)`` tuple per user row, with the
        rating converted to ``int``.
    """
    users = []
    # newline="" leaves line-ending handling to the row reader.
    with open(filename, newline="") as file:
        for row in reader(file, delimiter=" "):
            # A blank line yields an empty row; indexing it would raise.
            if not row or row[0].startswith(";"):
                continue
            users.append((row[0], row[1], int(row[2])))
    return tuple(users)
Format

Bases: Protocol

Format of user database.

Attributes:

Name Type Description
argon2_hashed bool

Password hashed by argon2 or not.

read_all
read_all(filename: str) -> tuple[User, ...]

Read all users from database.

Parameters:

Name Type Description Default
filename str

The filename of the database.

required
Source code in src/pyfsd/utils/import_users/formats.py
24
25
26
27
28
29
30
def read_all(self, filename: str) -> tuple[User, ...]:
    """Load every user stored in the given database.

    This definition only declares the interface and never returns.

    Args:
        filename: The filename of the database.

    Raises:
        NotImplementedError: Always, in this definition.
    """
    raise NotImplementedError
PyFSDFormat

User database format of PyFSD.

read_all staticmethod
read_all(filename: str) -> tuple[User, ...]

Read all users.

Source code in src/pyfsd/utils/import_users/formats.py
38
39
40
41
42
43
44
45
46
@staticmethod
def read_all(filename: str) -> tuple[User, ...]:
    """Read all users.

    Args:
        filename: The filename of the database, passed to ``connect``.

    Returns:
        One ``(callsign, password, rating)`` row per entry of the ``users``
        table.
    """
    db = connect(filename)
    try:
        cur = db.cursor()
        try:
            # Materialise the rows while the cursor is still open.
            return tuple(
                cur.execute("SELECT callsign, password, rating FROM users;")
            )
        finally:
            cur.close()
    finally:
        # Close the connection even when the query raises.
        db.close()