# Source code for curveball.scripts.cli

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# This file is part of curveball.
# https://github.com/yoavram/curveball

# Licensed under the MIT license:
# http://www.opensource.org/licenses/MIT-license
# Copyright (c) 2015, Yoav Ram <yoav@yoavram.com>
from builtins import map
import sys
import os.path
import pkg_resources
import glob
import warnings
# catch some future warnings, mostly caused by matplotlib
warnings.simplefilter(action="ignore", category=FutureWarning)
import curveball
import numpy as np
import pandas as pd
import click
import xlrd
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style("ticks")


# Module-level switches; the `cli` group callback overwrites them from the
# command line flags before any sub-command runs.
VERBOSE = False  # when true, echo_info() messages are printed
PLOT = True  # when true, plots are generated and saved next to the data files
PROMPT = True  # when true, `analyse` asks the user to confirm the plate
# Colours passed to click.secho() by echo_error() / echo_info().
ERROR_COLOR = 'red'
INFO_COLOR = 'white'
# Maps a lower-case file extension to the curveball.ioutils reader used to
# load a data file with that extension.
file_extension_handlers = {
	'.mat': curveball.ioutils.read_tecan_mat, 
	'.xlsx': curveball.ioutils.read_tecan_xlsx,
	'.csv': curveball.ioutils.read_curveball_csv,
}


def echo_error(message):
    """Print `message`, prefixed with ``Error: ``, in the error colour."""
    text = "Error: %s" % message
    click.secho(text, fg=ERROR_COLOR)


def echo_info(message):
    """Print `message` in the info colour, but only in verbose mode."""
    if not VERBOSE:
        return
    click.secho(message, fg=INFO_COLOR)


def ioerror_to_click_exception(io_error):
    """Re-raise an I/O error as a :py:class:`click.FileError`.

    Parameters
    ----------
    io_error : IOError
        the caught exception.

    Raises
    ------
    click.FileError
        always; carries the file name and a human readable hint.
    """
    # Not every IOError carries a file name.
    filename = getattr(io_error, 'filename', None)
    if filename is None:
        filename = 'unknown'
    # `IOError.message` does not exist on Python 3; `strerror` holds the OS
    # message on both Python 2 and 3, with str() as a last resort.
    hint = getattr(io_error, 'strerror', None) or str(io_error)
    raise click.FileError(filename, hint=hint)


def to_dict(ctx, param, value):
    """Click callback: turn the collected ``(key, value)`` pairs into a dict.

    `ctx` and `param` are required by the callback signature and are unused.
    """
    as_mapping = dict(value)
    return as_mapping


def to_set(ctx, param, value):
    """Click callback: turn the collected option values into a set.

    `ctx` and `param` are required by the callback signature and are unused.
    """
    unique_values = set(value)
    return unique_values


def get_filename(filepath):
    """Get a file name out of a file path.

    Parameters
    ----------
    filepath : str
        a file path, or :const:`None`.

    Returns
    -------
    str
        the last component of the path; an empty string if `filepath` is
        :const:`None`.
    """
    if filepath is None:
        return ''
    # basename never returns None, so no further check is needed.
    return os.path.basename(filepath)
def find_plate_file(plate_folder, plate_file):
    """Finds a plate file, either on the file system or in the package data resources.

    Parameters
    ----------
    plate_folder : str
        the folder to look in; if the file is not found there, the same
        value is used as a package name for the package data lookup.
    plate_file : str
        the filename of the plate file, may include absolute or relative path

    Returns
    -------
    str
        the full path of the plate file.

    Raises
    ------
    click.FileError
        if the file is found neither on the file system nor in the package data.
    """
    plate_path = os.path.join(plate_folder, plate_file)
    if os.path.exists(plate_path):
        return plate_path
    # if plate path doesn't exist try to get it from package data
    try:
        resource_path = pkg_resources.resource_filename(plate_folder, plate_file)
    except (ImportError, ValueError):
        # plate_folder is not an importable package, so there is no package
        # data to fall back to; report the path that was actually tried.
        raise click.FileError(plate_path, hint="can't find file.")
    if not os.path.exists(resource_path):
        raise click.FileError(resource_path, hint="can't find file.")
    return resource_path
