RsBundle  Diff

Differences From Artifact [e268746126]:

  • File src/solver/sync.rs — part of check-in [e12bd4bd5d] at 2020-07-18 18:49:23 on branch async — Merge trunk (user: fifr size: 29477) [more...]

To Artifact [288af33138]:

  • File src/solver/sync.rs — part of check-in [5ea4ec77c8] at 2020-07-20 19:32:43 on branch async — Merge trunk (user: fifr size: 29029) [more...]

19
20
21
22
23
24
25
26

27
28
29
30
31
32

33
34
35
36
37
38
39
19
20
21
22
23
24
25

26
27
28
29
30
31

32
33
34
35
36
37
38
39







-
+





-
+








#[cfg(feature = "crossbeam")]
use rs_crossbeam::channel::{unbounded as channel, RecvError};
#[cfg(not(feature = "crossbeam"))]
use std::sync::mpsc::{channel, RecvError};

use log::{debug, info, warn};
use num_traits::Float;
use num_traits::{Float, Zero};
use std::sync::Arc;
use std::time::Instant;
use thiserror::Error;
use threadpool::ThreadPool;

use crate::{DVector, Real};
use crate::{DVector, Minorant, Real};

use super::channels::{
    ChannelResultSender, ChannelUpdateSender, ClientReceiver, ClientSender, EvalResult, Message, Update,
};
use super::masterprocess::{MasterConfig, MasterError, MasterProcess, MasterResponse, Response};
use crate::master::{Builder as MasterBuilder, MasterProblem};
use crate::problem::{FirstOrderProblem, UpdateState};
84
85
86
87
88
89
90
91
92


93
94
95
96
97
98
99
84
85
86
87
88
89
90


91
92
93
94
95
96
97
98
99







-
-
+
+







/// - `T` is the value type,
/// - `P` is the `FirstOrderProblem` associated with the solver,
/// - `M` is the `MasterBuilder` associated with the solver.
pub type Result<T, P, M> = std::result::Result<
    T,
    Error<
        <P as FirstOrderProblem>::Err,
        <<M as MasterBuilder<<P as FirstOrderProblem>::Primal>>::MasterProblem as MasterProblem<
            <P as FirstOrderProblem>::Primal,
        <<M as MasterBuilder<<P as FirstOrderProblem>::Minorant>>::MasterProblem as MasterProblem<
            <P as FirstOrderProblem>::Minorant,
        >>::Err,
    >,
>;

impl<PErr, MErr> From<MasterError<PErr, MErr>> for Error<PErr, MErr>
where
    PErr: std::error::Error + 'static,
342
343
344
345
346
347
348
349

350
351
352
353
354
355
356
342
343
344
345
346
347
348

349
350
351
352
353
354
355
356







-
+







    }
}

