Newer
Older
from Graph_gff import Segments, Features, get_feature_start_on_segment, get_feature_stop_on_segment
global segments_on_target_genome
segments_on_target_genome={}
# get the start position of the features on the linear genome, using their coordinates on the graph and the coordinantes of the segments on the genome
def get_feature_start_on_target_genome(start_seg,feat_id):
seg_start_pos=segments_on_target_genome[start_seg][1]
feat_start_pos=get_feature_start_on_segment(start_seg,feat_id)
return seg_start_pos+feat_start_pos-1
# get the stop position of the features on the linear genome, using their coordinates on the graph and the coordinantes of the segments on the genome
def get_feature_stop_on_target_genome(stop_seg,feat_id):
seg_start_pos=segments_on_target_genome[stop_seg][1]
feat_stop_pos=get_feature_stop_on_segment(stop_seg,feat_id)
return seg_start_pos+feat_stop_pos-1
# functions to get the gff with one line per feature
def right_size(size,max_diff,feat):
if max_diff==0:
return True
return not ((size>Features[feat].size*max_diff) | (size<Features[feat].size/max_diff))

nina.marthe_ird.fr
committed
def create_line_target_gff(first_seg,last_seg,feature_id,size_diff,list_seg):
NMarthe
committed
[chr,strand,feature]=[segments_on_target_genome[first_seg][0],Features[feature_id].strand,Features[feature_id]]
variant_count=0
for segment in list_seg:
if segment[1:] not in segments_on_target_genome: # if a segment is absent, it is either a deletion of a substitution. insertions are not considered here.
variant_count+=1
annotation=f'{feature.annot};Size_diff={size_diff};Nb_variants={variant_count}'
output_line=f'{chr}\tGrAnnoT\t{feature.type}\t{get_feature_start_on_target_genome(first_seg,feature_id)}\t{get_feature_stop_on_target_genome(last_seg,feature_id)}\t.\t{strand}\t.\t{annotation}\n'

nina.marthe_ird.fr
committed
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# functions to get the clustal alignment for the transfered genes
# create clustal aln for a feature
def segment_aln(type,seg_seq,seg_a,seg_b):
match type:
case "identity":
seq_aln=seg_seq[seg_a[1:]]
line_a=seq_aln
line_b=seq_aln
len_aln=len(seq_aln)
line_c=len_aln*"*"
case "substitution":
seq_aln_a=seg_seq[seg_a[1:]]
seq_aln_b=seg_seq[seg_b[1:]]
len_a=len(seq_aln_a)
len_b=len(seq_aln_b)
if len_a>len_b:
diff_len=len_a-len_b
line_a=seq_aln_a
line_b=seq_aln_b+diff_len*"-"
line_c=len_a*" "
else:
diff_len=len_b-len_a
line_a=seq_aln_a+diff_len*"-"
line_b=seq_aln_b
line_c=len_b*" "
case "insertion":
seq_aln_b=seg_seq[seg_b[1:]]
len_b=len(seq_aln_b)
line_a=len_b*"-"
line_b=seq_aln_b
line_c=len_b*" "
case "deletion":
seq_aln_a=seg_seq[seg_a[1:]]
len_a=len(seq_aln_a)
line_a=seq_aln_a
line_b=len_a*"-"
line_c=len_a*" "
case "end_deletion":
seq_aln_a=""
for segment in seg_a:
seq_aln_a+=seg_seq[segment[1:]]
len_a=len(seq_aln_a)
line_a=seq_aln_a
line_b=len_a*"-"
line_c=len_a*" "
return [line_a,line_b,line_c] # check the orientation of the segment later
def parse_clustal_lines(line_a,line_b,line_c,feature_id):
if (len(line_a)!=len(line_b)) | (len(line_b)!=len(line_c)):
print("line length differ in clustal alignment")
len_to_parse=len(line_a)
len_parsed=0
clustal_line=""
nb_res_a=0
nb_res_b=0
while len_parsed<len_to_parse:
len_header=len(feature_id)+11
headers=[feature_id+"_source ",feature_id+"_target ",len_header*" "]
add_a=line_a[len_parsed:len_parsed+60]
add_b=line_b[len_parsed:len_parsed+60]
add_c=line_c[len_parsed:len_parsed+60]
nb_res_a+=len(add_a)-add_a.count("-")
nb_res_b+=len(add_b)-add_b.count("-")
clustal_line+=f'{headers[0]}{add_a} {nb_res_a}\n'
clustal_line+=f'{headers[1]}{add_b} {nb_res_b}\n'
clustal_line+=f'{headers[2]}{add_c}\n\n'
len_parsed+=60
clustal_line+="\n"
return clustal_line
def create_line_clustal(feature_path_source_genome,feature_path_target_genome,seg_seq,feature_id):

