# Source code for chopchop.core

"""
Core module
===========

This module provides functions to compute and visualize data delivery scenarios.

Instrument context
------------------

The :class:`InstrumentContext` class defines the constraints from the
instrument, such as the segment duration, the session duration, the data
generation rate, etc.

The default values are based on ESA-LISA-EST-MIS-TN-0003 - LISA Science Data
Description and Budget (Iss1Rev0), assuming no data loss. You can change these
values to match your specific context.

>>> context = InstrumentContext(packet_loss_rate=0.1)

Derived parameters, such as the volume of live data to download per telemetry
session, the necessary download rate to send all live data in a session, the
time to download all archived data, etc., are computed from the context. You can
print a summary of these parameters.

>>> context.print_derived_parameters()
Volume of archived data to download per telemetry session: 6423.926 Mbits
Necessary download rate to send all live data in a session: 223.1 kbps
Volume of priority-1 data in one segement: 5316.3 kbits
Download time for one priority-1 data segment: 23.8 s
Number of segments that can be downloaded between priority live segments: 11
Time to download all priority-1 archived data: 1.5 h

.. autoclass:: InstrumentContext
    :members:

Telemetry strategy
------------------

We currently implement a single telemetry strategy, the
:class:`DefaultTelemetryStrategy`. In this strategy, we download the live data
segments as soon as they are available, and we download the archived data
segments in the idle time between two live segments.

Packets that are lost during the download are retransmitted at the end of the
download queue between live segments, once all archived data has been
downloaded.

.. autoclass:: DefaultTelemetryStrategy

Telemetry simulation
--------------------

You can simulate a telemetry session using the :func:`simulate_telemetry`
function. You can provide a custom telemetry strategy, an instrument context, and
a random seed for data packet loss simulation.

>>> strategy = DefaultTelemetryStrategy()
>>> simulation = simulate_telemetry(strategy)
>>> len(simulation.received_packets)
287
>>> simulation.lost_packets
[]

You can also use this function to plot the received segments and the accumulated
data at each data packet reception time.

>>> simulation.plot_received_segments()
>>> simulation.plot_accumulated_data()

.. autoclass:: TelemetrySimulation
    :members:

Types
-----

.. autoclass:: Time

.. autoclass:: DataPacket

.. autoclass:: DataSegment
    :members:

.. autofunction:: merge_segements

"""

from __future__ import annotations

import math
from functools import cached_property
from typing import TypeAlias

import attrs
import matplotlib.pyplot as plt
import numpy as np
from lisaconstants import c
from numpy.typing import ArrayLike

# Module-wide alias: times are plain floats in seconds; the epoch/scale is
# whatever the caller chooses (see docstring below).
Time: TypeAlias = float
"""Time [s].

The time scale is here given by the user, and depends on the context.
"""


