Skip to content
Snippets Groups Projects
argparser.py 10.6 KiB
Newer Older
import argparse
import os
import subprocess
nina.marthe_ird.fr's avatar
nina.marthe_ird.fr committed
from .getSegmentsCoordinates import seg_coord
from pathlib import Path
import sys

def dir_path(path):
    if os.path.isdir(path):
        return path
    else:
        raise argparse.ArgumentTypeError(f"{path} is not a valid path")

def range_type(astr, min=0, max=100):
    value = int(astr)
    if min<= value <= max:
        return value
    else:
        raise argparse.ArgumentTypeError('value not in range %s-%s'%(min,max))

def arg():
    parser = argparse.ArgumentParser(prog='grannot',description='Annotation transfer between genome and pangenome graph.',usage='%(prog)s  graph.gfa  annotation.gff  source_genome  [options]',add_help=False)
    graph = parser.add_argument_group('Graph Annotation Transfer')
    genome = parser.add_argument_group('Genome Annotation Transfer')
    general = parser.add_argument_group('General options')

    # input files
    parser.add_argument('graph', metavar='graph.gfa', type=argparse.FileType('r'), help='pangenome graph file in GFA format containing the genome that the annotation refers to and the target genomes. the GFA file must have W-lines to describe the genomes walks in the graph, and not P-lines')
    parser.add_argument('gff', metavar='annotation.gff', type=argparse.FileType('r') ,help='annotation file in GFF format containing the annotations to be transfered')

    # optionnal input file
    general.add_argument('-coord', '--segment_coordinates_path', metavar="path/to/files", type=dir_path, default=".", help="recommended if the program has already been run with the same genomes; takes the path to the files given by the preprocessing of the graph; if not given the program will do the preprocessing")

    # genome names
    parser.add_argument('source_genome', metavar="source_genome", type=str, help="name of the annotated genome's path in the GFA (2nd field in the W line or first part (separated by '#') of the 1st field in a P line), that the annotation file refers to")
    general.add_argument('-sh','--source_haplotype',type=str,help="haplotype of the source genome that the annotation file reffers to; by default the first haplotype will be selected; requires the -ht/--haplotype option")
    genome.add_argument('-t', '--target', metavar="target_genome1 target_genome2", nargs='*', type=str, default=[], help="name of the target genomes' paths in the GFA (2nd field in the W line or first part (separated by '#') of the 1st field in a P line) for the annotation; if the option is not used the program will transfer the annotation on all the genomes in the graph")


    # graph annotation options
    graph.add_argument('-gff','--graph_gff',help="output the annotation of the graph in GFF format (not recommended)",action='store_true')
    graph.add_argument('-gaf','--graph_gaf',help="output the annotation of the graph in GAF format",action='store_true')

    # genome annotation options
    genome.add_argument('-ann','--annotation',help="output the annotation transfer on linear genomes in GFF format",action='store_true')
    genome.add_argument('-var','--variation',help="output the detailled variations for the transfered features",action='store_true')
    genome.add_argument('-aln','--alignment',help="output the alignment of the transfered features",action='store_true')
    genome.add_argument('-pav','--pav_matrix',help="output a presence-absence variation matrix to recapitulate the annotation transfers; requires the option -ann/--annotation",action='store_true')
    # general options
    general.add_argument('-o','--outdir',metavar="outdir",help="directory so store the output files; if not given, the output will go in the current directory",type=str,default='.')
    general.add_argument('-v','--verbose',action='store_true',help="write more information about the program's operations as it runs",default=False)
    # annotation transfer options
    genome.add_argument('-id', '--identity', metavar="[0-100]", type=range_type, choices=range(0,101), help="minimum identity percentage required for the feature to be transfered, integer between 0 and 100; default is 80; requires at least one of the following options : --ann, --aln, --var")
    genome.add_argument('-cov', '--coverage', metavar="[0-100]", type=range_type, choices=range(0,101), help="minimum coverage percentage required for the feature to be transfered, integer between 0 and 100; default is 80; requires at least one of the following options : --ann, --aln, --var")
    genome.add_argument('-ht','--haplotype',action="store_true",help="use if there are several haplotypes in the graph; default is false")
    general.add_argument('-V','--version', action='version', version='GrAnnoT 1.0.1')

    general.add_argument('-h','--help', action="help", help="show this help message and exit")

    if len(sys.argv)==1:
        print('Annotation transfer between genome and pangenome graph.',file=sys.stderr)
        parser.print_usage(sys.stderr)
        print('Type \'grannot --help\' for more help', file=sys.stderr)
        sys.exit(1)

    return parser.parse_args()

