bsddb module updated to version 4.7.2devel9.

This patch publishes the work done until now
for Python 3.0 compatibility. Still a lot
to be done.

When possible, we use 3.0 features in Python 2.6,
easing development and testing, and exposing internal
changes to a wider audience, for better test coverage.

Some mode details:
http://www.jcea.es/programacion/pybsddb.htm#bsddb3-4.7.2
This commit is contained in:
Jesus Cea 2008-07-23 11:38:42 +00:00
parent 30e208d525
commit c5a11fabdb
16 changed files with 1395 additions and 752 deletions

View File

@ -33,18 +33,25 @@
#----------------------------------------------------------------------
"""Support for Berkeley DB 4.x with a simple interface.
"""Support for Berkeley DB 4.0 through 4.7 with a simple interface.
For the full featured object oriented interface use the bsddb.db module
instead. It mirrors the Oracle Berkeley DB C API.
"""
import sys
absolute_import = (sys.version_info[0] >= 3)
try:
if __name__ == 'bsddb3':
# import _pybsddb binary as it should be the more recent version from
# a standalone pybsddb addon package than the version included with
# python as bsddb._bsddb.
import _pybsddb
if absolute_import :
# Because this syntaxis is not valid before Python 2.5
exec("from . import _pybsddb")
else :
import _pybsddb
_bsddb = _pybsddb
from bsddb3.dbutils import DeadlockWrap as _DeadlockWrap
else:
@ -66,9 +73,16 @@
import sys, os
import UserDict
from weakref import ref
class _iter_mixin(UserDict.DictMixin):
if sys.version_info[0:2] <= (2, 5) :
import UserDict
MutableMapping = UserDict.DictMixin
else :
import collections
MutableMapping = collections.MutableMapping
class _iter_mixin(MutableMapping):
def _make_iter_cursor(self):
cur = _DeadlockWrap(self.db.cursor)
key = id(cur)
@ -115,8 +129,12 @@ def __iter__(self):
except _bsddb.DBCursorClosedError:
# the database was modified during iteration. abort.
pass
finally:
# When Python 2.3 not supported in bsddb3, we can change this to "finally"
except :
self._in_iter -= 1
raise
self._in_iter -= 1
def iteritems(self):
if not self.db:
@ -154,8 +172,12 @@ def iteritems(self):
except _bsddb.DBCursorClosedError:
# the database was modified during iteration. abort.
pass
finally:
# When Python 2.3 not supported in bsddb3, we can change this to "finally"
except :
self._in_iter -= 1
raise
self._in_iter -= 1
class _DBWithCursor(_iter_mixin):
@ -228,6 +250,12 @@ def __len__(self):
self._checkOpen()
return _DeadlockWrap(lambda: len(self.db)) # len(self.db)
if sys.version_info[0:2] >= (2, 6) :
def __repr__(self) :
if self.isOpen() :
return repr(dict(_DeadlockWrap(self.db.items)))
return repr(dict())
def __getitem__(self, key):
self._checkOpen()
return _DeadlockWrap(lambda: self.db[key]) # self.db[key]
@ -407,8 +435,6 @@ def _checkflag(flag, file):
try:
import thread
del thread
if db.version() < (3, 3, 0):
db.DB_THREAD = 0
except ImportError:
db.DB_THREAD = 0

View File

@ -37,15 +37,24 @@
# case we ever want to augment the stuff in _db in any way. For now
# it just simply imports everything from _db.
if __name__.startswith('bsddb3.'):
# import _pybsddb binary as it should be the more recent version from
# a standalone pybsddb addon package than the version included with
# python as bsddb._bsddb.
from _pybsddb import *
from _pybsddb import __version__
else:
from _bsddb import *
from _bsddb import __version__
import sys
absolute_import = (sys.version_info[0] >= 3)
if version() < (3, 2, 0):
raise ImportError, "correct Berkeley DB symbols not found. Perhaps python was statically linked with an older version?"
if not absolute_import :
if __name__.startswith('bsddb3.') :
# import _pybsddb binary as it should be the more recent version from
# a standalone pybsddb addon package than the version included with
# python as bsddb._bsddb.
from _pybsddb import *
from _pybsddb import __version__
else:
from _bsddb import *
from _bsddb import __version__
else :
# Because this syntaxis is not valid before Python 2.5
if __name__.startswith('bsddb3.') :
exec("from ._pybsddb import *")
exec("from ._pybsddb import __version__")
else :
exec("from ._bsddb import *")
exec("from ._bsddb import __version__")

View File

@ -21,13 +21,24 @@
# added to _bsddb.c.
#
import db
import sys
absolute_import = (sys.version_info[0] >= 3)
if absolute_import :
# Because this syntaxis is not valid before Python 2.5
exec("from . import db")
else :
import db
try:
from UserDict import DictMixin
except ImportError:
# DictMixin is new in Python 2.3
class DictMixin: pass
if sys.version_info[0:2] <= (2, 5) :
try:
from UserDict import DictMixin
except ImportError:
# DictMixin is new in Python 2.3
class DictMixin: pass
MutableMapping = DictMixin
else :
import collections
MutableMapping = collections.MutableMapping
class DBEnv:
def __init__(self, *args, **kwargs):
@ -96,9 +107,8 @@ def log_archive(self, *args, **kwargs):
def set_get_returns_none(self, *args, **kwargs):
return apply(self._cobj.set_get_returns_none, args, kwargs)
if db.version() >= (4,0):
def log_stat(self, *args, **kwargs):
return apply(self._cobj.log_stat, args, kwargs)
def log_stat(self, *args, **kwargs):
return apply(self._cobj.log_stat, args, kwargs)
if db.version() >= (4,1):
def dbremove(self, *args, **kwargs):
@ -113,7 +123,7 @@ def lsn_reset(self, *args, **kwargs):
return apply(self._cobj.lsn_reset, args, kwargs)
class DB(DictMixin):
class DB(MutableMapping):
def __init__(self, dbenv, *args, **kwargs):
# give it the proper DBEnv C object that its expecting
self._cobj = apply(db.DB, (dbenv._cobj,) + args, kwargs)
@ -128,6 +138,10 @@ def __setitem__(self, key, value):
def __delitem__(self, arg):
del self._cobj[arg]
if sys.version_info[0:2] >= (2, 6) :
def __iter__(self) :
return self._cobj.__iter__()
def append(self, *args, **kwargs):
return apply(self._cobj.append, args, kwargs)
def associate(self, *args, **kwargs):

View File

@ -30,11 +30,17 @@
#------------------------------------------------------------------------
import cPickle
import db
import sys
#At version 2.3 cPickle switched to using protocol instead of bin and
#DictMixin was added
import sys
absolute_import = (sys.version_info[0] >= 3)
if absolute_import :
# Because this syntaxis is not valid before Python 2.5
exec("from . import db")
else :
import db
#At version 2.3 cPickle switched to using protocol instead of bin
if sys.version_info[:3] >= (2, 3, 0):
HIGHEST_PROTOCOL = cPickle.HIGHEST_PROTOCOL
# In python 2.3.*, "cPickle.dumps" accepts no
@ -47,13 +53,22 @@ def _dumps(object, protocol):
def _dumps(object, protocol):
return cPickle.dumps(object, protocol=protocol)
from UserDict import DictMixin
else:
HIGHEST_PROTOCOL = None
def _dumps(object, protocol):
return cPickle.dumps(object, bin=protocol)
class DictMixin: pass
if sys.version_info[0:2] <= (2, 5) :
try:
from UserDict import DictMixin
except ImportError:
# DictMixin is new in Python 2.3
class DictMixin: pass
MutableMapping = DictMixin
else :
import collections
MutableMapping = collections.MutableMapping
#------------------------------------------------------------------------
@ -96,7 +111,7 @@ def open(filename, flags=db.DB_CREATE, mode=0660, filetype=db.DB_HASH,
class DBShelveError(db.DBError): pass
class DBShelf(DictMixin):
class DBShelf(MutableMapping):
"""A shelf to hold pickled objects, built upon a bsddb DB object. It
automatically pickles/unpickles data objects going to/from the DB.
"""
@ -147,6 +162,10 @@ def keys(self, txn=None):
else:
return self.db.keys()
if sys.version_info[0:2] >= (2, 6) :
def __iter__(self) :
return self.db.__iter__()
def open(self, *args, **kwargs):
self.db.open(*args, **kwargs)

View File

@ -22,7 +22,6 @@
import copy
import random
import struct
from types import ListType, StringType
import cPickle as pickle
try:
@ -229,7 +228,7 @@ def CreateTable(self, table, columns):
raises TableDBError if it already exists or for other DB errors.
"""
assert isinstance(columns, ListType)
assert isinstance(columns, list)
txn = None
try:
# checking sanity of the table and column names here on
@ -270,7 +269,7 @@ def ListTableColumns(self, table):
"""Return a list of columns in the given table.
[] if the table doesn't exist.
"""
assert isinstance(table, StringType)
assert isinstance(table, str)
if contains_metastrings(table):
raise ValueError, "bad table name: contains reserved metastrings"
@ -300,7 +299,7 @@ def CreateOrExtendTable(self, table, columns):
additional columns present in the given list as well as
all of its current columns.
"""
assert isinstance(columns, ListType)
assert isinstance(columns, list)
try:
self.CreateTable(table, columns)
except TableAlreadyExists:

View File

@ -26,7 +26,13 @@
#
from time import sleep as _sleep
import db
import sys
absolute_import = (sys.version_info[0] >= 3)
if absolute_import :
# Because this syntaxis is not valid before Python 2.5
exec("from . import db")
else :
import db
# always sleep at least N seconds between retrys
_deadlock_MinSleepTime = 1.0/128

View File

@ -7,15 +7,24 @@
try:
# For Pythons w/distutils pybsddb
from bsddb3 import db
import bsddb3 as bsddb
except ImportError:
# For Python 2.3
from bsddb import db
import bsddb
try:
from bsddb3 import test_support
except ImportError:
from test import test_support
try:
from threading import Thread, currentThread
del Thread, currentThread
have_threads = True
except ImportError:
have_threads = False
verbose = 0
if 'verbose' in sys.argv:
verbose = 1
@ -33,6 +42,8 @@ def print_versions():
print 'bsddb.db.version(): %s' % (db.version(), )
print 'bsddb.db.__version__: %s' % db.__version__
print 'bsddb.db.cvsid: %s' % db.cvsid
print 'py module: %s' % bsddb.__file__
print 'extension module: %s' % bsddb._bsddb.__file__
print 'python version: %s' % sys.version
print 'My pid: %s' % os.getpid()
print '-=' * 38
@ -81,11 +92,11 @@ def set_test_path_prefix(path) :
def remove_test_path_directory() :
test_support.rmtree(get_new_path.prefix)
try :
if have_threads :
import threading
get_new_path.mutex=threading.Lock()
del threading
except ImportError:
else :
class Lock(object) :
def acquire(self) :
pass
@ -104,8 +115,12 @@ def testPrintVersions(self):
# This little hack is for when this module is run as main and all the
# other modules import it so they will still be able to get the right
# verbose setting. It's confusing but it works.
import test_all
test_all.verbose = verbose
if sys.version_info[0] < 3 :
import test_all
test_all.verbose = verbose
else :
import sys
print >>sys.stderr, "Work to do!"
def suite(module_prefix='', timing_check=None):

View File

@ -6,14 +6,8 @@
import time
from pprint import pprint
try:
from threading import Thread, currentThread
have_threads = 1
except ImportError:
have_threads = 0
import unittest
from test_all import verbose, get_new_environment_path
from test_all import verbose, have_threads, get_new_environment_path
try:
# For Pythons w/distutils pybsddb
@ -435,24 +429,23 @@ class ThreadedAssociateRecnoTestCase(ShelveAssociateTestCase):
def test_suite():
suite = unittest.TestSuite()
if db.version() >= (3, 3, 11):
suite.addTest(unittest.makeSuite(AssociateErrorTestCase))
suite.addTest(unittest.makeSuite(AssociateErrorTestCase))
suite.addTest(unittest.makeSuite(AssociateHashTestCase))
suite.addTest(unittest.makeSuite(AssociateBTreeTestCase))
suite.addTest(unittest.makeSuite(AssociateRecnoTestCase))
suite.addTest(unittest.makeSuite(AssociateHashTestCase))
suite.addTest(unittest.makeSuite(AssociateBTreeTestCase))
suite.addTest(unittest.makeSuite(AssociateRecnoTestCase))
if db.version() >= (4, 1):
suite.addTest(unittest.makeSuite(AssociateBTreeTxnTestCase))
if db.version() >= (4, 1):
suite.addTest(unittest.makeSuite(AssociateBTreeTxnTestCase))
suite.addTest(unittest.makeSuite(ShelveAssociateHashTestCase))
suite.addTest(unittest.makeSuite(ShelveAssociateBTreeTestCase))
suite.addTest(unittest.makeSuite(ShelveAssociateRecnoTestCase))
suite.addTest(unittest.makeSuite(ShelveAssociateHashTestCase))
suite.addTest(unittest.makeSuite(ShelveAssociateBTreeTestCase))
suite.addTest(unittest.makeSuite(ShelveAssociateRecnoTestCase))
if have_threads:
suite.addTest(unittest.makeSuite(ThreadedAssociateHashTestCase))
suite.addTest(unittest.makeSuite(ThreadedAssociateBTreeTestCase))
suite.addTest(unittest.makeSuite(ThreadedAssociateRecnoTestCase))
if have_threads:
suite.addTest(unittest.makeSuite(ThreadedAssociateHashTestCase))
suite.addTest(unittest.makeSuite(ThreadedAssociateBTreeTestCase))
suite.addTest(unittest.makeSuite(ThreadedAssociateRecnoTestCase))
return suite

View File

@ -101,14 +101,14 @@ def tearDown(self):
def populateDB(self, _txn=None):
d = self.d
for x in range(self._numKeys/2):
for x in range(self._numKeys//2):
key = '%04d' % (self._numKeys - x) # insert keys in reverse order
data = self.makeData(key)
d.put(key, data, _txn)
d.put('empty value', '', _txn)
for x in range(self._numKeys/2-1):
for x in range(self._numKeys//2-1):
key = '%04d' % x # and now some in forward order
data = self.makeData(key)
d.put(key, data, _txn)
@ -536,10 +536,6 @@ def test05_GetSize(self):
#----------------------------------------
def test06_Truncate(self):
if db.version() < (3,3):
# truncate is a feature of Berkeley DB 3.3 and above
return
d = self.d
if verbose:
print '\n', '-=' * 30
@ -681,12 +677,11 @@ def test06_Transactions(self):
except db.DBIncompleteError:
pass
if db.version() >= (4,0):
statDict = self.env.log_stat(0);
self.assert_(statDict.has_key('magic'))
self.assert_(statDict.has_key('version'))
self.assert_(statDict.has_key('cur_file'))
self.assert_(statDict.has_key('region_nowait'))
statDict = self.env.log_stat(0);
self.assert_(statDict.has_key('magic'))
self.assert_(statDict.has_key('version'))
self.assert_(statDict.has_key('cur_file'))
self.assert_(statDict.has_key('region_nowait'))
# must have at least one log file present:
logs = self.env.log_archive(db.DB_ARCH_ABS | db.DB_ARCH_LOG)
@ -703,10 +698,6 @@ def test06_Transactions(self):
#----------------------------------------
def test07_TxnTruncate(self):
if db.version() < (3,3):
# truncate is a feature of Berkeley DB 3.3 and above
return
d = self.d
if verbose:
print '\n', '-=' * 30
@ -956,6 +947,55 @@ class HashMultiDBTestCase(BasicMultiDBTestCase):
envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
class PrivateObject(unittest.TestCase) :
import sys
if sys.version_info[:3] < (2, 4, 0):
def assertTrue(self, expr, msg=None):
self.failUnless(expr,msg=msg)
def tearDown(self) :
del self.obj
def test01_DefaultIsNone(self) :
self.assertEqual(self.obj.get_private(), None)
def test02_assignment(self) :
a = "example of private object"
self.obj.set_private(a)
b = self.obj.get_private()
self.assertTrue(a is b) # Object identity
def test03_leak_assignment(self) :
import sys
a = "example of private object"
refcount = sys.getrefcount(a)
self.obj.set_private(a)
self.assertEqual(refcount+1, sys.getrefcount(a))
self.obj.set_private(None)
self.assertEqual(refcount, sys.getrefcount(a))
def test04_leak_GC(self) :
import sys
a = "example of private object"
refcount = sys.getrefcount(a)
self.obj.set_private(a)
self.obj = None
self.assertEqual(refcount, sys.getrefcount(a))
class DBEnvPrivateObject(PrivateObject) :
def setUp(self) :
self.obj = db.DBEnv()
class DBPrivateObject(PrivateObject) :
def setUp(self) :
self.obj = db.DB()
class CrashAndBurn(unittest.TestCase) :
def test01_OpenCrash(self) :
# See http://bugs.python.org/issue3307
self.assertRaises(db.DBInvalidArgError, db.DB, None, 65535)
#----------------------------------------------------------------------
#----------------------------------------------------------------------
@ -979,6 +1019,9 @@ def test_suite():
suite.addTest(unittest.makeSuite(HashDUPWithThreadTestCase))
suite.addTest(unittest.makeSuite(BTreeMultiDBTestCase))
suite.addTest(unittest.makeSuite(HashMultiDBTestCase))
suite.addTest(unittest.makeSuite(DBEnvPrivateObject))
suite.addTest(unittest.makeSuite(DBPrivateObject))
#suite.addTest(unittest.makeSuite(CrashAndBurn))
return suite

View File

@ -240,9 +240,8 @@ def test_suite ():
res = unittest.TestSuite ()
res.addTest (unittest.makeSuite (ComparatorTests))
if db.version () >= (3, 3, 11):
res.addTest (unittest.makeSuite (BtreeExceptionsTestCase))
res.addTest (unittest.makeSuite (BtreeKeyCompareTestCase))
res.addTest (unittest.makeSuite (BtreeExceptionsTestCase))
res.addTest (unittest.makeSuite (BtreeKeyCompareTestCase))
return res
if __name__ == '__main__':

View File

@ -4,15 +4,11 @@
import time
try:
from threading import Thread, currentThread
have_threads = 1
except ImportError:
have_threads = 0
import unittest
from test_all import verbose, get_new_environment_path, get_new_database_path
from test_all import verbose, have_threads, get_new_environment_path, get_new_database_path
if have_threads :
from threading import Thread, currentThread
try:
# For Pythons w/distutils pybsddb
@ -62,8 +58,7 @@ def test01_simple(self):
self.env.lock_put(lock)
if verbose:
print "Released lock: %s" % lock
if db.version() >= (4,0):
self.env.lock_id_free(anID)
self.env.lock_id_free(anID)
def test02_threaded(self):
@ -132,9 +127,8 @@ def deadlock_detection() :
self.env.lock_put(lock)
t.join()
if db.version() >= (4,0):
self.env.lock_id_free(anID)
self.env.lock_id_free(anID2)
self.env.lock_id_free(anID)
self.env.lock_id_free(anID2)
if db.version() >= (4,6):
self.assertTrue(deadlock_detection.count>0)
@ -159,8 +153,7 @@ def theThread(self, lockType):
if verbose:
print "%s: Released %s lock: %s" % (name, lt, lock)
if db.version() >= (4,0):
self.env.lock_id_free(anID)
self.env.lock_id_free(anID)
#----------------------------------------------------------------------

View File

@ -47,6 +47,18 @@ def test03_repr_closed_db(self):
rp = repr(db)
self.assertEquals(rp, "{}")
def test04_repr_db(self) :
db = hashopen(self.filename)
d = {}
for i in xrange(100) :
db[repr(i)] = repr(100*i)
d[repr(i)] = repr(100*i)
db.close()
db = hashopen(self.filename)
rp = repr(db)
self.assertEquals(rp, repr(d))
db.close()
# http://sourceforge.net/tracker/index.php?func=detail&aid=1708868&group_id=13900&atid=313900
#
# See the bug report for details.
@ -54,7 +66,7 @@ def test03_repr_closed_db(self):
# The problem was that make_key_dbt() was not allocating a copy of
# string keys but FREE_DBT() was always being told to free it when the
# database was opened with DB_THREAD.
def test04_double_free_make_key_dbt(self):
def test05_double_free_make_key_dbt(self):
try:
db1 = db.DB()
db1.open(self.filename, None, db.DB_BTREE,
@ -67,7 +79,7 @@ def test04_double_free_make_key_dbt(self):
db1.close()
os.unlink(self.filename)
def test05_key_with_null_bytes(self):
def test06_key_with_null_bytes(self):
try:
db1 = db.DB()
db1.open(self.filename, None, db.DB_HASH, db.DB_CREATE)
@ -86,7 +98,7 @@ def test05_key_with_null_bytes(self):
db1.close()
os.unlink(self.filename)
def test_DB_set_flags_persists(self):
def test07_DB_set_flags_persists(self):
if db.version() < (4,2):
# The get_flags API required for this to work is only available
# in Berkeley DB >= 4.2

View File

@ -12,7 +12,7 @@
# For Python 2.3
from bsddb import db
from test_all import get_new_environment_path, get_new_database_path
from test_all import have_threads, get_new_environment_path, get_new_database_path
try:
from bsddb3 import test_support
@ -58,6 +58,25 @@ def client_startupdone(a,b,c) :
self.dbenvMaster.set_event_notify(confirmed_master)
self.dbenvClient.set_event_notify(client_startupdone)
#self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
#self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
#self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
#self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
self.dbMaster = self.dbClient = None
def tearDown(self):
if self.dbClient :
self.dbClient.close()
if self.dbMaster :
self.dbMaster.close()
self.dbenvClient.close()
self.dbenvMaster.close()
test_support.rmtree(self.homeDirClient)
test_support.rmtree(self.homeDirMaster)
def test01_basic_replication(self) :
master_port = test_support.find_unused_port()
self.dbenvMaster.repmgr_set_local_site("127.0.0.1", master_port)
client_port = test_support.find_unused_port()
@ -69,6 +88,27 @@ def client_startupdone(a,b,c) :
self.dbenvMaster.rep_set_priority(10)
self.dbenvClient.rep_set_priority(0)
self.dbenvMaster.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100123)
self.dbenvClient.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100321)
self.assertEquals(self.dbenvMaster.rep_get_timeout(
db.DB_REP_CONNECTION_RETRY), 100123)
self.assertEquals(self.dbenvClient.rep_get_timeout(
db.DB_REP_CONNECTION_RETRY), 100321)
self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100234)
self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100432)
self.assertEquals(self.dbenvMaster.rep_get_timeout(
db.DB_REP_ELECTION_TIMEOUT), 100234)
self.assertEquals(self.dbenvClient.rep_get_timeout(
db.DB_REP_ELECTION_TIMEOUT), 100432)
self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100345)
self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100543)
self.assertEquals(self.dbenvMaster.rep_get_timeout(
db.DB_REP_ELECTION_RETRY), 100345)
self.assertEquals(self.dbenvClient.rep_get_timeout(
db.DB_REP_ELECTION_RETRY), 100543)
self.dbenvMaster.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
self.dbenvClient.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
@ -84,23 +124,14 @@ def client_startupdone(a,b,c) :
self.assertEquals(self.dbenvClient.repmgr_get_ack_policy(),
db.DB_REPMGR_ACKS_ALL)
#self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
#self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
#self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
#self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
self.dbMaster = self.dbClient = None
# The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
# is not generated if the master has no new transactions.
# This is solved in BDB 4.6 (#15542).
timeout = time.time()+10
import time
timeout = time.time()+2
while (time.time()<timeout) and not (self.confirmed_master and self.client_startupdone) :
time.sleep(0.02)
if db.version() >= (4,6) :
self.assertTrue(time.time()<timeout)
else :
self.assertTrue(time.time()>=timeout)
self.assertTrue(time.time()<timeout)
d = self.dbenvMaster.repmgr_site_list()
self.assertEquals(len(d), 1)
@ -120,17 +151,6 @@ def client_startupdone(a,b,c) :
d = self.dbenvMaster.repmgr_stat(flags=db.DB_STAT_CLEAR);
self.assertTrue("msgs_queued" in d)
def tearDown(self):
if self.dbClient :
self.dbClient.close()
if self.dbMaster :
self.dbMaster.close()
self.dbenvClient.close()
self.dbenvMaster.close()
test_support.rmtree(self.homeDirClient)
test_support.rmtree(self.homeDirMaster)
def test01_basic_replication(self) :
self.dbMaster=db.DB(self.dbenvMaster)
txn=self.dbenvMaster.txn_begin()
self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn)
@ -179,11 +199,221 @@ def test01_basic_replication(self) :
txn.commit()
self.assertEquals(None, v)
class DBBaseReplication(DBReplicationManager):
def setUp(self) :
DBReplicationManager.setUp(self)
def confirmed_master(a,b,c) :
if (b == db.DB_EVENT_REP_MASTER) or (b == db.DB_EVENT_REP_ELECTED) :
self.confirmed_master = True
def client_startupdone(a,b,c) :
if b == db.DB_EVENT_REP_STARTUPDONE :
self.client_startupdone = True
self.dbenvMaster.set_event_notify(confirmed_master)
self.dbenvClient.set_event_notify(client_startupdone)
import Queue
self.m2c = Queue.Queue()
self.c2m = Queue.Queue()
# There are only two nodes, so we don't need to
# do any routing decision
def m2c(dbenv, control, rec, lsnp, envid, flags) :
self.m2c.put((control, rec))
def c2m(dbenv, control, rec, lsnp, envid, flags) :
self.c2m.put((control, rec))
self.dbenvMaster.rep_set_transport(13,m2c)
self.dbenvMaster.rep_set_priority(10)
self.dbenvClient.rep_set_transport(3,c2m)
self.dbenvClient.rep_set_priority(0)
self.assertEquals(self.dbenvMaster.rep_get_priority(),10)
self.assertEquals(self.dbenvClient.rep_get_priority(),0)
#self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
#self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
#self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
#self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
def thread_master() :
return self.thread_do(self.dbenvMaster, self.c2m, 3,
self.master_doing_election, True)
def thread_client() :
return self.thread_do(self.dbenvClient, self.m2c, 13,
self.client_doing_election, False)
from threading import Thread
t_m=Thread(target=thread_master)
t_m.setDaemon(True)
t_c=Thread(target=thread_client)
t_c.setDaemon(True)
self.t_m = t_m
self.t_c = t_c
self.dbMaster = self.dbClient = None
self.master_doing_election=[False]
self.client_doing_election=[False]
def tearDown(self):
if self.dbClient :
self.dbClient.close()
if self.dbMaster :
self.dbMaster.close()
self.m2c.put(None)
self.c2m.put(None)
self.t_m.join()
self.t_c.join()
self.dbenvClient.close()
self.dbenvMaster.close()
test_support.rmtree(self.homeDirClient)
test_support.rmtree(self.homeDirMaster)
def basic_rep_threading(self) :
self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)
def thread_do(env, q, envid, election_status, must_be_master) :
while True :
v=q.get()
if v == None : return
env.rep_process_message(v[0], v[1], envid)
self.thread_do = thread_do
self.t_m.start()
self.t_c.start()
def test01_basic_replication(self) :
self.basic_rep_threading()
# The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
# is not generated if the master has no new transactions.
# This is solved in BDB 4.6 (#15542).
import time
timeout = time.time()+2
while (time.time()<timeout) and not (self.confirmed_master and
self.client_startupdone) :
time.sleep(0.02)
self.assertTrue(time.time()<timeout)
self.dbMaster=db.DB(self.dbenvMaster)
txn=self.dbenvMaster.txn_begin()
self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn)
txn.commit()
import time,os.path
timeout=time.time()+10
while (time.time()<timeout) and \
not (os.path.exists(os.path.join(self.homeDirClient,"test"))) :
time.sleep(0.01)
self.dbClient=db.DB(self.dbenvClient)
while True :
txn=self.dbenvClient.txn_begin()
try :
self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
mode=0666, txn=txn)
except db.DBRepHandleDeadError :
txn.abort()
self.dbClient.close()
self.dbClient=db.DB(self.dbenvClient)
continue
txn.commit()
break
txn=self.dbenvMaster.txn_begin()
self.dbMaster.put("ABC", "123", txn=txn)
txn.commit()
import time
timeout=time.time()+1
v=None
while (time.time()<timeout) and (v==None) :
txn=self.dbenvClient.txn_begin()
v=self.dbClient.get("ABC", txn=txn)
txn.commit()
self.assertEquals("123", v)
txn=self.dbenvMaster.txn_begin()
self.dbMaster.delete("ABC", txn=txn)
txn.commit()
timeout=time.time()+1
while (time.time()<timeout) and (v!=None) :
txn=self.dbenvClient.txn_begin()
v=self.dbClient.get("ABC", txn=txn)
txn.commit()
self.assertEquals(None, v)
if db.version() >= (4,7) :
def test02_test_request(self) :
self.basic_rep_threading()
(minimum, maximum) = self.dbenvClient.rep_get_request()
self.dbenvClient.rep_set_request(minimum-1, maximum+1)
self.assertEqual(self.dbenvClient.rep_get_request(),
(minimum-1, maximum+1))
if db.version() >= (4,6) :
def test03_master_election(self) :
# Get ready to hold an election
#self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
self.dbenvMaster.rep_start(flags=db.DB_REP_CLIENT)
self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)
def thread_do(env, q, envid, election_status, must_be_master) :
while True :
v=q.get()
if v == None : return
r = env.rep_process_message(v[0],v[1],envid)
if must_be_master and self.confirmed_master :
self.dbenvMaster.rep_start(flags = db.DB_REP_MASTER)
must_be_master = False
if r[0] == db.DB_REP_HOLDELECTION :
def elect() :
while True :
try :
env.rep_elect(2, 1)
election_status[0] = False
break
except db.DBRepUnavailError :
pass
if not election_status[0] and not self.confirmed_master :
from threading import Thread
election_status[0] = True
t=Thread(target=elect)
t.setDaemon(True)
t.start()
self.thread_do = thread_do
self.t_m.start()
self.t_c.start()
self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
self.client_doing_election[0] = True
while True :
try :
self.dbenvClient.rep_elect(2, 1)
self.client_doing_election[0] = False
break
except db.DBRepUnavailError :
pass
self.assertTrue(self.confirmed_master)
#----------------------------------------------------------------------
def test_suite():
suite = unittest.TestSuite()
if db.version() >= (4,5) :
if db.version() >= (4, 6) :
dbenv = db.DBEnv()
try :
dbenv.repmgr_get_ack_policy()
@ -194,6 +424,10 @@ def test_suite():
del dbenv
if ReplicationManager_available :
suite.addTest(unittest.makeSuite(DBReplicationManager))
if have_threads :
suite.addTest(unittest.makeSuite(DBBaseReplication))
return suite

View File

@ -7,20 +7,8 @@
import errno
from random import random
try:
True, False
except NameError:
True = 1
False = 0
DASH = '-'
try:
from threading import Thread, currentThread
have_threads = True
except ImportError:
have_threads = False
try:
WindowsError
except NameError:
@ -28,7 +16,10 @@ class WindowsError(Exception):
pass
import unittest
from test_all import verbose, get_new_environment_path, get_new_database_path
from test_all import verbose, have_threads, get_new_environment_path, get_new_database_path
if have_threads :
from threading import Thread, currentThread
try:
@ -103,8 +94,8 @@ def test01_1WriterMultiReaders(self):
keys=range(self.records)
import random
random.shuffle(keys)
records_per_writer=self.records/self.writers
readers_per_writer=self.readers/self.writers
records_per_writer=self.records//self.writers
readers_per_writer=self.readers//self.writers
self.assertEqual(self.records,self.writers*records_per_writer)
self.assertEqual(self.readers,self.writers*readers_per_writer)
self.assertTrue((records_per_writer%readers_per_writer)==0)
@ -143,7 +134,7 @@ def writerThread(self, d, keys, readers):
if verbose:
print "%s: creating records %d - %d" % (name, start, stop)
count=len(keys)/len(readers)
count=len(keys)//len(readers)
count2=count
for x in keys :
key = '%04d' % x
@ -218,8 +209,8 @@ def test02_SimpleLocks(self):
keys=range(self.records)
import random
random.shuffle(keys)
records_per_writer=self.records/self.writers
readers_per_writer=self.readers/self.writers
records_per_writer=self.records//self.writers
readers_per_writer=self.readers//self.writers
self.assertEqual(self.records,self.writers*records_per_writer)
self.assertEqual(self.readers,self.writers*readers_per_writer)
self.assertTrue((records_per_writer%readers_per_writer)==0)
@ -258,7 +249,7 @@ def writerThread(self, d, keys, readers):
if verbose:
print "%s: creating records %d - %d" % (name, start, stop)
count=len(keys)/len(readers)
count=len(keys)//len(readers)
count2=count
for x in keys :
key = '%04d' % x
@ -332,8 +323,8 @@ def test03_ThreadedTransactions(self):
keys=range(self.records)
import random
random.shuffle(keys)
records_per_writer=self.records/self.writers
readers_per_writer=self.readers/self.writers
records_per_writer=self.records//self.writers
readers_per_writer=self.readers//self.writers
self.assertEqual(self.records,self.writers*records_per_writer)
self.assertEqual(self.readers,self.writers*readers_per_writer)
self.assertTrue((records_per_writer%readers_per_writer)==0)
@ -375,7 +366,7 @@ def test03_ThreadedTransactions(self):
def writerThread(self, d, keys, readers):
name = currentThread().getName()
count=len(keys)/len(readers)
count=len(keys)//len(readers)
while len(keys):
try:
txn = self.env.txn_begin(None, self.txnFlag)

File diff suppressed because it is too large Load Diff

View File

@ -105,7 +105,7 @@
#error "eek! DBVER can't handle minor versions > 9"
#endif
#define PY_BSDDB_VERSION "4.7.0"
#define PY_BSDDB_VERSION "4.7.2devel9"
/* Python object definitions */
@ -131,11 +131,11 @@ typedef struct {
u_int32_t flags; /* saved flags from open() */
int closed;
struct behaviourFlags moduleFlags;
#if (DBVER >= 40)
PyObject* event_notifyCallback;
#endif
struct DBObject *children_dbs;
struct DBTxnObject *children_txns;
PyObject *private;
PyObject *rep_transport;
PyObject *in_weakreflist; /* List of weak references */
} DBEnvObject;
@ -156,11 +156,10 @@ typedef struct DBObject {
struct DBObject *sibling_next;
struct DBObject **sibling_prev_p_txn;
struct DBObject *sibling_next_txn;
#if (DBVER >= 33)
PyObject* associateCallback;
PyObject* btCompareCallback;
int primaryDBType;
#endif
PyObject *private;
PyObject *in_weakreflist; /* List of weak references */
} DBObject;