RsBundle  Changes On Branch 43abfc4c67ef4e09

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Changes In Branch result-sender Through [43abfc4c67] Excluding Merge-Ins

This is equivalent to a diff from 9c4fe5cd4e to 43abfc4c67

2019-11-22
12:00
Merge trunk check-in: 9198cc93a8 user: fifr tags: result-sender
11:52
Merge error check-in: e0832fff42 user: fifr tags: trunk
11:22
Use concrete error types propagating through the master problem layers check-in: 7176bf2280 user: fifr tags: error
09:24
problem: make `ResultSender` a trait with implementation `ChannelSender` check-in: 43abfc4c67 user: fifr tags: result-sender
09:01
problem: hide result channel in opaque type `ResultSender` check-in: 4d0fdfb346 user: fifr tags: result-sender
2019-11-20
10:27
Merge trunk check-in: 021afa4a12 user: fifr tags: async
10:27
mcf: use logfile only in debug mode check-in: 9c4fe5cd4e user: fifr tags: trunk
2019-11-19
08:10
asyn: remove `cnt_updates` parameter from `show_info` check-in: 985706379b user: fifr tags: trunk

Changes to examples/cflp.rs.
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
use std::io::Write as _;
use std::sync::Arc;

use env_logger::{self, fmt::Color};
use ordered_float::NotNan;
use threadpool::ThreadPool;

use bundle::problem::{EvalResult, FirstOrderProblem as ParallelProblem, ResultSender};
use bundle::solver::sync::{DefaultSolver, NoBundleSolver};
use bundle::{dvec, DVector, Minorant, Real};

const Nfac: usize = 3;
const Ncus: usize = 5;
const F: [Real; Nfac] = [1000.0, 1000.0, 1000.0];
const CAP: [Real; Nfac] = [500.0, 500.0, 500.0];







|







27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
use std::io::Write as _;
use std::sync::Arc;

use env_logger::{self, fmt::Color};
use ordered_float::NotNan;
use threadpool::ThreadPool;

use bundle::problem::{FirstOrderProblem as ParallelProblem, ResultSender};
use bundle::solver::sync::{DefaultSolver, NoBundleSolver};
use bundle::{dvec, DVector, Minorant, Real};

const Nfac: usize = 3;
const Ncus: usize = 5;
const F: [Real; Nfac] = [1000.0, 1000.0, 1000.0];
const CAP: [Real; Nfac] = [500.0, 500.0, 500.0];
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
        self.pool = Some(pool)
    }

    fn stop(&mut self) {
        self.pool.take();
    }

    fn evaluate<I>(
        &mut self,
        i: usize,
        y: Arc<DVector>,
        index: I,
        tx: ResultSender<I, Self::Primal, Self::Err>,
    ) -> Result<(), Self::Err>
    where
        I: Send + Copy + 'static,
    {
        if self.pool.is_none() {
            self.start()
        }
        let y = y.clone();
        self.pool.as_ref().unwrap().execute(move || {
            let res = if i < Nfac {
                solve_facility_problem(i, &y)
            } else {
                solve_customer_problem(i - Nfac, &y)
            };
            match res {
                Ok((value, minorant, primal)) => {
                    if tx.send(Ok(EvalResult::ObjectiveValue { index, value })).is_err() {
                        return;
                    }
                    if tx
                        .send(Ok(EvalResult::Minorant {
                            index,
                            minorant,
                            primal,
                        }))
                        .is_err()
                    {
                        return;
                    }
                }
                Err(err) => if tx.send(Err(err)).is_err() {},
            };
        });
        Ok(())
    }
}

fn show_primals<F>(f: F) -> Result<(), Box<dyn Error>>







<
<
<
<
<
<
|

|













|


<
<
<
|
<
<
<
<



|







141
142
143
144
145
146
147






148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166



167




