Store current fd to avoid conflict.
This commit is contained in:
parent
0d74c82e7c
commit
aa6bc4952e
1 changed files with 26 additions and 24 deletions
|
@ -413,6 +413,7 @@ pub const Context = struct {
|
|||
// handle messages
|
||||
// => loop over self.pollfd.items
|
||||
for (self.pollfd.items, 0..) |*fd, i| {
|
||||
var current_fd = fd.fd;
|
||||
// .revents is POLLIN
|
||||
if (fd.revents & std.os.linux.POLL.IN > 0) {
|
||||
// SERVER = new connection
|
||||
|
@ -422,23 +423,23 @@ pub const Context = struct {
|
|||
}
|
||||
// SWITCHED = send message to the right dest (or drop the switch)
|
||||
else if (self.connections.items[i].t == .SWITCHED) {
|
||||
current_event = self.switchdb.handle_event_read(i, fd.fd);
|
||||
current_event = self.switchdb.handle_event_read(i, current_fd);
|
||||
switch (current_event.t) {
|
||||
.SWITCH_RX => {
|
||||
try self.schedule(current_event.m.?);
|
||||
},
|
||||
.DISCONNECTION => {
|
||||
var dest = try self.switchdb.getDest(fd.fd);
|
||||
log.debug("disconnection from {} -> removing {}, too", .{ fd.fd, dest });
|
||||
self.switchdb.nuke(fd.fd);
|
||||
self.safe_close_fd(fd.fd);
|
||||
var dest = try self.switchdb.getDest(current_fd);
|
||||
log.debug("disconnection from {} -> removing {}, too", .{ current_fd, dest });
|
||||
self.switchdb.nuke(current_fd);
|
||||
self.safe_close_fd(current_fd);
|
||||
self.safe_close_fd(dest);
|
||||
},
|
||||
.ERROR => {
|
||||
var dest = try self.switchdb.getDest(fd.fd);
|
||||
log.warn("error from {} -> removing {}, too", .{ fd.fd, dest });
|
||||
self.switchdb.nuke(fd.fd);
|
||||
self.safe_close_fd(fd.fd);
|
||||
var dest = try self.switchdb.getDest(current_fd);
|
||||
log.warn("error from {} -> removing {}, too", .{ current_fd, dest });
|
||||
self.switchdb.nuke(current_fd);
|
||||
self.safe_close_fd(current_fd);
|
||||
self.safe_close_fd(dest);
|
||||
},
|
||||
else => {
|
||||
|
@ -450,7 +451,7 @@ pub const Context = struct {
|
|||
}
|
||||
// EXTERNAL = user handles IO
|
||||
else if (self.connections.items[i].t == .EXTERNAL) {
|
||||
return Event.init(Event.Type.EXTERNAL, i, fd.fd, null);
|
||||
return Event.init(Event.Type.EXTERNAL, i, current_fd, null);
|
||||
}
|
||||
// otherwise = new message or disconnection
|
||||
else {
|
||||
|
@ -458,26 +459,26 @@ pub const Context = struct {
|
|||
error.ConnectionResetByPeer => {
|
||||
log.warn("connection reset by peer", .{});
|
||||
try self.close(i);
|
||||
return Event.init(Event.Type.DISCONNECTION, i, fd.fd, null);
|
||||
return Event.init(Event.Type.DISCONNECTION, i, current_fd, null);
|
||||
},
|
||||
error.wrongMessageLength => {
|
||||
log.warn("wrong message length, terminating the connection", .{});
|
||||
try self.close(i);
|
||||
return Event.init(Event.Type.DISCONNECTION, i, fd.fd, null);
|
||||
return Event.init(Event.Type.DISCONNECTION, i, current_fd, null);
|
||||
},
|
||||
else => {
|
||||
log.warn("unmanaged error while reading a message ({})", .{err});
|
||||
try self.close(i);
|
||||
return Event.init(Event.Type.ERROR, i, fd.fd, null);
|
||||
return Event.init(Event.Type.ERROR, i, current_fd, null);
|
||||
},
|
||||
};
|
||||
|
||||
if (maybe_message) |m| {
|
||||
return Event.init(Event.Type.MESSAGE_RX, i, fd.fd, m);
|
||||
return Event.init(Event.Type.MESSAGE_RX, i, current_fd, m);
|
||||
}
|
||||
|
||||
try self.close(i);
|
||||
return Event.init(Event.Type.DISCONNECTION, i, fd.fd, null);
|
||||
return Event.init(Event.Type.DISCONNECTION, i, current_fd, null);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -503,10 +504,10 @@ pub const Context = struct {
|
|||
switch (current_event.t) {
|
||||
.SWITCH_TX => {},
|
||||
.ERROR => {
|
||||
var dest = try self.switchdb.getDest(fd.fd);
|
||||
log.warn("error from {} -> removing {}, too", .{ fd.fd, dest });
|
||||
self.switchdb.nuke(fd.fd);
|
||||
self.safe_close_fd(fd.fd);
|
||||
var dest = try self.switchdb.getDest(current_fd);
|
||||
log.warn("error from {} -> removing {}, too", .{ current_fd, dest });
|
||||
self.switchdb.nuke(current_fd);
|
||||
self.safe_close_fd(current_fd);
|
||||
self.safe_close_fd(dest);
|
||||
},
|
||||
else => {
|
||||
|
@ -521,21 +522,21 @@ pub const Context = struct {
|
|||
error.BrokenPipe => {
|
||||
log.warn("cannot send message, dest probably closed the connection ({})", .{err});
|
||||
try self.close(i);
|
||||
return Event.init(Event.Type.DISCONNECTION, i, fd.fd, null);
|
||||
return Event.init(Event.Type.DISCONNECTION, i, current_fd, null);
|
||||
},
|
||||
else => {
|
||||
log.warn("unmanaged error while sending a message ({})", .{err});
|
||||
try self.close(i);
|
||||
return Event.init(Event.Type.ERROR, i, fd.fd, null);
|
||||
return Event.init(Event.Type.ERROR, i, current_fd, null);
|
||||
},
|
||||
};
|
||||
return Event.init(Event.Type.MESSAGE_TX, i, fd.fd, null);
|
||||
return Event.init(Event.Type.MESSAGE_TX, i, current_fd, null);
|
||||
}
|
||||
}
|
||||
// .revent is POLLHUP
|
||||
if (fd.revents & std.os.linux.POLL.HUP > 0) {
|
||||
// handle disconnection
|
||||
current_event = Event.init(Event.Type.DISCONNECTION, i, fd.fd, null);
|
||||
current_event = Event.init(Event.Type.DISCONNECTION, i, current_fd, null);
|
||||
try self.close(i);
|
||||
return current_event;
|
||||
}
|
||||
|
@ -543,7 +544,7 @@ pub const Context = struct {
|
|||
if ((fd.revents & std.os.linux.POLL.HUP > 0) or
|
||||
(fd.revents & std.os.linux.POLL.NVAL > 0))
|
||||
{
|
||||
return Event.init(Event.Type.ERROR, i, fd.fd, null);
|
||||
return Event.init(Event.Type.ERROR, i, current_fd, null);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -569,6 +570,7 @@ pub const Context = struct {
|
|||
self.allocator.free(path);
|
||||
}
|
||||
var pollfd = self.pollfd.swapRemove(index);
|
||||
log.debug("closed client index {} (fd = {})", .{ index, pollfd.fd });
|
||||
std.os.close(pollfd.fd);
|
||||
|
||||
// Remove all its non-sent messages.
|
||||
|
|
Loading…
Add table
Reference in a new issue