/*
* Copyright (c) 2019, 2020 Frank Fischer <frank-fischer@shadow-soft.de>
*
* This program is free software: you can redistribute it and/or
* modify it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>
*/
//! Asynchronous process solving a master problem.
#[cfg(feature = "crossbeam")]
use rs_crossbeam::channel::{unbounded as channel, Receiver, RecvError, SendError, Sender};
#[cfg(not(feature = "crossbeam"))]
use std::sync::mpsc::{channel, Receiver, RecvError, SendError, Sender};
use either::Either;
use log::{debug, warn};
use std::sync::Arc;
use thiserror::Error;
use threadpool::ThreadPool;
use super::channels::{ClientSender, Message};
use crate::master::MasterProblem;
use crate::problem::{FirstOrderProblem, SubgradientExtender};
use crate::{Aggregatable, DVector, Minorant, Real};
/// Convenient type alias for master process errors.
pub type Error<P, M> =
MasterError<<P as FirstOrderProblem>::Err, <M as MasterProblem<<P as FirstOrderProblem>::Primal>>::Err>;
/// Result type for the master process.
pub type Result<T, P, M> = std::result::Result<T, Error<P, M>>;
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum MasterError<PErr, MErr>
where
PErr: std::error::Error + 'static,
MErr: std::error::Error + 'static,
{
#[error("sending channel to master has been disconnected")]
DisconnectedSender(#[source] Box<dyn std::error::Error + Send>),
#[error("receiving channel from master has been disconnected")]
DisconnectedReceiver(#[source] RecvError),
#[error("subgradient extension failed")]
SubgradientExtension(#[source] PErr),
#[error("master problem error")]
Master(#[source] MErr),
}
impl<T, PErr, MErr> From<SendError<T>> for MasterError<PErr, MErr>
where
T: Send + 'static,
PErr: std::error::Error + 'static,
MErr: std::error::Error + 'static,
{
fn from(err: SendError<T>) -> MasterError<PErr, MErr> {
MasterError::DisconnectedSender(Box::new(err))
}
}
impl<PErr, MErr> From<RecvError> for MasterError<PErr, MErr>
where
PErr: std::error::Error + 'static,
MErr: std::error::Error + 'static,
{
fn from(err: RecvError) -> MasterError<PErr, MErr> {
MasterError::DisconnectedReceiver(err)
}
}
/// Configuration information for setting up a master problem.
pub struct MasterConfig {
/// The number of subproblems.
pub num_subproblems: usize,
/// The number of variables.
pub num_vars: usize,
/// The lower bounds on the variables.
pub lower_bounds: Option<DVector>,
/// The lower bounds on the variables.
pub upper_bounds: Option<DVector>,
}
/// A task for the master problem.
enum MasterTask<Pr, PErr, MErr>
where
Pr: Aggregatable,
{
/// Add new variables to the master problem.
AddVariables(Vec<(Option<usize>, Real, Real)>, Box<SubgradientExtender<Pr, PErr>>),
/// Add a new minorant for a subfunction to the master problem.
AddMinorant(usize, Minorant<Pr>),
/// Move the center of the master problem in the given direction.
///
/// The third value is a unique identifier for the current center.
MoveCenter(Real, Arc<DVector>, usize),
/// Start a new computation of the master problem.
Solve { center_value: Real },
/// Compress the bundle.
Compress,
/// Set the weight parameter of the master problem.
SetWeight { weight: Real },
/// Return the current aggregated primal.
GetAggregatedPrimal {
subproblem: usize,
tx: Sender<std::result::Result<Pr, MErr>>,
},
}
/// The response send from a master process.
///
/// The response contains the evaluation results of the latest call to `solve`.
pub enum Response<E> {
/// A regular master result.
Result {
nxt_d: DVector,
/// Aggregated model value.
nxt_mod: Real,
/// Submodel values.
nxt_submods: Vec<Real>,
/// Aggregated subgradient norm.
sgnorm: Real,
/// The number of internal iterations.
cnt_updates: usize,
/// The index of the center.
center_index: usize,
},
/// An error occurred.
Error(E),
}
/// Convenient type for `Response`.
pub type MasterResponse<P, M> = Response<Error<P, M>>;
type MasterTsk<P, M> = MasterTask<
<P as FirstOrderProblem>::Primal,
<P as FirstOrderProblem>::Err,
<M as MasterProblem<<P as FirstOrderProblem>::Primal>>::Err,
>;
type ToMasterSender<P, M> = Sender<MasterTsk<P, M>>;
type ToMasterReceiver<P, M> = Receiver<MasterTsk<P, M>>;
pub struct MasterProcess<P, M>
where
P: FirstOrderProblem,
M: MasterProblem<P::Primal>,
{
/// The channel to transmit new tasks to the master problem.
tx: ToMasterSender<P, M>,
phantom: std::marker::PhantomData<M>,
}
impl<P, M> MasterProcess<P, M>
where
P: FirstOrderProblem,
P::Primal: Send + 'static,
P::Err: Send + 'static,
M: MasterProblem<P::Primal> + Send + 'static,
M::Err: Send + 'static,
{
pub fn start<I>(
master: M,
master_config: MasterConfig,
tx: ClientSender<I, P, M>,
threadpool: &mut ThreadPool,
) -> Self
where
I: Send + 'static,
{
// Create a pair of communication channels.
let (to_master_tx, to_master_rx) = channel();
// The the master process thread.
threadpool.execute(move || {
debug!("Master process started");
let mut from_master_tx = tx;
let mut sgext = None;
if let Err(err) = Self::master_main(master, master_config, &mut from_master_tx, to_master_rx, &mut sgext) {
#[allow(unused_must_use)]
{
from_master_tx.send(Message::Master(Response::Error(err)));
}
}
debug!("Master proces stopped");
});
MasterProcess {
tx: to_master_tx,
phantom: std::marker::PhantomData,
}
}
/// Add new variables to the master problem.
pub fn add_vars(
&mut self,
vars: Vec<(Option<usize>, Real, Real)>,
sgext: Box<SubgradientExtender<P::Primal, P::Err>>,
) -> Result<(), P, M>
where
P::Err: 'static,
{
Ok(self.tx.send(MasterTask::AddVariables(vars, sgext))?)
}
/// Add a new minorant to the master problem model.
///
/// This adds the specified `minorant` with associated `primal` data to the
/// model of subproblem `i`.
pub fn add_minorant(&mut self, i: usize, minorant: Minorant<P::Primal>) -> Result<(), P, M> {
Ok(self.tx.send(MasterTask::AddMinorant(i, minorant))?)
}
/// Move the center of the master problem.
///
/// This moves the master problem's center in direction $\\alpha \\cdot d$.
pub fn move_center(&mut self, alpha: Real, d: Arc<DVector>, center_index: usize) -> Result<(), P, M> {
Ok(self.tx.send(MasterTask::MoveCenter(alpha, d, center_index))?)
}
/// Solve the master problem.
///
/// The current function value in the center `center_value`.
/// Once the master problem is solved the process will send a
/// [`Response`] message to the `tx` channel.
pub fn solve(&mut self, center_value: Real) -> Result<(), P, M> {
Ok(self.tx.send(MasterTask::Solve { center_value })?)
}
/// Compresses the model.
pub fn compress(&mut self) -> Result<(), P, M> {
Ok(self.tx.send(MasterTask::Compress)?)
}
/// Sets the new weight of the proximal term in the master problem.
pub fn set_weight(&mut self, weight: Real) -> Result<(), P, M> {
Ok(self.tx.send(MasterTask::SetWeight { weight })?)
}
/// Get the current aggregated primal for a certain subproblem.
pub fn get_aggregated_primal(&self, subproblem: usize) -> Result<P::Primal, P, M> {
let (tx, rx) = channel();
self.tx.send(MasterTask::GetAggregatedPrimal { subproblem, tx })?;
rx.recv()?.map_err(MasterError::Master)
}
/// The main loop of the master process.
fn master_main<I>(
master: M,
master_config: MasterConfig,
tx: &mut ClientSender<I, P, M>,
rx: ToMasterReceiver<P, M>,
subgradient_extender: &mut Option<Box<SubgradientExtender<P::Primal, P::Err>>>,
) -> Result<(), P, M> {
let mut master = master;
let mut center_idx = 0;
// Initialize the master problem.
master
.set_num_subproblems(master_config.num_subproblems)
.map_err(MasterError::Master)?;
master
.set_vars(
master_config.num_vars,
master_config.lower_bounds,
master_config.upper_bounds,
)
.map_err(MasterError::Master)?;
// The main iteration: wait for new tasks.
for m in rx {
match m {
MasterTask::AddVariables(vars, mut sgext) => {
debug!("master: add {} variables to the subproblem", vars.len());
master.add_vars(&vars, &mut sgext).map_err(|err| match err {
Either::Left(err) => MasterError::Master(err),
Either::Right(err) => MasterError::SubgradientExtension(err),
})?;
*subgradient_extender = Some(sgext);
}
MasterTask::AddMinorant(i, mut m) => {
debug!("master: add minorant to subproblem {}", i);
// It may happen the number new minorant belongs to an earlier evaluation
// with less variables (i.e. new variables have been added
// after the start of the evaluation but before the new
// minorant is added, i.e. now). In this case we must add
// extend the minorant accordingly.
if m.linear.len() < master.num_variables() {
if let Some(ref mut sgext) = subgradient_extender {
let newinds = (m.linear.len()..master.num_variables()).collect::<Vec<_>>();
let new_subg =
sgext(i, &m.primal, &newinds).map_err(Error::<P, M>::SubgradientExtension)?;
m.linear.extend(new_subg);
}
}
master.add_minorant(i, m).map_err(MasterError::Master)?;
}
MasterTask::MoveCenter(alpha, d, center_index) => {
debug!("master: move center to {}", center_index);
master.move_center(alpha, &d);
center_idx = center_index;
}
MasterTask::Compress => {
debug!("master: compress bundle");
master.compress().map_err(MasterError::Master)?;
}
MasterTask::Solve { center_value } => {
debug!("master: solve with center_value {}", center_value);
master.fill_models(|_, _| {}).map_err(MasterError::Master)?;
master.solve(center_value).map_err(MasterError::Master)?;
let master_response = Response::Result {
nxt_d: master.get_primopt(),
nxt_mod: master.get_primoptval(),
nxt_submods: (0..master.num_subproblems())
.map(|i| master.get_optvalue_subproblem(i))
.collect(),
sgnorm: master.get_dualoptnorm2().sqrt(),
cnt_updates: master.cnt_updates(),
center_index: center_idx,
};
if let Err(err) = tx.send(Message::Master(master_response)) {
warn!("Master process cancelled because of channel error: {}", err);
break;
}
}
MasterTask::SetWeight { weight } => {
debug!("master: set weight {}", weight);
master.set_weight(weight).map_err(MasterError::Master)?;
}
MasterTask::GetAggregatedPrimal { subproblem, tx } => {
debug!("master: get aggregated primal for {}", subproblem);
if tx.send(master.aggregated_primal(subproblem)).is_err() {
warn!("Sending of aggregated primal for {} failed", subproblem);
}
}
};
}
Ok(())
}
}