"""Base module for package/software wrapping for AlloViz
Main :class:`~AlloViz.Wrappers.Base.Base` class stores the information about the
:class:`~AlloViz.Protein` object and defines the general private methods that most
child classes use to launch, manage and store the network calculations. It exploits the
'__new__' special method to establish the object's attributes, which can be extended in
the child classes, while '__init__' is used to launch calculations and can be extended or
overridden by child classes.
In this module there are additional classes that override or extend
:class:`~AlloViz.Wrappers.Base.Base`'s private methods and that are used to be combined
to create child classes with multiple inheritance for specific uses.
"""
import os, time
import pandas
import numpy as np
from contextlib import redirect_stdout, redirect_stderr
from importlib import import_module
from lazyasd import LazyObject
from ..AlloViz.Filtering import Filtering
from ..AlloViz.Elements import Edges
from ..AlloViz.utils import get_pool, rgetattr, rhasattr
from ..AlloViz.info import citations
from ..AlloViz import utils
def lazy_import(key, val):
    """Return a lazily-imported module to store in a variable

    It uses `lazyasd <https://github.com/xonsh/lazyasd>`_.

    Parameters
    ----------
    key : str
        The variable name with which the module is going to be accessed. In
        `import numpy as np` "key" would be `np`.
    val : str
        Absolute import of the module.
    """
    # Imports living under AlloViz's 'Packages' folder (src/Packages in the repo,
    # AlloViz/Packages once installed) need the "package" anchor so that importlib
    # can resolve them.
    if 'Packages' in val:
        loader_kwargs = {"package": 'AlloViz'}
    else:
        loader_kwargs = {}

    # The LazyObject defers the real import until the variable is first accessed.
    return LazyObject(lambda: import_module(val, **loader_kwargs), globals(), key)
