Newer
Older

nina.marthe_ird.fr
committed
from .usefull_little_functions import *
from .intersect import Features,Segments,search_segment,invert_seg
from .detect_inversion import detect_feature_inversion

nina.marthe_ird.fr
committed
# functions to get the detail of the variations in the features
# handle the variations analysis between the feature on the source genome and the feature on the target genome
def generate_variations_details(feature_id,seg_seq,match,file_out_var,segments_on_target_genome):
# for each feature, get list of the segments where it is and the first and last segment of the feature on the new genome
[first_seg,last_seg,walk,copy_id,feature_target_path]=get_featurePath_ends(match)
# do it for the gene, then for its childs.
if match[4]==True: # gene passed the filters
inversion=detect_feature_inversion(Features[feature_id].segments_list_source,feature_target_path)
print_variations(feature_id,first_seg,last_seg,copy_id,walk,seg_seq,feature_target_path,inversion,file_out_var,segments_on_target_genome)
child_variation_details(feature_id,copy_id,walk,seg_seq,feature_target_path,inversion,file_out_var,segments_on_target_genome)

nina.marthe_ird.fr
committed
def child_variation_details(feature_id,copy_id,walk,seg_seq,feature_target_path,inversion,file_out_var,segments_on_target_genome):

nina.marthe_ird.fr
committed
# print child feature var
for child_id in Features[feature_id].get_child_list(Features):

nina.marthe_ird.fr
committed
child=Features[child_id]
# look for the first and last seg of the child in its parent path
[child_first_seg,child_last_seg]=get_child_first_last_seg(child.segments_list_source,feature_target_path,inversion)
if child_first_seg!='': # else the child faeture is absent in this occurence of the parent feature
child_target_path=find_child_target_path(child.segments_list_target,walk,copy_id)

nina.marthe_ird.fr
committed
# treat the child feature:
print_variations(child_id,child_first_seg,child_last_seg,copy_id,walk,seg_seq,child_target_path,inversion,file_out_var,segments_on_target_genome)

nina.marthe_ird.fr
committed
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def get_child_first_last_seg(child_segments_list_source,feature_target_path,inversion):
[child_first_seg,child_last_seg]=['','']
child_first_seg=get_child_first_seg(child_segments_list_source,feature_target_path)
if child_first_seg!='': # check that the child feature is not absent in this occurence of the parent feature
child_last_seg=get_child_last_seg(child_segments_list_source,feature_target_path)
if inversion:
temp=child_first_seg
child_first_seg=child_last_seg
child_last_seg=temp
return [child_first_seg,child_last_seg]
def get_child_first_seg(child_segments_list_source,feature_target_path):
for seg in child_segments_list_source: # get first seg in the parent path
if seg in feature_target_path: # parent path
return seg
elif invert_seg(seg) in feature_target_path:
return invert_seg(seg)
return ''
def get_child_last_seg(child_segments_list_source,feature_target_path):
for seg in reversed(child_segments_list_source): # get last seg in the parent path
if seg in feature_target_path: # parent path
return seg
elif invert_seg(seg) in feature_target_path:
return invert_seg(seg)
return ''
def find_child_target_path(child_segments_list_target,walk,copy_id):
for match in child_segments_list_target: # look for the right occurence of the child feature
if (match[0]==walk) and (match[1]==copy_id):
return match[2]
def print_variations(feature_id,first_seg,last_seg,copy_id,walk,seg_seq,feature_target_path,inversion,file_out_var,segments_on_target_genome):

nina.marthe_ird.fr
committed
if first_seg!='': # if the feature is not completly absent # add the else, output absent features
feat_var=FeatureVariation(feature_id,file_out_var,first_seg,last_seg,walk,copy_id,feature_target_path,inversion,segments_on_target_genome)
feature_path_source_genome=feat_var.feature_path_source_genome # removes the strand on the segments list
feature_path_target_genome=feat_var.feature_path_target_genome # removes the strand on the segments list and inverts the list if necessary

nina.marthe_ird.fr
committed
feature=Features[feature_id]
# loop to go through both paths with i and j
[i,j]=[0,0]

