Skip to content
Snippets Groups Projects
Functions_output.py 27.8 KiB
Newer Older
nina.marthe_ird.fr's avatar
nina.marthe_ird.fr committed
from .Graph_gff import Segments, Features
from .Functions import *
from tqdm import tqdm


# outputs the gff line of the target genome for a feature occurence (a feature can be transfered at several positions)
def generate_target_gff(feature_id,seg_size,args,reason_features_not_transfered,match,file_out_gff):
    feature=Features[feature_id]

    # get list of the segments where it is and the first and last segment of the feature on the new genome
    [first_seg,last_seg,walk,copy_id,feature_target_path]=get_featurePath_ends(match)
    inversion=detect_feature_inversion(feature.segments_list_source,feature_target_path)
    if first_seg!='': # feature present on the target genome

        if inversion:
            size_on_new_genome=get_feature_start_on_target_genome_inv(last_seg,feature_id,walk,copy_id)-get_feature_stop_on_target_genome_inv(first_seg,feature_id,walk,copy_id)+1
        else:
            size_on_new_genome=get_feature_stop_on_target_genome(last_seg,feature_id,walk,copy_id)-get_feature_start_on_target_genome(first_seg,feature_id,walk,copy_id)+1
        size_diff=abs(size_on_new_genome-feature.size)

        [cov,id]=get_id_cov(feature_id,seg_size,feature_target_path)

        if (cov*100<args.coverage) or (id*100<args.identity):
            reason_features_not_transfered[1]+=1
            match.append(False) # store the information that this gene copy didnt pass the filters
            return "no"
            line_gff=create_line_target_gff(first_seg,last_seg,feature_id,size_diff,inversion,walk,cov,id,copy_id) # transfer the gene
            file_out_gff.write(line_gff)

            # transfer all the gene's childs
            transfer_childs(feature_id,feature_target_path,walk,copy_id,inversion,seg_size,file_out_gff)

            match.append(True) # store the information that this gene copy was transfered
            return size_diff
    else :
        reason_features_not_transfered[0]+=1
        match.append(False) # store the information that this gene copy didnt pass the filters
        return "no"
    
def transfer_childs(feature_id,feature_target_path,walk,copy_id,inversion,seg_size,file_out_gff):
    childs_list=get_child_list(feature_id)
    for child_id in childs_list:
        child=Features[child_id]
        # look for the first and last seg of the child in its parent path
        [child_first_seg,child_last_seg]=['','']
        for seg in child.segments_list_source: # get first seg in the parent path
            if seg in feature_target_path: # parent path
                child_first_seg=seg
                break
            elif invert_seg(seg) in feature_target_path:
                child_first_seg=invert_seg(seg)
                break

        if child_first_seg!='': # check that the child feature is not absent in this occurence of the parent feature
            for seg in reversed(child.segments_list_source): # get last seg in the parent path
                if seg in feature_target_path: # parent path
                    child_last_seg=seg
                    break
                elif invert_seg(seg) in feature_target_path:
                    child_last_seg=invert_seg(seg)
                    break

            if inversion: # calculate the child feature size
                child_size_on_new_genome=get_feature_start_on_target_genome_inv(child_last_seg,child_id,walk,copy_id)-get_feature_stop_on_target_genome_inv(child_first_seg,child_id,walk,copy_id)+1
            else:
                child_size_on_new_genome=get_feature_stop_on_target_genome(child_last_seg,child_id,walk,copy_id)-get_feature_start_on_target_genome(child_first_seg,child_id,walk,copy_id)+1
            child_size_diff=abs(child_size_on_new_genome-child.size)

            for match in child.segments_list_target: # look for the right occurence of the child feature
                if (match[0]==walk) and (match[1]==copy_id):
                    seg_list=match[2]
                    [cov,id]=get_id_cov(child_id,seg_size,seg_list)
                    break

            if inversion:
                temp=child_first_seg
                child_first_seg=child_last_seg
                child_last_seg=temp

            # transfer the child feature:
            line_gff=create_line_target_gff(child_first_seg,child_last_seg,child_id,child_size_diff,inversion,walk,cov,id,copy_id)
            file_out_gff.write(line_gff)

