/*
* Copyright (c) 2019 Frank Fischer <frank-fischer@shadow-soft.de>
*
* This program is free software: you can redistribute it and/or
* modify it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>
*/
//! Asynchronous process solving a master problem.
use crossbeam::channel::{unbounded as channel, Receiver, Sender};
use log::{debug, warn};
use std::sync::Arc;
use threadpool::ThreadPool;
use super::sync::Error;
use crate::master::primalmaster::PrimalMaster;
use crate::master::MasterProblem;
use crate::problem::{FirstOrderProblem, SubgradientExtender};
use crate::{DVector, Minorant, Real};
/// Configuration information for setting up a master problem.
///
/// The values are used once by the master process to initialize its
/// master problem (number of subproblems, then the variables with their
/// bounds) before it starts waiting for tasks.
pub struct MasterConfig {
/// The number of subproblems.
pub num_subproblems: usize,
/// The number of variables.
pub num_vars: usize,
/// The lower bounds on the variables.
///
/// NOTE(review): `None` presumably means "no lower bounds" -- confirm
/// against the master problem's `set_vars`.
pub lower_bounds: Option<DVector>,
/// The upper bounds on the variables.
///
/// NOTE(review): `None` presumably means "no upper bounds" -- confirm
/// against the master problem's `set_vars`.
pub upper_bounds: Option<DVector>,
}
/// A task for the master problem.
///
/// Tasks are sent over a channel to the master process thread, which
/// handles them one after another in the order they were sent.
///
/// `Pr` is the primal type and `PErr` the error type of the first-order
/// problem, `M` is the master problem type.
enum MasterTask<Pr, PErr, M>
where
M: MasterProblem,
{
/// Add new variables to the master problem.
///
/// The first field describes the new variables, the second is the
/// subgradient extender handed to the master problem along with them.
/// NOTE(review): the tuples presumably are (index, lower bound,
/// upper bound) -- confirm against the master problem's `add_vars`.
AddVariables(Vec<(Option<usize>, Real, Real)>, Box<SubgradientExtender<Pr, PErr>>),
/// Add a new minorant for a subfunction to the master problem.
///
/// The fields are the index of the subproblem, the minorant and the
/// primal data associated with the minorant.
AddMinorant(usize, Minorant, Pr),
/// Move the center of the master problem in the given direction.
///
/// The fields are the step size and the direction vector.
MoveCenter(Real, Arc<DVector>),
/// Start a new computation of the master problem.
///
/// `center_value` is the current function value in the center.
Solve { center_value: Real },
/// Compress the bundle.
Compress,
/// Set the weight parameter of the master problem.
SetWeight { weight: Real },
/// Return the current aggregated primal.
///
/// The aggregated primal of `subproblem` (or the master problem's
/// error) is sent back through the one-shot reply channel `tx`.
GetAggregatedPrimal {
subproblem: usize,
tx: Sender<Result<Pr, M::Err>>,
},
}
/// The response sent from a master process.
///
/// The response contains the evaluation results of the latest solve of
/// the master problem. One response is produced for each successfully
/// handled solve request.
pub struct MasterResponse {
/// The value of the master problem's `get_primopt()` after the solve.
///
/// NOTE(review): presumably the primal optimal solution, i.e. the next
/// step direction -- confirm against `MasterProblem`.
pub nxt_d: DVector,
/// The value of the master problem's `get_primoptval()` after the solve.
///
/// NOTE(review): presumably the model value at the new candidate --
/// confirm against `MasterProblem`.
pub nxt_mod: Real,
/// The square root of the master problem's `get_dualoptnorm2()`.
///
/// NOTE(review): presumably the norm of the aggregated subgradient --
/// confirm against `MasterProblem`.
pub sgnorm: Real,
/// The number of internal iterations.
pub cnt_updates: usize,
}
/// Sending end of the task channel (caller -> master process thread).
type ToMasterSender<P, M> = Sender<MasterTask<<P as FirstOrderProblem>::Primal, <P as FirstOrderProblem>::Err, M>>;
/// Receiving end of the task channel, owned by the master process thread.
type ToMasterReceiver<P, M> = Receiver<MasterTask<<P as FirstOrderProblem>::Primal, <P as FirstOrderProblem>::Err, M>>;
/// Sending end of the response channel (master process thread -> caller).
///
/// Carries either the result of a solve or the master problem error `E`.
type MasterSender<E> = Sender<Result<MasterResponse, E>>;
/// Receiving end of the response channel, exposed to the caller.
pub type MasterReceiver<E> = Receiver<Result<MasterResponse, E>>;
/// Handle to a master problem running asynchronously on its own thread.
///
/// Tasks are submitted through the methods of this type; the results of
/// solve requests (and errors of the master problem) arrive on `rx`.
///
/// `P` is the first-order problem providing the primal and error types,
/// `M` is the master problem type.
pub struct MasterProcess<P, M>
where
P: FirstOrderProblem,
M: MasterProblem,
{
/// The channel to transmit new tasks to the master problem.
tx: ToMasterSender<P, M>,
/// The channel to receive solutions from the master problem.
pub rx: MasterReceiver<M::Err>,
/// Zero-sized marker for the master problem type `M`.
phantom: std::marker::PhantomData<M>,
}
impl<P, M> MasterProcess<P, M>
where
P: FirstOrderProblem,
P::Primal: Send + 'static,
P::Err: Into<Box<dyn std::error::Error + Sync + Send>> + 'static,
M: MasterProblem + Send + 'static,
M::MinorantIndex: std::hash::Hash,
M::Err: Send + 'static,
{
pub fn start(master: M, master_config: MasterConfig, threadpool: &mut ThreadPool) -> Self {
// Create a pair of communication channels.
let (to_master_tx, to_master_rx) = channel();
let (from_master_tx, from_master_rx) = channel();
// The the master process thread.
threadpool.execute(move || {
debug!("Master process started");
let mut from_master_tx = from_master_tx;
if let Err(err) = Self::master_main(master, master_config, &mut from_master_tx, to_master_rx) {
#[allow(unused_must_use)]
{
from_master_tx.send(Err(err));
}
}
debug!("Master proces stopped");
});
MasterProcess {
tx: to_master_tx,
rx: from_master_rx,
phantom: std::marker::PhantomData,
}
}
/// Add new variables to the master problem.
pub fn add_vars(
&mut self,
vars: Vec<(Option<usize>, Real, Real)>,
sgext: Box<SubgradientExtender<P::Primal, P::Err>>,
) -> Result<(), Error<P::Err>>
where
P::Err: 'static,
{
self.tx
.send(MasterTask::AddVariables(vars, sgext))
.map_err(|err| Error::Process(err.into()))
}
/// Add a new minorant to the master problem model.
///
/// This adds the specified `minorant` with associated `primal` data to the
/// model of subproblem `i`.
pub fn add_minorant(&mut self, i: usize, minorant: Minorant, primal: P::Primal) -> Result<(), Error<P::Err>> {
self.tx
.send(MasterTask::AddMinorant(i, minorant, primal))
.map_err(|err| Error::Process(err.into()))
}
/// Move the center of the master problem.
///
/// This moves the master problem's center in direction $\\alpha \\cdot d$.
pub fn move_center(&mut self, alpha: Real, d: Arc<DVector>) -> Result<(), Error<P::Err>> {
self.tx
.send(MasterTask::MoveCenter(alpha, d))
.map_err(|err| Error::Process(err.into()))
}
/// Solve the master problem.
///
/// The current function value in the center `center_value`.
/// Once the master problem is solved the process will send a
/// [`MasterResponse`] message to the `tx` channel.
pub fn solve(&mut self, center_value: Real) -> Result<(), Error<P::Err>> {
self.tx
.send(MasterTask::Solve { center_value })
.map_err(|err| Error::Process(err.into()))
}
/// Compresses the model.
pub fn compress(&mut self) -> Result<(), Error<P::Err>> {
self.tx
.send(MasterTask::Compress)
.map_err(|err| Error::Process(err.into()))
}
/// Sets the new weight of the proximal term in the master problem.
pub fn set_weight(&mut self, weight: Real) -> Result<(), Error<P::Err>> {
self.tx
.send(MasterTask::SetWeight { weight })
.map_err(|err| Error::Process(err.into()))
}
/// Get the current aggregated primal for a certain subproblem.
pub fn get_aggregated_primal(&self, subproblem: usize) -> Result<P::Primal, Error<P::Err>> {
let (tx, rx) = channel();
self.tx
.send(MasterTask::GetAggregatedPrimal { subproblem, tx })
.map_err(|err| Error::Process(err.into()))?;
rx.recv()
.map_err(|err| Error::Process(err.into()))?
.map_err(|err| Error::Master(err.into()))
}
/// The main loop of the master process.
fn master_main(
master: M,
master_config: MasterConfig,
tx: &mut MasterSender<M::Err>,
rx: ToMasterReceiver<P, M>,
) -> Result<(), M::Err> {
let mut master = PrimalMaster::<_, P::Primal>::new(master);
// Initialize the master problem.
master.set_num_subproblems(master_config.num_subproblems)?;
master.set_vars(
master_config.num_vars,
master_config.lower_bounds,
master_config.upper_bounds,
)?;
// The main iteration: wait for new tasks.
for m in rx {
match m {
MasterTask::AddVariables(vars, sgext) => {
debug!("master: add {} variables to the subproblem", vars.len());
master.add_vars(vars, sgext)?;
}
MasterTask::AddMinorant(i, m, primal) => {
debug!("master: add minorant to subproblem {}", i);
master.add_minorant(i, m, primal)?;
}
MasterTask::MoveCenter(alpha, d) => {
debug!("master: move center");
master.move_center(alpha, &d);
}
MasterTask::Compress => {
debug!("Compress bundle");
master.compress()?;
}
MasterTask::Solve { center_value } => {
debug!("master: solve with center_value {}", center_value);
master.solve(center_value)?;
let master_response = MasterResponse {
nxt_d: master.get_primopt(),
nxt_mod: master.get_primoptval(),
sgnorm: master.get_dualoptnorm2().sqrt(),
cnt_updates: master.cnt_updates(),
};
if let Err(err) = tx.send(Ok(master_response)) {
warn!("Master process cancelled because of channel error: {}", err);
break;
}
}
MasterTask::SetWeight { weight } => {
debug!("master: set weight {}", weight);
master.set_weight(weight)?;
}
MasterTask::GetAggregatedPrimal { subproblem, tx } => {
debug!("master: get aggregated primal for {}", subproblem);
if tx.send(master.aggregated_primal(subproblem)).is_err() {
warn!("Sending of aggregated primal for {} failed", subproblem);
};
}
};
}
Ok(())
}
}