Context manager to acquire Postgres advisory locks by guewen · Pull Request #138 · OCA/connector · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Context manager to acquire Postgres advisory locks #138

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 2, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions connector/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@
#
##############################################################################

import hashlib
import logging
import struct

from contextlib import contextmanager
from openerp import models, fields

from .deprecate import log_deprecate, DeprecatedClass
from .exception import RetryableJobError

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -227,6 +231,33 @@ def get_binder_for_model(self, model=None):
log_deprecate('renamed to binder_for()')
return self.binder_for(model=model)

@contextmanager
def try_advisory_lock(self, lock, retry_seconds=1):
    """ Context manager trying to acquire a Postgres transactional
    advisory lock, making the current job retry later on failure.

    Beware: the lock is a *transactional* lock. Once acquired, it stays
    held until the end of the current transaction, not only until the
    end of the ``with`` block.

    When the lock cannot be acquired, a ``RetryableJobError`` is raised
    so the job is retried after ``retry_seconds`` seconds.

    See :func:`openerp.addons.connector.connector.pg_try_advisory_lock`
    for details.

    :param lock: The lock name. Can be anything convertible to a
                 string. It needs to represent what should not be
                 synchronized concurrently, so usually the string will
                 contain at least: the action, the backend type, the
                 backend id, the model name and the external id.
    :param retry_seconds: number of seconds after which a job should
                          be retried when the lock cannot be acquired.
    :raises RetryableJobError: when the lock cannot be acquired.
    """
    # Design note, from the review thread of PR #138:
    # - Author: retrospectively, a context manager may not be a good
    #   choice, as the lock is held until the end of the transaction
    #   anyway. Maybe the following would be clearer:
    #
    #       def try_advisory_lock(self, lock, retry_seconds=1):
    #           if not pg_try_advisory_lock(self.env, lock):
    #               raise RetryableJobError(
    #                   'Could not acquire advisory lock', ...)
    #
    #   (@lmignon @damdam-s @pedrobaeza opinions?)
    # - Member: it can be clearer, but the current form is not very
    #   confusing either; whichever you prefer.
    # - Contributor: the expected behavior of a context manager is to
    #   scope its effect to the code enclosed by the ``with`` statement,
    #   whereas here the lock is held until the end of the transaction,
    #   not the end of the enclosed lines.
    # - Member: but does it matter? Both end at more or less the same
    #   time, don't they?
    # - Author: exactly what I thought; a follow-up PR will address it.
    got_lock = pg_try_advisory_lock(self.env, lock)
    if not got_lock:
        raise RetryableJobError('Could not acquire advisory lock',
                                seconds=retry_seconds,
                                ignore_retry=True)
    yield