def load_plate(plate_path):
    """Loads a plate template from a CSV file.

    Parameters
    ----------
    plate_path : str
        full or relative path to the plate template file.

    Returns
    -------
    pandas.DataFrame
        the plate template in tidy data format (see :py:mod:`.ioutils`).

    Raises
    ------
    click.FileError
        if the file can't be read or can't be parsed as CSV.

    See also
    --------
    find_plate_file
    """
    # The parser exception lives in `pandas.errors` in current pandas;
    # `pandas.parser.CParserError` is only available in old releases.
    pandas_errors = getattr(pd, 'errors', None)
    parser_error = getattr(pandas_errors, 'ParserError', None)
    if parser_error is None:
        parser_error = pd.parser.CParserError
    try:
        plate = pd.read_csv(plate_path)
    except IOError as e:
        ioerror_to_click_exception(e)
    except parser_error as e:
        raise click.FileError(plate_path, hint="parser error, probably not a CSV file, {0}".format(e.args[0]))
    return plate
def where(ctx, param, value):
    """Prints the path where Curveball is installed and exits.

    Parameters are generally ignored; if `value` is empty/:const:`False`/:const:`None`,
    or if `ctx.resilient_parsing` is set, the function returns without doing anything.
    """
    if not value or ctx.resilient_parsing:
        return
    # The installation folder is the one holding the package's __init__ file.
    folder = os.path.dirname(curveball.__file__)
    click.secho(click.format_filename(folder))
    ctx.exit()
@click.group()
@click.option('-v/-V', '--verbose/--no-verbose', default=False)
@click.option('-l/-L', '--plot/--no-plot', default=True)
@click.option('-p/-P', '--prompt/--no-prompt', default=False)
@click.option('--where', is_flag=True, default=False, is_eager=True, callback=where, help='prints the path where Curveball is installed')
@click.version_option(version=curveball.__version__, prog_name=curveball.__name__)
def cli(verbose, plot, prompt, where):
    """Main entry point to curveball

    To get help for the parameters, run:

    >>> curveball --help
    """
    # The sub-commands read these module-level switches.
    global VERBOSE, PLOT, PROMPT
    VERBOSE = verbose
    PLOT = plot
    PROMPT = prompt
    if VERBOSE:
        click.secho('=' * 40, fg='cyan')
        click.secho('Curveball %s' % curveball.__version__, fg='cyan')
        click.secho('=' * 40, fg='cyan')


@click.option('--plate_folder', default='plate_templates', help='plate templates default folder', type=click.Path())
@click.option('--plate_file', default='checkerboard.csv', help='plate templates csv file')
@click.option('-o', '--output_file', default='-', help='output file path', type=click.File(mode='w', lazy=True))
@click.option('--list', is_flag=True, default=False, help='list plate templates in the default folder')
@click.option('--show', is_flag=True, default=False, help='display the plate template as an image')
@cli.command()
def plate(plate_folder, plate_file, output_file, list, show):
    """Read and print a plate template from a plate template CSV file.

    To get help for the parameters, run:

    >>> curveball plate --help
    """
    if list:
        # Only the CSV files in the packaged templates folder are templates.
        files = pkg_resources.resource_listdir('plate_templates', '')
        files = [fn for fn in files if os.path.splitext(fn)[-1].lower() == '.csv']
        click.echo(os.linesep.join(files))
        return
    plate_path = find_plate_file(plate_folder, plate_file)
    template = load_plate(plate_path)
    if show:
        fig, ax = curveball.plots.plot_plate(template)
        if output_file.name == '-':
            # No output file was given: display the image instead of saving it.
            plt.show()
        else:
            fig.savefig(output_file.name)
    else:
        template.to_csv(output_file, index=False)
    if output_file.name != '-':
        echo_info("Wrote output to {0}".format(click.format_filename(output_file.name)))