168
169
170
171
172
173
174
175
176
177
178
        self.pool = Some(pool)
    }

    fn stop(&mut self) {
        self.pool.take();
    }







    fn evaluate<S>(&mut self, i: usize, y: Arc<DVector>, tx: S) -> Result<(), Self::Err>
    where
        S: ResultSender<Self> + 'static,
    {
        if self.pool.is_none() {
            self.start()
        }
        let y = y.clone();
        self.pool.as_ref().unwrap().execute(move || {
            let res = if i < Nfac {
                solve_facility_problem(i, &y)
            } else {
                solve_customer_problem(i - Nfac, &y)
            };
            match res {
                Ok((value, minorant, primal)) => {
                    if tx.objective(value).is_err() {
                        return;
                    }



                    if tx.minorant(minorant, primal).is_err() {




                        return;
                    }
                }
                Err(err) => if tx.error(err).is_err() {},
            };
        });
        Ok(())
    }
}

fn show_primals<F>(f: F) -> Result<(), Box<dyn Error>>
Changes to examples/quadratic.rs.
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
use env_logger::fmt::Color;
use log::{debug, Level};
use rustop::opts;
use std::io::Write;
use std::sync::Arc;
use std::thread;

use bundle::problem::{EvalResult, FirstOrderProblem as ParallelProblem, ResultSender};
use bundle::solver::sync::{DefaultSolver, NoBundleSolver};
use bundle::{DVector, Minorant, Real};

#[derive(Clone)]
struct QuadraticProblem {
    a: [[Real; 2]; 2],
    b: [Real; 2],







|







23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
use env_logger::fmt::Color;
use log::{debug, Level};
use rustop::opts;
use std::io::Write;
use std::sync::Arc;
use std::thread;

use bundle::problem::{FirstOrderProblem as ParallelProblem, ResultSender};
use bundle::solver::sync::{DefaultSolver, NoBundleSolver};
use bundle::{DVector, Minorant, Real};

#[derive(Clone)]
struct QuadraticProblem {
    a: [[Real; 2]; 2],
    b: [Real; 2],
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106

107
108
109
110
111
112
113
        1
    }

    fn start(&mut self) {}

    fn stop(&mut self) {}

    fn evaluate<I>(
        &mut self,
        fidx: usize,
        x: Arc<DVector>,
        index: I,
        tx: ResultSender<I, Self::Primal, Self::Err>,
    ) -> Result<(), Self::Err>
    where
        I: Send + Copy + 'static,
    {
        let x = x.clone();
        let p = self.clone();
        thread::spawn(move || {
            assert_eq!(fidx, 0);
            let mut objective = p.c;
            let mut g = dvec![0.0; 2];

            for i in 0..2 {
                g[i] += (0..2).map(|j| p.a[i][j] * x[j]).sum::<Real>();
                objective += x[i] * (g[i] + p.b[i]);
                g[i] = 2.0 * g[i] + p.b[i];
            }

            debug!("Evaluation at {:?}", x);
            debug!("  objective={}", objective);
            debug!("  subgradient={}", g);

            tx.send(Ok(EvalResult::ObjectiveValue {
                index,
                value: objective,
            }))
            .unwrap();
            tx.send(Ok(EvalResult::Minorant {
                index,
                minorant: Minorant {
                    constant: objective,
                    linear: g,
                },
                primal: (),
            }))

            .unwrap();
        });
        Ok(())
    }
}

fn main() -> Result<(), Box<dyn Error>> {







<
<
<
<
<
<
|

|


















<
<
<
<
|
|
<
|



|
<
>







60
61
62
63
64
65
66






67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87




88
89

90
91
92
93
94

95
96
97
98
99
100
101
102
        1
    }

    fn start(&mut self) {}

    fn stop(&mut self) {}







    fn evaluate<S>(&mut self, fidx: usize, x: Arc<DVector>, tx: S) -> Result<(), Self::Err>
    where
        S: ResultSender<Self> + 'static,
    {
        let x = x.clone();
        let p = self.clone();
        thread::spawn(move || {
            assert_eq!(fidx, 0);
            let mut objective = p.c;
            let mut g = dvec![0.0; 2];

            for i in 0..2 {
                g[i] += (0..2).map(|j| p.a[i][j] * x[j]).sum::<Real>();
                objective += x[i] * (g[i] + p.b[i]);
                g[i] = 2.0 * g[i] + p.b[i];
            }

            debug!("Evaluation at {:?}", x);
            debug!("  objective={}", objective);
            debug!("  subgradient={}", g);





            tx.objective(objective).unwrap();
            tx.minorant(

                Minorant {
                    constant: objective,
                    linear: g,
                },
                (),

            )
            .unwrap();
        });
        Ok(())
    }
}

