"""
Core module
===========
This module provides functions to compute and visualize data delivery scenarios.
Instrument context
------------------
The :class:`InstrumentContext` class defines the constraints from the
instrument, such as the segment duration, the session duration, the data
generation rate, etc.
The default values are based on ESA-LISA-EST-MIS-TN-0003 - LISA Science Data
Description and Budget (Iss1Rev0), assuming no data loss. You can change these
values to match your specific context.
>>> context = InstrumentContext(packet_loss_rate=0.1)
Derived parameters, such as the volume of live data to download per telemetry
session, the necessary download rate to send all live data in a session, the
time to download all archived data, etc., are computed from the context. You can
print a summary of these parameters.
>>> context.print_derived_parameters()
Volume of archived data to download per telemetry session: 6423.926 Mbits
Necessary download rate to send all live data in a session: 223.1 kbps
Volume of priority-1 data in one segement: 5316.3 kbits
Download time for one priority-1 data segment: 23.8 s
Number of segments that can be downloaded between priority live segments: 11
Time to download all 192 archived data: 1.5 h
.. autoclass:: InstrumentContext
:members:
Telemetry strategy
------------------
We currently implement a single telemetry strategy, the
:class:`DefaultTelemetryStrategy`. In this strategy, we download the live data
segments as soon as they are available, and we download the archived data
segments in the idle time between two live segments.
Packets that are lost during the download are retransmitted at the end of the
download queue between live segments, once all archived data has been
downloaded.
.. autoclass:: DefaultTelemetryStrategy
Telemetry simulation
--------------------
You can simulate a telemetry session using the :func:`simulate_telemetry`
function. You can provide a custom telemetry strategy, an instrument context, and
a random seed for data packet loss simulation.
>>> strategy = DefaultTelemetryStrategy()
>>> simulation = simulate_telemetry(strategy)
>>> len(simulation.received_packets)
287
>>> simulation.lost_packets
[]
You can also use this function to plot the received segments and the accumulated
data at each data packet reception time.
>>> simulation.plot_received_segments()
>>> simulation.plot_accumulated_data()
.. autoclass:: TelemetrySimulation
:members:
Types
-----
.. autoclass:: Time
.. autoclass:: DataPacket
.. autoclass:: DataSegment
:members:
.. autofunction:: merge_segements
"""
from __future__ import annotations
import math
from functools import cached_property
from typing import TypeAlias
import attrs
import matplotlib.pyplot as plt
import numpy as np
from lisaconstants import c
from numpy.typing import ArrayLike
Time: TypeAlias = float
"""Time [s].
The time scale is here given by the user, and depends on the context.
"""
[docs]
@attrs.frozen(kw_only=True)
class InstrumentContext:
"""Defines the contrainsts from the instrument.
Default values are based on ESA-LISA-EST-MIS-TN-0003 - LISA Science Data
Description and Budget (Iss1Rev0), denoted as [AD1].
"""
segment_duration: float = 5 * 60.0
"""Duration of each segment [s].
A segment is the smallest unit of data that can be delivered.
"""
session_duration: float = 8 * 3600.0
"""Duration of a telemetry session [s].
A telemetry session is a commmunication session between the LISA instrument
and the ground station. During a telemetry session, commands are sent to the
instrument and data segements are received.
"""
session_interval: float = 24 * 3600.0
"""Interval between two telemetry sessions [s].
The interval between two telemetry sessions is the time between the
beginning of two consecutive telemetry sessions.
"""
sc_generation_rate: float = 10.98e3
"""Data generation rate per spacecraft, without margin [bps]."""
priority_sc_generation_rate: float = 3.58e3
"""Priority-1 data generation rate per spacecraft, without margin [bps]."""
generation_rate_margin: float = 0.5
"""Margin on the data generation rate per spacecraft (relative value)."""
packet_overhead: float = 0.1
"""Packet overhead for data generation (relative value)."""
downlink_overhead: float = 20e3
"""Overhead for data downlink [bps]."""
sc2sc_transfer_time: float = 2.5e9 / c
"""Spacecraft-to-spacecraft transfer time [s]."""
sc2ground_transfer_time: float = 50e9 / c
"""Spacecraft-to-ground transfer time [s]."""
packet_loss_rate: float = 0.0
"""Data packet loss rate during transfer."""
@cached_property
def session_idle_time(self) -> float:
"""Time between two telemetry sessions [s]."""
return self.session_interval - self.session_duration
@cached_property
def total_transfer_time(self) -> float:
"""Total transfer time [s].
This includes the time to transfer data between spacecraft and from the
time to transfer data from the spacecraft to the ground station.
"""
return self.sc2sc_transfer_time + self.sc2ground_transfer_time
@cached_property
def constellation_generation_rate(self) -> float:
"""Data generation rate for the whole constellation [bps].
This is for 3 spacecraft, including margins and overheads.
"""
return (
3
* self.sc_generation_rate
* (1 + self.generation_rate_margin)
* (1 + self.packet_overhead)
+ self.downlink_overhead
)
@cached_property
def priority_constellation_generation_rate(self) -> float:
"""Priority-1 data generation rate for the whole constellation [bps].
This is for 3 spacecraft, including margins and overheads.
"""
# TODO: This does not include downlink overhead?
return (
3
* self.priority_sc_generation_rate
* (1 + self.generation_rate_margin)
* (1 + self.packet_overhead)
)
@cached_property
def archived_data_volume_per_session(self) -> float:
"""Volume of archived data per session including marging [bits].
This represent all the data (including low-priority data) that has been
generated since the last telemetry session; note that only priority data
has been sent during the previous session, so we need to resend
everything (this is why we are using :attr:`session_interval` and not
:attr:`session_idle_time`).
"""
return self.constellation_generation_rate * self.session_interval
@cached_property
def download_rate(self) -> float:
"""Necessary download rate to transmit all archived data [bps]."""
return self.archived_data_volume_per_session / self.session_duration
@cached_property
def data_volume_per_priority_segment(self) -> float:
"""Volume of data per segment [bits]."""
return self.segment_duration * self.priority_constellation_generation_rate
@cached_property
def download_time_per_priority_segment(self) -> float:
"""Time to download a segment [s]."""
return self.data_volume_per_priority_segment / self.download_rate
@cached_property
def idle_time_between_priority_live_segments(self) -> float:
"""Idle time between two live data segments [s].
This time is typically used to download archived data.
"""
return self.segment_duration - self.download_time_per_priority_segment
@cached_property
def downloaded_segments_between_priority_live_segments(self) -> int:
"""Number of segments that can be downloaded between live segments."""
return math.floor(
self.idle_time_between_priority_live_segments
* self.download_rate
/ self.data_volume_per_priority_segment
)
@cached_property
def archived_priority_data_segments(self) -> int:
"""Number of priority-1 archived segments to be downloaded in a session.
In Chop Chop, we focus on priority-1 (science) data.
"""
return math.ceil(
self.session_idle_time
* self.priority_constellation_generation_rate
/ self.data_volume_per_priority_segment
)
@cached_property
def priority_archived_data_download_time(self) -> float:
"""Time to download all priority-1 archived data during a session [s].
After this time, we will have downloaded all the archived data and we
only need to transmit the live data (and eventually resend lost
packets).
"""
return (
self.archived_priority_data_segments
/ self.downloaded_segments_between_priority_live_segments
* self.segment_duration
)
[docs]
def print_derived_parameters(self) -> None:
"""Print a summary of derived parameters."""
print(
"Volume of archived data to download per telemetry session:"
f" {self.archived_data_volume_per_session / 1e6:.3f} Mbits\n"
f"Necessary download rate to send all live data in a session:"
f" {self.download_rate / 1e3:.1f} kbps\n"
f"Volume of priority-1 data in one segement:"
f" {self.data_volume_per_priority_segment / 1e3} kbits\n"
f"Download time for one priority-1 data segment:"
f" {self.download_time_per_priority_segment:.1f} s\n"
f"Number of segments that can be downloaded between priority live segments:"
f" {self.downloaded_segments_between_priority_live_segments}\n"
f"Time to download all priority-1 archived data:"
f" {self.priority_archived_data_download_time / 3600:.1f} h",
)
[docs]
@attrs.frozen(slots=True)
class DataSegment:
"""Data segment."""
start_time: Time
"""Segment start time [s]."""
duration: float
"""Segment duration [s]."""
@property
def end_time(self) -> Time:
"""Segment end time [s]."""
return self.start_time + self.duration
[docs]
@classmethod
def from_start_end(cls, start_time: Time, end_time: Time) -> DataSegment:
"""Create a data segment from start and end times."""
return cls(start_time, end_time - start_time)
def __contains__(self, time: Time) -> bool:
"""Check if a time is within the segment.
Parameters
----------
time : float
Time to check [s].
Returns
-------
bool
True if the time is within the segment.
"""
return self.start_time <= time <= self.end_time
[docs]
def overlaps(self, other: DataSegment) -> bool:
"""Check if the segment overlaps another segment.
Parameters
----------
other : DataSegment
Other segment.
Returns
-------
bool
True if the segments overlap.
"""
return self.start_time < other.end_time and other.start_time < self.end_time
[docs]
def merge(self, other: DataSegment) -> DataSegment:
"""Merge two segments.
Parameters
----------
other : DataSegment
Other segment.
Returns
-------
DataSegment
Merged segment.
Raises
------
ValueError
If the segments do not overlap.
"""
if not self.overlaps(other):
raise ValueError("Segments do not overlap")
return DataSegment.from_start_end(
min(self.start_time, other.start_time),
max(self.end_time, other.end_time),
)
def __repr__(self) -> str:
return f"DataSegment({self.start_time:.1f} to {self.end_time:.1f})"
[docs]
def merge_segements(
segments: list[DataSegment], *, ordered: bool = False
) -> list[DataSegment]:
"""Merge overlapping segments.
Parameters
----------
segments : list[DataSegment]
List of data segments.
ordered : bool
If True, the segments are assumed to be ordered by start time.
Returns
-------
list[DataSegment]
List of merged data segments as a new list.
"""
# We start by sorting the segments by start time
if not ordered:
sorted_segments = sorted(segments, key=lambda x: x.start_time)
else:
sorted_segments = segments
# We merge the segments
merged_segments: list[DataSegment] = []
current_segment = sorted_segments[0]
for segment in sorted_segments[1:]:
if current_segment.overlaps(segment):
current_segment = current_segment.merge(segment)
else:
merged_segments.append(current_segment)
current_segment = segment
merged_segments.append(current_segment)
return merged_segments
DataPacket: TypeAlias = DataSegment
"""For now, we represent a data packet as a data segment."""
[docs]
class DefaultTelemetryStrategy:
"""Define the default download strategy.
In this strategy, we download the live data segments as soon as they are
available, and we download the archived data segments in the idle time
between two live segments.
Packets that are lost during the download are retransmitted at the end of
the download queue between live segments, once all archived data has been
downloaded.
"""
def simulate_telemetry(
self, context: InstrumentContext, seed: int
) -> TelemetrySimulation:
"""Simulate a telemetry session.
Parameters
----------
context : InstrumentContext
Instrument context.
seed : int
Random seed used for data packet loss simulation.
Returns
-------
TelemetrySimulation
Telemetry simulation results.
"""
# Create random number generator from seed
random_gen = np.random.default_rng(seed)
# First element is the first element to be downloaded
download_queue: list[DataSegment] = [
DataSegment(-i * context.segment_duration, context.segment_duration)
for i in range(1, context.archived_priority_data_segments)
]
# List to be filled by the simulation
reception_times: list[Time] = []
received_packets: list[DataPacket] = []
lost_packets: list[tuple[Time, DataPacket]] = []
# Time at the communicating spacecraft
sc_current_time = 0.0
# Time at which the next live segment will be available
next_live_segment_start_time = 0.0
while sc_current_time < context.session_duration:
# Clean temp vars to avoid being polluted by the previous loop
new_segment = None
# Check if new kive segment is available
if sc_current_time >= next_live_segment_start_time:
# Live segments have priority
live_segment = DataSegment(
next_live_segment_start_time, context.segment_duration
)
download_queue.insert(0, live_segment)
next_live_segment_start_time += context.segment_duration
# Try to download a segment
try:
new_segment = download_queue.pop(0)
except IndexError:
# Need to wait for the next NRT segment
sc_current_time = next_live_segment_start_time
continue
if random_gen.random() < context.packet_loss_rate:
# Lost packet so add it at the end of the download queue
lost_packets.append((sc_current_time, new_segment))
download_queue.append(new_segment)
else:
# Add the segment to the list of downloaded segment
reception_times.append(sc_current_time)
received_packets.append(new_segment)
# Update the time
sc_current_time += context.download_time_per_priority_segment
return TelemetrySimulation(
context=context,
strategy=self,
reception_times=reception_times,
received_packets=received_packets,
lost_packets=lost_packets,
)
def simulate_telemetry(
strategy: DefaultTelemetryStrategy,
context: InstrumentContext | None = None,
seed: int = 0,
) -> TelemetrySimulation:
"""Simulate a telemetry session.
Parameters
----------
strategy : TelemetryStrategy
Telemetry strategy.
context : InstrumentContext or None
Instrument context. If None, the default context is used.
seed : int
Random seed used for data packet loss simulation.
Returns
-------
TelemetrySimulation
Telemetry simulation results.
"""
# Use default context if not provided
if context is None:
context = InstrumentContext()
return strategy.simulate_telemetry(context, seed)
[docs]
@attrs.frozen(kw_only=True)
class TelemetrySimulation:
"""Results of a telemetry simulation."""
context: InstrumentContext
"""Instrument context."""
strategy: DefaultTelemetryStrategy
"""Telemetry strategy."""
reception_times: list[Time]
"""List of data packet reception times (in increasing order) [s]."""
received_packets: list[DataPacket]
"""List of received packets in the same order as :attr:`reception_times`."""
lost_packets: list[tuple[Time, DataPacket]]
"""List of lost packets and their expected reception times [s]."""
@cached_property
def accumulated_data(self) -> list[list[DataSegment]]:
"""Available data at each data packet reception time [s].
For each packet reception time in :attr:`reception_times`, we provide
the list of contiguous data segments available at that time.
"""
accumulated_data: list[list[DataSegment]] = []
current_data: list[DataSegment] = []
for packet in self.received_packets:
current_data = merge_segements(current_data + [packet])
accumulated_data.append(current_data)
return accumulated_data
[docs]
def compute_masks(self, data_times: ArrayLike) -> np.ndarray:
"""Compute masks to be applied on a continuous data array.
Each mask select the available (accumulated packets) data segments at
each reception time, given in :attr:`reception_times`.
Parameters
----------
data_times : array-like of shape (M,)
Times attached to the data array to be masked [s]. The times should
be represented as the number of seconds since the current telemetry
session start.
Returns
-------
Array, of shape (N, M)
The masks are returned as a 2-dimensional array. The first dimension
corresponds to the data packet reception time (when the mask becomes
valid) and is of size ``N = len(self.reception_times)``, and the
second dimension corresponds to the data times.
"""
data_times = np.asarray(data_times)
masks = np.full((len(self.reception_times), len(data_times)), False)
for i, segment in enumerate(self.received_packets):
condition = np.vectorize(segment.__contains__)(data_times)
masks[i:, condition] = True
return masks
[docs]
def plot_received_segments(self) -> None:
"""Plot received segments."""
plt.figure(figsize=(10, 7))
for trec, packet in zip(self.reception_times, self.received_packets):
plt.plot(
np.array([packet.start_time, packet.end_time]) / 3600, # in hours
np.ones(2) * trec / 3600, # reception time in hours
color="tab:blue",
)
plt.xlabel("Received segment data coverage [h]")
plt.ylabel("Onground reception time [h]")
plt.title("Received segments")
plt.grid()
[docs]
def plot_accumulated_data(self) -> None:
"""Plot available data at each data packet reception time."""
plt.figure(figsize=(10, 7))
for trec, segments in zip(self.reception_times, self.accumulated_data):
for seg in segments:
plt.plot(
np.array([seg.start_time, seg.end_time]) / 3600, # in hours
np.ones(2) * trec / 3600, # reception time in hours
color="tab:blue",
)
plt.xlabel("Available data [h]")
plt.ylabel("Onground reception time [h]")
plt.title("Accumulated data")
plt.grid()