# handles all the transfer, variation, alignment (stats?), on the target genome.
def transfer_on_target(segments_file, out_gff, out_var,out_aln,target_genome,target_genome_paths,list_feat_absent,seg_size,args):
    print(f'     Generating the {target_genome} output')
    stats=False
    list_feature_to_transfer= Features.keys()
    segments_list={}

nina.marthe_ird.fr's avatar
nina.marthe_ird.fr committed
    # clear feature target path from the previous genome
    for feature in Features.keys(): # empty the feature target paths for the next genome treated
        Features[feature].segments_list_target=[]

    for feat in tqdm(list_feature_to_transfer,desc="        Computing features paths in target genome",unit=" feature",disable=not args.verbose):
        if Features[feat].parent=='':
            add_target_genome_paths(feat,target_genome_paths)

    if args.annotation:
        with open(out_gff,'w') as file_out_gff:
            reason_features_not_transfered=[0,0] # absent_features, low_cov_id
            diff_size_transfered_features=[0,0] # [count,sum], to get the average

            for feat in tqdm(list_feature_to_transfer,desc=f'        Generating {target_genome} gff',unit=" feature",disable=not args.verbose):
                feature=Features[feat]
                if feature.parent=="": # usually a gene
                    for match in feature.segments_list_target:
                        transfer_stat=generate_target_gff(feat,seg_size,args,reason_features_not_transfered,match,file_out_gff) # the childs are handled in this function
                        if transfer_stat=="no":
                            list_feat_absent.append(feat)
                        else:
                            diff_size_transfered_features[0]+=1
                            diff_size_transfered_features[1]+=transfer_stat

    if args.variation or args.alignment : # append dict of segments for which we may need the sequence
        for feat in tqdm(list_feature_to_transfer,desc="        Fetching the sequence of the segments",unit=" feature",disable=not args.verbose):
            list_seg=Features[feat].segments_list_source
            if Features[feat].parent=="":
                feature_target_path=[]
                for occurence in Features[feat].segments_list_target: # add all the occurences of the feature in the target genome
                    feature_target_path+=occurence[2]
            for segment in list_seg:
                segments_list[segment[1:]]=''
            for segment in feature_target_path:
                segments_list[segment[1:]]=''

        if not args.annotation:
            # cov and id filter tests (if not done in annotation) 
            for feature_id in tqdm(list_feature_to_transfer,desc="        Computing the coverage and sequence id of the features before transfer",disable=not args.verbose):
                feature=Features[feature_id]
                if feature.parent=="": # usually a gene
                    for match in feature.segments_list_target:
                        [first_seg,last_seg,walk,copy_id,feature_target_path]=get_featurePath_ends(match)
                        # add the filter info in all the matches...
                        [cov,id]=get_id_cov(feature_id,seg_size,feature_target_path)

                        if (cov*100<args.coverage) or (id*100<args.identity) or (first_seg==''): # didnt put the "right_size" filter)
                            match.append(False) # store the information that this gene copy didnt pass the filters
                        else:
                            match.append(True)

    if args.variation:
        seg_seq=get_segments_sequence(segments_file,segments_list)
        with open(out_var, 'w') as file_out_var:

            for feat in tqdm(list_feature_to_transfer,desc=f'        Generating {target_genome} genes variation details',unit=" feature",disable=not args.verbose):
                feature=Features[feat]
                if feature.parent=="": # usually a gene
                    for match in feature.segments_list_target: # for all occurences of the gene
                        generate_variations_details(feat,seg_seq,match,file_out_var)

    if args.alignment:
        if not args.variation:
            seg_seq=get_segments_sequence(segments_file,segments_list)
        with open(out_aln,'w') as file_out_aln:
            line="Sequence alignment generated from feature path comparison in pangenome graph. Made with GrAnnoT v1.\n\n"
            file_out_aln.write(line)

            for feat in tqdm(list_feature_to_transfer,desc=f'        Generating the {args.source_genome} features alignment with {target_genome} features',unit=" feature",disable=not args.verbose):
                feature=Features[feat]
                if feature.parent=="": # usually a gene
                    for match in feature.segments_list_target: # for all occurences of the gene
                        print_alignment(feat,match,seg_seq,file_out_aln)


    if stats:
        # create objects for stats on how many segments are absent in target genome, their average length, etc
        feature_missing_segments=[[],[],[],[],[],[],[]] # [feature_missing_first,feature_missing_middle,feature_missing_last,feature_missing_all,feature_missing_total,feature_total,feature_ok]
        # the fist segment of the feature is missing - feature_missing_first
        # the last segment of the feature is missing - feature_missing_last
        # at least one middle segment of the feature is missing - feature_missing_middle
        # the entire feature is missing - feature_missing_all
        # at least one segment is missing first, last, or middle) - feature_missing_total
        # no segment is missing, the feature is complete - feature_ok  
        # total number of features, with missing segments or not - feature_total

        # for each feature, get list of the segments where it is and the first and last segment of the feature on the new genome
        list_seg=Features[feat].segments_list_source
        [first_seg,last_seg,walk,copy_id,feature_target_path]=get_featurePath_ends(Features[feat].segments_list_target[0])

        for feat in list_feature_to_transfer:
            stats_feature_missing_segment(feature_missing_segments,first_seg,last_seg,list_seg,feat,walk)

        if args.annotation:
            absent_features=reason_features_not_transfered[0];low_cov_id=reason_features_not_transfered[1]
            print(len(Features)-(absent_features+low_cov_id),"out of",len(Features),"features are transfered.")
            print(absent_features,"out of",len(Features),"features are not transfered because they are absent in the new genome.")
            print(low_cov_id,"out of",len(Features),"features are not transfered because their coverage or sequence identity is below threshold.")
            print("average length difference of the transfered genes : ",diff_size_transfered_features[1]/diff_size_transfered_features[0])
        stats_features(feature_missing_segments)