class Base:
    """Base class for network calculation

    This class uses the '__new__' special method to establish the object's attributes,
    which can be extended and/or modified in the child classes' '__new__'. The '__init__'
    special method is then used to launch calculations, and it can also be extended or
    overridden by child classes.

    It also defines the private methods :meth:`~AlloViz.Wrappers.Base.Base._calculate` to
    launch calculations, which exploits the child classes' custom '_computation' private
    method so that the different packages can be run with them using the standardized
    information stored in the objects attributes; and
    :meth:`~AlloViz.Wrappers.Base.Base._save_pq` to save the results of the calculations
    in parquet format. Both can also be extended or overridden by child classes.
    :meth:`~AlloViz.Wrappers.Base.Base.filter` allows to filter the calculated raw edges
    for posterior analysis.

    Parameters
    ----------
    protein : :class:`~AlloViz.Protein` object
        Protein object from which to extract the information for calculation.
    d : dict
        Dictionary containing all the elements of the protein's `__dict__` and also any
        keyword arguments passed for calculation. This is needed for successful
        parallelization, as pickling objects of Base's child classes means pickling the
        Protein object and the attributes that are added to it during its `__init__` are
        lost.
    """

    def __new__(cls, protein, d):
        new = super().__new__(cls)
        new._name = new.__class__.__name__
        new.protein = protein
        new._d = d

        new._pdbf = d["_pdbf"]
        new._trajs = d["_trajs"]

        new._path = f"{d['_datadir']}/{new._name}/raw"
        os.makedirs(new._path, exist_ok=True)
        # Map a trajectory number (or a trajectory filename) to the path of the
        # parquet file where its raw results are saved
        new._rawpq = lambda xtc: f"{new._path}/{xtc if isinstance(xtc, int) else xtc.rsplit('.', 1)[0]}.pq"

        return new

    def __getnewargs__(self):
        # This is necessary for successful parallelization/pickling
        return self.protein, self._d

    def __init__(self, *args):
        # Define the list of .pq files that we expect are going to be saved (or be
        # retrieved) and a function to check which of them already exist
        pqs = [self._rawpq(xtc) for xtc in self._trajs]
        no_exist = lambda pqs: [not os.path.isfile(pq) for pq in pqs]

        # If any of the .pq files don't exist, send the calculation for those that
        # don't; then continue to read the saved data.
        # NOTE(review): 'xtc-1' assumes trajectory numbers are consecutive integers
        # starting at 1 — confirm this invariant upstream (Protein's trajectory dict)
        if any(no_exist(pqs)):
            for xtc in (xtc for xtc in self._trajs if no_exist(pqs)[xtc-1]):
                self._calculate(xtc)

        # Wait for the calculations to finish in the background; returns the .pq files
        # to be read and added as attributes when they do
        def wait_calculate(pqs):
            while any(no_exist(pqs)):
                time.sleep(5)
            return pqs

        # Read the newly calculated (or retrieved) data files, concatenate them,
        # calculate averages and std if necessary and return it to be added as the raw
        # data attribute
        def get_raw(pqs):
            print(f"adding raw data of {self._name} for {self._pdbf}: ", pqs)
            # List of the corresponding .pq files to concatenate
            flist = map(lambda pq: pandas.read_parquet(pq), pqs)
            df = pandas.concat(flist, axis=1)
            # Perform min-max normalization (to range 0-1) on data coming from AlloViz
            # or MDEntropy related methods (their MI values are low) or
            # PyInteraph2_Atomic_Contacts_Strength
            if "AlloViz" in self._name or "MDEntropy" in self._name or "PyInteraph2_Atomic_Contacts_Strength" in self._name:
                df = (df-df.min())/(df.max()-df.min())
            # If there are more than 1 trajectory, calculate the average and standard
            # error of the trajectories' raw data
            if len(self._trajs) > 1:
                cols = [f"{num}" for num in self._trajs]
                df["weight"] = df[cols].fillna(0).mean(axis=1)
                df["weight_std"] = df[cols].fillna(0).std(axis=1)
            else:
                # If there is only 1 trajectory, simply use its raw data as "weight"
                df.rename(columns={"1": "weight"}, inplace=True)
            return Edges(df, parent=self.protein)

        # Read and process the raw data with get_raw and add it as the "raw" attribute
        add_raw = lambda pqs: setattr(self, "raw", get_raw(pqs))
        # Wait asynchronously for analysis to end and then add the data calling add_raw
        get_pool().apply_async(wait_calculate,
                               args=(pqs,),
                               callback=add_raw)

        # Remind the user to cite the third-party package used for the calculation
        pkg = [name for name in citations if name in self.__class__.__name__]
        if len(pkg)==1 and "AlloViz" not in pkg:
            print(f"Please, make sure to correctly cite the package used to compute the network: {pkg[0]} ({citations[pkg[0]]})")

    def _calculate(self, xtc, *args):
        r"""Send the calculation for a single trajectory file

        Send the calculation with the specific class' `_computation` private method and
        capture the standard output and standard error of the calculation into a log
        file. The callback function to which the results of the computation are passed
        is :meth:`~AlloViz.Wrappers.Base.Base._save_pq` to save the returned calculated
        results. \*args parameter is not used.

        Parameters
        ----------
        xtc : int
            Trajectory number from the processed trajectory dictionary of the Protein
            object.
        """
        # Send the _computation and capture stdout and stderr to a log file
        def send_and_log(xtc, *args):
            with open(f"{self._path}/{self._name}.log", "a+") as f:
                with redirect_stdout(f), redirect_stderr(f):
                    return self._computation(xtc, *args)

        # Send the computation of the trajectory file to the pool with _save_pq as
        # callback
        get_pool().apply_async(send_and_log,
                               args=(xtc, *args),
                               callback=self._save_pq)

    def _save_pq(self, args):
        """Save the calculation results in a parquet file

        It is used as callback function after the calculation with a specific class'
        `_computation` private method to save the calculated data of a single trajectory
        file in tabular format in a parquet (.pq) file.

        Parameters
        ----------
        args : list
            List of elements returned by the `_computation` function. It will always
            contain a `corr` matrix (numpy.ndarray) and the `xtc` trajectory number. It
            may also include a residue list `resl` with the residue indices out of all
            the residues in the Protein object for which values were calculated. This is
            used, e.g., for calculations involving dihedral angles, in which residues in
            the extremes will not have some dihedrals and thus values won't be
            calculated for them.
        """
        corr, xtc, *resl = args

        # If resl is passed, check that corr has the appropriate shape or select the
        # values using resl if not
        if len(resl) != 0:
            resl = resl[0]
            if corr.shape != (len(resl), len(resl)):
                corr = corr[np.ix_(resl, resl)]
        else:
            # Else, simply make resl a slice that will select data of the same size as
            # corr (i.e., all data)
            resl = slice(0, corr.shape[0])

        # Make a list of residue names selecting the appropriate residues with resl
        resnames = [f"{aa.resname}:{aa.resid}" for aa in self._d["protein"].residues[resl]]

        # Transform the square, symmetric corr matrix into a DataFrame and select only
        # the upper triangle (without the diagonal: k=1), then stack it into a tabular
        # format for saving
        df = pandas.DataFrame(corr, columns=resnames, index=resnames)
        df = df.where( np.triu(np.ones(df.shape), k=1).astype(bool) )
        df = pandas.DataFrame({f"{xtc}": df.stack()})
        df.to_parquet(self._rawpq(xtc))

    def filter(self, filterings="All", **kwargs):
        r"""Filter network edges

        Filter the networks according to the selected criteria to perform analyses on
        (all or) a subset of the edges. Results are stored in instances of the
        :class:`AlloViz.AlloViz.Filtering.Filtering` class. The different filtering
        options are detailed in the :mod:`~AlloViz.AlloViz.Filtering` module.

        Parameters
        ----------
        filterings : str or list of strs and/or lists, default: "All"
            Filtering scheme(s) with which to filter the list of network edges before
            analysis. It can be a string, or a list of strings and/or lists: a list of
            lists (also with or without strings) is used to filter with a combination of
            criteria. All available (and combinable) filtering options are functions in
            the :mod:`~AlloViz.AlloViz.Filtering` module:
            :func:`~AlloViz.AlloViz.Filtering.All`,
            :func:`~AlloViz.AlloViz.Filtering.GetContacts_edges`,
            :func:`~AlloViz.AlloViz.Filtering.No_Sequence_Neighbors`,
            :func:`~AlloViz.AlloViz.Filtering.GPCR_Interhelix`. The default "All"
            performs all the available filtering schemes (no combinations).

        Other Parameters
        ----------------
        GetContacts_threshold : float
            Optional kwarg that can be passed to specify the minimum contact frequency
            (0-1, default 0) threshold, which will be used to filter out contacts with a
            frequency (average) lower than it before analysis.
        Sequence_Neighbor_distance : int
            Optional kwarg that can be passed to specify the minimum number of sequence
            positions/distance between residues of a pair to retain in
            `No_Sequence_Neighbors` filtering, which defaults to 5.
        Interresidue_distance : int or float
            Optional kwarg that can be passed to specify the minimum number of angstroms
            that the CA atoms of residue pairs should have between each other in the
            initial PDB/structure (default 10 Å) to be considered spatially distant.

        See Also
        --------
        AlloViz.AlloViz.Filtering.Filtering : Filtering class.
        AlloViz.Protein.calculate : Class method to calculate the network(s) raw edge
            weights with different network construction methods.
        AlloViz.Protein.analyze : Class method to analyze the calculated raw edge weights
            with graph theory-based methods.
        AlloViz.Protein.view : Class method to visualize the network on the protein
            structure.

        Examples
        --------
        >>> opioidGPCR = AlloViz.Protein(GPCR=169)
        >>> opioidGPCR.calculate(["getcontacts", "dynetan"], cores=6, taskcpus=2)
        >>> opioidGPCR.dynetan.filter(["GetContacts_edges", ["GetContacts_edges", "GPCR_Interhelix"]])
        >>> opioidGPCR.dynetan.GetContacts_edges_GPCR_Interhelix
        <AlloViz.AlloViz.Filtering.Filtering at 0x7f892c3c0fa0>
        """
        # A bare string (e.g. the default "All") is a single filtering scheme; without
        # this normalization the loop below would wrongly iterate the string character
        # by character
        filterings = [filterings] if isinstance(filterings, str) else filterings

        for filt in filterings:
            # Name used to store as attribute will be that of the filtering scheme
            # chosen or the combination's names joined by "_"
            name = filt if isinstance(filt, str) else "_".join(filt)
            if not rgetattr(self, name):
                setattr(
                    self,
                    name,
                    Filtering(self, filt, name, **kwargs),
                )

        # Return the single Filtering object directly if only one scheme was requested
        return rgetattr(self, name) if len(filterings) == 1 else None
