# Copyright 2021 DeepMind Technologies Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Functions for getting templates and calculating template features."""
import dataclasses
import datetime
import glob
import os
import re
from typing import Any, Dict, Mapping, Optional, Sequence, Tuple

from absl import logging
from alphafold.common import residue_constants
from alphafold.data import mmcif_parsing
from alphafold.data import parsers
from alphafold.data.tools import kalign
import numpy as np

# Internal import (7716).


class Error(Exception):
  """Base class for exceptions."""


class NoChainsError(Error):
  """An error indicating that template mmCIF didn't have any chains."""


class SequenceNotInTemplateError(Error):
  """An error indicating that template mmCIF didn't contain the sequence."""


class NoAtomDataInTemplateError(Error):
  """An error indicating that template mmCIF didn't contain atom positions."""


class TemplateAtomMaskAllZerosError(Error):
  """An error indicating that template mmCIF had all atom positions masked."""


class QueryToTemplateAlignError(Error):
  """An error indicating that the query can't be aligned to the template."""


class CaDistanceError(Error):
  """An error indicating that a CA atom distance exceeds a threshold."""


class MultipleChainsError(Error):
  """An error indicating that multiple chains were found for a given ID."""


# Prefilter exceptions.
class PrefilterError(Exception):
  """A base class for template prefilter exceptions."""


class DateError(PrefilterError):
  """An error indicating that the hit date was after the max allowed date."""


class PdbIdError(PrefilterError):
  """An error indicating that the hit PDB ID was identical to the query."""


class AlignRatioError(PrefilterError):
  """An error indicating that the hit align ratio to the query was too small."""


class DuplicateError(PrefilterError):
  """An error indicating that the hit was an exact subsequence of the query."""


class LengthError(PrefilterError):
  """An error indicating that the hit was too short."""


TEMPLATE_FEATURES = {
    'template_aatype': np.float32,
    'template_all_atom_masks': np.float32,
    'template_all_atom_positions': np.float32,
    # String features use the builtin `object` dtype (the deprecated np.object
    # alias was removed in recent NumPy versions).
    'template_domain_names': object,
    'template_sequence': object,
    'template_sum_probs': np.float32,
}


def _get_pdb_id_and_chain(hit: parsers.TemplateHit) -> Tuple[str, str]:
  """Returns PDB id and chain id for an HHSearch Hit."""
  # PDB ID: 4 letters. Chain ID: 1+ alphanumeric letters or "." if unknown.
  id_match = re.match(r'[a-zA-Z\d]{4}_[a-zA-Z0-9.]+', hit.name)
  if not id_match:
    raise ValueError(f'hit.name did not start with PDBID_chain: {hit.name}')
  pdb_id, chain_id = id_match.group(0).split('_')
  return pdb_id.lower(), chain_id
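
# Illustrative behaviour (hypothetical hit name): a hit named '4ggb_A' yields
# ('4ggb', 'A'), while a hit name that does not start with a PDBID_chain
# prefix raises ValueError.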


def _is_after_cutoff(
    pdb_id: str,
    release_dates: Mapping[str, datetime.datetime],
    release_date_cutoff: Optional[datetime.datetime]) -> bool:
  """Checks if the template date is after the release date cutoff.

  Args:
    pdb_id: 4 letter pdb code.
    release_dates: Dictionary mapping PDB ids to their structure release dates.
    release_date_cutoff: Max release date that is valid for this query.

  Returns:
    True if the template release date is after the cutoff, False otherwise.
  """
  if release_date_cutoff is None:
    raise ValueError('The release_date_cutoff must not be None.')
  if pdb_id in release_dates:
    return release_dates[pdb_id] > release_date_cutoff
  else:
    # Since this is just a quick prefilter to reduce the number of mmCIF files
    # we need to parse, we don't have to worry about returning True here.
    logging.warning('Template structure not in release dates dict: %s', pdb_id)
    return False


def _parse_obsolete(obsolete_file_path: str) -> Mapping[str, str]:
  """Parses the data file from PDB that lists which PDB ids are obsolete."""
  with open(obsolete_file_path) as f:
    result = {}
    for line in f:
      line = line.strip()
      # We skip obsolete entries that don't contain a mapping to a new entry.
      if line.startswith('OBSLTE') and len(line) > 30:
        # Format:    Date      From     To
        # 'OBSLTE    31-JUL-94 116L     216L'
        from_id = line[20:24].lower()
        to_id = line[29:33].lower()
        result[from_id] = to_id
    return result


def _parse_release_dates(path: str) -> Mapping[str, datetime.datetime]:
  """Parses release dates file, returns a mapping from PDBs to release dates."""
  if path.endswith('txt'):
    release_dates = {}
    with open(path, 'r') as f:
      for line in f:
        pdb_id, date = line.split(':')
        date = date.strip()
        # Python 3.6 doesn't have datetime.date.fromisoformat() which is about
        # 90x faster than strptime. However, splitting the string manually is
        # about 10x faster than strptime.
        release_dates[pdb_id.strip()] = datetime.datetime(
            year=int(date[:4]), month=int(date[5:7]), day=int(date[8:10]))
    return release_dates
  else:
    raise ValueError('Invalid format of the release date file %s.' % path)
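
# Expected release-dates text format (hypothetical entry): each line is
# '<pdb_id>:<YYYY-MM-DD>', e.g. '100d:1994-12-20', which parses to
# datetime.datetime(1994, 12, 20).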


def _assess_hhsearch_hit(
    hit: parsers.TemplateHit,
    hit_pdb_code: str,
    query_sequence: str,
    query_pdb_code: Optional[str],
    release_dates: Mapping[str, datetime.datetime],
    release_date_cutoff: datetime.datetime,
    max_subsequence_ratio: float = 0.95,
    min_align_ratio: float = 0.1) -> bool:
  """Determines if template is valid (without parsing the template mmcif file).

  Args:
    hit: HhrHit for the template.
    hit_pdb_code: The 4 letter pdb code of the template hit. This might be
      different from the value in the actual hit since the original pdb might
      have become obsolete.
    query_sequence: Amino acid sequence of the query.
    query_pdb_code: 4 letter pdb code of the query.
    release_dates: Dictionary mapping pdb codes to their structure release
      dates.
    release_date_cutoff: Max release date that is valid for this query.
    max_subsequence_ratio: Exclude any exact matches with this much overlap.
    min_align_ratio: Minimum overlap between the template and query.

  Returns:
    True if the hit passed the prefilter. Raises an exception otherwise.

  Raises:
    DateError: If the hit date was after the max allowed date.
    PdbIdError: If the hit PDB ID was identical to the query.
    AlignRatioError: If the hit align ratio to the query was too small.
    DuplicateError: If the hit was an exact subsequence of the query.
    LengthError: If the hit was too short.
  """
  aligned_cols = hit.aligned_cols
  align_ratio = aligned_cols / len(query_sequence)

  template_sequence = hit.hit_sequence.replace('-', '')
  length_ratio = float(len(template_sequence)) / len(query_sequence)

  # Check whether the template is a large subsequence or duplicate of original
  # query. This can happen due to duplicate entries in the PDB database.
  duplicate = (template_sequence in query_sequence and
               length_ratio > max_subsequence_ratio)

  if _is_after_cutoff(hit_pdb_code, release_dates, release_date_cutoff):
    raise DateError(f'Date ({release_dates[hit_pdb_code]}) > max template date '
                    f'({release_date_cutoff}).')

  if query_pdb_code is not None:
    if query_pdb_code.lower() == hit_pdb_code.lower():
      raise PdbIdError('PDB code identical to Query PDB code.')

  if align_ratio <= min_align_ratio:
    raise AlignRatioError('Proportion of residues aligned to query too small. '
                          f'Align ratio: {align_ratio}.')

  if duplicate:
    raise DuplicateError('Template is an exact subsequence of query with large '
                         f'coverage. Length ratio: {length_ratio}.')

  if len(template_sequence) < 10:
    raise LengthError(f'Template too short. Length: {len(template_sequence)}.')

  return True
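
# Illustrative prefilter arithmetic (hypothetical numbers): for a 200-residue
# query, a hit with 15 aligned columns has align_ratio 0.075 <= 0.1 and raises
# AlignRatioError, while a 198-residue hit whose sequence is an exact
# subsequence of the query has length_ratio 0.99 > 0.95 and raises
# DuplicateError.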


def _find_template_in_pdb(
    template_chain_id: str,
    template_sequence: str,
    mmcif_object: mmcif_parsing.MmcifObject) -> Tuple[str, str, int]:
  """Tries to find the template chain in the given pdb file.

  This method tries the three following things in order:
    1. Tries if there is an exact match in both the chain ID and the sequence.
       If yes, the chain sequence is returned. Otherwise:
    2. Tries if there is an exact match only in the sequence.
       If yes, the chain sequence is returned. Otherwise:
    3. Tries if there is a fuzzy match (X = wildcard) in the sequence.
       If yes, the chain sequence is returned.
  If none of these succeed, a SequenceNotInTemplateError is thrown.

  Args:
    template_chain_id: The template chain ID.
    template_sequence: The template chain sequence.
    mmcif_object: The PDB object to search for the template in.

  Returns:
    A tuple with:
    * The chain sequence that was found to match the template in the PDB object.
    * The ID of the chain that is being returned.
    * The offset where the template sequence starts in the chain sequence.

  Raises:
    SequenceNotInTemplateError: If no match is found after the steps described
      above.
  """
  # Try if there is an exact match in both the chain ID and the (sub)sequence.
  pdb_id = mmcif_object.file_id
  chain_sequence = mmcif_object.chain_to_seqres.get(template_chain_id)
  if chain_sequence and (template_sequence in chain_sequence):
    logging.info(
        'Found an exact template match %s_%s.', pdb_id, template_chain_id)
    mapping_offset = chain_sequence.find(template_sequence)
    return chain_sequence, template_chain_id, mapping_offset

  # Try if there is an exact match in the (sub)sequence only.
  for chain_id, chain_sequence in mmcif_object.chain_to_seqres.items():
    if chain_sequence and (template_sequence in chain_sequence):
      logging.info('Found a sequence-only match %s_%s.', pdb_id, chain_id)
      mapping_offset = chain_sequence.find(template_sequence)
      return chain_sequence, chain_id, mapping_offset

  # Return a chain sequence that fuzzy matches (X = wildcard) the template.
  # Make parentheses unnamed groups (?:_) to avoid the 100 named groups limit.
  regex = ['.' if aa == 'X' else '(?:%s|X)' % aa for aa in template_sequence]
  regex = re.compile(''.join(regex))
  for chain_id, chain_sequence in mmcif_object.chain_to_seqres.items():
    match = re.search(regex, chain_sequence)
    if match:
      logging.info('Found a fuzzy sequence-only match %s_%s.', pdb_id, chain_id)
      mapping_offset = match.start()
      return chain_sequence, chain_id, mapping_offset

  # No hits, raise an error.
  raise SequenceNotInTemplateError(
      'Could not find the template sequence in %s_%s. Template sequence: %s, '
      'chain_to_seqres: %s' % (pdb_id, template_chain_id, template_sequence,
                               mmcif_object.chain_to_seqres))
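
# Sketch of the fuzzy-match pattern built above (hypothetical sequence): the
# template 'AXC' compiles to '(?:A|X).(?:C|X)', so an 'X' in the template
# matches any chain residue, and an 'X' in the chain matches any template
# residue.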


def _realign_pdb_template_to_query(
    old_template_sequence: str,
    template_chain_id: str,
    mmcif_object: mmcif_parsing.MmcifObject,
    old_mapping: Mapping[int, int],
    kalign_binary_path: str) -> Tuple[str, Mapping[int, int]]:
  """Aligns template from the mmcif_object to the query.

  In case PDB70 contains a different version of the template sequence, we need
  to perform a realignment to the actual sequence that is in the mmCIF file.
  This method performs such realignment, but returns the new sequence and
  mapping only if the sequence in the mmCIF file is 90% identical to the old
  sequence.

  Note that the old_template_sequence comes from the hit and contains only the
  part of the chain that matches the query, while the new_template_sequence is
  the full chain.

  Args:
    old_template_sequence: The template sequence that was returned by the PDB
      template search (typically done using HHSearch).
    template_chain_id: The template chain id that was returned by the PDB
      template search (typically done using HHSearch). This is used to find
      the right chain in the mmcif_object chain_to_seqres mapping.
    mmcif_object: A mmcif_object which holds the actual template data.
    old_mapping: A mapping from the query sequence to the template sequence.
      This mapping will be used to compute the new mapping from the query
      sequence to the actual mmcif_object template sequence by aligning the
      old_template_sequence and the actual template sequence.
    kalign_binary_path: The path to a kalign executable.

  Returns:
    A tuple (new_template_sequence, new_query_to_template_mapping) where:
    * new_template_sequence is the actual template sequence that was found in
      the mmcif_object.
    * new_query_to_template_mapping is the new mapping from the query to the
      actual template found in the mmcif_object.

  Raises:
    QueryToTemplateAlignError:
    * If there was an error thrown by the alignment tool.
    * Or if the actual template sequence differs by more than 10% from the
      old_template_sequence.
  """
  aligner = kalign.Kalign(binary_path=kalign_binary_path)
  new_template_sequence = mmcif_object.chain_to_seqres.get(
      template_chain_id, '')

  # Sometimes the template chain id is unknown. But if there is only a single
  # sequence within the mmcif_object, it is safe to assume it is that one.
  if not new_template_sequence:
    if len(mmcif_object.chain_to_seqres) == 1:
      logging.info('Could not find %s in %s, but there is only 1 sequence, so '
                   'using that one.',
                   template_chain_id,
                   mmcif_object.file_id)
      new_template_sequence = list(mmcif_object.chain_to_seqres.values())[0]
    else:
      raise QueryToTemplateAlignError(
          f'Could not find chain {template_chain_id} in {mmcif_object.file_id}. '
          'If there are no mmCIF parsing errors, it is possible it was not a '
          'protein chain.')

  try:
    (old_aligned_template, new_aligned_template), _ = parsers.parse_a3m(
        aligner.align([old_template_sequence, new_template_sequence]))
  except Exception as e:
    raise QueryToTemplateAlignError(
        'Could not align old template %s to template %s (%s_%s). Error: %s' %
        (old_template_sequence, new_template_sequence, mmcif_object.file_id,
         template_chain_id, str(e)))

  logging.info('Old aligned template: %s\nNew aligned template: %s',
               old_aligned_template, new_aligned_template)

  old_to_new_template_mapping = {}
  old_template_index = -1
  new_template_index = -1
  num_same = 0
  for old_template_aa, new_template_aa in zip(
      old_aligned_template, new_aligned_template):
    if old_template_aa != '-':
      old_template_index += 1
    if new_template_aa != '-':
      new_template_index += 1
    if old_template_aa != '-' and new_template_aa != '-':
      old_to_new_template_mapping[old_template_index] = new_template_index
      if old_template_aa == new_template_aa:
        num_same += 1

  # Require at least 90% sequence identity with respect to the shorter of the
  # two sequences.
  if float(num_same) / min(
      len(old_template_sequence), len(new_template_sequence)) < 0.9:
    raise QueryToTemplateAlignError(
        'Insufficient similarity of the sequence in the database: %s to the '
        'actual sequence in the mmCIF file %s_%s: %s. We require at least '
        '90%% similarity with respect to the shorter of the sequences. This '
        'is not a problem unless you think this is a template that should be '
        'included.' %
        (old_template_sequence, mmcif_object.file_id, template_chain_id,
         new_template_sequence))

  new_query_to_template_mapping = {}
  for query_index, old_template_index in old_mapping.items():
    new_query_to_template_mapping[query_index] = (
        old_to_new_template_mapping.get(old_template_index, -1))

  new_template_sequence = new_template_sequence.replace('-', '')

  return new_template_sequence, new_query_to_template_mapping


def _check_residue_distances(all_positions: np.ndarray,
                             all_positions_mask: np.ndarray,
                             max_ca_ca_distance: float):
  """Checks if the distance between unmasked neighbor residues is ok."""
  ca_position = residue_constants.atom_order['CA']
  prev_is_unmasked = False
  prev_calpha = None
  for i, (coords, mask) in enumerate(zip(all_positions, all_positions_mask)):
    this_is_unmasked = bool(mask[ca_position])
    if this_is_unmasked:
      this_calpha = coords[ca_position]
      if prev_is_unmasked:
        distance = np.linalg.norm(this_calpha - prev_calpha)
        if distance > max_ca_ca_distance:
          raise CaDistanceError(
              'The distance between residues %d and %d is %f > limit %f.' % (
                  i, i + 1, distance, max_ca_ca_distance))
      prev_calpha = this_calpha
    prev_is_unmasked = this_is_unmasked


def _get_atom_positions(
    mmcif_object: mmcif_parsing.MmcifObject,
    auth_chain_id: str,
    max_ca_ca_distance: float) -> Tuple[np.ndarray, np.ndarray]:
  """Gets atom positions and mask from a list of Biopython Residues."""
  num_res = len(mmcif_object.chain_to_seqres[auth_chain_id])

  relevant_chains = [c for c in mmcif_object.structure.get_chains()
                     if c.id == auth_chain_id]
  if len(relevant_chains) != 1:
    raise MultipleChainsError(
        f'Expected exactly one chain in structure with id {auth_chain_id}.')
  chain = relevant_chains[0]

  all_positions = np.zeros([num_res, residue_constants.atom_type_num, 3])
  all_positions_mask = np.zeros([num_res, residue_constants.atom_type_num],
                                dtype=np.int64)
  for res_index in range(num_res):
    pos = np.zeros([residue_constants.atom_type_num, 3], dtype=np.float32)
    mask = np.zeros([residue_constants.atom_type_num], dtype=np.float32)
    res_at_position = mmcif_object.seqres_to_structure[auth_chain_id][res_index]
    if not res_at_position.is_missing:
      res = chain[(res_at_position.hetflag,
                   res_at_position.position.residue_number,
                   res_at_position.position.insertion_code)]
      for atom in res.get_atoms():
        atom_name = atom.get_name()
        x, y, z = atom.get_coord()
        if atom_name in residue_constants.atom_order.keys():
          pos[residue_constants.atom_order[atom_name]] = [x, y, z]
          mask[residue_constants.atom_order[atom_name]] = 1.0
        elif atom_name.upper() == 'SE' and res.get_resname() == 'MSE':
          # Put the coordinates of the selenium atom in the sulphur column.
          pos[residue_constants.atom_order['SD']] = [x, y, z]
          mask[residue_constants.atom_order['SD']] = 1.0
    all_positions[res_index] = pos
    all_positions_mask[res_index] = mask
  _check_residue_distances(
      all_positions, all_positions_mask, max_ca_ca_distance)
  return all_positions, all_positions_mask


def _extract_template_features(
    mmcif_object: mmcif_parsing.MmcifObject,
    pdb_id: str,
    mapping: Mapping[int, int],
    template_sequence: str,
    query_sequence: str,
    template_chain_id: str,
    kalign_binary_path: str) -> Tuple[Dict[str, Any], Optional[str]]:
  """Parses atom positions in the target structure and aligns with the query.

  Atoms for each residue in the template structure are indexed to coincide
  with their corresponding residue in the query sequence, according to the
  alignment mapping provided.

  Args:
    mmcif_object: mmcif_parsing.MmcifObject representing the template.
    pdb_id: PDB code for the template.
    mapping: Dictionary mapping indices in the query sequence to indices in
      the template sequence.
    template_sequence: String describing the amino acid sequence for the
      template protein.
    query_sequence: String describing the amino acid sequence for the query
      protein.
    template_chain_id: String ID describing which chain in the structure proto
      should be used.
    kalign_binary_path: The path to a kalign executable used for template
      realignment.

  Returns:
    A tuple with:
    * A dictionary containing the extra features derived from the template
      protein structure.
    * A warning message if the hit was realigned to the actual mmCIF sequence.
      Otherwise None.

  Raises:
    NoChainsError: If the mmcif object doesn't contain any chains.
    SequenceNotInTemplateError: If the given chain id / sequence can't
      be found in the mmcif object.
    QueryToTemplateAlignError: If the actual template in the mmCIF file
      can't be aligned to the query.
    NoAtomDataInTemplateError: If the mmcif object doesn't contain
      atom positions.
    TemplateAtomMaskAllZerosError: If the mmcif object doesn't have any
      unmasked residues.
  """
  if mmcif_object is None or not mmcif_object.chain_to_seqres:
    raise NoChainsError('No chains in PDB: %s_%s' % (pdb_id, template_chain_id))

  warning = None
  try:
    seqres, chain_id, mapping_offset = _find_template_in_pdb(
        template_chain_id=template_chain_id,
        template_sequence=template_sequence,
        mmcif_object=mmcif_object)
  except SequenceNotInTemplateError:
    # If PDB70 contains a different version of the template, we use the
    # sequence from the mmcif_object.
    chain_id = template_chain_id
    warning = (
        f'The exact sequence {template_sequence} was not found in '
        f'{pdb_id}_{chain_id}. Realigning the template to the actual sequence.')
    logging.warning(warning)
    # This throws an exception if it fails to realign the hit.
    seqres, mapping = _realign_pdb_template_to_query(
        old_template_sequence=template_sequence,
        template_chain_id=template_chain_id,
        mmcif_object=mmcif_object,
        old_mapping=mapping,
        kalign_binary_path=kalign_binary_path)
    logging.info('Sequence in %s_%s: %s successfully realigned to %s',
                 pdb_id, chain_id, template_sequence, seqres)
    # The template sequence changed.
    template_sequence = seqres
    # No mapping offset, the query is aligned to the actual sequence.
    mapping_offset = 0

  try:
    # Essentially set to infinity - we don't want to reject templates unless
    # they're really really bad.
    all_atom_positions, all_atom_mask = _get_atom_positions(
        mmcif_object, chain_id, max_ca_ca_distance=150.0)
  except (CaDistanceError, KeyError) as ex:
    raise NoAtomDataInTemplateError(
        'Could not get atom data (%s_%s): %s' % (pdb_id, chain_id, str(ex))
        ) from ex

  all_atom_positions = np.split(all_atom_positions, all_atom_positions.shape[0])
  all_atom_masks = np.split(all_atom_mask, all_atom_mask.shape[0])

  output_templates_sequence = []
  templates_all_atom_positions = []
  templates_all_atom_masks = []

  for _ in query_sequence:
    # Residues in the query_sequence that are not in the template_sequence:
    templates_all_atom_positions.append(
        np.zeros((residue_constants.atom_type_num, 3)))
    templates_all_atom_masks.append(np.zeros(residue_constants.atom_type_num))
    output_templates_sequence.append('-')

  for k, v in mapping.items():
    template_index = v + mapping_offset
    templates_all_atom_positions[k] = all_atom_positions[template_index][0]
    templates_all_atom_masks[k] = all_atom_masks[template_index][0]
    output_templates_sequence[k] = template_sequence[v]

  # Alanine (AA with the lowest number of atoms) has 5 atoms (C, CA, CB, N, O).
  if np.sum(templates_all_atom_masks) < 5:
    raise TemplateAtomMaskAllZerosError(
        'Template all atom mask was all zeros: %s_%s. Residue range: %d-%d' %
        (pdb_id, chain_id, min(mapping.values()) + mapping_offset,
         max(mapping.values()) + mapping_offset))

  output_templates_sequence = ''.join(output_templates_sequence)

  templates_aatype = residue_constants.sequence_to_onehot(
      output_templates_sequence, residue_constants.HHBLITS_AA_TO_ID)

  return (
      {
          'template_all_atom_positions': np.array(templates_all_atom_positions),
          'template_all_atom_masks': np.array(templates_all_atom_masks),
          'template_sequence': output_templates_sequence.encode(),
          'template_aatype': np.array(templates_aatype),
          'template_domain_names': f'{pdb_id.lower()}_{chain_id}'.encode(),
      },
      warning)
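
# Shape sketch for the returned feature dictionary (inferred from the
# construction above): 'template_all_atom_positions' is
# (len(query_sequence), residue_constants.atom_type_num, 3),
# 'template_all_atom_masks' is (len(query_sequence), atom_type_num), and
# 'template_aatype' is a one-hot array over the HHBLITS amino acid alphabet
# with one row per query residue.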


def _build_query_to_hit_index_mapping(
    hit_query_sequence: str,
    hit_sequence: str,
    indices_hit: Sequence[int],
    indices_query: Sequence[int],
    original_query_sequence: str) -> Mapping[int, int]:
  """Gets mapping from indices in original query sequence to indices in the hit.

  hit_query_sequence and hit_sequence are two aligned sequences containing gap
  characters. hit_query_sequence contains only the part of the original query
  sequence that matched the hit. When interpreting the indices from the .hhr, we
  need to correct for this to recover a mapping from original query sequence to
  the hit sequence.

  Args:
    hit_query_sequence: The portion of the query sequence that is in the .hhr
      hit.
    hit_sequence: The portion of the hit sequence that is in the .hhr.
    indices_hit: The indices for each amino acid relative to the hit sequence.
    indices_query: The indices for each amino acid relative to the original
      query sequence.
    original_query_sequence: String describing the original query sequence.

  Returns:
    Dictionary with indices in the original query sequence as keys and indices
    in the hit sequence as values.
  """
  # If the hit is empty (no aligned residues), return an empty mapping.
  if not hit_query_sequence:
    return {}

  # Remove gaps and find the offset of hit.query relative to original query.
  hhsearch_query_sequence = hit_query_sequence.replace('-', '')
  hit_sequence = hit_sequence.replace('-', '')
  hhsearch_query_offset = original_query_sequence.find(hhsearch_query_sequence)

  # Index of -1 used for gap characters. Subtract the min index ignoring gaps.
  min_idx = min(x for x in indices_hit if x > -1)
  fixed_indices_hit = [
      x - min_idx if x > -1 else -1 for x in indices_hit
  ]

  min_idx = min(x for x in indices_query if x > -1)
  fixed_indices_query = [x - min_idx if x > -1 else -1 for x in indices_query]

  # Zip the corrected indices, ignore case where both seqs have gap characters.
  mapping = {}
  for q_i, q_t in zip(fixed_indices_query, fixed_indices_hit):
    if q_t != -1 and q_i != -1:
      if (q_t >= len(hit_sequence) or
          q_i + hhsearch_query_offset >= len(original_query_sequence)):
        continue
      mapping[q_i + hhsearch_query_offset] = q_t

  return mapping
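
# Worked example with hypothetical inputs: for original_query_sequence
# 'MKLVFF', hit_query_sequence 'LVFF' (query residues 2-5), hit_sequence
# 'LVFF', indices_query [2, 3, 4, 5] and indices_hit [10, 11, 12, 13], the
# offset of the hit query within the full query is 2 and the returned mapping
# is {2: 0, 3: 1, 4: 2, 5: 3}, i.e. query index -> index into the gapless hit
# sequence.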


@dataclasses.dataclass(frozen=True)
class SingleHitResult:
  features: Optional[Mapping[str, Any]]
  error: Optional[str]
  warning: Optional[str]


def _process_single_hit(
    query_sequence: str,
    query_pdb_code: Optional[str],
    hit: parsers.TemplateHit,
    mmcif_dir: str,
    max_template_date: datetime.datetime,
    release_dates: Mapping[str, datetime.datetime],
    obsolete_pdbs: Mapping[str, str],
    kalign_binary_path: str,
    strict_error_check: bool = False) -> SingleHitResult:
  """Tries to extract template features from a single HHSearch hit."""
  # Fail hard if we can't get the PDB ID and chain name from the hit.
  hit_pdb_code, hit_chain_id = _get_pdb_id_and_chain(hit)

  if hit_pdb_code not in release_dates:
    if hit_pdb_code in obsolete_pdbs:
      hit_pdb_code = obsolete_pdbs[hit_pdb_code]

  # Pass hit_pdb_code since it might have changed due to the pdb being obsolete.
  try:
    _assess_hhsearch_hit(
        hit=hit,
        hit_pdb_code=hit_pdb_code,
        query_sequence=query_sequence,
        query_pdb_code=query_pdb_code,
        release_dates=release_dates,
        release_date_cutoff=max_template_date)
  except PrefilterError as e:
    msg = f'hit {hit_pdb_code}_{hit_chain_id} did not pass prefilter: {str(e)}'
    logging.info('%s: %s', query_pdb_code, msg)
    if strict_error_check and isinstance(
        e, (DateError, PdbIdError, DuplicateError)):
      # In strict mode we treat some prefilter cases as errors.
      return SingleHitResult(features=None, error=msg, warning=None)
    return SingleHitResult(features=None, error=None, warning=None)

  mapping = _build_query_to_hit_index_mapping(
      hit.query, hit.hit_sequence, hit.indices_hit, hit.indices_query,
      query_sequence)

  # The mapping is from the query to the actual hit sequence, so we need to
  # remove gaps (which regardless have a missing confidence score).
  template_sequence = hit.hit_sequence.replace('-', '')

  cif_path = os.path.join(mmcif_dir, hit_pdb_code + '.cif')
  logging.info('Reading PDB entry from %s. Query: %s, template: %s',
               cif_path, query_sequence, template_sequence)
  # Fail if we can't find the mmCIF file.
  with open(cif_path, 'r') as cif_file:
    cif_string = cif_file.read()

  parsing_result = mmcif_parsing.parse(
      file_id=hit_pdb_code, mmcif_string=cif_string)

  if parsing_result.mmcif_object is not None:
    hit_release_date = datetime.datetime.strptime(
        parsing_result.mmcif_object.header['release_date'], '%Y-%m-%d')
    if hit_release_date > max_template_date:
      error = ('Template %s date (%s) > max template date (%s).' %
               (hit_pdb_code, hit_release_date, max_template_date))
      if strict_error_check:
        return SingleHitResult(features=None, error=error, warning=None)
      else:
        logging.warning(error)
        return SingleHitResult(features=None, error=None, warning=None)

  try:
    features, realign_warning = _extract_template_features(
        mmcif_object=parsing_result.mmcif_object,
        pdb_id=hit_pdb_code,
        mapping=mapping,
        template_sequence=template_sequence,
        query_sequence=query_sequence,
        template_chain_id=hit_chain_id,
        kalign_binary_path=kalign_binary_path)
    features['template_sum_probs'] = [hit.sum_probs]

    # It is possible there were some errors when parsing the other chains in
    # the mmCIF file, but the template features for the chain we want were
    # still computed. In such case the mmCIF parsing errors are not relevant.
    return SingleHitResult(
        features=features, error=None, warning=realign_warning)
  except (NoChainsError, NoAtomDataInTemplateError,
          TemplateAtomMaskAllZerosError) as e:
    # These 3 errors indicate missing mmCIF experimental data rather than a
    # problem with the template search, so turn them into warnings.
    warning = ('%s_%s (sum_probs: %.2f, rank: %d): feature extracting errors: '
               '%s, mmCIF parsing errors: %s'
               % (hit_pdb_code, hit_chain_id, hit.sum_probs, hit.index,
                  str(e), parsing_result.errors))
    if strict_error_check:
      return SingleHitResult(features=None, error=warning, warning=None)
    else:
      return SingleHitResult(features=None, error=None, warning=warning)
  except Error as e:
    error = ('%s_%s (sum_probs: %.2f, rank: %d): feature extracting errors: '
             '%s, mmCIF parsing errors: %s'
             % (hit_pdb_code, hit_chain_id, hit.sum_probs, hit.index,
                str(e), parsing_result.errors))
    return SingleHitResult(features=None, error=error, warning=None)


@dataclasses.dataclass(frozen=True)
class TemplateSearchResult:
  features: Mapping[str, Any]
  errors: Sequence[str]
  warnings: Sequence[str]


class TemplateHitFeaturizer:
  """A class for turning hhr hits to template features."""

  def __init__(
      self,
      mmcif_dir: str,
      max_template_date: str,
      max_hits: int,
      kalign_binary_path: str,
      release_dates_path: Optional[str],
      obsolete_pdbs_path: Optional[str],
      strict_error_check: bool = False):
    """Initializes the Template Search.

    Args:
      mmcif_dir: Path to a directory with mmCIF structures. Once a template ID
        is found by HHSearch, this directory is used to retrieve the template
        data.
      max_template_date: The maximum date permitted for template structures. No
        template with date higher than this date will be returned. In ISO8601
        date format, YYYY-MM-DD.
      max_hits: The maximum number of templates that will be returned.
      kalign_binary_path: The path to a kalign executable used for template
        realignment.
      release_dates_path: An optional path to a file with a mapping from PDB IDs
        to their release dates. Thanks to this we don't have to redundantly
        parse mmCIF files to get that information.
      obsolete_pdbs_path: An optional path to a file containing a mapping from
        obsolete PDB IDs to the PDB IDs of their replacements.
      strict_error_check: If True, then the following will be treated as errors:
        * If any template date is after the max_template_date.
        * If any template has identical PDB ID to the query.
        * If any template is a duplicate of the query.
        * Any feature computation errors.
    """
    self._mmcif_dir = mmcif_dir
    if not glob.glob(os.path.join(self._mmcif_dir, '*.cif')):
      logging.error('Could not find CIFs in %s', self._mmcif_dir)
      raise ValueError(f'Could not find CIFs in {self._mmcif_dir}')

    try:
      self._max_template_date = datetime.datetime.strptime(
          max_template_date, '%Y-%m-%d')
    except ValueError:
      raise ValueError(
          'max_template_date must be set and have format YYYY-MM-DD.')

    self._max_hits = max_hits
    self._kalign_binary_path = kalign_binary_path
    self._strict_error_check = strict_error_check

    if release_dates_path:
      logging.info('Using precomputed release dates %s.', release_dates_path)
      self._release_dates = _parse_release_dates(release_dates_path)
    else:
      self._release_dates = {}

    if obsolete_pdbs_path:
      logging.info('Using precomputed obsolete pdbs %s.', obsolete_pdbs_path)
      self._obsolete_pdbs = _parse_obsolete(obsolete_pdbs_path)
    else:
      self._obsolete_pdbs = {}

  def get_templates(
      self,
      query_sequence: str,
      query_pdb_code: Optional[str],
      query_release_date: Optional[datetime.datetime],
      hits: Sequence[parsers.TemplateHit]) -> TemplateSearchResult:
    """Computes the templates for given query sequence (more details above)."""
    logging.info('Searching for template for: %s', query_pdb_code)

    template_features = {}
    for template_feature_name in TEMPLATE_FEATURES:
      template_features[template_feature_name] = []

    # Always use a max_template_date. Set to query_release_date minus 60 days
    # if that's earlier.
    template_cutoff_date = self._max_template_date
    if query_release_date:
      delta = datetime.timedelta(days=60)
      if query_release_date - delta < template_cutoff_date:
        template_cutoff_date = query_release_date - delta
      assert template_cutoff_date < query_release_date
    assert template_cutoff_date <= self._max_template_date

    num_hits = 0
    errors = []
    warnings = []

    for hit in sorted(hits, key=lambda x: x.sum_probs, reverse=True):
      # We got all the templates we wanted, stop processing hits.
      if num_hits >= self._max_hits:
        break

      result = _process_single_hit(
          query_sequence=query_sequence,
          query_pdb_code=query_pdb_code,
          hit=hit,
          mmcif_dir=self._mmcif_dir,
          max_template_date=template_cutoff_date,
          release_dates=self._release_dates,
          obsolete_pdbs=self._obsolete_pdbs,
          strict_error_check=self._strict_error_check,
          kalign_binary_path=self._kalign_binary_path)

      if result.error:
        errors.append(result.error)

      # There could be an error even if there are some results, e.g. thrown by
      # other unparsable chains in the same mmCIF file.
      if result.warning:
        warnings.append(result.warning)

      if result.features is None:
        logging.info('Skipped invalid hit %s, error: %s, warning: %s',
                     hit.name, result.error, result.warning)
      else:
        # Increment the hit counter, since we got features out of this hit.
        num_hits += 1
        for k in template_features:
          template_features[k].append(result.features[k])

    for name in template_features:
      if num_hits > 0:
        template_features[name] = np.stack(
            template_features[name], axis=0).astype(TEMPLATE_FEATURES[name])
      else:
        # Make sure the feature has correct dtype even if empty.
        template_features[name] = np.array([], dtype=TEMPLATE_FEATURES[name])

    return TemplateSearchResult(
        features=template_features, errors=errors, warnings=warnings)
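
# Minimal usage sketch (hypothetical paths; assumes hits were parsed from an
# HHSearch .hhr file, e.g. with parsers.parse_hhr):
#
#   featurizer = TemplateHitFeaturizer(
#       mmcif_dir='/data/pdb_mmcif/mmcif_files',
#       max_template_date='2020-05-14',
#       max_hits=20,
#       kalign_binary_path='/usr/bin/kalign',
#       release_dates_path=None,
#       obsolete_pdbs_path=None)
#   result = featurizer.get_templates(
#       query_sequence=query_sequence,
#       query_pdb_code=None,
#       query_release_date=None,
#       hits=parsers.parse_hhr(hhr_string))
#   # result.features holds arrays keyed by TEMPLATE_FEATURES, while
#   # result.errors and result.warnings collect per-hit diagnostics.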