fn main() -> Result<(), Box<dyn Error>> {
Changes to src/mcf/problem.rs.
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//
// You should have received a copy of the GNU General Public License
// along with this program.  If not, see  <http://www.gnu.org/licenses/>
//

use crate::mcf;
use crate::problem::{
    EvalResult, FirstOrderProblem as ParallelProblem, ResultSender, Update as ParallelUpdate, UpdateSender,
    UpdateState as ParallelUpdateState,
};
use crate::{DVector, Minorant, Real};

use itertools::izip;
use log::{debug, warn};
use num_traits::Float;







|







12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//
// You should have received a copy of the GNU General Public License
// along with this program.  If not, see  <http://www.gnu.org/licenses/>
//

use crate::mcf;
use crate::problem::{
    FirstOrderProblem as ParallelProblem, ResultSender, Update as ParallelUpdate, UpdateSender,
    UpdateState as ParallelUpdateState,
};
use crate::{DVector, Minorant, Real};

use itertools::izip;
use log::{debug, warn};
use num_traits::Float;
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482

483
484
485
486
487
488
489
490
491
492
        self.pool = Some(pool);
    }

    fn stop(&mut self) {
        self.pool.take();
    }

    fn evaluate<I>(
        &mut self,
        i: usize,
        y: Arc<DVector>,
        index: I,
        tx: ResultSender<I, Self::Primal, Self::Err>,
    ) -> Result<()>
    where
        I: Send + Copy + 'static,
    {
        if self.pool.is_none() {
            self.start()
        }
        let y = y.clone();
        let sub = self.subs[i].clone();
        // Attention: `y` might be shorter than the set of active constraints
        // because evaluation and problem update are not synchronized, i.e. the
        // evaluation may still use an older model with less constraints.
        let active_constraints = self.active_constraints.read().unwrap()[0..y.len()].to_vec();
        self.pool
            .as_ref()
            .unwrap()
            .execute(move || match sub.write().unwrap().evaluate(&y, active_constraints) {
                Ok((objective, subg, primal)) => {
                    tx.send(Ok(EvalResult::ObjectiveValue {
                        index,
                        value: objective,
                    }))
                    .unwrap();
                    tx.send(Ok(EvalResult::Minorant {
                        index,
                        minorant: Minorant {
                            constant: objective,
                            linear: subg,
                        },
                        primal,
                    }))

                    .unwrap();
                }
                Err(err) => tx.send(Err(err)).unwrap(),
            });
        Ok(())
    }

    fn update<I, U>(&mut self, state: U, index: I, tx: UpdateSender<I, Self::Primal, Self::Err>) -> Result<()>
    where
        U: ParallelUpdateState<Self::Primal>,







|
<
<
<
<
<
<

|















<
<
<
<
|
|
<
|




<
>


|







439
440
441
442
443
444
445
446






447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463




464
465

466
467
468
469
470

471
472
473
474
475
476
477
478
479
480
481
        self.pool = Some(pool);
    }

    fn stop(&mut self) {
        self.pool.take();
    }

    fn evaluate<S>(&mut self, i: usize, y: Arc<DVector>, tx: S) -> Result<()>






    where
        S: ResultSender<Self> + 'static,
    {
        if self.pool.is_none() {
            self.start()
        }
        let y = y.clone();
        let sub = self.subs[i].clone();
        // Attention: `y` might be shorter than the set of active constraints
        // because evaluation and problem update are not synchronized, i.e. the
        // evaluation may still use an older model with less constraints.
        let active_constraints = self.active_constraints.read().unwrap()[0..y.len()].to_vec();
        self.pool
            .as_ref()
            .unwrap()
            .execute(move || match sub.write().unwrap().evaluate(&y, active_constraints) {
                Ok((objective, subg, primal)) => {




                    tx.objective(objective).unwrap();
                    tx.minorant(

                        Minorant {
                            constant: objective,
                            linear: subg,
                        },
                        primal,

                    )
                    .unwrap();
                }
                Err(err) => tx.error(err).unwrap(),
            });
        Ok(())
    }

    fn update<I, U>(&mut self, state: U, index: I, tx: UpdateSender<I, Self::Primal, Self::Err>) -> Result<()>
    where
        U: ParallelUpdateState<Self::Primal>,
