Source code for screenpro.ngs

## Copyright (c) 2022-2024 ScreenPro2 Development Team.
## All rights reserved.
## Gilbart Lab, UCSF / Arc Institute.
## Multi-Omics Tech Center, Arc Insititue.

## This part of the software is conceptualized and developed by Abolfazl Arab (@abearab)
## with support from the Nick Youngblut (@nick-youngblut).

'''Scripts to work with NGS data

This module provides functions to process FASTQ files from screens with single or dual guide
libraries. In general, the algorithm is fairly simple:

1. Read the FASTQ file and extract the proper sequences
2. Count the exact number of occurrences for each unique sequence
3. Map the counted sequences to the reference sequence library
4. Return the counted mapped or unmapped events as a dataframe(s)

For single-guide screens, the sequences are counted as single protospacer
from a single-end read file (R1). Then, these sequences are mapped to the reference
library of protospacer sequences.

For dual-guide screens, the sequences are counted as pairs of protospacer A and B
from paired-end read files (R1 and R2). Then, sequences are mapped to the reference
library of protospacer A and B pairs.

Theoretically, the algorithm is able to detect any observed sequence since it is counting first
and then mapping. Therefore, the recombination events can be detected. In dual-guide design
protospacer A and B are not the same pairs as in the reference library. These events include:

- Protospacer A and B pairs are present in the reference library but paired differently
- Only one of the protospacer A and B is present in the reference library
- None of the protospacer A and B is present in the reference library
'''

import pandas as pd
import polars as pl
import anndata as ad
import os

from . import cas9
from . import cas12
from ..load import load_cas9_sgRNA_library
from simple_colors import green