# class Use_COM(Base):
# """Class for using the COM's structure file and trajectories
# Classes that inherit this class use the residues' COM structure and trajectory(ies)
# files for calculations instead of the whole protein's.
# """
# def __new__(cls, protein, d):
# new = super().__new__(cls, protein, d)
# new._pdbf = new._d["_compdbf"]
# new._trajs = new._d["_comtrajs"]
# return new
class Multicore(Base):
    """Class for multi-core packages calculations

    For packages that can themselves parallelize over several cores (on top of
    AlloViz's parallelization of the calculations for each trajectory file), this
    class adds the 'taskcpus' and '_empties' attributes in '__new__'. 'taskcpus' is
    the number of cores the wrapped package should use per trajectory. Since the
    actual task/_computation only occupies a single worker of the Pool it is sent to,
    '_empties' is the number of placeholder jobs sent alongside it so that the Pool
    effectively reserves as many workers as cores the package is really going to use.
    The :meth:`~AlloViz.Wrappers.Base.Multicore._calculate` private method is extended
    to send those placeholder jobs.
    """

    def __new__(cls, protein, d):
        new = super().__new__(cls, protein, d)

        # Default to half of the machine's available cores when 'taskcpus' was not
        # passed as a kwarg
        half_cores = int(np.ceil(os.cpu_count()/2))
        new.taskcpus = new._d["taskcpus"] if "taskcpus" in new._d else half_cores
        # The _computation job itself only takes 1 Pool worker, so the rest of the
        # desired taskcpus must be occupied by placeholder jobs
        new._empties = new.taskcpus - 1

        return new

    def _calculate(self, xtc, *args):
        """Extend the function to send empty jobs and finish them when the main job saves the data
        """
        super()._calculate(xtc, *args)

        # Placeholder job: idle until the main computation's parquet file appears
        def hold_worker(pq):
            while not os.path.isfile(pq):
                time.sleep(5)
            return

        for _ in range(self._empties):
            get_pool().apply_async(hold_worker, args=(self._rawpq(xtc),))
