diff --git a/spec/test.cr b/spec/test.cr index 3c4d4ce..456da00 100644 --- a/spec/test.cr +++ b/spec/test.cr @@ -397,5 +397,94 @@ describe "DODB::DataBase" do end end end + + describe "parallel support" do + # Not sure how many forks would be safe in a test like that. + fork_count = 25 + entries_per_fork = 100 + + it "works for pushing values" do + db = DODB::SpecDataBase.new + + processes = [] of Process + + fork_count.times do |fork_id| + processes << Process.fork do + entries_per_fork.times do |entry_id| + db << Ship.new("entry-#{fork_id}-#{entry_id}", "???") + end + end + end + + processes.each &.wait + + dump = db.to_a + + dump.size.should eq fork_count * entries_per_fork + end + + it "works for updating values" do + db = DODB::SpecDataBase.new + db_entries_by_name = db.new_index "name", &.name + + # First pass, creating data. + processes = [] of Process + fork_count.times do |fork_id| + processes << Process.fork do + entries_per_fork.times do |entry_id| + db << Ship.new("entry-#{fork_id}-#{entry_id}", "???") + end + end + end + processes.each &.wait + + # Second pass, updating data. + processes = [] of Process + fork_count.times do |fork_id| + processes << Process.fork do + entries_per_fork.times do |entry_id| + db_entries_by_name.update Ship.new("entry-#{fork_id}-#{entry_id}", "???", tags: ["updated"]) + end + end + end + processes.each &.wait + + # Third pass, testing database content. + dump = db.to_a + + fork_count.times do |fork_id| + entries_per_fork.times do |entry_id| + entry = db_entries_by_name.get "entry-#{fork_id}-#{entry_id}" + + entry.tags.should eq ["updated"] + end + end + end + + it "does parallel-safe updates" do + db = DODB::SpecDataBase.new + db_entries_by_name = db.new_index "name", &.name + + # We’ll be storing an integer in the "klass" field, and incrementing + # it in forks in a second time. + db << Ship.new("test", "0") + + processes = [] of Process + fork_count.times do |fork_id| + processes << Process.fork do + entries_per_fork.times do |entry_id| + db_entries_by_name.safe_get "test" do |entry| + entry.klass = (entry.klass.to_i + 1).to_s + + db_entries_by_name.update "test", entry + end + end + end + end + processes.each &.wait + + db_entries_by_name.get("test").klass.should eq((fork_count * entries_per_fork).to_s) + end + end end diff --git a/src/dodb.cr b/src/dodb.cr index 9e69dee..74e321d 100644 --- a/src/dodb.cr +++ b/src/dodb.cr @@ -9,9 +9,9 @@ abstract class DODB::Storage(V) def initialize(@directory_name : String) end - def request_lock(name) + def request_lock(name, subname = nil) r = -1 - file_path = get_lock_file_path name + file_path = get_lock_file_path name, subname file_perms = 0o644 flags = LibC::O_EXCL | LibC::O_CREAT @@ -21,8 +21,8 @@ abstract class DODB::Storage(V) LibC.close r end - def release_lock(name) - File.delete get_lock_file_path name + def release_lock(name, subname = nil) + File.delete get_lock_file_path name, subname end private def index_file @@ -53,9 +53,14 @@ abstract class DODB::Storage(V) end def <<(item : V) + request_lock "index" index = last_index + 1 self[index] = item self.last_index = index + + release_lock "index" + + index # FIXME: Should we really return the internal key? end def each(reversed : Bool = false, start_offset = 0, end_offset : Int32? = nil) @@ -108,17 +113,9 @@ abstract class DODB::Storage(V) end end - ## - # name is the name that will be used on the file system. - def new_partition(name : String, &block : Proc(V, String)) - Partition(V).new(self, @directory_name, name, block).tap do |table| - @indexers << table - end - end - - def new_tags(name : String, &block : Proc(V, Array(String))) - Tags(V).new(@directory_name, name, block).tap do |tags| - @indexers << tags + def new_nilable_index(name : String, &block : Proc(V, String | DODB::NoIndex)) + Index(V).new(self, @directory_name, name, block).tap do |indexer| + @indexers << indexer end end @@ -128,12 +125,30 @@ abstract class DODB::Storage(V) index.not_nil!.as(DODB::Index).get key end + ## + # name is the name that will be used on the file system. + def new_partition(name : String, &block : Proc(V, String)) + Partition(V).new(self, @directory_name, name, block).tap do |table| + @indexers << table + end + end + def get_partition(table_name : String, partition_name : String) partition = @indexers.find &.name.==(table_name) partition.not_nil!.as(DODB::Partition).get partition_name end + def write_partitions(key : Int32, value : V) + @indexers.each &.index(stringify_key(key), value) + end + + def new_tags(name : String, &block : Proc(V, Array(String))) + Tags(V).new(@directory_name, name, block).tap do |tags| + @indexers << tags + end + end + def get_tags(name, key : String) partition = @indexers.find &.name.==(name) @@ -144,11 +159,9 @@ abstract class DODB::Storage(V) @indexers.each &.check!(stringify_key(key), value, old_value) end - def write_partitions(key : Int32, value : V) - @indexers.each &.index(stringify_key(key), value) - end - def pop + request_lock "index" + index = last_index # Some entries may have been removed. We’ll skip over those. @@ -167,6 +180,8 @@ abstract class DODB::Storage(V) last_index = index - 1 + release_lock "index" + poped end @@ -182,8 +197,12 @@ abstract class DODB::Storage(V) "#{@directory_name}/locks" end - private def get_lock_file_path(name : String) - "#{locks_directory}/#{name}.lock" + private def get_lock_file_path(name : String, subname : String? = nil) + if subname + "#{locks_directory}/#{name}-#{subname}.lock" # FIXME: Separator that causes less collisions? + else + "#{locks_directory}/#{name}.lock" + end end private def read(file_path : String) @@ -221,6 +240,13 @@ abstract class DODB::Storage(V) @indexers.each &.deindex(stringify_key(key), value) end + def []?(key : Int32) : V? + self[key] + rescue MissingEntry + # FIXME: Only rescue JSON and “no such file” errors. + return nil + end + abstract def [](key : Int32) abstract def delete(key : Int32) end @@ -239,13 +265,6 @@ class DODB::DataBase(V) < DODB::Storage(V) end end - def []?(key : Int32) : V? - self[key] - rescue MissingEntry - # FIXME: Only rescue JSON and “no such file” errors. - return nil - end - def [](key : Int32) : V raise MissingEntry.new(key) unless ::File.exists? file_path key diff --git a/src/dodb/index.cr b/src/dodb/index.cr index 4e83185..332e157 100644 --- a/src/dodb/index.cr +++ b/src/dodb/index.cr @@ -6,7 +6,7 @@ require "./indexer.cr" class DODB::Index(V) < DODB::Indexer(V) property name : String - property key_proc : Proc(V, String) + property key_proc : Proc(V, String | NoIndex) | Proc(V, String) getter storage_root : String @storage : DODB::Storage(V) @@ -34,6 +34,8 @@ class DODB::Index(V) < DODB::Indexer(V) def index(key, value) index_key = key_proc.call value + return if index_key.is_a? NoIndex + symlink = file_path_index index_key Dir.mkdir_p ::File.dirname symlink @@ -49,6 +51,8 @@ class DODB::Index(V) < DODB::Indexer(V) def deindex(key, value) index_key = key_proc.call value + return if index_key.is_a? NoIndex + symlink = file_path_index index_key ::File.delete symlink @@ -68,13 +72,16 @@ class DODB::Index(V) < DODB::Indexer(V) nil end + # FIXME: Unlock on exception. def safe_get(index : String) : Nil + @storage.request_lock @name, index internal_key = get_key(index).to_s @storage.request_lock internal_key yield get index @storage.release_lock internal_key + @storage.release_lock @name, index end def safe_get?(index : String, &block : Proc(V | Nil, Nil)) : Nil @@ -105,6 +112,9 @@ class DODB::Index(V) < DODB::Indexer(V) # in case new_value hasn't changed its index def update(new_value : V) index = key_proc.call new_value + + raise Exception.new "new value is not indexable" if index.is_a? NoIndex + update index, new_value end diff --git a/src/dodb/no_index.cr b/src/dodb/no_index.cr new file mode 100644 index 0000000..37e7a1c --- /dev/null +++ b/src/dodb/no_index.cr @@ -0,0 +1,8 @@ + +class DODB::NoIndex +end + +module DODB + class_getter no_index = NoIndex.new +end +