import h5py
import numpy as np
from scipy.stats import rankdata
from exm.args import Args
from exm.utils.log import configure_logger
logger = configure_logger('ExSeq-Toolbox')
# TODO evaluate the benefits of this step
[docs]
def quantile_normalization(args: Args, code: int, fov: int) -> None:
r"""
Applies quantile normalization to the volumes aligned .h5 file.
:param args: Configuration options. This should be an instance of the Args class.
:type args: Args
:param code: The code of the volume to be normalized.
:type code: int
:param fov: The field of view of the volume to be normalized.
:type fov: int
"""
logger.warn("This function `quantile_normalization` is experimental.")
try:
logger.info(
f"Starting quantile normalization for Code: {code}, FOV: {fov}")
channels = args.channel_names[:-1]
with h5py.File(args.h5_path.format(code, fov), "r") as f:
volumes = [f[channel][()] for channel in channels]
flattened_volumes = np.concatenate(
[vol.ravel() for vol in volumes]).reshape(-1, len(channels))
sorted_volumes = np.sort(flattened_volumes, axis=0)
mean_volumes = np.mean(sorted_volumes, axis=1)
rank_volumes = np.empty(flattened_volumes.shape, dtype=int)
for i in range(flattened_volumes.shape[1]):
rank_volumes[:, i] = rankdata(
flattened_volumes[:, i], method='min')
normalized_volumes = mean_volumes[rank_volumes - 1]
# Reshape back to original shape
reshaped_volumes = normalized_volumes.reshape(
len(channels), *volumes[0].shape)
# Split into separate volumes
separate_volumes = np.split(reshaped_volumes, len(channels), axis=0)
for vol, channel in zip(separate_volumes, channels):
vol = np.squeeze(vol)
channel = channel + '_norm'
with h5py.File(args.h5_path.format(code, fov), "a") as f:
if channel in f.keys():
del f[channel]
f.create_dataset(channel, vol.shape, dtype=vol.dtype, data=vol)
logger.info(
f"Quantile normalization complete for Code: {code}, FOV: {fov}")
except Exception as e:
print(f"Error occurred while applying quantile normalization: {e}")
raise
from exm.io.io import nd2ToVol
#TODO Fine-tune alignment parameter
# confirm a shift in the original data
[docs]
def algin_channels_function(args, tasks_queue, q_lock):
r"""
Applies alignment between other channels and DAPI within the same round and fov.
"""
import queue
import multiprocessing
from bigstream.transform import apply_transform
from bigstream.align import affine_align, alignment_pipeline
while True: # Check for remaining task in the Queue
try:
with q_lock:
code, fov = tasks_queue.get_nowait()
logger.info(
f"Remaining tasks to process : {tasks_queue.qsize()}")
except queue.Empty:
logger.info(f"{multiprocessing.current_process().name}: Done")
break
except Exception as e:
logger.error(f"Error fetching task from queue: {e}")
break
else:
# with h5py.File(args.h5_path.format(code, fov), "r") as f:
# fix_volume = f['561'][()]
def top_percent_threshold(image: np.ndarray, percent: int) -> np.ndarray:
threshold_value = np.percentile(image, 100 - percent)
output_image = np.copy(image)
output_image[output_image < threshold_value] = 0
return output_image
rigid_kwargs = { 'metric' : 'MMI',
'optimizer':'LBFGSB',
'alignment_spacing': 0.5,
'shrink_factors': (8, 4, 2, 1),
'smooth_sigmas': (0., 0., 0., 0.),
'optimizer_args': {
'gradientConvergenceTolerance': 1e-6, # was 1e-5
'numberOfIterations': 1000, # was 500
'maximumNumberOfCorrections': 10, # was 5
'maximumNumberOfFunctionEvaluations': 5000, # was 2000
'costFunctionConvergenceFactor': 1e+6, # was 1e+7
},
}
spacing = [0.40, 0.1625, 0.1625]
init_mat = np.eye(4)
fix_volume = nd2ToVol(
args.nd2_path.format(code, '594', 1), fov, '594')
mov_volume = nd2ToVol(
args.nd2_path.format(code, '640', 0), fov, '640')
affine_1 = affine_align(
top_percent_threshold(fix_volume,0.1), top_percent_threshold(mov_volume,0.1),
spacing, spacing,
initial_condition=init_mat,
# rigid=True,
**rigid_kwargs
)
np.savetxt(args.tform_path.format(
code, f"fov{fov}_640_affine.mat"), affine_1)
aligned_vol_640_1 = apply_transform(
fix_volume, mov_volume,
spacing, spacing,
transform_list=[affine_1,],)
fix_volume = nd2ToVol(
args.nd2_path.format(code, '561', 2), fov, '561')
mov_volume = nd2ToVol(
args.nd2_path.format(code, '594', 1), fov, '594')
affine_2 = affine_align(
top_percent_threshold(fix_volume,0.1), top_percent_threshold(mov_volume,0.1),
spacing, spacing,
initial_condition=init_mat,
# rigid=True,
**rigid_kwargs
)
np.savetxt(args.tform_path.format(
code, f"fov{fov}_594_affine.mat"), affine_2)
aligned_vol_640_2 = apply_transform(
fix_volume, aligned_vol_640_1,
spacing, spacing,
transform_list=[affine_2,],)
del aligned_vol_640_1
aligned_vol_594_1 = apply_transform(
fix_volume, mov_volume,
spacing, spacing,
transform_list=[affine_2,],)
del fix_volume
del mov_volume
with h5py.File(args.h5_path.format(code, fov), "a") as f:
aligned_channel_1 = '640'
aligned_channel_2 = '594'
if '488_align' in f.keys():
del f['488_align']
del f['561_align']
del f['640_align']
del f['594_align']
if aligned_channel_1 in f.keys():
del f[aligned_channel_1]
if aligned_channel_2 in f.keys():
del f[aligned_channel_2]
f.create_dataset(
aligned_channel_1, aligned_vol_640_2.shape, dtype=aligned_vol_640_2.dtype, data=aligned_vol_640_2)
f.create_dataset(
aligned_channel_2, aligned_vol_594_1.shape, dtype=aligned_vol_594_1.dtype, data=aligned_vol_594_1)
for i,channel in enumerate(['561','488','405']):
if channel in f.keys():
del f[channel]
volume = nd2ToVol(
args.nd2_path.format(code, channel, i+2), fov, channel)
f.create_dataset(
channel, volume.shape, dtype=volume.dtype, data=volume)
[docs]
def algin_channels(args: Args,
code_fov_pairs,
parallel_processes: int = 1) -> None:
r"""
Applies alignment between other channels and DAPI within the same round and fov.
"""
logger.warn("This function `algin_channels` is experimental.")
import multiprocessing
child_processes = []
tasks_queue = multiprocessing.Queue()
q_lock = multiprocessing.Lock()
if not code_fov_pairs:
code_fov_pairs = [[round_val, roi_val]
for round_val in args.codes for roi_val in args.fovs]
for round, roi in code_fov_pairs:
tasks_queue.put((round, roi))
for w in range(int(parallel_processes)):
p = multiprocessing.Process(
target=algin_channels_function, args=(args, tasks_queue, q_lock))
child_processes.append(p)
p.start()
for p in child_processes:
p.join()
#TODO fine-tune foreground_segmentation parameter
# test possiblity to train a u-net for each speices
[docs]
def foreground_segmentation(args):
r"""
Applies forground_segmentation on code 0 DAPI for each fov.
"""
import numpy as np
from exm.io.io import nd2ToVol
from scipy import ndimage
from bigstream import level_set
import tifffile
import os
logger.warn("This function `forground_segmentation` is experimental.")
def histogram_stretch(image, min_out=0, max_out=255):
# Calculate the minimum and maximum pixel values in the image
min_val = np.min(image)
max_val = np.max(image)
# Perform histogram stretching
adjusted_image = ((image - min_val) / (max_val - min_val)
* (max_out - min_out) + min_out).astype(np.int8)
return adjusted_image
for fov in args.fovs:
vol = nd2ToVol(args.nd2_path.format(0,'405', 4), fov)
vol = histogram_stretch(vol, min_out=0, max_out=255)
vol_cont = ndimage.zoom(vol, (1, 0.1, 0.1), order=1)
noise = level_set.estimate_background(
vol_cont.astype(np.uint8), int(vol_cont.shape[1]/20))
mask = level_set.foreground_segmentation(
vol_cont.astype(np.uint8),
voxel_spacing = np.array([1,1,1]),
mask_smoothing=2,
iterations=[80, 40, 10],
smooth_sigmas=[12, 6, 3],
lambda2=1.0,
background=noise,
)
vol_cont = ndimage.zoom(mask, (1, 10, 10), order=0)
tifffile.imwrite(os.path.join(args.puncta_path,'fg_masks',f'fov{fov}.tif'), mask)
#TODO fine-tune foreground_segmentation parameter
# test possiblity to train a u-net for each speices
[docs]
def foreground_segmentation_new(args):
r"""
Applies forground_segmentation on code 0 DAPI for each fov.
"""
import numpy as np
from exm.io.io import nd2ToVol
from scipy import ndimage
from bigstream import level_set
import tifffile
import os
from skimage import morphology
logger.warn("This function `foreground_segmentation_new` is experimental.")
for fov in args.fovs:
volume = nd2ToVol(args.nd2_path.format(0,'405', 4), fov)
vol = ndimage.zoom(volume,(0.1,0.1,0.1),order=1)
noise = level_set.estimate_background(vol.astype(np.uint8),int(vol.shape[1]/20))
skip_spacing = np.array([1,1,1])
fg_mask = level_set.foreground_segmentation(
vol.astype(np.uint8), skip_spacing,
shrink_factors = [4,2,1],
mask_smoothing=2,
iterations=[80,40,10],
smooth_sigmas=[12,6,3],
lambda2=1.0,
background = noise,
)
for _ in range(10):
fg_mask = morphology.dilation(fg_mask)
upsample_factor = np.array(volume.shape) / np.array(fg_mask.shape)
fg_mask = ndimage.zoom(fg_mask,upsample_factor,order=1)
tifffile.imwrite(os.path.join(args.puncta_path,'fg_masks',f'fov{fov}.tif'), fg_mask)
import os
from bigstream.align import affine_align, alignment_pipeline
#TODO Fine-tune alignment parameter
# confirm a shift in the original data
[docs]
def algin_channels_function_new(args, tasks_queue, q_lock):
r"""
Applies alignment between other channels and DAPI within the same round and fov.
"""
import queue
import multiprocessing
from bigstream.transform import apply_transform
from bigstream.align import affine_align, alignment_pipeline
while True: # Check for remaining task in the Queue
try:
with q_lock:
code, fov = tasks_queue.get_nowait()
logger.info(
f"Remaining tasks to process : {tasks_queue.qsize()}")
except queue.Empty:
logger.info(f"{multiprocessing.current_process().name}: Done")
break
except Exception as e:
logger.error(f"Error fetching task from queue: {e}")
break
else:
def top_percent_threshold(image: np.ndarray, percent: int) -> np.ndarray:
threshold_value = np.percentile(image, 100 - percent)
output_image = np.copy(image)
output_image[output_image < threshold_value] = 0
return output_image
spacing = np.array([0.40, 0.1625, 0.1625])
init_mat = np.eye(4)
# h5_path = os.path.join(args.processed_data_path, "code{}/{}.h5")
with h5py.File(args.h5_path.format(code,fov),'r') as f:
fix_volume = f['594'][()]
with h5py.File(args.h5_path.format(code,fov),'r') as f:
mov_volume = f['640'][()]
rigid_kwargs = { 'metric' : 'D',
'optimizer':'LBFGSB',
'alignment_spacing': 0.5,
'shrink_factors': (4, 2, 1),
'smooth_sigmas': (1.,1.,1.),
'initial_condition':'CENTER',
'optimizer_args': {
'gradientConvergenceTolerance': 1e-6, # was 1e-5
'numberOfIterations': 1000, # was 500
'maximumNumberOfCorrections': 10, # was 5
'maximumNumberOfFunctionEvaluations': 5000, # was 2000
'costFunctionConvergenceFactor': 1e+6, # was 1e+7
},
}
affine_1 = affine_align(
fix_volume, mov_volume,
spacing, spacing,
rigid=True,
**rigid_kwargs
)
if np.array_equal(affine_1, init_mat):
logger.error(f" Align 640 -> 594 code{code} fov{fov} failed")
# np.savetxt(args.tform_path.format(
# code, f"fov{fov}_640_affine.mat"), affine_1)
aligned_vol_640_1 = apply_transform(
fix_volume, mov_volume,
spacing, spacing,
transform_list=[affine_1,],)
with h5py.File(args.h5_path.format(code,fov),'r') as f:
fix_volume = f['561'][()]
with h5py.File(args.h5_path.format(code,fov),'r') as f:
mov_volume = f['594'][()]
affine_2 = affine_align(
fix_volume, mov_volume,
spacing, spacing,
rigid=True,
**rigid_kwargs
)
if np.array_equal(affine_2, init_mat):
logger.error(f" Align 594 -> 561 code{code} fov{fov} failed")
# np.savetxt(args.tform_path.format(
# code, f"fov{fov}_594_affine.mat"), affine_2)
aligned_vol_640_2 = apply_transform(
fix_volume, aligned_vol_640_1,
spacing, spacing,
transform_list=[affine_2,],)
del aligned_vol_640_1
aligned_vol_594_1 = apply_transform(
fix_volume, mov_volume,
spacing, spacing,
transform_list=[affine_2,],)
del fix_volume
del mov_volume
with h5py.File(args.h5_path.format(code, fov), "a") as f:
aligned_channel_1 = '640'
aligned_channel_2 = '594'
if aligned_channel_1 in f.keys():
del f[aligned_channel_1]
if aligned_channel_2 in f.keys():
del f[aligned_channel_2]
f.create_dataset(
aligned_channel_1, aligned_vol_640_2.shape, dtype=aligned_vol_640_2.dtype, data=aligned_vol_640_2)
f.create_dataset(
aligned_channel_2, aligned_vol_594_1.shape, dtype=aligned_vol_594_1.dtype, data=aligned_vol_594_1)
# with h5py.File(args.h5_path.format(code,fov),'r') as f:
# for i,channel in enumerate(['561','488','405']):
# volume = f[channel][()]
# with h5py.File(args.h5_path.format(code, fov), "a") as x:
# x.create_dataset(
# channel, volume.shape, dtype=volume.dtype, data=volume)
[docs]
def algin_channels_new(args: Args,
code_fov_pairs,
parallel_processes: int = 1) -> None:
r"""
Applies alignment between other channels and DAPI within the same round and fov.
"""
logger.warn("This function `algin_channels_new` is experimental.")
import multiprocessing
child_processes = []
tasks_queue = multiprocessing.Queue()
q_lock = multiprocessing.Lock()
if not code_fov_pairs:
code_fov_pairs = [[round_val, roi_val]
for round_val in args.codes for roi_val in args.fovs]
for round, roi in code_fov_pairs:
tasks_queue.put((round, roi))
for w in range(int(parallel_processes)):
p = multiprocessing.Process(
target=algin_channels_function_new, args=(args, tasks_queue, q_lock))
child_processes.append(p)
p.start()
for p in child_processes:
p.join()
from typing import List, Tuple, Optional, Dict, Set
[docs]
def blend_ind(offsets: np.ndarray, pictures: List[np.ndarray],
indices: Optional[List[int]] = None, inverts: Optional[List[int]] = None) -> np.ndarray:
"""
Blends a list of volume tiles together according to specified offsets, accounting for different z-stack sizes and starting points.
:param offsets: An array of offsets for each image tile.
:param pictures: A list of image tiles to blend.
:param indices: Optional list of indices specifying the order in which to blend the images.
:param inverts: Optional list of axes along which to invert the corresponding image tiles.
:return: The blended image.
"""
if not pictures:
raise ValueError("The list of pictures must not be empty.")
if len(offsets) != len(pictures):
raise ValueError("The number of offsets must match the number of pictures.")
try:
# Determine the overall Z-dimension range
min_z = int(min(offset[2] for offset in offsets))
max_z = int(max(offset[2] + tile.shape[0] for offset, tile in zip(offsets, pictures)))
total_z_range = max_z - min_z
min_y = int(min(offset[1] for offset in offsets))
max_y = int(max(offset[1] + tile.shape[1] for offset, tile in zip(offsets, pictures)))
total_y_range = max_y - min_y
min_x = int(min(offset[0] for offset in offsets))
max_x = int(max(offset[0] + tile.shape[2] for offset, tile in zip(offsets, pictures)))
total_x_range = max_x - min_x
# Initialize newpic with the total Z-dimension range
newpic_shape = (total_z_range,total_y_range, total_x_range)
newpic = np.zeros(newpic_shape, dtype=pictures[0].dtype)
print(newpic.shape)
# Process each tile
for off, tile in zip(offsets, pictures):
start_z = int(off[2] - min_z)
end_z = int(start_z + tile.shape[0])
start_y = int(off[1] - min_y)
end_y = int(start_y + tile.shape[1])
start_x = int(off[0] - min_x)
end_x = int(start_x + tile.shape[2])
# Adjust for inverts, if applicable
if inverts:
tile = np.flip(tile, axis=inverts)
# Determine the range to update in newpic
update_range_z = slice(start_z, start_z + tile.shape[0])
update_range_y = slice(start_y, start_y + tile.shape[1])
update_range_x = slice(start_x, start_x + tile.shape[2])
# Update newpic with the current tile's data
newpic[update_range_z, update_range_y, update_range_x] = tile
return newpic
except Exception as e:
raise RuntimeError(f"Unexpected error occurred during image blending: {e}")