You need to sign in or sign up before continuing.
Newer
Older

nina.marthe_ird.fr
committed
import random
import string
# create global variables to manipulate them in the functions below and to pass them to Functions.py
global Features
global Segments
Segments={}
Features={}
# get the inverted segment
def invert_seg(seg):
if seg[0]==">":
inv_seg="<"+seg[1:]
elif seg[0]=="<":
inv_seg=">"+seg[1:]
else:
print(seg," not invertable")
return seg
return inv_seg
# look for a segment in the dict Segments, in one orientation or the other
def search_segment(segment):
if segment not in Segments:
if invert_seg(segment) in Segments:
return invert_seg(segment)
return False
return segment
# functions to create the segments and the features
# create a segment with its first feature

nina.marthe_ird.fr
committed
def init_seg(line,segment_id,feat_id,strand):
[chr,start,stop]=[line[0],int(line[1])+1,int(line[2])] # +1 in the start to convert the bed 0-based coordinate to a 1-based system
# create the segment, store it in the Segments dict

nina.marthe_ird.fr
committed
Segments[segment_id]=Segment(segment_id,feat_id,chr,start,stop,strand)
# create a feature with the first segment its on
def init_feature(line,feature_id,segment_oriented):
[type,annot,chr,start,stop,strand]=[line[6],line[12],line[4],int(line[7]),int(line[8]),line[10]]
if feature_id in Features: # if the feature has already been created (in order to store the child), get the child and rewrite it
childs=Features[feature_id].childs
else:
childs=[]
parent=get_parent(annot,feature_id)
# add the current segment to the list of segments that have the feature
segments_list=[segment_oriented]
# create the feature, store it in the dict Features

nina.marthe_ird.fr
committed
Features[feature_id]=Feature(feature_id,type,chr,start,stop,annot,childs,parent,segments_list,strand,True,False)
# if there is a parent, returns the id of the parent feature and add the child to the parent
def get_parent(annot,feature_id): # check how the parent info is stored in the gff format in general (this is a particular case...) todo
if (len(annot.split(";"))>1) and (annot.split(";")[1].split("=")[0]=="Parent"):
# for annotations that look like : ID=LOC_Os01g01010.1:exon_7;Parent=LOC_Os01g01010.1, where the parent is in the field 1
parent=annot.split(";")[1].split("=")[1].replace(".","_").replace(":","_")
add_child(parent,feature_id)
elif (len(annot.split(";"))>2) and (annot.split(";")[2].split("=")[0]=="Parent"):
# for annotations that look like : ID=LOC_Os01g01010.1;Name=LOC_Os01g01010.1;Parent=LOC_Os01g01010, where the parent is in the field 2
parent=annot.split(";")[2].split("=")[1].replace(".","_").replace(":","_")
add_child(parent,feature_id)
else: parent=""
return parent
# add a feature to an existing segment

nina.marthe_ird.fr
committed
def add_feature(line,seg,new_feat_id,new_strand):
segment=Segments[seg]
if not segment.find_feat(new_feat_id)[0]:
[chr,start,stop]=[line[0],int(line[1])+1,int(line[2])] # +1 in the start to convert the bed 0-based coordinate to a 1-based system
segment.add_feature(new_feat_id,chr,start,stop,new_strand)
# add a child feature to an existing feature
def add_child(feat,new_child):
if feat in Features: # if the feature exists, add new_child
Features[feat].childs.append(new_child)
else: # create the feature "empty", only with child

nina.marthe_ird.fr
committed
Features[feat]=Feature(feat,"","",0,0,"",[new_child],"",[],"",False,False)
# add a segment to an existing feature
def add_seg(feat_id,new_seg_oriented):

nina.marthe_ird.fr
committed
# if new_seg_oriented in Features[feat_id].segments_list_source:
# print("seg already in feat, duplicate segment ! feature:",feat_id,"segment:",new_seg_oriented)
# print("list for now :",Features[feat_id].segments_list_source)
Features[feat_id].segments_list_source.append(new_seg_oriented)
# add the position of the feature on its first and last segment

nina.marthe_ird.fr
committed
def add_pos(feature_id):
feature=Features[feature_id]
feature.pos_start=get_feature_start_on_segment(feature.segments_list_source[0],feature_id)
feature.pos_stop=get_feature_stop_on_segment(feature.segments_list_source[-1],feature_id)
# handle the Feature object creation
def load_feature_intersect(line,feature_id,seg_oriented):
if (feature_id not in Features) or (not Features[feature_id].complete): # if the feature doesn't exist or is not complete, create it or complete it
init_feature(line,feature_id,seg_oriented)
else: # if it exists, add the current segment to the list of segments that have the existing feature

