Skip to content
Snippets Groups Projects
variation_details.py 22.6 KiB
Newer Older
from .usefull_little_functions import *
from .intersect import Features,Segments,search_segment,invert_seg
from .detect_inversion import detect_feature_inversion

# functions to get the detail of the variations in the features

# handle the variations analysis between the feature on the source genome and the feature on the target genome
def generate_variations_details(feature_id,seg_seq,match,file_out_var,segments_on_target_genome):
    # for each feature, get list of the segments where it is and the first and last segment of the feature on the new genome
    [first_seg,last_seg,walk,copy_id,feature_target_path]=get_featurePath_ends(match)

    # do it for the gene, then for its childs.
    if match[4]==True: # gene passed the filters
        inversion=detect_feature_inversion(Features[feature_id].segments_list_source,feature_target_path)
        print_variations(feature_id,first_seg,last_seg,copy_id,walk,seg_seq,feature_target_path,inversion,file_out_var,segments_on_target_genome)
        child_variation_details(feature_id,copy_id,walk,seg_seq,feature_target_path,inversion,file_out_var,segments_on_target_genome)
def child_variation_details(feature_id,copy_id,walk,seg_seq,feature_target_path,inversion,file_out_var,segments_on_target_genome):
        for child_id in Features[feature_id].get_child_list(Features):
            child=Features[child_id]
            # look for the first and last seg of the child in its parent path
            [child_first_seg,child_last_seg]=get_child_first_last_seg(child.segments_list_source,feature_target_path,inversion)

            if child_first_seg!='': # else the child faeture is absent in this occurence of the parent feature
                child_target_path=find_child_target_path(child.segments_list_target,walk,copy_id)
                print_variations(child_id,child_first_seg,child_last_seg,copy_id,walk,seg_seq,child_target_path,inversion,file_out_var,segments_on_target_genome)
def get_child_first_last_seg(child_segments_list_source,feature_target_path,inversion):
    [child_first_seg,child_last_seg]=['','']
    
    child_first_seg=get_child_first_seg(child_segments_list_source,feature_target_path)

    if child_first_seg!='': # check that the child feature is not absent in this occurence of the parent feature
        child_last_seg=get_child_last_seg(child_segments_list_source,feature_target_path)

    if inversion:
        temp=child_first_seg
        child_first_seg=child_last_seg
        child_last_seg=temp

    return [child_first_seg,child_last_seg]

def get_child_first_seg(child_segments_list_source,feature_target_path):
    for seg in child_segments_list_source: # get first seg in the parent path
        if seg in feature_target_path: # parent path
            return seg
        elif invert_seg(seg) in feature_target_path:
            return invert_seg(seg)
    return ''

def get_child_last_seg(child_segments_list_source,feature_target_path):
    for seg in reversed(child_segments_list_source): # get last seg in the parent path
        if seg in feature_target_path: # parent path
            return seg
        elif invert_seg(seg) in feature_target_path:
            return invert_seg(seg)
    return ''

def find_child_target_path(child_segments_list_target,walk,copy_id):
    for match in child_segments_list_target: # look for the right occurence of the child feature
        if (match[0]==walk) and (match[1]==copy_id):
            return match[2]

