import sys
import math
import re
import ghmm
import drawing
import exceptions
import utils
import hit as hit_module
from constants import ALPHABET, EXTENDED_ALPHABET
[docs]class TFFM(ghmm.DiscreteEmissionHMM):
[docs] def __init__(self, emission_domain, distribution, cmodel, kind,
name="TFFM"):
super(ghmm.DiscreteEmissionHMM, self).__init__(emission_domain,
distribution, cmodel)
if kind != "1st-order" and kind != "detailed":
raise exceptions.TFFMKindError(kind)
self.kind = kind
self.name = name
[docs] def __del__(self):
super(ghmm.DiscreteEmissionHMM, self).__del__()
[docs] def __len__(self):
if self.kind == "1st-order":
return self.N - 2
return self.N / 4 - 1
[docs] def background_emission_proba(self):
emissions = {'A': 0., 'C': 0., 'G': 0., 'T': 0.}
if self.kind == "1st-order":
last_emissions = {'A': 0., 'C': 0., 'G': 0., 'T': 0.}
# Retrieve emission proba for the first state
for i in xrange(4):
last_emissions[ALPHABET[i]] = self.getEmission(0)[i]
# Compute emission proba for the background
for i in xrange(4):
for j in xrange(4):
proba = self.getEmission(1)[j * 4 + i]
proba *= last_emissions[ALPHABET[j]]
emissions[ALPHABET[i]] += proba
elif self.kind == "detailed":
for i in xrange(4):
for j in xrange(4):
proba = self.getTransition(i, j) * 0.25
emissions[ALPHABET[j]] += proba
somme = sum(emissions.values())
for i in xrange(4):
emissions[ALPHABET[i]] /= somme
return emissions
[docs] def print_summary_logo(self, output=sys.stdout):
drawing.draw_logo(self, "summary", output)
[docs] def print_dense_logo(self, output=sys.stdout):
drawing.draw_logo(self, "dense", output)
[docs] def get_positions_ic(self):
previous_position_proba = self.background_emission_proba()
positions_information_content = []
start = drawing.get_position_start(self)
for pos in xrange(start, len(self) + start):
position_proba = {'A': 0., 'C': 0., 'G': 0., 'T': 0.}
for i in xrange(4):
emissions = get_emission_update_pos_proba(
self, position_proba, pos, previous_position_proba, i)
previous_position_proba = position_proba.copy()
if self.kind == "detailed":
somme = sum(previous_position_proba.values())
for i in xrange(4):
previous_position_proba[ALPHABET[i]] /= somme
values = previous_position_proba.values()
keys = previous_position_proba.keys()
emissions = zip(values, keys)
entropy = compute_entropy(emissions)
positions_information_content.append(2.0 - entropy)
return positions_information_content
[docs] def get_information_content(self):
pos_ic = self.get_positions_ic()
return sum(pos_ic)
[docs] def final_states(self):
if self.kind == "1st-order":
return [len(self) + 1]
else:
size = len(self) * 4
return [size, size + 1, size + 2, size + 3]
[docs] def train(self, training_file, epsilon=0.0001, max_iter=500):
training_sequences = ghmm.SequenceSet(ghmm.Alphabet(ALPHABET),
training_file)
utils.set_sequences_weight(training_sequences, 1.0)
self.baumWelch(training_sequences, max_iter, epsilon)
[docs] def test(self, seq_file, threshold=0., only_best=False):
sequence_list = utils.parse_fasta(seq_file)
all_hits = []
for seq_record in sequence_list:
hits_positive_strand = get_hits(self, seq_record, threshold)
hits_negative_strand = get_hits(self, seq_record, threshold,
negative=True)
hits = merge_hits(hits_positive_strand,
hits_negative_strand, only_best)
all_hits.extend(hits)
return all_hits
[docs] def trim(self, threshold):
first, last = get_significant_positions(self, threshold)
transitions, emissions, starts = trim_hmm(self, first, last)
new_hmm = ghmm.HMMFromMatrices(self.emissionDomain, self.distribution,
transitions, emissions, starts)
self.cmodel = new_hmm.cmodel
self.N = new_hmm.cmodel.N
self.M = new_hmm.cmodel.M
self.model_type = new_hmm.cmodel.model_type
self.maxorder = new_hmm.cmodel.maxorder
[docs]def trim_hmm(tffm, first, last):
nb_states = last - first + 1
if nb_states < 0:
# TODO raise an error rather than a sys.exit
sys.exit("\nTrimmed HMM is empty!!!\n")
matrices = tffm.asMatrices()
transitions = matrices[0]
emissions = matrices[1]
for index in xrange(tffm.N):
transitions[index] = transitions[index][0:nb_states+2]
if index > 1 and index < nb_states + 2:
emissions[index] = emissions[first+1]
first += 1
transitions = transitions[0:nb_states+2]
transitions[nb_states+1][nb_states+1] = 0.0
transitions[nb_states+1][1] = 1.0
emissions = emissions[0:nb_states+2]
starts = matrices[2][0:nb_states+2]
return transitions, emissions, starts
[docs]def get_significant_positions(tffm, threshold):
pos_ic = tffm.get_positions_ic()
first = 1
last = len(tffm)
while first <= last and pos_ic[first-1] < threshold:
first += 1
while last > 0 and pos_ic[last-1] < threshold:
last -= 1
return first, last
[docs]def best_hit(hits_positive_strand, hits_negative_strand):
max_positive_strand = max(hits_positive_strand)
max_negative_strand = max(hits_negative_strand)
if max_positive_strand or max_negative_strand:
return [max(max_positive_strand, max_negative_strand)]
else:
return []
[docs]def merge_hits(hits_positive_strand, hits_negative_strand, only_best):
if only_best:
return best_hit(hits_positive_strand, hits_negative_strand)
else:
return [hit for hit in utils.roundrobin(hits_positive_strand,
hits_negative_strand)]
[docs]def split_sequence(sequence):
ghmm_extended_alphabet = ghmm.Alphabet(EXTENDED_ALPHABET)
sequence_str = "".join(ghmm_extended_alphabet.externalSequence(sequence))
return ghmm.SequenceSet(ghmm_extended_alphabet,
re.split("([ACGT]+)", sequence_str))
[docs]def get_posterior_proba(tffm, sequence_split):
ghmm_extended_alphabet = ghmm.Alphabet(EXTENDED_ALPHABET)
posterior_proba = []
null_proba = [0.] * tffm.N
for sequence in sequence_split:
if re.match("[ACGT]", sequence):
emission_sequence = ghmm.SequenceSet(ghmm_extended_alphabet,
[sequence])[0]
posterior_proba.extend(tffm.posterior(emission_sequence))
else:
for __ in xrange(len(sequence)):
posterior_proba.append(null_proba)
return posterior_proba
[docs]def get_start_end_strand(position, seq_record, tffm, negative):
# position is 0-based and we need 1-based coordinates
if negative:
start = len(seq_record) - position
end = len(seq_record) - position + len(tffm) - 1
strand = "-"
else:
end = position + 1
start = position - len(tffm) + 2
strand = "+"
return start, end, strand
[docs]def construct_hits(posterior_proba, seq_record, tffm, threshold,
negative):
hits = [None] * len(seq_record)
for position in xrange(len(tffm)-1, len(posterior_proba)):
best_proba = 0.0
for state in tffm.final_states():
proba = posterior_proba[position][state]
if proba > threshold and proba > best_proba:
best_proba = proba
start, end, strand = get_start_end_strand(position,
seq_record, tffm, negative)
hits[end-1] = hit_module.HIT(seq_record, start,
end, strand, proba, tffm.name, state)
return hits
[docs]def get_hits(tffm, seq_record, threshold, negative=False):
sequence = seq_record.seq
if negative:
sequence = seq_record.reverse_complement().seq
sequence_split = re.split("([ACGT]+)", str(sequence))
posterior_proba = get_posterior_proba(tffm, sequence_split)
hits = construct_hits(posterior_proba, seq_record, tffm, threshold,
negative)
return hits
[docs]def compute_entropy(emissions):
entropy = 0.
for i in xrange(4):
proba, __ = emissions[i]
if proba != 0.:
entropy += proba * math.log(proba, 2)
return -entropy
[docs]def get_emission_update_pos_proba(tffm, position_proba, position,
previous_position_proba, index):
emissions_dic = {'A': 0., 'C': 0., 'G': 0., 'T': 0.}
if tffm.kind == "1st-order":
for j in xrange(4):
letter = ALPHABET[j]
emissions_dic[letter] = tffm.getEmission(position)[index * 4 + j]
proba = tffm.getEmission(position)[j * 4 + index]
proba *= previous_position_proba[letter]
position_proba[ALPHABET[index]] += proba
emissions = zip(emissions_dic.values(), emissions_dic.keys())
elif tffm.kind == "detailed":
for j in xrange(4):
start_state = (position - 1) * 4 + index
end_state = position * 4 + j
emissions_dic[ALPHABET[j]] = tffm.getTransition(start_state,
end_state)
proba = previous_position_proba[ALPHABET[index]]
proba *= tffm.getTransition(start_state, end_state)
position_proba[ALPHABET[j]] += proba
somme = sum(emissions_dic.values())
emissions = zip(emissions_dic.values(), emissions_dic.keys())
emissions = map(lambda (x, y): (x / somme, y), emissions)
emissions.sort(reverse=True)
return emissions
[docs]def tffm_from_xml(xml, kind):
hmm = ghmm.HMMOpen(xml)
return TFFM(hmm.emissionDomain, hmm.distribution, hmm.cmodel, kind)
[docs]def tffm_from_meme(meme_output, kind):
print "Not implemented yet"
[docs]def tffm_from_fasta(fasta_file, kind):
print "Not implemented yet"