nina.marthe_ird.fr
committed
# detect and print variations ignoring the strands
while (i<len(feature_path_source_genome)) and (j<len(feature_path_target_genome)):
if feature_path_source_genome[i] != feature_path_target_genome[j]: # if there is a difference between the two paths
if feature_path_target_genome[j] not in feature_path_source_genome: # if the segment in target genome is absent in source genome
if feature_path_source_genome[i] not in feature_path_target_genome: # if the segment in source genome is absent is target genome
# if both segments are absent in the other genome, its a substitution
feat_var.last_seg_in_target=feature_path_target_genome[j]
if (feat_var.type=='insertion') or (feat_var.type=='deletion'): # print the current variation before treating the substitution
feat_var.print_current_var(segments_on_target_genome)
if feat_var.type=='substitution':
feat_var.continue_var(seg_seq,i,j,0)

nina.marthe_ird.fr
committed
else: # initiate substitution
feat_var.init_new_var("substitution",i,j,seg_seq,feature)

nina.marthe_ird.fr
committed
i+=1;j+=1
else: # target genome segment not in source_genome, but source_genome segment in target genome : insertion or continue substitution
if feat_var.type=='deletion': # print the current variation before treating the insertion
feat_var.print_current_var(segments_on_target_genome)
feat_var.last_seg_in_target=feature_path_target_genome[j]
if feat_var.type=='insertion':
feat_var.continue_var(seg_seq,i,j,0)
elif feat_var.type=="substitution":

nina.marthe_ird.fr
committed
while feature_path_target_genome[j]!=feature_path_source_genome[i]:
feat_var.continue_var(seg_seq,i,j,2)

nina.marthe_ird.fr
committed
j+=1
feat_var.print_current_var(segments_on_target_genome)

nina.marthe_ird.fr
committed
j-=1
else: # intiate insertion
feat_var.init_new_var("insertion",i,j,seg_seq,feature)

nina.marthe_ird.fr
committed
j+=1
elif feature_path_source_genome[i] not in feature_path_target_genome: # source_genome segment not in target genome, but target genome segment in source_genome : deletion or continue substitution
if feat_var.type=='insertion': # print the current variation before treating the deletion
feat_var.print_current_var(segments_on_target_genome)
if feat_var.type=='deletion':
feat_var.continue_var(seg_seq,i,j,0)
elif feat_var.type=="substitution":

nina.marthe_ird.fr
committed
while feature_path_target_genome[j]!=feature_path_source_genome[i]:
feat_var.continue_var(seg_seq,i,j,1)

nina.marthe_ird.fr
committed
i+=1
feat_var.print_current_var(segments_on_target_genome)

nina.marthe_ird.fr
committed
i-=1
else: # intiate deletion
feat_var.init_new_var("deletion",i,j,seg_seq,feature)

nina.marthe_ird.fr
committed
i+=1
else : # if both segments are present in the other genome but not at the same position. weird case never found yet
# can be a substitution. check later if its not an inversion
feat_var.last_seg_in_target=feature_path_target_genome[j]
if (feat_var.type=='insertion') or (feat_var.type=='deletion'): # print the current variation before treating the substitution
feat_var.print_current_var(segments_on_target_genome)
if feat_var.type=='substitution':
feat_var.continue_var(seg_seq,i,j,0)

nina.marthe_ird.fr
committed
else: # initiate substitution
feat_var.init_new_var("substitution",i,j,seg_seq,feature)

nina.marthe_ird.fr
committed
i+=1;j+=1
else: # segment present in both, no variation; print the running indel if there is one
if feat_var.type!='': # print the current variation if there is one
feat_var.print_current_var(segments_on_target_genome)
feat_var.last_seg_in_target=feature_path_target_genome[j]

nina.marthe_ird.fr
committed
i+=1;j+=1
if (feat_var.type!=''): # if there was a current variation when we reached the end, print it
feat_var.print_current_var(segments_on_target_genome)

nina.marthe_ird.fr
committed
if i<=len(feature_path_source_genome)-1: # if we didn't reach the length of the segment list for the first genome, the end is missing for the second genome
feat_var.print_last_deletion(i,feature,seg_seq)
if feat_var.var_count==0: # if no variation was encountered
feat_var.print_novar()