nina.marthe_ird.fr
committed
# if the feature existing starts and stops at the same positions as the current feature, add_seg
current_start=int(line[7])
current_stop=int(line[8])
feature=Features[feature_id]
if (current_start==feature.start) and (current_stop==feature.stop):
add_seg(feature_id,seg_oriented)
else: # same feature_id, but not same part of the feature -> need to store them in two separate objects
add_segment_to_discontinuous_feature(line,feature,seg_oriented,current_start,current_stop)
def add_segment_to_discontinuous_feature(line,feature,seg_oriented,current_start,current_stop):
added=False
for other_part_id in feature.other_parts_list:
other_part=Features[other_part_id]
if (other_part.start==current_start) and (other_part.stop==current_stop): # look for the current part of the feature to add the segment
add_seg(other_part_id,seg_oriented)
added=True

nina.marthe_ird.fr
committed
break

nina.marthe_ird.fr
committed
if added==False : # if the right part of the feature was not found, create a new part for the feature
new_part_name=feature.id+''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(7))
[chr,start,stop]=[line[4],int(line[7]),int(line[8])]
segments_list=[seg_oriented]
Features[new_part_name]=Feature(feature.id,feature.type,chr,start,stop,feature.annot,feature.childs,feature.parent,segments_list,feature.strand,True,True)
new_part_feature=Features[new_part_name]
new_part_feature.first=False
new_part_feature.first_part=feature.id
feature.other_parts_list.append(new_part_name)
feature.discontinuous=True
if feature.parent in Features:
add_child(feature.parent,new_part_name)
# handle the Segment object creation

nina.marthe_ird.fr
committed
def load_segment_intersect(line,segment_id,feat_id,strand):
if segment_id not in Segments: # if the segment doesn't exist, create it and add the current feature to its feat list

nina.marthe_ird.fr
committed
init_seg(line,segment_id,feat_id,strand)
else: # if it exists, add the current feature to the list of features on the existing segment

nina.marthe_ird.fr
committed
add_feature(line,segment_id,feat_id,strand)
# create a note for the child features that do not have annotation.
def set_note(feature_id): # not universal, depends on the format of the input gff...
# the note contains information on the function of the feature and is used for statistics on hypothetical/putatives features.
# in the gff, the notes are only on the "gene" features. it's easier to have it for the childs than to check the parent's note (or the parent's parent).
feature=Features[feature_id]
note=""
parent_feat=find_parent(feature_id)
if parent_feat!='':
annot=Features[parent_feat].annot.split(';')
if (len(annot)>0) and (annot[-1].split('=')[0]=="Note"):
note=annot[-1].split('=')[1]
feature.note=note
# create all the Segment and Feature objects in the dictionnaries Segments and Features
def load_intersect(intersect_path,verbose):
if not verbose:
print("Loading the intersection information")
# open the file with the intersect between the segments and the gff
total_lines=0
if verbose:
total_lines = len(["" for line in open(intersect_path,"r")])
with open(intersect_path,"r") as intersect_file:
for line in tqdm(intersect_file, desc="Loading the intersection information", total=total_lines, unit=" line",disable=not verbose):
line=line.split()
# get the ids for the dictionnaries' keys
feature_id=line[12].split(';')[0].split("=")[1].replace(".","_").replace(":","_")
segment_id=line[3]
strand=line[10]
segment_oriented=line[3]
load_feature_intersect(line,feature_id,segment_oriented)

nina.marthe_ird.fr
committed
load_segment_intersect(line,segment_id,feature_id,strand)
# for all the features, add the position of the feature on its first and last segment, and the note.
# cant do it before because for that i need to have all the segments in the list segments_list for each feature.
for feat_id in Features:

nina.marthe_ird.fr
committed
if Features[feat_id].complete==False:
print('***',feat_id,Features[feat_id])
add_pos(feat_id)
set_note(feat_id)
# order the dictionnary
order_dict(Features)
# orders the features so that each features is always preceded by its parent feature (gene -> mrna -> exons, cds, utr). useful for the transfer later.
def order_dict(features_dict):
copy_Features=dict(features_dict) # stores the features in a copy
features_dict.clear()# empty dict, to refill it with the features in the right orcer
for feature_id in copy_Features:
add_feature_dict(feature_id,copy_Features,features_dict)
# add a feature to the ordered dictionnary
def add_feature_dict(feature_id,old_dict_features,new_dict_features):
if feature_id not in new_dict_features:
feature=old_dict_features[feature_id]
# check that the parent is present
if feature.parent!='': # recursively find the parent (most of the time a gene)
add_feature_dict(feature.parent,old_dict_features,new_dict_features)