@click.argument('path', type=click.Path(exists=True, readable=True))
@click.option('--plate_folder', default='plate_templates', help='plate templates default folder', type=click.Path())
@click.option('--plate_file', default='checkerboard.csv', help='plate templates csv file')
@click.option('-o', '--output_file', default='-', help='output csv file path', type=click.File(mode='w', lazy=True))
@click.option('--blank_strain', default='0', type=str, help='blank strain for background calibration')
@click.option('--ref_strain', default='1', type=str, help='reference strain for competitions')
@click.option('--max_time', default=np.inf, help='omit data after max_time hours')
@click.option('--guess', type=(str, float), multiple=True, callback=to_dict, help='set the initial guess for a parameter')
@click.option('--param_min', type=(str, float), multiple=True, callback=to_dict, help='set the minimum allowed value for a parameter')
@click.option('--param_max', type=(str, float), multiple=True, callback=to_dict, help='set the maximum allowed value for a parameter')
@click.option('--param_fix', type=str, multiple=True, callback=to_set, help='fix a parameter to it\'s initial guess')
@click.option('--weights/--no-weights', default=False, help="use weights for the fitting procedure")
@click.option('--ci/--no-ci', default=False, help="find confidence intervals for lag and max growth rate")
@click.option('--nsamples', default=1000, help="number of bootstrap samples to use, only applicable when using --ci")
@cli.command()
def analyse(path, output_file, plate_folder, plate_file, blank_strain, ref_strain, max_time, guess, param_min, param_max, param_fix, weights, ci, nsamples):
    """Analyse growth curves data using Curveball.

    To get help for the parameters, run:

    >>> curveball analyse --help
    """
    results = []
    plate_path = find_plate_file(plate_folder, plate_file)
    if VERBOSE:
        click.echo('- Processing %s' % click.format_filename(path))
        click.echo('- Using plate template from %s' % click.format_filename(plate_path))
        click.echo('- Blank strain: %s; Reference strain: %s' % (blank_strain, ref_strain))
        click.echo('- Omitting data after %.2f hours' % max_time)
        click.echo('-' * 40)

    plate = load_plate(plate_path)
    # Strain names arrive from the command line as strings, so compare as strings.
    plate.Strain = list(map(str, plate.Strain))
    plate_strains = plate.Strain.unique().tolist()
    if PROMPT:
        fig, ax = curveball.plots.plot_plate(plate)
        fig.show()
        click.echo("Plate with %d strains: %s" % (len(plate_strains), ', '.join(plate_strains)))
        click.confirm('Is this the plate you wanted?', default=False, abort=True, show_default=True)

    if os.path.isdir(path):
        files = glob.glob(os.path.join(path, '*'))
    else:
        # click already verified that `path` exists; using it directly avoids
        # glob mis-reading characters such as '[' in the file name.
        files = [path]
    files = [fn for fn in files if os.path.splitext(fn)[-1].lower() in file_extension_handlers]
    if not files:
        raise click.ClickException("No data files found in folder {0}".format(click.format_filename(path)))

    with click.progressbar(files, label='Processing files:', item_show_func=get_filename, color='green') as bar:
        for filepath in bar:
            file_results = _process_file(filepath, plate, blank_strain, ref_strain, max_time, guess, param_min, param_max, param_fix, weights, ci, nsamples)
            results.extend(file_results)

    output_table = pd.DataFrame(results)
    output_table.to_csv(output_file, index=False)
    if VERBOSE and output_file.name != '-':
        click.secho("Wrote output to %s" % output_file.name, fg='green')


