Newer
Older
from Graph_gff import Segments, Features, get_feature_start_on_segment, get_feature_stop_on_segment
global segments_on_target_genome
segments_on_target_genome={}
# get the start position of the features on the linear genome, using their coordinates on the graph and the coordinantes of the segments on the genome
def get_feature_start_on_genome(start_seg,feat_id):
seg_start_pos=segments_on_target_genome[start_seg][1]
feat_start_pos=get_feature_start_on_segment(start_seg,feat_id)
start=int(seg_start_pos)+int(feat_start_pos)-1
return start
# get the stop position of the features on the linear genome, using their coordinates on the graph and the coordinantes of the segments on the genome
def get_feature_stop_on_genome(stop_seg,feat_id):
seg_start_pos=segments_on_target_genome[stop_seg][1]
feat_stop_pos=get_feature_stop_on_segment(stop_seg,feat_id)
stop=int(seg_start_pos)+int(feat_stop_pos)-1
return stop
# functions to get the detailed gff with all the fragments of the features
# get the position of a part of a feature on the complete feature (on the original genome)
def get_position_on_feature(start_seg,stop_seg,feature_start_segment,feature):
start_on_feature=get_segment_start_on_feature(feature_start_segment,start_seg,feature)
stop_on_feature=get_segment_stop_on_feature(start_seg,stop_seg,feature,start_on_feature)
position=f';position={start_on_feature}-{stop_on_feature}'
return position
def get_segment_start_on_feature(feature_start_segment,start_seg,feature):
feature_start_pos=Segments[feature_start_segment].start+get_feature_start_on_segment(feature_start_segment,feature.id)-1
start_on_reference=Segments[start_seg].start+get_feature_start_on_segment(start_seg,feature.id)-1
start_on_feature=int(start_on_reference)-int(feature_start_pos)+1
return start_on_feature
def get_segment_stop_on_feature(start_seg,stop_seg,feature,start_on_feature):
start_on_new_genome=get_feature_start_on_genome(start_seg,feature.id)
stop_on_new_genome=get_feature_stop_on_genome(stop_seg,feature.id)
# length on new genome is the same as length on reference, as we only get the positions of conserved parts of the feature
stop_on_feature=start_on_feature+(stop_on_new_genome-start_on_new_genome) # stop position : start+length
return stop_on_feature
# get the proportion of a part of the feature on the total length
def get_proportion_of_feature_part(start_seg,stop_seg,feature):
start_on_new_genome=get_feature_start_on_genome(start_seg,feature.id)
stop_on_new_genome=get_feature_stop_on_genome(stop_seg,feature.id)
# length on new genome is the same as length on reference, as we only get the positions of conserved parts of the feature
proportion=f';proportion={stop_on_new_genome-start_on_new_genome+1}/{feature.size}'
return proportion
# returns the gff line to write in the output file for the function gff_detail
def create_line_detail(last_seg,feature_id,start_seg,feat_start):
NMarthe
committed
[stop_seg,feature,chr,strand]=[last_seg[1:],Features[feature_id],segments_on_target_genome[start_seg][0],Features[feature_id].strand]
# start and stop position of the feature on the genome we transfer on
start_on_new_genome=get_feature_start_on_genome(start_seg,feature_id)
stop_on_new_genome=get_feature_stop_on_genome(stop_seg,feature_id)
proportion=get_proportion_of_feature_part(start_seg,stop_seg,feature)
position=get_position_on_feature(start_seg,stop_seg,feat_start,feature)
annotation=feature.annot+proportion+position
output_line=f'{chr}\tGrAnnoT\t{feature.type}\t{start_on_new_genome}\t{stop_on_new_genome}\t.\t{strand}\t.\t{annotation}\n'
# functions to get the gff with one line per feature
def right_size(size,max_diff,feat):
if max_diff==0:
return True
return not ((size>Features[feat].size*max_diff) | (size<Features[feat].size/max_diff))
def create_line_gff_one(first_seg,last_seg,feature_id,size_diff,list_seg):
NMarthe
committed
[chr,strand,feature]=[segments_on_target_genome[first_seg][0],Features[feature_id].strand,Features[feature_id]]
variant_count=0
for segment in list_seg:
if segment[1:] not in segments_on_target_genome: # if a segment is absent, it is either a deletion of a substitution. insertions are not considered here.
variant_count+=1
annotation=f'{feature.annot};Size_diff={size_diff};Nb_variants={variant_count}'
output_line=f'{chr}\tGrAnnoT\t{feature.type}\t{get_feature_start_on_genome(first_seg,feature_id)}\t{get_feature_stop_on_genome(last_seg,feature_id)}\t.\t{strand}\t.\t{annotation}\n'
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# functions to output the stats on the transfer
def stats_feature_missing_segment(feature_missing_segments,first_seg,last_seg,list_seg,feature_id):
# [feature_missing_first,feature_missing_middle,feature_missing_last,feature_missing_all,feature_missing_total,feature_total,feature_ok]
feature_missing_segments[5].append(feature_id)
if (first_seg=='') : # no segment of the feature is in the genome, the feature is missing entirely
feature_missing_segments[3].append(feature_id)
elif first_seg != list_seg[0][1:]: # the first segment is missing
feature_missing_segments[0].append(feature_id)
elif last_seg!=list_seg[-1][1:]: # the last segment is missing
feature_missing_segments[2].append(feature_id)
# go through all the segments, check if some are missing in the middle of the feature
elif (len(list_seg)!=1) & (feature_id not in feature_missing_segments[3]): # to access the second to last element
for segment in list_seg[1-(len(list_seg)-2)]:
if segment[1:] not in segments_on_target_genome:
feature_missing_segments[1].append(feature_id)
break
# go through the segments, to see if one is missing anywhere on the feature
for segment in list_seg:
if segment[1:] not in segments_on_target_genome:
if feature_id not in feature_missing_segments[4]:
feature_missing_segments[4].append(feature_id)
break
# if the feature doesnt have a missing segment, it is complete. ADD THE PATH CHECK FOR INSERTIONS !!
if feature_id not in feature_missing_segments[4]:
feature_missing_segments[6].append(feature_id)
def get_annot_features(list_features):
list_annot_features=[]
for feature in list_features:
list_annot_features.append(Features[feature].note)
return list_annot_features
def count_hypput_total(list_annot_first):
total=len(list_annot_first)
count_hypput=0
for annot in list_annot_first:
if ("hypothetical" in annot) | ("putative" in annot):
count_hypput+=1
return [count_hypput,total]
# print stats on the transfer : number of feature that have segments in different positions missing.
def stats_features(feature_missing_segments):
# [feature_missing_first,feature_missing_middle,feature_missing_last,feature_missing_all,feature_missing_total,feature_total,feature_ok]
list_annot_first=get_annot_features(feature_missing_segments[0])
[hyp_put,total]=count_hypput_total(list_annot_first)
print("\nthe first segment is missing for", total,"features, including",round(100*(hyp_put)/total,2),"% hypothetical or putative.")
list_annot_middle=get_annot_features(feature_missing_segments[1])
[hyp_put,total]=count_hypput_total(list_annot_middle)
print("a middle segment is missing for", total,"features, including",round(100*(hyp_put)/total,2),"% hypothetical or putative.")
list_annot_last=get_annot_features(feature_missing_segments[2])
[hyp_put,total]=count_hypput_total(list_annot_last)
print("the last segment is missing for", total,"features, including",round(100*(hyp_put)/total,2),"% hypothetical or putative.")
list_annot_all=get_annot_features(feature_missing_segments[3])
[hyp_put,total]=count_hypput_total(list_annot_all)
print(total,"features are entirely missing, including",round(100*(hyp_put)/total,2),"% hypothetical or putative.")
list_annot_total=get_annot_features(feature_missing_segments[4])
[hyp_put,total]=count_hypput_total(list_annot_total)
print("there is at least one segment missing for", total,"features, including",round(100*(hyp_put)/total,2),"% hypothetical or putative.")
list_annot_ok=get_annot_features(feature_missing_segments[6])
[hyp_put,total]=count_hypput_total(list_annot_ok)
print(total ,"features are entirely present in the new genome, including",round(100*(hyp_put)/total,2),"% hypothetical or putative.")
list_annot_features=get_annot_features(feature_missing_segments[5])
[hyp_put,total]=count_hypput_total(list_annot_features)
print("there is", total,"features in total, including",round(100*(hyp_put)/total,2),"% hypothetical or putative.")
# functions to generate the different gffs
def get_segments_positions_on_genome(pos_seg):
bed=open(pos_seg,'r')
lines=bed.readlines() # read line by line ?
bed.close()
for line in lines:
line=line.split()
[seg,chrom,start,stop,strand]=[line[3][1:],line[0],line[1],line[2],line[3][0:1]]
segments_on_target_genome[seg]=[chrom,start,stop,strand]
def get_segments_sequence_and_paths(gfa):
file_gfa=open(gfa,'r')
lines_gfa=file_gfa.readlines()
file_gfa.close()
seg_seq={}
paths={}
for line in lines_gfa:
line=line.split()
if (line[0]=="S"): # get the sequence of the segment
seg_id='s'+line[1]
seg_seq[seg_id]=line[2]
if (line[0]=="W") & (line[1]!="_MINIGRAPH_"): # get the walk of the genome
path=line[6].replace(">",";>")
path=path.replace("<",";<").split(';')
list_path=[]
for segment in path:
if segment[0:1]=='>':
list_path.append('+s'+segment[1:])
elif segment[0:1]=='<':
list_path.append('-s'+segment[1:])
#else:
# list_path.append('s'+segment[1:])
paths[line[3]]=list_path
return [paths,seg_seq]
def get_first_seg(list_seg):
first_seg_found=''
for segment in list_seg:
if segment[1:] in segments_on_target_genome:
first_seg_found=segment[1:]
break
return first_seg_found
def get_all_features_in_gff(gff):
gff=open(gff,'r')
lines=gff.readlines()
gff.close()
list_feature=[]
for line in lines:
feature_id=(line.split()[8].split(";")[0].split("=")[1].replace(".","_").replace(":","_"))
if feature_id not in list_feature:
list_feature.append(feature_id)
return list_feature
# functions to get the detail of the variations in the features
def compare_strand(list_1,list_2,list_1_unstrand,list_2_unstrand):
# get the list of segments in common
seg_common=[]
for segment in list_1_unstrand:
if segment in list_2_unstrand:
seg_common.append(segment)
# for each segment in common, check if the strand is the same. check index in list unstranded to get the segment in list stranded
same_strand_count=0
for segment in seg_common:
index_1=list_1_unstrand.index(segment)
index_2=list_2_unstrand.index(segment)
if list_1[index_1]==list_2[index_2]:
same_strand_count+=1
return [seg_common,same_strand_count]

