14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>
*/
//! An asynchronous parallel bundle solver.
use crossbeam::channel::{select, unbounded as channel, Receiver, RecvError, Sender};
use log::{debug, info};
use num_cpus;
use num_traits::Float;
use std::sync::Arc;
use std::time::Instant;
use threadpool::ThreadPool;
use crate::{DVector, Real};
use super::channels::{ChannelSender, ChannelUpdateSender, EvalResult, Update};
use super::masterprocess::{self, MasterConfig, MasterProcess, MasterResponse};
use crate::master::{Builder as MasterBuilder, MasterProblem};
use crate::problem::{FirstOrderProblem, UpdateState};
use crate::terminator::{StandardTerminatable, StandardTerminator, Terminator};
use crate::weighter::{HKWeightable, HKWeighter, Weighter};
/// The default iteration limit.
|
|
|
|
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>
*/
//! An asynchronous parallel bundle solver.
use crossbeam::channel::{select, unbounded as channel, Receiver, RecvError, Sender};
use log::{debug, info, warn};
use num_cpus;
use num_traits::Float;
use std::sync::Arc;
use std::time::Instant;
use threadpool::ThreadPool;
use crate::{DVector, Real};
use super::channels::{ChannelSender, ChannelUpdateSender, ClientMessage, EvalResult, Update};
use super::masterprocess::{self, MasterConfig, MasterProcess, MasterResponse};
use crate::master::{Builder as MasterBuilder, MasterProblem};
use crate::problem::{FirstOrderProblem, UpdateState};
use crate::terminator::{StandardTerminatable, StandardTerminator, Terminator};
use crate::weighter::{HKWeightable, HKWeighter, Weighter};
/// The default iteration limit.
|
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
|
impl<MErr, PErr> From<RecvError> for Error<MErr, PErr> {
fn from(err: RecvError) -> Error<MErr, PErr> {
Error::Process(err)
}
}
type ClientSender<P> = Sender<EvalResult<usize, <P as FirstOrderProblem>::Primal, <P as FirstOrderProblem>::Err>>;
type ClientReceiver<P> = Receiver<EvalResult<usize, <P as FirstOrderProblem>::Primal, <P as FirstOrderProblem>::Err>>;
/// Parameters for tuning the solver.
#[derive(Debug, Clone)]
pub struct Parameters {
/// The descent step acceptance factors, must be in (0,1).
///
/// The default value is 0.1.
|
|
>
|
|
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
|
impl<MErr, PErr> From<RecvError> for Error<MErr, PErr> {
fn from(err: RecvError) -> Error<MErr, PErr> {
Error::Process(err)
}
}
type ClientSender<P> = Sender<ClientMessage<usize, <P as FirstOrderProblem>::Primal, <P as FirstOrderProblem>::Err>>;
type ClientReceiver<P> =
Receiver<ClientMessage<usize, <P as FirstOrderProblem>::Primal, <P as FirstOrderProblem>::Err>>;
/// Parameters for tuning the solver.
#[derive(Debug, Clone)]
pub struct Parameters {
/// The descent step acceptance factors, must be in (0,1).
///
/// The default value is 0.1.
|
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
|
pub fn solve_iter(&mut self, niter: usize) -> Result<bool, P, M> {
debug!("Start solving up to {} iterations", niter);
self.reset_iteration_data(niter);
loop {
select! {
recv(self.client_rx.as_ref().ok_or(Error::NotInitialized)?) -> msg => {
if self.handle_client_response(msg?)? {
return Ok(false);
}
},
recv(self.master_proc.as_ref().ok_or(Error::NotInitialized)?.rx) -> msg => {
debug!("Receive master response");
// Receive result (new candidate) from the master
if self.handle_master_response(msg??)? {
return Ok(true);
|
>
>
|
|
>
>
>
>
|
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
|
pub fn solve_iter(&mut self, niter: usize) -> Result<bool, P, M> {
debug!("Start solving up to {} iterations", niter);
self.reset_iteration_data(niter);
loop {
select! {
recv(self.client_rx.as_ref().ok_or(Error::NotInitialized)?) -> msg => {
match msg? {
ClientMessage::Eval(m) =>
if self.handle_client_response(m)? {
return Ok(false);
},
ClientMessage::Update(_) => {
warn!("Ignore unexpected problem update message from client")
}
}
},
recv(self.master_proc.as_ref().ok_or(Error::NotInitialized)?.rx) -> msg => {
debug!("Receive master response");
// Receive result (new candidate) from the master
if self.handle_master_response(msg??)? {
return Ok(true);
|
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
|
.collect(),
},
ChannelUpdateSender::new(self.data.cnt_iter, update_tx),
)
.map_err(Error::Update)?;
let mut have_update = false;
for update in update_rx {
match update {
Update::AddVariables { bounds, sgext, .. } => {
have_update = true;
let mut newvars = Vec::with_capacity(bounds.len());
for (lower, upper) in bounds {
if lower > upper {
return Err(Error::InvalidBounds { lower, upper });
}
let value = if lower > 0.0 {
lower
} else if upper < 0.0 {
upper
} else {
0.0
};
//self.bounds.push((lower, upper));
newvars.push((None, lower - value, upper - value, value));
}
if !newvars.is_empty() {
// modify moved variables
for (index, val) in newvars.iter().filter_map(|v| v.0.map(|i| (i, v.3))) {
self.data.cur_y[index] = val;
}
// add new variables
self.data
.cur_y
.extend(newvars.iter().filter(|v| v.0.is_none()).map(|v| v.3));
master_proc.add_vars(newvars.iter().map(|v| (v.0, v.1, v.2)).collect(), sgext)?;
}
}
Update::Done { .. } => (), // there's nothing to do
Update::Error { err, .. } => return Err(Error::Update(err)),
}
}
Ok(have_update)
}
/// Return the bound the function value must be below of to enforce a descent step.
|
|
>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
>
>
>
|
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
|
.collect(),
},
ChannelUpdateSender::new(self.data.cnt_iter, update_tx),
)
.map_err(Error::Update)?;
let mut have_update = false;
for msg in update_rx {
if let ClientMessage::Update(update) = msg {
match update {
Update::AddVariables { bounds, sgext, .. } => {
have_update = true;
let mut newvars = Vec::with_capacity(bounds.len());
for (lower, upper) in bounds {
if lower > upper {
return Err(Error::InvalidBounds { lower, upper });
}
let value = if lower > 0.0 {
lower
} else if upper < 0.0 {
upper
} else {
0.0
};
//self.bounds.push((lower, upper));
newvars.push((None, lower - value, upper - value, value));
}
if !newvars.is_empty() {
// modify moved variables
for (index, val) in newvars.iter().filter_map(|v| v.0.map(|i| (i, v.3))) {
self.data.cur_y[index] = val;
}
// add new variables
self.data
.cur_y
.extend(newvars.iter().filter(|v| v.0.is_none()).map(|v| v.3));
master_proc.add_vars(newvars.iter().map(|v| (v.0, v.1, v.2)).collect(), sgext)?;
}
}
Update::Done { .. } => (), // there's nothing to do
Update::Error { err, .. } => return Err(Error::Update(err)),
}
} else {
unreachable!("No evaluation result during update");
}
}
Ok(have_update)
}
/// Return the bound the function value must be below of to enforce a descent step.
|