6 changes: 3 additions & 3 deletions docs/debugging.md
@@ -81,7 +81,7 @@ Top-level Snakemake run logs, including:
Unlike data files, the timestamps in these file names are when the snakemake
pipeline was invoked.

-### `{date}.{hashed_csn}.{stream_id}.{units}.log`
+### `{date}.{hashed_csn}.{variable_id}.{channel_id}.{units}.log`
Job-level log for the `csv_to_parquet` rule. Contains:
- CSV -> parquet info
- pseudonymisation steps
@@ -90,12 +90,12 @@ Job-level log for the `csv_to_parquet` rule. Contains:

Produced under `waveform-export/ftps-logs/`.

-### `{date}.{hashed_csn}.{stream_id}.{units}.ftps.log`
+### `{date}.{hashed_csn}.{variable_id}.{channel_id}.{units}.ftps.log`
Job-level FTPS upload logs. Useful for:
- connection/authentication errors
- transfer failures

-### `{date}.{hashed_csn}.{stream_id}.{units}.ftps.uploaded.json`
+### `{date}.{hashed_csn}.{variable_id}.{channel_id}.{units}.ftps.uploaded.json`
Upload marker file (aka sentinel) written after a successful transfer.
It contains, in JSON format:
- `uploaded_file` (the uploaded file path)
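For a concrete sense of the renamed log files, here is a small sketch of the three artefact names a single job would now produce. The hash, IDs, and units are invented, and only the `uploaded_file` key of the sentinel is documented above; treat the rest as illustrative.

```python
import json

# Hypothetical stem: {date}.{hashed_csn}.{variable_id}.{channel_id}.{units}
stem = ".".join(["2025-01-01", "a1b2c3d4", "11", "3", "uV"])
print(f"{stem}.log")                 # csv_to_parquet job log
print(f"{stem}.ftps.log")            # FTPS upload log
print(f"{stem}.ftps.uploaded.json")  # upload sentinel

# Sentinel content sketch; the value of uploaded_file is invented.
print(json.dumps({"uploaded_file": f"{stem}.parquet"}, indent=2))
```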
6 changes: 4 additions & 2 deletions src/controller.py
@@ -47,7 +47,8 @@ def waveform_callback(ch, method_frame, _header_frame, body):
    try:
        location_string = data["mappedLocationString"]
        observation_timestamp = data["observationTime"]
-        source_stream_id = data["sourceStreamId"]
+        source_variable_id = data["sourceVariableId"]
+        source_channel_id = data["sourceChannelId"]
        sampling_rate = data["samplingRate"]
        units = data["unit"]
        waveform_data = data["numericValues"]
@@ -79,7 +80,8 @@ def waveform_callback(ch, method_frame, _header_frame, body):

    if writer.write_frame(
        waveform_data,
-        source_stream_id,
+        source_variable_id,
+        source_channel_id,
        observation_timestamp,
        units,
        sampling_rate,
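For reference, a minimal sketch of a message body this callback now parses. Only the keys read in the excerpt above are shown; all values, and any further fields the callback reads outside this diff, are hypothetical.

```python
import json

# Hypothetical waveform message payload (JSON over the queue).
body = json.dumps(
    {
        "mappedLocationString": "T03^BY02-17",  # invented location
        "observationTime": 1735725010.0,
        "sourceVariableId": "11",               # was: sourceStreamId
        "sourceChannelId": "3",                 # new; may be None if absent
        "samplingRate": 50,
        "unit": "uV",
        "numericValues": [0.12, 0.15, 0.11],
    }
).encode()
```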
29 changes: 22 additions & 7 deletions src/csv_writer.py
@@ -2,23 +2,37 @@

import csv
from datetime import datetime
-from locations import WAVEFORM_ORIGINAL_CSV
+
+from locations import WAVEFORM_ORIGINAL_CSV, make_file_name, FILE_STEM_PATTERN


def create_file_name(
-    source_stream_id: str, observation_time: datetime, csn: str, units: str
+    source_variable_id: str,
+    source_channel_id: str,
+    observation_time: datetime,
+    csn: str,
+    units: str,
) -> str:
    """Create a unique file name based on the patient contact serial number (csn),
    the date, and the source system."""
    datestring = observation_time.strftime("%Y-%m-%d")
    units = units.replace("/", "p")
    units = units.replace("%", "percent")
-    return f"{datestring}.{csn}.{source_stream_id}.{units}.csv"
+    subs_dict = dict(
+        date=datestring,
+        csn=csn,
+        variable_id=source_variable_id,
+        channel_id=source_channel_id,
+        units=units,
+    )
+    stem = make_file_name(FILE_STEM_PATTERN, subs_dict)
+    return f"{stem}.csv"


def write_frame(
    waveform_data: dict,
-    source_stream_id: str,
+    source_variable_id: str,
+    source_channel_id: str,
    observation_timestamp: float,
    units: str,
    sampling_rate: int,
@@ -35,14 +49,14 @@ def write_frame(
    WAVEFORM_ORIGINAL_CSV.mkdir(exist_ok=True, parents=False)

    filename = WAVEFORM_ORIGINAL_CSV / create_file_name(
-        source_stream_id, observation_datetime, csn, units
+        source_variable_id, source_channel_id, observation_datetime, csn, units
    )

    # write a header if this is a new file
    if not filename.exists():
        with open(filename, "w") as fileout:
            fileout.write(
-                "csn,mrn,source_stream_id,units,sampling_rate,timestamp,location,values\n"
+                "csn,mrn,source_variable_id,source_channel_id,units,sampling_rate,timestamp,location,values\n"
            )

    with open(filename, "a") as fileout:
@@ -53,7 +67,8 @@ def write_frame(
            [
                csn,
                mrn,
-                source_stream_id,
+                source_variable_id,
+                source_channel_id,
                units,
                sampling_rate,
                observation_timestamp,
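Taken together with the parametrised tests at the bottom of this diff, the new `create_file_name` behaves as follows. A quick sketch, using the dates and IDs from the test fixtures:

```python
from datetime import datetime, timezone

from csv_writer import create_file_name  # assuming src/ is on sys.path

t = datetime(2025, 1, 1, 10, 10, 10, tzinfo=timezone.utc)

create_file_name("11", "3", t, "12345678", "mL/s")
# -> "2025-01-01.12345678.11.3.mLps.csv"  ("/" is sanitised to "p")

create_file_name("12", None, t, "12345678", "uV")
# -> "2025-01-01.12345678.12.noCh.uV.csv"  (a None channel becomes "noCh")
```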
13 changes: 11 additions & 2 deletions src/locations.py
@@ -9,10 +9,19 @@


# file patterns
-FILE_STEM_PATTERN = "{date}.{csn}.{stream_id}.{units}"
-FILE_STEM_PATTERN_HASHED = "{date}.{hashed_csn}.{stream_id}.{units}"
+FILE_STEM_PATTERN = "{date}.{csn}.{variable_id}.{channel_id}.{units}"
+FILE_STEM_PATTERN_HASHED = "{date}.{hashed_csn}.{variable_id}.{channel_id}.{units}"
CSV_PATTERN = WAVEFORM_ORIGINAL_CSV / (FILE_STEM_PATTERN + ".csv")
ORIGINAL_PARQUET_PATTERN = WAVEFORM_ORIGINAL_PARQUET / (FILE_STEM_PATTERN + ".parquet")
PSEUDONYMISED_PARQUET_PATTERN = WAVEFORM_PSEUDONYMISED_PARQUET / (
    FILE_STEM_PATTERN_HASHED + ".parquet"
)
+
+
+def make_file_name(template: str, subs: dict[str, str | None]) -> str:
+    # Don't let the literal string "None" appear in the file name when the
+    # channel is None, because it just looks broken.
+    channel_id_key = "channel_id"
+    if channel_id_key in subs and subs[channel_id_key] is None:
+        subs[channel_id_key] = "noCh"
+    return template.format(**subs)
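A quick sketch of the helper on its own, with invented values. Note that the substitution assigns into `subs`, so the caller's dict is mutated; the reuse of one `subs_dict` for two patterns in the Snakefile below works because the substitution is idempotent.

```python
from locations import FILE_STEM_PATTERN, make_file_name

subs = dict(date="2025-01-01", csn="12345678",
            variable_id="12", channel_id=None, units="uV")
make_file_name(FILE_STEM_PATTERN, subs)
# -> "2025-01-01.12345678.12.noCh.uV"
# Side effect: subs["channel_id"] is now "noCh", so a second call with the
# same dict substitutes the same value again.
```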
32 changes: 19 additions & 13 deletions src/pipeline/Snakefile
@@ -1,19 +1,17 @@
import os
import sys
import json
import time
from datetime import datetime, timedelta, timezone
from snakemake.io import glob_wildcards
from exporter.ftps import do_upload
from locations import (
WAVEFORM_EXPORT_BASE,
WAVEFORM_ORIGINAL_CSV,
WAVEFORM_ORIGINAL_PARQUET,
WAVEFORM_SNAKEMAKE_LOGS,
WAVEFORM_PSEUDONYMISED_PARQUET,
WAVEFORM_FTPS_LOGS,
FILE_STEM_PATTERN,
FILE_STEM_PATTERN_HASHED,
CSV_PATTERN,
make_file_name,
)
from pathlib import Path
from pseudon.hashing import do_hash
@@ -59,23 +57,25 @@ def determine_eventual_outputs():
        _hash_to_csn[hash_csn(csn)] = csn
    # Apply all_wc to FILE_STEM_PATTERN_HASHED to generate the output stems
    _all_outputs = []
-    for date, csn, stream_id, units in zip(all_wc.date, all_wc.csn, all_wc.stream_id, all_wc.units):
+    for date, csn, variable_id, channel_id, units \
+            in zip(all_wc.date, all_wc.csn, all_wc.variable_id, all_wc.channel_id, all_wc.units):
        subs_dict = dict(
            date=date,
            csn=csn,
            hashed_csn=hash_csn(csn),
-            stream_id=stream_id,
+            variable_id=variable_id,
+            channel_id=channel_id,
            units=units,
        )
-        orig_file = Path(str(CSV_PATTERN).format(**subs_dict))
+        orig_file = Path(make_file_name(str(CSV_PATTERN), subs_dict))
        if csn == 'unmatched_csn':
            print(f"Skipping file with unmatched CSN: {orig_file}")
            continue
        file_age = get_file_age(orig_file)
        if file_age < CSV_WAIT_TIME:
            print(f"File too new (age={file_age}): {orig_file}")
            continue
-        final_stem = FILE_STEM_PATTERN_HASHED.format(**subs_dict)
+        final_stem = make_file_name(FILE_STEM_PATTERN_HASHED, subs_dict)
        final_output_file = WAVEFORM_FTPS_LOGS / (final_stem + ".ftps.uploaded.json")
        _all_outputs.append(final_output_file)
    after = time.perf_counter()
@@ -105,7 +105,15 @@ rule all:
def input_file_maker(wc):
    unhashed_csn = hash_to_csn[wc.hashed_csn]
    # With input functions, Snakemake doesn't do its usual templating;
    # we have to do it ourselves, hence the f-string.
-    return WAVEFORM_ORIGINAL_CSV / f"{wc.date}.{unhashed_csn}.{wc.stream_id}.{wc.units}.csv"
+    subs_dict = dict(
+        date=wc.date,
+        csn=unhashed_csn,
+        variable_id=wc.variable_id,
+        channel_id=wc.channel_id,
+        units=wc.units,
+    )
+    stem = make_file_name(FILE_STEM_PATTERN, subs_dict)
+    return WAVEFORM_ORIGINAL_CSV / f"{stem}.csv"

rule csv_to_parquet:
    input:
@@ -126,7 +134,8 @@ rule csv_to_parquet:
            date_str=wildcards.date,
            original_csn=original_csn,
            hashed_csn=wildcards.hashed_csn,
-            variable_id=wildcards.stream_id,
+            variable_id=wildcards.variable_id,
+            channel_id=wildcards.channel_id,
            units=wildcards.units)


@@ -139,9 +148,6 @@ rule send_ftps:
    log:
        WAVEFORM_FTPS_LOGS / (FILE_STEM_PATTERN_HASHED + ".ftps.log")
    run:
-        from datetime import datetime
-        import json
-        import time
        start_perf = time.perf_counter()
        start_timestamp = datetime.now(timezone.utc).isoformat()
        logger = configure_file_logging(log[0])
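The extra wildcard flows in via `glob_wildcards` over `CSV_PATTERN`, which is presumably how `all_wc` is built earlier in this Snakefile. A sketch of the splitting, assuming a single CSV named with the new five-field stem and no stray dots inside any field:

```python
from snakemake.io import glob_wildcards
from locations import CSV_PATTERN

# For a file "2025-01-01.12345678.11.3.uV.csv" under WAVEFORM_ORIGINAL_CSV:
all_wc = glob_wildcards(str(CSV_PATTERN))
# all_wc.date        == ["2025-01-01"]
# all_wc.csn         == ["12345678"]
# all_wc.variable_id == ["11"]
# all_wc.channel_id  == ["3"]   ("noCh" for frames written without a channel)
# all_wc.units       == ["uV"]
```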
35 changes: 29 additions & 6 deletions src/pseudon/pseudon.py
@@ -25,7 +25,13 @@ def pseudon_cli():


def csv_to_parquets(
-    *, date_str: str, original_csn: str, hashed_csn: str, variable_id: str, units: str
+    *,
+    date_str: str,
+    original_csn: str,
+    hashed_csn: str,
+    variable_id: str,
+    channel_id: str,
+    units: str,
) -> None:
    """Convert CSV data (with full identifiers) to two versions in parquet
    format:
@@ -47,7 +53,11 @@

    csv_path = Path(
        str(CSV_PATTERN).format(
-            date=date_str, csn=original_csn, stream_id=variable_id, units=units
+            date=date_str,
+            csn=original_csn,
+            variable_id=variable_id,
+            channel_id=channel_id,
+            units=units,
        )
    )
    # it's in the csv_path, but at least nowhere else!
@@ -61,7 +71,8 @@
        dtype={
            "csn": str,
            "mrn": str,
-            "source_stream_id": str,
+            "source_variable_id": str,
+            "source_channel_id": str,
            "units": str,
            "sampling_rate": int,
            "timestamp": float,
@@ -83,7 +94,8 @@ def parse_array(x):
        [
            ("csn", pa.string()),
            ("mrn", pa.string()),
-            ("source_stream_id", pa.string()),
+            ("source_variable_id", pa.string()),
+            ("source_channel_id", pa.string()),
            ("units", pa.string()),
            ("sampling_rate", pa.int32()),
            ("timestamp", pa.float64()),
@@ -120,7 +132,11 @@ def parse_array(x):

    hashed_path = Path(
        str(PSEUDONYMISED_PARQUET_PATTERN).format(
-            date=date_str, hashed_csn=hashed_csn, stream_id=variable_id, units=units
+            date=date_str,
+            hashed_csn=hashed_csn,
+            variable_id=variable_id,
+            channel_id=channel_id,
+            units=units,
        )
    )
    pq.write_table(
@@ -136,7 +152,14 @@
    )


-SAFE_COLUMNS = ["sampling_rate", "source_stream_id", "timestamp", "units", "values"]
+SAFE_COLUMNS = [
+    "sampling_rate",
+    "source_variable_id",
+    "source_channel_id",
+    "timestamp",
+    "units",
+    "values",
+]


def pseudonymise_relevant_columns(df: pd.DataFrame):
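To make the effect of the widened `SAFE_COLUMNS` concrete, here is a minimal sketch of column filtering. The actual body of `pseudonymise_relevant_columns` is outside this diff, so this is illustrative only, with invented values:

```python
import pandas as pd

from pseudon.pseudon import SAFE_COLUMNS

df = pd.DataFrame(
    {
        "csn": ["12345678"],           # identifier: dropped
        "mrn": ["40001111"],           # identifier: dropped
        "location": ["T03"],           # identifier: dropped
        "source_variable_id": ["11"],  # kept: now in SAFE_COLUMNS
        "source_channel_id": ["3"],    # kept: now in SAFE_COLUMNS
        "units": ["uV"],
        "sampling_rate": [50],
        "timestamp": [1735725010.0],
        "values": [[0.12, 0.15]],
    }
)
deidentified = df[SAFE_COLUMNS]  # both new ID columns survive pseudonymisation
```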
16 changes: 9 additions & 7 deletions tests/test_file_writer.py
@@ -4,19 +4,21 @@


@pytest.mark.parametrize(
-    "units, expected_filename",
+    "units, variable_id, channel_id, expected_filename",
    [
-        ("uV", "2025-01-01.12345678.11.uV.csv"),
-        ("mL/s", "2025-01-01.12345678.11.mLps.csv"),
-        ("%", "2025-01-01.12345678.11.percent.csv"),
+        ("uV", "11", "3", "2025-01-01.12345678.11.3.uV.csv"),
+        ("uV", "12", None, "2025-01-01.12345678.12.noCh.uV.csv"),
+        ("mL/s", "11", "3", "2025-01-01.12345678.11.3.mLps.csv"),
+        ("%", "11", "3", "2025-01-01.12345678.11.3.percent.csv"),
    ],
)
-def test_create_file_name_handles_units(units, expected_filename, tmp_path):
-    sourceStreamId = "11"
+def test_create_file_name_handles_units(
+    units, variable_id, channel_id, expected_filename, tmp_path
+):
    observationTime = datetime(2025, 1, 1, 10, 10, 10, tzinfo=timezone.utc)
    csn = "12345678"

-    filename = create_file_name(sourceStreamId, observationTime, csn, units)
+    filename = create_file_name(variable_id, channel_id, observationTime, csn, units)

    assert filename == expected_filename