def _process_file(filepath, plate, blank_strain, ref_strain, max_time, guess, param_min, param_max, param_fix, weights, ci, nsamples):
    """Analyses a single growth curves file.

    Parameters
    ----------
    filepath : str
        path of the data file; its extension selects the reader from
        `file_extension_handlers`.
    plate : pandas.DataFrame
        plate template; its `Strain` and `Color` columns are used.
    blank_strain : str
        strain used for background subtraction; :const:`None` or ``'none'``
        disables the subtraction.
    ref_strain : str
        reference strain that the other strains are competed against.
    max_time : float
        passed to the reader when finite.
    guess, param_min, param_max : dict
        passed to the model fitting as initial guesses and bounds.
    param_fix : set
        passed to the model fitting as the parameters to fix.
    weights : bool
        passed to the model fitting as `use_weights`.
    ci : bool
        if true, bootstrap confidence intervals are added to the results.
    nsamples : int
        number of bootstrap samples, used when `ci` is true.

    Returns
    -------
    list
        one dict of results per fitted strain; empty if no reader is
        registered for the file extension.

    See also
    --------
    analyse
    """
    results = []
    fn, ext = os.path.splitext(filepath)
    echo_info("\tHandler: {1}\n".format(filepath, ext))
    # `analyse` filters files by lower-cased extension, so look the reader up
    # the same way; otherwise e.g. '.XLSX' files would be silently skipped.
    handler = file_extension_handlers.get(ext.lower())
    if handler is None:
        echo_info("No handler found for file {0}".format(click.format_filename(filepath)))
        return results

    try:
        if np.isfinite(max_time):
            df = handler(filepath, plate=plate, max_time=max_time)
        else:
            df = handler(filepath, plate=plate)
    except IOError as e:
        ioerror_to_click_exception(e)
    except xlrd.biffh.XLRDError as e:
        raise click.FileError(filepath, hint="parser error, probably not a {1} file, {0}".format(e.args[0], ext))

    strains = plate.Strain.unique().tolist()
    if blank_strain is not None and blank_strain != 'none':
        if blank_strain in strains:
            # Background is the mean blank OD at the first time point; it is
            # subtracted from all readings, and negative results are clipped to zero.
            bg = df[(df.Strain == blank_strain) & (df.Time == df.Time.min())]
            bg = bg.OD.mean()
            df.OD -= bg
            df.loc[df.OD < 0, 'OD'] = 0
        else:
            echo_error("Warning! Blank strain '%s' doesn't exist" % blank_strain)

    if PLOT:
        wells_plot_fn = fn + '_wells.png'
        curveball.plots.plot_wells(df, output_filename=wells_plot_fn)
        echo_info("Wrote wells plot to %s" % click.format_filename(wells_plot_fn))
        strains_plot_fn = fn + '_strains.png'
        curveball.plots.plot_strains(df, output_filename=strains_plot_fn)
        echo_info("Wrote strains plot to %s" % click.format_filename(strains_plot_fn))

    if blank_strain in strains:
        strains.remove(blank_strain)
    if ref_strain in strains:
        # Fit the reference strain first so that `ref_fit` is available when
        # the other strains are competed against it.
        strains.remove(ref_strain)
        strains.insert(0, ref_strain)
    else:
        echo_error("Warning, reference strains '%s' doesn't exist!" % ref_strain)

    for strain in strains:
        strain_df = df[df.Strain == strain]
        fit_output = curveball.models.fit_model(strain_df, param_guess=guess, param_min=param_min, param_max=param_max, param_fix=param_fix, use_weights=weights, PLOT=PLOT, PRINT=VERBOSE)
        if PLOT:
            # When plotting, the figure and axes are returned with the fits.
            fit_results, fig, ax = fit_output
            strain_plot_fn = fn + ('_strain_%s.png' % strain)
            fig.savefig(strain_plot_fn)
            echo_info("Wrote strain %s plot to %s" % (strain, click.format_filename(strain_plot_fn)))
        else:
            fit_results = fit_output

        res = {}
        fit = fit_results[0]
        res['folder'] = os.path.dirname(filepath)
        res['filename'] = os.path.splitext(os.path.basename(fn))[0]
        res['strain'] = strain
        res['model'] = fit.model.name
        res['RSS'] = fit.chisqr
        res['RMSD'] = np.sqrt(res['RSS'] / fit.ndata)
        res['NRMSD'] = res['RMSD'] / (strain_df.OD.max() - strain_df.OD.min())
        res['CV(RMSD)'] = res['RMSD'] / (strain_df.OD.mean())
        res['bic'] = fit.bic
        res['aic'] = fit.aic
        res['weighted_bic'] = fit.weighted_bic
        res['weighted_aic'] = fit.weighted_aic
        params = fit.params
        res['y0'] = params['y0'].value
        res['K'] = params['K'].value
        res['r'] = params['r'].value
        # Parameters missing from the fitted model get fixed fallback values.
        res['nu'] = params['nu'].value if 'nu' in params else 1
        res['q0'] = params['q0'].value if 'q0' in params else 0
        res['v'] = params['v'].value if 'v' in params else 0
        res['has_lag'] = curveball.models.has_lag(fit_results)
        res['has_nu'] = curveball.models.has_nu(fit_results, PRINT=VERBOSE)
        res['max_growth_rate'] = curveball.models.find_max_growth(fit)[-1]
        res['min_doubling_time'] = curveball.models.find_min_doubling_time(fit)
        res['lag'] = curveball.models.find_lag(fit)

        if ci:
            param_samples = curveball.models.bootstrap_params(strain_df, fit, nsamples=nsamples)
            _, _, low, high = curveball.models.find_max_growth_ci(fit, param_samples)
            res['max_growth_rate_low'] = low
            res['max_growth_rate_high'] = high
            low, high = curveball.models.find_lag_ci(fit, param_samples)
            res['lag_low'] = low
            res['lag_high'] = high
            low, high = curveball.models.find_min_doubling_time_ci(fit, param_samples)
            res['min_doubling_time_low'] = low
            res['min_doubling_time_high'] = high
            low, high = curveball.models.find_K_ci(param_samples)
            res['K_low'] = low
            res['K_high'] = high

        if strain == ref_strain:
            ref_fit = fit
            res['w'] = 1
        elif ref_strain in strains:
            colors = plate[plate.Strain.isin([strain, ref_strain])].Color.unique()
            competition = curveball.competitions.compete(fit, ref_fit, hours=df.Time.max(), colors=colors, PLOT=PLOT)
            if PLOT:
                t, y, fig, ax = competition
                competition_plot_fn = fn + ('_%s_vs_%s.png' % (strain, ref_strain))
                fig.savefig(competition_plot_fn)
                echo_info("Wrote competition %s vs %s plot to %s" % (strain, ref_strain, click.format_filename(competition_plot_fn)))
            else:
                t, y = competition
            res['w'] = curveball.competitions.fitness_LTEE(y, assay_strain=0, ref_strain=1)
            # TODO CI for w
        results.append(res)
        # Clear the current figure before moving on to the next strain.
        plt.clf()
    return results


if __name__ == '__main__':
    cli()