nina.marthe_ird.fr's avatar
nina.marthe_ird.fr committed
    #clear segment info for next transfer
    segments_on_target_genome.clear() # empty dict for the next genome treated


# gives the first and last segments of a walk (list)
def get_featurePath_ends(walk_info): # walk_info = [walk_name,copy_id,[list_seg]]
    walk_name=walk_info[0]
    if walk_name!='':
        seg_list=walk_info[2]
        copy_id=walk_info[1]
        [first_seg,last_seg]=[seg_list[0],seg_list[-1]]
    else:
        [first_seg,last_seg,walk_name,copy_id,seg_list]=['','','','',[]]
    return [first_seg,last_seg,walk_name,copy_id,seg_list]

# get the alignment between the features on the source genome and the features on the target genome 
def print_alignment(feat,match,seg_seq,file_out_aln):
    if match[0]!='':
        feature_target_path=match[2]
        if len(feature_target_path)>0: # if the feature is not completely absent
            feature_path_source_genome=Features[feat].segments_list_source
            inversion=detect_feature_inversion(feature_path_source_genome,feature_target_path)
            if inversion:
                feature_target_path=invert_segment_list(feature_target_path)

            line_aln=create_line_aln(feature_path_source_genome,feature_target_path,seg_seq,feat)
            file_out_aln.write(line_aln)


# functions to get the detail of the variations in the features

# handle the variations analysis between the feature on the source genome and the feature on the target genome
def generate_variations_details(feature_id,seg_seq,match,file_out_var):
    # for each feature, get list of the segments where it is and the first and last segment of the feature on the new genome
    [first_seg,last_seg,walk,copy_id,feature_target_path]=get_featurePath_ends(match)

    # do it for the gene, then for its childs.
    if match[-1]==True: # gene passed the filters
        print_variations(feature_id,first_seg,last_seg,copy_id,walk,seg_seq,feature_target_path,file_out_var)

        inversion=detect_feature_inversion(Features[feature_id].segments_list_source,feature_target_path)

        # print child feature var
        childs_list=get_child_list(feature_id)
        for child_id in childs_list:
            child=Features[child_id]
            # look for the first and last seg of the child in its parent path
            [child_first_seg,child_last_seg]=['','']
            for seg in child.segments_list_source: # get first seg in the parent path
                if seg in feature_target_path: # parent path
                    child_first_seg=seg
                    break
                elif invert_seg(seg) in feature_target_path:
                    child_first_seg=invert_seg(seg)
                    break

            if child_first_seg!='': # check that the child feature is not absent in this occurence of the parent feature
                for seg in reversed(child.segments_list_source): # get last seg in the parent path
                    if seg in feature_target_path: # parent path
                        child_last_seg=seg
                        break
                    elif invert_seg(seg) in feature_target_path:
                        child_last_seg=invert_seg(seg)
                        break
                
                if inversion:
                    temp=child_first_seg
                    child_first_seg=child_last_seg
                    child_last_seg=temp
                
                for match in child.segments_list_target: # look for the right occurence of the child feature
                    if (match[0]==walk) and (match[1]==copy_id):
                        child_target_path=match[2]
                        break

                # treat the child feature:
                print_variations(child_id,child_first_seg,child_last_seg,copy_id,walk,seg_seq,child_target_path,file_out_var)

