-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathdescriptor_db.cpp
153 lines (124 loc) · 4.83 KB
/
descriptor_db.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#include "descriptor_db.hpp"
#include "postgres_protobuf_common.hpp"
#include "postgres_utils.hpp"
#include <cstring>
#include <google/protobuf/descriptor.pb.h>
#include <google/protobuf/util/type_resolver_util.h>
extern "C" {
// Must be included before other Postgres headers
#include <postgres.h>
#include <executor/spi.h>
#include <funcapi.h>
}
using namespace postgres_protobuf::postgres_utils;
namespace postgres_protobuf {
namespace descriptor_db {
const std::shared_ptr<DescDb>& DescDb::GetOrCreateCached() {
if (cached_ != nullptr) {
return cached_;
}
MemoryContext outer_mctx = CurrentMemoryContext;
if (SPI_connect() != SPI_OK_CONNECT) {
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR), errmsg("SPI_connect failed")));
}
const char* sql =
"SELECT name, file_descriptor_set "
"FROM protobuf_file_descriptor_sets";
int status = SPI_execute(sql, true, 0);
if (status != SPI_OK_SELECT) {
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("SPI_execute failed: %s", SPI_result_code_string(status))));
}
// Read all rows before allocating anything on the C++ heap.
// We do this because there may be a Postgres error while reading rows.
// We allocate the rows in the memory context of the caller, which outlasts
// `SPI_finish`. (We do this after `SPI_execute` because `SPI_execute` would
// switch the context back. `SPI_getvalue` and `SPI_getbinval` don't do that)
MemoryContextSwitchTo(outer_mctx);
pvector<std::tuple<pstring, pstring>> rows;
rows.reserve(SPI_processed);
TupleDesc tupdesc = SPI_tuptable->tupdesc;
for (uint64 i = 0; i < SPI_processed; ++i) {
HeapTuple row = SPI_tuptable->vals[i];
char* name_cstr = SPI_getvalue(row, tupdesc, 1);
pstring name(name_cstr);
pfree(name_cstr);
name_cstr = nullptr;
bool isnull;
bytea* fds_binary = (bytea*)PG_DETOAST_DATUM_PACKED(
SPI_getbinval(row, tupdesc, 2, &isnull));
if (isnull) {
ereport(WARNING,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Didn't expect postgres_protobuf_file_descriptor_sets to "
"contain nulls")));
continue;
}
pstring fds =
pstring(VARDATA_ANY(fds_binary), VARSIZE_ANY_EXHDR(fds_binary));
rows.emplace_back(std::move(name), std::move(fds));
}
if (SPI_finish() != SPI_OK_FINISH) {
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR), errmsg("SPI_finish failed")));
}
// We start creating RAII objects that allocate on the C++ heap now.
// No more Postgres operations, which may throw Postgres exceptions,
// are allowed in this block.
{
std::unordered_map<std::string, std::unique_ptr<DescSet>> desc_sets;
for (const auto& row : rows) {
pb::FileDescriptorSet fds;
const pstring& name = std::get<0>(row);
const pstring& fds_data = std::get<1>(row);
if (!fds.ParseFromArray(fds_data.data(), fds_data.size())) {
throw BadProto("failed to parse FileDescriptorSet");
}
std::unique_ptr<DescSet>& desc_set = desc_sets[std::string(name)];
if (desc_set == nullptr) {
desc_set = std::make_unique<DescSet>();
}
while (fds.file_size() > 0) {
auto* fd = fds.mutable_file()->ReleaseLast();
desc_set->desc_db->AddAndOwn(fd);
}
}
cached_ = std::shared_ptr<DescDb>(new DescDb(std::move(desc_sets)));
}
// TODO: can we safely make the cache outlive the transaction?
// We'd need an efficient way to check whether the source table
// has changed.
// These posts suggest checking `xmin` and `ctid` on all rows:
// https://www.postgresql.org/message-id/[email protected]
// https://www.postgresql.org/message-id/[email protected]
MemoryContextCallback* callback =
static_cast<MemoryContextCallback*>(MemoryContextAllocExtended(
CurTransactionContext, sizeof(MemoryContextCallback),
MCXT_ALLOC_ZERO | MCXT_ALLOC_NO_OOM));
if (callback == nullptr) {
throw std::bad_alloc();
}
memset(callback, 0, sizeof(*callback));
callback->func = &DescDb::ClearCacheCallback;
callback->arg = nullptr;
MemoryContextRegisterResetCallback(CurTransactionContext, callback);
PGPROTO_DEBUG("DescDb cache rebuilt");
return cached_;
}
// Empties the cache: releases the reference held in `cached_`, so the next
// `GetOrCreateCached()` call rebuilds the database.
void DescDb::ClearCache() {
  cached_ = nullptr;
}
DescDb::DescDb(
std::unordered_map<std::string, std::unique_ptr<DescSet>> desc_sets)
: desc_sets(std::move(desc_sets)) {}
// Storage for the cached database. Starts out empty; filled by
// `GetOrCreateCached()` and emptied by `ClearCache()`.
std::shared_ptr<DescDb> DescDb::cached_{};
// Adapter with the `void(void*)` shape used for memory-context reset
// callbacks; the argument is ignored and the cache is simply cleared.
void DescDb::ClearCacheCallback(void* /*unused*/) {
  DescDb::ClearCache();
}
DescSet::DescSet()
: desc_db(std::make_unique<pb::SimpleDescriptorDatabase>()),
pool(std::make_unique<pb::DescriptorPool>(desc_db.get())),
type_resolver(pb::util::NewTypeResolverForDescriptorPool(
"type.googleapis.com", pool.get())) {}
} // namespace descriptor_db
} // namespace postgres_protobuf