dodb.cr/src/dodb/storage.cr

493 lines
13 KiB
Crystal
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# The `DODB::Storage` abstract class defines the specifications of
# subsequent DODB databases (uncached, cached, RAM-only, etc.).
abstract class DODB::Storage(V)
# List of triggers (basic indexes, partitions, tags, etc.).
property triggers = [] of Trigger(V)
property cached_last_key : Int32
# Directory where data and triggers will be written.
property directory_name : String
# Creates a database.
#
# A DODB database is instanciated with a *path* where data will be written.
# Another directory is created where locks can be written.
# In case the database is empty, the *last_key* is set to *-1*.
def initialize(@directory_name : String)
Dir.mkdir_p data_path
Dir.mkdir_p locks_directory
@cached_last_key = init_last_key
end
# Requests a (named) lock.
# Locks prevent concurrent access to the same data.
#
# In case of a request for a lock that is already in use,
# wait for a millisecond then retry, loop until it works.
# A lock is simply an opened file with the `LibC::O_EXCL` flag.
def request_lock(name, subname = nil)
r = -1
file_path = get_lock_file_path name, subname
file_perms = 0o644
flags = LibC::O_EXCL | LibC::O_CREAT
while (r = LibC.open file_path, flags, file_perms) == -1
sleep 1.milliseconds
end
LibC.close r
end
# Releases a (named) lock.
#
# The implementation is simple, it just removes the file.
def release_lock(name, subname = nil)
File.delete get_lock_file_path name, subname
end
private def key_file
"#{@directory_name}/last-key"
end
# Reads the last database *key* from the storage device.
def init_last_key : Int32
File.read(key_file).to_i
rescue
-1
end
# Reads the (cached) last key.
def last_key : Int32
@cached_last_key
end
# Changes the last *key* in the database.
def last_key=(x : Int32)
file = File.open(key_file, "w")
file << x.to_s
file.close
@cached_last_key = x
x
rescue
raise Exception.new "could not update last-key file"
end
# Take a database key and convert it in a formated string. Example: 343 -> "0000000343"
def stringify_key(key : Int32)
# Negative numbers give strange results with Crystals printf.
if key >= 0
"%010i" % key
else
key.to_s
end
end
# Adds a value to the database without a locking mechanism.
#
# For a thread-safe version, use `#<<`.
#
# WARNING: not thread-safe.
def unsafe_add(item : V)
key = last_key + 1
self[key] = item
self.last_key = key
key # FIXME: Should we really return the internal key?
rescue e
raise e
end
# Adds a value to the database, with a locking mechanism to prevent race conditions.
#
# This operation should be thread-safe since a lock is required before tinkering with the database.
# Because of the file-system operations, this function may be a bit slow.
# For single-thread applications, use the `#unsafe_add` operation instead.
def <<(item : V)
request_lock "key"
key = init_last_key + 1
self[key] = item
self.last_key = key
release_lock "key"
key # FIXME: Should we really return the internal key?
rescue e
release_lock "key"
raise e
end
# Lists all entries in the database.
#
# WARNING: Very slow. Try not to use.
def each(reversed : Bool = false, offset = 0, limit : Int32? = nil)
each_with_key(
reversed: reversed,
offset: offset,
limit: limit
) do |item, key|
yield item
end
end
# Converts all the database into an array.
#
# WARNING: Very slow. Try not to use.
def to_a(reversed : Bool = false, offset = 0, limit : Int32? = nil)
array = ::Array(V).new
each(
reversed: reversed,
offset: offset,
limit: limit
) do |value|
array << value
end
array
end
# Converts the entire database into a hash.
#
# WARNING: Very slow. Try not to use.
def to_h(reversed : Bool = false, offset = 0, limit : Int32? = nil)
hash = ::Hash(Int32, V).new
each_with_key(
reversed: reversed,
offset: offset,
limit: limit
) do |element, key|
hash[key] = element
end
hash
end
# Run triggers (indexes, partitions, tags, etc.) for a value.
def run_triggers(key : Int32, value : V)
@triggers.each &.index(stringify_key(key), value)
end
# Creates a new basic index **with a cache**.
# The *name* parameter is the name of the directory that will be created.
def new_index(name : String, &block : Proc(V, String | DODB::NoIndex))
Trigger::IndexCached(V).new(self, @directory_name, name, block).tap do |trigger|
@triggers << trigger
end
end
# Creates a new basic index **without a cache**.
# The *name* parameter is the name of the directory that will be created.
#
# NOTE: this will be a lot slower than the cached version.
def new_uncached_index(name : String, &block : Proc(V, String | DODB::NoIndex))
Trigger::Index(V).new(self, @directory_name, name, block).tap do |trigger|
@triggers << trigger
end
end
# Creates a new basic index **only in RAM**.
# The *name* parameter is the name of the directory that will be created.
#
# NOTE: this index is the fastest, but doesn't have a file-system representation.
def new_RAM_index(name : String, &block : Proc(V, String | DODB::NoIndex))
Trigger::IndexRAMOnly(V).new(self, @directory_name, name, block).tap do |trigger|
@triggers << trigger
end
end
# Gets an *index object* based on its name.
def get_index(name : String, key)
index = @triggers.find &.name.==(name)
index.not_nil!.as(Trigger).get key
end
# Creates a new partition **with a cache**.
# The *name* parameter is the name of the directory that will be created.
def new_partition(name : String, &block : Proc(V, String | DODB::NoIndex))
Trigger::PartitionCached(V).new(self, @directory_name, name, block).tap do |table|
@triggers << table
end
end
# Creates a new partition **without a cache**.
# The *name* parameter is the name of the directory that will be created.
#
# NOTE: this will be a lot slower than the cached version.
def new_uncached_partition(name : String, &block : Proc(V, String | DODB::NoIndex))
Trigger::Partition(V).new(self, @directory_name, name, block).tap do |table|
@triggers << table
end
end
# Creates a new partition **only in RAM**.
# The *name* parameter is the name of the directory that will be created.
#
# NOTE: this partition index is the fastest but doesn't have a file-system representation.
def new_RAM_partition(name : String, &block : Proc(V, String | DODB::NoIndex))
Trigger::PartitionRAMOnly(V).new(self, @directory_name, name, block).tap do |table|
@triggers << table
end
end
# Gets an *index (partition) object* based on its name.
def get_partition(table_name : String, partition_name : String)
partition = @triggers.find &.name.==(table_name)
partition.not_nil!.as(DODB::Partition).get partition_name
end
# Creates a new tag **with a cache**.
# The *name* parameter is the name of the directory that will be created.
def new_tags(name : String, &block : Proc(V, Array(String) | DODB::NoIndex))
Trigger::TagsCached(V).new(self, @directory_name, name, block).tap do |tags|
@triggers << tags
end
end
# Creates a new tag **without a cache**.
# The *name* parameter is the name of the directory that will be created.
#
# NOTE: this will be a lot slower than the cached version.
def new_uncached_tags(name : String, &block : Proc(V, Array(String) | DODB::NoIndex))
Trigger::Tags(V).new(self, @directory_name, name, block).tap do |tags|
@triggers << tags
end
end
# Creates a new partition **only in RAM**.
# The *name* parameter is the name of the directory that will be created.
#
# NOTE: this tag index is the fastest but doesn't have a file-system representation.
def new_RAM_tags(name : String, &block : Proc(V, Array(String) | DODB::NoIndex))
Trigger::TagsRAMOnly(V).new(self, @directory_name, name, block).tap do |tags|
@triggers << tags
end
end
# Gets an *index (tag) object* based on its name.
def get_tags(name, key : String)
tag = @triggers.find &.name.==(name)
tag.not_nil!.as(DODB::Tags).get name, key
end
# WARNING: directed graphs haven't been reviewed in YEARS, assume as dead code.
def new_directed_graph(name : String, index : DODB::Trigger(V), &block : Proc(V, Array(String))) : DirectedGraph(V)
Trigger::DirectedGraph(V).new(self, @directory_name, index, name, block).tap do |table|
@triggers << table
end
end
# Checks for collisions in the indexes.
def check_collisions!(key : Int32, value : V, old_value : V?)
@triggers.each &.check!(stringify_key(key), value, old_value)
end
# Retrieves a value and remove it from the database.
def pop
request_lock "key"
key = last_key
# Some entries may have been removed. Well skip over those.
# Not the most efficient if a large number of indices are empty.
while key >= 0 && self[key]?.nil?
key = key - 1
end
if key < 0
return nil
end
poped = self[key]
self.unsafe_delete key
last_key = key - 1
release_lock "key"
poped
end
private def data_path
"#{@directory_name}/data"
end
private def file_path(key : Int32)
"#{data_path}/%010i" % key
end
private def locks_directory : String
"#{@directory_name}/locks"
end
private def get_lock_file_path(name : String, subname : String? = nil)
if subname
"#{locks_directory}/#{name}-#{subname}.lock" # FIXME: Separator that causes less collisions?
else
"#{locks_directory}/#{name}.lock"
end
end
private def read(file_path : String)
V.from_json ::File.read file_path
end
private def remove_data!
FileUtils.rm_rf data_path
Dir.mkdir_p data_path
end
private def nuke_triggers!
@triggers.each do |trigger|
trigger.nuke_trigger
end
end
private def clear_storage_cache!
puts "DODB::Storage(V) clear_storage_cache! (no cache)"
# There's no cache by default.
# This function has to be changed in storage implementations.
end
# NOTE: clears all caches (`storage` and `triggers`).
def clear_cache!
clear_storage_cache!
@triggers.each do |trigger|
trigger.clear_cache!
end
end
# Removes all indices and then rewrites them all.
#
# WARNING: slow operation.
# NOTE: clears all caches (`storage` and `triggers`).
def reindex_everything!
clear_cache!
nuke_triggers!
each_with_key() do |item, key|
run_triggers key, item
end
end
# Removes all indexes of a value.
def remove_triggers(key : Int32, value : V)
@triggers.each &.deindex(stringify_key(key), value)
end
# Gets the data with the *key*.
# In case the data is missing, returns *nil*.
def []?(key : Int32) : V?
self[key]
rescue MissingEntry
# FIXME: Only rescue JSON and “no such file” errors.
return nil
end
# Gets the data with the *key*.
# In case the data is missing, returns an exception `DODB::MissingEntry`.
def [](key : Int32) : V
raise MissingEntry.new(key) unless ::File.exists? file_path key
read file_path key
end
# Replaces the data with the *key*.
# In case the data is missing, returns an exception `DODB::MissingEntry`.
def []=(key : Int32, value : V)
old_value = self.[key]?
check_collisions! key, value, old_value
# Removes any old indices or partitions pointing to a value about
# to be replaced.
if old_value
remove_triggers key, old_value
end
# Avoids corruption in case the application crashes while writing.
file_path(key).tap do |path|
::File.write "#{path}.new", value.to_json
::FileUtils.mv "#{path}.new", path
end
run_triggers key, value
if key > @cached_last_key
self.last_key = key
end
end
# Deletes the data with the *key* but doesn't request for a lock.
# This function is required since `#pop` is already locked before trying to remove an entry,
# thus calling `#unsafe_delete`.
def unsafe_delete(key : Int32)
value = self[key]?
return if value.nil?
begin
::File.delete file_path key
rescue File::NotFoundError
end
remove_triggers key, value
value
end
# Deletes the data with the *key*.
def delete(key : Int32)
request_lock "key"
value = unsafe_delete key
release_lock "key"
value
end
# Lists all the keys in the database.
def each_key_from_fs(reversed = false)
# Removes the first two "." and ".." directories.
keys = Dir.children(data_path).map(&.to_i).sort
(reversed ? keys.reverse : keys).each do |key|
yield key
end
end
# Lists all database entries with their key.
# Can be useful for making dumps or to restore a database.
#
# WARNING: Very slow. Try not to use.
def each_with_key(reversed : Bool = false, offset = 0, limit : Int32? = -1)
limit = if l = limit
l
else
-1
end
each_key_from_fs(reversed) do |key|
offset -= 1 if offset >= 0
next if offset >= 0
return if limit == 0
limit -= 1 if limit > 0
begin
# FIXME: Only intercept JSON parsing errors.
value = self[key]
rescue
next
end
yield value, key
end
end
end
require "./storage/*"