Added ability to offer a buffer range to a publication by nicklauslittle · Pull Request #401 · aeron-io/aeron · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Added ability to offer a buffer range to a publication #401

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions aeron-client/src/main/cpp/Publication.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#define INCLUDED_AERON_PUBLICATION__

#include <iostream>
#include <array>
#include <atomic>
#include <limits>
#include <concurrent/AtomicBuffer.h>
#include <concurrent/logbuffer/BufferClaim.h>
Expand Down Expand Up @@ -319,6 +320,112 @@ class Publication
return offer(buffer, 0, buffer.capacity());
}

/**
* Non-blocking publish of buffers containing a message.
*
* @param startBuffer containing part of the message.
* @param lastBuffer after the message.
* @param reservedValueSupplier for the frame.
* @return The new stream position, otherwise {@link #NOT_CONNECTED}, {@link #BACK_PRESSURED},
* {@link #ADMIN_ACTION} or {@link #CLOSED}.
*/
template <class BufferIterator> std::int64_t offer(
BufferIterator startBuffer,
BufferIterator lastBuffer,
const on_reserved_value_supplier_t& reservedValueSupplier = DEFAULT_RESERVED_VALUE_SUPPLIER)
{
util::index_t length = 0;
for (BufferIterator it = startBuffer; it != lastBuffer; ++it)
{
if (AERON_COND_EXPECT(length + it->capacity() < 0, false))
{
throw aeron::util::IllegalStateException(
aeron::util::strPrintf("length overflow: %d + %d -> %d", length, it->capacity(), length + it->capacity()),
SOURCEINFO);
}

length += it->capacity();
}

std::int64_t newPosition = PUBLICATION_CLOSED;

if (!isClosed())
{
const std::int64_t limit = m_publicationLimit.getVolatile();
const std::int32_t termCount = LogBufferDescriptor::activeTermCount(m_logMetaDataBuffer);
TermAppender *termAppender = m_appenders[LogBufferDescriptor::indexByTermCount(termCount)].get();
const std::int64_t rawTail = termAppender->rawTailVolatile();
const std::int64_t termOffset = rawTail & 0xFFFFFFFF;
const std::int32_t termId = LogBufferDescriptor::termId(rawTail);
const std::int64_t position =
LogBufferDescriptor::computeTermBeginPosition(
termId, m_positionBitsToShift, m_initialTermId) + termOffset;

if (termCount != (termId - m_initialTermId))
{
return ADMIN_ACTION;
}

if (position < limit)
{
std::int32_t resultingOffset;
if (length <= m_maxPayloadLength)
{
resultingOffset = termAppender->appendUnfragmentedMessage(
m_headerWriter, startBuffer, length, reservedValueSupplier, termId);
}
else
{
checkForMaxMessageLength(length);
resultingOffset = termAppender->appendFragmentedMessage(
m_headerWriter, startBuffer, length, m_maxPayloadLength, reservedValueSupplier, termId);
}

newPosition =
Publication::newPosition(
termCount, static_cast<std::int32_t>(termOffset), termId, position, resultingOffset);
}
else
{
newPosition = Publication::backPressureStatus(position, length);
}
}

return newPosition;
}

/**
 * Non-blocking publish of a message held in a C-style array of buffers.
 *
 * Delegates to the iterator-range overload using the pointer range [buffers, buffers + length).
 *
 * @param buffers containing parts of the message.
 * @param length of the array of buffers, i.e. the number of buffers rather than a byte count.
 * @param reservedValueSupplier for the frame.
 * @return The new stream position, otherwise {@link #NOT_CONNECTED}, {@link #BACK_PRESSURED},
 * {@link #ADMIN_ACTION} or {@link #CLOSED}.
 */
std::int64_t offer(
    const concurrent::AtomicBuffer buffers[],
    size_t length,
    const on_reserved_value_supplier_t& reservedValueSupplier = DEFAULT_RESERVED_VALUE_SUPPLIER)
{
    const concurrent::AtomicBuffer *const firstBuffer = buffers;
    const concurrent::AtomicBuffer *const pastLastBuffer = firstBuffer + length;

    return offer(firstBuffer, pastLastBuffer, reservedValueSupplier);
}

/**
 * Non-blocking publish of a message held in a fixed-size std::array of buffers.
 *
 * Delegates to the iterator-range overload across the whole array.
 *
 * @param buffers containing parts of the message.
 * @param reservedValueSupplier for the frame.
 * @return The new stream position, otherwise {@link #NOT_CONNECTED}, {@link #BACK_PRESSURED},
 * {@link #ADMIN_ACTION} or {@link #CLOSED}.
 */
