RsBundle  Artifact [b4905f04d1]

Artifact b4905f04d109c900ebf40bc6148411416f7bc64e:

  • File src/solver/masterprocess.rs — part of check-in [76f1c7f25f] at 2020-07-20 18:03:40 on branch minorant-trait — master: allow only adding of new variables (no moving) (user: fifr size: 12889) [more...]

/*
 * Copyright (c) 2019, 2020 Frank Fischer <frank-fischer@shadow-soft.de>
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see  <http://www.gnu.org/licenses/>
 */

//! Asynchronous process solving a master problem.

#[cfg(feature = "crossbeam")]
use rs_crossbeam::channel::{unbounded as channel, Receiver, RecvError, SendError, Sender};
#[cfg(not(feature = "crossbeam"))]
use std::sync::mpsc::{channel, Receiver, RecvError, SendError, Sender};

use either::Either;
use log::{debug, warn};
use std::sync::Arc;
use threadpool::ThreadPool;

use super::channels::{ClientSender, Message};
use crate::master::MasterProblem;
use crate::problem::{FirstOrderProblem, SubgradientExtender};
use crate::{DVector, Minorant, Real};

/// Errors raised by a master process or while communicating with it.
///
/// `MErr` is the error type carried by the `Aggregation` and `Master`
/// variants, `PErr` the one carried by the `SubgradientExtension` variant.
#[derive(Debug)]
pub enum Error<MErr, PErr> {
    /// The communication channel for sending requests has been disconnected.
    DisconnectedSender,
    /// The communication channel for sending responses has been disconnected.
    DisconnectedReceiver,
    /// An error occurred when computing an aggregated primal.
    Aggregation(MErr),
    /// An error occurred during the subgradient extension.
    SubgradientExtension(PErr),
    /// An error has been raised by the underlying master problem solver.
    Master(MErr),
}

/// Formats a single-line, human-readable description of the error.
///
/// The message is written without a trailing newline so that it can be
/// embedded in log lines and in the messages of wrapping errors; callers that
/// want a line break add it themselves.
impl<MErr, PErr> std::fmt::Display for Error<MErr, PErr>
where
    MErr: std::fmt::Display,
    PErr: std::fmt::Display,
{
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
        use Error::*;
        match self {
            DisconnectedSender => write!(fmt, "Communication channel to master process has been disconnected"),
            DisconnectedReceiver => write!(fmt, "Communication channel from master process has been disconnected"),
            Aggregation(err) => write!(fmt, "Computation of primal aggregate failed: {}", err),
            SubgradientExtension(err) => write!(fmt, "Subgradient extension failed: {}", err),
            Master(err) => write!(fmt, "Error in master problem solver: {}", err),
        }
    }
}

impl<MErr, PErr> std::error::Error for Error<MErr, PErr>
where
    MErr: std::error::Error + 'static,
    PErr: std::error::Error + 'static,
{
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        use Error::*;
        match self {
            DisconnectedSender => None,
            DisconnectedReceiver => None,
            Aggregation(err) => Some(err),
            SubgradientExtension(err) => Some(err),
            Master(err) => Some(err),
        }
    }
}

impl<MErr, PErr, T> From<SendError<T>> for Error<MErr, PErr>
where
    T: Send + 'static,
{
    fn from(_err: SendError<T>) -> Error<MErr, PErr> {
        Error::DisconnectedSender
    }
}

impl<MErr, PErr> From<RecvError> for Error<MErr, PErr> {
    fn from(_err: RecvError) -> Error<MErr, PErr> {
        Error::DisconnectedReceiver
    }
}

/// Configuration information for setting up a master problem.
///
/// The values are handed to the master problem once, at the beginning of the
/// master process' main loop and before any task is handled: the number of
/// subproblems via `set_num_subproblems`, everything else via `set_vars`.
pub struct MasterConfig {
    /// The number of subproblems.
    pub num_subproblems: usize,
    /// The number of variables.
    pub num_vars: usize,
    /// The lower bounds on the variables.
    ///
    /// Passed on to the master problem as is, including `None`.
    pub lower_bounds: Option<DVector>,
    /// The upper bounds on the variables.
    ///
    /// Passed on to the master problem as is, including `None`.
    pub upper_bounds: Option<DVector>,
}