@attrs.frozen(kw_only=True)
class InstrumentContext:
    """Defines the constraints from the instrument.

    Default values are based on ESA-LISA-EST-MIS-TN-0003 - LISA Science Data
    Description and Budget (Iss1Rev0), denoted as [AD1].
    """

    segment_duration: float = 5 * 60.0
    """Duration of each segment [s].

    A segment is the smallest unit of data that can be delivered.
    """

    session_duration: float = 8 * 3600.0
    """Duration of a telemetry session [s].

    A telemetry session is a communication session between the LISA
    instrument and the ground station. During a telemetry session, commands
    are sent to the instrument and data segments are received.
    """

    session_interval: float = 24 * 3600.0
    """Interval between two telemetry sessions [s].

    The interval between two telemetry sessions is the time between the
    beginning of two consecutive telemetry sessions.
    """

    sc_generation_rate: float = 10.98e3
    """Data generation rate per spacecraft, without margin [bps]."""

    priority_sc_generation_rate: float = 3.58e3
    """Priority-1 data generation rate per spacecraft, without margin [bps]."""

    generation_rate_margin: float = 0.5
    """Margin on the data generation rate per spacecraft (relative value)."""

    packet_overhead: float = 0.1
    """Packet overhead for data generation (relative value)."""

    downlink_overhead: float = 20e3
    """Overhead for data downlink [bps]."""

    sc2sc_transfer_time: float = 2.5e9 / c
    """Spacecraft-to-spacecraft transfer time [s]."""

    sc2ground_transfer_time: float = 50e9 / c
    """Spacecraft-to-ground transfer time [s]."""

    packet_loss_rate: float = 0.0
    """Data packet loss rate during transfer."""

    @cached_property
    def session_idle_time(self) -> float:
        """Time between two telemetry sessions [s]."""
        return self.session_interval - self.session_duration

    @cached_property
    def total_transfer_time(self) -> float:
        """Total transfer time [s].

        This includes the time to transfer data between spacecraft and the
        time to transfer data from the spacecraft to the ground station.
        """
        return self.sc2sc_transfer_time + self.sc2ground_transfer_time

    @cached_property
    def constellation_generation_rate(self) -> float:
        """Data generation rate for the whole constellation [bps].

        This is for 3 spacecraft, including margins and overheads.
        """
        return (
            3
            * self.sc_generation_rate
            * (1 + self.generation_rate_margin)
            * (1 + self.packet_overhead)
            + self.downlink_overhead
        )

    @cached_property
    def priority_constellation_generation_rate(self) -> float:
        """Priority-1 data generation rate for the whole constellation [bps].

        This is for 3 spacecraft, including margins and overheads.
        """
        # TODO: This does not include downlink overhead?
        return (
            3
            * self.priority_sc_generation_rate
            * (1 + self.generation_rate_margin)
            * (1 + self.packet_overhead)
        )

    @cached_property
    def archived_data_volume_per_session(self) -> float:
        """Volume of archived data per session including margin [bits].

        This represents all the data (including low-priority data) that has
        been generated since the last telemetry session; note that only
        priority data has been sent during the previous session, so we need
        to resend everything (this is why we are using
        :attr:`session_interval` and not :attr:`session_idle_time`).
        """
        return self.constellation_generation_rate * self.session_interval

    @cached_property
    def download_rate(self) -> float:
        """Necessary download rate to transmit all archived data [bps]."""
        return self.archived_data_volume_per_session / self.session_duration

    @cached_property
    def data_volume_per_priority_segment(self) -> float:
        """Volume of data per segment [bits]."""
        return self.segment_duration * self.priority_constellation_generation_rate

    @cached_property
    def download_time_per_priority_segment(self) -> float:
        """Time to download a segment [s]."""
        return self.data_volume_per_priority_segment / self.download_rate

    @cached_property
    def idle_time_between_priority_live_segments(self) -> float:
        """Idle time between two live data segments [s].

        This time is typically used to download archived data.
        """
        return self.segment_duration - self.download_time_per_priority_segment

    @cached_property
    def downloaded_segments_between_priority_live_segments(self) -> int:
        """Number of segments that can be downloaded between live segments."""
        return math.floor(
            self.idle_time_between_priority_live_segments
            * self.download_rate
            / self.data_volume_per_priority_segment
        )

    @cached_property
    def archived_priority_data_segments(self) -> int:
        """Number of priority-1 archived segments to be downloaded in a session.

        In Chop Chop, we focus on priority-1 (science) data.
        """
        return math.ceil(
            self.session_idle_time
            * self.priority_constellation_generation_rate
            / self.data_volume_per_priority_segment
        )

    @cached_property
    def priority_archived_data_download_time(self) -> float:
        """Time to download all priority-1 archived data during a session [s].

        After this time, we will have downloaded all the archived data and we
        only need to transmit the live data (and eventually resend lost
        packets).
        """
        return (
            self.archived_priority_data_segments
            / self.downloaded_segments_between_priority_live_segments
            * self.segment_duration
        )

    def print_derived_parameters(self) -> None:
        """Print a summary of derived parameters."""
        print(
            "Volume of archived data to download per telemetry session:"
            f" {self.archived_data_volume_per_session / 1e6:.3f} Mbits\n"
            f"Necessary download rate to send all live data in a session:"
            f" {self.download_rate / 1e3:.1f} kbps\n"
            f"Volume of priority-1 data in one segement:"
            # Fixed-point format: the raw float repr (e.g. 5316.300000000001)
            # would not match the documented output "5316.3 kbits".
            f" {self.data_volume_per_priority_segment / 1e3:.1f} kbits\n"
            f"Download time for one priority-1 data segment:"
            f" {self.download_time_per_priority_segment:.1f} s\n"
            f"Number of segments that can be downloaded between priority live segments:"
            f" {self.downloaded_segments_between_priority_live_segments}\n"
            f"Time to download all priority-1 archived data:"
            f" {self.priority_archived_data_download_time / 3600:.1f} h",
        )
