Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.
Prev Previous commit
Next Next commit
Refactoring
  • Loading branch information
ankiaga committed Dec 7, 2023
commit 03f42a2888a6d3fd980d9e09de09729d8c31a2f5
69 changes: 32 additions & 37 deletions google/cloud/spanner_dbapi/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,34 +242,24 @@ def execute(self, sql, args=None):

try:
parsed_statement = parse_utils.classify_statement(sql)

if parsed_statement.statement_type == StatementType.CLIENT_SIDE:
self._result_set = client_side_statement_executor.execute(
self.connection, parsed_statement
)
if self._result_set is not None:
self._itr = PeekIterator(self._result_set)
return

if self.connection.read_only:
elif self.connection.read_only or (
not self.connection.client_transaction_started
and parsed_statement.statement_type == StatementType.QUERY
):
self._handle_DQL(sql, args or None)
return
if parsed_statement.statement_type == StatementType.DDL:
elif parsed_statement.statement_type == StatementType.DDL:
self._batch_DDLs(sql)
if not self.connection.client_transaction_started:
self.connection.run_prior_DDL_statements()
return

# For every other operation, we've got to ensure that
# any prior DDL statements were run.
self.connection.run_prior_DDL_statements()
if parsed_statement.statement_type == StatementType.UPDATE:
sql = parse_utils.ensure_where_clause(sql)
sql, args = sql_pyformat_args_to_spanner(sql, args or None)

if self.connection.client_transaction_started:
self._execute_statement_in_non_autocommit_mode(sql, args)
else:
self._execute_statement_in_autocommit_mode(sql, args, parsed_statement)
self._execute_in_rw_transaction(parsed_statement, sql, args)

except (AlreadyExists, FailedPrecondition, OutOfRange) as e:
raise IntegrityError(getattr(e, "details", e)) from e
Expand All @@ -281,28 +271,33 @@ def execute(self, sql, args=None):
if self.connection.client_transaction_started is False:
self.connection._spanner_transaction_started = False

def _execute_statement_in_non_autocommit_mode(self, sql, args):
statement = Statement(
sql,
args,
get_param_types(args or None),
ResultsChecksum(),
)
def _execute_in_rw_transaction(self, parsed_statement, sql, args):
# For every other operation, we've got to ensure that
# any prior DDL statements were run.
self.connection.run_prior_DDL_statements()
if parsed_statement.statement_type == StatementType.UPDATE:
sql = parse_utils.ensure_where_clause(sql)
sql, args = sql_pyformat_args_to_spanner(sql, args or None)

(
self._result_set,
self._checksum,
) = self.connection.run_statement(statement)
while True:
try:
self._itr = PeekIterator(self._result_set)
break
except Aborted:
self.connection.retry_transaction()
if self.connection.client_transaction_started:
statement = Statement(
sql,
args,
get_param_types(args or None),
ResultsChecksum(),
)

(
self._result_set,
self._checksum,
) = self.connection.run_statement(statement)

def _execute_statement_in_autocommit_mode(self, sql, args, parsed_statement):
if parsed_statement.statement_type == StatementType.QUERY:
self._handle_DQL(sql, args or None)
while True:
try:
self._itr = PeekIterator(self._result_set)
break
except Aborted:
self.connection.retry_transaction()
else:
self.connection.database.run_in_transaction(
self._do_execute_update_in_autocommit,
Expand Down