template <size_t N> std::int64_t offer(
    const std::array<concurrent::AtomicBuffer, N>& buffers,
    const on_reserved_value_supplier_t& reservedValueSupplier = DEFAULT_RESERVED_VALUE_SUPPLIER)
{
    return offer(buffers.cbegin(), buffers.cend(), reservedValueSupplier);
}

/**
* Try to claim a range in the publication log into which a message can be written with zero copy semantics.
* Once the message has been written then {@link BufferClaim#commit()} should be called thus making it available.
Expand Down
2 changes: 1 addition & 1 deletion aeron-client/src/main/cpp/concurrent/AtomicBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ class AtomicBuffer
return atomic::getAndAddInt32((volatile std::int32_t*)(m_buffer + offset), delta);
}

inline COND_MOCK_VIRTUAL void putBytes(util::index_t index, concurrent::AtomicBuffer& srcBuffer, util::index_t srcIndex, util::index_t length)
inline COND_MOCK_VIRTUAL void putBytes(util::index_t index, const concurrent::AtomicBuffer& srcBuffer, util::index_t srcIndex, util::index_t length)
{
boundsCheck(index, length);
srcBuffer.boundsCheck(srcIndex, length);
Expand Down
127 changes: 127 additions & 0 deletions aeron-client/src/main/cpp/concurrent/logbuffer/TermAppender.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,48 @@ class TermAppender
return static_cast<std::int32_t>(resultingOffset);
}

/**
 * Append a message, gathered from a sequence of buffers, to the term as a single frame.
 *
 * Buffers are consumed in order starting at bufferIt, each copied in full (capacity() bytes),
 * until length payload bytes have been written after the frame header.
 *
 * @param header writer used to stamp the frame header.
 * @param bufferIt iterator to the first buffer of the message.
 * @param length total payload length in bytes.
 * @param reservedValueSupplier invoked to obtain the reserved value stored in the frame.
 * @param activeTermId term id the caller expects, validated via checkTerm().
 * @return the term offset just past the appended frame, or the result of handleEndOfLogCondition()
 * when the frame does not fit in the remaining term.
 */
template <class BufferIterator> std::int32_t appendUnfragmentedMessage(
    const HeaderWriter& header,
    BufferIterator bufferIt,
    util::index_t length,
    const on_reserved_value_supplier_t& reservedValueSupplier,
    std::int32_t activeTermId)
{
    const util::index_t frameLength = length + DataFrameHeader::LENGTH;
    const util::index_t alignedLength = util::BitUtil::align(frameLength, FrameDescriptor::FRAME_ALIGNMENT);

    // Claim the space first; the low 32 bits of the previous raw tail give the claimed offset.
    const std::int64_t rawTail = getAndAddRawTail(alignedLength);
    const std::int64_t termOffset = rawTail & 0xFFFFFFFF;
    const std::int32_t termId = LogBufferDescriptor::termId(rawTail);
    const std::int32_t termLength = m_termBuffer.capacity();

    checkTerm(activeTermId, termId);

    const std::int64_t endOfFrame = termOffset + alignedLength;
    if (endOfFrame > termLength)
    {
        // The claim runs past the end of the term.
        return static_cast<std::int32_t>(
            handleEndOfLogCondition(m_termBuffer, termOffset, header, termLength, termId));
    }

    const std::int32_t frameOffset = static_cast<std::int32_t>(termOffset);
    header.write(m_termBuffer, frameOffset, frameLength, termId);

    // Gather the payload: every visited buffer is copied whole, back to back.
    std::int32_t writeOffset = frameOffset + DataFrameHeader::LENGTH;
    const std::int32_t payloadEnd = writeOffset + length;
    while (writeOffset < payloadEnd)
    {
        const auto chunkLength = bufferIt->capacity();
        m_termBuffer.putBytes(writeOffset, *bufferIt, 0, chunkLength);
        writeOffset += chunkLength;
        ++bufferIt;
    }

    const std::int64_t reservedValue = reservedValueSupplier(m_termBuffer, frameOffset, frameLength);
    m_termBuffer.putInt64(frameOffset + DataFrameHeader::RESERVED_VALUE_FIELD_OFFSET, reservedValue);

    // The frame length is written last, after the payload and reserved value are in place.
    FrameDescriptor::frameLengthOrdered(m_termBuffer, frameOffset, frameLength);

    return static_cast<std::int32_t>(endOfFrame);
}

std::int32_t appendFragmentedMessage(
const HeaderWriter& header,
AtomicBuffer& srcBuffer,
Expand Down Expand Up @@ -212,6 +254,91 @@ class TermAppender
return static_cast<std::int32_t>(resultingOffset);
}