@attrs.frozen(slots=True)
class DataSegment:
    """Data segment."""

    start_time: Time
    """Segment start time [s]."""

    duration: float
    """Segment duration [s]."""

    @property
    def end_time(self) -> Time:
        """Segment end time [s]."""
        return self.start_time + self.duration

    @classmethod
    def from_start_end(cls, start_time: Time, end_time: Time) -> DataSegment:
        """Create a data segment from start and end times."""
        span = end_time - start_time
        return cls(start_time, span)

    def __contains__(self, time: Time) -> bool:
        """Check if a time is within the segment.

        Parameters
        ----------
        time : float
            Time to check [s].

        Returns
        -------
        bool
            True if the time is within the segment (bounds included).
        """
        # Both bounds are inclusive
        return self.start_time <= time and time <= self.end_time

    def overlaps(self, other: DataSegment) -> bool:
        """Check if the segment overlaps another segment.

        Parameters
        ----------
        other : DataSegment
            Other segment.

        Returns
        -------
        bool
            True if the segments overlap.
        """
        # Strict comparisons: segments that merely touch do not overlap
        return other.end_time > self.start_time and self.end_time > other.start_time

    def merge(self, other: DataSegment) -> DataSegment:
        """Merge two segments.

        Parameters
        ----------
        other : DataSegment
            Other segment.

        Returns
        -------
        DataSegment
            Merged segment.

        Raises
        ------
        ValueError
            If the segments do not overlap.
        """
        if not self.overlaps(other):
            raise ValueError("Segments do not overlap")
        earliest = min(self.start_time, other.start_time)
        latest = max(self.end_time, other.end_time)
        return DataSegment.from_start_end(earliest, latest)

    def __repr__(self) -> str:
        return f"DataSegment({self.start_time:.1f} to {self.end_time:.1f})"
def merge_segements(
    segments: list[DataSegment], *, ordered: bool = False
) -> list[DataSegment]:
    """Merge overlapping segments.

    Parameters
    ----------
    segments : list[DataSegment]
        List of data segments. May be empty, in which case an empty list is
        returned.
    ordered : bool
        If True, the segments are assumed to be ordered by start time.

    Returns
    -------
    list[DataSegment]
        List of merged data segments as a new list.
    """
    # Robustness fix: an empty input previously raised IndexError below
    if not segments:
        return []

    # We start by sorting the segments by start time, unless the caller
    # guarantees that ordering already
    if not ordered:
        sorted_segments = sorted(segments, key=lambda x: x.start_time)
    else:
        sorted_segments = segments

    # Single left-to-right sweep: grow the current run while consecutive
    # segments overlap, flush it when a gap appears
    merged_segments: list[DataSegment] = []
    current_segment = sorted_segments[0]
    for segment in sorted_segments[1:]:
        if current_segment.overlaps(segment):
            current_segment = current_segment.merge(segment)
        else:
            merged_segments.append(current_segment)
            current_segment = segment
    merged_segments.append(current_segment)
    return merged_segments
# A packet is the unit of transmission; modelling it as a segment lets the
# simulation reuse the segment overlap/merge machinery unchanged.
DataPacket: TypeAlias = DataSegment
"""For now, we represent a data packet as a data segment."""
class DefaultTelemetryStrategy:
    """Define the default download strategy.

    In this strategy, we download the live data segments as soon as they are
    available, and we download the archived data segments in the idle time
    between two live segments.

    Packets that are lost during the download are retransmitted at the end of
    the download queue between live segments, once all archived data has been
    downloaded.
    """

    def simulate_telemetry(
        self, context: InstrumentContext, seed: int
    ) -> TelemetrySimulation:
        """Simulate a telemetry session.

        Parameters
        ----------
        context : InstrumentContext
            Instrument context.
        seed : int
            Random seed used for data packet loss simulation.

        Returns
        -------
        TelemetrySimulation
            Telemetry simulation results.
        """
        # Create random number generator from seed (reproducible loss draws)
        random_gen = np.random.default_rng(seed)

        # Pre-fill the queue with the archived segments generated before the
        # session start (negative start times). First element is the first
        # element to be downloaded.
        # NOTE(review): range(1, N) queues N-1 segments, not N — confirm this
        # off-by-one is intended.
        download_queue: list[DataSegment] = [
            DataSegment(-i * context.segment_duration, context.segment_duration)
            for i in range(1, context.archived_priority_data_segments)
        ]

        # Lists to be filled by the simulation
        reception_times: list[Time] = []
        received_packets: list[DataPacket] = []
        lost_packets: list[tuple[Time, DataPacket]] = []

        # Time at the communicating spacecraft
        sc_current_time = 0.0
        # Time at which the next live segment will be available
        next_live_segment_start_time = 0.0

        while sc_current_time < context.session_duration:
            # Clean temp vars to avoid being polluted by the previous loop
            new_segment = None

            # Check if a new live segment is available
            if sc_current_time >= next_live_segment_start_time:
                # Live segments have priority: push to the front of the queue
                live_segment = DataSegment(
                    next_live_segment_start_time, context.segment_duration
                )
                download_queue.insert(0, live_segment)
                next_live_segment_start_time += context.segment_duration

            # Try to download a segment
            try:
                new_segment = download_queue.pop(0)
            except IndexError:
                # Queue drained: need to wait for the next NRT segment
                sc_current_time = next_live_segment_start_time
                continue

            if random_gen.random() < context.packet_loss_rate:
                # Lost packet, so add it at the end of the download queue for
                # retransmission; record the loss event and its time
                lost_packets.append((sc_current_time, new_segment))
                download_queue.append(new_segment)
            else:
                # Add the segment to the list of downloaded segments
                reception_times.append(sc_current_time)
                received_packets.append(new_segment)

            # Advance time by one segment's transmission, whether or not the
            # packet was lost
            sc_current_time += context.download_time_per_priority_segment

        return TelemetrySimulation(
            context=context,
            strategy=self,
            reception_times=reception_times,
            received_packets=received_packets,
            lost_packets=lost_packets,
        )
