Source code for screenpro.ngs.cas9

from time import time
import pandas as pd
import polars as pl
import biobear as bb


[docs]def fastq_to_count_single_guide( fastq_file_path:str, trim5p_start:int=None, trim5p_length:int=None, verbose: bool=False) -> pl.DataFrame: """ Count the occurrences of unique sequences in single-end FASTQ files to a DataFrame containing counts of unique sequences. e.g. single-guide design R1: protospacer Args: fastq_file_path (str): The path to the FASTQ file. trim5p_start (int, optional): The starting position for trimming the 5' end of the sequences. Defaults to None. trim5p_length (int, optional): The length of the trimmed sequences. Defaults to None. verbose (bool, optional): Whether to print verbose output. Defaults to False. Returns: pl.DataFrame: A DataFrame containing the unique sequences and their respective counts. """ if verbose: ('count unique sequences ...') t0 = time() session = bb.connect() if trim5p_start and trim5p_length: sql_cmd = f""" SELECT substr(f.sequence, {trim5p_start}, {trim5p_length}) AS protospacer, COUNT(*) as count FROM fastq_scan('{fastq_file_path}') f GROUP BY protospacer """ else: sql_cmd = f""" SELECT f.sequence AS protospacer, COUNT(*) as count FROM fastq_scan('{fastq_file_path}') f GROUP BY protospacer """ df_count = session.sql(sql_cmd).to_polars() if verbose: print("done in %0.3fs" % (time() - t0)) return df_count
[docs]def fastq_to_count_dual_guide( R1_fastq_file_path:str, R2_fastq_file_path:str, trim5p_pos1_start:int=None, trim5p_pos1_length:int=None, trim5p_pos2_start:int=None, trim5p_pos2_length:int=None, verbose: bool=False) -> pl.DataFrame: """ Count the occurrences of unique sequences in paired-end FASTQ files to a DataFrame containing counts of unique pairs of sequences. e.g. dual-guide design R1: protospacer_A, R2: protospacer_B Args: R1_fastq_file_path (str): File path of the R1 FASTQ file. R2_fastq_file_path (str): File path of the R2 FASTQ file. trim5p_pos1_start (int, optional): Start position for trimming the 5' end of the R1 sequences. Defaults to None. trim5p_pos1_length (int, optional): Length of the trimmed R1 sequences. Defaults to None. trim5p_pos2_start (int, optional): Start position for trimming the 5' end of the R2 sequences. Defaults to None. trim5p_pos2_length (int, optional): Length of the trimmed R2 sequences. Defaults to None. verbose (bool, optional): Whether to print verbose output. Defaults to False. Returns: pl.DataFrame: DataFrame containing counts of unique sequences with columns 'protospacer_A', 'protospacer_B', and 'count'. Raises: ValueError: If trim5p_pos1_start, trim5p_pos1_length, trim5p_pos2_start, and trim5p_pos2_length are not provided concurrently. """ if verbose: ('count unique sequences ...') t0 = time() session = bb.connect() if trim5p_pos1_start and trim5p_pos1_length and trim5p_pos2_start and trim5p_pos2_length: sql_cmd = f""" WITH pos1 AS ( SELECT REPLACE(name, '_R1', '') trimmed_name, * FROM fastq_scan('{R1_fastq_file_path}') ), pos2 AS ( SELECT REPLACE(name, '_R2', '') trimmed_name, * FROM fastq_scan('{R2_fastq_file_path}') ) SELECT substr(pos1.sequence, {trim5p_pos1_start}, {trim5p_pos1_length}) protospacer_A, reverse_complement(substr(pos2.sequence, {trim5p_pos2_start}, {trim5p_pos2_length})) protospacer_B, COUNT(*) count FROM pos1 JOIN pos2 ON pos1.name = pos2.name GROUP BY protospacer_A, protospacer_B """ elif trim5p_pos1_start==None and trim5p_pos1_length==None and trim5p_pos2_start==None and trim5p_pos2_length==None: sql_cmd = f""" WITH pos1 AS ( SELECT REPLACE(name, '_R1', '') trimmed_name, * FROM fastq_scan('{R1_fastq_file_path}') ), pos2 AS ( SELECT REPLACE(name, '_R2', '') trimmed_name, * FROM fastq_scan('{R2_fastq_file_path}') ) SELECT pos1.sequence protospacer_A, reverse_complement(pos2.sequence) protospacer_B, COUNT(*) count FROM pos1 JOIN pos2 ON pos1.name = pos2.name GROUP BY protospacer_A, protospacer_B """ else: raise ValueError("trim5p_pos1_start, trim5p_pos1_length, \ trim5p_pos2_start, and trim5p_pos2_length \ must be provided concurrently!") df_count = session.sql(sql_cmd).to_polars() if verbose: print("done in %0.3fs" % (time() - t0)) return df_count
[docs]def map_to_library_single_guide(df_count, library, return_type='all', verbose=False): """ Map the counts of unique sequences to a library DataFrame containing sgRNA sequences. User can choose to return mapped reads, unmapped reads, or both. Args: df_count (pandas.DataFrame): The input DataFrame containing counts. library (pandas.DataFrame): The library DataFrame to map to. return_type (str, optional): The type of result to return. Defaults to 'all'. verbose (bool, optional): Whether to print verbose information. Defaults to False. Returns: dict or pandas.DataFrame: The mapped result based on the return_type parameter. Raises: ValueError: If the return_type parameter is invalid. """ # get counts for given input res = df_count.clone() #cheap deepcopy/clone res = res.sort('count', descending=True) res = res.with_columns( pl.col("protospacer").alias("sequence"), ) res_map = pl.DataFrame(library).join( res, on="sequence", how="left" ) if return_type == 'unmapped' or return_type == 'all': res_unmap = res.join( pl.DataFrame(library), on="sequence", how="anti" ) if verbose: print("% mapped reads", 100 * \ res_map.to_pandas()['count'].fillna(0).sum() / \ int(res.select(pl.sum("count")).to_pandas()['count']) ) if return_type == 'unmapped': return res_unmap elif return_type == 'mapped': return res_map elif return_type == 'all': return {'full': res, 'mapped': res_map, 'unmapped': res_unmap} else: raise ValueError("return_type must be either 'unmapped', 'mapped', or 'all'")
[docs]def map_to_library_dual_guide(df_count, library, get_recombinant=False, return_type='all', verbose=False): """ Map the counts of unique sequences to a library DataFrame containing dual-guide sgRNA sequences. Optionally, the function can capture recombinant events. User can choose to return mapped reads, unmapped reads, recombinant events, or all. Args: df_count (pandas.DataFrame): The input DataFrame containing the counts. library (pandas.DataFrame): The library of sequences to map against. get_recombinant (bool, optional): Whether to calculate recombinant events. Defaults to False. return_type (str, optional): The type of reads to return. Can be 'unmapped', 'mapped', 'recombinant', or 'all'. Defaults to 'all'. verbose (bool, optional): Whether to print verbose output. Defaults to False. Returns: pandas.DataFrame or dict: The mapped reads based on the specified return_type. Raises: ValueError: If return_type is not one of 'unmapped', 'mapped', 'recombinant', or 'all'. ValueError: If get_recombinant is False and return_type is 'recombinant'. """ # get counts for given input res = df_count.clone() #cheap deepcopy/clone res = res.rename( {'protospacer_a':'protospacer_A','protospacer_b':'protospacer_B'} ) res = res.sort('count', descending=True) res = res.with_columns( pl.concat_str( [ pl.col("protospacer_A"), pl.col("protospacer_B") ], separator=";", ).alias("sequence"), ) # map to library res_map = pl.DataFrame(library).join( res, on="sequence", how="left" ) # get unmapped reads to the library if get_recombinant or return_type == 'unmapped' or return_type == 'all': res_unmap = res.join( pl.DataFrame(library), on="sequence", how="anti" ) if verbose: print("% mapped reads", 100 * \ res_map.to_pandas()['count'].fillna(0).sum() / \ int(res.select(pl.sum("count")).to_pandas()['count']) ) if get_recombinant: if verbose: print("% unmapped reads", 100 * \ res_unmap.to_pandas()['count'].fillna(0).sum() / \ int(res.select(pl.sum("count")).to_pandas()['count']) ) sgRNA_table = pd.concat([ library.to_pandas()[['sgID_A','protospacer_A']].rename(columns={'sgID_A':'sgID','protospacer_A':'protospacer'}), library.to_pandas()[['sgID_B', 'protospacer_B']].rename(columns={'sgID_B':'sgID','protospacer_B':'protospacer'}) ]).drop_duplicates(keep='first') res_unmap_remapped_a = res_unmap.join( pl.DataFrame(sgRNA_table.rename( columns={'protospacer':'protospacer_A','sgID':'sgID_A'})[['sgID_A','protospacer_A']]), on=["protospacer_A"], how="left" ) res_recomb_events = res_unmap_remapped_a.join( pl.DataFrame(sgRNA_table.rename( columns={'protospacer':'protospacer_B','sgID':'sgID_B'})[['sgID_B','protospacer_B']]), on=["protospacer_B"], how="left" ) if verbose: print("% fully remapped recombination events", 100 * \ res_recomb_events.drop_nulls().to_pandas()['count'].fillna(0).sum() / \ int(res.select(pl.sum("count")).to_pandas()['count']) ) if return_type == 'unmapped': # TODO: add option to return only unmapped reads after mapping recombinant events return res_unmap elif return_type == 'mapped': return res_map elif return_type == 'recombinant': if get_recombinant: return res_recomb_events else: raise ValueError("get_recombinant must be set to True to calculate recombinant events") elif return_type == 'all': if get_recombinant: return {'full': res,'mapped': res_map,'recombinant': res_recomb_events, 'unmapped': res_unmap} else: return {'full': res,'mapped': res_map, 'unmapped': res_unmap} else: raise ValueError("return_type must be either 'unmapped', 'mapped', 'recombinant', or 'all'")