-
nina.marthe_ird.fr authorednina.marthe_ird.fr authored
Functions_output.py 24.04 KiB
from Graph_gff import Segments, Features, write_line
from Functions import *
# outputs the gff line of the target genome for a feature occurence (a feature can be transfered at several positions)
def generate_target_gff(feature_id,seg_size,args,reason_features_not_transfered,match):
feature=Features[feature_id]
max_diff=args.max_difference
# get list of the segments where it is and the first and last segment of the feature on the new genome
[first_seg,last_seg,walk,copy_id,feature_target_path]=get_featurePath_ends(match)
inversion=detect_feature_inversion(feature.segments_list_source,feature_target_path)
if first_seg!='': # feature present on the target genome
if inversion:
size_on_new_genome=get_feature_start_on_target_genome_inv(last_seg,feature_id,walk,copy_id)-get_feature_stop_on_target_genome_inv(first_seg,feature_id,walk,copy_id)+1
else:
size_on_new_genome=get_feature_stop_on_target_genome(last_seg,feature_id,walk,copy_id)-get_feature_start_on_target_genome(first_seg,feature_id,walk,copy_id)+1
size_diff=abs(size_on_new_genome-feature.size)
[cov,id]=get_id_cov(feature_id,seg_size,feature_target_path)
if (cov*100<args.coverage) or (id*100<args.identity):
reason_features_not_transfered[2]+=1
match.append(False) # store the information that this gene copy didnt pass the filters
return "no"
if right_size(size_on_new_genome,max_diff,feature_id):
line_gff=create_line_target_gff(first_seg,last_seg,feature_id,size_diff,inversion,walk,cov,id,copy_id) # transfer the gene
write_line(line_gff,output_target_gff,False)
# transfer all the gene's childs
transfer_childs(feature_id,feature_target_path,walk,copy_id,inversion,seg_size)
match.append(True) # store the information that this gene copy was transfered
return size_diff
else:
reason_features_not_transfered[0]+=1
match.append(False) # store the information that this gene copy didnt pass the filters
return "no"
else :
reason_features_not_transfered[1]+=1
match.append(False) # store the information that this gene copy didnt pass the filters
return "no"
def transfer_childs(feature_id,feature_target_path,walk,copy_id,inversion,seg_size):
childs_list=get_child_list(feature_id)
for child_id in childs_list:
child=Features[child_id]
# look for the first and last seg of the child in its parent path
[child_first_seg,child_last_seg]=['','']
for seg in child.segments_list_source: # get first seg in the parent path
if seg in feature_target_path: # parent path
child_first_seg=seg
break
elif invert_seg(seg) in feature_target_path:
child_first_seg=invert_seg(seg)
break
if child_first_seg!='': # check that the child feature is not absent in this occurence of the parent feature
for seg in reversed(child.segments_list_source): # get last seg in the parent path
if seg in feature_target_path: # parent path
child_last_seg=seg
break
elif invert_seg(seg) in feature_target_path:
child_last_seg=invert_seg(seg)
break
if inversion: # calculate the child feature size
child_size_on_new_genome=get_feature_start_on_target_genome_inv(child_last_seg,child_id,walk,copy_id)-get_feature_stop_on_target_genome_inv(child_first_seg,child_id,walk,copy_id)+1
else:
child_size_on_new_genome=get_feature_stop_on_target_genome(child_last_seg,child_id,walk,copy_id)-get_feature_start_on_target_genome(child_first_seg,child_id,walk,copy_id)+1
child_size_diff=abs(child_size_on_new_genome-child.size)
for match in child.segments_list_target: # look for the right occurence of the child feature
if (match[0]==walk) and (match[1]==copy_id):
seg_list=match[2]
[cov,id]=get_id_cov(child_id,seg_size,seg_list)
# transfer the child feature:
line_gff=create_line_target_gff(child_first_seg,child_last_seg,child_id,child_size_diff,inversion,walk,cov,id,copy_id)
write_line(line_gff,output_target_gff,False)
# handles all the transfer, variation, alignment (stats?), on the target genome.
def transfer_on_target(segments_file, out_target_gff, out_var,out_aln,target_genome,target_genome_paths,list_feat_absent,seg_size,args):
print(f'generation of {target_genome} output')
stats=False
list_feature_to_transfer= Features.keys()
segments_list={}
for feat in list_feature_to_transfer:
if Features[feat].type=="gene":
add_target_genome_paths(feat,target_genome_paths)
if args.annotation:
print(f' generation of {target_genome} gff')
file_out=open(out_target_gff,'w')
global output_target_gff
output_target_gff=[0,"",file_out]
reason_features_not_transfered=[0,0,0] # bad_size, absent_features, low_cov_id
diff_size_transfered_features=[0,0] # [count,sum], to get the average
for feat in list_feature_to_transfer:
feature=Features[feat]
if feature.parent=="": # usually a gene
for match in feature.segments_list_target:
transfer_stat=generate_target_gff(feat,seg_size,args,reason_features_not_transfered,match) # the childs are handled in this function
if transfer_stat=="no":
list_feat_absent.append(feat)
else:
diff_size_transfered_features[0]+=1
diff_size_transfered_features[1]+=transfer_stat
write_line("",output_target_gff,True)
file_out.close()
if args.variation or args.alignment : # append dict of segments for which we may need the sequence
for feat in list_feature_to_transfer:
list_seg=Features[feat].segments_list_source
feature_target_path=Features[feat].segments_list_target[0][1]
for segment in list_seg:
segments_list[segment[1:]]=''
for segment in feature_target_path:
segments_list[segment[1:]]=''
if args.variation:
print(f' generation of {target_genome} genes variation details')
seg_seq=get_segments_sequence(segments_file,segments_list)
file_out_var = open(out_var, 'w')
global output_variations
output_variations=[0,"",file_out_var]
for feat in list_feature_to_transfer:
# for each feature, get list of the segments where it is and the first and last segment of the feature on the new genome
list_seg=Features[feat].segments_list_source
[first_seg,last_seg,walk,copy_id,feature_target_path]=get_featurePath_ends(Features[feat].segments_list_target[0])
print_variations(first_seg,last_seg,feat,seg_seq,walk,copy_id)
write_line("",output_variations,True)
file_out_var.close()
if args.alignment:
print(f' generation of {args.source_genome} genes alignment with {target_genome} genes')
if not args.variation:
seg_seq=get_segments_sequence(segments_file,segments_list)
file_aln=open(out_aln,'w')
global output_target_aln
output_target_aln=[0,"",file_aln]
write_line("Sequence alignment generated from feature path comparison in pangenome graph. Made with GrAnnoT v1.\n\n",output_target_aln,False)
for feat in list_feature_to_transfer:
list_seg=Features[feat].segments_list_source
[first_seg,last_seg,walk,copy_id,feature_target_path]=get_featurePath_ends(Features[feat].segments_list_target[0])
print_alignment(first_seg,feat,seg_seq)
write_line("",output_target_aln,True)
file_aln.close()
if stats:
# create objects for stats on how many segments are absent in target genome, their average length, etc
feature_missing_segments=[[],[],[],[],[],[],[]] # [feature_missing_first,feature_missing_middle,feature_missing_last,feature_missing_all,feature_missing_total,feature_total,feature_ok]
# the fist segment of the feature is missing - feature_missing_first
# the last segment of the feature is missing - feature_missing_last
# at least one middle segment of the feature is missing - feature_missing_middle
# the entire feature is missing - feature_missing_all
# at least one segment is missing first, last, or middle) - feature_missing_total
# no segment is missing, the feature is complete - feature_ok
# total number of features, with missing segments or not - feature_total
# for each feature, get list of the segments where it is and the first and last segment of the feature on the new genome
list_seg=Features[feat].segments_list_source
[first_seg,last_seg,walk,copy_id,feature_target_path]=get_featurePath_ends(Features[feat].segments_list_target[0])
for feat in list_feature_to_transfer:
stats_feature_missing_segment(feature_missing_segments,first_seg,last_seg,list_seg,feat,walk)
if args.annotation:
bad_size_features=reason_features_not_transfered[0];absent_features=reason_features_not_transfered[1];low_cov_id=reason_features_not_transfered[2]
print(len(Features)-(bad_size_features+absent_features+low_cov_id),"out of",len(Features),"features are transfered.")
print(bad_size_features,"out of",len(Features), "features are not transfered because they are too big or too small compared to the original genome.")
print(absent_features,"out of",len(Features),"features are not transfered because they are absent in the new genome.")
print(low_cov_id,"out of",len(Features),"features are not transfered because their coverage or sequence identity is below threshold.")
print("average length difference of the transfered genes : ",diff_size_transfered_features[1]/diff_size_transfered_features[0])
stats_features(feature_missing_segments)
segments_on_target_genome.clear() # empty dict
# gives the first and last segments of a walk (list)
def get_featurePath_ends(walk_info): # walk_info = [walk_name,copy_id,[list_seg]]
walk_name=walk_info[0]
if walk_name!='':
seg_list=walk_info[2]
copy_id=walk_info[1]
[first_seg,last_seg]=[seg_list[0],seg_list[-1]]
else:
[first_seg,last_seg,walk_name,copy_id,seg_list]=['','','','',[]]
return [first_seg,last_seg,walk_name,copy_id,seg_list]
# get the alignment between the features on the source genome and the features on the target genome
def print_alignment(first_seg,feat,seg_seq):
if first_seg!='': # if the feature is not completly absent
feature=Features[feat]
feature_path_target_genome=feature.segments_list_target[0][1]
feature_path_source_genome=feature.segments_list_source
inversion=detect_feature_inversion(feature_path_source_genome,feature_path_target_genome)
if inversion:
feature_path_target_genome=invert_segment_list(feature_path_target_genome)
line_aln=create_line_aln(feature_path_source_genome,feature_path_target_genome,seg_seq,feat)
write_line(line_aln,output_target_aln,False)
# functions to get the detail of the variations in the features
# handle the variations analysis between the feature on the source genome and the feature on the target genome
def print_variations(first_seg,last_seg,feat,seg_seq,walk,copy_id):
if first_seg!='': # if the feature is not completly absent # add the else, output absent features
[variation,feature_path_source_genome,feature_path_target_genome]=create_var(feat,first_seg,last_seg,walk,copy_id) # removes the strands in the segment lists
feature=Features[feat]
feat_start=feature.start
# loop to go through both paths with i and j
[i,j,var_count]=[0,0,0]
# detect and print variations ignoring the strands
start_feat_seg_target=feature_path_target_genome[0]
while (i<len(feature_path_source_genome)) and (j<len(feature_path_target_genome)):
if feature_path_source_genome[i] != feature_path_target_genome[j]: # if there is a difference between the two paths
if feature_path_target_genome[j] not in feature_path_source_genome: # if the segment in target genome is absent in source genome
if feature_path_source_genome[i] not in feature_path_target_genome: # if the segment in source genome is absent is target genome
# if both segments are absent in the other genome, its a substitution
variation.last_seg_in_target=feature_path_target_genome[j]
if (variation.type=='insertion') or (variation.type=='deletion'): # print the current variation before treating the substitution
print_current_var(variation,feat_start,start_feat_seg_target,feat,walk,copy_id)
reset_var(variation)
var_count+=1
if variation.type=='substitution':
continue_var(variation,seg_seq,feature_path_source_genome,feature_path_target_genome,i,j,0)
else: # initiate substitution
init_new_var(variation,"substitution",feature_path_source_genome,feature_path_target_genome,i,j,seg_seq,feature)
i+=1;j+=1
else: # target genome segment not in source_genome, but source_genome segment in target genome : insertion or continue substitution
if variation.type=='deletion': # print the current variation before treating the insertion
print_current_var(variation,feat_start,start_feat_seg_target,feat,walk,copy_id)
reset_var(variation)
var_count+=1
variation.last_seg_in_target=feature_path_target_genome[j]
if variation.type=='insertion':
continue_var(variation,seg_seq,feature_path_source_genome,feature_path_target_genome,i,j,0)
elif variation.type=="substitution":
while feature_path_target_genome[j]!=feature_path_source_genome[i]:
continue_var(variation,seg_seq,feature_path_source_genome,feature_path_target_genome,i,j,2)
j+=1
print_current_var(variation,feat_start,start_feat_seg_target,feat,walk,copy_id)
reset_var(variation)
var_count+=1
j-=1
else: # intiate insertion
init_new_var(variation,"insertion",feature_path_source_genome,feature_path_target_genome,i,j,seg_seq,feature)
j+=1
elif feature_path_source_genome[i] not in feature_path_target_genome: # source_genome segment not in target genome, but target genome segment in source_genome : deletion or continue substitution
if variation.type=='insertion': # print the current variation before treating the deletion
print_current_var(variation,feat_start,start_feat_seg_target,feat,walk,copy_id)
reset_var(variation)
var_count+=1
if variation.type=='deletion':
continue_var(variation,seg_seq,feature_path_source_genome,feature_path_target_genome,i,j,0)
elif variation.type=="substitution":
while feature_path_target_genome[j]!=feature_path_source_genome[i]:
continue_var(variation,seg_seq,feature_path_source_genome,feature_path_target_genome,i,j,1)
i+=1
print_current_var(variation,feat_start,start_feat_seg_target,feat,walk,copy_id)
reset_var(variation)
var_count+=1
i-=1
else: # intiate deletion
init_new_var(variation,"deletion",feature_path_source_genome,feature_path_target_genome,i,j,seg_seq,feature)
i+=1
else : # if both segments are present in the other genome but not at the same position. weird case never found yet
# can be a substitution. check later if its not an inversion
variation.last_seg_in_target=feature_path_target_genome[j]
if (variation.type=='insertion') or (variation.type=='deletion'): # print the current variation before treating the substitution
print_current_var(variation,feat_start,start_feat_seg_target,feat,walk,copy_id)
reset_var(variation)
var_count+=1
if variation.type=='substitution':
continue_var(variation,seg_seq,feature_path_source_genome,feature_path_target_genome,i,j,0)
else: # initiate substitution
init_new_var(variation,"substitution",feature_path_source_genome,feature_path_target_genome,i,j,seg_seq,feature)
i+=1;j+=1
else: # segment present in both, no variation. print the running indel if there is one
if variation.type!='': # print the current variation if there is one
print_current_var(variation,feat_start,start_feat_seg_target,feat,walk,copy_id)
var_count+=1
reset_var(variation)
variation.last_seg_in_target=feature_path_target_genome[j]
i+=1;j+=1
if (variation.type!=''): # if there was a current variation when we reached the end, print it
print_current_var(variation,feat_start,start_feat_seg_target,feat,walk,copy_id)
var_count+=1
reset_var(variation)
if i<=len(feature_path_source_genome)-1: # if we didn't reach the length of the segment list for the first genome, the end is missing for the second genome
print_last_deletion(variation,feature_path_source_genome,i,feat_start,feature,seg_seq)
var_count+=1
reset_var(variation)
if var_count==0: # if no variation was encountered
print_novar(variation)
# print a variation
def print_current_var(variation,feat_start,start_feat_seg_target,feat,walk,copy_id):
warning=''
if variation.type=='insertion':
[pos_old,pos_new]=get_old_new_pos_insertion(variation,feat_start,start_feat_seg_target,feat,walk,copy_id)
line=f'{variation.feature_id}\t{variation.feature_type}\t{variation.chr}\t{variation.start_new}\t{variation.stop_new}\t{variation.size_new}\t{print_inversion(variation.inversion)}\t{variation.size_diff}\tinsertion\t-\t{variation.alt}\t{len(variation.alt)}\t{pos_old}\t{pos_new}{warning}\n'
write_line(line,output_variations,False)
elif variation.type=='deletion':
[pos_old,pos_new]=get_old_new_pos_deletion(variation,feat_start,start_feat_seg_target,feat,walk,copy_id)
line=f'{variation.feature_id}\t{variation.feature_type}\t{variation.chr}\t{variation.start_new}\t{variation.stop_new}\t{variation.size_new}\t{print_inversion(variation.inversion)}\t{variation.size_diff}\tdeletion\t{variation.ref}\t-\t{len(variation.ref)}\t{pos_old}\t{pos_new}{warning}\n'
write_line(line,output_variations,False)
elif variation.type=='substitution':
warning=detect_small_inversion(variation)
[pos_old,pos_new]=get_old_new_pos_substitution(feat_start,variation,start_feat_seg_target,feat,walk,copy_id)
size_subs=f'{len(variation.ref)}/{len(variation.alt)}'
line=f'{variation.feature_id}\t{variation.feature_type}\t{variation.chr}\t{variation.start_new}\t{variation.stop_new}\t{variation.size_new}\t{print_inversion(variation.inversion)}\t{variation.size_diff}\tsubstitution\t{variation.ref}\t{variation.alt}\t{size_subs}\t{pos_old}\t{pos_new}{warning}\n'
# print the substitutions of different size as deletion+insertion.
#if len(variation.ref) == len(variation.alt): # if the substituion is between two segment of the same size, print it
# size_subs=len(variation.ref)
# line=f'{variation.feature_id}\t{variation.feature_type}\t{variation.chr}\t{variation.start_new}\t{variation.stop_new}\t{variation.size_new}\t{inversion}\t{variation.size_diff}\tsubstitution\t{variation.ref}\t{variation.alt}\t{size_subs}\t{pos_old}\t{pos_new}{warning}\n'
#else :
# # if the segments of the substitution have a different size, print deletion then insertion at the same position.
# line=f'{variation.feature_id}\t{variation.feature_type}\t{variation.chr}\t{variation.start_new}\t{variation.stop_new}\t{variation.size_new}\t{inversion}\t{variation.size_diff}\tdeletion\t{variation.ref}\t-\t{len(variation.ref)}\t{pos_old}\t{pos_new}{warning}\n'
# line+=f'{variation.feature_id}\t{variation.feature_type}\t{variation.chr}\t{variation.start_new}\t{variation.stop_new}\t{variation.size_new}\t{inversion}\t{variation.size_diff}\tinsertion\t-\t{variation.alt}\t{len(variation.alt)}\t{pos_old}\t{pos_new}{warning}\n'
write_line(line,output_variations,False)
# check if a substitution could be an inversion inside a feature
def detect_small_inversion(variation):
[list_ref_common,list_alt_common]=[list(),list()]
list_ref_unstrand=[segment_stranded[1:] for segment_stranded in variation.seg_ref]
list_alt_unstrand=[segment_stranded[1:] for segment_stranded in variation.seg_alt]
for seg in variation.seg_ref:
if seg[1:] in list_alt_unstrand:
list_ref_common.append(seg)
for seg in variation.seg_alt:
if seg[1:] in list_ref_unstrand:
list_alt_common.append(seg)
if (len(list_ref_common)>len(list_ref_unstrand)*0.5) and (len(list_alt_common)>len(list_alt_unstrand)*0.5):
return f'\t# Suspected inversion within this substitution.'
else:
return ''
# print a deletion at the end of a feature
def print_last_deletion(variation,feature_path_source_genome,i,feat_start,feature,seg_seq):
seg_del=search_segment(feature_path_source_genome[i])
pos_old=int(Segments[seg_del].start)-int(feat_start)+1
del_sequence=get_sequence_list_seg(feature_path_source_genome,i,feature,seg_seq)
length=len(del_sequence)
pos_new=str(int(variation.size_new)+1) # the deletion is at the end of the feature on the new genome
if variation.inversion:
inversion='1'
else:
inversion='0'
line=f'{variation.feature_id}\t{variation.feature_type}\t{variation.chr}\t{variation.start_new}\t{variation.stop_new}\t{variation.size_new}\t{inversion}\t{variation.size_diff}\tdeletion\t{del_sequence}\t-\t{length}\t{pos_old}\t{pos_new}\n'
write_line(line,output_variations,False)
# print a feature with no variation
def print_novar(variation):
line=f'{variation.feature_id}\t{variation.feature_type}\t{variation.chr}\t{variation.start_new}\t{variation.stop_new}\t{variation.size_new}\t{print_inversion(variation.inversion)}\t{variation.size_diff}\tno_var\t-\t-\t-\t-\t-\n'
write_line(line,output_variations,False)
# get a digit for the inversion (convert bool in int)
def print_inversion(bool):
if bool==True:
return '1'
else:
return '0'
# not used.
# get all the segments from a list that are not on the target genome
def get_list_segments_missing(list_seg,segments_on_target_genome):
segments_missing=[]
for segment in list_seg:
if segment not in segments_on_target_genome:
segments_missing.append(Segments[segment])
return segments_missing
# for two segments on the target genome, find a walk that has both
def find_common_walk(seg1,seg2):
for walk in segments_on_target_genome[seg1]:
if walk in segments_on_target_genome[seg2]:
return walk
return False