I have code that converts 1D ECG data into 2D ECG images. Each 1D recording is 9 seconds long with a sampling rate of 500 Hz. The 1D ECG data can have "n" channels, which produces "n" images. So if I have a 2-channel ECG recording of 9 seconds (shape (2, 4500)), it will produce 2 images that get stored in a tensor/array with shape (2, 400, 4500) (400 is just a height I have used). What I am struggling with now is loading the dataset and using it in model.fit(). Here is the code that converts the data and stores it in a TFRecord file (it's not an ideal conversion, but I had to make some compromises because of hardware):
def create_gray_images(arr, im_size=(224, 224), display=False):
    """Render every ECG channel as a filled grayscale image.

    Each channel is min-max scaled to the integer range 0..255. Column ``c``
    of the resulting image is white (255) in rows ``0 .. value[c] - 1`` and
    black (0) below that, so the signal appears as a filled profile.

    Args:
        arr: Array of shape ``(num_channels, 4500)``.
        im_size: Currently ignored; the output is always 400 rows by
            ``arr.shape[1]`` columns. Kept so existing callers keep working.
        display: When true, show each channel with matplotlib.

    Returns:
        A ``tf.Tensor`` of dtype uint8 with shape ``(num_channels, 400, 4500)``.

    Raises:
        AssertionError: If ``arr`` has no channels or its second dimension
            is not 4500.
    """
    # Check input shape
    assert arr.shape[0] > 0 and arr.shape[1] == 4500, "Input array must have shape (num_channels, 4500)"

    height = 400
    # Column vector of row indices; broadcasting it against a channel fills
    # all columns in one operation instead of one Python iteration per sample.
    row_idx = np.arange(height)[:, np.newaxis]

    images = []
    for ch_idx in range(arr.shape[0]):
        channel = arr[ch_idx]
        lowest = channel.min()
        span = channel.max() - lowest
        if span == 0:
            # A flat signal would otherwise divide by zero and cast NaN to
            # uint8; render it as an all-black image instead.
            channel = np.zeros(channel.shape, dtype=np.uint8)
        else:
            channel = ((channel - lowest) / span * 255).astype(np.uint8)

        # Values never exceed 255, so only the first 255 of the 400 rows can
        # ever be white.
        img = np.where(row_idx < channel[np.newaxis, :], 255, 0).astype(np.uint8)
        images.append(img)

        # Display image in notebook
        if display:
            plt.plot(channel)
            plt.imshow(img, cmap='gray')
            # ch_idx is the channel index; it is no longer overwritten by a
            # column loop, so the title shows the right channel number.
            plt.title(f'Channel {ch_idx + 1}')
            plt.show()

    # Convert list of NumPy arrays to a TensorFlow tensor
    images_tensor = tf.convert_to_tensor(np.array(images))
    return images_tensor
To iterate through my 1D dataset I use this code:
def save_greyscale_img_set_to_disk(data, labels, dataset_type, im_size=(128,128), remove_old=True):
    """Convert every listed ECG record to images and write them to a TFRecord.

    One ``tf.train.Example`` is written per record, with two features:
    ``'image'`` (the serialized tensor from ``create_gray_images``) and
    ``'label'`` (the int64 vector produced by ``mlb.transform``).

    Args:
        data: Table with ``'id'`` and ``'location'`` columns (presumably a
            pandas DataFrame - it must support ``set_index`` and ``to_dict``).
        labels: Table with ``'id'`` and ``'labels'`` columns, same interface.
        dataset_type: One of ``'train'``, ``'test'`` or ``'val'``; used as the
            output file name.
        im_size: Currently ignored; kept for backward compatibility.
        remove_old: Delete an existing TFRecord file for this dataset type
            before writing.

    Raises:
        ValueError: If ``dataset_type`` is not one of the accepted values.
    """
    dataset_types = ['train', 'test', 'val']
    if dataset_type not in dataset_types:
        raise ValueError(f"{dataset_type} is not in the dataset_types: {dataset_types}!")

    base_file_location = '/data/greyscale'
    if not os.path.exists(base_file_location):
        # exist_ok guards against the directory appearing between the check
        # and the creation.
        os.makedirs(base_file_location, exist_ok=True)
        print("Created directory '/data/greyscale'.")
    else:
        print("Directory '/data/greyscale' already exists.")

    # Label encoder; presumably a fitted MultiLabelBinarizer - verify against
    # wherever model_config is built.
    mlb = model_config["mlb"]
    locations = data.set_index('id')["location"].to_dict()
    list_IDs = list(data["id"])
    labels = labels.set_index('id')["labels"].to_dict()

    tfrecord_filename = os.path.join(base_file_location, dataset_type + ".tfrecord")
    if os.path.exists(tfrecord_filename) and remove_old:
        os.remove(tfrecord_filename)

    with tf.io.TFRecordWriter(tfrecord_filename) as writer:
        for ID in tqdm(list_IDs, total=len(list_IDs), desc=f"Saving {dataset_type} dataset"):
            # Load the samples for record "ID". The location string is used
            # as a raw prefix, so it must already end with a path separator.
            filename = locations[ID] + ID + ".pkl"
            # NOTE(security): pickle.load can execute arbitrary code; only
            # read record files that come from a trusted source.
            with open(filename, "rb") as f:
                ecg1d = pickle.load(f)

            img_tensor = create_gray_images(ecg1d, im_size=(400,4500))
            labelvec = mlb.transform([labels[ID]])[0]

            feature = {
                'image': tf.train.Feature(bytes_list=tf.train.BytesList(value=[tf.io.serialize_tensor(img_tensor).numpy()])),
                'label': tf.train.Feature(int64_list=tf.train.Int64List(value=labelvec)),
            }
            example = tf.train.Example(features=tf.train.Features(feature=feature))
            writer.write(example.SerializeToString())
I have written code that reads the TFRecord file, displays the images, and prints the labels to check that everything is stored correctly. Here is that code, if you are interested (this works as intended):
def read_images_from_tfrecord(filename, num_images, num_labels=6):
    """Read up to ``num_images`` examples from a TFRecord file and plot them.

    Each example is expected to hold an ``'image'`` feature (a serialized
    uint8 tensor) and a ``'label'`` feature (``num_labels`` int64 values).
    For every example read, the first two channels are shown side by side
    and the image shape and label are printed.

    Args:
        filename: Path of the TFRecord file.
        num_images: Maximum number of examples to read.
        num_labels: Length of the stored label vector (defaults to 6, the
            value that used to be hard-coded).

    Returns:
        A tuple ``(images, labels)`` of NumPy arrays holding the decoded
        images and their label vectors.
    """
    # Define the feature description for parsing the tfrecord
    feature_description = {
        'image': tf.io.FixedLenFeature([], tf.string),
        'label': tf.io.FixedLenFeature([num_labels], tf.int64),
    }

    # Open the tfrecord file and parse the features
    dataset = tf.data.TFRecordDataset([filename]).take(num_images)
    images = []
    labels = []
    for serialized_example in dataset:
        example = tf.io.parse_single_example(serialized_example, feature_description)
        # Decode the image and label
        img_tensor = tf.io.parse_tensor(example['image'], out_type=tf.uint8)
        images.append(np.array(img_tensor))
        labels.append(example['label'])

    # Convert the list of images and labels to NumPy arrays
    images = np.array(images)
    labels = np.array(labels)

    # The file may contain fewer examples than requested, so plot only what
    # was actually read; with nothing read there is nothing to plot.
    n_found = len(images)
    if n_found == 0:
        return images, labels

    # squeeze=False keeps axs two-dimensional even for a single row, so
    # axs[i, 0] also works when only one image is shown.
    _, axs = plt.subplots(n_found, 2, figsize=(10, 5 * n_found), squeeze=False)
    for i in range(n_found):
        img = images[i]
        print(img.shape)
        left, right = axs[i, 0], axs[i, 1]

        # Treat a plain 2-D image as a stack with one channel.
        planes = img if img.ndim == 3 else img[np.newaxis]
        left.imshow(planes[0], cmap='gray')
        if planes.shape[0] > 1:
            right.imshow(planes[1], cmap='gray')
        else:
            # No second channel to show
            right.axis('off')

        left.set_title('Channel 1')
        right.set_title('Channel 2')
        for ax in (left, right):
            ax.set_xticks([])
            ax.set_yticks([])
        print(f'The label: {labels[i]}')

    plt.tight_layout()
    plt.show()
    return images, labels
I have tried to do something similar when loading the entire training dataset, by removing the plotting and the limit on the number of images, but it does not work. I think this is because .take returns a different object ('tensorflow.python.data.ops.dataset_ops.TakeDataset'), while tf.data.TFRecordDataset produces 'tensorflow.python.data.ops.readers.TFRecordDatasetV2'.
What I want to have is something like this:
# Desired usage, variant 1: load_dataset (not defined in this snippet) returns
# one dataset object per split, which is handed straight to model.fit.
# NOTE(review): for Keras, a tf.data.Dataset passed as x is expected to yield
# (inputs, targets) pairs - confirm against the Model.fit documentation.
train_dataset = load_dataset('pathtotraindataset')
val_dataset = load_dataset('pathtovaldataset')
model.fit(x = train_dataset, validation_data=val_dataset)
or this is also fine:
# Desired usage, variant 2: load_dataset (not defined in this snippet) returns
# an (inputs, labels) pair per split, passed to model.fit as separate x and y.
# NOTE(review): this form presumably needs the whole split in memory - confirm
# that it fits on the available hardware.
x_train, y_train = load_dataset('pathtotraindataset')
x_val, y_val = load_dataset('pathtovaldataset')
model.fit(x = x_train, y=y_train, validation_data=(x_val,y_val))