nina.marthe_ird.fr
committed
line_a=""
line_b=""
line_c=""
[i,j]=[0,0]
while (i<len(feature_path_source_genome)) & (j<len(feature_path_target_genome)):
if feature_path_source_genome[i] != feature_path_target_genome[j]: # if there is a difference between the two paths
if feature_path_target_genome[j] not in feature_path_source_genome: # if the segment in target genome is absent in source genome
if feature_path_source_genome[i] not in feature_path_target_genome: # if the segment in source genome is absent is target genome

nina.marthe_ird.fr
committed
# substitution
[add_a,add_b,add_c]=segment_aln("substitution",seg_seq,feature_path_source_genome[i],feature_path_target_genome[j])

nina.marthe_ird.fr
committed
line_a+=add_a;line_b+=add_b;line_c+=add_c
i+=1;j+=1
else: # target genome segment not in source_genome, but source_genome segment in target genome : insertion or continue substitution
[add_a,add_b,add_c]=segment_aln("insertion",seg_seq,"",feature_path_target_genome[j])

nina.marthe_ird.fr
committed
line_a+=add_a;line_b+=add_b;line_c+=add_c
j+=1
elif feature_path_source_genome[i] not in feature_path_target_genome: # source_genome segment not in target genome, but target genome segment in source_genome : deletion or continue substitution
[add_a,add_b,add_c]=segment_aln("deletion",seg_seq,feature_path_source_genome[i],"")

nina.marthe_ird.fr
committed
line_a+=add_a;line_b+=add_b;line_c+=add_c
i+=1
else : # if both segments are present in the other genome but not at the same position. weird case never found yet
[add_a,add_b,add_c]=segment_aln("substitution",seg_seq,feature_path_source_genome[i],feature_path_target_genome[j])

nina.marthe_ird.fr
committed
line_a+=add_a;line_b+=add_b;line_c+=add_c
i+=1;j+=1
else: # segment present in both, no variation. print the running indel if there is one
[add_a,add_b,add_c]=segment_aln("identity",seg_seq,feature_path_source_genome[i],feature_path_target_genome[j])

nina.marthe_ird.fr
committed
line_a+=add_a;line_b+=add_b;line_c+=add_c
i+=1;j+=1
if i<=len(feature_path_source_genome)-1: # if we didn't reach the length of the segment list for the first genome, the end is missing for the second genome
[add_a,add_b,add_c]=segment_aln("end_deletion",seg_seq,feature_path_source_genome[i:],"")