nina.marthe_ird.fr
committed
def get_feature_path(paths,first_seg,last_seg,target_genome_name):
# find the path in azucena.
first_strand=convert_strand(segments_on_target_genome[first_seg][3])
first_seg_stranded=first_strand+first_seg
last_strand=convert_strand(segments_on_target_genome[last_seg][3])
last_seg_stranded=last_strand+last_seg
index_first_seg=int(paths[target_genome_name].index(first_seg_stranded))
index_last_seg=int(paths[target_genome_name].index(last_seg_stranded))
first_index=min(index_first_seg,index_last_seg)
last_index=max(index_last_seg,index_first_seg)
list_segfeat_azu=paths[target_genome_name][first_index:last_index+1]
list_segfeat_azu_corrected=[convert_strand(segment_stranded[0])+segment_stranded[1:] for segment_stranded in list_segfeat_azu]
return list_segfeat_azu_corrected
def convert_strand(strand):
match strand:
case "+":
return ">"
case "-":
return "<"
case ">":
return "+"
case "<":
return "-"
case default:
return ""
def get_sequence_list_seg(list_segfeat_nb,i,feature,seg_seq):
del_sequence=""
for k in range(i,len(list_segfeat_nb)):
if k==len(list_segfeat_nb)-1:
del_sequence+=get_segment_sequence(seg_seq,list_segfeat_nb[k])[0:feature.pos_stop]
del_sequence+=get_segment_sequence(seg_seq,list_segfeat_nb[k])
return del_sequence
def get_segment_sequence(seg_seq,segment):
if (segment[0]==">") | (segment[0]=="+"):
return seg_seq[segment[1:]]
else:
return reverse_complement(seg_seq[segment[1:]])
def reverse_complement(sequence):
sequence_rc=""
for char in sequence:
sequence_rc+=complement(char)
return sequence_rc[::-1]
def complement(nucl):
match nucl:
case "A":
return "T"
case "C":
return "G"
case "G":
return "C"
case "T":
return "A"
return nucl
class Variation:
def __init__(self,feature_id,feature_type,chr,start_new,stop_new,inversion,size_diff):
self.feature_id=feature_id
self.feature_type=feature_type
self.chr=chr
self.start_new=start_new
self.stop_new=stop_new
self.inversion=str(inversion)
self.size_diff=str(size_diff)
self.size_new=str(self.stop_new-self.start_new+1)
self.type=''
self.last_seg_in_target=''
self.seg_ref=list()
self.seg_alt=list()
# ajouter fct pour écrire la ligne. return line.
#def __str__(self):
# return f"id={self.id}, position on the original genome={self.chr}:{self.start}-{self.stop}, size={self.size}, features={self.features}"

