Skip to content
Snippets Groups Projects
Commit b9db5b03 authored by nina.marthe_ird.fr's avatar nina.marthe_ird.fr
Browse files

continued code refactoring. removed global variable segments_on_target_genome,...

continued code refactoring. removed global variable segments_on_target_genome, sorted functions in several files
parent 62230c60
No related branches found
No related tags found
No related merge requests found
......@@ -86,6 +86,14 @@ class Feature:
self.first_part='' # if first=False, string, the id of the first part encountered
self.other_parts_list=[] # if first=True, list ot the id of the other parts encountered
# returns a list of feature's child, and their childs' childs, etc.
def get_child_list(self,Features):
list_childs=[]
for child_id in self.childs:
list_childs.append(child_id) # add the child to the list
child=Features[child_id]
list_childs+=child.get_child_list(Features) # add the child's childs to the list
return list_childs
def __str__(self):
if self.parent=="":
......
This diff is collapsed.
This diff is collapsed.
from .intersect import Features
from .usefull_little_functions import get_segment_sequence
from .detect_inversion import detect_feature_inversion,invert_segment_list
# functions to get the alignment for the transfered genes
# get the alignment between the features on the source genome and the features on the target genome
def print_alignment(feat,match,seg_seq,file_out_aln):
if match[0]!='':
feature_target_path=match[2]
if len(feature_target_path)>0: # if the feature is not completely absent
feature_path_source_genome=Features[feat].segments_list_source
inversion=detect_feature_inversion(feature_path_source_genome,feature_target_path)
if inversion:
feature_target_path=invert_segment_list(feature_target_path)
line_aln=create_line_aln(feature_path_source_genome,feature_target_path,seg_seq,feat)
file_out_aln.write(line_aln)
# creates an alignment for two segments
def segment_aln(type,seg_seq,seg_a,seg_b,first,feature_id,last):
match type:
case "identity":
if first:
feature=Features[feature_id]
seq_aln=get_segment_sequence(seg_seq,seg_a)[feature.pos_start-1:]
elif last:
feature=Features[feature_id]
seq_aln=get_segment_sequence(seg_seq,seg_a)[:feature.pos_stop]
else:
seq_aln=get_segment_sequence(seg_seq,seg_a)
line_a=seq_aln
line_b=seq_aln
len_aln=len(seq_aln)
line_c=len_aln*"*"
case "substitution":
seq_aln_a=get_segment_sequence(seg_seq,seg_a)
seq_aln_b=get_segment_sequence(seg_seq,seg_b)
len_a=len(seq_aln_a)
len_b=len(seq_aln_b)
if len_a>len_b:
diff_len=len_a-len_b
line_a=seq_aln_a
line_b=seq_aln_b+diff_len*"-"
line_c=len_a*" "
else:
diff_len=len_b-len_a
line_a=seq_aln_a+diff_len*"-"
line_b=seq_aln_b
line_c=len_b*" "
case "insertion":
seq_aln_b=get_segment_sequence(seg_seq,seg_b)
len_b=len(seq_aln_b)
line_a=len_b*"-"
line_b=seq_aln_b
line_c=len_b*" "
case "deletion":
if first:
feature=Features[feature_id]
seq_aln_a=get_segment_sequence(seg_seq,seg_a)[feature.pos_start-1:]
else:
seq_aln_a=get_segment_sequence(seg_seq,seg_a)
len_a=len(seq_aln_a)
line_a=seq_aln_a
line_b=len_a*"-"
line_c=len_a*" "
case "end_deletion":
seq_aln_a=""
for segment in seg_a[:-1]:
seq_aln_a+=get_segment_sequence(seg_seq,segment)
feature=Features[feature_id]
seq_aln_a+=get_segment_sequence(seg_seq,seg_a[-1])[0:feature.pos_stop] # for the last segment, only take the part that the feature is on
len_a=len(seq_aln_a)
line_a=seq_aln_a
line_b=len_a*"-"
line_c=len_a*" "
return [line_a,line_b,line_c,False] # check the orientation of the segment later
# formats the alignment lines
def parse_aln_lines(line_a,line_b,line_c,feature_id):
if (len(line_a)!=len(line_b)) or (len(line_b)!=len(line_c)):
print("line lengths differ in alignment")
len_to_parse=len(line_a)
len_parsed=0
aln_line=""
nb_res_a=0
nb_res_b=0
while len_parsed<len_to_parse:
len_header=len(feature_id)+11
headers=[feature_id+"_source ",feature_id+"_target ",len_header*" "]
add_a=line_a[len_parsed:len_parsed+60]
add_b=line_b[len_parsed:len_parsed+60]
add_c=line_c[len_parsed:len_parsed+60]
nb_res_a+=len(add_a)-add_a.count("-")
nb_res_b+=len(add_b)-add_b.count("-")
aln_line+=f'{headers[0]}{add_a} {nb_res_a}\n'
aln_line+=f'{headers[1]}{add_b} {nb_res_b}\n'
aln_line+=f'{headers[2]}{add_c}\n\n'
len_parsed+=60
aln_line+="\n"
return aln_line
# creates the alignment for a feature
def create_line_aln(feature_path_source_genome,feature_path_target_genome,seg_seq,feature_id):
line_a=""
line_b=""
line_c=""
[i,j]=[0,0]
first=True # when writing the first part of the feature, dont take the whole segment, only the part that the feature is on
last=False # same for the last part of the feature
while (i<len(feature_path_source_genome)) and (j<len(feature_path_target_genome)):
if i==len(feature_path_source_genome)-1:
last=True
if feature_path_source_genome[i] != feature_path_target_genome[j]: # if there is a difference between the two paths
if feature_path_target_genome[j] not in feature_path_source_genome: # if the segment in target genome is absent in source genome
if feature_path_source_genome[i] not in feature_path_target_genome: # if the segment in source genome is absent is target genome : substitution
[add_a,add_b,add_c,first]=segment_aln("substitution",seg_seq,feature_path_source_genome[i],feature_path_target_genome[j],first,feature_id,last)
line_a+=add_a;line_b+=add_b;line_c+=add_c
i+=1;j+=1
else: # target genome segment not in source_genome, but source_genome segment in target genome : insertion
[add_a,add_b,add_c,first]=segment_aln("insertion",seg_seq,"",feature_path_target_genome[j],first,feature_id,last)
line_a+=add_a;line_b+=add_b;line_c+=add_c
j+=1
elif feature_path_source_genome[i] not in feature_path_target_genome: # source_genome segment not in target genome, but target genome segment in source_genome : deletion
[add_a,add_b,add_c,first]=segment_aln("deletion",seg_seq,feature_path_source_genome[i],"",first,feature_id,last)
line_a+=add_a;line_b+=add_b;line_c+=add_c
i+=1
else : # if both segments are present in the other genome but not at the same position. weird case never found yet
[add_a,add_b,add_c,first]=segment_aln("substitution",seg_seq,feature_path_source_genome[i],feature_path_target_genome[j],first,feature_id,last)
line_a+=add_a;line_b+=add_b;line_c+=add_c
i+=1;j+=1
else: # segment present in both, no variation.
[add_a,add_b,add_c,first]=segment_aln("identity",seg_seq,feature_path_source_genome[i],feature_path_target_genome[j],first,feature_id,last)
line_a+=add_a;line_b+=add_b;line_c+=add_c
i+=1;j+=1
if i<=len(feature_path_source_genome)-1: # if we didn't reach the length of the segment list for the first genome, the end is missing for the second genome
[add_a,add_b,add_c,first]=segment_aln("end_deletion",seg_seq,feature_path_source_genome[i:],"",first,feature_id,last)
line_a+=add_a;line_b+=add_b;line_c+=add_c
return parse_aln_lines(line_a,line_b,line_c,feature_id)
from .intersect import invert_seg
# find all the copies of the segments from the source list in the target genome
def find_all_seg(list_seg_source,walk_name,segments_on_target_genome):
index=0
list_seg_target=[] # contains list of info for each seg : [seg_id,seg_strand,start_pos,index_on_source_walk]
list_seg_source_unstranded=[] # contains the path in the source genome, but with the strand separated from the segment_id : [s24,>]
for seg in list_seg_source:
list_seg_source_unstranded.append([seg[1:],seg[0]]) # seg_id,seg_strand : [s24,>]
seg_inverted=invert_seg(seg)
# look for all the segment copies in the target genome walk, in both orientations
if (seg in segments_on_target_genome) and (walk_name in segments_on_target_genome[seg]): # if the segment is in the target genome on the right walk (chr,ctg)
for copy in segments_on_target_genome[seg][walk_name]: # take all its copies
seg_info=[seg[1:],seg[0],int(copy[1]),index] # [s24,>,584425,4]
list_seg_target.append(seg_info)
if (seg_inverted in segments_on_target_genome) and (walk_name in segments_on_target_genome[seg_inverted]) : # same but with the segment inverted
for copy in segments_on_target_genome[seg_inverted][walk_name]:
seg_info=[seg_inverted[1:],seg_inverted[0],int(copy[1]),index]
list_seg_target.append(seg_info)
index+=1
list_seg_target.sort(key=sort_seg_info) # order the list of segments by start position
return [list_seg_target,list_seg_source_unstranded]
def find_gene_copies(list_seg_source,walk_name,feature_id,segments_on_target_genome):
# find all copies of all segments from the gene in the target genome (in both orientations)
[list_seg_target,list_seg_source_unstranded]=find_all_seg(list_seg_source,walk_name,segments_on_target_genome)
# find each copy of the gene in the ordered list of segments
[first_segs_list,last_segs_list]=detect_copies(list_seg_target,list_seg_source_unstranded,walk_name,feature_id,segments_on_target_genome)
# join the first and last seg lists
first_last_segs_list=[]
index=0
for first_seg in first_segs_list:
last_seg=last_segs_list[index]
pair=(first_seg,last_seg)
first_last_segs_list.append(pair)
index+=1
return first_last_segs_list # return a list of pairs (first_seg,last_seg)
# called by find_gene_copies
def detect_copies(list_seg_target,list_seg_source_unstranded,walk_name,feature_id,segments_on_target_genome):
# prepare the variables for the first iteration of the for loop
[old_id,old_strand,old_start,old_index]=list_seg_target[0]
[first_segs_list,last_segs_list]=[[],[]]
old_seg_id=old_strand+old_id
first_segs_list.append(old_seg_id)
copy_number=1
copy_id="copy_"+str(copy_number)
feat_copy=(feature_id,copy_id)
for segment in segments_on_target_genome[old_seg_id][walk_name]: # look for the first seg to add the occurence info
if segment[1]==old_start:
copy_id="copy_"+str(copy_number)
feat_copy=(feature_id,copy_id)
segment.append(feat_copy)
break
# adjust old_index for the first iteration of the loop
first_inversion=(old_strand!=list_seg_source_unstranded[old_index][1])
if first_inversion:
old_index+=1
else:
old_index-=1
for seg in list_seg_target: # find and annotate (with feat_copy) all the first and last segments of the feature copies
[new_id,new_strand,new_start,new_index]=seg
new_seg_id=new_strand+new_id
inversion=(seg[1]!=list_seg_source_unstranded[new_index][1]) # inversion if this segment's strand is not the same as in the source walk
if inversion :
if not((old_strand==new_strand) and (old_index>new_index)): # not (if the index decreases and the strand stays the same, it is the same gene copy)
last_segs_list.append(old_seg_id)
add_occurence_info_seg(old_seg_id,walk_name,old_start,feat_copy,segments_on_target_genome) # add info for the last seg of the previous copy
copy_number+=1
copy_id="copy_"+str(copy_number)
feat_copy=(feature_id,copy_id) # new feature copy, change the info
first_segs_list.append(new_seg_id)
add_occurence_info_seg(new_seg_id,walk_name,new_start,feat_copy,segments_on_target_genome) # add info for the first seg of the new copy
else:
if not((old_strand==new_strand) and (old_index<new_index)): # not (if the index increases and the strand stays the same, it is the same gene copy)
last_segs_list.append(old_seg_id)
add_occurence_info_seg(old_seg_id,walk_name,old_start,feat_copy,segments_on_target_genome) # add info for the last seg of the previous copy
copy_number+=1
copy_id="copy_"+str(copy_number)
feat_copy=(feature_id,copy_id) # new feature copy, change the info
first_segs_list.append(new_seg_id)
add_occurence_info_seg(new_seg_id,walk_name,new_start,feat_copy,segments_on_target_genome) # add info for the first seg of the new copy
[old_strand,old_index,old_seg_id,old_start]=[new_strand,new_index,new_seg_id,new_start]
# add the last seg info
last_segs_list.append(old_seg_id)
add_occurence_info_seg(old_seg_id,walk_name,new_start,feat_copy,segments_on_target_genome) # add info for the last seg of the last copy
return [first_segs_list,last_segs_list]
def add_occurence_info_seg(target_seg_id,walk_name,target_start,feat_copy,segments_on_target_genome):
for segment in segments_on_target_genome[target_seg_id][walk_name]: # look for the right occurence of the segment
if segment[1]==target_start:
segment.append(feat_copy) # add seg info
break
def sort_seg_info(seg_info):
return seg_info[2]
# look for the segment on either strand of the target genome
def search_seg_on_target_genome(segment,segments_on_target_genome):
inverted_segment=invert_seg(segment)
if segment in segments_on_target_genome:
#if inverted_segment in segments_on_target_genome:
# print(segment," found in both orientations")
return segment
elif inverted_segment in segments_on_target_genome:
#print("inverted seg found *****")
return inverted_segment
else:
return False
# look for a segment on a walk, in either orientations
def search_seg_on_walk(segment,walk,segments_on_target_genome): # for now just print the first found, look for several later...
inverted_segment=invert_seg(segment)
if segment in segments_on_target_genome:
if walk in segments_on_target_genome[segment]:
return segment
elif inverted_segment in segments_on_target_genome:
if walk in segments_on_target_genome[inverted_segment]:
return inverted_segment
else:
return False
# get the first and last segment of the list that is in the target genome (possibly several pairs)
def get_first_last_seg(list_seg,segments_on_target_genome):
list_first_last_segs=[]
[first_seg_found,last_seg_found,walk_found]=['','','']
list_walks=get_walks_feature_cross(list_seg,segments_on_target_genome) # get all the walks where there is a segment of the feature
for walk in list_walks: # find the first and last seg for each walk
for segment in list_seg: # look for first_seg
seg_found=search_seg_on_walk(segment,walk,segments_on_target_genome)
if seg_found:
first_seg_found=seg_found
break
if first_seg_found!='': # look for last_seg
for segment in reversed(list_seg):
last_seg_found=search_seg_on_walk(segment,walk,segments_on_target_genome)
if last_seg_found:
walk_found=walk
break
list_first_last_segs.append([first_seg_found,last_seg_found,walk_found])
[first_seg_found,last_seg_found,walk_found]=['','','']
# return all the match
return list_first_last_segs
# functions to get the detail of the variations in the features
# find all the walks that contain a segment of the feature (list_seg is the walk of the feature on the source genome)
def get_walks_feature_cross(list_seg,segments_on_target_genome):
list_walks=list()
for segment in list_seg:
seg_found=search_seg_on_target_genome(segment,segments_on_target_genome)
if seg_found: # if the segment or the reverse complement is on the target genome
for walk in segments_on_target_genome[seg_found]:
if walk not in list_walks:
list_walks.append(walk)
return list_walks
\ No newline at end of file
from .intersect import invert_seg
# functions to detect inversions
# gives the list of segments from dict1 that are in dict2
def get_common_segments(dict1,dict2):
list_common=[]
for segment in dict1:
if segment in dict2:
list_common.append(segment)
return list_common
# check if common segments in the two dict have the same strand
def compare_strand(dict1,dict2): # dict1 and dict2 : [seg_id]->[seg_strand]
seg_common=get_common_segments(dict1,dict2)
# for each segment in common, check if the strand is the same
same_strand_count=0
for segment in seg_common:
if dict1[segment]==dict2[segment]:
same_strand_count+=1
return [len(seg_common),same_strand_count]
# check if the two dict have their segments in the inverted order
def detect_segment_order_inversion(dict1,dict2):
list_1_common=get_common_segments(dict1,dict2)
list_2_common=get_common_segments(dict2,dict1) # same segments, different orders
list_2_common_reversed=list(reversed(list_2_common))
[cpt,i]=[0,0]
while i<len(list_1_common):
if list_2_common_reversed[i]==list_1_common[i]:
cpt+=1
i+=1
return (cpt>len(list_1_common)*0.9) # if more than 90% of the segments are on the same position when the lists are reversed, there is an inversion.
# check if the two dict have the same segments but in different orientation
def detect_orient_inversion(dict1,dict2):
[seg_common_count,same_strand_count]=compare_strand(dict1,dict2)
if same_strand_count>=seg_common_count*0.9: # if more than 90% of segments shared have the same strand, no inversion
return [False,dict1,dict2]
else:
return [True,dict1,dict2]
# takes two lists of segments for two genes, check if the first list is an inversion of the second one (if the segments in common are on the opposite strand)
def detect_feature_inversion(list_1,list_2):
# convert list into dict with unstranded segment id as key and strand as value
strand1=[seg[0] for seg in list_1]
id1=[seg[1:] for seg in list_1]
dict1 = {id1[i]: strand1[i] for i in range(len(strand1))}
strand2=[seg[0] for seg in list_2]
id2=[seg[1:] for seg in list_2]
dict2 = {id2[i]: strand2[i] for i in range(len(strand2))}
# check if we have an inversion of the orientation of the segments
[strand_inversion,dict1,dict2]=detect_orient_inversion(dict1,dict2)
# check if we have an inversion of the order of the segments
segment_order_inversion=detect_segment_order_inversion(dict1,dict2)
# if there we have both inversions, the gene is in an inverted region
if segment_order_inversion and strand_inversion:
return True
else :
return False
# invert all the segments in a list (change the orientation)
def invert_segment_list(seg_list):
list_inverted=list()
for seg in seg_list:
list_inverted.append(invert_seg(seg))
return list(reversed(list_inverted))
from .usefull_little_functions import *
from .intersect import invert_seg,Features
from .detect_inversion import detect_feature_inversion
from .Functions import get_id_cov
# outputs the gff line of the target genome for a feature occurence (a feature can be transfered at several positions)
def generate_target_gff(feature_id,seg_size,args,reason_features_not_transfered,match,file_out_gff,segments_on_target_genome):
feature=Features[feature_id]
# get list of the segments where it is and the first and last segment of the feature on the new genome
[first_seg,last_seg,walk,copy_id,feature_target_path]=get_featurePath_ends(match)
inversion=detect_feature_inversion(feature.segments_list_source,feature_target_path)
if first_seg!='': # feature present on the target genome
if inversion:
size_on_new_genome=get_feature_start_on_target_genome_inv(last_seg,feature_id,walk,copy_id,segments_on_target_genome)-get_feature_stop_on_target_genome_inv(first_seg,feature_id,walk,copy_id,segments_on_target_genome)+1
else:
size_on_new_genome=get_feature_stop_on_target_genome(last_seg,feature_id,walk,copy_id,segments_on_target_genome)-get_feature_start_on_target_genome(first_seg,feature_id,walk,copy_id,segments_on_target_genome)+1
size_diff=abs(size_on_new_genome-feature.size)
[cov,id]=match[3]
if (cov*100<args.coverage) or (id*100<args.identity):
reason_features_not_transfered[1]+=1
match.append(False) # store the information that this gene copy didnt pass the filters
return "no"
else:
line_gff=create_line_target_gff(first_seg,last_seg,feature_id,size_diff,inversion,walk,cov,id,copy_id,segments_on_target_genome) # transfer the gene
file_out_gff.write(line_gff)
# transfer all the gene's childs
transfer_childs(feature_id,feature_target_path,walk,copy_id,inversion,seg_size,file_out_gff,segments_on_target_genome)
match.append(True) # store the information that this gene copy was transfered
return size_diff
else :
reason_features_not_transfered[0]+=1
match.append(False) # store the information that this gene copy didnt pass the filters
return "no"
def transfer_childs(feature_id,feature_target_path,walk,copy_id,inversion,seg_size,file_out_gff,segments_on_target_genome):
childs_list=Features[feature_id].get_child_list(Features)
for child_id in childs_list:
child=Features[child_id]
# look for the first and last seg of the child in its parent path
[child_first_seg,child_last_seg]=['','']
for seg in child.segments_list_source: # get first seg in the parent path
if seg in feature_target_path: # parent path
child_first_seg=seg
break
elif invert_seg(seg) in feature_target_path:
child_first_seg=invert_seg(seg)
break
if child_first_seg!='': # check that the child feature is not absent in this occurence of the parent feature
for seg in reversed(child.segments_list_source): # get last seg in the parent path
if seg in feature_target_path: # parent path
child_last_seg=seg
break
elif invert_seg(seg) in feature_target_path:
child_last_seg=invert_seg(seg)
break
if inversion: # calculate the child feature size
child_size_on_new_genome=get_feature_start_on_target_genome_inv(child_last_seg,child_id,walk,copy_id,segments_on_target_genome)-get_feature_stop_on_target_genome_inv(child_first_seg,child_id,walk,copy_id,segments_on_target_genome)+1
else:
child_size_on_new_genome=get_feature_stop_on_target_genome(child_last_seg,child_id,walk,copy_id,segments_on_target_genome)-get_feature_start_on_target_genome(child_first_seg,child_id,walk,copy_id,segments_on_target_genome)+1
child_size_diff=abs(child_size_on_new_genome-child.size)
for match in child.segments_list_target: # look for the right occurence of the child feature
if (match[0]==walk) and (match[1]==copy_id):
seg_list=match[2]
[cov,id]=get_id_cov(child_id,seg_size,seg_list)
break
if inversion:
temp=child_first_seg
child_first_seg=child_last_seg
child_last_seg=temp
# transfer the child feature:
line_gff=create_line_target_gff(child_first_seg,child_last_seg,child_id,child_size_diff,inversion,walk,cov,id,copy_id,segments_on_target_genome)
file_out_gff.write(line_gff)
# generates the line for the gff of the target genome
def create_line_target_gff(first_seg,last_seg,feature_id,size_diff,inversion,walk,cov,id,copy_id,segments_on_target_genome):
[chr,strand,feature]=[get_seg_occ(first_seg,walk,feature_id,copy_id,segments_on_target_genome)[0],Features[feature_id].strand,Features[feature_id]]
annotation=f'{feature.annot};Size_diff={size_diff};coverage={cov};sequence_ID={id}' # Nb_variants={var_count};
if inversion:
start=get_feature_start_on_target_genome_inv(last_seg,feature_id,walk,copy_id,segments_on_target_genome)
stop=get_feature_stop_on_target_genome_inv(first_seg,feature_id,walk,copy_id,segments_on_target_genome)
strand=invert_strand(strand)
else:
start=get_feature_start_on_target_genome(first_seg,feature_id,walk,copy_id,segments_on_target_genome)
stop=get_feature_stop_on_target_genome(last_seg,feature_id,walk,copy_id,segments_on_target_genome)
if start>stop:
temp=start
start=stop
stop=temp
if strand=='':
strand='.'
output_line=f'{chr}\tGrAnnoT\t{feature.type}\t{start}\t{stop}\t.\t{strand}\t.\t{annotation}\n'
return output_line
\ No newline at end of file
from .load_intersect import Segments,Features,find_part_segment_source
from .intersect import Segments,Features,find_part_segment_source
from .usefull_little_functions import *
from tqdm import tqdm
......
from .ClassSegFeat import Segment,Feature
from tqdm import tqdm
import random
import string
import subprocess
import sys
def run_intersect(args):
print("Computing the intersection between the annotations and the graph segments")
seg_coord_path=args.segment_coordinates_path
if args.haplotype:
if args.source_haplotype=='.':
files=list(seg_coord_path.glob(f"*{args.source_genome}*.bed")) # get all bed files from the source genome in seg_coord
# select haplotype
source_haplotypes=[]
for file in files:
file_haplotype=file.name.split("#")[1]
source_haplotypes.append(file_haplotype)
first_source_haplo=min(source_haplotypes)
args.source_haplotype=first_source_haplo
haplo=f"#{first_source_haplo}#"
print('\033[35m'+f'Warning : No source haplotype was given with \'-sh\'. Haplotype {first_source_haplo} will be used.'+'\033[0m')
files=list(seg_coord_path.glob(f"*{args.source_genome}*{haplo}*.bed")) # get all bed files from the source genome haplotype in seg_coord
message=f'No bed files corresponding to the source genome haplotype {first_source_haplo} found in {seg_coord_path}'
else:
haplo = f"#{args.source_haplotype}#"
files=list(seg_coord_path.glob(f"*{args.source_genome}*{haplo}*.bed")) # get all bed files from the source genome in seg_coord
message=f'No bed files corresponding to the source genome haplotype found in {seg_coord_path}'
if len(files)==0:
sys.exit(message)
if args.verbose:
print(f' Found {len(files)} paths corresponding to the source genome haplotype {args.source_haplotype}')
else:
files=list(seg_coord_path.glob(f"*{args.source_genome}*.bed")) # get all bed files from the source genome in seg_coord
message='\33[31m'+f'Error : No bed files corresponding to the source genome found in {seg_coord_path}. Please check that the source genome name is right.'+'\033[0m'
if len(files)==0:
sys.exit(message)
if args.verbose:
print(f' Found {len(files)} paths corresponding to the source genome')
intersect_path=args.outdir.joinpath("intersect")
command=f"echo -n '' > {intersect_path}" # empty the file "intersect"
subprocess.run(command,shell=True,timeout=None)
for file in files:
if args.verbose:
print(f' Building the intersection for the path {file.stem}')
command=f"bedtools intersect -nonamecheck -wo -a {file.as_posix()} -b {args.gff.as_posix()}>>{intersect_path}"
subprocess.run(command,shell=True,timeout=None)
intersect_lines = int(subprocess.Popen(f"wc -l {intersect_path}", shell=True, stdout=subprocess.PIPE).stdout.read().decode()[0])
if intersect_lines==0:
print('\33[31m'+"Error : No lines in the intersect. Please check that the sequence id in the GFF file (1st field) matches the sequence id in the GFA file (4th field of the W lines or 3rd part of the second field of the P lines)."+'\033[0m')
exit()
# create global variables to manipulate them in the functions below and to pass them to Functions.py
global Features
global Segments
Segments={}
Features={}
# functions to create the segments and the features
# get the feature's start position on the segment (among segments on the source genome)
def get_feature_start_on_segment(seg_id,feat_id):
s=Segments[search_segment(seg_id)]
f=Features[find_part_segment_source(s.id,feat_id)]
if s.get_start(f.id)>=f.start:
result=1
else:
result=f.start-s.get_start(f.id)+1
return result
# get the feature's stop position on the segment (among segments on the source genome)
def get_feature_stop_on_segment(seg_id,feat_id):
s=Segments[search_segment(seg_id)]
f=Features[find_part_segment_source(s.id,feat_id)]
if s.get_stop(f.id)<=f.stop:
result=s.get_size(f.id)
else:
result=f.stop-s.get_start(f.id)+1
return result
# get the inverted segment
def invert_seg(seg):
if seg[0]==">":
inv_seg="<"+seg[1:]
elif seg[0]=="<":
inv_seg=">"+seg[1:]
else:
print(seg," not invertable")
return seg
return inv_seg
# look for a segment in the dict Segments, in one orientation or the other
def search_segment(segment):
if segment not in Segments:
if invert_seg(segment) in Segments:
return invert_seg(segment)
return False
return segment
def find_part_segment_source(segment,feature_id):
first_feature=Features[feature_id]
if segment in first_feature.segments_list_source:
return feature_id
else:
list_parts=first_feature.other_parts_list
for part in list_parts:
if segment in Features[part].segments_list_source:
return part
# create a segment with its first feature
def init_seg(line,segment_id,feat_id,strand):
[chr,start,stop]=[line[0],int(line[1])+1,int(line[2])] # +1 in the start to convert the bed 0-based coordinate to a 1-based system
# create the segment, store it in the Segments dict
Segments[segment_id]=Segment(segment_id,feat_id,chr,start,stop,strand)
# create a feature with the first segment its on
def init_feature(line,feature_id,segment_oriented):
[type,annot,chr,start,stop,strand]=[line[6],line[12],line[4],int(line[7]),int(line[8]),line[10]]
if feature_id in Features: # if the feature has already been created (in order to store the child), get the child and rewrite it
childs=Features[feature_id].childs
else:
childs=[]
parent=get_parent(annot,feature_id)
# add the current segment to the list of segments that have the feature
segments_list=[segment_oriented]
# create the feature, store it in the dict Features
Features[feature_id]=Feature(feature_id,type,chr,start,stop,annot,childs,parent,segments_list,strand,True,False)
# if there is a parent, returns the id of the parent feature and add the child to the parent
def get_parent(annot,feature_id): # check how the parent info is stored in the gff format in general (this is a particular case...) todo
if (len(annot.split(";"))>1) and (annot.split(";")[1].split("=")[0]=="Parent"):
# for annotations that look like : ID=LOC_Os01g01010.1:exon_7;Parent=LOC_Os01g01010.1, where the parent is in the field 1
parent=annot.split(";")[1].split("=")[1].replace(".","_").replace(":","_")
add_child(parent,feature_id)
elif (len(annot.split(";"))>2) and (annot.split(";")[2].split("=")[0]=="Parent"):
# for annotations that look like : ID=LOC_Os01g01010.1;Name=LOC_Os01g01010.1;Parent=LOC_Os01g01010, where the parent is in the field 2
parent=annot.split(";")[2].split("=")[1].replace(".","_").replace(":","_")
add_child(parent,feature_id)
else: parent=""
return parent
# add a feature to an existing segment
def add_feature(line,seg,new_feat_id,new_strand):
segment=Segments[seg]
if not segment.find_feat(new_feat_id)[0]:
[chr,start,stop]=[line[0],int(line[1])+1,int(line[2])] # +1 in the start to convert the bed 0-based coordinate to a 1-based system
segment.add_feature(new_feat_id,chr,start,stop,new_strand)
# add a child feature to an existing feature
def add_child(feat,new_child):
if feat in Features: # if the feature exists, add new_child
Features[feat].childs.append(new_child)
else: # create the feature "empty", only with child
Features[feat]=Feature(feat,"","",0,0,"",[new_child],"",[],"",False,False)
# add a segment to an existing feature
def add_seg(feat_id,new_seg_oriented):
# if new_seg_oriented in Features[feat_id].segments_list_source:
# print("seg already in feat, duplicate segment ! feature:",feat_id,"segment:",new_seg_oriented)
# print("list for now :",Features[feat_id].segments_list_source)
Features[feat_id].segments_list_source.append(new_seg_oriented)
# add the position of the feature on its first and last segment
def add_pos(feature_id):
feature=Features[feature_id]
feature.pos_start=get_feature_start_on_segment(feature.segments_list_source[0],feature_id)
feature.pos_stop=get_feature_stop_on_segment(feature.segments_list_source[-1],feature_id)
# handle the Feature object creation
def load_feature_intersect(line,feature_id,seg_oriented):
if (feature_id not in Features) or (not Features[feature_id].complete): # if the feature doesn't exist or is not complete, create it or complete it
init_feature(line,feature_id,seg_oriented)
else: # if it exists, add the current segment to the list of segments that have the existing feature
# if the feature existing starts and stops at the same positions as the current feature, add_seg
current_start=int(line[7])
current_stop=int(line[8])
feature=Features[feature_id]
if (current_start==feature.start) and (current_stop==feature.stop):
add_seg(feature_id,seg_oriented)
else: # same feature_id, but not same part of the feature -> need to store them in two separate objects
add_segment_to_discontinuous_feature(line,feature,seg_oriented,current_start,current_stop)
def add_segment_to_discontinuous_feature(line,feature,seg_oriented,current_start,current_stop):
added=False
for other_part_id in feature.other_parts_list:
other_part=Features[other_part_id]
if (other_part.start==current_start) and (other_part.stop==current_stop): # look for the current part of the feature to add the segment
add_seg(other_part_id,seg_oriented)
added=True
break
if added==False : # if the right part of the feature was not found, create a new part for the feature
new_part_name=feature.id+''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(7))
[chr,start,stop]=[line[4],int(line[7]),int(line[8])]
segments_list=[seg_oriented]
Features[new_part_name]=Feature(feature.id,feature.type,chr,start,stop,feature.annot,feature.childs,feature.parent,segments_list,feature.strand,True,True)
new_part_feature=Features[new_part_name]
new_part_feature.first=False
new_part_feature.first_part=feature.id
feature.other_parts_list.append(new_part_name)
feature.discontinuous=True
if feature.parent in Features:
add_child(feature.parent,new_part_name)
# handle the Segment object creation
def load_segment_intersect(line,segment_id,feat_id,strand):
if segment_id not in Segments: # if the segment doesn't exist, create it and add the current feature to its feat list
init_seg(line,segment_id,feat_id,strand)
else: # if it exists, add the current feature to the list of features on the existing segment
add_feature(line,segment_id,feat_id,strand)
# create a note for the child features that do not have annotation.
def set_note(feature_id): # not universal, depends on the format of the input gff...
# the note contains information on the function of the feature and is used for statistics on hypothetical/putatives features.
# in the gff, the notes are only on the "gene" features. it's easier to have it for the childs than to check the parent's note (or the parent's parent).
feature=Features[feature_id]
note=""
parent_feat=find_parent(feature_id)
if parent_feat!='':
annot=Features[parent_feat].annot.split(';')
if (len(annot)>0) and (annot[-1].split('=')[0]=="Note"):
note=annot[-1].split('=')[1]
feature.note=note
# create all the Segment and Feature objects in the dictionnaries Segments and Features
def load_intersect(intersect_path,verbose):
if not verbose:
print("Loading the intersection information")
# open the file with the intersect between the segments and the gff
total_lines=0
if verbose:
total_lines = len(["" for line in open(intersect_path,"r")])
with open(intersect_path,"r") as intersect_file:
for line in tqdm(intersect_file, desc="Loading the intersection information", total=total_lines, unit=" line",disable=not verbose):
line=line.split()
# get the ids for the dictionnaries' keys
feature_id=line[12].split(';')[0].split("=")[1].replace(".","_").replace(":","_")
segment_id=line[3]
strand=line[10]
segment_oriented=line[3]
load_feature_intersect(line,feature_id,segment_oriented)
load_segment_intersect(line,segment_id,feature_id,strand)
# for all the features, add the position of the feature on its first and last segment, and the note.
# cant do it before because for that i need to have all the segments in the list segments_list for each feature.
for feat_id in Features:
if Features[feat_id].complete==False:
print('***',feat_id,Features[feat_id])
add_pos(feat_id)
set_note(feat_id)
# order the dictionnary
order_dict(Features)
# orders the features so that each features is always preceded by its parent feature (gene -> mrna -> exons, cds, utr). useful for the transfer later.
def order_dict(features_dict):
copy_Features=dict(features_dict) # stores the features in a copy
features_dict.clear()# empty dict, to refill it with the features in the right orcer
for feature_id in copy_Features:
add_feature_dict(feature_id,copy_Features,features_dict)
# add a feature to the ordered dictionnary
def add_feature_dict(feature_id,old_dict_features,new_dict_features):
if feature_id not in new_dict_features:
feature=old_dict_features[feature_id]
# check that the parent is present
if feature.parent!='': # recursively find the parent (most of the time a gene)
add_feature_dict(feature.parent,old_dict_features,new_dict_features)
# if the feature is discontinuous, check that the other parts are present :
if feature.discontinuous and feature.first:
for part_id in feature.other_parts_list:
add_feature_dict(feature.parent,old_dict_features,new_dict_features)
# once the parent is present and the other parts are added, add the feature
new_dict_features[feature_id]=feature
# find the gene parent of a feature
def find_parent(feature_id): # add a check if there is no parent !
feature=Features[feature_id]
if feature.parent=='': # no parent, usually a gene
return feature.id
else:
current=feature.parent
parent_found=False
while parent_found==False:
if Features[current].parent=='': # current doesnt have a parent
return current
else: # if we didn't find the parent, we go up to the current feature's parent until we find it
current=Features[current].parent
from tqdm import tqdm
# get the genome name that corresponds to the file_name (file_name if genome_name+walk_name). not universal..?
def get_genome_name(target_genomes,file_name):
for genome in target_genomes:
if genome in file_name:
return genome
return ""
# create a dictionnary with the length of the segments : s5->8
def get_segments_length(segments,verbose):
seg_len={}
total_lines=0
if verbose:
total_lines=len(["" for line in open(segments,"r")])
with open(segments,'r') as segments_file:
for line in tqdm(segments_file, desc="Computing the lengths of the segments", total=total_lines, unit=" segment", disable=not verbose):
line=line.split()
seg_id=line[1]
seg_len[seg_id]=len(line[2])
return seg_len
# generates a dictionnary that associaces the segments to their sequence : 5->AGGCTAA
def get_segments_sequence(segments_file,segments_list):
with open(segments_file,'r') as file_segments:
seg_seq={}
for line in file_segments:
line=line.split()
seg_id=line[1]
if seg_id in segments_list:
seg_seq[seg_id]=line[2]
return seg_seq
def get_target_genomes(args):
# get the target genomes if there is none specified
if len(args.target)==0:
walk_path=args.segment_coordinates_path.joinpath("walks.txt")
total_lines = len(["" for line in open(walk_path,"r")])
with open(walk_path,'r') as walks:
for line in tqdm(walks,desc="Fetching all the genomes in the graph for the transfer",total=total_lines,unit=" line",disable=not args.verbose):
genome_name=line.split()[1]
if (args.source_genome not in genome_name) and (genome_name not in args.target) and ("MINIGRAPH" not in genome_name):
args.target.append(genome_name)
if args.verbose:
print(f" Genomes found : {args.target}")
# get haplotypes for each target genome
if args.haplotype:
target_set=set()
for genome in args.target:
bed_files=list(args.segment_coordinates_path.glob(f"*{genome}*.bed"))
for haplotype in bed_files:
haplotype_split=haplotype.name.split("#")
target_set.add(haplotype_split[0]+"#"+haplotype_split[1])
args.target=list(target_set)
# generates a dictionnary that associates a walk_name to a list of segments : chr10->[>s1,>s2,>s4]
def get_paths(walks_file,target_genome,haplotype):
with open(walks_file,'r') as file_walks:
paths={}
for line in file_walks:
line=line.split()
if haplotype:
seq_name=line[1]+"#"+line[2]+"#"+line[3]
else:
seq_name=line[1]+'_'+line[3]
if target_genome in seq_name: # get the walk of the genome
path=line[6].split(',')[1:]
list_segments=[]
for segment in path:
list_segments.append(segment)
if seq_name in paths:
paths[seq_name].extend(list_segments)
else:
paths[seq_name]=list_segments
return paths
# appends a dictionnary that associates a segments with its position on all the walks it's on (start stop and index in the segment list)
def get_segments_positions_on_genome(pos_seg,segments_on_target_genome): # add to the dict the info about the segments.
with open(pos_seg,'r') as bed:
seg_count=0
file_name='.'.join(pos_seg.split('/')[-1].split('.')[0:-1]) # split by '.' to get the filename without the extention, then join by '.' in case there is a '.' in the filename
for line in bed:
line=line.split()
[seg,chrom,start,stop,strand,index]=[line[3],line[0],int(line[1])+1,int(line[2]),line[3][0:1],seg_count] # +1 in the start to convert the bed 0-based coordinate to a 1-based system
# check if segment present twice on the same walk ???
#segments_on_target_genome[seg]=[chrom,start,stop,strand,index,file_name]
if seg not in segments_on_target_genome:
segments_on_target_genome[seg]={} # dict of walks->segment_info; you get the info about the segment for each walk
if file_name not in segments_on_target_genome[seg]:
segments_on_target_genome[seg][file_name]=list()
segments_on_target_genome[seg][file_name].append([chrom,start,stop,strand,index])
seg_count+=1
......@@ -2,12 +2,12 @@
# licensed under MIT
import subprocess
from .load_intersect import load_intersect,Features
from .Functions_output import get_paths,transfer_on_target,get_segments_positions_on_genome
from .intersect import run_intersect,load_intersect,Features
from .argparser import read_args,arg
from .setup_transfer import get_genome_name,get_segments_length,get_target_genomes,run_intersect
from .load_gfa import *
#from inference import *
from .graph_annot import graph_gff,graph_gaf
from .setup_transfer import transfer_on_target
from pathlib import Path
def main():
......@@ -18,12 +18,11 @@ def main():
# intersect between gff and all the bed files from the source genome given by seg_coord
run_intersect(args)
# intersect is in the output directory
intersect=Path(args.outdir.joinpath("intersect")).resolve()
intersect=Path(args.outdir.joinpath("intersect")).resolve() # intersect is in the output directory
gfa=args.graph
load_intersect(intersect.as_posix(),args.verbose)
command=f"rm {intersect.as_posix()}"
command=f"rm {intersect.as_posix()}" # delete intersect file
subprocess.run(command,shell=True,timeout=None)
segments=args.segment_coordinates_path.joinpath("segments.txt")
......@@ -31,34 +30,29 @@ def main():
# outputs the gff and gaf of the graph
if args.graph_gff or args.graph_gaf:
print("\n")
if args.graph_gff:
out_graph_gff=args.outdir.joinpath(gfa.stem).as_posix()+".gff" # what if there is several suffixes and the last one isn't '.gfa' ?
graph_gff(out_graph_gff,args.verbose)
if args.graph_gaf:
seg_size=get_segments_length(segments,False)
out_graph_gaf=args.outdir.joinpath(gfa.stem).as_posix()+".gaf"
graph_gaf(out_graph_gaf,seg_size,args.verbose)
if args.graph_gff:
out_graph_gff=args.outdir.joinpath(gfa.stem).as_posix()+".gff" # todo : what if there is several suffixes and the last one isn't '.gfa' ?
graph_gff(out_graph_gff,args.verbose)
if args.graph_gaf:
seg_size=get_segments_length(segments,False)
out_graph_gaf=args.outdir.joinpath(gfa.stem).as_posix()+".gaf"
graph_gaf(out_graph_gaf,seg_size,args.verbose)
# if a transfer on a target genome is asked # what about pav ??
# todo : what about pav ??
if args.annotation or args.variation or args.alignment:
if args.verbose:
print('\n')
get_target_genomes(args)
# if a pav matrix is asked, create dictionnary to store the features, and for each feature the value for all the target genomes.
pav_dict={}
if args.pav_matrix:
pav_line=len(args.target)*[1]
pav_dict["gene_id"]=list(args.target)
for feat in Features.keys():
if Features[feat].type=='gene':
pav_dict[feat]=pav_line.copy()
# initialize the pav matrix
pav_dict=init_pav(args,Features)
# build a dictionnary with the segment sizes to compute the coverage and id
if not args.graph_gaf:
seg_size=get_segments_length(segments,args.verbose)
genome_index=0
for target_genome in args.target:
print(f'\n{target_genome} transfer :')
......@@ -80,16 +74,12 @@ def main():
if genome_name==target_genome :
if args.verbose:
print(f' Loading the segments coordinates for the path {file.stem}')
get_segments_positions_on_genome(file.as_posix())
# create output files names
out_target_gff=genome_dir.joinpath(f'{target_genome}.gff')
out_target_var=genome_dir.joinpath(f'{target_genome}_var.txt')
out_target_aln=genome_dir.joinpath(f'{target_genome}_aln.txt')
segments_on_target_genome={}
get_segments_positions_on_genome(file.as_posix(),segments_on_target_genome)
list_feat_absent=[]
# do the annotation transfer (or var/aln)
transfer_on_target(segments,out_target_gff,out_target_var,out_target_aln,target_genome,target_genome_paths,list_feat_absent,seg_size,args)
transfer_on_target(segments,genome_dir,target_genome,target_genome_paths,list_feat_absent,seg_size,args,segments_on_target_genome)
# if pav matrix is asked, add the information of this transfer on the matrix
if args.pav_matrix:
for feat in list_feat_absent:
......@@ -97,16 +87,31 @@ def main():
pav_dict[feat][genome_index]=0
genome_index+=1
# print the pav matrix
if args.pav_matrix:
print('\nGeneration of the presence-absence matrix for the transfered genes')
pav_output=''
for line in pav_dict:
pav_output+=line
for field in pav_dict[line]:
pav_output+="\t"+str(field)
pav_output+="\n"
out_pav=args.outdir.joinpath("PAV_matrix.txt")
with open(out_pav,'w') as file_out_pav:
file_out_pav.write(pav_output)
print_pav_matrix(pav_dict,args)
def init_pav(args,Features):
# if a pav matrix is asked, create dictionnary to store the features, and for each feature the value for all the target genomes.
pav_dict={}
if args.pav_matrix:
pav_line=len(args.target)*[1]
pav_dict["gene_id"]=list(args.target)
for feat in Features.keys():
if Features[feat].type=='gene':
pav_dict[feat]=pav_line.copy()
return pav_dict
def print_pav_matrix(pav_dict,args):
# print the pav matrix
if args.pav_matrix:
print('\nGeneration of the presence-absence matrix for the transfered genes')
pav_output=''
for line in pav_dict:
pav_output+=line
for field in pav_dict[line]:
pav_output+="\t"+str(field)
pav_output+="\n"
out_pav=args.outdir.joinpath("PAV_matrix.txt")
with open(out_pav,'w') as file_out_pav:
file_out_pav.write(pav_output)
\ No newline at end of file
from .intersect import Features
from .Functions import add_target_genome_paths,get_id_cov,stats_feature_missing_segment,stats_features
from .usefull_little_functions import get_featurePath_ends
from .load_gfa import get_segments_sequence
from .genome_transfer import generate_target_gff
from .variation_details import generate_variations_details
from .alignment import print_alignment
from tqdm import tqdm
import subprocess
import sys
# get the genome name that corresponds to the file_name (file_name if genome_name+walk_name). not universal..?
def get_genome_name(target_genomes,file_name):
for genome in target_genomes:
if genome in file_name:
return genome
return ""
# create a dictionnary with the length of the segments : s5->8
def get_segments_length(segments,verbose):
seg_len={}
total_lines=0
if verbose:
total_lines=len(["" for line in open(segments,"r")])
with open(segments,'r') as segments_file:
for line in tqdm(segments_file, desc="Computing the lengths of the segments", total=total_lines, unit=" segment", disable=not verbose):
line=line.split()
seg_id=line[1]
seg_len[seg_id]=len(line[2])
return seg_len
def get_target_genomes(args):
# get the target genomes if there is none specified
if len(args.target)==0:
walk_path=args.segment_coordinates_path.joinpath("walks.txt")
total_lines = len(["" for line in open(walk_path,"r")])
with open(walk_path,'r') as walks:
for line in tqdm(walks,desc="Fetching all the genomes in the graph for the transfer",total=total_lines,unit=" line",disable=not args.verbose):
genome_name=line.split()[1]
if (args.source_genome not in genome_name) and (genome_name not in args.target) and ("MINIGRAPH" not in genome_name):
args.target.append(genome_name)
if args.verbose:
print(f" Genomes found : {args.target}")
# get haplotypes for each target genome
if args.haplotype:
target_set=set()
for genome in args.target:
bed_files=list(args.segment_coordinates_path.glob(f"*{genome}*.bed"))
for haplotype in bed_files:
haplotype_split=haplotype.name.split("#")
target_set.add(haplotype_split[0]+"#"+haplotype_split[1])
args.target=list(target_set)
def run_intersect(args):
print("Computing the intersection between the annotations and the graph segments")
seg_coord_path=args.segment_coordinates_path
if args.haplotype:
if args.source_haplotype=='.':
files=list(seg_coord_path.glob(f"*{args.source_genome}*.bed")) # get all bed files from the source genome in seg_coord
# select haplotype
source_haplotypes=[]
for file in files:
file_haplotype=file.name.split("#")[1]
source_haplotypes.append(file_haplotype)
first_source_haplo=min(source_haplotypes)
args.source_haplotype=first_source_haplo
haplo=f"#{first_source_haplo}#"
print('\033[35m'+f'Warning : No source haplotype was given with \'-sh\'. Haplotype {first_source_haplo} will be used.'+'\033[0m')
files=list(seg_coord_path.glob(f"*{args.source_genome}*{haplo}*.bed")) # get all bed files from the source genome haplotype in seg_coord
message=f'No bed files corresponding to the source genome haplotype {first_source_haplo} found in {seg_coord_path}'
else:
haplo = f"#{args.source_haplotype}#"
files=list(seg_coord_path.glob(f"*{args.source_genome}*{haplo}*.bed")) # get all bed files from the source genome in seg_coord
message=f'No bed files corresponding to the source genome haplotype found in {seg_coord_path}'
if len(files)==0:
sys.exit(message)
if args.verbose:
print(f' Found {len(files)} paths corresponding to the source genome haplotype {args.source_haplotype}')
else:
files=list(seg_coord_path.glob(f"*{args.source_genome}*.bed")) # get all bed files from the source genome in seg_coord
message='\33[31m'+f'Error : No bed files corresponding to the source genome found in {seg_coord_path}. Please check that the source genome name is right.'+'\033[0m'
if len(files)==0:
sys.exit(message)
if args.verbose:
print(f' Found {len(files)} paths corresponding to the source genome')
intersect_path=args.outdir.joinpath("intersect")
command=f"echo -n '' > {intersect_path}" # empty the file "intersect"
subprocess.run(command,shell=True,timeout=None)
for file in files:
if args.verbose:
print(f' Building the intersection for the path {file.stem}')
command=f"bedtools intersect -nonamecheck -wo -a {file.as_posix()} -b {args.gff.as_posix()}>>{intersect_path}"
subprocess.run(command,shell=True,timeout=None)
intersect_lines = int(subprocess.Popen(f"wc -l {intersect_path}", shell=True, stdout=subprocess.PIPE).stdout.read().decode()[0])
if intersect_lines==0:
print('\33[31m'+"Error : No lines in the intersect. Please check that the sequence id in the GFF file (1st field) matches the sequence id in the GFA file (4th field of the W lines or 3rd part of the second field of the P lines)."+'\033[0m')
exit()
\ No newline at end of file
# handles all the transfer, variation, alignment (stats?), on the target genome.
def transfer_on_target(segments_file,genome_dir,target_genome,target_genome_paths,list_feat_absent,seg_size,args,segments_on_target_genome):
print(f' Generating the {target_genome} output')
stats=False
list_feature_to_transfer= Features.keys()
segments_list={} # list of segments usefull for the transfer. not all segments info will be loaded.
# create output files names
out_gff=genome_dir.joinpath(f'{target_genome}.gff')
out_var=genome_dir.joinpath(f'{target_genome}_var.txt')
out_aln=genome_dir.joinpath(f'{target_genome}_aln.txt')
# clear feature target path from the previous genome
for feature in Features.keys(): # empty the feature target paths for the next genome treated
Features[feature].segments_list_target=[]
for feat in tqdm(list_feature_to_transfer,desc=" Computing features paths in target genome",unit=" feature",disable=not args.verbose):
if Features[feat].parent=='':
add_target_genome_paths(feat,target_genome_paths,segments_on_target_genome)
if args.annotation:
with open(out_gff,'w') as file_out_gff:
reason_features_not_transfered=[0,0] # absent_features, low_cov_id
diff_size_transfered_features=[0,0] # [count,sum], to get the average
for feat in tqdm(list_feature_to_transfer,desc=f' Generating {target_genome} gff',unit=" feature",disable=not args.verbose):
feature=Features[feat]
if feature.parent=="": # usually a gene
for match in feature.segments_list_target: # compute cov and id for all matches.
if match[0]=='':
feature_target_path=[]
else:
feature_target_path=match[2]
match.append(get_id_cov(feat,seg_size,feature_target_path))
for match in feature.segments_list_target:
# if option no_dupl, only transfer the best match
if args.no_duplication:
# look for best match (best cov+id)
all_match=feature.segments_list_target
best_match=all_match[0]
for candidate_match in all_match:
candidate_cov=candidate_match[3][0]
candidate_id=candidate_match[3][1]
best_cov=best_match[3][0]
best_id=best_match[3][1]
if (candidate_cov+candidate_id)>(best_cov+best_id):
best_match=candidate_match
if match!=best_match: # if current match is not best match, dont transfer
match.append(False)
continue
transfer_stat=generate_target_gff(feat,seg_size,args,reason_features_not_transfered,match,file_out_gff,segments_on_target_genome) # the childs are handled in this function
if transfer_stat=="no":
list_feat_absent.append(feat)
else:
diff_size_transfered_features[0]+=1
diff_size_transfered_features[1]+=transfer_stat
if args.variation or args.alignment : # append dict of segments for which we may need the sequence
for feat in tqdm(list_feature_to_transfer,desc=" Fetching the sequence of the segments",unit=" feature",disable=not args.verbose):
list_seg=Features[feat].segments_list_source
if Features[feat].parent=="":
feature_target_path=[]
for occurence in Features[feat].segments_list_target: # add all the occurences of the feature in the target genome
feature_target_path+=occurence[2]
for segment in list_seg:
segments_list[segment[1:]]=''
for segment in feature_target_path:
segments_list[segment[1:]]=''
if not args.annotation:
# cov and id filter tests (if not done in annotation)
for feature_id in tqdm(list_feature_to_transfer,desc=" Computing the coverage and sequence id of the features before transfer",unit=' feature',disable=not args.verbose):
feature=Features[feature_id]
if feature.parent=="": # usually a gene
for match in feature.segments_list_target: # compute cov and id for all matches.
if match[0]=='':
feature_target_path=[]
else:
feature_target_path=match[2]
match.append(get_id_cov(feat,seg_size,feature_target_path))
for match in feature.segments_list_target:
[first_seg,last_seg,walk,copy_id,feature_target_path]=get_featurePath_ends(match)
# add the filter info in all the matches...
[cov,id]=get_id_cov(feature_id,seg_size,feature_target_path)
if (cov*100<args.coverage) or (id*100<args.identity) or (first_seg==''): # didnt put the "right_size" filter)
match.append(False) # store the information that this gene copy didnt pass the filters
else:
# if option no_dupl, only transfer the best match
if args.no_duplication:
# look for best match (best cov+id)
all_match=feature.segments_list_target
best_match=all_match[0]
for candidate_match in all_match:
candidate_cov=candidate_match[3][0]
candidate_id=candidate_match[3][1]
best_cov=best_match[3][0]
best_id=best_match[3][1]
if (candidate_cov+candidate_id)>(best_cov+best_id):
best_match=candidate_match
if match!=best_match: # if current match is not best match, dont transfer
match.append(False)
continue
match.append(True)
if args.variation:
seg_seq=get_segments_sequence(segments_file,segments_list)
with open(out_var, 'w') as file_out_var:
file_out_var.write(f'# feature_id\tfeature_type\tsequence_id\ttarget_start_position\ttarget_stop_position\ttarget_feature_length\tinversion\tlength_difference\tvariation_type\tref_sequence\talt_sequence\tvariation_length\tvariation_position_on_source_feature\tvariation_position_on_target_feature\n')
for feat in tqdm(list_feature_to_transfer,desc=f' Generating {target_genome} genes variation details',unit=" feature",disable=not args.verbose):
feature=Features[feat]
if feature.parent=="": # usually a gene
for match in feature.segments_list_target: # for all occurences of the gene
generate_variations_details(feat,seg_seq,match,file_out_var,segments_on_target_genome)
if args.alignment:
if not args.variation:
seg_seq=get_segments_sequence(segments_file,segments_list)
with open(out_aln,'w') as file_out_aln:
line="Sequence alignment generated from feature path comparison in pangenome graph. Made with GrAnnoT v1.\n\n"
file_out_aln.write(line)
for feat in tqdm(list_feature_to_transfer,desc=f' Generating the {args.source_genome} features alignment with {target_genome} features',unit=" feature",disable=not args.verbose):
feature=Features[feat]
if feature.parent=="": # usually a gene
for match in feature.segments_list_target: # for all occurences of the gene
if match[4]==True:
print_alignment(feat,match,seg_seq,file_out_aln)
if stats:
# create objects for stats on how many segments are absent in target genome, their average length, etc
feature_missing_segments=[[],[],[],[],[],[],[]] # [feature_missing_first,feature_missing_middle,feature_missing_last,feature_missing_all,feature_missing_total,feature_total,feature_ok]
# the fist segment of the feature is missing - feature_missing_first
# the last segment of the feature is missing - feature_missing_last
# at least one middle segment of the feature is missing - feature_missing_middle
# the entire feature is missing - feature_missing_all
# at least one segment is missing first, last, or middle) - feature_missing_total
# no segment is missing, the feature is complete - feature_ok
# total number of features, with missing segments or not - feature_total
# for each feature, get list of the segments where it is and the first and last segment of the feature on the new genome
list_seg=Features[feat].segments_list_source
[first_seg,last_seg,walk,copy_id,feature_target_path]=get_featurePath_ends(Features[feat].segments_list_target[0])
for feat in list_feature_to_transfer:
stats_feature_missing_segment(feature_missing_segments,first_seg,last_seg,list_seg,feat,walk,segments_on_target_genome)
if args.annotation:
absent_features=reason_features_not_transfered[0];low_cov_id=reason_features_not_transfered[1]
print(len(Features)-(absent_features+low_cov_id),"out of",len(Features),"features are transfered.")
print(absent_features,"out of",len(Features),"features are not transfered because they are absent in the new genome.")
print(low_cov_id,"out of",len(Features),"features are not transfered because their coverage or sequence identity is below threshold.")
print("average length difference of the transfered genes : ",diff_size_transfered_features[1]/diff_size_transfered_features[0])
stats_features(feature_missing_segments)
#clear segment info for next transfer
segments_on_target_genome.clear() # empty dict for the next genome treated
\ No newline at end of file
from .load_intersect import Features,get_feature_start_on_segment,get_feature_stop_on_segment
from .Functions import segments_on_target_genome
from .intersect import Features,get_feature_start_on_segment,get_feature_stop_on_segment
def get_seg_occ(seg,walk,feat,copy):
def get_seg_occ(seg,walk,feat,copy,segments_on_target_genome):
seg_in_walk=segments_on_target_genome[seg][walk]
for seg_occurence in seg_in_walk:
for feat_copy in seg_occurence[5:]:
......@@ -15,26 +14,78 @@ def get_seg_occ(seg,walk,feat,copy):
exit()
# get the start position of the features on the linear target genome, using their coordinates on the graph and the coordinantes of the segments on the genome
def get_feature_start_on_target_genome(start_seg,feat_id,walk,copy_id):
seg_start_pos=get_seg_occ(start_seg,walk,feat_id,copy_id)[1]
def get_feature_start_on_target_genome(start_seg,feat_id,walk,copy_id,segments_on_target_genome):
seg_start_pos=get_seg_occ(start_seg,walk,feat_id,copy_id,segments_on_target_genome)[1]
feat_start_pos=get_feature_start_on_segment(start_seg,feat_id)
return seg_start_pos+feat_start_pos-1
# get the stop position of the features on the linear target genome, using their coordinates on the graph and the coordinantes of the segments on the genome
def get_feature_stop_on_target_genome(stop_seg,feat_id,walk,copy_id):
seg_start_pos=get_seg_occ(stop_seg,walk,feat_id,copy_id)[1]
def get_feature_stop_on_target_genome(stop_seg,feat_id,walk,copy_id,segments_on_target_genome):
seg_start_pos=get_seg_occ(stop_seg,walk,feat_id,copy_id,segments_on_target_genome)[1]
feat_stop_pos=get_feature_stop_on_segment(stop_seg,feat_id)
return seg_start_pos+feat_stop_pos-1
# get the start position of the features on the linear target genome for inverted features
def get_feature_start_on_target_genome_inv(start_seg,feat_id,walk,copy_id):
seg_end_pos=get_seg_occ(start_seg,walk,feat_id,copy_id)[2]
def get_feature_start_on_target_genome_inv(start_seg,feat_id,walk,copy_id,segments_on_target_genome):
seg_end_pos=get_seg_occ(start_seg,walk,feat_id,copy_id,segments_on_target_genome)[2]
feat_start_pos=get_feature_start_on_segment(start_seg,feat_id)
return seg_end_pos-feat_start_pos+1
# get the stop position of the features on the linear target genome for inverted features
def get_feature_stop_on_target_genome_inv(stop_seg,feat_id,walk,copy_id):
seg_end_pos=get_seg_occ(stop_seg,walk,feat_id,copy_id)[2]
def get_feature_stop_on_target_genome_inv(stop_seg,feat_id,walk,copy_id,segments_on_target_genome):
seg_end_pos=get_seg_occ(stop_seg,walk,feat_id,copy_id,segments_on_target_genome)[2]
feat_stop_pos=get_feature_stop_on_segment(stop_seg,feat_id)
return seg_end_pos-feat_stop_pos+1
\ No newline at end of file
return seg_end_pos-feat_stop_pos+1
# gives the first and last segments of a walk (list)
def get_featurePath_ends(walk_info): # walk_info = [walk_name,copy_id,[list_seg]]
walk_name=walk_info[0]
if walk_name!='':
seg_list=walk_info[2]
copy_id=walk_info[1]
[first_seg,last_seg]=[seg_list[0],seg_list[-1]]
else:
[first_seg,last_seg,walk_name,copy_id,seg_list]=['','','','',[]]
return [first_seg,last_seg,walk_name,copy_id,seg_list]
# invert the given strand
def invert_strand(strand):
match strand:
case "+":
return "-"
case "-":
return "+"
case ">":
return "<"
case "<":
return ">"
case default:
return ""
# outputs the sequence of an oriented segment
def get_segment_sequence(seg_seq,segment):
if segment[0]==">":
return seg_seq[segment[1:]]
else:
return reverse_complement(seg_seq[segment[1:]])
# outputs the reverse complement of a sequence
def reverse_complement(sequence):
sequence_rc=""
for char in sequence:
sequence_rc+=complement(char)
return sequence_rc[::-1]
# outputs the reverse complement of a nucleotide
def complement(nucl):
match nucl:
case "A":
return "T"
case "C":
return "G"
case "G":
return "C"
case "T":
return "A"
return nucl
\ No newline at end of file
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment