RsBundle  Artifact [5857b1bc16]

Artifact 5857b1bc160395f07ea70ff863cc4312969da130:

  • File src/parallel/solver.rs — part of check-in [6186a4f7ed] at 2019-07-17 14:41:42 on branch async — solver: make master problem a type argument (user: fifr size: 9959)

/*
 * Copyright (c) 2019 Frank Fischer <frank-fischer@shadow-soft.de>
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see  <http://www.gnu.org/licenses/>
 */

//! An asynchronous parallel bundle solver.

use crossbeam::channel::{unbounded as channel, Receiver, Sender};
use num_cpus;
use std::sync::Arc;
use threadpool::ThreadPool;

use crate::{DVector, Minorant, Real};

use super::problem::{EvalResult, FirstOrderProblem};
use crate::master::{
    BoxedMasterProblem, CplexMaster, Error as MasterProblemError, MasterProblem, UnconstrainedMasterProblem,
};

/// The default iteration limit.
pub const DEFAULT_ITERATION_LIMIT: usize = 10_000;

/// Error raised by the parallel bundle [`Solver`].
#[derive(Debug)]
pub enum Error<E> {
    /// An error raised by the master problem process.
    Master(MasterProblemError<E>),
    /// The iteration limit has been reached.
    IterationLimit { limit: usize },
    /// An error raised by a subproblem evaluation.
    Evaluation(E),
}

impl<E> std::fmt::Display for Error<E>
where
    E: std::fmt::Display,
{
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::result::Result<(), std::fmt::Error> {
        use Error::*;
        match self {
            Master(err) => writeln!(fmt, "Error in master problem: {}", err),
            IterationLimit { limit } => writeln!(fmt, "The iteration limit has been reached: {}", limit),
            Evaluation(err) => writeln!(fmt, "Error in subproblem evaluation: {}", err),
        }
    }
}

impl<E> std::error::Error for Error<E>
where
    E: std::error::Error + 'static,
{
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        use Error::*;
        match self {
            Master(err) => Some(err),
            Evaluation(err) => Some(err),
            _ => None,
        }
    }
}

impl<E> From<MasterProblemError<E>> for Error<E>
where
    E: std::error::Error + Send + Sync,
{
    fn from(err: MasterProblemError<E>) -> Self {
        Error::Master(err)
    }
}

/// A task for the master problem.
enum MasterTask {
    /// Add a new minorant for a subfunction to the master problem.
    AddMinorant(usize, Minorant),

    /// Move the center of the master problem in the given direction.
    MoveCenter(Real, Arc<DVector>),

    /// Start a new computation of the master problem.
    Solve { center_value: Real },
}

type MasterSender<E> = Sender<std::result::Result<Real, MasterProblemError<E>>>;

type MasterReceiver = Receiver<MasterTask>;

/// Implementation of a parallel bundle method.
pub struct Solver<P>
where
    P: FirstOrderProblem,
{
    /// The first order problem.
    problem: P,

    /// Current center of stability.
    cur_y: DVector,

    /// Function value in the current point.
    cur_val: Real,

    /// The threadpool of the solver.
    threadpool: ThreadPool,

    /// The channel to transmit new tasks to the master problem.
    master_tx: Option<Sender<MasterTask>>,

    /// The channel to receive solutions from the master problem.
    master_rx: Option<Receiver<std::result::Result<Real, MasterProblemError<P::Err>>>>,

    /// The channel to receive the evaluation results from subproblems.
    client_tx: Option<Sender<std::result::Result<EvalResult<usize, P::Primal>, P::Err>>>,

    /// The channel to receive the evaluation results from subproblems.
    client_rx: Option<Receiver<std::result::Result<EvalResult<usize, P::Primal>, P::Err>>>,

    /// Number of descent steps.
    cnt_descent: usize,

    /// Number of function evaluation.
    cnt_evals: usize,
}