Changes to src/problem.rs.
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32


33
34
35
36

37
38


39
40
41
42
43
44
45
46
47
48

//! An asynchronous first-order oracle.

use crate::{Aggregatable, DVector, Minorant, Real};
use crossbeam::channel::Sender;
use std::sync::Arc;

/// Evaluation result.
///
/// The result of an evaluation is new information to be made
/// available to the solver and the master problem. There are
/// essentially two types of information:
///
///    1. The (exact) function value of a sub-function at some point.
///    2. A minorant of some sub-function.
#[derive(Debug)]


pub enum EvalResult<I, P> {
    /// The objective value at some point.
    ObjectiveValue { index: I, value: Real },
    /// A minorant with an associated primal.

    Minorant { index: I, minorant: Minorant, primal: P },
}



/// Channel to send evaluation results to.
pub type ResultSender<I, P, E> = Sender<Result<EvalResult<I, P>, E>>;

/// Problem update information.
///
/// The solver calls the `update` method of the problem regularly.
/// This method can modify the problem by adding (or moving)
/// variables. The possible updates are encoded in this type.
pub enum Update<I, P, E> {







|
<
|
|
<
<
<
<
|
>
>
|
|
|
|
>
|
|
>
>
|
<
<







17
18
19
20
21
22
23
24

25
26




27
28
29
30
31
32
33
34
35
36
37
38
39


40
41
42
43
44
45
46

//! An asynchronous first-order oracle.

use crate::{Aggregatable, DVector, Minorant, Real};
use crossbeam::channel::Sender;
use std::sync::Arc;

/// Channel to send evaluation results to.

pub trait ResultSender<P>: Send
where




    P: FirstOrderProblem,
{
    type Err: std::error::Error + Send;

    /// Send a new objective `value`.
    fn objective(&self, value: Real) -> Result<(), Self::Err>;

    /// Send a new `minorant` with associated `primal`.
    fn minorant(&self, minorant: Minorant, primal: P::Primal) -> Result<(), Self::Err>;

    /// Send an error message.
    fn error(&self, err: P::Err) -> Result<(), Self::Err>;
}



/// Problem update information.
///
/// The solver calls the `update` method of the problem regularly.
/// This method can modify the problem by adding (or moving)
/// variables. The possible updates are encoded in this type.
pub enum Update<I, P, E> {
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171

172
173
174
175
176
177
178

    /// Start the evaluation of the i^th subproblem at the given point.
    ///
    /// The results of the evaluation should be passed to the provided channel.
    /// In order to work correctly, the results must contain (an upper bound on)
    /// the objective value at $y$ as well as at least one subgradient centered
    /// at $y$ eventually.
    fn evaluate<I: Send + Copy + 'static>(
        &mut self,
        i: usize,
        y: Arc<DVector>,
        index: I,
        tx: ResultSender<I, Self::Primal, Self::Err>,
    ) -> Result<(), Self::Err>
    where
        I: Send + 'static;


    /// Called to update the problem.
    ///
    /// This method is called regularly by the solver. The problem should send problem update
    /// information (e.g. adding new variables) to the provided channel.
    ///
    /// The updates might be generated asynchronously.







<
<
<
<
<
<
|

|
>







154
155
156
157
158
159
160






161
162
163
164
165
166
167
168
169
170
171

    /// Start the evaluation of the i^th subproblem at the given point.
    ///
    /// The results of the evaluation should be passed to the provided channel.
    /// In order to work correctly, the results must contain (an upper bound on)
    /// the objective value at $y$ as well as at least one subgradient centered
    /// at $y$ eventually.






    fn evaluate<S>(&mut self, i: usize, y: Arc<DVector>, tx: S) -> Result<(), Self::Err>
    where
        S: ResultSender<Self> + 'static,
        Self: Sized;

