RsBundle  Artifact [efed8f22f7]

Artifact efed8f22f734ff42719ae47624bed71d4ffd08e0:

  • File src/solver/masterprocess.rs — part of check-in [383237c005] at 2019-12-25 10:16:20 on branch async-minorants — Merge async (user: fifr size: 13550) [more...]

/*
 * Copyright (c) 2019 Frank Fischer <frank-fischer@shadow-soft.de>
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see  <http://www.gnu.org/licenses/>
 */

//! Asynchronous process solving a master problem.

use crossbeam::channel::{unbounded as channel, Receiver, RecvError, SendError, Sender};
use either::Either;
use log::{debug, warn};
use std::sync::Arc;
use threadpool::ThreadPool;

use crate::master::primalmaster::PrimalMaster;
use crate::master::MasterProblem;
use crate::problem::{FirstOrderProblem, SubgradientExtender};
use crate::{DVector, Minorant, Real};

/// Errors reported by the master process and by the handle used to talk to it.
///
/// `MErr` is the error type wrapped by the `Aggregation` and `Master`
/// variants, `PErr` the error type wrapped by `SubgradientExtension`.
#[derive(Debug)]
#[non_exhaustive]
pub enum Error<MErr, PErr> {
    /// The communication channel for sending requests has been disconnected.
    DisconnectedSender,
    /// The communication channel for sending responses has been disconnected.
    DisconnectedReceiver,
    /// An error occurred when computing an aggregated primal.
    Aggregation(MErr),
    /// An error occurred during the subgradient extension.
    SubgradientExtension(PErr),
    /// An error has been raised by the underlying master problem solver.
    Master(MErr),
}

/// Renders a single-line, human-readable description of the error.
///
/// The message deliberately carries no trailing newline, so that it composes
/// cleanly when it is embedded in a log line or in the message of a wrapping
/// error. Wrapped errors are appended using their own `Display` output.
impl<MErr, PErr> std::fmt::Display for Error<MErr, PErr>
where
    MErr: std::fmt::Display,
    PErr: std::fmt::Display,
{
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
        use Error::*;
        match self {
            DisconnectedSender => fmt.write_str("Communication channel to master process has been disconnected"),
            DisconnectedReceiver => {
                fmt.write_str("Communication channel from master process has been disconnected")
            }
            Aggregation(err) => write!(fmt, "Computation of primal aggregate failed: {}", err),
            SubgradientExtension(err) => write!(fmt, "Subgradient extension failed: {}", err),
            Master(err) => write!(fmt, "Error in master problem solver: {}", err),
        }
    }
}

impl<MErr, PErr> std::error::Error for Error<MErr, PErr>
where
    MErr: std::error::Error + 'static,
    PErr: std::error::Error + 'static,
{
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        use Error::*;
        match self {
            DisconnectedSender => None,
            DisconnectedReceiver => None,
            Aggregation(err) => Some(err),
            SubgradientExtension(err) => Some(err),
            Master(err) => Some(err),
        }
    }
}

impl<MErr, PErr, T> From<SendError<T>> for Error<MErr, PErr>
where
    T: Send + 'static,
{
    fn from(_err: SendError<T>) -> Error<MErr, PErr> {
        Error::DisconnectedSender
    }
}

impl<MErr, PErr> From<RecvError> for Error<MErr, PErr> {
    fn from(_err: RecvError) -> Error<MErr, PErr> {
        Error::DisconnectedReceiver
    }
}

/// Configuration information for setting up a master problem.
///
/// The values are applied once when the master process starts, before any
/// task is handled.
pub struct MasterConfig {
    /// The number of subproblems.
    pub num_subproblems: usize,
    /// The number of variables.
    pub num_vars: usize,
    /// The lower bounds on the variables.
    ///
    /// NOTE(review): `None` presumably means "no lower bounds" — confirm
    /// against the master problem's `set_vars`.
    pub lower_bounds: Option<DVector>,
    /// The upper bounds on the variables.
    ///
    /// NOTE(review): `None` presumably means "no upper bounds" — confirm
    /// against the master problem's `set_vars`.
    pub upper_bounds: Option<DVector>,
}