def simulate_telemetry(
    strategy: DefaultTelemetryStrategy,
    context: InstrumentContext | None = None,
    seed: int = 0,
) -> TelemetrySimulation:
    """Simulate a telemetry session.

    Parameters
    ----------
    strategy : TelemetryStrategy
        Telemetry strategy.
    context : InstrumentContext or None
        Instrument context. If None, the default context is used.
    seed : int
        Random seed used for data packet loss simulation.

    Returns
    -------
    TelemetrySimulation
        Telemetry simulation results.
    """
    # Fall back to the default instrument context when none is supplied
    effective_context = InstrumentContext() if context is None else context
    return strategy.simulate_telemetry(effective_context, seed)
@attrs.frozen(kw_only=True)
class TelemetrySimulation:
    """Results of a telemetry simulation."""

    context: InstrumentContext
    """Instrument context."""

    strategy: DefaultTelemetryStrategy
    """Telemetry strategy."""

    reception_times: list[Time]
    """List of data packet reception times (in increasing order) [s]."""

    received_packets: list[DataPacket]
    """List of received packets in the same order as :attr:`reception_times`."""

    lost_packets: list[tuple[Time, DataPacket]]
    """List of lost packets and their expected reception times [s]."""

    @cached_property
    def accumulated_data(self) -> list[list[DataSegment]]:
        """Available data at each data packet reception time [s].

        For each packet reception time in :attr:`reception_times`, we provide
        the list of contiguous data segments available at that time.
        """
        accumulated_data: list[list[DataSegment]] = []
        current_data: list[DataSegment] = []
        for packet in self.received_packets:
            # Fold the new packet into the running set of contiguous segments
            current_data = merge_segements(current_data + [packet])
            accumulated_data.append(current_data)
        return accumulated_data

    def compute_masks(self, data_times: ArrayLike) -> np.ndarray:
        """Compute masks to be applied on a continuous data array.

        Each mask selects the available (accumulated packets) data segments
        at each reception time, given in :attr:`reception_times`.

        Parameters
        ----------
        data_times : array-like of shape (M,)
            Times attached to the data array to be masked [s]. The times
            should be represented as the number of seconds since the current
            telemetry session start.

        Returns
        -------
        Array, of shape (N, M)
            The masks are returned as a 2-dimensional array. The first
            dimension corresponds to the data packet reception time (when the
            mask becomes valid) and is of size
            ``N = len(self.reception_times)``, and the second dimension
            corresponds to the data times.
        """
        data_times = np.asarray(data_times)
        masks = np.full((len(self.reception_times), len(data_times)), False)
        for i, segment in enumerate(self.received_packets):
            # Vectorized inclusive interval test; replaces the slow
            # np.vectorize(segment.__contains__) and also handles empty
            # data_times arrays
            condition = (data_times >= segment.start_time) & (
                data_times <= segment.end_time
            )
            # A segment received at time i remains available at all later
            # reception times, hence the i: row slice
            masks[i:, condition] = True
        return masks

    def plot_received_segments(self) -> None:
        """Plot received segments."""
        plt.figure(figsize=(10, 7))
        for trec, packet in zip(self.reception_times, self.received_packets):
            # One horizontal line per packet: x is the data span it covers,
            # y is when it was received on ground
            plt.plot(
                np.array([packet.start_time, packet.end_time]) / 3600,  # in hours
                np.ones(2) * trec / 3600,  # reception time in hours
                color="tab:blue",
            )
        plt.xlabel("Received segment data coverage [h]")
        plt.ylabel("Onground reception time [h]")
        plt.title("Received segments")
        plt.grid()

    def plot_accumulated_data(self) -> None:
        """Plot available data at each data packet reception time."""
        plt.figure(figsize=(10, 7))
        for trec, segments in zip(self.reception_times, self.accumulated_data):
            for seg in segments:
                # One line per contiguous segment available at that time
                plt.plot(
                    np.array([seg.start_time, seg.end_time]) / 3600,  # in hours
                    np.ones(2) * trec / 3600,  # reception time in hours
                    color="tab:blue",
                )
        plt.xlabel("Available data [h]")
        plt.ylabel("Onground reception time [h]")
        plt.title("Accumulated data")
        plt.grid()