    /// Called to update the problem.
    ///
    /// This method is called regularly by the solver. The problem should send problem update
    /// information (e.g. adding new variables) to the provided channel.
    ///
    /// The updates might be generated asynchronously.
Changes to src/solver.rs.
19
20
21
22
23
24
25



pub mod sync;
pub use sync::{DefaultSolver, NoBundleSolver};

pub mod asyn;

mod masterprocess;









>
>
19
20
21
22
23
24
25
26
27

pub mod sync;
pub use sync::{DefaultSolver, NoBundleSolver};

pub mod asyn;

mod masterprocess;

mod channels;
Changes to src/solver/asyn.rs.
24
25
26
27
28
29
30

31
32
33
34
35
36
37
38
39
40
use std::iter::repeat;
use std::sync::Arc;
use std::time::Instant;
use threadpool::ThreadPool;

use crate::{DVector, Real};


use super::masterprocess::{self, MasterConfig, MasterProcess, MasterResponse};
use crate::master::{Builder as MasterBuilder, MasterProblem};
use crate::problem::{EvalResult, FirstOrderProblem, Update, UpdateState};
use crate::terminator::{StandardTerminatable, StandardTerminator, Terminator};
use crate::weighter::{HKWeightable, HKWeighter, Weighter};

/// The default iteration limit.
pub const DEFAULT_ITERATION_LIMIT: usize = 10_000;

/// The default solver.







>


|







24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
use std::iter::repeat;
use std::sync::Arc;
use std::time::Instant;
use threadpool::ThreadPool;

use crate::{DVector, Real};

use super::channels::{ChannelSender, EvalResult};
use super::masterprocess::{self, MasterConfig, MasterProcess, MasterResponse};
use crate::master::{Builder as MasterBuilder, MasterProblem};
use crate::problem::{FirstOrderProblem, Update, UpdateState};
use crate::terminator::{StandardTerminatable, StandardTerminator, Terminator};
use crate::weighter::{HKWeightable, HKWeighter, Weighter};

/// The default iteration limit.
pub const DEFAULT_ITERATION_LIMIT: usize = 10_000;

/// The default solver.
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
    /// This is actually the time of the last call to `Solver::init`.
    start_time: Instant,
}

impl<P, T, W, M> Solver<P, T, W, M>
where
    P: FirstOrderProblem,
    P::Err: Into<Box<dyn std::error::Error + Sync + Send>> + 'static,
    T: Terminator<SolverData<P>> + Default,
    W: Weighter<SolverData<P>> + Default,
    M: MasterBuilder,
    <M::MasterProblem as MasterProblem>::MinorantIndex: std::hash::Hash,
{
    /// Create a new parallel bundle solver.
    pub fn new(problem: P) -> Self







|







439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
    /// This is actually the time of the last call to `Solver::init`.
    start_time: Instant,
}

impl<P, T, W, M> Solver<P, T, W, M>
where
    P: FirstOrderProblem,
    P::Err: Into<Box<dyn std::error::Error + Sync + Send>> + Send + 'static,
    T: Terminator<SolverData<P>> + Default,
    W: Weighter<SolverData<P>> + Default,
    M: MasterBuilder,
    <M::MasterProblem as MasterProblem>::MinorantIndex: std::hash::Hash,
{
    /// Create a new parallel bundle solver.
    pub fn new(problem: P) -> Self
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
        ));

        debug!("Initial problem evaluation");
        // We need an initial evaluation of all oracles for the first center.
        let y = Arc::new(self.data.cur_y.clone());
        for i in 0..m {
            self.problem
                .evaluate(i, y.clone(), i, self.client_tx.clone().unwrap())
                .map_err(Error::Evaluation)?;
        }

        debug!("Initialization complete");

        self.start_time = Instant::now();








|







559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
        ));

        debug!("Initial problem evaluation");
        // We need an initial evaluation of all oracles for the first center.
        let y = Arc::new(self.data.cur_y.clone());
        for i in 0..m {
            self.problem
                .evaluate(i, y.clone(), ChannelSender::new(i, self.client_tx.clone().unwrap()))
                .map_err(Error::Evaluation)?;
        }

        debug!("Initialization complete");

        self.start_time = Instant::now();