class ConnectorEnvironment(object):
""" Environment used by the different units for the synchronization.
Expand Down Expand Up @@ -466,3 +497,74 @@ def unwrap_model(self):
'Cannot unwrap model %s, because it has no %s fields'
% (self.model._name, self._openerp_field))
return column.comodel_name


def pg_try_advisory_lock(env, lock):
    """ Try to acquire a Postgres transactional advisory lock.

    The function tries to acquire a lock and returns a boolean
    indicating whether it could be obtained. An acquired lock is
    released at the end of the transaction.

    A typical use is to acquire a lock at the beginning of an importer
    to prevent 2 jobs from doing the same import at the same time.
    Since the record doesn't exist yet, we can't put a lock on a record,
    so we put an advisory lock.

    Example:

    - Job 1 imports Partner A
    - Job 2 imports Partner B
    - Partner A has a category X which happens not to exist yet
    - Partner B has a category X which happens not to exist yet
    - Job 1 imports category X as a dependency
    - Job 2 imports category X as a dependency

    Since both jobs are executed concurrently, they both create a record
    for category X, so we get duplicated records. With this lock:

    - Job 1 imports Partner A, it acquires a lock for this partner
    - Job 2 imports Partner B, it acquires a lock for this partner
    - Partner A has a category X which happens not to exist yet
    - Partner B has a category X which happens not to exist yet
    - Job 1 imports category X as a dependency, it acquires a lock for
      this category
    - Job 2 imports category X as a dependency, tries to acquire a lock
      but can't. Job 2 is retried later, and when it is retried, it
      sees the category X created by Job 1.

    The lock is held until the end of the transaction.

    Usage example:

    ::

        lock_name = 'import_record({}, {}, {}, {})'.format(
            self.backend_record._name,
            self.backend_record.id,
            self.model._name,
            self.external_id,
        )
        if pg_try_advisory_lock(self.env, lock_name):
            # do sync
        else:
            raise RetryableJobError('Could not acquire advisory lock',
                                    seconds=2,
                                    ignore_retry=True)

    :param env: the Odoo Environment
    :param lock: The lock name. Can be anything convertible to a
                 string. It needs to represent what should not be
                 synchronized concurrently, so usually the string will
                 contain at least: the action, the backend type, the
                 backend id, the model name and the external id.
                 Bytes are hashed as-is; anything else is formatted as
                 text and UTF-8 encoded before hashing.
    :return: True/False whether the lock was acquired.
    """
    if isinstance(lock, bytes):
        lock_bytes = lock
    else:
        # hashlib only accepts bytes: format as text, then encode
        # explicitly. Implicit ASCII encoding would fail on non-ASCII
        # lock names.
        lock_bytes = u'{}'.format(lock).encode('utf-8')
    hasher = hashlib.sha1()
    hasher.update(lock_bytes)
    # pg_try_advisory_xact_lock accepts an int8, so we build a hash
    # from the contextual information and keep only its first 8 bytes.
    # struct.unpack always returns a tuple: take its single element so
    # an integer (not a tuple) is bound as the query parameter.
    int_lock = struct.unpack('q', hasher.digest()[:8])[0]

    env.cr.execute('SELECT pg_try_advisory_xact_lock(%s);', (int_lock,))
    acquired = env.cr.fetchone()[0]
    return acquired
77 changes: 66 additions & 11 deletions connector/tests/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,31 @@
import mock
import unittest2

from openerp import api
from openerp.modules.registry import RegistryManager
from openerp.tests import common
from openerp.addons.connector import connector
from openerp.addons.connector.connector import (ConnectorUnit,
ConnectorEnvironment)
from openerp.addons.connector.exception import RetryableJobError
from openerp.addons.connector.connector import (
ConnectorEnvironment,
ConnectorUnit,
pg_try_advisory_lock,
)
from openerp.addons.connector.session import ConnectorSession


def mock_connector_unit(env):
    """ Build a ``ConnectorUnit`` on ``res.users`` backed by mocks.

    The backend record and the backend it returns are ``mock.Mock``
    objects; only the session is real, built from ``env``.

    :param env: the Odoo Environment providing cursor, uid and context
    :return: a ``ConnectorUnit`` whose connector environment targets
             the ``res.users`` model
    """
    backend = mock.Mock(name='Backend')
    backend_record = mock.Mock(name='BackendRecord')
    backend_record.get_backend.return_value = backend

    session = ConnectorSession(env.cr, env.uid, context=env.context)
    unit_env = connector.ConnectorEnvironment(
        backend_record,
        session,
        'res.users',
    )
    return ConnectorUnit(unit_env)


class ConnectorHelpers(unittest2.TestCase):

def test_openerp_module_name(self):
Expand Down Expand Up @@ -122,15 +140,7 @@ def test_instance(self):
class ModelUnit(ConnectorUnit):
_model_name = 'res.users'

session = ConnectorSession(self.env.cr, self.env.uid,
context=self.env.context)
backend_record = mock.Mock(name='BackendRecord')
backend = mock.Mock(name='Backend')
backend_record.get_backend.return_value = backend
connector_env = connector.ConnectorEnvironment(backend_record,
session,
'res.users')
unit = ConnectorUnit(connector_env)
unit = mock_connector_unit(self.env)
self.assertEqual(unit.model, self.env['res.users'])
self.assertEqual(unit.env, self.env)
self.assertEqual(unit.localcontext, self.env.context)
Expand Down Expand Up @@ -177,3 +187,48 @@ def __init__(self, backend_record, session, model_name, api=None):

self.assertEqual(type(new_env), MyConnectorEnvironment)
self.assertEqual(new_env.api, api)


class TestAdvisoryLock(common.TransactionCase):
    """ Advisory locks cannot be shared by 2 concurrent transactions. """

    # Lock name shared by the tests, shaped like the names built by
    # importers (action, backend, backend id, model, external id).
    LOCK_NAME = 'import_record({}, {}, {}, {})'.format(
        'backend.name',
        1,
        'res.partner',
        '999999',
    )

    def setUp(self):
        super(TestAdvisoryLock, self).setUp()
        # Open a 2nd cursor, i.e. a 2nd transaction on the same
        # database, to simulate a concurrent job.
        self.registry2 = RegistryManager.get(common.get_db_name())
        self.cr2 = self.registry2.cursor()
        self.env2 = api.Environment(self.cr2, self.env.uid, {})
        self.addCleanup(self._reset_cr2)

    def _reset_cr2(self):
        """ Roll back and close the 2nd cursor, reset the environments. """
        self.env2.reset()
        self.cr2.rollback()
        self.cr2.close()

    def test_concurrent_lock(self):
        """ 2 concurrent transactions cannot acquire the same lock """
        acquired = pg_try_advisory_lock(self.env, self.LOCK_NAME)
        self.assertTrue(acquired)
        inner_acquired = pg_try_advisory_lock(self.env2, self.LOCK_NAME)
        self.assertFalse(inner_acquired)

    def test_concurrent_import_lock(self):
        """ A 2nd concurrent transaction must retry """
        connector_unit = mock_connector_unit(self.env)
        with connector_unit.try_advisory_lock(self.LOCK_NAME):
            connector_unit2 = mock_connector_unit(self.env2)
            with self.assertRaises(RetryableJobError) as cm:
                with connector_unit2.try_advisory_lock(self.LOCK_NAME,
                                                       retry_seconds=3):
                    pass
            self.assertEqual(cm.exception.seconds, 3)
0