Source code for AlloViz.AlloViz.Analysis

"""Module with functions to analyze filtered networks

Main function :func:`~AlloViz.AlloViz.Analysis.analyze` manages the analysis of filtered
networks. It calls :func:`~AlloViz.AlloViz.Analysis.single_analysis` for the analysis of
single element-metric combinations and uses the NetworkX functions defined in
:data:`~AlloViz.AlloViz.Analysis.nodes_dict` and
:data:`~AlloViz.AlloViz.Analysis.edges_dict`.

"""

import os
import sys
import time

import pandas
import numpy as np
import networkx
from concurrent.futures import ProcessPoolExecutor as Pool
from functools import partial

from . import Elements
from . import utils
from .utils import rgetattr, rhasattr



nodes_dict = {
    "btw": "networkx.algorithms.centrality.betweenness_centrality",
    "cfb": "networkx.algorithms.centrality.current_flow_betweenness_centrality",
}
"""
Dictionary that maps nodes' network metric custom names (e.g., betweenness centrality,
"btw") to their corresponding NetworkX function (e.g.,
"networkx.algorithms.centrality.betweenness_centrality").
"""

edges_dict = {
    "btw": "networkx.algorithms.centrality.edge_betweenness_centrality",
    "cfb": "networkx.algorithms.centrality.edge_current_flow_betweenness_centrality",
}
"""
Dictionary that maps edges' network metric custom names (e.g., betweenness centrality,
"btw") to their corresponding NetworkX function (e.g.,
"networkx.algorithms.centrality.edge_betweenness_centrality").
"""


def analyze_graph(args):
    r"""Analyze a graph/column from raw filtered data with an element-metric

    Analyze a stored, filtered raw data column with the passed combination of
    element-metric/NetworkX analysis function and return the results.

    Parameters
    ----------
    args : tuple
        Tuple of (graph, metricf, colname) that is unpacked inside the function:

        graph : :external:ref:`Graph <graph>` object
            Single column/graph object to analyze.
        metricf : callable
            NetworkX analysis function (already imported, and optionally wrapped in a
            :func:`functools.partial` with extra keyword arguments) to apply to the
            graph.
        colname : str
            Name that the analyzed column will have in the final DataFrame for saving.
    """
    graph, metricf, colname = args
    nodes = {}  # Temporary fix for future use of source-sink network analyses

    # Try to apply the NetworkX analysis function to the selected Graph
    try:
        analyzed = metricf(
            graph,
            weight="weight",
            **nodes,
        )
    # If it throws an error, return the exception so that the caller can print it along
    # with the performed analysis' information
    except Exception as e:
        return e

    # Sort the result's indices if they are a MultiIndex (edges), so that each pair has
    # the residue with the lowest resnum as the first element of the tuple
    sort_index = lambda result: {
        tuple(sorted(k, key=lambda x: int(x.split(":")[-1]))): result[k]
        for k in result
    }
    result = sort_index(analyzed) if len(list(analyzed.keys())[0]) == 2 else analyzed

    # Return a Series named after the final column
    return pandas.Series(result, name=colname)
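
# Hedged usage sketch (illustrative, not part of the original module): analyze_graph
# expects a weighted graph whose node labels end in ":<resnum>" (so that edge indices
# can be sorted by residue number), an already-imported NetworkX function, and a
# column name. The toy graph below is hypothetical.
if __name__ == "__main__":
    import networkx as nx

    toy = nx.Graph()
    toy.add_edge("A:1", "A:2", weight=0.5)
    toy.add_edge("A:2", "A:3", weight=1.5)

    series = analyze_graph(
        (toy, nx.algorithms.centrality.edge_betweenness_centrality, "btw")
    )
    # `series` is a pandas.Series named "btw", indexed by sorted residue pairs
    print(series)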
def single_analysis(graphs, metricf, metric, elem, pq, **kwargs):
    r"""Analyze raw data with a single element-metric

    Analyze stored, filtered raw data with the passed combination of
    element-metric/NetworkX analysis function and save the results.

    Parameters
    ----------
    graphs : dict of :external:ref:`Graph <graph>` objects
        Graphs to analyze, keyed by the raw data column they come from.
    metricf : str
        NetworkX function to analyze data. It must be written as if it were an absolute
        import (e.g., "networkx.algorithms.centrality.betweenness_centrality").
    metric : str
        Network metric to compute, which must be a key in the `nodes_dict` or
        `edges_dict` dictionaries.
    elem : str, {"edges", "nodes"}
        Network element for which the analysis is performed.
    pq : str
        Name of the parquet (.pq) file in which to save the analysis results.

    Other Parameters
    ----------------
    **kwargs
        Other optional keyword arguments that will be passed to the NetworkX analysis
        function(s) used on the method call in case they need extra parameters.
    """
    # Process the NetworkX module-function string that will be used for analysis and
    # import it into the metricf variable as a callable
    metricf = partial(eval(metricf), **kwargs)

    # Function to get the columns' new names (e.g., if there is more than 1 trajectory,
    # there will be "weight", "metric_weight", "metric_1", "metric_2"...)
    get_colname = lambda metric, col: f"{metric}_{col}" if len(graphs) > 1 else metric

    # Analyze all columns in parallel, returning a Series for each (or the raised
    # exception if the column couldn't be analyzed)
    with Pool(len(graphs)) as p:
        args = [
            (graph, metricf, get_colname(metric, col))
            for col, graph in graphs.items()
        ]
        results = list(p.map(analyze_graph, args))
        p.shutdown()

    # Check which analyses failed, print their information, and keep only the
    # successful results for saving
    fails = [
        (args[i][-1], res)
        for i, res in enumerate(results)
        if not isinstance(res, pandas.Series)
    ]
    results = [res for res in results if isinstance(res, pandas.Series)]
    if len(fails) > 0:
        for f in fails:
            print(
                "ERROR:",
                pq,
                f[0],
                elem,
                metric,
                "\n",
                f[-1],
            )

    pandas.concat(results, axis=1).to_parquet(pq)
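
# Hedged usage sketch (illustrative, not part of the original module): single_analysis
# takes the metric function as an importable string and a dict of graphs keyed by the
# raw data column they come from (here, a single hypothetical trajectory column "1"),
# and writes the analyzed data to the given parquet path.
if __name__ == "__main__":
    import networkx as nx

    toy = nx.relabel_nodes(nx.cycle_graph(4), lambda i: f"A:{i + 1}")
    nx.set_edge_attributes(toy, 1.0, "weight")

    single_analysis(
        {"1": toy},
        "networkx.algorithms.centrality.betweenness_centrality",
        "btw",
        "nodes",
        "example_nodes_btw.pq",  # hypothetical output filename
    )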
# Function to wait for the analyses to finish in the background; returns the .pq file
# paths of the data to be added as attributes when they do
def wait_analyze(pqs):
    while any([not os.path.isfile(pq) for pq in pqs]):
        time.sleep(5)
    return pqs
# Function to add the newly calculated (or retrieved) data to the Element's attribute
# after analysis (wait_analyze) finishes and passes the .pq file paths
def add_data(pqs, elem, data, filtered):
    print(
        f"adding analyzed {elem} {filtered._pkg} {filtered._name} data for {filtered._pkg.protein._pdbf}"
    )
    sys.stdout.flush()

    for pq in pqs:
        # Retrieve the metric name from the .pq filename and read the file into a DataFrame
        metric = pq.rsplit("/", 1)[-1].split("_", 1)[-1].split(".")[0]
        df = pandas.read_parquet(pq)

        # If there is more than 1 trajectory, calculate the metric's average and
        # standard deviation from the trajectories' analyzed data
        if len(filtered._pkg.protein._trajs) > 1:
            cols = [
                f"{metric}_{num}"
                for num in filtered._pkg.protein._trajs
                if f"{metric}_{num}" in df.columns
            ]
            # NAs are filled with 0 before aggregating; .dropna(how="all") would be an
            # alternative
            df[f"{metric}"] = df[cols].fillna(0).mean(axis=1)
            df[f"{metric}_std"] = df[cols].fillna(0).std(axis=1)
        out = df

        data = pandas.concat([data, out], axis=1)

    # Retrieve the Element's class from the Elements module and (re-)set the attribute
    # with the data
    elemclass = eval(f"Elements.{elem.capitalize()}")
    setattr(filtered, elem, elemclass(data, parent=filtered._pkg.protein))
# getattr(filtered, elem)._parent = filtered._pkg.protein
def analyze(filtered, elements, metrics, nodes_dict, edges_dict, **kwargs):
    r"""Analyze the filtered network

    Send the analyses of the passed filtered network for the specified combinations of
    elements-metrics. Each combination is analyzed independently with
    :func:`~AlloViz.AlloViz.Analysis.single_analysis` using NetworkX functions, and
    results are stored as new instances of classes from the
    :mod:`AlloViz.AlloViz.Elements` module, which extend the :class:`pandas.DataFrame`
    class.

    Parameters
    ----------
    filtered : :class:`~AlloViz.AlloViz.Filtering.Filtering` object
        Filtered network object.
    elements : str or list, {"edges", "nodes"}
        Network element(s) for which to perform the analysis.
    metrics : str or list
        Network metrics to compute, which must be keys in the `nodes_dict` or
        `edges_dict` dictionaries.

    Other Parameters
    ----------------
    nodes_dict, edges_dict : dict
        Optional kwarg(s) of the dictionary(ies) that map network metrics' custom names
        (e.g., betweenness centrality, "btw") to their corresponding NetworkX function
        (e.g., "networkx.algorithms.centrality.betweenness_centrality"). Function
        strings must be written as if they were absolute imports, and the functions
        must return a dictionary of edges or nodes, depending on the element dictionary
        they are in. The keys of the dictionaries will be used to name the columns of
        the analyzed data that the functions produce. Defaults are
        :data:`~AlloViz.AlloViz.Analysis.nodes_dict` and
        :data:`~AlloViz.AlloViz.Analysis.edges_dict`.
    **kwargs
        Other optional keyword arguments that will be passed to the NetworkX analysis
        function(s) used on the method call in case they need extra parameters.
    """
    elements = elements if isinstance(elements, list) else [elements]

    for elem in elements:
        # If the Element's attribute doesn't exist yet, use as initial data the raw
        # edge weights from the filtered data (or an empty DataFrame for nodes)
        if not rhasattr(filtered, elem):
            if elem == "edges":
                # cols = ["weight" in col for col in filtered._filtdata.columns]
                data = filtered._graph_distances  # .loc[:, cols]
            elif elem == "nodes":
                data = pandas.DataFrame()
        # Else, retrieve the Element's attribute DataFrame to add columns to it
        else:
            data = rgetattr(filtered, elem)

        # Retrieve the element's dictionary and, if necessary, update it with the one
        # passed in kwargs
        d = eval(f"{elem}_dict").copy()
        # if f"{elem}_dict" in kwargs:
        #     d.update(kwargs[f"{elem}_dict"])

        # Create a list of the desired metrics to calculate
        metrics = utils.make_list(metrics, if_all=list(d.keys()))
        # Create a list of the metrics that (i) have been passed, (ii) are also present
        # in the element's dictionary and (iii) aren't already in the Element's attribute
        elem_metrics = [
            metric for metric in metrics if (metric in d and metric not in data)
        ]

        # Define the list of .pq files that we expect are going to be saved (or be
        # retrieved) and a function to check which of them already exist
        pqs = lambda elem: [filtered._datapq(elem, metric) for metric in elem_metrics]
        no_exist = lambda pqs: [not os.path.isfile(pq) for pq in pqs]

        # If any of the .pq files don't exist, send the analysis calculations for them
        if any(no_exist(pqs(elem))):
            for metric in (
                metric
                for metric in elem_metrics
                if no_exist(pqs(elem))[pqs(elem).index(filtered._datapq(elem, metric))]
            ):
                utils.get_pool().apply_async(
                    partial(single_analysis, **kwargs),
                    args=(
                        filtered.graphs,
                        d[metric],
                        metric,
                        elem,
                        filtered._datapq(elem, metric),
                    ),
                )

        # Wait asynchronously for the analyses to end and then add the data
        utils.get_pool().apply_async(
            wait_analyze,
            args=(pqs(elem),),
            callback=partial(add_data, elem=elem, data=data, filtered=filtered),
        )
    return
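
# Hedged usage sketch (illustrative, not part of the original module): per the
# docstring above, `analyze` is meant to receive a
# :class:`~AlloViz.AlloViz.Filtering.Filtering` object; `filtered_network` below is a
# hypothetical such object.
#
#     analyze(filtered_network, "nodes", "btw", nodes_dict, edges_dict)
#     # or several element-metric combinations at once:
#     analyze(filtered_network, ["nodes", "edges"], ["btw", "cfb"], nodes_dict, edges_dict)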