876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
            .resize(self.problem.num_subproblems(), -Real::infinity());
        self.data.cnt_remaining_mins = self.problem.num_subproblems();

        // Start evaluation of all subproblems at the new candidate.
        let client_tx = self.client_tx.as_ref().ok_or(Error::NotInitialized)?;
        for i in 0..self.problem.num_subproblems() {
            self.problem
                .evaluate(i, self.data.nxt_y.clone(), i, client_tx.clone())
                .map_err(Error::Evaluation)?;
        }
        Ok(false)
    }

    /// Update the problem after descent or null `step`.
    ///







|







877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
            .resize(self.problem.num_subproblems(), -Real::infinity());
        self.data.cnt_remaining_mins = self.problem.num_subproblems();

        // Start evaluation of all subproblems at the new candidate.
        let client_tx = self.client_tx.as_ref().ok_or(Error::NotInitialized)?;
        for i in 0..self.problem.num_subproblems() {
            self.problem
                .evaluate(i, self.data.nxt_y.clone(), ChannelSender::new(i, client_tx.clone()))
                .map_err(Error::Evaluation)?;
        }
        Ok(false)
    }

    /// Update the problem after descent or null `step`.
    ///
Added src/solver/channels.rs.




































































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
/*
 * Copyright (c) 2019 Frank Fischer <frank-fischer@shadow-soft.de>
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see  <http://www.gnu.org/licenses/>
 */

//! Implementation for `ResultSender` using channels.

use crate::problem::{FirstOrderProblem, ResultSender};
use crate::{Minorant, Real};

use crossbeam::channel::{SendError, Sender};

/// Evaluation result.
///
/// The result of an evaluation is new information to be made
/// available to the solver and the master problem. There are
/// essentially two types of information:
///
///    1. The (exact) function value of a sub-function at some point.
///    2. A minorant of some sub-function.
#[derive(Debug)]
pub enum EvalResult<I, P> {
    /// The objective value at some point.
    ObjectiveValue { index: I, value: Real },
    /// A minorant with an associated primal.
    Minorant { index: I, minorant: Minorant, primal: P },
}

pub struct ChannelSender<I, P, E> {
    index: I,
    tx: Sender<Result<EvalResult<I, P>, E>>,
}

impl<I, P, E> ChannelSender<I, P, E> {
    pub fn new(index: I, tx: Sender<Result<EvalResult<I, P>, E>>) -> Self {
        ChannelSender { index, tx }
    }
}

impl<I, P> ResultSender<P> for ChannelSender<I, P::Primal, P::Err>
where
    I: Clone + Send,
    P: FirstOrderProblem,
    P::Err: Send,
{
    type Err = SendError<Result<EvalResult<I, P::Primal>, P::Err>>;

    fn objective(&self, value: Real) -> Result<(), SendError<Result<EvalResult<I, P::Primal>, P::Err>>> {
        self.tx.send(Ok(EvalResult::ObjectiveValue {
            index: self.index.clone(),
            value,
        }))
    }

    fn minorant(
        &self,
        minorant: Minorant,
        primal: P::Primal,
    ) -> Result<(), SendError<Result<EvalResult<I, P::Primal>, P::Err>>> {
        self.tx.send(Ok(EvalResult::Minorant {
            index: self.index.clone(),
            minorant,
            primal,
        }))
    }

    fn error(&self, err: P::Err) -> Result<(), SendError<Result<EvalResult<I, P::Primal>, P::Err>>> {
        self.tx.send(Err(err))
    }
}
Changes to src/solver/sync.rs.
23
24
25
26
27
28
29

30
31
32
33
34
35
36
37
38
39
use num_traits::Float;
use std::sync::Arc;
use std::time::Instant;
use threadpool::ThreadPool;

use crate::{DVector, Real};


use super::masterprocess::{self, MasterConfig, MasterProcess, MasterResponse};
use crate::master::{Builder as MasterBuilder, MasterProblem};
use crate::problem::{EvalResult, FirstOrderProblem, Update, UpdateState};
use crate::terminator::{StandardTerminatable, StandardTerminator, Terminator};
use crate::weighter::{HKWeightable, HKWeighter, Weighter};

/// The default iteration limit.
pub const DEFAULT_ITERATION_LIMIT: usize = 10_000;