nina.marthe_ird.fr
committed
# stores information about a feature and its current variation
class FeatureVariation:
def __init__(self,feature_id,file_out_var,first_seg,last_seg,walk,copy_id,feature_target_path,inversion,segments_on_target_genome):
feature=Features[feature_id]
# get feature paths on the original genome and on the target genome
feature_path_target_genome=feature_target_path
feature_path_source_genome=feature.segments_list_source
if inversion:
feature_path_target_genome=invert_segment_list(feature_path_target_genome)
stop_new_genome=get_feature_start_on_target_genome_inv(last_seg,feature_id,walk,copy_id,segments_on_target_genome)
start_new_genome=get_feature_stop_on_target_genome_inv(first_seg,feature_id,walk,copy_id,segments_on_target_genome)
else:
start_new_genome=get_feature_start_on_target_genome(first_seg,feature_id,walk,copy_id,segments_on_target_genome)
stop_new_genome=get_feature_stop_on_target_genome(last_seg,feature_id,walk,copy_id,segments_on_target_genome)
size_new_genome=stop_new_genome-start_new_genome+1
size_diff=str(size_new_genome-feature.size)
sequence_name=get_seg_occ(first_seg,walk,feature_id,copy_id,segments_on_target_genome)[0]
self.feature_id=feature.id # real id in the gff
self.feature_key=feature_id # key to access the feature info in the dict Features
self.feature_type=feature.type
self.chr=sequence_name
self.start_new=start_new_genome
self.stop_new=stop_new_genome

nina.marthe_ird.fr
committed
self.inversion=inversion
self.size_diff=size_diff
self.size_new=size_new_genome

