// Copyright (c) 2016-2023 Frank Fischer <frank-fischer@shadow-soft.de>
//
// This program is free software: you can redistribute it and/or
// modify it under the terms of the GNU General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>
//
use crate::mcf::{self, Solver};
#[cfg(feature = "mpi")]
use crate::mpi::DistributedFirstOrderProblem;
use crate::problem::{
FirstOrderProblem as ParallelProblem, ResultSender, UpdateSender, UpdateState as ParallelUpdateState,
};
use crate::{DVector, Minorant, Real};
use log::{debug, warn};
use num_traits::Float;
use thiserror::Error;
use threadpool::ThreadPool;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::result;
use std::sync::{Arc, RwLock};
use std::time::Duration;
/// An error in the mmcf file format.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum MMCFReadError {
#[error("format error")]
Format(#[source] Box<dyn std::error::Error>),
#[error("missing value '{0}' in line in node file")]
MissingNodField(&'static str),
#[error("missing value '{0}' in line in supply file")]
MissingSupField(&'static str),
#[error("missing value '{0}' in line in arc file")]
MissingArcField(&'static str),
#[error("missing value '{0}' in line in mut file")]
MissingMutField(&'static str),
#[error("extra field at the end of line in node file")]
ExtraNodField,
#[error("extra field at the end of line in supply file")]
ExtraSupField,
#[error("extra field at the end of line in arc file")]
ExtraArcField,
#[error("extra field at the end of line in mut file")]
ExtraMutField,
#[error("invalid node number {0} (must be in 1..{1})")]
InvalidNode(usize, usize),
#[error("invalid arc number {0} (must be in 1..{1})")]
InvalidArc(usize, usize),
#[error("invalid com number {0} (must be in 1..{1})")]
InvalidCom(usize, usize),
#[error("invalid mut number {0} (must be in 1..{1})")]
InvalidMut(usize, usize),
#[error("error in MCF solver")]
Solver(#[from] crate::mcf::solver::Error),
#[error("io error")]
Io(#[from] std::io::Error),
}
impl From<std::num::ParseIntError> for MMCFReadError {
fn from(err: std::num::ParseIntError) -> MMCFReadError {
MMCFReadError::Format(Box::new(err))
}
}
impl From<std::num::ParseFloatError> for MMCFReadError {
fn from(err: std::num::ParseFloatError) -> MMCFReadError {
MMCFReadError::Format(Box::new(err))
}
}
/// Type of errors of the MMCFProblem.
pub type Error = crate::mcf::solver::Error;
/// Result type of the MMCFProblem.
pub type Result<T> = result::Result<T, Error>;
/// Data of a single arc read from an instance file.
#[derive(Clone, Copy, Debug)]
#[allow(dead_code)]
struct ArcInfo {
/// The number of this arc
arc: usize,
/// The source node number
src: usize,
/// The sink node number
snk: usize,
}
/// A single coefficient in a linear coupling constraint
#[derive(Clone, Copy, Debug)]
struct Elem {
/// The variable (arc) index
ind: usize,
/// The coefficient
val: Real,
}
/// A single MMCF subproblem, i.e. one network.
struct Subproblem {
/// The (net, cost) pair.
net: Box<dyn Solver>,
c: DVector,
/// optional delay for evaluation
delay: Option<Duration>,
}
/// Constraint data of one subproblem.
///
/// This information is used to create the augmented objective function, i.e.
/// cbase - lhs'* y as well as the constant term rhs' * y
struct SubData {
/// The lhs of all constraints. For each constraint this is a list of coefficients.
lhs: Vec<Vec<Elem>>,
/// The right-hand side of each constraint
rhs: DVector,
/// The base costs of each arc
cbase: DVector,
}
unsafe impl Send for Subproblem {}
unsafe impl Sync for Subproblem {}
/// Configuration options for MMCFProblem
#[derive(Default)]
pub struct MMCFConfig {
/// If coupling constraints should be separated.
pub separate_constraints: bool,
/// A list of subproblems to be delayed.
pub delayed_subs: Vec<usize>,
/// Delay
pub delay: Duration,
}
/// A single multi-commodity flow problem instance.
///
/// The current implementation always keeps a list of all potential
/// coupling constraints but, depending on the separation choice, the
/// may not be in the model. If separation is used a list of "active"
/// constraints is maintained holding the list of those constraints
/// that are in the model. "inactive" constraints may be separated
/// later on.
pub struct MMCFProblem {
/// The list of subproblems (single network flows)
subs: Vec<Arc<RwLock<Subproblem>>>,
/// The coupling constraints for each subproblem.
subdatas: Arc<Vec<SubData>>,
/// The list of currently active constraints
active_constraints: Arc<RwLock<Vec<usize>>>,
/// The list of inactive constraints
inactive_constraints: Arc<RwLock<Vec<usize>>>,
/// The thread pool for parallel evaluation
pool: Option<ThreadPool>,
}
impl Subproblem {
fn evaluate<I>(&mut self, data: &SubData, y: &[Real], active: I) -> Result<(Real, DVector, DVector)>
where
I: IntoIterator<Item = usize>,
I::IntoIter: Clone,
{
let active = active.into_iter();
// lhs and rhs in correct order
let lhs = &data.lhs;
let lhs = active.clone().map(|i| &lhs[i]);
// compute costs
self.c.clear();
self.c.extend(data.cbase.iter());
for (y, lhs) in y.iter().zip(lhs.clone()) {
for elem in lhs {
self.c[elem.ind] += y * elem.val;
}
}
debug!("y={:?}", y);
debug!("c={}", self.c);
// solve subproblem
self.net.set_objective(&self.c)?;
self.net.solve()?;
debug!("obj={}", self.net.objective()?);
// compute minorant
let (objective, mut subg) = if data.rhs.len() > 0 {
let rhs = active.map(|i| data.rhs[i]);
let rhsval = y.iter().zip(rhs.clone()).map(|(y, r)| y * r).sum::<Real>();
(rhsval - self.net.objective()?, rhs.collect())
} else {
(-self.net.objective()?, dvec![0.0; y.len()])
};
let sol = self.net.get_solution()?;
for (i, lhs) in lhs.enumerate() {
subg[i] -= lhs.iter().map(|elem| elem.val * sol[elem.ind]).sum::<Real>();
}
if let Some(delay) = self.delay {
std::thread::sleep(delay);
}
Ok((objective, subg, sol))
}
}
impl SubData {
fn evaluate_constraint(&self, primal: &DVector, cidx: usize) -> Real {
let rhs = self.rhs.get(cidx).unwrap_or(&0.0);
let lhs = self.lhs[cidx]
.iter()
.map(|elem| elem.val * primal[elem.ind])
.sum::<Real>();
rhs - lhs
}
}
impl MMCFProblem {
pub fn num_subproblems(&self) -> usize {
self.subs.len()
}
pub fn read_mnetgen(basename: &str, config: MMCFConfig) -> result::Result<MMCFProblem, MMCFReadError> {
MMCFProblem::read_mnetgen_with_solver::<mcf::NetSpxSolver>(basename, config)
}
pub fn read_mnetgen_with_solver<S>(basename: &str, config: MMCFConfig) -> result::Result<MMCFProblem, MMCFReadError>
where
S: Solver + 'static,
{
use MMCFReadError::*;
let ncom;
let nnodes;
let narcs;
let ncaps;
{
let mut data = File::open(&format!("{}.nod", basename)).map(BufReader::new)?.lines();
ncom = data.next().ok_or(MissingNodField("ncom"))??.parse::<usize>()?;
nnodes = data.next().ok_or(MissingNodField("nnodes"))??.parse::<usize>()?;
narcs = data.next().ok_or(MissingNodField("narcs"))??.parse::<usize>()?;
ncaps = data.next().ok_or(MissingNodField("ncaps"))??.parse::<usize>()?;
if data.next().is_some() {
return Err(ExtraNodField);
}
}
// read nodes
let mut nets = Vec::with_capacity(ncom);
for i in 0..ncom {
nets.push(S::new(i, nnodes)?)
}
for line in File::open(&format!("{}.sup", basename)).map(BufReader::new)?.lines() {
let line = line?;
let mut data = line.split_whitespace();
let node = data.next().ok_or(MissingSupField("node"))?.parse::<usize>()?;
let com = data.next().ok_or(MissingSupField("com"))?.parse::<usize>()?;
let supply = data.next().ok_or(MissingSupField("supply"))?.parse::<Real>()?;
if data.next().is_some() {
return Err(ExtraSupField);
}
if node < 1 || node > nnodes {
return Err(InvalidNode(node, nnodes));
}
if com < 1 || com > ncom {
return Err(InvalidCom(com, ncom));
}
nets[com - 1].set_balance(node - 1, supply)?;
}
// read arcs
let mut arcmap = vec![vec![]; ncom];
let mut cbase = vec![dvec![]; ncom];
// lhs nonzeros
let mut lhsidx = vec![vec![vec![]; ncom]; ncaps];
for line in File::open(&format!("{}.arc", basename)).map(BufReader::new)?.lines() {
let line = line?;
let mut data = line.split_whitespace();
let arc = data.next().ok_or(MissingArcField("arc"))?.parse::<usize>()?;
let src = data.next().ok_or(MissingArcField("src"))?.parse::<usize>()?;
let snk = data.next().ok_or(MissingArcField("snk"))?.parse::<usize>()?;
let com = data.next().ok_or(MissingArcField("com"))?.parse::<usize>()?;
let cost = data.next().ok_or(MissingArcField("cost"))?.parse::<Real>()?;
let cap = data.next().ok_or(MissingArcField("cap"))?.parse::<Real>()?;
let mt = data.next().ok_or(MissingArcField("mt"))?.parse::<usize>()?;
if data.next().is_some() {
return Err(ExtraArcField);
}
if arc < 1 || arc > narcs {
return Err(InvalidArc(arc, narcs));
}
if src < 1 || src > nnodes {
return Err(InvalidNode(src, nnodes));
}
if snk < 1 || snk > nnodes {
return Err(InvalidNode(snk, nnodes));
}
if com < 1 || com > ncom {
return Err(InvalidCom(com, ncom));
}
if mt > ncaps {
return Err(InvalidMut(mt, ncaps));
}
let com = com - 1;
// set internal coeff
let coeff = arcmap[com].len();
arcmap[com].push(ArcInfo { arc, src, snk });
// add arc
nets[com].add_arc(src - 1, snk - 1, cost, if cap < 0.0 { f64::INFINITY } else { cap })?;
// set objective
cbase[com].push(cost); // + 1e-6 * coeff
// add to mutual capacity constraint
if mt > 0 {
lhsidx[mt - 1][com].push(coeff);
}
}
// read rhs of coupling constraints
let mut rhs = dvec![0.0; ncaps];
for line in File::open(&format!("{}.mut", basename)).map(BufReader::new)?.lines() {
let line = line?;
let mut data = line.split_whitespace();
let mt = data.next().ok_or(MissingMutField("mt"))?.parse::<usize>()?;
let cap = data.next().ok_or(MissingMutField("cap"))?.parse::<Real>()?;
if data.next().is_some() {
return Err(ExtraMutField);
}
if mt < 1 || mt > ncaps {
return Err(InvalidMut(mt, ncaps));
}
rhs[mt - 1] = cap;
}
// set lhs
let lhs = (0..ncom)
.map(|fidx| {
(0..ncaps)
.map(|i| lhsidx[i][fidx].iter().map(|&j| Elem { ind: j, val: 1.0 }).collect())
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
let subdatas = Arc::new(
cbase
.into_iter()
.zip(lhs)
.zip(std::iter::once(rhs).chain(std::iter::repeat(dvec![])))
.map(|((cbase, lhs), rhs)| SubData { lhs, rhs, cbase })
.collect(),
);
let subproblems = nets
.into_iter()
.enumerate()
.map(|(i, net)| Subproblem {
net: Box::new(net),
c: dvec![],
delay: if config.delayed_subs.contains(&i) {
Some(config.delay)
} else {
None
},
})
.map(RwLock::new)
.map(Arc::new)
.collect();
let mut mcf = MMCFProblem {
subs: subproblems,
subdatas,
active_constraints: Arc::new(RwLock::new((0..ncaps).collect())),
inactive_constraints: Arc::new(RwLock::new(vec![])),
pool: None,
};
mcf.set_separate_constraints(config.separate_constraints);
Ok(mcf)
}
/// Set whether coupling constraints should be separated.
///
/// If `separate` is true than all constraints will be made
/// inactive, otherwise all constraints will be made active.
pub fn set_separate_constraints(&mut self, separate: bool) {
let nconstrs = self.subdatas[0].lhs.len();
if separate {
self.active_constraints.write().unwrap().clear();
self.inactive_constraints = Arc::new(RwLock::new((0..nconstrs).collect()));
} else {
self.active_constraints = Arc::new(RwLock::new((0..nconstrs).collect()));
self.inactive_constraints.write().unwrap().clear();
}
}
/// Compute costs for a primal solution.
pub fn get_primal_costs(&self, fidx: usize, primals: &DVector) -> Real {
let sub = &self.subdatas[fidx];
primals.iter().enumerate().map(|(i, x)| x * sub.cbase[i]).sum()
}
/// Aggregate primal vectors.
pub fn aggregate_primals_ref(&self, primals: &[(Real, &Vec<DVector>)]) -> Vec<DVector> {
let mut aggr = primals[0]
.1
.iter()
.map(|x| {
let mut r = dvec![];
r.scal(primals[0].0, x);
r
})
.collect::<Vec<_>>();
for &(alpha, primal) in &primals[1..] {
for (j, x) in primal.iter().enumerate() {
aggr[j].add_scaled(alpha, x);
}
}
aggr
}
fn create_update<U>(&self, state: U, apply: impl FnOnce(Vec<usize>) -> Result<()> + Send + 'static) -> Result<bool>
where
U: ParallelUpdateState<DVector>,
{
if self.inactive_constraints.read().unwrap().is_empty() {
return Ok(false);
}
let subdatas = self.subdatas.clone();
let inactive_constraints = self.inactive_constraints.clone();
self.pool.as_ref().unwrap().execute(move || {
let newconstraints = {
inactive_constraints
.read()
.unwrap()
.iter()
.map(|&cidx| {
subdatas
.iter()
.enumerate()
.map(|(fidx, sub)| {
let primal = state.aggregated_primal(fidx);
sub.evaluate_constraint(primal, cidx)
})
.sum::<Real>()
})
.enumerate()
.filter_map(|(i, sg)| if sg < 1e-3 { Some(i) } else { None })
.collect::<Vec<_>>()
};
if !newconstraints.is_empty() {
apply(newconstraints).unwrap();
}
});
Ok(true)
}
fn apply_update(
update: &[usize],
active: Arc<RwLock<Vec<usize>>>,
inactive: Arc<RwLock<Vec<usize>>>,
) -> result::Result<(), Error> {
let mut inactive = inactive.write().unwrap();
let mut active = active.write().unwrap();
active.extend(update.iter().rev().map(|&i| inactive.swap_remove(i)));
Ok(())
}
fn send_update<S>(
subdatas: Arc<Vec<SubData>>,
actives: Arc<RwLock<Vec<usize>>>,
update: &[usize],
tx: S,
) -> Result<()>
where
S: UpdateSender<Self> + 'static,
{
let nnew = update.len();
let actives = actives.read().unwrap().clone();
if let Err(err) = tx.add_variables(
vec![(0.0, Real::infinity()); nnew],
Box::new(move |fidx: usize, m: &mut (Real, DVector, DVector)| {
let sub = &subdatas[fidx];
let n = m.dim();
let primal = &m.2;
m.1.extend((n..actives.len()).map(|i| sub.evaluate_constraint(primal, actives[i])));
Ok(())
}),
) {
warn!("Error sending problem update: {}", err);
}
Ok(())
}
}
impl ParallelProblem for MMCFProblem {
type Err = Error;
type Minorant = (Real, DVector, DVector);
fn num_variables(&self) -> usize {
self.active_constraints.read().unwrap().len()
}
fn lower_bounds(&self) -> Option<Vec<Real>> {
Some(vec![0.0; self.num_variables()])
}
fn num_subproblems(&self) -> usize {
self.subs.len()
}
fn start(&mut self) {
let pool = ThreadPool::new(4);
self.pool = Some(pool);
}
fn stop(&mut self) {
self.pool.take();
}
fn evaluate<S>(&mut self, i: usize, y: Arc<DVector>, tx: S) -> Result<()>
where
S: ResultSender<Self> + 'static,
{
if self.pool.is_none() {
self.start()
}
let sub = self.subs[i].clone();
let subdatas = self.subdatas.clone();
// Attention: `y` might be shorter than the set of active constraints
// because evaluation and problem update are not synchronized, i.e. the
// evaluation may still use an older model with less constraints.
let active_constraints = self.active_constraints.read().unwrap()[0..y.len()].to_vec();
self.pool.as_ref().unwrap().execute(move || {
let subdata = &subdatas[i];
match sub.write().unwrap().evaluate(subdata, &y, active_constraints) {
Ok((objective, subg, primal)) => {
tx.minorant((objective, subg, primal)).unwrap();
tx.objective(objective).unwrap();
}
Err(err) => tx.error(err).unwrap(),
}
});
Ok(())
}
fn update<U, S>(&mut self, state: U, tx: S) -> Result<()>
where
U: ParallelUpdateState<<Self::Minorant as Minorant>::Primal>,
S: UpdateSender<Self> + 'static,
{
if self.inactive_constraints.read().unwrap().is_empty() {
return Ok(());
}
let inactive_constraints = self.inactive_constraints.clone();
let active_constraints = self.active_constraints.clone();
let subdatas = self.subdatas.clone();
MMCFProblem::create_update(self, state, move |update| {
MMCFProblem::apply_update(&update, active_constraints.clone(), inactive_constraints)?;
MMCFProblem::send_update(subdatas, active_constraints, &update, tx)?;
Ok(())
})?;
Ok(())
}
}
impl DistributedFirstOrderProblem for MMCFProblem {
type Update = Vec<usize>;
fn compute_update<U>(
&self,
state: U,
apply: impl FnOnce(Self::Update) -> result::Result<(), Self::Err> + Send + 'static,
) -> result::Result<bool, Self::Err>
where
U: ParallelUpdateState<<Self::Minorant as Minorant>::Primal>,
{
MMCFProblem::create_update(self, state, apply)
}
fn apply_update(&mut self, update: &Self::Update) -> result::Result<(), Self::Err> {
MMCFProblem::apply_update(
update,
self.active_constraints.clone(),
self.inactive_constraints.clone(),
)
}
fn send_update<S>(&self, update: &Self::Update, tx: S) -> Result<()>
where
S: UpdateSender<Self> + 'static,
Self: Sized,
{
MMCFProblem::send_update(self.subdatas.clone(), self.active_constraints.clone(), update, tx)
}
}
unsafe impl Send for MMCFProblem {}
unsafe impl Sync for MMCFProblem {}