/// The default solver.







>


|







23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
use num_traits::Float;
use std::sync::Arc;
use std::time::Instant;
use threadpool::ThreadPool;

use crate::{DVector, Real};

use super::channels::{ChannelSender, EvalResult};
use super::masterprocess::{self, MasterConfig, MasterProcess, MasterResponse};
use crate::master::{Builder as MasterBuilder, MasterProblem};
use crate::problem::{FirstOrderProblem, Update, UpdateState};
use crate::terminator::{StandardTerminatable, StandardTerminator, Terminator};
use crate::weighter::{HKWeightable, HKWeighter, Weighter};

/// The default iteration limit.
pub const DEFAULT_ITERATION_LIMIT: usize = 10_000;

/// The default solver.
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
    /// This is actually the time of the last call to `Solver::init`.
    start_time: Instant,
}

impl<P, T, W, M> Solver<P, T, W, M>
where
    P: FirstOrderProblem,
    P::Err: Into<Box<dyn std::error::Error + Sync + Send>> + 'static,
    T: Terminator<SolverData> + Default,
    W: Weighter<SolverData> + Default,
    M: MasterBuilder,
    <M::MasterProblem as MasterProblem>::MinorantIndex: std::hash::Hash,
{
    /// Create a new parallel bundle solver.
    pub fn new(problem: P) -> Self







|







417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
    /// This is actually the time of the last call to `Solver::init`.
    start_time: Instant,
}

impl<P, T, W, M> Solver<P, T, W, M>
where
    P: FirstOrderProblem,
    P::Err: Into<Box<dyn std::error::Error + Sync + Send>> + Send + 'static,
    T: Terminator<SolverData> + Default,
    W: Weighter<SolverData> + Default,
    M: MasterBuilder,
    <M::MasterProblem as MasterProblem>::MinorantIndex: std::hash::Hash,
{
    /// Create a new parallel bundle solver.
    pub fn new(problem: P) -> Self
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
        ));

        debug!("Initial problem evaluation");
        // We need an initial evaluation of all oracles for the first center.
        let y = Arc::new(self.data.cur_y.clone());
        for i in 0..m {
            self.problem
                .evaluate(i, y.clone(), i, self.client_tx.clone().unwrap())
                .map_err(Error::Evaluation)?;
        }

        debug!("Initialization complete");

        self.start_time = Instant::now();








|







537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
        ));

        debug!("Initial problem evaluation");
        // We need an initial evaluation of all oracles for the first center.
        let y = Arc::new(self.data.cur_y.clone());
        for i in 0..m {
            self.problem
                .evaluate(i, y.clone(), ChannelSender::new(i, self.client_tx.clone().unwrap()))
                .map_err(Error::Evaluation)?;
        }

        debug!("Initialization complete");

        self.start_time = Instant::now();

769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
            .resize(self.problem.num_subproblems(), -Real::infinity());
        self.data.cnt_remaining_mins = self.problem.num_subproblems();

        // Start evaluation of all subproblems at the new candidate.
        let client_tx = self.client_tx.as_ref().ok_or(Error::NotInitialized)?;
        for i in 0..self.problem.num_subproblems() {
            self.problem
                .evaluate(i, self.data.nxt_y.clone(), i, client_tx.clone())
                .map_err(Error::Evaluation)?;
        }
        Ok(false)
    }

    fn update_problem(&mut self, step: Step) -> Result<bool, Error<P::Err>> {
        let master_proc = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;







|







770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
            .resize(self.problem.num_subproblems(), -Real::infinity());
        self.data.cnt_remaining_mins = self.problem.num_subproblems();

        // Start evaluation of all subproblems at the new candidate.
        let client_tx = self.client_tx.as_ref().ok_or(Error::NotInitialized)?;
        for i in 0..self.problem.num_subproblems() {
            self.problem
                .evaluate(i, self.data.nxt_y.clone(), ChannelSender::new(i, client_tx.clone()))
                .map_err(Error::Evaluation)?;
        }
        Ok(false)
    }

    fn update_problem(&mut self, step: Step) -> Result<bool, Error<P::Err>> {
        let master_proc = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;