class Combined_Dihs(Base):
    """Class for combination of dihedral angle data

    Child classes of this class combine the information of several dihedral angles by
    overriding the `_calculate` and `_save_pq` private methods. `_calculate` checks
    that the calculations of the desired dihedral angles for the current trajectory
    number (`xtc`) are available — raising an error otherwise — and delegates saving
    of the combined data to the child class' specific `_save_pq` private method.
    """

    def _calculate(self, xtc):
        # Child classes' names follow e.g. correlationplus_Backbone_Dihs_Avg or
        # AlloViz_Sidechain_Dihs_Max: strip the trailing pieces to recover the package
        # name
        if "Sidechain" in self._name or "Backbone" in self._name:
            pkg = self._name.rsplit("_", 2)[0]
        else:
            pkg = self._name.rsplit("_", 1)[0]

        # Every individual dihedral calculation must already exist; raise otherwise
        attrs_exist = {f"{pkg}_{Dih}": rhasattr(self, "protein", f"{pkg}_{Dih}") for Dih in self._dihs}
        if not all(attrs_exist.values()):
            raise Exception(
                f"Individual dihedrals calculations are needed first: {attrs_exist}"
            )

        # Collect the raw-results parquet file of each dihedral for this trajectory
        # and hand them to the child class' _save_pq to combine and save
        pqs = [rgetattr(self, "protein", f"{pkg}_{Dih}", "_rawpq")(xtc) for Dih in self._dihs]
        self._save_pq(pqs, xtc)
class Combined_Dihs_Avg(Combined_Dihs):
    """Class for combination of dihedral angle data by averaging

    Child classes of this class combine the information of several dihedral angles by
    averaging their edge weights, through the specific
    :meth:`~AlloViz.Wrappers.Base.Combined_Dihs_Avg._save_pq` private method.
    """

    def _save_pq(self, pqs, xtc):
        # Absolute values of each dihedral network's weights for this trajectory
        columns = [pandas.read_parquet(fname)[f"{xtc}"].abs() for fname in pqs]
        # Drop edges for which no dihedral has a value (sanity check) and take the
        # row-wise mean as the combined edge weight
        combined = pandas.concat(columns, axis=1).dropna(how="all").mean(axis=1)
        # Persist the averages as the final raw data for this trajectory
        pandas.DataFrame({f"{xtc}": combined}).to_parquet(self._rawpq(xtc))
# class Combined_Dihs_Max(Combined_Dihs):
# """Class for combination of dihedral angle data by taking the maximum value
# This class' child classes are used to combine the information from multiple dihedral
# angles by taking for each edge the maximum edge weight of all the dihedral networks
# that are combined, with its specific
# :meth:`~AlloViz.Wrappers.Base.Combined_Dihs_Max._save_pq` private method.
# """
# def _save_pq(self, pqs, xtc):
# # Make a list of the trajectories' dihedrals computations results (absolute numbers)
# dfs = [pandas.read_parquet(pq)[f"{xtc}"].abs() for pq in pqs]
# # Concatenate them, drop edges for which none of the columns have a value (sanity check) and retrieve the rowwise max values
# maxs = pandas.concat(dfs, axis=1).dropna(how="all").max(axis=1)
# # Save the max values as the final data
# pandas.DataFrame({f"{xtc}": maxs}).to_parquet(self._rawpq(xtc))