ENH: implement OPstream rewind() to support reuse of output streams

- a somewhat specialized use case, but can be useful when there are
  many ranks with sparse communication but for which the access
  pattern is established during inner loops.

      PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
      pBufs.allowClearRecv(false);

      PtrList<OPstream> output(Pstream::nProcs());

      while (condition)
      {
          // Rewind existing streams
          forAll(output, proci)
          {
              auto* osptr = output.get(proci);
              if (osptr)
              {
                  (*osptr).rewind();
              }
          }

          for (Particle& p : myCloud)
          {
              label toProci = ...;

              // Get or create output stream
              auto* osptr = output.get(toProci);
              if (!osptr)
              {
                  osptr = new OPstream(toProci, pBufs);
                  output.set(toProci, osptr);
              }

              // Append more data...
              (*osptr) << p;
          }

          pBufs.finishedSends();

          ... reads
      }
This commit is contained in:
Mark Olesen
2022-02-28 17:32:50 +01:00
committed by Andrew Heather
parent af8161925b
commit bfca84d11a
6 changed files with 78 additions and 42 deletions

View File

@ -64,8 +64,7 @@ class Pstream
{ {
// Private Static Functions // Private Static Functions
//- Exchange contiguous data. Sends sendData, receives into //- Exchange contiguous data. Sends sendBufs, receives into recvBufs.
// recvData. If block=true will wait for all transfers to finish.
// Data provided and received as container. // Data provided and received as container.
template<class Container, class T> template<class Container, class T>
static void exchangeContainer static void exchangeContainer
@ -75,11 +74,10 @@ class Pstream
List<Container>& recvBufs, List<Container>& recvBufs,
const int tag, const int tag,
const label comm, const label comm,
const bool block //!< Wait for all to finish const bool wait //!< Wait for requests to complete
); );
//- Exchange contiguous data. Sends sendData, receives into //- Exchange contiguous data. Sends sendBufs, receives into recvBufs.
// recvData. If block=true will wait for all transfers to finish.
// Data provided and received as pointers. // Data provided and received as pointers.
template<class T> template<class T>
static void exchangeBuf static void exchangeBuf
@ -90,7 +88,7 @@ class Pstream
List<char*>& recvBufs, List<char*>& recvBufs,
const int tag, const int tag,
const label comm, const label comm,
const bool block //!< Wait for all to finish const bool wait //!< Wait for requests to complete
); );
@ -119,7 +117,7 @@ public:
: :
UPstream(commsType) UPstream(commsType)
{ {
if (bufSize) if (bufSize > 0)
{ {
transferBuf_.setCapacity(bufSize + 2*sizeof(scalar) + 1); transferBuf_.setCapacity(bufSize + 2*sizeof(scalar) + 1);
} }
@ -438,7 +436,7 @@ public:
List<Container>& recvData, List<Container>& recvData,
const int tag = UPstream::msgType(), const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm, const label comm = UPstream::worldComm,
const bool block = true const bool wait = true //!< Wait for requests to complete
); );
//- Exchange contiguous data. Sends sendData, receives into //- Exchange contiguous data. Sends sendData, receives into
@ -451,7 +449,7 @@ public:
List<Container>& recvData, List<Container>& recvData,
const int tag = UPstream::msgType(), const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm, const label comm = UPstream::worldComm,
const bool block = true const bool wait = true //!< Wait for requests to complete
); );
}; };

View File

