Source code for tffm

import sys
import math
import re

import ghmm

import drawing
import exceptions
import utils
import hit as hit_module

from constants import ALPHABET, EXTENDED_ALPHABET


[docs]class TFFM(ghmm.DiscreteEmissionHMM):
[docs] def __init__(self, emission_domain, distribution, cmodel, kind, name="TFFM"): super(ghmm.DiscreteEmissionHMM, self).__init__(emission_domain, distribution, cmodel) if kind != "1st-order" and kind != "detailed": raise exceptions.TFFMKindError(kind) self.kind = kind self.name = name
[docs] def __del__(self): super(ghmm.DiscreteEmissionHMM, self).__del__()
[docs] def __len__(self): if self.kind == "1st-order": return self.N - 2 return self.N / 4 - 1
[docs] def background_emission_proba(self): emissions = {'A': 0., 'C': 0., 'G': 0., 'T': 0.} if self.kind == "1st-order": last_emissions = {'A': 0., 'C': 0., 'G': 0., 'T': 0.} # Retrieve emission proba for the first state for i in xrange(4): last_emissions[ALPHABET[i]] = self.getEmission(0)[i] # Compute emission proba for the background for i in xrange(4): for j in xrange(4): proba = self.getEmission(1)[j * 4 + i] proba *= last_emissions[ALPHABET[j]] emissions[ALPHABET[i]] += proba elif self.kind == "detailed": for i in xrange(4): for j in xrange(4): proba = self.getTransition(i, j) * 0.25 emissions[ALPHABET[j]] += proba somme = sum(emissions.values()) for i in xrange(4): emissions[ALPHABET[i]] /= somme return emissions
[docs] def get_positions_ic(self): previous_position_proba = self.background_emission_proba() positions_information_content = [] start = drawing.get_position_start(self) for pos in xrange(start, len(self) + start): position_proba = {'A': 0., 'C': 0., 'G': 0., 'T': 0.} for i in xrange(4): emissions = get_emission_update_pos_proba( self, position_proba, pos, previous_position_proba, i) previous_position_proba = position_proba.copy() if self.kind == "detailed": somme = sum(previous_position_proba.values()) for i in xrange(4): previous_position_proba[ALPHABET[i]] /= somme values = previous_position_proba.values() keys = previous_position_proba.keys() emissions = zip(values, keys) entropy = compute_entropy(emissions) positions_information_content.append(2.0 - entropy) return positions_information_content
[docs] def get_information_content(self): pos_ic = self.get_positions_ic() return sum(pos_ic)
[docs] def final_states(self): if self.kind == "1st-order": return [len(self) + 1] else: size = len(self) * 4 return [size, size + 1, size + 2, size + 3]
[docs] def train(self, training_file, epsilon=0.0001, max_iter=500): training_sequences = ghmm.SequenceSet(ghmm.Alphabet(ALPHABET), training_file) utils.set_sequences_weight(training_sequences, 1.0) self.baumWelch(training_sequences, max_iter, epsilon)
[docs] def test(self, seq_file, threshold=0., only_best=False): sequence_list = utils.parse_fasta(seq_file) all_hits = [] for seq_record in sequence_list: hits_positive_strand = get_hits(self, seq_record, threshold) hits_negative_strand = get_hits(self, seq_record, threshold, negative=True) hits = merge_hits(hits_positive_strand, hits_negative_strand, only_best) all_hits.extend(hits) return all_hits
[docs] def trim(self, threshold): first, last = get_significant_positions(self, threshold) transitions, emissions, starts = trim_hmm(self, first, last) new_hmm = ghmm.HMMFromMatrices(self.emissionDomain, self.distribution, transitions, emissions, starts) self.cmodel = new_hmm.cmodel self.N = new_hmm.cmodel.N self.M = new_hmm.cmodel.M self.model_type = new_hmm.cmodel.model_type self.maxorder = new_hmm.cmodel.maxorder
[docs]def trim_hmm(tffm, first, last): nb_states = last - first + 1 if nb_states < 0: # TODO raise an error rather than a sys.exit sys.exit("\nTrimmed HMM is empty!!!\n") matrices = tffm.asMatrices() transitions = matrices[0] emissions = matrices[1] for index in xrange(tffm.N): transitions[index] = transitions[index][0:nb_states+2] if index > 1 and index < nb_states + 2: emissions[index] = emissions[first+1] first += 1 transitions = transitions[0:nb_states+2] transitions[nb_states+1][nb_states+1] = 0.0 transitions[nb_states+1][1] = 1.0 emissions = emissions[0:nb_states+2] starts = matrices[2][0:nb_states+2] return transitions, emissions, starts
[docs]def get_significant_positions(tffm, threshold): pos_ic = tffm.get_positions_ic() first = 1 last = len(tffm) while first <= last and pos_ic[first-1] < threshold: first += 1 while last > 0 and pos_ic[last-1] < threshold: last -= 1 return first, last
[docs]def best_hit(hits_positive_strand, hits_negative_strand): max_positive_strand = max(hits_positive_strand) max_negative_strand = max(hits_negative_strand) if max_positive_strand or max_negative_strand: return [max(max_positive_strand, max_negative_strand)] else: return []
[docs]def merge_hits(hits_positive_strand, hits_negative_strand, only_best): if only_best: return best_hit(hits_positive_strand, hits_negative_strand) else: return [hit for hit in utils.roundrobin(hits_positive_strand, hits_negative_strand)]
[docs]def split_sequence(sequence): ghmm_extended_alphabet = ghmm.Alphabet(EXTENDED_ALPHABET) sequence_str = "".join(ghmm_extended_alphabet.externalSequence(sequence)) return ghmm.SequenceSet(ghmm_extended_alphabet, re.split("([ACGT]+)", sequence_str))
[docs]def get_posterior_proba(tffm, sequence_split): ghmm_extended_alphabet = ghmm.Alphabet(EXTENDED_ALPHABET) posterior_proba = [] null_proba = [0.] * tffm.N for sequence in sequence_split: if re.match("[ACGT]", sequence): emission_sequence = ghmm.SequenceSet(ghmm_extended_alphabet, [sequence])[0] posterior_proba.extend(tffm.posterior(emission_sequence)) else: for __ in xrange(len(sequence)): posterior_proba.append(null_proba) return posterior_proba
[docs]def get_start_end_strand(position, seq_record, tffm, negative): # position is 0-based and we need 1-based coordinates if negative: start = len(seq_record) - position end = len(seq_record) - position + len(tffm) - 1 strand = "-" else: end = position + 1 start = position - len(tffm) + 2 strand = "+" return start, end, strand
[docs]def construct_hits(posterior_proba, seq_record, tffm, threshold, negative): hits = [None] * len(seq_record) for position in xrange(len(tffm)-1, len(posterior_proba)): best_proba = 0.0 for state in tffm.final_states(): proba = posterior_proba[position][state] if proba > threshold and proba > best_proba: best_proba = proba start, end, strand = get_start_end_strand(position, seq_record, tffm, negative) hits[end-1] = hit_module.HIT(seq_record, start, end, strand, proba, tffm.name, state) return hits
[docs]def get_hits(tffm, seq_record, threshold, negative=False): sequence = seq_record.seq if negative: sequence = seq_record.reverse_complement().seq sequence_split = re.split("([ACGT]+)", str(sequence)) posterior_proba = get_posterior_proba(tffm, sequence_split) hits = construct_hits(posterior_proba, seq_record, tffm, threshold, negative) return hits
[docs]def compute_entropy(emissions): entropy = 0. for i in xrange(4): proba, __ = emissions[i] if proba != 0.: entropy += proba * math.log(proba, 2) return -entropy
[docs]def get_emission_update_pos_proba(tffm, position_proba, position, previous_position_proba, index): emissions_dic = {'A': 0., 'C': 0., 'G': 0., 'T': 0.} if tffm.kind == "1st-order": for j in xrange(4): letter = ALPHABET[j] emissions_dic[letter] = tffm.getEmission(position)[index * 4 + j] proba = tffm.getEmission(position)[j * 4 + index] proba *= previous_position_proba[letter] position_proba[ALPHABET[index]] += proba emissions = zip(emissions_dic.values(), emissions_dic.keys()) elif tffm.kind == "detailed": for j in xrange(4): start_state = (position - 1) * 4 + index end_state = position * 4 + j emissions_dic[ALPHABET[j]] = tffm.getTransition(start_state, end_state) proba = previous_position_proba[ALPHABET[index]] proba *= tffm.getTransition(start_state, end_state) position_proba[ALPHABET[j]] += proba somme = sum(emissions_dic.values()) emissions = zip(emissions_dic.values(), emissions_dic.keys()) emissions = map(lambda (x, y): (x / somme, y), emissions) emissions.sort(reverse=True) return emissions
[docs]def tffm_from_xml(xml, kind): hmm = ghmm.HMMOpen(xml) return TFFM(hmm.emissionDomain, hmm.distribution, hmm.cmodel, kind)
[docs]def tffm_from_meme(meme_output, kind): print "Not implemented yet"
[docs]def tffm_from_fasta(fasta_file, kind): print "Not implemented yet"