Skip to content
Snippets Groups Projects
Functions.py 19.4 KiB
Newer Older
  • Learn to ignore specific revisions
  • from Graph_gff import Segments, Features, get_feature_start_on_segment, get_feature_stop_on_segment
    global segments_on_target_genome
    segments_on_target_genome={}
    
    
    # get the start position of the features on the linear genome, using their coordinates on the graph and the coordinantes of the segments on the genome
    
    def get_feature_start_on_genome(start_seg,feat_id):
        seg_start_pos=segments_on_target_genome[start_seg][1]
        feat_start_pos=get_feature_start_on_segment(start_seg,feat_id)
    
        start=int(seg_start_pos)+int(feat_start_pos)-1
    
        return start
    
    # get the stop position of the features on the linear genome, using their coordinates on the graph and the coordinantes of the segments on the genome
    
    def get_feature_stop_on_genome(stop_seg,feat_id):
        seg_start_pos=segments_on_target_genome[stop_seg][1]
        feat_stop_pos=get_feature_stop_on_segment(stop_seg,feat_id)
    
        stop=int(seg_start_pos)+int(feat_stop_pos)-1
    
    NMarthe's avatar
    NMarthe committed
    
    
    
    # functions to get the detailed gff with all the fragments of the features
    
    
    NMarthe's avatar
    NMarthe committed
    # get the position of a part of a feature on the complete feature (on the original genome)
    
    def get_position_on_feature(start_seg,stop_seg,feature_start_segment,feature):
        start_on_feature=get_segment_start_on_feature(feature_start_segment,start_seg,feature)
        stop_on_feature=get_segment_stop_on_feature(start_seg,stop_seg,feature,start_on_feature)
        position=";position="+str(start_on_feature)+"-"+str(stop_on_feature)
        #position=";start_position_on_feature="+str(start_on_gene)+":stop_position_on_feature="+str(stop_on_gene)
    
    def get_segment_start_on_feature(feature_start_segment,start_seg,feature):
        feature_start_pos=Segments[feature_start_segment].start+get_feature_start_on_segment(feature_start_segment,feature.id)-1
        start_on_reference=Segments[start_seg].start+get_feature_start_on_segment(start_seg,feature.id)-1
        start_on_feature=int(start_on_reference)-int(feature_start_pos)+1
        return start_on_feature
    
    def get_segment_stop_on_feature(start_seg,stop_seg,feature,start_on_feature):
        start_on_new_genome=get_feature_start_on_genome(start_seg,feature.id)
        stop_on_new_genome=get_feature_stop_on_genome(stop_seg,feature.id)
        # length on new genome is the same as length on reference, as we only get the positions of conserved parts of the feature
        stop_on_feature=start_on_feature+(stop_on_new_genome-start_on_new_genome) # stop position : start+length
        return stop_on_feature
    
    NMarthe's avatar
    NMarthe committed
    
    # get the proportion of a part of the feature on the total length
    
    def get_proportion_of_feature_part(start_seg,stop_seg,feature):
        start_on_new_genome=get_feature_start_on_genome(start_seg,feature.id)
        stop_on_new_genome=get_feature_stop_on_genome(stop_seg,feature.id)
        # length on new genome is the same as length on reference, as we only get the positions of conserved parts of the feature
        proportion=";proportion="+str(stop_on_new_genome-start_on_new_genome+1)+"/"+str(feature.size)
    
        #proportion=";number_bases="+str(stop_new_genome-start_new_genome+1)+";total_bases="+str(feature.size)
    
    # returns the gff line to write in the output file for the function gff_detail
    def create_line_detail(last_seg,feature_id,start_seg,feat_start):
        [stop_seg,feature,chr,strand]=[last_seg[1:],Features[feature_id],segments_on_target_genome[start_seg][0],last_seg[0:1]]
    
        # start and stop position of the feature on the genome we transfer on
    
        start_on_new_genome=get_feature_start_on_genome(start_seg,feature_id) 
        stop_on_new_genome=get_feature_stop_on_genome(stop_seg,feature_id)
    
        proportion=get_proportion_of_feature_part(start_seg,stop_seg,feature)
        position=get_position_on_feature(start_seg,stop_seg,feat_start,feature)
    
        annotation=feature.annot+proportion+position
        
    
        output_line=chr+"\tGrAnnot\t"+feature.type+"\t"+str(start_on_new_genome)+"\t"+str(stop_on_new_genome)+"\t.\t"+strand+"\t.\t"+annotation+"\n"
        return output_line
    
    # functions to get the gff with one line per feature
    def right_size(size,max_diff,feat):
        if max_diff==0:
    
        return not ((size>Features[feat].size*max_diff) | (size<Features[feat].size/max_diff)) 
    
    def create_line_gff_one(first_seg,last_seg,feature_id,size_diff,list_seg):
        [chr,strand,feature]=[segments_on_target_genome[first_seg][0],list_seg[0][0:1],Features[feature_id]]
    
        variant_count=0
        for segment in list_seg:
            if segment[1:] not in segments_on_target_genome: # if a segment is absent, it is either a deletion of a substitution. insertions are not considered here.
                variant_count+=1
        annotation=feature.annot+";Size_diff="+str(size_diff)+";Nb_variants="+str(variant_count)
    
        line=chr+"  GrAnnot   "+feature.type+" "+str(get_feature_start_on_genome(first_seg,feature_id))+" "+str(get_feature_stop_on_genome(last_seg,feature_id))+"   .   "+strand+"   .   "+annotation+"\n"
        return line
    
    NMarthe's avatar
    NMarthe committed
    # functions to output the stats on the transfer
    
    def stats_feature_missing_segment(feature_missing_segments,first_seg,last_seg,list_seg,feature_id):
    # [feature_missing_first,feature_missing_middle,feature_missing_last,feature_missing_all,feature_missing_total,feature_total,feature_ok]
        feature_missing_segments[5].append(feature_id)
    
        if (first_seg=='') : # no segment of the feature is in the genome, the feature is missing entirely
            feature_missing_segments[3].append(feature_id)
        elif first_seg != list_seg[0][1:]: # the first segment is missing 
            feature_missing_segments[0].append(feature_id)
        elif last_seg!=list_seg[-1][1:]: # the last segment is missing
            feature_missing_segments[2].append(feature_id)
    
        # go through all the segments, check if some are missing in the middle of the feature
        elif (len(list_seg)!=1) & (feature_id not in feature_missing_segments[3]): # to access the second to last element
            for segment in list_seg[1-(len(list_seg)-2)]:
                if segment[1:] not in segments_on_target_genome:
                    feature_missing_segments[1].append(feature_id)
                    break
    
        # go through the segments, to see if one is missing anywhere on the feature
        for segment in list_seg:
            if segment[1:] not in segments_on_target_genome:
                if feature_id not in feature_missing_segments[4]:
                    feature_missing_segments[4].append(feature_id)
                    break
    
        # if the feature doesnt have a missing segment, it is complete.         ADD THE PATH CHECK FOR INSERTIONS !!
        if feature_id not in feature_missing_segments[4]:
            feature_missing_segments[6].append(feature_id)
    
    def get_annot_features(list_features):
        list_annot_features=[]
        for feature in list_features:
            list_annot_features.append(Features[feature].note)
        return list_annot_features
    
    def count_hypput_total(list_annot_first):
        total=len(list_annot_first)
        count_hypput=0
        for annot in list_annot_first:
            if ("hypothetical" in annot) | ("putative" in annot):
                count_hypput+=1
        return [count_hypput,total]
    
    # print stats on the transfer : number of feature that have segments in different positions missing. 
    def stats_features(feature_missing_segments):
    # [feature_missing_first,feature_missing_middle,feature_missing_last,feature_missing_all,feature_missing_total,feature_total,feature_ok]
        list_annot_first=get_annot_features(feature_missing_segments[0])
        [hyp_put,total]=count_hypput_total(list_annot_first)
        print("\nthe first segment is missing for", total,"features, including",round(100*(hyp_put)/total,2),"% hypothetical or putative.")
    
        list_annot_middle=get_annot_features(feature_missing_segments[1])
        [hyp_put,total]=count_hypput_total(list_annot_middle)
        print("a middle segment is missing for", total,"features, including",round(100*(hyp_put)/total,2),"% hypothetical or putative.")
    
        list_annot_last=get_annot_features(feature_missing_segments[2])
        [hyp_put,total]=count_hypput_total(list_annot_last)
        print("the last segment is missing for", total,"features, including",round(100*(hyp_put)/total,2),"% hypothetical or putative.")
    
        list_annot_all=get_annot_features(feature_missing_segments[3])
        [hyp_put,total]=count_hypput_total(list_annot_all)
        print(total,"features are entirely missing, including",round(100*(hyp_put)/total,2),"% hypothetical or putative.")
    
        list_annot_total=get_annot_features(feature_missing_segments[4])
        [hyp_put,total]=count_hypput_total(list_annot_total)
        print("there is at least one segment missing for", total,"features, including",round(100*(hyp_put)/total,2),"% hypothetical or putative.")
    
        list_annot_ok=get_annot_features(feature_missing_segments[6])
        [hyp_put,total]=count_hypput_total(list_annot_ok)
        print(total ,"features are entirely present in the new genome, including",round(100*(hyp_put)/total,2),"% hypothetical or putative.")
    
        list_annot_features=get_annot_features(feature_missing_segments[5])
        [hyp_put,total]=count_hypput_total(list_annot_features)
        print("there is", total,"features in total, including",round(100*(hyp_put)/total,2),"% hypothetical or putative.")
    
    
    
    
    # functions to generate the different gffs
    
    NMarthe's avatar
    NMarthe committed
    def get_segments_positions_on_genome(pos_seg):
    
        #global segments_on_target_genome
        #segments_on_target_genome={}
    
    NMarthe's avatar
    NMarthe committed
        bed=open(pos_seg,'r')
        lines=bed.readlines() # read line by line ?
        bed.close()
        for line in lines:
            line=line.split()
            [seg,chrom,start,stop]=[line[3][1:],line[0],line[1],line[2]]
            if line[3][0:1]=='>':
                strand='+'
            elif line[3][0:1]=='<':
                strand='-'
            else:
                strand=''
            segments_on_target_genome[seg]=[chrom,start,stop,strand]
    
    def get_segments_sequence_and_paths(gfa):
        file_gfa=open(gfa,'r')
        lines_gfa=file_gfa.readlines()
        file_gfa.close()
        seg_seq={}
        paths={}
        for line in lines_gfa:
            line=line.split()
            if (line[0]=="S"): # get the sequence of the segment
                seg_id='s'+line[1]
                seg_seq[seg_id]=line[2]
    
            if (line[0]=="W") & (line[1]!="_MINIGRAPH_"): # get the walk of the genome
                
                path=line[6].replace(">",";>")
                path=path.replace("<",";<").split(';')
                list_path=[]
                for segment in path:
                    if segment[0:1]=='>':
                        list_path.append('+s'+segment[1:])
                    elif segment[0:1]=='<':
                        list_path.append('-s'+segment[1:])
                    else:
                        list_path.append('s'+segment[1:])
     
                paths[line[3]]=list_path
        return [paths,seg_seq]
    
    def get_first_seg(list_seg):
        first_seg_found=''
        for segment in list_seg:
            if segment[1:] in segments_on_target_genome:
                first_seg_found=segment[1:]
                break
        return first_seg_found
    
    def get_all_features_in_gff(gff):
        gff=open(gff,'r')
        lines=gff.readlines()
        gff.close()
        list_feature=[]
        for line in lines:
            feature_id=(line.split()[8].split(";")[0].split("=")[1].replace(".","_").replace(":","_"))
            if feature_id not in list_feature:
                list_feature.append(feature_id)
        return list_feature
    
    
    
    
    # functions to get the detail of the variations in the features
    
    def compare_strand(list_1,list_2,list_1_unstrand,list_2_unstrand):
        # get the list of segments in common
        seg_common=[]
        for segment in list_1_unstrand:
            if segment in list_2_unstrand:
                seg_common.append(segment)
    
        # for each segment in common, check if the strand is the same. check index in list unstranded to get the segment in list stranded
        same_strand_count=0
        for segment in seg_common:
            index_1=list_1_unstrand.index(segment)
            index_2=list_2_unstrand.index(segment)
            if list_1[index_1]==list_2[index_2]:
                same_strand_count+=1
        return [seg_common,same_strand_count]
    
    def get_feature_path(paths,first_seg,last_seg):
        # find the path in azucena. 
        first_strand=segments_on_target_genome[first_seg][3]
        first_seg_stranded=first_strand+first_seg
        last_strand=segments_on_target_genome[last_seg][3]
        last_seg_stranded=last_strand+last_seg
        id_first_seg=int(paths["CM020642.1_Azucena_chromosome10"].index(first_seg_stranded))
        id_last_seg=int(paths["CM020642.1_Azucena_chromosome10"].index(last_seg_stranded))
        first_index=min(id_first_seg,id_last_seg)
        last_index=max(id_last_seg,id_first_seg)
        list_segfeat_azu=paths["CM020642.1_Azucena_chromosome10"][first_index:last_index+1]
        return list_segfeat_azu
    
    def get_sequence_list_seg(list_segfeat_nb,i,feature,seg_seq):
        del_sequence=""
        for k in range(i,len(list_segfeat_nb)):
    
                del_sequence+=seg_seq[list_segfeat_nb[k]][0:feature.pos_stop]
            else:
                del_sequence+=seg_seq[list_segfeat_nb[k]]
        return del_sequence
    
    class Variation:
        def __init__(self,feature_id,feature_type,chr,start_new,stop_new,inversion,size_diff):
            self.feature_id=feature_id
            self.feature_type=feature_type
            self.chr=chr
            self.start_new=start_new
            self.stop_new=stop_new
            self.inversion=str(inversion)
            self.size_diff=str(size_diff)
            self.size_new=str(self.stop_new-self.start_new+1)
            self.type=''
            self.last_seg_in_target=''
            
        # ajouter fct pour écrire la ligne. return line.    # line : formated line f"{feat}\t"
    
        #def __str__(self):
        #    return f"id={self.id}, position on the original genome={self.chr}:{self.start}-{self.stop}, size={self.size}, features={self.features}"
    
    def create_var(feature_id,first_seg,last_seg,paths):
        feature=Features[feature_id]
        start_new_genome=get_feature_start_on_genome(first_seg,feature_id)
        stop_new_genome=get_feature_stop_on_genome(last_seg,feature_id)
        size_new_genome=int(stop_new_genome)-int(start_new_genome)+1
        size_diff=str(size_new_genome-feature.size)
    
        # get feature paths on the original genome and on the target genome
        list_segfeat_azu=get_feature_path(paths,first_seg,last_seg)
        list_segfeat_nb=feature.segments_list
    
        [list_segfeat_nb,list_segfeat_azu,inversion]=detect_inversion(list_segfeat_nb,list_segfeat_azu)
    
        variation=Variation(feature_id,feature.type,feature.chr,start_new_genome,stop_new_genome,inversion,size_diff)
    
        return(variation,list_segfeat_nb,list_segfeat_azu)
    
    def reset_var(variation):
        variation.type='' # type énuméré.
        variation.size_var=0
        variation.start_var=''
        variation.ref=''
        variation.alt=''
    
    
    def get_old_new_pos_substitution(feat_start,variation,list_segfeat_azu,feat,j):
    
        #pos_old=int(Segments[list_segfeat_nb[i]].start)-int(feat_start)+1
        pos_old=str(int(Segments[variation.start_var].start)-int(feat_start)+1)
        start_feat=get_feature_start_on_genome(list_segfeat_azu[0],feat)
        start_var=int(segments_on_target_genome[variation.start_on_target][1])
        pos_new=str(start_var-start_feat+1)
        return [pos_old,pos_new]
    
    
    def get_old_new_pos_insertion(variation,feat_start,list_segfeat_azu,feat):
    
        pos_old=str(int(Segments[variation.start_var].start)-int(feat_start)+1)
    
        start_feat=get_feature_start_on_genome(list_segfeat_azu[0],feat) 
        start_var=int(segments_on_target_genome[variation.start_var][1])-1
        pos_new=str(start_var-start_feat+1)
        return [pos_old,pos_new]
    
    
    def get_old_new_pos_deletion(variation,feat_start,list_segfeat_azu,feat,i):
    
        if i==0:
            pos_old=int(Segments[variation.start_var].start)-int(feat_start)+1+Features[feat].pos_start
        else:
            pos_old=int(Segments[variation.start_var].start)-int(feat_start)+1
            if pos_old<0:
                pos_old=0
    
        if variation.last_seg_in_target=="": # deletion of the beggining of the feature, so no segment placed in the new genome yet. 
            pos_new="1"
        else:
            start_feat=get_feature_start_on_genome(list_segfeat_azu[0],feat) 
            start_var=int(segments_on_target_genome[variation.last_seg_in_target][2])+1 
            pos_new=str(start_var-start_feat+1)
        
        return [pos_old,pos_new]
    
    def init_new_var(variation,type,list_segfeat_nb,list_segfeat_azu,i,j,seg_seq,feature):
        variation.type=type
        variation.start_var=list_segfeat_nb[i]
        if type=="substitution":
            variation.start_on_target=list_segfeat_azu[j]
            variation.ref=seg_seq[list_segfeat_nb[i]]
            variation.alt=seg_seq[list_segfeat_azu[j]]
        elif type=="insertion":
            variation.ref="-"
            variation.alt=seg_seq[list_segfeat_azu[j]]
        elif type=="deletion":
            if i==0: # if the deletion is at the start of the feature, the deletion doesnt always start at the start at the first segment : 
                #use pos_start, position of the feature on its first segment
                variation.ref=seg_seq[list_segfeat_nb[i]][feature.pos_start-1:]
            else: # else, the deletion will always start at the start of the first segment.
                variation.ref=seg_seq[list_segfeat_nb[i]]
            variation.alt="-"
    
    def continue_var(variation,seg_seq,list_segfeat_nb,list_segfeat_azu,i,j):
    
        if variation.type=="substitution":
            variation.ref+=seg_seq[list_segfeat_nb[i]]
            variation.alt+=seg_seq[list_segfeat_azu[j]]
        elif variation.type=="insertion":
            variation.alt+=seg_seq[list_segfeat_azu[j]]
        elif variation.type=="deletion":
            variation.ref+=seg_seq[list_segfeat_nb[i]]
    
    def get_common_segments(list1,list2):
        list_output=[]
        for elem in list1:
            if elem in list2:
                list_output.append(elem)
        return list_output
    
    def detect_segment_order_inversion(list_1,list_2):
        cpt=0
        i=0
        list_1_common=get_common_segments(list_1,list_2)
        list_2_common=get_common_segments(list_2,list_1)
        list_2_common_reversed=list(reversed(list_2_common))
        while i<len(list_1_common):
            #if list_2_common[i]!=list_1_common[i]: pour l'option 1 qui semble fonctionner ??
            #    cpt+=1
            #i+=1
            if list_2_common_reversed[i]==list_1_common[i]: # pour l'option 3
                cpt+=1
            i+=1
        #if cpt>len(list_1_common)/2: # if more than half of the common segments are not at the same position ?? semble fonctionner
        #if list_1_common==reversed(list_2_common): fonctionne pas
        if cpt>len(list_1_common)*0.9:# if more than 90% of the segments are on the same position when the lists are reversed, there is an inversion. 
            #print("ee")
            return [True,list_1,list(reversed(list_2))]
        else:
            return [False,list_1,list_2]
    
    # takes two lists of segments for two genes, check if the first list is an inversion of the second one (if the segments in common are on the opposite strand)
    def detect_inversion(list_1,list_2):
        # removes strand for the lists of stranded segments
        list_1_unstrand=[segment_stranded[1:] for segment_stranded in list_1]
        list_2_unstrand=[segment_stranded[1:] for segment_stranded in list_2]
        [seg_common,same_strand_count]=compare_strand(list_1,list_2,list_1_unstrand,list_2_unstrand)
    
        # if there is less than 10% of strand difference among the common segments (ie more than 90% of same strand), return 0, no inversion 
    
        #if len(seg_common)-same_strand_count<len(seg_common)/10:
        if same_strand_count>=len(seg_common)*0.9: # if more than 90% of segments shared have the same strand, no inversion
    
            if same_strand_count!=0:
                print("ignored strand difference :/")
    
    
        # check if we have an inversion for the order of the segmetns. double inversion = no inversion. 
        [segment_order_inversion,list_segfeat_nb,list_segfeat_azu]=detect_segment_order_inversion(list_1_unstrand,list_2_unstrand)
        if segment_order_inversion:
           inversion=abs(inversion-1)
        return [list_segfeat_nb,list_segfeat_azu,inversion]