/// Implementation of a parallel bundle method.
pub struct Solver<P, T = StandardTerminator, W = HKWeighter, M = crate::master::FullMasterBuilder>
where
    P: FirstOrderProblem,
    M: MasterBuilder<P::Primal>,
    M: MasterBuilder<P::Minorant>,
    P::Err: 'static,
{
    /// Parameters for the solver.
    pub params: Parameters,

    /// Termination predicate.
    pub terminator: T,
396
397
398
399
400
401
402
403

404
405
406
407
408
409
410
396
397
398
399
400
401
402

403
404
405
406
407
408
409
410







-
+








impl<P, T, W, M> Solver<P, T, W, M>
where
    P: FirstOrderProblem,
    P::Err: Send + 'static,
    T: Terminator<SolverData> + Default,
    W: Weighter<SolverData> + Default,
    M: MasterBuilder<P::Primal>,
    M: MasterBuilder<P::Minorant>,
{
    /// Create a new parallel bundle solver.
    pub fn new(problem: P) -> Self
    where
        M: Default,
    {
        Self::with_master(problem, M::default())
603
604
605
606
607
608
609
610

611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627

628
629
630
631
632
633
634
603
604
605
606
607
608
609

610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626

627
628
629
630
631
632
633
634







-
+
















-
+







            }
        }
    }

    /// Handle a response from a subproblem evaluation.
    ///
    /// The function returns `Ok(true)` if the final iteration count has been reached.
    fn handle_client_response(&mut self, msg: EvalResult<usize, P::Primal, P::Err>) -> Result<bool, P, M> {
    fn handle_client_response(&mut self, msg: EvalResult<usize, P::Minorant, P::Err>) -> Result<bool, P, M> {
        let master = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;
        match msg {
            EvalResult::ObjectiveValue { index, value } => {
                debug!("Receive objective from subproblem {}: {}", index, value);
                if self.data.nxt_ubs[index].is_infinite() {
                    self.data.cnt_remaining_ubs -= 1;
                }
                self.data.nxt_ubs[index] = self.data.nxt_ubs[index].min(value);
            }
            EvalResult::Minorant { index, mut minorant } => {
                debug!("Receive minorant from subproblem {}", index);
                if self.data.nxt_cutvals[index].is_infinite() {
                    self.data.cnt_remaining_mins -= 1;
                }
                // move center of minorant to cur_y
                minorant.move_center(-1.0, &self.data.nxt_d);
                self.data.nxt_cutvals[index] = self.data.nxt_cutvals[index].max(minorant.constant);
                self.data.nxt_cutvals[index] = self.data.nxt_cutvals[index].max(minorant.constant());
                // add minorant to master problem
                master.add_minorant(index, minorant)?;
            }
            EvalResult::Done { .. } => return Ok(false), // nothing to do here
            EvalResult::Error { err, .. } => return Err(Error::Evaluation(err)),
        }

801
802
803
804
805
806
807
808
809
810




811
812
813
814


815
816
817

818
819
820

821
822

823
824
825
826
827
828



829
830

831
832
833
834

835
836
837
838
839
840
841
801
802
803
804
805
806
807



808
809
810
811




812
813



814



815


816






817
818
819
820

821


822

823
824
825
826
827
828
829
830







-
-
-
+
+
+
+
-
-
-
-
+
+
-
-
-
+
-
-
-
+
-
-
+
-
-
-
-
-
-
+
+
+

-
+
-
-

-
+








        let mut have_update = false;
        for msg in update_rx {
            if let Message::Update(update) = msg {
                match update {
                    Update::AddVariables { bounds, sgext, .. } => {
                        have_update = true;
                        let mut newvars = Vec::with_capacity(bounds.len());
                        for (lower, upper) in bounds {
                            if lower > upper {
                        let newvars = bounds
                            .into_iter()
                            .map(|(lower, upper)| {
                                if lower <= upper {
                                return Err(Error::InvalidBounds { lower, upper });
                            }
                            let value = if lower > 0.0 {
                                lower
                                    let value = lower.max(Real::zero()).min(upper);
                                    Ok((lower - value, upper - value, value))
                            } else if upper < 0.0 {
                                upper
                            } else {
                                } else {
                                0.0
                            };
                            //self.bounds.push((lower, upper));
                                    Err(Error::InvalidBounds { lower, upper })
                            newvars.push((None, lower - value, upper - value, value));
                        }
                                }
                        if !newvars.is_empty() {
                            // modify moved variables
                            for (index, val) in newvars.iter().filter_map(|v| v.0.map(|i| (i, v.3))) {
                                self.data.cur_y[index] = val;
                            }

                            })
                            .collect::<Result<Vec<_>, P, M>>()?;
                        if !newvars.is_empty() {
                            // add new variables
                            self.data
                            self.data.cur_y.extend(newvars.iter().map(|v| v.2));
                                .cur_y
                                .extend(newvars.iter().filter(|v| v.0.is_none()).map(|v| v.3));

                            master_proc.add_vars(newvars.iter().map(|v| (v.0, v.1, v.2)).collect(), sgext)?;
                            master_proc.add_vars(newvars.iter().map(|v| (v.0, v.1)).collect(), sgext)?;
                        }
                    }
                    Update::Done { .. } => (), // there's nothing to do
                    Update::Error { err, .. } => return Err(Error::Update(err)),
                }
            } else {
                unreachable!("Only update results allowed during update");
876
877
878
879
880
881
882
883

884
885
886
887
888
889
890
865
866
867
868
869
870
871

872
873
874
875
876
877
878
879







-
+







            self.data.nxt_mod,
            self.data.nxt_val,
            self.data.cur_val
        );
    }

    /// Return the aggregated primal of the given subproblem.
    pub fn aggregated_primal(&self, subproblem: usize) -> Result<P::Primal, P, M> {
    pub fn aggregated_primal(&self, subproblem: usize) -> Result<<P::Minorant as Minorant>::Primal, P, M> {
        Ok(self
            .master_proc
            .as_ref()
            .ok_or(Error::NotSolved)?
            .get_aggregated_primal(subproblem)?)
    }
}