WIP: new code structure. Need a few fixes.

This commit is contained in:
Karchnu 2020-05-14 17:03:09 +02:00
parent 20e2e55d99
commit 0e4396c0c6
17 changed files with 956 additions and 599 deletions

View File

@ -126,7 +126,7 @@ def file_transfer(client : IPC::Client, file : File, file_info : FileStorage::Fi
# Check for the response
m = client.read
mtype = FileStorage::MessageType.new m.type.to_i32
mtype = FileStorage::MessageType.new m.utype.to_i32
if mtype != FileStorage::MessageType::Response
pp! m
raise "Message received was not expected: #{mtype}"

View File

@ -1,207 +0,0 @@
require "uuid"
require "openssl"
require "json"
require "base64"
module FileStorage
extend self
# 1 MB read buffer, on-disk
def file_reading_buffer_size
1_000_000
end
# 1 KB message data buffer, on-network
def message_buffer_size
1_000
end
class Exception < ::Exception
end
enum MessageType
Error
Authentication
UploadRequest
DownloadRequest
Response
Responses
Transfer
end
class Chunk
include JSON::Serializable
property n : Int32 # chunk's number
property on : Int32 # number of chunks
property digest : String # digest of the current chunk
def initialize(@n, @on, data)
@digest = FileStorage.data_digest data.to_slice
end
end
# For now, upload and download are sequentials.
# In a future version, we will be able to send
# arbitrary parts of each file.
class Token
include JSON::Serializable
property uid : Int32
property login : String
def initialize(@uid, @login)
end
end
# Who knows, maybe someday we will be on UDP, too.
#class SHA256
# JSON.mapping({
# chunk: Slice(UInt8)
# })
#end
# A file has a name, a size and tags.
class FileInfo
include JSON::Serializable
property name : String
property size : UInt64
property nb_chunks : Int32
property digest : String
# list of SHA256, if we are on UDP
# chunks: Array(SHA256),
property tags : Array(String)
def initialize(file : File, tags = nil)
@name = File.basename file.path
@size = file.size
@digest = FileStorage.file_digest file
@nb_chunks = (@size / FileStorage.message_buffer_size).ceil.to_i
@tags = tags || [] of String
end
end
class Message
end
alias Request = UploadRequest | DownloadRequest
class UploadRequest < Message
include JSON::Serializable
property mid : String # autogenerated
property file : FileInfo
def initialize(@file : FileInfo)
@mid = UUID.random.to_s
end
end
# WIP
class DownloadRequest < Message
include JSON::Serializable
property mid : String # autogenerated
property filedigest : String? # SHA256 digest of the file, used as ID
property name : String?
property tags : Array(String)?
def initialize(@filedigest = nil, @name = nil, @tags = nil)
@mid = UUID.random.to_s
end
end
class Authentication < Message
include JSON::Serializable
property mid : String # autogenerated
property token : Token
property uploads : Array(UploadRequest)
property downloads : Array(DownloadRequest)
def initialize(@token, @uploads = Array(UploadRequest).new, @downloads = Array(DownloadRequest).new)
@mid = UUID.random.to_s
end
end
class Response < Message
include JSON::Serializable
property mid : String
property response : String
property reason : String?
def initialize(@mid, @response, @reason = nil)
end
end
class Error < Message
include JSON::Serializable
property mid : String
property response : String # a response for each request
property reason : String?
def initialize(@mid, @response, @reason = nil)
end
end
class Responses < Message
include JSON::Serializable
property mid : String
property responses : Array(Response) # a response for each request
property response : String
property reason : String?
def initialize(@mid, @response, @responses, @reason = nil)
end
end
class Transfer < Message
include JSON::Serializable
property mid : String # autogenerated
property filedigest : String # SHA256 digest of the entire file
property chunk : Chunk # For now, just the counter in a string
property data : String # base64 slice
def initialize(file_info : FileInfo, count, bindata)
# count: chunk number
@filedigest = file_info.digest
@data = Base64.encode bindata
@chunk = FileStorage::Chunk.new count, file_info.nb_chunks - 1, @data
@mid = UUID.random.to_s
end
end
# private function
def data_digest(data : Bytes)
iodata = IO::Memory.new data, false
buffer = Bytes.new FileStorage.file_reading_buffer_size
io = OpenSSL::DigestIO.new(iodata, "SHA256")
while io.read(buffer) > 0; end
io.digest.hexstring
end
# private function
def file_digest(file : File)
# 1M read buffer
buffer = Bytes.new(1_000_000)
io = OpenSSL::DigestIO.new(file, "SHA256")
while io.read(buffer) > 0 ; end
io.digest.hexstring
end
end

View File

@ -0,0 +1,80 @@
# We verify the user's rights to upload files.
# TODO RIGHTS
# if user wants to upload but not allowed to: Response
# if user wants to get a file but not allowed to: Response
# The user is authorized to upload files.
# TODO: quotas
# Quotas are not defined yet.
#class FileStorage::Request
# JSONIPC.request Authentication, 10 do
# property mid : String # autogenerated
# property token : Token
# property uploads : Array(UploadRequest)?
# property downloads : Array(DownloadRequest)?
#
# def initialize(@token, @uploads = nil, @downloads = nil)
# @mid = UUID.random.to_s
# end
#
# # Store the client in connected_users and users_status.
# # If already in users_status:
# # check if the requests are the same
# # if not: add them to the user structure in users_status
# def handle(filestoraged : FileStorage::Service, event : IPC::Event::Events)
#
# token = filestoraged.auth.decode_token @token
# userid = token.uid
#
# puts "user authentication: #{userid}"
#
# # Is the user already recorded in users_status?
# if filestoraged.users_status[userid]?
# puts "We already knew this user"
#
# filestoraged.connected_users[event.connection.fd] = userid
# # TODO
# pp! filestoraged.connected_users
# pp! filestoraged.users_status[userid]
#
# # Let's ignore this error, just continue.
# # It could have been a network error, for example.
# # return FileStorage::Response::Error.new @mid, "user already connected"
# else
# # AuthenticationMessage includes requests.
# new_user = User.new token, @uploads, @downloads
#
# filestoraged.connected_users[event.connection.fd] = userid
#
# # record the new user in users_status
# filestoraged.users_status[userid] = new_user
#
# puts "New user is: #{new_user.token.login}"
# end
#
# # The user is now connected.
# user = filestoraged.users_status[userid]
#
# responses = filestoraged.storage.requests [ @uploads, @downloads ].flatten,
# filestoraged.users_status[userid],
# event
#
# # Sending a response, containing a response for each request.
# FileStorage::Response::Success.new @mid, responses
# end
# end
# FileStorage.requests << Authentication
#end
#
#class FileStorage::Response
# JSONIPC.request AuthenticationSuccess, 10 do
# property mid : String
# property responses : Array(FileStorage::Response)
# def initialize(@mid, @responses)
# end
# end
#end

View File

@ -0,0 +1,55 @@
class FileStorage::Request
JSONIPC.request Download, 30 do
property mid : String # autogenerated
property filedigest : String? # SHA256 digest of the file, used as ID
property name : String?
property tags : Array(String)?
def initialize(@filedigest = nil, @name = nil, @tags = nil)
@mid = UUID.random.to_s
end
def handle(filestoraged : FileStorage::Service, event : IPC::Event::Events)
user = altideald.get_logged_user event
raise Exception.new "unauthorized" if user.nil?
# FIXME: Maybe this should be moved to FileStorage::Service
fd = event.connection.fd
user_data = filestoraged.get_user_data user.uid
filestoraged.storage.download self, event
rescue e
return Response::Error.new @mid, "unauthorized"
end
end
FileStorage.requests << Download
end
class FileStorage::Client
def download(filedigest = nil, name = nil, tags = nil)
request = FileStorage::Request::Download.new filedigest, name, tags
send request
response = parse_message [ FileStorage::Response::Download, FileStorage::Response::Error ], read
case response
when FileStorage::Response::Download
when FileStorage::Response::Error
raise "Download request denied: #{response.reason}"
end
response
end
end
class FileStorage::Response
JSONIPC.request Download, 30 do
property mid : String
def initialize(@mid)
end
end
end

View File

@ -0,0 +1,59 @@
require "uuid"
require "openssl"
require "json"
require "base64"
class FileStorage::Request
JSONIPC.request Login, 0 do
property mid : String
property token : String
def initialize(@token)
@mid = UUID.random.to_s
end
def handle(filestoraged : FileStorage::Service, event : IPC::Event::Events)
logged_users = filestoraged.logged_users
user, _ = filestoraged.decode_token token
# FIXME: Maybe this should be moved to FileStorage::Service
fd = event.connection.fd
filestoraged.logged_users[fd] = user
filestoraged.logged_connections[fd] = event.connection
user_data = filestoraged.get_user_data user.uid
return Response::Login.new @mid
rescue e
return Response::Error.new "unauthorized"
end
end
FileStorage.requests << Login
end
class FileStorage::Client
def login(token : String)
request = FileStorage::Request::Login.new token
send request
response = parse_message [ FileStorage::Response::Login, FileStorage::Response::Error ], read
case response
when FileStorage::Response::Login
# Received favorites, likes, etc.
when FileStorage::Response::Error
raise "user was not logged in: #{response.reason}"
end
response
end
end
class FileStorage::Response
JSONIPC.request Login, 5 do
property mid : String
def initialize(@mid)
end
end
end

View File

@ -0,0 +1,58 @@
class FileStorage::Request
JSONIPC.request Transfer, 40 do
property mid : String # autogenerated
property filedigest : String # SHA256 digest of the entire file
property chunk : Chunk # For now, just the counter in a string
property data : String # base64 slice
def initialize(file_info : FileInfo, count, bindata)
# count: chunk number
@filedigest = file_info.digest
@data = Base64.encode bindata
@chunk = FileStorage::Chunk.new count, file_info.nb_chunks - 1, @data
@mid = UUID.random.to_s
end
def handle(filestoraged : FileStorage::Service, event : IPC::Event::Events)
user = altideald.get_logged_user event
raise Exception.new "unauthorized" if user.nil?
# FIXME: Maybe this should be moved to FileStorage::Service
fd = event.connection.fd
user_data = filestoraged.get_user_data user.uid
filestoraged.storage.transfer self, event
rescue e
return Response::Error.new @mid, "unauthorized"
end
end
FileStorage.requests << Transfer
end
class FileStorage::Client
def transfer(file_info : FileInfo, count, bindata)
request = FileStorage::Request::Transfer.new file_info, count, bindata
send request
response = parse_message [ FileStorage::Response::Transfer, FileStorage::Response::Error ], read
case response
when FileStorage::Response::Transfer
when FileStorage::Response::Error
raise "File chunk was not transfered: #{response.reason}"
end
response
end
end
class FileStorage::Response
JSONIPC.request Transfer, 40 do
property mid : String
def initialize(@mid)
end
end
end

View File

@ -0,0 +1,63 @@
class FileStorage::Request
JSONIPC.request Upload, 20 do
property mid : String # autogenerated
property file : FileInfo
def initialize(@file : FileInfo)
@mid = UUID.random.to_s
end
def handle(filestoraged : FileStorage::Service, event : IPC::Event::Events)
user = altideald.get_logged_user event
raise Exception.new "unauthorized" if user.nil?
# FIXME: Maybe this should be moved to FileStorage::Service
fd = event.connection.fd
user_data = filestoraged.get_user_data user.uid
filestoraged.storage.upload self, event
rescue e
return Response::Error.new @mid, "unauthorized"
end
end
FileStorage.requests << Upload
end
class FileStorage::Client
def upload(token : String)
request = FileStorage::Request::Upload.new token
send request
response = parse_message [ FileStorage::Response::Upload, FileStorage::Response::Error ], read
case response
when FileStorage::Response::Upload
when FileStorage::Response::Error
raise "Upload request failed: #{response.reason}"
end
response
end
end
class FileStorage::Response
JSONIPC.request Upload, 20 do
property mid : String
def initialize(@mid)
end
end
JSONIPC.request Responses, 100 do
include JSON::Serializable
property mid : String
property responses : Array(Response) # a response for each request
property response : String
property reason : String?
def initialize(@mid, @response, @responses, @reason = nil)
end
end
end

View File

@ -1,17 +0,0 @@
OptionParser.parse do |parser|
parser.on "-d storage-directory",
"--storage-directory storage-directory",
"The directory where to put uploaded files." do |opt|
Context.storage_directory = opt
end
parser.on "-s service-name", "--service-name service-name", "Service name." do |name|
Context.service_name = name
end
parser.on "-h", "--help", "Show this help" do
puts parser
exit 0
end
end

View File

@ -1,62 +0,0 @@
require "dodb"
require "json"
# keep track of connected users and their requests
# TODO: requests should be handled concurrently
class User
property uid : Int32
property token : FileStorage::Token
property uploads : Array(FileStorage::UploadRequest)
property downloads : Array(FileStorage::DownloadRequest)
def initialize(@token,
@uploads = Array(FileStorage::UploadRequest).new,
@downloads = Array(FileStorage::DownloadRequest).new)
@uid = token.uid
end
end
class TransferInfo
include JSON::Serializable
property owner : Int32
property file_info : FileStorage::FileInfo
property chunks : Array(Int32)
def initialize(@owner, @file_info)
@chunks = (0...@file_info.nb_chunks).to_a
end
end
class Context
class_property service_name = "filestorage"
class_property storage_directory = "./storage"
class_property file_info_directory = "./file-infos"
class_property db = DODB::DataBase(TransferInfo).new @@file_info_directory
# search file informations by their index, owner and tags
class_property db_by_filedigest : DODB::Index(TransferInfo) = @@db.new_index "filedigest", &.file_info.digest
class_property db_by_owner : DODB::Partition(TransferInfo) = @@db.new_partition "owner", &.owner.to_s
class_property db_by_tags : DODB::Tags(TransferInfo) = @@db.new_tags "tags", &.file_info.tags
def self.db_reconnect
# In case file_info_directory changes: database reinstanciation
@@db = DODB::DataBase(TransferInfo).new @@file_info_directory
# recreate indexes, partitions and tags objects, too
@@db_by_filedigest = @@db.new_index "filedigest", &.file_info.digest
@@db_by_owner = @@db.new_partition "owner", &.owner.to_s
@@db_by_tags = @@db.new_tags "tags", &.file_info.tags
end
# list of connected users (fd => uid)
class_property connected_users = Hash(Int32, Int32).new
# users_status: keep track of the users' status even if they are
# disconnected, allowing the application to handle connection problems
class_property users_status = Hash(Int32, User).new
class_property service : IPC::Service? = nil
end

View File

@ -1,180 +0,0 @@
require "dodb"
require "base64"
require "../common/utils"
# XXX TODO FIXME: architectural questions
# wonder why I should keep the user upload and download requests
# the server can be just for uploads, delegating downloads to HTTP
# reception of a file chunk
def hdl_transfer(message : FileStorage::Transfer, user : User) : FileStorage::Response
# We received a message containing a chunk of file.
mid = message.mid
mid ||= "no message id"
# Get the transfer info from the db
transfer_info = Context.db_by_filedigest.get message.filedigest
if transfer_info.nil?
# The user has to send an upload request before sending anything
# If not the case, it should be discarded
raise "file not recorded"
end
chunk_number = message.chunk.n
data = Base64.decode message.data
# TODO: verify that the chunk sent was really missing
if transfer_info.chunks.select(chunk_number).size > 0
write_a_chunk user.uid.to_s, transfer_info.file_info, chunk_number, data
else
raise "non existent chunk or already uploaded"
end
remove_chunk_from_db transfer_info, chunk_number
# TODO: verify the digest, if no more chunks
FileStorage::Response.new mid, "Ok"
rescue e
puts "Error handling transfer: #{e.message}"
FileStorage::Response.new mid.not_nil!, "Not Ok", "Unexpected error: #{e.message}"
end
# the client sent an upload request
def hdl_upload(request : FileStorage::UploadRequest, user : User) : FileStorage::Response
mid = request.mid
mid ||= "no message id"
puts "hdl upload: mid=#{request.mid}"
pp! request
# TODO: verify the rights and quotas of the user
# file_info attributes: name, size, nb_chunks, digest, tags
# First: check if the file already exists
transfer_info = Context.db_by_filedigest.get? request.file.digest
if transfer_info.nil?
# In case file informations aren't already registered
# which is normal at this point
transfer_info = TransferInfo.new user.uid, request.file
Context.db << transfer_info
else
# File information already exists, request may be duplicated
# In this case: ignore the upload request
end
FileStorage::Response.new request.mid, "Upload OK"
rescue e
puts "Error handling transfer: #{e.message}"
FileStorage::Response.new mid.not_nil!, "Not Ok", "Unexpected error: #{e.message}"
end
# TODO
# the client sent a download request
def hdl_download(request : FileStorage::DownloadRequest,
user : User) : FileStorage::Response
puts "hdl download: mid=#{request.mid}"
pp! request
FileStorage::Response.new request.mid, "Download OK"
end
# Entry point for request management
# Each request should have a response.
# Then, responses are sent in a single message.
def hdl_requests(requests : Array(FileStorage::Request),
user : User,
event : IPC::Event::Message) : Array(FileStorage::Response)
puts "hdl request"
responses = Array(FileStorage::Response).new
requests.each do |request|
case request
when FileStorage::DownloadRequest
responses << hdl_download request, user
when FileStorage::UploadRequest
responses << hdl_upload request, user
else
raise "request not understood"
end
puts
end
responses
end
# store the client in connected_users and users_status
# if already in users_status:
# check if the requests are the same
# if not: add them to the user structure in users_status
def hdl_authentication(event : IPC::Event::Message)
authentication_message =
FileStorage::Authentication.from_json(
String.new event.message.payload
)
userid = authentication_message.token.uid
puts "user authentication: #{userid}"
# Is the user already recorded in users_status?
if Context.users_status[userid]?
puts "We already knew this user"
Context.connected_users[event.connection.fd] = userid
# TODO
pp! Context.connected_users
pp! Context.users_status[userid]
else
# AuthenticationMessage includes requests.
new_user =
User.new authentication_message.token,
authentication_message.uploads,
authentication_message.downloads
Context.connected_users[event.connection.fd] = userid
# record the new user in users_status
Context.users_status[userid] = new_user
puts "New user is: #{new_user.token.login}"
end
# The user is now connected.
user = Context.users_status[userid]
# We verify the user's rights to upload files.
# TODO RIGHTS
# if user wants to upload but not allowed to: Response
# if user wants to get a file but not allowed to: Response
# The user is authorized to upload files.
# TODO: quotas
# Quotas are not defined yet.
responses = hdl_requests [ authentication_message.uploads, authentication_message.downloads ].flatten,
Context.users_status[userid],
event
# Sending a response, containing a response for each request.
# The response is "Ok" when the message is well received and authorized.
response = FileStorage::Responses.new authentication_message.mid, "Ok", responses
event.connection.send FileStorage::MessageType::Responses.to_u8, response.to_json
pp! FileStorage::MessageType::Responses.to_u8
pp! response
end

View File

@ -1,119 +0,0 @@
Context.service = IPC::Service.new Context.service_name
Context.service.not_nil!.loop do |event|
case event
when IPC::Event::Timer
puts "#{CORANGE}IPC::Event::Timer#{CRESET}"
when IPC::Event::Connection
puts "#{CBLUE}IPC::Event::Connection: #{event.connection.fd}#{CRESET}"
when IPC::Event::Disconnection
puts "#{CBLUE}IPC::Event::Disconnection: #{event.connection.fd}#{CRESET}"
Context.connected_users.select! do |fd, uid|
fd != event.connection.fd
end
when IPC::Event::ExtraSocket
puts "#{CRED}IPC::Event::ExtraSocket: should not happen in this service#{CRESET}"
when IPC::Event::Switch
puts "#{CRED}IPC::Event::Switch: should not happen in this service#{CRESET}"
# IPC::Event::Message has to be the last entry
# because ExtraSocket and Switch inherit from Message class
when IPC::Event::Message
puts "#{CBLUE}IPC::Event::Message#{CRESET}: #{event.connection.fd}"
# The first message sent to the server has to be the AuthenticationMessage.
# Users sent their token (JWT) to authenticate themselves.
# The token contains the user id, its login and a few other parameters.
# (see the authd documentation).
# TODO: for now, the token is replaced by a hardcoded one, for debugging
mtype = FileStorage::MessageType.new event.message.type.to_i32
# First, the user has to be authenticated unless we are receiving its first message
userid = Context.connected_users[event.connection.fd]?
# if the user is not yet connected but does not try to perform authentication
if ! userid && mtype != FileStorage::MessageType::Authentication
# TODO: replace this with an Error message?
mid = "no message id"
response = FileStorage::Response.new mid, "Not OK", "Action on non connected user"
do_response event, response
end
case mtype
when .authentication?
puts "Receiving an authentication message"
# 1. test if the client is already authenticated
if userid
user = Context.users_status[userid]
raise "Authentication message while the user was already connected: this should not happen"
else
puts "User is not currently connected"
hdl_authentication event
end
when .upload_request?
puts "Upload request"
request = FileStorage::UploadRequest.from_json(
String.new event.message.payload
)
response = hdl_upload request, Context.users_status[userid]
do_response event, response
when .download_request?
puts "Download request"
request = FileStorage::DownloadRequest.from_json(
String.new event.message.payload
)
response = hdl_download request, Context.users_status[userid]
do_response event, response
when .response?
puts "Response message"
raise "not implemented yet"
when .responses?
puts "Responses message"
raise "not implemented yet"
when .error?
puts "Error message"
raise "not implemented yet"
when .transfer?
# throw an error if the user isn't recorded
unless user = Context.users_status[userid]?
raise "The user isn't recorded in the users_status structure"
end
transfer = FileStorage::Transfer.from_json(
String.new event.message.payload
)
response = hdl_transfer transfer, Context.users_status[userid]
do_response event, response
end
else
raise "Event type not supported."
end
rescue e
puts "A problem occured : #{e.message}"
end
def do_response(event : IPC::Event::Message,
response : FileStorage::Message)
case response
when FileStorage::Response
event.connection.send FileStorage::MessageType::Response.to_u8, response.to_json
when FileStorage::Responses
event.connection.send FileStorage::MessageType::Responses.to_u8, response.to_json
when FileStorage::Error
event.connection.send FileStorage::MessageType::Error.to_u8, response.to_json
else
puts "response should not happen: #{response}"
pp! response
end
end

View File

@ -1,12 +1,10 @@
require "option_parser"
require "ipc"
require "json"
require "authd"
require "../common/colors"
require "../common/filestorage.cr"
require "./context.cr"
require "./handlers.cr"
# require "../common/filestorage.cr"
# TODO: if the user is disconnected, we should ask him if it still want to process
# for old requests.
@ -17,19 +15,305 @@ require "./handlers.cr"
# upload or download its files.
# TODO:
# * elegantly handling errors
# * store the file, /files/userid/UID.bin for example: /files/1002/UID.bin
# * metadata should be in a dodb
# * Authd integration.
# * Elegantly handling errors.
# * Store the file, /files/userid/UID.bin for example: /files/1002/UID.bin.
# * Metadata should be in a dodb.
# /storage/partitions/by_uid/UID.json -> content:
# data: /files/uid/UID.bin (storing raw files)
# uid: 1002
# name: "The File About Things"
# size: 1500
# tags: thing1 thing2
# * authd integration
# * knowing which parts of the files are still to be sent
# * rights
# * quotas
# * Knowing which parts of the files are still to be sent.
# * Rights.
# * Quotas.
require "./cli.cr"
require "./main-loop.cr"
require "./storage.cr"
require "./network.cr"
#### require "./sample.cr"
require "dodb"
require "json"
class FileStorage::Service < IPC::Service
# List of connected users (fd => uid).
property connected_users = Hash(Int32, Int32).new
# users_status: keep track of the users' status even if they are disconnected,
# allowing the application to handle connection problems.
property users_status = Hash(Int32, User).new
# Actual storage.
getter storage : FileStorage::Storage
getter logged_users : Hash(Int32, AuthD::User::Public)
getter logged_connections : Hash(Int32, IPC::Connection)
getter all_connections : Array(Int32)
@auth : AuthD::Client
@auth_key : String
def initialize(storage_directory, file_info_directory, @auth_key)
# Data and metadata storage directories.
# storage_directory : String
# file_info_directory : String
@storage = FileStorage::Storage.new storage_directory, file_info_directory
@logged_users = Hash(Int32, AuthD::User::Public).new
@logged_connections = Hash(Int32, IPC::Connection).new
@all_connections = Array(Int32).new
@auth = AuthD::Client.new
@auth.key = @auth_key
super "filestorage"
end
def get_logged_user(event : IPC::Event::Events)
fd = event.connection.fd
@logged_users[fd]?
end
def info(message)
STDOUT << ":: ".colorize(:green) << message.colorize(:white) << "\n"
end
def warning(message)
STDERR << "?? ".colorize(:yellow) << message.colorize(:yellow) << "\n"
end
def error(message)
STDERR << "!! ".colorize(:red) << message.colorize(:red) << "\n"
end
def decode_token(token : String)
@auth.decode_token token
end
def get_user_data(uid : Int32)
@storage.user_data_per_user.get uid.to_s
rescue e : DODB::MissingEntry
entry = UserData.new uid
entry
end
def get_user_data(user : ::AuthD::User::Public)
get_user_data user.uid
end
def update_user_data(user_data : UserData)
@storage.user_data_per_user.update_or_create user_data.uid.to_s, user_data
end
# TODO: could be useful to send notifications.
#def send_notifications(fd : Int32, value : Int32)
# @all_connections.select(&.!=(fd)).each do |fd| ... end
# IPC::Connection.new(fd).send Response::Something.new ...
#end
def run
info "Starting filestoraged"
self.loop do |event|
begin
case event
when IPC::Event::Timer
puts "#{CORANGE}IPC::Event::Timer#{CRESET}"
when IPC::Event::Connection
puts "#{CBLUE}IPC::Event::Connection: #{event.connection.fd}#{CRESET}"
@all_connections << event.connection.fd
when IPC::Event::Disconnection
puts "#{CBLUE}IPC::Event::Disconnection: #{event.connection.fd}#{CRESET}"
fd = event.connection.fd
@logged_connections.delete fd
@logged_users.delete fd
@all_connections.select! &.!=(fd)
@connected_users.select! do |fd, uid|
fd != event.connection.fd
end
when IPC::Event::ExtraSocket
puts "#{CRED}IPC::Event::ExtraSocket: should not happen in this service#{CRESET}"
when IPC::Event::Switch
puts "#{CRED}IPC::Event::Switch: should not happen in this service#{CRESET}"
# IPC::Event::Message has to be the last entry
# because ExtraSocket and Switch inherit from Message class
when IPC::Event::Message
puts "#{CBLUE}IPC::Event::Message#{CRESET}: #{event.connection.fd}"
request_start = Time.utc
request = parse_message FileStorage.requests, event.message
if request.nil?
raise "unknown request type"
end
info "<< #{request.class.name.sub /^FileStorage::Request::/, ""}"
response = request.handle self, event
if response.is_a? FileStorage::Response::Error
warning ">> #{response.class.name.sub /^FileStorage::Response::/, ""} (#{response.reason})"
else
info ">> #{response.class.name.sub /^FileStorage::Response::/, ""}"
end
#################################################################
# THERE START
#################################################################
# # The first message sent to the server has to be the AuthenticationMessage.
# # Users sent their token (JWT) to authenticate themselves.
# # The token contains the user id, its login and a few other parameters.
# # (see the authd documentation).
# # TODO: for now, the token is replaced by a hardcoded one, for debugging
#
# mtype = FileStorage::MessageType.new event.message.utype.to_i32
#
# # First, the user has to be authenticated unless we are receiving its first message.
# userid = Context.connected_users[event.connection.fd]?
#
# # If the user is not yet connected but does not try to perform authentication.
# if ! userid && mtype != FileStorage::MessageType::Authentication
# # TODO: replace this with an Error message.
# mid = "no message id"
# response = FileStorage::Response.new mid, "Not OK", "Action on non connected user"
# do_response event, response
# end
#
# case mtype
# when .authentication?
# puts "Receiving an authentication message"
# # Test if the client is already authenticated.
# if userid
# user = Context.users_status[userid]
# raise "Authentication message while the user was already connected: this should not happen"
# else
# puts "User is not currently connected"
# hdl_authentication event
# end
#
# when .upload_request?
# puts "Upload request"
# request = FileStorage::UploadRequest.from_json(
# String.new event.message.payload
# )
# response = hdl_upload request, Context.users_status[userid]
# do_response event, response
#
# when .download_request?
# puts "Download request"
# request = FileStorage::DownloadRequest.from_json(
# String.new event.message.payload
# )
# response = hdl_download request, Context.users_status[userid]
# do_response event, response
#
# when .transfer?
# # throw an error if the user isn't recorded
# unless user = Context.users_status[userid]?
# raise "The user isn't recorded in the users_status structure"
# end
#
# transfer = FileStorage::Transfer.from_json(
# String.new event.message.payload
# )
# response = hdl_transfer transfer, Context.users_status[userid]
#
# do_response event, response
# end
#################################################################
# FINISH
#################################################################
# If clients sent requests with an “id” field, it is copied
# in the responses. Allows identifying responses easily.
response.id = request.id
event.connection.send response
duration = Time.utc - request_start
puts "request took: #{duration}"
else
warning "unhandled IPC event: #{event.class}"
end
# TODOOOOOOOOOO TODO FIXME
rescue exception
error "exception: #{typeof(exception)} - #{exception.message}"
end
end
end
def self.from_cli
storage_directory = "file-data-storage"
file_info_directory = "file-info-storage"
key = "nico-nico-nii" # Default authd key, as per the specs. :eyes:
OptionParser.parse do |parser|
parser.banner = "usage: filestoraged [options]"
parser.on "-d storage-directory",
"--storage-directory storage-directory",
"The directory where to put uploaded files." do |opt|
storage_directory = opt
end
parser.on "-m metadata-directory",
"--metadata-directory storage-directory",
"The directory where to put metadata of uploaded files." do |opt|
file_info_directory = opt
end
parser.on "-h",
"--help",
"Displays this help and exits." do
puts parser
exit 0
end
# FIXME: Either make this mandatory or print a warning if missing.
parser.on "-k file",
"--key file",
"Reads the authentication key from the provided file." do |file|
key = File.read(file).chomp
end
end
::FileStorage::Service.new storage_directory, file_info_directory, key
end
# TODO: this will probably be dropped at some point.
# def do_response(event : IPC::Event::Message,
# response : FileStorage::Message)
#
# case response
# when FileStorage::Response
# event.connection.send FileStorage::MessageType::Response.to_u8, response.to_json
# when FileStorage::Responses
# event.connection.send FileStorage::MessageType::Responses.to_u8, response.to_json
# when FileStorage::Error
# event.connection.send FileStorage::MessageType::Error.to_u8, response.to_json
# else
# puts "response should not happen: #{response}"
# pp! response
# end
# end
end
FileStorage::Service.from_cli.run

74
src/server/network.cr Normal file
View File

@ -0,0 +1,74 @@
require "ipc"
require "json"
class JSONIPC
include JSON::Serializable
getter type = -1
class_getter type = -1
property id : JSON::Any?
def handle(service : IPC::Service, event : IPC::Event::Events)
raise "unimplemented"
end
macro request(id, type, &block)
class {{id}} < ::JSONIPC
include JSON::Serializable
@@type = {{type}}
def type
@@type
end
{{yield}}
end
end
end
class IPC::Connection
def send(request : JSONIPC)
send request.type.to_u8, request.to_json
end
end
class FileStorage
class_getter requests = [] of JSONIPC.class
class_getter responses = [] of JSONIPC.class
end
class FileStorage::Client < IPC::Client
def initialize
initialize "filestorage"
end
end
class FileStorage::Response
JSONIPC.request Error, 0 do
property mid : String
property reason : String | Array(String)
def initialize(@mid, @reason)
end
end
JSONIPC.request Success, 1 do
property mid : String
def initialize(@mid)
end
end
end
def parse_message(requests : Array(JSONIPC.class), message : IPC::Message) : JSONIPC?
request_type = requests.find &.type.==(message.utype)
payload = String.new message.payload
if request_type.nil?
raise "invalid request type (#{message.utype})"
end
request_type.from_json payload
end
require "../common/requests/*"

150
src/server/storage.cr Normal file
View File

@ -0,0 +1,150 @@
require "json"
require "uuid"
require "uuid/json"
require "openssl"
require "dodb"
require "base64"
require "../common/utils"
require "./storage/*"
# private function
def digest(value : String)
underlying_io = IO::Memory.new value
buffer = Bytes.new(4096)
io = OpenSSL::DigestIO.new underlying_io, "SHA256"
io.read buffer
io.digest.hexstring
end
# XXX TODO FIXME: architectural questions
# Why keeping upload and download requests?
# The server can be just for uploads, delegating downloads to HTTP.
# In environment without HTTP integration, this could still be pertinent.
class FileStorage::Storage
property db : DODB::DataBase(TransferInfo)
# Search file informations by their index, owner and tags.
property db_by_filedigest : DODB::Index(TransferInfo)
property db_by_owner : DODB::Partition(TransferInfo)
property db_by_tags : DODB::Tags(TransferInfo)
def initialize(storage_directory, file_info_directory)
@db = DODB::DataBase(TransferInfo).new @file_info_directory
# Create indexes, partitions and tags objects.
@db_by_filedigest = @db.new_index "filedigest", &.file_info.digest
@db_by_owner = @db.new_partition "owner", &.owner.to_s
@db_by_tags = @db.new_tags "tags", &.file_info.tags
end
# Reception of a file chunk.
def transfer(message : FileStorage::Transfer, user : User) : FileStorage::Response
# We received a message containing a chunk of file.
mid = message.mid
mid ||= "no message id"
# Get the transfer info from the db
transfer_info = @db_by_filedigest.get message.filedigest
if transfer_info.nil?
# The user has to send an upload request before sending anything.
# If not the case, it should be discarded.
raise "file not recorded"
end
chunk_number = message.chunk.n
data = Base64.decode message.data
# TODO: verify that the chunk sent was really missing.
if transfer_info.chunks.select(chunk_number).size > 0
write_a_chunk user.uid.to_s, transfer_info.file_info, chunk_number, data
else
raise "non existent chunk or already uploaded"
end
remove_chunk_from_db transfer_info, chunk_number
# TODO: verify the digest, if no more chunks.
FileStorage::Response::Transfer.new mid
rescue e
puts "Error handling transfer: #{e.message}"
FileStorage::Response.new mid.not_nil!, "Not Ok", "Unexpected error: #{e.message}"
end
# the client sent an upload request
def upload(request : FileStorage::Request::Upload, user : User) : FileStorage::Response
mid = request.mid
mid ||= "no message id"
puts "hdl upload: mid=#{request.mid}"
pp! request
# TODO: verify the rights and quotas of the user
# file_info attributes: name, size, nb_chunks, digest, tags
# First: check if the file already exists
transfer_info = @db_by_filedigest.get? request.file.digest
if transfer_info.nil?
# In case file informations aren't already registered
# which is normal at this point
transfer_info = TransferInfo.new user.uid, request.file
@db << transfer_info
else
# File information already exists, request may be duplicated
# In this case: ignore the upload request
end
FileStorage::Response::Upload.new request.mid
rescue e
puts "Error handling transfer: #{e.message}"
FileStorage::Response.new mid.not_nil!, "Not Ok", "Unexpected error: #{e.message}"
end
# TODO
# The client sent a download request.
def download(request : FileStorage::DownloadRequest, user : User) : FileStorage::Response
puts "hdl download: mid=#{request.mid}"
pp! request
FileStorage::Response::Download.new request.mid
end
# Entry point for request management
# Each request should have a response.
# Then, responses are sent in a single message.
def requests(requests : Array(FileStorage::Request),
user : User,
event : IPC::Event::Message) : Array(FileStorage::Response)
puts "hdl request"
responses = Array(FileStorage::Response).new
requests.each do |request|
case request
when FileStorage::DownloadRequest
responses << download request, user
when FileStorage::UploadRequest
responses << upload request, user
else
raise "request not understood"
end
puts
end
responses
end
end

View File

@ -0,0 +1,85 @@
# For now, upload and download are sequentials.
# In a future version, we will be able to send
# arbitrary parts of each file.
# Who knows, maybe someday we will be on UDP, too.
#class SHA256
# JSON.mapping({
# chunk: Slice(UInt8)
# })
#end
class FileStorage
# 1 MB read buffer, on-disk
def self.file_reading_buffer_size
1_000_000
end
# 1 KB message data buffer, on-network
def self.message_buffer_size
1_000
end
# private function
def self.data_digest(data : Bytes)
iodata = IO::Memory.new data, false
buffer = Bytes.new FileStorage.file_reading_buffer_size
io = OpenSSL::DigestIO.new(iodata, "SHA256")
while io.read(buffer) > 0; end
io.digest.hexstring
end
# private function
def self.file_digest(file : File)
# 1M read buffer
buffer = Bytes.new(1_000_000)
io = OpenSSL::DigestIO.new(file, "SHA256")
while io.read(buffer) > 0 ; end
io.digest.hexstring
end
end
class FileStorage::Exception < ::Exception
end
class FileStorage::Chunk
include JSON::Serializable
property n : Int32 # chunk's number
property on : Int32 # number of chunks
property digest : String # digest of the current chunk
def initialize(@n, @on, data)
@digest = FileStorage.data_digest data.to_slice
end
end
# A file has a name, a size and tags.
class FileStorage::FileInfo
include JSON::Serializable
property name : String
property size : UInt64
property nb_chunks : Int32
property digest : String
# list of SHA256, if we are on UDP
# chunks: Array(SHA256),
property tags : Array(String)
def initialize(file : File, tags = nil)
@name = File.basename file.path
@size = file.size
@digest = FileStorage.file_digest file
@nb_chunks = (@size / FileStorage.message_buffer_size).ceil.to_i
@tags = tags || [] of String
end
end

View File

@ -0,0 +1,13 @@
class TransferInfo
include JSON::Serializable
property owner : Int32
property file_info : FileStorage::FileInfo
property chunks : Array(Int32)
def initialize(@owner, @file_info)
@chunks = (0...@file_info.nb_chunks).to_a
end
end

View File

@ -0,0 +1,21 @@
require "json"
require "uuid"
require "uuid/json"
# Keep track of connected users and their requests.
# TODO: requests should be handled concurrently.
class FileStorage::UserData
include JSON::Serializable
property uid : Int32
property token : AuthD::User::Public
property uploads : Array(FileStorage::Request::Upload)
property downloads : Array(FileStorage::Request::Download)
def initialize(@token,
@uploads = Array(FileStorage::Request::Upload).new,
@downloads = Array(FileStorage::Request::Download).new)
@uid = token.uid
end
end