// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
|
// Licensed under the MIT License:
|
//
|
// Permission is hereby granted, free of charge, to any person obtaining a copy
|
// of this software and associated documentation files (the "Software"), to deal
|
// in the Software without restriction, including without limitation the rights
|
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
// copies of the Software, and to permit persons to whom the Software is
|
// furnished to do so, subject to the following conditions:
|
//
|
// The above copyright notice and this permission notice shall be included in
|
// all copies or substantial portions of the Software.
|
//
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
// THE SOFTWARE.
|
|
#ifndef CAPNP_BENCHMARK_CAPNP_COMMON_H_
|
#define CAPNP_BENCHMARK_CAPNP_COMMON_H_
|
|
#if defined(__GNUC__) && !defined(CAPNP_HEADER_WARNINGS)
|
#pragma GCC system_header
|
#endif
|
|
#include "common.h"
|
#include <capnp/serialize.h>
|
#include <capnp/serialize-packed.h>
|
#include <kj/debug.h>
|
#if HAVE_SNAPPY
|
#include <capnp/serialize-snappy.h>
|
#endif // HAVE_SNAPPY
|
#include <thread>
|
|
namespace capnp {
|
namespace benchmark {
|
namespace capnp {
|
|
class CountingOutputStream: public kj::FdOutputStream {
|
public:
|
CountingOutputStream(int fd): FdOutputStream(fd), throughput(0) {}
|
|
uint64_t throughput;
|
|
void write(const void* buffer, size_t size) override {
|
FdOutputStream::write(buffer, size);
|
throughput += size;
|
}
|
|
void write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
|
FdOutputStream::write(pieces);
|
for (auto& piece: pieces) {
|
throughput += piece.size();
|
}
|
}
|
};
|
|
// =======================================================================================
|
|
struct Uncompressed {
|
typedef kj::FdInputStream& BufferedInput;
|
typedef InputStreamMessageReader MessageReader;
|
|
class ArrayMessageReader: public FlatArrayMessageReader {
|
public:
|
ArrayMessageReader(kj::ArrayPtr<const byte> array,
|
ReaderOptions options = ReaderOptions(),
|
kj::ArrayPtr<word> scratchSpace = nullptr)
|
: FlatArrayMessageReader(kj::arrayPtr(
|
reinterpret_cast<const word*>(array.begin()),
|
reinterpret_cast<const word*>(array.end())), options) {}
|
};
|
|
static inline void write(kj::OutputStream& output, MessageBuilder& builder) {
|
writeMessage(output, builder);
|
}
|
};
|
|
struct Packed {
|
typedef kj::BufferedInputStreamWrapper BufferedInput;
|
typedef PackedMessageReader MessageReader;
|
|
class ArrayMessageReader: private kj::ArrayInputStream, public PackedMessageReader {
|
public:
|
ArrayMessageReader(kj::ArrayPtr<const byte> array,
|
ReaderOptions options = ReaderOptions(),
|
kj::ArrayPtr<word> scratchSpace = nullptr)
|
: ArrayInputStream(array),
|
PackedMessageReader(*this, options, scratchSpace) {}
|
};
|
|
static inline void write(kj::OutputStream& output, MessageBuilder& builder) {
|
writePackedMessage(output, builder);
|
}
|
|
static inline void write(kj::BufferedOutputStream& output, MessageBuilder& builder) {
|
writePackedMessage(output, builder);
|
}
|
};
|
|
#if HAVE_SNAPPY
|
static byte snappyReadBuffer[SNAPPY_BUFFER_SIZE];
|
static byte snappyWriteBuffer[SNAPPY_BUFFER_SIZE];
|
static byte snappyCompressedBuffer[SNAPPY_COMPRESSED_BUFFER_SIZE];
|
|
struct SnappyCompressed {
|
typedef BufferedInputStreamWrapper BufferedInput;
|
typedef SnappyPackedMessageReader MessageReader;
|
|
class ArrayMessageReader: private ArrayInputStream, public SnappyPackedMessageReader {
|
public:
|
ArrayMessageReader(kj::ArrayPtr<const byte> array,
|
ReaderOptions options = ReaderOptions(),
|
kj::ArrayPtr<word> scratchSpace = nullptr)
|
: ArrayInputStream(array),
|
SnappyPackedMessageReader(static_cast<ArrayInputStream&>(*this), options, scratchSpace,
|
kj::arrayPtr(snappyReadBuffer, SNAPPY_BUFFER_SIZE)) {}
|
};
|
|
static inline void write(OutputStream& output, MessageBuilder& builder) {
|
writeSnappyPackedMessage(output, builder,
|
kj::arrayPtr(snappyWriteBuffer, SNAPPY_BUFFER_SIZE),
|
kj::arrayPtr(snappyCompressedBuffer, SNAPPY_COMPRESSED_BUFFER_SIZE));
|
}
|
};
|
#endif // HAVE_SNAPPY
|
|
// =======================================================================================
|
|
struct NoScratch {
|
struct ScratchSpace {};
|
|
template <typename Compression>
|
class MessageReader: public Compression::MessageReader {
|
public:
|
inline MessageReader(typename Compression::BufferedInput& input, ScratchSpace& scratch)
|
: Compression::MessageReader(input) {}
|
};
|
|
template <typename Compression>
|
class ArrayMessageReader: public Compression::ArrayMessageReader {
|
public:
|
inline ArrayMessageReader(kj::ArrayPtr<const byte> input, ScratchSpace& scratch)
|
: Compression::ArrayMessageReader(input) {}
|
};
|
|
class MessageBuilder: public MallocMessageBuilder {
|
public:
|
inline MessageBuilder(ScratchSpace& scratch): MallocMessageBuilder() {}
|
};
|
|
class ObjectSizeCounter {
|
public:
|
ObjectSizeCounter(uint64_t iters): counter(0) {}
|
|
template <typename RequestBuilder, typename ResponseBuilder>
|
void add(RequestBuilder& request, ResponseBuilder& response) {
|
for (auto segment: request.getSegmentsForOutput()) {
|
counter += segment.size() * sizeof(word);
|
}
|
for (auto segment: response.getSegmentsForOutput()) {
|
counter += segment.size() * sizeof(word);
|
}
|
}
|
|
uint64_t get() { return counter; }
|
|
private:
|
uint64_t counter;
|
};
|
};
|
|
constexpr size_t SCRATCH_SIZE = 128 * 1024;
|
word scratchSpace[6 * SCRATCH_SIZE];
|
int scratchCounter = 0;
|
|
struct UseScratch {
|
struct ScratchSpace {
|
word* words;
|
|
ScratchSpace() {
|
KJ_REQUIRE(scratchCounter < 6, "Too many scratch spaces needed at once.");
|
words = scratchSpace + scratchCounter++ * SCRATCH_SIZE;
|
}
|
~ScratchSpace() noexcept {
|
--scratchCounter;
|
}
|
};
|
|
template <typename Compression>
|
class MessageReader: public Compression::MessageReader {
|
public:
|
inline MessageReader(typename Compression::BufferedInput& input, ScratchSpace& scratch)
|
: Compression::MessageReader(
|
input, ReaderOptions(), kj::arrayPtr(scratch.words, SCRATCH_SIZE)) {}
|
};
|
|
template <typename Compression>
|
class ArrayMessageReader: public Compression::ArrayMessageReader {
|
public:
|
inline ArrayMessageReader(kj::ArrayPtr<const byte> input, ScratchSpace& scratch)
|
: Compression::ArrayMessageReader(
|
input, ReaderOptions(), kj::arrayPtr(scratch.words, SCRATCH_SIZE)) {}
|
};
|
|
class MessageBuilder: public MallocMessageBuilder {
|
public:
|
inline MessageBuilder(ScratchSpace& scratch)
|
: MallocMessageBuilder(kj::arrayPtr(scratch.words, SCRATCH_SIZE)) {}
|
};
|
|
class ObjectSizeCounter {
|
public:
|
ObjectSizeCounter(uint64_t iters): iters(iters), maxSize(0) {}
|
|
template <typename RequestBuilder, typename ResponseBuilder>
|
void add(RequestBuilder& request, ResponseBuilder& response) {
|
size_t counter = 0;
|
for (auto segment: request.getSegmentsForOutput()) {
|
counter += segment.size() * sizeof(word);
|
}
|
for (auto segment: response.getSegmentsForOutput()) {
|
counter += segment.size() * sizeof(word);
|
}
|
maxSize = std::max(counter, maxSize);
|
}
|
|
uint64_t get() { return iters * maxSize; }
|
|
private:
|
uint64_t iters;
|
size_t maxSize;
|
};
|
};
|
|
// =======================================================================================
|
|
template <typename TestCase, typename ReuseStrategy, typename Compression>
|
struct BenchmarkMethods {
|
static uint64_t syncClient(int inputFd, int outputFd, uint64_t iters) {
|
kj::FdInputStream inputStream(inputFd);
|
typename Compression::BufferedInput bufferedInput(inputStream);
|
|
CountingOutputStream output(outputFd);
|
typename ReuseStrategy::ScratchSpace builderScratch;
|
typename ReuseStrategy::ScratchSpace readerScratch;
|
|
for (; iters > 0; --iters) {
|
typename TestCase::Expectation expected;
|
{
|
typename ReuseStrategy::MessageBuilder builder(builderScratch);
|
expected = TestCase::setupRequest(
|
builder.template initRoot<typename TestCase::Request>());
|
Compression::write(output, builder);
|
}
|
|
{
|
typename ReuseStrategy::template MessageReader<Compression> reader(
|
bufferedInput, readerScratch);
|
if (!TestCase::checkResponse(
|
reader.template getRoot<typename TestCase::Response>(), expected)) {
|
throw std::logic_error("Incorrect response.");
|
}
|
}
|
}
|
|
return output.throughput;
|
}
|
|
static uint64_t asyncClientSender(
|
int outputFd, ProducerConsumerQueue<typename TestCase::Expectation>* expectations,
|
uint64_t iters) {
|
CountingOutputStream output(outputFd);
|
typename ReuseStrategy::ScratchSpace scratch;
|
|
for (; iters > 0; --iters) {
|
typename ReuseStrategy::MessageBuilder builder(scratch);
|
expectations->post(TestCase::setupRequest(
|
builder.template initRoot<typename TestCase::Request>()));
|
Compression::write(output, builder);
|
}
|
|
return output.throughput;
|
}
|
|
static void asyncClientReceiver(
|
int inputFd, ProducerConsumerQueue<typename TestCase::Expectation>* expectations,
|
uint64_t iters) {
|
kj::FdInputStream inputStream(inputFd);
|
typename Compression::BufferedInput bufferedInput(inputStream);
|
|
typename ReuseStrategy::ScratchSpace scratch;
|
|
for (; iters > 0; --iters) {
|
typename TestCase::Expectation expected = expectations->next();
|
typename ReuseStrategy::template MessageReader<Compression> reader(bufferedInput, scratch);
|
if (!TestCase::checkResponse(
|
reader.template getRoot<typename TestCase::Response>(), expected)) {
|
throw std::logic_error("Incorrect response.");
|
}
|
}
|
}
|
|
static uint64_t asyncClient(int inputFd, int outputFd, uint64_t iters) {
|
ProducerConsumerQueue<typename TestCase::Expectation> expectations;
|
std::thread receiverThread(asyncClientReceiver, inputFd, &expectations, iters);
|
uint64_t throughput = asyncClientSender(outputFd, &expectations, iters);
|
receiverThread.join();
|
return throughput;
|
}
|
|
static uint64_t server(int inputFd, int outputFd, uint64_t iters) {
|
kj::FdInputStream inputStream(inputFd);
|
typename Compression::BufferedInput bufferedInput(inputStream);
|
|
CountingOutputStream output(outputFd);
|
typename ReuseStrategy::ScratchSpace builderScratch;
|
typename ReuseStrategy::ScratchSpace readerScratch;
|
|
for (; iters > 0; --iters) {
|
typename ReuseStrategy::MessageBuilder builder(builderScratch);
|
typename ReuseStrategy::template MessageReader<Compression> reader(
|
bufferedInput, readerScratch);
|
TestCase::handleRequest(reader.template getRoot<typename TestCase::Request>(),
|
builder.template initRoot<typename TestCase::Response>());
|
Compression::write(output, builder);
|
}
|
|
return output.throughput;
|
}
|
|
static uint64_t passByObject(uint64_t iters, bool countObjectSize) {
|
typename ReuseStrategy::ScratchSpace requestScratch;
|
typename ReuseStrategy::ScratchSpace responseScratch;
|
|
typename ReuseStrategy::ObjectSizeCounter counter(iters);
|
|
for (; iters > 0; --iters) {
|
typename ReuseStrategy::MessageBuilder requestMessage(requestScratch);
|
auto request = requestMessage.template initRoot<typename TestCase::Request>();
|
typename TestCase::Expectation expected = TestCase::setupRequest(request);
|
|
typename ReuseStrategy::MessageBuilder responseMessage(responseScratch);
|
auto response = responseMessage.template initRoot<typename TestCase::Response>();
|
TestCase::handleRequest(request.asReader(), response);
|
|
if (!TestCase::checkResponse(response.asReader(), expected)) {
|
throw std::logic_error("Incorrect response.");
|
}
|
|
if (countObjectSize) {
|
counter.add(requestMessage, responseMessage);
|
}
|
}
|
|
return counter.get();
|
}
|
|
static uint64_t passByBytes(uint64_t iters) {
|
uint64_t throughput = 0;
|
typename ReuseStrategy::ScratchSpace clientRequestScratch;
|
UseScratch::ScratchSpace requestBytesScratch;
|
typename ReuseStrategy::ScratchSpace serverRequestScratch;
|
typename ReuseStrategy::ScratchSpace serverResponseScratch;
|
UseScratch::ScratchSpace responseBytesScratch;
|
typename ReuseStrategy::ScratchSpace clientResponseScratch;
|
|
for (; iters > 0; --iters) {
|
typename ReuseStrategy::MessageBuilder requestBuilder(clientRequestScratch);
|
typename TestCase::Expectation expected = TestCase::setupRequest(
|
requestBuilder.template initRoot<typename TestCase::Request>());
|
|
kj::ArrayOutputStream requestOutput(kj::arrayPtr(
|
reinterpret_cast<byte*>(requestBytesScratch.words), SCRATCH_SIZE * sizeof(word)));
|
Compression::write(requestOutput, requestBuilder);
|
throughput += requestOutput.getArray().size();
|
typename ReuseStrategy::template ArrayMessageReader<Compression> requestReader(
|
requestOutput.getArray(), serverRequestScratch);
|
|
typename ReuseStrategy::MessageBuilder responseBuilder(serverResponseScratch);
|
TestCase::handleRequest(requestReader.template getRoot<typename TestCase::Request>(),
|
responseBuilder.template initRoot<typename TestCase::Response>());
|
|
kj::ArrayOutputStream responseOutput(
|
kj::arrayPtr(reinterpret_cast<byte*>(responseBytesScratch.words),
|
SCRATCH_SIZE * sizeof(word)));
|
Compression::write(responseOutput, responseBuilder);
|
throughput += responseOutput.getArray().size();
|
typename ReuseStrategy::template ArrayMessageReader<Compression> responseReader(
|
responseOutput.getArray(), clientResponseScratch);
|
|
if (!TestCase::checkResponse(
|
responseReader.template getRoot<typename TestCase::Response>(), expected)) {
|
throw std::logic_error("Incorrect response.");
|
}
|
}
|
|
return throughput;
|
}
|
};
|
|
struct BenchmarkTypes {
|
typedef capnp::Uncompressed Uncompressed;
|
typedef capnp::Packed Packed;
|
#if HAVE_SNAPPY
|
typedef capnp::SnappyCompressed SnappyCompressed;
|
#endif // HAVE_SNAPPY
|
|
typedef capnp::UseScratch ReusableResources;
|
typedef capnp::NoScratch SingleUseResources;
|
|
template <typename TestCase, typename ReuseStrategy, typename Compression>
|
struct BenchmarkMethods: public capnp::BenchmarkMethods<TestCase, ReuseStrategy, Compression> {};
|
};
|
|
} // namespace capnp
|
} // namespace benchmark
|
} // namespace capnp
|
|
#endif // CAPNP_BENCHMARK_CAPNP_COMMON_H_
|