template <class BufferIterator> std::int32_t appendFragmentedMessage(
const HeaderWriter& header,
BufferIterator bufferIt,
util::index_t length,
util::index_t maxPayloadLength,
const on_reserved_value_supplier_t& reservedValueSupplier,
std::int32_t activeTermId)
{
const int numMaxPayloads = length / maxPayloadLength;
const util::index_t remainingPayload = length % maxPayloadLength;
const util::index_t lastFrameLength = (remainingPayload > 0) ?
util::BitUtil::align(remainingPayload + DataFrameHeader::LENGTH, FrameDescriptor::FRAME_ALIGNMENT) : 0;
const util::index_t requiredLength =
(numMaxPayloads * (maxPayloadLength + DataFrameHeader::LENGTH)) + lastFrameLength;
const std::int64_t rawTail = getAndAddRawTail(requiredLength);
const std::int64_t termOffset = rawTail & 0xFFFFFFFF;
const std::int32_t termId = LogBufferDescriptor::termId(rawTail);

const std::int32_t termLength = m_termBuffer.capacity();

checkTerm(activeTermId, termId);

std::int64_t resultingOffset = termOffset + requiredLength;
if (resultingOffset > termLength)
{
resultingOffset = handleEndOfLogCondition(m_termBuffer, termOffset, header, termLength, termId);
}
else
{
std::uint8_t flags = FrameDescriptor::BEGIN_FRAG;
util::index_t remaining = length;
std::int32_t frameOffset = static_cast<std::int32_t>(termOffset);
util::index_t currentBufferOffset = 0;

do
{
const util::index_t bytesToWrite = std::min(remaining, maxPayloadLength);
const util::index_t frameLength = bytesToWrite + DataFrameHeader::LENGTH;
const util::index_t alignedLength = util::BitUtil::align(frameLength, FrameDescriptor::FRAME_ALIGNMENT);

header.write(m_termBuffer, frameOffset, frameLength, termId);

util::index_t bytesWritten = 0;
util::index_t payloadOffset = frameOffset + DataFrameHeader::LENGTH;
do
{
const util::index_t currentBufferRemaining = bufferIt->capacity() - currentBufferOffset;
const util::index_t numBytes = std::min(bytesToWrite - bytesWritten, currentBufferRemaining);

m_termBuffer.putBytes(payloadOffset, *bufferIt, currentBufferOffset, numBytes);

bytesWritten += numBytes;
payloadOffset += numBytes;
currentBufferOffset += numBytes;

if (currentBufferRemaining <= numBytes)
{
++bufferIt;
currentBufferOffset = 0;
}
}
while (bytesWritten < bytesToWrite);

if (remaining <= maxPayloadLength)
{
flags |= FrameDescriptor::END_FRAG;
}

FrameDescriptor::frameFlags(m_termBuffer, frameOffset, flags);

const std::int64_t reservedValue = reservedValueSupplier(m_termBuffer, frameOffset, frameLength);
m_termBuffer.putInt64(frameOffset + DataFrameHeader::RESERVED_VALUE_FIELD_OFFSET, reservedValue);

FrameDescriptor::frameLengthOrdered(m_termBuffer, frameOffset, frameLength);

flags = 0;
frameOffset += alignedLength;
remaining -= bytesToWrite;
}
while (remaining > 0);
}

return static_cast<std::int32_t>(resultingOffset);
}

private:
AtomicBuffer& m_termBuffer;
AtomicBuffer& m_tailBuffer;
Expand Down
26 changes: 17 additions & 9 deletions aeron-client/src/main/cpp/util/BitUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#ifndef INCLUDED_AERON_UTIL_BITUTIL__
#define INCLUDED_AERON_UTIL_BITUTIL__

#include <cstdint>
#include <type_traits>
#include <util/Exceptions.h>

Expand Down Expand Up @@ -82,7 +83,12 @@ namespace BitUtil
#if defined(__GNUC__)
return __builtin_clz(value);
#elif defined(_MSC_VER)
return __lzcnt(value);
unsigned long r;

if (_BitScanReverse(&r, (unsigned long)value))
return 31 - (int)r;

return 32;
#else
#error "do not understand how to clz"
#endif
Expand All @@ -94,9 +100,16 @@ namespace BitUtil
{
#if defined(__GNUC__)
return __builtin_ctz(value);
#elif defined(_MSC_VER)
unsigned long r;

if (_BitScanForward(&r, (unsigned long)value))
return r;

return 32;
#else
static_assert(std::is_integral<value_t>::value, "numberOfTrailingZeroes only available on integral types");
static_assert(sizeof(value_t)==4, "numberOfTrailingZeroes only available on 32-bit integral types");
static_assert(sizeof(value_t) <= 4, "numberOfTrailingZeroes only available on up to 32-bit integral types");

static char table[32] = {
0, 1, 2, 24, 3, 19, 6, 25,
Expand All @@ -109,9 +122,9 @@ namespace BitUtil
return 32;
}

value = (value & -value) * 0x04D7651F;
uint32_t index = static_cast<uint32_t>((value & -value) * 0x04D7651F);

return table[value >> 27];
return table[index >> 27];
#endif
}