def print_variations(feature_id,first_seg,last_seg,copy_id,walk,seg_seq,feature_target_path,inversion,file_out_var,segments_on_target_genome):

    if first_seg!='': # if the feature is not completly absent        # add the else, output absent features
        feat_var=FeatureVariation(feature_id,file_out_var,first_seg,last_seg,walk,copy_id,feature_target_path,inversion,segments_on_target_genome)
        feature_path_source_genome=feat_var.feature_path_source_genome # removes the strand on the segments list
        feature_path_target_genome=feat_var.feature_path_target_genome # removes the strand on the segments list and inverts the list if necessary
        feature=Features[feature_id]
        # loop to go through both paths with i and j

        # detect and print variations ignoring the strands
        while (i<len(feature_path_source_genome)) and (j<len(feature_path_target_genome)):
            if feature_path_source_genome[i] != feature_path_target_genome[j]: # if there is a difference between the two paths

                if feature_path_target_genome[j] not in feature_path_source_genome: # if the segment in target genome is absent in source genome
                    if feature_path_source_genome[i] not in feature_path_target_genome: # if the segment in source genome is absent is target genome

                        # if both segments are absent in the other genome, its a substitution
                        feat_var.last_seg_in_target=feature_path_target_genome[j]
                        if (feat_var.type=='insertion') or (feat_var.type=='deletion'): # print the current variation before treating the substitution
                            feat_var.print_current_var(segments_on_target_genome)
                        if feat_var.type=='substitution':
                            feat_var.continue_var(seg_seq,i,j,0)
                            feat_var.init_new_var("substitution",i,j,seg_seq,feature)
                        i+=1;j+=1

                    else: # target genome segment not in source_genome, but source_genome segment in target genome : insertion or continue substitution
                        if feat_var.type=='deletion': # print the current variation before treating the insertion
                            feat_var.print_current_var(segments_on_target_genome)
                        feat_var.last_seg_in_target=feature_path_target_genome[j]
                        if feat_var.type=='insertion':
                            feat_var.continue_var(seg_seq,i,j,0)
                        elif feat_var.type=="substitution":
                            while feature_path_target_genome[j]!=feature_path_source_genome[i]:
                                feat_var.continue_var(seg_seq,i,j,2)
                            feat_var.print_current_var(segments_on_target_genome)
                            feat_var.init_new_var("insertion",i,j,seg_seq,feature)
                        j+=1

                elif feature_path_source_genome[i] not in feature_path_target_genome: # source_genome segment not in target genome, but target genome segment in source_genome : deletion or continue substitution
                    if feat_var.type=='insertion': # print the current variation before treating the deletion
                        feat_var.print_current_var(segments_on_target_genome)
                    if feat_var.type=='deletion':
                        feat_var.continue_var(seg_seq,i,j,0)
                    elif feat_var.type=="substitution":
                        while feature_path_target_genome[j]!=feature_path_source_genome[i]:
                            feat_var.continue_var(seg_seq,i,j,1)
                        feat_var.print_current_var(segments_on_target_genome)
                        feat_var.init_new_var("deletion",i,j,seg_seq,feature)
                    i+=1

                else : # if both segments are present in the other genome but not at the same position. weird case never found yet
                    # can be a substitution. check later if its not an inversion
                    feat_var.last_seg_in_target=feature_path_target_genome[j]
                    if (feat_var.type=='insertion') or (feat_var.type=='deletion'): # print the current variation before treating the substitution
                        feat_var.print_current_var(segments_on_target_genome)
                    if feat_var.type=='substitution':
                        feat_var.continue_var(seg_seq,i,j,0)
                        feat_var.init_new_var("substitution",i,j,seg_seq,feature)
            else: # segment present in both, no variation; print the running indel if there is one
                if feat_var.type!='': # print the current variation if there is one
                    feat_var.print_current_var(segments_on_target_genome)
                feat_var.last_seg_in_target=feature_path_target_genome[j]
        if (feat_var.type!=''): # if there was a current variation when we reached the end, print it
            feat_var.print_current_var(segments_on_target_genome)

        if i<=len(feature_path_source_genome)-1: # if we didn't reach the length of the segment list for the first genome, the end is missing for the second genome
            feat_var.print_last_deletion(i,feature,seg_seq)

        if feat_var.var_count==0: # if no variation was encountered
            feat_var.print_novar()
            