def print_variations(feature_id,first_seg,last_seg,copy_id,walk,seg_seq,feature_target_path,file_out_var):

    if first_seg!='': # if the feature is not completly absent        # add the else, output absent features
        [variation,feature_path_source_genome,feature_path_target_genome]=create_var(feature_id,first_seg,last_seg,walk,copy_id,feature_target_path) # removes the strands in the segment lists
        feature=Features[feature_id]
        feat_start=feature.start
        # loop to go through both paths with i and j
        [i,j,var_count]=[0,0,0]

        # detect and print variations ignoring the strands
        start_feat_seg_target=feature_path_target_genome[0]
        while (i<len(feature_path_source_genome)) and (j<len(feature_path_target_genome)):
            if feature_path_source_genome[i] != feature_path_target_genome[j]: # if there is a difference between the two paths

                if feature_path_target_genome[j] not in feature_path_source_genome: # if the segment in target genome is absent in source genome
                    if feature_path_source_genome[i] not in feature_path_target_genome: # if the segment in source genome is absent is target genome

                        # if both segments are absent in the other genome, its a substitution
                        variation.last_seg_in_target=feature_path_target_genome[j]
                        if (variation.type=='insertion') or (variation.type=='deletion'): # print the current variation before treating the substitution
                            line=print_current_var(variation,feat_start,start_feat_seg_target,feature_id,walk,copy_id)
                            file_out_var.write(line)
                            reset_var(variation)
                            var_count+=1
                        if variation.type=='substitution':
                            continue_var(variation,seg_seq,feature_path_source_genome,feature_path_target_genome,i,j,0)
                        else: # initiate substitution
                            init_new_var(variation,"substitution",feature_path_source_genome,feature_path_target_genome,i,j,seg_seq,feature)
                        i+=1;j+=1

                    else: # target genome segment not in source_genome, but source_genome segment in target genome : insertion or continue substitution
                        if variation.type=='deletion': # print the current variation before treating the insertion
                            line=print_current_var(variation,feat_start,start_feat_seg_target,feature_id,walk,copy_id)
                            file_out_var.write(line)
                            reset_var(variation)
                            var_count+=1
                        variation.last_seg_in_target=feature_path_target_genome[j]
                        if variation.type=='insertion':
                            continue_var(variation,seg_seq,feature_path_source_genome,feature_path_target_genome,i,j,0)
                        elif variation.type=="substitution":
                            while feature_path_target_genome[j]!=feature_path_source_genome[i]:
                                continue_var(variation,seg_seq,feature_path_source_genome,feature_path_target_genome,i,j,2)
                                j+=1
                            line=print_current_var(variation,feat_start,start_feat_seg_target,feature_id,walk,copy_id)
                            file_out_var.write(line)
                            reset_var(variation)
                            var_count+=1
                            j-=1
                        else: # intiate insertion
                            init_new_var(variation,"insertion",feature_path_source_genome,feature_path_target_genome,i,j,seg_seq,feature)
                        j+=1

                elif feature_path_source_genome[i] not in feature_path_target_genome: # source_genome segment not in target genome, but target genome segment in source_genome : deletion or continue substitution
                    if variation.type=='insertion': # print the current variation before treating the deletion
                        line=print_current_var(variation,feat_start,start_feat_seg_target,feature_id,walk,copy_id)
                        file_out_var.write(line)
                        reset_var(variation)
                        var_count+=1
                    if variation.type=='deletion':
                        continue_var(variation,seg_seq,feature_path_source_genome,feature_path_target_genome,i,j,0)
                    elif variation.type=="substitution":
                        while feature_path_target_genome[j]!=feature_path_source_genome[i]:
                            continue_var(variation,seg_seq,feature_path_source_genome,feature_path_target_genome,i,j,1)
                            i+=1
                        line=print_current_var(variation,feat_start,start_feat_seg_target,feature_id,walk,copy_id)
                        file_out_var.write(line)
                        reset_var(variation)
                        var_count+=1
                        i-=1
                    else: # intiate deletion
                        init_new_var(variation,"deletion",feature_path_source_genome,feature_path_target_genome,i,j,seg_seq,feature)
                    i+=1

                else : # if both segments are present in the other genome but not at the same position. weird case never found yet
                    # can be a substitution. check later if its not an inversion
                    variation.last_seg_in_target=feature_path_target_genome[j]
                    if (variation.type=='insertion') or (variation.type=='deletion'): # print the current variation before treating the substitution
                        line=print_current_var(variation,feat_start,start_feat_seg_target,feature_id,walk,copy_id)
                        file_out_var.write(line)
                        reset_var(variation)
                        var_count+=1
                    if variation.type=='substitution':
                        continue_var(variation,seg_seq,feature_path_source_genome,feature_path_target_genome,i,j,0)
                    else: # initiate substitution
                        init_new_var(variation,"substitution",feature_path_source_genome,feature_path_target_genome,i,j,seg_seq,feature)
                    i+=1;j+=1

            else: # segment present in both, no variation. print the running indel if there is one
                if variation.type!='': # print the current variation if there is one
                    line=print_current_var(variation,feat_start,start_feat_seg_target,feature_id,walk,copy_id)
                    file_out_var.write(line)
                    var_count+=1
                    reset_var(variation)
                variation.last_seg_in_target=feature_path_target_genome[j]
                i+=1;j+=1

        if (variation.type!=''): # if there was a current variation when we reached the end, print it
            line=print_current_var(variation,feat_start,start_feat_seg_target,feature_id,walk,copy_id,file_out_var)
            file_out_var.write(line)
            var_count+=1
            reset_var(variation)

        if i<=len(feature_path_source_genome)-1: # if we didn't reach the length of the segment list for the first genome, the end is missing for the second genome
            line=print_last_deletion(variation,feature_path_source_genome,i,feat_start,feature,seg_seq)
            file_out_var.write(line)
            var_count+=1
            reset_var(variation)

        if var_count==0: # if no variation was encountered
            line=print_novar(variation)
            file_out_var.write(line)

