API pre-built

Has been a while since the last post, however I’m now able to proudly present the pre-built of the later functionality. To achieve this parts of the script were wrapped into functions, that in turn were set up such, that they can easily be ported to become a method of an object in the final API. Furthermore the example will probably soon be available via the MNE-python documentation.

All right! Let’s get into it!

from dipy.align import imaffine, imwarp, metrics, reslice, transforms

import matplotlib.pylab as plt

import mne
from mne.beamformer import make_lcmv, apply_lcmv
from mne.datasets import sample
from mne.externals.h5io import read_hdf5, write_hdf5

import nibabel as nib

from nilearn.image import index_img
from nilearn.plotting import plot_anat

import numpy as np

from os import path, makedirs

print(__doc__)

The first function will compute our example data according to the existing MNE-Python example and return a volumetric source space result of the example’s peak time:

def compute_lcmv_example_data(data_path, fname=None):
    raw_fname = data_path + '/MEG/sample/sample_audvis_raw.fif'
    event_fname = data_path + '/MEG/sample/sample_audvis_raw-eve.fif'
    fname_fwd = data_path + '/MEG/sample/sample_audvis-meg-vol-7-fwd.fif'
    # Get epochs
    event_id, tmin, tmax = [1, 2], -0.2, 0.5

    # Setup for reading the raw data
    raw = mne.io.read_raw_fif(raw_fname, preload=True)
    raw.info['bads'] = ['MEG 2443', 'EEG 053']  # 2 bad channels
    events = mne.read_events(event_fname)

    # Set up pick list: gradiometers and magnetometers, excluding bad channels
    picks = mne.pick_types(raw.info, meg=True, eeg=False, stim=True, eog=True,
                           exclude='bads')

    # Pick the channels of interest
    raw.pick_channels([raw.ch_names[pick] for pick in picks])

    # Re-normalize our empty-room projectors, so they are fine after
    # subselection
    raw.info.normalize_proj()
    # Read epochs
    proj = False  # already applied
    epochs = mne.Epochs(raw, events, event_id, tmin, tmax,
                        baseline=(None, 0), preload=True, proj=proj,
                        reject=dict(grad=4000e-13, mag=4e-12, eog=150e-6))
    evoked = epochs.average()

    # Read regularized noise covariance and compute regularized data covariance
    noise_cov = mne.compute_covariance(epochs, tmin=tmin, tmax=0,
                                       method='shrunk')
    data_cov = mne.compute_covariance(epochs, tmin=0.04, tmax=0.15,
                                      method='shrunk')
    # Read forward model
    forward = mne.read_forward_solution(fname_fwd)

    # Compute weights of free orientation (vector) beamformer with weight
    # normalization (neural activity index, NAI). Providing a noise covariance
    # matrix enables whitening of the data and forward solution. Source
    # orientation is optimized by setting pick_ori to 'max-power'.
    # weight_norm can also be set to 'unit-noise-gain'. Source orientation can
    # also be 'normal' (but only when using a surface-based source space) or
    # None, which computes a vector beamfomer. Note, however, that not all
    # combinations of orientation selection and weight normalization are
    # implemented yet.
    filters = make_lcmv(evoked.info, forward, data_cov, reg=0.05,
                        noise_cov=noise_cov, pick_ori='max-power',
                        weight_norm='nai')

    # Apply this spatial filter to the evoked data.
    stc = apply_lcmv(evoked, filters, max_ori_out='signed')

    # take absolute values for plotting
    stc.data[:, :] = np.abs(stc.data)

    # Save result in stc files if desired
    if fname is not None:
        stc.save('lcmv-vol')

    # select time window (tmin, tmax) in ms - consider changing for real data
    # scenario, since those values were chosen to optimize computation time
    stc.crop(0.087, 0.087)

    # Save result in a 4D nifti file
    img = mne.save_stc_as_volume(fname, stc, forward['src'],
                                 mri_resolution=True)

    return img

Next, we define our morph mapping that we want to compute. The function takes a Moving volume (img_m) that is morphed towards the Static volume (img_s). In general we compute the morph in two steps:

  1. we compute an affine linear transform
  2. we compute a non-linear transform on the pre-transformed image

The maximum number of iterations during the optimization can be set. niter_affine defines the number of optimization levels and the corresponding maximum number of iterations. The same applies to niter_sdr but for performing the Symmetric Diffeomorphic Registration.

As a result the function will return mapping and affine, being the respective non-linear and affine mapping objects.

