Merge branch 'master' + DODB::CachedDataBase.
commit 4fe2719f31

spec/test.cr (89 changed lines)

@@ -397,5 +397,94 @@ describe "DODB::DataBase" do
 			end
 		end
 	end

+	describe "parallel support" do
+		# Not sure how many forks would be safe in a test like that.
+		fork_count = 25
+		entries_per_fork = 100
+
+		it "works for pushing values" do
+			db = DODB::SpecDataBase.new
+
+			processes = [] of Process
+
+			fork_count.times do |fork_id|
+				processes << Process.fork do
+					entries_per_fork.times do |entry_id|
+						db << Ship.new("entry-#{fork_id}-#{entry_id}", "???")
+					end
+				end
+			end
+
+			processes.each &.wait
+
+			dump = db.to_a
+
+			dump.size.should eq fork_count * entries_per_fork
+		end
+
+		it "works for updating values" do
+			db = DODB::SpecDataBase.new
+			db_entries_by_name = db.new_index "name", &.name
+
+			# First pass, creating data.
+			processes = [] of Process
+			fork_count.times do |fork_id|
+				processes << Process.fork do
+					entries_per_fork.times do |entry_id|
+						db << Ship.new("entry-#{fork_id}-#{entry_id}", "???")
+					end
+				end
+			end
+			processes.each &.wait
+
+			# Second pass, updating data.
+			processes = [] of Process
+			fork_count.times do |fork_id|
+				processes << Process.fork do
+					entries_per_fork.times do |entry_id|
+						db_entries_by_name.update Ship.new("entry-#{fork_id}-#{entry_id}", "???", tags: ["updated"])
+					end
+				end
+			end
+			processes.each &.wait
+
+			# Third pass, testing database content.
+			dump = db.to_a
+
+			fork_count.times do |fork_id|
+				entries_per_fork.times do |entry_id|
+					entry = db_entries_by_name.get "entry-#{fork_id}-#{entry_id}"
+
+					entry.tags.should eq ["updated"]
+				end
+			end
+		end
+
+		it "does parallel-safe updates" do
+			db = DODB::SpecDataBase.new
+			db_entries_by_name = db.new_index "name", &.name
+
+			# We’ll be storing an integer in the "klass" field, and incrementing
+			# it in forks in a second time.
+			db << Ship.new("test", "0")
+
+			processes = [] of Process
+			fork_count.times do |fork_id|
+				processes << Process.fork do
+					entries_per_fork.times do |entry_id|
+						db_entries_by_name.safe_get "test" do |entry|
+							entry.klass = (entry.klass.to_i + 1).to_s
+
+							db_entries_by_name.update "test", entry
+						end
+					end
+				end
+			end
+			processes.each &.wait
+
+			db_entries_by_name.get("test").klass.should eq((fork_count * entries_per_fork).to_s)
+		end
+	end
 end
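Note (not part of the diff): the tests above rely on spec helpers defined earlier in spec/test.cr. A rough sketch of what they might look like, for readers of this excerpt only — the names Ship and DODB::SpecDataBase come from the tests themselves, but the fields, requires and constructors below are assumptions:

require "json"
require "../src/dodb.cr"

class Ship
	include JSON::Serializable

	property name  : String
	property klass : String
	property tags  : Array(String)

	def initialize(@name, @klass, @tags = [] of String)
	end
end

# Assumed: a DODB::DataBase(Ship) pointing at a throw-away test directory.
class DODB::SpecDataBase < DODB::DataBase(Ship)
	def initialize
		super "test-storage"
	end
end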

src/dodb.cr (75 changed lines)

@@ -9,9 +9,9 @@ abstract class DODB::Storage(V)
 	def initialize(@directory_name : String)
 	end

-	def request_lock(name)
+	def request_lock(name, subname = nil)
 		r = -1
-		file_path = get_lock_file_path name
+		file_path = get_lock_file_path name, subname
 		file_perms = 0o644

 		flags = LibC::O_EXCL | LibC::O_CREAT

@@ -21,8 +21,8 @@ abstract class DODB::Storage(V)

 		LibC.close r
 	end
-	def release_lock(name)
-		File.delete get_lock_file_path name
+	def release_lock(name, subname = nil)
+		File.delete get_lock_file_path name, subname
 	end

 	private def index_file
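Note (not part of the diff): the first hunk stops right after the O_EXCL | O_CREAT flags are set up, so the retry loop of request_lock is not visible here. A plausible sketch of the full method under that assumption — the lock file's existence acts as the mutex, and acquisition spins until the exclusive create succeeds:

def request_lock(name, subname = nil)
	r = -1
	file_path = get_lock_file_path name, subname
	file_perms = 0o644

	# O_EXCL | O_CREAT makes open(2) fail while the file already exists,
	# so whichever process managed to create the file holds the lock.
	flags = LibC::O_EXCL | LibC::O_CREAT

	while r == -1
		r = LibC.open file_path, flags, file_perms
		sleep 10.milliseconds if r == -1 # assumed back-off; the exact delay is not shown in the diff
	end

	LibC.close r
end

release_lock then simply deletes the file, as the second hunk above shows.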
@@ -53,9 +53,14 @@ abstract class DODB::Storage(V)
 	end

 	def <<(item : V)
+		request_lock "index"
 		index = last_index + 1
 		self[index] = item
 		self.last_index = index

+		release_lock "index"
+
 		index # FIXME: Should we really return the internal key?
 	end

 	def each(reversed : Bool = false, start_offset = 0, end_offset : Int32? = nil)
@@ -108,17 +113,9 @@ abstract class DODB::Storage(V)
 		end
 	end

-	##
-	# name is the name that will be used on the file system.
-	def new_partition(name : String, &block : Proc(V, String))
-		Partition(V).new(self, @directory_name, name, block).tap do |table|
-			@indexers << table
-		end
-	end
-
-	def new_tags(name : String, &block : Proc(V, Array(String)))
-		Tags(V).new(@directory_name, name, block).tap do |tags|
-			@indexers << tags
+	def new_nilable_index(name : String, &block : Proc(V, String | DODB::NoIndex))
+		Index(V).new(self, @directory_name, name, block).tap do |indexer|
+			@indexers << indexer
 		end
 	end

@@ -128,12 +125,30 @@ abstract class DODB::Storage(V)
 		index.not_nil!.as(DODB::Index).get key
 	end

+	##
+	# name is the name that will be used on the file system.
+	def new_partition(name : String, &block : Proc(V, String))
+		Partition(V).new(self, @directory_name, name, block).tap do |table|
+			@indexers << table
+		end
+	end
+
 	def get_partition(table_name : String, partition_name : String)
 		partition = @indexers.find &.name.==(table_name)

 		partition.not_nil!.as(DODB::Partition).get partition_name
 	end

+	def write_partitions(key : Int32, value : V)
+		@indexers.each &.index(stringify_key(key), value)
+	end
+
+	def new_tags(name : String, &block : Proc(V, Array(String)))
+		Tags(V).new(@directory_name, name, block).tap do |tags|
+			@indexers << tags
+		end
+	end
+
 	def get_tags(name, key : String)
 		partition = @indexers.find &.name.==(name)

@@ -144,11 +159,9 @@ abstract class DODB::Storage(V)
 		@indexers.each &.check!(stringify_key(key), value, old_value)
 	end

-	def write_partitions(key : Int32, value : V)
-		@indexers.each &.index(stringify_key(key), value)
-	end
-
 	def pop
+		request_lock "index"
+
 		index = last_index

 		# Some entries may have been removed. We’ll skip over those.
@@ -167,6 +180,8 @@ abstract class DODB::Storage(V)

 		last_index = index - 1

+		release_lock "index"
+
 		poped
 	end

@@ -182,8 +197,12 @@ abstract class DODB::Storage(V)
 		"#{@directory_name}/locks"
 	end

-	private def get_lock_file_path(name : String)
-		"#{locks_directory}/#{name}.lock"
+	private def get_lock_file_path(name : String, subname : String? = nil)
+		if subname
+			"#{locks_directory}/#{name}-#{subname}.lock" # FIXME: Separator that causes less collisions?
+		else
+			"#{locks_directory}/#{name}.lock"
+		end
 	end

 	private def read(file_path : String)
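Note (not part of the diff): with the subname parameter, two lock granularities share one directory. A stand-alone re-implementation of the path scheme above, for illustration only:

def lock_file_path(locks_directory : String, name : String, subname : String? = nil)
	if subname
		"#{locks_directory}/#{name}-#{subname}.lock"
	else
		"#{locks_directory}/#{name}.lock"
	end
end

puts lock_file_path("storage/locks", "index")        # storage/locks/index.lock
puts lock_file_path("storage/locks", "name", "test") # storage/locks/name-test.lock

As the FIXME notes, the "-" separator can collide: ("a-b", nil) and ("a", "b") map to the same lock file.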
@@ -221,6 +240,13 @@ abstract class DODB::Storage(V)
 		@indexers.each &.deindex(stringify_key(key), value)
 	end

+	def []?(key : Int32) : V?
+		self[key]
+	rescue MissingEntry
+		# FIXME: Only rescue JSON and “no such file” errors.
+		return nil
+	end
+
 	abstract def [](key : Int32)
 	abstract def delete(key : Int32)
 end
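Note (not part of the diff): moving []? up to the abstract Storage class means every storage implementation gets the nil-returning accessor. A usage sketch, assuming the Ship value type from the spec:

def describe_entry(db : DODB::Storage(Ship), key : Int32) : String
	if ship = db[key]?
		"#{key}: #{ship.name}"
	else
		"#{key}: missing" # db[key] would raise DODB::MissingEntry here instead
	end
end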
@@ -239,13 +265,6 @@ class DODB::DataBase(V) < DODB::Storage(V)
 		end
 	end

-	def []?(key : Int32) : V?
-		self[key]
-	rescue MissingEntry
-		# FIXME: Only rescue JSON and “no such file” errors.
-		return nil
-	end
-
 	def [](key : Int32) : V
 		raise MissingEntry.new(key) unless ::File.exists? file_path key

(third file in this commit — file name not captured in this rendering; the hunks below modify class DODB::Index)

@@ -6,7 +6,7 @@ require "./indexer.cr"

 class DODB::Index(V) < DODB::Indexer(V)
 	property name : String
-	property key_proc : Proc(V, String)
+	property key_proc : Proc(V, String | NoIndex) | Proc(V, String)
 	getter storage_root : String

 	@storage : DODB::Storage(V)
@@ -34,6 +34,8 @@ class DODB::Index(V) < DODB::Indexer(V)
 	def index(key, value)
 		index_key = key_proc.call value

+		return if index_key.is_a? NoIndex
+
 		symlink = file_path_index index_key

 		Dir.mkdir_p ::File.dirname symlink
@@ -49,6 +51,8 @@ class DODB::Index(V) < DODB::Indexer(V)
 	def deindex(key, value)
 		index_key = key_proc.call value

+		return if index_key.is_a? NoIndex
+
 		symlink = file_path_index index_key

 		::File.delete symlink
@@ -68,13 +72,16 @@ class DODB::Index(V) < DODB::Indexer(V)
 		nil
 	end

+	# FIXME: Unlock on exception.
 	def safe_get(index : String) : Nil
+		@storage.request_lock @name, index
 		internal_key = get_key(index).to_s
 		@storage.request_lock internal_key

 		yield get index

 		@storage.release_lock internal_key
+		@storage.release_lock @name, index
 	end

 	def safe_get?(index : String, &block : Proc(V | Nil, Nil)) : Nil
@@ -105,6 +112,9 @@ class DODB::Index(V) < DODB::Indexer(V)
 	# in case new_value hasn't changed its index
 	def update(new_value : V)
 		index = key_proc.call new_value
+
+		raise Exception.new "new value is not indexable" if index.is_a? NoIndex
+
 		update index, new_value
 	end

(new file in this commit — file name not captured; defines DODB::NoIndex)

@@ -0,0 +1,8 @@
+class DODB::NoIndex
+end
+
+module DODB
+	class_getter no_index = NoIndex.new
+end
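Note (not part of the diff): a usage sketch for the nilable index, assuming the spec's Ship type and database helper. An entry whose block returns DODB.no_index is still stored, but simply skipped by that index (see the "return if index_key.is_a? NoIndex" guards in the DODB::Index hunks above):

db = DODB::SpecDataBase.new

ships_by_name = db.new_nilable_index "name" do |ship|
	if ship.name.empty?
		DODB.no_index # do not index unnamed ships
	else
		ship.name
	end
end

db << Ship.new("", "???")           # stored, but absent from the "name" index
db << Ship.new("Enterprise", "???") # stored and indexed

ships_by_name.get "Enterprise"      # => the second ship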