nina.marthe_ird.fr
committed
self.type=''
self.last_seg_in_target=''
self.seg_ref=list()
self.seg_alt=list()
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
self.feature_path_source_genome=feature_path_source_genome
self.feature_path_target_genome=feature_path_target_genome
self.feat_start=feature.start
self.copy_id=copy_id
self.walk=walk
self.start_feat_seg_target=feature_path_target_genome[0]
self.var_count=0
self.file_out_var=file_out_var
# check if a substitution could be an inversion inside a feature
def detect_small_inversion(self):
[list_ref_common,list_alt_common]=[list(),list()]
list_ref_unstrand=[segment_stranded[1:] for segment_stranded in self.seg_ref]
list_alt_unstrand=[segment_stranded[1:] for segment_stranded in self.seg_alt]
for seg in self.seg_ref:
if seg[1:] in list_alt_unstrand:
list_ref_common.append(seg)
for seg in self.seg_alt:
if seg[1:] in list_ref_unstrand:
list_alt_common.append(seg)
if (len(list_ref_common)>len(list_ref_unstrand)*0.5) and (len(list_alt_common)>len(list_alt_unstrand)*0.5):
return f'\t# Suspected inversion within this substitution.'
else:
return ''
# print a variation
def print_current_var(self,segments_on_target_genome):
warning=''
if self.type=='insertion':
[pos_old,pos_new]=self.get_old_new_pos_insertion(segments_on_target_genome)
line=f'{self.feature_id}\t{self.feature_type}\t{self.chr}\t{self.start_new}\t{self.stop_new}\t{self.size_new}\t{FeatureVariation.print_inversion(self.inversion)}\t{self.size_diff}\tinsertion\t-\t{self.alt}\t{len(self.alt)}\t{pos_old}\t{pos_new}{warning}\n'
elif self.type=='deletion':
[pos_old,pos_new]=self.get_old_new_pos_deletion(segments_on_target_genome)
line=f'{self.feature_id}\t{self.feature_type}\t{self.chr}\t{self.start_new}\t{self.stop_new}\t{self.size_new}\t{FeatureVariation.print_inversion(self.inversion)}\t{self.size_diff}\tdeletion\t{self.ref}\t-\t{len(self.ref)}\t{pos_old}\t{pos_new}{warning}\n'
elif self.type=='substitution':
warning=self.detect_small_inversion()
[pos_old,pos_new]=self.get_old_new_pos_substitution(segments_on_target_genome)
size_subs=f'{len(self.ref)}/{len(self.alt)}'
line=f'{self.feature_id}\t{self.feature_type}\t{self.chr}\t{self.start_new}\t{self.stop_new}\t{self.size_new}\t{FeatureVariation.print_inversion(self.inversion)}\t{self.size_diff}\tsubstitution\t{self.ref}\t{self.alt}\t{size_subs}\t{pos_old}\t{pos_new}{warning}\n'
self.file_out_var.write(line)
self.var_count+=1
self.reset_var()
# print a feature with no variation
def print_novar(self):
line=f'{self.feature_id}\t{self.feature_type}\t{self.chr}\t{self.start_new}\t{self.stop_new}\t{self.size_new}\t{FeatureVariation.print_inversion(self.inversion)}\t{self.size_diff}\tno_var\t-\t-\t-\t-\t-\n'
self.file_out_var.write(line)
# print a deletion at the end of a feature
def print_last_deletion(self,i,feature,seg_seq):
seg_del=search_segment(self.feature_path_source_genome[i])
pos_old=int(Segments[seg_del].get_start(feature.id))-self.feat_start+1
pos_new=str(self.size_new+1) # the deletion is at the end of the feature on the new genome
del_sequence=FeatureVariation.get_sequence_list_seg(self.feature_path_source_genome,i,feature.pos_stop,seg_seq)
line=f'{self.feature_id}\t{self.feature_type}\t{self.chr}\t{self.start_new}\t{self.stop_new}\t{self.size_new}\t{FeatureVariation.print_inversion(self.inversion)}\t{self.size_diff}\tdeletion\t{del_sequence}\t-\t{len(del_sequence)}\t{pos_old}\t{pos_new}\n'
self.file_out_var.write(line)
self.var_count+=1
self.reset_var()
# reset the informations of the variation, but keep the information about the feature
def reset_var(self):
self.type='' # todo : make type enumerate
self.size_var=0
self.start_var=''
self.start_var_index=0
self.ref=''
self.alt=''
# change the variation information, but keep the feature information (the variation is on the feature)
def init_new_var(self,type,i,j,seg_seq,feature):
feature_path_source_genome=self.feature_path_source_genome
feature_path_target_genome=self.feature_path_target_genome
self.type=type
self.start_var=feature_path_source_genome[i]
self.start_var_index=i
if type=="substitution":
self.start_on_target=feature_path_target_genome[j]
self.ref=get_segment_sequence(seg_seq,feature_path_source_genome[i])
self.alt=get_segment_sequence(seg_seq,feature_path_target_genome[j])
self.seg_ref.append(feature_path_source_genome[i])
self.seg_alt.append(feature_path_target_genome[j])
elif type=="insertion":
self.ref="-"
self.alt=get_segment_sequence(seg_seq,feature_path_target_genome[j])
self.seg_alt.append(feature_path_target_genome[j])
elif type=="deletion":
if i==0: # if the deletion is at the start of the feature, the deletion doesnt always start at the start at the first segment :
#use pos_start, position of the feature on its first segment
self.ref=get_segment_sequence(seg_seq,feature_path_source_genome[i])[feature.pos_start-1:]
self.seg_ref.append(feature_path_source_genome[i])
else: # else, the deletion will always start at the start of the first segment.
self.ref=get_segment_sequence(seg_seq,feature_path_source_genome[i])
self.seg_ref.append(feature_path_source_genome[i])
self.alt="-"
# update the variation
def continue_var(self,seg_seq,i,j,genome_to_continue):
feature_path_source_genome=self.feature_path_source_genome
feature_path_target_genome=self.feature_path_target_genome
if self.type=="substitution":
if genome_to_continue==0: # genome_to_continue allows to choose if the substitution continues for the original or the target genome, or both.
self.ref+=get_segment_sequence(seg_seq,feature_path_source_genome[i])
self.alt+=get_segment_sequence(seg_seq,feature_path_target_genome[j])
self.seg_ref.append(feature_path_source_genome[i])
self.seg_alt.append(feature_path_target_genome[j])
elif genome_to_continue==1: # deletion
self.ref+=get_segment_sequence(seg_seq,feature_path_source_genome[i])
self.seg_ref.append(feature_path_source_genome[i])
elif genome_to_continue==2: # insertion
self.alt+=get_segment_sequence(seg_seq,feature_path_target_genome[j])
self.seg_alt.append(feature_path_target_genome[j])
elif self.type=="insertion":
self.alt+=get_segment_sequence(seg_seq,feature_path_target_genome[j])
self.seg_alt.append(feature_path_target_genome[j])
elif self.type=="deletion":
self.ref+=get_segment_sequence(seg_seq,feature_path_source_genome[i])
self.seg_ref.append(feature_path_source_genome[i])
# find the position of a substitution on the source and the target sequence
def get_old_new_pos_substitution(self,segments_on_target_genome):
feat=self.feature_key
seg_pos=search_segment(self.start_var)
pos_old=str(int(Segments[seg_pos].get_start(Features[feat].id))-self.feat_start)
var_start_seg=self.start_on_target
if self.inversion:
var_start_seg=invert_seg(var_start_seg)
end_var=get_seg_occ(var_start_seg,self.walk,feat,self.copy_id,segments_on_target_genome)[2]
start_feat=get_feature_start_on_target_genome_inv(invert_seg(self.start_feat_seg_target),feat,self.walk,self.copy_id,segments_on_target_genome)
pos_new=str(start_feat-end_var)
else:
start_var=get_seg_occ(var_start_seg,self.walk,feat,self.copy_id,segments_on_target_genome)[1]
start_feat=get_feature_start_on_target_genome(self.start_feat_seg_target,feat,self.walk,self.copy_id,segments_on_target_genome)
pos_new=str(start_var-start_feat)
return [pos_old,pos_new] # pos_old and pos_new are the base before the change
# find the position of an insertion on the source and the target sequence
def get_old_new_pos_insertion(self,segments_on_target_genome):
feat=self.feature_key
seg_pos=search_segment(self.start_var) # start_var is the segment AFTER the insertion
pos_old=str(int(Segments[seg_pos].get_start(Features[feat].id))-self.feat_start)