nina.marthe_ird.fr
committed
line_a+=add_a;line_b+=add_b;line_c+=add_c
return parse_clustal_lines(line_a,line_b,line_c,feature_id)
# functions to output the stats on the transfer
def stats_feature_missing_segment(feature_missing_segments,first_seg,last_seg,list_seg,feature_id):
# [feature_missing_first,feature_missing_middle,feature_missing_last,feature_missing_all,feature_missing_total,feature_total,feature_ok]
feature_missing_segments[5].append(feature_id)
if first_seg=='' : # no segment of the feature is in the genome, the feature is missing entirely
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
feature_missing_segments[3].append(feature_id)
elif first_seg != list_seg[0][1:]: # the first segment is missing
feature_missing_segments[0].append(feature_id)
elif last_seg!=list_seg[-1][1:]: # the last segment is missing
feature_missing_segments[2].append(feature_id)
# go through all the segments, check if some are missing in the middle of the feature
elif (len(list_seg)!=1) & (feature_id not in feature_missing_segments[3]): # to access the second to last element
for segment in list_seg[1-(len(list_seg)-2)]:
if segment[1:] not in segments_on_target_genome:
feature_missing_segments[1].append(feature_id)
break
# go through the segments, to see if one is missing anywhere on the feature
for segment in list_seg:
if segment[1:] not in segments_on_target_genome:
if feature_id not in feature_missing_segments[4]:
feature_missing_segments[4].append(feature_id)
break
# if the feature doesnt have a missing segment, it is complete. ADD THE PATH CHECK FOR INSERTIONS !!
if feature_id not in feature_missing_segments[4]:
feature_missing_segments[6].append(feature_id)
def get_annot_features(list_features):
list_annot_features=[]
for feature in list_features:
list_annot_features.append(Features[feature].note)
return list_annot_features
def count_hypput_total(list_annot_first):
total=len(list_annot_first)
count_hypput=0
for annot in list_annot_first:
if ("hypothetical" in annot) | ("putative" in annot):
count_hypput+=1
return [count_hypput,total]
# print stats on the transfer : number of feature that have segments in different positions missing.
def stats_features(feature_missing_segments):
# [feature_missing_first,feature_missing_middle,feature_missing_last,feature_missing_all,feature_missing_total,feature_total,feature_ok]
list_annot_first=get_annot_features(feature_missing_segments[0])
[hyp_put,total]=count_hypput_total(list_annot_first)
print("\nthe first segment is missing for", total,"features, including",round(100*(hyp_put)/total,2),"% hypothetical or putative.")
list_annot_middle=get_annot_features(feature_missing_segments[1])
[hyp_put,total]=count_hypput_total(list_annot_middle)
print("a middle segment is missing for", total,"features, including",round(100*(hyp_put)/total,2),"% hypothetical or putative.")
list_annot_last=get_annot_features(feature_missing_segments[2])
[hyp_put,total]=count_hypput_total(list_annot_last)
print("the last segment is missing for", total,"features, including",round(100*(hyp_put)/total,2),"% hypothetical or putative.")
list_annot_all=get_annot_features(feature_missing_segments[3])
[hyp_put,total]=count_hypput_total(list_annot_all)
print(total,"features are entirely missing, including",round(100*(hyp_put)/total,2),"% hypothetical or putative.")
list_annot_total=get_annot_features(feature_missing_segments[4])
[hyp_put,total]=count_hypput_total(list_annot_total)
print("there is at least one segment missing for", total,"features, including",round(100*(hyp_put)/total,2),"% hypothetical or putative.")
list_annot_ok=get_annot_features(feature_missing_segments[6])
[hyp_put,total]=count_hypput_total(list_annot_ok)
print(total ,"features are entirely present in the new genome, including",round(100*(hyp_put)/total,2),"% hypothetical or putative.")
list_annot_features=get_annot_features(feature_missing_segments[5])
[hyp_put,total]=count_hypput_total(list_annot_features)
print("there is", total,"features in total, including",round(100*(hyp_put)/total,2),"% hypothetical or putative.")
# functions to generate the different gffs
def get_segments_positions_on_genome(pos_seg):
bed=open(pos_seg,'r')
lines=bed.readlines() # read line by line ?
bed.close()

nina.marthe_ird.fr
committed
seg_count=0

nina.marthe_ird.fr
committed
[seg,chrom,start,stop,strand,index]=[line[3][1:],line[0],int(line[1])+1,int(line[2]),line[3][0:1],seg_count] # +1 in the start to convert the bed 0-based coordinate to a 1-based system
segments_on_target_genome[seg]=[chrom,start,stop,strand,index]
seg_count+=1

nina.marthe_ird.fr
committed
def get_segments_sequence(gfa,segments_list):
file_gfa=open(gfa,'r')
lines_gfa=file_gfa.readlines()
file_gfa.close()
seg_seq={}
for line in lines_gfa:
line=line.split()

nina.marthe_ird.fr
committed
if line[0]=="S": # get the sequence of the segment