def compute_morph_map(img_m, img_s=None, niter_affine=(100, 100, 10),
                      niter_sdr=(5, 5, 3)):
    # get Static to world transform
    img_s_grid2world = img_s.affine

    # output Static as ndarray
    img_s = img_s.dataobj[:, :, :]

    # normalize values
    img_s = img_s.astype('float') / img_s.max()

    # get Moving to world transform
    img_m_grid2world = img_m.affine

    # output Moving as ndarray
    img_m = img_m.dataobj[:, :, :]

    # normalize values
    img_m = img_m.astype('float') / img_m.max()

    # compute center of mass
    c_of_mass = imaffine.transform_centers_of_mass(img_s, img_s_grid2world,
                                                   img_m, img_m_grid2world)

    nbins = 32

    # set up Affine Registration
    affreg = imaffine.AffineRegistration(
        metric=imaffine.MutualInformationMetric(nbins, None),
        level_iters=list(niter_affine),
        sigmas=[3.0, 1.0, 0.0],
        factors=[4, 2, 1])

    # translation
    translation = affreg.optimize(img_s, img_m,
                                  transforms.TranslationTransform3D(), None,
                                  img_s_grid2world, img_m_grid2world,
                                  starting_affine=c_of_mass.affine)

    # rigid body transform (translation + rotation)
    rigid = affreg.optimize(img_s, img_m,
                            transforms.RigidTransform3D(), None,
                            img_s_grid2world, img_m_grid2world,
                            starting_affine=translation.affine)

    # affine transform (translation + rotation + scaling)
    affine = affreg.optimize(img_s, img_m,
                             transforms.AffineTransform3D(), None,
                             img_s_grid2world, img_m_grid2world,
                             starting_affine=rigid.affine)

    # apply affine transformation
    img_m_affine = affine.transform(img_m)

    # set up Symmetric Diffeomorphic Registration (metric, iterations)
    sdr = imwarp.SymmetricDiffeomorphicRegistration(
        metrics.CCMetric(3), list(niter_sdr))

    # compute mapping
    mapping = sdr.optimize(img_s, img_m_affine)

    return mapping, affine

In order to make the functionality useful in a real case scenario, one might want to save and load the respective mapping data. For this reason the following two functions were implemented:

def save_mapping(fname, data, overwrite=True):
    out = dict()

    # dissolve object structure
    for d in data:
        # save type for order independent decomposition
        out[type(d).__name__] = d.__dict__

    write_hdf5(fname + '.h5', out, overwrite=overwrite)

… saves the data, by creating a dictionary with all data types and their respective sub-structure, that is obtained in form of the internal dictionary of mapping or affine.

def load_mapping(fname):
    # create new instances
    mapping = imwarp.DiffeomorphicMap(None, [])
    affine = imaffine.AffineMap(None)

    data = read_hdf5(fname + '.h5')

    mapping.__dict__ = data.get(type(mapping).__name__)
    affine.__dict__ = data.get(type(affine).__name__)

    return mapping, affine

… looks up the desired data types in the loaded dictionary and recomposes mapping and affine by creating an empty instance and reassigning the loaded data.

Now we are able to create, save and load our non-linear morphs. A crucial part however is still missing, namely applying the morph to our data. This step is done by the following function:

def morph_precomputed(img, affine, mapping):
    # morph img data
    img_sdr_affine = np.zeros(img.shape)
    for vol in range(img.shape[3]):
        img_sdr_affine[:, :, :, vol] = mapping.transform(
            affine.transform(img.dataobj[:, :, :, vol]))

    return img_sdr_affine

… the function takes 4D image data and loops over the 4th dimension (normally a time series). The respective 3D volume that is left, will be morphed using the affine transform first, followed by the non-linear transform. The function will return a 4D volume in the new space.

Because we also want to visualize our data later on, we want select slices to show, that are not similar relative to the volume itself, rather than the brain structure that is housed by it. Hence we might want to show different slices for Moving and Static respectively. Slices are defined as cut coordinates and shifted using a transformation matrix (in this example the inverse affine transform). The function returns an array containing the morphed 3D coordinates. Note, that in order to perform a 3D transform using a 4×4 transformation matrix, a column of ones was appended to the Nx3 array containing the old coordinates.

def morph_slice_indices(slice_indices, transmat):
    # make sure array is 2D
    slice_indices = np.atleast_2d(slice_indices)

    # add a column of ones
    slice_indices = np.append(slice_indices,
                              np.ones((slice_indices.shape[0], 1)), axis=1)

    # matrix multiplication with transformation matrix
    slice_indices_out = np.einsum('...j,ij', slice_indices, transmat)

    # round to select valid slices and remove last column
    return np.round(slice_indices_out[:, :-1])

As a last – mostly for convenience – we define a function that prepares our morph template data. This includes loading the example data and re-slicing. The first argument to give to the function is the result we obtained from compute_lcmv_example_data. The function returns the pre-processed volumes (functional, moving MRI, static MRI).