nina.marthe_ird.fr
committed
def create_var(feature_id,first_seg,last_seg,paths,target_genome_name):
feature=Features[feature_id]
start_new_genome=get_feature_start_on_genome(first_seg,feature_id)
stop_new_genome=get_feature_stop_on_genome(last_seg,feature_id)
size_new_genome=int(stop_new_genome)-int(start_new_genome)+1
size_diff=str(size_new_genome-feature.size)
# get feature paths on the original genome and on the target genome

nina.marthe_ird.fr
committed
list_segfeat_azu=get_feature_path(paths,first_seg,last_seg,target_genome_name)
list_segfeat_nb=feature.segments_list
[list_segfeat_nb,list_segfeat_azu,inversion]=detect_gene_inversion(list_segfeat_nb,list_segfeat_azu)
variation=Variation(feature_id,feature.type,feature.chr,start_new_genome,stop_new_genome,inversion,size_diff)
return(variation,list_segfeat_nb,list_segfeat_azu)
def reset_var(variation):
variation.type='' # type énuméré.
variation.size_var=0
variation.start_var=''
variation.ref=''
variation.alt=''
def get_old_new_pos_substitution(feat_start,variation,list_segfeat_azu,feat):
pos_old=str(int(Segments[variation.start_var].start)-int(feat_start)+1)
start_feat=get_feature_start_on_genome(list_segfeat_azu[0][1:],feat)
start_var=int(segments_on_target_genome[variation.start_on_target][1])
pos_new=str(start_var-start_feat+1)
return [pos_old,pos_new]
def get_old_new_pos_insertion(variation,feat_start,list_segfeat_azu,feat):
pos_old=str(int(Segments[variation.start_var].start)-int(feat_start)+1)
start_feat=get_feature_start_on_genome(list_segfeat_azu[0][1:],feat)
start_var=int(segments_on_target_genome[variation.start_var][1])-1
pos_new=str(start_var-start_feat+1)
return [pos_old,pos_new]
def get_old_new_pos_deletion(variation,feat_start,list_segfeat_azu,feat,i):
pos_old=int(Segments[variation.start_var].start)-int(feat_start)+Features[feat].pos_start+1
else:
pos_old=int(Segments[variation.start_var].start)-int(feat_start)+1
if pos_old<1:
pos_old=1
if variation.last_seg_in_target=="": # deletion of the beggining of the feature, so no segment placed in the new genome yet.
start_feat=get_feature_start_on_genome(list_segfeat_azu[0][1:],feat)
start_var=int(segments_on_target_genome[variation.last_seg_in_target][2])+1
pos_new=str(start_var-start_feat+1)
return [pos_old,pos_new]
def init_new_var(variation,type,list_segfeat_nb,list_segfeat_azu,i,j,seg_seq,feature):
variation.type=type
variation.start_var=list_segfeat_nb[i][1:]
if type=="substitution":
variation.start_on_target=list_segfeat_azu[j][1:]
variation.ref=get_segment_sequence(seg_seq,list_segfeat_nb[i])
variation.alt=get_segment_sequence(seg_seq,list_segfeat_azu[j])
variation.seg_ref.append(list_segfeat_nb[i])
variation.seg_alt.append(list_segfeat_azu[j])
elif type=="insertion":
variation.ref="-"
variation.alt=get_segment_sequence(seg_seq,list_segfeat_azu[j])
variation.seg_alt.append(list_segfeat_azu[j])
elif type=="deletion":
if i==0: # if the deletion is at the start of the feature, the deletion doesnt always start at the start at the first segment :
#use pos_start, position of the feature on its first segment
variation.ref=get_segment_sequence(seg_seq,list_segfeat_nb[i])[feature.pos_start-1:]
variation.seg_ref.append(list_segfeat_nb[i])
else: # else, the deletion will always start at the start of the first segment.
variation.ref=get_segment_sequence(seg_seq,list_segfeat_nb[i])
variation.seg_ref.append(list_segfeat_nb[i])
variation.alt="-"
def continue_var(variation,seg_seq,list_segfeat_nb,list_segfeat_azu,i,j,genome_to_continue):
if variation.type=="substitution":
if genome_to_continue==0: # genome_to_continue allows to choose if the substitution continues for the original or the target genome, or both.
variation.ref+=get_segment_sequence(seg_seq,list_segfeat_nb[i])
variation.alt+=get_segment_sequence(seg_seq,list_segfeat_azu[j])
variation.seg_ref.append(list_segfeat_nb[i])
variation.seg_alt.append(list_segfeat_azu[j])
elif genome_to_continue==1: # deletion
variation.ref+=get_segment_sequence(seg_seq,list_segfeat_nb[i])
variation.seg_ref.append(list_segfeat_nb[i])
elif genome_to_continue==2: # insertion
variation.alt+=get_segment_sequence(seg_seq,list_segfeat_azu[j])
variation.seg_alt.append(list_segfeat_azu[j])
elif variation.type=="insertion":
variation.alt+=get_segment_sequence(seg_seq,list_segfeat_azu[j])
variation.seg_alt.append(list_segfeat_azu[j])
elif variation.type=="deletion":
variation.ref+=get_segment_sequence(seg_seq,list_segfeat_nb[i])
variation.seg_ref.append(list_segfeat_nb[i])
def get_common_segments(list1,list2):
list_output=[]
for elem in list1:
if elem in list2:
list_output.append(elem)
return list_output
def detect_segment_order_inversion(list_1,list_2):
if (len(list_1)==1) | (len(list_2)==1):
return False
[cpt,i]=[0,0]
list_1_common=get_common_segments(list_1,list_2)
list_2_common=get_common_segments(list_2,list_1)
list_2_common_reversed=list(reversed(list_2_common))
while i<len(list_1_common):
if list_2_common_reversed[i]==list_1_common[i]:
return (cpt>len(list_1_common)*0.9) # if more than 90% of the segments are on the same position when the lists are reversed, there is an inversion.
def detect_orient_inversion(list_1,list_2):
list_1_unstrand=[segment_stranded[1:] for segment_stranded in list_1]
list_2_unstrand=[segment_stranded[1:] for segment_stranded in list_2]
[seg_common,same_strand_count]=compare_strand(list_1,list_2,list_1_unstrand,list_2_unstrand)
if same_strand_count>=len(seg_common)*0.9: # if more than 90% of segments shared have the same strand, no inversion
strand_inversion=False
strand_inversion=True
return [strand_inversion,list_1_unstrand,list_2_unstrand]
# takes two lists of segments for two genes, check if the first list is an inversion of the second one (if the segments in common are on the opposite strand)
def detect_gene_inversion(list_1,list_2):
# check if we have an inversion of the orientation of the segments
[strand_inversion,list_1_unstrand,list_2_unstrand]=detect_orient_inversion(list_1,list_2)
# check if we have an inversion of the order of the segments
segment_order_inversion=detect_segment_order_inversion(list_1_unstrand,list_2_unstrand)
# if there we have both inversions, the gene is in an inverted region. reverse the second list for the comparison.
if segment_order_inversion & strand_inversion:
inversion=1
list_2=invert_segment_list(list_2)
else :
inversion=0
return [list_1,list_2,inversion]
def invert_segment_list(seg_list):
list_inverted=list()
for seg in seg_list:
if seg[0]==">":
inv_seg="<"+seg[1:]
elif seg[0]=="<":
inv_seg=">"+seg[1:]
else:
inv_seg=seg
list_inverted.append(inv_seg)
return list(reversed(list_inverted))
# pour faire l'inférence, lire le fichier variations.txt et l'interpréter. pour chaque variation, regarder ce que ça fait.
# donc première étape quand on appelle la fct "inférence", c'est d'ajouter dans les features leur séquence avec seg_seq et la liste des segments.
def add_feature_sequence(feature,seg_seq):
feature_sequence=""
for segment in feature.segments_list:
if segment==feature.segments_list[0]:
feature_sequence+=get_segment_sequence(seg_seq,segment)[feature.pos_start-1:] # revérifier les +/- 1 pour la position, avec de vraies données
elif segment==feature.segments_list[-1]:
feature_sequence+=get_segment_sequence(seg_seq,segment)[0:feature.pos_stop] # revérifier les +/- 1 pour la position, avec de vraies données
else:
feature_sequence+=get_segment_sequence(seg_seq,segment)
feature.sequence=feature_sequence
def transcription(dna_sequence):
rna_sequence=""
for dna_base in dna_sequence:
match dna_base:
case "A":
rna_sequence+="U"
case "C":
rna_sequence+="G"
case "G":
rna_sequence+="C"
case "T":
rna_sequence+="A"
return rna_sequence
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
# penser à transcrire la séquence codante du gène !!
def get_aa(codon):
match codon[0:2]:
case "UU":
if (codon[2]=="U") | (codon[2]=="C"):
return "Phe"
else:
return "Leu"
case "UC":
return "Ser"
case "UA":
if (codon[2]=="U") | (codon[2]=="C"):
return "Tyr"
else:
return "Stop"
case "UG":
if (codon[2]=="U") | (codon[2]=="C"):
return "Cys"
elif codon[2]=="A":
return "Stop"
else:
return "Trp"
case "CU":
return "Leu"
case "CC":
return "Pro"
case "CA":
if (codon[2]=="U") | (codon[2]=="C"):
return "His"
else:
return "Gln"
case "CG":
return "Arg"
case "AU":
if codon[2]=="G":
return "Met"
else:
return "Ile"
case "AC":
return "Thr"
case "AA":
if (codon[2]=="U") | (codon[2]=="C"):
return "Asn"
else:
return "Lys"
case "AG":
if (codon[2]=="U") | (codon[2]=="C"):
return "Ser"
else:
return "Arg"
case "GU":
return "Val"
case "GC":
return "Ala"
case "GA":
if (codon[2]=="U") | (codon[2]=="C"):
return "Asp"
else:
return "Glu"
case "GG":
return "Gly"