ENH: add request-driven polling/consumption of processor interfaces

- with (nPollProcInterfaces < 0) it does the following:

  - loop, waiting for some requests to finish
  - for each out-of-date interface, check if its associated
    requests have now finished (ie, the ready() check).
  - if ready() -> call updateInterfaceMatrix()

  In contrast to (nPollProcInterfaces > 0) which loops a specified
  number of times with several calls to MPI_Test each time, the
  (nPollProcInterfaces < 0) variant relies on internal MPI looping
  within MPI_Waitsome to progress communication.

  The actual dispatch still remains non-deterministic (ie, waiting for
  some requests to finish does not mean that any particular interface
  is eligible for update, or in any particular order). However, using
  Waitsome places the tight looping into the MPI layer, which results
  in fewer calls and eliminates behaviour dependent on the value of
  nPollProcInterfaces.

TUT: add polling to windAroundBuildings case (for testing purposes)
This commit is contained in:
Mark Olesen
2023-04-11 10:28:34 +02:00
parent e1cb12509e
commit 1f5cf3958b
3 changed files with 126 additions and 55 deletions

View File

@ -115,11 +115,46 @@ void Foam::LduMatrix<Type, DType, LUType>::updateMatrixInterfaces
{ {
const UPstream::commsTypes commsType = UPstream::defaultCommsType; const UPstream::commsTypes commsType = UPstream::defaultCommsType;
// Block until sends/receives have finished if
if (commsType == UPstream::commsTypes::nonBlocking) (
commsType == UPstream::commsTypes::nonBlocking
&& UPstream::nPollProcInterfaces
)
{ {
UPstream::waitRequests(startRequest); // Wait for some interface requests to become available and
// consume them. No guarantee that the finished requests actually
// correspond to any particular interface, but it is reasonably
// probable that some interfaces will be able to start consumption
// without waiting for all requests.
DynamicList<int> indices; // (work array)
while
(
UPstream::nPollProcInterfaces < 0
&& UPstream::waitSomeRequests(startRequest, &indices)
)
{
forAll(interfaces_, interfacei)
{
auto* intf = interfaces_.get(interfacei);
if (intf && !intf->updatedMatrix() && intf->ready())
{
intf->updateInterfaceMatrix
(
result,
add,
lduMesh_.lduAddr(),
interfacei,
psiif,
interfaceCoeffs[interfacei],
commsType
);
} }
}
}
}
if if
( (
@ -127,11 +162,23 @@ void Foam::LduMatrix<Type, DType, LUType>::updateMatrixInterfaces
|| commsType == UPstream::commsTypes::nonBlocking || commsType == UPstream::commsTypes::nonBlocking
) )
{ {
// Wait until sends/receives have finished.
// - effectively a no-op (without waiting) if already completed.
if (commsType == UPstream::commsTypes::nonBlocking)
{
UPstream::waitRequests(startRequest);
}
// Check/no-check for updatedMatrix() ?
const bool noCheck = (commsType == UPstream::commsTypes::blocking);
forAll(interfaces_, interfacei) forAll(interfaces_, interfacei)
{ {
if (interfaces_.set(interfacei)) auto* intf = interfaces_.get(interfacei);
if (intf && (noCheck || !intf->updatedMatrix()))
{ {
interfaces_[interfacei].updateInterfaceMatrix intf->updateInterfaceMatrix
( (
result, result,
add, add,

View File

@ -118,13 +118,32 @@ void Foam::lduMatrix::updateMatrixInterfaces
{ {
const UPstream::commsTypes commsType = UPstream::defaultCommsType; const UPstream::commsTypes commsType = UPstream::defaultCommsType;
if (commsType == UPstream::commsTypes::blocking) if
(
commsType == UPstream::commsTypes::nonBlocking
&& UPstream::nPollProcInterfaces
)
{
// Wait for some interface requests to become available and
// consume them. No guarantee that the finished requests actually
// correspond to any particular interface, but it is reasonably
// probable that some interfaces will be able to start consumption
// without waiting for all requests.
DynamicList<int> indices; // (work array)
while
(
UPstream::nPollProcInterfaces < 0
&& UPstream::waitSomeRequests(startRequest, &indices)
)
{ {
forAll(interfaces, interfacei) forAll(interfaces, interfacei)
{ {
if (interfaces.set(interfacei)) auto* intf = interfaces.get(interfacei);
if (intf && !intf->updatedMatrix() && intf->ready())
{ {
interfaces[interfacei].updateInterfaceMatrix intf->updateInterfaceMatrix
( (
result, result,
add, add,
@ -138,26 +157,29 @@ void Foam::lduMatrix::updateMatrixInterfaces
} }
} }
} }
else if (commsType == UPstream::commsTypes::nonBlocking)
{
// Try and consume interfaces as they become available
bool allUpdated = false;
for (label i=0; i<UPstream::nPollProcInterfaces; i++) // [OLDER]
// Alternative for consuming interfaces as they become available.
// Within the loop, the ready() calls an MPI_Test
// that can trigger progression. However, a bit less reliably
// (and less efficient) since it implies multiple calls to
// MPI_Test to progress the MPI transfer, but no guarantee that
// any of them will actually complete within nPollProcInterfaces
// calls.
for (label i = 0; i < UPstream::nPollProcInterfaces; ++i)
{ {
allUpdated = true; bool allUpdated = true;
forAll(interfaces, interfacei) forAll(interfaces, interfacei)
{ {
if auto* intf = interfaces.get(interfacei);
(
interfaces.set(interfacei) if (intf && !intf->updatedMatrix())
&& !interfaces[interfacei].updatedMatrix()
)
{ {
if (interfaces[interfacei].ready()) if (intf->ready())
{ {
interfaces[interfacei].updateInterfaceMatrix intf->updateInterfaceMatrix
( (
result, result,
add, add,
@ -178,35 +200,36 @@ void Foam::lduMatrix::updateMatrixInterfaces
if (allUpdated) if (allUpdated)
{ {
break; break; // Early exit
}
} }
} }
// Block for everything
if (Pstream::parRun())
{
if (allUpdated)
{
// All received. Just remove all outstanding requests
UPstream::resetRequests(startRequest);
}
else
{
// Block for all requests and remove storage
UPstream::waitRequests(startRequest);
}
}
// Consume
forAll(interfaces, interfacei)
{
if if
( (
interfaces.set(interfacei) commsType == UPstream::commsTypes::blocking
&& !interfaces[interfacei].updatedMatrix() || commsType == UPstream::commsTypes::nonBlocking
) )
{ {
interfaces[interfacei].updateInterfaceMatrix // Wait until sends/receives have finished.
// - effectively a no-op (without waiting) if already completed.
if (commsType == UPstream::commsTypes::nonBlocking)
{
UPstream::waitRequests(startRequest);
}
// Check/no-check for updatedMatrix() ?
const bool noCheck = (commsType == UPstream::commsTypes::blocking);
// Consume anything still outstanding
forAll(interfaces, interfacei)
{
auto* intf = interfaces.get(interfacei);
if (intf && (noCheck || !intf->updatedMatrix()))
{
intf->updateInterfaceMatrix
( (
result, result,
add, add,
@ -282,7 +305,7 @@ void Foam::lduMatrix::updateMatrixInterfaces
psiif, psiif,
coupleCoeffs[interfacei], coupleCoeffs[interfacei],
cmpt, cmpt,
Pstream::commsTypes::blocking UPstream::commsTypes::blocking
); );
} }
} }

View File

@ -9,7 +9,8 @@ runApplication decomposePar
restore0Dir -processor restore0Dir -processor
runParallel $(getApplication) # Test polling interfaces
runParallel $(getApplication) -opt-switch nPollProcInterfaces=-1
runApplication reconstructPar runApplication reconstructPar