From 0e4396c0c6e07c5fecacb7bdf93fcaa2ead95593 Mon Sep 17 00:00:00 2001 From: Karchnu Date: Thu, 14 May 2020 17:03:09 +0200 Subject: [PATCH] WIP: new code structure. Need a few fixes. --- src/client/main.cr | 2 +- src/common/filestorage.cr | 207 ------------------- src/common/requests/auth.cr | 80 +++++++ src/common/requests/download.cr | 55 +++++ src/common/requests/login.cr | 59 ++++++ src/common/requests/transfer.cr | 58 ++++++ src/common/requests/upload.cr | 63 ++++++ src/server/cli.cr | 17 -- src/server/context.cr | 62 ------ src/server/handlers.cr | 180 ---------------- src/server/main-loop.cr | 119 ----------- src/server/main.cr | 310 ++++++++++++++++++++++++++-- src/server/network.cr | 74 +++++++ src/server/storage.cr | 150 ++++++++++++++ src/server/storage/file_info.cr | 85 ++++++++ src/server/storage/transfer-info.cr | 13 ++ src/server/storage/user-data.cr | 21 ++ 17 files changed, 956 insertions(+), 599 deletions(-) delete mode 100644 src/common/filestorage.cr create mode 100644 src/common/requests/auth.cr create mode 100644 src/common/requests/download.cr create mode 100644 src/common/requests/login.cr create mode 100644 src/common/requests/transfer.cr create mode 100644 src/common/requests/upload.cr delete mode 100644 src/server/cli.cr delete mode 100644 src/server/context.cr delete mode 100644 src/server/handlers.cr delete mode 100644 src/server/main-loop.cr create mode 100644 src/server/network.cr create mode 100644 src/server/storage.cr create mode 100644 src/server/storage/file_info.cr create mode 100644 src/server/storage/transfer-info.cr create mode 100644 src/server/storage/user-data.cr diff --git a/src/client/main.cr b/src/client/main.cr index abb55ac..5fadce6 100644 --- a/src/client/main.cr +++ b/src/client/main.cr @@ -126,7 +126,7 @@ def file_transfer(client : IPC::Client, file : File, file_info : FileStorage::Fi # Check for the response m = client.read - mtype = FileStorage::MessageType.new m.type.to_i32 + mtype = FileStorage::MessageType.new m.utype.to_i32 if mtype != FileStorage::MessageType::Response pp! m raise "Message received was not expected: #{mtype}" diff --git a/src/common/filestorage.cr b/src/common/filestorage.cr deleted file mode 100644 index 4b81e23..0000000 --- a/src/common/filestorage.cr +++ /dev/null @@ -1,207 +0,0 @@ -require "uuid" -require "openssl" -require "json" -require "base64" - -module FileStorage - - extend self - - # 1 MB read buffer, on-disk - def file_reading_buffer_size - 1_000_000 - end - - # 1 KB message data buffer, on-network - def message_buffer_size - 1_000 - end - - class Exception < ::Exception - end - - enum MessageType - Error - Authentication - UploadRequest - DownloadRequest - Response - Responses - Transfer - end - - class Chunk - include JSON::Serializable - - property n : Int32 # chunk's number - property on : Int32 # number of chunks - property digest : String # digest of the current chunk - - def initialize(@n, @on, data) - @digest = FileStorage.data_digest data.to_slice - end - end - - # For now, upload and download are sequentials. - # In a future version, we will be able to send - # arbitrary parts of each file. - - class Token - include JSON::Serializable - - property uid : Int32 - property login : String - - def initialize(@uid, @login) - end - end - - # Who knows, maybe someday we will be on UDP, too. - #class SHA256 - # JSON.mapping({ - # chunk: Slice(UInt8) - # }) - #end - - - # A file has a name, a size and tags. - class FileInfo - include JSON::Serializable - - property name : String - property size : UInt64 - property nb_chunks : Int32 - property digest : String - - # list of SHA256, if we are on UDP - # chunks: Array(SHA256), - property tags : Array(String) - - def initialize(file : File, tags = nil) - @name = File.basename file.path - @size = file.size - @digest = FileStorage.file_digest file - @nb_chunks = (@size / FileStorage.message_buffer_size).ceil.to_i - @tags = tags || [] of String - end - end - - class Message - end - - alias Request = UploadRequest | DownloadRequest - - class UploadRequest < Message - include JSON::Serializable - - property mid : String # autogenerated - property file : FileInfo - - def initialize(@file : FileInfo) - @mid = UUID.random.to_s - end - end - - - # WIP - class DownloadRequest < Message - include JSON::Serializable - - property mid : String # autogenerated - property filedigest : String? # SHA256 digest of the file, used as ID - property name : String? - property tags : Array(String)? - - def initialize(@filedigest = nil, @name = nil, @tags = nil) - @mid = UUID.random.to_s - end - end - - class Authentication < Message - include JSON::Serializable - - property mid : String # autogenerated - property token : Token - property uploads : Array(UploadRequest) - property downloads : Array(DownloadRequest) - - def initialize(@token, @uploads = Array(UploadRequest).new, @downloads = Array(DownloadRequest).new) - @mid = UUID.random.to_s - end - end - - class Response < Message - include JSON::Serializable - - property mid : String - property response : String - property reason : String? - - def initialize(@mid, @response, @reason = nil) - end - end - - class Error < Message - include JSON::Serializable - - property mid : String - property response : String # a response for each request - property reason : String? - - def initialize(@mid, @response, @reason = nil) - end - end - - class Responses < Message - include JSON::Serializable - - property mid : String - property responses : Array(Response) # a response for each request - property response : String - property reason : String? - - def initialize(@mid, @response, @responses, @reason = nil) - end - end - - class Transfer < Message - include JSON::Serializable - - property mid : String # autogenerated - property filedigest : String # SHA256 digest of the entire file - property chunk : Chunk # For now, just the counter in a string - property data : String # base64 slice - - def initialize(file_info : FileInfo, count, bindata) - # count: chunk number - - @filedigest = file_info.digest - @data = Base64.encode bindata - @chunk = FileStorage::Chunk.new count, file_info.nb_chunks - 1, @data - @mid = UUID.random.to_s - end - end - - # private function - def data_digest(data : Bytes) - - iodata = IO::Memory.new data, false - buffer = Bytes.new FileStorage.file_reading_buffer_size - - io = OpenSSL::DigestIO.new(iodata, "SHA256") - while io.read(buffer) > 0; end - - io.digest.hexstring - end - - # private function - def file_digest(file : File) - # 1M read buffer - buffer = Bytes.new(1_000_000) - - io = OpenSSL::DigestIO.new(file, "SHA256") - while io.read(buffer) > 0 ; end - - io.digest.hexstring - end -end diff --git a/src/common/requests/auth.cr b/src/common/requests/auth.cr new file mode 100644 index 0000000..173bbde --- /dev/null +++ b/src/common/requests/auth.cr @@ -0,0 +1,80 @@ + +# We verify the user's rights to upload files. +# TODO RIGHTS +# if user wants to upload but not allowed to: Response +# if user wants to get a file but not allowed to: Response + +# The user is authorized to upload files. + +# TODO: quotas +# Quotas are not defined yet. + +#class FileStorage::Request +# JSONIPC.request Authentication, 10 do +# property mid : String # autogenerated +# property token : Token +# property uploads : Array(UploadRequest)? +# property downloads : Array(DownloadRequest)? +# +# def initialize(@token, @uploads = nil, @downloads = nil) +# @mid = UUID.random.to_s +# end +# +# # Store the client in connected_users and users_status. +# # If already in users_status: +# # check if the requests are the same +# # if not: add them to the user structure in users_status +# def handle(filestoraged : FileStorage::Service, event : IPC::Event::Events) +# +# token = filestoraged.auth.decode_token @token +# userid = token.uid +# +# puts "user authentication: #{userid}" +# +# # Is the user already recorded in users_status? +# if filestoraged.users_status[userid]? +# puts "We already knew this user" +# +# filestoraged.connected_users[event.connection.fd] = userid +# # TODO +# pp! filestoraged.connected_users +# pp! filestoraged.users_status[userid] +# +# # Let's ignore this error, just continue. +# # It could have been a network error, for example. +# # return FileStorage::Response::Error.new @mid, "user already connected" +# else +# # AuthenticationMessage includes requests. +# new_user = User.new token, @uploads, @downloads +# +# filestoraged.connected_users[event.connection.fd] = userid +# +# # record the new user in users_status +# filestoraged.users_status[userid] = new_user +# +# puts "New user is: #{new_user.token.login}" +# end +# +# # The user is now connected. +# user = filestoraged.users_status[userid] +# +# responses = filestoraged.storage.requests [ @uploads, @downloads ].flatten, +# filestoraged.users_status[userid], +# event +# +# # Sending a response, containing a response for each request. +# FileStorage::Response::Success.new @mid, responses +# end +# end +# FileStorage.requests << Authentication +#end +# +#class FileStorage::Response +# JSONIPC.request AuthenticationSuccess, 10 do +# property mid : String +# property responses : Array(FileStorage::Response) +# def initialize(@mid, @responses) +# end +# end +#end + diff --git a/src/common/requests/download.cr b/src/common/requests/download.cr new file mode 100644 index 0000000..e200a0e --- /dev/null +++ b/src/common/requests/download.cr @@ -0,0 +1,55 @@ +class FileStorage::Request + + JSONIPC.request Download, 30 do + property mid : String # autogenerated + property filedigest : String? # SHA256 digest of the file, used as ID + property name : String? + property tags : Array(String)? + def initialize(@filedigest = nil, @name = nil, @tags = nil) + @mid = UUID.random.to_s + end + + def handle(filestoraged : FileStorage::Service, event : IPC::Event::Events) + user = altideald.get_logged_user event + + raise Exception.new "unauthorized" if user.nil? + + # FIXME: Maybe this should be moved to FileStorage::Service + fd = event.connection.fd + + user_data = filestoraged.get_user_data user.uid + + filestoraged.storage.download self, event + rescue e + return Response::Error.new @mid, "unauthorized" + end + end + FileStorage.requests << Download +end + +class FileStorage::Client + def download(filedigest = nil, name = nil, tags = nil) + request = FileStorage::Request::Download.new filedigest, name, tags + send request + + response = parse_message [ FileStorage::Response::Download, FileStorage::Response::Error ], read + + case response + when FileStorage::Response::Download + when FileStorage::Response::Error + raise "Download request denied: #{response.reason}" + end + + response + end +end + + +class FileStorage::Response + JSONIPC.request Download, 30 do + property mid : String + def initialize(@mid) + end + end +end + diff --git a/src/common/requests/login.cr b/src/common/requests/login.cr new file mode 100644 index 0000000..c1d1e98 --- /dev/null +++ b/src/common/requests/login.cr @@ -0,0 +1,59 @@ +require "uuid" +require "openssl" +require "json" +require "base64" + +class FileStorage::Request + JSONIPC.request Login, 0 do + property mid : String + property token : String + def initialize(@token) + @mid = UUID.random.to_s + end + + def handle(filestoraged : FileStorage::Service, event : IPC::Event::Events) + logged_users = filestoraged.logged_users + + user, _ = filestoraged.decode_token token + + # FIXME: Maybe this should be moved to FileStorage::Service + fd = event.connection.fd + + filestoraged.logged_users[fd] = user + filestoraged.logged_connections[fd] = event.connection + + user_data = filestoraged.get_user_data user.uid + + return Response::Login.new @mid + rescue e + return Response::Error.new "unauthorized" + end + end + FileStorage.requests << Login +end + +class FileStorage::Client + def login(token : String) + request = FileStorage::Request::Login.new token + send request + + response = parse_message [ FileStorage::Response::Login, FileStorage::Response::Error ], read + + case response + when FileStorage::Response::Login + # Received favorites, likes, etc. + when FileStorage::Response::Error + raise "user was not logged in: #{response.reason}" + end + + response + end +end + +class FileStorage::Response + JSONIPC.request Login, 5 do + property mid : String + def initialize(@mid) + end + end +end diff --git a/src/common/requests/transfer.cr b/src/common/requests/transfer.cr new file mode 100644 index 0000000..a309aba --- /dev/null +++ b/src/common/requests/transfer.cr @@ -0,0 +1,58 @@ +class FileStorage::Request + JSONIPC.request Transfer, 40 do + property mid : String # autogenerated + property filedigest : String # SHA256 digest of the entire file + property chunk : Chunk # For now, just the counter in a string + property data : String # base64 slice + def initialize(file_info : FileInfo, count, bindata) + # count: chunk number + + @filedigest = file_info.digest + @data = Base64.encode bindata + @chunk = FileStorage::Chunk.new count, file_info.nb_chunks - 1, @data + @mid = UUID.random.to_s + end + + def handle(filestoraged : FileStorage::Service, event : IPC::Event::Events) + user = altideald.get_logged_user event + + raise Exception.new "unauthorized" if user.nil? + + # FIXME: Maybe this should be moved to FileStorage::Service + fd = event.connection.fd + + user_data = filestoraged.get_user_data user.uid + + filestoraged.storage.transfer self, event + rescue e + return Response::Error.new @mid, "unauthorized" + end + end + FileStorage.requests << Transfer +end + +class FileStorage::Client + def transfer(file_info : FileInfo, count, bindata) + request = FileStorage::Request::Transfer.new file_info, count, bindata + send request + + response = parse_message [ FileStorage::Response::Transfer, FileStorage::Response::Error ], read + + case response + when FileStorage::Response::Transfer + when FileStorage::Response::Error + raise "File chunk was not transfered: #{response.reason}" + end + + response + end +end + + +class FileStorage::Response + JSONIPC.request Transfer, 40 do + property mid : String + def initialize(@mid) + end + end +end diff --git a/src/common/requests/upload.cr b/src/common/requests/upload.cr new file mode 100644 index 0000000..8ac2758 --- /dev/null +++ b/src/common/requests/upload.cr @@ -0,0 +1,63 @@ + +class FileStorage::Request + JSONIPC.request Upload, 20 do + property mid : String # autogenerated + property file : FileInfo + def initialize(@file : FileInfo) + @mid = UUID.random.to_s + end + + def handle(filestoraged : FileStorage::Service, event : IPC::Event::Events) + user = altideald.get_logged_user event + + raise Exception.new "unauthorized" if user.nil? + + # FIXME: Maybe this should be moved to FileStorage::Service + fd = event.connection.fd + + user_data = filestoraged.get_user_data user.uid + + filestoraged.storage.upload self, event + rescue e + return Response::Error.new @mid, "unauthorized" + end + end + FileStorage.requests << Upload +end + +class FileStorage::Client + def upload(token : String) + request = FileStorage::Request::Upload.new token + send request + + response = parse_message [ FileStorage::Response::Upload, FileStorage::Response::Error ], read + + case response + when FileStorage::Response::Upload + when FileStorage::Response::Error + raise "Upload request failed: #{response.reason}" + end + + response + end +end + +class FileStorage::Response + JSONIPC.request Upload, 20 do + property mid : String + def initialize(@mid) + end + end + + JSONIPC.request Responses, 100 do + include JSON::Serializable + + property mid : String + property responses : Array(Response) # a response for each request + property response : String + property reason : String? + + def initialize(@mid, @response, @responses, @reason = nil) + end + end +end diff --git a/src/server/cli.cr b/src/server/cli.cr deleted file mode 100644 index 83b3448..0000000 --- a/src/server/cli.cr +++ /dev/null @@ -1,17 +0,0 @@ - -OptionParser.parse do |parser| - parser.on "-d storage-directory", - "--storage-directory storage-directory", - "The directory where to put uploaded files." do |opt| - Context.storage_directory = opt - end - - parser.on "-s service-name", "--service-name service-name", "Service name." do |name| - Context.service_name = name - end - - parser.on "-h", "--help", "Show this help" do - puts parser - exit 0 - end -end diff --git a/src/server/context.cr b/src/server/context.cr deleted file mode 100644 index 2bba0eb..0000000 --- a/src/server/context.cr +++ /dev/null @@ -1,62 +0,0 @@ - -require "dodb" -require "json" - -# keep track of connected users and their requests -# TODO: requests should be handled concurrently -class User - property uid : Int32 - property token : FileStorage::Token - property uploads : Array(FileStorage::UploadRequest) - property downloads : Array(FileStorage::DownloadRequest) - - def initialize(@token, - @uploads = Array(FileStorage::UploadRequest).new, - @downloads = Array(FileStorage::DownloadRequest).new) - @uid = token.uid - end -end - -class TransferInfo - include JSON::Serializable - - property owner : Int32 - property file_info : FileStorage::FileInfo - property chunks : Array(Int32) - - def initialize(@owner, @file_info) - @chunks = (0...@file_info.nb_chunks).to_a - end -end - -class Context - class_property service_name = "filestorage" - class_property storage_directory = "./storage" - class_property file_info_directory = "./file-infos" - - class_property db = DODB::DataBase(TransferInfo).new @@file_info_directory - - # search file informations by their index, owner and tags - class_property db_by_filedigest : DODB::Index(TransferInfo) = @@db.new_index "filedigest", &.file_info.digest - class_property db_by_owner : DODB::Partition(TransferInfo) = @@db.new_partition "owner", &.owner.to_s - class_property db_by_tags : DODB::Tags(TransferInfo) = @@db.new_tags "tags", &.file_info.tags - - def self.db_reconnect - # In case file_info_directory changes: database reinstanciation - @@db = DODB::DataBase(TransferInfo).new @@file_info_directory - - # recreate indexes, partitions and tags objects, too - @@db_by_filedigest = @@db.new_index "filedigest", &.file_info.digest - @@db_by_owner = @@db.new_partition "owner", &.owner.to_s - @@db_by_tags = @@db.new_tags "tags", &.file_info.tags - end - - # list of connected users (fd => uid) - class_property connected_users = Hash(Int32, Int32).new - - # users_status: keep track of the users' status even if they are - # disconnected, allowing the application to handle connection problems - class_property users_status = Hash(Int32, User).new - - class_property service : IPC::Service? = nil -end diff --git a/src/server/handlers.cr b/src/server/handlers.cr deleted file mode 100644 index 93b333d..0000000 --- a/src/server/handlers.cr +++ /dev/null @@ -1,180 +0,0 @@ - -require "dodb" -require "base64" - -require "../common/utils" - -# XXX TODO FIXME: architectural questions -# wonder why I should keep the user upload and download requests -# the server can be just for uploads, delegating downloads to HTTP - - -# reception of a file chunk -def hdl_transfer(message : FileStorage::Transfer, user : User) : FileStorage::Response - - # We received a message containing a chunk of file. - - mid = message.mid - mid ||= "no message id" - - # Get the transfer info from the db - transfer_info = Context.db_by_filedigest.get message.filedigest - - if transfer_info.nil? - # The user has to send an upload request before sending anything - # If not the case, it should be discarded - raise "file not recorded" - end - - chunk_number = message.chunk.n - - data = Base64.decode message.data - - # TODO: verify that the chunk sent was really missing - if transfer_info.chunks.select(chunk_number).size > 0 - write_a_chunk user.uid.to_s, transfer_info.file_info, chunk_number, data - else - raise "non existent chunk or already uploaded" - end - - remove_chunk_from_db transfer_info, chunk_number - - # TODO: verify the digest, if no more chunks - - FileStorage::Response.new mid, "Ok" - -rescue e - puts "Error handling transfer: #{e.message}" - FileStorage::Response.new mid.not_nil!, "Not Ok", "Unexpected error: #{e.message}" -end - -# the client sent an upload request -def hdl_upload(request : FileStorage::UploadRequest, user : User) : FileStorage::Response - - mid = request.mid - mid ||= "no message id" - - puts "hdl upload: mid=#{request.mid}" - pp! request - - # TODO: verify the rights and quotas of the user - # file_info attributes: name, size, nb_chunks, digest, tags - - # First: check if the file already exists - transfer_info = Context.db_by_filedigest.get? request.file.digest - if transfer_info.nil? - # In case file informations aren't already registered - # which is normal at this point - transfer_info = TransferInfo.new user.uid, request.file - Context.db << transfer_info - else - # File information already exists, request may be duplicated - # In this case: ignore the upload request - end - - FileStorage::Response.new request.mid, "Upload OK" -rescue e - puts "Error handling transfer: #{e.message}" - FileStorage::Response.new mid.not_nil!, "Not Ok", "Unexpected error: #{e.message}" -end - -# TODO -# the client sent a download request -def hdl_download(request : FileStorage::DownloadRequest, - user : User) : FileStorage::Response - - puts "hdl download: mid=#{request.mid}" - pp! request - - FileStorage::Response.new request.mid, "Download OK" -end - - -# Entry point for request management -# Each request should have a response. -# Then, responses are sent in a single message. -def hdl_requests(requests : Array(FileStorage::Request), - user : User, - event : IPC::Event::Message) : Array(FileStorage::Response) - - puts "hdl request" - responses = Array(FileStorage::Response).new - - requests.each do |request| - case request - when FileStorage::DownloadRequest - responses << hdl_download request, user - when FileStorage::UploadRequest - responses << hdl_upload request, user - else - raise "request not understood" - end - - puts - end - - responses -end - -# store the client in connected_users and users_status -# if already in users_status: -# check if the requests are the same -# if not: add them to the user structure in users_status -def hdl_authentication(event : IPC::Event::Message) - - authentication_message = - FileStorage::Authentication.from_json( - String.new event.message.payload - ) - - userid = authentication_message.token.uid - - puts "user authentication: #{userid}" - - # Is the user already recorded in users_status? - if Context.users_status[userid]? - puts "We already knew this user" - - Context.connected_users[event.connection.fd] = userid - # TODO - pp! Context.connected_users - pp! Context.users_status[userid] - else - # AuthenticationMessage includes requests. - new_user = - User.new authentication_message.token, - authentication_message.uploads, - authentication_message.downloads - - Context.connected_users[event.connection.fd] = userid - - # record the new user in users_status - Context.users_status[userid] = new_user - - puts "New user is: #{new_user.token.login}" - end - - # The user is now connected. - user = Context.users_status[userid] - - # We verify the user's rights to upload files. - # TODO RIGHTS - # if user wants to upload but not allowed to: Response - # if user wants to get a file but not allowed to: Response - - # The user is authorized to upload files. - - # TODO: quotas - # Quotas are not defined yet. - - responses = hdl_requests [ authentication_message.uploads, authentication_message.downloads ].flatten, - Context.users_status[userid], - event - - # Sending a response, containing a response for each request. - # The response is "Ok" when the message is well received and authorized. - response = FileStorage::Responses.new authentication_message.mid, "Ok", responses - event.connection.send FileStorage::MessageType::Responses.to_u8, response.to_json - pp! FileStorage::MessageType::Responses.to_u8 - pp! response -end diff --git a/src/server/main-loop.cr b/src/server/main-loop.cr deleted file mode 100644 index 33bd0f9..0000000 --- a/src/server/main-loop.cr +++ /dev/null @@ -1,119 +0,0 @@ - -Context.service = IPC::Service.new Context.service_name - -Context.service.not_nil!.loop do |event| - case event - when IPC::Event::Timer - puts "#{CORANGE}IPC::Event::Timer#{CRESET}" - - when IPC::Event::Connection - puts "#{CBLUE}IPC::Event::Connection: #{event.connection.fd}#{CRESET}" - - when IPC::Event::Disconnection - puts "#{CBLUE}IPC::Event::Disconnection: #{event.connection.fd}#{CRESET}" - - Context.connected_users.select! do |fd, uid| - fd != event.connection.fd - end - - when IPC::Event::ExtraSocket - puts "#{CRED}IPC::Event::ExtraSocket: should not happen in this service#{CRESET}" - - when IPC::Event::Switch - puts "#{CRED}IPC::Event::Switch: should not happen in this service#{CRESET}" - - # IPC::Event::Message has to be the last entry - # because ExtraSocket and Switch inherit from Message class - when IPC::Event::Message - puts "#{CBLUE}IPC::Event::Message#{CRESET}: #{event.connection.fd}" - - # The first message sent to the server has to be the AuthenticationMessage. - # Users sent their token (JWT) to authenticate themselves. - # The token contains the user id, its login and a few other parameters. - # (see the authd documentation). - # TODO: for now, the token is replaced by a hardcoded one, for debugging - - mtype = FileStorage::MessageType.new event.message.type.to_i32 - - # First, the user has to be authenticated unless we are receiving its first message - userid = Context.connected_users[event.connection.fd]? - - # if the user is not yet connected but does not try to perform authentication - if ! userid && mtype != FileStorage::MessageType::Authentication - # TODO: replace this with an Error message? - mid = "no message id" - response = FileStorage::Response.new mid, "Not OK", "Action on non connected user" - do_response event, response - end - - case mtype - when .authentication? - puts "Receiving an authentication message" - # 1. test if the client is already authenticated - if userid - user = Context.users_status[userid] - raise "Authentication message while the user was already connected: this should not happen" - else - puts "User is not currently connected" - hdl_authentication event - end - when .upload_request? - puts "Upload request" - request = FileStorage::UploadRequest.from_json( - String.new event.message.payload - ) - response = hdl_upload request, Context.users_status[userid] - - do_response event, response - when .download_request? - puts "Download request" - request = FileStorage::DownloadRequest.from_json( - String.new event.message.payload - ) - response = hdl_download request, Context.users_status[userid] - - do_response event, response - when .response? - puts "Response message" - raise "not implemented yet" - when .responses? - puts "Responses message" - raise "not implemented yet" - when .error? - puts "Error message" - raise "not implemented yet" - when .transfer? - # throw an error if the user isn't recorded - unless user = Context.users_status[userid]? - raise "The user isn't recorded in the users_status structure" - end - - transfer = FileStorage::Transfer.from_json( - String.new event.message.payload - ) - response = hdl_transfer transfer, Context.users_status[userid] - - do_response event, response - end - else - raise "Event type not supported." - end -rescue e - puts "A problem occured : #{e.message}" -end - -def do_response(event : IPC::Event::Message, - response : FileStorage::Message) - - case response - when FileStorage::Response - event.connection.send FileStorage::MessageType::Response.to_u8, response.to_json - when FileStorage::Responses - event.connection.send FileStorage::MessageType::Responses.to_u8, response.to_json - when FileStorage::Error - event.connection.send FileStorage::MessageType::Error.to_u8, response.to_json - else - puts "response should not happen: #{response}" - pp! response - end -end diff --git a/src/server/main.cr b/src/server/main.cr index 49f1964..81052e4 100644 --- a/src/server/main.cr +++ b/src/server/main.cr @@ -1,12 +1,10 @@ require "option_parser" require "ipc" require "json" +require "authd" require "../common/colors" -require "../common/filestorage.cr" - -require "./context.cr" -require "./handlers.cr" +# require "../common/filestorage.cr" # TODO: if the user is disconnected, we should ask him if it still want to process # for old requests. @@ -17,19 +15,305 @@ require "./handlers.cr" # upload or download its files. # TODO: -# * elegantly handling errors -# * store the file, /files/userid/UID.bin for example: /files/1002/UID.bin -# * metadata should be in a dodb +# * Authd integration. +# * Elegantly handling errors. +# * Store the file, /files/userid/UID.bin for example: /files/1002/UID.bin. +# * Metadata should be in a dodb. # /storage/partitions/by_uid/UID.json -> content: # data: /files/uid/UID.bin (storing raw files) # uid: 1002 # name: "The File About Things" # size: 1500 # tags: thing1 thing2 -# * authd integration -# * knowing which parts of the files are still to be sent -# * rights -# * quotas +# * Knowing which parts of the files are still to be sent. +# * Rights. +# * Quotas. -require "./cli.cr" -require "./main-loop.cr" +require "./storage.cr" +require "./network.cr" +#### require "./sample.cr" + +require "dodb" +require "json" + +class FileStorage::Service < IPC::Service + # List of connected users (fd => uid). + property connected_users = Hash(Int32, Int32).new + + # users_status: keep track of the users' status even if they are disconnected, + # allowing the application to handle connection problems. + property users_status = Hash(Int32, User).new + + # Actual storage. + getter storage : FileStorage::Storage + + getter logged_users : Hash(Int32, AuthD::User::Public) + getter logged_connections : Hash(Int32, IPC::Connection) + getter all_connections : Array(Int32) + + @auth : AuthD::Client + @auth_key : String + + def initialize(storage_directory, file_info_directory, @auth_key) + # Data and metadata storage directories. + # storage_directory : String + # file_info_directory : String + + @storage = FileStorage::Storage.new storage_directory, file_info_directory + + @logged_users = Hash(Int32, AuthD::User::Public).new + @logged_connections = Hash(Int32, IPC::Connection).new + @all_connections = Array(Int32).new + + @auth = AuthD::Client.new + @auth.key = @auth_key + + super "filestorage" + end + + def get_logged_user(event : IPC::Event::Events) + fd = event.connection.fd + + @logged_users[fd]? + end + + def info(message) + STDOUT << ":: ".colorize(:green) << message.colorize(:white) << "\n" + end + def warning(message) + STDERR << "?? ".colorize(:yellow) << message.colorize(:yellow) << "\n" + end + def error(message) + STDERR << "!! ".colorize(:red) << message.colorize(:red) << "\n" + end + + def decode_token(token : String) + @auth.decode_token token + end + + def get_user_data(uid : Int32) + @storage.user_data_per_user.get uid.to_s + rescue e : DODB::MissingEntry + entry = UserData.new uid + entry + end + + def get_user_data(user : ::AuthD::User::Public) + get_user_data user.uid + end + + def update_user_data(user_data : UserData) + @storage.user_data_per_user.update_or_create user_data.uid.to_s, user_data + end + + # TODO: could be useful to send notifications. + #def send_notifications(fd : Int32, value : Int32) + # @all_connections.select(&.!=(fd)).each do |fd| ... end + # IPC::Connection.new(fd).send Response::Something.new ... + #end + + def run + info "Starting filestoraged" + + self.loop do |event| + begin + + case event + when IPC::Event::Timer + puts "#{CORANGE}IPC::Event::Timer#{CRESET}" + + when IPC::Event::Connection + puts "#{CBLUE}IPC::Event::Connection: #{event.connection.fd}#{CRESET}" + @all_connections << event.connection.fd + + when IPC::Event::Disconnection + puts "#{CBLUE}IPC::Event::Disconnection: #{event.connection.fd}#{CRESET}" + fd = event.connection.fd + + @logged_connections.delete fd + @logged_users.delete fd + @all_connections.select! &.!=(fd) + + @connected_users.select! do |fd, uid| + fd != event.connection.fd + end + + when IPC::Event::ExtraSocket + puts "#{CRED}IPC::Event::ExtraSocket: should not happen in this service#{CRESET}" + + when IPC::Event::Switch + puts "#{CRED}IPC::Event::Switch: should not happen in this service#{CRESET}" + + # IPC::Event::Message has to be the last entry + # because ExtraSocket and Switch inherit from Message class + when IPC::Event::Message + puts "#{CBLUE}IPC::Event::Message#{CRESET}: #{event.connection.fd}" + + request_start = Time.utc + + request = parse_message FileStorage.requests, event.message + + if request.nil? + raise "unknown request type" + end + + info "<< #{request.class.name.sub /^FileStorage::Request::/, ""}" + + response = request.handle self, event + + if response.is_a? FileStorage::Response::Error + warning ">> #{response.class.name.sub /^FileStorage::Response::/, ""} (#{response.reason})" + else + info ">> #{response.class.name.sub /^FileStorage::Response::/, ""}" + end + + + ################################################################# + # THERE START + ################################################################# + +# # The first message sent to the server has to be the AuthenticationMessage. +# # Users sent their token (JWT) to authenticate themselves. +# # The token contains the user id, its login and a few other parameters. +# # (see the authd documentation). +# # TODO: for now, the token is replaced by a hardcoded one, for debugging +# +# mtype = FileStorage::MessageType.new event.message.utype.to_i32 +# +# # First, the user has to be authenticated unless we are receiving its first message. +# userid = Context.connected_users[event.connection.fd]? +# +# # If the user is not yet connected but does not try to perform authentication. +# if ! userid && mtype != FileStorage::MessageType::Authentication +# # TODO: replace this with an Error message. +# mid = "no message id" +# response = FileStorage::Response.new mid, "Not OK", "Action on non connected user" +# do_response event, response +# end +# +# case mtype +# when .authentication? +# puts "Receiving an authentication message" +# # Test if the client is already authenticated. +# if userid +# user = Context.users_status[userid] +# raise "Authentication message while the user was already connected: this should not happen" +# else +# puts "User is not currently connected" +# hdl_authentication event +# end +# +# when .upload_request? +# puts "Upload request" +# request = FileStorage::UploadRequest.from_json( +# String.new event.message.payload +# ) +# response = hdl_upload request, Context.users_status[userid] +# do_response event, response +# +# when .download_request? +# puts "Download request" +# request = FileStorage::DownloadRequest.from_json( +# String.new event.message.payload +# ) +# response = hdl_download request, Context.users_status[userid] +# do_response event, response +# +# when .transfer? +# # throw an error if the user isn't recorded +# unless user = Context.users_status[userid]? +# raise "The user isn't recorded in the users_status structure" +# end +# +# transfer = FileStorage::Transfer.from_json( +# String.new event.message.payload +# ) +# response = hdl_transfer transfer, Context.users_status[userid] +# +# do_response event, response +# end + + ################################################################# + # FINISH + ################################################################# + + + # If clients sent requests with an “id” field, it is copied + # in the responses. Allows identifying responses easily. + response.id = request.id + + event.connection.send response + + duration = Time.utc - request_start + puts "request took: #{duration}" + else + warning "unhandled IPC event: #{event.class}" + end + + # TODOOOOOOOOOO TODO FIXME + + + + rescue exception + error "exception: #{typeof(exception)} - #{exception.message}" + end + end + end + + def self.from_cli + storage_directory = "file-data-storage" + file_info_directory = "file-info-storage" + key = "nico-nico-nii" # Default authd key, as per the specs. :eyes: + + OptionParser.parse do |parser| + parser.banner = "usage: filestoraged [options]" + + parser.on "-d storage-directory", + "--storage-directory storage-directory", + "The directory where to put uploaded files." do |opt| + storage_directory = opt + end + + parser.on "-m metadata-directory", + "--metadata-directory storage-directory", + "The directory where to put metadata of uploaded files." do |opt| + file_info_directory = opt + end + + parser.on "-h", + "--help", + "Displays this help and exits." do + puts parser + exit 0 + end + + # FIXME: Either make this mandatory or print a warning if missing. + parser.on "-k file", + "--key file", + "Reads the authentication key from the provided file." do |file| + key = File.read(file).chomp + end + end + + ::FileStorage::Service.new storage_directory, file_info_directory, key + end + + # TODO: this will probably be dropped at some point. +# def do_response(event : IPC::Event::Message, +# response : FileStorage::Message) +# +# case response +# when FileStorage::Response +# event.connection.send FileStorage::MessageType::Response.to_u8, response.to_json +# when FileStorage::Responses +# event.connection.send FileStorage::MessageType::Responses.to_u8, response.to_json +# when FileStorage::Error +# event.connection.send FileStorage::MessageType::Error.to_u8, response.to_json +# else +# puts "response should not happen: #{response}" +# pp! response +# end +# end +end + +FileStorage::Service.from_cli.run diff --git a/src/server/network.cr b/src/server/network.cr new file mode 100644 index 0000000..4e8c06d --- /dev/null +++ b/src/server/network.cr @@ -0,0 +1,74 @@ +require "ipc" +require "json" + +class JSONIPC + include JSON::Serializable + + getter type = -1 + class_getter type = -1 + + property id : JSON::Any? + + def handle(service : IPC::Service, event : IPC::Event::Events) + raise "unimplemented" + end + + macro request(id, type, &block) + class {{id}} < ::JSONIPC + include JSON::Serializable + + @@type = {{type}} + def type + @@type + end + + {{yield}} + end + end +end + +class IPC::Connection + def send(request : JSONIPC) + send request.type.to_u8, request.to_json + end +end + +class FileStorage + class_getter requests = [] of JSONIPC.class + class_getter responses = [] of JSONIPC.class +end + +class FileStorage::Client < IPC::Client + def initialize + initialize "filestorage" + end +end + +class FileStorage::Response + JSONIPC.request Error, 0 do + property mid : String + property reason : String | Array(String) + + def initialize(@mid, @reason) + end + end + JSONIPC.request Success, 1 do + property mid : String + def initialize(@mid) + end + end +end + +def parse_message(requests : Array(JSONIPC.class), message : IPC::Message) : JSONIPC? + request_type = requests.find &.type.==(message.utype) + + payload = String.new message.payload + + if request_type.nil? + raise "invalid request type (#{message.utype})" + end + + request_type.from_json payload +end + +require "../common/requests/*" diff --git a/src/server/storage.cr b/src/server/storage.cr new file mode 100644 index 0000000..afea7b9 --- /dev/null +++ b/src/server/storage.cr @@ -0,0 +1,150 @@ +require "json" +require "uuid" +require "uuid/json" +require "openssl" + +require "dodb" +require "base64" + +require "../common/utils" + +require "./storage/*" + +# private function +def digest(value : String) + + underlying_io = IO::Memory.new value + buffer = Bytes.new(4096) + + io = OpenSSL::DigestIO.new underlying_io, "SHA256" + io.read buffer + + io.digest.hexstring +end + +# XXX TODO FIXME: architectural questions +# Why keeping upload and download requests? +# The server can be just for uploads, delegating downloads to HTTP. +# In environment without HTTP integration, this could still be pertinent. + +class FileStorage::Storage + property db : DODB::DataBase(TransferInfo) + + # Search file informations by their index, owner and tags. + property db_by_filedigest : DODB::Index(TransferInfo) + property db_by_owner : DODB::Partition(TransferInfo) + property db_by_tags : DODB::Tags(TransferInfo) + + def initialize(storage_directory, file_info_directory) + @db = DODB::DataBase(TransferInfo).new @file_info_directory + + # Create indexes, partitions and tags objects. + @db_by_filedigest = @db.new_index "filedigest", &.file_info.digest + @db_by_owner = @db.new_partition "owner", &.owner.to_s + @db_by_tags = @db.new_tags "tags", &.file_info.tags + end + + # Reception of a file chunk. + def transfer(message : FileStorage::Transfer, user : User) : FileStorage::Response + + # We received a message containing a chunk of file. + mid = message.mid + mid ||= "no message id" + + # Get the transfer info from the db + transfer_info = @db_by_filedigest.get message.filedigest + + if transfer_info.nil? + # The user has to send an upload request before sending anything. + # If not the case, it should be discarded. + raise "file not recorded" + end + + chunk_number = message.chunk.n + + data = Base64.decode message.data + + # TODO: verify that the chunk sent was really missing. + if transfer_info.chunks.select(chunk_number).size > 0 + write_a_chunk user.uid.to_s, transfer_info.file_info, chunk_number, data + else + raise "non existent chunk or already uploaded" + end + + remove_chunk_from_db transfer_info, chunk_number + + # TODO: verify the digest, if no more chunks. + + FileStorage::Response::Transfer.new mid + rescue e + puts "Error handling transfer: #{e.message}" + FileStorage::Response.new mid.not_nil!, "Not Ok", "Unexpected error: #{e.message}" + end + + # the client sent an upload request + def upload(request : FileStorage::Request::Upload, user : User) : FileStorage::Response + + mid = request.mid + mid ||= "no message id" + + puts "hdl upload: mid=#{request.mid}" + pp! request + + # TODO: verify the rights and quotas of the user + # file_info attributes: name, size, nb_chunks, digest, tags + + # First: check if the file already exists + transfer_info = @db_by_filedigest.get? request.file.digest + if transfer_info.nil? + # In case file informations aren't already registered + # which is normal at this point + transfer_info = TransferInfo.new user.uid, request.file + @db << transfer_info + else + # File information already exists, request may be duplicated + # In this case: ignore the upload request + end + + FileStorage::Response::Upload.new request.mid + rescue e + puts "Error handling transfer: #{e.message}" + FileStorage::Response.new mid.not_nil!, "Not Ok", "Unexpected error: #{e.message}" + end + + # TODO + # The client sent a download request. + def download(request : FileStorage::DownloadRequest, user : User) : FileStorage::Response + + puts "hdl download: mid=#{request.mid}" + pp! request + + FileStorage::Response::Download.new request.mid + end + + + # Entry point for request management + # Each request should have a response. + # Then, responses are sent in a single message. + def requests(requests : Array(FileStorage::Request), + user : User, + event : IPC::Event::Message) : Array(FileStorage::Response) + + puts "hdl request" + responses = Array(FileStorage::Response).new + + requests.each do |request| + case request + when FileStorage::DownloadRequest + responses << download request, user + when FileStorage::UploadRequest + responses << upload request, user + else + raise "request not understood" + end + + puts + end + + responses + end +end diff --git a/src/server/storage/file_info.cr b/src/server/storage/file_info.cr new file mode 100644 index 0000000..cf69ed6 --- /dev/null +++ b/src/server/storage/file_info.cr @@ -0,0 +1,85 @@ + +# For now, upload and download are sequentials. +# In a future version, we will be able to send +# arbitrary parts of each file. + + # Who knows, maybe someday we will be on UDP, too. + #class SHA256 + # JSON.mapping({ + # chunk: Slice(UInt8) + # }) + #end + +class FileStorage + + # 1 MB read buffer, on-disk + def self.file_reading_buffer_size + 1_000_000 + end + + # 1 KB message data buffer, on-network + def self.message_buffer_size + 1_000 + end + + # private function + def self.data_digest(data : Bytes) + + iodata = IO::Memory.new data, false + buffer = Bytes.new FileStorage.file_reading_buffer_size + + io = OpenSSL::DigestIO.new(iodata, "SHA256") + while io.read(buffer) > 0; end + + io.digest.hexstring + end + + # private function + def self.file_digest(file : File) + # 1M read buffer + buffer = Bytes.new(1_000_000) + + io = OpenSSL::DigestIO.new(file, "SHA256") + while io.read(buffer) > 0 ; end + + io.digest.hexstring + end +end + + +class FileStorage::Exception < ::Exception +end + +class FileStorage::Chunk + include JSON::Serializable + + property n : Int32 # chunk's number + property on : Int32 # number of chunks + property digest : String # digest of the current chunk + + def initialize(@n, @on, data) + @digest = FileStorage.data_digest data.to_slice + end +end + +# A file has a name, a size and tags. +class FileStorage::FileInfo + include JSON::Serializable + + property name : String + property size : UInt64 + property nb_chunks : Int32 + property digest : String + + # list of SHA256, if we are on UDP + # chunks: Array(SHA256), + property tags : Array(String) + + def initialize(file : File, tags = nil) + @name = File.basename file.path + @size = file.size + @digest = FileStorage.file_digest file + @nb_chunks = (@size / FileStorage.message_buffer_size).ceil.to_i + @tags = tags || [] of String + end +end diff --git a/src/server/storage/transfer-info.cr b/src/server/storage/transfer-info.cr new file mode 100644 index 0000000..0b3539c --- /dev/null +++ b/src/server/storage/transfer-info.cr @@ -0,0 +1,13 @@ + +class TransferInfo + include JSON::Serializable + + property owner : Int32 + property file_info : FileStorage::FileInfo + property chunks : Array(Int32) + + def initialize(@owner, @file_info) + @chunks = (0...@file_info.nb_chunks).to_a + end +end + diff --git a/src/server/storage/user-data.cr b/src/server/storage/user-data.cr new file mode 100644 index 0000000..5d8d9ca --- /dev/null +++ b/src/server/storage/user-data.cr @@ -0,0 +1,21 @@ +require "json" +require "uuid" +require "uuid/json" + + +# Keep track of connected users and their requests. +# TODO: requests should be handled concurrently. +class FileStorage::UserData + include JSON::Serializable + + property uid : Int32 + property token : AuthD::User::Public + property uploads : Array(FileStorage::Request::Upload) + property downloads : Array(FileStorage::Request::Download) + + def initialize(@token, + @uploads = Array(FileStorage::Request::Upload).new, + @downloads = Array(FileStorage::Request::Download).new) + @uid = token.uid + end +end