Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fdbclient/DatabaseContext.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,7 @@ Future<Void> attemptGRVFromOldProxies(std::vector<GrvProxyInterface> oldProxies,
std::vector<Future<Void>> replies;
replies.reserve(oldProxies.size());
GetReadVersionRequest req(
span.context, 1, TransactionPriority::IMMEDIATE, GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY);
span.context, 1, TransactionPriority::IMMEDIATE, invalidVersion, GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY);
TraceEvent evt("AttemptGRVFromOldProxies");
evt.detail("NumOldProxies", oldProxies.size()).detail("NumNewProxies", newProxies.size());
auto traceProxies = [&](std::vector<GrvProxyInterface>& proxies, std::string const& key) {
Expand Down
132 changes: 6 additions & 126 deletions fdbclient/include/fdbclient/CommitProxyInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@
#include "fdbclient/GrvProxyInterface.h"
#include "fdbclient/IdempotencyId.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/TagThrottle.h"
#include "fdbclient/VersionVector.h"
#include "fdbrpc/Stats.h"
#include "fdbrpc/TimedRequest.h"

struct CommitProxyInterface {
constexpr static FileIdentifier file_identifier = 8954922;
Expand All @@ -44,11 +40,10 @@ struct CommitProxyInterface {
Optional<Key> processId;
bool provisional;
PublicRequestStream<struct CommitTransactionRequest> commit;
PublicRequestStream<struct GetReadVersionRequest>
getConsistentReadVersion; // Returns a version which (1) is committed, and (2) is >= the latest version reported
// committed (by a commit response) when this request was sent
// (at some point between when this request is sent and when its response is
// received, the latest version reported committed)
// Reserved to preserve the historical adjusted-endpoint numbering for the
// commit proxy interface. Commit proxies do not serve GRV traffic; clients
// use GrvProxyInterface::getConsistentReadVersion for that path.
PublicRequestStream<struct GetReadVersionRequest> legacyGetConsistentReadVersion;
PublicRequestStream<struct GetKeyServerLocationsRequest> getKeyServersLocations;
RequestStream<struct GetStorageServerRejoinInfoRequest> getStorageServerRejoinInfo;

Expand All @@ -73,7 +68,7 @@ struct CommitProxyInterface {
void serialize(Archive& ar) {
serializer(ar, processId, provisional, commit);
if (Archive::isDeserializing) {
getConsistentReadVersion =
legacyGetConsistentReadVersion =
PublicRequestStream<struct GetReadVersionRequest>(commit.getEndpoint().getAdjustedEndpoint(1));
getKeyServersLocations =
PublicRequestStream<struct GetKeyServerLocationsRequest>(commit.getEndpoint().getAdjustedEndpoint(2));
Expand All @@ -97,7 +92,7 @@ struct CommitProxyInterface {
void initEndpoints() {
std::vector<std::pair<FlowReceiver*, TaskPriority>> streams;
streams.push_back(commit.getReceiver(TaskPriority::ReadSocket));
streams.push_back(getConsistentReadVersion.getReceiver(TaskPriority::ReadSocket));
streams.push_back(legacyGetConsistentReadVersion.getReceiver(TaskPriority::ReadSocket));
streams.push_back(getKeyServersLocations.getReceiver(
TaskPriority::ReadSocket)); // priority lowered to TaskPriority::DefaultEndpoint on the proxy
streams.push_back(getStorageServerRejoinInfo.getReceiver(TaskPriority::ProxyStorageRejoin));
Expand Down Expand Up @@ -228,121 +223,6 @@ static inline int getBytes(CommitTransactionRequest const& r) {
return total;
}

struct GetReadVersionReply : public BasicLoadBalancedReply {
constexpr static FileIdentifier file_identifier = 15709388;
Version version;
bool locked;
Optional<Value> metadataVersion;
int64_t midShardSize = 0;
bool rkDefaultThrottled = false;
bool rkBatchThrottled = false;

TransactionTagMap<ClientTagThrottleLimits> tagThrottleInfo;
double proxyTagThrottledDuration{ 0.0 };

VersionVector ssVersionVectorDelta;
UID proxyId; // GRV proxy ID to detect old GRV proxies at client side

GetReadVersionReply() : version(invalidVersion), locked(false) {}

template <class Ar>
void serialize(Ar& ar) {
serializer(ar,
BasicLoadBalancedReply::processBusyTime,
version,
locked,
metadataVersion,
tagThrottleInfo,
midShardSize,
rkDefaultThrottled,
rkBatchThrottled,
ssVersionVectorDelta,
proxyId,
proxyTagThrottledDuration);
}
};

struct GetReadVersionRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 838566;
enum {
PRIORITY_SYSTEM_IMMEDIATE =
15 << 24, // Highest possible priority, always executed even if writes are otherwise blocked
PRIORITY_DEFAULT = 8 << 24,
PRIORITY_BATCH = 1 << 24
};
enum {
FLAG_USE_MIN_KNOWN_COMMITTED_VERSION = 4,
FLAG_USE_PROVISIONAL_PROXIES = 2,
FLAG_CAUSAL_READ_RISKY = 1,
FLAG_PRIORITY_MASK = PRIORITY_SYSTEM_IMMEDIATE,
};

SpanContext spanContext;
uint32_t transactionCount;
uint32_t flags;
TransactionPriority priority;

TransactionTagMap<uint32_t> tags;
// Not serialized, because this field does not need to be sent to master.
// It is used for reporting to clients the amount of time spent delayed by
// the TagQueue
double proxyTagThrottledDuration{ 0.0 };

Optional<UID> debugID;
ReplyPromise<GetReadVersionReply> reply;

Version maxVersion; // max version in the client's version vector cache

GetReadVersionRequest() : transactionCount(1), flags(0), maxVersion(invalidVersion) {}
GetReadVersionRequest(SpanContext spanContext,
uint32_t transactionCount,
TransactionPriority priority,
Version maxVersion,
uint32_t flags = 0,
TransactionTagMap<uint32_t> tags = TransactionTagMap<uint32_t>(),
Optional<UID> debugID = Optional<UID>())
: spanContext(spanContext), transactionCount(transactionCount), flags(flags), priority(priority), tags(tags),
debugID(debugID), maxVersion(maxVersion) {
flags = flags & ~FLAG_PRIORITY_MASK;
switch (priority) {
case TransactionPriority::BATCH:
flags |= PRIORITY_BATCH;
break;
case TransactionPriority::DEFAULT:
flags |= PRIORITY_DEFAULT;
break;
case TransactionPriority::IMMEDIATE:
flags |= PRIORITY_SYSTEM_IMMEDIATE;
break;
default:
ASSERT(false);
}
}

bool verify() const { return true; }

bool operator<(GetReadVersionRequest const& rhs) const { return priority < rhs.priority; }

bool isTagged() const { return !tags.empty(); }

template <class Ar>
void serialize(Ar& ar) {
serializer(ar, transactionCount, flags, tags, debugID, reply, spanContext, maxVersion);

if (ar.isDeserializing) {
if ((flags & PRIORITY_SYSTEM_IMMEDIATE) == PRIORITY_SYSTEM_IMMEDIATE) {
priority = TransactionPriority::IMMEDIATE;
} else if ((flags & PRIORITY_DEFAULT) == PRIORITY_DEFAULT) {
priority = TransactionPriority::DEFAULT;
} else if ((flags & PRIORITY_BATCH) == PRIORITY_BATCH) {
priority = TransactionPriority::BATCH;
} else {
priority = TransactionPriority::DEFAULT;
}
}
}
};

struct GetKeyServerLocationsReply {
constexpr static FileIdentifier file_identifier = 10636023;
Arena arena;
Expand Down
120 changes: 120 additions & 0 deletions fdbclient/include/fdbclient/GrvProxyInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,130 @@
#ifndef FDBCLIENT_GRVPROXYINTERFACE_H
#define FDBCLIENT_GRVPROXYINTERFACE_H
#pragma once
#include "fdbclient/TagThrottle.h"
#include "fdbclient/VersionVector.h"
#include "flow/FileIdentifier.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/LoadBalance.actor.h"
#include "fdbrpc/Stats.h"
#include "fdbrpc/TimedRequest.h"
#include "fdbclient/FDBTypes.h"

struct GetReadVersionReply : public BasicLoadBalancedReply {
constexpr static FileIdentifier file_identifier = 15709388;
Version version;
bool locked;
Optional<Value> metadataVersion;
int64_t midShardSize = 0;
bool rkDefaultThrottled = false;
bool rkBatchThrottled = false;

TransactionTagMap<ClientTagThrottleLimits> tagThrottleInfo;
double proxyTagThrottledDuration{ 0.0 };

VersionVector ssVersionVectorDelta;
UID proxyId; // GRV proxy ID to detect old GRV proxies at client side

GetReadVersionReply() : version(invalidVersion), locked(false) {}

template <class Ar>
void serialize(Ar& ar) {
serializer(ar,
BasicLoadBalancedReply::processBusyTime,
version,
locked,
metadataVersion,
tagThrottleInfo,
midShardSize,
rkDefaultThrottled,
rkBatchThrottled,
ssVersionVectorDelta,
proxyId,
proxyTagThrottledDuration);
}
};

struct GetReadVersionRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 838566;
enum {
PRIORITY_SYSTEM_IMMEDIATE =
15 << 24, // Highest possible priority, always executed even if writes are otherwise blocked
PRIORITY_DEFAULT = 8 << 24,
PRIORITY_BATCH = 1 << 24
};
enum {
FLAG_USE_MIN_KNOWN_COMMITTED_VERSION = 4,
FLAG_USE_PROVISIONAL_PROXIES = 2,
FLAG_CAUSAL_READ_RISKY = 1,
FLAG_PRIORITY_MASK = PRIORITY_SYSTEM_IMMEDIATE,
};

SpanContext spanContext;
uint32_t transactionCount;
uint32_t flags;
TransactionPriority priority;

TransactionTagMap<uint32_t> tags;
// Not serialized, because this field does not need to be sent to master.
// It is used for reporting to clients the amount of time spent delayed by
// the TagQueue
double proxyTagThrottledDuration{ 0.0 };

Optional<UID> debugID;
ReplyPromise<GetReadVersionReply> reply;

Version maxVersion; // max version in the client's version vector cache

GetReadVersionRequest() : transactionCount(1), flags(0), maxVersion(invalidVersion) {}
GetReadVersionRequest(SpanContext spanContext,
uint32_t transactionCount,
TransactionPriority priority,
Version maxVersion,
uint32_t flags = 0,
TransactionTagMap<uint32_t> tags = TransactionTagMap<uint32_t>(),
Optional<UID> debugID = Optional<UID>())
: spanContext(spanContext), transactionCount(transactionCount), flags(flags), priority(priority), tags(tags),
debugID(debugID), maxVersion(maxVersion) {
this->flags &= ~FLAG_PRIORITY_MASK;
switch (priority) {
case TransactionPriority::BATCH:
this->flags |= PRIORITY_BATCH;
break;
case TransactionPriority::DEFAULT:
this->flags |= PRIORITY_DEFAULT;
break;
case TransactionPriority::IMMEDIATE:
this->flags |= PRIORITY_SYSTEM_IMMEDIATE;
break;
default:
ASSERT(false);
}
}

bool verify() const { return true; }

bool operator<(GetReadVersionRequest const& rhs) const { return priority < rhs.priority; }

bool isTagged() const { return !tags.empty(); }

template <class Ar>
void serialize(Ar& ar) {
serializer(ar, transactionCount, flags, tags, debugID, reply, spanContext, maxVersion);

if (ar.isDeserializing) {
if ((flags & PRIORITY_SYSTEM_IMMEDIATE) == PRIORITY_SYSTEM_IMMEDIATE) {
priority = TransactionPriority::IMMEDIATE;
} else if ((flags & PRIORITY_DEFAULT) == PRIORITY_DEFAULT) {
priority = TransactionPriority::DEFAULT;
} else if ((flags & PRIORITY_BATCH) == PRIORITY_BATCH) {
priority = TransactionPriority::BATCH;
} else {
priority = TransactionPriority::DEFAULT;
}
}
}
};

// GrvProxy is proxy primarily specializing on serving GetReadVersion. It also
// serves health metrics since it communicates with RateKeeper to gather health
// information of the cluster, and handles proxied GlobalConfig requests.
Expand Down
1 change: 0 additions & 1 deletion fdbserver/commitproxy/CommitProxyServer.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2821,7 +2821,6 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
uint16_t commitProxyIndex) {
state ProxyCommitData commitData(proxy.id(),
master,
proxy.getConsistentReadVersion,
recoveryTransactionVersion,
proxy.commit,
db,
Expand Down
5 changes: 1 addition & 4 deletions fdbserver/commitproxy/ProxyCommitData.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ struct ProxyCommitData {
NotifiedVersion latestLocalCommitBatchResolving;
NotifiedVersion latestLocalCommitBatchLogging;

PublicRequestStream<GetReadVersionRequest> getConsistentReadVersion;
PublicRequestStream<CommitTransactionRequest> commit;
Database cx;
Reference<AsyncVar<ServerDBInfo> const> db;
Expand Down Expand Up @@ -306,7 +305,6 @@ struct ProxyCommitData {

ProxyCommitData(UID dbgid,
MasterInterface master,
PublicRequestStream<GetReadVersionRequest> getConsistentReadVersion,
Version recoveryTransactionVersion,
PublicRequestStream<CommitTransactionRequest> commit,
Reference<AsyncVar<ServerDBInfo> const> db,
Expand All @@ -320,8 +318,7 @@ struct ProxyCommitData {
lastVersionTime(0), commitVersionRequestNumber(1), mostRecentProcessedRequestNumber(0), firstProxy(firstProxy),
provisional(provisional), lastCoalesceTime(0), locked(false),
commitBatchInterval(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_MIN), localCommitBatchesStarted(0),
getConsistentReadVersion(getConsistentReadVersion), commit(commit),
cx(openDBOnServer(db, TaskPriority::DefaultEndpoint, LockAware::True)), db(db),
commit(commit), cx(openDBOnServer(db, TaskPriority::DefaultEndpoint, LockAware::True)), db(db),
singleKeyMutationEvent("SingleKeyMutation"_sr), lastTxsPop(0), popRemoteTxs(false), lastStartCommit(0),
lastCommitLatency(SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION), lastCommitTime(0), lastMasterReset(now()),
lastResolverReset(now()), commitProxyIndex(commitProxyIndex),
Expand Down
1 change: 0 additions & 1 deletion fdbserver/worker/worker.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2754,7 +2754,6 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
startRole(Role::COMMIT_PROXY, recruited.id(), interf.id(), details);

DUMPTOKEN(recruited.commit);
DUMPTOKEN(recruited.getConsistentReadVersion);
DUMPTOKEN(recruited.getKeyServersLocations);
DUMPTOKEN(recruited.getStorageServerRejoinInfo);
DUMPTOKEN(recruited.waitFailure);
Expand Down