WO2024059351A1 - Mixed-consistency concurrency control (MC3) techniques
- Publication number: WO2024059351A1 (PCT/US2023/061172)
- Authority: WIPO (PCT)
- Prior art keywords: value, replicas, locked, key, read
Classifications
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/23—Updating
- G06F16/2308—Concurrency control
- G06F16/2336—Pessimistic concurrency control approaches, e.g. locking or multiple versions without time stamps
- G06F16/2343—Locking methods, e.g. distributed locking or locking implementation details
- G06F16/27—Replication, distribution or synchronisation of data between databases or within a distributed database system; Distributed database system architectures therefor
- G06F16/273—Asynchronous replication or reconciliation
Definitions
- the present disclosure is related to storing data in a distributed datastore, and in particular to systems and methods for mixed-consistency concurrency control (MC3) configurations for a multi-replica distributed key-value store to enable mixing eventual and strong consistency.
- a distributed key-value store is a key aspect of a distributed system. By distributing large data sets over multiple servers, applications may benefit from increased processing performance.
- concurrency control (CC) schemes interleave read/write requests from multiple clients simultaneously, giving the illusion that each read/write transaction has exclusive access to the data.
- Distributed concurrency control refers to the concurrency control of a datastore distributed over a communication network.
- a computer-implemented method for updating a plurality of replicated data structures of a distributed datastore system includes decoding at least one read request for one or more key-value tuples of the replicated data structures.
- Each key-value tuple of the one or more key-value tuples includes a key and a corresponding conflict-free replicated data type (CRDT) value.
- the method further includes locking replicas of each key-value tuple of the one or more key-value tuples in the at least one read request to obtain locked CRDT values in read-locked replicas of the one or more key-value tuples.
- the method further includes merging the locked CRDT values of the read-locked replicas into a merged CRDT value onto a read set in response to the at least one read request.
- the method further includes maintaining an update summary on each of the read-locked replicas.
- the method further includes decoding a commit request to commit a write set of at least one key-value tuple, the commit request including the read set.
- the method further includes validating whether the read set in the commit request is unchanged from current CRDT values of the read-locked replicas.
- the method further includes activating a write lock of the at least one key-value tuple of the write set resulting in write-locked replicas.
- the method further includes maintaining an update summary on each of the write-locked replicas.
- the method further includes accepting the commit request when the validating and the activating are successful.
- the method further includes unlocking the read-locked replicas and the write-locked replicas after the commit request is processed resulting in unlocked replicas.
- the method further includes merging the update summary associated with any one of the read-locked replicas and the write-locked replicas that has become an unlocked replica of the unlocked replicas to a current CRDT value of the unlocked replica.
- the method further includes updating a current CRDT value of a replica of a key-value tuple in the distributed datastore system according to one eventual consistency update when the replica is not locked.
- the method further includes updating an update summary of the replica of the key-value tuple in the distributed datastore system according to the one eventual consistency update when the replica is either read-locked or write-locked.
- the accepting of the commit request further includes updating a current CRDT value of each of the write-locked replicas of at least one key-value tuple in the write set by a CRDT value of the at least one key-value tuple in the write set.
- the method further includes failing the locking of at least one of the replicas of each key-value tuple of the one or more key-value tuples in the at least one read request when the at least one of the replicas associated with the read request is locked as a write set of another transaction.
- the method further includes failing the locking of at least one of the replicas of the at least one key-value tuple of the write set when the at least one of the replicas associated with the commit request is already locked as a write set of another transaction.
- the method further includes ignoring a gossip of a CRDT value to a replica of a key-value tuple in the distributed datastore system from another replica if the replica is locked.
- the method further includes updating a current CRDT value of the replica of a key-value tuple in the distributed datastore system with the CRDT value in the gossip from the other replica if the replica is unlocked.
- an unlocked replica of the one or more key-value tuples is a replica that is no longer locked as a read set or a write set by at least one data access transaction.
- the method further includes incrementing a version number of the at least one key-value tuple of the write set when the commit request is accepted.
- the method further includes denying the commit request when one of the validating or the activating is unsuccessful.
- a node for updating a plurality of replicated data structures of a distributed datastore system.
- the node includes a memory storing instructions and at least one processor in communication with the memory.
- the at least one processor is configured, upon execution of the instructions, to perform operations including decoding at least one read request for one or more key-value tuples of the replicated data structures.
- Each key-value tuple of the one or more key-value tuples includes a key and a corresponding conflict-free replicated data type (CRDT) value.
- the operations further include locking replicas of each key-value tuple of the one or more key-value tuples in the at least one read request to obtain locked CRDT values in read-locked replicas of the one or more key-value tuples.
- the operations further include merging the locked CRDT values of the read-locked replicas into a merged CRDT value onto a read set in response to the at least one read request.
- the operations further include maintaining an update summary on each of the read-locked replicas.
- the operations further include decoding a commit request to commit a write set of at least one key-value tuple, the commit request including the read set.
- the operations further include validating whether the read set in the commit request is unchanged from current CRDT values of the read-locked replicas.
- the operations further include activating a write lock of the at least one key-value tuple of the write set resulting in write-locked replicas.
- the operations further include maintaining an update summary on each of the write-locked replicas.
- the operations further include accepting the commit request when the validating and the activating are successful.
- the operations further include unlocking the read-locked replicas and the write-locked replicas after the commit request is processed resulting in unlocked replicas.
- the operations further include merging the update summary associated with any one of the read-locked replicas and the write-locked replicas that has become an unlocked replica of the unlocked replicas to a current CRDT value of the unlocked replica.
- the operations further include updating a current CRDT value of a replica of a key-value tuple in the distributed datastore system according to one eventual consistency update when the replica is not locked.
- the operations further include updating an update summary of the replica of the key-value tuple in the distributed datastore system according to the one eventual consistency update when the replica is either read-locked or write-locked.
- the operations for accepting the commit request include updating a current CRDT value of each of the write-locked replicas of at least one key-value tuple in the write set by a CRDT value of the at least one key-value tuple in the write set.
- the operations further include failing the locking of at least one of the replicas of each key-value tuple of the one or more key-value tuples in the at least one read request when the at least one of the replicas associated with the read request is locked as a write set of another transaction.
- the operations further include failing the locking of at least one of the replicas of the at least one key-value tuple of the write set when the at least one of the replicas associated with the commit request is already locked as a write set of another transaction.
- the operations further include ignoring a gossip of a CRDT value to a replica of a key-value tuple in the distributed datastore system from another replica if the replica is locked.
- the operations further include updating a current CRDT value of the replica of a key-value tuple in the distributed datastore system with the CRDT value in the gossip from the other replica if the replica is unlocked.
- an unlocked replica of the one or more key-value tuples is a replica that is no longer locked as a read set or a write set by at least one data access transaction.
- the operations further include incrementing a version number of the at least one key-value tuple of the write set when the commit request is accepted.
- the operations further include denying the commit request when one of the validating or the activating is unsuccessful.
- a non-transitory computer-readable medium storing instructions for updating a plurality of replicated data structures of a distributed datastore system.
- the instructions when executed by one or more processors of a node, cause the one or more processors to perform operations including decoding at least one read request for one or more key-value tuples of the replicated data structures.
- Each key-value tuple of the one or more key-value tuples includes a key and a corresponding conflict-free replicated data type (CRDT) value.
- the operations further include locking replicas of each key-value tuple of the one or more key-value tuples in the at least one read request to obtain locked CRDT values in read-locked replicas of the one or more key-value tuples.
- the operations further include merging the locked CRDT values of the read-locked replicas into a merged CRDT value onto a read set in response to the at least one read request.
- the operations further include maintaining an update summary on each of the read-locked replicas.
- the operations further include decoding a commit request to commit a write set of at least one key-value tuple, the commit request including the read set.
- the operations further include validating whether the read set in the commit request is unchanged from current CRDT values of the read-locked replicas.
- the operations further include activating a write lock of the at least one key-value tuple of the write set to obtain write-locked replicas.
- the operations further include maintaining an update summary on each of the write-locked replicas.
- the operations further include accepting the commit request when the validating and the activating are successful.
- the operations further include unlocking the read-locked replicas and the write-locked replicas after the commit request is processed resulting in unlocked replicas.
- the operations further include merging the update summary associated with any one of the read-locked replicas and the write-locked replicas that has become an unlocked replica of the unlocked replicas to a current CRDT value of the unlocked replica.
- the operations further include updating a current CRDT value of a replica of a key-value tuple in the distributed datastore system according to one eventual consistency update when the replica is not locked.
- the operations further include updating an update summary of the replica of the key-value tuple in the distributed datastore system according to the one eventual consistency update when the replica is either read-locked or write-locked.
- the operations for accepting the commit request further include updating a current CRDT value of each of the write-locked replicas of at least one key-value tuple in the write set by a CRDT value of the at least one key-value tuple in the write set.
- the operations further include failing the locking of at least one of the replicas of each key-value tuple of the one or more key-value tuples in the at least one read request when the at least one of the replicas associated with the read request is locked as a write set of another transaction.
- the operations further include failing the locking of at least one of the replicas of the at least one key-value tuple of the write set when the at least one of the replicas associated with the commit request is already locked as a write set of another transaction.
- the operations further include ignoring a gossip of a CRDT value to a replica of a key-value tuple in the distributed datastore system from another replica if the replica is locked.
- the operations further include updating a current CRDT value of the replica of a key-value tuple in the distributed datastore system with the CRDT value in the gossip from the other replica if the replica is unlocked.
- an unlocked replica of the one or more key-value tuples is a replica that is no longer locked as a read set or a write set by at least one data access transaction.
- the operations further include incrementing a version number of the at least one key-value tuple of the write set when the commit request is accepted.
- the operations further include denying the commit request when one of the validating or the activating is unsuccessful.
- a system for updating a plurality of replicated data structures of a distributed datastore system includes means for decoding at least one read request for one or more key-value tuples of the replicated data structures.
- Each key-value tuple of the one or more key-value tuples includes a key and a corresponding conflict-free replicated data type (CRDT) value.
- the system further includes means for locking replicas of each key-value tuple of the one or more key-value tuples in the at least one read request to obtain locked CRDT values in read-locked replicas of the one or more key-value tuples.
- the system further includes means for merging the locked CRDT values of the read-locked replicas into a merged CRDT value onto a read set in response to the at least one read request.
- the system further includes means for maintaining an update summary on each of the read-locked replicas.
- the system further includes means for decoding a commit request to commit a write set of at least one key-value tuple, the commit request including the read set.
- the system further includes means for validating whether the read set in the commit request is unchanged from current CRDT values of the read-locked replicas.
- the system further includes means for activating a write lock of the at least one key-value tuple of the write set to obtain write-locked replicas.
- the system further includes means for maintaining an update summary on each of the write-locked replicas.
- the system further includes means for accepting the commit request when the validating and the activating are successful.
- the system further includes means for unlocking the read-locked replicas and the write-locked replicas after the commit request is processed resulting in unlocked replicas.
- the system further includes means for merging the update summary associated with any one of the read-locked replicas and the write-locked replicas that has become an unlocked replica of the unlocked replicas to a current CRDT value of the unlocked replica.
- FIG. 1 illustrates a distributed datastore system configured with a storage node management module managing storage nodes, which is capable of performing concurrency control and can be used in connection with some example embodiments.
- FIG. 2 is a diagram of storage nodes of the distributed datastore system of FIG. 1 storing data structure replicas and hash tables, which can be used in connection with some example embodiments.
- FIG. 3 is a diagram of MC3-related data and event flows performed in connection with concurrency control in a distributed datastore system, which can be used in connection with some example embodiments.
- FIG. 4 - FIG. 10 illustrate example pseudocode associated with MC3-related functionalities performed in connection with concurrency control in a distributed datastore system, which can be used in connection with some example embodiments.
- FIG. 11 is a flowchart of a method suitable for updating a plurality of replicated data structures of a distributed datastore system, according to some example embodiments.
- FIG. 12 is a block diagram illustrating a representative software architecture, which may be used in conjunction with various device hardware described herein, according to some example embodiments.
- FIG. 13 is a block diagram illustrating circuitry for a device that implements algorithms and performs methods, according to some example embodiments.
- the functions or algorithms described herein may be implemented in software in one embodiment.
- the software may consist of computer-executable instructions stored on computer-readable media or a computer-readable storage device such as one or more non-transitory memories or other types of hardware-based storage devices, either local or networked.
- such functions may correspond to modules, which may be software, hardware, firmware, or any combination thereof. Multiple functions may be performed in one or more modules as desired, and the embodiments described are merely examples.
- the software may be executed on a digital signal processor, ASIC, microprocessor, or other types of processor operating on a computer system, such as a personal computer, server, or another computer system, turning such computer system into a specifically programmed machine.
- the functionality can be configured to operate using, for instance, software, hardware, firmware, or the like.
- the phrase “configured to” can refer to a logic circuit structure of a hardware element that is to implement the associated functionality.
- the phrase “configured to” can also refer to a logic circuit structure of a hardware element that is to implement the coding design of associated functionality of firmware or software.
- the term “module” refers to a structural element that can be implemented using any suitable hardware (e.g., a processor, among others), software (e.g., an application, among others), firmware, or any combination of hardware, software, and firmware.
- the term “logic” encompasses any functionality for performing a task. For instance, each operation illustrated in the flowcharts corresponds to logic for performing that operation.
- An operation can be performed using software, hardware, firmware, or the like.
- the terms “component,” “system,” and the like may refer to computer-related entities, hardware, and software in execution, firmware, or a combination thereof.
- a component may be a process running on a processor, an object, an executable, a program, a function, a subroutine, a computer, or a combination of software and hardware.
- the term “processor” may refer to a hardware component, such as a processing unit of a computer system.
- the claimed subject matter may be implemented as a method, apparatus, or article of manufacture using standard programming and engineering techniques to produce software, firmware, hardware, or any combination thereof to control a computing device to implement the disclosed subject matter.
- the term “article of manufacture” is intended to encompass a computer program accessible from any computer-readable storage device or media.
- Computer-readable storage media can include, but are not limited to, magnetic storage devices, e.g., hard disk, floppy disk, magnetic strips, optical disk, compact disk (CD), digital versatile disk (DVD), smart cards, and flash memory devices, among others.
- computer-readable media, i.e., not storage media, may additionally include communication media such as transmission media for wireless signals and the like.
- CRDTs are a specific type of state-based object that is guaranteed to achieve eventual consistency.
- Each CRDT has a specific update function that is commutative, associative, and idempotent and updates its CRDT state in a monotonically non-decreasing fashion. Due to these features, a CRDT update can be committed to a replica without any performance-limiting coordination among the replicas.
- Distributed datastore clients can take advantage of high-performance CRDT updates.
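- As an illustration of the state-based CRDT properties described above, the following Python sketch (purely illustrative; the class and method names are assumptions, not part of the disclosure) shows a grow-only set whose merge is commutative, associative, and idempotent, so independently updated replicas converge without coordination.

```python
# Illustrative sketch of a state-based CRDT (a grow-only set, G-Set).
# Names are hypothetical; the disclosure does not prescribe this API.

class GSet:
    """A grow-only set: the state only grows, and merge() is the set union,
    which is commutative, associative, and idempotent."""

    def __init__(self, elements=None):
        self.state = set(elements or ())

    def update(self, element):
        # CRDT update: moves the state in a monotonically non-decreasing way.
        self.state.add(element)

    def merge(self, other):
        # Merging another replica's full state never loses local updates.
        self.state |= other.state

# Two replicas accept updates independently (no coordination) ...
a, b = GSet(), GSet()
a.update("x")
b.update("y")
# ... and converge once they exchange (gossip) their states.
a.merge(b)
b.merge(a)
assert a.state == b.state == {"x", "y"}
```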
- generic applications often find strong consistency updates indispensable.
- strong consistency updates, including invariant-compliant updates and transactions, require coordination among replicas of the same tuple and even coordination among tuples in multiple storage nodes or shards.
- Bayou system studies are associated with theorems that describe the properties of a distributed system that mixes weak consistency updates and strong consistency updates.
- mixing weak consistency updates and strong consistency updates can be associated with the following limitations. Firstly, it can be theoretically impossible to configure a generic system that ensures both an eventual consistency guarantee for weak consistency updates and a strong consistency guarantee for strong consistency updates. Secondly, the best achievable behavior in such a system is fluctuating eventual consistency (FEC).
- the Bayou system study and theorems can be considered when configuring mixed consistency support.
- the disclosed MC3-related techniques further contribute to this research topic in two aspects. Firstly, the disclosed MC3-related techniques (which can be part of an MC3 protocol) can be configured using a state-based CRDT model. In comparison, the Bayou protocol is built over an operation-based CRDT model. There are pros and cons of a state-based CRDT system in comparison to its equivalent operation-based CRDT system. For example, state-based CRDTs need fewer stipulations on the communication channel. Additionally, when a CRDT update occurs, the local state of a replica is updated through its CRDT update function.
- the full local state can be gossiped to other replicas at a lower rate than the rate of CRDT updates.
- AnnaDB® is one example of a state-based CRDT key-value store.
- the disclosed MC3-related techniques can be configured to support mixed consistency over a state-based CRDT system.
- the Bayou protocol covers the replica-level consistency, not the tuple-level consistency. That is, it addresses mixed consistency over multiple replicas of one tuple.
- the disclosed MC3-related techniques address mixed consistency over multiple tuples, each of them having multiple replicas.
- the disclosed MC3-related techniques can be used for mixing weak consistency updates and strong consistency updates on a distributed system.
- the disclosed MC3-related techniques can be applied to a state-based CRDT system where each key-value tuple has multiple replicas, while satisfying the fluctuating eventual consistency (FEC) alluded to by the Bayou theorems.
- the disclosed MC3-related techniques can be used for a transactional update on multi-sharded replicated key-value tuples, which is not addressed by the Bayou protocol.
- Example distributed concurrency control schemes include two-phase locking (2PL), snapshot isolation (SI), and Read Committed (RC). They are also common centralized concurrency control schemes. Each allows more concurrency, i.e., more schedules of concurrent transactions are permitted, and hence higher transaction throughput.
- the disclosed MC3-related techniques can be based on 2PL. However, 2PL is mostly related to single-replica multi-tuple transaction concurrency control, while MC3 is mostly related to replicated (i.e., having multiple replicas) multi-tuple transaction concurrency control. In some aspects, MC3 can be configured to be coherent with the Bayou theorems for mixing concurrent CRDT updates and non-CRDT updates.
- the fluctuating eventual consistency (FEC) behavior can be related to the commit-back-in-time of a transaction of tuples whose incremental CRDT update summaries are not empty.
- FIG. 1 illustrates a distributed datastore system 100 configured with a storage node management module managing storage nodes, which is capable of performing concurrency control and can be used in connection with some example embodiments.
- the distributed datastore system 100 includes transaction clients 120A, 120B, ... , 120C (collectively, at least one transaction client 120) communicatively coupled to storage nodes 150A, 150B, ... , 150C (collectively, at least one storage node 150) via network 160.
- Network 160 can include one or more of a wired network, a wireless network, and the Internet.
- Data in the datastore system 100 is distributed in multiple nodes (e.g., storage nodes 150A, . . . , 150C), and access to the data (e.g., for read operations, write operations, and concurrency control) is performed in a distributed fashion.
- the distributed datastore system 100 further includes a storage management module 130 configured to manage storage nodes 150A, ..., 150C.
- the storage management module 130 manages bringing up and down storage clusters associated with the at least one storage node 150.
- the at least one transaction client 120 is an application configured to access the at least one storage node 150 that provides MC3 functionalities (e.g., using a coordinator 110 and a replica control module 112).
- the at least one transaction client 120 includes an MC3 interface 115 (e.g., to MC3 functionalities provided in connection with data stored by the at least one storage node 150).
- the storage nodes are managed by a central entity while each of the storage nodes has control software that works together in a distributed, networked environment to achieve data storage, replication, and concurrency control.
- the at least one storage node 150 includes at least one data structure replica 145 and a hash table 140 used in connection with the MC3 functionalities discussed herein.
- the at least one data structure replica 145 can include one or more primary replicas of one or more data structures as well as one or more non-primary replicas of some other data structures.
- a more detailed description of the at least one storage node 150 including different data structure replicas is provided in connection with FIG. 2.
- the at least one storage node is multi-threaded, and replicas of the same tuple can reside on the same storage node.
- the at least one storage node 150 further includes a replica control module 112, which is a module configured to perform MC3 functionalities (e.g., as discussed in connection with FIG. 3).
- the replica control module 112 uses a functional module such as a coordinator 110 in connection with the MC3 functionalities.
- one or more instances of the coordinator 110 can reside in one or more of transaction clients 120A, . . . , 120C.
- FIG. 1 illustrates optional coordinator 110 configured at transaction clients 120A and 120B (and not at transaction client 120C).
- FIG. 2 is a diagram of storage nodes 150A and 150B of the distributed datastore system 100 of FIG. 1 each storing data structure replicas and a hash table 140, which can be used in connection with some example embodiments.
- the storage nodes in FIG. 2 are illustrated to include only some of the components illustrated in FIG. 1 (e.g., each of the storage nodes 150A-150C includes a coordinator 110 and a replica control module 112 which are not illustrated in FIG. 2 for simplicity).
- the at least one data structure replica 145 includes a primary replica 208 stored at one storage node (at storage node 150A) as well as additional replicas 210, . . . , 212 of a data structure (e.g., key-value tuple 1) with each replica stored at a separate storage node (e.g., replica 210 is stored at storage node 150B and replica 212 is stored at storage node 150C).
- a storage node can store replicas of one or more other tuples (e.g., storage node 150C stores primary replica 220 of key-value tuple 2 and primary replica 222 of key-value tuple 3).
- the at least one storage node 150 is multi-threaded so that each of the storage nodes 150A, . . . , 150C can store multiple replicas of the same tuple.
- the distributed datastore system 100 implements key-value stores based on state-based CRDTs.
- Example implementations may support various CRDTs.
- Each instance of a CRDT is represented as the value part of a key-value tuple, addressable by the key part of the tuple.
- Each tuple may have one or more replicas distributed over several servers (e.g., the at least one storage node 150) of the key-value store.
- the replicas of a tuple can be kept consistent through a gossip mechanism, which is a low-rate, in-the-background broadcast of the local tuple state from one replica to the others.
- a replica receiving a gossip message uses a merging function to move the replica’s local state in a monotonically non-decreasing fashion.
- the disclosed MC3 functionalities can be configured and performed using the following elements of the distributed datastore system 100 (discussed in greater detail herein below): MC3 interface 115, a primary replica of the at least one data structure replica 145, coordinator 110, and MC3-related data structures.
- the at least one data structure replica 145 can include one or more primary replicas of one or more data structures as well as one or more non-primary replicas of some other data structures.
- storage node 150A can store a primary replica 208 (e.g., of key-value tuple 1), while storage nodes 150B, ..., 150C can store additional (non-primary) replicas 210, ..., 212 of key-value tuple 1.
- storage node 150B further stores other replicas of other tuples (e.g., primary replica 210 of key-value tuple 2 and primary replica 212 of key-value tuple 3).
- a key-value store that supports mixed consistency can be configured with the MC3 interface 115 for accessing MC3 functionalities (e.g., functionalities provided by the MC3 management module 130).
- the MC3 interface 115 can be configured as an application interface such as a CRDT update operation, which moves the CRDT state in accordance with its monotonicity.
- a CRDT is an integer with a maximum() update function.
- An example instance of such CRDT can have a value of 10.
- a CRDT update operation with an operand of 15 will update the instance value to 15 because the maximum value of 10 and 15 is 15.
- a following CRDT update operation with an operand of 12 will leave the instance value unchanged because the maximum value of 15 and 12 is 15 even though the CRDT update operation is approved and considered successful.
- the CRDT update interface is different from an ordinary key- value store update interface.
- the latter sets the value of the tuple with the specified operand.
- Setting the value of a tuple can be a non-CRDT operation, which can act against the monotonicity of a CRDT. Using the above example, setting the tuple value to 12 can violate the result that the maximum() function may produce, decreasing the tuple value.
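- The integer example above can be restated as a short sketch (illustrative only; the class below is an assumption, not the disclosed data structure): the maximum() update cannot move the value below 15, whereas a plain, non-CRDT set can.

```python
# Illustrative max-register CRDT mirroring the example in the text.

class MaxRegister:
    def __init__(self, value=0):
        self.value = value

    def crdt_update(self, operand):
        # maximum() update: monotonically non-decreasing.
        self.value = max(self.value, operand)

    def non_crdt_set(self, operand):
        # Ordinary key-value "set": may move the value backwards,
        # violating the CRDT's monotonicity.
        self.value = operand

r = MaxRegister(10)
r.crdt_update(15)   # value becomes 15
r.crdt_update(12)   # approved, but value stays 15 (max(15, 12) == 15)
assert r.value == 15
r.non_crdt_set(12)  # a non-CRDT operation: value decreases to 12
assert r.value == 12
```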
- a non-CRDT operation implies replica coordination at least to some degree, i.e., strong consistency handling may be used.
- a non-CRDT operation is not commutative with respect to an adjacent CRDT operation. Therefore, mixing a non-CRDT operation and a CRDT operation follows the limitations alluded to by the Bayou theorems.
- the MC3 interface 115 can be configured as a transaction operation.
- the transaction operation interface can be defined as a set of associated interfaces: txn_begin(), txn_read(), txn_write(), txn_abort(), and txn_commit().
- a transaction consists of zero or more reads, zero or more possibly conditional computations, and zero or more writes, and it may involve one or more tuples.
- a transaction implies the atomicity, strong consistency, and isolation properties of the operations within the transaction.
- a non-CRDT operation or a strong consistency operation can be framed as a transaction.
- an invariant-preserving CRDT operation can be framed as a transaction.
- one CRDT is a set of natural numbers that does not exceed 10,000 elements, with a union() update function.
- the invariant-compliant CRDT update operation can be implemented by creating a transaction (using txn_begin()) involving reading and caching the set (using txn_read()), adding (using union()) the operand to the cached set, counting the number of elements in the cached set, aborting the transaction (using txn_abort()) if the number of elements exceeds 10,000, using the cached set as the write value (using txn_write()), and requesting to commit the transaction (using txn_commit()) if the number of elements does not exceed 10,000.
- the txn_commit() may be either approved or denied by the strong consistency validation function of the key-value store. Therefore, txn_commit() can be considered as a request to commit the write set of tuples given the read set of tuples.
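- A hedged sketch of how the invariant-compliant update described above might be expressed against the txn_* interface follows; the store object and its method signatures are assumptions rather than the definitive API of the disclosure.

```python
# Illustrative client-side use of the transaction interface to implement an
# invariant-preserving CRDT update (set union capped at 10,000 elements).
# The `store` object and its txn_* signatures are hypothetical.

MAX_ELEMENTS = 10_000

def bounded_union(store, key, operand_set):
    txn = store.txn_begin()
    cached = set(store.txn_read(txn, key))      # read and cache the set
    cached |= operand_set                       # union() the operand in
    if len(cached) > MAX_ELEMENTS:
        store.txn_abort(txn)                    # invariant would be violated
        return False
    store.txn_write(txn, key, cached)           # use the cached set as the write value
    # txn_commit() may still be denied by the strong consistency validation.
    return store.txn_commit(txn)
```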
- the write set tuple values can be written to the store.
- the transaction operation interface can be sufficiently generic to cover most, if not all, operations that require strong consistency handling.
- the disclosed MC3 functionalities are discussed in connection with the transaction operation interface and the CRDT update interface.
- Each tuple can be associated with one or more replicas (e.g., data structure replicas 145, including a primary replica 208 and one or more additional replicas 210, . . . , 212).
- the store (or the MC3 management module 130) can be configured to select one of them as the primary replica of the tuple (e.g., primary replica 208 of key-value tuple 1).
- the selection of the primary replica can be based on ensuring that there is only one primary replica for each tuple at any time.
- the replicas of the same tuple are distributed to some storage nodes (e.g., one replica to each of the storage nodes). In some aspects, one of the replicas is selected as the primary replica.
- the update processing on the primary replica is slightly different from that on the non-primary replicas.
- a CRDT update operation can be handled in the same way on all replicas.
- a weak consistency update can be approved by the receiving replica without coordination with other replicas.
- a CRDT update operation may not be denied. Also, concerning CRDT update operations, there can be multiple writers updating a tuple at the same time.
- a strong consistency update operation can involve the primary replica. Strong consistency can be guaranteed through serialization. Concerning strong consistency update operations, there can be a single writer updating a tuple, and such a tuple can be considered as the primary replica.
- the replica control module 112 is a functional module configured to perform concurrency control in connection with the MC3 functionalities discussed herein.
- the coordinator 110 can be configured as a functional module that facilitates strong consistency update request processing associated with the concurrency control functions performed by the replica control module 112.
- a coordinator instance repackages and forwards a strong consistency update request to one or more primary replicas of the relevant tuples.
- a coordinator instance can be run on any server of the key-value store.
- when coordinator 110 receives a txn_read() request, it repackages and forwards the request to the primary replica of the relevant tuple.
- when the coordinator receives a txn_commit() request that comprises the read set and write set tuples, it subdivides the read set and write set tuples according to the locations of their respective primary replicas and then repackages and forwards the subsets of read set and write set tuples to the respective servers of the store that manage the subsets of primary replicas. After receiving strong consistency validation results from the respective servers, the coordinator informs the respective servers of the combined approval or denial decision.
- the coordinator facilitates a two-phase commit (2PC) protocol.
- the coordinator informs the respective primary replicas (e.g., primary replica 208) to release their relevant resources such as locks.
- txn_abort() can be implemented as txn_commit() with forcing a denial decision.
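- The coordinator's commit handling described above can be sketched as follows (illustrative only; the partitioning and messaging helpers such as send_prepare and send_decision are hypothetical placeholders, not the disclosed implementation).

```python
# Illustrative coordinator-side commit handling (two-phase commit style).
# Partitioning, messaging, and replica APIs are hypothetical placeholders.

from collections import defaultdict

def handle_txn_commit(read_set, write_set, primary_of, send_prepare, send_decision):
    """read_set maps key -> (version, value) and write_set maps key -> value;
    primary_of(key) returns the server owning the key's primary replica;
    send_prepare/send_decision are assumed messaging helpers."""
    # Phase 1: subdivide by primary-replica location and ask each server to
    # validate the read set and acquire write locks for its keys.
    per_server = defaultdict(lambda: {"reads": {}, "writes": {}})
    for k, v in read_set.items():
        per_server[primary_of(k)]["reads"][k] = v
    for k, v in write_set.items():
        per_server[primary_of(k)]["writes"][k] = v

    votes = [send_prepare(server, subset["reads"], subset["writes"])
             for server, subset in per_server.items()]

    # Phase 2: the commit is approved only if every server voted to approve.
    approved = all(votes)
    for server in per_server:
        send_decision(server, approved)   # also triggers lock release on that server
    return approved
```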
- if coordinator 110 receives a txn_start() or txn_write() request, it does not forward it to any other replica because the request will only affect a local resource. Specifically, a txn_write() request adds a relevant tuple to a locally-buffered write set of the relevant transaction.
- coordinator 110 can be used to implement retries for a transaction that is denied due to contention.
- the coordinator 110 is used for initiating transactional operations such as read, write, and end operation requests and handling responses to the requests.
- Each transaction is identified by a unique transaction identifier (ID).
- a read operation should contain the transaction ID and at least one key, while a write operation should contain the transaction ID and at least one key and corresponding value.
- An end operation requests a commit of the transaction.
- Each response, from other components of the datastore system 100, should indicate an acceptance or a rejection of the operation requested. A rejection should cause an abort of the transaction.
- the acceptance of a read or write operation indicates that coordinator 110 may move on to the next operation.
- the acceptance of a commit operation indicates that the transaction has been validated and serialized and all written data items are stored.
- a key-value tuple (e.g., the data structure associated with primary replica 208) can be interpreted as having three components - a key 214, a version number 216, and a CRDT state 218.
- a tuple is a named instance of a CRDT, named by the key.
- a tuple can be characterized by a CRDT, and a default CRDT can be a last-write-wins type.
- Version number 216 describes the version of the CRDT state 218. Version number 216 facilitates strong consistency enforcement through a merge() function. In some aspects, the primary replica 208 can increment version number 216, and it does so when a strong consistency update request is approved.
- the merge() function merges a received replica value into the local replica value. If the version numbers are different, the replica value with a higher version number becomes the local replica value. Otherwise, the local replica executes its CRDT update function with the received replica CRDT state. In other words, incrementing the version number enables the CRDT state to go against its CRDT monotonicity. When the version number is unchanged, the CRDT state 218 moves in a monotonically non-decreasing fashion.
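- The version-based merge rule described above can be sketched as follows (illustrative; field and method names are assumptions).

```python
# Illustrative merge() of a received replica value into the local value,
# following the version rule described above. Field names are assumptions.

class Replica:
    def __init__(self, key, version, crdt_state, crdt_update):
        self.key = key
        self.version = version          # incremented by the primary replica when a
                                        # strong consistency update is approved
        self.state = crdt_state
        self.crdt_update = crdt_update  # e.g., max for a max-register

    def merge(self, received_version, received_state):
        if received_version > self.version:
            # A strong consistency update happened elsewhere: adopt it, even if
            # it moves the state against the CRDT's monotonicity.
            self.version = received_version
            self.state = received_state
        elif received_version == self.version:
            # Same version: converge monotonically via the CRDT update function.
            self.state = self.crdt_update(self.state, received_state)
        # A stale (lower) version is ignored.

r = Replica("X", version=1, crdt_state=15, crdt_update=max)
r.merge(1, 12)   # same version: max(15, 12) keeps 15
r.merge(2, 12)   # higher version: the non-CRDT result 12 is adopted
assert (r.version, r.state) == (2, 12)
```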
- An example data structure in MC3 processing is related to replica locking.
- the distributed datastore system 100 uses a hash table 140 to implement MC3 locks.
- Each storage node can be configured to maintain one hash table for all tuple replicas it manages.
- the key to the hash table 140 can be the same key to a tuple.
- the absence of an entry in the hash table 140 indicates the absence of locks for the tuple replica. The presence of an entry indicates that the tuple replica is potentially locked.
- An entry of the hash table 140 comprises an entry number 202, a write lock (reference count) 204, a read lock reference count 206, and an incremental update summary 207.
- the write lock is exclusive, and the read lock is shared. Therefore, the write lock reference count 204 may use one bit, while the read lock reference count 206 may need a reference count, e.g., a 31-bit or otherwise large enough number to avoid overflow, because a held read lock is not to block a subsequent read lock request.
- a minimum transaction identifier (not illustrated in FIG. 2) can be associated with a hash table entry.
- Each transaction uses a transaction identifier (e.g., a globally unique number) to identify itself.
- the transaction identifier can be constructed using a timestamp followed by a replica identifier.
- the entry’s minimum transaction identifier stores the minimum transaction identifier of the concurrent transactions holding the read lock.
- a write lock held denies a subsequent write lock request or a subsequent read lock request.
- a read lock held allows subsequent read lock requests.
- a read lock held does not deny a subsequent write lock request whose associated transaction identifier is less than or equal to the minimum transaction identifier.
- a replica is considered as completely unlocked or not locked when all locks have been released, i.e., both the write lock reference count and the read lock reference count are zero.
- the incremental update summary 207 is a cache or a pointer to a cache.
- the cache stores the merged result of all CRDT update requests received from an application after the replica has been locked.
- when a hash table entry is created, its incremental update summary is initialized with the initial CRDT value, which is specific to each CRDT.
- the initial value for its minimum transaction identifier is the maximum possible value it can take.
- the hash table entry may also comprise a pointer to the tuple replica in the store so that a look-up into the store can be avoided.
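- The hash table entry and the locking rules described above can be sketched as follows (illustrative; the field layout and helper names are assumptions, not the disclosed implementation).

```python
# Illustrative lock-table entry per tuple key; absence of an entry means the
# replica holds no locks. Field layout and names are assumptions.

import sys
from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class LockEntry:
    write_locked: bool = False            # exclusive write lock (one bit suffices)
    read_lock_count: int = 0              # shared read lock reference count
    # Cache of all CRDT updates received while the replica is locked; assumed to
    # be initialized with the CRDT's initial value (None is a placeholder here).
    incremental_update_summary: Any = None
    # Minimum transaction id among read-lock holders; starts at the maximum value.
    min_txn_id: int = sys.maxsize
    replica_ref: Optional[Any] = None     # optional pointer to the tuple replica

def acquire_read_lock(table: dict, key: str, txn_id: int) -> bool:
    entry = table.setdefault(key, LockEntry())
    if entry.write_locked:
        return False                      # a held write lock denies read locks
    entry.read_lock_count += 1
    entry.min_txn_id = min(entry.min_txn_id, txn_id)
    return True

def acquire_write_lock(table: dict, key: str, txn_id: int) -> bool:
    entry = table.setdefault(key, LockEntry())
    if entry.write_locked:
        return False                      # the write lock is exclusive
    # A held read lock admits a write lock only for a transaction whose id is
    # no greater than the minimum id of the read-lock holders.
    if entry.read_lock_count > 0 and txn_id > entry.min_txn_id:
        return False
    entry.write_locked = True
    return True
```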
- FIG. 3 is a diagram of MC3-related data and event flows 300 performed in connection with concurrency control in a distributed datastore system, which can be used in connection with some example embodiments.
- the MC3-related data and event flows 300 can include the transaction client 302 (which can be the same as the at least one transaction client 120), coordinator 304 (which can be the same as coordinator 110), a first primary replica 306 (e.g., a tuple with key “X”, also referred to as X primary replica 306), additional replica 308 (also referred to as X replica 308), and a second primary replica 310 (e.g., another tuple with key “Y”, also referred to as Y primary replica 310).
- the MC3-related data and event flows 300 are performed by the replica control module 112.
- the MC3-related data and event flows 300 include three phases of locking - a read lock expansion phase 312, a write lock expansion phase 314, and a lock shrinkage phase 316. Solving a multi-replica multi-sharded transaction (with concurrent eventual consistency updates on any replica) problem is different from solving a single-replica multi-sharded transaction problem. While the MC3-related data and event flows 300 utilize the three-phase locking approach, there are additional configurations that make it more than a case of a rigorous 2PL.
- MC3 executes the read lock expansion phase 312 before txn_commit() (e.g., commit request 318), and the write lock expansion phase 314 and the lock shrinkage phase 316 after the commit request 318.
- replica-level consistency can be differentiated from tuple-level consistency.
- Replica-level consistency is about the consistency of all replicas of the same tuple.
- Tuple-level consistency is about the consistency of all tuples in the same transaction.
- Having the read lock expansion phase 312 early is to facilitate replica-level consistency.
- a read lock on a tuple is applied to all replicas of the tuple. Read locks on replicas freeze the replica values to enable the primary replica to account for all prior CRDT updates contributing to the transaction and come up with a merged, frozen CRDT state and version number to enable tuple-level consistency enforcement.
- txn_start() is executed to begin a transaction.
- a txn_read(“X”) is executed.
- the read lock expansion phase 312 is triggered by txn_read() requests.
- the read request is forwarded to the X primary replica 306.
- read lock requests are sent to other replicas (e.g., replica 308).
- a read lock is grabbed, a reply with the locked value is generated, gossips are ignored, and the incremental update summary is maintained.
- the X primary replica 306 grabs a read lock, merges the replica values, replies with the merged value, ignores gossips and maintains the incremental update summary.
- coordinator 304 replies with the read value.
- the transaction client 302 returns the txn_read(“X”) call and caches the value in a buffered read set.
- txn_write() calls on tuples X and Y are executed over a buffered write set.
- txn_commit() (e.g., commit request 318) is executed to request a commit on the write set given the read set.
- the X primary replica 306 and the Y primary replica 310 are contacted.
- a validation that the read set value is the same as the replica’s current value is performed, the write lock is grabbed (or activated), and a reply with the validation result is communicated.
- the Y primary replica 310 grabs the write lock and replies with the validation result.
- the validation result is cached, and waiting for all validation results follows.
- the final decision is derived based on all validation results, and the decision is communicated to the primary replicas.
- the replica value is replaced with the write set value, the version number is incremented, the read lock and the write lock are released, a merge with the incremental update summary is performed, and the value and the lock release request are sent to other replicas.
- the replica value is replaced with the received value, including the version, the read lock is released, a merge with the incremental update summary is performed, and an acknowledgment is sent to the primary replica.
- a reply is communicated to coordinator 304.
- wait for all replies follows.
- the replica value is replaced with the write set value, the version is incremented, the write lock is released, a merge with the incremental update summary is performed, and a reply is communicated to coordinator 304.
- a reply is communicated to the application when all replies are received.
- the txn_commit() call is returned with the decision.
- the read lock expansion phase 312 is triggered by txn_read() requests.
- a coordinator 304 receives a txn_read() request on a tuple and forwards it to the primary replica of the tuple.
- the primary replica sends the read lock requests to all replicas of the tuple.
- the primary replica 306 merges the CRDT values into its local CRDT value and grabs its read lock, by creating a hash table entry (e.g., in hash table 140) if necessary, and incrementing the entry’s read lock reference count.
- the primary replica’s CRDT value is frozen until the primary replica is completely unlocked.
- the primary replica replies to the request sender with the frozen CRDT value. Meanwhile, if it receives any CRDT update request, it merges the CRDT update request into the associated hash table entry’s incremental update summary.
- When a non-primary replica receives a read lock request, it grabs its read lock by creating a hash table entry if necessary and incrementing the entry’s read lock reference count. The replica’s value is frozen until the replica is completely unlocked. The replica replies to the request sender with its frozen CRDT value. Meanwhile, if it receives any CRDT update request, it merges the CRDT update request into the associated hash table entry’s incremental update summary.
- the read lock expansion phase 312 is marked by read lock requests on all replicas of the tuple in a txn_read() request.
- a transaction may involve more than one txn_read() request.
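- The read lock expansion behavior described above, including the buffering of concurrent CRDT updates into the incremental update summary, can be sketched as follows (illustrative; the helper callables and object fields are assumptions).

```python
# Illustrative read-lock expansion and CRDT-update buffering on a replica.
# Helper callables and object fields are hypothetical assumptions.

def primary_handle_txn_read(primary, other_replicas, txn_id,
                            request_read_lock, lock_read):
    """request_read_lock(replica, txn_id) read-locks a non-primary replica and
    returns its frozen CRDT value; lock_read(primary, txn_id) read-locks the
    primary itself (e.g., via a lock table as sketched earlier)."""
    locked_values = [request_read_lock(r, txn_id) for r in other_replicas]
    lock_read(primary, txn_id)
    # Fold the other replicas' frozen values into the primary's local value to
    # obtain the merged, frozen value returned to the coordinator.
    for value in locked_values:
        primary.state = primary.crdt_update(primary.state, value)
    return primary.version, primary.state

def handle_crdt_update(replica, operand, locked_entry):
    """locked_entry is the replica's lock-table entry, or None when unlocked."""
    if locked_entry is None:
        # Unlocked: apply the eventual consistency update directly.
        replica.state = replica.crdt_update(replica.state, operand)
    elif locked_entry.incremental_update_summary is None:
        locked_entry.incremental_update_summary = operand
    else:
        # Locked (read or write): keep the value frozen and accumulate the
        # update into the entry's incremental update summary instead.
        locked_entry.incremental_update_summary = replica.crdt_update(
            locked_entry.incremental_update_summary, operand)
```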
- the write lock expansion phase 314 is triggered by a txn_commit() request 318.
- a coordinator 304 receives a txn_commit() request on the write set and read set tuples and forwards it to the primary replica of each tuple.
- the primary replica executes a strong consistency validation procedure. It verifies whether it is one of the read set tuples and, if so, whether the tuple value in the read set is in agreement with the tuple replica’s current value. For example, a difference in the version numbers indicates that the earlier-read tuple value stored in the read set has been outdated and the transaction commit request is to be denied.
- the primary replica checks whether it is one of the write set tuples. If so, it issues a write lock request (by creating a hash table entry if necessary and incrementing the entry’s write lock reference count). If the write lock request is denied due to contention, the current transaction commit request is to be denied. If the write lock request is approved, the replica’s value is now frozen, if it has not already been frozen by a prior read lock request. Then, the primary replica replies to coordinator 304. Meanwhile, if it receives any CRDT update request, it merges the CRDT update request into the associated hash table entry’s incremental update summary.
- Coordinator 304 collects the replies from all relevant primary replicas of the read set and write set tuples to derive a final commit request approval or denial decision.
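- The strong consistency validation and write-lock acquisition on a primary replica can be sketched as follows (illustrative; the object fields and the lock helper are assumptions).

```python
# Illustrative strong consistency validation on a primary replica during the
# write-lock expansion phase. Objects and helpers are hypothetical.

def primary_prepare_commit(primary, read_set, write_set, txn_id, acquire_write_lock):
    """read_set maps key -> (version, value); write_set maps key -> value;
    acquire_write_lock is an assumed helper returning True on success.
    Returns this replica's local vote for the commit request."""
    # Validate: if this tuple was read, its read-set version must still match
    # the replica's current version; otherwise the earlier-read value is outdated.
    if primary.key in read_set:
        read_version, _ = read_set[primary.key]
        if read_version != primary.version:
            return False                      # deny: the read set is stale
    # If this tuple is written, grab the exclusive write lock; contention
    # (an already-held write lock) also denies the commit request.
    if primary.key in write_set:
        if not acquire_write_lock(primary.lock_table, primary.key, txn_id):
            return False
    # The replica's value is now frozen (if not already frozen by a read lock);
    # concurrent CRDT updates keep accumulating in the incremental summary.
    return True
```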
- the lock shrinkage phase 316 is triggered by the final approval or denial decision.
- When a primary replica is aware of the decision (sometimes, it knows the decision ahead of the coordinator because a local denial will lead to a final denial), it first checks whether it is one of the write set tuples and whether the commit request is approved. If so, it updates its tuple value with the value in the write set and sends the new CRDT state and the new version number to all replicas of the tuple. Then, it releases its write lock. It releases its read lock and informs other replicas to release their read locks if it is one of the read set tuples. After receiving all acknowledgments from other replicas that their replica values are updated, the primary replica replies to the coordinator.
- When a non-primary replica receives a tuple value update message from the primary replica due to the commit request approval, it replaces its local value with the received value. When it receives a read lock release message from the primary replica, it releases its read lock by decrementing the read lock reference count.
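- The lock shrinkage behavior on the primary replica can be sketched as follows (illustrative; messaging helpers and object fields are assumptions).

```python
# Illustrative lock shrinkage on the primary replica after the commit decision.
# Messaging helpers and object fields are hypothetical.

def primary_finish_commit(primary, write_set, approved, push_to_replicas,
                          release_locks_on_replicas):
    """write_set maps key -> value; push_to_replicas and
    release_locks_on_replicas are assumed messaging helpers."""
    entry = primary.lock_table.pop(primary.key, None)   # release local locks
    if approved and primary.key in write_set:
        # Install the committed write and advance the version so that the
        # change can override the CRDT monotonicity on the other replicas.
        primary.state = write_set[primary.key]
        primary.version += 1
        push_to_replicas(primary.key, primary.version, primary.state)
    # Ask the other replicas to release their read locks as well.
    release_locks_on_replicas(primary.key)
    # Fold in the CRDT updates buffered while the replica was locked.
    if entry is not None and entry.incremental_update_summary is not None:
        primary.state = primary.crdt_update(primary.state,
                                            entry.incremental_update_summary)
```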
- FIG. 4 - FIG. 10 illustrate example pseudocode associated with MC3-related functionalities performed in connection with concurrency control in a distributed datastore system, which can be used in connection with some example embodiments.
- the MC3 algorithm with respect to a replica is described in pseudocode 400, 500, 600, 700, 800, 900, and 1000 in corresponding FIG. 4, FIG. 5, FIG. 6, FIG. 7, FIG. 8, FIG. 9, and FIG. 10.
- Initial setup and initialization are provided by lines 1-11 of pseudocode 400 in FIG. 4.
- the read lock expansion phase is covered in lines 12-36 of pseudocode 500 in FIG. 5.
- the write lock expansion phase is handled in lines 37-52 of pseudocode 600 in FIG. 6.
- the lock shrinkage phase is described by lines 53-81 of pseudocode 700 in FIG. 7.
- a received gossip message is ignored if a replica is locked, as shown in lines 86-88 of pseudocode 900 in FIG. 9.
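- The gossip handling rule can be sketched as follows (illustrative; field names are assumptions): a locked replica ignores incoming gossip so its frozen value cannot drift, while an unlocked replica merges it using the version rule.

```python
# Illustrative gossip handling on a replica. Field names are assumptions.

def handle_gossip(replica, gossip_version, gossip_state):
    if replica.key in replica.lock_table:
        return                          # locked: ignore the gossip message
    # Unlocked: merge using the version rule (higher version wins; equal
    # versions converge through the CRDT update function).
    if gossip_version > replica.version:
        replica.version, replica.state = gossip_version, gossip_state
    elif gossip_version == replica.version:
        replica.state = replica.crdt_update(replica.state, gossip_state)
```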
- if the primary replica has already issued read lock requests to other replicas, it does not need to do so again on the next txn_read().
- the primary replica tracks the actual read lock reference count, and when it is completely unlocked, it asks other replicas to release all their locks. Similarly, the primary replica tracks the write lock on behalf of the other replicas. The optimization reduces the number of coordination messages.
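- The optimization can be pictured as aggregate reference counting at the primary; in the hypothetical sketch below, only the first read lock and the last release fan out to the other replicas, so repeated txn_read() calls add no extra coordination messages.

```python
class PrimaryLockBookkeeping:
    """Illustrative aggregate read-lock accounting at a primary replica."""

    def __init__(self, notify_replicas):
        self.read_count = 0                  # actual read lock reference count
        self.remote_read_locked = False      # whether the other replicas hold a read lock
        self.notify_replicas = notify_replicas

    def acquire_read(self) -> None:
        self.read_count += 1
        if not self.remote_read_locked:      # only the first reader fans out a request
            self.notify_replicas("readLock")
            self.remote_read_locked = True

    def release_read(self) -> None:
        self.read_count -= 1
        if self.read_count == 0 and self.remote_read_locked:
            self.notify_replicas("lockRelease")   # one release message for all readers
            self.remote_read_locked = False

messages = []
locks = PrimaryLockBookkeeping(messages.append)
locks.acquire_read(); locks.acquire_read(); locks.release_read(); locks.release_read()
assert messages == ["readLock", "lockRelease"]   # two readers, two coordination messages
```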
- the hash table entry stores the minimum transaction identifier of the transactions holding the read lock.
- a write lock request is accepted when its associated transaction identifier is smaller than or equal to the minimum transaction identifier.
- the version of the replica is incremented (e.g., in line 49 of pseudocode 600 in FIG. 6) regardless of whether the transaction that holds the write lock will be approved or not. Incrementing the version will cause the transactions that have acquired the read lock before the write lock request to be aborted at their validation check.
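- The rule of the preceding items can be sketched as follows, with an assumed entry layout holding the minimum read-lock transaction identifier; granting a write lock also increments the version so that earlier readers later fail their validation check.

```python
from dataclasses import dataclass

@dataclass
class Entry:
    read_count: int = 0
    write_count: int = 0
    min_read_txn: int = 2**63 - 1    # smallest transaction id among current read-lock holders

def try_write_lock(entry: Entry, version: int, txn_id: int):
    """Return (granted, new_version); a held write lock always denies, a held read
    lock approves only the highest-priority (smallest-id) requester."""
    if entry.write_count > 0:
        return False, version
    if entry.read_count > 0 and txn_id > entry.min_read_txn:
        return False, version
    entry.write_count += 1
    # Incrementing the version (cf. line 49 of pseudocode 600) "wounds" the readers:
    # transactions that read the old version will fail their validation check.
    return True, version + 1

e = Entry(read_count=2, min_read_txn=5)
assert try_write_lock(e, version=7, txn_id=9) == (False, 7)   # lower priority: denied
assert try_write_lock(e, version=7, txn_id=4) == (True, 8)    # highest priority: approved
```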
- a held read lock may approve a write lock request, while an approved write lock denies any subsequent read lock request. This is beneficial because the optimization has low overhead: it does not require tracking the transaction identifiers of all read lock holders. Yet the optimization helps increase concurrency, reduce starvation, and enforce the isolation guarantee for inter-dependent multi-sharded transactions by cutting some edges in the transaction dependency graphs so that no cycle remains, at the cost of denying the transactions associated with the cut edges.
- the version is incremented by more than one, e.g., by 10 (line 56 of pseudocode 700 in FIG. 7), when the commit request is approved.
- in the version mismatch check (e.g., lines 44-45 of pseudocode 600 in FIG. 6), when the version difference is within 10, the commit request may be retried by relocking the read set and write set, without restarting the transaction and rereading the replica value.
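- A sketch of the resulting retry decision at validation time, assuming the commit-time version step of 10 described above: a small version difference indicates that only write-lock grants raced ahead, so the commit may be retried by relocking; a larger difference indicates an approved commit changed the value.

```python
COMMIT_VERSION_STEP = 10     # step applied when a commit is approved (e.g., line 56 of FIG. 7)

def validation_action(read_version: int, current_version: int) -> str:
    """Illustrative outcome of the version mismatch check (cf. lines 44-45 of FIG. 6)."""
    diff = current_version - read_version
    if diff == 0:
        return "commit"                  # the read value is still current
    if diff < COMMIT_VERSION_STEP:
        # Only write-lock grants bumped the version, so no approved commit has changed
        # the value: relock the read set and write set and retry without rereading.
        return "retry-relock"
    return "abort-and-reread"            # an approved commit replaced the value

assert validation_action(5, 5) == "commit"
assert validation_action(5, 7) == "retry-relock"
assert validation_action(5, 15) == "abort-and-reread"
```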
- the primary replica’s value can be included in the lockRelease() message so that the receiving replicas can have a synchronized value before releasing their locks.
- a client is assumed to have a sticky session to a replica, and the client-affined replica acts as a coordinator of a transaction initiated by the client.
- the disclosed MC3-related functionalities provide the serializability guarantee to concurrent transactions amidst concurrent CRDT update requests. Such a guarantee can be provided because MC3 locking freezes all replicas of a tuple, and that enables analyzing its concurrency control from the primary replica’s viewpoint.
- in Rigorous 2PL, a held read lock denies a write lock request, and a held write lock denies a read lock request. If MC3 implemented the same locking behavior, MC3 could be viewed as a variation of a special case of Rigorous 2PL.
- a more advantageous implementation of the locking behavior for MC3 allows a held read lock to approve a write lock request whose associated transaction identifier is smaller than or equal to the minimum transaction identifier among the read lock holders.
- the above optimization still guarantees serializability because there is a strong consistency validation check at the commit request time.
- the validation check uses read set information to detect whether a read set value could have been outdated, causing a transaction commit request denial.
- for example, a first transaction may have acquired the read lock before a second transaction that then gets the write lock, passes its validation check, commits, and changes the replica value.
- when the first transaction later goes through its validation check, it receives a denial because of the version difference.
- 2PL does not have the read set information and the validation check and, therefore, requires a read lock held to deny a write lock request.
- the optimization can be interpreted as a 2PL wound-wait scheme.
- the transaction with a lower transaction identifier is considered to have a higher priority among contentious transactions.
- a write lock request can be approved for the highest-priority transaction among the contentious transactions having acquired the read lock. The approval of the write lock request will cause those contentious transactions to abort due to version mismatch during their validation check (for brevity and clarity, the algorithm in FIG. 4 - FIG. 10 describes how the wound part is implemented; the wait part is simply no-wait).
- MC3 can be configured based on a state-based CRDT model and guarantees serializability for strong consistency updates.
- the Bayou protocol is presented over an operation-based CRDT model, and serializability (or sequential consistency or linearizability, depending on one’s perspective) is guaranteed through total order broadcast (TOB).
- MC3 results in one sequence of weak and strong consistency updates relative to the CRDT states resulting from the prior updates, while the Bayou protocol results in a different sequence of weak and strong consistency updates based on the order established by TOB.
- MC3 uses locking and associated frozen CRDT states to allow an approved transaction to be serialized back in time, that is, sequencing the CRDT updates during the locking period to follow the approved transaction.
- Bayou takes a time stamp ordering approach to provide the mixed consistency guarantee while MC3 takes a 2PL approach. Both provide correct but different results.
- MC3 has some optimistic concurrency control (OCC) characteristics. First, it defers the write lock expansion phase until the validation check, when the application intends to commit the transaction, so more transactions can perform txn_read() concurrently. Second, when making write lock requests, MC3 already knows the write set tuples and can sort them before the write lock expansion phase, because the txn_write() tuples are buffered in the write set before txn_commit() is invoked. As a result, two transactions that make write lock requests on the same tuples will not deadlock, since their write sets are sorted using the same criteria.
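- A minimal sketch of the buffering and ordering described above (the Transaction class and its members are assumptions): writes are staged in the write set, and sorting the write-set keys gives every transaction the same lock-acquisition order.

```python
class Transaction:
    """Buffers txn_write() values so write locks can be requested in one canonical order."""

    def __init__(self, txn_id: int):
        self.txn_id = txn_id
        self.read_set = {}     # key -> (value, version) captured by txn_read()
        self.write_set = {}    # key -> CRDT value staged by txn_write()

    def txn_write(self, key: str, value) -> None:
        self.write_set[key] = value        # buffered; no lock is taken yet

    def write_lock_order(self):
        # Sorting by key gives every transaction the same lock-acquisition order,
        # so two transactions writing the same tuples cannot deadlock.
        return sorted(self.write_set)

t1, t2 = Transaction(1), Transaction(2)
t1.txn_write("y", 3); t1.txn_write("x", 7)
t2.txn_write("x", 1); t2.txn_write("y", 2)
assert t1.write_lock_order() == t2.write_lock_order() == ["x", "y"]
```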
- The second advantage of MC3 is that it is programmer-friendly. Conservative 2PL can avoid deadlocks and starvation better than Rigorous 2PL can, but the former requires an application to declare the read set and write set tuples at the beginning of a transaction, which results in an awkward programming model. In contrast, MC3 defers the write lock expansion phase until after txn_commit(), and by then the application has declared the read set and write set tuples through the natural usage of the transaction operation interfaces.
- MC3 can be designed for applications that take advantage of high-throughput eventual consistency updates. If eventual consistency updates were disallowed or never used, there would be no need for locking on the non-primary replicas, and some unnecessary handshakes between the primary replica and non-primary replicas could be skipped.
- the key advantage of MC3 is enabling applications to mix concurrent eventual consistency updates and strong consistency updates while ensuring that the effect of each update is applied once.
- MC3 enables mixed consistency on a distributed key-value store, and such a system exhibits the FEC behavior inferred by the Bayou theorems. While some applications may benefit from the concurrency of the FEC behavior, some may not handle it well. The latter may prefer a trade-off: removing the FEC behavior at the cost of delayed responses to the CRDT update requests amidst unfinished strong consistency update requests.
- an MC3 store can be adapted to provide such a non-FEC version.
- the FEC behavior is due to the treatment of the hash table entry’s incremental update summary.
- an ordinary key-value store get() request returns the replica’s frozen CRDT state merged with the incremental update summary.
- the order of operation requests, as observed through get(), appears to have changed.
- the store can respond to the get() request with the replica’s frozen CRDT state, without merging the incremental update summary. Also, the response to any CRDT update request, crdt_update(), is delayed until the replica is completely unlocked. In other words, while a replica is locked, any CRDT update request is blocked by the unfinished strong consistency update requests.
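- The two variants can be sketched as two get() paths plus a deferred update queue (the dictionary layout and helper names are assumptions): the FEC store merges the frozen state with the incremental update summary, while the non-FEC store returns only the frozen state and parks CRDT updates until the replica is completely unlocked.

```python
from typing import Any, Callable, List

def get_fec(replica: dict, merge: Callable[[Any, Any], Any]) -> Any:
    # FEC behavior: expose concurrent eventual-consistency updates immediately.
    if replica["locked"] and replica.get("update_summary") is not None:
        return merge(replica["frozen_value"], replica["update_summary"])
    return replica["value"]

def get_non_fec(replica: dict) -> Any:
    # Non-FEC variant: while locked, answer with the frozen CRDT state only.
    return replica["frozen_value"] if replica["locked"] else replica["value"]

def crdt_update_non_fec(replica: dict, delta: Any, pending: List[Any],
                        merge: Callable[[Any, Any], Any]) -> None:
    # Non-FEC variant: the response to a CRDT update is delayed until the replica
    # is completely unlocked by the unfinished strong consistency updates.
    if replica["locked"]:
        pending.append(delta)
    else:
        replica["value"] = merge(replica["value"], delta)
```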
- FIG. 11 is a flowchart of method 1100 suitable for updating a plurality of replicated data structures of a distributed datastore system, according to some example embodiments.
- Method 1100 includes operations 1102, 1104, 1106, 1108, 1110, 1112, 1114, 1116, 1118, 1120, and 1122.
- In some embodiments, method 1100 can be performed by a replica control module (e.g., replica control module 112 in FIG. 1, replica control module 1262 in FIG. 12, or replica control module 1365 in FIG. 13) or by a coordinator (e.g., coordinator 110 in FIG. 1, coordinator 1260 in FIG. 12, or coordinator 1360 in FIG. 13).
- method 1100 is based on the MC3-related data and event flows 300 and their corresponding three phases discussed in connection with FIG. 3: the read lock expansion phase 312, the write lock expansion phase 314, and the lock shrinkage phase 316.
- At operation 1102, at least one read request for one or more key-value tuples of replicated data structures is decoded.
- Each key-value tuple of the one or more key-value tuples includes a key and a corresponding conflict-free replicated datatype (CRDT) value.
- replicas of each key-value tuple of the one or more key-value tuples in the at least one read request are locked to obtain locked CRDT values in read-locked replicas of the one or more key-value tuples.
- the locked CRDT values of the read-locked replicas are merged into a merged CRDT value onto a read set in response to the at least one read request.
- a commit request (e.g., commit request 318 in FIG. 3) to commit a write set of at least one key-value tuple is decoded.
- the commit request includes the read set.
- a write lock of the at least one key-value tuple of the write set is activated to obtain write-locked replicas.
- the commit request is accepted when the validating and the activating are successful.
- the read-locked replicas and the write-locked replicas are unlocked after the commit request is processed resulting in unlocked replicas.
- the update summary associated with any one of the read-locked replicas and the write-locked replicas that has become an unlocked replica of the unlocked replicas is merged to a current CRDT value of the unlocked replica.
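- From a client's perspective, the operations above correspond to the natural use of the transaction interfaces; the following toy session is purely illustrative (the Datastore class and its single-node behavior are assumptions, not the disclosed system).

```python
# Toy single-node stand-in for the transaction interface; it collapses replication,
# locking, and versioning so only the operation sequence of method 1100 is visible.
class Datastore:
    def __init__(self):
        self.data = {"balance": 100}        # key -> CRDT value (a counter here)

    def txn_begin(self):
        return {"read_set": {}, "write_set": {}}

    def txn_read(self, txn, key):           # would read-lock replicas and merge their values
        txn["read_set"][key] = self.data[key]
        return self.data[key]

    def txn_write(self, txn, key, value):   # buffered until commit
        txn["write_set"][key] = value

    def txn_commit(self, txn):              # validate the read set, write-lock, apply, unlock
        for key, seen in txn["read_set"].items():
            if self.data[key] != seen:
                return False                # strong consistency validation failed
        self.data.update(txn["write_set"])
        return True

store = Datastore()
txn = store.txn_begin()
balance = store.txn_read(txn, "balance")
store.txn_write(txn, "balance", balance - 30)
print("committed:", store.txn_commit(txn))  # committed: True
```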
- a read request for the plurality of replicated data structures is decoded.
- the read request is executed at operation B in FIG. 3.
- a read set is generated in response to the read request.
- the read set includes a merged conflict-free replicated data type (CRDT) value based on a plurality of CRDT values associated with the plurality of replicated data structures (e.g., replicas 306 and 308).
- the generation of the read set is performed in connection with operations C-H in FIG. 3.
- a commit request to commit a write set of CRDT values to the plurality of replicated data structures based on the read set is decoded.
- commit request 318 is executed at operation J in FIG. 3.
- a subset of the plurality of CRDT values associated with a subset of the plurality of replicated data structures is validated using the read set and the write set. For example, the validation is performed in connection with the write lock expansion phase 314 and operations K-O in FIG. 3.
- the commit request is processed based on the result of the validation. For example, processing of the commit request is performed during the lock shrinkage phase 316 and operations O-V in FIG. 3.
- the plurality of replicated data structures includes a plurality of key-value tuples.
- Each key-value tuple of the plurality of key-value tuples includes a key and a corresponding CRDT value of the plurality of CRDT values.
- generating the read set further includes forwarding the read request to the subset of the plurality of replicated data structures (e.g., at operation C in FIG. 3).
- the subset includes at least one primary replica of the data structures (e.g., X primary replica 306).
- generating the read set further includes generating, by the at least one primary replica, one or more read locks based on the read request, and forwarding the one or more read locks to a remaining subset of the plurality of replicated data structures (e.g., X replica 308).
- [0140] In some aspects, generating the read set further includes retrieving one or more of the plurality of CRDT values from the remaining subset of the plurality of replicated data structures after the one or more read locks are initiated, and merging the one or more of the plurality of CRDT values retrieved from the remaining subset of the plurality of replicated data structures with a CRDT value of the at least one primary replica to generate the merged CRDT value (e.g., operations E-G in FIG. 3).
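- Merging the retrieved values relies only on the CRDT's join operation; for example, with a state-based grow-only counter (used here purely as an illustration), the merged read-set value is the element-wise maximum of the replica states.

```python
from functools import reduce

def g_counter_merge(a: dict, b: dict) -> dict:
    """Join of two state-based G-Counter states: the per-node maximum."""
    return {node: max(a.get(node, 0), b.get(node, 0)) for node in a.keys() | b.keys()}

def g_counter_value(state: dict) -> int:
    return sum(state.values())

# CRDT states retrieved from the primary replica and from the remaining replicas.
primary_state = {"n1": 4, "n2": 1}
other_states = [{"n1": 3, "n2": 2}, {"n1": 4, "n3": 5}]

merged = reduce(g_counter_merge, other_states, primary_state)
assert merged == {"n1": 4, "n2": 2, "n3": 5}
assert g_counter_value(merged) == 11      # the merged CRDT value placed in the read set
```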
- one or more hash table entries are generated in a hash table (e.g., hash table 140).
- the hash table is stored in a storage node of the distributed datastore system.
- the storage node stores the subset of the plurality of replicated data structures.
- a read lock reference count 206 is incremented for each of the one or more hash table entries.
- the read lock reference count 206 indicates a count of read locks initiated for a corresponding one of the subset of the plurality of replicated data structures.
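- A sketch of the per-key hash table kept on the storage node, assuming a simple entry layout: a read lock request creates the entry on demand and increments its read lock reference count, while an existing write lock denies the request.

```python
from dataclasses import dataclass
from typing import Dict

@dataclass
class HashEntry:
    read_lock_count: int = 0     # read lock reference count (cf. 206)
    write_lock_count: int = 0

def acquire_read_lock(table: Dict[str, HashEntry], key: str) -> bool:
    entry = table.setdefault(key, HashEntry())   # create the entry if necessary
    if entry.write_lock_count > 0:
        return False                             # already write-locked by another transaction
    entry.read_lock_count += 1
    return True

table: Dict[str, HashEntry] = {}
assert acquire_read_lock(table, "x") and table["x"].read_lock_count == 1
```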
- a transaction write call is executed on the plurality of replicated data structures (e.g., at operation I in FIG. 3).
- the transaction write call is executed before the commit request and is associated with the write set of CRDT values.
- the subset of the plurality of replicated data structures includes primary replicas of the data structures (e.g., primary replicas 306 and 310).
- Validating the subset of the plurality of CRDT values further includes validating, at a first primary replica of the primary replicas, that a value of the read set is equal to a corresponding CRDT value of the subset of the plurality of CRDT values that is associated with the first primary replica.
- a write lock of the corresponding CRDT value associated with the first primary replica is initiated based on a successful validation that the value of the read set is equal to the corresponding CRDT value of the subset.
- a first validation result of the write lock of the corresponding CRDT value associated with the first primary replica is generated.
- At least a second validation result of a write lock of a corresponding CRDT value associated with a second primary replica of the primary replicas is generated.
- the subset of the plurality of CRDT values associated with the subset of the plurality of replicated data structures is validated based on successful validations indicated by the first validation result and at least the second validation result.
- a CRDT value of the subset of the plurality of CRDT values associated with the subset of the plurality of replicated data structures is replaced with a CRDT value of the write set. A lock release and the CRDT value of the write set are forwarded to at least one replicated data structure of a remaining set of replicated data structures. A CRDT value of the at least one replicated data structure is replaced with the CRDT value of the write set. A write lock at the at least one replicated data structure is released based on the lock release.
- FIG. 12 is a block diagram illustrating a representative software architecture 1200, which may be used in conjunction with various device hardware described herein, according to some example embodiments.
- FIG. 12 is merely a non-limiting example of software architecture 1202 and it will be appreciated that many other architectures may be implemented to facilitate the functionality described herein.
- the software architecture 1202 may be executed on hardware such as computing device 1300 of FIG. 13 that includes, among other things, processor 1305, memory 1310, storage 1315 and 1320, and I/O components (or interfaces) 1325 and 1330.
- a representative hardware layer 1204 is illustrated and can represent, for example, the computing device 1300 of FIG. 13.
- the representative hardware layer 1204 comprises one or more processing units 1206 having associated executable instructions 1208.
- Executable instructions 1208 represent the executable instructions of the software architecture 1202, including the implementation of the methods, modules, and so forth of FIGS. 1-11.
- Hardware layer 1204 also includes memory and/or storage modules 1210, which also have executable instructions 1208.
- Hardware layer 1204 may also comprise other hardware 1212, which represents any other hardware of the hardware layer 1204, such as the other hardware illustrated as part of computing device 1300.
- the software architecture 1202 may be conceptualized as a stack of layers where each layer provides particular functionality.
- the software architecture 1202 may include layers such as an operating system 1214, libraries 1216, frameworks/middleware 1218, applications 1220, and presentation layer 1244.
- the applications 1220 and/or other components within the layers may invoke application programming interface (API) calls 1224 through the software stack and receive a response, returned values, and so forth illustrated as messages 1226 in response to the API calls 1224.
- the layers illustrated in FIG. 12 are representative in nature and not all software architectures 1202 have all layers. For example, some mobile or special-purpose operating systems may not provide frameworks/middleware 1218, while others may provide such a layer. Other software architectures may include additional or different layers.
- the operating system 1214 may manage hardware resources and provide common services.
- the operating system 1214 may include, for example, a kernel 1228, services 1230, and drivers 1232.
- the kernel 1228 may act as an abstraction layer between the hardware and the other software layers. For example, kernel 1228 may be responsible for memory management, processor management (e.g., scheduling), component management, networking, security settings, and so on.
- Services 1230 may provide other common services for the other software layers.
- the drivers 1232 may be responsible for controlling or interfacing with the underlying hardware.
- the drivers 1232 may include display drivers, camera drivers, Bluetooth® drivers, flash memory drivers, serial communication drivers (e.g., Universal Serial Bus (USB) drivers), Wi-Fi® drivers, audio drivers, power management drivers, and so forth, depending on the hardware configuration.
- the libraries 1216 may provide a common infrastructure that may be utilized by the applications 1220 and/or other components and/or layers.
- the libraries 1216 typically provide functionality that allows other software modules to perform tasks more easily than to interface directly with the underlying operating system 1214 functionality (e.g., kernel 1228, services 1230, and/or drivers 1232).
- the libraries 1216 may include system libraries 1234 (e.g., C standard library) that may provide functions such as memory allocation functions, string manipulation functions, mathematic functions, and the like.
- libraries 1216 may include API libraries 1236 such as media libraries (e.g., libraries to support presentation and manipulation of various media formats such as MPEG4, H.264, MP3, AAC, AMR, JPG, PNG), graphics libraries (e.g., an OpenGL framework that may be used to render 2D and 3D graphic content on a display), datastore libraries (e.g., SQLite that may provide various relational datastore functions), web libraries (e.g., WebKit that may provide web browsing functionality), and the like.
- the libraries 1216 may also include a wide variety of other libraries 1238 to provide many other APIs to the applications 1220 and other software components/modules.
- the frameworks/middleware 1218 may provide a higher-level common infrastructure that may be utilized by the applications 1220 and/or other software components/modules.
- the frameworks/middleware 1218 may provide various graphical user interface (GUI) functions, high-level resource management, high-level location services, and so forth.
- the frameworks/middleware 1218 may provide a broad spectrum of other APIs that may be utilized by the applications 1220 and/or other software components/modules, some of which may be specific to a particular operating system 1214 or platform.
- the applications 1220 include built-in applications 1240 and/or third-party applications 1242.
- built-in applications 1240 may include, but are not limited to, a contacts application, a browser application, a book reader application, a location application, a media application, a messaging application, and/or a game application.
- Third-party applications 1242 may include any of the built-in applications 1240 as well as a broad assortment of other applications.
- the third-party application 1242 (e.g., an application developed using the Android™ or iOS™ software development kit (SDK) by an entity other than the vendor of the particular platform) may be mobile software running on a mobile operating system such as iOS™, Android™, Windows® Phone, or other mobile operating systems.
- the third-party application 1242 may invoke the API calls 1224 provided by the mobile operating system such as operating system 1214 to facilitate the functionality described herein.
- applications 1220 include the coordinator 1260 and replica control module 1262 used in connection with the disclosed MC3 functionalities.
- Coordinator 1260 and replica control module 1262 can be the same as (and perform the same functionalities as) coordinator 110 and replica control module 112 discussed in connection with FIG. 1 - FIG. 11.
- the applications 1220 may utilize built-in operating system functions (e.g., kernel 1228, services 1230, and/or drivers 1232), libraries (e.g., system libraries 1234, API libraries 1236, and other libraries 1238), and frameworks/middleware 1218 to create user interfaces to interact with users of the system.
- interactions with a user may occur through a presentation layer, such as presentation layer 1244.
- the application/module “logic” can be separated from the aspects of the application/module that interact with a user.
- Some software architectures utilize virtual machines. In the example of FIG. 12, this is illustrated by virtual machine 1248.
- a virtual machine creates a software environment where applications/modules can execute as if they were executing on a hardware machine (such as the computing device 1300 of FIG. 13, for example).
- a virtual machine 1248 is hosted by a host operating system (operating system 1214 in FIG. 12) and typically, although not always, has a virtual machine monitor 1246, which manages the operation of the virtual machine 1248 as well as the interface with the host operating system (i.e., operating system 1214).
- a software architecture executes within the virtual machine 1248 and includes layers such as an operating system 1250, libraries 1252, frameworks/middleware 1254, applications 1256, and/or a presentation layer 1258. These layers of the software architecture executing within the virtual machine 1248 can be the same as the corresponding layers previously described or may be different.
- FIG. 13 is a block diagram illustrating circuitry for a device that implements algorithms and performs methods, according to some example embodiments. All components need not be used in various embodiments. For example, clients, servers, and cloud-based network devices may each use a different set of components, or in the case of servers, larger storage devices.
- One example computing device in the form of a computer 1300 may include a processor 1305, memory 1310, removable storage 1315, non-removable storage 1320, input interface 1325, output interface 1330, and communication interface 1335, all connected by a bus 1340.
- Although the example computing device is illustrated and described as the computer 1300, the computing device may be in different forms in different embodiments.
- Memory 1310 may include volatile memory 1345 and non-volatile memory 1350 and may store a program 1355.
- the computer 1300 may include - or have access to a computing environment that includes - a variety of computer-readable media, such as the volatile memory 1345, the non-volatile memory 1350, the removable storage 1315, and the non-removable storage 1320.
- Computer storage includes random-access memory (RAM), read-only memory (ROM), erasable programmable read-only memory (EPROM) and electrically erasable programmable read-only memory (EEPROM), flash memory or other memory technologies, compact disk read-only memory (CD ROM), digital versatile disks (DVD) or other optical disk storage, magnetic cassettes, magnetic tape, magnetic disk storage or other magnetic storage devices, or any other medium capable of storing computer-readable instructions.
- Computer-readable instructions stored on a computer-readable medium are executable by the processor 1305 of the computer 1300.
- a hard drive, CD-ROM, and RAM are some examples of articles including a non-transitory computer-readable medium such as a storage device.
- the terms “computer-readable medium” and “storage device” do not include carrier waves to the extent that carrier waves are deemed too transitory.
- “Computer-readable non-transitory media” includes all types of computer-readable media, including magnetic storage media, optical storage media, flash media, and solid-state storage media. It should be understood that software can be installed on and sold with a computer.
- the software can be obtained and loaded into the computer, including obtaining the software through a physical medium or distribution system, including, for example, from a server owned by the software creator or from a server not owned but used by the software creator.
- the software can be stored on a server for distribution over the Internet, for example.
- the terms “computer-readable medium” and “machine-readable medium” are interchangeable.
- the program 1355 may utilize modules discussed herein, such as coordinator 1360 and replica control module 1365, which can be the same as (and perform the same functionalities as) the coordinator 110 and replica control module 112 discussed in connection with FIG. 1 - FIG. 11.
- Any one or more of the modules described herein may be implemented using hardware (e.g., a processor of a machine, an application-specific integrated circuit (ASIC), field-programmable gate array (FPGA), or any suitable combination thereof). Moreover, any two or more of these modules may be combined into a single module, and the functions described herein for a single module may be subdivided among multiple modules. Furthermore, according to various example embodiments, modules described herein as being implemented within a single machine, datastore, or device may be distributed across multiple machines, datastores, or devices.
- the disclosed functionalities can be performed by one or more separate (or dedicated) modules included in the replica control module 1365 and integrated as a single module, performing the corresponding functions of the integrated module.
- software including one or more computer-executable instructions that facilitate processing and operations as described above concerning any one or all of the steps of the disclosure can be installed in and sold with one or more computing devices consistent with the disclosure.
- the software can be obtained and loaded into one or more computing devices, including obtaining software through a physical medium or distribution system, including, for example, from a server owned by the software creator or from a server not owned but used by the software creator.
- the software can be stored on a server for distribution over the Internet, for example.
- the components of the illustrative devices, systems, and methods employed in accordance with the illustrated embodiments can be implemented, at least in part, in digital electronic circuitry, analog electronic circuitry, or computer hardware, firmware, software, or combinations of them. These components can be implemented, for example, as a computer program product such as a computer program, program code, or computer instructions tangibly embodied in an information carrier, or a machine-readable storage device, for execution by, or to control the operation of, data processing apparatus such as a programmable processor, a computer, or multiple computers.
- a computer program can be written in any form of programming language, including compiled or interpreted languages, and it can be deployed in any form, including as a stand-alone program or as a module, component, subroutine, or another unit suitable for use in a computing environment.
- a computer program can be deployed to be executed on one computer or multiple computers at one site or distributed across multiple sites and interconnected by a communication network.
- functional programs, codes, and code segments for accomplishing the techniques described herein can be easily construed as within the scope of the claims by programmers skilled in the art to which the techniques described herein pertain.
- Method steps associated with the illustrative embodiments can be performed by one or more programmable processors executing a computer program, code, or instructions to perform functions (e.g., by operating on input data and/or generating an output). Method steps can also be performed, and the apparatus for performing the methods can be implemented as, special-purpose logic circuitry, e.g., an FPGA (field programmable gate array) or an ASIC (application-specific integrated circuit), for example.
- a general-purpose processor may be a microprocessor, but in the alternative, the processor may be any conventional processor, controller, microcontroller, or state machine.
- a processor may also be implemented as a combination of computing devices, e.g., a combination of a DSP and a microprocessor, a plurality of microprocessors, one or more microprocessors in conjunction with a DSP core, or any other such configuration.
- processors suitable for the execution of a computer program include, by way of example, both general and special purpose microprocessors, and any one or more processors of any kind of digital computer.
- a processor will receive instructions and data from a read-only memory or a random-access memory, or both.
- the required elements of a computer are a processor for executing instructions and one or more memory devices for storing instructions and data.
- a computer will also include, or be operatively coupled to receive data from or transfer data to, or both, one or more mass storage devices for storing data, e.g., magnetic, magneto-optical disks, or optical disks.
- Information carriers suitable for embodying computer program instructions and data include all forms of non-volatile memory, including by way of example, semiconductor memory devices, e.g., electrically programmable read-only memory or ROM (EPROM), electrically erasable programmable ROM (EEPROM), flash memory devices, and data storage disks (e.g., magnetic disks, internal hard disks, or removable disks, magneto-optical disks, and CD-ROM and DVD-ROM disks).
- The term “machine-readable medium” means a device able to store instructions and data temporarily or permanently and may include, but is not limited to, random-access memory (RAM), read-only memory (ROM), buffer memory, flash memory, optical media, magnetic media, cache memory, other types of storage (e.g., Electrically Erasable Programmable Read-Only Memory (EEPROM)), and/or any suitable combination thereof.
- The term “machine-readable medium” should be taken to include a single medium or multiple media (e.g., a centralized or distributed datastore, or associated caches and servers) able to store processor instructions.
- The term “machine-readable medium” shall also be taken to include any medium (or a combination of multiple media) that is capable of storing instructions for execution by one or more processors 1305, such that the instructions, when executed by one or more processors 1305, cause the one or more processors 1305 to perform any one or more of the methodologies described herein. Accordingly, a “machine-readable medium” refers to a single storage apparatus or device, as well as “cloud-based” storage systems or storage networks that include multiple storage apparatus or devices. The term “machine-readable medium” as used herein excludes signals per se.
Abstract
A computer-implemented method for updating a plurality of replicated data structures of a distributed datastore system includes decoding at least one read request for one or more key-value tuples of the replicated data structures. Each key-value tuple of the one or more key-value tuples includes a key and a corresponding conflict-free replicated data type (CRDT) value. Replicas of each key-value tuple of the one or more key-value tuples in the at least one read request are locked to obtain locked CRDT values in read-locked replicas of the one or more key-value tuples. The locked CRDT values of the read-locked replicas are merged into a merged CRDT value onto a read set in response to the at least one read request. An update summary is maintained on each of the read-locked replicas.
Description
MIXED-CONSISTENCY CONCURRENCY CONTROL (MC3) TECHNIQUES
TECHNICAL FIELD
- [0001] The present disclosure is related to storing data in a distributed datastore, and in particular to systems and methods for mixed-consistency concurrency control (MC3) configurations for a multi-replica distributed key-value store to enable mixing eventual and strong consistency.
BACKGROUND
[0002] A distributed key-value store is a key aspect of a distributed system. By distributing large data sets over multiple servers, applications may benefit from increased processing performance. In datastore systems and transaction processing, concurrency control (CC) schemes interleave read/write requests from multiple clients simultaneously, giving the illusion that each read/write transaction has exclusive access to the data. Distributed concurrency control refers to the concurrency control of a datastore distributed over a communication network. When leveraging data replication, not only is fault tolerance improved but also write performance can be improved using concurrency control techniques.
SUMMARY
[0003] Various examples are now described to introduce a selection of concepts in a simplified form that is further described below in the detailed description. The Summary is not intended to identify key or essential features of the claimed subject matter, nor is it intended to be used to limit the scope of the claimed subject matter.
[0004] According to a first aspect of the present disclosure, there is provided a computer-implemented method for updating a plurality of replicated data structures of a distributed datastore system. The method includes decoding at least one read request for one or more key-value tuples of the replicated data structures. Each key-value tuple of the one or more of key-value tuples includes a key and a corresponding conflict-free replicated data type (CRDT) value. The method further includes locking replicas of each key-value tuple of the one or more of key- value tuples in the at least one read request to obtain locked CRDT
values in read-locked replicas of the one or more key-value tuples. The method further includes merging the locked CRDT values of the read-locked replicas into a merged CRDT value onto a read set in response to the at least one read request. The method further includes maintaining an update summary on each of the read-locked replicas. The method further includes decoding a commit request to commit a write set of at least one key-value tuple, the commit request including the read set. The method further includes validating whether the read set in the commit request is unchanged from current CRDT values of the read- locked replicas. The method further includes activating a write lock of the at least one key-value tuple of the write set resulting in write-locked replicas. The method further includes maintaining an update summary on each of the write- locked replicas. The method further includes accepting the commit request when the validating and the activating are successful. The method further includes unlocking the read-locked replicas and the write-locked replicas after the commit request is processed resulting in unlocked replicas. The method further includes merging the update summary associated with any one of the read-locked replicas and the write-locked replicas that has become an unlocked replica of the unlocked replicas to a current CRDT value of the unlocked replica.
[0005] In a first implementation form of the method according to the first aspect as such, the method further includes updating a current CRDT value of a replica of a key-value tuple in the distributed datastore system according to one eventual consistency update when the replica is not locked. The method further includes updating an update summary of the replica of the key-value tuple in the distributed datastore system according to the one eventual consistency update when the replica is either read-locked or write-locked.
[0006] In a second implementation form of the method according to the first aspect as such or any preceding implementation form of the first aspect, the accepting of the commit request further includes updating a current CRDT value of each of the write-locked replicas of at least one key-value tuple in the write set by a CRDT value of the at least one key-value tuple in the write set.
- [0007] In a third implementation form of the method according to the first aspect as such or any preceding implementation form of the first aspect, the method further includes failing the locking of at least one of the replicas of each key-value tuple of the one or more key-value tuples in the at least one read request when the at least one of the replicas associated with the read request is locked as a write set of another transaction. The method further includes failing the locking of at least one of the replicas of the at least one key-value tuple of the write set when the at least one of the replicas associated with the commit request is already locked as a write set of another transaction.
[0008] In a fourth implementation form of the method according to the first aspect as such or any preceding implementation form of the first aspect, the method further includes ignoring a gossip of a CRDT value to a replica of a keyvalue tuple in the distributed datastore system from another replica if the replica is locked. The method further includes updating a current CRDT value of the replica of a key-value tuple in the distributed datastore system with the CRDT value in the gossip from the another replica if the replica is unlocked.
[0009] In a fifth implementation form of the method according to the first aspect as such or any preceding implementation form of the first aspect, an unlocked replica of the one or more key-value tuples is a replica that is no longer locked as a read set or a write set by at least one data access transaction.
[0010] In a sixth implementation form of the method according to the first aspect as such or any preceding implementation form of the first aspect, the method further includes incrementing a version number of the at least one keyvalue tuple of the write set when the commit request is accepted.
[0011] In a seventh implementation form of the method according to the first aspect as such or any preceding implementation form of the first aspect, the method further includes denying the commit request when one of the validating or the activating is unsuccessful.
[0012] According to a second aspect of the present disclosure, there is provided a node for updating a plurality of replicated data structures of a distributed datastore system. The node includes a memory storing instructions and at least one processor in communication with the memory. The at least one processor is configured, upon execution of the instructions, to perform operations including decoding at least one read request for one or more keyvalue tuples of the replicated data structures. Each key-value tuple of the one or
more of key- value tuples includes a key and a corresponding conflict-free replicated datatype (CRDT) value. The operations further include locking replicas of each key-value tuple of the one or more of key-value tuples in the at least one read request to obtain locked CRDT values in read-locked replicas of the one or more key-value tuples. The operations further include merging the locked CRDT values of the read-locked replicas into a merged CRDT value onto a read set in response to the at least one read request. The operations further include maintaining an update summary on each of the read-locked replicas. The operations further include decoding a commit request to commit a write set of at least one key-value tuple, the commit request including the read set. The operations further include validating whether the read set in the commit request is unchanged from current CRDT values of the read-locked replicas. The operations further include activating a write lock of the at least one key-value tuple of the write set resulting in write-locked replicas. The operations further include maintaining an update summary on each of the write-locked replicas. The operations further include accepting the commit request when the validating and the activating are successful. The operations further include unlocking the read-locked replicas and the write-locked replicas after the commit request is processed resulting in unlocked replicas. The operations further include merging the update summary associated with any one of the read-locked replicas and the write-locked replicas that has become an unlocked replica of the unlocked replicas to a current CRDT value of the unlocked replica.
[0013] In a first implementation form of the node according to the second aspect as such, the operations further include updating a current CRDT value of a replica of a key-value tuple in the distributed datastore system according to one eventual consistency update when the replica is not locked. The operations further include updating an update summary of the replica of the key-value tuple in the distributed datastore system according to the one eventual consistency update when the replica is either read-locked or write-locked.
- [0014] In a second implementation form of the node according to the second aspect as such or any preceding implementation form of the second aspect, the operations for accepting the commit request include updating a current CRDT value of each of the write-locked replicas of at least one key-value tuple in the write set by a CRDT value of the at least one key-value tuple in the write set.
[0015] In a third implementation form of the node according to the second aspect as such or any preceding implementation form of the second aspect, the operations further include failing the locking of at least one of the replicas of each key-value tuple of the one or more of key-value tuples in the at least one read request when the at least one of the replicas associated with the read request is locked as a write set of another transaction. The operations further include failing the locking of at least one of the replicas of the at least one key-value tuple of the write set when the at least one of the replicas associated with the commit request is already locked as a write set of another transaction.
[0016] In a fourth implementation form of the node according to the second aspect as such or any preceding implementation form of the second aspect, the operations further include ignoring a gossip of a CRDT value to a replica of a key-value tuple in the distributed datastore system from another replica if the replica is locked. The operations further include updating a current CRDT value of the replica of a key-value tuple in the distributed datastore system with the CRDT value in the gossip from the another replica if the replica is unlocked.
[0017] In a fifth implementation form of the node according to the second aspect as such or any preceding implementation form of the second aspect, an unlocked replica of the one or more key-value tuples is a replica that is no longer locked as a read set or a write set by at least one data access transaction.
[0018] In a sixth implementation form of the node according to the second aspect as such or any preceding implementation form of the second aspect, the operations further include incrementing a version number of the at least one key-value tuple of the write set when the commit request is accepted.
- [0019] In a seventh implementation form of the node according to the second aspect as such or any preceding implementation form of the second aspect, the operations further include denying the commit request when one of the validating or the activating is unsuccessful.
[0020] According to a third aspect of the present disclosure, there is provided a non-transitory computer-readable medium storing instructions for updating a plurality of replicated data structures of a distributed datastore system. The instructions when executed by one or more processors of a node, cause the one or more processors to perform operations including decoding at least one read request for one or more key-value tuples of the replicated data structures. Each key-value tuple of the one or more of key-value tuples comprising a key and a corresponding conflict-free replicated data type (CRDT) value. The operations further include locking replicas of each key-value tuple of the one or more of key-value tuples in the at least one read request to obtain locked CRDT values in read-locked replicas of the one or more key-value tuples. The operations further include merging the locked CRDT values of the read- locked replicas into a merged CRDT value onto a read set in response to the at least one read request. The operations further include maintaining an update summary on each of the read-locked replicas. The operations further include decoding a commit request to commit a write set of at least one key-value tuple, the commit request including the read set. The operations further include validating whether the read set in the commit request is unchanged from current CRDT values of the read-locked replicas. The operations further include activating a write lock of the at least one key-value tuple of the write set to obtain write-locked replicas. The operations further include maintaining an update summary on each of the write-locked replicas. The operations further include accepting the commit request when the validating and the activating are successful. The operations further include unlocking the read-locked replicas and the write-locked replicas after the commit request is processed resulting in unlocked replicas. The operations further include merging the update summary associated with any one of the read-locked replicas and the write-locked replicas that has become an unlocked replica of the unlocked replicas to a current CRDT value of the unlocked replica.
- [0021] In a first implementation form of the non-transitory computer-readable medium according to the third aspect as such, the operations further include updating a current CRDT value of a replica of a key-value tuple in the distributed datastore system according to one eventual consistency update when the replica is not locked. The operations further include updating an update summary of the replica of the key-value tuple in the distributed datastore system according to the one eventual consistency update when the replica is either read-locked or write-locked.
[0022] In a second implementation form of the non-transitory computer- readable medium according to the third aspect as such or any preceding implementation form of the third aspect, the operations for accepting the commit request further include updating a current CRDT value of each of the write- locked replicas of at least one key-value tuple in the write set by a CRDT value of the at least one key-value tuple in the write set.
[0023] In a third implementation form of the non-transitory computer- readable medium according to the third aspect as such or any preceding implementation form of the third aspect, the operations further include failing the locking of at least one of the replicas of each key-value tuple of the one or more of key-value tuples in the at least one read request when the at least one of the replicas associated with the read request is locked as a write set of another transaction. The operations further include failing the locking of at least one of the replicas of the at least one key-value tuple of the write set when the at least one of the replicas associated with the commit request is already locked as a write set of another transaction.
[0024] In a fourth implementation form of the non-transitory computer- readable medium according to the third aspect as such or any preceding implementation form of the third aspect, the operations further include ignoring a gossip of a CRDT value to a replica of a key-value tuple in the distributed datastore system from another replica if the replica is locked. The operations further include updating a current CRDT value of the replica of a key-value tuple in the distributed datastore system with the CRDT value in the gossip from the another replica if the replica is unlocked.
- [0025] In a fifth implementation form of the non-transitory computer-readable medium according to the third aspect as such or any preceding implementation form of the third aspect, an unlocked replica of the one or more key-value tuples is a replica that is no longer locked as a read set or a write set by at least one data access transaction.
[0026] In a sixth implementation form of the non-transitory computer- readable medium according to the third aspect as such or any preceding implementation form of the third aspect, the operations further include incrementing a version number of the at least one key-value tuple of the write set when the commit request is accepted.
[0027] In a seventh implementation form of the non-transitory computer- readable medium according to the third aspect as such or any preceding implementation form of the third aspect, the operations further include denying the commit request when one of the validating or the activating is unsuccessful.
[0028] According to a fourth aspect of the present disclosure, there is provided a system for updating a plurality of replicated data structures of a distributed datastore system. The system includes means for decoding at least one read request for one or more key-value tuples of the replicated data structures. Each key-value tuple of the one or more of key-value tuples includes a key and a corresponding conflict-free replicated data type (CRDT) value. The system further includes means for locking replicas of each key-value tuple of the one or more of key-value tuples in the at least one read request to obtain locked CRDT values in read-locked replicas of the one or more key-value tuples. The system further includes means for merging the locked CRDT values of the read- locked replicas into a merged CRDT value onto a read set in response to the at least one read request. The system further includes means for maintaining an update summary on each of the read-locked replicas. The system further includes means for decoding a commit request to commit a write set of at least one key-value tuple, the commit request including the read set. The system further includes means for validating whether the read set in the commit request is unchanged from current CRDT values of the read-locked replicas. The system further includes means for activating a write lock of the at least one key-value tuple of the write set to obtain write-locked replicas. The system further includes means for maintaining an update summary on each of the write-locked replicas. The system further includes means for accepting the commit request when the validating and the activating are successful. The system further
includes means for unlocking the read-locked replicas and the write-locked replicas after the commit request is processed resulting in unlocked replicas. The system further includes means for merging the update summary associated with any one of the read-locked replicas and the write-locked replicas that has become an unlocked replica of the unlocked replicas to a current CRDT value of the unlocked replica.
- [0029] Any one of the foregoing examples may be combined with any one or more of the other foregoing examples to create a new embodiment within the scope of the present disclosure.
BRIEF DESCRIPTION OF THE DRAWINGS
[0030] In the drawings, which are not necessarily drawn to scale, like numerals may describe similar components in different views. The drawings illustrate generally, by way of example, but not by way of limitation, various embodiments discussed in the present document.
[0031] FIG. 1 illustrates a distributed datastore system configured with a storage node management module managing storage nodes, which is capable of performing concurrency control and can be used in connection with some example embodiments.
[0032] FIG. 2 is a diagram of storage nodes of the distributed datastore system of FIG. 1 storing data structure replicas and hash tables, which can be used in connection with some example embodiments.
- [0033] FIG. 3 is a diagram of MC3-related data and event flows performed in connection with concurrency control in a distributed datastore system, which can be used in connection with some example embodiments.
[0034] FIG. 4 - FIG. 10 illustrate example pseudocode associated with MC3-related functionalities performed in connection with concurrency control in a distributed datastore system, which can be used in connection with some example embodiments.
[0035] FIG. 11 is a flowchart of a method suitable for updating a plurality of replicated data structures of a distributed datastore system, according to some example embodiments.
[0036] FIG. 12 is a block diagram illustrating a representative software architecture, which may be used in conjunction with various device hardware described herein, according to some example embodiments.
[0037] FIG. 13 is a block diagram illustrating circuitry for a device that implements algorithms and performs methods, according to some example embodiments.
DETAILED DESCRIPTION
[0038] It should be understood at the outset that although an illustrative implementation of one or more embodiments is provided below, the disclosed systems and/or methods described with respect to FIGS. 1-13 may be implemented using any number of techniques, whether currently known or not yet in existence. The disclosure should in no way be limited to the illustrative implementations, drawings, and techniques illustrated below, including the exemplary designs and implementations illustrated and described herein, but may be modified within the scope of the appended claims along with their full scope of equivalents.
[0039] In the following description, reference is made to the accompanying drawings that form a part hereof, and which are shown, by way of illustration, specific embodiments which may be practiced. These embodiments are described in sufficient detail to enable those skilled in the art to practice the inventive subject matter, and it is to be understood that other embodiments may be utilized, and that structural, logical, and electrical changes may be made without departing from the scope of the present disclosure. The following description of example embodiments is, therefore, not to be taken in a limiting sense, and the scope of the present disclosure is defined by the appended claims.
[0040] The functions or algorithms described herein may be implemented in software in one embodiment. The software may consist of computer-executable instructions stored on computer-readable media or a computer-readable storage device such as one or more non-transitory memories or other types of hardware-based storage devices, either local or networked. Further, such functions correspond to modules, which may be software, hardware, firmware, or any combination thereof. Multiple functions may be
performed in one or more modules as desired, and the embodiments described are merely examples. The software may be executed on a digital signal processor, ASIC, microprocessor, or other types of processor operating on a computer system, such as a personal computer, server, or another computer system, turning such computer system into a specifically programmed machine.
[0041] The functionality can be configured to operate using, for instance, software, hardware, firmware, or the like. For example, the phrase “configured to” can refer to a logic circuit structure of a hardware element that is to implement the associated functionality. The phrase “configured to” can also refer to a logic circuit structure of a hardware element that is to implement the coding design of associated functionality of firmware or software. The term “module” refers to a structural element that can be implemented using any suitable hardware (e.g., a processor, among others), software (e.g., an application, among others), firmware, or any combination of hardware, software, and firmware. The term “logic” encompasses any functionality for performing a task. For instance, each operation illustrated in the flowcharts corresponds to logic for performing that operation. An operation can be performed using software, hardware, firmware, or the like. The terms “component,” “system,” and the like may refer to computer-related entities, hardware, and software in execution, firmware, or a combination thereof. A component may be a process running on a processor, an object, an executable, a program, a function, a subroutine, a computer, or a combination of software and hardware. The term “processor” may refer to a hardware component, such as a processing unit of a computer system.
[0042] Furthermore, the claimed subject matter may be implemented as a method, apparatus, or article of manufacture using standard programming and engineering techniques to produce software, firmware, hardware, or any combination thereof to control a computing device to implement the disclosed subject matter. The term “article of manufacture,” as used herein is intended to encompass a computer program accessible from any computer-readable storage device or media. Computer-readable storage media can include but are not limited to, magnetic storage devices, e.g., hard disk, floppy disk, magnetic strips, optical disk, compact disk (CD), digital versatile disk (DVD), smart cards, and
flash memory devices, among others. In contrast, computer-readable media, i.e., not storage media, may additionally include communication media such as transmission media for wireless signals and the like.
[0043] In some aspects, when leveraging data replication, not only is fault tolerance improved but also write performance can be improved with concurrent, coordination-free writers on conflict-free replicated data types (CRDTs).
[0044] For example, in a Not-Only-Structured-Query-Language (NoSQL) data store, the data replicas eventually converge to the same state after the stream of update requests ceases. That is, such a store guarantees eventual consistency. Eventual consistency stores often use CRDTs, which are a specific type of state-based object that is guaranteed to achieve eventual consistency. Each CRDT has a specific update function that is commutative, associative, and idempotent and updates its CRDT state in a monotonically non-decreasing fashion. Due to these features, a CRDT update can be committed to a replica without any performance-limiting coordination among the replicas.
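By way of a non-limiting illustration (not part of the disclosed pseudocode), the following Python sketch shows a minimal state-based CRDT, a grow-only set, whose union-based update and merge functions are commutative, associative, and idempotent and move the state in a monotonically non-decreasing fashion. The class and method names are illustrative assumptions only.

# Illustrative sketch only: a minimal state-based CRDT (grow-only set).
# GSet, update(), and merge() are assumed names, not the disclosed API.
class GSet:
    def __init__(self, elements=None):
        self.elements = set(elements or ())

    def update(self, element):
        # Adding an element never removes prior state (monotonic growth).
        self.elements.add(element)

    def merge(self, other):
        # Set union is commutative, associative, and idempotent, so the
        # order and repetition of merges do not change the result.
        self.elements |= other.elements

# Two replicas converge to the same state regardless of merge order.
a, b = GSet({1, 2}), GSet({2, 3})
a.merge(b)
b.merge(GSet({1, 2}))
assert a.elements == b.elements == {1, 2, 3}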
[0045] Distributed datastore clients (e.g., applications) can take advantage of high-performance CRDT updates. However, generic applications often find strong consistency updates indispensable at times. Such strong consistency updates, including invariant-compliant updates and transactions, require coordination among replicas of the same tuple and even coordination among tuples in multiple storage nodes or shards.
[0046] In some aspects, Bayou system studies are associated with theorems that describe the properties of a distributed system that mixes weak consistency updates and strong consistency updates. However, such consistency updates can be associated with the following limitations. Firstly, it can be theoretically impossible to configure a generic system that ensures both eventual consistency guarantee for weak consistency updates and strong consistency guarantee for strong consistency updates. Secondly, it can be possible to mix weak consistency updates and strong consistency updates by admitting temporary update reordering such that there is a strong consistency guarantee for strong consistency updates and fluctuating eventual consistency (FEC) guarantee for weak consistency updates. However, due to the temporary update
reordering, an application client may observe an anomaly such as a change in the update execution order.
[0047] The Bayou system study and theorems can be considered when configuring mixed consistency support. The disclosed MC3-related techniques further contribute to this research topic in two aspects. Firstly, the disclosed MC3-related techniques (which can be part of an MC3 protocol) can be configured using a state-based CRDT model. In comparison, the Bayou protocol is put together over an operation-based CRDT model. There are pros and cons of a state-based CRDT system in comparison to its equivalent operation-based CRDT system. For example, state-based CRDTs need fewer stipulations on the communication channel. Additionally, when a CRDT update occurs, the local state of a replica is updated through its CRDT update function. In some aspects, the full local state can be gossiped to other replicas at a lower rate than the rate of CRDT updates. AnnaDB® is one example of a state-based CRDT key-value store. The disclosed MC3-related techniques can be configured to support mixed consistency over a state-based CRDT system.
[0048] Secondly, the Bayou protocol covers the replica-level consistency, not the tuple-level consistency. That is, it addresses mixed consistency over multiple replicas of one tuple. In some aspects, the disclosed MC3-related techniques address mixed consistency over multiple tuples, each of them having multiple replicas.
[0049] In this regard, the disclosed MC3-related techniques can be used for mixing weak consistency updates and strong consistency updates on a distributed system. Specifically, the disclosed MC3-related techniques can be applied to a state-based CRDT system where each key-value tuple has multiple replicas, while satisfying the fluctuating eventual consistency (FEC) alluded to by the Bayou theorems. Additionally, the disclosed MC3-related techniques can be used for a transactional update on multi-sharded replicated key-value tuples, which is not addressed by the Bayou protocol.
[0050] Example distributed concurrency control schemes include two-phase locking (2PL), snapshot isolation (SI), and Read Committed (RC). They are also common centralized concurrency control schemes. Listed in that order, each scheme allows more concurrency, i.e., more schedules of concurrent transactions to be permitted and
hence higher transaction throughput. In some aspects, the disclosed MC3-related techniques can be based on 2PL. However, 2PL is mostly related to single-replica multi-tuple transaction concurrency control, while MC3 is mostly related to replicated (i.e., having multiple replicas) multi-tuple transaction concurrency control. In some aspects, MC3 can be configured to be coherent with the Bayou theorems for mixing concurrent CRDT updates and non-CRDT updates. The fluctuating eventual consistency (FEC) behavior can be related to the commit-back-in-time of a transaction of tuples, whose incremental CRDT update summaries are not empty. A more detailed description of a distributed datastore system configured to perform the disclosed functionalities is provided in relation to FIG. 1 and FIG. 2. A more detailed description of the MC3 functionalities which can be used for updating a plurality of replicated data structures of a distributed datastore system is provided in connection with FIG. 3 - FIG. 11.
[0051] FIG. 1 illustrates a distributed datastore system 100 configured with a storage node management module managing storage nodes, which is capable of performing concurrency control and can be used in connection with some example embodiments. The distributed datastore system 100 includes transaction clients 120A, 120B, ... , 120C (collectively, at least one transaction client 120) communicatively coupled to storage nodes 150A, 150B, ... , 150C (collectively, at least one storage node 150) via network 160. Network 160 can include one or more of a wired network, a wireless network, and the Internet. Data in the datastore system 100 is distributed in multiple nodes (e.g., storage nodes 150A, . . . , 150C), and access to the data (e.g., for read operations, write operations, and concurrency control) is performed in a distributed fashion.
[0052] The distributed datastore system 100 further includes a storage management module 130 configured to manage storage nodes 150A, ..., 150C. For example, the storage management module 130 manages bringing up and down storage clusters associated with the at least one storage node 150.
[0053] In some aspects, the at least one transaction client 120 is an application configured to access the at least one storage node 150 that provides MC3 functionalities (e.g., using a coordinator 110 and a replica control module 112). In some embodiments, the at least one transaction client 120 includes an
MC3 interface 115 (e.g., to MC3 functionalities provided in connection with data stored by the at least one storage node 150).
[0054] Usually, the storage nodes are managed by a central entity while each of the storage nodes has control software that works together in a distributed, networked environment to achieve data storage, replication, and concurrency control. The at least one storage node 150 includes at least one data structure replica 145 and a hash table 140 used in connection with the MC3 functionalities discussed herein. The at least one data structure replica 145 can include one or more primary replicas of one or more data structures as well as one or more non-primary replicas of some other data structures. A more detailed description of the at least one storage node 150 including different data structure replicas is provided in connection with FIG. 2. In some aspects, the at least one storage node is multi-threaded, and replicas of the same tuple can reside on the same storage node.
[0055] In some embodiments, the at least one storage node 150 further includes a replica control module 112, which is a module configured to perform MC3 functionalities (e.g., as discussed in connection with FIG. 3). The replica control module 112 uses a functional module such as a coordinator 110 in connection with the MC3 functionalities. In some aspects, one or more instances of the coordinator 110 can reside in one or more of transaction clients 120A, . . . , 120C. For example, FIG. 1 illustrates optional coordinator 110 configured at transaction clients 120A and 120B (and not at transaction client 120C).
[0056] FIG. 2 is a diagram of storage nodes 150A and 150B of the distributed datastore system 100 of FIG. 1 each storing data structure replicas and a hash table 140, which can be used in connection with some example embodiments. For simplicity, the storage nodes in FIG. 2 are illustrated to include only some of the components illustrated in FIG. 1 (e.g., each of the storage nodes 150A-150C includes a coordinator 110 and a replica control module 112 which are not illustrated in FIG. 2 for simplicity).
[0057] As illustrated in FIG. 2, the at least one data structure replica 145 includes a primary replica 208 stored at one storage node (at storage node 150A) as well as additional replicas 210, . . . , 212 of a data structure (e.g., key-value tuple 1) with each replica stored at a separate storage node (e.g., replica 210 is
stored at storage node 150B and replica 212 is stored at storage node 150C). A storage node can store replicas of one or more other tuples (e.g., storage node 150C stores primary replica 220 of key-value tuple 2 and primary replica 222 of key-value tuple 3). In some embodiments, the at least one storage node 150 is multi-threaded so that each of the storage nodes 150A, . . . , 150C can store multiple replicas of the same tuple.
[0058] In some aspects, the distributed datastore system 100 implements key-value stores based on state-based CRDTs. Example implementations may support various CRDTs. Each instance of a CRDT is represented as the value part of a key-value tuple, addressable by the key part of the tuple. Each tuple may have one or more replicas distributed over several servers (e.g., the at least one storage node 150) of the key-value store. Like a state-based CRDT system, the replicas of a tuple can be kept consistent through a gossip mechanism, which is a low-rate, in-the-background broadcast of the local tuple state from one replica to the others. A replica receiving a gossip message uses a merging function to move the replica’s local state in a monotonically non-decreasing fashion. The disclosed MC3 functionalities can be configured and performed using the following elements of the distributed datastore system 100 (discussed in greater detail herein below): MC3 interface 115, a primary replica of the at least one data structure replica 145, coordinator 110, and MC3-related data structures.
[0059] As mentioned above, the at least one data structure replica 145 can include one or more primary replicas of one or more data structures as well as one or more non-primary replicas of some other data structures. For example and as illustrated in FIG. 2, storage node 150A can store a primary replica 208 (e.g., of key-value tuple 1), while storage node 150B can store additional (non-primary) replicas 210, ..., 212 of key-value tuple 1. Additionally and as illustrated in FIG. 2, storage node 150B further stores other replicas of other tuples (e.g., primary replica 210 of key-value tuple 2 and primary replica 212 of key-value tuple 3).
MC3 Interface 115
[0060] In some aspects, a key-value store that supports mixed consistency can be configured with the MC3 interface 115 for accessing MC3
functionalities (e.g., functionalities provided by the MC3 management module 130).
[0061] The MC3 interface 115 can be configured as an application interface such as a CRDT update operation which moves the CRDT state in accordance with its monotonicity. For example, one CRDT is an integer with a maximum() update function. An example instance of such CRDT can have a value of 10. A CRDT update operation with an operand of 15 will update the instance value to 15 because the maximum value of 10 and 15 is 15. A following CRDT update operation with an operand of 12 will leave the instance value unchanged because the maximum value of 15 and 12 is 15 even though the CRDT update operation is approved and considered successful.
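The integer-with-maximum() example above can be sketched as follows; the crdt_update() function name is an illustrative assumption, not an interface of the store.

# Illustrative sketch of the integer CRDT with a maximum() update function
# described above; crdt_update() is an assumed name for illustration only.
def crdt_update(state, operand):
    # The update is approved and considered successful even when it
    # leaves the state unchanged (idempotent, monotonically non-decreasing).
    return max(state, operand)

state = 10
state = crdt_update(state, 15)   # state becomes 15
state = crdt_update(state, 12)   # state remains 15
assert state == 15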
[0062] In some aspects, the CRDT update interface is different from an ordinary key-value store update interface. The latter sets the value of the tuple with the specified operand. Setting the value of a tuple can be a non-CRDT operation, which can act against the monotonicity of a CRDT. Using the above example, setting the tuple value to 12 can violate the result that the maximum() function may produce, decreasing the tuple value. A non-CRDT operation implies replica coordination at least to some degree, i.e., strong consistency handling may be used. A non-CRDT operation is not commutative with respect to an adjacent CRDT operation. Therefore, mixing a non-CRDT operation and a CRDT operation follows the limitations alluded to by the Bayou theorems.
[0063] In some aspects, the MC3 interface 115 can be configured as a transaction operation. The transaction operation interface can be defined as a set of associated interfaces: txn_begin(), txn_read(), txn_write(), txn_abort(), and txn_commit(). A transaction consists of zero or more reads, zero or more possibly conditional computations, and zero or more writes, and it may involve one or more tuples. A transaction implies the atomicity, strong consistency, and isolation properties of the operations within the transaction.
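A hedged, client-side sketch of how the transaction operation interfaces named above might be driven is shown below; the Txn class, the store object, and the read()/end() calls on it are illustrative assumptions rather than the store's actual API.

# Illustrative client-side sketch of the transaction operation interface
# (txn_begin/txn_read/txn_write/txn_abort/txn_commit). The Txn class and
# the store.read()/store.end() calls are assumptions, not the disclosed API.
class Txn:
    def __init__(self, txn_id):
        self.txn_id = txn_id      # globally unique transaction identifier
        self.read_set = {}        # key -> (version, value) observed at read time
        self.write_set = {}       # key -> value to be committed

def txn_begin(txn_id):
    return Txn(txn_id)

def txn_read(txn, store, key):
    # Assumed to be forwarded to the primary replica of the tuple.
    version, value = store.read(txn.txn_id, key)
    txn.read_set[key] = (version, value)
    return value

def txn_write(txn, key, value):
    # Writes are buffered locally until the commit request.
    txn.write_set[key] = value

def txn_abort(txn, store):
    store.end(txn.txn_id, commit=False)

def txn_commit(txn, store):
    # Request to commit the write set given the read set; the store's
    # strong consistency validation may approve or deny the request.
    return store.end(txn.txn_id, commit=True,
                     read_set=txn.read_set, write_set=txn.write_set)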
[0064] In some aspects, a non-CRDT operation or a strong consistency operation can be framed as a transaction. In some aspects, an invariant-preserving CRDT operation can be framed as a transaction. For example, one CRDT is a set of natural numbers that do not exceed 10,000 elements with a union() update function. The invariant-compliant CRDT update operation can
be implemented by creating a transaction (using txn_begin()) involving reading and caching the set (using txn_read()), adding (using union()) the operand to the cached set, counting the number of elements in the cached set, aborting the transaction (using txn_abort()) if the number of elements exceeds 10,000, using the cached set as the write value (using txn_write()), and requesting to commit the transaction (using txn_commit()) if the number of elements does not exceed 10,000.
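The invariant-compliant update described above can be sketched as follows, reusing the illustrative txn_* helpers from the earlier sketch; MAX_ELEMENTS and bounded_union_update() are assumed names for illustration.

# Illustrative sketch of the bounded-set invariant update framed as a
# transaction, using the assumed txn_* helpers sketched earlier.
MAX_ELEMENTS = 10_000

def bounded_union_update(store, txn_id, key, operand_set):
    txn = txn_begin(txn_id)
    cached = set(txn_read(txn, store, key))   # read and cache the current set
    cached |= operand_set                     # union() the operand into the cache
    if len(cached) > MAX_ELEMENTS:
        txn_abort(txn, store)                 # the invariant would be violated
        return False
    txn_write(txn, key, cached)               # use the cached set as the write value
    return txn_commit(txn, store)             # may still be denied by validation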
[0065] In some aspects, the txn_commit() may be either approved or denied by the strong consistency validation function of the key-value store. Therefore, txn_commit() can be considered as a request to commit the write set of tuples given the read set of tuples. Upon approval, the write set tuple values can be written to the store. Upon a denial, there may be no tuple value change to the store, and it may be up to the application (e.g., the at least one transaction client 120) to handle the denial of a transaction operation interface.
[0066] In some embodiments, the transaction operation interface can be sufficiently generic to cover most, if not all, operations that require strong consistency handling. In this regard, the disclosed MC3 functionalities are discussed in connection with the transaction operation interface and the CRDT update interface.
Primary Replica
[0067] Each tuple can be associated with one or more replicas (e.g., data structure replicas 145, including a primary replica 208 and one or more additional replicas 210, . . . , 212). The store (or the MC3 management module 130) can be configured to select one of them as the primary replica of the tuple (e.g., primary replica 208 of key-value tuple 1). In some aspects, the selection of the primary replica can be based on ensuring that there is only one primary replica for each tuple at any time. In some aspects, the replicas of the same tuple are distributed to some storage nodes (e.g., one replica to each of the storage nodes). In some aspects, one of the replicas is selected as the primary replica.
[0068] In some aspects, the update processing on the primary replica is slightly different from that on the non-primary replicas. A CRDT update operation can be handled in the same way on all replicas. In some aspects, a
weak consistency update can be approved by the receiving replica without coordination with other replicas. In some embodiments, a CRDT update operation may not be denied. Also, concerning CRDT update operations, there can be multiple writers updating a tuple at the same time.
[0069] In some aspects, a strong consistency update operation, including a transaction operation, can involve the primary replica. Strong consistency can be guaranteed through serialization. Concerning strong consistency update operations, there can be a single writer updating a tuple, and that single writer can be considered to be the primary replica.
Coordinator 110 and Replica Control Module 112
[0070] The replica control module 112 is a functional module configured to perform concurrency control in connection with the MC3 functionalities discussed herein. The coordinator 110 can be configured as a functional module that facilitates strong consistency update request processing associated with the concurrency control functions performed by the replica control module 112. In aspects when an application is unaware of the locations of primary replicas, a coordinator instance repackages and forwards a strong consistency update request to one or more primary replicas of the relevant tuples. A coordinator instance can be run on any server of the key-value store.
[0071] Regarding transaction handling, when coordinator 110 receives a txn_read() request, it repackages and forwards the request to the primary replica of the relevant tuple. When the coordinator receives a txn_commit() request that comprises the read set and write set tuples, it subdivides the read set and write set tuples according to the locations of their respective primary replicas and then repackages and forwards the subsets of read set and write set tuples to the respective servers of the store that manage the subsets of primary replicas. After receiving strong consistency validation results from the respective servers, the coordinator informs the respective servers of the combined approval or denial decision. In aspects associated with a multi-sharded transaction (with tuples spanning over two or more servers), the coordinator facilitates a two-phase commit (2PC) protocol.
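A non-authoritative sketch of the coordinator's txn_commit() handling described above is shown below; primary_server_of() and the prepare()/finish() server calls are illustrative assumptions standing in for the store's internal messaging.

# Illustrative coordinator sketch for txn_commit() handling: subdivide the
# read set and write set by the server owning each tuple's primary replica,
# collect per-server validation results, and broadcast the combined decision.
from collections import defaultdict

def coordinate_commit(txn_id, read_set, write_set, servers, primary_server_of):
    # Group read-set and write-set tuples by the server of their primary replica.
    per_server = defaultdict(lambda: ({}, {}))
    for key, entry in read_set.items():
        per_server[primary_server_of(key)][0][key] = entry
    for key, value in write_set.items():
        per_server[primary_server_of(key)][1][key] = value

    # Phase 1: each involved server validates its subset and grabs write locks.
    votes = {sid: servers[sid].prepare(txn_id, rs, ws)
             for sid, (rs, ws) in per_server.items()}
    decision = all(votes.values())        # a single denial leads to a final denial

    # Phase 2: inform every involved server of the combined approval/denial decision.
    for sid in per_server:
        servers[sid].finish(txn_id, commit=decision)
    return decision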
[0072] When coordinator 110 sees a transaction aborted (e.g., actively through a txn_abort() request or passively through monitoring the system events), the coordinator informs the respective primary replicas (e.g., primary replica 208) to release their relevant resources such as locks. In practice, txn_abort() can be implemented as txn_commit() with a forced denial decision.
[0073] If coordinator 110 receives a txn_start() or txn_write() request, it does not forward it to any other replica because the request will only affect a local resource. Specifically, a txn_write() request adds a relevant tuple to a locally-buffered write set of the relevant transaction.
[0074] In some aspects, coordinator 110 can be used to implement retries for a transaction that is denied due to contention.
[0075] In sample embodiments, the coordinator 110 is used for initiating transactional operations such as read, write, and end operation requests and handling responses to the requests. Each transaction is identified by a unique transaction identifier (ID). A read operation should contain the transaction ID and at least one key, while a write operation should contain the transaction ID and at least one key and corresponding value. An end operation requests a commit of the transaction. Each response, from other components of the datastore system 100, should indicate an acceptance or a rejection of the operation requested. A rejection should cause an abort of the transaction. The acceptance of a read or write operation indicates that coordinator 110 may move on to the next operation. The acceptance of a commit operation indicates that the transaction has been validated and serialized and all written data items are stored.
Data Structure
[0076] A key-value tuple (e.g., the data structure associated with primary replica 208) can be interpreted as having three components - a key 214, a version number 216, and a CRDT state 218. A tuple is a named instance of a CRDT, named by the key. A tuple can be characterized by a CRDT, and a default CRDT can be a last-write-wins type.
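A minimal sketch of the three-component tuple replica described above is shown below; the dataclass layout and field names are illustrative assumptions mirroring key 214, version number 216, and CRDT state 218 of FIG. 2, not the disclosed storage format.

# Illustrative sketch of a tuple replica with its three components.
from dataclasses import dataclass, field

@dataclass
class TupleReplica:
    key: str                    # key part of the key-value tuple
    version: int = 0            # incremented by the primary replica when a
                                # strong consistency update request is approved
    crdt_state: set = field(default_factory=set)   # e.g., a grow-only set CRDT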
[0077] Version number 216 describes the version of the CRDT state 218. Version number 216 facilitates strong consistency enforcement through a
merge() function. In some aspects, the primary replica 208 can increment version number 216, and it does so when a strong consistency update request is approved.
[0078] At any replica, the merge() function merges a received replica value into the local replica value. If the version numbers are different, the replica value with a higher version number becomes the local replica value. Otherwise, the local replica executes its CRDT update function with the received replica CRDT state. In other words, incrementing the version number enables the CRDT state to go against its CRDT monotonicity. When the version number is unchanged, the CRDT state 218 moves in a monotonically non-decreasing fashion.
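The merge() behavior described above can be sketched as follows, assuming the illustrative tuple replica layout shown earlier, with a grow-only set standing in for the CRDT update function; the names are assumptions.

# Illustrative merge() sketch: a higher version number wins outright; with
# equal versions, the CRDT update function (here, set union) moves the local
# state monotonically. A received value with a lower version is ignored.
def merge(local, received):
    if received.version > local.version:
        # The received value supersedes the local value, which is how the
        # state can move against CRDT monotonicity after an approved commit.
        local.version = received.version
        local.crdt_state = set(received.crdt_state)
    elif received.version == local.version:
        # Same version: apply the monotonic CRDT update function.
        local.crdt_state |= received.crdt_state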
[0079] An example data structure in MC3 processing is related to replica locking. In some aspects, the distributed datastore system 100 uses a hash table 140 to implement MC3 locks. Each storage node can be configured to maintain one hash table for all tuple replicas it manages. The key to the hash table 140 can be the same key to a tuple. In some aspects, the absence of an entry in the hash table 140 indicates the absence of locks for the tuple replica. The presence of an entry indicates that the tuple replica is potentially locked.
[0080] An entry of the hash table 140 comprises an entry number 202, a write lock (reference count) 204, a read lock reference count 206, and an incremental update summary 207. In MC3, the write lock is exclusive, and the read lock is shared. Therefore, the write lock reference count 204 may use one bit, while the read lock reference count 206 may need a reference count, e.g., a 31-bit or otherwise large enough number to avoid overflow, because a read lock that is held does not block a subsequent read lock request.
[0081] In some embodiments, a minimum transaction identifier (not illustrated in FIG. 2) can be associated with a hash table entry. Each transaction uses a transaction identifier (e.g., a globally unique number) to identify itself. For example, the transaction identifier can be constructed using a timestamp followed by a replica identifier. The entry’s minimum transaction identifier stores the minimum transaction identifier of the concurrent transactions holding the read lock.
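A minimal sketch of such a hash table entry, including the minimum transaction identifier, is shown below; the field names and the MAX_TXN_ID constant are illustrative assumptions.

# Illustrative sketch of a hash table lock entry: an exclusive write lock,
# a shared read lock reference count, an incremental update summary, and
# the minimum transaction identifier of the read lock holders.
from dataclasses import dataclass, field

MAX_TXN_ID = 2**63 - 1   # assumed maximum possible transaction identifier

@dataclass
class LockEntry:
    write_locked: bool = False          # exclusive write lock (a single bit)
    read_lock_count: int = 0            # shared read lock reference count
    update_summary: set = field(default_factory=set)  # initial CRDT value
    min_txn_id: int = MAX_TXN_ID        # minimum txn id among read lock holders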
[0082] In MC3, a write lock held denies a subsequent write lock request or a subsequent read lock request. A read lock held allows subsequent read lock requests. A read lock held does not deny a subsequent write lock request whose associated transaction identifier is less than or equal to the minimum transaction identifier. A replica is considered as completely unlocked or not locked when all locks have been released, i.e., both the write lock reference count and the read lock reference count are zero.
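The locking rules described above can be sketched as follows, using the illustrative LockEntry fields from the previous sketch; the function names are assumptions.

# Illustrative sketch of the MC3 lock compatibility rules described above.
def try_read_lock(entry, txn_id):
    # A held write lock denies a subsequent read lock request.
    if entry.write_locked:
        return False
    entry.read_lock_count += 1                       # shared read lock
    entry.min_txn_id = min(entry.min_txn_id, txn_id)
    return True

def try_write_lock(entry, txn_id):
    # A held write lock denies a subsequent write lock request; a held read
    # lock does not deny a write lock request whose transaction identifier
    # is less than or equal to the minimum transaction identifier.
    if entry.write_locked:
        return False
    if entry.read_lock_count > 0 and txn_id > entry.min_txn_id:
        return False
    entry.write_locked = True
    return True

def is_unlocked(entry):
    # Completely unlocked: both the write lock and the read lock count are clear.
    return not entry.write_locked and entry.read_lock_count == 0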
[0083] The incremental update summary 207 is a cache or a pointer to a cache. The cache stores the merged result of all CRDT update requests received from an application after the replica has been locked. When a hash table entry is first created, its incremental update summary is initialized with the initial CRDT value, which is specific to each CRDT. The initial value for its minimum transaction identifier is the maximum possible value it can take.
[0084] In some embodiments, the hash table entry may also comprise a pointer to the tuple replica in the store so that a look-up into the store can be avoided.
[0085] FIG. 3 is a diagram of MC3-related data and event flows 300 performed in connection with concurrency control in a distributed datastore system, which can be used in connection with some example embodiments. Referring to FIG. 3, the MC3-related data and event flows 300 can include the transaction client 302 (which can be the same as the at least one transaction client 120), coordinator 304 (which can be the same as coordinator 110), a first primary replica 306 (e.g., a tuple with key “X”, also referred to as X primary replica 306), additional replica 308 (also referred to as X replica 308), and a second primary replica 310 (e.g., another tuple with key “Y”, also referred to as Y primary replica 310). In some embodiments, the MC3-related data and event flows 300 are performed by the replica control module 112.
[0086] To illustrate the concurrency control protocol MC3, the description below will focus on how the protocol works with transactions and CRDT update requests for generality. In some aspects, strong consistency updates may lead to simpler processing and call flows while using the same MC3 functionalities described herein.
[0087] The MC3-related data and event flows 300 include three phases of locking - a read lock expansion phase 312, a write lock expansion phase 314, and a lock shrinkage phase 316. Solving a multi-replica multi-sharded transaction (with concurrent eventual consistency updates on any replica) problem is different from solving a single-replica multi-sharded transaction problem. While the MC3-related data and event flows 300 utilize the three-phase locking approach, there are additional configurations that make it more than a case of a rigorous 2PL.
[0088] MC3 executes the read lock expansion phase 312 before txn_commit() (e.g., commit request 318), and the write lock expansion phase 314 and the lock shrinkage phase 316 after the commit request 318. To describe the rationale, replica-level consistency can be differentiated from tuple-level consistency. Replica-level consistency is about the consistency of all replicas of the same tuple. Tuple-level consistency is about the consistency of all tuples in the same transaction. Having the read lock expansion phase 312 early is to facilitate replica-level consistency. A read lock on a tuple is applied to all replicas of the tuple. Read locks on replicas freeze the replica values to enable the primary replica to account for all prior CRDT updates contributing to the transaction and come up with a merged, frozen CRDT state and version number to enable tuple-level consistency enforcement.
[0089] The following is a summary of operations A-V associated with the MC3-related data and event flows 300. At operation A, txn_start() is executed to begin a transaction. At operation B, a txn_read(“X”) is executed. In this regard, the read lock expansion phase 312 is triggered by txn_read() requests. At operation C, the read request is forwarded to the X primary replica 306. At operation D, read lock requests are sent to other replicas (e.g., replica 308). At operation E, a read lock is grabbed, a reply with the locked value is generated, gossips are ignored, and the incremental update summary is maintained. At operation F, the X primary replica 306 grabs a read lock, merges the replica values, replies with the merged value, ignores gossips, and maintains the incremental update summary. At operation G, coordinator 304 replies with the read value. At operation H, the transaction client 302 returns the txn_read(“X”) call and caches the value in a buffered read set.
[0090] At operation I, txn_write() calls on tuples X and Y are executed over a buffered write set. At operation J, txn_commit() (e.g., commit request 318) is executed to request a commit on the write set given the read set. At operation K, the X primary replica 306 and the Y primary replica 310 are contacted. At operation L, a validation that the read set value is the same as the replica’s current value is performed, the write lock is grabbed (or activated), and a reply with the validation result is communicated. At operation M, the Y primary replica 310 grabs the write lock and replies with the validation result. At operation N, the validation result is cached, and waiting for all validation results follows. At operation O, the final decision is derived based on all validation results, and the decision is communicated to the primary replicas. At operation P, the replica value is replaced with the write set value, the version number is incremented, the read lock and the write lock are released, a merge with the incremental update summary is performed, and the value and the lock release request are sent to other replicas. At operation Q, the replica value is replaced with the received value, including the version, the read lock is released, a merge with the incremental update summary is performed, and an acknowledgment is sent to the primary replica. At operation R, a reply is communicated to coordinator 304. At operation S, waiting for all replies follows. At operation T, the replica value is replaced with the write set value, the version is incremented, the write lock is released, a merge with the incremental update summary is performed, and a reply is communicated to coordinator 304. At operation U, a reply is communicated to the application when all replies are received. At operation V, the txn_commit() call is returned with the decision. A more detailed description of operations A-V is provided herein below.
[0091] As mentioned above, the read lock expansion phase 312 is triggered by txn_read() requests. A coordinator 304 receives a txn_read() request on a tuple and forwards it to the primary replica of the tuple. In response, the primary replica sends the read lock requests to all replicas of the tuple. After receiving all request approvals and CRDT values from the replicas, the primary replica 306 merges the CRDT values into its local CRDT value and grabs its read lock, by creating a hash table entry (e.g., in hash table 140) if necessary, and incrementing the entry’s read lock reference count. The primary replica’s CRDT value is frozen until the primary replica is completely unlocked.
The primary replica replies to the request sender with the frozen CRDT value. Meanwhile, if it receives any CRDT update request, it merges the CRDT update request into the associated hash table entry’s incremental update summary.
[0092] When a non-primary replica receives a read lock request, it grabs its read lock, by creating a hash table entry if necessary, and incrementing the entry’s read lock reference count. The replica’s value is frozen until the replica is completely unlocked. The replica replies to the request sender with its frozen CRDT value. Meanwhile, if it receives any CRDT update request, it merges the CRDT update request into the associated hash table entry’s incremental update summary.
[0093] Therefore, the read lock expansion phase 312 is marked by read lock requests on all replicas of the tuple in a txn_read() request. In some aspects, a transaction may involve more than one txn_read() request.
[0094] The write lock expansion phase 314 is triggered by a txn_commit() request 318. A coordinator 304 receives a txn_commit() request on the write set and read set tuples and forwards it to the primary replica of each tuple.
[0095] In response, the primary replica executes a strong consistency validation procedure. It verifies whether it is one of the read set tuples and, if so, whether the tuple value in the read set is in agreement with the tuple replica’s current value. For example, a difference in the version numbers indicates that the earlier-read tuple value stored in the read set has been outdated and the transaction commit request is to be denied.
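A minimal sketch of this validation step is shown below, assuming the read set records the version observed at read time as in the earlier client-side sketch; the function name is an assumption.

# Illustrative sketch of the strong consistency validation check described
# above: the commit is to be denied when the version recorded in the read
# set differs from the primary replica's current version.
def validate_read_set_entry(read_set, replica):
    if replica.key not in read_set:
        return True                        # this tuple was not read by the transaction
    read_version, _read_value = read_set[replica.key]
    # A version difference means the earlier-read tuple value has been outdated.
    return read_version == replica.version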
[0096] The primary replica checks whether it is one of the write set tuples. If so, it issues a write lock request (by creating a hash table entry if necessary and incrementing the entry’s write lock reference count). If the write lock request is denied due to contention, the current transaction commit request is to be denied. If the write lock request is approved, the replica’s value is now frozen, if it has not been frozen by a prior read lock request. Then, the primary replica replies to coordinator 304. Meanwhile, if it receives any CRDT update request, it merges the CRDT update request into the associated hash table entry’s incremental update summary.
[0097] Coordinator 304 may collect replies from all relevant primary replicas of the read set and write set tuples to derive a final commit request approval or denial decision.
[0098] The lock shrinkage phase 316 is triggered by the final approval or denial decision. When a primary replica is aware of the decision (sometimes, it knows the decision ahead of the coordinator because a local denial will lead to a final denial), it first checks whether it is one of the write set tuples and the commit request is approved. If so, it updates its tuple value with the value in the write set and sends the new CRDT state and the new version number to all replicas of the tuple. Then, it releases its write lock. It releases its read lock and informs other replicas to release their read locks if it is one of the read set tuples. After receiving all acknowledgments from other replicas that their replica values are updated, the primary replica replies to the coordinator.
[0099] When a non-primary replica receives a tuple value update message from the primary replica due to the commit request approval, it replaces its local value with the received value. When it receives a read lock release message from the primary replica, it releases its read lock by decrementing the read lock reference count.
[0100] When a replica is unlocked, it merges its hash table entry’s incremental update summary into its local CRDT state, yet without changing the version number. Then the hash table entry can be purged.
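A minimal sketch of this unlock-time merge is shown below, reusing the illustrative LockEntry and is_unlocked() helpers from the earlier sketches; the function name and the lock_table dictionary are assumptions.

# Illustrative sketch: once a replica is completely unlocked, merge the
# entry's incremental update summary into the local CRDT state without
# changing the version number, then purge the hash table entry.
def release_and_merge(lock_table, replica):
    entry = lock_table.get(replica.key)
    if entry is None or not is_unlocked(entry):
        return
    replica.crdt_state |= entry.update_summary   # CRDT update, version unchanged
    del lock_table[replica.key]                  # purge the hash table entry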
[0101] FIG. 4 - FIG. 10 illustrate example pseudocode associated with MC3 -related functionalities performed in connection with concurrency control in a distributed datastore system, which can be used in connection with some example embodiments.
[0102] The MC3 algorithm with respect to a replica is described in pseudocode 400, 500, 600, 700, 800, 900, and 1000 in corresponding FIG. 4, FIG. 5, FIG. 6, FIG. 7, FIG. 8, FIG. 9, and FIG. 10. Initial setup and initialization are provided by lines 1-11 of pseudocode 400 in FIG. 4. The read lock expansion phase is covered in lines 12-36 of pseudocode 500 in FIG. 5. The write lock expansion phase is handled in lines 37-52 of pseudocode 600 in FIG. 6. The lock shrinkage phase is described by lines 53-81 of pseudocode 700
in FIG. 7. In some aspects, a received gossip message is ignored if a replica is locked, as shown in lines 86-88 of pseudocode 900 in FIG. 9.
[0103] As an optimization, if the primary replica has already issued read lock requests to other replicas, it does not need to do it again on the next txn_read(). The primary replica tracks the actual read lock reference count, and when it is completely unlocked, it asks other replicas to release all their locks. Similarly, the primary replica tracks the write lock on behalf of the other replicas. The optimization reduces the number of coordination messages.
[0104] As another optimization, the hash table entry stores the minimum transaction identifier of the transactions holding the read lock. A write lock request is accepted when its associated transaction identifier is smaller or equal to the minimum transaction identifier. When the write lock request is accepted, the version of the replica is incremented (e.g., in line 49 of pseudocode 600 in FIG. 6) regardless of whether the transaction that holds the write lock will be approved or not. Incrementing the version will cause the transactions that have acquired the read lock before the write lock request to be aborted at their validation check.
[0105] Therefore, a held read lock may allow a write lock request to be approved, and an approved write lock denies a subsequent read lock request. That is beneficial because the optimization has low overhead, as it requires no tracking of the transaction identifiers of all read lock holders. Yet, the optimization helps increase concurrency, reduce starvation, and enforce the isolation guarantee for inter-dependent multi-sharded transactions by cutting some edges in transaction dependency graphs such that there is no cycle in the graphs, at the cost of denying the transactions associated with the cut edges.
[0106] In some aspects, the version is incremented by more than one, e.g., 10 (e.g., line 56 of pseudocode 700 in FIG. 7) when the commit request is approved. This provides a way to make the version mismatch check (e.g., lines 44-45 of pseudocode 600 in FIG. 6) more sophisticated to detect whether a mismatch is due to lock contention or due to the approval of a prior commit request. If a commit request is denied due to lock contention, the version difference should be within 10, and the commit request may be retried,
relocking the read set and write set without restarting the transaction and rereading the replica value.
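A hedged sketch of the refined mismatch check is shown below, under the assumption that an approved commit increments the version by a stride of 10 while a write-lock grant increments it by 1; the names are illustrative.

# Illustrative sketch of the version-gap classification described above.
VERSION_STRIDE = 10   # assumed increment applied on an approved commit

def classify_version_mismatch(read_version, current_version):
    gap = current_version - read_version
    if gap == 0:
        return "match"               # validation passes
    if 0 < gap < VERSION_STRIDE:
        return "lock_contention"     # may retry: relock and revalidate the sets
    return "prior_commit"            # a prior commit was approved: restart and re-read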
[0107] In some embodiments, the primary replica’s value can be included in the lockRelease() message so that the receiving replicas can have a synchronized value before releasing their locks.
[0108] The algorithm described in connection with FIG. 4 - FIG. 10 assumes the coordinator is configured to handle the 2PC handshake for multi-sharded transactions. In the case of a single-sharded transaction, where all primary replicas of tuples in the transaction reside on the same server, the handshake can be simplified. For generality, the MC3 algorithm is described assuming the need for a 2PC, as in lines 38-41 of pseudocode 600 in FIG. 6.
[0109] In lines 13-15 of pseudocode 500 in FIG. 5, a client is assumed to have a sticky session to a replica, and the client-affined replica acts as a coordinator of a transaction initiated by the client. For maintaining causal consistency for the client, it may be more efficient to reject a txn_read() if the client-affined, coordinator replica has been locked and its incremental update summary has been updated by a prior CRDT update request.
[0110] Serializability
[0111] The disclosed MC3-related functionalities provide the serializability guarantee to concurrent transactions amidst concurrent CRDT update requests. Such a guarantee can be provided because MC3 locking freezes all replicas of a tuple, and that enables analyzing its concurrency control from the primary replica’s viewpoint.
[0112] In 2PL’s lock expansion phase, locks are acquired, and no locks are released. MC3’s read lock expansion phase and subsequent write lock expansion phase are a special case of 2PL’s lock expansion phase.
[0113] In 2PL’s lock shrinkage phase, locks are released, and no locks are acquired. MC3’s lock shrinkage phase does the same. It releases both read and write locks applied by a transaction only after the transaction has ended.
[0114] In 2PL, a read lock held denies a write lock request, and a write lock held denies a read lock request. If MC3 implements the same locking
functional behavior, MC3 can be configured as a variation of a special case of Rigorous 2PL.
[0115] In some embodiments, a more advantageous implementation of the locking functional behavior for MC3 can be configured that allows a read lock held to approve a write lock request whose associated transaction identifier is smaller or equal to the minimum transaction identifier of the read lock.
[0116] The above optimization still guarantees serializability because there is a strong consistency validation check at the commit request time. The validation check uses read set information to detect whether a read set value could have been outdated, causing a transaction commit request denial. In other words, the first transaction may have held the read lock earlier than the second transaction that gets the write lock, passes its validation check, and commits and changes the replica value. When the first transaction goes through its validation check, it receives a denial because of the version difference. In comparison, 2PL does not have the read set information and the validation check and, therefore, requires a read lock held to deny a write lock request.
[0117] The optimization can be interpreted as a 2PL wound-wait scheme. The transaction with a lower transaction identifier is considered to have a higher priority among contentious transactions. A write lock request can be approved for the highest-priority transaction among the contentious transactions having acquired the read lock. The approval of the write lock request will cause those contentious transactions to abort due to version mismatch during their validation check (for brevity and clarity, the algorithm in FIG. 4 - FIG. 10 describes how the wound part is implemented and that the wait part is just no-wait.)
[0118] In some embodiments, MC3 can be configured based on a state-based CRDT model and guarantees serializability for strong consistency updates. In contrast, the Bayou protocol is presented over an operation-based CRDT model, and serializability (or sequential consistency or linearizability, depending on one’s perspective) is guaranteed through total order broadcast (TOB). MC3 results in one sequence of weak and strong consistency updates relative to the CRDT states resulting from the prior updates, while the Bayou protocol results in a different sequence of weak and strong consistency updates based on the order established by TOB. Specifically, MC3 uses locking and associated frozen
CRDT states to allow an approved transaction to be serialized back in time, that is, sequencing the CRDT updates during the locking period to follow the approved transaction. Bayou takes a time stamp ordering approach to provide the mixed consistency guarantee while MC3 takes a 2PL approach. Both provide correct but different results.
Advantages Over Existing Techniques
[0119] Although MC3 and 2PL cannot be compared in general, mentally restricting MC3 to single-replica multi-tuple transactions, a degenerate case of MC3, can reveal some advantages of MC3 over 2PL with respect to transactional support.
[0120] The first advantage of MC3 is higher concurrency compared to Rigorous 2PL. MC3 has some optimistic concurrency control (OCC) characteristics. Firstly, it defers the write lock expansion phase until the validation check when the application intends to commit the transaction, so it allows more transactions to do txn_read() concurrently. Secondly, regarding making write lock requests, MC3 can know the write set tuples and sort them before the write lock expansion phase. This is because the txn_write() tuples can be buffered in the write set and sorted before the txn_commit() is invoked. As a result, two transactions that make write lock requests on the same tuples will not get into a deadlock as their write sets are sorted using the same criteria.
[0121] The second advantage of MC3 is being programmer-friendly. Conservative 2PL can avoid deadlocks or starvation better than Rigorous 2PL can, but the former requires an application to declare the read set and write set tuples at the beginning of a transaction. The requirement results in an awkward programming model. In contrast, MC3 defers the write lock expansion phase after txn_commit(), and, by then, the application has declared the read set and write set tuples through the natural usage of the transaction operation interfaces.
[0122] In some aspects, MC3 can be designed for applications that take advantage of high-throughput eventual consistency updates. If eventual consistency updates were disallowed or never used, there would be no need for locking on the non-primary replicas, and some unnecessary handshakes between the primary replica and non-primary replicas could be skipped. The key
advantage of MC3 is enabling applications to mix concurrent eventual consistency updates and strong consistency updates while ensuring that the effect of each update is applied once.
Non-FEC Variant
[0123] MC3 enables mixed consistency on a distributed key-value store, and such a system exhibits the FEC behavior inferred by the Bayou theorems. While some applications may benefit from the concurrency of the FEC behavior, some may not handle it well. The latter may prefer a trade-off: removing the FEC behavior at the cost of delayed responses to the CRDT update requests amidst unfinished strong consistency update requests. In some aspects, an MC3 store can be adapted to provide such a non-FEC version.
[0124] The FEC behavior is due to the treatment of the hash table entry’s incremental update summary. When a replica is locked, an ordinary key-value store get() request returns the replica’s frozen CRDT state merged with the incremental update summary. When a transaction is approved and the replica’s CRDT state is replaced, the order of operation requests, observed through get(), appears to have changed.
[0125] To remove the FEC behavior, the store can respond to the get() request with the replica’s frozen CRDT state, without merging the incremental update summary. Also, the response to any CRDT update request, crdt_update(), is delayed until the replica is completely unlocked. In other words, while a replica is locked, any CRDT update request is blocked by the unfinished strong consistency update requests.
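The contrast between the FEC and non-FEC treatments of get() and crdt_update() can be sketched as follows, reusing the illustrative lock-entry helpers from earlier; the function names and the pending queue are assumptions.

# Illustrative sketch contrasting the FEC and non-FEC read/update behaviors.
def get_fec(replica, lock_table):
    # FEC behavior: return the frozen state merged with the incremental
    # update summary of the lock entry, if any.
    entry = lock_table.get(replica.key)
    summary = entry.update_summary if entry is not None else set()
    return replica.crdt_state | summary

def get_non_fec(replica, lock_table):
    # Non-FEC behavior: return only the frozen CRDT state while locked.
    return set(replica.crdt_state)

def crdt_update_non_fec(replica, lock_table, operand_set, pending):
    # Non-FEC behavior: defer the update while strong consistency update
    # requests are unfinished (i.e., while the replica is still locked).
    entry = lock_table.get(replica.key)
    if entry is not None and not is_unlocked(entry):
        pending.append(operand_set)
        return "deferred"
    replica.crdt_state |= operand_set
    return "applied"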
[0126] FIG. 11 is a flowchart of method 1100 suitable for updating a plurality of replicated data structures of a distributed datastore system, according to some example embodiments. Method 1100 includes operations 1102, 1104, 1106, 1108, 1110, 1112, 1114, 1116, 1118, 1120, and 1122. By way of example and not limitation, method 1100 is described as being performed by a replica control module (e.g., replica control module 112 in FIG. 1, replica control module 1262 in FIG. 12, or replica control module 1365 in FIG. 13) using a coordinator (e.g., coordinator 110 in FIG. 1, coordinator 1260 in FIG. 12, or coordinator 1360 in FIG. 13). Additionally, method 1100 is based on the MC3-
related data and event flows 300 and its corresponding three phases discussed in connection with FIG. 3 - the read lock expansion phase 312, the write lock expansion phase 314, and the lock shrinkage phase 316.
[0127] At operation 1102, at least one read request for one or more key-value tuples of replicated data structures is decoded. Each key-value tuple of the one or more key-value tuples includes a key and a corresponding conflict-free replicated data type (CRDT) value.
[0128] At operation 1104, replicas of each key-value tuple of the one or more key-value tuples in the at least one read request are locked to obtain locked CRDT values in read-locked replicas of the one or more key-value tuples.
[0129] At operation 1106, the locked CRDT values of the read-locked replicas are merged into a merged CRDT value onto a read set in response to the at least one read request.
[0130] At operation 1108, an update summary is maintained on each of the read-locked replicas.
[0131] At operation 1110, a commit request (e.g., commit request 318 in FIG. 3) to commit a write set of at least one key-value tuple is decoded. The commit request includes the read set.
[0132] At operation 1112, a validation is performed on whether the read set in the commit request is unchanged from current CRDT values of the read-locked replicas.
[0133] At operation 1114, a write lock of the at least one key-value tuple of the write set is activated to obtain write-locked replicas.
[0134] At operation 1116, an update summary is maintained on each of the write-locked replicas.
[0135] At operation 1118, the commit request is accepted when the validating and the activating are successful.
[0136] At operation 1120, the read-locked replicas and the write-locked replicas are unlocked after the commit request is processed resulting in unlocked replicas.
[0137] At operation 1122, the update summary associated with any one of the read-locked replicas and the write-locked replicas that has become an unlocked replica of the unlocked replicas is merged to a current CRDT value of the unlocked replica.
[0138] In some aspects, a read request for the plurality of replicated data structures is decoded. For example, the read request is executed at operation B in FIG. 3. In some aspects, a read set is generated in response to the read request. The read set includes a merged conflict-free replicated data type (CRDT) value based on a plurality of CRDT values associated with the plurality of replicated data structures (e.g., replicas 306 and 308). For example, the generation of the read set is performed in connection with operations C-H in FIG. 3. In some aspects, a commit request to commit a write set of CRDT values to the plurality of replicated data structures based on the read set is decoded. For example, commit request 318 is executed at operation J in FIG. 3. In some aspects, a subset of the plurality of CRDT values associated with a subset of the plurality of replicated data structures (e.g., the primary replicas) is validated using the read set and the write set. For example, the validation is performed in connection with the write lock expansion phase 314 and operations K-O in FIG. 3. In some aspects, the commit request is processed based on the result of the validation. For example, processing of the commit request is performed during the lock shrinkage phase 316 and operations O-V in FIG. 3.
[0139] In some aspects, the plurality of replicated data structures (e.g., data structure replicas 145) includes a plurality of key-value tuples. Each key-value tuple of the plurality of key-value tuples includes a key and a corresponding CRDT value of the plurality of CRDT values. In some aspects, generating the read set further includes forwarding the read request to the subset of the plurality of replicated data structures (e.g., at operation C in FIG. 3). The subset includes at least one primary replica of the data structures (e.g., X primary replica 306). In some aspects, generating the read set further includes generating, by the at least one primary replica, one or more read locks based on the read request, and forwarding the one or more read locks to a remaining subset of the plurality of replicated data structures (e.g., X replica 308).
[0140] In some aspects, generating the read set further includes retrieving one or more of the plurality of CRDT values from the remaining subset of the plurality of replicated data structures after the one or more read locks are initiated, and merging the one or more of the plurality of CRDT values retrieved from the remaining subset of the plurality of replicated data structures with a CRDT value of the at least one primary replica to generate the merged CRDT value (e.g., operations E-G in FIG. 3).
[0141] In some aspects, one or more hash table entries are generated in a hash table (e.g., hash table 140). The hash table is stored in a storage node of the distributed datastore system. The storage node stores the subset of the plurality of replicated data structures. A read lock reference count 206 is incremented for each of the one or more hash table entries. The read lock reference count 206 indicates a count of read locks initiated for a corresponding one of the subset of the plurality of replicated data structures.
[0142] In some embodiments, a transaction write call is executed on the plurality of replicated data structures (e.g., at operation I in FIG. 3). The transaction write call is executed before the commit request and is associated with the write set of CRDT values.
[0143] In some embodiments, the subset of the plurality of replicated data structures includes primary replicas of the data structures (e.g., primary replicas 306 and 310). Validating the subset of the plurality of CRDT values further includes validating, at a first primary replica of the primary replicas, that a value of the read set is equal to a corresponding CRDT value of the subset of the plurality of CRDT values that is associated with the first primary replica. A write lock of the corresponding CRDT value associated with the first primary replica is initiated based on a successful validation that the value of the read set is equal to the corresponding CRDT value of the subset.
[0144] In some embodiments, a first validation result of the write lock of the corresponding CRDT value associated with the first primary replica is generated. At least a second validation result of a write lock of a corresponding CRDT value associated with a second primary replica of the primary replicas is generated. The subset of the plurality of CRDT values associated with the subset of the plurality of replicated data structures is validated based on successful
validations indicated by the first validation result and at least the second validation result.
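To make the per-primary validation and the combination of validation results concrete, the following Python sketch checks each read-set value against the current value held at the relevant primary, activates a write lock on success, and accepts the commit only when every validation succeeds. The dict-based primary representation and function names are assumptions; a fuller implementation would also roll back locks taken before a failed validation.

```python
# Hypothetical sketch of validation across primaries and combining the results.

def validate_at_primary(primary: dict, key: str, read_value: dict) -> bool:
    # Successful validation: the value read earlier is unchanged at the primary.
    if primary["values"][key] != read_value:
        return False
    primary["write_locks"].add(key)           # write lock activated on success
    return True

def validate_commit(primaries: list, read_set: dict) -> bool:
    results = []
    for primary in primaries:
        for key, read_value in read_set.items():
            if key in primary["values"]:
                results.append(validate_at_primary(primary, key, read_value))
    return all(results)                        # commit only on unanimous success

# Usage: the second primary's value has drifted, so the commit is rejected.
p1 = {"values": {"x": {"n1": 1}}, "write_locks": set()}
p2 = {"values": {"y": {"n2": 7}}, "write_locks": set()}
read_set = {"x": {"n1": 1}, "y": {"n2": 6}}    # stale value for "y"
assert validate_commit([p1, p2], read_set) is False
```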
[0145] In some aspects (e.g., at operations P and Q in FIG. 3), a CRDT value of the subset of the plurality of CRDT values associated with the subset of the plurality of replicated data structures is replaced with a CRDT value of the write set. A lock release and the CRDT value of the write set are forwarded to at least one replicated data structure of a remaining set of replicated data structures. A CRDT value of the at least one replicated data structure is replaced with the CRDT value of the write set. A write lock at the at least one replicated data structure is released based on the lock release.
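The lock-shrinkage step above can be sketched in a few lines of Python: the primary installs the committed write-set value, then forwards that value together with a lock release to each remaining replica, which installs the value and drops its write lock. The class and function names are assumptions made for the example.

```python
# Hypothetical sketch of lock shrinkage after a successful commit.

class ReplicaState:
    def __init__(self, value):
        self.value = value
        self.write_locked = True               # locked during the commit

    def install_and_release(self, new_value):
        self.value = dict(new_value)           # replace with the write-set value
        self.write_locked = False              # release the write lock

def shrink_locks(primary: ReplicaState, others: list, write_value: dict):
    primary.install_and_release(write_value)
    for replica in others:                     # forward value + lock release
        replica.install_and_release(write_value)

# Usage
primary = ReplicaState({"n1": 1})
secondary = ReplicaState({"n1": 1})
shrink_locks(primary, [secondary], {"n1": 2})
assert primary.value == secondary.value == {"n1": 2}
assert not primary.write_locked and not secondary.write_locked
```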
[0146] FIG. 12 is a block diagram illustrating a representative software architecture 1202, which may be used in conjunction with various device hardware described herein, according to some example embodiments. FIG. 12 is merely a non-limiting example of a software architecture 1202, and it will be appreciated that many other architectures may be implemented to facilitate the functionality described herein. The software architecture 1202 may be executed on hardware such as computing device 1300 of FIG. 13 that includes, among other things, processor 1305, memory 1310, storage 1315 and 1320, and I/O components (or interfaces) 1325 and 1330. A representative hardware layer 1204 is illustrated and can represent, for example, the computing device 1300 of FIG. 13. The representative hardware layer 1204 comprises one or more processing units 1206 having associated executable instructions 1208.
Executable instructions 1208 represent the executable instructions of the software architecture 1202, including the implementation of the methods, modules, and so forth of FIGS. 1-11. Hardware layer 1204 also includes memory and/or storage modules 1210, which also have executable instructions 1208. Hardware layer 1204 may also comprise other hardware 1212, which represents any other hardware of the hardware layer 1204, such as the other hardware illustrated as part of computing device 1300.
[0147] In the example architecture of FIG. 12, the software architecture 1202 may be conceptualized as a stack of layers where each layer provides particular functionality. For example, the software architecture 1202 may include layers such as an operating system 1214, libraries 1216,
frameworks/middleware 1218, applications 1220, and presentation layer 1244. Operationally, the applications 1220 and/or other components within the layers may invoke application programming interface (API) calls 1224 through the software stack and receive a response, returned values, and so forth illustrated as messages 1226 in response to the API calls 1224. The layers illustrated in FIG. 12 are representative in nature and not all software architectures 1202 have all layers. For example, some mobile or special-purpose operating systems may not provide frameworks/middleware 1218, while others may provide such a layer. Other software architectures may include additional or different layers.
[0148] The operating system 1214 may manage hardware resources and provide common services. The operating system 1214 may include, for example, a kernel 1228, services 1230, and drivers 1232. The kernel 1228 may act as an abstraction layer between the hardware and the other software layers. For example, kernel 1228 may be responsible for memory management, processor management (e.g., scheduling), component management, networking, security settings, and so on. Services 1230 may provide other common services for the other software layers. The drivers 1232 may be responsible for controlling or interfacing with the underlying hardware. For instance, the drivers 1232 may include display drivers, camera drivers, Bluetooth® drivers, flash memory drivers, serial communication drivers (e.g., Universal Serial Bus (USB) drivers), Wi-Fi® drivers, audio drivers, power management drivers, and so forth, depending on the hardware configuration.
[0149] The libraries 1216 may provide a common infrastructure that may be utilized by the applications 1220 and/or other components and/or layers. The libraries 1216 typically provide functionality that allows other software modules to perform tasks more easily than by interfacing directly with the underlying operating system 1214 functionality (e.g., kernel 1228, services 1230, and/or drivers 1232). The libraries 1216 may include system libraries 1234 (e.g., C standard library) that may provide functions such as memory allocation functions, string manipulation functions, mathematic functions, and the like. In addition, the libraries 1216 may include API libraries 1236 such as media libraries (e.g., libraries to support presentation and manipulation of various media formats such as MPEG4, H.264, MP3, AAC, AMR, JPG, PNG), graphics libraries (e.g., an OpenGL framework that may be used to render 2D and 3D graphic content on a display), datastore libraries (e.g., SQLite that may provide various relational datastore functions), web libraries (e.g., WebKit that may provide web browsing functionality), and the like. The libraries 1216 may also include a wide variety of other libraries 1238 to provide many other APIs to the applications 1220 and other software components/modules.
[0150] The frameworks/middleware 1218 (also sometimes referred to as middleware) may provide a higher-level common infrastructure that may be utilized by the applications 1220 and/or other software components/modules. For example, the frameworks/middleware 1218 may provide various graphical user interface (GUI) functions, high-level resource management, high-level location services, and so forth. The frameworks/middleware 1218 may provide a broad spectrum of other APIs that may be utilized by the applications 1220 and/or other software components/modules, some of which may be specific to a particular operating system 1214 or platform.
[0151] The applications 1220 include built-in applications 1240 and/or third-party applications 1242. Examples of representative built-in applications 1240 may include but are not limited to, a contacts application, a browser application, a book reader application, a location application, a media application, a messaging application, and/or a game application. Third-party applications 1242 may include any of the built-in applications 1240 as well as a broad assortment of other applications. In a specific example, the third-party application 1242 (e.g., an application developed using the Android™ or iOS™ software development kit (SDK) by an entity other than the vendor of the particular platform) may be mobile software running on a mobile operating system such as iOS™, Android™, Windows® Phone, or other mobile operating systems. In this example, the third-party application 1242 may invoke the API calls 1224 provided by the mobile operating system such as operating system 1214 to facilitate the functionality described herein.
[0152] In some embodiments, applications 1220 include the coordinator 1260 and replica control module 1262 used in connection with the disclosed MC3 functionalities. Coordinator 1260 and replica control module 1262 can be
the same as (and perform the same functionalities as) coordinator 110 and replica control module 112 discussed in connection with FIG. 1 - FIG. 11.
[0153] The applications 1220 may utilize built-in operating system functions (e.g., kernel 1228, services 1230, and/or drivers 1232), libraries (e.g., system libraries 1234, API libraries 1236, and other libraries 1238), and frameworks/middleware 1218 to create user interfaces to interact with users of the system. Alternatively, or additionally, in some systems, interactions with a user may occur through a presentation layer, such as presentation layer 1244. In these systems, the application/module “logic” can be separated from the aspects of the application/module that interact with a user.
[0154] Some software architectures utilize virtual machines. In the example of FIG. 12, this is illustrated by virtual machine 1248. A virtual machine creates a software environment where applications/modules can execute as if they were executing on a hardware machine (such as the computing device 1300 of FIG. 13, for example). A virtual machine 1248 is hosted by a host operating system (operating system 1214 in FIG. 12) and typically, although not always, has a virtual machine monitor 1246, which manages the operation of the virtual machine 1248 as well as the interface with the host operating system (i.e., operating system 1214). A software architecture executes within the virtual machine 1248, such as an operating system 1250, libraries 1252, frameworks/middleware 1254, applications 1256, and/or presentation layer 1258. These layers of software architecture executing within the virtual machine 1248 can be the same as the corresponding layers previously described or may be different.
[0155] FIG. 13 is a block diagram illustrating circuitry for a device that implements algorithms and performs methods, according to some example embodiments. All components need not be used in various embodiments. For example, clients, servers, and cloud-based network devices may each use a different set of components, or in the case of servers, larger storage devices.
[0156] One example computing device in the form of a computer 1300 (also referred to as computing device 1300, computer system 1300, or computer 1300) may include a processor 1305, memory 1310, removable storage 1315, non-removable storage 1320, input interface 1325, output interface 1330, and
communication interface 1335, all connected by a bus 1340. Although the example computing device is illustrated and described as the computer 1300, the computing device may be in different forms in different embodiments.
[0157] Memory 1310 may include volatile memory 1345 and non-volatile memory 1350 and may store a program 1355. The computer 1300 may include - or have access to a computing environment that includes - a variety of computer-readable media, such as the volatile memory 1345, the non-volatile memory 1350, the removable storage 1315, and the non-removable storage 1320. Computer storage includes random-access memory (RAM), read-only memory (ROM), erasable programmable read-only memory (EPROM) and electrically erasable programmable read-only memory (EEPROM), flash memory or other memory technologies, compact disk read-only memory (CD-ROM), digital versatile disks (DVD) or other optical disk storage, magnetic cassettes, magnetic tape, magnetic disk storage or other magnetic storage devices, or any other medium capable of storing computer-readable instructions.
[0158] Computer-readable instructions stored on a computer-readable medium (e.g., the program 1355 stored in the memory 1310) are executable by the processor 1305 of the computer 1300. A hard drive, CD-ROM, and RAM are some examples of articles including a non-transitory computer-readable medium such as a storage device. The terms “computer-readable medium” and “storage device” do not include carrier waves to the extent that carrier waves are deemed too transitory. “Computer-readable non-transitory media” includes all types of computer-readable media, including magnetic storage media, optical storage media, flash media, and solid-state storage media. It should be understood that software can be installed on and sold with a computer. Alternatively, the software can be obtained and loaded into the computer, including obtaining the software through a physical medium or distribution system, including, for example, from a server owned by the software creator or from a server not owned but used by the software creator. The software can be stored on a server for distribution over the Internet, for example. As used herein, the terms “computer-readable medium” and “machine-readable medium” are interchangeable.
[0159] The program 1355 may utilize modules discussed herein, such as coordinator 1360 and replica control module 1365, which can be the same as (and perform the same functionalities as) the coordinator 110 and replica control module 112 discussed in connection with FIG. 1 - FIG. 11.
[0160] Any one or more of the modules described herein may be implemented using hardware (e.g., a processor of a machine, an application-specific integrated circuit (ASIC), field-programmable gate array (FPGA), or any suitable combination thereof). Moreover, any two or more of these modules may be combined into a single module, and the functions described herein for a single module may be subdivided among multiple modules. Furthermore, according to various example embodiments, modules described herein as being implemented within a single machine, datastore, or device may be distributed across multiple machines, datastores, or devices.
[0161] In some aspects, the disclosed functionalities can be performed by one or more separate (or dedicated) modules included in the replica control module 1365 and integrated as a single module, performing the corresponding functions of the integrated module.
[0162] Although a few embodiments have been described in detail above, other modifications are possible. For example, the logic flows depicted in the figures do not require the particular order shown, or sequential order, to achieve desirable results. Other steps may be provided, or steps may be eliminated, from the described flows, and other components may be added to, or removed from, the described systems. Other embodiments may be within the scope of the following claims.
[0163] It should be further understood that software including one or more computer-executable instructions that facilitate processing and operations as described above concerning any one or all of the steps of the disclosure can be installed in and sold with one or more computing devices consistent with the disclosure. Alternatively, the software can be obtained and loaded into one or more computing devices, including obtaining software through a physical medium or distribution system, including, for example, from a server owned by the software creator or from a server not owned but used by the software creator.
The software can be stored on a server for distribution over the Internet, for example.
[0164] Also, it will be understood by one skilled in the art that this disclosure is not limited in its application to the details of construction and the arrangement of components outlined in the description or illustrated in the drawings. The embodiments herein are capable of other embodiments and capable of being practiced or carried out in various ways. Also, it will be understood that the phraseology and terminology used herein are for descriptive purposes and should not be regarded as limiting. The use of "including," "comprising," or "having" and variations thereof herein is meant to encompass the items listed thereafter and equivalents thereof as well as additional items. Unless limited otherwise, the terms "connected," "coupled," and "mounted," and variations thereof herein are used broadly and encompass direct and indirect connections, couplings, and mountings. In addition, the terms "connected" and "coupled" and variations thereof are not restricted to physical or mechanical connections or couplings. Further, terms such as up, down, bottom, and top are relative and are employed to aid illustration but are not limiting.
[0165] The components of the illustrative devices, systems, and methods employed in accordance with the illustrated embodiments can be implemented, at least in part, in digital electronic circuitry, analog electronic circuitry, or computer hardware, firmware, software, or combinations of them. These components can be implemented, for example, as a computer program product such as a computer program, program code, or computer instructions tangibly embodied in an information carrier, or a machine-readable storage device, for execution by, or to control the operation of, data processing apparatus such as a programmable processor, a computer, or multiple computers.
[0166] A computer program can be written in any form of programming language, including compiled or interpreted languages, and it can be deployed in any form, including as a stand-alone program or as a module, component, subroutine, or another unit suitable for use in a computing environment. A computer program can be deployed to be executed on one computer or multiple computers at one site or distributed across multiple sites and interconnected by a communication network. Also, functional programs, codes, and code segments
for accomplishing the techniques described herein can be easily construed as within the scope of the claims by programmers skilled in the art to which the techniques described herein pertain. Method steps associated with the illustrative embodiments can be performed by one or more programmable processors executing a computer program, code, or instructions to perform functions (e.g., by operating on input data and/or generating an output). Method steps can also be performed by, and the apparatus for performing the methods can be implemented as, special-purpose logic circuitry, e.g., an FPGA (field-programmable gate array) or an ASIC (application-specific integrated circuit), for example.
[0167] The various illustrative logical blocks, modules, and circuits described in connection with the embodiments disclosed herein may be implemented or performed with a general-purpose processor, a digital signal processor (DSP), an ASIC, an FPGA, or other programmable logic devices, discrete gate or transistor logic, discrete hardware components, or any combination thereof designed to perform the functions described herein. A general-purpose processor may be a microprocessor, but in the alternative, the processor may be any conventional processor, controller, microcontroller, or state machine. A processor may also be implemented as a combination of computing devices, e.g., a combination of a DSP and a microprocessor, a plurality of microprocessors, one or more microprocessors in conjunction with a DSP core, or any other such configuration.
[0168] Processors suitable for the execution of a computer program include, by way of example, both general and special purpose microprocessors, and any one or more processors of any kind of digital computer. Generally, a processor will receive instructions and data from a read-only memory or a random-access memory, or both. The required elements of a computer are a processor for executing instructions and one or more memory devices for storing instructions and data. Generally, a computer will also include, or be operatively coupled to receive data from or transfer data to, or both, one or more mass storage devices for storing data, e.g., magnetic, magneto-optical disks, or optical disks. Information carriers suitable for embodying computer program instructions and data include all forms of non-volatile memory, including by way of example, semiconductor memory devices, e.g., electrically programmable
read-only memory or ROM (EPROM), electrically erasable programmable ROM (EEPROM), flash memory devices, and data storage disks (e.g., magnetic disks, internal hard disks, or removable disks, magneto-optical disks, and CD-ROM and DVD-ROM disks). The processor and the memory can be supplemented by, or incorporated into special-purpose logic circuitry.
[0169] Those of skill in the art understand that information and signals may be represented using any of a variety of different technologies and techniques. For example, data, instructions, commands, information, signals, bits, symbols, and chips that may be referenced throughout the above description may be represented by voltages, currents, electromagnetic waves, magnetic fields or particles, optical fields or particles, or any combination thereof.
[0170] As used herein, “machine-readable medium” (or “computer-readable medium”) means a device able to store instructions and data temporarily or permanently and may include, but is not limited to, random-access memory (RAM), read-only memory (ROM), buffer memory, flash memory, optical media, magnetic media, cache memory, other types of storage (e.g., Erasable Programmable Read-Only Memory (EEPROM)), and/or any suitable combination thereof. The term “machine-readable medium” should be taken to include a single medium or multiple media (e.g., a centralized or distributed datastore, or associated caches and servers) able to store processor instructions. The term “machine-readable medium” shall also be taken to include any medium (or a combination of multiple media) that is capable of storing instructions for execution by one or more processors 1305, such that the instructions, when executed by one or more processors 1305, cause the one or more processors 1305 to perform any one or more of the methodologies described herein. Accordingly, a “machine-readable medium” refers to a single storage apparatus or device, as well as “cloud-based” storage systems or storage networks that include multiple storage apparatus or devices. The term “machine-readable medium” as used herein excludes signals per se.
[0171] In addition, techniques, systems, subsystems, and methods described and illustrated in the various embodiments as discrete or separate may be combined or integrated with other systems, modules, techniques, or methods without departing from the scope of the present disclosure. Other items shown
or discussed as coupled or directly coupled or communicating with each other may be indirectly coupled or communicating through some interface, device, or intermediate component whether electrically, mechanically, or otherwise. Other examples of changes, substitutions, and alterations are ascertainable by one skilled in the art and could be made without departing from the scope disclosed herein.
[0172] Although the present disclosure has been described concerning specific features and embodiments thereof, it is evident that various modifications and combinations can be made thereto without departing from the scope of the disclosure. For example, other components may be added to, or removed from, the described systems. The specification and drawings are, accordingly, to be regarded simply as an illustration of the disclosure as defined by the appended claims, and are contemplated to cover any modifications, variations, combinations, or equivalents that fall within the scope of the present disclosure. Other aspects may be within the scope of the following claims.
Claims
1. A computer-implemented method for updating a plurality of replicated data structures of a distributed datastore system, the method comprising:
decoding at least one read request for one or more key-value tuples of the replicated data structures, each key-value tuple of the one or more key-value tuples comprising a key and a corresponding conflict-free replicated data type (CRDT) value;
locking replicas of each key-value tuple of the one or more key-value tuples in the at least one read request to obtain locked CRDT values in read-locked replicas of the one or more key-value tuples;
merging the locked CRDT values of the read-locked replicas into a merged CRDT value onto a read set in response to the at least one read request;
maintaining an update summary on each of the read-locked replicas;
decoding a commit request to commit a write set of at least one key-value tuple, the commit request including the read set;
validating whether the read set in the commit request is unchanged from current CRDT values of the read-locked replicas;
activating a write lock of the at least one key-value tuple of the write set resulting in write-locked replicas;
maintaining an update summary on each of the write-locked replicas;
accepting the commit request when the validating and the activating are successful;
unlocking the read-locked replicas and the write-locked replicas after the commit request is processed resulting in unlocked replicas; and
merging the update summary associated with any one of the read-locked replicas and the write-locked replicas that has become an unlocked replica of the unlocked replicas to a current CRDT value of the unlocked replica.
2. The computer-implemented method of claim 1, further comprising: updating a current CRDT value of a replica of a key-value tuple in the distributed datastore system according to one eventual consistency update when the replica is not locked; and
updating an update summary of the replica of the key-value tuple in the distributed datastore system according to the one eventual consistency update when the replica is either read-locked or write-locked.
3. The computer-implemented method of claim 1, wherein accepting the commit request comprises: updating a current CRDT value of each of the write-locked replicas of at least one key-value tuple in the write set by a CRDT value of the at least one key-value tuple in the write set.
4. The computer-implemented method of any of claims 1-3, further comprising: failing the locking of at least one of the replicas of each key-value tuple of the one or more key-value tuples in the at least one read request when the at least one of the replicas associated with the read request is locked as a write set of another transaction; and failing the locking of at least one of the replicas of the at least one key-value tuple of the write set when the at least one of the replicas associated with the commit request is already locked as a write set of another transaction.
5. The computer-implemented method of claim 1, further comprising: ignoring a gossip of a CRDT value to a replica of a key-value tuple in the distributed datastore system from another replica if the replica is locked; and updating a current CRDT value of the replica of a key-value tuple in the distributed datastore system with the CRDT value in the gossip from the another replica if the replica is unlocked.
6. The computer-implemented method of claim 1, wherein an unlocked replica of the one or more key-value tuples is a replica that is no longer locked as a read set or a write set by at least one data access transaction.
7. The computer-implemented method of any of claims 1-6, further comprising:
incrementing a version number of the at least one key-value tuple of the write set when the commit request is accepted.
8. The computer-implemented method of any of claims 1-6, further comprising: denying the commit request when one of the validating or the activating is unsuccessful.
9. A node for updating a plurality of replicated data structures of a distributed datastore system, the node comprising:
a memory storing instructions; and
at least one processor in communication with the memory, the at least one processor configured, upon execution of the instructions, to perform operations comprising:
decoding at least one read request for one or more key-value tuples of the replicated data structures, each key-value tuple of the one or more key-value tuples comprising a key and a corresponding conflict-free replicated data type (CRDT) value;
locking replicas of each key-value tuple of the one or more key-value tuples in the at least one read request to obtain locked CRDT values in read-locked replicas of the one or more key-value tuples;
merging the locked CRDT values of the read-locked replicas into a merged CRDT value onto a read set in response to the at least one read request;
maintaining an update summary on each of the read-locked replicas;
decoding a commit request to commit a write set of at least one key-value tuple, the commit request including the read set;
validating whether the read set in the commit request is unchanged from current CRDT values of the read-locked replicas;
activating a write lock of the at least one key-value tuple of the write set resulting in write-locked replicas;
maintaining an update summary on each of the write-locked replicas;
accepting the commit request when the validating and the activating are successful;
unlocking the read-locked replicas and the write-locked replicas after the commit request is processed resulting in unlocked replicas; and
merging the update summary associated with any one of the read-locked replicas and the write-locked replicas that has become an unlocked replica of the unlocked replicas to a current CRDT value of the unlocked replica.
10. The node of claim 9, the operations further comprising: updating a current CRDT value of a replica of a key-value tuple in the distributed datastore system according to one eventual consistency update when the replica is not locked; and updating an update summary of the replica of the key-value tuple in the distributed datastore system according to the one eventual consistency update when the replica is either read-locked or write-locked.
11. The node of claim 9, wherein the operations for accepting the commit request comprise: updating a current CRDT value of each of the write-locked replicas of at least one key-value tuple in the write set by a CRDT value of the at least one key-value tuple in the write set.
12. The node of any of claims 9-11, the operations further comprising: failing the locking of at least one of the replicas of each key-value tuple of the one or more key-value tuples in the at least one read request when the at least one of the replicas associated with the read request is locked as a write set of another transaction; and failing the locking of at least one of the replicas of the at least one key-value tuple of the write set when the at least one of the replicas associated with the commit request is already locked as a write set of another transaction.
13. The node of claim 9, the operations further comprising:
ignoring a gossip of a CRDT value to a replica of a key-value tuple in the distributed datastore system from another replica if the replica is locked; and updating a current CRDT value of the replica of a key-value tuple in the distributed datastore system with the CRDT value in the gossip from the another replica if the replica is unlocked.
14. The node of claim 9, wherein an unlocked replica of the one or more key-value tuples is a replica that is no longer locked as a read set or a write set by at least one data access transaction.
15. The node of any of claims 9-14, the operations further comprising: incrementing a version number of the at least one key-value tuple of the write set when the commit request is accepted.
16. The node of any of claims 9-14, the operations further comprising: denying the commit request when one of the validating or the activating is unsuccessful.
17. A non-transitory computer-readable medium storing computer instructions for updating a plurality of replicated data structures of a distributed datastore system, wherein the instructions, when executed by one or more processors of a node, cause the one or more processors to perform operations comprising:
decoding at least one read request for one or more key-value tuples of the replicated data structures, each key-value tuple of the one or more key-value tuples comprising a key and a corresponding conflict-free replicated data type (CRDT) value;
locking replicas of each key-value tuple of the one or more key-value tuples in the at least one read request to obtain locked CRDT values in read-locked replicas of the one or more key-value tuples;
merging the locked CRDT values of the read-locked replicas into a merged CRDT value onto a read set in response to the at least one read request;
maintaining an update summary on each of the read-locked replicas;
decoding a commit request to commit a write set of at least one key-value tuple, the commit request including the read set;
validating whether the read set in the commit request is unchanged from current CRDT values of the read-locked replicas;
activating a write lock of the at least one key-value tuple of the write set to obtain write-locked replicas;
maintaining an update summary on each of the write-locked replicas;
accepting the commit request when the validating and the activating are successful;
unlocking the read-locked replicas and the write-locked replicas after the commit request is processed resulting in unlocked replicas; and
merging the update summary associated with any one of the read-locked replicas and the write-locked replicas that has become an unlocked replica of the unlocked replicas to a current CRDT value of the unlocked replica.
18. The non-transitory computer-readable medium of claim 17, the operations further comprising: updating a current CRDT value of a replica of a key-value tuple in the distributed datastore system according to one eventual consistency update when the replica is not locked; and updating an update summary of the replica of the key-value tuple in the distributed datastore system according to the one eventual consistency update when the replica is either read-locked or write-locked.
19. The non-transitory computer-readable medium of claim 17, wherein the operations for accepting the commit request comprise: updating a current CRDT value of each of the write-locked replicas of at least one key-value tuple in the write set by a CRDT value of the at least one key-value tuple in the write set.
20. The non-transitory computer-readable medium of any of claims 17-19, the operations further comprising: failing the locking of at least one of the replicas of each key-value tuple of the one or more key-value tuples in the at least one read request when the at least one of the replicas associated with the read request is locked as a write set of another transaction; and failing the locking of at least one of the replicas of the at least one key-value tuple of the write set when the at least one of the replicas associated with the commit request is already locked as a write set of another transaction.
21. The non-transitory computer-readable medium of claim 17, the operations further comprising: ignoring a gossip of a CRDT value to a replica of a key-value tuple in the distributed datastore system from another replica if the replica is locked; and updating a current CRDT value of the replica of a key-value tuple in the distributed datastore system with the CRDT value in the gossip from the another replica if the replica is unlocked.
22. The non-transitory computer-readable medium of claim 17, wherein an unlocked replica of the one or more key-value tuples is a replica that is no longer locked as a read set or a write set by at least one data access transaction.
23. The non-transitory computer-readable medium of any of claims 17-22, the operations further comprising: incrementing a version number of the at least one key-value tuple of the write set when the commit request is accepted.
24. The non-transitory computer-readable medium of any of claims 17-22, the operations further comprising: denying the commit request when one of the validating or the activating is unsuccessful.
25. A system for updating a plurality of replicated data structures of a distributed datastore system, the system comprising:
means for decoding at least one read request for one or more key-value tuples of the replicated data structures, each key-value tuple of the one or more key-value tuples comprising a key and a corresponding conflict-free replicated data type (CRDT) value;
means for locking replicas of each key-value tuple of the one or more key-value tuples in the at least one read request to obtain locked CRDT values in read-locked replicas of the one or more key-value tuples;
means for merging the locked CRDT values of the read-locked replicas into a merged CRDT value onto a read set in response to the at least one read request;
means for maintaining an update summary on each of the read-locked replicas;
means for decoding a commit request to commit a write set of at least one key-value tuple, the commit request including the read set;
means for validating whether the read set in the commit request is unchanged from current CRDT values of the read-locked replicas;
means for activating a write lock of the at least one key-value tuple of the write set to obtain write-locked replicas;
means for maintaining an update summary on each of the write-locked replicas;
means for accepting the commit request when the validating and the activating are successful;
means for unlocking the read-locked replicas and the write-locked replicas after the commit request is processed resulting in unlocked replicas; and
means for merging the update summary associated with any one of the read-locked replicas and the write-locked replicas that has become an unlocked replica of the unlocked replicas to a current CRDT value of the unlocked replica.
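By way of non-limiting illustration of the gossip rule recited in claims 5, 13, and 21, the following Python sketch ignores a gossiped CRDT value while the receiving replica is locked and merges it into the current value only when the replica is unlocked. The class name, G-Counter representation, and node identifiers are assumptions made for the example, not claim language.

```python
# Hypothetical sketch of the gossip-handling rule: drop gossip while locked,
# merge gossip while unlocked.

def merge(a: dict, b: dict) -> dict:
    # G-Counter join: element-wise maximum.
    return {n: max(a.get(n, 0), b.get(n, 0)) for n in a.keys() | b.keys()}

class GossipingReplica:
    def __init__(self):
        self.value = {}
        self.locked = False

    def on_gossip(self, remote_value: dict):
        if self.locked:
            return                                    # gossip ignored while locked
        self.value = merge(self.value, remote_value)  # gossip applied while unlocked

# Usage
r = GossipingReplica()
r.locked = True
r.on_gossip({"n2": 7})
assert r.value == {}              # dropped while locked
r.locked = False
r.on_gossip({"n2": 7})
assert r.value == {"n2": 7}       # merged once unlocked
```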