nina.marthe_ird.fr
committed
# if the feature is discontinuous, check that the other parts are present :
if feature.discontinuous and feature.first:
for part_id in feature.other_parts_list:
add_feature_dict(feature.parent,old_dict_features,new_dict_features)
# once the parent is present and the other parts are added, add the feature
new_dict_features[feature_id]=feature
# find the gene parent of a feature
def find_parent(feature_id): # add a check if there is no parent !
feature=Features[feature_id]
if feature.parent=='': # no parent, usually a gene
return feature.id
else:
current=feature.parent
parent_found=False
while parent_found==False:
if Features[current].parent=='': # current doesnt have a parent
return current
else: # if we didn't find the parent, we go up to the current feature's parent until we find it
current=Features[current].parent
# functions to generate the graph's gff from the segments and features created with create_seg_feat
# get the feature's start position on the segment (among segments on the source genome)
def get_feature_start_on_segment(seg_id,feat_id):
s=Segments[search_segment(seg_id)]

nina.marthe_ird.fr
committed
f=Features[find_part_segment_source(s.id,feat_id)]
if s.get_start(f.id)>=f.start:

nina.marthe_ird.fr
committed
result=f.start-s.get_start(f.id)+1
return result
# get the feature's stop position on the segment (among segments on the source genome)
def get_feature_stop_on_segment(seg_id,feat_id):
s=Segments[search_segment(seg_id)]

nina.marthe_ird.fr
committed
f=Features[find_part_segment_source(s.id,feat_id)]
if s.get_stop(f.id)<=f.stop:
result=s.get_size(f.id)

nina.marthe_ird.fr
committed
result=f.stop-s.get_start(f.id)+1
return result
# generates the annotation for the gff of the graph, with the rank of the segment in the feature

nina.marthe_ird.fr
committed
def make_annot_graph(feature_id,segment):
# get the rank and the total number of ocurrences for the feature

nina.marthe_ird.fr
committed
feature=Features[find_part_segment_source(segment,feature_id)]
rank=str(feature.segments_list_source.index(segment)+1)
total=str(len(feature.segments_list_source))
# create the annotation with the rank information
annotation=feature.annot+";Rank_occurrence="+rank+";Total_occurrences="+total
return annotation

nina.marthe_ird.fr
committed
def find_part_segment_source(segment,feature_id):
first_feature=Features[feature_id]
if segment in first_feature.segments_list_source:
return feature_id
else:
list_parts=first_feature.other_parts_list
for part in list_parts:
if segment in Features[part].segments_list_source:
return part
# create the line for the gff of the graph
def create_line_graph_gff(feature_stranded,segment):
[strand,feature_id]=[feature_stranded[0:1],feature_stranded[1:]]

nina.marthe_ird.fr
committed
annotation=make_annot_graph(feature_id,segment)
type=Features[feature_id].type
start=get_feature_start_on_segment(segment,feature_id)
stop=get_feature_stop_on_segment(segment,feature_id)
line=segment+"\tGrAnnot\t"+type+"\t"+str(start)+"\t"+str(stop)+"\t.\t"+strand+"\t.\t"+annotation+"\n"
return line
# go through all the segments in Segments and prints the gff of the graph, with one line per segment/feature intersection
def graph_gff(graph_gff_path,verbose):
if not verbose:
print("Generation of the graph's gff")
with open(graph_gff_path,'w') as graph_gff_file:
for segment_stranded in tqdm(Segments,desc="Generation of the graph's gff",unit=" segment",disable=not verbose):

nina.marthe_ird.fr
committed
features_on_seg=Segments[segment_stranded].feature_list # get the list of the features on the segment
for feature_tuple in features_on_seg: # go through all the segment's features, and print the gff line for each
feature_id=feature_tuple[0]
strand=Segments[segment_stranded].get_strand(feature_id)
feature_stranded=strand+feature_id
line = create_line_graph_gff(feature_stranded,segment_stranded)
graph_gff_file.write(line)
# go through all the features in Features and print the gaf of the graph, with one line per feature
def graph_gaf(graph_gaf_path,seg_len,verbose):
if not verbose:
print("Generation of the graph's gaf")
with open(graph_gaf_path,'w') as graph_gaf_file:
for feature_id in tqdm(Features,desc="Generation of the graph's gaf",unit=" feature",disable=not verbose):
feature=Features[feature_id]
feature_segments=feature.segments_list_source
feature_path=""
path_length=0
for segment in feature_segments:
feature_path+=segment
path_length+=seg_len[segment[1:]]
strand=feature.strand
size=feature.size
start_on_path=feature.pos_start-1 # -1 to get the position 0-based bed-like
stop_on_path=start_on_path+size
line=f'{feature.id}\t{size}\t0\t{size}\t{strand}\t{feature_path}\t{path_length}\t{start_on_path}\t{stop_on_path}\t{size}\t{size}\t255\n'
graph_gaf_file.write(line)
# creates a dictionnary with the length of the segments : s5->8
def get_segments_length(segments,verbose):
total_lines=0
if verbose:
total_lines=len(["" for line in open(segments,"r")])
with open(segments,'r') as segments_file:
for line in tqdm(segments_file, desc="Computing the lengths of the segments", total=total_lines, unit=" segment", disable=not verbose):