/// A task for the master problem.
///
/// Tasks are sent from a `MasterProcess` handle to the master process, which
/// handles them one after another in the order in which it receives them.
///
/// `Pr` is the type of the primal information attached to minorants and
/// `PErr` the error type of the subgradient extender.
enum MasterTask<Pr, PErr, M>
where
    M: MasterProblem,
{
    /// Add new variables to the master problem.
    ///
    /// The second value is the subgradient extender used to extend existing
    /// minorants to the new variables; the master process also keeps it to
    /// extend minorants that arrive later with too few coefficients.
    ///
    /// NOTE(review): each tuple presumably describes one variable as
    /// (optional index, lower bound, upper bound) — confirm against the
    /// master problem's `add_vars`.
    AddVariables(Vec<(Option<usize>, Real, Real)>, Box<SubgradientExtender<Pr, PErr>>),

    /// Add a new minorant for a subfunction to the master problem.
    ///
    /// The values are the index of the subproblem, the minorant and the
    /// primal information associated with the minorant.
    AddMinorant(usize, Minorant, Pr),

    /// Move the center of the master problem in the given direction.
    ///
    /// The first value is the step size, the second the direction. The third
    /// value is a unique identifier for the current center; it is reported
    /// back in the `center_index` field of later responses.
    MoveCenter(Real, Arc<DVector>, usize),

    /// Start a new computation of the master problem.
    ///
    /// `center_value` is the function value in the current center.
    Solve { center_value: Real },

    /// Compress the bundle.
    Compress,

    /// Set the weight parameter of the master problem.
    SetWeight { weight: Real },

    /// Return the current aggregated primal.
    ///
    /// The result for subproblem `subproblem` is sent back through `tx`.
    GetAggregatedPrimal {
        subproblem: usize,
        tx: Sender<Result<Pr, M::Err>>,
    },
}

/// The response sent from a master process.
///
/// The response contains the evaluation results of the latest call to `solve`.
pub struct MasterResponse {
    /// The value returned by the master problem's `get_primopt` after solving.
    ///
    /// NOTE(review): presumably the direction to the next candidate point —
    /// confirm against the master problem documentation.
    pub nxt_d: DVector,
    /// The value returned by the master problem's `get_primoptval` after solving.
    ///
    /// NOTE(review): presumably the model value in the next candidate —
    /// confirm against the master problem documentation.
    pub nxt_mod: Real,
    /// The square root of the master problem's `get_dualoptnorm2`.
    pub sgnorm: Real,
    /// The number of internal iterations.
    pub cnt_updates: usize,
    /// The index of the center.
    ///
    /// This is the identifier passed with the most recent center move, or 0
    /// if the center has not been moved yet.
    pub center_index: usize,
}

/// Sending end of the task channel: handle -> master process.
type ToMasterSender<P, M> = Sender<MasterTask<<P as FirstOrderProblem>::Primal, <P as FirstOrderProblem>::Err, M>>;

/// Receiving end of the task channel: handle -> master process.
type ToMasterReceiver<P, M> = Receiver<MasterTask<<P as FirstOrderProblem>::Primal, <P as FirstOrderProblem>::Err, M>>;

/// Sending end of the response channel: master process -> handle.
type MasterSender<MErr, PErr> = Sender<Result<MasterResponse, Error<MErr, PErr>>>;

/// Receiving end of the response channel: master process -> handle.
///
/// Each message is either the result of a solve or the error that stopped
/// the master process.
pub type MasterReceiver<MErr, PErr> = Receiver<Result<MasterResponse, Error<MErr, PErr>>>;

/// Handle to a master problem that is solved asynchronously.
///
/// The master problem itself is owned by a job running on a thread pool. The
/// handle communicates with it through two channels: tasks are sent through
/// `tx`, and solve results (or the error that stopped the master process)
/// are received from `rx`.
pub struct MasterProcess<P, M>
where
    P: FirstOrderProblem,
    M: MasterProblem,
{
    /// The channel to transmit new tasks to the master problem.
    tx: ToMasterSender<P, M>,

    /// The channel to receive solutions from the master problem.
    pub rx: MasterReceiver<M::Err, P::Err>,

    /// Zero-sized marker for the master problem type `M`.
    phantom: std::marker::PhantomData<M>,
}