/// A task for the master problem.
///
/// Tasks are created by the methods of `MasterProcess` and sent over a channel
/// to the master process, which handles them one after the other in its main
/// loop.
enum MasterTask<Mn, PErr, M>
where
    Mn: Minorant,
    M: MasterProblem<Mn>,
{
    /// Add new variables with bounds to the master problem.
    ///
    /// The vector holds one pair of bounds per new variable (presumably
    /// `(lower, upper)` -- TODO confirm against `MasterProblem::add_vars`).
    /// The subgradient extender is passed to the master problem together with
    /// the bounds and is kept afterwards to extend minorants that arrive with
    /// fewer variables than the master problem has.
    AddVariables(Vec<(Real, Real)>, Box<SubgradientExtender<Mn, PErr>>),

    /// Add a new minorant for a subfunction to the master problem.
    ///
    /// The first field is the index of the subfunction, the second one the
    /// minorant.
    AddMinorant(usize, Mn),

    /// Move the center of the master problem in the given direction.
    ///
    /// The first field is the step factor, the second one the direction.
    MoveCenter(Real, Arc<DVector>),

    /// Start a new computation of the master problem.
    ///
    /// `center_value` is the current function value in the center.
    Solve { center_value: Real },

    /// Compress the bundle.
    Compress,

    /// Set the weight parameter of the master problem.
    SetWeight { weight: Real },

    /// Return the current aggregated primal.
    ///
    /// The aggregated primal of `subproblem` (or the error raised while
    /// computing it) is sent back through `tx`.
    GetAggregatedPrimal {
        subproblem: usize,
        tx: Sender<Result<Mn::Primal, M::Err>>,
    },
}

/// The response sent from a master process.
///
/// The response contains the evaluation results of the latest call to `solve`.
pub enum Response<MErr, PErr> {
    /// A regular master result.
    Result {
        /// The value of the master problem's `get_primopt()` after solving
        /// (presumably the next candidate direction -- TODO confirm).
        nxt_d: DVector,
        /// The value of the master problem's `get_primoptval()` after solving
        /// (presumably the model value at the candidate -- TODO confirm).
        nxt_mod: Real,
        /// The square root of the master problem's `get_dualoptnorm2()` after
        /// solving.
        sgnorm: Real,
        /// The number of internal iterations.
        cnt_updates: usize,
    },
    /// An error occurred.
    ///
    /// This is the error the master process' main loop stopped with.
    Error(Error<MErr, PErr>),
}

/// Convenience alias for `Response`.
///
/// Fixes the two error types of `Response` to the error type of the master
/// problem `M` (working on the minorants of `P`) and the error type of the
/// first-order problem `P`.
pub type MasterResponse<P, M> =
    Response<<M as MasterProblem<<P as FirstOrderProblem>::Minorant>>::Err, <P as FirstOrderProblem>::Err>;

/// Sending end of the task channel to the master process of problem `P` and
/// master problem `M`.
type ToMasterSender<P, M> = Sender<MasterTask<<P as FirstOrderProblem>::Minorant, <P as FirstOrderProblem>::Err, M>>;

/// Receiving end of the task channel, read by the main loop of the master
/// process.
type ToMasterReceiver<P, M> =
    Receiver<MasterTask<<P as FirstOrderProblem>::Minorant, <P as FirstOrderProblem>::Err, M>>;

/// Handle to a master problem that is owned by a separate master process.
///
/// A handle is created by `MasterProcess::start`. Its methods do not access
/// the master problem directly; each of them sends a task over a channel to
/// the process that owns the master problem.
pub struct MasterProcess<P, M>
where
    P: FirstOrderProblem,
    M: MasterProblem<P::Minorant>,
{
    /// The channel to transmit new tasks to the master problem.
    tx: ToMasterSender<P, M>,

    /// Zero-sized marker for the master problem type `M`; holds no data.
    phantom: std::marker::PhantomData<M>,
}