nina.marthe_ird.fr
committed
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
start_var_seg=self.start_var
if self.inversion:
start_var_seg=invert_seg(start_var_seg)
end_var=get_seg_occ(start_var_seg,self.walk,feat,self.copy_id,segments_on_target_genome)[2]+len(self.alt) # start_var_seg is the segment AFTER the insertion
start_feat=get_feature_start_on_target_genome_inv(invert_seg(self.start_feat_seg_target),feat,self.walk,self.copy_id,segments_on_target_genome)
pos_new=str(start_feat-end_var)
else:
start_var=get_seg_occ(start_var_seg,self.walk,feat,self.copy_id,segments_on_target_genome)[1]-len(self.alt) # start_var_seg is the segment AFTER the insertion
start_feat=get_feature_start_on_target_genome(self.start_feat_seg_target,feat,self.walk,self.copy_id,segments_on_target_genome)
pos_new=str(start_var-start_feat)
return [pos_old,pos_new] # pos_old and pos_new are the base before the change
# find the position of a deletion on the source and the target sequence
def get_old_new_pos_deletion(self,segments_on_target_genome):
feat=self.feature_key
seg_pos=search_segment(self.start_var)
if self.start_var_index==0:
pos_old=int(Segments[seg_pos].get_start(Features[feat].id))-self.feat_start+Features[feat].pos_start-1
else:
pos_old=int(Segments[seg_pos].get_start(Features[feat].id))-self.feat_start
if pos_old<0:
pos_old=0
print("error with variation position",self.inversion,"***")
if self.last_seg_in_target=="": # deletion of the beggining of the feature, so no segment placed in the new genome yet.
pos_new=0
else:
start_var_seg=self.last_seg_in_target
if self.inversion:
start_var_seg=invert_seg(start_var_seg)
start_var=get_seg_occ(start_var_seg,self.walk,feat,self.copy_id,segments_on_target_genome)[1]-1
start_feat=get_feature_start_on_target_genome_inv(invert_seg(self.start_feat_seg_target),feat,self.walk,self.copy_id,segments_on_target_genome)
pos_new=str(start_feat-start_var)
else:
start_var=get_seg_occ(start_var_seg,self.walk,feat,self.copy_id,segments_on_target_genome)[2]+1
start_feat=get_feature_start_on_target_genome(self.start_feat_seg_target,feat,self.walk,self.copy_id,segments_on_target_genome)
pos_new=str(start_var-start_feat)
return [pos_old,pos_new] # pos_old and pos_new are the base before the change
# get a digit for the inversion (convert bool in int)
@staticmethod
def print_inversion(bool):
if bool==True:
return '1'

nina.marthe_ird.fr
committed
else:
return '0'
# outputs the nucleotide sequence of a list of segments, corresponding to the end of a feature
@staticmethod
def get_sequence_list_seg(list_seg,i,feat_pos_stop,seg_seq):
del_sequence=""
for k in range(i,len(list_seg)):
if k==len(list_seg)-1:
del_sequence+=get_segment_sequence(seg_seq,list_seg[k])[0:feat_pos_stop]
else:
del_sequence+=get_segment_sequence(seg_seq,list_seg[k])
return del_sequence