[docs]class GuideCounter: '''Class to count sequences from FASTQ files ''' def __init__(self, cas_type, library_type): self.cas_type = cas_type self.library_type = library_type self.counts_dict = None self.counts_mat = None self.recombinants = None
[docs] def load_library(self, library_path, sep='\t', index_col=0, protospacer_length=19, verbose=False, **args): '''Load library file ''' if self.cas_type == 'cas9': library = load_cas9_sgRNA_library(library_path, library_type=self.library_type, sep=sep, index_col=index_col, protospacer_length=protospacer_length, verbose=verbose, **args) # Check if the library has duplicate sequences and remove them if library.duplicated('sequence').any(): shape_before_dedup = library.shape[0] library = library.drop_duplicates(subset='sequence', keep='first') shape_after_dedup = library.shape[0] if verbose: print(f"Warning: {shape_before_dedup - shape_after_dedup} duplicate sgRNA sequences found and removed.") if self.library_type == "single_guide_design": sgRNA_table = library[['target','sgID','protospacer']].set_index('sgID') elif self.library_type == "dual_guide_design": sgRNA_table = pd.concat([ library[['target','sgID_A','protospacer_A']].rename(columns={'sgID_A':'sgID','protospacer_A':'protospacer'}), library[['target','sgID_B', 'protospacer_B']].rename(columns={'sgID_B':'sgID','protospacer_B':'protospacer'}) ]) # drop duplicates and set index sgRNA_table = sgRNA_table.drop_duplicates(keep='first') if verbose: print('total # of cas9 sgRNAs:', sgRNA_table.shape[0]) elif self.cas_type == 'cas12': raise NotImplementedError("Cas12 library is not yet implemented.") # covert to polar DataFrame library = pl.from_pandas(library) sgRNA_table = pl.from_pandas(sgRNA_table) self.library = library self.sgRNA_table = sgRNA_table
def _process_cas9_single_guide_sample(self, fastq_dir, sample_id, trim_first_g, protospacer_length, write, verbose=False): if verbose: print(green(sample_id, ['bold'])) get_counts = True # check if df_count is already available if os.path.exists(f'{fastq_dir}/{sample_id}_count.arrow'): if verbose: print('count file exists ...') if write != "force": df_count = pl.read_ipc_stream(f'{fastq_dir}/{sample_id}_count.arrow') get_counts = False else: if verbose: print('skip loading count file, force write is set ...') if get_counts: if trim_first_g: trim5p_start = 2 else: trim5p_start = 1 df_count = cas9.fastq_to_count_single_guide( fastq_file_path=f'{fastq_dir}/{sample_id}.fastq.gz', trim5p_start=trim5p_start, trim5p_length=protospacer_length, verbose=verbose ) if write == "force" or write == True: # write df_count to file df_count.write_ipc_stream(f'{fastq_dir}/{sample_id}_count.arrow', compression='lz4') if verbose: print('count file written ...') out = cas9.map_to_library_single_guide( df_count=df_count, library=self.library, return_type='all', verbose=verbose ) return out def _process_cas9_dual_guide_sample(self, fastq_dir, sample_id, get_recombinant, trim_first_g, protospacer_A_length, protospacer_B_length, write, verbose=False): if verbose: print(green(sample_id, ['bold'])) get_counts = True # check if df_count is already available if os.path.exists(f'{fastq_dir}/{sample_id}_count.arrow'): if verbose: print('count file exists ...') if write != "force": df_count = pl.read_ipc_stream(f'{fastq_dir}/{sample_id}_count.arrow') get_counts = False else: if verbose: print('skip loading count file, force write is set ...') if get_counts: if get_counts: if trim_first_g == True or trim_first_g == {'A':True, 'B':True}: trim5p_pos1_start = 2 trim5p_pos2_start = 2 elif trim_first_g == False or trim_first_g == {'A':False, 'B':False}: trim5p_pos1_start = 1 trim5p_pos2_start = 1 elif trim_first_g == {'A':True, 'B':False}: trim5p_pos1_start = 2 trim5p_pos2_start = 1 elif trim_first_g == {'A':False, 'B':True}: trim5p_pos1_start = 1 trim5p_pos2_start = 2 else: raise ValueError("Invalid trim_first_g argument. Please provide a boolean or a dictionary with 'A' and 'B' keys.") df_count = cas9.fastq_to_count_dual_guide( R1_fastq_file_path=f'{fastq_dir}/{sample_id}_R1.fastq.gz', R2_fastq_file_path=f'{fastq_dir}/{sample_id}_R2.fastq.gz', trim5p_pos1_start=trim5p_pos1_start, trim5p_pos1_length=protospacer_A_length, trim5p_pos2_start=trim5p_pos2_start, trim5p_pos2_length=protospacer_B_length, verbose=verbose ) if write == "force" or write == True: # write df_count to file df_count.write_ipc_stream(f'{fastq_dir}/{sample_id}_count.arrow', compression='lz4') if verbose: print('count file written ...') out = cas9.map_to_library_dual_guide( df_count=df_count, library=self.library, get_recombinant=get_recombinant, return_type='all', verbose=verbose ) return out
[docs] def get_counts_matrix(self, fastq_dir, samples, get_recombinant=False, cas_type='cas9', protospacer_length='auto', trim_first_g=False, write=True, verbose=False): '''Get count matrix for given samples ''' if self.cas_type == 'cas9': counts = {} if self.library_type == "single_guide_design": if get_recombinant: raise ValueError("Recombinants are not applicable for single guide design!") if protospacer_length == 'auto': protospacer_length = self.library['protospacer'].str.len_bytes().unique().to_list()[0] for sample_id in samples: cnt = self._process_cas9_single_guide_sample( fastq_dir=fastq_dir, sample_id=sample_id, trim_first_g=trim_first_g, protospacer_length=protospacer_length, write=write, verbose=verbose ) counts[sample_id] = cnt['mapped'] counts_mat = pd.concat([ counts[sample_id].to_pandas().set_index('sgID')['count'].rename(sample_id) for sample_id in counts.keys() ],axis=1).fillna(0) elif self.library_type == "dual_guide_design": if get_recombinant: recombinants = {} if protospacer_length == 'auto': protospacer_A_length = self.library['protospacer_A'].str.len_bytes().unique().to_list()[0] protospacer_B_length = self.library['protospacer_B'].str.len_bytes().unique().to_list()[0] elif isinstance(protospacer_length, dict): protospacer_A_length = protospacer_length['protospacer_A'] protospacer_B_length = protospacer_length['protospacer_B'] elif isinstance(protospacer_length, int): protospacer_A_length = protospacer_length protospacer_B_length = protospacer_length else: raise ValueError("Invalid protospacer_length argument. If not 'auto', please provide an integer or a dictionary with 'protospacer_A' and 'protospacer_B' keys.") for sample_id in samples: cnt = self._process_cas9_dual_guide_sample( fastq_dir=fastq_dir, sample_id=sample_id, get_recombinant=get_recombinant, trim_first_g=trim_first_g, protospacer_A_length=protospacer_A_length, protospacer_B_length=protospacer_B_length, write=write, verbose=verbose ) counts[sample_id] = cnt['mapped'] if get_recombinant: recombinants[sample_id] = cnt['recombinant'] counts_mat = pd.concat([ counts[sample_id].to_pandas().set_index('sgID_AB')['count'].rename(sample_id) for sample_id in counts.keys() ],axis=1).fillna(0) else: raise ValueError("Invalid library type. Please choose from 'single_guide_design' or 'dual_guide_design'.") if cas_type == 'cas12': # TODO: Implement codes to build count matrix for given samples raise NotImplementedError("Cas12 count matrix is not yet implemented.") self.counts_dict = counts self.counts_mat = counts_mat if get_recombinant: self.recombinants = recombinants
[docs] def load_counts_matrix(self, counts_mat_path, **kwargs): '''Load count matrix from file ''' self.counts_mat = pd.read_csv(counts_mat_path, **kwargs)
def _build_cas9_dual_guide_var_table(self, counts_table, source, ctrl_label='negative_control'): '''Build variant table for dual guide design Args: counts_table (pd.DataFrame): count table for dual guide design (e.g. main library mapped counts or recombinant counts) ''' if source=='library': var_table = pd.DataFrame( counts_table.index.str.split('|').to_list(), index = counts_table.index.to_list(), columns=['sgID_A','sgID_B'] ) elif source=='recombinant': var_table = pd.DataFrame( counts_table.index.to_list(), index = ['|'.join(i) for i in counts_table.index.to_list()], columns=['sgID_A','sgID_B'] ) var_table.index.name = 'sgID_AB' sgRNA_table = self.sgRNA_table.to_pandas().set_index('sgID') #TODO: extract "target" values from protospacer IDs var_table = pd.concat([ var_table.reset_index().reset_index(drop=True), sgRNA_table.loc[var_table['sgID_A']].rename(columns={'target':'target_A', 'protospacer':'protospacer_A'}).reset_index(drop=True), sgRNA_table.loc[var_table['sgID_B']].rename(columns={'target':'target_B', 'protospacer':'protospacer_B'}).reset_index(drop=True), ], axis=1).set_index('sgID_AB') var_table['targetType'] = '' var_table['target'] = '' ### assign target types: negative_control control_targets = (var_table.target_A.eq(ctrl_label)) & (var_table.target_B.eq(ctrl_label)) var_table.loc[control_targets,'targetType'] = 'negative_control' var_table.loc[control_targets,'target'] = ctrl_label ### assign target types: gene same_gene_targets = (var_table.target_A == var_table.target_B) & ~(var_table.target_A.eq(ctrl_label)) & ~(var_table.target_B.eq(ctrl_label)) var_table.loc[same_gene_targets,'targetType'] = 'gene' var_table.loc[same_gene_targets,'target'] = var_table.target_A # or target_B ### assign target types: gene-negative_control gene_control_targets = ~(var_table.target_A.eq(ctrl_label)) & (var_table.target_B.eq(ctrl_label)) var_table.loc[gene_control_targets,'targetType'] = 'gene--negative_control' var_table.loc[gene_control_targets,'target'] = var_table.target_A + '|' + var_table.target_B ### assign target types: negative_control-gene control_gene_targets = (var_table.target_A.eq(ctrl_label)) & ~(var_table.target_B.eq(ctrl_label)) var_table.loc[control_gene_targets,'targetType'] = 'negative_control--negative_control' var_table.loc[control_gene_targets,'target'] = var_table.target_A + '|' + var_table.target_B ### assign target types: gene-gene gene_gene_targets = (var_table.target_A != var_table.target_B) & ~(var_table.target_A.eq(ctrl_label)) & ~(var_table.target_B.eq(ctrl_label)) var_table.loc[gene_gene_targets,'targetType'] = 'gene--gene' var_table.loc[gene_gene_targets,'target'] = var_table.target_A + '|' + var_table.target_B var_table.index.name = None var_table.targetType = pd.Categorical( var_table.targetType, categories=[ 'gene','gene--gene', 'gene--negative_control','negative_control--gene', 'negative_control' ] ).remove_unused_categories() var_table['sequence'] = var_table['protospacer_A'] + ';' + var_table['protospacer_B'] return var_table
[docs] def build_counts_anndata(self, source='library', verbose=False): '''Build AnnData object from count matrix ''' if source == 'recombinant' and self.library_type == "single_guide_design": raise ValueError("Recombinants are not applicable for single guide design!") if source == 'recombinant' and self.recombinants is None: raise ValueError("Recombinants are not available. If applicable, please set get_recombinant=True in get_counts_matrix method.") if self.library_type == "single_guide_design": adata = ad.AnnData( X = self.counts_mat.T, var = self.library.to_pandas().set_index('sgID') ) elif self.library_type == "dual_guide_design": adata = ad.AnnData( X = self.counts_mat.T, var = self._build_cas9_dual_guide_var_table(self.counts_mat, source='library') ) if source == 'recombinant': counts_recombinants = {} for sample in self.recombinants.keys(): if verbose: print(green(sample, ['bold'])) d = self.recombinants[sample].drop_nulls() d = d.to_pandas() counts_recombinants[sample] = d.set_index(['sgID_A','sgID_B'])['count'] if verbose: print('recombinant count added ...') counts_recombinants = pd.concat(counts_recombinants,axis=1).fillna(0) if verbose: print('recombinant count matrix built ...') var_table = self._build_cas9_dual_guide_var_table(counts_recombinants, source='recombinant') rdata = ad.AnnData( X = counts_recombinants.T.to_numpy(), var = var_table, obs = adata.obs ) if verbose: print('recombinant AnnData created.') if source == 'mapped' or source == 'library': return adata elif source == 'recombinant': return rdata else: raise ValueError("Invalid source argument. Please choose from 'mapped', 'recombinant' or 'library'. Note: 'mapped' and 'library' act the same way.")