From 3f4a6a7294f3a7be615daa2137d5fb7b93a98f44 Mon Sep 17 00:00:00 2001 From: Josh Bailey Date: Sun, 18 Jun 2023 17:19:48 +0000 Subject: [PATCH] zmqreceiver can handle inaccurate ts/more than one frame outstanding. --- gamutrf/zmqreceiver.py | 43 +++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/gamutrf/zmqreceiver.py b/gamutrf/zmqreceiver.py index 5ea8f1c5..8479473f 100644 --- a/gamutrf/zmqreceiver.py +++ b/gamutrf/zmqreceiver.py @@ -111,7 +111,7 @@ def __init__( self.context = zstandard.ZstdDecompressor() self.txt_buf = "" self.fftbuffer = None - self.last_sweep_start = 0 + self.scan_configs = {} self.proxy_result = executor.submit( proxy, addr, port, self.buff_file, live_file=live_file ) @@ -149,27 +149,27 @@ def txtbuf_to_lines(self, log): def read_new_frame_df(self, df, discard_time): frame_df = None + scan_config = None if discard_time: df = df[(time.time() - df.ts).abs() < discard_time] if df.size: - lastfreq = df.freq.iat[-1] + lastfreq = df["freq"].iat[-1] logging.info("last frequency read %f MHz", lastfreq / 1e6) - max_sweep_start = df["sweep_start"].max() - if max_sweep_start != self.last_sweep_start: - if self.fftbuffer is None: - frame_df = df - else: - frame_df = pd.concat( - [self.fftbuffer, df[df["sweep_start"] == self.last_sweep_start]] - ) - self.fftbuffer = df[df["sweep_start"] != self.last_sweep_start] - self.last_sweep_start = max_sweep_start + if self.fftbuffer is None: + self.fftbuffer = df else: - if self.fftbuffer is None: - self.fftbuffer = df - else: - self.fftbuffer = pd.concat([self.fftbuffer, df]) - return frame_df + self.fftbuffer = pd.concat([self.fftbuffer, df]) + if self.fftbuffer["sweep_start"].nunique() > 1: + min_sweep_start = self.fftbuffer["sweep_start"].min() + frame_df = self.fftbuffer[ + self.fftbuffer["sweep_start"] == min_sweep_start + ].copy() + self.fftbuffer = self.fftbuffer[ + self.fftbuffer["sweep_start"] != min_sweep_start + ] + scan_config = self.scan_configs[min_sweep_start] + del self.scan_configs[min_sweep_start] + return (scan_config, frame_df) def lines_to_df(self, lines): try: @@ -181,6 +181,7 @@ def lines_to_df(self, lines): sweep_start = float(json_record["sweep_start"]) buckets = json_record["buckets"] scan_config = json_record["config"] + self.scan_configs[sweep_start] = scan_config records.extend( [ { @@ -192,10 +193,10 @@ def lines_to_df(self, lines): for freq, db in buckets.items() ] ) - return (scan_config, pd.DataFrame(records)) + return pd.DataFrame(records) except ValueError as err: logging.error(str(err)) - return (None, None) + return None def read_buff(self, log, discard_time): scan_config = None @@ -203,8 +204,8 @@ def read_buff(self, log, discard_time): if self.read_buff_file(): lines = self.txtbuf_to_lines(log) if lines: - scan_config, df = self.lines_to_df(lines) - frame_df = self.read_new_frame_df(df, discard_time) + df = self.lines_to_df(lines) + scan_config, frame_df = self.read_new_frame_df(df, discard_time) return scan_config, frame_df