generated from amazon-archives/__template_Apache-2.0
-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathMySQLDjangoReadWriteSplitting.py
More file actions
345 lines (269 loc) · 11.3 KB
/
MySQLDjangoReadWriteSplitting.py
File metadata and controls
345 lines (269 loc) · 11.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Django ORM Read/Write Splitting Example with AWS Advanced Python Wrapper
This example demonstrates how to use the AWS Advanced Python Wrapper with Django ORM
to leverage Aurora features like failover handling and read/write splitting with
internal connection pooling.
"""
from typing import TYPE_CHECKING, Any, Dict
import django
from django.conf import settings
from django.db import connection, models
from aws_advanced_python_wrapper import release_resources
from aws_advanced_python_wrapper.connection_provider import \
ConnectionProviderManager
from aws_advanced_python_wrapper.errors import (
FailoverFailedError, FailoverSuccessError,
TransactionResolutionUnknownError)
from aws_advanced_python_wrapper.sql_alchemy_connection_provider import \
SqlAlchemyPooledConnectionProvider
if TYPE_CHECKING:
from aws_advanced_python_wrapper.hostinfo import HostInfo
def configure_pool(host_info: "HostInfo", props: Dict[str, Any]) -> Dict[str, Any]:
    """
    Return the pool settings to use for a host's internal connection pool.

    Neither ``host_info`` nor ``props`` is consulted: every host receives the
    same settings, and a fresh dictionary is built on each call.
    """
    pool_settings: Dict[str, Any] = dict(pool_size=5)
    return pool_settings
def get_pool_key(host_info: "HostInfo", props: Dict[str, Any]) -> str:
    """
    Generate a unique key for connection pooling.

    Include the URL, user, and database in the connection pool key so that a new
    connection pool will be opened for each different instance-user-database combination.

    Args:
        host_info: Host description; only its ``url`` attribute is read.
        props: Connection properties; the ``"user"`` and ``"database"`` entries
            are read and default to an empty string when absent.

    Returns:
        A string that differs whenever any of the three components differs.
    """
    url = host_info.url
    user = props.get("user", "")
    db = props.get("database", "")
    # Plain concatenation lets neighbouring fields run together (user "ab" +
    # db "c" would equal user "a" + db "bc"). The repr of a tuple quotes and
    # separates each component, so distinct combinations never share a key.
    return repr((url, user, db))
# Connection parameters shared by both database aliases (example values).
DB_CONFIG = dict(
    ENGINE='aws_advanced_python_wrapper.django.backends.mysql_connector',
    NAME='test_db',
    USER='admin',
    PASSWORD='password',
    HOST='database.cluster-xyz.us-east-1.rds.amazonaws.com',
    PORT=3306,
)

# Settings handed to django.conf.settings.configure(): two aliases built on
# the same DB_CONFIG, plus the router that chooses between them.
DJANGO_SETTINGS = dict(
    DATABASES=dict(
        # Writer connection
        default=dict(
            DB_CONFIG,
            OPTIONS=dict(
                plugins='read_write_splitting,failover',
                connect_timeout=10,
                autocommit=True,
            ),
        ),
        # Reader connection
        read=dict(
            DB_CONFIG,
            OPTIONS=dict(
                plugins='read_write_splitting,failover',
                connect_timeout=10,
                autocommit=True,
                read_only=True,  # This connection will use reader instances
                reader_host_selector_strategy='least_connections',
            ),
        ),
    ),
    DATABASE_ROUTERS=['__main__.ReadWriteRouter'],
)
# Database Router for Read/Write Splitting
class ReadWriteRouter:
    """
    Database router that sends reads to one alias and writes to another.
    """

    # Aliases as declared under DATABASES in the Django settings.
    _READ_ALIAS = 'read'
    _WRITE_ALIAS = 'default'

    def db_for_read(self, model, **hints):
        """
        Route every read, whatever the model or hints, to the 'read' database.
        """
        return self._READ_ALIAS

    def db_for_write(self, model, **hints):
        """
        Route every write, whatever the model or hints, to the 'default' database.
        """
        return self._WRITE_ALIAS

    def allow_relation(self, obj1, obj2, **hints):
        """
        Permit a relation between any two objects; the arguments are not inspected.
        """
        return True

    def allow_migrate(self, db, app_label, model_name=None, **hints):
        """
        Permit migrations on every database alias; the arguments are not inspected.
        """
        return True
# Install the example settings unless something has already configured Django.
if not settings.configured:
    settings.configure(**DJANGO_SETTINGS)

# Initialise Django before any model class is declared below.
django.setup()
class BankAccount(models.Model):
    """Example model (holder name plus integer balance) used by the read/write demos."""

    # Declared types are the Python-side value types; the descriptors are Django fields.
    name: str = models.CharField(max_length=100)  # type: ignore[assignment]
    account_balance: int = models.IntegerField()  # type: ignore[assignment]

    class Meta:
        # Maps onto the table that create_table() creates with raw SQL.
        db_table = 'bank_accounts'
        app_label = 'myapp'

    def __str__(self) -> str:
        """Render as ``<name>: $<balance>``."""
        return "{}: ${}".format(self.name, self.account_balance)
def execute_query_with_failover_handling(query_func, max_retries=1):
    """
    Execute a Django ORM query with failover error handling.

    Args:
        query_func: A zero-argument callable that executes the desired query.
            It may be invoked more than once, so it must be safe to re-run.
        max_retries: How many times to re-run ``query_func`` after a
            ``FailoverSuccessError``. Defaults to 1 (a single retry); values
            of 0 or less disable retrying.

    Returns:
        The result of the query function

    Raises:
        FailoverSuccessError: If the query is still being interrupted by
            failover after ``max_retries`` retries.
        FailoverFailedError: If failover did not succeed.
        TransactionResolutionUnknownError: If the state of the interrupted
            transaction cannot be determined.
    """
    retries_left = max_retries
    # Every attempt, including retries, runs inside the same try block so that
    # a failure during a retry is reported by the same handlers as a failure
    # on the first attempt.
    while True:
        try:
            return query_func()
        except FailoverSuccessError:
            # Query execution failed and AWS Advanced Python Wrapper successfully failed over to an available instance.
            # https://github.com/aws/aws-advanced-python-wrapper/blob/main/docs/using-the-python-driver/using-plugins/UsingTheFailoverPlugin.md#failoversuccesserror
            # The connection has been re-established. Retry the query.
            if retries_left <= 0:
                # Retry budget exhausted: let the caller decide what to do.
                raise
            retries_left -= 1
            print("Failover successful! Retrying query...")
        except FailoverFailedError as e:
            # Failover failed. The application should open a new connection,
            # check the results of the failed transaction and re-run it if needed.
            # https://github.com/aws/aws-advanced-python-wrapper/blob/main/docs/using-the-python-driver/using-plugins/UsingTheFailoverPlugin.md#failoverfailederror
            print(f"Failover failed: {e}")
            print("Application should open a new connection and retry the transaction.")
            # Bare raise re-raises the active exception with its traceback untouched.
            raise
        except TransactionResolutionUnknownError as e:
            # The transaction state is unknown. The application should check the status
            # of the failed transaction and restart it if needed.
            # https://github.com/aws/aws-advanced-python-wrapper/blob/main/docs/using-the-python-driver/using-plugins/UsingTheFailoverPlugin.md#transactionresolutionunknownerror
            print(f"Transaction resolution unknown: {e}")
            print("Application should check transaction status and retry if needed.")
            raise
def create_table():
    """Create the ``bank_accounts`` table if it is missing, with failover handling."""
    ddl = (
        "\n"
        "CREATE TABLE IF NOT EXISTS bank_accounts (\n"
        "id INT AUTO_INCREMENT PRIMARY KEY,\n"
        "name VARCHAR(100),\n"
        "account_balance INT\n"
        ")\n"
    )

    def _run_ddl():
        # Raw cursor on the module-level Django connection; the ORM is not involved.
        with connection.cursor() as cur:
            cur.execute(ddl)
        print("Table created successfully")

    execute_query_with_failover_handling(_run_ddl)
def drop_table():
    """Drop the ``bank_accounts`` table if it exists, with failover handling."""
    statement = "DROP TABLE IF EXISTS bank_accounts"

    def _run_drop():
        # Raw cursor on the module-level Django connection; the ORM is not involved.
        with connection.cursor() as cur:
            cur.execute(statement)
        print("Table dropped successfully")

    execute_query_with_failover_handling(_run_drop)
def demonstrate_write_operations():
    """Insert two accounts and update one, each step wrapped in failover handling.

    No database alias is named here, so the configured router decides where
    the writes go ('default', per ``ReadWriteRouter.db_for_write``).
    """
    print("\n--- Write Operations (Writer Instance) ---")

    def _inserter(holder, opening_balance):
        """Build a re-runnable callable that inserts one account and reports it."""
        def _insert():
            created = BankAccount.objects.create(name=holder, account_balance=opening_balance)
            print(f"Created: {created}")
            return created
        return _insert

    # Create new records with failover handling
    first_account = execute_query_with_failover_handling(_inserter("Jane Doe", 1000))
    execute_query_with_failover_handling(_inserter("John Smith", 1500))

    # Update a record with failover handling
    def _change_balance():
        first_account.account_balance = 1200
        first_account.save()
        print(f"Updated: {first_account}")
        return first_account

    execute_query_with_failover_handling(_change_balance)
def demonstrate_read_operations():
    """List all accounts, then those with a balance of at least 1200, with failover handling.

    No database alias is named here, so the configured router decides where
    the reads go ('read', per ``ReadWriteRouter.db_for_read``).
    """
    print("\n--- Read Operations (Reader Instance) ---")

    # Query all records with failover handling
    def _list_everything():
        rows = list(BankAccount.objects.all())
        print(f"Total accounts: {len(rows)}")
        for row in rows:
            print(f" {row}")
        return rows

    # Filter records with failover handling
    def _list_high_balances():
        # The query is evaluated before the heading is printed.
        matches = list(BankAccount.objects.filter(account_balance__gte=1200))
        print("\nAccounts with balance >= $1200:")
        for row in matches:
            print(f" {row}")
        return matches

    execute_query_with_failover_handling(_list_everything)
    execute_query_with_failover_handling(_list_high_balances)
def demonstrate_explicit_database_selection():
    """Read all accounts from each alias by name via ``.using()``, with failover handling."""
    print("\n--- Explicit Database Selection ---")

    def _lister(alias, heading):
        """Build a re-runnable callable that prints ``heading`` and then every row from ``alias``."""
        def _list_rows():
            print(heading)
            rows = list(BankAccount.objects.using(alias).all())
            for row in rows:
                print(f" {row}")
            return rows
        return _list_rows

    # Force read from writer database with failover handling
    execute_query_with_failover_handling(
        _lister('default', "Reading from writer (default) database:")
    )

    # Force read from reader database with failover handling
    execute_query_with_failover_handling(
        _lister('read', "\nReading from reader database:")
    )
def demonstrate_raw_sql():
    """Run a parameterised raw SQL query through the model manager, with failover handling."""
    print("\n--- Raw SQL Queries ---")

    sql = 'SELECT * FROM bank_accounts WHERE account_balance > %s'
    params = [1000]

    def _run_raw_select():
        # list() forces evaluation here, before the heading is printed.
        rows = list(BankAccount.objects.raw(sql, params))
        print("Accounts with balance > $1000:")
        for row in rows:
            print(f" {row}")
        return rows

    execute_query_with_failover_handling(_run_raw_select)
if __name__ == "__main__":
    # Configure read/write splitting to use internal connection pools.
    ConnectionProviderManager.set_connection_provider(
        SqlAlchemyPooledConnectionProvider(configure_pool, get_pool_key)
    )

    # Demo stages in execution order: table creation, writes, reads,
    # explicit alias selection, raw SQL.
    demo_stages = (
        create_table,
        demonstrate_write_operations,
        demonstrate_read_operations,
        demonstrate_explicit_database_selection,
        demonstrate_raw_sql,
    )
    banner = "=" * 60

    try:
        print("Django ORM Read/Write Splitting Example with AWS Advanced Python Wrapper")
        print(banner)

        for stage in demo_stages:
            stage()

        # Cleanup
        print("\n--- Cleanup ---")
        drop_table()

        print("\n" + banner)
        print("Example completed successfully!")
    except Exception as e:
        # Report the failure and fall through to the cleanup below; the
        # exception is not re-raised.
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        # Clean up connection pools
        ConnectionProviderManager.release_resources()
        # Clean up AWS Advanced Python Wrapper resources
        release_resources()