Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • uspace/srv/net/tcp/sock.c

    r7c912b6 rb1bd89ea  
    5151#include "ucall.h"
    5252
     53#define FRAGMENT_SIZE 1024
     54
    5355#define MAX_BACKLOG 128
    5456
     
    6466static void tcp_sock_connection(ipc_callid_t iid, ipc_call_t *icall, void *arg);
    6567static void tcp_sock_cstate_cb(tcp_conn_t *conn, void *arg);
    66 static int tcp_sock_recv_fibril(void *arg);
    6768
    6869int tcp_sock_init(void)
     
    9697        async_exch_t *exch = async_exchange_begin(sock_core->sess);
    9798        async_msg_5(exch, NET_SOCKET_RECEIVED, (sysarg_t)sock_core->socket_id,
    98             TCP_SOCK_FRAGMENT_SIZE, 0, 0, 1);
     99            FRAGMENT_SIZE, 0, 0, 1);
    99100        async_exchange_end(exch);
    100101}
     
    105106        async_exch_t *exch = async_exchange_begin(lsock_core->sess);
    106107        async_msg_5(exch, NET_SOCKET_ACCEPTED, (sysarg_t)lsock_core->socket_id,
    107             TCP_SOCK_FRAGMENT_SIZE, 0, 0, 0);
     108            FRAGMENT_SIZE, 0, 0, 0);
    108109        async_exchange_end(exch);
    109110}
    110111
    111 static int tcp_sock_create(tcp_client_t *client, tcp_sockdata_t **rsock)
     112static void tcp_sock_socket(tcp_client_t *client, ipc_callid_t callid, ipc_call_t call)
    112113{
    113114        tcp_sockdata_t *sock;
    114 
    115         log_msg(LVL_DEBUG, "tcp_sock_create()");
    116         *rsock = NULL;
    117 
    118         sock = calloc(sizeof(tcp_sockdata_t), 1);
    119         if (sock == NULL)
    120                 return ENOMEM;
    121 
    122         fibril_mutex_initialize(&sock->lock);
    123         sock->client = client;
    124 
    125         sock->recv_buffer_used = 0;
    126         sock->recv_error = TCP_EOK;
    127         fibril_mutex_initialize(&sock->recv_buffer_lock);
    128         fibril_condvar_initialize(&sock->recv_buffer_cv);
    129         list_initialize(&sock->ready);
    130 
    131         *rsock = sock;
    132         return EOK;
    133 }
    134 
    135 static void tcp_sock_uncreate(tcp_sockdata_t *sock)
    136 {
    137         log_msg(LVL_DEBUG, "tcp_sock_uncreate()");
    138         free(sock);
    139 }
    140 
    141 static int tcp_sock_finish_setup(tcp_sockdata_t *sock, int *sock_id)
    142 {
    143115        socket_core_t *sock_core;
    144         int rc;
    145 
    146         log_msg(LVL_DEBUG, "tcp_sock_finish_setup()");
    147 
    148         sock->recv_fibril = fibril_create(tcp_sock_recv_fibril, sock);
    149         if (sock->recv_fibril == 0)
    150                 return ENOMEM;
    151 
    152         rc = socket_create(&sock->client->sockets, sock->client->sess,
    153             sock, sock_id);
    154 
    155         if (rc != EOK)
    156                 return rc;
    157 
    158         sock_core = socket_cores_find(&sock->client->sockets, *sock_id);
    159         assert(sock_core != NULL);
    160         sock->sock_core = sock_core;
    161 
    162         return EOK;
    163 }
    164 
    165 static void tcp_sock_socket(tcp_client_t *client, ipc_callid_t callid, ipc_call_t call)
    166 {
    167         tcp_sockdata_t *sock;
    168116        int sock_id;
    169117        int rc;
     
    171119
    172120        log_msg(LVL_DEBUG, "tcp_sock_socket()");
    173 
    174         rc = tcp_sock_create(client, &sock);
    175         if (rc != EOK) {
    176                 async_answer_0(callid, rc);
    177                 return;
    178         }
    179 
     121        sock = calloc(sizeof(tcp_sockdata_t), 1);
     122        if (sock == NULL) {
     123                async_answer_0(callid, ENOMEM);
     124                return;
     125        }
     126
     127        fibril_mutex_initialize(&sock->lock);
     128        sock->client = client;
    180129        sock->laddr.ipv4 = TCP_IPV4_ANY;
    181130        sock->lconn = NULL;
    182131        sock->backlog = 0;
     132        list_initialize(&sock->ready);
    183133
    184134        sock_id = SOCKET_GET_SOCKET_ID(call);
    185         rc = tcp_sock_finish_setup(sock, &sock_id);
     135        rc = socket_create(&client->sockets, client->sess, sock, &sock_id);
    186136        if (rc != EOK) {
    187                 tcp_sock_uncreate(sock);
    188137                async_answer_0(callid, rc);
    189138                return;
    190139        }
    191140
     141        sock_core = socket_cores_find(&client->sockets, sock_id);
     142        assert(sock_core != NULL);
     143        sock->sock_core = sock_core;
     144
    192145        SOCKET_SET_SOCKET_ID(answer, sock_id);
    193146
    194         SOCKET_SET_DATA_FRAGMENT_SIZE(answer, TCP_SOCK_FRAGMENT_SIZE);
     147        SOCKET_SET_DATA_FRAGMENT_SIZE(answer, FRAGMENT_SIZE);
    195148        SOCKET_SET_HEADER_SIZE(answer, sizeof(tcp_header_t));
    196149       
     
    408361        }
    409362
    410         if (rc == EOK)
    411                 fibril_add_ready(socket->recv_fibril);
    412 
    413363        async_answer_0(callid, rc);
     364
     365        /* Push one fragment notification to client's queue */
     366        tcp_sock_notify_data(sock_core);
     367        log_msg(LVL_DEBUG, "tcp_sock_connect(): notify conn\n");
    414368}
    415369
     
    420374        int asock_id;
    421375        socket_core_t *sock_core;
     376        socket_core_t *asock_core;
    422377        tcp_sockdata_t *socket;
    423378        tcp_sockdata_t *asocket;
     
    489444        /* Allocate socket for accepted connection */
    490445
    491         rc = tcp_sock_create(client, &asocket);
    492         if (rc != EOK) {
    493                 fibril_mutex_unlock(&socket->lock);
    494                 async_answer_0(callid, rc);
    495                 return;
    496         }
    497 
     446        log_msg(LVL_DEBUG, "tcp_sock_accept(): allocate asocket\n");
     447        asocket = calloc(sizeof(tcp_sockdata_t), 1);
     448        if (asocket == NULL) {
     449                fibril_mutex_unlock(&socket->lock);
     450                async_answer_0(callid, ENOMEM);
     451                return;
     452        }
     453
     454        fibril_mutex_initialize(&asocket->lock);
     455        asocket->client = client;
    498456        asocket->conn = conn;
    499457        log_msg(LVL_DEBUG, "tcp_sock_accept():create asocket\n");
    500458
    501         rc = tcp_sock_finish_setup(asocket, &asock_id);
     459        rc = socket_create(&client->sockets, client->sess, asocket, &asock_id);
    502460        if (rc != EOK) {
    503                 tcp_sock_uncreate(asocket);
    504461                fibril_mutex_unlock(&socket->lock);
    505462                async_answer_0(callid, rc);
    506463                return;
    507464        }
    508 
    509         fibril_add_ready(asocket->recv_fibril);
    510 
    511465        log_msg(LVL_DEBUG, "tcp_sock_accept(): find acore\n");
    512466
    513         SOCKET_SET_DATA_FRAGMENT_SIZE(answer, TCP_SOCK_FRAGMENT_SIZE);
     467        asock_core = socket_cores_find(&client->sockets, asock_id);
     468        assert(asock_core != NULL);
     469
     470        SOCKET_SET_DATA_FRAGMENT_SIZE(answer, FRAGMENT_SIZE);
    514471        SOCKET_SET_SOCKET_ID(answer, asock_id);
    515472        SOCKET_SET_ADDRESS_LENGTH(answer, sizeof(struct sockaddr_in));
    516473       
    517         async_answer_3(callid, asocket->sock_core->socket_id,
     474        async_answer_3(callid, asock_core->socket_id,
    518475            IPC_GET_ARG1(answer), IPC_GET_ARG2(answer),
    519476            IPC_GET_ARG3(answer));
     
    521478        /* Push one fragment notification to client's queue */
    522479        log_msg(LVL_DEBUG, "tcp_sock_accept(): notify data\n");
     480        tcp_sock_notify_data(asock_core);
    523481        fibril_mutex_unlock(&socket->lock);
    524482}
     
    534492        ipc_callid_t wcallid;
    535493        size_t length;
    536         uint8_t buffer[TCP_SOCK_FRAGMENT_SIZE];
     494        uint8_t buffer[FRAGMENT_SIZE];
    537495        tcp_error_t trc;
    538496        int rc;
     
    565523                }
    566524
    567                 if (length > TCP_SOCK_FRAGMENT_SIZE)
    568                         length = TCP_SOCK_FRAGMENT_SIZE;
     525                if (length > FRAGMENT_SIZE)
     526                        length = FRAGMENT_SIZE;
    569527
    570528                rc = async_data_write_finalize(wcallid, buffer, length);
     
    602560
    603561        IPC_SET_ARG1(answer, 0);
    604         SOCKET_SET_DATA_FRAGMENT_SIZE(answer, TCP_SOCK_FRAGMENT_SIZE);
     562        SOCKET_SET_DATA_FRAGMENT_SIZE(answer, FRAGMENT_SIZE);
    605563        async_answer_2(callid, EOK, IPC_GET_ARG1(answer),
    606564            IPC_GET_ARG2(answer));
     
    623581        ipc_call_t answer;
    624582        ipc_callid_t rcallid;
     583        uint8_t buffer[FRAGMENT_SIZE];
    625584        size_t data_len;
     585        xflags_t xflags;
     586        tcp_error_t trc;
    626587        struct sockaddr_in addr;
    627588        tcp_sock_t *rsock;
     
    650611        (void)flags;
    651612
    652         log_msg(LVL_DEBUG, "tcp_sock_recvfrom(): lock recv_buffer_lock");
    653         fibril_mutex_lock(&socket->recv_buffer_lock);
    654         while (socket->recv_buffer_used == 0 && socket->recv_error == TCP_EOK) {
    655                 log_msg(LVL_DEBUG, "wait for recv_buffer_cv + recv_buffer_used != 0");
    656                 fibril_condvar_wait(&socket->recv_buffer_cv,
    657                     &socket->recv_buffer_lock);
    658         }
    659 
    660         log_msg(LVL_DEBUG, "Got data in sock recv_buffer");
    661 
    662         data_len = socket->recv_buffer_used;
    663         rc = socket->recv_error;
    664 
    665         switch (socket->recv_error) {
     613        trc = tcp_uc_receive(socket->conn, buffer, FRAGMENT_SIZE, &data_len,
     614            &xflags);
     615        log_msg(LVL_DEBUG, "**** tcp_uc_receive done");
     616
     617        switch (trc) {
    666618        case TCP_EOK:
    667619                rc = EOK;
     
    678630        }
    679631
    680         log_msg(LVL_DEBUG, "**** recv result -> %d", rc);
     632        log_msg(LVL_DEBUG, "**** tcp_uc_receive -> %d", rc);
    681633        if (rc != EOK) {
    682                 fibril_mutex_unlock(&socket->recv_buffer_lock);
    683634                fibril_mutex_unlock(&socket->lock);
    684635                async_answer_0(callid, rc);
     
    695646                log_msg(LVL_DEBUG, "addr read receive");
    696647                if (!async_data_read_receive(&rcallid, &addr_length)) {
    697                         fibril_mutex_unlock(&socket->recv_buffer_lock);
    698648                        fibril_mutex_unlock(&socket->lock);
    699649                        async_answer_0(callid, EINVAL);
     
    707657                rc = async_data_read_finalize(rcallid, &addr, addr_length);
    708658                if (rc != EOK) {
    709                         fibril_mutex_unlock(&socket->recv_buffer_lock);
    710659                        fibril_mutex_unlock(&socket->lock);
    711660                        async_answer_0(callid, EINVAL);
     
    716665        log_msg(LVL_DEBUG, "data read receive");
    717666        if (!async_data_read_receive(&rcallid, &length)) {
    718                 fibril_mutex_unlock(&socket->recv_buffer_lock);
    719667                fibril_mutex_unlock(&socket->lock);
    720668                async_answer_0(callid, EINVAL);
     
    726674
    727675        log_msg(LVL_DEBUG, "data read finalize");
    728         rc = async_data_read_finalize(rcallid, socket->recv_buffer, length);
    729 
    730         socket->recv_buffer_used -= length;
    731         log_msg(LVL_DEBUG, "tcp_sock_recvfrom: %zu left in buffer",
    732             socket->recv_buffer_used);
    733         if (socket->recv_buffer_used > 0) {
    734                 memmove(socket->recv_buffer, socket->recv_buffer + length,
    735                     socket->recv_buffer_used);
    736                 tcp_sock_notify_data(socket->sock_core);
    737         }
    738 
    739         fibril_condvar_broadcast(&socket->recv_buffer_cv);
     676        rc = async_data_read_finalize(rcallid, buffer, length);
    740677
    741678        if (length < data_len && rc == EOK)
     
    744681        SOCKET_SET_READ_DATA_LENGTH(answer, length);
    745682        async_answer_1(callid, EOK, IPC_GET_ARG1(answer));
    746 
    747         fibril_mutex_unlock(&socket->recv_buffer_lock);
     683       
     684        /* Push one fragment notification to client's queue */
     685        tcp_sock_notify_data(sock_core);
    748686        fibril_mutex_unlock(&socket->lock);
    749687}
     
    756694        tcp_error_t trc;
    757695        int rc;
     696        uint8_t buffer[FRAGMENT_SIZE];
     697        size_t data_len;
     698        xflags_t xflags;
    758699
    759700        log_msg(LVL_DEBUG, "tcp_sock_close()");
     
    776717                        return;
    777718                }
     719
     720                /* Drain incoming data. This should really be done in the background. */
     721                do {
     722                        trc = tcp_uc_receive(socket->conn, buffer,
     723                            FRAGMENT_SIZE, &data_len, &xflags);
     724                } while (trc == TCP_EOK);
     725
     726                tcp_uc_delete(socket->conn);
    778727        }
    779728
     
    827776        tcp_sock_notify_aconn(socket->sock_core);
    828777        fibril_mutex_unlock(&socket->lock);
    829 }
    830 
    831 static int tcp_sock_recv_fibril(void *arg)
    832 {
    833         tcp_sockdata_t *sock = (tcp_sockdata_t *)arg;
    834         size_t data_len;
    835         xflags_t xflags;
    836         tcp_error_t trc;
    837 
    838         log_msg(LVL_DEBUG, "tcp_sock_recv_fibril()");
    839 
    840         while (true) {
    841                 log_msg(LVL_DEBUG, "call tcp_uc_receive()");
    842                 fibril_mutex_lock(&sock->recv_buffer_lock);
    843                 while (sock->recv_buffer_used != 0)
    844                         fibril_condvar_wait(&sock->recv_buffer_cv,
    845                             &sock->recv_buffer_lock);
    846 
    847                 trc = tcp_uc_receive(sock->conn, sock->recv_buffer,
    848                     TCP_SOCK_FRAGMENT_SIZE, &data_len, &xflags);
    849 
    850                 if (trc != TCP_EOK) {
    851                         sock->recv_error = trc;
    852                         fibril_condvar_broadcast(&sock->recv_buffer_cv);
    853                         fibril_mutex_unlock(&sock->recv_buffer_lock);
    854                         tcp_sock_notify_data(sock->sock_core);
    855                         break;
    856                 }
    857 
    858                 log_msg(LVL_DEBUG, "got data - broadcast recv_buffer_cv");
    859 
    860                 sock->recv_buffer_used = data_len;
    861                 fibril_condvar_broadcast(&sock->recv_buffer_cv);
    862                 fibril_mutex_unlock(&sock->recv_buffer_lock);
    863                 tcp_sock_notify_data(sock->sock_core);
    864         }
    865 
    866         tcp_uc_delete(sock->conn);
    867 
    868         return 0;
    869778}
    870779
Note: See TracChangeset for help on using the changeset viewer.