# print a variation
def print_current_var(variation,feat_start,start_feat_seg_target,feat,walk,copy_id):
    warning=''
    if variation.type=='insertion':
        [pos_old,pos_new]=get_old_new_pos_insertion(variation,feat_start,start_feat_seg_target,feat,walk,copy_id)
        line=f'{variation.feature_id}\t{variation.feature_type}\t{variation.chr}\t{variation.start_new}\t{variation.stop_new}\t{variation.size_new}\t{print_inversion(variation.inversion)}\t{variation.size_diff}\tinsertion\t-\t{variation.alt}\t{len(variation.alt)}\t{pos_old}\t{pos_new}{warning}\n'
        return line
    elif variation.type=='deletion':
        [pos_old,pos_new]=get_old_new_pos_deletion(variation,feat_start,start_feat_seg_target,feat,walk,copy_id)
        line=f'{variation.feature_id}\t{variation.feature_type}\t{variation.chr}\t{variation.start_new}\t{variation.stop_new}\t{variation.size_new}\t{print_inversion(variation.inversion)}\t{variation.size_diff}\tdeletion\t{variation.ref}\t-\t{len(variation.ref)}\t{pos_old}\t{pos_new}{warning}\n'
        return line
    elif variation.type=='substitution':
        warning=detect_small_inversion(variation)
        [pos_old,pos_new]=get_old_new_pos_substitution(feat_start,variation,start_feat_seg_target,feat,walk,copy_id)

        size_subs=f'{len(variation.ref)}/{len(variation.alt)}'
        line=f'{variation.feature_id}\t{variation.feature_type}\t{variation.chr}\t{variation.start_new}\t{variation.stop_new}\t{variation.size_new}\t{print_inversion(variation.inversion)}\t{variation.size_diff}\tsubstitution\t{variation.ref}\t{variation.alt}\t{size_subs}\t{pos_old}\t{pos_new}{warning}\n'
        
        # print the substitutions of different size as deletion+insertion.
        #if len(variation.ref) == len(variation.alt): # if the substituion is between two segment of the same size, print it
        #    size_subs=len(variation.ref)
        #    line=f'{variation.feature_id}\t{variation.feature_type}\t{variation.chr}\t{variation.start_new}\t{variation.stop_new}\t{variation.size_new}\t{inversion}\t{variation.size_diff}\tsubstitution\t{variation.ref}\t{variation.alt}\t{size_subs}\t{pos_old}\t{pos_new}{warning}\n'
        #else :
        #    # if the segments of the substitution have a different size, print deletion then insertion at the same position.
        #    line=f'{variation.feature_id}\t{variation.feature_type}\t{variation.chr}\t{variation.start_new}\t{variation.stop_new}\t{variation.size_new}\t{inversion}\t{variation.size_diff}\tdeletion\t{variation.ref}\t-\t{len(variation.ref)}\t{pos_old}\t{pos_new}{warning}\n'
        #    line+=f'{variation.feature_id}\t{variation.feature_type}\t{variation.chr}\t{variation.start_new}\t{variation.stop_new}\t{variation.size_new}\t{inversion}\t{variation.size_diff}\tinsertion\t-\t{variation.alt}\t{len(variation.alt)}\t{pos_old}\t{pos_new}{warning}\n'
        return line