nina.marthe_ird.fr
committed
if seg_id in segments_list:
seg_seq[seg_id]=line[2]
return seg_seq

nina.marthe_ird.fr
committed
def get_paths(gfa,target_genomes):
file_gfa=open(gfa,'r')
lines_gfa=file_gfa.readlines()
file_gfa.close()
paths={}
for line in lines_gfa:
line=line.split()
if (line[0]=="W") & (line[1]!="_MINIGRAPH_"): # get the walk of the genome

nina.marthe_ird.fr
committed
if line[3] in target_genomes:
path=line[6].replace(">",";>")
path=path.replace("<",";<").split(';')
list_segments=[]
if segment[0:1]=='>':
list_segments.append('>s'+segment[1:])
elif segment[0:1]=='<':
list_segments.append('<s'+segment[1:])
paths[line[3]]=list_segments

nina.marthe_ird.fr
committed
return paths
def get_first_seg(list_seg): # get the first segment of the list that is in the target genome
first_seg_found=''
for segment in list_seg:
if segment[1:] in segments_on_target_genome:
first_seg_found=segment[1:]
break
return first_seg_found
# functions to get the detail of the variations in the features
def compare_strand(list_1,list_2,list_1_unstrand,list_2_unstrand):
# get the list of segments in common
seg_common=[]
for segment in list_1_unstrand:
if segment in list_2_unstrand:
seg_common.append(segment)
# for each segment in common, check if the strand is the same. check index in list unstranded to get the segment in list stranded
same_strand_count=0
for segment in seg_common:
index_1=list_1_unstrand.index(segment)
index_2=list_2_unstrand.index(segment)
if list_1[index_1]==list_2[index_2]:
same_strand_count+=1
return [seg_common,same_strand_count]

nina.marthe_ird.fr
committed
def add_target_genome_path(feature_id,target_genome_path):
feature=Features[feature_id]
list_seg=feature.segments_list_source
first_seg=get_first_seg(list_seg)
last_seg=get_first_seg(reversed(list_seg))
feature_path=[]
if first_seg!='':
feature_path=get_feature_path(target_genome_path,first_seg,last_seg)
feature.segments_list_target=feature_path
def get_feature_path(target_genome_path,first_seg,last_seg):
# find the path in target genome

nina.marthe_ird.fr
committed
first_seg_index=segments_on_target_genome[first_seg][4]
last_seg_index=segments_on_target_genome[last_seg][4]
first_index=min(first_seg_index,last_seg_index)
last_index=max(first_seg_index,last_seg_index)
feature_path_target_genome=target_genome_path[first_index:last_index+1]
return feature_path_target_genome
def convert_strand(strand):
match strand:
case "+":
return ">"
case "-":
return "<"
case ">":
return "+"
case "<":
return "-"
case default:
return ""
def get_sequence_list_seg(list_seg,i,feature,seg_seq):
for k in range(i,len(list_seg)):
if k==len(list_seg)-1:
del_sequence+=get_segment_sequence(seg_seq,list_seg[k])[0:feature.pos_stop]
del_sequence+=get_segment_sequence(seg_seq,list_seg[k])
return del_sequence
def get_segment_sequence(seg_seq,segment):
if (segment[0]==">") | (segment[0]=="+"):
return seg_seq[segment[1:]]
else:
return reverse_complement(seg_seq[segment[1:]])
def reverse_complement(sequence):
sequence_rc=""
for char in sequence:
sequence_rc+=complement(char)
return sequence_rc[::-1]
def complement(nucl):
match nucl:
case "A":
return "T"
case "C":
return "G"
case "G":
return "C"
case "T":
return "A"
return nucl
class Variation:
def __init__(self,feature_id,feature_type,chr,start_new,stop_new,inversion,size_diff,size_new):
self.feature_id=feature_id
self.feature_type=feature_type
self.chr=chr
self.start_new=start_new
self.stop_new=stop_new
self.inversion=inversion
self.size_diff=size_diff
self.size_new=size_new
self.type=''
self.last_seg_in_target=''
self.seg_ref=list()
self.seg_alt=list()
# add fct to write line.
#def __str__(self):
# return f"id={self.id}, position on the original genome={self.chr}:{self.start}-{self.stop}, size={self.size}, features={self.features}"
def create_var(feature_id,first_seg,last_seg,target_genome_path):
feature=Features[feature_id]
start_new_genome=get_feature_start_on_target_genome(first_seg,feature_id)-1
stop_new_genome=get_feature_stop_on_target_genome(last_seg,feature_id)
size_new_genome=stop_new_genome-start_new_genome
size_diff=str(size_new_genome-feature.size)
# get feature paths on the original genome and on the target genome

