import argparse import os import subprocess from .getSegmentsCoordinates import seg_coord from pathlib import Path import sys def dir_path(path): if os.path.isdir(path): return path else: raise argparse.ArgumentTypeError(f"{path} is not a valid path") def range_type(astr, min=0, max=100): value = int(astr) if min<= value <= max: return value else: raise argparse.ArgumentTypeError('value not in range %s-%s'%(min,max)) def arg(): parser = argparse.ArgumentParser(prog='grannot',description='Annotation transfer between genome and pangenome graph.',usage='%(prog)s graph.gfa annotation.gff source_genome [options]',add_help=False) graph = parser.add_argument_group('Graph Annotation Transfer') genome = parser.add_argument_group('Genome Annotation Transfer') general = parser.add_argument_group('General options') # input files parser.add_argument('graph', metavar='graph.gfa', type=argparse.FileType('r'), help='pangenome graph file in GFA format containing the genome that the annotation refers to and the target genomes. the GFA file must have W-lines to describe the genomes walks in the graph, and not P-lines') parser.add_argument('gff', metavar='annotation.gff', type=argparse.FileType('r') ,help='annotation file in GFF format containing the annotations to be transfered') # optionnal input file general.add_argument('-coord', '--segment_coordinates_path', metavar="path/to/files", type=dir_path, default=".", help="recommended if the program has already been run with the same genomes; takes the path to the files given by the preprocessing of the graph; if not given the program will do the preprocessing") # genome names parser.add_argument('source_genome', metavar="source_genome", type=str, help="name of the annotated genome's path in the GFA (2nd field in the W line or first part (separated by '#') of the 1st field in a P line), that the annotation file refers to") general.add_argument('-sh','--source_haplotype',type=str,help="haplotype of the source genome that the annotation file reffers to; by default the first haplotype will be selected; requires the -ht/--haplotype option") genome.add_argument('-t', '--target', metavar="target_genome1 target_genome2", nargs='*', type=str, default=[], help="name of the target genomes' paths in the GFA (2nd field in the W line or first part (separated by '#') of the 1st field in a P line) for the annotation; if the option is not used the program will transfer the annotation on all the genomes in the graph") # graph annotation options graph.add_argument('-gff','--graph_gff',help="output the annotation of the graph in GFF format (not recommended)",action='store_true') graph.add_argument('-gaf','--graph_gaf',help="output the annotation of the graph in GAF format",action='store_true') # genome annotation options genome.add_argument('-ann','--annotation',help="output the annotation transfer on linear genomes in GFF format",action='store_true') genome.add_argument('-var','--variation',help="output the detailled variations for the transfered features",action='store_true') genome.add_argument('-aln','--alignment',help="output the alignment of the transfered features",action='store_true') genome.add_argument('-pav','--pav_matrix',help="output a presence-absence variation matrix to recapitulate the annotation transfers; requires the option -ann/--annotation",action='store_true') # general options general.add_argument('-o','--outdir',metavar="outdir",help="directory so store the output files; if not given, the output will go in the current directory",type=str,default='.') general.add_argument('-v','--verbose',action='store_true',help="write more information about the program's operations as it runs",default=False) # annotation transfer options genome.add_argument('-id', '--identity', metavar="[0-100]", type=range_type, choices=range(0,101), help="minimum identity percentage required for the feature to be transfered, integer between 0 and 100; default is 80; requires at least one of the following options : --ann, --aln, --var") genome.add_argument('-cov', '--coverage', metavar="[0-100]", type=range_type, choices=range(0,101), help="minimum coverage percentage required for the feature to be transfered, integer between 0 and 100; default is 80; requires at least one of the following options : --ann, --aln, --var") genome.add_argument('-ht','--haplotype',action="store_true",help="use if there are several haplotypes in the graph; default is false") # version general.add_argument('-V','--version', action='version', version='GrAnnoT 1.0.1') general.add_argument('-h','--help', action="help", help="show this help message and exit") if len(sys.argv)==1: print('Annotation transfer between genome and pangenome graph.',file=sys.stderr) parser.print_usage(sys.stderr) print('Type \'grannot --help\' for more help', file=sys.stderr) sys.exit(1) return parser.parse_args() def read_args(args): # check the arguments if not (args.graph_gaf or args.graph_gff or args.annotation or args.variation or args.alignment): # todo : add pav to this list later sys.exit(f"grannot: error: no operation option was given; please include at least one of the following options : -gff -gaf -ann -aln -var") if (args.source_haplotype) and (not args.haplotype): sys.exit(f"grannot: error: the option -ht/--haplotype is required for the option -sh/--source_haplotype") if (args.pav_matrix) and (not args.annotation): sys.exit(f"grannot: error: the option -ann/--annotation is required for the option -pav/--pav_matrix") if (args.coverage) and (not (args.annotation or args.variation or args.alignment)): sys.exit(f"grannot: error: one of the following options is required for the option -cov/--coverage : -ann/--annotation, -aln/--alignment, -var/--variation") if (args.identity) and (not (args.annotation or args.variation or args.alignment)): sys.exit(f"grannot: error: one of the following options is required for the option -id/--identity : -ann/--annotation, -aln/--alignment, -var/--variation") # todo : adapt pav to have the same requirements as cov and id, or no requirement at all. change in grannot_help.md # check bedtools installation bedtools_test=subprocess.run("bedtools --version",shell=True,check=False,stdout=subprocess.DEVNULL,stderr=sys.stderr) if int(bedtools_test.returncode)>0: raise SystemExit('bedtools not installed') # create outdir args.outdir=Path(args.outdir).resolve() args.outdir.mkdir(exist_ok=True) # set default values if args.identity==None: args.identity=80 if args.coverage==None: args.coverage=80 # convert input files in Path objects args.outdir=Path(args.outdir).resolve() # resolve() finds the absolute path, and adds the '/' if needed seg_coord_path=Path(args.segment_coordinates_path).resolve() args.graph=Path(args.graph.name).resolve() args.gff=Path(args.gff.name).resolve() # run getSegCoord on the target genomes + the source genome if needed if not(args.annotation or args.variation or args.alignment or args.pav_matrix): # if we only need the graph annotation, no need to get SegCoord for all the genomes path_list=[] transfer=False else: transfer=True path_list=args.target[:] path_list.append(args.source_genome) if args.segment_coordinates_path==".": # todo : if seg_coord is given, check that all the files are there (bed for all walks + seg + walks) print("Computing the segments coordinates on the genomes") seg_coord_path=args.outdir.joinpath("seg_coord") seg_coord_path.mkdir(exist_ok=True) # create dir to store bed files seg_coord(args.graph.as_posix(),path_list,seg_coord_path,args.verbose,transfer,args.haplotype) args.segment_coordinates_path=seg_coord_path # intersect between gff and all the bed files from the source genome given by seg_coord run_intersect(args,seg_coord_path) # get the genome name that corresponds to the file_name (file_name if genome_name+walk_name). not universal..? def get_genome_name(target_genomes,file_name): for genome in target_genomes: if genome in file_name: return genome return "" def run_intersect(args,seg_coord_path): print("Computing the intersection between the annotations and the graph segments") if args.haplotype: if args.source_haplotype=='.': files=list(seg_coord_path.glob(f"*{args.source_genome}*.bed")) # get all bed files from the source genome in seg_coord # select haplotype source_haplotypes=[] for file in files: file_haplotype=file.name.split("#")[1] source_haplotypes.append(file_haplotype) first_source_haplo=min(source_haplotypes) args.source_haplotype=first_source_haplo haplo=f"#{first_source_haplo}#" files=list(seg_coord_path.glob(f"*{args.source_genome}*{haplo}*.bed")) # get all bed files from the source genome haplotype in seg_coord message=f'No bed files corresponding to the source genome haplotype {first_source_haplo} found in {seg_coord_path}' else: haplo = f"#{args.source_haplotype}#" files=list(seg_coord_path.glob(f"*{args.source_genome}*{haplo}*.bed")) # get all bed files from the source genome in seg_coord message=f'No bed files corresponding to the source genome haplotype found in {seg_coord_path}' if len(files)==0: sys.exit(message) if args.verbose: print(f' Found {len(files)} paths corresponding to the source genome haplotype {args.source_haplotype}') else: files=list(seg_coord_path.glob(f"*{args.source_genome}*.bed")) # get all bed files from the source genome in seg_coord message=f'No bed files corresponding to the source genome found in {seg_coord_path}' if len(files)==0: sys.exit(message) if args.verbose: print(f' Found {len(files)} paths corresponding to the source genome') intersect_path=args.outdir.joinpath("intersect") command=f"echo -n '' > {intersect_path}" # empty the file "intersect" subprocess.run(command,shell=True,timeout=None) for file in files: if args.verbose: print(f' Building the intersection for the path {file.stem}') command=f"bedtools intersect -nonamecheck -wo -a {file.as_posix()} -b {args.gff.as_posix()}>>{intersect_path}" subprocess.run(command,shell=True,timeout=None) intersect_lines = int(subprocess.Popen(f"wc -l {intersect_path}", shell=True, stdout=subprocess.PIPE).stdout.read().decode()[0]) if intersect_lines==0: print('\33[31m'+"Error : No lines in the intersect. Please check that the sequence id in the GFF file (1st field) matches the sequence id in the GFA file (4th field of the W lines or 3rd part of the second field of the P lines)."+'\033[0m') exit()