From d9921f541e29664d8f3a78c68e74ca52ce88a8ce Mon Sep 17 00:00:00 2001
From: Jeremy Stein
Date: Mon, 26 Jan 2026 18:16:05 +0000
Subject: [PATCH 1/3] Split stream ID into variable and channel ID

---
 docs/debugging.md         |  6 +++---
 src/controller.py         |  6 ++++--
 src/csv_writer.py         | 15 +++++++++------
 src/locations.py          |  4 ++--
 src/pipeline/Snakefile    | 11 +++++++----
 src/pseudon/pseudon.py    | 14 ++++++++------
 tests/test_file_writer.py | 11 ++++++-----
 7 files changed, 39 insertions(+), 28 deletions(-)

diff --git a/docs/debugging.md b/docs/debugging.md
index 4ef6968..c87d0d6 100644
--- a/docs/debugging.md
+++ b/docs/debugging.md
@@ -81,7 +81,7 @@ Top-level Snakemake run logs, including:
 Unlike data files, the timestamps in these file names are when the snakemake
 pipeline was invoked.
 
-### `{date}.{hashed_csn}.{stream_id}.{units}.log`
+### `{date}.{hashed_csn}.{variable_id}.{channel_id}.{units}.log`
 Job-level log for the `csv_to_parquet` rule. Contains:
 - CSV -> parquet info
 - pseudonymisation steps
@@ -90,12 +90,12 @@ Job-level log for the `csv_to_parquet` rule. Contains:
 
 Produced under `waveform-export/ftps-logs/`.
 
-### `{date}.{hashed_csn}.{stream_id}.{units}.ftps.log`
+### `{date}.{hashed_csn}.{variable_id}.{channel_id}.{units}.ftps.log`
 Job-level FTPS upload logs. Useful for:
 - connection/authentication errors
 - transfer failures
 
-### `{date}.{hashed_csn}.{stream_id}.{units}.ftps.uploaded.json`
+### `{date}.{hashed_csn}.{variable_id}.{channel_id}.{units}.ftps.uploaded.json`
 Upload marker file (aka sentinel) written after a successful transfer.
 It contains, in JSON format:
 - `uploaded_file` (the uploaded file path)
diff --git a/src/controller.py b/src/controller.py
index ccaf776..6d57f31 100644
--- a/src/controller.py
+++ b/src/controller.py
@@ -47,7 +47,8 @@ def waveform_callback(ch, method_frame, _header_frame, body):
     try:
         location_string = data["mappedLocationString"]
         observation_timestamp = data["observationTime"]
-        source_stream_id = data["sourceStreamId"]
+        source_variable_id = data["sourceVariableId"]
+        source_channel_id = data["sourceChannelId"]
         sampling_rate = data["samplingRate"]
         units = data["unit"]
         waveform_data = data["numericValues"]
@@ -79,7 +80,8 @@ def waveform_callback(ch, method_frame, _header_frame, body):
 
     if writer.write_frame(
         waveform_data,
-        source_stream_id,
+        source_variable_id,
+        source_channel_id,
         observation_timestamp,
         units,
         sampling_rate,
diff --git a/src/csv_writer.py b/src/csv_writer.py
index 47b128f..97dd805 100644
--- a/src/csv_writer.py
+++ b/src/csv_writer.py
@@ -6,19 +6,21 @@
 
 
 def create_file_name(
-    source_stream_id: str, observation_time: datetime, csn: str, units: str
+    source_variable_id: str, source_channel_id: str,
+    observation_time: datetime, csn: str, units: str
 ) -> str:
     """Create a unique file name based on the patient contact serial number (csn)
     the date, and the source system."""
     datestring = observation_time.strftime("%Y-%m-%d")
     units = units.replace("/", "p")
     units = units.replace("%", "percent")
-    return f"{datestring}.{csn}.{source_stream_id}.{units}.csv"
+    return f"{datestring}.{csn}.{source_variable_id}.{source_channel_id}.{units}.csv"
 
 
 def write_frame(
     waveform_data: dict,
-    source_stream_id: str,
+    source_variable_id: str,
+    source_channel_id: str,
     observation_timestamp: float,
     units: str,
     sampling_rate: int,
@@ -35,14 +37,14 @@ def write_frame(
     WAVEFORM_ORIGINAL_CSV.mkdir(exist_ok=True, parents=False)
 
     filename = WAVEFORM_ORIGINAL_CSV / create_file_name(
-        source_stream_id, observation_datetime, csn, units
+        source_variable_id, source_channel_id, observation_datetime, csn, units
     )
 
     # write header if is new file
     if not filename.exists():
         with open(filename, "w") as fileout:
             fileout.write(
-                "csn,mrn,source_stream_id,units,sampling_rate,timestamp,location,values\n"
+                "csn,mrn,source_variable_id,source_channel_id,units,sampling_rate,timestamp,location,values\n"
             )
 
     with open(filename, "a") as fileout:
@@ -53,7 +55,8 @@
             [
                 csn,
                 mrn,
-                source_stream_id,
+                source_variable_id,
+                source_channel_id,
                 units,
                 sampling_rate,
                 observation_timestamp,
diff --git a/src/locations.py b/src/locations.py
index ff37838..64bdb90 100644
--- a/src/locations.py
+++ b/src/locations.py
@@ -9,8 +9,8 @@
 
 
 # file patterns
-FILE_STEM_PATTERN = "{date}.{csn}.{stream_id}.{units}"
-FILE_STEM_PATTERN_HASHED = "{date}.{hashed_csn}.{stream_id}.{units}"
+FILE_STEM_PATTERN = "{date}.{csn}.{variable_id}.{channel_id}.{units}"
+FILE_STEM_PATTERN_HASHED = "{date}.{hashed_csn}.{variable_id}.{channel_id}.{units}"
 CSV_PATTERN = WAVEFORM_ORIGINAL_CSV / (FILE_STEM_PATTERN + ".csv")
 ORIGINAL_PARQUET_PATTERN = WAVEFORM_ORIGINAL_PARQUET / (FILE_STEM_PATTERN + ".parquet")
 PSEUDONYMISED_PARQUET_PATTERN = WAVEFORM_PSEUDONYMISED_PARQUET / (
diff --git a/src/pipeline/Snakefile b/src/pipeline/Snakefile
index 89bc22b..30a4ef1 100644
--- a/src/pipeline/Snakefile
+++ b/src/pipeline/Snakefile
@@ -59,12 +59,14 @@ def determine_eventual_outputs():
         _hash_to_csn[hash_csn(csn)] = csn
     # Apply all_wc to FILE_STEM_PATTERN_HASHED to generate the output stems
     _all_outputs = []
-    for date, csn, stream_id, units in zip(all_wc.date, all_wc.csn, all_wc.stream_id, all_wc.units):
+    for date, csn, variable_id, channel_id, units \
+        in zip(all_wc.date, all_wc.csn, all_wc.variable_id, all_wc.channel_id, all_wc.units):
         subs_dict = dict(
             date = date,
             csn = csn,
             hashed_csn = hash_csn(csn),
-            stream_id = stream_id,
+            variable_id = variable_id,
+            channel_id = channel_id,
             units = units
         )
         orig_file = Path(str(CSV_PATTERN).format(**subs_dict))
@@ -105,7 +107,7 @@ rule all:
 def input_file_maker(wc):
     unhashed_csn = hash_to_csn[wc.hashed_csn]
     # when using input functions, snakemake doesn't do its normal templating, you have to do it, hence the f-string
-    return WAVEFORM_ORIGINAL_CSV / f"{wc.date}.{unhashed_csn}.{wc.stream_id}.{wc.units}.csv"
+    return WAVEFORM_ORIGINAL_CSV / f"{wc.date}.{unhashed_csn}.{wc.variable_id}.{wc.channel_id}.{wc.units}.csv"
 
 rule csv_to_parquet:
     input:
@@ -126,7 +128,8 @@
             date_str=wildcards.date,
             original_csn=original_csn,
             hashed_csn=wildcards.hashed_csn,
-            variable_id=wildcards.stream_id,
+            variable_id=wildcards.variable_id,
+            channel_id=wildcards.channel_id,
             units=wildcards.units)
 
 
diff --git a/src/pseudon/pseudon.py b/src/pseudon/pseudon.py
index 5f18c3f..57f29bf 100644
--- a/src/pseudon/pseudon.py
+++ b/src/pseudon/pseudon.py
@@ -25,7 +25,7 @@ def pseudon_cli():
 
 
 def csv_to_parquets(
-    *, date_str: str, original_csn: str, hashed_csn: str, variable_id: str, units: str
+    *, date_str: str, original_csn: str, hashed_csn: str, variable_id: str, channel_id: str, units: str
 ) -> None:
     """Convert CSV data (with full identifiers) to two versions in parquet format:
 
@@ -47,7 +47,7 @@ def csv_to_parquets(
 
     csv_path = Path(
         str(CSV_PATTERN).format(
-            date=date_str, csn=original_csn, stream_id=variable_id, units=units
+            date=date_str, csn=original_csn, variable_id=variable_id, channel_id=channel_id, units=units
         )
     )
     # it's in the csv_path, but at least nowhere else!
@@ -61,7 +61,8 @@ def csv_to_parquets(
         dtype={
             "csn": str,
             "mrn": str,
-            "source_stream_id": str,
+            "source_variable_id": str,
+            "source_channel_id": str,
             "units": str,
             "sampling_rate": int,
             "timestamp": float,
@@ -83,7 +84,8 @@ def parse_array(x):
         [
             ("csn", pa.string()),
             ("mrn", pa.string()),
-            ("source_stream_id", pa.string()),
+            ("source_variable_id", pa.string()),
+            ("source_channel_id", pa.string()),
             ("units", pa.string()),
             ("sampling_rate", pa.int32()),
             ("timestamp", pa.float64()),
@@ -120,7 +122,7 @@ def parse_array(x):
 
     hashed_path = Path(
         str(PSEUDONYMISED_PARQUET_PATTERN).format(
-            date=date_str, hashed_csn=hashed_csn, stream_id=variable_id, units=units
+            date=date_str, hashed_csn=hashed_csn, variable_id=variable_id, channel_id=channel_id, units=units
         )
     )
     pq.write_table(
@@ -136,7 +138,7 @@
     )
 
 
-SAFE_COLUMNS = ["sampling_rate", "source_stream_id", "timestamp", "units", "values"]
+SAFE_COLUMNS = ["sampling_rate", "source_variable_id", "source_channel_id", "timestamp", "units", "values"]
 
 
 def pseudonymise_relevant_columns(df: pd.DataFrame):
diff --git a/tests/test_file_writer.py b/tests/test_file_writer.py
index 1d15b51..0a97e3b 100644
--- a/tests/test_file_writer.py
+++ b/tests/test_file_writer.py
@@ -6,17 +6,18 @@
 
 @pytest.mark.parametrize(
     "units, expected_filename",
     [
-        ("uV", "2025-01-01.12345678.11.uV.csv"),
-        ("mL/s", "2025-01-01.12345678.11.mLps.csv"),
-        ("%", "2025-01-01.12345678.11.percent.csv"),
+        ("uV", "2025-01-01.12345678.11.3.uV.csv"),
+        ("mL/s", "2025-01-01.12345678.11.3.mLps.csv"),
+        ("%", "2025-01-01.12345678.11.3.percent.csv"),
     ],
 )
 def test_create_file_name_handles_units(units, expected_filename, tmp_path):
-    sourceStreamId = "11"
+    source_variable_id = "11"
+    source_channel_id = "3"
     observationTime = datetime(2025, 1, 1, 10, 10, 10, tzinfo=timezone.utc)
     csn = "12345678"
 
-    filename = create_file_name(sourceStreamId, observationTime, csn, units)
+    filename = create_file_name(source_variable_id, source_channel_id, observationTime, csn, units)
 
     assert filename == expected_filename

From f8547e82376119bff443444d4296483d0b39727e Mon Sep 17 00:00:00 2001
From: Jeremy Stein
Date: Mon, 26 Jan 2026 18:16:23 +0000
Subject: [PATCH 2/3] Use "noCh" rather than "None" if there is no channel so
 it looks more deliberate.
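
For example, a frame with variable ID 12, no channel, and units of uV is now
written to

    2025-01-01.12345678.12.noCh.uV.csv

where it would previously have been named

    2025-01-01.12345678.12.None.uV.csv

A minimal doctest-style sketch of the new helper's behaviour (illustrative
values, taken from the updated test case in tests/test_file_writer.py):

    >>> from locations import make_file_name, FILE_STEM_PATTERN
    >>> make_file_name(FILE_STEM_PATTERN,
    ...                dict(date="2025-01-01", csn="12345678",
    ...                     variable_id="12", channel_id=None, units="uV"))
    '2025-01-01.12345678.12.noCh.uV'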
---
 src/csv_writer.py         | 13 +++++++++++--
 src/locations.py          |  9 +++++++++
 src/pipeline/Snakefile    | 23 +++++++++++++----------
 tests/test_file_writer.py | 15 +++++++--------
 4 files changed, 40 insertions(+), 20 deletions(-)

diff --git a/src/csv_writer.py b/src/csv_writer.py
index 97dd805..e1333df 100644
--- a/src/csv_writer.py
+++ b/src/csv_writer.py
@@ -2,7 +2,8 @@
 import csv
 from datetime import datetime
-from locations import WAVEFORM_ORIGINAL_CSV
+
+from locations import WAVEFORM_ORIGINAL_CSV, make_file_name, FILE_STEM_PATTERN
 
 
 def create_file_name(
@@ -14,7 +15,15 @@ def create_file_name(
     datestring = observation_time.strftime("%Y-%m-%d")
     units = units.replace("/", "p")
     units = units.replace("%", "percent")
-    return f"{datestring}.{csn}.{source_variable_id}.{source_channel_id}.{units}.csv"
+    subs_dict = dict(
+        date=datestring,
+        csn=csn,
+        variable_id=source_variable_id,
+        channel_id=source_channel_id,
+        units=units,
+    )
+    stem = make_file_name(FILE_STEM_PATTERN, subs_dict)
+    return f"{stem}.csv"
 
 
 def write_frame(
diff --git a/src/locations.py b/src/locations.py
index 64bdb90..a961a17 100644
--- a/src/locations.py
+++ b/src/locations.py
@@ -16,3 +16,12 @@
 PSEUDONYMISED_PARQUET_PATTERN = WAVEFORM_PSEUDONYMISED_PARQUET / (
     FILE_STEM_PATTERN_HASHED + ".parquet"
 )
+
+
+def make_file_name(template: str, subs: dict[str, str]):
+    # Don't allow the string "None" to appear in the file name if the channel is None,
+    # because it just looks broken.
+    channel_id_key = "channel_id"
+    if channel_id_key in subs and subs.get(channel_id_key) is None:
+        subs[channel_id_key] = "noCh"
+    return template.format(**subs)
diff --git a/src/pipeline/Snakefile b/src/pipeline/Snakefile
index 30a4ef1..8d88fa7 100644
--- a/src/pipeline/Snakefile
+++ b/src/pipeline/Snakefile
@@ -1,19 +1,17 @@
-import os
-import sys
+import json
 import time
 from datetime import datetime, timedelta, timezone
 from snakemake.io import glob_wildcards
 from exporter.ftps import do_upload
 from locations import (
-    WAVEFORM_EXPORT_BASE,
     WAVEFORM_ORIGINAL_CSV,
-    WAVEFORM_ORIGINAL_PARQUET,
     WAVEFORM_SNAKEMAKE_LOGS,
     WAVEFORM_PSEUDONYMISED_PARQUET,
     WAVEFORM_FTPS_LOGS,
     FILE_STEM_PATTERN,
     FILE_STEM_PATTERN_HASHED,
     CSV_PATTERN,
+    make_file_name,
 )
 from pathlib import Path
 from pseudon.hashing import do_hash
@@ -69,7 +67,7 @@ def determine_eventual_outputs():
             channel_id = channel_id,
             units = units
         )
-        orig_file = Path(str(CSV_PATTERN).format(**subs_dict))
+        orig_file = Path(make_file_name(str(CSV_PATTERN), subs_dict))
         if csn == 'unmatched_csn':
             print(f"Skipping file with unmatched CSN: {orig_file}")
             continue
@@ -77,7 +75,7 @@ def determine_eventual_outputs():
         if file_age < CSV_WAIT_TIME:
             print(f"File too new (age={file_age}): {orig_file}")
             continue
-        final_stem = FILE_STEM_PATTERN_HASHED.format(**subs_dict)
+        final_stem = make_file_name(FILE_STEM_PATTERN_HASHED, subs_dict)
         final_output_file = WAVEFORM_FTPS_LOGS / (final_stem + ".ftps.uploaded.json")
         _all_outputs.append(final_output_file)
     after = time.perf_counter()
@@ -107,7 +105,15 @@ rule all:
 def input_file_maker(wc):
     unhashed_csn = hash_to_csn[wc.hashed_csn]
     # when using input functions, snakemake doesn't do its normal templating, you have to do it, hence the f-string
-    return WAVEFORM_ORIGINAL_CSV / f"{wc.date}.{unhashed_csn}.{wc.variable_id}.{wc.channel_id}.{wc.units}.csv"
+    subs_dict = dict(
+        date=wc.date,
+        csn=unhashed_csn,
+        variable_id=wc.variable_id,
+        channel_id=wc.channel_id,
+        units=wc.units,
+    )
+    stem = make_file_name(FILE_STEM_PATTERN, subs_dict)
+    return WAVEFORM_ORIGINAL_CSV / f"{stem}.csv"
 
 rule csv_to_parquet:
     input:
@@ -142,9 +148,6 @@ rule send_ftps:
     log:
         WAVEFORM_FTPS_LOGS / (FILE_STEM_PATTERN_HASHED + ".ftps.log")
     run:
-        from datetime import datetime
-        import json
-        import time
         start_perf = time.perf_counter()
         start_timestamp = datetime.now(timezone.utc).isoformat()
         logger = configure_file_logging(log[0])
diff --git a/tests/test_file_writer.py b/tests/test_file_writer.py
index 0a97e3b..87b24d6 100644
--- a/tests/test_file_writer.py
+++ b/tests/test_file_writer.py
@@ -4,20 +4,19 @@
 
 
 @pytest.mark.parametrize(
-    "units, expected_filename",
+    "units, variable_id, channel_id, expected_filename",
     [
-        ("uV", "2025-01-01.12345678.11.3.uV.csv"),
-        ("mL/s", "2025-01-01.12345678.11.3.mLps.csv"),
-        ("%", "2025-01-01.12345678.11.3.percent.csv"),
+        ("uV", "11", "3", "2025-01-01.12345678.11.3.uV.csv"),
+        ("uV", "12", None, "2025-01-01.12345678.12.noCh.uV.csv"),
+        ("mL/s", "11", "3", "2025-01-01.12345678.11.3.mLps.csv"),
+        ("%", "11", "3", "2025-01-01.12345678.11.3.percent.csv"),
     ],
 )
-def test_create_file_name_handles_units(units, expected_filename, tmp_path):
-    source_variable_id = "11"
-    source_channel_id = "3"
+def test_create_file_name_handles_units(units, variable_id, channel_id, expected_filename, tmp_path):
     observationTime = datetime(2025, 1, 1, 10, 10, 10, tzinfo=timezone.utc)
     csn = "12345678"
 
-    filename = create_file_name(source_variable_id, source_channel_id, observationTime, csn, units)
+    filename = create_file_name(variable_id, channel_id, observationTime, csn, units)
 
     assert filename == expected_filename
 

From 676548807bb3e314bb20610779a0627afa1f5985 Mon Sep 17 00:00:00 2001
From: Jeremy Stein
Date: Wed, 28 Jan 2026 10:26:22 +0000
Subject: [PATCH 3/3] Linting fixes

---
 src/csv_writer.py         |  7 +++++--
 src/pseudon/pseudon.py    | 29 +++++++++++++++++++++++++----
 tests/test_file_writer.py |  4 +++-
 3 files changed, 33 insertions(+), 7 deletions(-)

diff --git a/src/csv_writer.py b/src/csv_writer.py
index e1333df..92c284e 100644
--- a/src/csv_writer.py
+++ b/src/csv_writer.py
@@ -7,8 +7,11 @@
 
 
 def create_file_name(
-    source_variable_id: str, source_channel_id: str,
-    observation_time: datetime, csn: str, units: str
+    source_variable_id: str,
+    source_channel_id: str,
+    observation_time: datetime,
+    csn: str,
+    units: str,
 ) -> str:
     """Create a unique file name based on the patient contact serial number (csn)
     the date, and the source system."""
diff --git a/src/pseudon/pseudon.py b/src/pseudon/pseudon.py
index 57f29bf..8e5e907 100644
--- a/src/pseudon/pseudon.py
+++ b/src/pseudon/pseudon.py
@@ -25,7 +25,13 @@ def pseudon_cli():
 
 
 def csv_to_parquets(
-    *, date_str: str, original_csn: str, hashed_csn: str, variable_id: str, channel_id: str, units: str
+    *,
+    date_str: str,
+    original_csn: str,
+    hashed_csn: str,
+    variable_id: str,
+    channel_id: str,
+    units: str,
 ) -> None:
     """Convert CSV data (with full identifiers) to two versions in parquet format:
 
@@ -47,7 +53,11 @@ def csv_to_parquets(
 
     csv_path = Path(
         str(CSV_PATTERN).format(
-            date=date_str, csn=original_csn, variable_id=variable_id, channel_id=channel_id, units=units
+            date=date_str,
+            csn=original_csn,
+            variable_id=variable_id,
+            channel_id=channel_id,
+            units=units,
         )
     )
     # it's in the csv_path, but at least nowhere else!
@@ -122,7 +132,11 @@ def parse_array(x): hashed_path = Path( str(PSEUDONYMISED_PARQUET_PATTERN).format( - date=date_str, hashed_csn=hashed_csn, variable_id=variable_id, channel_id=channel_id, units=units + date=date_str, + hashed_csn=hashed_csn, + variable_id=variable_id, + channel_id=channel_id, + units=units, ) ) pq.write_table( @@ -138,7 +152,14 @@ def parse_array(x): ) -SAFE_COLUMNS = ["sampling_rate", "source_variable_id", "source_channel_id", "timestamp", "units", "values"] +SAFE_COLUMNS = [ + "sampling_rate", + "source_variable_id", + "source_channel_id", + "timestamp", + "units", + "values", +] def pseudonymise_relevant_columns(df: pd.DataFrame): diff --git a/tests/test_file_writer.py b/tests/test_file_writer.py index 87b24d6..6b5b923 100644 --- a/tests/test_file_writer.py +++ b/tests/test_file_writer.py @@ -12,7 +12,9 @@ ("%", "11", "3", "2025-01-01.12345678.11.3.percent.csv"), ], ) -def test_create_file_name_handles_units(units, variable_id, channel_id, expected_filename, tmp_path): +def test_create_file_name_handles_units( + units, variable_id, channel_id, expected_filename, tmp_path +): observationTime = datetime(2025, 1, 1, 10, 10, 10, tzinfo=timezone.utc) csn = "12345678"