Source code for transmission_models.classes.didelot_unsampled

import json
from itertools import combinations
from math import factorial

from transmission_models import *
from transmission_models import utils
from transmission_models.classes.host import *
# from transmission_models.utils import tree_to_newick
from transmission_models.utils.topology_movements import *

from random import random, randint,choice
from scipy.stats import nbinom, gamma, binom, expon, norm
from scipy.special import gamma as GAMMA
import networkx as nx
from networkx.exception import NetworkXError

from transmission_models.classes.genetic_prior import genetic_prior_tree
from transmission_models.classes.location_prior import same_location_prior_tree


# from ..utils import tree_to_newick


[docs]class didelot_unsampled(): """ Didelot unsampled transmission model. This class implements the Didelot et al. (2017) framework for transmission tree inference with unsampled hosts. It provides methods for building transmission networks, computing likelihoods, and performing MCMC sampling. The model incorporates three main components: 1. Sampling model: Gamma distribution for sampling times 2. Offspring model: Negative binomial distribution for offspring number 3. Infection model: Gamma distribution for infection times Parameters ---------- sampling_params : dict Parameters for the sampling model containing: - pi : float, sampling probability - k_samp : float, shape parameter for gamma distribution - theta_samp : float, scale parameter for gamma distribution offspring_params : dict Parameters for the offspring model containing: - r : float, rate of infection - p_inf : float, probability of infection infection_params : dict Parameters for the infection model containing: - k_inf : float, shape parameter for gamma distribution - theta_inf : float, scale parameter for gamma distribution Attributes ---------- T : networkx.DiGraph The transmission tree. host_dict : dict Dictionary mapping host IDs to host objects. log_likelihood : float Current log likelihood of the model. genetic_prior : genetic_prior_tree, optional Prior for genetic data. same_location_prior : same_location_prior_tree, optional Prior for location data. References ---------- Didelot, X., Gardy, J., & Colijn, C. (2017). Bayesian inference of transmission chains using timing of events, contact and genetic data. PLoS computational biology, 13(4), e1005496. """
[docs] def __init__(self, sampling_params, offspring_params, infection_params, T=None): """ Initialize the Didelot unsampled transmission model. Parameters ---------- sampling_params : dict Parameters for the sampling model containing: - pi : float, sampling probability - k_samp : float, shape parameter for gamma distribution - theta_samp : float, scale parameter for gamma distribution offspring_params : dict Parameters for the offspring model containing: - r : float, rate of infection - p_inf : float, probability of infection infection_params : dict Parameters for the infection model containing: - k_inf : float, shape parameter for gamma distribution - theta_inf : float, scale parameter for gamma distribution T : networkx.DiGraph, optional The transmission tree. If provided, the model will be initialized with this tree. Default is None. Raises ------ KeyError If any required parameter is missing from the input dictionaries. """ # Reading parameters try: self.pi = sampling_params["pi"] self.k_samp = sampling_params["k_samp"] self.theta_samp = sampling_params["theta_samp"] except KeyError as e: raise try: self.r = offspring_params["r"] # rate of infection self.p_inf = offspring_params["p_inf"] # probability of infection self.R = self.r * (1 - self.p_inf) / self.p_inf # Reproduction number except KeyError as e: raise try: self.k_inf = infection_params["k_inf"] # self.theta_inf = infection_params["theta_inf"] except KeyError as e: raise # Distributions functions of the models self.dist_sampling = gamma(a=self.k_samp, scale=self.theta_samp) self.pdf_sampling = lambda t: self.dist_sampling.pdf(t) self.samp_sampling = lambda: self.dist_sampling.rvs() self.dist_infection = gamma(a=self.k_inf, scale=self.theta_inf) self.pdf_infection = lambda t: self.dist_infection.pdf(t) self.samp_infection = lambda: self.dist_infection.rvs() self.dist_offspring = nbinom(self.r, self.p_inf) self.pmf_offspring = lambda k: self.dist_offspring.pmf(k) self.samp_offspring = lambda: self.dist_offspring.rvs() #Distribution in between self.pdf_infection_in_between = lambda t,Dt: (pdf_in_between(self,Dt,t)) self._T = T self.G = nx.DiGraph() self.host_dict = {} self.log_likelihood = 0 self.likelihood = 0 self.infection_likelihood = 0 self.sampling_likelihood = 0 self.offspring_likelihood = 0 self.infection_log_likelihood = 0 self.sampling_log_likelihood = 0 self.offspring_log_likelihood = 0 self.newick = None self.root_host = None self.unsampled_hosts = [] self.N_candidates_to_chain = 0 self.N_candidates_to_chain_old = 0 self.candidates_to_chain = [] self.roots_subtrees = [] self.genetic_log_prior = 0 self.genetic_prior = None self.same_location_log_prior = 0 self.same_location_prior = None self.log_posterior = 0 self.Delta_crit = 4/((1-self.pi)*self.pmf_offspring(1)*((self.theta_inf)**(-self.k_inf))/(GAMMA(self.k_inf)))**(1/(self.k_inf-1))
# def generate_networks(self): @property def T(self): return self._T @T.setter def T(self, value): self._T = value if self.T is None: return return value
[docs] def set_T(self, T): if T is None: raise ValueError("T is None!!") self.T = T # Find root host roots = [h for h in self.T if self.T.in_degree(h) == 0] if roots: self.root_host = roots[0] # Get unsampled hosts self.unsampled_hosts = self.get_unsampled_hosts() # Get candidates to chain self.candidates_to_chain = self.get_candidates_to_chain()
[docs] def samp_t_inf_between(self, h1, h2): """ Sample a time of infection between two hosts. Uses a rejection sampling method to sample the time of infection of the infected host using the chain model from Didelot et al. 2017. Parameters ---------- h1 : host Infector host. h2 : host Infected host. Returns ------- float Time of infection of the host infected by h1 and the infector of h2. Notes ----- This method implements the rejection sampling algorithm described in Didelot et al. (2017) for sampling infection times in transmission chains. """ # Dt = abs(h1 - h2) Dt = h2.t_inf-h1.t_inf return sample_in_between(self,Dt)
[docs] def add_root(self, t_sampl, id="0", genetic_data=[], t_inf=0, t_sample=None): """ Add the root host to the transmission tree. Parameters ---------- t_sampl : float Sampling time of the root host. id : str, optional Identifier for the root host. Default is "0". genetic_data : list, optional Genetic data for the root host. Default is empty list. t_inf : float, optional Infection time of the root host. Default is 0. t_sample : float, optional Sampling time of the root host. Default is None. Returns ------- host The root host object. """ self.root_host = host(id, 0, genetic_data, t_inf, t_sampl) return self.root_host
[docs] def successors(self, host): """ Get the successors (children) of a given host in the transmission tree. Parameters ---------- host : host The host node whose successors are to be returned. Returns ------- iterator An iterator over the successors of the host. """ return self.T.successors(host)
[docs] def parent(self, host): """ Get the parent (infector) of a given host in the transmission tree. Parameters ---------- host : host The host node whose parent is to be returned. Returns ------- host The parent host object. """ return list(self.T.predecessors(host))[0]
[docs] def out_degree(self, host): """ Get the out-degree (number of children) of a host in the transmission tree. Parameters ---------- host : host The host node whose out-degree is to be returned. Returns ------- int The out-degree of the host. """ return self.T.out_degree(host)
[docs] def choose_successors(self, host, k=1): """ Choose k unique successors of a given host. Parameters ---------- host : host Host whose successors will be chosen. k : int, optional Number of successors to choose. Default is 1. Returns ------- list List of k randomly chosen successors of the host. """ return sample(list(self.successors(host)), k)
[docs] def compute_Delta_loc_prior(self, T_new): """ Compute the change in the location prior log-likelihood for a new tree. Parameters ---------- T_new : networkx.DiGraph The new transmission tree. Returns ------- tuple (Delta log prior, new log prior, old log prior, old correction log-likelihood) """ LP_sloc_top_old = self.same_location_prior.correction_LL LP_sloc_old = self.same_location_prior.log_prior LP_sloc_new = self.same_location_prior.log_prior_T(T_new) DL_prior_same_location = LP_sloc_new - LP_sloc_old return DL_prior_same_location, LP_sloc_new, LP_sloc_old, LP_sloc_top_old
[docs] def get_candidates_to_chain(self): """ Get the list of candidate hosts for chain moves in the transmission tree. Returns ------- list List of candidate host nodes for chain moves. """ self.candidates_to_chain = [h for h in self.T if not h == self.root_host and not self.out_degree(self.parent(h)) == 1] return self.candidates_to_chain
[docs] def get_N_candidates_to_chain(self, recompute=False): """ Get the number of candidate hosts for chain moves, optionally recomputing the list. Parameters ---------- recompute : bool, optional If True, recompute the list of candidates. Default is False. Returns ------- int Number of candidate hosts for chain moves. """ if recompute: self.N_candidates_to_chain = len(self.get_candidates_to_chain()) else: self.N_candidates_to_chain = len(self.candidates_to_chain) return self.N_candidates_to_chain
[docs] def get_root_subtrees(self): """ Retrieve the root subtrees of the transmission tree. This method searches for the first sampled siblings of the root host in the transmission tree and stores them in the `roots_subtrees` attribute. Returns ------- list A list of root subtrees. """ self.roots_subtrees = utils.search_firsts_sampled_siblings(self.root_host, self.T) return self.roots_subtrees
[docs] def get_unsampled_hosts(self): """ Get the list of unsampled hosts in the transmission tree (excluding the root). Returns ------- list List of unsampled host nodes. """ self.unsampled_hosts = [h for h in self.T if not h.sampled and h != self.root_host] return self.unsampled_hosts
[docs] def get_sampling_model_likelihood(self, hosts=None, T=None, update=False): """ Compute the likelihood of the sampling model. Computes the likelihood of the sampling model given a list of hosts. If no list is given, the likelihood of the whole transmission tree is returned. Parameters ---------- hosts: list of host objects Returns ------- L: float The likelihood of the sampling model given the list of hosts """ if T is None: T = self.T L = 1 if hosts is not None: if isinstance(hosts,list): for h in hosts: if not h.sampled: L*=(1-self.pi) else: L*=self.pi*self.pdf_sampling(h.t_sample-h.t_inf) else: if not hosts.sampled: L*=(1-self.pi) else: L*=self.pi*self.pdf_sampling(hosts.t_sample-hosts.t_inf) else: for h in T: if not h.sampled: L*=(1-self.pi) else: L*=self.pi*self.pdf_sampling(h.t_sample-h.t_inf) if update: self.sampling_likelihood = L return L
[docs] def get_sampling_model_log_likelihood(self,hosts=None,T=None, update=False): """ Computes the likelihood of the sampling model given a list of hosts. If no list is given, the likelihood of the whole transmission tree is returned. Parameters ---------- hosts: list of host objects Returns ------- L: float The likelihood of the sampling model given the list of hosts """ if T is None: T = self.T L = 0 if hosts is not None: if isinstance(hosts,list): for h in hosts: if not h.sampled: L += np.log((1-self.pi)) else: L += np.log(self.pi*self.pdf_sampling(h.t_sample-h.t_inf)) else: if not hosts.sampled: L += np.log((1-self.pi)) else: L += np.log(self.pi*self.pdf_sampling(hosts.t_sample-hosts.t_inf)) else: for h in T: if not h.sampled: L += np.log((1-self.pi)) else: L += np.log(self.pi*self.pdf_sampling(h.t_sample-h.t_inf)) if update: self.sampling_log_likelihood = L return L
[docs] def Delta_log_sampling(self, hosts, T_end, T_ini=None): """ Compute the change in log-likelihood for the sampling model. Parameters ---------- hosts : list List of host objects. T_end : float End time. T_ini : float, optional Initial time. Default is None. Returns ------- float Change in log-likelihood for the sampling model. Notes ----- The function operates as follows: 1. Computes the log-likelihood for the sampling model at T_end. 2. If T_ini is provided, subtracts the log-likelihood at T_ini. 3. Returns the difference. """ L_end = self.get_sampling_model_likelihood(hosts, T_end) if T_ini is None: T_ini = self.T L_ini = self.get_sampling_model_likelihood(hosts, T_ini) return np.log(L_end / L_ini)
[docs] def get_offspring_model_likelihood(self,hosts=None,T=None, update=False): """ Computes the likelihood of the offspring model given a list of hosts. If no list is given, the likelihood of the whole transmission tree is returned. Parameters ---------- hosts: list of host objects Returns ------- L: float The likelihood of the offspring model given the list of hosts """ if T is None: T = self.T L = 1 if hosts is not None: if isinstance(hosts,list): for h in hosts: L*=self.pmf_offspring(T.out_degree(h)) else: L*=self.pmf_offspring(T.out_degree(hosts)) else: for h in T: L*=self.pmf_offspring(T.out_degree(h)) if update: self.offspring_likelihood = L return L
[docs] def get_offspring_model_log_likelihood(self,hosts=None,T=None, update=False): """ Computes the likelihood of the offspring model given a list of hosts. If no list is given, the likelihood of the whole transmission tree is returned. Parameters ---------- hosts: list of host objects Returns ------- L: float The likelihood of the offspring model given the list of hosts """ if T is None: T = self.T L = 0 if hosts is not None: if isinstance(hosts,list): for h in hosts: L += self.dist_offspring.logpmf(T.out_degree(h)) else: L += self.dist_offspring.logpmf(T.out_degree(hosts)) else: for h in T: L += self.dist_offspring.logpmf(T.out_degree(h)) if update: self.offspring_log_likelihood = L return L
[docs] def Delta_log_offspring(self, hosts, T_end, T_ini=None): """ Compute the change in log-likelihood for the offspring model. Parameters ---------- hosts : list List of host objects. T_end : float End time. T_ini : float, optional Initial time. Default is None. Returns ------- float Change in log-likelihood for the offspring model. Notes ----- The function operates as follows: 1. Computes the log-likelihood for the offspring model at T_end. 2. If T_ini is provided, subtracts the log-likelihood at T_ini. 3. Returns the difference. """ L_end = self.get_offspring_model_likelihood(hosts, T_end) if T_ini is None: T_ini = self.T L_ini = self.get_offspring_model_likelihood(hosts, T_ini) return np.log(L_end / L_ini)
[docs] def get_infection_model_likelihood(self,hosts=None,T=None, update=False): """ Computes the likelihood of the infection model given a list of hosts. If no list is given, the likelihood of the whole transmission tree is returned. Parameters ---------- hosts: list of host objects T: DiGraph object Contagious tree which likelihood of the hosts will be computed. If it is None, the network of the model is used. update: bool If True, the likelihood of the infection model is updated in the model object. Returns ------- L: float The likelihood of the infection model given the list of hosts """ if T is None: T = self.T L = 1 if hosts is not None: if isinstance(hosts,list): for h in hosts: for j in T.successors(h): L*=self.pdf_infection(j.t_inf-h.t_inf) # print(f"=======>{str(h)=},{str(j)=}\t{j.t_inf-h.t_inf=}\t{self.pdf_infection(j.t_inf-h.t_inf)=}") else: for j in T.successors(hosts): L*=self.pdf_infection(j.t_inf-hosts.t_inf) else: for h in T: for j in T.successors(h): L*=self.pdf_infection(j.t_inf-h.t_inf) if update: self.infection_likelihood = L return L
[docs] def get_infection_model_log_likelihood(self,hosts=None,T=None, update=False): """ Computes the likelihood of the infection model given a list of hosts. If no list is given, the likelihood of the whole transmission tree is returned. Parameters ---------- hosts: list of host objects T: DiGraph object Contagious tree which likelihood of the hosts will be computed. If it is None, the network of the model is used. update: bool If True, the likelihood of the infection model is updated in the model object. Returns ------- L: float The likelihood of the infection model given the list of hosts """ if T is None: T = self.T L = 0 if hosts is not None: if isinstance(hosts,list): for h in hosts: for j in T.successors(h): L += np.log(self.pdf_infection(j.t_inf-h.t_inf)) # print(f"=======>{str(h)=},{str(j)=}\t{j.t_inf-h.t_inf=}\t{self.pdf_infection(j.t_inf-h.t_inf)=}") else: for j in T.successors(hosts): L += np.log(self.pdf_infection(j.t_inf-hosts.t_inf)) else: for h in T: for j in T.successors(h): L += np.log(self.pdf_infection(j.t_inf-h.t_inf)) if update: self.infection_log_likelihood = L return L
[docs] def Delta_log_infection(self, hosts, T_end, T_ini=None): """ Compute the change in log-likelihood for the infection model. Parameters ---------- hosts : list List of host objects. T_end : float End time. T_ini : float, optional Initial time. Default is None. Returns ------- float Change in log-likelihood for the infection model. Notes ----- The function operates as follows: 1. Computes the log-likelihood for the infection model at T_end. 2. If T_ini is provided, subtracts the log-likelihood at T_ini. 3. Returns the difference. """ if T_ini is None: T_ini = self.T L_end = self.get_infection_model_likelihood(hosts, T_end) L_ini = self.get_infection_model_likelihood(hosts, T_ini) # Delta2 = (self.k_inf-1)*np.log(h.t_inf/) # print() # print(f"{Delta=},{np.log(L_end / L_ini)=}") return np.log(L_end / L_ini)
[docs] def log_likelihood_host(self, host, T=None): """ Computes the log likelihood of a host given the transmission tree. Parameters ---------- host: host object T: DiGraph object Returns ------- log_likelihood: float The log likelihood of the host in the transmission network """ if T is None: T = self.T L = 0 # Sampling model L += self.get_sampling_model_log_likelihood(host, T) # Offspring model L += self.get_offspring_model_log_likelihood(host, T) # Infection model L += self.get_infection_model_log_likelihood(host, T) return L
[docs] def Delta_log_likelihood_host(self, hosts, T_end, T_ini=None): """ Compute the change in log-likelihood for a host. Parameters ---------- hosts : list List of host objects. T_end : float End time. T_ini : float, optional Initial time. Default is None. Returns ------- float Change in log-likelihood for the host. Notes ----- The function operates as follows: 1. Computes the log-likelihood for the host at T_end. 2. If T_ini is provided, subtracts the log-likelihood at T_ini. 3. Returns the difference. """ # print(hosts) L_end = self.log_likelihood_host(hosts, T_end) # Delta = self.Delta_log_infection(hosts, T_end, T_ini) + self.Delta_log_offspring(hosts, T_end, T_ini) + self.Delta_log_sampling(hosts, T_end, T_ini) if T_ini is None: T_ini = self.T L_ini = self.log_likelihood_host(hosts, T_ini) # print("----", hosts, L_end - L_ini, Delta) return L_end - L_ini
[docs] def log_likelihood_hosts_list(self, hosts, T): log_likelihood = 0 for h in hosts: log_likelihood += self.log_likelihood_host(h, T) return log_likelihood
[docs] def log_likelihood_transmission_tree(self, T): log_likelihood = 0 for h in T: log_likelihood += self.log_likelihood_host(h, T) return log_likelihood
[docs] def log_posterior_transmission_tree(self): """ Compute the log-posterior of the current transmission tree. This method calculates the log-posterior probability of the current transmission tree by summing the log-likelihood of the tree and any additional prior log-probabilities, such as genetic and location priors, if they are defined. Returns ------- float The computed log-posterior of the current transmission tree. Notes ----- The log-posterior is computed as: log_posterior = log_likelihood + genetic_log_prior (if defined) + same_location_log_prior (if defined) The method uses the following attributes: - self.log_likelihood: Log-likelihood of the transmission tree. - self.genetic_log_prior: Log-prior from the genetic model (if defined). - self.same_location_log_prior: Log-prior from the location model (if defined). """ self.log_posterior = 0 self.log_posterior += self.log_likelihood if self.genetic_prior is not None: self.log_posterior += self.genetic_log_prior if self.same_location_prior is not None: self.log_posterior += self.same_location_log_prior return self.log_posterior
[docs] def get_log_posterior_transmission_tree(self, T): """ Compute and update the log-posterior of the transmission tree. This method calculates the log-posterior probability of the given transmission tree `T` by combining the log-likelihood of the tree with any additional prior log-probabilities, such as genetic and location priors, if they are defined. The computed log-posterior and any relevant prior log-likelihoods are stored as attributes of the object. Parameters ---------- T : networkx.DiGraph The transmission tree for which to compute the log-posterior. Returns ------- float The computed log-posterior of the transmission tree. Notes ----- The log-posterior is computed as: log_posterior = log_likelihood + genetic_log_prior (if defined) + same_location_log_prior (if defined) The method also updates the following attributes: - self.log_posterior - self.genetic_log_prior (if applicable) - self.same_location_log_prior (if applicable) """ self.log_posterior = self.log_posterior_transmission_tree(T) if self.genetic_prior is not None: self.genetic_log_prior = self.genetic_prior.log_prior_T(T) if self.same_location_prior is not None: self.same_location_log_prior = self.same_location_prior.log_prior_T(T) return self.log_posterior
[docs] def show_log_likelihoods(self, hosts=None, T=None, verbose=False): """ Print and return the log-likelihoods for the sampling, offspring, and infection models. Parameters ---------- hosts : list, optional List of host objects to compute log-likelihoods for. If None, computes for all hosts in T. T : networkx.DiGraph, optional Transmission tree. If None, uses self.T. verbose : bool, optional If True, prints the log-likelihoods. Default is False. Returns ------- tuple (LL_sampling, LL_offspring, LL_infection): Log-likelihoods for the sampling, offspring, and infection models. """ if T is None: T = self.T if hosts is not None: LL_sampling = self.get_sampling_model_log_likelihood(hosts, T) LL_offspring = self.get_offspring_model_log_likelihood(hosts, T) LL_infection = self.get_infection_model_log_likelihood(hosts, T) else: LL_sampling = 0 LL_offspring = 0 LL_infection = 0 for h in T: LL_sampling += self.get_sampling_model_log_likelihood(h, T) LL_offspring += self.get_offspring_model_log_likelihood(h, T) LL_infection += self.get_infection_model_log_likelihood(h, T) if verbose: print("Sampling model:", LL_sampling) print("Offspring model:", LL_offspring) print("Infection model:", LL_infection) return LL_sampling, LL_offspring, LL_infection
[docs] def log_likelihood_transmission_tree_old(self, T): """ Compute the log-likelihood of the entire transmission tree using the old method. Parameters ---------- T : networkx.DiGraph Transmission tree to compute the log-likelihood for. Returns ------- float The log-likelihood of the transmission tree. """ log_likelihood = 0 Pi = 1 for h in T: # Sampling model if h.sampled: sigma = self.pdf_sampling(h.t_sample - h.t_inf) Pi = self.pi * (sigma) else: Pi = (1 - self.pi) # Offspring model Pi *= self.pmf_offspring(T.out_degree(h)) # Infection model for j in T.successors(h): sigma2 = self.pdf_infection(j.t_inf - h.t_inf) if sigma2 == 0: self.log_likelihood = -1e30 print("Impossible times!!!", int(h), int(j), h.t_inf, j.t_inf, j.t_inf - h.t_inf) return self.log_likelihood Pi *= sigma2 log_likelihood += np.log(Pi) return log_likelihood
[docs] def get_log_likelihood_transmission(self): self.sampling_likelihood = 1 self.infection_likelihood = 1 self.offspring_likelihood = 1 self.likelihood = 1 self.log_likelihood = 0 self.sampling_log_likelihood = 0 self.infection_log_likelihood = 0 self.offspring_log_likelihood = 0 for i,h in enumerate(self.T): self.sampling_log_likelihood += self.get_sampling_model_log_likelihood(h) self.infection_log_likelihood += self.get_infection_model_log_likelihood(h) self.offspring_log_likelihood += self.get_offspring_model_log_likelihood(h) self.log_likelihood += self.sampling_log_likelihood + self.infection_log_likelihood + self.offspring_log_likelihood # print(i,h,sampling_likelihood,offspring_likelihood,infection_likelihood,self.likelihood) self.likelihood = np.exp(self.log_likelihood) self.sampling_likelihood = np.exp(self.sampling_log_likelihood) self.infection_likelihood = np.exp(self.infection_log_likelihood) self.offspring_likelihood = np.exp(self.offspring_log_likelihood) return self.log_likelihood
[docs] def add_genetic_prior(self,mu_gen, gen_dist): """ Adds a genetic prior to the model that computes the likelihood that two sampled hosts has a relationship given the genetic distance of the virus of the hosts. Two nodes are considered that has a relationship if the only hosts that are on they are connected through unsampled hosts. Parameters ---------- mu_gen: float Mutation rate gen_dist: np.array Genetic distance matrix of the virus of the hosts. The index has to be identical to the index of the hosts. """ self.genetic_prior = genetic_prior_tree(self, mu_gen, gen_dist) self.genetic_log_prior = self.genetic_prior.log_prior_T(self.T)
[docs] def add_same_location_prior(self, P_NM, tau, loc_dist): """ Adds a genetic prior to the model that computes the likelihood that two sampled hosts has a relationship given the genetic distance of the virus of the hosts. Two nodes are considered that has a relationship if the only hosts that are on they are connected through unsampled hosts. Parameters ---------- log_K: float Log probability of two hosts not being in the same location gen_dist: np.array Genetic distance matrix of the virus of the hosts. The index has to be identical to the index of the hosts. """ self.same_location_prior = same_location_prior_tree(self,P_NM, tau,loc_dist) self.same_location_log_prior = self.same_location_prior.log_prior_T(self.T)
[docs] def create_transmision_phylogeny_nets(self, N, mu, P_mut): """ N: Number of hosts mu: Mutation rate P_mut: Prob of mutation """ n = 1 # Counting hosts for the index self.T = nx.DiGraph() self.G = nx.DiGraph() # Adding first host self.T.add_node(self.root_host) self.G.add_node(self.root_host.get_genetic_str()) new_infected = [self.root_host] genes = [self.root_host.get_genetic_str()] while True: last_infected = list(new_infected) new_infected = [] # print(last_infected) for h in last_infected: # Generating k infections for host h k = nbinom.rvs(self.r, self.p_inf, size=1)[0] if k == 0: new_infected = list(last_infected) for i in range(k): # sampled with probability Pi rnd = random() if rnd < self.pi: sampled = True else: sampled = False sample_time = None # generate mutations genetic_data = binom_mutation(100, self.p_inf, h.genetic_data) genes.append(genetic_data) # Infections time from gamma distribution t_inf = np.random.gamma(shape=self.k_inf, scale=self.theta_inf, size=1)[0] if sampled: t_u_sampl = np.random.gamma(shape=self.k_samp, scale=self.theta_samp, size=1)[0] sample_time = h.t_inf + t_inf + t_u_sampl # print(i+n,h.t_inf+t_inf,t_u_sampl,sample_time,gamma.pdf(t_u_sampl,k,0),gamma.pdf(sample_time,k,h.t_inf+t_inf)) # print("host",i+n) new_infected.append( host(str(i + n), i + n, genetic_data, t_inf=h.t_inf + t_inf, t_sample=sample_time)) # Adding edges self.T.add_edge(h, new_infected[-1]) if h.genetic_data == new_infected[-1].genetic_data: continue ##Adding edges attributes to phylogeny self.G.add_edge(h.get_genetic_str(), new_infected[-1].get_genetic_str(), hosts=[h, new_infected[-1]]) n += k if last_infected == []: last_infected = [self.root_host] # print(k,n,last_infected) if n > N: break self.host_dict = {int(str(h)): h for h in self.T} return self.G, self.T, self.host_dict
[docs] def get_newick(self,lengths=True): self.newick = utils.tree_to_newick(self.T, root=self.root_host,lengths=lengths) return self.newick
[docs] def save_json(self, filename): """ Save the transmission tree to a JSON file. Parameters ---------- filename : str Path to the output JSON file. """ utils.tree_to_json(self, filename)
[docs] @classmethod def json_to_tree(cls, filename, sampling_params=None, offspring_params=None, infection_params=None): """ Load a transmission model from a JSON file and reconstruct the model object. Parameters ---------- filename : str Path to the JSON file. sampling_params : dict, optional Sampling parameters to override those in the file. Default is None. offspring_params : dict, optional Offspring parameters to override those in the file. Default is None. infection_params : dict, optional Infection parameters to override those in the file. Default is None. Returns ------- didelot_unsampled The reconstructed transmission model. """ edge_list = [] with open(filename, "r") as json_data: dict_tree = json.load(json_data) json_data.close() if sampling_params is None: sampling_params = dict_tree["parameters"]["sampling_params"] if offspring_params is None: offspring_params = dict_tree["parameters"]["offspring_params"] if infection_params is None: infection_params = dict_tree["parameters"]["infection_params"] model = cls(sampling_params, offspring_params, infection_params) model.log_likelihood = dict_tree["log_likelihood"] model.T = nx.DiGraph() model.root_host = utils.get_host_from_dict(dict_tree["tree"]) edge_list = utils.read_tree_dict(dict_tree["tree"], h1=model.root_host, edge_list=[]) model.T.add_edges_from(edge_list) return model
[docs] def infection_time_from_sampling_step(self, selected_host=None, metHast=True, verbose=False): """ Propose and possibly accept a new infection time for a sampled host using the Metropolis-Hastings algorithm. This method samples a new infection time for a selected host (or a random sampled host if not provided), computes the acceptance probability, and updates the host's infection time if the proposal is accepted. Parameters ---------- selected_host : host, optional The host whose infection time will be changed. If None, a random sampled host is selected. metHast : bool, optional If True, use the Metropolis-Hastings algorithm to accept or reject the proposal. Default is True. verbose : bool, optional If True, print detailed information about the proposal. Default is False. Returns ------- t_inf_new : float The proposed new infection time. gg : float Proposal ratio for the Metropolis-Hastings step. pp : float Likelihood ratio for the Metropolis-Hastings step. P : float Acceptance probability for the Metropolis-Hastings step. selected_host : host The host whose infection time was proposed to change. """ L_old = self.get_log_likelihood_transmission() rejects = 0 ################################################################## ################################################################## ####### INFECTION TIME ###### ################################################################## ################################################################## if selected_host is None: selected_host = sample(list(self.T.nodes()), 1)[0] # print(selected_host.t_inf) while selected_host.t_sample is None: selected_host = sample(list(self.T.nodes()), 1)[0] # t_inf_old = selected_host.t_inf # print(selected_host.t_sample) # print(t_inf_old) t_inf_old = selected_host.t_inf t_inf_old2 = -selected_host.t_inf + selected_host.t_sample # We don't want transmissions happening before the infectors transmission acceptable = False trys = 0 # Choosing sampled node while not acceptable: trys += 1 # if verbose:print("tries",trys) t_inf_new = self.samp_sampling() for j in self.T.successors(selected_host): if j.t_inf - (selected_host.t_sample - t_inf_new) < 0: acceptable = False break else: for j in self.T.predecessors(selected_host): if -(j.t_inf - (selected_host.t_sample - t_inf_new)) < 0: acceptable = False break else: acceptable = True gg = ((t_inf_old2/t_inf_new ) ** (self.k_samp - 1) * np.exp(-(t_inf_old2 - t_inf_new) / self.theta_samp)) selected_host.t_inf = selected_host.t_sample - t_inf_new selected_host.t_inf = t_inf_old self.log_likelihood = L_old pp = np.exp(L_new - L_old) P = gg * pp if metHast: if P > 1: selected_host.t_inf = selected_host.t_sample - t_inf_new self.log_likelihood = L_new else: rnd = random() if rnd < P: selected_host.t_inf = selected_host.t_sample - t_inf_new self.log_likelihood = L_new return t_inf_new, gg, pp, P, selected_host
[docs] def infection_time_from_infection_model_step(self, selected_host=None, metHast=True, Dt_new=None, verbose=False): """ Method to change the infection time of a host and then accept the change using the Metropolis Hastings algorithm. Parameters ---------- selected_host: host object, default=None Host whose infection time will be changed. If None, a host is randomly selected. metHast: bool, default=True If True, the Metropolis Hastings algorithm is used to accept or reject the change. Dt_new: float, default=None New infection time for the host. If None, a new time is sampled. verbose: bool, default=False If True, prints the results of the step. """ L_old = self.log_likelihood # rejects = 0 ################################################################## ################################################################## ####### INFECTION TIME ###### ################################################################## ################################################################## if selected_host is None: while True: selected_host = sample(list(self.T.nodes()), 1)[0] if selected_host!=self.root_host:break parent = self.parent(selected_host) # print(t_inf_old) t_inf_old = selected_host.t_inf Dt_old = +selected_host.t_inf - parent.t_inf # We don't want transmissions happening before the infectors transmission t_min = None # Choosing sampled node if Dt_new is None: if self.out_degree(selected_host) == 0 and not selected_host.sampled: if selected_host.sampled: t_min = selected_host.t_sample-selected_host.t_inf Dt_new = self.dist_infection.ppf(random() * self.dist_infection.cdf(t_min)) else: Dt_new = self.samp_infection() try: gg = ((Dt_old / Dt_new) ** (self.k_inf - 1) * np.exp(-(Dt_old - Dt_new) / self.theta_inf)) except RuntimeWarning: print(Dt_old, Dt_new, self.pdf_infection(Dt_new), self.k_inf, self.theta_inf, self.out_degree(selected_host), t_min, self.dist_infection.cdf(t_min)) if Dt_old<0: print("NEGATIVE!!!",selected_host.t_inf, parent.t_inf) raise RuntimeWarning if verbose: print("An unsampled leaf has been shifted") print( f"\tDt_new: {Dt_new}, Dt_old: {Dt_old}, gg: {gg}, pp: {1/gg}, P: {1}, selected_host: {selected_host}") if metHast: DL = np.log(1/gg) pp = np.exp(DL) # DL2 = utils.Delta_log_gamma(Dt_old, Dt_new, self.k_inf, self.theta_inf) # if selected_host.sampled: # Dt_samp_old = selected_host.t_sample - t_inf_old # Dt_samp_new = selected_host.t_sample - t_inf_new # # DL += (self.k_samp-1)*np.log(Dt_samp_new/Dt_samp_old) - ((Dt_samp_new-Dt_samp_old)/self.theta_samp) # DL2 += utils.Delta_log_gamma(Dt_samp_old, Dt_samp_new, self.k_samp, self.theta_samp) # print(f"----------->{DL=}\t{DL2=}") #Genetic prior if self.genetic_prior is not None: LP_top_old = self.genetic_prior.correction_LL LP_old = self.genetic_log_prior selected_host.t_inf = parent.t_inf + Dt_new LP_new = self.genetic_prior.log_prior_T(self.T,verbose=verbose) selected_host.t_inf = parent.t_inf + Dt_old DL_prior = LP_new-LP_old pp *= np.exp(DL_prior) #location prior if self.same_location_prior is not None: LP_sloc_top_old = self.same_location_prior.correction_LL LP_sloc_old = self.same_location_log_prior selected_host.t_inf = parent.t_inf + Dt_new LP_sloc_new = self.same_location_prior.log_prior_T(self.T) selected_host.t_inf = parent.t_inf + Dt_old DL_prior_same_location = LP_sloc_new - LP_sloc_old pp *= np.exp(DL_prior_same_location) P = gg*pp if P > 1: selected_host.t_inf = parent.t_inf + Dt_new self.log_likelihood += DL accepted = True if self.genetic_prior is not None: self.genetic_log_prior = LP_new if self.same_location_prior is not None: self.same_location_prior.log_prior = LP_sloc_new self.same_location_log_prior = LP_sloc_new if verbose: print("Time shift accepted") print("__" * 50, "\n\n") else: if random() < P: selected_host.t_inf = parent.t_inf + Dt_new self.log_likelihood += DL accepted = True if self.genetic_prior is not None: self.genetic_log_prior = LP_new if self.same_location_prior is not None: self.same_location_prior.log_prior = LP_sloc_new self.same_location_log_prior = LP_sloc_new if verbose: print("Time shift accepted") print("__" * 50, "\n\n") else: accepted = False if self.genetic_prior is not None: self.genetic_prior.correction_LL = LP_top_old self.genetic_prior.log_prior = LP_old self.genetic_log_prior = LP_old if self.same_location_prior is not None: self.same_location_prior.log_prior = LP_sloc_old self.same_location_log_prior = LP_sloc_old self.same_location_prior.correction_LL = LP_sloc_top_old if verbose: print("Time shift rejected") if self.genetic_prior is not None: print(f"------>\t{DL=}\t{DL_prior=}") else: print(f"------>\t{DL=}\t{DL_prior=}") print("__" * 50, "\n\n") # self.log_likelihood += DL return Dt_new, gg, pp, P, selected_host, accepted,DL # # if selected_host.sampled: # DL = (self.k_inf - 1) * np.log(Dt_new / Dt_old) - ((Dt_new - Dt_old) / self.theta_inf) # for h in self.successors(selected_host): # Dt_h_old = h.t_inf - t_inf_old # Dt_h_new = h.t_inf - t_inf_new # DL += (self.k_inf - 1) * np.log(Dt_h_new / Dt_h_old) - ((Dt_h_new - Dt_h_old) / self.theta_inf) # # if selected_host.sampled: # Dt_samp_old = selected_host.t_sample - t_inf_old # Dt_samp_new = selected_host.t_sample - t_inf_new # DL += (self.k_samp - 1) * np.log(Dt_samp_new / Dt_samp_old) - ( # (Dt_samp_new - Dt_samp_old) / self.theta_samp) # # # print( # f"A saco {L_new - L_old}, solo cambios {DL}, diff {L_new - L_old - DL}, similar? {np.abs(L_new - L_old - DL) < 1e-9},sampled? {selected_host.sampled}") # # print(f"\t--> pp={np.exp(L_new - L_old)}, gg={gg}, P={gg * np.exp(L_new - L_old)}") else: #No leaf if self.out_degree(selected_host) > 0: if verbose: print("A no leaf has been shifted") t_min = min(self.T.successors(selected_host), key=lambda j: j.t_inf).t_inf-parent.t_inf #No leaf and sampled if selected_host.sampled: t_min = min(selected_host.t_sample - selected_host.t_inf,t_min) else: #Leaf and sampled if verbose: print("A sampled leaf has been shifted") t_min = selected_host.t_sample-selected_host.t_inf Dt_new = self.dist_infection.ppf(random() * self.dist_infection.cdf(t_min)) try: gg = ((Dt_old / Dt_new) ** (self.k_inf - 1) * np.exp(-(Dt_old - Dt_new) / self.theta_inf)) except RuntimeWarning: print(Dt_old, Dt_new, self.pdf_infection(Dt_new), self.k_inf, self.theta_inf, selected_host.sampled, self.out_degree(selected_host), t_min, self.dist_infection.cdf(t_min)) if Dt_old<0: print("NEGATIVE!!!",selected_host.t_inf , parent.t_inf) raise RuntimeWarning # t_inf_old = selected_host.t_inf # selected_host.t_inf = parent.t_inf + Dt_new t_inf_new = parent.t_inf + Dt_new # L_new = self.get_log_likelihood_transmission() DL = utils.Delta_log_gamma(Dt_old,Dt_new,self.k_inf,self.theta_inf) for h in self.successors(selected_host): Dt_h_old = h.t_inf-t_inf_old Dt_h_new = h.t_inf-t_inf_new # DL += (self.k_inf-1)*np.log(Dt_h_new/Dt_h_old) - ((Dt_h_new-Dt_h_old)/self.theta_inf) DL += utils.Delta_log_gamma(Dt_h_old,Dt_h_new,self.k_inf,self.theta_inf) if selected_host.sampled: Dt_samp_old = selected_host.t_sample-t_inf_old Dt_samp_new = selected_host.t_sample-t_inf_new # DL += (self.k_samp-1)*np.log(Dt_samp_new/Dt_samp_old) - ((Dt_samp_new-Dt_samp_old)/self.theta_samp) DL += utils.Delta_log_gamma(Dt_samp_old,Dt_samp_new,self.k_samp,self.theta_samp) # DL += (self.k_inf - 1) * np.log(t_inf_new / Dt_samp_old) - ((t_inf_new - Dt_samp_old) / self.theta_inf) selected_host.t_inf = parent.t_inf + Dt_new L_new = self.log_likelihood_transmission_tree(self.T) selected_host.t_inf = parent.t_inf + Dt_old # print(f"A saco {L_new-L_old}, solo cambios {DL}, diff {L_new-L_old-DL}, similar? {np.abs(L_new-L_old-DL)<1e-9},sampled? {selected_host.sampled}") pp = np.exp(DL) # Genetic prior if self.genetic_prior is not None: LP_old = self.genetic_log_prior LP_top_old = self.genetic_prior.correction_LL selected_host.t_inf = parent.t_inf + Dt_new LP_new = self.genetic_prior.log_prior_T(self.T) selected_host.t_inf = parent.t_inf + Dt_old DL_prior = LP_new - LP_old pp *= np.exp(DL_prior) # location prior if self.same_location_prior is not None: LP_sloc_top_old = self.same_location_prior.correction_LL LP_sloc_old = self.same_location_log_prior selected_host.t_inf = parent.t_inf + Dt_new LP_sloc_new = self.same_location_prior.log_prior_T(self.T) selected_host.t_inf = parent.t_inf + Dt_old DL_prior_same_location = LP_sloc_new - LP_sloc_old pp *= np.exp(DL_prior_same_location) P = gg * pp if verbose: if self.genetic_prior is not None: print(f"\tDt_new: {Dt_new}, Dt_old: {Dt_old}, gg: {gg}, pp: {pp}, P: {P}, selected_host: {selected_host} , L_new: {L_new}, L_old: {L_old}, DL: {DL} vs {L_new-L_old}") print(f"\t\tGenetic prior: LP_new: {LP_new}, LP_old: {LP_old}, DL_prior: {DL_prior}") else: print(f"\tDt_new: {Dt_new}, Dt_old: {Dt_old}, gg: {gg}, pp: {pp}, P: {P}, selected_host: {selected_host} , L_new: {L_new}, L_old: {L_old}, DL: {DL} vs {L_new-L_old}") # pp2 = likelihood_ratio(self,selected_host,t_inf_old,selected_host.t_sample-Dt_new,log=False) # L_old = self.log_likelihood_transmission() accepted = False # Metropolis Hastings if metHast: if P > 1: accepted = True selected_host.t_inf = parent.t_inf + Dt_new self.log_likelihood += DL if verbose: print("Time shift accepted") print("__" * 50, "\n\n") if self.genetic_prior is not None: self.genetic_log_prior = LP_new if self.same_location_prior is not None: self.same_location_prior.log_prior = LP_sloc_new self.same_location_log_prior = LP_sloc_new else: rnd = random() # print(P,algo) if rnd < P: accepted = True selected_host.t_inf = parent.t_inf + Dt_new self.log_likelihood += DL if verbose: print("Time shift accepted") print("__"*50,"\n\n") if self.genetic_prior is not None: self.genetic_log_prior = LP_new if self.same_location_prior is not None: self.same_location_prior.log_prior = LP_sloc_new self.same_location_log_prior = LP_sloc_new # print("rejected",itt) else: accepted = False # DL = DL if self.genetic_prior is not None: self.genetic_prior.correction_LL = LP_top_old self.genetic_prior.log_prior = LP_old if self.same_location_prior is not None: self.same_location_prior.log_prior = LP_sloc_old self.same_location_log_prior = LP_sloc_old if verbose: print("Time shift rejected") print("__"*50,"\n\n") return Dt_new, gg, pp, P, selected_host, accepted, DL
[docs] def add_unsampled_with_times(self, selected_host=None, P_add=0.5, P_rewiring=0.5, P_off=0.5, verbose=False, only_geometrical=False, detailed_probs=False): """ Method to propose the addition of an unsampled host to the transmission tree and get the probability of the proposal. Parameters: ----------- selected_host: host object Host to which the unsampled host will be added. If None, a host is randomly selected. P_add: float Probability of proposing to add a new host to the transmission tree. P_rewiring: float Probability of rewiring the new host to another sibling host. P_off: float Probability to rewire the new host to be a leaf. verbose: bool If True, prints the results of the step. only_geometrical: bool If True, only the proposal of the new topological structure will be considered. detailed_probs: bool If True, the method will return both probabilities of the proposals, of adding and removing a host. Returns: -------- T_new: DiGraph object New transmission tree with the proposed changes. gg: float Ratio of the probabilities of the proposals. g_go: float Probability of the proposal of adding a host. g_ret: float Probability of the proposal of removing a host. prob_time: float Probability of the time of infection of the new host. unsampled: host object Unsampeld host to be added to the transmission tree. added: bool If True, the host was added to the transmission tree. """ if selected_host is None: selected_host = choice(list(self.T.nodes())) # print(str(selected_host),list(self.T.nodes())) k_selected_host = self.out_degree(selected_host) # Choosing add in new link or add between two nodes if k_selected_host > 0: sibling = self.choose_successors(selected_host)[0] t_min = sibling.t_inf Dt_max = t_min - selected_host.t_inf link = [selected_host, sibling] else: sibling = None Dt = self.samp_infection() # if only_geometrical:prob_time = 1 prob_time = self.pdf_infection(Dt) s = "U" Dt = 0 try: unsampled = host(s, -1, genetic_data=[], t_inf=(selected_host.t_inf + Dt)) except Exception as e: print(sibling, selected_host) for h in self.T: print(h, type(h)) raise e # N_no_leaves = len([h for h in self.T.nodes() if self.out_degree(h)>0]) # Rewiring if random() < P_rewiring and k_selected_host != 0: if verbose: print("Trying to rewire too:") if random() < P_off or k_selected_host == 1: # The unsampled host slices to be with no children (to offspring) links = [(selected_host, unsampled)] to_remove = [] if verbose: if k_selected_host > 1: print("\t Unsampled host will no infect no one") else: print("\t Unsampled host CANNOT infect no one") if k_selected_host > 1: g_go = (1 / len(self.T)) * P_rewiring * P_off # links.append((unsampled,sibling)) else: g_go = (1 / len(self.T)) * P_rewiring sibling = None Dt_max = None k_unsampled = 0 if not only_geometrical: Dt = self.dist_infection.ppf(random()) unsampled.t_inf = selected_host.t_inf + Dt prob_time = self.pdf_infection(Dt) g_go *= prob_time else: k_unsampled = randint(1, k_selected_host - 1) links = [(selected_host, unsampled), (unsampled, sibling)] to_remove = [(selected_host, sibling)] if verbose: print(f"\t Unsampled host will infect {k_unsampled + 1} people.") # if k_unsampled>1: to_go_down = utils.random_combination( combinations([h for h in self.successors(selected_host) if h != sibling], k_unsampled))[0] comb_count = factorial(k_selected_host - 1) / ( (factorial(k_selected_host - k_unsampled) * factorial(k_unsampled))) # Ps = prob_time Dt_max = sibling.t_inf - selected_host.t_inf for i, h in enumerate(to_go_down): # if k_unsampled ==1:print("-------",h,type(to_go_down)) links.append((unsampled, h)) to_remove.append((selected_host, h)) # print(h,h.t_inf) if not only_geometrical and (h.t_inf - selected_host.t_inf) < Dt_max: Dt_max = h.t_inf - selected_host.t_inf # if k_unsampled ==1: k_unsampled += 1 # kph=2 g_go = (1 / len(self.T)) * (1 / (k_selected_host * (k_selected_host - 1))) g_go *= k_unsampled * (factorial(k_unsampled - 1) * factorial(k_selected_host - k_unsampled) / ( factorial(k_selected_host - 1))) g_go *= (1 - P_off) * P_rewiring if not only_geometrical: Dt = self.dist_infection.ppf(random() * self.dist_infection.cdf(Dt_max)) unsampled.t_inf = selected_host.t_inf + Dt prob_time = self.pdf_infection(Dt) / self.dist_infection.cdf(Dt_max) g_go *= prob_time else: links = [(selected_host, unsampled)] if k_selected_host > 0: if verbose: print("No rewire:") g_go = (1 / len(self.T)) * (1 - P_rewiring) g_go *= (1 / k_selected_host) if not only_geometrical: Dt_max = sibling.t_inf - selected_host.t_inf # print("------------",selected_host.t_inf - sibling.t_inf,selected_host.t_inf, sibling.t_inf) Dt = self.dist_infection.ppf(random() * self.dist_infection.cdf(Dt_max)) unsampled.t_inf = selected_host.t_inf + Dt prob_time = self.pdf_infection(Dt) / self.dist_infection.cdf(Dt_max) g_go *= prob_time k_unsampled = 1 links.append((unsampled, sibling)) to_remove = [(selected_host, sibling)] else: g_go = (1 / len(self.T)) Dt_max = None if not only_geometrical: if verbose: print("No option for rewiring") Dt = self.dist_infection.ppf(random()) unsampled.t_inf = selected_host.t_inf + Dt prob_time = self.pdf_infection(Dt) g_go *= prob_time k_unsampled = 0 to_remove = [] if only_geometrical: prob_time = None # Ratio of proposals g_ret = 1 / (len(self.unsampled_hosts) + 1) # If there are no more unsampled hosts, the proposal is divided by P_add if len(self.unsampled_hosts) == 0: g_ret /= 2 gg = ((1-P_add)*g_ret) / (P_add*g_go) # if abs(gg*prob_time-0.5)<1e-6: # print(gg,g_go,g_ret,len(self.T),(len(self.unsampled_hosts),k_selected_host)) if verbose: if prob_time is not None: if sibling is not None: print( f"ADDING:\nadd:\n\t-g(+1){g_go}, Dt {Dt}, Dt_max {Dt_max}, pt {prob_time}, g(+1)/pt {g_go / prob_time}\n\t-g(-1){g_ret}\n\tg_ret/g_go {g_ret / g_go}\nAdding {str(unsampled)} below {str(selected_host)} with degree {k_unsampled} and sibling {sibling} {sibling.t_inf}\n-----------------------------------------------------") else: print( f"ADDING:\nadd:\n\t-g(+1){g_go}, Dt {Dt}, Dt_max {Dt_max}, pt {prob_time}, g(+1)/pt {g_go / prob_time}\n\t-g(-1){g_ret}\n\tg_ret/g_go {g_ret / g_go}\nAdding {str(unsampled)} below {str(selected_host)} with degree {k_unsampled} and sibling {sibling}\n-----------------------------------------------------") else: print( f"ADDING:\nadd:\n\t-g(+1){g_go}, pt {prob_time}, g(+1)/pt {g_go}\n\t-g(-1){g_ret}\n\tg_ret/g_go {g_ret / g_go}\nAdding {str(unsampled)} below {str(selected_host)} with degree {k_unsampled}\n-----------------------------------------------------") T_new = nx.DiGraph(self.T) T_new.remove_edges_from(to_remove) T_new.add_edges_from(links) if detailed_probs: return T_new, gg, g_go, g_ret, prob_time, unsampled, True else: return T_new, gg, unsampled, True
[docs] def remove_unsampled_with_times(self, selected_host=None, P_add=0.5, P_rewiring=0.5, P_off=0.5, only_geometrical=False, detailed_probs=False, verbose=False): """ Method to propose the removal of an unsampled host from the transmission tree and get the probability of the proposal. In case that no unsampled hosts are available, a new host is proposed to be added to the transmission tree. Parameters: ----------- selected_host: host object Unsampled host to be removed from the transmission tree. If None, a host is randomly selected. P_add: float Probability of proposing to add a new host to the transmission tree. P_rewiring: float Probability of rewiring the new host to another sibling host. P_off: float Probability to rewire the new host to be a leaf. verbose: bool If True, prints the results of the step. only_geometrical: bool If True, only the proposal of the new topological structure will be considered. detailed_probs: bool If True, the method will return both probabilities of the proposals, of adding and removing a host. Returns: -------- T_new: DiGraph object New transmission tree with the proposed changes. gg: float Ratio of the probabilities of the proposals. g_go: float Probability of the proposal of adding a host. g_ret: float Probability of the proposal of removing a host. prob_time: float Probability of proposing the time of the selected_host. added: bool If True, the host was added to the transmission tree. Else, the node have been removed """ if selected_host is None: if len(self.unsampled_hosts) == 0: if detailed_probs: T_new, gg, g_go, g_ret, pt, unsampled, added = self.add_unsampled_with_times( P_rewiring=P_rewiring, P_off=P_off, detailed_probs=detailed_probs, verbose=verbose) # print("---------",T_new,gg,g_go,g_ret,pt,unsampled,added) return T_new, gg, g_go, g_ret, pt, unsampled, added else: T_new, gg, unsampled, added = self.add_unsampled_with_times( P_rewiring=P_rewiring, P_off=P_off, detailed_probs=detailed_probs, verbose=verbose) return T_new, gg, unsampled, added # gg /= 2 selected_host = choice(self.unsampled_hosts) try: k_selected_host = self.out_degree(selected_host) # outdefgree selected host except NetworkXError as e: print(selected_host, type(selected_host)) raise e try: parent = self.parent(selected_host) except NetworkXError as e: # pos = hierarchy_pos_times(self.T,self.root_host) pos = nx.spring_layout(self.T) tm.plot_transmision_network(self.T, nodes_labels=True, pos=pos, highlighted_nodes=[selected_host]) raise e k_parent = self.out_degree(parent) # outdefgree parent kappa = k_parent + k_selected_host children = list(self.successors(selected_host)) # Ratio of proposals ##probability of coming back kappa = k_selected_host + k_parent ## Probablities of times # if only_geometrical: ###Probability of having a time with no sibling ###Sum of probabilities if not only_geometrical: Dt = selected_host.t_inf - parent.t_inf p_t = self.pdf_infection(Dt) g_go = 1 / len(self.unsampled_hosts) new_len_prob = 1 / (len(self.T) - 1) if k_selected_host == 0: if verbose: print("Removing a leave (rewiring to offspring was choosed):") if k_parent == 1: if verbose: print("\t-It had no rewire") g_ret = new_len_prob elif k_parent == 2: if verbose: print("\t-It had one possible rewire") g_ret = new_len_prob * P_rewiring elif k_parent > 2: if verbose: print("\t-It had a rewire and to become a leave was an option") g_ret = new_len_prob * P_rewiring * P_off if not only_geometrical: if verbose: print(f"\t-p_t {p_t} no Dt_max") g_ret *= p_t # print("ZERO!!!") elif k_selected_host == 1: if verbose: print("Removing a host with 1 sibling (no rewiring was choosed):") if k_parent == 1: g_ret = new_len_prob * (1 - P_rewiring) else: g_ret = new_len_prob * (1 - P_rewiring) / (kappa - 1) if not only_geometrical: Dt_max = next(self.successors(selected_host)).t_inf - parent.t_inf p_t /= self.dist_infection.cdf(Dt_max) if verbose: print(f"\t-p_t {p_t} with Dt {Dt} and Dt_max {Dt_max}") g_ret *= p_t else: # # child = children[0] # if k_parent==1: if verbose: print("Removing a leave (rewiring to chain other hosts was choosen):") g_ret = new_len_prob * P_rewiring * (1 - P_off) g_ret *= factorial(k_parent - 1) * factorial(k_selected_host) / ( (kappa - 1) * (kappa - 2) * factorial(kappa - 2)) if not only_geometrical: Dt_max = 10000000000 for h in self.successors(selected_host): Dt_2 = h.t_inf - parent.t_inf if Dt_2 < Dt_max: Dt_max = Dt_2 p_t /= self.dist_infection.cdf(Dt_max) if verbose: print(f"\t-p_t {p_t} with Dt {Dt} and Dt_max {Dt_max}") g_ret *= p_t if len(self.unsampled_hosts) == 1: g_go /= 2 gg = (P_add * g_ret) / ((1-P_add) * g_go) if only_geometrical: pt = None if verbose: if only_geometrical: print( f"remove:\n\t-g(+1){g_ret}, g(+1)/pt {g_ret}\n\t-g(-1){g_go}\n\tg_ret/g_go {g_ret / g_go}\n{len(self.unsampled_hosts), len(self.T), k_selected_host, k_parent, new_len_prob}\n-----------------------------------------------------") else: print( f"remove:\n\t-g(+1){g_ret},Dt {Dt},pt {p_t},g(+1)/pt {g_ret / p_t}\n\t-g(-1){g_go}\n\tg_ret/g_go {g_ret / g_go}\n{len(self.unsampled_hosts), len(self.T), k_selected_host, k_parent, new_len_prob}\n-----------------------------------------------------") # rewiring T_new = nx.DiGraph(self.T) T_new.remove_edge(parent, selected_host) for child in children: T_new.remove_edge(selected_host, child) T_new.add_edge(parent, child) T_new.remove_node(selected_host) if detailed_probs: # print("AQUI!!!") return T_new, gg, g_go, g_ret, p_t, selected_host, False return T_new, gg, selected_host, False
[docs] def add_remove_step(self, P_add=0.5, P_rewiring=0.5, P_off=0.5, metHast=True, verbose=False): """ Method to propose the addition or removal of an unsampled host to the transmission tree and get the probability of the proposal. Parameters: ----------- P_add: float Probability of proposing an addition of an unsampled host. Else, an unsampled host is going to be proposed for removal. P_rewiring: float Probability of rewiring the new host to another sibling host. P_off: float Probability to rewire the new host to be a leaf. metHast: bool If True, the Metropolis Hastings algorithm is used to accept or reject the change. verbose: bool If True, prints the results of the step. Returns: -------- """ if random() < P_add: T_new, gg, unsampled, added = self.add_unsampled_with_times(P_add=P_add, P_rewiring=P_rewiring, P_off=P_off, verbose=verbose) else: T_new, gg, unsampled, added = self.remove_unsampled_with_times(P_add=P_add, P_rewiring=P_rewiring, P_off=P_off, verbose=verbose) if added: affected_hosts = [list(T_new.predecessors(unsampled))[0]] + [h for h in T_new.successors(unsampled)] Delta_LL = self.Delta_log_likelihood_host(affected_hosts, T_new) + self.log_likelihood_hosts_list([unsampled], T_new) else: affected_hosts = [self.parent(unsampled)]+[h for h in self.successors(unsampled)] Delta_LL = self.Delta_log_likelihood_host(affected_hosts, T_new) - self.log_likelihood_host(unsampled) # L_new = self.log_likelihood_transmission_tree(T_new) # nn = dict_replace.get(nn, nn) pp = np.exp(Delta_LL) if self.genetic_prior is not None: LP_top_old = self.genetic_prior.correction_LL LP_new = self.genetic_prior.log_prior_T(T_new,verbose=verbose) DL_prior = LP_new - self.genetic_log_prior pp *= np.exp(DL_prior) if self.same_location_prior is not None: DL_prior_same_location, LP_sloc_new, LP_sloc_old, LP_sloc_top_old = self.compute_Delta_loc_prior(T_new) pp *= np.exp(DL_prior_same_location) P = gg * pp if metHast: if P > 1: self.log_likelihood = self.log_likelihood+Delta_LL self.T = T_new accepted = True if not added: if verbose: print(f"Removing host accepted with acceptance probability {P}") print("__" * 50, "\n\n") self.unsampled_hosts.remove(unsampled) else: if verbose: print(f"Adding host accepted with acceptance probability {P}") print("__" * 50, "\n\n") self.unsampled_hosts.append(unsampled) if self.genetic_prior is not None: self.genetic_log_prior = LP_new self.genetic_prior.log_prior = LP_new if self.same_location_prior is not None: self.same_location_prior.log_prior = LP_sloc_new self.same_location_log_prior = LP_sloc_new else: if random() < P: accepted = True self.log_likelihood = self.log_likelihood+Delta_LL self.T = T_new if self.genetic_prior is not None: self.genetic_log_prior = LP_new self.genetic_prior.log_prior = LP_new if self.same_location_prior is not None: self.same_location_prior.log_prior = LP_sloc_new self.same_location_log_prior = LP_sloc_new if not added: if verbose: print(f"Removing host accepted with acceptance probability {P}") print("__" * 50, "\n\n") self.unsampled_hosts.remove(unsampled) else: if verbose: print(f"Adding host accepted with acceptance probability {P}") print("__" * 50, "\n\n") self.unsampled_hosts.append(unsampled) else: if self.genetic_prior is not None: self.genetic_prior.correction_LL = LP_top_old self.genetic_prior.log_prior = self.genetic_log_prior if self.same_location_prior is not None: self.same_location_prior.log_prior = LP_sloc_old self.same_location_log_prior = LP_sloc_old if verbose: if added: print(f"Adding host rejected with acceptance probability {P}") print("__" * 50, "\n\n") else: print(f"Removing host rejected with acceptance probability {P}") print("__" * 50, "\n\n") accepted = False Delta_LL = 0 return T_new,gg,pp,P,added, accepted, Delta_LL
[docs] def MCMC_step(self, N_steps, verbose=False): L_old = self.get_log_likelihood_transmission() #Getting sure that the number of candidates to chain is computed self.get_N_candidates_to_chain(recompute=True) rejects = 0 for itt in range(N_steps): if verbose: print(itt, L_old) # Choosing type of change selected_movement = randint(0, 1) if selected_movement == 0: self.infection_time_from_sampling_step(verbose=verbose) else: tree_slicing_step(self)