@ -43,7 +43,7 @@ void Foam::Pstream::exchangeContainer
List<Container>& recvBufs, List<Container>& recvBufs,
const int tag, const int tag,
const label comm, const label comm,
const bool block const bool wait
) )
{ {
const label startOfRequests = Pstream::nRequests(); const label startOfRequests = Pstream::nRequests();
@ -101,7 +101,7 @@ void Foam::Pstream::exchangeContainer
// Wait for all to finish // Wait for all to finish
// ~~~~~~~~~~~~~~~~~~~~~~ // ~~~~~~~~~~~~~~~~~~~~~~
if (block) if (wait)
{ {
UPstream::waitRequests(startOfRequests); UPstream::waitRequests(startOfRequests);
} }
@ -117,7 +117,7 @@ void Foam::Pstream::exchangeBuf
List<char*>& recvBufs, List<char*>& recvBufs,
const int tag, const int tag,
const label comm, const label comm,
const bool block const bool wait
) )
{ {
const label startOfRequests = Pstream::nRequests(); const label startOfRequests = Pstream::nRequests();
@ -175,7 +175,7 @@ void Foam::Pstream::exchangeBuf
// Wait for all to finish // Wait for all to finish
// ~~~~~~~~~~~~~~~~~~~~~~ // ~~~~~~~~~~~~~~~~~~~~~~
if (block) if (wait)
{ {
UPstream::waitRequests(startOfRequests); UPstream::waitRequests(startOfRequests);
} }
@ -190,7 +190,7 @@ void Foam::Pstream::exchange
List<Container>& recvBufs, List<Container>& recvBufs,
const int tag, const int tag,
const label comm, const label comm,
const bool block const bool wait
) )
{ {
// OR static_assert(is_contiguous<T>::value, "Contiguous data only!") // OR static_assert(is_contiguous<T>::value, "Contiguous data only!")
@ -234,7 +234,7 @@ void Foam::Pstream::exchange
recvBufs, recvBufs,
tag, tag,
comm, comm,
block wait
); );
} }
else else
@ -327,7 +327,7 @@ void Foam::Pstream::exchange
charRecvBufs, charRecvBufs,
tag, tag,
comm, comm,
block wait
); );
forAll(nSend, proci) forAll(nSend, proci)
@ -410,21 +410,14 @@ void Foam::Pstream::exchangeSizes
/// template<class Container> /// template<class Container>
/// void Foam::Pstream::exchangeSizes /// void Foam::Pstream::exchangeSizes
/// ( /// (
/// const labelUList& sendRecvProcs, /// const labelUList& neighProcs,
/// const Container& sendBufs, /// const Container& sendBufs,
/// labelList& recvSizes, /// labelList& recvSizes,
/// const label tag, /// const label tag,
/// const label comm /// const label comm
/// ) /// )
/// { /// {
/// exchangeSizes<Container> /// exchangeSizes<Container>(neighProcs, neighProcs, sendBufs, tag, comm);
/// (
/// sendRecvProcs,
/// sendRecvProcs,
/// sendBufs,
/// tag,
/// comm
/// );
/// } /// }
@ -462,13 +455,13 @@ void Foam::Pstream::exchange
List<Container>& recvBufs, List<Container>& recvBufs,
const int tag, const int tag,
const label comm, const label comm,
const bool block const bool wait
) )
{ {
labelList recvSizes; labelList recvSizes;
exchangeSizes(sendBufs, recvSizes, comm); exchangeSizes(sendBufs, recvSizes, comm);
exchange<Container, T>(sendBufs, recvSizes, recvBufs, tag, comm, block); exchange<Container, T>(sendBufs, recvSizes, recvBufs, tag, comm, wait);
} }

View File

@ -177,17 +177,20 @@ public:
return true; return true;
} }
//- Rewind the stream so that it may be read again
void rewind(); // Positioning
//- Rewind the receive stream position so that it may be read again
virtual void rewind();
// Edit // Edit
//- Set flags of stream //- Set flags of stream
virtual ios_base::fmtflags flags(const ios_base::fmtflags) virtual ios_base::fmtflags flags(const ios_base::fmtflags)
{ {
return ios_base::fmtflags(0); return ios_base::fmtflags(0);
} }
// Print // Print

View File

@ -490,9 +490,26 @@ bool Foam::UIPstreamBase::beginRawRead()
} }
// Not needed yet
///
/// //- The current get position (tellg) in the buffer
/// label pos() const;
///
/// Foam::label Foam::UIPstreamBase::pos() const
/// {
/// return recvBufPos_;
/// }
void Foam::UIPstreamBase::rewind() void Foam::UIPstreamBase::rewind()
{ {
recvBufPos_ = 0; recvBufPos_ = 0; // Assume the entire buffer is for us to read from
setOpened();
setGood();
if (recvBuf_.empty() || !messageSize_)
{
setEof();
}
} }

View File

@ -251,13 +251,19 @@ public:
} }
// Edit // Positioning
//- Set flags of stream //- Rewind the send buffer for overwriting
virtual ios_base::fmtflags flags(const ios_base::fmtflags) virtual void rewind();
{
return ios_base::fmtflags(0);
} // Edit
//- Set flags of stream
virtual ios_base::fmtflags flags(const ios_base::fmtflags)
{
return ios_base::fmtflags(0);
}
// Print // Print

View File

@ -369,6 +369,25 @@ bool Foam::UOPstreamBase::beginRawWrite(std::streamsize count)
} }
// Not needed yet
///
/// //- The current put position (tellp) in the buffer
/// label pos() const;
///
/// Foam::label Foam::UOPstreamBase::pos() const
/// {
/// return sendBuf_.size();
/// }
void Foam::UOPstreamBase::rewind()
{
sendBuf_.clear(); // Overwrite into buffer
setOpened();
setGood();
}
void Foam::UOPstreamBase::print(Ostream& os) const void Foam::UOPstreamBase::print(Ostream& os) const
{ {
os << "Writing from processor " << toProcNo_ os << "Writing from processor " << toProcNo_