Skip to content
Snippets Groups Projects
setup_transfer.py 10.7 KiB
Newer Older
from .intersect import Features
from .Functions import add_target_genome_paths,get_id_cov,stats_feature_missing_segment,stats_features
from .usefull_little_functions import get_featurePath_ends
from .load_gfa import get_segments_sequence

from .genome_transfer import generate_target_gff
from .variation_details import generate_variations_details
from .alignment import print_alignment

from tqdm import tqdm

# handles all the transfer, variation, alignment (stats?), on the target genome.
def transfer_on_target(segments_file,genome_dir,target_genome,target_genome_paths,list_feat_absent,seg_size,args,segments_on_target_genome):

    print(f'     Generating the {target_genome} output')
    stats=False
    list_feature_to_transfer= Features.keys()
    segments_list={} # list of segments usefull for the transfer. not all segments info will be loaded.

    # create output files names
    out_gff=genome_dir.joinpath(f'{target_genome}.gff')
    out_var=genome_dir.joinpath(f'{target_genome}_var.txt')
    out_aln=genome_dir.joinpath(f'{target_genome}_aln.txt')

    # clear feature target path from the previous genome
    for feature in Features.keys(): # empty the feature target paths for the next genome treated
        Features[feature].segments_list_target=[]

    for feat in tqdm(list_feature_to_transfer,desc="        Computing features paths in target genome",unit=" feature",disable=not args.verbose):
        if Features[feat].parent=='':
            add_target_genome_paths(feat,target_genome_paths,segments_on_target_genome)

    if args.annotation:
        with open(out_gff,'w') as file_out_gff:
            reason_features_not_transfered=[0,0] # absent_features, low_cov_id
            diff_size_transfered_features=[0,0] # [count,sum], to get the average

            for feat in tqdm(list_feature_to_transfer,desc=f'        Generating {target_genome} gff',unit=" feature",disable=not args.verbose):
                feature=Features[feat]
                if feature.parent=="": # usually a gene
                    for match in feature.segments_list_target: # compute cov and id for all matches.
                        if match[0]=='':
                            feature_target_path=[]
                        else:
                            feature_target_path=match[2]
                        match.append(get_id_cov(feat,seg_size,feature_target_path))
                    for match in feature.segments_list_target:
                        # if option no_dupl, only transfer the best match
                        if args.no_duplication:
                            # look for best match (best cov+id)
                            all_match=feature.segments_list_target
                            best_match=all_match[0]
                            for candidate_match in all_match:
                                candidate_cov=candidate_match[3][0]
                                candidate_id=candidate_match[3][1]
                                best_cov=best_match[3][0]
                                best_id=best_match[3][1]
                                if (candidate_cov+candidate_id)>(best_cov+best_id):
                                    best_match=candidate_match

                            if match!=best_match: # if current match is not best match, dont transfer
                                match.append(False)
                                continue

                        transfer_stat=generate_target_gff(feat,seg_size,args,reason_features_not_transfered,match,file_out_gff,segments_on_target_genome) # the childs are handled in this function
                        if transfer_stat=="no":
                            list_feat_absent.append(feat)
                        else:
                            diff_size_transfered_features[0]+=1
                            diff_size_transfered_features[1]+=transfer_stat

    if args.variation or args.alignment : # append dict of segments for which we may need the sequence
        for feat in tqdm(list_feature_to_transfer,desc="        Fetching the sequence of the segments",unit=" feature",disable=not args.verbose):
            list_seg=Features[feat].segments_list_source
            if Features[feat].parent=="":
                feature_target_path=[]
                for occurence in Features[feat].segments_list_target: # add all the occurences of the feature in the target genome
                    feature_target_path+=occurence[2]
            for segment in list_seg:
                segments_list[segment[1:]]=''
            for segment in feature_target_path:
                segments_list[segment[1:]]=''

        if not args.annotation:
            # cov and id filter tests (if not done in annotation) 
            for feature_id in tqdm(list_feature_to_transfer,desc="        Computing the coverage and sequence id of the features before transfer",unit=' feature',disable=not args.verbose):
                feature=Features[feature_id]
                if feature.parent=="": # usually a gene

                    for match in feature.segments_list_target: # compute cov and id for all matches.
                        if match[0]=='':
                            feature_target_path=[]
                        else:
                            feature_target_path=match[2]
                        match.append(get_id_cov(feat,seg_size,feature_target_path))

                    for match in feature.segments_list_target:
                        [first_seg,last_seg,walk,copy_id,feature_target_path]=get_featurePath_ends(match)
                        # add the filter info in all the matches...
                        [cov,id]=get_id_cov(feature_id,seg_size,feature_target_path)

                        if (cov*100<args.coverage) or (id*100<args.identity) or (first_seg==''): # didnt put the "right_size" filter)
                            match.append(False) # store the information that this gene copy didnt pass the filters
                        else:
                            # if option no_dupl, only transfer the best match
                            if args.no_duplication:
                                # look for best match (best cov+id)
                                all_match=feature.segments_list_target
                                best_match=all_match[0]
                                for candidate_match in all_match:
                                    candidate_cov=candidate_match[3][0]
                                    candidate_id=candidate_match[3][1]
                                    best_cov=best_match[3][0]
                                    best_id=best_match[3][1]
                                    if (candidate_cov+candidate_id)>(best_cov+best_id):
                                        best_match=candidate_match

                                if match!=best_match: # if current match is not best match, dont transfer
                                    match.append(False)
                                    continue
                            match.append(True)

    if args.variation:
        seg_seq=get_segments_sequence(segments_file,segments_list)
        with open(out_var, 'w') as file_out_var:
            file_out_var.write(f'# feature_id\tfeature_type\tsequence_id\ttarget_start_position\ttarget_stop_position\ttarget_feature_length\tinversion\tlength_difference\tvariation_type\tref_sequence\talt_sequence\tvariation_length\tvariation_position_on_source_feature\tvariation_position_on_target_feature\n')

            for feat in tqdm(list_feature_to_transfer,desc=f'        Generating {target_genome} genes variation details',unit=" feature",disable=not args.verbose):
                feature=Features[feat]
                if feature.parent=="": # usually a gene
                    for match in feature.segments_list_target: # for all occurences of the gene
                        generate_variations_details(feat,seg_seq,match,file_out_var,segments_on_target_genome)

    if args.alignment:
        if not args.variation:
            seg_seq=get_segments_sequence(segments_file,segments_list)
        with open(out_aln,'w') as file_out_aln:
            line="Sequence alignment generated from feature path comparison in pangenome graph. Made with GrAnnoT v1.\n\n"
            file_out_aln.write(line)

            for feat in tqdm(list_feature_to_transfer,desc=f'        Generating the {args.source_genome} features alignment with {target_genome} features',unit=" feature",disable=not args.verbose):
                feature=Features[feat]
                if feature.parent=="": # usually a gene
                    for match in feature.segments_list_target: # for all occurences of the gene
                        if match[4]==True:
                            print_alignment(feat,match,seg_seq,file_out_aln)


    if stats:
        # create objects for stats on how many segments are absent in target genome, their average length, etc
        feature_missing_segments=[[],[],[],[],[],[],[]] # [feature_missing_first,feature_missing_middle,feature_missing_last,feature_missing_all,feature_missing_total,feature_total,feature_ok]
        # the fist segment of the feature is missing - feature_missing_first
        # the last segment of the feature is missing - feature_missing_last
        # at least one middle segment of the feature is missing - feature_missing_middle
        # the entire feature is missing - feature_missing_all
        # at least one segment is missing first, last, or middle) - feature_missing_total
        # no segment is missing, the feature is complete - feature_ok  
        # total number of features, with missing segments or not - feature_total

        # for each feature, get list of the segments where it is and the first and last segment of the feature on the new genome
        list_seg=Features[feat].segments_list_source
        [first_seg,last_seg,walk,copy_id,feature_target_path]=get_featurePath_ends(Features[feat].segments_list_target[0])

        for feat in list_feature_to_transfer:
            stats_feature_missing_segment(feature_missing_segments,first_seg,last_seg,list_seg,feat,walk,segments_on_target_genome)

        if args.annotation:
            absent_features=reason_features_not_transfered[0];low_cov_id=reason_features_not_transfered[1]
            print(len(Features)-(absent_features+low_cov_id),"out of",len(Features),"features are transfered.")
            print(absent_features,"out of",len(Features),"features are not transfered because they are absent in the new genome.")
            print(low_cov_id,"out of",len(Features),"features are not transfered because their coverage or sequence identity is below threshold.")
            print("average length difference of the transfered genes : ",diff_size_transfered_features[1]/diff_size_transfered_features[0])
        stats_features(feature_missing_segments)

    #clear segment info for next transfer
    segments_on_target_genome.clear() # empty dict for the next genome treated