import argparse import os import subprocess from .getSegmentsCoordinates import seg_coord from pathlib import Path import sys from tqdm import tqdm def dir_path(path): if os.path.isdir(path): return path else: raise argparse.ArgumentTypeError(f"readable_dir:{path} is not a valid path") def range_type(astr, min=0, max=100): value = int(astr) if min<= value <= max: return value else: raise argparse.ArgumentTypeError('value not in range %s-%s'%(min,max)) def arg(): first_parser=argparse.ArgumentParser(add_help=False) first_parser.add_argument('-pav','--pav_matrix',help="output a presence-absence variation matrix to recapitulate the annotation transfers; requires the -ann/--annotation option",action='store_true') # annotation transfer options first_parser.add_argument('-id', '--identity', metavar="[0-100]", type=range_type, choices=range(0,101), help="minimum identity percentage required for the feature to be transfered, between 0 and 100; default is 80") first_parser.add_argument('-cov', '--coverage', metavar="[0-100]", type=range_type, choices=range(0,101), help="minimum coverage percentage required for the feature to be transfered, between 0 and 100; default is 80") first_parser.add_argument('-sh','--source_haplotype',type=str,default='.',help="haplotype of the source genome that the annotation file reffers to; by default the first haplotype will be selected; requires the -ht/--haplotype option") first_args, _ = first_parser.parse_known_args() parser = argparse.ArgumentParser(prog='grannot',description='Annotation transfer between genome and pangenome graph.',parents=[first_parser],usage='%(prog)s graph_file source_annotation_file source_genome_name [options]') # input files parser.add_argument('graph', metavar='graph.gfa', type=argparse.FileType('r'), help='pangenome graph file in GFA format containing the genome that the annotation refers to and the target genomes. the GFA file must have W-lines to describe the genomes walks in the graph, and not P-lines') parser.add_argument('-ht','--haplotype',action="store_true",required=(first_args.source_haplotype!='.'),help="use if there are several haplotypes in the graph; default is false") parser.add_argument('gff', metavar='annotation.gff', type=argparse.FileType('r') ,help='annotation file in GFF format containing the annotations to be transfered') # optionnal input file parser.add_argument('-coord', '--segment_coordinates_path', metavar="path/to/files", type=dir_path, default=".", help="path to the files given by the preprocessing of the graph; if not given the program will do the preprocessing; recommended if the program will run several times") # genome names parser.add_argument('source_genome', metavar="source_genome", type=str, help="name of the annotated genome's path in the GFA (2nd field in the W line or first part (separated by '#') of the 1st field in a P line), that the annotation file refers to") parser.add_argument('-t', '--target', metavar="target_genome", nargs='*', type=str, default=[], help="name of the target genomes' paths in the GFA (2nd field in the W line or first part (separated by '#') of the 1st field in a P line) for the annotation; if the option is not used the programm will transfer the annotation on all the genomes") # graph annotation options parser.add_argument('-gff','--graph_gff',help="output the annotation of the graph in GFF format (not recommended)",action='store_true') parser.add_argument('-gaf','--graph_gaf',help="output the annotation of the graph in GAF format",action='store_true') # genome annotation options parser.add_argument('-ann','--annotation',help="output the annotation transfer in GFF format",action='store_true',required=(first_args.pav_matrix)) parser.add_argument('-var','--variation',help="output the detail of the variations for the transfered features",action='store_true') parser.add_argument('-aln','--alignment',help="output the alignment of the transfered features",action='store_true') # general options parser.add_argument('-o','--outdir',metavar="outdir",help="directory so store the output files; if not given, the output will go in the current directory",type=str,default='.') parser.add_argument('-v','--verbose',action='store_true',help="write more information about the program's operations as it runs",default=False) # version parser.add_argument('--version', action='version', version='GrAnnoT 1.0.0') return parser.parse_args() def read_args(args): # check bedtools installation bedtools_test=subprocess.run("bedtools --version",shell=True,check=False,stdout=subprocess.DEVNULL,stderr=sys.stderr) if int(bedtools_test.returncode)>0: raise SystemExit('bedtools not installed') # create outdir args.outdir=Path(args.outdir).resolve() args.outdir.mkdir(exist_ok=True) # set default values if args.identity==None: args.identity=80 if args.coverage==None: args.coverage=80 # convert input files in Path objects args.outdir=Path(args.outdir).resolve() # resolve() finds the absolute path, and adds the '/' if needed seg_coord_path=Path(args.segment_coordinates_path).resolve() args.graph=Path(args.graph.name).resolve() args.gff=Path(args.gff.name).resolve() # run getSegCoord on the target genomes + the source genome if needed if not(args.annotation or args.variation or args.alignment or args.pav_matrix): # if we only need the graph annotation, no need to get SegCoord for all the genomes path_list=[] transfer=False else: transfer=True path_list=args.target[:] path_list.append(args.source_genome) if args.segment_coordinates_path==".": print("Computing the segments coordinates on the genomes") seg_coord_path=args.outdir.joinpath("seg_coord") seg_coord_path.mkdir(exist_ok=True) # create dir to store bed files seg_coord(args.graph.as_posix(),path_list,seg_coord_path,args.verbose,transfer,args.haplotype) args.segment_coordinates_path=seg_coord_path # intersect between gff and all the bed files from the source genome given by seg_coord run_intersect(args,seg_coord_path) # get the genome name that corresponds to the file_name (file_name if genome_name+walk_name). not universal..? def get_genome_name(target_genomes,file_name): for genome in target_genomes: if genome in file_name: return genome return "" def run_intersect(args,seg_coord_path): print("Computing the intersection between the annotations and the graph segments") if args.haplotype: if args.source_haplotype=='.': files=list(seg_coord_path.glob(f"*{args.source_genome}*.bed")) # get all bed files from the source genome in seg_coord # select haplotype source_haplotypes=[] for file in files: file_haplotype=file.name.split("#")[1] source_haplotypes.append(file_haplotype) first_source_haplo=min(source_haplotypes) args.source_haplotype=first_source_haplo haplo=f"#{first_source_haplo}#" files=list(seg_coord_path.glob(f"*{args.source_genome}*{haplo}*.bed")) # get all bed files from the source genome haplotype in seg_coord message=f'No bed files corresponding to the source genome haplotype {first_source_haplo} found in {seg_coord_path}' else: haplo = f"#{args.source_haplotype}#" files=list(seg_coord_path.glob(f"*{args.source_genome}*{haplo}*.bed")) # get all bed files from the source genome in seg_coord message=f'No bed files corresponding to the source genome haplotype found in {seg_coord_path}' if len(files)==0: sys.exit(message) if args.verbose: print(f' Found {len(files)} paths corresponding to the source genome haplotype {args.source_haplotype}') else: files=list(seg_coord_path.glob(f"*{args.source_genome}*.bed")) # get all bed files from the source genome in seg_coord message=f'No bed files corresponding to the source genome found in {seg_coord_path}' if len(files)==0: sys.exit(message) if args.verbose: print(f' Found {len(files)} paths corresponding to the source genome') intersect_path=args.outdir.joinpath("intersect") command=f"echo -n '' > {intersect_path}" # empty the file "intersect" subprocess.run(command,shell=True,timeout=None) for file in files: if args.verbose: print(f' Building the intersection for the path {file.stem}') command=f"bedtools intersect -wo -a {file.as_posix()} -b {args.gff.as_posix()}>>{intersect_path}" subprocess.run(command,shell=True,timeout=None)