from Graph_gff import Features,load_intersect from Functions import get_segment_sequence,convert_strand target_genome_name="genome2_chr10" intersect_path='intersect.bed' load_intersect(intersect_path) pos_seg="genome2_chr10.bed" var=open("genome2_chr10_variations.txt",'r') gfa="graph.gfa" def get_segments_sequence_and_paths(gfa): file_gfa=open(gfa,'r') lines_gfa=file_gfa.readlines() file_gfa.close() seg_seq={} paths={} for line in lines_gfa: line=line.split() if (line[0]=="S"): # get the sequence of the segment seg_id='s'+line[1] seg_seq[seg_id]=line[2] if (line[0]=="W") & (line[1]!="_MINIGRAPH_"): # get the walk of the genome path=line[6].replace(">",";>") path=path.replace("<",";<").split(';') list_path=[] for segment in path: if segment[0:1]=='>': list_path.append('+s'+segment[1:]) elif segment[0:1]=='<': list_path.append('-s'+segment[1:]) paths[line[3]]=list_path return [paths,seg_seq] [paths,seg_seq]=get_segments_sequence_and_paths(gfa) segments_on_target_genome={} def get_segments_positions_on_genome(pos_seg): bed=open(pos_seg,'r') lines=bed.readlines() # read line by line ? bed.close() for line in lines: line=line.split() [seg,chrom,start,stop,strand]=[line[3][1:],line[0],line[1],line[2],line[3][0:1]] segments_on_target_genome[seg]=[chrom,start,stop,strand] get_segments_positions_on_genome(pos_seg) def add_feature_sequence(feature,seg_seq): feature_sequence="" for segment in feature.segments_list: if segment==feature.segments_list[0]: feature_sequence+=get_segment_sequence(seg_seq,segment)[feature.pos_start-2:] # revérifier les +/- 1 pour la position, avec de vraies données elif segment==feature.segments_list[-1]: feature_sequence+=get_segment_sequence(seg_seq,segment)[0:feature.pos_stop] # revérifier les +/- 1 pour la position, avec de vraies données else: feature_sequence+=get_segment_sequence(seg_seq,segment) feature.sequence=feature_sequence def get_first_seg(list_seg,segments_on_target_genome): first_seg_found='' for segment in list_seg: if segment[1:] in segments_on_target_genome: first_seg_found=segment[1:] break return first_seg_found def get_feature_path(paths,first_seg,last_seg): # find the path in the target genome first_strand=convert_strand(segments_on_target_genome[first_seg][3]) first_seg_stranded=first_strand+first_seg last_strand=convert_strand(segments_on_target_genome[last_seg][3]) last_seg_stranded=last_strand+last_seg index_first_seg=int(paths[target_genome_name].index(first_seg_stranded)) index_last_seg=int(paths[target_genome_name].index(last_seg_stranded)) first_index=min(index_first_seg,index_last_seg) last_index=max(index_last_seg,index_first_seg) list_segfeat_azu=paths[target_genome_name][first_index:last_index+1] list_segfeat_azu_corrected=[convert_strand(segment_stranded[0])+segment_stranded[1:] for segment_stranded in list_segfeat_azu] return list_segfeat_azu_corrected def get_rna(dna_sequence): return dna_sequence.replace("T","U") # penser à transcrire la séquence codante du gène !! def get_aa(codon): match codon[0:2]: case "UU": if (codon[2]=="U") | (codon[2]=="C"): return "Phe" else: return "Leu" case "UC": return "Ser" case "UA": if (codon[2]=="U") | (codon[2]=="C"): return "Tyr" else: return "*" case "UG": if (codon[2]=="U") | (codon[2]=="C"): return "Cys" elif codon[2]=="A": return "*" else: return "Trp" case "CU": return "Leu" case "CC": return "Pro" case "CA": if (codon[2]=="U") | (codon[2]=="C"): return "His" else: return "Gln" case "CG": return "Arg" case "AU": if codon[2]=="G": return "Met" else: return "Ile" case "AC": return "Thr" case "AA": if (codon[2]=="U") | (codon[2]=="C"): return "Asn" else: return "Lys" case "AG": if (codon[2]=="U") | (codon[2]=="C"): return "Ser" else: return "Arg" case "GU": return "Val" case "GC": return "Ala" case "GA": if (codon[2]=="U") | (codon[2]=="C"): return "Asp" else: return "Glu" case "GG": return "Gly" def traduction(sequence_arn): list_codons=decoupe_codon(sequence_arn) prot=list() for codon in list_codons: prot.append(get_aa(codon)) return prot from textwrap import wrap def decoupe_codon(sequence): return wrap(sequence,3) def get_sequence_before(first_seg,seg_seq,n,paths,feat): first_strand=convert_strand(first_seg[0]) first_seg_stranded=first_strand+first_seg[1:] index_first_seg=int(paths[target_genome_name].index(first_seg_stranded)) sequence_before=seg_seq[first_seg[1:]][0:feat.pos_start-1] # sequence left on the segment on which the cds start (can be empty) current_index=index_first_seg-1 while (len(sequence_before)<n) & (current_index>=0): segment=paths[target_genome_name][current_index] sequence_before=seg_seq[segment[1:]]+sequence_before current_index-=1 return sequence_before[0:99] def get_sequence_after(last_seg,seg_seq,n,paths,feat): last_strand=convert_strand(last_seg[0]) last_seg_stranded=last_strand+last_seg[1:] index_last_seg=int(paths[target_genome_name].index(last_seg_stranded)) sequence_after=seg_seq[last_seg[1:]][feat.pos_stop:] # sequence left on the segment on which the cds ends (can be empty) current_index=index_last_seg+1 while (len(sequence_after)<n) & (current_index>len(paths[target_genome_name])): segment=paths[target_genome_name][current_index] sequence_after=sequence_after+seg_seq[segment[1:]] current_index+=1 return sequence_after[len(sequence_after)-100:] ''' first_strand=convert_strand(segments_on_target_genome[first_seg][3]) first_seg_stranded=first_strand+first_seg last_strand=convert_strand(segments_on_target_genome[last_seg][3]) last_seg_stranded=last_strand+last_seg index_first_seg=int(paths[target_genome_name].index(first_seg_stranded)) index_last_seg=int(paths[target_genome_name].index(last_seg_stranded)) first_index=min(index_first_seg,index_last_seg) last_index=max(index_last_seg,index_first_seg) list_segfeat_azu=paths[target_genome_name][first_index:last_index+1] list_segfeat_azu_corrected=[convert_strand(segment_stranded[0])+segment_stranded[1:] for segment_stranded in list_segfeat_azu] ''' def get_sequence_on_genome(feature,segments_on_target_genome): list_seg=Features[feature].segments_list first_seg=get_first_seg(list_seg,segments_on_target_genome) last_seg=get_first_seg(reversed(list_seg),segments_on_target_genome) path_on_target=get_feature_path(paths,first_seg,last_seg) new_sequence="" for segment in path_on_target: if segment==cds.segments_list[0]: new_sequence+=get_segment_sequence(seg_seq,segment)[cds.pos_start-2:] elif segment==cds.segments_list[-1]: new_sequence+=get_segment_sequence(seg_seq,segment)[0:cds.pos_stop] else: new_sequence+=get_segment_sequence(seg_seq,segment) return new_sequence lines=var.readlines() var.close() # dict cds-var cds_var={} for line in lines: line=line.split() if line[1]=="CDS": cds_id=line[0].replace('.','_').replace(':','_') if cds_id not in cds_var.keys(): cds_var[cds_id]=list() cds_var[cds_id].append(line) def findOtherStart(cds,segments_on_target_genome): print("\nrecherche de nouveau codon start:") frame_shift=0 # chercher codon start en amont du cds, dans le mrna seq_parent=get_sequence_on_genome(cds.parent,segments_on_target_genome) seq_cds=get_sequence_on_genome(cds_id,segments_on_target_genome) sequence_amont=seq_parent[0:seq_parent.rfind(seq_cds)] # get the mrna sequence before the last occurence of the cds sequence print("sequence en amont dans le mRNA:",sequence_amont) if "ATG" in sequence_amont: start_pos_list=[m.start() for m in re.finditer('(?=ATG)', sequence_amont)] stop_pos_list=[m.start() for m in re.finditer('(?=TAG|TAA|TGA)', sequence_amont)] print("position des starts:",start_pos_list,"position des stops:",stop_pos_list,"en amont du cds") # positions (overlapping) où on trouve un atg. # vérifier ensuite s'il y a un stop après un atg, et sinon le cadre de lecture de l'atg (peut décaler toute la prot !) print("verification des stops après les starts:") for start_pos in start_pos_list: start_pos_frame=start_pos%3 n=len(sequence_amont)-start_pos+1 if True not in ( (stop_pos%3==start_pos_frame) & (stop_pos>start_pos) for stop_pos in stop_pos_list) : #print("codon start candidat trouvé dans l'arn messager,",n,"bases en amont du cds") # calculer le décalage : si on en trouve un 2 bases en amont, ça décale le cadre de lecture ! frame_shift=(frame_shift+n)%3 # vérifier le frame shift !! print("le start à la position",start_pos,",",n,"bases en amont du cds, n'a pas de stop en aval dans le même cadre de lecture") else: print("le start à la position",start_pos,",",n,"bases en amont du cds, a un stop en aval dans le même cadre de lecture") # chercher codon start en aval, dans le cds if "ATG" in seq_cds: start_pos_list=[m.start() for m in re.finditer('(?=ATG)', seq_cds)] print("codon start candidat trouvé plus loin dans le cds, à la base",start_pos_list[0]) # print seulement le premier print("\n") return frame_shift version="new" import re for feature in Features.values(): add_feature_sequence(feature,seg_seq) for cds_id in cds_var.keys(): cds=Features[cds_id] print("analyse des variations dans le cds",cds_id) frame_shift=0 for var in cds_var[cds_id]: type_var=var[8] if type_var!="no_var": # if there is a variation posVar_on_ref=int(var[12]) posVar_on_alt=int(var[13]) sequence_target=get_sequence_on_genome(cds_id,segments_on_target_genome) if type_var=="insertion": length_ref=0 else: length_ref=len(var[9]) if type_var=="deletion": length_alt=0 else: length_alt=len(var[10]) print(var) if abs(length_alt-length_ref)%3 == 0: # taille diff 3k -> pas de frame shift. if (posVar_on_ref)%3==0: # taille diff 3k, position 3k print("variation entre deux codons sans décalage du cadre de lecture") if type_var=="insertion": print(type_var,"de",var[10]) elif type_var=="deletion": print(type_var,"de",var[9]) else: print(type_var,"de",var[9],"par",var[10]) len_fragment_after=(3-length_ref)%3 deleted_aa=traduction(get_rna(cds.sequence[posVar_on_ref:posVar_on_ref+length_ref+len_fragment_after])) inserted_aa=traduction(get_rna(sequence_target[posVar_on_alt:posVar_on_alt+length_alt+len_fragment_after])) if (length_ref!=0) & (length_alt!=0): if deleted_aa!=inserted_aa: print("conséquence : changement de",",".join(deleted_aa),"en",",".join(inserted_aa)) else: print("conséquence : mutation synonyme dans",",".join(deleted_aa)) elif length_alt!=0: print("conséquence : insertion de",",".join(inserted_aa)) else: print("conséquence : deletion de",",".join(deleted_aa)) else: # taille diff 3k, position !=3k print("variation au milieu d'un codon sans décalage du cadre de lecture") if type_var=="insertion": print(type_var,"de",var[10]) elif type_var=="deletion": print(type_var,"de",var[9]) else: print(type_var,"de",var[9],"par",var[10]) len_fragment_before=(posVar_on_ref)%3 len_fragment_after=(3-(len_fragment_before+length_ref))%3 total_ins=sequence_target[posVar_on_alt-len_fragment_before:posVar_on_alt+length_alt+len_fragment_after] total_del=cds.sequence[posVar_on_ref-len_fragment_before:posVar_on_ref+length_ref+len_fragment_after] deleted_aa=traduction(get_rna(total_del)) inserted_aa=traduction(get_rna(total_ins)) if deleted_aa!=inserted_aa: print("conséquence : changement de",",".join(deleted_aa),"en",",".join(inserted_aa)) else: print("conséquence : mutation synonyme dans",",".join(deleted_aa)) # possibilité que j'ai print en compte une variation de trop, si on a un snp sur la premiere et la derniere base d'un codon : # pour le traitement de la première j'ai également considéré la dernière ! else: # taille diff !=3k print("changement du cadre de lecture") old_frameshift=frame_shift frame_shift=(frame_shift+length_ref-length_alt)%3 # frameshift=0 -> cadre de lecture rétabli. peut nécessiter d'aller chercher une base en amont. # frameshift=1 -> cadre de lecture décalé de 1 base vers la droite # frameshift=2 -> cadre de lecture décalé de 2 bases vers la droite if old_frameshift==0: print("perte du cadre de lecture originel") elif frame_shift==0: print("rétablissement du cadre de lecture originel") if type_var=="insertion": print(type_var,"de",var[10]) elif type_var=="deletion": print(type_var,"de",var[9]) else: print(type_var,"de",var[9],"par",var[10]) print(frame_shift) if posVar_on_ref<3: print("codon start touché") findOtherStart(cds,segments_on_target_genome) # si on a plusieurs start candidats il faut choisir celui qu'on prend, ils sont pas forcément tous sur le même cadre de lecture # comment traiter la variation sur le start ? # après avoir détecté que la var touche le start, traiter la var ? ou l'inverse print("\n")