Expand All @@ -124,11 +137,6 @@ namespace BitUtil
{
static_assert(std::is_integral<value_t>::value, "findNextPowerOfTwo only available on integral types");

#if defined(__GNUC__)
if (sizeof(value) == sizeof(unsigned int))
return 1 << (32 - numberOfLeadingZeroes(value - 1));
#endif

value--;

// Set all bits below the leading one using binary expansion http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
Expand Down
2 changes: 1 addition & 1 deletion aeron-client/src/test/cpp/concurrent/MockAtomicBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class MockAtomicBuffer : public AtomicBuffer
MOCK_CONST_METHOD1(getInt32Volatile, std::int32_t(util::index_t offset));
MOCK_METHOD2(getAndAddInt32, std::int32_t(util::index_t offset, std::int32_t delta));
MOCK_METHOD2(getAndAddInt64, std::int64_t(util::index_t offset, std::int64_t delta));
MOCK_METHOD4(putBytes, void(util::index_t index, concurrent::AtomicBuffer& srcBuffer, util::index_t srcIndex, util::index_t length));
MOCK_METHOD4(putBytes, void(util::index_t index, const concurrent::AtomicBuffer& srcBuffer, util::index_t srcIndex, util::index_t length));
MOCK_METHOD3(putBytes, void(util::index_t index, const std::uint8_t *srcBuffer, util::index_t length));
MOCK_METHOD2(putInt32Ordered, void(util::index_t offset, std::int32_t v));
MOCK_METHOD1(getUInt16, std::uint16_t(util::index_t offset));
Expand Down
21 changes: 21 additions & 0 deletions aeron-client/src/test/cpp/util/UtilTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,25 @@ TEST(utilTests, findNextPowerOfTwo)
{
EXPECT_EQ(BitUtil::findNextPowerOfTwo<std::uint32_t>(33), 64u);
EXPECT_EQ(BitUtil::findNextPowerOfTwo<std::uint32_t>(4096), 4096u);
EXPECT_EQ(BitUtil::findNextPowerOfTwo<std::uint32_t>(4097), 8192u);
}

// Checks BitUtil::numberOfLeadingZeroes on 32-bit values: the expected result is the number of
// zero bits above the highest set bit.
TEST(utilTests, numberOfLeadingZeroes)
{
// All bits set: no leading zeroes.
EXPECT_EQ(BitUtil::numberOfLeadingZeroes<std::uint32_t>(0xFFFFFFFF), 0);
// Highest set bit is bit 28.
EXPECT_EQ(BitUtil::numberOfLeadingZeroes<std::uint32_t>(0x10000000), 3);
// Highest set bit is bit 24; the lower set bits must not affect the result.
EXPECT_EQ(BitUtil::numberOfLeadingZeroes<std::uint32_t>(0x010000FF), 7);
// Upper half clear.
EXPECT_EQ(BitUtil::numberOfLeadingZeroes<std::uint32_t>(0x0000FFFF), 16);
// Only the lowest bit set.
EXPECT_EQ(BitUtil::numberOfLeadingZeroes<std::uint32_t>(0x00000001), 31);
}

// Checks BitUtil::numberOfTrailingZeroes on 32-bit values: the expected result is the index of
// the lowest set bit.
TEST(utilTests, numberOfTrailingZeroes)
{
// Single bit set at position 21.
EXPECT_EQ(BitUtil::numberOfTrailingZeroes<std::uint32_t>(1 << 21), 21);
// Single bit set at position 3.
EXPECT_EQ(BitUtil::numberOfTrailingZeroes<std::uint32_t>(0x00000008), 3);
// Only the top bit set.
EXPECT_EQ(BitUtil::numberOfTrailingZeroes<std::uint32_t>(0x80000000), 31);
// Lowest set bit is bit 7; the higher set bit must not affect the result.
EXPECT_EQ(BitUtil::numberOfTrailingZeroes<std::uint32_t>(0x01000080), 7);
// Lower half set: bit 0 is set, so no trailing zeroes.
EXPECT_EQ(BitUtil::numberOfTrailingZeroes<std::uint32_t>(0x0000FFFF), 0);
// Lower half clear.
EXPECT_EQ(BitUtil::numberOfTrailingZeroes<std::uint32_t>(0xFFFF0000), 16);
// Only the lowest bit set.
EXPECT_EQ(BitUtil::numberOfTrailingZeroes<std::uint32_t>(0x00000001), 0);
}
Loading
0