def read_args(args):

    # check the arguments
    if not (args.graph_gaf or args.graph_gff or args.annotation or args.variation or args.alignment): # todo : add pav to this list later
        sys.exit(f"grannot: error: no operation option was given; please include at least one of the following options : -gff -gaf -ann -aln -var")

    if (args.source_haplotype) and (not args.haplotype):
        sys.exit(f"grannot: error: the option -ht/--haplotype is required for the option -sh/--source_haplotype")
    if (args.pav_matrix) and (not args.annotation):
        sys.exit(f"grannot: error: the option -ann/--annotation is required for the option -pav/--pav_matrix")

    if (args.coverage) and (not (args.annotation or args.variation or args.alignment)):
        sys.exit(f"grannot: error: one of the following options is required for the option -cov/--coverage : -ann/--annotation, -aln/--alignment, -var/--variation")
    if (args.identity) and (not (args.annotation or args.variation or args.alignment)):
        sys.exit(f"grannot: error: one of the following options is required for the option -id/--identity : -ann/--annotation, -aln/--alignment, -var/--variation")
    # todo : adapt pav to have the same requirements as cov and id, or no requirement at all. change in grannot_help.md

    # check bedtools installation
    bedtools_test=subprocess.run("bedtools --version",shell=True,check=False,stdout=subprocess.DEVNULL,stderr=sys.stderr)
    if int(bedtools_test.returncode)>0:
        raise SystemExit('bedtools not installed')

    # create outdir
    args.outdir=Path(args.outdir).resolve()
    args.outdir.mkdir(exist_ok=True)

    # set default values
    if args.identity==None:
        args.identity=80
    if args.coverage==None:
        args.coverage=80

    # convert input files in Path objects
    args.outdir=Path(args.outdir).resolve() # resolve() finds the absolute path, and adds the '/' if needed
    seg_coord_path=Path(args.segment_coordinates_path).resolve()
    args.graph=Path(args.graph.name).resolve()
    args.gff=Path(args.gff.name).resolve()

    # run getSegCoord on the target genomes + the source genome if needed
    if not(args.annotation or args.variation or args.alignment or args.pav_matrix): # if we only need the graph annotation, no need to get SegCoord for all the genomes
        path_list=[]
        transfer=False
    else:
        transfer=True
        path_list=args.target[:]
    path_list.append(args.source_genome)
    if args.segment_coordinates_path==".": # todo : if seg_coord is given, check that all the files are there (bed for all walks + seg + walks)
        print("Computing the segments coordinates on the genomes")
        seg_coord_path=args.outdir.joinpath("seg_coord")
        seg_coord_path.mkdir(exist_ok=True) # create dir to store bed files
        seg_coord(args.graph.as_posix(),path_list,seg_coord_path,args.verbose,transfer,args.haplotype)
    args.segment_coordinates_path=seg_coord_path

    # intersect between gff and all the bed files from the source genome given by seg_coord
    run_intersect(args,seg_coord_path)

# get the genome name that corresponds to the file_name (file_name if genome_name+walk_name). not universal..?
def get_genome_name(target_genomes,file_name):
    for genome in target_genomes:
        if genome in file_name:
            return genome
    return ""

def run_intersect(args,seg_coord_path):
    print("Computing the intersection between the annotations and the graph segments")

    if args.haplotype:
        if args.source_haplotype=='.':
            files=list(seg_coord_path.glob(f"*{args.source_genome}*.bed")) # get all bed files from the source genome in seg_coord
            # select haplotype
            source_haplotypes=[]
            for file in files:
                file_haplotype=file.name.split("#")[1]
                source_haplotypes.append(file_haplotype)
            first_source_haplo=min(source_haplotypes)
            args.source_haplotype=first_source_haplo
            haplo=f"#{first_source_haplo}#"
            files=list(seg_coord_path.glob(f"*{args.source_genome}*{haplo}*.bed")) # get all bed files from the source genome haplotype in seg_coord
            message=f'No bed files corresponding to the source genome haplotype {first_source_haplo} found in {seg_coord_path}'
        else:
            haplo = f"#{args.source_haplotype}#"
            files=list(seg_coord_path.glob(f"*{args.source_genome}*{haplo}*.bed")) # get all bed files from the source genome in seg_coord
            message=f'No bed files corresponding to the source genome haplotype found in {seg_coord_path}'
        if len(files)==0:
            sys.exit(message)
        if args.verbose:
            print(f'     Found {len(files)} paths corresponding to the source genome haplotype {args.source_haplotype}')
    else:
        files=list(seg_coord_path.glob(f"*{args.source_genome}*.bed")) # get all bed files from the source genome in seg_coord
        message=f'No bed files corresponding to the source genome found in {seg_coord_path}'
        if len(files)==0:
            sys.exit(message)
        if args.verbose:
            print(f'     Found {len(files)} paths corresponding to the source genome')

    intersect_path=args.outdir.joinpath("intersect")
    command=f"echo -n '' > {intersect_path}" # empty the file "intersect"
    subprocess.run(command,shell=True,timeout=None)
    for file in files:
        if args.verbose:
            print(f'          Building the intersection for the path {file.stem}')
        command=f"bedtools intersect -nonamecheck -wo -a {file.as_posix()} -b {args.gff.as_posix()}>>{intersect_path}"