nina.marthe_ird.fr
committed
feature_path_target_genome=feature.segments_list_target
feature_path_source_genome=feature.segments_list_source
[feature_path_source_genome,feature_path_target_genome,inversion]=detect_feature_inversion(feature_path_source_genome,feature_path_target_genome)
variation=Variation(feature_id,feature.type,feature.chr,start_new_genome,stop_new_genome,inversion,size_diff,size_new_genome)
return(variation,feature_path_source_genome,feature_path_target_genome)
def reset_var(variation):
variation.type='' # make type enumerate
variation.size_var=0
variation.start_var=''

nina.marthe_ird.fr
committed
variation.start_var_index=0
variation.ref=''
variation.alt=''
def get_old_new_pos_substitution(feat_start,variation,feature_path_target_genome,feat):
pos_old=str(int(Segments[variation.start_var].start)-int(feat_start))
start_feat=get_feature_start_on_target_genome(feature_path_target_genome[0][1:],feat)
start_var=segments_on_target_genome[variation.start_on_target][1]
pos_new=str(start_var-start_feat)
return [pos_old,pos_new] # pos_old and pos_new are the base before the change
def get_old_new_pos_insertion(variation,feat_start,feature_path_target_genome,feat):
pos_old=str(int(Segments[variation.start_var].start)-int(feat_start))
start_feat=get_feature_start_on_target_genome(feature_path_target_genome[0][1:],feat)
start_var=segments_on_target_genome[variation.start_var][1]-len(variation.alt)
pos_new=str(start_var-start_feat)
return [pos_old,pos_new] # pos_old and pos_new are the base before the change
def get_old_new_pos_deletion(variation,feat_start,feature_path_target_genome,feat):

nina.marthe_ird.fr
committed
i=variation.start_var_index

nina.marthe_ird.fr
committed
pos_old=int(Segments[variation.start_var].start)-int(feat_start)+Features[feat].pos_start-1
pos_old=int(Segments[variation.start_var].start)-int(feat_start)

nina.marthe_ird.fr
committed
if pos_old<0:
pos_old=0
if variation.last_seg_in_target=="": # deletion of the beggining of the feature, so no segment placed in the new genome yet.

nina.marthe_ird.fr
committed
pos_new=0
start_feat=get_feature_start_on_target_genome(feature_path_target_genome[0][1:],feat)
start_var=segments_on_target_genome[variation.last_seg_in_target][2]+1
pos_new=str(start_var-start_feat)
return [pos_old,pos_new] # pos_old and pos_new are the base before the change
def init_new_var(variation,type,feature_path_source_genome,feature_path_target_genome,i,j,seg_seq,feature):
variation.type=type
variation.start_var=feature_path_source_genome[i][1:]