impl<P, M> MasterProcess<P, M>
where
    P: FirstOrderProblem,
    P::Primal: Send + 'static,
    P::Err: Send + 'static,
    M: MasterProblem + Send + 'static,
    M::MinorantIndex: std::hash::Hash,
    M::Err: Send + 'static,
{
    /// Starts the master process and returns a handle to it.
    ///
    /// The master loop is submitted as a job to `threadpool` and takes
    /// ownership of `master` and `master_config`. If the loop stops with an
    /// error, that error is sent once through the response channel (`rx` of
    /// the returned handle).
    pub fn start(master: M, master_config: MasterConfig, threadpool: &mut ThreadPool) -> Self {
        // Create a pair of communication channels.
        let (to_master_tx, to_master_rx) = channel();
        let (from_master_tx, from_master_rx) = channel();

        // Run the master process as a job on the thread pool.
        threadpool.execute(move || {
            debug!("Master process started");
            let mut sgext = None;
            if let Err(err) = Self::master_main(master, master_config, &from_master_tx, to_master_rx, &mut sgext) {
                // Best effort: if the handle is already gone nobody is left
                // to be informed about the error, so a failed send is ignored.
                let _ = from_master_tx.send(Err(err));
            }
            debug!("Master process stopped");
        });

        MasterProcess {
            tx: to_master_tx,
            rx: from_master_rx,
            phantom: std::marker::PhantomData,
        }
    }

    /// Add new variables to the master problem.
    ///
    /// `sgext` is used by the master process to extend minorants to the new
    /// variables.
    ///
    /// NOTE(review): each entry of `vars` presumably is (optional index,
    /// lower bound, upper bound) — confirm against the master problem.
    ///
    /// # Errors
    ///
    /// Returns `Error::DisconnectedSender` if the task cannot be sent.
    pub fn add_vars(
        &mut self,
        vars: Vec<(Option<usize>, Real, Real)>,
        sgext: Box<SubgradientExtender<P::Primal, P::Err>>,
    ) -> Result<(), Error<M::Err, P::Err>> {
        Ok(self.tx.send(MasterTask::AddVariables(vars, sgext))?)
    }

    /// Add a new minorant to the master problem model.
    ///
    /// This adds the specified `minorant` with associated `primal` data to the
    /// model of subproblem `i`.
    ///
    /// # Errors
    ///
    /// Returns `Error::DisconnectedSender` if the task cannot be sent.
    pub fn add_minorant(
        &mut self,
        i: usize,
        minorant: Minorant,
        primal: P::Primal,
    ) -> Result<(), Error<M::Err, P::Err>> {
        Ok(self.tx.send(MasterTask::AddMinorant(i, minorant, primal))?)
    }

    /// Move the center of the master problem.
    ///
    /// This moves the master problem's center in direction `alpha * d`.
    /// `center_index` identifies the new center and is reported back in the
    /// `center_index` field of subsequent responses.
    ///
    /// # Errors
    ///
    /// Returns `Error::DisconnectedSender` if the task cannot be sent.
    pub fn move_center(
        &mut self,
        alpha: Real,
        d: Arc<DVector>,
        center_index: usize,
    ) -> Result<(), Error<M::Err, P::Err>> {
        Ok(self.tx.send(MasterTask::MoveCenter(alpha, d, center_index))?)
    }

    /// Solve the master problem.
    ///
    /// `center_value` is the current function value in the center.
    /// This call only queues the request. Once the master problem is solved
    /// the process sends a [`MasterResponse`] message that can be received
    /// from the `rx` channel.
    ///
    /// # Errors
    ///
    /// Returns `Error::DisconnectedSender` if the task cannot be sent.
    pub fn solve(&mut self, center_value: Real) -> Result<(), Error<M::Err, P::Err>> {
        Ok(self.tx.send(MasterTask::Solve { center_value })?)
    }

    /// Compresses the model.
    ///
    /// # Errors
    ///
    /// Returns `Error::DisconnectedSender` if the task cannot be sent.
    pub fn compress(&mut self) -> Result<(), Error<M::Err, P::Err>> {
        Ok(self.tx.send(MasterTask::Compress)?)
    }

    /// Sets the new weight of the proximal term in the master problem.
    ///
    /// # Errors
    ///
    /// Returns `Error::DisconnectedSender` if the task cannot be sent.
    pub fn set_weight(&mut self, weight: Real) -> Result<(), Error<M::Err, P::Err>> {
        Ok(self.tx.send(MasterTask::SetWeight { weight })?)
    }

    /// Get the current aggregated primal for a certain subproblem.
    ///
    /// The request is queued like any other task; this method then waits for
    /// the answer on a private one-shot channel.
    ///
    /// # Errors
    ///
    /// * `Error::DisconnectedSender` if the request cannot be sent.
    /// * `Error::DisconnectedReceiver` if no answer can be received.
    /// * `Error::Aggregation` if the master problem fails to compute the
    ///   aggregated primal.
    pub fn get_aggregated_primal(&self, subproblem: usize) -> Result<P::Primal, Error<M::Err, P::Err>> {
        let (tx, rx) = channel();
        self.tx.send(MasterTask::GetAggregatedPrimal { subproblem, tx })?;
        rx.recv()?.map_err(Error::Aggregation)
    }

    /// The main loop of the master process.
    ///
    /// Initializes the master problem from `master_config` and then handles
    /// the tasks received from `rx` one after another. Solve results are sent
    /// through `tx`. The most recently received subgradient extender is stored
    /// in `subgradient_extender`.
    ///
    /// Returns `Ok(())` when no more tasks can be received or when a solve
    /// result cannot be delivered. The first error raised by the master
    /// problem or by the subgradient extender stops the loop and is returned.
    fn master_main(
        master: M,
        master_config: MasterConfig,
        tx: &MasterSender<M::Err, P::Err>,
        rx: ToMasterReceiver<P, M>,
        subgradient_extender: &mut Option<Box<SubgradientExtender<P::Primal, P::Err>>>,
    ) -> Result<(), Error<M::Err, P::Err>> {
        let mut master = PrimalMaster::<_, P::Primal>::new(master);
        // Identifier of the current center; reported back with each response.
        let mut center_idx = 0;

        // Initialize the master problem.
        master
            .set_num_subproblems(master_config.num_subproblems)
            .map_err(Error::Master)?;
        master
            .set_vars(
                master_config.num_vars,
                master_config.lower_bounds,
                master_config.upper_bounds,
            )
            .map_err(Error::Master)?;

        // The main iteration: wait for new tasks.
        for task in rx {
            match task {
                MasterTask::AddVariables(vars, mut sgext) => {
                    debug!("master: add {} variables to the subproblem", vars.len());
                    master.add_vars(vars, &mut sgext).map_err(|err| match err {
                        Either::Left(err) => Error::Master(err),
                        Either::Right(err) => Error::SubgradientExtension(err),
                    })?;
                    // Keep the extender for minorants that arrive later but
                    // have been computed before the variables were added.
                    *subgradient_extender = Some(sgext);
                }
                MasterTask::AddMinorant(i, mut minorant, primal) => {
                    debug!("master: add minorant to subproblem {}", i);
                    // The new minorant may belong to an earlier evaluation
                    // with fewer variables (i.e. new variables have been
                    // added after the start of the evaluation but before the
                    // minorant is added, i.e. now). In this case the minorant
                    // must be extended accordingly.
                    //
                    // NOTE(review): if no extender has been stored yet, a too
                    // short minorant is passed on unchanged — confirm that
                    // this cannot happen or is handled by the master problem.
                    if minorant.linear.len() < master.num_variables() {
                        if let Some(ref mut sgext) = subgradient_extender {
                            let newinds = (minorant.linear.len()..master.num_variables()).collect::<Vec<_>>();
                            let new_subg =
                                sgext(i, &primal, &newinds).map_err(Error::<M::Err, P::Err>::SubgradientExtension)?;
                            minorant.linear.extend(new_subg);
                        }
                    }
                    master.add_minorant(i, minorant, primal).map_err(Error::Master)?;
                }
                MasterTask::MoveCenter(alpha, d, center_index) => {
                    debug!("master: move center to {}", center_index);
                    master.move_center(alpha, &d);
                    center_idx = center_index;
                }
                MasterTask::Compress => {
                    debug!("master: compress bundle");
                    master.compress().map_err(Error::Master)?;
                }
                MasterTask::Solve { center_value } => {
                    debug!("master: solve with center_value {}", center_value);
                    master.fill_models().map_err(Error::Master)?;
                    master.solve(center_value).map_err(Error::Master)?;
                    let master_response = MasterResponse {
                        nxt_d: master.get_primopt(),
                        nxt_mod: master.get_primoptval(),
                        sgnorm: master.get_dualoptnorm2().sqrt(),
                        cnt_updates: master.cnt_updates(),
                        center_index: center_idx,
                    };
                    // Without a receiver for the results there is no point in
                    // continuing, so the loop is left without an error.
                    if let Err(err) = tx.send(Ok(master_response)) {
                        warn!("Master process cancelled because of channel error: {}", err);
                        break;
                    }
                }
                MasterTask::SetWeight { weight } => {
                    debug!("master: set weight {}", weight);
                    master.set_weight(weight).map_err(Error::Master)?;
                }
                MasterTask::GetAggregatedPrimal { subproblem, tx: reply_tx } => {
                    debug!("master: get aggregated primal for {}", subproblem);
                    // A failure to compute the aggregate is reported to the
                    // requester and does not stop the master process.
                    if reply_tx.send(master.aggregated_primal(subproblem)).is_err() {
                        warn!("Sending of aggregated primal for {} failed", subproblem);
                    };
                }
            };
        }

        Ok(())
    }
}