diff options
author | Ilya Lukyanov <i@ilyaluk.ru> | 2017-08-08 19:23:50 +0300 |
---|---|---|
committer | Ilya Lukyanov <i@ilyaluk.ru> | 2017-08-08 19:23:50 +0300 |
commit | 1c5a96c112eea193f30360eae831de2e54a32ac2 (patch) | |
tree | 26010ea2ac2ed6715994e4487bb2733a951bfbe0 | |
parent | Merge pull request #1309 from 4e554c4c/racey_kills (diff) | |
download | sway-1c5a96c112eea193f30360eae831de2e54a32ac2.tar.gz sway-1c5a96c112eea193f30360eae831de2e54a32ac2.tar.zst sway-1c5a96c112eea193f30360eae831de2e54a32ac2.zip |
Implement nonblocking IO in IPC server
Added client write buffer and handler for writable status on client
socket.
-rw-r--r-- | sway/ipc-server.c | 101 |
1 files changed, 96 insertions, 5 deletions
diff --git a/sway/ipc-server.c b/sway/ipc-server.c index dca881fa..46a37225 100644 --- a/sway/ipc-server.c +++ b/sway/ipc-server.c | |||
@@ -40,11 +40,15 @@ static const char ipc_magic[] = {'i', '3', '-', 'i', 'p', 'c'}; | |||
40 | 40 | ||
41 | struct ipc_client { | 41 | struct ipc_client { |
42 | struct wlc_event_source *event_source; | 42 | struct wlc_event_source *event_source; |
43 | struct wlc_event_source *writable_event_source; | ||
43 | int fd; | 44 | int fd; |
44 | uint32_t payload_length; | 45 | uint32_t payload_length; |
45 | uint32_t security_policy; | 46 | uint32_t security_policy; |
46 | enum ipc_command_type current_command; | 47 | enum ipc_command_type current_command; |
47 | enum ipc_command_type subscribed_events; | 48 | enum ipc_command_type subscribed_events; |
49 | size_t write_buffer_len; | ||
50 | size_t write_buffer_size; | ||
51 | char *write_buffer; | ||
48 | }; | 52 | }; |
49 | 53 | ||
50 | static list_t *ipc_get_pixel_requests = NULL; | 54 | static list_t *ipc_get_pixel_requests = NULL; |
@@ -58,6 +62,7 @@ struct get_pixels_request { | |||
58 | struct sockaddr_un *ipc_user_sockaddr(void); | 62 | struct sockaddr_un *ipc_user_sockaddr(void); |
59 | int ipc_handle_connection(int fd, uint32_t mask, void *data); | 63 | int ipc_handle_connection(int fd, uint32_t mask, void *data); |
60 | int ipc_client_handle_readable(int client_fd, uint32_t mask, void *data); | 64 | int ipc_client_handle_readable(int client_fd, uint32_t mask, void *data); |
65 | int ipc_client_handle_writable(int client_fd, uint32_t mask, void *data); | ||
61 | void ipc_client_disconnect(struct ipc_client *client); | 66 | void ipc_client_disconnect(struct ipc_client *client); |
62 | void ipc_client_handle_command(struct ipc_client *client); | 67 | void ipc_client_handle_command(struct ipc_client *client); |
63 | bool ipc_send_reply(struct ipc_client *client, const char *payload, uint32_t payload_length); | 68 | bool ipc_send_reply(struct ipc_client *client, const char *payload, uint32_t payload_length); |
@@ -168,6 +173,12 @@ int ipc_handle_connection(int fd, uint32_t mask, void *data) { | |||
168 | close(client_fd); | 173 | close(client_fd); |
169 | return 0; | 174 | return 0; |
170 | } | 175 | } |
176 | if ((flags = fcntl(client_fd, F_GETFL)) == -1 | ||
177 | || fcntl(client_fd, F_SETFL, flags|O_NONBLOCK) == -1) { | ||
178 | sway_log_errno(L_ERROR, "Unable to set NONBLOCK on IPC client socket"); | ||
179 | close(client_fd); | ||
180 | return 0; | ||
181 | } | ||
171 | 182 | ||
172 | struct ipc_client* client = malloc(sizeof(struct ipc_client)); | 183 | struct ipc_client* client = malloc(sizeof(struct ipc_client)); |
173 | if (!client) { | 184 | if (!client) { |
@@ -179,10 +190,22 @@ int ipc_handle_connection(int fd, uint32_t mask, void *data) { | |||
179 | client->fd = client_fd; | 190 | client->fd = client_fd; |
180 | client->subscribed_events = 0; | 191 | client->subscribed_events = 0; |
181 | client->event_source = wlc_event_loop_add_fd(client_fd, WLC_EVENT_READABLE, ipc_client_handle_readable, client); | 192 | client->event_source = wlc_event_loop_add_fd(client_fd, WLC_EVENT_READABLE, ipc_client_handle_readable, client); |
193 | client->writable_event_source = NULL; | ||
194 | |||
195 | client->write_buffer_size = 128; | ||
196 | client->write_buffer_len = 0; | ||
197 | client->write_buffer = malloc(client->write_buffer_size); | ||
198 | if (!client->write_buffer) { | ||
199 | sway_log(L_ERROR, "Unable to allocate ipc client write buffer"); | ||
200 | close(client_fd); | ||
201 | return 0; | ||
202 | } | ||
182 | 203 | ||
183 | pid_t pid = get_client_pid(client->fd); | 204 | pid_t pid = get_client_pid(client->fd); |
184 | client->security_policy = get_ipc_policy_mask(pid); | 205 | client->security_policy = get_ipc_policy_mask(pid); |
185 | 206 | ||
207 | sway_log(L_DEBUG, "New client: fd %d, pid %d", client_fd, pid); | ||
208 | |||
186 | list_add(ipc_client_list, client); | 209 | list_add(ipc_client_list, client); |
187 | 210 | ||
188 | return 0; | 211 | return 0; |
@@ -205,6 +228,8 @@ int ipc_client_handle_readable(int client_fd, uint32_t mask, void *data) { | |||
205 | return 0; | 228 | return 0; |
206 | } | 229 | } |
207 | 230 | ||
231 | sway_log(L_DEBUG, "Client %d readable", client->fd); | ||
232 | |||
208 | int read_available; | 233 | int read_available; |
209 | if (ioctl(client_fd, FIONREAD, &read_available) == -1) { | 234 | if (ioctl(client_fd, FIONREAD, &read_available) == -1) { |
210 | sway_log_errno(L_INFO, "Unable to read IPC socket buffer size"); | 235 | sway_log_errno(L_INFO, "Unable to read IPC socket buffer size"); |
@@ -226,6 +251,7 @@ int ipc_client_handle_readable(int client_fd, uint32_t mask, void *data) { | |||
226 | 251 | ||
227 | uint8_t buf[ipc_header_size]; | 252 | uint8_t buf[ipc_header_size]; |
228 | uint32_t *buf32 = (uint32_t*)(buf + sizeof(ipc_magic)); | 253 | uint32_t *buf32 = (uint32_t*)(buf + sizeof(ipc_magic)); |
254 | // Should be fully available, because read_available >= ipc_header_size | ||
229 | ssize_t received = recv(client_fd, buf, ipc_header_size, 0); | 255 | ssize_t received = recv(client_fd, buf, ipc_header_size, 0); |
230 | if (received == -1) { | 256 | if (received == -1) { |
231 | sway_log_errno(L_INFO, "Unable to receive header from IPC client"); | 257 | sway_log_errno(L_INFO, "Unable to receive header from IPC client"); |
@@ -249,6 +275,48 @@ int ipc_client_handle_readable(int client_fd, uint32_t mask, void *data) { | |||
249 | return 0; | 275 | return 0; |
250 | } | 276 | } |
251 | 277 | ||
278 | int ipc_client_handle_writable(int client_fd, uint32_t mask, void *data) { | ||
279 | struct ipc_client *client = data; | ||
280 | |||
281 | if (mask & WLC_EVENT_ERROR) { | ||
282 | sway_log(L_ERROR, "IPC Client socket error, removing client"); | ||
283 | ipc_client_disconnect(client); | ||
284 | return 0; | ||
285 | } | ||
286 | |||
287 | if (mask & WLC_EVENT_HANGUP) { | ||
288 | sway_log(L_DEBUG, "Client %d hung up", client->fd); | ||
289 | ipc_client_disconnect(client); | ||
290 | return 0; | ||
291 | } | ||
292 | |||
293 | if (client->write_buffer_len <= 0) { | ||
294 | return 0; | ||
295 | } | ||
296 | |||
297 | sway_log(L_DEBUG, "Client %d writable", client->fd); | ||
298 | |||
299 | ssize_t written = write(client->fd, client->write_buffer, client->write_buffer_len); | ||
300 | |||
301 | if (written == -1 && errno == EAGAIN) { | ||
302 | return 0; | ||
303 | } else if (written == -1) { | ||
304 | sway_log_errno(L_INFO, "Unable to send data from queue to IPC client"); | ||
305 | ipc_client_disconnect(client); | ||
306 | return 0; | ||
307 | } | ||
308 | |||
309 | memmove(client->write_buffer, client->write_buffer + written, client->write_buffer_len - written); | ||
310 | client->write_buffer_len -= written; | ||
311 | |||
312 | if (client->write_buffer_len == 0 && client->writable_event_source) { | ||
313 | wlc_event_source_remove(client->writable_event_source); | ||
314 | client->writable_event_source = NULL; | ||
315 | } | ||
316 | |||
317 | return 0; | ||
318 | } | ||
319 | |||
252 | void ipc_client_disconnect(struct ipc_client *client) { | 320 | void ipc_client_disconnect(struct ipc_client *client) { |
253 | if (!sway_assert(client != NULL, "client != NULL")) { | 321 | if (!sway_assert(client != NULL, "client != NULL")) { |
254 | return; | 322 | return; |
@@ -260,9 +328,13 @@ void ipc_client_disconnect(struct ipc_client *client) { | |||
260 | 328 | ||
261 | sway_log(L_INFO, "IPC Client %d disconnected", client->fd); | 329 | sway_log(L_INFO, "IPC Client %d disconnected", client->fd); |
262 | wlc_event_source_remove(client->event_source); | 330 | wlc_event_source_remove(client->event_source); |
331 | if (client->writable_event_source) { | ||
332 | wlc_event_source_remove(client->writable_event_source); | ||
333 | } | ||
263 | int i = 0; | 334 | int i = 0; |
264 | while (i < ipc_client_list->length && ipc_client_list->items[i] != client) i++; | 335 | while (i < ipc_client_list->length && ipc_client_list->items[i] != client) i++; |
265 | list_del(ipc_client_list, i); | 336 | list_del(ipc_client_list, i); |
337 | free(client->write_buffer); | ||
266 | close(client->fd); | 338 | close(client->fd); |
267 | free(client); | 339 | free(client); |
268 | } | 340 | } |
@@ -334,6 +406,7 @@ void ipc_client_handle_command(struct ipc_client *client) { | |||
334 | return; | 406 | return; |
335 | } | 407 | } |
336 | if (client->payload_length > 0) { | 408 | if (client->payload_length > 0) { |
409 | // Payload should be fully available | ||
337 | ssize_t received = recv(client->fd, buf, client->payload_length, 0); | 410 | ssize_t received = recv(client->fd, buf, client->payload_length, 0); |
338 | if (received == -1) | 411 | if (received == -1) |
339 | { | 412 | { |
@@ -590,17 +663,35 @@ bool ipc_send_reply(struct ipc_client *client, const char *payload, uint32_t pay | |||
590 | data32[0] = payload_length; | 663 | data32[0] = payload_length; |
591 | data32[1] = client->current_command; | 664 | data32[1] = client->current_command; |
592 | 665 | ||
593 | if (write(client->fd, data, ipc_header_size) == -1) { | 666 | while (client->write_buffer_len + ipc_header_size + payload_length >= |
594 | sway_log_errno(L_INFO, "Unable to send header to IPC client"); | 667 | client->write_buffer_size) { |
668 | client->write_buffer_size *= 2; | ||
669 | } | ||
670 | |||
671 | if (client->write_buffer_size > (1 << 22)) { // 4 MB | ||
672 | sway_log(L_ERROR, "Client write buffer too big, disconnecting client"); | ||
673 | ipc_client_disconnect(client); | ||
595 | return false; | 674 | return false; |
596 | } | 675 | } |
597 | 676 | ||
598 | if (write(client->fd, payload, payload_length) == -1) { | 677 | char *new_buffer = realloc(client->write_buffer, client->write_buffer_size); |
599 | sway_log_errno(L_INFO, "Unable to send payload to IPC client"); | 678 | if (!new_buffer) { |
679 | sway_log(L_ERROR, "Unable to reallocate ipc client write buffer"); | ||
680 | ipc_client_disconnect(client); | ||
600 | return false; | 681 | return false; |
601 | } | 682 | } |
683 | client->write_buffer = new_buffer; | ||
684 | |||
685 | memcpy(client->write_buffer + client->write_buffer_len, data, ipc_header_size); | ||
686 | client->write_buffer_len += ipc_header_size; | ||
687 | memcpy(client->write_buffer + client->write_buffer_len, payload, payload_length); | ||
688 | client->write_buffer_len += payload_length; | ||
689 | |||
690 | if (!client->writable_event_source) { | ||
691 | client->writable_event_source = wlc_event_loop_add_fd(client->fd, WLC_EVENT_WRITABLE, ipc_client_handle_writable, client); | ||
692 | } | ||
602 | 693 | ||
603 | sway_log(L_DEBUG, "Send IPC reply: %s", payload); | 694 | sway_log(L_DEBUG, "Added IPC reply to client %d queue: %s", client->fd, payload); |
604 | 695 | ||
605 | return true; | 696 | return true; |
606 | } | 697 | } |