nina.marthe_ird.fr
committed
variation.start_var_index=i
if type=="substitution":
variation.start_on_target=feature_path_target_genome[j][1:]
variation.ref=get_segment_sequence(seg_seq,feature_path_source_genome[i])
variation.alt=get_segment_sequence(seg_seq,feature_path_target_genome[j])
variation.seg_ref.append(feature_path_source_genome[i])
variation.seg_alt.append(feature_path_target_genome[j])
elif type=="insertion":
variation.ref="-"
variation.alt=get_segment_sequence(seg_seq,feature_path_target_genome[j])
variation.seg_alt.append(feature_path_target_genome[j])
elif type=="deletion":
if i==0: # if the deletion is at the start of the feature, the deletion doesnt always start at the start at the first segment :
#use pos_start, position of the feature on its first segment
variation.ref=get_segment_sequence(seg_seq,feature_path_source_genome[i])[feature.pos_start-1:]
variation.seg_ref.append(feature_path_source_genome[i])
else: # else, the deletion will always start at the start of the first segment.
variation.ref=get_segment_sequence(seg_seq,feature_path_source_genome[i])
variation.seg_ref.append(feature_path_source_genome[i])
variation.alt="-"
def continue_var(variation,seg_seq,feature_path_source_genome,feature_path_target_genome,i,j,genome_to_continue):
if variation.type=="substitution":
if genome_to_continue==0: # genome_to_continue allows to choose if the substitution continues for the original or the target genome, or both.
variation.ref+=get_segment_sequence(seg_seq,feature_path_source_genome[i])
variation.alt+=get_segment_sequence(seg_seq,feature_path_target_genome[j])
variation.seg_ref.append(feature_path_source_genome[i])
variation.seg_alt.append(feature_path_target_genome[j])
elif genome_to_continue==1: # deletion
variation.ref+=get_segment_sequence(seg_seq,feature_path_source_genome[i])
variation.seg_ref.append(feature_path_source_genome[i])
elif genome_to_continue==2: # insertion
variation.alt+=get_segment_sequence(seg_seq,feature_path_target_genome[j])
variation.seg_alt.append(feature_path_target_genome[j])
elif variation.type=="insertion":
variation.alt+=get_segment_sequence(seg_seq,feature_path_target_genome[j])
variation.seg_alt.append(feature_path_target_genome[j])
elif variation.type=="deletion":
variation.ref+=get_segment_sequence(seg_seq,feature_path_source_genome[i])
variation.seg_ref.append(feature_path_source_genome[i])
def get_common_segments(list1,list2):
list_output=[]
for elem in list1:
if elem in list2:
list_output.append(elem)
return list_output
def detect_segment_order_inversion(list_1,list_2):
if (len(list_1)==1) | (len(list_2)==1):
return False
[cpt,i]=[0,0]
list_1_common=get_common_segments(list_1,list_2)
list_2_common=get_common_segments(list_2,list_1)
list_2_common_reversed=list(reversed(list_2_common))
while i<len(list_1_common):
if list_2_common_reversed[i]==list_1_common[i]:
return (cpt>len(list_1_common)*0.9) # if more than 90% of the segments are on the same position when the lists are reversed, there is an inversion.
def detect_orient_inversion(list_1,list_2):
list_1_unstrand=[segment_stranded[1:] for segment_stranded in list_1]
list_2_unstrand=[segment_stranded[1:] for segment_stranded in list_2]
[seg_common,same_strand_count]=compare_strand(list_1,list_2,list_1_unstrand,list_2_unstrand)
if same_strand_count>=len(seg_common)*0.9: # if more than 90% of segments shared have the same strand, no inversion
strand_inversion=False
strand_inversion=True
return [strand_inversion,list_1_unstrand,list_2_unstrand]
# takes two lists of segments for two genes, check if the first list is an inversion of the second one (if the segments in common are on the opposite strand)
def detect_feature_inversion(list_1,list_2):
# check if we have an inversion of the orientation of the segments
[strand_inversion,list_1_unstrand,list_2_unstrand]=detect_orient_inversion(list_1,list_2)
# check if we have an inversion of the order of the segments
segment_order_inversion=detect_segment_order_inversion(list_1_unstrand,list_2_unstrand)
# if there we have both inversions, the gene is in an inverted region. reverse the second list for the comparison.
if segment_order_inversion & strand_inversion:
inversion=1
list_2=invert_segment_list(list_2)
else :
inversion=0
return [list_1,list_2,str(inversion)]
def invert_segment_list(seg_list):
list_inverted=list()
for seg in seg_list:
if seg[0]==">":
inv_seg="<"+seg[1:]
elif seg[0]=="<":
inv_seg=">"+seg[1:]
else:
inv_seg=seg
list_inverted.append(inv_seg)
return list(reversed(list_inverted))