impl<P> Solver<P>
where
    P: FirstOrderProblem,
    P::Err: std::error::Error + Send + Sync + 'static,
{
    pub fn new(problem: P) -> Result<Solver<P>, Error<P::Err>> {
        Ok(Solver {
            problem,
            cur_y: dvec![],
            cur_val: 0.0,

            threadpool: ThreadPool::with_name("Parallel bundle solver".to_string(), num_cpus::get()),
            master_tx: None,
            master_rx: None,
            client_tx: None,
            client_rx: None,

            cnt_descent: 0,
            cnt_evals: 0,
        })
    }

    /// Return the underlying threadpool.
    ///
    /// In order to use the same threadpool for concurrent processes,
    /// just clone the returned `ThreadPool`.
    pub fn threadpool(&self) -> &ThreadPool {
        &self.threadpool
    }

    /// Set the threadpool.
    ///
    /// This function allows to use a specific threadpool for all processes
    /// spawned by the solver. Note that this does not involve any threads
    /// used by the problem because the solver is not responsible for executing
    /// the evaluation process of the subproblems. However, the problem might
    /// use the same threadpool as the solver.
    pub fn set_threadpool(&mut self, threadpool: ThreadPool) {
        self.threadpool = threadpool;
    }

    /// Initialize the solver.
    ///
    /// This will reset the internal data structures so that a new fresh
    /// solution process can be started.
    ///
    /// It will also setup all worker processes.
    ///
    /// This function is automatically called by [`solve`].
    pub fn init(&mut self) -> Result<(), Error<P::Err>> {
        let n = self.problem.num_variables();
        let m = self.problem.num_subproblems();

        self.cur_y.init0(n);
        self.cnt_descent = 0;
        self.cnt_evals = 0;

        let (tx, rx) = channel();
        self.client_tx = Some(tx);
        self.client_rx = Some(rx);

        let (tx, rx) = channel();
        let (rev_tx, rev_rx) = channel();

        self.master_tx = Some(tx);
        self.master_rx = Some(rev_rx);
        self.threadpool.execute(move || {
            let mut rev_tx = rev_tx;
            if let Err(err) = Self::master_main(&mut rev_tx, rx) {
                #[allow(unused_must_use)]
                {
                    rev_tx.send(Err(err));
                }
            }
        });

        // We need an initial evaluation of all oracles for the first center.
        let y = Arc::new(self.cur_y.clone());
        for i in 0..m {
            self.problem
                .evaluate(i, y.clone(), i, self.client_tx.clone().unwrap())
                .map_err(Error::Evaluation)?;
        }

        let mut center_values: Vec<Option<Real>> = vec![None; m];
        let mut minorants: Vec<Option<Minorant>> = vec![None; m];
        let mut cnt_remaining_objs = m;
        let mut cnt_remaining_mins = m;
        for m in self.client_rx.as_ref().unwrap() {
            match m {
                Ok(EvalResult::ObjectiveValue { index: i, value }) => {
                    println!("Receive objective {}", i);
                    if center_values[i].is_none() {
                        cnt_remaining_objs -= 1;
                        center_values[i] = Some(value);
                    }
                }
                Ok(EvalResult::Minorant { index: i, minorant, .. }) => {
                    println!("Receive minorant {}", i);
                    if let Some(ref mut min) = minorants[i] {
                        if min.constant < minorant.constant {
                            *min = minorant;
                        }
                    } else {
                        cnt_remaining_mins -= 1;
                        minorants[i] = Some(minorant);
                    }
                }
                Err(err) => return Err(Error::Evaluation(err)),
            };
            if cnt_remaining_mins == 0 && cnt_remaining_objs == 0 {
                break;
            }
        }

        panic!("Initial evaluation completed");

        Ok(())
    }

    fn master_main(
        tx: &mut MasterSender<P::Err>,
        rx: MasterReceiver,
    ) -> std::result::Result<(), MasterProblemError<P::Err>> {
        let mut master = CplexMaster::new().map(BoxedMasterProblem::with_master)?;
        for m in rx {
            match m {
                MasterTask::AddMinorant(i, m) => {
                    master.add_minorant(i, m)?;
                }
                MasterTask::MoveCenter(alpha, d) => {
                    master.move_center(alpha, &d);
                }
                MasterTask::Solve { center_value } => {
                    master.solve(center_value)?;
                }
            };
        }

        Ok(())
    }

    /// Solve the problem with the default maximal iteration limit [`DEFAULT_ITERATION_LIMIT`].
    pub fn solve(&mut self) -> Result<(), Error<P::Err>> {
        self.solve_with_limit(DEFAULT_ITERATION_LIMIT)
    }

    /// Solve the problem with a maximal iteration limit.
    pub fn solve_with_limit(&mut self, limit: usize) -> Result<(), Error<P::Err>> {
        // First initialize the internal data structures.
        self.init()?;

        if self.solve_iter(limit)? {
            Ok(())
        } else {
            Err(Error::IterationLimit { limit })
        }
    }

    /// Solve the problem but stop after at most `niter` iterations.
    ///
    /// The function returns `Ok(true)` if the termination criterion
    /// has been satisfied. Otherwise it returns `Ok(false)` or an
    /// error code.
    ///
    /// If this function is called again, the solution process is
    /// continued from the previous point. Because of this one *must*
    /// call `init()` before the first call to this function.
    pub fn solve_iter(&mut self, niter: usize) -> Result<bool, Error<P::Err>> {
        unimplemented!()
    }
}