from Graph_gff import Segments, Features, write_line from Functions import * ### functions to generate a genome's gff from the graph's gff # functions to get the gff with one line per feature # outputs the feature once in a gff, from the first to the last segment present on the new genome (if the size is ok) : def generate_target_gff(first_seg,last_seg,feature_id,max_diff): if (first_seg!=''): # feature present on the target genome size_on_new_genome=get_feature_stop_on_target_genome(last_seg,feature_id)-get_feature_start_on_target_genome(first_seg,feature_id)+1 size_diff=abs(size_on_new_genome-Features[feature_id].size) if right_size(size_on_new_genome,max_diff,feature_id): line_gff=create_line_target_gff(first_seg,last_seg,feature_id,size_diff) write_line(line_gff,output_target_gff,False) return size_diff else: return "bad_size" else : return "absent" # writes the gff of target genome using the gff of the graph def transfer_on_target(segments_file, out_target_gff, out_var,out_aln,target_genome,target_genome_paths,args): print(f'generation of {target_genome} output') stats=False list_feature_to_transfer= Features.keys() segments_list={} for feat in list_feature_to_transfer: add_target_genome_path(feat,target_genome_paths) if args.annotation: print(f' generation of {target_genome} gff') file_out=open(out_target_gff,'w') global output_target_gff output_target_gff=[0,"",file_out] bad_size_features=0 absent_features=0 diff_size_transfered_features=[0,0] # [count,sum], to get the average for feat in list_feature_to_transfer: # for each feature, get list of the segments where it is and the first and last segment of the feature on the new genome feature_target_path=Features[feat].segments_list_target [first_seg,last_seg]=get_first_last_seg(feature_target_path) transfer_stat=generate_target_gff(first_seg,last_seg,feat,args.max_difference) # insertions not considered !!! if transfer_stat=="bad_size": bad_size_features+=1 elif transfer_stat=="absent": absent_features+=1 else: diff_size_transfered_features[0]+=1 diff_size_transfered_features[1]+=transfer_stat write_line("",output_target_gff,True) file_out.close() if args.variation | args.alignment : # append dict of segments for which we may need the sequence for feat in list_feature_to_transfer: list_seg=Features[feat].segments_list_source feature_target_path=Features[feat].segments_list_target for segment in list_seg: segments_list[segment[1:]]='' for segment in feature_target_path: segments_list[segment[1:]]='' if args.variation: print(f' generation of {target_genome} genes variation details') seg_seq=get_segments_sequence(segments_file,segments_list) file_out_var = open(out_var, 'w') global output_variations output_variations=[0,"",file_out_var] for feat in list_feature_to_transfer: # for each feature, get list of the segments where it is and the first and last segment of the feature on the new genome list_seg=Features[feat].segments_list_source feature_target_path=Features[feat].segments_list_target [first_seg,last_seg]=get_first_last_seg(feature_target_path) print_variations(first_seg,last_seg,feat,seg_seq) write_line("",output_variations,True) file_out_var.close() if args.alignment: print(f' generation of {args.source_genome} genes alignment with {target_genome} genes') if not args.variation: seg_seq=get_segments_sequence(segments_file,segments_list) file_aln=open(out_aln,'w') global output_target_aln output_target_aln=[0,"",file_aln] write_line("Sequence alignment generated from feature path comparison in pangenome graph. Made with GrAnnoT v1.\n\n",output_target_aln,False) for feat in list_feature_to_transfer: list_seg=Features[feat].segments_list_source feature_target_path=Features[feat].segments_list_target [first_seg,last_seg]=get_first_last_seg(feature_target_path) print_alignment(first_seg,feat,seg_seq) write_line("",output_target_aln,True) file_aln.close() if stats: # create objects for stats on how many segments are absent in target genome, their average length, etc feature_missing_segments=[[],[],[],[],[],[],[]] # [feature_missing_first,feature_missing_middle,feature_missing_last,feature_missing_all,feature_missing_total,feature_total,feature_ok] # the fist segment of the feature is missing - feature_missing_first # the last segment of the feature is missing - feature_missing_last # at least one middle segment of the feature is missing - feature_missing_middle # the entire feature is missing - feature_missing_all # at least one segment is missing first, last, or middle) - feature_missing_total # no segment is missing, the feature is complete - feature_ok # total number of features, with missing segments or not - feature_total # for each feature, get list of the segments where it is and the first and last segment of the feature on the new genome list_seg=Features[feat].segments_list_source feature_target_path=Features[feat].segments_list_target [first_seg,last_seg]=get_first_last_seg(feature_target_path) for feat in list_feature_to_transfer: stats_feature_missing_segment(feature_missing_segments,first_seg,last_seg,list_seg,feat) if args.annotation: print(len(Features)-(bad_size_features+absent_features),"out of",len(Features),"features are transfered.") print(bad_size_features,"out of",len(Features), "features are not transfered because they are too big or too small compared to the original genome.") print(absent_features,"out of",len(Features),"features are not transfered because they are absent in the new genome.") print("average length difference of the transfered genes : ",diff_size_transfered_features[1]/diff_size_transfered_features[0]) stats_features(feature_missing_segments) def get_first_last_seg(seg_list): if len(seg_list)!=0: first_seg=seg_list[0] last_seg=seg_list[-1] else: first_seg='' last_seg='' return [first_seg,last_seg] # fct to get the alignment between the features on the source genome and the features on the target genome def print_alignment(first_seg,feat,seg_seq): if first_seg!='': # if the feature is not completly absent feature=Features[feat] feature_path_target_genome=feature.segments_list_target feature_path_source_genome=feature.segments_list_source [feature_path_source_genome,feature_path_target_genome,inversion]=detect_feature_inversion(feature_path_source_genome,feature_path_target_genome) line_aln=create_line_aln(feature_path_source_genome,feature_path_target_genome,seg_seq,feat) write_line(line_aln,output_target_aln,False) # functions to get the detail of the variations in the features def print_variations(first_seg,last_seg,feat,seg_seq): if first_seg!='': # if the feature is not completly absent # add the else, output absent features [variation,feature_path_source_genome,feature_path_target_genome]=create_var(feat,first_seg,last_seg) # removes the strands in the segment lists feature=Features[feat] feat_start=feature.start # loop to go through both paths with i and j [i,j,var_count]=[0,0,0] # detect and print variations ignoring the strands while (i<len(feature_path_source_genome)) & (j<len(feature_path_target_genome)): if feature_path_source_genome[i] != feature_path_target_genome[j]: # if there is a difference between the two paths if feature_path_target_genome[j] not in feature_path_source_genome: # if the segment in target genome is absent in source genome if feature_path_source_genome[i] not in feature_path_target_genome: # if the segment in source genome is absent is target genome # if both segments are absent in the other genome, its a substitution variation.last_seg_in_target=feature_path_target_genome[j] if (variation.type=='insertion') | (variation.type=='deletion'): # print the current variation before treating the substitution print_current_var(variation,feat_start,feature_path_target_genome,feat) reset_var(variation) var_count+=1 if variation.type=='substitution': continue_var(variation,seg_seq,feature_path_source_genome,feature_path_target_genome,i,j,0) else: # initiate substitution init_new_var(variation,"substitution",feature_path_source_genome,feature_path_target_genome,i,j,seg_seq,feature) i+=1;j+=1 else: # target genome segment not in source_genome, but source_genome segment in target genome : insertion or continue substitution if variation.type=='deletion': # print the current variation before treating the insertion print_current_var(variation,feat_start,feature_path_target_genome,feat) reset_var(variation) var_count+=1 variation.last_seg_in_target=feature_path_target_genome[j] if variation.type=='insertion': continue_var(variation,seg_seq,feature_path_source_genome,feature_path_target_genome,i,j,0) elif variation.type=="substitution": while feature_path_target_genome[j]!=feature_path_source_genome[i]: continue_var(variation,seg_seq,feature_path_source_genome,feature_path_target_genome,i,j,2) j+=1 print_current_var(variation,feat_start,feature_path_target_genome,feat) reset_var(variation) var_count+=1 j-=1 else: # intiate insertion init_new_var(variation,"insertion",feature_path_source_genome,feature_path_target_genome,i,j,seg_seq,feature) j+=1 elif feature_path_source_genome[i] not in feature_path_target_genome: # source_genome segment not in target genome, but target genome segment in source_genome : deletion or continue substitution if variation.type=='insertion': # print the current variation before treating the deletion print_current_var(variation,feat_start,feature_path_target_genome,feat) reset_var(variation) var_count+=1 if variation.type=='deletion': continue_var(variation,seg_seq,feature_path_source_genome,feature_path_target_genome,i,j,0) elif variation.type=="substitution": while feature_path_target_genome[j]!=feature_path_source_genome[i]: continue_var(variation,seg_seq,feature_path_source_genome,feature_path_target_genome,i,j,1) i+=1 print_current_var(variation,feat_start,feature_path_target_genome,feat) reset_var(variation) var_count+=1 i-=1 else: # intiate deletion init_new_var(variation,"deletion",feature_path_source_genome,feature_path_target_genome,i,j,seg_seq,feature) i+=1 else : # if both segments are present in the other genome but not at the same position. weird case never found yet # can be a substitution. check later if its not an inversion variation.last_seg_in_target=feature_path_target_genome[j] if (variation.type=='insertion') | (variation.type=='deletion'): # print the current variation before treating the substitution print_current_var(variation,feat_start,feature_path_target_genome,feat) reset_var(variation) var_count+=1 if variation.type=='substitution': continue_var(variation,seg_seq,feature_path_source_genome,feature_path_target_genome,i,j,0) else: # initiate substitution init_new_var(variation,"substitution",feature_path_source_genome,feature_path_target_genome,i,j,seg_seq,feature) i+=1;j+=1 else: # segment present in both, no variation. print the running indel if there is one if variation.type!='': # print the current variation if there is one print_current_var(variation,feat_start,feature_path_target_genome,feat) var_count+=1 reset_var(variation) variation.last_seg_in_target=feature_path_target_genome[j] i+=1;j+=1 if (variation.type!=''): # if there was a current variation when we reached the end, print it print_current_var(variation,feat_start,feature_path_target_genome,feat) var_count+=1 reset_var(variation) if i<=len(feature_path_source_genome)-1: # if we didn't reach the length of the segment list for the first genome, the end is missing for the second genome print_last_deletion(variation,feature_path_source_genome,i,feat_start,feature,seg_seq) var_count+=1 reset_var(variation) if var_count==0: # if no variation was encountered print_novar(variation) def print_current_var(variation,feat_start,feature_path_target_genome,feat): warning='' if variation.type=='insertion': [pos_old,pos_new]=get_old_new_pos_insertion(variation,feat_start,feature_path_target_genome,feat) line=f'{variation.feature_id}\t{variation.feature_type}\t{variation.chr}\t{variation.start_new}\t{variation.stop_new}\t{variation.size_new}\t{variation.inversion}\t{variation.size_diff}\tinsertion\t-\t{variation.alt}\t{len(variation.alt)}\t{pos_old}\t{pos_new}{warning}\n' write_line(line,output_variations,False) elif variation.type=='deletion': [pos_old,pos_new]=get_old_new_pos_deletion(variation,feat_start,feature_path_target_genome,feat) line=f'{variation.feature_id}\t{variation.feature_type}\t{variation.chr}\t{variation.start_new}\t{variation.stop_new}\t{variation.size_new}\t{variation.inversion}\t{variation.size_diff}\tdeletion\t{variation.ref}\t-\t{len(variation.ref)}\t{pos_old}\t{pos_new}{warning}\n' write_line(line,output_variations,False) elif variation.type=='substitution': warning=detect_small_inversion(variation) [pos_old,pos_new]=get_old_new_pos_substitution(feat_start,variation,feature_path_target_genome,feat) size_subs=f'{len(variation.ref)}/{len(variation.alt)}' line=f'{variation.feature_id}\t{variation.feature_type}\t{variation.chr}\t{variation.start_new}\t{variation.stop_new}\t{variation.size_new}\t{variation.inversion}\t{variation.size_diff}\tsubstitution\t{variation.ref}\t{variation.alt}\t{size_subs}\t{pos_old}\t{pos_new}{warning}\n' # print the substitutions of different size as deletion+insertion. #if len(variation.ref) == len(variation.alt): # if the substituion is between two segment of the same size, print it # size_subs=len(variation.ref) # line=f'{variation.feature_id}\t{variation.feature_type}\t{variation.chr}\t{variation.start_new}\t{variation.stop_new}\t{variation.size_new}\t{variation.inversion}\t{variation.size_diff}\tsubstitution\t{variation.ref}\t{variation.alt}\t{size_subs}\t{pos_old}\t{pos_new}{warning}\n' #else : # # if the segments of the substitution have a different size, print deletion then insertion at the same position. # line=f'{variation.feature_id}\t{variation.feature_type}\t{variation.chr}\t{variation.start_new}\t{variation.stop_new}\t{variation.size_new}\t{variation.inversion}\t{variation.size_diff}\tdeletion\t{variation.ref}\t-\t{len(variation.ref)}\t{pos_old}\t{pos_new}{warning}\n' # line+=f'{variation.feature_id}\t{variation.feature_type}\t{variation.chr}\t{variation.start_new}\t{variation.stop_new}\t{variation.size_new}\t{variation.inversion}\t{variation.size_diff}\tinsertion\t-\t{variation.alt}\t{len(variation.alt)}\t{pos_old}\t{pos_new}{warning}\n' write_line(line,output_variations,False) def detect_small_inversion(variation): [list_ref_common,list_alt_common]=[list(),list()] list_ref_unstrand=[segment_stranded[1:] for segment_stranded in variation.seg_ref] list_alt_unstrand=[segment_stranded[1:] for segment_stranded in variation.seg_alt] for seg in variation.seg_ref: if seg[1:] in list_alt_unstrand: list_ref_common.append(seg) for seg in variation.seg_alt: if seg[1:] in list_ref_unstrand: list_alt_common.append(seg) if (len(list_ref_common)>len(list_ref_unstrand)*0.5) & (len(list_alt_common)>len(list_alt_unstrand)*0.5): return f'\t# Suspected inversion within this substitution.' else: return '' def print_last_deletion(variation,feature_path_source_genome,i,feat_start,feature,seg_seq): seg_del=search_segment(feature_path_source_genome[i]) pos_old=int(Segments[seg_del].start)-int(feat_start)+1 del_sequence=get_sequence_list_seg(feature_path_source_genome,i,feature,seg_seq) length=len(del_sequence) pos_new=str(int(variation.size_new)+1) # the deletion is at the end of the feature on the new genome line=f'{variation.feature_id}\t{variation.feature_type}\t{variation.chr}\t{variation.start_new}\t{variation.stop_new}\t{variation.size_new}\t{variation.inversion}\t{variation.size_diff}\tdeletion\t{del_sequence}\t-\t{length}\t{pos_old}\t{pos_new}\n' write_line(line,output_variations,False) def print_novar(variation): line=f'{variation.feature_id}\t{variation.feature_type}\t{variation.chr}\t{variation.start_new}\t{variation.stop_new}\t{variation.size_new}\t{variation.inversion}\t{variation.size_diff}\tno_var\t-\t-\t-\t-\t-\n' write_line(line,output_variations,False) # not used. def get_list_segments_missing(list_seg,segments_on_target_genome): segments_missing=[] for segment in list_seg: if segment not in segments_on_target_genome: segments_missing.append(Segments[segment]) return segments_missing # takes a feature and a feature type, returns a list of child features that have the wanted type. def get_child_list(feature,child_type): if type=="": return feature.childs list_childs=[] for child in feature.childs: if Features[child].type==child_type: list_childs.append(child) return list_childs