# stores information about a feature and its current variation
class FeatureVariation:

    def __init__(self,feature_id,file_out_var,first_seg,last_seg,walk,copy_id,feature_target_path,inversion,segments_on_target_genome):
        feature=Features[feature_id]
        # get feature paths on the original genome and on the target genome
        feature_path_target_genome=feature_target_path
        feature_path_source_genome=feature.segments_list_source

        if inversion:
            feature_path_target_genome=invert_segment_list(feature_path_target_genome)
            stop_new_genome=get_feature_start_on_target_genome_inv(last_seg,feature_id,walk,copy_id,segments_on_target_genome)
            start_new_genome=get_feature_stop_on_target_genome_inv(first_seg,feature_id,walk,copy_id,segments_on_target_genome)
        else:
            start_new_genome=get_feature_start_on_target_genome(first_seg,feature_id,walk,copy_id,segments_on_target_genome)
            stop_new_genome=get_feature_stop_on_target_genome(last_seg,feature_id,walk,copy_id,segments_on_target_genome)
        size_new_genome=stop_new_genome-start_new_genome+1
        size_diff=str(size_new_genome-feature.size)
        sequence_name=get_seg_occ(first_seg,walk,feature_id,copy_id,segments_on_target_genome)[0]

        self.feature_id=feature.id # real id in the gff
        self.feature_key=feature_id # key to access the feature info in the dict Features
        self.feature_type=feature.type
        self.chr=sequence_name
        self.start_new=start_new_genome
        self.stop_new=stop_new_genome
        self.inversion=inversion
        self.size_diff=size_diff
        self.type=''
        self.last_seg_in_target=''
        self.seg_ref=list()
        self.seg_alt=list()
        self.feature_path_source_genome=feature_path_source_genome
        self.feature_path_target_genome=feature_path_target_genome
        self.feat_start=feature.start
        self.copy_id=copy_id
        self.walk=walk
        self.start_feat_seg_target=feature_path_target_genome[0]
        self.var_count=0
        self.file_out_var=file_out_var


    # check if a substitution could be an inversion inside a feature
    def detect_small_inversion(self):
        [list_ref_common,list_alt_common]=[list(),list()]
        list_ref_unstrand=[segment_stranded[1:] for segment_stranded in self.seg_ref]
        list_alt_unstrand=[segment_stranded[1:] for segment_stranded in self.seg_alt]
        for seg in self.seg_ref:
            if seg[1:] in list_alt_unstrand:
                list_ref_common.append(seg)
        for seg in self.seg_alt:
            if seg[1:] in list_ref_unstrand:
                list_alt_common.append(seg)
        if (len(list_ref_common)>len(list_ref_unstrand)*0.5) and (len(list_alt_common)>len(list_alt_unstrand)*0.5):
            return f'\t# Suspected inversion within this substitution.'
        else:
            return ''
        
    # print a variation
    def print_current_var(self,segments_on_target_genome):
        warning=''
        if self.type=='insertion':
            [pos_old,pos_new]=self.get_old_new_pos_insertion(segments_on_target_genome)
            line=f'{self.feature_id}\t{self.feature_type}\t{self.chr}\t{self.start_new}\t{self.stop_new}\t{self.size_new}\t{FeatureVariation.print_inversion(self.inversion)}\t{self.size_diff}\tinsertion\t-\t{self.alt}\t{len(self.alt)}\t{pos_old}\t{pos_new}{warning}\n'
        elif self.type=='deletion':
            [pos_old,pos_new]=self.get_old_new_pos_deletion(segments_on_target_genome)
            line=f'{self.feature_id}\t{self.feature_type}\t{self.chr}\t{self.start_new}\t{self.stop_new}\t{self.size_new}\t{FeatureVariation.print_inversion(self.inversion)}\t{self.size_diff}\tdeletion\t{self.ref}\t-\t{len(self.ref)}\t{pos_old}\t{pos_new}{warning}\n'
        elif self.type=='substitution':
            warning=self.detect_small_inversion()
            [pos_old,pos_new]=self.get_old_new_pos_substitution(segments_on_target_genome)
            size_subs=f'{len(self.ref)}/{len(self.alt)}'
            line=f'{self.feature_id}\t{self.feature_type}\t{self.chr}\t{self.start_new}\t{self.stop_new}\t{self.size_new}\t{FeatureVariation.print_inversion(self.inversion)}\t{self.size_diff}\tsubstitution\t{self.ref}\t{self.alt}\t{size_subs}\t{pos_old}\t{pos_new}{warning}\n'
            
        self.file_out_var.write(line)
        self.var_count+=1
        self.reset_var()

    # print a feature with no variation
    def print_novar(self):
        line=f'{self.feature_id}\t{self.feature_type}\t{self.chr}\t{self.start_new}\t{self.stop_new}\t{self.size_new}\t{FeatureVariation.print_inversion(self.inversion)}\t{self.size_diff}\tno_var\t-\t-\t-\t-\t-\n'
        self.file_out_var.write(line)

    # print a deletion at the end of a feature
    def print_last_deletion(self,i,feature,seg_seq):
        seg_del=search_segment(self.feature_path_source_genome[i])
        pos_old=int(Segments[seg_del].get_start(feature.id))-self.feat_start+1
        pos_new=str(self.size_new+1) # the deletion is at the end of the feature on the new genome
        del_sequence=FeatureVariation.get_sequence_list_seg(self.feature_path_source_genome,i,feature.pos_stop,seg_seq)

        line=f'{self.feature_id}\t{self.feature_type}\t{self.chr}\t{self.start_new}\t{self.stop_new}\t{self.size_new}\t{FeatureVariation.print_inversion(self.inversion)}\t{self.size_diff}\tdeletion\t{del_sequence}\t-\t{len(del_sequence)}\t{pos_old}\t{pos_new}\n'
        self.file_out_var.write(line)
        self.var_count+=1
        self.reset_var()

    # reset the informations of the variation, but keep the information about the feature
    def reset_var(self):
        self.type='' # todo : make type enumerate
        self.size_var=0
        self.start_var=''
        self.start_var_index=0
        self.ref=''
        self.alt=''

    # change the variation information, but keep the feature information (the variation is on the feature)
    def init_new_var(self,type,i,j,seg_seq,feature):
        feature_path_source_genome=self.feature_path_source_genome
        feature_path_target_genome=self.feature_path_target_genome
        self.type=type
        self.start_var=feature_path_source_genome[i]
        self.start_var_index=i
        if type=="substitution":
            self.start_on_target=feature_path_target_genome[j]
            self.ref=get_segment_sequence(seg_seq,feature_path_source_genome[i])
            self.alt=get_segment_sequence(seg_seq,feature_path_target_genome[j])
            self.seg_ref.append(feature_path_source_genome[i])
            self.seg_alt.append(feature_path_target_genome[j])
        elif type=="insertion":
            self.ref="-"
            self.alt=get_segment_sequence(seg_seq,feature_path_target_genome[j])
            self.seg_alt.append(feature_path_target_genome[j])
        elif type=="deletion":
            if i==0: # if the deletion is at the start of the feature, the deletion doesnt always start at the start at the first segment : 
                #use pos_start, position of the feature on its first segment
                self.ref=get_segment_sequence(seg_seq,feature_path_source_genome[i])[feature.pos_start-1:]
                self.seg_ref.append(feature_path_source_genome[i])
            else: # else, the deletion will always start at the start of the first segment.
                self.ref=get_segment_sequence(seg_seq,feature_path_source_genome[i])
                self.seg_ref.append(feature_path_source_genome[i])
            self.alt="-"

    # update the variation
    def continue_var(self,seg_seq,i,j,genome_to_continue):
        feature_path_source_genome=self.feature_path_source_genome
        feature_path_target_genome=self.feature_path_target_genome
        if self.type=="substitution":
            if genome_to_continue==0: # genome_to_continue allows to choose if the substitution continues for the original or the target genome, or both.
                self.ref+=get_segment_sequence(seg_seq,feature_path_source_genome[i])
                self.alt+=get_segment_sequence(seg_seq,feature_path_target_genome[j])
                self.seg_ref.append(feature_path_source_genome[i])
                self.seg_alt.append(feature_path_target_genome[j])
            elif genome_to_continue==1: # deletion
                self.ref+=get_segment_sequence(seg_seq,feature_path_source_genome[i])
                self.seg_ref.append(feature_path_source_genome[i])
            elif genome_to_continue==2: # insertion
                self.alt+=get_segment_sequence(seg_seq,feature_path_target_genome[j])
                self.seg_alt.append(feature_path_target_genome[j])
        elif self.type=="insertion":
            self.alt+=get_segment_sequence(seg_seq,feature_path_target_genome[j])
            self.seg_alt.append(feature_path_target_genome[j])
        elif self.type=="deletion":
            self.ref+=get_segment_sequence(seg_seq,feature_path_source_genome[i])
            self.seg_ref.append(feature_path_source_genome[i])


    # find the position of a substitution on the source and the target sequence
    def get_old_new_pos_substitution(self,segments_on_target_genome):
        feat=self.feature_key
        seg_pos=search_segment(self.start_var)
        pos_old=str(int(Segments[seg_pos].get_start(Features[feat].id))-self.feat_start)

        var_start_seg=self.start_on_target
        if self.inversion:
            var_start_seg=invert_seg(var_start_seg)
            end_var=get_seg_occ(var_start_seg,self.walk,feat,self.copy_id,segments_on_target_genome)[2]
            start_feat=get_feature_start_on_target_genome_inv(invert_seg(self.start_feat_seg_target),feat,self.walk,self.copy_id,segments_on_target_genome)
            pos_new=str(start_feat-end_var)
        else:
            start_var=get_seg_occ(var_start_seg,self.walk,feat,self.copy_id,segments_on_target_genome)[1]
            start_feat=get_feature_start_on_target_genome(self.start_feat_seg_target,feat,self.walk,self.copy_id,segments_on_target_genome)
            pos_new=str(start_var-start_feat)
        return [pos_old,pos_new] # pos_old and pos_new are the base before the change

    # find the position of an insertion on the source and the target sequence
    def get_old_new_pos_insertion(self,segments_on_target_genome):
        feat=self.feature_key
        seg_pos=search_segment(self.start_var) # start_var is the segment AFTER the insertion
        pos_old=str(int(Segments[seg_pos].get_start(Features[feat].id))-self.feat_start)
        start_var_seg=self.start_var
        if self.inversion:
            start_var_seg=invert_seg(start_var_seg)
            end_var=get_seg_occ(start_var_seg,self.walk,feat,self.copy_id,segments_on_target_genome)[2]+len(self.alt) # start_var_seg is the segment AFTER the insertion
            start_feat=get_feature_start_on_target_genome_inv(invert_seg(self.start_feat_seg_target),feat,self.walk,self.copy_id,segments_on_target_genome)
            pos_new=str(start_feat-end_var)
        else:
            start_var=get_seg_occ(start_var_seg,self.walk,feat,self.copy_id,segments_on_target_genome)[1]-len(self.alt) # start_var_seg is the segment AFTER the insertion
            start_feat=get_feature_start_on_target_genome(self.start_feat_seg_target,feat,self.walk,self.copy_id,segments_on_target_genome)
            pos_new=str(start_var-start_feat)
        return [pos_old,pos_new] # pos_old and pos_new are the base before the change

    # find the position of a deletion on the source and the target sequence
    def get_old_new_pos_deletion(self,segments_on_target_genome):
        feat=self.feature_key
        seg_pos=search_segment(self.start_var)
        if self.start_var_index==0:
            pos_old=int(Segments[seg_pos].get_start(Features[feat].id))-self.feat_start+Features[feat].pos_start-1
        else:
            pos_old=int(Segments[seg_pos].get_start(Features[feat].id))-self.feat_start
            if pos_old<0:
                pos_old=0
                print("error with variation position",self.inversion,"***")

        if self.last_seg_in_target=="": # deletion of the beggining of the feature, so no segment placed in the new genome yet. 
            pos_new=0
        else:
            start_var_seg=self.last_seg_in_target
            if self.inversion:
                start_var_seg=invert_seg(start_var_seg)
                start_var=get_seg_occ(start_var_seg,self.walk,feat,self.copy_id,segments_on_target_genome)[1]-1
                start_feat=get_feature_start_on_target_genome_inv(invert_seg(self.start_feat_seg_target),feat,self.walk,self.copy_id,segments_on_target_genome)
                pos_new=str(start_feat-start_var)
            else:
                start_var=get_seg_occ(start_var_seg,self.walk,feat,self.copy_id,segments_on_target_genome)[2]+1
                start_feat=get_feature_start_on_target_genome(self.start_feat_seg_target,feat,self.walk,self.copy_id,segments_on_target_genome)
                pos_new=str(start_var-start_feat)
        return [pos_old,pos_new] # pos_old and pos_new are the base before the change

    # get a digit for the inversion (convert bool in int)
    @staticmethod
    def print_inversion(bool):
        if bool==True:
            return '1'
            return '0'

    # outputs the nucleotide sequence of a list of segments, corresponding to the end of a feature
    @staticmethod
    def get_sequence_list_seg(list_seg,i,feat_pos_stop,seg_seq):
        del_sequence=""
        for k in range(i,len(list_seg)):
            if k==len(list_seg)-1:
                del_sequence+=get_segment_sequence(seg_seq,list_seg[k])[0:feat_pos_stop]
            else:
                del_sequence+=get_segment_sequence(seg_seq,list_seg[k])
        return del_sequence