Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Changes In Branch result-sender Through [43abfc4c67] Excluding Merge-Ins
This is equivalent to a diff from 9c4fe5cd4e to 43abfc4c67
|
2019-11-22
| ||
| 12:00 | Merge trunk check-in: 9198cc93a8 user: fifr tags: result-sender | |
| 11:52 | Merge error check-in: e0832fff42 user: fifr tags: trunk | |
| 11:22 | Use concrete error types propagating through the master problem layers check-in: 7176bf2280 user: fifr tags: error | |
| 09:24 | problem: make `ResultSender` a trait with implementation `ChannelSender` check-in: 43abfc4c67 user: fifr tags: result-sender | |
| 09:01 | problem: hide result channel in opaque type `ResultSender` check-in: 4d0fdfb346 user: fifr tags: result-sender | |
|
2019-11-20
| ||
| 10:27 | Merge trunk check-in: 021afa4a12 user: fifr tags: async | |
| 10:27 | mcf: use logfile only in debug mode check-in: 9c4fe5cd4e user: fifr tags: trunk | |
|
2019-11-19
| ||
| 08:10 | asyn: remove `cnt_updates` parameter from `show_info` check-in: 985706379b user: fifr tags: trunk | |
Changes to examples/cflp.rs.
| ︙ | ︙ | |||
27 28 29 30 31 32 33 |
use std::io::Write as _;
use std::sync::Arc;
use env_logger::{self, fmt::Color};
use ordered_float::NotNan;
use threadpool::ThreadPool;
| | | 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
use std::io::Write as _;
use std::sync::Arc;
use env_logger::{self, fmt::Color};
use ordered_float::NotNan;
use threadpool::ThreadPool;
use bundle::problem::{FirstOrderProblem as ParallelProblem, ResultSender};
use bundle::solver::sync::{DefaultSolver, NoBundleSolver};
use bundle::{dvec, DVector, Minorant, Real};
/// Number of facility subproblems: `evaluate` routes every index below
/// `Nfac` to `solve_facility_problem`.
const Nfac: usize = 3;
/// Presumably the number of customers -- TODO confirm against the customer
/// subproblem code.
const Ncus: usize = 5;
/// One entry per facility; presumably the fixed cost of each facility --
/// TODO confirm.
const F: [Real; Nfac] = [1000.0, 1000.0, 1000.0];
/// One entry per facility; presumably the capacity of each facility --
/// TODO confirm.
const CAP: [Real; Nfac] = [500.0, 500.0, 500.0];
|
| ︙ | ︙ | |||
141 142 143 144 145 146 147 |
self.pool = Some(pool)
}
/// Give up the pool stored in `self.pool`.
///
/// `take()` leaves `None` in the field; the value taken out is dropped
/// right away because it is not bound to anything.
fn stop(&mut self) {
self.pool.take();
}
| < < < < < < | | | < < < | < < < < | | 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
self.pool = Some(pool)
}
/// Give up the pool stored in `self.pool`.
///
/// `take()` leaves `None` in the field; the value taken out is dropped
/// right away because it is not bound to anything.
fn stop(&mut self) {
self.pool.take();
}
/// Start the evaluation of subproblem `i` at the point `y`.
///
/// Indices below `Nfac` are handled by `solve_facility_problem`, all other
/// indices by `solve_customer_problem` (shifted down by `Nfac`). The actual
/// work is handed to the pool in `self.pool`; if there is no pool yet,
/// `start` is called first.
///
/// On success the job sends the objective value and then the minorant
/// (together with its primal) through `tx`. If the subproblem fails, the
/// error is sent through `tx` instead.
///
/// This method itself always returns `Ok(())`.
fn evaluate<S>(&mut self, i: usize, y: Arc<DVector>, tx: S) -> Result<(), Self::Err>
where
    S: ResultSender<Self> + 'static,
{
    if self.pool.is_none() {
        self.start()
    }
    // `y` is already an owned `Arc`, so it is moved into the job as is; no
    // additional clone is required.
    // NOTE(review): the `unwrap` relies on `start` having stored a pool --
    // confirm against `start`.
    self.pool.as_ref().unwrap().execute(move || {
        let res = if i < Nfac {
            solve_facility_problem(i, &y)
        } else {
            solve_customer_problem(i - Nfac, &y)
        };
        match res {
            Ok((value, minorant, primal)) => {
                // The minorant is only sent if the objective value got
                // through. A failed send is deliberately ignored: the job
                // has no one else to report to.
                if tx.objective(value).is_ok() {
                    let _ = tx.minorant(minorant, primal);
                }
            }
            Err(err) => {
                // Same best-effort policy for the error report.
                let _ = tx.error(err);
            }
        }
    });
    Ok(())
}
}
fn show_primals<F>(f: F) -> Result<(), Box<dyn Error>>
|
| ︙ | ︙ |
Changes to examples/quadratic.rs.
| ︙ | ︙ | |||
23 24 25 26 27 28 29 |
use env_logger::fmt::Color;
use log::{debug, Level};
use rustop::opts;
use std::io::Write;
use std::sync::Arc;
use std::thread;
| | | 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
use env_logger::fmt::Color;
use log::{debug, Level};
use rustop::opts;
use std::io::Write;
use std::sync::Arc;
use std::thread;
use bundle::problem::{FirstOrderProblem as ParallelProblem, ResultSender};
use bundle::solver::sync::{DefaultSolver, NoBundleSolver};
use bundle::{DVector, Minorant, Real};
#[derive(Clone)]
struct QuadraticProblem {
a: [[Real; 2]; 2],
b: [Real; 2],
|
| ︙ | ︙ | |||
60 61 62 63 64 65 66 |
1
}
/// Does nothing; this problem needs no setup before it can be evaluated.
fn start(&mut self) {}
/// Does nothing; this method has nothing to release.
fn stop(&mut self) {}
| < < < < < < | | < < < < | | < | | < > | 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
1
}
/// Does nothing; this problem needs no setup before it can be evaluated.
fn start(&mut self) {}
/// Does nothing; this method has nothing to release.
fn stop(&mut self) {}
/// Evaluate the quadratic function at `x` on a newly spawned thread.
///
/// With `a`, `b` and `c` taken from a clone of `self`, the thread computes
/// the objective `x^T a x + b^T x + c` and the vector `2 a x + b`, logs
/// both and sends first the objective value and then a minorant (constant =
/// objective, linear part = that vector, primal = `()`) through `tx`.
///
/// `fidx` must be `0`; the spawned thread asserts this.
///
/// If a send fails, the failure is logged at debug level and the thread
/// simply ends instead of panicking.
///
/// This method itself always returns `Ok(())`.
fn evaluate<S>(&mut self, fidx: usize, x: Arc<DVector>, tx: S) -> Result<(), Self::Err>
where
    S: ResultSender<Self> + 'static,
{
    // `x` is already an owned `Arc`, so it is moved into the thread as is.
    let p = self.clone();
    thread::spawn(move || {
        assert_eq!(fidx, 0);
        let mut objective = p.c;
        let mut g = dvec![0.0; 2];
        for i in 0..2 {
            // g[i] = (a x)_i at this point.
            g[i] += (0..2).map(|j| p.a[i][j] * x[j]).sum::<Real>();
            objective += x[i] * (g[i] + p.b[i]);
            // Turn (a x)_i into (2 a x + b)_i.
            g[i] = 2.0 * g[i] + p.b[i];
        }
        debug!("Evaluation at {:?}", x);
        debug!(" objective={}", objective);
        debug!(" subgradient={}", g);
        if let Err(err) = tx.objective(objective) {
            debug!("Cannot send objective value: {}", err);
            return;
        }
        let minorant = Minorant {
            constant: objective,
            linear: g,
        };
        if let Err(err) = tx.minorant(minorant, ()) {
            debug!("Cannot send minorant: {}", err);
        }
    });
    Ok(())
}
}
fn main() -> Result<(), Box<dyn Error>> {
|
| ︙ | ︙ |
Changes to src/mcf/problem.rs.
| ︙ | ︙ | |||
12 13 14 15 16 17 18 |
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>
//
use crate::mcf;
use crate::problem::{
| | | 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>
//
use crate::mcf;
use crate::problem::{
FirstOrderProblem as ParallelProblem, ResultSender, Update as ParallelUpdate, UpdateSender,
UpdateState as ParallelUpdateState,
};
use crate::{DVector, Minorant, Real};
use itertools::izip;
use log::{debug, warn};
use num_traits::Float;
|
| ︙ | ︙ | |||
439 440 441 442 443 444 445 |
self.pool = Some(pool);
}
/// Give up the pool stored in `self.pool`.
///
/// `take()` leaves `None` in the field; the value taken out is dropped
/// right away because it is not bound to anything.
fn stop(&mut self) {
self.pool.take();
}
| | < < < < < < | < < < < | | < | < > | | 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 |
self.pool = Some(pool);
}
/// Give up the pool stored in `self.pool`.
///
/// `take()` leaves `None` in the field; the value taken out is dropped
/// right away because it is not bound to anything.
fn stop(&mut self) {
self.pool.take();
}
/// Start the evaluation of subproblem `i` at the point `y`.
///
/// The subproblem `self.subs[i]` is evaluated by a job handed to the pool
/// in `self.pool`; if there is no pool yet, `start` is called first. The
/// job receives a copy of the first `y.len()` active constraints.
///
/// On success the job sends the objective value and then the minorant
/// (constant = objective, linear part = subgradient) with its primal
/// through `tx`. If the subproblem fails, the error is sent instead. When a
/// send fails the job just stops; it does not panic.
///
/// This method itself always returns `Ok(())`.
///
/// # Panics
///
/// Panics if `i` is not a valid index into `self.subs`, if `y` is longer
/// than the list of active constraints, or if the lock on the active
/// constraints is poisoned.
fn evaluate<S>(&mut self, i: usize, y: Arc<DVector>, tx: S) -> Result<()>
where
    S: ResultSender<Self> + 'static,
{
    if self.pool.is_none() {
        self.start()
    }
    // `y` is already an owned `Arc`, so it is moved into the job as is.
    let sub = self.subs[i].clone();
    // Attention: `y` might be shorter than the set of active constraints
    // because evaluation and problem update are not synchronized, i.e. the
    // evaluation may still use an older model with fewer constraints.
    let active_constraints = self.active_constraints.read().unwrap()[0..y.len()].to_vec();
    self.pool.as_ref().unwrap().execute(move || {
        // Bind the result first: the write guard is a temporary of this
        // statement and is released at its end. Matching directly on the
        // call would keep the subproblem locked while sending the results.
        let result = sub.write().unwrap().evaluate(&y, active_constraints);
        match result {
            Ok((objective, subg, primal)) => {
                // A failed send leaves nobody to report to, so stop quietly
                // instead of panicking inside the pool.
                if tx.objective(objective).is_err() {
                    return;
                }
                let minorant = Minorant {
                    constant: objective,
                    linear: subg,
                };
                let _ = tx.minorant(minorant, primal);
            }
            Err(err) => {
                let _ = tx.error(err);
            }
        }
    });
    Ok(())
}
fn update<I, U>(&mut self, state: U, index: I, tx: UpdateSender<I, Self::Primal, Self::Err>) -> Result<()>
where
U: ParallelUpdateState<Self::Primal>,
|
| ︙ | ︙ |
Changes to src/problem.rs.
| ︙ | ︙ | |||
17 18 19 20 21 22 23 |
//! An asynchronous first-order oracle.
use crate::{Aggregatable, DVector, Minorant, Real};
use crossbeam::channel::Sender;
use std::sync::Arc;
| | < | | < < < < | > > | | | | > | | > > | < < | 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
//! An asynchronous first-order oracle.
use crate::{Aggregatable, DVector, Minorant, Real};
use crossbeam::channel::Sender;
use std::sync::Arc;
/// Destination for the results of a subproblem evaluation.
///
/// A value implementing this trait is handed to
/// `FirstOrderProblem::evaluate`, which reports what it computed by calling
/// the methods below. `P` is the problem type whose primal (`P::Primal`)
/// and error (`P::Err`) types are transported.
///
/// Implementations must be `Send` so that a sender can be moved to the
/// thread that performs the evaluation.
pub trait ResultSender<P>: Send
where
P: FirstOrderProblem,
{
/// Error returned when a result could not be handed over.
type Err: std::error::Error + Send;
/// Send a new objective `value`.
///
/// Returns `Err` if the value could not be handed over.
fn objective(&self, value: Real) -> Result<(), Self::Err>;
/// Send a new `minorant` with associated `primal`.
///
/// Returns `Err` if the minorant could not be handed over.
fn minorant(&self, minorant: Minorant, primal: P::Primal) -> Result<(), Self::Err>;
/// Report that the evaluation failed with the problem error `err`.
///
/// Returns `Err` if the error could not be handed over.
fn error(&self, err: P::Err) -> Result<(), Self::Err>;
}
/// Problem update information.
///
/// The solver calls the `update` method of the problem regularly.
/// This method can modify the problem by adding (or moving)
/// variables. The possible updates are encoded in this type.
pub enum Update<I, P, E> {
|
| ︙ | ︙ | |||
156 157 158 159 160 161 162 |
/// Start the evaluation of the i^th subproblem at the given point.
///
/// The results of the evaluation should be passed to the provided channel.
/// In order to work correctly, the results must contain (an upper bound on)
/// the objective value at $y$ as well as at least one subgradient centered
/// at $y$ eventually.
| < < < < < < | | > | 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
/// Start the evaluation of the i^th subproblem at the given point.
///
/// * `i` is the index of the subproblem to evaluate.
/// * `y` is the point at which the subproblem is evaluated.
/// * `tx` is the sender through which the results are reported.
///
/// The results of the evaluation should be passed to the provided sender
/// `tx`. In order to work correctly, the results must contain (an upper
/// bound on) the objective value at $y$ as well as at least one subgradient
/// centered at $y$ eventually. The results may be sent after this method
/// has returned, e.g. from a worker thread.
///
/// The `Self: Sized` bound is needed because `ResultSender<Self>` uses
/// `Self` as an ordinary type parameter.
fn evaluate<S>(&mut self, i: usize, y: Arc<DVector>, tx: S) -> Result<(), Self::Err>
where
S: ResultSender<Self> + 'static,
Self: Sized;
/// Called to update the problem.
///
/// This method is called regularly by the solver. The problem should send problem update
/// information (e.g. adding new variables) to the provided channel.
///
/// The updates might be generated asynchronously.
|
| ︙ | ︙ |
Changes to src/solver.rs.
| ︙ | ︙ | |||
19 20 21 22 23 24 25 |
pub mod sync;
pub use sync::{DefaultSolver, NoBundleSolver};
pub mod asyn;
mod masterprocess;
| > > | 19 20 21 22 23 24 25 26 27 |
pub mod sync;
pub use sync::{DefaultSolver, NoBundleSolver};
pub mod asyn;
mod masterprocess;
mod channels;
|
Changes to src/solver/asyn.rs.
| ︙ | ︙ | |||
24 25 26 27 28 29 30 31 32 |
use std::iter::repeat;
use std::sync::Arc;
use std::time::Instant;
use threadpool::ThreadPool;
use crate::{DVector, Real};
use super::masterprocess::{self, MasterConfig, MasterProcess, MasterResponse};
use crate::master::{Builder as MasterBuilder, MasterProblem};
| > | | 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
use std::iter::repeat;
use std::sync::Arc;
use std::time::Instant;
use threadpool::ThreadPool;
use crate::{DVector, Real};
use super::channels::{ChannelSender, EvalResult};
use super::masterprocess::{self, MasterConfig, MasterProcess, MasterResponse};
use crate::master::{Builder as MasterBuilder, MasterProblem};
use crate::problem::{FirstOrderProblem, Update, UpdateState};
use crate::terminator::{StandardTerminatable, StandardTerminator, Terminator};
use crate::weighter::{HKWeightable, HKWeighter, Weighter};
/// The default iteration limit.
pub const DEFAULT_ITERATION_LIMIT: usize = 10_000;
/// The default solver.
|
| ︙ | ︙ | |||
438 439 440 441 442 443 444 |
/// This is actually the time of the last call to `Solver::init`.
start_time: Instant,
}
impl<P, T, W, M> Solver<P, T, W, M>
where
P: FirstOrderProblem,
| | | 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 |
/// This is actually the time of the last call to `Solver::init`.
start_time: Instant,
}
impl<P, T, W, M> Solver<P, T, W, M>
where
P: FirstOrderProblem,
P::Err: Into<Box<dyn std::error::Error + Sync + Send>> + Send + 'static,
T: Terminator<SolverData<P>> + Default,
W: Weighter<SolverData<P>> + Default,
M: MasterBuilder,
<M::MasterProblem as MasterProblem>::MinorantIndex: std::hash::Hash,
{
/// Create a new parallel bundle solver.
pub fn new(problem: P) -> Self
|
| ︙ | ︙ | |||
558 559 560 561 562 563 564 |
));
debug!("Initial problem evaluation");
// We need an initial evaluation of all oracles for the first center.
let y = Arc::new(self.data.cur_y.clone());
for i in 0..m {
self.problem
| | | 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 |
));
debug!("Initial problem evaluation");
// We need an initial evaluation of all oracles for the first center.
let y = Arc::new(self.data.cur_y.clone());
for i in 0..m {
self.problem
.evaluate(i, y.clone(), ChannelSender::new(i, self.client_tx.clone().unwrap()))
.map_err(Error::Evaluation)?;
}
debug!("Initialization complete");
self.start_time = Instant::now();
|
| ︙ | ︙ | |||
876 877 878 879 880 881 882 |
.resize(self.problem.num_subproblems(), -Real::infinity());
self.data.cnt_remaining_mins = self.problem.num_subproblems();
// Start evaluation of all subproblems at the new candidate.
let client_tx = self.client_tx.as_ref().ok_or(Error::NotInitialized)?;
for i in 0..self.problem.num_subproblems() {
self.problem
| | | 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 |
.resize(self.problem.num_subproblems(), -Real::infinity());
self.data.cnt_remaining_mins = self.problem.num_subproblems();
// Start evaluation of all subproblems at the new candidate.
let client_tx = self.client_tx.as_ref().ok_or(Error::NotInitialized)?;
for i in 0..self.problem.num_subproblems() {
self.problem
.evaluate(i, self.data.nxt_y.clone(), ChannelSender::new(i, client_tx.clone()))
.map_err(Error::Evaluation)?;
}
Ok(false)
}
/// Update the problem after descent or null `step`.
///
|
| ︙ | ︙ |
Added src/solver/channels.rs.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
/*
* Copyright (c) 2019 Frank Fischer <frank-fischer@shadow-soft.de>
*
* This program is free software: you can redistribute it and/or
* modify it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>
*/
//! Implementation for `ResultSender` using channels.
use crate::problem::{FirstOrderProblem, ResultSender};
use crate::{Minorant, Real};
use crossbeam::channel::{SendError, Sender};
/// Evaluation result.
///
/// The result of an evaluation is new information to be made
/// available to the solver and the master problem. There are
/// essentially two types of information:
///
/// 1. The (exact) function value of a sub-function at some point.
/// 2. A minorant of some sub-function.
///
/// `I` is the type of the index that says which sub-function a result
/// belongs to, `P` is the type of the primal attached to a minorant.
#[derive(Debug)]
pub enum EvalResult<I, P> {
/// The objective value at some point.
///
/// `index` identifies the sub-function, `value` is its objective value.
ObjectiveValue { index: I, value: Real },
/// A minorant with an associated primal.
///
/// `index` identifies the sub-function the `minorant` belongs to.
Minorant { index: I, minorant: Minorant, primal: P },
}
/// A `ResultSender` that forwards evaluation results over a crossbeam
/// channel.
///
/// `I` is the index type attached to each result, `P` the primal type and
/// `E` the error type that can be sent instead of a result.
pub struct ChannelSender<I, P, E> {
// Index copied into every `EvalResult` sent by this sender.
index: I,
// Sending half of the channel; carries either a result or an error.
tx: Sender<Result<EvalResult<I, P>, E>>,
}
impl<I, P, E> ChannelSender<I, P, E> {
pub fn new(index: I, tx: Sender<Result<EvalResult<I, P>, E>>) -> Self {
ChannelSender { index, tx }
}
}
/// Sends results of problem `P` through the wrapped channel.
///
/// Objective values and minorants are wrapped in `Ok(EvalResult::..)` and
/// carry a clone of the sender's index; problem errors are sent as `Err`.
/// Every method returns whatever the channel's `send` returns.
impl<I, P> ResultSender<P> for ChannelSender<I, P::Primal, P::Err>
where
    P: FirstOrderProblem,
    P::Err: Send,
    I: Clone + Send,
{
    /// The channel's own send error.
    type Err = SendError<Result<EvalResult<I, P::Primal>, P::Err>>;

    fn objective(&self, value: Real) -> Result<(), Self::Err> {
        let index = self.index.clone();
        self.tx.send(Ok(EvalResult::ObjectiveValue { index, value }))
    }

    fn minorant(&self, minorant: Minorant, primal: P::Primal) -> Result<(), Self::Err> {
        let index = self.index.clone();
        let message = EvalResult::Minorant {
            index,
            minorant,
            primal,
        };
        self.tx.send(Ok(message))
    }

    fn error(&self, err: P::Err) -> Result<(), Self::Err> {
        // Errors are forwarded as they are; no index is attached.
        self.tx.send(Err(err))
    }
}
|
Changes to src/solver/sync.rs.
| ︙ | ︙ | |||
23 24 25 26 27 28 29 30 31 |
use num_traits::Float;
use std::sync::Arc;
use std::time::Instant;
use threadpool::ThreadPool;
use crate::{DVector, Real};
use super::masterprocess::{self, MasterConfig, MasterProcess, MasterResponse};
use crate::master::{Builder as MasterBuilder, MasterProblem};
| > | | 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
use num_traits::Float;
use std::sync::Arc;
use std::time::Instant;
use threadpool::ThreadPool;
use crate::{DVector, Real};
use super::channels::{ChannelSender, EvalResult};
use super::masterprocess::{self, MasterConfig, MasterProcess, MasterResponse};
use crate::master::{Builder as MasterBuilder, MasterProblem};
use crate::problem::{FirstOrderProblem, Update, UpdateState};
use crate::terminator::{StandardTerminatable, StandardTerminator, Terminator};
use crate::weighter::{HKWeightable, HKWeighter, Weighter};
/// The default iteration limit.
pub const DEFAULT_ITERATION_LIMIT: usize = 10_000;
/// The default solver.
|
| ︙ | ︙ | |||
416 417 418 419 420 421 422 |
/// This is actually the time of the last call to `Solver::init`.
start_time: Instant,
}
impl<P, T, W, M> Solver<P, T, W, M>
where
P: FirstOrderProblem,
| | | 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 |
/// This is actually the time of the last call to `Solver::init`.
start_time: Instant,
}
impl<P, T, W, M> Solver<P, T, W, M>
where
P: FirstOrderProblem,
P::Err: Into<Box<dyn std::error::Error + Sync + Send>> + Send + 'static,
T: Terminator<SolverData> + Default,
W: Weighter<SolverData> + Default,
M: MasterBuilder,
<M::MasterProblem as MasterProblem>::MinorantIndex: std::hash::Hash,
{
/// Create a new parallel bundle solver.
pub fn new(problem: P) -> Self
|
| ︙ | ︙ | |||
536 537 538 539 540 541 542 |
));
debug!("Initial problem evaluation");
// We need an initial evaluation of all oracles for the first center.
let y = Arc::new(self.data.cur_y.clone());
for i in 0..m {
self.problem
| | | 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 |
));
debug!("Initial problem evaluation");
// We need an initial evaluation of all oracles for the first center.
let y = Arc::new(self.data.cur_y.clone());
for i in 0..m {
self.problem
.evaluate(i, y.clone(), ChannelSender::new(i, self.client_tx.clone().unwrap()))
.map_err(Error::Evaluation)?;
}
debug!("Initialization complete");
self.start_time = Instant::now();
|
| ︙ | ︙ | |||
769 770 771 772 773 774 775 |
.resize(self.problem.num_subproblems(), -Real::infinity());
self.data.cnt_remaining_mins = self.problem.num_subproblems();
// Start evaluation of all subproblems at the new candidate.
let client_tx = self.client_tx.as_ref().ok_or(Error::NotInitialized)?;
for i in 0..self.problem.num_subproblems() {
self.problem
| | | 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 |
.resize(self.problem.num_subproblems(), -Real::infinity());
self.data.cnt_remaining_mins = self.problem.num_subproblems();
// Start evaluation of all subproblems at the new candidate.
let client_tx = self.client_tx.as_ref().ok_or(Error::NotInitialized)?;
for i in 0..self.problem.num_subproblems() {
self.problem
.evaluate(i, self.data.nxt_y.clone(), ChannelSender::new(i, client_tx.clone()))
.map_err(Error::Evaluation)?;
}
Ok(false)
}
fn update_problem(&mut self, step: Step) -> Result<bool, Error<P::Err>> {
let master_proc = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;
|
| ︙ | ︙ |