Source code for colicoords.fitting

import numpy as np
import numbers
from symfit import Fit, CallableNumericalModel
from symfit.core.fit import TakesData

from colicoords.models import NumericalCellModel
from colicoords.minimizers import Powell
from colicoords.support import ArrayFitResults


[docs]class RadialData(np.lib.mixins.NDArrayOperatorsMixin): """ Class mimicking a numpy ndarray used as dependent data for fitting STORM-membrane data. The apparent value of this object is an array with length `length` and whoes values are all equal to the radius of `cell_obj`. Parameters ---------- cell_obj : :class:`~colicoords.cell.Cell` Cell object whos radius gives this array's values. length : :obj:`int` Length of the array. """ _HANDLED_TYPES = (np.ndarray, numbers.Number) def __init__(self, cell_obj, length): self._cell_obj = cell_obj self._len = length self._array = np.ones(length, dtype=np.float) def __array_ufunc__(self, ufunc, method, *inputs, **kwargs): out = kwargs.get('out', ()) for x in inputs + out: # Only support operations with instances of _HANDLED_TYPES. # Use ArrayLike instead of type(self) for isinstance to # allow subclasses that don't override __array_ufunc__ to # handle ArrayLike objects. if not isinstance(x, self._HANDLED_TYPES + (RadialData,)): return NotImplemented # Defer to the implementation of the ufunc on unwrapped values. inputs = tuple(x.value if isinstance(x, RadialData) else x for x in inputs) if out: kwargs['out'] = tuple( x.value if isinstance(x, RadialData) else x for x in out) result = getattr(ufunc, method)(*inputs, **kwargs) if type(result) is tuple: # multiple return values return tuple(np.array(x) for x in result) elif method == 'at': # no return value return None else: # one return value return np.array(result) @property def shape(self): return self.value.shape @property def value(self): return self._cell_obj.radius*self._array
[docs]class CellMinimizeFunctionBase(object): """ Base class for Objective objects used by ``CellFit`` to optimize the coordinate system. The base class takes a :class:`~colicoords.cell.Cell` object and the name of target data element to perform optimization on. Subclasses of ``CellMinimizeFunctionBase`` must implement the `__call__` builtin, which takes the coordinate system's parameters as keyword arguments. Note that this is not an objective function to be minimized, but instead the return value is compared with the specified data element or specific target data to give the chi-squared. Parameters ---------- cell_obj : :class:`~colicoords.cell.Cell` Cell object to optimize. data_name : :obj:`str` Target data element name. """ def __init__(self, cell_obj, data_name): self.cell_obj = cell_obj self.data_name = data_name
[docs]class CellBinaryFunction(CellMinimizeFunctionBase): """ Binary data element objective function. Calling this object with coordinate system parameters returns a binary image by thresholding the radial distance image with the radius of the cell. """ def __call__(self, **parameters): self.cell_obj.coords.sub_par(parameters) binary = self.cell_obj.coords.rc < self.cell_obj.coords.r return binary.astype(int)
[docs]class CellImageFunction(CellMinimizeFunctionBase): """ Image element objective function. Calling this object with coordinate system parameters returns a reconstructed image of the target data element. """ #todo add additional parameters to init parameters def __call__(self, **parameters): r = parameters.pop('r', self.cell_obj.coords.r) r = r / self.cell_obj.coords.r self.cell_obj.coords.sub_par(parameters) #todo check and make sure that the r_dist isnt calculated to far out which can give some strange results stop = np.max(self.cell_obj.data.shape) / 2 step = 1 #todo some way to access these kwargs #todo scipy.fftsmth.convolve xp, fp = self.cell_obj.r_dist(stop, step, data_name=self.data_name, method='box') simulated = np.interp(r * self.cell_obj.coords.rc, xp, np.nan_to_num(fp)) # todo check nantonum cruciality return simulated
[docs]class CellSTORMMembraneFunction(CellMinimizeFunctionBase): """ STORM membrane objective function. Calling this object with coordinate system parameters returns a reconstructed image of the target data element. """ #todo booleans is not going to work here, needs to be done via sigma_y! #todo remove init def __init__(self, *args, **kwargs): # self.r_upper = kwargs.pop('r_upper', None) # self.r_lower = kwargs.pop('r_lower', lambda x: 2*np.std(x)) super(CellSTORMMembraneFunction, self).__init__(*args, **kwargs) def __call__(self, **parameters): self.cell_obj.coords.sub_par(parameters) storm_data = self.cell_obj.data.data_dict[self.data_name] r_vals = self.cell_obj.coords.calc_rc(storm_data['x'], storm_data['y']) # b_upper = r_vals < (r_vals.mean() + self.r_upper(r_vals)) if self.r_upper else True # b_lower = r_vals > (r_vals.mean() - self.r_lower(r_vals)) if self.r_lower else True # # b = np.logical_and(b_upper, b_lower) r_vals = r_vals return r_vals.astype(np.float) @property def target_data(self): """Dependent (target) data for coordinate optimization based on STORM membrane markers""" return RadialData(self.cell_obj, len(self.cell_obj.data.data_dict[self.data_name]['x']))
[docs]class DepCellFit(Fit): # in this implementation stepwise fitting doesnt work, however direct subclassing of Fit is preferred. Reasses with # new symfit version defaults = { 'binary': CellBinaryFunction, 'storm': CellSTORMMembraneFunction, 'brightfield': CellImageFunction, 'fluorescence': CellImageFunction, } def __init__(self, cell_obj, data_name='binary', objective=None, minimizer=Powell, **kwargs): self.cell_obj = cell_obj self.data_name = data_name self.minimizer = minimizer self.kwargs = kwargs dclass = self.data_elem.dclass obj = self.defaults[dclass] if not objective else objective obj(self.cell_obj, data_name) self.model = NumericalCellModel(cell_obj, obj(self.cell_obj, data_name)) super(DepCellFit, self).__init__(self.model, self.data_elem, minimizer=minimizer, **kwargs) def renew_fit(self): super(DepCellFit, self).__init__(self.model, self.data_elem, minimizer=self.minimizer, **self.kwargs) def fit_parameters(self, parameters, **kwargs): with set_params(self, parameters): super(DepCellFit, self).__init__(self.model, self.data_elem, minimizer=self.minimizer, **self.kwargs) res = self.execute(**kwargs) for k, v in res.params.items(): i = [par.name for par in self.model.params].index(k) self.model.params[i].value = v self.model.cell_obj.coords.sub_par(res.params) return res def execute_stepwise(self, **kwargs): i = 0 j = 0 prev_val = 0 imax = kwargs.get('imax', 3) jmax = kwargs.get('jmax', 5) assert imax > 0 assert jmax > 0 while i < imax and j < jmax: #todo checking and testng j += 1 res = self.fit_parameters('r', **kwargs) res = self.fit_parameters('xl xr', **kwargs) res = self.fit_parameters('a0 a1 a2', **kwargs) print('Current minimize value: {}'.format(res.objective_value)) if prev_val == res.objective_value: i += 1 prev_val = res.objective_value return res @property def data_elem(self): return self.cell_obj.data.data_dict[self.data_name]
class CellFit(object): # Default functions to use for given data classes. defaults = { 'binary': CellBinaryFunction, 'storm': CellSTORMMembraneFunction, 'brightfield': CellImageFunction, 'fluorescence': CellImageFunction, } def __init__(self, cell_obj, data_name='binary', cell_function=None, minimizer=Powell, **kwargs): self.cell_obj = cell_obj self.data_name = data_name self.minimizer = minimizer self.kwargs = kwargs #todo self.dclass = self.data_elem.dclass #the self.data_elem value changes when sefl.cell_fuction is defined! func_klass = self.defaults[self.dclass] if not cell_function else cell_function #todo check custom function if issubclass(func_klass, CellMinimizeFunctionBase): self.cell_function = func_klass(self.cell_obj, data_name) elif callable(func_klass): self.cell_function = func_klass else: raise TypeError("Invalid type for cell_function keyword argument.") self.model = NumericalCellModel(cell_obj, self.cell_function) if self.dclass == 'storm': data_elem = self.cell_obj.data.data_dict[self.data_name] r_mean = self.cell_obj.coords.calc_rc(data_elem['x'], data_elem['y']).mean() idx = [p.name for p in self.model.params].index('r') self.model.params[idx].value = r_mean self.model.params[idx].min = 0.8*r_mean self.model.params[idx].max = 1.2*r_mean self.fit = Fit(self.model, self.data_elem, minimizer=minimizer, **kwargs) def renew_fit(self): self.fit = Fit(self.model, self.data_elem, minimizer=self.minimizer, **self.kwargs) def execute(self, **kwargs): return self.fit.execute(**kwargs) def fit_parameters(self, parameters, **kwargs): with set_params(self.fit, parameters): self.renew_fit() res = self.execute(**kwargs) for k, v in res.params.items(): i = [par.name for par in self.model.params].index(k) self.model.params[i].value = v self.model.cell_obj.coords.sub_par(res.params) return res def execute_stepwise(self, **kwargs): i = 0 j = 0 prev_val = 0 imax = kwargs.get('imax', 3) jmax = kwargs.get('jmax', 5) assert imax > 0 assert jmax > 0 while i < imax and j < jmax: #todo checking and testng j += 1 res = self.fit_parameters('r', **kwargs) res = self.fit_parameters('xl xr', **kwargs) res = self.fit_parameters('a0 a1 a2', **kwargs) print('Current minimize value: {}'.format(res.objective_value)) if prev_val == res.objective_value: i += 1 prev_val = res.objective_value return res @property def data_elem(self): try: return self.cell_function.target_data except AttributeError: return self.cell_obj.data.data_dict[self.data_name]
[docs]class LinearModelFit(Fit): """ Fitting of a model with linear parameters where the linear parameters are not fitted by ``symfit`` but instead solved as a system of linear equations. Parameters ---------- model : : """ def __init__(self, model, *args, **kwargs): objective = kwargs.pop('objective', None) minimizer = kwargs.pop('minimizer', None) constraints = kwargs.pop('constraints', None) temp_data = TakesData(model, *args, **kwargs) self._old_model = model self._new_model = make_linear_model(model, temp_data.dependent_data) super(LinearModelFit, self).__init__(self._new_model, *args, **kwargs, minimizer=minimizer, objective=objective, constraints=constraints)
[docs] def execute(self, **kwargs): res = super(LinearModelFit, self).execute(**kwargs) linear_dict = {par.name: value for par, value in zip(self._old_model.linear_params, [func.a_list for func in self.model.numerical_components][0])} # î assuming all linear parameters are in all numerical components which is not true i guess overall_dict = {**res.params, **linear_dict} popt = [overall_dict[par.name] for par in self._old_model.params] try: return ArrayFitResults(self._old_model, popt, None, res.infodict, res.status_message, res.iterations, **res.gof_qualifiers) except AttributeError: return ArrayFitResults(self._old_model, popt, None, res.minimizer, res.objective, res.status_message, **res.minimizer_output)
[docs]def solve_linear_system(y_list, data): """Solve system of linear eqns a1*y1 + a2*y2 == data but then also vector edition of that""" D_vec = np.stack([data.dot(y) for y in y_list]).flatten() M = np.array([y1.dot(y2) for y1 in y_list for y2 in y_list]).reshape(len(y_list), len(y_list)) l = len(data) if data.ndim == 2 else 1 bigM = np.kron(M, np.eye(l)) ai = np.linalg.solve(bigM, D_vec) return np.split(ai, len(y_list))
class set_params(object): def __init__(self, fit_object, parameters): self.parametes = [par.rstrip(',.: ') for par in parameters.split(' ')] self.fit_object = fit_object def __enter__(self): # todo context manager for fixing / unfixing params self.original_fixing = [par.fixed for par in self.fit_object.model.params] fixed_params = [par for par in self.fit_object.model.params if not par.name in self.parametes] for par in fixed_params: par.fixed = True def __exit__(self, *args): for par, fixed in zip(self.fit_object.model.params, self.original_fixing): par.fixed = fixed class wrapped_func(object): def __init__(self, func, data, linear_params): self.func = func # the original callable self.data = data # only data no dict self.linear_params = linear_params # list of names only def __call__(self, *args, **kwargs): y_list = [self.func(*args, **{par: 1 if par == s_par else 0 for par in self.linear_params}, **kwargs) for s_par in self.linear_params] self.a_list = solve_linear_system(y_list, self.data) result = sum([a_elem[:, np.newaxis] * y_elem for a_elem, y_elem in zip(self.a_list, y_list)]) return result def make_linear_model(model, data): new_dict = {k: wrapped_func(v, data[k], [par.name for par in model.linear_params]) for k, v in model.model_dict.items()} new_params = [par for par in model.params if par not in model.linear_params] return CallableNumericalModel(new_dict, model.independent_vars, new_params)