impl<P, M> MasterProcess<P, M>
where
    P: FirstOrderProblem,
    P::Minorant: Send + 'static,
    P::Err: Send + 'static,
    M: MasterProblem<P::Minorant> + Send + 'static,
    M::Err: Send + 'static,
{
    /// Starts a master process on `threadpool` and returns a handle to it.
    ///
    /// The job submitted to the pool first configures `master` according to
    /// `master_config` and then handles the tasks issued through the returned
    /// handle. Results of `solve` requests are sent to `tx`. If the process
    /// stops with an error, it tries to send that error to `tx` as well.
    pub fn start(master: M, master_config: MasterConfig, tx: ClientSender<P, M>, threadpool: &mut ThreadPool) -> Self {
        // The channel carrying the tasks from the handle to the process.
        let (task_tx, task_rx) = channel();

        // Run the main loop of the master process as a job of the pool.
        threadpool.execute(move || {
            debug!("Master process started");
            let mut client_tx = tx;
            let mut extender = None;
            let outcome = Self::master_main(master, master_config, &mut client_tx, task_rx, &mut extender);
            if let Err(err) = outcome {
                // Reporting the error is best effort only: if the client is
                // gone there is nobody left to tell, so a failed send is
                // ignored on purpose.
                let _ = client_tx.send(Message::Master(Response::Error(err)));
            }
            debug!("Master proces stopped");
        });

        MasterProcess {
            tx: task_tx,
            phantom: std::marker::PhantomData,
        }
    }

    /// Add new variables to the master problem.
    ///
    /// `vars` holds the bounds of the new variables and `sgext` the function
    /// used to extend existing minorants to them. The request is only queued
    /// here; the call fails if the master process can no longer be reached.
    pub fn add_vars(
        &mut self,
        vars: Vec<(Real, Real)>,
        sgext: Box<SubgradientExtender<P::Minorant, P::Err>>,
    ) -> Result<(), Error<M::Err, P::Err>>
    where
        P::Err: 'static,
    {
        let task = MasterTask::AddVariables(vars, sgext);
        self.tx.send(task)?;
        Ok(())
    }

    /// Add a new minorant to the master problem model.
    ///
    /// This queues the request to add `minorant` to the model of subproblem
    /// `i`. The call fails if the master process can no longer be reached.
    pub fn add_minorant(&mut self, i: usize, minorant: P::Minorant) -> Result<(), Error<M::Err, P::Err>> {
        let task = MasterTask::AddMinorant(i, minorant);
        self.tx.send(task)?;
        Ok(())
    }

    /// Move the center of the master problem.
    ///
    /// This queues the request to move the master problem's center in
    /// direction $\\alpha \\cdot d$. The call fails if the master process can
    /// no longer be reached.
    pub fn move_center(&mut self, alpha: Real, d: Arc<DVector>) -> Result<(), Error<M::Err, P::Err>> {
        let task = MasterTask::MoveCenter(alpha, d);
        self.tx.send(task)?;
        Ok(())
    }

    /// Solve the master problem.
    ///
    /// `center_value` is the current function value in the center. This call
    /// only queues the request; once the master problem is solved the process
    /// sends a [`Response`] message to the `tx` channel passed to `start`.
    pub fn solve(&mut self, center_value: Real) -> Result<(), Error<M::Err, P::Err>> {
        let task = MasterTask::Solve { center_value };
        self.tx.send(task)?;
        Ok(())
    }

    /// Compresses the model.
    ///
    /// The request is only queued here; the call fails if the master process
    /// can no longer be reached.
    pub fn compress(&mut self) -> Result<(), Error<M::Err, P::Err>> {
        let task = MasterTask::Compress;
        self.tx.send(task)?;
        Ok(())
    }

    /// Sets the new weight of the proximal term in the master problem.
    ///
    /// The request is only queued here; the call fails if the master process
    /// can no longer be reached.
    pub fn set_weight(&mut self, weight: Real) -> Result<(), Error<M::Err, P::Err>> {
        let task = MasterTask::SetWeight { weight };
        self.tx.send(task)?;
        Ok(())
    }

    /// Get the current aggregated primal for a certain subproblem.
    ///
    /// In contrast to the other requests this one blocks until the master
    /// process has answered on a channel created just for this call.
    ///
    /// # Errors
    ///
    /// Fails if the request cannot be sent, if the answer channel is closed
    /// without an answer, or with `Error::Aggregation` if the master problem
    /// reports an error.
    pub fn get_aggregated_primal(
        &self,
        subproblem: usize,
    ) -> Result<<P::Minorant as Minorant>::Primal, Error<M::Err, P::Err>> {
        let (reply_tx, reply_rx) = channel();
        self.tx.send(MasterTask::GetAggregatedPrimal {
            subproblem,
            tx: reply_tx,
        })?;
        match reply_rx.recv()? {
            Ok(primal) => Ok(primal),
            Err(err) => Err(Error::Aggregation(err)),
        }
    }

    /// The main loop of the master process.
    ///
    /// Configures `master` from `master_config` and then handles the tasks
    /// arriving on `rx` one by one until the channel is closed, a `solve`
    /// result cannot be delivered to `tx`, or a task fails. The most recent
    /// subgradient extender is stored in `subgradient_extender`.
    ///
    /// # Errors
    ///
    /// Returns the first error raised by the master problem or by the
    /// subgradient extender. A failure to send a `solve` result or an
    /// aggregated primal is only logged.
    fn master_main(
        mut master: M,
        master_config: MasterConfig,
        tx: &mut ClientSender<P, M>,
        rx: ToMasterReceiver<P, M>,
        subgradient_extender: &mut Option<Box<SubgradientExtender<P::Minorant, P::Err>>>,
    ) -> Result<(), Error<M::Err, P::Err>> {
        let MasterConfig {
            num_subproblems,
            num_vars,
            lower_bounds,
            upper_bounds,
        } = master_config;

        // Set up the master problem before the first task is handled.
        master.set_num_subproblems(num_subproblems).map_err(Error::Master)?;
        master
            .set_vars(num_vars, lower_bounds, upper_bounds)
            .map_err(Error::Master)?;

        // Handle the tasks in the order of their arrival.
        for task in rx {
            match task {
                MasterTask::AddVariables(vars, mut sgext) => {
                    debug!("master: add {} variables to the subproblem", vars.len());
                    master.add_vars(&vars, &mut sgext).map_err(|failure| match failure {
                        Either::Left(master_err) => Error::Master(master_err),
                        Either::Right(extension_err) => Error::SubgradientExtension(extension_err),
                    })?;
                    // Keep the extender for minorants that show up later with
                    // too few variables.
                    *subgradient_extender = Some(sgext);
                }
                MasterTask::AddMinorant(i, mut minorant) => {
                    debug!("master: add minorant to subproblem {}", i);
                    // The minorant may stem from an evaluation that was
                    // started before the latest variables have been added.
                    // Then it has fewer variables than the master problem and
                    // must be extended first.
                    let needs_extension = minorant.dim() < master.num_variables();
                    if needs_extension {
                        if let Some(sgext) = subgradient_extender.as_mut() {
                            sgext(i, &mut minorant).map_err(Error::<M::Err, P::Err>::SubgradientExtension)?;
                        }
                    }
                    master.add_minorant(i, minorant).map_err(Error::Master)?;
                }
                MasterTask::MoveCenter(alpha, d) => {
                    debug!("master: move center");
                    master.move_center(alpha, &d);
                }
                MasterTask::Compress => {
                    debug!("Compress bundle");
                    master.compress().map_err(Error::Master)?;
                }
                MasterTask::Solve { center_value } => {
                    debug!("master: solve with center_value {}", center_value);
                    master.solve(center_value).map_err(Error::Master)?;

                    let nxt_d = master.get_primopt();
                    let nxt_mod = master.get_primoptval();
                    let sgnorm = master.get_dualoptnorm2().sqrt();
                    let cnt_updates = master.cnt_updates();
                    let result = Response::Result {
                        nxt_d,
                        nxt_mod,
                        sgnorm,
                        cnt_updates,
                    };

                    // Without a client to talk to there is no point in going on.
                    if let Err(err) = tx.send(Message::Master(result)) {
                        warn!("Master process cancelled because of channel error: {}", err);
                        break;
                    }
                }
                MasterTask::SetWeight { weight } => {
                    debug!("master: set weight {}", weight);
                    master.set_weight(weight).map_err(Error::Master)?;
                }
                MasterTask::GetAggregatedPrimal {
                    subproblem,
                    tx: reply_tx,
                } => {
                    debug!("master: get aggregated primal for {}", subproblem);
                    let primal = master.aggregated_primal(subproblem);
                    if reply_tx.send(primal).is_err() {
                        warn!("Sending of aggregated primal for {} failed", subproblem);
                    }
                }
            };
        }

        Ok(())
    }
}