def prepare_volume_example_data(img_in, t1_m_path, t1_s_path,
                                voxel_size=(3., 3., 3.)):
    # load lcmv inverse
    if isinstance(img_in, str):
        img_vol = nib.load(img_in)
    else:
        img_vol = img_in

    # reslice lcmv inverse
    img_vol_res, img_vol_res_affine = reslice.reslice(
        img_vol.get_data(),
        img_vol.affine,
        img_vol.header.get_zooms()[:3],
        voxel_size)

    img_vol_res = nib.Nifti1Image(img_vol_res, img_vol_res_affine)

    # load subject brain (Moving)
    t1_m_img = nib.load(t1_m_path)

    # reslice Moving
    t1_m_img_res, t1_m_img_res_affine = reslice.reslice(
        t1_m_img.get_data(),
        t1_m_img.affine,
        t1_m_img.header.get_zooms()[:3],
        voxel_size)

    t1_m_img_res = nib.Nifti1Image(t1_m_img_res, t1_m_img_res_affine)

    # load fsaverage brain (Static)
    t1_s_img = nib.load(t1_s_path)

    # reslice Static
    t1_s_img_res, t1_s_img_res_affine = reslice.reslice(
        t1_s_img.get_data(),
        t1_s_img.affine,
        t1_s_img.header.get_zooms()[:3],
        voxel_size)

    t1_s_img_res = nib.Nifti1Image(t1_s_img_res, t1_s_img_res_affine)

    return img_vol_res, t1_m_img_res, t1_s_img_res

Lastly we execute the example:

# Setup path
data_path = sample.data_path()
results_path = data_path + '/subjects/LCMV-results'

# create results directory if it doesn't exist
if not path.exists(results_path):
    makedirs(results_path)

# compute LCMV beamformer inverse example
img = compute_lcmv_example_data(data_path)

# load and reslice volumes
img_vol_res, t1_m_img_res, t1_s_img_res = prepare_volume_example_data(
    img,
    data_path + '/subjects/sample/mri/brain.mgz',
    data_path + '/subjects/fsaverage/mri/brain.mgz',
    voxel_size=(5., 5., 5.))

# compute morph map from Moving to Static
mapping, affine = compute_morph_map(t1_m_img_res, t1_s_img_res)

save_mapping(results_path + '/volume_morph', [mapping, affine])

mapping, affine = load_mapping(results_path + '/volume_morph')

# apply morph map (test if saving and loading worked)
img_vol_morphed = morph_precomputed(img_vol_res, mapping, affine)

# make transformed ndarray a nifti
img_vol_morphed = nib.Nifti1Image(img_vol_morphed,
                                  affine=t1_s_img_res.affine)

# save morphed result
nib.save(img_vol_morphed, results_path + '/lcmv-fsaverage.nii.gz')

The plotting can be done using:

# select image overlay
imgs = [index_img(img_vol_res, 0), index_img(img_vol_res, 0),
        index_img(img_vol_morphed, 0)]

# select anatomical background images
t1_imgs = [t1_m_img_res, t1_s_img_res, t1_s_img_res]

# slices to show for Static volume
slices_s = (-10, 0, 0)

# slices to show for Moving volume
# to show roughly the same view, we transform the selected Static slices using
# the inverse affine transformation. Note that due to rotation and the
# non-linear transform, both views do not overlap perfectly (pre computed for
# this example)
slices_m = morph_slice_indices(np.atleast_2d(slices_s), affine.affine_inv)[0]

slices = [slices_s, slices_s, tuple(slices_m)]

# define titles for plots
titles = ['subject brain', 'fsaverage brain', 'fsaverage brain morphed result']

# plot results
figure, (axes1, axes2, axes3) = plt.subplots(3, 1)
figure.subplots_adjust(top=0.8, left=0.1, right=0.9, hspace=0.5)
figure.patch.set_facecolor('black')

for axes, img, t1_img, cut_coords, title in zip([axes1, axes2, axes3],
                                                imgs, t1_imgs, slices, titles):
    display = plot_anat(t1_img,
                        display_mode='ortho',
                        cut_coords=cut_coords,
                        draw_cross=False,
                        axes=axes,
                        figure=figure,
                        annotate=False)

    display.add_overlay(img, alpha=0.75)
    display.annotate(size=8)
    axes.set_title(title, color='white', fontsize=12)

plt.text(plt.xlim()[1], plt.ylim()[0], 't = 0.087s', color='white')
plt.suptitle('morph subject results to fsaverage', color='white', fontsize=16)
plt.show()

… but the results shown below will are an animated t-series, whereas the plot produced by the above code will only show a single time-point.

As clearly visible the morph was successful as before, but now got a much nicer API and can be used as hands-on example.

Furthermore the above code computes within a minute, so don’t hesitate to try it out! 😉

As usual the source code will be provided via GitHub.

Stay tuned!

Cheers,

Tommy

Leave a Reply

Your email address will not be published. Required fields are marked *