-
Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathpgadapter.py
133 lines (117 loc) · 4.78 KB
/
pgadapter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# Copyright 2023 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utility for starting and stopping PGAdapter in an embedded container
Defines functions for starting and stopping PGAdapter in an embedded Docker
container. Requires that Docker is installed on the local system.
"""
import io
import json
import os
import socket
import time
import google.auth
import google.oauth2.credentials
import google.oauth2.service_account
from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs
# Connection details of the PGAdapter instance started by this module. Both
# are None until start_pgadapter() has assigned the host and port of the
# container that it started.
in_process_pgadapter_host = in_process_pgadapter_port = None
def start_pgadapter(project: str,
                    instance: str,
                    emulator: bool = False,
                    credentials: str = None) -> "tuple[DockerContainer, str]":
  """Starts PGAdapter in an embedded Docker container.

  Starts PGAdapter in an embedded Docker container and returns the container
  together with the TCP port number where PGAdapter is listening for incoming
  connections. You can use any standard PostgreSQL driver to connect to this
  port.

  As a side effect, the module-level variables in_process_pgadapter_host and
  in_process_pgadapter_port are set to "localhost" and the returned port.

  If anything fails after the container has been started, the container is
  stopped again before the error is propagated, so that a failed start does
  not leave a running container behind.

  Parameters
  ----------
  project : str
      The Google Cloud project that PGAdapter should connect to.
  instance : str
      The Cloud Spanner instance that PGAdapter should connect to.
  emulator: bool
      Whether PGAdapter should connect to the Cloud Spanner emulator or real
      Cloud Spanner.
  credentials : str or None
      The credentials file that PGAdapter should use. If None, then this
      function will try to load the default credentials from the environment.
      Ignored when emulator is True.

  Returns
  -------
  container, port : tuple[DockerContainer, str]
      The Docker container running PGAdapter and
      the port where PGAdapter is listening. Connect to this port on localhost
      with a standard PostgreSQL driver to connect to Cloud Spanner.

  Raises
  ------
  ValueError
      If no usable credentials could be determined (see
      _determine_credentials).
  RuntimeError
      If the credentials file could not be written to the container.
  TimeoutError
      If PGAdapter did not start listening on the exposed port in time.
  """
  if emulator:
    # Start PGAdapter with the Cloud Spanner emulator in a Docker container.
    # No credentials file is written in this mode.
    credentials_info = None
    container = (
        DockerContainer("gcr.io/cloud-spanner-pg-adapter/pgadapter-emulator")
        .with_exposed_ports(5432)
        .with_command("-p " + project + " -i " + instance))
  else:
    # Resolve the credentials before anything is started, so that missing or
    # unreadable credentials cannot leave a running container behind.
    credentials_info = _determine_credentials(credentials)
    # Start PGAdapter in a Docker container.
    container = (
        DockerContainer("gcr.io/cloud-spanner-pg-adapter/pgadapter")
        .with_exposed_ports(5432)
        .with_command(" -p " + project
                      + " -i " + instance
                      + " -x -c /credentials.json"))
  container.start()
  try:
    if not emulator:
      # Write the credentials to the file in the container that PGAdapter was
      # told to use through the '-c /credentials.json' argument.
      exit_code, output = container.exec(
          "sh -c 'cat <<EOT >> /credentials.json\n"
          + json.dumps(credentials_info, indent=0)
          + "\nEOT'")
      if exit_code != 0:
        raise RuntimeError(
            "failed to write credentials to the PGAdapter container "
            "(exit code {}): {!r}".format(exit_code, output))
    # Wait until PGAdapter has started and is listening on the exposed port.
    wait_for_logs(container, "PostgreSQL version:")
    port = container.get_exposed_port("5432")
    _wait_for_port(port=int(port))
  except BaseException:
    # Clean up on any failure (including KeyboardInterrupt) and re-raise the
    # original error unchanged.
    container.stop()
    raise
  global in_process_pgadapter_host
  global in_process_pgadapter_port
  in_process_pgadapter_port = port
  in_process_pgadapter_host = "localhost"
  return container, port
def _determine_credentials(credentials: str):
  """Loads the credentials that PGAdapter should use as a dict.

  The credentials are looked up in this order:

  1. The file named by the credentials argument, if it is not None.
  2. The file named by the GOOGLE_APPLICATION_CREDENTIALS environment
     variable, if it is set.
  3. The default credentials returned by google.auth.default(), but only if
     these are user credentials (google.oauth2.credentials.Credentials).

  Parameters
  ----------
  credentials : str or None
      Path of a JSON credentials file, or None to fall back to the
      environment.

  Returns
  -------
  info : dict
      The parsed JSON content of the credentials file, or the JSON
      representation of the default user credentials with "type" set to
      "authorized_user".

  Raises
  ------
  ValueError
      If no credentials file was given or configured and the default
      credentials are not user credentials.
  """
  explicit_file = credentials
  if explicit_file is None:
    explicit_file = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
  if explicit_file is not None:
    with open(explicit_file, "r", encoding="utf-8") as file_obj:
      return json.load(file_obj)

  default_credentials, _ = google.auth.default()
  # Use isinstance instead of comparing class names: other credential classes
  # in google-auth are also named 'Credentials' but cannot be serialized with
  # to_json().
  if not isinstance(default_credentials,
                    google.oauth2.credentials.Credentials):
    raise ValueError("GOOGLE_APPLICATION_CREDENTIALS has not been set "
                     "and no explicit credentials were supplied")
  info = json.loads(default_credentials.to_json())
  info["type"] = "authorized_user"
  return info
def _wait_for_port(port: int, poll_interval: float = 0.1, timeout: float = 5.0):
  """Blocks until a TCP connection to the given port on localhost succeeds.

  Repeatedly tries to open (and immediately close) a connection to
  localhost:port, sleeping poll_interval seconds between failed attempts.

  Parameters
  ----------
  port : int
      The TCP port on localhost to connect to.
  poll_interval : float
      The number of seconds to sleep between two connection attempts.
  timeout : float
      The maximum number of seconds to keep trying, and also the timeout of
      each individual connection attempt. A falsy value (None or 0) disables
      the deadline and makes this function wait indefinitely.

  Raises
  ------
  TimeoutError
      If no connection could be established within timeout seconds.
  """
  # Use the monotonic clock for the deadline, so that adjustments of the wall
  # clock can neither shorten nor extend the wait.
  deadline = time.monotonic() + timeout if timeout else None
  while True:
    try:
      # 'timeout or None' prevents a timeout of 0 from putting the socket in
      # non-blocking mode, which would make every attempt fail immediately.
      with socket.create_connection(("localhost", port),
                                    timeout=timeout or None):
        return
    except OSError as err:
      if deadline is not None and time.monotonic() > deadline:
        raise TimeoutError(
            "container did not listen on port {} in {} seconds"
            .format(port, timeout)) from err
    time.sleep(poll_interval)