Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

zmqreceiver can handle inaccurate ts/more than one frame outstanding. #730

Merged
merged 1 commit into from
Jun 18, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 22 additions & 21 deletions gamutrf/zmqreceiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def __init__(
self.context = zstandard.ZstdDecompressor()
self.txt_buf = ""
self.fftbuffer = None
self.last_sweep_start = 0
self.scan_configs = {}
self.proxy_result = executor.submit(
proxy, addr, port, self.buff_file, live_file=live_file
)
Expand Down Expand Up @@ -149,27 +149,27 @@ def txtbuf_to_lines(self, log):

def read_new_frame_df(self, df, discard_time):
frame_df = None
scan_config = None
if discard_time:
df = df[(time.time() - df.ts).abs() < discard_time]
if df.size:
lastfreq = df.freq.iat[-1]
lastfreq = df["freq"].iat[-1]
logging.info("last frequency read %f MHz", lastfreq / 1e6)
max_sweep_start = df["sweep_start"].max()
if max_sweep_start != self.last_sweep_start:
if self.fftbuffer is None:
frame_df = df
else:
frame_df = pd.concat(
[self.fftbuffer, df[df["sweep_start"] == self.last_sweep_start]]
)
self.fftbuffer = df[df["sweep_start"] != self.last_sweep_start]
self.last_sweep_start = max_sweep_start
if self.fftbuffer is None:
self.fftbuffer = df
else:
if self.fftbuffer is None:
self.fftbuffer = df
else:
self.fftbuffer = pd.concat([self.fftbuffer, df])
return frame_df
self.fftbuffer = pd.concat([self.fftbuffer, df])
if self.fftbuffer["sweep_start"].nunique() > 1:
min_sweep_start = self.fftbuffer["sweep_start"].min()
frame_df = self.fftbuffer[
self.fftbuffer["sweep_start"] == min_sweep_start
].copy()
self.fftbuffer = self.fftbuffer[
self.fftbuffer["sweep_start"] != min_sweep_start
]
scan_config = self.scan_configs[min_sweep_start]
del self.scan_configs[min_sweep_start]
return (scan_config, frame_df)

def lines_to_df(self, lines):
try:
Expand All @@ -181,6 +181,7 @@ def lines_to_df(self, lines):
sweep_start = float(json_record["sweep_start"])
buckets = json_record["buckets"]
scan_config = json_record["config"]
self.scan_configs[sweep_start] = scan_config
records.extend(
[
{
Expand All @@ -192,19 +193,19 @@ def lines_to_df(self, lines):
for freq, db in buckets.items()
]
)
return (scan_config, pd.DataFrame(records))
return pd.DataFrame(records)
except ValueError as err:
logging.error(str(err))
return (None, None)
return None

def read_buff(self, log, discard_time):
scan_config = None
frame_df = None
if self.read_buff_file():
lines = self.txtbuf_to_lines(log)
if lines:
scan_config, df = self.lines_to_df(lines)
frame_df = self.read_new_frame_df(df, discard_time)
df = self.lines_to_df(lines)
scan_config, frame_df = self.read_new_frame_df(df, discard_time)
return scan_config, frame_df


Expand Down
Loading