# check if a substitution could be an inversion inside a feature
def detect_small_inversion(variation):
    [list_ref_common,list_alt_common]=[list(),list()]
    list_ref_unstrand=[segment_stranded[1:] for segment_stranded in variation.seg_ref]
    list_alt_unstrand=[segment_stranded[1:] for segment_stranded in variation.seg_alt]
    for seg in variation.seg_ref:
        if seg[1:] in list_alt_unstrand:
            list_ref_common.append(seg)
    for seg in variation.seg_alt:
        if seg[1:] in list_ref_unstrand:
            list_alt_common.append(seg)
    if (len(list_ref_common)>len(list_ref_unstrand)*0.5) and (len(list_alt_common)>len(list_alt_unstrand)*0.5):
        return f'\t# Suspected inversion within this substitution.'
    else:
        return ''

# print a deletion at the end of a feature
def print_last_deletion(variation,feature_path_source_genome,i,feat_start,feature,seg_seq):
    seg_del=search_segment(feature_path_source_genome[i])
    pos_old=int(Segments[seg_del].start)-int(feat_start)+1

    del_sequence=get_sequence_list_seg(feature_path_source_genome,i,feature,seg_seq)
    length=len(del_sequence)
    pos_new=str(int(variation.size_new)+1) # the deletion is at the end of the feature on the new genome

    if variation.inversion:
        inversion='1'
    else:
        inversion='0'

    line=f'{variation.feature_id}\t{variation.feature_type}\t{variation.chr}\t{variation.start_new}\t{variation.stop_new}\t{variation.size_new}\t{inversion}\t{variation.size_diff}\tdeletion\t{del_sequence}\t-\t{length}\t{pos_old}\t{pos_new}\n'
    return line

# print a feature with no variation
def print_novar(variation):
    line=f'{variation.feature_id}\t{variation.feature_type}\t{variation.chr}\t{variation.start_new}\t{variation.stop_new}\t{variation.size_new}\t{print_inversion(variation.inversion)}\t{variation.size_diff}\tno_var\t-\t-\t-\t-\t-\n'
    return line

# get a digit for the inversion (convert bool in int)
def print_inversion(bool):
    if bool==True:
        return '1'
    else:
        return '0'

# not used. 

# get all the segments from a list that are not on the target genome
def get_list_segments_missing(list_seg,segments_on_target_genome):
    segments_missing=[]
    for segment in list_seg:
            if segment not in segments_on_target_genome:
                segments_missing.append(Segments[segment])
    return segments_missing


# for two segments on the target genome, find a walk that has both
def find_common_walk(seg1,seg2):
    for walk in segments_on_target_genome[seg1]:
        if walk in segments_on_target_genome[seg2]:
            return walk
    return False